Best Python code snippet using uiautomator
test_flow.py
Source:test_flow.py
1# -*- coding: utf-8 -*-2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.3# Use of this source code is governed by a BSD-style license that can be4# found in the LICENSE file.5"""Guide the user to perform gestures. Record and validate the gestures."""6import fcntl7import glob8import os9import subprocess10import sys11import common_util12import firmware_log13import firmware_utils14import fuzzy15import mini_color16import mtb17import touchbotII_robot_wrapper as robot_wrapper18import test_conf as conf19import validators20from firmware_utils import GestureList21sys.path.append('../../bin/input')22import input_device23# Include some constants24from firmware_constants import DEV, GV, MODE, OPTIONS, TFK25class TestFlow:26 """Guide the user to perform gestures. Record and validate the gestures."""27 def __init__(self, device_geometry, device, keyboard, win, parser, output,28 test_version, board, firmware_version, options):29 self.device_geometry = device_geometry30 self.device = device31 self.device_node = self.device.device_node32 self.keyboard = keyboard33 self.firmware_version = firmware_version34 self.output = output35 self.board = board36 self.test_version = test_version37 self.output.print_report('%s' % test_version)38 self._get_record_cmd()39 self.win = win40 self.parser = parser41 self.packets = None42 self.gesture_file_name = None43 self.prefix_space = self.output.get_prefix_space()44 self.scores = []45 self.mode = options[OPTIONS.MODE]46 self.fngenerator_only = options[OPTIONS.FNGENERATOR]47 self.iterations = options[OPTIONS.ITERATIONS]48 self.replay_dir = options[OPTIONS.REPLAY]49 self.resume_dir = options[OPTIONS.RESUME]50 self.recording = not any([bool(self.replay_dir), bool(self.resume_dir)])51 self.device_type = (DEV.TOUCHSCREEN if options[OPTIONS.TOUCHSCREEN]52 else DEV.TOUCHPAD)53 self.robot = robot_wrapper.RobotWrapper(self.board, options)54 self.robot_waiting = False55 self.gv_count = float('infinity')56 gesture_names = 
self._get_gesture_names()57 order = None58 if self._is_robot_mode():59 order = lambda x: conf.finger_tips_required[x.name]60 self.gesture_list = GestureList(gesture_names).get_gesture_list(order)61 self._get_all_gesture_variations(options[OPTIONS.SIMPLIFIED])62 self.init_flag = False63 self.system_device = self._non_blocking_open(self.device_node)64 self.evdev_device = input_device.InputEvent()65 self.screen_shot = firmware_utils.ScreenShot(self.geometry_str)66 self.mtb_evemu = mtb.MtbEvemu(device)67 self._rename_old_log_and_html_files()68 self._set_static_prompt_messages()69 self.gesture_image_name = None70 self.gesture_continues_flag = False71 self.use_existent_event_file_flag = False72 def __del__(self):73 self.system_device.close()74 def _rename_old_log_and_html_files(self):75 """When in replay or resume mode, rename the old log and html files."""76 if self.replay_dir or self.resume_dir:77 for file_type in ['*.log', '*.html']:78 path_names = os.path.join(self.output.log_dir, file_type)79 for old_path_name in glob.glob(path_names):80 new_path_name = '.'.join([old_path_name, 'old'])81 os.rename(old_path_name, new_path_name)82 def _is_robot_mode(self):83 return self.robot.is_robot_action_mode() or self.mode == MODE.ROBOT_SIM84 def _get_gesture_names(self):85 """Determine the gesture names based on the mode."""86 if self.mode == MODE.QUICKSTEP:87 return conf.gesture_names_quickstep88 elif self.mode == MODE.NOISE:89 return conf.gesture_names_noise_extended90 elif self._is_robot_mode():91 # The mode could be MODE.ROBOT or MODE.ROBOT_SIM.92 # The same gesture names list is used in both modes.93 return conf.gesture_names_robot[self.device_type]94 elif self.mode == MODE.MANUAL:95 # Define the manual list which is gesture_names_complete:96 # gesture_names_robot - gesture_names_equipment_required97 manual_set = (set(conf.gesture_names_complete[self.device_type]) -98 set(conf.gesture_names_robot[self.device_type]))99 return list(manual_set - 
set(conf.gesture_names_fngenerator_required))100 elif self.mode == MODE.CALIBRATION:101 return conf.gesture_names_calibration102 else:103 # Filter out tests that need a function generator for COMPLETE mode104 # unless they've indicated that they have one105 return [n for n in conf.gesture_names_complete[self.device_type]106 if (self.fngenerator_only or107 n not in conf.gesture_names_fngenerator_required)]108 def _non_blocking_open(self, filename):109 """Open the file in non-blocing mode."""110 fd = open(filename)111 fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)112 return fd113 def _non_blocking_read(self, dev, fd):114 """Non-blocking read on fd."""115 try:116 dev.read(fd)117 event = (dev.tv_sec, dev.tv_usec, dev.type, dev.code, dev.value)118 except Exception, e:119 event = None120 return event121 def _reopen_system_device(self):122 """Close the device and open a new one."""123 self.system_device.close()124 self.system_device = open(self.device_node)125 self.system_device = self._non_blocking_open(self.device_node)126 def _set_static_prompt_messages(self):127 """Set static prompt messages."""128 # Prompt for next gesture.129 self._prompt_next = (130 "Press SPACE to save this file and go to next test,\n"131 " 'm' to save this file and record again,\n"132 " 'd' to delete this file and try again,\n"133 " 'x' to discard this file and exit.")134 # Prompt to see test result through timeout callback.135 self._prompt_result = (136 "Perform the gesture now.\n"137 "See the test result on the right after finger lifted.\n"138 "Or press 'x' to exit.")139 def _get_prompt_abnormal_gestures(self, warn_msg):140 """Prompt for next gesture."""141 prompt = '\n'.join(142 ["It is very likely that you perform a WRONG gesture!",143 warn_msg,144 "Press 'd' to delete this file and try again (recommended),",145 " SPACE to save this file if you are sure it's correct,",146 " 'x' to discard this file and exit."])147 return prompt148 def _get_prompt_no_data(self):149 """Prompt to remind user of 
performing gestures."""150 prompt = ("You need to perform the specified gestures "151 "before pressing SPACE.\n")152 return prompt + self._prompt_result153 def _get_record_cmd(self):154 """Get the device event record command."""155 # Run mtplot with settings to disable clearing the display if the robot156 # clicks the pad, and adding a visible click indicator in the output157 self.record_program = 'mtplot -s1 -c0 -m0'158 if not common_util.program_exists(self.record_program):159 msg = 'Error: the program "%s" does not exist in $PATH.'160 self.output.print_report(msg % self.record_program)161 exit(1)162 display_name = firmware_utils.get_display_name()163 self.geometry_str = '%dx%d+%d+%d' % self.device_geometry164 format_str = '%s %s -d %s -g %s'165 self.record_cmd = format_str % (self.record_program,166 self.device_node,167 display_name,168 self.geometry_str)169 self.output.print_report('Record program: %s' % self.record_cmd)170 def _span_seq(self, seq1, seq2):171 """Span sequence seq1 over sequence seq2.172 E.g., seq1 = (('a', 'b'), 'c')173 seq2 = ('1', ('2', '3'))174 res = (('a', 'b', '1'), ('a', 'b', '2', '3'),175 ('c', '1'), ('c', '2', '3'))176 E.g., seq1 = ('a', 'b')177 seq2 = ('1', '2', '3')178 res = (('a', '1'), ('a', '2'), ('a', '3'),179 ('b', '1'), ('b', '2'), ('b', '3'))180 E.g., seq1 = (('a', 'b'), ('c', 'd'))181 seq2 = ('1', '2', '3')182 res = (('a', 'b', '1'), ('a', 'b', '2'), ('a', 'b', '3'),183 ('c', 'd', '1'), ('c', 'd', '2'), ('c', 'd', '3'))184 """185 to_list = lambda s: list(s) if isinstance(s, tuple) else [s]186 return tuple(tuple(to_list(s1) + to_list(s2)) for s1 in seq1187 for s2 in seq2)188 def span_variations(self, seq):189 """Span the variations of a gesture."""190 if seq is None:191 return (None,)192 elif isinstance(seq[0], tuple):193 return reduce(self._span_seq, seq)194 else:195 return seq196 def _stop(self):197 """Terminate the recording process."""198 self.record_proc.poll()199 # Terminate the process only when it was not terminated 
yet.200 if self.record_proc.returncode is None:201 self.record_proc.terminate()202 self.record_proc.wait()203 self.output.print_window('')204 def _get_gesture_image_name(self):205 """Get the gesture file base name without file extension."""206 filepath = os.path.splitext(self.gesture_file_name)[0]207 self.gesture_image_name = filepath + '.png'208 return filepath209 def _close_gesture_file(self):210 """Close the gesture file."""211 if self.gesture_file.closed:212 return213 filename = self.gesture_file.name214 self.gesture_file.close()215 # Strip off the header of the gesture file.216 #217 # Input driver version is 1.0.1218 # Input device ID: bus 0x18 vendor 0x0 product 0x0 version 0x0219 # Input device name: "Atmel maXTouch Touchpad"220 # ...221 # Testing ... (interrupt to exit)222 # Event: time 519.855, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID),223 # value 884224 #225 tmp_filename = filename + '.tmp'226 os.rename(filename, tmp_filename)227 with open(tmp_filename) as src_f:228 with open(filename, 'w') as dst_f:229 for line in src_f:230 if line.startswith('Event:'):231 dst_f.write(line)232 os.remove(tmp_filename)233 def _stop_record_and_post_image(self):234 """Terminate the recording process."""235 if self.record_new_file:236 self._close_gesture_file()237 self.screen_shot.dump_root(self._get_gesture_image_name())238 self.record_proc.terminate()239 self.record_proc.wait()240 else:241 self._get_gesture_image_name()242 self.win.set_image(self.gesture_image_name)243 def _create_prompt(self, test, variation):244 """Create a color prompt."""245 prompt = test.prompt246 if isinstance(variation, tuple):247 subprompt = reduce(lambda s1, s2: s1 + s2,248 tuple(test.subprompt[s] for s in variation))249 elif variation is None or test.subprompt is None:250 subprompt = None251 else:252 subprompt = test.subprompt[variation]253 if subprompt is None:254 color_prompt = prompt255 monochrome_prompt = prompt256 else:257 color_prompt = mini_color.color_string(prompt, '{', '}', 
'green')258 color_prompt = color_prompt.format(*subprompt)259 monochrome_prompt = prompt.format(*subprompt)260 color_msg_format = mini_color.color_string('\n<%s>:\n%s%s', '<', '>',261 'blue')262 color_msg = color_msg_format % (test.name, self.prefix_space,263 color_prompt)264 msg = '%s: %s' % (test.name, monochrome_prompt)265 glog = firmware_log.GestureLog()266 glog.name = test.name267 glog.variation = variation268 glog.prompt = monochrome_prompt269 return (msg, color_msg, glog)270 def _choice_exit(self):271 """Procedure to exit."""272 self._stop()273 if os.path.exists(self.gesture_file_name):274 os.remove(self.gesture_file_name)275 self.output.print_report(self.deleted_msg)276 def _stop_record_and_rm_file(self):277 """Stop recording process and remove the current gesture file."""278 self._stop()279 if os.path.exists(self.gesture_file_name):280 os.remove(self.gesture_file_name)281 self.output.print_report(self.deleted_msg)282 def _create_gesture_file_name(self, gesture, variation):283 """Create the gesture file name based on its variation.284 Examples of different levels of file naming:285 Primary name:286 pinch_to_zoom.zoom_in-lumpy-fw_11.27287 Root name:288 pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510289 Base name:290 pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510.dat291 """292 if variation is None:293 gesture_name = gesture.name294 else:295 if type(variation) is tuple:296 name_list = [gesture.name,] + list(variation)297 else:298 name_list = [gesture.name, variation]299 gesture_name = '.'.join(name_list)300 self.primary_name = conf.filename.sep.join([301 gesture_name,302 self.board,303 conf.fw_prefix + self.firmware_version])304 root_name = conf.filename.sep.join([305 self.primary_name,306 self.mode,307 firmware_utils.get_current_time_str()])308 basename = '.'.join([root_name, conf.filename.ext])309 return basename310 def _add_scores(self, new_scores):311 """Add the new scores of a single gesture to the scores list."""312 if new_scores 
is not None:313 self.scores += new_scores314 def _final_scores(self, scores):315 """Print the final score."""316 # Note: conf.score_aggregator uses a function in fuzzy module.317 final_score = eval(conf.score_aggregator)(scores)318 self.output.print_report('\nFinal score: %s\n' % str(final_score))319 def _robot_action(self):320 """Control the robot to perform the action."""321 if self._is_robot_mode() or self.robot.is_manual_noise_test_mode():322 self.robot.configure_noise(self.gesture, self.variation)323 if self._is_robot_mode():324 self.robot.control(self.gesture, self.variation)325 # Once the script terminates start a timeout to clean up if one326 # hasn't already been set to keep the test suite from hanging.327 if not self.gesture_begins_flag:328 self.win.register_timeout_add(self.gesture_timeout_callback,329 self.gesture.timeout)330 def _handle_user_choice_save_after_parsing(self, next_gesture=True):331 """Handle user choice for saving the parsed gesture file."""332 self.output.print_window('')333 if self.use_existent_event_file_flag or self.recording:334 if self.saved_msg:335 self.output.print_report(self.saved_msg)336 if self.new_scores:337 self._add_scores(self.new_scores)338 self.output.report_html.insert_image(self.gesture_image_name)339 self.output.report_html.flush()340 # After flushing to report_html, reset the gesture_image_name so that341 # it will not be reused by next gesture variation accidentally.342 self.gesture_image_name = None343 if self._pre_setup_this_gesture_variation(next_gesture=next_gesture):344 # There are more gestures.345 self._setup_this_gesture_variation()346 self._robot_action()347 else:348 # No more gesture.349 self._final_scores(self.scores)350 self.output.stop()351 self.output.report_html.stop()352 self.win.stop()353 self.packets = None354 def _handle_user_choice_discard_after_parsing(self):355 """Handle user choice for discarding the parsed gesture file."""356 self.output.print_window('')357 
self._setup_this_gesture_variation()358 self._robot_action()359 self.packets = None360 def _handle_user_choice_exit_after_parsing(self):361 """Handle user choice to exit after the gesture file is parsed."""362 self._stop_record_and_rm_file()363 self.output.stop()364 self.output.report_html.stop()365 self.win.stop()366 def check_for_wrong_number_of_fingers(self, details):367 flag_found = False368 try:369 position = details.index('CountTrackingIDValidator')370 except ValueError as e:371 return None372 # An example of the count of tracking IDs:373 # ' count of trackid IDs: 1'374 number_tracking_ids = int(details[position + 1].split()[-1])375 # An example of the criteria_str looks like:376 # ' criteria_str: == 2'377 criteria = int(details[position + 2].split()[-1])378 if number_tracking_ids < criteria:379 print ' CountTrackingIDValidator: '380 print ' number_tracking_ids: ', number_tracking_ids381 print ' criteria: ', criteria382 print ' number_tracking_ids should be larger!'383 msg = 'Number of Tracking IDs should be %d instead of %d'384 return msg % (criteria, number_tracking_ids)385 return None386 def _empty_packets_is_legal_result(self):387 return ('tap' in self.gesture.name and self._is_robot_mode())388 def _handle_user_choice_validate_before_parsing(self):389 """Handle user choice for validating before gesture file is parsed."""390 # Parse the device events. 
Make sure there are events.391 self.packets = self.parser.parse_file(self.gesture_file_name)392 if self.packets or self._empty_packets_is_legal_result():393 # Validate this gesture and get the results.394 (self.new_scores, msg_list, vlogs) = validators.validate(395 self.packets, self.gesture, self.variation)396 # If the number of tracking IDs is less than the expected value,397 # the user probably made a wrong gesture.398 error = self.check_for_wrong_number_of_fingers(msg_list)399 if error:400 prompt = self._get_prompt_abnormal_gestures(error)401 color = 'red'402 else:403 prompt = self._prompt_next404 color = 'black'405 self.output.print_window(msg_list)406 self.output.buffer_report(msg_list)407 self.output.report_html.insert_validator_logs(vlogs)408 self.win.set_prompt(prompt, color=color)409 print prompt410 self._stop_record_and_post_image()411 else:412 self.win.set_prompt(self._get_prompt_no_data(), color='red')413 def _handle_user_choice_exit_before_parsing(self):414 """Handle user choice to exit before the gesture file is parsed."""415 self._close_gesture_file()416 self._handle_user_choice_exit_after_parsing()417 def _is_parsing_gesture_file_done(self):418 """Is parsing the gesture file done?"""419 return self.packets is not None420 def _is_arrow_key(self, choice):421 """Is this an arrow key?"""422 return (choice in TFK.ARROW_KEY_LIST)423 def user_choice_callback(self, fd, condition):424 """A callback to handle the key pressed by the user.425 This is the primary GUI event-driven method handling the user input.426 """427 choice = self.keyboard.get_key_press_event(fd)428 if choice:429 self._handle_keyboard_event(choice)430 return True431 def _handle_keyboard_event(self, choice):432 """Handle the keyboard event."""433 if self._is_arrow_key(choice):434 self.win.scroll(choice)435 elif self.robot_waiting:436 # The user wants the robot to start its action.437 if choice in (TFK.SAVE, TFK.SAVE2):438 self.robot_waiting = False439 self._robot_action()440 # The user wants 
to exit.441 elif choice == TFK.EXIT:442 self._handle_user_choice_exit_after_parsing()443 elif self._is_parsing_gesture_file_done():444 # Save this gesture file and go to next gesture.445 if choice in (TFK.SAVE, TFK.SAVE2):446 self._handle_user_choice_save_after_parsing()447 # Save this file and perform the same gesture again.448 elif choice == TFK.MORE:449 self._handle_user_choice_save_after_parsing(next_gesture=False)450 # Discard this file and perform the gesture again.451 elif choice == TFK.DISCARD:452 self._handle_user_choice_discard_after_parsing()453 # The user wants to exit.454 elif choice == TFK.EXIT:455 self._handle_user_choice_exit_after_parsing()456 # The user presses any wrong key.457 else:458 self.win.set_prompt(self._prompt_next, color='red')459 else:460 if choice == TFK.EXIT:461 self._handle_user_choice_exit_before_parsing()462 # The user presses any wrong key.463 else:464 self.win.set_prompt(self._prompt_result, color='red')465 def _get_all_gesture_variations(self, simplified):466 """Get all variations for all gestures."""467 gesture_variations_list = []468 self.variations_dict = {}469 for gesture in self.gesture_list:470 variations_list = []471 variations = self.span_variations(gesture.variations)472 for variation in variations:473 gesture_variations_list.append((gesture, variation))474 variations_list.append(variation)475 if simplified:476 break477 self.variations_dict[gesture.name] = variations_list478 self.gesture_variations = iter(gesture_variations_list)479 def gesture_timeout_callback(self):480 """A callback watching whether a gesture has timed out."""481 if self.replay_dir:482 # There are event files to replay for this gesture variation.483 if self.use_existent_event_file_flag:484 self._handle_user_choice_validate_before_parsing()485 self._handle_user_choice_save_after_parsing(next_gesture=True)486 return False487 # A gesture is stopped only when two conditions are met simultaneously:488 # (1) there are no reported packets for a timeout 
interval, and489 # (2) the number of tracking IDs is 0.490 elif (self.gesture_continues_flag or491 not self.mtb_evemu.all_fingers_leaving()):492 self.gesture_continues_flag = False493 return True494 else:495 self._handle_user_choice_validate_before_parsing()496 self.win.remove_event_source(self.gesture_file_watch_tag)497 if self._is_robot_mode():498 self._handle_keyboard_event(TFK.SAVE)499 return False500 def gesture_file_watch_callback(self, fd, condition, evdev_device):501 """A callback to watch the device input."""502 # Read the device node continuously until end503 event = True504 while event:505 event = self._non_blocking_read(evdev_device, fd)506 if event:507 self.mtb_evemu.process_event(event)508 self.gesture_continues_flag = True509 if (not self.gesture_begins_flag):510 self.gesture_begins_flag = True511 self.win.register_timeout_add(self.gesture_timeout_callback,512 self.gesture.timeout)513 return True514 def init_gesture_setup_callback(self, widget, event):515 """A callback to set up environment before a user starts a gesture."""516 if not self.init_flag:517 self.init_flag = True518 self._pre_setup_this_gesture_variation()519 self._setup_this_gesture_variation()520 self._robot_action()521 def _get_existent_event_files(self):522 """Get the existent event files that starts with the primary_name."""523 primary_pathnames = os.path.join(self.output.log_dir,524 self.primary_name + '*.dat')525 self.primary_gesture_files = glob.glob(primary_pathnames)526 # Reverse sorting the file list so that we could pop from the tail.527 self.primary_gesture_files.sort()528 self.primary_gesture_files.reverse()529 def _use_existent_event_file(self):530 """If the replay flag is set in the command line, and there exists a531 file(s) with the same primary name, then use the existent file(s)532 instead of recording a new one.533 """534 if self.primary_gesture_files:535 self.gesture_file_name = self.primary_gesture_files.pop()536 return True537 return False538 def 
_pre_setup_this_gesture_variation(self, next_gesture=True):539 """Get gesture, variation, filename, prompt, etc."""540 next_gesture_first_time = False541 if next_gesture:542 if self.gv_count < self.iterations:543 self.gv_count += 1544 else:545 self.gv_count = 1546 gesture_variation = next(self.gesture_variations, None)547 if gesture_variation is None:548 return False549 self.gesture, self.variation = gesture_variation550 next_gesture_first_time = True551 basename = self._create_gesture_file_name(self.gesture, self.variation)552 if next_gesture_first_time:553 self._get_existent_event_files()554 if self.replay_dir or self.resume_dir:555 self.use_existent_event_file_flag = self._use_existent_event_file()556 if ((not self.replay_dir and not self.resume_dir) or557 (self.resume_dir and not self.use_existent_event_file_flag)):558 self.gesture_file_name = os.path.join(self.output.log_dir, basename)559 self.saved_msg = '(saved: %s)\n' % self.gesture_file_name560 self.deleted_msg = '(deleted: %s)\n' % self.gesture_file_name561 else:562 self.saved_msg = None563 self.deleted_msg = None564 self.new_scores = None565 if self.robot.is_robot_action_mode() or self.robot.is_manual_noise_test_mode():566 self.robot.turn_off_noise()567 (msg, color_msg, glog) = self._create_prompt(self.gesture,568 self.variation)569 self.win.set_gesture_name(msg)570 self.output.report_html.insert_gesture_log(glog)571 print color_msg572 self.output.print_report(color_msg)573 return True574 def _setup_this_gesture_variation(self):575 """Set up the recording process or use an existent event data file."""576 if self.replay_dir:577 self.record_new_file = False578 self.win.register_timeout_add(self.gesture_timeout_callback, 0)579 return580 if self.resume_dir and self.use_existent_event_file_flag:581 self.record_new_file = False582 self._handle_user_choice_validate_before_parsing()583 self._handle_keyboard_event(TFK.SAVE)584 return585 # Initiate the MtbSanityValidator. 
Note that this should be done each586 # time just before recording the gesture file since it requires a587 # snapshot of the input device before any finger touching the device.588 self.gesture.mtb_sanity_validator = validators.MtbSanityValidator()589 # Now, we will record a new gesture event file.590 # Fork a new process for mtplot. Add io watch for the gesture file.591 self.record_new_file = True592 self.gesture_file = open(self.gesture_file_name, 'w')593 self.record_proc = subprocess.Popen(self.record_cmd.split(),594 stdout=self.gesture_file)595 # Watch if data come in to the monitored file.596 self.gesture_begins_flag = False597 self._reopen_system_device()598 self.gesture_file_watch_tag = self.win.register_io_add_watch(599 self.gesture_file_watch_callback, self.system_device,...
gesture.py
Source:gesture.py
...9 g.add_stroke(point_list=[(1,1), (3,4), (2,1)])10 g.normalize()11 # Add it to the database12 gdb = GestureDatabase()13 gdb.add_gesture(g)14 # And for the next gesture, try to find it!15 g2 = Gesture()16 # ...17 gdb.find(g2)18.. warning::19 You don't really want to do this: it's more of an example of how20 to construct gestures dynamically. Typically, you would21 need a lot more points, so it's better to record gestures in a file and22 reload them to compare later. Look in the examples/gestures directory for23 an example of how to do that.24'''25__all__ = ('Gesture', 'GestureDatabase', 'GesturePoint', 'GestureStroke')26import pickle27import base6428import zlib29import math30from kivy.vector import Vector31from io import BytesIO32class GestureDatabase(object):33 '''Class to handle a gesture database.'''34 def __init__(self):35 self.db = []36 def add_gesture(self, gesture):37 '''Add a new gesture to the database.'''38 self.db.append(gesture)39 def find(self, gesture, minscore=0.9, rotation_invariant=True):40 '''Find a matching gesture in the database.'''41 if not gesture:42 return43 best = None44 bestscore = minscore45 for g in self.db:46 score = g.get_score(gesture, rotation_invariant)47 if score < bestscore:48 continue49 bestscore = score50 best = g51 if not best:52 return53 return (bestscore, best)54 def gesture_to_str(self, gesture):55 '''Convert a gesture into a unique string.'''56 io = BytesIO()57 p = pickle.Pickler(io)58 p.dump(gesture)59 data = base64.b64encode(zlib.compress(io.getvalue(), 9))60 return data61 def str_to_gesture(self, data):62 '''Convert a unique string to a gesture.'''63 io = BytesIO(zlib.decompress(base64.b64decode(data)))64 p = pickle.Unpickler(io)65 gesture = p.load()66 return gesture67class GesturePoint:68 def __init__(self, x, y):69 '''Stores the x,y coordinates of a point in the gesture.'''70 self.x = float(x)71 self.y = float(y)72 def scale(self, factor):73 ''' Scales the point by the given factor.'''74 self.x *= factor75 self.y *= 
factor76 return self77 def __repr__(self):78 return 'Mouse_point: %f,%f' % (self.x, self.y)79class GestureStroke:80 ''' Gestures can be made up of multiple strokes.'''81 def __init__(self):82 ''' A stroke in the gesture.'''83 self.points = list()84 self.screenpoints = list()85 # These return the min and max coordinates of the stroke86 @property87 def max_x(self):88 if len(self.points) == 0:89 return 090 return max(self.points, key=lambda pt: pt.x).x91 @property92 def min_x(self):93 if len(self.points) == 0:94 return 095 return min(self.points, key=lambda pt: pt.x).x96 @property97 def max_y(self):98 if len(self.points) == 0:99 return 0100 return max(self.points, key=lambda pt: pt.y).y101 @property102 def min_y(self):103 if len(self.points) == 0:104 return 0105 return min(self.points, key=lambda pt: pt.y).y106 def add_point(self, x, y):107 '''108 add_point(x=x_pos, y=y_pos)109 Adds a point to the stroke.110 '''111 self.points.append(GesturePoint(x, y))112 self.screenpoints.append((x, y))113 def scale_stroke(self, scale_factor):114 '''115 scale_stroke(scale_factor=float)116 Scales the stroke down by scale_factor.117 '''118 self.points = [pt.scale(scale_factor) for pt in self.points]119 def points_distance(self, point1, point2):120 '''121 points_distance(point1=GesturePoint, point2=GesturePoint)122 Returns the distance between two GesturePoints.123 '''124 x = point1.x - point2.x125 y = point1.y - point2.y126 return math.sqrt(x * x + y * y)127 def stroke_length(self, point_list=None):128 '''Finds the length of the stroke. 
If a point list is given,129 finds the length of that list.130 '''131 if point_list is None:132 point_list = self.points133 gesture_length = 0.0134 if len(point_list) <= 1: # If there is only one point -> no length135 return gesture_length136 for i in range(len(point_list) - 1):137 gesture_length += self.points_distance(138 point_list[i], point_list[i + 1])139 return gesture_length140 def normalize_stroke(self, sample_points=32):141 '''Normalizes strokes so that every stroke has a standard number of142 points. Returns True if stroke is normalized, False if it can't be143 normalized. sample_points controls the resolution of the stroke.144 '''145 # If there is only one point or the length is 0, don't normalize146 if len(self.points) <= 1 or self.stroke_length(self.points) == 0.0:147 return False148 # Calculate how long each point should be in the stroke149 target_stroke_size = \150 self.stroke_length(self.points) / float(sample_points)151 new_points = list()152 new_points.append(self.points[0])153 # We loop on the points154 prev = self.points[0]155 src_distance = 0.0156 dst_distance = target_stroke_size157 for curr in self.points[1:]:158 d = self.points_distance(prev, curr)159 if d > 0:160 prev = curr161 src_distance = src_distance + d162 # The new point need to be inserted into the163 # segment [prev, curr]164 while dst_distance < src_distance:165 x_dir = curr.x - prev.x166 y_dir = curr.y - prev.y167 ratio = (src_distance - dst_distance) / d168 to_x = x_dir * ratio + prev.x169 to_y = y_dir * ratio + prev.y170 new_points.append(GesturePoint(to_x, to_y))171 dst_distance = self.stroke_length(self.points) / \172 float(sample_points) * len(new_points)173 # If this happens, we are into troubles...174 if not len(new_points) == sample_points:175 raise ValueError('Invalid number of strokes points; got '176 '%d while it should be %d' %177 (len(new_points), sample_points))178 self.points = new_points179 return True180 def center_stroke(self, offset_x, offset_y):181 '''Centers 
the stroke by offsetting the points.'''182 for point in self.points:183 point.x -= offset_x184 point.y -= offset_y185class Gesture:186 '''A python implementation of a gesture recognition algorithm by187 Oleg Dopertchouk: http://www.gamedev.net/reference/articles/article2039.asp188 Implemented by Jeiel Aranal (chemikhazi@gmail.com),189 released into the public domain.190 '''191 # Tolerance for evaluation using the '==' operator192 DEFAULT_TOLERANCE = 0.1193 def __init__(self, tolerance=None):194 '''195 Gesture([tolerance=float])196 Creates a new gesture with an optional matching tolerance value.197 '''198 self.width = 0.199 self.height = 0.200 self.gesture_product = 0.201 self.strokes = list()202 if tolerance is None:203 self.tolerance = Gesture.DEFAULT_TOLERANCE204 else:205 self.tolerance = tolerance206 def _scale_gesture(self):207 ''' Scales down the gesture to a unit of 1.'''208 # map() creates a list of min/max coordinates of the strokes209 # in the gesture and min()/max() pulls the lowest/highest value210 min_x = min([stroke.min_x for stroke in self.strokes])211 max_x = max([stroke.max_x for stroke in self.strokes])212 min_y = min([stroke.min_y for stroke in self.strokes])213 max_y = max([stroke.max_y for stroke in self.strokes])214 x_len = max_x - min_x215 self.width = x_len216 y_len = max_y - min_y217 self.height = y_len218 scale_factor = max(x_len, y_len)219 if scale_factor <= 0.0:220 return False221 scale_factor = 1.0 / scale_factor222 for stroke in self.strokes:223 stroke.scale_stroke(scale_factor)224 return True225 def _center_gesture(self):226 ''' Centers the Gesture.points of the gesture.'''227 total_x = 0.0228 total_y = 0.0229 total_points = 0230 for stroke in self.strokes:231 # adds up all the points inside the stroke232 stroke_y = sum([pt.y for pt in stroke.points])233 stroke_x = sum([pt.x for pt in stroke.points])234 total_y += stroke_y235 total_x += stroke_x236 total_points += len(stroke.points)237 if total_points == 0:238 return False239 # Average 
to get the offset240 total_x /= total_points241 total_y /= total_points242 # Apply the offset to the strokes243 for stroke in self.strokes:244 stroke.center_stroke(total_x, total_y)245 return True246 def add_stroke(self, point_list=None):247 '''Adds a stroke to the gesture and returns the Stroke instance.248 Optional point_list argument is a list of the mouse points for249 the stroke.250 '''251 self.strokes.append(GestureStroke())252 if isinstance(point_list, list) or isinstance(point_list, tuple):253 for point in point_list:254 if isinstance(point, GesturePoint):255 self.strokes[-1].points.append(point)256 elif isinstance(point, list) or isinstance(point, tuple):257 if len(point) != 2:258 raise ValueError("Stroke entry must have 2 values max")259 self.strokes[-1].add_point(point[0], point[1])260 else:261 raise TypeError("The point list should either be "262 "tuples of x and y or a list of "263 "GesturePoint objects")264 elif point_list is not None:265 raise ValueError("point_list should be a tuple/list")266 return self.strokes[-1]267 def normalize(self, stroke_samples=32):268 '''Runs the gesture normalization algorithm and calculates the dot269 product with self.270 '''271 if not self._scale_gesture() or not self._center_gesture():272 self.gesture_product = False273 return False274 for stroke in self.strokes:275 stroke.normalize_stroke(stroke_samples)276 self.gesture_product = self.dot_product(self)277 def get_rigid_rotation(self, dstpts):278 '''279 Extract the rotation to apply to a group of points to minimize the280 distance to a second group of points. The two groups of points are281 assumed to be centered. This is a simple version that just picks282 an angle based on the first point of the gesture.283 '''284 if len(self.strokes) < 1 or len(self.strokes[0].points) < 1:285 return 0...
gestures.py
Source:gestures.py
"""Gesture detection for SNAP Badge, using the LIS3DH accelerometer.

