KafkaProducerMetadataCallBack.java
Overview
`KafkaProducerMetadataCallBack.java` defines a simple but crucial helper class used within the Apache Camel Kafka producer module to handle Kafka producer callbacks. Its primary responsibility is to process the completion of Kafka message sends by capturing any exceptions and optionally storing the Kafka record metadata into the message body.
This callback implementation integrates tightly with Apache Camel's message exchange model, enabling the propagation of Kafka producer results (success or failure) back into Camel message bodies for subsequent processing or auditing.
Class: KafkaProducerMetadataCallBack
Purpose
This class implements Kafka's `Callback` interface and acts as a handler for asynchronous Kafka producer send completions. Upon completion, it:
Sets any exception encountered during the send into the Camel message body.
Optionally sets the Kafka `RecordMetadata` (such as topic, partition, offset) into the message body if configured to do so.
This allows Camel routes to access Kafka metadata or errors related to message production without blocking or polling.
Package
package org.apache.camel.component.kafka.producer.support;
Imports
`org.apache.kafka.clients.producer.Callback`: Kafka's callback interface for producer send completion.
`org.apache.kafka.clients.producer.RecordMetadata`: Metadata about the sent Kafka record.
Static utility methods from `ProducerUtil`:
`setException(Object body, Exception e)`: sets exception info into the message body.
`setRecordMetadata(Object body, RecordMetadata metadata)`: sets Kafka metadata into the message body.
Declaration
public class KafkaProducerMetadataCallBack implements Callback
Implements Kafka's `Callback` interface.
Fields
| Field | Type | Description |
|---|---|---|
| `body` | `Object` | The message body (usually the Camel message body) to which results are attached. |
| `recordMetadata` | `boolean` | Flag indicating whether Kafka record metadata should be set on the body. |
Constructor
public KafkaProducerMetadataCallBack(Object body, boolean recordMetadata)
Parameters:
`body`: The message body object to which exceptions or metadata will be attached.
`recordMetadata`: A boolean flag indicating whether to record Kafka metadata into the body.
Description:
Creates a new callback instance configured to optionally store the Kafka metadata on completion.
Usage example:
// Take the body of the incoming Camel message as the target for send results.
Object messageBody = exchange.getIn().getBody();
// Ask the callback to record Kafka metadata in addition to any exception.
boolean storeMetadata = true;
KafkaProducerMetadataCallBack callback = new KafkaProducerMetadataCallBack(messageBody, storeMetadata);
Method: onCompletion
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e)
Parameters:
`recordMetadata`: Kafka `RecordMetadata` object containing information about the sent record (topic, partition, offset, timestamp).
`e`: Exception thrown during sending, if any; otherwise `null`.
Description:
This method is invoked by the Kafka client when a send operation completes. It performs two main actions:
Calls `ProducerUtil.setException(body, e)` to attach any exception to the message body for Camel to handle.
If the `recordMetadata` flag is `true`, calls `ProducerUtil.setRecordMetadata(body, recordMetadata)` to attach the metadata to the body.
Return Value:
Void.
Example usage:
This method is called automatically by the Kafka producer after it sends a message asynchronously. End users do not call it directly.
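The two actions described for `onCompletion` can be sketched without the Kafka or Camel libraries on the classpath. The sketch below is an illustration under stated assumptions, not the Camel source: `Metadata`, `SendCallback`, `ResultHolder`, and `MetadataCallback` are hypothetical stand-ins for `RecordMetadata`, `Callback`, the message body, and this class, and the direct field assignments stand in for the `ProducerUtil` helpers.

```java
class MetadataCallbackSketch {

    /** Simplified, hypothetical stand-in for Kafka's RecordMetadata. */
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for Kafka's Callback interface. */
    interface SendCallback {
        void onCompletion(Metadata metadata, Exception e);
    }

    /** Hypothetical body type that can carry the result of a send. */
    static final class ResultHolder {
        Exception exception;
        Metadata metadata;
    }

    /** Mirrors the documented behaviour: record any exception, optionally record metadata. */
    static final class MetadataCallback implements SendCallback {
        private final ResultHolder body;
        private final boolean recordMetadata;

        MetadataCallback(ResultHolder body, boolean recordMetadata) {
            this.body = body;
            this.recordMetadata = recordMetadata;
        }

        @Override
        public void onCompletion(Metadata metadata, Exception e) {
            // Step 1: attach the exception, if the send failed.
            if (e != null) {
                body.exception = e;
            }
            // Step 2: attach the metadata only when the flag asks for it.
            if (recordMetadata) {
                body.metadata = metadata;
            }
        }
    }

    public static void main(String[] args) {
        ResultHolder body = new ResultHolder();
        // Simulate the client reporting a successful send.
        new MetadataCallback(body, true).onCompletion(new Metadata("orders", 0, 42L), null);
        System.out.println(body.metadata.topic + "-" + body.metadata.partition + "@" + body.metadata.offset);
    }
}
```

Keeping the flag check inside the callback means the caller decides once, at construction time, whether metadata is recorded; the completion path itself stays branch-light and free of blocking calls.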
Implementation Details
Relies on `ProducerUtil` static methods to set the exception and metadata on the message body.
The message body is typically a Camel message body or an object that supports metadata/exception attachment.
The class does not contain logic for thread management or routing continuation; it only updates the message content on Kafka send completion.
Lightweight, with immutable state: the `body` reference and the `recordMetadata` flag are final and set during construction.
Interaction with Other Components
KafkaProducer:
This callback is often instantiated and passed to the Kafka producer's `send()` method within Camel's Kafka producer implementation (`KafkaProducer` class). It complements other callback classes like `KafkaProducerCallBack`, which manages routing continuation.
ProducerUtil:
Utilizes helper methods in `ProducerUtil` for safely setting exceptions and metadata on various message body types.
Camel Exchange:
The `body` parameter is typically derived from the Camel `Exchange` or `Message`. By updating the body, this callback helps propagate Kafka send results back into Camel routing.
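Because a Kafka `send()` call accepts a single callback, a result-recording callback and a routing-continuation callback have to be combined somehow when both are needed. The sketch below shows one possible way, a small fan-out wrapper. It is an assumption for illustration only and does not claim to be how Camel wires these classes together; `SendCallback` and `chain` are hypothetical names, and metadata is reduced to a `String`.

```java
import java.util.ArrayList;
import java.util.List;

class DelegatingCallbackSketch {

    /** Hypothetical stand-in for Kafka's Callback; metadata is simplified to a String. */
    interface SendCallback {
        void onCompletion(String metadata, Exception e);
    }

    /** Returns a callback that forwards one completion to each delegate, in order. */
    static SendCallback chain(SendCallback... delegates) {
        return (metadata, e) -> {
            for (SendCallback delegate : delegates) {
                delegate.onCompletion(metadata, e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // First delegate: records the outcome (the role described for the metadata callback).
        SendCallback recordResult =
                (m, e) -> log.add(e == null ? "metadata=" + m : "error=" + e.getMessage());
        // Second delegate: resumes routing (the role described for the continuation callback).
        SendCallback continueRouting = (m, e) -> log.add("continue routing");

        chain(recordResult, continueRouting).onCompletion("orders-0@42", null);
        System.out.println(log);
    }
}
```

Ordering matters in a design like this: recording the result before resuming routing ensures downstream steps see the exception or metadata.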
Usage Context
In the Kafka asynchronous sending process, when the Kafka client finishes sending a message, it calls the registered `Callback.onCompletion()` method. This class is used when the producer configuration or logic requires Kafka record metadata to be captured and/or exceptions to be recorded on the message body, allowing downstream Camel processors to react accordingly.
Example Workflow
The Camel route sends a message via `KafkaProducer`.
`KafkaProducer` creates a `KafkaProducerMetadataCallBack` instance, passing the message body and the metadata recording flag.
The Kafka producer sends the message asynchronously with this callback.
The Kafka client invokes `onCompletion()` after sending completes.
The callback sets the exception or metadata on the message body.
The Camel route continues processing with the updated message information.
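The workflow above can be simulated end to end with a fake producer that completes sends on a background thread. This is a minimal sketch under stated assumptions: `FakeProducer`, `Body`, `SendCallback`, and `sendAndWait` are hypothetical, the metadata string format is invented for the demo, and the latch exists only so the demo can wait for the asynchronous completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSendWorkflowSketch {

    /** Hypothetical stand-in for Kafka's Callback; metadata is simplified to a String. */
    interface SendCallback {
        void onCompletion(String metadata, Exception e);
    }

    /** Hypothetical message body that can carry the send result across threads. */
    static final class Body {
        volatile String metadata;
        volatile Exception exception;
    }

    /** Fake producer: completes each send on a background thread and then invokes the callback. */
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

        void send(String topic, String value, SendCallback callback) {
            ioThread.execute(() -> {
                if (value == null) {
                    callback.onCompletion(null, new IllegalArgumentException("null value"));
                } else {
                    callback.onCompletion(topic + "-0@0", null);
                }
            });
        }

        @Override
        public void close() {
            ioThread.shutdown();
        }
    }

    /** Runs the steps: create callback, send asynchronously, wait for completion, return the body. */
    static Body sendAndWait(String topic, String value, boolean recordMetadata) {
        Body body = new Body();
        CountDownLatch done = new CountDownLatch(1);
        try (FakeProducer producer = new FakeProducer()) {
            producer.send(topic, value, (metadata, e) -> {
                if (e != null) {
                    body.exception = e;       // failure is recorded on the body
                }
                if (recordMetadata) {
                    body.metadata = metadata; // metadata is recorded only when requested
                }
                done.countDown();             // demo-only signal that the send completed
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("send did not complete in time");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return body;
    }

    public static void main(String[] args) {
        Body ok = sendAndWait("orders", "hello", true);
        System.out.println("metadata=" + ok.metadata + ", exception=" + ok.exception);
    }
}
```

The caller thread never inspects the send result directly; it only reads what the callback wrote onto the body, which is the hand-off the workflow describes.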
Mermaid Class Diagram
classDiagram
class KafkaProducerMetadataCallBack {
-Object body
-boolean recordMetadata
+KafkaProducerMetadataCallBack(body: Object, recordMetadata: boolean)
+onCompletion(recordMetadata: RecordMetadata, e: Exception) void
}
KafkaProducerMetadataCallBack ..|> Callback : implements
KafkaProducerMetadataCallBack ..> ProducerUtil : uses
Summary
`KafkaProducerMetadataCallBack` is a focused, utility callback class used in the Apache Camel Kafka component to:
Capture and record exceptions from Kafka producer send operations.
Optionally record Kafka `RecordMetadata`, such as topic, partition, and offset, into the Camel message body.
It works as part of the asynchronous Kafka message sending infrastructure within Camel, enabling Kafka producer send results to be integrated into Camel's routing and error handling mechanisms smoothly, without introducing complex threading or routing logic.
This class is a small but essential bridge between Kafka's asynchronous callback model and Camel's message exchange processing model.