TimestampRouter.java
Overview
`TimestampRouter.java` is a utility class within the Apache Camel Kafka integration framework that facilitates **dynamic Kafka topic routing based on timestamps** embedded in message headers. The primary function of this class is to process incoming Kafka messages in Camel routes, extract and format a timestamp value from a message header, and then dynamically construct a Kafka topic name by embedding the formatted timestamp and original topic name into a configurable topic pattern.
This dynamic topic name is then set as an override header, `kafka.OVERRIDE_TOPIC`, which downstream Kafka producer components honor to route messages to the computed topic rather than a statically configured one.
Class: TimestampRouter
Description
`TimestampRouter` provides a single method, `process`, which performs the following key tasks:
Extracts a timestamp value from a specified header in the Camel Exchange message.
Converts and formats the timestamp into a string based on a user-configured pattern, enforcing UTC timezone.
Substitutes the placeholders `$[topic]` and `$[timestamp]` in a topic format string with the original topic name and the formatted timestamp, respectively.
Sets the dynamically constructed topic name on the message header `kafka.OVERRIDE_TOPIC` to override the default Kafka topic routing.
This capability supports use cases such as time-based topic partitioning, where Kafka topics are suffixed or prefixed with timestamps representing message creation or event time.
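As a rough illustration of time-based topic naming, the sketch below formats an epoch-millisecond value in UTC and builds suffix-style and prefix-style topic names from it. The class name `DailyTopicNames` and the helper `formatUtc` are hypothetical and exist only for this example; they are not part of `TimestampRouter`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper class, used only to illustrate time-based topic names.
class DailyTopicNames {

    // Formats epoch milliseconds with the given SimpleDateFormat pattern, always in UTC.
    static String formatUtc(long epochMillis, String timestampFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long eventTime = 1718409600000L; // 2024-06-15T00:00:00Z
        String day = formatUtc(eventTime, "yyyy-MM-dd");
        System.out.println("orders-" + day); // suffix style: orders-2024-06-15
        System.out.println(day + "-orders"); // prefix style: 2024-06-15-orders
    }
}
```

Because the formatter is pinned to UTC, the same event time yields the same topic name on every host, whatever the JVM default timezone is.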
Method: process
public void process(
        @ExchangeProperty("topicFormat") String topicFormat,
        @ExchangeProperty("timestampFormat") String timestampFormat,
        @ExchangeProperty("timestampHeaderName") String timestampHeaderName,
        Exchange ex)
Parameters
| Parameter | Type | Description |
|---|---|---|
| `topicFormat` | String | Topic name pattern string containing placeholders `$[topic]` and `$[timestamp]` for substitution. |
| `timestampFormat` | String | Timestamp format string compatible with `SimpleDateFormat` (e.g., "yyyy-MM-dd"). |
| `timestampHeaderName` | String | The name of the message header from which to extract the timestamp value (e.g., "eventTimestamp"). |
| `ex` | Exchange | The Apache Camel Exchange object representing the current message exchange context. |
Returns
`void`. The method modifies the exchange in place, setting the override topic header only if a valid timestamp is found.
Functionality Detail
Placeholder Patterns Setup:
Defines literal regex patterns for the placeholders `$[topic]` and `$[timestamp]` to identify them in the topic format string.
Timestamp Formatter:
Creates a `SimpleDateFormat` instance with the provided `timestampFormat`.
Sets the formatter timezone to UTC to standardize timestamp output.
Timestamp Extraction and Parsing:
Obtains the current Kafka topic from the header `"kafka.TOPIC"`.
Reads the timestamp from the specified header name (`timestampHeaderName`).
Converts the timestamp to a `Long` value representing milliseconds since epoch.
Supports multiple timestamp types:
`Long`
`Instant` (converted via `toEpochMilli()`)
String or other convertible types parsed as `Long`.
Topic Name Construction:
If a valid timestamp is found:
Formats the timestamp into a string using the configured formatter.
Replaces the `$[topic]` placeholder in the `topicFormat` with the original topic name (or an empty string if absent).
Replaces the `$[timestamp]` placeholder with the formatted timestamp string.
Sets the resulting string as the header `"kafka.OVERRIDE_TOPIC"` on the message.
Usage Example
Suppose you want to route messages to topics formatted as `orders-$[timestamp]` where the timestamp is in `yyyy-MM-dd` format extracted from the header `eventTimestamp`.
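import java.time.Instant;
import org.apache.camel.Exchange;

TimestampRouter router = new TimestampRouter();
Exchange exchange = ...; // a Camel Exchange representing the current message
// The original topic and the event time are read from message headers
exchange.getMessage().setHeader("kafka.TOPIC", "orders");
exchange.getMessage().setHeader("eventTimestamp", Instant.now());
// Arguments: topicFormat, timestampFormat, timestampHeaderName, exchange
router.process(
        "orders-$[timestamp]",
        "yyyy-MM-dd",
        "eventTimestamp",
        exchange
);
String overriddenTopic = exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC", String.class);
// overriddenTopic might be "orders-2024-06-15", depending on the current UTC date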
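To try the same flow without Camel on the classpath, the steps listed under Functionality Detail can be approximated with a plain `Map` standing in for the message headers. This is a minimal sketch under that assumption, not the actual `TimestampRouter` source: the class `TimestampRoutingSketch`, its `route` and `toEpochMillis` methods, and the use of `Matcher.quoteReplacement` are all illustrative choices.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in: a Map plays the role of the Camel message headers.
class TimestampRoutingSketch {

    private static final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
    private static final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);

    // Converts a raw header value to epoch milliseconds, or returns null if it is absent.
    // In this sketch a non-numeric string makes Long.parseLong throw NumberFormatException.
    static Long toEpochMillis(Object raw) {
        if (raw instanceof Long) {
            return (Long) raw;
        }
        if (raw instanceof Instant) {
            return ((Instant) raw).toEpochMilli();
        }
        if (raw != null && !raw.toString().isEmpty()) {
            return Long.parseLong(raw.toString());
        }
        return null;
    }

    // Computes the override topic and stores it under "kafka.OVERRIDE_TOPIC".
    static void route(String topicFormat, String timestampFormat,
                      String timestampHeaderName, Map<String, Object> headers) {
        Long timestamp = toEpochMillis(headers.get(timestampHeaderName));
        if (timestamp == null) {
            return; // no timestamp: leave routing untouched
        }
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        String formatted = fmt.format(new Date(timestamp));

        Object topic = headers.get("kafka.TOPIC");
        String topicName = topic == null ? "" : topic.toString();

        String withTopic = TOPIC.matcher(topicFormat)
                .replaceAll(Matcher.quoteReplacement(topicName));
        String result = TIMESTAMP.matcher(withTopic)
                .replaceAll(Matcher.quoteReplacement(formatted));
        headers.put("kafka.OVERRIDE_TOPIC", result);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka.TOPIC", "orders");
        headers.put("eventTimestamp", Instant.parse("2024-06-15T10:30:00Z"));
        route("$[topic]-$[timestamp]", "yyyy-MM-dd", "eventTimestamp", headers);
        System.out.println(headers.get("kafka.OVERRIDE_TOPIC")); // orders-2024-06-15
    }
}
```

Using a fixed `Instant` rather than `Instant.now()` keeps the result reproducible, which is convenient when checking a topic format before wiring it into a route.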
Important Implementation Details
Placeholder Replacement Using Regex:
The class uses `Pattern.compile` with the `Pattern.LITERAL` flag to precisely match the literal placeholders `$[topic]` and `$[timestamp]` in the topic format string.
This approach avoids regex meta-character interpretation and allows safe substitution.
Timestamp Type Flexibility:
Accepts `Long`, `Instant`, and string-convertible values for the timestamp header, so upstream components do not need to agree on a single representation.
Uses Apache Camel's `ObjectHelper.isNotEmpty` to verify presence before parsing.
UTC Timezone Enforcement:
Ensures consistent timestamp formatting regardless of system or JVM default timezones.
Graceful Handling of Missing Topic or Timestamp:
If the original topic is missing, it substitutes an empty string.
If the timestamp is missing or cannot be parsed, no topic override occurs.
No External Dependencies Beyond Core Java and Camel:
Uses standard Java `SimpleDateFormat` and `Date` classes.
Uses Camel utilities for header and property access.
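The effect of the `Pattern.LITERAL` flag can be seen in a small standalone sketch. The class `LiteralPlaceholderDemo` and its methods are hypothetical names for this example, and wrapping the replacement value in `Matcher.quoteReplacement` is an assumption of the sketch rather than something stated above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class contrasting literal and plain-regex placeholder matching.
class LiteralPlaceholderDemo {

    // Replaces every occurrence of a placeholder, treating both the
    // placeholder and the replacement value as plain text.
    static String substitute(String format, String placeholder, String value) {
        Pattern literal = Pattern.compile(placeholder, Pattern.LITERAL);
        return literal.matcher(format).replaceAll(Matcher.quoteReplacement(value));
    }

    // True if the placeholder, compiled as an ordinary regex, is found in the text.
    static boolean foundAsPlainRegex(String placeholder, String text) {
        return Pattern.compile(placeholder).matcher(text).find();
    }

    public static void main(String[] args) {
        String format = "$[topic]-$[timestamp]";
        // Without LITERAL, "$" is an end anchor and "[topic]" a character class,
        // so the placeholder text itself is never matched.
        System.out.println(foundAsPlainRegex("$[topic]", format)); // false

        String step1 = substitute(format, "$[topic]", "orders");
        String step2 = substitute(step1, "$[timestamp]", "2024-06-15");
        System.out.println(step2); // orders-2024-06-15
    }
}
```

Quoting the replacement matters for the same reason as the literal flag: `replaceAll` would otherwise interpret `$` and `\` in the inserted value as group references or escapes.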
Interaction with Other System Components
Camel Exchange and Message Headers:
Reads input from Camel Exchange properties and message headers.
Writes the override topic to the message header `kafka.OVERRIDE_TOPIC`.
Kafka Producer Component:
The Kafka producer component downstream reads the `kafka.OVERRIDE_TOPIC` header.
Uses this overridden topic name to route the message dynamically, overriding the statically configured topic.
Other Dynamic Topic Routing Processors:
Works alongside processors like `RegexRouter` and `MessageTimestampRouter` within the message transformation utilities.
Provides timestamp-based routing complementing regex-based or body-based routing processors.
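The override behaviour on the producer side amounts to a simple fallback rule, sketched below. This is an illustration only, not the Camel Kafka component's code: `TopicResolutionSketch` and `resolveTopic` are hypothetical names, and a plain `Map` again stands in for the message headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of how an override header can take precedence
// over a statically configured topic.
class TopicResolutionSketch {

    // Returns the override topic when the header is present and non-empty,
    // otherwise falls back to the configured topic.
    static String resolveTopic(Map<String, Object> headers, String configuredTopic) {
        Object override = headers.get("kafka.OVERRIDE_TOPIC");
        if (override != null && !override.toString().isEmpty()) {
            return override.toString();
        }
        return configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(resolveTopic(headers, "orders")); // orders

        headers.put("kafka.OVERRIDE_TOPIC", "orders-2024-06-15");
        System.out.println(resolveTopic(headers, "orders")); // orders-2024-06-15
    }
}
```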
Mermaid Class Diagram
classDiagram
    class TimestampRouter {
        +void process(String topicFormat, String timestampFormat, String timestampHeaderName, Exchange ex)
    }
    TimestampRouter ..> Exchange : uses
    Exchange o-- Message : contains
The `TimestampRouter` class exposes a single `process` method.
It depends on the Camel `Exchange` object to read properties and headers and set results.
The `Exchange` contains the `Message` object where headers are manipulated.
Summary
`TimestampRouter.java` is a focused utility class designed for dynamic Kafka topic routing within Apache Camel Kafka integrations. By extracting timestamps from message headers and formatting them according to user-defined patterns, it enables flexible topic naming schemes such as date-based partitions. Its design supports multiple timestamp data types, uses safe and literal string substitution, and integrates seamlessly with Camel’s exchange and header mechanisms, making it a robust and reusable processor in event-driven Kafka pipelines.