Synchronous Commit

Purpose

Synchronous Commit addresses the need to persist Kafka consumer offsets before message processing advances. Within the broader offset commit management topic, it makes offset commits to the Kafka brokers blocking: the consumer waits until Kafka acknowledges the commit. This matters in scenarios requiring strong delivery guarantees (e.g., at-least-once processing, or exactly-once processing when combined with idempotent or transactional handling), where a lost offset commit could lead to message reprocessing or data inconsistency.

Unlike asynchronous or no-op commits, synchronous commits trade throughput for reliability by making sure offset updates are durably stored before proceeding to fetch new records.

Functionality

The core responsibilities include:

Key Workflow

  1. Record Offsets: As messages are processed, offsets are recorded via the recordOffset method into a local cache.

  2. Commit Offsets: When a commit is triggered, either automatically or explicitly for specific partitions, the manager retrieves the cached offset, increments it by one (Kafka expects the committed value to be the offset of the next message to consume, not the last one processed), and issues a synchronous commit via Kafka's commitSync() method.

  3. State Repository Persistence: If configured, the committed offset is also saved to an external offset repository to support additional offset tracking or recovery mechanisms.

  4. Cache Cleanup: After a successful commit, the committed offsets are removed from the local cache to avoid redundant commits.
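
The four steps above can be sketched without any Kafka dependency. This is a minimal illustration, not the actual manager: SyncCommitSketch, BlockingCommitter, and OffsetStore are hypothetical names, and a plain String stands in for a TopicPartition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch of record -> commit -> persist -> cleanup.
final class SyncCommitSketch {

    /** Stand-in for the blocking Kafka commit; expected to throw if the commit fails. */
    interface BlockingCommitter {
        void commit(String partition, long nextOffset);
    }

    /** Stand-in for the optional external offset repository. */
    interface OffsetStore {
        void save(String partition, long nextOffset);
    }

    private final Map<String, Long> cache = new HashMap<>();
    private final BlockingCommitter committer;
    private final OffsetStore store; // may be null when no repository is configured

    SyncCommitSketch(BlockingCommitter committer, OffsetStore store) {
        this.committer = committer;
        this.store = store;
    }

    /** Step 1: remember the offset of the last processed record. */
    void recordOffset(String partition, long offset) {
        cache.put(partition, offset);
    }

    /** Steps 2-4: returns the committed position, or -1 if nothing was cached. */
    long commit(String partition) {
        Long last = cache.get(partition);
        if (last == null) {
            return -1;
        }
        long next = last + 1;              // commit the next offset to read
        committer.commit(partition, next); // blocking; an exception leaves the cache untouched
        if (store != null) {
            store.save(partition, next);   // optional repository persistence
        }
        cache.remove(partition, last);     // cleanup only after a successful commit
        return next;
    }

    boolean hasPending(String partition) {
        return cache.containsKey(partition);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SyncCommitSketch manager = new SyncCommitSketch(
            (p, o) -> log.add("kafka " + p + "@" + o),
            (p, o) -> log.add("repo " + p + "@" + o));
        manager.recordOffset("orders-0", 41);
        manager.recordOffset("orders-0", 42);
        System.out.println(manager.commit("orders-0"));     // 43
        System.out.println(log);                            // [kafka orders-0@43, repo orders-0@43]
        System.out.println(manager.hasPending("orders-0")); // false
    }
}
```

Because cleanup runs only after the committer returns, a failed commit leaves the offset in the cache, so a later commit attempt can pick it up again.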

Code Interaction Snippet

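@Override
public void commitSync(TopicPartition partition) {
    // Nothing was recorded for this partition, so there is nothing to commit.
    Long offset = offsetCache.getOffset(partition);
    if (offset == null) {
        return;
    }
    // Kafka stores the position of the next record to read, hence the +1.
    long lastOffset = offset + 1;
    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
        partition, new OffsetAndMetadata(lastOffset));
    // Blocks until the broker acknowledges the commit or the timeout expires.
    consumer.commitSync(offsets, Duration.ofMillis(configuration.getCommitTimeoutMs()));
    // Optionally mirror the committed position to the external repository.
    if (offsetRepository != null) {
        saveStateToOffsetRepository(partition, lastOffset, offsetRepository);
    }
    // Drop the committed entry so it is not committed again.
    offsetCache.removeCommittedEntries(offsets, null);
}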

This snippet highlights the synchronous commit execution, including timeout control and optional repository persistence.
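
To make the timeout behaviour concrete, the following sketch shows what "block until acknowledged or time out" means in plain Java. It is an illustration only: BoundedWaitSketch and awaitAck are hypothetical names, and a CompletableFuture stands in for the broker acknowledgement. With the real client, commitSync(offsets, timeout) throws a Kafka TimeoutException when the timeout expires instead of returning a flag.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a bounded blocking wait; not Kafka client code.
final class BoundedWaitSketch {

    /** Waits for the acknowledgement; returns true if it arrived within the timeout. */
    static boolean awaitAck(CompletableFuture<Void> ack, Duration timeout) {
        try {
            ack.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // no acknowledgement in time: the commit outcome is unknown
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("commit rejected", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> acked = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> silent = new CompletableFuture<>();
        System.out.println(awaitAck(acked, Duration.ofMillis(50)));  // true
        System.out.println(awaitAck(silent, Duration.ofMillis(50))); // false
    }
}
```

A short timeout keeps the consumer loop responsive but raises the chance of giving up on a commit that would have succeeded, so the value is a tuning decision for each deployment.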

Integration

Synchronous Commit is a concrete implementation of the broader Offset Commit Management strategy. It integrates tightly with:

By committing synchronously, the Kafka consumer confirms that offsets are persisted before it moves on. This minimizes the risk of message duplication caused by lost offsets and improves fault tolerance in critical processing pipelines.

Diagram

flowchart TD
    ProcessedMsgs[Processed Messages] --> RecordOffset[Record Offset in Cache]
    RecordOffset --> CheckCommitTrigger{Commit Needed?}
    CheckCommitTrigger -- No --> Continue[Continue Processing]
    CheckCommitTrigger -- Yes --> FetchOffset[Get Cached Offset]
    FetchOffset --> IncrementOffset[Increment Offset by 1]
    IncrementOffset --> CommitToKafka["commitSync(offset)"]
    CommitToKafka -->|Success| UpdateRepo["Save Offset to Repository (Optional)"]
    UpdateRepo --> ClearCache[Remove Committed Offset from Cache]
    ClearCache --> Continue
    CommitToKafka -->|Failure| HandleError[Handle Commit Failure]

This flowchart illustrates the synchronous commit process starting from offset recording through to committing offsets synchronously to Kafka, optionally persisting to an external repository, and cleaning up the cache for continued processing.
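
The flowchart leaves the failure branch unspecified. One possible policy, sketched here under stated assumptions, is to keep the cached offset when the commit fails so that a later commit can retry it. CommitFailureSketch, Outcome, and commitOrKeep are hypothetical names, the String key stands in for a TopicPartition, and the blocking commit is passed in as a plain callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ObjLongConsumer;

// Hypothetical failure-handling policy: keep the cached offset when the commit fails.
final class CommitFailureSketch {

    enum Outcome { NOTHING_TO_COMMIT, COMMITTED, FAILED }

    static Outcome commitOrKeep(Map<String, Long> cache,
                                String partition,
                                ObjLongConsumer<String> blockingCommit) {
        Long last = cache.get(partition);
        if (last == null) {
            return Outcome.NOTHING_TO_COMMIT;
        }
        try {
            blockingCommit.accept(partition, last + 1); // commit the next offset to read
        } catch (RuntimeException e) {
            return Outcome.FAILED; // cache untouched, so the offset can be committed later
        }
        cache.remove(partition, last); // cleanup only on success
        return Outcome.COMMITTED;
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>();
        cache.put("orders-0", 42L);

        Outcome first = commitOrKeep(cache, "orders-0",
            (p, o) -> { throw new IllegalStateException("broker unavailable"); });
        System.out.println(first + " " + cache);  // FAILED {orders-0=42}

        Outcome second = commitOrKeep(cache, "orders-0", (p, o) -> { });
        System.out.println(second + " " + cache); // COMMITTED {}
    }
}
```

Under this policy, records processed since the last successful commit may be delivered again after a restart or rebalance, which matches at-least-once semantics.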