Idempotent Consumption
Overview
The **Idempotent Consumption** module suppresses duplicate message processing by deduplicating message IDs with a Kafka-backed idempotent repository. Each distributed instance keeps a local cache of seen message IDs, and the instances keep those caches synchronized by publishing and consuming cache updates on a dedicated Kafka topic.
This approach addresses duplicate processing in distributed or clustered environments, where multiple consumers or routes may receive overlapping messages. Because message IDs are stored and synchronized through Kafka, no external database or separate coordination service is needed; the trade-off is that the instances' caches are only eventually consistent.
Core Concepts and Purpose
Idempotent Repository: An abstraction for storing message identifiers to prevent duplicate processing. This implementation uses Kafka itself as the backing store.
Local Cache with Kafka Synchronization: Each instance maintains an in-memory cache of message IDs for quick lookup. This cache is kept consistent across instances by broadcasting cache mutations (add/remove/clear) to a dedicated Kafka topic and consuming those events to update the cache.
Topic-based Message ID Sharing: A unique Kafka topic per logical repository is used to publish cache mutation events. Consumers of this topic rebuild or update their caches by consuming all events from the topic.
Startup Cache Population: On startup, the consumer reads the entire Kafka topic to reconstruct the current set of known message IDs, ensuring the cache reflects the latest state before processing begins.
Continuous or Startup-Only Sync: The repository can be configured to either continue syncing cache updates from the Kafka topic in the background or only perform a one-time sync at startup.
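Transactional and Eager Modes: The repository can be combined with different idempotent-consumer modes (for example, registering a key eagerly before processing or only after the exchange completes), which trade off how early duplicates are filtered against performance and failure handling.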
How the Module Works
The key workflow involves maintaining a consistent, deduplicated set of message IDs across distributed nodes, leveraging Kafka for communication and persistence.
1. Initialization and Startup
On startup, the repository initializes its local cache as an LRU cache with a configurable maximum size.
Kafka consumer and producer clients are created with configured properties, including bootstrap servers and group ID.
The repository assigns the consumer to all partitions of the configured topic and seeks to the beginning to consume all past cache mutation events.
A synchronous poll loop consumes the entire topic, applying the `add`, `remove`, or `clear` action contained in each message value to the local cache.
This process ensures the cache is fully warmed up and consistent with the global state before processing begins.
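The warm-up step can be illustrated without a broker. The sketch below is a simplified, hypothetical model: the topic is an in-memory `List` of `(key, action)` events, and the LRU cache is an access-ordered `LinkedHashMap` that evicts its eldest entry past a size limit. The names `CacheReplaySketch`, `warmUp`, and `Event` are invented for illustration; the repository itself uses a `KafkaConsumer` and Camel's `LRUCacheFactory`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free sketch of the startup cache warm-up.
public class CacheReplaySketch {

    enum CacheAction { add, remove, clear }

    // One record on the simulated repository topic: key = message ID, value = action.
    record Event(String key, CacheAction action) { }

    // Access-ordered LinkedHashMap that drops the least-recently-used entry past maxSize.
    static Map<String, String> newLruCache(int maxSize) {
        return new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Replays every event from the start of the log to its end, in order.
    static Map<String, String> warmUp(List<Event> topic, int maxCacheSize) {
        Map<String, String> cache = newLruCache(maxCacheSize);
        for (Event e : topic) {
            switch (e.action()) {
                case add -> cache.put(e.key(), e.key());
                case remove -> cache.remove(e.key());
                case clear -> cache.clear();
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Event> topic = List.of(
                new Event("a", CacheAction.add),
                new Event("b", CacheAction.add),
                new Event(null, CacheAction.clear), // clear carries no meaningful key
                new Event("c", CacheAction.add),
                new Event("d", CacheAction.add),
                new Event("c", CacheAction.remove));
        System.out.println(warmUp(topic, 100).keySet()); // prints [d]
    }
}
```

Replaying the events in order is what lets a later `remove` or `clear` undo an earlier `add`: the warmed-up cache reflects the state at the end of the log, not every key ever published.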
2. Cache Synchronization
After startup, if configured to do so, a background thread (`TopicPoller`) continuously polls the Kafka topic for new cache mutation events.
Each event contains a key (the message ID) and a value naming the cache action (`add`, `remove`, or `clear`).
The consumer updates the local cache accordingly, keeping it in sync with changes made by other repository instances.
This background syncing allows near real-time propagation of cache state changes across distributed consumers.
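A minimal sketch of such a poller, under stated assumptions: a synchronized in-memory `SharedLog` stands in for the Kafka topic, each poller tracks its own read offset, and an unbounded `ConcurrentHashMap` replaces the LRU cache to keep the example short. `PollerSketch`, `SharedLog`, and `pollOnce` are hypothetical names; only `TopicPoller`, `startupOnly`, `pollDurationMs`, and `cacheCounter` echo names used in this document, and the real class's internals may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a background poller keeping a local cache in sync with a shared log.
public class PollerSketch {

    enum CacheAction { add, remove, clear }

    record Event(String key, CacheAction action) { }

    // Stand-in for the Kafka topic: an append-only log each reader consumes at its own offset.
    static final class SharedLog {
        private final List<Event> events = new ArrayList<>();

        synchronized void append(Event e) { events.add(e); }

        synchronized List<Event> readFrom(int offset) {
            return new ArrayList<>(events.subList(offset, events.size()));
        }
    }

    static final class TopicPoller implements Runnable {
        final Map<String, String> cache = new ConcurrentHashMap<>();
        final AtomicLong cacheCounter = new AtomicLong(); // sync events applied so far
        private final SharedLog log;
        private final boolean startupOnly;
        private final long pollDurationMs;
        private volatile boolean running = true;
        private int offset = 0;

        TopicPoller(SharedLog log, boolean startupOnly, long pollDurationMs) {
            this.log = log;
            this.startupOnly = startupOnly;
            this.pollDurationMs = pollDurationMs;
        }

        // One poll: apply every event appended since the previous poll.
        int pollOnce() {
            List<Event> batch = log.readFrom(offset);
            for (Event e : batch) {
                switch (e.action()) {
                    case add -> cache.put(e.key(), e.key());
                    case remove -> cache.remove(e.key());
                    case clear -> cache.clear();
                }
                cacheCounter.incrementAndGet();
            }
            offset += batch.size();
            return batch.size();
        }

        void stop() { running = false; }

        @Override
        public void run() {
            pollOnce(); // initial full sync from offset 0
            while (!startupOnly && running) {
                try {
                    Thread.sleep(pollDurationMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
                pollOnce(); // incremental updates published by other instances
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SharedLog log = new SharedLog();
        log.append(new Event("m1", CacheAction.add));
        TopicPoller poller = new TopicPoller(log, false, 50);
        Thread t = new Thread(poller, "topic-poller");
        t.start();
        log.append(new Event("m2", CacheAction.add)); // published later by a peer
        Thread.sleep(200);
        poller.stop();
        t.join();
        System.out.println(poller.cache.keySet());
    }
}
```

With `startupOnly` set to `true`, `run()` returns after the initial sync, so events published later by peers are never applied to that instance's cache.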
3. Adding and Removing Entries
When a new message ID is added via the `add(String key)` method:
The local cache is checked; if the key exists, the method returns `false`, indicating a duplicate.
Otherwise, the key is inserted into the local cache.
A synchronous Kafka producer send then publishes an `add` event with the key to the topic, broadcasting the mutation to other instances.
When a key is removed via `remove(String key)`:
The key is removed from the local cache.
A `remove` event is sent synchronously to the Kafka topic to notify peers.
The `clear()` method broadcasts a `clear` event that causes every instance to empty its cache.
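The add/remove/clear contract can be simulated with two instances sharing one log. In this hypothetical sketch (`RepositorySketch` and its `sync()` method are invented names), a shared `List` plays the role of the Kafka topic and `sync()` plays the role of the background poller; Kafka itself, partitions, and the LRU bound are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical two-instance simulation of the add/remove/clear contract.
public class RepositorySketch {

    enum CacheAction { add, remove, clear }

    record Event(String key, CacheAction action) { }

    private final List<Event> topic;       // shared by all instances (stands in for the Kafka topic)
    private final Set<String> cache = new HashSet<>();
    private int offset = 0;                // this instance's read position in the topic

    RepositorySketch(List<Event> topic) { this.topic = topic; }

    boolean add(String key) {
        if (cache.contains(key)) {
            return false;                  // duplicate according to the local cache
        }
        cache.add(key);
        topic.add(new Event(key, CacheAction.add)); // broadcast the mutation
        return true;
    }

    boolean remove(String key) {
        boolean removed = cache.remove(key);
        topic.add(new Event(key, CacheAction.remove));
        return removed;
    }

    void clear() {
        cache.clear();
        topic.add(new Event(null, CacheAction.clear));
    }

    boolean contains(String key) { return cache.contains(key); }

    // Applies events appended since the last sync; re-applying this instance's own events is harmless.
    void sync() {
        while (offset < topic.size()) {
            Event e = topic.get(offset++);
            switch (e.action()) {
                case add -> cache.add(e.key());
                case remove -> cache.remove(e.key());
                case clear -> cache.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<Event> topic = new ArrayList<>();
        RepositorySketch a = new RepositorySketch(topic);
        RepositorySketch b = new RepositorySketch(topic);
        a.add("m1");
        b.sync();
        System.out.println(b.add("m1"));      // false: b learned about m1 from the topic
        b.clear();
        a.sync();
        System.out.println(a.contains("m1")); // false: the clear event emptied a's cache
    }
}
```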
4. Cache Inspection and Metrics
Methods like `contains(String key)` check the local cache for the presence of a message ID.
Managed operations expose metrics such as the number of sync events received (`cacheCounter`) and the current cache size.
Interaction with Other Components
The KafkaIdempotentRepository is typically injected into Camel routes or processors requiring idempotent message consumption.
It acts as an implementation of the IdempotentRepository SPI, which Camel's idempotent consumer pattern uses to filter duplicates.
It depends on the Kafka client library for its producer and consumer, which can be configured either with explicit client properties or with just the bootstrap servers.
The Kafka topic used by this repository must be unique per logical repository to avoid cross-contamination of message IDs.
The repository's background consumer thread operates independently, asynchronously updating the cache state.
This module interacts indirectly with Kafka brokers, the Camel context (for thread and service management), and the route processing logic that invokes its API.
Important Concepts and Design Patterns
Local Cache with Distributed Synchronization
Uses an LRU cache to limit memory usage while storing recent message IDs.
Cache mutations are broadcast via Kafka to achieve distributed consistency.
The cache is updated asynchronously by consuming the topic, so nodes are only eventually consistent: a mutation made on one node becomes visible on another only after that node's poller has consumed it.
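One consequence of bounding the cache with LRU eviction is that an evicted ID is no longer recognized as a duplicate. The sketch below assumes an access-ordered `LinkedHashMap` as the LRU implementation (the document's code uses Camel's `LRUCacheFactory`, whose eviction details may differ); `LruEvictionSketch`, `LruIds`, and `addId` are hypothetical names. Note that `containsKey` does not count as an access in a `LinkedHashMap`, so only `put` refreshes an entry's recency here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a bounded LRU set of message IDs built on an access-ordered LinkedHashMap.
public class LruEvictionSketch {

    static final class LruIds extends LinkedHashMap<String, String> {
        private final int maxSize;

        LruIds(int maxSize) {
            super(16, 0.75f, true); // accessOrder = true: get/put move an entry to the tail
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > maxSize; // evict the least-recently-used ID once over capacity
        }

        // Mirrors the add(key) contract: false if the key is already known.
        boolean addId(String key) {
            if (containsKey(key)) {
                return false;
            }
            put(key, key);
            return true;
        }
    }

    public static void main(String[] args) {
        LruIds ids = new LruIds(2);
        System.out.println(ids.addId("m1")); // true
        System.out.println(ids.addId("m2")); // true
        System.out.println(ids.addId("m3")); // true, and m1 is evicted
        System.out.println(ids.addId("m1")); // true again: m1 was evicted, so it is not seen as a duplicate
    }
}
```

This suggests sizing `maxCacheSize` to cover the window in which duplicates of a message can realistically arrive.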
Idempotent Repository SPI Implementation
Implements Camel's IdempotentRepository interface, allowing seamless integration with Camel's idempotent consumer pattern.
Methods `add`, `contains`, `remove`, and `clear` are overridden to implement the Kafka-backed logic.
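To show how a caller drives these methods, the following hypothetical sketch pairs a simplified repository interface with an eager filter: the message ID is registered with `add` before processing, a `false` result skips the message as a duplicate, and the ID is removed again if processing throws so that a redelivery is not discarded. `MiniIdempotentRepository`, `InMemoryRepository`, and `processOnce` are invented names; the sketch is loosely modelled on the idempotent consumer pattern and is not Camel's own implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified view of how an idempotent consumer drives a repository.
public class IdempotentFilterSketch {

    // Mirrors the four operations named above; not Camel's actual interface.
    interface MiniIdempotentRepository {
        boolean add(String key);
        boolean contains(String key);
        boolean remove(String key);
        void clear();
    }

    // In-memory stand-in for the Kafka-backed repository.
    static final class InMemoryRepository implements MiniIdempotentRepository {
        private final Set<String> ids = new HashSet<>();
        public boolean add(String key) { return ids.add(key); }
        public boolean contains(String key) { return ids.contains(key); }
        public boolean remove(String key) { return ids.remove(key); }
        public void clear() { ids.clear(); }
    }

    // Eager filtering: register the ID first; unregister it if processing fails.
    static boolean processOnce(MiniIdempotentRepository repo, String messageId, Consumer<String> body) {
        if (!repo.add(messageId)) {
            return false; // duplicate: skip processing
        }
        try {
            body.accept(messageId);
            return true;
        } catch (RuntimeException e) {
            repo.remove(messageId); // allow a later redelivery to be processed
            throw e;
        }
    }

    public static void main(String[] args) {
        MiniIdempotentRepository repo = new InMemoryRepository();
        System.out.println(processOnce(repo, "m1", id -> System.out.println("processing " + id)));
        System.out.println(processOnce(repo, "m1", id -> System.out.println("processing " + id)));
    }
}
```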
Kafka Consumer/Producer Usage Patterns
The Kafka consumer is assigned manually to all partitions and seeks to the beginning to consume all historical cache mutation events.
Producer sends are synchronous (`.get()` on the returned `Future`), so each cache mutation has been acknowledged by Kafka before the method returns.
Background Synchronization Service
A dedicated background service (`TopicPoller`) runs continuously (unless configured for startup-only mode) to keep the cache fresh.
It runs on a thread managed by Camel's `ExecutorServiceManager`, which handles its lifecycle and shutdown.
Code References and Illustrations
Cache Initialization and Startup Sync
```java
// Excerpt from the repository's startup logic
this.cache = LRUCacheFactory.newLRUCache(maxCacheSize); // bounded LRU cache of seen message IDs
consumer = new KafkaConsumer<>(consumerConfig);
producer = new KafkaProducer<>(producerConfig);
poller = new TopicPoller();
ServiceHelper.startService(poller);
// Synchronously populate cache on startup
poller.run();
```
The cache is initialized with a least-recently-used strategy to cap memory usage.
The `TopicPoller` service is started and then run directly on the calling thread to perform a full cache sync.
Cache Population by Consuming Topic
```java
private void populateCache() {
    // Manually assign every partition of the repository topic (no consumer-group rebalancing)
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    Collection<TopicPartition> partitions = partitionInfos.stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .toList();
    consumer.assign(partitions);
    // Replay the topic from the earliest available offset
    consumer.seekToBeginning(partitions);
    // Snapshot where each partition ends now; the replay stops once these offsets are reached
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
    while (!KafkaConsumerUtil.isReachedOffsets(consumer, endOffsets)) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            addToCache(consumerRecord);
        }
    }
}
```
The consumer is assigned all partitions and seeks to the start to read all events.
Polling continues until the consumer's position on every partition has reached the end offsets captured before the loop started.
Each consumed record triggers a cache update.
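The loop's exit condition compares each partition's current consumer position with the end offset captured before polling began (Kafka reports a partition's end offset as the offset of the next record to be written, that is, the last offset plus one). The following is a hypothetical stand-alone restatement of that check, with partitions keyed by plain integers instead of `TopicPartition`; it is not the source of `KafkaConsumerUtil.isReachedOffsets`.

```java
import java.util.Map;

// Hypothetical restatement of the "have we read up to the end offsets?" check.
public class EndOffsetCheckSketch {

    // positions: next offset the consumer will read on each partition
    // endOffsets: end offset of each partition, captured before the replay started
    static boolean isReachedOffsets(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> end : endOffsets.entrySet()) {
            long position = positions.getOrDefault(end.getKey(), 0L);
            if (position < end.getValue()) {
                return false; // this partition still has unread records
            }
        }
        return true; // every partition has caught up, including empty ones (end offset 0)
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 3L, 1, 0L);
        System.out.println(isReachedOffsets(Map.of(0, 2L, 1, 0L), end)); // false
        System.out.println(isReachedOffsets(Map.of(0, 3L, 1, 0L), end)); // true
    }
}
```

Because the end offsets are captured once, events published while the replay is running may be left for the background poller to pick up.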
Cache Mutation Event Handling
```java
private void addToCache(ConsumerRecord<String, String> consumerRecord) {
    // The record value names the action; the key is the message ID (unused for clear)
    CacheAction action = CacheAction.valueOf(consumerRecord.value());
    String messageId = consumerRecord.key();
    if (action == CacheAction.add) {
        cache.put(messageId, messageId);
    } else if (action == CacheAction.remove) {
        cache.remove(messageId);
    } else if (action == CacheAction.clear) {
        cache.clear();
    }
}
```
Cache updates are driven by the `add`, `remove`, and `clear` actions encoded as the Kafka message value. This ensures all instances interpret events consistently.
Adding a Message ID and Broadcasting Event
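```java
@Override
public boolean add(String key) {
    if (cache.containsKey(key)) {
        // Already seen locally: report a duplicate
        return false;
    } else {
        // Record locally first, then tell the other instances
        cache.put(key, key);
        broadcastAction(key, CacheAction.add);
        return true;
    }
}

private void broadcastAction(String key, CacheAction action) {
    try {
        // Block on the Future so the mutation is acknowledged before returning
        producer.send(new ProducerRecord<>(topic, key, action.toString())).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeCamelException(e);
    } catch (ExecutionException e) {
        throw new RuntimeCamelException(e);
    }
}
```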
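The `add` method updates the local cache and synchronously sends an `add` event to the Kafka topic. Because the send blocks on the returned `Future`, the event has been acknowledged by the broker (according to the producer's `acks` setting) before `add` returns.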
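Because `add` consults only the local cache before broadcasting, two instances that receive the same message before either has consumed the other's `add` event will both accept it. The hypothetical simulation below illustrates that window; `RaceWindowSketch` and `Instance` are invented names, only `add` events are modelled, and a shared `List` stands in for the topic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the window between a local add() and its arrival at peer instances.
public class RaceWindowSketch {

    static final class Instance {
        final Set<String> cache = new HashSet<>();
        private final List<String> topic; // shared log of added keys
        private int offset = 0;

        Instance(List<String> topic) { this.topic = topic; }

        // Same shape as add(): check the local cache, insert, then broadcast.
        boolean add(String key) {
            if (cache.contains(key)) {
                return false;
            }
            cache.add(key);
            topic.add(key);
            return true;
        }

        // What a background poller eventually does: apply events not yet seen.
        void sync() {
            while (offset < topic.size()) {
                cache.add(topic.get(offset++));
            }
        }
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        Instance a = new Instance(topic);
        Instance b = new Instance(topic);

        // Both instances receive "m1" before either has consumed the other's event.
        System.out.println(a.add("m1")); // true
        System.out.println(b.add("m1")); // true: b has not yet seen a's broadcast

        a.sync();
        b.sync();
        System.out.println(b.add("m1")); // false once the event has propagated
    }
}
```

This is the eventual-consistency trade-off of the design: the synchronous send ensures the event is acknowledged by Kafka, but it does not make the event visible to peers before they poll.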
Cache Synchronization Subtopic
The **Cache Synchronization** submodule is responsible for maintaining cache consistency between multiple repository instances:
Uses a `TopicPoller` thread that polls the Kafka topic for cache mutation events.
On startup, the poller performs a full cache rebuild by consuming from the beginning of the topic.
If configured with `startupOnly = false`, the poller keeps running after startup to apply incremental updates.
The local cache therefore reflects the cumulative effect of the mutations broadcast by all instances, bounded by the LRU cache size.
Cache Management Subtopic
The **Cache Management** submodule handles:
Cache implementation as an LRU map with a configurable maximum size (`maxCacheSize`).
Methods to add, remove, check, and clear entries in the cache.
Protection against duplicates by checking cache membership before adding.
Metrics to expose cache size and number of sync events processed.
Cache clearing broadcasts a `clear` event to all instances.
Mermaid Diagram: Cache Synchronization Workflow
```mermaid
sequenceDiagram
participant InstanceA as Repository Instance A
participant KafkaTopic as Kafka Topic
participant InstanceB as Repository Instance B
Note over InstanceA,KafkaTopic: Startup Sync
InstanceA->>KafkaTopic: Assign all partitions and seek to beginning
KafkaTopic-->>InstanceA: Stream all cache mutation events
InstanceA->>InstanceA: Populate local cache
Note over InstanceA,KafkaTopic: Adding a Message ID
InstanceA->>InstanceA: Check cache for key
alt Key not present
InstanceA->>InstanceA: Add key to local cache
InstanceA->>KafkaTopic: Produce 'add' event with key
else Key present
InstanceA-->>InstanceA: Return false (duplicate)
end
Note over KafkaTopic,InstanceB: Event Propagation
KafkaTopic-->>InstanceB: Deliver 'add' event with key
InstanceB->>InstanceB: Update local cache with key
Note over InstanceB,KafkaTopic: Removing a Message ID
InstanceB->>InstanceB: Remove key from local cache
InstanceB->>KafkaTopic: Produce 'remove' event with key
KafkaTopic-->>InstanceA: Deliver 'remove' event with key
InstanceA->>InstanceA: Update cache accordingly
```
This sequence diagram illustrates how multiple instances keep their caches synchronized by consuming and producing cache mutation events on a shared Kafka topic.
Summary
The **Idempotent Consumption** module implements a Kafka-backed idempotent repository that keeps a local in-memory cache on each instance and synchronizes those caches through a dedicated Kafka topic. It deduplicates messages in distributed Camel routes without an external database, using Kafka both for persistence and for event-driven cache updates. The module supports startup cache warm-up and optional background synchronization; because synchronization is asynchronous and the cache is bounded, it provides eventually consistent duplicate detection rather than a strict exactly-once guarantee.