KafkaFlow is a set of nuget packages.
Install KafkaFlow using NuGet package management.
Types are in the KafkaFlow
namespace.
The host is configured using Dependency Injection. This is typically done once at application Startup.cs
shown bellow, but you can find an example on how to do it with an Hosted Service here.
public void ConfigureServices(IServiceCollection services)
{
services.AddKafka(kafka => kafka
.UseConsoleLog()
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddConsumer(consumer => consumer
.Topic("sample-topic")
.WithGroupId("sample-group")
.WithBufferSize(100)
.WithWorkersCount(10)
.AddMiddlewares(middlewares => middlewares
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(handlers => handlers
.AddHandler<SampleMessageHandler>())
)
)
.AddProducer("producer-name", producer => producer
.DefaultTopic("sample-topic")
.AddMiddlewares(middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()
)
)
)
);
}
public void Configure(
IApplicationBuilder app,
IWebHostEnvironment env,
IHostApplicationLifetime lifetime)
{
var kafkaBus = app.ApplicationServices.CreateKafkaBus();
lifetime.ApplicationStarted.Register(() => kafkaBus.StartAsync(lifetime.ApplicationStopped));
}
Now you can create Message Handlers or access Producers and start exchanging events through Kafka.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4