A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://farfetch.github.io/kafkaflow/docs/guides/middlewares/serializer-middleware below:

Serializer Middleware | KafkaFlow

Serializer Middleware

In this section, we will learn how to use the Serializer Middleware.

The Serializer Middleware is used to serialize and deserialize messages.

You can use one of the following common serializers or build your own:

How to use it

On the configuration, add the AddSerializer/AddDeserializer extension method to your producer/consumer middlewares to use it.

The AddSerializer/AddDeserializer method has two arguments:

tip

For topics that have just one message type, use the AddSingleTypeSerializer/AddSingleTypeDeserializer method.

Serializer middleware also handles the produce of tombstone records. The messages produced are null whenever the message value is null, but not when that value is an empty byte array.

services.AddKafka(kafka => kafka
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddProducer<ProductEventsProducer>(producer => producer
...
.AddMiddlewares(middlewares => middleware
...
.AddSerializer<JsonMessageSerializer>()

.AddSerializer<JsonMessageSerializer, YourTypeResolver>()

.AddSerializer(
resolver => new JsonMessageSerializer(...),
resolver => new YourTypeResolver(...))

.AddSingleTypeSerializer<YourMessageType, JsonMessageSerializer>()

.AddSingleTypeSerializer<YourMessageType>(resolver => new JsonMessageSerializer(...))
...
)
)
)
);

Adding Schema Registry support

Serializer middlewares can be used along with schema registry allowing the evolution of schemas according to the configured compatibility setting.

Install the KafkaFlow.SchemaRegistry package, configure the schema registry broker, and use one of the following packages to use all the schema registry integration features.

public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddKafka(
kafka => kafka
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.WithSchemaRegistry(config => config.Url = "localhost:8081")
.AddProducer(
...
.AddMiddlewares(middlewares =>
middlewares.AddSchemaRegistryAvroSerializer(new AvroSerializerConfig{ SubjectNameStrategy = SubjectNameStrategy.TopicRecord })
)
.AddConsumer(
...
.AddMiddlewares(middlewares => middlewares.AddSchemaRegistryAvroDeserializer()
)
)
);
}
}

info

To be able to publish multiple type messages per topic, SubjectNameStrategy.Record or SubjectNameStrategy.TopicRecord must be used. You can see a detailed explanation here.

Creating a Message Type Resolver

A type resolver is needed to instruct the middleware where to find the destination message type in the message metadata when consuming and where to store it when producing.

The framework has the DefaultTypeResolver that will be used omitting the second type parameter in the AddSerializer/AddDeserializer method. You can create your own implementation of IMessageTypeResolver to allow communication with other frameworks.

public class SampleMessageTypeResolver : IMessageTypeResolver
{
private const string MessageType = "MessageType";

public Type OnConsume(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return Type.GetType(typeName);
}

public void OnProduce(IMessageContext context)
{
context.Headers.SetString(
MessageType,
$"{context.Message.GetType().FullName}, {context.Message.GetType().Assembly.GetName().Name}");
}
}

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4