DefaultKafkaManualAsyncCommit.java
Overview
`DefaultKafkaManualAsyncCommit.java` is a core implementation class within the Apache Camel Kafka component that provides **asynchronous manual offset commit** functionality for Kafka consumers. It extends the synchronous manual commit support by enabling offset commits to be recorded non-blockingly, thereby allowing Kafka consumers to continue processing messages without waiting for the offset commit to complete.
This class plays a critical role in improving throughput and responsiveness in high-volume or latency-sensitive Kafka consumer scenarios integrated with Apache Camel routes. Instead of committing offsets directly to Kafka brokers in a blocking manner, it delegates the offset recording to a `CommitManager`, which handles asynchronous commit operations.
Class: DefaultKafkaManualAsyncCommit
Declaration
public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit implements KafkaAsyncManualCommit
Purpose
Implements asynchronous manual commit semantics by overriding the
commit()method.Delegates offset commit requests to a
CommitManagerthat manages offset recording and asynchronous commits.Extends
DefaultKafkaManualCommitto reuse existing functionality related to partition and offset retrieval.Implements
KafkaAsyncManualCommitinterface signaling asynchronous commit behavior.
Properties
Property | Type | Description |
|---|---|---|
`commitManager` | `CommitManager` | The manager responsible for recording offsets asynchronously. |
Constructor
public DefaultKafkaManualAsyncCommit(
KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload,
KafkaManualCommitFactory.KafkaRecordPayload recordPayload,
CommitManager commitManager)
Parameters
camelExchangePayload: Encapsulates the Camel exchange context and payload information related to the Kafka message.recordPayload: Holds the Kafka record details, including partition and offset information.commitManager: The commit manager instance responsible for handling asynchronous offset commits.
Description
Initializes the manual commit object with the necessary context and delegates to the superclass constructor for shared initialization. Stores the `commitManager` to delegate asynchronous commit requests.
Usage Example
DefaultKafkaManualAsyncCommit asyncCommit = new DefaultKafkaManualAsyncCommit(
camelExchangePayload,
recordPayload,
commitManager
);
Method: commit()
@Override
public void commit()
Description
Overrides the
commit()method from the parent class and interface.Instead of performing a direct offset commit, it calls
commitManager.recordOffset()with the partition and offset obtained from the underlying record.This method records the offset asynchronously, enabling non-blocking commits.
Behavior
When invoked, the method signals the `CommitManager` to record the current partition and offset. The `CommitManager` subsequently handles the actual commit to Kafka asynchronously, allowing the route or consumer to proceed immediately without waiting for Kafka broker acknowledgment.
Usage Example
// Trigger asynchronous offset commit for the current record
asyncCommit.commit();
Important Implementation Details
Inheritance: Extends
DefaultKafkaManualCommitto inherit methods likegetPartition()andgetRecordOffset(), which provide partition and offset information from the Kafka record.Interface Implementation: Implements
KafkaAsyncManualCommitto explicitly denote asynchronous commit behavior and to be used polymorphically where async commits are required.Delegation: The actual commit logic is delegated to the
CommitManager, which abstracts the complexity of batching, scheduling, and committing offsets asynchronously.Threading and Performance: By recording offsets asynchronously, this class helps reduce blocking calls in Camel routes, thus enhancing throughput and lowering latency in message processing pipelines.
Interaction with Other Components
CommitManager:
ThecommitManagerinstance is the key collaborator responsible for managing commit state and orchestrating asynchronous commits to Kafka brokers. This class delegates offset recording to it.DefaultKafkaManualCommit:
The superclass that provides foundational manual commit functionality such as accessing partition and offset details from the Kafka record payload.KafkaManualCommitFactory:
Typically, instances of this class are created by a factory class likeDefaultKafkaManualAsyncCommitFactory. The factory handles creation and injection of dependencies such as theCommitManager.Apache Camel Kafka Consumer:
This class is used within Camel routes to provide manual commit controls on consumed Kafka messages, allowing route developers to invoke asynchronous commits explicitly.
Usage Scenario Example
In an Apache Camel Kafka consumer route, a message arrives and is wrapped in a Camel exchange. The exchange payload includes a manual commit object of type `DefaultKafkaManualAsyncCommit`. When the route logic decides to commit the offset, it calls:
manualCommit.commit();
This invocation does not block the processing thread but records the offset asynchronously for later commit, enabling the route to continue processing subsequent messages efficiently.
Mermaid Class Diagram
classDiagram
class DefaultKafkaManualAsyncCommit {
-CommitManager commitManager
+DefaultKafkaManualAsyncCommit(CamelExchangePayload camelExchangePayload, KafkaRecordPayload recordPayload, CommitManager commitManager)
+commit()
+getPartition()
+getRecordOffset()
}
class DefaultKafkaManualCommit {
+getPartition()
+getRecordOffset()
+commit()
}
interface KafkaAsyncManualCommit {
+commit()
}
DefaultKafkaManualAsyncCommit --|> DefaultKafkaManualCommit
DefaultKafkaManualAsyncCommit ..|> KafkaAsyncManualCommit
Summary
`DefaultKafkaManualAsyncCommit.java` is a lightweight, focused class designed to support asynchronous manual offset commits in Kafka consumers integrated with Apache Camel routes. By leveraging a `CommitManager` for asynchronous offset recording, it helps improve consumer throughput and responsiveness, crucial for high-performance streaming applications. It fits into the broader manual commit architecture by implementing the asynchronous commit interface and extending the default manual commit base class.
This class is intended to be used internally by the Kafka component within Apache Camel and instantiated via factory classes to provide users with an explicit and non-blocking offset commit mechanism.