Best Python code snippet using pytest-asyncio_python
fixtures.py
Source:fixtures.py
#
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# This software is distributed under the terms of the MIT License.
#
# (@@@@%%%%%%%%%&@@&.
# /%&&%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%&@@(
# *@&%%%%%%%%%&&%%%%%%%%%%%%%%%%%%&&&%%%%%%%
# @ @@@(@@@@%%%%%%%%%%%%%%%%&@@&* @@@ .
# , . . .@@@& /
# . . *
# @@ . @
# @&&&&&&@. . . *@%&@
# &&&&&&&&&&&&&&&&@@ *@@############@
# *&/ @@ #&&&&&&&&&&&&&&&&&&&&@ ###################*
# @&&&&&&&&&&&&&&&&&&##################@
# %@&&&&&&&&&&&&&&################@
# @&&&&&&&&&&%#######&@%
# nanaimo (@&&&&####@@*
#
"""
Almost everything in Nanaimo is a :class:`nanaimo.fixtures.Fixture`. Fixtures can be pytest fixtures, instrument
abstractions, aggregates of other fixtures, or anything else that makes sense. The important thing
is that any fixture can be a pytest fixture or can be awaited directly using :ref:`nait`.
"""
import abc
import asyncio
import io
import logging
import math
import textwrap
import typing

import nanaimo


