DefaultSubscribeAdapter.java
Overview
`DefaultSubscribeAdapter.java` is a core utility class within the Apache Camel Kafka component responsible for managing Kafka topic subscriptions for consumers. It implements the `SubscribeAdapter` interface to provide a default mechanism for subscribing Kafka consumers to topics or topic patterns and enforcing the existence of topics when required.
The class handles both static topic subscriptions and dynamic subscriptions via regular expressions (patterns). Additionally, it supports validation to ensure that specified topics exist in the Kafka cluster before proceeding with consumption, helping prevent runtime errors due to missing topics.
Classes and Interfaces
DefaultSubscribeAdapter
This is the main class in the file. It implements the `SubscribeAdapter` interface and encapsulates logic to subscribe a Kafka consumer to topics or patterns, optionally verifying topic existence.
Properties
| Property | Type | Description |
|---|---|---|
| `topic` | String | The specific topic name to subscribe to (optional). Used mainly for error reporting. |
| `topicMustExists` | boolean | Flag indicating if the adapter should verify that the topic exists before subscribing. |
Constructors
`DefaultSubscribeAdapter()`
Creates an instance without a specific topic and with the existence check disabled.

```java
DefaultSubscribeAdapter adapter = new DefaultSubscribeAdapter();
```

`DefaultSubscribeAdapter(String topic, boolean topicMustExists)`
Creates an instance with a specified topic and a flag to enforce topic existence.

```java
DefaultSubscribeAdapter adapter = new DefaultSubscribeAdapter("myTopic", true);
```
Methods
`void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo)`
Subscribes the given Kafka consumer to the topics or pattern specified in `topicInfo` and registers the provided rebalance listener.

**Parameters:**

| Parameter | Type | Description |
|---|---|---|
| `consumer` | `Consumer` | Kafka consumer instance to subscribe. |
| `reBalanceListener` | `ConsumerRebalanceListener` | Listener to handle partition rebalance events. |
| `topicInfo` | `TopicInfo` | Encapsulates topics or pattern information for subscription. |
**Functionality:**

- Logs the topic(s) or pattern to which the consumer is subscribing.
- If `topicInfo` contains a pattern, subscribes the consumer using the pattern.
- Otherwise, subscribes the consumer to the explicit list of topics.
- If `topicMustExists` is `true`, verifies that at least one topic matching the pattern or topic list exists in the Kafka cluster.
- Throws `UnknownTopicOrPartitionException` if no matching topic is found and `topicMustExists` is enabled.
**Throws:**
`UnknownTopicOrPartitionException`: thrown if topic existence validation is enabled and no matching topic is found.
**Usage Example:**
```java
TopicInfo topicInfo = new TopicInfo(Arrays.asList("topicA", "topicB")); // example usage
Consumer<String, String> consumer = ...; // initialized Kafka consumer
ConsumerRebalanceListener listener = ...; // implemented listener

DefaultSubscribeAdapter adapter = new DefaultSubscribeAdapter("topicA", true);
adapter.subscribe(consumer, listener, topicInfo);
```
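The branch between pattern and explicit-list subscription described under Functionality can be illustrated without Kafka on the classpath. The following is a minimal sketch, not the Camel implementation: `MiniConsumer`, `MiniTopicInfo`, and `RecordingConsumer` are hypothetical stand-ins for Kafka's `Consumer` and Camel's `TopicInfo`, and the rebalance listener argument is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

public class SubscribeDispatchSketch {

    /** Hypothetical stand-in for the two subscribe styles on a Kafka consumer. */
    interface MiniConsumer {
        void subscribe(Collection<String> topics);
        void subscribe(Pattern pattern);
    }

    /** Hypothetical stand-in for TopicInfo: holds either a pattern or a topic list. */
    static final class MiniTopicInfo {
        final Pattern pattern;           // null when subscribing by explicit names
        final Collection<String> topics; // null when subscribing by pattern

        MiniTopicInfo(Pattern pattern, Collection<String> topics) {
            this.pattern = pattern;
            this.topics = topics;
        }

        boolean isPattern() {
            return pattern != null;
        }
    }

    /** Records which overload was called so the dispatch can be inspected. */
    static final class RecordingConsumer implements MiniConsumer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void subscribe(Collection<String> topics) {
            calls.add("topics:" + String.join(",", topics));
        }

        @Override
        public void subscribe(Pattern pattern) {
            calls.add("pattern:" + pattern.pattern());
        }
    }

    /** Pattern subscriptions take one branch, explicit topic lists the other. */
    static void subscribe(MiniConsumer consumer, MiniTopicInfo info) {
        if (info.isPattern()) {
            consumer.subscribe(info.pattern);
        } else {
            consumer.subscribe(info.topics);
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        subscribe(consumer, new MiniTopicInfo(null, List.of("topicA", "topicB")));
        subscribe(consumer, new MiniTopicInfo(Pattern.compile("orders-.*"), null));
        System.out.println(consumer.calls); // [topics:topicA,topicB, pattern:orders-.*]
    }
}
```

Keeping the two cases behind a single `isPattern()` check means callers hand over one subscription object and never choose an overload themselves.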
Important Implementation Details
**Topic Subscription Modes:**
The class supports two modes of topic subscription:

- Static Subscription: Subscribing to an explicit list of topics.
- Pattern-based Subscription: Subscribing to topics matching a regular expression pattern.

**Topic Existence Validation:**
When `topicMustExists` is `true`, after subscribing, the adapter retrieves the current list of topics from Kafka using `consumer.listTopics()`. It iterates through these topics to confirm if any topic matches the subscription criteria (either matches the pattern or is in the topic list). If no match is found, it throws an `UnknownTopicOrPartitionException`, preventing consumers from silently subscribing to non-existent topics.

**Logging:**
Utilizes SLF4J logging at the INFO level to log the subscription target, aiding in debugging and operational transparency.

**Integration with TopicHelper:**
For logging, it leverages `TopicHelper.getPrintableTopic(topicInfo)` to generate a human-readable representation of the subscription target.
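The topic existence validation described above reduces to an "any match" test over the topic names the cluster reports. The sketch below models that check with plain collections and is not the Camel implementation. Three things are assumptions: the class and method names are hypothetical, the `Set<String>` stands in for the key set of `consumer.listTopics()`, and the pattern is required to match the whole topic name (`Matcher.matches()`). `IllegalStateException` stands in for Kafka's `UnknownTopicOrPartitionException` so the example needs no Kafka dependency.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

public class TopicExistenceSketch {

    /** True if at least one existing topic name fully matches the pattern. */
    static boolean anyMatches(Set<String> existingTopics, Pattern pattern) {
        return existingTopics.stream().anyMatch(name -> pattern.matcher(name).matches());
    }

    /** True if at least one existing topic name appears in the requested list. */
    static boolean anyListed(Set<String> existingTopics, Collection<String> requested) {
        return existingTopics.stream().anyMatch(requested::contains);
    }

    /** Fails fast when nothing matched; the exception type is a stand-in (see lead-in). */
    static void requireExisting(boolean found, String target) {
        if (!found) {
            throw new IllegalStateException("No existing topic matches " + target);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the topic names a cluster would report.
        Set<String> existing = Set.of("orders-eu", "payments");

        System.out.println(anyMatches(existing, Pattern.compile("orders-.*"))); // true
        System.out.println(anyMatches(existing, Pattern.compile("orders")));    // false: not a full match
        System.out.println(anyListed(existing, List.of("payments", "refunds"))); // true

        // One listed topic existing is enough to pass the check.
        requireExisting(anyListed(existing, List.of("payments", "refunds")), "payments,refunds");
    }
}
```

Note that the check passes as soon as one requested topic exists. As described above, it does not require every topic in the list to be present.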
Interaction with Other System Components
**Kafka Consumer API:**
Uses Kafka's `Consumer` interface for subscribing to topics and managing consumer state.

**ConsumerRebalanceListener:**
Registers rebalance listeners to handle partition assignment changes gracefully.

**TopicInfo:**
Relies on the `TopicInfo` class to encapsulate subscription details such as topic lists or patterns. This abstraction allows flexible subscription management.

**TopicHelper:**
Uses utility methods from `TopicHelper` for logging purposes.

**Exception Handling:**
Throws Kafka-specific `UnknownTopicOrPartitionException` to signal issues with topic availability, aligning with Kafka client standards.

**Apache Camel Kafka Component:**
Functions as part of the consumer support subscription mechanism in the Camel Kafka component, facilitating Kafka consumer configuration and management within Camel routes.
Diagram: Class Structure of DefaultSubscribeAdapter.java
```mermaid
classDiagram
    class DefaultSubscribeAdapter {
        -String topic
        -boolean topicMustExists
        +DefaultSubscribeAdapter()
        +DefaultSubscribeAdapter(String topic, boolean topicMustExists)
        +void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo)
    }
    DefaultSubscribeAdapter ..|> SubscribeAdapter
```
Summary
`DefaultSubscribeAdapter` provides a robust, configurable way to subscribe Kafka consumers to topics or patterns, with the option to enforce topic existence to avoid runtime issues. By abstracting the subscription logic and integrating with the Kafka consumer APIs, it simplifies the subscription process within the Apache Camel Kafka component, enabling reliable and flexible Kafka integrations.
If you require further information on related classes such as `TopicInfo`, `SubscribeAdapter`, or the Apache Camel Kafka component architecture, please refer to their respective documentation.