Best Python code snippet using localstack_python
docker_utils.py
Source:docker_utils.py
...860 network = flag861 cur_state = None862 return env_vars, ports, mounts, extra_hosts, network863 @staticmethod864 def convert_mount_list_to_dict(865 mount_volumes: List[Tuple[str, str]]866 ) -> Dict[str, Dict[str, str]]:867 """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""868 return dict(869 map(870 lambda paths: (str(paths[0]), {"bind": paths[1], "mode": "rw"}),871 mount_volumes,872 )873 )874class SdkDockerClient(ContainerClient):875 """Class for managing docker using the python docker sdk"""876 docker_client: Optional[DockerClient]877 def __init__(self):878 try:879 self.docker_client = docker.from_env()880 logging.getLogger("urllib3").setLevel(logging.INFO)881 except DockerException:882 self.docker_client = None883 def client(self):884 if self.docker_client:885 return self.docker_client886 else:887 raise ContainerException("Docker not available")888 def _read_from_sock(self, sock: socket, tty: bool):889 """Reads multiplexed messages from a socket returned by attach_socket.890 Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach891 """892 stdout = b""893 stderr = b""894 for frame_type, frame_data in frames_iter(sock, tty):895 if frame_type == STDOUT:896 stdout += frame_data897 elif frame_type == STDERR:898 stderr += frame_data899 else:900 raise ContainerException("Invalid frame type when reading from socket")901 return stdout, stderr902 def _container_path_info(self, container: Container, container_path: str):903 """904 Get information about a path in the given container905 :param container: Container to be inspected906 :param container_path: Path in container907 :return: Tuple (path_exists, path_is_directory)908 """909 # Docker CLI copy uses go FileMode to determine if target is a dict or not910 # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260911 # The isDir Bit is the most significant 
bit in the 32bit struct:912 # https://golang.org/src/os/types.go?s=2650:2683913 stats = {}914 try:915 _, stats = container.get_archive(container_path)916 target_exists = True917 except APIError:918 target_exists = False919 target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)920 return target_exists, target_is_dir921 def get_container_status(self, container_name: str) -> DockerContainerStatus:922 # LOG.debug("Getting container status for container: %s", container_name) # too verbose923 try:924 container = self.client().containers.get(container_name)925 if container.status == "running":926 return DockerContainerStatus.UP927 else:928 return DockerContainerStatus.DOWN929 except NotFound:930 return DockerContainerStatus.NON_EXISTENT931 except APIError:932 raise ContainerException()933 def get_network(self, container_name: str) -> str:934 LOG.debug("Getting network type for container: %s", container_name)935 try:936 container = self.client().containers.get(container_name)937 return container.attrs["HostConfig"]["NetworkMode"]938 except NotFound:939 raise NoSuchContainer(container_name)940 except APIError:941 raise ContainerException()942 def stop_container(self, container_name: str) -> None:943 LOG.debug("Stopping container: %s", container_name)944 try:945 container = self.client().containers.get(container_name)946 container.stop(timeout=0)947 except NotFound:948 raise NoSuchContainer(container_name)949 except APIError:950 raise ContainerException()951 def remove_container(self, container_name: str, force=True, check_existence=False) -> None:952 LOG.debug("Removing container: %s", container_name)953 if check_existence and container_name not in self.get_running_container_names():954 LOG.debug("Aborting removing due to check_existence check")955 return956 try:957 container = self.client().containers.get(container_name)958 container.remove(force=force)959 except NotFound:960 if not force:961 raise NoSuchContainer(container_name)962 except APIError:963 raise 
ContainerException()964 def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:965 if filter:966 filter = [filter] if isinstance(filter, str) else filter967 filter = dict([f.split("=", 1) for f in filter])968 LOG.debug("Listing containers with filters: %s", filter)969 try:970 container_list = self.client().containers.list(filters=filter, all=all)971 return list(972 map(973 lambda container: {974 "id": container.id,975 "image": container.image,976 "name": container.name,977 "status": container.status,978 "labels": container.labels,979 },980 container_list,981 )982 )983 except APIError:984 raise ContainerException()985 def copy_into_container(986 self, container_name: str, local_path: str, container_path: str987 ) -> None: # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/988 LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)989 try:990 container = self.client().containers.get(container_name)991 target_exists, target_isdir = self._container_path_info(container, container_path)992 target_path = container_path if target_isdir else os.path.dirname(container_path)993 with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:994 container.put_archive(target_path, tar)995 except NotFound:996 raise NoSuchContainer(container_name)997 except APIError:998 raise ContainerException()999 def copy_from_container(1000 self,1001 container_name: str,1002 local_path: str,1003 container_path: str,1004 ) -> None:1005 LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)1006 try:1007 container = self.client().containers.get(container_name)1008 bits, _ = container.get_archive(container_path)1009 Util.untar_to_path(bits, local_path)1010 except NotFound:1011 raise NoSuchContainer(container_name)1012 except APIError:1013 raise ContainerException()1014 def pull_image(self, docker_image: str) -> None:1015 LOG.debug("Pulling image: %s", 
docker_image)1016 # some path in the docker image string indicates a custom repository1017 path_split = docker_image.rpartition("/")1018 image_split = path_split[2].partition(":")1019 repository = f"{path_split[0]}{path_split[1]}{image_split[0]}"1020 tag = image_split[2]1021 try:1022 LOG.debug("Repository: %s Tag: %s", repository, tag)1023 self.client().images.pull(repository, tag)1024 except ImageNotFound:1025 raise NoSuchImage(docker_image)1026 except APIError:1027 raise ContainerException()1028 def get_docker_image_names(self, strip_latest=True, include_tags=True):1029 try:1030 images = self.client().images.list()1031 image_names = [tag for image in images for tag in image.tags if image.tags]1032 if not include_tags:1033 image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))1034 if strip_latest:1035 Util.append_without_latest(image_names)1036 return image_names1037 except APIError:1038 raise ContainerException()1039 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:1040 try:1041 container = self.client().containers.get(container_name_or_id)1042 return to_str(container.logs())1043 except NotFound:1044 if safe:1045 return ""1046 raise NoSuchContainer(container_name_or_id)1047 except APIError:1048 if safe:1049 return ""1050 raise ContainerException()1051 def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:1052 try:1053 return self.client().containers.get(container_name_or_id).attrs1054 except NotFound:1055 raise NoSuchContainer(container_name_or_id)1056 except APIError:1057 raise ContainerException()1058 def inspect_image(self, image_name: str) -> Dict[str, Union[Dict, str]]:1059 try:1060 return self.client().images.get(image_name).attrs1061 except NotFound:1062 raise NoSuchImage(image_name)1063 except APIError:1064 raise ContainerException()1065 def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:1066 try:1067 return 
self.client().networks.get(network_name).attrs1068 except NotFound:1069 raise NoSuchNetwork(network_name)1070 except APIError:1071 raise ContainerException()1072 def get_container_ip(self, container_name_or_id: str) -> str:1073 networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]1074 network_names = list(networks)1075 if len(network_names) > 1:1076 LOG.info("Container has more than one assigned network. Picking the first one...")1077 return networks[network_names[0]]["IPAddress"]1078 def has_docker(self) -> bool:1079 try:1080 if not self.docker_client:1081 return False1082 self.client().ping()1083 return True1084 except APIError:1085 return False1086 def start_container(1087 self,1088 container_name_or_id: str,1089 stdin=None,1090 interactive: bool = False,1091 attach: bool = False,1092 flags: Optional[str] = None,1093 ) -> Tuple[bytes, bytes]:1094 LOG.debug("Starting container %s", container_name_or_id)1095 try:1096 container = self.client().containers.get(container_name_or_id)1097 stdout = to_bytes(container_name_or_id)1098 stderr = b""1099 if interactive or attach:1100 params = {"stdout": 1, "stderr": 1, "stream": 1}1101 if interactive:1102 params["stdin"] = 11103 sock = container.attach_socket(params=params)1104 sock = sock._sock if hasattr(sock, "_sock") else sock1105 result_queue = queue.Queue()1106 thread_started = threading.Event()1107 start_waiting = threading.Event()1108 # Note: We need to be careful about potential race conditions here - .wait() should happen right1109 # after .start(). 
Hence starting a thread and asynchronously waiting for the container exit code1110 def wait_for_result(*_):1111 _exit_code = -11112 try:1113 thread_started.set()1114 start_waiting.wait()1115 _exit_code = container.wait()["StatusCode"]1116 except APIError as e:1117 _exit_code = 11118 raise ContainerException(str(e))1119 finally:1120 result_queue.put(_exit_code)1121 # start listener thread1122 start_worker_thread(wait_for_result)1123 thread_started.wait()1124 # start container1125 container.start()1126 # start awaiting container result1127 start_waiting.set()1128 # handle container input/output1129 with sock:1130 try:1131 if stdin:1132 sock.sendall(to_bytes(stdin))1133 sock.shutdown(socket.SHUT_WR)1134 stdout, stderr = self._read_from_sock(sock, False)1135 except socket.timeout:1136 LOG.debug(1137 f"Socket timeout when talking to the I/O streams of Docker container '{container_name_or_id}'"1138 )1139 # get container exit code1140 exit_code = result_queue.get()1141 if exit_code:1142 raise ContainerException(1143 "Docker container returned with exit code %s" % exit_code,1144 stdout=stdout,1145 stderr=stderr,1146 )1147 else:1148 container.start()1149 return stdout, stderr1150 except NotFound:1151 raise NoSuchContainer(container_name_or_id)1152 except APIError:1153 raise ContainerException()1154 def create_container(1155 self,1156 image_name: str,1157 *,1158 name: Optional[str] = None,1159 entrypoint: Optional[str] = None,1160 remove: bool = False,1161 interactive: bool = False,1162 tty: bool = False,1163 detach: bool = False,1164 command: Optional[Union[List[str], str]] = None,1165 mount_volumes: Optional[List[Tuple[str, str]]] = None,1166 ports: Optional[PortMappings] = None,1167 env_vars: Optional[Dict[str, str]] = None,1168 user: Optional[str] = None,1169 cap_add: Optional[str] = None,1170 network: Optional[str] = None,1171 dns: Optional[str] = None,1172 additional_flags: Optional[str] = None,1173 workdir: Optional[str] = None,1174 ) -> str:1175 LOG.debug(1176 
"Creating container with image %s, command '%s', volumes %s, env vars %s",1177 image_name,1178 command,1179 mount_volumes,1180 env_vars,1181 )1182 extra_hosts = None1183 if additional_flags:1184 env_vars, ports, mount_volumes, extra_hosts, network = Util.parse_additional_flags(1185 additional_flags, env_vars, ports, mount_volumes, network1186 )1187 try:1188 kwargs = {}1189 if cap_add:1190 kwargs["cap_add"] = [cap_add]1191 if dns:1192 kwargs["dns"] = [dns]1193 if ports:1194 kwargs["ports"] = ports.to_dict()1195 if workdir:1196 kwargs["working_dir"] = workdir1197 mounts = None1198 if mount_volumes:1199 mounts = Util.convert_mount_list_to_dict(mount_volumes)1200 def create_container():1201 return self.client().containers.create(1202 image=image_name,1203 command=command,1204 auto_remove=remove,1205 name=name,1206 stdin_open=interactive,1207 tty=tty,1208 entrypoint=entrypoint,1209 environment=env_vars,1210 detach=detach,1211 user=user,1212 network=network,1213 volumes=mounts,...
docker_sdk_client.py
Source:docker_sdk_client.py
...462 if workdir:463 kwargs["working_dir"] = workdir464 mounts = None465 if mount_volumes:466 mounts = Util.convert_mount_list_to_dict(mount_volumes)467 def create_container():468 return self.client().containers.create(469 image=image_name,470 command=command,471 auto_remove=remove,472 name=name,473 stdin_open=interactive,474 tty=tty,475 entrypoint=entrypoint,476 environment=env_vars,477 detach=detach,478 user=user,479 network=network,480 volumes=mounts,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!