Best Python code snippet using localstack_python
lambda_api.py
Source:lambda_api.py
# NOTE(review): this excerpt starts inside a definition whose header is not
# visible. Its two trailing statements are preserved verbatim below (as comments,
# because they cannot stand on their own without the enclosing definition):
#         result.append(m)
#     return result


def get_function_version(arn, version):
    """Return the formatted details of a single version of a Lambda function.

    :param arn: key of the function in ``arn_to_lambda``.
    :param version: version identifier to format (e.g. ``'$LATEST'`` or ``'1'``).
    :returns: the dict built by ``format_func_details`` (called with
        ``always_add_version=True``).
    """
    func = arn_to_lambda.get(arn)
    return format_func_details(func, version=version, always_add_version=True)


def publish_new_function_version(arn):
    """Register a new numbered version that copies the current ``$LATEST`` entry.

    The new version number is ``max_version() + 1``; ``CodeSize``, ``CodeSha256``
    and ``Function`` are copied from ``$LATEST`` and a fresh ``RevisionId`` is
    generated.

    :param arn: key of the function in ``arn_to_lambda``.
    :returns: formatted details of the newly published version.
    """
    func_details = arn_to_lambda.get(arn)
    versions = func_details.versions
    new_version = str(func_details.max_version() + 1)
    latest = versions.get('$LATEST')
    versions[new_version] = {
        'CodeSize': latest.get('CodeSize'),
        'CodeSha256': latest.get('CodeSha256'),
        'Function': latest.get('Function'),
        'RevisionId': str(uuid.uuid4())
    }
    return get_function_version(arn, new_version)


def do_list_versions(arn):
    """Return the formatted details of all versions of a function, in order.

    Non-numeric versions (i.e. ``$LATEST``) come first, followed by the numbered
    versions in ascending *numeric* order. (A plain string sort would place
    version ``'10'`` before version ``'2'``.)

    :param arn: key of the function in ``arn_to_lambda``.
    :returns: list of dicts as produced by ``get_function_version``.
    """
    def _version_order(details):
        version = str(details.get('Version'))
        if version.isdigit():
            return (1, int(version), version)
        return (0, 0, version)

    versions = arn_to_lambda.get(arn).versions
    return sorted((get_function_version(arn, version) for version in versions),
                  key=_version_order)


def do_update_alias(arn, alias, version, description=None):
    """Create or overwrite an alias of a function and return the alias record.

    :param arn: key of the function in ``arn_to_lambda``.
    :param alias: alias name; also used as the key in the function's ``aliases``.
    :param version: function version the alias points to.
    :param description: optional description (stored as ``''`` when falsy).
    :returns: the stored alias dict.
    """
    new_alias = {
        'AliasArn': arn + ':' + alias,
        'FunctionVersion': version,
        'Name': alias,
        'Description': description or '',
        'RevisionId': str(uuid.uuid4())
    }
    arn_to_lambda.get(arn).aliases[alias] = new_alias
    return new_alias


@cloudwatched('lambda')
def run_lambda(event, context, func_arn, version=None, suppress_output=False, asynchronous=False):
    """Execute a Lambda function through ``LAMBDA_EXECUTOR``.

    :param event: event payload handed to the executor.
    :param context: invocation context; a ``LambdaContext`` is created when falsy.
    :param func_arn: ARN of the function (normalized via ``aws_stack.fix_arn``).
    :param version: optional function version passed on to the executor.
    :param suppress_output: if true, ``sys.stdout``/``sys.stderr`` are redirected
        to an in-memory stream for the duration of the call.
    :param asynchronous: passed on to the executor.
    :returns: the executor's result, or an error response if execution raised.
    """
    if suppress_output:
        stdout_ = sys.stdout
        stderr_ = sys.stderr
        stream = StringIO()
        sys.stdout = stream
        sys.stderr = stream
    try:
        func_arn = aws_stack.fix_arn(func_arn)
        func_details = arn_to_lambda.get(func_arn)
        if not context:
            context = LambdaContext(func_details, version)
        # the log output returned by the executor is not used here
        result, _log_output = LAMBDA_EXECUTOR.execute(
            func_arn, func_details, event,
            context=context, version=version, asynchronous=asynchronous)
    except Exception as e:
        return error_response('Error executing Lambda function %s: %s %s' % (func_arn, e, traceback.format_exc()))
    finally:
        # always restore the original streams, also on error
        if suppress_output:
            sys.stdout = stdout_
            sys.stderr = stderr_
    return result


