Best Python code snippet using localstack_python
WexAccount.py
Source:WexAccount.py
1#!/usr/bin/env python32import boto33import json4import os5import re6import tempfile7import logging8from argparse import Namespace9system_cleanup = set([10 'ResponseMetadata',11 'MaxResults',12 'IsTruncated',13 'MaxItems',14 'Marker',15 'NextMarker',16 'NextToken',17 ])18aws = {19 'regions': {20 'global': None,21 'service': 'ec2',22 'command': 'describe_regions',23 'match': 'RegionName',24 },25 'vpc-association-authorizations': {26 'global': None,27 'dep': {28 'hosted-zones': None,29 },30 'service': 'route53',31 'command': 'list_vpc_association_authorizations',32 'cleanup': set(['HostedZoneId']),33 },34 'hosted-zone-associations': {35 'global': None,36 'dep': {37 'hosted-zones': None,38 },39 'service': 'route53',40 'command': 'get_hosted_zone',41 'cleanup': set(['HostedZone']),42 },43 'hosted-zones': {44 'global': None,45 'service': 'route53',46 'command': 'list_hosted_zones',47 },48 'vpcs': {49 'service': 'ec2',50 'command': 'describe_vpcs',51 },52 'availability-zones': {53 'dep': {54 'vpcs': None,55 },56 'service': 'ec2',57 'command': 'describe_availability_zones',58 },59 'subnets': {60 'service': 'ec2',61 'command': 'describe_subnets',62 'match': 'SubnetId',63 },64 'security-groups': {65 'service': 'ec2',66 'command': 'describe_security_groups',67 },68 'security-group-references': {69 'dep': {70 'security-groups': {71 'GroupId': ['GroupId'], # TODO command uses list(...)72 },73 },74 'service': 'ec2',75 'command': 'describe_security_group_references',76 },77 'vpc-peering-connections': {78 'service': 'ec2',79 'command': 'describe_vpc_peering_connections',80 },81 'route-tables': {82 'service': 'ec2',83 'command': 'describe_route_tables',84 },85 'prefix-lists': {86 'service': 'ec2',87 'command': 'describe_prefix_lists',88 },89 'resolver-endpoints': {90 'service': 'route53resolver',91 'command': 'list_resolver_endpoints',92 'skip-regions': [93 'sa-east-1', # Route 53 Resolver unsupported in São Paulo94 'me-south-1', # Route 53 Resolver unsupported in Bahrain95 ],96 },97 'resolver-endpoint-ip-addresses': {98 'dep': {99 'resolver-endpoints': {100 'Id': 'ResolverEndpointId', # TODO command uses string101 },102 },103 'service': 'route53resolver',104 'command': 'list_resolver_endpoint_ip_addresses',105 'skip-regions': [106 'sa-east-1',107 'me-south-1',108 ],109 },110 'resolver-rules': {111 'dep': {112 'vpcs': None,113 },114 'service': 'route53resolver',115 'command': 'list_resolver_rules',116 'skip-regions': [117 'sa-east-1',118 'me-south-1',119 ],120 },121 'resolver-rule-associations': {122 'dep': {123 'vpcs': None,124 },125 'service': 'route53resolver',126 'command': 'list_resolver_rule_associations',127 'skip-regions': [128 'sa-east-1',129 'me-south-1',130 ],131 },132 }133aws_region_limit = set([134 'us-east-1',135 'us-west-2',136 'eu-central-1',137 'eu-west-1',138 'ap-southeast-1',139 'ap-southeast-2',140 ])141class WexAccount:142 def __init__(self, args, profile):143 self.logger = logging.getLogger('WexAccount')144 self.logger.setLevel(args.logging)145 ch = logging.StreamHandler()146 ch.setLevel(args.logging)147 formatter = logging.Formatter(148 '%(asctime)s - %(name)s - %(levelname)s - %(message)s')149 ch.setFormatter(formatter)150 self.logger.addHandler(ch)151 self.profile = profile152 self.data = dict([153 [name, dict()]154 for name155 in aws.keys()156 ])157 self.retrieve_all()158 def cache_data(self, key, value=None):159 tempdir = (f'{tempfile.gettempdir()}/{self.profile}')160 if not os.path.isdir(f'{tempdir}'):161 os.mkdir(f'{tempdir}')162 if value is not None:163 with open(f'{tempdir}/{key}.json', 'w') as f:164 json.dump(value, f)165 return166 try:167 with open(f'{tempdir}/{key}.json') as f:168 value = json.load(f)169 except FileNotFoundError:170 pass171 return value172 def aws_items(self, args):173 value = list()174 self.logger.info(f'loading {args} ..')175 item, region = args.pop('Item'), args.pop('Region')176 session = boto3.Session(profile_name=self.profile, region_name=region)177 client = session.client(aws[item]['service'])178 try:179 while True:180 page = getattr(client, aws[item]['command'])(**args)181 keys = page.keys() - system_cleanup182 # if extra cleanup is required - do it now183 if 'cleanup' in aws[item]:184 keys -= aws[item]['cleanup']185 # at this point there should be only one key; bail if more186 if len(keys) != 1:187 raise ValueError(f'Unexpected keys: {keys}')188 value += page[list(keys)[0]]189 if 'NextToken' not in page:190 if 'IsTruncated' not in page or \191 not json.loads(f'{page["IsTruncated"]}'.lower()):192 break193 # more results expected; process NextToken/NextMarker194 if 'NextMarker' in page:195 args['Marker'] = page['NextMarker']196 elif 'NextToken' in page:197 args['NextToken'] = page['NextToken']198 else:199 raise ValueError(f'Internal error: {page}')200 self.logger.info(f'\t.. more results expected: {args}')201 except KeyboardInterrupt:202 raise203 except Exception as e:204 print(f'Internal error {item}/{region}: {e}')205 raise206 return value207 def resolve_dependencies(self):208 '''209 Returns AWS service names in the correct order; TODO deadlock detection210 '''211 regional, worldwide, objects = list(), list(), list(aws.keys())212 while True:213 name = objects.pop(0)214 dst = worldwide if 'global' in aws[name] else regional215 dep = list() if 'dep' not in aws[name] else list(aws[name]['dep'])216 for index in range(len(dst)):217 if not dep:218 dst.insert(index, name)219 name = None220 break221 if dst[index] in dep:222 dep.remove(dst[index])223 if name:224 if dep:225 objects.append(name)226 else:227 dst.append(name)228 if not objects:229 break230 return worldwide + regional231 def retrieve_all(self):232 for name in self.resolve_dependencies():233 # check if we have cached data stored under `/tmp/{profile}`234 cached = self.cache_data(name)235 if cached is not None:236 self.data[name] = cached237 continue # data is loaded from cache; don't reload238 self.logger.info(f'Retrieving {self.profile}:{name}')239 if 'global' in aws[name]:240 if name in [241 'vpc-association-authorizations',242 'hosted-zone-associations',243 ]:244 zones = [245 re.sub('.*/', '', zone['Id'])246 for zone in self.data['hosted-zones']247 if zone['Config']['PrivateZone']248 ]249 for zone in zones:250 args = {251 'Item': name,252 'Region': 'us-east-1',253 }254 if name == 'vpc-association-authorizations':255 args['HostedZoneId'] = zone256 elif name == 'hosted-zone-associations':257 args['Id'] = zone258 else:259 raise ValueError(f'{name} should not be here')260 self.data[name][zone] = self.aws_items(args)261 else:262 args = {263 'Item': name,264 'Region': 'us-east-1',265 }266 self.data[name] = self.aws_items(args)267 else:268 # collect data for all regions (minus unsupported)269 regions = set([270 region['RegionName']271 for region272 in self.data['regions']273 if 'skip-regions'274 not in aws[name]275 or region['RegionName']276 not in aws[name]['skip-regions']277 ])278 for region in aws_region_limit.intersection(regions):279 # TODO avoid copy/paste, make it a table lookup280 if name in [281 'resolver-endpoint-ip-addresses',282 ]:283 self.data[name][region] = dict()284 for id in [285 id['Id']286 for id287 in self.data['resolver-endpoints'][region]288 ]:289 args = {290 'Item': name,291 'Region': region,292 'ResolverEndpointId': id,293 }294 self.data[name][region][id] = self.aws_items(args)295 elif name in [296 'security-group-references',297 ]:298 self.data[name][region] = dict()299 for id in [300 id['GroupId']301 for id302 in self.data['security-groups'][region]303 ]:304 args = {305 'Item': name,306 'Region': region,307 'GroupId': [id],308 }309 self.data[name][region][id] = self.aws_items(args)310 else:311 args = {312 'Item': name,313 'Region': region,314 }315 self.data[name][region] = self.aws_items(args)316 self.cache_data(name, self.data[name])317 return self318 def retrieve_object(self, args):319 '''320 args = {321 'name': (subnets|vpcs|...),322 'match': (subnet_id|vpc_id|...),323 'region': region_name, # optional324 }325 '''326 src = self.data[args['name']]327 if 'global' not in aws[args['name']]:328 # region is present; use data from args['region'] only329 src = src[args['region']]330 if isinstance(src, dict):331 if args['match'] in src:332 return src[args['match']]333 elif isinstance(src, list):334 for obj in src:335 if obj[aws[args['name']]['match']] == args['match']:336 return obj337 self.logger.warn(f'Object not found: {json.dumps(args)}')338 return None339 def resolver_ip_public(self, region_name, ip):340 '''341 Per requirement we only care about 'Name' tag to contain342 'private' substring343 '''344 subnet = self.retrieve_object({345 'name': 'subnets',346 'region': region_name,347 'match': ip['SubnetId'],348 })349 if 'Tags' not in subnet:350 self.logger.warn(f'Subnet is untagged: {json.dumps(subnet)}')351 return False352 value = None353 for pair in subnet['Tags']:354 if 'Key' not in pair or 'Value' not in pair:355 continue356 if pair['Key'] != 'Name':357 continue358 value = pair['Value'].lower().find('private') == -1359 break360 if value is None:361 self.logger.warn(f'Subnet is tagged but no `Name` tag:'362 f'{json.dumps(subnet)}')363 return value364 def resolver_ip_az(self, region_name, ip):365 subnet = self.retrieve_object({366 'name': 'subnets',367 'region': region_name,368 'match': ip['SubnetId'],369 })370 return subnet['AvailabilityZoneId']371 def is_resolver_endpoint_valid(self, region_name, endpoint):372 '''373 WEX Inc. requirements:374 1. 2 IP addresses375 2. Different AZs376 3. Private subnets377 '''378 endpoint_ips = self.retrieve_object({379 'name': 'resolver-endpoint-ip-addresses',380 'region': region_name,381 'match': endpoint['Id'],382 })383 endpoints = dict([384 [385 ip["IpId"], {386 'az': self.resolver_ip_az(region_name, ip),387 'pub': self.resolver_ip_public(region_name, ip),388 }389 ]390 for ip in endpoint_ips391 ])392 private = dict([393 endpoint394 for endpoint395 in endpoints.items()396 if not endpoint[1]['pub']397 ])398 private_azs = set([399 endpoint[1]['az']400 for endpoint401 in private.items()402 ])403 if len(private) != 2:404 self.logger.warn(f'Endpoint has {len(private)} private IPs:'405 f' {endpoint["Id"]}')406 if len(private_azs) != 2:407 self.logger.warn(f'Endpoint has {len(private)} AZs:'408 f' {endpoint["Id"]}')409 return len(private) == 2 and len(private_azs) == 2410 def r53_resolver_outpoints(self, region_name, vpc_id):411 return [412 endpoint413 for endpoint414 in self.data['resolver-endpoints'][region_name]415 if endpoint['Direction'] == 'OUTBOUND'416 and endpoint['HostVPCId'] == vpc_id417 ]418if __name__ == '__main__':419 wex = WexAccount(Namespace(logging='DEBUG'), "wex-189106039250")420 endpoint = wex.data['resolver-endpoints']['us-east-1'][0]421 wex.is_resolver_endpoint_valid('us-east-1', endpoint)422 exit(0)423 if True:424 print(wex.retrieve_object({425 'name': 'resolver-endpoint-ip-addresses',426 'region': 'us-east-1',427 'match': 'rslvr-in-ca740e04c9c840849',428 }))429 if True:430 print(wex.retrieve_object({431 'name': 'regions',432 'match': 'us-east-1',...
ec2.py
Source:ec2.py
1import logging2from typing import Any, Dict, Generator, Iterator, List, Tuple3import botocore.exceptions4from introspector import ImportWriter, PathStack5from introspector.aws.fetch import Proxy, ServiceProxy6from introspector.aws.svc import RegionalService, ServiceSpec, resource_gate7_log = logging.getLogger(__name__)8def _synthesize_defaults(proxy: ServiceProxy,9 region: str) -> Iterator[Tuple[str, Any]]:10 defaults = {'name': 'Defaults', 'uri': f'ec2/defaults/{region}'}11 # TODO: update permissions12 # key_id_resp = proxy.get('get_ebs_default_kms_key_id')13 # defaults['EbsDefaultKmsKeyId'] = key_id_resp.get('KmsKeyId')14 encryption_resp = proxy.get('get_ebs_encryption_by_default')15 defaults['EbsEncryptionByDefault'] = encryption_resp[16 'EbsEncryptionByDefault']17 yield 'Defaults', defaults18def _add_security_group_references(proxy: ServiceProxy, response: Dict):19 security_groups = response.get('SecurityGroups', [])20 for security_group in security_groups:21 group_id = security_group['GroupId']22 result = proxy.list('describe_security_group_references', GroupId=group_id)23 if result is not None:24 # everything is mutable, sigh...25 security_group['references'] = result[1].get('SecurityGroupReferenceSet',26 [])27def _add_user_data(proxy: ServiceProxy, response: Dict):28 reservations = response.get('Reservations', [])29 for reservation in reservations:30 instances = reservation.get('Instances', [])31 for instance in instances:32 instance_id = instance['InstanceId']33 try:34 user_data = proxy.get('describe_instance_attribute',35 InstanceId=instance_id,36 Attribute='userData')37 if user_data is not None:38 instance['UserData'] = user_data.get('UserData', {}).get('Value')39 except botocore.exceptions.ClientError:40 pass41def _add_launch_permissions(proxy: ServiceProxy, response: Dict):42 snapshots = response.get('Snapshots', [])43 for snapshot in snapshots:44 snapshot_id = snapshot['SnapshotId']45 permission_resp = proxy.get('describe_snapshot_attribute',46 SnapshotId=snapshot_id,47 Attribute='createVolumePermission')48 permissions = permission_resp.get('CreateVolumePermissions', [])49 snapshot['CreateVolumePermissions'] = permissions50def _add_image_attributes(proxy: ServiceProxy, response: Dict[str, Any]):51 images = response.get('Images', [])52 for image in images:53 launch_permission = proxy.get('describe_image_attribute',54 Attribute='launchPermission',55 ImageId=image['ImageId'])56 if launch_permission is not None:57 image['LaunchPermissions'] = launch_permission.get(58 'LaunchPermissions', [])59 else:60 image['LaunchPermissions'] = []61RESOURCES = [62 'Addresses', 'FlowLogs', 'Images', 'Instances', 'KeyPairs',63 'NetworkInterfaces', 'RouteTables', 'SecurityGroups', 'Snapshots',64 'Subnets', 'Volumes', 'VpcPeeringConnections', 'Vpcs', 'VpcEndpoints'65]66def _import_ec2_region(67 proxy: ServiceProxy, region: str,68 spec: ServiceSpec) -> Generator[Tuple[str, Any], None, None]:69 for resource in proxy.resource_names():70 _log.info(f'importing {resource}')71 op_name = proxy._impl._client._PY_TO_OP_NAME[resource]72 op_resource = op_name[len('Describe'):]73 if op_resource in RESOURCES:74 if resource_gate(spec, op_resource):75 result = proxy.list(resource)76 if result is not None:77 if resource == 'describe_instances':78 _add_user_data(proxy, result[1])79 elif resource == 'describe_snapshots':80 _add_launch_permissions(proxy, result[1])81 elif resource == 'describe_images':82 _add_image_attributes(proxy, result[1])83 elif resource == 'describe_security_groups':84 _add_security_group_references(proxy, result[1])85 yield result[0], result[1]86 _log.info(f'done with {resource}')87 if resource_gate(spec, 'Defaults'):88 yield from _synthesize_defaults(proxy, region)89SVC = RegionalService('ec2', _import_ec2_region)90def add_amis_to_import_job(proxy: Proxy, writer: ImportWriter, ps: PathStack,91 region: str, amis: List[str]) -> str:92 ps = ps.scope(region)93 service_proxy = proxy.service('ec2', region)94 result = service_proxy.list(95 'describe_images',96 ImageIds=amis,97 # Remove the default filters98 Filters=[])99 _log.debug(f'describe images result {result}')100 if result is not None:101 resource_name = result[0]102 raw_resources = result[1]103 # We can't add launch permissions here because we don't own the images104 #_add_image_attributes(service_proxy, raw_resources)105 writer(ps, resource_name, raw_resources, {'region': region})...
main.py
Source:main.py
...16print('\nExternally Referenced Security Groups:')17list_length = 20018external_sgs = set()19for sublist in [security_groups[i:i+list_length] for i in range(0, len(security_groups), list_length)]:20 response = ec2_client.describe_security_group_references(21 GroupId=[sg['GroupId'] for sg in sublist]22 )23 external_sgs.update(set(response['SecurityGroupReferenceSet']))24print(external_sgs)25print('\nStale Security Groups')26stale_sgs = set()27paginator = ec2_client.get_paginator('describe_stale_security_groups')28page_iterator = paginator.paginate(VpcId=security_groups[0]['VpcId'])29for page in page_iterator:30 stale_sgs.update(set(page['StaleSecurityGroupSet']))31print('Lambda Security Groups:')32paginator = lambda_client.get_paginator('list_functions')33page_iterator = paginator.paginate()34lambda_functions = []...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!