KeyValueHolderIterator.java
Overview
`KeyValueHolderIterator.java` defines the `KeyValueHolderIterator` class, an iterator designed to convert and wrap a collection of message objects into Kafka producer records (`ProducerRecord`) within the Apache Camel Kafka component.
Its primary role is to iterate over messages (which can be raw payloads, Camel `Message`, or Camel `Exchange` objects) and generate corresponding Kafka `ProducerRecord` instances with correct topic, key, partition, timestamp, serialized value, and propagated Kafka headers. This iterator facilitates batch or individual message sending by transforming messages into Kafka-ready entries, handling all necessary serialization and header propagation based on the configured Kafka setup and Camel exchange context.
Detailed Class and Method Descriptions
Class: KeyValueHolderIterator
**Package:** `org.apache.camel.component.kafka.producer.support`
**Implements:** `Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>>`
This class wraps an iterator over raw message objects and converts each element into a `KeyValueHolder` holding:
- The original message object as the key.
- The corresponding Kafka `ProducerRecord` as the value.
The `ProducerRecord` is constructed with dynamically resolved topic, key, partition, timestamp, and Kafka headers, applying serialization logic as per the Kafka configuration.
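The wrapping pattern described above can be illustrated with a minimal, dependency-free sketch. `PairingIterator` is a hypothetical name, `Map.Entry` stands in for Camel's `KeyValueHolder`, and the `converter` function stands in for the record-building logic; none of these are part of the actual class.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for KeyValueHolderIterator: pairs each source element
// with a value derived from it. Map.Entry plays the role of KeyValueHolder.
class PairingIterator<T, R> implements Iterator<Map.Entry<T, R>> {
    private final Iterator<T> source;
    private final Function<T, R> converter;

    PairingIterator(Iterator<T> source, Function<T, R> converter) {
        this.source = source;
        this.converter = converter;
    }

    @Override
    public boolean hasNext() {
        // Pure delegation: the wrapper holds no state of its own
        return source.hasNext();
    }

    @Override
    public Map.Entry<T, R> next() {
        T original = source.next();
        // Key = original element, value = converted element
        return new SimpleImmutableEntry<>(original, converter.apply(original));
    }

    @Override
    public void remove() {
        // Removal is forwarded to the underlying iterator
        source.remove();
    }
}
```

Because the conversion happens inside `next()`, each record is built lazily, only when the caller asks for it.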
Constructor
    public KeyValueHolderIterator(
            Iterator<Object> msgList,
            Exchange exchange,
            KafkaConfiguration kafkaConfiguration,
            String msgTopic,
            PropagatedHeadersProvider propagatedHeadersProvider)
Parameters:
- `msgList`: An `Iterator<Object>` over the messages to be sent. Each element can be:
  - A raw message payload (any `Object`).
  - A Camel `Message`.
  - A Camel `Exchange`.
- `exchange`: The Camel `Exchange` context containing routing and processing information, used for header filtering, serialization context, and fallback.
- `kafkaConfiguration`: Configuration object encapsulating Kafka producer settings such as serializers, default topic, keys, and partition keys.
- `msgTopic`: The default Kafka topic, used if not overridden by message headers.
- `propagatedHeadersProvider`: Utility that extracts and prepares Kafka headers from Camel message headers, either default or per-message.
Methods
boolean hasNext()
- Returns `true` if there are more messages in the underlying `msgList` iterator.
- Delegates directly to `msgList.hasNext()`.
KeyValueHolder<Object, ProducerRecord<Object, Object>> next()
Retrieves the next message from `msgList` and converts it into a `KeyValueHolder` wrapping:
- The original message object.
- A Kafka `ProducerRecord` constructed with:
  - Topic: resolved from the message header `KafkaConstants.OVERRIDE_TOPIC`, or the default `msgTopic`.
  - Partition key: resolved from the message header or the configuration.
  - Timestamp: optional override from the header `KafkaConstants.OVERRIDE_TIMESTAMP`.
  - Key: resolved from the message header `KafkaConstants.KEY` or the configuration, serialized accordingly.
  - Value: the message body, converted to the expected serialized type.
  - Headers: propagated Kafka headers extracted from the message.

Special handling:
- If the message object is a Camel `Exchange` or `Message`, it extracts the inner details accordingly.
- Otherwise, it treats the object as the raw message body and uses the default topic and headers.

Returns: A `KeyValueHolder` instance pairing the original message with the Kafka `ProducerRecord`.
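The type dispatch in `next()` can be sketched without Camel on the classpath. `SketchMessage` and `SketchExchange` below are hypothetical stand-ins for Camel's `Message` and `Exchange`, and `DispatchSketch` is an illustrative name. The sketch only shows which object ends up as the record value; topic, key, partition, and header resolution are left out.

```java
// Hypothetical stand-in for Camel's Message (not the real type).
class SketchMessage {
    final Object body;

    SketchMessage(Object body) {
        this.body = body;
    }
}

// Hypothetical stand-in for Camel's Exchange, reduced to its inbound message.
class SketchExchange {
    final SketchMessage in;

    SketchExchange(SketchMessage in) {
        this.in = in;
    }
}

class DispatchSketch {
    // Returns the inner message for Exchange/Message elements, or null for a raw payload.
    static SketchMessage innerMessage(Object element) {
        if (element instanceof SketchExchange) {
            return ((SketchExchange) element).in;
        }
        if (element instanceof SketchMessage) {
            return (SketchMessage) element;
        }
        return null;
    }

    // The record value: the inner message body, or the element itself when it is a raw payload.
    static Object recordValue(Object element) {
        SketchMessage inner = innerMessage(element);
        return inner != null ? inner.body : element;
    }
}
```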
void remove()
- Removes the current element from the underlying iterator `msgList`.
- Delegates directly to `msgList.remove()`.
Private Helper Methods
These methods assist `next()` in extracting and resolving message details.
Message getInnerMessage(Object object)
- If `object` is an `Exchange`, returns its inbound `Message` (`exchange.getIn()`).
- If `object` is a `Message`, returns it directly.
- Used to unify message access regardless of the original object type.
Exchange getInnerExchange(Object object)
- If `object` is an `Exchange`, returns it.
- Otherwise, returns `null`.
- Used to obtain transactional and routing context when available.
Long getOverrideTimestamp(Message innerMessage)
- Checks if the header `KafkaConstants.OVERRIDE_TIMESTAMP` is present.
- If so, converts the header value to `Long` using Camel's type converter.
- Returns the timestamp override or `null` if none.
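As a rough illustration of the timestamp lookup, the sketch below models message headers as a plain `Map` and replaces Camel's type converter with a simplified conversion that accepts numbers and numeric strings. The header name and the class `TimestampOverrideSketch` are assumptions made for the example; the real code reads `KafkaConstants.OVERRIDE_TIMESTAMP`.

```java
import java.util.Map;

class TimestampOverrideSketch {
    // Hypothetical header name used only in this sketch.
    static final String OVERRIDE_TIMESTAMP = "sketch.OVERRIDE_TIMESTAMP";

    // Simplified stand-in for Camel's type converter: handles Number and numeric String values.
    static Long overrideTimestamp(Map<String, Object> headers) {
        Object raw = headers.get(OVERRIDE_TIMESTAMP);
        if (raw == null) {
            return null; // no override: the record is created without an explicit timestamp
        }
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        return Long.valueOf(raw.toString().trim());
    }
}
```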
String getInnerTopic(Message innerMessage)
- Checks the header `KafkaConstants.OVERRIDE_TOPIC`.
- Returns its value if present (removing the header in the process).
- Otherwise, returns the default topic `msgTopic`.
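The read-and-remove behavior can be sketched with a plain `Map` in place of Camel message headers. The header name and `TopicOverrideSketch` are hypothetical; the real code uses `KafkaConstants.OVERRIDE_TOPIC` on a Camel `Message`.

```java
import java.util.Map;

class TopicOverrideSketch {
    // Hypothetical header name used only in this sketch.
    static final String OVERRIDE_TOPIC = "sketch.OVERRIDE_TOPIC";

    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        // Map.remove returns the previous value, so the header is read and cleared in one step
        Object override = headers.remove(OVERRIDE_TOPIC);
        return override != null ? override.toString() : defaultTopic;
    }
}
```

In this sketch, a second call on the same headers falls back to the default topic, because the first call consumed the override.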
Object getInnerKey(Exchange innerExchange, Message innerMessage)
- Checks the header `KafkaConstants.KEY` on the message.
- If present:
  - If the Kafka configuration has a static key configured, that takes precedence.
  - Converts the key to the serialized type using the configured key serializer.
- Returns the serialized key or `null` if none.
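One reading of the steps above, sketched with a plain `Map` for headers: the header must be present for any key to be produced, a configured key then replaces the header value, and the result is passed through a conversion function. `KeyResolutionSketch`, the header name, and the `toSerializedType` parameter are assumptions for illustration, not the actual signature.

```java
import java.util.Map;
import java.util.function.Function;

class KeyResolutionSketch {
    // Hypothetical header name used only in this sketch.
    static final String KEY = "sketch.KEY";

    static Object resolveKey(Map<String, Object> headers,
                             Object configuredKey,
                             Function<Object, Object> toSerializedType) {
        Object headerKey = headers.get(KEY);
        if (headerKey == null) {
            return null; // no key header: no key, as described in the steps above
        }
        // A statically configured key takes precedence over the header value
        Object chosen = configuredKey != null ? configuredKey : headerKey;
        return toSerializedType.apply(chosen);
    }
}
```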
Integer getInnerPartitionKey(Message innerMessage)
- Checks the header `KafkaConstants.PARTITION_KEY` on the message.
- If the Kafka configuration has a static partition key, it overrides the header.
- Returns the partition key or `null` if none.
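The precedence rule for the partition can be sketched as follows, again with a plain `Map` for headers. The header name and `PartitionKeySketch` are hypothetical, and the string-to-integer conversion is a simplification of what Camel's header lookup would do.

```java
import java.util.Map;

class PartitionKeySketch {
    // Hypothetical header name used only in this sketch.
    static final String PARTITION_KEY = "sketch.PARTITION_KEY";

    static Integer resolvePartition(Map<String, Object> headers, Integer configuredPartition) {
        if (configuredPartition != null) {
            return configuredPartition; // static configuration overrides the header
        }
        Object raw = headers.get(PARTITION_KEY);
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        return Integer.valueOf(raw.toString().trim());
    }
}
```

A `null` partition in a Kafka `ProducerRecord` leaves the choice of partition to the producer's partitioner.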
Usage Example
    Iterator<Object> messages = ...; // e.g. the iterator of a List<Object> or of an Exchange body collection
    Exchange exchange = ...; // Current Camel exchange
    KafkaConfiguration config = ...; // Kafka producer configuration
    String defaultTopic = "myTopic";
    PropagatedHeadersProvider headersProvider = ...;

    KeyValueHolderIterator iterator = new KeyValueHolderIterator(
            messages, exchange, config, defaultTopic, headersProvider);

    // kafkaProducer is assumed to be an existing Kafka producer instance
    while (iterator.hasNext()) {
        KeyValueHolder<Object, ProducerRecord<Object, Object>> kv = iterator.next();
        // kv.getKey() - original message object
        // kv.getValue() - Kafka ProducerRecord ready to send
        kafkaProducer.send(kv.getValue());
    }
Important Implementation Details
- **Serialization:** Uses `ProducerUtil.tryConvertToSerializedType()` to convert keys and values into types compatible with the Kafka serializers configured in `KafkaConfiguration`. This allows dynamic adaptation of message content.
- **Header Propagation:** Kafka record headers are obtained via `propagatedHeadersProvider`, which extracts and serializes Camel message headers into Kafka `RecordHeader` instances.
- **Dynamic Topic and Key Resolution:** Supports per-message overrides by inspecting Camel message headers (`KafkaConstants.OVERRIDE_TOPIC`, `KafkaConstants.KEY`, `KafkaConstants.PARTITION_KEY`, and `KafkaConstants.OVERRIDE_TIMESTAMP`).
- **Type Agnostic Input:** The iterator handles inputs of different types: raw payloads, `Message`, or `Exchange`. This allows batch processing of heterogeneous message lists.
- **State Management:** The iterator delegates removal operations to the underlying iterator, ensuring consistent behavior in message collections.
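To make the serialization point concrete, here is a simplified sketch of converting a value to the type a given serializer expects. It is not the actual `ProducerUtil.tryConvertToSerializedType()` implementation, which takes an exchange and relies on Camel's type converter; `SerializedTypeSketch` and `convertFor` are hypothetical names, and only two serializers are modeled. The two serializer class names are real Kafka classes.

```java
import java.nio.charset.StandardCharsets;

class SerializedTypeSketch {
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    // Converts the value to the input type the named serializer expects.
    // Assumption: unknown serializers leave the value untouched.
    static Object convertFor(String serializerClassName, Object value) {
        if (value == null) {
            return null;
        }
        if (STRING_SERIALIZER.equals(serializerClassName)) {
            return value instanceof byte[]
                    ? new String((byte[]) value, StandardCharsets.UTF_8)
                    : value.toString();
        }
        if (BYTE_ARRAY_SERIALIZER.equals(serializerClassName)) {
            return value instanceof byte[]
                    ? value
                    : value.toString().getBytes(StandardCharsets.UTF_8);
        }
        return value;
    }
}
```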
Interaction with Other System Components
- **KafkaProducer (Kafka message sending):** `KeyValueHolderIterator` is typically used by the `KafkaProducer` class during batch message sending. It provides a standardized way to convert Camel messages into Kafka `ProducerRecord` objects.
- **PropagatedHeadersProvider:** This provider abstracts the logic of extracting Camel headers and converting them into Kafka headers. The iterator relies on it to ensure headers propagate correctly.
- **KafkaConfiguration:** Supplies serializer configurations, default keys, partition keys, and other Kafka producer settings needed to correctly construct the `ProducerRecord`.
- **Camel Exchange and Message:** The iterator accesses the Camel `Exchange` and `Message` to derive message-specific overrides and the context needed for accurate Kafka record construction.
- **ProducerUtil:** Utility class used for type conversion to Kafka serializer-compatible types.
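The header hand-off between Camel and Kafka can be pictured with a small sketch: Kafka header values are byte arrays, so each Camel header value has to be serialized or dropped. The set of supported types below and the names `HeaderPropagationSketch`, `serialize`, and `toKafkaHeaders` are assumptions for illustration, and `Map.Entry<String, byte[]>` stands in for Kafka's `RecordHeader`. The real provider also applies Camel's header filtering, which is not modeled here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class HeaderPropagationSketch {
    // Serializes a header value to bytes, or returns null for unsupported types.
    static byte[] serialize(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
        }
        if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return null;
    }

    // Converts Camel-style headers to (name, bytes) pairs, skipping values that cannot be serialized.
    static List<Map.Entry<String, byte[]>> toKafkaHeaders(Map<String, Object> camelHeaders) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, Object> e : camelHeaders.entrySet()) {
            byte[] bytes = serialize(e.getValue());
            if (bytes != null) {
                out.add(new SimpleImmutableEntry<>(e.getKey(), bytes));
            }
        }
        return out;
    }
}
```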
Mermaid Class Diagram
    classDiagram
        class KeyValueHolderIterator {
            - Iterator<Object> msgList
            - Exchange exchange
            - KafkaConfiguration kafkaConfiguration
            - String msgTopic
            - PropagatedHeadersProvider propagatedHeadersProvider
            + KeyValueHolderIterator(Iterator<Object>, Exchange, KafkaConfiguration, String, PropagatedHeadersProvider)
            + boolean hasNext()
            + KeyValueHolder<Object, ProducerRecord<Object, Object>> next()
            + void remove()
            - Message getInnerMessage(Object)
            - Exchange getInnerExchange(Object)
            - Long getOverrideTimestamp(Message)
            - String getInnerTopic(Message)
            - Object getInnerKey(Exchange, Message)
            - Integer getInnerPartitionKey(Message)
        }
Summary
`KeyValueHolderIterator.java` is a utility class in the Apache Camel Kafka producer module that iterates over a collection of messages and converts each one into a Kafka `ProducerRecord`. It handles serialization, header propagation, and per-message resolution of topic, key, partition, and timestamp, and it accepts raw payloads, Camel `Message` objects, or Camel `Exchange` objects as input. The resulting records are ready to send, which makes the iterator a natural fit for batch sending within Camel routes.