Best Python code snippet using localstack_python
test_stepfunctions.py
Source: test_stepfunctions.py
...244 result = self.sfn_client.start_execution(245 stateMachineArn=sm_arn, input=json.dumps(test_input)246 )247 self.assertTrue(result.get("executionArn"))248 def check_invocations():249 self.assertIn(lambda_arn_3, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)250 # assert that the result is correct251 result = self._get_execution_results(sm_arn)252 self.assertEqual(test_output, result)253 # assert that the lambda has been invoked by the SM execution254 retry(check_invocations, sleep=1, retries=10)255 # clean up256 self.cleanup(sm_arn, state_machines_before)257 def test_create_run_state_machine(self):258 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]259 # create state machine260 role_arn = aws_stack.role_arn("sfn_role")261 definition = clone(STATE_MACHINE_BASIC)262 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)263 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)264 definition["States"]["step1"]["Resource"] = lambda_arn_1265 definition["States"]["step2"]["Resource"] = lambda_arn_2266 definition = json.dumps(definition)267 sm_name = "basic-%s" % short_uid()268 result = self.sfn_client.create_state_machine(269 name=sm_name, definition=definition, roleArn=role_arn270 )271 # assert that the SM has been created272 self.assert_machine_created(state_machines_before)273 # run state machine274 sm_arn = self.get_machine_arn(sm_name)275 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()276 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)277 self.assertTrue(result.get("executionArn"))278 def check_invocations():279 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)280 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)281 # assert that the result is correct282 result = self._get_execution_results(sm_arn)283 self.assertEqual({"Hello": TEST_RESULT_VALUE}, result["result_value"])284 # assert that the lambda has been invoked by the SM execution285 retry(check_invocations, sleep=0.7, retries=25)286 # clean up287 self.cleanup(sm_arn, state_machines_before)288 def test_try_catch_state_machine(self):289 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]290 # create state machine291 role_arn = aws_stack.role_arn("sfn_role")292 definition = clone(STATE_MACHINE_CATCH)293 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)294 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)295 definition["States"]["Start"]["Parameters"]["FunctionName"] = lambda_arn_1296 definition["States"]["ErrorHandler"]["Resource"] = lambda_arn_2297 definition["States"]["Final"]["Resource"] = lambda_arn_2298 definition = json.dumps(definition)299 sm_name = "catch-%s" % short_uid()300 result = self.sfn_client.create_state_machine(301 name=sm_name, definition=definition, roleArn=role_arn302 )303 # run state machine304 sm_arn = self.get_machine_arn(sm_name)305 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()306 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)307 self.assertTrue(result.get("executionArn"))308 def check_invocations():309 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)310 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)311 # assert that the result is correct312 result = self._get_execution_results(sm_arn)313 self.assertEqual({"Hello": TEST_RESULT_VALUE}, result.get("handled"))314 # assert that the lambda has been invoked by the SM execution315 retry(check_invocations, sleep=1, retries=10)316 # clean up317 self.cleanup(sm_arn, state_machines_before)318 def test_intrinsic_functions(self):319 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]320 # create state machine321 role_arn = aws_stack.role_arn("sfn_role")322 definition = clone(STATE_MACHINE_INTRINSIC_FUNCS)323 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)324 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)325 if isinstance(definition["States"]["state1"].get("Parameters"), dict):326 definition["States"]["state1"]["Parameters"]["lambda_params"][327 "FunctionName"328 ] = lambda_arn_1329 definition["States"]["state3"]["Resource"] = lambda_arn_2330 definition = json.dumps(definition)331 sm_name = "intrinsic-%s" % short_uid()332 self.sfn_client.create_state_machine(name=sm_name, definition=definition, roleArn=role_arn)333 # run state machine334 sm_arn = self.get_machine_arn(sm_name)335 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()336 input = {}337 result = self.sfn_client.start_execution(stateMachineArn=sm_arn, input=json.dumps(input))338 self.assertTrue(result.get("executionArn"))339 def check_invocations():340 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)341 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)342 # assert that the result is correct343 result = self._get_execution_results(sm_arn)344 self.assertEqual({"payload": {"values": [1, "v2"]}}, result.get("result_value"))345 # assert that the lambda has been invoked by the SM execution346 retry(check_invocations, sleep=1, retries=10)347 # clean up348 self.cleanup(sm_arn, state_machines_before)349 def test_events_state_machine(self):350 events = aws_stack.connect_to_service("events")351 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]352 # create event bus353 bus_name = f"bus-{short_uid()}"354 events.create_event_bus(Name=bus_name)355 # create state machine356 definition = clone(STATE_MACHINE_EVENTS)357 definition["States"]["step1"]["Parameters"]["Entries"][0]["EventBusName"] = bus_name358 definition = json.dumps(definition)359 sm_name = "events-%s" % short_uid()360 role_arn = aws_stack.role_arn("sfn_role")361 self.sfn_client.create_state_machine(name=sm_name, definition=definition, roleArn=role_arn)362 # run state machine363 events_before = len(TEST_EVENTS_CACHE)364 sm_arn = self.get_machine_arn(sm_name)365 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)366 self.assertTrue(result.get("executionArn"))367 def check_invocations():368 # assert that the event is received369 self.assertEqual(events_before + 1, len(TEST_EVENTS_CACHE))370 last_event = TEST_EVENTS_CACHE[-1]371 self.assertEqual(bus_name, last_event["EventBusName"])372 self.assertEqual("TestSource", last_event["Source"])373 self.assertEqual("TestMessage", last_event["DetailType"])374 self.assertEqual(375 {"Message": "Hello from Step Functions!"}, json.loads(last_event["Detail"])376 )377 # assert that the event bus has received an event from the SM execution378 retry(check_invocations, sleep=1, retries=10)379 # clean up380 self.cleanup(sm_arn, state_machines_before)381 events.delete_event_bus(Name=bus_name)...
Check out the latest blogs from LambdaTest on this topic:
The web paradigm has changed considerably over the last few years. Web 2.0, a term coined way back in 1999, was one of the pivotal moments in the history of the Internet. UGC (User Generated Content), ease of use, and interoperability for the end-users were the key pillars of Web 2.0. Consumers who were only consuming content up till now started creating different forms of content (e.g., text, audio, video, etc.).
How do we acquire knowledge? This is one of the seemingly basic but critical questions you and your team members must ask and consider. We are experts; therefore, we understand why we study and what we should learn. However, many of us do not give enough thought to how we learn.
These days, development teams depend heavily on feedback from automated tests to evaluate the quality of the system they are working on.
Hey LambdaTesters! We’ve got something special for you this week. ????
Continuous integration is a coding philosophy and set of practices that encourage development teams to make small code changes and check them into a version control repository regularly. Most modern applications necessitate the development of code across multiple platforms and tools, so teams require a consistent mechanism for integrating and validating changes. Continuous integration creates an automated way for developers to build, package, and test their applications. A consistent integration process encourages developers to commit code changes more frequently, resulting in improved collaboration and code quality.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!