Skip to main content

Producer Middleware

Middleware allows you to intercept messages before they're sent to Kafka.

Creating Middleware

Implement IProduceMiddleware<TKey, TMessage>:

[MiddlewareOrder(1)]
public class LoggingMiddleware<TKey, TMessage>(
ILogger<LoggingMiddleware<TKey, TMessage>> logger)
: IProduceMiddleware<TKey, TMessage>
{
public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
var sw = Stopwatch.StartNew();
logger.LogInformation("Producing to {Topic}", context.Topic);

var result = await next(context, ct);

logger.LogInformation(
"Produced to {Topic} P:{Partition} O:{Offset} in {Elapsed}ms",
context.Topic,
result.Partition.Value,
result.Offset.Value,
sw.ElapsedMilliseconds
);

return result;
}
}

ProduceContext

The context provides access to message details:

public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
// Access context properties
var topic = context.Topic;
var partition = context.Partition; // null if not specified
var key = context.Key;
var message = context.Message;
var headers = context.Headers;

return await next(context, ct);
}

Middleware Order

Use [MiddlewareOrder] to control execution order. Higher values execute first:

[MiddlewareOrder(100)]  // Executes first (outermost)
public class AuthMiddleware<TKey, TMessage> : IProduceMiddleware<TKey, TMessage>

[MiddlewareOrder(50)] // Executes second
public class ValidationMiddleware<TKey, TMessage> : IProduceMiddleware<TKey, TMessage>

[MiddlewareOrder(1)] // Executes last (closest to Kafka)
public class LoggingMiddleware<TKey, TMessage> : IProduceMiddleware<TKey, TMessage>

Execution flow:

Request:  Auth → Validation → Logging → Kafka
Response: Auth ← Validation ← Logging ← Kafka

Common Middleware Examples

Correlation ID Middleware

Automatically add correlation headers:

[MiddlewareOrder(90)]
public class CorrelationMiddleware<TKey, TMessage>
: IProduceMiddleware<TKey, TMessage>
{
public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
context.Headers ??= new Headers();

if (!context.Headers.Any(h => h.Key == "correlation-id"))
{
var correlationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();
context.Headers.Add("correlation-id", Encoding.UTF8.GetBytes(correlationId));
}

return await next(context, ct);
}
}

Metrics Middleware

Track production metrics:

[MiddlewareOrder(80)]
public class MetricsMiddleware<TKey, TMessage>(
IMetricsCollector metrics)
: IProduceMiddleware<TKey, TMessage>
{
public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
var sw = Stopwatch.StartNew();

try
{
var result = await next(context, ct);

metrics.RecordProduceSuccess(context.Topic, sw.ElapsedMilliseconds);
return result;
}
catch (Exception ex)
{
metrics.RecordProduceFailure(context.Topic, ex.GetType().Name);
throw;
}
}
}

Validation Middleware

Validate messages before sending:

[MiddlewareOrder(70)]
public class ValidationMiddleware<TKey, TMessage>(
IValidator<TMessage> validator)
: IProduceMiddleware<TKey, TMessage>
{
public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
var validationResult = await validator.ValidateAsync(context.Message, ct);

if (!validationResult.IsValid)
{
throw new ValidationException(validationResult.Errors);
}

return await next(context, ct);
}
}

Enrichment Middleware

Add standard headers to all messages:

[MiddlewareOrder(60)]
public class EnrichmentMiddleware<TKey, TMessage>
: IProduceMiddleware<TKey, TMessage>
{
public async Task<DeliveryResult<TKey, TMessage>> InvokeAsync(
ProduceContext<TKey, TMessage> context,
ProduceDelegate<TKey, TMessage> next,
CancellationToken ct)
{
context.Headers ??= new Headers();
context.Headers.Add("produced-at", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O")));
context.Headers.Add("message-type", Encoding.UTF8.GetBytes(typeof(TMessage).Name));
context.Headers.Add("service", Encoding.UTF8.GetBytes("order-service"));

return await next(context, ct);
}
}

Registration

Middleware is automatically discovered and registered when you call AddKafkaBusProducers:

builder.Services.AddKafkaBusProducers(
builder.Configuration,
[typeof(Program).Assembly] // Scans this assembly for middleware
);

Next Steps