KafkaConsumerFullIT.java

Overview

`KafkaConsumerFullIT.java` is an integration test class for the Apache Camel Kafka consumer component. It runs against a real Kafka environment: each test produces messages to a Kafka topic and asserts that they are consumed, processed, and routed correctly by Camel routes that use Kafka consumers.

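Judging by the test methods listed below, the class covers consumption after seeking to the beginning or end of the topic, preservation of record-specific headers, overriding the header deserializer, and consumption after a route suspend/resume cycle.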

It uses JUnit 5 for test structuring and assertions, Apache Camel's testing framework for route and mock endpoint management, and Kafka's Java client APIs for producing messages and managing topics.


Package and Imports


Class: KafkaConsumerFullIT

Description

`KafkaConsumerFullIT` extends [BaseKafkaTestSupport](/projects/289/68683), presumably a base class that provides common Kafka test utilities. It implements a suite of integration tests that verify Kafka consumer behavior inside Apache Camel routes.

Constants

| Name | Description |
| --- | --- |
| `TOPIC` | Unique Kafka topic name used for all tests, generated with a UUID to avoid clashes. |
| `ROUTE` | Unique Camel route ID, also generated with a UUID. |
| `FROM_URI` | Kafka endpoint URI configured with group ID, deserializers, auto-commit settings, polling timeouts, and an interceptor class for capturing records. |
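The constants above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the class's actual source: the name prefixes, the group ID, the option values, and the interceptor class `com.example.RecordCapturingInterceptor` are hypothetical, and the option names (`groupId`, `keyDeserializer`, `valueDeserializer`, `autoCommitEnable`, `autoCommitIntervalMs`, `pollTimeoutMs`, `interceptorClasses`) should be checked against the Kafka component documentation for the Camel version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

final class KafkaUriSketch {
    // A fresh UUID per JVM run keeps repeated or parallel runs from sharing a topic or route ID.
    // The "test-full-" and "full-it-" prefixes are illustrative assumptions.
    static final String TOPIC = "test-full-" + UUID.randomUUID();
    static final String ROUTE = "full-it-" + UUID.randomUUID();

    // Joins the options into a query string and prepends the kafka: scheme and topic.
    static String fromUri(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    // Assumed option set mirroring the description: group ID, deserializers,
    // auto-commit settings, poll timeout, and a record-capturing interceptor.
    static Map<String, String> consumerOptions() {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("groupId", "KafkaConsumerFullIT");
        options.put("keyDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("valueDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("autoCommitEnable", "true");
        options.put("autoCommitIntervalMs", "1000");
        options.put("pollTimeoutMs", "1000");
        // Hypothetical interceptor class name, used only for illustration.
        options.put("interceptorClasses", "com.example.RecordCapturingInterceptor");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(fromUri(TOPIC, consumerOptions()));
    }
}
```

Building the URI from an ordered map rather than one long string literal makes it easier to see which consumer options a test depends on, at the cost of a little extra code.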

Logger

Fields

| Name | Type | Description |
| --- | --- | --- |
| `producer` | `org.apache.kafka.clients.producer.KafkaProducer` | Kafka producer instance used to send test messages. |
| `bean` | `MyKafkaHeaderDeserializer` (inner class) | Custom Kafka header deserializer bound to the Camel registry to test overriding the header deserializer. |
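To illustrate what overriding a header deserializer means, the sketch below swaps a pass-through implementation for one that decodes header bytes as UTF-8 text. It is self-contained and does not use Camel: the `HeaderDeserializer` interface is a hypothetical stand-in for the component's header deserializer contract, and both implementations are assumptions made for illustration, not the behavior of `MyKafkaHeaderDeserializer`.

```java
import java.nio.charset.StandardCharsets;

final class HeaderDeserializerSketch {
    // Hypothetical stand-in for the component's contract: header key and raw bytes in, value out.
    interface HeaderDeserializer {
        Object deserialize(String key, byte[] value);
    }

    // Pass-through behavior in this sketch: the raw bytes are returned unchanged.
    static final class RawBytesDeserializer implements HeaderDeserializer {
        @Override
        public Object deserialize(String key, byte[] value) {
            return value;
        }
    }

    // Override: decode every header value as a UTF-8 string, keeping null as null.
    static final class Utf8StringDeserializer implements HeaderDeserializer {
        @Override
        public Object deserialize(String key, byte[] value) {
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "value".getBytes(StandardCharsets.UTF_8);
        HeaderDeserializer deserializer = new Utf8StringDeserializer();
        System.out.println(deserializer.deserialize("PropagatedCustomHeader", raw));
    }
}
```

In the test class the custom deserializer is bound to the Camel registry as a bean; how the endpoint is pointed at that bean is not shown in this section, so it is left out here.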


Lifecycle Methods

@BeforeEach void before()

@AfterEach void after()


Route Setup

@RouteFixture void createRouteBuilder(CamelContext context)

RouteBuilder createRouteBuilder()


Tests

1. kafkaMessageIsConsumedByCamelSeekedToBeginning()

// Producing messages
for (int i = 0; i < 5; i++) {
    producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
}
// Assertions handled by MockEndpoint

2. kafkaRecordSpecificHeadersAreNotOverwritten()

3. kafkaMessageIsConsumedByCamel()

4. kafkaMessageIsConsumedByCamelSeekedToEnd()

5. headerDeserializerCouldBeOverridden()

6. kafkaMessageIsConsumedByCamelAfterSuspendResume()


Inner Classes

MyKafkaHeaderDeserializer


Important Implementation Details


Interaction with Other System Components


Usage Example of Test Setup

// Producing messages
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "message-" + i);
    record.headers().add(new RecordHeader("PropagatedCustomHeader", "value".getBytes()));
    producer.send(record);
}

// Asserting via Camel Mock Endpoint
MockEndpoint mock = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
mock.expectedMessageCount(5);
mock.assertIsSatisfied(3000);

Visual Diagram: Class Structure

classDiagram
    class KafkaConsumerFullIT {
        - static final String TOPIC
        - static final String ROUTE
        - static final String FROM_URI
        - Logger LOG
        - KafkaProducer<String,String> producer
        - MyKafkaHeaderDeserializer bean
        + void before()
        + void after()
        + void createRouteBuilder(CamelContext)
        + RouteBuilder createRouteBuilder()
        + void kafkaMessageIsConsumedByCamelSeekedToBeginning()
        + void kafkaRecordSpecificHeadersAreNotOverwritten()
        + void kafkaMessageIsConsumedByCamel()
        + void kafkaMessageIsConsumedByCamelSeekedToEnd()
        + void headerDeserializerCouldBeOverridden()
        + void kafkaMessageIsConsumedByCamelAfterSuspendResume()
    }
    class MyKafkaHeaderDeserializer {
    }
    KafkaConsumerFullIT --> MyKafkaHeaderDeserializer

Summary

`KafkaConsumerFullIT.java` is an integration test class that verifies several aspects of Kafka consumer integration with Apache Camel routes: message consumption, header propagation, consumer seek semantics, and route lifecycle management (suspend and resume). It runs against a real Kafka broker and uses Camel's testing framework, so the Kafka component is exercised end to end rather than against mocks of the client.

The class provides automated regression coverage for Kafka consumer features in the Camel Kafka component, which applications built on Apache Camel and Kafka depend on.