Best Python code snippet using autotest_python
test_roslaunch_pmon.py
Source:test_roslaunch_pmon.py
...65 pass66 67 def unregister(self, p):68 self.procs.remove(p)69 def has_process(self, name):70 return len([p for p in self.procs if p.name == name]) > 071 def get_process(self, name):72 return self.procs.get(p, None)73 def has_main_thread_jobs(self):74 return False75 76 def do_main_thread_jobs(self):77 pass78 79 def kill_process(self, name):80 pass81 82 def shutdown(self):83 pass84 85 def get_active_names(self):86 return [p.name for p in self.procs]87 def get_process_names_with_spawn_count(self):88 actives = [(p.name, p.spawn_count) for p in self.procs]89 deads = []90 retval = [actives, deads]91 def mainthread_spin_once(self):92 pass93 94 def mainthread_spin(self):95 pass96 def run(self):97 pass98 99class ProcessMock(roslaunch.pmon.Process):100 def __init__(self, package, name, args, env, respawn=False):101 super(ProcessMock, self).__init__(package, name, args, env, respawn)102 self.stopped = False103 def stop(self, errors):104 self.stopped = True105 106class RespawnOnceProcessMock(ProcessMock):107 def __init__(self, package, name, args, env, respawn=False):108 super(ProcessMock, self).__init__(package, name, args, env, respawn)109 self.spawn_count = 0110 def is_alive(self):111 return False112 113 def start(self):114 self.spawn_count += 1115 if self.spawn_count > 1:116 self.respawn = False117## Test roslaunch.server118class TestRoslaunchPmon(unittest.TestCase):119 def setUp(self):120 self.pmon = roslaunch.pmon.ProcessMonitor()121 ## test all apis of Process instance. 
part coverage/sanity test122 def _test_Process(self, p, package, name, args, env, respawn):123 self.assertEquals(package, p.package)124 self.assertEquals(name, p.name)125 self.assertEquals(args, p.args) 126 self.assertEquals(env, p.env) 127 self.assertEquals(respawn, p.respawn)128 self.assertEquals(0, p.spawn_count) 129 self.assertEquals(None, p.exit_code)130 self.assert_(p.get_exit_description())131 self.failIf(p.is_alive())132 info = p.get_info()133 self.assertEquals(package, info['package'])134 self.assertEquals(name, info['name'])135 self.assertEquals(args, info['args']) 136 self.assertEquals(env, info['env']) 137 self.assertEquals(respawn, info['respawn'])138 self.assertEquals(0, info['spawn_count']) 139 self.failIf('exit_code' in info)140 p.start()141 self.assertEquals(1, p.spawn_count)142 self.assertEquals(1, p.get_info()['spawn_count']) 143 p.start() 144 self.assertEquals(2, p.spawn_count)145 self.assertEquals(2, p.get_info()['spawn_count'])146 # noop147 p.stop()148 p.exit_code = 0149 self.assertEquals(0, p.get_info()['exit_code'])150 self.assert_('cleanly' in p.get_exit_description())151 p.exit_code = 1152 self.assertEquals(1, p.get_info()['exit_code']) 153 self.assert_('[exit code 1]' in p.get_exit_description())154 ## tests to make sure that our Process base class has 100% coverage155 def test_Process(self):156 from roslaunch.pmon import Process157 # test constructor params158 respawn = False159 package = 'foo-%s'%time.time()160 name = 'name-%s'%time.time()161 args = [time.time(), time.time(), time.time()]162 env = { 'key': time.time(), 'key2': time.time() } 163 p = Process(package, name, args, env)164 self._test_Process(p, package, name, args, env, False)165 p = Process(package, name, args, env, True)166 self._test_Process(p, package, name, args, env, True) 167 p = Process(package, name, args, env, False)168 self._test_Process(p, package, name, args, env, False) 169 170 def _test_DeadProcess(self, p0, package, name, args, env, respawn):171 from 
roslaunch.pmon import DeadProcess172 p0.exit_code = -1173 dp = DeadProcess(p0)174 self.assertEquals(package, dp.package)175 self.assertEquals(name, dp.name)176 self.assertEquals(args, dp.args) 177 self.assertEquals(env, dp.env) 178 self.assertEquals(respawn, dp.respawn)179 self.assertEquals(0, dp.spawn_count) 180 self.assertEquals(-1, dp.exit_code)181 self.failIf(dp.is_alive())182 info = dp.get_info()183 info0 = p0.get_info()184 self.assertEquals(info0['package'], info['package'])185 self.assertEquals(info0['name'], info['name'])186 self.assertEquals(info0['args'], info['args']) 187 self.assertEquals(info0['env'], info['env']) 188 self.assertEquals(info0['respawn'], info['respawn'])189 self.assertEquals(0, info['spawn_count']) 190 try:191 dp.start()192 self.fail("should not be able to start a dead process")193 except: pass194 195 # info should be frozen196 p0.package = 'dead package'197 p0.name = 'dead name' 198 p0.spawn_count = 1199 self.assertEquals(package, dp.package)200 self.assertEquals(name, dp.name)201 self.assertEquals(0, dp.spawn_count) 202 self.assertEquals(package, dp.get_info()['package']) 203 self.assertEquals(name, dp.get_info()['name']) 204 self.assertEquals(0, dp.get_info()['spawn_count']) 205 p0.start() 206 self.assertEquals(0, dp.spawn_count)207 self.assertEquals(0, dp.get_info()['spawn_count'])208 # noop209 p0.stop()210 def test_DeadProcess(self):211 from roslaunch.pmon import Process, DeadProcess212 # test constructor params213 respawn = False214 package = 'foo-%s'%time.time()215 name = 'name-%s'%time.time()216 args = [time.time(), time.time(), time.time()]217 env = { 'key': time.time(), 'key2': time.time() } 218 p = Process(package, name, args, env)219 self._test_DeadProcess(p, package, name, args, env, False)220 p = Process(package, name, args, env, True)221 self._test_DeadProcess(p, package, name, args, env, True) 222 p = Process(package, name, args, env, False)223 self._test_DeadProcess(p, package, name, args, env, False) 224 def 
test_start_shutdown_process_monitor(self):225 def failer():226 raise Exception("oops")227 # noop228 self.failIf(roslaunch.pmon.shutdown_process_monitor(None))229 # test with fake pmon so we can get branch-complete230 pmon = ProcessMonitorMock()231 # - by setting alive to true, shutdown fails, though it can't really do anything about it232 pmon.alive = True233 self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))234 235 # make sure that exceptions get trapped 236 pmon.shutdown = failer237 # should cause an exception during execution, but should get caught 238 self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))239 240 # Test with a real process monitor241 pmon = roslaunch.pmon.start_process_monitor()242 self.assert_(pmon.isAlive())243 self.assert_(roslaunch.pmon.shutdown_process_monitor(pmon))244 self.failIf(pmon.isAlive())245 # fiddle around with some state that would shouldn't be246 roslaunch.pmon._shutting_down = True247 pmon = roslaunch.pmon.start_process_monitor()248 if pmon != None:249 self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))250 self.fail("start_process_monitor should fail if during shutdown sequence")251 252 def test_pmon_shutdown(self):253 # should be noop254 roslaunch.pmon.pmon_shutdown()255 256 # start two real process monitors and kill them257 # pmon_shutdown258 pmon1 = roslaunch.pmon.start_process_monitor()259 pmon2 = roslaunch.pmon.start_process_monitor()260 self.assert_(pmon1.isAlive())261 self.assert_(pmon2.isAlive()) 262 roslaunch.pmon.pmon_shutdown()263 264 self.failIf(pmon1.isAlive())265 self.failIf(pmon2.isAlive()) 266 267 def test_add_process_listener(self):268 # coverage test, not a functionality test as that would be much more difficult to simulate269 from roslaunch.pmon import ProcessListener270 l = ProcessListener()271 self.pmon.add_process_listener(l)272 def test_kill_process(self):273 from roslaunch.core import RLException274 pmon = self.pmon275 # should return False276 self.failIf(pmon.kill_process('foo'))277 
278 p1 = ProcessMock('foo', 'name1', [], {})279 p2 = ProcessMock('bar', 'name2', [], {})280 pmon.register(p1)281 pmon.register(p2) 282 self.failIf(p1.stopped)283 self.failIf(p2.stopped)284 self.assert_(p1.name in pmon.get_active_names())285 self.assert_(p2.name in pmon.get_active_names())286 # should fail as pmon API is string-based287 try:288 self.assert_(pmon.kill_process(p1))289 self.fail("kill_process should have thrown RLException")290 except RLException: pass291 292 self.assert_(pmon.kill_process(p1.name))293 self.assert_(p1.stopped)294 295 # - pmon should not have removed yet as the pmon thread cannot catch the death296 self.assert_(pmon.has_process(p1.name))297 self.assert_(p1.name in pmon.get_active_names())298 299 self.failIf(p2.stopped) 300 self.assert_(p2.name in pmon.get_active_names()) 301 pmon.kill_process(p2.name)302 self.assert_(p2.stopped)303 304 # - pmon should not have removed yet as the pmon thread cannot catch the death305 self.assert_(pmon.has_process(p2.name))306 self.assert_(p2.name in pmon.get_active_names())307 p3 = ProcessMock('bar', 'name3', [], {})308 def bad(x):309 raise Exception("ha ha ha")310 p3.stop = bad311 pmon.register(p3)312 # make sure kill_process is safe313 pmon.kill_process(p3.name)314 def f():315 return False316 p1.is_alive = f317 p2.is_alive = f 318 p3.is_alive = f319 # Now that we've 'killed' all the processes, we should be able320 # to run through the ProcessMonitor run loop and it should321 # exit. 
But first, important that we check that pmon has no322 # other extra state in it323 self.assertEquals(3, len(pmon.get_active_names()))324 # put pmon into exitable state325 pmon.registrations_complete()326 # and run it -- but setup a safety timer to kill it if it doesn't exit327 marker = Marker()328 thread.start_new_thread(kill_pmon, (self.pmon,marker, 10.))329 330 pmon.run()331 332 self.failIf(marker.marked, "pmon had to be externally killed")333 334 self.assert_(pmon.done)335 self.failIf(pmon.has_process(p1.name))336 self.failIf(pmon.has_process(p2.name))337 alive, dead = pmon.get_process_names_with_spawn_count()338 self.failIf(alive)339 self.assert_((p1.name, p1.spawn_count) in dead)340 self.assert_((p2.name, p2.spawn_count) in dead)341 def test_run(self):342 # run is really hard to test...343 pmon = self.pmon344 # put pmon into exitable state345 pmon.registrations_complete()346 # give the pmon a process that raises an exception when it's347 # is_alive is checked. this should be marked as a dead process348 p1 = ProcessMock('bar', 'name1', [], {})349 def bad():350 raise Exception('ha ha')351 p1.is_alive = bad352 pmon.register(p1)353 354 # give pmon a well-behaved but dead process355 p2 = ProcessMock('bar', 'name2', [], {}) 356 def f():357 return False358 p2.is_alive = f359 pmon.register(p2)360 # give pmon a process that wants to respawn once361 p3 = RespawnOnceProcessMock('bar', 'name3', [], {}) 362 pmon.register(p3)363 364 # test assumptions about pmon's internal data structures365 # before we begin test366 self.assert_(p1 in pmon.procs)367 self.assert_(pmon._registrations_complete)368 self.failIf(pmon.is_shutdown)369 370 # and run it -- but setup a safety timer to kill it if it doesn't exit371 marker = Marker()372 thread.start_new_thread(kill_pmon, (self.pmon,marker, 10.))373 374 pmon.run()375 376 self.failIf(marker.marked, "pmon had to be externally killed") 377 # retest assumptions378 self.failIf(pmon.procs)379 self.assert_(pmon.is_shutdown) 380 
pmon.is_shutdown = False381 382 def test_get_process_names_with_spawn_count(self):383 p1 = ProcessMock('foo', 'name1', [], {})384 p2 = ProcessMock('bar', 'name2', [], {})385 pmon = self.pmon386 self.assertEquals([[], []], pmon.get_process_names_with_spawn_count())387 pmon.register(p1)388 self.assertEquals([[('name1', 0),], []], pmon.get_process_names_with_spawn_count()) 389 pmon.register(p2)390 alive, dead = pmon.get_process_names_with_spawn_count()391 self.assertEquals([], dead)392 self.assert_(('name1', 0) in alive)393 self.assert_(('name2', 0) in alive) 394 395 import random396 p1.spawn_count = random.randint(1, 10000)397 p2.spawn_count = random.randint(1, 10000)398 399 alive, dead = pmon.get_process_names_with_spawn_count()400 self.assertEquals([], dead)401 self.assert_((p1.name, p1.spawn_count) in alive)402 self.assert_((p2.name, p2.spawn_count) in alive)403 #TODO figure out how to test dead_list404 405 ## Tests ProcessMonitor.register(), unregister(), has_process(), and get_process()406 def test_registration(self):407 from roslaunch.core import RLException408 from roslaunch.pmon import Process409 pmon = self.pmon410 411 p1 = Process('foo', 'name1', [], {})412 p2 = Process('bar', 'name2', [], {})413 corep1 = Process('core', 'core1', [], {}) 414 corep2 = Process('core', 'core2', [], {}) 415 pmon.register(p1)416 self.assert_(pmon.has_process('name1'))417 self.assertEquals(p1, pmon.get_process('name1'))418 self.failIf(pmon.has_process('name2'))419 self.assertEquals(['name1'], pmon.get_active_names())420 try:421 pmon.register(Process('foo', p1.name, [], {}))422 self.fail("should not allow duplicate process name")423 except RLException: pass424 425 pmon.register(p2)426 self.assert_(pmon.has_process('name2'))427 self.assertEquals(p2, pmon.get_process('name2'))428 self.assertEquals(set(['name1', 'name2']), set(pmon.get_active_names()))429 430 pmon.register_core_proc(corep1)431 self.assert_(pmon.has_process('core1'))432 self.assertEquals(corep1, 
pmon.get_process('core1'))433 self.assertEquals(set(['name1', 'name2', 'core1']), set(pmon.get_active_names())) 434 pmon.register_core_proc(corep2)435 self.assert_(pmon.has_process('core2'))436 self.assertEquals(corep2, pmon.get_process('core2'))437 self.assertEquals(set(['name1', 'name2', 'core1', 'core2']), set(pmon.get_active_names())) 438 pmon.unregister(p2)439 self.failIf(pmon.has_process('name2')) 440 pmon.unregister(p1)441 self.failIf(pmon.has_process('name1')) 442 pmon.unregister(corep1)443 self.failIf(pmon.has_process('core1')) 444 pmon.unregister(corep2)445 self.failIf(pmon.has_process('core2'))446 pmon.shutdown()447 try:448 pmon.register(Process('shutdown_fail', 'shutdown_fail', [], {}))449 self.fail("registration should fail post-shutdown")450 except RLException: pass451 452 453 454 def test_mainthread_spin_once(self):455 # shouldn't do anything456 self.pmon.done = False457 self.pmon.mainthread_spin_once()458 self.pmon.done = True 459 self.pmon.mainthread_spin_once()...
HttpSendCamera.py
Source:HttpSendCamera.py
from concurrent.futures.thread import ThreadPoolExecutor
from IO.camera.basler.process.img.bangcai3.process_img import process_img
from IO.camera.basler.source.basler_camera import BaslerCameras
from threading import Thread
from queue import Queue
from tools.cv2base64 import image_to_base64
from tools.wrapper import SAFE
import time
import requests
import json
# import grequests


