Dynamic Topic Routing
Purpose
Dynamic Topic Routing addresses the need to route Kafka messages to different topics dynamically based on message content or metadata rather than using static topic names. This subtopic enables flexible Kafka topic selection driven by runtime data, such as timestamps embedded in messages or pattern-based transformations on existing topic names or event types. It enhances the routing capabilities within Apache Camel Kafka integration by allowing messages to be redirected to topics constructed or altered via configurable rules, supporting use cases like time-based partitioning, environment-specific routing, or event categorization.
Functionality
Dynamic Topic Routing is implemented through specialized processors that intercept Kafka messages within Camel routes and modify the target topic header before message production. The key processors are:
TimestampRouter:
Constructs dynamic topic names by embedding formatted timestamps extracted from message headers into a topic name pattern. For example, it can route messages to topics suffixed with dates, such as `orders-2024-06-15`, supporting time-partitioned topic organization.
MessageTimestampRouter:
Similar to `TimestampRouter` but extracts timestamp values from message JSON body fields rather than headers. It supports parsing timestamps in various formats and multiple timestamp keys, allowing dynamic routing based on message payload content.
RegexRouter:
Applies regex pattern matching and replacement to existing topic names or CloudEvent `ce-type` headers. This enables renaming or redirecting topics based on defined regular expressions, facilitating environment-based routing or standardized topic naming conventions.
These processors modify the Kafka topic by setting the `kafka.OVERRIDE_TOPIC` header on the Camel message. When the message reaches the Kafka producer, this override header is used to direct the message to the dynamically determined topic instead of the statically configured one.
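The header mechanism can be illustrated with a minimal, dependency-free sketch. It is not the library's implementation: message headers are modelled as a plain `Map`, the class and method names are hypothetical, and the `kafka.TIMESTAMP` header name, the epoch-millisecond value, the UTC zone, and the `$[topic]`/`$[timestamp]` placeholders for the header-based router are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in for a header-based timestamp router (hypothetical class). */
final class TimestampTopicSketch {

    /**
     * Builds a topic name from the current topic and an epoch-millisecond
     * timestamp header, then stores it under the override header.
     */
    static void route(Map<String, Object> headers, String topicFormat, String timestampFormat) {
        Object topic = headers.get("kafka.TOPIC");
        Object rawTimestamp = headers.get("kafka.TIMESTAMP"); // assumed header name
        if (!(topic instanceof String) || !(rawTimestamp instanceof Long)) {
            return; // nothing to route on; leave the headers untouched
        }
        // Assumption: timestamps are rendered in UTC.
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(timestampFormat).withZone(ZoneOffset.UTC);
        String formatted = formatter.format(Instant.ofEpochMilli((Long) rawTimestamp));
        String newTopic = topicFormat
                .replace("$[topic]", (String) topic)
                .replace("$[timestamp]", formatted);
        headers.put("kafka.OVERRIDE_TOPIC", newTopic);
    }

    /** Mimics the producer-side choice: the override wins when it is present. */
    static String resolveTopic(Map<String, Object> headers, String configuredTopic) {
        Object override = headers.get("kafka.OVERRIDE_TOPIC");
        return override instanceof String ? (String) override : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka.TOPIC", "orders");
        headers.put("kafka.TIMESTAMP", 1718409600000L); // 2024-06-15T00:00:00Z
        route(headers, "$[topic]-$[timestamp]", "yyyy-MM-dd");
        System.out.println(resolveTopic(headers, "orders")); // prints orders-2024-06-15
    }
}
```

When no override header is set, `resolveTopic` falls back to the configured topic, which mirrors the static behavior described above.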
Core Workflow Example: RegexRouter
    public void process(
            @ExchangeProperty("regex") String regex,
            @ExchangeProperty("replacement") String replacement,
            Exchange ex) {
        Pattern regexPattern = Pattern.compile(regex);
        // Current destination topic, as set on the message header
        String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class);
        if (ObjectHelper.isNotEmpty(topicName)) {
            final Matcher matcher = regexPattern.matcher(topicName);
            // matches() succeeds only if the entire topic name matches the regex
            if (matcher.matches()) {
                final String topicUpdated = matcher.replaceFirst(replacement);
                // The Kafka producer sends to this topic instead of the configured one
                ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", topicUpdated);
            }
        }
    }
Reads the configured regex and replacement strings from exchange properties.
Matches the regex against the current topic name.
If matched, replaces the topic name according to the replacement pattern.
Sets the new topic name on the `kafka.OVERRIDE_TOPIC` header for downstream processing.
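The matching step can be tried in isolation with plain `java.util.regex`. The sketch below is a hypothetical helper, not part of the library. It reproduces only the match-then-replace logic and shows one consequence worth knowing: because `matches()` is used, a pattern that matches only part of the topic name leaves the topic unchanged.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper isolating the regex rewrite step. */
final class RegexTopicSketch {

    /** Returns the rewritten topic, or empty when the regex does not match the whole name. */
    static Optional<String> rewrite(String regex, String replacement, String topicName) {
        if (topicName == null || topicName.isEmpty()) {
            return Optional.empty();
        }
        Matcher matcher = Pattern.compile(regex).matcher(topicName);
        // matches() requires the entire topic name to match, not just a substring
        if (!matcher.matches()) {
            return Optional.empty();
        }
        // The replacement may reference capture groups such as $1
        return Optional.of(matcher.replaceFirst(replacement));
    }

    public static void main(String[] args) {
        // Full match: the environment prefix is swapped
        System.out.println(rewrite("dev-(.*)", "prod-$1", "dev-orders").orElse("unchanged"));
        // Partial match only: no override would be set
        System.out.println(rewrite("dev", "prod", "dev-orders").orElse("unchanged"));
    }
}
```

A pattern intended to rewrite a prefix therefore needs to cover the rest of the name as well, for example `dev-(.*)` rather than `dev`.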
MessageTimestampRouter Detailed Steps
Parses timestamp keys and formats from exchange properties.
Extracts JSON body and converts it into a map.
Searches for timestamp values in specified keys.
Parses and formats the timestamp into a string according to the configured format.
Constructs the new topic name by replacing the placeholders `$[topic]` and `$[timestamp]` in the topic format string.
Sets the computed topic as the override topic header.
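The steps above can be sketched without any Camel or JSON dependency. This is an illustration under stated assumptions rather than the actual processor: the body is taken as an already-parsed `Map`, the class, method, and parameter names are hypothetical, numeric values are treated as epoch milliseconds, string values are parsed as local date-times interpreted in UTC, and the key list is assumed to be comma-separated.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of body-based timestamp routing. */
final class MessageTimestampTopicSketch {

    static Optional<String> computeTopic(Map<String, Object> body, String topic,
            String timestampKeys, String timestampKeyFormat,
            String timestampFormat, String topicFormat) {
        // 1. Find the first configured key that is present in the body.
        Object raw = null;
        for (String key : timestampKeys.split(",")) {
            raw = body.get(key.trim());
            if (raw != null) {
                break;
            }
        }
        if (raw == null) {
            return Optional.empty(); // no timestamp found, so no override
        }

        // 2. Normalise the value to an Instant.
        Instant instant;
        if (raw instanceof Number) {
            instant = Instant.ofEpochMilli(((Number) raw).longValue()); // assumed epoch millis
        } else {
            DateTimeFormatter parser = DateTimeFormatter.ofPattern(timestampKeyFormat);
            instant = LocalDateTime.parse(raw.toString(), parser).toInstant(ZoneOffset.UTC);
        }

        // 3. Render the timestamp in the output format (UTC assumed).
        String formatted = DateTimeFormatter.ofPattern(timestampFormat)
                .withZone(ZoneOffset.UTC)
                .format(instant);

        // 4. Fill the placeholders in the topic format string.
        return Optional.of(topicFormat
                .replace("$[topic]", topic)
                .replace("$[timestamp]", formatted));
    }

    public static void main(String[] args) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("createdAt", "2024-06-15T10:30:00");
        System.out.println(computeTopic(body, "orders", "ts,createdAt",
                "yyyy-MM-dd'T'HH:mm:ss", "yyyyMMdd", "$[topic]_$[timestamp]")
                .orElse("orders"));
    }
}
```

In a real route the returned value would be written to the `kafka.OVERRIDE_TOPIC` header; returning an `Optional` here simply makes the "no timestamp found" case explicit.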
Integration
Dynamic Topic Routing processors belong to the parent topic, Message Transformation Utilities. While the parent topic covers a range of JSON field manipulations and static routing transformations, Dynamic Topic Routing focuses specifically on:
Runtime topic name determination, which is not addressed by static JSON field changes.
Flexible, pattern-based topic renaming, enabling regex-driven routing logic.
Timestamp-based dynamic partitioning strategies that utilize message metadata or content dates.
These processors are typically invoked in Camel routes **before** the Kafka producer component serializes and sends messages. By setting the `kafka.OVERRIDE_TOPIC` header, they override the default topic, enabling dynamic routing without altering the producer logic itself.
This subtopic complements the other transformation utilities: they change what a message contains, while these processors change where it is sent, based on the characteristics of the message itself.
Diagram
    flowchart TD
        A[Message Received in Route] --> B[Dynamic Topic Routing Processor]
        B -->|Extract Timestamp or Topic| C{Routing Logic}
        C -->|Timestamp Based| D[Format Topic with Timestamp]
        C -->|Regex Based| E[Apply Regex Replacement]
        D --> F[Set kafka.OVERRIDE_TOPIC Header]
        E --> F
        F --> G[KafkaProducer Sends to Overridden Topic]
The message enters the route and passes through a Dynamic Topic Routing processor.
The processor decides which routing logic applies (timestamp-based or regex-based).
The topic name is transformed accordingly.
The transformed topic name is set as an override header.
KafkaProducer reads the override header and sends the message to the dynamically resolved topic.
This dynamic routing capability enables Kafka topics to be flexibly determined at runtime, enhancing the adaptability of message flows in event-driven architectures.