BatchingProcessingITSupport.java
Overview
`BatchingProcessingITSupport.java` is an **abstract integration test support class** designed for testing Kafka message batching and manual offset commit scenarios within the Apache Camel Kafka component. It extends `BaseKafkaTestSupport` and provides reusable setup, utility methods, and validation logic for derived classes that implement specific integration tests involving Kafka message consumption in batch mode.
The class focuses on verifying:
Correct batching of Kafka messages consumed by Camel routes.
Behavior of Kafka manual commit offset handling during route lifecycle changes (start, stop, restart).
Message content and headers integrity within batched exchanges.
This file plays a key role in ensuring that Kafka batch processing features work as expected in the Camel Kafka connector integration suite.
Classes and Methods
abstract class BatchingProcessingITSupport extends BaseKafkaTestSupport
This abstract class serves as the base for Kafka batch processing integration tests. It encapsulates Kafka producer setup, test lifecycle hooks, and common validation utilities.
Fields
| Field | Type | Description |
|---|---|---|
| `to` | `MockEndpoint` | Mock endpoint `mock:result` to capture test messages. |
| `toBar` | `MockEndpoint` | Mock endpoint `mock:resultBar` (may be used by subclasses). |
| `producer` | `KafkaProducer<String,String>` | Kafka producer used to send test messages to Kafka topics. |
Methods
@BeforeEach public void createClient()
Purpose: Initializes the Kafka producer client before each test execution.
Details: Uses default Kafka producer properties inherited from `BaseKafkaTestSupport`.
Usage: Automatically called by JUnit before each test case.
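The actual defaults supplied by `BaseKafkaTestSupport` are not shown in this document. As a rough sketch only, a `KafkaProducer<String,String>` needs at least a bootstrap server list and two string serializers. The class name, method name, and broker address below are hypothetical placeholders.

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Builds the minimum configuration a String/String Kafka producer requires.
    // The real defaults in BaseKafkaTestSupport may contain more or different entries.
    public static Properties defaultProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder; a test would use the broker started for the suite.
        Properties props = defaultProducerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A properties object of this kind would then be passed to the `KafkaProducer` constructor inside `createClient()`.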
public void cleanupKafka(String topic)
Parameters:
`topic` - the Kafka topic name to clean up.
Purpose: Closes the Kafka producer and deletes the specified Kafka topic to ensure a clean test environment.
Details: Uses `kafkaAdminClient` (inherited) to delete topics.
Usage Example: `cleanupKafka("test-topic");`
public void kafkaManualCommitTest(String topic) throws Exception
Parameters:
`topic` - the Kafka topic to send messages to and consume them from.
Purpose: Tests Kafka manual commit offset behavior in a multi-step process:
Sends 5 messages, expects to receive them in one batch.
Stops the Camel route to simulate downtime.
Sends 3 more messages during route downtime; expects no consumption.
Restarts the route and verifies consumption of the 3 new messages.
Throws:
`Exception` for test failures or Kafka issues.
Usage: Called from concrete test classes to verify manual commit semantics.
Implementation Details:
Uses Camel's route controller to stop/start routes.
Validates message batches received at the `mock:result` endpoint.
Relies on offset commit behavior to ensure no message duplication or loss.
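The four steps above can be modeled without a broker. The following is a simplified, in-memory sketch and not the real test: the list stands in for a single-partition topic, `pollBatch` stands in for the Camel route, and every name in it is hypothetical. It only illustrates why a committed offset yields batches of 5, 0, and 3.

```java
import java.util.ArrayList;
import java.util.List;

public class ManualCommitSketch {
    // In-memory stand-in for a single-partition Kafka topic.
    private final List<String> log = new ArrayList<>();
    // Offset of the next record to read; it advances only when a batch is committed.
    private int committedOffset = 0;
    // Mirrors whether the Camel route is currently started.
    private boolean routeRunning = true;

    // Appends message-{index} for startIndex <= index < lastIndex.
    public void sendRecords(int startIndex, int lastIndex) {
        for (int i = startIndex; i < lastIndex; i++) {
            log.add("message-" + i);
        }
    }

    public void stopRoute() { routeRunning = false; }

    public void startRoute() { routeRunning = true; }

    // Returns everything after the committed offset as one batch, then commits.
    // A stopped route consumes nothing and leaves the offset untouched.
    public List<String> pollBatch() {
        if (!routeRunning) {
            return List.of();
        }
        List<String> batch = new ArrayList<>(log.subList(committedOffset, log.size()));
        committedOffset = log.size();
        return batch;
    }

    // Runs the four steps and returns the size of the batch seen at each check.
    public static List<Integer> runScenario() {
        ManualCommitSketch sketch = new ManualCommitSketch();
        List<Integer> sizes = new ArrayList<>();
        sketch.sendRecords(0, 5);              // step 1: five messages, one batch
        sizes.add(sketch.pollBatch().size());
        sketch.stopRoute();                    // step 2: simulate downtime
        sketch.sendRecords(5, 8);              // step 3: three messages while stopped
        sizes.add(sketch.pollBatch().size());
        sketch.startRoute();                   // step 4: restart and consume the backlog
        sizes.add(sketch.pollBatch().size());
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [5, 0, 3]
    }
}
```

If the offset were not committed after the first batch, the last poll would return all eight messages, which is the duplication the real test is designed to catch.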
private static void validateReceivedExchanges(int expectedCount, List<Exchange> exchanges)
Parameters:
`expectedCount` - expected number of messages in the batch.
`exchanges` - list of Camel `Exchange` objects representing the batched Kafka messages.
Purpose: Validates the batch of received exchanges:
Ensures the batch is not null and matches expected size.
Checks that each element in the batch is itself an `Exchange`.
Verifies message body format and presence of Kafka headers (`partition`, `topic`).
Confirms message bodies start with `"message-"`.
Throws: Assertion errors on validation failures.
Usage: Internal utility method used by test steps to verify correctness of consumed batches.
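The checks listed above can be sketched with plain Java types. This is an illustration under assumptions, not the class's actual code: `Entry` is a hypothetical stand-in for a Camel `Exchange`, and the header keys `"partition"` and `"topic"` are placeholders for whatever header constants the real method reads.

```java
import java.util.List;
import java.util.Map;

public class BatchValidationSketch {
    // Hypothetical stand-in for a Camel Exchange: a body plus message headers.
    public static final class Entry {
        final Object body;
        final Map<String, Object> headers;

        public Entry(Object body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    // Throws AssertionError on the first failed check; returns normally otherwise.
    public static void validate(int expectedCount, List<?> batch) {
        if (batch == null) {
            throw new AssertionError("The batch should not be null");
        }
        if (batch.size() != expectedCount) {
            throw new AssertionError("Expected " + expectedCount + " but got " + batch.size());
        }
        for (Object element : batch) {
            if (!(element instanceof Entry)) {
                throw new AssertionError("Each element should be an Entry");
            }
            Entry entry = (Entry) element;
            if (!(entry.body instanceof String) || !((String) entry.body).startsWith("message-")) {
                throw new AssertionError("The body should be a string starting with message-");
            }
            if (entry.headers.get("partition") == null || entry.headers.get("topic") == null) {
                throw new AssertionError("The partition and topic headers should be present");
            }
        }
    }

    // Convenience wrapper that reports the outcome as a boolean.
    public static boolean isValid(int expectedCount, List<?> batch) {
        try {
            validate(expectedCount, batch);
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of("partition", 0, "topic", "test-topic");
        List<Object> batch = List.of(new Entry("message-0", headers), new Entry("message-1", headers));
        System.out.println(isValid(2, batch)); // prints true
        System.out.println(isValid(5, batch)); // prints false
    }
}
```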
protected void setupPostExecutionExpectations()
Purpose: Configures expectations on the `mock:result` endpoint after route restart.
Details: Currently expects exactly one message (batch) to be received.
Usage: Can be overridden by subclasses to customize expectations.
protected void sendRecords(int startIndex, int lastIndex, String topic)
Parameters:
`startIndex` - starting index for message numbering.
`lastIndex` - exclusive upper bound index (not included).
`topic` - Kafka topic to which messages are sent.
Purpose: Sends a range of messages (`message-{index}`) to the specified Kafka topic using the Kafka producer.
Usage Example: `sendRecords(0, 5, "my-topic"); // sends message-0 to message-4`
Implementation Details: Messages are keyed with `"1"` and sent asynchronously.
protected void setupPreExecutionExpectations()
Purpose: Configures expectations on the `mock:result` endpoint before the first messages are sent.
Details: Sets the expected message count to 1 (a single batch).
Usage: Can be overridden for customized pre-execution expectations.
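Both expectation hooks (`setupPreExecutionExpectations` and `setupPostExecutionExpectations`) follow the template-method pattern: the base class fixes the order of the steps and subclasses replace only the expectations. The sketch below shows that shape with plain Java. `FakeMock`, `Support`, and `PerRecordSupport` are hypothetical stand-ins, and the override value of 3 is an invented example of a route that emits one exchange per record.

```java
public class ExpectationHooksSketch {
    // Hypothetical stand-in for MockEndpoint that only remembers the expected count.
    public static class FakeMock {
        private int expected = -1;

        public void expectedMessageCount(int count) { expected = count; }

        public int expected() { return expected; }
    }

    // Base class: fixes the order of the steps, exposes the hooks for overriding.
    public abstract static class Support {
        protected final FakeMock to = new FakeMock();

        protected void setupPreExecutionExpectations() { to.expectedMessageCount(1); }

        protected void setupPostExecutionExpectations() { to.expectedMessageCount(1); }

        // Returns the expectation in effect before the first send and after the restart.
        public int[] run() {
            setupPreExecutionExpectations();
            int pre = to.expected();
            setupPostExecutionExpectations();
            int post = to.expected();
            return new int[] {pre, post};
        }
    }

    // Keeps the defaults: one batch before, one batch after.
    public static class DefaultSupport extends Support { }

    // Hypothetical variant expecting three separate exchanges after the restart.
    public static class PerRecordSupport extends Support {
        @Override
        protected void setupPostExecutionExpectations() { to.expectedMessageCount(3); }
    }

    public static void main(String[] args) {
        int[] defaults = new DefaultSupport().run();
        int[] custom = new PerRecordSupport().run();
        System.out.println(defaults[0] + ", " + defaults[1]); // prints 1, 1
        System.out.println(custom[0] + ", " + custom[1]);     // prints 1, 3
    }
}
```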
Important Implementation Details and Algorithms
Batch Consumption Validation: The class tests that Kafka messages are consumed in batches by Camel routes, verifying that each batch contains a list of Camel `Exchange` objects. This is crucial for ensuring batch processing features in Kafka-Camel integration.
Manual Offset Commit Testing: The `kafkaManualCommitTest` method simulates realistic lifecycle events (route stop/start) and verifies that only messages after the committed offset are consumed on restart, ensuring no duplicates or message loss.
Use of Camel Mock Endpoints: The class leverages Camel's `MockEndpoint` to set expectations and assert the receipt of messages, enabling precise integration testing of route behavior.
Separation of Setup and Validation: Helper methods like `setupPreExecutionExpectations`, `setupPostExecutionExpectations`, and `validateReceivedExchanges` modularize the test logic for clarity and reuse.
Interaction with Other Components
Extends `BaseKafkaTestSupport` for inherited Kafka admin utilities and shared test infrastructure.
Uses Kafka Producer API (`KafkaProducer`) to send messages to topics.
Interacts with Apache Camel components:
`MockEndpoint` to capture and assert test messages.
Camel context and route controller to start/stop routes dynamically.
Integrates with Kafka broker via Kafka Admin client (`kafkaAdminClient`) to clean test topics.
Intended to be extended by concrete integration test classes that define specific Camel routes and topics for batch processing tests.
Visual Diagram
```mermaid
classDiagram
    class BatchingProcessingITSupport {
        <<abstract>>
        -Logger LOG
        #MockEndpoint to
        #MockEndpoint toBar
        #KafkaProducer<String,String> producer
        +void createClient()
        +void cleanupKafka(String topic)
        +void kafkaManualCommitTest(String topic)
        -static void validateReceivedExchanges(int expectedCount, List<Exchange> exchanges)
        #void setupPostExecutionExpectations()
        #void sendRecords(int startIndex, int lastIndex, String topic)
        #void setupPreExecutionExpectations()
    }
    BatchingProcessingITSupport --|> BaseKafkaTestSupport
```
Usage Example in a Concrete Test Class
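```java
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

// Place this class in the same package as BatchingProcessingITSupport,
// which is declared without the public modifier.
public class MyBatchingIT extends BatchingProcessingITSupport {
    private static final String TOPIC = "test-topic";

    // The Camel route under test is omitted here; a concrete class must define it.

    @AfterEach
    public void after() {
        // cleanupKafka closes the producer, so call it after the test, not before
        cleanupKafka(TOPIC);
    }

    @Test
    public void testKafkaBatchManualCommit() throws Exception {
        // Execute manual commit test scenario
        kafkaManualCommitTest(TOPIC);
    }

    @Override
    protected void setupPreExecutionExpectations() {
        // One exchange carrying the whole first batch
        to.expectedMessageCount(1);
    }

    @Override
    protected void setupPostExecutionExpectations() {
        // One exchange carrying the batch consumed after the restart
        to.expectedMessageCount(1);
    }
}
```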
This documentation provides a complete understanding of the `BatchingProcessingITSupport.java` file, its role in Kafka batch processing integration testing within Apache Camel, and guidelines for extending and utilizing it in test suites.