KafkaManualCommitFactory.java
Overview
`KafkaManualCommitFactory.java` defines a factory interface for creating instances of `KafkaManualCommit`—objects responsible for managing manual offset commits in Kafka consumers within Apache Camel routes. This interface is central to the manual commit support mechanism that allows developers to explicitly control when Kafka consumer offsets are committed, rather than relying on Kafka’s automatic commit behavior.
This file provides:
The
KafkaManualCommitFactoryinterface with a method to create newKafkaManualCommitinstances.Two nested payload classes,
CamelExchangePayloadandKafkaRecordPayload, which encapsulate context information from Camel and Kafka respectively, required to create commit instances.
By abstracting the creation of commit instances, this interface enables multiple commit strategies (e.g., synchronous and asynchronous commits) to be plugged in transparently, enhancing flexibility and control over Kafka offset commits in Camel routes.
Detailed Explanation
Interface: KafkaManualCommitFactory
public interface KafkaManualCommitFactory {
KafkaManualCommit newInstance(
CamelExchangePayload camelExchangePayload,
KafkaRecordPayload kafkaRecordPayload,
CommitManager commitManager);
}
Purpose:
Defines a factory contract to create newKafkaManualCommitinstances associated with a specific Camel exchange and Kafka record.Method:
newInstance:
Creates and returns a newKafkaManualCommitobject using the provided payloads and aCommitManager.
Parameters:
CamelExchangePayload camelExchangePayload
Contains Camel-related context including theExchange, KafkaConsumer, thread ID, and an optional offset repository.KafkaRecordPayload kafkaRecordPayload
Contains Kafka record-related information such as topic partition, offset, and commit timeout.CommitManager commitManager
Responsible for executing the actual commit operation underlying theKafkaManualCommit.
Returns:
A new instance ofKafkaManualCommitconfigured with the provided context.
Nested Class: CamelExchangePayload
class CamelExchangePayload {
public final Exchange exchange;
public final Consumer<?, ?> consumer;
public final String threadId;
public final StateRepository<String, String> offsetRepository;
public CamelExchangePayload(Exchange exchange, Consumer<?, ?> consumer, String threadId,
StateRepository<String, String> offsetRepository) {
this.exchange = exchange;
this.consumer = consumer;
this.threadId = threadId;
this.offsetRepository = offsetRepository;
}
}
Purpose:
Encapsulates Camel-specific data required to create a manual commit instance.Properties:
Exchange exchange: The Camel message exchange, representing the message and its processing context.Consumer<?, ?> consumer: The Kafka consumer instance used to poll messages.String threadId: Identifier for the thread processing the exchange, useful for tracking or concurrency control.StateRepository<String, String> offsetRepository: Optional external repository to store offsets outside Kafka (e.g., a distributed cache or database).
Usage:
Passed to the factory to provide the runtime Camel context needed for offset commit operations.
Nested Class: KafkaRecordPayload
class KafkaRecordPayload {
public final TopicPartition partition;
public final long recordOffset;
public final long commitTimeout;
public KafkaRecordPayload(TopicPartition partition, long recordOffset, long commitTimeout) {
this.partition = partition;
this.recordOffset = recordOffset;
this.commitTimeout = commitTimeout;
}
}
Purpose:
Holds Kafka-specific data related to the record being processed and committed.Properties:
TopicPartition partition: Kafka topic partition from which the record was consumed.long recordOffset: Offset of the Kafka record to be committed.long commitTimeout: Timeout duration (in milliseconds) to wait for the commit operation to complete.
Usage:
Provides the necessary Kafka metadata for the commit operation to be correctly targeted.
Important Implementation Details
Factory Pattern Usage:
This interface abstracts the creation ofKafkaManualCommitinstances, decoupling the commit logic from the consumer and allowing different commit strategies (e.g., synchronous, asynchronous, repository-backed) to be implemented and swapped easily.Payload Holder Classes:
Both payload classes serve as immutable data carriers, grouping relevant information from Camel and Kafka to be passed into the factory method. This design improves clarity and maintainability by segregating concerns and reducing method signature complexity.Integration with CommitManager:
The factory method requires aCommitManagerinstance, which encapsulates the actual commit logic. This separation allows commit implementations created by the factory to delegate commit execution to the manager, supporting various commit strategies and external offset storage.
Interaction with Other System Components
Kafka Consumer (
Consumer):
The consumer instance is included in theCamelExchangePayloadand used in commit implementations to perform offset commits.Camel
Exchange:
The exchange is passed to the factory and ultimately holds the manual commit instance, typically attached as a message header for later retrieval in the Camel route.CommitManager:
Implements the commit strategy. Factories produce commit instances that delegate toCommitManagerfor actual offset commit execution.Offset Repository (
StateRepository):
Allows offset storage outside Kafka, helpful in advanced use cases requiring external offset management.Camel Routes:
Manual commit instances created by this factory are placed in Camel exchange headers and invoked by processors within the route to perform manual offset commits.
Usage Example
// Create Camel exchange payload with context
KafkaManualCommitFactory.CamelExchangePayload camelPayload =
new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);
// Create Kafka record payload with record info
KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
new KafkaManualCommitFactory.KafkaRecordPayload(partition, recordOffset, commitTimeout);
// Create commit manager instance (implementation dependent)
CommitManager commitManager = ...;
// Use a factory implementation to create a manual commit instance
KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);
// Attach to exchange header for later commit invocation
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);
Visual Diagram: Class Structure of KafkaManualCommitFactory.java
classDiagram
interface KafkaManualCommitFactory {
+newInstance(CamelExchangePayload, KafkaRecordPayload, CommitManager) KafkaManualCommit
}
class CamelExchangePayload {
+Exchange exchange
+Consumer<?, ?> consumer
+String threadId
+StateRepository<String,String> offsetRepository
+CamelExchangePayload(exchange, consumer, threadId, offsetRepository)
}
class KafkaRecordPayload {
+TopicPartition partition
+long recordOffset
+long commitTimeout
+KafkaRecordPayload(partition, recordOffset, commitTimeout)
}
KafkaManualCommitFactory o-- CamelExchangePayload : uses
KafkaManualCommitFactory o-- KafkaRecordPayload : uses
Summary
`KafkaManualCommitFactory.java` is a key abstraction in Apache Camel’s Kafka component that facilitates explicit, manual control over Kafka offset commits. By defining a factory interface and associated payload classes, it enables flexible and extensible creation of commit instances that can be tailored to different commit strategies and application requirements. This design plays a crucial role in supporting advanced messaging patterns such as transactional processing and exactly-once semantics within Camel routes consuming from Kafka.