AbstractCommitManager.java
Overview
`AbstractCommitManager` is an **abstract base class** within the Apache Camel Kafka component responsible for managing Kafka consumer offset commits. It provides foundational utilities, state, and common commit logic used by concrete commit manager implementations that handle different offset commit strategies such as synchronous, asynchronous, manual, or no-operation commits.
This class encapsulates the interaction with the underlying Kafka consumer, the Camel Kafka consumer endpoint configuration, logging facilities, and optional external offset repositories. It also supports generating manual commit instances which enable explicit offset committing from within Camel routes.
By abstracting common commit-related operations, `AbstractCommitManager` simplifies the implementation of various commit strategies, thus ensuring reliable offset management and flexible commit workflows in Kafka consumers.
Detailed Class Documentation
public abstract class AbstractCommitManager implements CommitManager
Package
`org.apache.camel.component.kafka.consumer`
Purpose
Serves as a base class for Kafka offset commit managers.
Implements common commit utilities and state management.
Bridges Camel Kafka consumer configuration and the Kafka client consumer.
Provides support for manual commit creation and offset repository integration.
Fields
Field Name | Type | Description |
|---|---|---|
`START_OFFSET` | `long` (static final) | Constant representing the start offset value `-1`. |
`NON_PARTITION` | `long` (static final) | Constant representing a non-partition value `-1`. |
`LOG` | `Logger` (static final) | Logger instance for logging within the class. |
`kafkaConsumer` | `KafkaConsumer` (protected) | Reference to the Camel Kafka consumer instance. |
`threadId` | `String` (protected) | Identifier for the thread running the consumer. |
`printableTopic` | `String` (protected) | Human-readable topic name used for logging and diagnostics. |
`configuration` | `KafkaConfiguration` (protected) | Kafka consumer endpoint configuration. |
`consumer` | `Consumer` (private) | The underlying Kafka client consumer instance. |
Constructor
protected AbstractCommitManager(
Consumer<?, ?> consumer,
KafkaConsumer kafkaConsumer,
String threadId,
String printableTopic)
Parameters:
consumer: The Kafka client consumer instance used to commit offsets.kafkaConsumer: The Camel Kafka consumer object.threadId: A unique identifier for the consumer thread.printableTopic: The topic name for logging/display purposes.
Description: Initializes the abstract commit manager with references to the Kafka consumer, Camel Kafka consumer, thread identifier, and topic for logging. Fetches the Kafka configuration from the endpoint.
Methods
getManualCommit
protected KafkaManualCommit getManualCommit(
Exchange exchange,
TopicPartition partition,
ConsumerRecord<Object, Object> record,
KafkaManualCommitFactory manualCommitFactory)
Parameters:
exchange: The CamelExchangeassociated with the Kafka record processing.partition: The Kafka topic partition of the consumed record.record: The Kafka consumer record.manualCommitFactory: A factory instance to createKafkaManualCommitobjects.
Returns:
KafkaManualCommitinstance configured with the exchange, partition, record offset, timeout, and commit manager.Description: Creates a
KafkaManualCommitobject that encapsulates manual commit logic for a specific record and exchange context. Uses the configured offset repository and commit timeout to prepare the manual commit.Usage Example:
KafkaManualCommit manualCommit = getManualCommit(exchange, partition, record, manualCommitFactory);
manualCommit.commit(); // Explicitly commit the offset when ready
getManualCommit (Override)
@Override
public KafkaManualCommit getManualCommit(
Exchange exchange,
TopicPartition partition,
ConsumerRecord<Object, Object> consumerRecord)
Parameters:
exchange: The CamelExchangeinstance.partition: Kafka topic partition.consumerRecord: Kafka consumer record.
Returns: A new
KafkaManualCommitinstance.Description: Convenience method that fetches the manual commit factory from the consumer endpoint configuration. If none is configured, it defaults to
DefaultKafkaManualCommitFactory. Delegates creation to the protectedgetManualCommitmethod.
forceCommit
@Override
public void forceCommit(TopicPartition partition, long partitionLastOffset)
Parameters:
partition: The Kafka topic partition to commit.partitionLastOffset: The last processed offset for the partition.
Returns:
voidDescription: Forces a synchronous commit of the specified partition at the given offset (+1) using the Kafka consumer's
commitSyncAPI. Applies the commit timeout configured inKafkaConfiguration. Logs debug information if enabled.Implementation Detail: Adds 1 to the offset before committing, as Kafka commits the offset of the next message to consume.
Usage Example:
forceCommit(new TopicPartition("my-topic", 0), 100L);
This commits offset
101for partition 0 of "my-topic".
saveStateToOffsetRepository
protected void saveStateToOffsetRepository(
TopicPartition partition,
long partitionLastOffset,
StateRepository<String, String> offsetRepository)
Parameters:
partition: Kafka topic partition.partitionLastOffset: Last processed offset to save.offsetRepository: External state repository for storing offsets.
Returns:
voidDescription: Saves the current offset for the given partition to an external
StateRepository. This provides durability of offset state beyond Kafka's internal offset storage.Logging: Debug logs the offset being saved.
Usage Detail: The method uses helper static methods to serialize the partition key and offset value to strings.
serializeOffsetKey
protected static String serializeOffsetKey(TopicPartition topicPartition)
Parameters:
topicPartition: The Kafka topic partition.
Returns: A string key in the format
"topicName/partitionNumber".Description: Converts a
TopicPartitioninto a string key suitable for offset storage.Example:
serializeOffsetKey(new TopicPartition("my-topic", 5)); // returns "my-topic/5"
serializeOffsetValue
protected static String serializeOffsetValue(long offset)
Parameters:
offset: The offset to serialize.
Returns: String representation of the offset.
Description: Converts the offset long value to string.
Important Implementation Details
The class holds a reference to the Kafka client consumer and the Camel Kafka consumer to coordinate offset commit operations.
It retrieves configuration details such as commit timeout and optional external offset repository from the consumer endpoint configuration.
The
forceCommitmethod performs a synchronous commit to Kafka, ensuring offset persistence with a timeout.The class supports manual commit creation, enabling Camel routes to explicitly commit offsets via the
KafkaManualCommitabstraction.It provides utilities to serialize offsets for external storage in
StateRepositorykeyed by topic and partition.Logging is performed using SLF4J to trace commit operations, especially useful in debugging commit behaviors.
Interaction with Other Components
KafkaConsumer: The
AbstractCommitManageris used by theKafkaConsumerclass to handle offset commits during message processing.KafkaManualCommitFactory: Utilized to create
KafkaManualCommitobjects that enable manual offset committing.Kafka Client Consumer (
Consumer<?, ?>): Directly interacts with Kafka'scommitSyncAPI for synchronous commits.StateRepository: Optionally persists offsets externally for enhanced fault tolerance.
Logging (SLF4J): Used for debugging and operational visibility of commit actions.
Usage Context
`AbstractCommitManager` is not instantiated directly but extended by concrete commit manager implementations such as:
SyncCommitManager: For synchronous blocking commits.AsyncCommitManager: For asynchronous commits with callbacks.NoopCommitManager: For no-op commits when auto-commit is enabled.CommitToOffsetManager: For commits persisted in external offset repositories.
Each subclass builds upon this base to implement their commit strategy details.
Mermaid Class Diagram
classDiagram
class AbstractCommitManager {
<<abstract>>
- static final long START_OFFSET = -1
- static final long NON_PARTITION = -1
- static final Logger LOG
# KafkaConsumer kafkaConsumer
# String threadId
# String printableTopic
# KafkaConfiguration configuration
- Consumer<?, ?> consumer
+ AbstractCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
# KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord<Object, Object>, KafkaManualCommitFactory)
+ KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord<Object, Object>)
+ void forceCommit(TopicPartition, long)
# void saveStateToOffsetRepository(TopicPartition, long, StateRepository<String, String>)
# static String serializeOffsetKey(TopicPartition)
# static String serializeOffsetValue(long)
}
AbstractCommitManager <|-- SyncCommitManager
AbstractCommitManager <|-- AsyncCommitManager
AbstractCommitManager <|-- NoopCommitManager
AbstractCommitManager <|-- CommitToOffsetManager
Summary
`AbstractCommitManager` provides essential infrastructure for Kafka offset commit management in Apache Camel's Kafka component. It abstracts common logic such as synchronous commit forcing, manual commit creation, offset serialization, and integration with external offset repositories. Through this abstraction, various commit strategies can be implemented in subclasses while sharing consistent behavior and configuration handling.
This design promotes modularity, extensibility, and reliability in offset management to support different use cases and delivery semantics in Kafka consumer applications built with Apache Camel.