Skip to main content

Custom Serialization

Implement custom serializers for Protobuf, Avro, MessagePack, or other formats.

Marker Interfaces

KafkaBus provides marker interfaces to identify default serializers:

// Key serializers
public interface IDefaultKeySerializer { }
public interface IDefaultKeyDeserializer { }

// Value serializers
public interface IDefaultValueSerializer { }
public interface IDefaultValueDeserializer { }

Custom Serializer Interface

Implement ISerializer<T> from Confluent.Kafka and add the marker interface:

public class CustomSerializer<T> : ISerializer<T>, IDefaultValueSerializer
{
public byte[] Serialize(T data, SerializationContext context)
{
// Your serialization logic
}
}

Custom Deserializer Interface

Implement IDeserializer<T> with the marker interface:

public class CustomDeserializer<T> : IDeserializer<T>, IDefaultValueDeserializer
{
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data.IsEmpty)
return default!;
// Your deserialization logic
}
}

Protobuf Example

Using Google.Protobuf with marker interfaces:

public class ProtobufSerializer<T> : ISerializer<T>, IDefaultValueSerializer
where T : IMessage<T>
{
public byte[] Serialize(T data, SerializationContext context)
{
return data.ToByteArray();
}
}

public class ProtobufDeserializer<T> : IDeserializer<T>, IDefaultValueDeserializer
where T : IMessage<T>, new()
{
private readonly MessageParser<T> _parser = new(() => new T());

public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data.IsEmpty)
return default!;
return _parser.ParseFrom(data.ToArray());
}
}

MessagePack Example

Using MessagePack-CSharp with marker interfaces:

public class MessagePackSerializer<T> : ISerializer<T>, IDefaultValueSerializer
{
public byte[] Serialize(T data, SerializationContext context)
{
return MessagePackSerializer.Serialize(data);
}
}

public class MessagePackDeserializer<T> : IDeserializer<T>, IDefaultValueDeserializer
{
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data.IsEmpty)
return default!;
return MessagePackSerializer.Deserialize<T>(data.ToArray());
}
}

Registering Default Serializers

Register globally for all producers/consumers:

// Use Protobuf for all messages
builder.Services.AddKafkaBusProducers(
builder.Configuration,
[typeof(Program).Assembly],
defaultValueSerializer: typeof(ProtobufSerializer<>)
);

builder.Services.AddKafkaBusConsumers(
builder.Configuration,
[typeof(Program).Assembly],
defaultValueDeserializer: typeof(ProtobufDeserializer<>)
);

Per-Message Serializers

Configure for specific message types:

public class OrderProducerConfiguration(IServiceProvider sp)
: ProducerConfiguration<string, OrderCreated>(sp)
{
public override ISerializer<OrderCreated>? ValueSerializer
=> new ProtobufSerializer<OrderCreated>();
}

public class OrderConsumerConfiguration(IServiceProvider sp)
: ConsumerConfiguration<string, OrderCreated>(sp)
{
public override string Topic => "orders";

public override IDeserializer<OrderCreated>? ValueDeserializer
=> new ProtobufDeserializer<OrderCreated>();
}

Custom Key Serializers

For non-string keys, use IDefaultKeySerializer and IDefaultKeyDeserializer:

public class GuidSerializer : ISerializer<Guid>, IDefaultKeySerializer
{
public byte[] Serialize(Guid data, SerializationContext context)
{
return data.ToByteArray();
}
}

public class GuidDeserializer : IDeserializer<Guid>, IDefaultKeyDeserializer
{
public Guid Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data.Length != 16)
return Guid.Empty;
return new Guid(data);
}
}

// Usage in configuration
public class OrderProducerConfiguration(IServiceProvider sp)
: ProducerConfiguration<Guid, OrderCreated>(sp)
{
public override Guid GetKey(OrderCreated message) => message.Id;
public override ISerializer<Guid>? KeySerializer => new GuidSerializer();
}

Schema Registry Integration

For Avro with Confluent Schema Registry:

public class AvroSerializerWrapper<T> : ISerializer<T>, IDefaultValueSerializer
where T : class
{
private readonly ISchemaRegistryClient _schemaRegistry;
private readonly AvroSerializer<T> _serializer;

public AvroSerializerWrapper(string schemaRegistryUrl)
{
_schemaRegistry = new CachedSchemaRegistryClient(
new SchemaRegistryConfig { Url = schemaRegistryUrl });
_serializer = new AvroSerializer<T>(_schemaRegistry);
}

public byte[] Serialize(T data, SerializationContext context)
{
return _serializer.SerializeAsync(data, context)
.GetAwaiter().GetResult();
}
}

Marker Interface Summary

InterfacePurpose
IDefaultKeySerializerMarks a class as the default key serializer
IDefaultKeyDeserializerMarks a class as the default key deserializer
IDefaultValueSerializerMarks a class as the default value serializer
IDefaultValueDeserializerMarks a class as the default value deserializer

Next Steps