TimestampRouter.java


Overview

`TimestampRouter.java` is a utility class within the Apache Camel Kafka integration framework that provides **dynamic Kafka topic routing based on timestamps** carried in message headers. For each incoming message in a Camel route, it reads a timestamp from a configurable header, formats it, and builds a Kafka topic name by substituting the formatted timestamp and the original topic name into a configurable topic pattern.

This dynamic topic name is then set as an override header, `kafka.OVERRIDE_TOPIC`, which downstream Kafka producer components honor to route messages to the computed topic rather than a statically configured one.


Class: TimestampRouter

Description

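`TimestampRouter` provides a single method, `process`, which reads the source topic and a timestamp from the message headers, formats the timestamp, substitutes both values into the topic pattern, and stores the result in the `kafka.OVERRIDE_TOPIC` header.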

This capability supports use cases such as time-based topic partitioning, where Kafka topics are suffixed or prefixed with timestamps representing message creation or event time.


Method: process

```java
public void process(
        @ExchangeProperty("topicFormat") String topicFormat,
        @ExchangeProperty("timestampFormat") String timestampFormat,
        @ExchangeProperty("timestampHeaderName") String timestampHeaderName,
        Exchange ex)
```

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `topicFormat` | String | Topic name pattern string containing placeholders `$[topic]` and `$[timestamp]` for substitution. |
| `timestampFormat` | String | Timestamp format string compatible with `SimpleDateFormat` (e.g., "yyyy-MM-dd"). |
| `timestampHeaderName` | String | The name of the message header from which to extract the timestamp value (e.g., "eventTimestamp"). |
| `ex` | Exchange | The Apache Camel Exchange object representing the current message exchange context. |
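
To make the `timestampFormat` parameter concrete, the following minimal sketch shows how a `SimpleDateFormat` pattern renders an epoch-millisecond value when the formatter is pinned to UTC. The class `TimestampFormatSketch` and the helper `formatUtc` are hypothetical names introduced only for illustration; they are not part of `TimestampRouter`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper class for illustration; not part of TimestampRouter.
class TimestampFormatSketch {

    // Formats epoch milliseconds with the given SimpleDateFormat pattern, in UTC.
    static String formatUtc(long epochMillis, String timestampFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long epochMillis = 1718409600000L; // 2024-06-15T00:00:00Z
        System.out.println(formatUtc(epochMillis, "yyyy-MM-dd")); // 2024-06-15
        System.out.println(formatUtc(epochMillis, "yyyyMMddHH")); // 2024061500
    }
}
```

Because the formatter is fixed to UTC, the rendered date does not depend on the time zone of the host running the route, so the same message maps to the same topic name everywhere.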

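Returns

Nothing (`void`). The method's only output is the `kafka.OVERRIDE_TOPIC` header that it writes to the message.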

Functionality Detail

  1. Placeholder Patterns Setup:

    • Defines literal regex patterns for the placeholders `$[topic]` and `$[timestamp]` to identify them in the topic format string.

  2. Timestamp Formatter:

    • Creates a `SimpleDateFormat` instance with the provided `timestampFormat`.

    • Sets the formatter timezone to UTC to standardize timestamp output.

  3. Timestamp Extraction and Parsing:

    • Obtains the current Kafka topic from the header `kafka.TOPIC`.

    • Reads the timestamp from the specified header name (`timestampHeaderName`).

    • Converts the timestamp to a `Long` value representing milliseconds since epoch.

    • Supports multiple timestamp types:

      • `Long`

      • `Instant` (converted via `toEpochMilli()`)

      • `String` or other convertible types parsed as `Long`.

  4. Topic Name Construction:

    • If a valid timestamp is found:

      • Formats the timestamp into a string using the configured formatter.

      • Replaces the `$[topic]` placeholder in the `topicFormat` with the original topic name (or empty string if absent).

      • Replaces the `$[timestamp]` placeholder with the formatted timestamp string.

    • Sets the resulting string as the header `kafka.OVERRIDE_TOPIC` on the message.
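
The four steps above can be sketched with the standard library alone. This is an illustrative approximation, not the class's actual source: a plain `Map` stands in for Camel's message headers, and the names `TopicOverrideSketch`, `toEpochMillis`, and `route` are hypothetical. It also assumes that no override header is written when the timestamp header is missing or empty, which the description above does not state explicitly.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the routing logic; a Map replaces Camel's message headers.
class TopicOverrideSketch {

    // Step 1: Pattern.LITERAL makes "$", "[" and "]" match as plain characters.
    private static final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
    private static final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);

    // Step 3: normalize the supported header types to epoch milliseconds.
    static Long toEpochMillis(Object raw) {
        if (raw instanceof Long) {
            return (Long) raw;
        }
        if (raw instanceof Instant) {
            return ((Instant) raw).toEpochMilli();
        }
        if (raw != null && !raw.toString().isEmpty()) {
            // Throws NumberFormatException if the value is not a decimal number.
            return Long.parseLong(raw.toString());
        }
        return null;
    }

    static void route(String topicFormat, String timestampFormat,
                      String timestampHeaderName, Map<String, Object> headers) {
        // Step 2: formatter fixed to UTC.
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

        Object topic = headers.get("kafka.TOPIC");
        Long timestamp = toEpochMillis(headers.get(timestampHeaderName));
        if (timestamp == null) {
            return; // Assumption: without a timestamp, no override header is set.
        }

        // Step 4: substitute both placeholders; quoteReplacement keeps "$" and "\" literal.
        String formatted = fmt.format(new Date(timestamp));
        String topicName = topic == null ? "" : topic.toString();
        String withTopic = TOPIC.matcher(topicFormat)
                .replaceAll(Matcher.quoteReplacement(topicName));
        String result = TIMESTAMP.matcher(withTopic)
                .replaceAll(Matcher.quoteReplacement(formatted));
        headers.put("kafka.OVERRIDE_TOPIC", result);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka.TOPIC", "orders");
        headers.put("eventTimestamp", "1718409600000"); // String form of epoch millis
        route("$[topic]-$[timestamp]", "yyyy-MM-dd", "eventTimestamp", headers);
        System.out.println(headers.get("kafka.OVERRIDE_TOPIC")); // orders-2024-06-15
    }
}
```

Compiling the placeholders with `Pattern.LITERAL` and wrapping the replacement values in `Matcher.quoteReplacement` means that neither the placeholder syntax nor a topic name containing `$` or `\` is interpreted as regex syntax.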

Usage Example

Suppose you want to route messages to topics formatted as `orders-$[timestamp]` where the timestamp is in `yyyy-MM-dd` format extracted from the header `eventTimestamp`.

```java
import java.time.Instant;
import org.apache.camel.Exchange;

TimestampRouter router = new TimestampRouter();

Exchange exchange = ...; // a Camel Exchange representing the current message
exchange.getMessage().setHeader("kafka.TOPIC", "orders");
exchange.getMessage().setHeader("eventTimestamp", Instant.now());

// The pattern has no $[topic] placeholder, so only the timestamp is substituted.
router.process(
    "orders-$[timestamp]",
    "yyyy-MM-dd",
    "eventTimestamp",
    exchange
);

String overriddenTopic = exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC", String.class);
// overriddenTopic holds "orders-" followed by the current UTC date, e.g. "orders-2024-06-15"
```

Important Implementation Details


Interaction with Other System Components


Mermaid Class Diagram

```mermaid
classDiagram
    class TimestampRouter {
        +void process(String topicFormat, String timestampFormat, String timestampHeaderName, Exchange ex)
    }
    TimestampRouter ..> Exchange : uses
    Exchange o-- Message : contains
```

Summary

`TimestampRouter.java` is a focused utility class for dynamic Kafka topic routing within Apache Camel Kafka integrations. It extracts a timestamp from a message header, formats it according to a user-defined pattern, and substitutes it into a topic pattern, which enables naming schemes such as date-based topics. It accepts several timestamp data types, substitutes placeholders literally rather than as regular expressions, and works through Camel’s exchange and header mechanisms, so it can be reused across event-driven Kafka pipelines.