Getting Started with Kafka in ASP.NET Core

Kafka in ASPNET Core

In this article, we shall see how to get started with Kafka in ASP.NET Core. We will see how to use Kafka Consumer and Producer client in ASP.NET Core 3.1 or 6.0 API.

Here we will create an API that will act as a traditional service listening to business events continuously (especially if your API design follows Choreography or Orchestration architecture, etc.)

Today in this article, we will cover below aspects,

Recently I had a requirement to create an ASP.NET Core API with the ability to subscribe to messages/events.

The service needed to be hosted in the cloud with the ability to subscribe to a business event/messaging to happen.

This I wanted to achieve apart from Controller endpoints exposed to other responsibilities.

Basically, the Producer/Publisher will produce an event or message, and when the message is available the API will act as a consumer who will consume these messages.

You could turn your API into a Consumer or Publisher/Producer as per your requirement.

What is KAFKA

Kafka is a popular messaging system that facilitates the implementation of event-driven architectures

Kafka, developed by Apache, excels at handling high-throughput, real-time data streaming.

It acts as a distributed event streaming platform, where events are produced and consumed by various applications.

Kindly visit the article for understanding Kafka Consumer and Producer examples using ASP.NET Core.

Once it receives the event/message, the messages would be flown to the downstream database and respective services.

Getting started

Create ASP.NET Core API using 3.1 or 6.0 .NET Core version.

Kafka vs RabbitMQ or IBM MQ
Using ASPNET Core API App as Service Windows service

Install Confluent.Kafka NuGet package

Install below the Nuget package from Nuget Package Manager.

The below Nuget package is officially supported by Confluent.

Using Package Manager Console,

 Install-Package Confluent.Kafka -Version 1.5.1 

Or

Using Command prompts,

 dotnet add package -v 1.5.1 Confluent.Kafka 

Note – Please use the latest version available

Please visit the below article for basic Consumer and Publisher examples below,

Register Hosted Service

Please update the Startup.cs for the following code.

 public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddHostedService<TheCodeBuzzConsumer>();
        }

In the above code, we have used AddHostedService() an extension to register Microsoft.Extensions.Hosting.IHostedService to register.

This was using class TheCodeBuzzConsumer which ultimately indirectly implements IHostedService.

Alternatively, you can register HostedService as below as well,

 public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                })
               .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<TheCodeBuzzConsumer>();
                });
    }

IoC DI – Registering Kafka ConsumerConfig

Kindly add the below code to ConfigureServices().

Here we are setting up the Kafka ConsumerConfig class with configuration details using setting up type-safe configuration technique.

We are setting up BootstrapServers, Consumer group, Kafka account details, etc config details.

Please note we are creating a Singleton instance of ConsumerConfig so that only one instance will be used to serve all the requests.

kafka c#, kafka .net, c# kafka, c# kafka consumer, kafka consumer c#, c# kafka consumer example, c# kafka producer example, kafka net, kafka c# example, c# kafka example

Create Backgroundworker class

Create a worker class called TheCodeBuzzConsumer.

This class needs to be derived from the class abstract BackgroundService.

This abstract class exposes a method called ExecuteAsync() which needs to be derived by our worker class.

BackgroundService class also gives you access to a few other virtual methods like StartAsync() and StopAsync() which can be used as needed.

Understanding ExecuteAsync

ExecuteAsync is an abstract method that is needed to implement the derived class. This method gets called as soon as IHostedService starts.

The implementation should return a task that represents the lifetime of the long-running operation(s) being performed.

Use this method using a while loop i.e until cancellation is requested.

Below is the implementation ExecuteAsync,

       protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Task.Run(() => Start(stoppingToken));
            return Task.CompletedTask;
        }

        private void Start(CancellationToken stoppingToken)
        {
            //Perform your business logic here.
            throw new NotImplementedException();
        }
    }

Below is the final implementation for Kafka Consumer Builder and Subscription.

Kafka ASPNET Core c examples

Once executed the method starts invoking and executing every after 1 second and receives the event/messages which will get printed as below,

Kafka in ASPNET Core

Above is the plain vanilla implementation for you to get started.

You can extend the above implementation for any of the use cases you are dealing with.

References:

Watch this,

That’s All! Thank you!

Do you have any comments or ideas or any better suggestions to share?

Please sound off your comments below.

Happy Coding !!

Summary

Today in this article we learned how to enable ASP.NET Core API or MVC application as a service so that one can leverage Pub-Sub capability or Event-driven architecture capability into your existing application.

This technique is useful when dealing with creating a Message/Event-driven architecture-based application using example MQ( RabbitMQ or IBM MQ) or Kafka.



Please bookmark this page and share it with your friends. Please Subscribe to the blog to receive notifications on freshly published(2024) best practices and guidelines for software design and development.



6 thoughts on “Getting Started with Kafka in ASP.NET Core

    1. Hi Jayesh- The messages read will happen based on the Kafka-consumer group. This unique group ensures you read unread messages. It also ensures required offset is maintained for Kafka topic messages.Hope this helps.

  1. Nice example.
    Can you make realtim example for all kafka small setup with source api and target api with produce and consume.

  2. Implemented the kafka consumer in .Net core web api same as described in tis post, but it didn’t work.
    var cr = c.Consume(cts.token) code stuck here ,no message received by consumer. However if I implement consumer in console application it is working.

Leave a Reply

Your email address will not be published. Required fields are marked *