Best Python code snippet using locust
test_cli.py
Source:test_cli.py
...35 del os.environ['PYTHONPATH']36 else:37 os.environ['PYTHONPATH'] = old_path38 sys.path = old_sys_path39def start_worker(name, args=None, exit_code=0, exception_str=''):40 args = check.opt_list_param(args, 'args')41 runner = CliRunner()42 result = runner.invoke(main, ['worker', 'start', '-d'] + args + ['--name', name])43 assert result.exit_code == exit_code, str(result.exception)44 if exception_str:45 assert exception_str in str(result.exception)46@contextmanager47def cleanup_worker(name, args=None):48 args = check.opt_list_param(args, 'args')49 try:50 yield51 finally:52 runner = CliRunner()53 result = runner.invoke(main, ['worker', 'terminate'] + args + [name])54 assert result.exit_code == 0, str(result.exception)55def check_for_worker(name, args=None, present=True):56 runner = CliRunner()57 args = check.opt_list_param(args, 'args')58 result = runner.invoke(main, ['worker', 'list'] + args)59 assert result.exit_code == 0, str(result.exception)60 retry_count = 061 while retry_count < 10 and (62 not '{name}@'.format(name=name) in result.output63 if present64 else '{name}@'.format(name=name) in result.output65 ):66 time.sleep(1)67 result = runner.invoke(main, ['worker', 'list'] + args)68 assert result.exit_code == 0, str(result.exception)69 retry_count += 170 return (71 '{name}@'.format(name=name) in result.output72 if present73 else '{name}@'.format(name=name) not in result.output74 )75def test_invoke_entrypoint():76 runner = CliRunner()77 result = runner.invoke(main)78 assert result.exit_code == 079 assert 'worker' in result.output80 runner = CliRunner()81 result = runner.invoke(main, ['worker'])82 assert result.exit_code == 083 assert 'dagster-celery start' in result.output84@skip_ci85def test_start_worker():86 with cleanup_worker('dagster_test_worker'):87 start_worker('dagster_test_worker')88 assert check_for_worker('dagster_test_worker')89@skip_ci90def test_start_worker_too_many_queues():91 args = ['-q', '1', '-q', '2', '-q', '3', '-q', '4', '-q', '5']92 with cleanup_worker('dagster_test_worker'):93 start_worker(94 'dagster_test_worker',95 args=args,96 exit_code=1,97 exception_str=(98 'Can\'t start a dagster_celery worker that listens on more than four queues, due to a '99 'bug in Celery 4.'100 ),101 )102@skip_ci103def test_start_worker_config_from_empty_yaml():104 args = ['-y', file_relative_path(__file__, 'empty.yaml')]105 with cleanup_worker('dagster_test_worker', args=args):106 start_worker('dagster_test_worker', args=args)107 assert check_for_worker('dagster_test_worker')108@skip_ci109def test_start_worker_config_from_yaml():110 args = ['-y', file_relative_path(__file__, 'engine_config.yaml')]111 with cleanup_worker('dagster_test_worker', args=args):112 start_worker('dagster_test_worker', args=args)113 assert check_for_worker('dagster_test_worker')114@mock.patch('dagster_celery.cli.launch_background_worker')115def test_mock_start_worker(worker_patch):116 start_worker('dagster_test_worker')117 assert_called(worker_patch)118@mock.patch('dagster_celery.cli.launch_background_worker')119def test_mock_start_worker_config_from_empty_yaml(worker_patch):120 args = ['-y', file_relative_path(__file__, 'empty.yaml')]121 start_worker('dagster_test_worker', args=args)122 assert_called(worker_patch)123@mock.patch('dagster_celery.cli.launch_background_worker')124def test_start_mock_worker_config_from_yaml(worker_patch):125 args = ['-y', file_relative_path(__file__, 'engine_config.yaml')]126 start_worker('dagster_test_worker', args=args)127 assert_called(worker_patch)128@mock.patch('dagster_celery.cli.launch_background_worker')129def test_mock_start_worker_too_many_queues(_worker_patch):130 args = ['-q', '1', '-q', '2', '-q', '3', '-q', '4', '-q', '5']131 start_worker(132 'dagster_test_worker',133 args=args,134 exit_code=1,135 exception_str=(136 'Can\'t start a dagster_celery worker that listens on more than four queues, due to a '137 'bug in Celery 4.'138 ),...
test_barrier.py
Source:test_barrier.py
...7async def test_barrier(zk, path):8 is_lifted = False9 loop = asyncio.get_running_loop()10 is_worker_started = loop.create_future()11 async def start_worker():12 barrier = zk.recipes.Barrier(path)13 is_worker_started.set_result('ok')14 await barrier.wait()15 assert is_lifted is True16 barrier = zk.recipes.Barrier(path)17 await barrier.create()18 worker = asyncio.create_task(start_worker())19 is_ok = await is_worker_started20 assert is_ok == 'ok'21 is_lifted = True22 await barrier.lift()23 await worker24@pytest.mark.asyncio25async def test_double_barrier(zk, path):26 num_workers = 027 workers = []28 async def start_worker(min_workers):29 barrier = zk.recipes.DoubleBarrier(path, min_workers)30 await barrier.enter()31 for i in range(5):32 assert num_workers >= min_workers33 await barrier.leave()34 target = 835 for _ in range(target):36 num_workers += 137 workers.append(asyncio.create_task(start_worker(target)))38 await asyncio.wait(workers)39 await zk.delete(path)40@pytest.mark.asyncio41async def test_many_waiters(zk, path):42 """Test for many waiters"""43 WORKER_NUM = 100044 worker_cnt = 045 pass_barrier = 046 cond = asyncio.Condition()47 async def start_worker():48 barrier = zk.recipes.Barrier(path)49 nonlocal worker_cnt50 worker_cnt += 151 async with cond:52 cond.notify()53 await barrier.wait()54 nonlocal pass_barrier55 pass_barrier += 156 worker_cnt -= 157 async with cond:58 cond.notify()59 barrier = zk.recipes.Barrier(path)60 await barrier.create()61 for _ in range(WORKER_NUM):62 asyncio.create_task(start_worker())63 async with cond:64 await cond.wait_for(lambda: worker_cnt == WORKER_NUM)65 await asyncio.sleep(1)66 # Make sure that all workers are blocked at .wait() coroutines. And no one67 # passed beyond the barrier until now.68 assert pass_barrier == 069 await barrier.lift()70 async def drain():71 async with cond:72 await cond.wait_for(lambda: worker_cnt == 0)73 await asyncio.wait_for(drain(), timeout=5)74 assert pass_barrier == WORKER_NUM75@pytest.mark.asyncio76async def test_wait_before_create(zk, path):77 """await barrier.wait() should finish immediately if the barrier does not78 exist. Because it is semantically right: No barrier, no blocking.79 """80 wait_finished = False81 async def start_worker():82 barrier = zk.recipes.Barrier(path)83 await barrier.wait()84 nonlocal wait_finished85 wait_finished = True86 task = asyncio.create_task(start_worker())87 try:88 await asyncio.wait_for(task, timeout=2)89 except asyncio.TimeoutError:90 pass91 assert wait_finished92@pytest.mark.asyncio93async def test_double_barrier_timeout(zk, path):94 entered = False95 MIN_WORKERS = 1096 barrier = zk.recipes.DoubleBarrier(path, MIN_WORKERS)97 with pytest.raises(exc.TimeoutError):98 await barrier.enter(timeout=0.5)99 entered = True100 assert not entered101 await zk.deleteall(path)102@pytest.mark.asyncio103async def test_double_barrier_enter_leakage(zk, path):104 enter_count = 0105 MIN_WORKERS = 32106 async def start_worker():107 nonlocal enter_count108 barrier = zk.recipes.DoubleBarrier(path, MIN_WORKERS)109 await barrier.enter(timeout=0.5)110 enter_count += 1111 with pytest.raises(exc.TimeoutError):112 await start_worker()113 assert enter_count == 0114 try:115 results = await asyncio.gather(116 *[start_worker() for _ in range(MIN_WORKERS - 1)],117 return_exceptions=True)118 assert all([isinstance(x, exc.TimeoutError) for x in results])119 assert enter_count == 0120 assert len(await zk.get_children(path)) == 0121 finally:122 await zk.deleteall(path)123@pytest.mark.asyncio124async def test_double_barrier_leave_timeout(zk, path):125 MIN_WORKERS = 11126 async def start_first_worker():127 barrier = zk.recipes.DoubleBarrier(path, MIN_WORKERS)128 await barrier.enter()129 # Some of workers won't leave in timeout so timeout exception should be130 # occured131 await barrier.leave(timeout=0.5)132 count = 0133 async def start_worker():134 barrier = zk.recipes.DoubleBarrier(path, MIN_WORKERS)135 await barrier.enter()136 nonlocal count137 count += 1138 await asyncio.sleep(1 - count * 0.1)139 await barrier.leave()140 tasks = []141 first_task = asyncio.create_task(start_first_worker())142 for _ in range(MIN_WORKERS - 1):143 tasks.append(asyncio.create_task(start_worker()))144 # Ensure the order of creation of sequential znodes145 await asyncio.sleep(0.1)146 try:147 with pytest.raises(exc.TimeoutError):148 await first_task149 finally:150 for task in tasks:151 if not task.done():152 task.cancel()...
worker.py
Source:worker.py
...6from rq import Worker, Queue, Connection7listen = ['high', 'default', 'low']8redis_url = os.getenv('REDISTOGO_URL', 'redis://redistogo:137dd33f39a1fd083ea34578f5984441@spinyfin.redistogo.com:9852/')9conn = redis.from_url(redis_url)10def start_worker():11 with Connection(conn):12 worker = Worker(map(Queue, listen))13 worker.work()14 return None15if __name__ == '__main__':16 with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:17 future_to_url = [18 executor.submit(start_worker), 19 executor.submit(start_worker), 20 executor.submit(start_worker), 21 executor.submit(start_worker), 22 executor.submit(start_worker), 23 executor.submit(start_worker), 24 executor.submit(start_worker), ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!