Offset Commit Management

Overview

The **Offset Commit Management** module abstracts and implements the strategies for committing Kafka consumer offsets within the Apache Camel Kafka component. It controls how and when consumed offsets are acknowledged and persisted, which determines how a route recovers after a failure and which delivery semantics it can offer (for example, at-least-once delivery when an offset is committed only after its record has been processed).

This module addresses the challenges of offset management by providing pluggable commit managers that support different commit modes, including synchronous commits, asynchronous commits, no-operation commits (when Kafka's auto-commit is enabled), and commits backed by an external offset repository. It also integrates with manual commit APIs that allow fine-grained offset control within Camel routes.

Core Concepts and Purpose

The module exists to provide flexibility and reliability in offset management, catering to different use cases such as high-throughput batch processing, transactional consumption, manual control, and offset persistence in external storage.

How the Module Works

Commit Manager Factory

At the core is the factory method `CommitManagers.createCommitManager()`, which selects and creates the appropriate `CommitManager` implementation based on the consumer configuration and the manual commit settings.

public static CommitManager createCommitManager(
        Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
    KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();

    if (configuration.isAllowManualCommit()) {
        KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
        if (manualCommitFactory instanceof DefaultKafkaManualAsyncCommitFactory) {
            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        } else if (manualCommitFactory instanceof DefaultKafkaManualCommitFactory) {
            return new SyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        } else {
            return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
    } else {
        if (configuration.getOffsetRepository() != null) {
            return new CommitToOffsetManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
        if (configuration.isBatching()) {
            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
        }
    }
    return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
}

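This method evaluates, in order:

- whether manual commit is allowed (`isAllowManualCommit()`). If so, the type of the endpoint's `KafkaManualCommitFactory` decides the result: `AsyncCommitManager` for `DefaultKafkaManualAsyncCommitFactory`, `SyncCommitManager` for `DefaultKafkaManualCommitFactory`, and `NoopCommitManager` for any other factory;
- otherwise, whether an offset repository is configured, in which case `CommitToOffsetManager` is returned;
- otherwise, whether batching is enabled, in which case `AsyncCommitManager` is returned;
- if none of these apply, `NoopCommitManager` is returned.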
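
The selection order can be checked in isolation with a small, dependency-free model. This is a sketch only: `CommitManagerSelectionSketch`, `ManualFactoryKind`, and `Strategy` are hypothetical names introduced for illustration, and the boolean parameters stand in for the configuration lookups in the real factory.

```java
// Hypothetical model of the decision order in createCommitManager(); not Camel API.
final class CommitManagerSelectionSketch {

    // Stand-in for the instanceof checks on the endpoint's manual commit factory.
    enum ManualFactoryKind { DEFAULT_ASYNC, DEFAULT_SYNC, CUSTOM }

    // Stand-in for the concrete CommitManager class that would be created.
    enum Strategy { ASYNC, SYNC, NOOP, COMMIT_TO_OFFSET }

    static Strategy select(boolean allowManualCommit,
                           ManualFactoryKind manualFactory,
                           boolean hasOffsetRepository,
                           boolean batching) {
        if (allowManualCommit) {
            // Manual commit is checked first; only the factory type matters here.
            if (manualFactory == ManualFactoryKind.DEFAULT_ASYNC) {
                return Strategy.ASYNC;
            }
            if (manualFactory == ManualFactoryKind.DEFAULT_SYNC) {
                return Strategy.SYNC;
            }
            return Strategy.NOOP; // custom or missing factory
        }
        if (hasOffsetRepository) {
            return Strategy.COMMIT_TO_OFFSET;
        }
        if (batching) {
            return Strategy.ASYNC;
        }
        return Strategy.NOOP; // fallback when nothing else applies
    }
}
```

One consequence worth noting: when manual commit is allowed, a configured offset repository does not change which manager is selected, because that check is only reached in the other branch.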

Abstract Commit Manager Base

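All commit managers extend `AbstractCommitManager`, which holds the state and helpers they share. Judging from the excerpts in this section, these include the underlying Kafka `consumer`, the `configuration` (including the commit timeout), the `threadId` and `printableTopic` used in log messages, and `saveStateToOffsetRepository(...)` for persisting offsets externally.

For example, `forceCommit` synchronously commits the offset of a single partition. It commits `partitionLastOffset + 1` because a committed Kafka offset identifies the next record to read, not the last record processed: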

@Override
public void forceCommit(TopicPartition partition, long partitionLastOffset) {
    consumer.commitSync(
        Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
        Duration.ofMillis(configuration.getCommitTimeoutMs()));
}

Synchronous Commit Manager

`SyncCommitManager` performs blocking commits via Kafka's `commitSync()` API. It maintains an internal cache of offsets (`OffsetCache`) that have been processed but not yet committed.

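Key behaviors, as shown in the excerpt below:

- `commit(partition)` returns immediately when no offset is cached for the partition;
- otherwise it commits `offset + 1` with `commitSync()`, blocking for at most `getCommitTimeoutMs()` milliseconds;
- if an offset repository is configured, the same position is saved there as well;
- the committed entries are then removed from the cache.

Because the call blocks until the broker acknowledges the commit or the timeout expires, a normal return means the offset has been committed. The cost is lower throughput while the consumer thread waits.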

@Override
public void commit(TopicPartition partition) {
    Long offset = offsetCache.getOffset(partition);
    if (offset == null) {
        return;
    }

    consumer.commitSync(
        Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)),
        Duration.ofMillis(configuration.getCommitTimeoutMs()));

    if (offsetRepository != null) {
        saveStateToOffsetRepository(partition, offset + 1, offsetRepository);
    }

    offsetCache.removeCommittedEntries(...);
}
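
To make the cache-then-commit flow concrete, here is a minimal in-memory sketch. It makes several assumptions: partitions are plain strings, a map stands in for the broker, and the cache simply drops the entry after a commit (the arguments to `removeCommittedEntries(...)` are elided in the excerpt, so the real clean-up rule may differ). `SyncCommitSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SyncCommitManager; no Kafka classes are involved.
final class SyncCommitSketch {
    // Last processed offset per partition, not yet committed.
    private final Map<String, Long> offsetCache = new HashMap<>();
    // What a blocking commit would have stored on the broker.
    private final Map<String, Long> brokerCommitted = new HashMap<>();

    // Called after each record has been processed.
    void recordOffset(String partition, long offset) {
        offsetCache.put(partition, offset);
    }

    // Mirrors the excerpt: skip when nothing is cached, commit offset + 1,
    // then drop the cached entry. Returns whether a commit was performed.
    boolean commit(String partition) {
        Long offset = offsetCache.get(partition);
        if (offset == null) {
            return false; // nothing processed since the last commit
        }
        brokerCommitted.put(partition, offset + 1); // position of the next record to read
        offsetCache.remove(partition);              // assumption: entry is dropped once committed
        return true;
    }

    // Committed position for a partition, or null if none was committed.
    Long committed(String partition) {
        return brokerCommitted.get(partition);
    }
}
```

In this model, recording offset 41 and committing leaves the committed position at 42, and a second `commit` for the same partition is a no-op until another record is recorded.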

Asynchronous Commit Manager

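`AsyncCommitManager` uses Kafka's `commitAsync()` API, which returns without waiting for the broker's acknowledgment.

Characteristics:

- the consumer thread is not blocked while the commit is in flight;
- the outcome (the committed offsets or an exception) is delivered later through a callback;
- the factory selects this manager for `DefaultKafkaManualAsyncCommitFactory` and, when manual commit is not allowed and no offset repository is configured, for batching consumers.

When the commit completes, the callback persists the committed offsets to the external repository (only if the commit succeeded and a repository is configured) and then updates the offset cache: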

private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
    if (exception == null && offsetRepository != null) {
        for (var entry : committed.entrySet()) {
            saveStateToOffsetRepository(entry.getKey(), entry.getValue().offset(), offsetRepository);
        }
    }
    offsetCache.removeCommittedEntries(committed, exception);
}
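
The callback-driven flow can be simulated without a broker. The sketch below is an assumption-laden model, not the Camel implementation: `AsyncCommitSketch`, `completeNext`, and the in-flight queue are hypothetical, and the choice to leave the cache untouched on failure is a guess, since the excerpt does not show what `removeCommittedEntries` does with the exception.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for AsyncCommitManager; partitions are plain strings.
final class AsyncCommitSketch {
    final Map<String, Long> offsetCache = new HashMap<>();      // processed, not yet confirmed
    final Map<String, Long> offsetRepository = new HashMap<>(); // external store stand-in
    private final Queue<Map<String, Long>> inFlight = new ArrayDeque<>();

    void recordOffset(String partition, long offset) {
        offsetCache.put(partition, offset);
    }

    // Returns immediately; the outcome arrives later through postCommitCallback.
    void commitAsync() {
        Map<String, Long> request = new HashMap<>();
        offsetCache.forEach((partition, offset) -> request.put(partition, offset + 1));
        inFlight.add(request);
    }

    // Simulates the broker answering the oldest in-flight commit (null means success).
    void completeNext(Exception failure) {
        Map<String, Long> committed = inFlight.poll();
        if (committed != null) {
            postCommitCallback(committed, failure);
        }
    }

    private void postCommitCallback(Map<String, Long> committed, Exception exception) {
        if (exception != null) {
            return; // assumption: keep cached offsets so a later commit covers them
        }
        offsetRepository.putAll(committed);
        // Drop only entries this commit covered; newer offsets stay cached.
        committed.forEach((partition, next) -> offsetCache.remove(partition, next - 1));
    }
}
```

The conditional removal matters because records keep arriving while a commit is in flight: an offset recorded after the request was sent must not be discarded when the older commit is acknowledged.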

No-Op Commit Manager

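`NoopCommitManager` is a placeholder commit manager used when offset commits are handled by Kafka's built-in auto-commit mechanism. The factory also returns it as the fallback when no other strategy applies, including when manual commit is allowed but the endpoint uses a custom `KafkaManualCommitFactory`.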

@Override
public void commit() {
    LOG.info("Auto commit on {} from {} is enabled via Kafka consumer (NO-OP)", threadId, printableTopic);
}

@Override
public void commit(TopicPartition partition) {
    LOG.debug("Auto commit to offset {} from topic {} is disabled (NO-OP)", threadId, partition.topic());
}

Manual Commit Integration

All commit managers support generating `KafkaManualCommit` instances that allow explicit offset commits from within Camel routes.

They leverage the `KafkaManualCommitFactory` from the endpoint configuration to instantiate manual commit objects that encapsulate the necessary context (exchange, consumer, partition, offset, timeout).

This integration enables users to control offset commits explicitly, overriding automatic commit behavior.
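
The idea of a commit handle that captures its own context can be sketched without Camel or Kafka on the classpath. Everything below is hypothetical: `ManualCommit` and `ManualCommitFactory` only imitate the roles of `KafkaManualCommit` and `KafkaManualCommitFactory`, and a map stands in for the broker. The real objects also carry the exchange, the consumer, and the commit timeout.

```java
import java.util.Map;

// Hypothetical model of manual commit handles; not the Camel API.
final class ManualCommitSketch {

    // Plays the role of KafkaManualCommit: the route calls commit() when it is done.
    interface ManualCommit {
        void commit();
    }

    // Plays the role of KafkaManualCommitFactory: binds the commit context to a handle.
    interface ManualCommitFactory {
        ManualCommit create(String partition, long recordOffset, Map<String, Long> committedOffsets);
    }

    // A synchronous-style factory: commit() stores recordOffset + 1 immediately.
    static final ManualCommitFactory SYNC_FACTORY =
            (partition, recordOffset, committedOffsets) ->
                    () -> committedOffsets.put(partition, recordOffset + 1);

    // Imitates a route step: process the record, then acknowledge it explicitly.
    static void process(String body, ManualCommit manualCommit) {
        if (body.isEmpty()) {
            // No commit happens, so the record would be read again after a restart.
            throw new IllegalArgumentException("empty body");
        }
        manualCommit.commit();
    }
}
```

The design point is that the processing code never needs to know the partition or offset. It only decides whether to call `commit()`, which is what lets a route acknowledge a record after its own success criteria are met.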

Interaction with Other Modules

Design Patterns and Approaches

Summary of Commit Manager Classes

| Commit Manager | Commit Mode | Blocking | Uses Offset Repository | Manual Commit Support | Typical Use Case |
| --- | --- | --- | --- | --- | --- |
| `SyncCommitManager` | Synchronous | Yes | Optional | Yes | Reliable commit with lower throughput |
| `AsyncCommitManager` | Asynchronous | No | Optional | Yes | High throughput with eventual commit confirmation |
| `NoopCommitManager` | No operation | N/A | No | Limited | Kafka auto-commit enabled, no manual control |
| `CommitToOffsetManager`* | External storage | Varies | Yes | Yes | Offset commits persisted in external repository |

*Note: `CommitToOffsetManager` was referenced but its implementation is outside the provided code.


Mermaid Sequence Diagram: Offset Commit Workflow

sequenceDiagram
    participant Consumer as Kafka Consumer
    participant CommitMgr as CommitManager
    participant Kafka as Kafka Broker
    participant OffsetRepo as Offset Repository

    Note over Consumer: Message processing loop
    Consumer->>CommitMgr: recordOffset(partition, offset)
    CommitMgr->>CommitMgr: cache offset internally

    alt Auto commit enabled & No manual commit
        CommitMgr->>Kafka: commitAsync() or commitSync() as per strategy
    else Manual commit enabled
        CommitMgr->>Consumer: expose KafkaManualCommit for manual offset control
        Note right of CommitMgr: Manual commit triggered by route
        CommitMgr->>Kafka: commitSync() or commitAsync() for manual commit
    end

    alt Offset Repository configured
        CommitMgr->>OffsetRepo: save offset state externally
    end

    Kafka-->>CommitMgr: commit acknowledgment or callback
    CommitMgr->>CommitMgr: update cache to remove committed offsets

This diagram illustrates how the commit manager records offsets during consumption, commits them either automatically or on a manual trigger from the route, relies on the Kafka broker to persist the commits, and optionally saves offset state to an external repository. The offset cache ensures that only uncommitted offsets are tracked.


This detailed overview explains the purpose, operation, and integration of the Offset Commit Management module within the Apache Camel Kafka component, highlighting its flexible commit strategies and how it supports robust message consumption semantics.