Asynchronous Manual Commit

Purpose

Asynchronous Manual Commit provides explicit, non-blocking offset commits for Kafka consumers managed within Apache Camel routes. It lets a route control when offsets are committed to Kafka without stalling the processing thread, which improves throughput and responsiveness in high-volume or latency-sensitive scenarios. A synchronous manual commit commits the offset immediately and blocks until the commit completes. An asynchronous commit only records the offset for later submission, so the consumer keeps processing messages while the commit happens in the background.

Functionality

Each consumed Kafka message carries a manual commit object in its Camel exchange. This object exposes a `commit()` method that does not commit the offset immediately. Instead, it records the offset with the commit manager, which commits it later.

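The key workflow is: the route obtains the manual commit object for the current message and calls `commit()`; the object passes the record's partition and offset to the commit manager; the commit manager commits the recorded offsets to Kafka in the background while the route moves on to the next message.

This approach decouples message processing from Kafka offset commit latency, so slow commits do not hold up the consumer.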
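The flow described in this section can be modeled in plain Java. This is a minimal sketch under stated assumptions, not Camel's implementation: `ManualCommit`, `RecordingCommitManager`, `AsyncManualCommit`, the `manualCommit` header key, and the explicit `flush()` call are hypothetical stand-ins. A plain `Map` plays the role of the exchange headers, and `flush()` stands in for the background commit to the broker.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT Camel's classes.
interface ManualCommit {
    void commit();
}

// Hypothetical commit manager: remembers the highest offset seen per partition
// and only hands the offsets over when flush() is called.
class RecordingCommitManager {
    private final Map<Integer, Long> pending = new LinkedHashMap<>();
    private final Map<Integer, Long> committed = new LinkedHashMap<>();

    // Cheap in-memory bookkeeping: nothing is sent anywhere here.
    synchronized void recordOffset(int partition, long offset) {
        pending.merge(partition, offset, Long::max);
    }

    // Stands in for the later, background commit to the broker.
    synchronized void flush() {
        committed.putAll(pending);
        pending.clear();
    }

    // Returns a copy so callers cannot modify internal state.
    synchronized Map<Integer, Long> committed() {
        return new LinkedHashMap<>(committed);
    }
}

// Hypothetical async manual commit: commit() only records the offset.
class AsyncManualCommit implements ManualCommit {
    private final int partition;
    private final long offset;
    private final RecordingCommitManager manager;

    AsyncManualCommit(int partition, long offset, RecordingCommitManager manager) {
        this.partition = partition;
        this.offset = offset;
        this.manager = manager;
    }

    @Override
    public void commit() {
        manager.recordOffset(partition, offset);
    }
}

class AsyncCommitFlowSketch {
    // Assumed header key for this sketch only.
    static final String MANUAL_COMMIT_HEADER = "manualCommit";

    // Models a route step: look up the commit object and call commit().
    static void process(Map<String, Object> headers) {
        ManualCommit manual = (ManualCommit) headers.get(MANUAL_COMMIT_HEADER);
        manual.commit(); // returns immediately; the offset is only recorded
    }

    public static void main(String[] args) {
        RecordingCommitManager manager = new RecordingCommitManager();
        for (long offset = 0; offset < 3; offset++) {
            Map<String, Object> headers = new HashMap<>();
            headers.put(MANUAL_COMMIT_HEADER, new AsyncManualCommit(0, offset, manager));
            process(headers);
        }
        System.out.println("before flush: " + manager.committed()); // prints "before flush: {}"
        manager.flush();
        System.out.println("after flush: " + manager.committed());  // prints "after flush: {0=2}"
    }
}
```

In this model, three `commit()` calls on partition 0 collapse into a single pending entry, which is why only the highest offset is handed over on `flush()`. Whether a real commit manager coalesces offsets this way is an assumption of the sketch.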

Code Interaction Example

The factory class creates manual commit instances:

public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
    @Override
    public KafkaManualCommit newInstance(
        CamelExchangePayload camelExchangePayload, 
        KafkaRecordPayload kafkaRecordPayload, 
        CommitManager commitManager) {
        return new DefaultKafkaManualAsyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
    }
}

The manual commit implementation records offsets asynchronously:

public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit implements KafkaAsyncManualCommit {
    private final CommitManager commitManager;

    public DefaultKafkaManualAsyncCommit(CamelExchangePayload camelExchangePayload,
                                        KafkaRecordPayload recordPayload,
                                        CommitManager commitManager) {
        super(camelExchangePayload, recordPayload);
        this.commitManager = commitManager;
    }

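    @Override
    public void commit() {
        // Non-blocking: hand the record's partition and offset to the commit
        // manager, which performs the actual commit later.
        commitManager.recordOffset(getPartition(), getRecordOffset());
    }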
}

Here, `commit()` does not commit offsets itself. It passes the record's partition and offset to the `commitManager`, which commits them asynchronously.

Integration within Manual Offset Commit and Kafka Consumer Workflow

Asynchronous Manual Commit complements the broader Manual Offset Commit subtopic by offering a non-blocking alternative to synchronous commits. It plugs into the Kafka consumer’s commit management layer, where several commit strategies (sync, async, noop, offset-repository-backed) coexist.
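To show how sync and async strategies can coexist behind one factory interface, here is a small plain-Java sketch. All types in it (`ManualCommitHandle`, `ManualCommitHandleFactory`, `OffsetSink`, `SyncFactory`, `AsyncFactory`) are hypothetical simplifications rather than Camel's interfaces, and `OffsetSink` only mimics a commit manager with in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types; not Camel's actual interfaces.
interface ManualCommitHandle {
    void commit();
}

interface ManualCommitHandleFactory {
    ManualCommitHandle newInstance(int partition, long offset, OffsetSink sink);
}

// Stand-in for a commit manager, backed by in-memory lists.
class OffsetSink {
    final List<String> sentToBroker = new ArrayList<>();
    final List<String> recorded = new ArrayList<>();

    // Models a blocking commit: the offset is "sent" before the call returns.
    void commitNow(int partition, long offset) {
        sentToBroker.add(partition + "@" + offset);
    }

    // Models a deferred commit: the offset is only remembered.
    void recordOffset(int partition, long offset) {
        recorded.add(partition + "@" + offset);
    }

    // Models the later background commit of everything recorded so far.
    void flushRecorded() {
        sentToBroker.addAll(recorded);
        recorded.clear();
    }
}

// Sync strategy: commit() sends the offset right away.
class SyncFactory implements ManualCommitHandleFactory {
    @Override
    public ManualCommitHandle newInstance(int partition, long offset, OffsetSink sink) {
        return () -> sink.commitNow(partition, offset);
    }
}

// Async strategy: commit() only records the offset.
class AsyncFactory implements ManualCommitHandleFactory {
    @Override
    public ManualCommitHandle newInstance(int partition, long offset, OffsetSink sink) {
        return () -> sink.recordOffset(partition, offset);
    }
}

class StrategySelectionSketch {
    // The calling code is identical for both strategies; only the factory differs.
    static List<String> run(ManualCommitHandleFactory factory, OffsetSink sink) {
        factory.newInstance(0, 7L, sink).commit();
        return new ArrayList<>(sink.sentToBroker); // snapshot right after commit()
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + run(new SyncFactory(), new OffsetSink()));

        OffsetSink asyncSink = new OffsetSink();
        System.out.println("async: " + run(new AsyncFactory(), asyncSink));
        asyncSink.flushRecorded();
        System.out.println("async after flush: " + asyncSink.sentToBroker);
    }
}
```

The design point is that route code calls `commit()` the same way in both cases, so switching between blocking and non-blocking behavior comes down to which factory is configured.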

This subtopic covers the asynchronous manual commit factory and implementation classes. Synchronous commit approaches are separate and are covered in other subtopics.

Diagram

sequenceDiagram
    participant Route as Camel Route
    participant AsyncCommit as DefaultKafkaManualAsyncCommit
    participant CommitMgr as CommitManager
    participant Kafka as Kafka Broker

    Route->>AsyncCommit: Obtain manual commit object
    Route->>AsyncCommit: Call commit()
    AsyncCommit->>CommitMgr: recordOffset(partition, offset)
    CommitMgr--)Kafka: Commit offsets asynchronously
    Route->>Route: Continue processing messages without waiting

The diagram illustrates the asynchronous manual commit flow: the route obtains the manual commit object and calls `commit()`, the commit object delegates to the commit manager, and the commit manager records the offset and commits it to Kafka asynchronously. The route continues processing without waiting for the commit.