Best Python code snippet using fMBT_python
fmbttizen.py
Source:fmbttizen.py
...405 """406 def __init__(self, serialNumber=None, loginCommand=None, debugAgentFile=None):407 if loginCommand == None:408 if serialNumber == None:409 self._serialNumber = self.recvSerialNumber()410 else:411 self._serialNumber = serialNumber412 self._loginCommand = None413 else:414 if serialNumber == None:415 self._serialNumber = "unknown"416 else:417 self._serialNumber = serialNumber418 if isinstance(loginCommand, str):419 self._loginCommand = shlex.split(loginCommand)420 else:421 self._loginCommand = loginCommand422 self._useSdb = (self._loginCommand == None)423 self._useSsh = not self._useSdb424 self._sdbShell = None425 self._debugAgentFile = debugAgentFile426 self._agentNeedsResolution = True427 self.open()428 def __del__(self):429 self.close()430 def open(self):431 if self._serialNumber == "unknown" and self._useSdb:432 raise TizenDeviceNotFoundError("Tizen device not found.")433 self.close()434 agentFilename = os.path.join(435 os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))),436 "fmbttizen-agent.py")437 remoteUploadPath = "/tmp"438 agentRemoteFilename = remoteUploadPath + "/fmbttizen-agent.py"439 uploadFiles = [(agentFilename,440 agentRemoteFilename),441 (os.path.join(os.path.dirname(agentFilename),442 "fmbtuinput.py"),443 remoteUploadPath + "/fmbtuinput.py")]444 # Upload fmbttizen-agent to the device445 if self._useSdb:446 for src, dst in uploadFiles:447 uploadCmd = ["sdb", "-s", self._serialNumber, "push", src, dst]448 status, out, err = _run(uploadCmd, range(256))449 if status == 127:450 raise TizenConnectionError('Executing "sdb -s %s push" failed. Check your Tizen SDK installation.' % (self._serialNumber,))451 elif status != 0:452 if "device not found" in err:453 raise TizenDeviceNotFoundError('Tizen device "%s" not found.' % (self._serialNumber,))454 else:455 raise TizenConnectionError('Executing "%s" failed: %s' % (' '.join(uploadCmd), err + " " + out))456 else: # using SSH457 if self._loginCommand:458 for src, dst in uploadFiles:459 uploadCmd = self._loginCommand + ["cat", ">", dst]460 p = subprocess.Popen(uploadCmd, shell=False, stdin=subprocess.PIPE)461 p.stdin.write(file(src).read())462 p.stdin.close()463 p.wait()464 else: # run locally without remote login, no upload465 agentRemoteFilename = agentFilename466 # Launch agent, create persistent connection to it467 if self._useSdb:468 remoteShellCmd = ["sdb", "-s", self._serialNumber, "shell"]469 else: # using SSH470 remoteShellCmd = self._loginCommand + ["python", agentRemoteFilename]471 try:472 self._sdbShell = subprocess.Popen(remoteShellCmd,473 shell=False,474 stdin=subprocess.PIPE,475 stdout=subprocess.PIPE,476 stderr=subprocess.PIPE,477 close_fds=True)478 except OSError, msg:479 raise TizenConnectionError('Executing "%s" failed.' % (480 " ".join(remoteShellCmd),))481 _g_sdbProcesses.add(self._sdbShell)482 self._sdbShellErrQueue = Queue.Queue()483 thread.start_new_thread(_fileToQueue, (self._sdbShell.stderr, self._sdbShellErrQueue))484 if self._useSdb:485 self._sdbShell.stdin.write("\r")486 try:487 ok, self._platformInfo = self._agentCmd("python %s; exit" % (agentRemoteFilename,))488 except IOError:489 raise TizenConnectionError('Connecting to a Tizen device/emulator with "sdb -s %s shell" failed.' % (self._serialNumber,))490 else: # using SSH491 ok, self._platformInfo = self._agentCmd("")492 pass # agent already started by remoteShellCmd493 return ok494 def reportErrorsInQueue(self):495 while True:496 try: l = self._sdbShellErrQueue.get_nowait()497 except Queue.Empty: return498 if self._debugAgentFile: self._debugAgentFile.write("<2 %s" % (l,))499 _adapterLog("fmbttizen agent error: %s" % (l,))500 def close(self):501 if self._sdbShell != None:502 try: self._agentCmd("quit", retry=0)503 except: pass504 try: self._sdbShell.terminate()505 except: pass506 try: self._sdbShell.stdin.close()507 except: pass508 try: self._sdbShell.stdout.close()509 except: pass510 try: self._sdbShell.stderr.close()511 except: pass512 self.reportErrorsInQueue()513 _g_sdbProcesses.remove(self._sdbShell)514 self._sdbShell = None515 def _agentAnswer(self):516 errorLinePrefix = "FMBTAGENT ERROR "517 okLinePrefix = "FMBTAGENT OK "518 l = self._sdbShell.stdout.readline()519 output = []520 while True:521 if self._debugAgentFile:522 if len(l) > 72: self._debugAgentFile.write("<1 %s...\n" % (l[:72],))523 else: self._debugAgentFile.write("<1 %s\n" % (l,))524 if l.startswith(okLinePrefix):525 return True, _decode(l[len(okLinePrefix):])526 elif l.startswith(errorLinePrefix):527 return False, _decode(l[len(errorLinePrefix):])528 else:529 output.append(l)530 pass531 l = self._sdbShell.stdout.readline()532 if l == "":533 raise IOError("Unexpected termination of sdb shell: %s" % ("\n".join(output)))534 l = l.strip()535 def _agentCmd(self, command, retry=3):536 if command[:2] in ["tt", "td", "tm", "tu", "er"]:537 # Operating on coordinates on with a touch devices538 # may require information on screen resolution.539 # The agent does not know about possible rotation, so540 # the resolution needs to be sent from here.541 if self._agentNeedsResolution:542 self._agentCmd("sd %s %s" % self._gti.screenSize())543 self._agentNeedsResolution = False544 if self._sdbShell == None: return False, "disconnected"545 if self._debugAgentFile: self._debugAgentFile.write(">0 %s\n" % (command,))546 if self._useSdb:547 eol = "\r"548 else:549 eol = "\n"550 try:551 if len(command) > 0:552 self._sdbShell.stdin.write("%s%s" % (command, eol))553 self._sdbShell.stdin.flush()554 except IOError, msg:555 if retry > 0:556 time.sleep(.2)557 self.reportErrorsInQueue()558 _adapterLog('Error when sending command "%s": %s.' % (command, msg))559 self.open()560 self._agentCmd(command, retry=retry-1)561 else:562 raise563 return self._agentAnswer()564 def sendPress(self, keyName):565 return self._agentCmd("kp %s" % (keyName,))[0]566 def sendKeyDown(self, keyName):567 return self._agentCmd("kd %s" % (keyName,))[0]568 def sendKeyUp(self, keyName):569 return self._agentCmd("ku %s" % (keyName,))[0]570 def sendMtLinearGesture(self, *args):571 return self._agentCmd("ml %s" % (_encode(args)))[0]572 def sendScreenshotRotation(self, angle):573 return self._agentCmd("sa %s" % (angle,))[0]574 def sendTap(self, x, y):575 return self._agentCmd("tt %s %s 1" % (x, y))[0]576 def sendTouchDown(self, x, y):577 return self._agentCmd("td %s %s 1" % (x, y))[0]578 def sendTouchMove(self, x, y):579 return self._agentCmd("tm %s %s" % (x, y))[0]580 def sendTouchUp(self, x, y):581 return self._agentCmd("tu %s %s 1" % (x, y))[0]582 def sendType(self, string):583 return self._agentCmd("kt %s" % (_encode(string)))[0]584 def sendDisplayBacklightTime(self, timeout):585 return self._agentCmd("bl %s" % (timeout,))[0]586 def sendRecStart(self, devices=[]):587 """Start recording events"""588 filterOpts = {589 "type": ["EV_SYN", "EV_KEY", "EV_REL", "EV_ABS"]590 }591 if devices:592 filterOpts["device"] = devices593 return self._agentCmd("er start %s" % (_encode(filterOpts,)))[0]594 def sendRecStop(self):595 """Stop recording events"""596 return self._agentCmd("er stop")[0]597 def recvRec(self):598 """Receive recorded events"""599 status, events = self._agentCmd("er fetch")600 if status:601 return events602 else:603 return None604 def recvDisplayStatus(self):605 status = self._agentCmd("gd")606 if status[0] == False:607 raise FMBTTizenError("Error reading display status '%s'" % (status[2],))608 return status[1]609 def recvScreenshot(self, filename, blankFrameRetry=3):610 if blankFrameRetry > 2:611 rv, img = self._agentCmd("ss")612 else:613 rv, img = self._agentCmd("ss R") # retry614 if rv == False:615 return False616 if img.startswith("FMBTRAWX11"):617 try:618 header, zdata = img.split('\n', 1)619 width, height, depth, bpp = [int(n) for n in header.split()[1:]]620 data = zlib.decompress(zdata)621 except Exception, e:622 raise TizenConnectionError("Corrupted screenshot data: %s" % (e,))623 if len(data) != width * height * 4:624 raise FMBTTizenError("Image data size mismatch.")625 if fmbtgti.eye4graphics.bgrx2rgb(data, width, height) == 0 and blankFrameRetry > 0:626 time.sleep(0.5)627 return self.recvScreenshot(filename, blankFrameRetry - 1)628 # TODO: use libimagemagick directly to save data to png?629 ppm_header = "P6\n%d %d\n%d\n" % (width, height, 255)630 f = file(filename + ".ppm", "w").write(ppm_header + data[:width*height*3])631 _run(["convert", filename + ".ppm", filename], expectedExitStatus=0)632 os.remove("%s.ppm" % (filename,))633 else:634 file(filename, "w").write(img)635 return True636 def recvSerialNumber(self):637 s, o = commands.getstatusoutput("sdb get-serialno")638 return o.splitlines()[-1]639 def recvInputDevices(self):640 return self._platformInfo["input devices"]641 def shellSOE(self, shellCommand, username, password, asyncStatus, asyncOut, asyncError):642 _, (s, o, e) = self._agentCmd(643 "es %s" % (_encode((shellCommand, username, password, asyncStatus,644 asyncOut, asyncError)),))645 return s, o, e646 def target(self):647 return self._serialNumber648class FMBTTizenError(Exception): pass649class TizenConnectionError(FMBTTizenError): pass650class TizenDeviceNotFoundError(TizenConnectionError): pass
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!