Best Python code snippet using lisa_python
features.py
Source:features.py
...1060 add_disk_names = [managed_disk.name for managed_disk in managed_disks]1061 self.disks += add_disk_names1062 self._node.capability.disk.data_disk_count += len(managed_disks)1063 return add_disk_names1064 def remove_data_disk(self, names: Optional[List[str]] = None) -> None:1065 assert self._node.capability.disk1066 platform: AzurePlatform = self._platform # type: ignore1067 compute_client = get_compute_client(platform)1068 # if names is None, remove all data disks1069 if names is None:1070 names = self.disks1071 # detach managed disk1072 azure_platform: AzurePlatform = self._platform # type: ignore1073 vm = get_vm(azure_platform, self._node)1074 # remove managed disk1075 data_disks = vm.storage_profile.data_disks1076 data_disks[:] = [disk for disk in data_disks if disk.name not in names]1077 # update vm1078 async_vm_update = compute_client.virtual_machines.begin_create_or_update(...
storage.py
Source:storage.py
...350 disk = node.features[Disk]351 # cleanup any disks added as part of the test352 # If the cleanup operation fails, mark node to be recycled353 try:354 disk.remove_data_disk()355 except Exception:356 raise BadEnvironmentStateException357 def _hot_add_disk_serial(358 self, log: Logger, node: Node, type: DiskType, size: int359 ) -> None:360 disk = node.features[Disk]361 lsblk = node.tools[Lsblk]362 # get max data disk count for the node363 assert node.capability.disk364 assert isinstance(node.capability.disk.max_data_disk_count, int)365 max_data_disk_count = node.capability.disk.max_data_disk_count366 log.debug(f"max_data_disk_count: {max_data_disk_count}")367 # get the number of data disks already added to the vm368 assert isinstance(node.capability.disk.data_disk_count, int)369 current_data_disk_count = node.capability.disk.data_disk_count370 log.debug(f"current_data_disk_count: {current_data_disk_count}")371 # disks to be added to the vm372 disks_to_add = max_data_disk_count - current_data_disk_count373 # get partition info before adding data disk374 partitions_before_adding_disk = lsblk.get_disks(force_run=True)375 for _ in range(disks_to_add):376 # add data disk377 log.debug("Adding 1 managed disk")378 disks_added = disk.add_data_disk(1, type, size)379 # verify that partition count is increased by 1380 # and the size of partition is correct381 partitons_after_adding_disk = lsblk.get_disks(force_run=True)382 added_partitions = [383 item384 for item in partitons_after_adding_disk385 if item not in partitions_before_adding_disk386 ]387 log.debug(f"added_partitions: {added_partitions}")388 assert_that(added_partitions, "Data disk should be added").is_length(1)389 assert_that(390 added_partitions[0].size_in_gb,391 f"data disk { added_partitions[0].name} size should be equal to "392 f"{size} GB",393 ).is_equal_to(size)394 # remove data disk395 log.debug(f"Removing managed disk: {disks_added}")396 disk.remove_data_disk(disks_added)397 # verify that partition count 
is decreased by 1398 partition_after_removing_disk = lsblk.get_disks(force_run=True)399 added_partitions = [400 item401 for item in partitions_before_adding_disk402 if item not in partition_after_removing_disk403 ]404 assert_that(added_partitions, "data disks should not be present").is_length(405 0406 )407 def _hot_add_disk_parallel(408 self, log: Logger, node: Node, type: DiskType, size: int409 ) -> None:410 disk = node.features[Disk]411 lsblk = node.tools[Lsblk]412 # get max data disk count for the node413 assert node.capability.disk414 assert isinstance(node.capability.disk.max_data_disk_count, int)415 max_data_disk_count = node.capability.disk.max_data_disk_count416 log.debug(f"max_data_disk_count: {max_data_disk_count}")417 # get the number of data disks already added to the vm418 assert isinstance(node.capability.disk.data_disk_count, int)419 current_data_disk_count = node.capability.disk.data_disk_count420 log.debug(f"current_data_disk_count: {current_data_disk_count}")421 # disks to be added to the vm422 disks_to_add = max_data_disk_count - current_data_disk_count423 # get partition info before adding data disks424 partitions_before_adding_disks = lsblk.get_disks(force_run=True)425 # add data disks426 log.debug(f"Adding {disks_to_add} managed disks")427 disks_added = disk.add_data_disk(disks_to_add, type, size)428 # verify that partition count is increased by disks_to_add429 # and the size of partition is correct430 partitons_after_adding_disks = lsblk.get_disks(force_run=True)431 added_partitions = [432 item433 for item in partitons_after_adding_disks434 if item not in partitions_before_adding_disks435 ]436 log.debug(f"added_partitions: {added_partitions}")437 assert_that(438 added_partitions, f"{disks_to_add} disks should be added"439 ).is_length(disks_to_add)440 for partition in added_partitions:441 assert_that(442 partition.size_in_gb,443 f"data disk {partition.name} size should be equal to {size} GB",444 ).is_equal_to(size)445 # remove data disks446 
log.debug(f"Removing managed disks: {disks_added}")447 disk.remove_data_disk(disks_added)448 # verify that partition count is decreased by disks_to_add449 partition_after_removing_disk = lsblk.get_disks(force_run=True)450 added_partitions = [451 item452 for item in partitions_before_adding_disks453 if item not in partition_after_removing_disk454 ]455 assert_that(added_partitions, "data disks should not be present").is_length(0)456 def _get_managed_disk_id(self, identifier: str) -> str:457 return f"disk_{identifier}"458 def _get_mtab_mount_point_regex(self, mount_point: str) -> Pattern[str]:459 regex = re.compile(rf".*\s+\/dev\/(?P<partition>\D+).*\s+{mount_point}.*")...
disks.py
Source:disks.py
...40 type: schema.DiskType = schema.DiskType.StandardHDDLRS,41 size_in_gb: int = 20,42 ) -> List[str]:43 raise NotImplementedError44 def remove_data_disk(self, names: Optional[List[str]] = None) -> None:45 raise NotImplementedError46 def _initialize(self, *args: Any, **kwargs: Any) -> None:47 self.disks: List[str] = []48DiskEphemeral = partial(schema.DiskOptionSettings, disk_type=schema.DiskType.Ephemeral)49DiskPremiumSSDLRS = partial(50 schema.DiskOptionSettings, disk_type=schema.DiskType.PremiumSSDLRS51)52DiskStandardHDDLRS = partial(53 schema.DiskOptionSettings, disk_type=schema.DiskType.StandardHDDLRS54)55DiskStandardSSDLRS = partial(56 schema.DiskOptionSettings, disk_type=schema.DiskType.StandardSSDLRS...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, TestNG, etc.
You could also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!