AsyncCommitManager.java


Overview

The `AsyncCommitManager` class is part of the Apache Camel Kafka component and provides an **asynchronous offset commit strategy** for Kafka consumers. Its primary responsibility is to manage Kafka consumer offset commits in a **non-blocking manner**, leveraging Kafka's `commitAsync()` API to improve consumer throughput and responsiveness.

This commit manager maintains an internal cache of offsets that have been processed but not yet committed, triggers asynchronous offset commits either automatically or on-demand, and handles post-commit callbacks to update state repositories and clean up its cache. It also supports manual commit integration, allowing explicit asynchronous commits triggered by Camel routes.
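
The cache-and-cleanup behaviour described above can be sketched in plain Java. This is a minimal illustration, not the component's actual `OffsetCache`: `PartitionKey` and `PendingOffsets` are hypothetical stand-ins so the example compiles without the Kafka client library, and it assumes the committed value follows Kafka's convention of naming the next offset to read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Kafka's TopicPartition (records need Java 16+).
record PartitionKey(String topic, int partition) {}

// Hypothetical cache of processed-but-uncommitted offsets; not the real OffsetCache.
final class PendingOffsets {
    private final Map<PartitionKey, Long> latest = new ConcurrentHashMap<>();

    // Remember the most recently processed offset for a partition.
    void recordOffset(PartitionKey key, long lastProcessedOffset) {
        latest.put(key, lastProcessedOffset);
    }

    // Last recorded offset, or null when nothing is pending for the partition.
    Long pending(PartitionKey key) {
        return latest.get(key);
    }

    // Drop entries covered by an acknowledged commit.
    // Assumption: each committed value is the NEXT offset to read (last processed + 1).
    void removeCommitted(Map<PartitionKey, Long> committedNextOffsets) {
        committedNextOffsets.forEach((key, next) ->
                latest.computeIfPresent(key, (k, cached) -> cached < next ? null : cached));
    }
}
```

Keeping only the latest offset per partition is enough here because a commit for offset N implicitly covers every earlier record in that partition.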


Detailed Description

Package and Imports


Class: AsyncCommitManager

public class AsyncCommitManager extends AbstractCommitManager

Purpose

`AsyncCommitManager` implements an asynchronous offset commit strategy for Kafka consumers. It extends `AbstractCommitManager`, inheriting common commit utilities and state management.

Fields

| Field | Type | Description |
|---|---|---|
| `private static final Logger LOG` | `Logger` | Logger instance for logging commit events. |
| `private final Consumer consumer` | Kafka consumer instance | The underlying Kafka consumer used to commit offsets. |
| `private final OffsetCache offsetCache` | `OffsetCache` | Cache for tracking latest offsets per partition. |
| `private final StateRepository offsetRepository` | External offset repository | Optional external storage for committed offsets. |

Constructor

public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic)

Methods

1. public void commit()

// Trigger an async commit if auto-commit is enabled
asyncCommitManager.commit();

2. public void commit(TopicPartition partition)

TopicPartition tp = new TopicPartition("my-topic", 0);
asyncCommitManager.recordOffset(tp, lastProcessedOffset);
asyncCommitManager.commit(tp);

3. private void commitAsync(Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset)
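
The signature suggests that this method turns one partition's last processed offset into an asynchronous commit. The following is a rough sketch of that step under stated assumptions, not the class's actual code: `PartitionRef` and `AsyncCommitSketch` are hypothetical stand-ins that avoid a dependency on the Kafka client, and the `+ 1` reflects Kafka's convention that a committed offset names the next record to consume.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition (records need Java 16+).
record PartitionRef(String topic, int partition) {}

final class AsyncCommitSketch {
    private final Map<PartitionRef, Long> recorded = new HashMap<>();

    // Counterpart of recordOffset(partition, partitionLastOffset).
    void recordOffset(PartitionRef partition, long partitionLastOffset) {
        recorded.put(partition, partitionLastOffset);
    }

    // Returns the offsets map that would be handed to an async commit, or an
    // empty map when no offset was recorded for the partition (nothing to commit).
    Map<PartitionRef, Long> offsetsToCommit(PartitionRef partition) {
        Long last = recorded.get(partition);
        if (last == null) {
            return Collections.emptyMap();
        }
        // Kafka expects the offset of the NEXT record to consume, hence + 1.
        return Collections.singletonMap(partition, last + 1);
    }
}
```

With the real client, the map values would be `OffsetAndMetadata` instances and the map would be passed to `Consumer.commitAsync(offsets, callback)`.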


4. public KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> consumerRecord)


5. public void recordOffset(TopicPartition partition, long partitionLastOffset)

asyncCommitManager.recordOffset(new TopicPartition("topic", 1), 123L);

6. private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)
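
According to the overview, the post-commit callback updates the state repository and cleans up the cache. A minimal sketch of that logic follows. It is illustrative only: `PostCommitSketch` is hypothetical, a plain `Map<String, String>` stands in for a `StateRepository<String, String>`, the `"topic/partition"` key format is an assumption, and keeping cached offsets after a failed commit is likewise an assumption rather than confirmed behaviour.

```java
import java.util.HashMap;
import java.util.Map;

final class PostCommitSketch {
    // Pending offsets keyed by "topic/partition" (hypothetical key format).
    final Map<String, Long> pendingOffsets = new HashMap<>();
    // Stand-in for a StateRepository<String, String>; null means "not configured".
    private final Map<String, String> offsetRepository;

    PostCommitSketch(Map<String, String> offsetRepository) {
        this.offsetRepository = offsetRepository;
    }

    // Same shape as Kafka's commit callback: exception is null when the commit succeeded.
    void onCommitComplete(Map<String, Long> committed, Exception exception) {
        if (exception != null) {
            // Assumption: on failure the cached offsets are kept so a later commit can cover them.
            return;
        }
        committed.forEach((key, offset) -> {
            if (offsetRepository != null) {
                offsetRepository.put(key, Long.toString(offset));
            }
            pendingOffsets.remove(key);
        });
    }
}
```

Because the callback runs only after the broker responds, the repository write happens off the critical path of message processing.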


Important Implementation Details


Interaction with Other Components


Usage Example

// Assume consumer is a properly initialized Kafka Consumer instance and
// kafkaConsumer is the Camel-side KafkaConsumer that owns it
AsyncCommitManager commitManager = new AsyncCommitManager(consumer, kafkaConsumer, "thread-1", "my-topic");

// Record processed offsets during message processing
commitManager.recordOffset(new TopicPartition("my-topic", 0), 123);

// Commit asynchronously for a specific partition
commitManager.commit(new TopicPartition("my-topic", 0));

// Or commit all offsets asynchronously if auto-commit is enabled
commitManager.commit();

Mermaid Class Diagram

classDiagram
    class AsyncCommitManager {
        -Consumer<?, ?> consumer
        -OffsetCache offsetCache
        -StateRepository<String, String> offsetRepository
        +AsyncCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
        +void commit()
        +void commit(TopicPartition)
        +KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord)
        +void recordOffset(TopicPartition, long)
        -void commitAsync(Consumer<?, ?>, TopicPartition, long)
        -void postCommitCallback(Map<TopicPartition, OffsetAndMetadata>, Exception)
    }
    AsyncCommitManager --|> AbstractCommitManager

Summary

`AsyncCommitManager` is a concrete commit manager that commits Kafka consumer offsets through Kafka's `commitAsync()` API, so the consumer thread does not block while waiting for the broker to acknowledge a commit. This suits high-throughput Kafka consumers where blocking on offset commits is undesirable. The class integrates with Camel's Kafka consumer infrastructure, supports manual commit APIs, and optionally persists offsets in an external repository. The trade-off is that commit failures are reported to the post-commit callback instead of interrupting message processing at the call site.


Appendix: Related Files and Concepts

See the **Offset Commit Management** module documentation for details about other commit managers (`SyncCommitManager`, `NoopCommitManager`) and the overall commit manager factory logic.


Diagram: Asynchronous Offset Commit Workflow (Simplified)

flowchart TD
    PollLoop[Start Kafka Poll Loop] --> ProcessMsgs[Process Messages]
    ProcessMsgs --> RecordOffsets[Record Latest Offsets in OffsetCache]
    RecordOffsets --> CommitCheck{Is Commit Needed?}
    CommitCheck -- Yes --> CommitAsync["Invoke consumer.commitAsync()"]
    CommitCheck -- No --> Continue[Continue Processing]
    CommitAsync --> Callback[Post-Commit Callback]
    Callback --> RepositoryCheck{Is Offset Repository Configured?}
    RepositoryCheck -- Yes --> SaveOffsets[Save Offsets to Repository]
    RepositoryCheck -- No --> Done[Done]
    SaveOffsets --> Done
    Continue --> PollLoop

This flowchart illustrates the core loop of message processing, offset tracking, asynchronous commit triggering, and post-commit handling including optional external repository updates.

