Best Python code snippet using playwright-python
test_data.py
Source:test_data.py
# Methods excerpted from a pre-fork master ("arbiter") process class.  The
# enclosing ``class`` statement lies outside this excerpt, so the methods are
# laid out at top level here; each one still receives the instance as ``self``.


def run(self):
    """Main master loop.

    Starts the master, spawns the initial workers, then loops forever:
    reap exited children, pop one queued signal and dispatch it to the
    matching ``handle_<name>`` method.  When no signal is queued the loop
    sleeps, kills timed-out workers and tops the worker pool back up.

    The loop is only left by raising ``SystemExit`` (directly, or through
    ``halt``).
    """
    self.start()
    util._setproctitle("master [%s]" % self.proc_name)
    self.manage_workers()
    while True:
        try:
            self.reap_workers()
            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                # Idle pass: wait for a wake-up, then do housekeeping.
                self.sleep()
                self.murder_workers()
                self.manage_workers()
                continue

            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue

            signame = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            self.log.info("Handling signal: %s", signame)
            handler()
            # Skip the next sleep so further queued signals are handled
            # without waiting for the select() timeout.
            self.wakeup()
        except StopIteration:
            # Raised by the QUIT/INT/TERM handlers to request shutdown.
            self.halt()
        except KeyboardInterrupt:
            self.halt()
        except HaltServer as inst:
            self.halt(reason=inst.reason, exit_status=inst.exit_status)
        except SystemExit:
            raise
        except Exception:
            self.log.info("Unhandled exception in main loop:\n%s",
                          traceback.format_exc())
            self.stop(False)
            if self.pidfile is not None:
                self.pidfile.unlink()
            sys.exit(-1)


def handle_chld(self, sig, frame):
    "SIGCHLD handling"
    # Takes the (sig, frame) pair of a raw signal handler, unlike the other
    # handle_* methods which run() calls without arguments.
    self.wakeup()


def handle_hup(self):
    """\
    HUP handling.
    - Reload configuration
    - Start the new worker processes with a new configuration
    - Gracefully shutdown the old worker processes
    """
    self.log.info("Hang up: %s", self.master_name)
    self.reload()


def handle_quit(self):
    "SIGQUIT handling"
    # run() turns StopIteration into a graceful halt().
    raise StopIteration


def handle_int(self):
    "SIGINT handling"
    self.stop(False)
    raise StopIteration


def handle_term(self):
    "SIGTERM handling"
    self.stop(False)
    raise StopIteration


def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1
    self.manage_workers()


def handle_ttou(self):
    """\
    SIGTTOU handling.
    Decreases the number of workers by one.
    """
    # Never go below one worker.
    if self.num_workers <= 1:
        return
    self.num_workers -= 1
    self.manage_workers()


def handle_usr1(self):
    """\
    SIGUSR1 handling.
    Kill all workers by sending them a SIGUSR1
    """
    self.kill_workers(signal.SIGUSR1)
    self.log.reopen_files()


def handle_usr2(self):
    """\
    SIGUSR2 handling.
    Creates a new master/worker set as a slave of the current
    master without affecting old workers. Use this to do live
    deployment with the ability to backout a change.
    """
    self.reexec()


def handle_winch(self):
    "SIGWINCH handling"
    # Only act when re-parented to pid 1 or not the process-group leader,
    # which the log message below treats as "daemonized".
    if os.getppid() == 1 or os.getpgrp() != os.getpid():
        self.log.info("graceful stop of workers")
        self.num_workers = 0
        self.kill_workers(signal.SIGQUIT)
    else:
        self.log.info("SIGWINCH ignored. Not daemonized")


