Best Testcontainers-dotnet code snippet using DotNet.Testcontainers.Configurations.KafkaTestcontainerConfiguration
KafkaTestcontainerConfiguration.cs
Source:KafkaTestcontainerConfiguration.cs
...9 using DotNet.Testcontainers.Containers;10 using JetBrains.Annotations;11 /// <inheritdoc cref="TestcontainerMessageBrokerConfiguration" />12 [PublicAPI]13 public class KafkaTestcontainerConfiguration : TestcontainerMessageBrokerConfiguration14 {15 private const string KafkaImage = "confluentinc/cp-kafka:6.0.5";16 private const string StartupScriptPath = "/testcontainers_start.sh";17 private const int KafkaPort = 9092;18 private const int BrokerPort = 9093;19 private const int ZookeeperPort = 2181;20 /// <summary>21 /// Initializes a new instance of the <see cref="KafkaTestcontainerConfiguration" /> class.22 /// </summary>23 public KafkaTestcontainerConfiguration()24 : this(KafkaImage)25 {26 }27 /// <summary>28 /// Initializes a new instance of the <see cref="KafkaTestcontainerConfiguration" /> class.29 /// </summary>30 /// <param name="image">The Docker image.</param>31 public KafkaTestcontainerConfiguration(string image)32 : base(image, KafkaPort)33 {34 // Use two listeners with different names, it will force Kafka to communicate with itself via internal35 // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener.36 this.Environments.Add("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");37 this.Environments.Add("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{this.DefaultPort},BROKER://0.0.0.0:{BrokerPort}");38 this.Environments.Add("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");39 this.Environments.Add("KAFKA_BROKER_ID", "1");40 this.Environments.Add("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");41 this.Environments.Add("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");42 this.Environments.Add("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString(CultureInfo.InvariantCulture));43 this.Environments.Add("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");44 this.Environments.Add("KAFKA_ZOOKEEPER_CONNECT", $"localhost:{ZookeeperPort}");45 }...
Program.cs
Source:Program.cs
...37 }38}39// async Task Kafka2(){40// await Task.Delay(1);41// KafkaTestcontainerConfiguration configuration = new KafkaTestcontainerConfiguration();42// var kafkaBuilder1 = new DotNet.Testcontainers.Builders.TestcontainersBuilder<KafkaTestcontainer>();43// var kafkaBuilder = kafkaBuilder1.WithKafka(configuration);44// kafkaBuilder.Build();45// }46async Task Kafka()47{48 var kafkaBuilder = new DotNet.Testcontainers.Builders.TestcontainersBuilder<DotNet.Testcontainers.Containers.KafkaTestcontainer>();49 var config = new KafkaTestcontainerConfiguration();50 var kafkaBuilder2 = kafkaBuilder.WithKafka(config);51 await using (var testcontainers = kafkaBuilder2.Build())52 {53 await testcontainers.StartAsync();54 WriteLine($"Started Kafka. Connect with {testcontainers.BootstrapServers}");55 // localhost:4915856 WriteLine("Press enter to end it");57 ReadLine();58 }59}...
KafkaFixture.cs
Source:KafkaFixture.cs
...9 using Xunit;10 [UsedImplicitly]11 public sealed class KafkaFixture : IAsyncLifetime, IDisposable12 {13 private readonly KafkaTestcontainerConfiguration configuration = new KafkaTestcontainerConfiguration();14 public KafkaFixture()15 {16 this.Container = new TestcontainersBuilder<KafkaTestcontainer>()17 .WithKafka(this.configuration)18 .Build();19 }20 public KafkaTestcontainer Container { get; }21 public Task InitializeAsync()22 {23 return this.Container.StartAsync();24 }25 public Task DisposeAsync()26 {27 return this.Container.DisposeAsync().AsTask();...
KafkaTestcontainerConfiguration
Using AI Code Generation
1using DotNet.Testcontainers.Configurations;2using DotNet.Testcontainers.Containers;3using DotNet.Testcontainers.Containers.Builders;4using DotNet.Testcontainers.Containers.Modules;5using DotNet.Testcontainers.Containers.WaitStrategies;6using DotNet.Testcontainers.Images;7using DotNet.Testcontainers.Images.Builders;8using System;9using System.Threading.Tasks;10{11 {12 public static async Task Main(string[] args)13 {14 {15 Env = new[] { "ALLOW_PLAINTEXT_LISTENER=yes" },16 PortBinding = new[] { "9092:9092" },
KafkaTestcontainerConfiguration
Using AI Code Generation
1using System;2using System.Text;3using System.Threading;4using System.Threading.Tasks;5using Confluent.Kafka;6using DotNet.Testcontainers.Containers.Builders;7using DotNet.Testcontainers.Containers.Configurations.Databases;8using DotNet.Testcontainers.Containers.Configurations.MessageBrokers;9using DotNet.Testcontainers.Containers.Modules.Databases;10using DotNet.Testcontainers.Containers.Modules.MessageBrokers;11using DotNet.Testcontainers.Containers.WaitStrategies;12{13 {14 static async Task Main(string[] args)15 {16 Console.WriteLine("Hello World!");17 {18 WaitStrategy = Wait.ForUnixContainer().UntilPortIsAvailable(9092)19 };20 var kafkaTestcontainer = new TestcontainersBuilder<KafkaTestcontainer>()21 .WithConfiguration(kafkaTestcontainerConfiguration)22 .Build();23 await kafkaTestcontainer.StartAsync();24 var bootstrapServers = kafkaTestcontainer.BootstrapServers;25 var config = new ProducerConfig { BootstrapServers = bootstrapServers };26 using (var producer = new ProducerBuilder<Null, string>(config).Build())27 {28 {29 var dr = await producer.ProduceAsync("test-topic", new Message<Null, string> { Value = "test message" });30 Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");31 }32 catch (ProduceException<Null, string> e)33 {34 Console.WriteLine($"Delivery failed: {e.Error.Reason}");35 }36 }37 await kafkaTestcontainer.StopAsync();38 }39 }40}
KafkaTestcontainerConfiguration
Using AI Code Generation
1using DotNet.Testcontainers.Configurations;2using DotNet.Testcontainers.Containers.Builders;3using DotNet.Testcontainers.Containers.Modules;4using DotNet.Testcontainers.Containers.WaitStrategies;5using DotNet.Testcontainers.Images;6{7 {8 public KafkaTestcontainerConfiguration()9 {10 this.Image = "confluentinc/cp-kafka:5.3.1";11 this.WaitStrategy = Wait.ForUnixContainer().UntilPortIsAvailable(9092);12 }13 }14}15using DotNet.Testcontainers.Containers;16using DotNet.Testcontainers.Containers.Builders;17using DotNet.Testcontainers.Containers.Modules;18using DotNet.Testcontainers.Containers.WaitStrategies;19using DotNet.Testcontainers.Images;20using DotNet.Testcontainers.Tests.Fixtures;21using Xunit;22{23 {24 public KafkaTestcontainer Kafka { get; private set; }25 public async Task InitializeAsync()26 {27 this.Kafka = new TestcontainersBuilder<KafkaTestcontainer>()28 .WithConfiguration(new KafkaTestcontainerConfiguration())29 .Build();30 await this.Kafka.StartAsync();31 }32 public async Task DisposeAsync()33 {34 await this.Kafka.StopAsync();35 }36 }37}38using DotNet.Testcontainers.Containers;39using DotNet.Testcontainers.Containers.Builders;40using DotNet.Testcontainers.Containers.Modules;41using DotNet.Testcontainers.Containers.WaitStrategies;42using DotNet.Testcontainers.Images;43using DotNet.Testcontainers.Tests.Fixtures;44using Xunit;45{46 {47 private readonly KafkaFixture fixture;48 public KafkaTest(KafkaFixture fixture)49 {50 this.fixture = fixture;51 }52 public void KafkaShouldBeRunning()53 {54 Assert.True(this.fixture.Kafka.IsRunning);55 }56 }57}
KafkaTestcontainerConfiguration
Using AI Code Generation
1{2 {3 }4};5{6 WaitStrategy = Wait.ForUnixContainer().UntilPortIsAvailable(9092),7};8await kafkaTestcontainer.StartAsync();9{10 {11 }12};13{14 WaitStrategy = Wait.ForUnixContainer().UntilPortIsAvailable(9092),15};16await kafkaTestcontainer.StartAsync();17{18 {19 }20};21{22 WaitStrategy = Wait.ForUnixContainer().UntilPortIsAvailable(9092),23};24await kafkaTestcontainer.StartAsync();
KafkaTestcontainerConfiguration
Using AI Code Generation
1using DotNet.Testcontainers.Configurations;2using System;3using System.Collections.Generic;4using System.Linq;5using System.Text;6using System.Threading.Tasks;7{8 {9 public KafkaTestcontainerConfiguration(int port = 9092) : base("confluentinc/cp-kafka")10 {11 this.Port = port;12 this.Environments.Add("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");13 this.Environments.Add("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");14 }15 }16}17using DotNet.Testcontainers.Configurations;18using System;19using System.Collections.Generic;20using System.Linq;21using System.Text;22using System.Threading.Tasks;23{24 {25 public KafkaTestcontainerConfiguration(int port = 9092) : base("confluentinc/cp-kafka")26 {27 this.Port = port;28 this.Environments.Add("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");29 this.Environments.Add("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");30 }31 }32}33using DotNet.Testcontainers.Configurations;34using System;35using System.Collections.Generic;36using System.Linq;37using System.Text;38using System.Threading.Tasks;39{40 {41 public KafkaTestcontainerConfiguration(int port = 9092) : base("confluentinc/cp-kafka")42 {43 this.Port = port;44 this.Environments.Add("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");45 this.Environments.Add("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
KafkaTestcontainerConfiguration
Using AI Code Generation
1using DotNet.Testcontainers.Configurations;2using DotNet.Testcontainers.Containers.Modules.Databases;3using DotNet.Testcontainers.Containers.Modules.MessageBrokers;4using DotNet.Testcontainers.Containers.WaitStrategies;5using DotNet.Testcontainers.Tests.Fixtures;6using Xunit;7{8 {9 private readonly KafkaTestcontainerFixture fixture;10 public KafkaTestcontainerTest(KafkaTestcontainerFixture fixture)11 {12 this.fixture = fixture;13 }14 public async Task KafkaTestcontainerIsRunning()15 {16 var isRunning = await this.fixture.Kafka.IsRunningAsync();17 Assert.True(isRunning);18 }19 }20}21using DotNet.Testcontainers.Configurations;22using DotNet.Testcontainers.Containers.Modules.Databases;23using DotNet.Testcontainers.Containers.Modules.MessageBrokers;24using DotNet.Testcontainers.Containers.WaitStrategies;25using DotNet.Testcontainers.Tests.Fixtures;26using Xunit;27{28 {29 private readonly KafkaTestcontainerFixture fixture;30 public KafkaTestcontainerTest(KafkaTestcontainerFixture fixture)31 {32 this.fixture = fixture;33 }34 public async Task KafkaTestcontainerIsRunning()35 {36 var isRunning = await this.fixture.Kafka.IsRunningAsync();37 Assert.True(isRunning);38 }39 }40}41using DotNet.Testcontainers.Configurations;42using DotNet.Testcontainers.Containers.Modules.Databases;43using DotNet.Testcontainers.Containers.Modules.MessageBrokers;44using DotNet.Testcontainers.Containers.WaitStrategies;45using DotNet.Testcontainers.Tests.Fixtures;46using Xunit;47{48 {
KafkaTestcontainerConfiguration
Using AI Code Generation
1using DotNet.Testcontainers.Configurations;2using Xunit;3{4 {5 public void KafkaTest()6 {7 var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();8 var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);9 kafkaTestcontainer.Start();10 }11 }12}13using DotNet.Testcontainers.Configurations;14using Xunit;15{16 {17 public void KafkaTest()18 {19 {20 };21 var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);22 kafkaTestcontainer.Start();23 }24 }25}26using DotNet.Testcontainers.Configurations;27using Xunit;28{29 {30 public void KafkaTest()31 {32 {33 };34 var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);35 kafkaTestcontainer.Start();36 }37 }38}39using DotNet.Testcontainers.Configurations;40using Xunit;41{42 {43 public void KafkaTest()44 {
KafkaTestcontainerConfiguration
Using AI Code Generation
1var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();2var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);3await kafkaTestcontainer.StartAsync();4var kafkaHost = kafkaTestcontainer.Hostname;5var kafkaPort = kafkaTestcontainer.Port;6Console.WriteLine($"Kafka container started with host {kafkaHost} and port {kafkaPort}");7var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();8var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);9await kafkaTestcontainer.StartAsync();10var kafkaHost = kafkaTestcontainer.Hostname;11var kafkaPort = kafkaTestcontainer.Port;12Console.WriteLine($"Kafka container started with host {kafkaHost} and port {kafkaPort}");13var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();14var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);15await kafkaTestcontainer.StartAsync();16var kafkaHost = kafkaTestcontainer.Hostname;17var kafkaPort = kafkaTestcontainer.Port;18Console.WriteLine($"Kafka container started with host {kafkaHost} and port {kafkaPort}");19var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();20var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);21await kafkaTestcontainer.StartAsync();22var kafkaHost = kafkaTestcontainer.Hostname;23var kafkaPort = kafkaTestcontainer.Port;24Console.WriteLine($"Kafka container started with host {kafkaHost} and port {kafkaPort}");25var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();26var kafkaTestcontainer = new KafkaTestcontainer(kafkaTestcontainerConfiguration);27await kafkaTestcontainer.StartAsync();
KafkaTestcontainerConfiguration
Using AI Code Generation
1var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();2var kafkaTestcontainer = new TestcontainersContainer(kafkaTestcontainerConfiguration)3 .WithExposedPorts(9092)4 .WithPortBinding(9092)5 .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092));6await kafkaTestcontainer.StartAsync();7var kafkaConnection = new KafkaConnection(kafkaTestcontainer.GetDockerHostIpAddress(), 8 kafkaTestcontainer.GetMappedPublicPort(9092));9var kafkaProducer = new KafkaProducer(kafkaConnection, "test-topic");10var kafkaConsumer = new KafkaConsumer(kafkaConnection, "test-topic");11await kafkaConsumer.StartAsync();12await kafkaProducer.ProduceAsync("test message");13var kafkaMessage = await kafkaConsumer.ConsumeAsync();14Console.WriteLine(kafkaMessage);15await kafkaConsumer.StopAsync();16await kafkaTestcontainer.StopAsync();17var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();18var kafkaTestcontainer = new TestcontainersContainer(kafkaTestcontainerConfiguration)19 .WithExposedPorts(9092)20 .WithPortBinding(9092)21 .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092));22await kafkaTestcontainer.StartAsync();23var kafkaConnection = new KafkaConnection(kafkaTestcontainer.GetDockerHostIpAddress(), 24 kafkaTestcontainer.GetMappedPublicPort(9092));25var kafkaProducer = new KafkaProducer(kafkaConnection, "test-topic");26var kafkaConsumer = new KafkaConsumer(kafkaConnection, "test-topic");27await kafkaConsumer.StartAsync();28await kafkaProducer.ProduceAsync("test message");29var kafkaMessage = await kafkaConsumer.ConsumeAsync();30Console.WriteLine(kafkaMessage);31await kafkaConsumer.StopAsync();32await kafkaTestcontainer.StopAsync();33var kafkaTestcontainerConfiguration = new KafkaTestcontainerConfiguration();34var kafkaTestcontainer = new TestcontainersContainer(kafkaTestcontainerConfiguration)35 .WithExposedPorts(9092)36 .WithPortBinding(9092)37 .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092));
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!