KafkaConsumerIdempotentGroupIdIT.java
Overview
`KafkaConsumerIdempotentGroupIdIT.java` is an integration test class designed to verify the idempotent consumption of messages from a Kafka topic using Apache Camel's Kafka component. Specifically, it tests that messages with numeric headers can be consumed exactly once when idempotency is enabled via a Kafka-backed idempotent repository.
This test class:
Creates unique Kafka topics for messages and idempotent repository storage dynamically.
Sends a batch of messages to a Kafka topic.
Defines a Camel route that consumes messages from Kafka with idempotency enabled based on a numeric header.
Verifies through a mock endpoint that only unique messages are processed.
Cleans up Kafka topics after tests.
The test ensures that the Kafka consumer's idempotent feature works correctly when using group IDs, thus preventing duplicate message processing in distributed consumption scenarios.
Classes and Methods
Class: KafkaConsumerIdempotentGroupIdIT
This is the main test class that extends `KafkaConsumerIdempotentTestSupport` (not shown here, but presumably provides common Kafka testing utilities).
Fields
| Field Name | Type | Description |
|---|---|---|
| `TOPIC` | `String` (static final) | Unique Kafka topic name for sending test messages, generated with a UUID prefix. |
| `REPOSITORY_TOPIC` | `String` (static final) | Unique Kafka topic name used by `KafkaIdempotentRepository` to store processed message keys. |
| `size` | `int` | Number of messages to produce and test (set to 200). |
| `testIdempotent` | `KafkaIdempotentRepository` | Idempotent repository bound to the Camel registry for deduplication persistence. |
Static Initializer
Generates unique topic names for `TOPIC` and `REPOSITORY_TOPIC` using random UUIDs to avoid conflicts in parallel test runs.
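The naming scheme can be illustrated with a minimal sketch that uses only the JDK. The class name, the method name, the example prefixes, and the placement of the UUID after the prefix are all assumptions made for illustration; the test class may compose its names differently.

```java
import java.util.UUID;

// Hypothetical helper: class, method, and prefixes are illustrative only.
final class UniqueTopicNames {

    // Combines a readable prefix with a random UUID so that concurrent or
    // repeated runs do not end up sharing a topic.
    static String uniqueName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String topic = uniqueName("idempotent-it");
        String repositoryTopic = uniqueName("idempotent-it-repo");
        System.out.println(topic);
        System.out.println(repositoryTopic);
    }
}
```

A UUID string contains only hexadecimal digits and hyphens, so the resulting names stay within the characters Kafka accepts in topic names.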
Static Methods
`createRepositoryTopic()`:
Runs once before all tests. Creates the Kafka topic used for the idempotent repository, with a single partition.
`removeRepositoryTopic()`:
Runs once after all tests. Deletes the idempotent repository Kafka topic to clean up resources.
Lifecycle Methods
`before()`:
Runs before each test. Produces `size` (200) messages to the `TOPIC`. This ensures messages are available for consumption in each test.
`after()`:
Runs after each test. Deletes the `TOPIC` used for message testing to maintain a clean Kafka environment.
Registry Binding
The `KafkaIdempotentRepository` instance `testIdempotent` is bound to the Camel registry with the name `"kafkaIdempotentRepository"`. This repository stores consumed message IDs in the Kafka topic `REPOSITORY_TOPIC` to achieve idempotency.
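To make the idea of a topic-backed repository more concrete, here is a simplified in-memory model. It is not `KafkaIdempotentRepository` and does not talk to Kafka: an ordinary list stands in for the repository topic, and every name in the sketch is hypothetical. It only illustrates why persisting keys to a shared log lets a restarted instance recognise messages it has already seen.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for a log-backed idempotent repository.
// All names are hypothetical; no Kafka client is involved.
final class LogBackedRepositorySketch {

    // Plays the role of the repository topic: an append-only list of keys.
    private final List<String> log;
    // Local view used for lookups, rebuilt by replaying the log.
    private final Set<String> cache = new HashSet<>();

    LogBackedRepositorySketch(List<String> sharedLog) {
        this.log = sharedLog;
        this.cache.addAll(sharedLog); // replay existing entries on start-up
    }

    // Returns true only the first time a key is offered.
    boolean add(String key) {
        if (!cache.add(key)) {
            return false; // already known: duplicate
        }
        log.add(key);     // persist the new key to the shared log
        return true;
    }

    boolean contains(String key) {
        return cache.contains(key);
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        LogBackedRepositorySketch first = new LogBackedRepositorySketch(topic);
        System.out.println(first.add("42")); // first sighting of the key
        System.out.println(first.add("42")); // same key offered again

        // A new instance recovers its state from the same log.
        LogBackedRepositorySketch restarted = new LogBackedRepositorySketch(topic);
        System.out.println(restarted.contains("42"));
    }
}
```

The `add` and `contains` methods loosely follow the shape of Camel's `IdempotentRepository` contract, where `add` reports whether the key was new.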
Method: createRouteBuilder()
Returns a Camel `RouteBuilder` that defines the route under test:
Consumes from the Kafka topic `TOPIC` with the following key configuration:
Consumer group ID: `"KafkaConsumerIdempotentIT"`.
Auto offset reset to earliest to consume all messages.
String deserializers for key and value.
Auto commit enabled with interval and poll timeout configured.
Custom interceptor class for testing: `MockConsumerInterceptor`.
Uses `.idempotentConsumer()` with:
The numeric header `"id"` as the deduplication key.
The idempotent repository bean `"kafkaIdempotentRepository"`.
Routes consumed unique messages to a mock endpoint `KafkaTestUtil.MOCK_RESULT` for assertion.
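The consumer options listed above could be assembled into an endpoint URI roughly as follows. This is a sketch under stated assumptions: the option names reflect the camel-kafka component as I understand it and should be checked against the component documentation for your Camel version, while the interval and timeout values and the interceptor's package name are placeholders not given in this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds a Camel Kafka consumer URI as a plain string.
final class KafkaUriSketch {

    static String consumerUri(String topic) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", "KafkaConsumerIdempotentIT");
        options.put("autoOffsetReset", "earliest");
        options.put("keyDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("valueDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("autoCommitEnable", "true");
        options.put("autoCommitIntervalMs", "1000"); // assumed value
        options.put("pollTimeoutMs", "1000");        // assumed value
        // The package of MockConsumerInterceptor is an assumption.
        options.put("interceptorClasses",
                "org.apache.camel.component.kafka.MockConsumerInterceptor");

        StringJoiner query = new StringJoiner("&");
        options.forEach((name, value) -> query.add(name + "=" + value));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(consumerUri("example-topic"));
    }
}
```

In a route, a string like this would be passed to `from(...)`; it is built here as plain text so the sketch runs without Camel on the classpath.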
This route tests that messages with the same `"id"` header are consumed only once.
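The deduplication behaviour this route relies on can be modelled without Camel or Kafka. The sketch below is a plain-Java illustration of the idempotent consumer pattern, not Camel's implementation: a message is reduced to its headers, the numeric `"id"` header is turned into a string key, and an in-memory set plays the repository. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the idempotent consumer pattern; names are hypothetical.
final class IdempotentFilterSketch {

    // A "message" here is just a header map carrying a numeric "id".
    static Map<String, Object> message(int id) {
        return Map.of("id", id);
    }

    // Passes each distinct "id" downstream once and drops later repeats.
    static List<Map<String, Object>> filterDuplicates(List<Map<String, Object>> messages) {
        Set<String> seen = new HashSet<>();
        List<Map<String, Object>> delivered = new ArrayList<>();
        for (Map<String, Object> headers : messages) {
            // The numeric header value becomes a string key for the lookup.
            String key = String.valueOf(headers.get("id"));
            if (seen.add(key)) {
                delivered.add(headers); // first occurrence: deliver
            }
            // Repeated keys fall through and are not delivered.
        }
        return delivered;
    }

    public static void main(String[] args) {
        int size = 200;
        List<Map<String, Object>> input = new ArrayList<>();
        // Every id is sent twice, so half of the input is duplicate.
        for (int round = 0; round < 2; round++) {
            for (int id = 0; id < size; id++) {
                input.add(message(id));
            }
        }
        System.out.println(filterDuplicates(input).size()); // prints 200
    }
}
```

Order is preserved: the first message carrying a given `"id"` is the one delivered.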
Test Method: kafkaIdempotentMessageIsConsumedByCamel()
Annotated with `@Test` and `@DisplayName`.
Retrieves the mock endpoint for assertion.
Calls `doRun(to, size)` to execute the consumption and verify that exactly `size` unique messages are consumed.
Verifies that the idempotent consumer logic successfully filters duplicates based on the numeric header.
Important Implementation Details
Idempotent Consumption:
The test leverages `idempotentConsumer` in Camel, which uses the Kafka-backed `KafkaIdempotentRepository` to store message IDs. This repository uses a dedicated Kafka topic (`REPOSITORY_TOPIC`) to persist processed message keys, enabling distributed and fault-tolerant deduplication.
Unique Topics per Test Run:
Topics are named with UUIDs to avoid collisions and allow parallel or repeated test executions without interference.
Use of Numeric Headers for Idempotency:
The idempotency key is derived from a numeric message header `"id"`, demonstrating that Camel supports header-based deduplication beyond just message keys.
Kafka Consumer Configuration:
The consumer is configured with auto-commit enabled and custom interceptors to simulate realistic Kafka client behavior during testing.
Conditional Test Execution:
The entire test class is disabled if the system property `enable.kafka.consumer.idempotency.tests` is set to `"false"`. This allows conditional enabling in CI or local environments.
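The conditional execution rule amounts to a simple property check, shown here as a stand-alone sketch. The class and method names are hypothetical, and how the test class actually declares the condition is not shown in this document; JUnit 5's `@DisabledIfSystemProperty` annotation is one way to express the same rule declaratively.

```java
// Hypothetical stand-alone version of the guard; not code from the test class.
final class IdempotencyTestGuard {

    static final String PROPERTY = "enable.kafka.consumer.idempotency.tests";

    // Tests run unless the property is explicitly set to "false".
    // An unset property therefore leaves the tests enabled.
    static boolean testsEnabled() {
        return !"false".equals(System.getProperty(PROPERTY));
    }

    public static void main(String[] args) {
        System.out.println("idempotency tests enabled: " + testsEnabled());
    }
}
```

Running the sketch with `-Denable.kafka.consumer.idempotency.tests=false` on the `java` command line flips the result to disabled.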
Interactions with Other Components
`KafkaIdempotentRepository`:
Acts as the persistent store for consumed message IDs. This repository interacts directly with Kafka, writing to and reading from the `REPOSITORY_TOPIC`.
`KafkaTestUtil`:
Provides utility methods and constants such as topic creation, deletion, and mock endpoint names used in the test.
`MockEndpoint`:
A Camel testing component that collects messages routed to it, enabling assertions on message count and contents.
`KafkaConsumerIdempotentTestSupport` (base class):
Presumably provides shared Kafka setup, tear-down, and helper methods like `doSend()` and `doRun()` used in sending and asserting message processing.
Usage Example
This test runs automatically as part of the integration test suite if the system property `enable.kafka.consumer.idempotency.tests` is not set to `false`. It verifies that messages sent to a Kafka topic are consumed once and only once by a Camel route with idempotent consumer enabled.
To manually run the test:
mvn test -Dtest=KafkaConsumerIdempotentGroupIdIT
To disable the test:
mvn test -Denable.kafka.consumer.idempotency.tests=false
Mermaid Class Diagram
classDiagram
class KafkaConsumerIdempotentGroupIdIT {
- static final String TOPIC
- static final String REPOSITORY_TOPIC
- final int size = 200
- KafkaIdempotentRepository testIdempotent
+ static void createRepositoryTopic()
+ static void removeRepositoryTopic()
+ void before()
+ void after()
+ RouteBuilder createRouteBuilder()
+ void kafkaIdempotentMessageIsConsumedByCamel()
}
KafkaConsumerIdempotentGroupIdIT --|> KafkaConsumerIdempotentTestSupport
KafkaConsumerIdempotentGroupIdIT o-- KafkaIdempotentRepository : uses
KafkaConsumerIdempotentGroupIdIT --> RouteBuilder : creates
Summary
`KafkaConsumerIdempotentGroupIdIT.java` is a focused integration test that validates the idempotent consumer feature of the Apache Camel Kafka component using a Kafka-backed repository. It ensures that messages with numeric IDs in headers are processed only once per consumer group, preventing duplicates in distributed Kafka consumption scenarios. The test dynamically manages Kafka topics, configures a Camel route with an idempotent consumer, and uses a mock endpoint for verification, making it a useful example of idempotency integration in event-driven applications.