Best Python code snippet using lisa_python
storage.py
Source:storage.py
# NOTE: this excerpt starts in the middle of a method whose `def` line is not
# visible, and every function below takes `self`, so they are presumably
# methods of a test-suite class declared outside the excerpt. The leading
# fragment is preserved verbatim as comments instead of being guessed at.
#
# ...
#             .disk
#         )
#         mtab = node.tools[Cat].run("/etc/mtab").stdout
#         resource_disk_from_mtab = get_matched_str(
#             mtab, self._get_mtab_mount_point_regex(resource_disk_mount_point)
#         )
#         assert (
#             resource_disk_from_mtab
#         ), f"resource disk mountpoint not found {resource_disk_mount_point}"
#         assert_that(
#             resource_disk_from_mtab, "Resource disk should not be equal to os disk"
#         ).is_not_equal_to(os_disk)


@TestCaseMetadata(
    description="""
    This test will check that the swap is correctly configured on the VM.
    Steps:
    1. Check if swap file/partition is configured by checking the output of
    `swapon -s` and `lsblk`.
    2. Check swap status in `waagent.conf`.
    3. Verify that truth value in step 1 and step 2 match.
    """,
    priority=1,
    requirement=simple_requirement(supported_platform_type=[AZURE]),
)
def verify_swap(self, node: RemoteNode) -> None:
    """Assert that the Waagent and Swap tools agree on whether swap is enabled."""
    is_swap_enabled_wa_agent = node.tools[Waagent].is_swap_enabled()
    is_swap_enabled_distro = node.tools[Swap].is_swap_enabled()
    assert_that(
        is_swap_enabled_distro,
        "swap configuration from waagent.conf and distro should match",
    ).is_equal_to(is_swap_enabled_wa_agent)


@TestCaseMetadata(
    description="""
    This test will check that the file IO operations are working correctly
    Steps:
    1. Get the mount point for the resource disk. If `/var/log/cloud-init.log`
    file is present, mount location is `/mnt`, otherwise it is obtained from
    `ResourceDisk.MountPoint` entry in `waagent.conf` configuration file.
    2. Verify that resource disk is mounted from the output of `mount` command.
    3. Write a text file to the resource disk.
    4. Read the text file and verify that content is same.
    """,
    priority=1,
    requirement=simple_requirement(
        disk=AzureDiskOptionSettings(has_resource_disk=True),
        supported_platform_type=[AZURE],
    ),
)
def verify_resource_disk_io(self, log: Logger, node: RemoteNode) -> None:
    """Write a text file to the resource disk and assert it reads back unchanged."""
    resource_disk_mount_point = get_resource_disk_mount_point(log, node)

    # verify that resource disk is mounted
    # function returns successfully if disk matching mount point is present
    node.features[Disk].get_partition_with_mount_point(resource_disk_mount_point)

    file_path = f"{resource_disk_mount_point}/sample.txt"
    original_text = "Writing to resource disk!!!"

    # write content to the file
    node.tools[Echo].write_to_file(
        original_text, node.get_pure_path(file_path), sudo=True
    )

    # read content from the file; force_run=True is passed so the read is not
    # served from a previously cached result (presumably -- confirm in Cat tool)
    read_text = node.tools[Cat].read(file_path, force_run=True, sudo=True)

    assert_that(
        read_text,
        "content read from file should be equal to content written to file",
    ).is_equal_to(original_text)


