How to use the create_container method in localstack

Best Python code snippets using localstack_python
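A note before the snippets: the examples below actually come from Rancher/Cattle-style integration test suites (visible in the io.rancher.* labels), where create_container is a helper on a shared test-context fixture rather than a LocalStack API. If what you want is simply to create and run a container hosting LocalStack itself, a minimal sketch using the Docker SDK for Python (the docker package) might look like the following; the container name is illustrative, and the sketch assumes the package is installed and a Docker daemon is reachable.

import docker

# Minimal sketch: create (but don't yet start) a container from the
# official LocalStack image, publishing its default edge port 4566.
client = docker.from_env()
container = client.containers.create(
    "localstack/localstack",
    name="demo-localstack",       # illustrative name
    ports={"4566/tcp": 4566},
)
container.start()
# ... interact with LocalStack at http://localhost:4566 ...
container.stop()
container.remove()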

test_allocation.py

Source: test_allocation.py (GitHub)


import time

from common_fixtures import *  # provides random_str, register_simulated_host, etc.


def test_inactive_agent(super_client, new_context):
    host = super_client.reload(new_context.host)
    agent = host.agent()
    c = new_context.create_container()
    assert c.state == "running"
    agent = super_client.wait_success(agent.deactivate())
    assert agent.state == "inactive"
    # With the only agent inactive, allocation must fail.
    c = new_context.create_container_no_success()
    assert c.transitioning == "error"
    assert c.transitioningMessage == ("Allocation failed: No healthy hosts "
                                      "with sufficient resources available")
    assert c.state == "error"


def test_allocation_with_shared_storage_pool(super_client, new_context):
    count = 3
    client = new_context.client
    host2 = register_simulated_host(client)
    register_simulated_host(client)
    hosts = [new_context.host, host2]
    hosts = wait_all_success(super_client, hosts)
    sp = add_storage_pool(new_context, [new_context.host.uuid, host2.uuid])
    sp_name = sp.name
    for h in hosts:
        assert h.state == "active"
        assert h.agent().state == "active"
        assert len(h.storagePools()) == 2
        assert h.storagePools()[0].state == "active"
        assert h.storagePools()[1].state == "active"
    # Create a volume in the shared pool and mount it into many containers.
    v1 = client.create_volume(name=random_str(), driver=sp_name)
    v1 = client.wait_success(v1)
    assert v1.state == "inactive"
    data_volume_mounts = {"/con/path": v1.id}
    containers = []
    for _ in range(len(hosts) * count):
        c = client.create_container(imageUuid=new_context.image_uuid,
                                    dataVolumeMounts=data_volume_mounts)
        containers.append(c)
        time.sleep(1)
    wait_all_success(super_client, containers, timeout=60)
    for c in containers:
        new_context.wait_for_state(c, "running")


def test_allocate_to_host_with_pool(new_context, super_client):
    client = new_context.client
    host = new_context.host
    host2 = register_simulated_host(client)
    sp = add_storage_pool(new_context)
    sp_name = sp.name
    assert len(host.storagePools()) == 2
    assert len(host2.storagePools()) == 1
    # host2 is requested but lacks the storage pool, so scheduling fails.
    c = new_context.create_container_no_success(
        imageUuid=new_context.image_uuid, volumeDriver=sp_name,
        requestedHostId=host2.id, dataVolume=["vol1:/con/path"])
    c = super_client.reload(c)
    assert c.state == "error"
    assert c.transitioning == "error"
    assert c.transitioningMessage.startswith("Allocation failed: valid host(s) [")


def test_allocation_stay_associated_to_host(super_client, context):
    c = context.create_container()
    c = context.client.wait_success(c.stop())
    assert c.state == "stopped"
    assert len(c.hosts()) == 1


def test_port_constraint(new_context):
    host1 = new_context.host
    client = new_context.client
    image_uuid = new_context.image_uuid
    containers = []
    try:
        c = client.wait_success(
            client.create_container(imageUuid=image_uuid,
                                    requestedHostId=host1.id,
                                    ports=["8081:81/tcp"]))
        containers.append(c)
        # The same public tcp port on the same (only) host must fail.
        c2 = client.wait_transitioning(
            client.create_container(imageUuid=image_uuid,
                                    ports=["8081:81/tcp"]))
        assert c2.transitioning == "error"
        assert "Allocation failed: host needs ports 8081/tcp available" in \
            c2.transitioningMessage
        assert c2.state == "error"
        # A different public port is fine.
        c3 = new_context.super_create_container(imageUuid=image_uuid,
                                                ports=["8082:81/tcp"])
        containers.append(c3)
        # The same port number on a different protocol is fine.
        c4 = client.wait_success(
            client.create_container(imageUuid=image_uuid,
                                    ports=["8081:81/udp"]))
        containers.append(c4)
        c5 = client.wait_transitioning(
            client.create_container(imageUuid=image_uuid,
                                    ports=["8081:81/udp"]))
        assert c5.transitioning == "error"
        assert "Allocation failed: host needs ports 8081/udp available" in \
            c5.transitioningMessage
        assert c5.state == "error"
        # Binding to a specific IP still counts against the port.
        c6 = client.wait_success(
            client.create_container(imageUuid=image_uuid,
                                    requestedHostId=host1.id,
                                    ports=["127.2.2.2:8081:81/tcp"]))
        containers.append(c6)
        c7 = client.wait_transitioning(
            client.create_container(imageUuid=image_uuid,
                                    ports=["127.2.2.2:8081:81/tcp"]))
        assert c7.transitioning == "error"
        assert "Allocation failed: host needs ports 8081/tcp available" in \
            c7.transitioningMessage
        assert c7.state == "error"
        # Registering a second host frees up the port constraint.
        host2 = register_simulated_host(new_context.client)
        c8 = client.wait_success(
            client.create_container(imageUuid=image_uuid,
                                    ports=["8081:81/tcp"]))
        assert c8.hosts()[0].id == host2.id
        containers.append(c8)
    finally:
        for c in containers:
            if c is not None:
                new_context.delete(c)


def test_conflicting_ports_in_deployment_unit(new_context):
    client = new_context.client
    image_uuid = new_context.image_uuid
    client.wait_success(client.create_container(name="reset",
                                                imageUuid=image_uuid))
    env = client.create_stack(name=random_str())
    env = client.wait_success(env)
    assert env.state == "active"
    launch_config = {"imageUuid": image_uuid, "ports": ["5555:6666"]}
    secondary_lc = {"imageUuid": image_uuid, "name": "secondary",
                    "ports": ["5555:6666"]}
    svc = client.create_service(name=random_str(), stackId=env.id,
                                launchConfig=launch_config,
                                secondaryLaunchConfigs=[secondary_lc])
    svc = client.wait_success(svc)
    assert svc.state == "inactive"
    svc = svc.activate()
    c = _wait_for_compose_instance_error(client, svc, env)
    assert "Port 5555/tcp requested more than once." in c.transitioningMessage
    env.remove()


def test_simultaneous_port_allocation(new_context):
    client = new_context.client
    image_uuid = new_context.image_uuid
    env = client.create_stack(name=random_str())
    env = client.wait_success(env)
    assert env.state == "active"
    launch_config = {"imageUuid": image_uuid, "ports": ["5555:6666"]}
    svc = client.create_service(name=random_str(), stackId=env.id,
                                launchConfig=launch_config, scale=2)
    svc = client.wait_success(svc)
    assert svc.state == "inactive"
    svc = svc.activate()
    c = _wait_for_compose_instance_error(client, svc, env)
    assert "host needs ports 5555/tcp available" in c.transitioningMessage


def _wait_for_compose_instance_error(client, service, env):
    name = env.name + "-" + service.name + "%"

    def check():
        containers = client.list_container(name_like=name, state="error")
        if len(containers) > 0:
            return containers[0]

    container = wait_for(check)
    return container


def test_request_host_override(new_context):
    host = new_context.host
    c = None
    c2 = None
    try:
        c = new_context.super_create_container(validHostIds=[host.id],
                                               ports=["8081:81/tcp"])
        # An explicit requestedHostId overrides the port-availability check.
        c2 = new_context.super_create_container(requestedHostId=host.id,
                                                ports=["8081:81/tcp"])
    finally:
        if c is not None:
            new_context.delete(c)
        if c2 is not None:
            new_context.delete(c2)


def test_host_affinity(super_client, new_context):
    host = new_context.host
    host2 = register_simulated_host(new_context)
    host = super_client.update(host, labels={"size": "huge",
                                             "latency": "long"})
    host2 = super_client.update(host2, labels={"size": "tiny",
                                               "latency": "short"})
    containers = []
    try:
        # Hard affinity, environment-variable syntax.
        c = new_context.create_container(
            environment={"constraint:size==huge": ""})
        assert c.hosts()[0].id == host.id
        containers.append(c)
        # Hard affinity, label syntax.
        c = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:host_label": "size=huge"})
        assert c.hosts()[0].id == host.id
        containers.append(c)
        # Hard anti-affinity.
        c = new_context.create_container(
            environment={"constraint:size!=huge": ""})
        assert c.hosts()[0].id == host2.id
        containers.append(c)
        c = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:host_label_ne": "size=huge"})
        assert c.hosts()[0].id == host2.id
        containers.append(c)
        # A soft rule (~) is outweighed by a hard constraint.
        c = new_context.create_container(
            environment={"constraint:size==huge": "",
                         "constraint:latency==~short": ""})
        assert c.hosts()[0].id == host.id
        containers.append(c)
        c = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:host_label": "size=huge",
                    "io.rancher.scheduler.affinity:host_label_soft_ne":
                        "latency=short"})
        assert c.hosts()[0].id == host.id
        containers.append(c)
        # Soft anti-affinity on its own.
        c = new_context.create_container(
            environment={"constraint:latency!=~long": ""})
        assert c.hosts()[0].id == host2.id
        containers.append(c)
        c = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:host_label_soft_ne":
                    "latency=long"})
        assert c.hosts()[0].id == host2.id
        containers.append(c)
    finally:
        for c in containers:
            new_context.delete(c)


def test_container_affinity(new_context):
    register_simulated_host(new_context)
    containers = []
    try:
        name1 = "affinity" + random_str()
        c1 = new_context.create_container(name=name1)
        containers.append(c1)
        # Affinity to a container by name, environment syntax.
        c2 = new_context.create_container(
            environment={"affinity:container==" + name1: ""})
        containers.append(c2)
        assert c2.hosts()[0].id == c1.hosts()[0].id
        # Affinity by name, label syntax.
        c3 = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:container": name1})
        containers.append(c3)
        assert c3.hosts()[0].id == c1.hosts()[0].id
        # Affinity by uuid works the same way.
        c4 = new_context.create_container(
            environment={"affinity:container==" + c1.uuid: ""})
        containers.append(c4)
        assert c4.hosts()[0].id == c1.hosts()[0].id
        c5 = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:container": c1.uuid})
        containers.append(c5)
        assert c5.hosts()[0].id == c1.hosts()[0].id
        # Anti-affinity pushes the container to a different host.
        c6 = new_context.create_container(
            environment={"affinity:container!=" + name1: ""})
        containers.append(c6)
        assert c6.hosts()[0].id != c1.hosts()[0].id
        c7 = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:container_ne": name1})
        containers.append(c7)
        assert c7.hosts()[0].id != c1.hosts()[0].id
    finally:
        for c in containers:
            new_context.delete(c)


def test_container_label_affinity(new_context):
    register_simulated_host(new_context)
    containers = []
    try:
        c1_label = random_str()
        c1 = new_context.create_container(labels={"foo": c1_label})
        containers.append(c1)
        c2 = new_context.create_container(
            environment={"affinity:foo==" + c1_label: ""})
        containers.append(c2)
        assert c2.hosts()[0].id == c1.hosts()[0].id
        c3 = new_context.create_container(
            labels={"io.rancher.scheduler.affinity:container_label":
                    "foo=" + c1_label})
        containers.append(c3)
        assert c3.hosts()[0].id == c1.hosts()[0].id
        c4_label = random_str()
        c4 = new_context.create_container(
            environment={"affinity:foo!=" + c1_label: ""},
            labels={"foo": c4_label})
        containers.append(c4)
        assert c4.hosts()[0].id != c1.hosts()[0].id
        # Hard anti-affinity to c1 plus soft anti-affinity to c4:
        # the soft rule loses when no other host qualifies.
        c5 = new_context.create_container(
            environment={"affinity:foo!=" + c1_label: "",
                         "affinity:foo!=~" + c4_label: ""})
        containers.append(c5)
        assert c5.hosts()[0].id == c4.hosts()[0].id
        c6 = new_context.create_container(
            environment={"affinity:foo!=" + c1_label: ""},
            labels={"io.rancher.scheduler.affinity:container_label_soft_ne":
                    "foo=" + c4_label})
        containers.append(c6)
        assert c6.hosts()[0].id == c4.hosts()[0].id
    finally:
        for c in containers:
            new_context.delete(c)


def test_volumes_from_constraint(new_context):
    register_simulated_host(new_context)
    register_simulated_host(new_context)
    containers = []
    try:
        # c2 mounts volumes from c1, so both must land on the same host.
        c1 = new_context.create_container_no_success(startOnCreate=False)
        c2 = new_context.create_container_no_success(startOnCreate=False,
                                                     dataVolumesFrom=[c1.id])
        c1 = c1.start()
        c2 = c2.start()
        c1 = new_context.wait_for_state(c1, "running")
        c2 = new_context.wait_for_state(c2, "running")
        containers.append(c1)
        containers.append(c2)
        assert c1.hosts()[0].id == c2.hosts()[0].id
        # Starting the dependent container before its source fails.
        c3 = new_context.create_container_no_success(startOnCreate=False)
        c4 = new_context.create_container_no_success(startOnCreate=False,
                                                     dataVolumesFrom=[c3.id])
        c4 = c4.start()
        c4 = new_context.client.wait_transitioning(c4)
        assert c4.transitioning == "error"
        assert c4.transitioningMessage == ("volumeFrom instance is not "
                                           "running : Dependencies readiness"
                                           " error")
    finally:
        for c in containers:
            new_context.delete(c)


def test_network_mode_constraint(new_context):
    client = new_context.client
    register_simulated_host(new_context)
    register_simulated_host(new_context)
    containers = []
    try:
        c1 = new_context.create_container(startOnCreate=False)
        # c2 shares c1's network namespace, so both must share a host.
        c2 = new_context.create_container(startOnCreate=False,
                                          networkMode="container",
                                          networkContainerId=c1.id)
        c1 = client.wait_success(c1.start())
        c2 = client.wait_success(c2.start())
        assert c1.state == "running"
        containers.append(c1)
        assert c2.state == "running"
        containers.append(c2)
        assert c1.hosts()[0].id == c2.hosts()[0].id
    finally:
        for c in containers:
            new_context.delete(c)
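One detail worth pulling out of test_host_affinity above: the suite expresses the same hard host-affinity rule in two interchangeable syntaxes, a Docker-style environment variable and an io.rancher.scheduler.affinity label. A minimal sketch of just that pairing, assuming the suite's new_context fixture and a host labeled size=huge as in the test:

# Both containers should be scheduled onto the host labeled size=huge;
# the environment form and the label form express the same constraint.
c_env = new_context.create_container(
    environment={"constraint:size==huge": ""})
c_label = new_context.create_container(
    labels={"io.rancher.scheduler.affinity:host_label": "size=huge"})
assert c_env.hosts()[0].id == c_label.hosts()[0].id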


test_link.py

Source: test_link.py (GitHub)


from common_fixtures import *


def test_link_instance_stop_start(super_client, client, context):
    target1 = context.create_container(ports=["180", "122/udp"])
    target2 = context.create_container(ports=["280", "222/udp"])
    c = context.create_container(instanceLinks={
        "target1_link": target1.id,
        "target2_link": target2.id})
    assert c.state == "running"
    assert len(c.instanceLinks()) > 0


def _find_agent_instance_ip(nsp, source):
    assert source is not None
    vnet_id = source.nics()[0].vnetId
    assert vnet_id is not None
    for agent_instance in nsp.instances():
        if agent_instance.nics()[0].vnetId == vnet_id:
            assert agent_instance.primaryIpAddress is not None
            return agent_instance.primaryIpAddress
    assert False, "Failed to find agent instance for " + source.id


def test_link_create(client, super_client, context):
    target1 = context.create_container(ports=["180", "122/udp"])
    target2 = context.create_container(ports=["280", "222/udp"])
    c = context.create_container(instanceLinks={
        "target1_link": target1.id,
        "target2_link": target2.id})
    assert c.state == "running"
    assert len(c.instanceLinks()) == 2
    assert len(target1.targetInstanceLinks()) == 1
    assert len(target2.targetInstanceLinks()) == 1
    links = c.instanceLinks()
    names = set([x.linkName for x in links])
    assert names == set(["target1_link", "target2_link"])


def test_link_update(client, context):
    target1 = context.create_container()
    target2 = context.create_container()
    c = context.create_container(instanceLinks={"target1_link": target1.id})
    link = c.instanceLinks()[0]
    assert link.targetInstanceId == target1.id
    link.targetInstanceId = target2.id
    link = client.update(link, link)
    assert link.state == "updating-active"
    link = client.wait_success(link)
    assert link.targetInstanceId == target2.id
    assert link.state == "active"


def test_link_remove(client, context):
    target1 = context.create_container()
    c = client.create_container(imageUuid=context.image_uuid,
                                startOnCreate=False,
                                instanceLinks={"target1_link": target1.id})
    c = client.wait_success(c)
    links = c.instanceLinks()
    assert len(links) == 1
    link = links[0]
    assert link.state == "inactive"
    c = client.wait_success(c.start())
    link = client.reload(link)
    assert c.state == "running"
    assert link.state == "active"
    c = client.wait_success(c.stop())
    link = client.reload(link)
    assert c.state == "stopped"
    assert link.state == "inactive"
    c = client.wait_success(client.delete(c))
    link = client.reload(link)
    assert c.state == "removed"
    assert link.state != "active"


def test_null_links(context):
    c = context.create_container(instanceLinks={"null_link": None})
    links = c.instanceLinks()
    assert len(links) == 1
    assert links[0].state == "active"
    assert links[0].linkName == "null_link"
    assert links[0].targetInstanceId is None


def test_link_timeout(super_client, client, context):
    t = client.create_container(imageUuid=context.image_uuid,
                                startOnCreate=False)
    c = super_client.create_container(accountId=context.project.id,
                                      imageUuid=context.image_uuid,
                                      instanceLinks={"t": t.id},
                                      data={"linkWaitTime": 100})
    c = client.wait_transitioning(c)
    assert c.state == "running"


def test_link_remove_instance_restart(client, super_client, context):
    target1 = context.create_container()
    c = client.create_container(imageUuid=context.image_uuid,
                                startOnCreate=False,
                                instanceLinks={"target1_link": target1.id})
    c = client.wait_success(c)
    links = c.instanceLinks()
    assert len(links) == 1
    link = links[0]
    assert link.state == "inactive"
    c = client.wait_success(c.start())
    link = client.reload(link)
    assert c.state == "running"
    assert link.state == "active"
    c = client.wait_success(c.stop())
    assert c.state == "stopped"
    link = client.reload(link)
    link = super_client.wait_success(link.remove())
    assert link.state == "removed"


test_container.py

Source: test_container.py (GitHub)


from pytest import fixture

from src.exceptions import CoffeeMachineException
from src.models.container import Container
from src.utils import Refillable


@fixture()
def create_container(request) -> Container:
    """Create a Container object for testing."""
    try:
        # Capacity supplied via indirect parametrization, if any.
        _container_capacity = request.param
    except AttributeError:
        _container_capacity = TestContainer.capacity_default
    _container = Container(capacity=_container_capacity)
    return _container


class TestContainer:
    capacity_default: Refillable = 1000

    def test_initialization(self, create_container: Container) -> None:
        """Test Container object initialization"""
        pass

    def test_initial_attribute_values(self, create_container: Container) -> None:
        """Test checking the initial attribute values of the Container"""
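Because the fixture reads request.param and falls back to TestContainer.capacity_default when no parameter was supplied, individual tests can inject a custom capacity through pytest's indirect parametrization. A sketch of what that might look like; the value 2000 is illustrative, and it assumes Container exposes the capacity it was constructed with as a capacity attribute:

import pytest

@pytest.mark.parametrize("create_container", [2000], indirect=True)
def test_custom_capacity(create_container: Container) -> None:
    # The fixture received 2000 via request.param instead of the default.
    assert create_container.capacity == 2000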


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on the LambdaTest cloud grid

Perform automation testing on 3,000+ real desktop and mobile devices online.

