KafkaConsumerAuthIT.java
Overview
`KafkaConsumerAuthIT.java` is an integration test class designed to validate the authentication and message consumption capabilities of the Apache Camel Kafka component when interacting with a Kafka broker secured using JAAS and SASL (PLAIN) authentication. It leverages a local Kafka container (`ContainerLocalAuthKafkaService`) configured with SASL/JAAS security to ensure that Camel can successfully produce and consume messages in a secure Kafka environment.
The test verifies that:
Camel can connect to a secured Kafka cluster using SASL/PLAIN authentication.
Messages produced to a Kafka topic are consumed by a Camel route.
Custom Kafka headers are propagated correctly.
Kafka consumer interceptors capture the records as expected.
The class uses JUnit 5 for test lifecycle management, Camel testing support for route and context management, and Kafka clients for producing test data.
Detailed Description
Package
package org.apache.camel.component.kafka.integration;
This class resides in the integration tests package of the Camel Kafka component.
Class: KafkaConsumerAuthIT
This is the main test class annotated with JUnit 5 and Camel test infrastructure annotations to enable integration testing with an authenticated Kafka instance.
Annotations:
@EnabledIfSystemProperties: Enables this test only if the system property `kafka.instance.type` matches either `local-kafka3-container` or `kafka`, both of which require Kafka 3.x.
@TestMethodOrder(MethodOrderer.OrderAnnotation.class): Specifies that test methods are executed in the order defined by the `@Order` annotation.
Constants:
TOPIC: The Kafka topic name used in tests ("test-auth-full").
Static Fields:
service: A `ContainerLocalAuthKafkaService` instance that manages a Kafka container configured with JAAS/SASL authentication. It is initialized with a JAAS config file (`/kafka-jaas.config`).
contextExtension: Camel context extension for test lifecycle management.
kafkaAdminClient: Kafka AdminClient used for managing Kafka topics.
LOG: Logger instance for debug and trace logging.
Instance Fields:
producer: Kafka producer instance used to send messages to the Kafka topic during tests.
Lifecycle Methods
@BeforeEach void before()
Initializes the Kafka producer with SASL/PLAIN authentication properties.
Clears any records captured in `MockConsumerInterceptor`.
Properties for authentication include:
SASL JAAS config generated for user `"camel"` with password `"camel-secret"`.
Security protocol set to `"SASL_PLAINTEXT"`.
SASL mechanism set to `"PLAIN"`.
@BeforeEach void setKafkaAdminClient()
Creates a singleton Kafka AdminClient if not already initialized. The client is used to manage Kafka topics during tests.
@AfterEach void after()
Closes the Kafka producer.
Deletes the test topic (`test-auth-full`) to clean up between tests.
Route Configuration
void createRouteBuilder(CamelContext context)
Adds the route returned by `createRouteBuilder()` to the Camel context.
RouteBuilder createRouteBuilder()
Returns a Camel `RouteBuilder` that:
Defines a Kafka consumer route consuming from the secure Kafka topic `test-auth-full`.
Uses SASL/PLAIN authentication with JAAS config.
Is configured with:
Kafka brokers from the `service`.
Group ID: `"KafkaConsumerAuthIT"`.
Auto offset reset to `"earliest"`.
Key and value deserializers for String.
Client ID: `"camel-kafka-auth-test"`.
Auto commit enabled with 1000 ms interval.
Poll timeout of 1000 ms.
Kafka consumer interceptor class `MockConsumerInterceptor`.
Processes each consumed exchange with a trace log.
Routes messages to a Camel Mock endpoint (`KafkaTestUtil.MOCK_RESULT`) for assertions.
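To make the option list concrete, the sketch below assembles a Camel Kafka endpoint URI carrying the same settings. It only builds the string and does not start a route. The class and method names are hypothetical, placing `brokers` in the URI is an assumption (the text only says the brokers come from the service), and wrapping the JAAS value in Camel's `RAW(...)` syntax is a choice made here so that its quotes and semicolon are taken literally.

```java
final class AuthConsumerUri {

    // Assembles a camel-kafka consumer endpoint URI from the options described above.
    static String consumerUri(String topic, String brokers, String interceptorClass, String jaasConfig) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&groupId=KafkaConsumerAuthIT"
                + "&autoOffsetReset=earliest"
                + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                + "&clientId=camel-kafka-auth-test"
                + "&autoCommitEnable=true"
                + "&autoCommitIntervalMs=1000"
                + "&pollTimeoutMs=1000"
                + "&interceptorClasses=" + interceptorClass
                + "&securityProtocol=SASL_PLAINTEXT"
                + "&saslMechanism=PLAIN"
                // RAW(...) tells Camel not to interpret special characters in the value.
                + "&saslJaasConfig=RAW(" + jaasConfig + ")";
    }

    public static void main(String[] args) {
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"camel\" password=\"camel-secret\";";
        // Broker address and interceptor package are placeholders, not values from the test.
        System.out.println(consumerUri(
                "test-auth-full", "localhost:9092", "your.package.MockConsumerInterceptor", jaas));
    }
}
```

Inside a `RouteBuilder`, a URI like this would typically be used as `from(uri).to(KafkaTestUtil.MOCK_RESULT)`, with a processor in between for the trace log.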
Tests
@Test void kafkaMessageIsConsumedByCamel()
Purpose: Verifies that Camel can consume messages from the authenticated Kafka instance.
Timeout: 30 seconds.
Order: 1 (runs first).
Test Steps:
Defines expected messages (`message-0` to `message-4`) and expected headers.
Calls `populateKafkaTopic` to send 5 messages to the Kafka topic with custom headers.
Asserts that the Mock endpoint receives the expected messages and headers.
Verifies that `MockConsumerInterceptor` captured exactly 5 records.
Checks that the "CamelSkippedHeader" is not propagated but "PropagatedCustomHeader" is present.
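The last check can be pictured with a small plain-Java model. The filtering rule used here (drop any header whose name starts with `Camel`) is an assumption chosen to reproduce the outcome the test expects; it is not taken from the test or from the Camel source, and the class name and header value are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderPropagationModel {

    // Assumed rule for illustration: headers whose names start with "Camel" are dropped.
    static Map<String, String> propagate(Map<String, String> kafkaHeaders) {
        Map<String, String> result = new LinkedHashMap<>();
        kafkaHeaders.forEach((name, value) -> {
            if (!name.startsWith("Camel")) {
                result.put(name, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sent = new LinkedHashMap<>();
        sent.put("CamelSkippedHeader", "skipped header value");
        sent.put("PropagatedCustomHeader", "placeholder value"); // value is a placeholder
        // Prints [PropagatedCustomHeader]
        System.out.println(propagate(sent).keySet());
    }
}
```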
Helper Methods
private void populateKafkaTopic(String propagatedHeaderKey, byte[] propagatedHeaderValue)
Sends 5 messages (`message-0` to `message-4`) to the Kafka topic.
Each `ProducerRecord` contains two headers:
`"CamelSkippedHeader"` with value `"skipped header value"`.
A custom propagated header with the provided key and value.
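The shape of the data this helper sends can be modelled without a broker. The sketch below uses a hypothetical `Message` class as a stand-in for `ProducerRecord`, and encoding the skipped header value as UTF-8 is an assumption; it shows only the five bodies and the two headers attached to each.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TestRecordModel {

    // Plain-Java stand-in for a Kafka ProducerRecord: a body plus its headers.
    static final class Message {
        final String body;
        final Map<String, byte[]> headers = new LinkedHashMap<>();

        Message(String body) {
            this.body = body;
        }
    }

    // Builds message-0 .. message-4, each carrying the skipped and the propagated header.
    static List<Message> buildMessages(String propagatedKey, byte[] propagatedValue) {
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message message = new Message("message-" + i);
            message.headers.put("CamelSkippedHeader",
                    "skipped header value".getBytes(StandardCharsets.UTF_8));
            message.headers.put(propagatedKey, propagatedValue);
            messages.add(message);
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] value = "placeholder".getBytes(StandardCharsets.UTF_8); // placeholder header value
        for (Message message : buildMessages("PropagatedCustomHeader", value)) {
            System.out.println(message.body + " " + message.headers.keySet());
        }
    }
}
```

With the Kafka client, each entry would correspond to a `ProducerRecord` whose headers are added through `record.headers().add(new RecordHeader(key, value))` before calling `producer.send(record)`.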
Important Implementation Details
Authentication Setup: Uses JAAS configuration with SASL/PLAIN mechanism to authenticate the Kafka producer and consumer with the Kafka broker.
Kafka Container: `ContainerLocalAuthKafkaService` allows the integration tests to run against a real Kafka broker in a container with authentication enabled.
Consumer Interceptor: The route uses `MockConsumerInterceptor` to capture consumed records for validation.
Headers Propagation: The test confirms that the custom header arrives at the Mock endpoint while "CamelSkippedHeader" does not.
Test Isolation: Topics are deleted after each test to ensure clean state.
Interactions with Other System Components
Apache Camel Kafka Component: This test validates the Kafka component's ability to work with SASL-authenticated Kafka clusters.
Kafka Broker (ContainerLocalAuthKafkaService): Provides a secured Kafka environment for testing.
Kafka AdminClient: Used to manage Kafka topics during the test lifecycle.
Camel Mock Component: Captures route outputs for assertions.
MockConsumerInterceptor: Kafka consumer interceptor used to capture records for verification.
Usage Example
This class is primarily used as an integration test and is not intended for direct use in application code. However, it serves as an example of how to:
Configure Kafka clients with SASL/JAAS authentication.
Define Camel routes consuming from SASL-enabled Kafka topics.
Produce messages with custom headers to Kafka.
Assert message consumption and header propagation in Camel routes.
Mermaid Class Diagram
classDiagram
class KafkaConsumerAuthIT {
+static final String TOPIC
+static ContainerLocalAuthKafkaService service
+static CamelContextExtension contextExtension
+static AdminClient kafkaAdminClient
-org.apache.kafka.clients.producer.KafkaProducer<String,String> producer
+void before()
+void setKafkaAdminClient()
+void after()
+void createRouteBuilder(CamelContext)
+RouteBuilder createRouteBuilder()
+void kafkaMessageIsConsumedByCamel()
-void populateKafkaTopic(String, byte[])
}
Summary
`KafkaConsumerAuthIT.java` is an integration test that validates the Apache Camel Kafka component against a Kafka broker secured with SASL/PLAIN and JAAS. It starts a secured Kafka container, configures a Camel route with matching credentials, sends five test messages, and verifies their consumption, header propagation, and capture by the consumer interceptor.