@TestCaseMetadata(
    description="""
    This test will verify that identifier of root partition matches
    from different sources.
    Steps:
    1. Get the partition identifier from `blkid` command.
    2. Verify that the partition identifier from `blkid` is present in dmesg.
    3. Verify that the partition identifier from `blkid` is present in fstab output.
    """,
    priority=1,
    requirement=simple_requirement(
        supported_platform_type=[AZURE],
    ),
)
def verify_os_partition_identifier(self, log: Logger, node: RemoteNode) -> None:
    """Check that the root partition's blkid identifiers appear in dmesg and fstab.

    Raises:
        LisaException: if none of the name / UUID / PARTUUID forms is found in
            the dmesg output, or none is found in `/etc/fstab`.
    """
    # get information of root disk from blkid
    os_partition = (
        node.features[Disk]
        .get_partition_with_mount_point(self.os_disk_mount_point)
        .name
    )
    os_partition_info = node.tools[Blkid].get_partition_info_by_name(os_partition)

    # verify that root=<name> or root=uuid=<uuid> or root=partuuid=<part_uuid> is
    # present in dmesg
    dmesg = node.tools[Dmesg].run(sudo=True).stdout
    dmesg_patterns = [
        re.compile(rf".*BOOT_IMAGE=.*root={os_partition_info.name}"),
        re.compile(rf".*BOOT_IMAGE=.*root=UUID={os_partition_info.uuid}"),
        re.compile(
            rf".*BOOT_IMAGE=.*root=PARTUUID={os_partition_info.part_uuid}"
        ),
    ]
    if not any(get_matched_str(dmesg, pattern) for pattern in dmesg_patterns):
        raise LisaException(
            f"One of root={os_partition_info.name} or "
            f"root=UUID={os_partition_info.uuid} or "
            f"root=PARTUUID={os_partition_info.part_uuid} "
            "should be present in dmesg output"
        )

    # verify that "<uuid> /" or "<name> /"or "<part_uuid> /" present in /etc/fstab
    fstab = node.tools[Cat].run("/etc/fstab", sudo=True).stdout
    fstab_patterns = [
        re.compile(rf".*{os_partition_info.name}\s+/"),
        re.compile(rf".*UUID={os_partition_info.uuid}\s+/"),
        re.compile(rf".*PARTUUID={os_partition_info.part_uuid}\s+/"),
    ]
    if not any(get_matched_str(fstab, pattern) for pattern in fstab_patterns):
        # Every segment that interpolates a value needs its own `f` prefix;
        # without it the braces are emitted literally in the error message.
        raise LisaException(
            f"One of '{os_partition_info.name} /' or "
            f"'UUID={os_partition_info.uuid} /' or "
            f"'PARTUUID={os_partition_info.part_uuid} /' should be present in fstab"
        )


