How to use check_invocations method in localstack

Best Python code snippet using localstack_python

test_stepfunctions.py

Source: test_stepfunctions.py Github

copy

Full Screen

...244 result = self.sfn_client.start_execution(245 stateMachineArn=sm_arn, input=json.dumps(test_input)246 )247 self.assertTrue(result.get("executionArn"))248 def check_invocations():249 self.assertIn(lambda_arn_3, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)250 # assert that the result is correct251 result = self._get_execution_results(sm_arn)252 self.assertEqual(test_output, result)253 # assert that the lambda has been invoked by the SM execution254 retry(check_invocations, sleep=1, retries=10)255 # clean up256 self.cleanup(sm_arn, state_machines_before)257 def test_create_run_state_machine(self):258 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]259 # create state machine260 role_arn = aws_stack.role_arn("sfn_role")261 definition = clone(STATE_MACHINE_BASIC)262 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)263 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)264 definition["States"]["step1"]["Resource"] = lambda_arn_1265 definition["States"]["step2"]["Resource"] = lambda_arn_2266 definition = json.dumps(definition)267 sm_name = "basic-%s" % short_uid()268 result = self.sfn_client.create_state_machine(269 name=sm_name, definition=definition, roleArn=role_arn270 )271 # assert that the SM has been created272 self.assert_machine_created(state_machines_before)273 # run state machine274 sm_arn = self.get_machine_arn(sm_name)275 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()276 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)277 self.assertTrue(result.get("executionArn"))278 def check_invocations():279 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)280 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)281 # assert that the result is correct282 result = self._get_execution_results(sm_arn)283 self.assertEqual({"Hello": TEST_RESULT_VALUE}, result["result_value"])284 # assert that the lambda has been invoked by the SM execution285 retry(check_invocations, sleep=0.7, retries=25)286 # clean up287 self.cleanup(sm_arn, state_machines_before)288 def test_try_catch_state_machine(self):289 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]290 # create state machine291 role_arn = aws_stack.role_arn("sfn_role")292 definition = clone(STATE_MACHINE_CATCH)293 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)294 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)295 definition["States"]["Start"]["Parameters"]["FunctionName"] = lambda_arn_1296 definition["States"]["ErrorHandler"]["Resource"] = lambda_arn_2297 definition["States"]["Final"]["Resource"] = lambda_arn_2298 definition = json.dumps(definition)299 sm_name = "catch-%s" % short_uid()300 result = self.sfn_client.create_state_machine(301 name=sm_name, definition=definition, roleArn=role_arn302 )303 # run state machine304 sm_arn = self.get_machine_arn(sm_name)305 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()306 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)307 self.assertTrue(result.get("executionArn"))308 def check_invocations():309 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)310 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)311 # assert that the result is correct312 result = self._get_execution_results(sm_arn)313 self.assertEqual({"Hello": TEST_RESULT_VALUE}, result.get("handled"))314 # assert that the lambda has been invoked by the SM execution315 retry(check_invocations, sleep=1, retries=10)316 # clean up317 self.cleanup(sm_arn, state_machines_before)318 def test_intrinsic_functions(self):319 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]320 # create state machine321 role_arn = aws_stack.role_arn("sfn_role")322 definition = clone(STATE_MACHINE_INTRINSIC_FUNCS)323 lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)324 lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)325 if isinstance(definition["States"]["state1"].get("Parameters"), dict):326 definition["States"]["state1"]["Parameters"]["lambda_params"][327 "FunctionName"328 ] = lambda_arn_1329 definition["States"]["state3"]["Resource"] = lambda_arn_2330 definition = json.dumps(definition)331 sm_name = "intrinsic-%s" % short_uid()332 self.sfn_client.create_state_machine(name=sm_name, definition=definition, roleArn=role_arn)333 # run state machine334 sm_arn = self.get_machine_arn(sm_name)335 lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()336 input = {}337 result = self.sfn_client.start_execution(stateMachineArn=sm_arn, input=json.dumps(input))338 self.assertTrue(result.get("executionArn"))339 def check_invocations():340 self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)341 self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)342 # assert that the result is correct343 result = self._get_execution_results(sm_arn)344 self.assertEqual({"payload": {"values": [1, "v2"]}}, result.get("result_value"))345 # assert that the lambda has been invoked by the SM execution346 retry(check_invocations, sleep=1, retries=10)347 # clean up348 self.cleanup(sm_arn, state_machines_before)349 def test_events_state_machine(self):350 events = aws_stack.connect_to_service("events")351 state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]352 # create event bus353 bus_name = f"bus-{short_uid()}"354 events.create_event_bus(Name=bus_name)355 # create state machine356 definition = clone(STATE_MACHINE_EVENTS)357 definition["States"]["step1"]["Parameters"]["Entries"][0]["EventBusName"] = bus_name358 definition = json.dumps(definition)359 sm_name = "events-%s" % short_uid()360 role_arn = aws_stack.role_arn("sfn_role")361 self.sfn_client.create_state_machine(name=sm_name, definition=definition, roleArn=role_arn)362 # run state machine363 events_before = len(TEST_EVENTS_CACHE)364 sm_arn = self.get_machine_arn(sm_name)365 result = self.sfn_client.start_execution(stateMachineArn=sm_arn)366 self.assertTrue(result.get("executionArn"))367 def check_invocations():368 # assert that the event is received369 self.assertEqual(events_before + 1, len(TEST_EVENTS_CACHE))370 last_event = TEST_EVENTS_CACHE[-1]371 self.assertEqual(bus_name, last_event["EventBusName"])372 self.assertEqual("TestSource", last_event["Source"])373 self.assertEqual("TestMessage", last_event["DetailType"])374 self.assertEqual(375 {"Message": "Hello from Step Functions!"}, json.loads(last_event["Detail"])376 )377 # assert that the event bus has received an event from the SM execution378 retry(check_invocations, sleep=1, retries=10)379 # clean up380 self.cleanup(sm_arn, state_machines_before)381 events.delete_event_bus(Name=bus_name)...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

13 Best Java Testing Frameworks For 2023

The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.

QA Innovation – Using the senseshaping concept to discover customer needs

QA Innovation - Using the senseshaping concept to discover customer needsQA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.

Best 23 Web Design Trends To Follow In 2023

Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.

Acquiring Employee Support for Change Management Implementation

Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful