ProducerUtil.java
Overview
`ProducerUtil` is a utility class within the Apache Camel Kafka component, specifically under the package `org.apache.camel.component.kafka.producer.support`. It provides essential helper methods that assist Kafka producers in:
Converting Camel message payloads to Kafka-serializable types based on configured serializers.
Setting exceptions on Camel `Exchange` or `Message` objects when a Kafka operation fails.
Attaching Kafka `RecordMetadata` (metadata about sent Kafka records) to Camel message headers for downstream use.
The class is declared final, exposes only static methods, and has a private constructor, so it can be neither subclassed nor instantiated. It encapsulates common, reusable logic for Kafka message production, serialization, and metadata management, which keeps the core producer code clean and focused.
Class: ProducerUtil
Description
A utility helper class that provides static methods for:
Converting payload objects into serialized types expected by Kafka producers according to the configured Kafka serializer.
Attaching exceptions to Camel routing objects.
Setting Kafka record metadata onto Camel messages or exchanges.
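The class is intended for internal use by the Kafka producer components. As the signatures below show, `setException` and the single-record `setRecordMetadata` overload are package-private, so they can be called only from within `org.apache.camel.component.kafka.producer.support`; `tryConvertToSerializedType` and the list-based `setRecordMetadata` overload are `public`.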
Constructor
private ProducerUtil()
Private constructor to prevent instantiation of this utility class.
Methods
tryConvertToSerializedType
public static Object tryConvertToSerializedType(Exchange exchange, Object object, String valueSerializer)
Purpose:
Attempts to convert the given `object` into a type compatible with the configured Kafka value serializer (identified by the `valueSerializer` class name). Uses Camel's type conversion system for this.
Parameters:
`exchange` - the current Camel `Exchange` context; used to access Camel's type converter. May be `null`.
`object` - the original object (value) to convert.
`valueSerializer` - fully qualified class name of the Kafka serializer being used for the value.
Returns:
The converted object if successful and compatible with Kafka serializer expectations.
The original `object` if no conversion was possible or if `exchange` is `null`.
Supported Serializers and Conversions:
`KafkaConstants.KAFKA_DEFAULT_SERIALIZER` (usually a String serializer): converts to `String`.
`org.apache.kafka.common.serialization.ByteArraySerializer`: converts to `byte[]`.
`org.apache.kafka.common.serialization.ByteBufferSerializer`: converts to `java.nio.ByteBuffer`.
`org.apache.kafka.common.serialization.BytesSerializer`: converts to `byte[]` first, then wraps the array in Kafka's `Bytes` object.
Usage Example:
Exchange exchange = ...;
Object originalValue = somePojo;
String serializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
Object serializedValue = ProducerUtil.tryConvertToSerializedType(exchange, originalValue, serializerClass);
// serializedValue is a byte[] if the conversion succeeded; otherwise it is the original object
Implementation Details:
Uses Camel's `TypeConverter.tryConvertTo()` method, leveraging the Camel context and exchange to perform type conversions. For `BytesSerializer`, the value is first converted to a byte array and then wrapped.
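The dispatch described above can be sketched without any Camel or Kafka dependency. The following is a minimal illustration, not the Camel source: `tryConvertTo` is a hypothetical stand-in for Camel's type converter (it only understands `String` and `byte[]` inputs), `BytesStandIn` stands in for Kafka's `Bytes`, the `exchange` parameter is omitted, and the default serializer is assumed to be Kafka's `StringSerializer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch of the serializer-driven dispatch; all helper types are illustrative stand-ins. */
final class SerializerDispatchSketch {
    // Assumption: the default serializer is Kafka's StringSerializer.
    static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    static final String BYTE_ARRAY_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
    static final String BYTE_BUFFER_SERIALIZER = "org.apache.kafka.common.serialization.ByteBufferSerializer";
    static final String BYTES_SERIALIZER = "org.apache.kafka.common.serialization.BytesSerializer";

    /** Hypothetical stand-in for org.apache.kafka.common.utils.Bytes. */
    static final class BytesStandIn {
        final byte[] data;
        BytesStandIn(byte[] data) { this.data = data; }
    }

    /** Hypothetical stand-in for Camel's type converter: returns null when no conversion is known. */
    static <T> T tryConvertTo(Class<T> type, Object value) {
        if (value == null) {
            return null;
        }
        if (type.isInstance(value)) {
            return type.cast(value);
        }
        byte[] raw;
        if (value instanceof String) {
            raw = ((String) value).getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof byte[]) {
            raw = (byte[]) value;
        } else {
            return null; // this sketch knows no other source types
        }
        if (type == String.class) {
            return type.cast(new String(raw, StandardCharsets.UTF_8));
        }
        if (type == byte[].class) {
            return type.cast(raw);
        }
        if (type == ByteBuffer.class) {
            return type.cast(ByteBuffer.wrap(raw));
        }
        return null;
    }

    /** Picks the target type from the serializer class name; falls back to the original value. */
    static Object tryConvertToSerializedType(Object value, String valueSerializer) {
        Object answer = null;
        if (DEFAULT_SERIALIZER.equals(valueSerializer)) {
            answer = tryConvertTo(String.class, value);
        } else if (BYTE_ARRAY_SERIALIZER.equals(valueSerializer)) {
            answer = tryConvertTo(byte[].class, value);
        } else if (BYTE_BUFFER_SERIALIZER.equals(valueSerializer)) {
            answer = tryConvertTo(ByteBuffer.class, value);
        } else if (BYTES_SERIALIZER.equals(valueSerializer)) {
            byte[] array = tryConvertTo(byte[].class, value); // intermediate byte[] step
            if (array != null) {
                answer = new BytesStandIn(array);
            }
        }
        return answer != null ? answer : value;
    }

    public static void main(String[] args) {
        Object converted = tryConvertToSerializedType("hello", BYTE_ARRAY_SERIALIZER);
        System.out.println(converted instanceof byte[]); // prints "true"
        // An Integer cannot be converted by this stand-in, so the original comes back.
        System.out.println(tryConvertToSerializedType(42, BYTE_ARRAY_SERIALIZER)); // prints "42"
    }
}
```

Returning the original value on failure, rather than throwing, leaves the final decision to the Kafka serializer itself, which matches the "try" in the method name.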
setException
static void setException(Object object, Exception e)
Purpose:
Sets an exception on a Camel routing artifact (`Exchange` or `Message`) to propagate Kafka-related errors back into Camel routing.
Parameters:
`object` - either a Camel `Exchange` or `Message` instance.
`e` - the exception to be set.
Behavior:
If `object` is an `Exchange`, calls its `setException(e)`.
If `object` is a `Message` and its exchange is not null, sets the exception on the exchange.
If `e` is `null`, no action is taken.
Usage Example:
try {
// Kafka sending logic
} catch (Exception e) {
ProducerUtil.setException(exchange, e);
}
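The three behavior rules listed for `setException` amount to a small type dispatch. The sketch below illustrates that logic only; `ExchangeStandIn` and `MessageStandIn` are hypothetical, minimal replacements for Camel's `Exchange` and `Message`, introduced so the example runs without Camel on the classpath.

```java
/** Sketch of the type dispatch in setException, using hypothetical stand-in types. */
final class SetExceptionSketch {

    /** Hypothetical stand-in for org.apache.camel.Exchange. */
    static final class ExchangeStandIn {
        private Exception exception;
        void setException(Exception e) { this.exception = e; }
        Exception getException() { return exception; }
    }

    /** Hypothetical stand-in for org.apache.camel.Message; its exchange may be null. */
    static final class MessageStandIn {
        private final ExchangeStandIn exchange;
        MessageStandIn(ExchangeStandIn exchange) { this.exchange = exchange; }
        ExchangeStandIn getExchange() { return exchange; }
    }

    static void setException(Object body, Exception e) {
        if (e == null) {
            return; // nothing to propagate
        }
        if (body instanceof ExchangeStandIn) {
            ((ExchangeStandIn) body).setException(e);
        } else if (body instanceof MessageStandIn) {
            ExchangeStandIn owner = ((MessageStandIn) body).getExchange();
            if (owner != null) {
                owner.setException(e);
            }
        }
        // Any other type is ignored.
    }

    public static void main(String[] args) {
        ExchangeStandIn exchange = new ExchangeStandIn();
        setException(new MessageStandIn(exchange), new IllegalStateException("send failed"));
        System.out.println(exchange.getException().getMessage()); // prints "send failed"
    }
}
```

Accepting `Object` lets one helper serve callers that hold either an exchange or a message, at the cost of compile-time type checking.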
setRecordMetadata (single metadata)
static void setRecordMetadata(Object body, RecordMetadata recordMetadata)
Purpose:
Convenience method that wraps a single `RecordMetadata` instance in a singleton list and delegates to `setRecordMetadata(Object, List<RecordMetadata>)`.
Parameters:
`body` - Camel routing artifact (`Exchange` or `Message`) on which to set the metadata header.
`recordMetadata` - Kafka `RecordMetadata` instance representing metadata of the sent record.
Usage:
Used internally to associate Kafka metadata with a Camel message or exchange after a single record send.
setRecordMetadata (list of metadata)
public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList)
Purpose:
Attaches a list of Kafka `RecordMetadata` objects to the Camel routing artifact by setting a header with the key `KafkaConstants.KAFKA_RECORD_META`.
Parameters:
`body` - an `Exchange` or `Message` object.
`recordMetadataList` - a list of Kafka metadata objects corresponding to sent messages.
Behavior:
If `body` is an `Exchange`, sets the header on its `Message`.
If `body` is a `Message`, sets the header directly on it.
Header key used: `KafkaConstants.KAFKA_RECORD_META`.
Usage Example:
List<RecordMetadata> metadataList = ...;
ProducerUtil.setRecordMetadata(exchange, metadataList);
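How the two `setRecordMetadata` overloads relate can be shown with a dependency-free sketch. Everything here is illustrative: the stand-in classes replace Camel's `Exchange` and `Message` and Kafka's `RecordMetadata`, and `RECORD_META_HEADER` is a placeholder string, not the actual value of `KafkaConstants.KAFKA_RECORD_META`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the single-record and list-based overloads; all types are stand-ins. */
final class RecordMetadataHeaderSketch {
    // Placeholder only; the real header name is defined by KafkaConstants.KAFKA_RECORD_META.
    static final String RECORD_META_HEADER = "RECORD_META_PLACEHOLDER";

    /** Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata. */
    static final class RecordMetadataStandIn {
        final String topic;
        final int partition;
        final long offset;
        RecordMetadataStandIn(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for org.apache.camel.Message. */
    static final class MessageStandIn {
        private final Map<String, Object> headers = new HashMap<>();
        void setHeader(String name, Object value) { headers.put(name, value); }
        Object getHeader(String name) { return headers.get(name); }
    }

    /** Hypothetical stand-in for org.apache.camel.Exchange. */
    static final class ExchangeStandIn {
        private final MessageStandIn message = new MessageStandIn();
        MessageStandIn getMessage() { return message; }
    }

    /** Single-record overload: wrap in a singleton list and delegate. */
    static void setRecordMetadata(Object body, RecordMetadataStandIn recordMetadata) {
        List<RecordMetadataStandIn> list = Collections.singletonList(recordMetadata);
        setRecordMetadata(body, list);
    }

    /** List overload: the header always ends up on a message, never on the exchange itself. */
    static void setRecordMetadata(Object body, List<RecordMetadataStandIn> list) {
        if (body instanceof ExchangeStandIn) {
            ((ExchangeStandIn) body).getMessage().setHeader(RECORD_META_HEADER, list);
        } else if (body instanceof MessageStandIn) {
            ((MessageStandIn) body).setHeader(RECORD_META_HEADER, list);
        }
        // Any other type: no action.
    }

    public static void main(String[] args) {
        ExchangeStandIn exchange = new ExchangeStandIn();
        setRecordMetadata(exchange, new RecordMetadataStandIn("orders", 0, 42L));
        List<?> stored = (List<?>) exchange.getMessage().getHeader(RECORD_META_HEADER);
        System.out.println(stored.size()); // prints "1"
    }
}
```

Because the single-record overload always wraps its argument, downstream code can treat the header value as a list regardless of how many records were sent.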
Important Implementation Details
The class is final and has a private constructor to prevent instantiation.
Uses Camel's type conversion system for flexible and pluggable conversions.
Supports only a limited set of Kafka serializers for conversion; for others, it returns the original object.
The methods for setting exceptions and metadata are designed to be fail-safe: they detect whether the object is an `Exchange` or a `Message` and act accordingly.
`RecordMetadata` headers set by this utility are used for post-send processing or auditing within Camel routes.
The class resides in the producer support package, indicating its role as a helper for Kafka producers (`KafkaProducer`).
Interaction with Other Components
KafkaProducer:
The primary Kafka producer class calls `ProducerUtil` methods for:
Converting message payloads to Kafka-compatible serialized types before sending.
Recording exceptions from Kafka producer callbacks.
Storing Kafka send metadata into Camel message headers.
Camel Exchange and Message:
`ProducerUtil` works closely with Camel routing constructs, manipulating exchanges and messages to set exceptions or metadata.
KafkaConstants:
Uses constant keys such as `KafkaConstants.KAFKA_RECORD_META` to store metadata headers consistently.
Camel TypeConverter:
Relies on Camel's type conversion system to convert payloads into Kafka serializer-expected classes.
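The order in which a producer would use these helpers (convert, send, then record either the failure or the metadata) can be sketched as follows. This is an illustration under stated assumptions, not the `KafkaProducer` implementation: `ExchangeStandIn` and `Sender` are hypothetical, the metadata is reduced to a plain string, `String.valueOf` stands in for the payload conversion, and the header name is a placeholder. The callback shape mirrors Kafka's `Callback.onCompletion(RecordMetadata, Exception)`, where exactly one of the two arguments is non-null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Sketch of the convert / send / report sequence; all types are stand-ins. */
final class ProducerFlowSketch {
    // Placeholder only; the real header name is defined by KafkaConstants.KAFKA_RECORD_META.
    static final String RECORD_META_HEADER = "RECORD_META_PLACEHOLDER";

    /** Hypothetical stand-in for a Camel exchange: body, headers, optional exception. */
    static final class ExchangeStandIn {
        Object body;
        Exception exception;
        final Map<String, Object> headers = new HashMap<>();
    }

    /** Hypothetical stand-in for a Kafka send that reports (metadata, error) to a callback. */
    interface Sender {
        void send(Object value, BiConsumer<String, Exception> callback);
    }

    static void process(ExchangeStandIn exchange, Sender sender) {
        // Step 1: convert the payload (stands in for tryConvertToSerializedType).
        Object value = String.valueOf(exchange.body);
        // Step 2: send, then record the outcome on the exchange.
        sender.send(value, (metadata, error) -> {
            if (error != null) {
                exchange.exception = error; // stands in for setException
            } else {
                exchange.headers.put(RECORD_META_HEADER, metadata); // stands in for setRecordMetadata
            }
        });
    }

    public static void main(String[] args) {
        ExchangeStandIn ok = new ExchangeStandIn();
        ok.body = 7;
        process(ok, (value, callback) -> callback.accept("orders-0@42", null));
        System.out.println(ok.headers.get(RECORD_META_HEADER)); // prints "orders-0@42"

        ExchangeStandIn failed = new ExchangeStandIn();
        failed.body = 8;
        process(failed, (value, callback) -> callback.accept(null, new IllegalStateException("broker down")));
        System.out.println(failed.exception.getMessage()); // prints "broker down"
    }
}
```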
Visual Diagram: Function Flow and Relationships
flowchart TD
    A[Input Object] --> B{Is Exchange null?}
    B -- Yes --> C[Return original object]
    B -- No --> D[Check valueSerializer]
    D -->|Default Serializer| E["Convert to String"]
    D -->|ByteArraySerializer| F["Convert to byte[]"]
    D -->|ByteBufferSerializer| G["Convert to ByteBuffer"]
    D -->|BytesSerializer| H["Convert to byte[] then wrap as Bytes"]
    D -->|Any other serializer| C
    E --> I[Return converted object or original]
    F --> I
    G --> I
    H --> I
    subgraph Exception Handling
        J["Input object (Exchange or Message)"]
        K[Exception e]
        J --> L{Is Exchange?}
        L -- Yes --> M[Set exception on Exchange]
        L -- No --> N{Is Message with Exchange?}
        N -- Yes --> O["Set exception on Message's Exchange"]
        N -- No --> P[Do nothing]
    end
    subgraph Metadata Setting
        Q["Input object (Exchange or Message)"]
        R["RecordMetadata or list of RecordMetadata"]
        Q --> S{Exchange?}
        S -- Yes --> T["Set header on Exchange's Message"]
        S -- No --> U{Message?}
        U -- Yes --> V[Set header on Message]
        U -- No --> W[No action]
    end
Summary
`ProducerUtil.java` is a focused utility class that supports Kafka message production in Apache Camel by:
Converting Camel message bodies into the correct serialized form expected by Kafka producers based on serializer configuration.
Propagating exceptions from Kafka operations into the Camel routing context.
Attaching Kafka metadata to Camel messages or exchanges for downstream processing.
By centralizing type conversion, error propagation, and metadata handling, it bridges Camel's routing model and Kafka's producer APIs while keeping these concerns out of the core producer code.
Usage Snippets
Convert message body to Kafka serialized type
Object serializedValue = ProducerUtil.tryConvertToSerializedType(exchange, message.getBody(), valueSerializerClassName);
Set exception on exchange or message
try {
// Kafka send logic that may throw exception
} catch (Exception e) {
ProducerUtil.setException(exchange, e);
}
Attach Kafka record metadata after sending
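// kafkaProducer and record are assumed to exist in the calling code;
// send(...) returns a Future, and get() blocks until the send completes
RecordMetadata metadata = kafkaProducer.send(record).get();
ProducerUtil.setRecordMetadata(exchange, metadata);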