Manual Offset Commit Support
This module provides explicit control over Kafka consumer offset commits within Apache Camel routes. It enables developers to manage when and how Kafka offsets are committed, allowing fine-grained handling of message acknowledgment beyond automatic commit modes. This capability is essential for scenarios requiring precise processing guarantees, such as exactly-once processing, transactional workflows, or manual error handling.
Purpose and Core Concepts
By default, Kafka consumers may commit offsets automatically at intervals or synchronously after message processing. However, these default behaviors may not fit all use cases, especially when message processing involves complex logic, external systems, or requires transactional guarantees.
**Manual Offset Commit Support** introduces an API and implementation pattern whereby offset commits can be triggered explicitly by Camel routes during message processing. This approach allows:
Deferring commits until after successful business logic execution.
Implementing custom commit strategies (e.g., synchronous or asynchronous commits).
Integrating with Camel's error handling and transaction mechanisms.
Reducing the risk of message loss or duplication by controlling commit timing.
How Manual Commit Support Works
The manual commit mechanism revolves around the **`KafkaManualCommit`** interface and factories that produce commit instances attached to exchanges. These commits encapsulate the necessary Kafka consumer context and offset information, enabling commit operations to be invoked programmatically.
Key Components
KafkaManualCommitFactory
A factory interface responsible for creating `KafkaManualCommit` instances tied to a specific Camel `Exchange` and Kafka record metadata. It holds nested payload classes that pass Camel and Kafka context:
`CamelExchangePayload`: Includes the Camel `Exchange`, the Kafka `Consumer`, the thread identifier, and an optional offset repository for external offset state.
`KafkaRecordPayload`: Contains the Kafka `TopicPartition`, the record offset, and the commit timeout setting.
Default Factories
Two main implementations of `KafkaManualCommitFactory` provide different commit semantics:
`DefaultKafkaManualCommitFactory`: Creates synchronous commit instances (`DefaultKafkaManualSyncCommit`), which block until the offset commit completes.
`DefaultKafkaManualAsyncCommitFactory`: Creates asynchronous commit instances (`DefaultKafkaManualAsyncCommit`), which initiate commit requests without blocking.
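The difference between the two commit styles can be illustrated with a small, self-contained model. This is a sketch only: `OffsetStoreSketch`, `CommitSketch`, `CommitFactorySketch`, `SyncFactorySketch`, and `AsyncFactorySketch` are hypothetical names invented for illustration and are not the Camel classes listed above. The in-memory store stands in for the Kafka broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the broker-side committed offset.
class OffsetStoreSketch {
    private final AtomicLong committed = new AtomicLong(-1L);

    void write(long offset) {
        committed.set(offset);
    }

    long committed() {
        return committed.get();
    }
}

// Hypothetical stand-in for a manual commit handle.
interface CommitSketch {
    void commit();
}

// Hypothetical stand-in for a commit factory.
interface CommitFactorySketch {
    CommitSketch newInstance(OffsetStoreSketch store, long offset);
}

// Synchronous flavor: commit() returns only after the offset is stored.
class SyncFactorySketch implements CommitFactorySketch {
    @Override
    public CommitSketch newInstance(OffsetStoreSketch store, long offset) {
        return () -> store.write(offset);
    }
}

// Asynchronous flavor: commit() hands the write to another thread and returns.
class AsyncFactorySketch implements CommitFactorySketch {
    // Exposed so callers can wait for the most recent commit if they need to.
    volatile CompletableFuture<Void> lastCommit = CompletableFuture.completedFuture(null);

    @Override
    public CommitSketch newInstance(OffsetStoreSketch store, long offset) {
        return () -> {
            lastCommit = CompletableFuture.runAsync(() -> store.write(offset));
        };
    }
}
```

In this model the calling code is identical for both flavors; only the factory chosen at setup time decides whether `commit()` blocks. With the asynchronous variant, a caller that needs confirmation has to wait on the returned future explicitly.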
ManualCommit Processor
A Camel `Processor` that can be inserted in routes to trigger the manual commit on the current exchange. It retrieves the `KafkaManualCommit` instance from the message header and calls its `commit()` method.
Workflow and Usage
The manual commit support is designed to be integrated into Camel Kafka consumer routes as follows:
During Message Consumption: When a Kafka consumer fetches records, for each record or batch, the system creates a `KafkaManualCommit` instance using a configured factory. This instance carries the relevant state for committing the offset of that record.
Attaching to Exchange: The `KafkaManualCommit` object is attached to the Camel `Exchange` as a message header, commonly under the key `KafkaConstants.MANUAL_COMMIT`.
Route Processing: The route processes the message exchange as usual. At the point where the application determines that it is safe to commit offsets (e.g., after successful processing or an external transaction commit), it invokes the `ManualCommit` processor.
Committing Offsets: The `ManualCommit` processor retrieves the commit instance from the exchange header and calls its `commit()` method, which executes the commit logic either synchronously or asynchronously depending on the factory used.
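The four stages can be modeled end to end without Camel or Kafka on the classpath. The following is a minimal sketch under stated assumptions: `CommitHandleSketch`, `ExchangeSketch`, and `ManualCommitWorkflowSketch` are hypothetical stand-ins, the header key is made up for the example, and a list of offsets stands in for Kafka. It shows only the ordering guarantee, namely that a record whose business logic fails is never committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a manual commit handle.
interface CommitHandleSketch {
    void commit();
}

// Hypothetical stand-in for a Camel exchange: a body plus message headers.
class ExchangeSketch {
    final String body;
    final Map<String, Object> headers = new HashMap<>();

    ExchangeSketch(String body) {
        this.body = body;
    }
}

class ManualCommitWorkflowSketch {
    // Hypothetical header key used only in this sketch.
    static final String MANUAL_COMMIT = "sketch.manualCommit";

    // Offsets recorded as committed, standing in for Kafka.
    final List<Long> committedOffsets = new ArrayList<>();

    // Consumption and attachment: build a handle for the record, store it as a header.
    ExchangeSketch consume(String body, long offset) {
        ExchangeSketch exchange = new ExchangeSketch(body);
        CommitHandleSketch handle = () -> committedOffsets.add(offset);
        exchange.headers.put(MANUAL_COMMIT, handle);
        return exchange;
    }

    // Route processing and commit: run business logic first, commit only on success.
    void route(ExchangeSketch exchange) {
        if (exchange.body.isEmpty()) {
            throw new IllegalArgumentException("empty payload"); // commit is never reached
        }
        Object header = exchange.headers.get(MANUAL_COMMIT);
        if (header instanceof CommitHandleSketch) {
            ((CommitHandleSketch) header).commit();
        }
    }
}
```

Because the handle travels with the exchange, the code that decides when to commit needs no reference to the consumer that created it.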
Example Snippet: Creating a Manual Commit Instance
    KafkaManualCommitFactory factory = ...; // injected or configured factory

    // Camel-side context: exchange, Kafka consumer, thread id, optional offset repository
    KafkaManualCommitFactory.CamelExchangePayload camelPayload =
            new KafkaManualCommitFactory.CamelExchangePayload(exchange, consumer, threadId, offsetRepository);

    // Kafka-side context: topic partition, record offset, commit timeout
    KafkaManualCommitFactory.KafkaRecordPayload kafkaPayload =
            new KafkaManualCommitFactory.KafkaRecordPayload(partition, offset, timeout);

    // Build the commit handle and expose it to the route through a message header
    KafkaManualCommit manualCommit = factory.newInstance(camelPayload, kafkaPayload, commitManager);
    exchange.getMessage().setHeader(KafkaConstants.MANUAL_COMMIT, manualCommit);
Example Snippet: Manual Commit Processor Usage
    public void process(Exchange exchange) throws Exception {
        // Look up the commit handle that the Kafka consumer attached to this message
        KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual != null) {
            manual.commit(); // perform the offset commit
        }
    }
Interaction with Other Modules
Kafka Consumer (`KafkaConsumer`): Responsible for polling Kafka and creating `KafkaManualCommit` instances for each consumed record or batch. It delegates commit control to manual commit objects when manual commit mode is enabled.
Commit Managers (`CommitManager`): Abstract the underlying commit strategy (synchronous, asynchronous, no-op, repository-backed). Manual commit instances rely on a `CommitManager` to execute the actual commit operation.
Offset Repository (`StateRepository`): Optional external store used by some commit managers to persist offset state outside Kafka. The manual commit payload includes a reference to this repository if applicable.
Camel Routes: The manual commit support integrates into Camel routes by placing the commit instance in the exchange headers, allowing route processors to invoke commits explicitly.
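The delegation from a commit handle to an interchangeable commit strategy might look roughly like the sketch below. Every type here (`CommitStrategySketch`, `NoopCommitStrategySketch`, `RepositoryCommitStrategySketch`, `DelegatingCommitSketch`) is a hypothetical stand-in rather than Camel's `CommitManager` or `StateRepository`, and the `topic/partition` key format is an assumption chosen for the example.

```java
import java.util.Map;

// Hypothetical stand-in for a commit strategy; not Camel's CommitManager.
interface CommitStrategySketch {
    void commit(String topic, int partition, long offset);
}

// No-op strategy: accepts the call and records nothing.
class NoopCommitStrategySketch implements CommitStrategySketch {
    @Override
    public void commit(String topic, int partition, long offset) {
        // intentionally empty
    }
}

// Repository-backed strategy: persists the offset in an external key-value store.
class RepositoryCommitStrategySketch implements CommitStrategySketch {
    // Stands in for an offset repository kept outside Kafka.
    private final Map<String, String> repository;

    RepositoryCommitStrategySketch(Map<String, String> repository) {
        this.repository = repository;
    }

    @Override
    public void commit(String topic, int partition, long offset) {
        // Key format "topic/partition" is an assumption made for this sketch.
        repository.put(topic + "/" + partition, Long.toString(offset));
    }
}

// A manual commit handle delegates to whichever strategy it was given.
class DelegatingCommitSketch {
    private final CommitStrategySketch strategy;
    private final String topic;
    private final int partition;
    private final long offset;

    DelegatingCommitSketch(CommitStrategySketch strategy, String topic, int partition, long offset) {
        this.strategy = strategy;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    void commit() {
        strategy.commit(topic, partition, offset);
    }
}
```

Keeping the strategy behind an interface means the handle that routes call stays the same whether offsets end up in Kafka, in an external store, or nowhere.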
Design Patterns and Concepts
Factory Pattern: `KafkaManualCommitFactory` and its implementations encapsulate the creation logic of commit instances, allowing the system to switch between synchronous and asynchronous commit behaviors transparently.
Strategy Pattern: Different commit behaviors (sync vs. async) are encapsulated in separate commit classes created by their respective factories, allowing runtime selection of the commit strategy.
Separation of Concerns: Manual commit logic is decoupled from Kafka consumer polling and Camel routing logic, enabling modularity and testability.
Header Propagation: Usage of Camel message headers to carry manual commit instances allows seamless integration with route processors without tight coupling.
Mermaid Diagram: Manual Offset Commit Workflow
    sequenceDiagram
        participant KafkaConsumer as Kafka Consumer
        participant Factory as KafkaManualCommitFactory
        participant CamelExchange as Camel Exchange
        participant RouteProcessor as Route Processor
        participant ManualCommitProc as ManualCommit Processor
        participant CommitManager as Commit Manager
        KafkaConsumer->>Factory: Create KafkaManualCommit instance
        Factory->>CamelExchange: Attach commit instance to message header
        CamelExchange->>RouteProcessor: Deliver exchange with manual commit header
        RouteProcessor->>ManualCommitProc: Invoke ManualCommit processor
        ManualCommitProc->>CamelExchange: Retrieve manual commit instance
        ManualCommitProc->>CommitManager: Call commit()
        CommitManager-->>KafkaConsumer: Commit offsets to Kafka or offset repository
This manual offset commit support module empowers Camel Kafka consumers with explicit and configurable control over offset commits, enabling robust message processing workflows tailored to diverse application requirements.