Best Python code snippet using lisa_python
test_mount.py
Source:test_mount.py
...156 self.test.mock_rmdir = self.useFixture(fixtures.MockPatch(157 'nova.privsep.path.rmdir')).mock158 def _fake_mount(self, fstype, export, mountpoint, options):159 self.mounts.add(mountpoint)160 def _fake_umount(self, mountpoint):161 self.mounts.remove(mountpoint)162 def _fake_ismount(self, mountpoint):163 return mountpoint in self.mounts164class HostMountStateTestCase(test.NoDBTestCase):165 def setUp(self):166 super(HostMountStateTestCase, self).setUp()167 self.mountfixture = self.useFixture(MountFixture(self))168 def test_init(self):169 # Test that we initialise the state of MountManager correctly at170 # startup171 def fake_disk(disk):172 libvirt_disk = libvirt_config.LibvirtConfigGuestDisk()173 libvirt_disk.source_type = disk[0]174 libvirt_disk.source_path = os.path.join(*disk[1])175 return libvirt_disk176 def mock_guest(uuid, disks):177 guest = mock.create_autospec(libvirt_guest.Guest)178 guest.uuid = uuid179 guest.get_all_disks.return_value = map(fake_disk, disks)180 return guest181 local_dir = '/local'182 mountpoint_a = '/mnt/a'183 mountpoint_b = '/mnt/b'184 self.mounts.add(mountpoint_a)185 self.mounts.add(mountpoint_b)186 guests = map(mock_guest, [uuids.instance_a, uuids.instance_b], [187 # Local file root disk and a volume on each of mountpoints a and b188 [189 ('file', (local_dir, uuids.instance_a, 'disk')),190 ('file', (mountpoint_a, 'vola1')),191 ('file', (mountpoint_b, 'volb1')),192 ],193 # Local LVM root disk and a volume on each of mountpoints a and b194 [195 ('block', ('/dev', 'vg', uuids.instance_b + '_disk')),196 ('file', (mountpoint_a, 'vola2')),197 ('file', (mountpoint_b, 'volb2')),198 ]199 ])200 host = mock.create_autospec(libvirt_host.Host)201 host.list_guests.return_value = guests202 m = mount._HostMountState(host, 0)203 self.assertEqual([mountpoint_a, mountpoint_b],204 sorted(m.mountpoints.keys()))205 self.assertSetEqual(set([('vola1', uuids.instance_a),206 ('vola2', uuids.instance_b)]),207 m.mountpoints[mountpoint_a].attachments)208 self.assertSetEqual(set([('volb1', uuids.instance_a),209 ('volb2', uuids.instance_b)]),210 m.mountpoints[mountpoint_b].attachments)211 @staticmethod212 def _get_clean_hostmountstate():213 # list_guests returns no guests: _HostMountState initial state is214 # clean.215 host = mock.create_autospec(libvirt_host.Host)216 host.list_guests.return_value = []217 return mount._HostMountState(host, 0)218 def _sentinel_mount(self, m, vol, mountpoint=mock.sentinel.mountpoint,219 instance=None):220 if instance is None:221 instance = mock.sentinel.instance222 instance.uuid = uuids.instance223 m.mount(mock.sentinel.fstype, mock.sentinel.export,224 vol, mountpoint, instance,225 [mock.sentinel.option1, mock.sentinel.option2])226 def _sentinel_umount(self, m, vol, mountpoint=mock.sentinel.mountpoint,227 instance=mock.sentinel.instance):228 m.umount(vol, mountpoint, instance)229 def test_mount_umount(self):230 # Mount 2 different volumes from the same export. Test that we only231 # mount and umount once.232 m = self._get_clean_hostmountstate()233 # Mount vol_a from export234 self._sentinel_mount(m, mock.sentinel.vol_a)235 self.mock_ensure_tree.assert_has_calls([236 mock.call(mock.sentinel.mountpoint)])237 self.mock_mount.assert_has_calls([238 mock.call(mock.sentinel.fstype,239 mock.sentinel.export, mock.sentinel.mountpoint,240 [mock.sentinel.option1, mock.sentinel.option2])])241 # Mount vol_b from export. We shouldn't have mounted again242 self._sentinel_mount(m, mock.sentinel.vol_b)243 self.mock_ensure_tree.assert_has_calls([244 mock.call(mock.sentinel.mountpoint)])245 self.mock_mount.assert_has_calls([246 mock.call(mock.sentinel.fstype,247 mock.sentinel.export, mock.sentinel.mountpoint,248 [mock.sentinel.option1, mock.sentinel.option2])])249 # Unmount vol_a. We shouldn't have unmounted250 self._sentinel_umount(m, mock.sentinel.vol_a)251 self.mock_ensure_tree.assert_has_calls([252 mock.call(mock.sentinel.mountpoint)])253 self.mock_mount.assert_has_calls([254 mock.call(mock.sentinel.fstype,255 mock.sentinel.export, mock.sentinel.mountpoint,256 [mock.sentinel.option1, mock.sentinel.option2])])257 # Unmount vol_b. We should have umounted.258 self._sentinel_umount(m, mock.sentinel.vol_b)259 self.mock_ensure_tree.assert_has_calls([260 mock.call(mock.sentinel.mountpoint)])261 self.mock_mount.assert_has_calls([262 mock.call(mock.sentinel.fstype,263 mock.sentinel.export, mock.sentinel.mountpoint,264 [mock.sentinel.option1, mock.sentinel.option2])])265 self.mock_umount.assert_has_calls([266 mock.call(mock.sentinel.mountpoint)])267 self.mock_rmdir.assert_has_calls([268 mock.call(mock.sentinel.mountpoint)])269 def test_mount_umount_multi_attach(self):270 # Mount a volume from a single export for 2 different instances. Test271 # that we only mount and umount once.272 m = self._get_clean_hostmountstate()273 instance_a = mock.sentinel.instance_a274 instance_a.uuid = uuids.instance_a275 instance_b = mock.sentinel.instance_b276 instance_b.uuid = uuids.instance_b277 # Mount vol_a for instance_a278 self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_a)279 self.mock_mount.assert_has_calls([280 mock.call(mock.sentinel.fstype, mock.sentinel.export,281 mock.sentinel.mountpoint,282 [mock.sentinel.option1, mock.sentinel.option2])])283 self.mock_mount.reset_mock()284 # Mount vol_a for instance_b. We shouldn't have mounted again285 self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_b)286 self.mock_mount.assert_not_called()287 # Unmount vol_a for instance_a. We shouldn't have unmounted288 self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_a)289 self.mock_umount.assert_not_called()290 # Unmount vol_a for instance_b. We should have umounted.291 self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_b)292 self.mock_umount.assert_has_calls(293 [mock.call(mock.sentinel.mountpoint)])294 def test_mount_concurrent(self):295 # This is 2 tests in 1, because the first test is the precondition296 # for the second.297 # The first test is that if 2 threads call mount simultaneously,298 # only one of them will do the mount299 # The second test is that we correctly handle the case where we300 # delete a lock after umount. During the umount of the first test,301 # which will delete the lock when it completes, we start 2 more302 # threads which both call mount. These threads are holding a lock303 # which is about to be deleted. We test that they still don't race,304 # and only one of them calls mount.305 m = self._get_clean_hostmountstate()306 def mount_a():307 # Mount vol_a from export308 self._sentinel_mount(m, mock.sentinel.vol_a)309 ThreadController.current().waitpoint('mounted')310 self._sentinel_umount(m, mock.sentinel.vol_a)311 def mount_b():312 # Mount vol_b from export313 self._sentinel_mount(m, mock.sentinel.vol_b)314 self._sentinel_umount(m, mock.sentinel.vol_b)315 def mount_c():316 self._sentinel_mount(m, mock.sentinel.vol_c)317 def mount_d():318 self._sentinel_mount(m, mock.sentinel.vol_d)319 ctl_a = ThreadController(mount_a)320 ctl_b = ThreadController(mount_b)321 ctl_c = ThreadController(mount_c)322 ctl_d = ThreadController(mount_d)323 def trap_mount(*args, **kwargs):324 # Conditionally wait at a waitpoint named after the command325 # we're executing326 self.mountfixture._fake_mount(*args, **kwargs)327 ThreadController.current().waitpoint('mount')328 def trap_umount(*args, **kwargs):329 # Conditionally wait at a waitpoint named after the command330 # we're executing331 self.mountfixture._fake_umount(*args, **kwargs)332 ThreadController.current().waitpoint('umount')333 self.mock_mount.side_effect = trap_mount334 self.mock_umount.side_effect = trap_umount335 # Run the first thread until it's blocked while calling mount336 ctl_a.runto('mount')337 self.mock_ensure_tree.assert_has_calls([338 mock.call(mock.sentinel.mountpoint)])339 self.mock_mount.assert_has_calls([340 mock.call(mock.sentinel.fstype, mock.sentinel.export,341 mock.sentinel.mountpoint,342 [mock.sentinel.option1, mock.sentinel.option2])])343 # Start the second mount, and ensure it's got plenty of opportunity344 # to race.345 ctl_b.start()346 time.sleep(0.01)347 self.mock_ensure_tree.assert_has_calls([348 mock.call(mock.sentinel.mountpoint)])349 self.mock_mount.assert_has_calls([350 mock.call(mock.sentinel.fstype, mock.sentinel.export,351 mock.sentinel.mountpoint,352 [mock.sentinel.option1, mock.sentinel.option2])])353 self.mock_umount.assert_not_called()354 # Allow ctl_a to complete its mount355 ctl_a.runto('mounted')356 self.mock_ensure_tree.assert_has_calls([357 mock.call(mock.sentinel.mountpoint)])358 self.mock_mount.assert_has_calls([359 mock.call(mock.sentinel.fstype, mock.sentinel.export,360 mock.sentinel.mountpoint,361 [mock.sentinel.option1, mock.sentinel.option2])])362 self.mock_umount.assert_not_called()363 # Allow ctl_b to finish. We should not have done a umount364 ctl_b.finish()365 self.mock_ensure_tree.assert_has_calls([366 mock.call(mock.sentinel.mountpoint)])367 self.mock_mount.assert_has_calls([368 mock.call(mock.sentinel.fstype, mock.sentinel.export,369 mock.sentinel.mountpoint,370 [mock.sentinel.option1, mock.sentinel.option2])])371 self.mock_umount.assert_not_called()372 # Allow ctl_a to start umounting. We haven't executed rmdir yet,373 # because we've blocked during umount374 ctl_a.runto('umount')375 self.mock_ensure_tree.assert_has_calls([376 mock.call(mock.sentinel.mountpoint)])377 self.mock_mount.assert_has_calls([378 mock.call(mock.sentinel.fstype, mock.sentinel.export,379 mock.sentinel.mountpoint,380 [mock.sentinel.option1, mock.sentinel.option2])])381 self.mock_umount.assert_has_calls(382 [mock.call(mock.sentinel.mountpoint)])383 self.mock_rmdir.assert_not_called()384 # While ctl_a is umounting, simultaneously start both ctl_c and385 # ctl_d, and ensure they have an opportunity to race386 ctl_c.start()387 ctl_d.start()388 time.sleep(0.01)389 # Allow a, c, and d to complete390 for ctl in (ctl_a, ctl_c, ctl_d):391 ctl.finish()392 # We should have completed the previous umount, then remounted393 # exactly once394 self.mock_ensure_tree.assert_has_calls([395 mock.call(mock.sentinel.mountpoint)])396 self.mock_mount.assert_has_calls([397 mock.call(mock.sentinel.fstype, mock.sentinel.export,398 mock.sentinel.mountpoint,399 [mock.sentinel.option1, mock.sentinel.option2]),400 mock.call(mock.sentinel.fstype, mock.sentinel.export,401 mock.sentinel.mountpoint,402 [mock.sentinel.option1, mock.sentinel.option2])])403 self.mock_umount.assert_has_calls(404 [mock.call(mock.sentinel.mountpoint)])405 def test_mount_concurrent_no_interfere(self):406 # Test that concurrent calls to mount volumes in different exports407 # run concurrently408 m = self._get_clean_hostmountstate()409 def mount_a():410 # Mount vol on mountpoint a411 self._sentinel_mount(m, mock.sentinel.vol,412 mock.sentinel.mountpoint_a)413 ThreadController.current().waitpoint('mounted')414 self._sentinel_umount(m, mock.sentinel.vol,415 mock.sentinel.mountpoint_a)416 def mount_b():417 # Mount vol on mountpoint b418 self._sentinel_mount(m, mock.sentinel.vol,419 mock.sentinel.mountpoint_b)420 self._sentinel_umount(m, mock.sentinel.vol,421 mock.sentinel.mountpoint_b)422 ctl_a = ThreadController(mount_a)423 ctl_b = ThreadController(mount_b)424 ctl_a.runto('mounted')425 self.mock_mount.assert_has_calls([426 mock.call(mock.sentinel.fstype, mock.sentinel.export,427 mock.sentinel.mountpoint_a,428 [mock.sentinel.option1, mock.sentinel.option2])])429 self.mock_mount.reset_mock()430 ctl_b.finish()431 self.mock_mount.assert_has_calls([432 mock.call(mock.sentinel.fstype, mock.sentinel.export,433 mock.sentinel.mountpoint_b,434 [mock.sentinel.option1, mock.sentinel.option2])])435 self.mock_umount.assert_has_calls(436 [mock.call(mock.sentinel.mountpoint_b)])437 self.mock_umount.reset_mock()438 ctl_a.finish()439 self.mock_umount.assert_has_calls(440 [mock.call(mock.sentinel.mountpoint_a)])441 def test_mount_after_failed_umount(self):442 # Test that MountManager correctly tracks state when umount fails.443 # Test that when umount fails a subsequent mount doesn't try to444 # remount it.445 m = self._get_clean_hostmountstate()446 self.mock_umount.side_effect = processutils.ProcessExecutionError447 # Mount vol_a448 self._sentinel_mount(m, mock.sentinel.vol_a)449 self.mock_mount.assert_has_calls([450 mock.call(mock.sentinel.fstype, mock.sentinel.export,451 mock.sentinel.mountpoint,452 [mock.sentinel.option1, mock.sentinel.option2])])453 self.mock_mount.reset_mock()454 # Umount vol_a. The umount command will fail.455 self._sentinel_umount(m, mock.sentinel.vol_a)456 self.mock_umount.assert_has_calls(457 [mock.call(mock.sentinel.mountpoint)])458 # We should not have called rmdir, because umount failed459 self.mock_rmdir.assert_not_called()460 # Mount vol_a again. We should not have called mount, because umount461 # failed.462 self._sentinel_mount(m, mock.sentinel.vol_a)463 self.mock_mount.assert_not_called()464 # Prevent future failure of umount465 self.mock_umount.side_effect = self.mountfixture._fake_umount466 # Umount vol_a successfully467 self._sentinel_umount(m, mock.sentinel.vol_a)468 self.mock_umount.assert_has_calls(469 [mock.call(mock.sentinel.mountpoint)])470 @mock.patch.object(mount.LOG, 'error')471 def test_umount_log_failure(self, mock_log):472 self.mock_umount.side_effect = processutils.ProcessExecutionError(473 None, None, None, 'umount', 'umount: device is busy.')474 m = self._get_clean_hostmountstate()475 self._sentinel_mount(m, mock.sentinel.vol_a)476 self._sentinel_umount(m, mock.sentinel.vol_a)477 mock_log.assert_called()478class MountManagerTestCase(test.NoDBTestCase):479 class FakeHostMountState(object):480 def __init__(self, host, generation):481 self.host = host482 self.generation = generation483 ctl = ThreadController.current()484 if ctl is not None:485 ctl.waitpoint('init')486 def setUp(self):487 super(MountManagerTestCase, self).setUp()488 self.useFixture(fixtures.MonkeyPatch(489 'nova.virt.libvirt.volume.mount._HostMountState',490 self.FakeHostMountState))...
mountsnoop.py
Source:mountsnoop.py
...123 event.retval = PT_REGS_RC(ctx);124 events.perf_submit(ctx, &event, sizeof(event));125 return 0;126}127int syscall__umount(struct pt_regs *ctx, char __user *target, int flags)128{129 struct data_t event = {};130 struct task_struct *task;131 struct nsproxy *nsproxy;132 struct mnt_namespace *mnt_ns;133 event.pid = bpf_get_current_pid_tgid() & 0xffffffff;134 event.tgid = bpf_get_current_pid_tgid() >> 32;135 event.type = EVENT_UMOUNT;136 bpf_get_current_comm(event.enter.comm, sizeof(event.enter.comm));137 event.enter.flags = flags;138 task = (struct task_struct *)bpf_get_current_task();139 nsproxy = task->nsproxy;140 mnt_ns = nsproxy->mnt_ns;141 event.enter.mnt_ns = mnt_ns->ns.inum;142 events.perf_submit(ctx, &event, sizeof(event));143 event.type = EVENT_UMOUNT_TARGET;144 __builtin_memset(event.str, 0, sizeof(event.str));145 bpf_probe_read(event.str, sizeof(event.str), target);146 events.perf_submit(ctx, &event, sizeof(event));147 return 0;148}149int do_ret_sys_umount(struct pt_regs *ctx)150{151 struct data_t event = {};152 event.type = EVENT_UMOUNT_RET;153 event.pid = bpf_get_current_pid_tgid() & 0xffffffff;154 event.tgid = bpf_get_current_pid_tgid() >> 32;155 event.retval = PT_REGS_RC(ctx);156 events.perf_submit(ctx, &event, sizeof(event));157 return 0;158}159"""160# sys/mount.h161MS_MGC_VAL = 0xc0ed0000162MS_MGC_MSK = 0xffff0000163MOUNT_FLAGS = [164 ('MS_RDONLY', 1),165 ('MS_NOSUID', 2),166 ('MS_NODEV', 4),167 ('MS_NOEXEC', 8),168 ('MS_SYNCHRONOUS', 16),169 ('MS_REMOUNT', 32),170 ('MS_MANDLOCK', 64),171 ('MS_DIRSYNC', 128),172 ('MS_NOATIME', 1024),173 ('MS_NODIRATIME', 2048),174 ('MS_BIND', 4096),175 ('MS_MOVE', 8192),176 ('MS_REC', 16384),177 ('MS_SILENT', 32768),178 ('MS_POSIXACL', 1 << 16),179 ('MS_UNBINDABLE', 1 << 17),180 ('MS_PRIVATE', 1 << 18),181 ('MS_SLAVE', 1 << 19),182 ('MS_SHARED', 1 << 20),183 ('MS_RELATIME', 1 << 21),184 ('MS_KERNMOUNT', 1 << 22),185 ('MS_I_VERSION', 1 << 23),186 ('MS_STRICTATIME', 1 << 24),187 ('MS_LAZYTIME', 1 << 25),188 ('MS_ACTIVE', 1 << 30),189 ('MS_NOUSER', 1 << 31),190]191UMOUNT_FLAGS = [192 ('MNT_FORCE', 1),193 ('MNT_DETACH', 2),194 ('MNT_EXPIRE', 4),195 ('UMOUNT_NOFOLLOW', 8),196]197TASK_COMM_LEN = 16 # linux/sched.h198MAX_STR_LEN = 412199class EventType(object):200 EVENT_MOUNT = 0201 EVENT_MOUNT_SOURCE = 1202 EVENT_MOUNT_TARGET = 2203 EVENT_MOUNT_TYPE = 3204 EVENT_MOUNT_DATA = 4205 EVENT_MOUNT_RET = 5206 EVENT_UMOUNT = 6207 EVENT_UMOUNT_TARGET = 7208 EVENT_UMOUNT_RET = 8209class EnterData(ctypes.Structure):210 _fields_ = [211 ('mnt_ns', ctypes.c_uint),212 ('comm', ctypes.c_char * TASK_COMM_LEN),213 ('flags', ctypes.c_ulong),214 ]215class DataUnion(ctypes.Union):216 _fields_ = [217 ('enter', EnterData),218 ('str', ctypes.c_char * MAX_STR_LEN),219 ('retval', ctypes.c_int),220 ]221class Event(ctypes.Structure):222 _fields_ = [223 ('type', ctypes.c_uint),224 ('pid', ctypes.c_uint),225 ('tgid', ctypes.c_uint),226 ('union', DataUnion),227 ]228def _decode_flags(flags, flag_list):229 str_flags = []230 for flag, bit in flag_list:231 if flags & bit:232 str_flags.append(flag)233 flags &= ~bit234 if flags or not str_flags:235 str_flags.append('0x{:x}'.format(flags))236 return str_flags237def decode_flags(flags, flag_list):238 return '|'.join(_decode_flags(flags, flag_list))239def decode_mount_flags(flags):240 str_flags = []241 if flags & MS_MGC_MSK == MS_MGC_VAL:242 flags &= ~MS_MGC_MSK243 str_flags.append('MS_MGC_VAL')244 str_flags.extend(_decode_flags(flags, MOUNT_FLAGS))245 return '|'.join(str_flags)246def decode_umount_flags(flags):247 return decode_flags(flags, UMOUNT_FLAGS)248def decode_errno(retval):249 try:250 return '-' + errno.errorcode[-retval]251 except KeyError:252 return str(retval)253_escape_chars = {254 ord('\a'): '\\a',255 ord('\b'): '\\b',256 ord('\t'): '\\t',257 ord('\n'): '\\n',258 ord('\v'): '\\v',259 ord('\f'): '\\f',260 ord('\r'): '\\r',261 ord('"'): '\\"',262 ord('\\'): '\\\\',263}264def escape_character(c):265 try:266 return _escape_chars[c]267 except KeyError:268 if 0x20 <= c <= 0x7e:269 return chr(c)270 else:271 return '\\x{:02x}'.format(c)272if sys.version_info.major < 3:273 def decode_mount_string(s):274 return '"{}"'.format(''.join(escape_character(ord(c)) for c in s))275else:276 def decode_mount_string(s):277 return '"{}"'.format(''.join(escape_character(c) for c in s))278def print_event(mounts, umounts, cpu, data, size):279 event = ctypes.cast(data, ctypes.POINTER(Event)).contents280 try:281 if event.type == EventType.EVENT_MOUNT:282 mounts[event.pid] = {283 'pid': event.pid,284 'tgid': event.tgid,285 'mnt_ns': event.union.enter.mnt_ns,286 'comm': event.union.enter.comm,287 'flags': event.union.enter.flags,288 }289 elif event.type == EventType.EVENT_MOUNT_SOURCE:290 mounts[event.pid]['source'] = event.union.str291 elif event.type == EventType.EVENT_MOUNT_TARGET:292 mounts[event.pid]['target'] = event.union.str293 elif event.type == EventType.EVENT_MOUNT_TYPE:294 mounts[event.pid]['type'] = event.union.str295 elif event.type == EventType.EVENT_MOUNT_DATA:296 # XXX: data is not always a NUL-terminated string297 mounts[event.pid]['data'] = event.union.str298 elif event.type == EventType.EVENT_UMOUNT:299 umounts[event.pid] = {300 'pid': event.pid,301 'tgid': event.tgid,302 'mnt_ns': event.union.enter.mnt_ns,303 'comm': event.union.enter.comm,304 'flags': event.union.enter.flags,305 }306 elif event.type == EventType.EVENT_UMOUNT_TARGET:307 umounts[event.pid]['target'] = event.union.str308 elif (event.type == EventType.EVENT_MOUNT_RET or309 event.type == EventType.EVENT_UMOUNT_RET):310 if event.type == EventType.EVENT_MOUNT_RET:311 syscall = mounts.pop(event.pid)312 call = ('mount({source}, {target}, {type}, {flags}, {data}) ' +313 '= {retval}').format(314 source=decode_mount_string(syscall['source']),315 target=decode_mount_string(syscall['target']),316 type=decode_mount_string(syscall['type']),317 flags=decode_mount_flags(syscall['flags']),318 data=decode_mount_string(syscall['data']),319 retval=decode_errno(event.union.retval))320 else:321 syscall = umounts.pop(event.pid)322 call = 'umount({target}, {flags}) = {retval}'.format(323 target=decode_mount_string(syscall['target']),324 flags=decode_umount_flags(syscall['flags']),325 retval=decode_errno(event.union.retval))326 print('{:16} {:<7} {:<7} {:<11} {}'.format(327 syscall['comm'].decode('utf-8', 'replace'), syscall['tgid'],328 syscall['pid'], syscall['mnt_ns'], call))329 except KeyError:330 # This might happen if we lost an event.331 pass332def main():333 parser = argparse.ArgumentParser(334 description='trace mount() and umount() syscalls'335 )336 parser.add_argument("--ebpf", action="store_true",337 help=argparse.SUPPRESS)338 args = parser.parse_args()339 mounts = {}340 umounts = {}341 if args.ebpf:342 print(bpf_text)343 exit()344 b = bcc.BPF(text=bpf_text)345 mount_fnname = b.get_syscall_fnname("mount")346 b.attach_kprobe(event=mount_fnname, fn_name="syscall__mount")347 b.attach_kretprobe(event=mount_fnname, fn_name="do_ret_sys_mount")348 umount_fnname = b.get_syscall_fnname("umount")...
mount_manager_test.py
Source:mount_manager_test.py
...52 ):53 mock_command.side_effect = Exception54 mock_mounted.return_value = True55 with self._caplog.at_level(logging.WARNING):56 assert self.mount_manager.umount(raise_on_busy=False) is False57 assert mock_command.call_args_list == [58 call(['umount', '/some/mountpoint']), # 159 call(['umount', '/some/mountpoint']), # 260 call(['umount', '/some/mountpoint']), # 361 call(['umount', '/some/mountpoint']), # 462 call(['umount', '/some/mountpoint']), # 563 call(['umount', '/some/mountpoint']), # 664 call(['umount', '/some/mountpoint']), # 765 call(['umount', '/some/mountpoint']), # 866 call(['umount', '/some/mountpoint']), # 967 call(['umount', '/some/mountpoint']) # 1068 ]69 @patch('kiwi.mount_manager.Command.run')70 @patch('kiwi.mount_manager.MountManager.is_mounted')71 @patch('time.sleep')72 @patch('kiwi.mount_manager.Path.which')73 def test_umount_with_errors_raises_no_lsof_present(74 self, mock_Path_which, mock_sleep, mock_mounted, mock_command75 ):76 def command_call(args):77 if 'umount' in args:78 raise Exception79 mock_Path_which.return_value = None80 mock_command.side_effect = command_call81 mock_mounted.return_value = True82 with raises(KiwiUmountBusyError):83 self.mount_manager.umount()84 @patch('kiwi.mount_manager.Command.run')85 @patch('kiwi.mount_manager.MountManager.is_mounted')86 @patch('time.sleep')87 @patch('kiwi.mount_manager.Path.which')88 def test_umount_with_errors_raises_lsof_present(89 self, mock_Path_which, mock_sleep, mock_mounted, mock_command90 ):91 def command_call(args, raise_on_error=None):92 if 'umount' in args:93 raise Exception94 else:95 call_return = Mock()96 call_return.output = 'HEADLINE\ndata'97 return call_return98 mock_Path_which.return_value = 'lsof'99 mock_command.side_effect = command_call100 mock_mounted.return_value = True101 with raises(KiwiUmountBusyError) as issue:102 self.mount_manager.umount()103 assert 'HEADLINE' in issue.value.message104 @patch('kiwi.mount_manager.Command.run')105 @patch('kiwi.mount_manager.MountManager.is_mounted')106 def test_umount_success(self, mock_mounted, mock_command):107 mock_mounted.return_value = True108 assert self.mount_manager.umount() is True109 mock_command.assert_called_once_with(110 ['umount', '/some/mountpoint']111 )112 @patch('kiwi.mount_manager.Command.run')113 def test_is_mounted_true(self, mock_command):114 command = Mock()115 command.returncode = 0116 mock_command.return_value = command117 assert self.mount_manager.is_mounted() is True118 mock_command.assert_called_once_with(119 command=['mountpoint', '-q', '/some/mountpoint'],120 raise_on_error=False121 )122 @patch('kiwi.mount_manager.Command.run')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!