OffsetCache.java

Overview

`OffsetCache.java` is a utility class within the Kafka consumer component of the Apache Camel integration framework. Its primary purpose is to maintain an in-memory cache of the latest processed offsets for Kafka topic partitions. This caching mechanism supports efficient offset management by tracking offsets that have been processed but not yet committed to Kafka.

The class helps coordinate offset commits: an entry is removed from the cache only after its offset has been committed successfully, which reduces the risk of lost or duplicated processing after a failure. It uses a thread-safe map to support the concurrent access typical of Kafka consumer environments.


Class: OffsetCache

Description

`OffsetCache` is a final class responsible for storing and managing the latest processed offsets for Kafka topic partitions. It acts as a temporary storage to keep track of offsets that have been processed by the consumer but might still need to be committed to Kafka. This allows the consumer to retry committing offsets in case of transient failures, improving reliability.


Properties

| Property Name | Type | Description |
|---|---|---|
| `lastProcessedOffset` | `Map<TopicPartition, Long>` | A concurrent map storing the latest processed offset per topic partition. |
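The storage side of the class can be pictured with a minimal sketch. It assumes the map is a `ConcurrentHashMap` (the document only says "concurrent map"), and `PartitionKey` and `OffsetStoreSketch` are hypothetical names used so the example compiles without the Kafka client library; this is not the Camel source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record PartitionKey(String topic, int partition) {}

final class OffsetStoreSketch {
    // One entry per partition; assumed to be a ConcurrentHashMap so that
    // several threads can record and read offsets without extra locking.
    private final Map<PartitionKey, Long> lastProcessedOffset = new ConcurrentHashMap<>();

    // Overwrites any earlier offset recorded for the same partition.
    void recordOffset(PartitionKey partition, long partitionLastOffset) {
        lastProcessedOffset.put(partition, partitionLastOffset);
    }

    // Returns null when nothing is cached for the partition.
    Long getOffset(PartitionKey partition) {
        return lastProcessedOffset.get(partition);
    }

    long cacheSize() {
        return lastProcessedOffset.size();
    }

    boolean contains(PartitionKey partition) {
        return lastProcessedOffset.containsKey(partition);
    }
}
```

Because the key is the partition, recording a second offset for the same partition replaces the first one rather than growing the cache.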


Methods

recordOffset

public void recordOffset(TopicPartition partition, long partitionLastOffset)

removeCommittedEntries

public void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)

removeCommittedEntry

private void removeCommittedEntry(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)

getOffset

public Long getOffset(TopicPartition partition)

cacheSize

public long cacheSize()

contains

public boolean contains(TopicPartition topicPartition)
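Of the methods listed above, `removeCommittedEntries` carries the behaviour described in the overview: entries leave the cache only after a successful commit. The sketch below is one way that rule could look, not the Camel implementation. `Tp` and `Committed` are hypothetical stand-ins for Kafka's `TopicPartition` and `OffsetAndMetadata`, and the unconditional removal in `removeCommittedEntry` is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for Kafka's TopicPartition and OffsetAndMetadata.
record Tp(String topic, int partition) {}
record Committed(long offset) {}

final class CommitCleanupSketch {
    private final Map<Tp, Long> lastProcessedOffset = new ConcurrentHashMap<>();

    void recordOffset(Tp partition, long partitionLastOffset) {
        lastProcessedOffset.put(partition, partitionLastOffset);
    }

    // Same (offsets, exception) parameter shape as the documented method.
    void removeCommittedEntries(Map<Tp, Committed> committed, Exception exception) {
        if (exception != null) {
            // Commit failed: keep every entry so the commit can be retried.
            return;
        }
        committed.forEach(this::removeCommittedEntry);
    }

    private void removeCommittedEntry(Tp partition, Committed offsetAndMetadata) {
        // Assumption: the entry is dropped as soon as its partition is committed,
        // regardless of the committed offset value.
        lastProcessedOffset.remove(partition);
    }

    boolean contains(Tp partition) {
        return lastProcessedOffset.containsKey(partition);
    }
}
```

A stricter variant could call `Map.remove(key, value)` so that an entry is dropped only when the cached offset still equals the committed one; whether the real class does this is not stated here.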

Important Implementation Details


Interaction with Other Components

By tracking processed offsets until their commit succeeds, this class supports the at-least-once or exactly-once processing semantics that the Kafka consumer integration is configured for.
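The parameters of `removeCommittedEntries` have the same shape as Kafka's `OffsetCommitCallback.onComplete(Map<TopicPartition, OffsetAndMetadata>, Exception)`, which suggests the method can serve as a commit callback. That wiring is an assumption, not something this page confirms. The sketch below illustrates the resulting record, commit, retry loop with hypothetical names throughout (`TopicPart`, `OffsetMeta`, `CommitLoopSketch`, and a simulated `commit` method in place of a real asynchronous Kafka commit).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical stand-ins for Kafka's TopicPartition and OffsetAndMetadata.
record TopicPart(String topic, int partition) {}
record OffsetMeta(long offset) {}

final class CommitLoopSketch {
    // Offsets that were processed but not yet confirmed as committed.
    final Map<TopicPart, Long> pending = new ConcurrentHashMap<>();

    // Callback with the same (offsets, exception) shape as removeCommittedEntries.
    void onCommitComplete(Map<TopicPart, OffsetMeta> committed, Exception exception) {
        if (exception == null) {
            committed.keySet().forEach(pending::remove);
        }
    }

    // Builds a commit request from whatever is still pending.
    Map<TopicPart, OffsetMeta> snapshot() {
        Map<TopicPart, OffsetMeta> offsets = new HashMap<>();
        pending.forEach((tp, offset) -> offsets.put(tp, new OffsetMeta(offset)));
        return offsets;
    }

    // Simulated committer: it performs no I/O and only reports an outcome.
    static void commit(Map<TopicPart, OffsetMeta> offsets, boolean brokerAvailable,
                       BiConsumer<Map<TopicPart, OffsetMeta>, Exception> callback) {
        callback.accept(offsets,
                brokerAvailable ? null : new IllegalStateException("simulated commit failure"));
    }
}
```

In this sketch a failed commit leaves `pending` untouched, so the next call to `snapshot()` includes the same offset again and the commit can be retried.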


Class Diagram

classDiagram
    class OffsetCache {
        -Map<TopicPartition, Long> lastProcessedOffset
        +void recordOffset(TopicPartition partition, long partitionLastOffset)
        +void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)
        -void removeCommittedEntry(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
        +Long getOffset(TopicPartition partition)
        +long cacheSize()
        +boolean contains(TopicPartition topicPartition)
    }

Summary

`OffsetCache.java` is a concise, thread-safe caching utility that manages the lifecycle of processed Kafka offsets within Apache Camel's Kafka component. It removes offsets from the cache only after a successful commit, which lets failed commits be retried. Its small API provides methods to record, query, and clean up offsets.