KafkaConsumerIdempotentTestSupport.java


Overview

`KafkaConsumerIdempotentTestSupport.java` is an abstract Java test support class for integration tests of idempotent Kafka consumers in the Apache Camel Kafka component. It extends a base test support class (`BaseKafkaTestSupport`) and provides helper methods that produce Kafka messages and verify their consumption, so that tests can check that each message is processed once even when it is retried or delivered more than once.

Automated tests use this class to send a batch of messages to a Kafka topic and then assert that the consuming endpoint (typically a Camel `MockEndpoint`) receives the expected number of messages with the expected headers.
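To make the behaviour under test concrete, here is a minimal, dependency-free sketch of idempotent consumption: a message is processed only the first time its `"id"` is seen. `IdempotentFilterSketch` and `accept` are hypothetical names, and the in-memory set is an assumption made for illustration. It is not how the Camel Kafka component or this test class is implemented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of idempotent consumption keyed on a message id.
class IdempotentFilterSketch {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    // Processes the message only if its id has not been seen before.
    boolean accept(String id, String body) {
        if (!seenIds.add(id)) {
            return false; // duplicate delivery: skip it
        }
        processed.add(body);
        return true;
    }

    // Bodies of the messages that were actually processed, in arrival order.
    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        IdempotentFilterSketch filter = new IdempotentFilterSketch();
        filter.accept("0", "message-0");
        filter.accept("1", "message-1");
        filter.accept("0", "message-0"); // redelivery of id 0 is ignored
        System.out.println(filter.processed());
    }
}
```

A test built on this support class would check the same property from the outside: the number of messages reaching the endpoint matches the number of distinct ids sent.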


Classes and Methods

Class: KafkaConsumerIdempotentTestSupport

- **Type:** Abstract class
- **Extends:** `BaseKafkaTestSupport` (not provided here; assumed to provide the Kafka test environment setup)
- **Package:** `org.apache.camel.component.kafka.integration`

This class provides utility methods to send messages to Kafka and verify their consumption in testing scenarios focused on idempotency.


Method: doSend(int size, String topic)

protected void doSend(int size, String topic)

**Description:** Sends a batch of Kafka messages to a specified topic. Each message is keyed and carries a header named `"id"` with a unique identifier matching the message index. This method uses the Kafka producer API directly to send messages synchronously within a try-with-resources block, ensuring resource cleanup.

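**Parameters:**

- `size`: the number of messages to send.
- `topic`: the name of the Kafka topic the messages are sent to.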

**Implementation Details:**

**Usage Example:**

doSend(10, "test-topic");

This sends 10 messages with keys "0" through "9", each with a corresponding `"id"` header, to the topic `"test-topic"`.
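The behaviour described for `doSend` can be modelled without a broker. The sketch below is a dependency-free illustration, not the class's actual code: `SketchRecord`, `RecordSink`, `InMemorySink`, and `sendBatch` are hypothetical names standing in for the Kafka producer and its records, and the payload text and the UTF-8 encoding of the `"id"` header value are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka record: topic, key, payload, and headers.
final class SketchRecord {
    final String topic;
    final String key;
    final String value;
    final Map<String, byte[]> headers;

    SketchRecord(String topic, String key, String value, Map<String, byte[]> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical stand-in for a producer; AutoCloseable so try-with-resources applies.
interface RecordSink extends AutoCloseable {
    void send(SketchRecord record);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

// In-memory sink that remembers what was sent and whether it was closed.
class InMemorySink implements RecordSink {
    final List<SketchRecord> sent = new ArrayList<>();
    boolean closed;

    @Override
    public void send(SketchRecord record) {
        if (closed) {
            throw new IllegalStateException("sink already closed");
        }
        sent.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class DoSendSketch {
    // Sends `size` records keyed "0".."size-1", each with an "id" header for its index.
    static void sendBatch(RecordSink sink, int size, String topic) {
        try (RecordSink s = sink) { // the sink is closed even if send() throws
            for (int i = 0; i < size; i++) {
                String id = String.valueOf(i);
                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); // assumed encoding
                s.send(new SketchRecord(topic, id, "message-" + i, Map.of("id", idBytes)));
            }
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        sendBatch(sink, 10, "test-topic");
        System.out.println(sink.sent.size() + " records sent, closed=" + sink.closed);
    }
}
```

Passing the sink in as a parameter keeps the loop testable on its own; in a real test the producer would be created from the test environment's connection properties instead.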


Method: doRun(MockEndpoint mockEndpoint, int size)

protected void doRun(MockEndpoint mockEndpoint, int size)

**Description:** Waits up to 5 minutes for the given Camel `MockEndpoint` to receive the specified number of messages, and fails if it does not. It then checks that the first received message carries the `"id"` header, which the idempotency tests depend on.

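**Parameters:**

- `mockEndpoint`: the Camel `MockEndpoint` expected to receive the messages.
- `size`: the number of messages the endpoint is expected to receive.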

**Returns:** Nothing (`void`). Throws an assertion error if the expected number of messages does not arrive within the timeout or if the `"id"` header is missing.

**Implementation Details:**

**Usage Example:**

doRun(mockEndpoint, 10);

This waits for the `mockEndpoint` to receive 10 messages, then verifies the `"id"` header on the first message.
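The wait-then-verify pattern described for `doRun` can be sketched with the standard library alone. This is an illustration under assumptions, not the class's code: `awaitCount` and `verifyFirstHasId` are hypothetical helpers, a simple polling loop stands in for whatever waiting mechanism the real method uses, and received messages are modelled as plain header maps rather than Camel exchanges.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

class DoRunSketch {
    // Polls until `received` holds exactly `expected` entries or the timeout elapses.
    static void awaitCount(List<?> received, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (received.size() != expected) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(
                        "expected " + expected + " messages but got " + received.size());
            }
            try {
                Thread.sleep(10); // poll interval, chosen arbitrarily for this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }

    // Checks that the first received message carries the "id" header.
    static void verifyFirstHasId(List<Map<String, Object>> received) {
        if (received.isEmpty() || !received.get(0).containsKey("id")) {
            throw new AssertionError("first message must carry the \"id\" header");
        }
    }

    public static void main(String[] args) throws Exception {
        List<Map<String, Object>> received = new CopyOnWriteArrayList<>();
        // Simulate a consumer delivering three messages from another thread.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                received.add(Map.<String, Object>of("id", String.valueOf(i)));
            }
        });
        consumer.start();
        awaitCount(received, 3, Duration.ofSeconds(5));
        verifyFirstHasId(received);
        consumer.join();
        System.out.println("received " + received.size() + " messages");
    }
}
```

The generous timeout in the real method matters because delivery is asynchronous: the assertion has to tolerate slow broker start-up while still failing if the expected count never arrives.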


Important Implementation Details and Algorithms


Interaction with Other Parts of the System

This class serves as a shared base for Kafka consumer idempotency tests. It pairs message production (`doSend`) with consumer verification (`doRun`), so integration tests of the Camel Kafka component can reuse both steps.


Visual Diagram

classDiagram
    class KafkaConsumerIdempotentTestSupport {
        <<abstract>>
        #void doSend(int size, String topic)
        #void doRun(MockEndpoint mockEndpoint, int size)
    }
    
    KafkaConsumerIdempotentTestSupport --|> BaseKafkaTestSupport
    KafkaConsumerIdempotentTestSupport ..> org.apache.camel.component.mock.MockEndpoint : uses
    KafkaConsumerIdempotentTestSupport ..> org.apache.kafka.clients.producer.KafkaProducer : uses
    KafkaConsumerIdempotentTestSupport ..> org.awaitility.Awaitility : uses

Summary

`KafkaConsumerIdempotentTestSupport.java` is an abstract test support class that simplifies writing integration tests for Kafka consumers that must process messages idempotently. It provides methods to send uniquely identified messages to Kafka and to assert their consumption in Camel routes, checking for the `"id"` header used for idempotency. It builds on the Kafka and Camel test utilities and uses Awaitility for asynchronous assertions.