Best Python code snippet using fMBT_python
fmbttizen-agent.py
Source:fmbttizen-agent.py
...303 else:304 response = "%s%s\n" % (p, value)305 sys.stdout.write(response)306 sys.stdout.flush()307def sendHwTap(x, y, button):308 try:309 if touch_device:310 touch_device.tap(x, y)311 else:312 mouse_button_device.tap(x, y, button)313 return True, None314 except Exception, e:315 return False, str(e)316def sendHwMove(x, y):317 try:318 if touch_device:319 touch_device.move(x, y)320 else:321 mouse_button_device.move(x, y)322 return True, None323 except Exception, e:324 return False, str(e)325def sendHwFingerDown(x, y, button):326 try:327 if touch_device:328 touch_device.pressFinger(button, x, y)329 else:330 mouse_button_device.move(x, y)331 mouse_button_device.press(button)332 return True, None333 except Exception, e:334 return False, str(e)335def sendHwFingerUp(x, y, button):336 try:337 if touch_device:338 touch_device.releaseFinger(button)339 else:340 mouse_button_device.move(x, y)341 mouse_button_device.release(button)342 return True, None343 except Exception, e:344 return False, str(e)345def sendHwKey(keyName, delayBeforePress, delayBeforeRelease):346 if keyName.startswith("KEY_"):347 keyName = keyName.lstrip("KEY_")348 keyName = keyName.upper()349 fd = None350 closeFd = False351 try: inputDevice = deviceToEventFile[hwKeyDevice[keyName]]352 except: fd = keyboard_device._fd353 try: keyCode = _inputKeyNameToCode[keyName]354 except KeyError:355 try: keyCode = fmbtuinput.toKeyCode(keyName)356 except ValueError: return False, 'No keycode for key "%s"' % (keyName,)357 try:358 if not fd:359 fd = os.open(inputDevice, os.O_WRONLY | os.O_NONBLOCK)360 closeFd = True361 except: return False, 'Unable to open input device "%s" for writing' % (inputDevice,)362 if delayBeforePress > 0: time.sleep(delayBeforePress)363 if delayBeforePress >= 0:364 if os.write(fd, struct.pack(_input_event, int(time.time()), 0, _EV_KEY, keyCode, 1)) > 0:365 os.write(fd, struct.pack(_input_event, 0, 0, 0, 0, 0))366 if delayBeforeRelease > 0: time.sleep(delayBeforeRelease)367 if delayBeforeRelease >= 0:368 if os.write(fd, struct.pack(_input_event, int(time.time()), 0, _EV_KEY, keyCode, 0)) > 0:369 os.write(fd, struct.pack(_input_event, 0, 0, 0, 0, 0))370 if closeFd:371 os.close(fd)372 return True, None373def specialCharToXString(c):374 c2s = {'\n': "Return",375 ' ': "space", '!': "exclam", '"': "quotedbl",376 '#': "numbersign", '$': "dollar", '%': "percent",377 '&': "ampersand", "'": "apostrophe",378 '(': "parenleft", ')': "parenright", '*': "asterisk",379 '+': "plus", '-': "minus", '.': "period", '/': "slash",380 ':': "colon", ';': "semicolon", '<': "less", '=': "equal",381 '>': "greater", '?': "question", '@': "at",382 '_': "underscore"}383 return c2s.get(c, c)384def specialCharToUsKeys(c):385 # character -> ([modifier, [modifier...]] keycode)386 c2s = {'\n': ("KEY_ENTER",),387 ' ': ("KEY_SPACE",),388 '`': ("KEY_GRAVE",), '~': ("KEY_LEFTSHIFT", "KEY_GRAVE"),389 '!': ("KEY_LEFTSHIFT", "KEY_1"),390 '@': ("KEY_LEFTSHIFT", "KEY_2"),391 '#': ("KEY_LEFTSHIFT", "KEY_3"),392 '$': ("KEY_LEFTSHIFT", "KEY_4"),393 '%': ("KEY_LEFTSHIFT", "KEY_5"),394 '^': ("KEY_LEFTSHIFT", "KEY_6"),395 '&': ("KEY_LEFTSHIFT", "KEY_7"),396 '*': ("KEY_LEFTSHIFT", "KEY_8"),397 '(': ("KEY_LEFTSHIFT", "KEY_9"),398 ')': ("KEY_LEFTSHIFT", "KEY_0"),399 '-': ("KEY_MINUS",), '_': ("KEY_LEFTSHIFT", "KEY_MINUS"),400 '=': ("KEY_EQUAL",), '+': ("KEY_LEFTSHIFT", "KEY_EQUAL"),401 '\t': ("KEY_TAB",),402 '[': ("KEY_LEFTBRACE",), '{': ("KEY_LEFTSHIFT", "KEY_LEFTBRACE"),403 ']': ("KEY_RIGHTBRACE",), '}': ("KEY_LEFTSHIFT", "KEY_RIGHTBRACE"),404 ';': ("KEY_SEMICOLON",), ':': ("KEY_LEFTSHIFT", "KEY_SEMICOLON"),405 "'": ("KEY_APOSTROPHE",), '"': ("KEY_LEFTSHIFT", "KEY_APOSTROPHE"),406 '\\': ("KEY_BACKSLASH",), '|': ("KEY_LEFTSHIFT", "KEY_BACKSLASH"),407 ',': ("KEY_COMMA",), '<': ("KEY_LEFTSHIFT", "KEY_COMMA"),408 '.': ("KEY_DOT",), '>': ("KEY_LEFTSHIFT", "KEY_DOT"),409 '/': ("KEY_SLASH",), '?': ("KEY_LEFTSHIFT", "KEY_SLASH"),410 }411 return c2s.get(c, c)412mtEvents = {} # slot -> (tracking_id, x, y)413def inputEventSend(inputDevFd, eventType, event, param):414 t = time.time()415 tsec = int(t)416 tusec = int(1000000*(t-tsec))417 os.write(inputDevFd, struct.pack(_input_event,418 tsec, tusec, eventType, event, param))419def mtEventSend(eventType, event, param):420 """multitouch device event"""421 return inputEventSend(mtInputDevFd, eventType, event, param)422def mtGestureStart(x, y):423 mtGestureStart.trackingId += 1424 trackingId = mtGestureStart.trackingId425 for freeSlot in xrange(16):426 if not freeSlot in mtEvents: break427 else: raise ValueError("No free multitouch event slots available")428 mtEvents[freeSlot] = [trackingId, x, y]429 mtEventSend(_EV_ABS, _ABS_MT_SLOT, freeSlot)430 mtEventSend(_EV_ABS, _ABS_MT_TRACKING_ID, trackingId)431 mtEventSend(_EV_ABS, _ABS_MT_POSITION_X, x)432 mtEventSend(_EV_ABS, _ABS_MT_POSITION_Y, y)433 mtEventSend(_EV_ABS, _ABS_X, x)434 mtEventSend(_EV_ABS, _ABS_Y, y)435 mtEventSend(0, 0, 0) # SYNC436 return freeSlot437mtGestureStart.trackingId = 0438def mtGestureMove(slot, x, y):439 if x == mtEvents[slot][1] and y == mtEvents[slot][2]: return440 mtEventSend(_EV_ABS, _ABS_MT_SLOT, slot)441 mtEventSend(_EV_ABS, _ABS_MT_TRACKING_ID, mtEvents[slot][0])442 if x != mtEvents[slot][1] and 0 <= x <= root_width:443 mtEventSend(_EV_ABS, _ABS_MT_POSITION_X, x)444 mtEvents[slot][1] = x445 if y != mtEvents[slot][2] and 0 <= y <= root_height:446 mtEventSend(_EV_ABS, _ABS_MT_POSITION_Y, y)447 mtEvents[slot][2] = y448 if 0 <= x <= root_width:449 mtEventSend(_EV_ABS, _ABS_X, x)450 if 0 <= y <= root_height:451 mtEventSend(_EV_ABS, _ABS_Y, y)452 mtEventSend(0, 0, 0)453def mtGestureEnd(slot):454 mtEventSend(_EV_ABS, _ABS_MT_SLOT, slot)455 mtEventSend(_EV_ABS, _ABS_MT_TRACKING_ID, -1)456 mtEventSend(0, 0, 0) # SYNC457 del mtEvents[slot]458def mtLinearGesture(listOfStartEndPoints, duration, movePoints, sleepBeforeMove=0, sleepAfterMove=0):459 # listOfStartEndPoints: [ [(finger1startX, finger1startY), (finger1endX, finger1endY)],460 # [(finger2startX, finger2startY), (finger2endX, finger2endY)], ...]461 startPoints = [startEnd[0] for startEnd in listOfStartEndPoints]462 xDist = [startEnd[1][0] - startEnd[0][0] for startEnd in listOfStartEndPoints]463 yDist = [startEnd[1][1] - startEnd[0][1] for startEnd in listOfStartEndPoints]464 movePointsF = float(movePoints)465 fingers = []466 for (x, y) in startPoints:467 fingers.append(mtGestureStart(x, y))468 if sleepBeforeMove > 0: time.sleep(sleepBeforeMove)469 if movePoints > 0:470 intermediateSleep = float(duration) / movePoints471 for i in xrange(1, movePoints + 1):472 if intermediateSleep > 0:473 time.sleep(intermediateSleep)474 for fingerIndex, finger in enumerate(fingers):475 mtGestureMove(finger,476 startPoints[fingerIndex][0] + int(xDist[fingerIndex]*i/movePointsF),477 startPoints[fingerIndex][1] + int(yDist[fingerIndex]*i/movePointsF))478 if sleepAfterMove > 0: time.sleep(sleepAfterMove)479 for finger in fingers:480 mtGestureEnd(finger)481 return True, None482def typeCharX(origChar):483 modifiers = []484 c = specialCharToXString(origChar)485 keysym = libX11.XStringToKeysym(c)486 if keysym == NoSymbol:487 return False488 keycode = libX11.XKeysymToKeycode(display, keysym)489 first = (keycode - cMinKeycode.value) * cKeysymsPerKeycode.value490 try:491 if chr(keysyms[first + 1]) == origChar:492 modifiers.append(shiftModifier)493 except ValueError: pass494 for m in modifiers:495 libXtst.XTestFakeKeyEvent(display, m, X_True, X_CurrentTime)496 libXtst.XTestFakeKeyEvent(display, keycode, X_True, X_CurrentTime)497 libXtst.XTestFakeKeyEvent(display, keycode, X_False, X_CurrentTime)498 for m in modifiers[::-1]:499 libXtst.XTestFakeKeyEvent(display, m, X_False, X_CurrentTime)500 return True501def typeCharHw(origChar):502 for c in origChar:503 modifiers = []504 keyCode = None505 c = specialCharToUsKeys(c)506 if isinstance(c, tuple):507 modifiers = c[:-1]508 keyCode = c[-1]509 elif c in string.uppercase:510 modifiers = ["KEY_LEFTSHIFT"]511 keyCode = "KEY_" + c512 elif c in string.lowercase or c in string.digits:513 keyCode = "KEY_" + c.upper()514 else:515 # do not know how to type the character516 pass517 if keyCode:518 for m in modifiers:519 keyboard_device.press(m)520 keyboard_device.tap(keyCode)521 for m in modifiers[::-1]:522 keyboard_device.release(m)523 return True524if g_Xavailable:525 typeChar = typeCharX526else:527 typeChar = typeCharHw528def typeSequence(s, delayBetweenChars=0):529 skipped = []530 for c in s:531 if not typeChar(c):532 skipped.append(c)533 if delayBetweenChars != 0:534 time.sleep(delayBetweenChars)535 if skipped: return False, skipped536 else: return True, skipped537def takeScreenshotOnX():538 image_p = libX11.XGetImage(display, root_window,539 0, 0, root_width, root_height,540 X_AllPlanes, X_ZPixmap)541 image = image_p[0]542 # FMBTRAWX11 image format header:543 # FMBTRAWX11 [width] [height] [color depth] [bits per pixel]<linefeed>544 # Binary data545 rawfmbt_header = "FMBTRAWX11 %d %d %d %d\n" % (546 image.width, image.height, root_depth.value, image.bits_per_pixel)547 rawfmbt_data = ctypes.string_at(image.data, image.height * image.bytes_per_line)548 compressed_image = rawfmbt_header + zlib.compress(rawfmbt_data, 3)549 libX11.XDestroyImage(image_p)550 return True, compressed_image551def westonTakeScreenshotRoot():552 if westonTakeScreenshotRoot.ssFilename == None:553 westonTakeScreenshotRoot.ssFilename = findWestonScreenshotFilenameRoot()554 try:555 keyboard_device.press("KEY_LEFTMETA")556 keyboard_device.tap("s")557 keyboard_device.release("KEY_LEFTMETA")558 time.sleep(0.5)559 # wait for the screenshot writer to finish560 writerPid = fuser(westonTakeScreenshotRoot.ssFilename)561 if writerPid != None:562 time.sleep(0.25)563 while fuser(westonTakeScreenshotRoot.ssFilename, [writerPid]) != None:564 time.sleep(0.25)565 shutil.move(westonTakeScreenshotRoot.ssFilename, "/tmp/screenshot.png")566 os.chmod("/tmp/screenshot.png", 0666)567 except Exception, e:568 return False, str(e)569 return True, None570westonTakeScreenshotRoot.ssFilename = None571def takeScreenshotOnWeston():572 if iAmRoot:573 rv, status = westonTakeScreenshotRoot()574 else:575 rv, status = subAgentCommand("root", "tizen", "ss weston-root")576 if rv == False:577 return rv, status578 return True, file("/tmp/screenshot.png").read()579def fuser(filename, usualSuspects=None):580 """Returns the pid of a user of given file, or None"""581 filepath = os.path.realpath(filename)582 if not os.access(filepath, os.R_OK):583 raise ValueError('No such file: "%s"' % (filename,))584 if usualSuspects == None:585 procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")586 else:587 procFds = []588 for pid in usualSuspects:589 procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))590 for symlink in procFds:591 try:592 if os.path.realpath(symlink) == filepath:593 return int(symlink.split('/')[2])594 except OSError:595 pass596def findWestonScreenshotFilenameRoot():597 # find weston cwd598 for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):599 try:600 if os.path.realpath(exe) == "/usr/bin/weston":601 cwd = os.path.realpath(os.path.dirname(exe) + "/cwd")602 break603 except OSError:604 pass605 else:606 return False, "cannot find weston cwd"607 rv = cwd + "/wayland-screenshot.png"608 return rv609if g_Xavailable:610 takeScreenshot = takeScreenshotOnX611else:612 takeScreenshot = takeScreenshotOnWeston613def shellSOE(command, asyncStatus, asyncOut, asyncError):614 if (asyncStatus, asyncOut, asyncError) != (None, None, None):615 # prepare for decoupled asynchronous execution616 if asyncStatus == None: asyncStatus = "/dev/null"617 if asyncOut == None: asyncOut = "/dev/null"618 if asyncError == None: asyncError = "/dev/null"619 try:620 stdinFile = file("/dev/null", "r")621 stdoutFile = file(asyncOut, "a+")622 stderrFile = file(asyncError, "a+", 0)623 statusFile = file(asyncStatus, "a+")624 except IOError, e:625 return False, (None, None, e)626 try:627 if os.fork() > 0:628 # parent returns after successful fork, there no629 # direct visibility to async child process beyond this630 # point.631 stdinFile.close()632 stdoutFile.close()633 stderrFile.close()634 statusFile.close()635 return True, (0, None, None)636 except OSError, e:637 return False, (None, None, e)638 os.setsid()639 else:640 stdinFile = subprocess.PIPE641 stdoutFile = subprocess.PIPE642 stderrFile = subprocess.PIPE643 try:644 p = subprocess.Popen(command, shell=True,645 stdin=stdinFile,646 stdout=stdoutFile,647 stderr=stderrFile,648 close_fds=True)649 except Exception, e:650 return False, (None, None, e)651 if asyncStatus == None and asyncOut == None and asyncError == None:652 # synchronous execution, read stdout and stderr653 out, err = p.communicate()654 else:655 # asynchronous execution, store status to file656 statusFile.write(str(p.wait()) + "\n")657 statusFile.close()658 out, err = None, None659 return True, (p.returncode, out, err)660def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):661 start = time.time()662 endTime = start + timeout663 s = ""664 try: s += nonblockingFd.read()665 except IOError: pass666 foundOutputs = [ao for ao in acceptedOutputs if ao in s]667 while len(foundOutputs) == 0 and time.time() < endTime:668 time.sleep(pollInterval)669 try: s += nonblockingFd.read()670 except IOError: pass671 foundOutputs = [ao for ao in acceptedOutputs if ao in s]672 return foundOutputs, s673_subAgents = {}674def openSubAgent(username, password):675 p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),676 shell=True,677 stdin=subprocess.PIPE,678 stdout=subprocess.PIPE,679 stderr=subprocess.PIPE)680 # Read in non-blocking mode to ensure agent starts correctly681 fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)682 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)683 output2 = ""684 seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)685 if "Password:" in seenPrompts:686 p.stdin.write(password + "\r")687 output1 = ""688 seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)689 if not "FMBTAGENT" in seenPrompts:690 p.terminate()691 return (None, 'fMBT agent with username "%s" does not answer.' % (username,),692 output1 + output2)693 # Agent is alive, continue in blocking mode694 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)695 return p, "", ""696def subAgentCommand(username, password, cmd):697 if not username in _subAgents:698 process, output, error = openSubAgent(username, password)699 if process == None:700 return None, (-1, output, error)701 else:702 _subAgents[username] = process703 p = _subAgents[username]704 p.stdin.write(cmd + "\r")705 p.stdin.flush()706 answer = p.stdout.readline().rstrip()707 if answer.startswith("FMBTAGENT OK "):708 return True, _decode(answer[len("FMBTAGENT OK "):])709 else:710 return False, _decode(answer[len("FMBTAGENT ERROR "):])711def closeSubAgents():712 for username in _subAgents:713 subAgentCommand(username, None, "quit")714if __name__ == "__main__":715 try:716 origTermAttrs = termios.tcgetattr(sys.stdin.fileno())717 hasTerminal = True718 except termios.error:719 origTermAttrs = None720 hasTerminal = False721 if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:722 # Disable terminal echo723 newTermAttrs = origTermAttrs724 newTermAttrs[3] = origTermAttrs[3] & ~termios.ECHO725 termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)726 if "--no-x" in sys.argv:727 debug("X disabled")728 g_Xavailable = False729 platformInfo = {}730 platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()731 # Send version number, enter main loop732 write_response(True, platformInfo)733 cmd = read_cmd()734 while cmd:735 if cmd.startswith("bl "): # set display backlight time736 if iAmRoot:737 timeout = int(cmd[3:].strip())738 try:739 file("/opt/var/kdb/db/setting/lcd_backlight_normal","wb").write(struct.pack("ii",0x29,timeout))740 write_response(True, None)741 except Exception, e: write_response(False, e)742 else:743 write_response(*subAgentCommand("root", "tizen", cmd))744 elif cmd.startswith("er "): # event recorder745 if iAmRoot:746 cmd, arg = cmd.split(" ", 1)747 if arg.startswith("start "):748 filterOpts = _decode(arg.split()[1])749 if touch_device:750 filterOpts["touchScreen"] = touch_device751 fmbtuinput.startQueueingEvents(filterOpts)752 write_response(True, None)753 elif arg == "stop":754 events = fmbtuinput.stopQueueingEvents()755 write_response(True, None)756 elif arg == "fetch":757 events = fmbtuinput.fetchQueuedEvents()758 write_response(True, events)759 else:760 write_response(*subAgentCommand("root", "tizen", cmd))761 elif cmd.startswith("gd"): # get display status762 try:763 p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)764 out, err = p.communicate()765 if "Monitor is Off" in out: write_response(True, "Off")766 elif "Monitor is On" in out: write_response(True, "On")767 else: write_response(False, err)768 except Exception, e: write_response(False, e)769 elif cmd.startswith("tm "): # touch move(x, y)770 xs, ys = cmd[3:].strip().split()771 if g_Xavailable:772 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)773 libX11.XFlush(display)774 else:775 if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))776 else: rv, msg = subAgentCommand("root", "tizen", cmd)777 write_response(True, None)778 elif cmd.startswith("tt "): # touch tap(x, y, button)779 x, y, button = [int(i) for i in cmd[3:].strip().split()]780 if g_Xavailable:781 libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)782 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)783 libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)784 libX11.XFlush(display)785 rv, msg = True, None786 else:787 if iAmRoot: rv, msg = sendHwTap(x, y, button-1)788 else: rv, msg = subAgentCommand("root", "tizen", cmd)789 write_response(rv, msg)790 elif cmd.startswith("td "): # touch down(x, y, button)791 xs, ys, button = cmd[3:].strip().split()792 button = int(button)793 if g_Xavailable:794 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)795 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)796 libX11.XFlush(display)797 else:798 if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)799 else: rv, msg = subAgentCommand("root", "tizen", cmd)800 write_response(True, None)801 elif cmd.startswith("tu "): # touch up(x, y, button)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!