KafkaConsumerAuthInvalidWithReconnectIT.java
Overview
`KafkaConsumerAuthInvalidWithReconnectIT.java` is an integration test class that validates how a Kafka consumer configured with SASL/JAAS authentication behaves when authentication first fails and later succeeds. It runs against a Kafka container that supports JAAS+SASL authentication: the container starts with an invalid JAAS configuration, so the consumer cannot connect, and the test then verifies that the consumer reconnects once the Kafka service is restarted with a valid configuration.
The class is part of the Apache Camel Kafka component integration tests. It uses the Camel Kafka component, the Kafka AdminClient API, and JUnit 5 with Awaitility for asynchronous assertions. The test exercises the Kafka consumer's error handling and reconnect logic when the SASL/PLAIN mechanism is used.
Detailed Explanation
Package and Imports
Located in the `org.apache.camel.component.kafka.integration` package.
Utilizes Apache Camel testing infrastructure, Kafka clients (`KafkaProducer`, `AdminClient`), and JUnit 5 testing annotations.
Uses a custom Kafka container service, `ContainerLocalAuthKafkaService`, to run Kafka locally with different JAAS configurations.
Uses the `Awaitility` library to wait for asynchronous state changes (e.g., consumer reconnection).
Class: KafkaConsumerAuthInvalidWithReconnectIT
Description
This class contains integration tests that:
Start a Kafka container with an invalid JAAS configuration.
Attempt to consume messages while the broker is running with the invalid JAAS configuration (authentication is expected to fail).
Restart Kafka with a valid JAAS configuration.
Verify that the Kafka consumer reconnects successfully after the configuration update.
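The four steps above can be modeled with a small, self-contained toy. This is only a sketch of the scenario, not the test's code or Camel's implementation: `FakeBroker` and `ReconnectingConsumer` are hypothetical names invented for illustration, and the "broker" is an in-memory object rather than a Kafka container.

```java
import java.util.HashSet;
import java.util.Set;

class ReconnectScenarioSketch {

    /** Hypothetical stand-in for a broker whose login configuration is chosen at (re)start. */
    static class FakeBroker {
        private boolean loginConfigValid;
        private final Set<String> groupMembers = new HashSet<>();

        FakeBroker(boolean loginConfigValid) {
            this.loginConfigValid = loginConfigValid;
        }

        /** Simulates a restart: existing members are dropped and the new config takes effect. */
        void restart(boolean loginConfigValid) {
            groupMembers.clear();
            this.loginConfigValid = loginConfigValid;
        }

        /** Rejects the join while the login configuration is invalid. */
        void join(String memberId) {
            if (!loginConfigValid) {
                throw new IllegalStateException("authentication failed");
            }
            groupMembers.add(memberId);
        }

        /** Returns a copy of the current group membership. */
        Set<String> members() {
            return new HashSet<>(groupMembers);
        }
    }

    /** Hypothetical consumer that retries the connection on the next poll after a failure. */
    static class ReconnectingConsumer {
        private final FakeBroker broker;
        private final String memberId;

        ReconnectingConsumer(FakeBroker broker, String memberId) {
            this.broker = broker;
            this.memberId = memberId;
        }

        /** Returns true if the consumer is a group member after this poll. */
        boolean poll() {
            if (broker.members().contains(memberId)) {
                return true;
            }
            try {
                broker.join(memberId);
                return true;
            } catch (IllegalStateException authFailure) {
                return false; // stay disconnected; the next poll tries again
            }
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(false);                       // step 1: invalid config
        ReconnectingConsumer consumer = new ReconnectingConsumer(broker, "consumer-1");
        System.out.println("joined with invalid config: " + consumer.poll()); // step 2: false
        broker.restart(true);                                            // step 3: valid config
        System.out.println("joined after restart: " + consumer.poll());  // step 4: true
    }
}
```

In the real test the "is the consumer a member?" question is answered by querying the consumer group through the Kafka AdminClient rather than by a return value.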
Annotations
`@EnabledIfSystemProperties`: Ensures the tests run only if Kafka 3.x is available on the system.
`@TestMethodOrder(OrderAnnotation.class)`: Specifies the order of test execution.
`@RegisterExtension`: Registers Camel context extensions for testing Camel routes.
`@RouteFixture`: Defines Camel routes needed for the tests.
Fields and Constants
| Name | Type | Description |
|---|---|---|
| `TOPIC` | `String` | Kafka topic used in tests (`test-auth-invalid-with-reconnect`). |
| `LOG` | `Logger` | Logger instance for debug and trace messages. |
| `service` | `ContainerLocalAuthKafkaService` | Local Kafka container service with JAAS SASL config (static instance). |
| `contextExtension` | `CamelContextExtension` | Camel context extension for route management. |
| `producer` | `KafkaProducer` | Kafka producer used to send test messages. |
Static Initialization Block
Instantiates `service` with a Kafka container using an invalid JAAS config file (`/kafka-jaas-invalid.config`).
Lifecycle Methods
beforeClass()
Runs once before all tests.
Initializes the Kafka container service.
Sets service properties for integration testing (e.g., bootstrap servers).
before()
Runs before each test.
Creates a Kafka producer configured with valid SASL credentials (`camel`/`camel-secret`).
Clears mock consumer interceptor records.
Fails the test if producer creation throws an exception.
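As a rough illustration of what "configured with valid SASL credentials" involves, the sketch below assembles the client properties a SASL/PLAIN producer typically needs. It uses only `java.util.Properties`, so it runs without the Kafka client on the classpath. The class and method names are hypothetical. The `SASL_PLAINTEXT` security protocol, the `localhost:9092` address, and the String serializers are assumptions, since the source does not state them.

```java
import java.util.Properties;

class SaslProducerConfigSketch {

    /** Builds Kafka client properties for SASL/PLAIN (assumed here to run without TLS). */
    static Properties saslPlainProperties(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: String keys and values, matching a KafkaProducer<String, String>.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: plaintext listener; a TLS listener would use SASL_SSL instead.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS login line carrying the username and password.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProperties("localhost:9092", "camel", "camel-secret");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the Kafka client library available, a `Properties` object like this would be passed to the `KafkaProducer` constructor.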
after()
Runs after each test.
Closes the Kafka producer to release resources.
Route Configuration
Method: createRouteBuilder()
Returns a `RouteBuilder` that defines the Camel route for consuming messages from Kafka.
Kafka consumer endpoint configuration includes:
SASL/PLAIN authentication with valid credentials.
Auto offset reset to earliest.
Auto commit enabled with 1-second interval.
pollOnError=RECONNECT to automatically reconnect on errors (key for testing reconnect).
The route processes received messages by logging them and forwarding to a mock Dead Letter Queue (`KafkaTestUtil.MOCK_DLQ`).
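To make the endpoint options listed above concrete, here is a minimal sketch that assembles a Camel Kafka consumer URI as a plain string. It is not the test's own code. The option names (`brokers`, `groupId`, `autoOffsetReset`, `autoCommitEnable`, `autoCommitIntervalMs`, `pollOnError`, `securityProtocol`, `saslMechanism`, `saslJaasConfig`) are camel-kafka endpoint options, while the class name, the broker address, the group id `example-group`, the `SASL_PLAINTEXT` protocol, and the reuse of the producer's `camel`/`camel-secret` credentials are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class KafkaEndpointUriSketch {

    /** Assembles a camel-kafka consumer URI from the options described in this section. */
    static String consumerUri(String topic, String brokers, String groupId,
                              String username, String password) {
        // JAAS login line for SASL/PLAIN. It is wrapped in RAW(...) below so that
        // Camel takes the value literally; this assumes the value contains no ')'.
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\" password=\"" + password + "\";";

        Map<String, String> options = new LinkedHashMap<>(); // preserves insertion order
        options.put("brokers", brokers);
        options.put("groupId", groupId);
        options.put("autoOffsetReset", "earliest");
        options.put("autoCommitEnable", "true");
        options.put("autoCommitIntervalMs", "1000"); // 1-second commit interval
        options.put("pollOnError", "RECONNECT");     // reconnect when polling fails
        options.put("securityProtocol", "SASL_PLAINTEXT"); // assumption: no TLS
        options.put("saslMechanism", "PLAIN");
        options.put("saslJaasConfig", "RAW(" + jaas + ")");

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(consumerUri("test-auth-invalid-with-reconnect",
                "localhost:9092", "example-group", "camel", "camel-secret"));
    }
}
```

In a route, a string like this would be the argument to `from(...)`, followed by the logging step and the hand-off to the mock endpoint.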
Method: createRouteBuilder(CamelContext context)
Registers the route builder with the given Camel context for test setup.
Tests
Test: testIsDisconnected() (Order 1)
Uses Kafka AdminClient to query the consumer group information for the test group.
Asserts that the consumer group has no members, confirming the consumer failed to connect due to invalid auth.
Ensures no unexpected exceptions occur during group info retrieval.
Test: testReconnect() (Order 2)
Shuts down the Kafka service running with invalid JAAS configuration.
Starts a new Kafka service instance with a valid JAAS configuration (`/kafka-jaas.config`).
Uses Awaitility to wait (max 30 seconds) until the consumer connects successfully.
Verifies the consumer group exists and has members using the helper method `assertIsConnected()`.
Helper Method: assertIsConnected(AdminClient adminClient)
Retrieves consumer group information for the test group.
Asserts that the group exists and is not null.
Used to verify successful reconnection of the Kafka consumer.
Important Implementation Details and Algorithms
The test relies on the `pollOnError=RECONNECT` option in the Kafka consumer endpoint configuration, which instructs the Camel Kafka consumer to reconnect automatically on polling errors such as authentication failures.
Switching Kafka container JAAS config simulates an environment where the consumer initially cannot connect but can connect after the server is reconfigured.
Awaitility provides a reliable way to wait asynchronously for Kafka consumer state changes instead of relying on arbitrary sleeps.
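The difference between polling for a condition and sleeping for a fixed time can be shown with a small plain-Java helper. This is a stand-in written for illustration, not Awaitility's API: `pollUntil` is a hypothetical name, and it reports a timeout through its return value, whereas Awaitility signals one with an exception. The condition in `main` is a placeholder for "the consumer group has members".

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollUntilSketch {

    /**
     * Re-checks the condition every interval until it holds or the timeout elapses.
     * Returns true as soon as the condition holds, false on timeout or interruption.
     */
    static boolean pollUntil(Duration timeout, Duration interval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true; // done early; no need to wait out the full timeout
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timeout reached without the condition holding
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Placeholder condition: becomes true roughly 200 ms after start.
        boolean connected = pollUntil(Duration.ofSeconds(30), Duration.ofMillis(50),
                () -> System.nanoTime() - start >= Duration.ofMillis(200).toNanos());
        System.out.println("connected=" + connected);
    }
}
```

A fixed 30-second sleep would always cost 30 seconds and would still fail if the reconnect took longer. Polling returns as soon as the state changes and treats the 30 seconds only as an upper bound.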
Interaction with Other System Components
Camel Kafka Component: This test verifies the integration and error handling behavior of the Camel Kafka component's consumer endpoint with SASL authentication.
Kafka AdminClient: Used to introspect Kafka consumer groups to verify connection states.
ContainerLocalAuthKafkaService: Provides a controllable Kafka container environment with different JAAS configurations for testing authentication scenarios.
Camel Context & Routes: The test dynamically adds Kafka consumer routes into a Camel context to simulate real message consumption.
Usage Example
This test class is meant to be run as part of the integration test suite for the Apache Camel Kafka component. Upon execution, it validates that a Kafka consumer:
Fails to connect when Kafka is configured with invalid SASL credentials.
Successfully reconnects when Kafka is restarted with correct SASL credentials.
Mermaid Class Diagram
classDiagram
class KafkaConsumerAuthInvalidWithReconnectIT {
<<test class>>
+String TOPIC
-Logger LOG
-ContainerLocalAuthKafkaService service
-CamelContextExtension contextExtension
-KafkaProducer<String,String> producer
+static void beforeClass()
+void before()
+void after()
+void createRouteBuilder(CamelContext context)
+RouteBuilder createRouteBuilder()
+void testIsDisconnected()
+void testReconnect()
-void assertIsConnected(AdminClient adminClient)
}
Summary
`KafkaConsumerAuthInvalidWithReconnectIT.java` is a focused integration test that verifies how the Apache Camel Kafka component handles consumer authentication failures and automatic reconnection when using SASL/JAAS authentication. It programmatically changes the Kafka service's authentication configuration to simulate a connection failure followed by recovery, and checks that the consumer rejoins its group afterwards.