How to use validate_empty_message_batch method in localstack

Best Python code snippet using localstack_python

sqs_listener.py

Source:sqs_listener.py Github

copy

Full Screen

...159 # If we cannot find the Host header, then fall back to the port of the proxy.160 # (note that this could be incorrect, e.g., if running in Docker with a host port that161 # is different from the internal container port, but there is not much else we can do.)162 return request_handler.proxy.port163def validate_empty_message_batch(data, req_data):164 data = to_str(data).split('Entries=')165 if len(data) > 1 and not req_data.get('Entries'):166 return True167 return False168def is_sqs_queue_url(url):169 path = path_from_url(url).partition('?')[0]170 return re.match(r'^/(queue|%s)/[a-zA-Z0-9_-]+$' % constants.TEST_AWS_ACCOUNT_ID, path)171class ProxyListenerSQS(PersistingProxyListener):172 def api_name(self):173 return 'sqs'174 def forward_request(self, method, path, data, headers):175 if method == 'OPTIONS':176 return 200177 req_data = parse_request_data(method, path, data)178 if is_sqs_queue_url(path) and method == 'GET':179 if not headers.get('Authorization'):180 headers['Authorization'] = aws_stack.mock_aws_request_headers(service='sqs')['Authorization']181 method = 'POST'182 req_data = {'Action': 'GetQueueUrl', 'Version': API_VERSION, 'QueueName': path.split('/')[-1]}183 if req_data:184 action = req_data.get('Action')185 if action in ('SendMessage', 'SendMessageBatch') and SQS_BACKEND_IMPL == 'moto':186 # check message contents187 for key, value in req_data.items():188 if not re.match(MSG_CONTENT_REGEX, str(value)):189 return make_requests_error(code=400, code_string='InvalidMessageContents',190 message='Message contains invalid characters')191 elif action == 'SetQueueAttributes':192 # TODO remove this function if we stop using ElasticMQ entirely193 queue_url = _queue_url(path, req_data, headers)194 if SQS_BACKEND_IMPL == 'elasticmq':195 forward_attrs = _set_queue_attributes(queue_url, req_data)196 if len(req_data) != len(forward_attrs):197 # make sure we only forward the supported attributes to the backend198 return _get_attributes_forward_request(method, path, headers, req_data, forward_attrs)199 elif action == 'DeleteQueue':200 queue_url = _queue_url(path, req_data, headers)201 QUEUE_ATTRIBUTES.pop(queue_url, None)202 sns_listener.unsubscribe_sqs_queue(queue_url)203 elif action == 'ListDeadLetterSourceQueues':204 # TODO remove this function if we stop using ElasticMQ entirely205 queue_url = _queue_url(path, req_data, headers)206 if SQS_BACKEND_IMPL == 'elasticmq':207 headers = {'content-type': 'application/xhtml+xml'}208 content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)209 return requests_response(content_str, headers=headers)210 if 'QueueName' in req_data:211 encoded_data = urlencode(req_data, doseq=True) if method == 'POST' else ''212 modified_url = None213 if method == 'GET':214 base_path = path.partition('?')[0]215 modified_url = '%s?%s' % (base_path, urlencode(req_data, doseq=True))216 return Request(data=encoded_data, url=modified_url, headers=headers, method=method)217 return True218 def return_response(self, method, path, data, headers, response, request_handler):219 # persist requests to disk220 super(ProxyListenerSQS, self).return_response(221 method, path, data, headers, response, request_handler222 )223 if method == 'OPTIONS' and path == '/':224 # Allow CORS preflight requests to succeed.225 return 200226 if method != 'POST':227 return228 region_name = aws_stack.get_region()229 req_data = parse_request_data(method, path, data)230 action = req_data.get('Action')231 content_str = content_str_original = to_str(response.content)232 if response.status_code >= 400:233 return response234 _fire_event(req_data, response)235 # patch the response and add missing attributes236 if action == 'GetQueueAttributes':237 content_str = _add_queue_attributes(path, req_data, content_str, headers)238 # patch the response and return the correct endpoint URLs / ARNs239 if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues', 'GetQueueAttributes', 'ListDeadLetterSourceQueues'):240 if config.USE_SSL and '<QueueUrl>http://' in content_str:241 # return https://... if we're supposed to use SSL242 content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)243 # expose external hostname:port244 external_port = SQS_PORT_EXTERNAL or get_external_port(headers, request_handler)245 content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',246 r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port),247 content_str)248 # encode account ID in queue URL249 content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>',250 r'<QueueUrl>\1://\2/%s/\3</QueueUrl>' % constants.TEST_AWS_ACCOUNT_ID,251 content_str)252 # fix queue ARN253 content_str = re.sub(r'<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>',254 r'<\1>arn:aws:sqs:%s:\2</\3>' % region_name, content_str)255 if action == 'CreateQueue':256 queue_url = re.match(r'.*<QueueUrl>(.*)</QueueUrl>', content_str, re.DOTALL).group(1)257 if SQS_BACKEND_IMPL == 'elasticmq':258 _set_queue_attributes(queue_url, req_data)259 elif action == 'SendMessageBatch':260 if validate_empty_message_batch(data, req_data):261 msg = 'There should be at least one SendMessageBatchRequestEntry in the request.'262 return make_requests_error(code=404, code_string='EmptyBatchRequest', message=msg)263 # instruct listeners to fetch new SQS message264 if action in ('SendMessage', 'SendMessageBatch'):265 _process_sent_message(path, req_data, headers)266 if content_str_original != content_str:267 # if changes have been made, return patched response268 response.headers['content-length'] = len(content_str)269 response.headers['x-amz-crc32'] = calculate_crc32(content_str)270 return requests_response(content_str, headers=response.headers, status_code=response.status_code)271 @classmethod272 # TODO still needed? (can probably be removed)273 def get_message_attributes_md5(cls, req_data):274 req_data = clone(req_data)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful