Skip to main content

Configuration

Configure consumer behavior through appsettings or custom configuration classes.

Default Configuration

Add default settings in appsettings.json:

{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "localhost:9092",
"AutoOffsetReset": "Earliest",
"EnableAutoCommit": true,
"AutoCommitIntervalMs": 5000,
"SessionTimeoutMs": 45000,
"MaxPollIntervalMs": 300000
}
}
}

Common Settings

| Setting | Description | Default |
| --- | --- | --- |
| BootstrapServers | Kafka broker addresses | Required |
| AutoOffsetReset | Where to start reading when no committed offset exists (Earliest, Latest) | Latest |
| EnableAutoCommit | Auto-acknowledge messages | true |
| AutoCommitIntervalMs | Auto-commit frequency (ms) | 5000 |
| SessionTimeoutMs | Consumer session timeout (ms) | 45000 |
| MaxPollIntervalMs | Max time between polls (ms) | 300000 |
| FetchMinBytes | Minimum fetch size (bytes) | 1 |
| FetchMaxBytes | Maximum fetch size (bytes) | 52428800 |

Custom Consumer Configuration

Create a configuration class for specific message types:

/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages with <c>string</c> keys.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic
    {
        get { return "orders"; }
    }

    /// <summary>The consumer group this consumer joins.</summary>
    public override string GroupId
    {
        get { return "order-processing-service"; }
    }

    /// <summary>Number of parallel workers (threads).</summary>
    public override int WorkerCount
    {
        get { return 4; }
    }
}

Configuration Properties

Topic

The Kafka topic to consume from:

/// <summary>The Kafka topic to consume from.</summary>
public override string Topic
{
    get => "orders";
}

GroupId

Consumer group for load balancing:

/// <summary>The consumer group used for load balancing.</summary>
public override string GroupId
{
    get => "order-service";
}
Tip: Consumers in the same group divide the topic's partitions among themselves, so each message is handled by one consumer in that group. Different groups each receive all messages independently.

WorkerCount

Number of parallel workers (threads):

/// <summary>Number of parallel workers (threads).</summary>
public override int WorkerCount
{
    get => 4;
}
Note: WorkerCount should not exceed the number of partitions; any extra workers will sit idle.

Advanced Configuration

Override ConfigureAsync for full control:

/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages that overrides
/// <c>ConfigureAsync</c> to set individual consumer options explicitly.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<string, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic { get => "orders"; }

    /// <summary>The consumer group this consumer joins.</summary>
    public override string GroupId { get => "order-service"; }

    /// <summary>Number of parallel workers (threads).</summary>
    public override int WorkerCount { get => 4; }

    /// <summary>
    /// Starts from <c>defaultConfiguration</c>, applies the overrides below,
    /// and returns the result as an already-completed task.
    /// </summary>
    /// <returns>A completed task holding the adjusted consumer configuration.</returns>
    public override Task<ConsumerConfig> ConfigureAsync()
    {
        // NOTE(review): if defaultConfiguration is a shared reference-type instance,
        // the assignments below modify it in place — confirm this is intended.
        var settings = defaultConfiguration;

        // Group membership and starting position.
        settings.GroupId = GroupId;
        settings.AutoOffsetReset = AutoOffsetReset.Earliest;

        // Manual ack: auto-commit is turned off.
        settings.EnableAutoCommit = false;

        // Timeouts; the poll interval is 10 minutes to allow long processing.
        settings.SessionTimeoutMs = 30000;
        settings.MaxPollIntervalMs = 600000;

        // Fetch sizing; the maximum is 10MB.
        settings.FetchMinBytes = 1024;
        settings.FetchMaxBytes = 10485760;

        return Task.FromResult(settings);
    }
}

Configuration Profiles

High Throughput

Process maximum messages per second:

/// <summary>
/// High-throughput profile: auto-commit every second and larger fetch sizes.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    var settings = defaultConfiguration;
    settings.GroupId = GroupId;

    // Offsets are committed automatically, once per second.
    settings.EnableAutoCommit = true;
    settings.AutoCommitIntervalMs = 1000;

    // Fetch sizing, written as byte arithmetic (values unchanged).
    settings.FetchMinBytes = 10 * 1024;                    // wait for 10KB
    settings.FetchMaxBytes = 50 * 1024 * 1024;             // 50MB max
    settings.MaxPartitionFetchBytes = 10 * 1024 * 1024;    // 10MB per partition

    return Task.FromResult(settings);
}

Reliable Processing

Ensure no message loss:

/// <summary>
/// Reliable-processing profile: auto-commit and automatic offset storing are
/// disabled so that acknowledgment is manual.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    var settings = defaultConfiguration;
    settings.GroupId = GroupId;

    // Manual acknowledgment: nothing is committed or stored automatically.
    settings.EnableAutoCommit = false;
    settings.EnableAutoOffsetStore = false;

    // Start position and isolation level.
    settings.AutoOffsetReset = AutoOffsetReset.Earliest;
    settings.IsolationLevel = IsolationLevel.ReadCommitted;

    return Task.FromResult(settings);
}

Long Processing

For handlers that take time:

/// <summary>
/// Long-processing profile: extends the poll interval, session timeout, and
/// heartbeat interval for handlers that take time.
/// </summary>
/// <returns>A completed task holding the adjusted consumer configuration.</returns>
public override Task<ConsumerConfig> ConfigureAsync()
{
    var settings = defaultConfiguration;
    settings.GroupId = GroupId;

    // Durations in milliseconds, written as arithmetic (values unchanged).
    settings.MaxPollIntervalMs = 30 * 60 * 1000;   // 30 minutes
    settings.SessionTimeoutMs = 60 * 1000;         // 1 minute
    settings.HeartbeatIntervalMs = 20 * 1000;      // 20 seconds

    return Task.FromResult(settings);
}

Custom Deserializers

Specify custom deserializers:

/// <summary>
/// Consumer configuration for <c>OrderCreated</c> messages with <c>Guid</c> keys
/// that supplies its own key and value deserializers.
/// </summary>
public class OrderConsumerConfiguration(IServiceProvider sp)
    : ConsumerConfiguration<Guid, OrderCreated>(sp)
{
    /// <summary>The Kafka topic to consume from.</summary>
    public override string Topic
    {
        get { return "orders"; }
    }

    /// <summary>Deserializer for message keys; a new instance is created on each access.</summary>
    public override IDeserializer<Guid>? KeyDeserializer
    {
        get { return new GuidDeserializer(); }
    }

    /// <summary>Deserializer for message values; a new instance is created on each access.</summary>
    public override IDeserializer<OrderCreated>? ValueDeserializer
    {
        get { return new JsonDeserializer<OrderCreated>(); }
    }
}

Environment-Based Configuration

// appsettings.Development.json
{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "localhost:9092",
"AutoOffsetReset": "Earliest"
}
}
}

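// appsettings.Production.json
// The credentials below are placeholders: supply real values through environment variables or a secret store rather than committing them to source control.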
{
"KafkaBus": {
"DefaultConsumerSettings": {
"BootstrapServers": "kafka-1:9092,kafka-2:9092",
"AutoOffsetReset": "Latest",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "Plain",
"SaslUsername": "your-username",
"SaslPassword": "your-password"
}
}
}

Next Steps