SyncCommitManager.java


Overview

`SyncCommitManager` is a concrete commit manager in the Apache Camel Kafka component that commits Kafka consumer offsets synchronously. It calls Kafka's `commitSync()` API, which blocks until the broker acknowledges the commit or the call fails. Because an offset is committed only after the corresponding record has been processed, this approach supports at-least-once delivery: after a failure, records whose offsets were not yet committed are redelivered rather than lost. Synchronous commits alone do not provide exactly-once semantics.
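
To make the delivery guarantee concrete, the following sketch simulates a consumer that commits after each processed record and crashes once between processing and committing. It is a simplified model rather than Camel or Kafka code: `AtLeastOnceDemo` and its `run` method are hypothetical names, and the "commit" is only a local variable update.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of process-then-commit; not Camel or Kafka code.
final class AtLeastOnceDemo {

    // Processes records starting at the committed position and returns the new
    // committed position. If crashAfterProcessing >= 0, a crash is simulated right
    // after that offset is processed but before its commit completes.
    static long run(List<String> records, long committed,
                    long crashAfterProcessing, List<String> processed) {
        for (long offset = committed; offset < records.size(); offset++) {
            processed.add(records.get((int) offset));
            if (offset == crashAfterProcessing) {
                return committed; // the commit for this offset never happened
            }
            committed = offset + 1; // the blocking commit finishes before the next record
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        List<String> processed = new ArrayList<>();
        long committed = run(records, 0, 1, processed);  // crash after processing "b"
        committed = run(records, committed, -1, processed); // restart from the committed position
        System.out.println(processed + " committed=" + committed);
    }
}
```

In this model the record processed just before the crash is handled twice after the restart, and no record is skipped, which is the at-least-once behavior described above.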

The class keeps an internal `OffsetCache` of offsets that have been processed but not yet committed, supports committing offsets for individual Kafka partitions, and can optionally persist committed offsets to an external offset repository so that consumer positions can also be recovered from outside Kafka.
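
The idea of a cache of processed-but-uncommitted offsets can be sketched as follows. This is an illustration only and not Camel's `OffsetCache`: `PartitionKey` is a hypothetical stand-in for Kafka's `TopicPartition` so the example compiles without the Kafka client library, and `PendingOffsets` and its method names are assumptions.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Kafka's TopicPartition.
final class PartitionKey {
    final String topic;
    final int partition;

    PartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Illustrative cache of the last processed offset per partition.
final class PendingOffsets {
    private final Map<PartitionKey, Long> lastProcessed = new ConcurrentHashMap<>();

    // Remember the most recent offset processed for a partition.
    void record(PartitionKey partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    // Returns null when nothing is pending for the partition.
    Long get(PartitionKey partition) {
        return lastProcessed.get(partition);
    }

    // Drops the entry unless a newer offset was recorded after the commit was prepared.
    // committedPosition is the position sent to Kafka (last processed offset + 1).
    void removeIfCommitted(PartitionKey partition, long committedPosition) {
        lastProcessed.computeIfPresent(partition, (k, v) -> v < committedPosition ? null : v);
    }
}
```

Keeping only the latest offset per partition is enough here because committing a position implicitly covers every earlier offset in that partition.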

`SyncCommitManager` extends `AbstractCommitManager` and integrates with the Kafka consumer, Camel Kafka consumer endpoint configuration, and optional external offset repositories.


Class: SyncCommitManager

Package

org.apache.camel.component.kafka.consumer

Inheritance and Interfaces

Extends `AbstractCommitManager`.

Fields

| Field Name | Type | Description |
|---|---|---|
| `LOG` | `Logger` | SLF4J logger instance for logging commit operations. |
| `offsetCache` | `OffsetCache` | Cache to track offsets per `TopicPartition` before committing. |
| `consumer` | `Consumer` | Kafka consumer instance used to commit offsets. |
| `offsetRepository` | `StateRepository` | Optional external repository to persist committed offsets outside Kafka. |
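
Since the class diagram types the repository as `StateRepository<String, String>`, both the partition and the offset have to be represented as strings when they are persisted. The sketch below shows one way that could look with an in-memory map. The class name, the `"<topic>/<partition>"` key format, and the `-1` sentinel are assumptions for illustration and are not taken from Camel.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative string-to-string offset store; not Camel's StateRepository.
final class StringOffsetRepositorySketch {
    private final Map<String, String> state = new HashMap<>();

    // Assumed key format for illustration only: "<topic>/<partition>".
    static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Stores the position to resume from as a decimal string.
    void save(String topic, int partition, long nextOffset) {
        state.put(key(topic, partition), Long.toString(nextOffset));
    }

    // Returns the stored position, or -1 when the partition has no stored offset.
    long load(String topic, int partition) {
        String value = state.get(key(topic, partition));
        return value == null ? -1L : Long.parseLong(value);
    }
}
```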


Constructors

public SyncCommitManager(Consumer<?, ?> consumer,
                         KafkaConsumer kafkaConsumer,
                         String threadId,
                         String printableTopic)

Methods

void commit()


void commit(TopicPartition partition)


private void commitSync(TopicPartition partition)


void recordOffset(TopicPartition partition, long partitionLastOffset)


Important Implementation Details


Interaction with Other System Components


Usage Example

// Assume kafkaConsumer is an instance of KafkaConsumer from Camel Kafka component
Consumer<String, String> kafkaClientConsumer = ...; // Kafka consumer instance
String threadId = "thread-1";
String topicName = "my-topic";

SyncCommitManager syncCommitManager = new SyncCommitManager(kafkaClientConsumer, kafkaConsumer, threadId, topicName);

// Record an offset after processing a message
TopicPartition partition = new TopicPartition(topicName, 0);
long lastProcessedOffset = 150L;
syncCommitManager.recordOffset(partition, lastProcessedOffset);

// Commit offset synchronously for the partition
syncCommitManager.commit(partition);

// Commit all offsets synchronously if auto-commit is enabled
syncCommitManager.commit();

Mermaid Class Diagram

classDiagram
    class SyncCommitManager {
        -OffsetCache offsetCache
        -Consumer<?, ?> consumer
        -StateRepository<String, String> offsetRepository
        +SyncCommitManager(Consumer<?, ?>, KafkaConsumer, String, String)
        +void commit()
        +void commit(TopicPartition)
        +void recordOffset(TopicPartition, long)
        -void commitSync(TopicPartition)
    }
    SyncCommitManager --|> AbstractCommitManager

Summary

`SyncCommitManager.java` provides a synchronous offset commit strategy for Apache Camel's Kafka consumer. It blocks until Kafka acknowledges each commit, can persist committed offsets to an optional external offset repository, and keeps an internal cache of processed offsets that are awaiting commit. It suits use cases that need at-least-once delivery and can accept lower throughput, since the consumer waits on every commit.

The class is one of several commit strategies in the broader offset commit management framework. The strategy is selected through configuration, so Apache Camel Kafka consumers can match their offset commit behavior to the use case.


Visual Workflow: Synchronous Commit Process

flowchart TD
    A[Message Processed] --> B["recordOffset(partition, offset)"]
    B --> C{Commit Triggered?}
    C -- No --> A
    C -- Yes --> D[Retrieve cached offset]
    D --> E[Increment offset by 1]
    E --> F["commitSync(offset) to Kafka"]
    F --> G{Commit successful?}
    G -- Yes --> H["Save offset to external repository (if configured)"]
    H --> I[Remove committed offsets from cache]
    I --> A
    G -- No --> J["Handle commit failure (retry/log)"]
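
The flow above can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not the class's actual source: `BlockingCommitter` and `ExternalOffsetStore` are hypothetical stand-ins for the Kafka consumer and the external offset repository, and partitions are identified by plain strings to keep the example free of Kafka dependencies. The increment by one reflects Kafka's convention that the committed offset is the position of the next record to read, not the last record processed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the blocking Kafka commit call.
interface BlockingCommitter {
    void commitSync(String partition, long nextOffset);
}

// Hypothetical stand-in for an external offset repository.
interface ExternalOffsetStore {
    void save(String partition, long nextOffset);
}

final class SyncCommitFlowSketch {
    private final Map<String, Long> cache = new HashMap<>();
    private final BlockingCommitter committer;
    private final ExternalOffsetStore store; // null when no repository is configured

    SyncCommitFlowSketch(BlockingCommitter committer, ExternalOffsetStore store) {
        this.committer = committer;
        this.store = store;
    }

    // Step B: remember the last processed offset for the partition.
    void recordOffset(String partition, long lastProcessedOffset) {
        cache.put(partition, lastProcessedOffset);
    }

    // Steps D to I. Returns false when nothing is cached for the partition.
    boolean commit(String partition) {
        Long lastProcessed = cache.get(partition);
        if (lastProcessed == null) {
            return false;
        }
        long nextOffset = lastProcessed + 1;         // position of the next record to read
        committer.commitSync(partition, nextOffset); // an exception here skips the steps below
        if (store != null) {
            store.save(partition, nextOffset);
        }
        cache.remove(partition);
        return true;
    }

    int pending() {
        return cache.size();
    }
}
```

In this sketch a failed commit propagates as an exception and leaves the cached offset in place, so a later commit attempt can retry it. Whether the real class retries or only logs is not specified here.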
