KafkaSendDynamicAware.java
Overview
The `KafkaSendDynamicAware` class is part of the Apache Camel Kafka component and implements the `SendDynamicAware` interface. It provides an optimization for sending messages dynamically to Kafka topics using the Camel **toD** ("dynamic to") DSL. Instead of creating a new Kafka endpoint URI for every dynamic topic, this class allows using a static Kafka endpoint and overrides the topic dynamically via the message header `KafkaConstants.OVERRIDE_TOPIC`.
This approach reduces endpoint creation overhead and improves performance when sending messages to multiple Kafka topics dynamically within the same route.
Class: KafkaSendDynamicAware
Package
`org.apache.camel.component.kafka`
Purpose
Implements dynamic sending awareness for Kafka endpoints in Apache Camel, enabling topic override via message headers rather than URI parameters.
Annotations
@SendDynamic("kafka") — registers this class as the dynamic sender handler for the Kafka component scheme.
Inheritance
Extends
ServiceSupport— provides lifecycle management support.Implements
SendDynamicAware— interface for supporting dynamic sending in Camel components.
Properties
Property | Type | Description |
|---|---|---|
`camelContext` | `CamelContext` | The Camel context instance in which this runs. |
`scheme` | `String` | The URI scheme this class handles, i.e., `"kafka"`. |
Methods
String getScheme()
Description: Returns the scheme this dynamic sender handles, typically
"kafka".Returns:
String— the scheme.Example Usage:
String scheme = kafkaSendDynamicAware.getScheme(); // returns "kafka"
void setScheme(String scheme)
Description: Sets the scheme handled by this instance.
Parameters:
scheme— the scheme string, e.g.,"kafka".
Example Usage:
kafkaSendDynamicAware.setScheme("kafka");
CamelContext getCamelContext()
Description: Gets the CamelContext associated with this instance.
Returns:
CamelContextExample Usage:
CamelContext context = kafkaSendDynamicAware.getCamelContext();
void setCamelContext(CamelContext camelContext)
Description: Sets the CamelContext for this instance.
Parameters:
camelContext— the CamelContext to set.
Example Usage:
kafkaSendDynamicAware.setCamelContext(context);
boolean isLenientProperties()
Description: Indicates whether lenient property binding is used.
Returns:
false— Kafka producer expects strict property binding.Usage: Used by Camel to determine property binding behavior.
DynamicAwareEntry prepare(Exchange exchange, String uri, String originalUri) throws Exception
Description: Prepares a
DynamicAwareEntryfor the given exchange and URIs. This method is called before message routing to handle dynamic sending.Parameters:
exchange— the current message exchange.uri— the potentially dynamic endpoint URI.originalUri— the original endpoint URI configured.
Returns:
DynamicAwareEntryencapsulating URIs for dynamic processing.Implementation Detail: Returns a new entry with the given URIs and null for other optional arguments.
Example Usage:
DynamicAwareEntry entry = kafkaSendDynamicAware.prepare(exchange, dynamicUri, originalUri);
String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception
Description: Attempts to resolve a static URI from a dynamic one by comparing topic names.
Parameters:
exchange— the current exchange.entry— the preparedDynamicAwareEntrycontaining URIs.
Returns:
String— a static endpoint URI to use if the topic is dynamic, otherwisenull.Algorithm:
Parse the topic name from the dynamic URI.
Parse the topic name from the original URI.
If the topics differ, replace the topic in the dynamic URI with the original topic and return it.
Example Usage:
String staticUri = kafkaSendDynamicAware.resolveStaticUri(exchange, entry); if (staticUri != null) { // use staticUri as endpoint }
Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception
Description: Creates a pre-processor that sets the
KafkaConstants.OVERRIDE_TOPICheader if it is not already set.Parameters:
exchange— the current message exchange.entry— theDynamicAwareEntrywith URI information.
Returns:
Processorinstance ornull.Implementation Detail:
If the message already has the override topic header, returns
null(no processor needed).Otherwise, returns a processor that sets the header to the topic parsed from the endpoint URI.
Example Usage:
Processor preProcessor = kafkaSendDynamicAware.createPreProcessor(exchange, entry); if (preProcessor != null) { preProcessor.process(exchange); }
Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception
Description: Creates a post-processor executed after the send operation.
Returns: Always returns
nullbecause no post-processing is needed for Kafka dynamic sending.Example Usage: Not applicable.
private String parseTopicName(String uri)
Description: Extracts the Kafka topic name from the endpoint URI.
Parameters:
uri— the Kafka endpoint URI (e.g.,"kafka:myTopic?param=value").
Returns:
String— the topic name part of the URI, ornullif unable to parse.Algorithm:
Strip query parameters by cutting off at
"?".Locate the first colon
":"and extract the substring after it.Remove any leading slashes (
"//").
Example:
parseTopicName("kafka:myTopic?brokers=localhost") // returns "myTopic" parseTopicName("kafka://myTopic") // returns "myTopic"
Important Implementation Details
Dynamic Topic Override: The core functionality is allowing the dynamic override of Kafka topic names via the header
KafkaConstants.OVERRIDE_TOPIC. This enables the reuse of a single static Kafka endpoint URI and producer instance, improving resource usage and performance.URI Parsing: The method
parseTopicNamecarefully extracts the topic from the Kafka endpoint URI, handling optional query parameters and different URI formats.Lifecycle Management: Extends
ServiceSupportfor proper lifecycle management although no explicit start/stop logic is defined here.No Post Processing: Since Kafka message sending does not require additional cleanup or processing after the message is sent, the post-processor method returns
null.Lenient Properties: Returns
falseto indicate strict property validation, reflecting Kafka component’s configuration requirements.
Interaction with Other Components
Apache Camel Framework: Implements
SendDynamicAwareto integrate with Camel’s dynamic routing capabilities. It is invoked automatically when usingtoD("kafka:topic").Kafka Component: Works in conjunction with Camel’s Kafka component to send messages to Kafka brokers.
Exchange and Message: Uses Camel’s
ExchangeandMessageAPIs to read and write headers and manipulate the message during routing.KafkaConstants: Uses the constant
KafkaConstants.OVERRIDE_TOPICto set the dynamic topic header.
Usage Example
from("direct:start")
.toD("kafka:defaultTopic");
// Internally:
// For each message, KafkaSendDynamicAware sets the header KafkaConstants.OVERRIDE_TOPIC
// to the topic parsed from the endpoint URI or overridden by the user, allowing dynamic topic routing.
Mermaid Class Diagram
classDiagram
class KafkaSendDynamicAware {
-CamelContext camelContext
-String scheme
+String getScheme()
+void setScheme(String scheme)
+CamelContext getCamelContext()
+void setCamelContext(CamelContext camelContext)
+boolean isLenientProperties()
+DynamicAwareEntry prepare(Exchange exchange, String uri, String originalUri) throws Exception
+String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception
+Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception
+Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception
-String parseTopicName(String uri)
}
KafkaSendDynamicAware --|> ServiceSupport
KafkaSendDynamicAware ..|> SendDynamicAware
Summary
`KafkaSendDynamicAware` is a specialized helper class used by Camel's Kafka component to enable efficient dynamic sending of messages to Kafka topics. It achieves this by reusing a static endpoint and dynamically setting the target topic via message headers, thus optimizing the usage of Kafka producers and reducing endpoint URI churn in Camel routes. This class carefully parses URIs, manages headers, and integrates seamlessly with the Camel routing engine’s dynamic send mechanism (`toD`).