Best Python code snippet using localstack_python
yosun.py
Source:yosun.py
...20 def __call__(self, *args, **kwargs):21 try:22 self._callback(*args, **kwargs)23 except Exception as e:24 self._on_exception(e)25 def _default_on_exception(self, exception):26 raise exception27class Subscription(object):28 def __init__(self, connection, exchange, binding_key, key_prefix='', reconnect_timeout=10, on_exception=None):29 self._connection = connection30 self._exchange = exchange31 self._binding_key = binding_key32 self._key_prefix = key_prefix33 self._reconnect_timeout = reconnect_timeout34 if on_exception is not None:35 if not isroutine(on_exception):36 raise ValueError('on_exception must be a function or method')37 self._on_exception = on_exception38 else:39 self._on_exception = self._default_on_exception40 self._handlers = defaultdict(list)41 self._handlers_for_all = []42 self._events = {}43 self._event_any = Event()44 self._running = False45 self._thread = None46 self.start()47 def __del__(self):48 self._running = False49 try:50 self._thread.join()51 except Exception as e:52 pass53 def _default_on_exception(self, exception):54 raise exception55 @property56 def is_alive(self):57 return self._thread is not None and self._thread.is_alive()58 def start(self):59 if not self._running:60 self._running = True61 self._thread = Thread(target=self._consume)62 self._thread.start()63 def stop(self):64 self._running = False65 def on(self, routing_key, callback, on_exception=None):66 # register a callback for the given routing key67 routing_key = '{0}{1}'.format(self._key_prefix, routing_key)68 self._handlers[routing_key].append(Handler(callback, on_exception))69 return self70 def all(self, callback, on_exception=None):71 # register a callback for all routing keys72 self._handlers_for_all.append(Handler(callback, on_exception))73 return self74 def wait(self, routing_key, timeout=None):75 # register an event wait76 routing_key = '{0}{1}'.format(self._key_prefix, routing_key)77 if routing_key not in self._events:78 self._events[routing_key] = Event()79 return self._events[routing_key].wait(timeout)80 def wait_any(self, timeout=None):81 return self._event_any.wait(timeout)82 def _on_message(self, body, message):83 if not self._running:84 return85 routing_key = message.delivery_info['routing_key']86 if routing_key in self._events:87 self._events[routing_key].set()88 self._events[routing_key].clear()89 self._event_any.set()90 self._event_any.clear()91 for handler in self._handlers[routing_key]:92 try:93 handler(body, message)94 except Exception as e:95 self._on_exception(e)96 for handler in self._handlers_for_all:97 try:98 handler(body, message)99 except Exception as e:100 self._on_exception(e)101 try:102 message.ack()103 except MessageStateError:104 pass105 def _consume(self):106 routing_key = '{0}{1}'.format(self._key_prefix, self._binding_key)107 while self._running:108 try:109 with connections[self._connection].acquire(block=True) as conn:110 queue = Queue(exchange=self._exchange, routing_key=routing_key, channel=conn,111 durable=False, exclusive=True, auto_delete=True)112 with Consumer(conn, queue, callbacks=[self._on_message]):113 try:114 while self._running:...
utils.py
Source:utils.py
...20 def __call__(self, *args, **kwargs):21 try:22 self._callback(*args, **kwargs)23 except Exception as e:24 self._on_exception(e)25 def _default_on_exception(self, exception):26 raise exception27class Subscription(object):28 def __init__(self, connection, exchange, binding_key, key_prefix='', reconnect_timeout=10, on_exception=None):29 self._connection = connection30 self._exchange = exchange31 self._binding_key = binding_key32 self._key_prefix = key_prefix33 self._reconnect_timeout = reconnect_timeout34 if on_exception is not None:35 if not isroutine(on_exception):36 raise ValueError('on_exception must be a function or method')37 self._on_exception = on_exception38 else:39 self._on_exception = self._default_on_exception40 self._handlers = defaultdict(list)41 self._handlers_for_all = []42 self._events = {}43 self._event_any = Event()44 self._running = False45 self._thread = None46 self.start()47 def __del__(self):48 self._running = False49 try:50 self._thread.join()51 except Exception as e:52 pass53 def _default_on_exception(self, exception):54 raise exception55 @property56 def is_alive(self):57 return self._thread is not None and self._thread.is_alive()58 def start(self):59 if not self._running:60 self._running = True61 self._thread = Thread(target=self._consume)62 self._thread.start()63 def stop(self):64 self._running = False65 def on(self, routing_key, callback, on_exception=None):66 # register a callback for the given routing key67 routing_key = '{0}{1}'.format(self._key_prefix, routing_key)68 self._handlers[routing_key].append(Handler(callback, on_exception))69 return self70 def all(self, callback, on_exception=None):71 # register a callback for all routing keys72 self._handlers_for_all.append(Handler(callback, on_exception))73 return self74 def wait(self, routing_key, timeout=None):75 # register an event wait76 routing_key = '{0}{1}'.format(self._key_prefix, routing_key)77 if routing_key not in self._events:78 self._events[routing_key] = Event()79 return self._events[routing_key].wait(timeout)80 def wait_any(self, timeout=None):81 return self._event_any.wait(timeout)82 def _on_message(self, body, message):83 if not self._running:84 return85 routing_key = message.delivery_info['routing_key']86 if routing_key in self._events:87 self._events[routing_key].set()88 self._events[routing_key].clear()89 self._event_any.set()90 self._event_any.clear()91 for handler in self._handlers[routing_key]:92 try:93 handler(body)94 except Exception as e:95 self._on_exception(e)96 for handler in self._handlers_for_all:97 try:98 handler(body)99 except Exception as e:100 self._on_exception(e)101 try:102 message.ack()103 except MessageStateError:104 pass105 def _consume(self):106 routing_key = '{0}{1}'.format(self._key_prefix, self._binding_key)107 while self._running:108 try:109 with connections[self._connection].acquire(block=True) as conn:110 queue = Queue(exchange=self._exchange, routing_key=routing_key, channel=conn,111 durable=False, exclusive=True, auto_delete=True)112 with Consumer(conn, queue, callbacks=[self._on_message]):113 try:114 while self._running:...
retry.py
Source:retry.py
...20 except catch_exceptions_types as ex:21 attempt += 122 if on_exception:23 if iscoroutinefunction(on_exception):24 await on_exception(ex, attempt)25 else:26 on_exception(ex, attempt)27 if attempt > times:28 raise29 if delay is not None:30 await asyncio.sleep(delay, loop=loop)31 return async_wrapper32def retry(times: int = 3,33 delay: Optional[float] = 0.1,34 catch_exceptions_types: CatchException = None,35 on_exception: OnException = None,36 loop=None):37 if catch_exceptions_types is None:38 catch_exceptions_types = Exception39 def retry_decorator(fn):40 if iscoroutinefunction(fn):41 return _get_retry_async_wrapper(fn,42 times,43 delay,44 catch_exceptions_types,45 on_exception,46 loop)47 @wraps(fn)48 def wrapper(*args, **kwargs):49 attempt = 050 while True:51 try:52 return fn(*args, **kwargs)53 except catch_exceptions_types as ex:54 attempt += 155 if on_exception:56 on_exception(ex, attempt)57 if attempt > times:58 raise59 if delay is not None:60 time.sleep(delay)61 return wrapper...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!