KafkaIdempotentRepositoryNonEagerIT.java
Overview
`KafkaIdempotentRepositoryNonEagerIT` is an integration test class designed to validate the behavior of the **non-eager** mode of the Apache Camel Kafka-based idempotent repository (`KafkaIdempotentRepository`). This class ensures that duplicate messages are correctly filtered out when processed through Camel routes using the Kafka idempotent repository but with lazy (non-eager) idempotent checking.
The tests in this file verify two main aspects:
Duplicate messages are not processed more than once.
Exceptions during message processing cause rollback, allowing re-processing.
The class extends a base test class `SimpleIdempotentTest` (not included here), which presumably provides foundational Camel test infrastructure.
Detailed Explanation
Package and Dependencies
Package:
org.apache.camel.processor.idempotent.kafkaKey Dependencies:
Apache Camel testing framework (
org.apache.camel.test,org.junit.jupiter.api)Kafka test utilities (
KafkaTestUtil)Awaitility for asynchronous assertions
JUnit 5 for structured test execution and ordering
Class: KafkaIdempotentRepositoryNonEagerIT
Description
This class defines integration tests for the `KafkaIdempotentRepository` configured with eager mode disabled (`eager(false)`). It verifies that the idempotent consumer correctly filters duplicates lazily during message consumption in a Camel route.
Annotations
@TestMethodOrder(MethodOrderer.OrderAnnotation.class): Ensures test methods run in a specified order.@BeforeAll: Setup method executed once before all tests.@BindToRegistry: Registers the Kafka idempotent repository instance into Camel's registry under the namekafkaIdempotentRepositoryNonEager.@ContextFixture: Custom test fixture hook to configure Kafka component in the Camel context.@Order: Specifies execution order for JUnit test methods.@Test,@DisplayName: Define individual test cases with readable names.
Fields
Field | Description |
|---|---|
`REPOSITORY_TOPIC` | Kafka topic name for storing idempotent keys, unique per test run using UUID. |
`kafkaIdempotentRepository` | Instance of `KafkaIdempotentRepository` bound to Camel registry, uses the topic above and Kafka bootstrap servers. |
Methods
createRepositoryTopic()
Type:
static voidPurpose: Creates the Kafka topic used by the idempotent repository.
Usage: Runs once before any test cases to ensure the topic exists.
Details: Uses KafkaTestUtil.createTopic with 1 partition.
configureKafka(CamelContext context)
Parameters:
CamelContext context- The Camel runtime context.
Purpose: Configures the Kafka component within the Camel context to use the correct Kafka bootstrap servers.
Details: Ensures the Kafka component is properly set up before routes execute.
createRouteBuilder()
Returns:
RouteBuilderPurpose: Defines the Camel routing logic used in the tests.
Route Details:
Starts from endpoint
direct:in.Sends messages to
mock:beforeendpoint (to capture all incoming messages).Processes messages with an idempotent consumer using the header
"id"as the message key.Uses the Kafka idempotent repository registered as
"kafkaIdempotentRepositoryNonEager".Sets
.eager(false)to disable eager checking (lazy mode).Sends unique messages to
mock:out.
Significance: Demonstrates how to set up a non-eager idempotent consumer with Kafka repository.
testRemovesDuplicates()
Test Order: 1
Purpose: Validates that duplicate messages (same
"id"header) are filtered out by idempotent consumer.Test Logic:
Sends 10 messages with
"id"headers cycling from 0 to 4 (duplicates exist).Asserts that only 5 unique messages appear on
mock:out(duplicates filtered).Asserts all 10 messages were received on
mock:before.
Key Assertion:
mockOut.getReceivedCounter()equals 5; duplicates removed.Usage Example:
ProducerTemplate template = contextExtension.getProducerTemplate();
for (int i = 0; i < 10; i++) {
template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
}
testRollsBackOnException()
Test Order: 2
Purpose: Ensures that if an exception occurs during processing, the message is not marked as consumed and can be retried.
Test Logic:
Configures
mock:outto throw an exception when receiving messages with"id" == 0.Sends 10 messages with
"id"cycling 0 to 4.Catches expected
CamelExecutionExceptionfor failed messages.Verifies that only 5 messages are successfully processed.
Validates total messages received by
mock:beforeis 20 (including retries).
Significance: Demonstrates rollback behavior when exceptions occur, confirming Kafka idempotent repository does not prematurely mark messages as consumed.
Usage Example:
mockOut.whenAnyExchangeReceived(exchange -> {
int id = exchange.getIn().getHeader("id", Integer.class);
if (id == 0) {
throw new IllegalArgumentException("Boom!");
}
});
Important Implementation Details
Non-Eager Idempotent Mode: The
.eager(false)setting means the idempotent repository will check for duplicates lazily, i.e., after the message is routed to the idempotent consumer rather than before. This behavior differs from eager mode that checks before processing.Kafka Topic Usage: The Kafka topic acts as a distributed store for idempotent keys to track which message IDs have already been processed, enabling reliability and consistency across distributed Camel instances.
Test Ordering: Tests depend on previous message processing state, hence the use of ordered test execution.
Exception Handling: Use of throwing exceptions in processing to confirm rollback semantics with idempotent repository.
Interaction with Other System Components
Apache Camel: Uses Camel's routing DSL and testing framework to define and test routes.
Kafka: Utilizes Kafka topics as backing store for idempotent keys.
KafkaTestUtil: Utility to create topics and configure Kafka components for testing.
MockEndpoints: Used to verify message flow and behavior during tests.
SimpleIdempotentTest: Base class providing foundational Camel test support (context, extensions).
Visual Diagram
classDiagram
class KafkaIdempotentRepositoryNonEagerIT {
- static final String REPOSITORY_TOPIC
- KafkaIdempotentRepository kafkaIdempotentRepository
+ static void createRepositoryTopic()
+ void configureKafka(CamelContext context)
+ RouteBuilder createRouteBuilder()
+ void testRemovesDuplicates()
+ void testRollsBackOnException()
}
KafkaIdempotentRepositoryNonEagerIT --|> SimpleIdempotentTest
Summary
`KafkaIdempotentRepositoryNonEagerIT` is a focused integration test class that validates the lazy (non-eager) mode of Apache Camel's Kafka-backed idempotent repository. It ensures duplicate messages are filtered and that processing exceptions cause correct rollback behavior. This helps guarantee that the Kafka idempotent repository behaves correctly in distributed and fault-tolerant Camel routes.
The class showcases practical usage of Kafka as an idempotent message store and demonstrates how to write robust tests for idempotent consumers in Camel.