KeyValueHolderIterator.java


Overview

`KeyValueHolderIterator.java` defines the `KeyValueHolderIterator` class, an iterator designed to convert and wrap a collection of message objects into Kafka producer records (`ProducerRecord`) within the Apache Camel Kafka component.

Its primary role is to iterate over messages (which can be raw payloads, Camel `Message`, or Camel `Exchange` objects) and generate corresponding Kafka `ProducerRecord` instances with correct topic, key, partition, timestamp, serialized value, and propagated Kafka headers. This iterator facilitates batch or individual message sending by transforming messages into Kafka-ready entries, handling all necessary serialization and header propagation based on the configured Kafka setup and Camel exchange context.


Detailed Class and Method Descriptions

Class: KeyValueHolderIterator

**Package:** `org.apache.camel.component.kafka.producer.support`

**Implements:** `Iterator>>`

This class wraps an iterator over raw message objects and converts each element into a `KeyValueHolder` holding:

The `ProducerRecord` is constructed with dynamically resolved topic, key, partition, timestamp, and Kafka headers, applying serialization logic as per the Kafka configuration.


Constructor

public KeyValueHolderIterator(
    Iterator<Object> msgList,
    Exchange exchange,
    KafkaConfiguration kafkaConfiguration,
    String msgTopic,
    PropagatedHeadersProvider propagatedHeadersProvider)

Parameters:


Methods

boolean hasNext()

KeyValueHolder<Object, ProducerRecord<Object, Object>> next()

void remove()


Private Helper Methods

These methods assist `next()` in extracting and resolving message details.

Message getInnerMessage(Object object)

Exchange getInnerExchange(Object object)

Long getOverrideTimestamp(Message innerMessage)

String getInnerTopic(Message innerMessage)

Object getInnerKey(Exchange innerExchange, Message innerMessage)

Integer getInnerPartitionKey(Message innerMessage)


Usage Example

Iterator<Object> messages = ...; // e.g. List<Object> or Exchange Body collection
Exchange exchange = ...;          // Current Camel exchange
KafkaConfiguration config = ...;  // Kafka producer configuration
String defaultTopic = "myTopic";
PropagatedHeadersProvider headersProvider = ...;

KeyValueHolderIterator iterator = new KeyValueHolderIterator(
    messages, exchange, config, defaultTopic, headersProvider);

while (iterator.hasNext()) {
    KeyValueHolder<Object, ProducerRecord<Object, Object>> kv =
        iterator.next();
    
    // kv.getKey() - original message object
    // kv.getValue() - Kafka ProducerRecord ready to send
    
    kafkaProducer.send(kv.getValue());
}

Important Implementation Details


Interaction with Other System Components


Mermaid Class Diagram

classDiagram
    class KeyValueHolderIterator {
        - Iterator<Object> msgList
        - Exchange exchange
        - KafkaConfiguration kafkaConfiguration
        - String msgTopic
        - PropagatedHeadersProvider propagatedHeadersProvider
        + KeyValueHolderIterator(Iterator<Object>, Exchange, KafkaConfiguration, String, PropagatedHeadersProvider)
        + boolean hasNext()
        + KeyValueHolder<Object, ProducerRecord<Object, Object>> next()
        + void remove()
        - Message getInnerMessage(Object)
        - Exchange getInnerExchange(Object)
        - Long getOverrideTimestamp(Message)
        - String getInnerTopic(Message)
        - Object getInnerKey(Exchange, Message)
        - Integer getInnerPartitionKey(Message)
    }

Summary

`KeyValueHolderIterator.java` is a critical utility class within the Apache Camel Kafka producer module that enables iteration over collections of messages and converts each into Kafka `ProducerRecord` instances. By handling serialization, header propagation, dynamic topic and key resolution, and accommodating different input types (payloads, messages, or exchanges), it supports the flexible and robust construction of Kafka records ready for production. This iterator facilitates efficient batch sending scenarios and integrates tightly with the Kafka producer lifecycle in Camel routes.


End of Documentation for KeyValueHolderIterator.java