In this article, you use C# and the .NET CLI to create two applications that will produce and consume events from Apache Kafka.
By the end of the article, you will know how to use KafkaFlow to either Produce or Consume events from Apache Kafka.
Prerequisites OverviewYou will create two applications:
To connect them, you will be running an Apache Kafka cluster using Docker.
Steps 1. Create a folder for your applicationsCreate a new folder with the name KafkaFlowQuickstart.
2. Setup Apache KafkaInside the folder from step 1, create a docker-compose.yml
file. You can download it from here.
Using your terminal of choice, start the cluster.
4. Create Producer ProjectRun the following command to create a Console Project named Producer.
dotnet new console --name Producer
5. Install KafkaFlow packages
Inside the Producer project directory, run the following commands to install the required packages.
dotnet add package KafkaFlow
dotnet add package KafkaFlow.Microsoft.DependencyInjection
dotnet add package KafkaFlow.LogHandler.Console
dotnet add package KafkaFlow.Serializer.JsonCore
dotnet add package Microsoft.Extensions.DependencyInjection
6. Create the Message contract
Add a new class file named HelloMessage.cs and add the following example:
namespace Producer;
public class HelloMessage
{
public string Text { get; set; } = default!;
}
7. Create message sender
Replace the content of the Program.cs with the following example:
using Microsoft.Extensions.DependencyInjection;
using KafkaFlow.Producers;
using KafkaFlow.Serializer;
using KafkaFlow;
using Producer;
var services = new ServiceCollection();
const string topicName = "sample-topic";
const string producerName = "say-hello";
services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.CreateTopicIfNotExists(topicName, 1, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m =>
m.AddSerializer<JsonCoreSerializer>()
)
)
)
);
var serviceProvider = services.BuildServiceProvider();
var producer = serviceProvider
.GetRequiredService<IProducerAccessor>()
.GetProducer(producerName);
await producer.ProduceAsync(
topicName,
Guid.NewGuid().ToString(),
new HelloMessage { Text = "Hello!" });
Console.WriteLine("Message sent!");
8. Create Consumer Project
Run the following command to create a Console Project named Consumer.
dotnet new console --name Consumer
9. Add a reference to the Producer
In order to access the message contract, add a reference to the Producer Project.
Inside the Consumer project directory, run the following commands to add the reference.
dotnet add reference ../Producer
10. Install KafkaFlow packages
Inside the Consumer project directory, run the following commands to install the required packages.
dotnet add package KafkaFlow
dotnet add package KafkaFlow.Microsoft.DependencyInjection
dotnet add package KafkaFlow.LogHandler.Console
dotnet add package KafkaFlow.Serializer.JsonCore
dotnet add package Microsoft.Extensions.DependencyInjection
11. Create a Message Handler
Create a new class file named HelloMessageHandler.cs and add the following example.
using KafkaFlow;
using Producer;
namespace Consumer;
public class HelloMessageHandler : IMessageHandler<HelloMessage>
{
public Task Handle(IMessageContext context, HelloMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);
return Task.CompletedTask;
}
}
12. Create the Message Consumer
Replace the content of the Program.cs with the following example.
using KafkaFlow;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;
using Consumer;
const string topicName = "sample-topic";
var services = new ServiceCollection();
services.AddKafka(kafka => kafka
.UseConsoleLog()
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.CreateTopicIfNotExists(topicName, 1, 1)
.AddConsumer(consumer => consumer
.Topic(topicName)
.WithGroupId("sample-group")
.WithBufferSize(100)
.WithWorkersCount(10)
.AddMiddlewares(middlewares => middlewares
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(h => h.AddHandler<HelloMessageHandler>())
)
)
)
);
var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.CreateKafkaBus();
await bus.StartAsync();
Console.ReadKey();
await bus.StopAsync();
13. Run!
From the KafkaFlowQuickstart
directory:
dotnet run --project Consumer/Consumer.csproj
dotnet run --project Producer/Producer.csproj
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4