KafkaProducerTest.java
Overview
`KafkaProducerTest.java` is a comprehensive unit test class designed to verify the behavior and correctness of the `KafkaProducer` component within the Apache Camel Kafka integration. The class uses JUnit 5 for testing and Mockito for mocking dependencies, simulating Kafka producer interactions without requiring a live Kafka broker.
The file tests various scenarios including synchronous and asynchronous message sending to Kafka topics, handling of headers such as topic overrides, partition keys, message keys, timestamps, and batch processing with individual headers. It ensures that messages are correctly sent with appropriate metadata and that exceptions are properly handled.
Class: KafkaProducerTest
Purpose
To validate the functionality of the
KafkaProducerclass used in Apache Camel's Kafka component.To ensure the producer correctly interprets and applies configuration, message headers, and batching strategies.
To confirm that Kafka producer interactions (sending records, handling callbacks, exceptions) behave as expected.
Key Fields
Field Name | Type | Description |
|---|---|---|
`producer` | `KafkaProducer` | Instance of the KafkaProducer under test. |
`endpoint` | `KafkaEndpoint` | Kafka endpoint configured for sending messages. |
`fromEndpoint` | `KafkaEndpoint` | A second Kafka endpoint used in some tests. |
`converter` | `TypeConverter` | Mocked type converter used in Camel context. |
`context` | `CamelContext` | Mocked Camel context. |
`exchange` | `Exchange` | Mocked exchange representing message exchanges. |
`ecc` | `ExtendedCamelContext` | Mocked extended Camel context. |
`in` | `Message` | Default Camel message used as incoming message in tests. |
`callback` | `AsyncCallback` | Mocked async callback interface for async processing. |
Constructor
public KafkaProducerTest() throws Exception
Initializes the Kafka component, endpoints, and producer.
Configures mock Kafka producer to simulate sending messages.
Sets up mocks for Camel context, exchange, and asynchronous callback.
Prepares the producer with a fixed thread pool worker pool for async tests.
Test Methods
Each test method validates particular functionality of the `KafkaProducer` class.
1. testPropertyBuilder()
Purpose: Verify Kafka producer properties are correctly built from endpoint configuration.
Checks: That the
bootstrap.serversproperty matches the configured brokers.
2. processSendsMessage()
Purpose: Test synchronous sending of a message with partition key header.
Setup: Sets topic, adds partition header, mocks exchange message.
Verifies: Kafka producer's
sendmethod is called and record metadata is attached.
3. processSendsMessageWithException()
Purpose: Verify that exceptions thrown by Kafka producer during sending are propagated.
Setup: Mocks producer to throw
ApiException.Verifies:
ApiExceptionis thrown whenprocessis invoked.
4. processAsyncSendsMessage()
Purpose: Test asynchronous sending with callback invocation on completion.
Setup: Sets topic, adds partition header.
Verifies: Callback's
onCompletionis invoked; record metadata is attached.
5. processAsyncSendsMessageWithException()
Purpose: Test asynchronous sending with Kafka producer throwing exception.
Setup: Mocks producer to throw
ApiExceptionon send.Verifies: Exception is set on exchange, callback's
done()called with success; metadata attached.
6. processSendsMessageWithTopicHeaderAndNoTopicInEndPoint()
Purpose: Send message with topic header while endpoint topic is null.
Verifies: Message sent to topic from header; metadata exists.
7. processSendsMessageWithTopicHeaderAndEndPoint()
Purpose: Send message with both endpoint topic and message topic header.
Verifies: Headers preserved; message sent with correct partition, topic, and key.
8. processSendsMessageWithOverrideTopicHeaderAndEndPoint()
Purpose: Send message with override topic and timestamp headers.
Verifies: Override topic header is removed after processing; message sent with override topic and key.
9. processRequiresTopicInEndpointOrInHeader()
Purpose: Ensure topic must be specified either in endpoint config or message header.
Verifies: Message sent to topic with key.
10. processRequiresTopicInConfiguration()
Purpose: Verify sending message when topic is specified only in configuration.
Verifies: Message sent to configured topic with key.
11. processDoesNotRequirePartitionHeader()
Purpose: Validate sending message without partition key header.
Verifies: Message sent successfully; metadata exists.
12. processSendsMessageWithPartitionKeyHeader()
Purpose: Send message with partition key and message key headers.
Verifies: Message sent with specified partition key, topic, and key.
13. processSendsMessageWithPartitionKeyHeaderOnly()
Purpose: Send message with only partition key header.
Verifies: Partition key and topic used correctly.
14. processSendsMessageWithMessageKeyHeader()
Purpose: Send message with only message key header.
Verifies: Message key and topic used correctly.
15. processSendsMessageWithMessageTimestampHeader()
Purpose: Send message with timestamp override header.
Verifies: Timestamp is respected in record metadata.
16. processSendMessageWithTopicHeader()
Purpose: Send message with topic header, key, and partition.
Verifies: Correct topic, partition, and key are used.
17. processSendsMessageWithMessageTopicName()
Purpose: Send message with only message topic name.
Verifies: Message sent to configured topic.
18. processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange()
Purpose: Test batch sending of multiple aggregated exchanges with individual override topics.
Verifies: Messages sent to respective override topics; metadata attached for each.
19. processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange()
Purpose: Test batch sending of aggregated messages with override topics.
Verifies: Messages sent correctly; metadata verified for each message.
20. processSendsMessageWithListOfExchangesWithIndividualHeaders()
Purpose: Test batch sending with individual headers enabled.
Verifies: Messages sent with expected individual header values.
21. processSendsMessageWithListOfMessagesWithIndividualHeaders()
Purpose: Similar to above, but with aggregated messages.
Verifies: Individual headers honored in sent messages.
Helper Methods
verifySendMessage(...)
Overloaded methods to verify that a Kafka `ProducerRecord` was sent with expected:
Partition key
Topic
Message key
Uses Mockito's `ArgumentCaptor` to capture sent records and assert correctness.
verifySendMessages(List<String> expectedTopics, List<String> expectedIndividualHeaderValues)
Verifies multiple messages sent with expected topics.
If individual headers are provided, verifies the last header with the key
someIndividualHeadermatches the expected values.
assertRecordMetadataExists()
Asserts that the Kafka record metadata is present in the message headers.
Overloaded to check for specific number of metadata entries or metadata per aggregated exchange/message.
aggregateExchanges(List<Exchange>, AggregationStrategy)
Aggregates a list of exchanges using the specified Camel
AggregationStrategy.Used to simulate batch sending scenarios.
createListOfExchangesWithTopics(List<String>)
Creates a list of
Exchangeobjects each with a unique override topic header and custom individual header.Used for batch testing with different topics and headers.
Important Implementation Details
Mocking Kafka Producer: The Kafka producer interactions are mocked to avoid live Kafka dependencies, allowing isolated unit testing.
Aggregation Strategies: Uses Camel's
GroupedExchangeAggregationStrategyandGroupedMessageAggregationStrategyto test batch sending scenarios.Headers Handling: Tests cover Kafka-specific headers such as
PARTITION_KEY,KEY,TOPIC,OVERRIDE_TOPIC, andOVERRIDE_TIMESTAMP, ensuring they influence the produced records correctly.Exception Handling: Tests confirm that exceptions during send operations are correctly propagated or handled asynchronously.
Async Processing: Uses
AsyncCallbackto verify asynchronous message sending behavior and callback invocation.Record Metadata: Ensures that Kafka
RecordMetadatais attached to the Camel message headers after sending, enabling downstream processing.
Interaction With Other Parts of the System
KafkaProducer: The class under test, responsible for sending Camel exchanges as Kafka messages.
KafkaEndpoint: Used to configure topics and Kafka client factories.
CamelContext & Exchange: Core Camel components that provide message routing and type conversion services.
Kafka Client Library: The underlying Kafka producer client is mocked but normally used to send messages to Kafka brokers.
AggregationStrategy: Camel's aggregation mechanism to combine multiple exchanges/messages for batch processing.
Headers and Constants: Uses
KafkaConstantsfor Kafka header keys to control message routing and metadata.
Usage Example
KafkaProducerTest test = new KafkaProducerTest();
test.processSendsMessage(); // tests sending a message synchronously with partition key
Mermaid Class Diagram
classDiagram
class KafkaProducerTest {
- static final String SOME_INDIVIDUAL_HEADER
- KafkaProducer producer
- KafkaEndpoint endpoint
- KafkaEndpoint fromEndpoint
- TypeConverter converter
- CamelContext context
- Exchange exchange
- ExtendedCamelContext ecc
- Message in
- AsyncCallback callback
+ KafkaProducerTest()
+ void testPropertyBuilder()
+ void processSendsMessage()
+ void processSendsMessageWithException()
+ void processAsyncSendsMessage()
+ void processAsyncSendsMessageWithException()
+ void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint()
+ void processSendsMessageWithTopicHeaderAndEndPoint()
+ void processSendsMessageWithOverrideTopicHeaderAndEndPoint()
+ void processRequiresTopicInEndpointOrInHeader()
+ void processRequiresTopicInConfiguration()
+ void processDoesNotRequirePartitionHeader()
+ void processSendsMessageWithPartitionKeyHeader()
+ void processSendsMessageWithPartitionKeyHeaderOnly()
+ void processSendsMessageWithMessageKeyHeader()
+ void processSendsMessageWithMessageTimestampHeader()
+ void processSendMessageWithTopicHeader()
+ void processSendsMessageWithMessageTopicName()
+ void processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange()
+ void processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange()
+ void processSendsMessageWithListOfExchangesWithIndividualHeaders()
+ void processSendsMessageWithListOfMessagesWithIndividualHeaders()
- void verifySendMessage(Integer, String, String)
- void verifySendMessage(Integer, String)
- void verifySendMessage(String, String)
- void verifySendMessage(String)
- void verifySendMessages(List~String~, List~String~)
- void assertRecordMetadataTimestampExists()
- void assertRecordMetadataExists()
- void assertRecordMetadataExists(int)
- void assertRecordMetadataExistsForEachAggregatedExchange()
- void assertRecordMetadataExistsForEachAggregatedMessage()
- Exchange aggregateExchanges(List~Exchange~, AggregationStrategy)
- List~Exchange~ createListOfExchangesWithTopics(List~String~)
}
Summary
`KafkaProducerTest.java` is a detailed and robust test suite that ensures the Apache Camel Kafka Producer behaves correctly in various scenarios, including synchronous/asynchronous sending, handling of Kafka-specific headers, exception management, and batch processing with aggregation strategies. By mocking dependencies and Kafka clients, it provides fast, isolated tests critical for maintaining Kafka integration stability within Apache Camel.