Best Python code snippet using lisa_python
xfstesting.py
Source:xfstesting.py
# ... (excerpt begins mid-function; the enclosing definition is not shown,
# so its visible tail is preserved verbatim below rather than reconstructed)
#     node.execute("sync")
#     for disk, mount_point in disk_mount.items():
#         mkfs.format_disk(disk, file_system)
#         node.execute(f"mkdir {mount_point}", sudo=True)


def _get_smb_version(node: Node) -> str:
    """Return the SMB protocol version string to use on *node*.

    "3.1.1" is returned when the node's kernel config reports
    ``CONFIG_CIFS_SMB311`` as enabled; otherwise "3.0".
    """
    smb311_enabled = node.tools[KernelConfig].is_enabled("CONFIG_CIFS_SMB311")
    return "3.1.1" if smb311_enabled else "3.0"


def _prepare_azure_file_share(
    node: Node,
    account_credential: Dict[str, str],
    test_folders_share_dict: Dict[str, str],
    fstab_info: str,
) -> None:
    """Write the CIFS credential file and fstab entries on *node*.

    Args:
        node: node on which the commands are executed.
        account_credential: must contain the keys ``account_name`` and
            ``account_key``; they are written as ``username=`` and
            ``password=`` lines of ``/etc/smbcredentials/lisa.cred``.
        test_folders_share_dict: maps a local folder to create to the share
            that is put in the first fstab field for it.
        fstab_info: option string appended after ``cifs`` in each fstab line.
    """
    # Start from an empty credentials folder so the appends below do not
    # accumulate lines from an earlier run.
    folder_path = node.get_pure_path("/etc/smbcredentials")
    if node.shell.exists(folder_path):
        node.execute(f"rm -rf {folder_path}", sudo=True)
    node.shell.mkdir(folder_path)
    file_path = node.get_pure_path("/etc/smbcredentials/lisa.cred")
    echo = node.tools[Echo]
    username = account_credential["account_name"]
    password = account_credential["account_key"]
    # NOTE(review): the account key is written in plain text; confirm the
    # resulting file permissions and logging of this command are acceptable.
    echo.write_to_file(f"username={username}", file_path, sudo=True, append=True)
    echo.write_to_file(f"password={password}", file_path, sudo=True, append=True)
    # Keep a copy of the original fstab before appending the cifs entries.
    node.execute("cp -f /etc/fstab /etc/fstab_cifs", sudo=True)
    for folder_name, share in test_folders_share_dict.items():
        node.execute(f"mkdir {folder_name}", sudo=True)
        echo.write_to_file(
            f"{share} {folder_name} cifs {fstab_info}",
            node.get_pure_path("/etc/fstab"),
            sudo=True,
            append=True,
        )


