How to use the KafkaContainerCluster class of the com.example.kafkacluster package

Best Testcontainers-java code snippet using com.example.kafkacluster.KafkaContainerCluster

Source:KafkaContainerCluster.java Github

copy

Full Screen

...16import java.util.stream.Stream;17import static java.util.concurrent.TimeUnit.SECONDS;18/**19 * Provides an easy way to launch a Kafka cluster with multiple brokers. Copied from20 * https://github.com/testcontainers/testcontainers-java/blob/master/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java21 * <p>22 * Since it was copied from the link above, we decided to not make any major changes to it and use it as is.23 */24public class KafkaContainerCluster implements Startable {25 private final int brokersNum;26 private final Network network;27 private final GenericContainer<?> zookeeper;28 private final Collection<KafkaContainer> brokers;29 /**30 * Default constructor for building a kafka cluster to be used for tests.31 *32 * @param confluentPlatformVersion version of the kafka image33 * @param brokersNum number of brokers34 * @param internalTopicsRf replication factor35 */36 public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {37 if (brokersNum < 0) {38 throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");39 }40 if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {41 throw new IllegalArgumentException(42 "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0");43 }44 this.brokersNum = brokersNum;45 this.network = Network.newNetwork();46 this.zookeeper = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper")47 .withTag(confluentPlatformVersion))48 .withNetwork(network)49 .withNetworkAliases("zookeeper")50 .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));...

Full Screen

Full Screen

Source:KafkaContainerClusterTest.java Github

copy

Full Screen

