Dynamic Topic Routing

Purpose

Dynamic Topic Routing sends Kafka messages to topics chosen at runtime from message content or metadata, rather than to a statically configured topic name. Within the Apache Camel Kafka integration, the target topic can be built from a timestamp embedded in the message or derived by applying a pattern-based transformation to an existing topic name or event type. This supports use cases such as time-based partitioning, environment-specific routing, and event categorization.

Functionality

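Dynamic Topic Routing is implemented through specialized processors that intercept Kafka messages within Camel routes and modify the target topic header before message production. The processors described here are RegexRouter, which rewrites the topic name with a regular expression, and MessageTimestampRouter, which builds the topic name from a message timestamp.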

These processors modify the Kafka topic by setting the `kafka.OVERRIDE_TOPIC` header on the Camel message. When the message reaches the Kafka producer, this override header is used to direct the message to the dynamically determined topic instead of the statically configured one.
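The effect of the override header can be modeled in plain Java. This is a minimal sketch, not Camel's producer code: the `TopicOverrideSketch` class and its `resolveTopic` method are hypothetical names, and only the header key `kafka.OVERRIDE_TOPIC` comes from the description above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a producer might pick its destination topic.
// This is not the Camel Kafka component; it only mirrors the rule described above.
final class TopicOverrideSketch {

    static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";

    // Returns the override header when it is a non-empty string,
    // otherwise falls back to the statically configured topic.
    static String resolveTopic(Map<String, Object> headers, String configuredTopic) {
        Object override = headers.get(OVERRIDE_TOPIC);
        if (override instanceof String && !((String) override).isEmpty()) {
            return (String) override;
        }
        return configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // No override header yet: the configured topic is used.
        System.out.println(resolveTopic(headers, "orders"));

        // A routing processor has set the override header.
        headers.put(OVERRIDE_TOPIC, "orders-2024");
        System.out.println(resolveTopic(headers, "orders"));
    }
}
```

In this model, a message without the header still reaches the configured topic, so a routing processor that declines to set the header leaves the default behavior in place.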

Core Workflow Example: RegexRouter

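import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.util.ObjectHelper;

public class RegexRouter {
    public void process(
            @ExchangeProperty("regex") String regex, @ExchangeProperty("replacement") String replacement, Exchange ex) {
        Pattern regexPattern = Pattern.compile(regex);
        // Read the current topic name from the incoming message header.
        String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class);
        if (ObjectHelper.isNotEmpty(topicName)) {
            final Matcher matcher = regexPattern.matcher(topicName);
            // Rewrite only when the regex matches the whole topic name.
            if (matcher.matches()) {
                final String topicUpdated = matcher.replaceFirst(replacement);
                ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", topicUpdated);
            }
        }
    }
}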
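
The matching rule is easier to see in isolation. The sketch below repeats the same `Pattern` and `Matcher` calls without any Camel types; `RegexTopicRewrite` and `rewrite` are illustrative names, and the sample regex and topic names are made up for the example.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of Camel: isolates the regex step of the router.
final class RegexTopicRewrite {

    // Returns the rewritten topic, or empty when the regex does not match
    // the entire topic name (matches() is a whole-string match, unlike find()).
    static Optional<String> rewrite(String regex, String replacement, String topic) {
        if (topic == null || topic.isEmpty()) {
            return Optional.empty();
        }
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        // replaceFirst resets the matcher first, so calling it after matches() is safe.
        // "$1" in the replacement refers to the first capture group.
        return Optional.of(matcher.replaceFirst(replacement));
    }

    public static void main(String[] args) {
        // Whole-name match: the capture group is carried into the new topic.
        System.out.println(rewrite("(.*)-events", "$1-archive", "orders-events"));
        // Partial match only: no rewrite happens.
        System.out.println(rewrite("orders", "archive", "orders-events"));
    }
}
```

Because `matches()` requires the pattern to cover the full topic name, a pattern that matches only a prefix leaves the override header unset, and the message goes to its original topic.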

MessageTimestampRouter Detailed Steps
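
As a rough illustration of timestamp-based routing, the sketch below formats an epoch-millisecond timestamp and substitutes it into a topic template. Everything in it is an assumption rather than the processor's actual code: the class and method names, the `{topic}` and `{timestamp}` placeholders, the `yyyyMMdd` pattern, and the choice of UTC.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of timestamp-based topic naming; not the MessageTimestampRouter source.
final class TimestampTopicSketch {

    // Builds a topic name by filling the assumed {topic} and {timestamp}
    // placeholders in the template. The timestamp is rendered in UTC.
    static String formatTopic(String template, String topic, long epochMillis, String timestampPattern) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(timestampPattern).withZone(ZoneOffset.UTC);
        String formatted = formatter.format(Instant.ofEpochMilli(epochMillis));
        return template.replace("{topic}", topic).replace("{timestamp}", formatted);
    }

    public static void main(String[] args) {
        // One day after the Unix epoch, rendered as a daily topic suffix.
        long oneDayMillis = 86_400_000L;
        System.out.println(formatTopic("{topic}_{timestamp}", "orders", oneDayMillis, "yyyyMMdd"));
    }
}
```

The resulting string would then be written to the `kafka.OVERRIDE_TOPIC` header, in the same way the RegexRouter sets it.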

Integration

Dynamic Topic Routing processors belong to the parent topic, Message Transformation Utilities. While the parent topic covers a range of JSON field manipulations and static routing transformations, Dynamic Topic Routing focuses on choosing the destination topic at runtime, either from a timestamp in the message or from a regex rewrite of the existing topic name.

These processors are typically invoked in Camel routes **before** the Kafka producer component serializes and sends messages. By setting the `kafka.OVERRIDE_TOPIC` header, they override the default topic, enabling dynamic routing without altering the producer logic itself.

This subtopic complements the other transformation utilities by adapting the topic destination to message characteristics.

Diagram

flowchart TD
    A[Message Received in Route] --> B[Dynamic Topic Routing Processor]
    B -->|Extract Timestamp or Topic| C{Routing Logic}
    C -->|Timestamp Based| D[Format Topic with Timestamp]
    C -->|Regex Based| E[Apply Regex Replacement]
    D --> F[Set kafka.OVERRIDE_TOPIC Header]
    E --> F
    F --> G[KafkaProducer Sends to Overridden Topic]

This dynamic routing capability enables Kafka topics to be flexibly determined at runtime, enhancing the adaptability of message flows in event-driven architectures.