Best Python code snippet using fMBT_python
server.py
Source:server.py
from multiprocessing.reduction import reduce_socket
import multiprocessing
import select
import socket
import sys

try:
    import argparse
except ImportError:
    # argparse is only available in Python 2.7+
    sys.stderr.write('pip install -U argparse\n')
    sys.exit(1)


def handle_conn(conn, addr):
    '''Echo up to 32 bytes back to the peer, then close the connection.

    `addr` is accepted so the signature matches what accept() returns; it is
    not used.
    '''
    try:
        data = conn.recv(32)
        conn.sendall(data)
    finally:
        # Release the descriptor even when recv()/sendall() raise.
        conn.close()


def queued_handle_conn(queue):
    '''Worker loop: rebuild each queued connection and serve it.

    Every queue item is a `(rebuild_func, hints, addr)` tuple as produced by
    `_dispatch()`; `rebuild_func(*hints)` recreates the socket in this
    process.  The loop never returns; the parent terminates the process.
    '''
    while True:
        rebuild_func, hints, addr = queue.get()
        conn = rebuild_func(*hints)
        handle_conn(conn, addr)


def _start_worker():
    '''Spawn the queue-fed worker process and return `(queue, process)`.'''
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=queued_handle_conn,
                                     args=(queue,))
    worker.start()
    return queue, worker


def _stop_worker(worker):
    '''Terminate `worker` if one was started and is still running.'''
    if worker is not None and worker.is_alive():
        worker.terminate()


def _accept_pending(socket_):
    '''Drain a non-blocking listening socket.

    Returns the list of newly accepted connections, each one already switched
    to non-blocking mode.
    '''
    accepted = []
    while True:
        try:
            conn, _ = socket_.accept()
        except socket.error:
            # Nothing left to accept right now.  Only socket errors end the
            # drain, so KeyboardInterrupt still reaches the caller.
            break
        conn.setblocking(0)
        accepted.append(conn)
    return accepted


def _dispatch(conn, addr, queue):
    '''Serve `conn` inline, or hand it to the worker when `queue` is given.'''
    if queue is None:
        handle_conn(conn, addr)
        return
    # Behind-the-scene: 'conn' is serialized and sent to
    # worker process via socket (IPC).
    rebuild_func, hints = reduce_socket(conn)
    queue.put((rebuild_func, hints, addr))


def _report_max_peers(max_peers):
    '''Print the connection high-water mark seen by a server loop.'''
    print('Max. number of connections: %s' % max_peers)


def basic_server(socket_):
    '''Blocking accept() loop that starts one process per connection.'''
    children = []
    try:
        while True:
            conn, addr = socket_.accept()
            p = multiprocessing.Process(target=handle_conn, args=(conn, addr))
            p.start()
            # The child process serves the connection; closing the parent's
            # copy lets the connection really close once the child is done.
            conn.close()
            # Forget children that already exited so the list stays bounded.
            children = [c for c in children if c.is_alive()]
            children.append(p)
    finally:
        for p in children:
            if p.is_alive():
                p.terminate()


def select_server(socket_, timeout=1, use_worker=False):
    '''Single process select() with non-blocking accept() and recv().

    `timeout` is the select() timeout in seconds.  With `use_worker` set,
    readable connections are passed to a worker process instead of being
    served inline.
    '''
    peers = []
    max_peers = 0
    queue = worker = None
    try:
        if use_worker:
            queue, worker = _start_worker()
        while True:
            max_peers = max(max_peers, len(peers))
            readable, _, _ = select.select(peers + [socket_], [], [], timeout)
            for s in readable:
                if s is socket_:
                    peers.extend(_accept_pending(socket_))
                else:
                    peers.remove(s)
                    _dispatch(s, s.getpeername(), queue)
    finally:
        _stop_worker(worker)
        _report_max_peers(max_peers)


def poll_server(socket_, timeout=1, use_worker=False):
    '''Single process poll() with non-blocking accept() and recv().

    `timeout` is in seconds, like select_server() and epoll_server(); it is
    converted to the milliseconds that poll.poll() expects.
    '''
    peers = {}  # {fileno: socket}
    flag = select.POLLIN | select.POLLERR | select.POLLHUP
    timeout_ms = None if timeout is None else timeout * 1000
    max_peers = 0
    queue = worker = None
    try:
        if use_worker:
            queue, worker = _start_worker()
        poll = select.poll()
        poll.register(socket_, select.POLLIN)
        listen_fd = socket_.fileno()
        while True:
            max_peers = max(max_peers, len(peers))
            for fd, event in poll.poll(timeout_ms):
                if fd == listen_fd:
                    for conn in _accept_pending(socket_):
                        peers[conn.fileno()] = conn
                        poll.register(conn, flag)
                elif event & select.POLLIN:
                    poll.unregister(fd)
                    # pop() so served connections stop counting as peers.
                    conn = peers.pop(fd)
                    _dispatch(conn, conn.getpeername(), queue)
                elif event & (select.POLLERR | select.POLLHUP):
                    poll.unregister(fd)
                    peers.pop(fd).close()
    finally:
        _stop_worker(worker)
        _report_max_peers(max_peers)


