Best Python code snippet using localstack_python
producer.py
Source:producer.py
import logging
import numpy as np
import pandas as pd

from typing import Any
from queue import Queue
from collections import deque
from threading import Thread, RLock
from dataclasses import dataclass, field


@dataclass(order = True)
class Result:
    """
        Wrapper for a produced item that carries a priority.

        Instances are ordered on (`priority`, `index`) ; `result` is excluded from comparisons.
        When a `Producer` yields a `Result`, its listeners receive `result` as the item and
        `priority` as a keyword argument.
    """
    priority    : Any
    index       : int
    result      : Any = field(compare = False)


class StoppedException(Exception):
    """ Raised by an item listener to tell the `Producer` it must not be called anymore """
    def __init__(self, * args, ** kwargs):
        super().__init__(* args, ** kwargs)


def _create_generator(generator):
    """
        Wrap a data container into a zero-argument callable returning an iterator on it

        Arguments :
            - generator : `list`, `tuple`, `np.ndarray`, `pd.DataFrame` or `queue.Queue`
        Return : a callable that creates the iterator
            - list / tuple / ndarray : iterates on the items
            - DataFrame : iterates on the rows (the index is dropped)
            - Queue     : yields `Queue.get()` results until `None` is received
        Raise : ValueError if the type is not supported
    """
    if isinstance(generator, (list, tuple, np.ndarray)):
        return lambda: iter(generator)

    if isinstance(generator, pd.DataFrame):
        def _df_iterator():
            for _, row in generator.iterrows():
                yield row
        return _df_iterator

    if isinstance(generator, Queue):
        def _queue_iterator():
            while True:
                # read from the wrapped queue (the original referenced an undefined `queue` name)
                item = generator.get()
                # `None` is the end-of-stream sentinel. A plain `return` ends the generator :
                # raising `StopIteration` inside a generator becomes a `RuntimeError` (PEP 479)
                if item is None:
                    return
                yield item
        return _queue_iterator

    raise ValueError('Unknown generator type ({}) : {}'.format(type(generator), generator))


def _get_thread_name(generator, name):
    """ Return `name` if given, else `generator.name`, else `generator.__name__`, else `None` """
    if name is not None:
        return name
    if hasattr(generator, 'name'):
        return generator.name
    if hasattr(generator, '__name__'):
        return generator.__name__
    return None


def _get_listener_name(listener):
    """ Return `listener.name`, else `listener.__name__`, else the listener's class name """
    if hasattr(listener, 'name'):
        return listener.name
    if hasattr(listener, '__name__'):
        return listener.__name__
    return listener.__class__.__name__