@TestCaseMetadata(
    description="""
    This test case will verify that the standard hdd data disks disks can
    be added one after other (serially) while the vm is running.
    Steps:
    1. Get maximum number of data disk for the current vm_size.
    2. Get the number of data disks already added to the vm.
    3. Serially add and remove the data disks and verify that the added
    disks are present in the vm.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskStandardHDDLRS()),
)
def hot_add_disk_serial(self, log: Logger, node: Node) -> None:
    """Run the serial hot-add scenario with standard HDD data disks."""
    self._hot_add_disk_serial(
        log, node, DiskType.StandardHDDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify that the standard ssd data disks disks can
    be added serially while the vm is running. The test steps are same as
    `hot_add_disk_serial`.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskStandardSSDLRS()),
)
def hot_add_disk_serial_standard_ssd(self, log: Logger, node: Node) -> None:
    """Run the serial hot-add scenario with standard SSD data disks."""
    self._hot_add_disk_serial(
        log, node, DiskType.StandardSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify that the premium ssd data disks disks can
    be added serially while the vm is running. The test steps are same as
    `hot_add_disk_serial`.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskPremiumSSDLRS()),
)
def hot_add_disk_serial_premium_ssd(self, log: Logger, node: Node) -> None:
    """Run the serial hot-add scenario with premium SSD data disks."""
    self._hot_add_disk_serial(
        log, node, DiskType.PremiumSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify that the standard HDD data disks can
    be added in one go (parallel) while the vm is running.
    Steps:
    1. Get maximum number of data disk for the current vm_size.
    2. Get the number of data disks already added to the vm.
    3. Add maximum number of data disks to the VM in parallel.
    4. Verify that the disks are added are available in the OS.
    5. Remove the disks from the vm in parallel.
    6. Verify that the disks are removed from the OS.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskStandardHDDLRS()),
)
def hot_add_disk_parallel(self, log: Logger, node: Node) -> None:
    """Run the parallel hot-add scenario with standard HDD data disks."""
    self._hot_add_disk_parallel(
        log, node, DiskType.StandardHDDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify that the standard ssd data disks disks can
    be added in parallel while the vm is running. The test steps are same as
    `hot_add_disk_parallel`.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskStandardSSDLRS()),
)
def hot_add_disk_parallel_standard_ssd(self, log: Logger, node: Node) -> None:
    """Run the parallel hot-add scenario with standard SSD data disks."""
    self._hot_add_disk_parallel(
        log, node, DiskType.StandardSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify that the premium ssd data disks disks can
    be added in parallel while the vm is running. The test steps are same as
    `hot_add_disk_parallel`.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(disk=DiskPremiumSSDLRS()),
)
def hot_add_disk_parallel_premium_ssd(self, log: Logger, node: Node) -> None:
    """Run the parallel hot-add scenario with premium SSD data disks."""
    self._hot_add_disk_parallel(
        log, node, DiskType.PremiumSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB
    )


@TestCaseMetadata(
    description="""
    This test case will verify mount azure nfs on guest successfully.
    """,
    timeout=TIME_OUT,
    requirement=simple_requirement(supported_features=[Nfs]),
    priority=2,
)
def verify_azure_file_share_nfs(self, log: Logger, node: Node) -> None:
    """Create a file share via the Nfs feature and mount it on the node.

    Raises:
        LisaException: if `NFSClient.setup` fails; the original exception is
            attached as the cause.
    """
    nfs = node.features[Nfs]
    mount_dir = "/mount/azure_share"

    nfs.create_share()
    storage_account_name = nfs.storage_account_name
    mount_nfs = f"{storage_account_name}.file.core.windows.net"
    server_shared_dir = f"{storage_account_name}/{nfs.file_share_name}"
    try:
        node.tools[NFSClient].setup(
            mount_nfs,
            server_shared_dir,
            mount_dir,
        )
    except Exception as identifier:
        # The separator after the mount dir keeps the exception class name
        # from being glued onto the path in the message.
        raise LisaException(
            f"fail to mount {server_shared_dir} into {mount_dir}. "
            f"{identifier.__class__.__name__}: {identifier}."
        ) from identifier
    finally:
        # NOTE(review): the share is deleted before the client is stopped, and
        # a failure in delete_share() skips stop(); confirm this order is
        # intended.
        nfs.delete_share()
        node.tools[NFSClient].stop(mount_dir)


def after_case(self, log: Logger, **kwargs: Any) -> None:
    """Remove data disks left over from the test case.

    Raises:
        BadEnvironmentStateException: if removing the data disks fails.
    """
    node: Node = kwargs["node"]
    disk = node.features[Disk]
    # cleanup any disks added as part of the test
    # If the cleanup operation fails, mark node to be recycled
    try:
        disk.remove_data_disk()
    except Exception:
        raise BadEnvironmentStateException


def _get_disks_to_add(self, log: Logger, node: Node) -> int:
    """Return how many more data disks the node's capability allows."""
    # get max data disk count for the node
    assert node.capability.disk
    assert isinstance(node.capability.disk.max_data_disk_count, int)
    max_data_disk_count = node.capability.disk.max_data_disk_count
    log.debug(f"max_data_disk_count: {max_data_disk_count}")

    # get the number of data disks already added to the vm
    assert isinstance(node.capability.disk.data_disk_count, int)
    current_data_disk_count = node.capability.disk.data_disk_count
    log.debug(f"current_data_disk_count: {current_data_disk_count}")

    return max_data_disk_count - current_data_disk_count


def _hot_add_disk_serial(
    self, log: Logger, node: Node, type: DiskType, size: int
) -> None:
    """Add and remove data disks one at a time, verifying each step via lsblk.

    Args:
        log: logger used for debug output.
        node: node whose Disk feature and Lsblk tool are used.
        type: disk type passed to `add_data_disk`.
        size: disk size passed to `add_data_disk` and compared against the
            `size_in_gb` of the disk that appears.
    """
    disk = node.features[Disk]
    lsblk = node.tools[Lsblk]

    # disks to be added to the vm
    disks_to_add = self._get_disks_to_add(log, node)

    # get partition info before adding data disk
    partitions_before_adding_disk = lsblk.get_disks(force_run=True)

    for _ in range(disks_to_add):
        # add data disk
        log.debug("Adding 1 managed disk")
        disks_added = disk.add_data_disk(1, type, size)

        # verify that partition count is increased by 1
        # and the size of partition is correct
        partitions_after_adding_disk = lsblk.get_disks(force_run=True)
        added_partitions = [
            item
            for item in partitions_after_adding_disk
            if item not in partitions_before_adding_disk
        ]
        log.debug(f"added_partitions: {added_partitions}")
        assert_that(added_partitions, "Data disk should be added").is_length(1)
        assert_that(
            added_partitions[0].size_in_gb,
            f"data disk {added_partitions[0].name} size should be equal to "
            f"{size} GB",
        ).is_equal_to(size)

        # remove data disk
        log.debug(f"Removing managed disk: {disks_added}")
        disk.remove_data_disk(disks_added)

        # verify that partition count is decreased by 1
        partitions_after_removing_disk = lsblk.get_disks(force_run=True)
        # disks that were present before the test must not have disappeared
        missing_partitions = [
            item
            for item in partitions_before_adding_disk
            if item not in partitions_after_removing_disk
        ]
        assert_that(
            missing_partitions,
            "disks present before the test should still be present",
        ).is_length(0)
        # "after minus before" is the direction that actually detects a
        # hot-added disk that is still visible after removal
        leftover_partitions = [
            item
            for item in partitions_after_removing_disk
            if item not in partitions_before_adding_disk
        ]
        assert_that(
            leftover_partitions, "data disks should not be present"
        ).is_length(0)


def _hot_add_disk_parallel(
    self, log: Logger, node: Node, type: DiskType, size: int
) -> None:
    """Add all allowed data disks in one call, verify them, then remove them.

    Args:
        log: logger used for debug output.
        node: node whose Disk feature and Lsblk tool are used.
        type: disk type passed to `add_data_disk`.
        size: disk size passed to `add_data_disk` and compared against the
            `size_in_gb` of every disk that appears.
    """
    disk = node.features[Disk]
    lsblk = node.tools[Lsblk]

    # disks to be added to the vm
    disks_to_add = self._get_disks_to_add(log, node)

    # get partition info before adding data disks
    partitions_before_adding_disks = lsblk.get_disks(force_run=True)

    # add data disks
    log.debug(f"Adding {disks_to_add} managed disks")
    disks_added = disk.add_data_disk(disks_to_add, type, size)

    # verify that partition count is increased by disks_to_add
    # and the size of partition is correct
    partitions_after_adding_disks = lsblk.get_disks(force_run=True)
    added_partitions = [
        item
        for item in partitions_after_adding_disks
        if item not in partitions_before_adding_disks
    ]
    log.debug(f"added_partitions: {added_partitions}")
    assert_that(
        added_partitions, f"{disks_to_add} disks should be added"
    ).is_length(disks_to_add)
    for partition in added_partitions:
        assert_that(
            partition.size_in_gb,
            f"data disk {partition.name} size should be equal to {size} GB",
        ).is_equal_to(size)

    # remove data disks
    log.debug(f"Removing managed disks: {disks_added}")
    disk.remove_data_disk(disks_added)

    # verify that partition count is decreased by disks_to_add
    partitions_after_removing_disks = lsblk.get_disks(force_run=True)
    # disks that were present before the test must not have disappeared
    missing_partitions = [
        item
        for item in partitions_before_adding_disks
        if item not in partitions_after_removing_disks
    ]
    assert_that(
        missing_partitions,
        "disks present before the test should still be present",
    ).is_length(0)
    # "after minus before" is the direction that actually detects hot-added
    # disks that are still visible after removal
    leftover_partitions = [
        item
        for item in partitions_after_removing_disks
        if item not in partitions_before_adding_disks
    ]
    assert_that(leftover_partitions, "data disks should not be present").is_length(0)


def _get_managed_disk_id(self, identifier: str) -> str:
    """Return `identifier` prefixed with `disk_`."""
    return f"disk_{identifier}"


def _get_mtab_mount_point_regex(self, mount_point: str) -> Pattern[str]:
    # NOTE: the source snippet is cut off right after this assignment (the
    # trailing `...`); the remainder of the method is not visible and is left
    # as-is rather than guessed at -- presumably it returns `regex`.
    regex = re.compile(rf".*\s+\/dev\/(?P<partition>\D+).*\s+{mount_point}.*")
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!