Best Python code snippet using localstack_python
region_collector.py
Source:region_collector.py
1# Copyright 2019-2022 Foreseeti AB <https://foreseeti.com>2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# https://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from __future__ import annotations15import json16import logging17from datetime import datetime, timedelta18from threading import Lock19from typing import Any, Callable, Optional20from boto3.session import Session21from botocore.client import BaseClient22from botocore.exceptions import ClientError23from securicad.aws_collector import utils24log = logging.getLogger("securicad-aws-collector")25def collect(26 credentials: dict[str, str],27 region_data: dict[str, Any],28 include_inspector: bool,29 include_guardduty: bool,30 threads: Optional[int],31) -> None:32 session = Session(**credentials, region_name=region_data["region_name"]) # type: ignore33 region_data.update(34 get_region_data(session, include_inspector, include_guardduty, threads)35 )36def wafv2_get_web_acls_wrapper(37 scope: str, unpaginated: utils.UnpaginatedCallable38) -> Callable[[], tuple[list[str], Any]]:39 def wafv2_get_web_acls() -> tuple[list[str], Any]:40 log.debug("Executing wafv2 list-web-acls list-resources-for-web-acl")41 acls = unpaginated("wafv2", "list_web_acls", param={"Scope": scope})["WebACLs"]42 for acl in acls:43 if scope == "REGIONAL":44 lb_resources = unpaginated(45 "wafv2",46 "list_resources_for_web_acl",47 param={48 "WebACLArn": acl["ARN"],49 "ResourceType": "APPLICATION_LOAD_BALANCER",50 },51 )["ResourceArns"]52 apigw_resources = unpaginated(53 "wafv2",54 "list_resources_for_web_acl",55 param={"WebACLArn": acl["ARN"], "ResourceType": "API_GATEWAY"},56 )["ResourceArns"]57 else:58 lb_resources = []59 apigw_resources = []60 acl_details = unpaginated(61 "wafv2",62 "get_web_acl",63 param={"Name": acl["Name"], "Scope": scope, "Id": acl["Id"]},64 )["WebACL"]65 tags = unpaginated(66 "wafv2", "list_tags_for_resource", param={"ResourceARN": acl["ARN"]}67 )["TagInfoForResource"]68 acl["ALBResourceArns"] = lb_resources69 acl["APIGWResourceArns"] = apigw_resources70 acl.update(acl_details)71 acl.update(tags)72 return ["wafv2", "WebACLs"], acls73 return wafv2_get_web_acls74def wafv2_get_ip_sets_wrapper(75 scope: str, unpaginated: utils.UnpaginatedCallable76) -> Callable[[], tuple[list[str], Any]]:77 def wafv2_get_ip_sets() -> tuple[list[str], Any]:78 ip_sets = unpaginated("wafv2", "list_ip_sets", param={"Scope": scope})["IPSets"]79 for ip_set in ip_sets:80 ip_set_details = unpaginated(81 "wafv2",82 "get_ip_set",83 param={"Name": ip_set["Name"], "Scope": scope, "Id": ip_set["Id"]},84 )["IPSet"]85 tags = unpaginated(86 "wafv2", "list_tags_for_resource", param={"ResourceARN": ip_set["ARN"]}87 )["TagInfoForResource"]88 ip_set.update(ip_set_details)89 ip_set.update(tags)90 return ["wafv2", "IPSets"], ip_sets91 return wafv2_get_ip_sets92def collect_cloudfront_waf(93 credentials: dict[str, str],94 threads: Optional[int],95) -> dict[str, Any]:96 """CloudFront WAFs are global but must be collected by calling the us-east-1 endpoint:97 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/wafv2.html#WAFV2.Client.list_web_acls"""98 session = Session(**credentials, region_name="us-east-1") # type: ignore99 client_lock: Lock = Lock()100 client_cache: dict[str, BaseClient] = {}101 unpaginated = utils.get_unpaginated(session, client_lock, client_cache)102 tasks = [103 wafv2_get_web_acls_wrapper("CLOUDFRONT", unpaginated),104 wafv2_get_ip_sets_wrapper("CLOUDFRONT", unpaginated),105 ]106 wafv2_data = utils.execute_tasks(tasks, threads)107 return wafv2_data108def get_region_data(109 session: Session,110 include_inspector: bool,111 include_guardduty: bool,112 threads: Optional[int],113) -> dict[str, Any]:114 client_lock: Lock = Lock()115 client_cache: dict[str, BaseClient] = {}116 unpaginated = utils.get_unpaginated(session, client_lock, client_cache)117 paginate = utils.get_paginate(session, client_lock, client_cache)118 fake_paginate = utils.get_fake_paginate(session, client_lock, client_cache)119 tasks: list[Callable[[], tuple[list[str], Any]]] = []120 def add_task(task: Callable[[], tuple[list[str], Any]], *services: str) -> None:121 for service in services:122 if service == "wafv2":123 # Buggy for wafv2124 continue125 if session.region_name not in session.get_available_regions(service):126 log.info(127 f'Region "{session.region_name}" does not support service "{service}"'128 )129 return130 tasks.append(task)131 def ec2_describe_instances() -> tuple[list[str], Any]:132 log.debug("Executing ec2 describe-instances, describe-images")133 def get_image_id_to_instances(134 reservations: list[dict[str, Any]]135 ) -> dict[str, list[dict[str, Any]]]:136 image_id_to_instances: dict[str, list[dict[str, Any]]] = {}137 for reservation in reservations:138 for instance in reservation["Instances"]:139 if instance["ImageId"] not in image_id_to_instances:140 image_id_to_instances[instance["ImageId"]] = []141 image_id_to_instances[instance["ImageId"]].append(instance)142 return image_id_to_instances143 def get_images(image_ids: list[str]) -> list[dict[str, Any]]:144 return unpaginated("ec2", "describe_images", param={"ImageIds": image_ids})[145 "Images"146 ]147 def set_is_windows(reservations: list[dict[str, Any]]) -> None:148 image_id_to_instances = get_image_id_to_instances(reservations)149 images = get_images(list(image_id_to_instances))150 for image in images:151 is_windows = image.get("Platform") == "windows"152 for instance in image_id_to_instances.get(image["ImageId"], []):153 instance["IsWindows"] = is_windows154 reservations = paginate("ec2", "describe_instances", key="Reservations")155 set_is_windows(reservations)156 # TODO: Change to ["ec2", "Reservations"]157 return ["instance", "Reservations"], reservations158 add_task(ec2_describe_instances, "ec2")159 def ec2_describe_network_interfaces() -> tuple[list[str], Any]:160 log.debug("Executing ec2 describe-network-interfaces")161 # TODO: Change to ["ec2", "NetworkInterfaces"]162 return ["interface", "NetworkInterfaces"], paginate(163 "ec2", "describe_network_interfaces", key="NetworkInterfaces"164 )165 add_task(ec2_describe_network_interfaces, "ec2")166 def ec2_describe_security_groups() -> tuple[list[str], Any]:167 log.debug("Executing ec2 describe-security-groups")168 # TODO: Change to ["ec2", "SecurityGroups"]169 return ["securitygroup", "SecurityGroups"], paginate(170 "ec2", "describe_security_groups", key="SecurityGroups"171 )172 add_task(ec2_describe_security_groups, "ec2")173 def ec2_describe_subnets() -> tuple[list[str], Any]:174 log.debug("Executing ec2 describe-subnets")175 # TODO: Change to ["ec2", "Subnets"]176 return ["subnet", "Subnets"], paginate("ec2", "describe_subnets", key="Subnets")177 add_task(ec2_describe_subnets, "ec2")178 def ec2_describe_network_acls() -> tuple[list[str], Any]:179 log.debug("Executing ec2 describe-network-acls")180 # TODO: Change to ["ec2", "NetworkAcls"]181 return ["acl", "NetworkAcls"], paginate(182 "ec2", "describe_network_acls", key="NetworkAcls"183 )184 add_task(ec2_describe_network_acls, "ec2")185 def ec2_describe_vpcs() -> tuple[list[str], Any]:186 log.debug("Executing ec2 describe-vpcs")187 # TODO: Change to ["ec2", "Vpcs"]188 return ["vpc", "Vpcs"], paginate("ec2", "describe_vpcs", key="Vpcs")189 add_task(ec2_describe_vpcs, "ec2")190 def ec2_describe_vpc_peering_connections() -> tuple[list[str], Any]:191 log.debug("Executing ec2 describe-vpc-peering-connections")192 # TODO: Change to ["ec2", "VpcPeeringConnections"]193 return ["vpcpeering", "VpcPeeringConnections"], paginate(194 "ec2", "describe_vpc_peering_connections", key="VpcPeeringConnections"195 )196 add_task(ec2_describe_vpc_peering_connections, "ec2")197 def ec2_describe_internet_gateways() -> tuple[list[str], Any]:198 log.debug("Executing ec2 describe-internet-gateways")199 # TODO: Change to ["ec2", "InternetGateways"]200 return ["igw", "InternetGateways"], paginate(201 "ec2", "describe_internet_gateways", key="InternetGateways"202 )203 add_task(ec2_describe_internet_gateways, "ec2")204 def ec2_describe_vpn_gateways() -> tuple[list[str], Any]:205 log.debug("Executing ec2 describe-vpn-gateways")206 # TODO: Change to ["ec2", "VpnGateways"]207 return ["vgw", "VpnGateways"], unpaginated("ec2", "describe_vpn_gateways")[208 "VpnGateways"209 ]210 add_task(ec2_describe_vpn_gateways, "ec2")211 def ec2_describe_nat_gateways() -> tuple[list[str], Any]:212 log.debug("Executing ec2 describe-nat-gateways")213 # TODO: Change to ["ec2", "NatGateways"]214 return ["ngw", "NatGateways"], paginate(215 "ec2", "describe_nat_gateways", key="NatGateways"216 )217 add_task(ec2_describe_nat_gateways, "ec2")218 def ec2_describe_route_tables() -> tuple[list[str], Any]:219 log.debug("Executing ec2 describe-route-tables")220 # TODO: Change to ["ec2", "RouteTables"]221 return ["routetable", "RouteTables"], paginate(222 "ec2", "describe_route_tables", key="RouteTables"223 )224 add_task(ec2_describe_route_tables, "ec2")225 def ec2_describe_vpc_endpoints() -> tuple[list[str], Any]:226 log.debug("Executing ec2 describe-vpc-endpoints")227 # TODO: Change to ["ec2", "VpcEndpoints"]228 return ["vpcendpoint", "VpcEndpoints"], paginate(229 "ec2", "describe_vpc_endpoints", key="VpcEndpoints"230 )231 add_task(ec2_describe_vpc_endpoints, "ec2")232 def ec2_describe_volumes() -> tuple[list[str], Any]:233 log.debug("Executing ec2 describe-volumes")234 # TODO: Change to ["ec2", "Volumes"]235 return ["ebs", "Volumes"], paginate("ec2", "describe_volumes", key="Volumes")236 add_task(ec2_describe_volumes, "ec2")237 def ec2_describe_transit_gateways() -> tuple[list[str], Any]:238 log.debug(239 "Executing ec2 describe-transit-gateways, describe-transit-gateway-attachments, get-transit-gateway-attachment-propagations, describe-transit-gateway-peering-attachments, describe-transit-gateway-route-tables, search-transit-gateway-routes, get-transit-gateway-prefix-list-references, get-transit-gateway-route-table-associations, get-transit-gateway-route-table-propagations, describe-transit-gateway-vpc-attachments"240 )241 def get_attachments() -> dict[str, dict[str, dict[str, Any]]]:242 def get_attachment_propagations(attachment_id: str) -> list[dict[str, Any]]:243 try:244 return paginate(245 "ec2",246 "get_transit_gateway_attachment_propagations",247 key="TransitGatewayAttachmentPropagations",248 param={"TransitGatewayAttachmentId": attachment_id},249 )250 except ClientError as e:251 if (252 e.response.get("Error", {}).get("Code")253 != "InvalidTransitGatewayAttachmentID.NotFound"254 ):255 raise256 return []257 attachments = paginate(258 "ec2",259 "describe_transit_gateway_attachments",260 key="TransitGatewayAttachments",261 param={262 "Filters": [263 {"Name": "association.state", "Values": ["associated"]},264 {"Name": "state", "Values": ["available"]},265 ]266 },267 )268 res: dict[str, dict[str, dict[str, Any]]] = {}269 for attachment in attachments:270 tgw_id = attachment["TransitGatewayId"]271 attachment_id = attachment["TransitGatewayAttachmentId"]272 attachment["Propagations"] = get_attachment_propagations(attachment_id)273 if tgw_id not in res:274 res[tgw_id] = {}275 res[tgw_id][attachment_id] = attachment276 return res277 def get_peering_attachments() -> dict[str, dict[str, Any]]:278 peering_attachments = paginate(279 "ec2",280 "describe_transit_gateway_peering_attachments",281 key="TransitGatewayPeeringAttachments",282 )283 res: dict[str, dict[str, Any]] = {}284 for peering_attachment in peering_attachments:285 attachment_id = peering_attachment["TransitGatewayAttachmentId"]286 res[attachment_id] = peering_attachment287 return res288 def get_route_tables() -> dict[str, dict[str, dict[str, Any]]]:289 def get_routes(route_table_id: str) -> list[dict[str, Any]]:290 param = {291 "TransitGatewayRouteTableId": route_table_id,292 "Filters": [293 {294 "Name": "route-search.subnet-of-match",295 "Values": ["0.0.0.0/0", "::/0"],296 }297 ],298 }299 return unpaginated("ec2", "search_transit_gateway_routes", param=param)[300 "Routes"301 ]302 def get_prefix_list_references(route_table_id: str) -> list[dict[str, Any]]:303 return paginate(304 "ec2",305 "get_transit_gateway_prefix_list_references",306 key="TransitGatewayPrefixListReferences",307 param={"TransitGatewayRouteTableId": route_table_id},308 )309 def get_associations(route_table_id: str) -> list[dict[str, Any]]:310 return paginate(311 "ec2",312 "get_transit_gateway_route_table_associations",313 key="Associations",314 param={"TransitGatewayRouteTableId": route_table_id},315 )316 def get_propagations(route_table_id: str) -> list[dict[str, Any]]:317 return paginate(318 "ec2",319 "get_transit_gateway_route_table_propagations",320 key="TransitGatewayRouteTablePropagations",321 param={"TransitGatewayRouteTableId": route_table_id},322 )323 route_tables = paginate(324 "ec2",325 "describe_transit_gateway_route_tables",326 key="TransitGatewayRouteTables",327 )328 res: dict[str, dict[str, dict[str, Any]]] = {}329 for route_table in route_tables:330 tgw_id = route_table["TransitGatewayId"]331 route_table_id = route_table["TransitGatewayRouteTableId"]332 route_table["Routes"] = get_routes(route_table_id)333 route_table["PrefixListReferences"] = get_prefix_list_references(334 route_table_id335 )336 route_table["Associations"] = get_associations(route_table_id)337 route_table["Propagations"] = get_propagations(route_table_id)338 if tgw_id not in res:339 res[tgw_id] = {}340 res[tgw_id][route_table_id] = route_table341 return res342 def get_vpc_attachments() -> dict[str, dict[str, dict[str, Any]]]:343 vpc_attachments = paginate(344 "ec2",345 "describe_transit_gateway_vpc_attachments",346 key="TransitGatewayVpcAttachments",347 )348 res: dict[str, dict[str, dict[str, Any]]] = {}349 for vpc_attachment in vpc_attachments:350 tgw_id = vpc_attachment["TransitGatewayId"]351 attachment_id = vpc_attachment["TransitGatewayAttachmentId"]352 if tgw_id not in res:353 res[tgw_id] = {}354 res[tgw_id][attachment_id] = vpc_attachment355 return res356 transit_gateways = paginate(357 "ec2", "describe_transit_gateways", key="TransitGateways"358 )359 attachments = get_attachments()360 peering_attachments = get_peering_attachments()361 route_tables = get_route_tables()362 vpc_attachments = get_vpc_attachments()363 for transit_gateway in transit_gateways:364 tgw_id = transit_gateway["TransitGatewayId"]365 transit_gateway["Attachments"] = list(attachments.get(tgw_id, {}).values())366 transit_gateway["PeeringAttachments"] = [367 peering_attachments[attachment_id]368 for attachment_id in attachments.get(tgw_id, {})369 if attachment_id in peering_attachments370 ]371 transit_gateway["RouteTables"] = list(route_tables.get(tgw_id, {}).values())372 transit_gateway["VpcAttachments"] = list(373 vpc_attachments.get(tgw_id, {}).values()374 )375 return ["ec2", "TransitGateways"], transit_gateways376 add_task(ec2_describe_transit_gateways, "ec2")377 def elb_describe_load_balancers() -> tuple[list[str], Any]:378 log.debug("Executing elb describe-load-balancers")379 return ["elb", "LoadBalancerDescriptions"], paginate(380 "elb", "describe_load_balancers", key="LoadBalancerDescriptions"381 )382 add_task(elb_describe_load_balancers, "elb")383 def elbv2_describe_load_balancers() -> tuple[list[str], Any]:384 log.debug(385 "Executing elbv2 describe-load-balancers, describe-listeners, describe-rules"386 )387 def get_rules(listener_arn: str) -> list[dict[str, Any]]:388 try:389 return paginate(390 "elbv2",391 "describe_rules",392 key="Rules",393 param={"ListenerArn": listener_arn},394 )395 except ClientError as e:396 if e.response.get("Error", {}).get("Code") != "ListenerNotFound":397 raise398 return []399 def get_listeners(load_balancer_arn: str) -> list[dict[str, Any]]:400 listeners = paginate(401 "elbv2",402 "describe_listeners",403 key="Listeners",404 param={"LoadBalancerArn": load_balancer_arn},405 )406 for listener in listeners:407 listener["Rules"] = get_rules(listener["ListenerArn"])408 return listeners409 all_load_balancers = paginate(410 "elbv2", "describe_load_balancers", key="LoadBalancers"411 )412 valid_lbs = []413 for lb in all_load_balancers:414 if lb["Type"] != "gateway":415 valid_lbs.append(lb)416 for load_balancer in valid_lbs:417 load_balancer["Listeners"] = get_listeners(load_balancer["LoadBalancerArn"])418 return ["elbv2", "LoadBalancers"], valid_lbs419 add_task(elbv2_describe_load_balancers, "elbv2")420 def elbv2_describe_target_groups() -> tuple[list[str], Any]:421 log.debug("Executing elbv2 describe-target-groups, describe-target-health")422 def get_targets(target_group_arn: str) -> list[dict[str, Any]]:423 try:424 target_health_descriptions = unpaginated(425 "elbv2",426 "describe_target_health",427 param={"TargetGroupArn": target_group_arn},428 )["TargetHealthDescriptions"]429 return [430 target_health_description["Target"]431 for target_health_description in target_health_descriptions432 ]433 except ClientError as e:434 if e.response.get("Error", {}).get("Code") != "TargetGroupNotFound":435 raise436 return []437 target_groups = paginate("elbv2", "describe_target_groups", key="TargetGroups")438 for target_group in target_groups:439 target_group["Targets"] = get_targets(target_group["TargetGroupArn"])440 return ["elbv2", "TargetGroups"], target_groups441 add_task(elbv2_describe_target_groups, "elbv2")442 def autoscaling_describe_launch_configurations() -> tuple[list[str], Any]:443 log.debug("Executing autoscaling describe-launch-configurations")444 # TODO: Change to ["autoscaling", "LaunchConfigurations"]445 return ["launchconfigs", "LaunchConfigurations"], paginate(446 "autoscaling", "describe_launch_configurations", key="LaunchConfigurations"447 )448 add_task(autoscaling_describe_launch_configurations, "autoscaling")449 def rds_describe_db_instances() -> tuple[list[str], Any]:450 log.debug("Executing rds describe-db-instances")451 # TODO: Change to ["rds", "DBInstances"]452 return ["rds", "Instances", "DBInstances"], paginate(453 "rds", "describe_db_instances", key="DBInstances"454 )455 add_task(rds_describe_db_instances, "rds")456 def rds_describe_db_subnet_groups() -> tuple[list[str], Any]:457 log.debug("Executing rds describe-db-subnet-groups")458 # TODO: Change to ["rds", "DBSubnetGroups"]459 return ["rds", "SubnetGroups", "DBSubnetGroups"], paginate(460 "rds", "describe_db_subnet_groups", key="DBSubnetGroups"461 )462 add_task(rds_describe_db_subnet_groups, "rds")463 def rds_describe_db_clusters() -> tuple[list[str], Any]:464 log.debug("Executing rds describe-db-clusters")465 return ["rds", "DBClusters"], paginate(466 "rds", "describe_db_clusters", key="DBClusters"467 )468 add_task(rds_describe_db_clusters, "rds")469 def lambda_list_functions() -> tuple[list[str], Any]:470 log.debug("Executing lambda list-functions list-tags")471 def list_tags(arn: str) -> dict[str, Any]:472 return unpaginated("lambda", "list_tags", param={"Resource": arn})["Tags"]473 functions = paginate("lambda", "list_functions", key="Functions")474 for function in functions:475 function["Tags"] = list_tags(function["FunctionArn"])476 return ["lambda", "Functions"], functions477 add_task(lambda_list_functions, "lambda")478 def kms_list_keys() -> tuple[list[str], Any]:479 log.debug("Executing kms list-keys, get-key-policy")480 def get_policy(key_id: str) -> dict[str, Any]:481 return json.loads(482 unpaginated(483 "kms",484 "get_key_policy",485 param={"KeyId": key_id, "PolicyName": "default"},486 )["Policy"]487 )488 def get_tags(key_id: str) -> list[dict[str, Any]]:489 try:490 return unpaginated(491 "kms", "list_resource_tags", param={"KeyId": key_id}492 )["Tags"]493 except ClientError as e:494 if e.response.get("Error", {}).get("Code") != "AccessDeniedException":495 raise496 # No resource-based policy allows the kms:ListResourceTags action497 return []498 keys = paginate("kms", "list_keys", key="Keys")499 for key in keys:500 key["Policy"] = get_policy(key["KeyId"])501 key["Tags"] = get_tags(key["KeyId"])502 return ["kms", "Keys"], keys503 add_task(kms_list_keys, "kms")504 def inspector_describe_findings() -> tuple[list[str], Any]:505 log.debug(506 "Executing inspector list-assessment-runs, describe-assessment-runs, list-rules-packages, describe-rules-packages, list-findings, describe-findings"507 )508 # Filter findings to only return findings and runs from the last 365 days509 now = datetime.now()510 time_range = {"beginDate": now - timedelta(days=365), "endDate": now}511 def get_assessment_run_arns() -> list[str]:512 # List all runs within the timeframe513 return paginate(514 "inspector",515 "list_assessment_runs",516 key="assessmentRunArns",517 param={"filter": {"completionTimeRange": time_range}},518 )519 def get_assessment_runs() -> list[dict[str, Any]]:520 assessment_run_arns = get_assessment_run_arns()521 if not assessment_run_arns:522 return []523 return fake_paginate(524 "inspector",525 "describe_assessment_runs",526 request_key="assessmentRunArns",527 response_key="assessmentRuns",528 n=10,529 items=assessment_run_arns,530 )531 def get_latest_assessment_run_arn() -> Optional[str]:532 # Get the latest run with findings in it533 assessment_runs = get_assessment_runs()534 sorted_assessment_runs = sorted(535 assessment_runs,536 key=lambda assessment_run: assessment_run["completedAt"],537 reverse=True,538 )539 for assessment_run in sorted_assessment_runs:540 finding_count = sum(assessment_run.get("findingCounts", {}).values())541 if finding_count > 0:542 return assessment_run["arn"]543 return None544 def get_rules_package_arns() -> list[str]:545 return paginate("inspector", "list_rules_packages", key="rulesPackageArns")546 def get_rules_packages() -> list[dict[str, Any]]:547 rules_package_arns = get_rules_package_arns()548 if not rules_package_arns:549 return []550 return fake_paginate(551 "inspector",552 "describe_rules_packages",553 request_key="rulesPackageArns",554 response_key="rulesPackages",555 n=10,556 items=rules_package_arns,557 )558 def get_supported_rules_package_arns() -> list[str]:559 # Get all supported rule packages560 rules_packages = get_rules_packages()561 rules_package_arns = []562 for rules_package in rules_packages:563 if "Common Vulnerabilities and Exposures" in rules_package["name"]:564 rules_package_arns.append(rules_package["arn"])565 if "Network Reachability" in rules_package["name"]:566 rules_package_arns.append(rules_package["arn"])567 return rules_package_arns568 def get_finding_arns() -> list[str]:569 # List all findings within the timeframe and the latest run570 # Filter to only include supported finding types571 assessment_run_arn = get_latest_assessment_run_arn()572 if not assessment_run_arn:573 return []574 rules_package_arns = get_supported_rules_package_arns()575 if not rules_package_arns:576 return []577 return paginate(578 "inspector",579 "list_findings",580 key="findingArns",581 param={582 "assessmentRunArns": [assessment_run_arn],583 "filter": {584 "rulesPackageArns": rules_package_arns,585 "creationTimeRange": time_range,586 },587 },588 )589 def get_findings() -> list[dict[str, Any]]:590 finding_arns = get_finding_arns()591 if not finding_arns:592 return []593 return fake_paginate(594 "inspector",595 "describe_findings",596 request_key="findingArns",597 response_key="findings",598 n=10,599 items=finding_arns,600 )601 # TODO: Change to ["inspector", "findings"]602 return ["inspector"], get_findings()603 if include_inspector:604 add_task(inspector_describe_findings, "inspector")605 def dynamodb_list_tables() -> tuple[list[str], Any]:606 log.debug("Executing dynamodb list-tables")607 table_names = paginate("dynamodb", "list_tables", key="TableNames")608 tables = []609 for table_name in table_names:610 table = unpaginated(611 "dynamodb", "describe_table", param={"TableName": table_name}612 )["Table"]613 arn = table.get("TableArn")614 tags = paginate(615 "dynamodb",616 "list_tags_of_resource",617 param={"ResourceArn": arn},618 key="Tags",619 )620 table["Tags"] = tags621 tables.append(table)622 return ["dynamodb", "Tables"], tables623 add_task(dynamodb_list_tables, "dynamodb")624 def ecr_describe_repositories_wrapper(625 prev_region_data: dict[str, Any]626 ) -> Callable[[], tuple[list[str], Any]]:627 def ecr_describe_repositories() -> tuple[list[str], Any]:628 log.debug(629 "Executing ecr describe-repositories, get-repository-policy, list-images"630 )631 def get_policy(repository_name: str) -> dict[str, Any]:632 return json.loads(633 unpaginated(634 "ecr",635 "get_repository_policy",636 param={"repositoryName": repository_name},637 )["policyText"]638 )639 def get_images(640 repository_name: str, images: list[dict[str, Any]]641 ) -> list[dict[str, Any]]:642 if not images:643 return []644 try:645 return fake_paginate(646 "ecr",647 "describe_images",648 request_key="imageIds",649 response_key="imageDetails",650 n=100,651 items=images,652 param={653 "repositoryName": repository_name,654 "filter": {"tagStatus": "TAGGED"},655 },656 )657 except ClientError as e:658 if (659 e.response.get("Error", {}).get("Code")660 != "ImageNotFoundException"661 ):662 raise663 # message = e.response["Error"]["Message"]664 # message = "The image with imageId {imageDigest:'sha256:x', imageTag:'x'} does not exist within the repository with name 'x' in the registry with id 'x'"665 return []666 def get_findings(repository_name: str, image: dict[str, Any]):667 try:668 return paginate(669 "ecr",670 "describe_image_scan_findings",671 param={672 "repositoryName": repository_name,673 "imageId": image,674 },675 )676 except ClientError as e:677 if e.response.get("Error", {}).get("Code") not in {678 "ImageNotFoundException",679 "ScanNotFoundException",680 }:681 raise682 return []683 def get_tags(arn: str) -> list[dict[str, Any]]:684 return unpaginated(685 "ecr",686 "list_tags_for_resource",687 param={"resourceArn": arn},688 )["tags"]689 images = []690 non_dh_images = []691 for cluster in prev_region_data.get("ecs", []):692 for task in cluster.get("tasks", []):693 for container in task.get("containers", []):694 if "imageDigest" not in container:695 continue696 image = container["image"]697 if ":" in image:698 image_tag = image.split(":")[1]699 elif "@" in image:700 image_tag = image.split("@")[1]701 else:702 image_tag = "latest"703 image_id = {704 "imageDigest": container["imageDigest"],705 "imageTag": image_tag,706 }707 images.append(image_id)708 if "/" in image:709 non_dh_images.append(image_id)710 repositories = paginate("ecr", "describe_repositories", key="repositories")711 for repository in repositories:712 repository_name = repository["repositoryName"]713 try:714 repository["policy"] = get_policy(repository_name)715 except ClientError as e:716 if (717 e.response.get("Error", {}).get("Code")718 != "RepositoryPolicyNotFoundException"719 ):720 raise721 repository["policy"] = None722 repository["imageDetails"] = get_images(repository_name, non_dh_images)723 repository["tags"] = get_tags(repository["repositoryArn"])724 for image_details in repository["imageDetails"]:725 for image in non_dh_images:726 if (727 image_details["imageDigest"] == image["imageDigest"]728 and image["imageTag"] in image_details["imageTags"]729 ):730 findings = get_findings(repository_name, image)731 image_details["findings"] = findings732 # TODO: Change to ["ecr", "repositories"]733 return ["ecr"], repositories734 return ecr_describe_repositories735 def ecs_describe_clusters() -> tuple[list[str], Any]:736 log.debug(737 "Executing ecs list-clusters, describe-clusters, list-services, describe-services, list-container-instances, describe-container-instances, list-tasks, describe-tasks"738 )739 def get_cluster_arns() -> list[str]:740 return paginate("ecs", "list_clusters", key="clusterArns")741 def get_clusters() -> list[dict[str, Any]]:742 cluster_arns = get_cluster_arns()743 return fake_paginate(744 "ecs",745 "describe_clusters",746 request_key="clusters",747 response_key="clusters",748 n=100,749 items=cluster_arns,750 param={"include": ["ATTACHMENTS", "SETTINGS", "STATISTICS", "TAGS"]},751 )752 def get_service_arns(cluster_arn: str) -> list[str]:753 return paginate(754 "ecs",755 "list_services",756 key="serviceArns",757 param={"cluster": cluster_arn},758 )759 def get_services(cluster_arn: str) -> list[dict[str, Any]]:760 def get_task_arns(service_name: str) -> list[str]:761 return paginate(762 "ecs",763 "list_tasks",764 key="taskArns",765 param={"cluster": cluster_arn, "serviceName": service_name},766 )767 service_arns = get_service_arns(cluster_arn)768 services = fake_paginate(769 "ecs",770 "describe_services",771 request_key="services",772 response_key="services",773 n=10,774 items=service_arns,775 param={"cluster": cluster_arn},776 )777 for service in services:778 # TODO: Use key "taskArns"779 service["tasks"] = get_task_arns(service["serviceName"])780 return services781 def get_container_instance_arns(cluster_arn: str) -> list[str]:782 return paginate(783 "ecs",784 "list_container_instances",785 key="containerInstanceArns",786 param={"cluster": cluster_arn},787 )788 def get_container_instances(cluster_arn: str) -> list[dict[str, Any]]:789 def get_task_arns(container_instance_arn: str) -> list[str]:790 return paginate(791 "ecs",792 "list_tasks",793 key="taskArns",794 param={795 "cluster": cluster_arn,796 "containerInstance": container_instance_arn,797 },798 )799 container_instance_arns = get_container_instance_arns(cluster_arn)800 container_instances = fake_paginate(801 "ecs",802 "describe_container_instances",803 request_key="containerInstances",804 response_key="containerInstances",805 n=100,806 items=container_instance_arns,807 param={"cluster": cluster_arn, "include": ["TAGS"]},808 )809 for container_instance in container_instances:810 # TODO: Use key "taskArns"811 container_instance["tasks"] = get_task_arns(812 container_instance["containerInstanceArn"]813 )814 return container_instances815 def get_task_arns(cluster_arn: str) -> list[str]:816 return paginate(817 "ecs", "list_tasks", key="taskArns", param={"cluster": cluster_arn}818 )819 def get_task_definition(taskdef_arn: str) -> dict[str, Any]:820 return unpaginated(821 "ecs",822 "describe_task_definition",823 param={"taskDefinition": taskdef_arn, "include": ["TAGS"]},824 )["taskDefinition"]825 def get_tasks(cluster_arn: str) -> list[dict[str, Any]]:826 task_arns = get_task_arns(cluster_arn)827 return fake_paginate(828 "ecs",829 "describe_tasks",830 request_key="tasks",831 response_key="tasks",832 n=100,833 items=task_arns,834 param={"cluster": cluster_arn, "include": ["TAGS"]},835 )836 clusters = get_clusters()837 for cluster in clusters:838 cluster_arn = cluster["clusterArn"]839 cluster["services"] = get_services(cluster_arn)840 cluster["containerInstances"] = get_container_instances(cluster_arn)841 cluster["tasks"] = get_tasks(cluster_arn)842 for task in cluster["tasks"]:843 task["taskDefinition"] = get_task_definition(task["taskDefinitionArn"])844 # TODO: Change to ["ecs", "clusters"]845 return ["ecs"], clusters846 add_task(ecs_describe_clusters, "ecs")847 def apigateway_get_rest_apis() -> tuple[list[str], Any]:848 log.debug(849 "Executing apigateway get-rest-apis, get-authorizers, get-deployments, get-request-validators, get-stages, get-resources, get-method, get-vpc-links"850 )851 def get_vpc_links() -> list[dict[str, Any]]:852 return paginate(853 "apigateway",854 "get_vpc_links",855 key="items",856 )857 def get_authorizers(rest_api_id: str) -> list[dict[str, Any]]:858 return paginate(859 "apigateway",860 "get_authorizers",861 key="items",862 param={"restApiId": rest_api_id},863 )864 def get_deployments(rest_api_id: str) -> list[dict[str, Any]]:865 return paginate(866 "apigateway",867 "get_deployments",868 key="items",869 param={"restApiId": rest_api_id},870 )871 def get_request_validators(rest_api_id: str) -> list[dict[str, Any]]:872 return paginate(873 "apigateway",874 "get_request_validators",875 key="items",876 param={"restApiId": rest_api_id},877 )878 def get_stages(rest_api_id: str) -> list[dict[str, Any]]:879 return unpaginated(880 "apigateway", "get_stages", param={"restApiId": rest_api_id}881 )["item"]882 def get_resources(rest_api_id: str) -> list[dict[str, Any]]:883 return paginate(884 "apigateway",885 "get_resources",886 key="items",887 param={"restApiId": rest_api_id},888 )889 def get_method(890 rest_api_id: str, resource_id: str, method: str891 ) -> dict[str, Any]:892 return unpaginated(893 "apigateway",894 "get_method",895 param={896 "restApiId": rest_api_id,897 "resourceId": resource_id,898 "httpMethod": method,899 },900 )901 rest_apis = paginate("apigateway", "get_rest_apis", key="items")902 for rest_api in rest_apis:903 rest_api_id = rest_api["id"]904 rest_api["vpcLinks"] = get_vpc_links()905 rest_api["authorizers"] = get_authorizers(rest_api_id)906 rest_api["deployments"] = get_deployments(rest_api_id)907 rest_api["requestValidators"] = get_request_validators(rest_api_id)908 rest_api["stages"] = get_stages(rest_api_id)909 rest_api["resources"] = get_resources(rest_api_id)910 for resource in rest_api["resources"]:911 resource["methods"] = []912 for method in resource.get("resourceMethods", []):913 resource["methods"].append(914 get_method(rest_api_id, resource["id"], method)915 )916 # TODO: Change to ["apigateway", "RestApis"]917 return ["apigateway", "Apis"], rest_apis918 add_task(apigateway_get_rest_apis, "apigateway")919 def apigateway_get_usage_plans() -> tuple[list[str], Any]:920 log.debug("Executing apigateway get-usage-plans, get-usage-plan-keys")921 def get_keys(usage_plan_id: str) -> list[dict[str, Any]]:922 return paginate(923 "apigateway",924 "get_usage_plan_keys",925 key="items",926 param={"usagePlanId": usage_plan_id},927 )928 usage_plans = paginate("apigateway", "get_usage_plans", key="items")929 for usage_plan in usage_plans:930 usage_plan["keys"] = get_keys(usage_plan["id"])931 return ["apigateway", "UsagePlans"], usage_plans932 add_task(apigateway_get_usage_plans, "apigateway")933 def apigatewayv2_get_apis() -> tuple[list[str], Any]:934 log.debug(935 "Executing apigatewayv2 get-apis, get-routes, get-integrations, get-authorizers"936 )937 def get_vpc_links() -> list[dict[str, Any]]:938 return unpaginated(939 "apigatewayv2",940 "get_vpc_links",941 )["Items"]942 def get_apis() -> list[dict[str, Any]]:943 return paginate(944 "apigatewayv2",945 "get_apis",946 key="Items",947 )948 def get_routes(api_id: str) -> list[dict[str, Any]]:949 return paginate(950 "apigatewayv2",951 "get_routes",952 key="Items",953 param={"ApiId": api_id},954 )955 def get_authorizers(api_id: str) -> list[dict[str, Any]]:956 return paginate(957 "apigatewayv2",958 "get_authorizers",959 key="Items",960 param={"ApiId": api_id},961 )962 def get_integrations(api_id: str) -> list[dict[str, Any]]:963 return paginate(964 "apigatewayv2",965 "get_integrations",966 key="Items",967 param={"ApiId": api_id},968 )969 apis = get_apis()970 for api in apis:971 api_id = api["ApiId"]972 api["Routes"] = get_routes(api_id)973 api["Authorizers"] = get_authorizers(api_id)974 api["Integrations"] = get_integrations(api_id)975 api["VpcLinks"] = get_vpc_links()976 return ["apigatewayv2", "Apis"], apis977 add_task(apigatewayv2_get_apis, "apigatewayv2")978 def guardduty_get_findings() -> tuple[list[str], Any]:979 log.debug("Executing guardduty list-detectors, list-findings, get-findings")980 detectors = paginate("guardduty", "list_detectors", key="DetectorIds")981 findings = []982 for detector_id in detectors:983 finding_ids = paginate(984 "guardduty",985 "list_findings",986 key="FindingIds",987 param={"DetectorId": detector_id},988 )989 if finding_ids:990 findings.extend(991 fake_paginate(992 "guardduty",993 "get_findings",994 request_key="FindingIds",995 response_key="Findings",996 n=50,997 items=finding_ids,998 param={"DetectorId": detector_id},999 )1000 )1001 return ["guardduty", "Findings"], findings1002 if include_guardduty:1003 add_task(guardduty_get_findings, "guardduty")1004 add_task(wafv2_get_web_acls_wrapper("REGIONAL", unpaginated))1005 add_task(wafv2_get_ip_sets_wrapper("REGIONAL", unpaginated))1006 # phase 1, everything except ecr1007 region_data = utils.execute_tasks(tasks, threads)1008 if region_data is None:1009 raise RuntimeError("utils.execute_tasks phase 1 returned None")1010 # phase 2, ecr1011 tasks = [ecr_describe_repositories_wrapper(region_data)]1012 ecr_data = utils.execute_tasks(tasks, threads)1013 if ecr_data is None:1014 raise RuntimeError("utils.execute_tasks phase 2 returned None")1015 region_data.update(ecr_data)...
ec2_fixtures.py
Source:ec2_fixtures.py
1import boto32import pytest3from moto import mock_ec24from tests.utils import random_str5@pytest.fixture6def gen_ec2_client(aws_setup):7 with mock_ec2():8 yield boto3.client("ec2")9@pytest.fixture10def gen_ec2_resource(aws_setup):11 with mock_ec2():12 yield boto3.resource("ec2")13# ============================14# WAITER15# ============================16@pytest.fixture17def gen_instance_exists_waiter(gen_ec2_client):18 return gen_ec2_client.get_waiter("instance_exists")19@pytest.fixture20def gen_bundle_task_complete_waiter(gen_ec2_client):21 return gen_ec2_client.get_waiter("bundle_task_complete")22@pytest.fixture23def gen_conversion_task_cancelled_waiter(gen_ec2_client):24 return gen_ec2_client.get_waiter("conversion_task_cancelled")25@pytest.fixture26def gen_conversion_task_completed_waiter(gen_ec2_client):27 return gen_ec2_client.get_waiter("conversion_task_completed")28@pytest.fixture29def gen_conversion_task_deleted_waiter(gen_ec2_client):30 return gen_ec2_client.get_waiter("conversion_task_deleted")31@pytest.fixture32def gen_customer_gateway_available_waiter(gen_ec2_client):33 return gen_ec2_client.get_waiter("customer_gateway_available")34@pytest.fixture35def gen_export_task_cancelled_waiter(gen_ec2_client):36 return gen_ec2_client.get_waiter("export_task_cancelled")37@pytest.fixture38def gen_export_task_completed_waiter(gen_ec2_client):39 return gen_ec2_client.get_waiter("export_task_completed")40@pytest.fixture41def gen_image_exists_waiter(gen_ec2_client):42 return gen_ec2_client.get_waiter("image_exists")43@pytest.fixture44def gen_image_available_waiter(gen_ec2_client):45 return gen_ec2_client.get_waiter("image_available")46@pytest.fixture47def gen_instance_running_waiter(gen_ec2_client):48 return gen_ec2_client.get_waiter("instance_running")49@pytest.fixture50def gen_instance_status_ok_waiter(gen_ec2_client):51 return gen_ec2_client.get_waiter("instance_status_ok")52@pytest.fixture53def gen_instance_stopped_waiter(gen_ec2_client):54 return gen_ec2_client.get_waiter("instance_stopped")55@pytest.fixture56def gen_instance_terminated_waiter(gen_ec2_client):57 return gen_ec2_client.get_waiter("instance_terminated")58@pytest.fixture59def gen_key_pair_exists_waiter(gen_ec2_client):60 return gen_ec2_client.get_waiter("key_pair_exists")61@pytest.fixture62def gen_nat_gateway_available_waiter(gen_ec2_client):63 return gen_ec2_client.get_waiter("nat_gateway_available")64@pytest.fixture65def gen_network_interface_available_waiter(gen_ec2_client):66 return gen_ec2_client.get_waiter("network_interface_available")67@pytest.fixture68def gen_password_data_available_waiter(gen_ec2_client):69 return gen_ec2_client.get_waiter("password_data_available")70@pytest.fixture71def gen_snapshot_completed_waiter(gen_ec2_client):72 return gen_ec2_client.get_waiter("snapshot_completed")73@pytest.fixture74def gen_security_group_exists_waiter(gen_ec2_client):75 return gen_ec2_client.get_waiter("security_group_exists")76@pytest.fixture77def gen_spot_instance_request_fulfilled_waiter(gen_ec2_client):78 return gen_ec2_client.get_waiter("spot_instance_request_fulfilled")79@pytest.fixture80def gen_subnet_available_waiter(gen_ec2_client):81 return gen_ec2_client.get_waiter("subnet_available")82@pytest.fixture83def gen_system_status_ok_waiter(gen_ec2_client):84 return gen_ec2_client.get_waiter("system_status_ok")85@pytest.fixture86def gen_volume_available_waiter(gen_ec2_client):87 return gen_ec2_client.get_waiter("volume_available")88@pytest.fixture89def gen_volume_deleted_waiter(gen_ec2_client):90 return gen_ec2_client.get_waiter("volume_deleted")91@pytest.fixture92def gen_volume_in_use_waiter(gen_ec2_client):93 return gen_ec2_client.get_waiter("volume_in_use")94@pytest.fixture95def gen_vpc_available_waiter(gen_ec2_client):96 return gen_ec2_client.get_waiter("vpc_available")97@pytest.fixture98def gen_vpc_exists_waiter(gen_ec2_client):99 return gen_ec2_client.get_waiter("vpc_exists")100@pytest.fixture101def gen_vpn_connection_available_waiter(gen_ec2_client):102 return gen_ec2_client.get_waiter("vpn_connection_available")103@pytest.fixture104def gen_vpn_connection_deleted_waiter(gen_ec2_client):105 return gen_ec2_client.get_waiter("vpn_connection_deleted")106@pytest.fixture107def gen_vpc_peering_connection_exists_waiter(gen_ec2_client):108 return gen_ec2_client.get_waiter("vpc_peering_connection_exists")109@pytest.fixture110def gen_vpc_peering_connection_deleted_waiter(gen_ec2_client):111 return gen_ec2_client.get_waiter("vpc_peering_connection_deleted")112# ============================113# PAGINATOR114# ============================115@pytest.fixture116def gen_describe_route_tables_paginator(gen_ec2_client):117 return gen_ec2_client.get_paginator("describe_route_tables")118@pytest.fixture119def gen_describe_iam_instance_profile_associations_paginator(gen_ec2_client):120 return gen_ec2_client.get_paginator("describe_iam_instance_profile_associations")121@pytest.fixture122def gen_describe_instance_status_paginator(gen_ec2_client):123 return gen_ec2_client.get_paginator("describe_instance_status")124@pytest.fixture125def gen_describe_instances_paginator(gen_ec2_client):126 return gen_ec2_client.get_paginator("describe_instances")127@pytest.fixture128def gen_describe_reserved_instances_offerings_paginator(gen_ec2_client):129 return gen_ec2_client.get_paginator("describe_reserved_instances_offerings")130@pytest.fixture131def gen_describe_reserved_instances_modifications_paginator(gen_ec2_client):132 return gen_ec2_client.get_paginator("describe_reserved_instances_modifications")133@pytest.fixture134def gen_describe_security_groups_paginator(gen_ec2_client):135 return gen_ec2_client.get_paginator("describe_security_groups")136@pytest.fixture137def gen_describe_snapshots_paginator(gen_ec2_client):138 return gen_ec2_client.get_paginator("describe_snapshots")139@pytest.fixture140def gen_describe_spot_fleet_instances_paginator(gen_ec2_client):141 return gen_ec2_client.get_paginator("describe_spot_fleet_instances")142@pytest.fixture143def gen_describe_spot_fleet_requests_paginator(gen_ec2_client):144 return gen_ec2_client.get_paginator("describe_spot_fleet_requests")145@pytest.fixture146def gen_describe_spot_price_history_paginator(gen_ec2_client):147 return gen_ec2_client.get_paginator("describe_spot_price_history")148@pytest.fixture149def gen_describe_tags_paginator(gen_ec2_client):150 return gen_ec2_client.get_paginator("describe_tags")151@pytest.fixture152def gen_describe_volume_status_paginator(gen_ec2_client):153 return gen_ec2_client.get_paginator("describe_volume_status")154@pytest.fixture155def gen_describe_volumes_paginator(gen_ec2_client):156 return gen_ec2_client.get_paginator("describe_volumes")157@pytest.fixture158def gen_describe_nat_gateways_paginator(gen_ec2_client):159 return gen_ec2_client.get_paginator("describe_nat_gateways")160@pytest.fixture161def gen_describe_network_interfaces_paginator(gen_ec2_client):162 return gen_ec2_client.get_paginator("describe_network_interfaces")163@pytest.fixture164def gen_describe_vpc_endpoints_paginator(gen_ec2_client):165 return gen_ec2_client.get_paginator("describe_vpc_endpoints")166@pytest.fixture167def gen_describe_vpc_endpoint_services_paginator(gen_ec2_client):168 return gen_ec2_client.get_paginator("describe_vpc_endpoint_services")169@pytest.fixture170def gen_describe_vpc_endpoint_connections_paginator(gen_ec2_client):171 return gen_ec2_client.get_paginator("describe_vpc_endpoint_connections")172@pytest.fixture173def gen_describe_byoip_cidrs_paginator(gen_ec2_client):174 return gen_ec2_client.get_paginator("describe_byoip_cidrs")175@pytest.fixture176def gen_describe_capacity_reservations_paginator(gen_ec2_client):177 return gen_ec2_client.get_paginator("describe_capacity_reservations")178@pytest.fixture179def gen_describe_classic_link_instances_paginator(gen_ec2_client):180 return gen_ec2_client.get_paginator("describe_classic_link_instances")181@pytest.fixture182def gen_describe_client_vpn_authorization_rules_paginator(gen_ec2_client):183 return gen_ec2_client.get_paginator("describe_client_vpn_authorization_rules")184@pytest.fixture185def gen_describe_client_vpn_connections_paginator(gen_ec2_client):186 return gen_ec2_client.get_paginator("describe_client_vpn_connections")187@pytest.fixture188def gen_describe_client_vpn_endpoints_paginator(gen_ec2_client):189 return gen_ec2_client.get_paginator("describe_client_vpn_endpoints")190@pytest.fixture191def gen_describe_client_vpn_routes_paginator(gen_ec2_client):192 return gen_ec2_client.get_paginator("describe_client_vpn_routes")193@pytest.fixture194def gen_describe_client_vpn_target_networks_paginator(gen_ec2_client):195 return gen_ec2_client.get_paginator("describe_client_vpn_target_networks")196@pytest.fixture197def gen_describe_egress_only_internet_gateways_paginator(gen_ec2_client):198 return gen_ec2_client.get_paginator("describe_egress_only_internet_gateways")199@pytest.fixture200def gen_describe_fleets_paginator(gen_ec2_client):201 return gen_ec2_client.get_paginator("describe_fleets")202@pytest.fixture203def gen_describe_flow_logs_paginator(gen_ec2_client):204 return gen_ec2_client.get_paginator("describe_flow_logs")205@pytest.fixture206def gen_describe_fpga_images_paginator(gen_ec2_client):207 return gen_ec2_client.get_paginator("describe_fpga_images")208@pytest.fixture209def gen_describe_host_reservation_offerings_paginator(gen_ec2_client):210 return gen_ec2_client.get_paginator("describe_host_reservation_offerings")211@pytest.fixture212def gen_describe_host_reservations_paginator(gen_ec2_client):213 return gen_ec2_client.get_paginator("describe_host_reservations")214@pytest.fixture215def gen_describe_hosts_paginator(gen_ec2_client):216 return gen_ec2_client.get_paginator("describe_hosts")217@pytest.fixture218def gen_describe_import_image_tasks_paginator(gen_ec2_client):219 return gen_ec2_client.get_paginator("describe_import_image_tasks")220@pytest.fixture221def gen_describe_import_snapshot_tasks_paginator(gen_ec2_client):222 return gen_ec2_client.get_paginator("describe_import_snapshot_tasks")223@pytest.fixture224def gen_describe_instance_credit_specifications_paginator(gen_ec2_client):225 return gen_ec2_client.get_paginator("describe_instance_credit_specifications")226@pytest.fixture227def gen_describe_launch_template_versions_paginator(gen_ec2_client):228 return gen_ec2_client.get_paginator("describe_launch_template_versions")229@pytest.fixture230def gen_describe_launch_templates_paginator(gen_ec2_client):231 return gen_ec2_client.get_paginator("describe_launch_templates")232@pytest.fixture233def gen_describe_moving_addresses_paginator(gen_ec2_client):234 return gen_ec2_client.get_paginator("describe_moving_addresses")235@pytest.fixture236def gen_describe_network_interface_permissions_paginator(gen_ec2_client):237 return gen_ec2_client.get_paginator("describe_network_interface_permissions")238@pytest.fixture239def gen_describe_prefix_lists_paginator(gen_ec2_client):240 return gen_ec2_client.get_paginator("describe_prefix_lists")241@pytest.fixture242def gen_describe_principal_id_format_paginator(gen_ec2_client):243 return gen_ec2_client.get_paginator("describe_principal_id_format")244@pytest.fixture245def gen_describe_public_ipv4_pools_paginator(gen_ec2_client):246 return gen_ec2_client.get_paginator("describe_public_ipv4_pools")247@pytest.fixture248def gen_describe_scheduled_instance_availability_paginator(gen_ec2_client):249 return gen_ec2_client.get_paginator("describe_scheduled_instance_availability")250@pytest.fixture251def gen_describe_scheduled_instances_paginator(gen_ec2_client):252 return gen_ec2_client.get_paginator("describe_scheduled_instances")253@pytest.fixture254def gen_describe_stale_security_groups_paginator(gen_ec2_client):255 return gen_ec2_client.get_paginator("describe_stale_security_groups")256@pytest.fixture257def gen_describe_transit_gateway_attachments_paginator(gen_ec2_client):258 return gen_ec2_client.get_paginator("describe_transit_gateway_attachments")259@pytest.fixture260def gen_describe_transit_gateway_route_tables_paginator(gen_ec2_client):261 return gen_ec2_client.get_paginator("describe_transit_gateway_route_tables")262@pytest.fixture263def gen_describe_transit_gateway_vpc_attachments_paginator(gen_ec2_client):264 return gen_ec2_client.get_paginator("describe_transit_gateway_vpc_attachments")265@pytest.fixture266def gen_describe_transit_gateways_paginator(gen_ec2_client):267 return gen_ec2_client.get_paginator("describe_transit_gateways")268@pytest.fixture269def gen_describe_volumes_modifications_paginator(gen_ec2_client):270 return gen_ec2_client.get_paginator("describe_volumes_modifications")271@pytest.fixture272def gen_describe_vpc_classic_link_dns_support_paginator(gen_ec2_client):273 return gen_ec2_client.get_paginator("describe_vpc_classic_link_dns_support")274@pytest.fixture275def gen_describe_vpc_endpoint_connection_notifications_paginator(gen_ec2_client):276 return gen_ec2_client.get_paginator(277 "describe_vpc_endpoint_connection_notifications"278 )279@pytest.fixture280def gen_describe_vpc_endpoint_service_configurations_paginator(gen_ec2_client):281 return gen_ec2_client.get_paginator("describe_vpc_endpoint_service_configurations")282@pytest.fixture283def gen_describe_vpc_endpoint_service_permissions_paginator(gen_ec2_client):284 return gen_ec2_client.get_paginator("describe_vpc_endpoint_service_permissions")285@pytest.fixture286def gen_describe_vpc_peering_connections_paginator(gen_ec2_client):287 return gen_ec2_client.get_paginator("describe_vpc_peering_connections")288@pytest.fixture289def gen_get_transit_gateway_attachment_propagations_paginator(gen_ec2_client):290 return gen_ec2_client.get_paginator("get_transit_gateway_attachment_propagations")291@pytest.fixture292def gen_get_transit_gateway_route_table_associations_paginator(gen_ec2_client):293 return gen_ec2_client.get_paginator("get_transit_gateway_route_table_associations")294@pytest.fixture295def gen_get_transit_gateway_route_table_propagations_paginator(gen_ec2_client):296 return gen_ec2_client.get_paginator("get_transit_gateway_route_table_propagations")297@pytest.fixture298def gen_describe_internet_gateways_paginator(gen_ec2_client):299 return gen_ec2_client.get_paginator("describe_internet_gateways")300@pytest.fixture301def gen_describe_network_acls_paginator(gen_ec2_client):302 return gen_ec2_client.get_paginator("describe_network_acls")303@pytest.fixture304def gen_describe_vpcs_paginator(gen_ec2_client):305 return gen_ec2_client.get_paginator("describe_vpcs")306@pytest.fixture307def gen_describe_spot_instance_requests_paginator(gen_ec2_client):308 return gen_ec2_client.get_paginator("describe_spot_instance_requests")309@pytest.fixture310def gen_describe_dhcp_options_paginator(gen_ec2_client):311 return gen_ec2_client.get_paginator("describe_dhcp_options")312@pytest.fixture313def gen_describe_subnets_paginator(gen_ec2_client):314 return gen_ec2_client.get_paginator("describe_subnets")315@pytest.fixture316def gen_describe_traffic_mirror_filters_paginator(gen_ec2_client):317 return gen_ec2_client.get_paginator("describe_traffic_mirror_filters")318@pytest.fixture319def gen_describe_traffic_mirror_sessions_paginator(gen_ec2_client):320 return gen_ec2_client.get_paginator("describe_traffic_mirror_sessions")321@pytest.fixture322def gen_describe_traffic_mirror_targets_paginator(gen_ec2_client):323 return gen_ec2_client.get_paginator("describe_traffic_mirror_targets")324@pytest.fixture325def gen_describe_export_image_tasks_paginator(gen_ec2_client):326 return gen_ec2_client.get_paginator("describe_export_image_tasks")327@pytest.fixture328def gen_describe_fast_snapshot_restores_paginator(gen_ec2_client):329 return gen_ec2_client.get_paginator("describe_fast_snapshot_restores")330@pytest.fixture331def gen_describe_ipv6_pools_paginator(gen_ec2_client):332 return gen_ec2_client.get_paginator("describe_ipv6_pools")333@pytest.fixture334def gen_get_associated_ipv6_pool_cidrs_paginator(gen_ec2_client):335 return gen_ec2_client.get_paginator("get_associated_ipv6_pool_cidrs")336@pytest.fixture337def gen_describe_coip_pools_paginator(gen_ec2_client):338 return gen_ec2_client.get_paginator("describe_coip_pools")339@pytest.fixture340def gen_describe_instance_type_offerings_paginator(gen_ec2_client):341 return gen_ec2_client.get_paginator("describe_instance_type_offerings")342@pytest.fixture343def gen_describe_instance_types_paginator(gen_ec2_client):344 return gen_ec2_client.get_paginator("describe_instance_types")345@pytest.fixture346def gen_describe_local_gateway_route_table_virtual_interface_group_associations_paginator(347 gen_ec2_client,348):349 return gen_ec2_client.get_paginator(350 "describe_local_gateway_route_table_virtual_interface_group_associations"351 )352@pytest.fixture353def gen_describe_local_gateway_route_table_vpc_associations_paginator(gen_ec2_client):354 return gen_ec2_client.get_paginator(355 "describe_local_gateway_route_table_vpc_associations"356 )357@pytest.fixture358def gen_describe_local_gateway_route_tables_paginator(gen_ec2_client):359 return gen_ec2_client.get_paginator("describe_local_gateway_route_tables")360@pytest.fixture361def gen_describe_local_gateway_virtual_interface_groups_paginator(gen_ec2_client):362 return gen_ec2_client.get_paginator(363 "describe_local_gateway_virtual_interface_groups"364 )365@pytest.fixture366def gen_describe_local_gateway_virtual_interfaces_paginator(gen_ec2_client):367 return gen_ec2_client.get_paginator("describe_local_gateway_virtual_interfaces")368@pytest.fixture369def gen_describe_local_gateways_paginator(gen_ec2_client):370 return gen_ec2_client.get_paginator("describe_local_gateways")371@pytest.fixture372def gen_describe_transit_gateway_multicast_domains_paginator(gen_ec2_client):373 return gen_ec2_client.get_paginator("describe_transit_gateway_multicast_domains")374@pytest.fixture375def gen_describe_transit_gateway_peering_attachments_paginator(gen_ec2_client):376 return gen_ec2_client.get_paginator("describe_transit_gateway_peering_attachments")377@pytest.fixture378def gen_get_transit_gateway_multicast_domain_associations_paginator(gen_ec2_client):379 return gen_ec2_client.get_paginator(380 "get_transit_gateway_multicast_domain_associations"381 )382@pytest.fixture383def gen_search_local_gateway_routes_paginator(gen_ec2_client):384 return gen_ec2_client.get_paginator("search_local_gateway_routes")385@pytest.fixture386def gen_search_transit_gateway_multicast_groups_paginator(gen_ec2_client):387 return gen_ec2_client.get_paginator("search_transit_gateway_multicast_groups")388@pytest.fixture389def gen_describe_managed_prefix_lists_paginator(gen_ec2_client):390 return gen_ec2_client.get_paginator("describe_managed_prefix_lists")391@pytest.fixture392def gen_get_managed_prefix_list_associations_paginator(gen_ec2_client):393 return gen_ec2_client.get_paginator("get_managed_prefix_list_associations")394@pytest.fixture395def gen_get_managed_prefix_list_entries_paginator(gen_ec2_client):396 return gen_ec2_client.get_paginator("get_managed_prefix_list_entries")397@pytest.fixture398def gen_get_groups_for_capacity_reservation_paginator(gen_ec2_client):399 return gen_ec2_client.get_paginator("get_groups_for_capacity_reservation")400@pytest.fixture401def gen_describe_carrier_gateways_paginator(gen_ec2_client):402 return gen_ec2_client.get_paginator("describe_carrier_gateways")403@pytest.fixture404def gen_get_transit_gateway_prefix_list_references_paginator(gen_ec2_client):405 return gen_ec2_client.get_paginator("get_transit_gateway_prefix_list_references")406@pytest.fixture407def gen_describe_network_insights_analyses_paginator(gen_ec2_client):408 return gen_ec2_client.get_paginator("describe_network_insights_analyses")409@pytest.fixture410def gen_describe_network_insights_paths_paginator(gen_ec2_client):411 return gen_ec2_client.get_paginator("describe_network_insights_paths")412@pytest.fixture413def gen_describe_transit_gateway_connect_peers_paginator(gen_ec2_client):414 return gen_ec2_client.get_paginator("describe_transit_gateway_connect_peers")415@pytest.fixture416def gen_describe_transit_gateway_connects_paginator(gen_ec2_client):417 return gen_ec2_client.get_paginator("describe_transit_gateway_connects")418@pytest.fixture419def gen_describe_addresses_attribute_paginator(gen_ec2_client):420 return gen_ec2_client.get_paginator("describe_addresses_attribute")421@pytest.fixture422def gen_describe_replace_root_volume_tasks_paginator(gen_ec2_client):423 return gen_ec2_client.get_paginator("describe_replace_root_volume_tasks")424@pytest.fixture425def gen_describe_store_image_tasks_paginator(gen_ec2_client):426 return gen_ec2_client.get_paginator("describe_store_image_tasks")427@pytest.fixture428def gen_describe_security_group_rules_paginator(gen_ec2_client):429 return gen_ec2_client.get_paginator("describe_security_group_rules")430@pytest.fixture431def gen_describe_instance_event_windows_paginator(gen_ec2_client):432 return gen_ec2_client.get_paginator("describe_instance_event_windows")433@pytest.fixture434def gen_describe_trunk_interface_associations_paginator(gen_ec2_client):435 return gen_ec2_client.get_paginator("describe_trunk_interface_associations")436@pytest.fixture437def gen_get_vpn_connection_device_types_paginator(gen_ec2_client):438 return gen_ec2_client.get_paginator("get_vpn_connection_device_types")439@pytest.fixture440def gen_describe_capacity_reservation_fleets_paginator(gen_ec2_client):441 return gen_ec2_client.get_paginator("describe_capacity_reservation_fleets")442@pytest.fixture443def gen_get_instance_types_from_instance_requirements_paginator(gen_ec2_client):444 return gen_ec2_client.get_paginator("get_instance_types_from_instance_requirements")445@pytest.fixture446def gen_get_spot_placement_scores_paginator(gen_ec2_client):447 return gen_ec2_client.get_paginator("get_spot_placement_scores")448# ============================449# RESOURCES450# ============================451@pytest.fixture452def gen_classic_address(gen_ec2_resource):453 return gen_ec2_resource.ClassicAddress(random_str())454@pytest.fixture455def gen_dhcp_options(gen_ec2_resource):456 return gen_ec2_resource.DhcpOptions(random_str())457@pytest.fixture458def gen_image(gen_ec2_resource):459 return gen_ec2_resource.Image(random_str())460@pytest.fixture461def gen_instance(gen_ec2_resource):462 return gen_ec2_resource.Instance(random_str())463@pytest.fixture464def gen_internet_gateway(gen_ec2_resource):465 return gen_ec2_resource.InternetGateway(random_str())466@pytest.fixture467def gen_key_pair(gen_ec2_resource):468 return gen_ec2_resource.KeyPair(random_str())469@pytest.fixture470def gen_network_acl(gen_ec2_resource):471 return gen_ec2_resource.NetworkAcl(random_str())472@pytest.fixture473def gen_network_interface(gen_ec2_resource):474 return gen_ec2_resource.NetworkInterface(random_str())475@pytest.fixture476def gen_network_interface_association(gen_ec2_resource):477 return gen_ec2_resource.NetworkInterfaceAssociation(random_str())478@pytest.fixture479def gen_placement_group(gen_ec2_resource):480 return gen_ec2_resource.PlacementGroup(random_str())481@pytest.fixture482def gen_route(gen_ec2_resource):483 return gen_ec2_resource.Route(random_str(), random_str())484@pytest.fixture485def gen_route_table(gen_ec2_resource):486 return gen_ec2_resource.RouteTable(random_str())487@pytest.fixture488def gen_route_table_association(gen_ec2_resource):489 return gen_ec2_resource.RouteTableAssociation(random_str())490@pytest.fixture491def gen_security_group(gen_ec2_resource):492 return gen_ec2_resource.SecurityGroup(random_str())493@pytest.fixture494def gen_snapshot(gen_ec2_resource):495 return gen_ec2_resource.Snapshot(random_str())496@pytest.fixture497def gen_subnet(gen_ec2_resource):498 return gen_ec2_resource.Subnet(random_str())499@pytest.fixture500def gen_tag(gen_ec2_resource):501 return gen_ec2_resource.Tag(random_str(), random_str(), random_str())502@pytest.fixture503def gen_volume(gen_ec2_resource):504 return gen_ec2_resource.Volume(random_str())505@pytest.fixture506def gen_vpc(gen_ec2_resource):507 return gen_ec2_resource.Vpc(random_str())508@pytest.fixture509def gen_vpc_peering_connection(gen_ec2_resource):510 return gen_ec2_resource.VpcPeeringConnection(random_str())511@pytest.fixture512def gen_vpc_address(gen_ec2_resource):513 return gen_ec2_resource.VpcAddress(random_str())514# ============================515# COLLECTIONS516# ============================517@pytest.fixture518def gen_service_resource_classic_addresses_collection(gen_ec2_resource):519 return gen_ec2_resource.classic_addresses.all()520@pytest.fixture521def gen_service_resource_dhcp_options_sets_collection(gen_ec2_resource):522 return gen_ec2_resource.dhcp_options_sets.all()523@pytest.fixture524def gen_service_resource_images_collection(gen_ec2_resource):525 return gen_ec2_resource.images.all()526@pytest.fixture527def gen_service_resource_instances_collection(gen_ec2_resource):528 return gen_ec2_resource.instances.all()529@pytest.fixture530def gen_service_resource_internet_gateways_collection(gen_ec2_resource):531 return gen_ec2_resource.internet_gateways.all()532@pytest.fixture533def gen_service_resource_key_pairs_collection(gen_ec2_resource):534 return gen_ec2_resource.key_pairs.all()535@pytest.fixture536def gen_service_resource_network_acls_collection(gen_ec2_resource):537 return gen_ec2_resource.network_acls.all()538@pytest.fixture539def gen_service_resource_network_interfaces_collection(gen_ec2_resource):540 return gen_ec2_resource.network_interfaces.all()541@pytest.fixture542def gen_service_resource_placement_groups_collection(gen_ec2_resource):543 return gen_ec2_resource.placement_groups.all()544@pytest.fixture545def gen_service_resource_route_tables_collection(gen_ec2_resource):546 return gen_ec2_resource.route_tables.all()547@pytest.fixture548def gen_service_resource_security_groups_collection(gen_ec2_resource):549 return gen_ec2_resource.security_groups.all()550@pytest.fixture551def gen_service_resource_snapshots_collection(gen_ec2_resource):552 return gen_ec2_resource.snapshots.all()553@pytest.fixture554def gen_service_resource_subnets_collection(gen_ec2_resource):555 return gen_ec2_resource.subnets.all()556@pytest.fixture557def gen_service_resource_volumes_collection(gen_ec2_resource):558 return gen_ec2_resource.volumes.all()559@pytest.fixture560def gen_service_resource_vpc_addresses_collection(gen_ec2_resource):561 return gen_ec2_resource.vpc_addresses.all()562@pytest.fixture563def gen_service_resource_vpc_peering_connections_collection(gen_ec2_resource):564 return gen_ec2_resource.vpc_peering_connections.all()565@pytest.fixture566def gen_service_resource_vpcs_collection(gen_ec2_resource):567 return gen_ec2_resource.vpcs.all()568@pytest.fixture569def gen_instance_volumes_collection(gen_instance):570 return gen_instance.volumes.all()571@pytest.fixture572def gen_instance_vpc_addresses_collection(gen_instance):573 return gen_instance.vpc_addresses.all()574@pytest.fixture575def gen_placement_group_instances_collection(gen_placement_group):576 return gen_placement_group.instances.all()577@pytest.fixture578def gen_subnet_instances_collection(gen_subnet):579 return gen_subnet.instances.all()580@pytest.fixture581def gen_subnet_network_interfaces_collection(gen_subnet):582 return gen_subnet.network_interfaces.all()583@pytest.fixture584def gen_volume_snapshots_collection(gen_volume):585 return gen_volume.snapshots.all()586@pytest.fixture587def gen_vpc_accepted_vpc_peering_connections_collection(gen_vpc):588 return gen_vpc.accepted_vpc_peering_connections.all()589@pytest.fixture590def gen_vpc_instances_collection(gen_vpc):591 return gen_vpc.instances.all()592@pytest.fixture593def gen_vpc_internet_gateways_collection(gen_vpc):594 return gen_vpc.internet_gateways.all()595@pytest.fixture596def gen_vpc_network_acls_collection(gen_vpc):597 return gen_vpc.network_acls.all()598@pytest.fixture599def gen_vpc_network_interfaces_collection(gen_vpc):600 return gen_vpc.network_interfaces.all()601@pytest.fixture602def gen_vpc_requested_vpc_peering_connections_collection(gen_vpc):603 return gen_vpc.requested_vpc_peering_connections.all()604@pytest.fixture605def gen_vpc_route_tables_collection(gen_vpc):606 return gen_vpc.route_tables.all()607@pytest.fixture608def gen_vpc_security_groups_collection(gen_vpc):609 return gen_vpc.security_groups.all()610@pytest.fixture611def gen_vpc_subnets_collection(gen_vpc):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!