def exec_lambda_code(script, handler_function='handler', lambda_cwd=None, lambda_env=None):
    """Load a Python Lambda script as a module and return its handler function.

    The script is written to a temporary file and loaded with ``imp.load_source``.
    When ``lambda_cwd`` and/or ``lambda_env`` are given, the working directory,
    ``sys.path`` and ``os.environ`` are modified while the module is loaded; this
    is guarded by ``exec_mutex`` and undone afterwards, even if loading fails.

    :param script: source code of the Lambda script.
    :param handler_function: name of the handler to look up in the loaded module.
    :param lambda_cwd: optional directory to ``chdir`` into and prepend to ``sys.path``.
    :param lambda_env: optional dict of environment variables to set while loading.
    :returns: the object named ``handler_function`` from the loaded module.
    :raises KeyError: if the module does not define ``handler_function``.
    :raises Exception: whatever loading the script raises (logged, then re-raised).
    """
    needs_lock = bool(lambda_cwd or lambda_env)
    previous_cwd = None
    previous_env = None
    path_extended = False

    if needs_lock:
        exec_mutex.acquire()
    try:
        if lambda_cwd:
            previous_cwd = os.getcwd()
            os.chdir(lambda_cwd)
            sys.path = [lambda_cwd] + sys.path
            path_extended = True
        if lambda_env:
            previous_env = dict(os.environ)
            os.environ.update(lambda_env)

        # generate lambda file name
        lambda_id = 'l_%s' % short_uid()
        lambda_file = LAMBDA_SCRIPT_PATTERN.replace('*', lambda_id)
        save_file(lambda_file, script)

        # delete temporary .py and .pyc files on exit
        TMP_FILES.append(lambda_file)
        TMP_FILES.append('%sc' % lambda_file)

        try:
            # NOTE(review): `imp` is deprecated (removed in Python 3.12); kept here
            # because the rest of the file is not visible - consider importlib.
            handler_module = imp.load_source(lambda_id, lambda_file)
            module_vars = handler_module.__dict__
        except Exception:
            LOG.error('Unable to exec: %s %s' % (script, traceback.format_exc()))
            raise
    finally:
        if needs_lock:
            try:
                if previous_cwd is not None:
                    os.chdir(previous_cwd)
                if path_extended:
                    # remove the entry added above; the loaded module may itself
                    # have inserted entries in front of it
                    try:
                        sys.path.remove(lambda_cwd)
                    except ValueError:
                        pass
                if previous_env is not None:
                    # Restore in place: rebinding `os.environ` to a plain dict
                    # would leave the real process environment modified.
                    os.environ.clear()
                    os.environ.update(previous_env)
            finally:
                exec_mutex.release()

    return module_vars[handler_function]


def get_handler_file_from_name(handler_name, runtime=LAMBDA_DEFAULT_RUNTIME):
    """Derive the handler file name from a handler specification and runtime.

    The part of ``handler_name`` before the first delimiter (``':'`` for the
    .NET Core runtimes, ``'.'`` otherwise) is combined with a runtime-specific
    file extension (``.py`` when no other runtime prefix matches).

    :param handler_name: handler specification string.
    :param runtime: runtime identifier, matched by prefix.
    :returns: file name of the handler.
    """
    # TODO: support Java Lambdas in the future
    delimiter = '.'
    if runtime.startswith(LAMBDA_RUNTIME_NODEJS):
        file_ext = '.js'
    elif runtime.startswith(LAMBDA_RUNTIME_GOLANG):
        file_ext = ''
    elif runtime.startswith((LAMBDA_RUNTIME_DOTNETCORE2, LAMBDA_RUNTIME_DOTNETCORE21)):
        file_ext = '.dll'
        delimiter = ':'
    elif runtime.startswith(LAMBDA_RUNTIME_RUBY):
        file_ext = '.rb'
    elif runtime.startswith(LAMBDA_RUNTIME_CUSTOM_RUNTIME):
        file_ext = '.sh'
    else:
        file_ext = '.py'
    return '%s%s' % (handler_name.split(delimiter)[0], file_ext)


