Best Python code snippet using playwright-python
test_contextlib_async.py
Source:test_contextlib_async.py
...20class TestAbstractAsyncContextManager(unittest.TestCase):21 @_async_test22 async def test_enter(self):23 class DefaultEnter(AbstractAsyncContextManager):24 async def __aexit__(self, *args):25 await super().__aexit__(*args)26 manager = DefaultEnter()27 self.assertIs(await manager.__aenter__(), manager)28 async with manager as context:29 self.assertIs(manager, context)30 @_async_test31 async def test_async_gen_propagates_generator_exit(self):32 # A regression test for https://bugs.python.org/issue33786.33 @asynccontextmanager34 async def ctx():35 yield36 async def gen():37 async with ctx():38 yield 1139 ret = []40 exc = ValueError(22)41 with self.assertRaises(ValueError):42 async with ctx():43 async for val in gen():44 ret.append(val)45 raise exc46 self.assertEqual(ret, [11])47 def test_exit_is_abstract(self):48 class MissingAexit(AbstractAsyncContextManager):49 pass50 with self.assertRaises(TypeError):51 MissingAexit()52 def test_structural_subclassing(self):53 class ManagerFromScratch:54 async def __aenter__(self):55 return self56 async def __aexit__(self, exc_type, exc_value, traceback):57 return None58 self.assertTrue(issubclass(ManagerFromScratch, AbstractAsyncContextManager))59 class DefaultEnter(AbstractAsyncContextManager):60 async def __aexit__(self, *args):61 await super().__aexit__(*args)62 self.assertTrue(issubclass(DefaultEnter, AbstractAsyncContextManager))63 class NoneAenter(ManagerFromScratch):64 __aenter__ = None65 self.assertFalse(issubclass(NoneAenter, AbstractAsyncContextManager))66 class NoneAexit(ManagerFromScratch):67 __aexit__ = None68 self.assertFalse(issubclass(NoneAexit, AbstractAsyncContextManager))69class AsyncContextManagerTestCase(unittest.TestCase):70 @_async_test71 async def test_contextmanager_plain(self):72 state = []73 @asynccontextmanager74 async def woohoo():75 state.append(1)76 yield 4277 state.append(999)78 async with woohoo() as x:79 self.assertEqual(state, [1])80 self.assertEqual(x, 42)81 state.append(x)82 self.assertEqual(state, [1, 42, 999])83 @_async_test84 async def test_contextmanager_finally(self):85 state = []86 @asynccontextmanager87 async def woohoo():88 state.append(1)89 try:90 yield 4291 finally:92 state.append(999)93 with self.assertRaises(ZeroDivisionError):94 async with woohoo() as x:95 self.assertEqual(state, [1])96 self.assertEqual(x, 42)97 state.append(x)98 raise ZeroDivisionError()99 self.assertEqual(state, [1, 42, 999])100 @_async_test101 async def test_contextmanager_no_reraise(self):102 @asynccontextmanager103 async def whee():104 yield105 ctx = whee()106 await ctx.__aenter__()107 # Calling __aexit__ should not result in an exception108 self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))109 @_async_test110 async def test_contextmanager_trap_yield_after_throw(self):111 @asynccontextmanager112 async def whoo():113 try:114 yield115 except:116 yield117 ctx = whoo()118 await ctx.__aenter__()119 with self.assertRaises(RuntimeError):120 await ctx.__aexit__(TypeError, TypeError('foo'), None)121 @_async_test122 async def test_contextmanager_trap_no_yield(self):123 @asynccontextmanager124 async def whoo():125 if False:126 yield127 ctx = whoo()128 with self.assertRaises(RuntimeError):129 await ctx.__aenter__()130 @_async_test131 async def test_contextmanager_trap_second_yield(self):132 @asynccontextmanager133 async def whoo():134 yield135 yield136 ctx = whoo()137 await ctx.__aenter__()138 with self.assertRaises(RuntimeError):139 await ctx.__aexit__(None, None, None)140 @_async_test141 async def test_contextmanager_non_normalised(self):142 @asynccontextmanager143 async def whoo():144 try:145 yield146 except RuntimeError:147 raise SyntaxError148 ctx = whoo()149 await ctx.__aenter__()150 with self.assertRaises(SyntaxError):151 await ctx.__aexit__(RuntimeError, None, None)152 @_async_test153 async def test_contextmanager_except(self):154 state = []155 @asynccontextmanager156 async def woohoo():157 state.append(1)158 try:159 yield 42160 except ZeroDivisionError as e:161 state.append(e.args[0])162 self.assertEqual(state, [1, 42, 999])163 async with woohoo() as x:164 self.assertEqual(state, [1])165 self.assertEqual(x, 42)166 state.append(x)167 raise ZeroDivisionError(999)168 self.assertEqual(state, [1, 42, 999])169 @_async_test170 async def test_contextmanager_except_stopiter(self):171 @asynccontextmanager172 async def woohoo():173 yield174 for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):175 with self.subTest(type=type(stop_exc)):176 try:177 async with woohoo():178 raise stop_exc179 except Exception as ex:180 self.assertIs(ex, stop_exc)181 else:182 self.fail(f'{stop_exc} was suppressed')183 @_async_test184 async def test_contextmanager_wrap_runtimeerror(self):185 @asynccontextmanager186 async def woohoo():187 try:188 yield189 except Exception as exc:190 raise RuntimeError(f'caught {exc}') from exc191 with self.assertRaises(RuntimeError):192 async with woohoo():193 1 / 0194 # If the context manager wrapped StopAsyncIteration in a RuntimeError,195 # we also unwrap it, because we can't tell whether the wrapping was196 # done by the generator machinery or by the generator itself.197 with self.assertRaises(StopAsyncIteration):198 async with woohoo():199 raise StopAsyncIteration200 def _create_contextmanager_attribs(self):201 def attribs(**kw):202 def decorate(func):203 for k,v in kw.items():204 setattr(func,k,v)205 return func206 return decorate207 @asynccontextmanager208 @attribs(foo='bar')209 async def baz(spam):210 """Whee!"""211 yield212 return baz213 def test_contextmanager_attribs(self):214 baz = self._create_contextmanager_attribs()215 self.assertEqual(baz.__name__,'baz')216 self.assertEqual(baz.foo, 'bar')217 @support.requires_docstrings218 def test_contextmanager_doc_attrib(self):219 baz = self._create_contextmanager_attribs()220 self.assertEqual(baz.__doc__, "Whee!")221 @support.requires_docstrings222 @_async_test223 async def test_instance_docstring_given_cm_docstring(self):224 baz = self._create_contextmanager_attribs()(None)225 self.assertEqual(baz.__doc__, "Whee!")226 async with baz:227 pass # suppress warning228 @_async_test229 async def test_keywords(self):230 # Ensure no keyword arguments are inhibited231 @asynccontextmanager232 async def woohoo(self, func, args, kwds):233 yield (self, func, args, kwds)234 async with woohoo(self=11, func=22, args=33, kwds=44) as target:235 self.assertEqual(target, (11, 22, 33, 44))236class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):237 class SyncAsyncExitStack(AsyncExitStack):238 @staticmethod239 def run_coroutine(coro):240 loop = asyncio.get_event_loop()241 f = asyncio.ensure_future(coro)242 f.add_done_callback(lambda f: loop.stop())243 loop.run_forever()244 exc = f.exception()245 if not exc:246 return f.result()247 else:248 context = exc.__context__249 try:250 raise exc251 except:252 exc.__context__ = context253 raise exc254 def close(self):255 return self.run_coroutine(self.aclose())256 def __enter__(self):257 return self.run_coroutine(self.__aenter__())258 def __exit__(self, *exc_details):259 return self.run_coroutine(self.__aexit__(*exc_details))260 exit_stack = SyncAsyncExitStack261 def setUp(self):262 self.loop = asyncio.new_event_loop()263 asyncio.set_event_loop(self.loop)264 self.addCleanup(self.loop.close)265 self.addCleanup(asyncio.set_event_loop_policy, None)266 @_async_test267 async def test_async_callback(self):268 expected = [269 ((), {}),270 ((1,), {}),271 ((1,2), {}),272 ((), dict(example=1)),273 ((1,), dict(example=1)),274 ((1,2), dict(example=1)),275 ]276 result = []277 async def _exit(*args, **kwds):278 """Test metadata propagation"""279 result.append((args, kwds))280 async with AsyncExitStack() as stack:281 for args, kwds in reversed(expected):282 if args and kwds:283 f = stack.push_async_callback(_exit, *args, **kwds)284 elif args:285 f = stack.push_async_callback(_exit, *args)286 elif kwds:287 f = stack.push_async_callback(_exit, **kwds)288 else:289 f = stack.push_async_callback(_exit)290 self.assertIs(f, _exit)291 for wrapper in stack._exit_callbacks:292 self.assertIs(wrapper[1].__wrapped__, _exit)293 self.assertNotEqual(wrapper[1].__name__, _exit.__name__)294 self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)295 self.assertEqual(result, expected)296 result = []297 async with AsyncExitStack() as stack:298 with self.assertRaises(TypeError):299 stack.push_async_callback(arg=1)300 with self.assertRaises(TypeError):301 self.exit_stack.push_async_callback(arg=2)302 with self.assertWarns(DeprecationWarning):303 stack.push_async_callback(callback=_exit, arg=3)304 self.assertEqual(result, [((), {'arg': 3})])305 @_async_test306 async def test_async_push(self):307 exc_raised = ZeroDivisionError308 async def _expect_exc(exc_type, exc, exc_tb):309 self.assertIs(exc_type, exc_raised)310 async def _suppress_exc(*exc_details):311 return True312 async def _expect_ok(exc_type, exc, exc_tb):313 self.assertIsNone(exc_type)314 self.assertIsNone(exc)315 self.assertIsNone(exc_tb)316 class ExitCM(object):317 def __init__(self, check_exc):318 self.check_exc = check_exc319 async def __aenter__(self):320 self.fail("Should not be called!")321 async def __aexit__(self, *exc_details):322 await self.check_exc(*exc_details)323 async with self.exit_stack() as stack:324 stack.push_async_exit(_expect_ok)325 self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)326 cm = ExitCM(_expect_ok)327 stack.push_async_exit(cm)328 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)329 stack.push_async_exit(_suppress_exc)330 self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)331 cm = ExitCM(_expect_exc)332 stack.push_async_exit(cm)333 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)334 stack.push_async_exit(_expect_exc)335 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)336 stack.push_async_exit(_expect_exc)337 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)338 1/0339 @_async_test340 async def test_async_enter_context(self):341 class TestCM(object):342 async def __aenter__(self):343 result.append(1)344 async def __aexit__(self, *exc_details):345 result.append(3)346 result = []347 cm = TestCM()348 async with AsyncExitStack() as stack:349 @stack.push_async_callback # Registered first => cleaned up last350 async def _exit():351 result.append(4)352 self.assertIsNotNone(_exit)353 await stack.enter_async_context(cm)354 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)355 result.append(2)356 self.assertEqual(result, [1, 2, 3, 4])357 @_async_test358 async def test_async_exit_exception_chaining(self):...
test_lock.py
Source:test_lock.py
...12 async with lock:13 assert await cache.get(pytest.KEY + "-lock") == lock._value14 @pytest.mark.asyncio15 async def test_release_does_nothing_when_no_lock(self, lock):16 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None17 @pytest.mark.asyncio18 async def test_acquire_release(self, cache, lock):19 async with lock:20 pass21 assert await cache.get(pytest.KEY + "-lock") is None22 @pytest.mark.asyncio23 @pytest.mark.skip(reason="flaky test")24 async def test_locking_dogpile(self, mocker, cache):25 mocker.spy(cache, "get")26 mocker.spy(cache, "set")27 mocker.spy(cache, "_add")28 async def dummy():29 res = await cache.get(pytest.KEY)30 if res is not None:31 return res32 async with RedLock(cache, pytest.KEY, lease=5):33 res = await cache.get(pytest.KEY)34 if res is not None:35 return res36 await asyncio.sleep(0.1)37 await cache.set(pytest.KEY, "value")38 await asyncio.gather(dummy(), dummy(), dummy(), dummy())39 assert cache._add.call_count == 440 assert cache.get.call_count == 841 assert cache.set.call_count == 142 @pytest.mark.asyncio43 async def test_locking_dogpile_lease_expiration(self, mocker, cache):44 mocker.spy(cache, "get")45 mocker.spy(cache, "set")46 async def dummy():47 res = await cache.get(pytest.KEY)48 if res is not None:49 return res50 async with RedLock(cache, pytest.KEY, lease=1):51 res = await cache.get(pytest.KEY)52 if res is not None:53 return res54 await asyncio.sleep(1.1)55 await cache.set(pytest.KEY, "value")56 await asyncio.gather(dummy(), dummy(), dummy(), dummy())57 assert cache.get.call_count == 858 assert cache.set.call_count == 459 @pytest.mark.asyncio60 async def test_locking_dogpile_propagates_exceptions(self, cache):61 async def dummy():62 async with RedLock(cache, pytest.KEY, lease=1):63 raise ValueError()64 with pytest.raises(ValueError):65 await dummy()66class TestMemoryRedLock:67 @pytest.fixture68 def lock(self, memory_cache):69 return RedLock(memory_cache, pytest.KEY, 20)70 @pytest.mark.asyncio71 async def test_release_wrong_token_fails(self, lock):72 await lock.__aenter__()73 lock._value = "random"74 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None75 @pytest.mark.asyncio76 async def test_release_wrong_client_fails(self, memory_cache, lock):77 wrong_lock = RedLock(memory_cache, pytest.KEY, 20)78 await lock.__aenter__()79 assert await wrong_lock.__aexit__("exc_type", "exc_value", "traceback") is None80 @pytest.mark.asyncio81 async def test_float_lease(self, memory_cache):82 lock = RedLock(memory_cache, pytest.KEY, 0.1)83 await lock.__aenter__()84 await asyncio.sleep(0.2)85 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None86class TestRedisRedLock:87 @pytest.fixture88 def lock(self, redis_cache):89 return RedLock(redis_cache, pytest.KEY, 20)90 @pytest.mark.asyncio91 async def test_release_wrong_token_fails(self, lock):92 await lock.__aenter__()93 lock._value = "random"94 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None95 @pytest.mark.asyncio96 async def test_release_wrong_client_fails(self, redis_cache, lock):97 wrong_lock = RedLock(redis_cache, pytest.KEY, 20)98 await lock.__aenter__()99 assert await wrong_lock.__aexit__("exc_type", "exc_value", "traceback") is None100 @pytest.mark.asyncio101 async def test_float_lease(self, redis_cache):102 lock = RedLock(redis_cache, pytest.KEY, 0.1)103 await lock.__aenter__()104 await asyncio.sleep(0.2)105 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None106class TestMemcachedRedLock:107 @pytest.fixture108 def lock(self, memcached_cache):109 return RedLock(memcached_cache, pytest.KEY, 20)110 @pytest.mark.asyncio111 async def test_release_wrong_token_succeeds_meh(self, lock):112 await lock.__aenter__()113 lock._value = "random"114 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None115 @pytest.mark.asyncio116 async def test_release_wrong_client_succeeds_meh(self, memcached_cache, lock):117 wrong_lock = RedLock(memcached_cache, pytest.KEY, 20)118 await lock.__aenter__()119 assert await wrong_lock.__aexit__("exc_type", "exc_value", "traceback") is None120 @pytest.mark.asyncio121 async def test_float_lease(self, memcached_cache):122 lock = RedLock(memcached_cache, pytest.KEY, 0.1)123 with pytest.raises(TypeError):124 await lock.__aenter__()125class TestOptimisticLock:126 @pytest.fixture127 def lock(self, cache):128 return OptimisticLock(cache, pytest.KEY)129 @pytest.mark.asyncio130 async def test_acquire(self, cache, lock):131 await cache.set(pytest.KEY, "value")132 async with lock:133 assert lock._token == await cache._gets(cache._build_key(pytest.KEY))134 @pytest.mark.asyncio135 async def test_release_does_nothing(self, lock):136 assert await lock.__aexit__("exc_type", "exc_value", "traceback") is None137 @pytest.mark.asyncio138 async def test_check_and_set_not_existing_never_fails(self, cache, lock):139 async with lock as locked:140 await cache.set(pytest.KEY, "conflicting_value")141 await locked.cas("value")142 assert await cache.get(pytest.KEY) == "value"143 @pytest.mark.asyncio144 async def test_check_and_set(self, cache, lock):145 await cache.set(pytest.KEY, "previous_value")146 async with lock as locked:147 await locked.cas("value")148 assert await cache.get(pytest.KEY) == "value"149 @pytest.mark.asyncio150 async def test_check_and_set_fail(self, cache, lock):...
context.py
Source:context.py
...23# await self.txn.__aenter__()24# return self25# def __exit__(self, exc_type, exc_val, exc_tb):26# raise NotImplementedError()27# async def __aexit__(self, exc_type, exc_val, exc_tb):28# async with self.database._conn_lock:29# if self.connection is None:30# self.database.pop_execution_context()31# else:32# try:33# if self.with_transaction:34# if not exc_type:35# self.txn.commit(False)36# await self.txn.__aexit__(exc_type, exc_val, exc_tb)37# finally:38# self.database.pop_execution_context()39# await self.database._close(self.connection)40# class AioUsing(AioExecutionContext, Using):41# def __enter__(self):42# raise NotImplementedError()43# async def __aenter__(self):44# self._orig = []45# for model in self.models:46# self._orig.append(model._meta.database)47# model._meta.database = self.database48# return super(Using, self).__aenter__()49# def __exit__(self, exc_type, exc_val, exc_tb):50# raise NotImplementedError()51# async def __aexit__(self, exc_type, exc_val, exc_tb):52# await super(Using, self).__aexit__(exc_type, exc_val, exc_tb)53# for i, model in enumerate(self.models):54# model._meta.database = self._orig[i]55class _aio_atomic(_aio_callable_context_manager):56 __slots__ = ('conn', 'transaction_type', 'context_manager')57 def __init__(self, conn, transaction_type=None):58 self.conn = conn59 self.transaction_type = transaction_type60 async def __aenter__(self):61 await self.conn.__aenter__()62 if self.conn.transaction_depth() == 0:63 self.context_manager = self.conn.transaction(self.transaction_type)64 else:65 self.context_manager = self.conn.savepoint()66 return await self.context_manager.__aenter__()67 async def __aexit__(self, exc_type, exc_val, exc_tb):68 await self.context_manager.__aexit__(exc_type, exc_val, exc_tb)69 await self.conn.__aexit__(exc_type, exc_val, exc_tb)70class aio_transaction(_aio_callable_context_manager):71 __slots__ = ('conn', 'autocommit', 'transaction_type')72 def __init__(self, conn, transaction_type=None):73 self.conn = conn74 self.transaction_type = transaction_type75 async def _begin(self):76 if self.transaction_type:77 await self.conn.begin(self.transaction_type)78 else:79 await self.conn.begin()80 async def commit(self, begin=True):81 await self.conn.commit()82 if begin:83 await self._begin()84 async def rollback(self, begin=True):85 await self.conn.rollback()86 if begin:87 await self._begin()88 async def __aenter__(self):89 self.autocommit = self.conn.autocommit90 self.conn.autocommit = False91 if self.conn.transaction_depth() == 0:92 await self._begin()93 self.conn.push_transaction(self)94 return self95 async def __aexit__(self, exc_type, exc_val, exc_tb):96 try:97 if exc_type:98 await self.rollback(False)99 elif self.conn.transaction_depth() == 1:100 try:101 await self.commit(False)102 except:103 await self.rollback(False)104 raise105 finally:106 self.conn.autocommit = self.autocommit107 self.conn.pop_transaction()108class aio_savepoint(_aio_callable_context_manager):109 __slots__ = ('conn', 'sid', 'quoted_sid', 'autocommit')110 def __init__(self, conn, sid=None):111 self.conn = conn112 self.sid = sid or uuid.uuid4().hex113 _compiler = conn.compiler() # TODO: breing the compiler here somehow114 self.quoted_sid = _compiler.quote(self.sid)115 async def _execute(self, query):116 await self.conn.execute_sql(query, require_commit=False)117 async def _begin(self):118 await self._execute('SAVEPOINT %s;' % self.quoted_sid)119 async def commit(self, begin=True):120 await self._execute('RELEASE SAVEPOINT %s;' % self.quoted_sid)121 if begin:122 await self._begin()123 async def rollback(self):124 await self._execute('ROLLBACK TO SAVEPOINT %s;' % self.quoted_sid)125 def __enter__(self):126 raise NotImplementedError()127 async def __aenter__(self):128 self.autocommit = self.conn.get_autocommit()129 self.conn.set_autocommit(False)130 await self._begin()131 return self132 def __exit__(self, exc_type, exc_val, exc_tb):133 raise NotImplementedError()134 async def __aexit__(self, exc_type, exc_val, exc_tb):135 try:136 if exc_type:137 await self.rollback()138 else:139 try:140 await self.commit(begin=False)141 except:142 await self.rollback()143 raise144 finally:...
unit_of_work.py
Source:unit_of_work.py
...28 steps: repository.AbstractStepRepository29 session: AbstractSession30 async def __aenter__(self) -> AbstractUnitOfWork:31 return self32 async def __aexit__(self, *args):33 await self.rollback()34 async def commit(self):35 await self._commit()36 def collect_new_events(self):37 for repo in [self.services, self.deployments, self.steps, self.users]:38 for model in repo.seen:39 model.raise_recorded_events()40 while model.events:41 yield model.events.pop(0)42 @abc.abstractmethod43 async def _commit(self):44 raise NotImplementedError45 @abc.abstractmethod46 async def rollback(self):47 raise NotImplementedError48DEFAULT_ENGINE = create_async_engine(settings.database_url, echo=False)49DEFAULT_SESSION_FACTORY = sessionmaker(class_=AsyncSession, expire_on_commit=False)50class SqlAlchemyUnitOfWork(AbstractUnitOfWork):51 def __init__(self, session_factory=DEFAULT_SESSION_FACTORY, engine=DEFAULT_ENGINE):52 self.engine = engine53 self.session_factory = session_factory54 async def __aenter__(self):55 self.connection = await self.engine.connect()56 self.session = self.session_factory(bind=self.connection)57 self.services = repository.SqlAlchemyServiceRepository(self.session)58 self.users = repository.SqlAlchemyUserRepository(self.session)59 self.deployments = repository.SqlAlchemyDeploymentRepository(self.session)60 self.steps = repository.SqlAlchemyStepRepository(self.session)61 return await super().__aenter__()62 async def __aexit__(self, *args):63 self.session.expunge_all()64 await super().__aexit__(*args)65 await self.session.close()66 # await self.connection.close()67 # await self.engine.dispose()68 async def _commit(self):69 await self.session.commit()70 async def rollback(self):71 await self.session.rollback()72class TestableSqlAlchemyUnitOfWork(SqlAlchemyUnitOfWork):73 """74 Since we want to wrap our tests in a transaction we can roll75 back at the end of the test to leave the database untouched,76 we cannot use SqlAlchemyUnitOfWork directly. It will call77 session.close() and session.rollback() on __exit__. And this78 will trigger the outer transaction to rollback :(.79 Therefore we just override the __exit__ method and do nothing80 instead.81 See https://docs.sqlalchemy.org/en/14/orm/session_transaction.html82 #joining-a-session-into-an-external-transaction-such-as-for-test-suites83 """84 async def __aexit__(self, *args):85 pass86class DoNothingSession(AbstractSession):87 def expunge_all(self):88 pass89 def expunge(self, obj):90 pass91 def close(self):92 pass93 def commit(self):94 pass95 def rollback(self):96 pass97class InMemoryUnitOfWork(AbstractUnitOfWork):98 def __init__(self):99 self.session = DoNothingSession()100 self.services = repository.InMemoryServiceRepository()101 self.users = repository.InMemoryUserRepository()102 self.deployments = repository.InMemoryDeploymentRepository()103 self.steps = repository.InMemoryStepRepository()104 self.committed = False105 async def __aexit__(self, *args):106 await super().__aexit__(*args)107 async def _commit(self):108 self.committed = True109 async def rollback(self):...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!