Best Python code snippet using playwright-python
evaluation.py
Source:evaluation.py
...
            else get_i18n_string("judge.evaluation.dynamic")
        )
    elif isinstance(test, ExitCodeOutputChannel):
        return str(test.value)


def _add_channel(
    bundle: Bundle, output: OutputChannel, channel: Channel, updates: List[Update]
):
    """
    Add a placeholder test for a channel to the updates, if it should be shown.

    When ``should_show`` accepts the channel, a ``StartTest``/``CloseTest`` pair
    is appended to ``updates``. The test carries the guessed expected value, an
    empty generated value, the status ``NOT_EXECUTED`` and is not accepted.
    Otherwise ``updates`` is left untouched.

    :param bundle: The configuration bundle.
    :param output: The output channel from the testplan.
    :param channel: Which channel the output belongs to.
    :param updates: The list of updates to append to; modified in place.
    """
    if should_show(output, channel):
        updates.append(
            StartTest(expected=guess_expected_value(bundle, output), channel=channel)
        )
        updates.append(
            CloseTest(
                generated="",
                status=StatusMessage(
                    enum=Status.NOT_EXECUTED,
                    human=get_i18n_string("judge.evaluation.missing.test"),
                ),
                accepted=False,
            )
        )


def prepare_evaluation(bundle: Bundle, collector: OutputManager):
    """
    Generate output depicting the expected testplan. This output will be shown if
    the normal execution terminates early for some reason. This function assumes
    the output is OK, but does not accept anything.

    Every tab, context, testcase and test is closed with ``accepted=False``.

    :param bundle: The configuration bundle.
    :param collector: The output collector.
    """
    collector.prepare_judgment(StartJudgment())
    for i, tab in enumerate(bundle.plan.tabs):
        collector.prepare_tab(StartTab(title=tab.name, hidden=tab.hidden), i)

        # Contexts are numbered per tab, across all runs in that tab.
        context_index = 0
        for run in tab.runs:
            run_testcase = run.run
            has_main = run_testcase.input.main_call
            # Get exit code stuff, but don't evaluate yet.
            exit_output = run_testcase.output.exit_code
            if has_main:
                # The main call of the run is reported as a context of its own.
                collector.prepare_context(
                    StartContext(description=run_testcase.description), i, context_index
                )
                readable_input, inlined_files = get_readable_input(
                    bundle, run_testcase.link_files, run_testcase
                )

                # Add file links
                non_inlined = set(run_testcase.link_files).difference(inlined_files)
                if non_inlined:
                    # NOTE(review): the collector is passed here and the return
                    # value is ignored, while the contexts loop below uses the
                    # return value instead. Presumably the helper adds the
                    # message to the collector itself when given one; confirm.
                    _link_files_message(non_inlined, collector)

                updates = [
                    AppendMessage(
                        message=get_i18n_string("judge.evaluation.missing.context")
                    ),
                    StartTestcase(description=readable_input),
                ]

                # Do the normal output channels.
                output = run_testcase.output
                _add_channel(bundle, output.stdout, Channel.STDOUT, updates)
                _add_channel(bundle, output.stderr, Channel.STDERR, updates)
                _add_channel(bundle, output.file, Channel.FILE, updates)
                _add_channel(bundle, output.exception, Channel.EXCEPTION, updates)

                # If this is the last testcase, do the exit code.
                if not run.contexts:
                    _add_channel(bundle, exit_output, Channel.EXIT, updates)

                updates.append(CloseTestcase(accepted=False))
                collector.prepare_context(updates, i, context_index)
                collector.prepare_context(
                    CloseContext(accepted=False), i, context_index
                )
                context_index += 1

            # j starts at 1 when the run has a main call, so that the
            # "last context" check below uses the same offset.
            for j, context in enumerate(run.contexts, start=int(has_main)):
                updates = []
                collector.prepare_context(
                    StartContext(description=context.description), i, context_index
                )
                updates.append(
                    AppendMessage(
                        message=get_i18n_string("judge.evaluation.missing.context")
                    )
                )

                # Union of the files inlined by any testcase of this context.
                inlined_files: Set[FileUrl] = set()

                # Begin normal testcases.
                for t, testcase in enumerate(context.testcases, 1):
                    readable_input, seen = get_readable_input(
                        bundle, context.link_files, testcase
                    )
                    inlined_files = inlined_files.union(seen)
                    updates.append(StartTestcase(description=readable_input))

                    # Do the normal output channels.
                    output = testcase.output
                    _add_channel(bundle, output.stdout, Channel.STDOUT, updates)
                    _add_channel(bundle, output.stderr, Channel.STDERR, updates)
                    _add_channel(bundle, output.file, Channel.FILE, updates)
                    _add_channel(bundle, output.exception, Channel.EXCEPTION, updates)
                    _add_channel(bundle, output.result, Channel.RETURN, updates)

                    # If last testcase, do exit code.
                    if j == int(has_main) + len(run.contexts) - 1 and t == len(
                        context.testcases
                    ):
                        _add_channel(bundle, exit_output, Channel.EXIT, updates)

                    updates.append(CloseTestcase(accepted=False))

                # Add file links
                non_inlined = set(context.link_files).difference(inlined_files)
                if non_inlined:
                    # Inserted at index 0 so the file links precede all other
                    # updates of this context.
                    updates.insert(0, _link_files_message(non_inlined))

                collector.prepare_context(updates, i, context_index)
                collector.prepare_context(
                    CloseContext(accepted=False), i, context_index
                )
                context_index += 1

        collector.prepare_tab(CloseTab(), i)
