Apache Kafka C#.NET – Producer and Consumer with examples
In this article in the Kafka .NET Core tutorial series, we look at Kafka producer and consumer examples in C#.
We will build a .NET Core C# client application that consumes messages from an Apache Kafka cluster.
In other words, we will create a Kafka consumer client that reads messages from a Kafka topic.
The article covers defining the consumer configuration, subscribing to a topic, and polling for messages.
You can create a Kafka cluster using any of the approaches below:
- Confluent Cloud cluster
- Your localhost cluster (if any)
- Remote Kafka cluster (any)
The approach discussed below works with any of these Kafka clusters.
Here we connect to a Confluent cluster hosted in the cloud. We will configure our client with the required cluster credentials and start consuming messages from Kafka topics using the consumer client.
The Kafka producer example is covered in a separate article.
Getting started
Create a .NET Core application (.NET Core 3.1 or 5, net45, netstandard1.3, netstandard2.0 and above).
Install the NuGet package below from the NuGet Package Manager. This package is officially supported by Confluent.
Using the Package Manager Console:
Install-Package Confluent.Kafka -Version 1.5.1
Or
Using the .NET CLI:
dotnet add package Confluent.Kafka -v 1.5.1
Note: Please use the latest available version of the NuGet package.
Define Consumer Configuration
Define the consumer configuration using the ConsumerConfig class, which ships with this NuGet package along with the other basic classes and methods you need. Make sure to set connection details such as BootstrapServers.
Set properties such as SaslMechanism and SecurityProtocol to match your cluster.
The values for SaslUsername and SaslPassword can be obtained from the CLI or the Cloud interface.
The ConsumerConfig class also has a constructor that initializes a new Confluent.Kafka.ConsumerConfig instance wrapping an existing Confluent.Kafka.ClientConfig instance; the example below uses the parameterless constructor with an object initializer.
// Requires: using System; using Confluent.Kafka;
var cConfig = new ConsumerConfig
{
    BootstrapServers = "xxxxx",
    SaslMechanism = SaslMechanism.Plain,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslUsername = "xxxxxxx",
    SaslPassword = "xxxxx+",
    // A fresh group id on every run, so each run starts reading from the earliest offset.
    GroupId = Guid.NewGuid().ToString(),
    AutoOffsetReset = AutoOffsetReset.Earliest
};
The configuration above is hardcoded for simplicity, but you can use ConfigurationBuilder to load it from a configuration file.
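As a rough sketch of loading these settings from a file, the snippet below reads a "Kafka" section from an appsettings.json file with ConfigurationBuilder. The class name KafkaSettings, the file name, the section name, and the key names are assumptions made for illustration, and the sketch assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Json packages are installed.

```csharp
using System;
using System.IO;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

// Hypothetical helper: the class, file, section, and key names are illustrative assumptions.
public static class KafkaSettings
{
    // Reads appsettings.json from the current working directory, e.g.
    // { "Kafka": { "BootstrapServers": "...", "SaslUsername": "...", "SaslPassword": "...", "GroupId": "..." } }
    public static ConsumerConfig Load()
    {
        IConfiguration configuration = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false)
            .Build();

        return FromConfiguration(configuration.GetSection("Kafka"));
    }

    // Maps the configuration keys onto the same properties used in the hardcoded example.
    public static ConsumerConfig FromConfiguration(IConfiguration kafka)
    {
        return new ConsumerConfig
        {
            BootstrapServers = kafka["BootstrapServers"],
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = kafka["SaslUsername"],
            SaslPassword = kafka["SaslPassword"],
            // Fall back to a random group id when none is configured.
            GroupId = kafka["GroupId"] ?? Guid.NewGuid().ToString(),
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }
}
```

With this in place, `var cConfig = KafkaSettings.Load();` could replace the hardcoded initializer. Keeping credentials out of source code is the main benefit; where they are actually stored (a file, environment variables, a secret store) is a deployment choice.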
Kafka C#.NET – Consume Message from Kafka Topics
Let's create a C# .NET Core Kafka consumer and consume messages from a Kafka topic.
Subscribe to Topics
Use the ConsumerBuilder class to build a consumer instance from the configuration. The consumer it builds exposes the Subscribe() method, which lets you subscribe to a single Kafka topic (an overload accepts a list of topics).
Let's use the config defined above and build the consumer with ConsumerBuilder.
Poll the Message/Event
Then call the Consume method, which blocks until a message/event is available or the cancellation token is cancelled.
// Requires: using System; using System.Threading; using Confluent.Kafka;
// cConfig is the ConsumerConfig defined above; topic holds the name of the topic to consume.
using (var c = new ConsumerBuilder<Ignore, string>(cConfig).Build())
{
    c.Subscribe(topic);

    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                // Blocks until a message is available or the token is cancelled.
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed: close the consumer and release all the resources it holds.
        c.Close();
    }
}
TopicPartitionOffset identifies where a message came from: its topic, partition, and offset.
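To make the three parts concrete, here is a small sketch that reads them individually. The helper name OffsetInfo.Describe and the sample values are made up for illustration; in the consumer loop the value would come from cr.TopicPartitionOffset rather than being constructed by hand.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper for illustration only.
public static class OffsetInfo
{
    // Formats the topic name, partition number, and offset of a message position.
    public static string Describe(TopicPartitionOffset tpo)
    {
        return $"topic={tpo.Topic}, partition={tpo.Partition.Value}, offset={tpo.Offset.Value}";
    }

    public static void Main()
    {
        // Sample values; a real consumer would pass cr.TopicPartitionOffset here.
        var tpo = new TopicPartitionOffset("my-topic", new Partition(0), new Offset(42));
        Console.WriteLine(Describe(tpo));
    }
}
```

Logging the partition and offset separately like this can help when you need to find a specific message again later.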
In the above example, we consume the 100 messages that were published to the Kafka topic by the producer example from the previous article.
We published messages with incremental values “Test1”, “Test2”, and so on, and here we consume them in the same order to keep the message flow simple.
The Kafka topic view then shows the consumed messages.
When the topic has no new messages, the blocked Consume call eventually logs timeout messages like the following:
%5|1598190018.518|REQTMOUT|rdkafka#consumer-1| [thrd:sasl_ssl://abcd.......cloud:xxxx/2]: sasl_ssl://abcd.......cloud:xxxx/2: Timed out FetchRequest in flight (after 359947ms, timeout #0)
%4|1598190018.840|REQTMOUT|rdkafka#consumer-1| [thrd:sasl_ssl://abcd....cloud:xxxx/2]: sasl_ssl://abcd.xxxxx....cloud:xxxx/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
Once executed, the consumer prints the messages it reads from the Kafka topic.
That’s all! These are the basics of getting started with the Apache Kafka C# .NET client.
Do you have any comments or ideas or any better suggestions to share?
Please sound off your comments below.
Happy Coding !!
Summary
It’s simple to build a .NET client application that consumes messages from an Apache Kafka cluster. Confluent.Kafka is a lightweight wrapper around librdkafka that provides an easy interface for consumer clients: subscribe to a topic and poll for messages/events as required.
How do I retry if any failures happen while consuming?
Hi Magesh, you could try a basic retry for any failure.
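One way to read "basic retry" is a small helper that re-runs an operation a limited number of times with a growing delay. The sketch below is plain C# and not part of the Confluent.Kafka package; the RetryHelper name, its parameters, and the choice of exponential backoff are assumptions for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of Confluent.Kafka.
public static class RetryHelper
{
    // Runs the action up to maxAttempts times, doubling the delay after each failure.
    // The exception from the final attempt is rethrown to the caller.
    public static T Execute<T>(Func<T> action, int maxAttempts, TimeSpan initialDelay)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            // Cancellation is never retried, so Ctrl+C still stops the consumer promptly.
            catch (Exception ex) when (attempt < maxAttempts && !(ex is OperationCanceledException))
            {
                Console.WriteLine($"Attempt {attempt} failed: {ex.Message}. Retrying in {delay.TotalMilliseconds} ms.");
                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

Inside the consume loop this could be used as `var cr = RetryHelper.Execute(() => c.Consume(cts.Token), 3, TimeSpan.FromSeconds(1));`. The same helper could instead wrap your own message-processing code. Whether retrying is appropriate depends on the failure: a transient network error may recover, while a malformed message will fail on every attempt.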