Best Python code snippet using autotest_python
worker.py
Source:worker.py
1# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing,10# software distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""15General code for parallelizing the workers16"""17import time18from queue import Empty, Full, Queue19from threading import Thread20from typing import Any, Callable, Iterator, List21__all__ = ["ParallelWorker"]22class ParallelWorker(object):23 """24 Multi threading worker to parallelize tasks25 :param worker_func: the function to parallelize across multiple tasks26 :param num_workers: number of workers to use27 :param indefinite: True to keep the thread pooling running so that28 more tasks can be added, False to stop after no more tasks are added29 :param max_source_size: the maximum size for the source queue30 """31 def __init__(32 self,33 worker_func: Callable,34 num_workers: int,35 indefinite: bool,36 max_source_size: int = -1,37 ):38 self._worker_func = worker_func39 self._num_workers = num_workers40 self._pending_count = 041 self._source_queue = (42 Queue(maxsize=max_source_size) if max_source_size > 0 else Queue()43 )44 self._completed = Queue()45 self._indefinite = Queue()46 self._shutdown = Queue()47 self.indefinite = indefinite48 def __iter__(self) -> Iterator[Any]:49 while self._shutdown.empty() and not (50 self._indefinite.empty()51 and self._pending_count < 152 and self._completed.empty()53 ):54 try:55 res = self._completed.get(block=True, timeout=1.0)56 self._pending_count -= 157 yield res58 except Empty:59 continue60 def __len__(self):61 return self._pending_count62 @property63 def indefinite(self) -> bool:64 """65 :return: True to keep the thread pooling running so that66 more tasks can be added, False to stop after no more tasks are added67 """68 return not self._indefinite.empty()69 @indefinite.setter70 def indefinite(self, value: bool):71 """72 :param value: True to keep the thread pooling running so that73 more tasks can be added, False to stop after no more tasks are added74 """75 if value and self._indefinite.empty():76 self._indefinite.put(True)77 elif not value and not self._indefinite.empty():78 self._indefinite.get()79 def start(self):80 """81 Start the workers82 """83 for _ in range(self._num_workers):84 Thread(85 target=ParallelWorker._worker,86 args=(87 self._worker_func,88 self._source_queue,89 self._completed,90 self._indefinite,91 self._shutdown,92 ),93 ).start()94 def shutdown(self):95 """96 Stop the workers97 """98 self._shutdown.put(True)99 def add(self, vals: List[Any]):100 """101 :param vals: the values to add for processing work102 """103 self._pending_count += len(vals)104 ParallelWorker._adder(vals, self._source_queue, self._shutdown)105 def add_async(self, vals: List[Any]):106 """107 :param vals: the values to add for async workers108 """109 self._pending_count += len(vals)110 Thread(111 target=ParallelWorker._adder,112 args=(vals, self._source_queue, self._shutdown),113 ).start()114 def add_async_generator(self, gen: Iterator[Any]):115 """116 :param gen: add an async generator to pull values from for processing117 """118 Thread(119 target=ParallelWorker._gen_adder,120 args=(gen, self._source_queue, self._shutdown, self._indefinite),121 ).start()122 def add_item(self, val: Any):123 """124 :param val: add a single item for processing125 """126 self._pending_count += 1127 self._source_queue.put(val)128 @staticmethod129 def _worker(130 worker_func: Callable,131 source_queue: Queue,132 completed: Queue,133 indefinite: Queue,134 shutdown: Queue,135 ):136 while True:137 if not shutdown.empty() or (source_queue.empty() and indefinite.empty()):138 return139 try:140 val = source_queue.get(block=True, timeout=0.01)141 except Empty:142 continue143 res = worker_func(val)144 completed.put(res)145 source_queue.task_done()146 @staticmethod147 def _adder(vals: List[Any], source_queue: Queue, shutdown: Queue):148 index = 0149 while index < len(vals) and shutdown.empty():150 try:151 source_queue.put(vals[index], block=True, timeout=0.01)152 index += 1153 except Full:154 continue155 @staticmethod156 def _gen_adder(157 gen: Iterator[Any], source_queue: Queue, shutdown: Queue, indefinite: Queue158 ):159 indefinite.put(True)160 for val in gen:161 while True:162 if not shutdown.empty():163 return164 try:165 source_queue.put(val, block=True, timeout=0.01)166 break167 except Full:168 continue169 # give some time for everything to complete since we didn't add to pending count170 # need to architect this better in the future to get rid of171 # the edge case (last items don't complete in 1 sec)172 while not source_queue.empty():173 time.sleep(0.1)174 time.sleep(1.0)175 while not indefinite.empty():176 try:177 indefinite.get_nowait()178 except Empty:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!