Best Python code snippet using avocado_python
task.py
Source:task.py
...30 # å¦æ¯æ°å å
¥çä»»å¡åå¯å¨ä»»å¡31 if not task._started.is_set():32 task.start()33 # å¦ä»»å¡æ²¡æåæ´»ååæ¢ä»»å¡34 if not task.is_task_alive():35 logger.debug('daemon run stop')36 task.stop()37 # å¦æä»»å¡å·²ç»åæ¢åæ¸
çä»»å¡38 if task._stopped.is_set():39 if task.is_kill_when_stop():40 # ææä»»å¡41 self.kill_task(task.name)42 else:43 # 设置为å¾
æ¸
ç44 self._tasks_running[i] = None45 # æ¯ç§è½®è¯¢46 time.sleep(1)47 logger.debug('daemon run finish')48 def exit(self):49 '''éåºå®æ¤çº¿ç¨50 :return:51 '''52 logger.debug('daemon exit start')53 # ç»æææå¨è¿è¡çä»»å¡54 for task in self._tasks_running:55 task.stop()56 # æ¸
空任å¡å表åå¨è¿è¡ä»»å¡å表57 self._tasks = {}58 self._tasks_running = []59 # 设置éåºæ è¯60 self._exit.set()61 logger.debug('daemon exit finish')62 def add_task(self, class_, name, callback = None, params = None,63 **kwargs):64 task = class_(name = name, callback = callback,65 params = params, **kwargs)66 if self.has_task(name):67 return False68 self._tasks[name] = task69 self._tasks_running.append(task)70 return True71 def has_task(self, name):72 return name in self._tasks73 def kill_task(self, name):74 try:75 task = self._tasks.get(name, None)76 assert task77 if task.is_task_alive():78 task.stop()79 task.on_kill()80 if task in self._tasks_running:81 index = self._tasks_running.index(task)82 self._tasks_running[index] = None83 self._tasks.pop(name)84 return True85 except AssertionError:86 return True87 except Exception as e:88 return False89 def query_tasks(self):90 return [{'name': task.name,91 'running': True if task in self._tasks_running else92 False}93 for task in self._tasks]94 def has_task_running(self):95 return len(self._tasks_running) > 096 def call(self, name, method, *args, **kwargs):97 task = self._tasks.get(name, None)98 if task:99 return getattr(task, method)(*args, **kwargs)100 def getattr(self, name, attribute):101 task = self._tasks.get(name, None)102 if task:103 return getattr(task, attribute)104 def setattr(self, name, attribute, value):105 task = self._tasks.get(name, None)106 if task:107 return setattr(task, attribute, value)108class TaskRunner(Thread):109 """ä»»å¡è¿è¡å¨110 """111 def __init__(self, **kwargs):112 try:113 # ååºä»»å¡å称ä½ä¸ºçº¿ç¨å称114 name = kwargs.get('name', None)115 assert name116 kwargs.pop('name')117 super().__init__(name = name, kwargs = kwargs)118 # åè°å¥æ119 self._callback = kwargs.get('callback', None)120 # è¿è¡åæ°121 self._params = kwargs.get('params', None)122 if self._params:123 for k, v in self._params.items():124 setattr(self, f'_{k}', v)125 # 线ç¨æ§å¶æå126 self._stopped = Event() # 线ç¨åæ¢æ è¯127 self._stop_by_user = False # 
被ç¨æ·ç»æ¢æ è¯128 self._stop_by_self = False # 被任å¡èªèº«ç»æ¢æ è¯129 self._stop_by_daemon = False # 被å®æ¤çº¿ç¨ç»æ¢æ è¯130 self._run_times = 0 # è¿è¡æ¬¡æ°131 except AssertionError:132 raise TaskRunnerInitError()133 def run(self):134 # åç½®è¿è¡ï¼è·å¾è½®è¯¢é´é135 interval = self._on_before_runloop()136 interval = interval\137 if interval and (isinstance(interval, int)138 or isinstance(interval, float)) else 0.1139 while not self._stopped.is_set():140 self._run_times = self._run_times + 1141 logger.debug(142 f"[{urldecode(self.name)}] run loop start ({self._run_times})")143 # 主ä½è¿è¡144 self._on_runloop()145 # æé´é轮询146 time.sleep(interval)147 logger.debug(148 f"[{urldecode(self.name)}] run loop end ({self._run_times})")149 # åç½®è¿è¡ï¼ç¨äºæ¸
åº150 self._on_after_runloop()151 def stop(self):152 try:153 logger.debug(f"[{urldecode(self.name)}] stop server start")154 self._on_before_stop()155 self._stopped.set()156 self._stop_by_user = True157 self._on_after_stop()158 logger.debug(f"[{urldecode(self.name)}] stop server end")159 return True, True, self._get_stop_success_message()160 except Exception:161 return False, False, self._get_stop_success_message()162 def is_task_alive(self):163 pass164 def is_kill_when_stop(self):165 pass166 def on_kill(self):167 pass168 def _on_before_runloop(self):169 return 0.1170 def _on_runloop(self):171 pass172 def _on_after_runloop(self):173 pass174 def _on_before_stop(self):175 pass176 def _on_after_stop(self):177 pass178 def _get_stop_success_message(self):179 return _('Task has stopped')180 def _get_stop_fail_message(self):181 return _('Task stop failed')182class HeartbeatInitError(RuntimeError):183 pass184class OnceInitError(RuntimeError):185 pass186class HeartbeatRunner(TaskRunner):187 """å¿è·³è¿è¡å¨188 """189 def __init__(self, **kwargs):190 super().__init__(**kwargs)191 try:192 assert self._callback193 assert self._interval194 assert self._command195 from server.android.device import\196 _client # todo change to current_client197 self._command_func = getattr(_client,198 f'cmd_{self._command}',199 None)200 self._alive_func = getattr(_client,201 f'alive_{self._command}',202 None)203 assert self._command_func204 except AssertionError:205 raise HeartbeatInitError()206 def _on_before_runloop(self):207 return self._interval208 def _on_runloop(self):209 if self._command_func:210 self._callback('android',211 (self._command, self.name) +212 self._command_func(self.name,213 **self._params))214 else:215 self._callback('android',216 (self._command, self.name, False, False,217 None))218 def _on_after_runloop(self):219 logger.debug(f'[{urldecode(self.name)}] after run loop')220 def is_task_alive(self):221 if self._alive_func:222 return self._alive_func(self.name, **self._params)223 
else:224 return True225 def is_kill_when_stop(self):226 return True227class OnceRunner(TaskRunner):228 """ä¸æ¬¡æ§è¿è¡å¨229 """230 def __init__(self, **kwargs):231 super().__init__(**kwargs)232 try:233 assert self._callback234 assert self._command235 from server.android.device import\236 _client # todo change to current_client237 self._command_func = getattr(_client,238 f'cmd_{self._command}',239 None)240 assert self._command_func241 except AssertionError:242 raise OnceInitError()243 def _on_runloop(self):244 self._callback('android',245 (self._command, self.name) +246 self._command_func(self.name,247 **self._params))248 def is_task_alive(self):249 return False250 def is_kill_when_stop(self):...
test_spawner.py
Source:test_spawner.py
...14 loop = asyncio.get_event_loop()15 spawned = loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))16 self.assertTrue(spawned)17 def test_never_spawned(self):18 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))19 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))20class Mock(Process):21 def setUp(self):22 runnable = nrunner.Runnable('noop', 'uri')23 task = nrunner.Task('1', runnable)24 self.runtime_task = RuntimeTask(task)25 self.spawner = MockSpawner()26 def test_spawned_is_alive(self):27 loop = asyncio.get_event_loop()28 loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))29 self.assertTrue(self.spawner.is_task_alive(self.runtime_task))30 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))31class RandomMock(Mock):32 def setUp(self):33 runnable = nrunner.Runnable('noop', 'uri')34 task = nrunner.Task('1', runnable)35 self.runtime_task = RuntimeTask(task)36 self.spawner = MockRandomAliveSpawner()37 def test_spawned_is_alive(self):38 loop = asyncio.get_event_loop()39 loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))40 # The likelihood of the random spawner returning the task is41 # not alive is 1 in 5. This gives the random code 1000042 # chances of returning False, so it should, famous last words,43 # be pretty safe44 finished = False45 for _ in range(10000):46 if not self.spawner.is_task_alive(self.runtime_task):47 finished = True48 break49 self.assertTrue(finished)50if __name__ == '__main__':...
taskkiller.py
Source:taskkiller.py
import os


