KafkaProducerFullIT.java

Overview

`KafkaProducerFullIT.java` is an integration test class for the Apache Camel Kafka component. Its primary purpose is to validate the full lifecycle of producing messages to Kafka topics using Camel routes. The class covers sending string and byte array messages, handling collections of messages, testing Kafka producer interceptors, verifying header propagation and filtering, and ensuring that message metadata and acknowledgments are properly handled.

The tests employ embedded Kafka consumers to verify messages published to topics and Camel's `MockEndpoint` to assert that message exchanges carry the expected metadata. It also demonstrates how to override header filtering and serialization strategies in Kafka endpoints.

This file is part of the broader Kafka component integration tests and interacts with Kafka brokers, Camel routes, and Kafka consumers to ensure correctness and robustness of message production features.


Class: KafkaProducerFullIT

Extends

Purpose


Constants

Constant

Description

`DIRECT_START_STRINGS_URI`

URI for Camel route that sends string messages to Kafka.

`DIRECT_START_STRINGS_2_URI`

URI for another Camel route sending string messages.

`DIRECT_START_BYTES_URI`

URI for Camel route that sends byte array messages to Kafka.

`DIRECT_START_TRACED_URI`

URI for Camel route that sends messages intercepted by a producer interceptor.

`DIRECT_PROPAGATED_HEADERS_URI`

URI for route that tests header propagation to Kafka messages.

`DIRECT_NO_RECORD_SPECIFIC_HEADERS_URI`

URI for route that ensures record-specific headers are not propagated.

`TOPIC_STRINGS`

Kafka topic name for string messages.

`TOPIC_INTERCEPTED`

Kafka topic used for interception tests.

`TOPIC_STRINGS_IN_HEADER`

Alternative topic name passed via message headers.

`TOPIC_BYTES`

Kafka topic name for byte array messages.

`TOPIC_BYTES_IN_HEADER`

Alternative topic for byte array messages passed via headers.

`GROUP_BYTES`

Kafka consumer group id for byte array consumers.

`TOPIC_PROPAGATED_HEADERS`

Kafka topic for testing header propagation.

`TOPIC_NO_RECORD_SPECIFIC_HEADERS`

Kafka topic for testing filtering out record-specific headers.

`KAFKA_ACK_MOCK`

Mock endpoint URI for capturing Kafka acknowledgments.


Fields

Field

Description

`stringsConsumerConn`

KafkaConsumer for consuming string messages from Kafka topics.

`bytesConsumerConn`

KafkaConsumer for consuming byte array messages from Kafka topics.

`stringsTemplate`

Camel ProducerTemplate for sending string messages.

`stringsTemplate2`

Second Camel ProducerTemplate for string messages (used for different routes).

`bytesTemplate`

Camel ProducerTemplate for sending byte messages.

`interceptedTemplate`

ProducerTemplate used for testing producer interceptors.

`propagatedHeadersTemplate`

ProducerTemplate for testing header propagation.

`noRecordSpecificHeadersTemplate`

ProducerTemplate for testing header filtering of record-specific headers.

`headerFilterStrategy`

Custom header filter strategy bound to Camel registry.

`headersSerializer`

Custom Kafka header serializer bound to Camel registry.


Lifecycle Methods

@BeforeAll before()

@AfterAll after()

@BeforeEach setupProducerTemplates()


Route Builder: createRouteBuilder()

Defines Camel routes that send messages from direct endpoints to Kafka topics.

Example route snippet:

from(DIRECT_START_STRINGS_URI)
    .to("kafka:" + TOPIC_STRINGS + "?recordMetadata=true&requestRequiredAcks=-1")
    .to(KAFKA_ACK_MOCK);

Key Test Methods

producedStringMessageIsReceivedByKafka()

producedString2MessageIsReceivedByKafka()

producedStringMessageIsIntercepted()

producedStringCollectionMessageIsReceivedByKafka()

producedBytesMessageIsReceivedByKafka()

propagatedHeaderIsReceivedByKafka()

recordSpecificHeaderIsNotReceivedByKafka()

headerFilterStrategyCouldBeOverridden()

headerSerializerCouldBeOverridden()


Helper Methods

