Best Python code snippet using localstack_python
docker_utils.py
Source:docker_utils.py
...470 try:471 output = safe_run(cmd)472 image_names = output.splitlines()473 if strip_latest:474 Util.append_without_latest(image_names)475 return image_names476 except Exception as e:477 LOG.info('Unable to list Docker images via "%s": %s' % (cmd, e))478 return []479 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:480 cmd = self._docker_cmd()481 cmd += ["logs", container_name_or_id]482 try:483 return safe_run(cmd)484 except subprocess.CalledProcessError as e:485 if safe:486 return ""487 if "No such container" in to_str(e.stdout):488 raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)489 else:490 raise ContainerException(491 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr492 )493 def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[Dict, str]]:494 cmd = self._docker_cmd()495 cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]496 try:497 cmd_result = safe_run(cmd)498 except subprocess.CalledProcessError as e:499 if "No such object" in to_str(e.stdout):500 raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)501 else:502 raise ContainerException(503 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr504 )505 image_data = json.loads(cmd_result.strip())506 return image_data507 def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:508 try:509 return self._inspect_object(container_name_or_id)510 except NoSuchObject as e:511 raise NoSuchContainer(container_name_or_id=e.object_id)512 def inspect_image(self, image_name: str) -> Dict[str, Union[Dict, str]]:513 try:514 return self._inspect_object(image_name)515 except NoSuchObject as e:516 raise NoSuchImage(image_name=e.object_id)517 def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:518 try:519 return self._inspect_object(network_name)520 except NoSuchObject as e:521 raise 
NoSuchNetwork(network_name=e.object_id)522 def get_container_ip(self, container_name_or_id: str) -> str:523 cmd = self._docker_cmd()524 cmd += [525 "inspect",526 "--format",527 "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",528 container_name_or_id,529 ]530 try:531 return safe_run(cmd).strip()532 except subprocess.CalledProcessError as e:533 if "No such object" in to_str(e.stdout):534 raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)535 else:536 raise ContainerException(537 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr538 )539 def has_docker(self) -> bool:540 try:541 safe_run(self._docker_cmd() + ["ps"])542 return True543 except subprocess.CalledProcessError:544 return False545 def create_container(self, image_name: str, **kwargs) -> str:546 cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)547 LOG.debug("Create container with cmd: %s", cmd)548 try:549 container_id = safe_run(cmd)550 # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"551 container_id = container_id.strip().split("\n")[-1]552 return container_id.strip()553 except subprocess.CalledProcessError as e:554 if "Unable to find image" in to_str(e.stdout):555 raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)556 raise ContainerException(557 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr558 )559 finally:560 Util.rm_env_vars_file(env_file)561 def run_container(self, image_name: str, stdin=None, **kwargs) -> Tuple[bytes, bytes]:562 cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)563 LOG.debug("Run container with cmd: %s", cmd)564 result = self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)565 Util.rm_env_vars_file(env_file)566 return result567 def exec_in_container(568 self,569 container_name_or_id: str,570 command: Union[List[str], str],571 interactive=False,572 detach=False,573 
env_vars: Optional[Dict[str, str]] = None,574 stdin: Optional[bytes] = None,575 user: Optional[str] = None,576 ) -> Tuple[bytes, bytes]:577 env_file = None578 cmd = self._docker_cmd()579 cmd.append("exec")580 if interactive:581 cmd.append("--interactive")582 if detach:583 cmd.append("--detach")584 if user:585 cmd += ["--user", user]586 if env_vars:587 env_flag, env_file = Util.create_env_vars_file_flag(env_vars)588 cmd += env_flag589 cmd.append(container_name_or_id)590 cmd += command if isinstance(command, List) else [command]591 LOG.debug("Execute in container cmd: %s", cmd)592 result = self._run_async_cmd(cmd, stdin, container_name_or_id)593 Util.rm_env_vars_file(env_file)594 return result595 def start_container(596 self,597 container_name_or_id: str,598 stdin=None,599 interactive: bool = False,600 attach: bool = False,601 flags: Optional[str] = None,602 ) -> Tuple[bytes, bytes]:603 cmd = self._docker_cmd() + ["start"]604 if flags:605 cmd.append(flags)606 if interactive:607 cmd.append("--interactive")608 if attach:609 cmd.append("--attach")610 cmd.append(container_name_or_id)611 LOG.debug("Start container with cmd: %s", cmd)612 return self._run_async_cmd(cmd, stdin, container_name_or_id)613 def _run_async_cmd(614 self, cmd: List[str], stdin: bytes, container_name: str, image_name=None615 ) -> Tuple[bytes, bytes]:616 kwargs = {617 "inherit_env": True,618 "asynchronous": True,619 "stderr": subprocess.PIPE,620 "outfile": subprocess.PIPE,621 }622 if stdin:623 kwargs["stdin"] = True624 try:625 process = safe_run(cmd, **kwargs)626 stdout, stderr = process.communicate(input=stdin)627 if process.returncode != 0:628 raise subprocess.CalledProcessError(629 process.returncode,630 cmd,631 stdout,632 stderr,633 )634 else:635 return stdout, stderr636 except subprocess.CalledProcessError as e:637 stderr_str = to_str(e.stderr)638 if "Unable to find image" in stderr_str:639 raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)640 if "No such container" in 
stderr_str:641 raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)642 raise ContainerException(643 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr644 )645 def _build_run_create_cmd(646 self,647 action: str,648 image_name: str,649 *,650 name: Optional[str] = None,651 entrypoint: Optional[str] = None,652 remove: bool = False,653 interactive: bool = False,654 tty: bool = False,655 detach: bool = False,656 command: Optional[Union[List[str], str]] = None,657 mount_volumes: Optional[List[Tuple[str, str]]] = None,658 ports: Optional[PortMappings] = None,659 env_vars: Optional[Dict[str, str]] = None,660 user: Optional[str] = None,661 cap_add: Optional[str] = None,662 network: Optional[str] = None,663 dns: Optional[str] = None,664 additional_flags: Optional[str] = None,665 workdir: Optional[str] = None,666 ) -> Tuple[List[str], str]:667 env_file = None668 cmd = self._docker_cmd() + [action]669 if remove:670 cmd.append("--rm")671 if name:672 cmd += ["--name", name]673 if entrypoint is not None: # empty string entrypoint can be intentional674 cmd += ["--entrypoint", entrypoint]675 if mount_volumes:676 cmd += [677 volume678 for host_path, docker_path in mount_volumes679 for volume in ["-v", f"{host_path}:{docker_path}"]680 ]681 if interactive:682 cmd.append("--interactive")683 if tty:684 cmd.append("--tty")685 if detach:686 cmd.append("--detach")687 if ports:688 cmd += ports.to_list()689 if env_vars:690 env_flags, env_file = Util.create_env_vars_file_flag(env_vars)691 cmd += env_flags692 if user:693 cmd += ["--user", user]694 if cap_add:695 cmd += ["--cap-add", cap_add]696 if network:697 cmd += ["--network", network]698 if dns:699 cmd += ["--dns", dns]700 if workdir:701 cmd += ["--workdir", workdir]702 if additional_flags:703 cmd += shlex.split(additional_flags)704 cmd.append(image_name)705 if command:706 cmd += command if isinstance(command, List) else [command]707 return cmd, env_file708class Util:709 MAX_ENV_ARGS_LENGTH = 
20000710 @classmethod711 def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:712 if not env_vars:713 return [], None714 result = []715 env_vars = dict(env_vars)716 env_file = None717 if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:718 # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...719 env_file = cls.mountable_tmp_file()720 env_content = ""721 for name, value in dict(env_vars).items():722 if len(value) > cls.MAX_ENV_ARGS_LENGTH:723 # each line in the env file has a max size as well (error "bufio.Scanner: token too long")724 continue725 env_vars.pop(name)726 value = value.replace("\n", "\\")727 env_content += "%s=%s\n" % (name, value)728 save_file(env_file, env_content)729 result += ["--env-file", env_file]730 env_vars_res = [item for k, v in env_vars.items() for item in ["-e", "{}={}".format(k, v)]]731 result += env_vars_res732 return result, env_file733 @staticmethod734 def rm_env_vars_file(env_vars_file) -> None:735 if env_vars_file:736 return rm_rf(env_vars_file)737 @staticmethod738 def mountable_tmp_file():739 f = os.path.join(config.TMP_FOLDER, short_uid())740 TMP_FILES.append(f)741 return f742 @staticmethod743 def append_without_latest(image_names):744 suffix = ":latest"745 for image in list(image_names):746 if image.endswith(suffix):747 image_names.append(image[: -len(suffix)])748 @staticmethod749 def tar_path(path, target_path, is_dir: bool):750 f = tempfile.NamedTemporaryFile()751 with tarfile.open(mode="w", fileobj=f) as t:752 abs_path = os.path.abspath(path)753 arcname = (754 os.path.basename(path)755 if is_dir756 else (os.path.basename(target_path) or os.path.basename(path))757 )758 t.add(abs_path, arcname=arcname)759 f.seek(0)760 return f761 @staticmethod762 def untar_to_path(tardata, target_path):763 target_path = Path(target_path)764 with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:765 if target_path.is_dir():766 
t.extractall(path=target_path)767 else:768 member = t.next()769 if member:770 member.name = target_path.name771 t.extract(member, target_path.parent)772 else:773 LOG.debug("File to copy empty, ignoring...")774 @staticmethod775 def parse_additional_flags(776 additional_flags: str,777 env_vars: Dict[str, str] = None,778 ports: PortMappings = None,779 mounts: List[Tuple[str, str]] = None,780 network: Optional[str] = None,781 ) -> Tuple[782 Dict[str, str], PortMappings, List[Tuple[str, str]], Optional[Dict[str, str]], Optional[str]783 ]:784 """Parses environment, volume and port flags passed as string785 :param additional_flags: String which contains the flag definitions786 :param env_vars: Dict with env vars. Will be modified in place.787 :param ports: PortMapping object. Will be modified in place.788 :param mounts: List of mount tuples (host_path, container_path). Will be modified in place.789 :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.790 :return: A tuple containing the env_vars, ports, mount, extra_hosts and network objects. 
Will return new objects791 if respective parameters were None and additional flags contained a flag for that object, the same which792 are passed otherwise.793 """794 cur_state = None795 extra_hosts = None796 # TODO Use argparse to simplify this logic797 for flag in shlex.split(additional_flags):798 if not cur_state:799 if flag in ["-v", "--volume"]:800 cur_state = "volume"801 elif flag in ["-p", "--publish"]:802 cur_state = "port"803 elif flag in ["-e", "--env"]:804 cur_state = "env"805 elif flag == "--add-host":806 cur_state = "add-host"807 elif flag == "--network":808 cur_state = "set-network"809 else:810 raise NotImplementedError(811 f"Flag {flag} is currently not supported by this Docker client."812 )813 else:814 if cur_state == "volume":815 mounts = mounts if mounts is not None else []816 match = re.match(817 r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",818 flag,819 )820 if not match:821 LOG.warning("Unable to parse volume mount Docker flags: %s", flag)822 continue823 host_path = match.group("host")824 container_path = match.group("container")825 rw_args = match.group("arg")826 if rw_args:827 LOG.info("Volume options like :ro or :rw are currently ignored.")828 mounts.append((host_path, container_path))829 elif cur_state == "port":830 port_split = flag.split(":")831 protocol = "tcp"832 if len(port_split) == 2:833 host_port, container_port = port_split834 elif len(port_split) == 3:835 LOG.warning(836 "Host part of port mappings are ignored currently in additional flags"837 )838 _, host_port, container_port = port_split839 else:840 raise ValueError("Invalid port string provided: %s", flag)841 if "/" in container_port:842 container_port, protocol = container_port.split("/")843 ports = ports if ports is not None else PortMappings()844 ports.add(int(host_port), int(container_port), protocol)845 elif cur_state == "env":846 lhs, _, rhs = flag.partition("=")847 env_vars = env_vars if env_vars is not None else {}848 env_vars[lhs] = 
rhs849 elif cur_state == "add-host":850 extra_hosts = extra_hosts if extra_hosts is not None else {}851 hosts_split = flag.split(":")852 extra_hosts[hosts_split[0]] = hosts_split[1]853 elif cur_state == "set-network":854 if network:855 LOG.warning(856 "Overwriting Docker container network '%s' with new value '%s'",857 network,858 flag,859 )860 network = flag861 cur_state = None862 return env_vars, ports, mounts, extra_hosts, network863 @staticmethod864 def convert_mount_list_to_dict(865 mount_volumes: List[Tuple[str, str]]866 ) -> Dict[str, Dict[str, str]]:867 """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""868 return dict(869 map(870 lambda paths: (str(paths[0]), {"bind": paths[1], "mode": "rw"}),871 mount_volumes,872 )873 )874class SdkDockerClient(ContainerClient):875 """Class for managing docker using the python docker sdk"""876 docker_client: Optional[DockerClient]877 def __init__(self):878 try:879 self.docker_client = docker.from_env()880 logging.getLogger("urllib3").setLevel(logging.INFO)881 except DockerException:882 self.docker_client = None883 def client(self):884 if self.docker_client:885 return self.docker_client886 else:887 raise ContainerException("Docker not available")888 def _read_from_sock(self, sock: socket, tty: bool):889 """Reads multiplexed messages from a socket returned by attach_socket.890 Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach891 """892 stdout = b""893 stderr = b""894 for frame_type, frame_data in frames_iter(sock, tty):895 if frame_type == STDOUT:896 stdout += frame_data897 elif frame_type == STDERR:898 stderr += frame_data899 else:900 raise ContainerException("Invalid frame type when reading from socket")901 return stdout, stderr902 def _container_path_info(self, container: Container, container_path: str):903 """904 Get information about a path in the given container905 :param container: Container to be 
inspected906 :param container_path: Path in container907 :return: Tuple (path_exists, path_is_directory)908 """909 # Docker CLI copy uses go FileMode to determine if target is a dict or not910 # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260911 # The isDir Bit is the most significant bit in the 32bit struct:912 # https://golang.org/src/os/types.go?s=2650:2683913 stats = {}914 try:915 _, stats = container.get_archive(container_path)916 target_exists = True917 except APIError:918 target_exists = False919 target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)920 return target_exists, target_is_dir921 def get_container_status(self, container_name: str) -> DockerContainerStatus:922 # LOG.debug("Getting container status for container: %s", container_name) # too verbose923 try:924 container = self.client().containers.get(container_name)925 if container.status == "running":926 return DockerContainerStatus.UP927 else:928 return DockerContainerStatus.DOWN929 except NotFound:930 return DockerContainerStatus.NON_EXISTENT931 except APIError:932 raise ContainerException()933 def get_network(self, container_name: str) -> str:934 LOG.debug("Getting network type for container: %s", container_name)935 try:936 container = self.client().containers.get(container_name)937 return container.attrs["HostConfig"]["NetworkMode"]938 except NotFound:939 raise NoSuchContainer(container_name)940 except APIError:941 raise ContainerException()942 def stop_container(self, container_name: str) -> None:943 LOG.debug("Stopping container: %s", container_name)944 try:945 container = self.client().containers.get(container_name)946 container.stop(timeout=0)947 except NotFound:948 raise NoSuchContainer(container_name)949 except APIError:950 raise ContainerException()951 def remove_container(self, container_name: str, force=True, check_existence=False) -> None:952 LOG.debug("Removing container: %s", container_name)953 if check_existence 
and container_name not in self.get_running_container_names():954 LOG.debug("Aborting removing due to check_existence check")955 return956 try:957 container = self.client().containers.get(container_name)958 container.remove(force=force)959 except NotFound:960 if not force:961 raise NoSuchContainer(container_name)962 except APIError:963 raise ContainerException()964 def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:965 if filter:966 filter = [filter] if isinstance(filter, str) else filter967 filter = dict([f.split("=", 1) for f in filter])968 LOG.debug("Listing containers with filters: %s", filter)969 try:970 container_list = self.client().containers.list(filters=filter, all=all)971 return list(972 map(973 lambda container: {974 "id": container.id,975 "image": container.image,976 "name": container.name,977 "status": container.status,978 "labels": container.labels,979 },980 container_list,981 )982 )983 except APIError:984 raise ContainerException()985 def copy_into_container(986 self, container_name: str, local_path: str, container_path: str987 ) -> None: # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/988 LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)989 try:990 container = self.client().containers.get(container_name)991 target_exists, target_isdir = self._container_path_info(container, container_path)992 target_path = container_path if target_isdir else os.path.dirname(container_path)993 with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:994 container.put_archive(target_path, tar)995 except NotFound:996 raise NoSuchContainer(container_name)997 except APIError:998 raise ContainerException()999 def copy_from_container(1000 self,1001 container_name: str,1002 local_path: str,1003 container_path: str,1004 ) -> None:1005 LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)1006 try:1007 container = 
self.client().containers.get(container_name)1008 bits, _ = container.get_archive(container_path)1009 Util.untar_to_path(bits, local_path)1010 except NotFound:1011 raise NoSuchContainer(container_name)1012 except APIError:1013 raise ContainerException()1014 def pull_image(self, docker_image: str) -> None:1015 LOG.debug("Pulling image: %s", docker_image)1016 # some path in the docker image string indicates a custom repository1017 path_split = docker_image.rpartition("/")1018 image_split = path_split[2].partition(":")1019 repository = f"{path_split[0]}{path_split[1]}{image_split[0]}"1020 tag = image_split[2]1021 try:1022 LOG.debug("Repository: %s Tag: %s", repository, tag)1023 self.client().images.pull(repository, tag)1024 except ImageNotFound:1025 raise NoSuchImage(docker_image)1026 except APIError:1027 raise ContainerException()1028 def get_docker_image_names(self, strip_latest=True, include_tags=True):1029 try:1030 images = self.client().images.list()1031 image_names = [tag for image in images for tag in image.tags if image.tags]1032 if not include_tags:1033 image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))1034 if strip_latest:1035 Util.append_without_latest(image_names)1036 return image_names1037 except APIError:1038 raise ContainerException()1039 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:1040 try:1041 container = self.client().containers.get(container_name_or_id)1042 return to_str(container.logs())1043 except NotFound:1044 if safe:1045 return ""1046 raise NoSuchContainer(container_name_or_id)1047 except APIError:1048 if safe:1049 return ""...
docker.py
Source:docker.py
...447 try:448 output = safe_run(cmd)449 image_names = output.splitlines()450 if strip_latest:451 Util.append_without_latest(image_names)452 return image_names453 except Exception as e:454 LOG.info('Unable to list Docker images via "%s": %s' % (cmd, e))455 return []456 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:457 cmd = self._docker_cmd()458 cmd += ["logs", container_name_or_id]459 try:460 return safe_run(cmd)461 except subprocess.CalledProcessError as e:462 if safe:463 return ""464 if "No such container" in to_str(e.stdout):465 raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)466 else:467 raise ContainerException(468 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr469 )470 def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[Dict, str]]:471 cmd = self._docker_cmd()472 cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]473 try:474 cmd_result = safe_run(cmd)475 except subprocess.CalledProcessError as e:476 if "No such object" in to_str(e.stdout):477 raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)478 else:479 raise ContainerException(480 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr481 )482 image_data = json.loads(cmd_result.strip())483 return image_data484 def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:485 try:486 return self._inspect_object(container_name_or_id)487 except NoSuchObject as e:488 raise NoSuchContainer(container_name_or_id=e.object_id)489 def inspect_image(self, image_name: str) -> Dict[str, Union[Dict, str]]:490 try:491 return self._inspect_object(image_name)492 except NoSuchObject as e:493 raise NoSuchImage(image_name=e.object_id)494 def get_container_ip(self, container_name_or_id: str) -> str:495 cmd = self._docker_cmd()496 cmd += [497 "inspect",498 "--format",499 "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",500 
container_name_or_id,501 ]502 try:503 return safe_run(cmd).strip()504 except subprocess.CalledProcessError as e:505 if "No such object" in to_str(e.stdout):506 raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)507 else:508 raise ContainerException(509 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr510 )511 def has_docker(self) -> bool:512 try:513 safe_run(self._docker_cmd() + ["ps"])514 return True515 except subprocess.CalledProcessError:516 return False517 def create_container(self, image_name: str, **kwargs) -> str:518 cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)519 LOG.debug("Create container with cmd: %s", cmd)520 try:521 container_id = safe_run(cmd)522 # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"523 container_id = container_id.strip().split("\n")[-1]524 return container_id.strip()525 except subprocess.CalledProcessError as e:526 if "Unable to find image" in to_str(e.stdout):527 raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)528 raise ContainerException(529 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr530 )531 finally:532 Util.rm_env_vars_file(env_file)533 def run_container(self, image_name: str, stdin=None, **kwargs) -> Tuple[bytes, bytes]:534 cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)535 LOG.debug("Run container with cmd: %s", cmd)536 result = self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)537 Util.rm_env_vars_file(env_file)538 return result539 def exec_in_container(540 self,541 container_name_or_id: str,542 command: Union[List[str], str],543 interactive=False,544 detach=False,545 env_vars: Optional[Dict[str, str]] = None,546 stdin: Optional[bytes] = None,547 user: Optional[str] = None,548 ) -> Tuple[bytes, bytes]:549 env_file = None550 cmd = self._docker_cmd()551 cmd.append("exec")552 if interactive:553 
cmd.append("--interactive")554 if detach:555 cmd.append("--detach")556 if user:557 cmd += ["--user", user]558 if env_vars:559 env_flag, env_file = Util.create_env_vars_file_flag(env_vars)560 cmd += env_flag561 cmd.append(container_name_or_id)562 cmd += command if isinstance(command, List) else [command]563 LOG.debug("Execute in container cmd: %s", cmd)564 result = self._run_async_cmd(cmd, stdin, container_name_or_id)565 Util.rm_env_vars_file(env_file)566 return result567 def start_container(568 self,569 container_name_or_id: str,570 stdin=None,571 interactive: bool = False,572 attach: bool = False,573 flags: Optional[str] = None,574 ) -> Tuple[bytes, bytes]:575 cmd = self._docker_cmd() + ["start"]576 if flags:577 cmd.append(flags)578 if interactive:579 cmd.append("--interactive")580 if attach:581 cmd.append("--attach")582 cmd.append(container_name_or_id)583 LOG.debug("Start container with cmd: %s", cmd)584 return self._run_async_cmd(cmd, stdin, container_name_or_id)585 def _run_async_cmd(586 self, cmd: List[str], stdin: bytes, container_name: str, image_name=None587 ) -> Tuple[bytes, bytes]:588 kwargs = {589 "inherit_env": True,590 "asynchronous": True,591 "stderr": subprocess.PIPE,592 "outfile": subprocess.PIPE,593 }594 if stdin:595 kwargs["stdin"] = True596 try:597 process = safe_run(cmd, **kwargs)598 stdout, stderr = process.communicate(input=stdin)599 if process.returncode != 0:600 raise subprocess.CalledProcessError(601 process.returncode,602 cmd,603 stdout,604 stderr,605 )606 else:607 return stdout, stderr608 except subprocess.CalledProcessError as e:609 stderr_str = to_str(e.stderr)610 if "Unable to find image" in stderr_str:611 raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)612 if "No such container" in stderr_str:613 raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)614 raise ContainerException(615 "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr616 )617 def _build_run_create_cmd(618 
self,619 action: str,620 image_name: str,621 *,622 name: Optional[str] = None,623 entrypoint: Optional[str] = None,624 remove: bool = False,625 interactive: bool = False,626 tty: bool = False,627 detach: bool = False,628 command: Optional[Union[List[str], str]] = None,629 mount_volumes: Optional[List[Tuple[str, str]]] = None,630 ports: Optional[PortMappings] = None,631 env_vars: Optional[Dict[str, str]] = None,632 user: Optional[str] = None,633 cap_add: Optional[str] = None,634 network: Optional[str] = None,635 dns: Optional[str] = None,636 additional_flags: Optional[str] = None,637 ) -> Tuple[List[str], str]:638 env_file = None639 cmd = self._docker_cmd() + [action]640 if remove:641 cmd.append("--rm")642 if name:643 cmd += ["--name", name]644 if entrypoint is not None: # empty string entrypoint can be intentional645 cmd += ["--entrypoint", entrypoint]646 if mount_volumes:647 cmd += [648 volume649 for host_path, docker_path in mount_volumes650 for volume in ["-v", f"{host_path}:{docker_path}"]651 ]652 if interactive:653 cmd.append("--interactive")654 if tty:655 cmd.append("--tty")656 if detach:657 cmd.append("--detach")658 if ports:659 cmd += ports.to_list()660 if env_vars:661 env_flags, env_file = Util.create_env_vars_file_flag(env_vars)662 cmd += env_flags663 if user:664 cmd += ["--user", user]665 if cap_add:666 cmd += ["--cap-add", cap_add]667 if network:668 cmd += ["--network", network]669 if dns:670 cmd += ["--dns", dns]671 if additional_flags:672 cmd += shlex.split(additional_flags)673 cmd.append(image_name)674 if command:675 cmd += command if isinstance(command, List) else [command]676 return cmd, env_file677class Util:678 MAX_ENV_ARGS_LENGTH = 20000679 @classmethod680 def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:681 if not env_vars:682 return [], None683 result = []684 env_vars = dict(env_vars)685 env_file = None686 if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:687 # default ARG_MAX=131072 in Docker - let's create 
an env var file if the string becomes too long...688 env_file = cls.mountable_tmp_file()689 env_content = ""690 for name, value in dict(env_vars).items():691 if len(value) > cls.MAX_ENV_ARGS_LENGTH:692 # each line in the env file has a max size as well (error "bufio.Scanner: token too long")693 continue694 env_vars.pop(name)695 value = value.replace("\n", "\\")696 env_content += "%s=%s\n" % (name, value)697 save_file(env_file, env_content)698 result += ["--env-file", env_file]699 env_vars_res = [item for k, v in env_vars.items() for item in ["-e", "{}={}".format(k, v)]]700 result += env_vars_res701 return result, env_file702 @staticmethod703 def rm_env_vars_file(env_vars_file) -> None:704 if env_vars_file:705 return rm_rf(env_vars_file)706 @staticmethod707 def mountable_tmp_file():708 f = os.path.join(config.TMP_FOLDER, short_uid())709 TMP_FILES.append(f)710 return f711 @staticmethod712 def append_without_latest(image_names):713 suffix = ":latest"714 for image in list(image_names):715 if image.endswith(suffix):716 image_names.append(image[: -len(suffix)])717 @staticmethod718 def tar_path(path, target_path, is_dir: bool):719 f = tempfile.NamedTemporaryFile()720 with tarfile.open(mode="w", fileobj=f) as t:721 abs_path = os.path.abspath(path)722 arcname = (723 os.path.basename(path)724 if is_dir725 else (os.path.basename(target_path) or os.path.basename(path))726 )727 t.add(abs_path, arcname=arcname)728 f.seek(0)729 return f730 @staticmethod731 def untar_to_path(tardata, target_path):732 target_path = Path(target_path)733 with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:734 if target_path.is_dir():735 t.extractall(path=target_path)736 else:737 member = t.next()738 if member:739 member.name = target_path.name740 t.extract(member, target_path.parent)741 else:742 LOG.debug("File to copy empty, ignoring...")743 @staticmethod744 def parse_additional_flags(745 additional_flags: str,746 env_vars: Dict[str, str] = None,747 ports: PortMappings 
= None,748 mounts: List[Tuple[str, str]] = None,749 network: Optional[str] = None,750 ) -> Tuple[751 Dict[str, str], PortMappings, List[Tuple[str, str]], Optional[Dict[str, str]], Optional[str]752 ]:753 """Parses environment, volume and port flags passed as string754 :param additional_flags: String which contains the flag definitions755 :param env_vars: Dict with env vars. Will be modified in place.756 :param ports: PortMapping object. Will be modified in place.757 :param mounts: List of mount tuples (host_path, container_path). Will be modified in place.758 :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.759 :return: A tuple containing the env_vars, ports, mount, extra_hosts and network objects. Will return new objects760 if respective parameters were None and additional flags contained a flag for that object, the same which761 are passed otherwise.762 """763 cur_state = None764 extra_hosts = None765 # TODO Use argparse to simplify this logic766 for flag in shlex.split(additional_flags):767 if not cur_state:768 if flag in ["-v", "--volume"]:769 cur_state = "volume"770 elif flag in ["-p", "--publish"]:771 cur_state = "port"772 elif flag in ["-e", "--env"]:773 cur_state = "env"774 elif flag == "--add-host":775 cur_state = "add-host"776 elif flag == "--network":777 cur_state = "set-network"778 else:779 raise NotImplementedError(780 f"Flag {flag} is currently not supported by this Docker client."781 )782 else:783 if cur_state == "volume":784 mounts = mounts if mounts is not None else []785 match = re.match(786 r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",787 flag,788 )789 if not match:790 LOG.warning("Unable to parse volume mount Docker flags: %s", flag)791 continue792 host_path = match.group("host")793 container_path = match.group("container")794 rw_args = match.group("arg")795 if rw_args:796 LOG.info("Volume options like :ro or :rw are currently ignored.")797 
mounts.append((host_path, container_path))798 elif cur_state == "port":799 port_split = flag.split(":")800 protocol = "tcp"801 if len(port_split) == 2:802 host_port, container_port = port_split803 elif len(port_split) == 3:804 LOG.warning(805 "Host part of port mappings are ignored currently in additional flags"806 )807 _, host_port, container_port = port_split808 else:809 raise ValueError("Invalid port string provided: %s", flag)810 if "/" in container_port:811 container_port, protocol = container_port.split("/")812 ports = ports if ports is not None else PortMappings()813 ports.add(int(host_port), int(container_port), protocol)814 elif cur_state == "env":815 lhs, _, rhs = flag.partition("=")816 env_vars = env_vars if env_vars is not None else {}817 env_vars[lhs] = rhs818 elif cur_state == "add-host":819 extra_hosts = extra_hosts if extra_hosts is not None else {}820 hosts_split = flag.split(":")821 extra_hosts[hosts_split[0]] = hosts_split[1]822 elif cur_state == "set-network":823 if network:824 LOG.warning(825 "Overwriting Docker container network '%s' with new value '%s'",826 network,827 flag,828 )829 network = flag830 cur_state = None831 return env_vars, ports, mounts, extra_hosts, network832 @staticmethod833 def convert_mount_list_to_dict(834 mount_volumes: List[Tuple[str, str]]835 ) -> Dict[str, Dict[str, str]]:836 """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""837 return dict(838 map(839 lambda paths: (str(paths[0]), {"bind": paths[1], "mode": "rw"}),840 mount_volumes,841 )842 )843class SdkDockerClient(ContainerClient):844 """Class for managing docker using the python docker sdk"""845 docker_client: Optional[DockerClient]846 def __init__(self):847 try:848 self.docker_client = docker.from_env()849 logging.getLogger("urllib3").setLevel(logging.INFO)850 except DockerException:851 self.docker_client = None852 def client(self):853 if self.docker_client:854 return self.docker_client855 else:856 
raise ContainerException("Docker not available")857 def _read_from_sock(self, sock: socket, tty: bool):858 """Reads multiplexed messages from a socket returned by attach_socket.859 Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach860 """861 stdout = b""862 stderr = b""863 for frame_type, frame_data in frames_iter(sock, tty):864 if frame_type == STDOUT:865 stdout += frame_data866 elif frame_type == STDERR:867 stderr += frame_data868 else:869 raise ContainerException("Invalid frame type when reading from socket")870 return stdout, stderr871 def _container_path_info(self, container: Container, container_path: str):872 """873 Get information about a path in the given container874 :param container: Container to be inspected875 :param container_path: Path in container876 :return: Tuple (path_exists, path_is_directory)877 """878 # Docker CLI copy uses go FileMode to determine if target is a dict or not879 # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260880 # The isDir Bit is the most significant bit in the 32bit struct:881 # https://golang.org/src/os/types.go?s=2650:2683882 stats = {}883 try:884 _, stats = container.get_archive(container_path)885 target_exists = True886 except APIError:887 target_exists = False888 target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)889 return target_exists, target_is_dir890 def get_container_status(self, container_name: str) -> DockerContainerStatus:891 # LOG.debug("Getting container status for container: %s", container_name) # too verbose892 try:893 container = self.client().containers.get(container_name)894 if container.status == "running":895 return DockerContainerStatus.UP896 else:897 return DockerContainerStatus.DOWN898 except NotFound:899 return DockerContainerStatus.NON_EXISTENT900 except APIError:901 raise ContainerException()902 def get_network(self, container_name: str) -> str:903 LOG.debug("Getting 
network type for container: %s", container_name)904 try:905 container = self.client().containers.get(container_name)906 return container.attrs["HostConfig"]["NetworkMode"]907 except NotFound:908 raise NoSuchContainer(container_name)909 except APIError:910 raise ContainerException()911 def stop_container(self, container_name: str) -> None:912 LOG.debug("Stopping container: %s", container_name)913 try:914 container = self.client().containers.get(container_name)915 container.stop(timeout=0)916 except NotFound:917 raise NoSuchContainer(container_name)918 except APIError:919 raise ContainerException()920 def remove_container(self, container_name: str, force=True, check_existence=False) -> None:921 LOG.debug("Removing container: %s", container_name)922 if check_existence and container_name not in self.get_running_container_names():923 LOG.debug("Aborting removing due to check_existence check")924 return925 try:926 container = self.client().containers.get(container_name)927 container.remove(force=force)928 except NotFound:929 if not force:930 raise NoSuchContainer(container_name)931 except APIError:932 raise ContainerException()933 def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:934 if filter:935 filter = [filter] if isinstance(filter, str) else filter936 filter = dict([f.split("=", 1) for f in filter])937 LOG.debug("Listing containers with filters: %s", filter)938 try:939 container_list = self.client().containers.list(filters=filter, all=all)940 return list(941 map(942 lambda container: {943 "id": container.id,944 "image": container.image,945 "name": container.name,946 "status": container.status,947 "labels": container.labels,948 },949 container_list,950 )951 )952 except APIError:953 raise ContainerException()954 def copy_into_container(955 self, container_name: str, local_path: str, container_path: str956 ) -> None: # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/957 LOG.debug("Copying file %s into 
%s:%s", local_path, container_name, container_path)958 try:959 container = self.client().containers.get(container_name)960 target_exists, target_isdir = self._container_path_info(container, container_path)961 target_path = container_path if target_isdir else os.path.dirname(container_path)962 with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:963 container.put_archive(target_path, tar)964 except NotFound:965 raise NoSuchContainer(container_name)966 except APIError:967 raise ContainerException()968 def copy_from_container(969 self,970 container_name: str,971 local_path: str,972 container_path: str,973 ) -> None:974 LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)975 try:976 container = self.client().containers.get(container_name)977 bits, _ = container.get_archive(container_path)978 Util.untar_to_path(bits, local_path)979 except NotFound:980 raise NoSuchContainer(container_name)981 except APIError:982 raise ContainerException()983 def pull_image(self, docker_image: str) -> None:984 LOG.debug("Pulling image: %s", docker_image)985 # some path in the docker image string indicates a custom repository986 path_split = docker_image.rpartition("/")987 image_split = path_split[2].partition(":")988 repository = f"{path_split[0]}{path_split[1]}{image_split[0]}"989 tag = image_split[2]990 try:991 LOG.debug("Repository: %s Tag: %s", repository, tag)992 self.client().images.pull(repository, tag)993 except ImageNotFound:994 raise NoSuchImage(docker_image)995 except APIError:996 raise ContainerException()997 def get_docker_image_names(self, strip_latest=True, include_tags=True):998 try:999 images = self.client().images.list()1000 image_names = [tag for image in images for tag in image.tags if image.tags]1001 if not include_tags:1002 image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))1003 if strip_latest:1004 Util.append_without_latest(image_names)1005 return image_names1006 except 
APIError:1007 raise ContainerException()1008 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:1009 try:1010 container = self.client().containers.get(container_name_or_id)1011 return to_str(container.logs())1012 except NotFound:1013 if safe:1014 return ""1015 raise NoSuchContainer(container_name_or_id)1016 except APIError:1017 if safe:1018 return ""...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!