Best Python code snippet using localstack_python
test_skeleton.py
Source:test_skeleton.py
...129 result = skeleton.invoke(context)130 # Use the parser from botocore to parse the serialized response131 response_parser = create_parser(sqs_service.protocol)132 parsed_response = response_parser.parse(133 result.to_readonly_response_dict(), sqs_service.operation_model("SendMessage").output_shape134 )135 # Test the ResponseMetadata and delete it afterwards136 assert "ResponseMetadata" in parsed_response137 assert "RequestId" in parsed_response["ResponseMetadata"]138 assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52139 assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]140 assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 200141 del parsed_response["ResponseMetadata"]142 # Compare the (remaining) actual payload143 assert parsed_response == {144 "MD5OfMessageBody": "String",145 "MD5OfMessageAttributes": "String",146 "MD5OfMessageSystemAttributes": "String",147 "MessageId": "String",148 "SequenceNumber": "String",149 }150def test_skeleton_e2e_sqs_send_message_not_implemented():151 sqs_service = load_service("sqs")152 skeleton = Skeleton(sqs_service, TestSqsApiNotImplemented())153 context = RequestContext()154 context.account = "test"155 context.region = "us-west-1"156 context.service = sqs_service157 context.request = HttpRequest(158 **{159 "method": "POST",160 "path": "/",161 "body": "Action=SendMessage&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue&MessageBody=%7B%22foo%22%3A+%22bared%22%7D&DelaySeconds=2",162 "headers": _get_sqs_request_headers(),163 }164 )165 result = skeleton.invoke(context)166 # Use the parser from botocore to parse the serialized response167 response_parser = create_parser(sqs_service.protocol)168 parsed_response = response_parser.parse(169 result.to_readonly_response_dict(), sqs_service.operation_model("SendMessage").output_shape170 )171 # Test the ResponseMetadata172 assert "ResponseMetadata" in parsed_response173 assert "RequestId" in 
parsed_response["ResponseMetadata"]174 assert len(parsed_response["ResponseMetadata"]["RequestId"]) == 52175 assert "HTTPStatusCode" in parsed_response["ResponseMetadata"]176 assert parsed_response["ResponseMetadata"]["HTTPStatusCode"] == 501177 # Compare the (remaining) actual eror payload178 assert "Error" in parsed_response179 assert parsed_response["Error"] == {180 "Code": "InternalFailure",181 "Message": "API action 'SendMessage' for service 'sqs' not yet implemented",182 }183def test_dispatch_common_service_exception():184 def delete_queue(_context: RequestContext, _request: ServiceRequest):185 raise CommonServiceException("NonExistentQueue", "No such queue")186 table: DispatchTable = {}187 table["DeleteQueue"] = delete_queue188 sqs_service = load_service("sqs")189 skeleton = Skeleton(sqs_service, table)190 context = RequestContext()191 context.account = "test"192 context.region = "us-west-1"193 context.service = sqs_service194 context.request = HttpRequest(195 **{196 "method": "POST",197 "path": "/",198 "body": "Action=DeleteQueue&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue",199 "headers": _get_sqs_request_headers(),200 }201 )202 result = skeleton.invoke(context)203 # Use the parser from botocore to parse the serialized response204 response_parser = create_parser(sqs_service.protocol)205 parsed_response = response_parser.parse(206 result.to_readonly_response_dict(), sqs_service.operation_model("SendMessage").output_shape207 )208 assert "Error" in parsed_response209 assert parsed_response["Error"] == {210 "Code": "NonExistentQueue",211 "Message": "No such queue",212 }213def test_dispatch_missing_method_returns_internal_failure():214 table: DispatchTable = {}215 sqs_service = load_service("sqs")216 skeleton = Skeleton(sqs_service, table)217 context = RequestContext()218 context.account = "test"219 context.region = "us-west-1"220 context.service = sqs_service221 context.request = HttpRequest(222 **{223 "method": 
"POST",224 "path": "/",225 "body": "Action=DeleteQueue&Version=2012-11-05&QueueUrl=http%3A%2F%2Flocalhost%3A4566%2F000000000000%2Ftf-acc-test-queue",226 "headers": _get_sqs_request_headers(),227 }228 )229 result = skeleton.invoke(context)230 # Use the parser from botocore to parse the serialized response231 response_parser = create_parser(sqs_service.protocol)232 parsed_response = response_parser.parse(233 result.to_readonly_response_dict(), sqs_service.operation_model("SendMessage").output_shape234 )235 assert "Error" in parsed_response236 assert parsed_response["Error"] == {237 "Code": "InternalFailure",238 "Message": "API action 'DeleteQueue' for service 'sqs' not yet implemented",239 }240class TestServiceRequestDispatcher:241 def test_default_dispatcher(self):242 class SomeAction(ServiceRequest):243 ArgOne: str244 ArgTwo: int245 def fn(context, arg_one, arg_two):246 assert type(context) == RequestContext247 assert arg_one == "foo"...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!