def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE

    ``EAGAIN`` and ``EINTR`` are ignored; any other error is re-raised.
    """
    try:
        # os.write() needs bytes on Python 3; b'.' is also valid on 2.6+.
        os.write(self.PIPE[1], b'.')
    except (IOError, OSError) as e:
        # os.write() raises OSError, which is a different class from IOError
        # on Python 2, so both are caught.
        if e.errno not in [errno.EAGAIN, errno.EINTR]:
            raise


def halt(self, reason=None, exit_status=0):
    """ halt arbiter

    :attr reason: optional text logged as the shutdown reason
    :attr exit_status: status passed to ``sys.exit``
    """
    self.stop()
    self.log.info("Shutting down: %s", self.master_name)
    if reason is not None:
        self.log.info("Reason: %s", reason)
    if self.pidfile is not None:
        self.pidfile.unlink()
    sys.exit(exit_status)


def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)
        if not ready[0]:
            return
        # Drain every pending wake-up byte.
        # NOTE(review): presumably PIPE[0] is non-blocking so this loop ends
        # with EAGAIN -- confirm where PIPE is created.
        while os.read(self.PIPE[0], 1):
            pass
    except (select.error, OSError) as e:
        # args[0] carries the errno for both exception types.
        if e.args[0] not in [errno.EAGAIN, errno.EINTR]:
            raise
    except KeyboardInterrupt:
        sys.exit()


def stop(self, graceful=True):
    """\
    Stop workers

    :attr graceful: boolean, If True (the default) workers will be
    killed gracefully  (ie. trying to wait for the current connection)
    """
    try:
        self.LISTENER.close()
    except Exception:
        # Best effort: the listener may already be closed or None.
        pass
    self.LISTENER = None
    sig = signal.SIGQUIT if graceful else signal.SIGTERM
    limit = time.time() + self.cfg.graceful_timeout
    while self.WORKERS and time.time() < limit:
        self.kill_workers(sig)
        time.sleep(0.1)
        self.reap_workers()
    # Whatever is still registered after the timeout gets SIGKILL.
    self.kill_workers(signal.SIGKILL)


def reexec(self):
    """\
    Relaunch the master and workers.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent: keep running under a new name.
        self.master_name = "Old Master"
        return

    # Child: pass the listening socket on through the environment, close
    # every other descriptor from 3 upwards, then exec.
    os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno())
    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)
    util.closerange(3, self.LISTENER.fileno())
    util.closerange(self.LISTENER.fileno()+1, util.get_maxfd())
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], os.environ)


def reload(self):
    """Reload the configuration and start workers that use it."""
    old_address = self.cfg.address

    # reload conf
    self.app.reload()
    self.setup(self.app)

    # reopen log files
    self.log.reopen_files()

    # do we need to change listener ?
    if old_address != self.cfg.address:
        self.LISTENER.close()
        self.LISTENER = create_socket(self.cfg, self.log)
        self.log.info("Listening at: %s", self.LISTENER)

    # do some actions on reload
    self.cfg.on_reload(self)

    # unlink pidfile
    if self.pidfile is not None:
        self.pidfile.unlink()

    # create new pidfile
    if self.cfg.pidfile is not None:
        self.pidfile = Pidfile(self.cfg.pidfile)
        self.pidfile.create(self.pid)

    # set new proc_name
    util._setproctitle("master [%s]" % self.proc_name)

    # spawn new workers
    for _ in range(self.cfg.workers):
        self.spawn_worker()

    # manage workers
    self.manage_workers()


def murder_workers(self):
    """\
    Kill unused/idle workers
    """
    # Iterate over a snapshot so the mapping may change while we walk it.
    for (pid, worker) in list(self.WORKERS.items()):
        try:
            if time.time() - worker.tmp.last_update() <= self.timeout:
                continue
        except ValueError:
            continue

        self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
        self.kill_worker(pid, signal.SIGKILL)


def reap_workers(self):
    """\
    Reap workers to avoid zombie processes

    Raises ``HaltServer`` when a child exited with ``WORKER_BOOT_ERROR``.
    """
    try:
        while True:
            wpid, status = os.waitpid(-1, os.WNOHANG)
            if not wpid:
                break
            if self.reexec_pid == wpid:
                self.reexec_pid = 0
            else:
                # A worker said it cannot boot. We'll shutdown
                # to avoid infinite start/stop cycles.
                exitcode = status >> 8
                if exitcode == self.WORKER_BOOT_ERROR:
                    reason = "Worker failed to boot."
                    raise HaltServer(reason, self.WORKER_BOOT_ERROR)
                worker = self.WORKERS.pop(wpid, None)
                if not worker:
                    continue
                worker.tmp.close()
    except OSError as e:
        # ECHILD just means there is no child left to wait for; any other
        # failure is unexpected and must not be silently dropped.
        if e.errno != errno.ECHILD:
            raise


