KafkaConsumerIdempotentWithProcessorIT.java

Overview

`KafkaConsumerIdempotentWithProcessorIT.java` is an integration test class within the Apache Camel Kafka component module. It verifies the correct functioning of an idempotent Kafka consumer route that processes messages with a custom processor before applying idempotency checks using a Kafka-backed idempotent repository.

The test ensures that duplicate messages are filtered out by the idempotent consumer based on a uniquely computed message header (`id`), which is transformed by a processor. It uses a Kafka topic for message consumption and a separate Kafka topic as the backing store for the idempotent repository. This setup validates the end-to-end behavior of Kafka message consumption with idempotency enforced by KafkaIdempotentRepository and message processing.
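The process-then-filter flow described above can be sketched in plain Java. This is a simplified stand-in, not Camel's API: the class name, the `normalizeId` helper, and the assumption that the `id` header arrives as the bytes of an integer are all illustrative, and a `HashSet` takes the place of `KafkaIdempotentRepository`.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java stand-in for "process, then filter by id"; not Camel's API.
class ProcessThenDedupeSketch {

    // Hypothetical processor step: turn a raw byte[] header into a String key.
    // Assumes the bytes are a two's-complement, big-endian integer.
    static String normalizeId(byte[] rawId) {
        return new BigInteger(rawId).toString();
    }

    // Keeps the first message seen for each normalized id and drops later ones.
    static List<String> consume(List<byte[]> rawIds) {
        Set<String> seen = new HashSet<>(); // stands in for the idempotent repository
        List<String> delivered = new ArrayList<>();
        for (byte[] raw : rawIds) {
            String id = normalizeId(raw);
            if (seen.add(id)) { // Set.add returns false for a duplicate
                delivered.add(id);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<byte[]> incoming = List.of(
                BigInteger.valueOf(1).toByteArray(),
                BigInteger.valueOf(2).toByteArray(),
                BigInteger.valueOf(1).toByteArray()); // duplicate of the first
        System.out.println(consume(incoming)); // prints [1, 2]
    }
}
```

Normalizing the header before the duplicate check matters because the check compares keys by value: two byte arrays with the same content are not equal as Java objects, while their string forms are.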


Class: KafkaConsumerIdempotentWithProcessorIT

Extends: `KafkaConsumerIdempotentTestSupport`

Purpose

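This class sets up Kafka topics, produces test messages, and configures a Camel route that consumes from the test topic, transforms the `id` header in a processor, filters duplicates through the Kafka-backed idempotent repository, and sends the remaining messages to a mock endpoint.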

Important Fields

| Field | Type | Description |
|---|---|---|
| `TOPIC` | `String` | The Kafka topic used for consuming test messages, uniquely generated per test run. |
| `REPOSITORY_TOPIC` | `String` | The Kafka topic used by `KafkaIdempotentRepository` to store consumed message IDs. |
| `size` | `int` | Number of test messages to produce and consume (default 200). |
| `kafkaIdempotentRepository` | `KafkaIdempotentRepository` | Instance of the Kafka-backed idempotent repository, bound to the registry for idempotency checks. |
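Generating topic names per run keeps repeated or parallel test runs from reading each other's records. A minimal sketch of one way to do this with a random UUID suffix follows; the prefixes and the helper name are hypothetical, since the naming scheme is not stated here.

```java
import java.util.UUID;

// Sketch of per-run topic naming; prefixes are illustrative assumptions.
class UniqueTopicNamesSketch {

    // Appends a random UUID so each call yields a distinct name.
    // UUID strings contain only hex digits and '-', which Kafka topic names allow.
    static String uniqueName(String prefix) {
        return prefix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String topic = uniqueName("test_topic_");
        String repositoryTopic = uniqueName("test_repository_");
        System.out.println(topic);
        System.out.println(repositoryTopic);
    }
}
```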


Static Initialization Block


Lifecycle Methods

| Method | Annotation | Purpose |
|---|---|---|
| `createRepositoryTopic` | `@BeforeAll` | Creates the Kafka topic for the idempotent repository before any tests run. |
| `removeRepositoryTopic` | `@AfterAll` | Deletes the repository topic after all tests have completed. |
| `before` | `@BeforeEach` | Produces `size` messages to the test topic before each test execution. |
| `after` | `@AfterEach` | Deletes the test topic after each test to clean the environment. |


Registry Binding


Route Configuration - createRouteBuilder()

Defines a single Camel route with the following characteristics:

**Processor Detail:**


Test Method - kafkaMessageIsConsumedByCamel()


Usage Example

This class functions as an integration test and is executed via a JUnit 5 test runner. To run the test:

mvn test -Dtest=KafkaConsumerIdempotentWithProcessorIT

The test produces messages to a Kafka topic, consumes them with the configured route, applies idempotency, and validates output via a mock endpoint.


Implementation Details & Algorithms

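This setup ensures that messages with duplicate IDs are filtered out, preventing reprocessing. That matters wherever at-least-once delivery can hand the same message to a consumer more than once; the idempotent consumer makes processing effectively once per ID rather than providing a transactional exactly-once guarantee.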
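One way to picture a topic-backed repository is as an append-only log that is replayed to rebuild the set of seen IDs. The sketch below is a deliberately simplified assumption, not the implementation of `KafkaIdempotentRepository`: an in-memory list stands in for the repository topic, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of a log-backed idempotent repository (illustrative only).
class LogBackedRepositorySketch {

    private final List<String> log;                     // stands in for the repository topic
    private final Set<String> cache = new HashSet<>();  // local view of the seen IDs

    // Replays every record already in the log to rebuild the local cache.
    LogBackedRepositorySketch(List<String> log) {
        this.log = log;
        cache.addAll(log);
    }

    // Returns true and records the key if it is new; returns false for a duplicate.
    boolean add(String key) {
        if (!cache.add(key)) {
            return false;
        }
        log.add(key);
        return true;
    }

    boolean contains(String key) {
        return cache.contains(key);
    }

    public static void main(String[] args) {
        List<String> sharedLog = new ArrayList<>();

        LogBackedRepositorySketch first = new LogBackedRepositorySketch(sharedLog);
        System.out.println(first.add("1")); // prints true
        System.out.println(first.add("1")); // prints false

        // A "restarted" instance over the same log still recognizes the ID.
        LogBackedRepositorySketch restarted = new LogBackedRepositorySketch(sharedLog);
        System.out.println(restarted.add("1")); // prints false
    }
}
```

Keeping the seen IDs in a durable log rather than only in memory is what lets duplicate detection survive a consumer restart.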


Interaction with Other Components


Class Diagram

classDiagram
    class KafkaConsumerIdempotentWithProcessorIT {
        - static String TOPIC
        - static String REPOSITORY_TOPIC
        - int size = 200
        - KafkaIdempotentRepository kafkaIdempotentRepository
        + static void createRepositoryTopic()
        + static void removeRepositoryTopic()
        + void before()
        + void after()
        + RouteBuilder createRouteBuilder()
        + void kafkaMessageIsConsumedByCamel()
    }
    KafkaConsumerIdempotentWithProcessorIT --|> KafkaConsumerIdempotentTestSupport
    KafkaConsumerIdempotentWithProcessorIT o-- KafkaIdempotentRepository : uses
    KafkaConsumerIdempotentWithProcessorIT ..> RouteBuilder : returns

Summary

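`KafkaConsumerIdempotentWithProcessorIT` is a focused integration test that validates a Kafka consumer route in Apache Camel with idempotent message consumption. It demonstrates how to back an idempotent consumer with a `KafkaIdempotentRepository` stored in its own topic, transform the `id` header in a processor before the idempotency check, and verify the filtered output through a mock endpoint.

This test guards against duplicate processing in Kafka-Camel integration scenarios, which is important for robust stream processing applications.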