Cache Synchronization
Purpose
Cache Synchronization addresses the challenge of keeping a consistent, up-to-date local cache of processed message IDs across multiple distributed instances of the Kafka-backed idempotent repository. Each instance runs independently, potentially on a different node, and must reflect changes (additions, removals, clears) made by any peer in near real time. This synchronization keeps deduplication decisions consistent across the cluster, so that a message ID recorded by one instance is soon recognized as a duplicate by the others.
Without this synchronization, each node’s cache would be stale or divergent, leading to incorrect idempotency behavior. Cache Synchronization solves this by leveraging a dedicated Kafka topic as a shared state channel for broadcasting cache state changes and by consuming that topic to apply updates locally.
Functionality
The synchronization mechanism is built around a Kafka consumer-producer pair operating on a Kafka topic that is dedicated to the repository's cache state and shared by all of its instances:
Broadcasting Cache Mutations:
Whenever the local cache is modified via operations such as `add(String key)`, `remove(String key)`, or `clear()`, the change is immediately and synchronously published as a Kafka message to the dedicated topic. The message key holds the message ID (a `clear` message has no key), and the message value encodes the cache action (`add`, `remove`, or `clear`).
Consuming Cache Updates:
A background `TopicPoller` thread continuously polls the Kafka topic, consuming the cache mutation messages sent by all cluster nodes, including its own. For each consumed message, it applies the corresponding action to the in-memory cache.
Startup Cache Population:
On service start, before accepting any deduplication queries, the repository consumes the entire Kafka topic from the beginning and replays all past cache actions, rebuilding the current global cache state. This ensures the local cache is fully warmed up and consistent with the cluster state.
Configurable Polling and Sync Behavior:
The poll duration (timeout) for Kafka consumer polling is configurable to balance cache freshness against network load. Synchronization can be limited to startup only (`startupOnly`) or can continue in the background for as long as the repository runs.
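The sketch below makes the encoding and replay steps concrete. It is minimal and self-contained, and it swaps Kafka for an in-memory append-only list. The names `SyncDemo`, `SharedLog`, and `Node` are hypothetical and exist only for this illustration; a real deployment would use a `KafkaProducer` and `KafkaConsumer` against the dedicated topic. Each log entry carries the message ID as its key (null for `clear`) and the action name as its value. A node that starts late rebuilds its cache by replaying the log from position 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SyncDemo {
    enum CacheAction { add, remove, clear }

    // Hypothetical stand-in for the dedicated Kafka topic: an append-only log of (key, value) records.
    static class SharedLog {
        record Entry(String key, String value) {}
        private final List<Entry> entries = new ArrayList<>();

        synchronized void append(String key, String value) { entries.add(new Entry(key, value)); }

        // Copy of all records from `offset` to the current end of the log.
        synchronized List<Entry> readFrom(int offset) {
            return new ArrayList<>(entries.subList(offset, entries.size()));
        }
    }

    // Hypothetical repository instance whose local cache is rebuilt and refreshed from the log.
    static class Node {
        private final SharedLog log;
        private final Set<String> cache = new LinkedHashSet<>();
        private int offset = 0; // position of the next record this node has not applied yet

        Node(SharedLog log) {
            this.log = log;
            poll(); // startup population: replay the whole log from the beginning
        }

        // Local mutations update the cache first, then broadcast: key = message ID (null for clear).
        void add(String id)    { cache.add(id);    log.append(id, CacheAction.add.name()); }
        void remove(String id) { cache.remove(id); log.append(id, CacheAction.remove.name()); }
        void clear()           { cache.clear();    log.append(null, CacheAction.clear.name()); }

        // One polling round: apply every record appended since the last round, including our own.
        void poll() {
            for (SharedLog.Entry e : log.readFrom(offset)) {
                switch (CacheAction.valueOf(e.value())) {
                    case add -> cache.add(e.key());
                    case remove -> cache.remove(e.key());
                    case clear -> cache.clear();
                }
                offset++;
            }
        }

        boolean contains(String id) { return cache.contains(id); }
    }

    public static void main(String[] args) {
        SharedLog topic = new SharedLog();
        Node a = new Node(topic);
        a.add("msg-1");
        a.add("msg-2");
        a.remove("msg-1");

        Node b = new Node(topic); // starts later and rebuilds its cache from the full log
        System.out.println(b.contains("msg-1") + " " + b.contains("msg-2")); // prints "false true"

        b.clear();
        a.poll(); // a sees b's clear on its next polling round
        System.out.println(a.contains("msg-2")); // prints "false"
    }
}
```

A node also re-applies its own broadcasts when it polls. In this model that is harmless: replaying the log in order always ends in the state the log describes.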
Key Workflows
Adding a Message ID:
Check if the key exists in the local cache.
If absent, add the key locally.
Broadcast an `add` action for the key to the Kafka topic.
Other instances receive this and add the key to their caches.
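As described, adding is a check-then-act sequence, so it is only as strong as the most recent poll. Two instances that have not yet consumed each other's broadcast can both find a key absent and both accept it. The sketch below shows this. `AddWorkflow`, `Instance`, and the `Broadcaster` interface are hypothetical names, and `Broadcaster` stands in for the synchronous Kafka send. None of them are the repository's API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AddWorkflow {
    // Hypothetical sink for cache actions; in the repository this role is played by a Kafka send.
    interface Broadcaster { void send(String key, String action); }

    static class Instance {
        private final Set<String> cache = new HashSet<>();
        private final Broadcaster out;

        Instance(Broadcaster out) { this.out = out; }

        // Returns true if the key was new to this instance's cache (i.e. not a known duplicate).
        boolean add(String key) {
            if (cache.contains(key)) {
                return false;          // 1. already seen locally: treat as duplicate
            }
            cache.add(key);            // 2. record it locally
            out.send(key, "add");      // 3. tell the other instances
            return true;
        }

        // Applying a consumed add (from a peer or from ourselves) is idempotent.
        void applyAdd(String key) { cache.add(key); }
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        Broadcaster bus = (k, a) -> topic.add(a + ":" + k);
        Instance first = new Instance(bus);
        Instance second = new Instance(bus);

        System.out.println(first.add("order-42"));  // true
        System.out.println(first.add("order-42"));  // false: local duplicate
        // `second` has not consumed first's broadcast yet, so it also accepts the key.
        System.out.println(second.add("order-42")); // true
        System.out.println(topic);                  // [add:order-42, add:order-42]
    }
}
```

The window closes once `second` polls and applies the `add`. After that, the same key is rejected on both instances.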
Removing a Message ID:
Remove the key from the local cache.
Broadcast a `remove` action for the key to the Kafka topic.
Other instances receive this and remove the key from their caches.
Clearing the Cache:
Clear the local cache.
Broadcast a `clear` action with no key to the Kafka topic.
Other instances clear their caches accordingly.
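Kafka guarantees ordering only within a partition, and a keyless `clear` makes that matter. With the default partitioner, keyed records are placed by a hash of their key. A keyless record goes to whatever partition the producer's partitioner picks. So if the sync topic has more than one partition, an instance reading several partitions is not guaranteed to see a `clear` in the same position, relative to `add` records on other partitions, as the instance that produced it. The sketch below uses a hypothetical `CacheRecord(key, value)` model and a plain `replay` function instead of Kafka. It shows that the same records replayed in different orders produce different caches.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ClearOrdering {
    // Hypothetical model of a sync record: key = message ID (null for clear), value = action name.
    record CacheRecord(String key, String value) {}

    // Apply records in the given order to an empty cache and return the resulting state.
    static Set<String> replay(List<CacheRecord> records) {
        Set<String> cache = new HashSet<>();
        for (CacheRecord r : records) {
            switch (r.value()) {
                case "add" -> cache.add(r.key());
                case "remove" -> cache.remove(r.key());
                case "clear" -> cache.clear();
                default -> throw new IllegalArgumentException("Unknown action: " + r.value());
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        CacheRecord addX = new CacheRecord("x", "add");
        CacheRecord clear = new CacheRecord(null, "clear");
        CacheRecord removeY = new CacheRecord("y", "remove");

        // Same records, two interleavings: the position of the keyless clear decides the outcome.
        System.out.println(replay(List.of(addX, clear)));   // []
        System.out.println(replay(List.of(clear, addX)));   // [x]
        System.out.println(replay(List.of(addX, removeY))); // [x]: removing an absent key is a no-op
    }
}
```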
Cache Update Consumption:
The `TopicPoller` assigns the consumer to all partitions of the topic and polls indefinitely.
For each polled message, it decodes the action and applies it to the cache.
It handles exceptions gracefully to maintain continuous synchronization.
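The sketch below shows a polling loop of this shape using only JDK types. A `BlockingQueue` stands in for the Kafka consumer, and the `poll(timeout)` wait plays the role of the configurable poll duration. A record that cannot be decoded or applied is logged and skipped, so the loop keeps running. Interruption or clearing the `running` flag stops it. `PollerSketch` and its members are hypothetical. Skipping malformed records is one possible policy, not the repository's documented behavior.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollerSketch implements Runnable {
    enum CacheAction { add, remove, clear }

    // Hypothetical record shape: key = message ID (null for clear), value = action name.
    record CacheRecord(String key, String value) {}

    final BlockingQueue<CacheRecord> topic = new LinkedBlockingQueue<>();
    final Map<String, String> cache = new ConcurrentHashMap<>();
    final AtomicBoolean running = new AtomicBoolean(true);
    private final long pollDurationMs;

    PollerSketch(long pollDurationMs) { this.pollDurationMs = pollDurationMs; }

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Wait at most pollDurationMs for the next record, like a consumer poll timeout.
                CacheRecord r = topic.poll(pollDurationMs, TimeUnit.MILLISECONDS);
                if (r != null) {
                    apply(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // stop promptly when the owning service shuts down
            } catch (IllegalArgumentException | NullPointerException e) {
                // Unknown action name or missing key: log it and keep synchronizing.
                System.err.println("Skipping malformed record: " + e.getMessage());
            }
        }
    }

    void apply(CacheRecord r) {
        switch (CacheAction.valueOf(r.value())) {
            case add -> cache.put(r.key(), r.key());
            case remove -> cache.remove(r.key());
            case clear -> cache.clear();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollerSketch poller = new PollerSketch(50);
        Thread t = new Thread(poller, "cache-sync");
        t.start();
        poller.topic.add(new CacheRecord("m1", "add"));
        poller.topic.add(new CacheRecord("m2", "bogus")); // skipped; the loop continues
        poller.topic.add(new CacheRecord("m3", "add"));

        // Wait (up to 5 s) until the last record has been applied, then stop the poller.
        long deadline = System.currentTimeMillis() + 5_000;
        while (!poller.cache.containsKey("m3") && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        poller.running.set(false);
        t.join();
        System.out.println(poller.cache.keySet());
    }
}
```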
Relationship to Parent Topic and Other Subtopics
Cache Synchronization is a core enabler of the overall Kafka-backed idempotent repository’s correctness and robustness. While the parent topic describes how a local cache is used to maintain previously seen message IDs and how the repository deduplicates messages, Cache Synchronization ensures that this cache remains coherent across all distributed instances.
It complements the **Cache Management** subtopic, which focuses on how an LRU cache is used locally to bound memory usage. Cache Synchronization extends this by managing how cache state changes propagate cluster-wide via Kafka.
This synchronization is distinct from other subtopics such as message transformation or offset commit management, as it specifically deals with maintaining consistency of the deduplication state in a distributed system using Kafka’s durable messaging semantics.
Code Snippets Illustrating Key Interactions
Broadcasting a cache action (e.g., adding a key) to the Kafka topic synchronously:
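```java
private void broadcastAction(String key, CacheAction action) {
    try { // key = message ID (null for clear), value = action name; get() blocks until the send completes
        producer.send(new ProducerRecord<>(topic, key, action.toString())).get();
    } catch (ExecutionException | InterruptedException e) {
        if (e instanceof InterruptedException) Thread.currentThread().interrupt();
        throw new RuntimeCamelException(e);
    }
}
```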
Consuming and applying cache updates from the Kafka topic:
```java
private void addToCache(ConsumerRecord<String, String> consumerRecord) {
    // The record value is the action name; valueOf throws IllegalArgumentException
    // for an unknown name, so unrecognized actions never reach the switch below.
    CacheAction action = CacheAction.valueOf(consumerRecord.value());
    String messageId = consumerRecord.key(); // null for clear actions
    switch (action) {
        case add -> cache.put(messageId, messageId);
        case remove -> cache.remove(messageId);
        case clear -> cache.clear();
    }
}
```
Starting cache synchronization on service start:
```java
@Override
protected void doStart() throws Exception {
    // initialize cache and Kafka consumer and producer...
    poller = new TopicPoller();
    ServiceHelper.startService(poller);
    // Synchronously populate the cache by consuming the entire topic history.
    // This relies on run() returning once the poller has caught up; otherwise start-up would block.
    poller.run();
    if (!startupOnly) {
        // Keep synchronizing in the background on a dedicated single thread
        executorService = camelContext.getExecutorServiceManager()
                .newSingleThreadExecutor(this, "KafkaIdempotentRepositorySync");
        executorService.submit(poller);
    }
}
```
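Start-up combines a blocking catch-up phase with an optional background phase, which is why the first `poller.run()` call must terminate. The sketch below reproduces that control flow with `java.util.concurrent` only. It is a minimal sketch under stated assumptions. `SyncLifecycle`, its list-based `topic`, and the integer `offset` are hypothetical stand-ins for the Kafka topic and consumer position. Only the `startupOnly` flag comes from the snippet above, and in the repository the executor is obtained from Camel's `ExecutorServiceManager` rather than `Executors`. The catch-up phase replays up to the end position observed at start-up and then returns. Unless `startupOnly` is set, a single background thread keeps applying later records until `stop()` is called.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SyncLifecycle {
    // Hypothetical shared topic: message IDs that were added, in publication order.
    final List<String> topic = new CopyOnWriteArrayList<>();
    final Set<String> cache = ConcurrentHashMap.newKeySet();
    private final boolean startupOnly;
    private volatile boolean running;
    private int offset; // written by start(), then only by the executor thread (submit gives happens-before)
    private ExecutorService executor;

    SyncLifecycle(boolean startupOnly) { this.startupOnly = startupOnly; }

    void start() {
        // 1. Blocking catch-up: replay up to the end position observed at start-up, then return.
        int end = topic.size();
        while (offset < end) {
            cache.add(topic.get(offset++));
        }
        // 2. Optional continuous sync on a single background thread.
        if (!startupOnly) {
            running = true;
            executor = Executors.newSingleThreadExecutor();
            executor.submit(this::pollLoop);
        }
    }

    private void pollLoop() {
        while (running) {
            if (offset < topic.size()) {
                cache.add(topic.get(offset++));
            } else {
                try {
                    Thread.sleep(10); // stand-in for the consumer poll timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    void stop() throws InterruptedException {
        running = false;
        if (executor != null) {
            executor.shutdownNow(); // interrupts the sleeping poll loop
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncLifecycle once = new SyncLifecycle(true);
        once.topic.add("m1");
        once.start();
        once.topic.add("m2");           // published after start-up: never applied in startupOnly mode
        System.out.println(once.cache); // [m1]
        once.stop();
    }
}
```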
Diagram: Cache Synchronization Process Flow
```mermaid
flowchart TD
    Start[Start Repository] --> InitCache[Initialize Local Cache]
    InitCache --> ConsumeAll[Consume Entire Kafka Topic from Beginning]
    ConsumeAll --> PopulateCache[Populate Local Cache with Actions]
    PopulateCache --> StartPoller["Start Background Poller (if not startupOnly)"]
    StartPoller --> PollLoop[Poll Kafka Topic for Cache Updates]
    PollLoop --> ApplyUpdate["Apply Cache Actions (add/remove/clear)"]
    ApplyUpdate --> PollLoop
    UserActionAdd[Add Message ID] --> UpdateLocal[Update Local Cache]
    UpdateLocal --> BroadcastAdd[Broadcast 'add' Action to Kafka Topic]
    BroadcastAdd --> PollLoop
    UserActionRemove[Remove Message ID] --> UpdateLocalRemove[Update Local Cache]
    UpdateLocalRemove --> BroadcastRemove[Broadcast 'remove' Action to Kafka Topic]
    BroadcastRemove --> PollLoop
    UserActionClear[Clear Cache] --> ClearLocal[Clear Local Cache]
    ClearLocal --> BroadcastClear[Broadcast 'clear' Action to Kafka Topic]
    BroadcastClear --> PollLoop
```
This flowchart highlights the core synchronization cycle:
On startup, the repository replays all past cache mutations by consuming the entire topic to build the local cache.
Unless configured to sync only once, a background poller continues reading new cache state changes published by any cluster node.
User operations locally update the cache and broadcast their changes, which in turn are consumed by all nodes to keep caches aligned.
Cache Synchronization lets the Kafka-backed idempotent repository operate consistently in distributed environments by using a Kafka topic, durable and ordered within each partition, as a shared state channel for cache mutations. Because every mutation is recorded in the topic, a restarted or newly added instance can rebuild the cluster-wide cache by replaying it (for as long as the topic retains that history). Deduplication lookups stay fast because they are served from the local cache.