DefaultKafkaManualCommit.java


Overview

`DefaultKafkaManualCommit.java` defines an **abstract base class** for manual offset commits in the Apache Camel Kafka component. It holds the state that manual commit implementations share and exposes the Kafka consumer metadata they need: the topic partition, the record offset, and the commit timeout.

More specialized manual commit implementations build on this class. It holds references to two payload objects, one carrying the Camel Exchange context and one carrying the Kafka record details, and exposes their contents through getters so that subclasses read commit-related information in a uniform way. Access to Kafka's consumer API is indirect, through the exchange payload.


Detailed Explanation

Package

package org.apache.camel.component.kafka.consumer;

This class resides in the Kafka consumer package of the Apache Camel Kafka component, which integrates Kafka messaging within Apache Camel routes.

Import Statements


Class: DefaultKafkaManualCommit

public abstract class DefaultKafkaManualCommit implements KafkaManualCommit

Description


Fields

| Field | Type | Description |
| --- | --- | --- |
| `protected final camelExchangePayload` | `KafkaManualCommitFactory.CamelExchangePayload` | Holds the Camel exchange context, including consumer and thread info. |
| `protected final kafkaRecordPayload` | `KafkaManualCommitFactory.KafkaRecordPayload` | Encapsulates Kafka record-specific metadata such as partition, offset, and timeout. |


Constructor

protected DefaultKafkaManualCommit(
    KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload,
    KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload)

Methods

getConsumer()

@Deprecated(since = "3.15.0")
public Consumer<?, ?> getConsumer()

getTopicName()

public String getTopicName()

getThreadId()

public String getThreadId()

getOffsetRepository()

@Deprecated
public StateRepository<String, String> getOffsetRepository()

getPartition()

public TopicPartition getPartition()

getRecordOffset()

public long getRecordOffset()

getCommitTimeout()

public long getCommitTimeout()

getCamelExchangePayload()

public KafkaManualCommitFactory.CamelExchangePayload getCamelExchangePayload()

getKafkaRecordPayload()

public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload()

toString()

@Override
public String toString()
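
Taken together, the fields, constructor, and accessors listed above amount to a small holder pattern: two immutable payloads are captured once and read through getters, while the commit itself is left to subclasses. The following is a minimal sketch of that pattern only, not the Camel source. `ExchangePayloadSketch`, `RecordPayloadSketch`, and `ManualCommitSketch` are hypothetical stand-ins, their field names other than `consumer` are assumptions, and plain `Object`/`String` replace the Kafka `Consumer` and `TopicPartition` types so the sketch compiles without dependencies.

```java
// Hypothetical stand-in for KafkaManualCommitFactory.CamelExchangePayload.
final class ExchangePayloadSketch {
    public final Object consumer;   // stand-in for the Kafka Consumer<?, ?>
    public final String threadId;   // assumed field name

    ExchangePayloadSketch(Object consumer, String threadId) {
        this.consumer = consumer;
        this.threadId = threadId;
    }
}

// Hypothetical stand-in for KafkaManualCommitFactory.KafkaRecordPayload.
final class RecordPayloadSketch {
    public final String partition;    // stand-in for TopicPartition
    public final long recordOffset;   // assumed field name
    public final long commitTimeout;  // assumed field name

    RecordPayloadSketch(String partition, long recordOffset, long commitTimeout) {
        this.partition = partition;
        this.recordOffset = recordOffset;
        this.commitTimeout = commitTimeout;
    }
}

// Sketch of the base class: it stores both payloads and delegates every getter to them.
abstract class ManualCommitSketch {
    protected final ExchangePayloadSketch camelExchangePayload;
    protected final RecordPayloadSketch kafkaRecordPayload;

    protected ManualCommitSketch(ExchangePayloadSketch camelExchangePayload,
                                 RecordPayloadSketch kafkaRecordPayload) {
        this.camelExchangePayload = camelExchangePayload;
        this.kafkaRecordPayload = kafkaRecordPayload;
    }

    // Deprecated shortcut kept for older callers; it only forwards to the payload.
    @Deprecated(since = "3.15.0")
    public Object getConsumer() {
        return camelExchangePayload.consumer;
    }

    public String getThreadId() {
        return camelExchangePayload.threadId;
    }

    public long getRecordOffset() {
        return kafkaRecordPayload.recordOffset;
    }

    public long getCommitTimeout() {
        return kafkaRecordPayload.commitTimeout;
    }

    public ExchangePayloadSketch getCamelExchangePayload() {
        return camelExchangePayload;
    }

    public RecordPayloadSketch getKafkaRecordPayload() {
        return kafkaRecordPayload;
    }

    // Subclasses decide how (and whether synchronously) the offset is committed.
    public abstract void commit();
}
```

Because the base class never commits anything itself, a subclass in this sketch only has to implement `commit()` and can read everything else through the inherited getters.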

Important Implementation Details


Interaction with Other System Components


Usage Example

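import java.util.Collections;

// Package names assume the camel-kafka consumer package described above.
import org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommit;
import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.CamelExchangePayload;
import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.KafkaRecordPayload;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MyKafkaManualCommit extends DefaultKafkaManualCommit {

    public MyKafkaManualCommit(CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload) {
        super(camelExchangePayload, kafkaRecordPayload);
    }

    @Override
    public void commit() {
        // Read the consumer from the payload rather than the deprecated getConsumer()
        Consumer<?, ?> consumer = getCamelExchangePayload().consumer;
        TopicPartition partition = getPartition();
        long offset = getRecordOffset() + 1; // commit the offset of the next record to read

        // Commit synchronously; this blocks until the broker acknowledges or the call fails
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
    }
}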
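
The `getRecordOffset() + 1` in the example above reflects how Kafka interprets a committed offset: it is the position of the next record to read, not the last record processed. The sketch below illustrates that arithmetic with an in-memory map. `OffsetLedgerSketch` and its methods are hypothetical and are not part of Kafka or Camel, and resuming at 0 when nothing was committed is a simplification (a real consumer follows its offset reset policy).

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a committed-offset store; not a Kafka API.
final class OffsetLedgerSketch {
    private final Map<String, Long> committed = new HashMap<>();

    // Call after the record at recordOffset has been fully processed.
    public void commitProcessed(String partitionKey, long recordOffset) {
        // The stored value is the NEXT offset to read, hence the +1.
        committed.put(partitionKey, recordOffset + 1);
    }

    // Offset at which a restarted consumer would resume in this sketch.
    public long resumePosition(String partitionKey) {
        // Simplification: start from 0 when no commit exists for the partition.
        return committed.getOrDefault(partitionKey, 0L);
    }
}
```

Storing `recordOffset` without the increment would make a restarted consumer read the last processed record a second time.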

Class Diagram

classDiagram
    class DefaultKafkaManualCommit {
        #camelExchangePayload: KafkaManualCommitFactory.CamelExchangePayload
        #kafkaRecordPayload: KafkaManualCommitFactory.KafkaRecordPayload
        #DefaultKafkaManualCommit(CamelExchangePayload, KafkaRecordPayload)
        +Consumer<?, ?> getConsumer() <<deprecated>>
        +String getTopicName()
        +String getThreadId()
        +StateRepository<String, String> getOffsetRepository() <<deprecated>>
        +TopicPartition getPartition()
        +long getRecordOffset()
        +long getCommitTimeout()
        +CamelExchangePayload getCamelExchangePayload()
        +KafkaRecordPayload getKafkaRecordPayload()
        +String toString()
    }

Summary

`DefaultKafkaManualCommit.java` is the abstract base class for manual offset commits in Apache Camel's Kafka consumer integration. It holds the Kafka record details together with the consumer and thread information, and it keeps the deprecated `getConsumer()` and `getOffsetRepository()` accessors for backward compatibility while the payload getters offer the newer access path. Keeping the Camel exchange payload separate from the Kafka record payload gives concrete manual commit implementations a small, uniform base to extend.