KafkaConsumerIdempotentWithProcessorIT.java
Overview
`KafkaConsumerIdempotentWithProcessorIT.java` is an integration test class within the Apache Camel Kafka component module. It verifies the correct functioning of an idempotent Kafka consumer route that processes messages with a custom processor before applying idempotency checks using a Kafka-backed idempotent repository.
The test ensures that duplicate messages are filtered out by the idempotent consumer based on a uniquely computed message header (`id`), which is transformed by a processor. It uses a Kafka topic for message consumption and a separate Kafka topic as the backing store for the idempotent repository. This setup validates the end-to-end behavior of Kafka message consumption with idempotency enforced by KafkaIdempotentRepository and message processing.
Class: KafkaConsumerIdempotentWithProcessorIT
Extends: `KafkaConsumerIdempotentTestSupport`
Purpose
This class sets up Kafka topics, produces test messages, and configures a Camel route that:
Consumes messages from a Kafka topic.
Processes the message header `id` to convert it from a byte array to the string form of a long.
Applies an idempotent consumer using the processed `id` header.
Uses a Kafka-backed idempotent repository to filter out duplicates.
Routes unique messages to a mock endpoint for verification.
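The header conversion step in the list above can be sketched in plain Java, without Camel on the classpath. The class and method names (`IdHeaderConverter`, `toIdString`) are hypothetical and only illustrate the `byte[]` to `BigInteger` to `long` to `String` chain; in the actual route this logic would sit inside a processor that reads and rewrites the `id` header.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the test class: shows only the conversion chain.
class IdHeaderConverter {

    // Interprets the bytes as a big-endian two's-complement integer,
    // narrows it to a long, and returns the decimal string form.
    static String toIdString(byte[] rawId) {
        BigInteger value = new BigInteger(rawId);
        return String.valueOf(value.longValue());
    }

    public static void main(String[] args) {
        // Encode 42 as eight big-endian bytes, then convert it back.
        byte[] raw = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println(toIdString(raw)); // prints 42
    }
}
```

Note that `BigInteger.longValue()` keeps only the low-order 64 bits, so this sketch assumes the header always fits in a `long`.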
Important Fields
| Field | Type | Description |
|---|---|---|
| `TOPIC` | `String` | The Kafka topic used for consuming test messages, uniquely generated per test run. |
| `REPOSITORY_TOPIC` | `String` | The Kafka topic used by `KafkaIdempotentRepository` to store consumed message IDs. |
| `size` | `int` | Number of test messages to produce and consume (default 200). |
| `kafkaIdempotentRepository` | `KafkaIdempotentRepository` | Instance of Kafka-backed idempotent repository bound to the registry for idempotency checks. |
Static Initialization Block
Generates unique topic names with UUID suffixes to isolate test runs.
`TOPIC` and `REPOSITORY_TOPIC` are initialized here.
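A minimal sketch of this pattern, assuming a simple prefix-plus-UUID scheme. The class name `TopicNames`, the helper `uniqueName`, and both prefixes are hypothetical; the actual test class may use different prefixes and separators.

```java
import java.util.UUID;

// Hypothetical illustration of per-run topic naming.
class TopicNames {

    static final String TOPIC;
    static final String REPOSITORY_TOPIC;

    static {
        // Each constant gets its own random suffix, so repeated or
        // concurrent test runs never share a topic.
        TOPIC = uniqueName("example-data");
        REPOSITORY_TOPIC = uniqueName("example-idempotent-repo");
    }

    // Appends a random UUID to the given prefix.
    static String uniqueName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(TOPIC);
        System.out.println(REPOSITORY_TOPIC);
    }
}
```

UUID strings contain only hexadecimal digits and hyphens, which are legal characters in Kafka topic names.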
Lifecycle Methods
| Method | Annotation | Purpose |
|---|---|---|
| `createRepositoryTopic` | `@BeforeAll` | Creates the Kafka topic for the idempotent repository before any tests run. |
| `removeRepositoryTopic` | `@AfterAll` | Deletes the repository topic after all tests have completed. |
| `before` | `@BeforeEach` | Produces `size` messages to the test topic before each test execution. |
| `after` | `@AfterEach` | Deletes the test topic after each test to clean the environment. |
Registry Binding
The `KafkaIdempotentRepository` instance is bound to the Camel registry with the name `"kafkaIdempotentRepository"`.
It is configured with the `REPOSITORY_TOPIC` and Kafka bootstrap servers.
Route Configuration - createRouteBuilder()
Defines a single Camel route with the following characteristics:
Source: Kafka consumer endpoint listening on `TOPIC` with group ID `KafkaConsumerIdempotentWithProcessorIT`.
Deserializers: Uses `StringDeserializer` for both key and value.
Offsets: Auto offset reset to earliest, auto commit enabled with interval 1000 ms.
Interceptor: Uses `MockConsumerInterceptor` for testing.
Processor: Transforms the message header `"id"` from a byte array to its long integer string representation.
Idempotent Consumer: Uses the transformed `"id"` header as the idempotency key.
Idempotent Repository: Uses the Kafka-backed repository injected from the registry.
Sink: Routes unique messages to a mock endpoint defined in `KafkaTestUtil.MOCK_RESULT`.
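The consumer settings listed above would typically be passed as query options on the Kafka endpoint URI. The following is a sketch of how such a URI might be assembled, not a copy of the test's code. The option names (`groupId`, `autoOffsetReset`, `keyDeserializer`, `valueDeserializer`, `autoCommitEnable`, `autoCommitIntervalMs`, `interceptorClasses`) are the ones the Camel Kafka component is understood to accept and should be checked against the documentation for the Camel version in use. The class `KafkaEndpointUri` and its method are hypothetical, and the interceptor class name is taken as a parameter because its package is not stated here.

```java
// Hypothetical helper that assembles a Camel Kafka consumer URI as a plain string.
class KafkaEndpointUri {

    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds the URI from the settings described in the route configuration.
    static String consumerUri(String topic, String groupId, String interceptorClass) {
        return "kafka:" + topic
                + "?groupId=" + groupId
                + "&autoOffsetReset=earliest"                // start from the oldest record
                + "&keyDeserializer=" + STRING_DESERIALIZER
                + "&valueDeserializer=" + STRING_DESERIALIZER
                + "&autoCommitEnable=true"
                + "&autoCommitIntervalMs=1000"               // commit offsets every second
                + "&interceptorClasses=" + interceptorClass;
    }

    public static void main(String[] args) {
        // "com.example.MyInterceptor" is a placeholder class name.
        System.out.println(consumerUri(
                "my-topic", "KafkaConsumerIdempotentWithProcessorIT", "com.example.MyInterceptor"));
    }
}
```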
**Processor Detail:**
The processor extracts the `"id"` header as a byte array.
Converts it to a `BigInteger`.
Converts to a `long` value and then to a `String`.
Sets the transformed `"id"` header back on the message.
Test Method - kafkaMessageIsConsumedByCamel()
Retrieves the mock endpoint to assert results.
Invokes `doRun(to, size)` from the superclass to run the test scenario.
Verifies that the Camel route correctly consumes and processes all messages with idempotency applied.
Usage Example
This class functions as an integration test and is executed via a JUnit 5 test runner. To run the test:
mvn test -Dtest=KafkaConsumerIdempotentWithProcessorIT
The test produces messages to a Kafka topic, consumes them with the configured route, applies idempotency, and validates output via a mock endpoint.
Implementation Details & Algorithms
Idempotency: Implemented via `KafkaIdempotentRepository`, which stores keys (here, message IDs) in a Kafka topic to track consumed messages.
Processor Transformation: Converts the message header `id` from a byte array to a string representation of a long integer to provide a consistent idempotency key.
Kafka Topics: Separate topics are used for message flow and idempotent key storage to isolate responsibilities and facilitate testing.
MockConsumerInterceptor: Used for testing to intercept Kafka consumption behavior.
This setup ensures that messages with duplicate IDs are filtered out, preventing reprocessing. That matters wherever Kafka's at-least-once delivery must result in each message being processed effectively once.
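The filtering behavior itself can be illustrated with a small in-memory stand-in. This is deliberately not `KafkaIdempotentRepository`, which persists its keys to a Kafka topic; the class `InMemoryIdempotentFilter` and its methods are hypothetical and only show the "first time seen, pass; seen before, drop" rule that an idempotent consumer applies to each key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for an idempotent repository.
class InMemoryIdempotentFilter {

    private final Set<String> seenKeys = new HashSet<>();

    // Returns true only the first time a key is offered.
    boolean markIfNew(String key) {
        return seenKeys.add(key);
    }

    // Keeps the first occurrence of each id, preserving arrival order.
    List<String> filterDuplicates(List<String> ids) {
        List<String> unique = new ArrayList<>();
        for (String id : ids) {
            if (markIfNew(id)) {
                unique.add(id);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        InMemoryIdempotentFilter filter = new InMemoryIdempotentFilter();
        System.out.println(filter.filterDuplicates(List.of("1", "2", "1", "3", "2"))); // prints [1, 2, 3]
    }
}
```

An in-memory set loses its state on restart, which is the gap a topic-backed repository is meant to close.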
Interaction with Other Components
Kafka Broker: Provides messaging infrastructure for both data topic and idempotent repository topic.
KafkaIdempotentRepository: An idempotent repository implementation provided by the Camel Kafka component that uses a Kafka topic to maintain its idempotency store.
Camel Mock Endpoint (MockEndpoint): Used for asserting the final message count and content after idempotent filtering.
KafkaTestUtil: Provides utilities such as topic creation, constants, and mock endpoints for testing.
Superclass
`KafkaConsumerIdempotentTestSupport`: Provides utility methods like `doSend` and `doRun` for sending and verifying messages, as well as Kafka client instances.
Class Diagram
classDiagram
class KafkaConsumerIdempotentWithProcessorIT {
- static String TOPIC
- static String REPOSITORY_TOPIC
- int size = 200
- KafkaIdempotentRepository kafkaIdempotentRepository
+ static void createRepositoryTopic()
+ static void removeRepositoryTopic()
+ void before()
+ void after()
+ RouteBuilder createRouteBuilder()
+ void kafkaMessageIsConsumedByCamel()
}
KafkaConsumerIdempotentWithProcessorIT --|> KafkaConsumerIdempotentTestSupport
KafkaConsumerIdempotentWithProcessorIT o-- KafkaIdempotentRepository : uses
KafkaConsumerIdempotentWithProcessorIT ..> RouteBuilder : returns
Summary
`KafkaConsumerIdempotentWithProcessorIT` is a focused integration test that validates a Kafka consumer route in Apache Camel with idempotent message consumption. It demonstrates how to:
Use a processor to transform message headers for idempotency keys.
Leverage a Kafka-backed idempotent repository to track consumed messages.
Set up isolated Kafka topics for test data and idempotent state.
Use Camel testing facilities to verify end-to-end message flow correctness.
This test verifies that duplicate messages are filtered in a Kafka-Camel integration, which is the basis for effectively-once processing in robust stream processing applications.