...
client.py
Source:client.py
...130 # No target time, as no work to do131 self.target_time = 0132 self.work_items = [ ]133 for url in urls:134 self._add_channel(url)135 def _reconnect(self, url, event_name, event_arg):136 if event_name == 'closed' or event_name == 'error':137 # Stupid connection closed for some reason. Set up a reconnect. Note138 # that it should have been removed from asyncore.socket_map already.139 self._reconnect_later(url)140 # Call the user's callback now.141 self.event_callback(url, event_name, event_arg)142 def _reconnect_later(self, url):143 # Set up a work item to reconnect in a little while.144 self.work_items.append(url)145 # Only set a target if one has not been set yet. Otherwise, we could146 # create a race condition of continually moving out towards the future147 if not self.target_time:148 self.target_time = time.time() + RECONNECT_DELAY149 def _add_channel(self, url):150 # Simply instantiating the client will install it into the global map151 # for processing in the main event loop.152 Client(url,153 functools.partial(self.commit_callback, url),154 functools.partial(self._reconnect, url))155 def _check_stale(self):156 now = time.time()157 for client in asyncore.socket_map.values():158 if client.last_activity + STALE_DELAY < now:159 # Whoops. No activity in a while. 
Signal this fact, Close the160 # Client, then have it reconnected later on.161 self.event_callback(client.url, 'stale', client.last_activity)162 # This should remove it from .socket_map.163 client.close()164 self._reconnect_later(client.url)165 def _maybe_work(self):166 # If we haven't reach the targetted time, or have no work to do,167 # then fast-path exit168 if time.time() < self.target_time or not self.work_items:169 return170 # We'll take care of all the work items, so no target for future work171 self.target_time = 0172 # Play a little dance just in case work gets added while we're173 # currently working on stuff174 work = self.work_items175 self.work_items = [ ]176 for url in work:177 self._add_channel(url)178 def run_forever(self):179 while True:180 if asyncore.socket_map:181 asyncore.loop(timeout=TIMEOUT, count=1)182 else:183 time.sleep(TIMEOUT)184 self._check_stale()...
asyncapi.py
Source:asyncapi.py
...
        )
        self.channels = []
        self.messages = []
        self.name = "interface"

    def _add_channel(self, channel: Channel):
        """Append a channel to the list of channels collected so far."""
        self.channels.append(channel)

    def _add_message(self, message: Message):
        """Append a message to the list of messages collected so far."""
        self.messages.append(message)

    def _set_interface_name(self, name):
        """Store the interface name and derive the AsyncAPI ``id`` from it."""
        self.name = name
        self.asyncapi["id"] = f"urn:stingeripc:{name}"

    def _set_interface_version(self, version_str):
        """Record the interface version and register a version channel.

        Besides writing the version into the AsyncAPI ``info`` section, this
        registers a ``stinger_version`` string message and a server-published
        channel for it. The topic is built from ``self.name``, so the
        interface name should be set before calling this.
        """
        self.asyncapi["info"]["version"] = version_str
        message = Message("stinger_version", {"type": "string"})
        channel = Channel(
            topic=f"{self.name}/stingerVersion",
            name="stinger_version",
            direction=Direction.SERVER_PUBLISHES,
            message_name=message.name,
        )
        # NOTE(review): arguments are presumably (qos, retain) -- confirm
        # against Channel.set_mqtt.
        channel.set_mqtt(1, True)
        self._add_channel(channel)
        self._add_message(message)

    def _add_signal(self, signal_name, signal_def):
        """Register the message and server-published channel for one signal.

        ``signal_def`` must contain a ``"payload"`` key; its value is used as
        the message reference.
        """
        msg_name = f"{signal_name}Signal"
        msg = Message(msg_name).set_reference(signal_def["payload"])
        self._add_message(msg)
        channel = Channel(
            f"{self.name}/{signal_name}",
            signal_name,
            Direction.SERVER_PUBLISHES,
            msg.name,
        )
        # NOTE(review): presumably (qos, retain) -- confirm against
        # Channel.set_mqtt.
        channel.set_mqtt(2, False)
        self._add_channel(channel)

    def _add_param(self, param_name, param_def):
        """Register the message and the two channels for one parameter.

        ``param_def`` must contain a ``"type"`` key; its value is used as the
        message reference. Two channels sharing that message are added: a
        server-published ``<name>/<param>/value`` channel and a
        server-subscribed ``<name>/<param>/update`` channel.
        """
        msg_name = "{}Param".format(stringmanip.upper_camel_case(param_name))
        msg = Message(msg_name).set_reference(param_def["type"])
        self._add_message(msg)

        # Channel on which the server publishes the parameter's value.
        value_channel_topic = f"{self.name}/{param_name}/value"
        value_channel_name = f"{param_name}Value"
        value_channel = Channel(
            value_channel_topic,
            value_channel_name,
            Direction.SERVER_PUBLISHES,
            msg.name,
        )
        value_channel.set_mqtt(1, True)
        self._add_channel(value_channel)

        # Channel on which the server subscribes to parameter updates.
        update_channel_topic = f"{self.name}/{param_name}/update"
        update_channel_name = f"Update{param_name}"
        update_channel = Channel(
            update_channel_topic,
            update_channel_name,
            Direction.SERVER_SUBSCRIBES,
            msg.name,
        )
        update_channel.set_mqtt(1, True)
        self._add_channel(update_channel)

    def add_stinger(self, stinger):
        """Load an interface description from a parsed Stinger mapping.

        Sets the interface name and version, then registers every entry of
        the optional ``"signals"`` and ``"params"`` sections.

        :param stinger: Mapping with at least the keys ``"stingeripc"`` and
            ``"interface"``.
        :raises AssertionError: if ``stinger["stingeripc"]`` is not ``"0.0.1"``.
        :raises Exception: if a ``KeyError`` occurs while setting the
            interface name.
        :raises KeyError: if ``"version"`` is missing from
            ``stinger["interface"]`` (that lookup is outside the ``try``).
        """
        assert stinger["stingeripc"] == "0.0.1"
        try:
            self._set_interface_name(stinger["interface"]["name"])
        except KeyError:
            raise Exception("The Stinger File does not have a name")
        # The name must be set first: the version channel topic embeds it.
        self._set_interface_version(stinger["interface"]["version"])
        if "signals" in stinger:
            for sig_name, sig_def in stinger["signals"].items():
                self._add_signal(sig_name, sig_def)
        if "params" in stinger:
            for param_name, param_def in stinger["params"].items():
                self._add_param(param_name, param_def)

    def get_asyncapi(self, client_type: SpecType, use_common=None):
        ...
