How to use _mock_task method in lisa

Best Python code snippet using lisa_python

test_create_task_mappings.py

Source:test_create_task_mappings.py Github

copy

Full Screen

...393class TestFlippedTasksPerBuild:394 def test_get_flipped_tasks_per_build(self):395 build_variant = "test"396 tasks = [397 _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)398 ]399 build_mock = MagicMock(build_variant=build_variant)400 build_mock.get_tasks.return_value = tasks401 prev_version = MagicMock()402 prev_build = MagicMock()403 prev_build.get_tasks.return_value = [404 _mock_task(405 display_name=str(i), activated=True, status="Success" if (i % 2 == 0) else "Failed"406 )407 for i in range(11)408 ]409 prev_version.build_by_variant.return_value = prev_build410 next_version = MagicMock()411 next_build = MagicMock()412 next_build.get_tasks.return_value = [413 _mock_task(414 display_name=str(i), activated=True, status="Success" if (i % 3 == 0) else "Failed"415 )416 for i in range(11)417 ]418 next_version.build_by_variant.return_value = next_build419 flipped_tasks = under_test._get_flipped_tasks_per_build(420 build_mock, prev_version, next_version421 )422 expected = ["3", "9"]423 assert len(flipped_tasks) == len(expected)424 for name in expected:425 assert name in flipped_tasks426 def test_no_get_flipped_tasks_if_tasks_werent_active(self):427 build_variant = "test"428 tasks = [_mock_task(display_name=str(i), activated=False, status="True") for i in range(10)]429 build_mock = MagicMock(build_variant=build_variant)430 build_mock.get_tasks.return_value = tasks431 prev_version = MagicMock()432 prev_build = MagicMock()433 prev_build.get_tasks.return_value = [434 _mock_task(display_name=str(i), status=str(i % 2 == 0)) for i in range(9)435 ]436 prev_version.build_by_variant.return_value = prev_build437 next_version = MagicMock()438 next_build = MagicMock()439 next_build.get_tasks.return_value = [440 _mock_task(display_name=str(i), status=str(i % 3 == 0)) for i in range(9)441 ]442 next_version.build_by_variant.return_value = next_build443 flipped_tasks = under_test._get_flipped_tasks_per_build(444 build_mock, prev_version, 
next_version445 )446 assert len(flipped_tasks) == 0447 def test_no_get_flipped_tasks_if_prev_version_does_not_contain_variant(self):448 build_variant = "test"449 tasks = [450 _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)451 ]452 build_mock = MagicMock(build_variant=build_variant)453 build_mock.get_tasks.return_value = tasks454 prev_version = MagicMock()455 prev_version.build_by_variant.side_effect = KeyError456 next_version = MagicMock()457 next_build = MagicMock()458 next_build.get_tasks.return_value = [459 _mock_task(460 display_name=str(i), activated=True, status="Success" if (i % 3 == 0) else "Failed"461 )462 for i in range(11)463 ]464 next_version.build_by_variant.return_value = next_build465 flipped_tasks = under_test._get_flipped_tasks_per_build(466 build_mock, prev_version, next_version467 )468 assert len(flipped_tasks) == 0469 def test_no_get_flipped_tasks_if_next_version_does_not_contain_variant(self):470 build_variant = "test"471 tasks = [472 _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)473 ]474 build_mock = MagicMock(build_variant=build_variant)475 build_mock.get_tasks.return_value = tasks476 prev_version = MagicMock()477 prev_build = MagicMock()478 prev_build.get_tasks.return_value = [479 _mock_task(480 display_name=str(i), activated=True, status="Success" if (i % 2 == 0) else "Failed"481 )482 for i in range(11)483 ]484 prev_version.build_by_variant.return_value = prev_build485 next_version = MagicMock()486 next_version.build_by_variant.side_effect = KeyError487 flipped_tasks = under_test._get_flipped_tasks_per_build(488 build_mock, prev_version, next_version489 )490 assert len(flipped_tasks) == 0491class TestCreateTaskMap:492 def test_create_task_map(self):493 tasks = [MagicMock(display_name=f"name{i}") for i in range(7)]494 mapped_tasks = under_test._create_task_map(tasks)495 for task in tasks:496 assert mapped_tasks[task.display_name] is not None497 assert 
mapped_tasks[task.display_name] == task498 def test_create_task_map_execution(self):499 display_task = MagicMock(500 display_name="name", json={"execution_tasks": [f"id{i}" for i in range(7)]}501 )502 execution_tasks = [MagicMock(display_name=f"name{i}", task_id=f"id{i}") for i in range(7)]503 tasks = [display_task] + execution_tasks504 mapped_tasks = under_test._create_task_map(tasks)505 assert len(mapped_tasks) == 1506 assert mapped_tasks["name"] == display_task507class TestIsTaskAFlip:508 def test_non_activated_task_is_not_a_flip(self):509 mock_task = _mock_task(activated=False)510 assert not under_test._is_task_a_flip(mock_task, {}, {})511 def test_not_a_success_or_failed_status_is_not_a_flip(self):512 mock_task = _mock_task(activated=True, status="started")513 assert not under_test._is_task_a_flip(mock_task, {}, {})514 def test_no_previous_task_is_not_a_flip(self):515 mock_task = _mock_task(activated=True, status="success")516 mock_task.is_success.return_value = False517 assert not under_test._is_task_a_flip(mock_task, {}, {})518 def test_no_next_task_is_not_a_flip(self):519 mock_task = _mock_task(activated=True, status="success")520 mock_task.is_success.return_value = False521 mock_prev_task = _mock_task(activated=True, status=mock_task.status)522 tasks_prev = {mock_task.display_name: mock_prev_task}523 assert not under_test._is_task_a_flip(mock_task, tasks_prev, {})524 def test_all_statuses_same_is_no_flip(self):525 mock_task = _mock_task(activated=True, status="failed")526 mock_task.is_success.return_value = False527 mock_prev_task = _mock_task(activated=True, status=mock_task.status)528 tasks_prev = {mock_task.display_name: mock_prev_task}529 mock_next_task = _mock_task(activated=True, status=mock_task.status)530 tasks_next = {mock_task.display_name: mock_next_task}531 assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)532 def test_next_and_current_same_status_is_a_flip(self):533 mock_task = _mock_task(activated=True, 
status="failed")534 mock_task.is_success.return_value = False535 mock_prev_task = _mock_task(activated=True, status="success")536 tasks_prev = {mock_task.display_name: mock_prev_task}537 mock_next_task = _mock_task(activated=True, status=mock_task.status)538 tasks_next = {mock_task.display_name: mock_next_task}539 assert under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)540 mock_task = _mock_task(activated=True, status="success")541 mock_task.is_success.return_value = False542 mock_prev_task = _mock_task(activated=True, status="failed")543 tasks_prev = {mock_task.display_name: mock_prev_task}544 mock_next_task = _mock_task(activated=True, status=mock_task.status)545 tasks_next = {mock_task.display_name: mock_next_task}546 assert under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)547 def test_prev_task_not_activated_is_not_a_flip(self):548 mock_task = _mock_task(activated=True, status="failed")549 mock_task.is_success.return_value = False550 mock_prev_task = _mock_task(activated=False, status="success")551 tasks_prev = {mock_task.display_name: mock_prev_task}552 mock_next_task = _mock_task(activated=True, status=mock_task.status)553 tasks_next = {mock_task.display_name: mock_next_task}554 assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)555 def test_next_task_not_activated_is_not_a_flip(self):556 mock_task = _mock_task(activated=True, status="failed")557 mock_task.is_success.return_value = False558 mock_prev_task = _mock_task(activated=True, status="success")559 tasks_prev = {mock_task.display_name: mock_prev_task}560 mock_next_task = _mock_task(activated=False, status=mock_task.status)561 tasks_next = {mock_task.display_name: mock_next_task}562 assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)563 def test_prev_and_current_same_status_is_not_a_flip(self):564 mock_task = _mock_task(activated=True, status="failed")565 mock_task.is_success.return_value = False566 mock_prev_task = 
_mock_task(activated=True, status=mock_task.status)567 tasks_prev = {mock_task.display_name: mock_prev_task}568 mock_next_task = _mock_task(activated=True, status="success")569 tasks_next = {mock_task.display_name: mock_next_task}570 assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)571class TestMeaningfulTaskStatus:572 def test_success_failed_status_pass(self):573 pass_mock = MagicMock(status="success")574 assert under_test._check_meaningful_task_status(pass_mock)575 failed_mock = MagicMock(status="failed")576 assert under_test._check_meaningful_task_status(failed_mock)577 def test_other_task_status_do_not_pass(self):578 task_mock = MagicMock(status="started")579 assert not under_test._check_meaningful_task_status(task_mock)580def _mock_task(activated: bool = None, status: str = None, display_name: str = None):581 return MagicMock(activated=activated, status=status, display_name=display_name)582class TestGenerateTaskMappings:583 @patch(ns("TaskMappings.create_task_mappings"))584 def test_generates_task_mappings(self, create_task_mappings_mock):585 mock_evg_api = MagicMock()586 created_task_mock = MagicMock()587 created_task_mock.transform.return_value = ["mock-mappings"]588 create_task_mappings_mock.return_value = (created_task_mock, "most-recent-version-analyzed")589 task_mappings, most_recent_version_analyzed = under_test.generate_task_mappings(590 mock_evg_api,591 "mongodb-mongo-master",592 VersionLimit(stop_at_version_id="my-version"),593 ".*src",594 module_name="my-module",...

Full Screen

Full Screen

rexflow_api.py

Source:rexflow_api.py Github

copy

Full Screen

...27 ValidatedPayload,28 ValidatorResults,29)30from rexflow_ui.errors import ValidationErrorDetails31def _mock_task():32 return Task(33 iid=MOCK_IID,34 tid=MOCK_TID,35 data=[36 TaskFieldData(37 dataId='uname',38 type='TEXT',39 order=1,40 ),41 ],42 )43def _mock_validation_error():44 return ValidationErrorDetails.init_from_payload(45 payload=ValidatedPayload(46 iid=MOCK_IID,47 tid=MOCK_TID,48 passed=False,49 results=[50 FieldValidationResult(51 dataId=MOCK_DATA_ID,52 passed=False,53 results=[54 ValidatorResults(55 passed=False,56 message='test failed validation',57 ),58 ],59 ),60 ],61 status=OperationStatus.FAILURE,62 ),63 )64def _mock_error():65 return ErrorDetails(message='test error')66def _mock_workflow(with_tasks=True):67 if with_tasks:68 tasks = [69 _mock_task(),70 ]71 else:72 tasks = []73 return Workflow(74 did=MOCK_DID,75 iid=MOCK_IID,76 status=WorkflowStatus.RUNNING,77 tasks=tasks,78 )79async def get_available_workflows(refresh=False) -> List[WorkflowDeployment]:80 return [81 WorkflowDeployment(82 name=MOCK_NAME,83 deployments=[MOCK_DID],84 bridge_url='',85 )86 ]87async def find_workflow_deployment(88 deployment_id: WorkflowDeploymentId,89) -> Optional[WorkflowDeployment]:90 return WorkflowDeployment(91 name=MOCK_NAME,92 deployments=[deployment_id],93 bridge_url='',94 )95async def start_workflow(96 deployment_id: WorkflowDeploymentId,97 metadata: List[MetaData] = [],98) -> Workflow:99 return _mock_workflow(with_tasks=False)100async def start_workflow_by_name(101 workflow_name: str,102 metadata: List[MetaData] = [],103) -> Workflow:104 return _mock_workflow(with_tasks=False)105async def refresh_workflows():106 pass107@validate_arguments108async def get_active_workflows(109 iids: List[WorkflowInstanceId] = [],110 metadata: Dict = {},111) -> List[Workflow]:112 return [_mock_workflow()]113async def complete_workflow(instance_id: WorkflowInstanceId) -> None:114 pass115async def cancel_workflow(116 instance_id: WorkflowInstanceId,117) -> bool:118 return 
True119@validate_arguments120async def start_tasks(121 iid: WorkflowInstanceId,122 tasks: List[TaskId]123) -> List[Task]:124 return [125 Task(126 iid=iid,127 tid=tid,128 )129 for tid in tasks130 ]131@validate_arguments132async def get_task(iid: WorkflowInstanceId, tid: TaskId) -> Task:133 return _mock_task()134@validate_arguments135async def validate_tasks(tasks: List[TaskChange]) -> TaskOperationResults:136 result = TaskOperationResults()137 if tasks:138 result.successful = [_mock_task()]139 else:140 result.errors = [_mock_validation_error(), _mock_error()]141 return result142@validate_arguments143async def save_tasks(tasks: List[TaskChange]) -> TaskOperationResults:144 result = TaskOperationResults()145 if tasks:146 result.successful = [_mock_task()]147 else:148 result.errors = [_mock_validation_error(), _mock_error()]149 return result150@validate_arguments151async def complete_tasks(tasks: List[TaskChange]) -> TaskOperationResults:152 result = TaskOperationResults()153 if tasks:154 result.successful = [_mock_task()]155 else:156 result.errors = [_mock_validation_error(), _mock_error()]...

