Produce and Consume Events from Azure EventHubs in .NET – Part II

Send and Receive Events from Azure Event Hubs Consumer example

Today in this article, we will see how to create consumer applications for receiving the events from Azure EventHubs using the C# .NET application.

Today in this article, we will cover below aspects,

We already discussed on how to send or create a producer application in the previous article.

Please visit the below link for the Producer example,

What is Event Hub and why to use it?

As we already discussed in our previous article Azure Event Hub is a streaming platform and event ingestion service in the Azure Cloud environment.

Below are the main characteristics of EventHub,

Main characteristics,

  • Azure Event Hubs is a streaming platform and provides a distributed stream processing platform with data and analytics.
  • Provides the ability to process millions of events per second.
  • Ability to transform the data using a different provider or adapter of your choice
  • Event Hubs provide a time retention policy buffer.
  • It helps to decouple event producers from event consumers.
  • Support for real-time and batch processing.

Prerequisites

  • Please create your Virtual network with the required Infra resources
  • Please create an Azure Storage account
  • Please create Blob Container

Getting started

Let’s start creating C# .NET Core console application as below,

Azure Event Hubs producer and consumer example

Add NuGet package called Azure.Messaging.EventHubs

Using Package Manager Console,

PM> Install-Package Azure.Messaging.EventHubs
PM> Install-Package Azure.Messaging.EventHubs.Processor

Or

Using NuGet Package Manager

AzureMessagingEventHubsProcessor

I shall be loading the configuration as below,

var builder = new ConfigurationBuilder()
                     .SetBasePath(Directory.GetCurrentDirectory())
                     .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
                     .AddJsonFile($"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.json", optional: true);

            var configuration = builder.Build();

Below is my configuration file with details on the Blob Connection string, Blob Container Name, Event Hub namespace connection string, Event Name, etc.

{
  "BlobConnectionString": "DefaultEndpointsProtocol=https;AccountName=vstorageaccount;AccountKey=<Your key>;EndpointSuffix=core.windows.net",
  "BlobContainerName": "thecodebuzz-container",
  "HubNameSpaceConString": "Endpoint=sb://<Your Connection String >.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<Your Key>",
  "EventHubName": "firstevent"
}

Below is how we are fetching the configuration details from the appsettings.json file,

            
            string blobStorageConnectionString = configuration["BlobConnectionString"];

            string blobContainerName = configuration["BlobContainerName"];

            string ehubNamespaceConnectionString = configuration["HubNameSpaceConString"];

            string eventHubName = configuration["EventHubName"];

Below is the complete Consumer side code,

           // Create a BlobContainerClient object
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

            // Create an EventProcessorClient client object
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            // Start the processing

            await processor.StartProcessingAsync();

            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));

            // Stop the processing
            await processor.StopProcessingAsync();

In the above code,

  • We are using the EventProcessorClient to create a Consumer class object. This instance allows for consuming and processing events across all partitions of a given Event Hub within a specific consumer group.
  • We are passing the Blob Container client, Consumer group, Event Hubs namespace and event Hub name as input to EventProcessorClient.
  • ProcessEventHandler– Is the event handler for processing the events.
  • StartProcessingAsync() method signals the start EventProcessorClient to begin the processing.

Below is how we can handle the events and perform post-actions on the data.

In the below example, we are simply printing the records on the console.

 static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            Console.WriteLine(@"Recevied : {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }

For Error handling, please use the below handler to catch any exceptions,

  
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            Console.WriteLine($"Partition '{ eventArgs.PartitionId}': an unhandled exception was encountered.");

            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }

The above code will start consuming the real-time data from Event Hub successfully.

Let’s send or produce the 2 batches of 10 records each to Event Hub using the previous i.e Producer application.

Please visit the below link for the Producer example,

Azure Event Hubs

Below shows the Consumer start consuming and process the events sent by the producer.

Azure EventHubs Consumer example

Similarly below shows full Metrics with details on Messages and Throughput (in Bytes of data) loaded to Event Hub,

Azure EventHubs

Do you have any comments or ideas or any better suggestions to share?

Please sound off your comments below.

Happy Coding !!

References:

Summary

Today in this article, we learned, how to configure Event Hubs using C# .NET. We created a Producer and Consumer application for the Azure Event Hub. We understood how Event Hub provides a distributed stream processing of data and analytics services and helps you process millions of records with low latency and easy integration.



Please bookmark this page and share it with your friends. Please Subscribe to the blog to receive notifications on freshly published(2024) best practices and guidelines for software design and development.



Leave a Reply

Your email address will not be published. Required fields are marked *