Best Python code snippet using localstack_python
test_activity_tasks.py
Source:test_activity_tasks.py
...51 resp = conn.count_pending_activity_tasks("test-domain", "non-existent")52 resp.should.equal({"count": 0, "truncated": False})53# RespondActivityTaskCompleted endpoint54@mock_swf_deprecated55def test_respond_activity_task_completed():56 conn = setup_workflow()57 decision_token = conn.poll_for_decision_task(58 "test-domain", "queue")["taskToken"]59 conn.respond_decision_task_completed(decision_token, decisions=[60 SCHEDULE_ACTIVITY_TASK_DECISION61 ])62 activity_token = conn.poll_for_activity_task(63 "test-domain", "activity-task-list")["taskToken"]64 resp = conn.respond_activity_task_completed(65 activity_token, result="result of the task")66 resp.should.be.none67 resp = conn.get_workflow_execution_history(68 "test-domain", conn.run_id, "uid-abcd1234")69 resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")70 resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(71 {"result": "result of the task", "scheduledEventId": 5, "startedEventId": 6}72 )73@mock_swf_deprecated74def test_respond_activity_task_completed_on_closed_workflow_execution():75 conn = setup_workflow()76 decision_token = conn.poll_for_decision_task(77 "test-domain", "queue")["taskToken"]78 conn.respond_decision_task_completed(decision_token, decisions=[79 SCHEDULE_ACTIVITY_TASK_DECISION80 ])81 activity_token = conn.poll_for_activity_task(82 "test-domain", "activity-task-list")["taskToken"]83 # bad: we're closing workflow execution manually, but endpoints are not84 # coded for now..85 wfe = swf_backend.domains[0].workflow_executions[-1]86 wfe.execution_status = "CLOSED"87 # /bad88 conn.respond_activity_task_completed.when.called_with(89 activity_token90 ).should.throw(SWFResponseError, "WorkflowExecution=")91@mock_swf_deprecated92def test_respond_activity_task_completed_with_task_already_completed():93 conn = setup_workflow()94 decision_token = conn.poll_for_decision_task(95 "test-domain", "queue")["taskToken"]96 conn.respond_decision_task_completed(decision_token, 
decisions=[97 SCHEDULE_ACTIVITY_TASK_DECISION98 ])99 activity_token = conn.poll_for_activity_task(100 "test-domain", "activity-task-list")["taskToken"]101 conn.respond_activity_task_completed(activity_token)102 conn.respond_activity_task_completed.when.called_with(103 activity_token104 ).should.throw(SWFResponseError, "Unknown activity, scheduledEventId = 5")105# RespondActivityTaskFailed endpoint106@mock_swf_deprecated107def test_respond_activity_task_failed():108 conn = setup_workflow()109 decision_token = conn.poll_for_decision_task(110 "test-domain", "queue")["taskToken"]111 conn.respond_decision_task_completed(decision_token, decisions=[112 SCHEDULE_ACTIVITY_TASK_DECISION113 ])114 activity_token = conn.poll_for_activity_task(115 "test-domain", "activity-task-list")["taskToken"]...
Worker.py
Source:Worker.py
...26 read_str = f.read()27 n = int(read_str)28 res = fibonacci(n)29 bucket.put_object(Key=key + "_out", Body=str(res))30 swf.respond_activity_task_completed(31 taskToken=task['taskToken'],32 result=str(res)33 )34 for object in bucket.objects.filter(Prefix="swf/" + key):35 if object.key == key:36 object.delete()37 print("Fibonacci job complete")38 elif task['activityType']['name'] == 'StoreInputS3':39 print("Got store input job")40 value = task['input']41 cur_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")42 bucket.put_object(Key="swf/" + cur_date, Body=value)43 swf.respond_activity_task_completed(44 taskToken=task['taskToken'],45 result=cur_date46 )47 elif task['activityType']['name'] == 'StoreOutputS3':48 print("Got store output job")49 value = task['input']50 cur_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")51 bucket.put_object(Key="swf_out/" + cur_date, Body=value)52 swf.respond_activity_task_completed(53 taskToken=task['taskToken'],54 result=cur_date...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!