Best Python code snippet using localstack_python
_test_sqspoller.py
Source:_test_sqspoller.py
1# pylint: skip-file2import mock3import pytest4from scrapy.exceptions import NotConfigured5from content_analytics.middlewares.sqspoller import SQSPollerMiddleware6class TestSQSPollerMiddleware(object):7 __module = 'content_analytics.sqspoller'8 @mock.patch.object(9 SQSPollerMiddleware, '_SQSPollerMiddleware__setup_keep_messages'10 )11 @mock.patch.object(12 SQSPollerMiddleware, '_SQSPollerMiddleware__setup_queue'13 )14 def __get_instance(15 self, mocked_setup_keep_messages, mocked_setup_queue,16 aws_options=None, queue_name=None17 ):18 if aws_options is None:19 aws_options = {}20 return SQSPollerMiddleware(aws_options, queue_name)21 def test_from_crawler__SQSPOLLER_ENABLED_false(self):22 crawler = mock.MagicMock()23 crawler.settings.getbool.return_value = False24 with pytest.raises(NotConfigured):25 SQSPollerMiddleware.from_crawler(crawler)26 crawler.settings.getbool.assert_called_once_with(27 'SQSPOLLER_ENABLED', False28 )29 @mock.patch('{}.aws_from_settings'.format(__module))30 def test_from_crawler__aws_options_None(self, mocked_aws_from_settings):31 crawler = mock.MagicMock()32 crawler.settings.getbool.return_value = True33 mocked_aws_from_settings.return_value = None34 with pytest.raises(NotConfigured):35 SQSPollerMiddleware.from_crawler(crawler)36 crawler.settings.getbool.assert_called_once_with(37 'SQSPOLLER_ENABLED', False38 )39 mocked_aws_from_settings.assert_called_once_with(40 crawler.settings, prefix='SQSPOLLER_'41 )42 @mock.patch('{}.aws_from_settings'.format(__module))43 def test_from_crawler__SQSPOLLER_QUEUE_None(44 self, mocked_aws_from_settings45 ):46 crawler = mock.MagicMock()47 crawler.settings.getbool.return_value = True48 crawler.settings.get.return_value = None49 with pytest.raises(NotConfigured):50 SQSPollerMiddleware.from_crawler(crawler)51 crawler.settings.getbool.assert_called_once_with(52 'SQSPOLLER_ENABLED', False53 )54 mocked_aws_from_settings.assert_called_once_with(55 crawler.settings, prefix='SQSPOLLER_'56 )57 crawler.settings.get.assert_called_once_with('SQSPOLLER_QUEUE')58 @mock.patch('{}.aws_from_settings'.format(__module))59 @mock.patch.object(60 SQSPollerMiddleware, '__init__'61 )62 def test_from_crawler(self, mocked_init, mocked_aws_from_settings):63 mocked_init.return_value = None64 crawler = mock.MagicMock()65 crawler.settings.getbool.return_value = True66 crawler.settings.get.return_value = 'Queue'67 SQSPollerMiddleware.from_crawler(crawler)68 crawler.settings.getbool.assert_called_once_with(69 'SQSPOLLER_ENABLED', False70 )71 crawler.settings.get.assert_called_once_with('SQSPOLLER_QUEUE')72 mocked_init.assert_called_once_with(73 mocked_aws_from_settings.return_value,74 crawler.settings.get.return_value75 )76 @mock.patch.object(77 SQSPollerMiddleware, '_SQSPollerMiddleware__setup_keep_messages'78 )79 @mock.patch.object(80 SQSPollerMiddleware, '_SQSPollerMiddleware__setup_queue'81 )82 def test___init__(self, mocked_setup_queue, mocked_setup_keep_messages):83 aws_options, queue_name = None, None84 SQSPollerMiddleware(aws_options, queue_name)85 mocked_setup_queue.assert_called_once_with(aws_options, queue_name)86 mocked_setup_keep_messages.assert_called_once_with()87 @mock.patch('{}.logging'.format(__module))88 @mock.patch('{}.boto'.format(__module))89 def test___setup_queue(self, mocked_boto, mocked_logging):90 aws_options, queue_name = {}, None91 mware = SQSPollerMiddleware(aws_options, queue_name)92 mocked_logging.getLogger.assert_called_once_with('boto')93 mocked_logging.getLogger.return_value.setLevel\94 .assert_called_once_with(mocked_logging.WARNING)95 mocked_boto.connect_sqs.assert_called_once_with(**aws_options)96 mocked_boto.connect_sqs.return_value.get_queue\97 .assert_called_once_with(queue_name)98 mocked_boto.connect_sqs.return_value.get_queue.return_value\99 .set_message_class.assert_called_once_with(100 mocked_boto.sqs.message.RawMessage101 )102 assert mware.input_queue is mocked_boto.connect_sqs.return_value\103 .get_queue.return_value104 def test___keep_messages__empty(self):105 mware = self.__get_instance()106 mware._SQSPollerMiddleware__keep_messages()107 mware.input_queue.change_message_visibility_batch.assert_not_called()108 def test___keep_messages__one(self):109 mware = self.__get_instance()110 mware.in_progress = [None]111 mware._SQSPollerMiddleware__keep_messages()112 mware.input_queue.change_message_visibility_batch\113 .assert_called_once_with([(None, 60)])114 def test___keep_messages__ten(self):115 mware = self.__get_instance()116 mware.in_progress = range(10)117 mware._SQSPollerMiddleware__keep_messages()118 mware.input_queue.change_message_visibility_batch\119 .assert_called_once_with(map(lambda x: (x, 60), range(10)))120 def test_spider_closed__stop_loopingcall(self):121 mware = self.__get_instance()122 mware._SQSPollerMiddleware__keep_messages_loop = mock.MagicMock()123 mware._SQSPollerMiddleware__keep_messages_loop.running = True124 mware.spider_closed()125 mware._SQSPollerMiddleware__keep_messages_loop.stop\126 .assert_called_once_with()127 def test_spider_closed__empty(self):128 mware = self.__get_instance()129 mware._SQSPollerMiddleware__keep_messages_loop = mock.MagicMock()130 mware._SQSPollerMiddleware__keep_messages_loop.running = False131 mware.spider_closed()132 mware._SQSPollerMiddleware__keep_messages_loop.stop.assert_not_called()133 mware.input_queue.delete_message_batch.assert_not_called()134 mware.input_queue.change_message_visibility_batch.assert_not_called()135 def test_spider_closed__one(self):136 mware = self.__get_instance()137 mware._SQSPollerMiddleware__keep_messages_loop = mock.MagicMock()138 mware._SQSPollerMiddleware__keep_messages_loop.running = False139 mware.in_progress = [None]140 mware.done = [None]141 mware.spider_closed()142 mware._SQSPollerMiddleware__keep_messages_loop.stop.assert_not_called()143 mware.input_queue.delete_message_batch.assert_called_once_with([None])144 assert not mware.done145 mware.input_queue.change_message_visibility_batch\146 .assert_called_once_with([(None, 1)])147 assert not mware.in_progress148 def test_process_start_requests__empty(self):149 mware = self.__get_instance()150 mware.input_queue.get_messages.return_value = []151 with pytest.raises(StopIteration):152 next(mware.process_start_requests(None, None))153 mware.input_queue.get_messages.assert_called_once_with(154 10, visibility_timeout=60, wait_time_seconds=1155 )156 def test_process_start_requests__many(self):157 mware = self.__get_instance()158 message = mock.MagicMock()159 message.get_body.return_value = '{"url": "http://example.com"}'160 messages = [message for _ in range(100)]161 __messages = iter([messages, []]) # avoids an infinite loop162 def get_messages(*args, **kwargs):163 return next(__messages)164 mware.input_queue.get_messages = get_messages165 spider = mock.MagicMock()166 requests = [x for x in mware.process_start_requests(None, spider)]167 assert len(requests) == 100168 def test_process_start_requests__make_request_from_message(self):169 mware = self.__get_instance()170 message = mock.MagicMock()171 message.get_body.return_value = '{"url": "http://example.com"}'172 mware.input_queue.get_messages.return_value = [message]173 spider = mock.MagicMock()174 request = next(mware.process_start_requests(None, spider))175 spider.make_request_from_message.assert_called_once_with(176 {'url': 'http://example.com'}177 )178 assert spider.make_request_from_message.return_value is request179 request.meta.__setitem__.assert_called_once_with('_message', message)180 def test_process_start_requests__make_request_from_url(self):181 mware = self.__get_instance()182 message = mock.MagicMock()183 message.get_body.return_value = '{"url": "http://example.com"}'184 mware.input_queue.get_messages.return_value = [message]185 spider = mock.MagicMock()186 del spider.make_request_from_message187 request = next(mware.process_start_requests(None, spider))188 spider.make_request_from_url.assert_called_once_with(189 'http://example.com'190 )191 assert spider.make_request_from_url.return_value is request192 request.meta.__setitem__.assert_called_once_with('_message', message)193 def test_process_spider_input__no_key(self):194 mware = self.__get_instance()195 mware.done = mock.MagicMock()196 response = mock.MagicMock()197 response.meta = {}198 result = mware.process_spider_input(response, None)199 assert result is None200 mware.done.append.assert_not_called()201 def test_process_spider_input__one(self):202 mware = self.__get_instance()203 mware.in_progress = mock.MagicMock()204 response = mock.MagicMock()205 message = mock.MagicMock()206 response.meta = {'_message': message}207 assert len(mware.done) == 0208 result = mware.process_spider_input(response, None)209 assert result is None210 assert len(mware.done) == 1211 mware.in_progress.remove.assert_called_once_with(message)212 mware.input_queue.delete_message_batch.assert_not_called()213 def test_process_spider_input__ten(self):214 mware = self.__get_instance()215 message = mock.MagicMock()216 mware.done.extend([message for _ in range(9)])217 __done = [message for _ in range(10)]218 mware.in_progress = mock.MagicMock()219 response = mock.MagicMock()220 response.meta = {'_message': message}221 assert len(mware.done) == 9222 result = mware.process_spider_input(response, None)223 assert result is None224 mware.in_progress.remove.assert_called_once_with(message)225 mware.input_queue.delete_message_batch.assert_called_once_with(__done)226 assert len(mware.done) == 0227 def test_process_spider_input__eleven(self):228 mware = self.__get_instance()229 message = mock.MagicMock()230 mware.done.extend([message for _ in range(10)])231 __done = [message for _ in range(10)]232 mware.in_progress = mock.MagicMock()233 response = mock.MagicMock()234 response.meta = {'_message': message}235 assert len(mware.done) == 10236 result = mware.process_spider_input(response, None)237 assert result is None238 mware.in_progress.remove.assert_called_once_with(message)239 mware.input_queue.delete_message_batch.assert_called_once_with(__done)...
queue.py
Source:queue.py
...87 def delete_message_batch(self, messages, callback=None):88 return self.connection.delete_message_batch(89 self, messages, callback=callback,90 )91 def change_message_visibility_batch(self, messages, callback=None):92 return self.connection.change_message_visibility_batch(93 self, messages, callback=callback,94 )95 def delete(self, callback=None):96 return self.connection.delete_queue(self, callback=callback)97 def count(self, page_size=10, vtimeout=10, callback=None,98 _attr='ApproximateNumberOfMessages'):99 return self.get_attributes(100 _attr, callback=transform(101 self._coerce_field_value, callback, _attr, int,102 ),...
SQSClientExtendedBase.py
Source:SQSClientExtendedBase.py
...10 def delete_message_batch(self, **kwargs):11 raise NotImplementedError('delete_message_batch is not implemented')12 def change_message_visibility(self, **kwargs):13 raise NotImplementedError('change_message_visibility is not implemented')14 def change_message_visibility_batch(self, **kwargs):15 raise NotImplementedError('change_message_visibility_batch is not implemented')16 def purge_queue(self, **kwargs):17 raise NotImplementedError('purge_queue is not implemented')18 def create_queue(self, **kwargs):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!