KafkaBus

Lightweight Kafka library for .NET

Quick Installation

dotnet add package KafkaBus

Features

🚀

Simple API

Send and receive Kafka messages with just a few lines of code. No boilerplate required.

🔗

Middleware Pipeline

Add logging, retry logic, metrics, and more with the middleware pipeline pattern.

🔌

DI Integration

First-class support for Microsoft.Extensions.DependencyInjection with auto-discovery.

🔄

Manual Acknowledgment

Full control over message acknowledgment for reliable processing.

🎯

Custom Serialization

Use JSON, Protobuf, MessagePack, or any custom serializer you need.

Batch Operations

Send multiple messages efficiently with batch operations.

Simple & Intuitive

Producer

// Send a message
await messageBus.SendAsync("orders", new OrderCreated
{
    Id = Guid.NewGuid(),
    ProductName = "Laptop",
    Amount = 999.99m
});

// Send with a custom key type (here Guid)
await messageBus.SendAsync<Guid, OrderCreated>("orders", message);

// Send batch
await messageBus.SendBatchAsync("orders", messages);

Consumer

public class OrderConsumer(ILogger<OrderConsumer> logger)
    : MessageConsumer<OrderCreated>
{
    public override async Task HandleAsync(
        ConsumeContext<string, OrderCreated> context,
        CancellationToken ct)
    {
        logger.LogInformation("Order received: {Id}", context.Message.Id);

        await ProcessOrderAsync(context.Message, ct);

        Ack(context); // Manual acknowledgment
    }
}

public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    public override string Topic => "orders";
}

Easy Configuration

Configure KafkaBus through appsettings.json with sensible defaults. Override settings per environment or per message type.

  • Environment-specific configuration
  • Custom configuration sections
  • Per-message-type overrides
  • Full Confluent.Kafka settings support

{
  "KafkaBus": {
    "DefaultProducerSettings": {
      "BootstrapServers": "localhost:9092"
    },
    "DefaultConsumerSettings": {
      "BootstrapServers": "localhost:9092",
      "AutoOffsetReset": "Earliest",
      "EnableAutoCommit": true
    }
  }
}
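
As a rough sketch of the environment-specific override mentioned above: assuming the application uses the standard .NET host, which layers appsettings.{Environment}.json on top of appsettings.json, a hypothetical appsettings.Production.json could replace only the broker address. The host name is a placeholder, not a value from KafkaBus.

```json
{
  "KafkaBus": {
    "DefaultProducerSettings": {
      "BootstrapServers": "kafka.prod.example.internal:9092"
    },
    "DefaultConsumerSettings": {
      "BootstrapServers": "kafka.prod.example.internal:9092"
    }
  }
}
```

Under that assumption, keys not listed in the override file, such as AutoOffsetReset, keep the values from the base file.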

Powerful Middleware

Intercept messages with the middleware pipeline. Add cross-cutting concerns without modifying your business logic.

  • Logging & Metrics
  • Retry & Circuit Breaker
  • Validation & Enrichment
  • Distributed Tracing
  • Custom ordering with attributes

[MiddlewareOrder(1)]
public class LoggingMiddleware<TKey, TMessage>(
    ILogger<LoggingMiddleware<TKey, TMessage>> logger)
    : IConsumeMiddleware<TKey, TMessage>
{
    public async Task InvokeAsync(
        ConsumeContext<TKey, TMessage> context,
        ConsumeDelegate<TKey, TMessage> next,
        CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        logger.LogInformation("Processing {Topic}", context.Topic);

        await next(context, ct);

        logger.LogInformation("Processed in {Elapsed}ms", sw.ElapsedMilliseconds);
    }
}
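
The same middleware shape used by the logging sample above could also carry retry logic. The following is a minimal sketch, not KafkaBus's own implementation. So that it compiles without the package, it declares stand-in versions of ConsumeContext, ConsumeDelegate, and IConsumeMiddleware that only mirror the shapes seen in that sample. The RetryMiddleware name, its maxAttempts and delay parameters, and the linear backoff are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo handler: fails twice with a transient error, then succeeds.
var attempts = 0;
ConsumeDelegate<string, string> handler = (context, ct) =>
{
    attempts++;
    if (attempts < 3)
        throw new InvalidOperationException("transient failure");
    Console.WriteLine($"Handled '{context.Message}' on attempt {attempts}");
    return Task.CompletedTask;
};

var retry = new RetryMiddleware<string, string>(
    maxAttempts: 3, delay: TimeSpan.FromMilliseconds(10));

await retry.InvokeAsync(
    new ConsumeContext<string, string>("orders", "key-1", "hello"),
    handler,
    CancellationToken.None);
// prints: Handled 'hello' on attempt 3

// Stand-in types (assumptions): they mirror the shapes in the logging sample
// and are NOT the real KafkaBus types.
public record ConsumeContext<TKey, TMessage>(string Topic, TKey Key, TMessage Message);

public delegate Task ConsumeDelegate<TKey, TMessage>(
    ConsumeContext<TKey, TMessage> context, CancellationToken ct);

public interface IConsumeMiddleware<TKey, TMessage>
{
    Task InvokeAsync(
        ConsumeContext<TKey, TMessage> context,
        ConsumeDelegate<TKey, TMessage> next,
        CancellationToken ct);
}

// Hypothetical retry middleware: re-invokes the rest of the pipeline
// until it succeeds or maxAttempts is reached.
public class RetryMiddleware<TKey, TMessage>(int maxAttempts, TimeSpan delay)
    : IConsumeMiddleware<TKey, TMessage>
{
    public async Task InvokeAsync(
        ConsumeContext<TKey, TMessage> context,
        ConsumeDelegate<TKey, TMessage> next,
        CancellationToken ct)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await next(context, ct);
                return; // success: stop retrying
            }
            catch (Exception) when (attempt < maxAttempts && !ct.IsCancellationRequested)
            {
                // Linear backoff: wait longer after each failed attempt.
                await Task.Delay(delay * attempt, ct);
            }
        }
    }
}
```

Once the last attempt fails, the exception filter no longer matches and the error propagates to the caller, which leaves room for an outer middleware (for example, logging or a dead-letter step) to react. A production version would likely retry only on exceptions known to be transient.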

Ready to Get Started?

Add KafkaBus to your project and start building reliable Kafka applications in minutes.