DefaultKafkaManualAsyncCommitFactory.java
Overview
[DefaultKafkaManualAsyncCommitFactory.java](/projects/289/68571) belongs to the manual offset commit support in Apache Camel's Kafka consumer. The class implements the `KafkaManualCommitFactory` interface and creates `DefaultKafkaManualAsyncCommit` instances, which let a Camel route commit Kafka offsets asynchronously while messages are being processed.
Because an asynchronous commit does not block the calling thread, offset committing is decoupled from message processing, which can improve throughput and consumer responsiveness. This is particularly useful in high-volume or latency-sensitive Kafka consumer applications integrated with Apache Camel.
Detailed Class and Method Documentation
Class: DefaultKafkaManualAsyncCommitFactory
Package
org.apache.camel.component.kafka.consumer
Description
A factory class responsible for creating new instances of `KafkaManualCommit` that perform asynchronous offset commits. This factory produces instances of `DefaultKafkaManualAsyncCommit`, which encapsulate the logic for asynchronously committing Kafka offsets within Camel Kafka consumer routes.
Implements
KafkaManualCommitFactory
This interface defines a contract for factories that create `KafkaManualCommit` instances tied to a specific Camel `Exchange` and Kafka record.
Method: newInstance
@Override
public KafkaManualCommit newInstance(
        CamelExchangePayload camelExchangePayload,
        KafkaRecordPayload kafkaRecordPayload,
        CommitManager commitManager)
Description
Creates and returns a new `KafkaManualCommit` instance that supports asynchronous commit semantics.
Parameters
`CamelExchangePayload camelExchangePayload`:
A payload object encapsulating the Camel exchange context, including the exchange itself, the Kafka consumer, the processing thread ID, and optionally an offset repository. This gives the manual commit instance access to the required Camel and Kafka consumer context.
`KafkaRecordPayload kafkaRecordPayload`:
Contains metadata about the Kafka record to be committed, such as the `TopicPartition`, the offset of the record, and commit timeout settings.
`CommitManager commitManager`:
An abstraction responsible for managing the underlying commit operations. It handles the actual logic of recording offsets and asynchronously committing them to Kafka or an external offset repository.
Returns
`KafkaManualCommit`:
An instance of `DefaultKafkaManualAsyncCommit`, which implements asynchronous manual commit behavior.
Usage Example
// Assumes exchange, consumer, threadId, offsetRepository, topicPartition, offset, timeout,
// and commitManager are already in scope (normally supplied by the Camel Kafka consumer).
KafkaManualCommitFactory factory = new DefaultKafkaManualAsyncCommitFactory();
KafkaManualCommitFactory.CamelExchangePayload camelPayload =
        new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);
KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
        new KafkaManualCommitFactory.KafkaRecordPayload(topicPartition, offset, timeout);
KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);
// Expose the commit handle to the route through the message header.
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);
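The usage example depends on Camel and Kafka classes, so it cannot run on its own. The following is a minimal, self-contained sketch of the same factory shape. Every type in it (`ManualCommit`, `RecordPayload`, `RecordingCommitManager`, `ManualCommitFactory`, `AsyncCommitFactory`) is a hypothetical stand-in invented for illustration, not the Camel API, and the assumption that `commit()` only records the offset with the manager is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

public class AsyncCommitFactorySketch {

    /** Hypothetical stand-in for KafkaManualCommit: the handle a route calls. */
    interface ManualCommit {
        void commit();
    }

    /** Hypothetical stand-in for KafkaRecordPayload: identifies the record. */
    static final class RecordPayload {
        final String topicPartition;
        final long offset;

        RecordPayload(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for CommitManager: only remembers requested offsets. */
    static final class RecordingCommitManager {
        final List<String> recorded = new ArrayList<>();

        void recordOffset(String topicPartition, long offset) {
            recorded.add(topicPartition + "@" + offset);
        }
    }

    /** Hypothetical stand-in for KafkaManualCommitFactory. */
    interface ManualCommitFactory {
        ManualCommit newInstance(RecordPayload payload, RecordingCommitManager manager);
    }

    /** Plays the role of the async factory: it holds no state and only wires objects together. */
    static final class AsyncCommitFactory implements ManualCommitFactory {
        @Override
        public ManualCommit newInstance(RecordPayload payload, RecordingCommitManager manager) {
            // The returned handle hands the offset to the manager and returns at once.
            return () -> manager.recordOffset(payload.topicPartition, payload.offset);
        }
    }

    /** Creates one commit handle through the factory, invokes it, and returns what was recorded. */
    static List<String> demo() {
        RecordingCommitManager manager = new RecordingCommitManager();
        ManualCommitFactory factory = new AsyncCommitFactory();
        ManualCommit commit = factory.newInstance(new RecordPayload("orders-0", 42L), manager);
        commit.commit();
        return manager.recorded;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The factory itself contains no commit logic, which mirrors the description of the real class as a thin creator of commit objects.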
Important Implementation Details and Algorithms
This factory produces only asynchronous manual commit instances (`DefaultKafkaManualAsyncCommit`): invoking the commit does not block the calling thread while it waits for Kafka to acknowledge the offset commit.
The asynchronous commit instance relies on the provided `CommitManager` to handle offset recording and commit scheduling in a non-blocking manner.
The design allows for interchangeable commit strategies by simply switching factories, so commit behavior can change without changing route or consumer logic.
This class itself is very lightweight; it delegates the commit logic to the `DefaultKafkaManualAsyncCommit` and `CommitManager` classes.
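To illustrate the difference between the two strategies and how switching factories leaves the calling code untouched, here is a small simulation. It is a sketch under stated assumptions: `FakeBroker`, `DeferredCommitManager`, `syncFactory`, and `asyncFactory` are hypothetical names, the "broker" is an in-memory list, and the deferred `flush()` step stands in for whatever scheduling the real `CommitManager` performs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongFunction;

public class CommitStrategySwapSketch {

    /** Hypothetical broker stand-in: collects offsets that were actually committed. */
    static final class FakeBroker {
        final List<Long> committed = new ArrayList<>();
    }

    /** Hypothetical commit manager: commits immediately or queues for a later flush. */
    static final class DeferredCommitManager {
        private final Queue<Long> pending = new ArrayDeque<>();
        private final FakeBroker broker;

        DeferredCommitManager(FakeBroker broker) {
            this.broker = broker;
        }

        /** Synchronous path: the offset reaches the broker before the call returns. */
        void commitNow(long offset) {
            broker.committed.add(offset);
        }

        /** Asynchronous path: only remember the offset; the caller is not held up. */
        void recordOffset(long offset) {
            pending.add(offset);
        }

        /** Later step (for example on the polling thread): push queued offsets to the broker. */
        void flush() {
            Long offset;
            while ((offset = pending.poll()) != null) {
                broker.committed.add(offset);
            }
        }
    }

    /** Factory producing commit handles that commit before returning. */
    static LongFunction<Runnable> syncFactory(DeferredCommitManager manager) {
        return offset -> () -> manager.commitNow(offset);
    }

    /** Factory producing commit handles that defer the commit to flush(). */
    static LongFunction<Runnable> asyncFactory(DeferredCommitManager manager) {
        return offset -> () -> manager.recordOffset(offset);
    }

    /**
     * Runs identical calling code against either factory and returns
     * {offsets visible right after commit, offsets visible after flush}.
     */
    static int[] run(boolean useAsync) {
        FakeBroker broker = new FakeBroker();
        DeferredCommitManager manager = new DeferredCommitManager(broker);
        LongFunction<Runnable> factory = useAsync ? asyncFactory(manager) : syncFactory(manager);

        factory.apply(7L).run();            // the only line a "route" would contain
        int visibleAfterCommit = broker.committed.size();
        manager.flush();
        int visibleAfterFlush = broker.committed.size();
        return new int[] {visibleAfterCommit, visibleAfterFlush};
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + java.util.Arrays.toString(run(false)));
        System.out.println("async: " + java.util.Arrays.toString(run(true)));
    }
}
```

With the asynchronous factory the offset is not yet visible at the broker when `commit` returns, which is the trade-off to keep in mind: the caller proceeds sooner, but completion of the commit is only known later.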
Interaction with Other System Components
`KafkaManualCommitFactory` interface:
This class implements the factory interface, which defines a method to create manual commit instances.
`DefaultKafkaManualAsyncCommit` class:
Instances created by this factory are of this type, implementing the asynchronous commit behavior.
`CommitManager`:
Passed into the factory method and used by the created commit instance to perform offset commits asynchronously.
Camel Kafka consumer:
During message consumption, this factory is used to create manual commit instances for each Kafka record or batch. These instances are then attached to the Camel `Exchange` as headers for use in routes.
Camel routes:
Routes can retrieve the manual commit instance from the message header and invoke `commit()`, which commits the offset without blocking the route.
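The route-side half of this interaction can be sketched without Camel by modelling message headers as a plain map. This is an illustration only: `ManualCommit` and `processAndCommit` are hypothetical, and the header key is assumed to mirror the value of `KafkaConstants.MANUAL_COMMIT`; check the constant in your Camel version rather than relying on the literal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class ManualCommitHeaderSketch {

    // Assumption: mirrors KafkaConstants.MANUAL_COMMIT; prefer the real constant in a route.
    static final String MANUAL_COMMIT = "CamelKafkaManualCommit";

    /** Hypothetical stand-in for KafkaManualCommit. */
    interface ManualCommit {
        void commit();
    }

    /**
     * Route-side step: handle the body first, then commit through the header.
     * Returns true only when a commit was requested.
     */
    static boolean processAndCommit(Map<String, Object> headers, String body) {
        if (body == null || body.isEmpty()) {
            return false; // processing failed, so the offset is left uncommitted
        }
        Object header = headers.get(MANUAL_COMMIT);
        if (header instanceof ManualCommit) {
            ((ManualCommit) header).commit();
            return true;
        }
        return false; // no manual commit handle was attached to this message
    }

    /** Sends one good and one empty message through the step and counts commit calls. */
    static int demo() {
        AtomicInteger commits = new AtomicInteger();
        Map<String, Object> headers = new HashMap<>();
        // The consumer side would attach the handle created by the factory here.
        headers.put(MANUAL_COMMIT, (ManualCommit) commits::incrementAndGet);

        processAndCommit(headers, "payload"); // commits
        processAndCommit(headers, "");        // skipped: nothing was processed
        return commits.get();
    }

    public static void main(String[] args) {
        System.out.println("commit() calls: " + demo());
    }
}
```

Committing only after successful processing is a design choice of this sketch; a real route decides for itself when the offset should advance.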
Summary
| Aspect | Description |
|---|---|
| **Purpose** | Factory for asynchronous manual Kafka offset commit instances. |
| **Functionality** | Creates `KafkaManualCommit` objects supporting async offset commit. |
| **Key Method** | `newInstance(CamelExchangePayload, KafkaRecordPayload, CommitManager)` |
| **Dependencies** | `DefaultKafkaManualAsyncCommit`, `CommitManager`, Camel Exchange and Kafka contexts. |
| **Usage Context** | Used within Kafka consumer processing to enable non-blocking manual offset commits in Camel routes. |
Mermaid Class Diagram
classDiagram
    class DefaultKafkaManualAsyncCommitFactory {
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }
    class KafkaManualCommitFactory {
        <<interface>>
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }
    class DefaultKafkaManualAsyncCommit {
        +commit()
        -commitManager: CommitManager
    }
    DefaultKafkaManualAsyncCommitFactory ..|> KafkaManualCommitFactory
    DefaultKafkaManualAsyncCommitFactory --> DefaultKafkaManualAsyncCommit : creates
Additional Notes
This factory forms part of a broader manual offset commit architecture in Apache Camel’s Kafka component, allowing users to explicitly control offset commits with different strategies (sync or async).
It enables better throughput and responsiveness for Kafka consumers by avoiding blocking calls during offset commit, which is essential in high-performance messaging scenarios.
The factory pattern used here supports clean separation and easy extensibility of commit strategies.
References
See `DefaultKafkaManualAsyncCommit` for the commit implementation details.
See the `KafkaManualCommitFactory` interface for the factory contract.
Related commit strategies: `DefaultKafkaManualCommitFactory` (synchronous commit).
Manual offset commit support documentation in the Apache Camel Kafka component.