def epoll_server(socket_, timeout=1, use_worker=False):
    '''Single process epoll() with non-blocking accept() and recv().

    `timeout` is the epoll.poll() timeout in seconds.  Sockets are registered
    edge-triggered (EPOLLET).
    '''
    peers = {}  # {fileno: socket}
    flag = (select.EPOLLIN |
            select.EPOLLET |
            select.EPOLLERR |
            select.EPOLLHUP)
    max_peers = 0
    queue = worker = None
    epoll = None
    try:
        if use_worker:
            queue, worker = _start_worker()
        epoll = select.epoll()
        epoll.register(socket_, select.EPOLLIN | select.EPOLLET)
        listen_fd = socket_.fileno()
        while True:
            max_peers = max(max_peers, len(peers))
            for fd, event in epoll.poll(timeout=timeout):
                if fd == listen_fd:
                    for conn in _accept_pending(socket_):
                        peers[conn.fileno()] = conn
                        epoll.register(conn, flag)
                elif event & select.EPOLLIN:
                    epoll.unregister(fd)
                    # pop() so served connections stop counting as peers.
                    conn = peers.pop(fd)
                    _dispatch(conn, conn.getpeername(), queue)
                elif event & (select.EPOLLERR | select.EPOLLHUP):
                    epoll.unregister(fd)
                    peers.pop(fd).close()
    finally:
        _stop_worker(worker)
        # epoll stays None when setup failed before it was created.
        if epoll is not None:
            epoll.close()
        _report_max_peers(max_peers)


def main():
    '''Parse the command line and run the chosen server on 127.0.0.1:8000.'''
    HOST, PORT = '127.0.0.1', 8000
    MODES = ('basic', 'select', 'poll', 'epoll')
    argparser = argparse.ArgumentParser()
    argparser.add_argument('mode', help=('Operating mode of the server: %s'
                                         % ', '.join(MODES)))
    argparser.add_argument('--backlog', type=int, default=0,
                           help='socket.listen() backlog')
    argparser.add_argument('--timeout', type=int, default=1000,
                           help='select/poll/epoll timeout in ms')
    argparser.add_argument('--worker', action='store_true',
                           help=('Spawn a worker to process request in '
                                 'select/poll/epoll mode. '
                                 'NOTE: The sole purpose of this option is '
                                 'experiment, it does not really help shorten '
                                 'the response time.'))
    args = argparser.parse_args()
    if args.mode not in MODES:
        msg = 'Available operating modes: %s' % ', '.join(MODES)
        sys.stderr.write(msg + '\n')
        sys.exit(1)
    # Float division: with integer division (Python 2) any timeout below
    # 1000 ms would silently become 0 and turn the loop into a busy-wait.
    timeout = args.timeout / 1000.0
    socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        socket_.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        socket_.bind((HOST, PORT))
        if args.mode in ('select', 'poll', 'epoll'):
            socket_.setblocking(0)
        socket_.listen(args.backlog)
        if args.mode == 'basic':
            basic_server(socket_)
        elif args.mode == 'select':
            select_server(socket_, timeout, use_worker=args.worker)
        elif args.mode == 'poll':
            poll_server(socket_, timeout, use_worker=args.worker)
        elif args.mode == 'epoll':
            epoll_server(socket_, timeout, use_worker=args.worker)
    except KeyboardInterrupt:
        pass
    finally:
        socket_.close()


if __name__ == '__main__':
    # NOTE(review): the source listing is cut off right after this guard
    # ("..."); calling main() is the presumed body -- confirm against the
    # full file.
    main()
test_workers.py
Source:test_workers.py
"""
this is a test module testing the workers api of Chariots
"""
import time

import pytest
from flaky import flaky
from redis import Redis

from chariots.pipelines import PipelinesServer, Pipeline
from chariots.pipelines.nodes import Node
from chariots.testing import TestPipelinesClient
from chariots.workers import JobStatus, RQWorkerPool
from chariots.errors import VersionError
from chariots._helpers.test_helpers import IsPair, RQWorkerContext, build_keras_pipeline, \
    do_keras_pipeline_predictions_test


