Best Python code snippet using pytest-django_python
start_complete_demo.py
Source:start_complete_demo.py
...26__current_intent = 127__task_aborted_by_exception = False28__after_task = False29subproc = None30def print_it(s):31 print(str(datetime.now().isoformat('-')) + ': ' + str(s))32def usr1_handler(signum=None, frame=None):33 print_it('start_complete_demo: usr1_handler')34 global __task_aborted_by_exception35 global __aborting_task36 if not __aborting_task:37 print_it('Setting __task_aborted_by_exception to True.')38 __task_aborted_by_exception = True39def usr2_handler(signum=None, frame=None):40 print_it('start_complete_demo: usr2_handler')41 global __quit42 global __kill_count43 if not __quit:44 print_it('####################################################')45 print_it('# Going to abort execution of all remaining tasks. #')46 print_it('####################################################')47 __quit = True48 __kill_count = 133749 if not __aborting_task:50 abort_current_task()51def abort_current_task():52 print_it('start_complete_demo: abort_current_task')53 global __aborting_task54 global subproc55 global __clock_subscriber56 global __quit57 if __quit:58 print_it('Already quitting. Nothing to do here.')59 elif __aborting_task:60 print_it('Already aborting task. Nothing to do here.')61 else:62 __aborting_task = True63 if __clock_subscriber is not None:64 print_it('Unsubscribing clock subscriber.')65 __clock_subscriber.unregister()66 if subproc is not None:67 try:68 print_it('Killing subproc with pid ' + str(subproc.pid))69 exterminate(subproc.pid, signal.SIGINT)70 print_it('Waiting for subproc to terminate. Please be patient.')71 utils.wait_for_process(subproc, 90)72 poll = subproc.poll()73 if poll is None:74 print_it('Could not kill task process. 
Forcing!'75 ' (pid: ' + str(subproc.pid) + ', subproc.poll(): ' + str(poll) + ')')76 exterminate(subproc.pid, signal.SIGKILL, r=True)77 except Exception, e:78 print_it(e)79 __aborting_task = False80 print_it('start_complete_demo: Exiting abort_current_task')81def kill_like_a_berserk():82 print_it('start_complete_demo: kill_like_a_berserk')83 global subproc84 print_it('Killing subproc like a berserk.')85 exterminate(subproc.pid, signal.SIGKILL, r=True)86 print_it('start_complete_demo: Exiting kill_like_a_berserk.')87def exit_handler(signum=None, frame=None):88 print_it('start_complete_demo: exit_handler')89 print_it('signum: ' + str(signum) + ', frame: ' + str(frame))90 if signum != signal.SIGINT:91 print_it('start_complete_demo: exit_handler: Unhandled signal. Exiting without any action.')92 return93 global __kill_count94 global __quit95 global __clock_subscriber96 global __aborting_task97 __kill_count += 198 if __kill_count == 1:99 print_it('################################')100 print_it('# Going to abort current task. #')101 print_it('################################')102 abort_current_task()103 elif __kill_count == 2:104 print_it('####################################################')105 print_it('# Going to abort execution of all remaining tasks. #')106 print_it('####################################################')107 __quit = True108 if not __aborting_task:109 abort_current_task()110 elif __kill_count == 3:111 print_it('#################################')112 print_it('# Going to kill like a berserk. 
#')113 print_it('#################################')114 kill_like_a_berserk()115 else:116 print_it('You can stop spamming Ctrl-c now.')117 print_it('start_complete_demo: exiting exit_handler')118def start_demo(wait, tasks, logging):119 global __quit120 global __clock_subscriber121 global __time_started_task122 global __current_task123 global subproc124 global __kill_count125 global __remaining_time126 global __time_limit127 global __task_aborted_by_exception128 global __current_intent129 global __clock_time_started_task130 global __last_received_clock131 global __after_task132 rospy.init_node('start_complete_demo')133 rospy.loginfo('Setting use_sim_time to true')134 rospy.set_param('/use_sim_time', True)135 #atexit.register(exit_handler)136 signal.signal(signal.SIGTERM, exit_handler)137 signal.signal(signal.SIGINT, exit_handler)138 signal.signal(signal.SIGQUIT, exit_handler)139 signal.signal(signal.SIGUSR1, usr1_handler)140 signal.signal(signal.SIGUSR2, usr2_handler)141 print_it('Getting available task names.')142 tasks_to_execute = []143 task_names = get_available_task_names()144 print_it('Available task names: ' + str(task_names))145 #start_task.main(task_names[:2], True, True, True, True)146 if re.search('^\d*:\d*$', tasks):147 tasks_to_execute = eval('task_names[' + tasks + ']')148 elif re.search('^\d+$', tasks):149 tasks_to_execute = [task_names[int(tasks)]]150 elif re.search('^(\d+,?)+$', tasks):151 for num in tasks.split(','):152 tasks_to_execute.append(task_names[int(num)])153 tasks_to_execute.sort()154 else:155 for task in tasks.split(','):156 if task in task_names:157 tasks_to_execute.append(task)158 tasks_to_execute.sort()159 print_it('Going to execute the following tasks: ' + str(tasks_to_execute))160 if logging in [2]:161 dont_print = False162 else:163 dont_print = True164 i = 0165 __clock_time_started_task = 0166 while i < len(tasks_to_execute):167 __kill_count = 0168 __task_aborted_by_exception = False169 rospy.loginfo('Setting use_sim_time to 
true')170 rospy.set_param('/use_sim_time', True)171 if __quit:172 print_it('Demo has been aborted. Exiting (1)')173 return174 __current_task = tasks_to_execute[i]175 init_time = __initialization_time + '-' + __current_task + '-' + str(__current_intent)176 if wait:177 raw_input('Starting task ' + str(__current_task) + '. Press ENTER.')178 print_it('################################################################################')179 print_it('Starting task : ' + str(__current_task))180 print_it('Intent : ' + str(__current_intent))181 print_it('Started at : ' + str(__clock_time_started_task))182 print_it('Finished tasks: ' + str(tasks_to_execute[0:tasks_to_execute.index(__current_task)]))183 print_it('Tasks to go : ' + str(tasks_to_execute[tasks_to_execute.index(__current_task)+1:]))184 print_it('################################################################################')185 subproc, logger_process = utils.start_node('rosrun suturo_planning_startup start_task.py ' + __current_task +186 ' --plan --init --save --no-ts --inittime="' + init_time + '"' +187 ' --logging="' + str(logging) + '"' + ' --parent=' + str(os.getpid())188 , init_time, logging, 'Complete',189 dont_print=dont_print, print_prefix_to_stdout=False)190 __time_started_task = int(time.time())191 print_it('Subscribing to clock.')192 __clock_subscriber = rospy.Subscriber('clock', Clock, handle_clock, queue_size=1,193 callback_args={'task': __current_task,194 'intent': __current_intent})195 print_it('Waiting for task ' + __current_task + ' to terminate.')196 __after_task = False197 subproc.wait()198 __after_task = True199 print_it('Task ' + __current_task + ' terminated')200 print_it('Unsubscribing clock subscriber.')201 __clock_subscriber.unregister()202 if __quit:203 print_it('Demo has been aborted. 
Exiting (2)')204 return205 else:206 if __task_aborted_by_exception:207 print_it('Task has been interrupted by exception.')208 if __remaining_time > __time_limit / 2:209 print_it('Restarting task ' + __current_task)210 __current_intent += 1211 __clock_time_started_task += __last_received_clock212 else:213 print_it('Remaining time (' + str(__remaining_time) + 's) is too low to restart task.')214 print_it('Finished task ' + str(__current_task))215 i += 1216 __current_intent = 1217 __clock_time_started_task = 0218 else:219 print_it('Task terminated as expected.')220 print_it('Finished task ' + str(__current_task))221 i += 1222 __current_intent = 1223 __clock_time_started_task = 0224 if i != len(tasks_to_execute):225 time.sleep(5)226 # __remaining_time = __time_limit227 print_it('Finished complete demo.')228 rospy.signal_shutdown('Finished complete demo.')229def get_available_tasks():230 rospy.wait_for_service('euroc_c2_task_selector/list_scenes')231 service = rospy.ServiceProxy('euroc_c2_task_selector/list_scenes', ListScenes)232 return service().scenes233def get_available_task_names():234 available_tasks = get_available_tasks()235 task_names = []236 for t in available_tasks:237 task_names.append(t.name)238 task_names.sort()239 return task_names240def handle_clock(msg, args):241 global __time_started_task242 global __time_limit243 global __aborting_task244 global __current_task245 global __current_intent246 global __remaining_time247 global __last_received_clock248 global __clock_time_started_task249 global __after_task250 now = int(time.time())251 clock_time = msg.clock.secs252 remaining_time = __time_limit - __clock_time_started_task - clock_time253 task = args['task']254 intent = args['intent']255 if task == __current_task and intent == __current_intent:256 __last_received_clock = clock_time257 if not __aborting_task:258 __remaining_time = remaining_time259 def print_info():260 print_it(msg)261 print_it('__current_task: ' + str(__current_task))262 
print_it('__current_intent: ' + str(__current_intent))263 print_it('task: ' + str(task))264 print_it('intent: ' + str(intent))265 print_it('remaining_time: ' + str(remaining_time))266 print_it('__clock_time_started_task: ' + str(__clock_time_started_task))267 print_it('__time_limit: ' + str(__time_limit))268 print_it('now: ' + str(now))269 print_it('__time_started_task: ' + str(__time_started_task))270 print_it('now - __time_started_task: ' + str(now - __time_started_task))271 if remaining_time <= 0 and not __after_task:272 if now - __time_started_task >= __time_limit:273 print_it('handle_clock:')274 print_info()275 print_panda()276 abort_current_task()277 else:278 print_it('Oh oh, received wrong information (old data from previous task) from clock.')279 print_info()280def print_panda():281 print_it('----------------------------------------------')282 print_it('| Ten minutes time limit has been exceeded. |')283 print_it('| Terminating Task. |')284 print_it('| |')285 print_it('â âââââââââââââââââââââââââââââââââââââââ |')286 print_it('| âââââââ«ââââ«ââââââââââââââââââââ«âââ«ââââââ |')287 print_it('| âââââââââââââââ«â«â«â«â«â«â«â«â«â«â«â«â«ââââââââââ«ââ |')288 print_it('| âââââââââââââââââââââââââââââââââââââââ |')289 print_it('| âââââââââââââââââââââââââââââââââââââââ« |')290 print_it('| ââ«âââââââââ«âââ«ââââââââââââââ«âââ«âââââââââ |')291 print_it('| âââââââââââââââââââââââââââââââââ«âââââââ |')292 print_it('| ââââââââââââââââââââââââââââââââââââââââ |')293 print_it('| â«ââââââââââââââââââââââââââââââââââ«âââââ |')294 print_it('| ââââââââââââââââââââââââââââââââââââââââ |')295 print_it('| ââââââââââââââââââââââââââââââââââââââââ« |')296 print_it('| âââââââââââââââââââââââ«âââ«ââââââââââââââ |')297 print_it('| ââââââââââââââ«ââââââââââââââââââââââââââ |')298 print_it('| âââ«ââââââââââ«âââââââââââââââââââââââââââ |')299 print_it('| ââââââââââââââââââââââââââââââââââââââââ |')300 print_it('| âââââââââââââââ«âââââââââââââââââââââââââ |')301 print_it('| 
âââââââââââââââ«ââââ«âââââââââââââââââââââ |')302 print_it('| ââââ«ââââââââââââ«ââââââââââ«ââââââââââââââ |')303 print_it('| ââââââ«ââââââââââââââââ«ââââââââââââââââââ |')304 print_it('| âââââââ«â«âââââââââ«ââââââââââââââââ«â«ââââââ |')305 print_it('| ââââââââ«ââââââââ«ââââââââââââââââ«â«â«â«ââââ |')306 print_it('| âââââââ«âââ«âââââ«ââââ«ââââââââââââââ«ââââ«âââ |')307 print_it('| âââââââââââ«âââââ«â«â«â«â«ââ«â«â«â«â«ââ«âââ«âââ«ââââââ |')308 print_it('| âââââââââ«âââââââââââââââââââââââââââââââ |')309 print_it('| âââââââââââ«â«â«â«â«â«â«â«ââââââ«â«â«â«â«â«â«âââââââââ |')310 print_it('| ââââââââ«âââââââââââââââââââââââââââââââ |')311 print_it('| âââââââââââââââââââ«ââââââââââââââââââââ |')312 print_it('| ââââââââââââââââââââââââââââââââ«âââââââ |')313 print_it('| ââââââââââââ«ââââââââââââââââââ«âââââââââ |')314 print_it('| âââââââââââââ«âââââââââââââââââââ«âââââââ |')315 print_it('| ââââââââââââââââââââââââââââââ«ââ«âââââââ |')316 print_it('| ââââââââââââââââ«âââââââââââââââââââââââ |')317 print_it('| âââââââââââââââââââââââââââââââââââââââ |')318 print_it('| ââââââââââââââ«ââââââââââââââââââââââââââ |')319 print_it('| âââââââââââââââââââââââââââââââââââââââ |')320 print_it('| âââââââââââââââââââââââââââââââââââââââ |')321 print_it('| ââââââââââââââââââââ«âââââââââââââââââââ |')322 print_it('| ââââââââââââ«ââ«âââââââââââââââ«â«ââââââââ |')323 print_it('| âââââââââââ«ââ«â«â«â«ââââ«â«â«ââââ«ââ«â«â«ââââââââ |')324 print_it('| ââââââââââââââââââââââââââââââââââââââ |')325 print_it('| booga mooga tooga xooga jooga iooga |')326 print_it('| |')327 print_it('| Sad panda is sad. 
|')328 print_it('| |')329 print_it('----------------------------------------------')330def main(argv):331 usage = 'usage: start_complete_demo [-t] [-w] [--tasks=<tasks you want to execute>] [--logging=<logging mode>]\n\n'\332 '\t-t\t\tPrints a list of all available tasks.\n' \333 '\t-w\t\tThe user has to press ENTER after each task.\n' \334 '\t--tasks\t\tA range like \':2\' (python slice and index notation)\n' \335 '\t\t\tor a comma-separated list of task names\n' \336 '\t\t\tor a comma-separated list of indices (as shown by -t).\n' \337 '\t--logging\t0 - Logs to files only (Default value).\n' \338 '\t\t\t1 - Logs to console only.\n' \339 '\t\t\t2 - Logs to files and console.'340 wait = False341 tasks = ':'342 logging = 0343 try:...
__main__.py
Source:__main__.py
...30 txt = print_queue.get(timeout=1)31 print(txt)32 except Empty:33 pass34 def print_it(*args):35 print_queue.put(' '.join(map(str, args)))36 def wait():37 with cond:38 cond.wait()39 def notify():40 with cond:41 cond.notifyAll()42 def puser():43 global p_total44 TAG = 'Producer:'45 print_it(TAG, 'connecting to bus %s ...' % args.uri)46 bus = pbus.connect(args.uri)47 print_it(TAG, 'connecte success!')48 print_it(TAG, 'will publish "%s" to %s' % (data, channel))49 ct = 050 mct = 051 time.sleep(.1)52 notify()53 st = time.time()54 while True:55 bus.publish(channel, data)56 ct += 157 p_total += 158 tg = int(max(time.time() - st, 0.001) * 1000)59 if time.time() - st >= timegap or not runing:60 mct += 161 print_it(TAG, mct, ':', ' count', ct, 'time: %sms' % tg, 'speed: %d/s' % int(ct * 1000 / tg), 'total', p_total)62 ct = 063 st = time.time()64 if not runing:65 break66 def suber():67 global c_total68 TAG = 'Consumer:'69 print_it(TAG, 'connecting to bus %s ...' % args.uri)70 bus = pbus.connect(args.uri)71 print_it(TAG, 'connecte success!')72 print_it(TAG, 'subscribing to %s ...' 
% channel)73 ps = bus.subscriber(channel)74 print_it(TAG, 'subscribe success!')75 print_it(TAG, 'waiting data from %s' % channel)76 ct = 077 mct = 078 time.sleep(.1)79 notify()80 st = time.time()81 for p in ps.listen():82 if p != data:83 raise Exception(u'msg error: %s' % p)84 c_total += 185 ct += 186 tg = int(max(time.time() - st, 0.001) * 1000)87 if time.time() - st >= timegap or not runing:88 mct += 189 print_it(TAG, mct, ':', ' count', ct, 'time: %sms' % tg, 'speed: %d/s' % int(ct * 1000 / tg), 'total', c_total)90 ct = 091 st = time.time()92 if not runing:93 break94 def start_thread(target):95 import threading96 def th():97 try:98 target()99 except:100 global runing101 import traceback102 traceback.print_exc()103 time.sleep(.1)104 notify()105 time.sleep(.1)106 stop()107 time.sleep(.1)108 notify()109 th = threading.Thread(target=th)110 th.setDaemon(True)111 th.start()112 start_thread(print_thread)113 def run():114 if args.loop:115 start_thread(suber)116 wait()117 start_thread(puser)118 elif args.producer:119 puser()120 else:121 suber()122 wait()123 print_it('--Press Ctrl+C to stop it!--')124 def stop():125 global runing, stoped126 print_it('will stop...')127 runing = False128 ct = 3129 while ct:130 print_it(ct, 's')131 time.sleep(1)132 ct -= 1133 if args.loop:134 print_it('p_total', p_total, 'c_total', c_total, 'total', p_total + c_total)135 elif args.producer:136 print_it('p_total', p_total)137 else:138 print_it('c_total', c_total)139 time.sleep(.1)140 stoped = True141 exit(0)142 try:143 start_thread(run)144 while runing:145 time.sleep(1)146 except KeyboardInterrupt:...
.profile.py
Source:.profile.py
1import os2import pathlib3import sys4print_it = False5if print_it:6 HOME = '/mnt/c/Users/jcnarlo/jcnarlo'7 ROOT = ''8 WINDOWS_ROOT = '/mnt/c'9 PF = r'Program Files'10 PFX86 = r'Program Files (x86)'11 PF = r'ProgramFiles'12 PFX86 = r'ProgramFilesX86'13 VIM_HOME = f'{WINDOWS_ROOT}/{PFX86}/Vim/vim81'14 PYTHON_DASH = f'{HOME}/work/Lacuna/install/bin/x64/python'15 PATH = '.'16 APATH = f'{HOME}/local/sbin'17 APATH = f'{APATH}:{HOME}/local/bin'18 APATH = f'{APATH}:{ROOT}/bin'19 APATH = f'{APATH}:{ROOT}/sbin'20 APATH = f'{APATH}:{ROOT}/usr/bin'21 APATH = f'{APATH}:{ROOT}/usr/sbin'22 APATH = f'{APATH}:{VIM_HOME}'23 APATH = f'{APATH}:{PYTHON_DASH}'24 APATH = f'{APATH}:{PYTHON_DASH}/Scripts'25 APATH = f'{APATH}:{WINDOWS_ROOT}/{PF}/7-Zip'26 APATH = f'{APATH}:{WINDOWS_ROOT}/{PFX86}/WinMerge'27 APATH = f'{APATH}:{WINDOWS_ROOT}/Windows/System32'28 APATH = f'{APATH}:{WINDOWS_ROOT}/Windows/SysWOW64'29 APATH = f'{APATH}:{WINDOWS_ROOT}/JOE'30 path0 = PATH31 path1 = path0.split(':')32 spath_add = APATH.strip()33 add_to_path = spath_add.split(':')34else:35 path0 = sys.argv[1]36 path1 = path0.split(':')37 spath_add = sys.argv[2].strip()38 add_to_path = spath_add.split(':')39if print_it:40#{41 print(f'argv: {sys.argv}', flush=True)42 print(f'path1={path1}', flush=True)43 print(f'add_to_path={add_to_path}', flush=True)44 print('', flush=True)45#}46for _atp in add_to_path:47#{48 if print_it:49 print(f'_atp={_atp}', flush=True)50 atp = None51 if False:52 #{53 if _atp.startswith('/mnt/c'):54 atp = _atp.replace('/mnt/c', 'C:')55 else:56 atp = _atp57 #}58 else:59 #{60 atp = _atp61 #}62 if print_it:63 print(f'check_dir={atp}', flush=True)64 p = pathlib.Path(atp)65 if print_it:66 #{67 if p.exists():68 print(f'pathlib TRUE {p}')69 else:70 print(f'pathlib FALSE {p}')71 #}72 #if True or os.path.exists(atp):73 if os.path.exists(atp):74 #{75 if print_it:76 print(f'Exist {atp}', flush=True)77 found = False78 for p in path1:79 #{80 if atp == p:81 #{82 if print_it:83 print(f'Skipping {atp}', 
flush=True)84 found = True85 break86 #}87 #}88 if not found:89 #{90 if False:91 if atp.startswith('C:'):92 atp = atp.replace('C:', '/mnt/c')93 else:94 atp = atp95 if print_it:96 print(f'Adding {atp}', flush=True)97 path1.append(atp)98 #}99 #}100 else:101 #{102 if print_it:103 print(f'NOT Exist {atp}', flush=True)104 #}105 if print_it:106 print('', flush=True)107#}108path0 = ':'.join(path1)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!