class HttpSender():
    """Send base64-encoded images plus metadata to an HTTP endpoint via a thread pool."""

    def __init__(self,send_url,send_thread=5,sleep_ms=150):
        """Store the target URL and create a pool of ``send_thread`` workers.

        ``sleep_ms`` and ``queue`` are only stored here; the polling loop
        that used them is disabled (see the commented-out code below).
        """
        super().__init__()
        self.send_url = send_url
        self.queue = Queue()
        self.send_thread = send_thread
        self.threadpool = ThreadPoolExecutor(max_workers=self.send_thread)
        # NOTE(review): the label below looks like mis-encoded text (probably
        # "queue length"); it is a runtime string and is kept as found.
        pending = self.threadpool._work_queue.qsize()
        print("éåé¿åº¦{}".format(pending))
        self.sleep_ms = sleep_ms

    # Disabled queue-polling implementation, kept for reference:
    # def run(self):
    #     while True:
    #         self.loop_func()
    # @SAFE
    # def loop_func(self):
    #     if self.queue.qsize() > 0:
    #         c = self.queue.get()
    #         self.threadpool.submit(self.send_img,c[0],c[code])
    #
    #     else:
    #         time.sleep(self.sleep_ms/1000)
    # def put(self,img,info):
    #     self.queue.put((img,info))

    def async_send_img(self,img,info):
        """Encode ``img`` to base64 and schedule ``send_img`` on the thread pool."""
        encoded = image_to_base64(img)
        self.threadpool.submit(self.send_img, encoded, info)
        pending = self.threadpool._work_queue.qsize()
        print("éåé¿åº¦{}".format(pending))

    @SAFE
    def send_img(self,img_base64,info):
        """Issue a GET request to ``send_url`` carrying ``info`` and the image.

        NOTE(review): ``json=`` already serialises its argument, so passing
        ``json.dumps(...)`` sends a JSON-encoded *string*; confirm that the
        receiver expects this before changing it.
        """
        payload = {"data": info, "img": img_base64}
        requests.get(self.send_url, json=json.dumps(payload))


