Best Python code snippet using lisa_python
platform.py
Source:platform.py
...79 @classmethod80 def platform_runbook_type(cls) -> type:81 return BaseLibvirtPlatformSchema82 @classmethod83 def node_runbook_type(cls) -> type:84 return BaseLibvirtNodeSchema85 def _initialize(self, *args: Any, **kwargs: Any) -> None:86 libvirt_events_thread.init()87 # 49512 is the first available private port88 self._next_available_port = 4915289 self._port_forwarding_lock = Lock()90 self.platform_runbook = self.runbook.get_extended_runbook(91 self.__platform_runbook_type(), type_name=type(self).type_name()92 )93 def _prepare_environment(self, environment: Environment, log: Logger) -> bool:94 # Ensure environment log directory is created before connecting to any nodes.95 _ = environment.log_path96 if len(self.platform_runbook.hosts) > 1:97 log.warning(98 "Multiple hosts are currently not supported. "99 "Only the first host will be used."100 )101 host = self.platform_runbook.hosts[0]102 if host.is_remote():103 assert host.address104 if not host.username:105 raise LisaException("Username must be provided for remote host")106 if not host.private_key_file:107 raise LisaException("Private key file must be provided for remote host")108 self.host_node = RemoteNode(109 runbook=schema.Node(name="libvirt-host"),110 index=-1,111 logger_name="libvirt-host",112 base_part_path=environment.environment_part_path,113 parent_logger=log,114 )115 self.host_node.set_connection_info(116 address=host.address,117 username=host.username,118 private_key_file=host.private_key_file,119 )120 else:121 self.host_node = local_node_connect(122 name="libvirt-host",123 base_part_path=environment.environment_part_path,124 parent_logger=log,125 )126 self.__init_libvirt_conn_string()127 self._configure_environment(environment, log)128 with libvirt.open(self.libvirt_conn_str) as lv_conn:129 return self._configure_node_capabilities(environment, log, lv_conn)130 def _deploy_environment(self, environment: Environment, log: Logger) -> None:131 self._deploy_nodes(environment, log)132 def 
_delete_environment(self, environment: Environment, log: Logger) -> None:133 self._delete_nodes(environment, log)134 if self.host_node.is_remote:135 self._stop_port_forwarding(environment, log)136 libvirt_log = self.host_node.tools[Journalctl].logs_for_unit(137 "libvirtd", sudo=self.host_node.is_remote138 )139 libvirt_log_path = self.host_node.local_log_path / "libvirtd.log"140 with open(str(libvirt_log_path), "w") as f:141 f.write(libvirt_log)142 def _configure_environment(self, environment: Environment, log: Logger) -> None:143 environment_context = get_environment_context(environment)144 if self.platform_runbook.network_boot_timeout:145 environment_context.network_boot_timeout = (146 self.platform_runbook.network_boot_timeout147 )148 environment_context.ssh_public_key = get_public_key_data(149 self.runbook.admin_private_key_file150 )151 def _configure_node_capabilities(152 self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect153 ) -> bool:154 if not environment.runbook.nodes_requirement:155 return True156 host_capabilities = self._get_host_capabilities(lv_conn, log)157 nodes_capabilities = self._create_node_capabilities(host_capabilities)158 nodes_requirement = []159 for node_space in environment.runbook.nodes_requirement:160 # Check that the general node capabilities are compatible with this node's161 # specific requirements.162 if not node_space.check(nodes_capabilities):163 return False164 # Rectify the general node capabilities with this node's specific165 # requirements.166 node_requirement = node_space.generate_min_capability(nodes_capabilities)167 nodes_requirement.append(node_requirement)168 if not self._check_host_capabilities(nodes_requirement, host_capabilities, log):169 return False170 environment.runbook.nodes_requirement = nodes_requirement171 return True172 def _get_host_capabilities(173 self, lv_conn: libvirt.virConnect, log: Logger174 ) -> _HostCapabilities:175 host_capabilities = _HostCapabilities()176 capabilities_xml_str = 
lv_conn.getCapabilities()177 capabilities_xml = ET.fromstring(capabilities_xml_str)178 host_xml = capabilities_xml.find("host")179 assert host_xml180 topology_xml = host_xml.find("topology")181 assert topology_xml182 cells_xml = topology_xml.find("cells")183 assert cells_xml184 for cell in cells_xml.findall("cell"):185 cpus_xml = cell.find("cpus")186 assert cpus_xml187 host_capabilities.core_count += int(cpus_xml.attrib["num"])188 # Get free memory.189 # Include the disk cache size, as it will be freed if memory becomes limited.190 memory_stats = lv_conn.getMemoryStats(libvirt.VIR_NODE_MEMORY_STATS_ALL_CELLS)191 host_capabilities.free_memory_kib = (192 memory_stats[libvirt.VIR_NODE_MEMORY_STATS_FREE]193 + memory_stats[libvirt.VIR_NODE_MEMORY_STATS_CACHED]194 )195 log.debug(196 f"QEMU host: "197 f"CPU Cores = {host_capabilities.core_count}, "198 f"Free Memory = {host_capabilities.free_memory_kib} KiB"199 )200 return host_capabilities201 # Create the set of capabilities that are generally supported on QEMU nodes.202 def _create_node_capabilities(203 self, host_capabilities: _HostCapabilities204 ) -> schema.NodeSpace:205 node_capabilities = schema.NodeSpace()206 node_capabilities.name = "QEMU"207 node_capabilities.node_count = 1208 node_capabilities.core_count = search_space.IntRange(209 min=1, max=host_capabilities.core_count210 )211 node_capabilities.disk = schema.DiskOptionSettings(212 data_disk_count=search_space.IntRange(min=0),213 data_disk_size=search_space.IntRange(min=1),214 )215 node_capabilities.network_interface = schema.NetworkInterfaceOptionSettings()216 node_capabilities.network_interface.max_nic_count = 1217 node_capabilities.network_interface.nic_count = 1218 node_capabilities.gpu_count = 0219 node_capabilities.features = search_space.SetSpace[schema.FeatureSettings](220 is_allow_set=True,221 items=[222 schema.FeatureSettings.create(SerialConsole.name()),223 ],224 )225 return node_capabilities226 # Check that the VM requirements can be fulfilled by 
the host.227 def _check_host_capabilities(228 self,229 nodes_requirements: List[schema.NodeSpace],230 host_capabilities: _HostCapabilities,231 log: Logger,232 ) -> bool:233 total_required_memory_mib = 0234 for node_requirements in nodes_requirements:235 # Calculate the total amount of memory required for all the VMs.236 assert isinstance(node_requirements.memory_mb, int)237 total_required_memory_mib += node_requirements.memory_mb238 # Ensure host has enough memory for all the VMs.239 total_required_memory_kib = total_required_memory_mib * 1024240 if total_required_memory_kib > host_capabilities.free_memory_kib:241 log.error(242 f"Nodes require a total of {total_required_memory_kib} KiB memory. "243 f"Host only has {host_capabilities.free_memory_kib} KiB free."244 )245 return False246 return True247 # Get the minimum value for a node requirement with an interger type.248 # Note: Unlike other orchestrators, we don't want to fill up the capacity of249 # the host in case the test is running on a dev box.250 def _get_count_space_min(self, count_space: search_space.CountSpace) -> int:251 return search_space.generate_min_capability_countspace(count_space, count_space)252 def _deploy_nodes(self, environment: Environment, log: Logger) -> None:253 self._configure_nodes(environment, log)254 with libvirt.open(self.libvirt_conn_str) as lv_conn:255 try:256 self._create_nodes(environment, log, lv_conn)257 self._fill_nodes_metadata(environment, log, lv_conn)258 except Exception as ex:259 assert environment.platform260 if (261 environment.platform.runbook.keep_environment262 == constants.ENVIRONMENT_KEEP_NO263 ):264 self._delete_nodes(environment, log)265 raise ex266 # Pre-determine all the nodes' properties, including the name of all the resouces267 # to be created. 
This makes it easier to cleanup everything after the test is268 # finished (or fails).269 def _configure_nodes(self, environment: Environment, log: Logger) -> None:270 # Generate a random name for the VMs.271 test_suffix = "".join(random.choice(string.ascii_uppercase) for _ in range(5))272 vm_name_prefix = f"lisa-{test_suffix}"273 self.vm_disks_dir = os.path.join(274 self.platform_runbook.hosts[0].lisa_working_dir, vm_name_prefix275 )276 assert environment.runbook.nodes_requirement277 for i, node_space in enumerate(environment.runbook.nodes_requirement):278 assert isinstance(279 node_space, schema.NodeSpace280 ), f"actual: {type(node_space)}"281 node_runbook: BaseLibvirtNodeSchema = node_space.get_extended_runbook(282 self.__node_runbook_type(), type_name=type(self).type_name()283 )284 if not os.path.exists(node_runbook.disk_img):285 raise LisaException(f"file does not exist: {node_runbook.disk_img}")286 node = environment.create_node_from_requirement(node_space)287 self._configure_node(288 node,289 i,290 node_space,291 node_runbook,292 vm_name_prefix,293 )294 def _configure_node(295 self,296 node: Node,297 node_idx: int,298 node_space: schema.NodeSpace,299 node_runbook: BaseLibvirtNodeSchema,300 vm_name_prefix: str,301 ) -> None:302 node_context = get_node_context(node)303 if (304 not node_runbook.firmware_type305 or node_runbook.firmware_type == FIRMWARE_TYPE_UEFI306 ):307 node_context.use_bios_firmware = False308 elif node_runbook.firmware_type == FIRMWARE_TYPE_BIOS:309 node_context.use_bios_firmware = True310 if node_runbook.enable_secure_boot:311 raise LisaException("Secure-boot requires UEFI firmware.")312 else:313 raise LisaException(314 f"Unknown node firmware type: {node_runbook.firmware_type}."315 f"Expecting either {FIRMWARE_TYPE_UEFI} or {FIRMWARE_TYPE_BIOS}."316 )317 node_context.machine_type = node_runbook.machine_type or None318 node_context.enable_secure_boot = node_runbook.enable_secure_boot319 node_context.vm_name = 
f"{vm_name_prefix}-{node_idx}"320 if not node.name:321 node.name = node_context.vm_name322 node_context.cloud_init_file_path = os.path.join(323 self.vm_disks_dir, f"{node_context.vm_name}-cloud-init.iso"324 )325 if self.host_node.is_remote:326 node_context.os_disk_source_file_path = node_runbook.disk_img327 node_context.os_disk_base_file_path = os.path.join(328 self.vm_disks_dir, os.path.basename(node_runbook.disk_img)329 )330 else:331 node_context.os_disk_base_file_path = node_runbook.disk_img332 node_context.os_disk_base_file_fmt = DiskImageFormat(333 node_runbook.disk_img_format334 )335 node_context.os_disk_file_path = os.path.join(336 self.vm_disks_dir, f"{node_context.vm_name}-os.qcow2"337 )338 node_context.console_log_file_path = str(339 node.local_log_path / "qemu-console.log"340 )341 # Read extra cloud-init data.342 extra_user_data = (343 node_runbook.cloud_init and node_runbook.cloud_init.extra_user_data344 )345 if extra_user_data:346 node_context.extra_cloud_init_user_data = []347 if isinstance(extra_user_data, str):348 extra_user_data = [extra_user_data]349 for relative_file_path in extra_user_data:350 if not relative_file_path:351 continue352 file_path = constants.RUNBOOK_PATH.joinpath(relative_file_path)353 with open(file_path, "r") as file:354 node_context.extra_cloud_init_user_data.append(yaml.safe_load(file))355 # Configure data disks.356 if node_space.disk:357 assert isinstance(358 node_space.disk.data_disk_count, int359 ), f"actual: {type(node_space.disk.data_disk_count)}"360 assert isinstance(361 node_space.disk.data_disk_size, int362 ), f"actual: {type(node_space.disk.data_disk_size)}"363 for i in range(node_space.disk.data_disk_count):364 data_disk = DataDiskContext()365 data_disk.file_path = os.path.join(366 self.vm_disks_dir, f"{node_context.vm_name}-data-{i}.qcow2"367 )368 data_disk.size_gib = node_space.disk.data_disk_size369 node_context.data_disks.append(data_disk)370 def _create_domain_and_attach_logger(371 self,372 libvirt_conn: 
libvirt.virConnect,373 node_context: NodeContext,374 ) -> None:375 # Start the VM in the paused state.376 # This gives the console logger a chance to connect before the VM starts377 # for real.378 assert node_context.domain379 node_context.domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED)380 # Attach the console logger381 node_context.console_logger = QemuConsoleLogger()382 node_context.console_logger.attach(383 libvirt_conn, node_context.domain, node_context.console_log_file_path384 )385 # Start the VM.386 node_context.domain.resume()387 # Create all the VMs.388 def _create_nodes(389 self,390 environment: Environment,391 log: Logger,392 lv_conn: libvirt.virConnect,393 ) -> None:394 self.host_node.shell.mkdir(Path(self.vm_disks_dir), exist_ok=True)395 for node in environment.nodes.list():396 node_context = get_node_context(node)397 self._create_node(398 node,399 node_context,400 environment,401 log,402 lv_conn,403 )404 def _create_node(405 self,406 node: Node,407 node_context: NodeContext,408 environment: Environment,409 log: Logger,410 lv_conn: libvirt.virConnect,411 ) -> None:412 # Create required directories and copy the required files to the host413 # node.414 if node_context.os_disk_source_file_path:415 source_exists = self.host_node.tools[Ls].path_exists(416 path=node_context.os_disk_base_file_path, sudo=True417 )418 if not source_exists:419 self.host_node.shell.copy(420 Path(node_context.os_disk_source_file_path),421 Path(node_context.os_disk_base_file_path),422 )423 # Create cloud-init ISO file.424 self._create_node_cloud_init_iso(environment, log, node)425 # Create OS disk from the provided image.426 self._create_node_os_disk(environment, log, node)427 # Create data disks428 self._create_node_data_disks(node)429 # Create libvirt domain (i.e. 
VM).430 xml = self._create_node_domain_xml(environment, log, node, lv_conn)431 node_context.domain = lv_conn.defineXML(xml)432 self._create_domain_and_attach_logger(433 lv_conn,434 node_context,435 )436 # Delete all the VMs.437 def _delete_nodes(self, environment: Environment, log: Logger) -> None:438 # Delete nodes.439 for node in environment.nodes.list():440 self._delete_node(node, log)441 # Delete VM disks directory.442 try:443 self.host_node.shell.remove(Path(self.vm_disks_dir), True)444 except Exception as ex:445 log.warning(f"Failed to delete VM files directory: {ex}")446 def _delete_node_watchdog_callback(self) -> None:447 print("VM delete watchdog timer fired.\n", file=sys.__stderr__)448 faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)449 os._exit(1)450 def _delete_node(self, node: Node, log: Logger) -> None:451 node_context = get_node_context(node)452 watchdog = Timer(60.0, self._delete_node_watchdog_callback)453 watchdog.start()454 # Stop the VM.455 if node_context.domain:456 log.debug(f"Stop VM: {node_context.vm_name}")457 try:458 # In the libvirt API, "destroy" means "stop".459 node_context.domain.destroy()460 except libvirt.libvirtError as ex:461 log.warning(f"VM stop failed. {ex}")462 # Wait for console log to close.463 # Note: libvirt can deadlock if you try to undefine the VM while the stream464 # is trying to close.465 if node_context.console_logger:466 log.debug(f"Close VM console log: {node_context.vm_name}")467 node_context.console_logger.close()468 node_context.console_logger = None469 # Undefine the VM.470 if node_context.domain:471 log.debug(f"Delete VM: {node_context.vm_name}")472 try:473 node_context.domain.undefineFlags(self._get_domain_undefine_flags())474 except libvirt.libvirtError as ex:475 log.warning(f"VM delete failed. 
{ex}")476 node_context.domain = None477 watchdog.cancel()478 def _get_domain_undefine_flags(self) -> int:479 return int(480 libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE481 | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA482 | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM483 | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA484 )485 def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:486 log.debug(f"Clearing port forwarding rules for environment {environment.name}")487 environment_context = get_environment_context(environment)488 for (port, address) in environment_context.port_forwarding_list:489 self.host_node.tools[Iptables].stop_forwarding(port, address, 22)490 # Retrieve the VMs' dynamic properties (e.g. IP address).491 def _fill_nodes_metadata(492 self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect493 ) -> None:494 environment_context = get_environment_context(environment)495 # Give all the VMs some time to boot and then acquire an IP address.496 timeout = time.time() + environment_context.network_boot_timeout497 if self.host_node.is_remote:498 remote_node = cast(RemoteNode, self.host_node)499 conn_info = remote_node.connection_info500 address = conn_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS]501 for node in environment.nodes.list():502 assert isinstance(node, RemoteNode)503 # Get the VM's IP address.504 local_address = self._get_node_ip_address(505 environment, log, lv_conn, node, timeout506 )507 node_port = 22508 if self.host_node.is_remote:509 with self._port_forwarding_lock:510 port_not_found = True511 while port_not_found:512 if self._next_available_port > 65535:513 raise LisaException(514 "No available ports on the host to forward"515 )516 # check if the port is already in use517 output = self.host_node.execute(518 f"nc -vz 127.0.0.1 {self._next_available_port}"519 )520 if output.exit_code == 1: # port not in use521 node_port = self._next_available_port522 port_not_found = False523 self._next_available_port += 1524 
self.host_node.tools[Iptables].start_forwarding(525 node_port, local_address, 22526 )527 environment_context.port_forwarding_list.append(528 (node_port, local_address)529 )530 else:531 address = local_address532 # Set SSH connection info for the node.533 node.set_connection_info(534 address=local_address,535 public_address=address,536 public_port=node_port,537 username=self.runbook.admin_username,538 private_key_file=self.runbook.admin_private_key_file,539 )540 # Ensure cloud-init completes its setup.541 node.execute(542 "cloud-init status --wait",543 sudo=True,544 expected_exit_code=0,545 expected_exit_code_failure_message="waiting on cloud-init",546 )547 # Create a cloud-init ISO for a VM.548 def _create_node_cloud_init_iso(549 self, environment: Environment, log: Logger, node: Node550 ) -> None:551 environment_context = get_environment_context(environment)552 node_context = get_node_context(node)553 user_data = {554 "users": [555 "default",556 {557 "name": self.runbook.admin_username,558 "shell": "/bin/bash",559 "sudo": ["ALL=(ALL) NOPASSWD:ALL"],560 "groups": ["sudo", "docker"],561 "ssh_authorized_keys": [environment_context.ssh_public_key],562 },563 ],564 }565 # Iterate through all the top-level properties.566 for extra_user_data in node_context.extra_cloud_init_user_data:567 for key, value in extra_user_data.items():568 existing_value = user_data.get(key)569 if not existing_value:570 # Property doesn't exist yet. 
So, add it.571 user_data[key] = value572 elif isinstance(existing_value, dict) and isinstance(value, dict):573 # Merge two dictionaries by adding properties from new value and574 # replacing any existing properties.575 # Examples: disk_setup, etc.576 existing_value.update(value)577 elif isinstance(existing_value, list) and isinstance(value, list):578 # Merge two lists by appending to the end of the existing list.579 # Examples: write_files, runcmd, etc.580 existing_value.extend(value)581 else:582 # String, unknown type or mismatched type.583 # Just replace the existing property.584 user_data[key] = value585 meta_data = {586 "local-hostname": node_context.vm_name,587 }588 # Note: cloud-init requires the user-data file to be prefixed with589 # `#cloud-config`.590 user_data_string = "#cloud-config\n" + yaml.safe_dump(user_data)591 meta_data_string = yaml.safe_dump(meta_data)592 iso_path = node_context.cloud_init_file_path593 tmp_dir = tempfile.TemporaryDirectory()594 try:595 iso_path = os.path.join(tmp_dir.name, "cloud-init.iso")596 self._create_iso(597 iso_path,598 [("/user-data", user_data_string), ("/meta-data", meta_data_string)],599 )600 self.host_node.shell.copy(601 Path(iso_path), Path(node_context.cloud_init_file_path)602 )603 finally:604 tmp_dir.cleanup()605 # Create an ISO file.606 def _create_iso(self, file_path: str, files: List[Tuple[str, str]]) -> None:607 iso = pycdlib.PyCdlib()608 iso.new(joliet=3, vol_ident="cidata")609 for i, file in enumerate(files):610 path, contents = file611 contents_data = contents.encode()612 iso.add_fp(613 io.BytesIO(contents_data),614 len(contents_data),615 f"/{i}.;1",616 joliet_path=path,617 )618 iso.write(file_path)619 # Create the OS disk.620 def _create_node_os_disk(621 self, environment: Environment, log: Logger, node: Node622 ) -> None:623 raise NotImplementedError()624 def _create_node_data_disks(self, node: Node) -> None:625 node_context = get_node_context(node)626 qemu_img = self.host_node.tools[QemuImg]627 for disk 
in node_context.data_disks:628 qemu_img.create_new_qcow2(disk.file_path, disk.size_gib * 1024)629 # Create the XML definition for the VM.630 def _create_node_domain_xml(631 self,632 environment: Environment,633 log: Logger,634 node: Node,635 lv_conn: libvirt.virConnect,636 ) -> str:637 node_context = get_node_context(node)638 domain = ET.Element("domain")639 domain.attrib["type"] = "kvm"640 name = ET.SubElement(domain, "name")641 name.text = node_context.vm_name642 memory = ET.SubElement(domain, "memory")643 memory.attrib["unit"] = "MiB"644 assert isinstance(node.capability.memory_mb, int)645 memory.text = str(node.capability.memory_mb)646 vcpu = ET.SubElement(domain, "vcpu")647 assert isinstance(node.capability.core_count, int)648 vcpu.text = str(node.capability.core_count)649 os_tag = ET.SubElement(domain, "os")650 os_type = ET.SubElement(os_tag, "type")651 os_type.text = "hvm"652 if node_context.machine_type:653 os_type.attrib["machine"] = node_context.machine_type654 if not node_context.use_bios_firmware:655 # In an ideal world, we would use libvirt's firmware auto-selection feature.656 # Unfortunatley, it isn't possible to specify the secure-boot state until657 # libvirt v7.2.0 and Ubuntu 20.04 only has libvirt v6.0.0. 
Therefore, we658 # have to select the firmware manually.659 firmware_config = self._get_firmware_config(660 lv_conn, node_context.machine_type, node_context.enable_secure_boot661 )662 print(firmware_config)663 loader = ET.SubElement(os_tag, "loader")664 loader.attrib["readonly"] = "yes"665 loader.attrib["type"] = "pflash"666 loader.attrib["secure"] = "yes" if node_context.enable_secure_boot else "no"667 loader.text = firmware_config["mapping"]["executable"]["filename"]668 nvram = ET.SubElement(os_tag, "nvram")669 nvram.attrib["template"] = firmware_config["mapping"]["nvram-template"][670 "filename"671 ]672 features = ET.SubElement(domain, "features")673 ET.SubElement(features, "acpi")674 ET.SubElement(features, "apic")675 cpu = ET.SubElement(domain, "cpu")676 cpu.attrib["mode"] = "host-passthrough"677 clock = ET.SubElement(domain, "clock")678 clock.attrib["offset"] = "utc"679 on_poweroff = ET.SubElement(domain, "on_poweroff")680 on_poweroff.text = "destroy"681 on_reboot = ET.SubElement(domain, "on_reboot")682 on_reboot.text = "restart"683 on_crash = ET.SubElement(domain, "on_crash")684 on_crash.text = "destroy"685 devices = ET.SubElement(domain, "devices")686 serial = ET.SubElement(devices, "serial")687 serial.attrib["type"] = "pty"688 serial_target = ET.SubElement(serial, "target")689 serial_target.attrib["type"] = "isa-serial"690 serial_target.attrib["port"] = "0"691 serial_target_model = ET.SubElement(serial_target, "model")692 serial_target_model.attrib["name"] = "isa-serial"693 console = ET.SubElement(devices, "console")694 console.attrib["type"] = "pty"695 console_target = ET.SubElement(console, "target")696 console_target.attrib["type"] = "serial"697 console_target.attrib["port"] = "0"698 video = ET.SubElement(devices, "video")699 video_model = ET.SubElement(video, "model")700 if isinstance(self.host_node.os, CBLMariner):701 video_model.attrib["type"] = "vga"702 else:703 video_model.attrib["type"] = "qxl"704 graphics = ET.SubElement(devices, "graphics")705 
graphics.attrib["type"] = "spice"706 network_interface = ET.SubElement(devices, "interface")707 network_interface.attrib["type"] = "network"708 network_interface_source = ET.SubElement(network_interface, "source")709 network_interface_source.attrib["network"] = "default"710 network_interface_model = ET.SubElement(network_interface, "model")711 network_interface_model.attrib["type"] = "virtio"712 self._add_disk_xml(713 node_context,714 devices,715 node_context.cloud_init_file_path,716 "cdrom",717 "raw",718 "sata",719 )720 self._add_disk_xml(721 node_context,722 devices,723 node_context.os_disk_file_path,724 "disk",725 "qcow2",726 "virtio",727 )728 for data_disk in node_context.data_disks:729 self._add_disk_xml(730 node_context,731 devices,732 data_disk.file_path,733 "disk",734 "qcow2",735 "virtio",736 )737 xml = ET.tostring(domain, "unicode")738 return xml739 def _add_disk_xml(740 self,741 node_context: NodeContext,742 devices: ET.Element,743 file_path: str,744 device_type: str,745 image_type: str,746 bus_type: str,747 ) -> None:748 device_name = self._new_disk_device_name(node_context)749 disk = ET.SubElement(devices, "disk")750 disk.attrib["type"] = "file"751 disk.attrib["device"] = device_type752 disk_driver = ET.SubElement(disk, "driver")753 disk_driver.attrib["name"] = "qemu"754 disk_driver.attrib["type"] = image_type755 disk_target = ET.SubElement(disk, "target")756 disk_target.attrib["dev"] = device_name757 disk_target.attrib["bus"] = bus_type758 disk_source = ET.SubElement(disk, "source")759 disk_source.attrib["file"] = file_path760 def _add_virtio_disk_xml(761 self,762 node_context: NodeContext,763 devices: ET.Element,764 file_path: str,765 queues: int,766 ) -> None:767 device_name = self._new_disk_device_name(node_context, True)768 disk = ET.SubElement(devices, "disk")769 disk.attrib["type"] = "file"770 disk_driver = ET.SubElement(disk, "driver")771 disk_driver.attrib["if"] = "virtio"772 disk_driver.attrib["type"] = "raw"773 disk_driver.attrib["queues"] = 
str(queues)774 disk_target = ET.SubElement(disk, "target")775 disk_target.attrib["dev"] = device_name776 disk_source = ET.SubElement(disk, "source")777 disk_source.attrib["file"] = file_path778 def _new_disk_device_name(779 self,780 node_context: NodeContext,781 is_paravirtualized: bool = False,782 ) -> str:783 disk_index = node_context.next_disk_index784 node_context.next_disk_index += 1785 device_name = self._get_disk_device_name(disk_index, is_paravirtualized)786 return device_name787 def _get_disk_device_name(788 self, disk_index: int, is_paravirtualized: bool = False789 ) -> str:790 # The disk device name is required to follow the standard Linux device naming791 # scheme. That is: [ sda, sdb, ..., sdz, sdaa, sdab, ... ]. However, it is792 # unlikely that someone will ever need more than 26 disks. So, keep is simple793 # for now.794 if disk_index < 0 or disk_index > 25:795 raise LisaException(f"Unsupported disk index: {disk_index}.")796 prefix = "v" if is_paravirtualized else "s"797 suffix = chr(ord("a") + disk_index)798 return f"{prefix}d{suffix}"799 # Wait for the VM to boot and then get the IP address.800 def _get_node_ip_address(801 self,802 environment: Environment,803 log: Logger,804 lv_conn: libvirt.virConnect,805 node: Node,806 timeout: float,807 ) -> str:808 node_context = get_node_context(node)809 while True:810 addr = self._try_get_node_ip_address(environment, log, lv_conn, node)811 if addr:812 return addr813 if time.time() > timeout:814 raise LisaException(f"no IP addresses found for {node_context.vm_name}")815 # Try to get the IP address of the VM.816 def _try_get_node_ip_address(817 self,818 environment: Environment,819 log: Logger,820 lv_conn: libvirt.virConnect,821 node: Node,822 ) -> Optional[str]:823 node_context = get_node_context(node)824 domain = lv_conn.lookupByName(node_context.vm_name)825 # Acquire IP address from libvirt's DHCP server.826 interfaces = domain.interfaceAddresses(827 libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE828 )829 
if len(interfaces) < 1:830 return None831 interface_name = next(iter(interfaces))832 addrs = interfaces[interface_name]["addrs"]833 if len(addrs) < 1:834 return None835 addr = addrs[0]["addr"]836 assert isinstance(addr, str)837 return addr838 def _get_firmware_config(839 self,840 lv_conn: libvirt.virConnect,841 machine_type: Optional[str],842 enable_secure_boot: bool,843 ) -> Dict[str, Any]:844 # Resolve the machine type to its full name.845 domain_caps_str = lv_conn.getDomainCapabilities(846 machine=machine_type, virttype="kvm"847 )848 domain_caps = ET.fromstring(domain_caps_str)849 full_machine_type = domain_caps.findall("./machine")[0].text850 arch = domain_caps.findall("./arch")[0].text851 # Read the QEMU firmware config files.852 # Note: "/usr/share/qemu/firmware" is a well known location for these files.853 firmware_configs_str = self.host_node.execute(854 "cat /usr/share/qemu/firmware/*.json",855 shell=True,856 expected_exit_code=0,857 no_debug_log=True,858 ).stdout859 firmware_configs = self._read_concat_json_str(firmware_configs_str)860 # Filter on architecture.861 filtered_firmware_configs = filter(862 lambda f: f["targets"][0]["architecture"] == arch, firmware_configs863 )864 # Filter on machine type.865 filtered_firmware_configs = filter(866 lambda f: any(867 fnmatch.fnmatch(full_machine_type, target_machine)868 for target_machine in f["targets"][0]["machines"]869 ),870 filtered_firmware_configs,871 )872 # Exclude Intel TDX and AMD SEV-ES firmwares.873 filtered_firmware_configs = filter(874 lambda f: "intel-tdx" not in f["features"]875 and "amd-sev-es" not in f["features"],876 filtered_firmware_configs,877 )878 # Filter on secure boot.879 if enable_secure_boot:880 filtered_firmware_configs = filter(881 lambda f: "secure-boot" in f["features"]882 and "enrolled-keys" in f["features"],883 filtered_firmware_configs,884 )885 else:886 filtered_firmware_configs = filter(887 lambda f: "secure-boot" not in f["features"], filtered_firmware_configs888 )889 # Get 
first matching firmware.890 firmware_config = next(filtered_firmware_configs, None)891 if firmware_config is None:892 raise LisaException(893 f"Could not find matching firmware for machine-type={machine_type} "894 f"and secure-boot={enable_secure_boot}."895 )896 return firmware_config897 # Read a bunch of JSON files that have been concatenated together.898 def _read_concat_json_str(self, json_str: str) -> List[Dict[str, Any]]:899 objs = []900 # From: https://stackoverflow.com/a/42985887901 decoder = json.JSONDecoder()902 text = json_str.lstrip() # decode hates leading whitespace903 while text:904 obj, index = decoder.raw_decode(text)905 text = text[index:].lstrip()906 objs.append(obj)907 return objs908 def _libvirt_uri_schema(self) -> str:909 raise NotImplementedError()910 def __init_libvirt_conn_string(self) -> None:911 hypervisor = self._libvirt_uri_schema()912 host = self.platform_runbook.hosts[0]913 host_addr = ""914 transport = ""915 params = ""916 if host.is_remote():917 assert host.address918 assert host.username919 host_addr = f"{host.username}@{host.address}"920 transport = "+ssh"921 params = f"?keyfile={host.private_key_file}"922 self.libvirt_conn_str = f"{hypervisor}{transport}://{host_addr}/system{params}"923 def __platform_runbook_type(self) -> type:924 platform_runbook_type: type = type(self).platform_runbook_type()925 assert issubclass(platform_runbook_type, BaseLibvirtPlatformSchema)926 return platform_runbook_type927 def __node_runbook_type(self) -> type:928 node_runbook_type: type = type(self).node_runbook_type()929 assert issubclass(node_runbook_type, BaseLibvirtNodeSchema)930 return node_runbook_type931 def _get_host_distro(self) -> str:932 result = self.host_node.os.information.full_version if self.host_node else ""933 return result934 def _get_host_kernel_version(self) -> str:935 result = ""936 if self.host_node:937 uname = self.host_node.tools[Uname]938 result = uname.get_linux_information().kernel_version_raw939 return result940 def 
_get_libvirt_version(self) -> str:941 result = ""942 if self.host_node:...
ch_platform.py
Source:ch_platform.py
# NOTE(review): snippet from the interior of the CloudHypervisor platform class
# (ch_platform.py); truncated at both ends in this view.

    @classmethod
    def supported_features(cls) -> List[Type[Feature]]:
        # Delegates to the shared base-class feature list.
        return BaseLibvirtPlatform._supported_features

    @classmethod
    def node_runbook_type(cls) -> type:
        return CloudHypervisorNodeSchema

    def _libvirt_uri_schema(self) -> str:
        # "ch" is the cloud-hypervisor libvirt driver scheme.
        return "ch"

    def _configure_node(
        self,
        node: Node,
        node_idx: int,
        node_space: schema.NodeSpace,
        node_runbook: BaseLibvirtNodeSchema,
        vm_name_prefix: str,
    ) -> None:
        super()._configure_node(
            node,
            node_idx,
            # NOTE(review): source snippet truncated here; remaining
            # arguments and method body are not visible in this view.
            ...
        )
qemu_platform.py
Source:qemu_platform.py
# NOTE(review): snippet from the interior of the QEMU platform class
# (qemu_platform.py); truncated at both ends in this view.

    @classmethod
    def supported_features(cls) -> List[Type[Feature]]:
        # Delegates to the shared base-class feature list.
        return BaseLibvirtPlatform._supported_features

    @classmethod
    def node_runbook_type(cls) -> type:
        return QemuNodeSchema

    def _libvirt_uri_schema(self) -> str:
        return "qemu"

    # Create the OS disk.
    def _create_node_os_disk(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        # Creates a copy-on-write overlay on top of the base image, so the
        # base image is never modified.
        node_context = get_node_context(node)
        self.host_node.tools[QemuImg].create_diff_qcow2(
            node_context.os_disk_file_path, node_context.os_disk_base_file_path
        )

    def _get_vmm_version(self) -> str:
        result = "Unknown"
        if self.host_node:
            # NOTE(review): source snippet truncated here; the rest of this
            # method's body is not visible in this view.
            ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation test minutes FREE!!