Best Python code snippet using localstack_python
responses.py
Source:responses.py
# Excerpt of responses.py, restored from a line-fused listing (the listing's
# embedded source line numbers 146-662 were removed). Every definition below
# takes `self`; the enclosing class header lies outside this excerpt, so the
# definitions are written at column 0 here.
#
# Each handler returns a ``(status, headers, body)`` tuple.
#
# The excerpt opens mid-definition. Visible tail of that dispatcher, kept
# verbatim (its `def` header is not shown):
#         return self.delete_vpc_link()
#     if request.method == "GET":
#         return self.get_vpc_link()
#     if request.method == "PATCH":
#         return self.update_vpc_link()


def vpc_links(self, request, full_url, headers):
    """Dispatch a VPC-link collection request by HTTP method.

    GET lists the VPC links and POST creates one. Any other method falls
    through and returns ``None``.
    """
    self.setup_class(request, full_url, headers)
    if request.method == "GET":
        return self.get_vpc_links()
    if request.method == "POST":
        return self.create_vpc_link()


def create_api(self):
    """Create an API from the JSON request body.

    Returns:
        ``(200, {}, body)`` where body is the created API's ``to_json()``.

    Raises:
        UnknownProtocol: if ``protocolType`` is neither ``"HTTP"`` nor
            ``"WEBSOCKET"`` (this includes a missing value).
    """
    params = json.loads(self.body)
    protocol_type = params.get("protocolType")
    # Reject unsupported protocols before touching the backend.
    if protocol_type not in ("HTTP", "WEBSOCKET"):
        raise UnknownProtocol
    api = self.apigatewayv2_backend.create_api(
        api_key_selection_expression=params.get("apiKeySelectionExpression"),
        cors_configuration=params.get("corsConfiguration"),
        credentials_arn=params.get("credentialsArn"),
        description=params.get("description"),
        disable_schema_validation=params.get("disableSchemaValidation"),
        disable_execute_api_endpoint=params.get("disableExecuteApiEndpoint"),
        name=params.get("name"),
        protocol_type=protocol_type,
        route_key=params.get("routeKey"),
        route_selection_expression=params.get("routeSelectionExpression"),
        tags=params.get("tags"),
        target=params.get("target"),
        version=params.get("version"),
    )
    return 200, {}, json.dumps(api.to_json())


def delete_api(self):
    """Delete the API whose id is the last path segment."""
    api_id = self.path.split("/")[-1]
    self.apigatewayv2_backend.delete_api(api_id=api_id)
    # Headers are an empty dict, matching every other handler in this file
    # (the original returned an empty string here).
    return 200, {}, "{}"


def get_api(self):
    """Return the API whose id is the last path segment."""
    api_id = self.path.split("/")[-1]
    api = self.apigatewayv2_backend.get_api(api_id=api_id)
    return 200, {}, json.dumps(api.to_json())


def get_apis(self):
    """Return all APIs known to the backend under the ``items`` key."""
    apis = self.apigatewayv2_backend.get_apis()
    return 200, {}, json.dumps({"items": [a.to_json() for a in apis]})


def update_api(self):
    """Update the API named by the last path segment from the JSON body."""
    api_id = self.path.split("/")[-1]
    params = json.loads(self.body)
    api = self.apigatewayv2_backend.update_api(
        api_id=api_id,
        api_key_selection_expression=params.get("apiKeySelectionExpression"),
        cors_configuration=params.get("corsConfiguration"),
        description=params.get("description"),
        disable_schema_validation=params.get("disableSchemaValidation"),
        disable_execute_api_endpoint=params.get("disableExecuteApiEndpoint"),
        name=params.get("name"),
        route_selection_expression=params.get("routeSelectionExpression"),
        version=params.get("version"),
    )
    return 200, {}, json.dumps(api.to_json())


def reimport_api(self):
    """Re-import the API named by the last path segment; responds with 201.

    ``failOnWarnings`` is read via ``_get_param`` and is true only when its
    string form equals ``"true"`` case-insensitively (default ``"false"``).
    """
    api_id = self.path.split("/")[-1]
    params = json.loads(self.body)
    body = params.get("body")
    fail_on_warnings = (
        str(self._get_param("failOnWarnings", "false")).lower() == "true"
    )
    api = self.apigatewayv2_backend.reimport_api(api_id, body, fail_on_warnings)
    return 201, {}, json.dumps(api.to_json())