Full Screen

Full Screen

gcs_download_test.py

Source:gcs_download_test.py Github

copy

Full Screen

# ... (excerpt begins at original line 6 of gcs_download_test.py)
import tarfile
import unittest
from unittest import mock

from gcs_download import DownloadAndUnpackFromCloudStorage


def _mock_task(status_code: int = 0, stderr: str = '') -> mock.Mock:
    """Return a ``Mock`` configured with a return code and encoded stderr.

    Args:
        status_code: Value for ``returncode`` and for the result of ``wait()``.
        stderr: Text returned, UTF-8 encoded, as the second item of ``communicate()``.
    """
    task_mock = mock.Mock()
    # Dotted keys let configure_mock set return values on child mocks.
    attrs = {
        'returncode': status_code,
        'wait.return_value': status_code,
        'communicate.return_value': (None, stderr.encode()),
    }
    task_mock.configure_mock(**attrs)
    return task_mock


# Class-level patches are applied bottom-up, so each test method receives
# the ``tarfile.open`` mock first and the ``subprocess.Popen`` mock second.
@mock.patch('subprocess.Popen')
@mock.patch('tarfile.open')
class TestDownloadAndUnpackFromCloudStorage(unittest.TestCase):
    """Tests for ``DownloadAndUnpackFromCloudStorage`` with Popen and tarfile.open patched."""

    def testHappyPath(self, mock_tarfile, mock_popen):
        """The call completes without raising when the mocked process returns code 0."""
        mock_popen.return_value = _mock_task()
        DownloadAndUnpackFromCloudStorage('', '')

    def testFailedTarOpen(self, mock_tarfile, mock_popen):
        """``CalledProcessError`` is raised when ``tarfile.open`` raises ``ReadError``."""
        mock_popen.return_value = _mock_task(stderr='some error')
        mock_tarfile.side_effect = tarfile.ReadError()
        with self.assertRaises(subprocess.CalledProcessError):
            DownloadAndUnpackFromCloudStorage('', '')

    def testBadTaskStatusCode(self, mock_tarfile, mock_popen):
        """``CalledProcessError`` is raised when the mocked process returns code 1."""
        mock_popen.return_value = _mock_task(stderr='some error', status_code=1)
        with self.assertRaises(subprocess.CalledProcessError):
            DownloadAndUnpackFromCloudStorage('', '')


if __name__ == '__main__':
    ...  # excerpt truncated by the page after original line 34

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It takes you from setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful