Header Propagation
Purpose
Header Propagation copies Camel message headers into Kafka record headers when a message is produced. This preserves the metadata and contextual information carried in a Camel exchange so that it travels with the record through Kafka topics. Headers are filtered and serialized before they are attached, so only relevant headers with supported value types are propagated.
Functionality
The Header Propagation process primarily involves:
Filtering Headers: Using a configurable `HeaderFilterStrategy` to exclude headers that should not be propagated to Kafka. This prevents leaking internal Camel headers or unsupported metadata.
Serialization: Converting Camel header values into byte arrays suitable for Kafka headers. This is handled by a `KafkaHeaderSerializer` implementation, which by default supports common types like `String`, `Integer`, `Long`, `Double`, `Boolean`, and `byte[]`. For other types, it attempts conversion via Camel’s type converters.
Header Construction: Creating Kafka `RecordHeader` instances for each filtered and serialized header key-value pair to be attached to Kafka producer records.
Within the [KafkaProducer](/projects/289/68521) class, the method `getPropagatedHeaders` orchestrates this workflow:
```java
public List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
    Map<String, Object> messageHeaders = message.getHeaders();
    List<Header> propagatedHeaders = new ArrayList<>(messageHeaders.size());
    for (Map.Entry<String, Object> header : messageHeaders.entrySet()) {
        // A null result means the header was filtered out or could not be serialized.
        RecordHeader recordHeader = getRecordHeader(header, exchange);
        if (recordHeader != null) {
            propagatedHeaders.add(recordHeader);
        }
    }
    return propagatedHeaders;
}
```
Here, each header is passed to `getRecordHeader`, which applies filtering and serialization:
```java
private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, Exchange exchange) {
    final HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
    final String key = entry.getKey();
    final Object value = entry.getValue();
    // Despite its name, shouldBeFiltered must return true for headers that are kept:
    // only this branch serializes the value and builds a RecordHeader.
    if (shouldBeFiltered(key, value, exchange, headerFilterStrategy)) {
        final KafkaHeaderSerializer headerSerializer = configuration.getHeaderSerializer();
        final byte[] headerValue = headerSerializer.serialize(key, value);
        if (headerValue == null) {
            // The serializer could not handle this value, so the header is skipped.
            return null;
        }
        return new RecordHeader(key, headerValue);
    }
    // The header was excluded by the filter strategy.
    return null;
}
```
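To make the filter-then-serialize flow of the two methods above easier to try in isolation, here is a minimal sketch that needs neither Camel nor Kafka on the classpath. Everything in it is an assumption made for illustration: `PropagationSketch`, the `keep` predicate (standing in for the filter strategy), the `serializer` function, and the rule that drops header names starting with `Camel` are hypothetical, and a plain `Map.Entry` takes the place of `RecordHeader`.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Stand-alone sketch: every type here is a hypothetical stand-in, not a Camel or Kafka class.
final class PropagationSketch {

    // keep: true when a header may be propagated (the role the filter strategy plays).
    // serializer: bytes for a value, or null when the value cannot be serialized.
    static List<Map.Entry<String, byte[]>> propagate(
            Map<String, Object> headers,
            BiPredicate<String, Object> keep,
            BiFunction<String, Object, byte[]> serializer) {
        List<Map.Entry<String, byte[]>> result = new ArrayList<>(headers.size());
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            if (!keep.test(header.getKey(), header.getValue())) {
                continue; // excluded by the filter
            }
            byte[] bytes = serializer.apply(header.getKey(), header.getValue());
            if (bytes != null) { // unserializable values are skipped, not treated as errors
                result.add(new AbstractMap.SimpleImmutableEntry<>(header.getKey(), bytes));
            }
        }
        return result;
    }

    // Example input: one internal-looking header, one plain string, one unsupported object.
    static List<Map.Entry<String, byte[]>> demo() {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelFileName", "in.txt");
        headers.put("traceId", "abc-123");
        headers.put("payloadObject", new Object());
        return propagate(
                headers,
                (key, value) -> !key.startsWith("Camel"),
                (key, value) -> value instanceof String
                        ? ((String) value).getBytes(StandardCharsets.UTF_8)
                        : null);
    }
}
```

With this input, `demo()` keeps only `traceId`: `CamelFileName` is rejected by the predicate, and `payloadObject` is dropped because the serializer returns `null` for it.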
The default serializer implementation (`DefaultKafkaHeaderSerializer`) converts supported types as follows:
Converts strings directly to UTF-8 bytes.
Serializes primitives like integers and longs to their byte representations.
Converts booleans to string bytes.
Supports raw byte arrays as is.
Attempts fallback conversion via Camel’s type converters.
If a header value is of an unsupported type or fails conversion, it is skipped with a debug log.
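The type rules listed above can be approximated in a few lines. The following is a sketch under stated assumptions, not the source of `DefaultKafkaHeaderSerializer`: the class name `HeaderValueSketch` is hypothetical, numeric values are written with `ByteBuffer` in its default big-endian order (an assumption about the byte layout), and the fallback through Camel’s type converters is left out so the example stays dependency-free.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch; it mirrors the listed rules but is not Camel's class.
final class HeaderValueSketch {

    // Returns the byte form of a header value, or null when the type is unsupported.
    static byte[] serialize(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
        } else if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
        } else if (value instanceof Double) {
            return ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
        } else if (value instanceof Boolean) {
            // Booleans are written as the text "true" or "false".
            return value.toString().getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof byte[]) {
            return (byte[]) value; // raw bytes pass through unchanged
        }
        return null; // the caller is expected to skip this header
    }

    // Consumer-side counterpart for a header that was written from a Long.
    static long readLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

A practical consequence of this design is that the type is not carried in the header itself: a consumer has to know that a given key holds, say, an eight-byte long before calling something like `readLong` on it.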
Integration
Header Propagation is a critical subpart of the Kafka message production flow. It complements other subtopics such as:
Asynchronous Sending: Headers are propagated regardless of synchronous or asynchronous sending modes, providing consistent metadata across message dispatch methods.
Transactional Sending: Headers are included within messages sent inside Kafka transactions, ensuring atomicity does not lose header information.
Because header filtering and serialization are kept separate from the core send logic, both strategies can be customized without changing the producer workflow.
This subtopic introduces the pluggable header serializer and filter mechanism, which is not detailed in the parent topic or other subtopics, enabling users to define how headers are transformed and which headers are propagated to Kafka.
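As an illustration of what a pluggable serializer might look like, the sketch below writes every non-null value as UTF-8 text, which can be easier for consumers written in other languages to decode. The interface `HeaderSerializerSketch` is a hypothetical stand-in that only copies the `serialize(key, value)` method shape seen in `getRecordHeader`; in a real route the class would presumably implement Camel’s `KafkaHeaderSerializer` instead and be supplied through the component configuration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in with the same method shape as the serialize(key, value) call above.
interface HeaderSerializerSketch {
    byte[] serialize(String key, Object value);
}

// Custom strategy: write every non-null value as UTF-8 text.
final class ToStringHeaderSerializer implements HeaderSerializerSketch {
    @Override
    public byte[] serialize(String key, Object value) {
        if (value == null) {
            return null; // nothing to propagate, so the header is skipped
        }
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
}
```

The trade-off in this sketch is that numeric headers lose their compact binary form, and the consumer must parse the text back into a number.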
Diagram
```mermaid
flowchart TD
    A[Start: Produce Kafka Message] --> B[Retrieve Camel Message Headers]
    B --> C[Take Next Header]
    C --> D{Passes HeaderFilterStrategy?}
    D -- No --> G[Skip Header]
    D -- Yes --> E[Serialize Value with KafkaHeaderSerializer]
    E --> F{Serialized Value Is Null?}
    F -- Yes --> G
    F -- No --> H[Create Kafka RecordHeader]
    H --> I[Add to Propagated Headers List]
    I --> J{More Headers?}
    G --> J
    J -- Yes --> C
    J -- No --> K[Attach Headers to ProducerRecord]
    K --> L[Send Message to Kafka]
```
This flowchart illustrates the key steps in header propagation during Kafka message production:
Extract headers from the Camel message.
Filter headers using the configured strategy to exclude unwanted headers.
Serialize header values to byte arrays.
Convert serialized headers into Kafka `RecordHeader` objects.
Attach the resulting headers to the Kafka producer record before sending.
This focused explanation highlights the purpose and mechanics of header propagation as a modular part of Kafka message production, emphasizing its flexibility and extensibility through configurable filters and serializers.