DefaultKafkaManualCommitFactory.java
Overview
[DefaultKafkaManualCommitFactory.java](/projects/289/68569) is a core component of the Apache Camel Kafka Manual Offset Commit Support module, responsible for creating **synchronous manual commit** instances for Kafka consumer offsets. It implements the `KafkaManualCommitFactory` interface and produces instances of `DefaultKafkaManualSyncCommit` that encapsulate the logic for synchronously committing Kafka offsets within Camel routes.
This factory gives a route explicit control over when Kafka offsets are committed, and the commit call blocks until it completes. That matters when the route must confirm that an offset has been persisted before it proceeds, for example when records must be handled in strict order or must not be acknowledged until they have been processed (at-least-once delivery). A synchronous manual commit does not by itself provide exactly-once semantics or transactional guarantees.
Class Summary
DefaultKafkaManualCommitFactory
public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
    @Override
    public KafkaManualCommit newInstance(
            CamelExchangePayload camelExchangePayload,
            KafkaRecordPayload kafkaRecordPayload,
            CommitManager commitManager) {
        return new DefaultKafkaManualSyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
    }
}
Description
- Implements the `KafkaManualCommitFactory` interface.
- Creates new instances of `DefaultKafkaManualSyncCommit`, which perform synchronous manual commits of Kafka offsets.
- Each commit instance is tied to a specific Camel exchange, Kafka record metadata, and a commit manager that handles the actual offset commit operation.
Method Details
newInstance
KafkaManualCommit newInstance(
        CamelExchangePayload camelExchangePayload,
        KafkaRecordPayload kafkaRecordPayload,
        CommitManager commitManager)
Parameters:
- `camelExchangePayload` (`KafkaManualCommitFactory.CamelExchangePayload`): Contains the Camel `Exchange`, the Kafka `Consumer` instance, the thread identifier, and optionally an offset repository. Provides the context needed for committing offsets within the Camel routing environment.
- `kafkaRecordPayload` (`KafkaManualCommitFactory.KafkaRecordPayload`): Holds the Kafka topic partition, offset, and commit timeout settings related to the consumed record.
- `commitManager` (`CommitManager`): A strategy interface responsible for executing the commit operation. The synchronous commit assumes the `CommitManager` performs commit operations synchronously (blocking).
Returns:
A new instance of `KafkaManualCommit`, specifically a `DefaultKafkaManualSyncCommit`, that can be used to commit the offset for the given record within the Camel route.
Usage:
This method is typically called internally by the Kafka consumer component when it processes incoming records, to create manual commit objects that are attached to the Camel exchange.
Important Implementation Details
- Synchronous Commit Strategy: The factory exclusively produces synchronous commit instances (`DefaultKafkaManualSyncCommit`). These commit instances block the caller thread until the Kafka broker acknowledges the offset commit, ensuring durability before proceeding.
- Decoupling via Factory Pattern: Using a factory interface and concrete implementations allows flexible switching between synchronous and asynchronous commit strategies without impacting the rest of the system. This class represents the synchronous variant.
- Integration with Commit Manager: The returned commit instance delegates the actual commit logic to a `CommitManager`. This abstraction allows different commit strategies or backends (Kafka commit, offset repositories) to be plugged in transparently.
- Usage in Camel Routes: Instances created by this factory are attached to Camel exchanges as headers (e.g., under `KafkaConstants.MANUAL_COMMIT`). Route processors can retrieve this instance and invoke `commit()` to perform a manual offset commit synchronously.
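The strategy switch and the delegation to a commit manager described above can be sketched with plain JDK types. This is a minimal illustration under stated assumptions, not the Camel source: `CommitBackend`, `ManualCommit`, `ManualCommitFactory`, `SyncFactory`, `AsyncFactory`, and `StrategySwitchSketch` are hypothetical stand-ins for the roles of `CommitManager`, `KafkaManualCommit`, and the factory interface, and the partition is reduced to a string.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CommitManager: remembers the last offset committed per partition.
final class CommitBackend {
    private final Map<String, Long> committed = new ConcurrentHashMap<>();

    void commit(String partition, long offset) {
        committed.put(partition, offset);
    }

    Long committedOffset(String partition) {
        return committed.get(partition);
    }
}

// Hypothetical stand-in for KafkaManualCommit.
interface ManualCommit {
    void commit();
}

// Hypothetical stand-in for KafkaManualCommitFactory.
interface ManualCommitFactory {
    ManualCommit newInstance(String partition, long offset, CommitBackend backend);
}

// Synchronous variant: commit() returns only after the backend call has finished.
final class SyncFactory implements ManualCommitFactory {
    @Override
    public ManualCommit newInstance(String partition, long offset, CommitBackend backend) {
        return () -> backend.commit(partition, offset);
    }
}

// Asynchronous variant: commit() hands the work to another thread and returns at once.
final class AsyncFactory implements ManualCommitFactory {
    // Kept so a caller can wait for the most recent commit when it needs to.
    volatile CompletableFuture<Void> lastCommit = CompletableFuture.completedFuture(null);

    @Override
    public ManualCommit newInstance(String partition, long offset, CommitBackend backend) {
        return () -> {
            lastCommit = CompletableFuture.runAsync(() -> backend.commit(partition, offset));
        };
    }
}

final class StrategySwitchSketch {
    // The caller depends only on the factory interface, so either strategy can be passed in.
    static void commitRecord(ManualCommitFactory factory, CommitBackend backend,
                             String partition, long offset) {
        factory.newInstance(partition, offset, backend).commit();
    }
}
```

With `SyncFactory`, the committed offset is visible as soon as `commitRecord` returns. With `AsyncFactory`, the caller has to wait on `lastCommit` before relying on it, which is the trade-off the synchronous variant avoids.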
Interaction with Other Components
- `KafkaManualCommitFactory` interface: This class is a concrete implementation of this factory interface.
- `DefaultKafkaManualSyncCommit`: The synchronous commit instance produced by this factory. Implements the `KafkaManualCommit` interface and uses the commit manager for synchronous offset commits.
- `CommitManager`: Responsible for performing the actual offset commit operation to Kafka or an external offset store.
- Camel `Exchange`: The factory uses the Camel exchange context payload to create manual commit instances linked to the current message processing.
- Kafka Consumer Component: When consuming records, the Kafka consumer creates manual commit instances via this factory to enable explicit offset control within Camel routes.
Usage Example
// Assume these objects are available from the Kafka consumer context and Camel route
KafkaManualCommitFactory factory = new DefaultKafkaManualCommitFactory();
KafkaManualCommitFactory.CamelExchangePayload camelPayload =
        new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);
KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
        new KafkaManualCommitFactory.KafkaRecordPayload(partition, offset, timeout);
CommitManager commitManager = ...; // injected or constructed
KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);
// Attach the manual commit instance to the Camel exchange for later use
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);
In a Camel route processor, the manual commit can be triggered as follows:
public void process(Exchange exchange) throws Exception {
    KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    if (manual != null) {
        manual.commit(); // Synchronously commit the offset
    }
}
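A practical consequence of committing from the processor is that an offset advances only once the code reaches `commit()`. The following sketch simulates that ordering with JDK types only. `CommitAfterProcessingSketch`, `handle`, and `consume` are hypothetical names, list indices stand in for Kafka offsets, and an assignment stands in for `manual.commit()`.

```java
import java.util.List;

final class CommitAfterProcessingSketch {

    // Hypothetical processing step: rejects records it cannot handle.
    static void handle(String body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
    }

    // Processes records in offset order and returns the last committed offset (-1 if none).
    // The commit happens only after handle() succeeds, so a failing record is never committed.
    static long consume(List<String> records) {
        long lastCommitted = -1L;
        for (int offset = 0; offset < records.size(); offset++) {
            try {
                handle(records.get(offset));
            } catch (IllegalArgumentException e) {
                break; // this offset stays uncommitted and would be redelivered later
            }
            lastCommitted = offset; // stands in for manual.commit()
        }
        return lastCommitted;
    }
}
```

Because the failing record is left uncommitted, a restarted consumer would see it again. This is the at-least-once behaviour that committing after processing is usually chosen for.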
Mermaid Class Diagram
classDiagram
    class DefaultKafkaManualCommitFactory {
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }
    class KafkaManualCommitFactory {
        <<interface>>
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }
    class DefaultKafkaManualSyncCommit {
        +commit()
        -camelExchangePayload: CamelExchangePayload
        -kafkaRecordPayload: KafkaRecordPayload
        -commitManager: CommitManager
    }
    DefaultKafkaManualCommitFactory ..|> KafkaManualCommitFactory
    DefaultKafkaManualCommitFactory --> DefaultKafkaManualSyncCommit : creates
Summary
- `DefaultKafkaManualCommitFactory` provides a synchronous manual commit implementation for Apache Camel Kafka consumers.
- It creates instances of `DefaultKafkaManualSyncCommit` that block until Kafka acknowledges offset commits.
- This class fits into the manual commit support system by enabling explicit, deterministic offset commit control within Camel routes.
- It leverages the factory design pattern for flexible commit strategy selection.
- It is tightly integrated with the `CommitManager` abstraction, Camel `Exchange` payloads, and Kafka consumer components.
This component is essential for use cases requiring strong processing guarantees and immediate offset commit acknowledgment, enabling developers to build robust Kafka consumer integrations with explicit offset management.
Additional Context: Manual Offset Commit Workflow
The following sequence diagram illustrates the interaction of this factory within the broader manual commit workflow:
sequenceDiagram
    participant KafkaConsumer as Kafka Consumer
    participant Factory as DefaultKafkaManualCommitFactory
    participant CamelExchange as Camel Exchange
    participant RouteProcessor as Route Processor
    participant ManualCommitProc as ManualCommit Processor
    participant CommitManager as Commit Manager
    KafkaConsumer->>Factory: newInstance(camelPayload, kafkaPayload, commitManager)
    Factory-->>KafkaConsumer: Return DefaultKafkaManualSyncCommit instance
    KafkaConsumer->>CamelExchange: Attach manual commit instance as header
    CamelExchange->>RouteProcessor: Deliver exchange with manual commit header
    RouteProcessor->>ManualCommitProc: Invoke ManualCommit processor
    ManualCommitProc->>CamelExchange: Retrieve manual commit instance
    ManualCommitProc->>CommitManager: commit() via DefaultKafkaManualSyncCommit (synchronous)
    CommitManager-->>KafkaConsumer: Commit offsets synchronously
This depicts the lifecycle from record consumption, manual commit instance creation, through to synchronous offset committing triggered by the route processor.
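The hand-off through the exchange header can be mimicked in a few lines of plain Java. This is a sketch under stated assumptions rather than Camel code: a `Map` stands in for the exchange headers, a `Runnable` stands in for the `KafkaManualCommit` instance, and `ManualCommitWorkflowSketch`, `MANUAL_COMMIT_HEADER`, `deliver`, and `process` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ManualCommitWorkflowSketch {
    // Hypothetical header key; the real component uses KafkaConstants.MANUAL_COMMIT.
    static final String MANUAL_COMMIT_HEADER = "manualCommit";

    // Consumer side: build a commit action for one record and attach it as a header.
    // Nothing is committed yet; the action only records what it would commit.
    static Map<String, Object> deliver(String topicPartition, long offset, List<String> commitLog) {
        Runnable manualCommit = () -> commitLog.add(topicPartition + "@" + offset);
        Map<String, Object> headers = new HashMap<>();
        headers.put(MANUAL_COMMIT_HEADER, manualCommit);
        return headers;
    }

    // Route side: look the header up and, if it is present, trigger the commit.
    // Returns true when a commit was triggered.
    static boolean process(Map<String, Object> headers) {
        Object header = headers.get(MANUAL_COMMIT_HEADER);
        if (header instanceof Runnable) {
            ((Runnable) header).run();
            return true;
        }
        return false;
    }
}
```

The sketch keeps creation and execution apart in the same way as the workflow: the commit object is created per record on the consumer side, while the decision to commit stays with the route.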
References
- `KafkaManualCommitFactory`: Factory interface for creating manual commit instances.
- `DefaultKafkaManualSyncCommit`: Synchronous commit implementation created by this factory.
- `CommitManager`: Abstraction for offset commit execution.
- Apache Camel Kafka component manual offset commit support.
This documentation aims to provide a thorough understanding of the role and usage of [DefaultKafkaManualCommitFactory.java](/projects/289/68569) within the Apache Camel Kafka manual offset commit architecture.