Best Python code snippet using uiautomator
client.py
Source:client.py
1# Copyright (c) 2016, Louis Opter <louis@opter.org>2# All rights reserved.3#4# Redistribution and use in source and binary forms, with or without5# modification, are permitted provided that the following conditions are met:6#7# 1. Redistributions of source code must retain the above copyright notice,8# this list of conditions and the following disclaimer.9#10# 2. Redistributions in binary form must reproduce the above copyright notice,11# this list of conditions and the following disclaimer in the documentation12# and/or other materials provided with the distribution.13#14# 3. Neither the name of the copyright holder nor the names of its contributors15# may be used to endorse or promote products derived from this software16# without specific prior written permission.17#18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE28# POSSIBILITY OF SUCH DAMAGE.29import asyncio30import functools31import json32import locale33import logging34import os35import urllib.parse36import uuid37from typing import (38 Any,39 Callable,40 Dict,41 List,42 NamedTuple,43 Sequence,44 Tuple,45)46from typing import Type # noqa47from . import (48 exceptions,49 requests,50 responses,51 structs,52)53logger = logging.getLogger("lightsc.client")54_JSONRPCMethod = NamedTuple("_JSONRPCMethod", [55 ("name", str),56 ("map_result", Callable[[Any], responses.Response]),57])58_JSONRPC_API = {59 requests.GetLightState: _JSONRPCMethod(60 name="get_light_state",61 map_result=lambda result: responses.LightsState([62 structs.LightBulb(63 b["label"], b["power"], *b["hsbk"], tags=b["tags"]64 ) for b in result65 ])66 ),67 requests.SetLightFromHSBK: _JSONRPCMethod(68 name="set_light_from_hsbk",69 map_result=lambda result: responses.Bool(result)70 ),71 requests.PowerOn: _JSONRPCMethod(72 name="power_on",73 map_result=lambda result: responses.Bool(result)74 ),75 requests.PowerOff: _JSONRPCMethod(76 name="power_off",77 map_result=lambda result: responses.Bool(result)78 ),79 requests.PowerToggle: _JSONRPCMethod(80 name="power_toggle",81 map_result=lambda result: responses.Bool(result)82 ),83 requests.SetWaveform: _JSONRPCMethod(84 name="set_waveform",85 map_result=lambda result: responses.Bool(result)86 ),87} # type: Dict[Type[requests.RequestClass], _JSONRPCMethod]88class _JSONRPCCall:89 def __init__(90 self, method: str, params: Sequence[Any], timeout: int = None91 ) -> None:92 self.id = str(uuid.uuid4())93 self.method = method94 self.params = params95 self.timeout = timeout96 self.timeout_handle = None # type: asyncio.Handle97 self.request = {98 "id": self.id,99 "jsonrpc": "2.0",100 "method": method,101 "params": params,102 }103 self.response = asyncio.Future() # type: asyncio.futures.Future104 @property105 def response_or_exception(self) -> Any:106 ex = self.response.exception()107 return ex if ex is not None else self.response.result()108class AsyncJSONRPCLightsClient:109 READ_SIZE = 8192110 TIMEOUT = 2 # seconds111 ENCODING = "utf-8"112 def __init__(113 self,114 url: str,115 encoding: str = ENCODING,116 timeout: int = TIMEOUT,117 read_size: int = READ_SIZE,118 loop: asyncio.AbstractEventLoop = None119 ) -> None:120 self.url = url121 self.encoding = encoding122 self.timeout = timeout123 self.read_size = read_size124 self._listen_task = None # type: asyncio.Task125 self._pending_calls = {} # type: Dict[str, _JSONRPCCall]126 self._reader = None # type: asyncio.StreamReader127 self._writer = None # type: asyncio.StreamWriter128 self._loop = loop or asyncio.get_event_loop()129 def _handle_response(130 self, id: str, response: Any, timeout: bool = False131 ) -> None:132 call = self._pending_calls.pop(id)133 if timeout is True:134 call.response.set_exception(exceptions.LightsClientTimeoutError())135 return136 call.timeout_handle.cancel()137 call.response.set_result(response)138 async def _jsonrpc_execute(139 self, pipeline: List[_JSONRPCCall]140 ) -> Dict[str, Any]:141 if not pipeline:142 return {}143 requests = [call.request for call in pipeline]144 for req in requests:145 logger.info("Request {id}: {method}({params})".format(**req))146 payload = json.dumps(requests[0] if len(requests) == 1 else requests)147 self._writer.write(payload.encode(self.encoding, "surrogateescape"))148 await self._writer.drain()149 for call in pipeline:150 call.timeout_handle = self._loop.call_later(151 call.timeout,152 functools.partial(153 self._handle_response, call.id, response=None, timeout=True154 )155 )156 self._pending_calls[call.id] = call157 futures = [call.response for call in pipeline]158 await asyncio.wait(futures, loop=self._loop)159 return {call.id: call.response_or_exception for call in pipeline}160 async def close(self) -> None:161 if self._listen_task is not None:162 self._listen_task.cancel()163 await asyncio.wait([self._listen_task], loop=self._loop)164 self._listen_task = None165 if self._writer is not None:166 if self._writer.can_write_eof():167 self._writer.write_eof()168 self._writer.close()169 if self._reader is not None:170 self._reader.feed_eof()171 if not self._reader.at_eof():172 await self._reader.read()173 self._reader = self._writer = None174 self._pending_calls = {}175 async def _reconnect(self) -> None:176 await self.close()177 await self.connect()178 async def apply(self, req: requests.Request, timeout: int = TIMEOUT):179 method = _JSONRPC_API[req.__class__]180 call = _JSONRPCCall(method.name, req.params, timeout=timeout)181 result = (await self._jsonrpc_execute([call]))[call.id]182 if isinstance(result, Exception):183 raise result184 return method.map_result(result)185 async def connect(self) -> None:186 parts = urllib.parse.urlparse(self.url)187 if parts.scheme == "unix+jsonrpc":188 path = os.path.join(parts.netloc, parts.path).rstrip(os.path.sep)189 open_connection = functools.partial(190 asyncio.open_unix_connection, path191 )192 elif parts.scheme == "tcp+jsonrpc":193 open_connection = functools.partial(194 asyncio.open_connection, parts.hostname, parts.port195 )196 else:197 raise ValueError("Unsupported url {}".format(self.url))198 try:199 self._reader, self._writer = await asyncio.wait_for(200 open_connection(limit=self.read_size, loop=self._loop),201 self.timeout,202 loop=self._loop,203 )204 self._listen_task = self._loop.create_task(self._listen())205 except Exception:206 logger.error("Couldn't open {}".format(self.url))207 raise208 async def _listen(self) -> None:209 # FIXME:210 #211 # This method is fucked, we need to add a real streaming mode on212 # lightsd's side and then an async version of ijson:213 buf = bytearray() # those bufs need to be bound to some max size214 sbuf = str()215 while True:216 chunk = await self._reader.read(self.READ_SIZE)217 if not len(chunk): # EOF, reconnect218 logger.info("EOF, reconnecting...")219 # XXX: deadlock within the close call in _reconnect? (and if220 # that's the case maybe you can use an event or something).221 await self._reconnect()222 return223 buf += chunk224 try:225 sbuf += buf.decode(self.encoding, "strict") # strict is fucked226 except UnicodeError:227 continue228 buf = bytearray()229 while sbuf:230 # and this is completely fucked:231 try:232 response = json.loads(sbuf)233 sbuf = str()234 except Exception:235 def find_response(delim: str) -> Tuple[Dict[str, Any], str]:236 offset = sbuf.find(delim)237 while offset != -1:238 try:239 response = json.loads(sbuf[:offset + 1])240 return response, sbuf[offset + 1:]241 except Exception:242 offset = sbuf.find(delim, offset + 2)243 return None, sbuf244 for delim in {"}{", "}[", "]{", "]["}:245 response, sbuf = find_response(delim)246 if response is not None:247 break # yay!248 else:249 break # need more data250 batch = response if isinstance(response, list) else [response]251 for response in batch:252 id = response["id"]253 error = response.get("error")254 if error is not None:255 code = error.get("code")256 msg = error.get("message")257 logger.warning("Error on request {}: {} - {}".format(258 id, code, msg259 ))260 call = self._pending_calls.pop(id)261 ex = exceptions.LightsClientError(msg)262 call.response.set_exception(ex)263 call.timeout_handle.cancel()264 continue265 logger.info("Response {}: {}".format(266 id, response["result"]267 ))268 self._handle_response(id, response["result"])269 def batch(self) -> "_AsyncJSONRPCBatch":270 return _AsyncJSONRPCBatch(self)271# LightsClient could eventually point to a different but api-compatible class272# someday:273LightsClient = AsyncJSONRPCLightsClient274class _AsyncJSONRPCBatch:275 def __init__(self, client: AsyncJSONRPCLightsClient) -> None:276 self.responses = None # type: List[responses.Response]277 self.exceptions = None # type: List[Exception]278 self._client = client279 self._batch = [] # type: List[Tuple[_JSONRPCMethod, _JSONRPCCall]]280 async def __aenter__(self) -> "_AsyncJSONRPCBatch":281 return self282 async def __aexit__(self, exc_type, exc_val, exc_tb):283 if exc_type is None:284 await self.execute()285 def append(286 self,287 req: requests.Request,288 timeout: int = AsyncJSONRPCLightsClient.TIMEOUT289 ) -> None:290 method = _JSONRPC_API[req.__class__]291 call = _JSONRPCCall(method.name, req.params, timeout=timeout)292 self._batch.append((method, call))293 async def execute(self) -> None:294 resp_by_id = await self._client._jsonrpc_execute([295 call for method, call in self._batch296 ])297 self.responses = []298 self.exceptions = []299 for method, call in self._batch:300 raw_resp = resp_by_id[call.id]301 if isinstance(raw_resp, Exception):302 self.exceptions.append(raw_resp)303 else:304 self.responses.append(method.map_result(raw_resp))305LightsCommandBatch = _AsyncJSONRPCBatch306async def get_lightsd_unix_socket_async(307 loop: asyncio.AbstractEventLoop = None,308) -> str:309 lightsdrundir = None310 try:311 process = await asyncio.create_subprocess_exec(312 "lightsd", "--rundir",313 stdout=asyncio.subprocess.PIPE,314 stderr=asyncio.subprocess.DEVNULL,315 loop=loop,316 )317 except FileNotFoundError: # couldn't find the lightsd bin318 pass319 else:320 stdout, stderr = await process.communicate()321 stdout = stdout.decode(locale.getpreferredencoding()).strip()322 if process.returncode == 0 and stdout:323 lightsdrundir = stdout324 if lightsdrundir is None:325 lightsdrundir = "build"326 logger.warning(327 "Couldn't infer lightsd's runtime directory, is "328 "lightsd installed? Trying {}â¦".format(lightsdrundir)329 )330 return "unix+jsonrpc://" + os.path.join(lightsdrundir, "socket")331async def create_async_lightsd_connection(332 url: str = None,333 loop: asyncio.AbstractEventLoop = None334) -> AsyncJSONRPCLightsClient:335 if loop is None:336 loop = asyncio.get_event_loop()337 if url is None:338 url = await get_lightsd_unix_socket_async(loop)339 c = AsyncJSONRPCLightsClient(url, loop=loop)340 await c.connect()341 return c342def create_lightsd_connection(url: str = None) -> None:...
automator_client.py
Source:automator_client.py
...68 timeout=int(os.environ.get("jsonrpc_timeout", 90)))69 def jsonrpc_wrap(self, timeout):70 server = self71 ERROR_CODE_BASE = -3200072 def _JsonRPCMethod(url, method, timeout, restart=True):73 _method_obj = JsonRPCMethod(url, method, timeout)74 def wrapper(*args, **kwargs):75 URLError = urllib3.exceptions.HTTPError if os.name == "nt" \76 else urllib2.URLError77 try:78 return _method_obj(*args, **kwargs)79 except (URLError, socket.error, HTTPException) as e:80 if restart:81 server.stop()82 server.start(timeout=30)83 return _JsonRPCMethod(84 url, method, timeout, False)(*args, **kwargs)85 else:86 raise87 except JsonRPCError as e:88 if e.code >= ERROR_CODE_BASE - 1:89 server.stop()90 server.start()91 return _method_obj(*args, **kwargs)92 elif e.code == ERROR_CODE_BASE - 2 and self.handlers['on']:93 # Not Found94 try:95 self.handlers['on'] = False96 # any handler returns True97 # will break the left handlers...
autostub.py
Source:autostub.py
...139 timeout=int(os.environ.get("jsonrpc_timeout", 90)))140 def jsonrpc_wrap(self, timeout):141 server = self142 ERROR_CODE_BASE = -32000143 def _JsonRPCMethod(url, method, timeout, restart=True):144 _method_obj = JsonRPCMethod(url, method, timeout)145 def wrapper(*args, **kwargs):146 URLError = urllib3.exceptions.HTTPError if os.name == "nt" \147 else urllib2.URLError148 try:149 return _method_obj(*args, **kwargs)150 except (URLError, socket.error, HTTPException) as e:151 if restart:152 server.stop()153 server.start(timeout=30)154 return _JsonRPCMethod(155 url, method, timeout, False)(*args, **kwargs)156 else:157 raise158 except JsonRPCError as e:159 if e.code >= ERROR_CODE_BASE - 1:160 server.stop()161 server.start()162 return _method_obj(*args, **kwargs)163 elif e.code == ERROR_CODE_BASE - 2 and self.handlers['on']:164 # Not Found165 try:166 self.handlers['on'] = False167 # any handler returns True168 # will break the left handlers...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!