class Fixture(metaclass=abc.ABCMeta):
    """
    Common, abstract class for pytest fixtures based on Nanaimo. Nanaimo fixtures provide a visitor pattern for
    arguments that are common for both pytest extra arguments and for argparse commandline arguments. This allows
    a Nanaimo fixture to expose a direct invocation mode for debugging with the same arguments used by the fixture
    as a pytest plugin. Additionally all Nanaimo fixtures provide a :func:`gather` function that takes a
    :class:`nanaimo.Namespace` containing the provided arguments and returns a set of :class:`nanaimo.Artifacts`
    gathered by the fixture. The contents of these artifacts are documented by each concrete fixture.

    .. invisible-code-block: python

        import nanaimo
        import nanaimo.fixtures
        import asyncio

        _doc_loop = asyncio.new_event_loop()

    .. code-block:: python

        class MyFixture(nanaimo.fixtures.Fixture):

            @classmethod
            def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
                arguments.add_argument('--foo', default='bar')

            async def on_gather(self, args: nanaimo.Namespace) -> nanaimo.Artifacts:
                artifacts = nanaimo.Artifacts(-1)
                # do something and then return
                artifacts.result_code = 0
                return artifacts

    .. invisible-code-block: python

        foo = MyFixture(nanaimo.fixtures.FixtureManager(_doc_loop), nanaimo.Namespace())

        _doc_loop.run_until_complete(foo.gather())

    `MyFixture` can now be used from a commandline like::

        python -m nanaimo MyFixture --foo baz

    or as part of a pytest::

        pytest --foo=baz

    :param FixtureManager manager: The fixture manager that is the scope for this fixture. There must be
        a 1:1 relationship between a fixture instance and a fixture manager instance.
    :param nanaimo.Namespace args: A namespace containing the arguments for this fixture.
    :param kwargs: All fixtures accept an initial value for :data:`gather_timeout_seconds` as
        ``gather_timeout_seconds (float)``. Other keyword arguments may be used by fixture specializations.
        ``loop`` is *not* accepted; the event loop is always obtained from the :class:`FixtureManager`.
    :raises ValueError: if ``loop`` is passed as a keyword argument.
    """

    @classmethod
    def get_canonical_name(cls) -> str:
        """
        The name to use as a key for this :class:`nanaimo.fixtures.Fixture` type.
        If a class defines a string `fixture_name` this will be used
        as the canonical name otherwise it will be the fully qualified name of the
        fixture class itself (module and qualified class name joined with a dot).

        .. invisible-code-block: python

            import nanaimo
            import nanaimo.fixtures

        .. code-block:: python

            class MyFixture(nanaimo.fixtures.Fixture):

                fixture_name = 'my_fixture'

            assert 'my_fixture' == MyFixture.get_canonical_name()

        """
        return str(getattr(cls, 'fixture_name', '.'.join([cls.__module__, cls.__qualname__])))

    @classmethod
    def get_argument_prefix(cls) -> str:
        """
        The name to use as a prefix for arguments. This also becomes the configuration
        section that the fixture's arguments can be overridden from. If the fixture
        defines an ``argument_prefix`` class member this value is used otherwise the
        value returned from :meth:`get_canonical_name` is used (with underscores replaced by dashes).

        .. invisible-code-block: python

            import nanaimo
            import nanaimo.fixtures

        .. code-block:: python

            class MyFixture(nanaimo.fixtures.Fixture):

                argument_prefix = 'mf'

        >>> MyFixture.get_argument_prefix()  # noqa : F821
        'mf'

        .. code-block:: python

            class MyOtherFixture(nanaimo.fixtures.Fixture):
                # this class doesn't define argument_prefix
                # so the canonical name is used instead.
                fixture_name = 'my_outre_fixture'

        >>> MyOtherFixture.get_argument_prefix()  # noqa : F821
        'my-outre-fixture'

        """
        return str(getattr(cls, 'argument_prefix', cls.get_canonical_name().replace('_', '-')))

    @classmethod
    def get_arg_covariant(cls,
                          args: nanaimo.Namespace,
                          base_name: str,
                          default_value: typing.Optional[typing.Any] = None) -> typing.Any:
        """
        When called by a baseclass this method will return the most specialized argument value
        available.

        :param args: The arguments to search.
        :param base_name: The base name. For example ``foo`` for ``--prefix-foo``.
        :param default_value: The value to use if the argument could not be found.
        :return: The argument value, or ``default_value`` if the argument is missing or ``None``.
        """
        prefix = cls.get_argument_prefix().replace('-', '_')
        if prefix:
            prefix += '_'
        full_key = '{}{}'.format(prefix, base_name.replace('-', '_'))
        # An explicit default keeps this safe for namespace types that raise on unknown attributes.
        result = getattr(args, full_key, None)
        return default_value if result is None else result

    @classmethod
    def get_arg_covariant_or_fail(cls, args: nanaimo.Namespace, base_name: str) -> typing.Any:
        """
        Calls :meth:`get_arg_covariant` but raises :class:`ValueError` if the result is `None`.

        :param args: The arguments to search.
        :param base_name: The base name of the argument (without the fixture's prefix).
        :return: The (non-``None``) argument value.
        :raises ValueError: if no value could be found for the argument.
        """
        optional_result = cls.get_arg_covariant(args, base_name)
        if optional_result is None:
            raise ValueError('{base_name} argument not provided (--[argument prefix]-{base_name})'
                             .format(base_name=base_name))
        return optional_result

    def __init__(self,
                 manager: 'FixtureManager',
                 args: typing.Optional[nanaimo.Namespace] = None,
                 **kwargs: typing.Any):
        self._manager = manager
        self._args = (args if args is not None else nanaimo.Namespace())
        self._name = self.get_canonical_name()
        self._logger = logging.getLogger(self._name)
        if 'loop' in kwargs:
            raise ValueError('Do not pass the loop into the fixture. '
                             'Fixtures obtain the loop from their FixtureManager.')
        # None (the default) disables the gather timeout.
        self._gather_timeout_seconds = typing.cast(typing.Optional[float],
                                                   kwargs.get('gather_timeout_seconds'))

    def gather_until_complete(self, *args: typing.Any, **kwargs: typing.Any) -> nanaimo.Artifacts:
        """
        helper function where this:

        .. invisible-code-block: python

            import nanaimo
            import nanaimo.fixtures
            import asyncio

            _doc_loop = asyncio.new_event_loop()

            class MyFixture(nanaimo.fixtures.Fixture):

                @classmethod
                def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
                    pass

                async def on_gather(self, args: nanaimo.Namespace) -> nanaimo.Artifacts:
                    artifacts = nanaimo.Artifacts(0)
                    return artifacts

            foo = MyFixture(nanaimo.fixtures.FixtureManager(_doc_loop), nanaimo.Namespace())

        .. code-block:: python

            foo.gather_until_complete()

        is equivalent to this:

        .. code-block:: python

            foo.loop.run_until_complete(foo.gather())

        """
        return self.loop.run_until_complete(self.gather(*args, **kwargs))

    async def gather(self, *args: typing.Any, **kwargs: typing.Any) -> nanaimo.Artifacts:
        """
        Coroutine awaited to gather a new set of fixture artifacts.

        :param kwargs: Optional arguments to override or augment the arguments provided to the
            :class:`nanaimo.fixtures.Fixture` constructor
        :return: A set of artifacts with the :attr:`nanaimo.Artifacts.result_code` set to indicate the success or
            failure of the fixture's artifact gathering activities.
        :raises asyncio.TimeoutError: If :data:`gather_timeout_seconds` is set and :meth:`on_gather` takes longer
            than this to complete or if on_gather itself raises a timeout error.
        """
        pushed_args = self._args
        self._args = self._args.merge(**kwargs)
        try:
            routine = self.on_gather(self._args)  # type: typing.Coroutine
            if self._gather_timeout_seconds is None:
                return await routine

            # NOTE: asyncio.wait no longer accepts a ``loop`` argument (removed in Python 3.10). The future is
            # scheduled on the running loop, which is the loop this coroutine is executing on.
            done, pending = await asyncio.wait([asyncio.ensure_future(routine)],
                                               timeout=self._gather_timeout_seconds,
                                               return_when=asyncio.ALL_COMPLETED)
            if pending:
                pending.pop().cancel()
                raise asyncio.TimeoutError('{} gather was cancelled after waiting for {} seconds'
                                           .format(self.get_canonical_name(),
                                                   self._gather_timeout_seconds))
            return done.pop().result()
        finally:
            # Per-call overrides never outlive the call.
            self._args = pushed_args

    # +-----------------------------------------------------------------------+
    # | PROPERTIES
    # +-----------------------------------------------------------------------+
    @property
    def name(self) -> str:
        """
        The canonical name for the Fixture.
        """
        return self._name

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """
        The asyncio EventLoop in use by this Fixture. This is always the loop of the
        :class:`FixtureManager` that owns this fixture (see :attr:`FixtureManager.loop`).
        """
        return self.manager.loop

    @property
    def manager(self) -> 'FixtureManager':
        """
        The :class:`FixtureManager` that owns this :class:`nanaimo.fixtures.Fixture`.
        """
        return self._manager

    @property
    def logger(self) -> logging.Logger:
        """
        A logger for this :class:`nanaimo.fixtures.Fixture` instance.
        """
        return self._logger

    @property
    def fixture_arguments(self) -> nanaimo.Namespace:
        """
        The Fixture-wide arguments. Can be overridden by kwargs for each :meth:`gather` invocation.
        """
        return self._args

    @property
    def gather_timeout_seconds(self) -> typing.Optional[float]:
        """
        The timeout in fractional seconds to wait for :meth:`on_gather` to complete before raising
        a :class:`asyncio.TimeoutError`. ``None`` disables the timeout.
        """
        return self._gather_timeout_seconds

    @gather_timeout_seconds.setter
    def gather_timeout_seconds(self, gather_timeout_seconds: typing.Optional[float]) -> None:
        self._gather_timeout_seconds = gather_timeout_seconds

    @classmethod
    def visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
        """
        Visit this fixture's :meth:`on_visit_test_arguments` but with the
        proper :data:`nanaimo.Arguments.required_prefix` set.

        :param arguments: The arguments object to register this fixture's arguments with.
        """
        previous_required_prefix = arguments.required_prefix
        arguments.required_prefix = cls.get_argument_prefix().replace('_', '-')
        try:
            cls.on_visit_test_arguments(arguments)
        finally:
            # Restore the caller's prefix even if the hook raises so later fixtures are not mis-prefixed.
            arguments.required_prefix = previous_required_prefix

    # +-----------------------------------------------------------------------+
    # | ABSTRACT METHODS
    # +-----------------------------------------------------------------------+
    @classmethod
    @abc.abstractmethod
    def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
        """
        Called by the environment before instantiating any :class:`nanaimo.fixtures.Fixture` instances to register
        arguments supported by each type. These arguments should be portable between both :mod:`argparse`
        and ``pytest``. The fixture is registered for this callback by returning a reference to its
        type from a ``pytest_nanaimo_fixture_type`` hook in your fixture's pytest plugin module.
        """
        ...

    @abc.abstractmethod
    async def on_gather(self, args: nanaimo.Namespace) -> nanaimo.Artifacts:
        """
        Coroutine awaited by a call to :meth:`gather`. The fixture should always retrieve new artifacts when invoked
        leaving caching to the caller.

        :param args: The arguments provided for the fixture instance merged with kwargs provided to the :meth:`gather`
            method.
        :type args: nanaimo.Namespace
        :return: A set of artifacts with the :attr:`nanaimo.Artifacts.result_code` set to indicate the success or
            failure of the fixture's artifact gathering activities.
        :raises asyncio.TimeoutError: It is valid for a fixture to raise timeout errors from this method.
        """
        ...

    # +-----------------------------------------------------------------------+
    # | Optional Hooks
    # +-----------------------------------------------------------------------+
    def on_test_teardown(self, test_name: str) -> None:
        """
        Invoked for each fixture after it was used in a test as part of the pytest
        :func:`_pytest.hookspec.pytest_runtest_teardown` protocol.

        :param test_name: The name of the test the fixture was used with.
        """
        pass

    # +-----------------------------------------------------------------------+
    # | ASYNC HELPERS
    # +-----------------------------------------------------------------------+
    async def countdown_sleep(self, sleep_time_seconds: float) -> None:
        """
        Emits a :meth:`logging.Logger.info` of the time remaining until `sleep_time_seconds` has elapsed
        then calls :func:`asyncio.sleep` for (at most) 1 second, repeating until the time is used up.
        This is useful for long waits as an indication that the process is not deadlocked.

        :param sleep_time_seconds: The amount of time in seconds for this coroutine to wait
            before exiting. For each second that passes while waiting for this amount of time
            the coroutine will :func:`asyncio.sleep` for 1 second (less for a trailing fraction).
        :type sleep_time_seconds: float
        """
        count_down = sleep_time_seconds
        # Strictly "> 0": a ">= 0" test sleeps one extra second (and a full second for a zero wait).
        while count_down > 0:
            self._logger.info('%d', math.ceil(count_down))
            step = min(1.0, count_down)
            await asyncio.sleep(step)
            count_down -= step

    async def observe_tasks_assert_not_done(self,
                                            observer_co_or_f: typing.Union[typing.Coroutine, asyncio.Future],
                                            timeout_seconds: typing.Optional[float],
                                            *persistent_tasks: typing.Union[typing.Coroutine, asyncio.Future]) \
            -> typing.Set[asyncio.Future]:
        """
        Allows running a set of tasks but returning when an observer task completes. This allows a pattern where
        a single task is evaluating the side-effects of other tasks as a gate to continuing the test.

        :param observer_co_or_f: The task that is expected to complete in less than ``timeout_seconds``.
        :type observer_co_or_f: typing.Union[typing.Coroutine, asyncio.Future]
        :param float timeout_seconds: Time in seconds to observe for before raising :class:`asyncio.TimeoutError`.
            Set to None to disable.
        :param persistent_tasks: Iterable of tasks that must remain active or :class:`AssertionError` will be raised.
        :type persistent_tasks: typing.Union[typing.Coroutine, asyncio.Future]
        :return: a set of the persistent tasks, still pending, as futures.
        :rtype: typing.Set[asyncio.Future]
        :raises AssertionError: if any of the persistent tasks exited.
        :raises asyncio.TimeoutError: if the observer task does not complete within ``timeout_seconds``.
        """
        _, _, done, pending = await self._observe_tasks(observer_co_or_f, timeout_seconds, False, *persistent_tasks)
        # The observer itself is always in ``done``; anything more is a persistent task that exited.
        if len(done) > 1:
            raise nanaimo.AssertionError('Tasks under observation completed before the observation was complete.')
        return pending

    async def observe_tasks(self,
                            observer_co_or_f: typing.Union[typing.Coroutine, asyncio.Future],
                            timeout_seconds: typing.Optional[float],
                            *persistent_tasks: typing.Union[typing.Coroutine, asyncio.Future]) \
            -> typing.Set[asyncio.Future]:
        """
        Allows running a set of tasks but returning when an observer task completes. This allows a pattern where
        a single task is evaluating the side-effects of other tasks as a gate to continuing the test or simply
        that a set of tasks should continue to run but a single task must complete.

        Unlike :meth:`observe_tasks_assert_not_done` it is not an error for a persistent task to finish early.

        :param observer_co_or_f: The task that is expected to complete in less than ``timeout_seconds``.
        :type observer_co_or_f: typing.Union[typing.Coroutine, asyncio.Future]
        :param float timeout_seconds: Time in seconds to observe for before raising :class:`asyncio.TimeoutError`.
            Set to None to disable.
        :param persistent_tasks: Iterable of tasks that may remain active.
        :type persistent_tasks: typing.Union[typing.Coroutine, asyncio.Future]
        :return: a set of the persistent tasks that are still pending as futures.
        :rtype: typing.Set[asyncio.Future]
        :raises asyncio.TimeoutError: if the observer task does not complete within ``timeout_seconds``.
        """
        _, _, _, pending = await self._observe_tasks(observer_co_or_f, timeout_seconds, False, *persistent_tasks)
        return pending

    async def gate_tasks(self,
                         gate_co_or_f: typing.Union[typing.Coroutine, asyncio.Future],
                         timeout_seconds: typing.Optional[float],
                         *gated_tasks: typing.Union[typing.Coroutine, asyncio.Future]) \
            -> typing.Tuple[asyncio.Future, typing.List[asyncio.Future]]:
        """
        Runs a set of tasks until a gate task completes then cancels the remaining tasks.

        .. invisible-code-block: python

            from nanaimo.fixtures import FixtureManager
            from nanaimo.builtin import nanaimo_bar
            import asyncio

            loop = asyncio.get_event_loop()
            manager = FixtureManager(loop=loop)

        .. code-block:: python

            async def gated_task():
                while True:
                    await asyncio.sleep(.1)

            async def gate_task():
                await asyncio.sleep(1)
                return 'gate passed'

            async def example():

                any_fixture = nanaimo_bar.Fixture(manager)

                gate_future, gated_futures = await any_fixture.gate_tasks(gate_task(), None, gated_task())

                assert not gate_future.cancelled()
                assert 'gate passed' == gate_future.result()
                assert len(gated_futures) == 1
                assert gated_futures[0].cancelled()

        .. invisible-code-block: python

            loop.run_until_complete(example())

        :param gate_co_or_f: The task that is expected to complete in less than ``timeout_seconds``.
        :type gate_co_or_f: typing.Union[typing.Coroutine, asyncio.Future]
        :param float timeout_seconds: Time in seconds to wait for the gate for before raising
            :class:`asyncio.TimeoutError`. Set to None to disable.
        :param gated_tasks: Tasks that run until the gate completes and are then cancelled.
        :type gated_tasks: typing.Union[typing.Coroutine, asyncio.Future]
        :return: a tuple of the gate future and a list of the gated futures.
        :rtype: typing.Tuple[asyncio.Future, typing.List[asyncio.Future]]
        :raises asyncio.TimeoutError: if the gate task does not complete within ``timeout_seconds``.
        """
        observer, observed, _, _ = await self._observe_tasks(gate_co_or_f, timeout_seconds, True, *gated_tasks)
        return observer, observed

    # +-----------------------------------------------------------------------+
    # | PRIVATE
    # +-----------------------------------------------------------------------+
    @classmethod
    async def _do_cancel_remaining(cls, pending: typing.Set[asyncio.Future]) -> None:
        """
        Cancel each future in ``pending`` and await it so the cancellation is fully processed.
        """
        for f in pending:
            f.cancel()
            try:
                await f
            except asyncio.CancelledError:
                # Expected: this is the cancellation we just requested.
                pass

    async def _observe_tasks(self,
                             observer_co_or_f: typing.Union[typing.Coroutine, asyncio.Future],
                             timeout_seconds: typing.Optional[float],
                             cancel_remaining: bool,
                             *args: typing.Union[typing.Coroutine, asyncio.Future]) -> \
            typing.Tuple[asyncio.Future, typing.List[asyncio.Future],
                         typing.Set[asyncio.Future], typing.Set[asyncio.Future]]:
        """
        Run the observer and all observed tasks until the observer completes or the timeout elapses.

        :param observer_co_or_f: The task whose completion ends the observation.
        :param timeout_seconds: Total time budget for the observation or None for no limit.
        :param cancel_remaining: If True all still-pending futures are cancelled (and reported as done)
            before returning or raising.
        :param args: The observed tasks.
        :returns: observer, observed, done, pending
        :raises asyncio.TimeoutError: if the observer did not complete within ``timeout_seconds``.
        """
        did_timeout = False
        observer_future = asyncio.ensure_future(observer_co_or_f)
        observed_futures = [asyncio.ensure_future(co_or_f) for co_or_f in args]
        the_children_are_our_futures = [observer_future] + observed_futures
        done_done = set()  # type: typing.Set[asyncio.Future]

        # Track one deadline for the whole observation. Passing the full timeout to every wait() call would
        # restart the clock each time an observed task finished, letting the total wait exceed the budget.
        deadline = None if timeout_seconds is None else self.loop.time() + timeout_seconds

        while True:
            remaining = None if deadline is None else max(0.0, deadline - self.loop.time())
            done, pending = await asyncio.wait(
                the_children_are_our_futures,
                timeout=remaining,
                return_when=asyncio.FIRST_COMPLETED)  # type: typing.Set[asyncio.Future], typing.Set[asyncio.Future]
            for d in done:
                the_children_are_our_futures.remove(d)
                done_done.add(d)
            if observer_future.done():
                break
            if deadline is not None and self.loop.time() >= deadline:
                did_timeout = True
                break

        if cancel_remaining:
            await self._do_cancel_remaining(pending)
            done_done.update(pending)
            pending.clear()

        if did_timeout:
            raise asyncio.TimeoutError()
        return observer_future, observed_futures, done_done, pending


