Configuration
Configure consumer behavior through `appsettings.json` or custom configuration classes.
Default Configuration
Add default settings in appsettings.json:
{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "localhost:9092",
"AutoOffsetReset": "Earliest",
"EnableAutoCommit": true,
"AutoCommitIntervalMs": 5000,
"SessionTimeoutMs": 45000,
"MaxPollIntervalMs": 300000
}
}
}
Common Settings
| Setting | Description | Default |
|---|---|---|
| BootstrapServers | Kafka broker addresses | Required |
| AutoOffsetReset | Where to start (Earliest, Latest) | Latest |
| EnableAutoCommit | Auto-acknowledge messages | true |
| AutoCommitIntervalMs | Auto-commit frequency (ms) | 5000 |
| SessionTimeoutMs | Consumer session timeout (ms) | 45000 |
| MaxPollIntervalMs | Max time between polls (ms) | 300000 |
| FetchMinBytes | Minimum fetch size (bytes) | 1 |
| FetchMaxBytes | Maximum fetch size (bytes) | 52428800 |
Custom Consumer Configuration
Create a configuration class for specific message types:
/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages keyed by <c>string</c>.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic => "orders";

    /// <summary>The consumer group used for load balancing.</summary>
    public override string GroupId => "order-processing-service";

    /// <summary>The number of parallel workers (threads).</summary>
    public override int WorkerCount => 4;
}
Configuration Properties
Topic
The Kafka topic to consume from:
// Consume from the "orders" topic.
public override string Topic => "orders";
GroupId
Consumer group for load balancing:
// Join the "order-service" consumer group.
public override string GroupId => "order-service";
> **Tip:** Consumers in the same group share the topic's partitions between them. Different groups each receive all messages independently.
WorkerCount
Number of parallel workers (threads):
// Run four parallel workers (threads).
public override int WorkerCount => 4;
> **Note:** `WorkerCount` should not exceed the number of partitions; any extra workers will be idle.
Advanced Configuration
Override ConfigureAsync for full control:
/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages that overrides
/// <see cref="ConfigureAsync"/> to control the Kafka consumer settings directly.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic => "orders";

    /// <summary>The consumer group used for load balancing.</summary>
    public override string GroupId => "order-service";

    /// <summary>The number of parallel workers (threads).</summary>
    public override int WorkerCount => 4;

    /// <summary>
    /// Applies this consumer's settings on top of <c>defaultConfiguration</c>.
    /// </summary>
    /// <returns>A completed task holding the adjusted consumer configuration.</returns>
    public override Task<ConsumerConfig> ConfigureAsync()
    {
        ConsumerConfig settings = defaultConfiguration;

        settings.GroupId = GroupId;
        settings.AutoOffsetReset = AutoOffsetReset.Earliest;

        // Auto-commit is off: messages are acknowledged manually.
        settings.EnableAutoCommit = false;

        settings.SessionTimeoutMs = 30000;

        // 10 minutes between polls, to allow for long processing.
        settings.MaxPollIntervalMs = 600000;

        settings.FetchMinBytes = 1024;

        // 10 MB
        settings.FetchMaxBytes = 10485760;

        return Task.FromResult(settings);
    }
}
Configuration Profiles
High Throughput
Maximize the number of messages processed per second:
/// <summary>
/// High-throughput profile: auto-commit with a short interval and larger fetches.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    ConsumerConfig settings = defaultConfiguration;

    settings.GroupId = GroupId;

    // Auto-commit every 1000 ms.
    settings.EnableAutoCommit = true;
    settings.AutoCommitIntervalMs = 1000;

    // Wait for 10 KB per fetch; cap fetches at 50 MB overall and 10 MB per partition.
    settings.FetchMinBytes = 10240;
    settings.FetchMaxBytes = 52428800;
    settings.MaxPartitionFetchBytes = 10485760;

    return Task.FromResult(settings);
}
Reliable Processing
Ensure no message loss:
/// <summary>
/// Reliable-processing profile: auto-commit and automatic offset storing are
/// both disabled, so messages are acknowledged manually.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    ConsumerConfig settings = defaultConfiguration;

    settings.GroupId = GroupId;

    // Manual acknowledgment instead of auto-commit.
    settings.EnableAutoCommit = false;

    settings.AutoOffsetReset = AutoOffsetReset.Earliest;
    settings.EnableAutoOffsetStore = false;
    settings.IsolationLevel = IsolationLevel.ReadCommitted;

    return Task.FromResult(settings);
}
Long Processing
For handlers that take a long time to process each message:
/// <summary>
/// Long-processing profile: extends the maximum poll interval and sets the
/// session timeout and heartbeat interval explicitly.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    ConsumerConfig settings = defaultConfiguration;

    settings.GroupId = GroupId;

    // 30 minutes
    settings.MaxPollIntervalMs = 1800000;

    // 1 minute
    settings.SessionTimeoutMs = 60000;

    // 20 seconds
    settings.HeartbeatIntervalMs = 20000;

    return Task.FromResult(settings);
}
Custom Deserializers
Specify custom deserializers:
/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages keyed by <see cref="Guid"/>,
/// supplying custom key and value deserializers.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<Guid, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic => "orders";

    /// <summary>Deserializer for the <see cref="Guid"/> message key.</summary>
    public override IDeserializer<Guid>? KeyDeserializer => new GuidDeserializer();

    /// <summary>Deserializer for the <c>OrderCreated</c> message value.</summary>
    public override IDeserializer<OrderCreated>? ValueDeserializer => new JsonDeserializer<OrderCreated>();
}
Environment-Based Configuration
// appsettings.Development.json
{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "localhost:9092",
"AutoOffsetReset": "Earliest"
}
}
}
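// appsettings.Production.json (the SASL credentials below are placeholders; supply real values through environment variables or a secret store rather than committing them)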
{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "kafka-1:9092,kafka-2:9092",
"AutoOffsetReset": "Latest",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "Plain",
"SaslUsername": "your-username",
"SaslPassword": "your-password"
}
}
}
Next Steps
- Acknowledgment - Manual message acknowledgment
- Metadata - Access message metadata
- Middleware - Add cross-cutting concerns