def get_handler_function_from_name(handler_name, runtime=LAMBDA_DEFAULT_RUNTIME):
    """Return the function part of a handler specification.

    This is the last ``':'``-separated component for the .NET Core runtimes and
    the last ``'.'``-separated component for all other runtimes.

    :param handler_name: handler specification string.
    :param runtime: runtime identifier, matched by prefix.
    :returns: name of the handler function.
    """
    # TODO: support Java Lambdas in the future
    if runtime.startswith((LAMBDA_RUNTIME_DOTNETCORE2, LAMBDA_RUNTIME_DOTNETCORE21)):
        return handler_name.split(':')[-1]
    return handler_name.split('.')[-1]


def error_response(msg, code=500, error_type='InternalFailure'):
    """Log ``msg`` as a warning and build the corresponding error response.

    :param msg: error message.
    :param code: status code passed to ``aws_responses.flask_error_response``.
    :param error_type: error type passed to ``aws_responses.flask_error_response``.
    :returns: the response created by ``aws_responses.flask_error_response``.
    """
    LOG.warning(msg)
    return aws_responses.flask_error_response(msg, code=code, error_type=error_type)


def get_zip_bytes(function_code):
    """Returns the ZIP file contents from a FunctionCode dict.

    :type function_code: dict
    :param function_code: https://docs.aws.amazon.com/lambda/latest/dg/API_FunctionCode.html
    :returns: bytes of the Zip file.
    :raises ClientError: if the archive cannot be downloaded from S3, or if
        neither ``S3Bucket`` nor ``ZipFile`` is present.
    """
    if 'S3Bucket' in function_code:
        s3_client = aws_stack.connect_to_service('s3')
        bytes_io = BytesIO()
        try:
            s3_client.download_fileobj(function_code['S3Bucket'], function_code['S3Key'], bytes_io)
            zip_file_content = bytes_io.getvalue()
        except Exception as e:
            raise ClientError('Unable to fetch Lambda archive from S3: %s' % e, 404)
    elif 'ZipFile' in function_code:
        # inline archives are transferred base64-encoded
        zip_file_content = function_code['ZipFile']
        zip_file_content = base64.b64decode(zip_file_content)
    else:
        raise ClientError('No valid Lambda archive specified.')
    return zip_file_content


def get_java_handler(zip_file_content, handler, main_file):
    """Creates a Java handler from an uploaded ZIP or JAR.

    If the content is not a JAR itself, it is opened as a ZIP that must contain
    exactly one ``*.jar`` entry; that entry is extracted to a new temporary file.

    :type zip_file_content: bytes
    :param zip_file_content: ZIP file bytes.
    :type handler: str
    :param handler: The lambda handler path.
    :type main_file: str
    :param main_file: Filepath to the uploaded ZIP or JAR file.
    :returns: function or flask.Response
    :raises ClientError: if the ZIP does not contain exactly one JAR entry, or
        if no valid JAR could be obtained.
    """
    if not is_jar_archive(zip_file_content):
        with zipfile.ZipFile(BytesIO(zip_file_content)) as zip_ref:
            jar_entries = [e for e in zip_ref.infolist() if e.filename.endswith('.jar')]
            if len(jar_entries) != 1:
                raise ClientError('Expected exactly one *.jar entry in zip file, found %s' % len(jar_entries))
            zip_file_content = zip_ref.read(jar_entries[0].filename)
            LOG.info('Found jar file %s with %s bytes in Lambda zip archive' %
                     (jar_entries[0].filename, len(zip_file_content)))
            main_file = new_tmp_file()
            save_file(main_file, zip_file_content)
    if is_jar_archive(zip_file_content):
        def execute(event, context):
            result, log_output = lambda_executors.EXECUTOR_LOCAL.execute_java_lambda(
                event, context, handler=handler, main_file=main_file)
            return result
        return execute
    raise ClientError(error_response(
        'Unable to extract Java Lambda handler - file is not a valid zip/jar files', 400, error_type='ValidationError'))


