Best Python code snippet using localstack_python
api_generator.py
Source:api_generator.py
1import logging2from collections import namedtuple3from samtranslator.metrics.method_decorator import cw_timer4from samtranslator.model.intrinsics import ref, fnGetAtt, make_or_condition5from samtranslator.model.apigateway import (6 ApiGatewayDeployment,7 ApiGatewayRestApi,8 ApiGatewayStage,9 ApiGatewayAuthorizer,10 ApiGatewayResponse,11 ApiGatewayDomainName,12 ApiGatewayBasePathMapping,13 ApiGatewayUsagePlan,14 ApiGatewayUsagePlanKey,15 ApiGatewayApiKey,16)17from samtranslator.model.route53 import Route53RecordSetGroup18from samtranslator.model.exceptions import InvalidResourceException, InvalidTemplateException, InvalidDocumentException19from samtranslator.model.s3_utils.uri_parser import parse_s3_uri20from samtranslator.region_configuration import RegionConfiguration21from samtranslator.swagger.swagger import SwaggerEditor22from samtranslator.model.intrinsics import is_intrinsic, fnSub23from samtranslator.model.lambda_ import LambdaPermission24from samtranslator.translator.logical_id_generator import LogicalIdGenerator25from samtranslator.translator.arn_generator import ArnGenerator26from samtranslator.model.tags.resource_tagging import get_tag_list27from samtranslator.utils.py27hash_fix import Py27Dict, Py27UniStr28LOG = logging.getLogger(__name__)29_CORS_WILDCARD = "'*'"30CorsProperties = namedtuple(31 "CorsProperties", ["AllowMethods", "AllowHeaders", "AllowOrigin", "MaxAge", "AllowCredentials"]32)33# Default the Cors Properties to '*' wildcard and False AllowCredentials. Other properties are actually Optional34CorsProperties.__new__.__defaults__ = (None, None, _CORS_WILDCARD, None, False)35AuthProperties = namedtuple(36 "AuthProperties",37 [38 "Authorizers",39 "DefaultAuthorizer",40 "InvokeRole",41 "AddDefaultAuthorizerToCorsPreflight",42 "ApiKeyRequired",43 "ResourcePolicy",44 "UsagePlan",45 ],46)47AuthProperties.__new__.__defaults__ = (None, None, None, True, None, None, None)48UsagePlanProperties = namedtuple(49 "UsagePlanProperties", ["CreateUsagePlan", "Description", "Quota", "Tags", "Throttle", "UsagePlanName"]50)51UsagePlanProperties.__new__.__defaults__ = (None, None, None, None, None, None)52GatewayResponseProperties = ["ResponseParameters", "ResponseTemplates", "StatusCode"]53class SharedApiUsagePlan(object):54 """55 Collects API information from different API resources in the same template,56 so that these information can be used in the shared usage plan57 """58 SHARED_USAGE_PLAN_CONDITION_NAME = "SharedUsagePlanCondition"59 def __init__(self):60 self.usage_plan_shared = False61 self.stage_keys_shared = list()62 self.api_stages_shared = list()63 self.depends_on_shared = list()64 # shared resource level attributes65 self.conditions = set()66 self.any_api_without_condition = False67 self.deletion_policy = None68 self.update_replace_policy = None69 def get_combined_resource_attributes(self, resource_attributes, conditions):70 """71 This method returns a dictionary which combines 'DeletionPolicy', 'UpdateReplacePolicy' and 'Condition'72 values of API definitions that could be used in Shared Usage Plan resources.73 Parameters74 ----------75 resource_attributes: Dict[str]76 A dictionary of resource level attributes of the API resource77 conditions: Dict[str]78 Conditions section of the template79 """80 self._set_deletion_policy(resource_attributes.get("DeletionPolicy"))81 self._set_update_replace_policy(resource_attributes.get("UpdateReplacePolicy"))82 self._set_condition(resource_attributes.get("Condition"), conditions)83 combined_resource_attributes = dict()84 if self.deletion_policy:85 combined_resource_attributes["DeletionPolicy"] = self.deletion_policy86 if self.update_replace_policy:87 combined_resource_attributes["UpdateReplacePolicy"] = self.update_replace_policy88 # do not set Condition if any of the API resource does not have Condition in it89 if self.conditions and not self.any_api_without_condition:90 combined_resource_attributes["Condition"] = SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME91 return combined_resource_attributes92 def _set_deletion_policy(self, deletion_policy):93 if deletion_policy:94 if self.deletion_policy:95 # update only if new deletion policy is Retain96 if deletion_policy == "Retain":97 self.deletion_policy = deletion_policy98 else:99 self.deletion_policy = deletion_policy100 def _set_update_replace_policy(self, update_replace_policy):101 if update_replace_policy:102 if self.update_replace_policy:103 # if new value is Retain or104 # new value is retain and current value is Delete then update its value105 if (update_replace_policy == "Retain") or (106 update_replace_policy == "Snapshot" and self.update_replace_policy == "Delete"107 ):108 self.update_replace_policy = update_replace_policy109 else:110 self.update_replace_policy = update_replace_policy111 def _set_condition(self, condition, template_conditions):112 # if there are any API without condition, then skip113 if self.any_api_without_condition:114 return115 if condition and condition not in self.conditions:116 if template_conditions is None:117 raise InvalidTemplateException(118 "Can't have condition without having 'Conditions' section in the template"119 )120 if self.conditions:121 self.conditions.add(condition)122 or_condition = make_or_condition(self.conditions)123 template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME] = or_condition124 else:125 self.conditions.add(condition)126 template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME] = condition127 elif condition is None:128 self.any_api_without_condition = True129 if template_conditions and SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME in template_conditions:130 del template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME]131class ApiGenerator(object):132 def __init__(133 self,134 logical_id,135 cache_cluster_enabled,136 cache_cluster_size,137 variables,138 depends_on,139 definition_body,140 definition_uri,141 name,142 stage_name,143 shared_api_usage_plan,144 template_conditions,145 tags=None,146 endpoint_configuration=None,147 method_settings=None,148 binary_media=None,149 minimum_compression_size=None,150 disable_execute_api_endpoint=None,151 cors=None,152 auth=None,153 gateway_responses=None,154 access_log_setting=None,155 canary_setting=None,156 tracing_enabled=None,157 resource_attributes=None,158 passthrough_resource_attributes=None,159 open_api_version=None,160 models=None,161 domain=None,162 fail_on_warnings=None,163 description=None,164 mode=None,165 api_key_source_type=None,166 ):167 """Constructs an API Generator class that generates API Gateway resources168 :param logical_id: Logical id of the SAM API Resource169 :param cache_cluster_enabled: Whether cache cluster is enabled170 :param cache_cluster_size: Size of the cache cluster171 :param variables: API Gateway Variables172 :param depends_on: Any resources that need to be depended on173 :param definition_body: API definition174 :param definition_uri: URI to API definition175 :param name: Name of the API Gateway resource176 :param stage_name: Name of the Stage177 :param tags: Stage Tags178 :param access_log_setting: Whether to send access logs and where for Stage179 :param canary_setting: Canary Setting for Stage180 :param tracing_enabled: Whether active tracing with X-ray is enabled181 :param resource_attributes: Resource attributes to add to API resources182 :param passthrough_resource_attributes: Attributes such as `Condition` that are added to derived resources183 :param models: Model definitions to be used by API methods184 :param description: Description of the API Gateway resource185 """186 self.logical_id = logical_id187 self.cache_cluster_enabled = cache_cluster_enabled188 self.cache_cluster_size = cache_cluster_size189 self.variables = variables190 self.depends_on = depends_on191 self.definition_body = definition_body192 self.definition_uri = definition_uri193 self.name = name194 self.stage_name = stage_name195 self.tags = tags196 self.endpoint_configuration = endpoint_configuration197 self.method_settings = method_settings198 self.binary_media = binary_media199 self.minimum_compression_size = minimum_compression_size200 self.disable_execute_api_endpoint = disable_execute_api_endpoint201 self.cors = cors202 self.auth = auth203 self.gateway_responses = gateway_responses204 self.access_log_setting = access_log_setting205 self.canary_setting = canary_setting206 self.tracing_enabled = tracing_enabled207 self.resource_attributes = resource_attributes208 self.passthrough_resource_attributes = passthrough_resource_attributes209 self.open_api_version = open_api_version210 self.remove_extra_stage = open_api_version211 self.models = models212 self.domain = domain213 self.fail_on_warnings = fail_on_warnings214 self.description = description215 self.shared_api_usage_plan = shared_api_usage_plan216 self.template_conditions = template_conditions217 self.mode = mode218 self.api_key_source_type = api_key_source_type219 def _construct_rest_api(self):220 """Constructs and returns the ApiGateway RestApi.221 :returns: the RestApi to which this SAM Api corresponds222 :rtype: model.apigateway.ApiGatewayRestApi223 """224 rest_api = ApiGatewayRestApi(self.logical_id, depends_on=self.depends_on, attributes=self.resource_attributes)225 # NOTE: For backwards compatibility we need to retain BinaryMediaTypes on the CloudFormation Property226 # Removing this and only setting x-amazon-apigateway-binary-media-types results in other issues.227 rest_api.BinaryMediaTypes = self.binary_media228 rest_api.MinimumCompressionSize = self.minimum_compression_size229 if self.endpoint_configuration:230 self._set_endpoint_configuration(rest_api, self.endpoint_configuration)231 elif not RegionConfiguration.is_apigw_edge_configuration_supported():232 # Since this region does not support EDGE configuration, we explicitly set the endpoint type233 # to Regional which is the only supported config.234 self._set_endpoint_configuration(rest_api, "REGIONAL")235 if self.definition_uri and self.definition_body:236 raise InvalidResourceException(237 self.logical_id, "Specify either 'DefinitionUri' or 'DefinitionBody' property and not both."238 )239 if self.open_api_version:240 if not SwaggerEditor.safe_compare_regex_with_string(241 SwaggerEditor.get_openapi_versions_supported_regex(), self.open_api_version242 ):243 raise InvalidResourceException(244 self.logical_id, "The OpenApiVersion value must be of the format '3.0.0'."245 )246 self._add_cors()247 self._add_auth()248 self._add_gateway_responses()249 self._add_binary_media_types()250 self._add_models()251 if self.fail_on_warnings:252 rest_api.FailOnWarnings = self.fail_on_warnings253 if self.disable_execute_api_endpoint is not None:254 self._add_endpoint_extension()255 if self.definition_uri:256 rest_api.BodyS3Location = self._construct_body_s3_dict()257 elif self.definition_body:258 # # Post Process OpenApi Auth Settings259 self.definition_body = self._openapi_postprocess(self.definition_body)260 rest_api.Body = self.definition_body261 if self.name:262 rest_api.Name = self.name263 if self.description:264 rest_api.Description = self.description265 if self.mode:266 rest_api.Mode = self.mode267 if self.api_key_source_type:268 rest_api.ApiKeySourceType = self.api_key_source_type269 return rest_api270 def _add_endpoint_extension(self):271 """Add disableExecuteApiEndpoint if it is set in SAM272 Note:273 If neither DefinitionUri nor DefinitionBody are specified,274 SAM will generate a openapi definition body based on template configuration.275 https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-resource-api.html#sam-api-definitionbody276 For this reason, we always put DisableExecuteApiEndpoint into openapi object irrespective of origin of DefinitionBody.277 """278 if self.disable_execute_api_endpoint is not None and not self.definition_body:279 raise InvalidResourceException(280 self.logical_id, "DisableExecuteApiEndpoint works only within 'DefinitionBody' property."281 )282 editor = SwaggerEditor(self.definition_body)283 editor.add_disable_execute_api_endpoint_extension(self.disable_execute_api_endpoint)284 self.definition_body = editor.swagger285 def _construct_body_s3_dict(self):286 """Constructs the RestApi's `BodyS3Location property`_, from the SAM Api's DefinitionUri property.287 :returns: a BodyS3Location dict, containing the S3 Bucket, Key, and Version of the Swagger definition288 :rtype: dict289 """290 if isinstance(self.definition_uri, dict):291 if not self.definition_uri.get("Bucket", None) or not self.definition_uri.get("Key", None):292 # DefinitionUri is a dictionary but does not contain Bucket or Key property293 raise InvalidResourceException(294 self.logical_id, "'DefinitionUri' requires Bucket and Key properties to be specified."295 )296 s3_pointer = self.definition_uri297 else:298 # DefinitionUri is a string299 s3_pointer = parse_s3_uri(self.definition_uri)300 if s3_pointer is None:301 raise InvalidResourceException(302 self.logical_id,303 "'DefinitionUri' is not a valid S3 Uri of the form "304 "'s3://bucket/key' with optional versionId query parameter.",305 )306 if isinstance(self.definition_uri, Py27UniStr):307 # self.defintion_uri is a Py27UniStr instance if it is defined in the template308 # we need to preserve the Py27UniStr type309 s3_pointer["Bucket"] = Py27UniStr(s3_pointer["Bucket"])310 s3_pointer["Key"] = Py27UniStr(s3_pointer["Key"])311 if "Version" in s3_pointer:312 s3_pointer["Version"] = Py27UniStr(s3_pointer["Version"])313 # Construct body_s3 as py27 dict314 body_s3 = Py27Dict()315 body_s3["Bucket"] = s3_pointer["Bucket"]316 body_s3["Key"] = s3_pointer["Key"]317 if "Version" in s3_pointer:318 body_s3["Version"] = s3_pointer["Version"]319 return body_s3320 def _construct_deployment(self, rest_api):321 """Constructs and returns the ApiGateway Deployment.322 :param model.apigateway.ApiGatewayRestApi rest_api: the RestApi for this Deployment323 :returns: the Deployment to which this SAM Api corresponds324 :rtype: model.apigateway.ApiGatewayDeployment325 """326 deployment = ApiGatewayDeployment(327 self.logical_id + "Deployment", attributes=self.passthrough_resource_attributes328 )329 deployment.RestApiId = rest_api.get_runtime_attr("rest_api_id")330 if not self.remove_extra_stage:331 deployment.StageName = "Stage"332 return deployment333 def _construct_stage(self, deployment, swagger, redeploy_restapi_parameters):334 """Constructs and returns the ApiGateway Stage.335 :param model.apigateway.ApiGatewayDeployment deployment: the Deployment for this Stage336 :returns: the Stage to which this SAM Api corresponds337 :rtype: model.apigateway.ApiGatewayStage338 """339 # If StageName is some intrinsic function, then don't prefix the Stage's logical ID340 # This will NOT create duplicates because we allow only ONE stage per API resource341 stage_name_prefix = self.stage_name if isinstance(self.stage_name, str) else ""342 if stage_name_prefix.isalnum():343 stage_logical_id = self.logical_id + stage_name_prefix + "Stage"344 else:345 generator = LogicalIdGenerator(self.logical_id + "Stage", stage_name_prefix)346 stage_logical_id = generator.gen()347 stage = ApiGatewayStage(stage_logical_id, attributes=self.passthrough_resource_attributes)348 stage.RestApiId = ref(self.logical_id)349 stage.update_deployment_ref(deployment.logical_id)350 stage.StageName = self.stage_name351 stage.CacheClusterEnabled = self.cache_cluster_enabled352 stage.CacheClusterSize = self.cache_cluster_size353 stage.Variables = self.variables354 stage.MethodSettings = self.method_settings355 stage.AccessLogSetting = self.access_log_setting356 stage.CanarySetting = self.canary_setting357 stage.TracingEnabled = self.tracing_enabled358 if swagger is not None:359 deployment.make_auto_deployable(360 stage, self.remove_extra_stage, swagger, self.domain, redeploy_restapi_parameters361 )362 if self.tags is not None:363 stage.Tags = get_tag_list(self.tags)364 return stage365 def _construct_api_domain(self, rest_api, route53_record_set_groups):366 """367 Constructs and returns the ApiGateway Domain and BasepathMapping368 """369 if self.domain is None:370 return None, None, None371 if self.domain.get("DomainName") is None or self.domain.get("CertificateArn") is None:372 raise InvalidResourceException(373 self.logical_id, "Custom Domains only works if both DomainName and CertificateArn" " are provided."374 )375 self.domain["ApiDomainName"] = "{}{}".format(376 "ApiGatewayDomainName", LogicalIdGenerator("", self.domain.get("DomainName")).gen()377 )378 domain = ApiGatewayDomainName(self.domain.get("ApiDomainName"), attributes=self.passthrough_resource_attributes)379 domain.DomainName = self.domain.get("DomainName")380 endpoint = self.domain.get("EndpointConfiguration")381 if endpoint is None:382 endpoint = "REGIONAL"383 self.domain["EndpointConfiguration"] = "REGIONAL"384 elif endpoint not in ["EDGE", "REGIONAL", "PRIVATE"]:385 raise InvalidResourceException(386 self.logical_id,387 "EndpointConfiguration for Custom Domains must be"388 " one of {}.".format(["EDGE", "REGIONAL", "PRIVATE"]),389 )390 if endpoint == "REGIONAL":391 domain.RegionalCertificateArn = self.domain.get("CertificateArn")392 else:393 domain.CertificateArn = self.domain.get("CertificateArn")394 domain.EndpointConfiguration = {"Types": [endpoint]}395 mutual_tls_auth = self.domain.get("MutualTlsAuthentication", None)396 if mutual_tls_auth:397 if isinstance(mutual_tls_auth, dict):398 if not set(mutual_tls_auth.keys()).issubset({"TruststoreUri", "TruststoreVersion"}):399 invalid_keys = list()400 for key in mutual_tls_auth.keys():401 if not key in {"TruststoreUri", "TruststoreVersion"}:402 invalid_keys.append(key)403 invalid_keys.sort()404 raise InvalidResourceException(405 ",".join(invalid_keys),406 "Available MutualTlsAuthentication fields are {}.".format(407 ["TruststoreUri", "TruststoreVersion"]408 ),409 )410 domain.MutualTlsAuthentication = {}411 if mutual_tls_auth.get("TruststoreUri", None):412 domain.MutualTlsAuthentication["TruststoreUri"] = mutual_tls_auth["TruststoreUri"]413 if mutual_tls_auth.get("TruststoreVersion", None):414 domain.MutualTlsAuthentication["TruststoreVersion"] = mutual_tls_auth["TruststoreVersion"]415 else:416 raise InvalidResourceException(417 mutual_tls_auth,418 "MutualTlsAuthentication must be a map with at least one of the following fields {}.".format(419 ["TruststoreUri", "TruststoreVersion"]420 ),421 )422 if self.domain.get("SecurityPolicy", None):423 domain.SecurityPolicy = self.domain["SecurityPolicy"]424 if self.domain.get("OwnershipVerificationCertificateArn", None):425 domain.OwnershipVerificationCertificateArn = self.domain["OwnershipVerificationCertificateArn"]426 # Create BasepathMappings427 if self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), str):428 basepaths = [self.domain.get("BasePath")]429 elif self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), list):430 basepaths = self.domain.get("BasePath")431 else:432 basepaths = None433 basepath_resource_list = []434 if basepaths is None:435 basepath_mapping = ApiGatewayBasePathMapping(436 self.logical_id + "BasePathMapping", attributes=self.passthrough_resource_attributes437 )438 basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))439 basepath_mapping.RestApiId = ref(rest_api.logical_id)440 basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")441 basepath_resource_list.extend([basepath_mapping])442 else:443 for path in basepaths:444 path = "".join(e for e in path if e.isalnum())445 logical_id = "{}{}{}".format(self.logical_id, path, "BasePathMapping")446 basepath_mapping = ApiGatewayBasePathMapping(447 logical_id, attributes=self.passthrough_resource_attributes448 )449 basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))450 basepath_mapping.RestApiId = ref(rest_api.logical_id)451 basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")452 basepath_mapping.BasePath = path453 basepath_resource_list.extend([basepath_mapping])454 # Create the Route53 RecordSetGroup resource455 record_set_group = None456 if self.domain.get("Route53") is not None:457 route53 = self.domain.get("Route53")458 if not isinstance(route53, dict):459 raise InvalidResourceException(460 self.logical_id,461 "Invalid property type '{}' for Route53. "462 "Expected a map defines an Amazon Route 53 configuration'.".format(type(route53).__name__),463 )464 if route53.get("HostedZoneId") is None and route53.get("HostedZoneName") is None:465 raise InvalidResourceException(466 self.logical_id,467 "HostedZoneId or HostedZoneName is required to enable Route53 support on Custom Domains.",468 )469 logical_id_suffix = LogicalIdGenerator(470 "", route53.get("HostedZoneId") or route53.get("HostedZoneName")471 ).gen()472 logical_id = "RecordSetGroup" + logical_id_suffix473 record_set_group = route53_record_set_groups.get(logical_id)474 if not record_set_group:475 record_set_group = Route53RecordSetGroup(logical_id, attributes=self.passthrough_resource_attributes)476 if "HostedZoneId" in route53:477 record_set_group.HostedZoneId = route53.get("HostedZoneId")478 if "HostedZoneName" in route53:479 record_set_group.HostedZoneName = route53.get("HostedZoneName")480 record_set_group.RecordSets = []481 route53_record_set_groups[logical_id] = record_set_group482 record_set_group.RecordSets += self._construct_record_sets_for_domain(self.domain)483 return domain, basepath_resource_list, record_set_group484 def _construct_record_sets_for_domain(self, domain):485 recordset_list = []486 recordset = {}487 route53 = domain.get("Route53")488 recordset["Name"] = domain.get("DomainName")489 recordset["Type"] = "A"490 recordset["AliasTarget"] = self._construct_alias_target(self.domain)491 recordset_list.extend([recordset])492 recordset_ipv6 = {}493 if route53.get("IpV6") is not None and route53.get("IpV6") is True:494 recordset_ipv6["Name"] = domain.get("DomainName")495 recordset_ipv6["Type"] = "AAAA"496 recordset_ipv6["AliasTarget"] = self._construct_alias_target(self.domain)497 recordset_list.extend([recordset_ipv6])498 return recordset_list499 def _construct_alias_target(self, domain):500 alias_target = {}501 route53 = domain.get("Route53")502 target_health = route53.get("EvaluateTargetHealth")503 if target_health is not None:504 alias_target["EvaluateTargetHealth"] = target_health505 if domain.get("EndpointConfiguration") == "REGIONAL":506 alias_target["HostedZoneId"] = fnGetAtt(self.domain.get("ApiDomainName"), "RegionalHostedZoneId")507 alias_target["DNSName"] = fnGetAtt(self.domain.get("ApiDomainName"), "RegionalDomainName")508 else:509 if route53.get("DistributionDomainName") is None:510 route53["DistributionDomainName"] = fnGetAtt(self.domain.get("ApiDomainName"), "DistributionDomainName")511 alias_target["HostedZoneId"] = "Z2FDTNDATAQYW2"512 alias_target["DNSName"] = route53.get("DistributionDomainName")513 return alias_target514 @cw_timer(prefix="Generator", name="Api")515 def to_cloudformation(self, redeploy_restapi_parameters, route53_record_set_groups):516 """Generates CloudFormation resources from a SAM API resource517 :returns: a tuple containing the RestApi, Deployment, and Stage for an empty Api.518 :rtype: tuple519 """520 rest_api = self._construct_rest_api()521 domain, basepath_mapping, route53 = self._construct_api_domain(rest_api, route53_record_set_groups)522 deployment = self._construct_deployment(rest_api)523 swagger = None524 if rest_api.Body is not None:525 swagger = rest_api.Body526 elif rest_api.BodyS3Location is not None:527 swagger = rest_api.BodyS3Location528 stage = self._construct_stage(deployment, swagger, redeploy_restapi_parameters)529 permissions = self._construct_authorizer_lambda_permission()530 usage_plan = self._construct_usage_plan(rest_api_stage=stage)531 return rest_api, deployment, stage, permissions, domain, basepath_mapping, route53, usage_plan532 def _add_cors(self):533 """534 Add CORS configuration to the Swagger file, if necessary535 """536 INVALID_ERROR = "Invalid value for 'Cors' property"537 if not self.cors:538 return539 if self.cors and not self.definition_body:540 raise InvalidResourceException(541 self.logical_id, "Cors works only with inline Swagger specified in 'DefinitionBody' property."542 )543 if isinstance(self.cors, str) or is_intrinsic(self.cors):544 # Just set Origin property. Others will be defaults545 properties = CorsProperties(AllowOrigin=self.cors)546 elif isinstance(self.cors, dict):547 # Make sure keys in the dict are recognized548 if not all(key in CorsProperties._fields for key in self.cors.keys()):549 raise InvalidResourceException(self.logical_id, INVALID_ERROR)550 properties = CorsProperties(**self.cors)551 else:552 raise InvalidResourceException(self.logical_id, INVALID_ERROR)553 if not SwaggerEditor.is_valid(self.definition_body):554 raise InvalidResourceException(555 self.logical_id,556 "Unable to add Cors configuration because "557 "'DefinitionBody' does not contain a valid Swagger definition.",558 )559 if properties.AllowCredentials is True and properties.AllowOrigin == _CORS_WILDCARD:560 raise InvalidResourceException(561 self.logical_id,562 "Unable to add Cors configuration because "563 "'AllowCredentials' can not be true when "564 "'AllowOrigin' is \"'*'\" or not set",565 )566 editor = SwaggerEditor(self.definition_body)567 for path in editor.iter_on_path():568 try:569 editor.add_cors(570 path,571 properties.AllowOrigin,572 properties.AllowHeaders,573 properties.AllowMethods,574 max_age=properties.MaxAge,575 allow_credentials=properties.AllowCredentials,576 )577 except InvalidTemplateException as ex:578 raise InvalidResourceException(self.logical_id, ex.message)579 # Assign the Swagger back to template580 self.definition_body = editor.swagger581 def _add_binary_media_types(self):582 """583 Add binary media types to Swagger584 """585 if not self.binary_media:586 return587 # We don't raise an error here like we do for similar cases because that would be backwards incompatible588 if self.binary_media and not self.definition_body:589 return590 editor = SwaggerEditor(self.definition_body)591 editor.add_binary_media_types(self.binary_media)592 # Assign the Swagger back to template593 self.definition_body = editor.swagger594 def _add_auth(self):595 """596 Add Auth configuration to the Swagger file, if necessary597 """598 if not self.auth:599 return600 if self.auth and not self.definition_body:601 raise InvalidResourceException(602 self.logical_id, "Auth works only with inline Swagger specified in " "'DefinitionBody' property."603 )604 # Make sure keys in the dict are recognized605 if not all(key in AuthProperties._fields for key in self.auth.keys()):606 raise InvalidResourceException(self.logical_id, "Invalid value for 'Auth' property")607 if not SwaggerEditor.is_valid(self.definition_body):608 raise InvalidResourceException(609 self.logical_id,610 "Unable to add Auth configuration because "611 "'DefinitionBody' does not contain a valid Swagger definition.",612 )613 swagger_editor = SwaggerEditor(self.definition_body)614 auth_properties = AuthProperties(**self.auth)615 authorizers = self._get_authorizers(auth_properties.Authorizers, auth_properties.DefaultAuthorizer)616 if authorizers:617 swagger_editor.add_authorizers_security_definitions(authorizers)618 self._set_default_authorizer(619 swagger_editor,620 authorizers,621 auth_properties.DefaultAuthorizer,622 auth_properties.AddDefaultAuthorizerToCorsPreflight,623 auth_properties.Authorizers,624 )625 if auth_properties.ApiKeyRequired:626 swagger_editor.add_apikey_security_definition()627 self._set_default_apikey_required(swagger_editor)628 if auth_properties.ResourcePolicy:629 SwaggerEditor.validate_is_dict(630 auth_properties.ResourcePolicy, "ResourcePolicy must be a map (ResourcePolicyStatement)."631 )632 for path in swagger_editor.iter_on_path():633 swagger_editor.add_resource_policy(auth_properties.ResourcePolicy, path, self.stage_name)634 if auth_properties.ResourcePolicy.get("CustomStatements"):635 swagger_editor.add_custom_statements(auth_properties.ResourcePolicy.get("CustomStatements"))636 self.definition_body = self._openapi_postprocess(swagger_editor.swagger)637 def _construct_usage_plan(self, rest_api_stage=None):638 """Constructs and returns the ApiGateway UsagePlan, ApiGateway UsagePlanKey, ApiGateway ApiKey for Auth.639 :param model.apigateway.ApiGatewayStage stage: the stage of rest api640 :returns: UsagePlan, UsagePlanKey, ApiKey for this rest Api641 :rtype: model.apigateway.ApiGatewayUsagePlan, model.apigateway.ApiGatewayUsagePlanKey,642 model.apigateway.ApiGatewayApiKey643 """644 create_usage_plans_accepted_values = ["SHARED", "PER_API", "NONE"]645 if not self.auth:646 return []647 auth_properties = AuthProperties(**self.auth)648 if auth_properties.UsagePlan is None:649 return []650 usage_plan_properties = auth_properties.UsagePlan651 # throws error if UsagePlan is not a dict652 if not isinstance(usage_plan_properties, dict):653 raise InvalidResourceException(self.logical_id, "'UsagePlan' must be a dictionary")654 # throws error if the property invalid/ unsupported for UsagePlan655 if not all(key in UsagePlanProperties._fields for key in usage_plan_properties.keys()):656 raise InvalidResourceException(self.logical_id, "Invalid property for 'UsagePlan'")657 create_usage_plan = usage_plan_properties.get("CreateUsagePlan")658 usage_plan = None659 api_key = None660 usage_plan_key = None661 if create_usage_plan is None:662 raise InvalidResourceException(self.logical_id, "'CreateUsagePlan' is a required field for UsagePlan.")663 if create_usage_plan not in create_usage_plans_accepted_values:664 raise InvalidResourceException(665 self.logical_id, "'CreateUsagePlan' accepts one of {}.".format(create_usage_plans_accepted_values)666 )667 if create_usage_plan == "NONE":668 return []669 # create usage plan for this api only670 elif usage_plan_properties.get("CreateUsagePlan") == "PER_API":671 usage_plan_logical_id = self.logical_id + "UsagePlan"672 usage_plan = ApiGatewayUsagePlan(673 logical_id=usage_plan_logical_id,674 depends_on=[self.logical_id],675 attributes=self.passthrough_resource_attributes,676 )677 api_stages = list()678 api_stage = dict()679 api_stage["ApiId"] = ref(self.logical_id)680 api_stage["Stage"] = ref(rest_api_stage.logical_id)681 api_stages.append(api_stage)682 usage_plan.ApiStages = api_stages683 api_key = self._construct_api_key(usage_plan_logical_id, create_usage_plan, rest_api_stage)684 usage_plan_key = self._construct_usage_plan_key(usage_plan_logical_id, create_usage_plan, api_key)685 # create a usage plan for all the Apis686 elif create_usage_plan == "SHARED":687 LOG.info("Creating SHARED usage plan for all the Apis")688 usage_plan_logical_id = "ServerlessUsagePlan"689 if self.logical_id not in self.shared_api_usage_plan.depends_on_shared:690 self.shared_api_usage_plan.depends_on_shared.append(self.logical_id)691 usage_plan = ApiGatewayUsagePlan(692 logical_id=usage_plan_logical_id,693 depends_on=self.shared_api_usage_plan.depends_on_shared,694 attributes=self.shared_api_usage_plan.get_combined_resource_attributes(695 self.passthrough_resource_attributes, self.template_conditions696 ),697 )698 api_stage = dict()699 api_stage["ApiId"] = ref(self.logical_id)700 api_stage["Stage"] = ref(rest_api_stage.logical_id)701 if api_stage not in self.shared_api_usage_plan.api_stages_shared:702 self.shared_api_usage_plan.api_stages_shared.append(api_stage)703 usage_plan.ApiStages = self.shared_api_usage_plan.api_stages_shared704 api_key = self._construct_api_key(usage_plan_logical_id, create_usage_plan, rest_api_stage)705 usage_plan_key = self._construct_usage_plan_key(usage_plan_logical_id, create_usage_plan, api_key)706 if usage_plan_properties.get("UsagePlanName"):707 usage_plan.UsagePlanName = usage_plan_properties.get("UsagePlanName")708 if usage_plan_properties.get("Description"):709 usage_plan.Description = usage_plan_properties.get("Description")710 if usage_plan_properties.get("Quota"):711 usage_plan.Quota = usage_plan_properties.get("Quota")712 if usage_plan_properties.get("Tags"):713 usage_plan.Tags = usage_plan_properties.get("Tags")714 if usage_plan_properties.get("Throttle"):715 usage_plan.Throttle = usage_plan_properties.get("Throttle")716 return usage_plan, api_key, usage_plan_key717 def _construct_api_key(self, usage_plan_logical_id, create_usage_plan, rest_api_stage):718 """719 :param usage_plan_logical_id: String720 :param create_usage_plan: String721 :param rest_api_stage: model.apigateway.ApiGatewayStage stage: the stage of rest api722 :return: api_key model.apigateway.ApiGatewayApiKey resource which is created for the given usage plan723 """724 if create_usage_plan == "SHARED":725 # create an api key resource for all the apis726 LOG.info("Creating api key resource for all the Apis from SHARED usage plan")727 api_key_logical_id = "ServerlessApiKey"728 api_key = ApiGatewayApiKey(729 logical_id=api_key_logical_id,730 depends_on=[usage_plan_logical_id],731 attributes=self.shared_api_usage_plan.get_combined_resource_attributes(732 self.passthrough_resource_attributes, self.template_conditions733 ),734 )735 api_key.Enabled = True736 stage_key = dict()737 stage_key["RestApiId"] = ref(self.logical_id)738 stage_key["StageName"] = ref(rest_api_stage.logical_id)739 if stage_key not in self.shared_api_usage_plan.stage_keys_shared:740 self.shared_api_usage_plan.stage_keys_shared.append(stage_key)741 api_key.StageKeys = self.shared_api_usage_plan.stage_keys_shared742 # for create_usage_plan = "PER_API"743 else:744 # create an api key resource for this api745 api_key_logical_id = self.logical_id + "ApiKey"746 api_key = ApiGatewayApiKey(747 logical_id=api_key_logical_id,748 depends_on=[usage_plan_logical_id],749 attributes=self.passthrough_resource_attributes,750 )751 api_key.Enabled = True752 stage_keys = list()753 stage_key = dict()754 stage_key["RestApiId"] = ref(self.logical_id)755 stage_key["StageName"] = ref(rest_api_stage.logical_id)756 stage_keys.append(stage_key)757 api_key.StageKeys = stage_keys758 return api_key759 def _construct_usage_plan_key(self, usage_plan_logical_id, create_usage_plan, api_key):760 """761 :param usage_plan_logical_id: String762 :param create_usage_plan: String763 :param api_key: model.apigateway.ApiGatewayApiKey resource764 :return: model.apigateway.ApiGatewayUsagePlanKey resource that contains the mapping between usage plan and api key765 """766 if create_usage_plan == "SHARED":767 # create a mapping between api key and the usage plan768 usage_plan_key_logical_id = "ServerlessUsagePlanKey"769 resource_attributes = self.shared_api_usage_plan.get_combined_resource_attributes(770 self.passthrough_resource_attributes, self.template_conditions771 )772 # for create_usage_plan = "PER_API"773 else:774 # create a mapping between api key and the usage plan775 usage_plan_key_logical_id = self.logical_id + "UsagePlanKey"776 resource_attributes = self.passthrough_resource_attributes777 usage_plan_key = ApiGatewayUsagePlanKey(778 logical_id=usage_plan_key_logical_id,779 depends_on=[api_key.logical_id],780 attributes=resource_attributes,781 )782 usage_plan_key.KeyId = ref(api_key.logical_id)783 usage_plan_key.KeyType = "API_KEY"784 usage_plan_key.UsagePlanId = ref(usage_plan_logical_id)785 return usage_plan_key786 def _add_gateway_responses(self):787 """788 Add Gateway Response configuration to the Swagger file, if necessary789 """790 if not self.gateway_responses:791 return792 if self.gateway_responses and not self.definition_body:793 raise InvalidResourceException(794 self.logical_id,795 "GatewayResponses works only with inline Swagger specified in " "'DefinitionBody' property.",796 )797 # Make sure keys in the dict are recognized798 for responses_key, responses_value in self.gateway_responses.items():799 if is_intrinsic(responses_value):800 # TODO: Add intrinsic support for this field.801 raise InvalidResourceException(802 self.logical_id,803 "Unable to set GatewayResponses attribute because "804 "intrinsic functions are not supported for this field.",805 )806 elif not isinstance(responses_value, dict):807 raise InvalidResourceException(808 self.logical_id,809 "Invalid property type '{}' for GatewayResponses. "810 "Expected an object of type 'GatewayResponse'.".format(type(responses_value).__name__),811 )812 for response_key in responses_value.keys():813 if response_key not in GatewayResponseProperties:814 raise InvalidResourceException(815 self.logical_id,816 "Invalid property '{}' in 'GatewayResponses' property '{}'.".format(817 response_key, responses_key818 ),819 )820 if not SwaggerEditor.is_valid(self.definition_body):821 raise InvalidResourceException(822 self.logical_id,823 "Unable to add Auth configuration because "824 "'DefinitionBody' does not contain a valid Swagger definition.",825 )826 swagger_editor = SwaggerEditor(self.definition_body)827 # The dicts below will eventually become part of swagger/openapi definition, thus requires using Py27Dict()828 gateway_responses = Py27Dict()829 for response_type, response in self.gateway_responses.items():830 gateway_responses[response_type] = ApiGatewayResponse(831 api_logical_id=self.logical_id,832 response_parameters=response.get("ResponseParameters", Py27Dict()),833 response_templates=response.get("ResponseTemplates", Py27Dict()),834 status_code=response.get("StatusCode", None),835 )836 if gateway_responses:837 swagger_editor.add_gateway_responses(gateway_responses)838 # Assign the Swagger back to template839 self.definition_body = swagger_editor.swagger840 def _add_models(self):841 """842 Add Model definitions to the Swagger file, if necessary843 :return:844 """845 if not self.models:846 return847 if self.models and not self.definition_body:848 raise InvalidResourceException(849 self.logical_id, "Models works only with inline Swagger specified in " "'DefinitionBody' property."850 )851 if not SwaggerEditor.is_valid(self.definition_body):852 raise InvalidResourceException(853 self.logical_id,854 "Unable to add Models definitions because "855 "'DefinitionBody' does not contain a valid Swagger definition.",856 )857 if not all(isinstance(model, dict) for model in self.models.values()):858 raise InvalidResourceException(self.logical_id, "Invalid value for 'Models' property")859 swagger_editor = SwaggerEditor(self.definition_body)860 swagger_editor.add_models(self.models)861 # Assign the Swagger back to template862 self.definition_body = self._openapi_postprocess(swagger_editor.swagger)863 def _openapi_postprocess(self, definition_body):864 """865 Convert definitions to openapi 3 in definition body if OpenApiVersion flag is specified.866 If the is swagger defined in the definition body, we treat it as a swagger spec and do not867 make any openapi 3 changes to it868 """869 if definition_body.get("swagger") is not None:870 return definition_body871 if definition_body.get("openapi") is not None and self.open_api_version is None:872 self.open_api_version = definition_body.get("openapi")873 if self.open_api_version and SwaggerEditor.safe_compare_regex_with_string(874 SwaggerEditor.get_openapi_version_3_regex(), self.open_api_version875 ):876 if definition_body.get("securityDefinitions"):877 components = definition_body.get("components", Py27Dict())878 # In the previous line, the default value `Py27Dict()` will be only returned only if `components`879 # property is not in definition_body dict, but if it exist, and its value is None, so None will be880 # returned and not the default value. That is why the below line is required.881 components = components if components else Py27Dict()882 components["securitySchemes"] = definition_body["securityDefinitions"]883 definition_body["components"] = components884 del definition_body["securityDefinitions"]885 if definition_body.get("definitions"):886 components = definition_body.get("components", Py27Dict())887 components["schemas"] = definition_body["definitions"]888 definition_body["components"] = components889 del definition_body["definitions"]890 # removes `consumes` and `produces` options for CORS in openapi3 and891 # adds `schema` for the headers in responses for openapi3892 paths = definition_body.get("paths")893 if paths:894 for path, path_item in paths.items():895 SwaggerEditor.validate_path_item_is_dict(path_item, path)896 if path_item.get("options"):897 options = path_item.get("options").copy()898 for field, field_val in options.items():899 # remove unsupported produces and consumes in options for openapi3900 if field in ["produces", "consumes"]:901 del definition_body["paths"][path]["options"][field]902 # add schema for the headers in options section for openapi3903 if field in ["responses"]:904 SwaggerEditor.validate_is_dict(905 field_val,906 "Value of responses in options method for path {} must be a "907 "dictionary according to Swagger spec.".format(path),908 )909 if field_val.get("200") and field_val.get("200").get("headers"):910 headers = field_val["200"]["headers"]911 for header, header_val in headers.items():912 new_header_val_with_schema = Py27Dict()913 new_header_val_with_schema["schema"] = header_val914 definition_body["paths"][path]["options"][field]["200"]["headers"][915 header916 ] = new_header_val_with_schema917 return definition_body918 def _get_authorizers(self, authorizers_config, default_authorizer=None):919 # The dict below will eventually become part of swagger/openapi definition, thus requires using Py27Dict()920 authorizers = Py27Dict()921 if default_authorizer == "AWS_IAM":922 authorizers[default_authorizer] = ApiGatewayAuthorizer(923 api_logical_id=self.logical_id, name=default_authorizer, is_aws_iam_authorizer=True924 )925 if not authorizers_config:926 if "AWS_IAM" in authorizers:927 return authorizers928 return None929 if not isinstance(authorizers_config, dict):930 raise InvalidResourceException(self.logical_id, "Authorizers must be a dictionary.")931 for authorizer_name, authorizer in authorizers_config.items():932 if not isinstance(authorizer, dict):933 raise InvalidResourceException(934 self.logical_id, "Authorizer %s must be a dictionary." % (authorizer_name)935 )936 authorizers[authorizer_name] = ApiGatewayAuthorizer(937 api_logical_id=self.logical_id,938 name=authorizer_name,939 user_pool_arn=authorizer.get("UserPoolArn"),940 function_arn=authorizer.get("FunctionArn"),941 identity=authorizer.get("Identity"),942 function_payload_type=authorizer.get("FunctionPayloadType"),943 function_invoke_role=authorizer.get("FunctionInvokeRole"),944 authorization_scopes=authorizer.get("AuthorizationScopes"),945 )946 return authorizers947 def _get_permission(self, authorizer_name, authorizer_lambda_function_arn):948 """Constructs and returns the Lambda Permission resource allowing the Authorizer to invoke the function.949 :returns: the permission resource950 :rtype: model.lambda_.LambdaPermission951 """952 rest_api = ApiGatewayRestApi(self.logical_id, depends_on=self.depends_on, attributes=self.resource_attributes)953 api_id = rest_api.get_runtime_attr("rest_api_id")954 partition = ArnGenerator.get_partition_name()955 resource = "${__ApiId__}/authorizers/*"956 source_arn = fnSub(957 ArnGenerator.generate_arn(partition=partition, service="execute-api", resource=resource),958 {"__ApiId__": api_id},959 )960 lambda_permission = LambdaPermission(961 self.logical_id + authorizer_name + "AuthorizerPermission", attributes=self.passthrough_resource_attributes962 )963 lambda_permission.Action = "lambda:InvokeFunction"964 lambda_permission.FunctionName = authorizer_lambda_function_arn965 lambda_permission.Principal = "apigateway.amazonaws.com"966 lambda_permission.SourceArn = source_arn967 return lambda_permission968 def _construct_authorizer_lambda_permission(self):969 if not self.auth:970 return []971 auth_properties = AuthProperties(**self.auth)972 authorizers = self._get_authorizers(auth_properties.Authorizers)973 if not authorizers:974 return []975 permissions = []976 for authorizer_name, authorizer in authorizers.items():977 # Construct permissions for Lambda Authorizers only978 if not authorizer.function_arn:979 continue980 permission = self._get_permission(authorizer_name, authorizer.function_arn)981 permissions.append(permission)982 return permissions983 def _set_default_authorizer(984 self, swagger_editor, authorizers, default_authorizer, add_default_auth_to_preflight=True, api_authorizers=None985 ):986 if not default_authorizer:987 return988 if not isinstance(default_authorizer, str):989 raise InvalidResourceException(990 self.logical_id,991 "DefaultAuthorizer is not a string.",992 )993 if not authorizers.get(default_authorizer) and default_authorizer != "AWS_IAM":994 raise InvalidResourceException(995 self.logical_id,996 "Unable to set DefaultAuthorizer because '"997 + default_authorizer998 + "' was not defined in 'Authorizers'.",999 )1000 for path in swagger_editor.iter_on_path():1001 swagger_editor.set_path_default_authorizer(1002 path,1003 default_authorizer,1004 authorizers=authorizers,1005 add_default_auth_to_preflight=add_default_auth_to_preflight,1006 api_authorizers=api_authorizers,1007 )1008 def _set_default_apikey_required(self, swagger_editor):1009 for path in swagger_editor.iter_on_path():1010 swagger_editor.set_path_default_apikey_required(path)1011 def _set_endpoint_configuration(self, rest_api, value):1012 """1013 Sets endpoint configuration property of AWS::ApiGateway::RestApi resource1014 :param rest_api: RestApi resource1015 :param string/dict value: Value to be set1016 """1017 if isinstance(value, dict) and value.get("Type"):1018 rest_api.Parameters = {"endpointConfigurationTypes": value.get("Type")}1019 rest_api.EndpointConfiguration = {"Types": [value.get("Type")]}1020 if "VPCEndpointIds" in value.keys():1021 rest_api.EndpointConfiguration["VpcEndpointIds"] = value.get("VPCEndpointIds")1022 else:1023 rest_api.EndpointConfiguration = {"Types": [value]}...
apigateway.py
Source:apigateway.py
2import jsbeautifier3def create_api_key(client, name):4 create_key = client.create_api_key(name=name)5 return create_key6def create_usage_plan(client, name):7 create_usage_plan = client.create_usage_plan(name=name)8 return create_usage_plan9def create_api(client, name):10 create_api = client.create_rest_api(name=name)11 return create_api12 13def list_api_keys(client):14 keys = [x['id'] for x in client.get_api_keys()['items']]15 return keys16def list_apis(client):17 list_apis = client.get_rest_apis()18 return list_apis19def delete_usage_plans(client):20 responses = []21 for plan in client.get_usage_plans()['items']:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!