KafkaIdempotentRepository.java


Overview

`KafkaIdempotentRepository` is a Kafka topic-based implementation of Camel's [`IdempotentRepository`](https://camel.apache.org/manual/latest/idempotent-consumer.html) interface. It provides a distributed, fault-tolerant mechanism for deduplicating message processing in Apache Camel routes by storing and synchronizing message IDs across cluster nodes using Kafka as the backing store.

This class maintains a **local in-memory LRU cache** of previously seen message IDs for fast lookups and synchronizes cache mutations (additions, removals, clears) through a **dedicated Kafka topic**. Each repository instance consumes the entire topic on startup to rebuild its cache and, unless it is configured for startup-only syncing, keeps applying updates in the background. This supports duplicate detection across clustered nodes without an external database or coordination service. Because peers learn of a new ID only after its event has propagated through the topic, the result is better described as eventually consistent deduplication than as a strict **exactly-once** guarantee.
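To make the LRU behaviour concrete, the following is a minimal sketch of a bounded ID cache built on `java.util.LinkedHashMap`. It is an illustration only: the class name `LruIdCache` and its methods are hypothetical and are not taken from the Camel source, which may build its cache differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the repository's local cache; not the Camel implementation.
class LruIdCache {
    private final Map<String, Boolean> ids;

    LruIdCache(final int maxCacheSize) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.ids = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Evict the least recently used ID once the limit is exceeded.
                return size() > maxCacheSize;
            }
        };
    }

    /** Returns true if the ID was new, false if it was already cached. */
    boolean add(String id) {
        // A hit on an existing ID also counts as an access and refreshes its recency.
        return ids.putIfAbsent(id, Boolean.TRUE) == null;
    }

    /** Pure lookup; containsKey does not refresh recency. */
    boolean contains(String id) {
        return ids.containsKey(id);
    }

    int size() {
        return ids.size();
    }
}
```

One consequence of any bounded cache is that an ID evicted from it is no longer recognized as a duplicate, so the size limit should be chosen with the expected redelivery window in mind.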


Key Features and Functionality


Class: KafkaIdempotentRepository

Package

`org.apache.camel.processor.idempotent.kafka`

Interfaces Implemented

Annotations


Fields Summary

| Field | Type | Description |
| --- | --- | --- |
| `camelContext` | `CamelContext` | Camel runtime context injected by the framework. |
| `executorService` | `ExecutorService` | Background thread executor for continuous syncing. |
| `poller` | `TopicPoller` (inner class) | Runnable service that polls the Kafka topic for cache updates. |
| `cacheCounter` | `AtomicLong` | Counts the number of cache sync events received. |
| `cache` | `Map` | In-memory LRU cache storing message IDs. |
| `consumer` | `Consumer` | Kafka consumer for reading cache mutation events. |
| `producer` | `Producer` | Kafka producer for broadcasting cache mutations. |
| `producerConfig` | `Properties` | Kafka producer configuration properties. |
| `consumerConfig` | `Properties` | Kafka consumer configuration properties. |
| `topic` | `String` | Kafka topic name used for cache mutations (required). |
| `bootstrapServers` | `String` | Kafka bootstrap servers URL (used if configs not set). |
| `groupId` | `String` | Kafka consumer group ID (optional). |
| `maxCacheSize` | `int` | Maximum size of the local LRU cache (default 1000). |
| `pollDurationMs` | `int` | Kafka consumer poll timeout in milliseconds (default 100). |
| `startupOnly` | `boolean` | If true, syncs cache only on startup (default false). |
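The interplay of `startupOnly`, `pollDurationMs`, and `executorService` can be sketched without Kafka. In the example below an in-memory `BlockingQueue` stands in for the topic, and the class `PollerSketch` with its `start`, `stop`, and `awaitCached` methods is hypothetical. It assumes that the initial sync runs synchronously and that the background poller is only started when `startupOnly` is false; the actual class may order these steps differently.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an in-memory queue stands in for the Kafka topic.
class PollerSketch {
    final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    final Set<String> cache = ConcurrentHashMap.newKeySet();

    private final long pollDurationMs;
    private final boolean startupOnly;
    private ExecutorService executorService;
    private volatile boolean running;

    PollerSketch(long pollDurationMs, boolean startupOnly) {
        this.pollDurationMs = pollDurationMs;
        this.startupOnly = startupOnly;
    }

    void start() {
        // Initial sync: apply everything that has already been published.
        String id;
        while ((id = topic.poll()) != null) {
            cache.add(id);
        }
        if (!startupOnly) {
            running = true;
            executorService = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "poller-sketch");
                t.setDaemon(true);
                return t;
            });
            executorService.execute(this::pollLoop);
        }
    }

    private void pollLoop() {
        try {
            while (running) {
                // Wait up to pollDurationMs for the next event, then loop again.
                String id = topic.poll(pollDurationMs, TimeUnit.MILLISECONDS);
                if (id != null) {
                    cache.add(id);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() interrupts the poller
        }
    }

    void stop() {
        running = false;
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    /** Helper for demos: waits until the ID is cached or the timeout passes. */
    boolean awaitCached(String id, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cache.contains(id) && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return cache.contains(id);
    }
}
```

With `startupOnly` set to true, events published after `start()` never reach the local cache in this sketch, which is the trade-off the flag implies: less background work in exchange for a cache that only reflects the topic as it was at startup plus local additions.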


Enum: CacheAction

Defines the types of cache mutation events that can be broadcast and consumed:

| Value | Description |
| --- | --- |
| `add` | Add a message ID to cache |
| `remove` | Remove a message ID from cache |
| `clear` | Clear the entire cache |
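As an illustration of how these three actions could be applied when a node replays the topic, here is a small self-contained sketch. The `Event` type and the `replay` method are hypothetical, and the pairing of a message ID with an action per event is an assumption about the event shape rather than something confirmed by this document.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class CacheReplaySketch {
    // Mirrors the three documented values, lowercase as in the table above.
    enum CacheAction { add, remove, clear }

    // Hypothetical event shape: one message ID plus the mutation to apply.
    static final class Event {
        final String key;
        final CacheAction action;

        Event(String key, CacheAction action) {
            this.key = key;
            this.action = action;
        }
    }

    /** Applies the events in log order and returns the resulting set of IDs. */
    static Set<String> replay(List<Event> log) {
        Set<String> cache = new LinkedHashSet<>();
        for (Event event : log) {
            switch (event.action) {
                case add:
                    cache.add(event.key);
                    break;
                case remove:
                    cache.remove(event.key);
                    break;
                case clear:
                    cache.clear(); // the key is irrelevant for a clear
                    break;
            }
        }
        return cache;
    }
}
```

Because the result depends on the order in which events are applied, a replay like this only reproduces the same state on every node if all nodes read the events in the same order.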


Constructors

The class provides multiple overloaded constructors allowing flexible configuration:

```java
KafkaIdempotentRepository(); // default
KafkaIdempotentRepository(String topic, String bootstrapServers);
KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId);
KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs);
KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId);
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs, String groupId);
```

Public API Methods

boolean add(String key)

Adds a message ID to the repository if not already present.

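```java
KafkaIdempotentRepository repo = new KafkaIdempotentRepository("my-topic", "localhost:9092");
// Assumes the repository has been started (for example by the CamelContext
// that owns the route) before add() is called.
boolean added = repo.add("message-123");
if (added) {
    // first time this ID is seen: process the message
} else {
    // duplicate message, skip processing
}
```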

boolean contains(String key)

Checks if the given message ID exists in the local cache.

```java
if (repo.contains("message-123")) {
    // duplicate message
}
```

boolean remove(String key)

Removes the given message ID from the repository.

```java
repo.remove("message-123");
```

boolean confirm(String key)


void clear()

Clears the entire local cache and broadcasts a `clear` event to all cluster nodes.

```java
repo.clear();
```

Management Operations (Annotated with @ManagedOperation)


Getters and Setters

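The class provides standard getters and setters for its configuration properties, such as `groupId`, `maxCacheSize`, `pollDurationMs`, and `startupOnly` (the setters used in the Usage Example section).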


Lifecycle Methods

void doStart()

void doStop()


Internal Private Methods

void populateCache()

void addToCache(ConsumerRecord<String, String> consumerRecord)

void broadcastAction(String key, CacheAction action)


Inner Class: TopicPoller


Important Implementation Details


Interaction with Other Components


Usage Example

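```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

CamelContext camelContext = new DefaultCamelContext();

KafkaIdempotentRepository repo = new KafkaIdempotentRepository("my-idempotent-topic", "localhost:9092");
repo.setCamelContext(camelContext);
repo.setGroupId("my-group");
repo.setMaxCacheSize(2000);
repo.setPollDurationMs(200);
repo.setStartupOnly(false);

camelContext.getRegistry().bind("kafkaIdempotentRepo", repo);

// Routes are defined inside a RouteBuilder and added before the context starts.
// addRoutes() and start() can throw, so call them from a method that handles Exception.
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("kafka:input-topic")
            // Skip any exchange whose "messageId" header has been seen before
            .idempotentConsumer(header("messageId"), repo)
            .to("log:processed");
    }
});
camelContext.start();
```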

Mermaid Class Diagram

```mermaid
classDiagram
    class KafkaIdempotentRepository {
        -CamelContext camelContext
        -ExecutorService executorService
        -TopicPoller poller
        -AtomicLong cacheCounter
        -Map<String, String> cache
        -Consumer<String, String> consumer
        -Producer<String, String> producer
        -Properties producerConfig
        -Properties consumerConfig
        -String topic
        -String bootstrapServers
        -String groupId
        -int maxCacheSize
        -int pollDurationMs
        -boolean startupOnly
        +KafkaIdempotentRepository()
        +add(String key) bool
        +contains(String key) bool
        +remove(String key) bool
        +clear()
        +confirm(String key) bool
        +getCacheCounter() long
        +getCacheSize() int
    }

    class TopicPoller {
        -AtomicBoolean init
        +run()
    }

    KafkaIdempotentRepository o-- TopicPoller : uses
```

Summary

`KafkaIdempotentRepository` is a distributed idempotent repository backed by a Kafka topic. It deduplicates messages across multiple Camel route instances by keeping a local LRU cache of message IDs on each node and synchronizing it through the topic. Cache mutations are broadcast and consumed via Kafka, which gives eventual consistency between nodes and lets a restarted node rebuild its cache by replaying the topic. The design relies on Kafka's durability and ordering to keep the caches aligned without external coordination. Since propagation is asynchronous, it reduces duplicate processing rather than strictly guaranteeing exactly-once semantics.


Additional Diagram: Cache Synchronization Workflow

```mermaid
sequenceDiagram
    participant InstanceA as Repo Instance A
    participant KafkaTopic as Kafka Topic
    participant InstanceB as Repo Instance B

    Note over InstanceA,KafkaTopic: Startup - consume full topic to populate cache
    InstanceA->>KafkaTopic: Assign partitions and seek to beginning
    KafkaTopic-->>InstanceA: Stream cache mutation events
    InstanceA->>InstanceA: Update local cache

    Note over InstanceA: Add message ID
    InstanceA->>InstanceA: Check cache
    alt Not found
        InstanceA->>InstanceA: Add to local cache
        InstanceA->>KafkaTopic: Produce 'add' event
    else Found
        InstanceA-->>InstanceA: Return false
    end

    Note over KafkaTopic,InstanceB: Broadcast event propagation
    KafkaTopic-->>InstanceB: Deliver 'add' event
    InstanceB->>InstanceB: Update local cache

    Note over InstanceB: Remove message ID
    InstanceB->>InstanceB: Remove from local cache
    InstanceB->>KafkaTopic: Produce 'remove' event

    KafkaTopic-->>InstanceA: Deliver 'remove' event
    InstanceA->>InstanceA: Update local cache accordingly
```
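The workflow in the diagram can be simulated in plain Java. In this sketch a shared in-memory list stands in for the Kafka topic, and `SharedLog`, `RepoInstance`, and the explicit `sync()` call are hypothetical names introduced for illustration; in the real class the consuming side runs inside the poller rather than on demand.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical append-only log standing in for the Kafka topic.
class SharedLog {
    final List<String[]> events = new ArrayList<>(); // each entry is {action, key}
}

// Hypothetical repository instance with a local cache and a read offset.
class RepoInstance {
    private final SharedLog log;
    private final Set<String> cache = new HashSet<>();
    private int offset; // index of the next log entry this instance has not applied

    RepoInstance(SharedLog log) {
        this.log = log;
    }

    boolean add(String key) {
        if (cache.contains(key)) {
            return false; // already seen locally
        }
        cache.add(key);
        log.events.add(new String[] {"add", key}); // broadcast to peers
        return true;
    }

    boolean remove(String key) {
        cache.remove(key);
        log.events.add(new String[] {"remove", key});
        return true;
    }

    boolean contains(String key) {
        return cache.contains(key);
    }

    /** Applies every log entry published since the last sync, in order. */
    void sync() {
        while (offset < log.events.size()) {
            String[] event = log.events.get(offset++);
            if ("add".equals(event[0])) {
                cache.add(event[1]);
            } else if ("remove".equals(event[0])) {
                cache.remove(event[1]);
            } else if ("clear".equals(event[0])) {
                cache.clear();
            }
        }
    }
}
```

A short run through the sketch shows both the propagation and the window before it happens:

```java
SharedLog log = new SharedLog();
RepoInstance a = new RepoInstance(log);
RepoInstance b = new RepoInstance(log);

a.add("m1");      // true: new ID, 'add' event published
b.contains("m1"); // false: B has not consumed the event yet
b.sync();
b.add("m1");      // false: B now treats m1 as a duplicate
```

The second line of results is the reason the deduplication is eventually consistent: until an instance has applied a peer's event, it can still accept the same ID.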
