Best Python code snippet using localstack_python
test_volumes.py
Source:test_volumes.py
...111 self.client.delete_volume(VolumeId=volume_id)112 self.cancelResourceCleanUp(res_clean)113 self.get_volume_waiter().wait_delete(volume_id)114 @testtools.skipUnless(CONF.aws.ebs_image_id, "ebs image id is not defined")115 def test_attach_detach_volume(self):116 clean_dict = {}117 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,118 clean_dict=clean_dict)119 clean_i = clean_dict['instance']120 kwargs = {121 'Size': 1,122 'AvailabilityZone': CONF.aws.aws_zone123 }124 data = self.client.create_volume(*[], **kwargs)125 volume_id = data['VolumeId']126 clean_v = self.addResourceCleanUp(self.client.delete_volume,127 VolumeId=volume_id)128 self.get_volume_waiter().wait_available(volume_id)129 kwargs = {130 'Device': '/dev/sdh',131 'InstanceId': instance_id,132 'VolumeId': volume_id,133 }134 self.client.attach_volume(*[], **kwargs)135 clean_vi = self.addResourceCleanUp(self.client.detach_volume,136 VolumeId=volume_id)137 self.get_volume_attachment_waiter().wait_available(138 volume_id, final_set=('attached'))139 # reorder cleanups to avoid error on volume delete140 self.cancelResourceCleanUp(clean_i)141 clean_i = self.addResourceCleanUp(self.client.terminate_instances,142 InstanceIds=[instance_id])143 data = self.client.describe_volumes(VolumeIds=[volume_id])144 self.assertEqual(1, len(data['Volumes']))145 volume = data['Volumes'][0]146 self.assertEqual('in-use', volume['State'])147 self.assertEqual(1, len(volume['Attachments']))148 attachment = volume['Attachments'][0]149 self.assertFalse(attachment['DeleteOnTermination'])150 self.assertIsNotNone(attachment['Device'])151 self.assertEqual(instance_id, attachment['InstanceId'])152 self.assertEqual(volume_id, attachment['VolumeId'])153 data = self.client.describe_instances(InstanceIds=[instance_id])154 self.assertEqual(1, len(data.get('Reservations', [])))155 self.assertEqual(1, len(data['Reservations'][0].get('Instances', [])))156 bdms = data['Reservations'][0]['Instances'][0]['BlockDeviceMappings']157 
self.assertNotEmpty(bdms)158 self.assertIn('DeviceName', bdms[0])159 self.assertIn('Ebs', bdms[0])160 # stop instance to prevent 'busy' state of detached volume161 data = self.client.stop_instances(InstanceIds=[instance_id])162 self.get_instance_waiter().wait_available(instance_id,163 final_set=('stopped'))164 self.client.detach_volume(VolumeId=volume_id)165 self.get_volume_attachment_waiter().wait_delete(volume_id)166 self.cancelResourceCleanUp(clean_vi)167 data = self.client.describe_volumes(VolumeIds=[volume_id])168 self.assertEqual(1, len(data['Volumes']))169 volume = data['Volumes'][0]170 self.assertEqual('available', volume['State'])171 self.assertEqual(0, len(volume['Attachments']))172 self.client.delete_volume(VolumeId=volume_id)173 self.cancelResourceCleanUp(clean_v)174 self.get_volume_waiter().wait_delete(volume_id)175 self.client.terminate_instances(InstanceIds=[instance_id])176 self.cancelResourceCleanUp(clean_i)177 self.get_instance_waiter().wait_delete(instance_id)178 @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")179 def test_attaching_stage(self):180 clean_dict = {}181 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,182 clean_dict=clean_dict)183 clean_i = clean_dict['instance']184 data = self.client.create_volume(185 AvailabilityZone=CONF.aws.aws_zone, Size=1)186 volume_id = data['VolumeId']187 clean_v = self.addResourceCleanUp(self.client.delete_volume,188 VolumeId=volume_id)189 self.get_volume_waiter().wait_available(volume_id)190 device_name = '/dev/xvdh'191 kwargs = {192 'Device': device_name,193 'InstanceId': instance_id,194 'VolumeId': volume_id,195 }196 data = self.client.attach_volume(*[], **kwargs)197 clean_vi = self.addResourceCleanUp(self.client.detach_volume,198 VolumeId=volume_id)199 self.assertEqual('attaching', data['State'])200 if CONF.aws.run_incompatible_tests:201 bdt = self.get_instance_bdm(instance_id, device_name)202 self.assertIsNotNone(bdt)203 self.assertEqual('attaching', 
bdt['Ebs']['Status'])204 self.get_volume_attachment_waiter().wait_available(205 volume_id, final_set=('attached'))206 # reorder cleanups to avoid error on volume delete207 self.cancelResourceCleanUp(clean_i)208 clean_i = self.addResourceCleanUp(self.client.terminate_instances,209 InstanceIds=[instance_id])210 # stop instance to prevent 'busy' state of detached volume211 data = self.client.stop_instances(InstanceIds=[instance_id])212 self.get_instance_waiter().wait_available(instance_id,213 final_set=('stopped'))214 self.client.detach_volume(VolumeId=volume_id)215 self.get_volume_attachment_waiter().wait_delete(volume_id)216 self.cancelResourceCleanUp(clean_vi)217 self.client.delete_volume(VolumeId=volume_id)218 self.cancelResourceCleanUp(clean_v)219 self.get_volume_waiter().wait_delete(volume_id)220 self.client.terminate_instances(InstanceIds=[instance_id])221 self.cancelResourceCleanUp(clean_i)222 self.get_instance_waiter().wait_delete(instance_id)223 @testtools.skipUnless(CONF.aws.run_incompatible_tests,224 "Volume statuses are not implemented")225 @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")226 def test_delete_detach_attached_volume(self):227 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id)228 kwargs = {229 'Size': 1,230 'AvailabilityZone': CONF.aws.aws_zone231 }232 data = self.client.create_volume(*[], **kwargs)233 volume_id = data['VolumeId']234 clean_v = self.addResourceCleanUp(self.client.delete_volume,235 VolumeId=volume_id)236 self.get_volume_waiter().wait_available(volume_id)237 kwargs = {238 'Device': '/dev/sdh',239 'InstanceId': instance_id,240 'VolumeId': volume_id,241 }242 self.client.attach_volume(*[], **kwargs)243 clean_vi = self.addResourceCleanUp(self.client.detach_volume,244 VolumeId=volume_id)245 self.get_volume_attachment_waiter().wait_available(246 volume_id, final_set=('attached'))247 self.assertRaises('VolumeInUse',248 self.client.attach_volume,249 **kwargs)250 kwargs['Device'] = '/dev/sdi'251 
self.assertRaises('VolumeInUse',252 self.client.attach_volume,253 **kwargs)254 self.assertRaises('VolumeInUse',255 self.client.delete_volume,256 VolumeId=volume_id)257 # stop instance to prevent 'busy' state of detached volume258 data = self.client.stop_instances(InstanceIds=[instance_id])259 self.get_instance_waiter().wait_available(instance_id,260 final_set=('stopped'))261 self.client.detach_volume(VolumeId=volume_id)262 self.cancelResourceCleanUp(clean_vi)263 self.get_volume_attachment_waiter().wait_delete(volume_id)264 self.assertRaises('IncorrectState',265 self.client.detach_volume,266 VolumeId=volume_id)267 self.client.delete_volume(VolumeId=volume_id)268 self.cancelResourceCleanUp(clean_v)269 self.get_volume_waiter().wait_delete(volume_id)270 self.assertRaises('InvalidVolume.NotFound',271 self.client.detach_volume,272 VolumeId=volume_id)273 self.client.terminate_instances(InstanceIds=[instance_id])274 self.get_instance_waiter().wait_delete(instance_id)275 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")...
compute.py
Source:compute.py
...76 msg = _('Driver attach volume failed: %s') % e77 LOG.error(msg)78 raise webob.exc.HTTPBadRequest(explanation=msg)79 return webob.Response(status_int=202)80 def detach_volume(self, req, id, body):81 context = None82 try:83 volume = self.compute_driver.get(context, id)84 except exception.VolumeNotFound as error:85 raise webob.exc.HTTPNotFound(explanation=error.msg)86 attachment_id = body.get('attachment_id', None)87 self.compute_driver.detach_volume(context, volume, attachment_id)88 return webob.Response(status_int=202)89def create_router(mapper):90 controller = ContainerController()91 mapper.connect('/server/list',92 controller=controller,93 action='list',94 conditions=dict(method=['GET']))95 mapper.connect('/server/attach-volume',96 controller=controller,97 action='attach-volume',98 conditions=dict(method=['POST']))99 mapper.connect('/server/detach_volume',100 controller=controller,101 action='detach_volume',...
detach_volume.py
Source:detach_volume.py
...28 def run(self, result):29 """execute the test"""30 if not self.setup_done:31 self.setup()32 status = op_utils.detach_volume(self.server_id, self.volume_id)33 if status:34 result.update({"detach_volume": 1})35 LOG.info("Detach volume from server successful!")36 else:37 result.update({"detach_volume": 0})...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!