Quick Installation
dotnet add package KafkaBus
Features
Simple API
Send and receive Kafka messages with just a few lines of code. No boilerplate required.
Middleware Pipeline
Add logging, retry logic, metrics, and more with the middleware pipeline pattern.
DI Integration
First-class support for Microsoft.Extensions.DependencyInjection with auto-discovery.
Manual Acknowledgment
Full control over message acknowledgment for reliable processing.
Custom Serialization
Use JSON, Protobuf, MessagePack, or any custom serializer you need.
Batch Operations
Send multiple messages efficiently in a single SendBatchAsync call.
Simple & Intuitive
Producer
// Send a message
await messageBus.SendAsync("orders", new OrderCreated
{
    Id = Guid.NewGuid(),
    ProductName = "Laptop",
    Amount = 999.99m
});

// Send with an explicit key type (Guid key, OrderCreated value)
await messageBus.SendAsync<Guid, OrderCreated>("orders", message);

// Send a batch
await messageBus.SendBatchAsync("orders", messages);
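The producer snippet uses `message` and `messages` without defining them. A minimal sketch of what they could look like follows. The property names come from the example above, but the property types, the `SampleOrders` helper, and the choice of `List<OrderCreated>` are assumptions; the collection type accepted by `SendBatchAsync` is not specified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Message contract inferred from the producer example; property types are assumptions.
public class OrderCreated
{
    public Guid Id { get; init; }
    public string ProductName { get; init; } = string.Empty;
    public decimal Amount { get; init; }
}

// Hypothetical helper, not part of KafkaBus.
public static class SampleOrders
{
    // Builds `count` distinct orders intended for a batch send.
    public static List<OrderCreated> Create(int count) =>
        Enumerable.Range(1, count)
            .Select(i => new OrderCreated
            {
                Id = Guid.NewGuid(),
                ProductName = $"Laptop #{i}",
                Amount = 999.99m + i
            })
            .ToList();
}
```

With this in place, `var messages = SampleOrders.Create(3);` would give the batch call something to send.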
Consumer
// Handles OrderCreated messages; dependencies arrive through the primary constructor.
public class OrderConsumer(ILogger<OrderConsumer> logger)
    : MessageConsumer<OrderCreated>
{
    public override async Task HandleAsync(
        ConsumeContext<string, OrderCreated> context,
        CancellationToken ct)
    {
        logger.LogInformation("Order received: {Id}", context.Message.Id);
        await ProcessOrderAsync(context.Message, ct); // your own business logic (not shown)
        Ack(context); // Manual acknowledgment
    }
}

// Declares which topic the consumer reads from.
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    public override string Topic => "orders";
}
Easy Configuration
Configure KafkaBus through appsettings.json with sensible defaults. Override settings per environment or per message type.
- Environment-specific configuration
- Custom configuration sections
- Per-message type overrides
- Full Confluent.Kafka settings support
{
  "KafkaBus": {
    "DefaultProducerSettings": {
      "BootstrapServers": "localhost:9092"
    },
    "DefaultConsumerSettings": {
      "BootstrapServers": "localhost:9092",
      "AutoOffsetReset": "Earliest",
      "EnableAutoCommit": true
    }
  }
}
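As a sketch of the environment-specific configuration mentioned above: assuming the application uses the standard .NET host configuration, where appsettings.{Environment}.json is layered over appsettings.json, a hypothetical appsettings.Production.json could override only the broker addresses. The host names below are placeholders, not real endpoints.

```json
{
  "KafkaBus": {
    "DefaultProducerSettings": {
      "BootstrapServers": "kafka-1.example.internal:9092,kafka-2.example.internal:9092"
    },
    "DefaultConsumerSettings": {
      "BootstrapServers": "kafka-1.example.internal:9092,kafka-2.example.internal:9092"
    }
  }
}
```

Under that layering, keys not repeated in the override file, such as AutoOffsetReset, would keep their values from appsettings.json.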
Powerful Middleware
Intercept messages with the middleware pipeline. Add cross-cutting concerns without modifying your business logic.
- Logging & Metrics
- Retry & Circuit Breaker
- Validation & Enrichment
- Distributed Tracing
- Custom ordering with attributes
using System.Diagnostics;            // Stopwatch
using Microsoft.Extensions.Logging;  // ILogger<T>

// Logs the topic before the handler runs and the elapsed time after it completes.
[MiddlewareOrder(1)]
public class LoggingMiddleware<TKey, TMessage>(
    ILogger<LoggingMiddleware<TKey, TMessage>> logger)
    : IConsumeMiddleware<TKey, TMessage>
{
    public async Task InvokeAsync(
        ConsumeContext<TKey, TMessage> context,
        ConsumeDelegate<TKey, TMessage> next,
        CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        logger.LogInformation("Processing {Topic}", context.Topic);
        await next(context, ct); // hand off to the next middleware or the consumer
        logger.LogInformation("Processed in {Elapsed}ms", sw.ElapsedMilliseconds);
    }
}
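The retry concern listed above is not spelled out on this page. Below is a minimal sketch of the retry logic such a middleware might wrap around `next`. It is kept independent of KafkaBus so that it assumes no API beyond what the logging example shows; `RetryHelper` and its parameters are hypothetical names, not part of the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Library-independent retry helper (hypothetical, not part of KafkaBus).
public static class RetryHelper
{
    // Runs `action` up to `maxAttempts` times, doubling the delay after each failure.
    // The last failure, and any cancellation, is rethrown to the caller.
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> action,
        int maxAttempts,
        TimeSpan initialDelay,
        CancellationToken ct)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action(ct);
                return; // success: stop retrying
            }
            catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
            {
                // Back off before the next attempt; cancellation aborts the wait.
                await Task.Delay(delay, ct);
                delay *= 2;
            }
        }
    }
}
```

Inside an `InvokeAsync` with the same shape as `LoggingMiddleware`, it could be called as `await RetryHelper.ExecuteAsync(token => next(context, token), 3, TimeSpan.FromMilliseconds(200), ct);`. Whether retrying at this level interacts safely with manual acknowledgment depends on how the consumer calls `Ack`, which this sketch does not address.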
Ready to Get Started?
Add KafkaBus to your project and start building reliable Kafka applications in minutes.