Best Python code snippet using playwright-python
test_futures.py
Source:test_futures.py
...83 try:84 yield logging_stream85 finally:86 logging.root.removeHandler(handler)87def create_future(state=PENDING, exception=None, result=None):88 f = Future()89 f._state = state90 f._exception = exception91 f._result = result92 return f93PENDING_FUTURE = create_future(state=PENDING)94RUNNING_FUTURE = create_future(state=RUNNING)95CANCELLED_FUTURE = create_future(state=CANCELLED)96CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)97EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())98SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)99def mul(x, y):100 return x * y101def sleep_and_raise(t):102 time.sleep(t)103 raise Exception('this is an exception')104def sleep_and_print(t, msg):105 time.sleep(t)106 print(msg)107 sys.stdout.flush()108class ExecutorMixin:109 worker_count = 5110 def setUp(self):111 self.t1 = time.time()112 try:113 self.executor = self.executor_type(max_workers=self.worker_count)114 except NotImplementedError:115 e = sys.exc_info()[1]116 self.skipTest(str(e))117 self._prime_executor()118 def tearDown(self):119 self.executor.shutdown(wait=True)120 dt = time.time() - self.t1121 if test_support.verbose:122 print("%.2fs" % dt)123 self.assertLess(dt, 60, "synchronization issue: test lasted too long")124 def _prime_executor(self):125 # Make sure that the executor is ready to do work before running the126 # tests. This should reduce the probability of timeouts in the tests.127 futures = [self.executor.submit(time.sleep, 0.1)128 for _ in range(self.worker_count)]129 for f in futures:130 f.result()131class ThreadPoolMixin(ExecutorMixin):132 executor_type = futures.ThreadPoolExecutor133class ProcessPoolMixin(ExecutorMixin):134 executor_type = futures.ProcessPoolExecutor135class ExecutorShutdownTest(unittest.TestCase):136 def test_run_after_shutdown(self):137 self.executor.shutdown()138 self.assertRaises(RuntimeError,139 self.executor.submit,140 pow, 2, 5)141 def test_interpreter_shutdown(self):142 # Test the atexit hook for shutdown of worker threads and processes143 rc, out, err = assert_python_ok('-c', """if 1:144 from concurrent.futures import %s145 from time import sleep146 from test_futures import sleep_and_print147 t = %s(5)148 t.submit(sleep_and_print, 1.0, "apple")149 """ % (self.executor_type.__name__, self.executor_type.__name__))150 # Errors in atexit hooks don't change the process exit code, check151 # stderr manually.152 self.assertFalse(err)153 self.assertEqual(out.strip(), "apple".encode())154 def test_hang_issue12364(self):155 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]156 self.executor.shutdown()157 for f in fs:158 f.result()159class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):160 def _prime_executor(self):161 pass162 def test_threads_terminate(self):163 self.executor.submit(mul, 21, 2)164 self.executor.submit(mul, 6, 7)165 self.executor.submit(mul, 3, 14)166 self.assertEqual(len(self.executor._threads), 3)167 self.executor.shutdown()168 for t in self.executor._threads:169 t.join()170 def test_context_manager_shutdown(self):171 with futures.ThreadPoolExecutor(max_workers=5) as e:172 executor = e173 self.assertEqual(list(e.map(abs, range(-5, 5))),174 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])175 for t in executor._threads:176 t.join()177 def test_del_shutdown(self):178 executor = futures.ThreadPoolExecutor(max_workers=5)179 executor.map(abs, range(-5, 5))180 threads = executor._threads181 del executor182 for t in threads:183 t.join()184class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):185 def _prime_executor(self):186 pass187 def test_processes_terminate(self):188 self.executor.submit(mul, 21, 2)189 self.executor.submit(mul, 6, 7)190 self.executor.submit(mul, 3, 14)191 self.assertEqual(len(self.executor._processes), 5)192 processes = self.executor._processes193 self.executor.shutdown()194 for p in processes:195 p.join()196 def test_context_manager_shutdown(self):197 with futures.ProcessPoolExecutor(max_workers=5) as e:198 processes = e._processes199 self.assertEqual(list(e.map(abs, range(-5, 5))),200 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])201 for p in processes:202 p.join()203 def test_del_shutdown(self):204 executor = futures.ProcessPoolExecutor(max_workers=5)205 list(executor.map(abs, range(-5, 5)))206 queue_management_thread = executor._queue_management_thread207 processes = executor._processes208 del executor209 queue_management_thread.join()210 for p in processes:211 p.join()212class WaitTests(unittest.TestCase):213 def test_first_completed(self):214 future1 = self.executor.submit(mul, 21, 2)215 future2 = self.executor.submit(time.sleep, 1.5)216 done, not_done = futures.wait(217 [CANCELLED_FUTURE, future1, future2],218 return_when=futures.FIRST_COMPLETED)219 self.assertEqual(set([future1]), done)220 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)221 def test_first_completed_some_already_completed(self):222 future1 = self.executor.submit(time.sleep, 1.5)223 finished, pending = futures.wait(224 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],225 return_when=futures.FIRST_COMPLETED)226 self.assertEqual(227 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),228 finished)229 self.assertEqual(set([future1]), pending)230 def test_first_exception(self):231 future1 = self.executor.submit(mul, 2, 21)232 future2 = self.executor.submit(sleep_and_raise, 1.5)233 future3 = self.executor.submit(time.sleep, 3)234 finished, pending = futures.wait(235 [future1, future2, future3],236 return_when=futures.FIRST_EXCEPTION)237 self.assertEqual(set([future1, future2]), finished)238 self.assertEqual(set([future3]), pending)239 def test_first_exception_some_already_complete(self):240 future1 = self.executor.submit(divmod, 21, 0)241 future2 = self.executor.submit(time.sleep, 1.5)242 finished, pending = futures.wait(243 [SUCCESSFUL_FUTURE,244 CANCELLED_FUTURE,245 CANCELLED_AND_NOTIFIED_FUTURE,246 future1, future2],247 return_when=futures.FIRST_EXCEPTION)248 self.assertEqual(set([SUCCESSFUL_FUTURE,249 CANCELLED_AND_NOTIFIED_FUTURE,250 future1]), finished)251 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)252 def test_first_exception_one_already_failed(self):253 future1 = self.executor.submit(time.sleep, 2)254 finished, pending = futures.wait(255 [EXCEPTION_FUTURE, future1],256 return_when=futures.FIRST_EXCEPTION)257 self.assertEqual(set([EXCEPTION_FUTURE]), finished)258 self.assertEqual(set([future1]), pending)259 def test_all_completed(self):260 future1 = self.executor.submit(divmod, 2, 0)261 future2 = self.executor.submit(mul, 2, 21)262 finished, pending = futures.wait(263 [SUCCESSFUL_FUTURE,264 CANCELLED_AND_NOTIFIED_FUTURE,265 EXCEPTION_FUTURE,266 future1,267 future2],268 return_when=futures.ALL_COMPLETED)269 self.assertEqual(set([SUCCESSFUL_FUTURE,270 CANCELLED_AND_NOTIFIED_FUTURE,271 EXCEPTION_FUTURE,272 future1,273 future2]), finished)274 self.assertEqual(set(), pending)275 def test_timeout(self):276 future1 = self.executor.submit(mul, 6, 7)277 future2 = self.executor.submit(time.sleep, 3)278 finished, pending = futures.wait(279 [CANCELLED_AND_NOTIFIED_FUTURE,280 EXCEPTION_FUTURE,281 SUCCESSFUL_FUTURE,282 future1, future2],283 timeout=1.5,284 return_when=futures.ALL_COMPLETED)285 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,286 EXCEPTION_FUTURE,287 SUCCESSFUL_FUTURE,288 future1]), finished)289 self.assertEqual(set([future2]), pending)290class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):291 def test_pending_calls_race(self):292 # Issue #14406: multi-threaded race condition when waiting on all293 # futures.294 event = threading.Event()295 def future_func():296 event.wait()297 oldswitchinterval = sys.getcheckinterval()298 sys.setcheckinterval(1)299 try:300 fs = set(self.executor.submit(future_func) for i in range(100))301 event.set()302 futures.wait(fs, return_when=futures.ALL_COMPLETED)303 finally:304 sys.setcheckinterval(oldswitchinterval)305class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):306 pass307class AsCompletedTests(unittest.TestCase):308 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.309 def test_no_timeout(self):310 future1 = self.executor.submit(mul, 2, 21)311 future2 = self.executor.submit(mul, 7, 6)312 completed = set(futures.as_completed(313 [CANCELLED_AND_NOTIFIED_FUTURE,314 EXCEPTION_FUTURE,315 SUCCESSFUL_FUTURE,316 future1, future2]))317 self.assertEqual(set(318 [CANCELLED_AND_NOTIFIED_FUTURE,319 EXCEPTION_FUTURE,320 SUCCESSFUL_FUTURE,321 future1, future2]),322 completed)323 def test_zero_timeout(self):324 future1 = self.executor.submit(time.sleep, 2)325 completed_futures = set()326 try:327 for future in futures.as_completed(328 [CANCELLED_AND_NOTIFIED_FUTURE,329 EXCEPTION_FUTURE,330 SUCCESSFUL_FUTURE,331 future1],332 timeout=0):333 completed_futures.add(future)334 except futures.TimeoutError:335 pass336 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,337 EXCEPTION_FUTURE,338 SUCCESSFUL_FUTURE]),339 completed_futures)340 def test_duplicate_futures(self):341 # Issue 20367. Duplicate futures should not raise exceptions or give342 # duplicate responses.343 future1 = self.executor.submit(time.sleep, 2)344 completed = [f for f in futures.as_completed([future1,future1])]345 self.assertEqual(len(completed), 1)346class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):347 pass348class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):349 pass350class ExecutorTest(unittest.TestCase):351 # Executor.shutdown() and context manager usage is tested by352 # ExecutorShutdownTest.353 def test_submit(self):354 future = self.executor.submit(pow, 2, 8)355 self.assertEqual(256, future.result())356 def test_submit_keyword(self):357 future = self.executor.submit(mul, 2, y=8)358 self.assertEqual(16, future.result())359 def test_map(self):360 self.assertEqual(361 list(self.executor.map(pow, range(10), range(10))),362 list(map(pow, range(10), range(10))))363 def test_map_exception(self):364 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])365 self.assertEqual(next(i), (0, 1))366 self.assertEqual(next(i), (0, 1))367 self.assertRaises(ZeroDivisionError, next, i)368 def test_map_timeout(self):369 results = []370 try:371 for i in self.executor.map(time.sleep,372 [0, 0, 3],373 timeout=1.5):374 results.append(i)375 except futures.TimeoutError:376 pass377 else:378 self.fail('expected TimeoutError')379 self.assertEqual([None, None], results)380class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):381 def test_map_submits_without_iteration(self):382 """Tests verifying issue 11777."""383 finished = []384 def record_finished(n):385 finished.append(n)386 self.executor.map(record_finished, range(10))387 self.executor.shutdown(wait=True)388 self.assertEqual(len(finished), 10)389class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):390 pass391class FutureTests(unittest.TestCase):392 def test_done_callback_with_result(self):393 callback_result = [None]394 def fn(callback_future):395 callback_result[0] = callback_future.result()396 f = Future()397 f.add_done_callback(fn)398 f.set_result(5)399 self.assertEqual(5, callback_result[0])400 def test_done_callback_with_exception(self):401 callback_exception = [None]402 def fn(callback_future):403 callback_exception[0] = callback_future.exception()404 f = Future()405 f.add_done_callback(fn)406 f.set_exception(Exception('test'))407 self.assertEqual(('test',), callback_exception[0].args)408 def test_done_callback_with_cancel(self):409 was_cancelled = [None]410 def fn(callback_future):411 was_cancelled[0] = callback_future.cancelled()412 f = Future()413 f.add_done_callback(fn)414 self.assertTrue(f.cancel())415 self.assertTrue(was_cancelled[0])416 def test_done_callback_raises(self):417 with captured_stderr() as stderr:418 raising_was_called = [False]419 fn_was_called = [False]420 def raising_fn(callback_future):421 raising_was_called[0] = True422 raise Exception('doh!')423 def fn(callback_future):424 fn_was_called[0] = True425 f = Future()426 f.add_done_callback(raising_fn)427 f.add_done_callback(fn)428 f.set_result(5)429 self.assertTrue(raising_was_called)430 self.assertTrue(fn_was_called)431 self.assertIn('Exception: doh!', stderr.getvalue())432 def test_done_callback_already_successful(self):433 callback_result = [None]434 def fn(callback_future):435 callback_result[0] = callback_future.result()436 f = Future()437 f.set_result(5)438 f.add_done_callback(fn)439 self.assertEqual(5, callback_result[0])440 def test_done_callback_already_failed(self):441 callback_exception = [None]442 def fn(callback_future):443 callback_exception[0] = callback_future.exception()444 f = Future()445 f.set_exception(Exception('test'))446 f.add_done_callback(fn)447 self.assertEqual(('test',), callback_exception[0].args)448 def test_done_callback_already_cancelled(self):449 was_cancelled = [None]450 def fn(callback_future):451 was_cancelled[0] = callback_future.cancelled()452 f = Future()453 self.assertTrue(f.cancel())454 f.add_done_callback(fn)455 self.assertTrue(was_cancelled[0])456 def test_repr(self):457 self.assertRegexpMatches(repr(PENDING_FUTURE),458 '<Future at 0x[0-9a-f]+ state=pending>')459 self.assertRegexpMatches(repr(RUNNING_FUTURE),460 '<Future at 0x[0-9a-f]+ state=running>')461 self.assertRegexpMatches(repr(CANCELLED_FUTURE),462 '<Future at 0x[0-9a-f]+ state=cancelled>')463 self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),464 '<Future at 0x[0-9a-f]+ state=cancelled>')465 self.assertRegexpMatches(466 repr(EXCEPTION_FUTURE),467 '<Future at 0x[0-9a-f]+ state=finished raised IOError>')468 self.assertRegexpMatches(469 repr(SUCCESSFUL_FUTURE),470 '<Future at 0x[0-9a-f]+ state=finished returned int>')471 def test_cancel(self):472 f1 = create_future(state=PENDING)473 f2 = create_future(state=RUNNING)474 f3 = create_future(state=CANCELLED)475 f4 = create_future(state=CANCELLED_AND_NOTIFIED)476 f5 = create_future(state=FINISHED, exception=IOError())477 f6 = create_future(state=FINISHED, result=5)478 self.assertTrue(f1.cancel())479 self.assertEqual(f1._state, CANCELLED)480 self.assertFalse(f2.cancel())481 self.assertEqual(f2._state, RUNNING)482 self.assertTrue(f3.cancel())483 self.assertEqual(f3._state, CANCELLED)484 self.assertTrue(f4.cancel())485 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)486 self.assertFalse(f5.cancel())487 self.assertEqual(f5._state, FINISHED)488 self.assertFalse(f6.cancel())489 self.assertEqual(f6._state, FINISHED)490 def test_cancelled(self):491 self.assertFalse(PENDING_FUTURE.cancelled())492 self.assertFalse(RUNNING_FUTURE.cancelled())493 self.assertTrue(CANCELLED_FUTURE.cancelled())494 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())495 self.assertFalse(EXCEPTION_FUTURE.cancelled())496 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())497 def test_done(self):498 self.assertFalse(PENDING_FUTURE.done())499 self.assertFalse(RUNNING_FUTURE.done())500 self.assertTrue(CANCELLED_FUTURE.done())501 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())502 self.assertTrue(EXCEPTION_FUTURE.done())503 self.assertTrue(SUCCESSFUL_FUTURE.done())504 def test_running(self):505 self.assertFalse(PENDING_FUTURE.running())506 self.assertTrue(RUNNING_FUTURE.running())507 self.assertFalse(CANCELLED_FUTURE.running())508 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())509 self.assertFalse(EXCEPTION_FUTURE.running())510 self.assertFalse(SUCCESSFUL_FUTURE.running())511 def test_result_with_timeout(self):512 self.assertRaises(futures.TimeoutError,513 PENDING_FUTURE.result, timeout=0)514 self.assertRaises(futures.TimeoutError,515 RUNNING_FUTURE.result, timeout=0)516 self.assertRaises(futures.CancelledError,517 CANCELLED_FUTURE.result, timeout=0)518 self.assertRaises(futures.CancelledError,519 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)520 self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)521 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)522 def test_result_with_success(self):523 # TODO(brian@sweetapp.com): This test is timing dependant.524 def notification():525 # Wait until the main thread is waiting for the result.526 time.sleep(1)527 f1.set_result(42)528 f1 = create_future(state=PENDING)529 t = threading.Thread(target=notification)530 t.start()531 self.assertEqual(f1.result(timeout=5), 42)532 def test_result_with_cancel(self):533 # TODO(brian@sweetapp.com): This test is timing dependant.534 def notification():535 # Wait until the main thread is waiting for the result.536 time.sleep(1)537 f1.cancel()538 f1 = create_future(state=PENDING)539 t = threading.Thread(target=notification)540 t.start()541 self.assertRaises(futures.CancelledError, f1.result, timeout=5)542 def test_exception_with_timeout(self):543 self.assertRaises(futures.TimeoutError,544 PENDING_FUTURE.exception, timeout=0)545 self.assertRaises(futures.TimeoutError,546 RUNNING_FUTURE.exception, timeout=0)547 self.assertRaises(futures.CancelledError,548 CANCELLED_FUTURE.exception, timeout=0)549 self.assertRaises(futures.CancelledError,550 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)551 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),552 IOError))553 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)554 def test_exception_with_success(self):555 def notification():556 # Wait until the main thread is waiting for the exception.557 time.sleep(1)558 with f1._condition:559 f1._state = FINISHED560 f1._exception = IOError()561 f1._condition.notify_all()562 f1 = create_future(state=PENDING)563 t = threading.Thread(target=notification)564 t.start()565 self.assertTrue(isinstance(f1.exception(timeout=5), IOError))566@reap_threads567def test_main():568 try:569 test_support.run_unittest(ProcessPoolExecutorTest,570 ThreadPoolExecutorTest,571 ProcessPoolWaitTests,572 ThreadPoolWaitTests,573 ProcessPoolAsCompletedTests,574 ThreadPoolAsCompletedTests,575 FutureTests,576 ProcessPoolShutdownTest,...
test_concurrent_futures.py
Source:test_concurrent_futures.py
...15from concurrent import futures16from concurrent.futures._base import (17 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)18import concurrent.futures.process19def create_future(state=PENDING, exception=None, result=None):20 f = Future()21 f._state = state22 f._exception = exception23 f._result = result24 return f25PENDING_FUTURE = create_future(state=PENDING)26RUNNING_FUTURE = create_future(state=RUNNING)27CANCELLED_FUTURE = create_future(state=CANCELLED)28CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)29EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())30SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)31def mul(x, y):32 return x * y33def sleep_and_raise(t):34 time.sleep(t)35 raise Exception('this is an exception')36def sleep_and_print(t, msg):37 time.sleep(t)38 print(msg)39 sys.stdout.flush()40class ExecutorMixin:41 worker_count = 542 def setUp(self):43 self.t1 = time.time()44 try:45 self.executor = self.executor_type(max_workers=self.worker_count)46 except NotImplementedError as e:47 self.skipTest(str(e))48 self._prime_executor()49 def tearDown(self):50 self.executor.shutdown(wait=True)51 dt = time.time() - self.t152 if test.support.verbose:53 print("%.2fs" % dt, end=' ')54 self.assertLess(dt, 60, "synchronization issue: test lasted too long")55 def _prime_executor(self):56 # Make sure that the executor is ready to do work before running the57 # tests. This should reduce the probability of timeouts in the tests.58 futures = [self.executor.submit(time.sleep, 0.1)59 for _ in range(self.worker_count)]60 for f in futures:61 f.result()62class ThreadPoolMixin(ExecutorMixin):63 executor_type = futures.ThreadPoolExecutor64class ProcessPoolMixin(ExecutorMixin):65 executor_type = futures.ProcessPoolExecutor66class ExecutorShutdownTest(unittest.TestCase):67 def test_run_after_shutdown(self):68 self.executor.shutdown()69 self.assertRaises(RuntimeError,70 self.executor.submit,71 pow, 2, 5)72 def test_interpreter_shutdown(self):73 # Test the atexit hook for shutdown of worker threads and processes74 rc, out, err = assert_python_ok('-c', """if 1:75 from concurrent.futures import {executor_type}76 from time import sleep77 from test.test_concurrent_futures import sleep_and_print78 t = {executor_type}(5)79 t.submit(sleep_and_print, 1.0, "apple")80 """.format(executor_type=self.executor_type.__name__))81 # Errors in atexit hooks don't change the process exit code, check82 # stderr manually.83 self.assertFalse(err)84 self.assertEqual(out.strip(), b"apple")85class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):86 def _prime_executor(self):87 pass88 def test_threads_terminate(self):89 self.executor.submit(mul, 21, 2)90 self.executor.submit(mul, 6, 7)91 self.executor.submit(mul, 3, 14)92 self.assertEqual(len(self.executor._threads), 3)93 self.executor.shutdown()94 for t in self.executor._threads:95 t.join()96 def test_context_manager_shutdown(self):97 with futures.ThreadPoolExecutor(max_workers=5) as e:98 executor = e99 self.assertEqual(list(e.map(abs, range(-5, 5))),100 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])101 for t in executor._threads:102 t.join()103 def test_del_shutdown(self):104 executor = futures.ThreadPoolExecutor(max_workers=5)105 executor.map(abs, range(-5, 5))106 threads = executor._threads107 del executor108 for t in threads:109 t.join()110class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):111 def _prime_executor(self):112 pass113 def test_processes_terminate(self):114 self.executor.submit(mul, 21, 2)115 self.executor.submit(mul, 6, 7)116 self.executor.submit(mul, 3, 14)117 self.assertEqual(len(self.executor._processes), 5)118 processes = self.executor._processes119 self.executor.shutdown()120 for p in processes:121 p.join()122 def test_context_manager_shutdown(self):123 with futures.ProcessPoolExecutor(max_workers=5) as e:124 processes = e._processes125 self.assertEqual(list(e.map(abs, range(-5, 5))),126 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])127 for p in processes:128 p.join()129 def test_del_shutdown(self):130 executor = futures.ProcessPoolExecutor(max_workers=5)131 list(executor.map(abs, range(-5, 5)))132 queue_management_thread = executor._queue_management_thread133 processes = executor._processes134 del executor135 queue_management_thread.join()136 for p in processes:137 p.join()138class WaitTests(unittest.TestCase):139 def test_first_completed(self):140 future1 = self.executor.submit(mul, 21, 2)141 future2 = self.executor.submit(time.sleep, 1.5)142 done, not_done = futures.wait(143 [CANCELLED_FUTURE, future1, future2],144 return_when=futures.FIRST_COMPLETED)145 self.assertEqual(set([future1]), done)146 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)147 def test_first_completed_some_already_completed(self):148 future1 = self.executor.submit(time.sleep, 1.5)149 finished, pending = futures.wait(150 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],151 return_when=futures.FIRST_COMPLETED)152 self.assertEqual(153 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),154 finished)155 self.assertEqual(set([future1]), pending)156 def test_first_exception(self):157 future1 = self.executor.submit(mul, 2, 21)158 future2 = self.executor.submit(sleep_and_raise, 1.5)159 future3 = self.executor.submit(time.sleep, 3)160 finished, pending = futures.wait(161 [future1, future2, future3],162 return_when=futures.FIRST_EXCEPTION)163 self.assertEqual(set([future1, future2]), finished)164 self.assertEqual(set([future3]), pending)165 def test_first_exception_some_already_complete(self):166 future1 = self.executor.submit(divmod, 21, 0)167 future2 = self.executor.submit(time.sleep, 1.5)168 finished, pending = futures.wait(169 [SUCCESSFUL_FUTURE,170 CANCELLED_FUTURE,171 CANCELLED_AND_NOTIFIED_FUTURE,172 future1, future2],173 return_when=futures.FIRST_EXCEPTION)174 self.assertEqual(set([SUCCESSFUL_FUTURE,175 CANCELLED_AND_NOTIFIED_FUTURE,176 future1]), finished)177 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)178 def test_first_exception_one_already_failed(self):179 future1 = self.executor.submit(time.sleep, 2)180 finished, pending = futures.wait(181 [EXCEPTION_FUTURE, future1],182 return_when=futures.FIRST_EXCEPTION)183 self.assertEqual(set([EXCEPTION_FUTURE]), finished)184 self.assertEqual(set([future1]), pending)185 def test_all_completed(self):186 future1 = self.executor.submit(divmod, 2, 0)187 future2 = self.executor.submit(mul, 2, 21)188 finished, pending = futures.wait(189 [SUCCESSFUL_FUTURE,190 CANCELLED_AND_NOTIFIED_FUTURE,191 EXCEPTION_FUTURE,192 future1,193 future2],194 return_when=futures.ALL_COMPLETED)195 self.assertEqual(set([SUCCESSFUL_FUTURE,196 CANCELLED_AND_NOTIFIED_FUTURE,197 EXCEPTION_FUTURE,198 future1,199 future2]), finished)200 self.assertEqual(set(), pending)201 def test_timeout(self):202 future1 = self.executor.submit(mul, 6, 7)203 future2 = self.executor.submit(time.sleep, 3)204 finished, pending = futures.wait(205 [CANCELLED_AND_NOTIFIED_FUTURE,206 EXCEPTION_FUTURE,207 SUCCESSFUL_FUTURE,208 future1, future2],209 timeout=1.5,210 return_when=futures.ALL_COMPLETED)211 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,212 EXCEPTION_FUTURE,213 SUCCESSFUL_FUTURE,214 future1]), finished)215 self.assertEqual(set([future2]), pending)216class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):217 pass218class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):219 pass220class AsCompletedTests(unittest.TestCase):221 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.222 def test_no_timeout(self):223 future1 = self.executor.submit(mul, 2, 21)224 future2 = self.executor.submit(mul, 7, 6)225 completed = set(futures.as_completed(226 [CANCELLED_AND_NOTIFIED_FUTURE,227 EXCEPTION_FUTURE,228 SUCCESSFUL_FUTURE,229 future1, future2]))230 self.assertEqual(set(231 [CANCELLED_AND_NOTIFIED_FUTURE,232 EXCEPTION_FUTURE,233 SUCCESSFUL_FUTURE,234 future1, future2]),235 completed)236 def test_zero_timeout(self):237 future1 = self.executor.submit(time.sleep, 2)238 completed_futures = set()239 try:240 for future in futures.as_completed(241 [CANCELLED_AND_NOTIFIED_FUTURE,242 EXCEPTION_FUTURE,243 SUCCESSFUL_FUTURE,244 future1],245 timeout=0):246 completed_futures.add(future)247 except futures.TimeoutError:248 pass249 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,250 EXCEPTION_FUTURE,251 SUCCESSFUL_FUTURE]),252 completed_futures)253class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):254 pass255class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):256 pass257class ExecutorTest(unittest.TestCase):258 # Executor.shutdown() and context manager usage is tested by259 # ExecutorShutdownTest.260 def test_submit(self):261 future = self.executor.submit(pow, 2, 8)262 self.assertEqual(256, future.result())263 def test_submit_keyword(self):264 future = self.executor.submit(mul, 2, y=8)265 self.assertEqual(16, future.result())266 def test_map(self):267 self.assertEqual(268 list(self.executor.map(pow, range(10), range(10))),269 list(map(pow, range(10), range(10))))270 def test_map_exception(self):271 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])272 self.assertEqual(i.__next__(), (0, 1))273 self.assertEqual(i.__next__(), (0, 1))274 self.assertRaises(ZeroDivisionError, i.__next__)275 def test_map_timeout(self):276 results = []277 try:278 for i in self.executor.map(time.sleep,279 [0, 0, 3],280 timeout=1.5):281 results.append(i)282 except futures.TimeoutError:283 pass284 else:285 self.fail('expected TimeoutError')286 self.assertEqual([None, None], results)287class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):288 pass289class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):290 pass291class FutureTests(unittest.TestCase):292 def test_done_callback_with_result(self):293 callback_result = None294 def fn(callback_future):295 nonlocal callback_result296 callback_result = callback_future.result()297 f = Future()298 f.add_done_callback(fn)299 f.set_result(5)300 self.assertEqual(5, callback_result)301 def test_done_callback_with_exception(self):302 callback_exception = None303 def fn(callback_future):304 nonlocal callback_exception305 callback_exception = callback_future.exception()306 f = Future()307 f.add_done_callback(fn)308 f.set_exception(Exception('test'))309 self.assertEqual(('test',), callback_exception.args)310 def test_done_callback_with_cancel(self):311 was_cancelled = None312 def fn(callback_future):313 nonlocal was_cancelled314 was_cancelled = callback_future.cancelled()315 f = Future()316 f.add_done_callback(fn)317 self.assertTrue(f.cancel())318 self.assertTrue(was_cancelled)319 def test_done_callback_raises(self):320 with test.support.captured_stderr() as stderr:321 raising_was_called = False322 fn_was_called = False323 def raising_fn(callback_future):324 nonlocal raising_was_called325 raising_was_called = True326 raise Exception('doh!')327 def fn(callback_future):328 nonlocal fn_was_called329 fn_was_called = True330 f = Future()331 f.add_done_callback(raising_fn)332 f.add_done_callback(fn)333 f.set_result(5)334 self.assertTrue(raising_was_called)335 self.assertTrue(fn_was_called)336 self.assertIn('Exception: doh!', stderr.getvalue())337 def test_done_callback_already_successful(self):338 callback_result = None339 def fn(callback_future):340 nonlocal callback_result341 callback_result = callback_future.result()342 f = Future()343 f.set_result(5)344 f.add_done_callback(fn)345 self.assertEqual(5, callback_result)346 def test_done_callback_already_failed(self):347 callback_exception = None348 def fn(callback_future):349 nonlocal callback_exception350 callback_exception = callback_future.exception()351 f = Future()352 f.set_exception(Exception('test'))353 f.add_done_callback(fn)354 self.assertEqual(('test',), callback_exception.args)355 def test_done_callback_already_cancelled(self):356 was_cancelled = None357 def fn(callback_future):358 nonlocal was_cancelled359 was_cancelled = callback_future.cancelled()360 f = Future()361 self.assertTrue(f.cancel())362 f.add_done_callback(fn)363 self.assertTrue(was_cancelled)364 def test_repr(self):365 self.assertRegex(repr(PENDING_FUTURE),366 '<Future at 0x[0-9a-f]+ state=pending>')367 self.assertRegex(repr(RUNNING_FUTURE),368 '<Future at 0x[0-9a-f]+ state=running>')369 self.assertRegex(repr(CANCELLED_FUTURE),370 '<Future at 0x[0-9a-f]+ state=cancelled>')371 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),372 '<Future at 0x[0-9a-f]+ state=cancelled>')373 self.assertRegex(374 repr(EXCEPTION_FUTURE),375 '<Future at 0x[0-9a-f]+ state=finished raised IOError>')376 self.assertRegex(377 repr(SUCCESSFUL_FUTURE),378 '<Future at 0x[0-9a-f]+ state=finished returned int>')379 def test_cancel(self):380 f1 = create_future(state=PENDING)381 f2 = create_future(state=RUNNING)382 f3 = create_future(state=CANCELLED)383 f4 = create_future(state=CANCELLED_AND_NOTIFIED)384 f5 = create_future(state=FINISHED, exception=IOError())385 f6 = create_future(state=FINISHED, result=5)386 self.assertTrue(f1.cancel())387 self.assertEqual(f1._state, CANCELLED)388 self.assertFalse(f2.cancel())389 self.assertEqual(f2._state, RUNNING)390 self.assertTrue(f3.cancel())391 self.assertEqual(f3._state, CANCELLED)392 self.assertTrue(f4.cancel())393 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)394 self.assertFalse(f5.cancel())395 self.assertEqual(f5._state, FINISHED)396 self.assertFalse(f6.cancel())397 self.assertEqual(f6._state, FINISHED)398 def test_cancelled(self):399 self.assertFalse(PENDING_FUTURE.cancelled())400 self.assertFalse(RUNNING_FUTURE.cancelled())401 self.assertTrue(CANCELLED_FUTURE.cancelled())402 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())403 self.assertFalse(EXCEPTION_FUTURE.cancelled())404 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())405 def test_done(self):406 self.assertFalse(PENDING_FUTURE.done())407 self.assertFalse(RUNNING_FUTURE.done())408 self.assertTrue(CANCELLED_FUTURE.done())409 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())410 self.assertTrue(EXCEPTION_FUTURE.done())411 self.assertTrue(SUCCESSFUL_FUTURE.done())412 def test_running(self):413 self.assertFalse(PENDING_FUTURE.running())414 self.assertTrue(RUNNING_FUTURE.running())415 self.assertFalse(CANCELLED_FUTURE.running())416 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())417 self.assertFalse(EXCEPTION_FUTURE.running())418 self.assertFalse(SUCCESSFUL_FUTURE.running())419 def test_result_with_timeout(self):420 self.assertRaises(futures.TimeoutError,421 PENDING_FUTURE.result, timeout=0)422 self.assertRaises(futures.TimeoutError,423 RUNNING_FUTURE.result, timeout=0)424 self.assertRaises(futures.CancelledError,425 CANCELLED_FUTURE.result, timeout=0)426 self.assertRaises(futures.CancelledError,427 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)428 self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)429 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)430 def test_result_with_success(self):431 # TODO(brian@sweetapp.com): This test is timing dependant.432 def notification():433 # Wait until the main thread is waiting for the result.434 time.sleep(1)435 f1.set_result(42)436 f1 = create_future(state=PENDING)437 t = threading.Thread(target=notification)438 t.start()439 self.assertEqual(f1.result(timeout=5), 42)440 def test_result_with_cancel(self):441 # TODO(brian@sweetapp.com): This test is timing dependant.442 def notification():443 # Wait until the main thread is waiting for the result.444 time.sleep(1)445 f1.cancel()446 f1 = create_future(state=PENDING)447 t = threading.Thread(target=notification)448 t.start()449 self.assertRaises(futures.CancelledError, f1.result, timeout=5)450 def test_exception_with_timeout(self):451 self.assertRaises(futures.TimeoutError,452 PENDING_FUTURE.exception, timeout=0)453 self.assertRaises(futures.TimeoutError,454 RUNNING_FUTURE.exception, timeout=0)455 self.assertRaises(futures.CancelledError,456 CANCELLED_FUTURE.exception, timeout=0)457 self.assertRaises(futures.CancelledError,458 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)459 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),460 IOError))461 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)462 def test_exception_with_success(self):463 def notification():464 # Wait until the main thread is waiting for the exception.465 time.sleep(1)466 with f1._condition:467 f1._state = FINISHED468 f1._exception = IOError()469 f1._condition.notify_all()470 f1 = create_future(state=PENDING)471 t = threading.Thread(target=notification)472 t.start()473 self.assertTrue(isinstance(f1.exception(timeout=5), IOError))474@test.support.reap_threads475def test_main():476 try:477 test.support.run_unittest(ProcessPoolExecutorTest,478 ThreadPoolExecutorTest,479 ProcessPoolWaitTests,480 ThreadPoolWaitTests,481 ProcessPoolAsCompletedTests,482 ThreadPoolAsCompletedTests,483 FutureTests,484 ProcessPoolShutdownTest,...
_base.py
Source:_base.py
1# Copyright 2009 Brian Quinlan. All Rights Reserved.2# Licensed to PSF under a Contributor Agreement.3from __future__ import with_statement4import functools5import logging6import threading7import time8try:9 from collections import namedtuple10except ImportError:11 from concurrent.futures._compat import namedtuple12__author__ = 'Brian Quinlan (brian@sweetapp.com)'13FIRST_COMPLETED = 'FIRST_COMPLETED'14FIRST_EXCEPTION = 'FIRST_EXCEPTION'15ALL_COMPLETED = 'ALL_COMPLETED'16_AS_COMPLETED = '_AS_COMPLETED'17# Possible future states (for internal use by the futures package).18PENDING = 'PENDING'19RUNNING = 'RUNNING'20# The future was cancelled by the user...21CANCELLED = 'CANCELLED'22# ...and _Waiter.add_cancelled() was called by a worker.23CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'24FINISHED = 'FINISHED'25_FUTURE_STATES = [26 PENDING,27 RUNNING,28 CANCELLED,29 CANCELLED_AND_NOTIFIED,30 FINISHED31]32_STATE_TO_DESCRIPTION_MAP = {33 PENDING: "pending",34 RUNNING: "running",35 CANCELLED: "cancelled",36 CANCELLED_AND_NOTIFIED: "cancelled",37 FINISHED: "finished"38}39# Logger for internal use by the futures package.40LOGGER = logging.getLogger("concurrent.futures")41STDERR_HANDLER = logging.StreamHandler()42LOGGER.addHandler(STDERR_HANDLER)43class Error(Exception):44 """Base class for all future-related exceptions."""45 pass46class CancelledError(Error):47 """The Future was cancelled."""48 pass49class TimeoutError(Error):50 """The operation exceeded the given deadline."""51 pass52class _Waiter(object):53 """Provides the event that wait() and as_completed() block on."""54 def __init__(self):55 self.event = threading.Event()56 self.finished_futures = []57 def add_result(self, future):58 self.finished_futures.append(future)59 def add_exception(self, future):60 self.finished_futures.append(future)61 def add_cancelled(self, future):62 self.finished_futures.append(future)63class _AsCompletedWaiter(_Waiter):64 """Used by as_completed()."""65 def __init__(self):66 super(_AsCompletedWaiter, self).__init__()67 self.lock = threading.Lock()68 def add_result(self, future):69 with self.lock:70 super(_AsCompletedWaiter, self).add_result(future)71 self.event.set()72 def add_exception(self, future):73 with self.lock:74 super(_AsCompletedWaiter, self).add_exception(future)75 self.event.set()76 def add_cancelled(self, future):77 with self.lock:78 super(_AsCompletedWaiter, self).add_cancelled(future)79 self.event.set()80class _FirstCompletedWaiter(_Waiter):81 """Used by wait(return_when=FIRST_COMPLETED)."""82 def add_result(self, future):83 super(_FirstCompletedWaiter, self).add_result(future)84 self.event.set()85 def add_exception(self, future):86 super(_FirstCompletedWaiter, self).add_exception(future)87 self.event.set()88 def add_cancelled(self, future):89 super(_FirstCompletedWaiter, self).add_cancelled(future)90 self.event.set()91class _AllCompletedWaiter(_Waiter):92 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""93 def __init__(self, num_pending_calls, stop_on_exception):94 self.num_pending_calls = num_pending_calls95 self.stop_on_exception = stop_on_exception96 super(_AllCompletedWaiter, self).__init__()97 def _decrement_pending_calls(self):98 if self.num_pending_calls == len(self.finished_futures):99 self.event.set()100 def add_result(self, future):101 super(_AllCompletedWaiter, self).add_result(future)102 self._decrement_pending_calls()103 def add_exception(self, future):104 super(_AllCompletedWaiter, self).add_exception(future)105 if self.stop_on_exception:106 self.event.set()107 else:108 self._decrement_pending_calls()109 def add_cancelled(self, future):110 super(_AllCompletedWaiter, self).add_cancelled(future)111 self._decrement_pending_calls()112class _AcquireFutures(object):113 """A context manager that does an ordered acquire of Future conditions."""114 def __init__(self, futures):115 self.futures = sorted(futures, key=id)116 def __enter__(self):117 for future in self.futures:118 future._condition.acquire()119 def __exit__(self, *args):120 for future in self.futures:121 future._condition.release()122def _create_and_install_waiters(fs, return_when):123 if return_when == _AS_COMPLETED:124 waiter = _AsCompletedWaiter()125 elif return_when == FIRST_COMPLETED:126 waiter = _FirstCompletedWaiter()127 else:128 pending_count = sum(129 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)130 if return_when == FIRST_EXCEPTION:131 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)132 elif return_when == ALL_COMPLETED:133 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)134 else:135 raise ValueError("Invalid return condition: %r" % return_when)136 for f in fs:137 f._waiters.append(waiter)138 return waiter139def as_completed(fs, timeout=None):140 """An iterator over the given futures that yields each as it completes.141 Args:142 fs: The sequence of Futures (possibly created by different Executors) to143 iterate over.144 timeout: The maximum number of seconds to wait. If None, then there145 is no limit on the wait time.146 Returns:147 An iterator that yields the given Futures as they complete (finished or148 cancelled).149 Raises:150 TimeoutError: If the entire result iterator could not be generated151 before the given timeout.152 """153 if timeout is not None:154 end_time = timeout + time.time()155 with _AcquireFutures(fs):156 finished = set(157 f for f in fs158 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])159 pending = set(fs) - finished160 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)161 try:162 for future in finished:163 yield future164 while pending:165 if timeout is None:166 wait_timeout = None167 else:168 wait_timeout = end_time - time.time()169 if wait_timeout < 0:170 raise TimeoutError(171 '%d (of %d) futures unfinished' % (172 len(pending), len(fs)))173 waiter.event.wait(wait_timeout)174 with waiter.lock:175 finished = waiter.finished_futures176 waiter.finished_futures = []177 waiter.event.clear()178 for future in finished:179 yield future180 pending.remove(future)181 finally:182 for f in fs:183 f._waiters.remove(waiter)184DoneAndNotDoneFutures = namedtuple(185 'DoneAndNotDoneFutures', 'done not_done')186def wait(fs, timeout=None, return_when=ALL_COMPLETED):187 """Wait for the futures in the given sequence to complete.188 Args:189 fs: The sequence of Futures (possibly created by different Executors) to190 wait upon.191 timeout: The maximum number of seconds to wait. If None, then there192 is no limit on the wait time.193 return_when: Indicates when this function should return. The options194 are:195 FIRST_COMPLETED - Return when any future finishes or is196 cancelled.197 FIRST_EXCEPTION - Return when any future finishes by raising an198 exception. If no future raises an exception199 then it is equivalent to ALL_COMPLETED.200 ALL_COMPLETED - Return when all futures finish or are cancelled.201 Returns:202 A named 2-tuple of sets. The first set, named 'done', contains the203 futures that completed (is finished or cancelled) before the wait204 completed. The second set, named 'not_done', contains uncompleted205 futures.206 """207 with _AcquireFutures(fs):208 done = set(f for f in fs209 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])210 not_done = set(fs) - done211 if (return_when == FIRST_COMPLETED) and done:212 return DoneAndNotDoneFutures(done, not_done)213 elif (return_when == FIRST_EXCEPTION) and done:214 if any(f for f in done215 if not f.cancelled() and f.exception() is not None):216 return DoneAndNotDoneFutures(done, not_done)217 if len(done) == len(fs):218 return DoneAndNotDoneFutures(done, not_done)219 waiter = _create_and_install_waiters(fs, return_when)220 waiter.event.wait(timeout)221 for f in fs:222 f._waiters.remove(waiter)223 done.update(waiter.finished_futures)224 return DoneAndNotDoneFutures(done, set(fs) - done)225class Future(object):226 """Represents the result of an asynchronous computation."""227 def __init__(self):228 """Initializes the future. Should not be called by clients."""229 self._condition = threading.Condition()230 self._state = PENDING231 self._result = None232 self._exception = None233 self._waiters = []234 self._done_callbacks = []235 def _invoke_callbacks(self):236 for callback in self._done_callbacks:237 try:238 callback(self)239 except Exception:240 LOGGER.exception('exception calling callback for %r', self)241 def __repr__(self):242 with self._condition:243 if self._state == FINISHED:244 if self._exception:245 return '<Future at %s state=%s raised %s>' % (246 hex(id(self)),247 _STATE_TO_DESCRIPTION_MAP[self._state],248 self._exception.__class__.__name__)249 else:250 return '<Future at %s state=%s returned %s>' % (251 hex(id(self)),252 _STATE_TO_DESCRIPTION_MAP[self._state],253 self._result.__class__.__name__)254 return '<Future at %s state=%s>' % (255 hex(id(self)),256 _STATE_TO_DESCRIPTION_MAP[self._state])257 def cancel(self):258 """Cancel the future if possible.259 Returns True if the future was cancelled, False otherwise. A future260 cannot be cancelled if it is running or has already completed.261 """262 with self._condition:263 if self._state in [RUNNING, FINISHED]:264 return False265 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:266 return True267 self._state = CANCELLED268 self._condition.notify_all()269 self._invoke_callbacks()270 return True271 def cancelled(self):272 """Return True if the future has cancelled."""273 with self._condition:274 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]275 def running(self):276 """Return True if the future is currently executing."""277 with self._condition:278 return self._state == RUNNING279 def done(self):280 """Return True of the future was cancelled or finished executing."""281 with self._condition:282 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]283 def __get_result(self):284 if self._exception:285 raise self._exception286 else:287 return self._result288 def add_done_callback(self, fn):289 """Attaches a callable that will be called when the future finishes.290 Args:291 fn: A callable that will be called with this future as its only292 argument when the future completes or is cancelled. The callable293 will always be called by a thread in the same process in which294 it was added. If the future has already completed or been295 cancelled then the callable will be called immediately. These296 callables are called in the order that they were added.297 """298 with self._condition:299 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:300 self._done_callbacks.append(fn)301 return302 fn(self)303 def result(self, timeout=None):304 """Return the result of the call that the future represents.305 Args:306 timeout: The number of seconds to wait for the result if the future307 isn't done. If None, then there is no limit on the wait time.308 Returns:309 The result of the call that the future represents.310 Raises:311 CancelledError: If the future was cancelled.312 TimeoutError: If the future didn't finish executing before the given313 timeout.314 Exception: If the call raised then that exception will be raised.315 """316 with self._condition:317 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:318 raise CancelledError()319 elif self._state == FINISHED:320 return self.__get_result()321 self._condition.wait(timeout)322 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:323 raise CancelledError()324 elif self._state == FINISHED:325 return self.__get_result()326 else:327 raise TimeoutError()328 def exception(self, timeout=None):329 """Return the exception raised by the call that the future represents.330 Args:331 timeout: The number of seconds to wait for the exception if the332 future isn't done. If None, then there is no limit on the wait333 time.334 Returns:335 The exception raised by the call that the future represents or None336 if the call completed without raising.337 Raises:338 CancelledError: If the future was cancelled.339 TimeoutError: If the future didn't finish executing before the given340 timeout.341 """342 with self._condition:343 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:344 raise CancelledError()345 elif self._state == FINISHED:346 return self._exception347 self._condition.wait(timeout)348 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:349 raise CancelledError()350 elif self._state == FINISHED:351 return self._exception352 else:353 raise TimeoutError()354 # The following methods should only be used by Executors and in tests.355 def set_running_or_notify_cancel(self):356 """Mark the future as running or process any cancel notifications.357 Should only be used by Executor implementations and unit tests.358 If the future has been cancelled (cancel() was called and returned359 True) then any threads waiting on the future completing (though calls360 to as_completed() or wait()) are notified and False is returned.361 If the future was not cancelled then it is put in the running state362 (future calls to running() will return True) and True is returned.363 This method should be called by Executor implementations before364 executing the work associated with this future. If this method returns365 False then the work should not be executed.366 Returns:367 False if the Future was cancelled, True otherwise.368 Raises:369 RuntimeError: if this method was already called or if set_result()370 or set_exception() was called.371 """372 with self._condition:373 if self._state == CANCELLED:374 self._state = CANCELLED_AND_NOTIFIED375 for waiter in self._waiters:376 waiter.add_cancelled(self)377 # self._condition.notify_all() is not necessary because378 # self.cancel() triggers a notification.379 return False380 elif self._state == PENDING:381 self._state = RUNNING382 return True383 else:384 LOGGER.critical('Future %s in unexpected state: %s',385 id(self.future),386 self.future._state)387 raise RuntimeError('Future in unexpected state')388 def set_result(self, result):389 """Sets the return value of work associated with the future.390 Should only be used by Executor implementations and unit tests.391 """392 with self._condition:393 self._result = result394 self._state = FINISHED395 for waiter in self._waiters:396 waiter.add_result(self)397 self._condition.notify_all()398 self._invoke_callbacks()399 def set_exception(self, exception):400 """Sets the result of the future as being the given exception.401 Should only be used by Executor implementations and unit tests.402 """403 with self._condition:404 self._exception = exception405 self._state = FINISHED406 for waiter in self._waiters:407 waiter.add_exception(self)408 self._condition.notify_all()409 self._invoke_callbacks()410class Executor(object):411 """This is an abstract base class for concrete asynchronous executors."""412 def submit(self, fn, *args, **kwargs):413 """Submits a callable to be executed with the given arguments.414 Schedules the callable to be executed as fn(*args, **kwargs) and returns415 a Future instance representing the execution of the callable.416 Returns:417 A Future representing the given call.418 """419 raise NotImplementedError()420 def map(self, fn, *iterables, **kwargs):421 """Returns a iterator equivalent to map(fn, iter).422 Args:423 fn: A callable that will take take as many arguments as there are424 passed iterables.425 timeout: The maximum number of seconds to wait. If None, then there426 is no limit on the wait time.427 Returns:428 An iterator equivalent to: map(func, *iterables) but the calls may429 be evaluated out-of-order.430 Raises:431 TimeoutError: If the entire result iterator could not be generated432 before the given timeout.433 Exception: If fn(*args) raises for any values.434 """435 timeout = kwargs.get('timeout')436 if timeout is not None:437 end_time = timeout + time.time()438 fs = [self.submit(fn, *args) for args in zip(*iterables)]439 try:440 for future in fs:441 if timeout is None:442 yield future.result()443 else:444 yield future.result(end_time - time.time())445 finally:446 for future in fs:447 future.cancel()448 def shutdown(self, wait=True):449 """Clean-up the resources associated with the Executor.450 It is safe to call this method several times. Otherwise, no other451 methods can be called after this one.452 Args:453 wait: If True then shutdown will not return until all running454 futures have finished executing and the resources used by the455 executor have been reclaimed.456 """457 pass458 def __enter__(self):459 return self460 def __exit__(self, exc_type, exc_val, exc_tb):461 self.shutdown(wait=True)...
futures.py
Source:futures.py
...266 dest.set_exception(_convert_future_exc(exception))267 else:268 result = source.result()269 dest.set_result(result)270def _chain_future(source, destination):271 """Chain two futures so that when one completes, so does the other.272 The result (or exception) of source will be copied to destination.273 If destination is cancelled, source gets cancelled too.274 Compatible with both asyncio.Future and concurrent.futures.Future.275 """276 if not isfuture(source) and not isinstance(source,277 concurrent.futures.Future):278 raise TypeError('A future is required for source argument')279 if not isfuture(destination) and not isinstance(destination,280 concurrent.futures.Future):281 raise TypeError('A future is required for destination argument')282 source_loop = _get_loop(source) if isfuture(source) else None283 dest_loop = _get_loop(destination) if isfuture(destination) else None284 def _set_state(future, other):285 if isfuture(future):286 _copy_future_state(other, future)287 else:288 _set_concurrent_future_state(future, other)289 def _call_check_cancel(destination):290 if destination.cancelled():291 if source_loop is None or source_loop is dest_loop:292 source.cancel()293 else:294 source_loop.call_soon_threadsafe(source.cancel)295 def _call_set_state(source):296 if (destination.cancelled() and297 dest_loop is not None and dest_loop.is_closed()):298 return299 if dest_loop is None or dest_loop is source_loop:300 _set_state(destination, source)301 else:302 dest_loop.call_soon_threadsafe(_set_state, destination, source)303 destination.add_done_callback(_call_check_cancel)304 source.add_done_callback(_call_set_state)305def wrap_future(future, *, loop=None):306 """Wrap concurrent.futures.Future object."""307 if isfuture(future):308 return future309 assert isinstance(future, concurrent.futures.Future), \310 f'concurrent.futures.Future is expected, got {future!r}'311 if loop is None:312 loop = events.get_event_loop()313 new_future = loop.create_future()314 _chain_future(future, new_future)315 return new_future316try:317 import _asyncio318except ImportError:319 pass320else:321 # _CFuture is needed for tests....
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!