DefaultKafkaManualCommitFactory.java


Overview

[DefaultKafkaManualCommitFactory.java](/projects/289/68569) is a core component of the Apache Camel Kafka Manual Offset Commit Support module, responsible for creating **synchronous manual commit** instances for Kafka consumer offsets. It implements the `KafkaManualCommitFactory` interface and produces instances of `DefaultKafkaManualSyncCommit` that encapsulate the logic for synchronously committing Kafka offsets within Camel routes.

This factory enables fine-grained, explicit control over when Kafka offsets are committed by the route, ensuring that the offset commit operation blocks until completion. This is critical in processing scenarios that require strict ordering, exactly-once semantics, or transactional guarantees where the route must confirm offset persistence before proceeding.


Class Summary

DefaultKafkaManualCommitFactory

public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
    @Override
    public KafkaManualCommit newInstance(
        CamelExchangePayload camelExchangePayload,
        KafkaRecordPayload kafkaRecordPayload,
        CommitManager commitManager) {
        
        return new DefaultKafkaManualSyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
    }
}

Description

Method Details

newInstance
KafkaManualCommit newInstance(
    CamelExchangePayload camelExchangePayload,
    KafkaRecordPayload kafkaRecordPayload,
    CommitManager commitManager)

Important Implementation Details


Interaction with Other Components


Usage Example

// Assume these objects are available from the Kafka consumer context and Camel route
KafkaManualCommitFactory factory = new DefaultKafkaManualCommitFactory();

KafkaManualCommitFactory.CamelExchangePayload camelPayload =
    new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);

KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
    new KafkaManualCommitFactory.KafkaRecordPayload(partition, offset, timeout);

CommitManager commitManager = ...; // injected or constructed

KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);

// Attach the manual commit instance to the Camel exchange for later use
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);

In a Camel route processor, the manual commit can be triggered as follows:

public void process(Exchange exchange) throws Exception {
    KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    if (manual != null) {
        manual.commit();  // Synchronously commit the offset
    }
}

Mermaid Class Diagram

classDiagram
    class DefaultKafkaManualCommitFactory {
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }

    class KafkaManualCommitFactory {
        <<interface>>
        +newInstance(camelExchangePayload: CamelExchangePayload, kafkaRecordPayload: KafkaRecordPayload, commitManager: CommitManager) KafkaManualCommit
    }

    class DefaultKafkaManualSyncCommit {
        +commit()
        -camelExchangePayload: CamelExchangePayload
        -kafkaRecordPayload: KafkaRecordPayload
        -commitManager: CommitManager
    }

    DefaultKafkaManualCommitFactory ..|> KafkaManualCommitFactory
    DefaultKafkaManualCommitFactory --> DefaultKafkaManualSyncCommit : creates

Summary

This component is essential for use cases requiring strong processing guarantees and immediate offset commit acknowledgment, enabling developers to build robust Kafka consumer integrations with explicit offset management.


Additional Context: Manual Offset Commit Workflow

The following sequence diagram illustrates the interaction of this factory within the broader manual commit workflow:

sequenceDiagram
    participant KafkaConsumer as Kafka Consumer
    participant Factory as DefaultKafkaManualCommitFactory
    participant CamelExchange as Camel Exchange
    participant RouteProcessor as Route Processor
    participant ManualCommitProc as ManualCommit Processor
    participant CommitManager as Commit Manager

    KafkaConsumer->>Factory: newInstance(camelPayload, kafkaPayload, commitManager)
    Factory->>CamelExchange: Attach DefaultKafkaManualSyncCommit instance
    CamelExchange->>RouteProcessor: Deliver exchange with manual commit header
    RouteProcessor->>ManualCommitProc: Invoke ManualCommit processor
    ManualCommitProc->>CamelExchange: Retrieve manual commit instance
    ManualCommitProc->>CommitManager: commit() (synchronous)
    CommitManager-->>KafkaConsumer: Commit offsets synchronously

This depicts the lifecycle from record consumption, manual commit instance creation, through to synchronous offset committing triggered by the route processor.


References


This documentation aims to provide a thorough understanding of the role and usage of [DefaultKafkaManualCommitFactory.java](/projects/289/68569) within the Apache Camel Kafka manual offset commit architecture.