Best Python code snippet using hypothesis
test_workflow_refactoring.py
Source:test_workflow_refactoring.py
...45 self.workflow_populator.upload_yaml_workflow(REFACTORING_SIMPLE_TEST)46 actions = [47 {"action_type": "update_name", "name": "my cool new name"},48 ]49 self._refactor(actions)50 assert self._latest_workflow.stored_workflow.name == "my cool new name"51 actions = [52 {"action_type": "update_annotation", "annotation": "my cool new annotation"},53 ]54 response = self._refactor(actions)55 assert response.workflow["annotation"] == "my cool new annotation"56 actions = [57 {"action_type": "update_license", "license": "AFL-3.0"},58 ]59 self._refactor(actions)60 assert self._latest_workflow.license == "AFL-3.0"61 actions = [62 {"action_type": "update_creator", "creator": [{"class": "Person", "name": "Mary"}]},63 ]64 self._refactor(actions)65 assert self._latest_workflow.creator_metadata[0]["class"] == "Person"66 assert self._latest_workflow.creator_metadata[0]["name"] == "Mary"67 actions = [68 {"action_type": "update_report", "report": {"markdown": "my report..."}}69 ]70 self._refactor(actions)71 assert self._latest_workflow.reports_config["markdown"] == "my report..."72 assert self._latest_workflow.step_by_index(0).label == "test_input"73 actions = [74 {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"},75 ]76 self._refactor(actions)77 assert self._latest_workflow.step_by_index(0).label == "new_label"78 actions = [79 {"action_type": "update_step_position", "step": {"order_index": 0}, "position": {"left": 3, "top": 5}},80 ]81 self._refactor(actions)82 assert self._latest_workflow.step_by_index(0).label == "new_label"83 assert self._latest_workflow.step_by_index(0).position["left"] == 384 assert self._latest_workflow.step_by_index(0).position["top"] == 585 # Build raw steps...86 actions = [87 {"action_type": "add_step", "type": "parameter_input", "label": "new_param", "tool_state": {"parameter_type": "text"}, "position": {"left": 10, "top": 50}},88 ]89 self._refactor(actions)90 assert 
self._latest_workflow.step_by_label("new_param").label == "new_param"91 assert self._latest_workflow.step_by_label("new_param").tool_inputs.get("optional", False) is False92 assert self._latest_workflow.step_by_label("new_param").position["left"] == 1093 assert self._latest_workflow.step_by_label("new_param").position["top"] == 5094 # Cleaner syntax for defining inputs...95 actions = [96 {"action_type": "add_input", "type": "text", "label": "new_param2", "optional": True, "position": {"top": 1, "left": 2}},97 ]98 self._refactor(actions)99 assert self._latest_workflow.step_by_label("new_param2").label == "new_param2"100 assert self._latest_workflow.step_by_label("new_param2").tool_inputs.get("optional", False) is True101 assert self._latest_workflow.step_by_label("new_param2").position["top"] == 1102 assert self._latest_workflow.step_by_label("new_param2").position["left"] == 2103 assert len(self._latest_workflow.step_by_label("first_cat").inputs) == 1104 actions = [105 {106 "action_type": "disconnect",107 "input": {"label": "first_cat", "input_name": "input1"},108 "output": {"label": "new_label"},109 }110 ]111 self._refactor(actions)112 assert len(self._latest_workflow.step_by_label("first_cat").inputs) == 0113 actions = [114 {115 "action_type": "connect",116 "input": {"label": "first_cat", "input_name": "input1"},117 "output": {"label": "new_label"},118 }119 ]120 self._refactor(actions)121 assert len(self._latest_workflow.step_by_label("first_cat").inputs) == 1122 # Re-disconnect so we can test extract_input123 actions = [124 {125 "action_type": "disconnect",126 "input": {"label": "first_cat", "input_name": "input1"},127 "output": {"label": "new_label"},128 }129 ]130 self._refactor(actions)131 # try to create an input for first_cat/input1 automatically132 actions = [133 {134 "action_type": "extract_input",135 "input": {"label": "first_cat", "input_name": "input1"},136 "label": "extracted_input",137 }138 ]139 self._refactor(actions)140 assert 
self._latest_workflow.step_by_label("extracted_input")141 assert len(self._latest_workflow.step_by_label("first_cat").inputs) == 1142 actions = [143 {144 "action_type": "update_output_label",145 "output": {"label": "first_cat", "output_name": "out_file1"},146 "output_label": "new_wf_out",147 }148 ]149 self._refactor(actions)150 assert self._latest_workflow.step_by_label("first_cat").workflow_outputs[0].label == "new_wf_out"151 def test_basic_refactoring_types_dry_run(self):152 self.workflow_populator.upload_yaml_workflow(REFACTORING_SIMPLE_TEST)153 actions = [154 {"action_type": "update_name", "name": "my cool new name"},155 ]156 response = self._dry_run(actions)157 assert response.workflow["name"] == "my cool new name"158 actions = [159 {"action_type": "update_annotation", "annotation": "my cool new annotation"},160 ]161 response = self._dry_run(actions)162 assert response.workflow["annotation"] == "my cool new annotation"163 actions = [164 {"action_type": "update_license", "license": "AFL-3.0"},165 ]166 response = self._dry_run(actions)167 assert response.workflow["license"] == "AFL-3.0"168 actions = [169 {"action_type": "update_creator", "creator": [{"class": "Person", "name": "Mary"}]},170 ]171 response = self._dry_run(actions)172 creator_list = response.workflow["creator"]173 assert isinstance(creator_list, list)174 creator = creator_list[0]175 assert creator["class"] == "Person"176 assert creator["name"] == "Mary"177 actions = [178 {"action_type": "update_report", "report": {"markdown": "my report..."}}179 ]180 response = self._dry_run(actions)181 assert response.workflow["report"]["markdown"] == "my report..."182 actions = [183 {"action_type": "add_step", "type": "parameter_input", "label": "new_param", "tool_state": {"parameter_type": "text"}, "position": {"left": 10, "top": 50}},184 ]185 response = self._dry_run(actions)186 workflow_dict = response.workflow187 assert _step_with_label(workflow_dict, "new_param")188 actions = [189 {190 "action_type": 
"update_output_label",191 "output": {"label": "first_cat", "output_name": "out_file1"},192 "output_label": "new_wf_out",193 }194 ]195 response = self._dry_run(actions)196 workflow_dict = response.workflow197 first_cat_step = _step_with_label(workflow_dict, "first_cat")198 assert first_cat_step["workflow_outputs"][0]["label"] == "new_wf_out"199 def test_refactoring_legacy_parameters(self):200 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_randomlines_legacy_params")201 self.workflow_populator.create_workflow(wf)202 actions = [203 {"action_type": "extract_untyped_parameter", "name": "seed"},204 {"action_type": "extract_untyped_parameter", "name": "num", "label": "renamed_num"},205 ]206 self._refactor(actions)207 assert self._latest_workflow.step_by_label("seed").tool_inputs["parameter_type"] == "text"208 assert self._latest_workflow.step_by_label("renamed_num").tool_inputs["parameter_type"] == "integer"209 random_lines_state = self._latest_workflow.step_by_index(2).tool_inputs210 assert "num_lines" in random_lines_state211 num_lines = random_lines_state["num_lines"]212 assert isinstance(num_lines, dict)213 assert "__class__" in num_lines214 assert num_lines["__class__"] == 'ConnectedValue'215 assert "seed_source" in random_lines_state216 seed_source = random_lines_state["seed_source"]217 assert isinstance(seed_source, dict)218 assert "seed" in seed_source219 seed = seed_source["seed"]220 assert isinstance(seed, dict)221 assert "__class__" in seed222 assert seed["__class__"] == 'ConnectedValue'223 # cannot handle mixed, incompatible types on the inputs though224 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_randomlines_legacy_params_mixed_types")225 self.workflow_populator.create_workflow(wf)226 actions = [227 {"action_type": "extract_untyped_parameter", "name": "mixed_param"},228 ]229 expected_exception = None230 try:231 self._refactor(actions)232 except Exception as e:233 expected_exception = e234 assert 
expected_exception235 assert "input types" in str(expected_exception)236 def test_refactoring_legacy_parameters_without_tool_state(self):237 # test parameters used in PJA without being used in tool state.238 # These will work fine with the simplified workflow UI, but should probably239 # be formalized and assigned a unique label and informative annotation.240 self.workflow_populator.upload_yaml_workflow("""241class: GalaxyWorkflow242inputs:243 test_input: data244steps:245 first_cat:246 tool_id: cat247 in:248 input1: test_input249 outputs:250 out_file1:251 rename: "${pja_only_param} name"252""")253 actions = [254 {"action_type": "extract_untyped_parameter", "name": "pja_only_param"},255 ]256 self._refactor(actions)257 assert self._latest_workflow.step_by_label("pja_only_param").tool_inputs["parameter_type"] == "text"258 def test_refactoring_legacy_parameters_without_tool_state_dry_run(self):259 # same as above but dry run...260 self.workflow_populator.upload_yaml_workflow("""261class: GalaxyWorkflow262inputs:263 test_input: data264steps:265 first_cat:266 tool_id: cat267 in:268 input1: test_input269 outputs:270 out_file1:271 rename: "${pja_only_param} name"272""")273 actions = [274 {"action_type": "extract_untyped_parameter", "name": "pja_only_param"},275 ]276 response = self._dry_run(actions)277 new_step = _step_with_label(response.workflow, "pja_only_param")278 state_str = new_step["tool_state"]279 state = json.loads(state_str)280 assert state["parameter_type"] == "text"281 def test_refactoring_legacy_parameters_without_tool_state_relabel(self):282 # same thing as above, but apply relabeling and ensure PJA gets updated.283 self.workflow_populator.upload_yaml_workflow("""284class: GalaxyWorkflow285inputs:286 test_input: data287steps:288 first_cat:289 tool_id: cat290 in:291 input1: test_input292 outputs:293 out_file1:294 rename: "${pja_only_param} name"295""")296 actions = [297 {"action_type": "extract_untyped_parameter", "name": "pja_only_param", "label": 
"new_label"},298 ]299 self._refactor(actions)300 assert self._latest_workflow.step_by_label("new_label").tool_inputs["parameter_type"] == "text"301 pjas = self._latest_workflow.step_by_label("first_cat").post_job_actions302 assert len(pjas) == 1303 pja = pjas[0]304 assert "newname" in pja.action_arguments305 assert "${new_label}" in pja.action_arguments["newname"]306 def test_removing_unlabeled_workflow_outputs(self):307 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_randomlines_legacy_params")308 self.workflow_populator.create_workflow(wf)309 only_step = self._latest_workflow.step_by_index(0)310 assert len(only_step.workflow_outputs) == 1311 actions = [312 {"action_type": "remove_unlabeled_workflow_outputs"},313 ]314 self._refactor(actions)315 only_step = self._latest_workflow.step_by_index(0)316 assert len(only_step.workflow_outputs) == 0317 def test_fill_defaults_option(self):318 # this is a prereq for other state filling refactoring tests that319 # would be better in API tests for workflow import options but fill320 # defaults happens automatically on export, so this might only be321 # testable in an integration test currently.322 # populating a workflow with incomplete state...323 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_two_random_lines")324 ts = json.loads(wf["steps"]["0"]["tool_state"])325 del ts["num_lines"]326 wf["steps"]["0"]["tool_state"] = json.dumps(ts)327 self.workflow_populator.create_workflow(wf, fill_defaults=False)328 first_step = self._latest_workflow.step_by_label("random1")329 assert "num_lines" not in first_step.tool_inputs330 self.workflow_populator.create_workflow(wf, fill_defaults=True)331 first_step = self._latest_workflow.step_by_label("random1")332 assert "num_lines" in first_step.tool_inputs333 assert json.loads(first_step.tool_inputs["num_lines"]) == 1334 def test_refactor_works_with_subworkflows(self):335 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE)336 
actions = [337 {"action_type": "update_step_label", "step": {"label": "nested_workflow"}, "label": "new_nested_workflow"},338 ]339 self._refactor(actions)340 self._latest_workflow.step_by_label("new_nested_workflow")341 def test_refactor_works_with_incomplete_state(self):342 # populating a workflow with incomplete state...343 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_two_random_lines")344 ts = json.loads(wf["steps"]["0"]["tool_state"])345 del ts["num_lines"]346 wf["steps"]["0"]["tool_state"] = json.dumps(ts)347 self.workflow_populator.create_workflow(wf, fill_defaults=False)348 assert self._latest_workflow.step_by_index(0).label == "random1"349 actions = [350 {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "random1_new"},351 ]352 self._refactor(actions)353 first_step = self._latest_workflow.step_by_label("random1_new")354 assert "num_lines" not in first_step.tool_inputs355 def test_refactor_works_with_missing_tools(self):356 # populating a workflow with incomplete state...357 wf = self.workflow_populator.load_workflow_from_resource("test_workflow_two_random_lines")358 wf["steps"]["1"]["tool_id"] = "random-missing"359 wf["steps"]["1"]["content_id"] = "random-missing"360 self.workflow_populator.create_workflow(wf, fill_defaults=False)361 assert self._latest_workflow.step_by_index(1).label == "random2"362 assert self._latest_workflow.step_by_index(1).tool_id == "random-missing"363 assert "num_lines" in self._latest_workflow.step_by_index(1).tool_inputs364 actions = [365 {"action_type": "update_step_label", "step": {"order_index": 1}, "label": "random2_new"},366 ]367 self._refactor(actions)368 assert self._latest_workflow.step_by_index(1).label == "random2_new"369 assert "num_lines" in self._latest_workflow.step_by_index(1).tool_inputs370 def test_refactor_fill_step_defaults(self):371 self._load_two_random_lines_wf_with_missing_state()372 actions = [373 {"action_type": "fill_step_defaults", "step": {"order_index": 
0}},374 ]375 action_executions = self._refactor(actions).action_executions376 first_step = self._latest_workflow.step_by_label("random1")377 assert "num_lines" in first_step.tool_inputs378 assert len(action_executions) == 1379 action_execution = action_executions[0]380 assert len(action_execution.messages) == 1381 message = action_execution.messages[0]382 assert message.order_index == 0383 assert message.step_label == "random1"384 assert message.input_name == "num_lines"385 # ensure other step untouched...386 second_step = self._latest_workflow.step_by_label("random2")387 assert "num_lines" not in second_step.tool_inputs388 def test_refactor_fill_step_defaults_dry_run(self):389 self._load_two_random_lines_wf_with_missing_state()390 actions = [391 {"action_type": "fill_step_defaults", "step": {"order_index": 0}},392 ]393 response = self._dry_run(actions)394 action_executions = response.action_executions395 assert len(action_executions) == 1396 action_execution = action_executions[0]397 assert len(action_execution.messages) == 1398 message = action_execution.messages[0]399 assert message.order_index == 0400 assert message.step_label == "random1"401 assert message.input_name == "num_lines"402 # TODO:403 # first_step = self._latest_workflow.step_by_label("random1")404 # assert "num_lines" in first_step.tool_inputs405 # ensure other step untouched...406 # second_step = self._latest_workflow.step_by_label("random2")407 # assert "num_lines" not in second_step.tool_inputs408 def test_refactor_fill_defaults(self):409 self._load_two_random_lines_wf_with_missing_state()410 actions = [411 {"action_type": "fill_defaults"},412 ]413 action_executions = self._refactor(actions).action_executions414 first_step = self._latest_workflow.step_by_label("random1")415 assert "num_lines" in first_step.tool_inputs416 second_step = self._latest_workflow.step_by_label("random2")417 assert "num_lines" in second_step.tool_inputs418 assert len(action_executions) == 1419 action_execution = 
action_executions[0]420 assert len(action_execution.messages) == 2421 message = action_execution.messages[0]422 assert message.order_index == 0423 assert message.step_label == "random1"424 assert message.input_name == "num_lines"425 message = action_execution.messages[1]426 assert message.order_index == 1427 assert message.step_label == "random2"428 assert message.input_name == "num_lines"429 def test_tool_version_upgrade_no_state_change(self):430 self.workflow_populator.upload_yaml_workflow("""431class: GalaxyWorkflow432steps:433 the_step:434 tool_id: multiple_versions435 tool_version: '0.1'436 state:437 inttest: 0438""")439 assert self._latest_workflow.step_by_label("the_step").tool_version == "0.1"440 actions = [441 {"action_type": "upgrade_tool", "step": {"label": "the_step"}},442 ]443 # t = self._app.toolbox.get_tool("multiple_versions", tool_version="0.1")444 # assert t is not None445 # assert t.version == "0.1"446 action_executions = self._refactor(actions).action_executions447 assert len(action_executions) == 1448 assert len(action_executions[0].messages) == 0449 assert self._latest_workflow.step_by_label("the_step").tool_version == "0.2"450 def test_tool_version_upgrade_state_added(self):451 self.workflow_populator.upload_yaml_workflow("""452class: GalaxyWorkflow453steps:454 the_step:455 tool_id: multiple_versions_changes456 tool_version: '0.1'457 state:458 inttest: 0459""")460 assert self._latest_workflow.step_by_label("the_step").tool_version == "0.1"461 actions = [462 {"action_type": "upgrade_tool", "step": {"label": "the_step"}, "tool_version": "0.2"},463 ]464 action_executions = self._refactor(actions).action_executions465 assert self._latest_workflow.step_by_label("the_step").tool_version == "0.2"466 assert len(action_executions) == 1467 messages = action_executions[0].messages468 assert len(messages) == 1469 message = messages[0]470 assert message.message_type == RefactorActionExecutionMessageTypeEnum.tool_state_adjustment471 assert 
message.order_index == 0472 assert message.step_label == "the_step"473 assert message.input_name == "floattest"474 def test_subworkflow_upgrade_simplest(self):475 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE)476 # second oldest workflow will be the nested workflow, grab it and update...477 nested_stored_workflow = self._recent_stored_workflow(2)478 assert len(nested_stored_workflow.workflows) == 1479 self._increment_nested_workflow_version(480 nested_stored_workflow,481 num_lines_from="1",482 num_lines_to="2"483 )484 self._app.model.session.expunge(nested_stored_workflow)485 # ensure subworkflow updated properly...486 nested_stored_workflow = self._recent_stored_workflow(2)487 assert len(nested_stored_workflow.workflows) == 2488 updated_nested_step = nested_stored_workflow.latest_workflow.step_by_label("random_lines")489 assert updated_nested_step.tool_inputs["num_lines"] == "2"490 # we now have an nested workflow with a simple update, download491 # the target workflow and ensure it is pointing at the old version492 pre_upgrade_native = self._download_native(self._most_recent_stored_workflow)493 self._assert_nested_workflow_num_lines_is(pre_upgrade_native, "1")494 actions = [495 {"action_type": "upgrade_subworkflow", "step": {"label": "nested_workflow"}},496 ]497 response = self._dry_run(actions)498 action_executions = response.action_executions499 assert len(action_executions) == 1500 assert len(action_executions[0].messages) == 0501 action_executions = self._refactor(actions).action_executions502 assert len(action_executions) == 1503 assert len(action_executions[0].messages) == 0504 post_upgrade_native = self._download_native(self._most_recent_stored_workflow)505 self._assert_nested_workflow_num_lines_is(post_upgrade_native, "2")506 def test_subworkflow_upgrade_specified(self):507 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE)508 # second oldest workflow will be the nested workflow, grab it and update...509 
nested_stored_workflow = self._recent_stored_workflow(2)510 # create two versions so we can test jumping to the middle one...511 self._increment_nested_workflow_version(512 nested_stored_workflow,513 num_lines_from="1",514 num_lines_to="20"515 )516 self._increment_nested_workflow_version(517 nested_stored_workflow,518 num_lines_from="20",519 num_lines_to="30"520 )521 self._app.model.session.expunge(nested_stored_workflow)522 # ensure subworkflow updated properly...523 nested_stored_workflow = self._recent_stored_workflow(2)524 assert len(nested_stored_workflow.workflows) == 3525 middle_workflow_id = self._app.security.encode_id(nested_stored_workflow.workflows[1].id)526 actions = [527 {"action_type": "upgrade_subworkflow", "step": {"label": "nested_workflow"}, "content_id": middle_workflow_id},528 ]529 action_executions = self._dry_run(actions).action_executions530 assert len(action_executions) == 1531 assert len(action_executions[0].messages) == 0532 action_executions = self._refactor(actions).action_executions533 assert len(action_executions) == 1534 assert len(action_executions[0].messages) == 0535 post_upgrade_native = self._download_native(self._most_recent_stored_workflow)536 self._assert_nested_workflow_num_lines_is(post_upgrade_native, "20")537 def test_subworkflow_upgrade_connection_input_dropped(self):538 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE)539 nested_stored_workflow = self._recent_stored_workflow(2)540 actions = [541 {"action_type": "update_step_label", "step": {"label": "inner_input"}, "label": "renamed_inner_input"},542 ]543 self._refactor(actions, stored_workflow=nested_stored_workflow)544 actions = [545 {"action_type": "upgrade_subworkflow", "step": {"label": "nested_workflow"}},546 ]547 action_executions = self._refactor(actions).action_executions548 native_dict = self._download_native()549 nested_step = _step_with_label(native_dict, "nested_workflow")550 # order_index of subworkflow shifts down from "2" because it 
has no551 # inbound inputs552 assert nested_step["subworkflow"]["steps"]["0"]["label"] == "renamed_inner_input"553 assert len(action_executions) == 1554 messages = action_executions[0].messages555 assert len(messages) == 1556 message = messages[0]557 assert message.message_type == RefactorActionExecutionMessageTypeEnum.connection_drop_forced558 assert message.order_index == 2559 assert message.step_label == "nested_workflow"560 assert message.input_name == "inner_input"561 assert message.from_step_label == "first_cat"562 assert message.from_order_index == 1563 assert message.output_name == "out_file1"564 def test_subworkflow_upgrade_connection_output_dropped(self):565 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE)566 nested_stored_workflow = self._recent_stored_workflow(2)567 actions = [568 {569 "action_type": "update_output_label",570 "output": {"label": "random_lines", "output_name": "out_file1"},571 "output_label": "renamed_output",572 }573 ]574 self._refactor(actions, stored_workflow=nested_stored_workflow)575 actions = [576 {"action_type": "upgrade_subworkflow", "step": {"label": "nested_workflow"}},577 ]578 action_executions = self._refactor(actions).action_executions579 assert len(action_executions) == 1580 messages = action_executions[0].messages581 # it was connected to two inputs on second_cat step582 assert len(messages) == 2583 for message in messages:584 assert message.message_type == RefactorActionExecutionMessageTypeEnum.connection_drop_forced585 assert message.order_index == 3586 assert message.step_label == "second_cat"587 assert message.input_name in ["input1", "queries_0|input2"]588 assert message.from_step_label == "nested_workflow"589 assert message.from_order_index == 2590 assert message.output_name == "workflow_output"591 def test_subworkflow_upgrade_output_label_dropped(self):592 self.workflow_populator.upload_yaml_workflow(WORKFLOW_NESTED_RUNTIME_PARAMETER)593 nested_stored_workflow = self._recent_stored_workflow(2)594 
actions = [595 {596 "action_type": "update_output_label",597 "output": {"label": "random_lines", "output_name": "out_file1"},598 "output_label": "renamed_output",599 }600 ]601 self._refactor(actions, stored_workflow=nested_stored_workflow)602 actions = [603 {"action_type": "upgrade_subworkflow", "step": {"label": "nested_workflow"}},604 ]605 action_executions = self._refactor(actions).action_executions606 assert len(action_executions) == 1607 messages = action_executions[0].messages608 assert len(messages) == 1609 message = messages[0]610 assert message.message_type == RefactorActionExecutionMessageTypeEnum.workflow_output_drop_forced611 assert message.order_index == 1612 assert message.step_label == "nested_workflow"613 assert message.output_name == "workflow_output"614 assert message.output_label == "outer_output"615 def _download_native(self, workflow=None):616 workflow = workflow or self._most_recent_stored_workflow617 workflow_id = self._app.security.encode_id(workflow.id)618 return self.workflow_populator.download_workflow(workflow_id)619 @contextlib.contextmanager620 def _export_for_update(self, workflow):621 workflow_id = self._app.security.encode_id(workflow.id)622 with self.workflow_populator.export_for_update(workflow_id) as workflow_object:623 yield workflow_object624 def _refactor(self, actions, stored_workflow=None, dry_run=False, style="ga"):625 user = self._app.model.session.query(User).order_by(User.id.desc()).limit(1).one()626 mock_trans = MockTrans(self._app, user)627 app = self._app628 original_url_for = app.url_for629 def url_for(*args, **kwd):630 return ''631 app.url_for = url_for632 try:633 return self._manager.refactor(634 mock_trans,635 stored_workflow or self._most_recent_stored_workflow,636 RefactorRequest(637 actions=actions, dry_run=dry_run, style=style638 )639 )640 finally:641 app = url_for = original_url_for642 def _dry_run(self, actions, stored_workflow=None):643 # Do a bunch of checks to ensure nothing workflow related was written 
to the database644 # or even added to the sa_session.645 sa_session = self._app.model.session646 sa_session.flush()647 sw_update_time = self._model_last_time(StoredWorkflow)648 assert sw_update_time649 w_update_time = self._model_last_time(Workflow)650 assert w_update_time651 ws_last_id = self._model_last_id(WorkflowStep)652 assert ws_last_id653 wsc_last_id = self._model_last_id(WorkflowStepConnection)654 pja_last_id = self._model_last_id(PostJobAction)655 pjaa_last_id = self._model_last_id(PostJobActionAssociation)656 wo_last_id = self._model_last_id(WorkflowOutput)657 response = self._refactor(actions, stored_workflow=stored_workflow, dry_run=True)658 sa_session.flush()659 sa_session.expunge_all()660 assert sw_update_time == self._model_last_time(StoredWorkflow)661 assert w_update_time == self._model_last_time(Workflow)662 assert ws_last_id == self._model_last_id(WorkflowStep)663 assert wsc_last_id == self._model_last_id(WorkflowStepConnection)664 assert pja_last_id == self._model_last_id(PostJobAction)665 assert pjaa_last_id == self._model_last_id(PostJobActionAssociation)666 assert wo_last_id == self._model_last_id(WorkflowOutput)667 return response668 def _model_last_time(self, clazz):669 obj = self._app.model.session.query(clazz).order_by(clazz.update_time.desc()).limit(1).one()670 return obj.update_time671 def _model_last_id(self, clazz):...
excel_to_json.py
Source:excel_to_json.py
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 10 12:11:28 2022
@author: mascherbauer

