KafkaConsumerTopicIsPatternIT.java
Overview
`KafkaConsumerTopicIsPatternIT.java` is an integration test class within the Apache Camel Kafka component module. Its primary purpose is to verify the functionality of consuming Kafka messages from topics matched by a regular expression pattern, rather than specifying a single topic name explicitly.
The test validates that the Kafka consumer properly subscribes to topics whose names match the given pattern and receives messages accordingly. It also ensures that auto-commit behavior and consumer interceptors work as expected in this scenario.
This file extends a base test class (`BaseKafkaTestSupport`) that provides shared setup and utilities for Kafka integration testing.
Class Summary
KafkaConsumerTopicIsPatternIT
This class contains:
Constants:
TOPIC: A specific Kafka topic name used in the test ("vess123d").TOPIC_PATTERN: A regex pattern ("v.*d") to match topic names for the consumer subscription.
Fields:
producer: A Kafka producer instance used to send test messages to Kafka.
Lifecycle Methods:
before(): Initializes the Kafka producer before each test and clears captured records in the mock consumer interceptor.after(): Closes the Kafka producer and deletes the test topic after each test.
Route Definition:
Overrides
createRouteBuilder()to define a Camel route consuming from Kafka topics matching the pattern, forwarding messages to a mock endpoint for assertions.
Test Method:
kafkaTopicIsPattern(): Sends messages to the test topic and asserts that the consumer receives the expected messages with correct headers and behavior.
Detailed Explanation
Constants
Constant | Type | Description |
|---|---|---|
`TOPIC` | String | Name of the Kafka topic used in the test. |
`TOPIC_PATTERN` | String | Regex pattern to match topics for consumption. |
Fields
Field | Type | Description |
|---|---|---|
`producer` | `KafkaProducer` | Kafka producer instance to send messages to Kafka. |
Lifecycle Methods
before()
@BeforeEach
public void before()
Purpose:
Sets up the Kafka producer before each test execution and resets the mock consumer interceptor's captured records.Details:
Calls
getDefaultProperties()(inherited fromBaseKafkaTestSupport) to get Kafka producer configuration.Instantiates a new
KafkaProducerwith these properties.Clears the static
recordsCapturedlist inMockConsumerInterceptorto ensure no residual data from previous tests.
Usage Example:
Automatically invoked by JUnit before each test method.
after()
@AfterEach
public void after()
Purpose:
Cleans up resources after each test.Details:
Closes the Kafka producer if initialized.
Deletes the test topic from Kafka using
kafkaAdminClient(inherited from the base class), ensuring no leftover data affects other tests.
Usage Example:
Automatically invoked by JUnit after each test method.
Route Definition
createRouteBuilder()
@Override
protected RouteBuilder createRouteBuilder()
Purpose:
Defines the Camel route used for the integration test.Details:
The route consumes messages from Kafka topics matching the regex pattern specified by
TOPIC_PATTERN.Uses the
topicIsPattern=trueoption to enable regex topic subscription.Other Kafka consumer options set:
groupId=KafkaConsumerTopicIsPatternIT— consumer group ID.autoOffsetReset=earliest— start from earliest offset if no committed offset.autoCommitEnable=trueandautoCommitIntervalMs=1000— enable auto-commit with interval.pollTimeoutMs=1000— poll timeout.interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor— class for intercepting consumed records.metadataMaxAgeMs=1000— frequency of metadata refresh.
The route sends consumed messages to a mock endpoint defined by
KafkaTestUtil.MOCK_RESULTfor test verification.
Usage Example:
from("kafka:" + TOPIC_PATTERN + "?topicIsPattern=true&groupId=KafkaConsumerTopicIsPatternIT&autoOffsetReset=earliest" +
"&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" +
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000")
.to(KafkaTestUtil.MOCK_RESULT);
Test Methods
kafkaTopicIsPattern()
@Test
public void kafkaTopicIsPattern() throws Exception
Purpose:
Tests that messages sent to a topic matching the subscribed pattern are correctly consumed by the Camel route.Parameters:
None.Returns:
voidTest Workflow:
Obtains the mock endpoint
KafkaTestUtil.MOCK_RESULTto assert received messages.Configures expectations:
Message count: 5.
Bodies:
"message-0"through"message-4"in any order.Header
KafkaConstants.TOPIC: all messages must have the actual topic nameTOPIC.Header
KafkaConstants.LAST_RECORD_BEFORE_COMMIT: must benullfor all messages because auto-commit is enabled.
Produces 5 messages to the Kafka topic specified by
TOPIC.Asserts that the mock endpoint expectations are satisfied within 3000ms.
Verifies that the
MockConsumerInterceptorcaptured exactly 5 records corresponding to the test topic.
Usage Example:
// This is executed automatically by JUnit during test runs
kafkaTopicIsPattern();
Assertions:
The route properly subscribes to topics matching the regex pattern.
Messages are received with the correct body and headers.
Auto-commit behavior disables the "last record before commit" header.
The interceptor captures the expected number of records.
Important Implementation Details
Topic Subscription via Pattern:
The Kafka consumer URI includes the parametertopicIsPattern=true, which instructs the Kafka consumer to subscribe to topics matched by the regex patternv.*d. This is a key Kafka client feature allowing dynamic topic subscription.MockConsumerInterceptor:
Used to intercept and capture consumed Kafka records for validation within the test. This helps verify that the consumer behaves as expected beyond the Camel routing layer.Auto Commit Behavior:
The test explicitly enablesautoCommitEnable=trueand verifies that theKafkaConstants.LAST_RECORD_BEFORE_COMMITheader is not set on any message, confirming auto-commit disables this behavior.Topic Cleanup:
After each test, the created topic is deleted via the Kafka admin client, ensuring no side effects between tests.
Interaction with Other System Components
BaseKafkaTestSupport:
Inherits Kafka test utility setup, configuration properties, and the Kafka admin client for topic management.KafkaTestUtil:
Provides constants such asMOCK_RESULT, the mock endpoint URI where messages are sent for assertions.MockConsumerInterceptor:
A Kafka consumer interceptor class that captures consumed records to verify low-level Kafka client behavior.Apache Camel Framework:
Uses Camel'sRouteBuilderto configure Kafka consumers and routes, andMockEndpointfor testing.JUnit:
Uses JUnit 5 annotations for lifecycle management and test execution.
Usage Scenario
This integration test is run as part of the continuous integration pipeline to ensure that the Kafka consumer component in Apache Camel correctly supports topic pattern subscriptions. It helps maintain the reliability of consuming from dynamically named topics in Kafka without specifying each topic explicitly.
Visual Diagram
classDiagram
class KafkaConsumerTopicIsPatternIT {
+String TOPIC
+String TOPIC_PATTERN
-KafkaProducer<String, String> producer
+void before()
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaTopicIsPattern()
}
KafkaConsumerTopicIsPatternIT --|> BaseKafkaTestSupport : extends
KafkaConsumerTopicIsPatternIT ..> KafkaProducer : uses
KafkaConsumerTopicIsPatternIT ..> RouteBuilder : creates
KafkaConsumerTopicIsPatternIT ..> MockEndpoint : verifies
KafkaConsumerTopicIsPatternIT ..> MockConsumerInterceptor : verifies captured records
Summary
`KafkaConsumerTopicIsPatternIT.java` is a focused integration test class verifying Apache Camel Kafka component’s ability to consume messages from topics selected by regex pattern subscription. It tests key Kafka consumer configurations, interceptor usage, and message receipt correctness, ensuring robustness of the dynamic topic consumption feature within the Camel Kafka component.
The test is a vital part of the Kafka component's test suite, helping prevent regressions and guaranteeing reliable Kafka integration in real-world use cases involving pattern-based topic consumption.