KafkaIdempotentRepositoryPersistenceIT.java

Overview

`KafkaIdempotentRepositoryPersistenceIT.java` is an integration test class designed to verify the persistence and correctness of the `KafkaIdempotentRepository` in the Apache Camel Kafka component. The primary focus of this test is to ensure that the idempotent repository correctly recreates its internal cache from pre-existing Kafka topics, guaranteeing that the de-duplication state survives application restarts.

By executing tests in a specific order, the class validates that duplicate messages are filtered out consistently across multiple passes, even after the application or repository is restarted. This ensures reliability and correctness of message processing in distributed systems where exactly-once or at-least-once processing semantics are critical.

This test class extends `BaseKafkaTestSupport` to leverage Kafka infrastructure setup and implements `ConfigurableContext` to customize the Camel context for testing.


Class: KafkaIdempotentRepositoryPersistenceIT

Package

`org.apache.camel.processor.idempotent.kafka`

Inheritance

Purpose

Tests the persistence behavior of `KafkaIdempotentRepository` ensuring that the repository can reload its state from an existing Kafka topic and continue to filter duplicate messages correctly.


Fields

Name

Type

Description

`REPOSITORY_TOPIC`

`String` (static)

Unique Kafka topic name used as the backing store for the repository, generated with a random UUID to avoid collisions.

`kafkaIdempotentRepository`

`KafkaIdempotentRepository`

Instance of the repository under test, bound into the Camel registry for route usage.


Methods

@BeforeAll public static void createRepositoryTopic()


void clearTopics()


@Override @ContextFixture public void configureContext(CamelContext context)


@Override protected RouteBuilder createRouteBuilder()


private void sendMessages(long count)


Test Methods

All tests are ordered and annotated with JUnit 5 annotations to enforce execution order and provide descriptive names.

1. testFirstPassFiltersAsExpected()

2. testSecondPassFiltersEverything()

3. testThirdPassFiltersEverything(long count, long passes)

4. testFourthPass()

5. testClear()


Static Helper Methods

private static Stream<Arguments> multiplePassesProvider()


Implementation Details


Interaction with Other Components


Usage Example

Here's a simplified conceptual example of how the repository is used within the test:

// Configure Camel context and bind repository
KafkaIdempotentRepository repo = new KafkaIdempotentRepository(topic, bootstrapServers);
context.getRegistry().bind("kafkaIdempotentRepositoryPersistence", repo);

// Define route using idempotent consumer with the repository
from("direct:in")
    .to("mock:before")
    .idempotentConsumer(header("id"))
        .idempotentRepository("kafkaIdempotentRepositoryPersistence")
        .to("mock:out")
    .end();

// Send messages with duplicate IDs
template.sendBodyAndHeader("direct:in", "Test message", "id", 1);
template.sendBodyAndHeader("direct:in", "Test message", "id", 1); // filtered out

Mermaid Class Diagram

classDiagram
    class KafkaIdempotentRepositoryPersistenceIT {
        - static String REPOSITORY_TOPIC
        - KafkaIdempotentRepository kafkaIdempotentRepository
        + static void createRepositoryTopic()
        + void clearTopics()
        + void configureContext(CamelContext context)
        + RouteBuilder createRouteBuilder()
        - void sendMessages(long count)
        + void testFirstPassFiltersAsExpected()
        + void testSecondPassFiltersEverything()
        + void testThirdPassFiltersEverything(long count, long passes)
        + void testFourthPass()
        + void testClear()
        - static Stream~Arguments~ multiplePassesProvider()
    }
    KafkaIdempotentRepositoryPersistenceIT --|> BaseKafkaTestSupport
    KafkaIdempotentRepositoryPersistenceIT ..|> ConfigurableContext

Summary

`KafkaIdempotentRepositoryPersistenceIT.java` is a critical integration test validating that the `KafkaIdempotentRepository` correctly persists and reloads its deduplication state from Kafka topics, enabling reliable idempotent message processing across application restarts. It exercises various message sending scenarios, verifying that duplicates are filtered and unique messages pass through, and ensures repository clearing and Kafka topic management are handled safely.

This test class plays an important role in maintaining the robustness of Apache Camel's Kafka idempotent consumer support in distributed and fault-tolerant messaging environments.