KafkaManualCommitFactory.java

Overview

`KafkaManualCommitFactory.java` defines a factory interface for creating instances of `KafkaManualCommit`—objects responsible for managing manual offset commits in Kafka consumers within Apache Camel routes. This interface is central to the manual commit support mechanism that allows developers to explicitly control when Kafka consumer offsets are committed, rather than relying on Kafka’s automatic commit behavior.

This file provides:

By abstracting the creation of commit instances, this interface enables multiple commit strategies (e.g., synchronous and asynchronous commits) to be plugged in transparently, enhancing flexibility and control over Kafka offset commits in Camel routes.


Detailed Explanation

Interface: KafkaManualCommitFactory

public interface KafkaManualCommitFactory {
    KafkaManualCommit newInstance(
        CamelExchangePayload camelExchangePayload,
        KafkaRecordPayload kafkaRecordPayload,
        CommitManager commitManager);
}

Nested Class: CamelExchangePayload

class CamelExchangePayload {
    public final Exchange exchange;
    public final Consumer<?, ?> consumer;
    public final String threadId;
    public final StateRepository<String, String> offsetRepository;

    public CamelExchangePayload(Exchange exchange, Consumer<?, ?> consumer, String threadId,
                               StateRepository<String, String> offsetRepository) {
        this.exchange = exchange;
        this.consumer = consumer;
        this.threadId = threadId;
        this.offsetRepository = offsetRepository;
    }
}

Nested Class: KafkaRecordPayload

class KafkaRecordPayload {
    public final TopicPartition partition;
    public final long recordOffset;
    public final long commitTimeout;

    public KafkaRecordPayload(TopicPartition partition, long recordOffset, long commitTimeout) {
        this.partition = partition;
        this.recordOffset = recordOffset;
        this.commitTimeout = commitTimeout;
    }
}

Important Implementation Details


Interaction with Other System Components


Usage Example

// Create Camel exchange payload with context
KafkaManualCommitFactory.CamelExchangePayload camelPayload =
    new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);

// Create Kafka record payload with record info
KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
    new KafkaManualCommitFactory.KafkaRecordPayload(partition, recordOffset, commitTimeout);

// Create commit manager instance (implementation dependent)
CommitManager commitManager = ...;

// Use a factory implementation to create a manual commit instance
KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);

// Attach to exchange header for later commit invocation
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);

Visual Diagram: Class Structure of KafkaManualCommitFactory.java

classDiagram
    interface KafkaManualCommitFactory {
        +newInstance(CamelExchangePayload, KafkaRecordPayload, CommitManager) KafkaManualCommit
    }

    class CamelExchangePayload {
        +Exchange exchange
        +Consumer<?, ?> consumer
        +String threadId
        +StateRepository<String,String> offsetRepository
        +CamelExchangePayload(exchange, consumer, threadId, offsetRepository)
    }

    class KafkaRecordPayload {
        +TopicPartition partition
        +long recordOffset
        +long commitTimeout
        +KafkaRecordPayload(partition, recordOffset, commitTimeout)
    }

    KafkaManualCommitFactory o-- CamelExchangePayload : uses
    KafkaManualCommitFactory o-- KafkaRecordPayload : uses

Summary

`KafkaManualCommitFactory.java` is a key abstraction in Apache Camel’s Kafka component that facilitates explicit, manual control over Kafka offset commits. By defining a factory interface and associated payload classes, it enables flexible and extensible creation of commit instances that can be tailored to different commit strategies and application requirements. This design plays a crucial role in supporting advanced messaging patterns such as transactional processing and exactly-once semantics within Camel routes consuming from Kafka.