Message Transformation Utilities

Overview

The **Message Transformation Utilities** module provides a suite of processors that manipulate Kafka message contents and headers within Apache Camel routes. The processors modify message bodies (primarily JSON structures) and route messages dynamically based on content or metadata, such as a message timestamp or a regex match against the topic name.

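This module exists to simplify common message transformation needs in Kafka integration pipelines, such as inserting, dropping, replacing, masking, extracting, and hoisting JSON fields, deriving a Kafka key from body fields, and overriding the destination topic from a regex or a timestamp.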

By encapsulating these transformations into reusable processors, the module promotes cleaner route definitions, better message governance, and enhanced routing flexibility.


Core Concepts and Purpose

Why Message Transformation Utilities?

Kafka messages often carry JSON payloads that need to be enriched, filtered, or routed differently based on business logic or compliance requirements. Directly manipulating these messages within Camel routes can be verbose and error-prone.

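This module addresses these challenges by providing well-defined processors that modify JSON bodies field by field or rewrite the Kafka topic header, so that routes can apply them as reusable steps.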


Key Functionalities and Workflows

The module organizes processors into two main categories:

1. JSON Field Manipulation Processors

These processors modify the JSON message bodies, supporting operations such as insertion, dropping, replacement, masking, extraction, and hoisting of fields.
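
As a rough illustration of three of these operations (masking, hoisting, and extraction), the sketch below uses a plain Map as a stand-in for a JSON object. The class name FieldTransformSketch and its method names are hypothetical and not part of the module; the actual processors are assumed to work on a JSON tree taken from the Camel exchange, and their exact masking rules may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical stand-ins for MaskField, HoistField and ExtractField.
 * A Map plays the role of a JSON object; this is not the module's code.
 */
final class FieldTransformSketch {

    /** Returns a copy of the body with the named field overwritten, if present. */
    static Map<String, Object> maskField(Map<String, Object> body, String field, Object replacement) {
        Map<String, Object> copy = new LinkedHashMap<>(body);
        if (copy.containsKey(field)) {
            copy.put(field, replacement);
        }
        return copy;
    }

    /** Wraps the whole body inside a single named field. */
    static Map<String, Object> hoistField(Map<String, Object> body, String field) {
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put(field, body);
        return wrapper;
    }

    /** Returns the value of one field, or null when the field is absent. */
    static Object extractField(Map<String, Object> body, String field) {
        return body.get(field);
    }

    public static void main(String[] args) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("id", 42);
        body.put("ssn", "123-45-6789");

        System.out.println(maskField(body, "ssn", "****"));
        System.out.println(hoistField(body, "payload"));
        System.out.println(extractField(body, "id"));
    }
}
```

The mask helper returns a copy so that the input is not mutated; whether the real processors mutate the body in place is not stated here.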


2. Dynamic Topic Routing Processors

These processors modify Kafka topic headers dynamically, enabling routing decisions based on message content or metadata.


Module Interactions and Relationships

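This transformation module interacts primarily with the Camel routing layer and Kafka producer components. Each processor reads and updates the message on the Camel exchange; the routing processors set the kafka.OVERRIDE_TOPIC header, which the Kafka producer uses as the destination topic.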


Design Patterns and Approaches


Code References Illustrating Key Concepts

InsertField: Dynamic Field Insertion or Array Append

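switch (body.getNodeType()) {
    case ARRAY:
        // Array body: append the resolved value as a new element.
        ((ArrayNode) body).add(resolvedValue);
        break;
    case OBJECT:
        // Object body: set (or overwrite) the named field.
        ((ObjectNode) body).put(field, resolvedValue);
        break;
    default:
        // Any other node type is also cast to ObjectNode, so a scalar body
        // (text, number, boolean, null) fails here with a ClassCastException.
        ((ObjectNode) body).put(field, resolvedValue);
        break;
}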

DropField: Conditional Field Removal

// Only JSON objects have named fields; any other body is left untouched.
if (body.getNodeType().equals(JsonNodeType.OBJECT)) {
    ((ObjectNode) body).remove(field);
    ex.getMessage().setBody(body);
}

RegexRouter: Topic Header Rewrite Using Regex

Pattern regexPattern = Pattern.compile(regex);
String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class);
if (ObjectHelper.isNotEmpty(topicName)) {
    final Matcher matcher = regexPattern.matcher(topicName);
    // matches() succeeds only when the regex covers the entire topic name.
    if (matcher.matches()) {
        final String topicUpdated = matcher.replaceFirst(replacement);
        ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", topicUpdated);
    }
}
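
To make the rewrite rule concrete, the sketch below isolates the same regex logic in a standalone helper that runs without Camel. The class RegexTopicRewriteSketch, the method rewriteTopic, and the sample topic names are illustrative assumptions; only the matches() and replaceFirst() sequence is taken from the snippet above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper mirroring the RegexRouter logic outside of Camel. */
final class RegexTopicRewriteSketch {

    /**
     * Returns the rewritten topic when the regex matches the whole topic name,
     * or Optional.empty() when no override should be set.
     */
    static Optional<String> rewriteTopic(String topicName, String regex, String replacement) {
        if (topicName == null || topicName.isEmpty()) {
            return Optional.empty();
        }
        Matcher matcher = Pattern.compile(regex).matcher(topicName);
        // A partial match is not enough: matches() must cover the full name.
        if (!matcher.matches()) {
            return Optional.empty();
        }
        // Capture groups can be referenced in the replacement as $1, $2, ...
        return Optional.of(matcher.replaceFirst(replacement));
    }

    public static void main(String[] args) {
        System.out.println(rewriteTopic("orders-eu", "(.*)-eu", "$1-archive"));
        System.out.println(rewriteTopic("orders-us", "(.*)-eu", "$1-archive"));
    }
}
```

With these sample inputs, orders-eu is rewritten to orders-archive, while orders-us yields no override because the pattern does not match the full name.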

TimestampRouter: Timestamp-Based Topic Formatting

final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

Long timestamp = ...; // extracted from header
if (ObjectHelper.isNotEmpty(timestamp)) {
    final String formattedTimestamp = fmt.format(new Date(timestamp));
    String updatedTopic = topicFormat
        .replace("$[topic]", topicName != null ? topicName : "")
        .replace("$[timestamp]", formattedTimestamp);
    ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
}
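
The placeholder substitution can be tried in isolation with the sketch below. The class TimestampTopicSketch, the method formatTopic, and the sample format strings are illustrative assumptions; the $[topic] and $[timestamp] placeholders and the UTC time zone follow the snippet above. Passing Locale.ROOT is an addition made here so that the digits do not depend on the default locale.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper mirroring the TimestampRouter formatting outside of Camel. */
final class TimestampTopicSketch {

    /** Fills the $[topic] and $[timestamp] placeholders of a topic format. */
    static String formatTopic(String topicFormat, String timestampFormat,
                              String topicName, long epochMillis) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        // Locale.ROOT is an assumption of this sketch, not taken from the module.
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat, Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        String formattedTimestamp = fmt.format(new Date(epochMillis));
        return topicFormat
            .replace("$[topic]", topicName != null ? topicName : "")
            .replace("$[timestamp]", formattedTimestamp);
    }

    public static void main(String[] args) {
        // 1700000000000 ms since the epoch falls on 2023-11-14 in UTC.
        System.out.println(formatTopic("$[topic]-$[timestamp]", "yyyyMMdd", "orders", 1700000000000L));
    }
}
```

A daily pattern such as yyyyMMdd yields one topic per UTC day; a coarser pattern such as yyyyMM would group messages by month.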

Mermaid Diagram: Message Transformation Processing Flow

flowchart TD
    Start[Start Message Processing] --> LoadBody[Load JSON Body from Exchange]
    LoadBody --> DecideTransform{Transformation Type?}
    DecideTransform -->|InsertField| Insert[Insert Field / Append to Array]
    DecideTransform -->|DropField| Drop[Remove Field from JSON Object]
    DecideTransform -->|ReplaceField| Replace[Filter, Rename, Replace Fields]
    DecideTransform -->|MaskField| Mask[Mask Sensitive Fields]
    DecideTransform -->|ExtractField| Extract[Extract Field to Body or Header]
    DecideTransform -->|HoistField| Hoist[Wrap Body inside Field]
    DecideTransform -->|ValueToKey| ValueKey[Extract Fields as Kafka Key]
    DecideTransform -->|RegexRouter| RegexRoute[Regex-Based Topic/Header Routing]
    DecideTransform -->|TimestampRouter| TimestampRoute[Timestamp-Based Topic Routing]
    DecideTransform -->|MessageTimestampRouter| MsgTimestampRoute[Body Timestamp-Based Routing]
    Insert --> UpdateExchange[Update Exchange with Transformed Body]
    Drop --> UpdateExchange
    Replace --> UpdateExchange
    Mask --> UpdateExchange
    Extract --> UpdateExchange
    Hoist --> UpdateExchange
    ValueKey --> UpdateExchangeWithKey[Set Kafka Key Header]
    RegexRoute --> UpdateHeaders[Set Kafka Override Topic Header]
    TimestampRoute --> UpdateHeaders
    MsgTimestampRoute --> UpdateHeaders
    UpdateExchange --> End[Continue Processing]
    UpdateExchangeWithKey --> End
    UpdateHeaders --> End

Summary

The **Message Transformation Utilities** module provides processors that let Camel Kafka routes manipulate JSON messages and Kafka headers declaratively. Field-level JSON modifications and dynamic topic routing based on content or metadata help keep Kafka message pipelines in Camel integrations flexible and maintainable.