Best Python code snippet using lisa_python
platform_.py
Source:platform_.py
...390 else:391 normalized_name = constants.NORMALIZE_PATTERN.sub("-", constants.RUN_NAME)392 # Take last chars to make sure the length is to exceed max 90 chars393 # allowed in resource group name.394 resource_group_name = truncate_keep_prefix(395 f"{normalized_name}-e{environment.id}", 80396 )397 environment_context.resource_group_is_specified = True398 environment_context.resource_group_name = resource_group_name399 if self._azure_runbook.dry_run:400 log.info(f"dry_run: {self._azure_runbook.dry_run}")401 else:402 try:403 if self._azure_runbook.deploy:404 log.info(405 f"creating or updating resource group: [{resource_group_name}]"406 )407 check_or_create_resource_group(408 self.credential,409 subscription_id=self.subscription_id,410 resource_group_name=resource_group_name,411 location=RESOURCE_GROUP_LOCATION,412 log=log,413 )414 else:415 log.info(f"reusing resource group: [{resource_group_name}]")416 location, deployment_parameters = self._create_deployment_parameters(417 resource_group_name, environment, log418 )419 if self._azure_runbook.deploy:420 self._validate_template(deployment_parameters, log)421 self._deploy(location, deployment_parameters, log)422 # Even skipped deploy, try best to initialize nodes423 self.initialize_environment(environment, log)424 except Exception as identifier:425 self._delete_environment(environment, log)426 raise identifier427 def _delete_environment(self, environment: Environment, log: Logger) -> None:428 environment_context = get_environment_context(environment=environment)429 resource_group_name = environment_context.resource_group_name430 # the resource group name is empty when it is not deployed for some reasons,431 # like capability doesn't meet case requirement.432 if not resource_group_name:433 return434 assert self._azure_runbook435 if not environment_context.resource_group_is_specified:436 log.info(437 f"skipped to delete resource group: {resource_group_name}, "438 f"as it's specified in runbook."439 )440 elif 
self._azure_runbook.dry_run:441 log.info(442 f"skipped to delete resource group: {resource_group_name}, "443 f"as it's a dry run."444 )445 else:446 assert self._rm_client447 az_rg_exists = self._rm_client.resource_groups.check_existence(448 resource_group_name449 )450 if not az_rg_exists:451 return452 log.info(453 f"deleting resource group: {resource_group_name}, "454 f"wait: {self._azure_runbook.wait_delete}"455 )456 try:457 self._delete_boot_diagnostic_container(resource_group_name, log)458 except Exception as identifier:459 log.debug(460 f"exception on deleting boot diagnostic container: {identifier}"461 )462 delete_operation: Any = None463 try:464 delete_operation = self._rm_client.resource_groups.begin_delete(465 resource_group_name466 )467 except Exception as identifier:468 log.debug(f"exception on delete resource group: {identifier}")469 if delete_operation and self._azure_runbook.wait_delete:470 wait_operation(471 delete_operation, failure_identity="delete resource group"472 )473 else:474 log.debug("not wait deleting")475 def _delete_boot_diagnostic_container(476 self, resource_group_name: str, log: Logger477 ) -> None:478 compute_client = get_compute_client(self)479 vms = compute_client.virtual_machines.list(resource_group_name)480 for vm in vms:481 diagnostic_data = (482 compute_client.virtual_machines.retrieve_boot_diagnostics_data(483 resource_group_name=resource_group_name, vm_name=vm.name484 )485 )486 if not diagnostic_data:487 continue488 # A sample url,489 # https://storageaccountname.blob.core.windows.net:443/490 # bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/491 # node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log492 # ?sv=2018-03-28&sr=b&sig=mJEsvk9WunbKHfBs1lo1jcIBe4owq1brP8Kw3qXTQJA%3d&493 # se=2021-09-14T08%3a55%3a38Z&sp=r494 blob_uri = diagnostic_data.console_screenshot_blob_uri495 if blob_uri:496 matched = self._diagnostic_storage_container_pattern.match(blob_uri)497 assert matched498 # => storageaccountname499 
storage_name = matched.group("storage_name")500 # => bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70501 container_name = matched.group("container_name")502 container_client = get_or_create_storage_container(503 credential=self.credential,504 subscription_id=self.subscription_id,505 account_name=storage_name,506 container_name=container_name,507 resource_group_name=self._azure_runbook.shared_resource_group_name,508 )509 log.debug(510 f"deleting boot diagnostic container: {container_name}"511 f" under storage account {storage_name} of vm {vm.name}"512 )513 try:514 container_client.delete_container()515 except Exception as identifier:516 log.debug(517 f"exception on deleting boot diagnostic container:"518 f" {identifier}"519 )520 def _get_node_information(self, node: Node) -> Dict[str, str]:521 information: Dict[str, Any] = {}522 node.log.debug("detecting lis version...")523 modinfo = node.tools[Modinfo]524 information["lis_version"] = modinfo.get_version("hv_vmbus")525 node.log.debug("detecting vm generation...")526 information[KEY_VM_GENERATION] = node.tools[VmGeneration].get_generation()527 node.log.debug(f"vm generation: {information[KEY_VM_GENERATION]}")528 return information529 def _get_kernel_version(self, node: Node) -> str:530 result: str = ""531 if not result and hasattr(node, ATTRIBUTE_FEATURES):532 # try to get kernel version in Azure. use it, when uname doesn't work533 node.log.debug("detecting kernel version from serial log...")534 serial_console = node.features[features.SerialConsole]535 result = serial_console.get_matched_str(KERNEL_VERSION_PATTERN)536 return result537 def _get_host_version(self, node: Node) -> str:538 result: str = ""539 try:540 if node.is_connected and node.is_posix:541 node.log.debug("detecting host version from dmesg...")542 dmesg = node.tools[Dmesg]543 result = get_matched_str(544 dmesg.get_output(), HOST_VERSION_PATTERN, first_match=False545 )546 except Exception as identifier:547 # it happens on some error vms. 
Those error should be caught earlier in548 # test cases not here. So ignore any error here to collect information only.549 node.log.debug(f"error on run dmesg: {identifier}")550 # if not get, try again from serial console log.551 # skip if node is not initialized.552 if not result and hasattr(node, ATTRIBUTE_FEATURES):553 node.log.debug("detecting host version from serial log...")554 serial_console = node.features[features.SerialConsole]555 result = serial_console.get_matched_str(HOST_VERSION_PATTERN)556 return result557 def _get_wala_version(self, node: Node) -> str:558 result = ""559 try:560 if node.is_connected and node.is_posix:561 node.log.debug("detecting wala version from waagent...")562 waagent = node.tools[Waagent]563 result = waagent.get_version()564 except Exception as identifier:565 # it happens on some error vms. Those error should be caught earlier in566 # test cases not here. So ignore any error here to collect information only.567 node.log.debug(f"error on run waagent: {identifier}")568 if not result and hasattr(node, ATTRIBUTE_FEATURES):569 node.log.debug("detecting wala agent version from serial log...")570 serial_console = node.features[features.SerialConsole]571 result = serial_console.get_matched_str(WALA_VERSION_PATTERN)572 return result573 def _get_wala_distro_version(self, node: Node) -> str:574 result = "Unknown"575 try:576 if node.is_connected and node.is_posix:577 waagent = node.tools[Waagent]578 result = waagent.get_distro_version()579 except Exception as identifier:580 # it happens on some error vms. Those error should be caught earlier in581 # test cases not here. 
So ignore any error here to collect information only.582 node.log.debug(f"error on get waagent distro version: {identifier}")583 return result584 def _get_platform_information(self, environment: Environment) -> Dict[str, str]:585 result: Dict[str, str] = {}586 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(587 AzurePlatformSchema588 )589 result[AZURE_RG_NAME_KEY] = get_environment_context(590 environment591 ).resource_group_name592 if azure_runbook.availability_set_properties:593 for (594 property_name,595 property_value,596 ) in azure_runbook.availability_set_properties.items():597 if property_name in [598 "platformFaultDomainCount",599 "platformUpdateDomainCount",600 ]:601 continue602 if isinstance(property_value, dict):603 for key, value in property_value.items():604 if value:605 result[key] = value606 if azure_runbook.availability_set_tags:607 for key, value in azure_runbook.availability_set_tags.items():608 if value:609 result[key] = value610 if azure_runbook.vm_tags:611 for key, value in azure_runbook.vm_tags.items():612 if value:613 result[key] = value614 return result615 def _get_environment_information(self, environment: Environment) -> Dict[str, str]:616 information: Dict[str, str] = {}617 node_runbook: Optional[AzureNodeSchema] = None618 if environment.nodes:619 node: Optional[Node] = environment.default_node620 else:621 node = None622 if node:623 node_runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)624 for key, method in self._environment_information_hooks.items():625 node.log.debug(f"detecting {key} ...")626 try:627 value = method(node)628 if value:629 information[key] = value630 except Exception as identifier:631 node.log.exception(f"error on get {key}.", exc_info=identifier)632 information.update(self._get_platform_information(environment))633 if node.is_connected and node.is_posix:634 information.update(self._get_node_information(node))635 elif environment.capability and environment.capability.nodes:636 
# get deployment information, if failed on preparing phase637 node_space = environment.capability.nodes[0]638 node_runbook = node_space.get_extended_runbook(639 AzureNodeSchema, type_name=AZURE640 )641 if node_runbook:642 information["location"] = node_runbook.location643 information["vmsize"] = node_runbook.vm_size644 information["image"] = node_runbook.get_image_name()645 return information646 def _initialize(self, *args: Any, **kwargs: Any) -> None:647 # set needed environment variables for authentication648 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(649 AzurePlatformSchema650 )651 assert azure_runbook, "platform runbook cannot be empty"652 self._azure_runbook = azure_runbook653 self.subscription_id = azure_runbook.subscription_id654 self._initialize_credential()655 check_or_create_resource_group(656 self.credential,657 self.subscription_id,658 azure_runbook.shared_resource_group_name,659 RESOURCE_GROUP_LOCATION,660 self._log,661 )662 self._rm_client = get_resource_management_client(663 self.credential, self.subscription_id664 )665 def _initialize_credential(self) -> None:666 azure_runbook = self._azure_runbook667 credential_key = (668 f"{azure_runbook.service_principal_tenant_id}_"669 f"{azure_runbook.service_principal_client_id}"670 )671 credential = self._credentials.get(credential_key, None)672 if not credential:673 # set azure log to warn level only674 logging.getLogger("azure").setLevel(azure_runbook.log_level)675 if azure_runbook.service_principal_tenant_id:676 os.environ[677 "AZURE_TENANT_ID"678 ] = azure_runbook.service_principal_tenant_id679 if azure_runbook.service_principal_client_id:680 os.environ[681 "AZURE_CLIENT_ID"682 ] = azure_runbook.service_principal_client_id683 if azure_runbook.service_principal_key:684 os.environ["AZURE_CLIENT_SECRET"] = azure_runbook.service_principal_key685 credential = DefaultAzureCredential()686 with SubscriptionClient(credential) as self._sub_client:687 # suppress warning message by search 
for different credential types688 azure_identity_logger = logging.getLogger("azure.identity")689 azure_identity_logger.setLevel(logging.ERROR)690 with global_credential_access_lock:691 subscription = self._sub_client.subscriptions.get(692 self.subscription_id693 )694 azure_identity_logger.setLevel(logging.WARN)695 if not subscription:696 raise LisaException(697 f"Cannot find subscription id: '{self.subscription_id}'. "698 f"Make sure it exists and current account can access it."699 )700 self._log.info(701 f"connected to subscription: "702 f"{subscription.id}, '{subscription.display_name}'"703 )704 self._credentials[credential_key] = credential705 self.credential = credential706 def _load_template(self) -> Any:707 if self._arm_template is None:708 template_file_path = Path(__file__).parent / "arm_template.json"709 with open(template_file_path, "r") as f:710 self._arm_template = json.load(f)711 return self._arm_template712 @retry(tries=10, delay=1, jitter=(0.5, 1))713 def _load_location_info_from_file(714 self, cached_file_name: Path, log: Logger715 ) -> Optional[AzureLocation]:716 loaded_obj: Optional[AzureLocation] = None717 if cached_file_name.exists():718 try:719 with open(cached_file_name, "r") as f:720 loaded_data: Dict[str, Any] = json.load(f)721 loaded_obj = schema.load_by_type(AzureLocation, loaded_data)722 except Exception as identifier:723 # if schema changed, There may be exception, remove cache and retry724 # Note: retry on this method depends on decorator725 log.debug(726 f"error on loading cache, delete cache and retry. 
{identifier}"727 )728 cached_file_name.unlink()729 raise identifier730 return loaded_obj731 def get_location_info(self, location: str, log: Logger) -> AzureLocation:732 cached_file_name = constants.CACHE_PATH.joinpath(733 f"azure_locations_{location}.json"734 )735 should_refresh: bool = True736 key = self._get_location_key(location)737 location_data = self._locations_data_cache.get(key, None)738 if not location_data:739 location_data = self._load_location_info_from_file(740 cached_file_name=cached_file_name, log=log741 )742 if location_data:743 delta = datetime.now() - location_data.updated_time744 # refresh cached locations every 1 day.745 if delta.days < 1:746 should_refresh = False747 else:748 log.debug(749 f"{key}: cache timeout: {location_data.updated_time},"750 f"sku count: {len(location_data.capabilities)}"751 )752 else:753 log.debug(f"{key}: no cache found")754 if should_refresh:755 compute_client = get_compute_client(self)756 log.debug(f"{key}: querying")757 all_skus: Dict[str, AzureCapability] = dict()758 paged_skus = compute_client.resource_skus.list(759 f"location eq '{location}'"760 ).by_page()761 for skus in paged_skus:762 for sku_obj in skus:763 try:764 if sku_obj.resource_type == "virtualMachines":765 if sku_obj.restrictions and any(766 restriction.type == "Location"767 for restriction in sku_obj.restrictions768 ):769 # restricted on this location770 continue771 resource_sku = sku_obj.as_dict()772 capability = self._resource_sku_to_capability(773 location, sku_obj774 )775 # estimate vm cost for priority776 assert isinstance(capability.core_count, int)777 assert isinstance(capability.gpu_count, int)778 azure_capability = AzureCapability(779 location=location,780 vm_size=sku_obj.name,781 capability=capability,782 resource_sku=resource_sku,783 )784 all_skus[azure_capability.vm_size] = azure_capability785 except Exception as identifier:786 log.error(f"unknown sku: {sku_obj}")787 raise identifier788 location_data = AzureLocation(location=location, 
capabilities=all_skus)789 log.debug(f"{location}: saving to disk")790 with open(cached_file_name, "w") as f:791 json.dump(location_data.to_dict(), f) # type: ignore792 log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")793 assert location_data794 self._locations_data_cache[key] = location_data795 return location_data796 def _create_deployment_parameters(797 self, resource_group_name: str, environment: Environment, log: Logger798 ) -> Tuple[str, Dict[str, Any]]:799 assert environment.runbook, "env data cannot be None"800 assert environment.runbook.nodes_requirement, "node requirement cannot be None"801 log.debug("creating deployment")802 # construct parameters803 arm_parameters = AzureArmParameter()804 copied_fields = [805 "availability_set_tags",806 "availability_set_properties",807 "vm_tags",808 ]809 set_filtered_fields(self._azure_runbook, arm_parameters, copied_fields)810 is_windows: bool = False811 arm_parameters.admin_username = self.runbook.admin_username812 if self.runbook.admin_private_key_file:813 arm_parameters.admin_key_data = get_public_key_data(814 self.runbook.admin_private_key_file815 )816 else:817 arm_parameters.admin_password = self.runbook.admin_password818 environment_context = get_environment_context(environment=environment)819 arm_parameters.vm_tags["RG"] = environment_context.resource_group_name820 # get local lisa environment821 arm_parameters.vm_tags["lisa_username"] = local().tools[Whoami].get_username()822 arm_parameters.vm_tags["lisa_hostname"] = local().tools[Hostname].get_hostname()823 nodes_parameters: List[AzureNodeArmParameter] = []824 features_settings: Dict[str, schema.FeatureSettings] = {}825 for node_space in environment.runbook.nodes_requirement:826 assert isinstance(827 node_space, schema.NodeSpace828 ), f"actual: {type(node_space)}"829 azure_node_runbook = node_space.get_extended_runbook(830 AzureNodeSchema, type_name=AZURE831 )832 # Subscription Id is used by Shared Gallery images located833 # in 
subscription different from where LISA is run834 azure_node_runbook.subscription_id = self.subscription_id835 # init node836 node = environment.create_node_from_requirement(837 node_space,838 )839 azure_node_runbook = self._create_node_runbook(840 len(nodes_parameters), node_space, log, resource_group_name841 )842 # save parsed runbook back, for example, the version of marketplace may be843 # parsed from latest to a specified version.844 node.capability.set_extended_runbook(azure_node_runbook)845 node_arm_parameters = self._create_node_arm_parameters(node.capability, log)846 nodes_parameters.append(node_arm_parameters)847 # Set data disk array848 arm_parameters.data_disks = self._generate_data_disks(849 node, node_arm_parameters850 )851 if not arm_parameters.location:852 # take first one's location853 arm_parameters.location = azure_node_runbook.location854 # save vm's information into node855 node_context = get_node_context(node)856 node_context.resource_group_name = environment_context.resource_group_name857 # vm's name, use to find it from azure858 node_context.vm_name = azure_node_runbook.name859 # ssh related information will be filled back once vm is created. If860 # it's Windows, fill in the password always. If it's Linux, the861 # private key has higher priority.862 node_context.username = arm_parameters.admin_username863 if azure_node_runbook.is_linux:864 node_context.password = arm_parameters.admin_password865 else:866 is_windows = True867 if not self.runbook.admin_password:868 # password is required, if it doesn't present, generate one.869 password = generate_random_chars()870 add_secret(password)871 self.runbook.admin_password = password872 node_context.password = self.runbook.admin_password873 node_context.private_key_file = self.runbook.admin_private_key_file874 # collect all features to handle special deployment logic. 
If one875 # node has this, it needs to run.876 if node.capability.features:877 for f in node.capability.features:878 if f.type not in features_settings:879 features_settings[f.type] = f880 log.info(f"vm setting: {azure_node_runbook}")881 if is_windows:882 # set password for windows any time.883 arm_parameters.admin_password = self.runbook.admin_password884 arm_parameters.nodes = nodes_parameters885 arm_parameters.storage_name = get_storage_account_name(886 self.subscription_id, arm_parameters.location887 )888 if (889 self._azure_runbook.availability_set_properties890 or self._azure_runbook.availability_set_tags891 ):892 arm_parameters.use_availability_sets = True893 # In Azure, each VM should have only one nic in one subnet. So calculate894 # the max nic count, and set to subnet count.895 arm_parameters.subnet_count = max(x.nic_count for x in arm_parameters.nodes)896 arm_parameters.shared_resource_group_name = (897 self._azure_runbook.shared_resource_group_name898 )899 # the arm template may be updated by the hooks, so make a copy to avoid900 # the original template is modified.901 template = deepcopy(self._load_template())902 plugin_manager.hook.azure_update_arm_template(903 template=template, environment=environment904 )905 # change deployment for each feature.906 for f in features_settings.values():907 feature_type = next(908 x for x in self.supported_features() if x.name() == f.type909 )910 feature_type.on_before_deployment(911 arm_parameters=arm_parameters,912 template=template,913 settings=f,914 environment=environment,915 log=log,916 )917 # composite deployment properties918 parameters = arm_parameters.to_dict() # type:ignore919 parameters = {k: {"value": v} for k, v in parameters.items()}920 log.debug(f"parameters: {parameters}")921 deployment_properties = DeploymentProperties(922 mode=DeploymentMode.incremental,923 template=template,924 parameters=parameters,925 )926 # dump arm_template and arm_parameters to file927 template_dump_path = 
environment.log_path / "arm_template.json"928 param_dump_path = environment.log_path / "arm_template_parameters.json"929 dump_file(template_dump_path, json.dumps(template, indent=4))930 dump_file(param_dump_path, json.dumps(parameters, indent=4))931 return (932 arm_parameters.location,933 {934 AZURE_RG_NAME_KEY: resource_group_name,935 "deployment_name": AZURE_DEPLOYMENT_NAME,936 "parameters": Deployment(properties=deployment_properties),937 },938 )939 def _create_node_runbook(940 self,941 index: int,942 node_space: schema.NodeSpace,943 log: Logger,944 name_prefix: str,945 ) -> AzureNodeSchema:946 azure_node_runbook = node_space.get_extended_runbook(947 AzureNodeSchema, type_name=AZURE948 )949 if not azure_node_runbook.name:950 # the max length of vm name is 64 chars. Below logic takes last 45951 # chars in resource group name and keep the leading 5 chars.952 # name_prefix can contain any of customized (existing) or953 # generated (starts with "lisa-") resource group name,954 # so, pass the first 5 chars as prefix to truncate_keep_prefix955 # to handle both cases956 node_name = f"{name_prefix}-n{index}"957 azure_node_runbook.name = truncate_keep_prefix(node_name, 50, node_name[:5])958 # It's used as computer name only. Windows doesn't support name more959 # than 15 chars960 azure_node_runbook.short_name = truncate_keep_prefix(961 azure_node_runbook.name, 15, azure_node_runbook.name[:5]962 )963 if not azure_node_runbook.vm_size:964 raise LisaException("vm_size is not detected before deploy")965 if not azure_node_runbook.location:966 raise LisaException("location is not detected before deploy")967 if azure_node_runbook.hyperv_generation not in [1, 2]:968 raise LisaException(969 "hyperv_generation need value 1 or 2, "970 f"but {azure_node_runbook.hyperv_generation}",971 )972 if azure_node_runbook.vhd:973 # vhd is higher priority974 azure_node_runbook.vhd = self._get_deployable_vhd_path(...
__init__.py
Source:__init__.py
...454 mm_field=field_function(*args, **kwargs),455 )456def is_unittest() -> bool:457 return "unittest" in sys.argv[0]458def truncate_keep_prefix(content: str, kept_len: int, prefix: str = "lisa-") -> str:459 """460 This method is used to truncate names, when some resource has length461 limitation. It keeps meaningful part and the defined prefix.462 To support custom names. if the string size doesn't exceed the limitation,463 there is no any validation and truncate.464 The last chars include the datetime pattern, it's more unique than leading465 project/test pass names. The name is used to identify lisa deployed466 environment too, so it needs to keep the leading "lisa-" after truncated.467 This makes the name from lisa-long-name... to lisa-name...468 """469 # do nothing, if the string is shorter than required.470 if len(content) <= kept_len:471 return content472 if not content.startswith(prefix):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!