test_filesystem.py
Source:test_filesystem.py
...
            try:
                channel._qos._delivered.clear()
            except AttributeError:
                pass

    def _add_channel(self, channel):
        """Record a channel in ``self.channels`` and return it unchanged."""
        self.channels.add(channel)
        return channel

    def test_produce_consume_noack(self):
        """Publish ten messages and check that a no-ack consumer gets them all."""
        producer = Producer(self._add_channel(self.p.channel()), self.e)
        consumer = Consumer(self._add_channel(self.c.channel()), self.q,
                            no_ack=True)

        for i in range(10):
            producer.publish({'foo': i},
                             routing_key='test_transport_filesystem')

        _received = []

        def callback(message_data, message):
            _received.append(message)

        consumer.register_callback(callback)
        consumer.consume()

        # Drain events until all ten messages have arrived. There is no
        # timeout here, so this loops forever if a message never shows up.
        while 1:
            if len(_received) == 10:
                break
            self.c.drain_events()

        assert len(_received) == 10

    def test_produce_consume(self):
        """Publish ten messages under each of two routing keys."""
        producer_channel = self._add_channel(self.p.channel())
        consumer_channel = self._add_channel(self.c.channel())
        producer = Producer(producer_channel, self.e)
        # Both consumers share one channel but read from different queues.
        consumer1 = Consumer(consumer_channel, self.q)
        consumer2 = Consumer(consumer_channel, self.q2)
        self.q2(consumer_channel).declare()

        for i in range(10):
            producer.publish({'foo': i},
                             routing_key='test_transport_filesystem')
        for i in range(10):
            producer.publish({'foo': i},
                             routing_key='test_transport_filesystem2')

        _received1 = []
        _received2 = []

        def callback1(message_data, message):
            _received1.append(message)
        ...
LambdaTest’s Playwright tutorial gives you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!