AsyncCommitManager.java
Overview
The `AsyncCommitManager` class is part of the Apache Camel Kafka component and provides an **asynchronous offset commit strategy** for Kafka consumers. Its primary responsibility is to manage Kafka consumer offset commits in a **non-blocking manner**, leveraging Kafka's `commitAsync()` API to improve consumer throughput and responsiveness.
This commit manager maintains an internal cache of offsets that have been processed but not yet committed, triggers asynchronous offset commits either automatically or on-demand, and handles post-commit callbacks to update state repositories and clean up its cache. It also supports manual commit integration, allowing explicit asynchronous commits triggered by Camel routes.
Detailed Description
Package and Imports
Package:
org.apache.camel.component.kafka.consumerKey imports:
Kafka client classes (
Consumer,ConsumerRecord,TopicPartition,OffsetAndMetadata)Apache Camel classes (
Exchange,KafkaConsumer,StateRepository)Logging (
org.slf4j.Logger)
Class: AsyncCommitManager
public class AsyncCommitManager extends AbstractCommitManager
Purpose
`AsyncCommitManager` implements an asynchronous offset commit strategy for Kafka consumers. It extends `AbstractCommitManager`, inheriting common commit utilities and state management.
Fields
Field | Type | Description |
|---|---|---|
`private static final Logger LOG` | `Logger` | Logger instance for logging commit events. |
`private final Consumer consumer` | Kafka consumer instance | The underlying Kafka consumer used to commit offsets. |
`private final OffsetCache offsetCache` | `OffsetCache` | Cache for tracking latest offsets per partition. |
`private final StateRepository offsetRepository` | External offset repository | Optional external storage for committed offsets. |
Constructor
public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic)
Parameters:
consumer: Kafka consumer instance to perform commits.kafkaConsumer: Camel Kafka consumer wrapper.threadId: Identifier for the consumer thread.printableTopic: Human-readable topic name for logging.
Behavior:
Calls the parent constructor.
Initializes the Kafka consumer reference.
Retrieves the optional external offset repository from the configuration.
Methods
1. public void commit()
Purpose:
Performs an asynchronous commit of all offsets currently tracked by the Kafka consumer if auto-commit is enabled.Behavior:
Checks if the Kafka endpoint configuration has auto-commit enabled.
If enabled, logs the commit action.
Calls Kafka's
commitAsync()method without specific offsets, committing all tracked offsets asynchronously.
Usage Example:
// Trigger an async commit if auto-commit is enabled
asyncCommitManager.commit();
2. public void commit(TopicPartition partition)
Purpose:
Commits asynchronously the offset for a specific Kafka partition if it is present in the offset cache.Parameters:
partition: The KafkaTopicPartitionto commit.
Behavior:
Retrieves the last offset for the partition from
offsetCache.If an offset is present, calls
commitAsync()for that partition with offset + 1.
Usage Example:
TopicPartition tp = new TopicPartition("my-topic", 0);
asyncCommitManager.recordOffset(tp, lastProcessedOffset);
asyncCommitManager.commit(tp);
3. private void commitAsync(Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset)
Purpose:
Internal helper method to perform an asynchronous commit of a single partition offset.Parameters:
consumer: Kafka consumer instance.partition: Kafka TopicPartition to commit.partitionLastOffset: Last processed offset for the partition.
Behavior:
Logs debug info about commit.
Constructs a singleton map with the partition and offset+1 wrapped in
OffsetAndMetadata.Calls
consumer.commitAsync()with the map and a callbackpostCommitCallbackto handle commit completion.
4. public KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> consumerRecord)
Purpose:
Obtains a manual commit object that allows explicit asynchronous offset commit within a Camel route.Parameters:
exchange: The CamelExchangeinstance.partition: KafkaTopicPartitionassociated with the record.consumerRecord: KafkaConsumerRecordbeing processed.
Returns:
An instance of
KafkaManualCommitconfigured for asynchronous commit.
Behavior:
Attempts to retrieve a customized
KafkaManualCommitFactoryfrom the endpoint.If none is set, defaults to
DefaultKafkaManualAsyncCommitFactory.Delegates to the inherited
getManualCommitmethod with the factory.
5. public void recordOffset(TopicPartition partition, long partitionLastOffset)
Purpose:
Records the latest processed offset for a given partition into the internal offset cache.Parameters:
partition: KafkaTopicPartition.partitionLastOffset: The offset to record.
Behavior:
Delegates to the
offsetCacheto store the offset.
Usage Example:
asyncCommitManager.recordOffset(new TopicPartition("topic", 1), 123L);
6. private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)
Purpose:
Callback handler invoked by Kafka after an asynchronous commit attempt.Parameters:
committed: Map of partitions and their committed offsets.exception: Exception thrown during commit, ornullif success.
Behavior:
On success and if an external offset repository is configured:
Iterates committed entries and saves committed offsets in the repository.
Calls
offsetCache.removeCommittedEntries()to remove successfully committed offsets or handle failures.
Important Implementation Details
Offset Cache Handling:
The class maintains anOffsetCacheinstance that tracks offsets per partition. This cache ensures that only the latest processed offsets are committed, reducing redundant commits.Asynchronous Commit with Callback:
Commits are performed via Kafka'scommitAsync()API, which returns immediately and executes the provided callback upon completion. This enables non-blocking offset commits, improving consumer throughput.Offset Repository Integration:
If configured, committed offsets are persisted in an externalStateRepositoryfor enhanced reliability and recovery. This external repository can be used for fault-tolerant offset management outside Kafka.Manual Commit Support:
The class supports manual commit operations viaKafkaManualCommitinstances, allowing explicit control over offset commits within Camel routes.Logging:
Uses SLF4J for info and debug level logging to provide visibility into commit operations.
Interaction with Other Components
KafkaConsumer:
TheAsyncCommitManageris instantiated and used by theKafkaConsumercomponent to handle offset commits during message consumption.Kafka Client Library:
Uses Kafka's Java consumer APIs (commitAsync) to commit offsets to Kafka brokers asynchronously.OffsetCache:
Manages in-memory tracking of offsets to be committed.StateRepository:
Optionally persists committed offsets externally, allowing recovery beyond Kafka's internal offsets.KafkaManualCommit:
Works with manual commit factories to provide manual asynchronous commit capabilities within Camel routes.
Usage Example
// Assume consumer is a properly initialized Kafka Consumer instance
AsyncCommitManager commitManager = new AsyncCommitManager(consumer, kafkaConsumer, "thread-1", "my-topic");
// Record processed offsets during message processing
commitManager.recordOffset(new TopicPartition("my-topic", 0), 123);
// Commit asynchronously for a specific partition
commitManager.commit(new TopicPartition("my-topic", 0));
// Or commit all offsets asynchronously if auto-commit is enabled
commitManager.commit();
Mermaid Class Diagram
classDiagram
class AsyncCommitManager {
-Consumer<?, ?> consumer
-OffsetCache offsetCache
-StateRepository<String, String> offsetRepository
+AsyncCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
+void commit()
+void commit(TopicPartition)
+KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord)
+void recordOffset(TopicPartition, long)
-void commitAsync(Consumer<?, ?>, TopicPartition, long)
-void postCommitCallback(Map<TopicPartition, OffsetAndMetadata>, Exception)
}
AsyncCommitManager --|> AbstractCommitManager
Summary
`AsyncCommitManager` is a concrete implementation of a commit manager that asynchronously commits Kafka consumer offsets using Kafka's `commitAsync()` API. It is optimized for high-throughput Kafka consumers where blocking on offset commits is undesirable. The class integrates with Camel's Kafka consumer infrastructure, supports manual commit APIs, and optionally persists offsets in an external repository, balancing performance and reliability.
Appendix: Related Files and Concepts
See the **Offset Commit Management** module documentation for details about other commit managers (`SyncCommitManager`, `NoopCommitManager`) and the overall commit manager factory logic.
Diagram: Asynchronous Offset Commit Workflow (Simplified)
flowchart TD
PollLoop[Start Kafka Poll Loop] --> ProcessMsgs[Process Messages]
ProcessMsgs --> RecordOffsets[Record Latest Offsets in OffsetCache]
RecordOffsets --> CommitCheck{Is Commit Needed?}
CommitCheck -- Yes --> CommitAsync[Invoke consumer.commitAsync()]
CommitCheck -- No --> Continue[Continue Processing]
CommitAsync --> Callback[Post-Commit Callback]
Callback --> RepositoryCheck{Is Offset Repository Configured?}
RepositoryCheck -- Yes --> SaveOffsets[Save Offsets to Repository]
RepositoryCheck -- No --> Done[Done]
SaveOffsets --> Done
Continue --> PollLoop
This flowchart illustrates the core loop of message processing, offset tracking, asynchronous commit triggering, and post-commit handling including optional external repository updates.