def set_archive_code(code, lambda_name, zip_file_content=None):
    """Store the code archive of a function and return its working directory.

    For local-mount code (``S3Bucket == BUCKET_MARKER_LOCAL``) the ``S3Key`` is
    returned as-is. Otherwise the archive is saved into a new temporary
    directory, and ``CodeSize``/``CodeSha256`` of ``$LATEST`` as well as the
    function's ``cwd`` are updated.

    :param code: FunctionCode dict.
    :param lambda_name: name of the function (must exist in ``arn_to_lambda``).
    :param zip_file_content: optional archive bytes; fetched via ``get_zip_bytes``
        when not given.
    :returns: path of the directory the Lambda executors should use.
    """
    # get metadata
    lambda_arn = func_arn(lambda_name)
    lambda_details = arn_to_lambda[lambda_arn]
    is_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL

    # Stop/remove any containers that this arn uses.
    LAMBDA_EXECUTOR.cleanup(lambda_arn)

    if is_local_mount:
        # Mount or use a local folder lambda executors can reference
        # WARNING: this means we're pointing lambda_cwd to a local path in the user's
        # file system! We must ensure that there is no data loss (i.e., we must *not* add
        # this folder to TMP_FILES or similar).
        return code['S3Key']

    # get file content
    zip_file_content = zip_file_content or get_zip_bytes(code)

    # Save the zip file to a temporary file that the lambda executors can reference
    code_sha_256 = base64.standard_b64encode(hashlib.sha256(zip_file_content).digest())
    latest = lambda_details.get_version('$LATEST')
    latest['CodeSize'] = len(zip_file_content)
    latest['CodeSha256'] = code_sha_256.decode('utf-8')
    tmp_dir = '%s/zipfile.%s' % (config.TMP_FOLDER, short_uid())
    mkdir(tmp_dir)
    tmp_file = '%s/%s' % (tmp_dir, LAMBDA_ZIP_FILE_NAME)
    save_file(tmp_file, zip_file_content)
    TMP_FILES.append(tmp_dir)
    lambda_details.cwd = tmp_dir
    return tmp_dir


def set_function_code(code, lambda_name, lambda_cwd=None):
    """Install the code of a function and register its handler.

    :param code: FunctionCode dict; when falsy, the code already stored on the
        function details is used.
    :param lambda_name: name of the function (must exist in ``arn_to_lambda``).
    :param lambda_cwd: optional working directory overriding the computed one.
    :returns: ``{'FunctionName': lambda_name}``, or a ``Response`` if the Java
        handler lookup returned one.
    :raises ClientError: if the archive is invalid, the handler script cannot be
        found, or the Python handler cannot be loaded.
    """

    def generic_handler(event, context):
        # fallback handler, used when no handler could be determined below
        raise ClientError(('Unable to find executor for Lambda function "%s". Note that ' +
            'Node.js, Golang, and .Net Core Lambdas currently require LAMBDA_EXECUTOR=docker') % lambda_name)

    arn = func_arn(lambda_name)
    lambda_details = arn_to_lambda[arn]
    runtime = lambda_details.runtime
    lambda_environment = lambda_details.envvars
    handler_name = lambda_details.handler or LAMBDA_DEFAULT_HANDLER
    code_passed = code
    code = code or lambda_details.code
    is_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL
    zip_file_content = None

    if code_passed:
        lambda_cwd = lambda_cwd or set_archive_code(code_passed, lambda_name)
        if not is_local_mount:
            # Save the zip file to a temporary file that the lambda executors can reference
            zip_file_content = get_zip_bytes(code_passed)
    else:
        lambda_cwd = lambda_cwd or lambda_details.cwd

    # get local lambda working directory
    tmp_file = '%s/%s' % (lambda_cwd, LAMBDA_ZIP_FILE_NAME)

    if not zip_file_content:
        zip_file_content = load_file(tmp_file, mode='rb')

    # Set the appropriate lambda handler.
    lambda_handler = generic_handler
    if runtime == LAMBDA_RUNTIME_JAVA8:
        # The Lambda executors for Docker subclass LambdaExecutorContainers,
        # which runs Lambda in Docker by passing all *.jar files in the function
        # working directory as part of the classpath. Because of this, we need to
        # save the zip_file_content as a .jar here.
        if is_jar_archive(zip_file_content):
            jar_tmp_file = '{working_dir}/{file_name}'.format(
                working_dir=lambda_cwd, file_name=LAMBDA_JAR_FILE_NAME)
            save_file(jar_tmp_file, zip_file_content)

        lambda_handler = get_java_handler(zip_file_content, handler_name, tmp_file)
        if isinstance(lambda_handler, Response):
            return lambda_handler
    else:
        handler_file = get_handler_file_from_name(handler_name, runtime=runtime)
        handler_function = get_handler_function_from_name(handler_name, runtime=runtime)

        if not is_local_mount:
            # Lambda code must be uploaded in Zip format
            if not is_zip_file(zip_file_content):
                raise ClientError(
                    'Uploaded Lambda code for runtime ({}) is not in Zip format'.format(runtime))
            unzip(tmp_file, lambda_cwd)

        main_file = '%s/%s' % (lambda_cwd, handler_file)
        if os.path.isfile(main_file):
            # make sure the file is actually readable, then read contents
            ensure_readable(main_file)
            zip_file_content = load_file(main_file, mode='rb')
        else:
            # Raise an error if (1) this is not a local mount lambda, or (2) we're
            # running Lambdas locally (not in Docker), or (3) we're using remote Docker.
            # -> We do *not* want to raise an error if we're using local mount in non-remote Docker
            if not is_local_mount or not use_docker() or config.LAMBDA_REMOTE_DOCKER:
                file_list = run('ls -la %s' % lambda_cwd)
                LOG.debug('Lambda archive content:\n%s' % file_list)
                raise ClientError(error_response(
                    'Unable to find handler script in Lambda archive.', 400,
                    error_type='ValidationError'))

        if runtime.startswith('python') and not use_docker():
            try:
                lambda_handler = exec_lambda_code(
                    zip_file_content,
                    handler_function=handler_function,
                    lambda_cwd=lambda_cwd,
                    lambda_env=lambda_environment)
            except Exception as e:
                # The second positional argument of ClientError is used as a
                # status code elsewhere in this file (see get_zip_bytes), so the
                # cause belongs in the message, not in that argument.
                raise ClientError('Unable to get handler function from lambda code: %s' % e)

    add_function_mapping(lambda_name, lambda_handler, lambda_cwd)

    return {'FunctionName': lambda_name}


