Best Python code snippet using localstack_python
invocations.py
Source:invocations.py
...285 func_arn = (286 uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]287 )288 invocation_context.context = helpers.get_event_request_context(invocation_context)289 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)290 if invocation_context.authorizer_type:291 authorizer_context = {292 invocation_context.authorizer_type: invocation_context.auth_context293 }294 invocation_context.context["authorizer"] = authorizer_context295 request_templates = RequestTemplates()296 payload = request_templates.render(invocation_context)297 # TODO: change this signature to InvocationContext as well!298 result = lambda_api.process_apigateway_invocation(299 func_arn,300 relative_path,301 payload,302 stage,303 api_id,304 headers,305 is_base64_encoded=invocation_context.is_data_base64_encoded,306 path_params=path_params,307 query_string_params=query_string_params,308 method=method,309 resource_path=resource_path,310 request_context=invocation_context.context,311 stage_variables=invocation_context.stage_variables,312 )313 if isinstance(result, FlaskResponse):314 response = flask_to_requests_response(result)315 elif isinstance(result, Response):316 response = result317 else:318 response = LambdaResponse()319 parsed_result = (320 result if isinstance(result, dict) else json.loads(str(result or "{}"))321 )322 parsed_result = common.json_safe(parsed_result)323 parsed_result = {} if parsed_result is None else parsed_result324 response.status_code = int(parsed_result.get("statusCode", 200))325 parsed_headers = parsed_result.get("headers", {})326 if parsed_headers is not None:327 response.headers.update(parsed_headers)328 try:329 result_body = parsed_result.get("body")330 if isinstance(result_body, dict):331 response._content = json.dumps(result_body)332 else:333 body_bytes = to_bytes(to_str(result_body or ""))334 if parsed_result.get("isBase64Encoded", False):335 body_bytes = base64.b64decode(body_bytes)336 response._content = body_bytes337 except Exception as e:338 LOG.warning("Couldn't set Lambda response content: %s", e)339 response._content = "{}"340 update_content_length(response)341 response.multi_value_headers = parsed_result.get("multiValueHeaders") or {}342 # apply custom response template343 invocation_context.response = response344 response_templates = ResponseTemplates()345 response_templates.render(invocation_context)346 invocation_context.response.headers["Content-Length"] = str(len(response.content or ""))347 return invocation_context.response348 raise Exception(349 f'API Gateway integration type "{integration_type}", action "{uri}", method "{method}"'350 )351 elif integration_type == "AWS":352 if "kinesis:action/" in uri:353 if uri.endswith("kinesis:action/PutRecord"):354 target = kinesis_listener.ACTION_PUT_RECORD355 elif uri.endswith("kinesis:action/PutRecords"):356 target = kinesis_listener.ACTION_PUT_RECORDS357 elif uri.endswith("kinesis:action/ListStreams"):358 target = kinesis_listener.ACTION_LIST_STREAMS359 else:360 LOG.info(361 f"Unexpected API Gateway integration URI '{uri}' for integration type {integration_type}",362 )363 target = ""364 try:365 invocation_context.context = helpers.get_event_request_context(invocation_context)366 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)367 request_templates = RequestTemplates()368 payload = request_templates.render(invocation_context)369 except Exception as e:370 LOG.warning("Unable to convert API Gateway payload to str", e)371 raise372 # forward records to target kinesis stream373 headers = aws_stack.mock_aws_request_headers(374 service="kinesis", region_name=invocation_context.region_name375 )376 headers["X-Amz-Target"] = target377 result = common.make_http_request(378 url=config.service_url("kineses"), data=payload, headers=headers, method="POST"379 )380 # apply response template381 invocation_context.response = result382 response_templates = ResponseTemplates()383 response_templates.render(invocation_context)384 return invocation_context.response385 elif "states:action/" in uri:386 action = uri.split("/")[-1]387 if APPLICATION_JSON in integration.get("requestTemplates", {}):388 request_templates = RequestTemplates()389 payload = request_templates.render(invocation_context)390 payload = json.loads(payload)391 else:392 # XXX decoding in py3 sounds wrong, this actually might break393 payload = json.loads(data.decode("utf-8"))394 client = aws_stack.connect_to_service("stepfunctions")395 if isinstance(payload.get("input"), dict):396 payload["input"] = json.dumps(payload["input"])397 # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'398 method_name = (399 camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"400 )401 try:402 method = getattr(client, method_name)403 except AttributeError:404 msg = "Invalid step function action: %s" % method_name405 LOG.error(msg)406 return make_error_response(msg, 400)407 result = method(**payload)408 result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})409 response = requests_response(410 content=result,411 headers=aws_stack.mock_aws_request_headers(),412 )413 if action == "StartSyncExecution":414 # poll for the execution result and return it415 result = await_sfn_execution_result(result["executionArn"])416 result_status = result.get("status")417 if result_status != "SUCCEEDED":418 return make_error_response(419 "StepFunctions execution %s failed with status '%s'"420 % (result["executionArn"], result_status),421 500,422 )423 result = json_safe(result)424 response = requests_response(content=result)425 # apply response templates426 invocation_context.response = response427 response_templates = ResponseTemplates()428 response_templates.render(invocation_context)429 # response = apply_request_response_templates(430 # response, response_templates, content_type=APPLICATION_JSON431 # )432 return response433 # https://docs.aws.amazon.com/apigateway/api-reference/resource/integration/434 elif ("s3:path/" in uri or "s3:action/" in uri) and method == "GET":435 s3 = aws_stack.connect_to_service("s3")436 uri = apply_request_parameters(437 uri,438 integration=integration,439 path_params=path_params,440 query_params=query_string_params,441 )442 uri_match = re.match(TARGET_REGEX_PATH_S3_URI, uri) or re.match(443 TARGET_REGEX_ACTION_S3_URI, uri444 )445 if uri_match:446 bucket, object_key = uri_match.group("bucket", "object")447 LOG.debug("Getting request for bucket %s object %s", bucket, object_key)448 try:449 object = s3.get_object(Bucket=bucket, Key=object_key)450 except s3.exceptions.NoSuchKey:451 msg = "Object %s not found" % object_key452 LOG.debug(msg)453 return make_error_response(msg, 404)454 headers = aws_stack.mock_aws_request_headers(service="s3")455 if object.get("ContentType"):456 headers["Content-Type"] = object["ContentType"]457 # stream used so large files do not fill memory458 response = request_response_stream(stream=object["Body"], headers=headers)459 return response460 else:461 msg = "Request URI does not match s3 specifications"462 LOG.warning(msg)463 return make_error_response(msg, 400)464 if method == "POST":465 if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:466 template = integration["requestTemplates"][APPLICATION_JSON]467 account_id, queue = uri.split("/")[-2:]468 region_name = uri.split(":")[3]469 if "GetQueueUrl" in template or "CreateQueue" in template:470 request_templates = RequestTemplates()471 payload = request_templates.render(invocation_context)472 new_request = f"{payload}&QueueName={queue}"473 else:474 request_templates = RequestTemplates()475 payload = request_templates.render(invocation_context)476 queue_url = f"{config.get_edge_url()}/{account_id}/{queue}"477 new_request = f"{payload}&QueueUrl={queue_url}"478 headers = aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)479 url = urljoin(config.service_url("sqs"), f"{TEST_AWS_ACCOUNT_ID}/{queue}")480 result = common.make_http_request(481 url, method="POST", headers=headers, data=new_request482 )483 return result484 elif uri.startswith("arn:aws:apigateway:") and ":sns:path" in uri:485 invocation_context.context = helpers.get_event_request_context(invocation_context)486 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)487 integration_response = SnsIntegration().invoke(invocation_context)488 return apply_request_response_templates(489 integration_response, response_templates, content_type=APPLICATION_JSON490 )491 raise Exception(492 'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'493 % (uri, method)494 )495 elif integration_type == "AWS_PROXY":496 if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:497 # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection498 table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]499 action = uri.split(":dynamodb:action")[1].split("&Table=")[0]500 if "PutItem" in action and method == "PUT":501 response_template = response_templates.get("application/json")502 if response_template is None:503 msg = "Invalid response template defined in integration response."504 LOG.info("%s Existing: %s", msg, response_templates)505 return make_error_response(msg, 404)506 response_template = json.loads(response_template)507 if response_template["TableName"] != table_name:508 msg = "Invalid table name specified in integration response template."509 return make_error_response(msg, 404)510 dynamo_client = aws_stack.connect_to_resource("dynamodb")511 table = dynamo_client.Table(table_name)512 event_data = {}513 data_dict = json.loads(data)514 for key, _ in response_template["Item"].items():515 event_data[key] = data_dict[key]516 table.put_item(Item=event_data)517 response = requests_response(event_data)518 return response519 else:520 raise Exception(521 'API Gateway action uri "%s", integration type %s not yet implemented'522 % (uri, integration_type)523 )524 elif integration_type in ["HTTP_PROXY", "HTTP"]:525 if ":servicediscovery:" in uri:526 # check if this is a servicediscovery integration URI527 client = aws_stack.connect_to_service("servicediscovery")528 service_id = uri.split("/")[-1]529 instances = client.list_instances(ServiceId=service_id)["Instances"]530 instance = (instances or [None])[0]531 if instance and instance.get("Id"):532 uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))533 # apply custom request template534 invocation_context.context = helpers.get_event_request_context(invocation_context)535 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)536 request_templates = RequestTemplates()537 payload = request_templates.render(invocation_context)538 if isinstance(payload, dict):539 payload = json.dumps(payload)540 uri = apply_request_parameters(541 uri, integration=integration, path_params=path_params, query_params=query_string_params542 )543 result = requests.request(method=method, url=uri, data=payload, headers=headers)544 # apply custom response template545 invocation_context.response = result546 response_templates = ResponseTemplates()547 response_templates.render(invocation_context)548 return invocation_context.response549 elif integration_type == "MOCK":550 # TODO: apply tell don't ask principle inside ResponseTemplates or InvocationContext551 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)552 invocation_context.response = requests_response({})553 response_templates = ResponseTemplates()554 response_templates.render(invocation_context)555 return invocation_context.response556 if method == "OPTIONS":557 # fall back to returning CORS headers if this is an OPTIONS request558 return get_cors_response(headers)559 raise Exception(560 'API Gateway integration type "%s", method "%s", URI "%s" not yet implemented'561 % (integration_type, method, uri)562 )563def apply_request_response_templates(564 data: Union[Response, bytes],565 templates: Dict[str, str],...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!