Best Python code snippet using localstack_python
step_functions_client.py
Source:step_functions_client.py
...65 yield response66 else:67 sleep(wait_sleep_time)68 @error_handler69 def send_task_heartbeat(self, task_token):70 self._client.send_task_heartbeat(taskToken=task_token)71 def send_task_heartbeats_many(self, tasks_tokens):72 for token in tasks_tokens:73 self.send_task_heartbeat(task_token=token)74 @error_handler75 def send_task_success(self, task_token, output):76 logger.info(f'Sending successful response for task {task_token[-10:]}.')77 return self._client.send_task_success(taskToken=task_token, output=json.dumps(output))78 @error_handler79 def send_task_failure(self, task_token, cause=None, error=''):80 logger.error(f'Sending failed response for token {task_token[-10:]}')81 if cause:82 return self._client.send_task_failure(taskToken=task_token, cause=cause, error=error)83 exctype, value, tb = sys.exc_info()84 stacktrace = traceback.format_exception(exctype, value, tb)85 return self.send_task_failure_exception(task_token, exception=value, stacktrace=stacktrace)86 def send_task_failure_exception(self, task_token, exception, stacktrace):87 logger.error(f'Sending failed response for token: {task_token[-10:]}')...
test_activity.py
Source:test_activity.py
1import os2import signal3import time4from contextlib import suppress5import pytest6from mock import ANY, call, create_autospec, patch7from osdu_commons.activity_worker.activity import Activity, ActivityWorker8from osdu_commons.activity_worker.step_functions_client import (StepFunctionsClient,9 StepFunctionsException,10 StepFunctionsTaskTimeoutException)11@pytest.fixture()12def step_functions_client():13 step_functions_client_mock = create_autospec(StepFunctionsClient)14 step_functions_client_mock.iterate_activity_tasks.return_value = [15 {'taskToken': 'token 1', 'input': 'input-1'},16 {'taskToken': 'token 2', 'input': 'input-2'}17 ]18 return step_functions_client_mock19@pytest.fixture()20def activity():21 activity_mock = create_autospec(Activity)22 activity_mock.activity_handler.side_effect = ['output-1', 'output-2']23 return activity_mock24@pytest.fixture()25def worker(step_functions_client, activity):26 return ActivityWorker(activity, 'test_activity_worker', 'activity_arn_123', step_functions_client)27def test_tasks_are_executed(worker, activity, step_functions_client):28 activity.activity_handler.assert_not_called()29 worker.run()30 activity.activity_handler.assert_has_calls([call('input-1'), call('input-2')])31 step_functions_client.send_task_success.assert_has_calls([call('token 1', 'output-1'), call('token 2', 'output-2')])32def test_token_is_set_to_none_when_task_is_done(worker):33 worker.run()34 assert worker.token is None35def test_sigterm_is_handled_correctly(worker, activity, step_functions_client):36 activity.activity_handler.side_effect = lambda _: os.kill(os.getpid(), signal.SIGINT)37 with suppress(SystemExit):38 worker.run()39 step_functions_client.send_task_failure.assert_called_once_with(40 'token 1', cause='Signal 2', error='Process terminated'41 )42 step_functions_client.send_task_success.assert_not_called()43def test_heartbeats_are_sent(worker, step_functions_client, activity):44 activity.activity_handler.side_effect = lambda _: 
time.sleep(0.01)45 with patch('osdu_commons.activity_worker.activity.sleep', return_value=None):46 worker.run()47 step_functions_client.send_task_heartbeat.assert_has_calls([call('token 1'), call('token 2')])48def test_send_failure_when_error_during_heart_beats_sending(worker, activity, step_functions_client):49 activity.activity_handler.side_effect = lambda _: time.sleep(0.01)50 step_functions_client.send_task_heartbeat.side_effect = StepFunctionsException('Some exception')51 with patch('osdu_commons.activity_worker.activity.sleep', return_value=None):52 with suppress(SystemExit):53 worker.run()54 step_functions_client.send_task_heartbeat.assert_called_once_with('token 1')55 step_functions_client.send_task_failure_exception.assert_called_once_with('token 1', ANY, ANY)56 step_functions_client.send_task_failure.assert_not_called()57def test_activity_worker_is_terminated_when_sf_timed_out(worker, activity, step_functions_client):58 activity.activity_handler.side_effect = lambda _: time.sleep(0.01)59 step_functions_client.send_task_heartbeat.side_effect = StepFunctionsTaskTimeoutException()60 with patch('osdu_commons.activity_worker.activity.sleep', return_value=None):61 with suppress(SystemExit):62 worker.run()63 step_functions_client.send_task_heartbeat.assert_called_once_with('token 1')64 step_functions_client.send_task_failure_exception.assert_not_called()65 step_functions_client.send_task_failure.assert_not_called()66def test_task_failure_is_send_when_exception_occurs(worker, activity, step_functions_client):67 exception_raised_while_processing = Exception('Exception occurred while processing')68 activity.activity_handler.side_effect = exception_raised_while_processing69 with suppress(Exception):70 worker.run()71 step_functions_client.send_task_success.assert_not_called()72 step_functions_client.send_task_failure_exception.assert_called_with(73 'token 1',74 exception=exception_raised_while_processing,75 stacktrace=ANY...
step_functions_wrapper.py
Source:step_functions_wrapper.py
...29 def send_task_failure(self, token, error, cause):30 response = self.resource.send_task_success(31 taskToken=token, error=error, cause=cause)32 logger.info(response)33 def send_task_heartbeat(self, token):34 self.resource.send_task_heartbeat(taskToken=token)35sm_arn = os.getenv("SFN_ARN")36sfn = StepFunctions(sfn_resource=sfn_resource, sm_arn=sm_arn)37task_token = "abced"38output = {"Payload": {"msg": "Hello from Local"}}...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!