Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py
...531 LOG.debug(532 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)533 )534 # store log output - TODO get live logs from `process` above?535 store_lambda_logs(lambda_function, log_output)536 if error:537 raise InvocationException(538 "Lambda process returned with error. Result: %s. Output:\n%s"539 % (result, log_output),540 log_output,541 result,542 ) from error543 # create result544 invocation_result = InvocationResult(result, log_output=log_output)545 # run plugins post-processing logic546 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)547 return invocation_result548 def prepare_event(self, environment: Dict, event_body: str) -> bytes:549 """Return the event as a stdin string."""550 # amend the environment variables for execution551 environment["AWS_LAMBDA_EVENT_BODY"] = event_body552 return event_body.encode()553 def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):554 runtime = lambda_function.runtime555 handler = lambda_function.handler556 environment = inv_context.environment = self._prepare_environment(lambda_function)557 event = inv_context.event558 context = inv_context.context559 # configure USE_SSL in environment560 if config.USE_SSL:561 environment["USE_SSL"] = "1"562 # prepare event body563 if not event:564 LOG.info(565 'Empty event body specified for invocation of Lambda "%s"' % lambda_function.arn()566 )567 event = {}568 event_body = json.dumps(json_safe(event))569 event_bytes_for_stdin = self.prepare_event(environment, event_body)570 inv_context.event = event_bytes_for_stdin571 Util.inject_endpoints_into_env(environment)572 environment["EDGE_PORT"] = str(config.EDGE_PORT)573 environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler574 if os.environ.get("HTTP_PROXY"):575 environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]576 if lambda_function.timeout:577 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)578 if context:579 
environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name580 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version581 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn582 environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity or {})583 if context.client_context is not None:584 environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(585 to_str(base64.b64decode(to_bytes(context.client_context)))586 )587 # pass JVM options to the Lambda environment, if configured588 if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):589 if environment.get("JAVA_TOOL_OPTIONS"):590 LOG.info(591 "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"592 )593 else:594 LOG.debug(595 "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s"596 % config.LAMBDA_JAVA_OPTS597 )598 environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS599 # accept any self-signed certificates for outgoing calls from the Lambda600 if is_nodejs_runtime(runtime):601 environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"602 # run Lambda executor and fetch invocation result603 LOG.info("Running lambda: %s" % lambda_function.arn())604 result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)605 return result606 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:607 if config.LAMBDA_REMOTE_DOCKER:608 LOG.info("TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)609 return local_file610 mountable_file = Util.get_host_path_for_path_in_docker(local_file)611 _, extension = os.path.splitext(local_file)612 target_file_name = f"{md5(local_file)}{extension}"613 target_path = f"/tmp/{target_file_name}"614 inv_context.docker_flags = inv_context.docker_flags or ""615 inv_context.docker_flags += f"-v {mountable_file}:{target_path}"616 return target_path617class 
LambdaExecutorReuseContainers(LambdaExecutorContainers):618 """Executor class for executing Lambda functions in re-usable Docker containers"""619 def __init__(self):620 super(LambdaExecutorReuseContainers, self).__init__()621 # locking thread for creation/destruction of docker containers.622 self.docker_container_lock = threading.RLock()623 # On each invocation we try to construct a port unlikely to conflict624 # with a previously invoked lambda function. This is a problem with at625 # least the lambci/lambda:go1.x container, which execs a go program that626 # attempts to bind to the same default port.627 self.next_port = 0628 self.max_port = LAMBDA_SERVER_UNIQUE_PORTS629 self.port_offset = LAMBDA_SERVER_PORT_OFFSET630 def execute_in_container(631 self,632 lambda_function: LambdaFunction,633 inv_context: InvocationContext,634 stdin=None,635 background=False,636 ) -> Tuple[bytes, bytes]:637 func_arn = lambda_function.arn()638 lambda_cwd = lambda_function.cwd639 runtime = lambda_function.runtime640 env_vars = inv_context.environment641 # check whether the Lambda has been invoked before642 has_been_invoked_before = func_arn in self.function_invoke_times643 # Choose a port for this invocation644 with self.docker_container_lock:645 env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)646 self.next_port = (self.next_port + 1) % self.max_port647 # create/verify the docker container is running.648 LOG.debug(649 'Priming docker container with runtime "%s" and arn "%s".',650 runtime,651 func_arn,652 )653 container_info = self.prime_docker_container(654 lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags655 )656 if not inv_context.lambda_command and inv_context.handler:657 command = container_info.entry_point.split()658 command.append(inv_context.handler)659 inv_context.lambda_command = command660 # determine files to be copied into the container661 if not has_been_invoked_before and config.LAMBDA_REMOTE_DOCKER:662 # if this is the first 
invocation: copy the entire folder into the container663 DOCKER_CLIENT.copy_into_container(664 container_info.name, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER665 )666 return DOCKER_CLIENT.exec_in_container(667 container_name_or_id=container_info.name,668 command=inv_context.lambda_command,669 interactive=True,670 env_vars=env_vars,671 stdin=stdin,672 )673 def _execute(self, func_arn, *args, **kwargs):674 if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):675 concurrency_lock = threading.RLock()676 LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock677 with LAMBDA_CONCURRENCY_LOCK[func_arn]:678 return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)679 def startup(self):680 self.cleanup()681 # start a process to remove idle containers682 if config.LAMBDA_REMOVE_CONTAINERS:683 self.start_idle_container_destroyer_interval()684 def cleanup(self, arn=None):685 if arn:686 self.function_invoke_times.pop(arn, None)687 return self.destroy_docker_container(arn)688 self.function_invoke_times = {}689 return self.destroy_existing_docker_containers()690 def prime_docker_container(691 self,692 lambda_function: LambdaFunction,693 env_vars: Dict,694 lambda_cwd: str,695 docker_flags: str = None,696 ):697 """698 Prepares a persistent docker container for a specific function.699 :param lambda_function: The Details of the lambda function.700 :param env_vars: The environment variables for the lambda.701 :param lambda_cwd: The local directory containing the code for the lambda function.702 :return: ContainerInfo class containing the container name and default entry point.703 """704 with self.docker_container_lock:705 # Get the container name and id.706 func_arn = lambda_function.arn()707 container_name = self.get_container_name(func_arn)708 status = self.get_docker_container_status(func_arn)709 LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710 docker_image = Util.docker_image_for_lambda(lambda_function)711 # Container is not running 
or doesn't exist.712 if status < 1:713 # Make sure the container does not exist in any form/state.714 self.destroy_docker_container(func_arn)715 # get container startup command and run it716 LOG.debug("Creating container: %s" % container_name)717 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718 if config.LAMBDA_REMOTE_DOCKER:719 LOG.debug(720 'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721 )722 DOCKER_CLIENT.copy_into_container(723 container_name, "%s/." % lambda_cwd, DOCKER_TASK_FOLDER724 )725 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726 DOCKER_CLIENT.start_container(container_name)727 # give the container some time to start up728 time.sleep(1)729 container_network = self.get_docker_container_network(func_arn)730 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731 LOG.debug(732 'Using entrypoint "%s" for container "%s" on network "%s".'733 % (entry_point, container_name, container_network)734 )735 return ContainerInfo(container_name, entry_point)736 def create_container(737 self,738 lambda_function: LambdaFunction,739 env_vars: Dict,740 lambda_cwd: str,741 docker_flags: str = None,742 ):743 docker_image = Util.docker_image_for_lambda(lambda_function)744 container_name = self.get_container_name(lambda_function.arn())745 # make sure we set LOCALSTACK_HOSTNAME746 Util.inject_endpoints_into_env(env_vars)747 # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." 
above)748 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749 network = config.LAMBDA_DOCKER_NETWORK750 additional_flags = docker_flags751 dns = config.LAMBDA_DOCKER_DNS752 mount_volumes = not config.LAMBDA_REMOTE_DOCKER753 lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754 if ":" in lambda_cwd and "\\" in lambda_cwd:755 lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756 mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757 if os.environ.get("HOSTNAME"):758 env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759 env_vars["EDGE_PORT"] = config.EDGE_PORT760 LOG.debug(761 "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762 )763 return DOCKER_CLIENT.create_container(764 image_name=docker_image,765 remove=True,766 interactive=True,767 name=container_name,768 entrypoint="/bin/bash",769 network=network,770 env_vars=env_vars,771 dns=dns,772 mount_volumes=mount_volumes,773 additional_flags=additional_flags,774 )775 def destroy_docker_container(self, func_arn):776 """777 Stops and/or removes a docker container for a specific lambda function ARN.778 :param func_arn: The ARN of the lambda function.779 :return: None780 """781 with self.docker_container_lock:782 status = self.get_docker_container_status(func_arn)783 # Get the container name and id.784 container_name = self.get_container_name(func_arn)785 if status == 1:786 LOG.debug("Stopping container: %s" % container_name)787 DOCKER_CLIENT.stop_container(container_name)788 status = self.get_docker_container_status(func_arn)789 if status == -1:790 LOG.debug("Removing container: %s" % container_name)791 rm_docker_container(container_name, safe=True)792 # clean up function invoke times, as some init logic depends on this793 self.function_invoke_times.pop(func_arn, None)794 def get_all_container_names(self):795 """796 Returns a list of container names for lambda containers.797 :return: A String[] localstack docker container 
names for each function.798 """799 with self.docker_container_lock:800 LOG.debug("Getting all lambda containers names.")801 list_result = DOCKER_CLIENT.list_containers(802 filter=f"name={self.get_container_prefix()}*"803 )804 container_names = list(map(lambda container: container["name"], list_result))805 return container_names806 def destroy_existing_docker_containers(self):807 """808 Stops and/or removes all lambda docker containers for localstack.809 :return: None810 """811 with self.docker_container_lock:812 container_names = self.get_all_container_names()813 LOG.debug("Removing %d containers." % len(container_names))814 for container_name in container_names:815 DOCKER_CLIENT.remove_container(container_name)816 def get_docker_container_status(self, func_arn):817 """818 Determine the status of a docker container.819 :param func_arn: The ARN of the lambda function.820 :return: 1 If the container is running,821 -1 if the container exists but is not running822 0 if the container does not exist.823 """824 with self.docker_container_lock:825 # Get the container name and id.826 container_name = self.get_container_name(func_arn)827 container_status = DOCKER_CLIENT.get_container_status(container_name)828 return container_status.value829 def get_docker_container_network(self, func_arn):830 """831 Determine the network of a docker container.832 :param func_arn: The ARN of the lambda function.833 :return: name of the container network834 """835 with self.docker_container_lock:836 status = self.get_docker_container_status(func_arn)837 # container does not exist838 if status == 0:839 return ""840 # Get the container name.841 container_name = self.get_container_name(func_arn)842 container_network = DOCKER_CLIENT.get_network(container_name)843 return container_network844 def idle_container_destroyer(self):845 """846 Iterates though all the lambda containers and destroys any container that has847 been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848 :return: None849 
"""850 LOG.debug("Checking if there are idle containers ...")851 current_time = int(time.time() * 1000)852 for func_arn, last_run_time in dict(self.function_invoke_times).items():853 duration = current_time - last_run_time854 # not enough idle time has passed855 if duration < MAX_CONTAINER_IDLE_TIME_MS:856 continue857 # container has been idle, destroy it.858 self.destroy_docker_container(func_arn)859 def start_idle_container_destroyer_interval(self):860 """861 Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862 Thus checking for idle containers and destroying them.863 :return: None864 """865 self.idle_container_destroyer()866 threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867 def get_container_prefix(self) -> str:868 """869 Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870 :return: Lambda container name prefix871 """872 return f"{bootstrap.get_main_container_name()}_lambda_"873 def get_container_name(self, func_arn):874 """875 Given a function ARN, returns a valid docker container name.876 :param func_arn: The ARN of the lambda function.877 :return: A docker compatible name for the arn.878 """879 return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881 def __init__(self):882 super(LambdaExecutorSeparateContainers, self).__init__()883 self.max_port = LAMBDA_API_UNIQUE_PORTS884 self.port_offset = LAMBDA_API_PORT_OFFSET885 def prepare_event(self, environment: Dict, event_body: str) -> bytes:886 # Tell Lambci to use STDIN for the event887 environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888 return event_body.encode()889 def execute_in_container(890 self,891 lambda_function: LambdaFunction,892 inv_context: InvocationContext,893 stdin=None,894 background=False,895 ) -> Tuple[bytes, bytes]:896 lambda_cwd = lambda_function.cwd897 env_vars = inv_context.environment898 
entrypoint = None899 if inv_context.lambda_command:900 entrypoint = ""901 elif inv_context.handler:902 inv_context.lambda_command = inv_context.handler903 # add Docker Lambda env vars904 network = config.LAMBDA_DOCKER_NETWORK or None905 if network == "host":906 port = get_free_tcp_port()907 env_vars["DOCKER_LAMBDA_API_PORT"] = port908 env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909 additional_flags = inv_context.docker_flags or ""910 dns = config.LAMBDA_DOCKER_DNS911 docker_java_ports = PortMappings()912 if Util.debug_java_port:913 docker_java_ports.add(Util.debug_java_port)914 docker_image = Util.docker_image_for_lambda(lambda_function)915 if config.LAMBDA_REMOTE_DOCKER:916 container_id = DOCKER_CLIENT.create_container(917 image_name=docker_image,918 interactive=True,919 entrypoint=entrypoint,920 remove=True,921 network=network,922 env_vars=env_vars,923 dns=dns,924 additional_flags=additional_flags,925 ports=docker_java_ports,926 command=inv_context.lambda_command,927 )928 DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER)929 return DOCKER_CLIENT.start_container(930 container_id, interactive=not background, attach=not background, stdin=stdin931 )932 else:933 mount_volumes = None934 if lambda_cwd:935 mount_volumes = [936 (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937 ]938 return DOCKER_CLIENT.run_container(939 image_name=docker_image,940 interactive=True,941 detach=background,942 entrypoint=entrypoint,943 remove=True,944 network=network,945 env_vars=env_vars,946 dns=dns,947 additional_flags=additional_flags,948 command=inv_context.lambda_command,949 mount_volumes=mount_volumes,950 stdin=stdin,951 )952class LambdaExecutorLocal(LambdaExecutor):953 def _execute_in_custom_runtime(954 self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955 ) -> InvocationResult:956 """957 Generic run function for executing lambdas in custom runtimes.958 :param cmd: the command to execute959 :param 
lambda_function: function details960 :return: the InvocationResult961 """962 env_vars = lambda_function and lambda_function.envvars963 kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964 process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965 result, log_output = process.communicate()966 try:967 result = to_str(result).strip()968 except Exception:969 pass970 log_output = to_str(log_output).strip()971 return_code = process.returncode972 # Note: The user's code may have been logging to stderr, in which case the logs973 # will be part of the "result" variable here. Hence, make sure that we extract974 # only the *last* line of "result" and consider anything above that as log output.975 # TODO: not sure if this code is needed/used976 if isinstance(result, str) and "\n" in result:977 lines = result.split("\n")978 idx = last_index_of(979 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980 )981 if idx >= 0:982 result = lines[idx]983 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984 log_output += "\n%s" % additional_logs985 log_formatted = log_output.strip().replace("\n", "\n> ")986 func_arn = lambda_function and lambda_function.arn()987 LOG.debug(988 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989 )990 # store log output - TODO get live logs from `process` above?991 # store_lambda_logs(lambda_function, log_output)992 if return_code != 0:993 raise InvocationException(994 "Lambda process returned error status code: %s. Result: %s. 
Output:\n%s"995 % (return_code, result, log_output),996 log_output,997 result,998 )999 invocation_result = InvocationResult(result, log_output=log_output)1000 return invocation_result1001 def _execute(1002 self, lambda_function: LambdaFunction, inv_context: InvocationContext1003 ) -> InvocationResult:1004 # apply plugin patches to prepare invocation context1005 result = self.apply_plugin_patches(inv_context)1006 if isinstance(result, InvocationResult):1007 return result1008 lambda_cwd = lambda_function.cwd1009 environment = self._prepare_environment(lambda_function)1010 if lambda_function.timeout:1011 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012 context = inv_context.context1013 if context:1014 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017 environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018 # execute the Lambda function in a forked sub-process, sync result via queue1019 queue = Queue()1020 lambda_function_callable = lambda_function.function(inv_context.function_version)1021 def do_execute():1022 # now we're executing in the child process, safe to change CWD and ENV1023 result = None1024 try:1025 if lambda_cwd:1026 os.chdir(lambda_cwd)1027 sys.path.insert(0, "")1028 if environment:1029 os.environ.update(environment)1030 # set default env variables required for most Lambda handlers1031 self.set_default_env_variables()1032 # run the actual handler function1033 result = lambda_function_callable(inv_context.event, context)1034 except Exception as e:1035 result = str(e)1036 sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037 raise1038 finally:1039 queue.put(result)1040 process = Process(target=do_execute)1041 start_time = now(millis=True)1042 error = None1043 with CaptureOutput() as c:1044 try:1045 process.run()1046 except 
Exception as e:1047 error = e1048 result = queue.get()1049 end_time = now(millis=True)1050 # Make sure to keep the log line below, to ensure the log stream gets created1051 request_id = long_uid()1052 log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053 request_id,1054 lambda_function.arn(),1055 )1056 # TODO: Interweaving stdout/stderr currently not supported1057 for stream in (c.stdout(), c.stderr()):1058 if stream:1059 log_output += ("\n" if log_output else "") + stream1060 if isinstance(result, InvocationResult) and result.log_output:1061 log_output += "\n" + result.log_output1062 log_output += "\nEND RequestId: %s" % request_id1063 log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064 request_id,1065 int((end_time - start_time) * 1000),1066 )1067 # store logs to CloudWatch1068 store_lambda_logs(lambda_function, log_output)1069 result = result.result if isinstance(result, InvocationResult) else result1070 if error:1071 LOG.info(1072 'Error executing Lambda "%s": %s %s',1073 lambda_function.arn(),1074 error,1075 "".join(traceback.format_tb(error.__traceback__)),1076 )1077 raise InvocationException(result, log_output)1078 # construct final invocation result1079 invocation_result = InvocationResult(result, log_output=log_output)1080 # run plugins post-processing logic1081 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082 return invocation_result...
lambda_utils.py
Source:lambda_utils.py
...92 return runtime.startswith("nodejs")93def is_python_runtime(lambda_details):94 runtime = getattr(lambda_details, "runtime", lambda_details) or ""95 return runtime.startswith("python")96def store_lambda_logs(97 lambda_function: LambdaFunction, log_output: str, invocation_time=None, container_id=None98):99 # leave here to avoid import issues from CLI100 from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs101 log_group_name = "/aws/lambda/%s" % lambda_function.name()102 container_id = container_id or short_uid()103 invocation_time = invocation_time or int(time.time() * 1000)104 invocation_time_secs = int(invocation_time / 1000)105 time_str = time.strftime("%Y/%m/%d", time.gmtime(invocation_time_secs))106 log_stream_name = "%s/[LATEST]%s" % (time_str, container_id)107 return store_cloudwatch_logs(log_group_name, log_stream_name, log_output, invocation_time)108def get_main_endpoint_from_container():109 global DOCKER_MAIN_CONTAINER_IP110 if not config.HOSTNAME_FROM_LAMBDA and DOCKER_MAIN_CONTAINER_IP is None:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!