Offset Commit Management
Overview
The **Offset Commit Management** module abstracts and implements various strategies for committing Kafka consumer offsets within the Apache Camel Kafka component. Its purpose is to manage how and when Kafka consumer offsets are acknowledged and persisted. This is critical for reliable message processing, fault tolerance, and at-least-once delivery semantics; exactly-once processing additionally requires mechanisms such as Kafka transactions.
This module addresses the challenges of offset management by providing pluggable commit managers that support different commit modes, including synchronous commits, asynchronous commits, no-operation commits (when Kafka's auto-commit is enabled), and commits backed by an external offset repository. It also integrates with manual commit APIs that allow fine-grained offset control within Camel routes.
Core Concepts and Purpose
Offset Commit: The process of recording the offset position of a Kafka partition that a consumer has processed, so that upon restart or failure, consumption can resume from the last committed offset.
Commit Strategies: Different approaches to commit offsets, including:
Synchronous Commit: Blocking commits that wait for Kafka to acknowledge the offset persistence.
Asynchronous Commit: Non-blocking commits with callbacks upon completion.
No-Op Commit: Placeholder commits used when Kafka's auto-commit handles offset persistence.
Manual Commit Integration: Support for explicit offset commits triggered by user logic within Camel routes, allowing custom acknowledgment workflows.
Offset Repository Integration: Optional persistence of committed offsets into an external state repository for enhanced fault tolerance beyond Kafka's internal offset storage.
The module exists to provide flexibility and reliability in offset management, catering to different use cases such as high-throughput batch processing, transactional consumption, manual control, and offset persistence in external storage.
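To make the resume-from-committed-offset behavior concrete, the following sketch simulates a single partition in memory. It is not Camel or Kafka code; `PartitionSimulation` and its methods are hypothetical. It commits the offset of the *next* record to read (last processed offset + 1), which is the Kafka convention the commit managers below follow. It also shows that records processed after the last commit are delivered again after a restart, which is what at-least-once means in practice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one Kafka partition and its committed offset.
// Not Camel/Kafka API: it only illustrates "commit = last processed offset + 1".
class PartitionSimulation {
    private final List<String> log;   // records stored at offsets 0..n-1
    private long committedOffset = 0; // next offset to read after a restart

    PartitionSimulation(List<String> log) {
        this.log = log;
    }

    // Processes records starting at the committed offset, commits after every
    // `commitEvery` records, and "crashes" after `crashAfter` records in total.
    List<String> consume(int commitEvery, int crashAfter) {
        List<String> processed = new ArrayList<>();
        long position = committedOffset;
        while (position < log.size() && processed.size() < crashAfter) {
            processed.add(log.get((int) position));
            long lastProcessed = position;
            position++;
            if (processed.size() % commitEvery == 0) {
                committedOffset = lastProcessed + 1; // commit the next offset to read
            }
        }
        return processed;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

With records `a` to `e`, a first run that commits every two records and crashes after three processes `a`, `b`, `c` but only commits offset 2. The second run therefore starts at `c`, so `c` is processed twice.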
How the Module Works
Commit Manager Factory
At the core is the factory method `CommitManagers.createCommitManager()` which selects and creates an appropriate `CommitManager` implementation based on the consumer configuration and manual commit settings.
```java
public static CommitManager createCommitManager(
        Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
    KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();

    if (configuration.isAllowManualCommit()) {
        // Manual commit: the configured factory type decides the commit mode
        KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
        if (manualCommitFactory instanceof DefaultKafkaManualAsyncCommitFactory) {
            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        } else if (manualCommitFactory instanceof DefaultKafkaManualCommitFactory) {
            return new SyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        } else {
            // Any other factory type falls back to the no-op manager
            return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
    } else {
        // Automatic commit: an external offset repository takes precedence
        if (configuration.getOffsetRepository() != null) {
            return new CommitToOffsetManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
        // Batch consumption commits asynchronously
        if (configuration.isBatching()) {
            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
    }
    // Default: rely on the Kafka client's own auto-commit
    return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
}
```
This method evaluates:
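If manual commit is allowed, it selects the manager that matches the manual commit factory's type: `AsyncCommitManager` for `DefaultKafkaManualAsyncCommitFactory`, `SyncCommitManager` for `DefaultKafkaManualCommitFactory`, and `NoopCommitManager` for any other factory.
Otherwise, if an offset repository is configured, it uses `CommitToOffsetManager`, which persists offsets externally.
Otherwise, if batching is enabled, it uses `AsyncCommitManager`.
In all remaining cases, it falls back to `NoopCommitManager` and relies on Kafka's auto-commit.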
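The selection rules above can be restated as a pure function, which makes their precedence easy to test. This is a simplified sketch, not the Camel source. `ConsumerSettings`, `ManualFactoryKind`, and `ManagerKind` are hypothetical stand-ins for `KafkaConfiguration` and for the factory and manager classes. The branches mirror the `createCommitManager()` code shown above.

```java
// Hypothetical stand-ins for the manual commit factory types checked via instanceof.
enum ManualFactoryKind { DEFAULT_ASYNC, DEFAULT_SYNC, CUSTOM }

// Hypothetical stand-ins for the CommitManager implementations.
enum ManagerKind { ASYNC, SYNC, NOOP, COMMIT_TO_OFFSET }

// Hypothetical subset of the endpoint configuration consulted by the factory.
record ConsumerSettings(boolean allowManualCommit,
                        ManualFactoryKind manualFactory,
                        boolean hasOffsetRepository,
                        boolean batching) {}

final class CommitManagerSelector {
    private CommitManagerSelector() {}

    static ManagerKind select(ConsumerSettings s) {
        if (s.allowManualCommit()) {
            // Manual commit wins: the factory type decides sync vs async.
            return switch (s.manualFactory()) {
                case DEFAULT_ASYNC -> ManagerKind.ASYNC;
                case DEFAULT_SYNC -> ManagerKind.SYNC;
                case CUSTOM -> ManagerKind.NOOP;
            };
        }
        // An external offset repository takes precedence over batching.
        if (s.hasOffsetRepository()) {
            return ManagerKind.COMMIT_TO_OFFSET;
        }
        if (s.batching()) {
            return ManagerKind.ASYNC;
        }
        // Fall back to the no-op manager and Kafka's own auto-commit.
        return ManagerKind.NOOP;
    }
}
```

Keeping selection separate from construction is what lets the factory swap strategies without the consumer loop needing to know which one it received.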
Abstract Commit Manager Base
All commit managers extend `AbstractCommitManager`, which provides common utilities and state, including:
Access to the Kafka consumer, Camel Kafka consumer endpoint, configuration, and logging.
A utility method to create manual commit objects, bridging Camel exchanges with Kafka offsets.
Methods to force synchronous commits and to persist offsets to an external state repository if configured.
For example, the method to force a synchronous commit of a specific partition offset:
```java
@Override
public void forceCommit(TopicPartition partition, long partitionLastOffset) {
    // Kafka expects the offset of the next record to read, hence the + 1
    consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
            Duration.ofMillis(configuration.getCommitTimeoutMs()));
}
```
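The external-repository path can be sketched with a map-backed store. The offset repository is a string-keyed state store. Here a `HashMap` stands in for it, and both the `OffsetStore` class and its `topic/partition` key format are assumptions made for illustration. As in `forceCommit()`, the value written is the next offset to read, so a restarting consumer can seek directly to it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an external offset repository (string keys and values).
class OffsetStore {
    private final Map<String, String> state = new HashMap<>();

    // Assumed key format "topic/partition"; a real repository may use another format.
    static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Persist the offset of the NEXT record to read (last processed + 1),
    // matching what forceCommit() sends to Kafka.
    void saveLastProcessed(String topic, int partition, long lastProcessedOffset) {
        state.put(key(topic, partition), String.valueOf(lastProcessedOffset + 1));
    }

    // Where to resume on restart, if anything was stored for this partition.
    Optional<Long> resumeOffset(String topic, int partition) {
        String value = state.get(key(topic, partition));
        return value == null ? Optional.empty() : Optional.of(Long.parseLong(value));
    }
}
```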
Synchronous Commit Manager
`SyncCommitManager` performs blocking commits via Kafka's `commitSync()` API. It maintains an internal cache of offsets (`OffsetCache`) that have been processed but not yet committed.
Key behaviors include:
On commit, it sends the latest processed offset + 1 (the next offset to read) to Kafka and blocks until Kafka confirms the commit or the commit timeout expires.
If an offset repository is configured, it also saves the committed offset externally.
It records offsets as records are processed, so they can be committed later.
This strategy guarantees that each commit has been acknowledged by Kafka before processing continues, but it can reduce throughput because the consumer thread waits for every acknowledgment.
```java
@Override
public void commit(TopicPartition partition) {
    Long offset = offsetCache.getOffset(partition);
    if (offset == null) {
        return;
    }
    consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)),
            Duration.ofMillis(configuration.getCommitTimeoutMs()));
    if (offsetRepository != null) {
        saveStateToOffsetRepository(partition, offset + 1, offsetRepository);
    }
    offsetCache.removeCommittedEntries(...);
}
```
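The record-then-commit cycle of the synchronous manager can be modeled without a broker. In this sketch `SimpleOffsetCache` and `SyncCommitSketch` are hypothetical. The blocking `commitSync()` call is replaced by a caller-supplied `BiConsumer`, and partitions are plain strings. A real implementation would also pass a timeout and handle commit failures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical cache of the highest processed-but-uncommitted offset per partition.
class SimpleOffsetCache {
    private final Map<String, Long> lastProcessed = new HashMap<>();

    void recordOffset(String partition, long offset) {
        lastProcessed.merge(partition, offset, Long::max); // keep the highest offset
    }

    Long getOffset(String partition) {
        return lastProcessed.get(partition);
    }

    // Drop the entry only if no newer offset was recorded in the meantime.
    void removeCommitted(String partition, long committedLastOffset) {
        lastProcessed.remove(partition, committedLastOffset);
    }
}

// Hypothetical synchronous manager: the committer stands in for consumer.commitSync().
class SyncCommitSketch {
    private final SimpleOffsetCache cache = new SimpleOffsetCache();
    private final BiConsumer<String, Long> blockingCommitter;
    private final Map<String, Long> offsetRepository; // may be null

    SyncCommitSketch(BiConsumer<String, Long> blockingCommitter, Map<String, Long> offsetRepository) {
        this.blockingCommitter = blockingCommitter;
        this.offsetRepository = offsetRepository;
    }

    void recordOffset(String partition, long offset) {
        cache.recordOffset(partition, offset);
    }

    void commit(String partition) {
        Long offset = cache.getOffset(partition);
        if (offset == null) {
            return; // nothing processed since the last commit
        }
        // Returns only after the commit succeeds; an exception leaves the cache untouched.
        blockingCommitter.accept(partition, offset + 1);
        if (offsetRepository != null) {
            offsetRepository.put(partition, offset + 1);
        }
        cache.removeCommitted(partition, offset);
    }

    Long pendingOffset(String partition) {
        return cache.getOffset(partition);
    }
}
```

Because the cache keeps only the highest offset per partition, one commit covers every record processed since the previous commit.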
Asynchronous Commit Manager
`AsyncCommitManager` uses Kafka's `commitAsync()` API to commit offsets without blocking the consumer thread.
Characteristics:
Maintains an `OffsetCache` similar to the sync manager.
On commit, it sends offsets asynchronously with a callback that handles errors and updates the offset repository if configured.
Allows higher throughput because the consumer thread does not wait for acknowledgments, but commit failures are reported only to the callback and can go unnoticed unless handled there.
When the commit completes, the callback persists the offsets to the external repository (only if the commit succeeded and a repository is configured) and then cleans up the offset cache:
```java
private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
    if (exception == null && offsetRepository != null) {
        for (var entry : committed.entrySet()) {
            saveStateToOffsetRepository(entry.getKey(), entry.getValue().offset(), offsetRepository);
        }
    }
    offsetCache.removeCommittedEntries(committed, exception);
}
```
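The callback flow can be illustrated with the JDK's `CompletableFuture` standing in for `commitAsync()`. This is a hypothetical sketch: `AsyncCommitSketch` and its method names are not Camel API. `commit()` returns immediately, and the repository update happens in the completion callback only when the commit succeeded, mirroring `postCommitCallback()` above. Keeping the cached offset after a failed commit, so that a later commit retries it, is a choice made for this sketch.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical async commit manager; asyncCommitter stands in for consumer.commitAsync().
class AsyncCommitSketch {
    // Concurrent maps because the completion callback may run on another thread.
    private final Map<String, Long> pending = new ConcurrentHashMap<>();
    private final Map<String, Long> offsetRepository = new ConcurrentHashMap<>();
    private final BiFunction<String, Long, CompletableFuture<Void>> asyncCommitter;

    AsyncCommitSketch(BiFunction<String, Long, CompletableFuture<Void>> asyncCommitter) {
        this.asyncCommitter = asyncCommitter;
    }

    void recordOffset(String partition, long offset) {
        pending.merge(partition, offset, Long::max);
    }

    // Returns immediately; the future completes with true if the commit succeeded.
    CompletableFuture<Boolean> commit(String partition) {
        Long offset = pending.get(partition);
        if (offset == null) {
            return CompletableFuture.completedFuture(true); // nothing to commit
        }
        long next = offset + 1; // next offset to read
        return asyncCommitter.apply(partition, next).handle((ignored, error) -> {
            postCommit(partition, offset, next, error);
            return error == null;
        });
    }

    // Mirrors postCommitCallback(): persist externally only on success.
    private void postCommit(String partition, long committedLast, long next, Throwable error) {
        if (error != null) {
            return; // keep the cached offset so a later commit() retries it
        }
        offsetRepository.put(partition, next);
        pending.remove(partition, committedLast);
    }

    Long pendingOffset(String partition) {
        return pending.get(partition);
    }

    Long committedInRepository(String partition) {
        return offsetRepository.get(partition);
    }
}
```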
No-Op Commit Manager
`NoopCommitManager` is a placeholder commit manager used when offset commits are handled automatically by Kafka's internal auto-commit mechanism.
Its commit methods do nothing beyond logging.
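It is selected when manual commit is disabled, no external offset repository is configured, and batching is off. The factory also falls back to it when manual commit is enabled with a factory other than the two default ones.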
This manager relies entirely on Kafka's consumer configuration for offset commits.
```java
@Override
public void commit() {
    LOG.info("Auto commit on {} from {} is enabled via Kafka consumer (NO-OP)", threadId, printableTopic);
}

@Override
public void commit(TopicPartition partition) {
    LOG.debug("Auto commit to offset {} from topic {} is disabled (NO-OP)", threadId, partition.topic());
}
```
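Because `NoopCommitManager` leaves persistence to the Kafka client, the relevant settings live in the consumer properties. `enable.auto.commit` and `auto.commit.interval.ms` are standard Kafka consumer settings. The `AutoCommitConfig` helper and the interval value are illustrative assumptions. Camel normally derives these properties from its own endpoint options, so this only shows the shape of the underlying client configuration.

```java
import java.util.Properties;

// Hypothetical helper: client settings under which a no-op commit manager is appropriate.
final class AutoCommitConfig {
    private AutoCommitConfig() {}

    static Properties autoCommitProperties(long intervalMs) {
        Properties props = new Properties();
        // The Kafka client commits consumed positions itself, at most once per interval, during poll().
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }
}
```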
Manual Commit Integration
All commit managers support generating `KafkaManualCommit` instances that allow explicit offset commits from within Camel routes.
They leverage the `KafkaManualCommitFactory` from the endpoint configuration to instantiate manual commit objects that encapsulate the necessary context (exchange, consumer, partition, offset, timeout).
This integration enables users to control offset commits explicitly, overriding automatic commit behavior.
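The manual-commit contract can be sketched as an object that captures one record's partition and offset and defers the actual commit until route logic calls it. In Camel, such an object is obtained from the exchange through the `KafkaConstants.MANUAL_COMMIT` header and exposes a `commit()` method. The `ManualCommitHandle` class below is hypothetical, as is its committer callback. It uses only the JDK and omits the exchange and the timeout.

```java
import java.util.function.BiConsumer;

// Hypothetical manual-commit handle: captures the context of one consumed record.
class ManualCommitHandle {
    private final String partition;
    private final long recordOffset;
    private final BiConsumer<String, Long> committer; // e.g. delegates to a commit manager
    private boolean committed;

    ManualCommitHandle(String partition, long recordOffset, BiConsumer<String, Long> committer) {
        this.partition = partition;
        this.recordOffset = recordOffset;
        this.committer = committer;
    }

    // Called by route logic once the record has been processed successfully.
    void commit() {
        if (committed) {
            return; // a second commit for the same record is ignored in this sketch
        }
        committer.accept(partition, recordOffset + 1); // next offset to read
        committed = true;
    }

    boolean isCommitted() {
        return committed;
    }
}
```

Because Kafka offsets are positional, committing a later record's handle also covers every earlier offset on the same partition.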
Interaction with Other Modules
KafkaConsumer: The commit managers are instantiated and used by the `KafkaConsumer` class to handle offset commits during message consumption.
KafkaManualCommit: Factories and objects created by commit managers enable manual commit support in Camel routes.
Offset Repository: Commit managers optionally persist committed offsets to an external `StateRepository` for enhanced reliability and recovery.
Kafka Client Library: Commit managers interact with the Kafka consumer's commit APIs (`commitSync`, `commitAsync`) to persist offsets.
Logging and Monitoring: Commit managers use SLF4J logging to provide insight into commit operations and failures.
Design Patterns and Approaches
Factory Method Pattern: The `CommitManagers.createCommitManager()` method encapsulates commit manager selection logic, returning appropriate implementations based on configuration.
Template Method Pattern: `AbstractCommitManager` provides common commit utilities and a foundation for concrete commit managers.
Strategy Pattern: Different commit managers (`SyncCommitManager`, `AsyncCommitManager`, `NoopCommitManager`) encapsulate distinct commit strategies that can be swapped transparently.
Cache Usage: `OffsetCache` is used to track offsets that need to be committed, improving commit efficiency and batch processing.
Callback Handling: `AsyncCommitManager` uses callbacks to asynchronously handle commit results and update state.
Summary of Commit Manager Classes
| Commit Manager | Commit Mode | Blocking | Uses Offset Repository | Manual Commit Support | Typical Use Case |
|---|---|---|---|---|---|
| `SyncCommitManager` | Synchronous | Yes | Optional | Yes | Reliable commit with lower throughput |
| `AsyncCommitManager` | Asynchronous | No | Optional | Yes | High throughput with eventual commit confirmation |
| `NoopCommitManager` | No operation | N/A | No | Limited | Kafka auto-commit enabled, no manual control |
| `CommitToOffsetManager`* | External storage | Varies | Yes | Yes | Offset commits persisted in external repository |
*Note: `CommitToOffsetManager` is referenced by the factory, but its implementation is not covered in this section.
Mermaid Sequence Diagram: Offset Commit Workflow
```mermaid
sequenceDiagram
    participant Consumer as Kafka Consumer
    participant CommitMgr as CommitManager
    participant Kafka as Kafka Broker
    participant OffsetRepo as Offset Repository
    Note over Consumer: Message processing loop
    Consumer->>CommitMgr: recordOffset(partition, offset)
    CommitMgr->>CommitMgr: cache offset internally
    alt Manual commit disabled
        CommitMgr->>Kafka: commitSync() or commitAsync() per strategy (no-op when Kafka auto-commit is used)
    else Manual commit enabled
        CommitMgr->>Consumer: expose KafkaManualCommit for manual offset control
        Note right of CommitMgr: Manual commit triggered by route
        CommitMgr->>Kafka: commitSync() or commitAsync() for manual commit
    end
    Kafka-->>CommitMgr: commit acknowledgment or callback
    alt Offset Repository configured
        CommitMgr->>OffsetRepo: save offset state externally
    end
    CommitMgr->>CommitMgr: update cache to remove committed offsets
```
This diagram illustrates how the commit manager records offsets during consumption, commits them either automatically or manually, interacts with the Kafka broker to persist commits, and optionally saves offset state to an external repository. The offset cache ensures that only uncommitted offsets are tracked.
This detailed overview explains the purpose, operation, and integration of the Offset Commit Management module within the Apache Camel Kafka component, highlighting its flexible commit strategies and how it supports robust message consumption semantics.