Best Python code snippet using avocado_python
blockdev_stream_hotunplug.py
Source: blockdev_stream_hotunplug.py
1import time2from virttest.qemu_monitor import QMPCmdError3from provider import job_utils4from provider.blockdev_stream_nowait import BlockdevStreamNowaitTest5class BlockdevStreamHotunplugTest(BlockdevStreamNowaitTest):6 """7 Block stream with hotunplug test8 """9 def hotunplug_frontend_device(self):10 """11 device_del the frontend device during stream,12 the device CAN be removed without any issue13 """14 self.main_vm.monitor.cmd('device_del', {'id': self.base_tag})15 def wait_till_frontend_device_deleted(self):16 """Wait till devices removed from output of query-block"""17 def _is_device_deleted(device):18 for item in self.main_vm.monitor.query("block"):19 if device in item["qdev"]:20 return False21 return True22 tmo = self.params.get_numeric('device_del_timeout', 60)23 for i in range(tmo):24 if _is_device_deleted(self.base_tag):25 break26 time.sleep(1)27 else:28 self.test.fail('Failed to hotunplug the frontend device')29 def hotunplug_format_node(self):30 """31 blockdev-del the format nodes during streaming,32 the nodes CANNOT be removed for they are busy33 """34 try:35 self.main_vm.monitor.cmd('blockdev-del',36 {'node-name': self.params['node']})37 except QMPCmdError as e:38 if self.params['block_node_busy_error'] not in str(e):39 self.test.fail('Unexpected error: %s' % str(e))40 else:41 self.test.fail('blockdev-del succeeded unexpectedly')42 def do_test(self):43 self.snapshot_test()44 self.blockdev_stream()45 job_utils.check_block_jobs_started(46 self.main_vm, [self._job],47 self.params.get_numeric('job_started_timeout', 60)48 )49 self.hotunplug_frontend_device()50 self.wait_till_frontend_device_deleted()51 self.hotunplug_format_node()52 job_utils.check_block_jobs_running(53 self.main_vm, [self._job],54 self.params.get_numeric('job_running_timeout', 300)55 )56 self.main_vm.monitor.cmd(57 "block-job-set-speed", {'device': self._job, 'speed': 0})58 self.wait_stream_job_completed()59 self.check_backing_file()60 self.clone_vm.create()61 self.mount_data_disks()62 self.verify_data_file()63def run(test, params, env):64 """65 Block stream with hotunplug test66 test steps:67 1. boot VM with a 2G data disk68 2. format the data disk and mount it69 3. create a file70 4. add a snapshot image to VM via qmp commands71 5. do live snapshot (base: data, overlay: snapshot)72 6. do block-stream73 7. hotunplug the frontend device with device_del (OK)74 8. hotunplug the format node with blockdev-del (ERROR)75 9. check stream continues and wait stream done76 10. restart VM with the snapshot disk77 11. check the file and its md578 :param test: test object79 :param params: test configuration dict80 :param env: env object81 """82 stream_test = BlockdevStreamHotunplugTest(test, params, env)...
blockdev_mirror_hotunplug.py
Source: blockdev_mirror_hotunplug.py
1import time2from virttest.qemu_monitor import QMPCmdError3from provider.blockdev_mirror_nowait import BlockdevMirrorNowaitTest4class BlockdevMirrorHotunplugTest(BlockdevMirrorNowaitTest):5 """6 Block mirror with hotunplug test7 """8 def hotunplug_frontend_devices(self):9 """10 device_del the frontend devices during mirroring,11 the devices CAN be removed without any issue12 """13 def _device_del(device):14 self.main_vm.monitor.cmd('device_del', {'id': device})15 list(map(_device_del, self._source_images))16 def wait_till_frontend_devices_deleted(self):17 """Wait till devices removed from output of query-block"""18 def _is_device_deleted(device):19 for item in self.main_vm.monitor.query("block"):20 if device in item["qdev"]:21 return False22 return True23 def _wait_till_device_deleted(device):24 tmo = self.params.get_numeric('device_del_timeout', 60)25 for i in range(tmo):26 if _is_device_deleted(device):27 break28 time.sleep(1)29 else:30 self.test.fail('Failed to hotunplug the frontend device')31 list(map(_wait_till_device_deleted, self._source_images))32 def hotunplug_format_nodes(self):33 """34 blockdev-del the format nodes during mirroring,35 the nodes CANNOT be removed for they are busy36 """37 def _blockdev_del(node):38 try:39 self.main_vm.monitor.cmd('blockdev-del', {'node-name': node})40 except QMPCmdError as e:41 err = self.params['block_node_busy_error'] % node42 if err not in str(e):43 self.test.fail('Unexpected error: %s' % str(e))44 else:45 self.test.fail('blockdev-del succeeded unexpectedly')46 list(map(_blockdev_del, self._source_nodes))47 def do_test(self):48 self.blockdev_mirror()49 self.check_block_jobs_started(self._jobs)50 self.hotunplug_frontend_devices()51 self.wait_till_frontend_devices_deleted()52 self.hotunplug_format_nodes()53 self.check_block_jobs_running(self._jobs)54 self.wait_mirror_jobs_completed()55 self.clone_vm_with_mirrored_images()56 self.verify_data_files()57def run(test, params, env):58 """59 Block mirror with hotunplug test60 test steps:61 1. boot VM with a 2G data disk62 2. format the data disk and mount it63 3. create a file64 4. add a target disk for mirror to VM via qmp commands65 5. do block-mirror with sync mode full66 6. hotunplug the frontend device with device_del (OK)67 7. hotunplug the format node with blockdev-del (ERROR)68 8. check mirror continues and wait mirror done69 9. restart VM with the mirror disk70 10. check the file and its md571 :param test: test object72 :param params: test configuration dict73 :param env: env object74 """75 mirror_test = BlockdevMirrorHotunplugTest(test, params, env)...
Check out the latest blogs from LambdaTest on this topic:
Hola Testers! Hope you all had a great Thanksgiving weekend! To make this time more memorable, we at LambdaTest have something to offer you as a token of appreciation.
In addition to the four values, the Agile Manifesto contains twelve principles that are used as guides for all methodologies included under the Agile movement, such as XP, Scrum, and Kanban.
JavaScript is one of the most widely used programming languages. This popularity invites a lot of JavaScript development and testing frameworks to ease the process of working with it. As a result, numerous JavaScript testing frameworks can be used to perform unit testing.
When I started writing tests with Cypress, I was always going to use the user interface to interact and change the application’s state when running tests.
The rapid shift in the use of technology has impacted testing and quality assurance significantly, especially around the cloud adoption of agile development methodologies. With this, the increasing importance of quality and automation testing has risen enough to deliver quality work.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!