Cache Synchronization

Purpose

Cache Synchronization addresses the challenge of keeping a local cache of processed message IDs consistent and up to date across multiple distributed instances of the Kafka-backed idempotent repository. Each instance runs independently, possibly on a different node, and must reflect changes (additions, removals, clears) made by any peer in near real time. This keeps deduplication decisions consistent across the cluster: once a change has propagated, a message already processed by one instance is recognized as a duplicate by all the others.

Without this synchronization, each node’s cache would drift out of date or diverge from its peers’, leading to incorrect idempotency behavior. Cache Synchronization solves this by using a dedicated Kafka topic as a shared state channel: each instance broadcasts its cache changes to the topic and consumes the topic to apply every instance’s changes locally.
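To make the shared-state-channel idea concrete, the sketch below replaces Kafka with an in-memory append-only list (the hypothetical `SharedLog` class) and gives each node its own read offset, roughly as a Kafka consumer tracks its position. It assumes a single, totally ordered log and is a simplified model, not the repository's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SharedLogDemo {
    // One record on the hypothetical in-memory "topic": a message ID and an action name.
    record Entry(String key, String action) {}

    // Append-only log standing in for the Kafka topic (assumption: one partition, total order).
    static class SharedLog {
        private final List<Entry> entries = new ArrayList<>();

        synchronized void append(String key, String action) {
            entries.add(new Entry(key, action));
        }

        synchronized List<Entry> readFrom(int offset) {
            return new ArrayList<>(entries.subList(offset, entries.size()));
        }
    }

    // A node keeps a local cache and remembers how far into the log it has read.
    static class Node {
        final Set<String> cache = new HashSet<>();
        private final SharedLog log;
        private int offset = 0;

        Node(SharedLog log) { this.log = log; }

        // Local mutation followed by a broadcast, mirroring the add/remove paths.
        void add(String id)    { cache.add(id);    log.append(id, "add"); }
        void remove(String id) { cache.remove(id); log.append(id, "remove"); }

        // Apply every record published since the last poll, including this node's own.
        void poll() {
            for (Entry e : log.readFrom(offset)) {
                switch (e.action()) {
                    case "add" -> cache.add(e.key());
                    case "remove" -> cache.remove(e.key());
                    case "clear" -> cache.clear();
                    default -> throw new IllegalArgumentException("Unknown action: " + e.action());
                }
                offset++;
            }
        }
    }

    public static void main(String[] args) {
        SharedLog log = new SharedLog();
        Node a = new Node(log);
        Node b = new Node(log);

        a.add("msg-1");
        a.add("msg-2");
        a.remove("msg-1");
        System.out.println("b before poll: " + b.cache); // prints "b before poll: []"
        b.poll();
        a.poll(); // re-applying its own records leaves a's cache unchanged
        System.out.println("a: " + a.cache + ", b: " + b.cache); // prints "a: [msg-2], b: [msg-2]"
    }
}
```

Node `b` only learns about `a`'s changes when it polls, which is why the section speaks of near-real-time rather than instantaneous consistency.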

Functionality

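The synchronization mechanism is built around a Kafka producer-consumer pair operating on a dedicated Kafka topic that holds the repository's cache state and is shared by all of its instances. The producer publishes each cache action as a record whose key is the message ID and whose value is the action name (add, remove, or clear); the consumer reads the topic and applies each action to the local cache.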

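Key Workflows

- Startup: the repository initializes its local cache, then consumes the entire topic from the beginning so that the cache reflects every recorded action.
- Background synchronization: unless startupOnly is set, a background poller keeps reading the topic and applies each new add, remove, or clear action.
- Local mutation: adding or removing a message ID, or clearing the cache, first updates the local cache and then broadcasts the corresponding action to the topic so that peers can apply it.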

Relationship to Parent Topic and Other Subtopics

Cache Synchronization is a core enabler of the overall Kafka-backed idempotent repository’s correctness and robustness. While the parent topic describes how a local cache is used to maintain previously seen message IDs and how the repository deduplicates messages, Cache Synchronization ensures that this cache remains coherent across all distributed instances.

It complements the **Cache Management** subtopic, which focuses on how an LRU cache is used locally to bound memory usage. Cache Synchronization extends this by managing how cache state changes propagate cluster-wide via Kafka.
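A bounded LRU cache of this kind can be pictured with the JDK's `LinkedHashMap` in access order, evicting the eldest entry once a size limit is exceeded. This is only a stand-in for whatever LRU implementation the repository actually uses; the class name `BoundedIdCache` and the limit of 2 are illustrative assumptions. In this sketch eviction is purely local and produces no broadcast, since the actions synchronized in this section are only add, remove, and clear.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedIdCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    public BoundedIdCache(int maxSize) {
        // accessOrder = true: get/put move an entry to the most-recently-used end
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    // Called by LinkedHashMap after each insertion; returning true drops the least-recently-used entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }

    public static void main(String[] args) {
        BoundedIdCache<String, String> cache = new BoundedIdCache<>(2);
        cache.put("msg-1", "msg-1");
        cache.put("msg-2", "msg-2");
        cache.get("msg-1");          // touch msg-1 so msg-2 becomes the eldest entry
        cache.put("msg-3", "msg-3"); // exceeds the limit, evicting msg-2
        System.out.println(cache.keySet()); // prints "[msg-1, msg-3]"
    }
}
```

Storing the message ID as both key and value matches the `cache.put(messageId, messageId)` call used when applying updates.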

This synchronization is distinct from other subtopics such as message transformation or offset commit management, as it specifically deals with maintaining consistency of the deduplication state in a distributed system using Kafka’s durable messaging semantics.


Code Snippet Illustrating Key Interaction

Broadcasting a cache action (e.g., adding a key) to the Kafka topic synchronously:

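```java
private void broadcastAction(String key, CacheAction action) {
    try {
        // key = message ID, value = action name; get() blocks until the send is acknowledged
        producer.send(new ProducerRecord<>(topic, key, action.toString())).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeCamelException(e);
    }
}
```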

Consuming and applying cache updates from Kafka topic:

```java
private void addToCache(ConsumerRecord<String, String> consumerRecord) {
    // The record value is the action name; valueOf throws IllegalArgumentException for unknown names
    CacheAction action = CacheAction.valueOf(consumerRecord.value());
    String messageId = consumerRecord.key();
    switch (action) {
        case add -> cache.put(messageId, messageId); // the ID doubles as the stored value
        case remove -> cache.remove(messageId);
        case clear -> cache.clear();                 // the record key is not used
        default -> throw new IllegalArgumentException("Unknown action: " + action);
    }
}
```
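Because each consumer applies records strictly in the order it reads them, instances converge only if they all see the same sequence. The sketch below reuses the add/remove/clear semantics of `addToCache` on a plain `HashMap` (the helper `replay` and the two-element string-array record shape are hypothetical) to show that the same actions in a different order leave a different cache. Kafka guarantees ordering only within a partition, so with a multi-partition topic a clear could be interleaved differently with adds for other keys on different instances; a single-partition topic appears to be the safer choice for this kind of state channel.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplayOrderDemo {
    enum CacheAction { add, remove, clear }

    // Apply (messageId, actionName) pairs in order, mirroring addToCache.
    static Map<String, String> replay(List<String[]> records) {
        Map<String, String> cache = new HashMap<>();
        for (String[] r : records) {
            String messageId = r[0];
            switch (CacheAction.valueOf(r[1])) {
                case add -> cache.put(messageId, messageId);
                case remove -> cache.remove(messageId);
                case clear -> cache.clear(); // key ignored for clear
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String[]> inOrder = List.of(
            new String[] {"msg-1", "add"},
            new String[] {"", "clear"},
            new String[] {"msg-2", "add"});
        List<String[]> reordered = List.of(
            new String[] {"msg-1", "add"},
            new String[] {"msg-2", "add"},
            new String[] {"", "clear"});
        System.out.println(replay(inOrder).keySet());   // prints "[msg-2]"
        System.out.println(replay(reordered).keySet()); // prints "[]"
    }
}
```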

Starting cache synchronization on service start:

```java
@Override
protected void doStart() throws Exception {
    // initialize cache and Kafka consumer and producer...

    poller = new TopicPoller();
    ServiceHelper.startService(poller);

    // Synchronously populate cache by consuming entire topic history
    poller.run();

    if (!startupOnly) {
        // start continuous background synchronization
        executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepositorySync");
        executorService.submit(poller);
    }
}
```
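The startup sequence in `doStart` (replay the full history before serving, then keep polling unless `startupOnly` is set) can be modelled without Kafka. In the sketch below, the hypothetical `SyncingRepository` reads from an in-memory list standing in for the topic, and only add records are modelled; `warmUp()` consumes everything published so far, and `pollOnce()` stands for one iteration of the background poller, which does nothing when `startupOnly` is true. A real implementation would run the poller on the executor thread created in `doStart` rather than calling it by hand.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WarmUpDemo {
    // Hypothetical repository model: the "topic" is a shared list of added message IDs.
    static class SyncingRepository {
        private final List<String> topic;
        private final boolean startupOnly;
        private final Set<String> cache = new HashSet<>();
        private int offset = 0;

        SyncingRepository(List<String> topic, boolean startupOnly) {
            this.topic = topic;
            this.startupOnly = startupOnly;
        }

        // Synchronous replay of the entire history, done once before the repository is used.
        void warmUp() {
            consumeToEnd();
        }

        // One iteration of the background poller; a startup-only instance skips it.
        void pollOnce() {
            if (!startupOnly) {
                consumeToEnd();
            }
        }

        private void consumeToEnd() {
            while (offset < topic.size()) {
                cache.add(topic.get(offset++));
            }
        }

        boolean contains(String id) { return cache.contains(id); }
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>(List.of("msg-1", "msg-2"));
        SyncingRepository live = new SyncingRepository(topic, false);
        SyncingRepository snapshot = new SyncingRepository(topic, true);
        live.warmUp();
        snapshot.warmUp();

        topic.add("msg-3"); // published by another instance after startup
        live.pollOnce();
        snapshot.pollOnce();

        System.out.println(live.contains("msg-3"));     // prints "true"
        System.out.println(snapshot.contains("msg-3")); // prints "false"
    }
}
```

Both instances start with the same history, but only the continuously polling one sees later additions; a startup-only instance keeps the snapshot it loaded at start.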

Diagram: Cache Synchronization Process Flow

```mermaid
flowchart TD
    Start[Start Repository] --> InitCache[Initialize Local Cache]
    InitCache --> ConsumeAll[Consume Entire Kafka Topic from Beginning]
    ConsumeAll --> PopulateCache[Populate Local Cache with Actions]
    PopulateCache --> StartPoller["Start Background Poller (if not startupOnly)"]
    StartPoller --> PollLoop[Poll Kafka Topic for Cache Updates]
    PollLoop --> ApplyUpdate["Apply Cache Actions (add/remove/clear)"]
    ApplyUpdate --> PollLoop

    UserActionAdd[Add Message ID] --> UpdateLocal[Update Local Cache]
    UpdateLocal --> BroadcastAdd[Broadcast 'add' Action to Kafka Topic]
    BroadcastAdd --> PollLoop

    UserActionRemove[Remove Message ID] --> UpdateLocalRemove[Update Local Cache]
    UpdateLocalRemove --> BroadcastRemove[Broadcast 'remove' Action to Kafka Topic]
    BroadcastRemove --> PollLoop

    UserActionClear[Clear Cache] --> ClearLocal[Clear Local Cache]
    ClearLocal --> BroadcastClear[Broadcast 'clear' Action to Kafka Topic]
    BroadcastClear --> PollLoop
```
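The add branch of the diagram (check the local cache, update it, then broadcast) is what produces the deduplication decision. The sketch below models it with a hypothetical `Instance` class over an in-memory topic, returning `true` for a first sighting and `false` for a duplicate, in the style of Camel's `IdempotentRepository.add` contract. It also illustrates the limit of asynchronous propagation: two instances that receive the same ID before either has polled will both accept it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupWindowDemo {
    // Hypothetical instance: a local cache plus a read position on a shared in-memory topic.
    static class Instance {
        private final List<String> topic;
        private final Set<String> cache = new HashSet<>();
        private int offset = 0;

        Instance(List<String> topic) { this.topic = topic; }

        // Returns true if the ID is new to this instance, false if it is a duplicate.
        boolean add(String id) {
            if (cache.contains(id)) {
                return false; // duplicate according to the local view
            }
            cache.add(id);    // update the local cache first...
            topic.add(id);    // ...then broadcast the 'add' action to peers
            return true;
        }

        // Apply broadcasts published since the last poll (re-applying our own is harmless).
        void poll() {
            while (offset < topic.size()) {
                cache.add(topic.get(offset++));
            }
        }
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        Instance a = new Instance(topic);
        Instance b = new Instance(topic);

        System.out.println(a.add("msg-1")); // prints "true": first sighting
        b.poll();
        System.out.println(b.add("msg-1")); // prints "false": b learned of it by polling

        System.out.println(a.add("msg-2")); // prints "true"
        System.out.println(b.add("msg-2")); // prints "true": b has not polled, so both accept it
    }
}
```

The window between a broadcast and a peer's next poll is where duplicates can still slip through, so the cluster-wide guarantee is eventual rather than immediate.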

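This flowchart highlights the core synchronization cycle: every local mutation is written to the topic, and every instance, including the one that made the change, applies it when its poller reads the record.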


Cache Synchronization lets the Kafka-backed idempotent repository operate correctly and consistently in distributed environments by using Kafka’s durable, ordered (per partition) messaging as a shared state channel for cache mutations. Because the topic retains the history of actions (subject to its retention settings), a restarted or newly added instance can rebuild its cache by replaying it. This supports high availability and fault tolerance while each deduplication check remains a fast local cache lookup.