createStringKafkaConsumer(String groupId)

createByteKafkaConsumer(String groupId)

createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch)

createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch)

pollForRecords(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch)

getHeaderValue(String headerKey, Headers headers)


Inner Classes

MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy

MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer


Important Implementation Details


Interactions with Other System Components


Usage Example

Sending string messages to Kafka in a test scenario:

int messageCount = 10;
CountDownLatch latch = new CountDownLatch(messageCount);
sendMessagesInRoute(DIRECT_START_STRINGS_URI, messageCount, stringsTemplate, "Test message", KafkaConstants.PARTITION_KEY, "0");
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, latch);
latch.await(200, TimeUnit.MILLISECONDS);

Mermaid Class Diagram

classDiagram
    class KafkaProducerFullIT {
        -static final String DIRECT_START_STRINGS_URI
        -static final String DIRECT_START_STRINGS_2_URI
        -static final String DIRECT_START_BYTES_URI
        -static final String DIRECT_START_TRACED_URI
        -static final String DIRECT_PROPAGATED_HEADERS_URI
        -static final String DIRECT_NO_RECORD_SPECIFIC_HEADERS_URI
        -static final String TOPIC_STRINGS
        -static final String TOPIC_INTERCEPTED
        -static final String TOPIC_STRINGS_IN_HEADER
        -static final String TOPIC_BYTES
        -static final String TOPIC_BYTES_IN_HEADER
        -static final String GROUP_BYTES
        -static final String TOPIC_PROPAGATED_HEADERS
        -static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS
        -static final String KAFKA_ACK_MOCK

        -static KafkaConsumer<String,String> stringsConsumerConn
        -static KafkaConsumer<byte[],byte[]> bytesConsumerConn

        -ProducerTemplate stringsTemplate
        -ProducerTemplate stringsTemplate2
        -ProducerTemplate bytesTemplate
        -ProducerTemplate interceptedTemplate
        -ProducerTemplate propagatedHeadersTemplate
        -ProducerTemplate noRecordSpecificHeadersTemplate

        -MyHeaderFilterStrategy headerFilterStrategy
        -MyKafkaHeadersSerializer headersSerializer

        +static void before()
        +static void after()
        +void setupProducerTemplates()
        +RouteBuilder createRouteBuilder()
        +void producedStringMessageIsReceivedByKafka()
        +void producedString2MessageIsReceivedByKafka()
        +void producedStringMessageIsIntercepted()
        +void producedStringCollectionMessageIsReceivedByKafka()
        +void producedBytesMessageIsReceivedByKafka()
        +void propagatedHeaderIsReceivedByKafka()
        +void recordSpecificHeaderIsNotReceivedByKafka()
        +void headerFilterStrategyCouldBeOverridden()
        +void headerSerializerCouldBeOverridden()

        -static KafkaConsumer<String,String> createStringKafkaConsumer(String)
        -static KafkaConsumer<byte[],byte[]> createByteKafkaConsumer(String)
        -void createKafkaMessageConsumer(KafkaConsumer<String,String>, String, String, CountDownLatch)
        -void createKafkaBytesMessageConsumer(KafkaConsumer<byte[],byte[]>, String, String, CountDownLatch)
        -List<ConsumerRecord<String,String>> pollForRecords(KafkaConsumer<String,String>, String, CountDownLatch)
        -byte[] getHeaderValue(String, Headers)
    }

    class MyHeaderFilterStrategy {
        <<static>>
        +MyHeaderFilterStrategy()
    }

    class MyKafkaHeadersSerializer {
        <<static>>
        +MyKafkaHeadersSerializer()
    }

    KafkaProducerFullIT "1" *-- "1" MyHeaderFilterStrategy : has
    KafkaProducerFullIT "1" *-- "1" MyKafkaHeadersSerializer : has

Summary

`KafkaProducerFullIT.java` is a detailed integration test class validating Kafka message production within Apache Camel routes. It tests various message types, header propagation, interceptors, and configuration overrides to ensure the Camel Kafka component functions as expected. The class uses Camel's testing utilities, Kafka consumers, and synchronization primitives to assert message delivery and metadata correctness, providing a robust test suite for Kafka producer functionality.