Best Python code snippet using localstack_python
test_ipam.py
Source:test_ipam.py
...65 try:66 self.neutron_client.list_ports(67 network_id=networks['networks'][0]['id'])68 except Exception as e:69 self.docker_client.disconnect_container_from_network(70 container_id,71 container_net_id)72 message = ("Failed to list neutron ports: %s")73 self.fail(message % e.args[0])74 # Disconnect container from network, this release ip address.75 self.docker_client.disconnect_container_from_network(container_id,76 container_net_id)77 # Delete docker network, if no endpoint, will release the pool78 # and delete the subnetpool in Neutron.79 self.docker_client.stop(container=container_id)80 self.docker_client.remove_network(container_net_id)81 subnetpools = self.neutron_client.list_subnetpools(name=pool_name)82 self.assertEqual(0, len(subnetpools['subnetpools']))83 def test_container_ipam_request_address(self):84 fake_ipam = {85 "Driver": "kuryr",86 "Options": {},87 "Config": [88 {89 "Subnet": "10.12.0.0/16",90 "IPRange": "10.12.0.0/24",91 "Gateway": "10.12.0.1"92 }93 ]94 }95 container_net_name = lib_utils.get_random_string(8)96 container_net = self.docker_client.create_network(97 name=container_net_name,98 driver='kuryr',99 ipam=fake_ipam)100 container_net_id = container_net.get('Id')101 try:102 networks = self.neutron_client.list_networks(103 tags=utils.make_net_tags(container_net_id))104 except Exception as e:105 self.docker_client.remove_network(container_net_id)106 message = ("Failed to list neutron networks: %s")107 self.fail(message % e.args[0])108 # Currently we cannot get IPAM pool from docker client.109 pool_name = "kuryrPool-" + "10.12.0.0/24"110 subnetpools = self.neutron_client.list_subnetpools(name=pool_name)111 self.assertEqual(1, len(subnetpools['subnetpools']))112 subnets = self.neutron_client.list_subnets(113 network_id=networks['networks'][0]['id'],114 cidr="10.12.0.0/24")115 self.assertEqual(1, len(subnets['subnets']))116 # Boot a container, and connect to the docker network.117 container_name = lib_utils.get_random_string(8)118 container = 
self.docker_client.create_container(119 image='kuryr/busybox',120 command='/bin/sleep 600',121 hostname='kuryr_test_container',122 name=container_name)123 warn_msg = container.get('Warning')124 container_id = container.get('Id')125 self.assertIsNone(warn_msg, 'Warn in creating container')126 self.assertIsNotNone(container_id, 'Create container id must not '127 'be None')128 self.docker_client.start(container=container_id)129 self.docker_client.connect_container_to_network(container_id,130 container_net_id)131 try:132 ports = self.neutron_client.list_ports(133 network_id=networks['networks'][0]['id'])134 except Exception as e:135 self.docker_client.disconnect_container_from_network(136 container_id,137 container_net_id)138 message = ("Failed to list neutron ports: %s")139 self.fail(message % e.args[0])140 # DHCP port and container endpoint141 self.assertEqual(2, len(ports['ports']))142 # Find the kuryr port143 kuryr_port_param = {"network_id": networks['networks'][0]['id']}144 kuryr_ports = self.neutron_client.list_ports(145 **kuryr_port_param)146 kuryr_port = [port for port in kuryr_ports['ports'] if147 (lib_const.DEVICE_OWNER in port['tags'] or148 port['name'] ==149 utils.get_neutron_port_name(port['device_id']))]150 self.assertEqual(1, len(kuryr_port))151 # Disconnect container from network, this release ip address.152 self.docker_client.disconnect_container_from_network(container_id,153 container_net_id)154 # Cleanup resources155 self.docker_client.stop(container=container_id)156 self.docker_client.remove_network(container_net_id)157 def test_container_ipam_release_address(self):158 fake_ipam = {159 "Driver": "kuryr",160 "Options": {},161 "Config": [162 {163 "Subnet": "10.13.0.0/16",164 "IPRange": "10.13.0.0/24",165 "Gateway": "10.13.0.1"166 }167 ]168 }169 container_net_name = lib_utils.get_random_string(8)170 container_net = self.docker_client.create_network(171 name=container_net_name,172 driver='kuryr',173 ipam=fake_ipam)174 container_net_id = 
container_net.get('Id')175 try:176 networks = self.neutron_client.list_networks(177 tags=utils.make_net_tags(container_net_id))178 except Exception as e:179 self.docker_client.remove_network(container_net_id)180 message = ("Failed to list neutron networks: %s")181 self.fail(message % e.args[0])182 self.assertEqual(1, len(networks['networks']))183 # Boot a container, and connect to the docker network.184 container_name = lib_utils.get_random_string(8)185 container = self.docker_client.create_container(186 image='kuryr/busybox',187 command='/bin/sleep 600',188 hostname='kuryr_test_container',189 name=container_name)190 warn_msg = container.get('Warning')191 container_id = container.get('Id')192 self.assertIsNone(warn_msg, 'Warn in creating container')193 self.assertIsNotNone(container_id, 'Create container id must not '194 'be None')195 self.docker_client.start(container=container_id)196 self.docker_client.connect_container_to_network(container_id,197 container_net_id)198 try:199 ports = self.neutron_client.list_ports(200 network_id=networks['networks'][0]['id'])201 except Exception as e:202 self.docker_client.disconnect_container_from_network(203 container_id,204 container_net_id)205 message = ("Failed to list neutron ports: %s")206 self.fail(message % e.args[0])207 # A dhcp port gets created as well; dhcp is enabled by default208 self.assertEqual(2, len(ports['ports']))209 # Find the kuryr port210 kuryr_port_param = {"network_id": networks['networks'][0]['id']}211 kuryr_ports = self.neutron_client.list_ports(212 **kuryr_port_param)213 kuryr_port = [port for port in kuryr_ports['ports'] if214 (lib_const.DEVICE_OWNER in port['tags'] or215 port['name'] ==216 utils.get_neutron_port_name(port['device_id']))]217 self.assertEqual(1, len(kuryr_port))218 # Disconnect container from network, this release ip address.219 self.docker_client.disconnect_container_from_network(container_id,220 container_net_id)221 ports = self.neutron_client.list_ports(222 
network_id=networks['networks'][0]['id'])223 # DHCP port leave behind.224 self.assertEqual(1, len(ports['ports']))225 kuryr_ports = self.neutron_client.list_ports(226 **kuryr_port_param)227 kuryr_port = [port for port in kuryr_ports['ports'] if228 (lib_const.DEVICE_OWNER in port['tags'] or229 port['name'] ==230 utils.get_neutron_port_name(port['device_id']))]231 self.assertEqual(0, len(kuryr_port))232 # Cleanup resources233 self.docker_client.stop(container=container_id)234 self.docker_client.remove_network(container_net_id)235 def test_container_ipam_release_address_with_existing_network(self):236 # pre-created Neutron network and subnet237 neutron_net_name = lib_utils.get_random_string(8)238 neutron_network = self.neutron_client.create_network(239 {'network': {'name': neutron_net_name,240 "admin_state_up": True}})241 neutron_subnet_name = lib_utils.get_random_string(8)242 subnet_param = [{243 'name': neutron_subnet_name,244 'network_id': neutron_network['network']['id'],245 'ip_version': 4,246 'cidr': "10.10.0.0/24",247 'enable_dhcp': True,248 }]249 self.neutron_client.create_subnet({'subnets': subnet_param})250 fake_ipam = {251 "Driver": "kuryr",252 "Options": {},253 "Config": [254 {255 "Subnet": "10.10.0.0/16",256 "IPRange": "10.10.0.0/24",257 "Gateway": "10.10.0.1"258 }259 ]260 }261 # Create docker network using existing Neutron network262 options = {'neutron.net.name': neutron_net_name}263 container_net_name = lib_utils.get_random_string(8)264 container_net = self.docker_client.create_network(265 name=container_net_name,266 driver='kuryr',267 options=options,268 ipam=fake_ipam)269 container_net_id = container_net.get('Id')270 try:271 networks = self.neutron_client.list_networks(272 tags=utils.make_net_tags(container_net_id))273 except Exception as e:274 self.docker_client.remove_network(container_net_id)275 message = ("Failed to list neutron networks: %s")276 self.fail(message % e.args[0])277 self.assertEqual(1, len(networks['networks']))278 # Boot a 
container, and connect to the docker network.279 container_name = lib_utils.get_random_string(8)280 container = self.docker_client.create_container(281 image='kuryr/busybox',282 command='/bin/sleep 600',283 hostname='kuryr_test_container',284 name=container_name)285 warn_msg = container.get('Warning')286 container_id = container.get('Id')287 self.assertIsNone(warn_msg, 'Warn in creating container')288 self.assertIsNotNone(container_id, 'Create container id must not '289 'be None')290 self.docker_client.start(container=container_id)291 self.docker_client.connect_container_to_network(container_id,292 container_net_id)293 try:294 ports = self.neutron_client.list_ports(295 network_id=neutron_network['network']['id'])296 except Exception as e:297 self.docker_client.disconnect_container_from_network(298 container_id,299 container_net_id)300 message = ("Failed to list neutron ports: %s")301 self.fail(message % e.args[0])302 # A dhcp port gets created as well; dhcp is enabled by default303 self.assertEqual(2, len(ports['ports']))304 # Find the kuryr port305 kuryr_port_param = {"network_id": neutron_network['network']['id']}306 kuryr_ports = self.neutron_client.list_ports(307 **kuryr_port_param)308 kuryr_port = [port for port in kuryr_ports['ports'] if309 (lib_const.DEVICE_OWNER in port['tags'] or310 port['name'] ==311 utils.get_neutron_port_name(port['device_id']))]312 self.assertEqual(1, len(kuryr_port))313 # Disconnect container from network, this release ip address.314 self.docker_client.disconnect_container_from_network(container_id,315 container_net_id)316 ports = self.neutron_client.list_ports(317 network_id=neutron_network['network']['id'])318 self.assertEqual(1, len(ports['ports']))319 kuryr_ports = self.neutron_client.list_ports(320 **kuryr_port_param)321 kuryr_port = [port for port in kuryr_ports['ports'] if322 (lib_const.DEVICE_OWNER in port['tags'] or323 port['name'] ==324 utils.get_neutron_port_name(port['device_id']))]325 self.assertEqual(0, 
len(kuryr_port))326 # Cleanup resources327 self.docker_client.stop(container=container_id)328 self.docker_client.remove_network(container_net_id)329 self.neutron_client.delete_network(neutron_network['network']['id'])330 def test_container_ipam_request_address_with_existing_port(self):331 # pre-created Neutron network and subnet and port332 neutron_net_name = lib_utils.get_random_string(8)333 neutron_network = self.neutron_client.create_network(334 {'network': {'name': neutron_net_name,335 "admin_state_up": True}})336 neutron_subnet_name = lib_utils.get_random_string(8)337 subnet_param = [{338 'name': neutron_subnet_name,339 'network_id': neutron_network['network']['id'],340 'ip_version': 4,341 'cidr': "10.14.0.0/24",342 }]343 neutron_subnet = self.neutron_client.create_subnet(344 {'subnets': subnet_param})345 neutron_v6_subnet_name = lib_utils.get_random_string(8)346 v6_subnet_param = [{347 'name': neutron_v6_subnet_name,348 'network_id': neutron_network['network']['id'],349 'ip_version': 6,350 'cidr': "fe81::/64",351 }]352 neutron_v6_subnet = self.neutron_client.create_subnet(353 {'subnets': v6_subnet_param})354 existing_neutron_port = self.neutron_client.create_port(355 {'port': {'network_id': neutron_network['network']['id']}})356 fixed_ips = {fip['subnet_id']: fip['ip_address']357 for fip in existing_neutron_port['port']['fixed_ips']}358 ipv4_address = fixed_ips[neutron_subnet['subnets'][0]['id']]359 ipv6_address = fixed_ips[neutron_v6_subnet['subnets'][0]['id']]360 fake_ipam = {361 "Driver": "kuryr",362 "Options": {},363 "Config": [364 {365 "Subnet": "10.14.0.0/24",366 "Gateway": "10.14.0.1"367 },368 {369 "Subnet": "fe81::/64",370 "Gateway": "fe81::1"371 },372 ]373 }374 # Create docker network using existing Neutron network375 options = {'neutron.net.name': neutron_net_name}376 container_net_name = lib_utils.get_random_string(8)377 container_net = self.docker_client.create_network(378 name=container_net_name,379 driver='kuryr',380 enable_ipv6=True,381 
options=options,382 ipam=fake_ipam)383 container_net_id = container_net.get('Id')384 try:385 networks = self.neutron_client.list_networks(386 tags=utils.make_net_tags(container_net_id))387 except Exception as e:388 self.docker_client.remove_network(container_net_id)389 message = ("Failed to list neutron networks: %s")390 self.fail(message % e.args[0])391 self.assertEqual(1, len(networks['networks']))392 # Boot a container, and connect to the docker network.393 container_name = lib_utils.get_random_string(8)394 container = self.docker_client.create_container(395 image='kuryr/busybox',396 command='/bin/sleep 600',397 hostname='kuryr_test_container',398 name=container_name)399 warn_msg = container.get('Warning')400 container_id = container.get('Id')401 self.assertIsNone(warn_msg, 'Warn in creating container')402 self.assertIsNotNone(container_id, 'Create container id must not '403 'be None')404 self.docker_client.start(container=container_id)405 self.docker_client.connect_container_to_network(406 container_id, container_net_id, ipv4_address=ipv4_address,407 ipv6_address=ipv6_address)408 try:409 ports = self.neutron_client.list_ports(410 network_id=neutron_network['network']['id'])411 except Exception as e:412 self.docker_client.disconnect_container_from_network(413 container_id,414 container_net_id)415 message = ("Failed to list neutron ports: %s")416 self.fail(message % e.args[0])417 # A dhcp port gets created as well; dhcp is enabled by default418 self.assertEqual(2, len(ports['ports']))419 # Find the existing neutron port420 neutron_port_param = {"network_id": neutron_network['network']['id']}421 neutron_ports = self.neutron_client.list_ports(422 **neutron_port_param)423 neutron_port = [port for port in neutron_ports['ports'] if424 (const.KURYR_EXISTING_NEUTRON_PORT in port['tags'] or425 port['name'] ==426 utils.get_neutron_port_name(port['device_id']))]427 self.assertEqual(1, len(neutron_port))428 # Disconnect container from network.429 
self.docker_client.disconnect_container_from_network(container_id,430 container_net_id)431 ports = self.neutron_client.list_ports(432 network_id=neutron_network['network']['id'])433 self.assertEqual(2, len(ports['ports']))434 neutron_ports = self.neutron_client.list_ports(435 **neutron_port_param)436 neutron_port = [port for port in neutron_ports['ports'] if437 (const.KURYR_EXISTING_NEUTRON_PORT in port['tags'] or438 port['name'] ==439 utils.get_neutron_port_name(port['device_id']))]440 self.assertEqual(0, len(neutron_port))441 # Cleanup resources442 self.docker_client.stop(container=container_id)443 self.docker_client.remove_network(container_net_id)444 self.neutron_client.delete_port(existing_neutron_port['port']['id'])445 self.neutron_client.delete_subnet(neutron_subnet['subnets'][0]['id'])446 self.neutron_client.delete_subnet(447 neutron_v6_subnet['subnets'][0]['id'])448 self.neutron_client.delete_network(neutron_network['network']['id'])449 def test_container_ipam_release_address_with_existing_port_same_ip(self):450 ipv4_address = "10.15.0.10"451 # pre-created the first Neutron network and subnet and port452 neutron_net_name = lib_utils.get_random_string(8)453 neutron_network = self.neutron_client.create_network(454 {'network': {'name': neutron_net_name,455 "admin_state_up": True}})456 neutron_subnet_name = lib_utils.get_random_string(8)457 subnet_param = [{458 'name': neutron_subnet_name,459 'network_id': neutron_network['network']['id'],460 'ip_version': 4,461 'cidr': "10.15.0.0/24",462 }]463 neutron_subnet = self.neutron_client.create_subnet(464 {'subnets': subnet_param})465 existing_neutron_port = self.neutron_client.create_port(466 {'port': {'network_id': neutron_network['network']['id'],467 'fixed_ips': [{'ip_address': ipv4_address}]}})468 fake_ipam = {469 "Driver": "kuryr",470 "Options": {471 'neutron.subnet.name': neutron_subnet_name472 },473 "Config": [474 {475 "Subnet": "10.15.0.0/24",476 "Gateway": "10.15.0.1"477 }478 ]479 }480 # Create docker 
network using existing Neutron network481 options = {'neutron.net.name': neutron_net_name,482 'neutron.subnet.name': neutron_subnet_name}483 container_net_name = lib_utils.get_random_string(8)484 container_net = self.docker_client.create_network(485 name=container_net_name,486 driver='kuryr',487 options=options,488 ipam=fake_ipam)489 container_net_id = container_net.get('Id')490 # pre-created the second Neutron network and subnet and port491 neutron_net_name2 = lib_utils.get_random_string(8)492 neutron_network2 = self.neutron_client.create_network(493 {'network': {'name': neutron_net_name2,494 "admin_state_up": True}})495 neutron_subnet_name2 = lib_utils.get_random_string(8)496 subnet_param2 = [{497 'name': neutron_subnet_name2,498 'network_id': neutron_network2['network']['id'],499 'ip_version': 4,500 'cidr': "10.15.0.0/24",501 }]502 neutron_subnet2 = self.neutron_client.create_subnet(503 {'subnets': subnet_param2})504 existing_neutron_port2 = self.neutron_client.create_port(505 {'port': {'network_id': neutron_network2['network']['id'],506 'fixed_ips': [{'ip_address': ipv4_address}]}})507 fake_ipam2 = {508 "Driver": "kuryr",509 "Options": {510 'neutron.subnet.name': neutron_subnet_name2511 },512 "Config": [513 {514 "Subnet": "10.15.0.0/24",515 "Gateway": "10.15.0.1"516 }517 ]518 }519 # Create docker network using existing Neutron network520 options = {'neutron.net.name': neutron_net_name2,521 'neutron.subnet.name': neutron_subnet_name2}522 container_net_name2 = lib_utils.get_random_string(8)523 container_net2 = self.docker_client.create_network(524 name=container_net_name2,525 driver='kuryr',526 options=options,527 ipam=fake_ipam2)528 container_net_id2 = container_net2.get('Id')529 # Boot the first container, and connect to the first docker network.530 endpoint_config = self.docker_client.create_endpoint_config(531 ipv4_address=ipv4_address)532 network_config = self.docker_client.create_networking_config({533 container_net_id: endpoint_config})534 container_name = 
lib_utils.get_random_string(8)535 container = self.docker_client.create_container(536 image='kuryr/busybox',537 command='/bin/sleep 600',538 hostname='kuryr_test_container',539 name=container_name,540 networking_config=network_config)541 container_id = container.get('Id')542 self.docker_client.start(container=container_id)543 # Boot the second container, and connect to the second docker network.544 endpoint_config = self.docker_client.create_endpoint_config(545 ipv4_address=ipv4_address)546 network_config = self.docker_client.create_networking_config({547 container_net_id2: endpoint_config})548 container_name2 = lib_utils.get_random_string(8)549 container2 = self.docker_client.create_container(550 image='kuryr/busybox',551 command='/bin/sleep 600',552 hostname='kuryr_test_container2',553 name=container_name2,554 networking_config=network_config)555 container_id2 = container2.get('Id')556 self.docker_client.start(container=container_id2)557 # Assert both existing neutron ports active558 for port_id in (existing_neutron_port['port']['id'],559 existing_neutron_port2['port']['id']):560 utils.wait_for_port_active(561 self.neutron_client, port_id, 60)562 neutron_port = self.neutron_client.show_port(port_id)563 self.assertEqual('ACTIVE', neutron_port['port']['status'])564 # Disconnect the first container from network and565 # assert the first neutron port is down and the second is still active566 self.docker_client.disconnect_container_from_network(container_id,567 container_net_id)568 existing_neutron_port = self.neutron_client.show_port(569 existing_neutron_port['port']['id'])570 self.assertEqual('DOWN', existing_neutron_port['port']['status'])571 existing_neutron_port2 = self.neutron_client.show_port(572 existing_neutron_port2['port']['id'])573 self.assertEqual('ACTIVE', existing_neutron_port2['port']['status'])574 # Disconnect the second container from network and575 # assert both neutron ports are down.576 
self.docker_client.disconnect_container_from_network(container_id2,577 container_net_id2)578 for port_id in (existing_neutron_port['port']['id'],579 existing_neutron_port2['port']['id']):580 neutron_port = self.neutron_client.show_port(port_id)581 self.assertEqual('DOWN', neutron_port['port']['status'])582 # Cleanup resources583 self.docker_client.stop(container=container_id)584 self.docker_client.stop(container=container_id2)585 self.docker_client.remove_network(container_net_id)586 self.docker_client.remove_network(container_net_id2)587 self.neutron_client.delete_port(existing_neutron_port['port']['id'])588 self.neutron_client.delete_port(existing_neutron_port2['port']['id'])589 self.neutron_client.delete_subnet(neutron_subnet['subnets'][0]['id'])590 self.neutron_client.delete_subnet(neutron_subnet2['subnets'][0]['id'])...
api_network_test.py
Source:api_network_test.py
...82 container['Id']83 ]84 with pytest.raises(docker.errors.APIError):85 self.client.connect_container_to_network(container, net_id)86 self.client.disconnect_container_from_network(container, net_id)87 network_data = self.client.inspect_network(net_id)88 assert not network_data.get('Containers')89 with pytest.raises(docker.errors.APIError):90 self.client.disconnect_container_from_network(container, net_id)91 @requires_api_version('1.22')92 def test_connect_and_force_disconnect_container(self):93 net_name, net_id = self.create_network()94 container = self.client.create_container(TEST_IMG, 'top')95 self.tmp_containers.append(container)96 self.client.start(container)97 network_data = self.client.inspect_network(net_id)98 assert not network_data.get('Containers')99 self.client.connect_container_to_network(container, net_id)100 network_data = self.client.inspect_network(net_id)101 assert list(network_data['Containers'].keys()) == \102 [container['Id']]103 self.client.disconnect_container_from_network(container, net_id, True)104 network_data = self.client.inspect_network(net_id)105 assert not network_data.get('Containers')106 with pytest.raises(docker.errors.APIError):107 self.client.disconnect_container_from_network(108 container, net_id, force=True109 )110 @requires_api_version('1.22')111 def test_connect_with_aliases(self):112 net_name, net_id = self.create_network()113 container = self.client.create_container(TEST_IMG, 'top')114 self.tmp_containers.append(container)115 self.client.start(container)116 self.client.connect_container_to_network(117 container, net_id, aliases=['foo', 'bar'])118 container_data = self.client.inspect_container(container)119 aliases = (120 container_data['NetworkSettings']['Networks'][net_name]['Aliases']121 )122 assert 'foo' in aliases123 assert 'bar' in aliases124 def test_connect_on_container_create(self):125 net_name, net_id = self.create_network()126 container = self.client.create_container(127 image=TEST_IMG,128 command='top',129 
host_config=self.client.create_host_config(network_mode=net_name),130 )131 self.tmp_containers.append(container)132 self.client.start(container)133 network_data = self.client.inspect_network(net_id)134 assert list(network_data['Containers'].keys()) == \135 [container['Id']]136 self.client.disconnect_container_from_network(container, net_id)137 network_data = self.client.inspect_network(net_id)138 assert not network_data.get('Containers')139 @requires_api_version('1.22')140 def test_create_with_aliases(self):141 net_name, net_id = self.create_network()142 container = self.client.create_container(143 image=TEST_IMG,144 command='top',145 host_config=self.client.create_host_config(146 network_mode=net_name,147 ),148 networking_config=self.client.create_networking_config({149 net_name: self.client.create_endpoint_config(150 aliases=['foo', 'bar'],151 ),152 }),153 )154 self.tmp_containers.append(container)155 self.client.start(container)156 container_data = self.client.inspect_container(container)157 aliases = (158 container_data['NetworkSettings']['Networks'][net_name]['Aliases']159 )160 assert 'foo' in aliases161 assert 'bar' in aliases162 @requires_api_version('1.22')163 def test_create_with_ipv4_address(self):164 net_name, net_id = self.create_network(165 ipam=IPAMConfig(166 driver='default',167 pool_configs=[IPAMPool(subnet="132.124.0.0/16")],168 ),169 )170 container = self.client.create_container(171 image=TEST_IMG, command='top',172 host_config=self.client.create_host_config(network_mode=net_name),173 networking_config=self.client.create_networking_config({174 net_name: self.client.create_endpoint_config(175 ipv4_address='132.124.0.23'176 )177 })178 )179 self.tmp_containers.append(container)180 self.client.start(container)181 net_settings = self.client.inspect_container(container)[182 'NetworkSettings'183 ]184 assert net_settings['Networks'][net_name]['IPAMConfig']['IPv4Address']\185 == '132.124.0.23'186 @requires_api_version('1.22')187 def 
test_create_with_ipv6_address(self):188 net_name, net_id = self.create_network(189 ipam=IPAMConfig(190 driver='default',191 pool_configs=[IPAMPool(subnet="2001:389::1/64")],192 ),193 )194 container = self.client.create_container(195 image=TEST_IMG, command='top',196 host_config=self.client.create_host_config(network_mode=net_name),197 networking_config=self.client.create_networking_config({198 net_name: self.client.create_endpoint_config(199 ipv6_address='2001:389::f00d'200 )201 })202 )203 self.tmp_containers.append(container)204 self.client.start(container)205 net_settings = self.client.inspect_container(container)[206 'NetworkSettings'207 ]208 assert net_settings['Networks'][net_name]['IPAMConfig']['IPv6Address']\209 == '2001:389::f00d'210 @requires_api_version('1.24')211 def test_create_with_linklocal_ips(self):212 container = self.client.create_container(213 TEST_IMG, 'top',214 networking_config=self.client.create_networking_config(215 {216 'bridge': self.client.create_endpoint_config(217 link_local_ips=['169.254.8.8']218 )219 }220 ),221 host_config=self.client.create_host_config(network_mode='bridge')222 )223 self.tmp_containers.append(container)224 self.client.start(container)225 container_data = self.client.inspect_container(container)226 net_cfg = container_data['NetworkSettings']['Networks']['bridge']227 assert 'IPAMConfig' in net_cfg228 assert 'LinkLocalIPs' in net_cfg['IPAMConfig']229 assert net_cfg['IPAMConfig']['LinkLocalIPs'] == ['169.254.8.8']230 @requires_api_version('1.32')231 def test_create_with_driveropt(self):232 container = self.client.create_container(233 TEST_IMG, 'top',234 networking_config=self.client.create_networking_config(235 {236 'bridge': self.client.create_endpoint_config(237 driver_opt={'com.docker-py.setting': 'on'}238 )239 }240 ),241 host_config=self.client.create_host_config(network_mode='bridge')242 )243 self.tmp_containers.append(container)244 self.client.start(container)245 container_data = 
self.client.inspect_container(container)246 net_cfg = container_data['NetworkSettings']['Networks']['bridge']247 assert 'DriverOpts' in net_cfg248 assert 'com.docker-py.setting' in net_cfg['DriverOpts']249 assert net_cfg['DriverOpts']['com.docker-py.setting'] == 'on'250 @requires_api_version('1.22')251 def test_create_with_links(self):252 net_name, net_id = self.create_network()253 container = self.create_and_start(254 host_config=self.client.create_host_config(network_mode=net_name),255 networking_config=self.client.create_networking_config({256 net_name: self.client.create_endpoint_config(257 links=[('docker-py-test-upstream', 'bar')],258 ),259 }),260 )261 net_settings = self.client.inspect_container(container)[262 'NetworkSettings'263 ]264 assert net_settings['Networks'][net_name]['Links'] == [265 'docker-py-test-upstream:bar'266 ]267 self.create_and_start(268 name='docker-py-test-upstream',269 host_config=self.client.create_host_config(network_mode=net_name),270 )271 self.execute(container, ['nslookup', 'bar'])272 def test_create_check_duplicate(self):273 net_name, net_id = self.create_network()274 with pytest.raises(docker.errors.APIError):275 self.client.create_network(net_name, check_duplicate=True)276 net_id = self.client.create_network(net_name, check_duplicate=False)277 self.tmp_networks.append(net_id['Id'])278 @requires_api_version('1.22')279 def test_connect_with_links(self):280 net_name, net_id = self.create_network()281 container = self.create_and_start(282 host_config=self.client.create_host_config(network_mode=net_name))283 self.client.disconnect_container_from_network(container, net_name)284 self.client.connect_container_to_network(285 container, net_name,286 links=[('docker-py-test-upstream', 'bar')])287 net_settings = self.client.inspect_container(container)[288 'NetworkSettings'289 ]290 assert net_settings['Networks'][net_name]['Links'] == [291 'docker-py-test-upstream:bar'292 ]293 self.create_and_start(294 name='docker-py-test-upstream',295 
host_config=self.client.create_host_config(network_mode=net_name),296 )297 self.execute(container, ['nslookup', 'bar'])298 @requires_api_version('1.22')299 def test_connect_with_ipv4_address(self):300 net_name, net_id = self.create_network(301 ipam=IPAMConfig(302 driver='default',303 pool_configs=[304 IPAMPool(305 subnet="172.28.0.0/16", iprange="172.28.5.0/24",306 gateway="172.28.5.254"307 )308 ]309 )310 )311 container = self.create_and_start(312 host_config=self.client.create_host_config(network_mode=net_name))313 self.client.disconnect_container_from_network(container, net_name)314 self.client.connect_container_to_network(315 container, net_name, ipv4_address='172.28.5.24'316 )317 container_data = self.client.inspect_container(container)318 net_data = container_data['NetworkSettings']['Networks'][net_name]319 assert net_data['IPAMConfig']['IPv4Address'] == '172.28.5.24'320 @requires_api_version('1.22')321 def test_connect_with_ipv6_address(self):322 net_name, net_id = self.create_network(323 ipam=IPAMConfig(324 driver='default',325 pool_configs=[326 IPAMPool(327 subnet="2001:389::1/64", iprange="2001:389::0/96",328 gateway="2001:389::ffff"329 )330 ]331 )332 )333 container = self.create_and_start(334 host_config=self.client.create_host_config(network_mode=net_name))335 self.client.disconnect_container_from_network(container, net_name)336 self.client.connect_container_to_network(337 container, net_name, ipv6_address='2001:389::f00d'338 )339 container_data = self.client.inspect_container(container)340 net_data = container_data['NetworkSettings']['Networks'][net_name]341 assert net_data['IPAMConfig']['IPv6Address'] == '2001:389::f00d'342 @requires_api_version('1.23')343 def test_create_internal_networks(self):344 _, net_id = self.create_network(internal=True)345 net = self.client.inspect_network(net_id)346 assert net['Internal'] is True347 @requires_api_version('1.23')348 def test_create_network_with_labels(self):349 _, net_id = self.create_network(labels={...
test_docker_compose.py
Source:test_docker_compose.py
...59 caplog.set_level("INFO")60 with docker_compose_cm(docker_compose_yml=other_docker_compose_yml) as docker_compose:61 container = next(iter(docker_compose))62 network = network_name_from_yml(other_docker_compose_yml)63 disconnect_container_from_network(container=container, network=network)64 # Connecting multiple times is idempotent65 connect_container_to_network(container=container, network=network)66 assert f"Connected {container} to network {network}." in caplog.text67 connect_container_to_network(container=container, network=network)68 assert f"Unable to connect {container} to network {network}." in caplog.text69def test_disconnect_container_from_network(docker_compose_cm, other_docker_compose_yml, caplog):70 caplog.set_level("INFO")71 with docker_compose_cm(docker_compose_yml=other_docker_compose_yml) as docker_compose:72 container = next(iter(docker_compose))73 network = network_name_from_yml(other_docker_compose_yml)74 # Disconnecting multiple times is idempotent75 disconnect_container_from_network(container=container, network=network)76 assert f"Disconnected {container} from network {network}." in caplog.text77 disconnect_container_from_network(container=container, network=network)...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!