MessageTimestampRouter.java


Overview

`MessageTimestampRouter` is a specialized processor class designed for Apache Camel routes integrating with Kafka. Its primary purpose is to dynamically determine and override the Kafka topic for outgoing messages based on timestamps extracted from fields within the JSON message body.

Rather than relying on a static topic name or on a timestamp carried in Kafka headers, this router parses a timestamp embedded in the message payload, formats it according to a specified pattern, and injects the resulting string into a configurable topic name format. The constructed topic is then set as an override header so that the Kafka producer sends the message to it.

This makes it possible to route messages to time-bucketed topics (for example, one topic per day) when the timestamp is stored inside a JSON payload field. The router supports multiple candidate timestamp keys, input values given either as a date pattern or as epoch milliseconds, and customizable topic naming patterns.


Class: MessageTimestampRouter

Package

`org.apache.camel.component.kafka.transform`

Purpose

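Processes Apache Camel `Exchange` messages to:

• extract a timestamp from one of the configured fields of the JSON message body;

• format that timestamp using the configured output pattern;

• build a topic name from the `topicFormat` template;

• set the result as the `kafka.OVERRIDE_TOPIC` header.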

Method: process

public void process(
    @ExchangeProperty("topicFormat") String topicFormat,
    @ExchangeProperty("timestampFormat") String timestampFormat,
    @ExchangeProperty("timestampKeys") String timestampKeys,
    @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat,
    Exchange ex
) throws ParseException

Description

This method performs the core logic of extracting and formatting timestamps and dynamically generating the Kafka topic name.

Parameters

| Parameter | Type | Description |
|---|---|---|
| `topicFormat` | String | A string pattern defining the Kafka topic format, which may contain placeholders `$[topic]` and `$[timestamp]`. Example: `"my-topic-$[timestamp]"`. |
| `timestampFormat` | String | The output timestamp format pattern used to format the extracted timestamp into a string. Example: `"yyyy-MM-dd"`. |
| `timestampKeys` | String | Comma-separated list of JSON field names to search for timestamp values inside the message body. Example: `"createdAt,eventTime"`. |
| `timestampKeyFormat` | String | The input format pattern of the timestamp string in the message body fields. If equal to `"timestamp"`, the raw timestamp is treated as a long epoch millis. Otherwise, it is parsed using this format. Example: `"yyyy-MM-dd'T'HH:mm:ss'Z'"`. |
| `ex` | Exchange | The Apache Camel Exchange object representing the current message processing context, including message body and headers. |

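Throws

`ParseException` if the timestamp value cannot be parsed using `timestampKeyFormat`.

Returns

Nothing (`void`). The result is communicated through the `kafka.OVERRIDE_TOPIC` header set on the exchange.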

Usage Example

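The parameters are annotated with `@ExchangeProperty`, so when the class is invoked as a bean from an Apache Camel route, Camel binds them from the exchange properties of the same names. The method can also be called directly: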

MessageTimestampRouter router = new MessageTimestampRouter();
router.process(
    "my-topic-$[timestamp]",
    "yyyy-MM-dd",
    "createdAt,eventTime",
    "yyyy-MM-dd'T'HH:mm:ss'Z'",
    exchange
);

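This would:

• look for a non-empty value under `createdAt`, then `eventTime`, in the JSON body;

• parse that value with the pattern `yyyy-MM-dd'T'HH:mm:ss'Z'` in UTC;

• format it as `yyyy-MM-dd`;

• set the `kafka.OVERRIDE_TOPIC` header to `my-topic-` followed by that date, for example `my-topic-2024-06-15`.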

Detailed Behavior and Implementation Notes

  1. Placeholder Patterns

    Two placeholders are recognized inside the topicFormat string:

    • $[topic] — replaced by the current Kafka topic header value, or empty string if not present.

    • $[timestamp] — replaced by the formatted timestamp extracted from the message body.

    These replacements are done using `java.util.regex.Pattern` with `Pattern.LITERAL` to treat the placeholders as literal strings.
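
The literal matching described above can be sketched with the JDK alone. The class and method names (`PlaceholderSketch`, `fill`) are hypothetical, and wrapping the substituted values in `Matcher.quoteReplacement` is an assumption made here so that values containing `$` or `\` are inserted verbatim; the text above only confirms the use of `Pattern.LITERAL`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Pattern.LITERAL switches off regex syntax, so '$', '[' and ']' match themselves.
    private static final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
    private static final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);

    /** Hypothetical helper: substitutes both placeholders in topicFormat. */
    static String fill(String topicFormat, String topic, String formattedTimestamp) {
        // Assumption: quoteReplacement keeps '$' and '\' in the values
        // from being interpreted as group references.
        String withTopic = TOPIC.matcher(topicFormat)
                .replaceAll(Matcher.quoteReplacement(topic));
        return TIMESTAMP.matcher(withTopic)
                .replaceAll(Matcher.quoteReplacement(formattedTimestamp));
    }

    public static void main(String[] args) {
        // prints logs-app1-2024-06-15
        System.out.println(fill("logs-$[topic]-$[timestamp]", "app1", "2024-06-15"));
    }
}
```

Without `Pattern.LITERAL`, the string `$[topic]` would be compiled as an end-of-line anchor followed by a character class and would never match the placeholder text.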

  2. Timestamp Extraction

    • The processor uses Jackson's ObjectMapper to convert the JSON message body into a Map<Object,Object>.

    • It iterates over the list of timestampKeys (split by commas) and attempts to retrieve the first non-empty timestamp value from the JSON body.

    • If no timestamp key is found or the value is empty, no routing override occurs.
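
The key lookup can be illustrated as follows. This is a minimal sketch that starts from an already-converted `Map` rather than calling Jackson, so it runs with the JDK alone; the names `TimestampLookupSketch` and `firstTimestamp` are hypothetical, and trimming whitespace around each key is an assumption not stated in the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TimestampLookupSketch {
    /**
     * Hypothetical helper: returns the first non-empty value found under any of
     * the comma-separated keys, or null when no key yields a value.
     */
    static Object firstTimestamp(Map<Object, Object> body, String timestampKeys) {
        if (timestampKeys == null || timestampKeys.isEmpty()) {
            return null;
        }
        for (String key : timestampKeys.split(",")) {
            // Assumption: surrounding whitespace in the key list is ignored.
            Object value = body.get(key.trim());
            if (value != null && !value.toString().isEmpty()) {
                return value;
            }
        }
        return null; // the caller would skip the topic override in this case
    }

    public static void main(String[] args) {
        Map<Object, Object> body = new LinkedHashMap<>();
        body.put("eventTime", "2024-06-15T10:30:00Z");
        // "createdAt" is absent, so the value of "eventTime" is printed
        System.out.println(firstTimestamp(body, "createdAt,eventTime"));
    }
}
```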

  3. Timestamp Parsing

    • If timestampKeyFormat is non-empty and not equal to "timestamp", the timestamp value is parsed as a date string using SimpleDateFormat with UTC timezone.

    • If timestampKeyFormat equals "timestamp" or is empty, the timestamp is treated as a raw long integer representing milliseconds since epoch.

    • Parsing errors throw ParseException.
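
The two parsing branches can be sketched like this. The helper name `toEpochMillis` is hypothetical, and the sketch shows only one way to implement the rules above, not necessarily how the class itself is written.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class TimestampParseSketch {
    /** Hypothetical helper: converts a raw body value to epoch milliseconds. */
    static long toEpochMillis(Object rawTimestamp, String timestampKeyFormat) throws ParseException {
        boolean isDatePattern = timestampKeyFormat != null
                && !timestampKeyFormat.isEmpty()
                && !timestampKeyFormat.equals("timestamp");
        if (isDatePattern) {
            SimpleDateFormat inputFormat = new SimpleDateFormat(timestampKeyFormat);
            inputFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
            // Throws ParseException when the value does not match the pattern.
            return inputFormat.parse(rawTimestamp.toString()).getTime();
        }
        // "timestamp" or empty: the value is taken as epoch milliseconds.
        // In this sketch a non-numeric value raises NumberFormatException.
        return Long.parseLong(rawTimestamp.toString());
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(toEpochMillis("2024-06-15T10:30:00Z", "yyyy-MM-dd'T'HH:mm:ss'Z'"));
        System.out.println(toEpochMillis(1718447400000L, "timestamp"));
    }
}
```

Note that the quoted `'Z'` in the example pattern matches a literal letter Z and does not parse a zone offset, which is why the formatter's time zone has to be set to UTC explicitly.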

  4. Timestamp Formatting

    • Once the timestamp is parsed into a Long millisecond value, it is formatted into a string using the timestampFormat pattern and UTC timezone.
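
A minimal sketch of the formatting step, assuming a hypothetical helper named `format`. Setting the time zone explicitly keeps the output independent of the JVM's default zone.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

class TimestampFormatSketch {
    /** Hypothetical helper: renders epoch milliseconds with the given pattern in UTC. */
    static String format(long epochMillis, String timestampFormat) {
        SimpleDateFormat outputFormat = new SimpleDateFormat(timestampFormat);
        outputFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        return outputFormat.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // prints 1970-01-01
        System.out.println(format(0L, "yyyy-MM-dd"));
    }
}
```

The granularity of `timestampFormat` determines how many distinct topics are produced: a daily pattern such as `yyyy-MM-dd` yields one topic per day, while a pattern that includes hours yields one per hour.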

  5. Topic Construction

    • The original topicFormat string is processed, replacing $[topic] and $[timestamp] placeholders with their resolved values.

    • If the Kafka topic header (kafka.TOPIC) is not present, $[topic] is replaced with an empty string.

    • The final topic string is set on the exchange as the header kafka.OVERRIDE_TOPIC.
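
The header handling can be illustrated with a plain map standing in for the Camel message headers. This is a sketch under that assumption: `TopicOverrideSketch` and `applyOverride` are hypothetical names, and `String.replace` is used for brevity because it also matches its target literally.

```java
import java.util.HashMap;
import java.util.Map;

class TopicOverrideSketch {
    // Header names as given in the text above.
    static final String TOPIC_HEADER = "kafka.TOPIC";
    static final String OVERRIDE_TOPIC_HEADER = "kafka.OVERRIDE_TOPIC";

    /** Hypothetical helper operating on a header map instead of a Camel Exchange. */
    static void applyOverride(Map<String, Object> headers, String topicFormat, String formattedTimestamp) {
        Object current = headers.get(TOPIC_HEADER);
        // A missing topic header resolves $[topic] to the empty string.
        String topic = current == null ? "" : current.toString();
        String resolved = topicFormat
                .replace("$[topic]", topic)
                .replace("$[timestamp]", formattedTimestamp);
        headers.put(OVERRIDE_TOPIC_HEADER, resolved);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, "app1");
        applyOverride(headers, "logs-$[topic]-$[timestamp]", "2024-06-15");
        // prints logs-app1-2024-06-15
        System.out.println(headers.get(OVERRIDE_TOPIC_HEADER));
    }
}
```

When the topic header is absent, a format such as `logs-$[topic]-$[timestamp]` resolves to `logs--2024-06-15`, so formats that use `$[topic]` are best paired with routes that always set that header.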

  6. Exchange Mutation

    • The method does not modify the message body.

    • It only sets or overrides the header kafka.OVERRIDE_TOPIC to direct Kafka producers to the new topic.


Interaction with Other System Components


Visual Diagram: Class Structure and Method

classDiagram
    class MessageTimestampRouter {
        +void process(String topicFormat, String timestampFormat, String timestampKeys, String timestampKeyFormat, Exchange ex) throws ParseException
    }

Summary

`MessageTimestampRouter` routes Kafka messages dynamically based on timestamps embedded in JSON message bodies. It supports multiple timestamp keys, input values given as a date pattern or as epoch milliseconds, and topic name templates with the `$[topic]` and `$[timestamp]` placeholders, which makes it suitable for organizing events into time-bucketed topics in event-driven systems.


Appendix: Placeholder Usage in topicFormat

| Placeholder | Description | Example Usage |
|---|---|---|
| `$[topic]` | Inserts the current Kafka topic name header | `"logs-$[topic]-$[timestamp]"` → `"logs-app1-2024-06-15"` |
| `$[timestamp]` | Inserts the formatted timestamp string | `"events-$[timestamp]"` → `"events-2024-06-15"` |


Notes


If you require integration examples or further customization guidance, please refer to the **Dynamic Topic Routing** section of the Message Transformation Utilities documentation.