...374 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:375 """Make the given file available to the Lambda process (e.g., by copying into the container) for the376 given invocation context; Returns the path to the file that will be available to the Lambda handler."""377 raise NotImplementedError378 def apply_plugin_patches(self, inv_context: InvocationContext) -> Optional[InvocationResult]:379 """Loop through the list of plugins, and apply their patches to the invocation context (if applicable)"""380 invocation_results = []381 for plugin in LambdaExecutorPlugin.get_plugins():382 if not plugin.should_apply(inv_context):383 continue384 # initialize, if not done yet385 if not hasattr(plugin, "_initialized"):386 LOG.debug("Initializing Lambda executor plugin %s", plugin.__class__)387 plugin.initialize()388 plugin._initialized = True389 # invoke plugin to prepare invocation390 inv_options = plugin.prepare_invocation(inv_context)391 if not inv_options:392 continue393 if isinstance(inv_options, InvocationResult):394 invocation_results.append(inv_options)395 continue396 # copy files397 file_keys_map = {}398 for key, file_path in inv_options.files_to_add.items():399 file_in_container = self.provide_file_to_lambda(file_path, inv_context)400 file_keys_map[key] = file_in_container401 # replace placeholders like "{<fileKey>}" with corresponding file path402 for key, file_path in file_keys_map.items():403 for env_key, env_value in inv_options.env_updates.items():404 inv_options.env_updates[env_key] = str(env_value).replace(405 "{%s}" % key, file_path406 )407 if inv_options.updated_command:408 inv_options.updated_command = inv_options.updated_command.replace(409 "{%s}" % key, file_path410 )411 inv_context.lambda_command = inv_options.updated_command412 # update environment413 inv_context.environment.update(inv_options.env_updates)414 # update handler415 if inv_options.updated_handler:416 inv_context.handler = inv_options.updated_handler417 if invocation_results:418 # TODO: This is currently indeterministic! If multiple execution plugins attempt to return419 # an invocation result right away, only the first one is returned. We need a more solid420 # mechanism for conflict resolution if multiple plugins interfere!421 if len(invocation_results) > 1:422 LOG.warning(423 "Multiple invocation results returned from "424 "LambdaExecutorPlugin.prepare_invocation calls - choosing the first one: %s",425 invocation_results,426 )427 return invocation_results[0]428 def process_result_via_plugins(429 self, inv_context: InvocationContext, invocation_result: InvocationResult430 ) -> InvocationResult:431 """Loop through the list of plugins, and apply their post-processing logic to the Lambda invocation result."""432 for plugin in LambdaExecutorPlugin.get_plugins():433 if not plugin.should_apply(inv_context):434 continue435 invocation_result = plugin.process_result(inv_context, invocation_result)436 return invocation_result437class ContainerInfo:438 """Contains basic information about a docker container."""439 def __init__(self, name, entry_point):440 = name441 self.entry_point = entry_point442class LambdaExecutorContainers(LambdaExecutor):443 """Abstract executor class for executing Lambda functions in Docker containers"""444 def execute_in_container(445 self,446 lambda_function: LambdaFunction,447 inv_context: InvocationContext,448 stdin=None,449 background=False,450 ) -> Tuple[bytes, bytes]:451 raise NotImplementedError452 def run_lambda_executor(self, lambda_function: LambdaFunction, inv_context: InvocationContext):453 env_vars = inv_context.environment454 runtime = lambda_function.runtime or ""455 event = inv_context.event456 stdin_str = None457 event_body = event if event is not None else env_vars.get("AWS_LAMBDA_EVENT_BODY")458 event_body = json.dumps(event_body) if isinstance(event_body, dict) else event_body459 event_body = event_body or ""460 is_large_event = len(event_body) > MAX_ENV_ARGS_LENGTH461 is_provided = runtime.startswith(LAMBDA_RUNTIME_PROVIDED)462 if (463 not is_large_event464 and lambda_function465 and is_provided466 and env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1"467 ):468 # Note: certain "provided" runtimes (e.g., Rust programs) can block if we pass in469 # the event payload via stdin, hence we rewrite the command to "echo ... | ..." below470 env_updates = {471 "AWS_LAMBDA_EVENT_BODY": to_str(472 event_body473 ), # Note: seems to be needed for provided runtimes!474 "DOCKER_LAMBDA_USE_STDIN": "1",475 }476 env_vars.update(env_updates)477 # Note: $AWS_LAMBDA_COGNITO_IDENTITY='{}' causes Rust Lambdas to hang478 env_vars.pop("AWS_LAMBDA_COGNITO_IDENTITY", None)479 if is_large_event:480 # in case of very large event payloads, we need to pass them via stdin481 LOG.debug(482 "Received large Lambda event payload (length %s) - passing via stdin"483 % len(event_body)484 )485 env_vars["DOCKER_LAMBDA_USE_STDIN"] = "1"486 if env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1":487 stdin_str = event_body488 if not is_provided:489 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)490 elif "AWS_LAMBDA_EVENT_BODY" not in env_vars:491 env_vars["AWS_LAMBDA_EVENT_BODY"] = to_str(event_body)492 # apply plugin patches493 result = self.apply_plugin_patches(inv_context)494 if isinstance(result, InvocationResult):495 return result496 if config.LAMBDA_DOCKER_FLAGS:497 inv_context.docker_flags = (498 f"{config.LAMBDA_DOCKER_FLAGS} {inv_context.docker_flags or ''}".strip()499 )500 event_stdin_bytes = stdin_str and to_bytes(stdin_str)501 error = None502 try:503 result, log_output = self.execute_in_container(504 lambda_function,505 inv_context,506 stdin=event_stdin_bytes,507 )508 except ContainerException as e:509 result = e.stdout or ""510 log_output = e.stderr or ""511 error = e512 try:513 result = to_str(result).strip()514 except Exception:515 pass516 log_output = to_str(log_output).strip()517 # Note: The user's code may have been logging to stderr, in which case the logs518 # will be part of the "result" variable here. Hence, make sure that we extract519 # only the *last* line of "result" and consider anything above that as log output.520 if isinstance(result, str) and "\n" in result:521 lines = result.split("\n")522 idx = last_index_of(523 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)524 )525 if idx >= 0:526 result = lines[idx]527 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])528 log_output += "\n%s" % additional_logs529 log_formatted = log_output.strip().replace("\n", "\n> ")530 func_arn = lambda_function and lambda_function.arn()531 LOG.debug(532 "Lambda %s result /​ log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)533 )534 # store log output - TODO get live logs from `process` above?535 store_lambda_logs(lambda_function, log_output)536 if error:537 raise InvocationException(538 "Lambda process returned with error. Result: %s. Output:\n%s"539 % (result, log_output),540 log_output,541 result,542 ) from error543 # create result544 invocation_result = InvocationResult(result, log_output=log_output)545 # run plugins post-processing logic546 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)547 return invocation_result548 def prepare_event(self, environment: Dict, event_body: str) -> bytes:549 """Return the event as a stdin string."""550 # amend the environment variables for execution551 environment["AWS_LAMBDA_EVENT_BODY"] = event_body552 return event_body.encode()553 def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):554 runtime = lambda_function.runtime555 handler = lambda_function.handler556 environment = inv_context.environment = self._prepare_environment(lambda_function)557 event = inv_context.event558 context = inv_context.context559 # configure USE_SSL in environment560 if config.USE_SSL:561 environment["USE_SSL"] = "1"562 # prepare event body563 if not event:564 'Empty event body specified for invocation of Lambda "%s"' % lambda_function.arn()566 )567 event = {}568 event_body = json.dumps(json_safe(event))569 event_bytes_for_stdin = self.prepare_event(environment, event_body)570 inv_context.event = event_bytes_for_stdin571 Util.inject_endpoints_into_env(environment)572 environment["EDGE_PORT"] = str(config.EDGE_PORT)573 environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler574 if os.environ.get("HTTP_PROXY"):575 environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]576 if lambda_function.timeout:577 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)578 if context:579 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name580 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version581 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn582 environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity or {})583 if context.client_context is not None:584 environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(585 to_str(base64.b64decode(to_bytes(context.client_context)))586 )587 # pass JVM options to the Lambda environment, if configured588 if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):589 if environment.get("JAVA_TOOL_OPTIONS"):590 "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"592 )593 else:594 LOG.debug(595 "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s"596 % config.LAMBDA_JAVA_OPTS597 )598 environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS599 # accept any self-signed certificates for outgoing calls from the Lambda600 if is_nodejs_runtime(runtime):601 environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"602 # run Lambda executor and fetch invocation result603"Running lambda: %s" % lambda_function.arn())604 result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)605 return result606 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:607 if config.LAMBDA_REMOTE_DOCKER:608"TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)609 return local_file610 mountable_file = Util.get_host_path_for_path_in_docker(local_file)611 _, extension = os.path.splitext(local_file)612 target_file_name = f"{md5(local_file)}{extension}"613 target_path = f"/​tmp/​{target_file_name}"614 inv_context.docker_flags = inv_context.docker_flags or ""615 inv_context.docker_flags += f"-v {mountable_file}:{target_path}"616 return target_path617class LambdaExecutorReuseContainers(LambdaExecutorContainers):618 """Executor class for executing Lambda functions in re-usable Docker containers"""619 def __init__(self):620 super(LambdaExecutorReuseContainers, self).__init__()621 # locking thread for creation/​destruction of docker containers.622 self.docker_container_lock = threading.RLock()623 # On each invocation we try to construct a port unlikely to conflict624 # with a previously invoked lambda function. This is a problem with at625 # least the lambci/​lambda:go1.x container, which execs a go program that626 # attempts to bind to the same default port.627 self.next_port = 0628 self.max_port = LAMBDA_SERVER_UNIQUE_PORTS629 self.port_offset = LAMBDA_SERVER_PORT_OFFSET630 def execute_in_container(631 self,632 lambda_function: LambdaFunction,633 inv_context: InvocationContext,634 stdin=None,635 background=False,636 ) -> Tuple[bytes, bytes]:637 func_arn = lambda_function.arn()638 lambda_cwd = lambda_function.cwd639 runtime = lambda_function.runtime640 env_vars = inv_context.environment641 # check whether the Lambda has been invoked before642 has_been_invoked_before = func_arn in self.function_invoke_times643 # Choose a port for this invocation644 with self.docker_container_lock:645 env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)646 self.next_port = (self.next_port + 1) % self.max_port647 # create/​verify the docker container is running.648 LOG.debug(649 'Priming docker container with runtime "%s" and arn "%s".',650 runtime,651 func_arn,652 )653 container_info = self.prime_docker_container(654 lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags655 )656 if not inv_context.lambda_command and inv_context.handler:657 command = container_info.entry_point.split()658 command.append(inv_context.handler)659 inv_context.lambda_command = command660 # determine files to be copied into the container661 if not has_been_invoked_before and config.LAMBDA_REMOTE_DOCKER:662 # if this is the first invocation: copy the entire folder into the container663 DOCKER_CLIENT.copy_into_container(664, f"{lambda_cwd}/​.", DOCKER_TASK_FOLDER665 )666 return DOCKER_CLIENT.exec_in_container(667,668 command=inv_context.lambda_command,669 interactive=True,670 env_vars=env_vars,671 stdin=stdin,672 )673 def _execute(self, func_arn, *args, **kwargs):674 if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):675 concurrency_lock = threading.RLock()676 LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock677 with LAMBDA_CONCURRENCY_LOCK[func_arn]:678 return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)679 def startup(self):680 self.cleanup()681 # start a process to remove idle containers682 if config.LAMBDA_REMOVE_CONTAINERS:683 self.start_idle_container_destroyer_interval()684 def cleanup(self, arn=None):685 if arn:686 self.function_invoke_times.pop(arn, None)687 return self.destroy_docker_container(arn)688 self.function_invoke_times = {}689 return self.destroy_existing_docker_containers()690 def prime_docker_container(691 self,692 lambda_function: LambdaFunction,693 env_vars: Dict,694 lambda_cwd: str,695 docker_flags: str = None,696 ):697 """698 Prepares a persistent docker container for a specific function.699 :param lambda_function: The Details of the lambda function.700 :param env_vars: The environment variables for the lambda.701 :param lambda_cwd: The local directory containing the code for the lambda function.702 :return: ContainerInfo class containing the container name and default entry point.703 """704 with self.docker_container_lock:705 # Get the container name and id.706 func_arn = lambda_function.arn()707 container_name = self.get_container_name(func_arn)708 status = self.get_docker_container_status(func_arn)709 LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710 docker_image = Util.docker_image_for_lambda(lambda_function)711 # Container is not running or doesn't exist.712 if status < 1:713 # Make sure the container does not exist in any form/​state.714 self.destroy_docker_container(func_arn)715 # get container startup command and run it716 LOG.debug("Creating container: %s" % container_name)717 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718 if config.LAMBDA_REMOTE_DOCKER:719 LOG.debug(720 'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721 )722 DOCKER_CLIENT.copy_into_container(723 container_name, "%s/​." % lambda_cwd, DOCKER_TASK_FOLDER724 )725 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726 DOCKER_CLIENT.start_container(container_name)727 # give the container some time to start up728 time.sleep(1)729 container_network = self.get_docker_container_network(func_arn)730 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731 LOG.debug(732 'Using entrypoint "%s" for container "%s" on network "%s".'733 % (entry_point, container_name, container_network)734 )735 return ContainerInfo(container_name, entry_point)736 def create_container(737 self,738 lambda_function: LambdaFunction,739 env_vars: Dict,740 lambda_cwd: str,741 docker_flags: str = None,742 ):743 docker_image = Util.docker_image_for_lambda(lambda_function)744 container_name = self.get_container_name(lambda_function.arn())745 # make sure we set LOCALSTACK_HOSTNAME746 Util.inject_endpoints_into_env(env_vars)747 # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." above)748 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749 network = config.LAMBDA_DOCKER_NETWORK750 additional_flags = docker_flags751 dns = config.LAMBDA_DOCKER_DNS752 mount_volumes = not config.LAMBDA_REMOTE_DOCKER753 lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754 if ":" in lambda_cwd and "\\" in lambda_cwd:755 lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756 mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757 if os.environ.get("HOSTNAME"):758 env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759 env_vars["EDGE_PORT"] = config.EDGE_PORT760 LOG.debug(761 "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762 )763 return DOCKER_CLIENT.create_container(764 image_name=docker_image,765 remove=True,766 interactive=True,767 name=container_name,768 entrypoint="/​bin/​bash",769 network=network,770 env_vars=env_vars,771 dns=dns,772 mount_volumes=mount_volumes,773 additional_flags=additional_flags,774 )775 def destroy_docker_container(self, func_arn):776 """777 Stops and/​or removes a docker container for a specific lambda function ARN.778 :param func_arn: The ARN of the lambda function.779 :return: None780 """781 with self.docker_container_lock:782 status = self.get_docker_container_status(func_arn)783 # Get the container name and id.784 container_name = self.get_container_name(func_arn)785 if status == 1:786 LOG.debug("Stopping container: %s" % container_name)787 DOCKER_CLIENT.stop_container(container_name)788 status = self.get_docker_container_status(func_arn)789 if status == -1:790 LOG.debug("Removing container: %s" % container_name)791 rm_docker_container(container_name, safe=True)792 # clean up function invoke times, as some init logic depends on this793 self.function_invoke_times.pop(func_arn, None)794 def get_all_container_names(self):795 """796 Returns a list of container names for lambda containers.797 :return: A String[] localstack docker container names for each function.798 """799 with self.docker_container_lock:800 LOG.debug("Getting all lambda containers names.")801 list_result = DOCKER_CLIENT.list_containers(802 filter=f"name={self.get_container_prefix()}*"803 )804 container_names = list(map(lambda container: container["name"], list_result))805 return container_names806 def destroy_existing_docker_containers(self):807 """808 Stops and/​or removes all lambda docker containers for localstack.809 :return: None810 """811 with self.docker_container_lock:812 container_names = self.get_all_container_names()813 LOG.debug("Removing %d containers." % len(container_names))814 for container_name in container_names:815 DOCKER_CLIENT.remove_container(container_name)816 def get_docker_container_status(self, func_arn):817 """818 Determine the status of a docker container.819 :param func_arn: The ARN of the lambda function.820 :return: 1 If the container is running,821 -1 if the container exists but is not running822 0 if the container does not exist.823 """824 with self.docker_container_lock:825 # Get the container name and id.826 container_name = self.get_container_name(func_arn)827 container_status = DOCKER_CLIENT.get_container_status(container_name)828 return container_status.value829 def get_docker_container_network(self, func_arn):830 """831 Determine the network of a docker container.832 :param func_arn: The ARN of the lambda function.833 :return: name of the container network834 """835 with self.docker_container_lock:836 status = self.get_docker_container_status(func_arn)837 # container does not exist838 if status == 0:839 return ""840 # Get the container name.841 container_name = self.get_container_name(func_arn)842 container_network = DOCKER_CLIENT.get_network(container_name)843 return container_network844 def idle_container_destroyer(self):845 """846 Iterates though all the lambda containers and destroys any container that has847 been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848 :return: None849 """850 LOG.debug("Checking if there are idle containers ...")851 current_time = int(time.time() * 1000)852 for func_arn, last_run_time in dict(self.function_invoke_times).items():853 duration = current_time - last_run_time854 # not enough idle time has passed855 if duration < MAX_CONTAINER_IDLE_TIME_MS:856 continue857 # container has been idle, destroy it.858 self.destroy_docker_container(func_arn)859 def start_idle_container_destroyer_interval(self):860 """861 Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862 Thus checking for idle containers and destroying them.863 :return: None864 """865 self.idle_container_destroyer()866 threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867 def get_container_prefix(self) -> str:868 """869 Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870 :return: Lambda container name prefix871 """872 return f"{bootstrap.get_main_container_name()}_lambda_"873 def get_container_name(self, func_arn):874 """875 Given a function ARN, returns a valid docker container name.876 :param func_arn: The ARN of the lambda function.877 :return: A docker compatible name for the arn.878 """879 return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881 def __init__(self):882 super(LambdaExecutorSeparateContainers, self).__init__()883 self.max_port = LAMBDA_API_UNIQUE_PORTS884 self.port_offset = LAMBDA_API_PORT_OFFSET885 def prepare_event(self, environment: Dict, event_body: str) -> bytes:886 # Tell Lambci to use STDIN for the event887 environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888 return event_body.encode()889 def execute_in_container(890 self,891 lambda_function: LambdaFunction,892 inv_context: InvocationContext,893 stdin=None,894 background=False,895 ) -> Tuple[bytes, bytes]:896 lambda_cwd = lambda_function.cwd897 env_vars = inv_context.environment898 entrypoint = None899 if inv_context.lambda_command:900 entrypoint = ""901 elif inv_context.handler:902 inv_context.lambda_command = inv_context.handler903 # add Docker Lambda env vars904 network = config.LAMBDA_DOCKER_NETWORK or None905 if network == "host":906 port = get_free_tcp_port()907 env_vars["DOCKER_LAMBDA_API_PORT"] = port908 env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909 additional_flags = inv_context.docker_flags or ""910 dns = config.LAMBDA_DOCKER_DNS911 docker_java_ports = PortMappings()912 if Util.debug_java_port:913 docker_java_ports.add(Util.debug_java_port)914 docker_image = Util.docker_image_for_lambda(lambda_function)915 if config.LAMBDA_REMOTE_DOCKER:916 container_id = DOCKER_CLIENT.create_container(917 image_name=docker_image,918 interactive=True,919 entrypoint=entrypoint,920 remove=True,921 network=network,922 env_vars=env_vars,923 dns=dns,924 additional_flags=additional_flags,925 ports=docker_java_ports,926 command=inv_context.lambda_command,927 )928 DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/​.", DOCKER_TASK_FOLDER)929 return DOCKER_CLIENT.start_container(930 container_id, interactive=not background, attach=not background, stdin=stdin931 )932 else:933 mount_volumes = None934 if lambda_cwd:935 mount_volumes = [936 (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937 ]938 return DOCKER_CLIENT.run_container(939 image_name=docker_image,940 interactive=True,941 detach=background,942 entrypoint=entrypoint,943 remove=True,944 network=network,945 env_vars=env_vars,946 dns=dns,947 additional_flags=additional_flags,948 command=inv_context.lambda_command,949 mount_volumes=mount_volumes,950 stdin=stdin,951 )952class LambdaExecutorLocal(LambdaExecutor):953 def _execute_in_custom_runtime(954 self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955 ) -> InvocationResult:956 """957 Generic run function for executing lambdas in custom runtimes.958 :param cmd: the command to execute959 :param lambda_function: function details960 :return: the InvocationResult961 """962 env_vars = lambda_function and lambda_function.envvars963 kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964 process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965 result, log_output = process.communicate()966 try:967 result = to_str(result).strip()968 except Exception:969 pass970 log_output = to_str(log_output).strip()971 return_code = process.returncode972 # Note: The user's code may have been logging to stderr, in which case the logs973 # will be part of the "result" variable here. Hence, make sure that we extract974 # only the *last* line of "result" and consider anything above that as log output.975 # TODO: not sure if this code is needed/​used976 if isinstance(result, str) and "\n" in result:977 lines = result.split("\n")978 idx = last_index_of(979 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980 )981 if idx >= 0:982 result = lines[idx]983 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984 log_output += "\n%s" % additional_logs985 log_formatted = log_output.strip().replace("\n", "\n> ")986 func_arn = lambda_function and lambda_function.arn()987 LOG.debug(988 "Lambda %s result /​ log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989 )990 # store log output - TODO get live logs from `process` above?991 # store_lambda_logs(lambda_function, log_output)992 if return_code != 0:993 raise InvocationException(994 "Lambda process returned error status code: %s. Result: %s. Output:\n%s"995 % (return_code, result, log_output),996 log_output,997 result,998 )999 invocation_result = InvocationResult(result, log_output=log_output)1000 return invocation_result1001 def _execute(1002 self, lambda_function: LambdaFunction, inv_context: InvocationContext1003 ) -> InvocationResult:1004 # apply plugin patches to prepare invocation context1005 result = self.apply_plugin_patches(inv_context)1006 if isinstance(result, InvocationResult):1007 return result1008 lambda_cwd = lambda_function.cwd1009 environment = self._prepare_environment(lambda_function)1010 if lambda_function.timeout:1011 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012 context = inv_context.context1013 if context:1014 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017 environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018 # execute the Lambda function in a forked sub-process, sync result via queue1019 queue = Queue()1020 lambda_function_callable = lambda_function.function(inv_context.function_version)1021 def do_execute():1022 # now we're executing in the child process, safe to change CWD and ENV1023 result = None1024 try:1025 if lambda_cwd:1026 os.chdir(lambda_cwd)1027 sys.path.insert(0, "")1028 if environment:1029 os.environ.update(environment)1030 # set default env variables required for most Lambda handlers1031 self.set_default_env_variables()1032 # run the actual handler function1033 result = lambda_function_callable(inv_context.event, context)1034 except Exception as e:1035 result = str(e)1036 sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037 raise1038 finally:1039 queue.put(result)1040 process = Process(target=do_execute)1041 start_time = now(millis=True)1042 error = None1043 with CaptureOutput() as c:1044 try:1045 except Exception as e:1047 error = e1048 result = queue.get()1049 end_time = now(millis=True)1050 # Make sure to keep the log line below, to ensure the log stream gets created1051 request_id = long_uid()1052 log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053 request_id,1054 lambda_function.arn(),1055 )1056 # TODO: Interweaving stdout/​stderr currently not supported1057 for stream in (c.stdout(), c.stderr()):1058 if stream:1059 log_output += ("\n" if log_output else "") + stream1060 if isinstance(result, InvocationResult) and result.log_output:1061 log_output += "\n" + result.log_output1062 log_output += "\nEND RequestId: %s" % request_id1063 log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064 request_id,1065 int((end_time - start_time) * 1000),1066 )1067 # store logs to CloudWatch1068 store_lambda_logs(lambda_function, log_output)1069 result = result.result if isinstance(result, InvocationResult) else result1070 if error:1071 'Error executing Lambda "%s": %s %s',1073 lambda_function.arn(),1074 error,1075 "".join(traceback.format_tb(error.__traceback__)),1076 )1077 raise InvocationException(result, log_output)1078 # construct final invocation result1079 invocation_result = InvocationResult(result, log_output=log_output)1080 # run plugins post-processing logic1081 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082 return invocation_result1083 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1084 # This is a no-op for local executors - simply return the given local file path1085 return local_file1086 def execute_java_lambda(1087 self, event, context, main_file, lambda_function: LambdaFunction = None1088 ) -> InvocationResult:1089 lambda_function.envvars = lambda_function.envvars or {}1090 java_opts = config.LAMBDA_JAVA_OPTS or ""1091 handler = lambda_function.handler1092 lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1093 event_file = EVENT_FILE_PATTERN.replace("*", short_uid())1094 save_file(event_file, json.dumps(json_safe(event)))1095 TMP_FILES.append(event_file)1096 classpath = "%s:%s:%s" % (1097 main_file,1098 Util.get_java_classpath(main_file),1099 LAMBDA_EXECUTOR_JAR,1100 )1101 cmd = "java %s -cp %s %s %s" % (1102 java_opts,1103 classpath,1104 LAMBDA_EXECUTOR_CLASS,1105 event_file,1106 )1107 # apply plugin patches1108 inv_context = InvocationContext(1109 lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1110 )1111 result = self.apply_plugin_patches(inv_context)1112 if isinstance(result, InvocationResult):1113 return result1114 cmd = inv_context.lambda_command1115 # execute Lambda and get invocation result1117 invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1118 return invocation_result1119 def execute_javascript_lambda(1120 self, event, context, main_file, lambda_function: LambdaFunction = None1121 ):1122 handler = lambda_function.handler1123 function = handler.split(".")[-1]1124 event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1125 context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")...