@TestSuiteMetadata(
    area="storage",
    category="community",
    description="""
    This test suite is to validate different types of data disk on Linux VM
    using xfstests.
    """,
)
class Xfstesting(TestSuite):
    """xfstests-based validation of data disks, NVMe disks and file shares.

    Every disk test case follows the same pattern: install xfstests on the
    first node, pick the first raw disk, and hand the disk plus two partition
    paths derived from it to ``_execute_xfstests``.
    """

    # Use xfstests benchmark to test the different types of data disk,
    # it will run many cases, so the runtime is longer than usual case.
    TIME_OUT = 14400
    # TODO: will include btrfs/244 once the kernel contains below fix.
    # exclude btrfs/244 temporarily for below commit not picked up by distro vendor.
    # https://git.kernel.org/pub/scm/linux/kernel/git/next/linux-next.git/commit/fs/btrfs/volumes.c?id=e4571b8c5e9ffa1e85c0c671995bd4dcc5c75091 # noqa: E501
    # TODO: will include ext4/054 once the kernel contains below fix.
    # This is a regression test for three kernel commit:
    # 1. 0f2f87d51aebc (ext4: prevent partial update of the extent blocks)
    # 2. 9c6e071913792 (ext4: check for inconsistent extents between index
    #    and leaf block)
    # 3. 8dd27fecede55 (ext4: check for out-of-order index extents in
    #    ext4_valid_extent_entries())
    # TODO: will figure out the detailed reason of every excluded case.
    # Entries are space separated and use the "<group>/<number>" form.
    EXCLUDED_TESTS = (
        "generic/211 generic/430 generic/431 generic/434 xfs/438 xfs/490"
        + " btrfs/007 btrfs/178 btrfs/244 btrfs/262"
        + " xfs/030 xfs/032 xfs/050 xfs/052 xfs/106 xfs/107 xfs/122 xfs/132 xfs/138"
        + " xfs/144 xfs/148 xfs/175 xfs/191-input-validation xfs/289 xfs/293 xfs/424"
        + " xfs/432 xfs/500 xfs/508 xfs/512 xfs/514 xfs/515 xfs/516 xfs/518 xfs/521"
        + " xfs/528 xfs/544 ext4/054"
    )

    @TestCaseMetadata(
        description="""
        This test case will run generic xfstests testing against
        standard data disk with xfs type system.
        """,
        requirement=simple_requirement(
            disk=schema.DiskOptionSettings(
                disk_type=schema.DiskType.StandardHDDLRS,
                data_disk_iops=500,
                data_disk_count=search_space.IntRange(min=1),
            ),
        ),
        timeout=TIME_OUT,
        priority=3,
    )
    def xfstesting_generic_standard_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run xfstests with default settings on the first raw data disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        disk = node.features[Disk]
        data_disks = disk.get_raw_data_disks()
        # The two derived paths are the disk's 1st and 2nd partitions
        # (presumably test and scratch devices; confirm in _execute_xfstests).
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            data_disks[0],
            f"{data_disks[0]}1",
            f"{data_disks[0]}2",
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run xfs xfstests testing against
        standard data disk with xfs type system.
        """,
        requirement=simple_requirement(
            disk=schema.DiskOptionSettings(
                disk_type=schema.DiskType.StandardHDDLRS,
                data_disk_iops=500,
                data_disk_count=search_space.IntRange(min=1),
            ),
        ),
        timeout=TIME_OUT,
        priority=3,
    )
    def xfstesting_xfs_standard_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the xfs test type on the first raw data disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        disk = node.features[Disk]
        data_disks = disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            data_disks[0],
            f"{data_disks[0]}1",
            f"{data_disks[0]}2",
            test_type=FileSystem.xfs.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run ext4 xfstests testing against
        standard data disk with ext4 type system.
        """,
        requirement=simple_requirement(
            disk=schema.DiskOptionSettings(
                disk_type=schema.DiskType.StandardHDDLRS,
                data_disk_iops=500,
                data_disk_count=search_space.IntRange(min=1),
            ),
        ),
        timeout=TIME_OUT,
        priority=3,
    )
    def xfstesting_ext4_standard_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the ext4 test type, with ext4 as file system, on a data disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        disk = node.features[Disk]
        data_disks = disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            data_disks[0],
            f"{data_disks[0]}1",
            f"{data_disks[0]}2",
            file_system=FileSystem.ext4,
            test_type=FileSystem.ext4.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run btrfs xfstests testing against
        standard data disk with btrfs type system.
        """,
        requirement=simple_requirement(
            disk=schema.DiskOptionSettings(
                disk_type=schema.DiskType.StandardHDDLRS,
                data_disk_iops=500,
                data_disk_count=search_space.IntRange(min=1),
            ),
        ),
        timeout=TIME_OUT,
        priority=3,
    )
    def xfstesting_btrfs_standard_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the btrfs test type, with btrfs as file system, on a data disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        # Checked before installing xfstests so an unsupported node is
        # rejected without doing the installation work.
        self._check_btrfs_supported(node)
        xfstests = self._install_xfstests(node)
        disk = node.features[Disk]
        data_disks = disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            data_disks[0],
            f"{data_disks[0]}1",
            f"{data_disks[0]}2",
            file_system=FileSystem.btrfs,
            test_type=FileSystem.btrfs.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run generic xfstests testing against
        nvme data disk with xfs type system.
        """,
        timeout=TIME_OUT,
        priority=3,
        requirement=simple_requirement(
            supported_features=[Nvme],
        ),
    )
    def xfstesting_generic_nvme_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run xfstests with default settings on the first raw NVMe disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        nvme_disk = node.features[Nvme]
        nvme_data_disks = nvme_disk.get_raw_data_disks()
        # NVMe partition paths carry a "p" before the partition number.
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            nvme_data_disks[0],
            f"{nvme_data_disks[0]}p1",
            f"{nvme_data_disks[0]}p2",
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run xfs xfstests testing against
        nvme data disk with xfs type system.
        """,
        timeout=TIME_OUT,
        priority=3,
        requirement=simple_requirement(
            supported_features=[Nvme],
        ),
    )
    def xfstesting_xfs_nvme_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the xfs test type on the first raw NVMe disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        nvme_disk = node.features[Nvme]
        nvme_data_disks = nvme_disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            nvme_data_disks[0],
            f"{nvme_data_disks[0]}p1",
            f"{nvme_data_disks[0]}p2",
            test_type=FileSystem.xfs.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run ext4 xfstests testing against
        nvme data disk with ext4 type system.
        """,
        timeout=TIME_OUT,
        priority=3,
        requirement=simple_requirement(
            supported_features=[Nvme],
        ),
    )
    def xfstesting_ext4_nvme_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the ext4 test type, with ext4 as file system, on an NVMe disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        xfstests = self._install_xfstests(node)
        nvme_disk = node.features[Nvme]
        nvme_data_disks = nvme_disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            nvme_data_disks[0],
            f"{nvme_data_disks[0]}p1",
            f"{nvme_data_disks[0]}p2",
            file_system=FileSystem.ext4,
            test_type=FileSystem.ext4.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run btrfs xfstests testing against
        nvme data disk with btrfs type system.
        """,
        timeout=TIME_OUT,
        priority=3,
        requirement=simple_requirement(
            supported_features=[Nvme],
        ),
    )
    def xfstesting_btrfs_nvme_datadisk_validation(
        self, log_path: Path, result: TestResult
    ) -> None:
        """Run the btrfs test type, with btrfs as file system, on an NVMe disk."""
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        node = cast(RemoteNode, environment.nodes[0])
        # Checked before installing xfstests so an unsupported node is
        # rejected without doing the installation work.
        self._check_btrfs_supported(node)
        xfstests = self._install_xfstests(node)
        nvme_disk = node.features[Nvme]
        nvme_data_disks = nvme_disk.get_raw_data_disks()
        self._execute_xfstests(
            log_path,
            xfstests,
            result,
            nvme_data_disks[0],
            f"{nvme_data_disks[0]}p1",
            f"{nvme_data_disks[0]}p2",
            file_system=FileSystem.btrfs,
            test_type=FileSystem.btrfs.name,
            excluded_tests=self.EXCLUDED_TESTS,
        )

    @TestCaseMetadata(
        description="""
        This test case will run cifs xfstests testing against
        azure file share.
        """,
        requirement=simple_requirement(
            min_core_count=16,
            supported_platform_type=[AZURE],
        ),
        timeout=TIME_OUT,
        priority=3,
    )
    def xfstesting_azure_file_share_validation(
        self, log: Logger, log_path: Path, result: TestResult
    ) -> None:
        """Prepare a cifs xfstests run against an Azure file share.

        Raises:
            UnsupportedDistroException: the node's kernel config does not
                report ``CONFIG_CIFS`` as enabled.
        """
        environment = result.environment
        assert environment, "fail to get environment from testresult"
        assert isinstance(environment.platform, AzurePlatform)
        node = cast(RemoteNode, environment.nodes[0])
        if not node.tools[KernelConfig].is_enabled("CONFIG_CIFS"):
            raise UnsupportedDistroException(
                node.os, "current distro not enable cifs module."
            )
        xfstests = self._install_xfstests(node)
        version = _get_smb_version(node)
        # Same option set in two shapes: an fstab options field (with nofail)
        # and a "-o ..." string.
        fstab_info = (
            f"nofail,vers={version},credentials=/etc/smbcredentials/lisa.cred"
            ",dir_mode=0777,file_mode=0777,serverino"
        )
        mount_opts = (
            f"-o vers={version},credentials=/etc/smbcredentials/lisa.cred"
            ",dir_mode=0777,file_mode=0777,serverino"
        )
        platform = environment.platform
        information = environment.get_information()
        resource_group_name = information["resource_group_name"]
        location = information["location"]
        random_str = generate_random_chars(string.ascii_lowercase + string.digits, 10)
        storage_account_name = f"lisasc{random_str}"
        # ... (the remainder of this method is not shown in the excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!