Best Python code snippet using localstack_python
apigateway_listener.py
Source:apigateway_listener.py
...374 method = invocation_context.method375 headers = invocation_context.headers376 # run gateway authorizers for this request377 authorize_invocation(invocation_context)378 extracted_path, resource = get_target_resource_details(invocation_context)379 if not resource:380 return make_error_response("Unable to find path %s" % invocation_context.path, 404)381 # validate request382 validator = RequestValidator(invocation_context, aws_stack.connect_to_service("apigateway"))383 if not validator.is_request_valid():384 return make_error_response("Invalid request body", 400)385 api_key_required = resource.get("resourceMethods", {}).get(method, {}).get("apiKeyRequired")386 if not is_api_key_valid(api_key_required, headers, invocation_context.stage):387 return make_error_response("Access denied - invalid API key", 403)388 integrations = resource.get("resourceMethods", {})389 integration = integrations.get(method, {})390 if not integration:391 # HttpMethod: '*'392 # ResourcePath: '/*' - produces 'X-AMAZON-APIGATEWAY-ANY-METHOD'393 integration = integrations.get("ANY", {}) or integrations.get(394 "X-AMAZON-APIGATEWAY-ANY-METHOD", {}395 )396 integration = integration.get("methodIntegration")397 if not integration:398 if method == "OPTIONS" and "Origin" in headers:399 # default to returning CORS headers if this is an OPTIONS request400 return get_cors_response(headers)401 return make_error_response(402 "Unable to find integration for: %s %s (%s)" % (method, invocation_path, raw_path),403 404,404 )405 res_methods = resource.get("resourceMethods", {})406 meth_integration = res_methods.get(method, {}).get("methodIntegration", {})407 int_responses = meth_integration.get("integrationResponses", {})408 response_templates = int_responses.get("200", {}).get("responseTemplates", {})409 # update fields in invocation context, then forward request to next handler410 invocation_context.resource = resource411 invocation_context.resource_path = extracted_path412 
invocation_context.response_templates = response_templates413 invocation_context.integration = integration414 return invoke_rest_api_integration(invocation_context)415def invoke_rest_api_integration(invocation_context: ApiInvocationContext):416 try:417 response = invoke_rest_api_integration_backend(invocation_context)418 # TODO remove this setter once all the integrations are migrated to the new response419 # handling420 invocation_context.response = response421 response = apply_response_parameters(invocation_context)422 return response423 except Exception as e:424 msg = f"Error invoking integration for API Gateway ID '{invocation_context.api_id}': {e}"425 LOG.exception(msg)426 return make_error_response(msg, 400)427# TODO: refactor this to have a class per integration type to make it easy to428# test the encapsulated logic429def invoke_rest_api_integration_backend(invocation_context: ApiInvocationContext):430 # define local aliases from invocation context431 invocation_path = invocation_context.path_with_query_string432 method = invocation_context.method433 data = invocation_context.data434 headers = invocation_context.headers435 api_id = invocation_context.api_id436 stage = invocation_context.stage437 resource_path = invocation_context.resource_path438 response_templates = invocation_context.response_templates439 integration = invocation_context.integration440 # extract integration type and path parameters441 relative_path, query_string_params = extract_query_string_params(path=invocation_path)442 integration_type_orig = integration.get("type") or integration.get("integrationType") or ""443 integration_type = integration_type_orig.upper()444 uri = integration.get("uri") or integration.get("integrationUri") or ""445 # XXX we need replace the internal Authorization header with an Authorization header set from446 # the customer, even if it's empty that's what's expected in the integration.447 custom_auth_header = 
invocation_context.headers.pop(HEADER_LOCALSTACK_AUTHORIZATION, "")448 invocation_context.headers["Authorization"] = custom_auth_header449 try:450 path_params = extract_path_params(path=relative_path, extracted_path=resource_path)451 invocation_context.path_params = path_params452 except Exception:453 path_params = {}454 if (uri.startswith("arn:aws:apigateway:") and ":lambda:path" in uri) or uri.startswith(455 "arn:aws:lambda"456 ):457 if integration_type in ["AWS", "AWS_PROXY"]:458 func_arn = uri459 if ":lambda:path" in uri:460 func_arn = (461 uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]462 )463 invocation_context.context = get_event_request_context(invocation_context)464 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)465 if invocation_context.authorizer_type:466 authorizer_context = {467 invocation_context.authorizer_type: invocation_context.auth_context468 }469 invocation_context.context["authorizer"] = authorizer_context470 request_templates = RequestTemplates()471 payload = request_templates.render(invocation_context)472 # TODO: change this signature to InvocationContext as well!473 result = lambda_api.process_apigateway_invocation(474 func_arn,475 relative_path,476 payload,477 stage,478 api_id,479 headers,480 is_base64_encoded=invocation_context.is_data_base64_encoded,481 path_params=path_params,482 query_string_params=query_string_params,483 method=method,484 resource_path=resource_path,485 request_context=invocation_context.context,486 stage_variables=invocation_context.stage_variables,487 )488 if isinstance(result, FlaskResponse):489 response = flask_to_requests_response(result)490 elif isinstance(result, Response):491 response = result492 else:493 response = LambdaResponse()494 parsed_result = (495 result if isinstance(result, dict) else json.loads(str(result or "{}"))496 )497 parsed_result = common.json_safe(parsed_result)498 parsed_result = {} if parsed_result is None else 
parsed_result499 response.status_code = int(parsed_result.get("statusCode", 200))500 parsed_headers = parsed_result.get("headers", {})501 if parsed_headers is not None:502 response.headers.update(parsed_headers)503 try:504 result_body = parsed_result.get("body")505 if isinstance(result_body, dict):506 response._content = json.dumps(result_body)507 else:508 body_bytes = to_bytes(to_str(result_body or ""))509 if parsed_result.get("isBase64Encoded", False):510 body_bytes = base64.b64decode(body_bytes)511 response._content = body_bytes512 except Exception as e:513 LOG.warning("Couldn't set Lambda response content: %s", e)514 response._content = "{}"515 update_content_length(response)516 response.multi_value_headers = parsed_result.get("multiValueHeaders") or {}517 # apply custom response template518 invocation_context.response = response519 response_templates = ResponseTemplates()520 response_templates.render(invocation_context)521 invocation_context.response.headers["Content-Length"] = str(len(response.content or ""))522 return invocation_context.response523 raise Exception(524 f'API Gateway integration type "{integration_type}", action "{uri}", method "{method}"'525 )526 elif integration_type == "AWS":527 if "kinesis:action/" in uri:528 if uri.endswith("kinesis:action/PutRecord"):529 target = kinesis_listener.ACTION_PUT_RECORD530 elif uri.endswith("kinesis:action/PutRecords"):531 target = kinesis_listener.ACTION_PUT_RECORDS532 elif uri.endswith("kinesis:action/ListStreams"):533 target = kinesis_listener.ACTION_LIST_STREAMS534 else:535 LOG.info(536 f"Unexpected API Gateway integration URI '{uri}' for integration type {integration_type}",537 )538 target = ""539 try:540 invocation_context.context = get_event_request_context(invocation_context)541 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)542 request_templates = RequestTemplates()543 payload = request_templates.render(invocation_context)544 except Exception as e:545 
LOG.warning("Unable to convert API Gateway payload to str", e)546 raise547 # forward records to target kinesis stream548 headers = aws_stack.mock_aws_request_headers(549 service="kinesis", region_name=invocation_context.region_name550 )551 headers["X-Amz-Target"] = target552 result = common.make_http_request(553 url=config.service_url("kineses"), data=payload, headers=headers, method="POST"554 )555 # apply response template556 invocation_context.response = result557 response_templates = ResponseTemplates()558 response_templates.render(invocation_context)559 return invocation_context.response560 elif "states:action/" in uri:561 action = uri.split("/")[-1]562 if APPLICATION_JSON in integration.get("requestTemplates", {}):563 request_templates = RequestTemplates()564 payload = request_templates.render(invocation_context)565 payload = json.loads(payload)566 else:567 # XXX decoding in py3 sounds wrong, this actually might break568 payload = json.loads(data.decode("utf-8"))569 client = aws_stack.connect_to_service("stepfunctions")570 if isinstance(payload.get("input"), dict):571 payload["input"] = json.dumps(payload["input"])572 # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'573 method_name = (574 camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"575 )576 try:577 method = getattr(client, method_name)578 except AttributeError:579 msg = "Invalid step function action: %s" % method_name580 LOG.error(msg)581 return make_error_response(msg, 400)582 result = method(**payload)583 result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})584 response = requests_response(585 content=result,586 headers=aws_stack.mock_aws_request_headers(),587 )588 if action == "StartSyncExecution":589 # poll for the execution result and return it590 result = await_sfn_execution_result(result["executionArn"])591 result_status = result.get("status")592 if result_status != "SUCCEEDED":593 
return make_error_response(594 "StepFunctions execution %s failed with status '%s'"595 % (result["executionArn"], result_status),596 500,597 )598 result = json_safe(result)599 response = requests_response(content=result)600 # apply response templates601 invocation_context.response = response602 response_templates = ResponseTemplates()603 response_templates.render(invocation_context)604 # response = apply_request_response_templates(605 # response, response_templates, content_type=APPLICATION_JSON606 # )607 return response608 # https://docs.aws.amazon.com/apigateway/api-reference/resource/integration/609 elif ("s3:path/" in uri or "s3:action/" in uri) and method == "GET":610 s3 = aws_stack.connect_to_service("s3")611 uri = apply_request_parameters(612 uri,613 integration=integration,614 path_params=path_params,615 query_params=query_string_params,616 )617 uri_match = re.match(TARGET_REGEX_PATH_S3_URI, uri) or re.match(618 TARGET_REGEX_ACTION_S3_URI, uri619 )620 if uri_match:621 bucket, object_key = uri_match.group("bucket", "object")622 LOG.debug("Getting request for bucket %s object %s", bucket, object_key)623 try:624 object = s3.get_object(Bucket=bucket, Key=object_key)625 except s3.exceptions.NoSuchKey:626 msg = "Object %s not found" % object_key627 LOG.debug(msg)628 return make_error_response(msg, 404)629 headers = aws_stack.mock_aws_request_headers(service="s3")630 if object.get("ContentType"):631 headers["Content-Type"] = object["ContentType"]632 # stream used so large files do not fill memory633 response = request_response_stream(stream=object["Body"], headers=headers)634 return response635 else:636 msg = "Request URI does not match s3 specifications"637 LOG.warning(msg)638 return make_error_response(msg, 400)639 if method == "POST":640 if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:641 template = integration["requestTemplates"][APPLICATION_JSON]642 account_id, queue = uri.split("/")[-2:]643 region_name = uri.split(":")[3]644 if "GetQueueUrl" 
in template or "CreateQueue" in template:645 request_templates = RequestTemplates()646 payload = request_templates.render(invocation_context)647 new_request = f"{payload}&QueueName={queue}"648 else:649 request_templates = RequestTemplates()650 payload = request_templates.render(invocation_context)651 queue_url = f"{config.get_edge_url()}/{account_id}/{queue}"652 new_request = f"{payload}&QueueUrl={queue_url}"653 headers = aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)654 url = urljoin(config.service_url("sqs"), f"{TEST_AWS_ACCOUNT_ID}/{queue}")655 result = common.make_http_request(656 url, method="POST", headers=headers, data=new_request657 )658 return result659 elif uri.startswith("arn:aws:apigateway:") and ":sns:path" in uri:660 invocation_context.context = get_event_request_context(invocation_context)661 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)662 integration_response = SnsIntegration().invoke(invocation_context)663 return apply_request_response_templates(664 integration_response, response_templates, content_type=APPLICATION_JSON665 )666 raise Exception(667 'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'668 % (uri, method)669 )670 elif integration_type == "AWS_PROXY":671 if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:672 # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection673 table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]674 action = uri.split(":dynamodb:action")[1].split("&Table=")[0]675 if "PutItem" in action and method == "PUT":676 response_template = response_templates.get("application/json")677 if response_template is None:678 msg = "Invalid response template defined in integration response."679 LOG.info("%s Existing: %s", msg, response_templates)680 return make_error_response(msg, 404)681 response_template = json.loads(response_template)682 if response_template["TableName"] != 
table_name:683 msg = "Invalid table name specified in integration response template."684 return make_error_response(msg, 404)685 dynamo_client = aws_stack.connect_to_resource("dynamodb")686 table = dynamo_client.Table(table_name)687 event_data = {}688 data_dict = json.loads(data)689 for key, _ in response_template["Item"].items():690 event_data[key] = data_dict[key]691 table.put_item(Item=event_data)692 response = requests_response(event_data)693 return response694 else:695 raise Exception(696 'API Gateway action uri "%s", integration type %s not yet implemented'697 % (uri, integration_type)698 )699 elif integration_type in ["HTTP_PROXY", "HTTP"]:700 if ":servicediscovery:" in uri:701 # check if this is a servicediscovery integration URI702 client = aws_stack.connect_to_service("servicediscovery")703 service_id = uri.split("/")[-1]704 instances = client.list_instances(ServiceId=service_id)["Instances"]705 instance = (instances or [None])[0]706 if instance and instance.get("Id"):707 uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))708 # apply custom request template709 invocation_context.context = get_event_request_context(invocation_context)710 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)711 request_templates = RequestTemplates()712 payload = request_templates.render(invocation_context)713 if isinstance(payload, dict):714 payload = json.dumps(payload)715 uri = apply_request_parameters(716 uri, integration=integration, path_params=path_params, query_params=query_string_params717 )718 result = requests.request(method=method, url=uri, data=payload, headers=headers)719 # apply custom response template720 invocation_context.response = result721 response_templates = ResponseTemplates()722 response_templates.render(invocation_context)723 return invocation_context.response724 elif integration_type == "MOCK":725 # TODO: apply tell don't ask principle inside ResponseTemplates or InvocationContext726 
invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)727 invocation_context.response = requests_response({})728 response_templates = ResponseTemplates()729 response_templates.render(invocation_context)730 return invocation_context.response731 if method == "OPTIONS":732 # fall back to returning CORS headers if this is an OPTIONS request733 return get_cors_response(headers)734 raise Exception(735 'API Gateway integration type "%s", method "%s", URI "%s" not yet implemented'736 % (integration_type, method, uri)737 )738def get_target_resource_details(invocation_context: ApiInvocationContext) -> Tuple[str, Dict]:739 """Look up and return the API GW resource (path pattern + resource dict) for the given invocation context."""740 path_map = helpers.get_rest_api_paths(741 rest_api_id=invocation_context.api_id, region_name=invocation_context.region_name742 )743 relative_path = invocation_context.invocation_path744 try:745 extracted_path, resource = get_resource_for_path(path=relative_path, path_map=path_map)746 invocation_context.resource = resource747 return extracted_path, resource748 except Exception:749 return None, None750def get_target_resource_method(invocation_context: ApiInvocationContext) -> Optional[Dict]:751 """Look up and return the API GW resource method for the given invocation context."""752 _, resource = get_target_resource_details(invocation_context)753 if not resource:754 return None755 methods = resource.get("resourceMethods") or {}756 method_name = invocation_context.method.upper()757 return methods.get(method_name) or methods.get("ANY")758def get_event_request_context(invocation_context: ApiInvocationContext):759 method = invocation_context.method760 path = invocation_context.path761 headers = invocation_context.headers762 integration_uri = invocation_context.integration_uri763 resource_path = invocation_context.resource_path764 resource_id = invocation_context.resource_id765 
set_api_id_stage_invocation_path(invocation_context)766 relative_path, query_string_params = extract_query_string_params(...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!