...20import java.util.UUID;21import java.util.concurrent.TimeUnit;22import static org.assertj.core.api.Assertions.assertThat;23import static org.assertj.core.api.Assertions.tuple;24public class KafkaContainerClusterTest {25 @Test26 public void testKafkaContainerCluster() throws Exception {27 try (28 KafkaContainerCluster cluster = new KafkaContainerCluster("6.2.1", 3, 2)29 ) {30 cluster.start();31 String bootstrapServers = cluster.getBootstrapServers();32 assertThat(cluster.getBrokers()).hasSize(3);33 testKafkaFunctionality(bootstrapServers, 3, 2);34 }35 }36 protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {37 try (38 AdminClient adminClient = AdminClient.create(ImmutableMap.of(39 AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers40 ));41 KafkaProducer<String, String> producer = new KafkaProducer<>(42 ImmutableMap.of(...

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2import org.apache.kafka.clients.consumer.ConsumerRecord;3import org.apache.kafka.clients.consumer.ConsumerRecords;4import org.apache.kafka.clients.consumer.KafkaConsumer;5import org.apache.kafka.clients.producer.ProducerRecord;6import org.apache.kafka.common.serialization.StringDeserializer;7import org.apache.kafka.common.serialization.StringSerializer;8import org.junit.Test;9import java.util.Arrays;10import java.util.Properties;11import java.util.concurrent.TimeUnit;12public class KafkaContainerClusterTest {13 public void test() throws Exception {14 KafkaContainerCluster kafka = new KafkaContainerCluster(3, 1);15 kafka.start();16 Properties producerProperties = new Properties();17 producerProperties.put("bootstrap.servers", kafka.getBootstrapServers());18 producerProperties.put("key.serializer", StringSerializer.class.getName());19 producerProperties.put("value.serializer", StringSerializer.class.getName());20 Properties consumerProperties = new Properties();21 consumerProperties.put("bootstrap.servers", kafka.getBootstrapServers());22 consumerProperties.put("key.deserializer", StringDeserializer.class.getName());23 consumerProperties.put("value.deserializer", StringDeserializer.class.getName());24 consumerProperties.put("group.id", "test");25 consumerProperties.put("auto.offset.reset", "earliest");26 kafka.createTopic("test", 3, (short) 1);27 kafka.send(new ProducerRecord<>("test", "test", "test"));28 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);29 consumer.subscribe(Arrays.asList("test"));30 ConsumerRecords<String, String> records = consumer.poll(10000);31 for (ConsumerRecord<String, String> record : records) {32 System.out.println(record);33 }34 consumer.close();35 kafka.stop();36 }37}

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2import org.apache.kafka.clients.producer.KafkaProducer;3import org.apache.kafka.clients.producer.ProducerRecord;4import org.apache.kafka.common.serialization.StringSerializer;5import java.util.Properties;6import java.util.concurrent.ExecutionException;7public class 1 {8 public static void main(String[] args) throws InterruptedException, ExecutionException {9 KafkaContainerCluster kafkaContainerCluster = new KafkaContainerCluster(3, 2181, 9092);10 kafkaContainerCluster.start();11 Properties properties = new Properties();12 properties.put("bootstrap.servers", kafkaContainerCluster.getBootstrapServers());13 properties.put("key.serializer", StringSerializer.class);14 properties.put("value.serializer", StringSerializer.class);15 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);16 producer.send(new ProducerRecord<String, String>("test", "Hello World!")).get();17 kafkaContainerCluster.stop();18 }19}20import com.example.kafkacluster.KafkaContainerCluster;21import org.apache.kafka.clients.consumer.ConsumerRecord;22import org.apache.kafka.clients.consumer.ConsumerRecords;23import org.apache.kafka.clients.consumer.KafkaConsumer;24import org.apache.kafka.common.serialization.StringDeserializer;25import java.util.Arrays;26import java.util.Properties;27public class 2 {28 public static void main(String[] args) throws InterruptedException {29 KafkaContainerCluster kafkaContainerCluster = new KafkaContainerCluster(3, 2181, 9092);30 kafkaContainerCluster.start();31 Properties properties = new Properties();32 properties.put("bootstrap.servers", kafkaContainerCluster.getBootstrapServers());33 properties.put("key.deserializer", StringDeserializer.class);34 properties.put("value.deserializer", StringDeserializer.class);35 properties.put("group.id", "test");36 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);37 consumer.subscribe(Arrays.asList("test

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1package com.example.kafkacluster;2import java.io.IOException;3import java.util.Properties;4import org.apache.kafka.clients.consumer.ConsumerConfig;5import org.apache.kafka.clients.consumer.ConsumerRecord;6import org.apache.kafka.clients.consumer.ConsumerRecords;7import org.apache.kafka.clients.consumer.KafkaConsumer;8import org.apache.kafka.common.serialization.StringDeserializer;9import org.apache.kafka.common.serialization.StringSerializer;10public class Consumer {11 public static void main(String[] args) throws IOException {12 Properties props = new Properties();13 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");14 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");15 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());16 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());17 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);18 consumer.subscribe(java.util.Collections.singletonList("test"));19 while (true) {20 ConsumerRecords<String, String> records = consumer.poll(100);21 for (ConsumerRecord<String, String> record : records)22 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());23 }24 }25}26package com.example.kafkacluster;27import java.io.IOException;28import org.apache.kafka.clients.producer.KafkaProducer;29import org.apache.kafka.clients.producer.ProducerConfig;30import org.apache.kafka.clients.producer.ProducerRecord;31import org.apache.kafka.common.serialization.StringSerializer;32public class Producer {33 public static void main(String[] args) throws IOException {34 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(KafkaContainerCluster.getKafkaProducerProperties());35 for (int i = 0; i < 100; i++) {36 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "test", "test");37 producer.send(record);38 }39 
producer.close();40 }41}42package com.example.kafkacluster;43import java.io.IOException;44import org.apache.kafka.clients.consumer.Consumer

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2import org.apache.kafka.clients.producer.ProducerRecord;3import org.apache.kafka.clients.producer.RecordMetadata;4import org.apache.kafka.common.serialization.StringSerializer;5import org.apache.kafka.common.serialization.StringDeserializer;6import org.apache.kafka.clients.consumer.ConsumerRecord;7import org.apache.kafka.clients.consumer.ConsumerRecords;8import org.apache.kafka.clients.consumer.KafkaConsumer;9import org.apache.kafka.clients.producer.KafkaProducer;10import org.apache.kafka.clients.producer.ProducerConfig;11import org.apache.kafka.common.serialization.StringDeserializer;12import org.apache.kafka.common.serialization.StringSerializer;13import java.util.Arrays;14import java.util.Properties;15import java.util.concurrent.ExecutionException;16import java.util.concurrent.Future;17import java.util.concurrent.TimeUnit;18import java.util.concurrent.TimeoutException;19import java.util.concurrent.atomic.AtomicInteger;20import java.util.stream.IntStream;21import java.util.concurrent.CountDownLatch;22import java.util.concurrent.ExecutorService;23import java.util.concurrent.Executors;24import java.util.HashMap;25import java.util.Map;26import java.util.concurrent.atomic.AtomicInteger;27import java.util.concurrent.atomic.AtomicLong;28import java.util.concurrent.atomic.AtomicBoolean;29import org.apache.kafka.clients.consumer.OffsetAndMetadata;30import org.apache.kafka.common.TopicPartition;31import org.apache.kafka.common.errors.WakeupException;32import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;33import org.apache.kafka.clients.consumer.ConsumerConfig;34import org.apache.kafka.clients.consumer.Consumer;35import java.util.concurrent.atomic.AtomicReference;36import java.util.List;37import java.util.ArrayList;38import java.util.Iterator;39import java.util.Collections;40import java.util.concurrent.ArrayBlockingQueue;41import java.util.concurrent.BlockingQueue;42import 
java.util.concurrent.LinkedBlockingQueue;43import java.util.concurrent.TimeUnit;44import java.util.concurrent.Semaphore;45import java.util.concurrent.locks.Lock;46import java.util.concurrent.locks.ReentrantLock;47import java.util.concurrent.locks.Condition;48import java.util.concurrent.locks.ReentrantReadWriteLock;49import java.util.concurrent.locks.ReentrantLock;50import java.util.concurrent.locks.ReentrantReadWriteLock;51import java.util.concurrent.locks.Lock;52import java.util.concurrent.locks.Condition;53import java.util.concurrent.locks.ReentrantLock;54import java.util.concurrent.locks.ReentrantReadWriteLock;55import java.util.concurrent.locks.Lock;56import java.util.concurrent.locks.Condition;57import java.util.concurrent.locks.ReentrantLock;58import java.util.concurrent.locks.ReentrantReadWriteLock;59import java.util.concurrent.locks.Lock;60import java.util.concurrent.locks.Condition;61import java.util.concurrent.lock

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2import java.util.Properties;3import java.util.concurrent.ExecutionException;4import org.apache.kafka.clients.admin.AdminClient;5import org.apache.kafka.clients.admin.ListTopicsResult;6import org.apache.kafka.clients.admin.NewTopic;7import org.apache.kafka.clients.producer.KafkaProducer;8import org.apache.kafka.clients.producer.ProducerRecord;9import org.apache.kafka.common.KafkaFuture;10import org.apache.kafka.common.serialization.StringSerializer;11public class KafkaContainerClusterTest {12 public static void main(String[] args) throws ExecutionException, InterruptedException {13 KafkaContainerCluster cluster = new KafkaContainerCluster();14 cluster.startCluster();15 cluster.createTopic("test-topic", 3, 3);16 Properties producerProps = new Properties();17 producerProps.setProperty("bootstrap.servers", cluster.getBootstrapServers());18 producerProps.setProperty("key.serializer", StringSerializer.class.getName());19 producerProps.setProperty("value.serializer", StringSerializer.class.getName());20 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);21 producer.send(new ProducerRecord<>("test-topic", "test-key", "test-value")).get();22 Properties adminClientProps = new Properties();23 adminClientProps.setProperty("bootstrap.servers", cluster.getBootstrapServers());24 AdminClient adminClient = AdminClient.create(adminClientProps);25 ListTopicsResult listTopicsResult = adminClient.listTopics();26 KafkaFuture<java.util.Set<String>> kafkaFuture = listTopicsResult.names();27 System.out.println(kafkaFuture.get());28 cluster.stopCluster();29 }30}31import com.example.kafkacluster.KafkaContainerCluster;32import java.util.Properties;33import java.util.concurrent.ExecutionException;34import org.apache.kafka.clients.admin.AdminClient;35import org.apache.kafka.clients.admin.ListTopicsResult;36import org.apache.kafka.clients.admin.NewTopic;37import 
org.apache.kafka.clients.producer.KafkaProducer;38import org.apache.kafka.clients.producer.ProducerRecord;39import org.apache.kafka.common.KafkaFuture;40import org.apache.kafka.common.serialization.StringSerializer;41public class KafkaContainerClusterTest {42 public static void main(String[] args

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2import org.apache.kafka.clients.admin.*;3import org.apache.kafka.clients.producer.ProducerRecord;4import org.apache.kafka.common.KafkaFuture;5import org.apache.kafka.common.TopicPartitionInfo;6import org.apache.kafka.common.serialization.StringSerializer;7import org.apache.kafka.common.utils.Utils;8import org.apache.kafka.streams.StreamsConfig;9import org.junit.Test;10import org.junit.runner.RunWith;11import org.springframework.beans.factory.annotation.Autowired;12import org.springframework.boot.test.context.SpringBootTest;13import org.springframework.boot.test.util.TestPropertyValues;14import org.springframework.context.ApplicationContextInitializer;15import org.springframework.context.ConfigurableApplicationContext;16import org.springframework.kafka.core.DefaultKafkaProducerFactory;17import org.springframework.kafka.core.KafkaTemplate;18import org.springframework.kafka.core.ProducerFactory;19import org.springframework.test.context.ContextConfiguration;20import org.springframework.test.context.junit4.SpringRunner;21import java.util.*;22import java.util.concurrent.ExecutionException;23import java.util.concurrent.TimeUnit;24import java.util.concurrent.TimeoutException;25@RunWith(SpringRunner.class)26@ContextConfiguration(initializers = {KafkaClusterTest.Initializer.class})27public class KafkaClusterTest {28 private KafkaContainerCluster kafkaContainerCluster;29 public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {30 public void initialize(ConfigurableApplicationContext configurableApplicationContext) {31 TestPropertyValues.of(32 "spring.kafka.bootstrap-servers=" + kafkaContainerCluster.getBootstrapServers(),33 "spring.kafka.properties." + StreamsConfig.consumerPrefix(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "=" + kafkaContainerCluster.getBootstrapServers(),34 "spring.kafka.properties." 
+ StreamsConfig.producerPrefix(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + "=" + kafkaContainerCluster.getBootstrapServers(),35 "spring.kafka.properties." + StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) + "=" + "earliest"36 ).applyTo(configurableApplicationContext.getEnvironment());37 }38 }39 public void test() throws ExecutionException, InterruptedException, TimeoutException {40 AdminClient adminClient = AdminClient.create(kafkaContainerCluster.getAdminClientProperties());41 ListTopicsResult listTopicsResult = adminClient.listTopics();42 KafkaFuture<Set<String>> names = listTopicsResult.names();43 Set<String> topicNames = names.get(10, TimeUnit.SECONDS);44 System.out.println("Topic names: " + topicNames);45 if (topicNames.contains("topic1")) {

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

1import com.example.kafkacluster.KafkaContainerCluster;2public class KafkaCluster {3 public static void main(String[] args) {4 KafkaContainerCluster cluster = new KafkaContainerCluster(3);5 cluster.start();6 cluster.stop();7 }8}

Full Screen

Full Screen

KafkaContainerCluster

Using AI Code Generation

copy

Full Screen

// ---- KafkaClusterMainClass.java ----
import com.example.kafkacluster.KafkaContainerCluster;

/**
 * Entry point that creates a {@link KafkaContainerCluster} and starts it.
 */
public class KafkaClusterMainClass {

    /**
     * Creates the cluster wrapper and calls {@code startCluster()} on it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        KafkaContainerCluster kafkaContainerCluster = new KafkaContainerCluster();
        kafkaContainerCluster.startCluster();
    }
}

// ---- com/example/kafkacluster/KafkaContainerCluster.java ----
package com.example.kafkacluster;

import org.testcontainers.containers.KafkaContainer;

/**
 * Thin wrapper that owns a single Testcontainers {@link KafkaContainer}.
 */
public class KafkaContainerCluster {

    /** The managed container; {@code null} until {@link #startCluster()} has been called. */
    private KafkaContainer kafkaContainer;

    /**
     * Creates a new {@link KafkaContainer} and starts it.
     */
    public void startCluster() {
        // NOTE(review): the no-arg KafkaContainer constructor is deprecated in recent
        // Testcontainers releases; pass an explicit image once the target version is known.
        kafkaContainer = new KafkaContainer();
        kafkaContainer.start();
    }
}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testcontainers-java automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Most used methods in KafkaContainerCluster

Test Your Web Or Mobile Apps On 3000+ Browsers

Signup for free

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful