Best Python code snippet using localstack_python
provider.py
Source:provider.py
...398 region_details = APIGatewayRegion.get()399 link_id = short_uid()400 entry = {"id": link_id, "status": "AVAILABLE"}401 region_details.vpc_links[link_id] = entry402 result = to_vpc_link_response_json(entry)403 return VpcLink(**result)404 def get_vpc_links(405 self, context: RequestContext, position: String = None, limit: NullableInteger = None406 ) -> VpcLinks:407 region_details = APIGatewayRegion.get()408 result = region_details.vpc_links.values()409 result = [to_vpc_link_response_json(r) for r in result]410 result = {"items": result}411 return result412 def get_vpc_link(self, context: RequestContext, vpc_link_id: String) -> VpcLink:413 region_details = APIGatewayRegion.get()414 vpc_link = region_details.vpc_links.get(vpc_link_id)415 if vpc_link is None:416 raise NotFoundException(f'VPC link ID "{vpc_link_id}" not found')417 result = to_vpc_link_response_json(vpc_link)418 return VpcLink(**result)419 def update_vpc_link(420 self,421 context: RequestContext,422 vpc_link_id: String,423 patch_operations: ListOfPatchOperation = None,424 ) -> VpcLink:425 region_details = APIGatewayRegion.get()426 vpc_link = region_details.vpc_links.get(vpc_link_id)427 if vpc_link is None:428 raise NotFoundException(f'VPC link ID "{vpc_link_id}" not found')429 result = apply_json_patch_safe(vpc_link, patch_operations)430 result = to_vpc_link_response_json(result)431 return VpcLink(**result)432 def delete_vpc_link(self, context: RequestContext, vpc_link_id: String) -> None:433 region_details = APIGatewayRegion.get()434 vpc_link = region_details.vpc_links.pop(vpc_link_id, None)435 if vpc_link is None:436 raise NotFoundException(f'VPC link ID "{vpc_link_id}" not found for deletion')437 # request validators438 def get_request_validators(439 self,440 context: RequestContext,441 rest_api_id: String,442 position: String = None,443 limit: NullableInteger = None,444 ) -> RequestValidators:445 region_details = APIGatewayRegion.get()446 auth_list = region_details.validators.get(rest_api_id) or []447 result = [to_validator_response_json(rest_api_id, a) for a in auth_list]448 return RequestValidators(items=result)449 def get_request_validator(450 self, context: RequestContext, rest_api_id: String, request_validator_id: String451 ) -> RequestValidator:452 region_details = APIGatewayRegion.get()453 auth_list = region_details.validators.get(rest_api_id) or []454 validator = ([a for a in auth_list if a["id"] == request_validator_id] or [None])[0]455 if validator is None:456 raise NotFoundException(457 f"Validator {request_validator_id} for API Gateway {rest_api_id} not found"458 )459 result = to_validator_response_json(rest_api_id, validator)460 return RequestValidator(**result)461 def create_request_validator(462 self,463 context: RequestContext,464 rest_api_id: String,465 name: String = None,466 validate_request_body: Boolean = None,467 validate_request_parameters: Boolean = None,468 ) -> RequestValidator:469 region_details = APIGatewayRegion.get()470 # length 6 for AWS parity and TF compatibility471 validator_id = short_uid()[:6]472 entry = {473 "id": validator_id,474 "name": name,475 "restApiId": rest_api_id,476 "validateRequestBody": validate_request_body,477 "validateRequestPparameters": validate_request_parameters,478 }479 region_details.validators.setdefault(rest_api_id, []).append(entry)480 return RequestValidator(**entry)481 def update_request_validator(482 self,483 context: RequestContext,484 rest_api_id: String,485 request_validator_id: String,486 patch_operations: ListOfPatchOperation = None,487 ) -> RequestValidator:488 region_details = APIGatewayRegion.get()489 auth_list = region_details.validators.get(rest_api_id) or []490 validator = ([a for a in auth_list if a["id"] == request_validator_id] or [None])[0]491 if validator is None:492 raise NotFoundException(493 f"Validator {request_validator_id} for API Gateway {rest_api_id} not found"494 )495 result = apply_json_patch_safe(validator, patch_operations)496 entry_list = region_details.validators[rest_api_id]497 for i in range(len(entry_list)):498 if entry_list[i]["id"] == request_validator_id:499 entry_list[i] = result500 result = to_validator_response_json(rest_api_id, result)501 return RequestValidator(**result)502 def delete_request_validator(503 self, context: RequestContext, rest_api_id: String, request_validator_id: String504 ) -> None:505 region_details = APIGatewayRegion.get()506 auth_list = region_details.validators.get(rest_api_id, [])507 for i in range(len(auth_list)):508 if auth_list[i]["id"] == request_validator_id:509 del auth_list[i]510 return511 raise NotFoundException(512 f"Validator {request_validator_id} for API Gateway {rest_api_id} not found"513 )514 # tags515 def get_tags(516 self,517 context: RequestContext,518 resource_arn: String,519 position: String = None,520 limit: NullableInteger = None,521 ) -> Tags:522 result = APIGatewayRegion.TAGS.get(resource_arn, {})523 return Tags(tags=result)524 def tag_resource(525 self, context: RequestContext, resource_arn: String, tags: MapOfStringToString526 ) -> None:527 resource_tags = APIGatewayRegion.TAGS.setdefault(resource_arn, {})528 resource_tags.update(tags)529 def untag_resource(530 self, context: RequestContext, resource_arn: String, tag_keys: ListOfString531 ) -> None:532 resource_tags = APIGatewayRegion.TAGS.setdefault(resource_arn, {})533 for key in tag_keys:534 resource_tags.pop(key, None)535# ---------------536# UTIL FUNCTIONS537# ---------------538def normalize_authorizer(data):539 is_list = isinstance(data, list)540 entries = ensure_list(data)541 for i in range(len(entries)):542 entry = deepcopy(entries[i])543 # terraform sends this as a string in patch, so convert to int544 entry["authorizerResultTtlInSeconds"] = int(entry.get("authorizerResultTtlInSeconds", 300))545 entries[i] = entry546 return entries if is_list else entries[0]547def to_authorizer_response_json(api_id, data):548 return to_response_json("authorizer", data, api_id=api_id)549def to_validator_response_json(api_id, data):550 return to_response_json("validator", data, api_id=api_id)551def to_documentation_part_response_json(api_id, data):552 return to_response_json("documentationpart", data, api_id=api_id)553def to_base_mapping_response_json(domain_name, base_path, data):554 self_link = "/domainnames/%s/basepathmappings/%s" % (domain_name, base_path)555 return to_response_json("basepathmapping", data, self_link=self_link)556def to_account_response_json(data):557 return to_response_json("account", data, self_link="/account")558def to_vpc_link_response_json(data):559 return to_response_json("vpclink", data)560def to_client_cert_response_json(data):561 return to_response_json("clientcertificate", data, id_attr="clientCertificateId")562def to_response_json(model_type, data, api_id=None, self_link=None, id_attr=None):563 if isinstance(data, list) and len(data) == 1:564 data = data[0]565 id_attr = id_attr or "id"566 result = deepcopy(data)567 if not self_link:568 self_link = "/%ss/%s" % (model_type, data[id_attr])569 if api_id:570 self_link = "/restapis/%s/%s" % (api_id, self_link)571 if "_links" not in result:572 result["_links"] = {}...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!