SyncCommitManager.java
Overview
`SyncCommitManager` is a concrete commit manager in the Apache Camel Kafka component that commits Kafka consumer offsets synchronously. It uses Kafka's `commitSync()` API, which blocks until the broker confirms the commit, so the consumer does not proceed until the offsets are stored. Committing only after records have been processed supports at-least-once delivery; synchronous commits alone do not provide exactly-once semantics, which would additionally require transactional or idempotent processing.
This class maintains an internal cache of offsets (`OffsetCache`) that have been processed but not yet committed, supports committing offsets for individual Kafka partitions, and optionally persists committed offsets to an external offset repository for enhanced fault tolerance.
`SyncCommitManager` extends `AbstractCommitManager` and integrates with the Kafka consumer, Camel Kafka consumer endpoint configuration, and optional external offset repositories.
Class: `SyncCommitManager`
Package
`org.apache.camel.component.kafka.consumer`
Inheritance and Interfaces
Extends: `AbstractCommitManager`
Fields
| Field Name | Type | Description |
|---|---|---|
| `LOG` | `Logger` | SLF4J logger instance for logging commit operations. |
| `offsetCache` | `OffsetCache` | Cache to track offsets per `TopicPartition` before committing. |
| `consumer` | `Consumer` | Kafka consumer instance used to commit offsets. |
| `offsetRepository` | `StateRepository` | Optional external repository to persist committed offsets outside Kafka. |
Constructors
public SyncCommitManager(Consumer<?, ?> consumer,
KafkaConsumer kafkaConsumer,
String threadId,
String printableTopic)
Parameters:
- `consumer`: Kafka consumer instance used to perform commit operations.
- `kafkaConsumer`: The Camel Kafka consumer owning this commit manager.
- `threadId`: Identifier for the thread running the consumer (used in logging).
- `printableTopic`: Human-readable topic name for logging.
Description:
Initializes the synchronous commit manager with the Kafka consumer, obtains the optional offset repository from the configuration, and sets up the internal state it needs.
Methods
void commit()
Description:
Performs a synchronous commit of all offsets currently tracked by the Kafka consumer, but only if Kafka's auto-commit is enabled in the configuration. It uses the Kafka consumer's `commitSync()` method to block until the commit is acknowledged.
Usage Example:
syncCommitManager.commit();
Behavior:
- Logs the commit operation.
- Calls `consumer.commitSync()` without parameters to commit all assigned partitions.
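The guard described above can be sketched in a few lines. This is a minimal illustration, not Camel's code: `AllPartitionsCommitter` and `GuardedCommitSketch` are hypothetical names, and the interface stands in for the single no-argument commit call on the Kafka consumer so that the example needs no Kafka dependency.

```java
/** Hypothetical stand-in for the no-argument blocking commit on the Kafka consumer. */
interface AllPartitionsCommitter {
    void commitSync();
}

/** Illustrative only: mirrors the guard described above, not Camel's actual code. */
final class GuardedCommitSketch {
    private final AllPartitionsCommitter consumer;
    private final boolean autoCommitEnabled; // assumed to come from endpoint configuration

    GuardedCommitSketch(AllPartitionsCommitter consumer, boolean autoCommitEnabled) {
        this.consumer = consumer;
        this.autoCommitEnabled = autoCommitEnabled;
    }

    /** Issues a blocking commit only when auto-commit is enabled; returns whether it did. */
    boolean commit() {
        if (!autoCommitEnabled) {
            return false; // nothing is committed
        }
        consumer.commitSync(); // blocks until acknowledged or an exception is thrown
        return true;
    }
}
```

With the flag disabled, the call returns without touching the consumer, which is why `commit()` is safe to invoke unconditionally from calling code.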
void commit(TopicPartition partition)
Parameters:
- `partition`: The Kafka `TopicPartition` whose offset should be committed.
Description:
Commits the offset for a specific partition synchronously. Retrieves the cached offset for the partition, increments it by one (Kafka expects the next offset to consume), and commits it to Kafka. If an offset repository is configured, it also persists the committed offset externally.
Usage Example:
TopicPartition tp = new TopicPartition("my-topic", 0);
syncCommitManager.commit(tp);
Logging:
Debug-level logging indicating the commit operation with partition and thread info.
Internal Call:
Delegates the actual commit logic to the private method `commitSync(partition)`.
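To see why the cached offset is incremented before committing, the following minimal sketch models a partition as a range of offsets and shows which records a restarted consumer would read for a given committed position. `NextOffsetSketch` and its methods are hypothetical names and use no Kafka classes; the only behavior carried over from Kafka is that a consumer resumes reading at the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: not part of Camel or the Kafka client. */
final class NextOffsetSketch {

    /** The position to commit is the offset of the next record to read. */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /**
     * Offsets a restarted consumer would read, assuming it resumes at the
     * committed position and the partition ends at logEndOffset (exclusive).
     */
    static List<Long> offsetsReadAfterRestart(long committedOffset, long logEndOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long offset = committedOffset; offset < logEndOffset; offset++) {
            offsets.add(offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        long lastProcessed = 150L;
        // Committing lastProcessed + 1 resumes at 151, so record 150 is not redelivered.
        System.out.println(offsetsReadAfterRestart(offsetToCommit(lastProcessed), 153L)); // [151, 152]
        // Committing the raw offset would redeliver record 150.
        System.out.println(offsetsReadAfterRestart(lastProcessed, 153L)); // [150, 151, 152]
    }
}
```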
private void commitSync(TopicPartition partition)
Parameters:
- `partition`: The Kafka `TopicPartition` to commit.
Description:
Core method that:
1. Retrieves the last offset processed for the given partition from `offsetCache`.
2. Exits early if no offset is recorded.
3. Prepares a map with the offset + 1 (the next offset to consume).
4. Calls Kafka's `commitSync(offsets, timeout)` to synchronously commit the partition offset.
5. Optionally persists the committed offset to the external offset repository.
6. Cleans up the offset cache by removing committed entries.
Implementation Details:
- Uses the configured commit timeout (`configuration.getCommitTimeoutMs()`).
- Uses Kafka's `OffsetAndMetadata` wrapper to specify offsets.
- Increments the offset by one, because Kafka expects the offset that follows the last consumed message.
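The steps above can be sketched with standard-library stand-ins. This is an assumption-laden illustration rather than the class's source: `PartitionCommitter` and `SyncCommitFlowSketch` are hypothetical, partitions are plain strings instead of `TopicPartition`, a `Map<String, String>` plays the role of the offset repository, and the choice to store the committed position (offset + 1) in that repository is an assumption.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the Kafka consumer's per-partition blocking commit. */
interface PartitionCommitter {
    void commitSync(Map<String, Long> offsets, Duration timeout);
}

/** Illustrative only; partitions are modelled as plain "topic-partition" strings. */
final class SyncCommitFlowSketch {
    private final Map<String, Long> offsetCache = new HashMap<>();
    private final PartitionCommitter committer;
    private final Map<String, String> offsetRepository; // null when not configured
    private final Duration commitTimeout;

    SyncCommitFlowSketch(PartitionCommitter committer,
                         Map<String, String> offsetRepository,
                         Duration commitTimeout) {
        this.committer = committer;
        this.offsetRepository = offsetRepository;
        this.commitTimeout = commitTimeout;
    }

    void recordOffset(String partition, long lastProcessedOffset) {
        offsetCache.put(partition, lastProcessedOffset);
    }

    /** Returns true if a commit was issued, false if nothing was cached. */
    boolean commit(String partition) {
        Long offset = offsetCache.get(partition);             // step 1: look up cached offset
        if (offset == null) {                                 // step 2: nothing to commit
            return false;
        }
        long nextOffset = offset + 1;                         // step 3: next offset to consume
        Map<String, Long> offsets = Collections.singletonMap(partition, nextOffset);
        committer.commitSync(offsets, commitTimeout);         // step 4: blocking commit, may throw
        if (offsetRepository != null) {                       // step 5: optional external copy
            offsetRepository.put(partition, Long.toString(nextOffset)); // stored value is an assumption
        }
        offsetCache.remove(partition);                        // step 6: drop the committed entry
        return true;
    }
}
```

Because the cache entry is removed only after the commit call returns, an exception thrown in step 4 leaves the offset cached in this sketch, so a later commit attempt can pick it up again.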
void recordOffset(TopicPartition partition, long partitionLastOffset)
Parameters:
- `partition`: The Kafka `TopicPartition` for which the offset is recorded.
- `partitionLastOffset`: Last offset processed for the partition.
Description:
Records the last processed offset for a partition in the internal `OffsetCache`. This offset will later be committed during the commit operations.
Usage Example:
syncCommitManager.recordOffset(tp, 12345L);
Purpose:
Allows the consumer to keep track of the latest processed offset before committing.
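As a rough illustration of why recording is separate from committing, the sketch below keeps only the latest offset per partition, so a batch of processed records leaves a single pending entry for each partition. `OffsetCacheSketch` is a hypothetical, simplified stand-in and does not reproduce Camel's `OffsetCache`; partitions are again plain strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for an offset cache; not Camel's OffsetCache. */
final class OffsetCacheSketch {
    private final Map<String, Long> lastProcessed = new HashMap<>();

    /** Overwrites any earlier offset recorded for the same partition. */
    void recordOffset(String partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    /** Returns the latest recorded offset, or null if none is pending. */
    Long getOffset(String partition) {
        return lastProcessed.get(partition);
    }

    /** Number of partitions that currently have an uncommitted offset. */
    int pendingPartitions() {
        return lastProcessed.size();
    }

    public static void main(String[] args) {
        OffsetCacheSketch cache = new OffsetCacheSketch();
        for (long offset = 10; offset <= 12; offset++) {
            cache.recordOffset("my-topic-0", offset); // one call per processed record
        }
        System.out.println(cache.getOffset("my-topic-0")); // prints 12
        System.out.println(cache.pendingPartitions());     // prints 1
    }
}
```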
Important Implementation Details
- Offset Caching: The `OffsetCache` stores offsets between processing and committing to avoid redundant commits and to handle batch processing efficiently.
- Synchronous Commit: Uses Kafka's `commitSync()` method, which blocks until the broker acknowledges the offset commit or the call fails, so a successful return means the offset has been stored.
- Offset Increment: Kafka offsets are committed as the next offset to consume, hence the increment by one before committing.
- Timeout Control: Per-partition commits pass a configured timeout (`commitTimeoutMs`) to avoid indefinite blocking.
- Optional External Offset Repository: If configured, committed offsets are also saved in an external state repository (`StateRepository`). This adds a layer of fault tolerance outside Kafka's internal offset storage.
- Logging: Uses SLF4J info and debug logs to record commit operations, thread identities, and topics for diagnostic purposes.
Interaction with Other System Components
- Kafka Consumer (`Consumer<?, ?>`): `SyncCommitManager` relies on the Kafka consumer API for committing offsets synchronously.
- KafkaConsumer (Camel component): Provides configuration and context, including consumer endpoint, offset repository, and manual commit integration.
- Offset Repository (`StateRepository`): Optional component where committed offsets can be stored externally for recovery or auditing.
- OffsetCache: Internal utility to track offsets awaiting commit.
- Logging Framework: Uses SLF4J logging to aid monitoring and troubleshooting.
- AbstractCommitManager: Base class providing shared utilities and state for different commit managers.
Usage Example
// Assume kafkaConsumer is an instance of KafkaConsumer from Camel Kafka component
Consumer<String, String> kafkaClientConsumer = ...; // Kafka consumer instance
String threadId = "thread-1";
String topicName = "my-topic";
SyncCommitManager syncCommitManager = new SyncCommitManager(kafkaClientConsumer, kafkaConsumer, threadId, topicName);
// Record an offset after processing a message
TopicPartition partition = new TopicPartition(topicName, 0);
long lastProcessedOffset = 150L;
syncCommitManager.recordOffset(partition, lastProcessedOffset);
// Commit offset synchronously for the partition
syncCommitManager.commit(partition);
// Commit all offsets synchronously if auto-commit is enabled
syncCommitManager.commit();
Mermaid Class Diagram
classDiagram
class SyncCommitManager {
-OffsetCache offsetCache
-Consumer<?, ?> consumer
-StateRepository<String, String> offsetRepository
+SyncCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
+void commit()
+void commit(TopicPartition)
+void recordOffset(TopicPartition, long)
-void commitSync(TopicPartition)
}
SyncCommitManager --|> AbstractCommitManager
Summary
`SyncCommitManager.java` provides a robust, synchronous offset commit strategy within Apache Camel's Kafka consumer implementation. It ensures reliable offset commits by blocking until Kafka confirms persistence, integrates with optional external offset repositories, and maintains an internal cache of offsets to efficiently manage commit operations. This commit manager is suitable for use cases demanding strong delivery guarantees, at the cost of some throughput due to blocking behavior.
The class fits into the broader offset commit management framework by implementing one of several commit strategies selectable based on configuration and use case needs, allowing Apache Camel Kafka consumers to adapt offset committing behavior dynamically.
Visual Workflow: Synchronous Commit Process
flowchart TD
A[Message Processed] --> B["recordOffset(partition, offset)"]
B --> C{Commit Triggered?}
C -- No --> A
C -- Yes --> D[Retrieve cached offset]
D --> E[Increment offset by 1]
E --> F["commitSync(offsets, timeout) to Kafka"]
F --> G{Commit successful?}
G -- Yes --> H["Save offset to external repository (if configured)"]
H --> I[Remove committed offsets from cache]
I --> A
G -- No --> J["Handle commit failure (retry/log)"]