One-off conversion script: reads Excel input tables and writes them out as
JSON (``orient="table"``). Only the hot-water profile and the electricity
price profiles are processed by the active code; earlier conversion steps
are kept below as commented-out code.
"""
import pandas as pd
import numpy as np
import pyodbc
from pathlib import Path

# --- disabled: NUTS region tables ------------------------------------------
# path_to_file = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\inputdata\NUTS2021.xlsx"
# NUTS_codes = pd.read_excel(path_to_file,
#                            sheet_name="NUTS & SR 2021",
#                            engine="openpyxl"
#                            )["Code 2021"].dropna().to_numpy()
# NUTS1 = []
# NUTS2 = []
# NUTS3 = []
# # iterate through NUTS code and create tables for NUTS1, NUTS2 and NUTS3
# for code in NUTS_codes:
#     if "Z" in code:  # drop extra nuts regions
#         continue
#     elif len(code) == 2:
#         continue
#     elif len(code) == 3:  # NUTS1
#         NUTS1.append(code)
#     elif len(code) == 4:  # NUTS2
#         NUTS2.append(code)
#     elif len(code) == 5:  # NUTS3
#         NUTS3.append(code)
#
# nuts1_country_column = [nuts[:2] for nuts in NUTS1]
# nuts2_country_column = [nuts[:2] for nuts in NUTS2]
# nuts3_country_column = [nuts[:2] for nuts in NUTS3]
#
# nuts1_frame = pd.DataFrame(np.column_stack([nuts1_country_column, NUTS1]), columns=["country", "nuts_id"])
# nuts2_frame = pd.DataFrame(np.column_stack([nuts2_country_column, NUTS2]), columns=["country", "nuts_id"])
# nuts3_frame = pd.DataFrame(np.column_stack([nuts3_country_column, NUTS3]), columns=["country", "nuts_id"])
# # frames to json:
# nuts1_frame.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\NUTS1.json", orient="table")
# nuts2_frame.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\NUTS2.json", orient="table")
# nuts3_frame.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\NUTS3.json", orient="table")

# --- disabled: heat demand and residential floor area ----------------------
# path_to_file_heat_demand = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\inputdata\heat_demand_nuts_3.csv"
# heat_demand_df = pd.read_csv(path_to_file_heat_demand)
# heat_demand_df.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\heat_demand.json", orient="table")
# path_to_floor_area = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\inputdata\ground_floor_area_residential_nuts_3.csv"
# ground_floor_area = pd.read_csv(path_to_floor_area, sep=";")
# ground_floor_area = ground_floor_area.loc[:, ["nuts_id", "sum"]]
# ground_floor_area.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\residential_floor_area.json", orient="table")
#
#
# --- disabled: synthetic household load profiles ---------------------------
# #%%
# path_to_synth_load = Path(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\inputdata\synthetic_profiles")
# synth_load_df = pd.DataFrame()
# for child in path_to_synth_load.iterdir():
#     for file in child.iterdir():
#         if file.suffix == ".xlsx":
#             df = pd.read_excel(file, sheet_name="Profile").set_index("Unnamed: 0")
#             household_profile = pd.to_numeric(df.loc[:, "H0"].drop("ts [UTC]"))
#             household_profile.index = pd.to_datetime(household_profile.index)
#             household_profile = household_profile.resample("1H").sum()
#             if len(household_profile) == 8785:  # leap year (drop 29th february)
#                 household_profile = household_profile[~((household_profile.index.month == 2) &
#                                                         (household_profile.index.day == 29))]
#             household_profile = household_profile.reset_index(drop=True).drop(0)  # drop first row (23:00-24:00 last year)
#             household_profile = household_profile.to_numpy() * 1_000  # from kWh in Wh
#             year = file.stem[-4:]
#             synth_load_df[year] = household_profile
#
# # save file to json
# synth_load_df.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\synthetic_load_household.json", orient="table")
#

# --- hot water demand profile ----------------------------------------------
hot_water_path = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Philipp\inputdata\AUT\Hot_water_profile.xlsx"
# Only the "Profile" column is kept, scaled by 1000. The JSON export below is
# disabled, so the result is currently not written anywhere.
hot_water = pd.read_excel(Path(hot_water_path), engine="openpyxl")["Profile"] * 1_000
# hot_water.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\hot_water_demand.json", orient="table")

# --- electricity price profiles --------------------------------------------
path_to_prices = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Philipp\inputdata\Variable_prices\el_prices_2030_AT.xlsx"
# Keep the three columns labelled 53, 106 and 211, drop the row with index
# label 0, and turn the integer column labels into strings.
prices = (
    pd.read_excel(path_to_prices, sheet_name="Tabelle1")
    .loc[:, [53, 106, 211]]
    .drop(0)
    .rename(columns={53: "53", 106: "106", 211: "211"})
)
# add first day as last day as one day is missing
# (label-based slice: rows up to and including index label 24)
prices = pd.concat([prices, prices.loc[:24, :]], axis=0).reset_index(drop=True)
prices = prices.apply(pd.to_numeric)
# from EUR/MWh into cent/Wh
prices = prices * 100 / 1_000 / 1_000
prices.to_json(r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\data\price_profiles.json", orient="table")

# --- disabled: read synthetic load from an Access database -----------------
# path_to_synth_load = r"C:\Users\mascherbauer\PycharmProjects\NewTrends\Prosumager\_Refactor\data\inputdata\synthetic_profiles\synthload2017\SynthLoad2017.mdb"
# driver = "{Microsoft Access Driver (*.mdb)}"
# PWD = ""
# con = pyodbc.connect('DRIVER={};DBQ={};PWD={}'.format(driver,path_to_synth_load,PWD))
# cur = con.cursor()
# SQL = 'SELECT * FROM all_tables;'
# ... (remainder of the file is not shown in this excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!