def do_list_functions():
    """Return the formatted details of all ``LambdaFunction`` entries.

    Entries of ``arn_to_lambda`` whose type is not exactly ``LambdaFunction`` are
    skipped. The details are looked up again via the ARN computed by ``func_arn``
    from the function name.

    :returns: list of dicts as produced by ``format_func_details``.
    """
    funcs = []
    for f_arn, func in arn_to_lambda.items():
        # exact type check kept on purpose (behavior for subclasses unchanged)
        if type(func) is not LambdaFunction:
            continue
        func_name = f_arn.split(':function:')[-1]
        arn = func_arn(func_name)
        func_details = arn_to_lambda.get(arn)
        funcs.append(format_func_details(func_details))
    return funcs


def format_func_details(func_details, version=None, always_add_version=False):
    """Build the function configuration dict for one version of a function.

    :param func_details: function details object (see attributes read below).
    :param version: version to describe; defaults to ``'$LATEST'``.
    :param always_add_version: if true, append the version qualifier to the
        ``FunctionArn`` also for ``'$LATEST'``.
    :returns: dict describing the function configuration. ``Environment`` is only
        included when the function has environment variables.
    """
    version = version or '$LATEST'
    version_details = func_details.get_version(version)
    result = {
        'CodeSha256': version_details.get('CodeSha256'),
        'Role': func_details.role,
        'Version': version,
        'FunctionArn': func_details.arn(),
        'FunctionName': func_details.name(),
        'CodeSize': version_details.get('CodeSize'),
        'Handler': func_details.handler,
        'Runtime': func_details.runtime,
        'Timeout': func_details.timeout,
        'Description': func_details.description,
        'MemorySize': func_details.memory_size,
        'LastModified': func_details.last_modified,
        'TracingConfig': {'Mode': 'PassThrough'},
        'RevisionId': version_details.get('RevisionId')
    }
    if func_details.envvars:
        result['Environment'] = {
            'Variables': func_details.envvars
        }
    # only append the qualifier if the ARN has at most 7 ':'-separated parts
    # (presumably: does not carry a qualifier yet - TODO confirm)
    if (always_add_version or version != '$LATEST') and len(result['FunctionArn'].split(':')) <= 7:
        result['FunctionArn'] += ':%s' % (version)
    return result


