OffsetCache.java
Overview
`OffsetCache.java` is a utility class within the Kafka consumer component of the Apache Camel integration framework. Its primary purpose is to maintain an in-memory cache of the latest processed offsets for Kafka topic partitions. This caching mechanism supports efficient offset management by tracking offsets that have been processed but not yet committed to Kafka.
The class helps coordinate offset commits, ensuring that only offsets that are successfully committed are removed from the cache, preventing data loss or duplicate processing in case of failures. It uses thread-safe data structures to support concurrent access typical in Kafka consumer environments.
Class: OffsetCache
Description
`OffsetCache` is a final class responsible for storing and managing the latest processed offsets for Kafka topic partitions. It acts as a temporary storage to keep track of offsets that have been processed by the consumer but might still need to be committed to Kafka. This allows the consumer to retry committing offsets in case of transient failures, improving reliability.
Properties
| Property Name | Type | Description |
|---|---|---|
| `lastProcessedOffset` | `Map<TopicPartition, Long>` | A concurrent map storing the latest processed offset per topic partition. |
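To make the shape of this property concrete, here is a minimal sketch of a cache built around a concurrent map. It is not the Camel source: the class name `OffsetCacheSketch` and the nested `TopicPartition` record are hypothetical stand-ins so the example runs on a plain JDK (16 or later) without `kafka-clients`, and the method bodies are assumptions based on the descriptions in this document.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; the real OffsetCache may differ in detail.
final class OffsetCacheSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // One entry per partition, holding the latest processed offset.
    private final Map<TopicPartition, Long> lastProcessedOffset = new ConcurrentHashMap<>();

    void recordOffset(TopicPartition partition, long partitionLastOffset) {
        // Assumption: a newer offset simply replaces the previous one.
        lastProcessedOffset.put(partition, partitionLastOffset);
    }

    Long getOffset(TopicPartition partition) {
        // Returns null when the partition has no cached offset.
        return lastProcessedOffset.get(partition);
    }

    long cacheSize() {
        return lastProcessedOffset.size();
    }

    boolean contains(TopicPartition partition) {
        return lastProcessedOffset.containsKey(partition);
    }

    public static void main(String[] args) {
        OffsetCacheSketch cache = new OffsetCacheSketch();
        TopicPartition tp = new TopicPartition("my-topic", 0);
        cache.recordOffset(tp, 12345L);
        cache.recordOffset(tp, 12346L); // overwrites the earlier value
        System.out.println(cache.getOffset(tp));                               // 12346
        System.out.println(cache.cacheSize());                                 // 1
        System.out.println(cache.contains(new TopicPartition("my-topic", 1))); // false
    }
}
```

Keying the map by partition means the cache holds at most one entry per partition, so its size is bounded by the number of partitions the consumer handles.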
Methods
recordOffset
public void recordOffset(TopicPartition partition, long partitionLastOffset)
Description:
Records the latest processed offset for a specific Kafka topic partition. This updates the cache with the most recent offset that the consumer has processed but not necessarily committed.
Parameters:
`partition` - The `TopicPartition` object representing the Kafka topic and partition.
`partitionLastOffset` - The offset (as a `long`) that has been processed for the given partition.
Returns:
void
Usage Example:
TopicPartition tp = new TopicPartition("my-topic", 0);
offsetCache.recordOffset(tp, 12345L);
removeCommittedEntries
public void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)
Description:
Removes entries from the cache corresponding to offsets that have been successfully committed to Kafka. If the commit operation failed (indicated by a non-null exception), it logs the error and does not remove any entries, allowing for retry.
Parameters:
`committed` - A map of `TopicPartition` to `OffsetAndMetadata` representing the offsets that the commit attempted to write.
`exception` - An `Exception` object indicating whether the commit operation resulted in an error. If `null`, the commit was successful.
Returns:
void
Implementation Detail:
If `exception` is `null`, the method iterates over the committed offsets and calls `removeCommittedEntry` for each. Otherwise, it logs the error.
Usage Example:
Map<TopicPartition, OffsetAndMetadata> committedOffsets = ...;
offsetCache.removeCommittedEntries(committedOffsets, null);
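The success and failure branches described above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: `CommitCallbackSketch` and its nested `TopicPartition` and `OffsetAndMetadata` records are hypothetical JDK-only stand-ins for the Kafka client classes, and `System.err`/`System.out` stand in for the SLF4J logging the source mentions. The `(Map, Exception)` parameter list has the same shape as Kafka's `OffsetCommitCallback.onComplete`, which suggests the method can act as a commit callback, though this document does not say so explicitly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; stand-in types keep it runnable without kafka-clients.
final class CommitCallbackSketch {

    record TopicPartition(String topic, int partition) {} // hypothetical stand-in
    record OffsetAndMetadata(long offset) {}              // hypothetical stand-in

    private final Map<TopicPartition, Long> lastProcessedOffset = new ConcurrentHashMap<>();

    void recordOffset(TopicPartition partition, long offset) {
        lastProcessedOffset.put(partition, offset);
    }

    long cacheSize() {
        return lastProcessedOffset.size();
    }

    void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
        if (exception == null) {
            // Commit succeeded: drop every partition that was part of it.
            committed.forEach(this::removeCommittedEntry);
        } else {
            // Commit failed: keep all entries so a later attempt can retry them.
            System.err.println("Failed to commit offsets: " + exception.getMessage());
        }
    }

    private void removeCommittedEntry(TopicPartition partition, OffsetAndMetadata metadata) {
        System.out.println("Offset " + metadata.offset() + " committed for " + partition);
        lastProcessedOffset.remove(partition);
    }

    public static void main(String[] args) {
        CommitCallbackSketch cache = new CommitCallbackSketch();
        TopicPartition p0 = new TopicPartition("my-topic", 0);
        TopicPartition p1 = new TopicPartition("my-topic", 1);
        cache.recordOffset(p0, 100L);
        cache.recordOffset(p1, 200L);

        // Failed commit: nothing is removed.
        cache.removeCommittedEntries(Map.of(p0, new OffsetAndMetadata(100L)),
                new RuntimeException("simulated failure"));
        System.out.println(cache.cacheSize()); // 2

        // Successful commit covering only p0: only that entry is removed.
        cache.removeCommittedEntries(Map.of(p0, new OffsetAndMetadata(100L)), null);
        System.out.println(cache.cacheSize()); // 1
    }
}
```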
removeCommittedEntry
private void removeCommittedEntry(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
Description:
Helper method that removes a single committed offset from the cache and logs the removal at debug level.
Parameters:
`topicPartition` - The `TopicPartition` whose offset was committed.
`offsetAndMetadata` - The committed offset metadata.
Returns:
void
Implementation Detail:
Logs the committed offset and removes the partition key from `lastProcessedOffset`.
getOffset
public Long getOffset(TopicPartition partition)
Description:
Retrieves the last processed offset for a given topic partition from the cache.
Parameters:
`partition` - The `TopicPartition` for which to retrieve the offset.
Returns:
`Long` - The last processed offset if present; otherwise, `null`.
Usage Example:
Long offset = offsetCache.getOffset(new TopicPartition("my-topic", 0));
if (offset != null) {
    System.out.println("Last processed offset: " + offset);
}
cacheSize
public long cacheSize()
Description:
Returns the current number of entries (topic partitions) tracked in the cache.
Returns:
`long` representing the number of cached entries.
Usage Example:
long size = offsetCache.cacheSize();
System.out.println("Cache size: " + size);
contains
public boolean contains(TopicPartition topicPartition)
Description:
Checks if the cache contains an entry for a specific topic partition.
Parameters:
`topicPartition` - The `TopicPartition` to check for existence.
Returns:
`boolean` - `true` if the partition is in the cache, `false` otherwise.
Usage Example:
boolean exists = offsetCache.contains(new TopicPartition("my-topic", 0));
Important Implementation Details
Thread Safety:
The class uses a `ConcurrentHashMap` to store the offsets, ensuring thread-safe operations as Kafka consumers often operate in concurrent environments.
Error Handling:
The `removeCommittedEntries` method handles commit failures by checking the `exception` parameter. If an error occurred during commit, it logs the failure rather than removing offsets prematurely.
Logging:
Uses SLF4J for logging important events such as successful offset commits and errors during commits, aiding debugging and operational monitoring.
Design Choice:
The class is marked `final` to prevent inheritance, indicating it is intended as a utility/helper class within the Kafka consumer component.
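As a rough illustration of the thread-safety point, the sketch below has several threads record offsets into one `ConcurrentHashMap` at the same time. The class `ConcurrentRecordingSketch`, the method `recordConcurrently`, and the string keys standing in for `TopicPartition` are all hypothetical. Each writer owns one key, so the final value per key is deterministic; the example demonstrates only that concurrent insertions into the shared map are safe, not any ordering guarantee between threads that write the same key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; names and key format are assumptions.
final class ConcurrentRecordingSketch {

    // Starts `writers` threads; each records offsets 0..lastOffset for its own key.
    static Map<String, Long> recordConcurrently(int writers, long lastOffset) {
        Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        for (int w = 0; w < writers; w++) {
            String partitionKey = "my-topic-" + w; // stand-in for a TopicPartition
            pool.execute(() -> {
                for (long offset = 0; offset <= lastOffset; offset++) {
                    lastProcessedOffset.put(partitionKey, offset);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return lastProcessedOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> result = recordConcurrently(4, 999L);
        System.out.println(result.size());            // 4
        System.out.println(result.get("my-topic-0")); // 999
    }
}
```

With a plain `HashMap`, concurrent `put` calls like these could corrupt the map or lose entries; `ConcurrentHashMap` makes each individual operation safe without external locking.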
Interaction with Other Components
Kafka Consumer:
The `OffsetCache` works closely with the Kafka consumer logic inside the Apache Camel Kafka component. It tracks offsets after messages are processed but before commits are sent to Kafka, supporting the commit lifecycle.
Offset Commit Process:
After processing messages, offsets are recorded in the cache. When commit requests complete, this class removes committed offsets from the cache if successful, or retains them for retry on failure.
TopicPartition and OffsetAndMetadata:
Relies on Kafka client classes:
`TopicPartition` identifies the topic and partition.
`OffsetAndMetadata` encapsulates the committed offset along with optional metadata.
By keeping uncommitted offsets available for retry, this class supports at-least-once message processing in the Kafka consumer integration. An in-memory cache alone does not provide exactly-once guarantees.
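The record, commit, and retry cycle described in this section can be simulated in a few lines. Everything here is hypothetical scaffolding rather than Camel or Kafka API: `CommitRetrySketch`, the `Committer` interface, `commitWithRetry`, and the string partition keys exist only for illustration, and the commit is synchronous so the flow is easy to follow. The point is simply that entries leave the cache only after a successful commit, so a failed attempt leaves them in place for the next one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Illustrative simulation only; not how Camel schedules commits.
final class CommitRetrySketch {

    // Hypothetical committer: receives offsets and reports (offsets, error) on completion.
    interface Committer {
        void commit(Map<String, Long> offsets, BiConsumer<Map<String, Long>, Exception> onComplete);
    }

    // Retries until the cache is empty or maxAttempts is reached; returns attempts made.
    static int commitWithRetry(Map<String, Long> cache, Committer committer, int maxAttempts) {
        int attempts = 0;
        while (!cache.isEmpty() && attempts < maxAttempts) {
            attempts++;
            Map<String, Long> pending = new HashMap<>(cache); // snapshot of what to commit
            committer.commit(pending, (committed, error) -> {
                if (error == null) {
                    // Success: remove the committed partitions from the cache.
                    committed.keySet().forEach(cache::remove);
                }
                // Failure: leave the cache untouched so the loop can retry.
            });
        }
        return attempts;
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new ConcurrentHashMap<>();
        cache.put("my-topic-0", 41L);
        cache.put("my-topic-1", 17L);

        int[] calls = {0};
        // Fails on the first call, succeeds afterwards.
        Committer flaky = (offsets, onComplete) -> {
            calls[0]++;
            onComplete.accept(offsets,
                    calls[0] == 1 ? new RuntimeException("simulated transient failure") : null);
        };

        int attempts = commitWithRetry(cache, flaky, 3);
        System.out.println(attempts);        // 2
        System.out.println(cache.isEmpty()); // true
    }
}
```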
Class Diagram
classDiagram
class OffsetCache {
-Map<TopicPartition, Long> lastProcessedOffset
+void recordOffset(TopicPartition partition, long partitionLastOffset)
+void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception)
-void removeCommittedEntry(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
+Long getOffset(TopicPartition partition)
+long cacheSize()
+boolean contains(TopicPartition topicPartition)
}
Summary
`OffsetCache.java` is a concise, thread-safe caching utility critical for managing the lifecycle of processed Kafka offsets within Apache Camel's Kafka component. It ensures that offsets are only removed from the cache after successful commits, thereby supporting robust and reliable Kafka consumer offset management. Its simple API provides methods to record, query, and clean up offsets, integrating seamlessly with Kafka consumer workflows.