Best Python code snippet using lisa_python
platform_.py
Source:platform_.py
...1574 product=plan_product,1575 publisher=plan_publisher,1576 )1577 return plan1578 def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:1579 # some vm size cannot be queried from API, so use default capability to1580 # run with best guess on capability.1581 node_space = schema.NodeSpace(1582 node_count=1,1583 core_count=search_space.IntRange(min=1),1584 memory_mb=search_space.IntRange(min=0),1585 gpu_count=search_space.IntRange(min=0),1586 )1587 node_space.disk = features.AzureDiskOptionSettings()1588 node_space.disk.data_disk_count = search_space.IntRange(min=0)1589 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](1590 is_allow_set=True, items=[]1591 )1592 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)1593 node_space.disk.disk_type.add(schema.DiskType.Ephemeral)1594 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)1595 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)1596 node_space.network_interface = schema.NetworkInterfaceOptionSettings()1597 node_space.network_interface.data_path = search_space.SetSpace[1598 schema.NetworkDataPath1599 ](is_allow_set=True, items=[])1600 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)1601 node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)1602 node_space.network_interface.nic_count = search_space.IntRange(min=1)1603 # till now, the max nic number supported in Azure is 81604 node_space.network_interface.max_nic_count = 81605 azure_capability = AzureCapability(1606 location=location,1607 vm_size=vm_size,1608 capability=node_space,1609 resource_sku={},1610 )1611 node_space.name = f"{location}_{vm_size}"1612 node_space.features = search_space.SetSpace[schema.FeatureSettings](1613 is_allow_set=True1614 )1615 # all nodes support following features1616 all_features = self.supported_features()1617 node_space.features.update(1618 [schema.FeatureSettings.create(x.name()) for x in all_features]1619 )1620 _convert_to_azure_node_space(node_space)1621 return azure_capability1622 def _generate_min_capability(1623 self,1624 requirement: schema.NodeSpace,1625 azure_capability: AzureCapability,1626 location: str,1627 ) -> schema.NodeSpace:1628 min_cap: schema.NodeSpace = requirement.generate_min_capability(1629 azure_capability.capability1630 )1631 # Apply azure specified values. They will pass into arm template1632 azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)1633 if azure_node_runbook.location:1634 assert location in azure_node_runbook.location, (1635 f"predefined location [{azure_node_runbook.location}] "1636 f"must be same as "1637 f"cap location [{location}]"1638 )1639 # the location may not be set1640 azure_node_runbook.location = location1641 azure_node_runbook.vm_size = azure_capability.vm_size1642 return min_cap1643 def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:1644 sc_name = result_dict["account_name"]1645 container_name = result_dict["container_name"]1646 rg = result_dict["resource_group_name"]1647 blob_name = result_dict["blob_name"]1648 source_container_client = get_or_create_storage_container(1649 credential=self.credential,1650 subscription_id=self.subscription_id,1651 account_name=sc_name,1652 container_name=container_name,1653 resource_group_name=rg,1654 )1655 source_blob = source_container_client.get_blob_client(blob_name)1656 sas_token = generate_sas_token(1657 credential=self.credential,1658 subscription_id=self.subscription_id,1659 account_name=sc_name,1660 resource_group_name=rg,1661 )1662 source_url = source_blob.url + "?" + sas_token1663 return source_url1664 @lru_cache(maxsize=10) # noqa: B0191665 def _get_deployable_vhd_path(1666 self, vhd_path: str, location: str, log: Logger1667 ) -> str:1668 """1669 The sas url is not able to create a vm directly, so this method check if1670 the vhd_path is a sas url. If so, copy it to a location in current1671 subscription, so it can be deployed.1672 """1673 matches = SAS_URL_PATTERN.match(vhd_path)1674 if not matches:1675 vhd_details = self._get_vhd_details(vhd_path)1676 vhd_location = vhd_details["location"]1677 if location == vhd_location:1678 return vhd_path1679 else:1680 vhd_path = self._generate_sas_token(vhd_details)1681 matches = SAS_URL_PATTERN.match(vhd_path)1682 assert matches, f"fail to generate sas url for {vhd_path}"1683 log.debug(1684 f"the vhd location {location} is not same with running case "1685 f"location {vhd_location}, generate a sas url for source vhd, "1686 f"it needs to be copied into location {location}."1687 )1688 else:1689 log.debug("found the vhd is a sas url, it may need to be copied.")1690 # get original vhd's hash key for comparing.1691 original_key: Optional[bytearray] = None1692 original_vhd_path = vhd_path1693 original_blob_client = BlobClient.from_blob_url(original_vhd_path)1694 properties = original_blob_client.get_blob_properties()1695 if properties.content_settings:1696 original_key = properties.content_settings.get(1697 "content_md5", None1698 ) # type: ignore1699 storage_name = get_storage_account_name(1700 subscription_id=self.subscription_id, location=location, type="t"1701 )1702 check_or_create_storage_account(1703 self.credential,1704 self.subscription_id,1705 storage_name,1706 self._azure_runbook.shared_resource_group_name,1707 location,1708 log,1709 )1710 container_client = get_or_create_storage_container(1711 credential=self.credential,1712 subscription_id=self.subscription_id,1713 account_name=storage_name,1714 container_name=SAS_COPIED_CONTAINER_NAME,1715 resource_group_name=self._azure_runbook.shared_resource_group_name,1716 )1717 normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)1718 year = matches["year"] if matches["year"] else "9999"1719 month = matches["month"] if matches["month"] else "01"1720 day = matches["day"] if matches["day"] else "01"1721 # use the expire date to generate the path. It's easy to identify when1722 # the cache can be removed.1723 vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"1724 full_vhd_path = f"{container_client.url}/{vhd_path}"1725 # lock here to prevent a vhd is copied in multi-thread1726 global _global_sas_vhd_copy_lock1727 cached_key: Optional[bytearray] = None1728 with _global_sas_vhd_copy_lock:1729 blobs = container_client.list_blobs(name_starts_with=vhd_path)1730 for blob in blobs:1731 if blob:1732 # check if hash key matched with original key.1733 if blob.content_settings:1734 cached_key = blob.content_settings.get("content_md5", None)1735 if original_key == cached_key:1736 # if it exists, return the link, not to copy again.1737 log.debug("the sas url is copied already, use it directly.")1738 return full_vhd_path1739 else:1740 log.debug("found cached vhd, but the hash key mismatched.")1741 blob_client = container_client.get_blob_client(vhd_path)1742 blob_client.start_copy_from_url(1743 original_vhd_path, metadata=None, incremental_copy=False1744 )1745 wait_copy_blob(blob_client, vhd_path, log)1746 return full_vhd_path1747 def _get_vhd_details(self, vhd_path: str) -> Any:1748 matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)1749 assert matched, f"fail to get matched info from {vhd_path}"1750 sc_name = matched.group("sc")1751 container_name = matched.group("container")1752 blob_name = matched.group("blob")1753 storage_client = get_storage_client(self.credential, self.subscription_id)1754 sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]1755 assert sc[1756 01757 ], f"fail to get storage account {sc_name} from {self.subscription_id}"1758 rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)1759 return {1760 "location": sc[0].location,1761 "resource_group_name": rg,1762 "account_name": sc_name,1763 "container_name": container_name,1764 "blob_name": blob_name,1765 }1766 def _generate_data_disks(1767 self,1768 node: Node,1769 azure_node_runbook: AzureNodeArmParameter,1770 ) -> List[DataDiskSchema]:1771 data_disks: List[DataDiskSchema] = []1772 assert node.capability.disk1773 if azure_node_runbook.marketplace:1774 marketplace = self._get_image_info(1775 azure_node_runbook.location, azure_node_runbook.marketplace1776 )1777 # some images has data disks by default1778 # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.271779 # we have to inject below part when dataDisks section added in1780 # arm template,1781 # otherwise will see below exception:1782 # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun1783 # does not have required value(s) for image specified in1784 # storage profile.1785 for default_data_disk in marketplace.data_disk_images:1786 data_disks.append(1787 DataDiskSchema(1788 node.capability.disk.data_disk_caching_type,1789 default_data_disk.additional_properties["sizeInGb"],1790 azure_node_runbook.disk_type,1791 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,1792 )1793 )1794 assert isinstance(1795 node.capability.disk.data_disk_count, int1796 ), f"actual: {type(node.capability.disk.data_disk_count)}"1797 for _ in range(node.capability.disk.data_disk_count):1798 assert isinstance(node.capability.disk.data_disk_size, int)1799 data_disks.append(1800 DataDiskSchema(1801 node.capability.disk.data_disk_caching_type,1802 node.capability.disk.data_disk_size,1803 azure_node_runbook.disk_type,1804 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,1805 )1806 )1807 return data_disks1808 @lru_cache(maxsize=10) # noqa: B0191809 def _get_image_info(1810 self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]1811 ) -> VirtualMachineImage:1812 compute_client = get_compute_client(self)1813 assert isinstance(marketplace, AzureVmMarketplaceSchema)1814 with global_credential_access_lock:1815 image_info = compute_client.virtual_machine_images.get(1816 location=location,1817 publisher_name=marketplace.publisher,1818 offer=marketplace.offer,1819 skus=marketplace.sku,1820 version=marketplace.version,1821 )1822 return image_info1823 def _get_location_key(self, location: str) -> str:1824 return f"{self.subscription_id}_{location}"1825 def _enable_ssh_on_windows(self, node: Node) -> None:1826 runbook = node.capability.get_extended_runbook(AzureNodeSchema)1827 if runbook.is_linux:1828 return1829 context = get_node_context(node)1830 remote_node = cast(RemoteNode, node)1831 log = node.log1832 log.debug(1833 f"checking if SSH port {remote_node.public_port} is reachable "1834 f"on {remote_node.name}..."1835 )1836 connected, _ = wait_tcp_port_ready(1837 address=remote_node.public_address,1838 port=remote_node.public_port,1839 log=log,1840 timeout=3,1841 )1842 if connected:1843 log.debug("SSH port is reachable.")1844 return1845 log.debug("SSH port is not open, enabling ssh on Windows ...")1846 # The SSH port is not opened, try to enable it.1847 public_key_data = get_public_key_data(self.runbook.admin_private_key_file)1848 with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:1849 script = f.read()1850 parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)1851 command = RunCommandInput(1852 command_id="RunPowerShellScript",1853 script=[script],1854 parameters=[parameters],1855 )1856 compute_client = get_compute_client(self)1857 operation = compute_client.virtual_machines.begin_run_command(1858 resource_group_name=context.resource_group_name,1859 vm_name=context.vm_name,1860 parameters=command,1861 )1862 result = wait_operation(operation=operation, failure_identity="enable ssh")1863 log.debug("SSH script result:")1864 log.dump_json(logging.DEBUG, result)1865 def _get_vhd_os_disk_size(self, blob_url: str) -> int:1866 result_dict = self._get_vhd_details(blob_url)1867 container_client = get_or_create_storage_container(1868 credential=self.credential,1869 subscription_id=self.subscription_id,1870 account_name=result_dict["account_name"],1871 container_name=result_dict["container_name"],1872 resource_group_name=result_dict["resource_group_name"],1873 )1874 vhd_blob = container_client.get_blob_client(result_dict["blob_name"])1875 properties = vhd_blob.get_blob_properties()1876 assert properties.size, f"fail to get blob size of {blob_url}"1877 # Azure requires only megabyte alignment of vhds, round size up1878 # for cases where the size is megabyte aligned1879 return math.ceil(properties.size / 1024 / 1024 / 1024)1880 def _get_sig_info(1881 self, shared_image: SharedImageGallerySchema1882 ) -> GalleryImageVersion:1883 compute_client = get_compute_client(self)1884 return compute_client.gallery_image_versions.get(1885 resource_group_name=shared_image.resource_group_name,1886 gallery_name=shared_image.image_gallery,1887 gallery_image_name=shared_image.image_definition,1888 gallery_image_version_name=shared_image.image_version,1889 expand="ReplicationStatus",1890 )1891 def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:1892 found_image = self._get_sig_info(shared_image)1893 assert found_image.storage_profile.os_disk_image.size_in_gb1894 return int(found_image.storage_profile.os_disk_image.size_in_gb)1895 def _get_normalized_vm_sizes(1896 self, name: str, location: str, log: Logger1897 ) -> List[str]:1898 split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]1899 for index, vm_size in enumerate(split_vm_sizes):1900 split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)1901 return [x for x in split_vm_sizes if x]1902 def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:1903 # find predefined vm size on all available's.1904 location_info: AzureLocation = self.get_location_info(location, log)1905 matched_score: float = 01906 matched_name: str = ""1907 matcher = SequenceMatcher(None, name.lower(), "")1908 for vm_size in location_info.capabilities:1909 matcher.set_seq2(vm_size.lower())1910 if name.lower() in vm_size.lower() and matched_score < matcher.ratio():1911 matched_name = vm_size1912 matched_score = matcher.ratio()1913 return matched_name1914 def _get_capabilities(1915 self, vm_sizes: List[str], location: str, use_max_capability: bool, log: Logger1916 ) -> List[AzureCapability]:1917 candidate_caps: List[AzureCapability] = []1918 caps = self.get_location_info(location, log).capabilities1919 for vm_size in vm_sizes:1920 # force to use max capability to run test cases as much as possible,1921 # or force to support non-exists vm size.1922 if use_max_capability:1923 candidate_caps.append(self._generate_max_capability(vm_size, location))1924 continue1925 if vm_size in caps:1926 candidate_caps.append(caps[vm_size])1927 return candidate_caps1928 def _get_matched_capability(1929 self,1930 requirement: schema.NodeSpace,1931 candidate_capabilities: List[AzureCapability],1932 ) -> Optional[schema.NodeSpace]:1933 matched_cap: Optional[schema.NodeSpace] = None1934 # filter allowed vm sizes1935 for azure_cap in candidate_capabilities:1936 check_result = requirement.check(azure_cap.capability)1937 if check_result.result:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!