Best Python code snippet using localstack_python
__init__.py
Source: __init__.py
...
    started = threading.Event()

    @wraps(f)
    def _wrapped(*args, **kwargs):
        started.clear()
        started.loop = ensure_event_loop()
        started.set()
        return started.loop.run_until_complete(f(*args, **kwargs))

    def _cancel():
        loop = ensure_event_loop()
        current_task = asyncio.tasks.Task.current_task(loop=loop)
        while True:
            try:
                tasks_to_cancel = asyncio.Task.all_tasks(loop=loop)
                tasks = [task for task in tasks_to_cancel
                         if task is not current_task]
                break
            except RuntimeError as exception:
                # e.g., set changed size during iteration
                _L().debug('ERROR: %s', exception, exc_info=True)
        list(map(lambda task: task.cancel(), tasks))

    def cancel():
        started.loop.call_soon_threadsafe(_cancel)

    _wrapped.started = started
    _wrapped.cancel = lambda: started.loop.call_soon_threadsafe(_cancel)
    return _wrapped


def new_file_event_loop():
    '''
    Create an asyncio event loop compatible with file IO events, e.g., serial
    device events.

    On Windows, only sockets are supported by the default event loop (i.e.,
    :class:`asyncio.SelectorEventLoop`). However, file IO events _are_
    supported on Windows by the :class:`asyncio.ProactorEventLoop` event loop
    to support file I/O events.

    Returns
    -------
    asyncio.ProactorEventLoop or asyncio.SelectorEventLoop
    '''
    return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'
            else asyncio.new_event_loop())


def ensure_event_loop():
    '''
    Ensure that an asyncio event loop has been bound to the local thread
    context.

    .. notes::
        While an asyncio event loop _is_ assigned to the local thread context
        for the main execution thread, other threads _do not_ have an event
        loop bound to the thread context.

    Returns
    -------
    asyncio.ProactorEventLoop or asyncio.SelectorEventLoop
    '''
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if 'There is no current event loop' in str(e):
            loop = new_file_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    return loop


def with_loop(func):
    '''
    Decorator to run coroutine within an asyncio event loop.

    Parameters
    ----------
    func : asyncio.coroutine

    .. notes::
        Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O
        events, e.g., serial device events.

        If an event loop is already bound to the thread, but is either a)
        currently running, or b) *not a :class:`asyncio.ProactorEventLoop`
        instance*, execute function in a new thread running a new
        :class:`asyncio.ProactorEventLoop` instance.

    Example
    -------
    >>> @with_loop
    ... @asyncio.coroutine
    ... def foo(a):
    ...     raise asyncio.Return(a)
    >>>
    >>> a = 'hello'
    >>> assert(foo(a) == a)
    '''
    @wraps(func)
    def wrapped(*args, **kwargs):
        loop = ensure_event_loop()

        thread_required = False
        if loop.is_running():
            _L().debug('Event loop is already running.')
            thread_required = True
        elif all([platform.system() == 'Windows',
                  not isinstance(loop, asyncio.ProactorEventLoop)]):
            _L().debug('`ProactorEventLoop` required, not `%s` loop in '
                       'background thread.', type(loop))
            thread_required = True

        if thread_required:
            _L().debug('Execute new loop in background thread.')
            finished = threading.Event()

            def _run(generator):
                loop = ensure_event_loop()
                try:
                    result = loop.run_until_complete(asyncio
                                                     .ensure_future(generator))
                except Exception as e:
                    finished.result = None
                    finished.error = e
                else:
                    finished.result = result
                    finished.error = None
                finally:
                    loop.close()
                    _L().debug('closed event loop')
                finished.set()

            thread = threading.Thread(target=_run,...
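The `_cancel` helper in the snippet above is scheduled onto the loop's own thread with `call_soon_threadsafe`, so that tasks are only touched from the thread running the loop. A minimal sketch of that idea follows. It assumes a recent Python 3 and swaps in the module-level `asyncio.all_tasks` for the older `asyncio.Task.all_tasks` class method used in the snippet; `run_and_cancel` and `_sleep_forever` are hypothetical names chosen only for this illustration.

```python
import asyncio
import threading


async def _sleep_forever():
    # Hypothetical long-running coroutine, used only for this illustration.
    await asyncio.sleep(3600)


def run_and_cancel():
    """Run a coroutine in a worker thread, then cancel it from the caller."""
    loop = asyncio.new_event_loop()
    started = threading.Event()
    outcome = {}

    async def main():
        started.set()  # signal that the loop is now running
        await _sleep_forever()

    def worker():
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(main())
            outcome['cancelled'] = False
        except asyncio.CancelledError:
            outcome['cancelled'] = True
        finally:
            loop.close()

    def _cancel():
        # Executed inside the loop thread, so cancelling tasks is safe here.
        for task in asyncio.all_tasks(loop=loop):
            task.cancel()

    thread = threading.Thread(target=worker)
    thread.start()
    started.wait()
    # Hand the cancellation over to the loop thread instead of calling
    # task.cancel() directly from this thread.
    loop.call_soon_threadsafe(_cancel)
    thread.join(timeout=5)
    return outcome['cancelled']
```

Unlike the original helper, this sketch does not filter out a current task, because a plain callback scheduled with `call_soon_threadsafe` does not run inside any task.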
async.py
Source: async.py
...
    _read_device_id, asyncio, read_packet)


def new_file_event_loop():
    return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'
            else asyncio.new_event_loop())


def ensure_event_loop():
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if 'There is no current event loop' in str(e):
            loop = new_file_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    return loop


def with_loop(func):
    '''
    Decorator to run function within an asyncio event loop.

    .. notes::
        Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O
        events, e.g., serial device events.

        If an event loop is already bound to the thread, but is either a)
        currently running, or b) *not a :class:`asyncio.ProactorEventLoop`
        instance*, execute function in a new thread running a new
        :class:`asyncio.ProactorEventLoop` instance.
    '''
    @wraps(func)
    def wrapped(*args, **kwargs):
        loop = ensure_event_loop()

        thread_required = False
        if loop.is_running():
            _L().debug('Event loop is already running.')
            thread_required = True
        elif all([platform.system() == 'Windows',
                  not isinstance(loop, asyncio.ProactorEventLoop)]):
            _L().debug('`ProactorEventLoop` required, not `%s` loop in '
                       'background thread.', type(loop))
            thread_required = True

        if thread_required:
            _L().debug('Execute new loop in background thread.')
            finished = threading.Event()

            def _run(generator):
                loop = ensure_event_loop()
                try:
                    result = loop.run_until_complete(asyncio
                                                     .ensure_future(generator))
                except Exception as e:
                    finished.result = None
                    finished.error = e
                else:
                    finished.result = result
                    finished.error = None
                finally:
                    loop.close()
                    _L().debug('closed event loop')
                finished.set()

            thread = threading.Thread(target=_run,...
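`ensure_event_loop` exists because a freshly started thread has no event loop bound to it, so `asyncio.get_event_loop()` raises `RuntimeError` there. The sketch below illustrates that behaviour in isolation. It is a simplified variant, not the library's code: it catches any `RuntimeError` rather than matching the message text, and always uses `asyncio.new_event_loop()` instead of the platform check. `ensure_loop_in_thread` and `add` are hypothetical names.

```python
import asyncio
import threading


def ensure_loop_in_thread():
    """Return (had_loop, result) as observed inside a fresh worker thread."""
    observed = {}

    async def add(a, b):
        # Hypothetical coroutine, used only to give the loop some work.
        return a + b

    def worker():
        try:
            loop = asyncio.get_event_loop()
            observed['had_loop'] = True
        except RuntimeError:
            # No loop is bound to this thread yet: create one and bind it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            observed['had_loop'] = False
        try:
            observed['result'] = loop.run_until_complete(add(2, 3))
        finally:
            loop.close()
            asyncio.set_event_loop(None)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return observed['had_loop'], observed['result']
```

Calling `ensure_loop_in_thread()` reports whether the worker thread already had a loop and the value the coroutine returned once a loop was available.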
asyncio_util.py
Source: asyncio_util.py
...
    :class:`ProactorEventLoop` must explicitly be used on Windows. **
    '''
    return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'
            else asyncio.new_event_loop())


def ensure_event_loop():
    '''
    .. versionadded:: 0.15

    Get existing event loop or create a new one if necessary.

    Returns
    -------
    asyncio.BaseEventLoop
    '''
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if 'There is no current event loop' in str(e):
            loop = new_file_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    return loop


def with_loop(func):
    '''
    .. versionadded:: 0.15

    Decorator to run function within an asyncio event loop.

    .. notes::
        Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O
        events, e.g., serial device events.

        If an event loop is already bound to the thread, but is either a)
        currently running, or b) *not a :class:`asyncio.ProactorEventLoop`
        instance*, execute function in a new thread running a new
        :class:`asyncio.ProactorEventLoop` instance.
    '''
    @wraps(func)
    def wrapped(*args, **kwargs):
        loop = ensure_event_loop()

        thread_required = False
        if loop.is_running():
            logger.debug('Event loop is already running.')
            thread_required = True
        elif all([platform.system() == 'Windows',
                  not isinstance(loop, asyncio.ProactorEventLoop)]):
            # Trailing space added so the two string fragments do not run
            # together in the log message.
            logger.debug('`ProactorEventLoop` required, not `%s` '
                         'loop in background thread.', type(loop))
            thread_required = True

        if thread_required:
            logger.debug('Execute new loop in background thread.')
            finished = threading.Event()

            def _run(generator):
                loop = ensure_event_loop()
                try:
                    result = loop.run_until_complete(asyncio
                                                     .ensure_future(generator))
                except Exception as e:
                    finished.result = None
                    finished.error = e
                else:
                    finished.result = result
                    finished.error = None
                finished.set()

            thread = threading.Thread(target=_run,
                                      args=(func(*args, **kwargs), ))
            thread.daemon = True
            thread.start()...
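The background-thread branch of `with_loop` hands the result or the exception back to the caller through a `threading.Event`. Since the listing is truncated before that hand-off, the following is only a sketch of how such a pattern can be completed, not the library's actual code. `run_in_background_loop`, `echo`, and `fail` are hypothetical names, a plain dict stands in for attributes set on the event object, and the coroutine is created inside the worker thread rather than passed in as in the original.

```python
import asyncio
import threading


def run_in_background_loop(coro_func, *args, **kwargs):
    """Run a coroutine function on a new loop in a daemon thread and wait."""
    finished = threading.Event()
    state = {'result': None, 'error': None}

    def _run():
        loop = asyncio.new_event_loop()
        try:
            state['result'] = loop.run_until_complete(
                coro_func(*args, **kwargs))
        except Exception as exception:
            # Keep the error so it can be re-raised in the calling thread.
            state['error'] = exception
        finally:
            loop.close()
            finished.set()

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    finished.wait()  # block the caller until the worker is done
    if state['error'] is not None:
        raise state['error']
    return state['result']


async def echo(value):
    # Hypothetical coroutine that yields to the loop once, then returns.
    await asyncio.sleep(0)
    return value


async def fail():
    # Hypothetical coroutine that always raises.
    raise ValueError('boom')
```

With these helpers, `run_in_background_loop(echo, 'hello')` returns the echoed value, while `run_in_background_loop(fail)` re-raises the `ValueError` in the calling thread.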