Idempotent Consumption

Overview

The **Idempotent Consumption** module deduplicates messages by ID using a Kafka-backed idempotent repository, so that a message whose ID has already been seen is not processed again. Each instance keeps a local cache of seen message IDs, and the caches are kept in sync through a dedicated Kafka topic that every instance writes to and reads from.

This approach solves the problem of duplicate message processing in distributed or clustered environments where multiple consumers or routes may receive overlapping messages. By storing and synchronizing message IDs in Kafka, the system achieves consistency and fault-tolerance without relying on external databases or complex coordination services.


Core Concepts and Purpose


How the Module Works

The module keeps a consistent, deduplicated set of message IDs on every node, using Kafka both to persist cache mutations and to propagate them between nodes. The workflow has four stages:

1. Initialization and Startup

2. Cache Synchronization

3. Adding and Removing Entries

4. Cache Inspection and Metrics


Interaction with Other Components


Important Concepts and Design Patterns

Local Cache with Distributed Synchronization

Idempotent Repository SPI Implementation

Kafka Consumer/Producer Usage Patterns

Background Synchronization Service


Code References and Illustrations

Cache Initialization and Startup Sync

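// Size-bounded local cache of seen message IDs
this.cache = LRUCacheFactory.newLRUCache(maxCacheSize);

// Kafka clients for reading and publishing cache mutation events
consumer = new KafkaConsumer<>(consumerConfig);
producer = new KafkaProducer<>(producerConfig);

// Start the poller service that reads the topic
poller = new TopicPoller();
ServiceHelper.startService(poller);

// Synchronously populate cache on startup
poller.run();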
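
To make the effect of `maxCacheSize` concrete, here is a minimal sketch of a size-bounded, least-recently-used map built on the JDK's `LinkedHashMap`. It is a stand-in for illustration only: `BoundedCacheSketch` is a hypothetical name, and the cache returned by `LRUCacheFactory` may use a different eviction implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a size-bounded LRU cache; not the Camel implementation.
class BoundedCacheSketch extends LinkedHashMap<String, String> {
    private final int maxCacheSize;

    BoundedCacheSketch(int maxCacheSize) {
        super(16, 0.75f, true); // true = iterate in access order (least recently used first)
        this.maxCacheSize = maxCacheSize;
    }

    // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxCacheSize;
    }
}
```

In a design like this, an ID that has been evicted is treated as new if it arrives again, so the cache size bounds how far back duplicates can be detected.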

Cache Population by Consuming Topic

private void populateCache() {
    // Discover every partition of the topic and assign them all to this consumer
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    Collection<TopicPartition> partitions = partitionInfos.stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .toList();

    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);

    // Snapshot the current end of each partition; replay stops once it is reached
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

    while (!KafkaConsumerUtil.isReachedOffsets(consumer, endOffsets)) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            addToCache(consumerRecord);
        }
    }
}
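
The termination condition above can be illustrated without a broker. The following is a minimal sketch, assuming a single partition modelled as an in-memory list; `ReplaySketch`, `replay`, and `batchSize` are hypothetical names, not Kafka or Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one topic partition; not a Kafka API.
class ReplaySketch {
    /** Reads the log in batches until the end offset captured at the start is reached. */
    static List<String> replay(List<String> log, int batchSize) {
        int endOffset = log.size(); // snapshot of the end, analogous to endOffsets(...)
        int position = 0;           // start of the log, analogous to seekToBeginning(...)
        List<String> seen = new ArrayList<>();
        while (position < endOffset) {
            int upTo = Math.min(position + batchSize, endOffset);
            seen.addAll(log.subList(position, upTo)); // one simulated poll
            position = upTo;
        }
        return seen;
    }
}
```

Because the end offset is captured once before the loop, the warmup is bounded: it finishes when the snapshot position is reached, even if other instances keep appending events.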

Cache Mutation Event Handling

private void addToCache(ConsumerRecord<String, String> consumerRecord) {
    CacheAction action = CacheAction.valueOf(consumerRecord.value());
    String messageId = consumerRecord.key();

    if (action == CacheAction.add) {
        cache.put(messageId, messageId);
    } else if (action == CacheAction.remove) {
        cache.remove(messageId);
    } else if (action == CacheAction.clear) {
        cache.clear();
    }
}
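
Because each event is applied in order, the cache contents are fully determined by the event sequence. A minimal sketch of that idea, assuming a plain set in place of the LRU cache and a hypothetical `Event` record in place of `ConsumerRecord<String, String>`:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class EventReplaySketch {
    enum CacheAction { add, remove, clear }

    // Hypothetical (key, action) pair standing in for a consumed record.
    record Event(String key, CacheAction action) {}

    /** Rebuilds the set of seen IDs by applying the events in log order. */
    static Set<String> apply(List<Event> events) {
        Set<String> cache = new LinkedHashSet<>();
        for (Event e : events) {
            switch (e.action()) {
                case add -> cache.add(e.key());
                case remove -> cache.remove(e.key());
                case clear -> cache.clear();
            }
        }
        return cache;
    }
}
```

For example, the sequence add `m1`, add `m2`, remove `m1` leaves only `m2` in the set, and a later clear followed by add `m3` leaves only `m3`.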

Adding a Message ID and Broadcasting Event

@Override
public boolean add(String key) {
    if (cache.containsKey(key)) {
        return false;
    } else {
        cache.put(key, key);
        broadcastAction(key, CacheAction.add);
        return true;
    }
}

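private void broadcastAction(String key, CacheAction action) {
    try {
        // Synchronous send: block until the broker acknowledges the event
        producer.send(new ProducerRecord<>(topic, key, action.toString())).get();
    } catch (InterruptedException | ExecutionException e) {
        // Future.get() throws checked exceptions; rethrow unchecked
        throw new RuntimeCamelException(e);
    }
}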
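
The boolean returned by `add` is what a caller uses to decide whether to process a message. Here is a minimal sketch of that guard, assuming an in-memory set in place of the cache and topic; `IdempotentGuardSketch` and `onMessage` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IdempotentGuardSketch {
    private final Set<String> seen = new HashSet<>(); // stand-in for cache + Kafka topic
    final List<String> processed = new ArrayList<>(); // bodies that were actually handled

    /** Same contract as the repository's add: true only the first time a key is seen. */
    boolean add(String key) {
        return seen.add(key);
    }

    /** Processes the body only when the message ID has not been seen before. */
    void onMessage(String messageId, String body) {
        if (add(messageId)) {
            processed.add(body);
        }
    }
}
```

Delivering IDs `1`, `2`, and then `1` again results in only the first two bodies being processed; the redelivery is dropped because `add` returns `false`.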

Cache Synchronization Subtopic

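The **Cache Synchronization** submodule is responsible for maintaining cache consistency between multiple repository instances: it replays the topic from the beginning at startup to rebuild the local cache, publishes local mutations as events, and applies the events produced by other instances.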


Cache Management Subtopic

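The **Cache Management** submodule handles the local cache itself: creating the size-bounded LRU cache, and adding, removing, and clearing message IDs.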


Mermaid Diagram: Cache Synchronization Workflow

sequenceDiagram
    participant InstanceA as Repository Instance A
    participant KafkaTopic as Kafka Topic
    participant InstanceB as Repository Instance B

    Note over InstanceA,KafkaTopic: Startup Sync
    InstanceA->>KafkaTopic: Assign all partitions and seek to beginning
    KafkaTopic-->>InstanceA: Stream all cache mutation events
    InstanceA->>InstanceA: Populate local cache

    Note over InstanceA,KafkaTopic: Adding a Message ID
    InstanceA->>InstanceA: Check cache for key
    alt Key not present
        InstanceA->>InstanceA: Add key to local cache
        InstanceA->>KafkaTopic: Produce 'add' event with key
    else Key present
        InstanceA-->>InstanceA: Return false (duplicate)
    end

    Note over KafkaTopic,InstanceB: Event Propagation
    KafkaTopic-->>InstanceB: Deliver 'add' event with key
    InstanceB->>InstanceB: Update local cache with key

    Note over InstanceB,KafkaTopic: Removing a Message ID
    InstanceB->>InstanceB: Remove key from local cache
    InstanceB->>KafkaTopic: Produce 'remove' event with key

    KafkaTopic-->>InstanceA: Deliver 'remove' event with key
    InstanceA->>InstanceA: Update cache accordingly

This sequence diagram illustrates how multiple instances keep their caches synchronized by consuming and producing cache mutation events on a shared Kafka topic.
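
The same flow can be sketched in plain Java, assuming an in-memory list in place of the Kafka topic and an explicit `poll()` call in place of the background poller. All names here (`TwoInstanceSketch`, `Topic`, `Instance`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TwoInstanceSketch {
    // Hypothetical shared log standing in for the Kafka topic; entries are "action:key".
    static final class Topic {
        final List<String> events = new ArrayList<>();
    }

    static final class Instance {
        private final Topic topic;
        private final Set<String> cache = new HashSet<>();
        private int offset = 0; // next topic position this instance will read

        Instance(Topic topic) {
            this.topic = topic;
        }

        /** Adds the key locally and publishes an event; false if already cached. */
        boolean add(String key) {
            if (!cache.add(key)) {
                return false;
            }
            topic.events.add("add:" + key);
            return true;
        }

        /** Removes the key locally and publishes a remove event. */
        void remove(String key) {
            cache.remove(key);
            topic.events.add("remove:" + key);
        }

        /** Applies every event published since the last poll, including this instance's own. */
        void poll() {
            for (; offset < topic.events.size(); offset++) {
                String[] parts = topic.events.get(offset).split(":", 2);
                if (parts[0].equals("add")) {
                    cache.add(parts[1]);
                } else {
                    cache.remove(parts[1]);
                }
            }
        }

        boolean contains(String key) {
            return cache.contains(key);
        }
    }
}
```

In this sketch, instance B does not know about a key added by instance A until B polls the topic. A duplicate that reaches B inside that window would still be accepted, so deduplication across instances is only as timely as event propagation.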


Summary

The **Idempotent Consumption** module implements a Kafka-backed idempotent repository: a local in-memory cache of message IDs on each instance, synchronized across instances via a dedicated Kafka topic. It deduplicates messages in distributed Camel routes, using Kafka both to persist the cache mutation events and to deliver them to other instances. The module supports startup cache warmup and optional background synchronization, so that instance caches converge on the same state and already-seen message IDs are rejected.