Best Python code snippet using fMBT_python
test_contextlib_async.py
Source:test_contextlib_async.py
1import asyncio2from contextlib import asynccontextmanager, AbstractAsyncContextManager, AsyncExitStack3import functools4from test import support5import unittest6from test.test_contextlib import TestBaseExitStack7def _async_test(func):8 """Decorator to turn an async function into a test case."""9 @functools.wraps(func)10 def wrapper(*args, **kwargs):11 coro = func(*args, **kwargs)12 loop = asyncio.new_event_loop()13 asyncio.set_event_loop(loop)14 try:15 return loop.run_until_complete(coro)16 finally:17 loop.close()18 asyncio.set_event_loop(None)19 return wrapper20class TestAbstractAsyncContextManager(unittest.TestCase):21 @_async_test22 async def test_enter(self):23 class DefaultEnter(AbstractAsyncContextManager):24 async def __aexit__(self, *args):25 await super().__aexit__(*args)26 manager = DefaultEnter()27 self.assertIs(await manager.__aenter__(), manager)28 async with manager as context:29 self.assertIs(manager, context)30 def test_exit_is_abstract(self):31 class MissingAexit(AbstractAsyncContextManager):32 pass33 with self.assertRaises(TypeError):34 MissingAexit()35 def test_structural_subclassing(self):36 class ManagerFromScratch:37 async def __aenter__(self):38 return self39 async def __aexit__(self, exc_type, exc_value, traceback):40 return None41 self.assertTrue(issubclass(ManagerFromScratch, AbstractAsyncContextManager))42 class DefaultEnter(AbstractAsyncContextManager):43 async def __aexit__(self, *args):44 await super().__aexit__(*args)45 self.assertTrue(issubclass(DefaultEnter, AbstractAsyncContextManager))46 class NoneAenter(ManagerFromScratch):47 __aenter__ = None48 self.assertFalse(issubclass(NoneAenter, AbstractAsyncContextManager))49 class NoneAexit(ManagerFromScratch):50 __aexit__ = None51 self.assertFalse(issubclass(NoneAexit, AbstractAsyncContextManager))52class AsyncContextManagerTestCase(unittest.TestCase):53 @_async_test54 async def test_contextmanager_plain(self):55 state = []56 @asynccontextmanager57 async def woohoo():58 
state.append(1)59 yield 4260 state.append(999)61 async with woohoo() as x:62 self.assertEqual(state, [1])63 self.assertEqual(x, 42)64 state.append(x)65 self.assertEqual(state, [1, 42, 999])66 @_async_test67 async def test_contextmanager_finally(self):68 state = []69 @asynccontextmanager70 async def woohoo():71 state.append(1)72 try:73 yield 4274 finally:75 state.append(999)76 with self.assertRaises(ZeroDivisionError):77 async with woohoo() as x:78 self.assertEqual(state, [1])79 self.assertEqual(x, 42)80 state.append(x)81 raise ZeroDivisionError()82 self.assertEqual(state, [1, 42, 999])83 @_async_test84 async def test_contextmanager_no_reraise(self):85 @asynccontextmanager86 async def whee():87 yield88 ctx = whee()89 await ctx.__aenter__()90 # Calling __aexit__ should not result in an exception91 self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))92 @_async_test93 async def test_contextmanager_trap_yield_after_throw(self):94 @asynccontextmanager95 async def whoo():96 try:97 yield98 except:99 yield100 ctx = whoo()101 await ctx.__aenter__()102 with self.assertRaises(RuntimeError):103 await ctx.__aexit__(TypeError, TypeError('foo'), None)104 @_async_test105 async def test_contextmanager_trap_no_yield(self):106 @asynccontextmanager107 async def whoo():108 if False:109 yield110 ctx = whoo()111 with self.assertRaises(RuntimeError):112 await ctx.__aenter__()113 @_async_test114 async def test_contextmanager_trap_second_yield(self):115 @asynccontextmanager116 async def whoo():117 yield118 yield119 ctx = whoo()120 await ctx.__aenter__()121 with self.assertRaises(RuntimeError):122 await ctx.__aexit__(None, None, None)123 @_async_test124 async def test_contextmanager_non_normalised(self):125 @asynccontextmanager126 async def whoo():127 try:128 yield129 except RuntimeError:130 raise SyntaxError131 ctx = whoo()132 await ctx.__aenter__()133 with self.assertRaises(SyntaxError):134 await ctx.__aexit__(RuntimeError, None, None)135 @_async_test136 async def 
test_contextmanager_except(self):137 state = []138 @asynccontextmanager139 async def woohoo():140 state.append(1)141 try:142 yield 42143 except ZeroDivisionError as e:144 state.append(e.args[0])145 self.assertEqual(state, [1, 42, 999])146 async with woohoo() as x:147 self.assertEqual(state, [1])148 self.assertEqual(x, 42)149 state.append(x)150 raise ZeroDivisionError(999)151 self.assertEqual(state, [1, 42, 999])152 @_async_test153 async def test_contextmanager_except_stopiter(self):154 @asynccontextmanager155 async def woohoo():156 yield157 for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):158 with self.subTest(type=type(stop_exc)):159 try:160 async with woohoo():161 raise stop_exc162 except Exception as ex:163 self.assertIs(ex, stop_exc)164 else:165 self.fail(f'{stop_exc} was suppressed')166 @_async_test167 async def test_contextmanager_wrap_runtimeerror(self):168 @asynccontextmanager169 async def woohoo():170 try:171 yield172 except Exception as exc:173 raise RuntimeError(f'caught {exc}') from exc174 with self.assertRaises(RuntimeError):175 async with woohoo():176 1 / 0177 # If the context manager wrapped StopAsyncIteration in a RuntimeError,178 # we also unwrap it, because we can't tell whether the wrapping was179 # done by the generator machinery or by the generator itself.180 with self.assertRaises(StopAsyncIteration):181 async with woohoo():182 raise StopAsyncIteration183 def _create_contextmanager_attribs(self):184 def attribs(**kw):185 def decorate(func):186 for k,v in kw.items():187 setattr(func,k,v)188 return func189 return decorate190 @asynccontextmanager191 @attribs(foo='bar')192 async def baz(spam):193 """Whee!"""194 yield195 return baz196 def test_contextmanager_attribs(self):197 baz = self._create_contextmanager_attribs()198 self.assertEqual(baz.__name__,'baz')199 self.assertEqual(baz.foo, 'bar')200 @support.requires_docstrings201 def test_contextmanager_doc_attrib(self):202 baz = self._create_contextmanager_attribs()203 
self.assertEqual(baz.__doc__, "Whee!")204 @support.requires_docstrings205 @_async_test206 async def test_instance_docstring_given_cm_docstring(self):207 baz = self._create_contextmanager_attribs()(None)208 self.assertEqual(baz.__doc__, "Whee!")209 async with baz:210 pass # suppress warning211 @_async_test212 async def test_keywords(self):213 # Ensure no keyword arguments are inhibited214 @asynccontextmanager215 async def woohoo(self, func, args, kwds):216 yield (self, func, args, kwds)217 async with woohoo(self=11, func=22, args=33, kwds=44) as target:218 self.assertEqual(target, (11, 22, 33, 44))219class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):220 class SyncAsyncExitStack(AsyncExitStack):221 @staticmethod222 def run_coroutine(coro):223 loop = asyncio.get_event_loop()224 f = asyncio.ensure_future(coro)225 f.add_done_callback(lambda f: loop.stop())226 loop.run_forever()227 exc = f.exception()228 if not exc:229 return f.result()230 else:231 context = exc.__context__232 try:233 raise exc234 except:235 exc.__context__ = context236 raise exc237 def close(self):238 return self.run_coroutine(self.aclose())239 def __enter__(self):240 return self.run_coroutine(self.__aenter__())241 def __exit__(self, *exc_details):242 return self.run_coroutine(self.__aexit__(*exc_details))243 exit_stack = SyncAsyncExitStack244 def setUp(self):245 self.loop = asyncio.new_event_loop()246 asyncio.set_event_loop(self.loop)247 self.addCleanup(self.loop.close)248 @_async_test249 async def test_async_callback(self):250 expected = [251 ((), {}),252 ((1,), {}),253 ((1,2), {}),254 ((), dict(example=1)),255 ((1,), dict(example=1)),256 ((1,2), dict(example=1)),257 ]258 result = []259 async def _exit(*args, **kwds):260 """Test metadata propagation"""261 result.append((args, kwds))262 async with AsyncExitStack() as stack:263 for args, kwds in reversed(expected):264 if args and kwds:265 f = stack.push_async_callback(_exit, *args, **kwds)266 elif args:267 f = 
stack.push_async_callback(_exit, *args)268 elif kwds:269 f = stack.push_async_callback(_exit, **kwds)270 else:271 f = stack.push_async_callback(_exit)272 self.assertIs(f, _exit)273 for wrapper in stack._exit_callbacks:274 self.assertIs(wrapper[1].__wrapped__, _exit)275 self.assertNotEqual(wrapper[1].__name__, _exit.__name__)276 self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)277 self.assertEqual(result, expected)278 @_async_test279 async def test_async_push(self):280 exc_raised = ZeroDivisionError281 async def _expect_exc(exc_type, exc, exc_tb):282 self.assertIs(exc_type, exc_raised)283 async def _suppress_exc(*exc_details):284 return True285 async def _expect_ok(exc_type, exc, exc_tb):286 self.assertIsNone(exc_type)287 self.assertIsNone(exc)288 self.assertIsNone(exc_tb)289 class ExitCM(object):290 def __init__(self, check_exc):291 self.check_exc = check_exc292 async def __aenter__(self):293 self.fail("Should not be called!")294 async def __aexit__(self, *exc_details):295 await self.check_exc(*exc_details)296 async with self.exit_stack() as stack:297 stack.push_async_exit(_expect_ok)298 self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)299 cm = ExitCM(_expect_ok)300 stack.push_async_exit(cm)301 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)302 stack.push_async_exit(_suppress_exc)303 self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)304 cm = ExitCM(_expect_exc)305 stack.push_async_exit(cm)306 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)307 stack.push_async_exit(_expect_exc)308 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)309 stack.push_async_exit(_expect_exc)310 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)311 1/0312 @_async_test313 async def test_async_enter_context(self):314 class TestCM(object):315 async def __aenter__(self):316 result.append(1)317 async def __aexit__(self, *exc_details):318 result.append(3)319 result = []320 cm = TestCM()321 async with AsyncExitStack() as stack:322 
@stack.push_async_callback # Registered first => cleaned up last323 async def _exit():324 result.append(4)325 self.assertIsNotNone(_exit)326 await stack.enter_async_context(cm)327 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)328 result.append(2)329 self.assertEqual(result, [1, 2, 3, 4])330 @_async_test331 async def test_async_exit_exception_chaining(self):332 # Ensure exception chaining matches the reference behaviour333 async def raise_exc(exc):334 raise exc335 saved_details = None336 async def suppress_exc(*exc_details):337 nonlocal saved_details338 saved_details = exc_details339 return True340 try:341 async with self.exit_stack() as stack:342 stack.push_async_callback(raise_exc, IndexError)343 stack.push_async_callback(raise_exc, KeyError)344 stack.push_async_callback(raise_exc, AttributeError)345 stack.push_async_exit(suppress_exc)346 stack.push_async_callback(raise_exc, ValueError)347 1 / 0348 except IndexError as exc:349 self.assertIsInstance(exc.__context__, KeyError)350 self.assertIsInstance(exc.__context__.__context__, AttributeError)351 # Inner exceptions were suppressed352 self.assertIsNone(exc.__context__.__context__.__context__)353 else:354 self.fail("Expected IndexError, but no exception was raised")355 # Check the inner exceptions356 inner_exc = saved_details[1]357 self.assertIsInstance(inner_exc, ValueError)358 self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)359if __name__ == '__main__':...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!