How to use get_service_name method in localstack

Best Python code snippet using localstack_python

api_service_test.py

Source:api_service_test.py Github

copy

Full Screen

...22 self.client.remove_service(service['ID'])23 except docker.errors.APIError:24 pass25 super().tearDown()26 def get_service_name(self):27 return f'dockerpytest_{random.getrandbits(64):x}'28 def get_service_container(self, service_name, attempts=20, interval=0.5,29 include_stopped=False):30 # There is some delay between the service's creation and the creation31 # of the service's containers. This method deals with the uncertainty32 # when trying to retrieve the container associated with a service.33 while True:34 containers = self.client.containers(35 filters={'name': [service_name]}, quiet=True,36 all=include_stopped37 )38 if len(containers) > 0:39 return containers[0]40 attempts -= 141 if attempts <= 0:42 return None43 time.sleep(interval)44 def create_simple_service(self, name=None, labels=None):45 if name:46 name = f'dockerpytest_{name}'47 else:48 name = self.get_service_name()49 container_spec = docker.types.ContainerSpec(50 TEST_IMG, ['echo', 'hello']51 )52 task_tmpl = docker.types.TaskTemplate(container_spec)53 return name, self.client.create_service(54 task_tmpl, name=name, labels=labels55 )56 @requires_api_version('1.24')57 def test_list_services(self):58 services = self.client.services()59 assert isinstance(services, list)60 test_services = self.client.services(filters={'name': 'dockerpytest_'})61 assert len(test_services) == 062 self.create_simple_service()63 test_services = self.client.services(filters={'name': 'dockerpytest_'})64 assert len(test_services) == 165 assert 'dockerpytest_' in test_services[0]['Spec']['Name']66 @requires_api_version('1.24')67 def test_list_services_filter_by_label(self):68 test_services = self.client.services(filters={'label': 'test_label'})69 assert len(test_services) == 070 self.create_simple_service(labels={'test_label': 'testing'})71 test_services = self.client.services(filters={'label': 'test_label'})72 assert len(test_services) == 173 assert test_services[0]['Spec']['Labels']['test_label'] == 'testing'74 def test_inspect_service_by_id(self):75 svc_name, svc_id = self.create_simple_service()76 svc_info = self.client.inspect_service(svc_id)77 assert 'ID' in svc_info78 assert svc_info['ID'] == svc_id['ID']79 def test_inspect_service_by_name(self):80 svc_name, svc_id = self.create_simple_service()81 svc_info = self.client.inspect_service(svc_name)82 assert 'ID' in svc_info83 assert svc_info['ID'] == svc_id['ID']84 @requires_api_version('1.29')85 def test_inspect_service_insert_defaults(self):86 svc_name, svc_id = self.create_simple_service()87 svc_info = self.client.inspect_service(svc_id)88 svc_info_defaults = self.client.inspect_service(89 svc_id, insert_defaults=True90 )91 assert svc_info != svc_info_defaults92 assert 'RollbackConfig' in svc_info_defaults['Spec']93 assert 'RollbackConfig' not in svc_info['Spec']94 def test_remove_service_by_id(self):95 svc_name, svc_id = self.create_simple_service()96 assert self.client.remove_service(svc_id)97 test_services = self.client.services(filters={'name': 'dockerpytest_'})98 assert len(test_services) == 099 def test_remove_service_by_name(self):100 svc_name, svc_id = self.create_simple_service()101 assert self.client.remove_service(svc_name)102 test_services = self.client.services(filters={'name': 'dockerpytest_'})103 assert len(test_services) == 0104 def test_create_service_simple(self):105 name, svc_id = self.create_simple_service()106 assert self.client.inspect_service(svc_id)107 services = self.client.services(filters={'name': name})108 assert len(services) == 1109 assert services[0]['ID'] == svc_id['ID']110 @requires_api_version('1.25')111 @requires_experimental(until='1.29')112 def test_service_logs(self):113 name, svc_id = self.create_simple_service()114 assert self.get_service_container(name, include_stopped=True)115 attempts = 20116 while True:117 if attempts == 0:118 self.fail('No service logs produced by endpoint')119 return120 logs = self.client.service_logs(svc_id, stdout=True, is_tty=False)121 try:122 log_line = next(logs)123 except StopIteration:124 attempts -= 1125 time.sleep(0.1)126 continue127 else:128 break129 if log_line is not None:130 log_line = log_line.decode('utf-8')131 assert 'hello\n' in log_line132 def test_create_service_custom_log_driver(self):133 container_spec = docker.types.ContainerSpec(134 TEST_IMG, ['echo', 'hello']135 )136 log_cfg = docker.types.DriverConfig('none')137 task_tmpl = docker.types.TaskTemplate(138 container_spec, log_driver=log_cfg139 )140 name = self.get_service_name()141 svc_id = self.client.create_service(task_tmpl, name=name)142 svc_info = self.client.inspect_service(svc_id)143 assert 'TaskTemplate' in svc_info['Spec']144 res_template = svc_info['Spec']['TaskTemplate']145 assert 'LogDriver' in res_template146 assert 'Name' in res_template['LogDriver']147 assert res_template['LogDriver']['Name'] == 'none'148 def test_create_service_with_volume_mount(self):149 vol_name = self.get_service_name()150 container_spec = docker.types.ContainerSpec(151 TEST_IMG, ['ls'],152 mounts=[153 docker.types.Mount(target='/test', source=vol_name)154 ]155 )156 self.tmp_volumes.append(vol_name)157 task_tmpl = docker.types.TaskTemplate(container_spec)158 name = self.get_service_name()159 svc_id = self.client.create_service(task_tmpl, name=name)160 svc_info = self.client.inspect_service(svc_id)161 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']162 cspec = svc_info['Spec']['TaskTemplate']['ContainerSpec']163 assert 'Mounts' in cspec164 assert len(cspec['Mounts']) == 1165 mount = cspec['Mounts'][0]166 assert mount['Target'] == '/test'167 assert mount['Source'] == vol_name168 assert mount['Type'] == 'volume'169 def test_create_service_with_resources_constraints(self):170 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])171 resources = docker.types.Resources(172 cpu_limit=4000000, mem_limit=3 * 1024 * 1024 * 1024,173 cpu_reservation=3500000, mem_reservation=2 * 1024 * 1024 * 1024174 )175 task_tmpl = docker.types.TaskTemplate(176 container_spec, resources=resources177 )178 name = self.get_service_name()179 svc_id = self.client.create_service(task_tmpl, name=name)180 svc_info = self.client.inspect_service(svc_id)181 assert 'TaskTemplate' in svc_info['Spec']182 res_template = svc_info['Spec']['TaskTemplate']183 assert 'Resources' in res_template184 assert res_template['Resources']['Limits'] == resources['Limits']185 assert res_template['Resources']['Reservations'] == resources[186 'Reservations'187 ]188 def _create_service_with_generic_resources(self, generic_resources):189 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])190 resources = docker.types.Resources(191 generic_resources=generic_resources192 )193 task_tmpl = docker.types.TaskTemplate(194 container_spec, resources=resources195 )196 name = self.get_service_name()197 svc_id = self.client.create_service(task_tmpl, name=name)198 return resources, self.client.inspect_service(svc_id)199 @requires_api_version('1.32')200 def test_create_service_with_generic_resources(self):201 successful = [{202 'input': [203 {'DiscreteResourceSpec': {'Kind': 'gpu', 'Value': 1}},204 {'NamedResourceSpec': {'Kind': 'gpu', 'Value': 'test'}}205 ]}, {206 'input': {'gpu': 2, 'mpi': 'latest'},207 'expected': [208 {'DiscreteResourceSpec': {'Kind': 'gpu', 'Value': 2}},209 {'NamedResourceSpec': {'Kind': 'mpi', 'Value': 'latest'}}210 ]}211 ]212 for test in successful:213 t = test['input']214 resrcs, svc_info = self._create_service_with_generic_resources(t)215 assert 'TaskTemplate' in svc_info['Spec']216 res_template = svc_info['Spec']['TaskTemplate']217 assert 'Resources' in res_template218 res_reservations = res_template['Resources']['Reservations']219 assert res_reservations == resrcs['Reservations']220 assert 'GenericResources' in res_reservations221 def _key(d, specs=('DiscreteResourceSpec', 'NamedResourceSpec')):222 return [d.get(s, {}).get('Kind', '') for s in specs]223 actual = res_reservations['GenericResources']224 expected = test.get('expected', test['input'])225 assert sorted(actual, key=_key) == sorted(expected, key=_key)226 @requires_api_version('1.32')227 def test_create_service_with_invalid_generic_resources(self):228 for test_input in ['1', 1.0, lambda: '1', {1, 2}]:229 with pytest.raises(docker.errors.InvalidArgument):230 self._create_service_with_generic_resources(test_input)231 def test_create_service_with_update_config(self):232 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])233 task_tmpl = docker.types.TaskTemplate(container_spec)234 update_config = docker.types.UpdateConfig(235 parallelism=10, delay=5, failure_action='pause'236 )237 name = self.get_service_name()238 svc_id = self.client.create_service(239 task_tmpl, update_config=update_config, name=name240 )241 svc_info = self.client.inspect_service(svc_id)242 assert 'UpdateConfig' in svc_info['Spec']243 uc = svc_info['Spec']['UpdateConfig']244 assert update_config['Parallelism'] == uc['Parallelism']245 assert update_config['Delay'] == uc['Delay']246 assert update_config['FailureAction'] == uc['FailureAction']247 @requires_api_version('1.28')248 def test_create_service_with_failure_action_rollback(self):249 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])250 task_tmpl = docker.types.TaskTemplate(container_spec)251 update_config = docker.types.UpdateConfig(failure_action='rollback')252 name = self.get_service_name()253 svc_id = self.client.create_service(254 task_tmpl, update_config=update_config, name=name255 )256 svc_info = self.client.inspect_service(svc_id)257 assert 'UpdateConfig' in svc_info['Spec']258 uc = svc_info['Spec']['UpdateConfig']259 assert update_config['FailureAction'] == uc['FailureAction']260 @requires_api_version('1.25')261 def test_create_service_with_update_config_monitor(self):262 container_spec = docker.types.ContainerSpec('busybox', ['true'])263 task_tmpl = docker.types.TaskTemplate(container_spec)264 update_config = docker.types.UpdateConfig(265 monitor=300000000, max_failure_ratio=0.4266 )267 name = self.get_service_name()268 svc_id = self.client.create_service(269 task_tmpl, update_config=update_config, name=name270 )271 svc_info = self.client.inspect_service(svc_id)272 assert 'UpdateConfig' in svc_info['Spec']273 uc = svc_info['Spec']['UpdateConfig']274 assert update_config['Monitor'] == uc['Monitor']275 assert update_config['MaxFailureRatio'] == uc['MaxFailureRatio']276 @requires_api_version('1.28')277 def test_create_service_with_rollback_config(self):278 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])279 task_tmpl = docker.types.TaskTemplate(container_spec)280 rollback_cfg = docker.types.RollbackConfig(281 parallelism=10, delay=5, failure_action='pause',282 monitor=300000000, max_failure_ratio=0.4283 )284 name = self.get_service_name()285 svc_id = self.client.create_service(286 task_tmpl, rollback_config=rollback_cfg, name=name287 )288 svc_info = self.client.inspect_service(svc_id)289 assert 'RollbackConfig' in svc_info['Spec']290 rc = svc_info['Spec']['RollbackConfig']291 assert rollback_cfg['Parallelism'] == rc['Parallelism']292 assert rollback_cfg['Delay'] == rc['Delay']293 assert rollback_cfg['FailureAction'] == rc['FailureAction']294 assert rollback_cfg['Monitor'] == rc['Monitor']295 assert rollback_cfg['MaxFailureRatio'] == rc['MaxFailureRatio']296 def test_create_service_with_restart_policy(self):297 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])298 policy = docker.types.RestartPolicy(299 docker.types.RestartPolicy.condition_types.ANY,300 delay=5, max_attempts=5301 )302 task_tmpl = docker.types.TaskTemplate(303 container_spec, restart_policy=policy304 )305 name = self.get_service_name()306 svc_id = self.client.create_service(task_tmpl, name=name)307 svc_info = self.client.inspect_service(svc_id)308 assert 'RestartPolicy' in svc_info['Spec']['TaskTemplate']309 assert policy == svc_info['Spec']['TaskTemplate']['RestartPolicy']310 def test_create_service_with_custom_networks(self):311 net1 = self.client.create_network(312 'dockerpytest_1', driver='overlay', ipam={'Driver': 'default'}313 )314 self.tmp_networks.append(net1['Id'])315 net2 = self.client.create_network(316 'dockerpytest_2', driver='overlay', ipam={'Driver': 'default'}317 )318 self.tmp_networks.append(net2['Id'])319 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])320 task_tmpl = docker.types.TaskTemplate(container_spec)321 name = self.get_service_name()322 svc_id = self.client.create_service(323 task_tmpl, name=name, networks=[324 'dockerpytest_1', {'Target': 'dockerpytest_2'}325 ]326 )327 svc_info = self.client.inspect_service(svc_id)328 assert 'Networks' in svc_info['Spec']329 assert svc_info['Spec']['Networks'] == [330 {'Target': net1['Id']}, {'Target': net2['Id']}331 ]332 def test_create_service_with_network_attachment_config(self):333 network = self.client.create_network(334 'dockerpytest_1', driver='overlay', ipam={'Driver': 'default'}335 )336 self.tmp_networks.append(network['Id'])337 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])338 network_config = docker.types.NetworkAttachmentConfig(339 target='dockerpytest_1',340 aliases=['dockerpytest_1_alias'],341 options={342 'foo': 'bar'343 }344 )345 task_tmpl = docker.types.TaskTemplate(346 container_spec,347 networks=[network_config]348 )349 name = self.get_service_name()350 svc_id = self.client.create_service(351 task_tmpl, name=name352 )353 svc_info = self.client.inspect_service(svc_id)354 assert 'Networks' in svc_info['Spec']['TaskTemplate']355 service_networks_info = svc_info['Spec']['TaskTemplate']['Networks']356 assert len(service_networks_info) == 1357 assert service_networks_info[0]['Target'] == network['Id']358 assert service_networks_info[0]['Aliases'] == ['dockerpytest_1_alias']359 assert service_networks_info[0]['DriverOpts'] == {'foo': 'bar'}360 def test_create_service_with_placement(self):361 node_id = self.client.nodes()[0]['ID']362 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])363 task_tmpl = docker.types.TaskTemplate(364 container_spec, placement=[f'node.id=={node_id}']365 )366 name = self.get_service_name()367 svc_id = self.client.create_service(task_tmpl, name=name)368 svc_info = self.client.inspect_service(svc_id)369 assert 'Placement' in svc_info['Spec']['TaskTemplate']370 assert (svc_info['Spec']['TaskTemplate']['Placement'] ==371 {'Constraints': [f'node.id=={node_id}']})372 def test_create_service_with_placement_object(self):373 node_id = self.client.nodes()[0]['ID']374 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])375 placemt = docker.types.Placement(376 constraints=[f'node.id=={node_id}']377 )378 task_tmpl = docker.types.TaskTemplate(379 container_spec, placement=placemt380 )381 name = self.get_service_name()382 svc_id = self.client.create_service(task_tmpl, name=name)383 svc_info = self.client.inspect_service(svc_id)384 assert 'Placement' in svc_info['Spec']['TaskTemplate']385 assert svc_info['Spec']['TaskTemplate']['Placement'] == placemt386 @requires_api_version('1.30')387 def test_create_service_with_placement_platform(self):388 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])389 placemt = docker.types.Placement(platforms=[('x86_64', 'linux')])390 task_tmpl = docker.types.TaskTemplate(391 container_spec, placement=placemt392 )393 name = self.get_service_name()394 svc_id = self.client.create_service(task_tmpl, name=name)395 svc_info = self.client.inspect_service(svc_id)396 assert 'Placement' in svc_info['Spec']['TaskTemplate']397 assert svc_info['Spec']['TaskTemplate']['Placement'] == placemt398 @requires_api_version('1.27')399 def test_create_service_with_placement_preferences(self):400 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])401 placemt = docker.types.Placement(preferences=[402 {'Spread': {'SpreadDescriptor': 'com.dockerpy.test'}}403 ])404 task_tmpl = docker.types.TaskTemplate(405 container_spec, placement=placemt406 )407 name = self.get_service_name()408 svc_id = self.client.create_service(task_tmpl, name=name)409 svc_info = self.client.inspect_service(svc_id)410 assert 'Placement' in svc_info['Spec']['TaskTemplate']411 assert svc_info['Spec']['TaskTemplate']['Placement'] == placemt412 @requires_api_version('1.27')413 def test_create_service_with_placement_preferences_tuple(self):414 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])415 placemt = docker.types.Placement(preferences=(416 ('spread', 'com.dockerpy.test'),417 ))418 task_tmpl = docker.types.TaskTemplate(419 container_spec, placement=placemt420 )421 name = self.get_service_name()422 svc_id = self.client.create_service(task_tmpl, name=name)423 svc_info = self.client.inspect_service(svc_id)424 assert 'Placement' in svc_info['Spec']['TaskTemplate']425 assert svc_info['Spec']['TaskTemplate']['Placement'] == placemt426 @requires_api_version('1.40')427 def test_create_service_with_placement_maxreplicas(self):428 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])429 placemt = docker.types.Placement(maxreplicas=1)430 task_tmpl = docker.types.TaskTemplate(431 container_spec, placement=placemt432 )433 name = self.get_service_name()434 svc_id = self.client.create_service(task_tmpl, name=name)435 svc_info = self.client.inspect_service(svc_id)436 assert 'Placement' in svc_info['Spec']['TaskTemplate']437 assert svc_info['Spec']['TaskTemplate']['Placement'] == placemt438 def test_create_service_with_endpoint_spec(self):439 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])440 task_tmpl = docker.types.TaskTemplate(container_spec)441 name = self.get_service_name()442 endpoint_spec = docker.types.EndpointSpec(ports={443 12357: (1990, 'udp'),444 12562: (678,),445 53243: 8080,446 })447 svc_id = self.client.create_service(448 task_tmpl, name=name, endpoint_spec=endpoint_spec449 )450 svc_info = self.client.inspect_service(svc_id)451 ports = svc_info['Spec']['EndpointSpec']['Ports']452 for port in ports:453 if port['PublishedPort'] == 12562:454 assert port['TargetPort'] == 678455 assert port['Protocol'] == 'tcp'456 elif port['PublishedPort'] == 53243:457 assert port['TargetPort'] == 8080458 assert port['Protocol'] == 'tcp'459 elif port['PublishedPort'] == 12357:460 assert port['TargetPort'] == 1990461 assert port['Protocol'] == 'udp'462 else:463 self.fail(f'Invalid port specification: {port}')464 assert len(ports) == 3465 @requires_api_version('1.32')466 def test_create_service_with_endpoint_spec_host_publish_mode(self):467 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])468 task_tmpl = docker.types.TaskTemplate(container_spec)469 name = self.get_service_name()470 endpoint_spec = docker.types.EndpointSpec(ports={471 12357: (1990, None, 'host'),472 })473 svc_id = self.client.create_service(474 task_tmpl, name=name, endpoint_spec=endpoint_spec475 )476 svc_info = self.client.inspect_service(svc_id)477 ports = svc_info['Spec']['EndpointSpec']['Ports']478 assert len(ports) == 1479 port = ports[0]480 assert port['PublishedPort'] == 12357481 assert port['TargetPort'] == 1990482 assert port['Protocol'] == 'tcp'483 assert port['PublishMode'] == 'host'484 def test_create_service_with_env(self):485 container_spec = docker.types.ContainerSpec(486 TEST_IMG, ['true'], env={'DOCKER_PY_TEST': 1}487 )488 task_tmpl = docker.types.TaskTemplate(489 container_spec,490 )491 name = self.get_service_name()492 svc_id = self.client.create_service(task_tmpl, name=name)493 svc_info = self.client.inspect_service(svc_id)494 assert 'TaskTemplate' in svc_info['Spec']495 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']496 con_spec = svc_info['Spec']['TaskTemplate']['ContainerSpec']497 assert 'Env' in con_spec498 assert con_spec['Env'] == ['DOCKER_PY_TEST=1']499 @requires_api_version('1.29')500 def test_create_service_with_update_order(self):501 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])502 task_tmpl = docker.types.TaskTemplate(container_spec)503 update_config = docker.types.UpdateConfig(504 parallelism=10, delay=5, order='start-first'505 )506 name = self.get_service_name()507 svc_id = self.client.create_service(508 task_tmpl, update_config=update_config, name=name509 )510 svc_info = self.client.inspect_service(svc_id)511 assert 'UpdateConfig' in svc_info['Spec']512 uc = svc_info['Spec']['UpdateConfig']513 assert update_config['Parallelism'] == uc['Parallelism']514 assert update_config['Delay'] == uc['Delay']515 assert update_config['Order'] == uc['Order']516 @requires_api_version('1.25')517 def test_create_service_with_tty(self):518 container_spec = docker.types.ContainerSpec(519 TEST_IMG, ['true'], tty=True520 )521 task_tmpl = docker.types.TaskTemplate(522 container_spec,523 )524 name = self.get_service_name()525 svc_id = self.client.create_service(task_tmpl, name=name)526 svc_info = self.client.inspect_service(svc_id)527 assert 'TaskTemplate' in svc_info['Spec']528 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']529 con_spec = svc_info['Spec']['TaskTemplate']['ContainerSpec']530 assert 'TTY' in con_spec531 assert con_spec['TTY'] is True532 @requires_api_version('1.25')533 def test_create_service_with_tty_dict(self):534 container_spec = {535 'Image': TEST_IMG,536 'Command': ['true'],537 'TTY': True538 }539 task_tmpl = docker.types.TaskTemplate(container_spec)540 name = self.get_service_name()541 svc_id = self.client.create_service(task_tmpl, name=name)542 svc_info = self.client.inspect_service(svc_id)543 assert 'TaskTemplate' in svc_info['Spec']544 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']545 con_spec = svc_info['Spec']['TaskTemplate']['ContainerSpec']546 assert 'TTY' in con_spec547 assert con_spec['TTY'] is True548 def test_create_service_global_mode(self):549 container_spec = docker.types.ContainerSpec(550 TEST_IMG, ['echo', 'hello']551 )552 task_tmpl = docker.types.TaskTemplate(container_spec)553 name = self.get_service_name()554 svc_id = self.client.create_service(555 task_tmpl, name=name, mode='global'556 )557 svc_info = self.client.inspect_service(svc_id)558 assert 'Mode' in svc_info['Spec']559 assert 'Global' in svc_info['Spec']['Mode']560 def test_create_service_replicated_mode(self):561 container_spec = docker.types.ContainerSpec(562 TEST_IMG, ['echo', 'hello']563 )564 task_tmpl = docker.types.TaskTemplate(container_spec)565 name = self.get_service_name()566 svc_id = self.client.create_service(567 task_tmpl, name=name,568 mode=docker.types.ServiceMode('replicated', 5)569 )570 svc_info = self.client.inspect_service(svc_id)571 assert 'Mode' in svc_info['Spec']572 assert 'Replicated' in svc_info['Spec']['Mode']573 assert svc_info['Spec']['Mode']['Replicated'] == {'Replicas': 5}574 @requires_api_version('1.25')575 def test_update_service_force_update(self):576 container_spec = docker.types.ContainerSpec(577 'busybox', ['echo', 'hello']578 )579 task_tmpl = docker.types.TaskTemplate(container_spec)580 name = self.get_service_name()581 svc_id = self.client.create_service(task_tmpl, name=name)582 svc_info = self.client.inspect_service(svc_id)583 assert 'TaskTemplate' in svc_info['Spec']584 assert 'ForceUpdate' in svc_info['Spec']['TaskTemplate']585 assert svc_info['Spec']['TaskTemplate']['ForceUpdate'] == 0586 version_index = svc_info['Version']['Index']587 task_tmpl = docker.types.TaskTemplate(container_spec, force_update=10)588 self.client.update_service(name, version_index, task_tmpl, name=name)589 svc_info = self.client.inspect_service(svc_id)590 new_index = svc_info['Version']['Index']591 assert new_index > version_index592 assert svc_info['Spec']['TaskTemplate']['ForceUpdate'] == 10593 @requires_api_version('1.25')594 def test_create_service_with_secret(self):595 secret_name = 'favorite_touhou'596 secret_data = b'phantasmagoria of flower view'597 secret_id = self.client.create_secret(secret_name, secret_data)598 self.tmp_secrets.append(secret_id)599 secret_ref = docker.types.SecretReference(secret_id, secret_name)600 container_spec = docker.types.ContainerSpec(601 'busybox', ['sleep', '999'], secrets=[secret_ref]602 )603 task_tmpl = docker.types.TaskTemplate(container_spec)604 name = self.get_service_name()605 svc_id = self.client.create_service(task_tmpl, name=name)606 svc_info = self.client.inspect_service(svc_id)607 assert 'Secrets' in svc_info['Spec']['TaskTemplate']['ContainerSpec']608 secrets = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Secrets']609 assert secrets[0] == secret_ref610 container = self.get_service_container(name)611 assert container is not None612 exec_id = self.client.exec_create(613 container, f'cat /run/secrets/{secret_name}'614 )615 assert self.client.exec_start(exec_id) == secret_data616 @requires_api_version('1.25')617 def test_create_service_with_unicode_secret(self):618 secret_name = 'favorite_touhou'619 secret_data = '東方花映塚'620 secret_id = self.client.create_secret(secret_name, secret_data)621 self.tmp_secrets.append(secret_id)622 secret_ref = docker.types.SecretReference(secret_id, secret_name)623 container_spec = docker.types.ContainerSpec(624 'busybox', ['sleep', '999'], secrets=[secret_ref]625 )626 task_tmpl = docker.types.TaskTemplate(container_spec)627 name = self.get_service_name()628 svc_id = self.client.create_service(task_tmpl, name=name)629 svc_info = self.client.inspect_service(svc_id)630 assert 'Secrets' in svc_info['Spec']['TaskTemplate']['ContainerSpec']631 secrets = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Secrets']632 assert secrets[0] == secret_ref633 container = self.get_service_container(name)634 assert container is not None635 exec_id = self.client.exec_create(636 container, f'cat /run/secrets/{secret_name}'637 )638 container_secret = self.client.exec_start(exec_id)639 container_secret = container_secret.decode('utf-8')640 assert container_secret == secret_data641 @requires_api_version('1.30')642 def test_create_service_with_config(self):643 config_name = 'favorite_touhou'644 config_data = b'phantasmagoria of flower view'645 config_id = self.client.create_config(config_name, config_data)646 self.tmp_configs.append(config_id)647 config_ref = docker.types.ConfigReference(config_id, config_name)648 container_spec = docker.types.ContainerSpec(649 'busybox', ['sleep', '999'], configs=[config_ref]650 )651 task_tmpl = docker.types.TaskTemplate(container_spec)652 name = self.get_service_name()653 svc_id = self.client.create_service(task_tmpl, name=name)654 svc_info = self.client.inspect_service(svc_id)655 assert 'Configs' in svc_info['Spec']['TaskTemplate']['ContainerSpec']656 configs = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Configs']657 assert configs[0] == config_ref658 container = self.get_service_container(name)659 assert container is not None660 exec_id = self.client.exec_create(661 container, f'cat /{config_name}'662 )663 assert self.client.exec_start(exec_id) == config_data664 @requires_api_version('1.30')665 def test_create_service_with_unicode_config(self):666 config_name = 'favorite_touhou'667 config_data = '東方花映塚'668 config_id = self.client.create_config(config_name, config_data)669 self.tmp_configs.append(config_id)670 config_ref = docker.types.ConfigReference(config_id, config_name)671 container_spec = docker.types.ContainerSpec(672 'busybox', ['sleep', '999'], configs=[config_ref]673 )674 task_tmpl = docker.types.TaskTemplate(container_spec)675 name = self.get_service_name()676 svc_id = self.client.create_service(task_tmpl, name=name)677 svc_info = self.client.inspect_service(svc_id)678 assert 'Configs' in svc_info['Spec']['TaskTemplate']['ContainerSpec']679 configs = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Configs']680 assert configs[0] == config_ref681 container = self.get_service_container(name)682 assert container is not None683 exec_id = self.client.exec_create(684 container, f'cat /{config_name}'685 )686 container_config = self.client.exec_start(exec_id)687 container_config = container_config.decode('utf-8')688 assert container_config == config_data689 @requires_api_version('1.25')690 def test_create_service_with_hosts(self):691 container_spec = docker.types.ContainerSpec(692 'busybox', ['sleep', '999'], hosts={693 'foobar': '127.0.0.1',694 'baz': '8.8.8.8',695 }696 )697 task_tmpl = docker.types.TaskTemplate(container_spec)698 name = self.get_service_name()699 svc_id = self.client.create_service(task_tmpl, name=name)700 svc_info = self.client.inspect_service(svc_id)701 assert 'Hosts' in svc_info['Spec']['TaskTemplate']['ContainerSpec']702 hosts = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Hosts']703 assert len(hosts) == 2704 assert '127.0.0.1 foobar' in hosts705 assert '8.8.8.8 baz' in hosts706 @requires_api_version('1.25')707 def test_create_service_with_hostname(self):708 container_spec = docker.types.ContainerSpec(709 'busybox', ['sleep', '999'], hostname='foobar.baz.com'710 )711 task_tmpl = docker.types.TaskTemplate(container_spec)712 name = self.get_service_name()713 svc_id = self.client.create_service(task_tmpl, name=name)714 svc_info = self.client.inspect_service(svc_id)715 assert 'Hostname' in svc_info['Spec']['TaskTemplate']['ContainerSpec']716 assert (717 svc_info['Spec']['TaskTemplate']['ContainerSpec']['Hostname'] ==718 'foobar.baz.com'719 )720 @requires_api_version('1.25')721 def test_create_service_with_groups(self):722 container_spec = docker.types.ContainerSpec(723 'busybox', ['sleep', '999'], groups=['shrinemaidens', 'youkais']724 )725 task_tmpl = docker.types.TaskTemplate(container_spec)726 name = self.get_service_name()727 svc_id = self.client.create_service(task_tmpl, name=name)728 svc_info = self.client.inspect_service(svc_id)729 assert 'Groups' in svc_info['Spec']['TaskTemplate']['ContainerSpec']730 groups = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Groups']731 assert len(groups) == 2732 assert 'shrinemaidens' in groups733 assert 'youkais' in groups734 @requires_api_version('1.25')735 def test_create_service_with_dns_config(self):736 dns_config = docker.types.DNSConfig(737 nameservers=['8.8.8.8', '8.8.4.4'],738 search=['local'], options=['debug']739 )740 container_spec = docker.types.ContainerSpec(741 TEST_IMG, ['sleep', '999'], dns_config=dns_config742 )743 task_tmpl = docker.types.TaskTemplate(container_spec)744 name = self.get_service_name()745 svc_id = self.client.create_service(task_tmpl, name=name)746 svc_info = self.client.inspect_service(svc_id)747 assert 'DNSConfig' in svc_info['Spec']['TaskTemplate']['ContainerSpec']748 assert (749 dns_config ==750 svc_info['Spec']['TaskTemplate']['ContainerSpec']['DNSConfig']751 )752 @requires_api_version('1.25')753 def test_create_service_with_healthcheck(self):754 second = 1000000000755 hc = docker.types.Healthcheck(756 test='true', retries=3, timeout=1 * second,757 start_period=3 * second, interval=int(second / 2),758 )759 container_spec = docker.types.ContainerSpec(760 TEST_IMG, ['sleep', '999'], healthcheck=hc761 )762 task_tmpl = docker.types.TaskTemplate(container_spec)763 name = self.get_service_name()764 svc_id = self.client.create_service(task_tmpl, name=name)765 svc_info = self.client.inspect_service(svc_id)766 assert (767 'Healthcheck' in svc_info['Spec']['TaskTemplate']['ContainerSpec']768 )769 assert (770 hc ==771 svc_info['Spec']['TaskTemplate']['ContainerSpec']['Healthcheck']772 )773 @requires_api_version('1.28')774 def test_create_service_with_readonly(self):775 container_spec = docker.types.ContainerSpec(776 TEST_IMG, ['sleep', '999'], read_only=True777 )778 task_tmpl = docker.types.TaskTemplate(container_spec)779 name = self.get_service_name()780 svc_id = self.client.create_service(task_tmpl, name=name)781 svc_info = self.client.inspect_service(svc_id)782 assert (783 'ReadOnly' in svc_info['Spec']['TaskTemplate']['ContainerSpec']784 )785 assert svc_info['Spec']['TaskTemplate']['ContainerSpec']['ReadOnly']786 @requires_api_version('1.28')787 def test_create_service_with_stop_signal(self):788 container_spec = docker.types.ContainerSpec(789 TEST_IMG, ['sleep', '999'], stop_signal='SIGINT'790 )791 task_tmpl = docker.types.TaskTemplate(container_spec)792 name = self.get_service_name()793 svc_id = self.client.create_service(task_tmpl, name=name)794 svc_info = self.client.inspect_service(svc_id)795 assert (796 'StopSignal' in svc_info['Spec']['TaskTemplate']['ContainerSpec']797 )798 assert (799 svc_info['Spec']['TaskTemplate']['ContainerSpec']['StopSignal'] ==800 'SIGINT'801 )802 @requires_api_version('1.30')803 def test_create_service_with_privileges(self):804 priv = docker.types.Privileges(selinux_disable=True)805 container_spec = docker.types.ContainerSpec(806 TEST_IMG, ['sleep', '999'], privileges=priv807 )808 task_tmpl = docker.types.TaskTemplate(container_spec)809 name = self.get_service_name()810 svc_id = self.client.create_service(task_tmpl, name=name)811 svc_info = self.client.inspect_service(svc_id)812 assert (813 'Privileges' in svc_info['Spec']['TaskTemplate']['ContainerSpec']814 )815 privileges = (816 svc_info['Spec']['TaskTemplate']['ContainerSpec']['Privileges']817 )818 assert privileges['SELinuxContext']['Disable'] is True819 @requires_api_version('1.38')820 def test_create_service_with_init(self):821 container_spec = docker.types.ContainerSpec(822 'busybox', ['sleep', '999'], init=True823 )824 task_tmpl = docker.types.TaskTemplate(container_spec)825 name = self.get_service_name()826 svc_id = self.client.create_service(task_tmpl, name=name)827 svc_info = self.client.inspect_service(svc_id)828 assert 'Init' in svc_info['Spec']['TaskTemplate']['ContainerSpec']829 assert (830 svc_info['Spec']['TaskTemplate']['ContainerSpec']['Init'] is True831 )832 @requires_api_version('1.25')833 def test_update_service_with_defaults_name(self):834 container_spec = docker.types.ContainerSpec(835 'busybox', ['echo', 'hello']836 )837 task_tmpl = docker.types.TaskTemplate(container_spec)838 name = self.get_service_name()839 svc_id = self.client.create_service(task_tmpl, name=name)840 svc_info = self.client.inspect_service(svc_id)841 assert 'Name' in svc_info['Spec']842 assert svc_info['Spec']['Name'] == name843 version_index = svc_info['Version']['Index']844 task_tmpl = docker.types.TaskTemplate(container_spec, force_update=10)845 self._update_service(846 svc_id, name, version_index, task_tmpl, fetch_current_spec=True847 )848 svc_info = self.client.inspect_service(svc_id)849 new_index = svc_info['Version']['Index']850 assert new_index > version_index851 assert 'Name' in svc_info['Spec']852 assert svc_info['Spec']['Name'] == name853 @requires_api_version('1.25')854 def test_update_service_with_defaults_labels(self):855 container_spec = docker.types.ContainerSpec(856 'busybox', ['echo', 'hello']857 )858 task_tmpl = docker.types.TaskTemplate(container_spec)859 name = self.get_service_name()860 svc_id = self.client.create_service(861 task_tmpl, name=name, labels={'service.label': 'SampleLabel'}862 )863 svc_info = self.client.inspect_service(svc_id)864 assert 'Labels' in svc_info['Spec']865 assert 'service.label' in svc_info['Spec']['Labels']866 assert svc_info['Spec']['Labels']['service.label'] == 'SampleLabel'867 version_index = svc_info['Version']['Index']868 task_tmpl = docker.types.TaskTemplate(container_spec, force_update=10)869 self._update_service(870 svc_id, name, version_index, task_tmpl, name=name,871 fetch_current_spec=True872 )873 svc_info = self.client.inspect_service(svc_id)874 new_index = svc_info['Version']['Index']875 assert new_index > version_index876 assert 'Labels' in svc_info['Spec']877 assert 'service.label' in svc_info['Spec']['Labels']878 assert svc_info['Spec']['Labels']['service.label'] == 'SampleLabel'879 def test_update_service_with_defaults_mode(self):880 container_spec = docker.types.ContainerSpec(881 'busybox', ['echo', 'hello']882 )883 task_tmpl = docker.types.TaskTemplate(container_spec)884 name = self.get_service_name()885 svc_id = self.client.create_service(886 task_tmpl, name=name,887 mode=docker.types.ServiceMode(mode='replicated', replicas=2)888 )889 svc_info = self.client.inspect_service(svc_id)890 assert 'Mode' in svc_info['Spec']891 assert 'Replicated' in svc_info['Spec']['Mode']892 assert 'Replicas' in svc_info['Spec']['Mode']['Replicated']893 assert svc_info['Spec']['Mode']['Replicated']['Replicas'] == 2894 version_index = svc_info['Version']['Index']895 self._update_service(896 svc_id, name, version_index, labels={'force': 'update'},897 fetch_current_spec=True898 )899 svc_info = self.client.inspect_service(svc_id)900 new_index = svc_info['Version']['Index']901 assert new_index > version_index902 assert 'Mode' in svc_info['Spec']903 assert 'Replicated' in svc_info['Spec']['Mode']904 assert 'Replicas' in svc_info['Spec']['Mode']['Replicated']905 assert svc_info['Spec']['Mode']['Replicated']['Replicas'] == 2906 def test_update_service_with_defaults_container_labels(self):907 container_spec = docker.types.ContainerSpec(908 'busybox', ['echo', 'hello'],909 labels={'container.label': 'SampleLabel'}910 )911 task_tmpl = docker.types.TaskTemplate(container_spec)912 name = self.get_service_name()913 svc_id = self.client.create_service(914 task_tmpl, name=name, labels={'service.label': 'SampleLabel'}915 )916 svc_info = self.client.inspect_service(svc_id)917 assert 'TaskTemplate' in svc_info['Spec']918 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']919 assert 'Labels' in svc_info['Spec']['TaskTemplate']['ContainerSpec']920 labels = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Labels']921 assert labels['container.label'] == 'SampleLabel'922 version_index = svc_info['Version']['Index']923 self._update_service(924 svc_id, name, version_index, labels={'force': 'update'},925 fetch_current_spec=True926 )927 svc_info = self.client.inspect_service(svc_id)928 new_index = svc_info['Version']['Index']929 assert new_index > version_index930 assert 'TaskTemplate' in svc_info['Spec']931 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']932 assert 'Labels' in svc_info['Spec']['TaskTemplate']['ContainerSpec']933 labels = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Labels']934 assert labels['container.label'] == 'SampleLabel'935 container_spec = docker.types.ContainerSpec(936 'busybox', ['echo', 'hello']937 )938 task_tmpl = docker.types.TaskTemplate(container_spec)939 self._update_service(940 svc_id, name, new_index, task_tmpl, fetch_current_spec=True941 )942 svc_info = self.client.inspect_service(svc_id)943 newer_index = svc_info['Version']['Index']944 assert newer_index > new_index945 assert 'TaskTemplate' in svc_info['Spec']946 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']947 assert 'Labels' in svc_info['Spec']['TaskTemplate']['ContainerSpec']948 labels = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Labels']949 assert labels['container.label'] == 'SampleLabel'950 def test_update_service_with_defaults_update_config(self):951 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])952 task_tmpl = docker.types.TaskTemplate(container_spec)953 update_config = docker.types.UpdateConfig(954 parallelism=10, delay=5, failure_action='pause'955 )956 name = self.get_service_name()957 svc_id = self.client.create_service(958 task_tmpl, update_config=update_config, name=name959 )960 svc_info = self.client.inspect_service(svc_id)961 assert 'UpdateConfig' in svc_info['Spec']962 uc = svc_info['Spec']['UpdateConfig']963 assert update_config['Parallelism'] == uc['Parallelism']964 assert update_config['Delay'] == uc['Delay']965 assert update_config['FailureAction'] == uc['FailureAction']966 version_index = svc_info['Version']['Index']967 self._update_service(968 svc_id, name, version_index, labels={'force': 'update'},969 fetch_current_spec=True970 )971 svc_info = self.client.inspect_service(svc_id)972 new_index = svc_info['Version']['Index']973 assert new_index > version_index974 assert 'UpdateConfig' in svc_info['Spec']975 uc = svc_info['Spec']['UpdateConfig']976 assert update_config['Parallelism'] == uc['Parallelism']977 assert update_config['Delay'] == uc['Delay']978 assert update_config['FailureAction'] == uc['FailureAction']979 def test_update_service_with_defaults_networks(self):980 net1 = self.client.create_network(981 'dockerpytest_1', driver='overlay', ipam={'Driver': 'default'}982 )983 self.tmp_networks.append(net1['Id'])984 net2 = self.client.create_network(985 'dockerpytest_2', driver='overlay', ipam={'Driver': 'default'}986 )987 self.tmp_networks.append(net2['Id'])988 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])989 task_tmpl = docker.types.TaskTemplate(container_spec)990 name = self.get_service_name()991 svc_id = self.client.create_service(992 task_tmpl, name=name, networks=[993 'dockerpytest_1', {'Target': 'dockerpytest_2'}994 ]995 )996 svc_info = self.client.inspect_service(svc_id)997 assert 'Networks' in svc_info['Spec']998 assert svc_info['Spec']['Networks'] == [999 {'Target': net1['Id']}, {'Target': net2['Id']}1000 ]1001 version_index = svc_info['Version']['Index']1002 self._update_service(1003 svc_id, name, version_index, labels={'force': 'update'},1004 fetch_current_spec=True1005 )1006 svc_info = self.client.inspect_service(svc_id)1007 new_index = svc_info['Version']['Index']1008 assert new_index > version_index1009 assert 'Networks' in svc_info['Spec']['TaskTemplate']1010 assert svc_info['Spec']['TaskTemplate']['Networks'] == [1011 {'Target': net1['Id']}, {'Target': net2['Id']}1012 ]1013 self._update_service(1014 svc_id, name, new_index, networks=[net1['Id']],1015 fetch_current_spec=True1016 )1017 svc_info = self.client.inspect_service(svc_id)1018 assert 'Networks' in svc_info['Spec']['TaskTemplate']1019 assert svc_info['Spec']['TaskTemplate']['Networks'] == [1020 {'Target': net1['Id']}1021 ]1022 def test_update_service_with_defaults_endpoint_spec(self):1023 container_spec = docker.types.ContainerSpec(TEST_IMG, ['true'])1024 task_tmpl = docker.types.TaskTemplate(container_spec)1025 name = self.get_service_name()1026 endpoint_spec = docker.types.EndpointSpec(ports={1027 12357: (1990, 'udp'),1028 12562: (678,),1029 53243: 8080,1030 })1031 svc_id = self.client.create_service(1032 task_tmpl, name=name, endpoint_spec=endpoint_spec1033 )1034 svc_info = self.client.inspect_service(svc_id)1035 print(svc_info)1036 ports = svc_info['Spec']['EndpointSpec']['Ports']1037 for port in ports:1038 if port['PublishedPort'] == 12562:1039 assert port['TargetPort'] == 6781040 assert port['Protocol'] == 'tcp'1041 elif port['PublishedPort'] == 53243:1042 assert port['TargetPort'] == 80801043 assert port['Protocol'] == 'tcp'1044 elif port['PublishedPort'] == 12357:1045 assert port['TargetPort'] == 19901046 assert port['Protocol'] == 'udp'1047 else:1048 self.fail(f'Invalid port specification: {port}')1049 assert len(ports) == 31050 svc_info = self.client.inspect_service(svc_id)1051 version_index = svc_info['Version']['Index']1052 self._update_service(1053 svc_id, name, version_index, labels={'force': 'update'},1054 fetch_current_spec=True1055 )1056 svc_info = self.client.inspect_service(svc_id)1057 new_index = svc_info['Version']['Index']1058 assert new_index > version_index1059 ports = svc_info['Spec']['EndpointSpec']['Ports']1060 for port in ports:1061 if port['PublishedPort'] == 12562:1062 assert port['TargetPort'] == 6781063 assert port['Protocol'] == 'tcp'1064 elif port['PublishedPort'] == 53243:1065 assert port['TargetPort'] == 80801066 assert port['Protocol'] == 'tcp'1067 elif port['PublishedPort'] == 12357:1068 assert port['TargetPort'] == 19901069 assert port['Protocol'] == 'udp'1070 else:1071 self.fail(f'Invalid port specification: {port}')1072 assert len(ports) == 31073 @requires_api_version('1.25')1074 def test_update_service_remove_healthcheck(self):1075 second = 10000000001076 hc = docker.types.Healthcheck(1077 test='true', retries=3, timeout=1 * second,1078 start_period=3 * second, interval=int(second / 2),1079 )1080 container_spec = docker.types.ContainerSpec(1081 TEST_IMG, ['sleep', '999'], healthcheck=hc1082 )1083 task_tmpl = docker.types.TaskTemplate(container_spec)1084 name = self.get_service_name()1085 svc_id = self.client.create_service(task_tmpl, name=name)1086 svc_info = self.client.inspect_service(svc_id)1087 assert (1088 'Healthcheck' in svc_info['Spec']['TaskTemplate']['ContainerSpec']1089 )1090 assert (1091 hc ==1092 svc_info['Spec']['TaskTemplate']['ContainerSpec']['Healthcheck']1093 )1094 container_spec = docker.types.ContainerSpec(1095 TEST_IMG, ['sleep', '999'], healthcheck={}1096 )1097 task_tmpl = docker.types.TaskTemplate(container_spec)1098 version_index = svc_info['Version']['Index']1099 self._update_service(1100 svc_id, name, version_index, task_tmpl, fetch_current_spec=True1101 )1102 svc_info = self.client.inspect_service(svc_id)1103 new_index = svc_info['Version']['Index']1104 assert new_index > version_index1105 container_spec = svc_info['Spec']['TaskTemplate']['ContainerSpec']1106 assert (1107 'Healthcheck' not in container_spec or1108 not container_spec['Healthcheck']1109 )1110 def test_update_service_remove_labels(self):1111 container_spec = docker.types.ContainerSpec(1112 'busybox', ['echo', 'hello']1113 )1114 task_tmpl = docker.types.TaskTemplate(container_spec)1115 name = self.get_service_name()1116 svc_id = self.client.create_service(1117 task_tmpl, name=name, labels={'service.label': 'SampleLabel'}1118 )1119 svc_info = self.client.inspect_service(svc_id)1120 assert 'Labels' in svc_info['Spec']1121 assert 'service.label' in svc_info['Spec']['Labels']1122 assert svc_info['Spec']['Labels']['service.label'] == 'SampleLabel'1123 version_index = svc_info['Version']['Index']1124 self._update_service(1125 svc_id, name, version_index, labels={}, fetch_current_spec=True1126 )1127 svc_info = self.client.inspect_service(svc_id)1128 new_index = svc_info['Version']['Index']1129 assert new_index > version_index1130 assert not svc_info['Spec'].get('Labels')1131 def test_update_service_remove_container_labels(self):1132 container_spec = docker.types.ContainerSpec(1133 'busybox', ['echo', 'hello'],1134 labels={'container.label': 'SampleLabel'}1135 )1136 task_tmpl = docker.types.TaskTemplate(container_spec)1137 name = self.get_service_name()1138 svc_id = self.client.create_service(1139 task_tmpl, name=name, labels={'service.label': 'SampleLabel'}1140 )1141 svc_info = self.client.inspect_service(svc_id)1142 assert 'TaskTemplate' in svc_info['Spec']1143 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']1144 assert 'Labels' in svc_info['Spec']['TaskTemplate']['ContainerSpec']1145 labels = svc_info['Spec']['TaskTemplate']['ContainerSpec']['Labels']1146 assert labels['container.label'] == 'SampleLabel'1147 version_index = svc_info['Version']['Index']1148 container_spec = docker.types.ContainerSpec(1149 'busybox', ['echo', 'hello'],1150 labels={}1151 )1152 task_tmpl = docker.types.TaskTemplate(container_spec)1153 self._update_service(1154 svc_id, name, version_index, task_tmpl, fetch_current_spec=True1155 )1156 svc_info = self.client.inspect_service(svc_id)1157 new_index = svc_info['Version']['Index']1158 assert new_index > version_index1159 assert 'TaskTemplate' in svc_info['Spec']1160 assert 'ContainerSpec' in svc_info['Spec']['TaskTemplate']1161 container_spec = svc_info['Spec']['TaskTemplate']['ContainerSpec']1162 assert not container_spec.get('Labels')1163 @requires_api_version('1.29')1164 def test_update_service_with_network_change(self):1165 container_spec = docker.types.ContainerSpec(1166 'busybox', ['echo', 'hello']1167 )1168 task_tmpl = docker.types.TaskTemplate(container_spec)1169 net1 = self.client.create_network(1170 self.get_service_name(), driver='overlay',1171 ipam={'Driver': 'default'}1172 )1173 self.tmp_networks.append(net1['Id'])1174 net2 = self.client.create_network(1175 self.get_service_name(), driver='overlay',1176 ipam={'Driver': 'default'}1177 )1178 self.tmp_networks.append(net2['Id'])1179 name = self.get_service_name()1180 svc_id = self.client.create_service(1181 task_tmpl, name=name, networks=[net1['Id']]1182 )1183 svc_info = self.client.inspect_service(svc_id)1184 assert 'Networks' in svc_info['Spec']1185 assert len(svc_info['Spec']['Networks']) > 01186 assert svc_info['Spec']['Networks'][0]['Target'] == net1['Id']1187 svc_info = self.client.inspect_service(svc_id)1188 version_index = svc_info['Version']['Index']1189 task_tmpl = docker.types.TaskTemplate(container_spec)1190 self._update_service(1191 svc_id, name, version_index, task_tmpl, name=name,1192 networks=[net2['Id']], fetch_current_spec=True1193 )1194 svc_info = self.client.inspect_service(svc_id)1195 task_template = svc_info['Spec']['TaskTemplate']1196 assert 'Networks' in task_template1197 assert len(task_template['Networks']) > 01198 assert task_template['Networks'][0]['Target'] == net2['Id']1199 svc_info = self.client.inspect_service(svc_id)1200 new_index = svc_info['Version']['Index']1201 assert new_index > version_index1202 self._update_service(1203 svc_id, name, new_index, name=name, networks=[net1['Id']],1204 fetch_current_spec=True1205 )1206 svc_info = self.client.inspect_service(svc_id)1207 task_template = svc_info['Spec']['TaskTemplate']1208 assert 'ContainerSpec' in task_template1209 new_spec = task_template['ContainerSpec']1210 assert 'Image' in new_spec1211 assert new_spec['Image'].split(':')[0] == 'busybox'1212 assert 'Command' in new_spec1213 assert new_spec['Command'] == ['echo', 'hello']1214 assert 'Networks' in task_template1215 assert len(task_template['Networks']) > 01216 assert task_template['Networks'][0]['Target'] == net1['Id']1217 svc_info = self.client.inspect_service(svc_id)1218 new_index = svc_info['Version']['Index']1219 task_tmpl = docker.types.TaskTemplate(1220 container_spec, networks=[net2['Id']]1221 )1222 self._update_service(1223 svc_id, name, new_index, task_tmpl, name=name,1224 fetch_current_spec=True1225 )1226 svc_info = self.client.inspect_service(svc_id)1227 task_template = svc_info['Spec']['TaskTemplate']1228 assert 'Networks' in task_template1229 assert len(task_template['Networks']) > 01230 assert task_template['Networks'][0]['Target'] == net2['Id']1231 def _update_service(self, svc_id, *args, **kwargs):1232 # service update tests seem to be a bit flaky1233 # give them a chance to retry the update with a new version index1234 try:1235 self.client.update_service(*args, **kwargs)1236 except docker.errors.APIError as e:1237 if e.explanation.endswith("update out of sequence"):1238 svc_info = self.client.inspect_service(svc_id)1239 version_index = svc_info['Version']['Index']1240 if len(args) > 1:1241 args = (args[0], version_index) + args[2:]1242 else:1243 kwargs['version'] = version_index1244 self.client.update_service(*args, **kwargs)1245 else:1246 raise1247 @requires_api_version('1.41')1248 def test_create_service_cap_add(self):1249 name = self.get_service_name()1250 container_spec = docker.types.ContainerSpec(1251 TEST_IMG, ['echo', 'hello'], cap_add=['CAP_SYSLOG']1252 )1253 task_tmpl = docker.types.TaskTemplate(container_spec)1254 svc_id = self.client.create_service(task_tmpl, name=name)1255 assert self.client.inspect_service(svc_id)1256 services = self.client.services(filters={'name': name})1257 assert len(services) == 11258 assert services[0]['ID'] == svc_id['ID']1259 spec = services[0]['Spec']['TaskTemplate']['ContainerSpec']1260 assert 'CAP_SYSLOG' in spec['CapabilityAdd']1261 @requires_api_version('1.41')1262 def test_create_service_cap_drop(self):1263 name = self.get_service_name()1264 container_spec = docker.types.ContainerSpec(1265 TEST_IMG, ['echo', 'hello'], cap_drop=['CAP_SYSLOG']1266 )1267 task_tmpl = docker.types.TaskTemplate(container_spec)1268 svc_id = self.client.create_service(task_tmpl, name=name)1269 assert self.client.inspect_service(svc_id)1270 services = self.client.services(filters={'name': name})1271 assert len(services) == 11272 assert services[0]['ID'] == svc_id['ID']1273 spec = services[0]['Spec']['TaskTemplate']['ContainerSpec']...

