Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py
...326 aws_stack.inject_test_credentials_into_env(result)327 # injecting the region into the docker environment328 aws_stack.inject_region_into_env(result, lambda_function.region())329 return result330 def _lambda_result_to_destination(331 self,332 func_details: LambdaFunction,333 event: Dict,334 result: InvocationResult,335 is_async: bool,336 error: Optional[InvocationException],337 ):338 if not func_details.destination_enabled():339 return340 payload = {341 "version": "1.0",342 "timestamp": timestamp_millis(),343 "requestContext": {344 "requestId": long_uid(),345 "functionArn": func_details.arn(),346 "condition": "RetriesExhausted",347 "approximateInvokeCount": 1,348 },349 "requestPayload": event,350 "responseContext": {"statusCode": 200, "executedVersion": "$LATEST"},351 "responsePayload": {},352 }353 if result and result.result:354 try:355 payload["requestContext"]["condition"] = "Success"356 payload["responsePayload"] = json.loads(result.result)357 except Exception:358 payload["responsePayload"] = result.result359 if error:360 payload["responseContext"]["functionError"] = "Unhandled"361 # add the result in the response payload362 if error.result is not None:363 payload["responsePayload"] = json.loads(error.result)364 send_event_to_target(func_details.on_failed_invocation, payload)365 return366 if func_details.on_successful_invocation is not None:367 send_event_to_target(func_details.on_successful_invocation, payload)368 def execute(369 self,370 func_arn: str, # TODO remove and get from lambda_function371 lambda_function: LambdaFunction,372 event: Dict,373 context: LambdaContext = None,374 version: str = None,375 asynchronous: bool = False,376 callback: Callable = None,377 lock_discriminator: str = None,378 ):379 def do_execute(*args):380 @cloudwatched("lambda")381 def _run(func_arn=None):382 with contextlib.ExitStack() as stack:383 if lock_discriminator:384 stack.enter_context(LAMBDA_ASYNC_LOCKS.locks[lock_discriminator])385 # set the invocation time in 
milliseconds386 invocation_time = int(time.time() * 1000)387 # start the execution388 raised_error = None389 result = None390 invocation_type = "Event" if asynchronous else "RequestResponse"391 inv_context = InvocationContext(392 lambda_function,393 event=event,394 function_version=version,395 context=context,396 invocation_type=invocation_type,397 )398 try:399 result = self._execute(lambda_function, inv_context)400 except Exception as e:401 raised_error = e402 handle_error(lambda_function, event, e, asynchronous)403 raise e404 finally:405 self.function_invoke_times[func_arn] = invocation_time406 if callback:407 callback(result, func_arn, event, error=raised_error)408 self._lambda_result_to_destination(409 lambda_function, event, result, asynchronous, raised_error410 )411 # return final result412 return result413 return _run(func_arn=func_arn)414 # Inform users about asynchronous mode of the lambda execution.415 if asynchronous:416 LOG.debug(417 "Lambda executed in Event (asynchronous) mode, no response will be returned to caller"418 )419 FuncThread(do_execute).start()420 return InvocationResult(None, log_output="Lambda executed asynchronously.")421 return do_execute()422 def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!