Best Python code snippet using lisa_python
ethtool.py
Source:ethtool.py
...359 )360 def __init__(self, interface: str, protocol: str, raw_str: str) -> None:361 self.interface = interface362 self.protocol_hash_map: Dict[str, bool] = {}363 self._parse_rx_hash_level(interface, protocol, raw_str)364 def _parse_rx_hash_level(self, interface: str, protocol: str, raw_str: str) -> None:365 hash_level_pattern = self._rx_hash_level_pattern.search(raw_str)366 if not hash_level_pattern:367 raise LisaException(368 f"Cannot get {interface} rx hash level information for {protocol}"369 )370 self.protocol_hash_map[protocol] = (371 True372 if self._tcp_udp_rx_hash_level_enable_pattern.search(raw_str)373 else False374 )375class DeviceStatistics:376 # NIC statistics:377 # tx_scattered: 0378 # tx_no_memory: 0379 _statistics_pattern = re.compile(r"^\s+(?P<name>.*?)\: +?(?P<value>\d*?)\r?$")380 def __init__(self, interface: str, device_statistics_raw: str) -> None:381 self._parse_statistics_info(interface, device_statistics_raw)382 def _parse_statistics_info(self, interface: str, raw_str: str) -> None:383 statistics: Dict[str, int] = {}384 items = find_groups_in_lines(raw_str, self._statistics_pattern)385 statistics = {x["name"]: int(x["value"]) for x in items}386 self.interface = interface387 self.counters = statistics388@dataclass389class DeviceSettings:390 interface: str391 device_channel: Optional[DeviceChannel] = None392 device_features: Optional[DeviceFeatures] = None393 device_link_settings: Optional[DeviceLinkSettings] = None394 device_msg_level: Optional[DeviceMessageLevel] = None395 device_ringbuffer_settings: Optional[DeviceRingBufferSettings] = None396 device_gro_lro_settings: Optional[DeviceGroLroSettings] = None397 device_rss_hash_key: Optional[DeviceRssHashKey] = None398 device_rx_hash_level: Optional[DeviceRxHashLevel] = None399 device_sg_settings: Optional[DeviceSgSettings] = None400 device_firmware_version: Optional[str] = None401 device_statistics: Optional[DeviceStatistics] = None402class Ethtool(Tool):403 # ethtool -i eth0404 # driver: hv_netvsc405 # version:406 # firmware-version: N/A407 _firmware_version_pattern = re.compile(408 r"^firmware-version:[\s+](?P<value>.*?)?$", re.MULTILINE409 )410 @property411 def command(self) -> str:412 return "ethtool"413 @property414 def can_install(self) -> bool:415 return True416 def _initialize(self, *args: Any, **kwargs: Any) -> None:417 self._command = "ethtool"418 self._device_set: Set[str] = set()419 self._device_settings_map: Dict[str, DeviceSettings] = {}420 def _install(self) -> bool:421 posix_os: Posix = cast(Posix, self.node.os)422 posix_os.install_packages("ethtool")423 return self._check_exists()424 def get_device_driver(self, interface: str) -> str:425 _device_driver_pattern = re.compile(426 r"^[\s]*driver:(?P<value>.*?)?$", re.MULTILINE427 )428 cmd_result = self.run(f"-i {interface}")429 cmd_result.assert_exit_code(430 message=f"Could not find the driver information for {interface}"431 )432 driver_info = re.search(_device_driver_pattern, cmd_result.stdout)433 if not driver_info:434 raise LisaException(f"No driver information found for device {interface}")435 return driver_info.group("value")436 def get_device_list(self, force_run: bool = False) -> Set[str]:437 if (not force_run) and self._device_set:438 return self._device_set439 find_tool = self.node.tools[Find]440 netdirs = find_tool.find_files(441 self.node.get_pure_path("/sys/devices"),442 name_pattern="net",443 path_pattern="*vmbus*",444 ignore_case=True,445 )446 for netdir in netdirs:447 if not netdir:448 continue449 cmd_result = self.node.execute(f"ls {netdir}")450 cmd_result.assert_exit_code(message="Could not find the network device.")451 # add only the network devices with netvsc driver452 driver = self.get_device_driver(cmd_result.stdout)453 if "hv_netvsc" in driver:454 self._device_set.add(cmd_result.stdout)455 if not self._device_set:456 raise LisaException("Did not find any synthetic network interface.")457 return self._device_set458 def get_device_channels_info(459 self, interface: str, force_run: bool = False460 ) -> DeviceChannel:461 device = self._get_or_create_device_setting(interface)462 if not force_run and device.device_channel:463 return device.device_channel464 result = self.run(f"-l {interface}", force_run=force_run)465 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):466 raise UnsupportedOperationException(467 "ethtool -l {interface} operation not supported."468 )469 result.assert_exit_code(470 message=f"Couldn't get device {interface} channels info."471 )472 device_channel_info = DeviceChannel(interface, result.stdout)473 # Find the vCPU count to accurately get max channels for the device.474 lscpu = self.node.tools[Lscpu]475 vcpu_count = lscpu.get_core_count(force_run=True)476 if vcpu_count < device_channel_info.max_channels:477 device_channel_info.max_channels = vcpu_count478 device.device_channel = device_channel_info479 return device_channel_info480 def change_device_channels_info(481 self,482 interface: str,483 channel_count: int,484 ) -> DeviceChannel:485 change_result = self.run(486 f"-L {interface} combined {channel_count}", sudo=True, force_run=True487 )488 change_result.assert_exit_code(489 message=f" Couldn't change device {interface} channels count."490 )491 return self.get_device_channels_info(interface, force_run=True)492 def get_device_enabled_features(493 self, interface: str, force_run: bool = False494 ) -> DeviceFeatures:495 device = self._get_or_create_device_setting(interface)496 if not force_run and device.device_features:497 return device.device_features498 result = self.run(f"-k {interface}", force_run=force_run)499 result.assert_exit_code()500 device.device_features = DeviceFeatures(interface, result.stdout)501 return device.device_features502 def get_device_gro_lro_settings(503 self, interface: str, force_run: bool = False504 ) -> DeviceGroLroSettings:505 device = self._get_or_create_device_setting(interface)506 if not force_run and device.device_gro_lro_settings:507 return device.device_gro_lro_settings508 result = self.run(f"-k {interface}", force_run=force_run)509 result.assert_exit_code()510 device.device_gro_lro_settings = DeviceGroLroSettings(interface, result.stdout)511 return device.device_gro_lro_settings512 def change_device_gro_lro_settings(513 self, interface: str, gro_setting: bool, lro_setting: bool514 ) -> DeviceGroLroSettings:515 gro = "on" if gro_setting else "off"516 lro = "on" if lro_setting else "off"517 change_result = self.run(518 f"-K {interface} gro {gro} lro {lro}",519 sudo=True,520 force_run=True,521 )522 change_result.assert_exit_code(523 message=f" Couldn't change device {interface} GRO LRO settings."524 )525 return self.get_device_gro_lro_settings(interface, force_run=True)526 def get_device_link_settings(self, interface: str) -> DeviceLinkSettings:527 device = self._get_or_create_device_setting(interface)528 if device.device_link_settings:529 return device.device_link_settings530 result = self.run(interface)531 result.assert_exit_code()532 link_settings = DeviceLinkSettings(interface, result.stdout)533 device.device_link_settings = link_settings534 # Caching the message level settings if captured in DeviceLinkSettings.535 # Not returning this info from this method. Only caching.536 if link_settings.msg_level_number and link_settings.msg_level_name:537 msg_level_settings = DeviceMessageLevel(538 interface,539 msg_level_number=link_settings.msg_level_number,540 msg_level_name=link_settings.msg_level_name,541 )542 device.device_msg_level = msg_level_settings543 return link_settings544 def get_device_msg_level(545 self, interface: str, force_run: bool = False546 ) -> DeviceMessageLevel:547 device = self._get_or_create_device_setting(interface)548 if not force_run and device.device_msg_level:549 return device.device_msg_level550 result = self.run(interface, force_run=force_run)551 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):552 raise UnsupportedOperationException(553 f"ethtool {interface} operation not supported."554 )555 result.assert_exit_code(556 message=f"Couldn't get device {interface} message level information"557 )558 msg_level_settings = DeviceMessageLevel(interface, result.stdout)559 device.device_msg_level = msg_level_settings560 # caching the link settings if captured in DeviceMessageLevel.561 # will not this info from this method. Only caching.562 if msg_level_settings.link_settings:563 link_settings = DeviceLinkSettings(564 interface, link_settings=msg_level_settings.link_settings565 )566 device.device_link_settings = link_settings567 return msg_level_settings568 def set_unset_device_message_flag_by_name(569 self, interface: str, msg_flag: List[str], set: bool570 ) -> DeviceMessageLevel:571 if set:572 result = self.run(573 f"-s {interface} msglvl {' on '.join(flag for flag in msg_flag)} on",574 sudo=True,575 force_run=True,576 )577 result.assert_exit_code(578 message=f" Couldn't set device {interface} message flag/s {msg_flag}."579 )580 else:581 result = self.run(582 f"-s {interface} msglvl {' off '.join(flag for flag in msg_flag)} off",583 sudo=True,584 force_run=True,585 )586 result.assert_exit_code(587 message=f" Couldn't unset device {interface} message flag/s {msg_flag}."588 )589 return self.get_device_msg_level(interface, force_run=True)590 def set_device_message_flag_by_num(591 self, interface: str, msg_flag: str592 ) -> DeviceMessageLevel:593 result = self.run(594 f"-s {interface} msglvl {msg_flag}",595 sudo=True,596 force_run=True,597 )598 result.assert_exit_code(599 message=f" Couldn't set device {interface} message flag {msg_flag}."600 )601 return self.get_device_msg_level(interface, force_run=True)602 def get_device_ring_buffer_settings(603 self, interface: str, force_run: bool = False604 ) -> DeviceRingBufferSettings:605 device = self._get_or_create_device_setting(interface)606 if not force_run and device.device_ringbuffer_settings:607 return device.device_ringbuffer_settings608 result = self.run(f"-g {interface}", force_run=force_run)609 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):610 raise UnsupportedOperationException(611 f"ethtool -g {interface} operation not supported."612 )613 result.assert_exit_code(614 message=f"Couldn't get device {interface} ring buffer settings."615 )616 device.device_ringbuffer_settings = DeviceRingBufferSettings(617 interface, result.stdout618 )619 return device.device_ringbuffer_settings620 def change_device_ring_buffer_settings(621 self, interface: str, rx: int, tx: int622 ) -> DeviceRingBufferSettings:623 change_result = self.run(624 f"-G {interface} rx {rx} tx {tx}", sudo=True, force_run=True625 )626 change_result.assert_exit_code(627 message=f" Couldn't change device {interface} ring buffer settings."628 )629 return self.get_device_ring_buffer_settings(interface, force_run=True)630 def get_device_rss_hash_key(631 self, interface: str, force_run: bool = False632 ) -> DeviceRssHashKey:633 device = self._get_or_create_device_setting(interface)634 if not force_run and device.device_rss_hash_key:635 return device.device_rss_hash_key636 result = self.run(f"-x {interface}", force_run=force_run)637 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):638 raise UnsupportedOperationException(639 f"ethtool -x {interface} operation not supported."640 )641 result.assert_exit_code(642 message=f"Couldn't get device {interface} ring buffer settings."643 )644 device.device_rss_hash_key = DeviceRssHashKey(interface, result.stdout)645 return device.device_rss_hash_key646 def change_device_rss_hash_key(647 self, interface: str, hash_key: str648 ) -> DeviceRssHashKey:649 result = self.run(f"-X {interface} hkey {hash_key}", sudo=True, force_run=True)650 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):651 raise UnsupportedOperationException(652 f"Changing RSS hash key with 'ethtool -X {interface}' not supported."653 )654 result.assert_exit_code(655 message=f" Couldn't change device {interface} hash key."656 )657 return self.get_device_rss_hash_key(interface, force_run=True)658 def get_device_rx_hash_level(659 self, interface: str, protocol: str, force_run: bool = False660 ) -> DeviceRxHashLevel:661 device = self._get_or_create_device_setting(interface)662 if (663 not force_run664 and device.device_rx_hash_level665 and (protocol in device.device_rx_hash_level.protocol_hash_map.keys())666 ):667 return device.device_rx_hash_level668 result = self.run(669 f"-n {interface} rx-flow-hash {protocol}", force_run=force_run670 )671 if "Operation not supported" in result.stdout:672 raise UnsupportedOperationException(673 f"ethtool -n {interface} operation not supported."674 )675 result.assert_exit_code(676 message=f"Couldn't get device {interface} RX flow hash level for"677 f" protocol {protocol}."678 )679 if device.device_rx_hash_level:680 device.device_rx_hash_level._parse_rx_hash_level(681 interface, protocol, result.stdout682 )683 device_rx_hash_level = device.device_rx_hash_level684 else:685 device_rx_hash_level = DeviceRxHashLevel(interface, protocol, result.stdout)686 device.device_rx_hash_level = device_rx_hash_level687 return device_rx_hash_level688 def change_device_rx_hash_level(689 self, interface: str, protocol: str, enable: bool690 ) -> DeviceRxHashLevel:691 param = "sd"692 if enable:693 param = "sdfn"694 result = self.run(...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!