...62 req_data_new['Attribute.%s.Value' % i] = [v]63 i += 164 data = urlencode(req_data_new, doseq=True)65 return Request(data=data, headers=headers, method=method)66def _set_queue_attributes(queue_url, req_data):67 # TODO remove this function if we stop using ElasticMQ entirely68 if SQS_BACKEND_IMPL != 'elasticmq':69 return70 attrs = _format_attributes(req_data)71 # select only the attributes in UNSUPPORTED_ATTRIBUTE_NAMES72 local_attrs = {}73 for k, v in attrs.items():74 if k in UNSUPPORTED_ATTRIBUTE_NAMES:75 try:76 _v = json.loads(v)77 if isinstance(_v, dict):78 if 'maxReceiveCount' in _v:79 _v['maxReceiveCount'] = int(_v['maxReceiveCount'])80 local_attrs.update(dict({k: json.dumps(_v)}))81 except Exception:82 local_attrs.update(dict({k: v}))83 QUEUE_ATTRIBUTES[queue_url] = QUEUE_ATTRIBUTES.get(queue_url) or {}84 QUEUE_ATTRIBUTES[queue_url].update(local_attrs)85 forward_attrs = dict([(k, v) for k, v in attrs.items() if k not in UNSUPPORTED_ATTRIBUTE_NAMES])86 return forward_attrs87def _add_queue_attributes(path, req_data, content_str, headers):88 # TODO remove this function if we stop using ElasticMQ entirely89 if SQS_BACKEND_IMPL != 'elasticmq':90 return content_str91 flags = re.MULTILINE | re.DOTALL92 queue_url = _queue_url(path, req_data, headers)93 requested_attributes = _format_attributes_names(req_data)94 regex = r'(.*<GetQueueAttributesResult>)(.*)(</GetQueueAttributesResult>.*)'95 attrs = re.sub(regex, r'\2', content_str, flags=flags)96 for key, value in QUEUE_ATTRIBUTES.get(queue_url, {}).items():97 if (not requested_attributes or requested_attributes.intersection({'All', key})) and \98 not re.match(r'<Name>\s*%s\s*</Name>' % key, attrs, flags=flags):99 attrs += '<Attribute><Name>%s</Name><Value>%s</Value></Attribute>' % (key, value)100 content_str = (re.sub(regex, r'\1', content_str, flags=flags) +101 attrs + re.sub(regex, r'\3', content_str, flags=flags))102 return content_str103def _fire_event(req_data, response):104 action = req_data.get('Action')105 event_type = None106 queue_url = None107 if action == 'CreateQueue':108 event_type = event_publisher.EVENT_SQS_CREATE_QUEUE109 response_data = xmltodict.parse(response.content)110 if 'CreateQueueResponse' in response_data:111 queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']112 elif action == 'DeleteQueue':113 event_type = event_publisher.EVENT_SQS_DELETE_QUEUE114 queue_url = req_data.get('QueueUrl')115 if event_type and queue_url:116 event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})117def _queue_url(path, req_data, headers):118 queue_url = req_data.get('QueueUrl')119 if queue_url:120 return queue_url121 url = config.TEST_SQS_URL122 if headers.get('Host'):123 url = '%s://%s' % (get_service_protocol(), headers['Host'])124 queue_url = '%s%s' % (url, path.partition('?')[0])125 return queue_url126def _list_dead_letter_source_queues(queues, queue_url):127 dead_letter_source_queues = []128 for k, v in queues.items():129 for i, j in v.items():130 if i == 'RedrivePolicy':131 f = json.loads(v[i])132 queue_url_split = queue_url.split('/')133 if queue_url_split[-1] in f['deadLetterTargetArn']:134 dead_letter_source_queues.append(k)135 return format_list_dl_source_queues_response(dead_letter_source_queues)136def _process_sent_message(path, req_data, headers):137 queue_name = _queue_url(path, req_data, headers).rpartition('/')[2]138 lambda_api.process_sqs_message(queue_name)139def format_list_dl_source_queues_response(queues):140 content_str = """<ListDeadLetterSourceQueuesResponse xmlns="{}">141 <ListDeadLetterSourceQueuesResult>142 {}143 </ListDeadLetterSourceQueuesResult>144 </ListDeadLetterSourceQueuesResponse>"""145 queue_urls = ''146 for q in queues:147 queue_urls += '<QueueUrl>{}</QueueUrl>'.format(q)148 return content_str.format(XMLNS_SQS, queue_urls)149# extract the external port used by the client to make the request150def get_external_port(headers, request_handler):151 host = headers.get('Host', '')152 if not host:153 forwarded = headers.get('X-Forwarded-For', '').split(',')154 host = forwarded[-2] if len(forwarded) > 2 else forwarded[-1]155 if ':' in host:156 return int(host.split(':')[1])157 if not request_handler or not request_handler.proxy:158 return config.PORT_SQS159 # If we cannot find the Host header, then fall back to the port of the proxy.160 # (note that this could be incorrect, e.g., if running in Docker with a host port that161 # is different from the internal container port, but there is not much else we can do.)162 return request_handler.proxy.port163def validate_empty_message_batch(data, req_data):164 data = to_str(data).split('Entries=')165 if len(data) > 1 and not req_data.get('Entries'):166 return True167 return False168def is_sqs_queue_url(url):169 path = path_from_url(url).partition('?')[0]170 return re.match(r'^/(queue|%s)/[a-zA-Z0-9_-]+$' % constants.TEST_AWS_ACCOUNT_ID, path)171class ProxyListenerSQS(PersistingProxyListener):172 def api_name(self):173 return 'sqs'174 def forward_request(self, method, path, data, headers):175 if method == 'OPTIONS':176 return 200177 req_data = parse_request_data(method, path, data)178 if is_sqs_queue_url(path) and method == 'GET':179 if not headers.get('Authorization'):180 headers['Authorization'] = aws_stack.mock_aws_request_headers(service='sqs')['Authorization']181 method = 'POST'182 req_data = {'Action': 'GetQueueUrl', 'Version': API_VERSION, 'QueueName': path.split('/')[-1]}183 if req_data:184 action = req_data.get('Action')185 if action in ('SendMessage', 'SendMessageBatch') and SQS_BACKEND_IMPL == 'moto':186 # check message contents187 for key, value in req_data.items():188 if not re.match(MSG_CONTENT_REGEX, str(value)):189 return make_requests_error(code=400, code_string='InvalidMessageContents',190 message='Message contains invalid characters')191 elif action == 'SetQueueAttributes':192 # TODO remove this function if we stop using ElasticMQ entirely193 queue_url = _queue_url(path, req_data, headers)194 if SQS_BACKEND_IMPL == 'elasticmq':195 forward_attrs = _set_queue_attributes(queue_url, req_data)196 if len(req_data) != len(forward_attrs):197 # make sure we only forward the supported attributes to the backend198 return _get_attributes_forward_request(method, path, headers, req_data, forward_attrs)199 elif action == 'DeleteQueue':200 queue_url = _queue_url(path, req_data, headers)201 QUEUE_ATTRIBUTES.pop(queue_url, None)202 sns_listener.unsubscribe_sqs_queue(queue_url)203 elif action == 'ListDeadLetterSourceQueues':204 # TODO remove this function if we stop using ElasticMQ entirely205 queue_url = _queue_url(path, req_data, headers)206 if SQS_BACKEND_IMPL == 'elasticmq':207 headers = {'content-type': 'application/xhtml+xml'}208 content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)209 return requests_response(content_str, headers=headers)210 if 'QueueName' in req_data:211 encoded_data = urlencode(req_data, doseq=True) if method == 'POST' else ''212 modified_url = None213 if method == 'GET':214 base_path = path.partition('?')[0]215 modified_url = '%s?%s' % (base_path, urlencode(req_data, doseq=True))216 return Request(data=encoded_data, url=modified_url, headers=headers, method=method)217 return True218 def return_response(self, method, path, data, headers, response, request_handler):219 # persist requests to disk220 super(ProxyListenerSQS, self).return_response(221 method, path, data, headers, response, request_handler222 )223 if method == 'OPTIONS' and path == '/':224 # Allow CORS preflight requests to succeed.225 return 200226 if method != 'POST':227 return228 region_name = aws_stack.get_region()229 req_data = parse_request_data(method, path, data)230 action = req_data.get('Action')231 content_str = content_str_original = to_str(response.content)232 if response.status_code >= 400:233 return response234 _fire_event(req_data, response)235 # patch the response and add missing attributes236 if action == 'GetQueueAttributes':237 content_str = _add_queue_attributes(path, req_data, content_str, headers)238 # patch the response and return the correct endpoint URLs / ARNs239 if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues', 'GetQueueAttributes', 'ListDeadLetterSourceQueues'):240 if config.USE_SSL and '<QueueUrl>http://' in content_str:241 # return https://... if we're supposed to use SSL242 content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)243 # expose external hostname:port244 external_port = SQS_PORT_EXTERNAL or get_external_port(headers, request_handler)245 content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',246 r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port),247 content_str)248 # encode account ID in queue URL249 content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>',250 r'<QueueUrl>\1://\2/%s/\3</QueueUrl>' % constants.TEST_AWS_ACCOUNT_ID,251 content_str)252 # fix queue ARN253 content_str = re.sub(r'<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>',254 r'<\1>arn:aws:sqs:%s:\2</\3>' % region_name, content_str)255 if action == 'CreateQueue':256 queue_url = re.match(r'.*<QueueUrl>(.*)</QueueUrl>', content_str, re.DOTALL).group(1)257 if SQS_BACKEND_IMPL == 'elasticmq':258 _set_queue_attributes(queue_url, req_data)259 elif action == 'SendMessageBatch':260 if validate_empty_message_batch(data, req_data):261 msg = 'There should be at least one SendMessageBatchRequestEntry in the request.'262 return make_requests_error(code=404, code_string='EmptyBatchRequest', message=msg)263 # instruct listeners to fetch new SQS message264 if action in ('SendMessage', 'SendMessageBatch'):265 _process_sent_message(path, req_data, headers)266 if content_str_original != content_str:267 # if changes have been made, return patched response268 response.headers['content-length'] = len(content_str)269 response.headers['x-amz-crc32'] = calculate_crc32(content_str)270 return requests_response(content_str, headers=response.headers, status_code=response.status_code)271 @classmethod272 # TODO still needed? 