def _is_pair_pipeline(**pipeline_kwargs):
    """builds the single-node `inner_pipe` pipeline (one IsPair op) shared by the simple async tests"""
    nodes = [
        Node(IsPair(), input_nodes=['__pipeline_input__'], output_nodes='__pipeline_output__'),
    ]
    return Pipeline(nodes, name='inner_pipe', **pipeline_kwargs)


def do_async_pipeline_test(test_client, pipe, use_worker=None):
    """
    helper: calls `pipe` on the integers 0..19, expects the job to be queued, waits five seconds and then
    expects the fetched job to be done with one boolean per input
    """
    numbers = list(range(20))
    queued = test_client.call_pipeline(pipe, pipeline_input=numbers, use_worker=use_worker)
    assert queued.job_status == JobStatus.queued

    time.sleep(5)
    fetched = test_client.fetch_job(queued.job_id, pipe)
    assert fetched.job_status == JobStatus.done
    assert fetched.value == [not i % 2 for i in numbers]


def test_app_async(tmpdir, opstore_func):
    """async execution when `use_workers` is set to true on the app"""
    with RQWorkerContext():
        inner_pipe = _is_pair_pipeline()
        server = PipelinesServer([inner_pipe], op_store_client=opstore_func(tmpdir), import_name='some_app',
                                 worker_pool=RQWorkerPool(redis=Redis()),
                                 use_workers=True)
        do_async_pipeline_test(TestPipelinesClient(server), inner_pipe)


def test_app_async_pipeline(tmpdir, opstore_func):
    """async execution when `use_worker` is set to true on the pipeline"""
    with RQWorkerContext():
        inner_pipe = _is_pair_pipeline(use_worker=True)
        server = PipelinesServer([inner_pipe], op_store_client=opstore_func(tmpdir), import_name='some_app',
                                 worker_pool=RQWorkerPool(redis=Redis()))
        do_async_pipeline_test(TestPipelinesClient(server), inner_pipe)


def test_app_async_request(tmpdir, opstore_func):
    """async execution when `use_worker` is set to true on the client call"""
    with RQWorkerContext():
        inner_pipe = _is_pair_pipeline()
        server = PipelinesServer([inner_pipe], op_store_client=opstore_func(tmpdir), import_name='some_app',
                                 worker_pool=RQWorkerPool(redis=Redis()))
        do_async_pipeline_test(TestPipelinesClient(server), inner_pipe, use_worker=True)


def test_app_async_conflicting_config(tmpdir, opstore_func):
    """
    behavior when the `use_workers` settings conflict (at least one True and one False): the call is expected
    to come back already done
    """
    with RQWorkerContext():
        inner_pipe = _is_pair_pipeline(use_worker=True)
        server = PipelinesServer([inner_pipe], op_store_client=opstore_func(tmpdir), import_name='some_app',
                                 worker_pool=RQWorkerPool(redis=Redis()), use_workers=False)
        client = TestPipelinesClient(server)

        numbers = list(range(20))
        answer = client.call_pipeline(inner_pipe, pipeline_input=numbers, use_worker=True)
        assert answer.job_status == JobStatus.done
        assert answer.value == [not i % 2 for i in numbers]


# this needs to be flaky because it might take a little bit longer
@flaky(3, 1)
def test_complex_sk_training_pipeline_async(complex_sk_pipelines, tmpdir, opstore_func):
    """async execution of a more complex sklearn based set of pipelines"""
    with RQWorkerContext():
        train_transform, train_pipe, pred_pipe = complex_sk_pipelines
        train_transform.use_worker = True
        train_pipe.use_worker = True
        server = PipelinesServer(app_pipelines=[train_transform, train_pipe, pred_pipe],
                                 op_store_client=opstore_func(tmpdir), import_name='my_app',
                                 worker_pool=RQWorkerPool(redis=Redis()))
        client = TestPipelinesClient(server)

        # the two training pipelines run on the worker: call, wait, then fetch the finished job
        job = client.call_pipeline(train_transform)
        time.sleep(5.)
        job = client.fetch_job(job.job_id, train_transform)
        assert job.job_status == JobStatus.done

        job = client.call_pipeline(train_pipe)
        time.sleep(5.)
        job = client.fetch_job(job.job_id, train_pipe)
        assert job.job_status == JobStatus.done

        # the prediction pipeline has no `use_worker` flag set here and is expected to be done right away
        client.load_pipeline(pred_pipe)
        rows = [
            [100, 101, 102],
            [101, 102, 103],
            [102, 103, 104],
        ]
        prediction = client.call_pipeline(pred_pipe, pipeline_input=rows)
        assert prediction.job_status == JobStatus.done
        assert len(prediction.value) == 3
        for offset, predicted in enumerate(prediction.value):
            assert abs(101 + offset - predicted) < 1e-5

        # after train_transform is re-run and saved, loading pred_pipe is expected to raise VersionError
        job = client.call_pipeline(train_transform)
        client.save_pipeline(train_transform)
        time.sleep(5.)
        job = client.fetch_job(job.job_id, train_transform)
        assert job.job_status == JobStatus.done
        with pytest.raises(VersionError):
            client.load_pipeline(pred_pipe)


