Best Python code snippet using Kiwi_python
example_workflows.py
Source:example_workflows.py
1# Licensed to the Apache Software Foundation (ASF) under one2# or more contributor license agreements. See the NOTICE file3# distributed with this work for additional information4# regarding copyright ownership. The ASF licenses this file5# to you under the Apache License, Version 2.0 (the6# "License"); you may not use this file except in compliance7# with the License. You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing,12# software distributed under the License is distributed on an13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY14# KIND, either express or implied. See the License for the15# specific language governing permissions and limitations16# under the License.17import os18from datetime import datetime19from google.protobuf.field_mask_pb2 import FieldMask20from airflow import DAG21from airflow.providers.google.cloud.operators.workflows import (22 WorkflowsCancelExecutionOperator,23 WorkflowsCreateExecutionOperator,24 WorkflowsCreateWorkflowOperator,25 WorkflowsDeleteWorkflowOperator,26 WorkflowsGetExecutionOperator,27 WorkflowsGetWorkflowOperator,28 WorkflowsListExecutionsOperator,29 WorkflowsListWorkflowsOperator,30 WorkflowsUpdateWorkflowOperator,31)32from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor33LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")34PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")35WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")36# [START how_to_define_workflow]37WORKFLOW_CONTENT = """38- getCurrentTime:39 call: http.get40 args:41 url: https://us-central1-workflowsample.cloudfunctions.net/datetime42 result: currentTime43- readWikipedia:44 call: http.get45 args:46 url: https://en.wikipedia.org/w/api.php47 query:48 action: opensearch49 search: ${currentTime.body.dayOfTheWeek}50 result: wikiResult51- returnResult:52 return: ${wikiResult.body[1]}53"""54WORKFLOW = {55 "description": "Test workflow",56 "labels": {"airflow-version": "dev"},57 "source_contents": WORKFLOW_CONTENT,58}59# [END how_to_define_workflow]60EXECUTION = {"argument": ""}61SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")62SLEEP_WORKFLOW_CONTENT = """63- someSleep:64 call: sys.sleep65 args:66 seconds: 12067"""68SLEEP_WORKFLOW = {69 "description": "Test workflow",70 "labels": {"airflow-version": "dev"},71 "source_contents": SLEEP_WORKFLOW_CONTENT,72}73with DAG(74 "example_cloud_workflows",75 schedule_interval='@once',76 start_date=datetime(2021, 1, 1),77 catchup=False,78) as dag:79 # [START how_to_create_workflow]80 create_workflow = WorkflowsCreateWorkflowOperator(81 task_id="create_workflow",82 location=LOCATION,83 project_id=PROJECT_ID,84 workflow=WORKFLOW,85 workflow_id=WORKFLOW_ID,86 )87 # [END how_to_create_workflow]88 # [START how_to_update_workflow]89 update_workflows = WorkflowsUpdateWorkflowOperator(90 task_id="update_workflows",91 location=LOCATION,92 project_id=PROJECT_ID,93 workflow_id=WORKFLOW_ID,94 update_mask=FieldMask(paths=["name", "description"]),95 )96 # [END how_to_update_workflow]97 # [START how_to_get_workflow]98 get_workflow = WorkflowsGetWorkflowOperator(99 task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID100 )101 # [END how_to_get_workflow]102 # [START how_to_list_workflows]103 list_workflows = WorkflowsListWorkflowsOperator(104 task_id="list_workflows",105 location=LOCATION,106 project_id=PROJECT_ID,107 )108 # [END how_to_list_workflows]109 # [START how_to_delete_workflow]110 delete_workflow = WorkflowsDeleteWorkflowOperator(111 task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID112 )113 # [END how_to_delete_workflow]114 # [START how_to_create_execution]115 create_execution = WorkflowsCreateExecutionOperator(116 task_id="create_execution",117 location=LOCATION,118 project_id=PROJECT_ID,119 execution=EXECUTION,120 workflow_id=WORKFLOW_ID,121 )122 # [END how_to_create_execution]123 create_execution_id = create_execution.output["execution_id"]124 # [START how_to_wait_for_execution]125 wait_for_execution = WorkflowExecutionSensor(126 task_id="wait_for_execution",127 location=LOCATION,128 project_id=PROJECT_ID,129 workflow_id=WORKFLOW_ID,130 execution_id=create_execution_id,131 )132 # [END how_to_wait_for_execution]133 # [START how_to_get_execution]134 get_execution = WorkflowsGetExecutionOperator(135 task_id="get_execution",136 location=LOCATION,137 project_id=PROJECT_ID,138 workflow_id=WORKFLOW_ID,139 execution_id=create_execution_id,140 )141 # [END how_to_get_execution]142 # [START how_to_list_executions]143 list_executions = WorkflowsListExecutionsOperator(144 task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID145 )146 # [END how_to_list_executions]147 create_workflow_for_cancel = WorkflowsCreateWorkflowOperator(148 task_id="create_workflow_for_cancel",149 location=LOCATION,150 project_id=PROJECT_ID,151 workflow=SLEEP_WORKFLOW,152 workflow_id=SLEEP_WORKFLOW_ID,153 )154 create_execution_for_cancel = WorkflowsCreateExecutionOperator(155 task_id="create_execution_for_cancel",156 location=LOCATION,157 project_id=PROJECT_ID,158 execution=EXECUTION,159 workflow_id=SLEEP_WORKFLOW_ID,160 )161 # [START how_to_cancel_execution]162 cancel_execution = WorkflowsCancelExecutionOperator(163 task_id="cancel_execution",164 location=LOCATION,165 project_id=PROJECT_ID,166 workflow_id=SLEEP_WORKFLOW_ID,167 execution_id=create_execution_id,168 )169 # [END how_to_cancel_execution]170 create_workflow >> update_workflows >> [get_workflow, list_workflows]171 update_workflows >> [create_execution, create_execution_for_cancel]172 wait_for_execution >> [get_execution, list_executions]173 create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution174 [cancel_execution, list_executions] >> delete_workflow175 # Task dependencies created via `XComArgs`:176 # create_execution >> wait_for_execution177 # create_execution >> get_execution178 # create_execution >> cancel_execution179if __name__ == '__main__':180 dag.clear(dag_run_state=None)...
error_handling_test.py
Source:error_handling_test.py
...26 self.were_errors_sent = True27 self.errors_sent = {error.execution: error.error_message for error in errors}28 self.destination_type = destination_type29# ErrorHandler tests30def create_execution(source_name, destination_name):31 account_config = AccountConfig('', False, '', '', '')32 source = Source(source_name, SourceType.BIG_QUERY, ['', ''])33 destination = Destination(destination_name, DestinationType.ADS_OFFLINE_CONVERSION, [''])34 return Execution(account_config, source, destination)35def test_single_error_per_execution():36 error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, MockErrorNotifier())37 first_execution = create_execution('source1', 'destination1')38 second_execution = create_execution('source1', 'destination2')39 error_handler.add_error(first_execution, 'Error for first execution, fist input')40 error_handler.add_error(first_execution, 'Error for first execution, second input')41 error_handler.add_error(second_execution, 'Error for second execution, fist input')42 returned_errors = error_handler.errors43 assert len(returned_errors) == 244 assert returned_errors.keys() == {first_execution, second_execution}45def test_destination_type_consistency():46 error_handler = ErrorHandler(DestinationType.CM_OFFLINE_CONVERSION, MockErrorNotifier())47 wrong_destination_type_execution = create_execution('source', 'destination')48 with pytest.raises(ValueError):49 error_handler.add_error(wrong_destination_type_execution, 'error message')50def test_errors_sent_to_error_notifier():51 mock_notifier = MockErrorNotifier()52 error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, mock_notifier)53 first_execution = create_execution('source1', 'destination1')54 second_execution = create_execution('source1', 'destination2')55 error_handler.add_error(first_execution, 'Error for first execution, fist input')56 error_handler.add_error(second_execution, 'Error for second execution, fist input')57 error_handler.notify_errors()58 assert mock_notifier.were_errors_sent is True59 assert mock_notifier.errors_sent == {first_execution: 'Error for first execution, fist input',60 second_execution: 'Error for second execution, fist input'}61 assert mock_notifier.destination_type == DestinationType.ADS_OFFLINE_CONVERSION62def test_should_not_notify_when_empty_errors():63 mock_notifier = MockErrorNotifier()64 error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, mock_notifier)65 error_handler.notify_errors()66 assert mock_notifier.were_errors_sent is False67# GmailNotifier tests68def test_multiple_destinations_separated_by_comma():69 first_email = 'a@a.com'70 second_email = 'b@b.com'71 third_email = 'c@c.com'72 credentials = OAuthCredentials('', '', '', '')73 gmail_notifier = GmailNotifier(StaticValueProvider(str, 'true'), credentials,74 StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))75 emails = set(gmail_notifier.email_destinations)76 assert len(emails) == 377 assert set(emails) == {first_email, third_email, second_email}78def test_should_not_notify_when_param_is_false():79 first_email = 'a@a.com'80 second_email = 'b@b.com'81 third_email = 'c@c.com'82 credentials = OAuthCredentials('', '', '', '')83 gmail_notifier = GmailNotifier(StaticValueProvider(str, 'false'), credentials,84 StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))85 gmail_notifier.notify(DestinationType.ADS_OFFLINE_CONVERSION, [Error(create_execution('s', 'd'), 'error message')])86def test_should_not_notify_when_param_is_empty():87 first_email = 'a@a.com'88 second_email = 'b@b.com'89 third_email = 'c@c.com'90 credentials = OAuthCredentials('', '', '', '')91 gmail_notifier = GmailNotifier(StaticValueProvider(str, None), credentials,92 StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))...
test_trade.py
Source:test_trade.py
...14 assert trade.entry_price == 41015 assert trade.exit_price == 46016 assert trade.net_profit == 500017 # close the trade18 trade.executions.append(create_execution(ExecutionType.SELL, 300, 480, datetime(2017, 11, 29, 23, 55, 59, 0)))19 trade.calculate_trade_details()20 assert trade.total_shares == 40021 assert trade.open_shares == 022 assert trade.started_at == datetime(2017, 11, 20, 23, 55, 59, 0)23 assert trade.closed_at == datetime(2017, 11, 29, 23, 55, 59, 0)24 assert trade.status == TradeStatus.WIN25 assert trade.commissions == 026 assert trade.entry_price == 41027 assert trade.exit_price == 47528 assert trade.net_profit == 26000.029 assert trade.rmultiple == 65/3030def create_test_trade() -> Trade:31 trade: Trade = Trade()32 trade.ticker = "TSLA"33 trade.type = TradeType.LONG34 trade.stop_price = 38035 executions: List[Execution] = []36 executions.append(create_execution(ExecutionType.BUY, 200, 400, datetime(2017, 11, 28, 23, 55, 59, 0)))37 executions.append(create_execution(ExecutionType.BUY, 200, 420, datetime(2017, 11, 20, 23, 55, 59, 0)))38 executions.append(create_execution(ExecutionType.SELL, 100, 460, datetime(2017, 11, 28, 23, 55, 59, 0)))39 trade.executions = executions40 print(str(trade))41 return trade42 43def create_execution(type: ExecutionType, shares: int, price: float, execution_time: datetime) -> Execution:44 execution: Execution = Execution()45 execution.shares = shares46 execution.type = type47 execution.price = price48 execution.executed_at = execution_time...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!