Note: this could be enhanced to use the LIS3DH hardware for detection, with
the advantage of much lower CPU utilization and better performance, including
wake on interrupt. The initial software-based implementation has the
advantage of being easy to understand and experiment with.
"""

from drivers.lis3dh_accel import *

# Detected gestures
GESTURE_DOWN = 0    # Badge held vertical, then flipped down face-up
GESTURE_ZERO_G = 1  # Badge experienced zero-g condition
GESTURE_HIGH_G = 2  # Badge experienced high-g condition

# Double tap configuration parameters
DT_CFG1 = 0x77       # ODR = 400Hz
DT_CFG2 = 0x84       # Enable High-pass filter
DT_CFG3 = 0xc0       # Set INT1 pin
DT_SRC = 0x20
DT_THRESHOLD = 0x50
DT_CFG = 0x20
DT_LIMIT = 0x17      # 57.5ms
DT_LATENCY = 0x25    # 92.5ms
DT_WINDOW = 0x30     # 120ms

# Software GESTURE_DOWN detection tuning
GESTURE_DEBOUNCE_POLLS = 20  # Polls to ignore after start-up / a detection
GESTURE_DOWN_DZ_MIN = 6000   # Z-axis change between two polls must exceed this

# Remaining number of polls during which detection is suppressed
gesture_debounce = GESTURE_DEBOUNCE_POLLS
# User callback invoked with the detected gesture type (None = no callback)
gesture_cb = None

# Axis readings captured at the end of the previous poll. Initialised here so
# the globals exist before the first call to gesture_update_accel().
gest_last_x = 0
gest_last_y = 0
gest_last_z = 0


def gesture_set_callback(cb):
    """Set callback when gesture detected.

    Callback receives detected gesture type as parameter.
    ex: def my_gestures(gesture_type): ...
    """
    global gesture_cb
    gesture_cb = cb


def gesture_update_accel():
    """Track accelerometer position readings from last poll.

    Stores the current lis_axis_x/y/z readings so the next poll can compute
    the inertial 'diffs' of each axis.
    """
    global gest_last_x, gest_last_y, gest_last_z
    gest_last_x = lis_axis_x
    gest_last_y = lis_axis_y
    gest_last_z = lis_axis_z


def gesture_poll_10ms():
    """Called by system every 10ms.

    Reads the accelerometer and, unless detection is currently debounced,
    reports GESTURE_DOWN to the registered callback when the Z-axis reading
    has risen by more than GESTURE_DOWN_DZ_MIN since the previous poll.
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been collapsed - confirm against the original.
    global gesture_debounce

    # Poll the accelerometer
    lis_read()

    # Debounce gesture detection
    if gesture_debounce:
        gesture_debounce -= 1
    else:
        # Detect GESTURE_DOWN event
        dz = lis_axis_z - gest_last_z
        if dz > GESTURE_DOWN_DZ_MIN:
            if gesture_cb:
                gesture_cb(GESTURE_DOWN)
            # Suppress further detection for the next few polls
            gesture_debounce = GESTURE_DEBOUNCE_POLLS

    # Remember this poll's readings for the next diff
    gesture_update_accel()


def gesture_set_zero_g():
    """Configure accelerometer to detect zero-g condition and set interrupt"""
    # Raw values handed to the driver's lis_int1(); their meaning is defined
    # in drivers.lis3dh_accel (not visible here).
    lis_int1(0x95, 0x10, 0x10)


def gesture_set_high_g():
    """Configure accelerometer to detect high-g condition and set interrupt"""
    # Raw values handed to the driver's lis_int1(); their meaning is defined
    # in drivers.lis3dh_accel (not visible here).
    lis_int1(0x2a, 0x30, 0x00)


def gesture_set_double_tap():
    """Configure accelerometer to detect double tap and set interrupt"""
    lis_ctrl(1, DT_CFG1)
    lis_ctrl(2, DT_CFG2)
    lis_ctrl(3, DT_CFG3)
    lis_tap_cfg(DT_SRC, DT_THRESHOLD, DT_CFG, DT_LIMIT, DT_LATENCY, DT_WINDOW)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!