Best Python code snippet using lisa_python
ethtool.py
Source:ethtool.py
...704 result.assert_exit_code(705 message=f" Couldn't change device {interface} hash level for {protocol}."706 )707 return self.get_device_rx_hash_level(interface, protocol, force_run=True)708 def get_device_sg_settings(709 self, interface: str, force_run: bool = False710 ) -> DeviceSgSettings:711 device = self._get_or_create_device_setting(interface)712 if not force_run and device.device_sg_settings:713 return device.device_sg_settings714 result = self.run(f"-k {interface}", force_run=force_run)715 result.assert_exit_code()716 device.device_sg_settings = DeviceSgSettings(interface, result.stdout)717 return device.device_sg_settings718 def change_device_sg_settings(719 self, interface: str, sg_setting: bool720 ) -> DeviceSgSettings:721 sg = "on" if sg_setting else "off"722 change_result = self.run(723 f"-K {interface} sg {sg}",724 sudo=True,725 force_run=True,726 )727 change_result.assert_exit_code(728 message=f" Couldn't change device {interface} scatter-gather settings."729 )730 return self.get_device_sg_settings(interface, force_run=True)731 def get_device_statistics(732 self, interface: str, force_run: bool = False733 ) -> DeviceStatistics:734 device = self._get_or_create_device_setting(interface)735 if not force_run and device.device_statistics:736 return device.device_statistics737 result = self.run(f"-S {interface}", force_run=True)738 if (result.exit_code != 0) and (739 "Operation not supported" in result.stdout740 or "no stats available" in result.stdout741 ):742 raise UnsupportedOperationException(743 f"ethtool -S {interface} operation not supported."744 )...
sriov.py
Source:sriov.py
...441 client_ethtool = client_node.tools[Ethtool]442 vm_nics = initialize_nic_info(environment)443 # skip test if scatter-gather can't be updated444 for _, client_nic_info in vm_nics[client_node.name].items():445 device_sg_settings = client_ethtool.get_device_sg_settings(446 client_nic_info.upper, True447 )448 if device_sg_settings.sg_fixed:449 raise SkippedException(450 "scatter-gather is fixed, it cannot be changed for device"451 f" {client_nic_info.upper}. Skipping test."452 )453 else:454 break455 # save original enabled features456 device_enabled_features_origin = client_ethtool.get_all_device_enabled_features(457 True458 )459 # run iperf3 on server side and client side460 # iperfResults.log stored client side log461 source_iperf3 = server_node.tools[Iperf3]462 dest_iperf3 = client_node.tools[Iperf3]463 source_iperf3.run_as_server_async()464 dest_iperf3.run_as_client_async(465 server_ip=server_node.internal_address,466 log_file=client_iperf3_log,467 run_time_seconds=self.TIME_OUT,468 )469 # wait for a while then check any error shown up in iperfResults.log470 dest_cat = client_node.tools[Cat]471 iperf_log = dest_cat.read(client_iperf3_log, sudo=True, force_run=True)472 assert_that(iperf_log).does_not_contain("error")473 # disable and enable VF in pci level474 for node in environment.nodes.list():475 lspci = node.tools[Lspci]476 lspci.disable_devices_by_type(constants.DEVICE_TYPE_SRIOV)477 lspci.enable_devices()478 # check VF still paired with synthetic nic479 vm_nics = initialize_nic_info(environment)480 # get the enabled features after disable and enable VF481 # make sure there is not any change482 device_enabled_features_after = client_ethtool.get_all_device_enabled_features(483 True484 )485 assert_that(device_enabled_features_origin[0].enabled_features).is_equal_to(486 device_enabled_features_after[0].enabled_features487 )488 # set on for scatter-gather feature for synthetic nic489 # verify vf scatter-gather feature has value 'on'490 for _, client_nic_info 
in vm_nics[client_node.name].items():491 new_settings = client_ethtool.change_device_sg_settings(492 client_nic_info.upper, True493 )494 device_vf_sg_settings = client_ethtool.get_device_sg_settings(495 client_nic_info.lower, True496 )497 assert_that(498 new_settings.sg_setting,499 "sg setting is not sync into VF.",500 ).is_equal_to(device_vf_sg_settings.sg_setting)501 # set off for scatter-gather feature for synthetic nic502 # verify vf scatter-gather feature has value 'off'503 for _, client_nic_info in vm_nics[client_node.name].items():504 new_settings = client_ethtool.change_device_sg_settings(505 client_nic_info.upper, False506 )507 device_vf_sg_settings = client_ethtool.get_device_sg_settings(508 client_nic_info.lower, True509 )510 assert_that(511 new_settings.sg_setting,512 "sg setting is not sync into VF.",513 ).is_equal_to(device_vf_sg_settings.sg_setting)514 # disable and enable VF in pci level515 for node in environment.nodes.list():516 lspci = node.tools[Lspci]517 lspci.disable_devices_by_type(constants.DEVICE_TYPE_SRIOV)518 lspci.enable_devices()519 # check VF still paired with synthetic nic520 vm_nics = initialize_nic_info(environment)521 # check VF's scatter-gather feature keep consistent with previous status522 for _, client_nic_info in vm_nics[client_node.name].items():523 device_vf_sg_settings = client_ethtool.get_device_sg_settings(524 client_nic_info.lower, True525 )526 assert_that(527 device_vf_sg_settings.sg_setting,528 "sg setting is not sync into VF.",529 ).is_equal_to(False)530 # disable and enable sriov in network interface level531 network_interface_feature = client_node.features[NetworkInterface]532 for _ in range(3):533 sriov_is_enabled = network_interface_feature.is_enabled_sriov()534 network_interface_feature.switch_sriov(enable=(not sriov_is_enabled))535 network_interface_feature.switch_sriov(enable=True)536 # check VF still paired with synthetic nic537 vm_nics = initialize_nic_info(environment)538 # check VF's scatter-gather feature 
keep consistent with previous status539 for _, client_nic_info in vm_nics[client_node.name].items():540 device_vf_sg_settings = client_ethtool.get_device_sg_settings(541 client_nic_info.lower, True542 )543 assert_that(544 device_vf_sg_settings.sg_setting,545 "sg setting is not sync into VF.",546 ).is_equal_to(False)547 # reload sriov modules548 module_in_used = ""549 for node in environment.nodes.list():550 module_in_used = remove_module(node)551 for node in environment.nodes.list():552 load_module(node, module_in_used)553 # check VF still paired with synthetic nic554 vm_nics = initialize_nic_info(environment)555 # check VF's scatter-gather feature keep consistent with previous status556 for _, client_nic_info in vm_nics[client_node.name].items():557 device_vf_sg_settings = client_ethtool.get_device_sg_settings(558 client_nic_info.lower, True559 )560 assert_that(561 device_vf_sg_settings.sg_setting,562 "sg setting is not sync into VF.",563 ).is_equal_to(False)564 # check there is no error happen in iperf3 log565 # after above operations566 dest_cat = client_node.tools[Cat]567 iperf_log = dest_cat.read(client_iperf3_log, sudo=True, force_run=True)568 assert_that(iperf_log).does_not_contain("error")569 @TestCaseMetadata(570 description="""571 This case is to verify interrupts count increased after network traffic...
Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!