Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py
...567 event = {}568 event_body = json.dumps(json_safe(event))569 event_bytes_for_stdin = self.prepare_event(environment, event_body)570 inv_context.event = event_bytes_for_stdin571 Util.inject_endpoints_into_env(environment)572 environment["EDGE_PORT"] = str(config.EDGE_PORT)573 environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler574 if os.environ.get("HTTP_PROXY"):575 environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]576 if lambda_function.timeout:577 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)578 if context:579 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name580 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version581 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn582 environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity or {})583 if context.client_context is not None:584 environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(585 to_str(base64.b64decode(to_bytes(context.client_context)))586 )587 # pass JVM options to the Lambda environment, if configured588 if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):589 if environment.get("JAVA_TOOL_OPTIONS"):590 LOG.info(591 "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"592 )593 else:594 LOG.debug(595 "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s"596 % config.LAMBDA_JAVA_OPTS597 )598 environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS599 # accept any self-signed certificates for outgoing calls from the Lambda600 if is_nodejs_runtime(runtime):601 environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"602 # run Lambda executor and fetch invocation result603 LOG.info("Running lambda: %s" % lambda_function.arn())604 result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)605 return result606 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:607 if config.LAMBDA_REMOTE_DOCKER:608 LOG.info("TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)609 return local_file610 mountable_file = Util.get_host_path_for_path_in_docker(local_file)611 _, extension = os.path.splitext(local_file)612 target_file_name = f"{md5(local_file)}{extension}"613 target_path = f"/tmp/{target_file_name}"614 inv_context.docker_flags = inv_context.docker_flags or ""615 inv_context.docker_flags += f"-v {mountable_file}:{target_path}"616 return target_path617class LambdaExecutorReuseContainers(LambdaExecutorContainers):618 """Executor class for executing Lambda functions in re-usable Docker containers"""619 def __init__(self):620 super(LambdaExecutorReuseContainers, self).__init__()621 # locking thread for creation/destruction of docker containers.622 self.docker_container_lock = threading.RLock()623 # On each invocation we try to construct a port unlikely to conflict624 # with a previously invoked lambda function. This is a problem with at625 # least the lambci/lambda:go1.x container, which execs a go program that626 # attempts to bind to the same default port.627 self.next_port = 0628 self.max_port = LAMBDA_SERVER_UNIQUE_PORTS629 self.port_offset = LAMBDA_SERVER_PORT_OFFSET630 def execute_in_container(631 self,632 lambda_function: LambdaFunction,633 inv_context: InvocationContext,634 stdin=None,635 background=False,636 ) -> Tuple[bytes, bytes]:637 func_arn = lambda_function.arn()638 lambda_cwd = lambda_function.cwd639 runtime = lambda_function.runtime640 env_vars = inv_context.environment641 # check whether the Lambda has been invoked before642 has_been_invoked_before = func_arn in self.function_invoke_times643 # Choose a port for this invocation644 with self.docker_container_lock:645 env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)646 self.next_port = (self.next_port + 1) % self.max_port647 # create/verify the docker container is running.648 LOG.debug(649 'Priming docker container with runtime "%s" and arn "%s".',650 runtime,651 func_arn,652 )653 container_info = self.prime_docker_container(654 lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags655 )656 if not inv_context.lambda_command and inv_context.handler:657 command = container_info.entry_point.split()658 command.append(inv_context.handler)659 inv_context.lambda_command = command660 # determine files to be copied into the container661 if not has_been_invoked_before and config.LAMBDA_REMOTE_DOCKER:662 # if this is the first invocation: copy the entire folder into the container663 DOCKER_CLIENT.copy_into_container(664 container_info.name, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER665 )666 return DOCKER_CLIENT.exec_in_container(667 container_name_or_id=container_info.name,668 command=inv_context.lambda_command,669 interactive=True,670 env_vars=env_vars,671 stdin=stdin,672 )673 def _execute(self, func_arn, *args, **kwargs):674 if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):675 concurrency_lock = threading.RLock()676 LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock677 with LAMBDA_CONCURRENCY_LOCK[func_arn]:678 return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)679 def startup(self):680 self.cleanup()681 # start a process to remove idle containers682 if config.LAMBDA_REMOVE_CONTAINERS:683 self.start_idle_container_destroyer_interval()684 def cleanup(self, arn=None):685 if arn:686 self.function_invoke_times.pop(arn, None)687 return self.destroy_docker_container(arn)688 self.function_invoke_times = {}689 return self.destroy_existing_docker_containers()690 def prime_docker_container(691 self,692 lambda_function: LambdaFunction,693 env_vars: Dict,694 lambda_cwd: str,695 docker_flags: str = None,696 ):697 """698 Prepares a persistent docker container for a specific function.699 :param lambda_function: The Details of the lambda function.700 :param env_vars: The environment variables for the lambda.701 :param lambda_cwd: The local directory containing the code for the lambda function.702 :return: ContainerInfo class containing the container name and default entry point.703 """704 with self.docker_container_lock:705 # Get the container name and id.706 func_arn = lambda_function.arn()707 container_name = self.get_container_name(func_arn)708 status = self.get_docker_container_status(func_arn)709 LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710 docker_image = Util.docker_image_for_lambda(lambda_function)711 # Container is not running or doesn't exist.712 if status < 1:713 # Make sure the container does not exist in any form/state.714 self.destroy_docker_container(func_arn)715 # get container startup command and run it716 LOG.debug("Creating container: %s" % container_name)717 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718 if config.LAMBDA_REMOTE_DOCKER:719 LOG.debug(720 'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721 )722 DOCKER_CLIENT.copy_into_container(723 container_name, "%s/." % lambda_cwd, DOCKER_TASK_FOLDER724 )725 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726 DOCKER_CLIENT.start_container(container_name)727 # give the container some time to start up728 time.sleep(1)729 container_network = self.get_docker_container_network(func_arn)730 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731 LOG.debug(732 'Using entrypoint "%s" for container "%s" on network "%s".'733 % (entry_point, container_name, container_network)734 )735 return ContainerInfo(container_name, entry_point)736 def create_container(737 self,738 lambda_function: LambdaFunction,739 env_vars: Dict,740 lambda_cwd: str,741 docker_flags: str = None,742 ):743 docker_image = Util.docker_image_for_lambda(lambda_function)744 container_name = self.get_container_name(lambda_function.arn())745 # make sure we set LOCALSTACK_HOSTNAME746 Util.inject_endpoints_into_env(env_vars)747 # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." above)748 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749 network = config.LAMBDA_DOCKER_NETWORK750 additional_flags = docker_flags751 dns = config.LAMBDA_DOCKER_DNS752 mount_volumes = not config.LAMBDA_REMOTE_DOCKER753 lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754 if ":" in lambda_cwd and "\\" in lambda_cwd:755 lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756 mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757 if os.environ.get("HOSTNAME"):758 env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759 env_vars["EDGE_PORT"] = config.EDGE_PORT760 LOG.debug(761 "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762 )763 return DOCKER_CLIENT.create_container(764 image_name=docker_image,765 remove=True,766 interactive=True,767 name=container_name,768 entrypoint="/bin/bash",769 network=network,770 env_vars=env_vars,771 dns=dns,772 mount_volumes=mount_volumes,773 additional_flags=additional_flags,774 )775 def destroy_docker_container(self, func_arn):776 """777 Stops and/or removes a docker container for a specific lambda function ARN.778 :param func_arn: The ARN of the lambda function.779 :return: None780 """781 with self.docker_container_lock:782 status = self.get_docker_container_status(func_arn)783 # Get the container name and id.784 container_name = self.get_container_name(func_arn)785 if status == 1:786 LOG.debug("Stopping container: %s" % container_name)787 DOCKER_CLIENT.stop_container(container_name)788 status = self.get_docker_container_status(func_arn)789 if status == -1:790 LOG.debug("Removing container: %s" % container_name)791 rm_docker_container(container_name, safe=True)792 # clean up function invoke times, as some init logic depends on this793 self.function_invoke_times.pop(func_arn, None)794 def get_all_container_names(self):795 """796 Returns a list of container names for lambda containers.797 :return: A String[] localstack docker container names for each function.798 """799 with self.docker_container_lock:800 LOG.debug("Getting all lambda containers names.")801 list_result = DOCKER_CLIENT.list_containers(802 filter=f"name={self.get_container_prefix()}*"803 )804 container_names = list(map(lambda container: container["name"], list_result))805 return container_names806 def destroy_existing_docker_containers(self):807 """808 Stops and/or removes all lambda docker containers for localstack.809 :return: None810 """811 with self.docker_container_lock:812 container_names = self.get_all_container_names()813 LOG.debug("Removing %d containers." % len(container_names))814 for container_name in container_names:815 DOCKER_CLIENT.remove_container(container_name)816 def get_docker_container_status(self, func_arn):817 """818 Determine the status of a docker container.819 :param func_arn: The ARN of the lambda function.820 :return: 1 If the container is running,821 -1 if the container exists but is not running822 0 if the container does not exist.823 """824 with self.docker_container_lock:825 # Get the container name and id.826 container_name = self.get_container_name(func_arn)827 container_status = DOCKER_CLIENT.get_container_status(container_name)828 return container_status.value829 def get_docker_container_network(self, func_arn):830 """831 Determine the network of a docker container.832 :param func_arn: The ARN of the lambda function.833 :return: name of the container network834 """835 with self.docker_container_lock:836 status = self.get_docker_container_status(func_arn)837 # container does not exist838 if status == 0:839 return ""840 # Get the container name.841 container_name = self.get_container_name(func_arn)842 container_network = DOCKER_CLIENT.get_network(container_name)843 return container_network844 def idle_container_destroyer(self):845 """846 Iterates though all the lambda containers and destroys any container that has847 been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848 :return: None849 """850 LOG.debug("Checking if there are idle containers ...")851 current_time = int(time.time() * 1000)852 for func_arn, last_run_time in dict(self.function_invoke_times).items():853 duration = current_time - last_run_time854 # not enough idle time has passed855 if duration < MAX_CONTAINER_IDLE_TIME_MS:856 continue857 # container has been idle, destroy it.858 self.destroy_docker_container(func_arn)859 def start_idle_container_destroyer_interval(self):860 """861 Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862 Thus checking for idle containers and destroying them.863 :return: None864 """865 self.idle_container_destroyer()866 threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867 def get_container_prefix(self) -> str:868 """869 Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870 :return: Lambda container name prefix871 """872 return f"{bootstrap.get_main_container_name()}_lambda_"873 def get_container_name(self, func_arn):874 """875 Given a function ARN, returns a valid docker container name.876 :param func_arn: The ARN of the lambda function.877 :return: A docker compatible name for the arn.878 """879 return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881 def __init__(self):882 super(LambdaExecutorSeparateContainers, self).__init__()883 self.max_port = LAMBDA_API_UNIQUE_PORTS884 self.port_offset = LAMBDA_API_PORT_OFFSET885 def prepare_event(self, environment: Dict, event_body: str) -> bytes:886 # Tell Lambci to use STDIN for the event887 environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888 return event_body.encode()889 def execute_in_container(890 self,891 lambda_function: LambdaFunction,892 inv_context: InvocationContext,893 stdin=None,894 background=False,895 ) -> Tuple[bytes, bytes]:896 lambda_cwd = lambda_function.cwd897 env_vars = inv_context.environment898 entrypoint = None899 if inv_context.lambda_command:900 entrypoint = ""901 elif inv_context.handler:902 inv_context.lambda_command = inv_context.handler903 # add Docker Lambda env vars904 network = config.LAMBDA_DOCKER_NETWORK or None905 if network == "host":906 port = get_free_tcp_port()907 env_vars["DOCKER_LAMBDA_API_PORT"] = port908 env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909 additional_flags = inv_context.docker_flags or ""910 dns = config.LAMBDA_DOCKER_DNS911 docker_java_ports = PortMappings()912 if Util.debug_java_port:913 docker_java_ports.add(Util.debug_java_port)914 docker_image = Util.docker_image_for_lambda(lambda_function)915 if config.LAMBDA_REMOTE_DOCKER:916 container_id = DOCKER_CLIENT.create_container(917 image_name=docker_image,918 interactive=True,919 entrypoint=entrypoint,920 remove=True,921 network=network,922 env_vars=env_vars,923 dns=dns,924 additional_flags=additional_flags,925 ports=docker_java_ports,926 command=inv_context.lambda_command,927 )928 DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER)929 return DOCKER_CLIENT.start_container(930 container_id, interactive=not background, attach=not background, stdin=stdin931 )932 else:933 mount_volumes = None934 if lambda_cwd:935 mount_volumes = [936 (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937 ]938 return DOCKER_CLIENT.run_container(939 image_name=docker_image,940 interactive=True,941 detach=background,942 entrypoint=entrypoint,943 remove=True,944 network=network,945 env_vars=env_vars,946 dns=dns,947 additional_flags=additional_flags,948 command=inv_context.lambda_command,949 mount_volumes=mount_volumes,950 stdin=stdin,951 )952class LambdaExecutorLocal(LambdaExecutor):953 def _execute_in_custom_runtime(954 self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955 ) -> InvocationResult:956 """957 Generic run function for executing lambdas in custom runtimes.958 :param cmd: the command to execute959 :param lambda_function: function details960 :return: the InvocationResult961 """962 env_vars = lambda_function and lambda_function.envvars963 kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964 process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965 result, log_output = process.communicate()966 try:967 result = to_str(result).strip()968 except Exception:969 pass970 log_output = to_str(log_output).strip()971 return_code = process.returncode972 # Note: The user's code may have been logging to stderr, in which case the logs973 # will be part of the "result" variable here. Hence, make sure that we extract974 # only the *last* line of "result" and consider anything above that as log output.975 # TODO: not sure if this code is needed/used976 if isinstance(result, str) and "\n" in result:977 lines = result.split("\n")978 idx = last_index_of(979 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980 )981 if idx >= 0:982 result = lines[idx]983 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984 log_output += "\n%s" % additional_logs985 log_formatted = log_output.strip().replace("\n", "\n> ")986 func_arn = lambda_function and lambda_function.arn()987 LOG.debug(988 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989 )990 # store log output - TODO get live logs from `process` above?991 # store_lambda_logs(lambda_function, log_output)992 if return_code != 0:993 raise InvocationException(994 "Lambda process returned error status code: %s. Result: %s. Output:\n%s"995 % (return_code, result, log_output),996 log_output,997 result,998 )999 invocation_result = InvocationResult(result, log_output=log_output)1000 return invocation_result1001 def _execute(1002 self, lambda_function: LambdaFunction, inv_context: InvocationContext1003 ) -> InvocationResult:1004 # apply plugin patches to prepare invocation context1005 result = self.apply_plugin_patches(inv_context)1006 if isinstance(result, InvocationResult):1007 return result1008 lambda_cwd = lambda_function.cwd1009 environment = self._prepare_environment(lambda_function)1010 if lambda_function.timeout:1011 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012 context = inv_context.context1013 if context:1014 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017 environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018 # execute the Lambda function in a forked sub-process, sync result via queue1019 queue = Queue()1020 lambda_function_callable = lambda_function.function(inv_context.function_version)1021 def do_execute():1022 # now we're executing in the child process, safe to change CWD and ENV1023 result = None1024 try:1025 if lambda_cwd:1026 os.chdir(lambda_cwd)1027 sys.path.insert(0, "")1028 if environment:1029 os.environ.update(environment)1030 # set default env variables required for most Lambda handlers1031 self.set_default_env_variables()1032 # run the actual handler function1033 result = lambda_function_callable(inv_context.event, context)1034 except Exception as e:1035 result = str(e)1036 sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037 raise1038 finally:1039 queue.put(result)1040 process = Process(target=do_execute)1041 start_time = now(millis=True)1042 error = None1043 with CaptureOutput() as c:1044 try:1045 process.run()1046 except Exception as e:1047 error = e1048 result = queue.get()1049 end_time = now(millis=True)1050 # Make sure to keep the log line below, to ensure the log stream gets created1051 request_id = long_uid()1052 log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053 request_id,1054 lambda_function.arn(),1055 )1056 # TODO: Interweaving stdout/stderr currently not supported1057 for stream in (c.stdout(), c.stderr()):1058 if stream:1059 log_output += ("\n" if log_output else "") + stream1060 if isinstance(result, InvocationResult) and result.log_output:1061 log_output += "\n" + result.log_output1062 log_output += "\nEND RequestId: %s" % request_id1063 log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064 request_id,1065 int((end_time - start_time) * 1000),1066 )1067 # store logs to CloudWatch1068 store_lambda_logs(lambda_function, log_output)1069 result = result.result if isinstance(result, InvocationResult) else result1070 if error:1071 LOG.info(1072 'Error executing Lambda "%s": %s %s',1073 lambda_function.arn(),1074 error,1075 "".join(traceback.format_tb(error.__traceback__)),1076 )1077 raise InvocationException(result, log_output)1078 # construct final invocation result1079 invocation_result = InvocationResult(result, log_output=log_output)1080 # run plugins post-processing logic1081 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082 return invocation_result1083 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1084 # This is a no-op for local executors - simply return the given local file path1085 return local_file1086 def execute_java_lambda(1087 self, event, context, main_file, lambda_function: LambdaFunction = None1088 ) -> InvocationResult:1089 lambda_function.envvars = lambda_function.envvars or {}1090 java_opts = config.LAMBDA_JAVA_OPTS or ""1091 handler = lambda_function.handler1092 lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1093 event_file = EVENT_FILE_PATTERN.replace("*", short_uid())1094 save_file(event_file, json.dumps(json_safe(event)))1095 TMP_FILES.append(event_file)1096 classpath = "%s:%s:%s" % (1097 main_file,1098 Util.get_java_classpath(main_file),1099 LAMBDA_EXECUTOR_JAR,1100 )1101 cmd = "java %s -cp %s %s %s" % (1102 java_opts,1103 classpath,1104 LAMBDA_EXECUTOR_CLASS,1105 event_file,1106 )1107 # apply plugin patches1108 inv_context = InvocationContext(1109 lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1110 )1111 result = self.apply_plugin_patches(inv_context)1112 if isinstance(result, InvocationResult):1113 return result1114 cmd = inv_context.lambda_command1115 LOG.info(cmd)1116 # execute Lambda and get invocation result1117 invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1118 return invocation_result1119 def execute_javascript_lambda(1120 self, event, context, main_file, lambda_function: LambdaFunction = None1121 ):1122 handler = lambda_function.handler1123 function = handler.split(".")[-1]1124 event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1125 context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")1126 cmd = [1127 "node",1128 "-e",1129 'require("%s").%s(%s,%s).then(r => process.stdout.write(JSON.stringify(r)))'1130 % (1131 main_file,1132 function,1133 event_json_string,1134 context_json_string,1135 ),1136 ]1137 LOG.info(cmd)1138 result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1139 return result1140 @staticmethod1141 def set_default_env_variables():1142 # set default env variables required for most Lambda handlers1143 default_env_vars = {"AWS_DEFAULT_REGION": aws_stack.get_region()}1144 env_vars_before = {var: os.environ.get(var) for var in default_env_vars}1145 os.environ.update({k: v for k, v in default_env_vars.items() if not env_vars_before.get(k)})1146 return env_vars_before1147 @staticmethod1148 def reset_default_env_variables(env_vars_before):1149 for env_name, env_value in env_vars_before.items():1150 env_value_before = env_vars_before.get(env_name)1151 os.environ[env_name] = env_value_before or ""1152 if env_value_before is None:1153 os.environ.pop(env_name, None)1154 def execute_go_lambda(self, event, context, main_file, lambda_function: LambdaFunction = None):1155 if lambda_function:1156 lambda_function.envvars["AWS_LAMBDA_FUNCTION_HANDLER"] = main_file1157 lambda_function.envvars["AWS_LAMBDA_EVENT_BODY"] = json.dumps(json_safe(event))1158 else:1159 LOG.warning("Unable to get function details for local execution of Golang Lambda")1160 cmd = GO_LAMBDA_RUNTIME1161 LOG.info(cmd)1162 result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1163 return result1164class Util:1165 debug_java_port = False1166 @classmethod1167 def get_java_opts(cls):1168 opts = config.LAMBDA_JAVA_OPTS or ""1169 # Replace _debug_port_ with a random free port1170 if "_debug_port_" in opts:1171 if not cls.debug_java_port:1172 cls.debug_java_port = get_free_tcp_port()1173 opts = opts.replace("_debug_port_", ("%s" % cls.debug_java_port))1174 else:1175 # Parse the debug port from opts1176 m = re.match(".*address=(.+:)?(\\d+).*", opts)1177 if m is not None:1178 cls.debug_java_port = m.groups()[1]1179 return opts1180 @classmethod1181 def get_host_path_for_path_in_docker(cls, path):1182 return re.sub(r"^%s/(.*)$" % config.TMP_FOLDER, r"%s/\1" % config.HOST_TMP_FOLDER, path)1183 @classmethod1184 def format_windows_path(cls, path):1185 temp = path.replace(":", "").replace("\\", "/")1186 if len(temp) >= 1 and temp[:1] != "/":1187 temp = "/" + temp1188 temp = "%s%s" % (config.WINDOWS_DOCKER_MOUNT_PREFIX, temp)1189 return temp1190 @classmethod1191 def docker_image_for_lambda(cls, lambda_function: LambdaFunction):1192 runtime = lambda_function.runtime or ""1193 if lambda_function.code.get("ImageUri"):1194 LOG.warning(1195 "ImageUri is set: Using Lambda container images is only supported in LocalStack Pro"1196 )1197 docker_tag = runtime1198 docker_image = config.LAMBDA_CONTAINER_REGISTRY1199 # TODO: remove prefix once execution issues are fixed with dotnetcore/python lambdas1200 # See https://github.com/lambci/docker-lambda/pull/2181201 lambdas_to_add_prefix = [1202 "dotnetcore2.0",1203 "dotnetcore2.1",1204 "python3.6",1205 "python3.7",1206 ]1207 if docker_image == "lambci/lambda" and any(1208 img in docker_tag for img in lambdas_to_add_prefix1209 ):1210 docker_tag = "20191117-%s" % docker_tag1211 if runtime == "nodejs14.x":1212 # TODO temporary fix until lambci image for nodejs14.x becomes available1213 docker_image = "localstack/lambda-js"1214 if runtime == "python3.9":1215 # TODO temporary fix until we support AWS images via https://github.com/localstack/localstack/pull/47341216 docker_image = "mlupin/docker-lambda"1217 return "%s:%s" % (docker_image, docker_tag)1218 @classmethod1219 def get_java_classpath(cls, archive):1220 """1221 Return the Java classpath, using the parent folder of the1222 given archive as the base folder.1223 The result contains any *.jar files in the base folder, as1224 well as any JAR files in the "lib/*" subfolder living1225 alongside the supplied java archive (.jar or .zip).1226 :param archive: an absolute path to a .jar or .zip Java archive1227 :return: the Java classpath, relative to the base dir of "archive"1228 """1229 entries = ["."]1230 base_dir = os.path.dirname(archive)1231 for pattern in ["%s/*.jar", "%s/lib/*.jar", "%s/java/lib/*.jar", "%s/*.zip"]:1232 for entry in glob.glob(pattern % base_dir):1233 if os.path.realpath(archive) != os.path.realpath(entry):1234 entries.append(os.path.relpath(entry, base_dir))1235 # make sure to append the localstack-utils.jar at the end of the classpath1236 # https://github.com/localstack/localstack/issues/11601237 entries.append(os.path.relpath(archive, base_dir))1238 entries.append("*.jar")1239 entries.append("java/lib/*.jar")1240 result = ":".join(entries)1241 return result1242 @staticmethod1243 def mountable_tmp_file():1244 f = os.path.join(config.TMP_FOLDER, short_uid())1245 TMP_FILES.append(f)1246 return f1247 @staticmethod1248 def inject_endpoints_into_env(env_vars: Dict[str, str]):1249 env_vars = env_vars or {}1250 main_endpoint = get_main_endpoint_from_container()1251 if not env_vars.get("LOCALSTACK_HOSTNAME"):1252 env_vars["LOCALSTACK_HOSTNAME"] = main_endpoint1253 return env_vars1254# --------------1255# GLOBAL STATE1256# --------------1257EXECUTOR_LOCAL = LambdaExecutorLocal()1258EXECUTOR_CONTAINERS_SEPARATE = LambdaExecutorSeparateContainers()1259EXECUTOR_CONTAINERS_REUSE = LambdaExecutorReuseContainers()1260DEFAULT_EXECUTOR = EXECUTOR_CONTAINERS_SEPARATE1261# the keys of AVAILABLE_EXECUTORS map to the LAMBDA_EXECUTOR config variable1262AVAILABLE_EXECUTORS = {...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!