Best Python code snippet using yandex-tank
test_worker.py
Source:test_worker.py
1"""Module testing the kale.worker module."""2from __future__ import absolute_import3import mock4import signal5import unittest6from kale import exceptions7from kale import test_utils8from kale import worker9from six.moves import range10class WorkerTestCase(unittest.TestCase):11 """Test worker logic."""12 def _create_patch(self, name):13 """Helper method for creating scoped mocks."""14 patcher = mock.patch(name)15 patch = patcher.start()16 self.addCleanup(patcher.stop)17 return patch18 def testRun(self):19 """Test an iteration that has tasks."""20 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')21 mock_consumer.return_value = None22 startup_handler = self._create_patch('kale.settings.ON_WORKER_STARTUP')23 worker_inst = worker.Worker()24 self.assertTrue(worker_inst is not None)25 startup_handler.assert_called_once_with()26 def testRunIterationWithTasks(self):27 """Test an iteration that has tasks."""28 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')29 mock_consumer.return_value = None30 fetch_batch = self._create_patch('kale.consumer.Consumer.fetch_batch')31 message = test_utils.new_mock_message()32 fetch_batch.return_value = [message]33 run_batch = self._create_patch('kale.worker.Worker._run_batch')34 run_batch.return_value = (1, 1)35 worker_inst = worker.Worker()36 mock_consumer.assert_called_once_with()37 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()38 self.assertTrue(worker_inst._run_single_iteration())39 self.assertEqual(fetch_batch.called, 1)40 self.assertTrue(worker_inst._dirty)41 run_batch.assert_called_once_with([message])42 def testRunIterationWithoutTasks(self):43 """Test an iteration that does not have tasks."""44 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')45 mock_consumer.return_value = None46 fetch_batch = self._create_patch('kale.consumer.Consumer.fetch_batch')47 fetch_batch.return_value = []48 run_batch = self._create_patch('kale.worker.Worker._run_batch')49 worker_inst = worker.Worker()50 mock_consumer.assert_called_once_with()51 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()52 self.assertFalse(worker_inst._run_single_iteration())53 self.assertFalse(worker_inst._dirty)54 self.assertEqual(fetch_batch.called, 1)55 self.assertFalse(run_batch.called)56 def testCleanupWorkerStop(self):57 """Test cleanup worker."""58 mock_consumer = self._create_patch('kale.consumer.Consumer')59 release_batch = self._create_patch('kale.worker.Worker._release_batch')60 shutdown_handler = self._create_patch(61 'kale.settings.ON_WORKER_SHUTDOWN')62 sys_exit = self._create_patch('sys.exit')63 worker_inst = worker.Worker()64 mock_consumer.assert_called_once_with()65 release_batch.return_value = (0, 0)66 worker_inst._cleanup_worker(signal.SIGABRT, None)67 release_batch.assert_called_once_with()68 sys_exit.assert_called_once_with(0)69 shutdown_handler.assert_called_once_with()70 def testCleanupWorkerSuspend(self):71 """Test cleanup worker."""72 mock_consumer = self._create_patch('kale.consumer.Consumer')73 release_batch = self._create_patch('kale.worker.Worker._release_batch')74 sys_exit = self._create_patch('sys.exit')75 worker_inst = worker.Worker()76 mock_consumer.assert_called_once_with()77 release_batch.return_value = (0, 0)78 worker_inst._cleanup_worker(signal.SIGTSTP, None)79 release_batch.assert_called_once_with()80 assert not sys_exit.called, 'System should not have exited.'81 def testReleaseBatchWithTimeToSpare(self):82 """Test releasing a batch where the spare time is over the threshold.83 """84 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')85 mock_consumer.return_value = None86 mock_release = self._create_patch(87 'kale.consumer.Consumer.release_messages')88 mock_delete = self._create_patch(89 'kale.consumer.Consumer.delete_messages')90 mock_publish_dlq = self._create_patch(91 'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')92 get_time = self._create_patch('time.time')93 worker_inst = worker.Worker()94 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()95 mock_consumer.assert_called_once_with()96 worker_inst._incomplete_messages = [97 test_utils.new_mock_message() for i in range(2)]98 worker_inst._successful_messages = [99 test_utils.new_mock_message() for i in range(3)]100 worker_inst._failed_messages = [101 test_utils.new_mock_message() for i in range(4)]102 worker_inst._batch_stop_time = 20103 # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 10 > 1)104 get_time.return_value = 10105 releasable_messages = worker_inst._incomplete_messages106 deletable_messages = (107 worker_inst._successful_messages + worker_inst._failed_messages)108 num_deleted, num_released = worker_inst._release_batch()109 mock_release.assert_called_once_with(110 releasable_messages, worker_inst._batch_queue.name)111 mock_delete.assert_called_once_with(112 deletable_messages, worker_inst._batch_queue.name)113 assert not mock_publish_dlq.called, ('No messages should have been '114 'moved to dlq.')115 self.assertEqual(num_deleted, len(deletable_messages))116 self.assertEqual(num_released, len(releasable_messages))117 self.assertEqual(0, len(worker_inst._incomplete_messages))118 self.assertEqual(0, len(worker_inst._successful_messages))119 self.assertEqual(0, len(worker_inst._failed_messages))120 self.assertEqual(0, len(worker_inst._permanent_failures))121 def testReleaseBatchWithPermanent(self):122 """Test releasing a batch where the spare time is over the threshold.123 """124 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')125 mock_consumer.return_value = None126 mock_release = self._create_patch(127 'kale.consumer.Consumer.release_messages')128 mock_delete = self._create_patch(129 'kale.consumer.Consumer.delete_messages')130 mock_publish_dlq = self._create_patch(131 'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')132 get_time = self._create_patch('time.time')133 worker_inst = worker.Worker()134 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()135 mock_consumer.assert_called_once_with()136 worker_inst._incomplete_messages = [137 test_utils.new_mock_message() for i in range(2)]138 worker_inst._successful_messages = [139 test_utils.new_mock_message() for i in range(3)]140 worker_inst._failed_messages = [141 test_utils.new_mock_message() for i in range(4)]142 # Permanent failures should be a subset of failures.143 worker_inst._permanent_failures = worker_inst._failed_messages[:2]144 worker_inst._batch_stop_time = 20145 # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 10 > 1)146 get_time.return_value = 10147 releasable_messages = worker_inst._incomplete_messages148 permament_failures = worker_inst._permanent_failures149 deletable_messages = (150 worker_inst._successful_messages + worker_inst._failed_messages)151 num_deleted, num_released = worker_inst._release_batch()152 mock_release.assert_called_once_with(153 releasable_messages, worker_inst._batch_queue.name)154 mock_delete.assert_called_once_with(155 deletable_messages, worker_inst._batch_queue.name)156 mock_publish_dlq.assert_called_once_with(157 worker_inst._batch_queue.dlq_name, permament_failures)158 self.assertEqual(num_deleted, len(deletable_messages))159 self.assertEqual(num_released, len(releasable_messages))160 self.assertEqual(0, len(worker_inst._incomplete_messages))161 self.assertEqual(0, len(worker_inst._successful_messages))162 self.assertEqual(0, len(worker_inst._failed_messages))163 self.assertEqual(0, len(worker_inst._permanent_failures))164 def testReleaseBatchWithNoSuccessfulAndNoTimeLeft(self):165 """Test releasing a batch where the spare time is over the threshold.166 """167 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')168 mock_consumer.return_value = None169 mock_release = self._create_patch(170 'kale.consumer.Consumer.release_messages')171 mock_delete = self._create_patch(172 'kale.consumer.Consumer.delete_messages')173 mock_publish_dlq = self._create_patch(174 'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')175 get_time = self._create_patch('time.time')176 worker_inst = worker.Worker()177 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()178 mock_consumer.assert_called_once_with()179 worker_inst._successful_messages = []180 worker_inst._incomplete_messages = [181 test_utils.new_mock_message() for i in range(2)]182 worker_inst._failed_messages = [183 test_utils.new_mock_message() for i in range(4)]184 worker_inst._batch_stop_time = 20185 # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)186 get_time.return_value = 19.5187 deletable_messages = worker_inst._failed_messages188 num_deleted, num_released = worker_inst._release_batch()189 assert not mock_release.called, ('No messages should have '190 'been released.')191 # Failed messages should have been deleted.192 mock_delete.assert_called_once_with(193 deletable_messages, worker_inst._batch_queue.name)194 assert not mock_publish_dlq.called, ('No messages should have'195 'been moved to dlq.')196 self.assertEqual(num_deleted, len(deletable_messages))197 self.assertEqual(num_released, 0)198 self.assertEqual(0, len(worker_inst._incomplete_messages))199 self.assertEqual(0, len(worker_inst._successful_messages))200 self.assertEqual(0, len(worker_inst._failed_messages))201 self.assertEqual(0, len(worker_inst._permanent_failures))202 def testReleaseBatchWithNoDeletableAndNoTimeLeft(self):203 """Test releasing a batch where the spare time is over the threshold.204 """205 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')206 mock_consumer.return_value = None207 mock_release = self._create_patch(208 'kale.consumer.Consumer.release_messages')209 mock_delete = self._create_patch(210 'kale.consumer.Consumer.delete_messages')211 mock_publish_dlq = self._create_patch(212 'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')213 get_time = self._create_patch('time.time')214 worker_inst = worker.Worker()215 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()216 mock_consumer.assert_called_once_with()217 worker_inst._successful_messages = []218 worker_inst._failed_messages = []219 worker_inst._incomplete_messages = [220 test_utils.new_mock_message() for i in range(2)]221 worker_inst._batch_stop_time = 20222 # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)223 get_time.return_value = 19.5224 num_deleted, num_released = worker_inst._release_batch()225 assert not mock_release.called, ('No messages should have '226 'been released.')227 assert not mock_delete.called, 'No messages should have been deleted.'228 assert not mock_publish_dlq.called, ('No messages should have'229 ' been moved to dlq.')230 self.assertEqual(num_deleted, 0)231 self.assertEqual(num_released, 0)232 self.assertEqual(0, len(worker_inst._incomplete_messages))233 self.assertEqual(0, len(worker_inst._successful_messages))234 self.assertEqual(0, len(worker_inst._failed_messages))235 self.assertEqual(0, len(worker_inst._permanent_failures))236 def testReleaseBatchWithNoDeletableAndWithTimeLeft(self):237 """Test releasing a batch where the spare time is over the threshold.238 """239 mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')240 mock_consumer.return_value = None241 mock_release = self._create_patch(242 'kale.consumer.Consumer.release_messages')243 mock_delete = self._create_patch(244 'kale.consumer.Consumer.delete_messages')245 mock_publish_dlq = self._create_patch(246 'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')247 get_time = self._create_patch('time.time')248 worker_inst = worker.Worker()249 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()250 mock_consumer.assert_called_once_with()251 worker_inst._successful_messages = []252 worker_inst._failed_messages = []253 worker_inst._incomplete_messages = [254 test_utils.new_mock_message() for i in range(2)]255 worker_inst._batch_stop_time = 20256 # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)257 get_time.return_value = 10258 releasable_messages = worker_inst._incomplete_messages259 num_deleted, num_released = worker_inst._release_batch()260 mock_release.assert_called_once_with(261 releasable_messages, worker_inst._batch_queue.name)262 assert not mock_delete.called, 'No messages should have been deleted.'263 assert not mock_publish_dlq.called, ('No messages should have '264 'been moved to dlq.')265 self.assertEqual(num_deleted, 0)266 self.assertEqual(num_released, len(releasable_messages))267 self.assertEqual(0, len(worker_inst._incomplete_messages))268 self.assertEqual(0, len(worker_inst._successful_messages))269 self.assertEqual(0, len(worker_inst._failed_messages))270 self.assertEqual(0, len(worker_inst._permanent_failures))271 def testRunBatchSuccessful(self):272 """Test a successful batch."""273 mock_consumer = self._create_patch('kale.consumer.Consumer')274 get_time = self._create_patch('time.time')275 worker_inst = worker.Worker()276 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()277 mock_consumer.assert_called_once_with()278 worker_inst._batch_stop_time = 100279 # _batch_stop_time - (get_time + task.time_limit) > 0280 # (100 - (10 + 60)) > 0)281 get_time.return_value = 10282 message_batch = [test_utils.new_mock_message()]283 num_messages = len(message_batch)284 worker_inst._run_batch(message_batch)285 self.assertEqual(0, len(worker_inst._incomplete_messages))286 self.assertEqual(num_messages, len(worker_inst._successful_messages))287 self.assertEqual(0, len(worker_inst._failed_messages))288 self.assertEqual(0, len(worker_inst._permanent_failures))289 def testRunBatchNoTimeRemaining(self):290 """Test a batch where there is not enough time remaining."""291 mock_consumer = self._create_patch('kale.consumer.Consumer')292 get_time = self._create_patch('time.time')293 worker_inst = worker.Worker()294 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()295 mock_consumer.assert_called_once_with()296 worker_inst._batch_stop_time = 50297 # _batch_stop_time - (get_time + task.time_limit) > 0298 # (100 - (10 + 60)) < 0)299 get_time.return_value = 10300 message_batch = [test_utils.new_mock_message()]301 num_messages = len(message_batch)302 worker_inst._run_batch(message_batch)303 self.assertEqual(num_messages, len(worker_inst._incomplete_messages))304 self.assertEqual(0, len(worker_inst._successful_messages))305 self.assertEqual(0, len(worker_inst._failed_messages))306 self.assertEqual(0, len(worker_inst._permanent_failures))307 def testRunBatchTaskTimeout(self):308 """Test batch with a task timeout."""309 mock_consumer = self._create_patch('kale.consumer.Consumer')310 get_time = self._create_patch('time.time')311 mock_failure = self._create_patch(312 'kale.test_utils.TimeoutTask.handle_failure')313 mock_failure.return_value = True314 worker_inst = worker.Worker()315 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()316 mock_consumer.assert_called_once_with()317 worker_inst._batch_stop_time = 100318 # _batch_stop_time - (get_time + task.time_limit) > 0319 # (100 - (10 + 60)) > 0)320 get_time.return_value = 10321 message = test_utils.new_mock_message(322 task_class=test_utils.TimeoutTask)323 message_batch = [message]324 num_messages = len(message_batch)325 worker_inst._run_batch(message_batch)326 fail_msg, fail_exc = mock_failure.call_args[0]327 self.assertEqual(fail_msg, message)328 self.assertTrue(type(fail_exc) == exceptions.TimeoutException)329 self.assertEqual(0, len(worker_inst._incomplete_messages))330 self.assertEqual(0, len(worker_inst._successful_messages))331 self.assertEqual(0, len(worker_inst._permanent_failures))332 self.assertEqual(num_messages, len(worker_inst._failed_messages))333 def testRunBatchTaskException(self):334 """Test batch with a task exception."""335 mock_consumer = self._create_patch('kale.consumer.Consumer')336 get_time = self._create_patch('time.time')337 mock_failure = self._create_patch(338 'kale.test_utils.FailTask.handle_failure')339 mock_failure.return_value = True340 worker_inst = worker.Worker()341 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()342 mock_consumer.assert_called_once_with()343 worker_inst._batch_stop_time = 100344 # _batch_stop_time - (get_time + task.time_limit) > 0345 # (100 - (10 + 60)) > 0)346 get_time.return_value = 10347 message = test_utils.new_mock_message(task_class=test_utils.FailTask)348 message_batch = [message]349 num_messages = len(message_batch)350 worker_inst._run_batch(message_batch)351 fail_msg, fail_exc = mock_failure.call_args[0]352 self.assertEqual(fail_msg, message)353 self.assertTrue(type(fail_exc) == exceptions.TaskException)354 self.assertEqual(0, len(worker_inst._incomplete_messages))355 self.assertEqual(0, len(worker_inst._successful_messages))356 self.assertEqual(0, len(worker_inst._permanent_failures))357 self.assertEqual(num_messages, len(worker_inst._failed_messages))358 def testRunBatchTaskExceptionPermanentFailure(self):359 """Test batch with a task exception."""360 mock_consumer = self._create_patch('kale.consumer.Consumer')361 get_time = self._create_patch('time.time')362 mock_failure = self._create_patch(363 'kale.test_utils.FailTask.handle_failure')364 mock_failure.return_value = False365 worker_inst = worker.Worker()366 worker_inst._batch_queue = worker_inst._queue_selector.get_queue()367 mock_consumer.assert_called_once_with()368 worker_inst._batch_stop_time = 100369 # _batch_stop_time - (get_time + task.time_limit) > 0370 # (100 - (10 + 60)) > 0)371 get_time.return_value = 10372 message = test_utils.new_mock_message(task_class=test_utils.FailTask)373 message_batch = [message]374 num_messages = len(message_batch)375 worker_inst._run_batch(message_batch)376 fail_msg, fail_exc = mock_failure.call_args[0]377 self.assertEqual(fail_msg, message)378 self.assertTrue(type(fail_exc) == exceptions.TaskException)379 self.assertEqual(0, len(worker_inst._incomplete_messages))380 self.assertEqual(0, len(worker_inst._successful_messages))381 self.assertEqual(1, len(worker_inst._permanent_failures))382 self.assertEqual(num_messages, len(worker_inst._failed_messages))383 def testCheckProcessExceedingMemory(self):384 """Test process resources method."""385 mock_resource = self._create_patch('resource.getrusage')386 sys_exit = self._create_patch('sys.exit')387 self._create_patch('kale.consumer.Consumer')388 worker_inst = worker.Worker()389 mock_resource.return_value = mock.MagicMock(ru_maxrss=1000000000)390 worker_inst._check_process_resources()391 sys_exit.assert_called_once_with(1)392 def testCheckProcessDirty(self):393 """Test process resources method."""394 mock_resource = self._create_patch('resource.getrusage')395 mock_resource.return_value = mock.MagicMock(ru_maxrss=10)396 sys_exit = self._create_patch('sys.exit')397 self._create_patch('kale.consumer.Consumer')398 worker_inst = worker.Worker()399 worker_inst._dirty = True400 self.assertTrue(worker_inst._check_process_resources())401 self.assertFalse(sys_exit.called)402 def testCheckProcessNotDirty(self):403 """Test process resources method."""404 mock_logger = self._create_patch('kale.worker.logger.info')405 mock_resource = self._create_patch('resource.getrusage')406 mock_resource.return_value = mock.MagicMock(ru_maxrss=10)407 sys_exit = self._create_patch('sys.exit')408 self._create_patch('kale.consumer.Consumer')409 worker_inst = worker.Worker()410 worker_inst._dirty = False411 self.assertTrue(worker_inst._check_process_resources())412 self.assertFalse(mock_logger.called)413 self.assertFalse(sys_exit.called)414 def testRemoveMessageOrExitSuccess(self):415 """Test remove_message_or_exit method."""416 sys_exit = self._create_patch('sys.exit')417 worker_inst = worker.Worker()418 worker_inst._incomplete_messages = [1, 2]419 worker_inst.remove_message_or_exit(1)420 self.assertEqual(worker_inst._incomplete_messages, [2])421 sys_exit.assert_not_called()422 def testRemoveMessageOrExitFailure(self):423 """Test remove_message_or_exit method."""424 sys_exit = self._create_patch('sys.exit')425 worker_inst = worker.Worker()426 worker_inst._incomplete_messages = [1, 2]427 worker_inst.remove_message_or_exit(3)428 self.assertEqual(worker_inst._incomplete_messages, [1, 2])...
store_configuration_test.py
Source:store_configuration_test.py
1from unittest import mock2import pytest3from confluent_kafka import Consumer4from streaming_data_types.fbschemas.forwarder_config_update_rf5k.UpdateType import (5 UpdateType,6)7from streaming_data_types.forwarder_config_update_rf5k import (8 Protocol,9 StreamInfo,10 serialise_rf5k,11)12from forwarder.configuration_store import ConfigurationStore13from forwarder.parse_config_update import (14 Channel,15 EpicsProtocol,16 config_change_to_command_type,17 parse_config_update,18)19from tests.kafka.fake_producer import FakeProducer20DUMMY_UPDATE_HANDLER = None21CHANNELS_TO_STORE = {22 Channel("channel1", EpicsProtocol.PVA, "topic1", "f142"): DUMMY_UPDATE_HANDLER,23 Channel("channel2", EpicsProtocol.CA, "topic2", "tdct"): DUMMY_UPDATE_HANDLER,24}25STREAMS_TO_RETRIEVE = [26 StreamInfo(27 channel.name,28 channel.schema,29 channel.output_topic,30 Protocol.Protocol.PVA31 if channel.protocol == EpicsProtocol.PVA32 else Protocol.Protocol.CA,33 )34 for channel in CHANNELS_TO_STORE.keys()35]36class FakeKafkaMessage:37 def __init__(self, message):38 self._message = message39 def value(self):40 return self._message41def assert_stored_channel_correct(outputted_channel):42 # Will only be found if key exists and as the key is the channel43 # it will only match if the values are exactly the same.44 assert outputted_channel in CHANNELS_TO_STORE45def test_when_multiple_pvs_dumped_config_contains_all_pv_details():46 producer = FakeProducer()47 store = ConfigurationStore(producer, consumer=None, topic="store_topic")48 store.save_configuration(CHANNELS_TO_STORE)49 stored_message = parse_config_update(producer.published_payload) # type: ignore50 stored_channels = stored_message.channels51 assert_stored_channel_correct(stored_channels[0]) # type: ignore52 assert_stored_channel_correct(stored_channels[1]) # type: ignore53def test_when_no_pvs_stored_message_type_is_remove_all():54 producer = FakeProducer()55 store = ConfigurationStore(producer, consumer=None, topic="store_topic")56 store.save_configuration({})57 stored_message = parse_config_update(producer.published_payload) # type: ignore58 assert stored_message.channels is None59 assert (60 stored_message.command_type61 == config_change_to_command_type[UpdateType.REMOVEALL]62 )63def test_retrieving_stored_info_with_no_pvs_gets_message_without_streams():64 mock_consumer = mock.create_autospec(Consumer)65 mock_consumer.get_watermark_offsets.return_value = (0, 100)66 message = serialise_rf5k(UpdateType.REMOVEALL, [])67 mock_consumer.consume.return_value = [FakeKafkaMessage(message)]68 store = ConfigurationStore(69 producer=None, consumer=mock_consumer, topic="store_topic"70 )71 config = parse_config_update(store.retrieve_configuration())72 assert config.channels is None73def test_retrieving_stored_info_with_multiple_pvs_gets_streams():74 mock_consumer = mock.create_autospec(Consumer)75 mock_consumer.get_watermark_offsets.return_value = (0, 100)76 message = serialise_rf5k(UpdateType.ADD, STREAMS_TO_RETRIEVE)77 mock_consumer.consume.return_value = [FakeKafkaMessage(message)]78 store = ConfigurationStore(79 producer=None, consumer=mock_consumer, topic="store_topic"80 )81 config = parse_config_update(store.retrieve_configuration())82 channels = config.channels83 assert_stored_channel_correct(channels[0]) # type: ignore84 assert_stored_channel_correct(channels[1]) # type: ignore85def test_retrieve_config_find_valid_message_amongst_junk():86 message = serialise_rf5k(UpdateType.ADD, STREAMS_TO_RETRIEVE)87 messages_in_storage_topic = [88 [FakeKafkaMessage(":: SOME JUNK MESSAGE 1 ::")],89 [FakeKafkaMessage(":: SOME JUNK MESSAGE 2 ::")],90 [FakeKafkaMessage(message)],91 [FakeKafkaMessage(":: SOME JUNK MESSAGE 3 ::")],92 [FakeKafkaMessage(":: SOME JUNK MESSAGE 4 ::")],93 ] # type: ignore94 mock_consumer = mock.create_autospec(Consumer)95 mock_consumer.get_watermark_offsets.return_value = (96 0,97 len(messages_in_storage_topic),98 )99 mock_consumer.consume.side_effect = messages_in_storage_topic100 store = ConfigurationStore(101 producer=None, consumer=mock_consumer, topic="store_topic"102 )103 config = parse_config_update(store.retrieve_configuration())104 channels = config.channels105 assert_stored_channel_correct(channels[0]) # type: ignore106 assert_stored_channel_correct(channels[1]) # type: ignore107def test_retrieve_config_with_only_junk_as_message_in_storage_topic():108 messages_in_storage_topic = [109 [FakeKafkaMessage(":: SOME JUNK MESSAGE 1 ::")],110 [FakeKafkaMessage(":: SOME JUNK MESSAGE 2 ::")],111 [FakeKafkaMessage(":: SOME JUNK MESSAGE 3 ::")],112 ] # type: ignore113 mock_consumer = mock.create_autospec(Consumer)114 mock_consumer.get_watermark_offsets.return_value = (115 0,116 len(messages_in_storage_topic),117 )118 mock_consumer.consume.side_effect = messages_in_storage_topic119 store = ConfigurationStore(120 producer=None, consumer=mock_consumer, topic="store_topic"121 )122 with pytest.raises(RuntimeError):123 store.retrieve_configuration()124def test_retrieve_config_with_empty_storage_topic():125 messages_in_storage_topic = [] # type: ignore126 mock_consumer = mock.create_autospec(Consumer)127 mock_consumer.get_watermark_offsets.return_value = (128 0,129 len(messages_in_storage_topic),130 )131 mock_consumer.consume.side_effect = messages_in_storage_topic132 store = ConfigurationStore(133 producer=None, consumer=mock_consumer, topic="store_topic"134 )135 with pytest.raises(RuntimeError):...
TestParser.py
Source:TestParser.py
1import nose2from pymock import *3import sys4sys.path.insert(0,'..')5from profileparser import ProfileParser6class TestParser(PyMockTestCase):7 def test_should_construct_with_consumer(self):8 mock_consumer = self.mock()9 parser = ProfileParser(mock_consumer)10 def test_should_parse_thread_msg(self):11 mock_consumer = self.mock()12 parser = ProfileParser(mock_consumer)13 self.expectAndReturn(mock_consumer.on_thread(332, "My Thread Name"), None)14 self.replay()15 parser.parse("T 332 My Thread Name")16 self.verify()17 def test_should_parse_thread_msg_2(self):18 mock_consumer = self.mock()19 parser = ProfileParser(mock_consumer)20 self.expectAndReturn(mock_consumer.on_thread(102, "Another Thread"), None)21 self.replay()22 parser.parse("T 102 Another Thread")23 self.verify()24 def test_should_parse_function_msg(self):25 mock_consumer = self.mock()26 parser = ProfileParser(mock_consumer)27 self.expectAndReturn(mock_consumer.on_function(113, 223, "My function name"), None)28 self.replay()29 parser.parse("F 113 223 My function name")30 self.verify()31 def test_should_parse_function_msg_2(self):32 mock_consumer = self.mock()33 parser = ProfileParser(mock_consumer)34 self.expectAndReturn(mock_consumer.on_function(43, 44, "Another function"), None)35 self.replay()36 parser.parse("F 43 44 Another function")37 self.verify()38 def test_should_parse_sample_start_msg(self):39 mock_consumer = self.mock()40 parser = ProfileParser(mock_consumer)41 self.expectAndReturn(mock_consumer.on_sample_start(0, 113, 333), None)42 self.replay()43 parser.parse("S 0 113 333")44 self.verify()45 def test_should_parse_sample_start_msg_2(self):46 mock_consumer = self.mock()47 parser = ProfileParser(mock_consumer)48 self.expectAndReturn(mock_consumer.on_sample_start(2, 1, 444), None)49 self.replay()50 parser.parse("S 2 1 444")51 self.verify()52 def test_should_parse_sample_finish_msg(self):53 mock_consumer = self.mock()54 parser = ProfileParser(mock_consumer)55 self.expectAndReturn(mock_consumer.on_sample_finish(4, 113, 333), None)56 self.replay()57 parser.parse("E 4 113 333")58 self.verify()59 def test_should_parse_sample_finish_msg_2(self):60 mock_consumer = self.mock()61 parser = ProfileParser(mock_consumer)62 self.expectAndReturn(mock_consumer.on_sample_finish(7, 1, 444), None)63 self.replay()64 parser.parse("E 7 1 444")65 self.verify()66 def test_should_ignore_comments(self):67 mock_consumer = self.mock()68 parser = ProfileParser(mock_consumer)69 self.replay()70 parser.parse("#a_comment_without_spaces")71 parser.parse("# another comment with spaces")72 self.verify()73 74 def test_should_parse_event_msg(self):75 mock_consumer = self.mock()76 parser = ProfileParser(mock_consumer)77 self.expectAndReturn(mock_consumer.on_event(113, 223, "My event name"), None)78 self.replay()79 parser.parse("V 113 223 My event name")80 self.verify()81 def test_should_parse_event_msg_2(self):82 mock_consumer = self.mock()83 parser = ProfileParser(mock_consumer)84 self.expectAndReturn(mock_consumer.on_event(43, 44, "Another event"), None)85 self.replay()86 parser.parse("V 43 44 Another event")87 self.verify()88 89 def test_should_parse_emit_event_msg(self):90 mock_consumer = self.mock()91 parser = ProfileParser(mock_consumer)92 self.expectAndReturn(mock_consumer.on_event_emit(0, 113, 333), None)93 self.replay()94 parser.parse("Y 0 113 333")95 self.verify()96 def test_should_parse_emit_event_msg_2(self):97 mock_consumer = self.mock()98 parser = ProfileParser(mock_consumer)99 self.expectAndReturn(mock_consumer.on_event_emit(2, 1, 444), None)100 self.replay()101 parser.parse("Y 2 1 444")102 self.verify()103 def test_should_parse_counter_registration(self):104 mock_consumer = self.mock()105 parser = ProfileParser(mock_consumer)106 self.expectAndReturn(mock_consumer.on_counter(132, "My counter"), None)107 self.replay()108 parser.parse("C 132 My counter")109 self.verify()110 111 def test_should_parse_counter_value(self):112 mock_consumer = self.mock()113 parser = ProfileParser(mock_consumer)114 self.expectAndReturn(mock_consumer.on_counter_value(132, 22, 44), None)115 self.replay()116 parser.parse("D 132 22 44")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!