KafkaConsumerIdempotentIT.java
Overview
`KafkaConsumerIdempotentIT.java` is an integration test class for the Apache Camel Kafka component, specifically validating the idempotent consumption feature when using Kafka consumers. The primary goal of this test is to ensure that messages are consumed exactly once by the Camel route when Kafka's idempotent consumer pattern is applied, even when message headers are numeric types.
This test class:
Creates unique Kafka topics for testing idempotency and the idempotent repository.
Uses a Kafka-backed idempotent repository to track processed message keys.
Defines a Camel route that consumes from a Kafka topic with idempotency enabled.
Sends a batch of messages and verifies that only unique messages are consumed exactly once.
Employs JUnit 5 annotations for lifecycle management and conditional execution.
Class: KafkaConsumerIdempotentIT
Inheritance
Extends:
KafkaConsumerIdempotentTestSupport
This base test support class likely provides common setup and utility methods for Kafka consumer idempotency testing but is not included in this file.
Constants
Name | Type | Description |
|---|---|---|
`TOPIC` | String | Kafka topic name for producing test messages, unique per test run. |
`REPOSITORY_TOPIC` | String | Kafka topic used as the backing store for the idempotent repository. |
These topics are dynamically generated with a UUID suffix to avoid conflicts between test runs.
Fields
Name | Type | Description |
|---|---|---|
`size` | int | Number of test messages to send (default 200). |
`testIdempotent` | KafkaIdempotentRepository | Kafka-backed idempotent repository bound to the Camel registry with name `"kafkaIdempotentRepository"`. |
Lifecycle Methods
Method | Description |
|---|---|
`@BeforeAll createRepositoryTopic()` | Creates the Kafka topic used for the idempotent repository before any tests run. |
`@AfterAll removeRepositoryTopic()` | Deletes the repository topic after all tests finish. |
`@BeforeEach before()` | Sends `size` messages to the `TOPIC` before each test method. |
`@AfterEach after()` | Deletes the main test topic (`TOPIC`) after each test to clean up. |
Route Definition
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("kafka:" + TOPIC
+ "?groupId=KafkaConsumerIdempotentIT&autoOffsetReset=earliest"
+ "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+ "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
.routeId("foo")
.idempotentConsumer(numericHeader("id"))
.idempotentRepository("kafkaIdempotentRepository")
.to(KafkaTestUtil.MOCK_RESULT);
}
};
}
**Explanation:**
The route consumes messages from the Kafka topic
TOPIC.Consumer properties include:
Group ID:
"KafkaConsumerIdempotentIT".Earliest offset reset.
String deserialization for keys and values.
Auto commit enabled with 1-second interval.
Custom interceptor for mocking/testing.
The route applies an idempotent consumer pattern, using the numeric header
"id"as the idempotency key.The idempotent repository used is the Kafka-backed repository registered as
"kafkaIdempotentRepository".Processed messages are sent to a mock endpoint defined by
KafkaTestUtil.MOCK_RESULTfor assertions.
Test Method
@Test
@DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)")
void kafkaIdempotentMessageIsConsumedByCamel() {
MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
doRun(to, size);
}
Purpose: Validates that messages with numeric headers can be consumed idempotently.
doRun: Likely a helper method (defined in the superclass) that triggers route execution and asserts expectations on the mock endpoint.MockEndpoint: Used for verifying that the correct number of unique messages are received.
Important Implementation Details
Idempotent Consumer Pattern: This pattern ensures that duplicate messages are filtered out by checking a unique message identifier before processing. Here, the
"id"header (numeric type) is used as the unique key.KafkaIdempotentRepository: The idempotency store is backed by a Kafka topic (
REPOSITORY_TOPIC), which stores processed message keys to maintain state across consumer restarts and distributed instances.Dynamic Topics: Both the main topic and the repository topic are created with a unique UUID suffix at runtime, avoiding test interference and ensuring isolation.
Test Enablement: The entire test class is disabled by default if the system property
enable.kafka.consumer.idempotency.testsis set to"false". This allows selective execution of these potentially longer-running integration tests.Timeout: Each test is subject to a 60-second timeout to avoid hangs.
Interaction with Other System Components
Kafka Cluster: The test interacts with a running Kafka cluster, creating and deleting topics dynamically.
Camel Kafka Component: Uses Apache Camel's Kafka component to consume messages idempotently.
KafkaIdempotentRepository: A specialized repository component that stores idempotency keys in Kafka.
MockEndpoint: Part of Apache Camel test utilities used for asserting message consumption.
KafkaTestUtil: Provides utility methods and constants for Kafka testing, such as topic creation and mock endpoints.
Base Test Support (
KafkaConsumerIdempotentTestSupport): Provides common Kafka and Camel test infrastructure like sending messages (doSend) and running assertions (doRun).
Usage Example
This class itself is a test and not intended for direct production use. However, it serves as an example for:
Configuring a Camel route with Kafka consumer idempotency using a numeric header.
Setting up and using a Kafka-backed idempotent repository.
Writing integration tests for Kafka consumers with idempotency in Camel.
Mermaid Class Diagram
classDiagram
class KafkaConsumerIdempotentIT {
- static final String TOPIC
- static final String REPOSITORY_TOPIC
- final int size = 200
- final KafkaIdempotentRepository testIdempotent
+ static void createRepositoryTopic()
+ static void removeRepositoryTopic()
+ void before()
+ void after()
+ RouteBuilder createRouteBuilder()
+ void kafkaIdempotentMessageIsConsumedByCamel()
}
KafkaConsumerIdempotentIT --|> KafkaConsumerIdempotentTestSupport
KafkaConsumerIdempotentIT ..> KafkaIdempotentRepository : uses
KafkaConsumerIdempotentIT ..> RouteBuilder : defines route
KafkaConsumerIdempotentIT ..> MockEndpoint : verifies consumption
Summary
`KafkaConsumerIdempotentIT.java` is a focused integration test validating Apache Camel's Kafka consumer idempotency feature using Kafka as the repository backend. It ensures that messages with numeric headers are processed exactly once, demonstrating robust idempotent consumption in distributed Kafka environments. This test also provides a practical example of configuring Kafka consumer properties and idempotent repositories in Camel routes.