def forward_to_fallback_url(func_arn, data):
    """Forward an invocation to ``config.LAMBDA_FALLBACK_URL``, if configured.

    * ``dynamodb://<table>``: the payload is stored as a new item in the table
      (which is created via ``aws_stack.create_dynamodb_table``); returns ``''``.
    * ``http(s)://...``: the payload is POSTed to the URL; returns the response
      content.

    :param func_arn: ARN of the invoked function (not used by this function).
    :param data: invocation payload.
    :returns: ``None`` if no fallback URL is configured, otherwise see above.
    :raises ClientError: if the fallback URL has an unexpected format.
    """
    if not config.LAMBDA_FALLBACK_URL:
        return None
    if config.LAMBDA_FALLBACK_URL.startswith('dynamodb://'):
        # swap the scheme so that urlparse yields the table name as netloc
        table_name = urlparse(config.LAMBDA_FALLBACK_URL.replace('dynamodb://', 'http://')).netloc
        dynamodb = aws_stack.connect_to_service('dynamodb')
        item = {
            'id': {'S': short_uid()},
            'timestamp': {'N': str(now_utc())},
            'payload': {'S': str(data)}
        }
        aws_stack.create_dynamodb_table(table_name, partition_key='id')
        dynamodb.put_item(TableName=table_name, Item=item)
        return ''
    if re.match(r'^https?://.+', config.LAMBDA_FALLBACK_URL):
        response = safe_requests.post(config.LAMBDA_FALLBACK_URL, data)
        return response.content
    raise ClientError('Unexpected value for LAMBDA_FALLBACK_URL: %s' % config.LAMBDA_FALLBACK_URL)


# ------------
# API METHODS
# ------------

@app.before_request
def before_request():
    # fix to enable chunked encoding, as this is used by some Lambda clients
    transfer_encoding = request.headers.get('Transfer-Encoding', '').lower()
    if transfer_encoding == 'chunked':
        request.environ['wsgi.input_terminated'] = True


@app.route('%s/functions' % PATH_ROOT, methods=['POST'])
def create_function():
    """ Create new function
        ---
        operationId: 'createFunction'
        parameters:
            - name: 'request'
              in: body
    """
    # 'n/a' is a placeholder so that the cleanup in the except clause is a no-op
    # if the request fails before the real ARN is known
    arn = 'n/a'
    try:
        data = json.loads(to_str(request.data))
        lambda_name = data['FunctionName']
        event_publisher.fire_event(event_publisher.EVENT_LAMBDA_CREATE_FUNC,
            payload={'n': event_publisher.get_hash(lambda_name)})
        arn = func_arn(lambda_name)
        if arn in arn_to_lambda:
            return error_response('Function already exist: %s' %
                lambda_name, 409, error_type='ResourceConflictException')
        arn_to_lambda[arn] = func_details = LambdaFunction(arn)
        func_details.versions = {'$LATEST': {'RevisionId': str(uuid.uuid4())}}
        func_details.last_modified = isoformat_milliseconds(datetime.utcnow()) + '+0000'
        func_details.description = data.get('Description', '')
        func_details.handler = data['Handler']
        func_details.runtime = data['Runtime']
        func_details.envvars = data.get('Environment', {}).get('Variables', {})
        func_details.tags = data.get('Tags', {})
        func_details.timeout = data.get('Timeout', LAMBDA_DEFAULT_TIMEOUT)
        func_details.role = data['Role']
        func_details.memory_size = data.get('MemorySize')
        func_details.code = data['Code']
        result = set_function_code(func_details.code, lambda_name)
        if isinstance(result, Response):
            del arn_to_lambda[arn]
            return result
        # remove content from code attribute, if present
        func_details.code.pop('ZipFile', None)
        # prepare result
        result.update(format_func_details(func_details))
        if data.get('Publish', False):
            result['Version'] = publish_new_function_version(arn)['Version']
        return jsonify(result or {})
    except Exception as e:
        # roll back the (possibly partially initialized) registration
        arn_to_lambda.pop(arn, None)
        if isinstance(e, ClientError):
            return e.get_response()
        return error_response('Unknown error: %s %s' % (e, traceback.format_exc()))


@app.route('%s/functions/<function>' % PATH_ROOT, methods=['GET'])
def get_function(function):
    """ Get details for a single function
        ---
        operationId: 'getFunction'
        parameters:
            - name: 'request'
              in: body
            - name: 'function'
              in: path
    """
    funcs = do_list_functions()
    for func in funcs:
        if func['FunctionName'] == function:
            result = {
                'Configuration': func,
                'Code': {
                    'Location': '%s/code' % request.url
                }
            }
            lambda_details = arn_to_lambda.get(func['FunctionArn'])
            if lambda_details.concurrency is not None:
                result['Concurrency'] = lambda_details.concurrency
            return jsonify(result)
    return error_response(
        'Function not found: %s' % func_arn(function), 404, error_type='ResourceNotFoundException')


