Best Python code snippet using playwright-python
face_app.py
Source:face_app.py
...125 self.is_crashed = False126 self.in_queue = in_queue127 self._init_derived()128129 def _on_crash(self):130 exc_type, exc_value, exc_traceback = sys.exc_info()131 error('\n'.join(['Thread Exception: {}'.format(threading.current_thread())] + list(traceback.format_tb(exc_traceback, limit=32)) + [exc_type.__name__+': '+str(exc_value),]))132 self.is_crashed = True133134 def _init_derived(self):135 raise NotImplementedError('VisionCore::__init_derived() must be implemented in derived class')136137 def run(self):138 raise NotImplementedError('VisionCore::run() must be implemented in derived class')139140 # This is a template. Every thread must implement run() and modify this based on their jobs.141 while True:142 if self.is_crashed:143 # The thread is broken; do nothing...144 # or restart the thread in main thread, if its job is so important.145 time.sleep(1)146 continue147 148 if not self.is_standby:149 # Do initialization here...150 self.is_standby = True151 continue152153 try:154 t_now = time.time() * 1000 # Timing is essential for threading. It's a good practice to keep track of processing time for each thread.155 task = self.in_queue.get() # queue::get() is blocking by default156157 # Do its job...158159 self.in_queue.task_done() # Must be called or the queue will be fulled eventually160 except:161 self._on_crash()162163164class FaceEmbeddingAgent():165 """166 FaceEmbeddingAgent is an agent-specific object that stores required data for face embedding registering,167 a process that samples face images, checks if the face is already in database of known faces, add the168 new faces to database, and create KD tree of the faces to be searched.169 """170171 def __init__(self, agent_id, thread):172 self.agent_id = agent_id173 174 # Prepare directory175 self.safe_id = base64.urlsafe_b64encode(self.agent_id.encode()).decode()176 if not os.path.isdir(FACE_RECOGNITION_DIR):177 pathlib.Path(FACE_RECOGNITION_DIR).mkdir(parents=True, exist_ok=True)178 self.dir_embeddings = FACE_RECOGNITION_DIR + '/' + self.safe_id179 if not os.path.isdir(self.dir_embeddings):180 pathlib.Path(self.dir_embeddings).mkdir(parents=True, exist_ok=True)181 self.fn_embeddings = self.dir_embeddings + '/embeddings.json'182183 self.thread = thread184 self.t_last_frame = 0185 self.is_registering = False186187 self.t_update_face_tree = 0188 self.t_save_face_tree = 0189 self.t_save_face_images = 0190191 self.face_embeddings_candidate = {192 'embeddings': [],193 'rectangles': [],194 'face_images': [],195 'tree': None,196 }197198 self.face_names_cluster = []199 self.face_embeddings_cluster = []200 self.face_images_cluster = []201 self.face_tree_cluster = None202 self.names_to_index = {}203 self.is_tree_dirty = False204 self.is_tree_outsync = False205206 def append_cluster(self, index, embedding, image):207 # Update an existing cluster with latest face embeddings that match the cluster208 #print('append_cluster', index, len(embedding), len(image))209 self.face_embeddings_cluster[index].append(embedding)210 if index < len(self.face_images_cluster):211 self.face_images_cluster[index].append((np.array(image)*255).astype(dtype=np.uint8))212213 # Mix initial samples and appending samples, while keeping at least FACE_EMBEDDING_SAMPLE_SIZE of initial samples214 if len(self.face_embeddings_cluster[index]) > FACE_EMBEDDING_SAMPLE_SIZE * 5:215 if index < len(self.face_images_cluster) and len(self.face_images_cluster[index])>=len(self.face_embeddings_cluster[index]):216 # Shuffle while keeping initial FACE_EMBEDDING_SAMPLE_SIZE samples217 embeddings_shuffle_trim = self.face_embeddings_cluster[index][FACE_EMBEDDING_SAMPLE_SIZE::]218 images_shuffle_trim = self.face_images_cluster[index][FACE_EMBEDDING_SAMPLE_SIZE:FACE_EMBEDDING_SAMPLE_SIZE+len(embeddings_shuffle_trim)]219 embeddings_shuffle_trim, images_shuffle_trim = shuffle(embeddings_shuffle_trim, images_shuffle_trim)220 # Trim 2/5 samples (keep 3/5)221 self.face_embeddings_cluster[index] = self.face_embeddings_cluster[index][0:FACE_EMBEDDING_SAMPLE_SIZE] + embeddings_shuffle_trim[0:FACE_EMBEDDING_SAMPLE_SIZE*2]222 self.face_images_cluster[index] = self.face_images_cluster[index][0:FACE_EMBEDDING_SAMPLE_SIZE] + images_shuffle_trim[0:FACE_EMBEDDING_SAMPLE_SIZE*2]223 else:224 embeddings_shuffle_trim = self.face_embeddings_cluster[index][FACE_EMBEDDING_SAMPLE_SIZE::]225 embeddings_shuffle_trim = shuffle(embeddings_shuffle_trim)226 self.face_embeddings_cluster[index] = self.face_embeddings_cluster[index][0:FACE_EMBEDDING_SAMPLE_SIZE] + embeddings_shuffle_trim[0:FACE_EMBEDDING_SAMPLE_SIZE*2]227228 self.is_tree_dirty = True229230 def register_new_cluster(self, cluster, images):231 # A new face is found232 info('register_new_cluster, shape: {}'.format(np.array(cluster).shape))233 name = str(uuid.uuid4()) # Assign a random new name234 self.names_to_index[name] = len(self.face_embeddings_cluster)235 self.face_embeddings_cluster.append(cluster)236 self.face_names_cluster.append([name, len(cluster)])237238 self.face_images_cluster.append((np.array(images)*255).astype(dtype=np.uint8).tolist()[0:len(self.face_embeddings_cluster)])239 240 while len(self.face_embeddings_cluster) > FACE_RECOGNITION_REMEMBER: # Maximum of faces to remember241 name = self.face_names_cluster.pop(0)242 self.face_embeddings_cluster.pop(0)243 self.face_names_cluster.pop(name)244245 if len(self.face_images_cluster) > len(self.face_embeddings_cluster): self.face_images_cluster.pop(0)246247 self.is_tree_dirty = True248249 def check_update_tree(self):250 #print('check_update_tree', self.is_tree_dirty)251 try:252 t_now = time.time() * 1000253 if self.is_tree_dirty:254255 print('check_update_tree', len(self.face_embeddings_cluster), len(self.face_images_cluster))256 # Flatten the list of lists with varying length257 emb_flatten = []258 #print('names', self.face_names_cluster)259 for index, cluster in enumerate(self.face_embeddings_cluster):260 self.face_names_cluster[index][1] = len(self.face_embeddings_cluster[index])261262 for index, cluster in enumerate(self.face_embeddings_cluster):263 # Flatten cluster so it can be fed to scipy.spatial.KDTree()264 for emb in cluster:265 emb_flatten.append(emb)266267 info('Tree updated, identities: '.format(self.face_names_cluster))268 self.face_tree_cluster = scipy.spatial.KDTree(np.array(emb_flatten).reshape(-1, 128))269 FaceApplications().tree_updated(self.agent_id, self.face_tree_cluster, self.face_names_cluster)270 self.is_tree_outsync = True271 if self.t_save_face_tree==0: self.t_save_face_tree = t_now + INTERVAL_FACE_SAVE272273 self.is_tree_dirty = False274275 if self.is_tree_outsync:276 debug('Check saving embeddings to file... {}'.format(t_now-self.t_save_face_tree))277 if self.t_save_face_tree > 0 and t_now > self.t_save_face_tree:278 print()279 print()280 print(threading.current_thread(), 'Saving embeddings to file...', len(self.face_images_cluster))281 debug('Saving embeddings to file... {}'.format(len(self.face_images_cluster)))282 # Save registered faces as files283 # This is for debugging and has significant impact on performance284 for index, images in enumerate(self.face_images_cluster):285 name, count = self.face_names_cluster[index]286 dir = self.dir_embeddings + '/' + name287 if not os.path.isdir(dir):288 os.makedirs(dir)289 debug('images, name: {}, images: {}'.format(name, len(images)))290 for f_seq, f_img in enumerate(images):291 #img = (np.array(f_img)*255).astype(dtype=np.uint8)292 f_img = np.array(f_img, dtype=np.uint8)293 img = cv2.cvtColor(f_img, cv2.COLOR_RGB2BGR)294 cv2.imwrite(dir+'/'+str(f_seq).zfill(4)+'.jpg', img)295296 json_obj = {297 'embeddings': self.face_embeddings_cluster,298 'names': self.face_names_cluster,299 'images': self.face_images_cluster,300 'names_to_index': self.names_to_index,301 }302 if FaceApplications().file_write(self.fn_embeddings, json.dumps(json_obj, cls=NumpyEncoder)):303 self.is_tree_outsync = False304305 self.t_save_face_tree = 0306 except:307 self.thread._on_crash()308 #exc_type, exc_value, exc_traceback = sys.exc_info()309 #error('\n'.join(['Thread Exception: {}'.format(threading.current_thread())] + list(traceback.format_tb(exc_traceback, limit=32)) + [exc_type.__name__+': '+str(exc_value),]))310311 def restore_embeddings(self):312 # Load face embeddings from disk and update search tree313 print(threading.current_thread(), 'Restoring embeddings from file...')314 try:315 with open(self.fn_embeddings, 'r') as fr:316 json_obj = json.loads(fr.read())317 self.face_embeddings_cluster = json_obj['embeddings']318 self.face_names_cluster = json_obj['names']319 if 'images' in json_obj: self.face_images_cluster = json_obj['images']320 self.names_to_index = json_obj['names_to_index']321 self.is_tree_dirty = True322 self.check_update_tree()323 except FileNotFoundError:324 pass325 print(threading.current_thread(), 'Embeddings restored.')326327class FileWritingThread(ThreadBase):328 def _init_derived(self):329 pass330331 def run(self):332 while True:333 if self.is_crashed:334 # The thread is broken; do nothing...335 # or restart the thread in main thread, if its job is so important.336 time.sleep(1)337 continue338 339 if not self.is_standby:340 # Do initialization here...341 self.is_standby = True342 continue343344 try:345 t_now = time.time() * 1000 # Timing is essential for threading. It's a good practice to keep track of processing time for each thread.346 task = self.in_queue.get() # queue::get() is blocking by default347348 if task.write is not None:349 filepath = task.write['filepath']350 content = task.write['content']351 with open(filepath, 'w') as fw:352 fw.write(content)353 info('{} bytes of data written to file {} succesfully'.format(len(content), filepath))354355 self.in_queue.task_done() # Must be called or the queue will be fulled eventually356 except:357 self._on_crash()358359class FaceEmbeddingThread(ThreadBase):360 """361 FaceEmbeddingThread:362 1. Aggregate face embeddings363 2. Group them with intersection of union (IOU) as clusters364 3. Search each cluster in database of known faces365 4. If not found, register the cluster to database of known faces366 """367368 def _init_derived(self):369 self.agents = {}370 #print(threading.current_thread(), 'FaceEmbeddingThread::_init_derived()')371372 def run(self):373 while True:374 if self.is_crashed:375 # The thread is broken; do nothing376 time.sleep(1)377 continue378379 if not self.is_standby:380 # Do initialization here...381 self.is_standby = True382 continue383 384 try:385 # Get new face embeddings and update KDTree386 task = self.in_queue.get() # queue::get() is blocking by default387388 t_now = time.time() * 1000389390 if t_now > task.t_expiration:391 warning('Skip outdated embedding, delay: {}, embeddings: {}'.format(t_now-task.t_expiration, (task.embeddings is not None)))392 pass393 394 elif task.embeddings:395 agent_id = task.embeddings['agent']['agentId']396 if agent_id not in self.agents:397 self.agents[agent_id] = FaceEmbeddingAgent(agent_id, self)398 self.agents[agent_id].restore_embeddings()399 agent = self.agents[agent_id]400401 embeddings = sklearn.preprocessing.normalize(task.embeddings['recognition']['embeddings'])402 names = task.embeddings['names']403 confidences = task.embeddings['confidences']404 face_images = task.embeddings['recognition']['images']405 rectangles = task.embeddings['recognition']['rectangles']406 is_registering = False407 if task.embeddings['recognition']['mode']=='register': is_registering = True408 debug('Is registering, {}, {}'.format(is_registering, task.embeddings['recognition']['mode']))409410 #print('queried names', names)411 #print('get embedding', len(embeddings), len(face_images), len(rectangles))412413 candidate = agent.face_embeddings_candidate414415 if is_registering and not agent.is_registering:416 agent.t_update_face_tree = t_now + INTERVAL_FACE_REGISTER417 info('Engage registering... {}, {}'.format(agent.safe_id, agent.t_update_face_tree))418 # Reset candidates when engaging a new session of registering, to prevent mixing faces of different identities419 candidate['embeddings'] = []420 candidate['face_images'] = []421 candidate['rectangles'] = []422 agent.is_registering = is_registering423424 for emb_i, emb in enumerate(embeddings):425 if names[emb_i] and confidences[emb_i]>(1-FACE_EMBEDDING_THRESHOLD_LOW): # Threshold checking is addded to prevent mixing faces from different identities426 # The face is found in registered faces; append to the existing cluster427 index = agent.names_to_index[names[emb_i]]428 #print('registered faces', names[emb_i], len(agent.face_names_cluster), len(self.face_embeddings_cluster), len(self.face_images_cluster))429 agent.append_cluster(index, emb, face_images[emb_i])430 debug('Append latest face embeddings, n: {}, c: {}'.format(names[emb_i], confidences[emb_i]))431 else:432 # The face is unknown433 if is_registering:434 candidate['embeddings'].append(emb)435 candidate['face_images'].append(face_images[emb_i])436 candidate['rectangles'].append(rectangles[emb_i])437438 if not is_registering:439 # Update appended samples of existing identity to keep the record up to date440 if agent.t_update_face_tree==0:441 agent.t_update_face_tree = t_now + INTERVAL_FACE_REGISTER * 4 # Querying mode updates tree less frequently442 elif t_now > agent.t_update_face_tree:443 agent.check_update_tree()444 agent.t_update_face_tree = 0445446 else:447 # Each agent samples a limit number of faces for candidates to be registered448 candidate['embeddings'], candidate['face_images'], candidate['rectangles'] = shuffle(candidate['embeddings'], candidate['face_images'], candidate['rectangles'])449 n_keep = FACE_EMBEDDING_SAMPLE_SIZE * FACE_RECOGNITION_CONCURRENT * 2450 candidate['embeddings'] = candidate['embeddings'][0:n_keep]451 candidate['face_images'] = candidate['face_images'][0:n_keep]452 candidate['rectangles'] = candidate['rectangles'][0:n_keep]453454 #print(threading.current_thread(), 'CHECK UPDATE FACE TREE', t_now-self.t_update_face_tree)455 # Update KD Tree only every X seconds456 debug('Check registering faces, time_delta: {}, {}'.format(t_now-agent.t_update_face_tree, agent.t_update_face_tree))457 if agent.t_update_face_tree==0:458 agent.t_update_face_tree = t_now + INTERVAL_FACE_REGISTER459 debug('Signal faces registering')460 elif t_now > agent.t_update_face_tree:461 debug('Check registering new identity... {}'.format(agent.safe_id))462 # Group face embeddings into clusters463464 # 20180419 Lee noted: query_ball_tree() of scipy returns a lot of duplicated clusters and the result is also suboptimal465 # Use IOU instead to group candidates466 #self.face_tree_register = scipy.spatial.KDTree(register['embeddings'])467 #clusters = self.face_tree_register.query_ball_tree(self.face_tree_register, FACE_EMBEDDING_THRESHOLD_LOW)468 clusters = imutil.group_rectangles_miniou(candidate['rectangles'], threshold=0.3)469 if clusters is not None:470 #print(clusters)471 # Sorting does nothing right now; all clusters are processed, independently472 #clusters.sort(key=len, reverse=True)473 #print('sorted', clusters)474 for ci, new_cluster in enumerate(clusters):475 if len(new_cluster) >= FACE_EMBEDDING_SAMPLE_SIZE:476 len_ = len(new_cluster)477478 # Discard clusters with faces at different locations; this has no use for IOU clustering479 """ # This is required only for clusters from query_ball_tree() of scipy, which is no longer used due to slow performance and suboptimal results480 discard = False481 for fi1 in new_cluster:482 for fi2 in new_cluster:483 if fi1!=fi2:484 r1 = candidate['rectangles'][fi1]485 r2 = candidate['rectangles'][fi2]486 iou = imutil.calc_iou(r1, r2)487 if iou < 0.5:488 #print('small IOU, discard', iou)489 discard = True490 break491 if discard: break492 if discard: continue493 """494 debug('The cluster is valid, {}, len: {}, new: {}, r: {}'.format(ci, len(new_cluster), new_cluster, candidate['rectangles']))495496 # Convert index to 128-bytes embedding497 face_images = []498 for i, member in enumerate(new_cluster):499 new_cluster[i] = candidate['embeddings'][member]500 face_images.append(candidate['face_images'][member])501 502 #print('cluster', new_cluster)503 if agent.face_tree_cluster is not None:504 d = []505 new_cluster_ = []506 for emb_ in new_cluster:507 distance, index = agent.face_tree_cluster.query(emb_)508 if distance > 0.: # Sample with distance 0.0 is invalid509 #print('name and distance', self.face_names[index], distance)510 d.append(distance)511 new_cluster_.append(emb_)512 #print('distance stats', len(d), d)513 if len(d) >= FACE_EMBEDDING_SAMPLE_SIZE:514 d_mean = np.mean(d)515 if d_mean < FACE_EMBEDDING_THRESHOLD_LOW:516 # The face is already registered517 info('Face already registered, d_mean: {}, d: {}'.format(d_mean, d))518 pass519 else:520 # A new face is found521 agent.register_new_cluster(new_cluster_, face_images)522 else:523 # The tree is empty, register this cluster524 agent.register_new_cluster(new_cluster, face_images)525 #print('end of one cluster iteration', ci)526 #print('exit cluster iterations')527528 agent.check_update_tree()529 agent.t_update_face_tree = 0530531 else:532 warn('The task is invalid: {}'.format(task))533534 self.in_queue.task_done()535 536 except:537 self._on_crash()538539class FaceDetectionThread(ThreadBase):540 """ The singleton that manages all detection """541542 def _init_derived(self):543 self.tree_queue = Queue()544 545 def run(self):546 while True:547 if self.is_crashed:548 # The thread is broken; do nothing549 time.sleep(1)550 continue551552 if not self.is_standby:553 # Do initialization here...554 # Initialize required models and sessions555 info('THREAD INITIALIZATION: {}'.format(threading.current_thread()))556 self.is_standby = True557 continue558559 try:560 # Get task from input queue561 task = self.in_queue.get() # queue::get() is blocking by default562563 t_now = time.time() * 1000564565 is_registering = False566 if 'mode' in task.params['service'] and task.params['service']['mode']=='register':567 is_registering = True568569 agent_id = task.params['agent']['agentId']570 agstate = None571 t_delay = 24 * 60 * 60 * 1000572 if agent_id in FaceApplications().agent_state:573 agstate = FaceApplications().agent_state[agent_id]574 t_delay = t_now - agstate['t_detection']575576 # Do not skip any frame for registration, to improve initial time required to get registration, thus enhancing use experience577 if time.time() > task.t_expiration and not is_registering:578 # Threads are busy and fail to process the task in time579 # Something must be put in output queue for calling thread is waiting with get()580 task.params['output_holder'].put_nowait(None)581 #debug('SKIP, BUSY, {}'.format(time.time()-task.t_expiration))582583 elif agstate and t_delay < LAZY_INTERVAL_SKIP:584 # Rrequest for detection too frequent; ignore this request585 task.params['output_holder'].put_nowait(None)586 #debug('SKIP, TOO FREQUENT, {}, {}, {}'.format(t_delay, LAZY_INTERVAL_SKIP, agstate['t_detection']))587588 else:589 #print(threading.current_thread(), 'queue to exec', time.time()*1000 - task.t_queued)590 #print(task.params)591592 # Lazy detection mechanism593 lazy_level = LAZY_USE_NONE594595 # MTCNN detection with ROI596 mapping = None597 if agstate and t_delay < LAZY_INTERVAL_ROI:598 task.params['lazy'] = agstate['predictions']599 predictions = self.detect(task.img, task.params)600 task.params.pop('lazy', None)601602 if 'rectangles' in predictions and len(predictions['rectangles']) and 'predictions' in agstate and len(agstate['predictions']['rectangles']):603 mapping = self.get_rect_mapping(predictions['rectangles'], agstate['predictions']['rectangles'], LAZY_IOU)604605 """606 Lazy detection (and recognition) mechanism use a conservative strategy that...607 1. To validate previous detection, all faces in previous frame must be succesfully tracked in current frame.608 2. Tracking in rule 1 means that 2 face rectangles in consecutive frames have IOU < LAZY_IOU609 3. To validate previous recognition, all recognized faces in previous frame must be tracked successfully.610 """611612 is_all_recognized_tracked = False613 if mapping is not None:614 # Lazy detection with ROIs615 debug('ROI detection, recognition data: {}'.format(('recognition' in predictions)))616 lazy_level = LAZY_USE_ROI617 if not is_registering and t_delay < LAZY_INTERVAL_RECOGNITION:618 is_all_recognized_tracked = self.check_lazy_recognition(t_now, task, agstate, predictions, mapping)619 if 'identities' in predictions: debug('check_lazy_recognition {}'.format(predictions['identities']))620 else:621 # Full frame detection622 predictions = self.detect(task.img, task.params)623 debug('Full frame detection, recognition data: {}'.format(('recognition' in predictions)))624625 # Check if there's any recognized identity; recognition will be performed regardlessly if there's no recognized identity626 if 'identities' in predictions:627 if np.amax(predictions['identities']['confidence']) <= 0.1:628 # Discard previous recognition results if confidence too low629 predictions.pop('identities', None)630631 # Registration always use up-to-date recognition results to prevent mixing of face from different identities632 if 'identities' not in predictions:633 self.extract_face_embedding(predictions)634 if 'recognition' in predictions and len(predictions['recognition']['rectangles']):635 debug('Perform embedding matching {}'.format(len(predictions['recognition']['embeddings'])))636 #print('embeddings', predictions['embeddings'])637 names_, confidences_ = FaceApplications().query_embedding(task.params['agent']['agentId'], predictions['recognition']['embeddings'])638639 # Register faces only on request to prevent false positive new face registering640 task_embeddings = {641 'agent': task.params['agent'],642 'names': names_,643 'confidences': confidences_,644 'recognition': predictions['recognition'],645 }646 FaceApplications().register_embedding(task_embeddings)647648 n_rect = len(predictions['rectangles'])649 names = [''] * n_rect650 confidences = [0.] * n_rect651 for s_index, a_index in enumerate(predictions['recognition']['index']):652 names[a_index] = names_[s_index]653 confidences[a_index] = confidences_[s_index]654 predictions['identities'] = {655 'name': names,656 'confidence': confidences,657 }658659 else:660 if 'recognition' in predictions:661 debug('NO RECTANGLE FOR RECOGNITION')662 else:663 debug('NO RECOGNITION DATA')664665 is_fresh_results = True666667 if 'recognition' in predictions:668 predictions.pop('recognition', None) # embeddings are not supposed to be returned to client669670 # Keep results in memory for lazy detection. This must be before 2nd pass lazy recognition to prevent resursive use of lazy results671 FaceApplications().on_predictions(task.params['agent'], t_now, predictions)672673 debug('FaceDetectionThread task done, elapsed: {}, thread: {}'.format(time.time()*1000-task.t_queued, threading.current_thread()))674 task.params['output_holder'].put_nowait({'predictions': predictions})675676 self.in_queue.task_done()677678 except:679 self._on_crash()680681 def get_rect_mapping(self, rlist1, rlist2, threshold):682 """683 This function maps 2 lists of rectangles (as in 2 consecutive video frames) with their IOU to find tracked rectangles.684 685 Arguments:686 rlist1 {List} -- List of rectangles (x, y, w, h)687 rlist2 {List} -- List of rectangles (x, y, w, h)688 threshold {float} -- IOU of 2 rectangles must be higher than this to be considered being successfully tracked689 690 Returns:691 List -- List of pairs of index for tracked rectangles692 """693694 #debug('get_rect_mapping, {}, {}'.format(rlist1, rlist2))695 len_frame_now = len(rlist1)696 len_frame_last = len(rlist2)697 698 if len_frame_now and len_frame_last and len_frame_now >= len_frame_last:699 frame_id = []700 frame_id = frame_id + [0]*len_frame_now701 frame_id = frame_id + [1]*len_frame_last702703 mapping = {}704705 rectangles = rlist1 + rlist2706 clusters = imutil.group_rectangles_miniou(rectangles, threshold=threshold)707 #debug('Check IOU, clusters: {}, frame_id: {}'.format(clusters, frame_id))708 name = [''] * len_frame_now709 confidence = [0.] * len_frame_now710 #debug('Initialize name and confidence {} {} {}'.format(len_frame_now, name, confidence))711 for pair in clusters:712 if len(pair)==2: # Only one-one overlapping is considered same face713 index_now = pair[0]714 index_last = pair[1]715 if frame_id[index_now]!=frame_id[index_last]: # Overlapped rectangles not in the same frame716 # Rectangles IOU overlapped717 index_last = pair[1] - len_frame_now718 mapping[index_now] = index_last719 720 if len(mapping)==len_frame_now:721 # All rectangles can be mapped to previous rectangles722 #debug('All rectangles are tracked {}'.format(mapping))723 return mapping724725 #debug('Not all rectangles are tracked, now: {}, prev: {}'.format(len_frame_now, len_frame_last))726 return None727728 def check_lazy_recognition(self, t_now, task, agstate, predictions, mapping):729 # Use previous face recognition results730 if agstate is not None and mapping is not None:731 if 'identities' in agstate['predictions']:732 t_recognition = agstate['t_recognition']733 #debug('Check lazy recognition, t_recognition: {}, interval_recognition: {}'.format(t_now-t_recognition, LAZY_RECOGNITION))734735 confidence_decay = 1. #(t_now-t_recognition) / (LAZY_RECOGNITION)736 name = []737 confidence = []738 for i in range(len(predictions['rectangles'])):739 if mapping[i] < len(agstate['predictions']['identities']['name']):740 name.append(agstate['predictions']['identities']['name'][mapping[i]])741 confidence.append(agstate['predictions']['identities']['confidence'][mapping[i]] * LAZY_CONFIDENCE_DECAY)742 else:743 debug('A face in previous frame is missing')744 # Return False to indicate that not all recognized faces in previous frame are tracked successfully745 return False746747 if 'identities' in predictions:748 for i, c in enumerate(predictions['identities']['confidence']):749 if c <= 0:750 predictions['identities']['name'][i] = name[i]751 predictions['identities']['confidence'][i] = confidence[i]752 debug('Append lazy recognition, {}'.format(predictions['identities']))753 else:754 predictions['identities'] = {755 'name': name,756 'confidence': confidence,757 }758 #debug('Use lazy recognition, {}'.format(predictions['identities']))759760 return True761762 return False763764 def detect(self, img, params):765 """766 Input: pixel array, shape (h, w)767 Ouput: list of rectangles of objects, shape (count, y, x, h, w)768 """769 predictions = {770 'rectangles': [],771 'confidences': [],772 'sort_index': [],773 'landmarks': [],774 'emotions': [],775 'recognition': {776 'index': [],777 'rectangles': [],778 'embeddings': [],779 'images': [],780 },781 'timing': {},782 }783784 try:785 if 'service' in params:786 #{'type': 'face', 'options': {'resultsLimit': 5}, 'model': '12-net'}787 service = params['service']788 model = service['model']789790 mode = ''791 if 'mode' in service: mode = service['mode']792793 # Limit image size for performance794 #print('service', service)795 t_ = time.time()796797 # Prepare parameters798 res_cap = 448799 factor = 0.6800 interp = cv2.INTER_NEAREST801 if 'options' in service:802 options = service['options']803 if 'res_cap' in options:804 res_cap = int(options['res_cap'])805 if 'factor' in options:806 factor = float(options['factor'])807 if 'interp' in options:808 if options['interp']=='LINEAR':809 interp = cv2.INTER_LINEAR810 elif options['interp']=='AREA':811 interp = cv2.INTER_AREA812 if factor > 0.9: factor = 0.9813 elif factor < 0.45: factor = 0.45814 # For performance reason, resolution is hard-capped at 800, which is suitable for most applications815 if res_cap > 800: res_cap = 800816 elif res_cap <= 0: res_cap = 448 # In case client fail to initialize res_cap yet included options in request817 #print('options', factor, interp)818 819 # This is a safe guard to avoid very large image,820 # for best performance, client is responsible to scale the image before upload821 resized, scale_factor = imutil.fit_resize(img, maxsize=(res_cap, res_cap), interpolation=interp)822 scale_factor = 1. / scale_factor823 predictions['timing']['fit_resize'] = (time.time() - t_) * 1000824825 #print('mean', np.mean(img), np.mean(resized))826827 #time_start = time.time()828 #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)829 # MTCNN face detection830 t_ = time.time()831 detector = FaceApplications().get_detector_create()832 pnet = detector['pnet']833 rnet = detector['rnet']834 onet = detector['onet']835836 if 'lazy' in params and len(params['lazy']['rectangles']):837 # Lazy detection level 1 performs MTCNN detection only in area with previously detected faces838 grouped_extents = np.empty((0, 5))839 grouped_landmarks = np.empty((0, 5, 2))840 height, width, *_ = resized.shape841842 # Detect only area with previously detected faces within time interval T < LAZT_DETECTION843 # Convert rectangles back to extents844 extents = np.array(params['lazy']['rectangles'], dtype=np.float) / scale_factor845 extents.resize((extents.shape[0]+1, extents.shape[1])) # Resize for adding a central ROI later846847 # Expand extents848 padding = 20849 for i, e in enumerate(extents):850 extents[i][2] += e[0]851 extents[i][3] += e[1]852 w = e[2] - e[0]853 h = e[3] - e[1]854 extents[i] += np.array((-w, -h, w, h), dtype=np.float)855 e = extents[i]856857 # Limit extents in the range of pixel array858 extents[i] = np.array((max(e[0], padding), max(e[1], padding), min(e[2], width-padding), min(e[3], height-padding)), dtype=np.float)859860 # Always detect central area of the pixel array861 qheight = height // 4862 qwidth = width // 4863 extents[-1] = np.array([qwidth, qheight, width-qwidth, height-qheight], dtype=np.float)864865 # Group overlapped extents866 for iteration in range(16):867 no_overlap = True868 for i1, e1 in enumerate(extents):869 if e1[2]==0: continue870 for i2, e2 in enumerate(extents):871 if e2[2]==0: continue872 if i1!=i2:873 if e2[0]>e1[2] or e2[1]>e1[3] or e2[2]<e1[0] or e2[3]<e1[1]:874 pass875 else:876 extents[i1] = np.array((min((e1[0], e2[0])), min((e1[1], e2[1])), max((e1[2], e2[2])), max((e1[3], e2[3]))), dtype=np.float)877 extents[i2] = np.array((0, 0, 0, 0), dtype=np.float)878 no_overlap = False879 if no_overlap: break880881 #print(type(extents[0]))882 #print('group', extents)883 predictions['timing']['prepare_roi'] = (time.time() - t_) * 1000884 t_ = time.time()885 for i, e in enumerate(extents):886 # For debugging...887 r_ = (np.array([e[0], e[1], e[2]-e[0], e[3]-e[1]])*scale_factor).astype(dtype=np.int).tolist()888 if 'roi' not in predictions: predictions['roi'] = []889 predictions['roi'].append(r_)890891 offset_ = np.array([e[0], e[1], e[0], e[1], 0])892 if e[2] > 0: # Validate ROI must has width > 0893 e = e.astype(dtype=np.int)894 roi = resized[e[1]:e[3], e[0]:e[2], :]895 _extents, _landmarks = FaceDetector.detect_face(roi, 40, pnet, rnet, onet, threshold=[0.6, 0.7, 0.9], factor=factor, interpolation=interp)896 if len(_extents):897 _extents += offset_898 _landmarks += offset_[0:2]899 grouped_extents = np.concatenate((grouped_extents, _extents))900 grouped_landmarks = np.concatenate((grouped_landmarks, _landmarks))901902 extents = grouped_extents903 landmarks = grouped_landmarks904 905 else:906 # Detect whole frame907 extents, landmarks = FaceDetector.detect_face(resized, 40, pnet, rnet, onet, threshold=[0.6, 0.7, 0.9], factor=factor, interpolation=interp)908909 predictions['timing']['mtcnn'] = (time.time() - t_) * 1000910911 if len(landmarks):912 landmarks = np.array(landmarks) * scale_factor913914 if model=='a-emoc':915 facelist = np.zeros((len(extents), 48, 48), dtype=np.float32)916 predictions['timing']['emoc_prepare'] = 0917 elif model=='fnet':918 aligned_face_list = np.zeros((len(extents), 160, 160, 3), dtype=np.float32)919 pass920 921 #print()922 #print(extents)923 sorting_index = {}924 for i, e in enumerate(extents):925 #e_ = (Rectangle(r_[0], r_[1], r_[2], r_[3]).to_extent().eval() * scale_factor).astype(dtype=np.int).tolist()926 r_ = (np.array([e[0], e[1], e[2]-e[0], e[3]-e[1]])*scale_factor).astype(dtype=np.int).tolist()927 #print('extents', i, e, r_)928 predictions['rectangles'].append(r_)929 predictions['confidences'].append(e[4])930 plist_ = landmarks[i].astype(dtype=np.int).tolist()931 predictions['landmarks'].append(plist_)932 if model=='a-emoc':933 t_ = time.time()934 #(x, y, w, h) = imutil.rect_fit_ar(r_, [0, 0, img.shape[1], img.shape[0]], 1., crop=False)935 (x, y, w, h) = imutil.rect_fit_points(landmarks[i], )936 r_ = np.array([x, y, w, h]).astype(dtype=np.int).tolist()937 938 # Return landmarks-corrected reactangles instead of MTCNN rectangle.939 # The rectangles are used to subsample faces for emotion recognition.940 predictions['rectangles'][-1] = r_941942 (x, y, w, h) = r_943 face = np.zeros((h, w, 3), dtype=np.float32)944 y_ = 0945 x_ = 0946 if y + h > img.shape[0]:947 h = img.shape[0] - y948 elif y < 0:949 y_ = -y950 y = 0951 h = h - y_952 if x + w > img.shape[1]:953 w = img.shape[1] - x954 elif x < 0:955 x_ = -x956 x = 0957 w = w - x_958 face[y_:y_+h, x_:x_+w, :] = img[y:y+h, x:x+w, :]959960 face = cv2.resize(face, (48, 48), interpolation = interp)961 face = (face[:, :, 0:1] * 0.2126 + face[:, :, 1:2] * 0.7152 + face[:, :, 2:3] * 0.0722).reshape((48, 48))962 #face_write = (face * 255.).astype(np.uint8)963 #cv2.imwrite('./face'+str(i).zfill(3)+'.jpg', face_write)964 facelist[i:i+1, :, :] = face965 predictions['timing']['emoc_prepare'] += (time.time() - t_) * 1000966 elif model=='fnet':967 aligned = FaceApplications.align_face_fnet(img, landmarks[i])968 aligned_face_list[i, :, :, :] = aligned969 #print('aligned', aligned.shape)970 971 if e[0] <= 0 or e[1] <= 0 or e[2] >= img.shape[1]-1 or e[3] >= img.shape[0]-1:972 sorting_index[i] = 0973 else:974 sorting_index[i] = e[4] * r_[2] * r_[3]975 976 predictions['sort_index'] = sorting_index977978 # Sort faces by sorting_index and select first N faces for computation intensive operations, e.g., facenet embedding extraction979 better_faces = sorted(sorting_index, key=sorting_index.get, reverse=True)980 better_faces = better_faces[0:FACE_RECOGNITION_CONCURRENT]981 better_aligned_face_list = np.zeros((len(better_faces), 160, 160, 3), dtype=np.float32)982 better_rectangles = []983 for better_face_index in range(len(better_faces)):984 o_index = better_faces[better_face_index]985 better_aligned_face_list[better_face_index, :, :, :] = aligned_face_list[o_index, :, :, :]986 better_rectangles.append(predictions['rectangles'][o_index])987 debug('Sort faces, s: {}, i: {}, r: {}'.format(sorting_index, better_faces, better_rectangles))988989 if model=='a-emoc':990 t_ = time.time()991 emoc_ = FaceApplications().get_emotion_classifier_create()992 emotions = emoc_.predict(facelist)993 predictions['emotions'] = emotions994 predictions['timing']['emoc'] = (time.time() - t_) * 1000995 996 elif model=='fnet':997 # Prepare information for extracting face embedding but don't perform facenet forwarding yet998 predictions['recognition']['prepare'] = {999 'better_aligned_face_list': copy.deepcopy(better_aligned_face_list),1000 'better_faces': copy.deepcopy(better_faces),1001 'better_rectangles': copy.deepcopy(better_rectangles),1002 }10031004 predictions['recognition']['mode'] = mode1005 #print()10061007 else:1008 # Nothing is requested; useful for measuring overhead1009 pass10101011 except:1012 self._on_crash()10131014 return predictions10151016 def extract_face_embedding(self, predictions):1017 if 'recognition' in predictions and 'prepare' in predictions['recognition']:1018 t_ = time.time()10191020 facenet_ = FaceApplications().get_facenet_create()10211022 better_aligned_face_list = predictions['recognition']['prepare']['better_aligned_face_list']1023 better_rectangles = predictions['recognition']['prepare']['better_rectangles']1024 better_faces = predictions['recognition']['prepare']['better_faces']10251026 # Perform facenet forwarding to extract face embedding
...
TouchstoneMonitor.py
Source:TouchstoneMonitor.py
...639 self.sessionCompleted = True640 if msg != '':641 self.alert(msg)642 return not self.sessionCompleted643 def _on_crash(self):644 """645 Handles Touchstone crashes646 """647 if self.abort:648 return649 if not self.sessionInited:650 self._close_sub_console()651 msg = "\nERROR: Could not start running tests. Check given arguments and/or DLL.\n"652 msg += "try to get the core file\n"653 msg += self._get_core_and_back_trace()654 raise BoosterError(__name__, msg)655 total_so_far = self._get_total()656 self.synthesizeFailure('Test crashed.', 'CRASHED')657 msg = "\n\n!!!!!!!!!! CRASHED !!!!!!!!!! {currentSet}-{currentId}\n".format(**self)658 msg += self._get_core_and_back_trace()659 self.consecutiveCrashes += 1660 if self.consecutiveCrashes >= self.maxConsecutiveCrashes:661 msg += '\n\n!!!!!!!!!!!!!!!!!!!! ABORT !!!!!!!!!!!!!!!!!!! after {} consecutive crashes\n'.format(self.consecutiveCrashes)662 self.abort = True663 self.abortReason = 'too many consecutive crashes'664 self.crashes += 1665 if not self.abort and self.crashes >= self.maxCrashes:666 msg += '\n\n!!!!!!!!!!!!!!!!!!!! ABORT !!!!!!!!!!!!!!!!!!! after {} accumulated crashes\n'.format(self.crashes)667 self.abort = True668 self.abortReason = 'too many crashes'669 if total_so_far == 0 and self.QUICK_ABORT:670 msg += '\n\n!!!!!!!!!!!!!!!!!!!! ABORT !!!!!!!!!!!!!!!!!!! crash on the 1st test case\n'671 self.abort = True672 self.abortReason = 'crash on the 1st test case'673 self.alert(msg)674 if not self.abort:675 self._relaunch_process()676 def _complete_output(self):677 """ Flush SetSummaryCsvLog. """678 ctx_totalcounts = {s: self.totalCounts[s] for s in self.status_items}679 total = ctx_totalcounts['TOTAL']680 suspended = ctx_totalcounts['SUSPENDED']681 not_found = ctx_totalcounts['NOT_FOUND']682 if total - suspended - not_found == 0:683 self.alert('!!!!!!!!!!!!! No test has actually run')684 self.alert(' total: {:10d}'.format(total))685 self.alert(' suspended: {:10d}'.format(suspended))686 if not_found:687 self.alert(' not found: {:10d}'.format(not_found))688 if self.fail_norun:689 self.synthesizeFailure('No test has actually run')690 self.alert(' the whole session is considered failed')691 self.alert(' (feature configurable by _Monitor.fail-norun in env xml)')692 ctx_totalcounts = {s: self.totalCounts[s] for s in self.status_items}693 num_failures = ctx_totalcounts['FAILED']694 current_set_counts = [str(self.currentSetCounts[status]) for status in self.status_items]695 total_counts = [str(self.totalCounts[status]) for status in self.status_items]696 self.touchstone_loggers['SetSummaryCsvLog'].write_log(697 ','.join([str(self.currentSet)] + current_set_counts))698 self.touchstone_loggers['SetSummaryCsvLog'].write_log(699 ','.join(['SUITE SUMMARY'] + total_counts))700 output_summary = \701 """702--------------------------------703Total {TOTAL:10d}704SUCCEED {SUCCEED:10d}705FAILED {FAILED:10d}706FAILED_STARTUP {FAILED_STARTUP:10d}707CLEANUP_FAILED {CLEANUP_FAILED:10d}708RESULT_IGNORED {RESULT_IGNORED:10d}709SUSPENDED {SUSPENDED:10d}710NOT_FOUND {NOT_FOUND:10d}711CRASHED {CRASHED:10d}712TIMEOUT {TIMEOUT:10d}713--------------------------------714End of tests715 """.format(**ctx_totalcounts)716 logger.info("{} Results:".format(self.test_suite_name))717 logger.info(output_summary)718 return ctx_totalcounts719 def _get_executable(self):720 fp = self.touchstone721 self.bin = re.sub(r'.*/', '', fp)722 return self.bin723 def _mk_pathname(self, fname):724 """725 Create a pathname for fname in the path of output_prefix, e.g.726 return output_dir/myfile when output_prefix is output_dir/o727 """728 return os.path.join(self.outputDir, fname)729 def _create_fallback_file(self, path):730 """731 Create a fallback file, touchstone should remove it when it terminates without crash732 Returns: the filename if succeed, or None733 """734 fallback = ''735 if not os.path.isdir(path):736 path = os.path.dirname(path)737 fallback = os.path.join(path, "fallback.dat")738 try:739 with open(fallback, 'w'):740 pass741 if not os.path.isfile(fallback):742 raise Exception("\n\n!!!!!!!!!!!!!!!!!!!! Fallback: fail to create {fallback}".format(fallback))743 except Exception as exc:744 logger.warning("\n\n!!!!!!!!!!!!!!!!!!!! Fallback: fail to create {fallback}. error: \"{error}\". \745 Crash detection might be inaccurate!".format(fallback=fallback, error=str(exc)))746 fallback = ''747 return fallback748 def _get_config_from_xml(self, xml_file_name, config_tag, filter):749 """750 Get configurations from XML file751 Args:752 xml_file_name:753 config_tag: root tag that contains the configuration.754 filter: list of accepted tags755 Returns: Touchstone Monitor configuration in a dictionary format756 """757 try:758 tree = ET.parse(xml_file_name)759 monitor_settings = tree.find(config_tag)760 config = {}761 if monitor_settings is not None and len(monitor_settings):762 for element in monitor_settings:763 # by conventioin, xml tags/attrs use dash (-) to separate words764 # python attributes don't, they use underscore (_) unless using camel-style765 setting = element.tag.replace('-', '_')766 value = element.text767 if setting in filter:768 config[setting] = value769 return config770 except ET.ParseError:771 logger.critical("Failed to parse {}".format(xml_file_name))772 raise773 def _init_environment(self, cwd):774 logger.info("Current working directory: %s" % cwd)775 xmlcfg = self._get_config_from_xml(self.testEnv, '_Monitor',776 self.valid_args.keys())777 # environment variables override xmlcfg778 # NO_BT true|t|1 disable to get backtrace from core779 for ev in ('NO_BT', 'QUICK_ABORT'):780 if ev in os.environ:781 xmlcfg[ev] = os.environ[ev]782 self.validate_config(xmlcfg)783 if not self._get_executable():784 raise Exception("Not executable: %s\n" % self.touchstone)785 if cwd != self.wd and self.wd != '.':786 if not os.path.isdir(self.wd):787 raise Exception("Working directory {wd} doesn't exist".format(**self))788 if not os.access(self.wd, os.X_OK | os.W_OK):789 raise Exception("No permission to access and write in {wd}".format(**self))790 logger.info("Changing current working directory to {wd}".format(**self))791 os.chdir(self.wd)792 self.outputDir = os.path.normpath(os.path.join(self.wd, self.outputPrefix))793 if not os.path.isdir(self.outputDir):794 self.outputDir = os.path.dirname(self.outputDir)795 if self.outputDir == '':796 self.outputDir = '.'797 self.fallback = self._create_fallback_file(self.outputDir)798 self.signal['INT'] = signal.signal(signal.SIGINT, self.signal_handler)799 self.signal['TERM'] = signal.signal(signal.SIGTERM, self.signal_handler)800 if os.name != 'nt':801 self.signal['QUIT'] = signal.signal(signal.SIGQUIT, self.signal_handler)802 # fallback solution to perform procdump (not through tsm.py or env.TOUCHSTONE_COMMAND_PREFIX)803 if platform.system() == 'Windows':804 if self.procdump is None and 'PROCDUMP' in os.environ:805 self.procdump = os.environ['PROCDUMP']806 if '-e' not in self.procdump:807 self.procdump += ' -e'808 else:809 self.procdump = None810 def _create_server(self):811 """ Create a server and bind specified address812 the port can be either assigned by the core (if port == 0)813 or scan from a base port814 return: server,port815 """816 host = 'localhost'817 port = self.get('port', 0)818 if port == 0 and platform.system() == 'HP-UX':819 # hpux has a bug that getsockname() returns (0, (0,0,...))820 # due to a corrupted build of _socket821 # http://grokbase.com/t/python/python-list/11a5mqzmxw/socket-getsockname-is-returning-junk822 #823 # simba internal: DDP-297824 port = 18000825 if port == 0:826 # Let the kernel choose the port number827 # not recommended, because it is a selfish behavior which828 # makes itself always available but potentially block other services829 server = Server((host, port), self)830 ip, port = server.socket.getsockname()831 return server, port832 # otherwise scan from port upto maxPortScan833 max_scan = self.get('maxPortScan', 500)834 for n in range(max_scan):835 try:836 server = Server((host, port), self)837 return server, port838 except socket.error as e:839 try:840 server.close()841 except:842 pass843 port += 1844 raise BoosterError(__name__, "ERROR: Monitor could not find a listen port.\n")845 def _wait_all_sockets_close(self):846 """847 In some cases, loggers will still be active848 even when all tests complete849 This provides a last chance for loggers to send data850 """851 for i in range(self.server_timeout):852 if self.active_loggers > 0:853 asyncore.loop(timeout=1, count=1)854 def _start_touchstone(self):855 """ Launch touchstone binary856 """857 self.server, port = self._create_server()858 self.server.listen(5)859 self.command = self.touchstone.split(' ') + ["-te", self.testEnv, "-ts", self.testSuite,860 "-o", self.outputPrefix, "-n", "1", "-serverip", '127.0.0.1', "-sp", str(port)]861 if self.procdump is not None:862 self.command = self.procdump.split(' ') + ['-x', self.wd] + self.command863 fallback = self.get('fallback', None)864 if fallback is not None and self.fallback != '':865 self.command += ['-fb', self.fallback]866 self.execute(self.command)867 now = time.time()868 self.sessionStartTime = now869 while True:870 # Loop times out after a second871 asyncore.loop(timeout=1, count=1)872 now = time.time()873 if self.active_loggers == 0 and self.sessionInited:874 # delay checking subprocess health875 # as long as there is a socket connection, touchstone is alive876 if not self.is_subprocess_running() and self._is_crash():877 self._on_crash()878 if not self.sessionInited:879 if not self.is_subprocess_running():880 msg = 'Touchstone dies before initialization'881 self.abort = True882 elif now - self.lastTestCaseTime >= self.timeoutBeforeInitialized:883 msg = 'it takes too long for touchstone to initialize ({} seconds), abort...'.format(now - self.lastTestCaseTime)884 self.kill_child(signal.SIGABRT)885 self.abort = True886 if self.abort:887 self.abortReason = msg888 logger.info(msg)889 print(msg)890 else:891 # Handle timeout if touchstone is initialized and is still running...
_page.py
Source:_page.py
...126 lambda params: self.emit(127 Page.Events.Console, from_channel(params["message"])128 ),129 )130 self._channel.on("crash", lambda _: self._on_crash())131 self._channel.on("dialog", lambda params: self._on_dialog(params))132 self._channel.on(133 "domcontentloaded", lambda _: self.emit(Page.Events.DOMContentLoaded)134 )135 self._channel.on(136 "download",137 lambda params: self.emit(138 Page.Events.Download, from_channel(params["download"])139 ),140 )141 self._channel.on(142 "fileChooser",143 lambda params: self.emit(144 Page.Events.FileChooser,145 FileChooser(146 self, from_channel(params["element"]), params["isMultiple"]147 ),148 ),149 )150 self._channel.on(151 "frameAttached",152 lambda params: self._on_frame_attached(from_channel(params["frame"])),153 )154 self._channel.on(155 "frameDetached",156 lambda params: self._on_frame_detached(from_channel(params["frame"])),157 )158 self._channel.on("load", lambda _: self.emit(Page.Events.Load))159 self._channel.on(160 "pageError",161 lambda params: self.emit(162 Page.Events.PageError, parse_error(params["error"]["error"])163 ),164 )165 self._channel.on(166 "popup",167 lambda params: self.emit(Page.Events.Popup, from_channel(params["page"])),168 )169 self._channel.on(170 "request",171 lambda params: self.emit(172 Page.Events.Request, from_channel(params["request"])173 ),174 )175 self._channel.on(176 "requestFailed",177 lambda params: self._on_request_failed(178 from_channel(params["request"]),179 params["responseEndTiming"],180 params["failureText"],181 ),182 )183 self._channel.on(184 "requestFinished",185 lambda params: self._on_request_finished(186 from_channel(params["request"]), params["responseEndTiming"]187 ),188 )189 self._channel.on(190 "response",191 lambda params: self.emit(192 Page.Events.Response, from_channel(params["response"])193 ),194 )195 self._channel.on(196 "route",197 lambda params: self._on_route(198 from_channel(params["route"]), from_channel(params["request"])199 ),200 )201 self._channel.on(202 "video",203 lambda params: cast(Video, self.video)._set_relative_path(204 params["relativePath"]205 ),206 )207 self._channel.on(208 "webSocket",209 lambda params: self.emit(210 Page.Events.WebSocket, from_channel(params["webSocket"])211 ),212 )213 self._channel.on(214 "worker", lambda params: self._on_worker(from_channel(params["worker"]))215 )216 def _set_browser_context(self, context: "BrowserContext") -> None:217 self._browser_context = context218 self._timeout_settings = TimeoutSettings(context._timeout_settings)219 def _on_request_failed(220 self,221 request: Request,222 response_end_timing: float,223 failure_text: str = None,224 ) -> None:225 request._failure_text = failure_text226 if request._timing:227 request._timing["responseEnd"] = response_end_timing228 self.emit(Page.Events.RequestFailed, request)229 def _on_request_finished(230 self, request: Request, response_end_timing: float231 ) -> None:232 if request._timing:233 request._timing["responseEnd"] = response_end_timing234 self.emit(Page.Events.RequestFinished, request)235 def _on_frame_attached(self, frame: Frame) -> None:236 frame._page = self237 self._frames.append(frame)238 self.emit(Page.Events.FrameAttached, frame)239 def _on_frame_detached(self, frame: Frame) -> None:240 self._frames.remove(frame)241 frame._detached = True242 self.emit(Page.Events.FrameDetached, frame)243 def _on_route(self, route: Route, request: Request) -> None:244 for handler_entry in self._routes:245 if handler_entry.matcher.matches(request.url):246 result = cast(Any, handler_entry.handler)(route, request)247 if inspect.iscoroutine(result):248 asyncio.create_task(result)249 return250 self._browser_context._on_route(route, request)251 def _on_binding(self, binding_call: "BindingCall") -> None:252 func = self._bindings.get(binding_call._initializer["name"])253 if func:254 asyncio.create_task(binding_call.call(func))255 self._browser_context._on_binding(binding_call)256 def _on_worker(self, worker: "Worker") -> None:257 self._workers.append(worker)258 worker._page = self259 self.emit(Page.Events.Worker, worker)260 def _on_close(self) -> None:261 self._is_closed = True262 self._browser_context._pages.remove(self)263 self.emit(Page.Events.Close)264 def _on_crash(self) -> None:265 self.emit(Page.Events.Crash)266 def _on_dialog(self, params: Any) -> None:267 dialog = from_channel(params["dialog"])268 if self.listeners(Page.Events.Dialog):269 self.emit(Page.Events.Dialog, dialog)270 else:271 asyncio.create_task(dialog.dismiss())272 def _add_event_handler(self, event: str, k: Any, v: Any) -> None:273 if event == Page.Events.FileChooser and len(self.listeners(event)) == 0:274 self._channel.send_no_reply(275 "setFileChooserInterceptedNoReply", {"intercepted": True}276 )277 super()._add_event_handler(event, k, v)278 def remove_listener(self, event: str, f: Any) -> None:...
event.py
Source:event.py
...141 self.mediator.logic.weapon.attach_obs(self.on_rotate_all)142 return wpn_cls143 def on_rotate_all(self, sender):144 self.notify('on_rotate_all', sender)145 def _on_crash(self):146 if self.mediator.fsm.getCurrentOrNextState() != 'Results':147 self.mediator.gfx.crash_sfx()148 def _process_wall(self):149 self._on_crash()150 def _process_goal(self):151 is_res = self.mediator.fsm.getCurrentOrNextState() == 'Results'152 has_started = self.mediator.logic.lap_time_start153 is_corr = self.mediator.logic.correct_lap154 if is_res or has_started and not is_corr:155 return156 self.mediator.logic.reset_waypoints()157 lap_times = self.mediator.logic.lap_times158 if self.mediator.logic.lap_time_start:159 lap_times += [self.mediator.logic.lap_time]160 self._process_nonstart_goals(1 + len(lap_times),161 self.mediator.laps)162 self.mediator.logic.lap_time_start = self.eng.curr_time163 def _process_nonstart_goals(self, lap_number, laps):...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!