Best Python code snippet using playwright-python
PipelineProcess.py
Source:PipelineProcess.py
...20class TestConstruct(TmpDirMixin):21 def runTest(self):22 cwd = pathlib.Path(self.tmpdir.name).resolve()23 args = [which('touch'), "tmp.txt"]24 event_loop = asyncio.new_event_loop()25 with NullStream() as null_stream:26 process = PipelineProcess(27 event_loop,28 cwd = cwd,29 env = {},30 args = args,31 stdin_stream = null_stream,32 stdout_stream = null_stream,33 stderr_stream = null_stream34 )35 self.assertEqual(process.cwd, cwd)36 self.assertEqual(process.args, args)37 str(process)38 repr(process)39@export40@register()41class TestCreateAndWaitAsync(TmpDirMixin):42 def runTest(self):43 cwd = pathlib.Path(self.tmpdir.name)44 test_file = cwd / 'tmp.txt'45 args = [which('touch'), test_file]46 event_loop = asyncio.new_event_loop()47 async def run_and_wait():48 with NullStream() as null_stream:49 process = await PipelineProcess.create(50 event_loop,51 cwd = cwd.resolve(),52 env = {},53 args = args,54 stdin_stream = null_stream,55 stdout_stream = null_stream,56 stderr_stream = null_stream57 )58 await process.wait_async()59 return process60 process = event_loop.run_until_complete(run_and_wait())61 self.assertEqual(process.proc.returncode, 0)62 self.assertTrue(test_file.exists())63@export64@register()65class TestCreatePoll(TmpDirMixin):66 def runTest(self):67 cwd = pathlib.Path(self.tmpdir.name)68 test_file = cwd / 'tmp.txt'69 args = [which('touch'), test_file]70 event_loop = asyncio.new_event_loop()71 async def run_and_wait(event_loop):72 with NullStream() as null_stream:73 process = await PipelineProcess.create(74 event_loop,75 cwd = cwd.resolve(),76 env = {},77 args = args,78 stdin_stream = null_stream,79 stdout_stream = null_stream,80 stderr_stream = null_stream81 )82 time.sleep(0.1)83 return process84 process = event_loop.run_until_complete(run_and_wait(event_loop))85 86 poll_rc = process.poll()87 poll2_rc = process.poll()88 self.assertEqual(process.proc.returncode, 0)89 self.assertEqual(poll_rc, [process.proc.returncode])90 self.assertEqual(poll2_rc, 
[process.proc.returncode])91 self.assertTrue(test_file.exists())92@export93@register()94class TestCreateTerminatePoll(TmpDirMixin):95 def runTest(self):96 cwd = pathlib.Path(self.tmpdir.name)97 args = [which('sleep'), "1"]98 event_loop = asyncio.new_event_loop()99 async def run_and_wait(event_loop):100 with NullStream() as null_stream:101 process = await PipelineProcess.create(102 event_loop,103 cwd = cwd.resolve(),104 env = {},105 args = args,106 stdin_stream = null_stream,107 stdout_stream = null_stream,108 stderr_stream = null_stream109 )110 return process111 process = event_loop.run_until_complete(run_and_wait(event_loop))112 113 process.terminate()114 poll_rc = process.poll(0.1)115 self.assertEqual(process.proc.returncode, -signal.SIGTERM)116 self.assertEqual(poll_rc, [-signal.SIGTERM])117 self.assertEqual(process.wait(), [-signal.SIGTERM])118@export119@register()120class TestCreateKillPoll(TmpDirMixin):121 def runTest(self):122 cwd = pathlib.Path(self.tmpdir.name)123 args = [which('sleep'), "1"]124 event_loop = asyncio.new_event_loop()125 async def run_and_wait(event_loop):126 with NullStream() as null_stream:127 process = await PipelineProcess.create(128 event_loop,129 cwd = cwd.resolve(),130 env = {},131 args = args,132 stdin_stream = null_stream,133 stdout_stream = null_stream,134 stderr_stream = null_stream135 )136 return process137 process = event_loop.run_until_complete(run_and_wait(event_loop))138 139 process.kill()140 poll_rc = process.poll(0.1)141 self.assertEqual(process.proc.returncode, -signal.SIGKILL)142 self.assertEqual(poll_rc, [-signal.SIGKILL])143 self.assertEqual(process.wait(), [-signal.SIGKILL])144@export145@register()146class TestCreatePollFail(TmpDirMixin):147 def runTest(self):148 cwd = pathlib.Path(self.tmpdir.name)149 args = [which('sleep'), "1"]150 event_loop = asyncio.new_event_loop()151 async def run_and_wait(event_loop):152 with NullStream() as null_stream:153 process = await PipelineProcess.create(154 event_loop,155 cwd = 
cwd.resolve(),156 env = {},157 args = args,158 stdin_stream = null_stream,159 stdout_stream = null_stream,160 stderr_stream = null_stream161 )162 return process163 process = event_loop.run_until_complete(run_and_wait(event_loop))164 165 poll_rc = process.poll()166 self.assertEqual(process.proc.returncode, None)167 self.assertEqual(poll_rc, [None])168 self.assertEqual(process.wait(), [0])169@export170@register()171class TestCreateWithDifferentUser(TmpDirMixin):172 def setUp(self):173 super().setUp()174 if ((sys.version_info.major, sys.version_info.minor) < (3, 9)):175 raise unittest.SkipTest("Python version is less than 3.9")176 if os.getuid() != 0:177 raise unittest.SkipTest("Not running as root")178 def unless_key_error(fun):179 try:180 return fun()181 except KeyError:182 return None183 self.uid = unless_key_error(lambda: pwd.getpwnam('nobody').pw_uid)184 if self.uid is None:185 raise unittest.SkipTest("No user exists with name 'nobody'")186 def runTest(self):187 cwd = pathlib.Path(self.tmpdir.name)188 args = [which('id'), '-u']189 event_loop = asyncio.new_event_loop()190 async def run_and_wait(event_loop):191 with NullStream() as null_stream, PipeStream(None) as stdout_stream:192 process = await PipelineProcess.create(193 event_loop,194 cwd = cwd.resolve(),195 env = {},196 args = args,197 stdin_stream = null_stream,198 stdout_stream = stdout_stream,199 stderr_stream = null_stream,200 user = 'nobody'201 )202 stdout_stream.close_writer()203 await process.wait_async()204 return stdout_stream.reader().read()205 observed_uid = event_loop.run_until_complete(run_and_wait(event_loop))206 self.assertEqual(observed_uid.strip(), str(self.uid))207@export208@register()209class TestCreateWithDifferentGroup(TmpDirMixin):210 def setUp(self):211 super().setUp()212 if ((sys.version_info.major, sys.version_info.minor) < (3, 9)):213 raise unittest.SkipTest("Python version is less than 3.9")214 if os.getuid() != 0:215 raise unittest.SkipTest("Not running as root")216 def 
unless_key_error(fun):217 try:218 return fun()219 except KeyError:220 return None221 self.gid = unless_key_error(lambda: pwd.getpwnam('nobody').pw_gid)222 if self.gid is None:223 raise unittest.SkipTest("No group exists with name 'nobody'")224 def runTest(self):225 cwd = pathlib.Path(self.tmpdir.name)226 args = [which('id'), '-g']227 event_loop = asyncio.new_event_loop()228 async def run_and_wait(event_loop):229 with NullStream() as null_stream, PipeStream(None) as stdout_stream:230 process = await PipelineProcess.create(231 event_loop,232 cwd = cwd.resolve(),233 env = {},234 args = args,235 stdin_stream = null_stream,236 stdout_stream = stdout_stream,237 stderr_stream = null_stream,238 group = self.gid239 )240 stdout_stream.close_writer()241 await process.wait_async()242 return stdout_stream.reader().read()243 observed_gid = event_loop.run_until_complete(run_and_wait(event_loop))244 self.assertEqual(observed_gid.strip(), str(self.gid))245 246@export247@register()248class TestCreateWait(TmpDirMixin):249 def runTest(self):250 cwd = pathlib.Path(self.tmpdir.name)251 test_file = cwd / 'tmp.txt'252 args = [which('touch'), test_file]253 event_loop = asyncio.new_event_loop()254 async def run_and_wait(event_loop):255 with NullStream() as null_stream:256 process = await PipelineProcess.create(257 event_loop,258 cwd = cwd.resolve(),259 env = {},260 args = args,261 stdin_stream = null_stream,262 stdout_stream = null_stream,263 stderr_stream = null_stream264 )265 return process266 process = event_loop.run_until_complete(run_and_wait(event_loop))267 268 wait_rc = process.wait()269 self.assertEqual(process.proc.returncode, 0)270 self.assertEqual(wait_rc, [process.proc.returncode])271 self.assertTrue(test_file.exists())272@export273@register()274class TestEnvironmentVariableExists(TmpDirMixin):275 def runTest(self):276 cwd = pathlib.Path(self.tmpdir.name)277 from .. 
import test_util278 with importlib.resources.path(test_util.__package__, 'echo_env.py') as echo_env:279 args = [which('python3'), echo_env]280 message = 'Hello World!'281 event_loop = asyncio.new_event_loop()282 async def run_and_wait():283 with Pipe() as pipe:284 with NullStream() as null_stream, ManualStream(fileobj_w=pipe.writer) as stdout_stream:285 process = await PipelineProcess.create(286 event_loop,287 cwd = cwd.resolve(),288 env = {289 'A': 'wrong output',290 'MESSAGE': message,291 'Z': 'wrong output'292 },293 args = [which('python3'), echo_env, "MESSAGE"],294 stdin_stream = null_stream,295 stdout_stream = stdout_stream,...
test_mdns_functional.py
Source:test_mdns_functional.py
"""Functional tests pyatv.support.mdns."""
import asyncio
import logging
import typing
from ipaddress import IPv4Address
from unittest.mock import MagicMock, patch

import pytest

from pyatv.support import mdns, net
from tests import fake_udns, utils

SERVICE_NAME = "Kitchen"
MEDIAREMOTE_SERVICE = "_mediaremotetv._tcp.local"
DEVICE_INFO_SERVICE = "_device-info._tcp._local"

TEST_SERVICES = dict(
    [
        fake_udns.mrp_service(
            SERVICE_NAME, SERVICE_NAME, "mrp_id", address="127.0.0.1", port=1234
        ),
    ]
)


@pytest.fixture(autouse=True)
def mdns_debug():
    """Set the pyatv.support.mdns logger to mdns.TRAFFIC_LEVEL for each test."""
    logger = logging.getLogger("pyatv.support.mdns")
    logger.setLevel(mdns.TRAFFIC_LEVEL)
    yield


@pytest.fixture
async def udns_server(event_loop):
    """Start a FakeUdns serving TEST_SERVICES and close it after the test."""
    server = fake_udns.FakeUdns(event_loop, TEST_SERVICES)
    await server.start()
    yield server
    server.close()


@pytest.fixture(autouse=True)
def stub_local_addresses():
    """Patch pyatv.net.get_private_addresses to return only 127.0.0.1."""
    with patch("pyatv.net.get_private_addresses") as mock:
        mock.return_value = [IPv4Address("127.0.0.1")]
        yield


# Requests are normally not sent to localhost, so we need to fake that localhost
# is not a loopback address
@pytest.fixture(autouse=True)
def stub_ip_address():
    """Patch mdns.ip_address so any address reports is_loopback == False."""
    with patch("pyatv.support.mdns.ip_address") as mock:
        # The mock returns itself, so the attribute set below is what callers
        # of ip_address(...) see.
        mock.return_value = mock
        mock.is_loopback = False
        yield


# Hack-ish fixture to make sure multicast does not listen on any global port,
# i.e. 5353 since data from other places can leak into the test
@pytest.fixture(autouse=True)
def redirect_mcast(udns_server):
    """Route pyatv.net.mcast_socket to port 0 unless it targets the fake server."""
    real_mcast_socket = net.mcast_socket
    with patch("pyatv.net.mcast_socket") as mock:
        mock.side_effect = lambda addr, port=0: real_mcast_socket(
            addr, port if port == udns_server.port else 0
        )
        yield


# This is a very complex fixture that will hook into the receiver of multicast
# responses and abort the "waiting" period whenever all responses or a certain amount
# of requests have been received. Mainly this is for not slowing down tests.
@pytest.fixture
async def multicast_fastexit(event_loop, monkeypatch, udns_server):
    """Yield a callable that updates the expected response/request counts.

    ``typing.cast`` is monkeypatched so that every
    ``MulticastDnsSdClientProtocol`` passed through it gets a polling task
    which releases the protocol's semaphore once the conditions are met.
    All polling tasks are awaited on teardown.
    """
    clients: typing.List[asyncio.Future] = []

    # Interface used to set number of expected responses (1 by default)
    conditions: typing.Dict[str, object] = {"responses": 1, "requests": 0}

    # Checks if either response or request count has been fulfilled
    def _check_cond(protocol: mdns.MulticastDnsSdClientProtocol) -> bool:
        if len(protocol.responses) == conditions["responses"]:
            return False
        if conditions["requests"] == 0:
            return True
        # NOTE(review): the request count is compared against
        # conditions["responses"], not conditions["requests"].  This looks
        # unintended, but teardown awaits the pollers, so confirm the effect
        # on test_multicast_end_condition_met before changing it.
        return udns_server.request_count < conditions["responses"]

    def _cast(typ, val):
        if isinstance(val, mdns.MulticastDnsSdClientProtocol):

            async def _poll_responses():
                await utils.until(lambda: not _check_cond(val))
                val.semaphore.release()

            clients.append(asyncio.ensure_future(_poll_responses()))
        return val

    monkeypatch.setattr(typing, "cast", _cast)
    yield lambda **kwargs: conditions.update(**kwargs)
    await asyncio.gather(*clients)


async def unicast(event_loop, udns_server, service_name, timeout=1):
    """Send a unicast query to the fake server on 127.0.0.1.

    Returns a tuple of the ``mdns.unicast`` result and the matching entry of
    ``TEST_SERVICES`` (``None`` when *service_name* is not a key there).
    """
    return (
        await mdns.unicast(
            event_loop,
            "127.0.0.1",
            [service_name],
            port=udns_server.port,
            timeout=timeout,
        ),
        TEST_SERVICES.get(service_name),
    )


@pytest.mark.asyncio
async def test_unicast_has_valid_service(event_loop, udns_server):
    resp, service = await unicast(event_loop, udns_server, MEDIAREMOTE_SERVICE)
    assert len(resp.services) == 1
    assert resp.services[0].type == MEDIAREMOTE_SERVICE
    assert resp.services[0].name == service.name
    assert resp.services[0].port == service.port


@pytest.mark.asyncio
async def test_unicast_resend_if_no_response(event_loop, udns_server):
    udns_server.skip_count = 2
    resp, service = await unicast(event_loop, udns_server, MEDIAREMOTE_SERVICE, 3)
    assert len(resp.services) == 1
    assert resp.services[0].type == MEDIAREMOTE_SERVICE
    assert resp.services[0].name == service.name
    assert resp.services[0].port == service.port


@pytest.mark.asyncio
async def test_unicast_specific_service(event_loop, udns_server):
    resp, _ = await unicast(
        event_loop, udns_server, SERVICE_NAME + "." + MEDIAREMOTE_SERVICE
    )
    assert len(resp.services) == 1
    service = TEST_SERVICES.get(MEDIAREMOTE_SERVICE)
    assert resp.services[0].type == MEDIAREMOTE_SERVICE
    assert resp.services[0].name == service.name
    assert resp.services[0].port == service.port


@pytest.mark.asyncio
async def test_multicast_no_response(event_loop, udns_server, multicast_fastexit):
    multicast_fastexit(responses=0, requests=0)
    await mdns.multicast(event_loop, [], "127.0.0.1", udns_server.port)


@pytest.mark.asyncio
async def test_multicast_has_valid_service(event_loop, udns_server, multicast_fastexit):
    multicast_fastexit(responses=1, requests=0)
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert len(resp) == 1
    first = resp[IPv4Address("127.0.0.1")].services[0]
    assert first.type == MEDIAREMOTE_SERVICE
    assert first.name == SERVICE_NAME
    assert first.port == 1234


@pytest.mark.asyncio
async def test_multicast_end_condition_met(
    event_loop, udns_server, multicast_fastexit, stub_ip_address
):
    multicast_fastexit(responses=4, requests=10)
    actor = MagicMock()

    def _end_cond(response):
        actor(response)
        return True

    resp = await mdns.multicast(
        event_loop,
        [MEDIAREMOTE_SERVICE],
        "127.0.0.1",
        udns_server.port,
        end_condition=_end_cond,
    )
    assert len(resp) == 1
    actor.assert_called_once_with(resp[IPv4Address("127.0.0.1")])


@pytest.mark.asyncio
async def test_multicast_sleeping_device(event_loop, udns_server, multicast_fastexit):
    multicast_fastexit(responses=0, requests=3)
    udns_server.sleep_proxy = True
    udns_server.services = {
        MEDIAREMOTE_SERVICE: fake_udns.FakeDnsService(
            name=SERVICE_NAME, address=None, port=0, properties={}, model=None
        ),
    }
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert len(resp) == 0

    multicast_fastexit(responses=1, requests=0)
    udns_server.services = TEST_SERVICES
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert len(resp) == 1


@pytest.mark.asyncio
async def test_multicast_deep_sleep(event_loop, udns_server, multicast_fastexit):
    multicast_fastexit(responses=1, requests=0)
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert not resp[IPv4Address("127.0.0.1")].deep_sleep

    udns_server.sleep_proxy = True
    multicast_fastexit(responses=1, requests=0)
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert resp[IPv4Address("127.0.0.1")].deep_sleep


@pytest.mark.asyncio
async def test_multicast_device_model(event_loop, udns_server, multicast_fastexit):
    multicast_fastexit(responses=1, requests=0)
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    assert not resp[IPv4Address("127.0.0.1")].model

    udns_server.services = {
        MEDIAREMOTE_SERVICE: fake_udns.FakeDnsService(
            name=SERVICE_NAME,
            address="127.0.0.1",
            port=1234,
            properties={},
            model="dummy",
        ),
    }
    multicast_fastexit(responses=1, requests=0)
    resp = await mdns.multicast(
        event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port
    )
    # ... (the listing is truncated here; the rest of this test is not visible)
test_aiocontext.py
Source:test_aiocontext.py
...108 def test_len(self, context, context_loop):109 context['key1'] = 'value1'110 context['key2'] = 'value2'111 assert len(context) == 2112 def test_missing_event_loop(self, context):113 with pytest.raises(EventLoopError):114 context.get_data()115 with pytest.raises(EventLoopError):116 context['key']117 with pytest.raises(EventLoopError):118 context['key'] = 'value'119 with pytest.raises(EventLoopError):120 del context['key']121 with pytest.raises(EventLoopError):122 list(context)123 with pytest.raises(EventLoopError):124 len(context)125 @pytest.mark.asyncio126 @asyncio.coroutine...
test_player.py
Source:test_player.py
...16 player.play()17 sound = test_data.get_file('media', 'alert.wav')18 source = pyglet.media.load(sound, streaming=False)19 player.queue(source)20 event_loop.run_event_loop()21 event_loop.ask_question('Did you hear the alert sound playing?', screenshot=False)22 sound2 = test_data.get_file('media', 'receive.wav')23 source2 = pyglet.media.load(sound2, streaming=False)24 player.queue(source2)25 player.play()26 event_loop.run_event_loop()27 event_loop.ask_question('Did you hear the receive sound playing?', screenshot=False)28@pytest.mark.requires_user_validation29def test_playback_fire_and_forget(event_loop, test_data):30 """Test playing back sound files using fire and forget."""31 sound = test_data.get_file('media', 'alert.wav')32 source = pyglet.media.load(sound, streaming=False)33 source.play()34 event_loop.ask_question('Did you hear the alert sound playing?', screenshot=False)35@pytest.mark.requires_user_validation36def test_play_queue(event_loop):37 """Test playing a single sound on the queue."""38 source = procedural.WhiteNoise(1.0)39 player = Player()40 player.on_player_eos = event_loop.interrupt_event_loop41 player.play()42 player.queue(source)43 event_loop.run_event_loop()44 event_loop.ask_question('Did you hear white noise for 1 second?', screenshot=False)45@pytest.mark.requires_user_validation46def test_queue_play(event_loop):47 """Test putting a single sound on the queue and then starting the player."""48 source = procedural.WhiteNoise(1.0)49 player = Player()50 player.on_player_eos = event_loop.interrupt_event_loop51 player.queue(source)52 player.play()53 event_loop.run_event_loop()54 event_loop.ask_question('Did you hear white noise for 1 second?', screenshot=False)55@pytest.mark.requires_user_validation56def test_pause_queue(event_loop):57 """Test the queue is not played when player is paused."""58 source = procedural.WhiteNoise(1.0)59 player = Player()60 player.pause()61 player.queue(source)62 # Run for the duration of the sound63 
event_loop.run_event_loop(1.0)64 event_loop.ask_question('Did you not hear any sound?', screenshot=False)65@pytest.mark.requires_user_validation66def test_pause_sound(event_loop):67 """Test that a playing sound can be paused."""68 source = procedural.WhiteNoise(60.0)69 player = Player()70 player.queue(source)71 player.play()72 event_loop.run_event_loop(1.0)73 player.pause()74 event_loop.ask_question('Did you hear white noise for 1 second and is it now silent?',75 screenshot=False)76 player.play()77 event_loop.ask_question('Do you hear white noise again?', screenshot=False)78 player.delete()79 event_loop.ask_question('Is it silent again?', screenshot=False)80@pytest.mark.requires_user_validation81def test_next_on_end_of_stream(event_loop):82 """Test that multiple items on the queue are played after each other."""83 source1 = procedural.WhiteNoise(1.0)84 source2 = procedural.Sine(1.0)85 player = Player()86 player.on_player_eos = event_loop.interrupt_event_loop87 player.queue(source1)88 player.queue(source2)89 player.play()90 event_loop.run_event_loop()91 event_loop.ask_question('Did you hear white noise for 1 second and then a tone at 440 Hz'92 '(A above middle C)?', screenshot=False)93@pytest.mark.requires_user_validation94def test_static_source_wrapping(event_loop):95 """Test that a sound can be recursively wrappend inside a static source."""96 source = procedural.WhiteNoise(1.0)97 source = StaticSource(source)98 source = StaticSource(source)99 player = Player()100 player.on_player_eos = event_loop.interrupt_event_loop101 player.queue(source)102 player.play()103 event_loop.run_event_loop()...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!