Full Screen

Full Screen

_base.py

Source:_base.py Github

copy

Full Screen

...15 if service_name is not None:16 self.service_name = service_name17 async def get_service_metadata(self):18 return await self.internal_request(19 self.get_service_name(),20 method='get_service_metadata',21 registered_services=self.settings['registered_services'])22 def get_service_name(self):23 if self.service_name is None:24 raise ImproperlyConfigured(25 "ServiceContextMixin requires either a definition of "26 "'service_name' or an implementation of 'get_service_name()'")27 else:28 return self.service_name29 async def get_context_data(self, **kwargs):30 context = await super().get_context_data(**kwargs)31 try:32 metadata = await self.get_service_metadata()33 except RequestTimeoutError:34 pass # ¯\_(ツ)_/¯35 except ServiceDoesNotExist:36 raise HttpNotFoundError37 else:38 context['metadata'] = metadata39 context['service_name'] = self.get_service_name()40 return context41class PageHandlerMixin:42 page_name = None43 breadcrumbs = None44 template_root = ''45 def get_template_root(self):46 return self.template_root47 def get_template_name(self):48 return os.path.join(self.get_template_root(), self.page_name + '.html')49 def get_breadcrumbs(self):50 return self.breadcrumbs51 @property52 def extra_context(self):53 return {54 'page': self.page_name,55 'breadcrumbs': self.get_breadcrumbs(),56 }57class UserTemplateServiceRequestHandler(ServiceContextMixin, TemplateHandler, UserHandlerMixin):58 template_name = None59 def get_template_root(self):60 return os.path.join('services', self.get_service_name())61 def get_template_name(self):62 return os.path.join(self.get_template_root(), self.template_name)63 def render(self, template_name=None, **kwargs):64 try:65 super().render(template_name, **kwargs)66 except FileNotFoundError:67 super().render(os.path.join('services', 'default.html'), **kwargs)68class ServicePageHandler(PageHandlerMixin, UserTemplateServiceRequestHandler):69 def get_template_root(self):70 return os.path.join('services', self.get_service_name())71 def get_template_name(self):72 return os.path.join(self.get_template_root(), self.page_name + '.html')73class ServiceFormHandler(FormMixin, ProcessFormMixin, ServicePageHandler):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful