ManualCommit.java
Overview
The [ManualCommit.java](/projects/289/68612) file is part of the Apache Camel Kafka component and provides a **manual offset commit processor** for Kafka consumer messages within Camel routes. Its primary purpose is to give a route explicit control over when Kafka consumer offsets are committed, by invoking the commit operation programmatically during route processing. This contrasts with Kafka's default automatic offset commit behavior and lets developers manage offsets at the application level, typically by committing only after a message has been processed successfully. Committing after processing gives at-least-once delivery; exactly-once behavior additionally requires idempotent processing or transactions.
This file defines the `ManualCommit` class, which implements the Camel `Processor` interface. When invoked, it attempts to retrieve a `KafkaManualCommit` instance from the current message headers and calls its `commit()` method to perform the offset commit.
Class: ManualCommit
public class ManualCommit implements Processor
Description
`ManualCommit` is a Camel `Processor` that triggers the Kafka manual commit operation on the current exchange. It looks up the commit object from the exchange message headers and calls its `commit()` method to commit offsets explicitly. This processor can be inserted anywhere in a Camel Kafka consumer route where manual commit is required.
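The lookup-then-commit behavior described above can be sketched without any Camel dependency. This is a minimal illustration, not the actual source: `CommitHandle` and `HeaderMessage` are stand-ins for `KafkaManualCommit` and the Camel message, the header key is a hypothetical placeholder, and the `boolean` return value exists only to make the outcome observable (the real `process` method returns `void`).

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for KafkaManualCommit: only the commit() call matters here.
interface CommitHandle {
    void commit();
}

// Stand-in for a Camel message: a bag of headers with a typed lookup.
class HeaderMessage {
    private final Map<String, Object> headers = new HashMap<>();

    void setHeader(String name, Object value) {
        headers.put(name, value);
    }

    <T> T getHeader(String name, Class<T> type) {
        Object value = headers.get(name);
        return type.isInstance(value) ? type.cast(value) : null;
    }
}

class ManualCommitSketch {
    // Hypothetical key; the real processor uses KafkaConstants.MANUAL_COMMIT.
    static final String MANUAL_COMMIT = "manualCommitHandle";

    // Returns true if a commit handle was found and invoked, false otherwise.
    static boolean process(HeaderMessage message) {
        CommitHandle handle = message.getHeader(MANUAL_COMMIT, CommitHandle.class);
        if (handle == null) {
            // The real processor logs at debug level here instead of printing.
            System.out.println("No commit handle in headers; skipping commit");
            return false;
        }
        handle.commit();
        return true;
    }

    public static void main(String[] args) {
        HeaderMessage withHandle = new HeaderMessage();
        withHandle.setHeader(MANUAL_COMMIT,
                (CommitHandle) () -> System.out.println("offset committed"));
        System.out.println(process(withHandle));          // handle found and invoked
        System.out.println(process(new HeaderMessage())); // header missing, nothing committed
    }
}
```

A missing header is treated as a no-op rather than an error, which matches the description of the processor logging a debug message and moving on.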
Fields
| Field | Type | Description |
|---|---|---|
| `LOG` | `Logger` | Logger instance for debug logging. Uses the SLF4J framework. |
Methods
void process(Exchange exchange) throws Exception
Description:
Processes the given Camel `Exchange` by retrieving the `KafkaManualCommit` instance from the message header and invoking its `commit()` method. If the header is missing, it logs a debug message indicating that the commit cannot be performed.
Parameters:
`exchange`: The Camel `Exchange` object representing the current message and processing context.
Throws:
`Exception`: if the underlying commit operation fails.
Usage Example:
ManualCommit manualCommitProcessor = new ManualCommit();
manualCommitProcessor.process(exchange);
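In a Camel route, this processor can be used as follows. The `allowManualCommit=true` endpoint option is needed so that the consumer attaches the `KafkaManualCommit` header to each exchange:
from("kafka:my-topic?autoCommitEnable=false&allowManualCommit=true")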
.process(new MyBusinessLogicProcessor())
.process(new ManualCommit());
This ensures that offsets are committed only after the business logic processor successfully completes.
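Why step ordering gives this guarantee can be shown with a small simulation. The sketch below does not use Camel; `Step` is a hypothetical stand-in for a processor, and `runRoute` mimics a route that stops at the first unhandled exception. Actual route behavior also depends on the configured error handler, which is outside the scope of this sketch.

```java
import java.util.List;

// Stand-in for a Camel Processor; not the real org.apache.camel.Processor.
interface Step {
    void run(StringBuilder log) throws Exception;
}

class CommitAfterSuccessSketch {
    // Runs the steps in order and stops at the first exception.
    // Returns true only if every step completed.
    static boolean runRoute(List<Step> steps, StringBuilder log) {
        try {
            for (Step step : steps) {
                step.run(log);
            }
            return true;
        } catch (Exception e) {
            log.append("failed;");
            return false;
        }
    }

    public static void main(String[] args) {
        Step business = log -> log.append("business;");
        Step failing = log -> { throw new IllegalStateException("boom"); };
        Step commit = log -> log.append("commit;");

        StringBuilder ok = new StringBuilder();
        runRoute(List.of(business, commit), ok);
        System.out.println(ok);   // business step ran, then the commit step

        StringBuilder bad = new StringBuilder();
        runRoute(List.of(failing, commit), bad);
        System.out.println(bad);  // the commit step was never reached
    }
}
```

Because the commit is the last step, a failure earlier in the chain leaves the offset uncommitted, so the record can be consumed again later.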
Important Implementation Details
Dependency on `KafkaManualCommit`:
The processor relies on the `KafkaManualCommit` instance being present in the message header with the key `KafkaConstants.MANUAL_COMMIT`. This instance encapsulates the Kafka consumer context and offset information necessary to perform a commit.
Logging:
Uses debug-level logging to indicate whether a manual commit is being performed or if the required header is missing.
No Commit Logic Inside:
The actual commit logic is delegated to the `KafkaManualCommit.commit()` method, which abstracts commit semantics (synchronous/asynchronous) and interaction with Kafka or external offset repositories.
Integration Point:
This processor is designed to be a simple, reusable component that can be added anywhere in the Camel route processing chain to trigger offset commits.
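The delegation point can be illustrated with a small sketch in which the caller only ever sees `commit()`. All names here are hypothetical: `OffsetCommit` stands in for `KafkaManualCommit`, and the two implementations are simplified placeholders for immediate and deferred commit strategies, not Camel's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for KafkaManualCommit; the caller sees nothing but commit().
interface OffsetCommit {
    void commit();
}

// Hypothetical variant that records the offset as committed right away.
class ImmediateCommit implements OffsetCommit {
    private final long offset;
    private final List<Long> committed;

    ImmediateCommit(long offset, List<Long> committed) {
        this.offset = offset;
        this.committed = committed;
    }

    @Override
    public void commit() {
        committed.add(offset);
    }
}

// Hypothetical variant that only queues the offset for a later flush.
class DeferredCommit implements OffsetCommit {
    private final long offset;
    private final List<Long> pending;

    DeferredCommit(long offset, List<Long> pending) {
        this.offset = offset;
        this.pending = pending;
    }

    @Override
    public void commit() {
        pending.add(offset);
    }
}

class CommitDelegationSketch {
    // Same call site for every variant, mirroring how the processor delegates.
    static void trigger(OffsetCommit handle) {
        handle.commit();
    }

    public static void main(String[] args) {
        List<Long> committed = new ArrayList<>();
        List<Long> pending = new ArrayList<>();
        trigger(new ImmediateCommit(41L, committed));
        trigger(new DeferredCommit(42L, pending));
        System.out.println("committed=" + committed + " pending=" + pending);
    }
}
```

Keeping the strategy behind the interface is what lets the processor stay a few lines long regardless of how the commit is eventually carried out.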
Interaction with Other Components
`KafkaManualCommit`:
The core commit interface whose instances are attached to exchanges and invoked by this processor to commit offsets.
Kafka Consumer Component:
Responsible for creating and attaching `KafkaManualCommit` instances to exchanges when manual commit mode is enabled.
Camel Routes:
The processor is used within Camel routes consuming from Kafka to explicitly control when offsets are committed.
Commit Managers and Offset Repositories:
Underlying commit implementations may interact with Kafka brokers or external stores through commit manager abstractions, but this file remains agnostic to those details.
Example Usage Scenario
1. A Kafka consumer polls messages with auto-commit disabled.
2. For each consumed record, a `KafkaManualCommit` instance is created and attached to the exchange header.
3. The Camel route processes the message and, once processing is successful, invokes the `ManualCommit` processor.
4. The processor retrieves the commit instance and calls `commit()`, which commits the offset back to Kafka.

This ensures offsets are only committed after a successful message processing cycle.
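The scenario can be walked through with a plain-Java simulation. This is a sketch under simplifying assumptions: records are strings whose list index serves as the offset, a `Runnable` stands in for the per-record `KafkaManualCommit`, and the `consume` method and its names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

class ManualCommitScenarioSketch {
    // Simulates consuming records with auto-commit disabled.
    // Returns the committed position, expressed as the offset of the next
    // record to read, which is how Kafka represents a committed offset.
    static long consume(List<String> records, Predicate<String> businessLogic) {
        long[] committed = {0L};
        for (int offset = 0; offset < records.size(); offset++) {
            long next = offset + 1L;
            // A commit handle is created for this record.
            Runnable commitHandle = () -> committed[0] = next;
            // Business logic runs first; on failure the commit is skipped.
            if (!businessLogic.test(records.get(offset))) {
                break;
            }
            // The handle is invoked only after successful processing.
            commitHandle.run();
        }
        return committed[0];
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "bad", "c");
        long position = consume(records, r -> !r.equals("bad"));
        System.out.println(position); // prints 2
    }
}
```

In this run the committed position stops at 2, so a restarted consumer would read the failed record again instead of skipping it.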
Mermaid Class Diagram
classDiagram
class Processor {
<<interface>>
+ process(exchange: Exchange): void
}
class ManualCommit {
- LOG: Logger
+ process(exchange: Exchange): void
}
Processor <|.. ManualCommit
class Exchange {
+ getMessage(): Message
}
class Message {
+ getHeader(String, Class): Object
}
class KafkaManualCommit {
+ commit(): void
}
ManualCommit --> Exchange : uses
Exchange --> Message : provides
Message --> KafkaManualCommit : retrieves header
ManualCommit --> KafkaManualCommit : invokes commit()
Summary
[ManualCommit.java](/projects/289/68612) is a small but important integration piece within Apache Camel's Kafka component that enables explicit, manual offset commits during message processing. By implementing the `Processor` interface, it fits into Camel routes like any other step and relies on the header-based propagation of `KafkaManualCommit` instances to perform offset management. Because the route, rather than Kafka's automatic commit, decides when offsets are committed, this design supports use cases that must not lose messages: an offset is committed only after its message has been processed.