def create_authorizer(self):
    """Create an authorizer for the API at path segment ``[-2]``."""
    api_id = self.path.split("/")[-2]
    params = json.loads(self.body)
    authorizer = self.apigatewayv2_backend.create_authorizer(
        api_id,
        auth_creds_arn=params.get("authorizerCredentialsArn"),
        auth_payload_format_version=params.get("authorizerPayloadFormatVersion"),
        auth_result_ttl=params.get("authorizerResultTtlInSeconds"),
        authorizer_type=params.get("authorizerType"),
        authorizer_uri=params.get("authorizerUri"),
        enable_simple_response=params.get("enableSimpleResponses"),
        identity_source=params.get("identitySource"),
        identity_validation_expr=params.get("identityValidationExpression"),
        jwt_config=params.get("jwtConfiguration"),
        name=params.get("name"),
    )
    return 200, {}, json.dumps(authorizer.to_json())


def delete_authorizer(self):
    """Delete authorizer ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    api_id, authorizer_id = parts[-3], parts[-1]
    self.apigatewayv2_backend.delete_authorizer(api_id, authorizer_id)
    return 200, {}, "{}"


def get_authorizer(self):
    """Return authorizer ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    api_id, authorizer_id = parts[-3], parts[-1]
    authorizer = self.apigatewayv2_backend.get_authorizer(api_id, authorizer_id)
    return 200, {}, json.dumps(authorizer.to_json())


def update_authorizer(self):
    """Update authorizer ``[-1]`` of API ``[-3]`` from the JSON body."""
    parts = self.path.split("/")
    api_id, authorizer_id = parts[-3], parts[-1]
    params = json.loads(self.body)
    authorizer = self.apigatewayv2_backend.update_authorizer(
        api_id,
        authorizer_id=authorizer_id,
        auth_creds_arn=params.get("authorizerCredentialsArn"),
        auth_payload_format_version=params.get("authorizerPayloadFormatVersion"),
        auth_result_ttl=params.get("authorizerResultTtlInSeconds"),
        authorizer_type=params.get("authorizerType"),
        authorizer_uri=params.get("authorizerUri"),
        enable_simple_response=params.get("enableSimpleResponses"),
        identity_source=params.get("identitySource"),
        identity_validation_expr=params.get("identityValidationExpression"),
        jwt_config=params.get("jwtConfiguration"),
        name=params.get("name"),
    )
    return 200, {}, json.dumps(authorizer.to_json())


def delete_cors_configuration(self):
    """Delete the CORS configuration of the API at path segment ``[-2]``."""
    api_id = self.path.split("/")[-2]
    self.apigatewayv2_backend.delete_cors_configuration(api_id)
    return 200, {}, "{}"


def create_model(self):
    """Create a model for the API at path segment ``[-2]``."""
    api_id = self.path.split("/")[-2]
    params = json.loads(self.body)
    # The backend takes these positionally, in this order.
    model = self.apigatewayv2_backend.create_model(
        api_id,
        params.get("contentType"),
        params.get("description"),
        params.get("name"),
        params.get("schema"),
    )
    return 200, {}, json.dumps(model.to_json())


def delete_model(self):
    """Delete model ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    api_id, model_id = parts[-3], parts[-1]
    self.apigatewayv2_backend.delete_model(api_id, model_id)
    return 200, {}, "{}"


def get_model(self):
    """Return model ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    api_id, model_id = parts[-3], parts[-1]
    model = self.apigatewayv2_backend.get_model(api_id, model_id)
    return 200, {}, json.dumps(model.to_json())


def update_model(self):
    """Update model ``[-1]`` of API ``[-3]`` from the JSON body."""
    parts = self.path.split("/")
    api_id, model_id = parts[-3], parts[-1]
    params = json.loads(self.body)
    model = self.apigatewayv2_backend.update_model(
        api_id,
        model_id,
        content_type=params.get("contentType"),
        description=params.get("description"),
        name=params.get("name"),
        schema=params.get("schema"),
    )
    return 200, {}, json.dumps(model.to_json())


def get_tags(self):
    """Return the tags of the resource whose URL-quoted ARN follows ``/tags/``."""
    resource_arn = unquote(self.path.split("/tags/")[1])
    tags = self.apigatewayv2_backend.get_tags(resource_arn)
    return 200, {}, json.dumps({"tags": tags})


def tag_resource(self):
    """Tag the resource whose URL-quoted ARN follows ``/tags/``; responds 201.

    A body without a ``tags`` key is treated as an empty tag mapping.
    """
    resource_arn = unquote(self.path.split("/tags/")[1])
    tags = json.loads(self.body).get("tags", {})
    self.apigatewayv2_backend.tag_resource(resource_arn, tags)
    return 201, {}, "{}"


