KafkaConsumerCustomSubscribeAdapterIT.java
Overview
`KafkaConsumerCustomSubscribeAdapterIT.java` is an integration test class within the Apache Camel Kafka component module. The purpose of this file is to verify the behavior of a **custom Kafka consumer subscription adapter** when used with Camel routes consuming messages from a Kafka topic.
The test ensures that messages produced to a Kafka topic are correctly consumed by a Camel route configured with a custom `SubscribeAdapter` implementation. This adapter modifies the subscription behavior of the Kafka consumer, and the test confirms that the custom subscription logic is invoked as expected.
The test class extends a base Kafka test support class and uses the Camel testing framework (including [MockEndpoint](/projects/289/68546)) to validate message consumption within an isolated integration test environment.
Detailed Explanation
Package and Imports
Package: `org.apache.camel.component.kafka.integration`
Imports: The class imports Kafka client APIs, Apache Camel components, testing utilities, and JUnit 5 annotations for lifecycle and test management.
Class: KafkaConsumerCustomSubscribeAdapterIT
This is the main test class. It sets up a Kafka producer, defines the Camel route that consumes from the topic, and asserts correct message flow and subscription behavior.
Constants
`TOPIC` (`String`): The Kafka topic name, `"test-subscribe-adapter"`, used in the test.
Fields
`producer` (`KafkaProducer<String, String>`): A Kafka producer instance used to send test messages to the topic.
`testSubscribeAdapter` (`TestSubscribeAdapter`): A custom subscribe adapter instance registered in the Camel registry, used to intercept and track subscription calls.
Inner Class: TestSubscribeAdapter
This class extends [DefaultSubscribeAdapter](/projects/289/68602) and overrides the `subscribe` method to set a flag indicating the subscription call occurred.
Properties:
`subscribeCalled` (`volatile boolean`): Tracks whether the `subscribe` method was invoked.
Methods:
`subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo)`:
Overrides the default subscription method to call the superclass method and then mark `subscribeCalled` as `true`.
`isSubscribeCalled() : boolean`
Returns the value of `subscribeCalled` so the test can verify that the subscription happened.
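The delegate-then-flag pattern described for `TestSubscribeAdapter` can be sketched in plain Java. This is a simplified model rather than the Camel class: `BaseAdapterSketch` is a hypothetical stand-in for `DefaultSubscribeAdapter`, and a single `String` topic argument replaces the real `Consumer`, `ConsumerRebalanceListener`, and `TopicInfo` parameters so that the example compiles without Kafka or Camel on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DefaultSubscribeAdapter; it only records topics.
class BaseAdapterSketch {
    final List<String> subscribedTopics = new ArrayList<>();

    void subscribe(String topic) {
        subscribedTopics.add(topic);
    }
}

// Mirrors the shape of TestSubscribeAdapter: delegate first, then set the flag.
class RecordingAdapterSketch extends BaseAdapterSketch {
    // volatile so that a write made on one thread is visible to a reader on another
    private volatile boolean subscribeCalled;

    @Override
    void subscribe(String topic) {
        super.subscribe(topic);   // keep the default subscription behavior
        subscribeCalled = true;   // record that the hook was reached
    }

    boolean isSubscribeCalled() {
        return subscribeCalled;
    }
}

class RecordingAdapterDemo {
    public static void main(String[] args) {
        RecordingAdapterSketch adapter = new RecordingAdapterSketch();
        adapter.subscribe("test-subscribe-adapter");
        System.out.println("subscribe called: " + adapter.isSubscribeCalled());
    }
}
```

Because the override delegates to the superclass, the adapter observes the subscription without changing it, which is what lets the test assert on the flag while still consuming messages normally.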
Lifecycle Methods
`@BeforeEach before()`:
Initializes the Kafka producer with default properties before each test case.
`@AfterEach after()`:
Closes the Kafka producer and deletes the test topic after each test to clean up.
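The exact default properties are supplied by the base test class and are not listed here. As a rough illustration, a producer with `String` keys and values needs at least a broker address and two serializers. The sketch below builds such a configuration with `java.util.Properties`; the helper name `stringProducerProps` and the address `localhost:9092` are assumptions made for the example, and the real defaults may contain additional entries.

```java
import java.util.Properties;

final class ProducerPropsSketch {
    private ProducerPropsSketch() { }

    // Builds a minimal configuration for a String/String Kafka producer.
    static Properties stringProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address; the test obtains the real one from its Kafka test service.
        stringProducerProps("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a test like this one, a `Properties` object of this kind would be handed to the `KafkaProducer` constructor in the `@BeforeEach` method, and the producer would be closed again in `@AfterEach`.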
Camel Route Configuration
`createRouteBuilder() : RouteBuilder` (overrides base class method)
Defines a Camel route that consumes messages from the Kafka topic `"test-subscribe-adapter"` with these parameters:
`brokers`: Kafka bootstrap servers from the test service
`autoOffsetReset=earliest`: Start consuming from the earliest offset
`consumersCount=1`: Use a single consumer instance
The route forwards all consumed messages to a mock endpoint [KafkaTestUtil.MOCK_RESULT](/projects/289/68701) for assertions.
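The consumer endpoint is addressed by a URI of the form `kafka:<topic>?<options>`. The sketch below only assembles that string from the parameters listed above; the class name `KafkaUriSketch` and the broker address are illustrative assumptions, and the actual test may build the URI differently.

```java
final class KafkaUriSketch {
    static final String TOPIC = "test-subscribe-adapter";

    private KafkaUriSketch() { }

    // Assembles the consumer endpoint URI from the topic and the broker address.
    static String consumerUri(String brokers) {
        return "kafka:" + TOPIC
                + "?brokers=" + brokers
                + "&autoOffsetReset=earliest"
                + "&consumersCount=1";
    }

    public static void main(String[] args) {
        // Placeholder address; the test reads the real one from its Kafka test service.
        System.out.println(consumerUri("localhost:9092"));
    }
}
```

With the placeholder address, the result is `kafka:test-subscribe-adapter?brokers=localhost:9092&autoOffsetReset=earliest&consumersCount=1`. Inside a `RouteBuilder`, a string like this would be passed to `from(...)` and followed by `.to(KafkaTestUtil.MOCK_RESULT)`.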
Test Method
`kafkaMessagesIsConsumedByCamel() : void`
This test verifies the entire message flow:
Sets expectations on the mock endpoint for messages `"m1"` and `"m2"` in any order.
Produces two Kafka messages with values `"m1"` and `"m2"` to the topic.
Asserts that the mock endpoint receives the expected messages.
Asserts that the custom subscribe adapter's `subscribe` method was called (`subscribeCalled == true`).
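An "in any order" expectation treats the received bodies as a multiset: the same values must arrive the same number of times, but their sequence is ignored. The sketch below models that comparison in plain Java. It illustrates the idea only and is not how `MockEndpoint` is implemented; the name `sameBodiesAnyOrder` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AnyOrderSketch {
    private AnyOrderSketch() { }

    // True when both lists hold the same elements with the same multiplicities.
    static boolean sameBodiesAnyOrder(List<String> expected, List<String> received) {
        List<String> sortedExpected = new ArrayList<>(expected);
        List<String> sortedReceived = new ArrayList<>(received);
        Collections.sort(sortedExpected);
        Collections.sort(sortedReceived);
        return sortedExpected.equals(sortedReceived);
    }

    public static void main(String[] args) {
        // Received in reverse order, which still satisfies the expectation.
        System.out.println(sameBodiesAnyOrder(List.of("m1", "m2"), List.of("m2", "m1")));
    }
}
```

Sorting copies of both lists keeps duplicates significant, so receiving `"m1"` twice would not satisfy an expectation of `"m1"` and `"m2"`.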
Important Implementation Details
Custom Subscription Adapter:
The test registers a custom `SubscribeAdapter` (`TestSubscribeAdapter`) in the Camel registry under the key `KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER`. This adapter intercepts the Kafka consumer subscription process, allowing the test to verify that the subscription call happens.
Kafka Topic Management:
The test deletes the Kafka topic after each test to ensure isolation and avoid side effects between test runs.
Camel Kafka Component Integration:
The route configuration directly uses Camel Kafka component URI syntax with dynamic topic and broker parameters.
Usage of MockEndpoint:
The test leverages Camel's MockEndpoint to assert the receipt of expected messages, providing a reliable way to verify asynchronous message flows.
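Registering the adapter under a well-known key works only if the consumer looks that key up and otherwise falls back to a default. The sketch below models this lookup-or-default behavior with a plain `Map` as the registry. It rests on that assumption about the component and is not Camel's resolution code; `lookupOrDefault` and the key `adapterKey` are hypothetical names standing in for the real registry API and for `KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class RegistryLookupSketch {
    private RegistryLookupSketch() { }

    // Returns the bean bound under key if it has the requested type, else the fallback.
    static <T> T lookupOrDefault(Map<String, Object> registry, String key,
                                 Class<T> type, Supplier<T> fallback) {
        Object bean = registry.get(key);
        // isInstance is false for null, so a missing binding also takes the fallback
        return type.isInstance(bean) ? type.cast(bean) : fallback.get();
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new HashMap<>();
        registry.put("adapterKey", "custom adapter");
        System.out.println(lookupOrDefault(registry, "adapterKey", String.class,
                () -> "default adapter"));
    }
}
```

Under this model, binding `TestSubscribeAdapter` before the route starts is what makes the consumer pick it up in place of the default adapter.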
Interaction with Other System Components
BaseKafkaTestSupport:
This class extends `BaseKafkaTestSupport`, which likely provides common Kafka test setup utilities such as Kafka broker lifecycle management, default properties, and bootstrap server information.
Kafka Component:
The test verifies the integration of the Kafka component in Apache Camel, specifically the consumer side with a custom subscription adapter.
Kafka Admin Client:
Used to delete test topics during cleanup.
Camel Registry:
The custom subscription adapter is bound to the Camel registry to override the default subscription behavior during test execution.
KafkaTestUtil:
Provides constants such as the mock endpoint URI used for assertions.
Usage Example
This file itself is a test and not a reusable library class. However, it demonstrates:
How to create a custom subscription adapter by extending DefaultSubscribeAdapter.
How to register the custom adapter with Camel's registry.
How to configure a Camel route to consume from Kafka with the adapter.
How to produce messages for testing and assert their consumption.
Mermaid Class Diagram
classDiagram
class KafkaConsumerCustomSubscribeAdapterIT {
-String TOPIC = "test-subscribe-adapter"
-KafkaProducer<String, String> producer
-TestSubscribeAdapter testSubscribeAdapter
+void before()
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaMessagesIsConsumedByCamel()
}
class TestSubscribeAdapter {
-volatile boolean subscribeCalled
+void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener listener, TopicInfo topicInfo)
+boolean isSubscribeCalled()
}
KafkaConsumerCustomSubscribeAdapterIT o-- TestSubscribeAdapter : uses
Summary
`KafkaConsumerCustomSubscribeAdapterIT.java` is a focused integration test validating that a custom Kafka consumer subscription adapter is invoked correctly in an Apache Camel Kafka consumer route. It produces messages to a Kafka topic, consumes them through Camel routes configured with the custom adapter, and asserts both message receipt and adapter invocation. This test ensures extensibility and correctness of the Kafka consumer subscription mechanism within Camel's Kafka component.