def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.
    """
    if len(self.WORKERS) < self.num_workers:
        self.spawn_workers()

    # Lowest ``age`` first: surplus workers are stopped in that order.
    workers = sorted(self.WORKERS.items(), key=lambda w: w[1].age)
    while len(workers) > self.num_workers:
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGQUIT)


def spawn_worker(self):
    """Fork one worker.

    In the parent the worker is registered in ``WORKERS`` and the child pid
    is returned.  The child never returns from this method: it runs the
    worker and leaves through ``sys.exit``.
    """
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                               self.app, self.timeout/2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:
        self.WORKERS[pid] = worker
        return pid

    # Process Child
    worker_pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker_pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except:
        # Deliberate catch-all: nothing may propagate out of the child.
        self.log.debug("Exception in worker process:\n%s",
                       traceback.format_exc())
        if not worker.booted:
            # Lets the master tell a boot failure from a later crash.
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker_pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except:
            # Best-effort cleanup while the process is already exiting.
            pass


def spawn_workers(self):
    """\
    Spawn new workers as needed.

    This is where a worker process leaves the main loop
    of the master process.
    """
    for _ in range(self.num_workers - len(self.WORKERS)):
        self.spawn_worker()


def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    # Snapshot the pids so the mapping may change while signalling.
    for pid in list(self.WORKERS.keys()):
        self.kill_worker(pid, sig)


def kill_worker(self, pid, sig):
    """\
    Kill a worker

    :attr pid: int, worker pid
    :attr sig: `signal.SIG*` value
    """
    # NOTE(review): the excerpt is cut off right after the ``os.kill`` call,
    # inside a ``try`` block whose handlers are not visible.  Only the visible
    # call is reproduced here; restore the original handlers when merging.
    os.kill(pid, sig)
test_workers.py
Source:test_workers.py
import unittest
from mock import patch, Mock, DEFAULT
import zmq
import json
import config.test_config as config
import common.broker.workers as workers
import time
from collections import OrderedDict
from common.broker.workers import WorkerShutdown
from ModelingMachine.engine.vertex_factory import VertexCache, ModelCache
from config.engine import EngConfig


class ZmqFakeSocket(object):
    """In-memory stand-in for a zmq socket.

    Outgoing multipart messages are stringified and appended to
    ``messages``; ``recv_multipart`` always returns the canned
    ``recv_message``.  ``bind``/``connect``/``close`` do nothing.
    """

    def __init__(self, socktype):
        # socktype is accepted so the class can be called like a socket
        # factory, but it is not stored.
        self.opts = {}
        self.messages = []
        self.recv_message = ['', workers.Protocol.REQUEST, 'client_address', '', 'service_name', '"request"']

    def bind(self, addr):
        pass

    def connect(self, host):
        pass

    def close(self):
        pass

    def setsockopt(self, opt, *args):
        # Remember the options so tests could inspect them.
        self.opts[opt] = args

    def send_multipart(self, message):
        # Stored as str(message); the assertions below compare against
        # str([...]) for the same reason.
        self.messages.append(str(message))

    def recv_multipart(self):
        return self.recv_message


class WorkersTestCase(unittest.TestCase):
    """Tests for ``workers.Workers`` with zmq and several methods patched.

    The patches are started once for the whole class in ``setUpClass`` and
    stopped in ``tearDownClass``.
    """

    @classmethod
    def setUpClass(self):
        # Every socket created through the patched Context is a ZmqFakeSocket.
        self.workers_context_patch = patch('common.broker.workers.zmq.Context')
        self.workers_context_mock = self.workers_context_patch.start()
        self.workers_context_mock.return_value.socket = ZmqFakeSocket
        self.workers_poller_patch = patch('common.broker.workers.zmq.Poller')
        self.workers_poller_mock = self.workers_poller_patch.start()
        # Fixed worker id: it appears as "1" in the expected messages.
        self.workers_get_id_patch = patch('common.broker.workers.Workers.get_id')
        self.workers_get_id_mock = self.workers_get_id_patch.start()
        self.workers_get_id_mock.return_value = "1"
        self.workers_hb_patch = patch('common.broker.workers.Workers.send_heartbeats')
        self.workers_hb_mock = self.workers_hb_patch.start()
        self.workers_cp_patch = patch('common.broker.workers.Workers.check_pipes')
        self.workers_cp_mock = self.workers_cp_patch.start()
        self.workers_add_services_patch = patch('common.broker.workers.Workers.add_services')
        self.workers_add_services_mock = self.workers_add_services_patch.start()
        self.workers_register_patch = patch('common.broker.workers.Workers.register')
        self.workers_register_mock = self.workers_register_patch.start()
        # Any attempt to create a WorkerProcess raises.
        self.workers_wp_patch = patch('common.broker.workers.WorkerProcess')
        self.workers_wp_mock = self.workers_wp_patch.start()
        self.workers_wp_mock.side_effect = Exception('WorkerProcess disabled')

    @classmethod
    def tearDownClass(self):
        # Stop every patch started in setUpClass.
        self.workers_context_patch.stop()
        self.workers_poller_patch.stop()
        self.workers_get_id_patch.stop()
        self.workers_hb_patch.stop()
        self.workers_cp_patch.stop()
        self.workers_add_services_patch.stop()
        self.workers_register_patch.stop()
        self.workers_wp_patch.stop()

    def setUp(self):
        # Fresh Workers instance with an empty service list for every test.
        self.workers = workers.Workers(broker = None)
        self.workers.services = []
        self.workers.reconnect_to_broker()

    def test_send_services(self):
        self.workers.send_services()
        #message body "[]" is the empty json list of services
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.STATUS, "1", "[]"]))

    def test_process_msg(self):
        """Check the result and the last sent message per protocol command."""
        self.assertIsNone(self.workers.process_msg(workers.Protocol.HEARTBEAT, [""]))
        self.assertRaises(WorkerShutdown, self.workers.process_msg, workers.Protocol.SHUTDOWN, [""])
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.SHUTDOWN, "1"]))
        self.assertIsNone(self.workers.process_msg(workers.Protocol.STATUS, [""]))
        #message body "[]" is the empty json list of services
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.STATUS, "1", "[]"]))
        #this creates a new worker_socket
        self.assertIsNone(self.workers.process_msg(workers.Protocol.DISCONNECT, [""]))
        self.assertIsNone(self.workers.process_msg(workers.Protocol.INITIALIZE, [""]))
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.DISCONNECT, ""]))
        self.assertItemsEqual(self.workers.process_msg(workers.Protocol.REQUEST, ["client_address", "", "body"]), ["body"])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_add_service(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": None}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_add_service_with_flipper_on(self, mock_flippers):
        # With request_accounting on, the service entry also has 'request_id'.
        mock_flippers.request_accounting = True
        self.workers.add_service("service_name")
        self.assertItemsEqual(self.workers.services,
                [{"name": "service_name", "request": None, 'request_id': None}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_assign_request(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.assertTrue(self.workers.assign_request("service_name", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])
        #the one service is already occupied by a request
        self.assertFalse(self.workers.assign_request("service_name", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])
        #service 'manager' is always accepted
        self.assertTrue(self.workers.assign_request("manager", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_clear_request(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.workers.assign_request("service_name", {'qid': '1'})
        self.workers.clear_request("service_name", {'qid': '1'})
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": None}])
        self.assertEqual(self.workers.worker_socket.messages[0], str(['', workers.Protocol.STATUS, "1", '["service_name"]']))
        self.assertTrue(self.workers.clear_request("manager", ""))

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_clear_request_with_flipper_on(self, mock_flippers):
        mock_flippers.request_accounting = True
        self.workers.add_service("service_name")
        self.workers.assign_request("service_name", {'qid': '1'})
        self.workers.clear_request("service_name", {'qid': '1'})
        self.assertItemsEqual(self.workers.services, [
            {"name": "service_name", "request": None, 'request_id': None}])
        self.assertEqual(self.workers.worker_socket.messages[0], str(['', workers.Protocol.STATUS, "1", '["service_name"]']))
        self.assertTrue(self.workers.clear_request("manager", ""))

    def test_cleanup_processes(self):
        """A dead process is cleared and dropped; a live one is kept."""
        with patch('common.broker.workers.sys', autospec=True) as mock_sys:
            # wp reports dead, wp2 reports alive.
            wp = Mock()
            wp.is_alive.return_value = False
            wp.service = 'service1'
            wp.request = 'request1'
            wp2 = Mock()
            wp2.is_alive.return_value = True
            self.workers.worker_processes = [wp, wp2]
            with patch.object(self.workers, "clear_request") as mock_clear_request:
                self.workers.cleanup_processes()
                mock_clear_request.assert_called_once_with('service1', 'request1')
                self.assertItemsEqual(self.workers.worker_processes, [wp2])

    def test_poll_socket(self):
        # poller is the patched zmq.Poller, so poll() can be steered directly.
        self.workers.poller.poll.return_value = False
        self.assertIsNone(self.workers.poll_socket())
        self.workers.poller.poll.return_value = True
        self.assertEqual(self.workers.poll_socket(), self.workers.worker_socket.recv_message)

    def test_process_request(self):
        self.workers.poller.poll.return_value = True
        self.assertIsNone(self.workers.process_request())

    def test_process_manager_request_to_kill(self):
        service = 'manager'
        request =[service, '{"command": "kill"}']
        with patch.multiple(self.workers, wait_for_request = DEFAULT, kill_worker_by_request = DEFAULT, run_request=DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            result = self.workers.process_request()
            self.assertIsNone(result)
            # The JSON body is expected to be decoded before it is passed on.
            mocks['kill_worker_by_request'].assert_called_once_with({'command': 'kill'})

    def test_process_manager_request_to_broadcast(self):
        service = 'manager'
        self.workers.add_service('service_name')
        self.workers.add_service('manager')
        request =[service, '{"command": "broadcast_command"}']
        with patch.multiple(self.workers, wait_for_request = DEFAULT, add_worker = DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            result = self.workers.process_request()
            self.assertIsNone(result)
            mocks['add_worker'].assert_called_once_with({'command': 'broadcast_command'}, None)

    def test_process_predict_request_with_cache(self):
        service = 'fit_single'
        self.workers.add_service('service_name')
        self.workers.add_service('fit_single')
        self.workers.model_cache.has_model_cache = True
        req = {'command':'predict_whatever', 'pid':'1234', 'blueprint_id':'1234', 'dataset_id':'1234', 'samplepct':'50', 'partitions':[[-1,-1]]}
        request =[service, json.dumps(req)]
        with patch.multiple(self.workers, wait_for_request = DEFAULT, add_worker = DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            self.workers.model_cache.get_cached_model = Mock()
            self.workers.model_cache.get_cached_model.return_value = 'test_cache'
            result = self.workers.process_request()
            self.assertIsNone(result)
            # With the cache enabled the cached model is handed to add_worker.
            mocks['add_worker'].assert_called_once_with(req, 'test_cache')

    def test_process_predict_request_without_cache(self):
        service = 'fit_single'
        self.workers.add_service('service_name')
        self.workers.add_service('fit_single')
        self.workers.model_cache.has_model_cache = False
        req = {'command':'predict_whatever', 'pid':'1234', 'blueprint_id':'1234', 'dataset_id':'1234', 'samplepct':'50', 'partitions':[[-1,-1]]}
        request =[service, json.dumps(req)]
        with patch.multiple(self.workers, wait_for_request = DEFAULT, add_worker = DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            self.workers.model_cache.get_cached_model = Mock()
            self.workers.model_cache.get_cached_model.return_value = 'test_cache'
            result = self.workers.process_request()
            self.assertIsNone(result)
            # With the cache disabled add_worker gets None instead.
            mocks['add_worker'].assert_called_once_with(req, None)

    def test_get_cached_model(self):
        self.workers.model_cache.has_model_cache = 3
        req = {'command':'predict_whatever', 'pid':'1234', 'blueprint_id':'1234', 'dataset_id':'1234', 'samplepct':'50', 'partitions':[[-1,-1]]}
        #test get new model
        out = self.workers.model_cache.get_cached_model(req)
        self.assertIsInstance(out, VertexCache)
        self.assertEqual(OrderedDict(), self.workers.model_cache.cached_models)
        #test update
        self.workers.model_cache.update_cached_model(out,req)
        # NOTE(review): indexing .values() only works where it returns a list
        # (Python 2) -- confirm the target interpreter.
        self.assertEqual(out, self.workers.model_cache.cached_models.values()[0])
        #test get existing model
        out2 = self.workers.model_cache.get_cached_model(req)
        self.assertEqual(out, out2)

    def test_shutdown(self):
        with patch.multiple(self.workers, try_run_once_at_shutdown=DEFAULT,
                current_requests=DEFAULT) as mocks:
            self.workers.stop = True
            self.workers.stop_time = time.time()
            self.workers.worker_processes = [1]
            mocks['current_requests'].return_value = [{'pid': 'pid', 'uid': 'uid'}]
            self.assertFalse(self.workers.shutdown())
            self.workers.stop = False
            self.assertFalse(self.workers.shutdown())


if __name__ == '__main__':
    ...  # the body of this guard is cut off in the excerpt
bench_frameworks.py
Source:bench_frameworks.py
# NOTE(review): the excerpt begins inside a timing helper whose ``def`` line is
# not visible; the call sites below use it as ``timeit(func, *args, **kwargs)``.
# The visible tail of its body is preserved here as a comment:
#     start = time.time()
#     res = func(*args, **kwargs)
#     elapsed = time.time() - start
#     return res, elapsed


def spawn_workers(n_workers):
    """Fork ``n_workers`` ipyparallel engine processes.

    Each child calls ``launch_new_instance()``; the parent collects and
    returns the list of child pids.
    """
    import os

    from ipyparallel.apps.ipengineapp import launch_new_instance

    pids = []
    for _ in range(n_workers):
        pid = os.fork()
        if pid == 0:
            # NOTE(review): if launch_new_instance() ever returns, the child
            # carries on with this loop -- confirm that it never does.
            launch_new_instance()
        else:
            pids.append(pid)
    return pids


def bench_fiber(tasks, workers, task_duration, warmup=True, pool=None):
    """Time one ``pool.map`` of ``tasks`` sleep tasks on a fiber pool.

    A pool of ``workers`` processes is created when none is passed in, also
    when ``warmup`` is false (previously that combination failed on
    ``None.map``).  Returns the elapsed seconds of the timed run.
    """
    if not pool:
        pool = fiber.Pool(workers)
    durations = [task_duration] * tasks
    if warmup:
        pool.map(sleep_worker, durations, chunksize=1)
        logger.debug("warm up finished")

    _, elapsed = timeit(pool.map, sleep_worker, durations, chunksize=1)
    return elapsed


def bench_fiber_seq(tasks, workers, task_duration, warmup=True, pool=None):
    """Time ``tasks // workers`` rounds of ``workers`` async sleep tasks.

    Each round submits ``workers`` tasks and waits for all of them before the
    next round starts.  Returns the elapsed seconds of the timed run.
    """
    def run(pool, duration):
        for _ in range(tasks // workers):
            handles = [pool.apply_async(sleep_worker, (duration,))
                       for _ in range(workers)]
            for handle in handles:
                handle.get()

    if not pool:
        # NOTE(review): the original falls back to a multiprocessing pool
        # here although this is the fiber benchmark -- looks like a
        # copy/paste slip; kept as-is, confirm the intent.
        pool = mp.Pool(workers)
    if warmup:
        pool.map(sleep_worker, [task_duration] * tasks, chunksize=1)
        logger.debug("warm up finished")

    _, elapsed = timeit(run, pool, task_duration)
    return elapsed


def bench_mp(tasks, workers, task_duration, warmup=True):
    """Time one ``pool.map`` of ``tasks`` sleep tasks on ``mp.Pool``.

    The pool is always created, so ``warmup=False`` no longer fails on a
    ``None`` pool.  Returns the elapsed seconds of the timed run.
    """
    logger.debug("benchmarking multiprocessing")
    pool = mp.Pool(workers)
    durations = [task_duration] * tasks
    if warmup:
        logger.debug("warming up")
        pool.map(sleep_worker, durations, chunksize=1)
        logger.debug("warm up finished")

    _, elapsed = timeit(pool.map, sleep_worker, durations, chunksize=1)
    return elapsed


def bench_mp_seq(tasks, workers, task_duration, warmup=True, pool=None):
    """Time ``tasks // workers`` rounds of ``workers`` async sleep tasks.

    Uses the given pool, or a new ``mp.Pool(workers)`` when none is passed
    (also when ``warmup`` is false).  Returns the elapsed seconds.
    """
    def run(pool, duration):
        for _ in range(tasks // workers):
            handles = [pool.apply_async(sleep_worker, (duration,))
                       for _ in range(workers)]
            for handle in handles:
                handle.get()

    if not pool:
        pool = mp.Pool(workers)
    if warmup:
        pool.map(sleep_worker, [task_duration] * tasks, chunksize=1)
        logger.debug("warm up finished")

    _, elapsed = timeit(run, pool, task_duration)
    return elapsed


def pyspark_parallel(sc, tasks, task_duration):
    """Run ``tasks`` sleep tasks as a single Spark job on context ``sc``."""
    nums = sc.parallelize([task_duration] * tasks)
    nums.map(sleep_worker).collect()


def pyspark_parallel_seq(sc, tasks, task_duration, workers):
    """Run ``tasks // workers`` Spark jobs of ``workers`` sleep tasks each."""
    for _ in range(tasks // workers):
        nums = sc.parallelize([task_duration] * workers)
        nums.map(sleep_worker).collect()


def bench_spark(tasks, workers, task_duration, warmup=True, sc=None):
    """Time ``pyspark_parallel``; returns the elapsed seconds.

    ``workers`` is accepted for a uniform signature but not used here.
    """
    if warmup:
        pyspark_parallel(sc, tasks, task_duration)

    _, elapsed = timeit(pyspark_parallel, sc, tasks, task_duration)
    return elapsed


def bench_spark_seq(tasks, workers, task_duration, warmup=True, sc=None):
    """Time ``pyspark_parallel_seq``; returns the elapsed seconds."""
    if warmup:
        pyspark_parallel(sc, tasks, task_duration)

    _, elapsed = timeit(pyspark_parallel_seq, sc, tasks,
                        task_duration, workers)
    return elapsed


def bench_ray(tasks, workers, task_duration, warmup=True):
    """Time ``ray.get`` over ``tasks`` remote sleep tasks.

    The remote calls are submitted while the argument list is built, i.e.
    just before the timer starts.  Returns the elapsed seconds.
    """
    import ray

    @ray.remote
    def ray_sleep(duration):
        time.sleep(duration)

    if warmup:
        ray.get([ray_sleep.remote(task_duration) for _ in range(tasks)])

    _, elapsed = timeit(
        ray.get, [ray_sleep.remote(task_duration) for _ in range(tasks)]
    )
    return elapsed


def bench_ray_seq(tasks, workers, task_duration, warmup=True):
    """Time ``tasks // workers`` rounds of ``workers`` remote sleep tasks."""
    import ray

    @ray.remote
    def ray_sleep(duration):
        time.sleep(duration)

    def ray_parallel_seq(tasks, workers):
        for _ in range(tasks // workers):
            ray.get([ray_sleep.remote(task_duration) for _ in range(workers)])

    if warmup:
        ray.get([ray_sleep.remote(task_duration) for _ in range(tasks)])

    _, elapsed = timeit(ray_parallel_seq, tasks, workers)
    return elapsed


def bench_ipp_seq(tasks, workers, task_duration, warmup=True):
    """Time ``tasks`` async sleep tasks on an ipyparallel load-balanced view.

    ``workers`` is accepted for a uniform signature but not used here.
    Returns the elapsed seconds.
    """
    from ipyparallel import Client

    rc = Client()
    dview = rc.load_balanced_view()

    if warmup:
        dview.map_sync(sleep_worker, [task_duration] * tasks)

    dview.block = True

    def run(tasks):
        objs = [dview.apply_async(sleep_worker, task_duration)
                for _ in range(tasks)]
        for task in objs:
            task.get()

    _, elapsed = timeit(run, tasks)
    return elapsed


def main():
    """Parse the command line and set up the selected frameworks.

    NOTE(review): the excerpt is cut off inside the benchmark loop, so the
    code of this function is left unchanged apart from layout and comments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'frameworks', nargs='+',
        choices=['mp', 'fiber', 'pyspark', 'ray', 'ipyparallel'],
        help='frameworks to benchmark'
    )
    parser.add_argument('-t', '--total-duration', type=int, default=1,
                        help='total running time')
    # NOTE(review): the help text says "ms", but the choices match the
    # 1 / 10**i values below and the message at the end says "seconds".
    parser.add_argument('-d', '--task-duration', type=float, default=None,
                        choices=[0.001, 0.01, 0.1, 1],
                        help='task duration in ms')

    args = parser.parse_args()

    workers = 5
    max_duration = args.total_duration

    results = {}
    frameworks = args.frameworks
    for framework in frameworks:
        results[framework] = []
        results[framework + "_seq"] = []

    if "pyspark" in frameworks:
        from pyspark import SparkContext
        import pyspark

        # The default context is stopped and replaced by one that carries
        # the core limit.
        sc = SparkContext()
        conf = pyspark.SparkConf().setAll([("spark.cores.max", 5)])
        sc.stop()
        sc = pyspark.SparkContext(conf=conf)

    if "ray" in frameworks:
        import ray
        ray.init()

    if "fiber" in frameworks:
        import fiber.pool
        fiber_pool = fiber.Pool(workers)

    if "ipyparallel" in frameworks:
        print("before popen")
        #ipp_controller = subprocess.Popen(["ipcontroller", "--ip", "*"])
        print("after popen")

        import atexit
        import signal
        import os
        #atexit.register(ipp_controller.kill)

        pids = spawn_workers(workers)
        # Make sure the forked engines are killed when this process exits.
        for pid in pids:
            atexit.register(os.kill, pid, signal.SIGKILL)
        time.sleep(4)

    for i in range(4):
        # Task durations 1, 0.1, 0.01 and 0.001.
        factor = 10 ** i
        duration = 1 / factor

        if args.task_duration is not None:
            print(args.task_duration, duration, type(args.task_duration), type(duration))
            if args.task_duration != duration:
                continue

        tasks = int(max_duration * workers / duration)

        # NOTE(review): the excerpt is cut off inside the following call; its
        # arguments and the remainder of main() are not visible.
        # print(
        #     "Benchmarking {} workers with {} tasks each takes {} "
        #     "seconds".format(...
workers_real_time_statistics.py
Source:workers_real_time_statistics.py
...166 :rtype: dict167 """168 return self._properties['activity_statistics']169 @property170 def total_workers(self):171 """172 :returns: The total_workers173 :rtype: unicode174 """175 return self._properties['total_workers']176 @property177 def workspace_sid(self):178 """179 :returns: The workspace_sid180 :rtype: unicode181 """182 return self._properties['workspace_sid']183 @property184 def url(self):...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!