Best Python code snippet using localstack_python
stepfunctions_basics.py
Source:stepfunctions_basics.py
1# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.2# SPDX-License-Identifier: Apache-2.03"""4Purpose5Shows how to use the AWS SDK for Python (Boto3) with AWS Step Functions to6create and run state machines.7"""8import json9import logging10from botocore.exceptions import ClientError11logger = logging.getLogger(__name__)12class StepFunctionsStateMachine:13 """Encapsulates Step Functions state machine functions."""14 def __init__(self, stepfunctions_client):15 """16 :param stepfunctions_client: A Boto3 Step Functions client.17 """18 self.stepfunctions_client = stepfunctions_client19 self.state_machine_name = None20 self.state_machine_arn = None21 def _clear(self):22 """23 Clears the object of its instance data.24 """25 self.state_machine_name = None26 self.state_machine_arn = None27 def create(self, name, definition, role_arn):28 """29 Creates a new state machine.30 :param name: The name of the new state machine.31 :param definition: A dict that contains all of the state and flow control32 information. The dict is translated to JSON before it is33 uploaded.34 :param role_arn: A role that grants Step Functions permission to access any35 AWS services that are specified in the definition.36 :return: The Amazon Resource Name (ARN) of the new state machine.37 """38 try:39 response = self.stepfunctions_client.create_state_machine(40 name=name, definition=json.dumps(definition), roleArn=role_arn)41 self.state_machine_name = name42 self.state_machine_arn = response['stateMachineArn']43 logger.info(44 "Created state machine %s. ARN is %s.", name, self.state_machine_arn)45 except ClientError:46 logger.exception("Couldn't create state machine %s.", name)47 raise48 else:49 return self.state_machine_arn50 def update(self, definition, role_arn=None):51 """52 Updates an existing state machine. Any runs currently operating do not update53 until they are stopped.54 :param definition: A dict that contains all of the state and flow control55 information for the state machine. This completely replaces56 the existing definition.57 :param role_arn: A role that grants Step Functions permission to access any58 AWS services that are specified in the definition.59 """60 if self.state_machine_arn is None:61 raise ValueError62 try:63 kwargs = {64 'stateMachineArn': self.state_machine_arn,65 'definition': json.dumps(definition)}66 if role_arn is not None:67 kwargs['roleArn'] = role_arn68 self.stepfunctions_client.update_state_machine(**kwargs)69 logger.info("Updated state machine %s.", self.state_machine_name)70 except ClientError:71 logger.exception(72 "Couldn't update state machine %s.", self.state_machine_name)73 raise74 def delete(self):75 """76 Deletes a state machine and all associated run information.77 """78 if self.state_machine_arn is None:79 raise ValueError80 try:81 self.stepfunctions_client.delete_state_machine(82 stateMachineArn=self.state_machine_arn)83 logger.info("Deleted state machine %s.", self.state_machine_name)84 self._clear()85 except ClientError:86 logger.exception(87 "Couldn't delete state machine %s.", self.state_machine_name)88 raise89 def find(self, state_machine_name):90 """91 Finds a state machine by name. This function iterates the state machines for92 the current account until it finds a match and returns the first matching93 state machine.94 :param state_machine_name: The name of the state machine to find.95 :return: The ARN of the named state machine when found; otherwise, None.96 """97 self._clear()98 try:99 paginator = self.stepfunctions_client.get_paginator('list_state_machines')100 for page in paginator.paginate():101 for machine in page['stateMachines']:102 if machine['name'] == state_machine_name:103 self.state_machine_name = state_machine_name104 self.state_machine_arn = machine['stateMachineArn']105 break106 if self.state_machine_arn is not None:107 break108 if self.state_machine_arn is not None:109 logger.info(110 "Found state machine %s with ARN %s.", self.state_machine_name,111 self.state_machine_arn)112 else:113 logger.info("Couldn't find state machine %s.", state_machine_name)114 except ClientError:115 logger.exception("Couldn't find state machine %s.", state_machine_name)116 raise117 else:118 return self.state_machine_arn119 def describe(self):120 """121 Gets metadata about a state machine.122 :return: The metadata about the state machine.123 """124 if self.state_machine_arn is None:125 raise ValueError126 try:127 response = self.stepfunctions_client.describe_state_machine(128 stateMachineArn=self.state_machine_arn)129 logger.info("Got metadata for state machine %s.", self.state_machine_name)130 except ClientError:131 logger.exception(132 "Couldn't get metadata for state machine %s.", self.state_machine_name)133 raise134 else:135 return response136 def start_run(self, run_name, run_input=None):137 """138 Starts a run with the current state definition.139 :param run_name: The name of the run. This name must be unique for all runs140 for the state machine.141 :param run_input: Data that is passed as input to the run.142 :return: The ARN of the run.143 """144 if self.state_machine_arn is None:145 raise ValueError146 try:147 kwargs = {'stateMachineArn': self.state_machine_arn, 'name': run_name}148 if run_input is not None:149 kwargs['input'] = json.dumps(run_input)150 response = self.stepfunctions_client.start_execution(**kwargs)151 run_arn = response['executionArn']152 logger.info("Started run %s. ARN is %s.", run_name, run_arn)153 except ClientError:154 logger.exception("Couldn't start run %s.", run_name)155 raise156 else:157 return run_arn158 def list_runs(self, run_status=None):159 """160 Lists the runs for the state machine.161 :param run_status: When specified, only lists runs that have the specified162 status. Otherwise, all runs are listed.163 :return: The list of runs.164 """165 if self.state_machine_arn is None:166 raise ValueError167 try:168 kwargs = {'stateMachineArn': self.state_machine_arn}169 if run_status is not None:170 kwargs['statusFilter'] = run_status171 response = self.stepfunctions_client.list_executions(**kwargs)172 runs = response['executions']173 logger.info(174 "Got %s runs for state machine %s.", len(runs), self.state_machine_name)175 except ClientError:176 logger.exception(177 "Couldn't get runs for state machine %s.", self.state_machine_name)178 raise179 else:180 return runs181 def stop_run(self, run_arn, cause):182 """183 Stops a run.184 :param run_arn: The run to stop.185 :param cause: A description of why the run was stopped.186 """187 try:188 self.stepfunctions_client.stop_execution(executionArn=run_arn, cause=cause)189 logger.info("Stopping run %s.", run_arn)190 except ClientError:191 logger.exception("Couldn't stop run %s.", run_arn)...
stepfunctions_stubber.py
Source:stepfunctions_stubber.py
1# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.2# SPDX-License-Identifier: Apache-2.03"""4Stub functions that are used by the AWS Step Functions unit tests.5"""6from datetime import datetime7import json8from test_tools.example_stubber import ExampleStubber9class StepFunctionsStubber(ExampleStubber):10 """11 A class that implements stub functions used by Amazon Step Functions unit tests.12 The stubbed functions expect certain parameters to be passed to them as13 part of the tests, and raise errors if the parameters are not as expected.14 """15 def __init__(self, client, use_stubs=True):16 """17 Initializes the object with a specific client and configures it for18 stubbing or AWS passthrough.19 :param client: A Boto3 Step Functions client.20 :param use_stubs: When True, use stubs to intercept requests. Otherwise,21 pass requests through to AWS.22 """23 super().__init__(client, use_stubs)24 def stub_create_state_machine(25 self, name, definition, role_arn, state_machine_arn, error_code=None):26 expected_params = {27 'name': name, 'definition': json.dumps(definition), 'roleArn': role_arn}28 response = {29 'stateMachineArn': state_machine_arn, 'creationDate': datetime.now()}30 self._stub_bifurcator(31 'create_state_machine', expected_params, response, error_code=error_code)32 def stub_update_state_machine(33 self, state_machine_arn, definition, role_arn=None, error_code=None):34 expected_params = {35 'stateMachineArn': state_machine_arn, 'definition': json.dumps(definition)}36 if role_arn is not None:37 expected_params['roleArn'] = role_arn38 response = {'updateDate': datetime.now()}39 self._stub_bifurcator(40 'update_state_machine', expected_params, response, error_code=error_code)41 def stub_delete_state_machine(self, state_machine_arn, error_code=None):42 expected_params = {'stateMachineArn': state_machine_arn}43 response = {}44 self._stub_bifurcator(45 'delete_state_machine', expected_params, response, error_code=error_code)46 def stub_list_state_machines(self, state_machines, error_code=None):47 expected_params = {}48 response = {'stateMachines': [49 {**sm, 'type': 'STANDARD', 'creationDate': datetime.now()}50 for sm in state_machines]}51 self._stub_bifurcator(52 'list_state_machines', expected_params, response, error_code=error_code)53 def stub_describe_state_machine(54 self, state_machine_arn, name, definition, role_arn, error_code=None):55 expected_params = {'stateMachineArn': state_machine_arn}56 response = {57 'name': name, 'definition': definition, 'roleArn': role_arn,58 'stateMachineArn': state_machine_arn, 'type': 'STANDARD',59 'creationDate': datetime.now()}60 self._stub_bifurcator(61 'describe_state_machine', expected_params, response, error_code=error_code)62 def stub_start_execution(63 self, state_machine_arn, run_name, run_arn, run_input=None,64 error_code=None):65 expected_params = {'stateMachineArn': state_machine_arn, 'name': run_name}66 if run_input is not None:67 expected_params['input'] = json.dumps(run_input)68 response = {'executionArn': run_arn, 'startDate': datetime.now()}69 self._stub_bifurcator(70 'start_execution', expected_params, response, error_code=error_code)71 def stub_list_executions(72 self, state_machine_arn, runs, run_status=None, error_code=None):73 expected_params = {'stateMachineArn': state_machine_arn}74 if run_status is not None:75 expected_params['statusFilter'] = run_status76 response = {'executions': [77 {**run, 'stateMachineArn': state_machine_arn,78 'status': run_status if run_status is not None else 'RUNNING',79 'startDate': datetime.now()} for run in runs]}80 self._stub_bifurcator(81 'list_executions', expected_params, response, error_code=error_code)82 def stub_stop_execution(self, run_arn, cause, error_code=None):83 expected_params = {'executionArn': run_arn, 'cause': cause}84 response = {'stopDate': datetime.now()}85 self._stub_bifurcator(...
step_function_start_execution.py
Source:step_function_start_execution.py
1# Licensed to the Apache Software Foundation (ASF) under one2# or more contributor license agreements. See the NOTICE file3# distributed with this work for additional information4# regarding copyright ownership. The ASF licenses this file5# to you under the Apache License, Version 2.0 (the6# "License"); you may not use this file except in compliance7# with the License. You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing,12# software distributed under the License is distributed on an13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY14# KIND, either express or implied. See the License for the15# specific language governing permissions and limitations16# under the License.17from typing import Optional, Union18from airflow.exceptions import AirflowException19from airflow.models import BaseOperator20from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook21from airflow.utils.decorators import apply_defaults22class StepFunctionStartExecutionOperator(BaseOperator):23 """24 An Operator that begins execution of an Step Function State Machine25 Additional arguments may be specified and are passed down to the underlying BaseOperator.26 .. seealso::27 :class:`~airflow.models.BaseOperator`28 :param state_machine_arn: ARN of the Step Function State Machine29 :type state_machine_arn: str30 :param name: The name of the execution.31 :type name: Optional[str]32 :param state_machine_input: JSON data input to pass to the State Machine33 :type state_machine_input: Union[Dict[str, any], str, None]34 :param aws_conn_id: aws connection to uses35 :type aws_conn_id: str36 :param do_xcom_push: if True, execution_arn is pushed to XCom with key execution_arn.37 :type do_xcom_push: bool38 """39 template_fields = ['state_machine_arn', 'name', 'input']40 template_ext = ()41 ui_color = '#f9c915'42 @apply_defaults43 def __init__(44 self,45 *,46 state_machine_arn: str,47 name: Optional[str] = None,48 state_machine_input: Union[dict, str, None] = None,49 aws_conn_id='aws_default',50 region_name=None,51 **kwargs,52 ):53 super().__init__(**kwargs)54 self.state_machine_arn = state_machine_arn55 self.name = name56 self.input = state_machine_input57 self.aws_conn_id = aws_conn_id58 self.region_name = region_name59 def execute(self, context):60 hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)61 execution_arn = hook.start_execution(self.state_machine_arn, self.name, self.input)62 if execution_arn is None:63 raise AirflowException(f'Failed to start State Machine execution for: {self.state_machine_arn}')64 self.log.info('Started State Machine execution for %s: %s', self.state_machine_arn, execution_arn)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!