Manual Offset Commit Support

This module provides explicit control over Kafka consumer offset commits within Apache Camel routes. It enables developers to manage when and how Kafka offsets are committed, allowing fine-grained handling of message acknowledgment beyond automatic commit modes. This capability is essential for scenarios requiring precise processing guarantees, such as exactly-once processing, transactional workflows, or manual error handling.


Purpose and Core Concepts

By default, Kafka consumers may commit offsets automatically at intervals or synchronously after message processing. However, these default behaviors may not fit all use cases, especially when message processing involves complex logic, external systems, or requires transactional guarantees.

**Manual Offset Commit Support** introduces an API and implementation pattern whereby offset commits can be triggered explicitly by Camel routes during message processing. This approach allows:


How Manual Commit Support Works

The manual commit mechanism revolves around the **`KafkaManualCommit`** interface and factories that produce commit instances attached to exchanges. These commits encapsulate the necessary Kafka consumer context and offset information, enabling commit operations to be invoked programmatically.

Key Components

  1. KafkaManualCommitFactory
    A factory interface responsible for creating KafkaManualCommit instances tied to a specific Camel Exchange and Kafka record metadata.

    • Holds nested payload classes to pass Camel and Kafka context:

      • CamelExchangePayload: Includes the Camel Exchange, the Kafka Consumer, thread identifier, and optional offset repository for external offset state.

      • KafkaRecordPayload: Contains Kafka TopicPartition, record offset, and commit timeout settings.

  2. Default Factories
    Two main implementations of KafkaManualCommitFactory provide different commit semantics:

    • DefaultKafkaManualCommitFactory: Creates synchronous commit instances (DefaultKafkaManualSyncCommit), which block until the offset commit completes.

    • DefaultKafkaManualAsyncCommitFactory: Creates asynchronous commit instances (DefaultKafkaManualAsyncCommit), which initiate commit requests without blocking.

  3. ManualCommit Processor
    A Camel Processor that can be inserted in routes to trigger the manual commit on the current exchange. It retrieves the KafkaManualCommit instance from the message header and calls its commit() method.


Workflow and Usage

The manual commit support is designed to be integrated into Camel Kafka consumer routes as follows:

  1. During Message Consumption
    When a Kafka consumer fetches records, for each record or batch, the system creates a KafkaManualCommit instance using a configured factory. This instance carries the relevant state for committing the offset of that record.

  2. Attaching to Exchange
    The KafkaManualCommit object is attached to the Camel Exchange as a message header, commonly under the key KafkaConstants.MANUAL_COMMIT.

  3. Route Processing
    The route processes the message exchange as usual. At the point where the application determines that it is safe to commit offsets (e.g., after successful processing or external transaction commit), it invokes the ManualCommit processor.

  4. Committing Offsets
    The ManualCommit processor retrieves the commit instance from the exchange header and calls its commit() method, which executes the commit logic either synchronously or asynchronously depending on the factory used.


Example Snippet: Creating a Manual Commit Instance

KafkaManualCommitFactory factory = ...; // injected or configured factory
KafkaManualCommitFactory.CamelExchangePayload camelPayload =
    new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);
KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
    new KafkaManualCommitFactory.KafkaRecordPayload(partition, offset, timeout);

KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);
exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);

Example Snippet: Manual Commit Processor Usage

public void process(Exchange exchange) throws Exception {
    KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    if (manual != null) {
        manual.commit();  // perform the offset commit
    }
}

Interaction with Other Modules


Design Patterns and Concepts


Mermaid Diagram: Manual Offset Commit Workflow

sequenceDiagram
    participant KafkaConsumer as Kafka Consumer
    participant Factory as KafkaManualCommitFactory
    participant CamelExchange as Camel Exchange
    participant RouteProcessor as Route Processor
    participant ManualCommitProc as ManualCommit Processor
    participant CommitManager as Commit Manager

    KafkaConsumer->>Factory: Create KafkaManualCommit instance
    Factory->>CamelExchange: Attach commit instance to message header
    CamelExchange->>RouteProcessor: Deliver exchange with manual commit header
    RouteProcessor->>ManualCommitProc: Invoke ManualCommit processor
    ManualCommitProc->>CamelExchange: Retrieve manual commit instance
    ManualCommitProc->>CommitManager: Call commit()
    CommitManager-->>KafkaConsumer: Commit offsets to Kafka or offset repository

This manual offset commit support module empowers Camel Kafka consumers with explicit and configurable control over offset commits, enabling robust message processing workflows tailored to diverse application requirements.