Best Python code snippet using autotest_python
test_mountinfo.py
Source:test_mountinfo.py
...16 def setUp(self):17 LandscapeTest.setUp(self)18 self.mstore.set_accepted_types(["mount-info", "free-space"])19 self.log_helper.ignore_errors("Typelib file for namespace")20 def get_mount_info(self, *args, **kwargs):21 if "mounts_file" not in kwargs:22 kwargs["mounts_file"] = self.makeFile("/dev/hda1 / ext3 rw 0 0\n")23 if "mtab_file" not in kwargs:24 kwargs["mtab_file"] = self.makeFile("/dev/hda1 / ext3 rw 0 0\n")25 if "statvfs" not in kwargs:26 kwargs["statvfs"] = lambda path: os.statvfs_result(27 (0, 0, 0, 0, 0, 0, 0, 0, 0, 0))28 plugin = MountInfo(*args, **kwargs)29 # To make sure tests are isolated from the real system by default.30 plugin.is_device_removable = lambda x: False31 return plugin32 def test_read_proc_mounts(self):33 """34 When the mount info plugin runs it reads data from35 /proc/mounts to discover mounts and calls os.statvfs() to36 retrieve current data for each mount point. This test makes37 sure that os.statvfs() is called without failing, that38 /proc/mounts is readable, and that messages with the expected39 datatypes are generated.40 """41 plugin = self.get_mount_info(create_time=self.reactor.time)42 self.monitor.add(plugin)43 self.reactor.advance(self.monitor.step_size)44 message = plugin.create_mount_info_message()45 self.assertTrue(message)46 self.assertEqual(message["type"], "mount-info")47 self.assertTrue("mount-info" in message)48 self.assertTrue(len(message["mount-info"]) > 0)49 keys = set(["filesystem", "total-space", "device", "mount-point"])50 for now, mount_info in message["mount-info"]:51 self.assertEqual(set(mount_info.keys()), keys)52 self.assertTrue(isinstance(mount_info["filesystem"], basestring))53 self.assertTrue(isinstance(mount_info["device"], basestring))54 self.assertTrue(isinstance(mount_info["total-space"], (int, long)))55 self.assertTrue(isinstance(mount_info["mount-point"], basestring))56 def test_read_sample_data(self):57 """58 Sample data is used to ensure that the free space included in59 the message is calculated correctly.60 """61 def statvfs(path):62 if path == "/":63 return os.statvfs_result(64 (4096, 0, mb(1000), mb(100), 0, 0, 0, 0, 0, 0))65 else:66 return os.statvfs_result(67 (4096, 0, mb(10000), mb(1000), 0, 0, 0, 0, 0, 0))68 filename = self.makeFile("""\69rootfs / rootfs rw 0 070none /dev ramfs rw 0 071/dev/hda1 / ext3 rw 0 072/dev/hda1 /dev/.static/dev ext3 rw 0 073proc /proc proc rw,nodiratime 0 074sysfs /sys sysfs rw 0 075usbfs /proc/bus/usb usbfs rw 0 076devpts /dev/pts devpts rw 0 077tmpfs /dev/shm tmpfs rw 0 078tmpfs /lib/modules/2.6.12-10-386/volatile tmpfs rw 0 079/dev/hde1 /mnt/hde1 reiserfs rw 0 080/dev/hde1 /mnt/bind reiserfs rw 0 081/dev/sdb2 /media/Boot\\040OSX hfsplus nls=utf8 0 082""")83 mtab_filename = self.makeFile("""\84rootfs / rootfs rw 0 085none /dev ramfs rw 0 086/dev/hda1 / ext3 rw 0 087/dev/hda1 /dev/.static/dev ext3 rw 0 088proc /proc proc rw,nodiratime 0 089sysfs /sys sysfs rw 0 090usbfs /proc/bus/usb usbfs rw 0 091devpts /dev/pts devpts rw 0 092tmpfs /dev/shm tmpfs rw 0 093tmpfs /lib/modules/2.6.12-10-386/volatile tmpfs rw 0 094/dev/hde1 /mnt/hde1 reiserfs rw 0 095/dev/hde1 /mnt/bind none rw,bind 0 096/dev/sdb2 /media/Boot\\040OSX hfsplus rw 0 097""")98 plugin = self.get_mount_info(mounts_file=filename, statvfs=statvfs,99 create_time=self.reactor.time,100 mtab_file=mtab_filename)101 self.monitor.add(plugin)102 self.reactor.advance(self.monitor.step_size)103 message = plugin.create_mount_info_message()104 self.assertTrue(message)105 self.assertEqual(message["type"], "mount-info")106 mount_info = message.get("mount-info", ())107 self.assertEqual(len(mount_info), 3)108 self.assertEqual(mount_info[0][1],109 {"device": "/dev/hda1", "mount-point": "/",110 "filesystem": "ext3", "total-space": 4096000})111 self.assertEqual(mount_info[1][1],112 {"device": "/dev/hde1", "mount-point": "/mnt/hde1",113 "filesystem": "reiserfs", "total-space": 40960000})114 self.assertEqual(115 mount_info[2][1],116 {"device": "/dev/sdb2", "mount-point": "/media/Boot OSX",117 "filesystem": "hfsplus", "total-space": 40960000})118 def test_read_changing_total_space(self):119 """120 Total space measurements are only sent when (a) none have ever121 been sent, or (b) the value has changed since the last time122 data was collected. The test sets the mount info plugin123 interval to the same value as the step size and advances the124 reactor such that the plugin will be run twice. Each time it125 runs it gets a different value from our sample statvfs()126 function which should cause it to queue new messages.127 """128 counter = mock_counter(1)129 def statvfs(path, multiplier=lambda: next(counter)):130 return os.statvfs_result(131 (4096, 0, mb(multiplier() * 1000), mb(100), 0, 0, 0, 0, 0, 0))132 plugin = self.get_mount_info(133 statvfs=statvfs, create_time=self.reactor.time,134 interval=self.monitor.step_size)135 self.monitor.add(plugin)136 self.reactor.advance(self.monitor.step_size * 2)137 message = plugin.create_mount_info_message()138 mount_info = message["mount-info"]139 self.assertEqual(len(mount_info), 2)140 for i, total_space in enumerate([4096000, 8192000]):141 self.assertEqual(mount_info[i][0],142 (i + 1) * self.monitor.step_size)143 self.assertEqual(mount_info[i][1],144 {"device": "/dev/hda1", "filesystem": "ext3",145 "mount-point": "/", "total-space": total_space})146 def test_read_disjointed_changing_total_space(self):147 """148 Total space measurements are only sent when (a) none have ever149 been sent, or (b) the value has changed since the last time150 data was collected. This test ensures that the (b) criteria151 is checked per-mount point. The sample statvfs() function152 only provides changing total space for /; therefore, new153 messages should only be queued for / after the first message154 is created.155 """156 counter = mock_counter(1)157 def statvfs(path, multiplier=lambda: next(counter)):158 if path == "/":159 return os.statvfs_result(160 (4096, 0, mb(1000), mb(100), 0, 0, 0, 0, 0, 0))161 return os.statvfs_result(162 (4096, 0, mb(multiplier() * 1000), mb(100), 0, 0, 0, 0, 0, 0))163 filename = self.makeFile("""\164/dev/hda1 / ext3 rw 0 0165/dev/hde1 /mnt/hde1 ext3 rw 0 0166""")167 plugin = self.get_mount_info(mounts_file=filename, statvfs=statvfs,168 create_time=self.reactor.time,169 interval=self.monitor.step_size,170 mtab_file=filename)171 self.monitor.add(plugin)172 self.reactor.advance(self.monitor.step_size * 2)173 message = plugin.create_mount_info_message()174 self.assertTrue(message)175 mount_info = message.get("mount-info", ())176 self.assertEqual(len(mount_info), 3)177 self.assertEqual(mount_info[0][0], self.monitor.step_size)178 self.assertEqual(mount_info[0][1],179 {"device": "/dev/hda1", "mount-point": "/",180 "filesystem": "ext3", "total-space": 4096000})181 self.assertEqual(mount_info[1][0], self.monitor.step_size)182 self.assertEqual(mount_info[1][1],183 {"device": "/dev/hde1", "mount-point": "/mnt/hde1",184 "filesystem": "ext3", "total-space": 4096000})185 self.assertEqual(mount_info[2][0], self.monitor.step_size * 2)186 self.assertEqual(mount_info[2][1],187 {"device": "/dev/hde1", "mount-point": "/mnt/hde1",188 "filesystem": "ext3", "total-space": 8192000})189 def test_exchange_messages(self):190 """191 The mount_info plugin queues message when manager.exchange()192 is called. Each message should be aligned to a step boundary;193 messages collected bewteen exchange periods should be194 delivered in a single message.195 """196 plugin = self.get_mount_info(197 statvfs=statvfs_result_fixture, create_time=self.reactor.time)198 step_size = self.monitor.step_size199 self.monitor.add(plugin)200 # Exchange should trigger a flush of the persist database201 plugin.registry.flush = mock.Mock()202 self.reactor.advance(step_size * 2)203 self.monitor.exchange()204 messages = self.mstore.get_pending_messages()205 self.assertEqual(len(messages), 2)206 message = [d for d in messages if d["type"] == "free-space"][0]207 free_space = message["free-space"]208 for i in range(len(free_space)):209 self.assertEqual(free_space[i][0], (i + 1) * step_size)210 self.assertEqual(free_space[i][1], "/")211 self.assertEqual(free_space[i][2], 409600)212 plugin.registry.flush.assert_called_with()213 def test_messaging_flushes(self):214 """215 Duplicate message should never be created. If no data is216 available, None will be returned when messages are created.217 """218 plugin = self.get_mount_info(219 statvfs=statvfs_result_fixture, create_time=self.reactor.time)220 self.monitor.add(plugin)221 self.reactor.advance(self.monitor.step_size)222 messages = plugin.create_messages()223 self.assertEqual(len(messages), 2)224 messages = plugin.create_messages()225 self.assertEqual(len(messages), 0)226 def test_read_multi_bound_mounts(self):227 """228 The mount info plugin should handle multi-bound mount points229 by reporting them only once. In practice, this test doesn't230 really test anything since the current behaviour is to ignore231 any mount point for which the device doesn't start with /dev.232 """233 filename = self.makeFile("""\234/dev/hdc4 /mm xfs rw 0 0235/mm/ubuntu-mirror /home/dchroot/warty/mirror none bind 0 0236/mm/ubuntu-mirror /home/dchroot/hoary/mirror none bind 0 0237/mm/ubuntu-mirror /home/dchroot/breezy/mirror none bind 0 0238""")239 plugin = self.get_mount_info(mounts_file=filename,240 statvfs=statvfs_result_fixture,241 create_time=self.reactor.time,242 mtab_file=filename)243 step_size = self.monitor.step_size244 self.monitor.add(plugin)245 self.reactor.advance(step_size)246 message = plugin.create_mount_info_message()247 self.assertTrue(message)248 mount_info = message.get("mount-info", ())249 self.assertEqual(len(mount_info), 1)250 self.assertEqual(mount_info[0][0], step_size)251 self.assertEqual(mount_info[0][1],252 {"device": "/dev/hdc4", "mount-point": "/mm",253 "filesystem": "xfs", "total-space": 4096000})254 def test_ignore_nfs_mounts(self):255 """256 The mount info plugin should only report data about local257 mount points.258 """259 filename = self.makeFile("""\260ennui:/data /data nfs rw,v3,rsize=32768,wsize=32768,hard,lock,proto=udp,\261addr=ennui 0 0262""")263 plugin = self.get_mount_info(mounts_file=filename, mtab_file=filename)264 self.monitor.add(plugin)265 plugin.run()266 message = plugin.create_mount_info_message()267 self.assertEqual(message, None)268 def test_ignore_removable_partitions(self):269 """270 "Removable" partitions are not reported to the server.271 """272 plugin = self.get_mount_info()273 plugin.is_device_removable = lambda x: True # They are all removable274 self.monitor.add(plugin)275 plugin.run()276 message = plugin.create_mount_info_message()277 self.assertEqual(message, None)278 def test_sample_free_space(self):279 """Test collecting information about free space."""280 counter = mock_counter(1)281 def statvfs(path, multiplier=lambda: next(counter)):282 return os.statvfs_result(283 (4096, 0, mb(1000), mb(multiplier() * 100), 0, 0, 0, 0, 0, 0))284 plugin = self.get_mount_info(285 statvfs=statvfs, create_time=self.reactor.time)286 step_size = self.monitor.step_size287 self.monitor.add(plugin)288 self.reactor.advance(step_size)289 message = plugin.create_free_space_message()290 self.assertTrue(message)291 self.assertEqual(message.get("type"), "free-space")292 free_space = message.get("free-space", ())293 self.assertEqual(len(free_space), 1)294 self.assertEqual(free_space[0], (step_size, "/", 409600))295 def test_never_exchange_empty_messages(self):296 """297 When the plugin has no data, it's various create_X_message()298 methods will return None. Empty or null messages should never299 be queued.300 """301 self.mstore.set_accepted_types(["load-average"])302 filename = self.makeFile("")303 plugin = self.get_mount_info(mounts_file=filename, mtab_file=filename)304 self.monitor.add(plugin)305 self.monitor.exchange()306 self.assertEqual(len(self.mstore.get_pending_messages()), 0)307 def test_messages(self):308 """309 Test ensures all expected messages are created and contain the310 right datatypes.311 """312 filename = self.makeFile("""\313/dev/hda2 / xfs rw 0 0314""")315 plugin = self.get_mount_info(mounts_file=filename,316 statvfs=statvfs_result_fixture,317 create_time=self.reactor.time,318 mtab_file=filename)319 step_size = self.monitor.step_size320 self.monitor.add(plugin)321 self.reactor.advance(step_size)322 self.monitor.exchange()323 messages = self.mstore.get_pending_messages()324 self.assertEqual(len(messages), 2)325 self.assertEqual(messages[0].get("mount-info"),326 [(step_size,327 {"device": "/dev/hda2", "mount-point": "/",328 "filesystem": "xfs", "total-space": 4096000})])329 self.assertEqual(messages[1].get("free-space"),330 [(step_size, "/", 409600)])331 self.assertTrue(isinstance(messages[1]["free-space"][0][2],332 (int, long)))333 def test_resynchronize(self):334 """335 On the reactor "resynchronize" event, new mount-info messages336 should be sent.337 """338 plugin = self.get_mount_info(339 create_time=self.reactor.time, statvfs=statvfs_result_fixture)340 self.monitor.add(plugin)341 plugin.run()342 plugin.exchange()343 self.reactor.fire("resynchronize", scopes=["disk"])344 plugin.run()345 plugin.exchange()346 messages = self.mstore.get_pending_messages()347 messages = [message for message in messages348 if message["type"] == "mount-info"]349 expected_message = {350 "type": "mount-info",351 "mount-info": [(0, {"device": "/dev/hda1", "mount-point": "/",352 "total-space": 4096000,353 "filesystem": "ext3"})]}354 self.assertMessages(messages, [expected_message, expected_message])355 def test_bind_mounts(self):356 """357 Mounted devices that are mounted using Linux's "--bind" option358 shouldn't be listed, as they have the same free space/used space as the359 device they're bound to.360 """361 # From this test data, we expect only two mount points to be returned,362 # and the other two to be ignored (the rebound /dev/hda2 -> /mnt363 # mounting)364 filename = self.makeFile("""\365/dev/devices/by-uuid/12345567 / ext3 rw 0 0366/dev/hda2 /usr ext3 rw 0 0367/dev/devices/by-uuid/12345567 /mnt ext3 rw 0 0368/dev/devices/by-uuid/12345567 /media/Boot\\040OSX hfsplus rw 0 0369""")370 mtab_filename = self.makeFile("""\371/dev/hda1 / ext3 rw 0 0372/dev/hda2 /usr ext3 rw 0 0373/opt /mnt none rw,bind 0 0374/opt /media/Boot\\040OSX none rw,bind 0 0375""")376 plugin = MountInfo(377 mounts_file=filename, create_time=self.reactor.time,378 statvfs=statvfs_result_fixture, mtab_file=mtab_filename)379 self.monitor.add(plugin)380 plugin.run()381 message = plugin.create_mount_info_message()382 self.assertEqual(message.get("mount-info"),383 [(0, {"device": "/dev/devices/by-uuid/12345567",384 "mount-point": "/", "total-space": 4096000,385 "filesystem": "ext3"}),386 (0, {"device": "/dev/hda2",387 "mount-point": "/usr",388 "total-space": 4096000,389 "filesystem": "ext3"}),390 ])391 def test_no_mtab_file(self):392 """393 If there's no mtab file available, then we can make no guesses about394 bind mounted directories, so any filesystems in /proc/mounts will be395 reported.396 """397 # In this test, we expect all mount points to be returned, as we can't398 # identify any as bind mounts.399 filename = self.makeFile("""\400/dev/devices/by-uuid/12345567 / ext3 rw 0 0401/dev/hda2 /usr ext3 rw 0 0402/dev/devices/by-uuid/12345567 /mnt ext3 rw 0 0403""")404 # mktemp isn't normally secure, due to race conditions, but in this405 # case, we don't actually create the file at all.406 mtab_filename = tempfile.mktemp()407 plugin = MountInfo(408 mounts_file=filename, create_time=self.reactor.time,409 statvfs=statvfs_result_fixture, mtab_file=mtab_filename)410 self.monitor.add(plugin)411 plugin.run()412 message = plugin.create_mount_info_message()413 self.assertEqual(message.get("mount-info"),414 [(0, {"device": "/dev/devices/by-uuid/12345567",415 "mount-point": "/", "total-space": 4096000,416 "filesystem": "ext3"}),417 (0, {"device": "/dev/hda2",418 "mount-point": "/usr",419 "total-space": 4096000,420 "filesystem": "ext3"}),421 (0, {"device": "/dev/devices/by-uuid/12345567",422 "mount-point": "/mnt",423 "total-space": 4096000,424 "filesystem": "ext3"})])425 def test_no_message_if_not_accepted(self):426 """427 Don't add any messages at all if the broker isn't currently428 accepting their type.429 """430 self.mstore.set_accepted_types([])431 # From this test data, we expect only two mount points to be returned,432 # and the third to be ignored (the rebound /dev/hda2 -> /mnt mounting)433 filename = self.makeFile("""\434/dev/devices/by-uuid/12345567 / ext3 rw 0 0435/dev/hda2 /usr ext3 rw 0 0436/dev/devices/by-uuid/12345567 /mnt ext3 rw 0 0437""")438 mtab_filename = self.makeFile("""\439/dev/hda1 / ext3 rw 0 0440/dev/hda2 /usr ext3 rw 0 0441/opt /mnt none rw,bind 0 0442""")443 plugin = MountInfo(444 mounts_file=filename, create_time=self.reactor.time,445 statvfs=statvfs_result_fixture, mtab_file=mtab_filename)446 self.monitor.add(plugin)447 self.reactor.advance(self.monitor.step_size * 2)448 self.monitor.exchange()449 self.mstore.set_accepted_types(["mount-info"])450 self.assertMessages(list(self.mstore.get_pending_messages()), [])451 def test_call_on_accepted(self):452 plugin = self.get_mount_info(create_time=self.reactor.time)453 self.monitor.add(plugin)454 self.reactor.advance(plugin.run_interval)455 with mock.patch.object(self.remote, "send_message"):456 self.reactor.fire(457 ("message-type-acceptance-changed", "mount-info"),458 True)459 self.remote.send_message.assert_called_with(460 mock.ANY, mock.ANY, urgent=True)461 self.assertEqual(self.remote.send_message.call_count, 2)462 def test_persist_timing(self):463 """Mount info are only persisted when exchange happens.464 Previously mount info were persisted as soon as they were gathered: if465 an event happened between the persist and the exchange, the server466 didn't get the mount info at all. This test ensures that mount info are467 only saved when exchange happens.468 """469 filename = self.makeFile("""\470/dev/hda1 / ext3 rw 0 0471""")472 plugin = MountInfo(473 mounts_file=filename, create_time=self.reactor.time,474 statvfs=statvfs_result_fixture, mtab_file=filename)475 self.monitor.add(plugin)476 plugin.run()477 message1 = plugin.create_mount_info_message()478 self.assertEqual(479 message1.get("mount-info"),480 [(0, {"device": "/dev/hda1",481 "filesystem": "ext3",482 "mount-point": "/",483 "total-space": 4096000})])484 plugin.run()485 message2 = plugin.create_mount_info_message()486 self.assertEqual(487 message2.get("mount-info"),488 [(0, {"device": "/dev/hda1",489 "filesystem": "ext3",490 "mount-point": "/",491 "total-space": 4096000})])492 # Run again, calling create_mount_info_message purge the information493 plugin.run()494 plugin.exchange()495 plugin.run()496 message3 = plugin.create_mount_info_message()497 self.assertIdentical(message3, None)498 def test_exchange_limits_exchanged_free_space_messages(self):499 """500 In order not to overload the server, the client should stagger the501 exchange of free-space messages.502 """503 plugin = self.get_mount_info(504 statvfs=statvfs_result_fixture, create_time=self.reactor.time)505 # Limit the test exchange to 5 items.506 plugin.max_free_space_items_to_exchange = 5507 step_size = self.monitor.step_size508 self.monitor.add(plugin)509 # Exchange should trigger a flush of the persist database510 plugin.registry.flush = mock.Mock()511 # Generate 10 data points512 self.reactor.advance(step_size * 10)513 self.monitor.exchange()514 messages = self.mstore.get_pending_messages()515 self.assertEqual(len(messages), 2)516 message = [d for d in messages if d["type"] == "free-space"][0]517 free_space = message["free-space"]...
dataset_details.py
Source:dataset_details.py
...219 collapsed.append(dataset)220 for child in dataset.get('children', []):221 self.collapse_datasets(child, collapsed)222 @private223 def get_mount_info(self, path, mntinfo):224 mount_info = {}225 try:226 devid = os.stat(path).st_dev227 except Exception:228 # path deleted/umounted/locked etc229 pass230 else:231 if devid in mntinfo:232 mount_info = mntinfo[devid]233 return mount_info234 @private235 def get_atime_and_casesensitivity(self, ds, mntinfo):236 atime = case = True237 for devid, info in filter(lambda x: x[1]['mountpoint'] == ds['mountpoint'], mntinfo.items()):238 atime = not ('NOATIME' in info['mount_opts'])239 case = any((i for i in ('CASESENSITIVE', 'CASEMIXED') if i in info['super_opts']))240 # case sensitivity is either on or off (sensitive or insensitve)241 # the "mixed" property is silently ignored in our use case because it242 # only applies to illumos kernel when using the in-kernel SMB server.243 # if it's set to "mixed" on linux, it's treated as case sensitive.244 return atime, case245 @private246 def build_details(self, mntinfo):247 results = {248 'iscsi': [], 'nfs': [], 'smb': [],249 'repl': [], 'snap': [], 'cloud': [],250 'rsync': [], 'vm': [], 'app': []251 }252 # iscsi253 t_to_e = self.middleware.call_sync('iscsi.targetextent.query')254 t = {i['id']: i for i in self.middleware.call_sync('iscsi.target.query')}255 e = {i['id']: i for i in self.middleware.call_sync('iscsi.extent.query')}256 for i in filter(lambda x: x['target'] in t and t[x['target']]['groups'] and x['extent'] in e, t_to_e):257 """258 1. make sure target's and extent's id exist in the target to extent table259 2. make sure the target has `groups` entry since, without it, it's impossible260 that it's being shared via iscsi261 """262 results['iscsi'].append({263 'extent': e[i['extent']],264 'target': t[i['target']],265 'mount_info': self.get_mount_info(e[i['extent']]['path'], mntinfo),266 })267 # nfs and smb268 for key in ('nfs', 'smb'):269 for share in self.middleware.call_sync(f'sharing.{key}.query'):270 share['mount_info'] = self.get_mount_info(share['path'], mntinfo)271 results[key].append(share)272 # replication273 options = {'prefix': 'repl_'}274 for task in self.middleware.call_sync('datastore.query', 'storage.replication', [], options):275 # replication can only be configured on a dataset so getting mount info is unnecessary276 results['repl'].append(task)277 # snapshots278 for task in self.middleware.call_sync('datastore.query', 'storage.task', [], {'prefix': 'task_'}):279 # snapshots can only be configured on a dataset so getting mount info is unnecessary280 results['snap'].append(task)281 # cloud sync282 for task in self.middleware.call_sync('datastore.query', 'tasks.cloudsync'):283 task['mount_info'] = self.get_mount_info(task['path'], mntinfo)284 results['cloud'].append(task)285 # rsync286 for task in self.middleware.call_sync('rsynctask.query'):287 task['mount_info'] = self.get_mount_info(task['path'], mntinfo)288 results['rsync'].append(task)289 # vm290 for vm in self.middleware.call_sync('datastore.query', 'vm.device', [['dtype', 'in', ['RAW', 'DISK']]]):291 if vm['dtype'] == 'DISK':292 # disk type is always a zvol293 vm['zvol'] = vm['attributes']['path'].removeprefix('/dev/zvol/')294 else:295 # raw type is always a file296 vm['mount_info'] = self.get_mount_info(vm['attributes']['path'], mntinfo)297 results['vm'].append(vm)298 # app299 for app_name, paths in self.middleware.call_sync('chart.release.get_consumed_host_paths').items():300 # We want to filter out any other paths which might be consumed to improve performance here301 # and avoid unnecessary mount info calls i.e /proc /sys /etc/ etc302 for path in filter(303 lambda x: x.startswith('/mnt/') and 'ix-applications/' not in x,304 paths305 ):306 results['app'].append({307 'name': app_name,308 'path': path,309 'mount_info': self.get_mount_info(path, mntinfo),310 })311 return results312 @private313 def get_snapcount_and_encryption_status(self, ds, mntinfo):314 snap_count = 0315 locked = False316 if ds['type'] == 'FILESYSTEM':317 # FIXME: determining zvol snapshot count requires iterating over318 # all snapshots for a given zvol and then counting them which is319 # painful. This will be moot when this is merged upstream320 # https://github.com/openzfs/zfs/pull/13635321 try:322 st = os.stat(f'{ds["mountpoint"]}/.zfs/snapshot')323 except FileNotFoundError:...
__init__.py
Source:__init__.py
...30 ')').format(path, owner, group, mode, copy_files, usage_quota, inode_quota)31 )32 if not os.path.exists(path):33 logging.info('%s: Path does not exist, creating...', path)34 mnt = get_mount_info(path)35 os.mkdir(path)36 for src, dest in copy_files:37 shutil.copyfile(src, os.path.join(path, dest))38 run_cmd('chown -R {0}:{1} {2}'.format(owner, group, path))39 os.chmod(path, mode)40 41 if usage_quota or inode_quota:42 run_cmd(set_quota_cmd(43 owner, mnt.file, fstype=mnt.vfstype, usage=usage_quota, inode=inode_quota44 ))45 logging.info('... done.')46class UnusedPeriodPolicy(object):47 """Instances of this policy return True if the file at the given path48 hasn't been accessed or modified within the given time period, which ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!