In this section, we will learn how to use the Serializer Middleware.
The Serializer Middleware is used to serialize and deserialize messages.
You can use one of the common serializers or build your own.
How to use it
In the configuration, add the AddSerializer/AddDeserializer extension method to your producer/consumer middlewares.
The AddSerializer/AddDeserializer method takes two arguments. The first is a class that implements the ISerializer/IDeserializer interface. The second is a class that implements the IMessageTypeResolver interface; if it is not provided, the DefaultTypeResolver will be used. Both classes can also be provided as arguments through a factory method.
tip
For topics that have just one message type, use the AddSingleTypeSerializer/AddSingleTypeDeserializer method.
The serializer middleware also handles the production of tombstone records. The produced message is null whenever the message value is null, but not when that value is an empty byte array.
    services.AddKafka(kafka => kafka
        .AddCluster(cluster => cluster
            .WithBrokers(new[] { "localhost:9092" })
            .AddProducer<ProductEventsProducer>(producer => producer
                ...
                .AddMiddlewares(middlewares => middlewares
                    ...
                    // Serializer only; the DefaultTypeResolver is used.
                    .AddSerializer<JsonMessageSerializer>()
                    // or: serializer plus a custom type resolver
                    .AddSerializer<JsonMessageSerializer, YourTypeResolver>()
                    // or: both created through factory methods
                    .AddSerializer(
                        resolver => new JsonMessageSerializer(...),
                        resolver => new YourTypeResolver(...))
                    // or: a topic that carries a single message type
                    .AddSingleTypeSerializer<YourMessageType, JsonMessageSerializer>()
                    // or: single message type with a serializer factory
                    .AddSingleTypeSerializer<YourMessageType>(resolver => new JsonMessageSerializer(...))
                    ...
                )
            )
        )
    );
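The tombstone rule described above can be sketched in isolation. This is a minimal illustration, not KafkaFlow's implementation: TombstoneSketch, ToPayload, and IsTombstone are hypothetical names, and System.Text.Json stands in for whichever serializer is configured.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical helper, not part of KafkaFlow; it only illustrates the rule.
public static class TombstoneSketch
{
    // A null message maps to a null payload (a tombstone);
    // any other message is serialized to bytes.
    public static byte[]? ToPayload(object? message) =>
        message is null ? null : JsonSerializer.SerializeToUtf8Bytes(message);

    // Only a null payload is a tombstone. An empty byte array is a
    // regular value that happens to have no content.
    public static bool IsTombstone(byte[]? payload) => payload is null;
}
```

Under these assumptions, TombstoneSketch.IsTombstone(null) is true while TombstoneSketch.IsTombstone(Array.Empty<byte>()) is false, which is the distinction the middleware is described as preserving.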
Adding Schema Registry support
Serializer middlewares can be used along with a schema registry, which allows schemas to evolve according to the configured compatibility setting.
Install the KafkaFlow.SchemaRegistry package, configure the schema registry broker, and use one of the schema registry serializer packages to access all of the schema registry integration features.
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddKafka(
                kafka => kafka
                    .AddCluster(
                        cluster => cluster
                            .WithBrokers(new[] { "localhost:9092" })
                            // Address of the schema registry used by the (de)serializers.
                            .WithSchemaRegistry(config => config.Url = "localhost:8081")
                            .AddProducer(
                                ...
                                .AddMiddlewares(middlewares => middlewares
                                    .AddSchemaRegistryAvroSerializer(
                                        new AvroSerializerConfig { SubjectNameStrategy = SubjectNameStrategy.TopicRecord })
                                )
                            )
                            .AddConsumer(
                                ...
                                .AddMiddlewares(middlewares => middlewares
                                    .AddSchemaRegistryAvroDeserializer()
                                )
                            )
                    )
            );
        }
    }
info
To publish messages of more than one type to the same topic, SubjectNameStrategy.Record or SubjectNameStrategy.TopicRecord must be used. You can see a detailed explanation here.
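The reason the strategy matters can be sketched by looking at how a subject name is usually derived for a message value. The code below is an illustration only: SketchStrategy and SubjectNameSketch are hypothetical names, and the naming rules are assumed to follow the convention commonly documented for Confluent Schema Registry (topic name plus "-value", record name, or both joined by a hyphen).

```csharp
using System;

// Hypothetical stand-in for the real SubjectNameStrategy enum.
public enum SketchStrategy { Topic, Record, TopicRecord }

public static class SubjectNameSketch
{
    // Builds the subject under which a value schema would be registered,
    // assuming the conventional naming rules for each strategy.
    public static string ValueSubject(SketchStrategy strategy, string topic, string recordFullName) =>
        strategy switch
        {
            // One subject per topic: every message type shares it.
            SketchStrategy.Topic => $"{topic}-value",
            // One subject per record type, regardless of topic.
            SketchStrategy.Record => recordFullName,
            // One subject per topic and record type pair.
            SketchStrategy.TopicRecord => $"{topic}-{recordFullName}",
            _ => throw new ArgumentOutOfRangeException(nameof(strategy)),
        };
}
```

With the topic-based rule, two message types sent to the same topic map to the same subject, so their schemas would have to be compatible with each other. The record-based rules give each type its own subject, which is why they allow several types per topic.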
A type resolver is needed to instruct the middleware where to find the destination message type in the message metadata when consuming and where to store it when producing.
The framework provides the DefaultTypeResolver, which is used when the second type parameter is omitted from the AddSerializer/AddDeserializer method. You can create your own implementation of IMessageTypeResolver to allow communication with other frameworks.
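    using System; // Type

    public class SampleMessageTypeResolver : IMessageTypeResolver
    {
        // Name of the header that carries the message type.
        private const string MessageType = "MessageType";

        // When consuming: read the type name from the header and resolve it.
        public Type OnConsume(IMessageContext context)
        {
            var typeName = context.Headers.GetString(MessageType);
            return Type.GetType(typeName);
        }

        // When producing: store "<full type name>, <assembly name>" in the header.
        public void OnProduce(IMessageContext context)
        {
            context.Headers.SetString(
                MessageType,
                $"{context.Message.GetType().FullName}, {context.Message.GetType().Assembly.GetName().Name}");
        }
    }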
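The header format used by the sample resolver, "<full type name>, <assembly name>", can be tried outside of KafkaFlow to see how it round-trips through Type.GetType. This is a small sketch: TypeNameSketch and SketchMessages.ProductCreated are hypothetical names introduced only for the illustration.

```csharp
#nullable enable
using System;

namespace SketchMessages
{
    // Hypothetical message type used only for this illustration.
    public sealed class ProductCreated { }
}

public static class TypeNameSketch
{
    // Builds the same "<full name>, <assembly name>" string the sample
    // resolver writes to the header when producing.
    public static string ToHeaderValue(Type type) =>
        $"{type.FullName}, {type.Assembly.GetName().Name}";

    // Resolves the string back to a Type, as the sample resolver does when
    // consuming. Type.GetType returns null when the type cannot be found.
    public static Type? FromHeaderValue(string headerValue) =>
        Type.GetType(headerValue);
}
```

Because Type.GetType can return null, for example when the producing application uses a type that the consumer does not reference, a custom resolver may need to decide how to handle a header it cannot resolve.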