ManualCommit.java

Overview

The [ManualCommit.java](/projects/289/68612) file is part of the Apache Camel Kafka component and provides a **manual offset commit processor** for Kafka consumer messages within Camel routes. It lets a route commit Kafka consumer offsets explicitly, at a point the developer chooses, instead of relying on Kafka's default automatic offset commits. Committing only after processing succeeds gives application-level control over offset management and supports at-least-once delivery. On its own it does not provide exactly-once semantics, because a failure between processing and commit causes the record to be redelivered.

This file defines the `ManualCommit` class, which implements the Camel `Processor` interface. When invoked, it attempts to retrieve a `KafkaManualCommit` instance from the current message headers and calls its `commit()` method to perform the offset commit.


Class: ManualCommit

```java
public class ManualCommit implements Processor
```

Description

`ManualCommit` is a Camel `Processor` that triggers the Kafka manual commit operation on the current exchange. It looks up the commit object from the exchange message headers and calls its `commit()` method to commit offsets explicitly. This processor can be inserted anywhere in a Camel Kafka consumer route where manual commit is required.
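The lookup-then-commit behavior described above can be sketched without any Camel dependency. This is a minimal illustration, not the actual source of `ManualCommit`: the `KafkaManualCommit` interface here is a hypothetical stand-in for Camel's type, a plain `Map` stands in for the message headers, the header name is assumed to mirror `KafkaConstants.MANUAL_COMMIT`, and treating a missing header as a no-op is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class ManualCommitSketch {

    // Assumed to mirror KafkaConstants.MANUAL_COMMIT in the Camel Kafka component.
    static final String MANUAL_COMMIT_HEADER = "CamelKafkaManualCommit";

    // Hypothetical stand-in for Camel's KafkaManualCommit interface.
    interface KafkaManualCommit {
        void commit();
    }

    // Stand-in for process(Exchange): look up the commit handle in the
    // headers and call commit() if one is present.
    // Returns true when a commit was performed.
    static boolean process(Map<String, Object> headers) {
        Object value = headers.get(MANUAL_COMMIT_HEADER);
        if (value instanceof KafkaManualCommit) {
            ((KafkaManualCommit) value).commit();
            return true;
        }
        return false; // assumption: no header means there is nothing to commit
    }

    public static void main(String[] args) {
        int[] commits = {0};
        Map<String, Object> headers = new HashMap<>();

        // No header yet, so nothing is committed.
        System.out.println(process(headers));

        // Attach a commit handle that simply counts its invocations.
        headers.put(MANUAL_COMMIT_HEADER, (KafkaManualCommit) () -> commits[0]++);
        System.out.println(process(headers));
        System.out.println("commits=" + commits[0]);
    }
}
```

Keeping the processor stateless, with all commit state carried by the header object, is what allows the same processor instance to be reused across exchanges.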

Fields

| Field | Type | Description |
| --- | --- | --- |
| `LOG` | `Logger` | SLF4J logger instance used for debug logging. |

Methods

void process(Exchange exchange) throws Exception

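```java
// Direct invocation; `exchange` is an existing org.apache.camel.Exchange
// whose message carries the KafkaManualCommit header.
ManualCommit manualCommitProcessor = new ManualCommit();
manualCommitProcessor.process(exchange);
```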

In a Camel route, this processor can be used as:

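```java
// allowManualCommit=true tells the consumer to attach a KafkaManualCommit header
from("kafka:my-topic?autoCommitEnable=false&allowManualCommit=true")
  .process(new MyBusinessLogicProcessor())
  .process(new ManualCommit());
```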

This ensures that offsets are committed only after the business logic processor successfully completes.


Important Implementation Details


Interaction with Other Components


Example Usage Scenario

  1. A Kafka consumer polls messages with auto-commit disabled.

  2. For each consumed record, a `KafkaManualCommit` instance is created and attached to the exchange as a message header.

  3. The Camel route processes the message and, once processing succeeds, invokes the `ManualCommit` processor.

  4. The processor retrieves the commit instance and calls `commit()`, which commits the offset back to Kafka.

  5. As a result, offsets are committed only after a message has been processed successfully.
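The steps above can be simulated in plain Java to show what happens when processing fails. This is a sketch under stated assumptions: `CommitAfterProcessingSketch`, its `poll` method, and the in-memory list standing in for a topic partition are all hypothetical and do not use the Kafka or Camel APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class CommitAfterProcessingSketch {

    // Hypothetical stand-in for the committed offset of one partition.
    private int committedOffset = 0;

    int committedOffset() {
        return committedOffset;
    }

    /**
     * Delivers records starting at the committed offset. After each record
     * the business logic accepts, the next offset is committed (steps 3-4).
     * Delivery stops at the first failure, so that record stays uncommitted.
     * Returns the offsets that were handed to the business logic.
     */
    List<Integer> poll(List<String> partition, Predicate<String> businessLogic) {
        List<Integer> delivered = new ArrayList<>();
        for (int offset = committedOffset; offset < partition.size(); offset++) {
            delivered.add(offset);
            if (!businessLogic.test(partition.get(offset))) {
                break; // processing failed: the commit step is never reached
            }
            committedOffset = offset + 1; // commit points at the next record to read
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c");
        CommitAfterProcessingSketch consumer = new CommitAfterProcessingSketch();

        // First run: the business logic rejects "b" at offset 1.
        System.out.println(consumer.poll(partition, r -> !r.equals("b"))
                + " committed=" + consumer.committedOffset()); // [0, 1] committed=1

        // Second run, as after a restart: consumption resumes at offset 1.
        System.out.println(consumer.poll(partition, r -> true)
                + " committed=" + consumer.committedOffset()); // [1, 2] committed=3
    }
}
```

The record at offset 1 is delivered in both runs, so business logic placed before the commit step should tolerate duplicates.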


Mermaid Class Diagram

```mermaid
classDiagram
    class Processor {
        <<interface>>
    }

    class ManualCommit {
        -LOG: Logger
        +process(exchange: Exchange) void
    }

    class Exchange {
        +getMessage() Message
    }

    class Message {
        +getHeader(String, Class) Object
    }

    class KafkaManualCommit {
        +commit() void
    }

    Processor <|.. ManualCommit : implements
    ManualCommit --> Exchange : uses
    Exchange --> Message : provides
    Message --> KafkaManualCommit : retrieves header
    ManualCommit --> KafkaManualCommit : invokes commit()
```

Summary

[ManualCommit.java](/projects/289/68612) is a small integration piece within Apache Camel's Kafka component that enables explicit, manual offset commits during message processing. By implementing the `Processor` interface, it fits into any Camel route and relies on the header-based propagation of `KafkaManualCommit` instances to perform the commit. Because the route decides when the commit happens, offsets can be committed only after processing has succeeded rather than on Kafka's automatic commit schedule, which gives at-least-once delivery. Exactly-once or transactional guarantees require additional mechanisms beyond this processor.