Best Python code snippet using autotest_python
test_task.py
Source: test_task.py
import time
import uuid
from typing import Optional  # noqa: F401
from uuid import UUID, uuid1  # noqa: F401

import asynctest
import jsonpickle
import pytest
from asynctest import CoroutineMock, MagicMock

import dagger
import dagger.service.services
from dagger.tasks.task import (
    COMPLETE_BY_KEY,
    CorrelatableMapValue,
    DecisionTask,
    DefaultMonitoringTask,
    DefaultProcessTemplateDAGInstance,
    DefaultTemplateDAGInstance,
    IntervalTask,
    ITask,
    ITemplateDAGInstance,
    KafkaAgent,
    KafkaCommandTask,
    KafkaListenerTask,
    MonitoredProcessTemplateDAGInstance,
    ParallelCompositeTask,
    SensorTask,
    SkipOnMaxDurationTask,
    SystemTimerTask,
    TaskOperator,
    TaskStatus,
    TaskStatusEnum,
    Trigger,
    TriggerTask,
)

# Not referenced by any code visible in this file; kept in case another module
# imports it.
test = 2 * 1024**3


class TestTasks:
    """Tests for the task types exported by ``dagger.tasks.task``.

    Most tests replace attributes of ``dagger.service.services.Dagger.app``
    with mocks. That object is shared module state, so several tests rely on
    mocks installed by a fixture or by an earlier assignment.
    """

    @pytest.fixture()
    def template_fixture(self):
        """Return a fresh ``DefaultTemplateDAGInstance``."""
        return DefaultTemplateDAGInstance(uuid1())

    @pytest.fixture()
    def executor_fixture(self):
        """Return a fresh ``KafkaCommandTask``."""
        return KafkaCommandTask(uuid1())

    @pytest.fixture()
    def decision_fixture(self):
        """Return a fresh ``DecisionTask``."""
        return DecisionTask(uuid1())

    @pytest.fixture()
    def sensor_fixture(self):
        """Return a fresh ``KafkaListenerTask``."""
        return KafkaListenerTask(uuid1())

    @pytest.fixture()
    def trigger_fixture(self):
        """Return a fresh ``TriggerTask``."""
        return TriggerTask(uuid1())

    @pytest.fixture()
    def interval_fixture(self):
        """Return a fresh ``IntervalTask``."""
        return IntervalTask(uuid1())

    @pytest.fixture()
    def system_timer_fixture(self):
        """Return a fresh ``SystemTimerTask``."""
        return SystemTimerTask(uuid1())

    @pytest.fixture()
    def parallel_composite_task_fixture(self):
        """Install a ``MagicMock`` as ``Dagger.app`` and return a ``ParallelCompositeTask``."""
        dagger.service.services.Dagger.app = MagicMock()
        return ParallelCompositeTask(uuid.uuid1())

    @pytest.fixture()
    def workflow_instance_fixture(self):
        """Return a fresh ``DefaultTemplateDAGInstance`` used as the workflow."""
        return DefaultTemplateDAGInstance(uuid.uuid1())

    @pytest.mark.asyncio
    async def test_parallel_composite_task_stop(self, parallel_composite_task_fixture):
        """``stop()`` must complete without raising."""
        # Any exception propagates and fails the test with its own traceback.
        await parallel_composite_task_fixture.stop()

    @pytest.mark.asyncio
    async def test_parallel_composite_task_execute(
        self, parallel_composite_task_fixture
    ):
        """``execute()`` must leave the task in the EXECUTING status."""
        await parallel_composite_task_fixture.execute(runtime_parameters={})
        assert (
            parallel_composite_task_fixture.status.code
            == TaskStatusEnum.EXECUTING.name
        )

    @pytest.mark.asyncio
    async def test_parallel_composite_task_start_terminal(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """Starting a COMPLETED task calls ``on_complete`` and skips ``execute``."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
        )
        parallel_composite_task_fixture.execute = CoroutineMock()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        await parallel_composite_task_fixture.start(
            workflow_instance=workflow_instance_fixture
        )
        assert not parallel_composite_task_fixture.execute.called
        assert parallel_composite_task_fixture.on_complete.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_start_non_terminal(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """Starting a SUBMITTED task executes it, persists it and starts both children."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.SUBMITTED.name, value=TaskStatusEnum.SUBMITTED.value
        )
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task2 = MagicMock()
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            side_effect=lambda x, log: child_task1 if x == id1 else child_task2
        )
        parallel_composite_task_fixture.execute = CoroutineMock()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        await parallel_composite_task_fixture.start(
            workflow_instance=workflow_instance_fixture
        )
        assert parallel_composite_task_fixture.execute.called
        assert dagger.service.services.Dagger.app._update_instance.called
        assert not parallel_composite_task_fixture.on_complete.called
        assert child_task1.start.called
        assert child_task2.start.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_start_non_terminal_executing(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """Starting an EXECUTING task starts the children without re-executing or persisting."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
        )
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task2 = MagicMock()
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        parallel_composite_task_fixture.execute = CoroutineMock()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        await parallel_composite_task_fixture.start(
            workflow_instance=workflow_instance_fixture
        )
        assert not parallel_composite_task_fixture.execute.called
        assert not parallel_composite_task_fixture.on_complete.called
        assert child_task1.start.called
        assert child_task2.start.called
        assert not dagger.service.services.Dagger.app._update_instance.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_notify_same_status(
        self, parallel_composite_task_fixture
    ):
        """Notifying with the task's own status must not look up any instance."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
        )
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        dagger.service.services.Dagger.app.get_instance = CoroutineMock()
        await parallel_composite_task_fixture.notify(
            parallel_composite_task_fixture.status
        )
        assert not dagger.service.services.Dagger.app.get_instance.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_notify_join_all_both_complete(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """JOIN_ALL with both children COMPLETED calls ``on_complete``."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
        )
        parallel_composite_task_fixture.operator_type = TaskOperator.JOIN_ALL.name
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task1.status.code = TaskStatusEnum.COMPLETED.name
        child_task2 = MagicMock()
        child_task2.status.code = TaskStatusEnum.COMPLETED.name
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        await parallel_composite_task_fixture.notify(
            TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert parallel_composite_task_fixture.on_complete.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_notify_join_not_both_complete(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """JOIN_ALL with one child still EXECUTING must not call ``on_complete``."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
        )
        parallel_composite_task_fixture.operator_type = TaskOperator.JOIN_ALL.name
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task1.status.code = TaskStatusEnum.COMPLETED.name
        child_task2 = MagicMock()
        child_task2.status.code = TaskStatusEnum.EXECUTING.name
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        await parallel_composite_task_fixture.notify(
            TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert not parallel_composite_task_fixture.on_complete.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_notify_atleast_one_none_complete(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """ATLEAST_ONE with no COMPLETED child must not call ``on_complete``."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
        )
        parallel_composite_task_fixture.operator_type = TaskOperator.ATLEAST_ONE.name
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task1.status.code = TaskStatusEnum.EXECUTING.name
        child_task2 = MagicMock()
        child_task2.status.code = TaskStatusEnum.EXECUTING.name
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        await parallel_composite_task_fixture.notify(
            TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert not parallel_composite_task_fixture.on_complete.called

    @pytest.mark.asyncio
    async def test_parallel_composite_task_notify_atleast_one_one_complete(
        self, parallel_composite_task_fixture, workflow_instance_fixture
    ):
        """ATLEAST_ONE with one COMPLETED child calls ``on_complete``."""
        parallel_composite_task_fixture.status = TaskStatus(
            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
        )
        parallel_composite_task_fixture.operator_type = TaskOperator.ATLEAST_ONE.name
        child_task1 = MagicMock()
        child_task1.start = CoroutineMock()
        child_task1.status.code = TaskStatusEnum.EXECUTING.name
        child_task2 = MagicMock()
        child_task2.status.code = TaskStatusEnum.COMPLETED.name
        child_task2.start = CoroutineMock()
        id1 = uuid1()
        id2 = uuid1()
        parallel_composite_task_fixture.on_complete = CoroutineMock()
        parallel_composite_task_fixture.parallel_child_task_list = [id1, id2]
        workflow_instance_fixture.tasks[id1] = child_task1
        workflow_instance_fixture.tasks[id2] = child_task2
        await parallel_composite_task_fixture.notify(
            TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert parallel_composite_task_fixture.on_complete.called

    @pytest.mark.asyncio
    async def test_get_remaining_tasks(
        self, template_fixture, executor_fixture, sensor_fixture, decision_fixture
    ):
        """``get_remaining_tasks`` returns tasks in order and stops at ``end_task_id``.

        The workflow is two processes in sequence: the first holds
        executor -> sensor, the second holds the decision task.
        """
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            side_effect=lambda m: m
        )
        first_process = DefaultProcessTemplateDAGInstance(uuid1())
        second_process = DefaultProcessTemplateDAGInstance(uuid1())
        template_fixture.root_dag = first_process.id
        template_fixture.add_task(first_process)
        first_process.root_dag = executor_fixture.id
        template_fixture.add_task(executor_fixture)
        second_process.root_dag = decision_fixture.id
        template_fixture.add_task(decision_fixture)
        template_fixture.add_task(sensor_fixture)
        template_fixture.add_task(second_process)
        first_process.next_dags = [second_process.id]
        second_process.next_dags = []
        executor_fixture.next_dags = [sensor_fixture.id]
        executor_fixture.root_dag = None
        sensor_fixture.next_dags = []
        sensor_fixture.root_dag = None
        decision_fixture.next_dags = []
        decision_fixture.root_dag = None

        # No end_task_id: every task is returned.
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag, workflow_instance=template_fixture, tasks=[]
        )
        assert remaining_tasks == [
            executor_fixture,
            sensor_fixture,
            first_process,
            decision_fixture,
            second_process,
        ]

        # Each step below overrides one task's get_id() with a small integer
        # and passes that integer as end_task_id.
        sensor_fixture.get_id = MagicMock(return_value=1)
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag,
            workflow_instance=template_fixture,
            tasks=[],
            end_task_id=1,
        )
        assert remaining_tasks == [executor_fixture, sensor_fixture]

        executor_fixture.get_id = MagicMock(return_value=2)
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag,
            workflow_instance=template_fixture,
            tasks=[],
            end_task_id=2,
        )
        assert remaining_tasks == [executor_fixture]

        first_process.get_id = MagicMock(return_value=3)
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag,
            workflow_instance=template_fixture,
            tasks=[],
            end_task_id=3,
        )
        assert remaining_tasks == [executor_fixture, sensor_fixture, first_process]

        decision_fixture.get_id = MagicMock(return_value=4)
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag,
            workflow_instance=template_fixture,
            tasks=[],
            end_task_id=4,
        )
        assert remaining_tasks == [
            executor_fixture,
            sensor_fixture,
            first_process,
            decision_fixture,
        ]

        second_process.get_id = MagicMock(return_value=5)
        remaining_tasks = await template_fixture.get_remaining_tasks(
            template_fixture.root_dag,
            workflow_instance=template_fixture,
            tasks=[],
            end_task_id=5,
        )
        assert remaining_tasks == [
            executor_fixture,
            sensor_fixture,
            first_process,
            decision_fixture,
            second_process,
        ]

    @pytest.mark.asyncio
    async def test_merge_runtime_parameters_template_instance(self, template_fixture):
        """``_update_global_runtime_parameters`` refreshes a sensor's correlatable value.

        The map entry for the remaining sensor starts as ("k1", "0") and must
        become ("k1", "10"), the workflow's runtime value for "k1".
        """
        dagger.service.services.Dagger.app = CoroutineMock()
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        template_fixture.runtime_parameters = {"k1": "10", "k2": "20"}
        remaining_task = SensorTask(uuid1())
        template_fixture.add_task(remaining_task)
        template_fixture.sensor_tasks_to_correletable_map[
            remaining_task.id
        ] = CorrelatableMapValue("k1", "0")
        remaining_task.status.code = TaskStatusEnum.NOT_STARTED.name
        remaining_task._update_correletable_key = CoroutineMock()
        template_fixture.get_remaining_tasks = CoroutineMock(
            return_value=[remaining_task]
        )
        await template_fixture._update_global_runtime_parameters()
        assert template_fixture.runtime_parameters == {"k1": "10", "k2": "20"}
        assert remaining_task._update_correletable_key.called
        assert template_fixture.sensor_tasks_to_correletable_map[
            remaining_task.id
        ] == CorrelatableMapValue("k1", "10")

    @pytest.mark.asyncio
    async def test_executortask(self, executor_fixture, workflow_instance_fixture):
        """Starting an executor task completes it, stamps the time and notifies the parent."""
        assert executor_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app = CoroutineMock()
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        # NOTE(review): this replaces jsonpickle.encode for the rest of the
        # process; nothing in this test restores it.
        jsonpickle.encode = MagicMock()
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            return_value=None
        )
        assert executor_fixture.time_completed == 0
        executor_fixture.next_dags = []
        parent_task = DefaultProcessTemplateDAGInstance(uuid1())
        workflow_instance_fixture.add_task(parent_task)
        executor_fixture.parent_id = parent_task.id
        # NOTE(review): this binds the CoroutineMock *class*, not an instance,
        # so the next line sets `notify` on the class itself. It looks like
        # missing parentheses - confirm before changing.
        mock_parent_notify_func = CoroutineMock
        mock_parent_notify_func.notify = CoroutineMock()
        mock_parent_node_func = executor_fixture.get_parent_node = CoroutineMock()
        mock_parent_node_func.side_effect = mock_parent_notify_func
        parent_task.notify = CoroutineMock()
        assert executor_fixture.get_id() == executor_fixture.id
        await executor_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert executor_fixture.status.code == TaskStatusEnum.COMPLETED.name
        assert executor_fixture.time_completed != 0
        assert parent_task.notify.called
        with pytest.raises(NotImplementedError):
            await executor_fixture.evaluate()
        with pytest.raises(NotImplementedError):
            await executor_fixture.on_message(
                workflow_instance_fixture.runtime_parameters, {}
            )

    @pytest.mark.asyncio
    async def test_decisiontask(self, decision_fixture, workflow_instance_fixture):
        """Starting a decision task persists it and calls ``on_complete``."""
        assert decision_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        mock_status = MagicMock()
        mock_status.status.code = TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            return_value=mock_status
        )
        decision_fixture.next_dags = []
        decision_fixture.on_complete = CoroutineMock()
        workflow_instance_fixture.runtime_parameters = {}
        assert decision_fixture.get_id() == decision_fixture.id
        await decision_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert decision_fixture.on_complete.called
        with pytest.raises(NotImplementedError):
            await decision_fixture.execute(
                runtime_parameters={}, workflow_instance=workflow_instance_fixture
            )
        with pytest.raises(NotImplementedError):
            await decision_fixture.on_message(workflow_instance_fixture, {})

    @pytest.mark.asyncio
    async def test_template_on_complete(self, template_fixture):
        """``on_complete`` marks the template COMPLETED with a completion time not in the future."""
        assert template_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        mock_task = MagicMock()
        mock_task.root_dag = None
        mock_task.next_dags = []
        mock_task.next_task_dag = None
        dagger.service.services.Dagger.app.get_instance.side_effect = [
            mock_task,
            KeyError,
        ]
        dagger.service.services.Dagger.app._remove_itask_instance = CoroutineMock()
        dagger.service.services.Dagger.app._remove_root_template_instance = (
            CoroutineMock()
        )
        monitoring_task = MagicMock()
        dagger.service.services.Dagger.app.get_monitoring_task = CoroutineMock(
            return_value=monitoring_task
        )
        monitoring_task.on_complete = CoroutineMock()
        template_fixture.runtime_parameters = MagicMock()
        template_fixture.next_dags = []
        assert template_fixture.get_id() == template_fixture.id
        await template_fixture.on_complete(workflow_instance=template_fixture)
        assert template_fixture.status.code == TaskStatusEnum.COMPLETED.name
        cur_time = int(time.time())
        assert template_fixture.time_completed <= cur_time

    @pytest.mark.asyncio
    async def test_template_on_complete_time_set(self, template_fixture):
        """``on_complete`` keeps a ``time_completed`` value that was already set."""
        assert template_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        mock_task = MagicMock()
        mock_task.root_dag = None
        mock_task.next_dags = []
        mock_task.next_task_dag = None
        dagger.service.services.Dagger.app.get_instance.side_effect = [
            mock_task,
            KeyError,
        ]
        monitoring_task = MagicMock()
        dagger.service.services.Dagger.app.get_monitoring_task = CoroutineMock(
            return_value=monitoring_task
        )
        monitoring_task.on_complete = CoroutineMock()
        template_fixture.runtime_parameters = MagicMock()
        template_fixture.next_dags = []
        template_fixture.time_completed = 123
        assert template_fixture.get_id() == template_fixture.id
        await template_fixture.on_complete(workflow_instance=template_fixture)
        assert template_fixture.status.code == TaskStatusEnum.COMPLETED.name
        assert template_fixture.time_completed == 123

    @pytest.mark.asyncio
    async def test_sensortask(self, sensor_fixture, workflow_instance_fixture):
        """Starting a sensor task persists it but does not call ``on_complete``."""
        assert sensor_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        sensor_fixture.on_complete = CoroutineMock()
        assert sensor_fixture.get_id() == sensor_fixture.id
        payload = dict()
        payload["1"] = 1
        ret_val = sensor_fixture.get_correlatable_key(payload)
        assert payload == ret_val
        await sensor_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert not sensor_fixture.on_complete.called
        with pytest.raises(NotImplementedError):
            await sensor_fixture.execute(
                {}, workflow_instance=workflow_instance_fixture
            )
        with pytest.raises(NotImplementedError):
            await sensor_fixture.evaluate()

    @pytest.mark.asyncio
    async def test_sensortask_update_key_equal(
        self, sensor_fixture: SensorTask, workflow_instance_fixture
    ):
        """``_update_correletable_key`` calls ``update_correletable_key_for_task`` on the app."""
        template_instance = MagicMock()
        workflow_instance_fixture.runtime_parameters = {"k1": "v1", "k2": "v2"}
        dagger.service.services.Dagger.app.get_root_template_instance = CoroutineMock(
            return_value=workflow_instance_fixture
        )
        sensor_fixture.correlatable_key = "k2"
        dagger.service.services.Dagger.app.update_correletable_key_for_task = (
            CoroutineMock()
        )
        await sensor_fixture._update_correletable_key(template_instance)
        assert (
            dagger.service.services.Dagger.app.update_correletable_key_for_task.called
        )

    @pytest.mark.asyncio
    async def test_sensortask_update_key_both_none(
        self, sensor_fixture: SensorTask, workflow_instance_fixture
    ):
        """The app update is still called when the key is in neither parameter dict."""
        workflow_instance_fixture.runtime_parameters = {"k1": "v1", "k2": "v2"}
        dagger.service.services.Dagger.app.get_root_template_instance = CoroutineMock(
            return_value=workflow_instance_fixture
        )
        sensor_fixture.correlatable_key = "k3"
        sensor_fixture.runtime_parameters = {"k1": "v1"}
        dagger.service.services.Dagger.app.update_correletable_key_for_task = (
            CoroutineMock()
        )
        await sensor_fixture._update_correletable_key(workflow_instance_fixture)
        assert (
            dagger.service.services.Dagger.app.update_correletable_key_for_task.called
        )

    @pytest.mark.asyncio
    async def test_future_triggertask(self, trigger_fixture, workflow_instance_fixture):
        """A trigger task scheduled in the future stays EXECUTING after ``start``."""
        assert trigger_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        trigger_fixture.time_to_execute = int(time.time()) + 1000
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        assert trigger_fixture.get_id() == trigger_fixture.id
        await trigger_fixture.start(workflow_instance=workflow_instance_fixture)
        assert trigger_fixture.status.code == TaskStatusEnum.EXECUTING.name

    @pytest.mark.asyncio
    async def test_current_triggertask(
        self, trigger_fixture, workflow_instance_fixture
    ):
        """A trigger task whose time has passed is COMPLETED by ``start``."""
        assert trigger_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        trigger_fixture.time_to_execute = int(time.time()) - 1
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        dagger.service.services.Dagger.app._store.get_trigger = CoroutineMock()
        dagger.service.services.Dagger.app._store.insert_trigger = CoroutineMock()
        dagger.service.services.Dagger.app._store.process_trigger_task_complete = (
            CoroutineMock()
        )
        assert trigger_fixture.get_id() == trigger_fixture.id
        await trigger_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert trigger_fixture.status.code == TaskStatusEnum.COMPLETED.name
        assert (
            dagger.service.services.Dagger.app._store.process_trigger_task_complete.called
        )

    @pytest.mark.asyncio
    async def test_future_interval_fixture(
        self, interval_fixture, workflow_instance_fixture
    ):
        """An interval task scheduled in the future stays EXECUTING after ``start``."""
        assert interval_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        interval_fixture.time_to_execute = int(time.time()) + 1000
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        assert interval_fixture.get_id() == interval_fixture.id
        await interval_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert interval_fixture.status.code == TaskStatusEnum.EXECUTING.name

    @pytest.mark.asyncio
    async def test_current_interval_fixture(
        self, interval_fixture, workflow_instance_fixture
    ):
        """An interval task whose time has passed is COMPLETED by ``start``."""
        assert interval_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        interval_fixture.time_to_execute = int(time.time()) - 1
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        dagger.service.services.Dagger.app._store.get_trigger = CoroutineMock()
        dagger.service.services.Dagger.app._store.insert_trigger = CoroutineMock()
        assert interval_fixture.get_id() == interval_fixture.id
        await interval_fixture.start(workflow_instance=workflow_instance_fixture)
        assert dagger.service.services.Dagger.app._update_instance.called
        assert interval_fixture.status.code == TaskStatusEnum.COMPLETED.name

    @pytest.mark.asyncio
    async def test_systemtimer_trigger_aerospike(
        self, system_timer_fixture, workflow_instance_fixture
    ):
        """Starting the system timer calls ``execute_system_timer_task`` on the store."""
        assert system_timer_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        mock = asynctest.MagicMock()
        mock.task_ids = ["id1"]
        mock.__aiter__.return_value = [mock]
        dagger.service.services.Dagger.app._store.get_valid_triggers.return_value = mock
        mock_trigger = Trigger({"ID1"}, time.time())
        dagger.service.services.Dagger.app._store.get_trigger = CoroutineMock(
            return_value=mock_trigger
        )
        dagger.service.services.Dagger.app._store.execute_system_timer_task = (
            CoroutineMock()
        )
        assert system_timer_fixture.get_id() == system_timer_fixture.id
        await system_timer_fixture.start(workflow_instance=workflow_instance_fixture)
        assert (
            dagger.service.services.Dagger.app._store.execute_system_timer_task.called
        )
        with pytest.raises(NotImplementedError):
            await system_timer_fixture.on_message(workflow_instance_fixture, {})
        with pytest.raises(NotImplementedError):
            await system_timer_fixture.evaluate()
        with pytest.raises(NotImplementedError):
            await system_timer_fixture.get_correlatable_key(MagicMock())
        with pytest.raises(NotImplementedError):
            await system_timer_fixture.on_complete(
                workflow_instance=workflow_instance_fixture
            )

    @pytest.fixture()
    def default_process_instance_fixture(self):
        """Return a fresh ``DefaultProcessTemplateDAGInstance``."""
        return DefaultProcessTemplateDAGInstance(uuid1())

    @pytest.mark.asyncio
    async def test_process_instance_task(
        self, default_process_instance_fixture, workflow_instance_fixture
    ):
        """A process with ``max_run_duration`` runs, stores a trigger and completes on notify."""
        default_process_instance_fixture.max_run_duration = 300
        assert (
            default_process_instance_fixture.status.code
            == TaskStatusEnum.NOT_STARTED.name
        )
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        mock = asynctest.MagicMock()
        mock.start = CoroutineMock()
        mock.merge_local_runtime_parameters = CoroutineMock()
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            return_value=mock
        )
        dagger.service.services.Dagger.app._store_task_instance = CoroutineMock()
        dagger.service.services.Dagger.app._store_trigger_instance = CoroutineMock()
        await default_process_instance_fixture.start(
            workflow_instance=workflow_instance_fixture
        )
        assert (
            default_process_instance_fixture.status.code
            == TaskStatusEnum.EXECUTING.name
        )
        default_process_instance_fixture.on_complete = CoroutineMock()
        await default_process_instance_fixture.notify(
            status=TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert default_process_instance_fixture.on_complete.called
        assert dagger.service.services.Dagger.app._store_trigger_instance.called
        await default_process_instance_fixture.stop()
        with pytest.raises(NotImplementedError):
            await default_process_instance_fixture.get_correlatable_key(MagicMock())
        with pytest.raises(NotImplementedError):
            await default_process_instance_fixture.on_message(
                runtime_parameters=workflow_instance_fixture.runtime_parameters
            )
        with pytest.raises(NotImplementedError):
            await default_process_instance_fixture.evaluate()

    @pytest.fixture()
    def default_template_instance_fixture(self):
        """Return a ``DefaultTemplateDAGInstance`` whose id is a ``CoroutineMock``."""
        return DefaultTemplateDAGInstance(CoroutineMock())

    @pytest.mark.asyncio
    async def test_default_template_instance_task(self, workflow_instance_fixture):
        """A template instance moves to EXECUTING on start and completes on notify."""
        assert workflow_instance_fixture.status.code == TaskStatusEnum.NOT_STARTED.name
        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
        mock = asynctest.MagicMock()
        dagger.service.services.Dagger.app.get_instance.return_value = mock
        mock.start = CoroutineMock()
        mock.merge_local_runtime_parameters = CoroutineMock()
        await workflow_instance_fixture.start(
            workflow_instance=workflow_instance_fixture
        )
        assert workflow_instance_fixture.status.code == TaskStatusEnum.EXECUTING.name
        workflow_instance_fixture.on_complete = CoroutineMock()
        await workflow_instance_fixture.notify(
            status=TaskStatus(
                code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
            ),
            workflow_instance=workflow_instance_fixture,
        )
        assert workflow_instance_fixture.on_complete.called
        with pytest.raises(NotImplementedError):
            await workflow_instance_fixture.get_correlatable_key(MagicMock())
        with pytest.raises(NotImplementedError):
            await workflow_instance_fixture.on_message(workflow_instance_fixture)
        with pytest.raises(NotImplementedError):
            await workflow_instance_fixture.evaluate()

    @pytest.fixture()
    def default_monitoring_instance_fixture(self):
        """Return a ``TestDefaultMonitoringClass`` (defined at the end of this module)."""
        return TestDefaultMonitoringClass(uuid1())

    @pytest.fixture()
    def max_run_duration_monitoring_instance_fixture(self):
        """Return a ``SkipOnMaxDurationTask`` whose id is a ``CoroutineMock``."""
        return SkipOnMaxDurationTask(CoroutineMock())

    @pytest.mark.asyncio
    async def test_default_monitoring_task_on_complete(self, workflow_instance_fixture):
        """With ``delete_workflow_on_complete`` set, completion removes the root instance."""
        dagger.service.services.Dagger.app._remove_root_template_instance = (
            CoroutineMock()
        )
        dagger.service.services.Dagger.app.delete_workflow_on_complete = True
        await workflow_instance_fixture.on_complete(
            workflow_instance=workflow_instance_fixture
        )
        assert dagger.service.services.Dagger.app._remove_root_template_instance.called

    @pytest.mark.asyncio
    async def test_max_run_duration_monitoring_task_on_complete(
        self,
        max_run_duration_monitoring_instance_fixture,
        default_process_instance_fixture,
        workflow_instance_fixture,
    ):
        """``process_monitored_task`` asks for the remaining tasks of an EXECUTING workflow."""
        max_run_duration_monitoring_instance_fixture.get_remaining_tasks = (
            CoroutineMock(return_value=[])
        )
        workflow_instance_fixture.status.code = TaskStatusEnum.EXECUTING.name
        await max_run_duration_monitoring_instance_fixture.process_monitored_task(
            default_process_instance_fixture,
            workflow_instance=workflow_instance_fixture,
        )
        assert max_run_duration_monitoring_instance_fixture.get_remaining_tasks.called

    @pytest.mark.asyncio
    async def test_default_monitoring_task_on_complete_not_called(
        self, default_monitoring_instance_fixture, workflow_instance_fixture
    ):
        """``execute`` skips ``process_monitored_task`` when no monitored task is attached."""
        monitored_task = MagicMock()
        monitored_task.status.code = TaskStatusEnum.COMPLETED.name
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(
            return_value=monitored_task
        )
        default_monitoring_instance_fixture.process_monitored_task = CoroutineMock()
        await default_monitoring_instance_fixture.execute(
            runtime_parameters=workflow_instance_fixture.runtime_parameters,
            workflow_instance=workflow_instance_fixture,
        )
        assert (
            default_monitoring_instance_fixture.process_monitored_task.called is False
        )

    @pytest.mark.asyncio
    async def test_default_monitoring_task_on_complete_called(
        self,
        default_process_instance_fixture,
        default_monitoring_instance_fixture,
        workflow_instance_fixture,
    ):
        """``execute`` calls ``process_monitored_task`` when the monitored task is in the workflow."""
        default_monitoring_instance_fixture.monitored_task_id = (
            default_process_instance_fixture.id
        )
        workflow_instance_fixture.add_task(default_process_instance_fixture)
        default_monitoring_instance_fixture.process_monitored_task = CoroutineMock()
        await default_monitoring_instance_fixture.execute(
            runtime_parameters={}, workflow_instance=workflow_instance_fixture
        )
        assert default_monitoring_instance_fixture.process_monitored_task.called

    @pytest.fixture()
    def default_monitored_process_template_instance(self):
        """Return a ``MonitoredProcessTemplateDAGInstance`` whose id is a ``CoroutineMock``."""
        return MonitoredProcessTemplateDAGInstance(CoroutineMock())

    @pytest.mark.asyncio
    async def test_monitored_process_template_instance(
        self, default_monitored_process_template_instance, workflow_instance_fixture
    ):
        """``execute`` stores a trigger when ``COMPLETE_BY_KEY`` is in the runtime parameters."""
        workflow_instance_fixture.runtime_parameters = {COMPLETE_BY_KEY: 414}
        dagger.service.services.Dagger.app._store_trigger_instance = CoroutineMock()
        default_monitored_process_template_instance.get_monitoring_task_type = (
            MagicMock(return_value=MagicMock())
        )
        await default_monitored_process_template_instance.execute(
            runtime_parameters=workflow_instance_fixture.runtime_parameters,
            workflow_instance=workflow_instance_fixture,
        )
        assert dagger.service.services.Dagger.app._store_trigger_instance.called is True

    @pytest.mark.asyncio
    async def test_monitored_process_template_instance_not_called(
        self, default_monitored_process_template_instance, workflow_instance_fixture
    ):
        """``execute`` stores no task when ``monitoring_task_id`` is already set."""
        default_monitored_process_template_instance.runtime_parameters = {}
        dagger.service.services.Dagger.app._store_task_instance = CoroutineMock()
        workflow_instance_fixture.runtime_parameters = {COMPLETE_BY_KEY: 414}
        default_monitored_process_template_instance.monitoring_task_id = uuid1()
        await default_monitored_process_template_instance.execute(
            workflow_instance_fixture.runtime_parameters,
            workflow_instance=workflow_instance_fixture,
        )
        assert dagger.service.services.Dagger.app._store_task_instance.called is False

    @pytest.mark.asyncio
    async def test_monitored_process_template_instance_on_complete(
        self, default_monitored_process_template_instance
    ):
        """Set up a monitoring-task mock for a monitored process (see the note below)."""
        default_monitored_process_template_instance.monitoring_task_id = uuid.uuid1()
        mt = MagicMock()
        mt.on_complete = CoroutineMock()
        dagger.service.services.Dagger.app.get_instance = CoroutineMock(return_value=mt)
        # NOTE(review): `calleds` looks like a typo for `called`. Mock objects
        # create unknown attributes on access, so this assertion is always
        # truthy, and nothing in this test invokes on_complete on the instance
        # under test. Left unchanged until the intended call is confirmed.
        assert mt.on_complete.calleds


class TestDefaultMonitoringClass(DefaultMonitoringTask):
    """Concrete ``DefaultMonitoringTask`` with a no-op ``process_monitored_task``.

    It is a helper used by ``default_monitoring_instance_fixture``, not a test
    case, despite the ``Test`` prefix.
    """

    async def process_monitored_task(
        self, monitored_task: ITask, workflow_instance: Optional[ITemplateDAGInstance]
    ) -> None:
        """Do nothing; tests replace this method with a mock when they need to observe calls."""
        pass


class TestKafkaAgent:
    @pytest.fixture()
    def workflow_instance_fixture(self):
        return DefaultTemplateDAGInstance(uuid.uuid1())

    @pytest.mark.asyncio
    async def test_kafka_agent(self, workflow_instance_fixture):
        app = MagicMock()
        topic = MagicMock()
        topic.get_topic_name = MagicMock(return_value="topic")
        task = KafkaListenerTask(uuid1())
        task._topic = MagicMock()
        task._topic.get_topic_name = MagicMock(return_value="topic")
        task.topic = "topic"
        listener = KafkaListenerTask(uuid1())
        workflow_instance_fixture.add_task(listener)
        workflow_instance_fixture.add_task(task)
        agent = KafkaAgent(app=app, topic=topic, task=listener)
        # Non-executing task
task.on_message = CoroutineMock(return_value=True)851 task.on_complete = CoroutineMock()852 task.status = TaskStatus(853 code=TaskStatusEnum.NOT_STARTED.name, value=TaskStatusEnum.NOT_STARTED.value854 )855 task.allow_skip_to = True856 root_template_instance = CoroutineMock()857 skipped_task = KafkaListenerTask(uuid1())858 workflow_instance_fixture.add_task(skipped_task)859 skipped_task.on_complete = CoroutineMock()860 skipped_task.status.code = TaskStatusEnum.EXECUTING.name861 workflow_instance_fixture.get_remaining_tasks = CoroutineMock(862 return_value=[skipped_task, task]863 )864 dagger.service.services.Dagger.app.get_root_template_instance = CoroutineMock(865 return_value=root_template_instance866 )867 generator_mock = MagicMock()868 generator_mock.__aiter__.return_value = [(workflow_instance_fixture, task)]869 agent.app._get_tasks_by_correlatable_key.return_value = generator_mock870 listener.get_correlatable_keys_from_payload = CoroutineMock(871 return_value=[("id", "v1"), ("id2", "v2")]872 )873 stream = asynctest.MagicMock()874 stream.__aiter__.return_value = [range(1)]875 await agent.process_event(stream)876 assert task.on_message.called877 assert task.on_complete.called878 assert skipped_task.on_complete.called879 assert agent.app._get_tasks_by_correlatable_key.call_count == 2880 # Executing task but on_message returned False, assert on_complete not called881 task.on_message = CoroutineMock(return_value=False)882 task.on_complete = CoroutineMock()883 task.status.code = TaskStatusEnum.EXECUTING.name884 await agent.process_event(stream)885 assert task.on_message.called886 assert not task.on_complete.called887 # Skipped task with allow_skip_to as true, should process on_message and on_complete888 task.on_message = CoroutineMock(return_value=True)889 task.on_complete = CoroutineMock()890 task.status.code = TaskStatusEnum.SKIPPED.name891 task.allow_skip_to = True892 await agent.process_event(stream)893 assert task.on_message.called894 assert 
task.on_complete.called895 # Skipped task with allow_skip_to as false, should not process on_message or on_complete896 task.on_message = CoroutineMock(return_value=True)897 task.on_complete = CoroutineMock()898 task.status.code = TaskStatusEnum.SKIPPED.name899 task.allow_skip_to = False900 await agent.process_event(stream)901 assert not task.on_message.called902 assert not task.on_complete.called903 # test completed904 task.on_message = CoroutineMock(return_value=True)905 task.on_complete = CoroutineMock()906 task.start = CoroutineMock()907 task.status.code = TaskStatusEnum.COMPLETED.name908 task.allow_skip_to = False909 task.reprocess_on_message = False910 await agent.process_event(stream)911 assert task.start.called912 # test exactly once913 listener.match_only_one = True914 task1 = KafkaListenerTask(uuid1())915 workflow_instance_fixture.add_task(task1)916 task1._topic = MagicMock()917 task1._topic.get_topic_name = MagicMock(return_value="topic")918 task1.on_message = CoroutineMock(return_value=True)919 task1.on_complete = CoroutineMock()920 task1.status.code = TaskStatusEnum.EXECUTING.name921 task1.topic = "topic"922 task2 = KafkaListenerTask(uuid1())923 workflow_instance_fixture.add_task(task2)924 task2._topic = MagicMock()925 task2.topic = "topic"926 task2._topic.get_topic_name = MagicMock(return_value="topic")927 task2.on_message = CoroutineMock(return_value=True)928 task2.on_complete = CoroutineMock()929 task2.status.code = TaskStatusEnum.EXECUTING.name930 generator_mock2 = MagicMock()931 generator_mock2.__aiter__.return_value = [932 (workflow_instance_fixture, task1),933 (workflow_instance_fixture, task2),934 ]935 agent.app._get_tasks_by_correlatable_key.return_value = generator_mock2936 stream = asynctest.MagicMock()937 stream.__aiter__.return_value = range(1)938 await agent.process_event(stream)939 assert task1.on_message.called940 assert task1.on_complete.called...
interpolated.py
Source:interpolated.py
...47 minimum = float(np.nanmax(np.array((min(xx), minimum))))48 maximum = float(np.nanmin(np.array((max(xx), maximum))))49 super(Interped, self).__init__(name=name, latex_label=latex_label, unit=unit,50 minimum=minimum, maximum=maximum, boundary=boundary)51 self._update_instance()52 def __eq__(self, other):53 if self.__class__ != other.__class__:54 return False55 if np.array_equal(self.xx, other.xx) and np.array_equal(self.yy, other.yy):56 return True57 return False58 def prob(self, val):59 """Return the prior probability of val.60 Parameters61 ==========62 val: Union[float, int, array_like]63 Returns64 =======65 Union[float, array_like]: Prior probability of val66 """67 return self.probability_density(val)68 def cdf(self, val):69 return self.cumulative_distribution(val)70 def rescale(self, val):71 """72 'Rescale' a sample from the unit line element to the prior.73 This maps to the inverse CDF. This is done using interpolation.74 """75 rescaled = self.inverse_cumulative_distribution(val)76 if rescaled.shape == ():77 rescaled = float(rescaled)78 return rescaled79 @property80 def minimum(self):81 """Return minimum of the prior distribution.82 Updates the prior distribution if minimum is set to a different value.83 Yields an error if value is set below instantiated x-array minimum.84 Returns85 =======86 float: Minimum of the prior distribution87 """88 return self._minimum89 @minimum.setter90 def minimum(self, minimum):91 if minimum < self.min_limit:92 raise ValueError('Minimum cannot be set below {}.'.format(round(self.min_limit, 2)))93 self._minimum = minimum94 if '_maximum' in self.__dict__ and self._maximum < np.inf:95 self._update_instance()96 @property97 def maximum(self):98 """Return maximum of the prior distribution.99 Updates the prior distribution if maximum is set to a different value.100 Yields an error if value is set above instantiated x-array maximum.101 Returns102 =======103 float: Maximum of the prior distribution104 """105 return self._maximum106 
@maximum.setter107 def maximum(self, maximum):108 if maximum > self.max_limit:109 raise ValueError('Maximum cannot be set above {}.'.format(round(self.max_limit, 2)))110 self._maximum = maximum111 if '_minimum' in self.__dict__ and self._minimum < np.inf:112 self._update_instance()113 @property114 def yy(self):115 """Return p(xx) values of the interpolated prior function.116 Updates the prior distribution if it is changed117 Returns118 =======119 array_like: p(xx) values120 """121 return self._yy122 @yy.setter123 def yy(self, yy):124 self._yy = yy125 self.__all_interpolated = interp1d(x=self.xx, y=self._yy, bounds_error=False, fill_value=0)126 self._update_instance()127 def _update_instance(self):128 self.xx = np.linspace(self.minimum, self.maximum, len(self.xx))129 self._yy = self.__all_interpolated(self.xx)130 self._initialize_attributes()131 def _initialize_attributes(self):132 from scipy.integrate import cumtrapz133 if np.trapz(self._yy, self.xx) != 1:134 logger.debug('Supplied PDF for {} is not normalised, normalising.'.format(self.name))135 self._yy /= np.trapz(self._yy, self.xx)136 self.YY = cumtrapz(self._yy, self.xx, initial=0)137 # Need last element of cumulative distribution to be exactly one.138 self.YY[-1] = 1139 self.probability_density = interp1d(x=self.xx, y=self._yy, bounds_error=False, fill_value=0)140 self.cumulative_distribution = interp1d(x=self.xx, y=self.YY, bounds_error=False, fill_value=(0, 1))141 self.inverse_cumulative_distribution = interp1d(x=self.YY, y=self.xx, bounds_error=True)...
serializers.py
Source:serializers.py
...  # earlier lines of this file are elided in the excerpt


class NestedUpdateSerializerMixin(object):
    """
    Base nested serializer update and create
    """

    def _update_instance(self, instance, data, exclude_attrs=None):
        """Copy ``data`` onto ``instance`` attribute by attribute, then save it.

        :param instance: object to mutate; must provide ``save()``.
        :param data: mapping of attribute name to new value.
        :param exclude_attrs: optional container of attribute names to skip;
            when empty or ``None`` every item in ``data`` is applied.
        :return: the same ``instance``.
        """
        for attr, value in data.items():
            if not exclude_attrs or attr not in exclude_attrs:
                setattr(instance, attr, value)
        instance.save()
        return instance

    def _update_reference(self, instance, data, ref_name, ref_new=None):
        """Update or delete the object referenced by ``instance.<ref_name>``.

        The entry ``ref_name`` is removed from ``data``. When the referenced
        object is truthy, a dict payload is applied to it through
        ``_update_instance`` and a ``None`` payload deletes it; any other
        payload leaves it untouched.

        :param ref_new: fallback object used when ``instance`` has no
            ``ref_name`` attribute.
        :return: the same ``instance``.
        """
        # NOTE(review): the ``None`` default means a missing key is treated
        # exactly like an explicit ``None`` and also deletes the reference --
        # confirm this is intended for partial updates.
        ref_data = data.pop(ref_name, None)
        ref = getattr(instance, ref_name, ref_new)
        if ref:
            if isinstance(ref_data, dict):
                self._update_instance(ref, ref_data)
            elif ref_data is None:
                ref.delete()
        return instance

    def create(self, validated_data):
        """Instantiate ``Meta.model``, populate it via ``_update`` and return it."""
        instance = self.Meta.model()
        self._update(instance, validated_data)
        return instance

    def update(self, instance, validated_data):
        """Populate an existing ``instance`` via ``_update`` and return it."""
        self._update(instance, validated_data)
        return instance

    def _update(self, instance, validated_data):
        """Apply the non-reference fields, then walk ``Meta.references``."""
        # Names listed in Meta.references are skipped here and handled below.
        self._update_instance(instance, validated_data,
                              self.Meta.references.keys())
        for key, model_name in self.Meta.references.items():
            model = apps.get_model(model_name)
            obj = model()
            setattr(obj, self.Meta.reference_parent, instance)
            ...  # the remainder of this method is elided in the excerpt
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!