class HttpSendCamera(BaslerCameras):
    """Basler camera handler that forwards each handled frame over HTTP."""

    def __init__(self,begin_id,camera_ip_list,camera_config_list,send_url,send_thread):
        super().__init__(begin_id, camera_ip_list, camera_config_list)
        # Fraction of frames passed through process_img locally (presumably
        # "share computed on the industrial PC" -- original comment garbled).
        self.calc_percent = 0.5
        self.sender = HttpSender(send_url=send_url, send_thread=send_thread)

    def handle_img(self,image,camera_id,count):
        """Run the base handler, optionally process the frame, then send it."""
        super().handle_img(image, camera_id, count)
        has_process, has_steel, image, left, right = self.qiebian_mod(
            camera_id, count, False, image
        )
        info = {
            "camera_id": camera_id + self.begin_id,
            "edge_gap": 0,
            "count": count,
            "day_info": self.day_info,
            "trigger_info": self.trigger_info,
            "has_steel": has_steel,
            "left": left,
            "right": right,
            "has_process": has_process,
        }
        self.send(image, info)

    def qiebian_mod(self, camera_id, count, has_process, image):
        """Process the frame locally for a ``calc_percent`` share of counts.

        Returns ``(has_process, has_steel, image, left, right)``. Frames that
        are not processed are passed through with ``has_steel=True``,
        ``left=0`` and ``right=1024``.
        """
        if count % 10 >= self.calc_percent * 10:
            return has_process, True, image, 0, 1024
        image, has_steel, left, right = process_img(image, camera_id)
        return True, has_steel, image, left, right

    def send(self,image,info):
        print("info",info)
        # ... (remainder of this method is truncated in the excerpt)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!