DelegatingCallback.java
Overview
`DelegatingCallback.java` defines a utility class **DelegatingCallback** that implements Kafka's `Callback` interface and delegates callback invocations to two separate `Callback` instances. This class is primarily used in the Kafka producer component of Apache Camel to combine multiple callback behaviors into a single callback handler. It ensures that both callbacks are executed when Kafka producer operations complete, and it gracefully handles exceptions thrown by either callback to prevent disruption of the other.
Class: DelegatingCallback
public final class DelegatingCallback implements Callback
Purpose
The `DelegatingCallback` class acts as a composite callback handler for Kafka producer callbacks. When Kafka sends a message asynchronously, it can invoke a callback to signal completion or failure. This class allows two callbacks to be registered and invoked sequentially for a single Kafka send operation, facilitating modular handling of Kafka send completions such as metadata recording, error handling, or routing continuation.
Key Features
Implements the Kafka
Callbackinterface.Holds references to two
Callbackinstances.Invokes both callbacks sequentially in
onCompletion.Catches and logs exceptions from each callback to ensure both are invoked regardless of individual failures.
Uses SLF4J for logging warnings on exceptions.
Constructor
DelegatingCallback(Callback callback1, Callback callback2)
Creates a new `DelegatingCallback` that delegates calls to two callbacks.
Parameters:
callback1– The firstCallbackinstance to delegate to. Must not be null.callback2– The secondCallbackinstance to delegate to. Must not be null.
Usage example:
Callback firstCallback = new KafkaProducerMetadataCallBack(...);
Callback secondCallback = new KafkaProducerCallBack(...);
Callback delegatingCallback = new DelegatingCallback(firstCallback, secondCallback);
// Pass this delegatingCallback to kafkaProducer.send(record, delegatingCallback);
Method
void onCompletion(RecordMetadata metadata, Exception exception)
Invoked by Kafka producer when a send operation completes, either successfully or with an exception.
Parameters:
metadata– KafkaRecordMetadatadescribing the sent record (may be null if an exception occurred).exception– Exception that occurred during send, or null if send was successful.
Behavior:
Invokes
callback1.onCompletion(metadata, exception)If this throws an exception, logs a warning but does not propagate the exception.
Invokes
callback2.onCompletion(metadata, exception)Similarly logs and ignores any exceptions.
Ensures both callbacks are always invoked even if one fails.
Usage scenario:
This method is called internally by Kafka producer client when an asynchronous send completes.
Implementation Details
Exception safety:
The class carefully isolates exceptions thrown by each delegate callback inside separate try-catch blocks. This design prevents one callback's failure from preventing the other from executing.Logging:
Uses SLF4JLoggerto log warnings if any callback throws an exception, aiding debugging without breaking the Kafka producer workflow.Final class:
The class is declaredfinalto prevent subclassing, emphasizing its utility nature and fixed delegation behavior.
Interaction with Kafka Producer Component
The
DelegatingCallbackis typically used in the Apache Camel Kafka producer support classes where multiple callback behaviors need to be combined.For example, in the Kafka producer asynchronous sending workflow, one callback might handle exchange continuation and error handling (
KafkaProducerCallBack), while another records Kafka metadata (KafkaProducerMetadataCallBack).By using
DelegatingCallback, these two concerns are cleanly separated but invoked together on Kafka send completion.This contributes to the modular design of the Camel Kafka producer, enabling flexible extension and composition of callback logic.
Example Usage in Kafka Producer Asynchronous Send
// Create individual callbacks
Callback metadataCallback = new KafkaProducerMetadataCallBack(exchange, recordMetadata);
Callback producerCallback = new KafkaProducerCallBack(exchange, asyncCallback, workerPool, recordMetadata);
// Combine callbacks into one
Callback delegatingCallback = new DelegatingCallback(metadataCallback, producerCallback);
// Send Kafka record asynchronously with combined callback
kafkaProducer.send(producerRecord, delegatingCallback);
Mermaid Class Diagram
classDiagram
class DelegatingCallback {
-Callback callback1
-Callback callback2
+DelegatingCallback(Callback callback1, Callback callback2)
+void onCompletion(RecordMetadata metadata, Exception exception)
}
DelegatingCallback ..|> Callback
Summary
The **DelegatingCallback** class is a small but crucial utility in Apache Camel's Kafka producer support that allows two Kafka producer callbacks to be combined and invoked as one. It ensures robust and modular callback handling by isolating exceptions in each delegate and logging warnings without interrupting the Kafka send lifecycle. This class enables clean composition of callback functionality such as metadata recording and routing continuation, fitting neatly into the asynchronous Kafka message production workflow.
If you are working with the Apache Camel Kafka component's producer code, understanding and using `DelegatingCallback` helps you extend or customize Kafka producer callbacks safely without disrupting the existing callback chain or error handling logic.