Best Python code snippet using lisa_python
transformers.py
Source:transformers.py
...112 else:113 qemu_version = installer._get_version()114 self._log.info(f"Already installed! qemu version: {qemu_version}")115 if runbook.libvirt:116 _install_libvirt(runbook.libvirt, node, self._log)117 return {}118class CloudHypervisorInstallerTransformer(Transformer):119 @classmethod120 def type_name(cls) -> str:121 return "cloudhypervisor_installer"122 @classmethod123 def type_schema(cls) -> Type[schema.TypedSchema]:124 return InstallerTransformerSchema125 @property126 def _output_names(self) -> List[str]:127 return []128 def _internal_run(self) -> Dict[str, Any]:129 runbook: InstallerTransformerSchema = self.runbook130 assert runbook.connection, "connection must be defined."131 assert runbook.installer, "installer must be defined."132 node = quick_connect(runbook.connection, "cloudhypervisor_installer_node")133 factory = subclasses.Factory[CloudHypervisorInstaller](CloudHypervisorInstaller)134 installer = factory.create_by_runbook(135 runbook=runbook.installer,136 node=node,137 log=self._log,138 )139 if not installer._is_installed():140 installer.validate()141 ch_version = installer.install()142 self._log.info(f"installed cloud-hypervisor version: {ch_version}")143 node.reboot()144 else:145 ch_version = installer._get_version()146 self._log.info(f"Already installed! cloud-hypervisor version: {ch_version}")147 if runbook.libvirt:148 _install_libvirt(runbook.libvirt, node, self._log)149 return {}150class LibvirtPackageInstaller(LibvirtInstaller):151 _distro_package_mapping = {152 Ubuntu.__name__: ["libvirt-daemon-system"],153 CBLMariner.__name__: ["dnsmasq", "ebtables", "libvirt"],154 }155 @classmethod156 def type_name(cls) -> str:157 return "package"158 @classmethod159 def type_schema(cls) -> Type[schema.TypedSchema]:160 return BaseInstallerSchema161 def install(self) -> str:162 node: Node = self._node163 linux: Linux = cast(Linux, node.os)164 packages_list = self._distro_package_mapping[type(linux).__name__]165 self._log.info(f"installing packages: {packages_list}")166 linux.install_packages(packages_list)167 self._fix_mariner_installation()168 return self._get_version()169 # Some fixes to the libvirt installation on Mariner.170 # Can be removed once the issues have been addressed in Mariner.171 def _fix_mariner_installation(self) -> None:172 if not isinstance(self._node.os, CBLMariner):173 return174 self._node.tools[Usermod].add_user_to_group("libvirt", sudo=True)175 self._node.tools[Sed].substitute(176 "hidepid=2",177 "hidepid=0",178 "/etc/fstab",179 sudo=True,180 )181class QemuPackageInstaller(QemuInstaller):182 _distro_package_mapping = {183 Ubuntu.__name__: ["qemu-kvm"],184 CBLMariner.__name__: ["qemu-kvm", "edk2-ovmf"],185 }186 @classmethod187 def type_name(cls) -> str:188 return "package"189 @classmethod190 def type_schema(cls) -> Type[schema.TypedSchema]:191 return BaseInstallerSchema192 def install(self) -> str:193 node: Node = self._node194 linux: Linux = cast(Linux, node.os)195 packages_list = self._distro_package_mapping[type(linux).__name__]196 self._log.info(f"installing packages: {packages_list}")197 if isinstance(node.os, CBLMariner):198 linux.install_packages(199 ["mariner-repos-preview.noarch", "mariner-repos-extended"]200 )201 linux.install_packages(packages_list)202 username = node.tools[Whoami].get_username()203 node.tools[Usermod].add_user_to_group(group=username, user="qemu", sudo=True)204 return self._get_version()205class CloudHypervisorPackageInstaller(CloudHypervisorInstaller):206 _distro_package_mapping = {207 CBLMariner.__name__: ["cloud-hypervisor"],208 }209 @classmethod210 def type_name(cls) -> str:211 return "package"212 @classmethod213 def type_schema(cls) -> Type[schema.TypedSchema]:214 return BaseInstallerSchema215 def install(self) -> str:216 node: Node = self._node217 linux: Linux = cast(Linux, node.os)218 packages_list = self._distro_package_mapping[type(linux).__name__]219 self._log.info(f"installing packages: {packages_list}")220 linux.install_packages(packages_list)221 return self._get_version()222class LibvirtSourceInstaller(LibvirtInstaller):223 _distro_package_mapping = {224 Ubuntu.__name__: [225 "ninja-build",226 "dnsmasq-base",227 "libxml2-utils",228 "xsltproc",229 "python3-docutils",230 "libglib2.0-dev",231 "libgnutls28-dev",232 "libxml2-dev",233 "libnl-3-dev",234 "libnl-route-3-dev",235 "libtirpc-dev",236 "libyajl-dev",237 "libcurl4-gnutls-dev",238 "python3-pip",239 ],240 CBLMariner.__name__: [241 "ninja-build",242 "build-essential",243 "rpcsvc-proto",244 "python3-docutils",245 "glibc-devel",246 "glib-devel",247 "gnutls-devel",248 "libnl3-devel",249 "libtirpc-devel",250 "curl-devel",251 "ebtables",252 "yajl-devel",253 "python3-pip",254 "dnsmasq",255 "nmap-ncat",256 ],257 }258 @classmethod259 def type_name(cls) -> str:260 return "source"261 @classmethod262 def type_schema(cls) -> Type[schema.TypedSchema]:263 return SourceInstallerSchema264 def _build_and_install(self, code_path: PurePath) -> None:265 self._node.execute(266 "meson build -D driver_ch=enabled -D driver_qemu=disabled \\"267 "-D driver_openvz=disabled -D driver_esx=disabled \\"268 "-D driver_vmware=disabled -D driver_lxc=disabled \\"269 "-D driver_libxl=disabled -D driver_vbox=disabled \\"270 "-D selinux=disabled -D system=true --prefix=/usr \\"271 "-D git_werror=disabled",272 cwd=code_path,273 shell=True,274 sudo=True,275 )276 self._node.execute(277 "ninja -C build",278 shell=True,279 sudo=True,280 cwd=code_path,281 expected_exit_code=0,282 expected_exit_code_failure_message="'ninja -C build' command failed.",283 )284 self._node.execute(285 "ninja -C build install",286 shell=True,287 sudo=True,288 cwd=code_path,289 expected_exit_code=0,290 expected_exit_code_failure_message="'ninja -C install' command failed.",291 )292 self._node.execute("ldconfig", shell=True, sudo=True)293 def _install_dependencies(self) -> None:294 linux: Linux = cast(Linux, self._node.os)295 dep_packages_list = self._distro_package_mapping[type(linux).__name__]296 linux.install_packages(dep_packages_list)297 result = self._node.execute("pip3 install meson", shell=True, sudo=True)298 assert not result.exit_code, "Failed to install meson"299 def install(self) -> str:300 runbook: SourceInstallerSchema = self.runbook301 self._log.info("Installing dependencies for Libvirt...")302 self._install_dependencies()303 self._log.info("Cloning source code of Libvirt...")304 code_path = _get_source_code(runbook, self._node, self.type_name(), self._log)305 self._log.info("Building source code of Libvirt...")306 self._build_and_install(code_path)307 return self._get_version()308class CloudHypervisorSourceInstaller(CloudHypervisorInstaller):309 _distro_package_mapping = {310 Ubuntu.__name__: ["gcc"],311 CBLMariner.__name__: ["gcc", "binutils", "glibc-devel"],312 }313 @classmethod314 def type_name(cls) -> str:315 return "source"316 @classmethod317 def type_schema(cls) -> Type[schema.TypedSchema]:318 return SourceInstallerSchema319 def _build_and_install(self, code_path: PurePath) -> None:320 self._node.execute(321 "cargo build --release",322 shell=True,323 sudo=False,324 cwd=code_path,325 expected_exit_code=0,326 expected_exit_code_failure_message="Failed to build cloud-hypervisor",327 )328 self._node.execute(329 "cp ./target/release/cloud-hypervisor /usr/local/bin",330 shell=True,331 sudo=True,332 cwd=code_path,333 )334 self._node.execute(335 "chmod a+rx /usr/local/bin/cloud-hypervisor",336 shell=True,337 sudo=True,338 )339 self._node.execute(340 "setcap cap_net_admin+ep /usr/local/bin/cloud-hypervisor",341 shell=True,342 sudo=True,343 )344 def _install_dependencies(self) -> None:345 linux: Linux = cast(Linux, self._node.os)346 packages_list = self._distro_package_mapping[type(linux).__name__]347 linux.install_packages(packages_list)348 self._node.execute(349 "curl https://sh.rustup.rs -sSf | sh -s -- -y",350 shell=True,351 sudo=False,352 expected_exit_code=0,353 expected_exit_code_failure_message="Failed to install Rust & Cargo",354 )355 self._node.execute("source ~/.cargo/env", shell=True, sudo=False)356 if isinstance(self._node.os, Ubuntu):357 output = self._node.execute("echo $HOME", shell=True)358 path = self._node.get_pure_path(output.stdout)359 self._node.execute(360 "cp .cargo/bin/* /usr/local/bin/",361 shell=True,362 sudo=True,363 cwd=path,364 )365 self._node.execute("cargo --version", shell=True)366 def install(self) -> str:367 runbook: SourceInstallerSchema = self.runbook368 self._log.info("Installing dependencies for Cloudhypervisor...")369 self._install_dependencies()370 self._log.info("Cloning source code of Cloudhypervisor ...")371 code_path = _get_source_code(runbook, self._node, self.type_name(), self._log)372 self._log.info("Building source code of Cloudhypervisor...")373 self._build_and_install(code_path)374 return self._get_version()375class CloudHypervisorBinaryInstaller(CloudHypervisorInstaller):376 _distro_package_mapping = {377 Ubuntu.__name__: ["jq"],378 CBLMariner.__name__: ["jq"],379 }380 @classmethod381 def type_name(cls) -> str:382 return "binary"383 @classmethod384 def type_schema(cls) -> Type[schema.TypedSchema]:385 return BaseInstallerSchema386 def install(self) -> str:387 linux: Linux = cast(Linux, self._node.os)388 packages_list = self._distro_package_mapping[type(linux).__name__]389 self._log.info(f"installing packages: {packages_list}")390 linux.install_packages(packages_list)391 command = (392 "curl -s https://api.github.com/repos/cloud-hypervisor/"393 "cloud-hypervisor/releases/latest | jq -r '.tag_name'"394 )395 latest_release_tag = self._node.execute(command, shell=True)396 self._log.debug(f"latest tag: {latest_release_tag}")397 wget = self._node.tools[Wget]398 file_url = (399 "https://github.com/cloud-hypervisor/cloud-hypervisor/"400 f"releases/download/{latest_release_tag}/cloud-hypervisor"401 )402 file_path = wget.get(403 url=file_url,404 executable=True,405 filename="cloud-hypervisor",406 )407 self._node.execute(f"cp {file_path} /usr/local/bin", sudo=True)408 self._node.execute(409 "chmod a+rx /usr/local/bin/cloud-hypervisor",410 shell=True,411 sudo=True,412 )413 self._node.execute(414 "setcap cap_net_admin+ep /usr/local/bin/cloud-hypervisor",415 sudo=True,416 )417 return self._get_version()418def _get_source_code(419 runbook: SourceInstallerSchema, node: Node, default_name: str, log: Logger420) -> PurePath:421 code_path = node.working_path422 log.debug(f"cloning code from {runbook.repo} to {code_path}...")423 git = node.tools[Git]424 code_path = git.clone(url=runbook.repo, cwd=code_path, ref=runbook.ref)425 return code_path426def _install_libvirt(runbook: schema.TypedSchema, node: Node, log: Logger) -> None:427 libvirt_factory = subclasses.Factory[LibvirtInstaller](LibvirtInstaller)428 libvirt_installer = libvirt_factory.create_by_runbook(429 runbook=runbook,430 node=node,431 log=log,432 )433 if not libvirt_installer._is_installed():434 libvirt_installer.validate()435 libvirt_version = libvirt_installer.install()436 log.info(f"installed libvirt version: {libvirt_version}")437 if isinstance(node.os, Ubuntu):438 node.execute("systemctl disable apparmor", shell=True, sudo=True)439 node.execute("systemctl enable libvirtd", shell=True, sudo=True)440 node.reboot()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!