How to use _should_use_stay_open_mode method in localstack

Best Python code snippet using localstack_python

lambda_executors.py

Source: lambda_executors.py on GitHub

copy

Full Screen

...695 command = shlex.split(container_info.entry_point)696 command.append(inv_context.handler)697 inv_context.lambda_command = command698 lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_info.name)699 if not self._should_use_stay_open_mode(lambda_function, lambda_docker_ip, check_port=True):700 LOG.debug("Using 'docker exec' to run invocation in docker-reuse Lambda container")701 # disable stay open mode for this one, to prevent starting runtime API server702 env_vars["DOCKER_LAMBDA_STAY_OPEN"] = None703 return DOCKER_CLIENT.exec_in_container(704 container_name_or_id=container_info.name,705 command=inv_context.lambda_command,706 interactive=True,707 env_vars=env_vars,708 stdin=stdin,709 )710 inv_result = self.invoke_lambda(lambda_function, inv_context, lambda_docker_ip)711 return (inv_result.result, inv_result.log_output)712 def invoke_lambda(713 self,714 lambda_function: LambdaFunction,715 inv_context: InvocationContext,716 lambda_docker_ip=None,717 ) -> InvocationResult:718 full_url = self._get_lambda_stay_open_url(lambda_docker_ip)719 client = aws_stack.connect_to_service("lambda", endpoint_url=full_url)720 event = inv_context.event or "{}"721 LOG.debug(f"Calling {full_url} to run invocation in docker-reuse Lambda container")722 response = client.invoke(723 FunctionName=lambda_function.name(),724 InvocationType=inv_context.invocation_type,725 Payload=to_bytes(event),726 LogType="Tail",727 )728 log_output = base64.b64decode(response.get("LogResult") or b"").decode("utf-8")729 result = response["Payload"].read().decode("utf-8")730 if "FunctionError" in response:731 raise InvocationException(732 "Lambda process returned with error. Result: %s. 
Output:\n%s"733 % (result, log_output),734 log_output,735 result,736 )737 return InvocationResult(result, log_output)738 def _should_use_stay_open_mode(739 self,740 lambda_function: LambdaFunction,741 lambda_docker_ip: Optional[str] = None,742 check_port: bool = False,743 ) -> bool:744 """Return whether to use stay-open execution mode - if we're running in Docker, the given IP745 is defined, and if the target API endpoint is available (optionally, if check_port is True)."""746 if not lambda_docker_ip:747 func_arn = lambda_function.arn()748 container_name = self.get_container_name(func_arn)749 lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_name_or_id=container_name)750 should_use = lambda_docker_ip and config.LAMBDA_STAY_OPEN_MODE751 if not should_use or not check_port:752 return should_use753 full_url = self._get_lambda_stay_open_url(lambda_docker_ip)754 return is_port_open(full_url)755 def _get_lambda_stay_open_url(self, lambda_docker_ip: str) -> str:756 return f"http://{lambda_docker_ip}:{STAY_OPEN_API_PORT}"757 def _execute(self, func_arn: str, *args, **kwargs) -> InvocationResult:758 if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):759 concurrency_lock = threading.RLock()760 LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock761 with LAMBDA_CONCURRENCY_LOCK[func_arn]:762 return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)763 def startup(self):764 self.cleanup()765 # start a process to remove idle containers766 if config.LAMBDA_REMOVE_CONTAINERS:767 self.start_idle_container_destroyer_interval()768 def cleanup(self, arn: str = None):769 if arn:770 self.function_invoke_times.pop(arn, None)771 return self.destroy_docker_container(arn)772 self.function_invoke_times = {}773 return self.destroy_existing_docker_containers()774 def prime_docker_container(775 self,776 lambda_function: LambdaFunction,777 env_vars: Dict,778 lambda_cwd: str,779 docker_flags: str = None,780 ):781 """782 Prepares a persistent docker container for a 
specific function.783 :param lambda_function: The Details of the lambda function.784 :param env_vars: The environment variables for the lambda.785 :param lambda_cwd: The local directory containing the code for the lambda function.786 :return: ContainerInfo class containing the container name and default entry point.787 """788 with self.docker_container_lock:789 # Get the container name and id.790 func_arn = lambda_function.arn()791 container_name = self.get_container_name(func_arn)792 status = self.get_docker_container_status(func_arn)793 LOG.debug('Priming Docker container (status "%s"): %s', status, container_name)794 docker_image = Util.docker_image_for_lambda(lambda_function)795 # Container is not running or doesn't exist.796 if status < 1:797 # Make sure the container does not exist in any form/state.798 self.destroy_docker_container(func_arn)799 # get container startup command and run it800 LOG.debug("Creating container: %s", container_name)801 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)802 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)803 DOCKER_CLIENT.start_container(container_name)804 def wait_up():805 cont_status = DOCKER_CLIENT.get_container_status(container_name)806 assert cont_status == DockerContainerStatus.UP807 if not in_docker():808 return809 # if we're executing in Docker using stay-open mode, additionally check if the target is available810 lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_name)811 if self._should_use_stay_open_mode(lambda_function, lambda_docker_ip):812 full_url = self._get_lambda_stay_open_url(lambda_docker_ip)813 wait_for_port_open(full_url, sleep_time=0.5, retries=8)814 # give the container some time to start up815 retry(wait_up, retries=15, sleep=0.8)816 container_network = self.get_docker_container_network(func_arn)817 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)818 LOG.debug(819 'Using entrypoint "%s" for container "%s" on network 
"%s".',820 entry_point,821 container_name,822 container_network,823 )824 return ContainerInfo(container_name, entry_point)825 def create_container(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful