Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py
...374 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:375 """Make the given file available to the Lambda process (e.g., by copying into the container) for the376 given invocation context; Returns the path to the file that will be available to the Lambda handler."""377 raise NotImplementedError378 def apply_plugin_patches(self, inv_context: InvocationContext) -> Optional[InvocationResult]:379 """Loop through the list of plugins, and apply their patches to the invocation context (if applicable)"""380 invocation_results = []381 for plugin in LambdaExecutorPlugin.get_plugins():382 if not plugin.should_apply(inv_context):383 continue384 # initialize, if not done yet385 if not hasattr(plugin, "_initialized"):386 LOG.debug("Initializing Lambda executor plugin %s", plugin.__class__)387 plugin.initialize()388 plugin._initialized = True389 # invoke plugin to prepare invocation390 inv_options = plugin.prepare_invocation(inv_context)391 if not inv_options:392 continue393 if isinstance(inv_options, InvocationResult):394 invocation_results.append(inv_options)395 continue396 # copy files397 file_keys_map = {}398 for key, file_path in inv_options.files_to_add.items():399 file_in_container = self.provide_file_to_lambda(file_path, inv_context)400 file_keys_map[key] = file_in_container401 # replace placeholders like "{<fileKey>}" with corresponding file path402 for key, file_path in file_keys_map.items():403 for env_key, env_value in inv_options.env_updates.items():404 inv_options.env_updates[env_key] = str(env_value).replace(405 "{%s}" % key, file_path406 )407 if inv_options.updated_command:408 inv_options.updated_command = inv_options.updated_command.replace(409 "{%s}" % key, file_path410 )411 inv_context.lambda_command = inv_options.updated_command412 # update environment413 inv_context.environment.update(inv_options.env_updates)414 # update handler415 if inv_options.updated_handler:416 inv_context.handler = inv_options.updated_handler417 if invocation_results:418 # TODO: This is currently indeterministic! If multiple execution plugins attempt to return419 # an invocation result right away, only the first one is returned. We need a more solid420 # mechanism for conflict resolution if multiple plugins interfere!421 if len(invocation_results) > 1:422 LOG.warning(423 "Multiple invocation results returned from "424 "LambdaExecutorPlugin.prepare_invocation calls - choosing the first one: %s",425 invocation_results,426 )427 return invocation_results[0]428 def process_result_via_plugins(429 self, inv_context: InvocationContext, invocation_result: InvocationResult430 ) -> InvocationResult:431 """Loop through the list of plugins, and apply their post-processing logic to the Lambda invocation result."""432 for plugin in LambdaExecutorPlugin.get_plugins():433 if not plugin.should_apply(inv_context):434 continue435 invocation_result = plugin.process_result(inv_context, invocation_result)436 return invocation_result437class ContainerInfo:438 """Contains basic information about a docker container."""439 def __init__(self, name, entry_point):440 self.name = name441 self.entry_point = entry_point442class LambdaExecutorContainers(LambdaExecutor):443 """Abstract executor class for executing Lambda functions in Docker containers"""444 def execute_in_container(445 self,446 lambda_function: LambdaFunction,447 inv_context: InvocationContext,448 stdin=None,449 background=False,450 ) -> Tuple[bytes, bytes]:451 raise NotImplementedError452 def run_lambda_executor(self, lambda_function: LambdaFunction, inv_context: InvocationContext):453 env_vars = inv_context.environment454 runtime = lambda_function.runtime or ""455 event = inv_context.event456 stdin_str = None457 event_body = event if event is not None else env_vars.get("AWS_LAMBDA_EVENT_BODY")458 event_body = json.dumps(event_body) if isinstance(event_body, dict) else event_body459 event_body = event_body or ""460 is_large_event = len(event_body) > MAX_ENV_ARGS_LENGTH461 is_provided = runtime.startswith(LAMBDA_RUNTIME_PROVIDED)462 if (463 not is_large_event464 and lambda_function465 and is_provided466 and env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1"467 ):468 # Note: certain "provided" runtimes (e.g., Rust programs) can block if we pass in469 # the event payload via stdin, hence we rewrite the command to "echo ... | ..." below470 env_updates = {471 "AWS_LAMBDA_EVENT_BODY": to_str(472 event_body473 ), # Note: seems to be needed for provided runtimes!474 "DOCKER_LAMBDA_USE_STDIN": "1",475 }476 env_vars.update(env_updates)477 # Note: $AWS_LAMBDA_COGNITO_IDENTITY='{}' causes Rust Lambdas to hang478 env_vars.pop("AWS_LAMBDA_COGNITO_IDENTITY", None)479 if is_large_event:480 # in case of very large event payloads, we need to pass them via stdin481 LOG.debug(482 "Received large Lambda event payload (length %s) - passing via stdin"483 % len(event_body)484 )485 env_vars["DOCKER_LAMBDA_USE_STDIN"] = "1"486 if env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1":487 stdin_str = event_body488 if not is_provided:489 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)490 elif "AWS_LAMBDA_EVENT_BODY" not in env_vars:491 env_vars["AWS_LAMBDA_EVENT_BODY"] = to_str(event_body)492 # apply plugin patches493 result = self.apply_plugin_patches(inv_context)494 if isinstance(result, InvocationResult):495 return result496 if config.LAMBDA_DOCKER_FLAGS:497 inv_context.docker_flags = (498 f"{config.LAMBDA_DOCKER_FLAGS} {inv_context.docker_flags or ''}".strip()499 )500 event_stdin_bytes = stdin_str and to_bytes(stdin_str)501 error = None502 try:503 result, log_output = self.execute_in_container(504 lambda_function,505 inv_context,506 stdin=event_stdin_bytes,507 )508 except ContainerException as e:509 result = e.stdout or ""510 log_output = e.stderr or ""511 error = e512 try:513 result = to_str(result).strip()514 except Exception:515 pass516 log_output = to_str(log_output).strip()517 # Note: The user's code may have been logging to stderr, in which case the logs518 # will be part of the "result" variable here. Hence, make sure that we extract519 # only the *last* line of "result" and consider anything above that as log output.520 if isinstance(result, str) and "\n" in result:521 lines = result.split("\n")522 idx = last_index_of(523 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)524 )525 if idx >= 0:526 result = lines[idx]527 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])528 log_output += "\n%s" % additional_logs529 log_formatted = log_output.strip().replace("\n", "\n> ")530 func_arn = lambda_function and lambda_function.arn()531 LOG.debug(532 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)533 )534 # store log output - TODO get live logs from `process` above?535 store_lambda_logs(lambda_function, log_output)536 if error:537 raise InvocationException(538 "Lambda process returned with error. Result: %s. Output:\n%s"539 % (result, log_output),540 log_output,541 result,542 ) from error543 # create result544 invocation_result = InvocationResult(result, log_output=log_output)545 # run plugins post-processing logic546 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)547 return invocation_result548 def prepare_event(self, environment: Dict, event_body: str) -> bytes:549 """Return the event as a stdin string."""550 # amend the environment variables for execution551 environment["AWS_LAMBDA_EVENT_BODY"] = event_body552 return event_body.encode()553 def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):554 runtime = lambda_function.runtime555 handler = lambda_function.handler556 environment = inv_context.environment = self._prepare_environment(lambda_function)557 event = inv_context.event558 context = inv_context.context559 # configure USE_SSL in environment560 if config.USE_SSL:561 environment["USE_SSL"] = "1"562 # prepare event body563 if not event:564 LOG.info(565 'Empty event body specified for invocation of Lambda "%s"' % lambda_function.arn()566 )567 event = {}568 event_body = json.dumps(json_safe(event))569 event_bytes_for_stdin = self.prepare_event(environment, event_body)570 inv_context.event = event_bytes_for_stdin571 Util.inject_endpoints_into_env(environment)572 environment["EDGE_PORT"] = str(config.EDGE_PORT)573 environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler574 if os.environ.get("HTTP_PROXY"):575 environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]576 if lambda_function.timeout:577 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)578 if context:579 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name580 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version581 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn582 environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity or {})583 if context.client_context is not None:584 environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(585 to_str(base64.b64decode(to_bytes(context.client_context)))586 )587 # pass JVM options to the Lambda environment, if configured588 if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):589 if environment.get("JAVA_TOOL_OPTIONS"):590 LOG.info(591 "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"592 )593 else:594 LOG.debug(595 "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s"596 % config.LAMBDA_JAVA_OPTS597 )598 environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS599 # accept any self-signed certificates for outgoing calls from the Lambda600 if is_nodejs_runtime(runtime):601 environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"602 # run Lambda executor and fetch invocation result603 LOG.info("Running lambda: %s" % lambda_function.arn())604 result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)605 return result606 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:607 if config.LAMBDA_REMOTE_DOCKER:608 LOG.info("TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)609 return local_file610 mountable_file = Util.get_host_path_for_path_in_docker(local_file)611 _, extension = os.path.splitext(local_file)612 target_file_name = f"{md5(local_file)}{extension}"613 target_path = f"/tmp/{target_file_name}"614 inv_context.docker_flags = inv_context.docker_flags or ""615 inv_context.docker_flags += f"-v {mountable_file}:{target_path}"616 return target_path617class LambdaExecutorReuseContainers(LambdaExecutorContainers):618 """Executor class for executing Lambda functions in re-usable Docker containers"""619 def __init__(self):620 super(LambdaExecutorReuseContainers, self).__init__()621 # locking thread for creation/destruction of docker containers.622 self.docker_container_lock = threading.RLock()623 # On each invocation we try to construct a port unlikely to conflict624 # with a previously invoked lambda function. This is a problem with at625 # least the lambci/lambda:go1.x container, which execs a go program that626 # attempts to bind to the same default port.627 self.next_port = 0628 self.max_port = LAMBDA_SERVER_UNIQUE_PORTS629 self.port_offset = LAMBDA_SERVER_PORT_OFFSET630 def execute_in_container(631 self,632 lambda_function: LambdaFunction,633 inv_context: InvocationContext,634 stdin=None,635 background=False,636 ) -> Tuple[bytes, bytes]:637 func_arn = lambda_function.arn()638 lambda_cwd = lambda_function.cwd639 runtime = lambda_function.runtime640 env_vars = inv_context.environment641 # check whether the Lambda has been invoked before642 has_been_invoked_before = func_arn in self.function_invoke_times643 # Choose a port for this invocation644 with self.docker_container_lock:645 env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)646 self.next_port = (self.next_port + 1) % self.max_port647 # create/verify the docker container is running.648 LOG.debug(649 'Priming docker container with runtime "%s" and arn "%s".',650 runtime,651 func_arn,652 )653 container_info = self.prime_docker_container(654 lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags655 )656 if not inv_context.lambda_command and inv_context.handler:657 command = container_info.entry_point.split()658 command.append(inv_context.handler)659 inv_context.lambda_command = command660 # determine files to be copied into the container661 if not has_been_invoked_before and config.LAMBDA_REMOTE_DOCKER:662 # if this is the first invocation: copy the entire folder into the container663 DOCKER_CLIENT.copy_into_container(664 container_info.name, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER665 )666 return DOCKER_CLIENT.exec_in_container(667 container_name_or_id=container_info.name,668 command=inv_context.lambda_command,669 interactive=True,670 env_vars=env_vars,671 stdin=stdin,672 )673 def _execute(self, func_arn, *args, **kwargs):674 if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):675 concurrency_lock = threading.RLock()676 LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock677 with LAMBDA_CONCURRENCY_LOCK[func_arn]:678 return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)679 def startup(self):680 self.cleanup()681 # start a process to remove idle containers682 if config.LAMBDA_REMOVE_CONTAINERS:683 self.start_idle_container_destroyer_interval()684 def cleanup(self, arn=None):685 if arn:686 self.function_invoke_times.pop(arn, None)687 return self.destroy_docker_container(arn)688 self.function_invoke_times = {}689 return self.destroy_existing_docker_containers()690 def prime_docker_container(691 self,692 lambda_function: LambdaFunction,693 env_vars: Dict,694 lambda_cwd: str,695 docker_flags: str = None,696 ):697 """698 Prepares a persistent docker container for a specific function.699 :param lambda_function: The Details of the lambda function.700 :param env_vars: The environment variables for the lambda.701 :param lambda_cwd: The local directory containing the code for the lambda function.702 :return: ContainerInfo class containing the container name and default entry point.703 """704 with self.docker_container_lock:705 # Get the container name and id.706 func_arn = lambda_function.arn()707 container_name = self.get_container_name(func_arn)708 status = self.get_docker_container_status(func_arn)709 LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710 docker_image = Util.docker_image_for_lambda(lambda_function)711 # Container is not running or doesn't exist.712 if status < 1:713 # Make sure the container does not exist in any form/state.714 self.destroy_docker_container(func_arn)715 # get container startup command and run it716 LOG.debug("Creating container: %s" % container_name)717 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718 if config.LAMBDA_REMOTE_DOCKER:719 LOG.debug(720 'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721 )722 DOCKER_CLIENT.copy_into_container(723 container_name, "%s/." % lambda_cwd, DOCKER_TASK_FOLDER724 )725 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726 DOCKER_CLIENT.start_container(container_name)727 # give the container some time to start up728 time.sleep(1)729 container_network = self.get_docker_container_network(func_arn)730 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731 LOG.debug(732 'Using entrypoint "%s" for container "%s" on network "%s".'733 % (entry_point, container_name, container_network)734 )735 return ContainerInfo(container_name, entry_point)736 def create_container(737 self,738 lambda_function: LambdaFunction,739 env_vars: Dict,740 lambda_cwd: str,741 docker_flags: str = None,742 ):743 docker_image = Util.docker_image_for_lambda(lambda_function)744 container_name = self.get_container_name(lambda_function.arn())745 # make sure we set LOCALSTACK_HOSTNAME746 Util.inject_endpoints_into_env(env_vars)747 # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." above)748 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749 network = config.LAMBDA_DOCKER_NETWORK750 additional_flags = docker_flags751 dns = config.LAMBDA_DOCKER_DNS752 mount_volumes = not config.LAMBDA_REMOTE_DOCKER753 lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754 if ":" in lambda_cwd and "\\" in lambda_cwd:755 lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756 mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757 if os.environ.get("HOSTNAME"):758 env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759 env_vars["EDGE_PORT"] = config.EDGE_PORT760 LOG.debug(761 "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762 )763 return DOCKER_CLIENT.create_container(764 image_name=docker_image,765 remove=True,766 interactive=True,767 name=container_name,768 entrypoint="/bin/bash",769 network=network,770 env_vars=env_vars,771 dns=dns,772 mount_volumes=mount_volumes,773 additional_flags=additional_flags,774 )775 def destroy_docker_container(self, func_arn):776 """777 Stops and/or removes a docker container for a specific lambda function ARN.778 :param func_arn: The ARN of the lambda function.779 :return: None780 """781 with self.docker_container_lock:782 status = self.get_docker_container_status(func_arn)783 # Get the container name and id.784 container_name = self.get_container_name(func_arn)785 if status == 1:786 LOG.debug("Stopping container: %s" % container_name)787 DOCKER_CLIENT.stop_container(container_name)788 status = self.get_docker_container_status(func_arn)789 if status == -1:790 LOG.debug("Removing container: %s" % container_name)791 rm_docker_container(container_name, safe=True)792 # clean up function invoke times, as some init logic depends on this793 self.function_invoke_times.pop(func_arn, None)794 def get_all_container_names(self):795 """796 Returns a list of container names for lambda containers.797 :return: A String[] localstack docker container names for each function.798 """799 with self.docker_container_lock:800 LOG.debug("Getting all lambda containers names.")801 list_result = DOCKER_CLIENT.list_containers(802 filter=f"name={self.get_container_prefix()}*"803 )804 container_names = list(map(lambda container: container["name"], list_result))805 return container_names806 def destroy_existing_docker_containers(self):807 """808 Stops and/or removes all lambda docker containers for localstack.809 :return: None810 """811 with self.docker_container_lock:812 container_names = self.get_all_container_names()813 LOG.debug("Removing %d containers." % len(container_names))814 for container_name in container_names:815 DOCKER_CLIENT.remove_container(container_name)816 def get_docker_container_status(self, func_arn):817 """818 Determine the status of a docker container.819 :param func_arn: The ARN of the lambda function.820 :return: 1 If the container is running,821 -1 if the container exists but is not running822 0 if the container does not exist.823 """824 with self.docker_container_lock:825 # Get the container name and id.826 container_name = self.get_container_name(func_arn)827 container_status = DOCKER_CLIENT.get_container_status(container_name)828 return container_status.value829 def get_docker_container_network(self, func_arn):830 """831 Determine the network of a docker container.832 :param func_arn: The ARN of the lambda function.833 :return: name of the container network834 """835 with self.docker_container_lock:836 status = self.get_docker_container_status(func_arn)837 # container does not exist838 if status == 0:839 return ""840 # Get the container name.841 container_name = self.get_container_name(func_arn)842 container_network = DOCKER_CLIENT.get_network(container_name)843 return container_network844 def idle_container_destroyer(self):845 """846 Iterates though all the lambda containers and destroys any container that has847 been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848 :return: None849 """850 LOG.debug("Checking if there are idle containers ...")851 current_time = int(time.time() * 1000)852 for func_arn, last_run_time in dict(self.function_invoke_times).items():853 duration = current_time - last_run_time854 # not enough idle time has passed855 if duration < MAX_CONTAINER_IDLE_TIME_MS:856 continue857 # container has been idle, destroy it.858 self.destroy_docker_container(func_arn)859 def start_idle_container_destroyer_interval(self):860 """861 Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862 Thus checking for idle containers and destroying them.863 :return: None864 """865 self.idle_container_destroyer()866 threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867 def get_container_prefix(self) -> str:868 """869 Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870 :return: Lambda container name prefix871 """872 return f"{bootstrap.get_main_container_name()}_lambda_"873 def get_container_name(self, func_arn):874 """875 Given a function ARN, returns a valid docker container name.876 :param func_arn: The ARN of the lambda function.877 :return: A docker compatible name for the arn.878 """879 return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881 def __init__(self):882 super(LambdaExecutorSeparateContainers, self).__init__()883 self.max_port = LAMBDA_API_UNIQUE_PORTS884 self.port_offset = LAMBDA_API_PORT_OFFSET885 def prepare_event(self, environment: Dict, event_body: str) -> bytes:886 # Tell Lambci to use STDIN for the event887 environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888 return event_body.encode()889 def execute_in_container(890 self,891 lambda_function: LambdaFunction,892 inv_context: InvocationContext,893 stdin=None,894 background=False,895 ) -> Tuple[bytes, bytes]:896 lambda_cwd = lambda_function.cwd897 env_vars = inv_context.environment898 entrypoint = None899 if inv_context.lambda_command:900 entrypoint = ""901 elif inv_context.handler:902 inv_context.lambda_command = inv_context.handler903 # add Docker Lambda env vars904 network = config.LAMBDA_DOCKER_NETWORK or None905 if network == "host":906 port = get_free_tcp_port()907 env_vars["DOCKER_LAMBDA_API_PORT"] = port908 env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909 additional_flags = inv_context.docker_flags or ""910 dns = config.LAMBDA_DOCKER_DNS911 docker_java_ports = PortMappings()912 if Util.debug_java_port:913 docker_java_ports.add(Util.debug_java_port)914 docker_image = Util.docker_image_for_lambda(lambda_function)915 if config.LAMBDA_REMOTE_DOCKER:916 container_id = DOCKER_CLIENT.create_container(917 image_name=docker_image,918 interactive=True,919 entrypoint=entrypoint,920 remove=True,921 network=network,922 env_vars=env_vars,923 dns=dns,924 additional_flags=additional_flags,925 ports=docker_java_ports,926 command=inv_context.lambda_command,927 )928 DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER)929 return DOCKER_CLIENT.start_container(930 container_id, interactive=not background, attach=not background, stdin=stdin931 )932 else:933 mount_volumes = None934 if lambda_cwd:935 mount_volumes = [936 (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937 ]938 return DOCKER_CLIENT.run_container(939 image_name=docker_image,940 interactive=True,941 detach=background,942 entrypoint=entrypoint,943 remove=True,944 network=network,945 env_vars=env_vars,946 dns=dns,947 additional_flags=additional_flags,948 command=inv_context.lambda_command,949 mount_volumes=mount_volumes,950 stdin=stdin,951 )952class LambdaExecutorLocal(LambdaExecutor):953 def _execute_in_custom_runtime(954 self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955 ) -> InvocationResult:956 """957 Generic run function for executing lambdas in custom runtimes.958 :param cmd: the command to execute959 :param lambda_function: function details960 :return: the InvocationResult961 """962 env_vars = lambda_function and lambda_function.envvars963 kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964 process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965 result, log_output = process.communicate()966 try:967 result = to_str(result).strip()968 except Exception:969 pass970 log_output = to_str(log_output).strip()971 return_code = process.returncode972 # Note: The user's code may have been logging to stderr, in which case the logs973 # will be part of the "result" variable here. Hence, make sure that we extract974 # only the *last* line of "result" and consider anything above that as log output.975 # TODO: not sure if this code is needed/used976 if isinstance(result, str) and "\n" in result:977 lines = result.split("\n")978 idx = last_index_of(979 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980 )981 if idx >= 0:982 result = lines[idx]983 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984 log_output += "\n%s" % additional_logs985 log_formatted = log_output.strip().replace("\n", "\n> ")986 func_arn = lambda_function and lambda_function.arn()987 LOG.debug(988 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989 )990 # store log output - TODO get live logs from `process` above?991 # store_lambda_logs(lambda_function, log_output)992 if return_code != 0:993 raise InvocationException(994 "Lambda process returned error status code: %s. Result: %s. Output:\n%s"995 % (return_code, result, log_output),996 log_output,997 result,998 )999 invocation_result = InvocationResult(result, log_output=log_output)1000 return invocation_result1001 def _execute(1002 self, lambda_function: LambdaFunction, inv_context: InvocationContext1003 ) -> InvocationResult:1004 # apply plugin patches to prepare invocation context1005 result = self.apply_plugin_patches(inv_context)1006 if isinstance(result, InvocationResult):1007 return result1008 lambda_cwd = lambda_function.cwd1009 environment = self._prepare_environment(lambda_function)1010 if lambda_function.timeout:1011 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012 context = inv_context.context1013 if context:1014 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017 environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018 # execute the Lambda function in a forked sub-process, sync result via queue1019 queue = Queue()1020 lambda_function_callable = lambda_function.function(inv_context.function_version)1021 def do_execute():1022 # now we're executing in the child process, safe to change CWD and ENV1023 result = None1024 try:1025 if lambda_cwd:1026 os.chdir(lambda_cwd)1027 sys.path.insert(0, "")1028 if environment:1029 os.environ.update(environment)1030 # set default env variables required for most Lambda handlers1031 self.set_default_env_variables()1032 # run the actual handler function1033 result = lambda_function_callable(inv_context.event, context)1034 except Exception as e:1035 result = str(e)1036 sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037 raise1038 finally:1039 queue.put(result)1040 process = Process(target=do_execute)1041 start_time = now(millis=True)1042 error = None1043 with CaptureOutput() as c:1044 try:1045 process.run()1046 except Exception as e:1047 error = e1048 result = queue.get()1049 end_time = now(millis=True)1050 # Make sure to keep the log line below, to ensure the log stream gets created1051 request_id = long_uid()1052 log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053 request_id,1054 lambda_function.arn(),1055 )1056 # TODO: Interweaving stdout/stderr currently not supported1057 for stream in (c.stdout(), c.stderr()):1058 if stream:1059 log_output += ("\n" if log_output else "") + stream1060 if isinstance(result, InvocationResult) and result.log_output:1061 log_output += "\n" + result.log_output1062 log_output += "\nEND RequestId: %s" % request_id1063 log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064 request_id,1065 int((end_time - start_time) * 1000),1066 )1067 # store logs to CloudWatch1068 store_lambda_logs(lambda_function, log_output)1069 result = result.result if isinstance(result, InvocationResult) else result1070 if error:1071 LOG.info(1072 'Error executing Lambda "%s": %s %s',1073 lambda_function.arn(),1074 error,1075 "".join(traceback.format_tb(error.__traceback__)),1076 )1077 raise InvocationException(result, log_output)1078 # construct final invocation result1079 invocation_result = InvocationResult(result, log_output=log_output)1080 # run plugins post-processing logic1081 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082 return invocation_result1083 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1084 # This is a no-op for local executors - simply return the given local file path1085 return local_file1086 def execute_java_lambda(1087 self, event, context, main_file, lambda_function: LambdaFunction = None1088 ) -> InvocationResult:1089 lambda_function.envvars = lambda_function.envvars or {}1090 java_opts = config.LAMBDA_JAVA_OPTS or ""1091 handler = lambda_function.handler1092 lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1093 event_file = EVENT_FILE_PATTERN.replace("*", short_uid())1094 save_file(event_file, json.dumps(json_safe(event)))1095 TMP_FILES.append(event_file)1096 classpath = "%s:%s:%s" % (1097 main_file,1098 Util.get_java_classpath(main_file),1099 LAMBDA_EXECUTOR_JAR,1100 )1101 cmd = "java %s -cp %s %s %s" % (1102 java_opts,1103 classpath,1104 LAMBDA_EXECUTOR_CLASS,1105 event_file,1106 )1107 # apply plugin patches1108 inv_context = InvocationContext(1109 lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1110 )1111 result = self.apply_plugin_patches(inv_context)1112 if isinstance(result, InvocationResult):1113 return result1114 cmd = inv_context.lambda_command1115 LOG.info(cmd)1116 # execute Lambda and get invocation result1117 invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1118 return invocation_result1119 def execute_javascript_lambda(1120 self, event, context, main_file, lambda_function: LambdaFunction = None1121 ):1122 handler = lambda_function.handler1123 function = handler.split(".")[-1]1124 event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1125 context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!