Best Python code snippet using localstack_python
fetching.py
Source: fetching.py
1"""2The only place in this package that directly interacts with the AWS API.3Contains the cached fetcher that ensures that expensive calls to the AWS API4are cached.5"""6import time7from timeit import default_timer8from loguru import logger9from prometheus_ecs_discoverer import s, telemetry, toolbox10from prometheus_ecs_discoverer.caching import SlidingCache11# Telemetry ====================================================================12CLUSTERS = telemetry.gauge("clusters", "Fetched clusters.")13CINSTANCES = telemetry.gauge(14 "container_instances", "Fetched container instances.", ("cluster",)15)16TASKS = telemetry.gauge("tasks", "Fetched tasks.", ("cluster",))17DURATION = telemetry.histogram(18 "api_requests_duration_seconds", "Duration of requests to the AWS API.", ("method",)19)20# ==============================================================================21class CachedFetcher:22 """Work with the AWS API leveraging a sliding cache.23 Reduces the amount of request made to the AWS API helping to stay below the24 request limits. Only implements necessary methods. So not a generic class.25 Rember to flush all caches with `flush_caches` after every "full round".26 """27 def __init__(28 self,29 ecs_client,30 ec2_client,31 throttle_interval_seconds: float = 0.1,32 should_throttle: bool = False,33 ):34 """35 :param ecs_client: Boto3 ECS client.36 :param ec2_client: Boto3 EC2 client.37 :param throttle_interval_seconds: Time to sleep after every single38 request made to the AWS API.39 :param should_throttle: If process should go to sleep after a request40 made to the AWS API.41 """42 self.ecs = ecs_client43 self.ec2 = ec2_client44 self.throttle_interval_seconds = throttle_interval_seconds45 self.should_throttle = should_throttle46 self.task_cache = SlidingCache(name="task_cache")47 self.task_definition_cache = SlidingCache(name="task_definition_cache")48 self.container_instance_cache = SlidingCache(name="container_instance_cache")49 self.ec2_instance_cache = SlidingCache(name="ec2_instance_cache")50 def flush_caches(self) -> None:51 """Flush all caches. 
Should be called at the end of a round."""52 self.task_cache.flush()53 self.task_definition_cache.flush()54 self.container_instance_cache.flush()55 self.ec2_instance_cache.flush()56 # ==========================================================================57 def get_arns(self, method: str, key: str, **aws_api_parameters) -> list:58 """Get the ARNs with a given method and key.59 Args:60 method: AWS API method to use.61 key: Key to extract from the response(s).62 Returns:63 list: List of ARNs.64 """65 arns = []66 total_start_time = default_timer()67 start_time = total_start_time68 for page in self.ecs.get_paginator(method).paginate(**aws_api_parameters):69 DURATION.labels(method).observe(max(default_timer() - start_time, 0))70 arns += page.get(key, [])71 if self.should_throttle:72 time.sleep(self.throttle_interval_seconds)73 start_time = default_timer()74 if s.DEBUG:75 logger.bind(**aws_api_parameters).debug("{} {}.", method, key)76 if s.PRINT_STRUCTS:77 toolbox.pstruct(arns, f"{key} {method}")78 return arns79 def get_cluster_arns(self) -> list:80 """Get cluster ARNs."""81 arns = self.get_arns("list_clusters", "clusterArns")82 CLUSTERS.set(len(arns))83 return arns84 def get_container_instance_arns(self, cluster_arn: str) -> list:85 """Get container instance ARNs for given cluster ARN."""86 arns = self.get_arns(87 "list_container_instances", "containerInstanceArns", cluster=cluster_arn88 )89 CINSTANCES.labels(cluster_arn).set(len(arns))90 return arns91 def get_task_definition_arns(self) -> list:92 """Get task definition ARNs."""93 return self.get_arns("list_task_definitions", "taskDefinitionArns")94 def get_task_arns(self, cluster_arn: str) -> list:95 """Get task ARNs for given cluster ARN."""96 arns = self.get_arns("list_tasks", "taskArns", cluster=cluster_arn)97 TASKS.labels(cluster_arn).set(len(arns))98 return arns99 # ==========================================================================100 def get_tasks(self, cluster_arn: str, task_arns: list = None) -> dict:101 """Get task descriptions from cache / AWS API.102 Returns:103 dict: Keys are the task ARNs, values the respective task descriptions.104 """105 def uncached_fetch(task_arns: list) -> dict:106 logger.bind(cluster_arn=cluster_arn, task_arns=task_arns).debug(107 "Fetch tasks from AWS with describe_tasks."108 ) if s.DEBUG else None109 tasks = []110 chunked_task_arns = toolbox.chunk_list(task_arns, 100)111 for task_arns_chunk in chunked_task_arns:112 start_time = default_timer()113 _t = self.ecs.describe_tasks(cluster=cluster_arn, tasks=task_arns_chunk)[114 "tasks"115 ]116 tasks += list(117 filter(lambda x: x.get("lastStatus", None) == "RUNNING", _t)118 )119 DURATION.labels("describe_tasks").observe(120 max(default_timer() - start_time, 0)121 )122 if self.should_throttle:123 time.sleep(self.throttle_interval_seconds)124 if s.PRINT_STRUCTS:125 toolbox.pstruct(tasks, "describe_tasks")126 return toolbox.list_to_dict(tasks, "taskArn")127 if task_arns is None:128 task_arns = self.get_task_arns(cluster_arn)129 return self.task_cache.get_multiple(task_arns, uncached_fetch)130 def get_task_definition(self, arn: str) -> dict:131 """Get single task definition descriptions from cache / AWS API.132 Returns:133 dict: Key is the task definition ARN, value the task definition134 description.135 """136 def uncached_fetch(arn: str) -> dict:137 logger.bind(arn=arn).debug(138 "Fetch task definition from AWS with describe_task_definition."139 ) if s.DEBUG else None140 start_time = default_timer()141 task_definition: dict = 
self.ecs.describe_task_definition(taskDefinition=arn)[142 "taskDefinition"143 ]144 DURATION.labels("describe_task_definition").observe(145 max(default_timer() - start_time, 0)146 )147 if s.PRINT_STRUCTS:148 toolbox.pstruct(task_definition, "fetched task definition")149 if self.should_throttle:150 time.sleep(self.throttle_interval_seconds)151 return task_definition152 return self.task_definition_cache.get_single(arn, uncached_fetch)153 def get_task_definitions(self, arns: list = None) -> dict:154 """Get task definition descriptions from cache / AWS API.155 Every given ARN corresponds with a (cached) call.156 Returns:157 dict: Keys are the task definition ARNs, values the respective task158 definition descriptions.159 """160 def uncached_fetch(arns: list) -> dict:161 logger.bind(arns=arns).debug(162 "Fetch task definitions from AWS with describe_task_definition."163 ) if s.DEBUG else None164 descriptions = {}165 for arn in arns:166 start_time = default_timer()167 response = self.ecs.describe_task_definition(taskDefinition=arn)168 DURATION.labels("describe_task_definition").observe(169 max(default_timer() - start_time, 0)170 )171 response_arn = response["taskDefinition"]["taskDefinitionArn"]172 descriptions[response_arn] = response["taskDefinition"]173 if self.should_throttle:174 time.sleep(self.throttle_interval_seconds)175 if s.PRINT_STRUCTS:176 toolbox.pstruct(descriptions, "fetched task definitions")177 return descriptions178 if arns is None:179 arns = self.get_task_definition_arns()180 return self.task_definition_cache.get_multiple(181 arns,182 uncached_fetch,183 )184 def get_container_instances(self, cluster_arn: str, arns: list = None) -> dict:185 """Get container instance descriptions from cache / AWS API.186 Returns:187 dict: Keys are the container instance ARNs, values the respective188 container instance descriptions.189 """190 def uncached_fetch(arns: list) -> dict:191 logger.bind(arns=arns).debug(192 "Fetch container instances from AWS with describe_container_instances."193 ) if s.DEBUG else None194 lst = []195 arns_chunks = toolbox.chunk_list(arns, 100)196 for arns_chunk in arns_chunks:197 start_time = default_timer()198 lst += self.ecs.describe_container_instances(199 cluster=cluster_arn, containerInstances=arns_chunk200 )["containerInstances"]201 DURATION.labels("describe_container_instances").observe(202 max(default_timer() - start_time, 0)203 )204 if self.should_throttle:205 time.sleep(self.throttle_interval_seconds)206 dct = toolbox.list_to_dict(lst, "containerInstanceArn")207 if s.PRINT_STRUCTS:208 toolbox.pstruct(dct, "describe_container_instances")209 return dct210 if arns is None:211 arns = self.get_container_instance_arns(cluster_arn)212 return self.container_instance_cache.get_multiple(213 arns,214 uncached_fetch,215 )216 def get_ec2_instances(self, instance_ids: list) -> dict:217 """Get EC2 instance descriptions from cache / AWS API.218 Returns:219 dict: Keys are the EC2 instance ARNs, values the respective220 EC2 instance descriptions.221 """222 def uncached_fetch(instance_ids: list) -> dict:223 logger.bind(instance_ids=instance_ids).debug(224 "Fetch EC2 instances from AWS with describe_instances."225 ) if s.DEBUG else None226 instances_list = []227 ids_chunks = toolbox.chunk_list(instance_ids, 100)228 for ids_chunk in ids_chunks:229 start_time = default_timer()230 response = self.ec2.describe_instances(InstanceIds=ids_chunk)231 for reservation in response["Reservations"]:232 instances_list += reservation["Instances"]233 
DURATION.labels("describe_instances").observe(234 max(default_timer() - start_time, 0)235 )236 if self.should_throttle:237 time.sleep(self.throttle_interval_seconds)238 dct = toolbox.list_to_dict(instances_list, "InstanceId")239 if s.PRINT_STRUCTS:240 toolbox.pstruct(dct, "ec2.describe_instances")241 return dct242 return self.ec2_instance_cache.get_multiple(243 instance_ids,244 uncached_fetch,...
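The fetcher above treats SlidingCache as a black box exposing get_single, get_multiple, and flush. Below is a minimal sketch of that interface, assuming get_multiple receives the full key list plus a callback that fetches only the cache misses; the real implementation in prometheus_ecs_discoverer.caching may track additional state, such as entries to age out between rounds.

# Minimal sketch of the cache interface that CachedFetcher relies on.
# Assumption: only the interface is reproduced here; the real SlidingCache
# may also record hit/miss telemetry and slide entries out over rounds.
from typing import Callable


class SlidingCache:
    def __init__(self, name: str):
        self.name = name
        self._store: dict = {}

    def get_single(self, key, fetch_uncached: Callable) -> dict:
        """Return the cached entry, fetching and caching it on a miss."""
        if key not in self._store:
            self._store[key] = fetch_uncached(key)
        return self._store[key]

    def get_multiple(self, keys: list, fetch_uncached: Callable) -> dict:
        """Return cached entries; fetch all missing keys in one callback call."""
        missing = [key for key in keys if key not in self._store]
        if missing:
            self._store.update(fetch_uncached(missing))
        # Some fetches (e.g. RUNNING-task filtering) return fewer keys than
        # requested, so only keys that actually resolved are returned.
        return {key: self._store[key] for key in keys if key in self._store}

    def flush(self) -> None:
        """Drop everything. CachedFetcher calls this after every full round."""
        self._store.clear()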
main.py
Source: main.py
1"""2Entry to PromED. Contains a lot of instrumentation and is responsible for3looping the discovery. It daemonizes the functionality.4"""5import sys6import time7from timeit import default_timer8import boto39from botocore.config import Config10from loguru import logger11from prometheus_client import Histogram, start_http_server12from prometheus_ecs_discoverer import discovery, fetching, marshalling, s, telemetry13INTERVAL_BREACHED_COUNTER = telemetry.counter(14 "execution_breaches_total",15 "Number of times the discovery round took longer than the configured interval.",16)17INTERVAL_BREACHED_COUNTER.inc(0)18def configure_logging() -> None:19 """Configure Loguru logging."""20 logger.remove()21 if s.LOG_JSON:22 fmt = "{message}"23 logger.add(sys.stderr, format=fmt, serialize=True, level=s.LOG_LEVEL)24 else:25 fmt = "<green>{time:HH:mm:ss}</green> <level>{level}</level> <cyan>{function}</cyan> {message} <dim>{extra}</dim>"26 logger.add(sys.stderr, colorize=True, format=fmt, level=s.LOG_LEVEL)27 if s.BOTO3_DEBUG:28 import boto329 boto3.set_stream_logger(name="botocore")30def expose_info() -> None:31 """Expose a gauge with info label values."""32 telemetry.info(33 {34 "interval_seconds": str(s.INTERVAL),35 }36 )37 INTERVAL_INFO = telemetry.gauge(38 "info_interval_seconds", "Configured interval in seconds."39 )40 INTERVAL_INFO.set(s.INTERVAL)41def get_interval_histogram(interval: int) -> Histogram:42 """Create histogram with buckets that fit the given interval.43 10 buckets below the interval and two buckets with 10 second steps larger44 than the interval.45 Args:46 interval (int): Interval PromED is running at.47 Returns:48 Histogram: Prometheus Histogram object.49 """50 steps = 1051 step_size = round(interval / steps, 0)52 return telemetry.histogram(53 "round_duration_seconds",54 "Histogram for duration",55 buckets=tuple(56 [x * step_size for x in range(steps)]57 + [58 interval + 10,59 interval + 20,60 float("inf"),61 ]62 ),63 )64def main():65 interval: int = s.INTERVAL66 output_dir: str = s.OUTPUT_DIRECTORY67 should_throttle: bool = s.WARMUP_THROTTLE68 configure_logging()69 expose_info()70 logger.info("Welcome to PromED, the Prometheus ECS Discoverer.")71 logger.bind(settings=s.as_dict()).info("Here is the used configuration.")72 DURATION_HISTOGRAM = get_interval_histogram(interval)73 if s.PROMETHEUS_START_HTTP_SERVER:74 port = s.PROMETHEUS_SERVER_PORT75 logger.bind(port=port).info("Start Prometheus HTTP server to expose metrics.")76 start_http_server(port=port)77 logger.info("Create Boto3 session.")78 session = boto3.Session()79 config = Config(retries={"max_attempts": s.MAX_RETRY_ATTEMPTS, "mode": "standard"})80 logger.info("Create Boto3 clients and CachedFetcher.")81 fetcher = fetching.CachedFetcher(82 session.client("ecs", config=config),83 session.client("ec2", config=config),84 should_throttle=should_throttle,85 throttle_interval_seconds=s.THROTTLE_INTERVAL_SECONDS,86 )87 logger.info("Create PrometheusEcsDiscoverer.")88 discoverer = discovery.PrometheusEcsDiscoverer(fetcher)89 if should_throttle:90 logger.info("First discovery round will be throttled down.")91 logger.info("Ready for discovery. 
The discoverer will run until interrupted.")92 first_round = True93 while True:94 logger.info("Start new discovery round.")95 start_time = default_timer()96 if not first_round:97 discoverer.fetcher.should_throttle = False98 targets = discoverer.discover()99 marshalling.write_targets_to_file(targets, output_dir)100 if first_round and should_throttle:101 fetcher.should_throttle = False102 first_round = False103 duration = max(default_timer() - start_time, 0)104 logger.bind(duration=duration).info("Finished discovery round.")105 DURATION_HISTOGRAM.observe(duration)106 if duration > interval:107 logger.bind(duration=duration).warning(108 "Discovery round took longer than the configured interval. Please investigate."109 )110 INTERVAL_BREACHED_COUNTER.inc()111 time_left = max(interval - duration, 0)112 time.sleep(time_left)113if __name__ == "__main__":...
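To make get_interval_histogram concrete, here is the bucket computation from above replayed for a hypothetical interval of 60 seconds (the actual value comes from s.INTERVAL): ten 6-second buckets below the interval plus two 10-second overflow buckets and +Inf.

# Illustration only: interval=60 is an assumed example value.
interval = 60
steps = 10
step_size = round(interval / steps, 0)  # 6.0
buckets = tuple(
    [x * step_size for x in range(steps)]  # 0.0, 6.0, ..., 54.0
    + [interval + 10, interval + 20, float("inf")]  # 70, 80, +Inf
)
print(buckets)  # (0.0, 6.0, 12.0, ..., 54.0, 70, 80, inf)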
plex_detector.py
Source: plex_detector.py
import asyncio
import logging
import traceback

from plexapi.server import PlexServer

from helpers import PlexInhibitor, InhibitSource

logging.getLogger(__name__).setLevel(logging.DEBUG)


class PlexDetector:
    """Detects if anyone is streaming on a Plex server, and if so determines
    whether qBittorrent should have its upload throttled."""

    def __init__(self, plex_url, plex_token, interface_class=PlexInhibitor):
        logging.info(f"Initializing PlexDetector, connecting to {plex_url}")
        self.plex_url = plex_url
        self.plex_token = plex_token
        self.interface_class = interface_class
        try:
            self.plex_server = PlexServer(self.plex_url, self.plex_token)
        except Exception as e:
            logging.error(f"Failed to connect to {plex_url}: {e}")
            self.plex_server = None
        else:
            # Only report a connection if one was actually established.
            logging.info(f"Connected to {plex_url}")
            self.interface_class.connected_to_plex = True

    def _get_activity(self):
        try:
            sessions = self.plex_server.sessions()
            should_throttle = False
            self.interface_class.total_sessions = 0
            for session in sessions:
                if session.players[0].state in ("playing", "buffering"):
                    # LAN streams do not compete for upload bandwidth.
                    if session.session[0].location == "lan":
                        continue
                    should_throttle = True
                    self.interface_class.total_sessions += 1
            return should_throttle
        except Exception as e:
            logging.error(f"Failed to get Plex activity: {e}\n{traceback.format_exc()}")
            self.interface_class.connected_to_plex = False
            return False

    def get_activity(self):
        return self._get_activity()

    async def run(self):
        while not self.interface_class.shutdown:
            logging.debug("Checking Plex activity")
            if self._get_activity():
                self.interface_class.should_inhibit = True
            else:
                self.interface_class.should_inhibit = False
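A minimal way to drive this class might look like the sketch below, assuming PlexInhibitor (from the project's helpers module) carries the shutdown, should_inhibit, total_sessions, and connected_to_plex attributes the detector writes to; the URL and token are placeholders, and the listing above is truncated, so any polling delay inside run is not shown.

# Hypothetical driver; "http://localhost:32400" is Plex's default local
# endpoint and "my-plex-token" is a placeholder for a real token.
import asyncio

from helpers import PlexInhibitor

detector = PlexDetector(
    "http://localhost:32400", "my-plex-token", interface_class=PlexInhibitor
)
asyncio.run(detector.run())  # loops until PlexInhibitor.shutdown is set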