PropagatedHeadersProvider.java
Overview
`PropagatedHeadersProvider` is a utility class designed to facilitate the propagation of Kafka message headers within the Apache Camel Kafka component. It manages how headers are extracted and provided for Kafka messages when producing records, especially in the context of batch processing with or without individual headers.
The class primarily supports two modes governed by the Kafka configuration flag `batchWithIndividualHeaders`:
- Default mode (`batchWithIndividualHeaders=false`): Headers are extracted once from the parent exchange/message and reused for all child messages within a batch.
- Individual headers mode (`batchWithIndividualHeaders=true`): Headers are generated for each child message individually.
This class abstracts the header propagation logic, ensuring correct and efficient handling of Kafka headers according to the configured behavior.
Detailed Class Description
Class: PropagatedHeadersProvider
This class encapsulates the logic for producing Kafka headers, either once from the parent exchange and message or separately for each child message, depending on the batch configuration.
Fields
| Field Name | Type | Description |
|---|---|---|
| `kafkaProducer` | `KafkaProducer` | Reference to the Kafka producer instance used to extract propagated headers. |
| `parentExchange` | `Exchange` | The Camel Exchange representing the parent context for the batch or message group. |
| `parentMessage` | `Message` | The parent Camel Message from which default headers may be derived. |
| `propagatedHeaders` | `List<Header>` | Cached list of Kafka headers propagated from the parent exchange/message when batch headers are not individualized (can be `null`). |
Constructor
public PropagatedHeadersProvider(KafkaProducer kafkaProducer,
KafkaConfiguration configuration,
Exchange parentExchange,
Message parentMessage)
Parameters:
- `kafkaProducer`: The `KafkaProducer` instance that provides header extraction methods.
- `configuration`: `KafkaConfiguration` object holding configuration flags, including `batchWithIndividualHeaders`.
- `parentExchange`: The parent Camel Exchange from which headers may be propagated.
- `parentMessage`: The parent Camel Message associated with the exchange.
Description:
Initializes the provider. If `batchWithIndividualHeaders` is disabled (default), it extracts and caches propagated headers immediately from the parent exchange/message for reuse. If enabled, it defers header extraction, which is then performed individually per message.
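The eager-versus-deferred behavior described above can be sketched with stand-in types. This is a minimal model under stated assumptions, not the Camel source: `HeadersProviderSketch`, its type parameters `E`, `M`, and `H` (standing in for `Exchange`, `Message`, and `Header`), and the `extractor` function (standing in for `KafkaProducer.getPropagatedHeaders(Exchange, Message)`) are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.function.BiFunction;

// Simplified model of the provider. E, M and H stand in for Camel's Exchange,
// Camel's Message and Kafka's Header; none of this is the actual Camel code.
final class HeadersProviderSketch<E, M, H> {
    // Hypothetical stand-in for KafkaProducer.getPropagatedHeaders(Exchange, Message).
    private final BiFunction<E, M, List<H>> extractor;
    private final E parentExchange;
    private final M parentMessage;
    // Cached parent headers; null when headers are generated per child message.
    private final List<H> propagatedHeaders;

    HeadersProviderSketch(BiFunction<E, M, List<H>> extractor,
                          boolean batchWithIndividualHeaders,
                          E parentExchange,
                          M parentMessage) {
        this.extractor = extractor;
        this.parentExchange = parentExchange;
        this.parentMessage = parentMessage;
        // Default mode: extract once, up front. Individual mode: defer extraction.
        this.propagatedHeaders = batchWithIndividualHeaders ? null : getDefaultHeaders();
    }

    // Headers derived from the parent exchange and message.
    List<H> getDefaultHeaders() {
        return extractor.apply(parentExchange, parentMessage);
    }

    // Cached parent headers if present, otherwise headers for this child.
    List<H> getHeaders(E childExchange, M childMessage) {
        if (propagatedHeaders != null) {
            return propagatedHeaders;
        }
        return extractor.apply(childExchange, childMessage);
    }
}
```

In this model, constructing the provider with the flag disabled runs the extractor exactly once, and every later `getHeaders` call returns that cached list regardless of the child passed in. With the flag enabled, the constructor extracts nothing and each `getHeaders` call runs the extractor on the given child.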
Methods
List<Header> getDefaultHeaders()
Returns:
A `List<Header>` representing the Kafka headers extracted from the parent exchange/message.
Description:
Retrieves the propagated headers derived from the parent exchange and message. This method is used internally when `batchWithIndividualHeaders` is disabled, providing one consistent set of headers for all messages in the batch.
Usage Example:
List<Header> defaultHeaders = propagatedHeadersProvider.getDefaultHeaders();
List<Header> getHeaders(Exchange childExchange, Message childMessage)
Parameters:
- `childExchange`: The Camel Exchange for the current child message.
- `childMessage`: The Camel Message for which headers are to be provided.
Returns:
A `List<Header>` containing the Kafka headers for the given child message.
Description:
Returns the appropriate headers for a particular child message:
- If `propagatedHeaders` is already cached (`batchWithIndividualHeaders` disabled), it returns the cached list directly.
- Otherwise, it extracts headers from the given child exchange and message.
Usage Example:
List<Header> headers = propagatedHeadersProvider.getHeaders(childExchange, childMessage);
Important Implementation Details
- The class is tightly coupled to `KafkaProducer`, which performs the actual header extraction via `getPropagatedHeaders()`.
- The batch processing mode (`batchWithIndividualHeaders` enabled or disabled) dictates whether headers are cached once or generated per message.
- This design optimizes the default batch mode by avoiding repeated header extraction.
- The class assumes that `parentExchange` and `childExchange` may sometimes be identical, but messages are always distinct, supporting fine-grained header control per message.
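The cost difference between the two modes can be made concrete by counting extractor invocations for one batch. The following is a simplified simulation, not Camel code: `BatchHeaderCost`, `extractionsFor`, and the string-based `extract` function (a stand-in for `KafkaProducer.getPropagatedHeaders(...)`) are hypothetical names assumed for illustration.

```java
import java.util.List;
import java.util.function.Function;

final class BatchHeaderCost {
    // Simulates one batch and returns how many times header extraction ran.
    static int extractionsFor(int batchSize, boolean batchWithIndividualHeaders) {
        int[] calls = {0};
        // Hypothetical extractor; stands in for KafkaProducer.getPropagatedHeaders(...).
        Function<String, List<String>> extract = source -> {
            calls[0]++;
            return List.of("origin=" + source);
        };
        // Default mode extracts once from the parent; individual mode caches nothing.
        List<String> cached = batchWithIndividualHeaders ? null : extract.apply("parent");
        for (int i = 0; i < batchSize; i++) {
            // Reuse the cached list if there is one, else extract for this child.
            List<String> headers = (cached != null) ? cached : extract.apply("child-" + i);
            // A real producer would attach `headers` to the outgoing record here.
        }
        return calls[0];
    }
}
```

Under these assumptions, `BatchHeaderCost.extractionsFor(1000, false)` performs a single extraction, while `BatchHeaderCost.extractionsFor(1000, true)` performs one per child message. The trade-off is that the default mode cannot reflect headers that differ between child messages.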
Interaction with Other Components
- KafkaProducer: The primary collaborator. `KafkaProducer` provides the method `getPropagatedHeaders(Exchange, Message)`, which extracts Kafka-compatible headers from Camel messages.
- KafkaConfiguration: Provides configuration flags such as `batchWithIndividualHeaders` that influence the header propagation strategy.
- Camel Exchange and Message: `PropagatedHeadersProvider` uses these Camel core types to access message headers and contextual information.
- Kafka Headers (`org.apache.kafka.common.header.Header`): The output of this provider is a list of Kafka headers that will be attached to Kafka records for message production.
This class is typically used internally by the Kafka component’s producer logic during message batch processing to ensure headers are correctly propagated according to user configuration.
Mermaid Class Diagram
classDiagram
class PropagatedHeadersProvider {
-KafkaProducer kafkaProducer
-Exchange parentExchange
-Message parentMessage
-List<Header> propagatedHeaders
+PropagatedHeadersProvider(KafkaProducer, KafkaConfiguration, Exchange, Message)
+List<Header> getDefaultHeaders()
+List<Header> getHeaders(Exchange, Message)
}
PropagatedHeadersProvider --> KafkaProducer : uses
PropagatedHeadersProvider --> KafkaConfiguration : reads config
PropagatedHeadersProvider --> Exchange : parentExchange, childExchange
PropagatedHeadersProvider --> Message : parentMessage, childMessage
PropagatedHeadersProvider --> Header : produces List<Header>
Summary
`PropagatedHeadersProvider` is a focused helper class within the Apache Camel Kafka producer component that streamlines the propagation of Kafka headers during batch message production. It supports configurable behavior for header reuse or individual header extraction per message, integrating closely with `KafkaProducer` and Camel core types. This abstraction simplifies header management for Kafka messaging in Camel routes, improving maintainability and configurability of Kafka header propagation.