KafkaIdempotentRepositoryPersistenceIT.java
Overview
`KafkaIdempotentRepositoryPersistenceIT.java` is an integration test class designed to verify the persistence and correctness of the `KafkaIdempotentRepository` in the Apache Camel Kafka component. The primary focus of this test is to ensure that the idempotent repository correctly recreates its internal cache from pre-existing Kafka topics, guaranteeing that the de-duplication state survives application restarts.
By executing tests in a specific order, the class validates that duplicate messages are filtered out consistently across multiple passes, even after the application or repository is restarted. This ensures reliability and correctness of message processing in distributed systems where exactly-once or at-least-once processing semantics are critical.
This test class extends `BaseKafkaTestSupport` to leverage Kafka infrastructure setup and implements `ConfigurableContext` to customize the Camel context for testing.
Class: KafkaIdempotentRepositoryPersistenceIT
Package
`org.apache.camel.processor.idempotent.kafka`
Inheritance
Extends: `BaseKafkaTestSupport`
Implements: `ConfigurableContext`
Purpose
Tests the persistence behavior of `KafkaIdempotentRepository` ensuring that the repository can reload its state from an existing Kafka topic and continue to filter duplicate messages correctly.
Fields
| Name | Type | Description |
|---|---|---|
| `REPOSITORY_TOPIC` | `String` (static) | Unique Kafka topic name used as the backing store for the repository, generated with a random UUID to avoid collisions. |
| `kafkaIdempotentRepository` | `KafkaIdempotentRepository` | Instance of the repository under test, bound into the Camel registry for route usage. |
Methods
@BeforeAll public static void createRepositoryTopic()
Description: Creates the Kafka topic that backs the idempotent repository before any tests run.
Parameters: None
Return: void
Details: Uses `KafkaTestUtil.createTopic()` to create a single-partition topic named by `REPOSITORY_TOPIC`.
Usage: Ensures the Kafka topic is ready for repository operations.
void clearTopics()
Description: Deletes the Kafka topic used by the repository.
Parameters: None
Return: void
Details: Invokes the Kafka admin client to delete `REPOSITORY_TOPIC`. Used for cleanup after tests.
Usage: Called from `testClear()` to clean up resources.
@Override @ContextFixture public void configureContext(CamelContext context)
Description: Configures the Camel context by binding the `KafkaIdempotentRepository` instance into the Camel registry.
Parameters: `context` - The `CamelContext` instance to configure.
Return: void
Details: Instantiates `KafkaIdempotentRepository` with the topic name and the Kafka bootstrap servers, and registers it under the name `"kafkaIdempotentRepositoryPersistence"`.
Usage: Sets up the idempotent repository for use within Camel routes.
@Override protected RouteBuilder createRouteBuilder()
Description: Defines the Camel route used in the tests.
Parameters: None
Return: `RouteBuilder`
Details:
Routes messages from `"direct:in"`.
Sends them to the `"mock:before"` endpoint, which monitors all incoming messages.
Applies an idempotent consumer keyed on the `id` header, using the Kafka-backed repository.
Forwards unique messages to `"mock:out"`.
Usage: Used by the Camel testing framework to create the route under test.
private void sendMessages(long count)
Description: Sends a specified number of test messages to the `"direct:in"` endpoint.
Parameters: `count` - Number of messages to send.
Return: void
Details: Sends messages with body `"Test message"` and an `"id"` header cycling through the values 0 to 4 (`i % 5`), which intentionally produces duplicates.
Usage: Used by multiple tests to produce messages for the idempotent consumer.
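The duplicate pattern produced by `i % 5` can be illustrated without Kafka or Camel. The following is a minimal JDK-only sketch; the class `IdCycleSketch` and its methods are hypothetical names introduced for illustration and are not part of the test class.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration only: not part of KafkaIdempotentRepositoryPersistenceIT.
final class IdCycleSketch {

    // Id header value assigned to the i-th message (0-based), as described above.
    static long idFor(long i) {
        return i % 5;
    }

    // Distinct ids produced when `count` messages are sent, in first-seen order.
    static Set<Long> distinctIds(long count) {
        Set<Long> ids = new LinkedHashSet<>();
        for (long i = 0; i < count; i++) {
            ids.add(idFor(i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // 10 messages carry the ids 0,1,2,3,4,0,1,2,3,4
        System.out.println(distinctIds(10)); // prints [0, 1, 2, 3, 4]
    }
}
```

With 10 messages, only 5 distinct ids exist, which is why an idempotent filter keyed on the `id` header lets at most 5 of them through.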
Test Methods
All tests are ordered and annotated with JUnit 5 annotations to enforce execution order and provide descriptive names.
1. testFirstPassFiltersAsExpected()
Annotation: `@Order(1)`, `@Test`, `@DisplayName`
Purpose: Sends 10 messages and verifies that duplicates are filtered out on the first pass.
Behavior:
Verifies that all 10 messages are received at `"mock:before"`.
Checks that only 5 unique messages (ids 0-4) pass through the idempotent filter to `"mock:out"`.
2. testSecondPassFiltersEverything()
Annotation: `@Order(2)`, `@RepeatedTest(3)`, `@DisabledIfSystemProperty`, `@DisplayName`
Purpose: Resends the same 10 messages three times and verifies that no duplicates pass.
Behavior:
All messages are received at `"mock:before"`.
None pass the idempotent filter to `"mock:out"` on repeated sends.
Note: Disabled for remote Kafka instances due to topic deletion restrictions.
3. testThirdPassFiltersEverything(long count, long passes)
Annotation: `@Order(3)`, `@ParameterizedTest`, `@MethodSource`, `@DisabledIfSystemProperty`, `@DisplayName`
Purpose: Runs multiple passes with varying message counts and verifies that no duplicates pass.
Parameters:
`count` - Number of messages per pass.
`passes` - Number of passes to send messages.
Behavior:
Ensures the total number of messages received at `"mock:before"` equals `count * passes`.
Checks that no messages pass through to `"mock:out"`, because every id has already been seen.
Data Source: Randomized counts and passes via `multiplePassesProvider()`.
Note: Disabled for remote Kafka instances.
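The shape of such a data source can be sketched with the JDK alone. In the sketch below, the class name, the fixed pairs, and the random bounds are all illustrative assumptions; the real provider's values are not given here. In a JUnit 5 `@MethodSource` provider, each pair would be wrapped in `Arguments.of(count, passes)` rather than returned as a `long[]`.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;

// Hypothetical illustration only: values and bounds are assumptions.
final class PassesProviderSketch {

    // Each element is {count, passes}: two fixed pairs plus one randomized pair.
    static Stream<long[]> multiplePasses() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return Stream.of(
                new long[] {1, 1},
                new long[] {10, 3},
                new long[] {random.nextLong(1, 50), random.nextLong(1, 5)});
    }

    // Number of messages expected at the endpoint placed before the filter.
    static long expectedBefore(long count, long passes) {
        return count * passes;
    }

    public static void main(String[] args) {
        multiplePasses().forEach(pair ->
                System.out.println(pair[0] + " x " + pair[1]
                        + " -> " + expectedBefore(pair[0], pair[1]) + " before, 0 out"));
    }
}
```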
4. testFourthPass()
Annotation: `@Order(4)`, `@Test`, `@DisabledIfSystemProperty`, `@DisplayName`
Purpose: Sends messages with ids 5-9 (new unique ids) and verifies that they pass through.
Behavior:
All 5 new messages are received at `"mock:before"`.
All 5 pass the idempotent filter to `"mock:out"`.
5. testClear()
Annotation: `@Order(5)`, `@Test`, `@DisplayName`
Purpose: Tests that clearing the repository does not throw exceptions, and performs topic cleanup.
Behavior:
Calls `clear()` on the repository and asserts that no exception is thrown.
Calls `clearTopics()` to delete the Kafka topic for clean teardown.
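The sequence of passes exercised by the tests above can be mimicked without Kafka. The sketch below is a deliberately simplified, JDK-only stand-in: a `List` plays the role of the backing topic and a `HashSet` plays the role of the repository's local cache. `LogBackedRepositorySketch` and its methods are hypothetical names and do not reflect how `KafkaIdempotentRepository` is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for illustration only: not the Camel implementation.
final class LogBackedRepositorySketch {
    private final List<String> log;                     // stands in for the Kafka topic
    private final Set<String> cache = new HashSet<>();  // rebuilt on every start

    LogBackedRepositorySketch(List<String> log) {
        this.log = log;
        cache.addAll(log); // replay the existing log to restore the state
    }

    // Returns true if the key was not seen before; records it in the log.
    boolean add(String key) {
        if (!cache.add(key)) {
            return false;
        }
        log.add(key);
        return true;
    }

    // Sends `count` keys cycling through 0..4 and returns how many were accepted.
    static int sendCycling(LogBackedRepositorySketch repo, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (repo.add(String.valueOf(i % 5))) {
                accepted++;
            }
        }
        return accepted;
    }

    // Sends keys from..toExclusive-1 and returns how many were accepted.
    static int sendRange(LogBackedRepositorySketch repo, int from, int toExclusive) {
        int accepted = 0;
        for (int i = from; i < toExclusive; i++) {
            if (repo.add(String.valueOf(i))) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        LogBackedRepositorySketch first = new LogBackedRepositorySketch(topic);
        System.out.println(sendCycling(first, 10));      // prints 5

        // A fresh instance over the same log stands in for a restart.
        LogBackedRepositorySketch restarted = new LogBackedRepositorySketch(topic);
        System.out.println(sendCycling(restarted, 10));  // prints 0
        System.out.println(sendRange(restarted, 5, 10)); // prints 5
    }
}
```

The second instance accepts nothing from the repeated ids because its cache was rebuilt from the log, which is the persistence property the ordered tests check against a real topic.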
Static Helper Methods
private static Stream<Arguments> multiplePassesProvider()
Description: Supplies test parameters for the parameterized test `testThirdPassFiltersEverything`.
Returns: Stream of `Arguments` containing pairs of `(count, passes)` with randomized and fixed values.
Purpose: Enables running tests with variable message counts and pass counts to test repository robustness.
Implementation Details
KafkaIdempotentRepository: A Kafka-backed implementation of Camel's `IdempotentRepository` interface. It stores message IDs in a Kafka topic to deduplicate across distributed consumers, and it survives restarts by reloading its state from Kafka.
Testing Approach: The test sends messages with duplicate IDs to a route that uses the Kafka idempotent repository. It verifies that duplicates are filtered and that the repository state persists across multiple message send passes, simulating application restarts.
Ordering: Tests run in a specific order to simulate lifecycle scenarios:
Initial pass accepts unique messages.
Subsequent passes reject duplicates.
Later passes accept new unique messages.
Finally, repository clearing and topic cleanup.
Awaitility Usage: Awaitility's `await()` makes the tests wait for asynchronous message consumption and processing to finish before assertions are checked.
Conditional Disabling: Some tests are disabled for remote Kafka instances where topic management might be restricted.
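To show what waiting for an asynchronous condition involves, here is a minimal JDK-only polling helper. It is a simplified stand-in written for illustration, not Awaitility's implementation or API; `PollingSketch` and `awaitCondition` are hypothetical names.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for illustration only: not Awaitility.
final class PollingSketch {

    // Polls `condition` every `pollMillis` until it holds or `timeoutMillis` elapses.
    // Returns true if the condition held in time, false on timeout or interruption.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true roughly 50 ms after start.
        boolean satisfied = awaitCondition(() -> System.currentTimeMillis() - start >= 50, 2_000, 10);
        System.out.println(satisfied);
    }
}
```

In a test, the condition would typically compare a mock endpoint's received-message count with the expected value, so that assertions run only after the route has processed every message.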
Interaction with Other Components
Camel Context: The test configures Camel routes and registers the Kafka idempotent repository as a bean to be used in `idempotentConsumer`.
Kafka Infrastructure: Relies heavily on Kafka topics for storing repository state and uses Kafka admin clients for topic creation and deletion.
Mock Endpoints: Uses Camel's MockEndpoint to monitor messages before and after applying idempotency logic.
BaseKafkaTestSupport: Provides Kafka test infrastructure like embedded Kafka or test clusters, bootstrap server information, and admin clients.
KafkaIdempotentRepository: The core component under test, implementing idempotent message filtering backed by Kafka.
Usage Example
Here's a simplified conceptual example of how the repository is used within the test:
```java
// Configure the Camel context and bind the repository
KafkaIdempotentRepository repo = new KafkaIdempotentRepository(topic, bootstrapServers);
context.getRegistry().bind("kafkaIdempotentRepositoryPersistence", repo);

// Define a route that uses the idempotent consumer with the repository
from("direct:in")
    .to("mock:before")
    .idempotentConsumer(header("id"))
        .idempotentRepository("kafkaIdempotentRepositoryPersistence")
        .to("mock:out")
    .end();

// Send two messages with the same ID
template.sendBodyAndHeader("direct:in", "Test message", "id", 1);
template.sendBodyAndHeader("direct:in", "Test message", "id", 1); // filtered out before mock:out
```
Mermaid Class Diagram
```mermaid
classDiagram
    class KafkaIdempotentRepositoryPersistenceIT {
        - static String REPOSITORY_TOPIC
        - KafkaIdempotentRepository kafkaIdempotentRepository
        + static void createRepositoryTopic()
        ~ void clearTopics()
        + void configureContext(CamelContext context)
        # RouteBuilder createRouteBuilder()
        - void sendMessages(long count)
        + void testFirstPassFiltersAsExpected()
        + void testSecondPassFiltersEverything()
        + void testThirdPassFiltersEverything(long count, long passes)
        + void testFourthPass()
        + void testClear()
        - static Stream~Arguments~ multiplePassesProvider()
    }
    KafkaIdempotentRepositoryPersistenceIT --|> BaseKafkaTestSupport
    KafkaIdempotentRepositoryPersistenceIT ..|> ConfigurableContext
```
Summary
`KafkaIdempotentRepositoryPersistenceIT.java` is a critical integration test validating that the `KafkaIdempotentRepository` correctly persists and reloads its deduplication state from Kafka topics, enabling reliable idempotent message processing across application restarts. It exercises various message sending scenarios, verifying that duplicates are filtered and unique messages pass through, and ensures repository clearing and Kafka topic management are handled safely.
This test class plays an important role in maintaining the robustness of Apache Camel's Kafka idempotent consumer support in distributed and fault-tolerant messaging environments.