DefaultKafkaManualCommit.java
Overview
`DefaultKafkaManualCommit.java` defines an **abstract base class** for handling manual offset commits in Apache Kafka consumers within the Apache Camel Kafka component. This class encapsulates common state and behavior useful for managing Kafka manual commits, providing access to essential Kafka consumer metadata such as topic partitions, offsets, and commit timeouts.
This class acts as a foundational building block for more specialized manual commit implementations, holding references to payload objects that carry the Camel Exchange context and Kafka record details. It facilitates interaction with Kafka's consumer API indirectly and standardizes access to commit-related information.
Detailed Explanation
Package
package org.apache.camel.component.kafka.consumer;
This class resides in the Kafka consumer package of the Apache Camel Kafka component, which integrates Kafka messaging within Apache Camel routes.
Import Statements
org.apache.camel.spi.StateRepository— Interface for state persistence, used here for offset management.org.apache.kafka.clients.consumer.Consumer— Kafka consumer API.org.apache.kafka.common.TopicPartition— Represents a topic-partition tuple in Kafka.
Class: DefaultKafkaManualCommit
public abstract class DefaultKafkaManualCommit implements KafkaManualCommit
Description
Abstract base class implementing the
KafkaManualCommitinterface.Provides protected references to payload objects containing the Camel Exchange and Kafka record details.
Supplies methods to access Kafka consumer metadata like topic name, partition, record offset, and commit timeout.
Contains deprecated methods retained for backward compatibility.
Fields
Field | Type | Description |
|---|---|---|
`protected final camelExchangePayload` | `KafkaManualCommitFactory.CamelExchangePayload` | Holds the Camel exchange context including consumer and thread info. |
`protected final kafkaRecordPayload` | `KafkaManualCommitFactory.KafkaRecordPayload` | Encapsulates Kafka record-specific metadata such as partition, offset, and timeout. |
Constructor
protected DefaultKafkaManualCommit(
KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload,
KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload)
Parameters:
camelExchangePayload: An instance containing Camel Exchange context and Kafka consumer references.kafkaRecordPayload: An instance encapsulating Kafka record metadata (topic partition, offset, commit timeout).
Purpose: Initializes the internal payload references to be used by other methods.
Methods
getConsumer()
@Deprecated(since = "3.15.0")
public Consumer<?, ?> getConsumer()
Description: Returns the Kafka consumer instance from the Camel exchange payload.
Deprecated: Use
getCamelExchangePayload()for accessing consumer.Returns:
Consumer<?, ?>— Kafka consumer instance.Usage Example:
Consumer<?, ?> consumer = manualCommit.getConsumer();
getTopicName()
public String getTopicName()
Description: Retrieves the Kafka topic name of the current record's partition.
Returns:
String— topic name.Usage Example:
String topic = manualCommit.getTopicName();
getThreadId()
public String getThreadId()
Description: Returns the ID of the thread processing the Kafka record.
Returns:
String— thread identifier.Usage Example:
String threadId = manualCommit.getThreadId();
getOffsetRepository()
@Deprecated
public StateRepository<String, String> getOffsetRepository()
Description: Returns the offset repository used for storing offsets externally, if configured.
Deprecated: Replaced by other offset management mechanisms.
Returns:
StateRepository<String, String>— offset storage abstraction.Usage Example:
StateRepository<String, String> repository = manualCommit.getOffsetRepository();
getPartition()
public TopicPartition getPartition()
Description: Fetches the Kafka
TopicPartitionassociated with the current record.Returns:
TopicPartition— topic and partition info.Usage Example:
TopicPartition partition = manualCommit.getPartition();
getRecordOffset()
public long getRecordOffset()
Description: Retrieves the offset of the Kafka record.
Returns:
long— offset value.Usage Example:
long offset = manualCommit.getRecordOffset();
getCommitTimeout()
public long getCommitTimeout()
Description: Returns the timeout value in milliseconds for committing the offset.
Returns:
long— commit timeout in ms.Usage Example:
long timeout = manualCommit.getCommitTimeout();
getCamelExchangePayload()
public KafkaManualCommitFactory.CamelExchangePayload getCamelExchangePayload()
Description: Provides access to the Camel Exchange payload object.
Returns:
CamelExchangePayload— contains consumer, thread ID, offset repository, and potentially other context info.Usage Example:
CamelExchangePayload exchangePayload = manualCommit.getCamelExchangePayload();
getKafkaRecordPayload()
public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload()
Description: Provides access to the Kafka Record payload containing partition, offset, and commit timeout.
Returns:
KafkaRecordPayload— Kafka record metadata.Usage Example:
KafkaRecordPayload recordPayload = manualCommit.getKafkaRecordPayload();
toString()
@Override
public String toString()
Description: Returns a string representation of the manual commit instance, including topic and offset.
Returns:
String— formatted summary.Example Output:
KafkaManualCommit[topic=my-topic, offset=1234]
Important Implementation Details
This class is abstract, designed to be extended by concrete implementations that define the actual commit logic.
It does not implement offset commit operations itself but manages data needed for committing.
The use of two inner payload objects (
CamelExchangePayloadandKafkaRecordPayload) encapsulates Camel and Kafka-specific data cleanly, promoting separation of concerns.Deprecated methods indicate a migration path in the API, encouraging users to access underlying data via payloads rather than direct consumer or repository references.
This class relies on Kafka's
TopicPartitionandConsumerAPIs but abstracts direct commit responsibilities to subclasses or other components.
Interaction with Other System Components
Part of the Apache Camel Kafka component — integrates Kafka with Camel routing and processing.
Collaborates with
KafkaManualCommitFactory, which provides the payload objects used here.Used in Kafka consumer processing threads to manage manual offset commits, allowing fine-grained control over committing offsets after message processing.
Interacts indirectly with Kafka's consumer API through payload objects.
Supports offset state persistence via the
StateRepositoryinterface, enabling external offset storage mechanisms.
Usage Example
public class MyKafkaManualCommit extends DefaultKafkaManualCommit {
public MyKafkaManualCommit(CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload) {
super(camelExchangePayload, kafkaRecordPayload);
}
public void commit() {
Consumer<?, ?> consumer = getCamelExchangePayload().consumer;
TopicPartition partition = getPartition();
long offset = getRecordOffset() + 1; // commit next offset
// commit synchronously
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
}
}
Class Diagram
classDiagram
class DefaultKafkaManualCommit {
-camelExchangePayload: KafkaManualCommitFactory.CamelExchangePayload
-kafkaRecordPayload: KafkaManualCommitFactory.KafkaRecordPayload
+DefaultKafkaManualCommit(CamelExchangePayload, KafkaRecordPayload)
+Consumer<?, ?> getConsumer() <<deprecated>>
+String getTopicName()
+String getThreadId()
+StateRepository<String, String> getOffsetRepository() <<deprecated>>
+TopicPartition getPartition()
+long getRecordOffset()
+long getCommitTimeout()
+CamelExchangePayload getCamelExchangePayload()
+KafkaRecordPayload getKafkaRecordPayload()
+String toString()
}
Summary
`DefaultKafkaManualCommit.java` is an essential abstract class in Apache Camel's Kafka consumer integration, encapsulating metadata and context required for manual offset commits. It manages Kafka record details, consumer and thread info, and supports backward compatibility while guiding users toward modern API usage. Its design promotes extensibility and clean separation between Camel and Kafka-specific payload data, serving as a reliable base for concrete manual commit implementations.