Best Python code snippet using localstack_python
test_cli.py
Source:test_cli.py
...145 },146 }147 conf_obj = mock_klio_config.setup(config, config_file, config_override)148 result = runner.invoke(cli.main, cli_inputs)149 core_testing.assert_execution_success(result)150 assert "" == result.output151 if image_tag:152 mock_get_git_sha.assert_not_called()153 else:154 mock_get_git_sha.assert_called_once_with(155 mock_klio_config.patch_os_getcwd156 )157 mock_klio_config.assert_calls()158 exp_image_tag = image_tag or mock_get_git_sha.return_value159 mock_build.assert_called_once_with(160 mock_klio_config.patch_os_getcwd,161 conf_obj,162 config_override,163 image_tag=exp_image_tag,164 )165def test_create_job(runner, mock_create, patch_os_getcwd):166 cli_inputs = [167 "job",168 "create",169 "--job-name",170 "test-job",171 "--gcp-project",172 "test-gcp-project",173 "--use-defaults",174 ]175 result = runner.invoke(cli.main, cli_inputs)176 core_testing.assert_execution_success(result)177 assert "" == result.output178 known_kwargs = {179 "job_name": "test-job",180 "gcp_project": "test-gcp-project",181 "use_defaults": True,182 }183 mock_create.assert_called_once_with((), known_kwargs, patch_os_getcwd)184def test_create_job_prompts(runner, mock_create, patch_os_getcwd):185 cli_inputs = ["job", "create"]186 prompt_inputs = ["test-job", "test-gcp-project"]187 inputs = "\n".join(prompt_inputs)188 result = runner.invoke(cli.main, cli_inputs, input=inputs)189 core_testing.assert_execution_success(result)190 exp_output = (191 "Name of your new job: {0}\n"192 "Name of the GCP project the job should be created in: "193 "{1}\n".format(*prompt_inputs)194 )195 assert exp_output == result.output196 known_kwargs = {197 "job_name": "test-job",198 "gcp_project": "test-gcp-project",199 "use_defaults": False,200 }201 mock_create.assert_called_once_with((), known_kwargs, patch_os_getcwd)202def test_create_job_unknown_args(runner, mock_create, patch_os_getcwd):203 cli_inputs = [204 "job",205 "create",206 "--job-name",207 "test-job",208 "--gcp-project",209 
"test-gcp-project",210 "--use-defaults",211 "--input-topic",212 "test-input-topic",213 ]214 result = runner.invoke(cli.main, cli_inputs)215 core_testing.assert_execution_success(result)216 assert "" == result.output217 unknown_args = ("--input-topic", "test-input-topic")218 known_kwargs = {219 "job_name": "test-job",220 "gcp_project": "test-gcp-project",221 "use_defaults": True,222 }223 mock_create.assert_called_once_with(224 unknown_args, known_kwargs, patch_os_getcwd225 )226@pytest.mark.parametrize("config_override", (None, "klio-job2.yaml"))227def test_delete_job(228 config_override,229 mocker,230 monkeypatch,231 runner,232 config_file,233 mock_delete,234 patch_os_getcwd,235 mock_klio_config,236):237 config = {238 "job_name": "test-job",239 "version": 1,240 "job_config": {241 "inputs": [242 {243 "topic": "foo-topic",244 "subscription": "foo-sub",245 "data_location": "foo-input-location",246 }247 ],248 "outputs": [249 {250 "topic": "foo-topic-output",251 "data_location": "foo-output-location",252 }253 ],254 },255 }256 conf_obj = mock_klio_config.setup(config, config_file, config_override)257 cli_inputs = ["job", "delete"]258 if config_override:259 cli_inputs.extend(["--config-file", config_override])260 result = runner.invoke(cli.main, cli_inputs)261 core_testing.assert_execution_success(result)262 assert "" == result.output263 mock_klio_config.assert_calls()264 mock_delete.assert_called_once_with(conf_obj)265 mock_delete.return_value.delete.assert_called_once_with()266def test_delete_job_gke(267 mocker, monkeypatch, runner, patch_os_getcwd, mock_klio_config,268):269 config = {270 "job_name": "test-job",271 "version": 1,272 "pipeline_options": {273 "project": "test-project",274 "runner": "DirectGKERunner",275 },276 "job_config": {},277 }278 mock_klio_config.setup(config, None, None)279 mock_delete_gke = mocker.patch.object(280 cli.job_commands.gke, "DeletePipelineGKE"281 )282 cli_inputs = ["job", "delete"]283 result = runner.invoke(cli.main, cli_inputs)284 
core_testing.assert_execution_success(result)285 assert "" == result.output286 mock_klio_config.assert_calls()287 mock_delete_gke.assert_called_once_with(mock_klio_config.meta.job_dir)288 mock_delete_gke.return_value.delete.assert_called_once_with()289@pytest.mark.parametrize(290 "is_gcp,direct_runner,config_override,image_tag",291 (292 (False, True, None, None),293 (True, False, None, None),294 (True, True, None, None),295 (True, True, None, "foobar"),296 (True, True, "klio-job2.yaml", None),297 (True, True, "klio-job2.yaml", "foobar"),298 ),299)300@pytest.mark.parametrize("is_job_dir_override", (False, True))301def test_run_job(302 runner,303 mocker,304 tmpdir,305 is_gcp,306 direct_runner,307 image_tag,308 config_override,309 config_file,310 mock_get_git_sha,311 mock_klio_config,312 mock_error_stackdriver_logger_metrics,313 is_job_dir_override,314):315 mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")316 mock_run.return_value = 0317 config_data = {318 "job_name": "test-job",319 "pipeline_options": {320 "worker_harness_container_image": "gcr.register.io/squad/feature",321 "project": "test-project",322 "region": "boonies",323 "staging_location": "gs://somewhere/over/the/rainbow",324 "temp_location": "gs://somewhere/over/the/rainbow",325 },326 "job_config": {327 "inputs": [328 {329 "topic": "foo-topic",330 "subscription": "foo-sub",331 "data_location": "foo-input-location",332 }333 ],334 "outputs": [335 {336 "topic": "foo-topic-output",337 "data_location": "foo-output-location",338 }339 ],340 },341 }342 job_dir_override = None343 if is_job_dir_override:344 temp_dir = tmpdir.mkdir("testing12345")345 job_dir_override = str(temp_dir)346 mock_klio_config.setup(347 config_data, config_file, config_override, job_dir_override348 )349 cli_inputs = ["job", "run"]350 if image_tag:351 cli_inputs.extend(["--image-tag", image_tag])352 if direct_runner:353 cli_inputs.append("--direct-runner")354 if config_override:355 cli_inputs.extend(["--config-file", 
config_override])356 if job_dir_override:357 cli_inputs.extend(["--job-dir", job_dir_override])358 exp_image_tag = image_tag or mock_get_git_sha.return_value359 if config_override:360 exp_image_tag = "{}-{}".format(exp_image_tag, config_override)361 result = runner.invoke(cli.main, cli_inputs)362 core_testing.assert_execution_success(result)363 assert "" == result.output364 mock_klio_config.assert_calls()365 mock_run.assert_called_once_with()366 mock_get_git_sha.assert_called_once_with(367 mock_klio_config.meta.job_dir, image_tag368 )369 mock_error_stackdriver_logger_metrics.assert_called_once_with(370 mock_klio_config.klio_config, direct_runner371 )372@pytest.mark.parametrize(373 "direct_runner,config_override,image_tag",374 (375 (True, None, None),376 (False, None, None),377 (True, None, "foobar"),378 (True, "klio-job2.yaml", None),379 (True, "klio-job2.yaml", "foobar"),380 ),381)382def test_run_job_gke(383 runner,384 mocker,385 tmpdir,386 direct_runner,387 image_tag,388 config_override,389 config_file,390 mock_get_git_sha,391 mock_klio_config,392 mock_error_stackdriver_logger_metrics,393):394 mock_run_gke = mocker.patch.object(gke_commands.RunPipelineGKE, "run")395 mock_run_gke.return_value = 0396 mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")397 mock_run.return_value = 0398 config_data = {399 "job_name": "test-job",400 "pipeline_options": {401 "worker_harness_container_image": "gcr.register.io/squad/feature",402 "project": "test-project",403 "region": "boonies",404 "staging_location": "gs://somewhere/over/the/rainbow",405 "temp_location": "gs://somewhere/over/the/rainbow",406 "runner": "DirectGKERunner",407 },408 "job_config": {409 "inputs": [410 {411 "topic": "foo-topic",412 "subscription": "foo-sub",413 "data_location": "foo-input-location",414 }415 ],416 "outputs": [417 {418 "topic": "foo-topic-output",419 "data_location": "foo-output-location",420 }421 ],422 },423 }424 mock_klio_config.setup(config_data, config_file, 
config_override)425 cli_inputs = ["job", "run"]426 if image_tag:427 cli_inputs.extend(["--image-tag", image_tag])428 if direct_runner:429 cli_inputs.append("--direct-runner")430 if config_override:431 cli_inputs.extend(["--config-file", config_override])432 exp_image_tag = image_tag or mock_get_git_sha.return_value433 if config_override:434 exp_image_tag = "{}-{}".format(exp_image_tag, config_override)435 result = runner.invoke(cli.main, cli_inputs)436 core_testing.assert_execution_success(result)437 assert "" == result.output438 mock_klio_config.assert_calls()439 if direct_runner:440 mock_run.assert_called_once_with()441 mock_run_gke.assert_not_called()442 else:443 mock_run_gke.assert_called_once_with()444 mock_run.assert_not_called()445 mock_get_git_sha.assert_called_once_with(446 mock_klio_config.meta.job_dir, image_tag447 )448 mock_error_stackdriver_logger_metrics.assert_called_once_with(449 mock_klio_config.klio_config, direct_runner450 )451def test_run_job_raises(452 runner,453 mocker,454 config_file,455 patch_os_getcwd,456 pipeline_config_dict,457 mock_warn_if_py2_job,458 mock_get_git_sha,459):460 mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")461 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")462 cli_inputs = ["job", "run", "--image-tag", "foobar"]463 config = {"job_name": "test-job"}464 mock_get_config.return_value = config465 result = runner.invoke(cli.main, cli_inputs)466 assert 1 == result.exit_code467 mock_warn_if_py2_job.asset_called_once_with(patch_os_getcwd)468 mock_get_config.assert_called_once_with(config_file)469 mock_get_git_sha.assert_not_called()470 mock_run.assert_not_called()471@pytest.mark.parametrize("config_override", (None, "klio-job2.yaml"))472def test_stop_job(473 config_override,474 runner,475 mock_stop,476 mocker,477 config_file,478 pipeline_config_dict,479 mock_klio_config,480):481 config_data = {482 "job_name": "test-job",483 "version": 1,484 "pipeline_options": 
pipeline_config_dict,485 "job_config": {486 "inputs": [487 {488 "topic": "foo-topic",489 "subscription": "foo-sub",490 "data_location": "foo-input-location",491 }492 ],493 "outputs": [494 {495 "topic": "foo-topic-output",496 "data_location": "foo-output-location",497 }498 ],499 },500 }501 mock_klio_config.setup(config_data, config_file, config_override)502 cli_inputs = ["job", "stop"]503 if config_override:504 cli_inputs.extend(["--config-file", config_override])505 result = runner.invoke(cli.main, cli_inputs)506 core_testing.assert_execution_success(result)507 assert "" == result.output508 mock_klio_config.assert_calls()509 mock_stop.assert_called_once_with(510 "test-job", "test-project", "us-central1", "cancel"511 )512def test_stop_job_gke(513 runner, mocker, mock_klio_config,514):515 config_data = {516 "job_name": "test-job",517 "version": 1,518 "pipeline_options": {519 "project": "test-project",520 "runner": "DirectGKERunner",521 },522 "job_config": {},523 }524 mock_klio_config.setup(config_data, None, None)525 mock_stop_gke = mocker.patch.object(526 cli.job_commands.gke, "StopPipelineGKE"527 )528 cli_inputs = ["job", "stop"]529 result = runner.invoke(cli.main, cli_inputs)530 core_testing.assert_execution_success(result)531 assert "" == result.output532 mock_klio_config.assert_calls()533 mock_stop_gke.assert_called_once_with(mock_klio_config.meta.job_dir)534@pytest.mark.parametrize(535 "is_gcp,direct_runner,config_override,image_tag",536 (537 (False, True, None, None),538 (True, False, None, None),539 (True, True, None, None),540 (True, True, "klio-job2.yaml", None),541 ),542)543def test_deploy_job(544 runner,545 mocker,546 is_gcp,547 direct_runner,548 config_override,549 image_tag,550 mock_stop,551 config_file,552 patch_os_getcwd,553 pipeline_config_dict,554 mock_get_git_sha,555 mock_klio_config,556 mock_error_stackdriver_logger_metrics,557):558 mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")559 mock_run.return_value = 0560 config_data 
= {561 "job_name": "test-job",562 "version": 1,563 "pipeline_options": pipeline_config_dict,564 "job_config": {565 "inputs": [566 {567 "topic": "foo-topic",568 "subscription": "foo-sub",569 "data_location": "foo-input-location",570 }571 ],572 "outputs": [573 {574 "topic": "foo-topic-output",575 "data_location": "foo-output-location",576 }577 ],578 },579 }580 mock_klio_config.setup(581 copy.deepcopy(config_data), config_file, config_override582 )583 cli_inputs = ["job", "deploy"]584 if image_tag:585 cli_inputs.extend(["--image_tag", image_tag])586 if direct_runner:587 cli_inputs.append("--direct-runner")588 if config_override:589 cli_inputs.extend(["--config-file", config_override])590 exp_image_tag = image_tag or mock_get_git_sha.return_value591 if config_override:592 exp_image_tag = "{}-{}".format(exp_image_tag, config_override)593 result = runner.invoke(cli.main, cli_inputs)594 core_testing.assert_execution_success(result)595 assert "" == result.output596 mock_get_git_sha.assert_called_once_with(patch_os_getcwd, image_tag)597 mock_klio_config.assert_calls()598 mock_stop.assert_called_once_with(599 "test-job", "test-project", "us-central1", "cancel"600 )601 mock_run.assert_called_once_with()602 mock_error_stackdriver_logger_metrics.assert_called_once_with(603 mock_klio_config.klio_config, direct_runner604 )605def test_deploy_job_raises(606 runner,607 mocker,608 mock_stop,609 config_file,610 patch_os_getcwd,611 pipeline_config_dict,612 mock_warn_if_py2_job,613 mock_get_git_sha,614):615 mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")616 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")617 cli_inputs = ["job", "deploy", "--image-tag", "foobar"]618 config = {"job_name": "test-job"}619 mock_get_config.return_value = config620 result = runner.invoke(cli.main, cli_inputs)621 assert 1 == result.exit_code622 mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)623 mock_get_config.assert_called_once_with(config_file)624 
mock_stop.assert_not_called()625 mock_get_git_sha.assert_not_called()626 mock_run.assert_not_called()627@pytest.mark.parametrize(628 "pytest_args,conf_override,image_tag",629 (630 ([], None, None),631 (["--", "-s"], None, None),632 (["--", "test::test test2::test2"], None, None),633 (["--", "-s"], "klio-job2.yaml", None),634 ),635)636def test_test_job(637 runner,638 mocker,639 config_file,640 patch_os_getcwd,641 pytest_args,642 conf_override,643 image_tag,644 mock_get_git_sha,645 mock_warn_if_py2_job,646 mock_get_config_job_dir,647):648 mock_test_pipeline = mocker.patch.object(649 cli.job_commands.test, "TestPipeline"650 )651 mock_test_pipeline.return_value.run.return_value = 0652 mock_get_config_job_dir.return_value = (653 patch_os_getcwd,654 conf_override or config_file,655 )656 cli_inputs = ["job", "test"]657 if image_tag:658 cli_inputs.extend(["--image-tag", image_tag])659 if conf_override:660 cli_inputs.extend(["--config-file", conf_override])661 cli_inputs.extend(pytest_args)662 config_data = {663 "job_name": "test-job",664 "pipeline_options": {665 "worker_harness_container_image": "gcr.register.io/squad/feature",666 "project": "test-project",667 "region": "boonies",668 "staging_location": "gs://somewhere/over/the/rainbow",669 "temp_location": "gs://somewhere/over/the/rainbow",670 },671 "job_config": {672 "inputs": [673 {674 "topic": "foo-topic",675 "subscription": "foo-sub",676 "data_location": "foo-input-location",677 }678 ],679 "outputs": [680 {681 "topic": "foo-topic-output",682 "data_location": "foo-output-location",683 }684 ],685 },686 }687 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")688 # deepcopy since KlioConfig will pop keys689 mock_get_config.return_value = config_data690 conf = kconfig.KlioConfig(copy.deepcopy(config_data))691 mock_klio_config = mocker.patch.object(core_utils.config, "KlioConfig")692 mock_klio_config.return_value = conf693 result = runner.invoke(cli.main, cli_inputs)694 
core_testing.assert_execution_success(result)695 assert "" == result.output696 exp_image_tag = image_tag or mock_get_git_sha.return_value697 if conf_override:698 exp_image_tag = "{}-{}".format(exp_image_tag, conf_override)699 mock_get_config_job_dir.assert_called_once_with(None, conf_override)700 mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)701 if not image_tag:702 mock_get_git_sha.assert_called_once_with(patch_os_getcwd)703 else:704 mock_get_git_sha.assert_not_called()705 mock_get_config.assert_called_once_with(conf_override or config_file)706 mock_klio_config.assert_called_once_with(707 config_data, raw_overrides=(), raw_templates=()708 )709 exp_docker_runtime_config = cli.DockerRuntimeConfig(710 image_tag=exp_image_tag,711 force_build=False,712 config_file_override=conf_override,713 )714 mock_test_pipeline.assert_called_once_with(715 patch_os_getcwd, conf, exp_docker_runtime_config716 )717 mock_test_pipeline.return_value.run.assert_called_once_with(718 pytest_args=pytest_args719 )720@pytest.mark.parametrize(721 "create_resources,conf_override",722 (723 (False, None),724 (False, "klio-job2.yaml"),725 (True, None),726 (True, "klio-job2.yaml"),727 ),728)729def test_verify(730 runner,731 mocker,732 patch_os_getcwd,733 create_resources,734 conf_override,735 mock_warn_if_py2_job,736 mock_get_config_job_dir,737):738 mock_verify_job = mocker.patch.object(739 cli.job_commands.verify.VerifyJob, "verify_job"740 )741 mock_get_config_job_dir.return_value = (742 patch_os_getcwd,743 conf_override or config_file,744 )745 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")746 config = {747 "job_name": "klio-job-name",748 "job_config": {749 "inputs": [750 {"topic": "foo", "subscription": "bar", "data_location": "baz"}751 ],752 "outputs": [{"topic": "foo-out", "data_location": "baz-out"}],753 },754 "pipeline_options": {},755 "version": 1,756 }757 mock_get_config.return_value = config758 cli_inputs = ["job", "verify"]759 if create_resources:760 
cli_inputs.append("--create-resources")761 if conf_override:762 cli_inputs.extend(["--config-file", conf_override])763 result = runner.invoke(cli.main, cli_inputs)764 core_testing.assert_execution_success(result)765 mock_get_config_job_dir.assert_called_once_with(None, conf_override)766 mock_get_config.assert_called_once_with(conf_override or config_file)767 mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)768 mock_verify_job.assert_called_once_with()769@pytest.mark.parametrize(770 "use_job_dir,gcs_location,conf_override",771 (772 (None, None, None),773 (True, None, None),774 (None, "gs://bar", None),775 (None, None, "klio-job2.yaml"),776 ),777)778@pytest.mark.parametrize(779 "input_file,output_file",780 (781 (None, None),782 ("input.stats", None),783 (None, "output.stats"),784 ("input.stats", "output.stats"),785 ),786)787@pytest.mark.parametrize(788 "since,until", ((None, None), ("2 hours ago", "1 hour ago"))789)790def test_collect_profiling_data(791 runner,792 patch_os_getcwd,793 mock_warn_if_py2_job,794 mocker,795 use_job_dir,796 gcs_location,797 conf_override,798 input_file,799 output_file,800 since,801 until,802):803 mock_collector = mocker.patch.object(804 cli.job_commands.profile, "DataflowProfileStatsCollector"805 )806 config = {807 "version": 2,808 "job_name": "test-job",809 "job_config": {"events": {}, "data": {}},810 "pipeline_options": {},811 }812 if not gcs_location:813 config["pipeline_options"] = {"profile_location": "gs://foo"}814 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")815 mock_get_config.return_value = config816 exp_exit_code = 0817 exp_output_snippet = ""818 if input_file:819 if any([use_job_dir, gcs_location, output_file, conf_override]):820 exp_exit_code = 2821 exp_output_snippet = "mutually exclusive"822 elif any([since, until]):823 exp_output_snippet = "`--since` and `--until` will be ignored"824 cli_inputs = ["job", "profile", "collect-profiling-data"]825 if input_file:826 
cli_inputs.extend(["--input-file", input_file])827 if output_file:828 cli_inputs.extend(["--output-file", output_file])829 if since:830 cli_inputs.extend(["--since", since])831 if until:832 cli_inputs.extend(["--until", until])833 if gcs_location:834 cli_inputs.extend(["--gcs-location", gcs_location])835 if conf_override:836 cli_inputs.extend(["--config-file", conf_override])837 exp_since = since or "1 hour ago"838 exp_until = until or "now"839 with runner.isolated_filesystem() as f:840 if input_file:841 with open(input_file, "w") as in_f:842 in_f.write("foo\n")843 if use_job_dir:844 cli_inputs.extend(["--job-dir", f])845 result = runner.invoke(cli.main, cli_inputs)846 assert exp_exit_code == result.exit_code847 assert exp_output_snippet in result.output848 exp_job_dir = patch_os_getcwd if not use_job_dir else f849 if use_job_dir and exp_exit_code == 0:850 mock_warn_if_py2_job.assert_called_once_with(exp_job_dir)851 if exp_exit_code != 2:852 mock_collector.assert_called_once_with(853 gcs_location=gcs_location or "gs://foo",854 input_file=input_file,855 output_file=output_file,856 since=exp_since,857 until=exp_until,858 )859 mock_collector.return_value.get.assert_called_once_with(860 ("tottime",), ()861 )862def test_collect_profiling_data_raises(863 runner, patch_os_getcwd, mock_warn_if_py2_job, mocker864):865 mock_collector = mocker.patch.object(866 cli.job_commands.profile, "DataflowProfileStatsCollector"867 )868 config = {869 "version": 2,870 "job_name": "test-job",871 "job_config": {"events": {}, "data": {}},872 "pipeline_options": {},873 }874 mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")875 mock_get_config.return_value = config876 cli_inputs = ["job", "profile", "collect-profiling-data"]877 result = runner.invoke(cli.main, cli_inputs)878 # no need to check the whole error msg string879 assert "Error: Please provide a GCS location" in result.output880 assert 2 == result.exit_code881 mock_collector.assert_not_called()882 
mock_collector.return_value.get.assert_not_called()883@pytest.mark.parametrize(884 "input_file,entity_ids,exp_raise,exp_msg",885 (886 ("input.txt", (), False, None),887 (None, ("foo", "bar"), False, None),888 ("input.txt", ("foo", "bar"), True, "Illegal usage"),889 (None, (), True, "Must provide"),890 ),891)892def test_require_profile_input_data(893 input_file, entity_ids, exp_raise, exp_msg894):895 if exp_raise:896 with pytest.raises(click.UsageError, match=exp_msg):897 cli._require_profile_input_data(input_file, entity_ids)898 else:899 ret = cli._require_profile_input_data(input_file, entity_ids)900 assert ret is None901@pytest.mark.parametrize(902 "image_tag,config_override", ((None, None), ("foo-1234", "klio-job2.yaml"))903)904def test_profile(905 image_tag,906 config_override,907 config_file,908 patch_os_getcwd,909 pipeline_config_dict,910 mock_get_git_sha,911 mocker,912 mock_klio_config,913):914 mock_req_profile_input = mocker.patch.object(915 cli, "_require_profile_input_data"916 )917 config_data = {918 "job_name": "test-job",919 "pipeline_options": {920 "worker_harness_container_image": "gcr.register.io/squad/feature",921 "project": "test-project",922 "region": "boonies",923 "staging_location": "gs://somewhere/over/the/rainbow",924 "temp_location": "gs://somewhere/over/the/rainbow",925 },926 "job_config": {927 "inputs": [928 {929 "topic": "foo-topic",930 "subscription": "foo-sub",931 "data_location": "foo-input-location",932 }933 ],934 "outputs": [935 {936 "topic": "foo-topic-output",937 "data_location": "foo-output-location",938 }939 ],940 },941 }942 conf = mock_klio_config.setup(config_data, config_file, config_override)943 mock_pipeline = mocker.patch.object(944 cli.job_commands.profile, "ProfilePipeline"945 )946 exp_image_tag = image_tag or mock_get_git_sha.return_value947 if config_override:948 exp_image_tag = "{}-{}".format(exp_image_tag, config_override)949 exp_basic_config = cli.DockerRuntimeConfig(950 image_tag=exp_image_tag,951 
force_build=False,952 config_file_override=config_override,953 )954 exp_prof_config = cli.ProfileConfig(955 input_file="input.txt",956 output_file="output.txt",957 show_logs=False,958 entity_ids=(),959 )960 kwargs = {961 "input_file": "input.txt",962 "output_file": "output.txt",963 "entity_ids": (),964 "force_build": False,965 "show_logs": False,966 "entity_ids": (),967 }968 cli._profile(969 subcommand="foo",970 klio_config=conf,971 image_tag=image_tag,972 config_meta=mock_klio_config.meta,973 **kwargs974 )975 mock_req_profile_input.assert_called_once_with("input.txt", ())976 exp_call = mocker.call(977 mock_klio_config.meta.job_dir, conf, exp_basic_config, exp_prof_config,978 )979 assert 1 == mock_pipeline.call_count980 assert exp_call == mock_pipeline.call_args981 mock_pipeline.return_value.run.assert_called_once_with(982 what="foo", subcommand_flags={}983 )984def test_profile_memory(runner, mocker, minimal_mock_klio_config):985 mock_profile = mocker.patch.object(cli, "_profile")986 result = runner.invoke(cli.profile_memory, [])987 core_testing.assert_execution_success(result)988 exp_kwargs = {989 "image_tag": None,990 "force_build": False,991 "include_children": False,992 "input_file": None,993 "interval": 0.1,994 "multiprocess": False,995 "output_file": None,996 "plot_graph": False,997 "show_logs": False,998 "entity_ids": (),999 }1000 mock_profile.assert_called_once_with(1001 "memory",1002 minimal_mock_klio_config.klio_config,1003 minimal_mock_klio_config.meta,1004 **exp_kwargs1005 )1006def test_profile_memory_per_line(runner, mocker, minimal_mock_klio_config):1007 mock_profile = mocker.patch.object(cli, "_profile")1008 result = runner.invoke(cli.profile_memory_per_line, [])1009 core_testing.assert_execution_success(result)1010 exp_kwargs = {1011 "get_maximum": False,1012 "per_element": False,1013 "image_tag": None,1014 "force_build": False,1015 "input_file": None,1016 "output_file": None,1017 "show_logs": False,1018 "entity_ids": (),1019 }1020 
mock_profile.assert_called_once_with(1021 "memory-per-line",1022 minimal_mock_klio_config.klio_config,1023 minimal_mock_klio_config.meta,1024 **exp_kwargs1025 )1026def test_profile_cpu(runner, mocker, minimal_mock_klio_config):1027 mock_profile = mocker.patch.object(cli, "_profile")1028 result = runner.invoke(cli.profile_cpu, [])1029 core_testing.assert_execution_success(result)1030 exp_kwargs = {1031 "interval": 0.1,1032 "plot_graph": False,1033 "image_tag": None,1034 "force_build": False,1035 "input_file": None,1036 "output_file": None,1037 "show_logs": False,1038 "entity_ids": (),1039 }1040 mock_profile.assert_called_once_with(1041 "cpu",1042 minimal_mock_klio_config.klio_config,1043 minimal_mock_klio_config.meta,1044 **exp_kwargs1045 )1046def test_profile_timeit(1047 runner, mocker, mock_klio_config, minimal_mock_klio_config1048):1049 mock_profile = mocker.patch.object(cli, "_profile")1050 result = runner.invoke(cli.profile_timeit, [])1051 core_testing.assert_execution_success(result)1052 exp_kwargs = {1053 "iterations": 10,1054 "image_tag": None,1055 "force_build": False,1056 "input_file": None,1057 "output_file": None,1058 "show_logs": False,1059 "entity_ids": (),1060 }1061 mock_profile.assert_called_once_with(1062 "timeit",1063 mock_klio_config.klio_config,1064 minimal_mock_klio_config.meta,1065 **exp_kwargs1066 )1067@pytest.mark.parametrize(1068 "force,ping,top_down,bottom_up,non_klio,conf_override",1069 (1070 (False, False, False, False, False, None),1071 (False, False, True, False, False, None),1072 (False, True, False, False, False, None),1073 (True, False, False, False, False, None),1074 (True, True, True, False, False, None),1075 (False, False, False, False, True, None),1076 (False, False, True, False, True, None),1077 (False, False, False, False, False, "klio-job2.yaml"),1078 ),1079)1080def test_publish(1081 force,1082 ping,1083 top_down,1084 bottom_up,1085 non_klio,1086 conf_override,1087 runner,1088 mocker,1089 config_file,1090 patch_os_getcwd,1091 
mock_klio_config,1092):1093 mock_publish = mocker.patch.object(1094 cli.message_commands.publish, "publish_messages"1095 )1096 config = {1097 "job_name": "test-job",1098 "pipeline_options": {1099 "worker_harness_container_image": "gcr.register.io/squad/feature"1100 },1101 "job_config": {1102 "inputs": [1103 {1104 "topic": "foo-topic",1105 "subscription": "foo-sub",1106 "data_location": "foo-input-location",1107 }1108 ],1109 "outputs": [1110 {1111 "topic": "foo-topic-output",1112 "data_location": "foo-output-location",1113 }1114 ],1115 },1116 }1117 cli_inputs = ["message", "publish", "deadb33f"]1118 if force:1119 cli_inputs.append("--force")1120 if ping:1121 cli_inputs.append("--ping")1122 if top_down:1123 cli_inputs.append("--top-down")1124 if bottom_up:1125 cli_inputs.append("--bottom-up")1126 if non_klio:1127 cli_inputs.append("--non-klio")1128 config["job_config"]["allow_non_klio_messages"] = True1129 if conf_override:1130 cli_inputs.extend(["--config-file", conf_override])1131 conf = mock_klio_config.setup(config, config_file)1132 result = runner.invoke(cli.main, cli_inputs)1133 core_testing.assert_execution_success(result)1134 assert "" == result.output1135 mock_publish.assert_called_once_with(1136 conf, ("deadb33f",), force, ping, top_down, non_klio,1137 )1138def test_publish_raises_execution(1139 runner, mocker, config_file, patch_os_getcwd, mock_klio_config1140):1141 mock_publish = mocker.patch.object(1142 cli.message_commands.publish, "publish_messages"1143 )1144 cli_inputs = [1145 "message",1146 "publish",1147 "deadb33f",...
_testing.py
Source:_testing.py
...13# limitations under the License.14#15from klio_core import config as kconfig16from klio_core import utils as core_utils17def assert_execution_success(result):18 """Helper for testing CLI commands that emits errors if execution failed"""19 if result.exception:20 if result.stdout:21 print("captured stdout: {}".format(result.stdout))22 raise result.exception23 assert 0 == result.exit_code24class MockKlioConfig(object):25 def __init__(self, module, mocker, monkeypatch, patch_os_getcwd):26 self.mock_get_config = mocker.patch.object(27 core_utils, "get_config_by_path"28 )29 self.mock_get_config_job_dir = mocker.Mock()30 monkeypatch.setattr(31 module.core_utils,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!