def untag_resource(self):
    """Remove the tags named by the ``tagKeys`` query parameter."""
    resource_arn = unquote(self.path.split("/tags/")[1])
    tag_keys = self.querystring.get("tagKeys")
    self.apigatewayv2_backend.untag_resource(resource_arn, tag_keys)
    return 200, {}, "{}"


def create_route(self):
    """Create a route for the API at path segment ``[-2]``; responds 201.

    ``apiKeyRequired`` defaults to ``False`` and ``authorizationType`` to
    ``"NONE"`` when absent from the body.
    """
    api_id = self.path.split("/")[-2]
    params = json.loads(self.body)
    route = self.apigatewayv2_backend.create_route(
        api_id=api_id,
        api_key_required=params.get("apiKeyRequired", False),
        authorization_scopes=params.get("authorizationScopes"),
        authorization_type=params.get("authorizationType", "NONE"),
        authorizer_id=params.get("authorizerId"),
        model_selection_expression=params.get("modelSelectionExpression"),
        operation_name=params.get("operationName"),
        request_models=params.get("requestModels"),
        request_parameters=params.get("requestParameters"),
        route_key=params.get("routeKey"),
        route_response_selection_expression=params.get(
            "routeResponseSelectionExpression"
        ),
        target=params.get("target"),
    )
    return 201, {}, json.dumps(route.to_json())


def delete_route(self):
    """Delete route ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    self.apigatewayv2_backend.delete_route(api_id=parts[-3], route_id=parts[-1])
    return 200, {}, "{}"


def delete_route_request_parameter(self):
    """Delete request parameter ``[-1]`` of route ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    api_id, route_id, request_param = parts[-5], parts[-3], parts[-1]
    self.apigatewayv2_backend.delete_route_request_parameter(
        api_id, route_id, request_param
    )
    return 200, {}, "{}"


def get_route(self):
    """Return route ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    route = self.apigatewayv2_backend.get_route(
        api_id=parts[-3], route_id=parts[-1]
    )
    return 200, {}, json.dumps(route.to_json())


def get_routes(self):
    """Return all routes of the API at path segment ``[-2]`` under ``items``."""
    api_id = self.path.split("/")[-2]
    routes = self.apigatewayv2_backend.get_routes(api_id=api_id)
    return 200, {}, json.dumps({"items": [route.to_json() for route in routes]})


def update_route(self):
    """Update route ``[-1]`` of API ``[-3]`` from the JSON body.

    Unlike ``create_route``, no defaults are applied: absent keys are passed
    to the backend as ``None``.
    """
    parts = self.path.split("/")
    api_id, route_id = parts[-3], parts[-1]
    params = json.loads(self.body)
    route = self.apigatewayv2_backend.update_route(
        api_id=api_id,
        api_key_required=params.get("apiKeyRequired"),
        authorization_scopes=params.get("authorizationScopes"),
        authorization_type=params.get("authorizationType"),
        authorizer_id=params.get("authorizerId"),
        model_selection_expression=params.get("modelSelectionExpression"),
        operation_name=params.get("operationName"),
        request_models=params.get("requestModels"),
        request_parameters=params.get("requestParameters"),
        route_id=route_id,
        route_key=params.get("routeKey"),
        route_response_selection_expression=params.get(
            "routeResponseSelectionExpression"
        ),
        target=params.get("target"),
    )
    return 200, {}, json.dumps(route.to_json())


def create_route_response(self):
    """Create a route response for route ``[-2]`` of API ``[-4]``."""
    parts = self.path.split("/")
    api_id, route_id = parts[-4], parts[-2]
    params = json.loads(self.body)
    route_response = self.apigatewayv2_backend.create_route_response(
        api_id,
        route_id,
        params.get("routeResponseKey"),
        model_selection_expression=params.get("modelSelectionExpression"),
        response_models=params.get("responseModels"),
    )
    return 200, {}, json.dumps(route_response.to_json())


def delete_route_response(self):
    """Delete route response ``[-1]`` of route ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    api_id, route_id, route_response_id = parts[-5], parts[-3], parts[-1]
    self.apigatewayv2_backend.delete_route_response(
        api_id, route_id, route_response_id
    )
    return 200, {}, "{}"


