Produce and Consume Events from Azure EventHubs in .NET – Part II
In this article, we will create a consumer application that receives events from Azure Event Hubs using C# and .NET.
We already discussed how to create a producer application that sends events in the previous article.
What is Event Hub and why use it?
As discussed in the previous article, Azure Event Hubs is a streaming platform and event ingestion service in the Azure cloud environment.
Below are the main characteristics of Event Hubs,
- Azure Event Hubs is a distributed stream processing platform for data and analytics workloads.
- Provides the ability to process millions of events per second.
- Ability to transform the data using a provider or adapter of your choice.
- Event Hubs retains events for a configurable retention period, so it acts as a time-based buffer.
- It helps to decouple event producers from event consumers.
- Support for real-time and batch processing.
Prerequisites
- Create your virtual network with the required infrastructure resources.
- Create an Azure Storage account.
- Create a blob container in that storage account. The consumer uses it to store its checkpoints.
Getting started
Let’s start by creating a C# .NET Core console application.
Add the NuGet packages Azure.Messaging.EventHubs and Azure.Messaging.EventHubs.Processor.
Using Package Manager Console,
PM> Install-Package Azure.Messaging.EventHubs
PM> Install-Package Azure.Messaging.EventHubs.Processor
Or use the NuGet Package Manager.
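The configuration is loaded as below,
// Requires the Microsoft.Extensions.Configuration namespace
// (NuGet package Microsoft.Extensions.Configuration.Json)
var builder = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    // Base settings file
    .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
    // Optional environment-specific overrides, e.g. appsettings.Development.json
    .AddJsonFile($"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.json", optional: true);
var configuration = builder.Build();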
Below is my configuration file with details on the Blob connection string, Blob container name, Event Hubs namespace connection string, and Event Hub name.
{
    "BlobConnectionString": "DefaultEndpointsProtocol=https;AccountName=vstorageaccount;AccountKey=<Your key>;EndpointSuffix=core.windows.net",
    "BlobContainerName": "thecodebuzz-container",
    "HubNameSpaceConString": "Endpoint=sb://<Your Namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<Your Key>",
    "EventHubName": "firstevent"
}
Below is how we are fetching the configuration details from the appsettings.json file,
string blobStorageConnectionString = configuration["BlobConnectionString"];
string blobContainerName = configuration["BlobContainerName"];
string ehubNamespaceConnectionString = configuration["HubNameSpaceConString"];
string eventHubName = configuration["EventHubName"];
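Because both JSON files are registered as optional, a missing file or a mistyped key simply yields null, and the failure only shows up later when the clients are created. As a minimal sketch (the RequiredSettings helper below is hypothetical and not part of any library; it assumes nullable reference types are enabled), the lookups could fail fast with a clear message instead:

```csharp
using System;

// Hypothetical helper, not part of Microsoft.Extensions.Configuration.
public static class RequiredSettings
{
    // Returns the value found for 'key', or throws if it is missing or blank.
    // 'lookup' is any function that maps a key to a value, for example
    // k => configuration[k] when using the configuration object built earlier.
    public static string Get(Func<string, string?> lookup, string key)
    {
        string? value = lookup(key);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Missing required setting '{key}'.");
        }
        return value;
    }
}
```

With this in place, a lookup such as RequiredSettings.Get(k => configuration[k], "EventHubName") would either return the configured value or stop the application at startup with the name of the missing key.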
Below is the complete Consumer side code,
// Namespaces used: Azure.Storage.Blobs, Azure.Messaging.EventHubs, Azure.Messaging.EventHubs.Consumer
// Consumer group to read from; the default group ($Default) is assumed here
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
// Create a BlobContainerClient object; the processor stores its checkpoints in this container
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
// Create an EventProcessorClient client object
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 10 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(10));
// Stop the processing
await processor.StopProcessingAsync();
In the above code,
- We are using the EventProcessorClient to create the consumer object. This instance allows for consuming and processing events across all partitions of a given Event Hub within a specific consumer group.
- We are passing the Blob container client, consumer group, Event Hubs namespace connection string, and Event Hub name as input to EventProcessorClient.
- ProcessEventHandler is the event handler for processing the events, and ProcessErrorHandler is the handler for errors.
- The StartProcessingAsync() method signals the EventProcessorClient to begin processing, and StopProcessingAsync() stops it.
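The fixed 10-second delay is fine for a demo, but a long-running consumer usually keeps processing until it is asked to stop. A minimal sketch of that idea follows, assuming a hypothetical helper named ShutdownSignal (it is not part of the Azure SDK) that simply waits until a CancellationToken is cancelled, for example by Ctrl+C:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Azure SDK.
public static class ShutdownSignal
{
    // Completes (without throwing) once the token is cancelled.
    public static async Task WaitForCancellationAsync(CancellationToken token)
    {
        try
        {
            // Timeout.Infinite makes the delay end only through cancellation.
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way out; nothing to handle.
        }
    }
}

// Possible wiring in the consumer, replacing the Task.Delay call:
//
//   using var cts = new CancellationTokenSource();
//   Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
//   await processor.StartProcessingAsync();
//   await ShutdownSignal.WaitForCancellationAsync(cts.Token);
//   await processor.StopProcessingAsync();
```

Setting e.Cancel = true keeps the process alive after Ctrl+C so that StopProcessingAsync() still gets a chance to run.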
Below is how we can handle the events and perform post-actions on the data.
In the below example, we are simply printing the records on the console.
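static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    // Decode the event body as UTF-8 text and print it
    Console.WriteLine("Received : {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    // Record progress in Blob storage so processing can resume after this event
    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}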
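The handler above checkpoints after every single event, and each checkpoint is a write to the Blob container. A commonly used alternative is to checkpoint only every N events per partition. The sketch below shows just the counting logic, under the assumption of a hypothetical CheckpointThrottle class (not part of the Azure SDK); the interval value is something you would tune for your own workload:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, not part of the Azure SDK.
public sealed class CheckpointThrottle
{
    private readonly int _interval;
    // Number of events seen so far, tracked separately for each partition.
    private readonly ConcurrentDictionary<string, int> _counts =
        new ConcurrentDictionary<string, int>();

    public CheckpointThrottle(int interval)
    {
        if (interval < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(interval));
        }
        _interval = interval;
    }

    // Counts one event for the partition and returns true on every Nth event.
    public bool ShouldCheckpoint(string partitionId)
    {
        int count = _counts.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
        return count % _interval == 0;
    }
}

// Possible use inside ProcessEventHandler, with a shared static instance:
//
//   if (throttle.ShouldCheckpoint(eventArgs.Partition.PartitionId))
//   {
//       await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
//   }
```

The trade-off is that after a restart the consumer may see again the events received since the last checkpoint, so the handler should tolerate processing the same event more than once.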
For error handling, please use the below handler to catch any exceptions,
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
    Console.WriteLine($"Partition '{eventArgs.PartitionId}': an unhandled exception was encountered.");
    Console.WriteLine(eventArgs.Exception.Message);
    return Task.CompletedTask;
}
The above code will start consuming the real-time data from Event Hub successfully.
Let’s produce 2 batches of 10 records each to the Event Hub using the Producer application from the previous article.
Please visit the below link for the Producer example,
Azure Event Hubs
Once the producer runs, the consumer starts consuming and processing the events sent by the producer.
Similarly, the Event Hub metrics show details on the messages and throughput (in bytes of data) loaded to the Event Hub.
Summary
In this article, we learned how to consume events from Azure Event Hubs using C# .NET. Together with the previous article, we now have both a Producer and a Consumer application for Azure Event Hubs. Event Hubs provides distributed stream processing for data and analytics services and helps you process millions of records with low latency and easy integration.