Header Propagation

Purpose

Header Propagation copies Camel message headers into Kafka record headers when a message is produced, so that metadata and context carried in a Camel exchange are preserved across a Kafka topic. Headers are filtered and then serialized, so only relevant headers with supported value types reach the Kafka producer record.

Functionality

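The Header Propagation process primarily involves three steps: filtering the Camel headers with the configured `HeaderFilterStrategy`, serializing each remaining value to a byte array with the configured `KafkaHeaderSerializer`, and wrapping the result in a Kafka `RecordHeader`.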

Within the [KafkaProducer](/projects/289/68521) class, the method `getPropagatedHeaders` orchestrates this workflow:

```java
public List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
    Map<String, Object> messageHeaders = message.getHeaders();
    List<Header> propagatedHeaders = new ArrayList<>(messageHeaders.size());

    for (Map.Entry<String, Object> header : messageHeaders.entrySet()) {
        RecordHeader recordHeader = getRecordHeader(header, exchange);
        if (recordHeader != null) {
            propagatedHeaders.add(recordHeader);
        }
    }

    return propagatedHeaders;
}
```

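Each header is passed to `getRecordHeader`, which applies filtering and then serialization. As written, a header is serialized only when `shouldBeFiltered` returns true, so despite its name the method appears to report headers that passed the `HeaderFilterStrategy` rather than headers to drop. A header is also dropped when the serializer returns `null`: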

```java
private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, Exchange exchange) {
    final HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
    final String key = entry.getKey();
    final Object value = entry.getValue();

    if (shouldBeFiltered(key, value, exchange, headerFilterStrategy)) {
        final KafkaHeaderSerializer headerSerializer = configuration.getHeaderSerializer();
        final byte[] headerValue = headerSerializer.serialize(key, value);

        if (headerValue == null) {
            return null;
        }
        return new RecordHeader(key, headerValue);
    }
    return null;
}
```
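The two methods above can be approximated without any Camel or Kafka dependency. The following is a minimal sketch, not the project's code: `HeaderPropagationSketch`, `SimpleHeader`, `propagate`, and `toHeader` are hypothetical names, the filter is modelled as a plain predicate that returns true for headers to exclude, and the serializer is modelled as a function that returns `null` for values it cannot convert.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Stand-in sketch: none of these types are the real Camel or Kafka classes.
final class HeaderPropagationSketch {

    // Hypothetical replacement for Kafka's RecordHeader: a key plus raw bytes.
    static final class SimpleHeader {
        final String key;
        final byte[] value;

        SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Same shape as getPropagatedHeaders: visit every header, keep non-null results.
    static List<SimpleHeader> propagate(Map<String, Object> headers,
                                        BiPredicate<String, Object> excluded,
                                        BiFunction<String, Object, byte[]> serializer) {
        List<SimpleHeader> result = new ArrayList<>(headers.size());
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            SimpleHeader header = toHeader(entry.getKey(), entry.getValue(), excluded, serializer);
            if (header != null) {
                result.add(header);
            }
        }
        return result;
    }

    // Same shape as getRecordHeader: excluded headers and unserializable values yield null.
    static SimpleHeader toHeader(String key, Object value,
                                 BiPredicate<String, Object> excluded,
                                 BiFunction<String, Object, byte[]> serializer) {
        if (excluded.test(key, value)) {
            return null;
        }
        byte[] bytes = serializer.apply(key, value);
        return bytes == null ? null : new SimpleHeader(key, bytes);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelFileName", "report.csv"); // dropped by the filter
        headers.put("orderId", "42");               // propagated
        headers.put("handle", new Object());        // serializer returns null, so skipped

        List<SimpleHeader> out = propagate(
                headers,
                (key, value) -> key.startsWith("Camel"),
                (key, value) -> value instanceof String
                        ? ((String) value).getBytes(StandardCharsets.UTF_8)
                        : null);

        for (SimpleHeader header : out) {
            System.out.println(header.key + " = " + new String(header.value, StandardCharsets.UTF_8));
        }
        // prints: orderId = 42
    }
}
```

Returning `null` for both the filtered and the unserializable case keeps the calling loop to a single null check, which matches the structure of `getPropagatedHeaders`.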

The default serializer implementation (`DefaultKafkaHeaderSerializer`) converts header values of supported types to byte arrays. If a header value is of an unsupported type or fails conversion, it is skipped with a debug log.
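To make the "supported type or skip" behaviour concrete, here is a hedged, dependency-free sketch of a type-dispatching serializer. It is not the source of `DefaultKafkaHeaderSerializer`: the class name `HeaderSerializerSketch` is hypothetical, the set of handled types (String, Integer, Long, Double, Boolean, `byte[]`), the big-endian number encoding, and the UTF-8 charset are assumptions for illustration, and the debug log is replaced by a line on standard error. Check the serializer in your Camel version for the actual list.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: the supported types below are an assumption, not a specification.
final class HeaderSerializerSketch {

    // Returns the byte form of a supported value, or null when the type is unsupported.
    static byte[] serialize(String key, Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
        }
        if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
        }
        if (value instanceof Double) {
            return ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
        }
        if (value instanceof Boolean) {
            return value.toString().getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        // Stand-in for the debug log mentioned in the text.
        String type = value == null ? "null" : value.getClass().getName();
        System.err.println("Skipping header '" + key + "' of unsupported type " + type);
        return null;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(serialize("retries", 3))); // prints: [0, 0, 0, 3]
        System.out.println(serialize("handle", new Object()) == null); // prints: true
    }
}
```

Because the caller treats a `null` result as "do not propagate", an unsupported value never fails the send; the header is simply absent from the Kafka record.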

Integration

Header Propagation is a critical part of the Kafka message production flow and complements the other subtopics of message production.

By segregating header serialization logic from message production, it allows flexible customization of header filtering and serialization strategies without impacting core producer workflows.

Both the serializer and the filter are pluggable: `configuration.getHeaderSerializer()` and `configuration.getHeaderFilterStrategy()` supply the implementations, so users can define how header values are transformed and which headers are propagated to Kafka. This mechanism is not detailed in the parent topic or the other subtopics.
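One way to use this extension point is to wrap an existing serializer rather than replace it. The sketch below is an assumption-laden illustration: `HeaderSerializer` is a hypothetical interface that only mirrors the `serialize(key, value)` call seen in `getRecordHeader`, and `STRICT` and `withToStringFallback` are made-up names. In a real project the same idea would be written against Camel's `KafkaHeaderSerializer` and supplied through the Kafka configuration.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class CustomSerializerSketch {

    // Hypothetical interface with the same method shape as the serializer call shown earlier.
    interface HeaderSerializer {
        byte[] serialize(String key, Object value);
    }

    // Baseline that understands only strings and raw bytes; anything else yields null.
    static final HeaderSerializer STRICT = (key, value) -> {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return null;
    };

    // Decorator: try the delegate first, then fall back to the value's toString() form.
    static HeaderSerializer withToStringFallback(HeaderSerializer delegate) {
        return (key, value) -> {
            byte[] bytes = delegate.serialize(key, value);
            if (bytes != null || value == null) {
                return bytes;
            }
            return value.toString().getBytes(StandardCharsets.UTF_8);
        };
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        HeaderSerializer lenient = withToStringFallback(STRICT);

        System.out.println(STRICT.serialize("traceId", id) == null); // prints: true
        System.out.println(new String(lenient.serialize("traceId", id), StandardCharsets.UTF_8));
        // prints: 123e4567-e89b-12d3-a456-426614174000
    }
}
```

The decorator leaves the delegate's encodings untouched and only changes the outcome for values that would otherwise be skipped, so consumers of already-supported headers are unaffected.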

Diagram

```mermaid
flowchart TD
    A[Start: Produce Kafka Message] --> B[Retrieve Camel Message Headers]
    B --> C[Take Next Header]
    C --> D{Allowed by HeaderFilterStrategy?}
    D -- No --> G[Skip Header]
    D -- Yes --> F[Serialize Value with KafkaHeaderSerializer]
    F --> E{Serialized Value is Null?}
    E -- Yes --> G
    E -- No --> H[Create Kafka RecordHeader]
    H --> I[Add to Propagated Headers List]
    I --> J{More Headers?}
    G --> J
    J -- Yes --> C
    J -- No --> K[Attach Headers to ProducerRecord]
    K --> L[Send Message to Kafka]
```

This flowchart illustrates the key steps in header propagation during Kafka message production:

  1. Extract headers from the Camel message.

  2. Filter headers using the configured strategy to exclude unwanted headers.

  3. Serialize header values to byte arrays.

  4. Convert serialized headers into Kafka RecordHeader objects.

  5. Attach the resulting headers to the Kafka producer record before sending.


This focused explanation highlights the purpose and mechanics of header propagation as a modular part of Kafka message production, emphasizing its flexibility and extensibility through configurable filters and serializers.