Best Python code snippet using playwright-python
_browser_context.py
Source: _browser_context.py
...
    @property
    def service_workers(self) -> List[Worker]:
        return list(self._service_workers)

    async def new_cdp_session(self, page: Union[Page, Frame]) -> CDPSession:
        page = to_impl(page)
        params = {}
        if isinstance(page, Page):
            params["page"] = page._channel
        elif isinstance(page, Frame):
            params["frame"] = page._channel
        else:
            raise Error("page: expected Page or Frame")
        return from_channel(await self._channel.send("newCDPSession", params))

    @property
    def tracing(self) -> Tracing:
        return self._tracing

    @property
    def request(self) -> "APIRequestContext":
        return self._request
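The key method here is new_cdp_session, which opens a raw Chrome DevTools Protocol channel for a Page or Frame. A minimal usage sketch against the public async API (CDP sessions are Chromium-only; Browser.getVersion is just one example of a command that can be sent):

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        context = await browser.new_context()
        page = await context.new_page()
        # new_cdp_session accepts a Page (or Frame) and returns a CDPSession.
        session = await context.new_cdp_session(page)
        # Raw CDP commands can then be sent over the session.
        version = await session.send("Browser.getVersion")
        print(version["product"])
        await browser.close()

asyncio.run(main())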
generate_api.py
Source: generate_api.py
...119 "typing.Any" in value_str120 or "typing.Dict" in value_str121 or "Handle" in value_str122 ):123 tokens.append(f"{name}=mapping.to_impl({to_snake_case(name)})")124 elif (125 re.match(r"<class 'playwright\._impl\.[\w]+\.[\w]+", value_str)126 and "_api_structures" not in value_str127 ):128 tokens.append(f"{name}={to_snake_case(name)}._impl_obj")129 else:130 tokens.append(f"{name}={to_snake_case(name)}")131 return split.join(tokens)132def return_type(func: FunctionType) -> str:133 value = get_type_hints(func, globals())["return"]134 return process_type(value)135def short_name(t: Any) -> str:136 match = cast(137 Match[str], re.compile(r"playwright\._impl\.[^.]+\.([^']+)").search(str(t))...
op_lib.py
Source: op_lib.py
# ------------------------------------------------------------
# Copyright (c) 2017-present, SeetaTech, Co.,Ltd.
#
# Licensed under the BSD 2-Clause License.
# You should have received a copy of the BSD 2-Clause License
# along with the software. If not, See,
#
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Operator library."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy

from dragon.core.autograph.op_schema import OpSchema
from dragon.core.framework import context
from dragon.core.framework import proto_util
from dragon.core.framework import tapes
from dragon.core.framework import workspace
from dragon.core.framework.tensor import Tensor
from dragon.core.util import nest


class OpExec(object):
    """The executable operator."""

    _created_instances = {}

    def __init__(self, op_type):
        self._op_type = op_type
        self._ignore_keys = {'outputs'}
        def_args = {}
        def_args_getter = OpSchema.get_args(op_type)
        if def_args_getter is not None:
            def_args = def_args_getter()
        for k, v in def_args.items():
            if k.endswith('_desc'):
                self._ignore_keys.add(k.split('_desc')[0])
        self._config_cache = {}

    @classmethod
    def get_instance(cls, op_type):
        """Return the executable operator."""
        try:
            instance = cls._created_instances[op_type]
        except KeyError:
            instance = OpExec(op_type)
            cls._created_instances[op_type] = instance
        return instance

    def get_config(self, **kwargs):
        """Return the execution config."""
        device = context.get_device()
        cache_key = self._op_type + '/' + str(device)
        for k, v in kwargs.items():
            if k not in self._ignore_keys:
                cache_key += '/' + str(v)
        try:
            return self._config_cache[cache_key]
        except KeyError:
            def_args, feed_dict = {}, {}
            def_args_getter = OpSchema.get_args(self._op_type)
            if def_args_getter is not None:
                def_args = def_args_getter(**kwargs)
            device = def_args.pop('device', device)
            no_grad = def_args.pop('no_grad', False)
            for k, v in def_args.items():
                if k.endswith('_desc') and v:
                    name = k.split('_desc')[0]
                    feed_dict[name] = v
                    def_args[k] = '$NAME/' + name
            op_def = proto_util.make_operator_def(
                op_type=self._op_type,
                name=kwargs.get('name', ''),
                device_option=device.to_proto(False),
                cache_key=cache_key,
                to_impl=True, **def_args)
            config = {'def': op_def,
                      'device': device,
                      'no_grad': no_grad,
                      'feed_dict': feed_dict}
            self._config_cache[cache_key] = config
            return config


class OpLib(object):
    """Library to apply the registered operators."""

    @staticmethod
    def add(op_type, inputs, **kwargs):
        """Add operator to output symbols."""
        op_tape = tapes.OrderedTape()
        graph_tape = tapes.get_tape()
        execute_ws = workspace.get_workspace()
        # Add inputs.
        enable_grad = False
        inputs = nest.flatten(inputs)
        for input in inputs:
            op_tape.add_source(input)
            if graph_tape and (input.requires_grad or
                               graph_tape.is_target(id(input))):
                enable_grad = True
        # Add extra inputs.
        for input in nest.flatten(kwargs.pop('extra_inputs', [])):
            op_tape.add_source(input)
            op_tape.add_target(input.id)
        # Add outputs.
        name = kwargs.pop('name', None)
        num_outputs = kwargs.pop('num_outputs', 1)
        outputs = []
        for i in range(num_outputs):
            outputs.append(Tensor(
                impl=execute_ws.create_tensor(scope='Tensor'),
                name=name if name else op_type + ':%d' % i,
                symbolic=True))
        # Create def.
        op_def = proto_util.make_operator_def(
            op_type=op_type,
            inputs=[input.id for input in inputs],
            outputs=[output.id for output in outputs],
            device_option=proto_util.get_default_device_option(),
            name=execute_ws.create_handle('Op'), **kwargs)
        # Record def.
        op_tape.add_element(op_def)
        graph_tape.add_element(op_def) if enable_grad else None
        # Set tape for outputs.
        for output in outputs:
            output._tape = op_tape
            output._requires_grad = enable_grad
        # Add spec to outputs.
        add_output_spec = OpSchema.get_spec(op_type)
        if add_output_spec is None:
            add_output_spec = OpSchema.get_spec('Unchanged')
        outputs = add_output_spec(kwargs, inputs, outputs)
        # Return single or repeated outputs.
        return outputs[0] if num_outputs == 1 else outputs

    @staticmethod
    def execute(op_type, inputs, **kwargs):
        """Execute an operator."""
        op_exec = OpExec.get_instance(op_type)
        run_config = op_exec.get_config(**kwargs)
        return OpLib.run(inputs, run_config, **kwargs)

    @staticmethod
    def run(inputs, run_config, **kwargs):
        """Run operator once."""
        graph_tape = tapes.get_tape()
        execute_ws = workspace.get_workspace()
        # Add inputs.
        input_names = []
        enable_grad = False
        for input in inputs:
            input_names.append(input.id)
            if graph_tape and (input.requires_grad or
                               graph_tape.is_target(id(input))):
                enable_grad = True
        # Unify grad modes.
        no_grad = run_config['no_grad']
        enable_grad = enable_grad and not no_grad
        if hasattr(graph_tape, '_exporting'):
            # Ensure the intermediates saved for the exporting graph.
            no_grad, enable_grad = False, True
        # Add outputs.
        outputs, output_names = [], []
        output_specs = list(kwargs.get('outputs', [None]))
        for i, spec in enumerate(output_specs):
            if spec is None:
                outputs.append(Tensor(
                    device=run_config['device'].copy(),
                    impl=execute_ws.create_tensor(
                        scope=context.get_variable_scope(enable_grad)),
                    deleter=execute_ws._handle_pool))
                output_names.append(outputs[i].id)
            else:
                assert isinstance(spec, Tensor)
                outputs.append(spec)
                output_names.append(spec.id)
                if enable_grad and output_names[-1] not in input_names:
                    raise RuntimeError('Output that requires gradient is not in inputs.')
        # Specialize def for given inputs and outputs.
        op_name = ''  # Optional operator name.
        op_def = run_config['def'].DeriveTo(input_names, output_names)
        # Record def if grad is enabled.
        if len(inputs) > 0 and not no_grad:
            if enable_grad:
                op_name = execute_ws.create_handle(op_def.type)
                op_def.name = op_name
                graph_tape.add_element(op_def)
                graph_tape.add_handle(op_name)
                for input in inputs:
                    graph_tape.add_source(input)
                for output in outputs:
                    output._requires_grad = True
            else:
                for output in outputs:
                    output._requires_grad = False
        # Ensure the named operator for the tracing graph.
        if hasattr(graph_tape, '_tracing') and not op_name:
            op_def.name = op_name = execute_ws.create_handle(op_def.type)
            graph_tape.add_handle(op_name)
        # Emit to dispatch this execution.
        for feed_key, value_type in run_config['feed_dict'].items():
            dest = execute_ws.create_tensor(op_name + '/' + feed_key)
            dest.FromNumpy(numpy.array(kwargs[feed_key], value_type), True)
        execute_ws.run_operator(op_def)
        # Return single or repeated outputs.
...
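OpExec above is a two-level cache: get_instance memoizes one executor per operator type, and get_config memoizes one execution config per device-and-arguments key. A standalone sketch of that pattern, assuming nothing about Dragon's internals (the Exec name and config shape are ours):

class Exec(object):
    """One cached executor per op type, one cached config per argument key."""

    _instances = {}  # op_type -> Exec

    def __init__(self, op_type):
        self.op_type = op_type
        self._config_cache = {}  # cache_key -> config

    @classmethod
    def get_instance(cls, op_type):
        # Memoize: reuse the executor already created for this op type.
        try:
            return cls._instances[op_type]
        except KeyError:
            instance = cls._instances[op_type] = Exec(op_type)
            return instance

    def get_config(self, device, **kwargs):
        # Everything that changes the compiled def goes into the cache key.
        cache_key = '/'.join([self.op_type, str(device)] +
                             [str(v) for v in kwargs.values()])
        try:
            return self._config_cache[cache_key]
        except KeyError:
            config = {'op_type': self.op_type, 'device': device, **kwargs}
            self._config_cache[cache_key] = config
            return config

# Repeated calls with the same arguments hit both caches.
c1 = Exec.get_instance('Relu').get_config('cpu', alpha=0.1)
c2 = Exec.get_instance('Relu').get_config('cpu', alpha=0.1)
assert c1 is c2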
checkpoint.py
Source: checkpoint.py
# ------------------------------------------------------------
# Copyright (c) 2017-present, SeetaTech, Co.,Ltd.
#
# Licensed under the BSD 2-Clause License.
# You should have received a copy of the BSD 2-Clause License
# along with the software. If not, See,
#
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Checkpoint utilities."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from dragon.core.framework import context
from dragon.core.framework import proto_util
from dragon.core.framework import tapes
from dragon.core.framework import workspace
from dragon.core.util import decorator
from dragon.core.util import nest
from dragon.vm.torch.core.autograd import grad_mode
from dragon.vm.torch.core.tensor import Tensor
from dragon.vm.torch.core.nn.modules.container import Sequential


class CheckpointFunction(object):
    """Checkpointing function."""

    @staticmethod
    def apply(function, *args, **kwargs):
        """Apply function and create a checkpoint."""
        kwargs.pop('preserve_rng_state', True)
        variable_scope = kwargs.pop('variable_scope', 'Buffer')
        original_variable_scope = context.get_variable_scope(True)
        if kwargs:
            raise ValueError('Unexpected keyword arguments: ' +
                             ','.join(arg for arg in kwargs))
        # Run function.
        graph_tape = tapes.Tape()
        graph_tape._tracing = True  # Enable tracing.
        graph_tape._checkpointing = True  # Enable checkpointing.
        graph_tape._original_variable_scope = original_variable_scope
        with grad_mode.no_grad(), graph_tape:
            with context.variable_scope(variable_scope):
                outputs = function(*args)
        # Collect involving tensors.
        tensor_inputs, tensor_outputs = [], []
        for arg in args:
            if isinstance(arg, Tensor):
                tensor_inputs.append(arg)
        for arg in nest.flatten(outputs):
            if isinstance(arg, Tensor):
                tensor_outputs.append(arg)
        # Fill tape with function context.
        op_tape = tapes.OrderedTape()
        op_handle = workspace.get_workspace().create_handle('Checkpoint')
        op_tape.add_element(proto_util.make_operator_def(
            op_type='Checkpoint',
            name=op_handle,
            inputs=[input.id for input in tensor_inputs],
            outputs=[output.id for output in tensor_outputs],
            defs=[v.SerializeAs() for v in graph_tape.get_elements()],
            buffer_scope=variable_scope,
            to_impl=True))
        op_tape.add_handle(op_handle)
        op_tape.merge_handles(graph_tape.get_handles())
        # Save input tensors for backward.
        for input in tensor_inputs + graph_tape.get_sources():
            op_tape.add_source(input)
        # Save tape for backward.
        for output in tensor_outputs:
            output._tape = op_tape
            output._requires_grad = True
        return outputs


def checkpoint(function, *args, **kwargs):
    """Apply function and create a checkpoint.

    Parameters
    ----------
    function : callable
        The function to apply.

    Returns
    -------
    Any
        The function outputs.

    """
    if not grad_mode.is_grad_enabled():
        return function(*args, **kwargs)
    return CheckpointFunction.apply(function, *args, **kwargs)


def checkpoint_sequential(functions, input, segments=1, **kwargs):
    """Apply functions and create segmental checkpoints.

    Parameters
    ----------
    functions : Union[torch.nn.Sequential, Sequence[callable]]
        The functions to apply sequentially.
    input : dragon.vm.torch.Tensor
        The input tensor.
    segments : Union[int, Sequence[int]], optional
        The number or size of chunked checkpoints.

    Returns
    -------
    Any
        The function outputs.

    """
    def run_function(start, end, functions):
        def forward(input):
            for j in range(start, end):
                input = functions[j](input)
            with no_checkpoint():
                input = functions[end](input)
            return input
        return forward

    preserve_rng_state = kwargs.pop('preserve_rng_state', True)
    variable_scope = kwargs.pop('variable_scope', 'Buffer')
    if kwargs:
        raise ValueError('Unexpected keyword arguments: ' +
                         ','.join(arg for arg in kwargs))
    if isinstance(functions, Sequential):
        functions = list(functions.children())
    start, end = 0, len(functions) - 1
    if not grad_mode.is_grad_enabled():
        return run_function(start, end, functions)(input)
    if nest.is_sequence(segments):
        size_segments = segments
        if sum(size_segments) != len(functions):
            raise ValueError('Failed to chunk {} functions into {} segments.'
                             .format(len(functions), segments))
    else:
        size = (len(functions) + segments - 1) // segments
        last_size = len(functions) - size * (segments - 1)
        if last_size <= 0:
            raise ValueError('Failed to chunk {} functions into {} segments.'
                             .format(len(functions), segments))
        size_segments = [size] * (segments - 1) + [last_size]
    for size in size_segments:
        end = start + size - 1
        input = checkpoint(
            run_function(start, end, functions), input,
            preserve_rng_state=preserve_rng_state,
            variable_scope=variable_scope)
        start = end + 1
    return input


class no_checkpoint(decorator._DecoratorContextManager):
    """Context-manager to disable checkpointing."""

    def __init__(self):
        """Create a ``no_checkpoint`` context manager."""
        self._checkpointing = False

    def __enter__(self):
        graph_tape = tapes.get_tape()
        if hasattr(graph_tape, '_checkpointing'):
            self._checkpointing = True
            context._GLOBAL_VARIABLE_SCOPE_STACK.push(
                graph_tape._original_variable_scope)

    def __exit__(self, *args):
        if self._checkpointing:
            self._checkpointing = False
            context._GLOBAL_VARIABLE_SCOPE_STACK.pop()
...
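The chunking arithmetic in checkpoint_sequential is easy to miss: an integer segments argument becomes per-segment sizes by ceiling division, with the last segment absorbing the remainder, while an explicit size list is only validated. Isolating just that logic in a standalone sketch (the split_segments name is ours):

def split_segments(num_functions, segments):
    # Explicit sizes: validate that they cover the function list exactly.
    if isinstance(segments, (list, tuple)):
        if sum(segments) != num_functions:
            raise ValueError('Failed to chunk %d functions into %s segments.'
                             % (num_functions, segments))
        return list(segments)
    # Integer count: ceiling division, remainder goes to the last segment.
    size = (num_functions + segments - 1) // segments
    last_size = num_functions - size * (segments - 1)
    if last_size <= 0:
        raise ValueError('Failed to chunk %d functions into %d segments.'
                         % (num_functions, segments))
    return [size] * (segments - 1) + [last_size]

assert split_segments(10, 3) == [4, 4, 2]
assert split_segments(8, 4) == [2, 2, 2, 2]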
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its distinctive features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers end-to-end guidance, from installing the Playwright framework through best practices and advanced concepts.