KafkaProducerFullIT.java
Overview
`KafkaProducerFullIT.java` is an integration test class for the Apache Camel Kafka component. Its primary purpose is to validate the full lifecycle of producing messages to Kafka topics using Camel routes. The class covers sending string and byte array messages, handling collections of messages, testing Kafka producer interceptors, verifying header propagation and filtering, and ensuring that message metadata and acknowledgments are properly handled.
The tests employ embedded Kafka consumers to verify messages published to topics and Camel's `MockEndpoint` to assert that message exchanges carry the expected metadata. It also demonstrates how to override header filtering and serialization strategies in Kafka endpoints.
This file is part of the broader Kafka component integration tests and interacts with Kafka brokers, Camel routes, and Kafka consumers to ensure correctness and robustness of message production features.
Class: KafkaProducerFullIT
Extends
BaseKafkaTestSupport
Purpose
To perform comprehensive integration tests validating the Kafka producer functionality within Camel routes.
Provides a variety of test scenarios for sending messages to Kafka topics and verifying their reception.
Tests header propagation, message interception, and configuration overrides related to Kafka message production.
Constants
Constant | Description |
|---|---|
`DIRECT_START_STRINGS_URI` | URI for Camel route that sends string messages to Kafka. |
`DIRECT_START_STRINGS_2_URI` | URI for another Camel route sending string messages. |
`DIRECT_START_BYTES_URI` | URI for Camel route that sends byte array messages to Kafka. |
`DIRECT_START_TRACED_URI` | URI for Camel route that sends messages intercepted by a producer interceptor. |
`DIRECT_PROPAGATED_HEADERS_URI` | URI for route that tests header propagation to Kafka messages. |
`DIRECT_NO_RECORD_SPECIFIC_HEADERS_URI` | URI for route that ensures record-specific headers are not propagated. |
`TOPIC_STRINGS` | Kafka topic name for string messages. |
`TOPIC_INTERCEPTED` | Kafka topic used for interception tests. |
`TOPIC_STRINGS_IN_HEADER` | Alternative topic name passed via message headers. |
`TOPIC_BYTES` | Kafka topic name for byte array messages. |
`TOPIC_BYTES_IN_HEADER` | Alternative topic for byte array messages passed via headers. |
`GROUP_BYTES` | Kafka consumer group id for byte array consumers. |
`TOPIC_PROPAGATED_HEADERS` | Kafka topic for testing header propagation. |
`TOPIC_NO_RECORD_SPECIFIC_HEADERS` | Kafka topic for testing filtering out record-specific headers. |
`KAFKA_ACK_MOCK` | Mock endpoint URI for capturing Kafka acknowledgments. |
Fields
Field | Description |
|---|---|
`stringsConsumerConn` | KafkaConsumer for consuming string messages from Kafka topics. |
`bytesConsumerConn` | KafkaConsumer for consuming byte array messages from Kafka topics. |
`stringsTemplate` | Camel ProducerTemplate for sending string messages. |
`stringsTemplate2` | Second Camel ProducerTemplate for string messages (used for different routes). |
`bytesTemplate` | Camel ProducerTemplate for sending byte messages. |
`interceptedTemplate` | ProducerTemplate used for testing producer interceptors. |
`propagatedHeadersTemplate` | ProducerTemplate for testing header propagation. |
`noRecordSpecificHeadersTemplate` | ProducerTemplate for testing header filtering of record-specific headers. |
`headerFilterStrategy` | Custom header filter strategy bound to Camel registry. |
`headersSerializer` | Custom Kafka header serializer bound to Camel registry. |
Lifecycle Methods
@BeforeAll before()
Initializes Kafka consumers for string and byte messages before all tests.
Uses static helper methods to create consumers with appropriate deserializers and configurations.
@AfterAll after()
Cleans up Kafka topics used during tests by deleting them via Kafka Admin Client.
@BeforeEach setupProducerTemplates()
Obtains fresh
ProducerTemplateinstances from the Camel context before each test, ensuring clean state.
Route Builder: createRouteBuilder()
Defines Camel routes that send messages from direct endpoints to Kafka topics.
Strings routes send to
TOPIC_STRINGSwith record metadata and required acknowledgments.Byte array routes send to
TOPIC_BYTESwith appropriate serializers.Intercepted messages route adds a
MockProducerInterceptorclass.Propagated headers and no record-specific headers routes test header behavior.
Example route snippet:
from(DIRECT_START_STRINGS_URI)
.to("kafka:" + TOPIC_STRINGS + "?recordMetadata=true&requestRequiredAcks=-1")
.to(KAFKA_ACK_MOCK);
Key Test Methods
producedStringMessageIsReceivedByKafka()
Sends 10 messages to
TOPIC_STRINGSand 5 messages toTOPIC_STRINGS_IN_HEADER.Waits for messages to be consumed via Kafka consumer.
Asserts that all messages were received.
Verifies that the Camel
MockEndpointreceived 15 exchanges with correct KafkaRecordMetadataheaders.
producedString2MessageIsReceivedByKafka()
Similar to the above but uses the second string producer template and route.
Ensures messages sent through a different route are also properly received.
producedStringMessageIsIntercepted()
Sends messages via the route with a Kafka
MockProducerInterceptor.Asserts that the interceptor captured all sent records.
producedStringCollectionMessageIsReceivedByKafka()
Sends collections (lists) of messages in a single exchange.
Verifies that the right number of
RecordMetadataobjects are present in the exchange headers.
producedBytesMessageIsReceivedByKafka()
Sends byte array messages to Kafka.
Uses byte array KafkaConsumer to consume and verify messages in both default and header-specified topics.
propagatedHeaderIsReceivedByKafka()
Sends a message with various types of headers (String, Integer, Long, Double, byte[], Boolean).
Verifies that these headers are properly serialized and propagated to the Kafka message headers.
Ensures custom or filtered headers are not propagated.
recordSpecificHeaderIsNotReceivedByKafka()
Sends a message with a Kafka record-specific header (
KafkaConstants.TOPIC).Asserts that this header is not propagated to the Kafka message.
headerFilterStrategyCouldBeOverridden()
Tests that a custom header filter strategy (
MyHeaderFilterStrategy) can be injected and used by a Kafka endpoint.
headerSerializerCouldBeOverridden()
Tests overriding the Kafka headers serializer with a custom implementation (
MyKafkaHeadersSerializer).
Helper Methods
createStringKafkaConsumer(String groupId)
Creates and configures a KafkaConsumer for String key and value deserialization.
Configured with auto commit, session timeout, earliest offset reset, and bootstrap servers.
createByteKafkaConsumer(String groupId)
Creates and configures a KafkaConsumer for byte array key and value deserialization.
Similar settings as the string consumer but with byte deserializers.
createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch)
Subscribes consumer to two topics.
Polls Kafka in a loop until all expected messages are consumed (countdown latch reaches zero).
createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch)
Same as above but for byte array consumers.
pollForRecords(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch)
Starts a new thread to poll Kafka for records.
Collects records into a list while counting down the latch as messages arrive.
Returns the collected records for assertions.
getHeaderValue(String headerKey, Headers headers)
Utility to find and return the byte array value of a Kafka header by key.
Asserts the header is present.
Inner Classes
MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy
Custom header filter strategy registered to Camel registry.
Can be used to customize which headers are propagated to Kafka.
MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer
Custom Kafka headers serializer registered to Camel registry.
Allows custom serialization logic for Kafka message headers.
Important Implementation Details
Uses Apache Camel's
ProducerTemplateto send messages to Kafka topics via direct endpoints.Employs
CountDownLatchfor synchronization to wait until Kafka consumers have received all expected messages.Kafka consumers are created with specific deserializers matching message types.
Messages can be sent individually or as collections.
Kafka record metadata is retrieved and asserted to ensure correct publishing.
Producer interceptors are tested to verify interception of messages.
Header propagation tests include diverse data types and ensure filtering of unwanted headers.
Custom header filtering and serialization demonstrate pluggability of the Kafka component.
Interactions with Other System Components
Apache Camel Context: Uses Camel routes and producer templates to send messages.
Kafka Broker: Sends messages to and consumes messages from Kafka topics.
Kafka Admin Client: Deletes test topics after tests finish.
Camel MockEndpoint: Captures messages post-Kafka production for assertion.
KafkaConsumer: Embedded consumers verify messages are received on Kafka topics.
Producer Interceptor: Mock interceptor to test Kafka producer hooks.
Registry: Binds custom header filter and serializer for endpoint configuration.
Usage Example
Sending string messages to Kafka in a test scenario:
int messageCount = 10;
CountDownLatch latch = new CountDownLatch(messageCount);
sendMessagesInRoute(DIRECT_START_STRINGS_URI, messageCount, stringsTemplate, "Test message", KafkaConstants.PARTITION_KEY, "0");
createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, latch);
latch.await(200, TimeUnit.MILLISECONDS);
Mermaid Class Diagram
classDiagram
class KafkaProducerFullIT {
-static final String DIRECT_START_STRINGS_URI
-static final String DIRECT_START_STRINGS_2_URI
-static final String DIRECT_START_BYTES_URI
-static final String DIRECT_START_TRACED_URI
-static final String DIRECT_PROPAGATED_HEADERS_URI
-static final String DIRECT_NO_RECORD_SPECIFIC_HEADERS_URI
-static final String TOPIC_STRINGS
-static final String TOPIC_INTERCEPTED
-static final String TOPIC_STRINGS_IN_HEADER
-static final String TOPIC_BYTES
-static final String TOPIC_BYTES_IN_HEADER
-static final String GROUP_BYTES
-static final String TOPIC_PROPAGATED_HEADERS
-static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS
-static final String KAFKA_ACK_MOCK
-static KafkaConsumer<String,String> stringsConsumerConn
-static KafkaConsumer<byte[],byte[]> bytesConsumerConn
-ProducerTemplate stringsTemplate
-ProducerTemplate stringsTemplate2
-ProducerTemplate bytesTemplate
-ProducerTemplate interceptedTemplate
-ProducerTemplate propagatedHeadersTemplate
-ProducerTemplate noRecordSpecificHeadersTemplate
-MyHeaderFilterStrategy headerFilterStrategy
-MyKafkaHeadersSerializer headersSerializer
+static void before()
+static void after()
+void setupProducerTemplates()
+RouteBuilder createRouteBuilder()
+void producedStringMessageIsReceivedByKafka()
+void producedString2MessageIsReceivedByKafka()
+void producedStringMessageIsIntercepted()
+void producedStringCollectionMessageIsReceivedByKafka()
+void producedBytesMessageIsReceivedByKafka()
+void propagatedHeaderIsReceivedByKafka()
+void recordSpecificHeaderIsNotReceivedByKafka()
+void headerFilterStrategyCouldBeOverridden()
+void headerSerializerCouldBeOverridden()
-static KafkaConsumer<String,String> createStringKafkaConsumer(String)
-static KafkaConsumer<byte[],byte[]> createByteKafkaConsumer(String)
-void createKafkaMessageConsumer(KafkaConsumer<String,String>, String, String, CountDownLatch)
-void createKafkaBytesMessageConsumer(KafkaConsumer<byte[],byte[]>, String, String, CountDownLatch)
-List<ConsumerRecord<String,String>> pollForRecords(KafkaConsumer<String,String>, String, CountDownLatch)
-byte[] getHeaderValue(String, Headers)
}
class MyHeaderFilterStrategy {
<<static>>
+MyHeaderFilterStrategy()
}
class MyKafkaHeadersSerializer {
<<static>>
+MyKafkaHeadersSerializer()
}
KafkaProducerFullIT "1" *-- "1" MyHeaderFilterStrategy : has
KafkaProducerFullIT "1" *-- "1" MyKafkaHeadersSerializer : has
Summary
`KafkaProducerFullIT.java` is a detailed integration test class validating Kafka message production within Apache Camel routes. It tests various message types, header propagation, interceptors, and configuration overrides to ensure the Camel Kafka component functions as expected. The class uses Camel's testing utilities, Kafka consumers, and synchronization primitives to assert message delivery and metadata correctness, providing a robust test suite for Kafka producer functionality.