How to use mtLinearGesture method in fMBT

Best Python code snippet using fMBT_python

fmbttizen-agent.py

Source:fmbttizen-agent.py Github

copy

Full Screen

...454 mtEventSend(_EV_ABS, _ABS_MT_SLOT, slot)455 mtEventSend(_EV_ABS, _ABS_MT_TRACKING_ID, -1)456 mtEventSend(0, 0, 0) # SYNC457 del mtEvents[slot]458def mtLinearGesture(listOfStartEndPoints, duration, movePoints, sleepBeforeMove=0, sleepAfterMove=0):459 # listOfStartEndPoints: [ [(finger1startX, finger1startY), (finger1endX, finger1endY)],460 # [(finger2startX, finger2startY), (finger2endX, finger2endY)], ...]461 startPoints = [startEnd[0] for startEnd in listOfStartEndPoints]462 xDist = [startEnd[1][0] - startEnd[0][0] for startEnd in listOfStartEndPoints]463 yDist = [startEnd[1][1] - startEnd[0][1] for startEnd in listOfStartEndPoints]464 movePointsF = float(movePoints)465 fingers = []466 for (x, y) in startPoints:467 fingers.append(mtGestureStart(x, y))468 if sleepBeforeMove > 0: time.sleep(sleepBeforeMove)469 if movePoints > 0:470 intermediateSleep = float(duration) / movePoints471 for i in xrange(1, movePoints + 1):472 if intermediateSleep > 0:473 time.sleep(intermediateSleep)474 for fingerIndex, finger in enumerate(fingers):475 mtGestureMove(finger,476 startPoints[fingerIndex][0] + int(xDist[fingerIndex]*i/movePointsF),477 startPoints[fingerIndex][1] + int(yDist[fingerIndex]*i/movePointsF))478 if sleepAfterMove > 0: time.sleep(sleepAfterMove)479 for finger in fingers:480 mtGestureEnd(finger)481 return True, None482def typeCharX(origChar):483 modifiers = []484 c = specialCharToXString(origChar)485 keysym = libX11.XStringToKeysym(c)486 if keysym == NoSymbol:487 return False488 keycode = libX11.XKeysymToKeycode(display, keysym)489 first = (keycode - cMinKeycode.value) * cKeysymsPerKeycode.value490 try:491 if chr(keysyms[first + 1]) == origChar:492 modifiers.append(shiftModifier)493 except ValueError: pass494 for m in modifiers:495 libXtst.XTestFakeKeyEvent(display, m, X_True, X_CurrentTime)496 libXtst.XTestFakeKeyEvent(display, keycode, X_True, X_CurrentTime)497 libXtst.XTestFakeKeyEvent(display, keycode, X_False, X_CurrentTime)498 for m in modifiers[::-1]:499 libXtst.XTestFakeKeyEvent(display, m, X_False, X_CurrentTime)500 return True501def typeCharHw(origChar):502 for c in origChar:503 modifiers = []504 keyCode = None505 c = specialCharToUsKeys(c)506 if isinstance(c, tuple):507 modifiers = c[:-1]508 keyCode = c[-1]509 elif c in string.uppercase:510 modifiers = ["KEY_LEFTSHIFT"]511 keyCode = "KEY_" + c512 elif c in string.lowercase or c in string.digits:513 keyCode = "KEY_" + c.upper()514 else:515 # do not know how to type the character516 pass517 if keyCode:518 for m in modifiers:519 keyboard_device.press(m)520 keyboard_device.tap(keyCode)521 for m in modifiers[::-1]:522 keyboard_device.release(m)523 return True524if g_Xavailable:525 typeChar = typeCharX526else:527 typeChar = typeCharHw528def typeSequence(s, delayBetweenChars=0):529 skipped = []530 for c in s:531 if not typeChar(c):532 skipped.append(c)533 if delayBetweenChars != 0:534 time.sleep(delayBetweenChars)535 if skipped: return False, skipped536 else: return True, skipped537def takeScreenshotOnX():538 image_p = libX11.XGetImage(display, root_window,539 0, 0, root_width, root_height,540 X_AllPlanes, X_ZPixmap)541 image = image_p[0]542 # FMBTRAWX11 image format header:543 # FMBTRAWX11 [width] [height] [color depth] [bits per pixel]<linefeed>544 # Binary data545 rawfmbt_header = "FMBTRAWX11 %d %d %d %d\n" % (546 image.width, image.height, root_depth.value, image.bits_per_pixel)547 rawfmbt_data = ctypes.string_at(image.data, image.height * image.bytes_per_line)548 compressed_image = rawfmbt_header + zlib.compress(rawfmbt_data, 3)549 libX11.XDestroyImage(image_p)550 return True, compressed_image551def westonTakeScreenshotRoot():552 if westonTakeScreenshotRoot.ssFilename == None:553 westonTakeScreenshotRoot.ssFilename = findWestonScreenshotFilenameRoot()554 try:555 keyboard_device.press("KEY_LEFTMETA")556 keyboard_device.tap("s")557 keyboard_device.release("KEY_LEFTMETA")558 time.sleep(0.5)559 # wait for the screenshot writer to finish560 writerPid = fuser(westonTakeScreenshotRoot.ssFilename)561 if writerPid != None:562 time.sleep(0.25)563 while fuser(westonTakeScreenshotRoot.ssFilename, [writerPid]) != None:564 time.sleep(0.25)565 shutil.move(westonTakeScreenshotRoot.ssFilename, "/tmp/screenshot.png")566 os.chmod("/tmp/screenshot.png", 0666)567 except Exception, e:568 return False, str(e)569 return True, None570westonTakeScreenshotRoot.ssFilename = None571def takeScreenshotOnWeston():572 if iAmRoot:573 rv, status = westonTakeScreenshotRoot()574 else:575 rv, status = subAgentCommand("root", "tizen", "ss weston-root")576 if rv == False:577 return rv, status578 return True, file("/tmp/screenshot.png").read()579def fuser(filename, usualSuspects=None):580 """Returns the pid of a user of given file, or None"""581 filepath = os.path.realpath(filename)582 if not os.access(filepath, os.R_OK):583 raise ValueError('No such file: "%s"' % (filename,))584 if usualSuspects == None:585 procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")586 else:587 procFds = []588 for pid in usualSuspects:589 procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))590 for symlink in procFds:591 try:592 if os.path.realpath(symlink) == filepath:593 return int(symlink.split('/')[2])594 except OSError:595 pass596def findWestonScreenshotFilenameRoot():597 # find weston cwd598 for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):599 try:600 if os.path.realpath(exe) == "/usr/bin/weston":601 cwd = os.path.realpath(os.path.dirname(exe) + "/cwd")602 break603 except OSError:604 pass605 else:606 return False, "cannot find weston cwd"607 rv = cwd + "/wayland-screenshot.png"608 return rv609if g_Xavailable:610 takeScreenshot = takeScreenshotOnX611else:612 takeScreenshot = takeScreenshotOnWeston613def shellSOE(command, asyncStatus, asyncOut, asyncError):614 if (asyncStatus, asyncOut, asyncError) != (None, None, None):615 # prepare for decoupled asynchronous execution616 if asyncStatus == None: asyncStatus = "/dev/null"617 if asyncOut == None: asyncOut = "/dev/null"618 if asyncError == None: asyncError = "/dev/null"619 try:620 stdinFile = file("/dev/null", "r")621 stdoutFile = file(asyncOut, "a+")622 stderrFile = file(asyncError, "a+", 0)623 statusFile = file(asyncStatus, "a+")624 except IOError, e:625 return False, (None, None, e)626 try:627 if os.fork() > 0:628 # parent returns after successful fork, there no629 # direct visibility to async child process beyond this630 # point.631 stdinFile.close()632 stdoutFile.close()633 stderrFile.close()634 statusFile.close()635 return True, (0, None, None)636 except OSError, e:637 return False, (None, None, e)638 os.setsid()639 else:640 stdinFile = subprocess.PIPE641 stdoutFile = subprocess.PIPE642 stderrFile = subprocess.PIPE643 try:644 p = subprocess.Popen(command, shell=True,645 stdin=stdinFile,646 stdout=stdoutFile,647 stderr=stderrFile,648 close_fds=True)649 except Exception, e:650 return False, (None, None, e)651 if asyncStatus == None and asyncOut == None and asyncError == None:652 # synchronous execution, read stdout and stderr653 out, err = p.communicate()654 else:655 # asynchronous execution, store status to file656 statusFile.write(str(p.wait()) + "\n")657 statusFile.close()658 out, err = None, None659 return True, (p.returncode, out, err)660def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):661 start = time.time()662 endTime = start + timeout663 s = ""664 try: s += nonblockingFd.read()665 except IOError: pass666 foundOutputs = [ao for ao in acceptedOutputs if ao in s]667 while len(foundOutputs) == 0 and time.time() < endTime:668 time.sleep(pollInterval)669 try: s += nonblockingFd.read()670 except IOError: pass671 foundOutputs = [ao for ao in acceptedOutputs if ao in s]672 return foundOutputs, s673_subAgents = {}674def openSubAgent(username, password):675 p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),676 shell=True,677 stdin=subprocess.PIPE,678 stdout=subprocess.PIPE,679 stderr=subprocess.PIPE)680 # Read in non-blocking mode to ensure agent starts correctly681 fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)682 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)683 output2 = ""684 seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)685 if "Password:" in seenPrompts:686 p.stdin.write(password + "\r")687 output1 = ""688 seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)689 if not "FMBTAGENT" in seenPrompts:690 p.terminate()691 return (None, 'fMBT agent with username "%s" does not answer.' % (username,),692 output1 + output2)693 # Agent is alive, continue in blocking mode694 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)695 return p, "", ""696def subAgentCommand(username, password, cmd):697 if not username in _subAgents:698 process, output, error = openSubAgent(username, password)699 if process == None:700 return None, (-1, output, error)701 else:702 _subAgents[username] = process703 p = _subAgents[username]704 p.stdin.write(cmd + "\r")705 p.stdin.flush()706 answer = p.stdout.readline().rstrip()707 if answer.startswith("FMBTAGENT OK "):708 return True, _decode(answer[len("FMBTAGENT OK "):])709 else:710 return False, _decode(answer[len("FMBTAGENT ERROR "):])711def closeSubAgents():712 for username in _subAgents:713 subAgentCommand(username, None, "quit")714if __name__ == "__main__":715 try:716 origTermAttrs = termios.tcgetattr(sys.stdin.fileno())717 hasTerminal = True718 except termios.error:719 origTermAttrs = None720 hasTerminal = False721 if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:722 # Disable terminal echo723 newTermAttrs = origTermAttrs724 newTermAttrs[3] = origTermAttrs[3] & ~termios.ECHO725 termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)726 if "--no-x" in sys.argv:727 debug("X disabled")728 g_Xavailable = False729 platformInfo = {}730 platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()731 # Send version number, enter main loop732 write_response(True, platformInfo)733 cmd = read_cmd()734 while cmd:735 if cmd.startswith("bl "): # set display backlight time736 if iAmRoot:737 timeout = int(cmd[3:].strip())738 try:739 file("/opt/var/kdb/db/setting/lcd_backlight_normal","wb").write(struct.pack("ii",0x29,timeout))740 write_response(True, None)741 except Exception, e: write_response(False, e)742 else:743 write_response(*subAgentCommand("root", "tizen", cmd))744 elif cmd.startswith("er "): # event recorder745 if iAmRoot:746 cmd, arg = cmd.split(" ", 1)747 if arg.startswith("start "):748 filterOpts = _decode(arg.split()[1])749 if touch_device:750 filterOpts["touchScreen"] = touch_device751 fmbtuinput.startQueueingEvents(filterOpts)752 write_response(True, None)753 elif arg == "stop":754 events = fmbtuinput.stopQueueingEvents()755 write_response(True, None)756 elif arg == "fetch":757 events = fmbtuinput.fetchQueuedEvents()758 write_response(True, events)759 else:760 write_response(*subAgentCommand("root", "tizen", cmd))761 elif cmd.startswith("gd"): # get display status762 try:763 p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)764 out, err = p.communicate()765 if "Monitor is Off" in out: write_response(True, "Off")766 elif "Monitor is On" in out: write_response(True, "On")767 else: write_response(False, err)768 except Exception, e: write_response(False, e)769 elif cmd.startswith("tm "): # touch move(x, y)770 xs, ys = cmd[3:].strip().split()771 if g_Xavailable:772 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)773 libX11.XFlush(display)774 else:775 if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))776 else: rv, msg = subAgentCommand("root", "tizen", cmd)777 write_response(True, None)778 elif cmd.startswith("tt "): # touch tap(x, y, button)779 x, y, button = [int(i) for i in cmd[3:].strip().split()]780 if g_Xavailable:781 libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)782 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)783 libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)784 libX11.XFlush(display)785 rv, msg = True, None786 else:787 if iAmRoot: rv, msg = sendHwTap(x, y, button-1)788 else: rv, msg = subAgentCommand("root", "tizen", cmd)789 write_response(rv, msg)790 elif cmd.startswith("td "): # touch down(x, y, button)791 xs, ys, button = cmd[3:].strip().split()792 button = int(button)793 if g_Xavailable:794 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)795 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)796 libX11.XFlush(display)797 else:798 if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)799 else: rv, msg = subAgentCommand("root", "tizen", cmd)800 write_response(True, None)801 elif cmd.startswith("tu "): # touch up(x, y, button)802 xs, ys, button = cmd[3:].strip().split()803 button = int(button)804 if g_Xavailable:805 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)806 libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)807 libX11.XFlush(display)808 else:809 if iAmRoot: rv, msg = sendHwFingerUp(int(xs), int(ys), button-1)810 else: rv, msg = subAgentCommand("root", "tizen", cmd)811 write_response(True, None)812 elif cmd.startswith("kd "): # hw key down813 if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, -1)814 else: rv, msg = subAgentCommand("root", "tizen", cmd)815 write_response(rv, msg)816 elif cmd.startswith("kp "): # hw key press817 if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, 0)818 else: rv, msg = subAgentCommand("root", "tizen", cmd)819 write_response(rv, msg)820 elif cmd.startswith("ku "): # hw key up821 if iAmRoot: rv, msg = sendHwKey(cmd[3:], -1, 0)822 else: rv, msg = subAgentCommand("root", "tizen", cmd)823 write_response(rv, msg)824 elif cmd.startswith("kt "): # send x events825 if g_Xavailable:826 rv, skippedSymbols = typeSequence(_decode(cmd[3:]))827 libX11.XFlush(display)828 elif iAmRoot:829 rv, skippedSymbols = typeSequence(_decode(cmd[3:]),830 delayBetweenChars=0.05)831 else:832 rv, skippedSymbols = subAgentCommand("root", "tizen", cmd)833 write_response(rv, skippedSymbols)834 elif cmd.startswith("ml "): # send multitouch linear gesture835 if iAmRoot:836 rv, _ = mtLinearGesture(*_decode(cmd[3:]))837 else:838 rv, _ = subAgentCommand("root", "tizen", cmd)839 write_response(rv, _)840 elif cmd.startswith("ss"): # save screenshot841 if "R" in cmd.split() and g_Xavailable:842 resetXConnection()843 if "weston-root" in cmd.split(): # do Weston root part only844 write_response(*westonTakeScreenshotRoot())845 else:846 rv, compressedImage = takeScreenshot()847 write_response(rv, compressedImage)848 elif cmd.startswith("sd "): # set screen dimensions (width and height)849 _sw, _sh = cmd[3:].split()850 screenWidth, screenHeight = int(_sw), int(_sh)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful