AbstractCommitManager.java


Overview

`AbstractCommitManager` is an **abstract base class** within the Apache Camel Kafka component responsible for managing Kafka consumer offset commits. It provides foundational utilities, state, and common commit logic used by concrete commit manager implementations that handle different offset commit strategies such as synchronous, asynchronous, manual, or no-operation commits.

This class encapsulates the interaction with the underlying Kafka consumer, the Camel Kafka consumer endpoint configuration, logging facilities, and optional external offset repositories. It also supports generating manual commit instances which enable explicit offset committing from within Camel routes.

By abstracting common commit-related operations, `AbstractCommitManager` simplifies the implementation of various commit strategies, thus ensuring reliable offset management and flexible commit workflows in Kafka consumers.


Detailed Class Documentation

public abstract class AbstractCommitManager implements CommitManager

Package

`org.apache.camel.component.kafka.consumer`

Purpose


Fields

Field Name

Type

Description

`START_OFFSET`

`long` (static final)

Constant representing the start offset value `-1`.

`NON_PARTITION`

`long` (static final)

Constant representing a non-partition value `-1`.

`LOG`

`Logger` (static final)

Logger instance for logging within the class.

`kafkaConsumer`

`KafkaConsumer` (protected)

Reference to the Camel Kafka consumer instance.

`threadId`

`String` (protected)

Identifier for the thread running the consumer.

`printableTopic`

`String` (protected)

Human-readable topic name used for logging and diagnostics.

`configuration`

`KafkaConfiguration` (protected)

Kafka consumer endpoint configuration.

`consumer`

`Consumer` (private)

The underlying Kafka client consumer instance.


Constructor

protected AbstractCommitManager(
    Consumer<?, ?> consumer,
    KafkaConsumer kafkaConsumer,
    String threadId,
    String printableTopic)

Methods

getManualCommit

protected KafkaManualCommit getManualCommit(
    Exchange exchange,
    TopicPartition partition,
    ConsumerRecord<Object, Object> record,
    KafkaManualCommitFactory manualCommitFactory)
KafkaManualCommit manualCommit = getManualCommit(exchange, partition, record, manualCommitFactory);
manualCommit.commit();  // Explicitly commit the offset when ready

getManualCommit (Override)

@Override
public KafkaManualCommit getManualCommit(
    Exchange exchange,
    TopicPartition partition,
    ConsumerRecord<Object, Object> consumerRecord)

forceCommit

@Override
public void forceCommit(TopicPartition partition, long partitionLastOffset)
forceCommit(new TopicPartition("my-topic", 0), 100L);

saveStateToOffsetRepository

protected void saveStateToOffsetRepository(
    TopicPartition partition,
    long partitionLastOffset,
    StateRepository<String, String> offsetRepository)

serializeOffsetKey

protected static String serializeOffsetKey(TopicPartition topicPartition)
serializeOffsetKey(new TopicPartition("my-topic", 5));  // returns "my-topic/5"

serializeOffsetValue

protected static String serializeOffsetValue(long offset)

Important Implementation Details


Interaction with Other Components


Usage Context

`AbstractCommitManager` is not instantiated directly but extended by concrete commit manager implementations such as:

Each subclass builds upon this base to implement their commit strategy details.


Mermaid Class Diagram

classDiagram
    class AbstractCommitManager {
        <<abstract>>
        - static final long START_OFFSET = -1
        - static final long NON_PARTITION = -1
        - static final Logger LOG
        # KafkaConsumer kafkaConsumer
        # String threadId
        # String printableTopic
        # KafkaConfiguration configuration
        - Consumer<?, ?> consumer
        + AbstractCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
        # KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord<Object, Object>, KafkaManualCommitFactory)
        + KafkaManualCommit getManualCommit(Exchange, TopicPartition, ConsumerRecord<Object, Object>)
        + void forceCommit(TopicPartition, long)
        # void saveStateToOffsetRepository(TopicPartition, long, StateRepository<String, String>)
        # static String serializeOffsetKey(TopicPartition)
        # static String serializeOffsetValue(long)
    }
    
    AbstractCommitManager <|-- SyncCommitManager
    AbstractCommitManager <|-- AsyncCommitManager
    AbstractCommitManager <|-- NoopCommitManager
    AbstractCommitManager <|-- CommitToOffsetManager

Summary

`AbstractCommitManager` provides essential infrastructure for Kafka offset commit management in Apache Camel's Kafka component. It abstracts common logic such as synchronous commit forcing, manual commit creation, offset serialization, and integration with external offset repositories. Through this abstraction, various commit strategies can be implemented in subclasses while sharing consistent behavior and configuration handling.

This design promotes modularity, extensibility, and reliability in offset management to support different use cases and delivery semantics in Kafka consumer applications built with Apache Camel.


End of Documentation for AbstractCommitManager.java