Best Python code snippet using localstack_python
client.py
Source:client.py
...105 def list_tags_for_resources(self, ResourceType: str, ResourceIds: List) -> Dict:106 pass107 def list_traffic_policies(self, TrafficPolicyIdMarker: str = None, MaxItems: str = None) -> Dict:108 pass109 def list_traffic_policy_instances(self, HostedZoneIdMarker: str = None, TrafficPolicyInstanceNameMarker: str = None, TrafficPolicyInstanceTypeMarker: str = None, MaxItems: str = None) -> Dict:110 pass111 def list_traffic_policy_instances_by_hosted_zone(self, HostedZoneId: str, TrafficPolicyInstanceNameMarker: str = None, TrafficPolicyInstanceTypeMarker: str = None, MaxItems: str = None) -> Dict:112 pass113 def list_traffic_policy_instances_by_policy(self, TrafficPolicyId: str, TrafficPolicyVersion: int, HostedZoneIdMarker: str = None, TrafficPolicyInstanceNameMarker: str = None, TrafficPolicyInstanceTypeMarker: str = None, MaxItems: str = None) -> Dict:114 pass115 def list_traffic_policy_versions(self, Id: str, TrafficPolicyVersionMarker: str = None, MaxItems: str = None) -> Dict:116 pass117 def list_vpc_association_authorizations(self, HostedZoneId: str, NextToken: str = None, MaxResults: str = None) -> Dict:118 pass119 def test_dns_answer(self, HostedZoneId: str, RecordName: str, RecordType: str, ResolverIP: str = None, EDNS0ClientSubnetIP: str = None, EDNS0ClientSubnetMask: str = None) -> Dict:120 pass121 def update_health_check(self, HealthCheckId: str, HealthCheckVersion: int = None, IPAddress: str = None, Port: int = None, ResourcePath: str = None, FullyQualifiedDomainName: str = None, SearchString: str = None, FailureThreshold: int = None, Inverted: bool = None, Disabled: bool = None, HealthThreshold: int = None, ChildHealthChecks: List = None, EnableSNI: bool = None, Regions: List = None, AlarmIdentifier: Dict = None, InsufficientDataHealthStatus: str = None, ResetElements: List = None) -> Dict:122 pass123 def update_hosted_zone_comment(self, Id: str, Comment: str = None) -> Dict:...
route53_client.py
Source:route53_client.py
"""
AWS route-53 client to handle route-53 service API requests.
"""
import sys
import os
from horey.aws_api.aws_clients.boto3_client import Boto3Client
from horey.aws_api.aws_services_entities.route53_hosted_zone import HostedZone
from horey.h_logger import get_logger
import pdb

logger = get_logger()


