Message Transformation Utilities
Overview
The **Message Transformation Utilities** module provides a suite of processors for manipulating Kafka message contents and headers within Apache Camel routes. These processors modify message bodies (primarily JSON structures). They also route messages dynamically by rewriting the target Kafka topic from message content or metadata, for example a timestamp or a regex match against the current topic name.
This module exists to simplify common message transformation needs in Kafka integration pipelines, such as:
Adding, removing, or modifying JSON fields.
Masking sensitive data before sending or after receiving messages.
Extracting or hoisting fields for specialized routing or header enrichment.
Dynamically determining Kafka topic destinations based on message timestamps or string patterns.
By encapsulating these transformations into reusable processors, the module promotes cleaner route definitions, better message governance, and enhanced routing flexibility.
Core Concepts and Purpose
Why Message Transformation Utilities?
Kafka messages often carry JSON payloads that need to be enriched, filtered, or routed differently based on business logic or compliance requirements. Directly manipulating these messages within Camel routes can be verbose and error-prone.
This module addresses such challenges by providing well-defined processors that:
Operate on JSON message bodies using Jackson data structures.
Use Camel’s Exchange and header mechanisms to influence routing and metadata.
Support configurable and parameterized transformations, e.g., field names, masking values, and routing patterns.
Enable decoupling of transformation logic from routing logic for maintainability.
Key Functionalities and Workflows
The module organizes processors into two main categories:
1. JSON Field Manipulation Processors
These processors modify the JSON message bodies, supporting operations such as insertion, dropping, replacement, masking, extraction, and hoisting of fields.
InsertField: Inserts a new field or appends a value to a JSON array.
If the JSON body is an object, inserts a field with a specified name and value.
If the JSON body is an array, appends the value.
Supports dynamic value resolution using Camel Simple language expressions.
Example snippet:
```java
JsonNode body = ex.getMessage().getBody(JsonNode.class);
((ObjectNode) body).put(field, resolvedValue);
ex.getMessage().setBody(body);
```
DropField: Removes a specified field from a JSON object body.
Operates only if the body is a JSON object.
Silently no-ops if the body is not an object or the field does not exist.
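The no-op rules above can be sketched with the JDK alone. This is a minimal sketch that models a JSON object as a `Map<String, Object>` rather than Jackson's `ObjectNode`. `DropFieldSketch` and `dropField` are hypothetical names, not the module's API.

```java
import java.util.Map;

// Hypothetical sketch: a JSON object is modeled as a Map instead of Jackson's ObjectNode.
final class DropFieldSketch {
    // Removes `field` when the body is a JSON object. Any other body type
    // (array, string, number, null) is returned untouched, as is a body without the field.
    static Object dropField(Object body, String field) {
        if (body instanceof Map) {
            ((Map<?, ?>) body).remove(field); // no-op when the key is absent
        }
        return body;
    }
}
```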
ReplaceField: Provides field-level control to filter enabled/disabled fields and rename fields.
Supports enabling only certain fields or disabling others.
Renames fields according to configured mappings.
Returns a new JSON node with applied transformations.
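The enable/disable/rename logic can be sketched as follows. This minimal sketch makes several assumptions:

- Field lists are comma-separated, and renames use `old:new` pairs. The real input format may differ.
- The JSON object is modeled as a `Map<String, Object>`.
- `ReplaceFieldSketch` and its methods are hypothetical names.

It builds a new map instead of editing the input, in line with ReplaceField returning a new node.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of ReplaceField-style filtering and renaming on a Map-based JSON object.
final class ReplaceFieldSketch {
    // Parses "a, b,c" into trimmed, non-empty names; null or blank yields an empty list.
    static List<String> parseList(String csv) {
        if (csv == null || csv.isBlank()) return List.of();
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Parses rename pairs in the assumed "old:new,old2:new2" format.
    static Map<String, String> parseRenames(String csv) {
        Map<String, String> renames = new LinkedHashMap<>();
        for (String pair : parseList(csv)) {
            String[] parts = pair.split(":", 2);
            if (parts.length == 2) renames.put(parts[0].trim(), parts[1].trim());
        }
        return renames;
    }

    // Returns a NEW map and leaves `body` untouched. A field is kept when it is listed in
    // `enabled` (an empty list means "all fields", an assumption) and is not listed in `disabled`.
    static Map<String, Object> replaceFields(Map<String, Object> body, String enabled,
                                             String disabled, String renames) {
        List<String> keep = parseList(enabled);
        List<String> drop = parseList(disabled);
        Map<String, String> rename = parseRenames(renames);
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : body.entrySet()) {
            String name = e.getKey();
            if ((keep.isEmpty() || keep.contains(name)) && !drop.contains(name)) {
                result.put(rename.getOrDefault(name, name), e.getValue());
            }
        }
        return result;
    }
}
```

Treating an empty enabled list as "keep everything" is a choice made for this sketch. The actual processor may use a different convention.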
MaskField: Masks sensitive fields by replacing their values with type-appropriate empty values or with custom replacements.
Uses a mapping from Java types to default mask values (e.g., zero for numbers, an empty string for strings).
Supports custom replacement strings with type conversion.
Throws an exception if a value's type cannot be masked or a replacement cannot be applied to it.
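Type-based masking can be sketched as follows. This minimal sketch assumes field values are boxed Java types held in a `Map<String, Object>`.

Only two rules come from the description above: zero for numbers and an empty string for strings. The rest of the default-mask table is assumed, including the `Boolean` entry and the set of numeric types covered. The names `MaskFieldSketch`, `maskValue`, and `maskFields` are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of MaskField-style masking on a Map-based JSON object.
final class MaskFieldSketch {
    // Assumed default masks per Java type; the Boolean entry is an extra assumption.
    private static final Map<Class<?>, Object> DEFAULT_MASKS = Map.of(
            Integer.class, 0,
            Long.class, 0L,
            Double.class, 0.0,
            String.class, "",
            Boolean.class, Boolean.FALSE);

    // With no replacement, returns the type's default mask. Otherwise converts the
    // replacement string to the original value's type, failing if that is not possible.
    static Object maskValue(Object original, String replacement) {
        if (original == null) return null;
        Class<?> type = original.getClass();
        if (replacement == null) {
            Object mask = DEFAULT_MASKS.get(type);
            if (mask == null) {
                throw new IllegalArgumentException("Cannot mask value of type " + type.getName());
            }
            return mask;
        }
        try {
            if (type == String.class) return replacement;
            if (type == Integer.class) return Integer.valueOf(replacement);
            if (type == Long.class) return Long.valueOf(replacement);
            if (type == Double.class) return Double.valueOf(replacement);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Replacement '" + replacement + "' is not valid for " + type.getName(), e);
        }
        throw new IllegalArgumentException("No custom replacement supported for " + type.getName());
    }

    // Masks every listed field that is present, modifying the map in place (a sketch choice).
    static void maskFields(Map<String, Object> body, List<String> fields, String replacement) {
        for (String field : fields) {
            if (body.containsKey(field)) {
                body.put(field, maskValue(body.get(field), replacement));
            }
        }
    }
}
```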
ExtractField: Extracts a specific field from the JSON body and either uses it as the new body or stores it in a header.
Supports options to output the extracted value in a header instead of the body.
Can perform strict header checks to avoid overwriting existing headers.
Can set a property indicating if the field was trimmed.
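Body-versus-header extraction can be sketched as follows. This minimal sketch uses a hypothetical `Message` stand-in (a body plus a header map) instead of Camel's message type.

Strict header checking is modeled as refusing to overwrite an existing header and reporting that through the return value. How the real processor reacts in that case is an assumption here, and the trimmed-field property is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of ExtractField-style extraction; not the module's API.
final class ExtractFieldSketch {
    // Hypothetical stand-in for a Camel message: a body plus a header map.
    static final class Message {
        Object body;
        final Map<String, Object> headers = new HashMap<>();
        Message(Object body) { this.body = body; }
    }

    // If `headerName` is null, the extracted value replaces the body. Otherwise it is stored
    // in that header and the body is left as-is. With `strictHeaderCheck`, an existing header
    // is never overwritten (assumed behavior). Returns whether anything was written.
    static boolean extractField(Message msg, String field, String headerName, boolean strictHeaderCheck) {
        if (!(msg.body instanceof Map)) return false; // not a JSON object: nothing to extract
        Object value = ((Map<?, ?>) msg.body).get(field);
        if (headerName == null) {
            msg.body = value;
            return true;
        }
        if (strictHeaderCheck && msg.headers.containsKey(headerName)) {
            return false;
        }
        msg.headers.put(headerName, value);
        return true;
    }
}
```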
HoistField: Wraps the existing message body inside a new JSON object under the specified field name.
Useful to nest existing JSON into a subfield for routing or schema compliance.
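Hoisting reduces to wrapping the body in a one-entry object. This minimal sketch models that object as a `Map`. `HoistFieldSketch.hoist` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: wraps the whole body, whatever its type, under `field` in a new object.
final class HoistFieldSketch {
    static Map<String, Object> hoist(Object body, String field) {
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put(field, body);
        return wrapper;
    }
}
```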
ValueToKey: Extracts specified fields from the JSON body and sets them as the Kafka message key header.
Supports selecting multiple fields to compose a key map.
Facilitates custom partitioning or key-based routing downstream.
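Composing a key map from selected fields can be sketched as follows. This minimal sketch makes two assumptions:

- The key is written to a `kafka.KEY` header, the key header used by Camel's Kafka component.
- The key is kept as a map rather than serialized JSON.

The names `ValueToKeySketch` and `valueToKey` are also hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ValueToKey-style key composition.
final class ValueToKeySketch {
    // Assumed header name for the Kafka record key.
    static final String KEY_HEADER = "kafka.KEY";

    // Copies the selected fields (when present) into a new key map and stores it
    // in the key header; the body itself is not modified.
    static void valueToKey(Map<String, Object> body, Map<String, Object> headers, List<String> fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : fields) {
            if (body.containsKey(field)) {
                key.put(field, body.get(field));
            }
        }
        headers.put(KEY_HEADER, key);
    }
}
```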
2. Dynamic Topic Routing Processors
These processors modify Kafka topic headers dynamically, enabling routing decisions based on message content or metadata.
RegexRouter: Uses regular expressions to transform Kafka topic names or CloudEvents `ce-type` headers.
Matches the existing topic or header value against a regex.
Replaces the matching parts with a specified replacement string.
Sets the updated topic in the `kafka.OVERRIDE_TOPIC` header to override routing.
Example snippet:
```java
Pattern regexPattern = Pattern.compile(regex);
String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class);
if (topicName != null) {
    // The Matcher must be declared before it can be used for the replacement
    Matcher matcher = regexPattern.matcher(topicName);
    if (matcher.matches()) {
        ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", matcher.replaceFirst(replacement));
    }
}
```
TimestampRouter: Formats message timestamps and inserts them into Kafka topic names.
Supports the placeholders `$[topic]` and `$[timestamp]` within a topic format string.
Extracts the timestamp from a specified header, supporting multiple types (Long, Instant).
Formats the timestamp with a configured pattern in the UTC time zone.
Overrides the topic header with the formatted topic string to route messages accordingly.
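The header-based variant can be sketched as follows. This minimal sketch assumes headers are held in a `Map<String, Object>`. `TimestampRouterSketch` and its parameters are hypothetical.

It uses `java.time.DateTimeFormatter` in place of the `SimpleDateFormat` in the module's code. Common patterns such as `yyyyMMdd` format the same way in both APIs, but some pattern letters differ.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical sketch of TimestampRouter-style topic formatting.
final class TimestampRouterSketch {
    // Reads the timestamp header (epoch millis as Long, or an Instant), formats it in UTC,
    // substitutes $[topic] and $[timestamp], and sets kafka.OVERRIDE_TOPIC.
    static void route(Map<String, Object> headers, String topicFormat,
                      String timestampFormat, String timestampHeader) {
        Object raw = headers.get(timestampHeader);
        Instant instant;
        if (raw instanceof Long) {
            instant = Instant.ofEpochMilli((Long) raw);
        } else if (raw instanceof Instant) {
            instant = (Instant) raw;
        } else {
            return; // no usable timestamp: leave routing unchanged
        }
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern(timestampFormat).withZone(ZoneOffset.UTC);
        Object topic = headers.get("kafka.TOPIC");
        String updated = topicFormat
                .replace("$[topic]", topic != null ? topic.toString() : "")
                .replace("$[timestamp]", fmt.format(instant));
        headers.put("kafka.OVERRIDE_TOPIC", updated);
    }
}
```

For example, take topic `orders` with a timestamp of `1700000000000L` (2023-11-14T22:13:20Z). With the format `$[topic]_$[timestamp]` and the pattern `yyyyMMdd`, the override topic becomes `orders_20231114`.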
MessageTimestampRouter: Similar to `TimestampRouter`, but extracts timestamp values from JSON body fields.
Supports multiple candidate timestamp keys and timestamp input formats.
Parses timestamps from string or numeric values.
Applies the same topic format replacement logic to produce dynamic topic names.
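Body-based timestamp lookup can be sketched as follows. This minimal sketch makes three assumptions:

- Candidate keys are tried in order.
- Numbers are treated as epoch milliseconds.
- Strings are parsed with a single input pattern and interpreted in UTC.

The names `MessageTimestampSketch`, `findTimestamp`, and `formatTopic` are also hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of MessageTimestampRouter-style timestamp extraction.
final class MessageTimestampSketch {
    // Returns the first usable timestamp found under any candidate key.
    static Optional<Instant> findTimestamp(Map<String, Object> body, List<String> candidateKeys,
                                           String inputFormat) {
        DateTimeFormatter parser = DateTimeFormatter.ofPattern(inputFormat);
        for (String key : candidateKeys) {
            Object raw = body.get(key);
            if (raw instanceof Number) {
                return Optional.of(Instant.ofEpochMilli(((Number) raw).longValue()));
            }
            if (raw instanceof String) {
                try {
                    return Optional.of(LocalDateTime.parse((String) raw, parser).toInstant(ZoneOffset.UTC));
                } catch (DateTimeParseException e) {
                    // unparseable value: fall through to the next candidate key
                }
            }
        }
        return Optional.empty();
    }

    // Same $[topic]/$[timestamp] substitution as the header-based router, formatted in UTC.
    static String formatTopic(String topicFormat, String topic, Instant ts, String outputFormat) {
        DateTimeFormatter out = DateTimeFormatter.ofPattern(outputFormat).withZone(ZoneOffset.UTC);
        return topicFormat.replace("$[topic]", topic).replace("$[timestamp]", out.format(ts));
    }
}
```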
Module Interactions and Relationships
This transformation module interacts primarily with the Camel routing layer and Kafka producer components:
Camel Exchange: All processors operate on the Camel `Exchange` object, accessing and modifying message bodies and headers.
Kafka Message Headers: Processors like `RegexRouter` and the timestamp routers manipulate Kafka-specific headers such as `kafka.TOPIC` and `kafka.OVERRIDE_TOPIC` to influence downstream Kafka routing.
JSON Payloads: Most processors expect and manipulate JSON payloads represented as `JsonNode` objects, relying on Jackson for serialization and deserialization.
Integration with Kafka Producer: The transformed messages and headers are eventually passed to the `KafkaProducer`, which uses these headers (e.g., the overridden topic) and bodies to produce Kafka records.
Configuration via Exchange Properties: Many processors receive parameters via Camel exchange properties (e.g., fields to mask, regex patterns, timestamp formats), allowing flexible configuration without hardcoding.
Design Patterns and Approaches
Processor Interface: Many classes implement or act as Camel `Processor`s, so they can be plugged directly into Camel routes.
Use of Jackson Tree Model: JSON bodies are processed using Jackson's `JsonNode` hierarchy. Processors can check a node's type (object, array, or scalar) before casting and modifying it.
Dynamic Expression Resolution: For example, `InsertField` supports dynamic value resolution using Camel's Simple language expressions evaluated at runtime.
Immutable Transformation: Processors like `ReplaceField` produce new JSON nodes rather than mutating the existing body directly, which avoids side effects on the original payload.
Header-Based Routing Override: Topic routing processors use Kafka header overrides to redirect messages without altering route definitions.
Configurable Field Lists: Several processors accept comma-separated lists of field names, enabling selective field manipulation.
Code References Illustrating Key Concepts
InsertField: Dynamic Field Insertion or Array Append
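```java
// body: the JsonNode read from the exchange; resolvedValue: the already-evaluated value
switch (body.getNodeType()) {
    case ARRAY:
        // JSON array: append the value
        ((ArrayNode) body).add(resolvedValue);
        break;
    case OBJECT:
        // JSON object: insert (or overwrite) the named field
        ((ObjectNode) body).put(field, resolvedValue);
        break;
    default:
        // Other node types (e.g., text or number nodes) are not ObjectNodes,
        // so this cast throws a ClassCastException at runtime.
        ((ObjectNode) body).put(field, resolvedValue);
        break;
}
```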
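For comparison, the same dispatch can be sketched with the JDK alone. This sketch makes three assumptions:

- JSON objects are modeled as `Map` and JSON arrays as `List`.
- The value has already been resolved, so Simple expression evaluation is left out.
- `InsertFieldSketch` is a hypothetical name.

Unlike the snippet's default branch, it rejects scalar bodies with an explicit `IllegalArgumentException`. That is a choice made for this sketch, not the module's behavior.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of InsertField-style insert-or-append.
final class InsertFieldSketch {
    // `resolvedValue` is assumed to be already evaluated (e.g., from a Simple expression).
    @SuppressWarnings("unchecked")
    static void insertField(Object body, String field, Object resolvedValue) {
        if (body instanceof List) {
            ((List<Object>) body).add(resolvedValue);               // JSON array: append
        } else if (body instanceof Map) {
            ((Map<String, Object>) body).put(field, resolvedValue); // JSON object: insert/overwrite
        } else {
            // Sketch choice: fail with a clear message instead of a ClassCastException.
            throw new IllegalArgumentException("Body must be a JSON object or array");
        }
    }
}
```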
DropField: Conditional Field Removal
```java
if (body.getNodeType().equals(JsonNodeType.OBJECT)) {
    ((ObjectNode) body).remove(field);
    ex.getMessage().setBody(body);
}
```
RegexRouter: Topic Header Rewrite Using Regex
```java
Pattern regexPattern = Pattern.compile(regex);
String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class);
if (ObjectHelper.isNotEmpty(topicName)) {
    final Matcher matcher = regexPattern.matcher(topicName);
    if (matcher.matches()) {
        final String topicUpdated = matcher.replaceFirst(replacement);
        ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", topicUpdated);
    }
}
```
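The same rewrite can be sketched as a runnable example with headers modeled as a `Map<String, Object>`. Its main point is that the replacement string can reference capture groups, since `Matcher.replaceFirst` interprets `$1`-style group references. `RegexRouterSketch` is a hypothetical name, and the `ce-type` case is omitted.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of RegexRouter-style topic rewriting on a Map of headers.
final class RegexRouterSketch {
    // On a full match of the kafka.TOPIC header, stores the rewritten name in kafka.OVERRIDE_TOPIC.
    static void route(Map<String, Object> headers, String regex, String replacement) {
        Pattern pattern = Pattern.compile(regex);
        Object topic = headers.get("kafka.TOPIC");
        if (topic instanceof String && !((String) topic).isEmpty()) {
            Matcher matcher = pattern.matcher((String) topic);
            if (matcher.matches()) {
                // replaceFirst expands group references such as $1 in `replacement`
                headers.put("kafka.OVERRIDE_TOPIC", matcher.replaceFirst(replacement));
            }
        }
    }
}
```

For example, the regex `orders-(.*)` with replacement `archive-$1` turns topic `orders-eu` into `archive-eu`. A topic that does not fully match the regex gets no override header.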
TimestampRouter: Timestamp-Based Topic Formatting
```java
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
Long timestamp = ...; // extracted from header
if (ObjectHelper.isNotEmpty(timestamp)) {
    final String formattedTimestamp = fmt.format(new Date(timestamp));
    String updatedTopic = topicFormat
            .replace("$[topic]", topicName != null ? topicName : "")
            .replace("$[timestamp]", formattedTimestamp);
    ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
}
```
Mermaid Diagram: Message Transformation Processing Flow
```mermaid
flowchart TD
    Start[Start Message Processing] --> LoadBody[Load JSON Body from Exchange]
    LoadBody --> DecideTransform{Transformation Type?}
    DecideTransform -->|InsertField| Insert[Insert Field / Append to Array]
    DecideTransform -->|DropField| Drop[Remove Field from JSON Object]
    DecideTransform -->|ReplaceField| Replace[Filter, Rename, Replace Fields]
    DecideTransform -->|MaskField| Mask[Mask Sensitive Fields]
    DecideTransform -->|ExtractField| Extract[Extract Field to Body or Header]
    DecideTransform -->|HoistField| Hoist[Wrap Body inside Field]
    DecideTransform -->|ValueToKey| ValueKey[Extract Fields as Kafka Key]
    DecideTransform -->|RegexRouter| RegexRoute[Regex-Based Topic/Header Routing]
    DecideTransform -->|TimestampRouter| TimestampRoute[Timestamp-Based Topic Routing]
    DecideTransform -->|MessageTimestampRouter| MsgTimestampRoute[Body Timestamp-Based Routing]
    Insert --> UpdateExchange[Update Exchange with Transformed Body]
    Drop --> UpdateExchange
    Replace --> UpdateExchange
    Mask --> UpdateExchange
    Extract --> UpdateExchange
    Hoist --> UpdateExchange
    ValueKey --> UpdateExchangeWithKey[Set Kafka Key Header]
    RegexRoute --> UpdateHeaders[Set Kafka Override Topic Header]
    TimestampRoute --> UpdateHeaders
    MsgTimestampRoute --> UpdateHeaders
    UpdateExchange --> End[Continue Processing]
    UpdateExchangeWithKey --> End
    UpdateHeaders --> End
```
Summary
The **Message Transformation Utilities** module provides a set of processors that let Camel Kafka routes manipulate JSON messages and Kafka headers declaratively. It supports field-level JSON modifications and dynamic topic routing based on content or metadata. Together, these help keep Kafka message pipelines in Camel integrations flexible and maintainable.