Cache Management
Purpose
Cache Management maintains a bounded, in-memory set of processed message IDs for the Kafka-backed idempotent repository. This local cache tracks seen message IDs so that deduplication decisions can be made quickly, without querying Kafka for each message. It prevents unbounded memory growth by enforcing a maximum size and evicting the least recently used entries once that size is reached.
As a result, the idempotent consumer can identify duplicates with low latency while keeping memory usage under control, which matters when handling large volumes of messages in distributed environments.
Functionality
At its core, Cache Management uses an LRU (Least Recently Used) cache implementation to hold message IDs that have been processed. Key workflows and characteristics include:
Bounded Cache Size: The cache limits the number of stored message IDs to a configurable maximum (default: 1000), preventing excessive memory consumption.
LRU Eviction Policy: When the cache reaches its maximum size, the least recently used entries are evicted to make room for new ones, maintaining cache freshness.
Thread-Safe Local Storage: The cache is a thread-safe map (created via `LRUCacheFactory.newLRUCache(maxCacheSize)`) that supports concurrent access and updates during message processing.
Cache Population on Startup: Upon startup, the cache is rebuilt by consuming all prior state messages from a dedicated Kafka topic, ensuring the local cache reflects the latest cluster-wide idempotent state.
Real-Time Cache Updates: Subsequent cache mutations (adds, removes, clears) are broadcast to peers as messages on the Kafka topic, and each node applies the messages it consumes to its local cache to stay synchronized. The synchronization protocol belongs to Cache Synchronization; only the local caching mechanism is discussed here.
Cache Counters and Metrics: The cache tracks the number of processed state sync events for monitoring and management visibility.
Example snippet showing cache initialization and usage:
// Create an LRU cache with the configured maximum size
this.cache = LRUCacheFactory.newLRUCache(maxCacheSize);
// Record a message ID only if it has not been seen yet, then broadcast the addition to peers
if (!cache.containsKey(key)) {
    cache.put(key, key);
    broadcastAction(key, CacheAction.add);
}
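The bounded-size and LRU-eviction behavior described above can be illustrated with a minimal, self-contained sketch. It does not use the `LRUCacheFactory` referenced above. As an assumption for illustration, it substitutes a `java.util.LinkedHashMap` in access order wrapped with `Collections.synchronizedMap`, and the names `BoundedLruSketch` and `newLruCache` are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for LRUCacheFactory.newLRUCache; not the repository's actual cache.
class BoundedLruSketch {

    // Returns a synchronized, access-ordered map that drops its least
    // recently used entry whenever an insert pushes it past maxSize.
    static <K, V> Map<K, V> newLruCache(final int maxSize) {
        Map<K, V> lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
        return Collections.synchronizedMap(lru);
    }

    public static void main(String[] args) {
        Map<String, String> cache = newLruCache(2);
        cache.put("msg-1", "msg-1");
        cache.put("msg-2", "msg-2");
        cache.get("msg-1");            // touch msg-1 so msg-2 becomes the eldest entry
        cache.put("msg-3", "msg-3");   // exceeds the bound, so msg-2 is evicted
        System.out.println(cache.keySet()); // prints [msg-1, msg-3]
    }
}
```

In this stand-in, only `get` and `put` refresh an entry's recency; `containsKey` does not. Whether the real cache behaves the same way depends on its implementation and is not established here.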
Integration
Cache Management is a foundational component of the Kafka-backed idempotent repository, tightly integrated with other subtopics:
Cache Synchronization: The local cache is continuously updated by consuming messages from the Kafka topic that represent cache actions (add, remove, clear). Cache Management provides the data structure and local operations, while synchronization ensures consistency across distributed nodes.
Kafka Topic State: The cache is backed by a Kafka topic that stores all cache mutations. On startup, the repository consumes this topic fully to populate the cache, enabling fault-tolerant recovery and shared state.
Idempotent Repository API: Cache Management supports the repository operations (`add`, `contains`, `remove`) by serving as the primary fast-access store for message IDs, minimizing Kafka interaction latency.
Broadcasting Changes: When the local cache changes, the change is broadcast as a Kafka message so that the new state propagates cluster-wide.
Isolating cache size management and eviction policy in this layer lets the idempotent repository scale while preserving correctness and consistency in distributed Kafka consumer deployments.
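The startup rebuild and the ongoing synchronization described in this section both amount to replaying a log of cache actions against the local map. The following is a sketch under stated assumptions: `CacheEvent`, `apply`, and `replay` are hypothetical names, an in-memory list stands in for the Kafka topic, and a plain `LinkedHashMap` stands in for the LRU cache. Only the `add`, `remove`, and `clear` actions come from the text.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CacheReplaySketch {

    // Actions named in the text; this enum definition is an assumption for illustration.
    enum CacheAction { add, remove, clear }

    // Hypothetical event type standing in for one record on the Kafka topic.
    static final class CacheEvent {
        final String key;
        final CacheAction action;

        CacheEvent(String key, CacheAction action) {
            this.key = key;
            this.action = action;
        }
    }

    // Applies a single cache action to the local map.
    static void apply(Map<String, String> cache, CacheEvent event) {
        switch (event.action) {
            case add:
                cache.put(event.key, event.key);
                break;
            case remove:
                cache.remove(event.key);
                break;
            case clear:
                cache.clear();
                break;
        }
    }

    // Rebuilds a cache from scratch by replaying every event in order.
    static Map<String, String> replay(List<CacheEvent> log) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (CacheEvent event : log) {
            apply(cache, event);
        }
        return cache;
    }

    public static void main(String[] args) {
        List<CacheEvent> log = Arrays.asList(
                new CacheEvent("msg-1", CacheAction.add),
                new CacheEvent("msg-2", CacheAction.add),
                new CacheEvent("msg-1", CacheAction.remove),
                new CacheEvent("msg-3", CacheAction.add));
        System.out.println(replay(log).keySet()); // prints [msg-2, msg-3]
    }
}
```

Because every node applies the same ordered sequence of actions, nodes that have consumed the same prefix of the log hold the same set of keys, up to whatever local eviction has removed.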
Diagram
classDiagram
class KafkaIdempotentRepository {
- Map<String, String> cache
- int maxCacheSize
- long cacheCounter
+ add(String key) bool
+ contains(String key) bool
+ remove(String key) bool
+ clear()
+ getCacheSize() int
+ getCacheCounter() long
}
class LRUCacheFactory {
+ newLRUCache(int maxSize) Map
}
KafkaIdempotentRepository --> LRUCacheFactory : uses
KafkaIdempotentRepository : -populateCache()
KafkaIdempotentRepository : -addToCache(ConsumerRecord)
KafkaIdempotentRepository : -broadcastAction(String key, CacheAction action)
This class diagram highlights that the `KafkaIdempotentRepository` relies on an LRU cache instance created via the `LRUCacheFactory` to maintain a bounded set of processed message IDs. Key methods manipulate and query this cache while managing synchronization and broadcasting state changes.