class Route53Client(Boto3Client):
    """
    Client to handle specific aws service API calls.
    """

    def __init__(self):
        client_name = "route53"
        super().__init__(client_name)

    def get_all_hosted_zones(self, full_information=True, name=None):
        """
        Get all hosted zones, optionally filtered by name.

        :param full_information: when true, the record sets of every returned zone
            are fetched via get_hosted_zone_full_information.
        :param name: when given, only zones whose name equals it are returned.
            A trailing "." is appended before comparing if it is missing.
        :return: list of HostedZone objects.
        """
        final_result = []
        if name is not None and not name.endswith("."):
            name += "."

        for response in self.execute(self.client.list_hosted_zones, "HostedZones"):
            obj = HostedZone(response)
            if name is not None and obj.name != name:
                continue

            if full_information:
                self.get_hosted_zone_full_information(obj)

            final_result.append(obj)

        return final_result

    def get_hosted_zone_full_information(self, hsoted_zone):
        """
        Reset the zone's records and reload them from list_resource_record_sets.

        :param hsoted_zone: hosted zone object to fill; its `id` is used as the
            HostedZoneId filter. (The misspelled parameter name is kept so that
            keyword callers keep working.)
        :return: None
        """
        hsoted_zone.records = []
        for update_info in self.execute(self.client.list_resource_record_sets, "ResourceRecordSets",
                                        filters_req={"HostedZoneId": hsoted_zone.id}):
            hsoted_zone.update_record_set(update_info)

    def change_resource_record_sets(self, name):
        """
        Unfinished helper: performs the same API calls as before and returns nothing.

        The interactive pdb breakpoints that used to live here were removed - they
        block (or abort) any non-interactive caller.

        :param name: currently unused.
        :return: None
        """
        # TODO(review): the method has no real implementation yet; `name` is ignored.
        self.get_all_hosted_zones()
        # NOTE(review): "HostedZones" looks like the wrong response key for
        # list_traffic_policy_instances - confirm against the API before relying on it.
        for _ in self.execute(self.client.list_traffic_policy_instances, "HostedZones", raw_data=True):
            pass

    def provision_hosted_zone(self, hosted_zone):
        """
        Create the hosted zone if it does not exist, associate its VPCs, UPSERT its
        records and refresh the object from the API.

        :param hosted_zone: desired hosted zone; updated in place.
        :return: None
        :raises ValueError: when more than one zone with this name exists.
        """
        # Build the change batch before the object is refreshed from the API below.
        changes = [
            {
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': record.name,
                    'Type': record.type,
                    'TTL': record.ttl,
                    'ResourceRecords': record.resource_records
                }
            }
            for record in hosted_zone.records
        ]

        hosted_zones = self.get_all_hosted_zones(name=hosted_zone.name)
        if len(hosted_zones) > 1:
            raise ValueError(f"More then 1 '{hosted_zone.name}' hosted_zone found: {len(hosted_zones)}")

        if len(hosted_zones) == 1:
            hosted_zone.update_from_raw_response(hosted_zones[0].dict_src)
        else:
            request = hosted_zone.generate_create_request()
            response = self.raw_create_hosted_zone(request)
            hosted_zone.id = response["Id"]

        # NOTE(review): indentation was lost in the source excerpt; association is
        # assumed to run for both existing and freshly created zones - confirm.
        self.associate_hosted_zone(hosted_zone)

        if changes:
            request = {"HostedZoneId": hosted_zone.id, "ChangeBatch": {"Changes": changes}}
            self.raw_change_resource_record_sets(request)

        # Re-read the zone so the object reflects what is actually provisioned.
        hosted_zones = self.get_all_hosted_zones(name=hosted_zone.name)
        hosted_zone.update_from_raw_response(hosted_zones[0].dict_src)
        hosted_zone.records = hosted_zones[0].records

    def update(self, hosted_zone):
        """
        Refresh the hosted zone object from the single zone with the same name.

        :param hosted_zone: hosted zone to refresh; looked up by `name`.
        :return: None
        :raises ValueError: when more than one zone with this name exists.
        :raises RuntimeError: when no zone with this name exists.
        """
        hosted_zones = self.get_all_hosted_zones(name=hosted_zone.name)
        if len(hosted_zones) > 1:
            raise ValueError(f"More then 1 hosted_zone found: {len(hosted_zones)}")

        if not hosted_zones:
            raise RuntimeError(f"Can not find hosted_zone by name: '{hosted_zone.name}'")

        # NOTE(review): was `hosted_zone.updated(...)`; every other method in this
        # class refreshes through update_from_raw_response, so it is used here too.
        hosted_zone.update_from_raw_response(hosted_zones[0].dict_src)

    def associate_hosted_zone(self, hosted_zone):
        """
        Associate every VPC listed in hosted_zone.vpc_associations with the zone.

        :param hosted_zone: hosted zone with `id` and `vpc_associations` set.
        :return: None
        """
        for vpc_association in hosted_zone.vpc_associations:
            associate_request = {"HostedZoneId": hosted_zone.id,
                                 "VPC": vpc_association
                                 }
            self.raw_associate_vpc_with_hosted_zone(associate_request)

    def raw_create_hosted_zone(self, request_dict):
        """
        Call create_hosted_zone and return the first "HostedZone" item yielded.

        :param request_dict: request arguments passed as filters_req.
        :return: first response item, or None when nothing is yielded.
        """
        logger.info(f"Creating hosted zone: {request_dict}")
        for response in self.execute(self.client.create_hosted_zone, "HostedZone", filters_req=request_dict):
            return response

    def raw_associate_vpc_with_hosted_zone(self, request_dict):
        """
        Call associate_vpc_with_hosted_zone and return the first "ChangeInfo" item.

        Any exception is logged as a warning; it is re-raised unless its repr
        contains "ConflictingDomainExists", in which case None is returned.

        :param request_dict: request arguments passed as filters_req.
        :return: first response item or None.
        """
        logger.info(f"Associating VPC with hosted zone: {request_dict}")
        try:
            for response in self.execute(self.client.associate_vpc_with_hosted_zone, "ChangeInfo", filters_req=request_dict):
                return response
        except Exception as exception_instance:
            repr_exception = repr(exception_instance)
            logger.warning(repr_exception)
            if "ConflictingDomainExists" not in repr_exception:
                raise

    def raw_change_resource_record_sets(self, request_dict):
        """
        Call change_resource_record_sets and return the first "ChangeInfo" item.

        An exception whose repr contains "already exists" is logged as a warning
        and swallowed (None is returned); any other exception is re-raised.

        :param request_dict: request arguments passed as filters_req.
        :return: first response item or None.
        """
        logger.info(f"Updating hosted zone record set: {request_dict}")
        try:
            for response in self.execute(self.client.change_resource_record_sets, "ChangeInfo", filters_req=request_dict):
                return response
        except Exception as exception_instance:
            repr_exception = repr(exception_instance)
            if "already exists" not in repr_exception:
                raise
            logger.warning(repr_exception)

    def update_hosted_zone_information(self, hosted_zone, full_information=False):
        """
        Refresh the hosted zone from list_hosted_zones_by_name, matching on name
        with leading/trailing dots stripped.

        :param hosted_zone: hosted zone to refresh; looked up by `name`.
        :param full_information: see the truncation note at the end of the body.
        :return: None
        :raises RuntimeError: when no zone with a matching name is returned.
        """
        hosted_zone_name = hosted_zone.name.strip(".")
        filters_req = {"DNSName": hosted_zone_name}
        for response in self.execute(self.client.list_hosted_zones_by_name, "HostedZones", filters_req=filters_req):
            if response["Name"].strip(".") == hosted_zone_name:
                hosted_zone.update_from_raw_response(response)
                break
        else:
            # for/else: reached only when the loop finished without a match.
            raise RuntimeError(f"Can not find hosted_zone by name: '{hosted_zone.name}'")

        if full_information:
            # The source excerpt is truncated at this point; the original body of
            # this branch is not visible, so the excerpt's ellipsis is kept as is.
            ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!