@flaky(5, 1)
def test_train_keras_pipeline_async(tmpdir, opstore_func):
    """workers used with a keras training pipeline"""
    with RQWorkerContext():
        train_pipeline, pred_pipeline = build_keras_pipeline(train_async=True)
        server = PipelinesServer(app_pipelines=[train_pipeline, pred_pipeline], op_store_client=opstore_func(tmpdir),
                                 import_name='my_app', worker_pool=RQWorkerPool(redis=Redis()))
        client = TestPipelinesClient(server)

        # the training pipeline is called twice; only the second job is followed
        client.call_pipeline(train_pipeline)
        job = client.call_pipeline(train_pipeline)
        time.sleep(10)
        job = client.fetch_job(job.job_id, train_pipeline)
        assert job.job_status == JobStatus.done

        client.load_pipeline(pred_pipeline)
        # NOTE(review): the source listing is truncated here ("..."); any
        # remaining statements of this test are not visible.
test_subpool.py
Source:test_subpool.py
1from __future__ import print_function2from future import standard_library3standard_library.install_aliases()4from builtins import range5from bson import ObjectId6import urllib.request, urllib.error, urllib.parse7import json8import time9import os10import pytest11@pytest.mark.parametrize(["use_worker"], [[False], [True]])12def test_subpool_simple(worker, use_worker):13 # Check that a subpool can be used both in an outside of a Job context14 if use_worker:15 worker.start()16 else:17 from tests.tasks.general import SubPool18 def run(params):19 if use_worker:20 return worker.send_task("tests.tasks.general.SubPool", params)21 else:22 return SubPool().run(params)23 # Check that sequential sleeps work24 start_time = time.time()25 result = run({26 "pool_size": 1, "inner_params": [1, 1]27 })28 total_time = time.time() - start_time29 assert result == [1, 1]30 assert total_time > 231 # py.test doesn't use gevent so we don't get the benefits of the hub32 if use_worker:33 # Parallel sleeps34 start_time = time.time()35 result = run({36 "pool_size": 20, "inner_params": [1] * 2037 })38 total_time = time.time() - start_time39 assert result == [1] * 2040 assert total_time < 241@pytest.mark.parametrize(["p_imap"], [42 [True],43 [False]44])45def test_subpool_exception(worker, p_imap):46 # An exception in the subpool is raised outside the pool47 worker.send_task("tests.tasks.general.SubPool", {48 "pool_size": 20, "inner_params": ["exception"], "imap": p_imap49 }, accept_statuses=["failed"])50 job = worker.mongodb_jobs.mrq_jobs.find_one()51 assert job52 assert job["status"] == "failed"53 assert "__INNER_EXCEPTION_LINE__" in job["traceback"]54@pytest.mark.parametrize(["p_size"], [55 [0],56 [1],57 [2],58 [100]59])60def test_subpool_import(worker, p_size):61 """ This tests that the patch_import() function does its job of preventing a gevent crash62 like explained in https://code.google.com/p/gevent/issues/detail?id=108 """63 # Large file import64 
worker.send_task("tests.tasks.general.SubPool", {65 "pool_size": p_size, "inner_params": ["import-large-file"] * p_size66 }, accept_statuses=["success"])67def test_subpool_imap():68 from mrq.context import subpool_imap69 def iterator(n):70 for i in range(0, n):71 if i == 5:72 raise Exception("Iterator exception!")73 yield i74 def inner_func(i):75 time.sleep(1)76 print("inner_func: %s" % i)77 if i == 4:78 raise Exception("Inner exception!")79 return i * 280 with pytest.raises(Exception):81 for res in subpool_imap(10, inner_func, iterator(10)):82 print("Got %s" % res)83 for res in subpool_imap(2, inner_func, iterator(1)):84 print("Got %s" % res)85 with pytest.raises(Exception):86 for res in subpool_imap(2, inner_func, iterator(5)):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!