Best Python code snippet using autotest_python
context.py
Source:context.py
1#!/usr/bin/env python2# encoding: utf-83# Created by zza on 2021/6/24 16:384# Copyright 2021 LinkSense Technology CO,. Ltd5from __future__ import annotations6import asyncio7import datetime8import traceback9from typing import TYPE_CHECKING, Dict, ItemsView, Optional, Type10from lk_flow.config import Config11from lk_flow.core.event import EVENT, Event, EventBus12from lk_flow.env import logger13from lk_flow.errors import (14 DuplicateModError,15 DuplicateTaskNameError,16 LkFlowBaseError,17 ModNotFoundError,18 TaskNotFoundError,19)20from lk_flow.models import SubProcess, Task21if TYPE_CHECKING: # pragma: no cover22 # https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports23 from lk_flow.core import ModAbstraction24class Context:25 _env: Context = None26 def __init__(self, config: Config):27 Context._env = self28 self.loop_enable: bool = True29 self.config: Config = config30 self.event_bus: EventBus = EventBus()31 self.sleep_time: int = config.sleep_time32 self.system_start_time: datetime.datetime = datetime.datetime.now()33 # Process éå34 self._PROCESS_ALL = {} # ææTask35 self._PROCESS_RUNNING = {} # æ£å¨è·ç36 self._PROCESS_STOPPED = {} # æªæ¿æ´» or å·²ç»æç37 self._mod_map: Dict[str, Type["ModAbstraction"]] = {}38 # add add_listener39 self.event_bus.add_listener(EVENT.EXEC_SYSTEM_CLOSE, self._close_loop)40 self.event_bus.add_listener(EVENT.HEARTBEAT, self.state_check)41 @classmethod42 def get_instance(cls) -> Context:43 """44 è¿åå·²ç»å建ç Context 对象45 >>> Context.get_instance()46 Traceback (most recent call last):47 ...48 RuntimeError: Context has not been created. Please Use `Context.get_instance()` after lk_flow init49 """50 if cls._env is None:51 raise RuntimeError(52 "Context has not been created. Please Use `Context.get_instance()` after lk_flow init"53 )54 return cls._env55 # Mod56 def get_mod_map(self) -> ItemsView[str, Type["ModAbstraction"]]:57 return self._mod_map.items()58 def add_mod(self, mod_name: str, mod: Type["ModAbstraction"]) -> None:59 if mod_name in self._mod_map.keys():60 _message = (61 f"DuplicateModError at name = {mod_name}, {self._mod_map[mod_name]}"62 )63 raise DuplicateModError(_message)64 self._mod_map[mod_name] = mod65 def get_mod(self, mod_name: str) -> Type[Type["ModAbstraction"]]:66 if mod_name not in self._mod_map.keys():67 raise ModNotFoundError(f"{mod_name} not found")68 return self._mod_map[mod_name]69 # Process70 def get_process(self, task_name: str) -> SubProcess:71 if task_name not in self._PROCESS_ALL:72 raise TaskNotFoundError(f"Task {task_name} not found")73 return self._PROCESS_ALL[task_name]74 # Process Set75 def get_all_processes(self) -> ItemsView[str, SubProcess]:76 return self._PROCESS_ALL.items()77 def get_stopped_processes(self) -> ItemsView[str, SubProcess]:78 return self._PROCESS_STOPPED.items()79 def get_running_processes(self) -> ItemsView[str, SubProcess]:80 return self._PROCESS_RUNNING.items()81 # Task82 def start_task(self, task_name: str) -> None:83 if task_name in self._PROCESS_RUNNING:84 return85 # get subprocess86 process_manager: SubProcess = self._PROCESS_STOPPED.pop(task_name)87 # pre start88 self.event_bus.publish_event(Event(EVENT.TASK_PRE_START, task_name=task_name))89 # start90 asyncio.get_event_loop().run_until_complete(process_manager.start())91 self._PROCESS_RUNNING[task_name] = process_manager92 # running93 event = Event(94 EVENT.TASK_RUNNING,95 task_name=task_name,96 task=process_manager.config,97 process=process_manager,98 )99 self.event_bus.publish_event(event)100 def stop_task(self, task_name: str) -> None:101 """åæ¢è¿ç¨"""102 if task_name not in self._PROCESS_RUNNING:103 return104 process_manager: SubProcess = self._PROCESS_RUNNING.pop(task_name)105 # stop106 asyncio.get_event_loop().run_until_complete(process_manager.stop())107 self._PROCESS_STOPPED[task_name] = process_manager108 # running109 event = Event(110 EVENT.TASK_STOP,111 task_name=task_name,112 task=process_manager.config,113 process=process_manager,114 )115 self.event_bus.publish_event(event)116 def add_task(self, task: Task) -> Optional[SubProcess]:117 """æ·»å ä»»å¡å°ç³»ç»"""118 if task.name in self._PROCESS_ALL: # å·²å è½½119 message = f"{task.name} is already used. {task}"120 raise DuplicateTaskNameError(message)121 logger.debug(f"add task: {task}")122 process_manager = SubProcess(task)123 self._PROCESS_ALL[task.name] = process_manager124 self._PROCESS_STOPPED[task.name] = process_manager125 event = Event(126 EVENT.TASK_ADD, task_name=task.name, task=task, process=process_manager127 )128 self.event_bus.publish_event(event)129 return process_manager130 def delete_task(self, task_name: str) -> None:131 subprocess: SubProcess = self._PROCESS_ALL.pop(task_name, None)132 if subprocess is None:133 return134 if self._PROCESS_RUNNING.pop(task_name, None):135 asyncio.run(subprocess.stop())136 else:137 self._PROCESS_STOPPED.pop(task_name, None)138 event = Event(139 EVENT.TASK_DELETE,140 task_name=task_name,141 task=subprocess.config,142 process=subprocess,143 )144 self.event_bus.publish_event(event)145 # System146 def _close_loop(self, event: Event) -> Optional[True]:147 """å
³é循ç¯"""148 self.loop_enable = False149 for task_name in self._PROCESS_RUNNING.copy().keys():150 self.stop_task(task_name)151 logger.info(f"Get {event}, close loop.")152 self.event_bus.publish_event(Event(EVENT.SYSTEM_CLOSE))153 def is_running(self, task_name: str) -> bool:154 """check is running"""155 if task_name in self._PROCESS_RUNNING:156 return True157 return False158 def state_check(self, _: Event) -> None:159 """160 æ£æ¥è¿ç¨ç¶æ161 è¿ç¨æ£å¸¸éåºååé TASK_FINISHäºä»¶162 å¼å¸¸éåºåé TASK_RUNNING_ERRORäºä»¶163 """164 for name, subprocess in self._PROCESS_RUNNING.copy().items():165 if subprocess.exit_code is None:166 # still running167 continue168 self._PROCESS_RUNNING.pop(name)169 self._PROCESS_STOPPED[name] = subprocess170 async def entry_loop(self) -> None:171 while self.loop_enable:172 try:173 self.event_bus.publish_event(174 Event(EVENT.HEARTBEAT, now=datetime.datetime.now())175 )176 except LkFlowBaseError as err:177 logger.error(err.message)178 logger.error(traceback.format_exc())179 await asyncio.sleep(self.sleep_time)...
musicplayer.py
Source:musicplayer.py
1"""2This module contains a reusable MusicPlayer class that is based3on an audio player application named mpg123.4"""5import subprocess6import time7from threading import Thread8#-------------------------------------------------------------------9# AudioEngineUnavailableError class10#-------------------------------------------------------------------11class AudioEngineUnavailableError(Exception):12 """13 When the required player application (mpg123) is not available14 this Exception is raised.15 """16 pass 17#-------------------------------------------------------------------18# NoPlaybackError class19#-------------------------------------------------------------------20class NoPlaybackError(Exception):21 """22 When an attempt is made to control playback but nothing is23 being played, this Exception is raised.24 """25 pass 26#-------------------------------------------------------------------27# PlaybackInProgressError class28#-------------------------------------------------------------------29class PlaybackInProgressError(Exception):30 """31 When an attempt is made to begin playback while something32 is already playing, this Exception is raised.33 """34 pass 35#-------------------------------------------------------------------36# MusicPlayer class37#-------------------------------------------------------------------38class MusicPlayer():39 """40 Start a subprocess running audio player mpg123 to play a specified41 mp3 file. Provide controls to stop/start and quit playback.42 """43 def __init__(self):44 self._audioengine = 'mpg123' # Only supported player is mpg12345 self._p = None46 self._is_paused = False47 # The following instance variables are affected by the process_monitor48 # thread. 49 self._process_running = False50 self._return_code = None51 def play(self, sound_file):52 if not self._process_running:53 try:54 # Open a subprocess that runs mpg123 to play an mp3 file55 self._p = subprocess.Popen([self._audioengine, 56 '-C', # Enable commands to be read from stdin57 '-q', # Be quiet58 sound_file],59 stdin=subprocess.PIPE, # Pipe input via bytes60 stdout=None, 61 stderr=None)62 # Since we are using stdin for commands, we have to send something63 # to keep mpg123 from complaining when we exit. The complaint is64 # not a serious one, but it is annoying. If stdin is not used the65 # terminal is posted with "Can't set terminal attributes" at exit.66 # I send an empty string below to keep mpg123 happy.67 self._p.stdin.write(b'')68 self._p.stdin.flush()69 # start a monitor thread that sets instance variables with the70 # status of the subprocess. 71 monitor_thread = Thread(target=self.process_monitor,args=()) 72 monitor_thread.start()73 except FileNotFoundError as e:74 raise AudioEngineUnavailableError(f'AudioEngineUnavailableError: {e}')75 else:76 raise PlaybackInProgressError('You cannot play while something else is already playing.')77 def quit_playing(self):78 if self._process_running:79 self._p.stdin.write(b'q')80 self._p.stdin.flush()81 # Wait for process to end82 while self._process_running:83 time.sleep(0.1)84 else:85 raise NoPlaybackError('Cannot quit playback because nothing is playing.')86 def pause(self):87 if self._process_running:88 if self._is_paused:89 # Already paused, do nothing90 pass91 else:92 self._p.stdin.write(b's')93 self._p.stdin.flush()94 self._is_paused = True95 else:96 raise NoPlaybackError('Cannot pause playback because nothing is playing.')97 98 def resume(self):99 if self._process_running:100 if not self._is_paused:101 # Already playing, do nothing102 pass103 else:104 self._p.stdin.write(b's')105 self._p.stdin.flush()106 self._is_paused = False107 else:108 raise NoPlaybackError('Cannot resume playback because nothing is playing.')109 def is_playing(self):110 return self._process_running111 def return_code(self):112 return self._return_code113 def process_monitor(self):114 """115 This code runs in its own thread to monitor the state of our116 external subprocess. Instance variables _process_running and117 _return_code are used to show the process status.118 """119 # Indicate that the process is running at the start, it120 # should be121 self._process_running = True122 # When a process exits, p.poll() returns the code it set upon123 # completion124 self._return_code = self._p.poll()125 # See whether the process has already exited. This will cause a126 # value (i.e. not None) to return from p.poll()127 if self._return_code == None:128 # Wait for the process to complete, get its return code directly129 # from the wait() call (i.e. do not use p.poll())130 self._return_code = self._p.wait()131 # When we get here, the process has exited and set a return code132 self._process_running = False133def main():134 """135 A function that instantiates a MusicPlayer and demonstrates its features.136 """137 player = MusicPlayer()138 player.play('./my_mp3_files/fluidity-100-ig-edit-4558.mp3')139 print(f'Player is playing: {player.is_playing()}')140 print('Play for 2 seconds')141 time.sleep(2)142 print('Pause player for 2 seconds')143 player.pause()144 time.sleep(2)145 print('')146 player.resume()147 time.sleep(2)148 player.quit_playing()149 print(f'Player is playing: {player.is_playing()}')150 print(f'Return code: {player.return_code()}')151 player.play('./my_mp3_files/this-minimal-technology-12327.mp3')152 # Play a track until it finishes, ending the player subprocess when it does.153 # Keep the Python process running until the track ends.154 print('The Python program will keep running until it sees subprocess completion.')155 progress_indicator = '*'156 while player.is_playing():157 time.sleep(1)158 print(progress_indicator)159 progress_indicator += '*'160if __name__ == '__main__':161 try:162 main()163 except Exception as e:164 print(e)165 ...
utils.py
Source:utils.py
1import asyncio2from .ml import prediction_reload3from trame import state4async def queue_to_state(queue, *tasks):5 _process_running = True6 while _process_running:7 if queue.empty():8 await asyncio.sleep(1)9 else:10 msg = queue.get_nowait()11 if isinstance(msg, str):12 # command13 if msg == "stop":14 _process_running = False15 else:16 # Need to monitor as we are outside of client/server update17 with state.monitor():18 # state update (dict)19 state.update(msg)20 await asyncio.gather(*tasks)21 # Make sure we can go to prediction22 state.prediction_available = prediction_reload()23 state.testing_count = 0...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!