Skip to main content

Message Metadata

Access message metadata through ConsumeContext.

Available Properties

public override Task HandleAsync(
ConsumeContext<string, OrderCreated> context,
CancellationToken ct)
{
// Message key
var key = context.Key;

// Deserialized message
var message = context.Message;

// Topic information
var topic = context.Topic;
var partition = context.Partition;
var offset = context.Offset;

// Timestamp when message was produced
var timestamp = context.Timestamp;

// Message headers
var headers = context.Headers;

return Task.CompletedTask;
}

Offset Tracking

Use offset information for deduplication or progress tracking:

public class OrderConsumer(
IOffsetTracker tracker,
ILogger<OrderConsumer> logger)
: MessageConsumer<OrderCreated>
{
public override async Task HandleAsync(
ConsumeContext<string, OrderCreated> context,
CancellationToken ct)
{
// Check if already processed (deduplication)
if (await tracker.IsProcessedAsync(context.Topic, context.Partition, context.Offset))
{
logger.LogWarning("Message already processed, skipping");
return;
}

await ProcessOrderAsync(context.Message, ct);

// Track progress
await tracker.MarkProcessedAsync(context.Topic, context.Partition, context.Offset);
}
}

Timestamp Usage

Use message timestamp for time-based logic:

public override Task HandleAsync(
ConsumeContext<string, OrderCreated> context,
CancellationToken ct)
{
var messageAge = DateTime.UtcNow - context.Timestamp;

if (messageAge > TimeSpan.FromHours(24))
{
logger.LogWarning(
"Stale message detected. Age: {Age}, Produced: {Timestamp}",
messageAge,
context.Timestamp
);
}

// Calculate processing lag
logger.LogInformation(
"Processing message from {Timestamp}, lag: {LagMs}ms",
context.Timestamp,
messageAge.TotalMilliseconds
);

return Task.CompletedTask;
}

Next Steps