MessageTimestampRouter.java
Overview
`MessageTimestampRouter` is a specialized processor class designed for Apache Camel routes integrating with Kafka. Its primary purpose is to dynamically determine and override the Kafka topic for outgoing messages based on timestamps extracted from fields within the JSON message body.
Rather than relying on Kafka headers or static topic names, this router parses timestamps embedded in the message payload, formats these timestamps according to a specified pattern, and injects the resulting timestamp string into a configurable topic name format. The dynamically constructed topic is then set as an override header to direct Kafka producers to route the message accordingly.
This class enables flexible time-based partitioning or routing of Kafka messages where timestamps are stored inside JSON payload fields, supporting multiple timestamp keys, diverse timestamp input formats, and customizable topic naming patterns.
Class: MessageTimestampRouter
Package
`org.apache.camel.component.kafka.transform`
Purpose
Processes Apache Camel `Exchange` messages to:
Extract timestamps from one or more specified fields in the JSON message body.
Parse and format these timestamps into a desired string representation.
Construct a new Kafka topic name by replacing placeholders in a topic format string.
Set the new topic name as an override header (
kafka.OVERRIDE_TOPIC) for downstream Kafka routing.
Method: process
public void process(
@ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
@ExchangeProperty("timestampKeys") String timestampKeys,
@ExchangeProperty("timestampKeyFormat") String timestampKeyFormat,
Exchange ex
) throws ParseException
Description
This method performs the core logic of extracting and formatting timestamps and dynamically generating the Kafka topic name.
Parameters
Parameter | Type | Description |
|---|---|---|
`topicFormat` | String | A string pattern defining the Kafka topic format, which may contain placeholders `$[topic]` and `$[timestamp]`. Example: `"my-topic-$[timestamp]"`. |
`timestampFormat` | String | The output timestamp format pattern used to format the extracted timestamp into a string. Example: `"yyyy-MM-dd"`. |
`timestampKeys` | String | Comma-separated list of JSON field names to search for timestamp values inside the message body. Example: `"createdAt,eventTime"`. |
`timestampKeyFormat` | String | The input format pattern of the timestamp string in the message body fields. If equal to `"timestamp"`, the raw timestamp is treated as a long epoch millis. Otherwise, it is parsed using this format. Example: `"yyyy-MM-dd'T'HH:mm:ss'Z'"`. |
`ex` | Exchange | The Apache Camel Exchange object representing the current message processing context, including message body and headers. |
Throws
ParseExceptionif the timestamp parsing from the JSON field fails due to invalid format.
Returns
void— modifies theExchangein-place by setting the override topic header if the timestamp is successfully extracted and formatted.
Usage Example
Within an Apache Camel route, you could configure and invoke this processor as:
MessageTimestampRouter router = new MessageTimestampRouter();
router.process(
"my-topic-$[timestamp]",
"yyyy-MM-dd",
"createdAt,eventTime",
"yyyy-MM-dd'T'HH:mm:ss'Z'",
exchange
);
This would:
Look for fields
"createdAt"or"eventTime"in the JSON body.Parse the timestamp using the ISO 8601 format.
Format the timestamp as
"yyyy-MM-dd".Replace the
$[timestamp]placeholder in the topic format.Set the resulting topic override header.
Detailed Behavior and Implementation Notes
Placeholder Patterns
Two placeholders are recognized inside the
topicFormatstring:$[topic]— replaced by the current Kafka topic header value, or empty string if not present.$[timestamp]— replaced by the formatted timestamp extracted from the message body.
These replacements are done using `java.util.regex.Pattern` with `Pattern.LITERAL` to treat the placeholders as literal strings.
Timestamp Extraction
The processor uses Jackson's
ObjectMapperto convert the JSON message body into aMap<Object,Object>.It iterates over the list of
timestampKeys(split by commas) and attempts to retrieve the first non-empty timestamp value from the JSON body.If no timestamp key is found or the value is empty, no routing override occurs.
Timestamp Parsing
If
timestampKeyFormatis non-empty and not equal to"timestamp", the timestamp value is parsed as a date string usingSimpleDateFormatwith UTC timezone.If
timestampKeyFormatequals"timestamp"or is empty, the timestamp is treated as a raw long integer representing milliseconds since epoch.Parsing errors throw
ParseException.
Timestamp Formatting
Once the timestamp is parsed into a
Longmillisecond value, it is formatted into a string using thetimestampFormatpattern and UTC timezone.
Topic Construction
The original
topicFormatstring is processed, replacing$[topic]and$[timestamp]placeholders with their resolved values.If the Kafka topic header (
kafka.TOPIC) is not present,$[topic]is replaced with an empty string.The final topic string is set on the exchange as the header
kafka.OVERRIDE_TOPIC.
Exchange Mutation
The method does not modify the message body.
It only sets or overrides the header
kafka.OVERRIDE_TOPICto direct Kafka producers to the new topic.
Interaction with Other System Components
Apache Camel Exchange and Message: The class operates on the Camel
Exchangeobject, reading message bodies and headers and setting headers to influence routing.Kafka Headers: Reads
kafka.TOPICheader to get the current topic and setskafka.OVERRIDE_TOPICheader to override the topic dynamically.JSON Payload Handling: Uses Jackson library to parse the JSON body into a map for flexible access to nested fields.
Timestamp Parsing and Formatting: Relies on
SimpleDateFormatfor parsing timestamp strings from JSON fields and formatting them for topic naming.Route Integration: Intended to be used as a Camel processor step before sending messages to Kafka, enabling dynamic, timestamp-based topic routing.
Visual Diagram: Class Structure and Method
classDiagram
class MessageTimestampRouter {
+void process(String topicFormat, String timestampFormat, String timestampKeys, String timestampKeyFormat, Exchange ex) throws ParseException
}
Summary
`MessageTimestampRouter` provides a robust mechanism for routing Kafka messages dynamically based on timestamps embedded inside JSON message bodies. By supporting multiple timestamp keys, flexible timestamp input formats, and dynamic topic name templates with placeholders, it enables time-based or content-driven Kafka topic routing that is crucial for advanced data partitioning and organization strategies in event-driven systems.
Appendix: Placeholder Usage in topicFormat
Placeholder | Description | Example Usage |
|---|---|---|
`$[topic]` | Inserts the current Kafka topic name header | `"logs-$[topic]-$[timestamp]"` → `"logs-app1-2024-06-15"` |
`$[timestamp]` | Inserts the formatted timestamp string | `"events-$[timestamp]"` → `"events-2024-06-15"` |
Notes
Timezone is always UTC for both parsing and formatting, ensuring consistency across distributed systems.
If no timestamp can be extracted, the topic override header is not set, and routing falls back to the original topic.
The processor expects JSON message bodies; non-JSON bodies will cause conversion failures.
If you require integration examples or further customization guidance, please refer to the **Dynamic Topic Routing** section of the Message Transformation Utilities documentation.