KafkaIdempotentRepository.java
Overview
`KafkaIdempotentRepository` is a Kafka topic-based implementation of Camel's [`IdempotentRepository`](https://camel.apache.org/manual/latest/idempotent-consumer.html) interface. It provides a distributed, fault-tolerant mechanism for deduplicating message processing in Apache Camel routes by storing and synchronizing message IDs across cluster nodes using Kafka as the backing store.
This class maintains a **local in-memory LRU cache** of previously seen message IDs for fast lookups. It synchronizes cache mutations (additions, removals, clears) through a **dedicated Kafka topic**. Each repository instance consumes the entire topic on startup to rebuild its cache, and can optionally keep syncing updates in a background thread. This lets clustered instances share deduplication state without an external database or coordination service. Mutations reach other instances asynchronously and the cache is bounded, so the repository narrows the window for duplicates but does not, on its own, guarantee exactly-once processing.
Key Features and Functionality
- **Distributed Idempotency**: Synchronizes message ID state across multiple instances via Kafka topic messages.
- **Local LRU Cache**: Maintains an in-memory cache with a configurable maximum size for fast duplicate detection.
- **Kafka Topic-Based State Store**: Uses a dedicated Kafka topic to broadcast cache mutations (`add`, `remove`, `clear`) as messages.
- **Startup Cache Population**: On startup, consumes the entire Kafka topic to rebuild the local cache to the latest state.
- **Optional Continuous Sync**: Either keeps syncing cache updates in a background thread or syncs only once at startup.
- **Synchronous Kafka Producer Sends**: Blocks until each cache mutation has been acknowledged by the broker before returning.
- **Configurable Kafka Consumer and Producer**: Accepts either bootstrap servers or full Kafka client properties for both clients.
- **Camel Integration**: Implements `IdempotentRepository` and `CamelContextAware`, and is managed through Camel's service lifecycle.
Class: KafkaIdempotentRepository
Package
`org.apache.camel.processor.idempotent.kafka`
Interfaces Implemented
- `org.apache.camel.spi.IdempotentRepository`
- `org.apache.camel.CamelContextAware`
Annotations
- `@ManagedResource`: Exposes management operations for monitoring.
- `@Metadata`: Provides metadata for Camel tooling.
- `@Configurer`: Supports configuration metadata generation.
Fields Summary
| Field | Type | Description |
|---|---|---|
| `camelContext` | `CamelContext` | Camel runtime context injected by the framework. |
| `executorService` | `ExecutorService` | Background thread executor for continuous syncing. |
| `poller` | `TopicPoller` (inner class) | Runnable service that polls the Kafka topic for cache updates. |
| `cacheCounter` | `AtomicLong` | Number of cache sync events received. |
| `cache` | `Map<String, String>` | In-memory LRU cache storing message IDs. |
| `consumer` | `Consumer<String, String>` | Kafka consumer for reading cache mutation events. |
| `producer` | `Producer<String, String>` | Kafka producer for broadcasting cache mutations. |
| `producerConfig` | `Properties` | Kafka producer configuration properties. |
| `consumerConfig` | `Properties` | Kafka consumer configuration properties. |
| `topic` | `String` | Kafka topic name used for cache mutations (required). |
| `bootstrapServers` | `String` | Kafka bootstrap servers as a comma-separated `host:port` list (used when the client configs are not set). |
| `groupId` | `String` | Kafka consumer group ID (optional). |
| `maxCacheSize` | `int` | Maximum size of the local LRU cache (default 1000). |
| `pollDurationMs` | `int` | Kafka consumer poll timeout in milliseconds (default 100). |
| `startupOnly` | `boolean` | If `true`, syncs the cache only on startup (default `false`). |

Enum: CacheAction
Defines the types of cache mutation events that can be broadcast and consumed:
Value | Description |
|---|---|
`add` | Add a message ID to cache |
`remove` | Remove a message ID from cache |
`clear` | Clear the entire cache |
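
Each event's record value carries one of these action names. The sketch below shows one way a consumer could map a value back to an action. It assumes the value is exactly the lowercase constant name. `CacheActionSketch` and `parse` are hypothetical names, not Camel's enum or API.

```java
import java.util.Optional;

// Hypothetical mirror of the CacheAction enum; constant names follow the table above.
enum CacheActionSketch {
    add, remove, clear;

    // Map a Kafka record value back to an action. Unknown or missing values yield
    // Optional.empty() so the caller can log a warning instead of failing.
    static Optional<CacheActionSketch> parse(String value) {
        if (value == null) {
            return Optional.empty();          // valueOf(null) would throw NullPointerException
        }
        try {
            return Optional.of(CacheActionSketch.valueOf(value));
        } catch (IllegalArgumentException e) {
            return Optional.empty();          // not one of add/remove/clear (case-sensitive)
        }
    }
}
```

Returning `Optional` keeps the "log and skip" decision with the caller rather than throwing from the parser.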
Constructors
The class provides multiple overloaded constructors allowing flexible configuration:
```java
KafkaIdempotentRepository(); // default
KafkaIdempotentRepository(String topic, String bootstrapServers);
KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId);
KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs);
KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs, String groupId);
```
Use the constructors that take `Properties` to fully customize the Kafka client configs. Use the constructors that take `bootstrapServers` for sensible defaults.
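
Here is a sketch of the `Properties`-based approach. The hypothetical helper below builds separate consumer and producer configs. The keys are standard Kafka client settings. The SSL value is only an illustrative assumption, and a real cluster may need further settings such as truststore locations.

```java
import java.util.Properties;

// Hypothetical helper that builds client configs for the Properties-based constructors.
final class KafkaRepoConfigs {
    private KafkaRepoConfigs() {}

    static Properties consumer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Example of a setting the bootstrapServers-only constructors cannot express
        props.put("security.protocol", "SSL");
        return props;
    }

    static Properties producer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        props.put("client.id", "idempotent-repo-producer"); // illustrative client id
        return props;
    }
}
```

You would pass the results to the repository as `new KafkaIdempotentRepository("my-topic", KafkaRepoConfigs.consumer("broker:9093"), KafkaRepoConfigs.producer("broker:9093"))`. According to the lifecycle description, `doStart()` applies its own serializer and auto-commit settings, so you do not need to set those keys here.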
Public API Methods
boolean add(String key)
Adds a message ID to the repository if not already present.
Parameters:
- `key`: the message ID to add.

Returns:
- `true` if the key was not present and was added successfully.
- `false` if the key already exists (duplicate).

Behavior:
1. Checks the local cache for the key.
2. If the key is absent, inserts it into the local cache and synchronously broadcasts an `add` event on the Kafka topic.
3. Other instances receive this event and update their caches.
Example:
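```java
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

KafkaIdempotentRepository repo = new KafkaIdempotentRepository("my-topic", "localhost:9092");
repo.setCamelContext(new DefaultCamelContext()); // camelContext is mandatory in doStart()
repo.start(); // creates the cache and Kafka clients, then replays the topic
boolean added = repo.add("message-123");
if (added) {
    // first time this ID is seen: process the message
} else {
    // duplicate message: skip processing
}
```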
boolean contains(String key)
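Checks whether the given message ID exists in the local cache. The lookup does not contact Kafka. An ID added on another instance is visible only after this instance has consumed the corresponding `add` event. With `startupOnly` enabled, that happens only at the next startup.

Parameters:
- `key`: the message ID to check.

Returns:
- `true` if the key exists, `false` otherwise.

Example:
```java
if (repo.contains("message-123")) {
    // duplicate message
}
```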
boolean remove(String key)
Removes the given message ID from the repository.
Parameters:
- `key`: the message ID to remove.

Returns:
- Always `true`.

Behavior:
1. Removes the key from the local cache.
2. Synchronously broadcasts a `remove` event to the Kafka topic for other instances.

Example:
```java
repo.remove("message-123");
```
boolean confirm(String key)
Parameters:
- `key`: the message ID.

Returns:
- Always `true`.

Description:
This is a no-op required by the `IdempotentRepository` interface, which uses it to confirm that a message was processed.
void clear()
Clears the entire local cache and broadcasts a `clear` event to all cluster nodes.
Example:
```java
repo.clear();
```
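
The following single-process model makes the contract of these five methods concrete. It is a minimal sketch, not the real class. A `List` named `topic` stands in for the Kafka topic and the synchronous send, and `RepoApiSketch` is a hypothetical name. The methods are `synchronized` so that the check-then-insert in `add` is atomic within one process. Nothing in the sketch makes that step atomic across instances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process model of the public API. The "topic" is a list of
// "action:key" strings standing in for the synchronous Kafka send.
final class RepoApiSketch {
    private final Map<String, String> cache = new HashMap<>();
    final List<String> topic = new ArrayList<>();

    synchronized boolean add(String key) {
        if (cache.containsKey(key)) {
            return false;                 // duplicate: nothing is broadcast
        }
        cache.put(key, key);
        topic.add("add:" + key);          // broadcast after the local insert
        return true;
    }

    synchronized boolean contains(String key) {
        return cache.containsKey(key);    // local lookup only, no Kafka round trip
    }

    synchronized boolean remove(String key) {
        cache.remove(key);
        topic.add("remove:" + key);
        return true;                      // always true, as documented
    }

    synchronized boolean confirm(String key) {
        return true;                      // no-op
    }

    synchronized void clear() {
        cache.clear();
        topic.add("clear");
    }
}
```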
Management Operations (annotated with `@ManagedOperation`)
- `long getCacheCounter()`: Returns the number of sync events received from the Kafka topic.
- `long getCacheSize()`: Returns the current number of entries in the local cache.
Getters and Setters
The class provides standard getters and setters for configuration properties such as `topic`, `bootstrapServers`, `groupId`, `maxCacheSize`, `pollDurationMs`, `startupOnly`, `producerConfig`, `consumerConfig`, and `camelContext`.
Lifecycle Methods
void doStart()
1. Validates the mandatory properties: `camelContext`, `topic`, and either the Kafka client configs or `bootstrapServers`.
2. Initializes the local LRU cache with the configured maximum size.
3. Creates the Kafka consumer and producer clients with the appropriate serializers and configs.
4. Starts the `TopicPoller` service.
5. Calls `poller.run()` synchronously to populate the cache fully by consuming the entire Kafka topic from beginning to end.
6. If `startupOnly` is `false`, starts a background thread that continuously syncs cache updates from Kafka.
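
Here is a minimal sketch of the validation and client-configuration work described for `doStart()`. It assumes that supplied `Properties` take precedence and that `bootstrapServers` is the fallback. It also assumes the producer's `acks` and `batch.size` are defaulted only when the user has not set them; the `putIfAbsent` behavior is an assumption. `StartupConfigSketch` is hypothetical. The string keys are standard Kafka client configuration names.

```java
import java.util.Properties;

// Hypothetical sketch of the config defaulting described for doStart().
final class StartupConfigSketch {
    private StartupConfigSketch() {}

    // Supplied Properties win; otherwise build a minimal config from bootstrapServers.
    static Properties resolve(Properties supplied, String bootstrapServers) {
        if (supplied != null) {
            return supplied;
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must be set when no config is supplied");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    static Properties consumerDefaults(Properties consumer, String groupId) {
        if (groupId != null) {
            consumer.put("group.id", groupId);
        }
        consumer.put("enable.auto.commit", "false");   // the topic is replayed, not resumed
        consumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return consumer;
    }

    static Properties producerDefaults(Properties producer) {
        producer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: only filled in when the user has not chosen their own values.
        producer.putIfAbsent("acks", "1");
        producer.putIfAbsent("batch.size", "0");
        return producer;
    }
}
```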
void doStop()
1. Shuts down the background executor service, if running.
2. Stops the `TopicPoller` service.
3. Closes the Kafka consumer and producer clients.
4. Logs the final cache counter value.
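
The stop order matters because the background thread uses the consumer. Below is a minimal sketch using plain `java.util.concurrent` types. A sleep stands in for `consumer.poll(...)`, and the close steps are only recorded. `ShutdownSketch` is hypothetical and does not reproduce Camel's `ExecutorServiceManager`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shutdown order: stop the background loop first, then close the
// clients it uses, so the loop never touches a closed consumer.
final class ShutdownSketch {
    final AtomicBoolean running = new AtomicBoolean(true);
    final List<String> steps = new ArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void startPolling() {
        executor.submit(() -> {
            while (running.get()) {
                try {
                    Thread.sleep(10);     // stands in for consumer.poll(pollDurationMs)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    void stop() {
        running.set(false);               // ask the loop to exit
        executor.shutdownNow();           // and interrupt any blocking wait
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        steps.add("executor stopped");
        steps.add("consumer closed");     // consumer.close() would go here
        steps.add("producer closed");     // producer.close() would go here
    }

    boolean executorTerminated() {
        return executor.isTerminated();
    }
}
```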
Internal Private Methods
void populateCache()
1. Retrieves all partitions for the configured Kafka topic.
2. Assigns the consumer to all partitions and seeks to the beginning.
3. Reads all existing cache mutation events up to the end offsets, invoking `addToCache` for each one to rebuild the local cache.
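
Capturing the end offsets before reading is what lets the startup replay finish even while other instances keep producing. In this sketch, an in-memory map from partition to record list stands in for the topic. `ReplaySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a partitioned topic: partition -> ordered record values.
final class ReplaySketch {
    private ReplaySketch() {}

    // Replays every partition from offset 0 up to the end offset captured at the start,
    // mirroring "seek to beginning, read until the end offsets".
    static List<String> replay(Map<Integer, List<String>> topic) {
        Map<Integer, Integer> endOffsets = new HashMap<>();
        for (Map.Entry<Integer, List<String>> e : topic.entrySet()) {
            endOffsets.put(e.getKey(), e.getValue().size());   // snapshot before reading
        }
        List<String> applied = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : endOffsets.entrySet()) {
            List<String> partition = topic.get(e.getKey());
            for (int offset = 0; offset < e.getValue(); offset++) {
                applied.add(partition.get(offset));            // addToCache(record) in the real flow
            }
        }
        return applied;
    }
}
```

Kafka orders records only within a partition. The sketch therefore replays partition by partition and makes no promise about how records from different partitions interleave.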
void addToCache(ConsumerRecord<String, String> consumerRecord)
- Parses the cache action (`add`, `remove`, `clear`) from the Kafka message value.
- Applies the action to the local cache:
  - `add`: inserts the message ID.
  - `remove`: removes the message ID.
  - `clear`: clears the entire cache.
- Increments `cacheCounter`.
- Logs a warning for unexpected values.
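
Here is a minimal sketch of the dispatch. It assumes the record key is the message ID and the record value is the action name. It counts every record received, which matches the `cacheCounter` description. It logs unknown actions through `java.util.logging` rather than Camel's logger. `ApplySketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical version of addToCache: key = message ID, value = action name.
final class ApplySketch {
    private static final Logger LOG = Logger.getLogger(ApplySketch.class.getName());
    final Map<String, String> cache = new HashMap<>();
    final AtomicLong cacheCounter = new AtomicLong();

    void apply(String key, String value) {
        cacheCounter.incrementAndGet();   // every received record counts as a sync event
        if (value == null) {              // switch on a null String would throw
            LOG.warning("Ignoring record with no action for key " + key);
            return;
        }
        switch (value) {
            case "add":
                cache.put(key, key);
                break;
            case "remove":
                cache.remove(key);
                break;
            case "clear":
                cache.clear();
                break;
            default:
                LOG.warning("Unexpected cache action: " + value);
        }
    }
}
```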
void broadcastAction(String key, CacheAction action)
- Sends a Kafka producer record with the given key and cache action string (`add`, `remove`, or `clear`). It waits for the send to complete, so the broker has acknowledged the event before the method returns.
- Wraps any exception in `RuntimeCamelException`.
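
A synchronous send is just a blocking `get()` on the future returned by the send call. This stdlib-only sketch makes several substitutions. A `BiFunction` returning a `Future<Long>` stands in for `producer.send(record)`, whose real future yields `RecordMetadata`. `IllegalStateException` stands in for `RuntimeCamelException`. `BroadcastSketch` is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical model of a synchronous send: block on the future and wrap failures
// in an unchecked exception so callers of add/remove/clear see the error.
final class BroadcastSketch {
    private final BiFunction<String, String, Future<Long>> sender;

    BroadcastSketch(BiFunction<String, String, Future<Long>> sender) {
        this.sender = sender;
    }

    // Returns the offset reported by the sender once the write is acknowledged.
    long broadcast(String key, String action) {
        try {
            return sender.apply(key, action).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while broadcasting " + action, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to broadcast " + action + " for " + key, e.getCause());
        }
    }
}
```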
Inner Class: TopicPoller
- Extends `ServiceSupport` and implements `Runnable`. It consumes cache mutation events from Kafka to keep the local cache synchronized.
- On its first run, calls `populateCache()` to rebuild the cache from scratch.
- When running continuously, polls Kafka with a timeout of `pollDurationMs` and applies every received cache mutation.
- Catches exceptions and logs them as warnings.
- Stops when the repository is stopped.
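
The pattern of a one-shot populate followed by a continuous loop can be sketched as follows. The sketch assumes, based on the `doStart()` description, that the first `run()` call performs only the startup population and that a later call, on the background thread, runs the poll loop. A `BlockingQueue` polled with a timeout stands in for `consumer.poll(pollDurationMs)`. `PollerSketch` and its constructor parameters are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Hypothetical poller: a BlockingQueue stands in for the Kafka consumer and
// `populate` stands in for populateCache().
final class PollerSketch implements Runnable {
    private static final Logger LOG = Logger.getLogger(PollerSketch.class.getName());
    private final AtomicBoolean init = new AtomicBoolean(false);
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final BlockingQueue<String> records;
    private final Runnable populate;
    private final Consumer<String> apply;
    private final long pollDurationMs;

    PollerSketch(BlockingQueue<String> records, Runnable populate,
                 Consumer<String> apply, long pollDurationMs) {
        this.records = records;
        this.populate = populate;
        this.apply = apply;
        this.pollDurationMs = pollDurationMs;
    }

    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        // Assumption: the first call only rebuilds the cache (the synchronous call in doStart()).
        if (init.compareAndSet(false, true)) {
            populate.run();
            return;
        }
        // Later calls (the background thread) poll until stop() is called.
        while (running.get()) {
            try {
                String record = records.poll(pollDurationMs, TimeUnit.MILLISECONDS);
                if (record != null) {
                    apply.accept(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                LOG.warning("Error while polling: " + e);   // log and keep polling
            }
        }
    }
}
```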
Important Implementation Details
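- **LRU Cache**: The local cache is created via `LRUCacheFactory.newLRUCache(maxCacheSize)`. This bounds memory use by evicting the least recently used entries when the cache is full. An evicted ID is no longer detected as a duplicate.
- **Kafka Consumer Configuration**: Auto commit is disabled (`enable.auto.commit = false`) because the repository replays the topic from the beginning rather than resuming from committed offsets. The consumer uses `StringDeserializer` for both key and value. It is manually assigned to all partitions and seeks to the beginning for full cache population.
- **Kafka Producer Configuration**: The producer uses `StringSerializer` for both key and value. It is configured for synchronous sends by disabling batching (`batch.size = 0`) and requiring an acknowledgment from the partition leader (`acks = 1`).
- **Synchronous Sends**: Every cache mutation event is sent with `producer.send(...).get()`. The caller therefore blocks until the broker acknowledges the write, which also keeps successive mutations from the same thread in order. With `acks = 1`, only the partition leader has to confirm the write, so an event can still be lost if the leader fails before replication.
- **Cache Synchronization**: The `TopicPoller` runs either once (at startup) or continuously. It applies all cache mutation events from Kafka to keep the local cache consistent with other instances.
- **Threading**: When continuous syncing is enabled, a single background thread is created and managed by Camel's `ExecutorServiceManager`.
- **Error Handling**: Exceptions raised while polling are logged, and the poller keeps running. Send failures in `broadcastAction` are not swallowed: they are wrapped in `RuntimeCamelException` and propagated to the caller.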
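
`LRUCacheFactory` is Camel's own factory. As a stdlib stand-in, a `LinkedHashMap` in access order with `removeEldestEntry` shows the same eviction policy. This sketch is not thread-safe, and `LruSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for LRUCacheFactory.newLRUCache(maxCacheSize): a LinkedHashMap in access
// order that evicts the least recently used entry once the bound is exceeded.
final class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    LruSketch(int maxSize) {
        super(16, 0.75f, true);   // accessOrder = true: get() moves an entry to the end
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;  // drop the least recently used entry after an insert
    }
}
```

For idempotency, the consequence is that once an ID is evicted, a lookup reports it as unseen. `maxCacheSize` should therefore comfortably exceed the number of distinct IDs that can be redelivered within your duplicate window.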
Interaction with Other Components
- **Apache Camel Routes**: Implements the `IdempotentRepository` SPI, which Camel's idempotent consumer pattern uses to filter duplicate messages in routes.
- **Kafka Brokers**: Stores and broadcasts cache mutation events in a Kafka topic. Each repository instance connects as both a Kafka consumer and a Kafka producer.
- **Camel Context**: Uses Camel's lifecycle and thread management facilities to start and stop services and to manage background threads.
- **Other Repository Instances**: Broadcasts cache changes to the shared Kafka topic so that other instances on the same topic apply them. This provides cluster-wide deduplication once the events have propagated.
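
Propagation delay between instances affects correctness. The deterministic model below uses hypothetical `SharedTopic` and `InstanceSketch` classes. It treats the topic as a shared list and lets each instance read it only when `sync()` is called, which models consumer lag. The model shows both halves of the behavior. Two instances can accept the same ID before either has synced. After syncing, they converge and reject it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical shared, append-only log standing in for the Kafka topic.
final class SharedTopic {
    final List<String> log = new ArrayList<>();
}

// Hypothetical repository instance that only sees other instances' events on sync().
final class InstanceSketch {
    private final SharedTopic topic;
    final Set<String> cache = new HashSet<>();
    private int readOffset = 0;

    InstanceSketch(SharedTopic topic) {
        this.topic = topic;
    }

    boolean add(String id) {
        if (!cache.add(id)) {
            return false;                 // already seen locally: duplicate
        }
        topic.log.add(id);                // broadcast the "add" event
        return true;
    }

    void sync() {                         // apply everything appended since the last sync
        while (readOffset < topic.log.size()) {
            cache.add(topic.log.get(readOffset++));
        }
    }
}
```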
Usage Example
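```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

CamelContext camelContext = new DefaultCamelContext();
KafkaIdempotentRepository repo = new KafkaIdempotentRepository("my-idempotent-topic", "localhost:9092");
repo.setCamelContext(camelContext);
repo.setGroupId("my-group");
repo.setMaxCacheSize(2000);
repo.setPollDurationMs(200);
repo.setStartupOnly(false);
camelContext.getRegistry().bind("kafkaIdempotentRepo", repo);

// Use the repository in a route; route DSL calls belong inside a RouteBuilder
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        // Skip any exchange whose messageId header has already been recorded
        from("kafka:input-topic?brokers=localhost:9092")
            .idempotentConsumer(header("messageId"), repo)
            .to("log:processed");
    }
});
camelContext.start();
```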
Mermaid Class Diagram
```mermaid
classDiagram
class KafkaIdempotentRepository {
-CamelContext camelContext
-ExecutorService executorService
-TopicPoller poller
-AtomicLong cacheCounter
-Map<String, String> cache
-Consumer<String, String> consumer
-Producer<String, String> producer
-Properties producerConfig
-Properties consumerConfig
-String topic
-String bootstrapServers
-String groupId
-int maxCacheSize
-int pollDurationMs
-boolean startupOnly
+KafkaIdempotentRepository()
+add(String key) bool
+contains(String key) bool
+remove(String key) bool
+clear()
+confirm(String key) bool
+getCacheCounter() long
+getCacheSize() int
}
class TopicPoller {
-AtomicBoolean init
+run()
}
KafkaIdempotentRepository o-- TopicPoller : uses
```
Summary
`KafkaIdempotentRepository` is a distributed idempotent repository backed by a Kafka topic. It deduplicates messages across multiple Camel route instances. Each instance keeps a local LRU cache of message IDs in sync through cache mutation events that every instance publishes to and consumes from the topic. Instances converge on the same state (eventual consistency) without an external database or coordination service. However, propagation is asynchronous and the cache is bounded. Two instances can still accept the same ID before either has consumed the other's event, and an evicted ID is no longer detected. The repository therefore reduces duplicates rather than guaranteeing exactly-once processing.
Additional Diagram: Cache Synchronization Workflow
```mermaid
sequenceDiagram
participant InstanceA as Repo Instance A
participant KafkaTopic as Kafka Topic
participant InstanceB as Repo Instance B
Note over InstanceA,KafkaTopic: Startup - consume full topic to populate cache
InstanceA->>KafkaTopic: Assign partitions and seek to beginning
KafkaTopic-->>InstanceA: Stream cache mutation events
InstanceA->>InstanceA: Update local cache
Note over InstanceA: Add message ID
InstanceA->>InstanceA: Check cache
alt Not found
InstanceA->>InstanceA: Add to local cache
InstanceA->>KafkaTopic: Produce 'add' event
else Found
InstanceA-->>InstanceA: Return false
end
Note over KafkaTopic,InstanceB: Broadcast event propagation
KafkaTopic-->>InstanceB: Deliver 'add' event
InstanceB->>InstanceB: Update local cache
Note over InstanceB: Remove message ID
InstanceB->>InstanceB: Remove from local cache
InstanceB->>KafkaTopic: Produce 'remove' event
KafkaTopic-->>InstanceA: Deliver 'remove' event
InstanceA->>InstanceA: Update local cache accordingly
```