Best Python code snippet using molotov_python
thread_pool_executor.py
Source:thread_pool_executor.py
1#2# Licensed to the Apache Software Foundation (ASF) under one or more3# contributor license agreements. See the NOTICE file distributed with4# this work for additional information regarding copyright ownership.5# The ASF licenses this file to You under the Apache License, Version 2.06# (the "License"); you may not use this file except in compliance with7# the License. You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17# pytype: skip-file18from __future__ import absolute_import19import sys20import threading21import weakref22from concurrent.futures import _base23try: # Python324 import queue25except Exception: # Python226 import Queue as queue # type: ignore[no-redef]27class _WorkItem(object):28 def __init__(self, future, fn, args, kwargs):29 self._future = future30 self._fn = fn31 self._fn_args = args32 self._fn_kwargs = kwargs33 def run(self):34 if self._future.set_running_or_notify_cancel():35 # If the future wasn't cancelled, then attempt to execute it.36 try:37 self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))38 except BaseException as exc:39 # Even though Python 2 futures library has #set_exection(),40 # the way it generates the traceback doesn't align with41 # the way in which Python 3 does it so we provide alternative42 # implementations that match our test expectations.43 if sys.version_info.major >= 3:44 self._future.set_exception(exc)45 else:46 e, tb = sys.exc_info()[1:]47 self._future.set_exception_info(e, tb)48class _Worker(threading.Thread):49 def __init__(50 self, idle_worker_queue, permitted_thread_age_in_seconds, work_item):51 super(_Worker, self).__init__()52 self._idle_worker_queue = idle_worker_queue53 self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds54 self._work_item = work_item55 self._wake_event = threading.Event()56 self._lock = threading.Lock()57 self._shutdown = False58 def run(self):59 while True:60 self._work_item.run()61 self._work_item = None62 # If we are explicitly awake then don't add ourselves back to the63 # idle queue. This occurs in case 3 described below.64 if not self._wake_event.is_set():65 self._idle_worker_queue.put(self)66 self._wake_event.wait(self._permitted_thread_age_in_seconds)67 with self._lock:68 # When we are awoken, we may be in one of three states:69 # 1) _work_item is set and _shutdown is False.70 # This represents the case when we have accepted work.71 # 2) _work_item is unset and _shutdown is True.72 # This represents the case where either we timed out before73 # accepting work or explicitly were shutdown without accepting74 # any work.75 # 3) _work_item is set and _shutdown is True.76 # This represents a race where we accepted work and also77 # were shutdown before the worker thread started processing78 # that work. In this case we guarantee to process the work79 # but we don't clear the event ensuring that the next loop80 # around through to the wait() won't block and we will exit81 # since _work_item will be unset.82 # We only exit when _work_item is unset to prevent dropping of83 # submitted work.84 if self._work_item is None:85 self._shutdown = True86 return87 if not self._shutdown:88 self._wake_event.clear()89 def accepted_work(self, work_item):90 """Returns True if the work was accepted.91 This method must only be called while the worker is idle.92 """93 with self._lock:94 if self._shutdown:95 return False96 self._work_item = work_item97 self._wake_event.set()98 return True99 def shutdown(self):100 """Marks this thread as shutdown possibly waking it up if it is idle."""101 with self._lock:102 if self._shutdown:103 return104 self._shutdown = True105 self._wake_event.set()106class UnboundedThreadPoolExecutor(_base.Executor):107 def __init__(self, permitted_thread_age_in_seconds=30):108 self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds109 self._idle_worker_queue = queue.Queue()110 self._workers = weakref.WeakSet()111 self._shutdown = False112 self._lock = threading.Lock() # Guards access to _workers and _shutdown113 def submit(self, fn, *args, **kwargs):114 """Attempts to submit the work item.115 A runtime error is raised if the pool has been shutdown.116 """117 future = _base.Future()118 work_item = _WorkItem(future, fn, args, kwargs)119 try:120 # Keep trying to get an idle worker from the queue until we find one121 # that accepts the work.122 while not self._idle_worker_queue.get(123 block=False).accepted_work(work_item):124 pass125 return future126 except queue.Empty:127 with self._lock:128 if self._shutdown:129 raise RuntimeError(130 'Cannot schedule new tasks after thread pool '131 'has been shutdown.')132 worker = _Worker(133 self._idle_worker_queue,134 self._permitted_thread_age_in_seconds,135 work_item)136 worker.daemon = True137 worker.start()138 self._workers.add(worker)139 return future140 def shutdown(self, wait=True):141 with self._lock:142 if self._shutdown:143 return144 self._shutdown = True145 for worker in self._workers:146 worker.shutdown()147 if wait:148 for worker in self._workers:...
thread.py
Source:thread.py
1# Copyright 2009 Brian Quinlan. All Rights Reserved.2# Licensed to PSF under a Contributor Agreement.3"""Implements ThreadPoolExecutor."""4__author__ = 'Brian Quinlan (brian@sweetapp.com)'5import atexit6from concurrent.futures import _base7import queue8import threading9import weakref10# Workers are created as daemon threads. This is done to allow the interpreter11# to exit when there are still idle threads in a ThreadPoolExecutor's thread12# pool (i.e. shutdown() was not called). However, allowing workers to die with13# the interpreter has two undesirable properties:14# - The workers would still be running during interpretor shutdown,15# meaning that they would fail in unpredictable ways.16# - The workers could be killed while evaluating a work item, which could17# be bad if the callable being evaluated has external side-effects e.g.18# writing to a file.19#20# To work around this problem, an exit handler is installed which tells the21# workers to exit when their work queues are empty and then waits until the22# threads finish.23_threads_queues = weakref.WeakKeyDictionary()24_shutdown = False25def _python_exit():26 global _shutdown27 _shutdown = True28 items = list(_threads_queues.items())29 for t, q in items:30 q.put(None)31 for t, q in items:32 t.join()33atexit.register(_python_exit)34class _WorkItem(object):35 def __init__(self, future, fn, args, kwargs):36 self.future = future37 self.fn = fn38 self.args = args39 self.kwargs = kwargs40 def run(self):41 if not self.future.set_running_or_notify_cancel():42 return43 try:44 result = self.fn(*self.args, **self.kwargs)45 except BaseException as e:46 self.future.set_exception(e)47 else:48 self.future.set_result(result)49def _worker(executor_reference, work_queue):50 try:51 while True:52 work_item = work_queue.get(block=True)53 if work_item is not None:54 work_item.run()55 continue56 executor = executor_reference()57 # Exit if:58 # - The interpreter is shutting down OR59 # - The executor that owns the worker has been collected OR60 # - The executor that owns the worker has been shutdown.61 if _shutdown or executor is None or executor._shutdown:62 # Notice other workers63 work_queue.put(None)64 return65 del executor66 except BaseException as e:67 _base.LOGGER.critical('Exception in worker', exc_info=True)68class ThreadPoolExecutor(_base.Executor):69 def __init__(self, max_workers):70 """Initializes a new ThreadPoolExecutor instance.71 Args:72 max_workers: The maximum number of threads that can be used to73 execute the given calls.74 """75 self._max_workers = max_workers76 self._work_queue = queue.Queue()77 self._threads = set()78 self._shutdown = False79 self._shutdown_lock = threading.Lock()80 def submit(self, fn, *args, **kwargs):81 with self._shutdown_lock:82 if self._shutdown:83 raise RuntimeError('cannot schedule new futures after shutdown')84 f = _base.Future()85 w = _WorkItem(f, fn, args, kwargs)86 self._work_queue.put(w)87 self._adjust_thread_count()88 return f89 submit.__doc__ = _base.Executor.submit.__doc__90 def _adjust_thread_count(self):91 # When the executor gets lost, the weakref callback will wake up92 # the worker threads.93 def weakref_cb(_, q=self._work_queue):94 q.put(None)95 # TODO(bquinlan): Should avoid creating new threads if there are more96 # idle threads than items in the work queue.97 if len(self._threads) < self._max_workers:98 t = threading.Thread(target=_worker,99 args=(weakref.ref(self, weakref_cb),100 self._work_queue))101 t.daemon = True102 t.start()103 self._threads.add(t)104 _threads_queues[t] = self._work_queue105 def shutdown(self, wait=True):106 with self._shutdown_lock:107 self._shutdown = True108 self._work_queue.put(None)109 if wait:110 for t in self._threads:111 t.join()...
fix_threading_shutdown.py
Source:fix_threading_shutdown.py
...15def set_shutdown_timeout(value):16 """Set the shutdown timeout value (int)."""17 global SHUTDOWN_TIMEOUT18 SHUTDOWN_TIMEOUT = value19def get_allow_shutdown():20 """Return if "shutdown" automatically calls the thread allow_shutdown."""21 global ALLOW_SHUTDOWN22 return ALLOW_SHUTDOWN23def set_allow_shutdown(value):24 """Set if "shutdown" automatically calls the thread allow_shutdown."""25 global ALLOW_SHUTDOWN26 ALLOW_SHUTDOWN = value27def shutdown(timeout=None, allow_shutdown=True):28 """Join and allow all threads to shutdown.29 Python's threading._shutdown may hang preventing the process from exiting completely. It uses the "_tstate_lock"30 to wait for every thread to "join". Python's threading._shutdown used to call join to achieve the same behavior.31 This library overrides threading._shutdown to "join" all Threads before the process ends.32 You may want to put this at the end of your code. Atexit will not work for this.33 I did not have a problem with Python 3.4. I noticed this issue in Python 3.8. I do not know34 when this changed and stopped working.35 .. code-block :: python36 import time37 import continuous_threading38 def do_something():39 print('hello')40 time.sleep(1)41 th = continuous_threading.PausableThread(target=do_something)42 th.start()43 time.sleep(10)44 continuous_threading.shutdown()45 Args:46 timeout (int/float)[None]: Timeout argument to pass into every thread's join method.47 allow_shutdown (bool)[True]: If True also call allow_shutdown on the thread.48 """49 if allow_shutdown is None:50 allow_shutdown = get_allow_shutdown()51 for th in threading.enumerate():52 # Join the thread if not joined53 try:54 if th is not threading.main_thread() and th.is_alive() and not th.isDaemon():55 th.join(timeout)56 except (AttributeError, ValueError, TypeError, Exception) as err:57 print(err)58 # Allow threading._shutdown() to continue59 if allow_shutdown:60 try:61 th.allow_shutdown()62 except (AttributeError, Exception):63 pass64# Save the original threading._shutdown function65threading_shutdown = threading._shutdown66def custom_shutdown():67 """Safely shutdown the threads."""68 shutdown(get_shutdown_timeout(), get_allow_shutdown())69 threading_shutdown()70def using_custom_shutdown():71 """Return True if the threading._shutdown function is using the custom shutdown function."""72 return threading._shutdown != threading_shutdown73def set_shutdown(func=None):74 """Set the threading._shutdown function to "join" all threads."""75 if func is None:76 func = custom_shutdown77 threading._shutdown = func78def reset_shutdown():79 """Reset the threading._shutdown function to use its original function."""80 threading._shutdown = threading_shutdown81# Change threading._shutdown to use our custom function...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!