Best Python code snippet using locust
test_concurrency_lock.py
Source:test_concurrency_lock.py
...5from celery_worker.celery import app6from celery_worker.locks import args_to_uid, concurrency_lock7import unittest8@app.task9def dummy_task(self):10 pass11class ConcurrencyLockTestCase(unittest.TestCase):12 @patch("celery_worker.locks.get_mongodb_collection")13 def test_success(self, get_collection):14 get_collection.return_value.insert_one.return_value = None15 test_method_procedure = Mock()16 @concurrency_lock17 def test_method(*args, **kwargs):18 test_method_procedure(*args, **kwargs)19 with patch("celery_worker.locks.datetime") as datetime_mock:20 datetime_mock.utcnow.return_value = datetime.utcnow()21 test_method(dummy_task, 1, 2, message="Hi") # run22 task_uid = args_to_uid(23 (test_method.__module__, test_method.__name__, (1, 2), dict(message="Hi"))...
test_operators.py
Source:test_operators.py
1# from datetime import datetime, timedelta2# from airflow import DAG3# from airflow.operators.dummy_operator import DummyOperator4# from airflow.operators import MyFirstOperator, EmailOperator, BashOperator, ManagerOperator, PrinterOperator, DataRetriveOperator5# from airflow.operators.mysql_operator import MySqlOperator6# from airflow.hooks.hive_hooks import HiveServer2Hook7# from airflow.hooks.mysql_hook import MySqlHook8#9# # from airflow.operators import MySqlOperator10#11# import numpy as np12#13# default_args = {14# 'owner': 'Daniel',15# 'depends_on_past': False,16# 'start_date': datetime(2018, 3, 5),17# 'email_on_failure': True,18# 'email_on_retry': True,19# 'retries': 5,20# 'retry_delay': timedelta(minutes=1),21# }22#23# dag = DAG('Daniel_task2', default_args=default_args, description='Another tutorial DAG',24# schedule_interval='0 12 * * *', catchup=False)25#26# mysql_hook = MySqlHook(conn_id='dfdfd_msql')27#28# # ids = mysql_hook.get_pandas_df('SELECT ID FROM MWS_COLT_ITEM LIMIT 100')29#30# # t = MySqlOperator(task_id='mysql_task', sql='SELECT * FROM wspider_temp.MWS_COLT_ITEM_IVT LIMIT 110', dag=dag)31# # t.run(start_date=datetime(2018, 3, 5), end_date=datetime(2018, 3, 8), ignore_ti_state=True)32#33#34# # dummy_task = DummyOperator(task_id='dummy_task', dag=dag)35#36# # sensor_task = MyFirstSensor(task_id='my_sensor_task', poke_interval=30, dag=dag)37#38#39# manager_task = ManagerOperator(my_operator_param='test', db_hook=mysql_hook, task_id='manager_task', dag=dag)40#41#42#43# impala_task = DataRetriveOperator(my_operator_param='impala', db_hook=mysql_hook, task_id='my_impala_task', dag=dag)44#45#46# manager_task >> impala_task47#48# # manager_task >> impala_task49# # db_tasks = []50# # for i in range(10):51# # db_task = DataRetriveOperator(my_operator_param='impala', db_hook=mysql_hook, task_id='my_impala_task_%s' % i, dag=dag)52# # db_tasks.append(db_task)53# #54# # for db_task in db_tasks:55# # dummy_task >> db_task56#57# # mysql_task = MySqlOperator(task_id='mysql_task', sql='SELECT * FROM airflow.connection', dag=dag)58# # print(mysql_task)59#60# # mysql_task = MySqlOperator(task_id='mysql_task', sql='SELECT * FROM wspider_temp.ADDRESS', dag=dag)61#62# # printer_task = PrinterOperator(my_operator_param='printer', )63#64# # operator_task = MyFirstOperator(my_operator_param='This is a test', task_id='my_first_operator_task', dag=dag)65#66#67# # emailNotify_task = EmailOperator(68# # task_id='email_notification',69# # to = ['daniel.kim@epopcon.com'],70# # subject = 'Epopcon ETL Daily Report',71# # html_content = 'Epopcon ETL is successfully finished!', dag=dag)72#73# # dummy_task >> sensor_task >> manager_task74#75# # arr_lst = np.array(range(100))76# #77# # jobs = np.array_split(arr_lst, 20)78# #79# # print_tasks = []80# #81# # for idx, job in enumerate(jobs):82# # task = PrinterOperator(my_operator_param='printer', task_id='printer_%s' % idx, assigned_lst=job, dag=dag)83# # print_tasks.append(task)84# #85#86# # manager_task >> sensor_task >> emailNotify_task87#88# # manager_task >> mysql_task89# # dummy_task >> mysql_task90#91# # for print_task in print_tasks:...
test_searchalgorithms_hillclimbing.py
Source:test_searchalgorithms_hillclimbing.py
1"""2Unit test for searchalgorithms_hillclimbing.py3"""4from search import enforced_hillclimbing_search5from . import dummy_task6from task import *7def test_enforced_hillclimbing_search_at_goal():8 """9 plan of length 0, start is equal to goal10 """11 task = dummy_task.get_search_space_at_goal()12 heuristic = dummy_task.DummyHeuristic(task)13 solution = enforced_hillclimbing_search(task, heuristic)14 print(solution)15 assert solution is not None16 assert len(solution) == 017def test_enforced_hillclimbing_search_no_solution():18 """19 goal is not reachable20 """21 task = dummy_task.get_search_space_no_solution()22 heuristic = dummy_task.DummyHeuristic(task)23 solution = enforced_hillclimbing_search(task, heuristic)24 print(solution)25 assert solution is None26def test_enforced_hillclimbing_search_three_step():27 """28 plan with length 329 """30 task = dummy_task.get_simple_search_space()31 heuristic = dummy_task.DummyHeuristic(task)32 solution = enforced_hillclimbing_search(task, heuristic)33 print(solution)34 assert solution is not None35 assert len(solution) == 336def test_enforced_hillclimbing_search_four_step():37 """38 plan with length 439 """40 task = dummy_task.get_simple_search_space_2()41 heuristic = dummy_task.DummyHeuristic(task)42 solution = enforced_hillclimbing_search(task, heuristic)43 print(solution)44 assert solution is not None...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!