Best Python code snippet using lisa_python
features.py
Source:features.py
...517 return schema.NetworkInterfaceOptionSettings518 def _initialize(self, *args: Any, **kwargs: Any) -> None:519 super()._initialize(*args, **kwargs)520 self._initialize_information(self._node)521 all_nics = self._get_all_nics()522 # store extra synthetic and sriov nics count523 # in order to restore nics status after testing which needs change nics524 # extra synthetic nics count before testing525 self.origin_extra_synthetic_nics_count = len(526 [527 x528 for x in all_nics529 if x.primary is False and x.enable_accelerated_networking is False530 ]531 )532 # extra sriov nics count before testing533 self.origin_extra_sriov_nics_count = (534 len(all_nics) - self.origin_extra_synthetic_nics_count - 1535 )536 def switch_sriov(537 self, enable: bool, wait: bool = True, reset_connections: bool = True538 ) -> None:539 azure_platform: AzurePlatform = self._platform # type: ignore540 network_client = get_network_client(azure_platform)541 vm = get_vm(azure_platform, self._node)542 for nic in vm.network_profile.network_interfaces:543 # get nic name from nic id544 # /subscriptions/[subid]/resourceGroups/[rgname]/providers545 # /Microsoft.Network/networkInterfaces/[nicname]546 nic_name = nic.id.split("/")[-1]547 updated_nic = network_client.network_interfaces.get(548 self._resource_group_name, nic_name549 )550 if updated_nic.enable_accelerated_networking == enable:551 self._log.debug(552 f"network interface {nic_name}'s accelerated networking default "553 f"status [{updated_nic.enable_accelerated_networking}] is "554 f"consistent with set status [{enable}], no need to update."555 )556 else:557 self._log.debug(558 f"network interface {nic_name}'s accelerated networking default "559 f"status [{updated_nic.enable_accelerated_networking}], "560 f"now set its status into [{enable}]."561 )562 updated_nic.enable_accelerated_networking = enable563 network_client.network_interfaces.begin_create_or_update(564 self._resource_group_name, updated_nic.name, updated_nic565 )566 updated_nic = network_client.network_interfaces.get(567 self._resource_group_name, nic_name568 )569 assert_that(updated_nic.enable_accelerated_networking).described_as(570 f"fail to set network interface {nic_name}'s accelerated "571 f"networking into status [{enable}]"572 ).is_equal_to(enable)573 # wait settings effective574 if wait:575 self._check_sriov_enabled(enable, reset_connections)576 def is_enabled_sriov(self) -> bool:577 azure_platform: AzurePlatform = self._platform # type: ignore578 network_client = get_network_client(azure_platform)579 sriov_enabled: bool = False580 vm = get_vm(azure_platform, self._node)581 nic = self._get_primary(vm.network_profile.network_interfaces)582 nic_name = nic.id.split("/")[-1]583 primary_nic = network_client.network_interfaces.get(584 self._resource_group_name, nic_name585 )586 sriov_enabled = primary_nic.enable_accelerated_networking587 return sriov_enabled588 def attach_nics(589 self, extra_nic_count: int, enable_accelerated_networking: bool = True590 ) -> None:591 if 0 == extra_nic_count:592 return593 azure_platform: AzurePlatform = self._platform # type: ignore594 network_client = get_network_client(azure_platform)595 compute_client = get_compute_client(azure_platform)596 vm = get_vm(azure_platform, self._node)597 current_nic_count = len(vm.network_profile.network_interfaces)598 nic_count_after_add_extra = extra_nic_count + current_nic_count599 assert (600 self._node.capability.network_interface601 and self._node.capability.network_interface.max_nic_count602 )603 assert isinstance(604 self._node.capability.network_interface.max_nic_count, int605 ), f"actual: {type(self._node.capability.network_interface.max_nic_count)}"606 node_capability_nic_count = (607 self._node.capability.network_interface.max_nic_count608 )609 if nic_count_after_add_extra > node_capability_nic_count:610 raise LisaException(611 f"nic count after add extra nics is {nic_count_after_add_extra},"612 f" it exceeds the vm size's capability {node_capability_nic_count}."613 )614 nic = self._get_primary(vm.network_profile.network_interfaces)615 nic_name = nic.id.split("/")[-1]616 primary_nic = network_client.network_interfaces.get(617 self._resource_group_name, nic_name618 )619 startstop = self._node.features[StartStop]620 startstop.stop()621 network_interfaces_section = []622 index = 0623 while index < current_nic_count + extra_nic_count - 1:624 extra_nic_name = f"{self._node.name}-extra-{index}"625 self._log.debug(f"start to create the nic {extra_nic_name}.")626 params = {627 "location": vm.location,628 "enable_accelerated_networking": enable_accelerated_networking,629 "ip_configurations": [630 {631 "name": extra_nic_name,632 "subnet": {"id": primary_nic.ip_configurations[0].subnet.id},633 "primary": False,634 }635 ],636 }637 network_client.network_interfaces.begin_create_or_update(638 resource_group_name=self._resource_group_name,639 network_interface_name=extra_nic_name,640 parameters=params,641 )642 self._log.debug(f"create the nic {extra_nic_name} successfully.")643 extra_nic = network_client.network_interfaces.get(644 network_interface_name=extra_nic_name,645 resource_group_name=self._resource_group_name,646 )647 network_interfaces_section.append({"id": extra_nic.id, "primary": False})648 index += 1649 network_interfaces_section.append({"id": primary_nic.id, "primary": True})650 self._log.debug(f"start to attach the nics into VM {self._node.name}.")651 compute_client.virtual_machines.begin_update(652 resource_group_name=self._resource_group_name,653 vm_name=self._node.name,654 parameters={655 "network_profile": {"network_interfaces": network_interfaces_section},656 },657 )658 self._log.debug(f"attach the nics into VM {self._node.name} successfully.")659 startstop.start()660 def get_nic_count(self, is_sriov_enabled: bool = True) -> int:661 return len(662 [663 x664 for x in self._get_all_nics()665 if x.enable_accelerated_networking == is_sriov_enabled666 ]667 )668 def remove_extra_nics(self) -> None:669 azure_platform: AzurePlatform = self._platform # type: ignore670 network_client = get_network_client(azure_platform)671 compute_client = get_compute_client(azure_platform)672 vm = get_vm(azure_platform, self._node)673 if len(vm.network_profile.network_interfaces) == 1:674 self._log.debug("No existed extra nics can be disassociated.")675 return676 nic = self._get_primary(vm.network_profile.network_interfaces)677 nic_name = nic.id.split("/")[-1]678 primary_nic = network_client.network_interfaces.get(679 self._resource_group_name, nic_name680 )681 network_interfaces_section = []682 network_interfaces_section.append({"id": primary_nic.id, "primary": True})683 startstop = self._node.features[StartStop]684 startstop.stop()685 compute_client.virtual_machines.begin_update(686 resource_group_name=self._resource_group_name,687 vm_name=self._node.name,688 parameters={689 "network_profile": {"network_interfaces": network_interfaces_section},690 },691 )692 self._log.debug(693 f"Only associated nic {primary_nic.id} into VM {self._node.name}."694 )695 startstop.start()696 def reload_module(self) -> None:697 modprobe_tool = self._node.tools[Modprobe]698 modprobe_tool.reload(["hv_netvsc"])699 @retry(tries=60, delay=10)700 def _check_sriov_enabled(701 self, enabled: bool, reset_connections: bool = True702 ) -> None:703 if reset_connections:704 self._node.close()705 self._node.nics.reload()706 default_nic = self._node.nics.get_nic_by_index(0)707 if enabled and not default_nic.lower:708 raise LisaException("SRIOV is enabled, but VF is not found.")709 elif not enabled and default_nic.lower:710 raise LisaException("SRIOV is disabled, but VF exists.")711 else:712 # the enabled flag is consistent with VF presents.713 ...714 def _get_primary(715 self, nics: List[NetworkInterfaceReference]716 ) -> NetworkInterfaceReference:717 found_primary = False718 for nic in nics:719 if nic.primary:720 found_primary = True721 break722 if not found_primary:723 raise LisaException(f"fail to find primary nic for vm {self._node.name}")724 return nic725 def _get_all_nics(self) -> Any:726 azure_platform: AzurePlatform = self._platform # type: ignore727 network_client = get_network_client(azure_platform)728 vm = get_vm(azure_platform, self._node)729 all_nics = []730 for nic in vm.network_profile.network_interfaces:731 # get nic name from nic id732 # /subscriptions/[subid]/resourceGroups/[rgname]/providers733 # /Microsoft.Network/networkInterfaces/[nicname]734 nic_name = nic.id.split("/")[-1]735 all_nics.append(736 network_client.network_interfaces.get(737 self._resource_group_name, nic_name738 )739 )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!