class Producer(Thread):
    """
        Thread that iterates on a `generator` in a Thread
        It does not store any result but has `listeners` that are called after each generation
    """
    def __init__(self,
                 generator,
                 * args,

                 description    = None,

                 consumers      = None,
                 start_listeners    = None,
                 stop_listeners     = None,

                 run_main_thread    = False,
                 stop_no_more_listeners = True,

                 name = None,
                 ** kwargs
                ):
        """
            Constructor for the `Producer`

            Arguments :
                - generator : the callable function that returns a `generator` (or `iterator`),
                              or a container supported by `_create_generator`
                - description   : free text shown in `node_text` (defaults to the generator's `__doc__`)

                - consumers : consumers to add (started, with linked stop)
                - {start / stop}_listeners : listeners to add on start / stop events

                - run_main_thread : if True, `start()` calls `run()` in the calling thread
                                    instead of starting a separate Thread
                - stop_no_more_listeners : whether to call `self.stop` when the producer has lost all its consumers (note that if a producer (or consumer) has never had any consumer, this argument is ignored)
                - name  : the Thread's name

            Extra positional / keyword arguments are accepted and ignored.
        """
        Thread.__init__(self, name = _get_thread_name(generator, name))

        self.generator  = generator if callable(generator) else _create_generator(generator)
        if not description and hasattr(self.generator, '__doc__'):
            description = self.generator.__doc__
        self.description    = description
        self.run_main_thread    = run_main_thread
        self.stop_no_more_listeners = stop_no_more_listeners

        # each entry is a `(callable, infos)` pair, see `add_listener`
        self.start_listeners    = []
        self.item_listeners     = []
        self.stop_listeners     = []

        # protects `_stopped`, `_finished` and the item counter
        self.mutex_infos    = RLock()
        self._stopped   = False
        self._finished  = False
        self.__size     = 0

        if start_listeners is not None:
            if not isinstance(start_listeners, (list, tuple)): start_listeners = [start_listeners]
            for l in start_listeners: self.add_listener(l, on = 'start')

        if consumers is not None:
            if not isinstance(consumers, (list, tuple)): consumers = [consumers]
            for c in consumers: self.add_consumer(c, start = True, link_stop = True)

        if stop_listeners is not None:
            if not isinstance(stop_listeners, (list, tuple)): stop_listeners = [stop_listeners]
            for l in stop_listeners: self.add_listener(l, on = 'stop')

    @property
    def size(self):
        """ Number of items produced so far """
        with self.mutex_infos: return self.__size

    @property
    def stopped(self):
        """ Whether `stop()` has been called """
        with self.mutex_infos: return self._stopped

    @property
    def finished(self):
        """ Whether `on_stop()` has been executed """
        with self.mutex_infos: return self._finished

    @property
    def str_status(self):
        """ Short textual status : '/' (main thread), 'alive', 'finished' or 'not started' """
        if self.run_main_thread:
            return '/'
        if self.is_alive():
            return 'alive'
        if self.finished:
            return 'finished'
        return 'not started'

    @property
    def node_text(self):
        """ Multi-line label (name, optional description, thread status) used by `plot` """
        des = "{}\n".format(self.name)
        if self.description: des += "{}\n".format(self.description)
        des += "Thread : {}\n".format(self.str_status)
        return des

    def __iter__(self):
        return self.generator()

    def __len__(self):
        if self.generator is not self and hasattr(self.generator, '__len__'):
            return len(self.generator)
        return self.size

    def __str__(self):
        des = 'Producer {}:\n'.format(self.name)
        des += 'Thread alive : {}\n'.format(self.is_alive())
        des += 'Already produced {} items\n'.format(self.size)
        des += '# Listeners :\n- Start : {}\n- Items : {}\n- Stop  : {}\n'.format(
            len(self.start_listeners), len(self.item_listeners), len(self.stop_listeners)
        )
        return des

    def add_listener(self, listener, * args, on = 'item', ** kwargs):
        """
            Add a `listener` (callable) called at the given (`on`) event
            If the event is `item`, the first argument received is the produced item
            args / kwargs are given when called

            WARNING : listeners are executed in the Producer's thread so make sure to use `Consumer`'s running on separated threads to ensure a correct parallelization

            Arguments :
                - listener  : the callable to register
                - on        : the event, either 'item', 'start' or 'stop'
            Raise : ValueError if `listener` is not callable
        """
        assert on in ('item', 'start', 'stop')

        if not callable(listener):
            raise ValueError('`listener` must be a callable ! Got type {}'.format(type(listener)))

        if isinstance(listener, Producer) and on == 'item':
            logging.debug('[LISTENER {}] consumer added !'.format(self.name))
        else:
            logging.debug('[LISTENER {}] listener added on `{}` event !'.format(self.name, on))

        infos = {'name' : _get_listener_name(listener), 'stopped' : False}
        if on == 'item':
            # keep a reference to `Producer`-like listeners for recursive `join` / `plot`
            if isinstance(listener, Producer): infos['consumer_class'] = listener
            self.item_listeners.append((
                lambda item, ** kw: listener(item, * args, ** {** kwargs, ** kw}), infos
            ))
        elif on == 'start':
            self.start_listeners.append((lambda: listener(* args, ** kwargs), infos))
        elif on == 'stop':
            self.stop_listeners.append((lambda: listener(* args, ** kwargs), infos))

    def add_consumer(self, consumer, * args, start = True, link_stop = False, ** kwargs):
        """
            Add a `Consumer` (possibly creates it)

            Arguments :
                - consumer  : the `callable`, `dict` of `Consumer` kwargs, `list` (wrapped in a `Pipeline`)
                              or `Consumer` instance that will consume the produced items
                - start     : whether to start the Consumer's thread or not
                - link_stop : if True, it will call the Consumer's `stop` when the producer stops
                - args / kwargs : passed to the Consumer's constructor (if called)
            Return : the `Consumer` instance
        """
        from utils.thread_utils.consumer import Consumer

        if not isinstance(consumer, Consumer):
            if not isinstance(consumer, list):
                if not isinstance(consumer, dict): consumer = {'consumer' : consumer}
                consumer = Consumer(* args, ** {** kwargs, ** consumer})
            else:
                from utils.thread_utils.pipeline import Pipeline
                consumer = Pipeline(consumer, * args, ** kwargs)

        self.add_listener(consumer, on = 'item')
        if link_stop:
            self.add_listener(consumer.stop, on = 'stop')
            # NOTE(review): nesting reconstructed from a flattened source ; confirm that
            # this reverse link belongs under `link_stop`
            consumer.add_listener(self.stop, on = 'stop')
        if start and not consumer.is_alive(): consumer.start()
        return consumer

    def run(self):
        """ Start the producer, iterates on the `generator` then stops the thread """
        self.on_start()
        for item in self:
            if self.stopped: break
            self.on_item_produced(item)
        self.on_stop()

    def start(self):
        """ Run in the calling thread if `run_main_thread`, else start the Thread """
        if self.run_main_thread:
            self.run()
        else:
            super().start()

    def stop(self):
        """ Set the `stopped` flag ; `run` checks it before handling each item """
        with self.mutex_infos:
            self._stopped = True

    def join(self, * args, recursive = False, ** kwargs):
        """
            Join the thread (skipped when running in the main thread)

            Arguments :
                - recursive : whether to also join the `Producer`-like item listeners (recursively)
                - args / kwargs : forwarded to `Thread.join`
        """
        logging.debug('[{}JOIN {}]{}'.format(
            'RECURSIVE ' if recursive else '', self.name,
            ' {} consumers'.format(len(self.item_listeners)) if recursive else ''
        ))
        if not self.run_main_thread:
            super().join(* args, ** kwargs)

        if recursive:
            for _, infos in self.item_listeners:
                if 'consumer_class' not in infos: continue
                infos['consumer_class'].join(* args, recursive = True, ** kwargs)
        logging.debug('[JOIN {}] Joined !'.format(self.name))

    def on_start(self):
        """ Function called when starting the thread """
        logging.debug('[STATUS {}] Start'.format(self.name))
        for l, _ in self.start_listeners: l()

    def on_stop(self):
        """ Function called when stopping the thread """
        logging.debug('[STATUS {}] Stop'.format(self.name))
        self.stop()
        with self.mutex_infos: self._finished = True
        for l, _ in self.stop_listeners: l()

    def on_item_produced(self, item):
        """
            Function called when a new item is generated

            Each active item listener is called with the item (`Result` instances are unpacked to
            `listener(result, priority = priority)`). A listener raising `StoppedException` is
            flagged as stopped and skipped afterwards ; once every listener is stopped, the
            producer stops itself if `stop_no_more_listeners` is set.
        """
        logging.debug('[ITEM PRODUCED {}]'.format(self.name))
        with self.mutex_infos:
            self.__size += 1

        for l, infos in self.item_listeners:
            if infos.get('stopped', False): continue
            try:
                if isinstance(item, Result):
                    l(item.result, priority = item.priority)
                else:
                    l(item)
            except StoppedException:
                logging.debug('[CONSUMER STOPPED {}] consumer {} stopped'.format(
                    self.name, infos['name']
                ))
                infos['stopped'] = True

        _stopped = [l for l, i in self.item_listeners if i.get('stopped', False)]
        if len(_stopped) > 0 and len(_stopped) == len(self.item_listeners) and self.stop_no_more_listeners:
            logging.debug('[STATUS {}] no more consumers, stopping the thread'.format(self.name))
            self.stop()

    def plot(self, filename = None, name = None, view = True, graph = None,
             node_graph = None, node_id = 0):
        """
            Builds a `graphviz.Digraph` representing the producer-consumer pipeline

            Arguments :
                - filename / name   : forwarded to `graphviz.Digraph` when `graph` is not provided
                - view      : whether to call `graph.view()` (only for the root node, `node_id == 0`)
                - graph     : the graph to draw on (created if None)
                - node_graph    : the (sub)graph in which this node is added (defaults to `graph`)
                - node_id   : integer identifier of this node
        """
        def _add_listeners_graph(listeners, label, edge_label):
            # draws the `listeners` in their own cluster, linked to the current node
            if len(listeners) == 0: return False
            g_name = 'cluster_{}{}'.format(str_id, edge_label)

            with graph.subgraph(name = g_name) as sub_graph:
                sub_graph.attr(label = label)
                for l, infos in listeners:
                    sub_graph.node(str(l), label = infos['name'], shape = 'circle')
                    graph.edge(str_id, str(l), label = edge_label, lhead = g_name)
            return True

        import graphviz as gv
        if graph is None:
            if name is None: name = filename if filename else 'Graph'
            graph = gv.Digraph(name = name, filename = filename)
            graph.attr(compound = 'true')

        str_id = str(node_id)

        plot_style = {
            'id'    : str_id,
            'shape' : "box" if type(self) is Producer else "ellipse",
            # every newline becomes `\l`, then the first `\l` is turned back into a newline.
            # The backslash is escaped explicitly : a bare '\l' is an invalid escape sequence
            'label' : self.node_text.replace('\n', '\\l').replace('\\l', '\n', 1)
        }
        if node_graph is None: node_graph = graph

        node_graph.node(str_id, ** plot_style)

        _add_listeners_graph(self.start_listeners, 'Start listeners', 'on_start')
        next_id = node_id + 1
        if len(self.item_listeners) > 0:
            cons_graph_name = 'cluster_{}consumers'.format(str_id)
            with graph.subgraph(name = cons_graph_name) as sub_graph:
                sub_graph.attr(label = 'Consumers')
                for l, infos in self.item_listeners:
                    if 'consumer_class' in infos:
                        graph, child_id, next_id = infos['consumer_class'].plot(
                            graph = graph, node_graph = sub_graph, node_id = next_id, filename = None
                        )
                    else:
                        child_id = str(l)
                        sub_graph.node(child_id, label = infos['name'], shape = 'circle')
                    graph.edge(str_id, child_id, lhead = cons_graph_name)

        _add_listeners_graph(self.stop_listeners, 'Stop listeners', 'on_stop')
        if node_id == 0 and view:
            graph.view()
        # NOTE(review): the visible source is truncated here. The recursive call above unpacks a
        # 3-tuple `(graph, child_id, next_id)`, so the missing tail presumably returns one ;
        # confirm against the complete file before relying on the return value.
        ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!