Basic Usage
Learn how to send messages to Kafka using KafkaBus.
IMessageBus Interface
The IMessageBus interface is your primary way to send messages. Inject it into your services:
public class OrderService(IMessageBus messageBus)
{
    public async Task CreateOrderAsync(Order order)
    {
        await messageBus.SendAsync("orders", order);
    }
}
Send Methods
Send to Topic
Send a message to a topic with automatic partition assignment:
// String key (default)
await messageBus.SendAsync("orders", new OrderCreated(Guid.NewGuid(), "Product", 99.99m));
// With custom key type
await messageBus.SendAsync<Guid, OrderCreated>("orders", message);
Send to Specific Partition
When you need control over which partition receives the message:
// String key
await messageBus.SendAsync("orders", partition: 2, message);
// Custom key type
await messageBus.SendAsync<Guid, OrderCreated>("orders", partition: 2, message);
Send Batch
Send multiple messages efficiently:
var orders = new[]
{
    new OrderCreated(Guid.NewGuid(), "Product A", 10.00m),
    new OrderCreated(Guid.NewGuid(), "Product B", 20.00m),
    new OrderCreated(Guid.NewGuid(), "Product C", 30.00m)
};
// String key
var results = await messageBus.SendBatchAsync("orders", orders);
// Custom key type (named differently so both calls compile in the same scope)
var guidKeyResults = await messageBus.SendBatchAsync<Guid, OrderCreated>("orders", orders);
Key Types
KafkaBus supports any key type. The default is string:
// Default string key
await messageBus.SendAsync("topic", message);
// Guid key
await messageBus.SendAsync<Guid, OrderCreated>("topic", message);
// Int key
await messageBus.SendAsync<int, UserEvent>("topic", message);
// Long key
await messageBus.SendAsync<long, Transaction>("topic", message);
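Tip: Use meaningful keys for partitioning. With Kafka's default partitioning, messages that share a key are routed to the same partition for as long as the topic's partition count stays the same, which preserves their relative order.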
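To see why a shared key preserves ordering, it helps to look at how a key can be turned into a partition number. The sketch below is illustrative only: `PartitionFor` is a hypothetical helper, not part of KafkaBus, and it uses an FNV-1a hash where real Kafka clients use their own hash functions, so the partition numbers it produces will not match what a real producer assigns. It only shows the general idea of hashing the key and taking the result modulo the partition count.

```csharp
using System;
using System.Text;

// Hypothetical helper (not a KafkaBus or Kafka API): hash the key with
// 32-bit FNV-1a, then take the hash modulo the number of partitions.
static int PartitionFor(string key, int partitionCount)
{
    uint hash = 2166136261u;                  // FNV-1a 32-bit offset basis
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        unchecked
        {
            hash ^= b;
            hash *= 16777619u;                // FNV-1a 32-bit prime
        }
    }
    return (int)(hash % (uint)partitionCount);
}

// The same key and the same partition count always give the same partition.
int first = PartitionFor("customer-42", 6);
int second = PartitionFor("customer-42", 6);
Console.WriteLine(first == second);           // True

// The result is always a valid partition index in [0, partitionCount).
Console.WriteLine(PartitionFor("customer-7", 6));
```

Because the result depends on the partition count, adding partitions to a topic can move a key to a different partition. That is one reason to pick the partition count with some care when ordering per key matters.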
Complete Example
public class NotificationService(IMessageBus messageBus, ILogger<NotificationService> logger)
{
    public async Task SendOrderNotificationAsync(Order order)
    {
        var notification = new OrderNotification
        {
            OrderId = order.Id,
            CustomerEmail = order.CustomerEmail,
            Status = "Created",
            CreatedAt = DateTime.UtcNow
        };
        var result = await messageBus.SendAsync("notifications", notification);
        logger.LogInformation(
            "Notification sent to partition {Partition} at offset {Offset}",
            result.Partition.Value,
            result.Offset.Value
        );
    }

    public async Task SendBulkNotificationsAsync(IEnumerable<Order> orders)
    {
        // Materialize once so each notification is created a single time,
        // even if the sequence is enumerated more than once.
        var notifications = orders.Select(o => new OrderNotification
        {
            OrderId = o.Id,
            CustomerEmail = o.CustomerEmail,
            Status = "Created",
            CreatedAt = DateTime.UtcNow
        }).ToList();
        var results = await messageBus.SendBatchAsync("notifications", notifications);
        logger.LogInformation("Sent {Count} notifications", results.Count());
    }
}
Next Steps
- Delivery Result - Handle send results
- Custom Keys - Configure message keys
- Headers - Add metadata to messages