Synchronous Commit
Purpose
Synchronous Commit addresses the need for guaranteed persistence of Kafka consumer offsets before message processing advances. Within the broader offset commit management topic, this subtopic ensures that offset commits to Kafka brokers are blocking: the consumer waits until Kafka acknowledges the commit before continuing. This is critical in scenarios requiring strong delivery guarantees, such as at-least-once processing, where losing offset commit information could lead to message reprocessing or data inconsistency.
Unlike asynchronous or no-op commits, synchronous commits trade throughput for reliability by making sure offset updates are durably stored before proceeding to fetch new records.
Functionality
The core responsibilities include:
Offset Caching: Tracking the latest processed offsets per topic partition locally before committing.
Blocking Commit Calls: Performing blocking commitSync() operations on the Kafka consumer to persist offsets.
Partition-Specific Commits: Allowing commits of offsets for individual partitions for fine-grained control.
Offset Repository Integration: Optionally persisting offsets to an external state repository alongside Kafka commits.
Timeout Handling: Enforcing commit timeout configuration to avoid indefinite blocking.
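A minimal sketch of the offset caching responsibility follows. It assumes partitions are identified by plain "topic-partition" strings (a stand-in for Kafka's TopicPartition, so the example runs without the client library). The class and method names are hypothetical and only loosely mirror recordOffset and removeCommittedEntries; the real cache may behave differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; partition keys such as "orders-0" stand in for Kafka's TopicPartition.
class OffsetCacheSketch {

    static final class OffsetCache {
        // Highest processed (not yet committed) offset per partition.
        private final Map<String, Long> pending = new HashMap<>();

        void recordOffset(String partition, long offset) {
            // Keep the maximum so a late, smaller offset cannot move progress backwards.
            pending.merge(partition, offset, Long::max);
        }

        Long getOffset(String partition) {
            return pending.get(partition); // null when nothing is pending
        }

        // 'committed' holds the next offsets sent to Kafka (processed offset + 1).
        void removeCommittedEntries(Map<String, Long> committed) {
            for (Map.Entry<String, Long> e : committed.entrySet()) {
                Long cached = pending.get(e.getKey());
                // Drop the entry only if the commit covers it; newer progress is kept.
                if (cached != null && cached + 1 <= e.getValue()) {
                    pending.remove(e.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        OffsetCache cache = new OffsetCache();
        cache.recordOffset("orders-0", 42);
        cache.recordOffset("orders-0", 41); // ignored: smaller than 42
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 43L);
        cache.removeCommittedEntries(committed);
        System.out.println(cache.getOffset("orders-0") == null); // prints true
    }
}
```

The coverage check in removeCommittedEntries is a design choice of this sketch: an entry is removed only when the committed offset actually covers it, so any progress recorded beyond that point stays cached for the next commit.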
Key Workflow
Record Offsets: As messages are processed, their offsets are recorded into a local cache via the recordOffset method.
Commit Offsets: When a commit is triggered, either automatically or explicitly for specific partitions, the manager retrieves the cached offset, increments it by one (Kafka expects the offset of the next message to consume, not the last one processed), and issues a synchronous commit via Kafka's commitSync() method.
State Repository Persistence: If configured, the committed offset is also saved to an external offset repository to support additional offset tracking or recovery mechanisms.
Cache Cleanup: After a successful commit, the committed offsets are removed from the local cache to avoid redundant commits.
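The four workflow steps can be sketched end to end as a self-contained illustration, not the library's code. KafkaCommitter is a hypothetical stand-in for KafkaConsumer.commitSync(Map, Duration), OffsetRepository is a hypothetical external store, and partitions are plain strings.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the record -> commit -> persist -> clean-up workflow.
class SyncCommitWorkflow {

    interface KafkaCommitter {
        // Stand-in for consumer.commitSync(offsets, timeout): blocks or throws.
        void commitSync(Map<String, Long> offsets, Duration timeout);
    }

    interface OffsetRepository {
        void save(String partition, long nextOffset);
    }

    static final class SyncCommitManager {
        private final Map<String, Long> offsetCache = new HashMap<>();
        private final KafkaCommitter committer;
        private final OffsetRepository repository; // may be null (optional)
        private final Duration timeout;

        SyncCommitManager(KafkaCommitter committer, OffsetRepository repository, Duration timeout) {
            this.committer = committer;
            this.repository = repository;
            this.timeout = timeout;
        }

        // Step 1: record the offset of each processed record.
        void recordOffset(String partition, long offset) {
            offsetCache.put(partition, offset);
        }

        void commitSync(String partition) {
            Long offset = offsetCache.get(partition);
            if (offset == null) {
                return; // nothing processed since the last commit
            }
            // Step 2: commit the offset of the next record to consume.
            long nextOffset = offset + 1;
            Map<String, Long> offsets = Collections.singletonMap(partition, nextOffset);
            committer.commitSync(offsets, timeout); // an exception here skips steps 3 and 4
            // Step 3: optional external persistence, only after the commit returned.
            if (repository != null) {
                repository.save(partition, nextOffset);
            }
            // Step 4: drop the entry so the same offset is not committed twice.
            offsetCache.remove(partition);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SyncCommitManager manager = new SyncCommitManager(
                (offsets, timeout) -> log.add("kafka " + offsets),
                (partition, next) -> log.add("repo " + partition + "=" + next),
                Duration.ofSeconds(5));
        manager.recordOffset("orders-0", 99);
        manager.commitSync("orders-0");
        manager.commitSync("orders-0"); // no-op: cache already cleared
        System.out.println(log); // prints: [kafka {orders-0=100}, repo orders-0=100]
    }
}
```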
Code Interaction Snippet
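```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Method of the synchronous commit manager; consumer, offsetCache, configuration
// and offsetRepository are fields of the enclosing class.
@Override
public void commitSync(TopicPartition partition) {
    Long offset = offsetCache.getOffset(partition);
    if (offset == null) {
        return; // nothing processed on this partition since the last commit
    }
    // Kafka stores the offset of the next record to read, so commit processed offset + 1.
    long lastOffset = offset + 1;
    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
            partition, new OffsetAndMetadata(lastOffset));
    // Blocks until the broker acknowledges, or throws once the configured timeout expires.
    consumer.commitSync(offsets, Duration.ofMillis(configuration.getCommitTimeoutMs()));
    if (offsetRepository != null) {
        saveStateToOffsetRepository(partition, lastOffset, offsetRepository);
    }
    // Reached only if the commit above did not throw.
    offsetCache.removeCommittedEntries(offsets, null);
}
```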
This snippet highlights the synchronous commit execution, including timeout control and optional repository persistence.
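The snippet commits a single partition. When a commit is triggered for everything in the cache, one possible variant (hypothetical; not shown in the source) builds a single offset map and issues one blocking call instead of one per partition. Here the commitSync parameter stands in for the consumer's commitSync(Map, Duration), and partitions are plain strings.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical variant: commit every cached partition in one blocking call.
class CommitAllSketch {

    // Kafka expects the offset of the next record, hence processed offset + 1.
    static Map<String, Long> toCommitOffsets(Map<String, Long> processed) {
        Map<String, Long> next = new HashMap<>();
        processed.forEach((partition, offset) -> next.put(partition, offset + 1));
        return next;
    }

    // commitSync stands in for consumer.commitSync(Map, Duration).
    static void commitAllSync(Map<String, Long> offsetCache,
                              BiConsumer<Map<String, Long>, Duration> commitSync,
                              Duration timeout) {
        if (offsetCache.isEmpty()) {
            return; // nothing processed since the last commit
        }
        Map<String, Long> offsets = toCommitOffsets(offsetCache);
        commitSync.accept(offsets, timeout); // a single call covering all partitions
        offsetCache.keySet().removeAll(offsets.keySet()); // reached only if no exception
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>();
        cache.put("orders-0", 10L);
        cache.put("orders-1", 4L);
        commitAllSync(cache,
                (offsets, timeout) -> System.out.println("commit " + new TreeMap<>(offsets)),
                Duration.ofSeconds(5));
        // prints: commit {orders-0=11, orders-1=5}
        System.out.println(cache.isEmpty()); // prints: true
    }
}
```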
Integration
Synchronous Commit is a concrete implementation of the broader Offset Commit Management strategy. It integrates tightly with:
Kafka Consumer Layer: It uses the Kafka client consumer to perform blocking commits, ensuring the consumer’s offset state is safely stored before continuing.
Offset Cache: Maintains an in-memory cache of offsets that have been processed but not yet committed, so a commit only needs to look up the latest entry per partition; entries are cleared after a successful commit.
Offset Repository (Optional): Supports external state persistence for offset recovery, complementing Kafka’s internal offset storage.
Other Commit Strategies: Coexists with asynchronous and no-op commit managers, allowing users to select commit semantics based on their use case's reliability vs. performance trade-offs.
Manual Commit APIs: When combined with manual commit support, synchronous commit offers deterministic control over when offsets are persisted.
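By committing synchronously, this subtopic ensures that the consumer does not move on until Kafka has acknowledged its offsets, which minimizes the risk of message duplication caused by lost offset commits and enhances fault tolerance in critical processing pipelines. Records processed after the last successful commit can still be redelivered after a crash or rebalance, so processing should tolerate occasional duplicates.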
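The coexistence of commit strategies can be pictured as a common interface selected by configuration. The sketch below is only an assumption about how such a choice might be wired: CommitMode, CommitManager and create are hypothetical names, and the two function parameters stand in for the consumer's commitSync and commitAsync.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of selecting a commit strategy from configuration.
class CommitStrategies {

    enum CommitMode { SYNC, ASYNC, NOOP }

    interface CommitManager {
        void commit(Map<String, Long> nextOffsets);
    }

    // blockingCommit / nonBlockingCommit stand in for consumer.commitSync / consumer.commitAsync.
    static CommitManager create(CommitMode mode,
                                Consumer<Map<String, Long>> blockingCommit,
                                Consumer<Map<String, Long>> nonBlockingCommit) {
        switch (mode) {
            case SYNC:
                return blockingCommit::accept;    // caller waits for blockingCommit to return
            case ASYNC:
                return nonBlockingCommit::accept; // delegates to the non-blocking variant
            default:
                return offsets -> { };            // NOOP: this manager commits nothing
        }
    }

    public static void main(String[] args) {
        CommitManager manager = create(CommitMode.SYNC,
                offsets -> System.out.println("commitSync " + offsets),
                offsets -> System.out.println("commitAsync " + offsets));
        manager.commit(Collections.singletonMap("orders-0", 43L)); // prints: commitSync {orders-0=43}
    }
}
```

With the real client, a synchronous commit surfaces failures to the caller as exceptions, whereas commitAsync reports them through an OffsetCommitCallback; that difference is the reliability versus throughput trade-off the strategies expose.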
Diagram
```mermaid
flowchart TD
    ProcessedMsgs[Processed Messages] --> RecordOffset[Record Offset in Cache]
    RecordOffset --> CheckCommitTrigger{Commit Needed?}
    CheckCommitTrigger -- No --> Continue[Continue Processing]
    CheckCommitTrigger -- Yes --> FetchOffset[Get Cached Offset]
    FetchOffset --> IncrementOffset[Increment Offset by 1]
    IncrementOffset --> CommitToKafka["commitSync(offset)"]
    CommitToKafka -->|Success| UpdateRepo["Save Offset to Repository (Optional)"]
    UpdateRepo --> ClearCache[Remove Committed Offset from Cache]
    ClearCache --> Continue
    CommitToKafka -->|Failure| HandleError[Handle Commit Failure]
```
This flowchart illustrates the synchronous commit process starting from offset recording through to committing offsets synchronously to Kafka, optionally persisting to an external repository, and cleaning up the cache for continued processing.
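The failure branch of the flowchart can be sketched as follows. This assumes the caller simply keeps the cached offsets when the blocking commit throws, so the next commit trigger retries them. With the real client, commitSync can throw unchecked exceptions such as CommitFailedException or TimeoutException; here any RuntimeException plays that role. BlockingCommitter and tryCommit are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "Handle Commit Failure" branch.
class CommitFailureSketch {

    interface BlockingCommitter {
        void commitSync(Map<String, Long> nextOffsets); // stand-in for consumer.commitSync
    }

    // Returns true on success; on failure the cache is left untouched for a later retry.
    static boolean tryCommit(Map<String, Long> offsetCache, BlockingCommitter committer) {
        Map<String, Long> next = new HashMap<>();
        offsetCache.forEach((partition, offset) -> next.put(partition, offset + 1));
        try {
            committer.commitSync(next);
        } catch (RuntimeException e) {
            System.err.println("commit failed, keeping offsets cached: " + e.getMessage());
            return false;
        }
        offsetCache.keySet().removeAll(next.keySet()); // clear only what was committed
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>();
        cache.put("orders-0", 7L);
        int[] calls = {0};
        // Fails on the first call, succeeds afterwards.
        BlockingCommitter flaky = offsets -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("simulated commit timeout");
            }
        };
        System.out.println(tryCommit(cache, flaky) + " " + cache); // prints: false {orders-0=7}
        System.out.println(tryCommit(cache, flaky) + " " + cache); // prints: true {}
    }
}
```

Keeping the cache intact on failure preserves at-least-once behavior: the offsets are retried on a later commit, and if the consumer crashes first, the uncommitted records are redelivered rather than skipped.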