def get_route_response(self):
    """Return route response ``[-1]`` of route ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    api_id, route_id, route_response_id = parts[-5], parts[-3], parts[-1]
    route_response = self.apigatewayv2_backend.get_route_response(
        api_id, route_id, route_response_id
    )
    return 200, {}, json.dumps(route_response.to_json())


def create_integration(self):
    """Create an integration for the API at path segment ``[-2]``."""
    api_id = self.path.split("/")[-2]
    params = json.loads(self.body)
    integration = self.apigatewayv2_backend.create_integration(
        api_id=api_id,
        connection_id=params.get("connectionId"),
        connection_type=params.get("connectionType"),
        content_handling_strategy=params.get("contentHandlingStrategy"),
        credentials_arn=params.get("credentialsArn"),
        description=params.get("description"),
        integration_method=params.get("integrationMethod"),
        integration_subtype=params.get("integrationSubtype"),
        integration_type=params.get("integrationType"),
        integration_uri=params.get("integrationUri"),
        passthrough_behavior=params.get("passthroughBehavior"),
        payload_format_version=params.get("payloadFormatVersion"),
        request_parameters=params.get("requestParameters"),
        request_templates=params.get("requestTemplates"),
        response_parameters=params.get("responseParameters"),
        template_selection_expression=params.get("templateSelectionExpression"),
        timeout_in_millis=params.get("timeoutInMillis"),
        tls_config=params.get("tlsConfig"),
    )
    return 200, {}, json.dumps(integration.to_json())


def get_integration(self):
    """Return integration ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    integration = self.apigatewayv2_backend.get_integration(
        api_id=parts[-3], integration_id=parts[-1]
    )
    return 200, {}, json.dumps(integration.to_json())


def get_integrations(self):
    """Return all integrations of the API at path segment ``[-2]``."""
    api_id = self.path.split("/")[-2]
    integrations = self.apigatewayv2_backend.get_integrations(api_id=api_id)
    return 200, {}, json.dumps({"items": [i.to_json() for i in integrations]})


def delete_integration(self):
    """Delete integration ``[-1]`` of API ``[-3]`` (path segments)."""
    parts = self.path.split("/")
    self.apigatewayv2_backend.delete_integration(
        api_id=parts[-3], integration_id=parts[-1]
    )
    return 200, {}, "{}"


def update_integration(self):
    """Update integration ``[-1]`` of API ``[-3]`` from the JSON body."""
    parts = self.path.split("/")
    api_id, integration_id = parts[-3], parts[-1]
    params = json.loads(self.body)
    integration = self.apigatewayv2_backend.update_integration(
        api_id=api_id,
        connection_id=params.get("connectionId"),
        connection_type=params.get("connectionType"),
        content_handling_strategy=params.get("contentHandlingStrategy"),
        credentials_arn=params.get("credentialsArn"),
        description=params.get("description"),
        integration_id=integration_id,
        integration_method=params.get("integrationMethod"),
        integration_subtype=params.get("integrationSubtype"),
        integration_type=params.get("integrationType"),
        integration_uri=params.get("integrationUri"),
        passthrough_behavior=params.get("passthroughBehavior"),
        payload_format_version=params.get("payloadFormatVersion"),
        request_parameters=params.get("requestParameters"),
        request_templates=params.get("requestTemplates"),
        response_parameters=params.get("responseParameters"),
        template_selection_expression=params.get("templateSelectionExpression"),
        timeout_in_millis=params.get("timeoutInMillis"),
        tls_config=params.get("tlsConfig"),
    )
    return 200, {}, json.dumps(integration.to_json())


def create_integration_response(self):
    """Create an integration response for integration ``[-2]`` of API ``[-4]``."""
    parts = self.path.split("/")
    api_id, int_id = parts[-4], parts[-2]
    params = json.loads(self.body)
    integration_response = self.apigatewayv2_backend.create_integration_response(
        api_id=api_id,
        integration_id=int_id,
        content_handling_strategy=params.get("contentHandlingStrategy"),
        integration_response_key=params.get("integrationResponseKey"),
        response_parameters=params.get("responseParameters"),
        response_templates=params.get("responseTemplates"),
        template_selection_expression=params.get("templateSelectionExpression"),
    )
    return 200, {}, json.dumps(integration_response.to_json())


def delete_integration_response(self):
    """Delete integration response ``[-1]`` of integration ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    self.apigatewayv2_backend.delete_integration_response(
        api_id=parts[-5],
        integration_id=parts[-3],
        integration_response_id=parts[-1],
    )
    return 200, {}, "{}"


def get_integration_response(self):
    """Return integration response ``[-1]`` of integration ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    int_response = self.apigatewayv2_backend.get_integration_response(
        api_id=parts[-5],
        integration_id=parts[-3],
        integration_response_id=parts[-1],
    )
    return 200, {}, json.dumps(int_response.to_json())


def get_integration_responses(self):
    """Return all responses of integration ``[-2]`` of API ``[-4]`` under ``items``."""
    parts = self.path.split("/")
    int_responses = self.apigatewayv2_backend.get_integration_responses(
        api_id=parts[-4], integration_id=parts[-2]
    )
    return 200, {}, json.dumps({"items": [res.to_json() for res in int_responses]})