@app.route('%s/functions/' % PATH_ROOT, methods=['GET'])
def list_functions():
    """ List functions
        ---
        operationId: 'listFunctions'
        parameters:
            - name: 'request'
              in: body
    """
    funcs = do_list_functions()
    result = {}
    result['Functions'] = funcs
    return jsonify(result)


@app.route('%s/functions/<function>' % PATH_ROOT, methods=['DELETE'])
def delete_function(function):
    """ Delete an existing function
        ---
        operationId: 'deleteFunction'
        parameters:
            - name: 'request'
              in: body
    """
    arn = func_arn(function)

    # Stop/remove any containers that this arn uses.
    LAMBDA_EXECUTOR.cleanup(arn)

    try:
        arn_to_lambda.pop(arn)
    except KeyError:
        return error_response('Function does not exist: %s' % function, 404, error_type='ResourceNotFoundException')

    event_publisher.fire_event(event_publisher.EVENT_LAMBDA_DELETE_FUNC,
        payload={'n': event_publisher.get_hash(function)})

    # Drop all event source mappings of the deleted function. Slice assignment
    # filters the shared list in place, so other references to it stay valid.
    event_source_mappings[:] = [
        mapping for mapping in event_source_mappings if mapping['FunctionArn'] != arn]

    result = {}
    return jsonify(result)


@app.route('%s/functions/<function>/code' % PATH_ROOT, methods=['PUT'])
def update_function_code(function):
    """ Update the code of an existing function
        ---
        operationId: 'updateFunctionCode'
        parameters:
            - name: 'request'
              in: body
    """
    data = json.loads(to_str(request.data))
    arn = func_arn(function)
    # report unknown functions as 404 (like the other handlers) instead of
    # failing with a KeyError inside set_function_code
    if arn not in arn_to_lambda:
        return error_response('Function not found: %s' % arn, 404, error_type='ResourceNotFoundException')
    result = set_function_code(data, function)
    # set_function_code may return a Response, which must be passed through
    # *before* the result is treated as a dict
    if isinstance(result, Response):
        return result
    func_details = arn_to_lambda.get(arn)
    result.update(format_func_details(func_details))
    return jsonify(result or {})


@app.route('%s/functions/<function>/code' % PATH_ROOT, methods=['GET'])
def get_function_code(function):
    """ Get the code of an existing function
        ---
        operationId: 'getFunctionCode'
        parameters:
    """
    arn = func_arn(function)
    lambda_details = arn_to_lambda.get(arn)
    if not lambda_details:
        # report unknown functions as 404 (like the other handlers) instead of
        # failing with a KeyError
        return error_response('Function not found: %s' % arn, 404, error_type='ResourceNotFoundException')
    lambda_cwd = lambda_details.cwd
    tmp_file = '%s/%s' % (lambda_cwd, LAMBDA_ZIP_FILE_NAME)
    return Response(load_file(tmp_file, mode='rb'),
            mimetype='application/zip',
            headers={'Content-Disposition': 'attachment; filename=lambda_archive.zip'})


@app.route('%s/functions/<function>/configuration' % PATH_ROOT, methods=['GET'])
def get_function_configuration(function):
    """ Get the configuration of an existing function
        ---
        operationId: 'getFunctionConfiguration'
        parameters:
    """
    arn = func_arn(function)
    lambda_details = arn_to_lambda.get(arn)
    if not lambda_details:
        return error_response('Function not found: %s' % arn, 404, error_type='ResourceNotFoundException')
    result = format_func_details(lambda_details)
    return jsonify(result)


@app.route('%s/functions/<function>/configuration' % PATH_ROOT, methods=['PUT'])
def update_function_configuration(function):
    """ Update the configuration of an existing function
        ---
        operationId: 'updateFunctionConfiguration'
        parameters:
            - name: 'request'
              in: body
    """
    data = json.loads(to_str(request.data))
    arn = func_arn(function)

    # Stop/remove any containers that this arn uses.
    LAMBDA_EXECUTOR.cleanup(arn)
    # ... (the excerpt is truncated here; the remainder of this handler is not visible)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!