Best Python code snippet using localstack_python
test_sqs.py
Source: test_sqs.py
...145 }146 )147 return {'Messages': messages}148 mock_conn.return_value.receive_message.side_effect = mock_receive_message149 def mock_delete_message_batch(**kwargs):150 return {'Successful'}151 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch152 # Test that messages are filtered153 self.sensor.message_filtering = 'literal'154 self.sensor.message_filtering_match_values = ["a matching message"]155 result = self.sensor.poke(self.mock_context)156 assert result157 # Test that only filtered messages are deleted158 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]159 calls_delete_message_batch = [160 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)161 ]162 mock_conn.assert_has_calls(calls_delete_message_batch)163 @mock.patch.object(SqsHook, "get_conn")164 def test_poke_message_filtering_jsonpath(self, mock_conn):165 self.sqs_hook.create_queue(QUEUE_NAME)166 matching = [167 {"id": 11, "key": {"matches": [1, 2]}},168 {"id": 12, "key": {"matches": [3, 4, 5]}},169 {"id": 13, "key": {"matches": [10]}},170 ]171 non_matching = [172 {"id": 14, "key": {"nope": [5, 6]}},173 {"id": 15, "key": {"nope": [7, 8]}},174 ]175 all = matching + non_matching176 def mock_receive_message(**kwargs):177 messages = []178 for message in all:179 messages.append(180 {181 'MessageId': message['id'],182 'ReceiptHandle': 100 + message['id'],183 'Body': json.dumps(message),184 }185 )186 return {'Messages': messages}187 mock_conn.return_value.receive_message.side_effect = mock_receive_message188 def mock_delete_message_batch(**kwargs):189 return {'Successful'}190 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch191 # Test that messages are filtered192 self.sensor.message_filtering = 'jsonpath'193 self.sensor.message_filtering_config = 'key.matches[*]'194 result = self.sensor.poke(self.mock_context)195 assert result196 # Test that only filtered messages are deleted197 
delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]198 calls_delete_message_batch = [199 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)200 ]201 mock_conn.assert_has_calls(calls_delete_message_batch)202 @mock.patch.object(SqsHook, "get_conn")203 def test_poke_message_filtering_jsonpath_values(self, mock_conn):204 self.sqs_hook.create_queue(QUEUE_NAME)205 matching = [206 {"id": 11, "key": {"matches": [1, 2]}},207 {"id": 12, "key": {"matches": [1, 4, 5]}},208 {"id": 13, "key": {"matches": [4, 5]}},209 ]210 non_matching = [211 {"id": 21, "key": {"matches": [10]}},212 {"id": 22, "key": {"nope": [5, 6]}},213 {"id": 23, "key": {"nope": [7, 8]}},214 ]215 all = matching + non_matching216 def mock_receive_message(**kwargs):217 messages = []218 for message in all:219 messages.append(220 {221 'MessageId': message['id'],222 'ReceiptHandle': 100 + message['id'],223 'Body': json.dumps(message),224 }225 )226 return {'Messages': messages}227 mock_conn.return_value.receive_message.side_effect = mock_receive_message228 def mock_delete_message_batch(**kwargs):229 return {'Successful'}230 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch231 # Test that messages are filtered232 self.sensor.message_filtering = 'jsonpath'233 self.sensor.message_filtering_config = 'key.matches[*]'234 self.sensor.message_filtering_match_values = [1, 4]235 result = self.sensor.poke(self.mock_context)236 assert result237 # Test that only filtered messages are deleted238 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]239 calls_delete_message_batch = [240 mock.call().delete_message_batch(QueueUrl='https://test-queue', Entries=delete_entries)241 ]242 mock_conn.assert_has_calls(calls_delete_message_batch)243 @mock.patch.object(SqsHook, "get_conn")244 def test_poke_do_not_delete_message_on_received(self, mock_conn):245 self.sqs_hook.create_queue(QUEUE_NAME)246 
self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')247 self.sensor = SqsSensor(248 task_id='test_task2',249 dag=self.dag,250 sqs_queue=QUEUE_URL,251 aws_conn_id='aws_default',252 # do not delete message upon reception253 delete_message_on_reception=False,254 )...
sqs_helper.py
Source: sqs_helper.py
23sqs = boto3.client('sqs')456def delete_message_batch(sqs_queue_url: str, event):7 receiptHandles = list(8 map(9 lambda record: {10 "Id": record["messageId"],11 "ReceiptHandle": record["receiptHandle"]12 }, event['Records']))13 response = sqs.delete_message_batch(QueueUrl=sqs_queue_url,14 Entries=receiptHandles)
...
Check out the latest blogs from LambdaTest on this topic:
It is no longer news that cross-browser testing is imperative for enhancing your application’s user experience. A solid knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. This matters even more if you are a full-stack developer or expert programmer.
QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One way QA testers can both differentiate themselves and be innovative is by using senseshaping to improve the application’s user experience.
Having a good web design can empower your business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly in line with current web design trends.
Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.
Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!