# +---------------------------------------------------------------------------+


class SubprocessFixture(Fixture):
    """
    Fixture base type that accepts a string argument ``cmd`` and executes it as a subprocess.
    Because some subprocess commands might result in huge amounts of data being sent to stdout
    and/or stderr this class does not capture this data by default. Instead, tests are encouraged
    to either filter the subprocess pipes or use the ``--logfile`` argument to write the output
    to a file in persistent storage.

    Filtering is accomplished using the :data:`stdout_filter <nanaimo.fixtures.SubprocessFixture.stdout_filter>`
    or :data:`stderr_filter <nanaimo.fixtures.SubprocessFixture.stderr_filter>` property of this class.
    For example:

    .. invisible-code-block: python

        import nanaimo.version
        import asyncio

        loop = asyncio.get_event_loop()
        manager = nanaimo.fixtures.FixtureManager(loop=loop)

    .. code-block:: python

        class MySubprocessFixture(nanaimo.fixtures.SubprocessFixture):
            '''
            Subprocess test fixture that simply calls "nait --version"
            '''

            @classmethod
            def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
                pass

            def on_construct_command(self, arguments: nanaimo.Namespace, inout_artifacts: nanaimo.Artifacts) -> str:
                return 'nait --version'


        async def example(manager):

            subject = MySubprocessFixture(manager)

            # The accumulator does capture all stdout. Only use this if you know
            # the subprocess will produce a manageable and bounded amount of output.
            filter = nanaimo.fixtures.SubprocessFixture.SubprocessMessageAccumulator()
            subject.stdout_filter = filter

            artifacts = await subject.gather()

            # In our example the subprocess produces only and exactly the Nanaimo
            # version number
            assert filter.getvalue() == nanaimo.version.__version__

    .. invisible-code-block: python

        loop.run_until_complete(example(manager))

    :param stdout_filter: A :class:`logging.Filter` used when gathering the subprocess.
    :param stderr_filter: A :class:`logging.Filter` used when gathering the subprocess.
    """

    class SubprocessMessageAccumulator(logging.Filter, io.StringIO):
        """
        Helper class for working with :meth:`SubprocessFixture.stdout_filter` or
        :meth:`SubprocessFixture.stderr_filter`. This implementation will simply
        write all log messages (i.e. :meth:`logging.LogRecord.getMessage()`) to
        its internal buffer. Use :meth:`getvalue()` to get a reference to the
        buffer.

        You can also subclass this class and override its :meth:`logging.Filter.filter`
        method to customize your filter criteria.

        :param minimum_level: The minimum loglevel to accumulate messages for.
        """

        def __init__(self, minimum_level: int = logging.INFO):
            logging.Filter.__init__(self)
            io.StringIO.__init__(self)
            self._minimum_level = minimum_level

        def filter(self, record: logging.LogRecord) -> bool:
            if record.levelno >= self._minimum_level:
                self.write(record.getMessage())
            # Never suppress the record; this filter only observes.
            return True

    class SubprocessMessageMatcher(logging.Filter):
        """
        Helper class for working with :meth:`SubprocessFixture.stdout_filter` or
        :meth:`SubprocessFixture.stderr_filter`. This implementation will watch every
        log message and store any that match the provided pattern.

        This matcher does not buffer all logged messages.

        :param pattern: A regular expression to match messages on.
        :param minimum_level: The minimum loglevel to accumulate messages for.
        """

        def __init__(self, pattern: typing.Any, minimum_level: int = logging.INFO):
            logging.Filter.__init__(self)
            self._pattern = pattern
            self._minimum_level = minimum_level
            self._matches = []  # type: typing.List

        @property
        def match_count(self) -> int:
            """
            The number of messages that matched the provided pattern.
            """
            return len(self._matches)

        @property
        def matches(self) -> typing.List:
            """
            The match objects collected so far, in the order the messages were logged.
            """
            return self._matches

        def filter(self, record: logging.LogRecord) -> bool:
            if record.levelno >= self._minimum_level:
                match = self._pattern.match(record.getMessage())
                if match is not None:
                    self._matches.append(match)
            # Never suppress the record; this filter only observes.
            return True

    def __init__(self,
                 manager: 'FixtureManager',
                 args: typing.Optional[nanaimo.Namespace] = None,
                 **kwargs: typing.Any):
        super().__init__(manager, args, **kwargs)
        self._stdout_filter = kwargs.get('stdout_filter')  # type: typing.Optional[logging.Filter]
        self._stderr_filter = kwargs.get('stderr_filter')  # type: typing.Optional[logging.Filter]

    @property
    def stdout_filter(self) -> typing.Optional[logging.Filter]:
        """
        A filter used when logging the stdout stream with the subprocess.
        """
        return self._stdout_filter

    @stdout_filter.setter
    def stdout_filter(self, filter: typing.Optional[logging.Filter]) -> None:
        self._stdout_filter = filter

    @property
    def stderr_filter(self) -> typing.Optional[logging.Filter]:
        """
        A filter used when logging the stderr stream with the subprocess.
        """
        return self._stderr_filter

    @stderr_filter.setter
    def stderr_filter(self, filter: typing.Optional[logging.Filter]) -> None:
        self._stderr_filter = filter

    @classmethod
    def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None:
        arguments.add_argument('--cwd', help='The working directory to launch the subprocess at.')
        arguments.add_argument('--logfile', help='Path to a file to write stdout, stderr, and test logs to.')
        arguments.add_argument('--logfile-amend',
                               default=False,
                               help=textwrap.dedent('''
                If True then the logfile will be amended otherwise it will be overwritten
                (the default)
            ''').strip())
        arguments.add_argument('--logfile-format',
                               default='%(asctime)s %(levelname)s %(name)s: %(message)s',
                               help='Logger format to use for the logfile.')
        arguments.add_argument('--logfile-date-format',
                               default='%Y-%m-%d %H:%M:%S',
                               help='Logger date format to use for the logfile.')

    async def on_gather(self, args: nanaimo.Namespace) -> nanaimo.Artifacts:
        """
        Runs the command returned by :meth:`on_construct_command` in a subprocess shell, logging each line
        of its stdout (at info level) and stderr (at error level) until the process exits.

        **Artifacts**

        * ``cmd`` (str) -- The command used to execute the subprocess.
        * ``logfile`` (Optional[pathlib.Path]) -- A file containing stdout, stderr, and test logs.

        :param args: The arguments for this gather invocation.
        :return: Artifacts whose ``result_code`` is the exit code of the subprocess.
        """
        artifacts = nanaimo.Artifacts()

        cmd = self.on_construct_command(args, artifacts)
        setattr(artifacts, 'cmd', cmd)

        logfile_handler = None  # type: typing.Optional[logging.FileHandler]

        logfile = self.get_arg_covariant(args, 'logfile')
        logfile_amend = bool(self.get_arg_covariant(args, 'logfile-amend'))
        logfile_fmt = self.get_arg_covariant(args, 'logfile-format')
        logfile_datefmt = self.get_arg_covariant(args, 'logfile-date-format')

        cwd = self.get_arg_covariant(args, 'cwd')

        if logfile is not None:
            setattr(artifacts, 'logfile', logfile)
            logfile_handler = logging.FileHandler(filename=str(logfile), mode=('a' if logfile_amend else 'w'))
            file_formatter = logging.Formatter(fmt=logfile_fmt, datefmt=logfile_datefmt)
            logfile_handler.setFormatter(file_formatter)
            self._logger.addHandler(logfile_handler)

        # Snapshot the filters so the ones removed in ``finally`` are exactly the ones added here even if
        # the properties are reassigned while the subprocess is running.
        stdout_filter = self._stdout_filter
        stderr_filter = self._stderr_filter

        if stdout_filter is not None:
            self._logger.addFilter(stdout_filter)
        if stderr_filter is not None:
            self._logger.addFilter(stderr_filter)

        try:
            self._logger.debug('About to execute command "%s" in a subprocess shell', cmd)

            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd
            )  # type: asyncio.subprocess.Process

            # Simply let the background process do its thing and wait for it to finish.
            await self._wait_for_either_until_neither(
                (proc.stdout if proc.stdout is not None else self._NoopStreamReader()),
                (proc.stderr if proc.stderr is not None else self._NoopStreamReader()))

            await proc.wait()

            self._logger.debug('command "%s" exited with %i', cmd, proc.returncode)

            artifacts.result_code = proc.returncode
            return artifacts
        finally:
            if stderr_filter is not None:
                self._logger.removeFilter(stderr_filter)
            if stdout_filter is not None:
                self._logger.removeFilter(stdout_filter)
            if logfile_handler is not None:
                self._logger.removeHandler(logfile_handler)
                # removeHandler does not close the handler; close it to release the logfile's file handle.
                logfile_handler.close()

    # +-----------------------------------------------------------------------+
    # | ABSTRACT METHODS
    # +-----------------------------------------------------------------------+
    @abc.abstractmethod
    def on_construct_command(self, arguments: nanaimo.Namespace, inout_artifacts: nanaimo.Artifacts) -> str:
        """
        Called by the subprocess fixture to ask the specialization to form a command
        given a set of arguments.

        :param arguments: The arguments passed into :meth:`Fixture.on_gather`.
        :type arguments: nanaimo.Arguments
        :param inout_artifacts: A set of artifacts the superclass is assembling. This
            is provided to the subclass to allow it to optionally contribute artifacts.
        :type inout_artifacts: nanaimo.Artifacts
        :return: The command to run in a subprocess shell.
        """
        ...

    # +-----------------------------------------------------------------------+
    # | PRIVATE METHODS
    # +-----------------------------------------------------------------------+
    class _NoopStreamReader(asyncio.StreamReader):
        """
        A stream reader that is already at EOF; stands in for a pipe the subprocess did not provide.
        """

        def __init__(self) -> None:
            super().__init__()
            self.feed_eof()

    async def _wait_for_either_until_neither(self,
                                             stdout: asyncio.StreamReader,
                                             stderr: asyncio.StreamReader) -> None:
        """
        Wait for a line of data from either stdout or stderr and log this data as received.
        When both are EOF then exit.

        :param stdout: The subprocess' stdout stream; lines are logged at info level.
        :param stderr: The subprocess' stderr stream; lines are logged at error level.
        """
        future_out = asyncio.ensure_future(stdout.readline())
        future_err = asyncio.ensure_future(stderr.readline())

        pending = {future_out, future_err}  # type: typing.Set[asyncio.Future]

        while len(pending) > 0:

            done, pending = await asyncio.wait(pending)

            for future_done in done:
                result = future_done.result()
                if len(result) > 0:
                    line = result.decode(errors='replace')
                    if future_done == future_err:
                        future_err = asyncio.ensure_future(stderr.readline())
                        pending.add(future_err)
                        self._logger.error(self._ensure_no_newline_at_end(line))
                    else:
                        future_out = asyncio.ensure_future(stdout.readline())
                        pending.add(future_out)
                        self._logger.info(self._ensure_no_newline_at_end(line))
                elif future_done == future_err and not stderr.at_eof():
                    # spurious awake?
                    future_err = asyncio.ensure_future(stderr.readline())
                    pending.add(future_err)
                elif future_done == future_out and not stdout.at_eof():
                    # spurious awake?
                    future_out = asyncio.ensure_future(stdout.readline())
                    pending.add(future_out)
                # Otherwise the stream returned an empty read at EOF and is simply not re-armed.

    @staticmethod
    def _ensure_no_newline_at_end(text: str) -> str:
        """
        Strip a single trailing line ending (``\\n``, ``\\r`` or ``\\r\\n``) from ``text`` if present.
        """
        if text.endswith('\n'):
            text = text[:-1]
        if text.endswith('\r'):
            text = text[:-1]
        return text


