Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py
...145 }146 )147 return {'Messages': messages}148 mock_conn.return_value.receive_message.side_effect = mock_receive_message149 def mock_delete_message_batch(**kwargs):150 return {'Successful'}151 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch152 # Test that messages are filtered153 self.sensor.message_filtering = 'literal'154 self.sensor.message_filtering_match_values = ["a matching message"]155 result = self.sensor.poke(self.mock_context)156 assert result157 # Test that only filtered messages are deleted158 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]159 calls_delete_message_batch = [160 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)161 ]162 mock_conn.assert_has_calls(calls_delete_message_batch)163 @mock.patch.object(SqsHook, "get_conn")164 def test_poke_message_filtering_jsonpath(self, mock_conn):165 self.sqs_hook.create_queue(QUEUE_NAME)166 matching = [167 {"id": 11, "key": {"matches": [1, 2]}},168 {"id": 12, "key": {"matches": [3, 4, 5]}},169 {"id": 13, "key": {"matches": [10]}},170 ]171 non_matching = [172 {"id": 14, "key": {"nope": [5, 6]}},173 {"id": 15, "key": {"nope": [7, 8]}},174 ]175 all = matching + non_matching176 def mock_receive_message(**kwargs):177 messages = []178 for message in all:179 messages.append(180 {181 'MessageId': message['id'],182 'ReceiptHandle': 100 + message['id'],183 'Body': json.dumps(message),184 }185 )186 return {'Messages': messages}187 mock_conn.return_value.receive_message.side_effect = mock_receive_message188 def mock_delete_message_batch(**kwargs):189 return {'Successful'}190 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch191 # Test that messages are filtered192 self.sensor.message_filtering = 'jsonpath'193 self.sensor.message_filtering_config = 'key.matches[*]'194 result = self.sensor.poke(self.mock_context)195 assert result196 # Test that only filtered messages are deleted197 
delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]198 calls_delete_message_batch = [199 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)200 ]201 mock_conn.assert_has_calls(calls_delete_message_batch)202 @mock.patch.object(SqsHook, "get_conn")203 def test_poke_message_filtering_jsonpath_values(self, mock_conn):204 self.sqs_hook.create_queue(QUEUE_NAME)205 matching = [206 {"id": 11, "key": {"matches": [1, 2]}},207 {"id": 12, "key": {"matches": [1, 4, 5]}},208 {"id": 13, "key": {"matches": [4, 5]}},209 ]210 non_matching = [211 {"id": 21, "key": {"matches": [10]}},212 {"id": 22, "key": {"nope": [5, 6]}},213 {"id": 23, "key": {"nope": [7, 8]}},214 ]215 all = matching + non_matching216 def mock_receive_message(**kwargs):217 messages = []218 for message in all:219 messages.append(220 {221 'MessageId': message['id'],222 'ReceiptHandle': 100 + message['id'],223 'Body': json.dumps(message),224 }225 )226 return {'Messages': messages}227 mock_conn.return_value.receive_message.side_effect = mock_receive_message228 def mock_delete_message_batch(**kwargs):229 return {'Successful'}230 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch231 # Test that messages are filtered232 self.sensor.message_filtering = 'jsonpath'233 self.sensor.message_filtering_config = 'key.matches[*]'234 self.sensor.message_filtering_match_values = [1, 4]235 result = self.sensor.poke(self.mock_context)236 assert result237 # Test that only filtered messages are deleted238 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]239 calls_delete_message_batch = [240 mock.call().delete_message_batch(QueueUrl='https://test-queue', Entries=delete_entries)241 ]242 mock_conn.assert_has_calls(calls_delete_message_batch)243 @mock.patch.object(SqsHook, "get_conn")244 def test_poke_do_not_delete_message_on_received(self, mock_conn):245 self.sqs_hook.create_queue(QUEUE_NAME)246 
self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')247 self.sensor = SqsSensor(248 task_id='test_task2',249 dag=self.dag,250 sqs_queue=QUEUE_URL,251 aws_conn_id='aws_default',252 # do not delete message upon reception253 delete_message_on_reception=False,254 )...
sqs_helper.py
Source:sqs_helper.py
sqs = boto3.client('sqs')


def delete_message_batch(sqs_queue_url: str, event):
    """Request deletion of the records in *event* from an SQS queue.

    One delete entry is built per item in ``event['Records']``, taking the
    entry ``Id`` from the record's ``messageId`` and the ``ReceiptHandle``
    from its ``receiptHandle``. All entries are sent in a single
    ``delete_message_batch`` call on the module-level ``sqs`` client.

    Args:
        sqs_queue_url: Passed as ``QueueUrl`` to the client call.
        event: Mapping with a ``Records`` iterable of record mappings.
    """
    entries = [
        {
            "Id": record["messageId"],
            "ReceiptHandle": record["receiptHandle"],
        }
        for record in event['Records']
    ]
    # `response` stays bound: the excerpt is cut off after this call, so any
    # later use of it is not visible from here.
    response = sqs.delete_message_batch(QueueUrl=sqs_queue_url, Entries=entries)
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!