Best Python code snippet using lettuce-tools_python
test_check.py
Source:test_check.py
...96 def setUp(self):97 self.client_mock = Mock()98 self.vhost_mock = Mock()99 def test_ok(self):100 res = get_queues(self.client_mock, self.vhost_mock)101 self.assertEqual(res, self.client_mock.get_queues.return_value)102 self.client_mock.get_queues.assert_called_once_with(self.vhost_mock)103 def test_network_error(self):104 self.client_mock.get_queues.side_effect = NetworkError()105 with self.assertRaises(RabbitCritical) as excinfo:106 get_queues(self.client_mock, self.vhost_mock)107 self.assertEqual(excinfo.exception.errors,108 {'all': ['Can not communicate with RabbitMQ.']})109 self.client_mock.get_queues.assert_called_once_with(self.vhost_mock)110 def test_404(self):111 self.client_mock.get_queues.side_effect = HTTPError('', status=404)112 with self.assertRaises(RabbitCritical) as excinfo:113 get_queues(self.client_mock, self.vhost_mock)114 self.assertEqual(excinfo.exception.errors, {'all':115 ['Queue not found.']})116 self.client_mock.get_queues.assert_called_once_with(self.vhost_mock)117 def test_401(self):118 self.client_mock.get_queues.side_effect = HTTPError('', status=401)119 with self.assertRaises(RabbitCritical) as excinfo:120 get_queues(self.client_mock, self.vhost_mock)121 self.assertEqual(excinfo.exception.errors, {'all': ['Unauthorized.']})122 self.client_mock.get_queues.assert_called_once_with(self.vhost_mock)123 def test_unknown_error(self):124 self.client_mock.get_queues.side_effect = HTTPError('', status=500)125 with self.assertRaises(RabbitCritical) as excinfo:126 get_queues(self.client_mock, self.vhost_mock)127 self.assertEqual(excinfo.exception.errors,128 {'all': ['Unhandled HTTP error, status: 500']})129 self.client_mock.get_queues.assert_called_once_with(self.vhost_mock)130@patch(MODULE + '.get_config', Mock())131@patch(MODULE + '.format_status', Mock())132@patch(MODULE + '.check_lengths', Mock())133@patch(MODULE + '.get_queues')134@patch(MODULE + '.sys.exit')135class RunTestCase(TestCase):136 def test_ok(self, exit_mock, get_queues_mock):137 run()138 exit_mock.assert_called_once_with(0)139 def test_warning(self, exit_mock, get_queues_mock):140 get_queues_mock.side_effect = RabbitWarning(['all'])...
test_queue_selector.py
Source:test_queue_selector.py
...3import unittest4from kale import queue_info5from kale import queue_selector6class MultiQueueInfo(queue_info.QueueInfoBase):7 def get_queues(self):8 return [queue_info.TaskQueue(name='queue1', priority=100),9 queue_info.TaskQueue(name='queue2', priority=50),10 queue_info.TaskQueue(name='queue3', priority=1)]11 def does_queue_need_work(self, queue):12 return not self.is_queue_empty(queue)13 def is_queue_empty(self, queue):14 if queue.name == 'queue2':15 return False16 return True17 def get_highest_priority_queue_that_needs_work(self):18 return self.get_queues()[0]19class SingleQueueInfo(queue_info.QueueInfoBase):20 def get_queues(self):21 return [queue_info.TaskQueue(name='queue1', priority=99)]22class NoQueueInfo(queue_info.QueueInfoBase):23 def get_queues(self):24 return []25class MultiQueueNoPriorityInfo(MultiQueueInfo):26 def get_highest_priority_queue_that_needs_work(self):27 return None28class BadQueueInfo(queue_info.QueueInfoBase):29 def get_queues(self):30 return [queue_info.TaskQueue(name='queue1', priority=101),31 queue_info.TaskQueue(name='queue2', priority=0)]32class SelectQueueBaseTest(unittest.TestCase):33 """Tests for SelectQueueBase class."""34 def get_queue_test(self):35 # Get any one queue from multiple queues36 selector = queue_selector.SelectQueueBase(MultiQueueInfo())37 with self.assertRaises(NotImplementedError):38 selector.get_queue()39class RandomQueueTest(unittest.TestCase):40 """Tests for Random class."""41 def get_queue_test(self):42 # Get any one queue from multiple queues43 queue = queue_selector.Random(MultiQueueInfo()).get_queue()44 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])45class HighestPriorityFirstTest(unittest.TestCase):46 """Tests for HighestPriorityFirst class."""47 def get_queue_test(self):48 # Get any one queue from multiple queues49 queue = queue_selector.HighestPriorityFirst(50 MultiQueueInfo()).get_queue()51 self.assertEqual(queue.name, 'queue1')52 def get_queue_test_no_priority(self):53 # Get any one queue from multiple queues54 queue = queue_selector.HighestPriorityFirst(55 MultiQueueNoPriorityInfo()).get_queue()56 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])57class LotteryTest(unittest.TestCase):58 """Tests for Lottery class."""59 def run_lottery_test(self):60 queue = queue_selector.Lottery._run_lottery(61 MultiQueueInfo().get_queues())62 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])63 queue = queue_selector.Lottery._run_lottery(64 SingleQueueInfo().get_queues())65 self.assertEqual(queue.name, 'queue1')66 queue = queue_selector.Lottery._run_lottery(NoQueueInfo().get_queues())67 self.assertIsNone(queue)68 queue = queue_selector.Lottery._run_lottery(69 BadQueueInfo().get_queues())70 self.assertIsNone(queue)71 def get_queue_test(self):72 # Get any one queue from multiple queues73 selector = queue_selector.Lottery(MultiQueueInfo())74 queue = selector.get_queue()75 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])76class ReducedLotteryTest(unittest.TestCase):77 """Tests for ReducedLottery class."""78 def get_queue_test(self):79 selector = queue_selector.ReducedLottery(MultiQueueInfo())80 queue = selector.get_queue()81 self.assertEqual(queue.name, 'queue2')82class HighestPriorityLotteryTest(unittest.TestCase):83 """Tests for HighestPriorityLottery class."""84 def run_lottery_test(self):85 queue = queue_selector.HighestPriorityLottery._run_lottery(86 MultiQueueInfo().get_queues())87 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])88 queue = queue_selector.HighestPriorityLottery._run_lottery(89 SingleQueueInfo().get_queues())90 self.assertEqual(queue.name, 'queue1')91 queue = queue_selector.HighestPriorityLottery._run_lottery(92 NoQueueInfo().get_queues())93 self.assertIsNone(queue)94 queue = queue_selector.HighestPriorityLottery._run_lottery(95 BadQueueInfo().get_queues())96 self.assertIsNone(queue)97 def get_queue_test(self):98 # Get any one queue from multiple queues99 selector = queue_selector.HighestPriorityLottery(MultiQueueInfo())100 queue = selector.get_queue()101 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])102class LotteryLotteryTest(unittest.TestCase):103 """Tests for LotteryLottery class."""104 def run_lottery_test(self):105 queue = queue_selector.LotteryLottery._run_lottery(106 MultiQueueInfo().get_queues())107 self.assertTrue(queue.name in ['queue1', 'queue2', 'queue3'])108 queue = queue_selector.LotteryLottery._run_lottery(109 SingleQueueInfo().get_queues())110 self.assertEqual(queue.name, 'queue1')111 queue = queue_selector.LotteryLottery._run_lottery(112 NoQueueInfo().get_queues())113 self.assertIsNone(queue)114 queue = queue_selector.LotteryLottery._run_lottery(115 BadQueueInfo().get_queues())116 self.assertIsNone(queue)117 def get_queue_test(self):118 # Get any one queue from multiple queues119 selector = queue_selector.LotteryLottery(MultiQueueInfo())120 queue = selector.get_queue()...
partition_count_watching_publisher_test.py
Source:partition_count_watching_publisher_test.py
1# Copyright 2020 Google LLC2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from asynctest.mock import MagicMock15import pytest16from google.cloud.pubsublite.internal.wire.partition_count_watcher import (17 PartitionCountWatcher,18)19from google.cloud.pubsublite.internal.wire.partition_count_watching_publisher import (20 PartitionCountWatchingPublisher,21)22from google.cloud.pubsublite.internal.wire.publisher import Publisher23from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy24from google.cloud.pubsublite.testing.test_utils import wire_queues, run_on_thread25from google.cloud.pubsublite.types import Partition26from google.cloud.pubsublite_v1 import PubSubMessage27from google.api_core.exceptions import GoogleAPICallError28pytestmark = pytest.mark.asyncio29@pytest.fixture()30def mock_publishers():31 return {Partition(i): MagicMock(spec=Publisher) for i in range(10)}32@pytest.fixture()33def mock_policies():34 return {i: MagicMock(spec=RoutingPolicy) for i in range(10)}35@pytest.fixture()36def mock_watcher():37 watcher = MagicMock(spec=PartitionCountWatcher)38 return watcher39@pytest.fixture()40def publisher(mock_watcher, mock_publishers, mock_policies):41 return run_on_thread(42 lambda: PartitionCountWatchingPublisher(43 mock_watcher, lambda p: mock_publishers[p], lambda c: mock_policies[c]44 )45 )46async def test_init(mock_watcher, publisher):47 mock_watcher.get_partition_count.return_value = 248 async with publisher:49 mock_watcher.__aenter__.assert_called_once()50 pass51 mock_watcher.__aexit__.assert_called_once()52async def test_failed_init(mock_watcher, publisher):53 mock_watcher.get_partition_count.side_effect = GoogleAPICallError("error")54 with pytest.raises(GoogleAPICallError):55 await publisher.__aenter__()56 mock_watcher.__aenter__.assert_called_once()57 mock_watcher.__aexit__.assert_called_once()58 await publisher.__aexit__(None, None, None)59async def test_simple_publish(mock_publishers, mock_policies, mock_watcher, publisher):60 mock_watcher.get_partition_count.return_value = 261 async with publisher:62 mock_policies[2].route.return_value = Partition(1)63 mock_publishers[Partition(1)].publish.return_value = "a"64 await publisher.publish(PubSubMessage())65 mock_policies[2].route.assert_called_with(PubSubMessage())66 mock_publishers[Partition(1)].publish.assert_called()67async def test_publish_after_increase(68 mock_publishers, mock_policies, mock_watcher, publisher69):70 get_queues = wire_queues(mock_watcher.get_partition_count)71 await get_queues.results.put(2)72 async with publisher:73 get_queues.called.get_nowait()74 mock_policies[2].route.return_value = Partition(1)75 mock_publishers[Partition(1)].publish.return_value = "a"76 await publisher.publish(PubSubMessage())77 mock_policies[2].route.assert_called_with(PubSubMessage())78 mock_publishers[Partition(1)].publish.assert_called()79 await get_queues.called.get()80 await get_queues.results.put(3)81 await get_queues.called.get()82 mock_policies[3].route.return_value = Partition(2)83 mock_publishers[Partition(2)].publish.return_value = "a"84 await publisher.publish(PubSubMessage())85 mock_policies[3].route.assert_called_with(PubSubMessage())86 mock_publishers[Partition(2)].publish.assert_called()87async def test_decrease_ignored(88 mock_publishers, mock_policies, mock_watcher, publisher89):90 get_queues = wire_queues(mock_watcher.get_partition_count)91 await get_queues.results.put(2)92 async with publisher:93 get_queues.called.get_nowait()94 mock_policies[2].route.return_value = Partition(1)95 mock_publishers[Partition(1)].publish.return_value = "a"96 await publisher.publish(PubSubMessage())97 mock_policies[2].route.assert_called_with(PubSubMessage())98 mock_publishers[Partition(1)].publish.assert_called()99 await get_queues.called.get()100 await get_queues.results.put(1)101 await get_queues.called.get()102 mock_policies[2].route.return_value = Partition(1)103 mock_publishers[Partition(1)].publish.return_value = "a"104 await publisher.publish(PubSubMessage())105 mock_policies[2].route.assert_called_with(PubSubMessage())...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!