def update_integration_response(self):
    """Update integration response ``[-1]`` of integration ``[-3]`` of API ``[-5]``."""
    parts = self.path.split("/")
    api_id, int_id, int_res_id = parts[-5], parts[-3], parts[-1]
    params = json.loads(self.body)
    integration_response = self.apigatewayv2_backend.update_integration_response(
        api_id=api_id,
        integration_id=int_id,
        integration_response_id=int_res_id,
        content_handling_strategy=params.get("contentHandlingStrategy"),
        integration_response_key=params.get("integrationResponseKey"),
        response_parameters=params.get("responseParameters"),
        response_templates=params.get("responseTemplates"),
        template_selection_expression=params.get("templateSelectionExpression"),
    )
    return 200, {}, json.dumps(integration_response.to_json())


def create_vpc_link(self):
    """Create a VPC link from the JSON request body."""
    params = json.loads(self.body)
    # The backend takes these positionally, in this order.
    vpc_link = self.apigatewayv2_backend.create_vpc_link(
        params.get("name"),
        params.get("securityGroupIds"),
        params.get("subnetIds"),
        params.get("tags"),
    )
    return 200, {}, json.dumps(vpc_link.to_json())


def delete_vpc_link(self):
    """Delete the VPC link whose id is the last path segment."""
    vpc_link_id = self.path.split("/")[-1]
    self.apigatewayv2_backend.delete_vpc_link(vpc_link_id)
    return 200, {}, "{}"


def get_vpc_link(self):
    """Return the VPC link whose id is the last path segment."""
    vpc_link_id = self.path.split("/")[-1]
    vpc_link = self.apigatewayv2_backend.get_vpc_link(vpc_link_id)
    return 200, {}, json.dumps(vpc_link.to_json())


def get_vpc_links(self):
    """Return all VPC links known to the backend under the ``items`` key."""
    vpc_links = self.apigatewayv2_backend.get_vpc_links()
    return 200, {}, json.dumps({"items": [link.to_json() for link in vpc_links]})


def update_vpc_link(self):
    # NOTE: the excerpt is cut off right after the backend call below; the
    # rest of this method (including its return statement) is not shown, so
    # the visible statements are kept as they appear and the listing's
    # trailing "..." marks the truncation.
    vpc_link_id = self.path.split("/")[-1]
    params = json.loads(self.body)
    name = params.get("name")
    vpc_link = self.apigatewayv2_backend.update_vpc_link(vpc_link_id, name=name)
    ...
test_apigatewayv2_vpclinks.py
Source:test_apigatewayv2_vpclinks.py
# Excerpt of test_apigatewayv2_vpclinks.py, restored from a line-fused listing
# (the listing's embedded source line numbers 84-110 were removed).
#
# The excerpt opens inside an earlier test whose header is not shown; its
# visible tail is kept verbatim:
#     client.delete_vpc_link(VpcLinkId=vpc_link_id)
#     links = client.get_vpc_links()["Items"]
#     links.should.have.length_of(0)


@mock_apigatewayv2
def test_update_vpc_link():
    """Rename a VPC link and check the response.

    The response must carry the new name together with the security groups,
    subnets and tags the link was created with.
    """
    client = boto3.client("apigatewayv2", region_name="eu-north-1")
    vpc_link_id = client.create_vpc_link(
        Name="vpcl",
        SecurityGroupIds=["sg1", "sg2"],
        SubnetIds=["sid1", "sid2"],
        Tags={"key1": "value1"},
    )["VpcLinkId"]

    resp = client.update_vpc_link(VpcLinkId=vpc_link_id, Name="vpcl2")

    resp.should.have.key("CreatedDate")
    resp.should.have.key("Name").equals("vpcl2")
    resp.should.have.key("SecurityGroupIds").equals(["sg1", "sg2"])
    resp.should.have.key("SubnetIds").equals(["sid1", "sid2"])
    resp.should.have.key("Tags").equals({"key1": "value1"})
    resp.should.have.key("VpcLinkId")
    resp.should.have.key("VpcLinkStatus").equals("AVAILABLE")
    resp.should.have.key("VpcLinkVersion").equals("V2")


# The excerpt is cut off inside the next test; its visible part is kept
# verbatim:
# @mock_apigatewayv2
# def test_untag_vpc_link():
#     client = boto3.client("apigatewayv2", region_name="eu-west-1")
#     vpc_link_id = client.create_vpc_link(
#         Name="vpcl",
#         SecurityGroupIds=["sg1", "sg2"],
#         ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!