ValueToKey.java


Overview

`ValueToKey.java` is a processor utility class within the Apache Camel Kafka integration framework. It extracts specified fields from a JSON message body, composes a Kafka message key from them, and sets that key as the Camel message header `kafka.KEY`.

This transformation facilitates customized Kafka key creation based on selected fields from the message payload, enabling downstream Kafka producer components to perform key-based partitioning, routing, or message grouping.


Class: ValueToKey

Description

The `ValueToKey` class exposes one public method, `process`, which reads a JSON message body from the Camel `Exchange`, extracts the fields named by the user, and sets them as a composite key on the Kafka message header. A package-private helper, `filterNames`, decides which fields are included.

Package

org.apache.camel.component.kafka.transform

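Dependencies

  • Apache Camel: `Exchange`, `@ExchangeProperty`, `InvalidPayloadException`

  • Jackson: `ObjectMapper`, `JsonNode`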


Methods

process

public void process(@ExchangeProperty("fields") String fields, Exchange ex) throws InvalidPayloadException

Description

Extracts JSON fields specified by the `fields` property from the Camel message body and sets them as the Kafka message key header (`kafka.KEY`).

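Parameters

  • `fields`: comma-separated list of field names to extract, bound from the `"fields"` exchange property.

  • `ex`: the Camel `Exchange` whose message body is read and on whose message the `kafka.KEY` header is set.

Returns

Nothing. The method is `void`; its result is the `kafka.KEY` header set on the message.

Throws

`InvalidPayloadException`, as declared in the method signature.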

Detailed Behavior

  1. Initializes an empty list `splittedFields`.

  2. Uses a Jackson `ObjectMapper` to convert the message body, expected to be a `JsonNode`, into a `Map<Object,Object>` of the JSON object's fields.

  3. If the `fields` string is not empty, splits it on commas to populate `splittedFields` with the field names. Otherwise the list stays empty and no field is selected.

  4. Iterates over all entries in the JSON body map:

    • For each entry, uses `filterNames` to check whether the key (field name) is in the selected fields list.

    • If the field name matches, adds the field and its value to a new map, `key`.

  5. Sets the `key` map as the `kafka.KEY` header on the message.
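
The steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the class's actual source: a `Map<String, Object>` stands in for the map Jackson would produce from the `JsonNode` body, the names `ValueToKeySketch` and `selectFields` are hypothetical, and the returned map is what `process` would set as the `kafka.KEY` header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the field-selection logic described above.
// No Camel or Jackson types are used; the body is passed in as a Map.
public class ValueToKeySketch {

    // Returns the subset of body entries whose names appear in the
    // comma-separated fields string.
    public static Map<String, Object> selectFields(Map<String, Object> body, String fields) {
        // Step 1: start with an empty list of selected names.
        List<String> splittedFields = new ArrayList<>();

        // Step 3: split only when a non-empty fields string was supplied.
        if (fields != null && !fields.isEmpty()) {
            splittedFields = Arrays.asList(fields.split(","));
        }

        // Step 4: copy matching entries into the key map.
        Map<String, Object> key = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : body.entrySet()) {
            if (filterNames(entry.getKey(), splittedFields)) {
                key.put(entry.getKey(), entry.getValue());
            }
        }
        // Step 5 in the real class would set this map as the kafka.KEY header.
        return key;
    }

    // Assumed to be a plain membership check, as the description suggests.
    static boolean filterNames(String fieldName, List<String> splittedFields) {
        return splittedFields.contains(fieldName);
    }

    public static void main(String[] args) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("userId", 123);
        body.put("orderId", "A456");
        body.put("amount", 29.99);
        body.put("status", "confirmed");

        System.out.println(selectFields(body, "userId,orderId")); // {userId=123, orderId=A456}
        System.out.println(selectFields(body, ""));               // {}
    }
}
```

In this sketch an empty or missing `fields` value leaves the list empty, so the resulting key is an empty map rather than `null`.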

Usage Example

Suppose the message body JSON is:

{
  "userId": 123,
  "orderId": "A456",
  "amount": 29.99,
  "status": "confirmed"
}

If the exchange property `"fields"` is `"userId,orderId"`, processing has the same effect as:

Map<Object,Object> key = new HashMap<>();
key.put("userId", 123);
key.put("orderId", "A456");
ex.getMessage().setHeader("kafka.KEY", key);

This allows Kafka to use the combination of `userId` and `orderId` fields as the message key for partitioning.
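
To illustrate why a stable key matters for partitioning, here is a conceptual sketch. It assumes the key map is serialized to the same bytes each time (the serializer is outside the scope of this class) and uses `Arrays.hashCode` as a stand-in hash. Kafka's own default partitioner hashes the serialized key bytes with murmur2, which is not reproduced here. The names `KeyPartitionSketch` and `partitionFor` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Conceptual illustration only; Kafka's own partitioner is not reproduced here.
public class KeyPartitionSketch {

    // Maps a serialized key to a partition index in [0, numPartitions).
    // Arrays.hashCode is a stand-in for the hash Kafka applies to the key bytes.
    public static int partitionFor(String serializedKey, int numPartitions) {
        byte[] keyBytes = serializedKey.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // mask keeps the value non-negative
    }

    public static void main(String[] args) {
        String keyA = "{\"userId\":123,\"orderId\":\"A456\"}";
        String keyB = "{\"userId\":123,\"orderId\":\"A456\"}";

        // Equal serialized keys always land on the same partition.
        System.out.println(partitionFor(keyA, 6) == partitionFor(keyB, 6)); // true
    }
}
```

The point of the sketch is only that the partition is a deterministic function of the key bytes, so messages sharing `userId` and `orderId` values stay together.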


filterNames

boolean filterNames(String fieldName, List<String> splittedFields)

Description

Helper method to determine if a given field name should be included in the Kafka key extraction.

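Parameters

  • `fieldName`: the name of a field from the JSON body.

  • `splittedFields`: the field names selected through the `fields` property.

Returns

`true` if the field should be included in the key, `false` otherwise.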

Usage

This method is used internally in the `process` method to filter fields based on the configured list.
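
A small sketch of how such a filter might behave, assuming it is a plain `List.contains` check and that the configured string is split with `String.split(",")` without trimming (the documentation states neither explicitly). The class name `FilterNamesSketch` is hypothetical. Under those assumptions, a space after a comma becomes part of the field name and prevents a match.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not the actual ValueToKey source.
public class FilterNamesSketch {

    // Assumed behavior: exact, case-sensitive membership in the list.
    static boolean filterNames(String fieldName, List<String> splittedFields) {
        return splittedFields.contains(fieldName);
    }

    public static void main(String[] args) {
        List<String> exact = Arrays.asList("userId,orderId".split(","));
        List<String> spaced = Arrays.asList("userId, orderId".split(","));

        System.out.println(filterNames("orderId", exact));  // true
        System.out.println(filterNames("orderId", spaced)); // false: the entry is " orderId"
        System.out.println(filterNames("userid", exact));   // false: comparison is case-sensitive
    }
}
```

If the real implementation behaves this way, writing the `fields` value without spaces around the commas avoids silently dropped fields.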


Implementation Details and Algorithm


Interaction with the System


Visual Diagram: Class Structure

classDiagram
    class ValueToKey {
        +void process(String fields, Exchange ex) throws InvalidPayloadException
        ~boolean filterNames(String fieldName, List~String~ splittedFields)
    }

Summary

The `ValueToKey` class is a concise utility processor designed to extract specific fields from a JSON message body and use those fields as the Kafka message key. This allows flexible key construction based on message content, enabling efficient Kafka partitioning and routing. It integrates with Camel Kafka routes by leveraging exchange properties and headers and uses Jackson for JSON manipulation.

Its narrow scope makes it easy to combine with other steps in the broader Message Transformation Utilities module when a route needs precise control over the Kafka key.


Example Usage in Camel Route (Conceptual)

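from("direct:start")
    // process() takes an @ExchangeProperty argument, so it is invoked as a bean method
    .bean(new ValueToKey(), "process")
    .to("kafka:my-topic");

This assumes the route configuration or an earlier step sets the `"fields"` exchange property to `"userId,orderId"` and that the message body is already a `JsonNode`.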


Additional Notes