def get_tasks(path='tasks.txt'):
    """Return the lines of the task-list file, without trailing newlines.

    :param path: file to read; defaults to ``tasks.txt`` in the current
        working directory.
    :return: list of strings, one per line of the file.
    """
    with open(path) as f:
        return [line.rstrip('\n') for line in f]


def is_task_alive(tasks):
    """Return the first entry of ``tasklist`` output that appears in ``tasks``.

    Only the first whitespace-separated field of each output line is
    compared.

    :param tasks: container of names to look for.
    :return: the matching name, or ``None`` when nothing matches.
    """
    # The context manager closes the pipe even on the early return.
    with os.popen('tasklist') as output:
        for line in output:
            fields = line.split()
            # Blank or whitespace-only lines have no fields to compare.
            if fields and fields[0] in tasks:
                return fields[0]
    return None


def main():
    """Poll forever; force-kill any listed task that is found and show an alert."""
    # ctypes.windll only exists on Windows; importing here keeps this block
    # self-contained, since the file's own import of ctypes is not visible in
    # this excerpt.
    import ctypes
    import time

    tasks = get_tasks()
    while True:
        task = is_task_alive(tasks)
        if task:
            os.system(f'taskkill /f /im {task}')
            ctypes.windll.user32.MessageBoxW(0, 'Get back to work!', 'Alert!', 0)
        # Pause between polls so the loop does not spawn `tasklist`
        # back-to-back.
        time.sleep(1)


if __name__ == "__main__":
    ...  # remainder truncated in this excerpt (presumably calls main())
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!