# +---------------------------------------------------------------------------+


class FixtureManager:
    """
    A simple fixture manager and a baseclass for specialized managers.

    :param loop: The event loop to provide to fixtures. If omitted (or closed) a loop is obtained
        from :func:`asyncio.get_event_loop` the first time :attr:`loop` is read.
    """

    def __init__(self, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None:
        self._loop = loop

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """
        The asyncio EventLoop in use by all Fixtures.
        This will be the loop provided to the fixture manager in the constructor if that loop has not been
        closed otherwise the loop will be one retrieved by :func:`asyncio.get_event_loop`.

        :raises RuntimeError: if :func:`asyncio.get_event_loop` could not provide an event loop.
        """
        if self._loop is None or self._loop.is_closed():
            self._loop = asyncio.get_event_loop()
        return self._loop

    def create_fixture(self,
                       canonical_name: str,
                       args: typing.Optional[nanaimo.Namespace] = None,
                       loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> Fixture:
        """
        Create a new :class:`nanaimo.fixtures.Fixture` instance iff the ``canonical_name`` is a registered
        plugin for this process.

        :param str canonical_name: The canonical name of the fixture to instantiate.
        :param nanaimo.Namespace args: The arguments to provide to the new instance.
        :param loop: An event loop to provide the fixture instance.
        :type loop: typing.Optional[asyncio.AbstractEventLoop]
        :raises KeyError: if ``canonical_name`` was not registered with this manager.
        :raises NotImplementedError: always, in this base class; specialized managers provide the implementation.
        """
        raise NotImplementedError('The base class is an incomplete implementation.')


class PluggyFixtureManager:
    """
    DEPRECATED. Do not use.
    """

    @staticmethod
    def type_factory(type_getter: typing.Callable[[], typing.Type['nanaimo.fixtures.Fixture']]) \
            -> typing.Callable[[], typing.Type['nanaimo.fixtures.Fixture']]:
        raise DeprecationWarning('DEPRECATED: do not use the PluggyFixtureManager.type_factory annotation anymore. '
                                 'Remove this annotation from your get_fixture_type method and change the name of '
                                 'this hook to "pytest_nanaimo_fixture_type".')

    def __init__(self):
        # NOTE(review): the remainder of this message (and anything following this method) is cut off in the
        # visible source, which ends with a literal "..." marker; restore the full text from upstream.
        raise DeprecationWarning('PluggyFixtureManager has been removed. See '
                                 '...')
test_fixtures.py
Source:test_fixtures.py
...7 import pytest_asyncio8 import pytest_trio9 pytest_plugins = ["pytest_asyncio", "pytest_trio"]10 @pytest_trio.trio_fixture11 async def any_fixture():12 return True13 @pytest.mark.trio14 async def test_anything(any_fixture):15 pass16 """17 )18 )19 result = testdir.runpytest("--asyncio-mode=strict")...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!