KafkaConsumerStopIT.java
Overview
`KafkaConsumerStopIT` is an integration test class designed to verify the behavior of the Kafka consumer component in the Apache Camel Kafka integration. Specifically, it tests that the underlying Kafka client consumer (`org.apache.kafka.clients.consumer.KafkaConsumer`) is properly closed when the associated Camel Kafka consumer route is stopped.
This test ensures resource cleanup and proper lifecycle management of Kafka consumers within Camel routes, preventing resource leaks and potential issues in production environments. It extends `BaseKafkaTestSupport` to leverage Kafka test utilities and infrastructure.
Class: KafkaConsumerStopIT
Description
Integration test class for verifying Kafka consumer shutdown behavior in Apache Camel Kafka component.
Uses JUnit 5 for structured testing with lifecycle callbacks and ordered tests.
Sets up a Kafka producer to publish test messages.
Defines a Camel route consuming from a Kafka topic with specific consumer configurations and a mock interceptor.
Tests that when the route is stopped, the underlying Kafka consumer client is closed correctly.
Key Constants
Constant | Description |
|---|---|
`TOPIC` | Kafka topic name used for testing (`"test-full"`). |
Fields
Field | Type | Description |
|---|---|---|
`producer` | Kafka producer for sending test messages to the topic. | |
`LOG` | `Logger` | Logger instance for logging test-related messages. |
Methods
before()
Annotations:
@BeforeEachDescription: Setup method executed before each test.
Functionality: Initializes the Kafka producer with default properties and clears any previously captured records from the
MockConsumerInterceptor.Parameters: None
Returns: void
after()
Annotations:
@AfterEachDescription: Cleanup method executed after each test.
Functionality: Closes the Kafka producer if open and deletes the test Kafka topic to clean up resources.
Parameters: None
Returns: void
createRouteBuilder()
Overrides:
BaseKafkaTestSupport#createRouteBuilder()Description: Provides the Camel route configuration used in the tests.
Functionality:
Creates a Kafka consumer route consuming from
TOPICwith group IDKafkaConsumerFullIT.Configures Kafka consumer properties such as deserializers, auto offset reset, commit intervals, poll timeout, and interceptor classes.
The route processes each consumed message by logging it at trace level.
Sends the processed message to a mock endpoint for assertions.
Returns:
RouteBuilderinstance
**Usage example:**
RouteBuilder routeBuilder = kafkaConsumerStopIT.createRouteBuilder();
context.addRoutes(routeBuilder);
kafkaClientConsumerClosed(org.apache.kafka.clients.consumer.KafkaConsumer kafkaClientConsumer)
Description: Checks if the internal Kafka client consumer has been closed.
Parameters:
kafkaClientConsumer- the KafkaConsumer instance to check.
Returns:
boolean-trueif the consumer is closed; otherwisefalse.Implementation details:
Uses reflection to access the private
delegatefield of the KafkaConsumer, which is aClassicKafkaConsumer.Then accesses the private
closedboolean field to determine if the consumer has been closed.
Throws: Exception if reflection fails.
getKafkaClientConsumer()
Description: Retrieves the internal
org.apache.kafka.clients.consumer.KafkaConsumerinstance used by the Camel Kafka consumer route.Parameters: None
Returns:
org.apache.kafka.clients.consumer.KafkaConsumerImplementation details:
Obtains the Camel KafkaConsumer instance from the route identified by
"full-it".Uses reflection to access the private
tasksfield (list ofKafkaFetchRecords).Retrieves the first
KafkaFetchRecordsand extracts the privateconsumerfield.
Throws: Exception if reflection access fails.
kafkaClientConsumerClosedWhenKafkaRouteStopped()
Annotations:
@TestDescription: Main test method verifying that Kafka client consumer closes when the Camel Kafka route is stopped.
Test Steps:
Sets up a mock endpoint expectation to receive the message
"message".Sends a Kafka producer record with key
"1"and value"message"to theTOPIC.Asserts that the mock endpoint received the expected message within 3 seconds.
Retrieves the underlying Kafka client consumer instance.
Stops the Camel route
"full-it".Waits up to 10 seconds and repeatedly checks that the Kafka client consumer is closed.
Assertions:
The Kafka client consumer should be closed after the route is stopped.
Throws: Exception on test failure or reflection errors.
Important Implementation Details
Reflection Usage: The test uses JUnit Platform's
ReflectionUtilsandTryutility for safely accessing private fields inside Kafka consumer classes, which are not exposed by the public API. This is necessary to verify the internal state of the Kafka client consumer.MockConsumerInterceptor: The route includes
MockConsumerInterceptoras an interceptor class to capture records during consumption for verification.Awaitility: The test uses the Awaitility library to wait asynchronously for the Kafka client consumer to be closed, improving reliability in asynchronous conditions.
Route Configuration: Kafka consumer properties are carefully set to ensure predictable behavior during testing, such as earliest offset reset and auto commits enabled.
Interaction with Other System Components
Camel Routes: This class defines and tests a Camel route that consumes Kafka messages.
Kafka Cluster: The test interacts with a Kafka cluster (assumed to be available) by creating a producer and consuming from a test topic.
Kafka Admin Client: Used to clean up topics after tests.
Camel Testing Framework: Uses Camel's
MockEndpointto assert message receipt.Kafka Consumer Internals: Verifies internal Kafka consumer lifecycle states by accessing private fields.
Usage Example
KafkaConsumerStopIT test = new KafkaConsumerStopIT();
test.before();
try {
test.kafkaClientConsumerClosedWhenKafkaRouteStopped();
} finally {
test.after();
}
This example shows how the test lifecycle methods and main test method might be invoked programmatically (usually managed by JUnit).
Mermaid Class Diagram
classDiagram
class KafkaConsumerStopIT {
+static final String TOPIC
-Logger LOG
-KafkaProducer<String,String> producer
+void before()
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaClientConsumerClosedWhenKafkaRouteStopped()
-KafkaConsumer getKafkaClientConsumer()
-boolean kafkaClientConsumerClosed(KafkaConsumer kafkaClientConsumer)
}
KafkaConsumerStopIT --|> BaseKafkaTestSupport
Summary
`KafkaConsumerStopIT.java` is a focused integration test class ensuring that Kafka consumers managed by Apache Camel routes properly close their underlying Kafka client consumers when the route is stopped. It uses a combination of Kafka producer/consumer, Camel routes, reflection for internal state verification, and Awaitility to robustly test consumer lifecycle management, critical for resource safety in real-world Kafka-Camel applications.