KafkaConsumerAuthManualTest.java
Overview
`KafkaConsumerAuthManualTest.java` is a JUnit integration test class designed to validate the ability of Apache Camel's Kafka component to authenticate and consume messages from a remote Kafka cluster secured with SASL authentication.
This test class connects to a Kafka broker specified through system properties, configures authentication parameters (username, password, SASL mechanism, security protocol), and verifies that messages produced to a secure Kafka topic are correctly consumed by a Camel route. The test ensures that Camel’s Kafka consumer can handle authentication and message consumption end-to-end against a real Kafka cluster with security enabled.
The class uses Camel's testing infrastructure (`CamelContextExtension`) and Kafka client libraries to produce and consume messages. It also leverages Awaitility to wait asynchronously for message consumption.
Class: KafkaConsumerAuthManualTest
Description
This is a JUnit 5 test class that performs manual testing of Kafka consumer authentication using SASL mechanisms in a secure environment. It runs only if required system properties (e.g., Kafka bootstrap servers, topic, username, password) are correctly provided, enabling flexible manual test execution.
Annotations
@TestMethodOrder(MethodOrderer.OrderAnnotation.class): Controls test execution order.@TestInstance(TestInstance.Lifecycle.PER_CLASS): Uses a single test instance for all tests.@EnabledIfSystemProperties: Enables tests only when specific system properties are set.@RegisterExtension: Registers the Camel context extension for Camel testing.
Constants / Fields
Field | Description |
|---|---|
`TOPIC` | Kafka topic to consume messages from, read from system property `kafka.manual.test.topic`. |
`BOOTSTRAP_SERVERS` | Kafka bootstrap servers URL, read from system property `bootstrapServers`. |
`USERNAME` | Kafka SASL username, read from system property `kafka.manual.test.username`. |
`PASSWORD` | Kafka SASL password, read from system property `kafka.manual.test.password`. |
`SECURITY_PROTOCOL` | Kafka security protocol (default: "SASL_PLAINTEXT"). |
`SASL_MECHANISM` | SASL mechanism (default: "PLAIN"). |
`MESSAGE_COUNT` | Number of test messages to consume (default: 5). |
`contextExtension` | CamelContext extension to manage Camel lifecycle in tests. |
`receivedMessages` | Volatile counter to track number of consumed messages. |
`producer` | Kafka producer instance used to send messages in tests. |
Methods
protected Properties getDefaultProperties()
Returns a `Properties` object with Kafka client configuration, including SASL authentication settings.
Functionality:
Calls KafkaTestUtil.getDefaultProperties to get base Kafka properties.
Adds SASL JAAS configuration string generated from username and password.
Sets security protocol and SASL mechanism.
Returns:
Propertiesconfigured for SASL authentication.Usage Example:
Properties props = getDefaultProperties();
// Use props to configure KafkaProducer or KafkaConsumer
@BeforeEach public void before()
Sets up the Kafka producer before each test method.
Functionality:
Retrieves SASL-enabled Kafka properties using
getDefaultProperties().Initializes the Kafka producer instance.
Error Handling:
Prints stack trace and fails the test if producer initialization fails.
@AfterEach public void after()
Cleans up resources after each test.
Functionality:
Closes the Kafka producer if not null.
Placeholder comment for cleaning test topics (not implemented).
@RouteFixture public void createRouteBuilder(CamelContext context)
Adds the Camel route to the Camel context before tests.
Functionality:
Calls
createRouteBuilder()and adds the route to the provided Camel context.
protected RouteBuilder createRouteBuilder()
Creates and returns a Camel `RouteBuilder` defining the Kafka consumer route.
Route Details:
Consumes from the Kafka topic using authentication parameters.
Uses:
brokers: bootstrap serversgroupId:"KafkaConsumerAuthManualTest"autoOffsetReset:"earliest"(starts consuming from earliest offset)clientId:"camel-kafka-auth-test"pollOnError:"RECONNECT"(retry on consumer poll errors)SASL mechanism, security protocol, and JAAS config for authentication.
On each message, increments
receivedMessages.Routes messages to a mock endpoint (
KafkaTestUtil.MOCK_RESULT) for assertions.
Returns:
RouteBuilderinstance with the configured route.Usage Example:
RouteBuilder routeBuilder = createRouteBuilder();
camelContext.addRoutes(routeBuilder);
@DisplayName("Tests that Camel can adequately connect and consume from an authenticated remote Kafka instance")
`@Test public void kafkaMessageIsConsumedByCamel() throws InterruptedException`
Main integration test method.
Purpose:
Validates that Camel’s Kafka consumer properly authenticates and consumes messages from a secure Kafka topic.Test Steps:
Retrieves the mock endpoint to assert messages.
Sets expectations on the mock endpoint:
Expected message count = 5.
Expected message bodies:
"message-0"through"message-4"(any order).Expected header values for commit flags.
Uses Awaitility to wait up to 1 hour for
receivedMessagescount to reach expected count.Asserts that the mock endpoint expectations are satisfied.
Throws:
InterruptedExceptionif thread waiting is interrupted.
Implementation Details and Algorithms
Authentication Setup:
SASL authentication is configured using JAAS config strings generated byContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig(username, password). This string is injected into Kafka consumer and producer properties to enable SASL/PLAIN or other SASL mechanisms.Camel Route Configuration:
The Kafka consumer route is dynamically built with connection and authentication parameters passed as URI query parameters to the Camel Kafka component.Message Counting:
The route increments a volatile integerreceivedMessageson every consumed message, which is used in the test to verify the number of messages consumed.Awaitility Usage:
The test uses Awaitility to asynchronously wait until the expected number of messages are consumed, preventing flaky tests due to timing issues.
Interaction with Other Components
Camel Kafka Component:
This test validates the Kafka consumer integration within Apache Camel, specifically focusing on authentication capabilities.Kafka Cluster:
Requires an externally running Kafka cluster secured with SASL authentication, with topics and credentials provided via system properties.KafkaTestUtil:
Provides utility methods and constants, including default Kafka properties and mock endpoint name.ContainerLocalAuthKafkaService:
Generates SASL JAAS configuration strings used for authentication.Camel Testing Infrastructure:
UsesCamelContextExtensionandMockEndpointto manage Camel lifecycle and assert message processing.
Usage Example
To run this integration test manually, supply the following JVM system properties:
-DbootstrapServers=localhost:9093
-Dkafka.manual.test.topic=test-topic
-Dkafka.manual.test.username=testuser
-Dkafka.manual.test.password=testpassword
-Dkafka.manual.test.security.protocol=SASL_PLAINTEXT
-Dkafka.manual.test.sasl.mechanism=PLAIN
-Dkafka.manual.test.message.count=5
Then execute the test via Maven or your IDE. The test will produce messages externally (outside this class) and verify that Camel consumes them correctly.
Mermaid Class Diagram
classDiagram
class KafkaConsumerAuthManualTest {
-static final String TOPIC
-static final String BOOTSTRAP_SERVERS
-static final String USERNAME
-static final String PASSWORD
-static final String SECURITY_PROTOCOL
-static final String SASL_MECHANISM
-static final int MESSAGE_COUNT
-static CamelContextExtension contextExtension
-volatile int receivedMessages
-KafkaProducer<String,String> producer
+Properties getDefaultProperties()
+void before()
+void after()
+void createRouteBuilder(CamelContext)
+RouteBuilder createRouteBuilder()
+void kafkaMessageIsConsumedByCamel() throws InterruptedException
}
Summary
`KafkaConsumerAuthManualTest.java` is a focused integration test class that verifies Apache Camel's Kafka consumer can authenticate using SASL and consume messages from a secured Kafka cluster. It establishes a Camel route with SASL configuration, uses a Kafka producer to send messages, and asserts message consumption via Camel's testing framework. This class is intended for manual or CI environments where secure Kafka clusters are available and credentials are supplied via system properties.