Asynchronous Manual Commit
Purpose
Asynchronous Manual Commit addresses the need for explicit, non-blocking offset commits in Kafka consumers managed within Apache Camel routes. This subtopic enables users to manually control when offsets are committed to Kafka without stalling the processing thread, improving throughput and responsiveness in high-volume or latency-sensitive scenarios. Unlike synchronous manual commits, which commit offsets immediately and block until the commit completes, asynchronous commits record offset information for later submission, allowing the consumer to continue processing messages while offset commits happen in the background.
Functionality
The core functionality revolves around providing a manual commit object that can be attached to each consumed Kafka message within a Camel exchange. This object exposes a `commit()` method that, when invoked, does not perform an immediate offset commit but rather records the offset asynchronously to be committed later by the commit manager.
Key workflows include:
Manual Commit Creation: Using a factory (
DefaultKafkaManualAsyncCommitFactory), the system creates instances of the asynchronous manual commit object for each message.Offset Recording: Invoking
commit()on the manual commit object delegates to aCommitManagerthat caches the offset internally and schedules asynchronous commit operations to Kafka.Non-blocking Offset Commit: The actual commit to Kafka happens asynchronously, allowing the Camel route to proceed without waiting for Kafka broker response.
This approach helps decouple message processing from Kafka offset commit latency, improving overall consumer efficiency.
Code Interaction Example
The factory class creates manual commit instances:
public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
@Override
public KafkaManualCommit newInstance(
CamelExchangePayload camelExchangePayload,
KafkaRecordPayload kafkaRecordPayload,
CommitManager commitManager) {
return new DefaultKafkaManualAsyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
}
}
The manual commit implementation records offsets asynchronously:
public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit implements KafkaAsyncManualCommit {
private final CommitManager commitManager;
public DefaultKafkaManualAsyncCommit(CamelExchangePayload camelExchangePayload,
KafkaRecordPayload recordPayload,
CommitManager commitManager) {
super(camelExchangePayload, recordPayload);
this.commitManager = commitManager;
}
@Override
public void commit() {
commitManager.recordOffset(getPartition(), getRecordOffset());
}
}
Here, the `commit()` method does not directly commit offsets but signals the `commitManager` to handle the offset asynchronously.
Integration within Manual Offset Commit and Kafka Consumer Workflow
Asynchronous Manual Commit complements the broader Manual Offset Commit subtopic by offering a non-blocking alternative to synchronous commits. It integrates seamlessly with the Kafka consumer’s commit management layer, where different commit strategies (sync, async, noop, offset repository backed) coexist.
Within the Parent Topic: It extends explicit manual offset commit support by providing an asynchronous mechanism, enhancing flexibility for users needing non-blocking offset control.
With Commit Managers: The
CommitManagerabstraction handles actual commit logic, ensuring offsets recorded asynchronously are eventually committed to Kafka in a safe and consistent manner.With KafkaConsumer: As manual commit objects are attached to Camel exchanges during message consumption, routes can invoke
commit()asynchronously, decoupling offset commits from message processing.Complementing Synchronous Manual Commit: Users can choose between synchronous or asynchronous manual commit implementations based on their performance and reliability requirements.
This subtopic introduces the asynchronous manual commit factory and implementation classes, which are distinct from and not covered by synchronous commit approaches in other subtopics.
Diagram
sequenceDiagram
participant Route as Camel Route
participant AsyncCommit as DefaultKafkaManualAsyncCommit
participant CommitMgr as CommitManager
participant Kafka as Kafka Broker
Route->>AsyncCommit: Obtain manual commit object
Route->>AsyncCommit: Call commit()
AsyncCommit->>CommitMgr: recordOffset(partition, offset)
CommitMgr--)Kafka: Commit offsets asynchronously
Route->>Route: Continue processing messages without waiting
The diagram illustrates the asynchronous manual commit flow: from obtaining the manual commit object in the route, calling `commit()`, delegating to the commit manager for asynchronous offset recording and commit, while the route continues processing immediately.