In this section, we will learn how to use the Typed Handler middleware.
The Typed Handler Middleware allows you to execute different handlers depending on the message type.
tip
Use it when the topic has different message types.
When a message with a given type arrives, the middleware will call the appropriate message handler for that message type.
How Message Type is discovered
The Message Type discovery may vary depending on the implementation.
With Schema Registry
When using a Schema Registry, the schema is read from it, and the first 5 bytes of the message (a magic byte followed by a 4-byte schema ID) identify the schema registered on the Schema Registry.
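To make the 5-byte prefix concrete, here is a minimal sketch of reading the schema ID from a raw message. It assumes the Confluent wire format (one magic byte with value 0, then a 4-byte big-endian schema ID); the SchemaIdReader class and its method are hypothetical names for illustration and are not part of KafkaFlow.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper, not a KafkaFlow type.
public static class SchemaIdReader
{
    // Assumption: Confluent wire format, i.e. byte 0 is a magic byte (0)
    // and bytes 1..4 hold the schema ID as a big-endian 32-bit integer.
    public static int ReadSchemaId(ReadOnlySpan<byte> message)
    {
        if (message.Length < 5)
            throw new ArgumentException("Message is shorter than the 5-byte prefix.", nameof(message));

        if (message[0] != 0)
            throw new ArgumentException("Unexpected magic byte.", nameof(message));

        // The serialized payload starts right after these four bytes.
        return BinaryPrimitives.ReadInt32BigEndian(message.Slice(1, 4));
    }
}
```

For instance, SchemaIdReader.ReadSchemaId(new byte[] { 0, 0, 0, 0, 42, 1, 2 }) returns 42, and the remaining bytes are the serialized payload.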
When using other serializers with no Schema Registry, the DefaultTypeResolver is used by default. The DefaultTypeResolver uses the Message-Type header to identify the message type based on the type's fully qualified name.
It's also possible to write your own type resolver by implementing the IMessageTypeResolver interface and passing it to the AddSerializer/AddDeserializer method in the producer/consumer middleware.
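The header-based lookup described above can be illustrated with a small standalone sketch. It is not KafkaFlow's DefaultTypeResolver and does not implement IMessageTypeResolver; the HeaderTypeResolver class, the dictionary-based header model, and the UTF-8 encoding of the header value are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical standalone helper; headers are modeled as a plain dictionary.
public static class HeaderTypeResolver
{
    public const string HeaderName = "Message-Type";

    // Returns the CLR type named in the header, or null when the header is
    // missing or names a type that cannot be loaded.
    public static Type Resolve(IReadOnlyDictionary<string, byte[]> headers)
    {
        if (!headers.TryGetValue(HeaderName, out var raw) || raw == null)
            return null;

        // Assumption: the header value is a UTF-8 encoded type name.
        var typeName = Encoding.UTF8.GetString(raw);

        // throwOnError: false makes unknown type names resolve to null.
        return Type.GetType(typeName, throwOnError: false);
    }
}
```

A custom resolver would apply the same idea with whatever convention the topic uses, such as a shorter alias in the header mapped to a type through a lookup table.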
There are three ways to add handlers to a consumer:
AddHandler<HandlerType>(): adds one handler per call.
AddHandlers(IEnumerable<Type> handlers): adds many handlers per call.
AddHandlersFromAssemblyOf<HandlerType>(): adds all classes in the assembly of the given type that implement the IMessageHandler interface.
services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            ...
            .AddMiddlewares(middlewares => middlewares
                ...
                .AddTypedHandlers(handlers => handlers
                    .WithHandlerLifetime(InstanceLifetime.Singleton)
                    .AddHandler<ProductCreatedHandler>()
                    .AddHandlers( ... )
                    .AddHandlersFromAssemblyOf<ProductCreatedHandler>()
                )
            )
        )
    )
);
Create a Message Handler
A message handler can be created by implementing the IMessageHandler<MessageType> interface.
The handler instance is created by the configured dependency injection container. Any handler dependencies are injected through the constructor, and the instance lifetime can be set in the configuration.
warning
If there's no handler defined for the arriving message, it will be ignored.
public class ProductCreatedHandler : IMessageHandler<ProductCreatedEvent>
{
    public Task Handle(IMessageContext context, ProductCreatedEvent message)
    {
        ...
    }
}
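As a sketch of constructor injection in a handler, the example below gives the handler a dependency to work with. To keep it compilable on its own, IMessageContext and IMessageHandler<TMessage> are declared here as minimal stand-ins for the KafkaFlow types; IProductCatalog, InMemoryProductCatalog, and the Name property on ProductCreatedEvent are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-ins so the sketch compiles alone; in a real project use KafkaFlow's types.
public interface IMessageContext { }

public interface IMessageHandler<TMessage>
{
    Task Handle(IMessageContext context, TMessage message);
}

// Hypothetical message shape.
public class ProductCreatedEvent
{
    public string Name { get; set; } = "";
}

// Hypothetical dependency that the container would supply.
public interface IProductCatalog
{
    void Add(string name);
}

public class InMemoryProductCatalog : IProductCatalog
{
    public List<string> Names { get; } = new List<string>();

    public void Add(string name) => Names.Add(name);
}

public class ProductCreatedHandler : IMessageHandler<ProductCreatedEvent>
{
    private readonly IProductCatalog catalog;

    // The dependency injection container passes the dependency here.
    public ProductCreatedHandler(IProductCatalog catalog) => this.catalog = catalog;

    public Task Handle(IMessageContext context, ProductCreatedEvent message)
    {
        this.catalog.Add(message.Name);
        return Task.CompletedTask;
    }
}
```

With a Singleton handler lifetime, one handler instance serves every message, so a dependency like the catalog above would need to be safe for concurrent use in a real consumer.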
Configuring Handler Lifetime
The handler lifetime is set with WithHandlerLifetime, which takes an InstanceLifetime value such as InstanceLifetime.Singleton.
info
By default, the handler lifetime is Singleton.
Handling the No Handler Found event
If there's no handler defined for the arriving message, it will be ignored.
It is possible to handle those events. As an example, the following code writes to the console when a message can't be handled.
services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            ...
            .AddMiddlewares(middlewares => middlewares
                ...
                .AddTypedHandlers(handlers => handlers
                    .AddHandler<ProductCreatedHandler>()
                    .WhenNoHandlerFound(context =>
                        Console.WriteLine("Message not handled > Partition: {0} | Offset: {1}",
                            context.ConsumerContext.Partition,
                            context.ConsumerContext.Offset)
                    )
                )
            )
        )
    )
);
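The dispatch behavior, including the fallback for unhandled messages, can be pictured with a small standalone model. This is a simplified sketch and not KafkaFlow's implementation: TypedDispatcher and its methods are hypothetical, handlers are plain delegates, and only one handler per message type is kept.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of type-based dispatch with a "no handler found" fallback.
public class TypedDispatcher
{
    private readonly Dictionary<Type, Action<object>> handlers =
        new Dictionary<Type, Action<object>>();

    // Default fallback: silently ignore the message.
    private Action<object> whenNoHandlerFound = _ => { };

    public TypedDispatcher AddHandler<TMessage>(Action<TMessage> handler)
    {
        // Registering the same type twice replaces the earlier handler.
        this.handlers[typeof(TMessage)] = message => handler((TMessage)message);
        return this;
    }

    public TypedDispatcher WhenNoHandlerFound(Action<object> callback)
    {
        this.whenNoHandlerFound = callback;
        return this;
    }

    public void Dispatch(object message)
    {
        // Exact runtime type match; base types and interfaces are not considered.
        if (this.handlers.TryGetValue(message.GetType(), out var handler))
            handler(message);
        else
            this.whenNoHandlerFound(message);
    }
}
```

Registering a handler for string and then dispatching "hello" followed by 42 runs the string handler once and the fallback once, which mirrors a topic carrying a message type that no handler covers.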