How to use test_container_info method in avocado

Best Python code snippet using avocado_python

abstract_spawn_test_environment.py

Source:abstract_spawn_test_environment.py Github

copy

Full Screen

1from pathlib import Path2from typing import Generator, Tuple, Optional3import luigi4from exasol_integration_test_docker_environment.abstract_method_exception import AbstractMethodException5from exasol_integration_test_docker_environment.lib.base.base_task import BaseTask6from exasol_integration_test_docker_environment.lib.base.docker_base_task import DockerBaseTask7from exasol_integration_test_docker_environment.lib.data.container_info import ContainerInfo8from exasol_integration_test_docker_environment.lib.data.database_credentials import DatabaseCredentialsParameter9from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo10from exasol_integration_test_docker_environment.lib.data.docker_network_info import DockerNetworkInfo11from exasol_integration_test_docker_environment.lib.data.docker_volume_info import DockerVolumeInfo12from exasol_integration_test_docker_environment.lib.data.environment_info import EnvironmentInfo13from exasol_integration_test_docker_environment.lib.test_environment.docker_container_copy import DockerContainerCopy14from exasol_integration_test_docker_environment.lib.test_environment.parameter.general_spawn_test_environment_parameter import \15 GeneralSpawnTestEnvironmentParameter16from exasol_integration_test_docker_environment.lib.test_environment.spawn_test_container import SpawnTestContainer17DATABASE = "database"18TEST_CONTAINER = "test_container"19class AbstractSpawnTestEnvironment(DockerBaseTask,20 GeneralSpawnTestEnvironmentParameter,21 DatabaseCredentialsParameter):22 environment_name = luigi.Parameter() # type: str23 def __init__(self, *args, **kwargs):24 super().__init__(*args, **kwargs)25 self.test_container_name = f"""test_container_{self.environment_name}"""26 self.network_name = f"""db_network_{self.environment_name}"""27 def get_environment_type(self):28 raise AbstractMethodException()29 def run_task(self):30 test_environment_info = yield from self._attempt_database_start()31 self.return_object(test_environment_info)32 def _attempt_database_start(self):33 is_database_ready = False34 attempt = 035 database_info = None36 test_container_info = None37 while not is_database_ready and attempt < self.max_start_attempts:38 network_info, database_info, is_database_ready, test_container_info = \39 yield from self._start_database(attempt)40 attempt += 141 if not is_database_ready and not attempt < self.max_start_attempts:42 raise Exception(f"Maximum attempts {attempt} to start the database reached.")43 test_environment_info = \44 EnvironmentInfo(name=self.environment_name,45 env_type=self.get_environment_type(),46 database_info=database_info,47 test_container_info=test_container_info,48 network_info=network_info)49 self.create_test_environment_info_in_test_container_and_on_host(test_environment_info)50 return test_environment_info51 def create_test_environment_info_in_test_container(self, test_environment_info: EnvironmentInfo,52 environment_variables: str,53 environment_variables_with_export: str,54 json: str):55 test_container_name = test_environment_info.test_container_info.container_name56 with self._get_docker_client() as docker_client:57 test_container = docker_client.containers.get(test_container_name)58 self.logger.info(f"Create test environment info in test container '{test_container_name}' at '/'")59 copy = DockerContainerCopy(test_container)60 copy.add_string_to_file("environment_info.json", json)61 copy.add_string_to_file("environment_info.conf", environment_variables)62 copy.add_string_to_file("environment_info.sh", environment_variables_with_export)63 copy.copy("/")64 def create_test_environment_info_in_test_container_and_on_host(65 self, test_environment_info: EnvironmentInfo):66 test_environment_info_base_host_path = Path(self.get_cache_path(),67 f"environments/{self.environment_name}")68 test_environment_info_base_host_path.mkdir(exist_ok=True, parents=True)69 self.logger.info(f"Create test environment info on the host at '{test_environment_info_base_host_path}'")70 json = test_environment_info.to_json()71 cache_environment_info_json_path = Path(test_environment_info_base_host_path,72 "environment_info.json")73 with cache_environment_info_json_path.open("w") as f:74 f.write(json)75 if test_environment_info.test_container_info is not None:76 test_container_name = test_environment_info.test_container_info.container_name77 else:78 test_container_name = ""79 environment_variables = \80 self.collect_environment_info_variables(test_container_name,81 test_environment_info)82 cache_environment_info_conf_path = Path(test_environment_info_base_host_path,83 "environment_info.conf")84 with cache_environment_info_conf_path.open("w") as f:85 f.write(environment_variables)86 environment_variables_with_export = ""87 for line in environment_variables.splitlines():88 environment_variables_with_export += f"export {line}\n"89 cache_environment_info_sh_path = Path(test_environment_info_base_host_path, "environment_info.sh")90 with cache_environment_info_sh_path.open("w") as f:91 f.write(environment_variables_with_export)92 if test_environment_info.test_container_info is not None:93 self.create_test_environment_info_in_test_container(test_environment_info,94 environment_variables,95 environment_variables_with_export, json)96 def collect_environment_info_variables(self, test_container_name: str, test_environment_info):97 environment_variables = ""98 environment_variables += f"ENVIRONMENT_NAME={test_environment_info.name}\n"99 environment_variables += f"ENVIRONMENT_TYPE={test_environment_info.type}\n"100 environment_variables += f"ENVIRONMENT_DATABASE_HOST={test_environment_info.database_info.host}\n"101 environment_variables += f"ENVIRONMENT_DATABASE_DB_PORT={test_environment_info.database_info.db_port}\n"102 environment_variables += f"ENVIRONMENT_DATABASE_BUCKETFS_PORT={test_environment_info.database_info.bucketfs_port}\n"103 if test_environment_info.database_info.container_info is not None:104 environment_variables += f"""ENVIRONMENT_DATABASE_CONTAINER_NAME={test_environment_info.database_info.container_info.container_name}\n"""105 database_container_network_aliases = " ".join(106 test_environment_info.database_info.container_info.network_aliases)107 environment_variables += f"""ENVIRONMENT_DATABASE_CONTAINER_NETWORK_ALIASES="{database_container_network_aliases}"\n"""108 environment_variables += f"""ENVIRONMENT_DATABASE_CONTAINER_IP_ADDRESS={test_environment_info.database_info.container_info.ip_address}\n"""109 environment_variables += f"""ENVIRONMENT_DATABASE_CONTAINER_VOLUMNE_NAME={test_environment_info.database_info.container_info.volume_name}\n"""110 with self._get_docker_client() as docker_client:111 db_container = docker_client.containers.get(112 test_environment_info.database_info.container_info.container_name)113 db_container.reload()114 default_bridge_ip_address = db_container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]115 environment_variables += f"""ENVIRONMENT_DATABASE_CONTAINER_DEFAULT_BRIDGE_IP_ADDRESS={default_bridge_ip_address}\n"""116 if test_environment_info.test_container_info is not None:117 environment_variables += f"""ENVIRONMENT_TEST_CONTAINER_NAME={test_container_name}\n"""118 test_container_network_aliases = " ".join(test_environment_info.test_container_info.network_aliases)119 environment_variables += f"""ENVIRONMENT_TEST_CONTAINER_NETWORK_ALIASES="{test_container_network_aliases}"\n"""120 environment_variables += f"""ENVIRONMENT_TEST_CONTAINER_IP_ADDRESS={test_environment_info.test_container_info.ip_address}\n"""121 return environment_variables122 def _start_database(self, attempt) \123 -> Generator[BaseTask, BaseTask, Tuple[DockerNetworkInfo, DatabaseInfo, bool, Optional[ContainerInfo]]]:124 network_info = yield from self._create_network(attempt)125 ssl_volume_info = None126 if self.create_certificates:127 ssl_volume_info = yield from self._create_ssl_certificates()128 database_info, test_container_info = \129 yield from self._spawn_database_and_test_container(network_info, ssl_volume_info, attempt)130 is_database_ready = yield from self._wait_for_database(database_info, attempt)131 return network_info, database_info, is_database_ready, test_container_info132 def _create_ssl_certificates(self) -> DockerVolumeInfo:133 ssl_info_future = yield from self.run_dependencies(self.create_ssl_certificates())134 ssl_info = self.get_values_from_future(ssl_info_future)135 return ssl_info136 def create_ssl_certificates(self):137 raise AbstractMethodException()138 def _create_network(self, attempt):139 network_info_future = yield from self.run_dependencies(self.create_network_task(attempt))140 network_info = self.get_values_from_future(network_info_future)141 return network_info142 def create_network_task(self, attempt: int):143 raise AbstractMethodException()144 def _spawn_database_and_test_container(self,145 network_info: DockerNetworkInfo,146 certificate_volume_info: Optional[DockerVolumeInfo],147 attempt: int) -> Tuple[DatabaseInfo, Optional[ContainerInfo]]:148 certificate_volume_name = certificate_volume_info.volume_name if certificate_volume_info is not None else None149 dependencies_tasks = {150 DATABASE: self.create_spawn_database_task(network_info, certificate_volume_info, attempt)151 }152 if self.test_container_content is not None:153 dependencies_tasks[TEST_CONTAINER] = \154 self.create_spawn_test_container_task(network_info, certificate_volume_name, attempt)155 database_and_test_container_info_future = yield from self.run_dependencies(dependencies_tasks)156 database_and_test_container_info = \157 self.get_values_from_futures(database_and_test_container_info_future)158 test_container_info = None159 if self.test_container_content is not None:160 test_container_info = database_and_test_container_info[TEST_CONTAINER]161 database_info = database_and_test_container_info[DATABASE]162 return database_info, test_container_info163 def create_spawn_database_task(self,164 network_info: DockerNetworkInfo,165 certificate_volume_info: Optional[DockerVolumeInfo],166 attempt: int):167 raise AbstractMethodException()168 def create_spawn_test_container_task(self, network_info: DockerNetworkInfo,169 certificate_volume_name: str, attempt: int):170 return self.create_child_task_with_common_params(171 SpawnTestContainer,172 test_container_name=self.test_container_name,173 network_info=network_info,174 ip_address_index_in_subnet=1,175 certificate_volume_name=certificate_volume_name,176 attempt=attempt,177 test_container_content=self.test_container_content178 )179 def _wait_for_database(self,180 database_info: DatabaseInfo,181 attempt: int):182 database_ready_target_future = \183 yield from self.run_dependencies(self.create_wait_for_database_task(attempt, database_info))184 is_database_ready = self.get_values_from_futures(database_ready_target_future)185 return is_database_ready186 def create_wait_for_database_task(self,187 attempt: int,188 database_info: DatabaseInfo):...

Full Screen

Full Screen

environment_info.py

Source:environment_info.py Github

copy

Full Screen

1from typing import Optional2from exasol_integration_test_docker_environment.lib.base.info import Info3from exasol_integration_test_docker_environment.lib.data.container_info import ContainerInfo4from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo5from exasol_integration_test_docker_environment.lib.data.docker_network_info import DockerNetworkInfo6class EnvironmentInfo(Info):7 def __init__(self,8 name: str, env_type: str,9 database_info: DatabaseInfo,10 test_container_info: Optional[ContainerInfo],11 network_info: DockerNetworkInfo):12 self.name = name13 self.type = env_type14 self.test_container_info = test_container_info15 self.database_info = database_info...

Full Screen

Full Screen

test_docker_models.py

Source:test_docker_models.py Github

copy

Full Screen

1import pytest2from docker_addons.models import ContainerInfo3@pytest.mark.django_db4def test_container_info():5 seen = set()6 for _ in range(30):7 instance = ContainerInfo.objects.create()8 assert len(instance.name) == 509 assert instance.name not in seen...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful