Best Python code snippet using localstack_python
test_network_interfaces.py
Source:test_network_interfaces.py
...54 subnet_id = data['Subnet']['SubnetId']55 res_clean_subnet = self.addResourceCleanUp(self.client.delete_subnet,56 SubnetId=subnet_id)57 self.get_subnet_waiter().wait_available(subnet_id)58 data = self.client.create_network_interface(SubnetId=subnet_id)59 ni_id = data['NetworkInterface']['NetworkInterfaceId']60 res_clean_ni = self.addResourceCleanUp(61 self.client.delete_network_interface, NetworkInterfaceId=ni_id)62 self.get_network_interface_waiter().wait_available(ni_id)63 time.sleep(2)64 self.assertRaises('DependencyViolation',65 self.client.delete_subnet,66 SubnetId=subnet_id)67 self.client.delete_network_interface(NetworkInterfaceId=ni_id)68 self.cancelResourceCleanUp(res_clean_ni)69 self.get_network_interface_waiter().wait_delete(ni_id)70 self.client.delete_subnet(SubnetId=subnet_id)71 self.cancelResourceCleanUp(res_clean_subnet)72 self.get_subnet_waiter().wait_delete(subnet_id)73 def test_create_network_interface(self):74 desc = data_utils.rand_name('ni')75 data = self.client.create_network_interface(SubnetId=self.subnet_id,76 Description=desc)77 ni_id = data['NetworkInterface']['NetworkInterfaceId']78 res_clean = self.addResourceCleanUp(79 self.client.delete_network_interface, NetworkInterfaceId=ni_id)80 ni = data['NetworkInterface']81 self.assertEqual(self.vpc_id, ni['VpcId'])82 self.assertEqual(self.subnet_id, ni['SubnetId'])83 self.assertEqual(desc, ni['Description'])84 self.assertNotEmpty(ni.get('Groups'))85 self.assertEqual('default', ni['Groups'][0]['GroupName'])86 address = ni.get('PrivateIpAddress')87 self.assertIsNotNone(address)88 addresses = ni.get('PrivateIpAddresses')89 self.assertIsNotNone(addresses)90 self.assertEqual(1, len(addresses))91 self.assertTrue(addresses[0]['Primary'])92 self.assertEqual(address, addresses[0]['PrivateIpAddress'])93 self.assertIsNotNone(ni.get('MacAddress'))94 self.assertIsNotNone(ni.get('OwnerId'))95 self.assertIsNotNone(ni.get('RequesterManaged'))96 self.assertIsNotNone(ni.get('SourceDestCheck'))97 
self.get_network_interface_waiter().wait_available(ni_id)98 self.client.delete_network_interface(NetworkInterfaceId=ni_id)99 self.cancelResourceCleanUp(res_clean)100 self.get_network_interface_waiter().wait_delete(ni_id)101 self.assertRaises('InvalidNetworkInterfaceID.NotFound',102 self.client.describe_network_interfaces,103 NetworkInterfaceIds=[ni_id])104 # TODO(andrey-mp): add creation with addresses105 def test_create_max_network_interface(self):106 # NOTE(andrey-mp): wait some time while all ports will be deleted107 # for this subnet(that are deleting after previous test)108 time.sleep(5)109 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])110 count_before = data['Subnets'][0]['AvailableIpAddressCount']111 addresses = []112 while True:113 try:114 data = self.client.create_network_interface(115 SubnetId=self.subnet_id)116 except botocore.exceptions.ClientError as e:117 error_code = e.response['Error']['Code']118 self.assertEqual('InsufficientFreeAddressesInSubnet',119 error_code, e.message)120 break121 ni_id = data['NetworkInterface']['NetworkInterfaceId']122 res_clean = self.addResourceCleanUp(123 self.client.delete_network_interface,124 NetworkInterfaceId=ni_id)125 addresses.append((ni_id, res_clean))126 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])127 count_after = data['Subnets'][0]['AvailableIpAddressCount']128 # NOTE(andrey-mp): This is strange but Amazon can't create last NI129 # and Openstack can130 self.assertIn(count_after, [0, 1])131 self.assertEqual(len(addresses), count_before - count_after)132 for addr in addresses:133 self.client.delete_network_interface(NetworkInterfaceId=addr[0])134 self.cancelResourceCleanUp(addr[1])135 self.get_network_interface_waiter().wait_delete(addr[0])136 def test_unassign_primary_addresses(self):137 data = self.client.create_network_interface(SubnetId=self.subnet_id)138 ni_id = data['NetworkInterface']['NetworkInterfaceId']139 res_clean = self.addResourceCleanUp(140 
self.client.delete_network_interface, NetworkInterfaceId=ni_id)141 primary_address = data['NetworkInterface'].get('PrivateIpAddress')142 self.get_network_interface_waiter().wait_available(ni_id)143 self.assertRaises('InvalidParameterValue',144 self.client.unassign_private_ip_addresses,145 NetworkInterfaceId=ni_id,146 PrivateIpAddresses=[primary_address])147 self.client.delete_network_interface(NetworkInterfaceId=ni_id)148 self.cancelResourceCleanUp(res_clean)149 self.get_network_interface_waiter().wait_delete(ni_id)150 def test_assign_unassign_private_addresses_by_count(self):151 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])152 count = data['Subnets'][0]['AvailableIpAddressCount']153 data = self.client.create_network_interface(SubnetId=self.subnet_id)154 ni_id = data['NetworkInterface']['NetworkInterfaceId']155 res_clean = self.addResourceCleanUp(156 self.client.delete_network_interface, NetworkInterfaceId=ni_id)157 self.get_network_interface_waiter().wait_available(ni_id)158 data = self.client.assign_private_ip_addresses(159 NetworkInterfaceId=ni_id,160 SecondaryPrivateIpAddressCount=2)161 self._wait_assignment(ni_id, data)162 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])163 count_after = data['Subnets'][0]['AvailableIpAddressCount']164 self.assertEqual(count - 3, count_after)165 data = self.client.describe_network_interfaces(166 NetworkInterfaceIds=[ni_id])167 addresses = []168 for addr in data['NetworkInterfaces'][0]['PrivateIpAddresses']:169 if not addr['Primary']:170 addresses.append(addr['PrivateIpAddress'])171 self.assertEqual(2, len(addresses))172 data = self.client.unassign_private_ip_addresses(173 NetworkInterfaceId=ni_id,174 PrivateIpAddresses=addresses)175 self._wait_assignment(ni_id, data)176 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])177 count_after = data['Subnets'][0]['AvailableIpAddressCount']178 self.assertEqual(count - 1, count_after)179 
self.client.delete_network_interface(NetworkInterfaceId=ni_id)180 self.cancelResourceCleanUp(res_clean)181 self.get_network_interface_waiter().wait_delete(ni_id)182 def test_assign_unassign_private_addresses_by_addresses(self):183 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])184 count = data['Subnets'][0]['AvailableIpAddressCount']185 data = self.client.create_network_interface(SubnetId=self.subnet_id)186 ni_id = data['NetworkInterface']['NetworkInterfaceId']187 res_clean = self.addResourceCleanUp(188 self.client.delete_network_interface, NetworkInterfaceId=ni_id)189 self.get_network_interface_waiter().wait_available(ni_id)190 addresses = ['10.7.0.10', '10.7.0.11']191 data = self.client.assign_private_ip_addresses(192 NetworkInterfaceId=ni_id,193 PrivateIpAddresses=addresses)194 self._wait_assignment(ni_id, data)195 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])196 count_after = data['Subnets'][0]['AvailableIpAddressCount']197 # NOTE(Alex): Amazon misses 1 IP address by some reason.198 self.assertIn(count_after, [count - 3, count - 4])199 data = self.client.describe_network_interfaces(200 NetworkInterfaceIds=[ni_id])201 assigned_addresses = []202 for addr in data['NetworkInterfaces'][0]['PrivateIpAddresses']:203 if not addr['Primary']:204 self.assertIn(addr['PrivateIpAddress'], addresses)205 assigned_addresses.append(addr['PrivateIpAddress'])206 self.assertEqual(2, len(assigned_addresses))207 data = self.client.unassign_private_ip_addresses(208 NetworkInterfaceId=ni_id,209 PrivateIpAddresses=addresses)210 self._wait_assignment(ni_id, data)211 data = self.client.describe_subnets(SubnetIds=[self.subnet_id])212 count_after = data['Subnets'][0]['AvailableIpAddressCount']213 self.assertIn(count_after, [count - 1, count - 2])214 self.client.delete_network_interface(NetworkInterfaceId=ni_id)215 self.cancelResourceCleanUp(res_clean)216 self.get_network_interface_waiter().wait_delete(ni_id)217 @testtools.skipUnless(CONF.aws.image_id, 
"image id is not defined")218 def test_attach_network_interface(self):219 data = self.client.create_network_interface(SubnetId=self.subnet_id)220 ni_id = data['NetworkInterface']['NetworkInterfaceId']221 self.addResourceCleanUp(self.client.delete_network_interface,222 NetworkInterfaceId=ni_id)223 ni = data['NetworkInterface']224 address = ni.get('PrivateIpAddress')225 self.assertIsNotNone(address)226 self.get_network_interface_waiter().wait_available(ni_id)227 instance_id = self.run_instance(SubnetId=self.subnet_id)228 # NOTE(andrey-mp): Amazon can't attach to device index = 0229 kwargs = {230 'DeviceIndex': 0,231 'InstanceId': instance_id,232 'NetworkInterfaceId': ni_id233 }234 self.assertRaises('InvalidParameterValue',235 self.client.attach_network_interface,236 **kwargs)237 kwargs = {238 'DeviceIndex': 2,239 'InstanceId': instance_id,240 'NetworkInterfaceId': ni_id241 }242 data = self.client.attach_network_interface(*[], **kwargs)243 attachment_id = data['AttachmentId']244 instance = self.get_instance(instance_id)245 nis = instance.get('NetworkInterfaces', [])246 self.assertEqual(2, len(nis))247 ids = [nis[0]['Attachment']['AttachmentId'],248 nis[1]['Attachment']['AttachmentId']]249 self.assertIn(attachment_id, ids)250 self.assertRaises('InvalidParameterValue',251 self.client.delete_network_interface,252 NetworkInterfaceId=ni_id)253 self.client.detach_network_interface(AttachmentId=attachment_id)254 self.client.terminate_instances(InstanceIds=[instance_id])255 self.get_instance_waiter().wait_delete(instance_id)256 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")257 def test_network_interfaces_are_not_deleted_on_termination(self):258 instance_id = self.run_instance(SubnetId=self.subnet_id)259 instance = self.get_instance(instance_id)260 nis = instance.get('NetworkInterfaces', [])261 self.assertEqual(1, len(nis))262 self.assertTrue(nis[0]['Attachment']['DeleteOnTermination'])263 ni_id = nis[0]['NetworkInterfaceId']264 attachment_id = 
nis[0]['Attachment']['AttachmentId']265 kwargs = {266 'NetworkInterfaceId': ni_id,267 'Attachment': {268 'AttachmentId': attachment_id,269 'DeleteOnTermination': False,270 }271 }272 self.client.modify_network_interface_attribute(*[], **kwargs)273 clean_ni = self.addResourceCleanUp(274 self.client.delete_network_interface, NetworkInterfaceId=ni_id)275 data = self.client.create_network_interface(SubnetId=self.subnet_id)276 ni_id2 = data['NetworkInterface']['NetworkInterfaceId']277 clean_ni2 = self.addResourceCleanUp(278 self.client.delete_network_interface, NetworkInterfaceId=ni_id2)279 self.get_network_interface_waiter().wait_available(ni_id2)280 kwargs = {281 'DeviceIndex': 2,282 'InstanceId': instance_id,283 'NetworkInterfaceId': ni_id2284 }285 data = self.client.attach_network_interface(*[], **kwargs)286 attachment_id = data['AttachmentId']287 instance = self.get_instance(instance_id)288 nis = instance.get('NetworkInterfaces', [])289 self.assertEqual(2, len(nis))290 ni = nis[0]291 if ni['Attachment']['AttachmentId'] != attachment_id:292 ni = nis[1]293 self.assertEqual(attachment_id, ni['Attachment']['AttachmentId'])294 self.assertFalse(ni['Attachment']['DeleteOnTermination'])295 self.client.terminate_instances(InstanceIds=[instance_id])296 self.get_instance_waiter().wait_delete(instance_id)297 self.get_network_interface_waiter().wait_available(ni_id)298 self.get_network_interface_waiter().wait_available(ni_id2)299 self.client.delete_network_interface(NetworkInterfaceId=ni_id)300 self.cancelResourceCleanUp(clean_ni)301 self.get_network_interface_waiter().wait_delete(ni_id)302 self.client.delete_network_interface(NetworkInterfaceId=ni_id2)303 self.cancelResourceCleanUp(clean_ni2)304 self.get_network_interface_waiter().wait_delete(ni_id2)305 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")306 def test_network_interfaces_are_deleted_on_termination(self):307 instance_id = self.run_instance(SubnetId=self.subnet_id)308 instance = 
self.get_instance(instance_id)309 nis = instance.get('NetworkInterfaces', [])310 self.assertEqual(1, len(nis))311 self.assertTrue(nis[0]['Attachment']['DeleteOnTermination'])312 ni_id = nis[0]['NetworkInterfaceId']313 data = self.client.create_network_interface(SubnetId=self.subnet_id)314 ni_id2 = data['NetworkInterface']['NetworkInterfaceId']315 self.addResourceCleanUp(self.client.delete_network_interface,316 NetworkInterfaceId=ni_id2)317 self.get_network_interface_waiter().wait_available(ni_id2)318 kwargs = {319 'DeviceIndex': 2,320 'InstanceId': instance_id,321 'NetworkInterfaceId': ni_id2322 }323 data = self.client.attach_network_interface(*[], **kwargs)324 attachment_id = data['AttachmentId']325 kwargs = {326 'NetworkInterfaceId': ni_id2,327 'Attachment': {328 'AttachmentId': attachment_id,329 'DeleteOnTermination': True,330 }331 }332 self.client.modify_network_interface_attribute(*[], **kwargs)333 self.client.terminate_instances(InstanceIds=[instance_id])334 self.get_instance_waiter().wait_delete(instance_id)335 self.get_network_interface_waiter().wait_delete(ni_id)336 self.get_network_interface_waiter().wait_delete(ni_id2)337 def test_network_interface_attribute_description(self):338 desc = data_utils.rand_name('ni')339 data = self.client.create_network_interface(340 SubnetId=self.subnet_id, Description=desc)341 ni_id = data['NetworkInterface']['NetworkInterfaceId']342 res_clean = self.addResourceCleanUp(343 self.client.delete_network_interface, NetworkInterfaceId=ni_id)344 self.get_network_interface_waiter().wait_available(ni_id)345 data = self.client.describe_network_interface_attribute(346 NetworkInterfaceId=ni_id, Attribute='description')347 self.assertEqual(desc, data['Description']['Value'])348 new_desc = data_utils.rand_name('new-ni')349 self.client.modify_network_interface_attribute(350 NetworkInterfaceId=ni_id, Description={'Value': new_desc})351 data = self.client.describe_network_interface_attribute(352 NetworkInterfaceId=ni_id, 
Attribute='description')353 self.assertEqual(new_desc, data['Description']['Value'])354 self.client.delete_network_interface(NetworkInterfaceId=ni_id)355 self.cancelResourceCleanUp(res_clean)356 self.get_network_interface_waiter().wait_delete(ni_id)357 def test_network_interface_attribute_source_dest_check(self):358 data = self.client.create_network_interface(SubnetId=self.subnet_id)359 ni_id = data['NetworkInterface']['NetworkInterfaceId']360 res_clean = self.addResourceCleanUp(361 self.client.delete_network_interface, NetworkInterfaceId=ni_id)362 self.get_network_interface_waiter().wait_available(ni_id)363 self.client.modify_network_interface_attribute(364 NetworkInterfaceId=ni_id, SourceDestCheck={'Value': False})365 data = self.client.describe_network_interface_attribute(366 NetworkInterfaceId=ni_id, Attribute='sourceDestCheck')367 self.assertFalse(data['SourceDestCheck']['Value'])368 # NOTE(andrey-mp): ResetNetworkInterfaceAttribute had inadequate json369 # scheme in botocore and doesn't work against Amazon.370 self.client.modify_network_interface_attribute(371 NetworkInterfaceId=ni_id, SourceDestCheck={'Value': True})372 data = self.client.describe_network_interface_attribute(373 NetworkInterfaceId=ni_id, Attribute='sourceDestCheck')374 self.assertEqual(True, data['SourceDestCheck']['Value'])375 self.client.delete_network_interface(NetworkInterfaceId=ni_id)376 self.cancelResourceCleanUp(res_clean)377 self.get_network_interface_waiter().wait_delete(ni_id)378 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")379 def test_network_interface_attribute_attachment(self):380 instance_id = self.run_instance(SubnetId=self.subnet_id)381 instance = self.get_instance(instance_id)382 nis = instance.get('NetworkInterfaces', [])383 self.assertEqual(1, len(nis))384 self.assertTrue(nis[0]['Attachment']['DeleteOnTermination'])385 ni_id = nis[0]['NetworkInterfaceId']386 data = self.client.describe_network_interface_attribute(387 NetworkInterfaceId=ni_id, 
Attribute='attachment')388 self.assertIn('Attachment', data)389 self.assertTrue(data['Attachment'].get('AttachmentId'))390 self.assertTrue(data['Attachment'].get('DeleteOnTermination'))391 self.assertEqual(0, data['Attachment'].get('DeviceIndex'))392 self.assertEqual(instance_id, data['Attachment'].get('InstanceId'))393 self.assertEqual('attached', data['Attachment'].get('Status'))394 self.client.terminate_instances(InstanceIds=[instance_id])395 self.get_instance_waiter().wait_delete(instance_id)396 def test_network_interface_attribute_empty_attachment(self):397 data = self.client.create_network_interface(SubnetId=self.subnet_id)398 ni_id = data['NetworkInterface']['NetworkInterfaceId']399 res_clean = self.addResourceCleanUp(400 self.client.delete_network_interface, NetworkInterfaceId=ni_id)401 self.get_network_interface_waiter().wait_available(ni_id)402 data = self.client.describe_network_interface_attribute(403 NetworkInterfaceId=ni_id, Attribute='attachment')404 self.assertNotIn('Attachment', data)405 self.client.delete_network_interface(NetworkInterfaceId=ni_id)406 self.cancelResourceCleanUp(res_clean)407 self.get_network_interface_waiter().wait_delete(ni_id)408 def test_network_interface_attribute_group_set(self):409 data = self.client.create_network_interface(SubnetId=self.subnet_id)410 ni_id = data['NetworkInterface']['NetworkInterfaceId']411 res_clean = self.addResourceCleanUp(412 self.client.delete_network_interface, NetworkInterfaceId=ni_id)413 self.get_network_interface_waiter().wait_available(ni_id)414 data = self.client.describe_network_interface_attribute(415 NetworkInterfaceId=ni_id, Attribute='groupSet')416 self.assertIn('Groups', data)417 self.assertEqual(1, len(data['Groups']))418 self.assertEqual('default', data['Groups'][0]['GroupName'])419 self.client.delete_network_interface(NetworkInterfaceId=ni_id)420 self.cancelResourceCleanUp(res_clean)421 self.get_network_interface_waiter().wait_delete(ni_id)422 def 
test_instance_attributes_negative(self):423 data = self.client.create_network_interface(SubnetId=self.subnet_id)424 ni_id = data['NetworkInterface']['NetworkInterfaceId']425 res_clean = self.addResourceCleanUp(426 self.client.delete_network_interface, NetworkInterfaceId=ni_id)427 self.get_network_interface_waiter().wait_available(ni_id)428 self.assertRaises('InvalidParameterCombination',429 self.client.describe_network_interface_attribute,430 NetworkInterfaceId=ni_id)431 self.assertRaises('InvalidParameterValue',432 self.client.describe_network_interface_attribute,433 NetworkInterfaceId=ni_id, Attribute='fake')434 self.assertRaises('InvalidNetworkInterfaceID.NotFound',435 self.client.describe_network_interface_attribute,436 NetworkInterfaceId='eni-0', Attribute='description')437 self.assertRaises('InvalidParameterCombination',...
test_elastic_network_interfaces.py
Source:test_elastic_network_interfaces.py
...15 conn = boto.connect_vpc("the_key", "the_secret")16 vpc = conn.create_vpc("10.0.0.0/16")17 subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")18 with assert_raises(EC2ResponseError) as ex:19 eni = conn.create_network_interface(subnet.id, dry_run=True)20 ex.exception.error_code.should.equal("DryRunOperation")21 ex.exception.status.should.equal(400)22 ex.exception.message.should.equal(23 "An error occurred (DryRunOperation) when calling the CreateNetworkInterface operation: Request would have succeeded, but DryRun flag is set"24 )25 eni = conn.create_network_interface(subnet.id)26 all_enis = conn.get_all_network_interfaces()27 all_enis.should.have.length_of(1)28 eni = all_enis[0]29 eni.groups.should.have.length_of(0)30 eni.private_ip_addresses.should.have.length_of(1)31 eni.private_ip_addresses[0].private_ip_address.startswith("10.").should.be.true32 with assert_raises(EC2ResponseError) as ex:33 conn.delete_network_interface(eni.id, dry_run=True)34 ex.exception.error_code.should.equal("DryRunOperation")35 ex.exception.status.should.equal(400)36 ex.exception.message.should.equal(37 "An error occurred (DryRunOperation) when calling the DeleteNetworkInterface operation: Request would have succeeded, but DryRun flag is set"38 )39 conn.delete_network_interface(eni.id)40 all_enis = conn.get_all_network_interfaces()41 all_enis.should.have.length_of(0)42 with assert_raises(EC2ResponseError) as cm:43 conn.delete_network_interface(eni.id)44 cm.exception.error_code.should.equal("InvalidNetworkInterfaceID.NotFound")45 cm.exception.status.should.equal(400)46 cm.exception.request_id.should_not.be.none47@mock_ec2_deprecated48def test_elastic_network_interfaces_subnet_validation():49 conn = boto.connect_vpc("the_key", "the_secret")50 with assert_raises(EC2ResponseError) as cm:51 conn.create_network_interface("subnet-abcd1234")52 cm.exception.error_code.should.equal("InvalidSubnetID.NotFound")53 cm.exception.status.should.equal(400)54 
cm.exception.request_id.should_not.be.none55@mock_ec2_deprecated56def test_elastic_network_interfaces_with_private_ip():57 conn = boto.connect_vpc("the_key", "the_secret")58 vpc = conn.create_vpc("10.0.0.0/16")59 subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")60 private_ip = "54.0.0.1"61 eni = conn.create_network_interface(subnet.id, private_ip)62 all_enis = conn.get_all_network_interfaces()63 all_enis.should.have.length_of(1)64 eni = all_enis[0]65 eni.groups.should.have.length_of(0)66 eni.private_ip_addresses.should.have.length_of(1)67 eni.private_ip_addresses[0].private_ip_address.should.equal(private_ip)68@mock_ec2_deprecated69def test_elastic_network_interfaces_with_groups():70 conn = boto.connect_vpc("the_key", "the_secret")71 vpc = conn.create_vpc("10.0.0.0/16")72 subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")73 security_group1 = conn.create_security_group(74 "test security group #1", "this is a test security group"75 )76 security_group2 = conn.create_security_group(77 "test security group #2", "this is a test security group"78 )79 conn.create_network_interface(80 subnet.id, groups=[security_group1.id, security_group2.id]81 )82 all_enis = conn.get_all_network_interfaces()83 all_enis.should.have.length_of(1)84 eni = all_enis[0]85 eni.groups.should.have.length_of(2)86 set([group.id for group in eni.groups]).should.equal(87 set([security_group1.id, security_group2.id])88 )89@requires_boto_gte("2.12.0")90@mock_ec2_deprecated91def test_elastic_network_interfaces_modify_attribute():92 conn = boto.connect_vpc("the_key", "the_secret")93 vpc = conn.create_vpc("10.0.0.0/16")94 subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")95 security_group1 = conn.create_security_group(96 "test security group #1", "this is a test security group"97 )98 security_group2 = conn.create_security_group(99 "test security group #2", "this is a test security group"100 )101 conn.create_network_interface(subnet.id, groups=[security_group1.id])102 all_enis = 
conn.get_all_network_interfaces()103 all_enis.should.have.length_of(1)104 eni = all_enis[0]105 eni.groups.should.have.length_of(1)106 eni.groups[0].id.should.equal(security_group1.id)107 with assert_raises(EC2ResponseError) as ex:108 conn.modify_network_interface_attribute(109 eni.id, "groupset", [security_group2.id], dry_run=True110 )111 ex.exception.error_code.should.equal("DryRunOperation")112 ex.exception.status.should.equal(400)113 ex.exception.message.should.equal(114 "An error occurred (DryRunOperation) when calling the ModifyNetworkInterface operation: Request would have succeeded, but DryRun flag is set"115 )116 conn.modify_network_interface_attribute(eni.id, "groupset", [security_group2.id])117 all_enis = conn.get_all_network_interfaces()118 all_enis.should.have.length_of(1)119 eni = all_enis[0]120 eni.groups.should.have.length_of(1)121 eni.groups[0].id.should.equal(security_group2.id)122@mock_ec2_deprecated123def test_elastic_network_interfaces_filtering():124 conn = boto.connect_vpc("the_key", "the_secret")125 vpc = conn.create_vpc("10.0.0.0/16")126 subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")127 security_group1 = conn.create_security_group(128 "test security group #1", "this is a test security group"129 )130 security_group2 = conn.create_security_group(131 "test security group #2", "this is a test security group"132 )133 eni1 = conn.create_network_interface(134 subnet.id, groups=[security_group1.id, security_group2.id]135 )136 eni2 = conn.create_network_interface(subnet.id, groups=[security_group1.id])137 eni3 = conn.create_network_interface(subnet.id, description="test description")138 all_enis = conn.get_all_network_interfaces()139 all_enis.should.have.length_of(3)140 # Filter by NetworkInterfaceId141 enis_by_id = conn.get_all_network_interfaces([eni1.id])142 enis_by_id.should.have.length_of(1)143 set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))144 # Filter by ENI ID145 enis_by_id = conn.get_all_network_interfaces(146 
filters={"network-interface-id": eni1.id}147 )148 enis_by_id.should.have.length_of(1)149 set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))150 # Filter by Security Group151 enis_by_group = conn.get_all_network_interfaces(152 filters={"group-id": security_group1.id}153 )154 enis_by_group.should.have.length_of(2)155 set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id, eni2.id]))156 # Filter by ENI ID and Security Group157 enis_by_group = conn.get_all_network_interfaces(158 filters={"network-interface-id": eni1.id, "group-id": security_group1.id}159 )160 enis_by_group.should.have.length_of(1)161 set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id]))162 # Filter by Description163 enis_by_description = conn.get_all_network_interfaces(164 filters={"description": eni3.description}165 )166 enis_by_description.should.have.length_of(1)167 enis_by_description[0].description.should.equal(eni3.description)168 # Unsupported filter169 conn.get_all_network_interfaces.when.called_with(170 filters={"not-implemented-filter": "foobar"}171 ).should.throw(NotImplementedError)172@mock_ec2173def test_elastic_network_interfaces_get_by_tag_name():174 ec2 = boto3.resource("ec2", region_name="us-west-2")175 ec2_client = boto3.client("ec2", region_name="us-west-2")176 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")177 subnet = ec2.create_subnet(178 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"179 )180 eni1 = ec2.create_network_interface(181 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5"182 )183 with assert_raises(ClientError) as ex:184 eni1.create_tags(Tags=[{"Key": "Name", "Value": "eni1"}], DryRun=True)185 ex.exception.response["Error"]["Code"].should.equal("DryRunOperation")186 ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)187 ex.exception.response["Error"]["Message"].should.equal(188 "An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but 
DryRun flag is set"189 )190 eni1.create_tags(Tags=[{"Key": "Name", "Value": "eni1"}])191 # The status of the new interface should be 'available'192 waiter = ec2_client.get_waiter("network_interface_available")193 waiter.wait(NetworkInterfaceIds=[eni1.id])194 filters = [{"Name": "tag:Name", "Values": ["eni1"]}]195 enis = list(ec2.network_interfaces.filter(Filters=filters))196 enis.should.have.length_of(1)197 filters = [{"Name": "tag:Name", "Values": ["wrong-name"]}]198 enis = list(ec2.network_interfaces.filter(Filters=filters))199 enis.should.have.length_of(0)200@mock_ec2201def test_elastic_network_interfaces_get_by_availability_zone():202 ec2 = boto3.resource("ec2", region_name="us-west-2")203 ec2_client = boto3.client("ec2", region_name="us-west-2")204 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")205 subnet1 = ec2.create_subnet(206 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"207 )208 subnet2 = ec2.create_subnet(209 VpcId=vpc.id, CidrBlock="10.0.1.0/24", AvailabilityZone="us-west-2b"210 )211 eni1 = ec2.create_network_interface(212 SubnetId=subnet1.id, PrivateIpAddress="10.0.0.15"213 )214 eni2 = ec2.create_network_interface(215 SubnetId=subnet2.id, PrivateIpAddress="10.0.1.15"216 )217 # The status of the new interface should be 'available'218 waiter = ec2_client.get_waiter("network_interface_available")219 waiter.wait(NetworkInterfaceIds=[eni1.id, eni2.id])220 filters = [{"Name": "availability-zone", "Values": ["us-west-2a"]}]221 enis = list(ec2.network_interfaces.filter(Filters=filters))222 enis.should.have.length_of(1)223 filters = [{"Name": "availability-zone", "Values": ["us-west-2c"]}]224 enis = list(ec2.network_interfaces.filter(Filters=filters))225 enis.should.have.length_of(0)226@mock_ec2227def test_elastic_network_interfaces_get_by_private_ip():228 ec2 = boto3.resource("ec2", region_name="us-west-2")229 ec2_client = boto3.client("ec2", region_name="us-west-2")230 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")231 subnet = 
ec2.create_subnet(232 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"233 )234 eni1 = ec2.create_network_interface(235 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5"236 )237 # The status of the new interface should be 'available'238 waiter = ec2_client.get_waiter("network_interface_available")239 waiter.wait(NetworkInterfaceIds=[eni1.id])240 filters = [{"Name": "private-ip-address", "Values": ["10.0.10.5"]}]241 enis = list(ec2.network_interfaces.filter(Filters=filters))242 enis.should.have.length_of(1)243 filters = [{"Name": "private-ip-address", "Values": ["10.0.10.10"]}]244 enis = list(ec2.network_interfaces.filter(Filters=filters))245 enis.should.have.length_of(0)246 filters = [{"Name": "addresses.private-ip-address", "Values": ["10.0.10.5"]}]247 enis = list(ec2.network_interfaces.filter(Filters=filters))248 enis.should.have.length_of(1)249 filters = [{"Name": "addresses.private-ip-address", "Values": ["10.0.10.10"]}]250 enis = list(ec2.network_interfaces.filter(Filters=filters))251 enis.should.have.length_of(0)252@mock_ec2253def test_elastic_network_interfaces_get_by_vpc_id():254 ec2 = boto3.resource("ec2", region_name="us-west-2")255 ec2_client = boto3.client("ec2", region_name="us-west-2")256 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")257 subnet = ec2.create_subnet(258 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"259 )260 eni1 = ec2.create_network_interface(261 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5"262 )263 # The status of the new interface should be 'available'264 waiter = ec2_client.get_waiter("network_interface_available")265 waiter.wait(NetworkInterfaceIds=[eni1.id])266 filters = [{"Name": "vpc-id", "Values": [subnet.vpc_id]}]267 enis = list(ec2.network_interfaces.filter(Filters=filters))268 enis.should.have.length_of(1)269 filters = [{"Name": "vpc-id", "Values": ["vpc-aaaa1111"]}]270 enis = list(ec2.network_interfaces.filter(Filters=filters))271 enis.should.have.length_of(0)272@mock_ec2273def 
test_elastic_network_interfaces_get_by_subnet_id():274 ec2 = boto3.resource("ec2", region_name="us-west-2")275 ec2_client = boto3.client("ec2", region_name="us-west-2")276 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")277 subnet = ec2.create_subnet(278 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"279 )280 eni1 = ec2.create_network_interface(281 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5"282 )283 # The status of the new interface should be 'available'284 waiter = ec2_client.get_waiter("network_interface_available")285 waiter.wait(NetworkInterfaceIds=[eni1.id])286 filters = [{"Name": "subnet-id", "Values": [subnet.id]}]287 enis = list(ec2.network_interfaces.filter(Filters=filters))288 enis.should.have.length_of(1)289 filters = [{"Name": "subnet-id", "Values": ["subnet-aaaa1111"]}]290 enis = list(ec2.network_interfaces.filter(Filters=filters))291 enis.should.have.length_of(0)292@mock_ec2293def test_elastic_network_interfaces_get_by_description():294 ec2 = boto3.resource("ec2", region_name="us-west-2")295 ec2_client = boto3.client("ec2", region_name="us-west-2")296 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")297 subnet = ec2.create_subnet(298 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"299 )300 eni1 = ec2.create_network_interface(301 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5", Description="test interface"302 )303 # The status of the new interface should be 'available'304 waiter = ec2_client.get_waiter("network_interface_available")305 waiter.wait(NetworkInterfaceIds=[eni1.id])306 filters = [{"Name": "description", "Values": [eni1.description]}]307 enis = list(ec2.network_interfaces.filter(Filters=filters))308 enis.should.have.length_of(1)309 filters = [{"Name": "description", "Values": ["bad description"]}]310 enis = list(ec2.network_interfaces.filter(Filters=filters))311 enis.should.have.length_of(0)312@mock_ec2313def test_elastic_network_interfaces_describe_network_interfaces_with_filter():314 ec2 = 
boto3.resource("ec2", region_name="us-west-2")315 ec2_client = boto3.client("ec2", region_name="us-west-2")316 vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")317 subnet = ec2.create_subnet(318 VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"319 )320 eni1 = ec2.create_network_interface(321 SubnetId=subnet.id, PrivateIpAddress="10.0.10.5", Description="test interface"322 )323 # The status of the new interface should be 'available'324 waiter = ec2_client.get_waiter("network_interface_available")325 waiter.wait(NetworkInterfaceIds=[eni1.id])326 # Filter by network-interface-id327 response = ec2_client.describe_network_interfaces(328 Filters=[{"Name": "network-interface-id", "Values": [eni1.id]}]329 )330 response["NetworkInterfaces"].should.have.length_of(1)331 response["NetworkInterfaces"][0]["NetworkInterfaceId"].should.equal(eni1.id)332 response["NetworkInterfaces"][0]["PrivateIpAddress"].should.equal(333 eni1.private_ip_address334 )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, TestNG, and more.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!