Best Python code snippet using autotest_python
consumer_test.py
Source:consumer_test.py
1# Licensed to the Apache Software Foundation (ASF) under one or more2# contributor license agreements. See the NOTICE file distributed with3# this work for additional information regarding copyright ownership.4# The ASF licenses this file to You under the Apache License, Version 2.05# (the "License"); you may not use this file except in compliance with6# the License. You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15from ducktape.mark import matrix16from ducktape.utils.util import wait_until17from kafkatest.tests.kafka_test import KafkaTest18from kafkatest.services.zookeeper import ZookeeperService19from kafkatest.services.kafka import KafkaService20from kafkatest.services.verifiable_producer import VerifiableProducer21from kafkatest.services.verifiable_consumer import VerifiableConsumer22from kafkatest.services.kafka import TopicPartition23import signal24def partitions_for(topic, num_partitions):25 partitions = set()26 for i in range(num_partitions):27 partitions.add(TopicPartition(topic=topic, partition=i))28 return partitions29class VerifiableConsumerTest(KafkaTest):30 STOPIC = "simple_topic"31 TOPIC = "test_topic"32 NUM_PARTITIONS = 333 PARTITIONS = partitions_for(TOPIC, NUM_PARTITIONS)34 GROUP_ID = "test_group_id"35 def __init__(self, test_context):36 super(VerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={37 self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },38 self.STOPIC : { 'partitions': 1, 'replication-factor': 2 }39 })40 self.num_producers = 141 self.num_consumers = 242 self.session_timeout = 1000043 def min_cluster_size(self):44 """Override this since we're adding services outside of the constructor"""45 return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers46 def _partitions(self, assignment):47 partitions = []48 for parts in assignment.itervalues():49 partitions += parts50 return partitions51 def _valid_assignment(self, assignment):52 partitions = self._partitions(assignment)53 return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS54 def _setup_consumer(self, topic, enable_autocommit=False):55 return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,56 topic, self.GROUP_ID, session_timeout=self.session_timeout,57 enable_autocommit=enable_autocommit)58 def _setup_producer(self, topic, max_messages=-1):59 return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,60 max_messages=max_messages, throughput=500)61 def _await_all_members(self, consumer):62 # Wait until all members have joined the group63 wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,64 err_msg="Consumers failed to join in a reasonable amount of time")65 def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):66 for _ in range(num_bounces):67 for node in consumer.nodes:68 consumer.stop_node(node, clean_shutdown)69 wait_until(lambda: len(consumer.dead_nodes()) == (self.num_consumers - 1), timeout_sec=self.session_timeout,70 err_msg="Timed out waiting for the consumers to shutdown")71 total_consumed = consumer.total_consumed()72 73 consumer.start_node(node)74 wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,75 timeout_sec=self.session_timeout,76 err_msg="Timed out waiting for the consumers to shutdown")77 def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):78 for _ in range(num_bounces):79 for node in consumer.nodes:80 consumer.stop_node(node, clean_shutdown)81 wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,82 err_msg="Timed out waiting for the consumers to shutdown")83 total_consumed = consumer.total_consumed()84 85 for node in consumer.nodes:86 consumer.start_node(node)87 wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,88 timeout_sec=self.session_timeout*2,89 err_msg="Timed out waiting for the consumers to shutdown")90 def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):91 for _ in range(num_bounces):92 for node in self.kafka.nodes:93 total_consumed = consumer.total_consumed()94 self.kafka.restart_node(node, clean_shutdown=True)95 wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,96 timeout_sec=30,97 err_msg="Timed out waiting for the broker to shutdown")98 def bounce_all_brokers(self, consumer, num_bounces=5, clean_shutdown=True):99 for _ in range(num_bounces):100 for node in self.kafka.nodes:101 self.kafka.stop_node(node)102 for node in self.kafka.nodes:103 self.kafka.start_node(node)104 105 def test_broker_rolling_bounce(self):106 """107 Verify correct consumer behavior when the brokers are consecutively restarted.108 Setup: single Kafka cluster with one producer writing messages to a single topic with one109 partition, an a set of consumers in the same group reading from the same topic.110 - Start a producer which continues producing new messages throughout the test.111 - Start up the consumers and wait until they've joined the group.112 - In a loop, restart each broker consecutively, waiting for the group to stabilize between113 each broker restart.114 - Verify delivery semantics according to the failure type and that the broker bounces115 did not cause unexpected group rebalances.116 """117 partition = TopicPartition(self.STOPIC, 0)118 119 producer = self._setup_producer(self.STOPIC)120 consumer = self._setup_consumer(self.STOPIC)121 producer.start()122 wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,123 err_msg="Producer failed waiting for messages to be written")124 consumer.start()125 self._await_all_members(consumer)126 num_rebalances = consumer.num_rebalances()127 # TODO: make this test work with hard shutdowns, which probably requires128 # pausing before the node is restarted to ensure that any ephemeral129 # nodes have time to expire130 self.rolling_bounce_brokers(consumer, clean_shutdown=True)131 132 unexpected_rebalances = consumer.num_rebalances() - num_rebalances133 assert unexpected_rebalances == 0, \134 "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances135 consumer.stop_all()136 assert consumer.current_position(partition) == consumer.total_consumed(), \137 "Total consumed records did not match consumed position"138 @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])139 def test_consumer_bounce(self, clean_shutdown, bounce_mode):140 """141 Verify correct consumer behavior when the consumers in the group are consecutively restarted.142 Setup: single Kafka cluster with one producer and a set of consumers in one group.143 - Start a producer which continues producing new messages throughout the test.144 - Start up the consumers and wait until they've joined the group.145 - In a loop, restart each consumer, waiting for each one to rejoin the group before146 restarting the rest.147 - Verify delivery semantics according to the failure type.148 """149 partition = TopicPartition(self.STOPIC, 0)150 151 producer = self._setup_producer(self.STOPIC)152 consumer = self._setup_consumer(self.STOPIC)153 producer.start()154 wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,155 err_msg="Producer failed waiting for messages to be written")156 consumer.start()157 self._await_all_members(consumer)158 if bounce_mode == "all":159 self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)160 else:161 self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)162 163 consumer.stop_all()164 if clean_shutdown:165 # if the total records consumed matches the current position, we haven't seen any duplicates166 # this can only be guaranteed with a clean shutdown167 assert consumer.current_position(partition) == consumer.total_consumed(), \168 "Total consumed records did not match consumed position"169 else:170 # we may have duplicates in a hard failure171 assert consumer.current_position(partition) <= consumer.total_consumed(), \172 "Current position greater than the total number of consumed records"173 @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])174 def test_consumer_failure(self, clean_shutdown, enable_autocommit):175 partition = TopicPartition(self.STOPIC, 0)176 177 consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)178 producer = self._setup_producer(self.STOPIC)179 consumer.start()180 self._await_all_members(consumer)181 partition_owner = consumer.owner(partition)182 assert partition_owner is not None183 # startup the producer and ensure that some records have been written184 producer.start()185 wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,186 err_msg="Producer failed waiting for messages to be written")187 # stop the partition owner and await its shutdown188 consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)189 wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,190 timeout_sec=self.session_timeout+5, err_msg="Timed out waiting for consumer to close")191 # ensure that the remaining consumer does some work after rebalancing192 current_total_consumed = consumer.total_consumed()193 wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=10,194 err_msg="Timed out waiting for additional records to be consumed after first consumer failed")195 consumer.stop_all()196 if clean_shutdown:197 # if the total records consumed matches the current position, we haven't seen any duplicates198 # this can only be guaranteed with a clean shutdown199 assert consumer.current_position(partition) == consumer.total_consumed(), \200 "Total consumed records did not match consumed position"201 else:202 # we may have duplicates in a hard failure203 assert consumer.current_position(partition) <= consumer.total_consumed(), \204 "Current position greater than the total number of consumed records"205 # if autocommit is not turned on, we can also verify the last committed offset206 if not enable_autocommit:207 assert consumer.last_commit(partition) == consumer.current_position(partition), \208 "Last committed offset did not match last consumed position"209 @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])210 def test_broker_failure(self, clean_shutdown, enable_autocommit):211 partition = TopicPartition(self.STOPIC, 0)212 213 consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)214 producer = self._setup_producer(self.STOPIC)215 producer.start()216 consumer.start()217 self._await_all_members(consumer)218 num_rebalances = consumer.num_rebalances()219 # shutdown one of the brokers220 # TODO: we need a way to target the coordinator instead of picking arbitrarily221 self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)222 # ensure that the consumers do some work after the broker failure223 current_total_consumed = consumer.total_consumed()224 wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=20,225 err_msg="Timed out waiting for additional records to be consumed after first consumer failed")226 # verify that there were no rebalances on failover227 assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"228 consumer.stop_all()229 # if the total records consumed matches the current position, we haven't seen any duplicates230 assert consumer.current_position(partition) == consumer.total_consumed(), \231 "Total consumed records did not match consumed position"232 # if autocommit is not turned on, we can also verify the last committed offset233 if not enable_autocommit:234 assert consumer.last_commit(partition) == consumer.current_position(partition), \235 "Last committed offset did not match last consumed position"236 def test_simple_consume(self):237 total_records = 1000238 consumer = self._setup_consumer(self.STOPIC)239 producer = self._setup_producer(self.STOPIC, max_messages=total_records)240 partition = TopicPartition(self.STOPIC, 0)241 consumer.start()242 self._await_all_members(consumer)243 producer.start()244 wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,245 err_msg="Producer failed waiting for messages to be written")246 wait_until(lambda: consumer.last_commit(partition) == total_records, timeout_sec=10,247 err_msg="Consumer failed to read all expected messages")248 assert consumer.current_position(partition) == total_records249 def test_valid_assignment(self):250 consumer = self._setup_consumer(self.TOPIC)251 consumer.start()252 self._await_all_members(consumer)...
scheduler_lib_unittest.py
Source:scheduler_lib_unittest.py
1#!/usr/bin/python2#3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.4# Use of this source code is governed by a BSD-style license that can be5# found in the LICENSE file.6import mock7import unittest8import common9from autotest_lib.database import database_connection10from autotest_lib.frontend import setup_django_environment11from autotest_lib.frontend.afe import readonly_connection12from autotest_lib.server import utils as server_utils13from autotest_lib.scheduler import scheduler_lib14from django.db import utils as django_utils15class ConnectionManagerTests(unittest.TestCase):16 """Connection manager unittests."""17 def setUp(self):18 self.connection_manager = None19 readonly_connection.set_globally_disabled = mock.MagicMock()20 setup_django_environment.enable_autocommit = mock.MagicMock()21 server_utils.Singleton._instances = {}22 def tearDown(self):23 readonly_connection.set_globally_disabled.reset_mock()24 setup_django_environment.enable_autocommit.reset_mock()25 def testConnectionDisconnect(self):26 """Test connection and disconnecting from the database."""27 # Test that the connection manager only opens a connection once.28 connection_manager = scheduler_lib.ConnectionManager()29 connection_manager.open_connection = mock.MagicMock()30 connection = connection_manager.get_connection()31 connection_manager.open_connection.assert_called_once_with()32 connection_manager.open_connection.reset_mock()33 connection = connection_manager.get_connection()34 self.assertTrue(35 connection_manager.open_connection.call_count == 0)36 connection_manager.open_connection.reset_mock()37 # Test that del on the connection manager closes the connection38 connection_manager.disconnect = mock.MagicMock()39 connection_manager.__del__()40 connection_manager.disconnect.assert_called_once_with()41 def testConnectionReconnect(self):42 """Test that retries don't destroy the connection."""43 database_connection._DjangoBackend.execute = mock.MagicMock()44 database_connection._DjangoBackend.execute.side_effect = (45 django_utils.DatabaseError('Database Error'))46 connection_manager = scheduler_lib.ConnectionManager()47 connection = connection_manager.get_connection()48 self.assertRaises(django_utils.DatabaseError,49 connection.execute, *('', None, True))50 self.assertTrue(51 database_connection._DjangoBackend.execute.call_count == 2)52 database_connection._DjangoBackend.execute.reset_mock()53 self.assertTrue(connection_manager.db_connection ==54 connection_manager.get_connection())55 def testConnectionManagerSingleton(self):56 """Test that the singleton works as expected."""57 # Confirm that instantiating the class applies global db settings.58 connection_manager = scheduler_lib.ConnectionManager()59 readonly_connection.set_globally_disabled.assert_called_once_with(True)60 setup_django_environment.enable_autocommit.assert_called_once_with()61 readonly_connection.set_globally_disabled.reset_mock()62 setup_django_environment.enable_autocommit.reset_mock()63 # Confirm that instantiating another connection manager doesn't change64 # the database settings, and in fact, returns the original manager.65 connection_manager_2 = scheduler_lib.ConnectionManager()66 self.assertTrue(connection_manager == connection_manager_2)67 self.assertTrue(68 readonly_connection.set_globally_disabled.call_count == 0)69 self.assertTrue(70 setup_django_environment.enable_autocommit.call_count == 0)71 # Confirm that we don't open the connection when the class is72 # instantiated.73 self.assertTrue(connection_manager.db_connection is None)74if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!