ValueToKey.java
Overview
`ValueToKey.java` is a processor utility class within the Apache Camel Kafka integration framework. Its primary purpose is to extract specified fields from a JSON message body and use those fields to compose a Kafka message key. The constructed key is then set on the Camel message header `kafka.KEY`.
This transformation facilitates customized Kafka key creation based on selected fields from the message payload, enabling downstream Kafka producer components to perform key-based partitioning, routing, or message grouping.
Class: ValueToKey
Description
The `ValueToKey` class exposes one entry point, `process`, which reads a JSON message body from the Camel `Exchange`, extracts the fields named by the user, and sets them as a composite key on the Kafka message header. A small helper, `filterNames`, supports it.
Package
org.apache.camel.component.kafka.transform
Dependencies
- Jackson: `ObjectMapper` and `JsonNode` for JSON parsing and conversion.
- Apache Camel: `Exchange`, `ExchangeProperty`, `InvalidPayloadException`.
- Utility class: `ObjectHelper` from Camel for null/empty checks.
- Java collections and streams.
Methods
process
public void process(@ExchangeProperty("fields") String fields, Exchange ex) throws InvalidPayloadException
Description
Extracts JSON fields specified by the `fields` property from the Camel message body and sets them as the Kafka message key header (`kafka.KEY`).
Parameters
- `@ExchangeProperty("fields") String fields`: A comma-separated string listing the field names to extract from the JSON body.
- `Exchange ex`: The Apache Camel `Exchange` object representing the current message exchange.
Returns
void
The method modifies the exchange in-place by setting a message header.
Throws
InvalidPayloadException
If the message body cannot be converted properly or is invalid for the operation.
Detailed Behavior
- Initializes an empty list `splittedFields`.
- Uses Jackson `ObjectMapper` to convert the message body, expected as a `JsonNode`, into a `Map<Object,Object>` representing the JSON object fields.
- If the `fields` string is not empty, it splits the string by commas to create a list of field names.
- Iterates over all entries in the JSON body map:
  - For each entry, checks if the key (field name) is contained in the selected fields list using `filterNames`.
  - If the field name matches, the field and its value are added to a new map `key`.
- Sets the constructed `key` map as the `kafka.KEY` header on the message.
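The steps above can be sketched in plain Java without Camel or Jackson on the classpath. This is only an illustration under stated assumptions: `KeyExtractionSketch` and `extractKey` are hypothetical names, the body is assumed to have already been converted to a `Map`, and a simple null/empty check stands in for Camel's `ObjectHelper`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the extraction step; not the Camel class itself.
class KeyExtractionSketch {

    // Mirrors the documented check: exact membership in the requested names.
    static boolean filterNames(String fieldName, List<String> splittedFields) {
        return splittedFields.contains(fieldName);
    }

    // body: the JSON object as a Map (assumption: the Jackson conversion already happened).
    // fields: comma-separated field names; may be null or empty.
    static Map<Object, Object> extractKey(Map<Object, Object> body, String fields) {
        List<String> splittedFields = new ArrayList<>();
        if (fields != null && !fields.isEmpty()) {
            splittedFields = Arrays.asList(fields.split(","));
        }
        // LinkedHashMap is used here only to make the printed order predictable.
        Map<Object, Object> key = new LinkedHashMap<>();
        for (Map.Entry<Object, Object> entry : body.entrySet()) {
            String name = String.valueOf(entry.getKey());
            if (filterNames(name, splittedFields)) {
                key.put(name, entry.getValue());
            }
        }
        return key;
    }

    public static void main(String[] args) {
        Map<Object, Object> body = new LinkedHashMap<>();
        body.put("userId", 123);
        body.put("orderId", "A456");
        body.put("amount", 29.99);
        body.put("status", "confirmed");
        System.out.println(extractKey(body, "userId,orderId")); // {userId=123, orderId=A456}
    }
}
```

In this sketch an empty or null `fields` value yields an empty key map, which follows from the description: the list of names stays empty, so no entry passes the filter.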
Usage Example
Suppose the message body JSON is:
{
  "userId": 123,
  "orderId": "A456",
  "amount": 29.99,
  "status": "confirmed"
}
If the exchange property `fields` is `"userId,orderId"`, then after processing the `kafka.KEY` header holds a map equivalent to:
Map<Object,Object> key = new HashMap<>();
key.put("userId", 123);
key.put("orderId", "A456");
ex.getMessage().setHeader("kafka.KEY", key);
This allows Kafka to use the combination of `userId` and `orderId` fields as the message key for partitioning.
filterNames
boolean filterNames(String fieldName, List<String> splittedFields)
Description
Helper method to determine if a given field name should be included in the Kafka key extraction.
Parameters
- `String fieldName`: The current field name from the JSON body.
- `List<String> splittedFields`: The list of fields to extract.
Returns
`true` if `fieldName` is present in `splittedFields`, otherwise `false`.
Usage
This method is used internally in the `process` method to filter fields based on the configured list.
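One consequence of a plain membership check is worth seeing in code. The sketch below assumes, as the description suggests, that the `fields` string is split on commas with no trimming or case folding; `FilterNamesSketch` is a hypothetical name used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo of exact-match filtering; not the Camel class itself.
class FilterNamesSketch {

    static boolean filterNames(String fieldName, List<String> splittedFields) {
        return splittedFields.contains(fieldName);
    }

    public static void main(String[] args) {
        List<String> compact = Arrays.asList("userId,orderId".split(","));
        List<String> spaced = Arrays.asList("userId, orderId".split(","));

        System.out.println(filterNames("orderId", compact)); // true
        System.out.println(filterNames("orderId", spaced));  // false: the element is " orderId"
        System.out.println(filterNames("orderid", compact)); // false: comparison is case-sensitive
    }
}
```

Under these assumptions, a `fields` value such as `"userId, orderId"` would silently drop `orderId` from the key, so it is safer to write the list without spaces.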
Implementation Details and Algorithm
- Uses Jackson's `ObjectMapper` for robust conversion from `JsonNode` to a Java `Map`. This approach handles any JSON object structure flexibly.
- Splits the input `fields` string by commas to allow multiple field extraction.
- Iterates through the message body map entries rather than directly manipulating JSON nodes, simplifying the extraction logic.
- Constructs a new map containing only the specified fields as the Kafka key.
- Sets the result on the message header `kafka.KEY`, which is recognized by Camel Kafka components to set the Kafka record key.
Interaction with the System
- The `ValueToKey` class operates as a Camel processor in Kafka integration routes.
- It expects the message body to be JSON and the list of target fields to be provided as an exchange property named `"fields"`.
- By setting the `kafka.KEY` header, it influences the Kafka producer's message key, which is critical for Kafka's partitioning and message ordering semantics.
- It is typically used in routes before the Kafka producer endpoint, enabling customized key extraction logic without modifying the route definition extensively.
- Works cohesively with other transformation utilities in the `org.apache.camel.component.kafka.transform` package that manipulate JSON bodies or headers.
Visual Diagram: Class Structure
classDiagram
class ValueToKey {
+void process(String fields, Exchange ex) throws InvalidPayloadException
+boolean filterNames(String fieldName, List~String~ splittedFields)
}
- The class contains two methods: `process` (public, primary business logic) and `filterNames` (helper; its signature above is shown without an access modifier).
- There are no fields or properties; the class operates statelessly.
Summary
The `ValueToKey` class is a concise utility processor designed to extract specific fields from a JSON message body and use those fields as the Kafka message key. This allows flexible key construction based on message content, enabling efficient Kafka partitioning and routing. It integrates with Camel Kafka routes by leveraging exchange properties and headers and uses Jackson for JSON manipulation.
Its simplicity and focused functionality make it a valuable component in the broader Message Transformation Utilities module, particularly when precise Kafka key control is required.
Example Usage in Camel Route (Conceptual)
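from("direct:start")
    .setProperty("fields", constant("userId,orderId"))
    .bean(new ValueToKey(), "process")
    .to("kafka:my-topic");
Here the `fields` exchange property is set to `"userId,orderId"` before the call. Because `process` takes an `@ExchangeProperty`-annotated parameter rather than the single `Exchange` argument of Camel's `Processor` interface, this conceptual route invokes it through bean binding (`.bean(...)`) instead of `.process(...)`.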
Additional Notes
- The class assumes the message body is a JSON object; if the body is not a JSON object or is malformed, an exception may be thrown.
- The key header is a `Map<Object,Object>`; downstream Kafka components need to serialize this map appropriately or further process it.
- The method does not transform or alter the message body; it only sets headers.
- No caching or state is maintained; the class is thread-safe.
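Since the header value is a map, a later step may need to turn it into something a Kafka key serializer accepts. As one possible approach, the sketch below flattens the map into a deterministic string. The helper name `toStringKey`, the `name=value` layout, the `|` delimiter, and the sort by field name are all assumptions made for illustration; none of them come from `ValueToKey` or from Camel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical post-processing step; not part of ValueToKey.
class KeyFlattenSketch {

    // Joins entries as name=value pairs, sorted by field name so that
    // the same set of fields always produces the same string.
    static String toStringKey(Map<Object, Object> key) {
        Map<String, Object> sorted = new TreeMap<>();
        for (Map.Entry<Object, Object> e : key.entrySet()) {
            sorted.put(String.valueOf(e.getKey()), e.getValue());
        }
        return sorted.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        Map<Object, Object> key = new HashMap<>();
        key.put("userId", 123);
        key.put("orderId", "A456");
        System.out.println(toStringKey(key)); // orderId=A456|userId=123
    }
}
```

Sorting matters because records that should share a partition need byte-identical keys, and a `HashMap` gives no ordering guarantee. This sketch does no escaping, so values containing `|` or `=` would make the result ambiguous.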