KafkaProducerCallBack.java
Overview
`KafkaProducerCallBack.java` defines the `KafkaProducerCallBack` class, a key utility for managing asynchronous Kafka message production within Apache Camel routes. Its primary responsibility is to track multiple Kafka send operations and coordinate the continuation of Camel routing only after all Kafka messages have been sent successfully or have encountered errors.
Kafka's asynchronous producer API sends messages in a non-blocking manner, invoking a callback upon completion. `KafkaProducerCallBack` aggregates these callbacks for potentially multiple messages (e.g., batch sends), handles exception and metadata recording, and signals Camel's asynchronous routing mechanism to continue once all sends complete.
This class also ensures thread-safe coordination using atomic counters and employs an executor service to submit routing continuation tasks off of Kafka client's internal threads, preventing thread contention or misuse of Kafka IO threads in Camel routing.
Class: KafkaProducerCallBack
public final class KafkaProducerCallBack implements Callback
Purpose
Implements Kafka's `Callback` interface to receive send completion notifications. Tracks outstanding Kafka sends with an internal count, records exceptions and Kafka metadata back into the Camel message body, and signals Camel's routing callback once all messages complete.
Key Responsibilities
Counting outstanding sends and detecting when all have completed.
Storing Kafka
RecordMetadatafor each successfully sent message if configured.Recording exceptions encountered during sending.
Calling back to Camel routing asynchronously using a dedicated worker thread pool.
Integrating with Camel's asynchronous routing model via
AsyncCallback.
Fields
Field Name | Type | Description |
|---|---|---|
`body` | `Object` | The Camel message body or exchange object where exception and metadata are recorded. |
`callback` | `AsyncCallback` | The Camel asynchronous callback invoked when routing can continue. |
`count` | `AtomicInteger` | Tracks the number of outstanding Kafka send operations yet to complete. |
`workerPool` | `ExecutorService` | Thread pool used to execute routing continuation asynchronously, off Kafka IO threads. |
`record` | `boolean` | Flag indicating whether to record Kafka send metadata. |
`recordMetadataList` | `List` | List of Kafka `RecordMetadata` objects collected for each successful send if metadata recording. |
Constructor
public KafkaProducerCallBack(Object body, AsyncCallback callback, ExecutorService workerPool, boolean record)
Parameters:
body— The Camel message body or exchange object to attach exceptions or metadata.callback— The Camel asynchronous callback to notify when all sends have completed.workerPool— AnExecutorServiceto submit routing continuation tasks asynchronously.record— Flag indicating whether Kafka record metadata should be collected.
Behavior:
Initializes the internal count to 1 (anticipating one send initially).
Validates that
workerPoolis not null (critical for async continuation).If
recordis true, associates the empty metadata list with the message body usingProducerUtil.setRecordMetadata()so downstream consumers can access it.
Methods
void increment()
Increments the internal outstanding send counter by 1.
Usage:
Call this before sending an additional Kafka message to track it.
producerCallBack.increment();
boolean allSent()
Decrements the outstanding send count by 1. If the count reaches zero, it means all sends have completed.
Logs a trace message indicating all messages have been sent.
Calls the Camel
AsyncCallback.done(true)method to signal synchronous completion.Returns
trueif all messages are sent; otherwise, returnsfalse.
Usage:
Called after initial sends to check if all work is done and routing should continue.
if (producerCallBack.allSent()) {
// proceed with routing
}
void onCompletion(RecordMetadata recordMetadata, Exception e)
Kafka producer's callback method invoked when a send completes (successfully or with error).
Parameters:
recordMetadata— Metadata about the sent record (topic, partition, offset).e— Exception if the send failed, otherwisenull.
Behavior:
Records exception on the message body using
ProducerUtil.setException(body, e).If metadata recording is enabled, adds the
recordMetadatatorecordMetadataList.Decrements the outstanding send count.
If this was the last outstanding message, submits a routing continuation task (
doContinueRouting()) to theworkerPool.
Important:
The routing continuation is submitted asynchronously to avoid using Kafka IO threads for Camel routing.
private void doContinueRouting()
Internal method invoked asynchronously by the worker pool once all Kafka sends have completed.
Logs a trace message indicating routing continuation within the worker thread.
Calls
callback.done(false)to notify Camel routing to continue asynchronously.
Usage Example
// Setup callback with message body, Camel async callback, worker pool, and metadata recording enabled
KafkaProducerCallBack producerCallBack = new KafkaProducerCallBack(body, camelAsyncCallback, workerPool, true);
// For each Kafka message sent asynchronously:
producerCallBack.increment();
kafkaProducer.send(kafkaRecord, producerCallBack);
// After sending all messages:
if (producerCallBack.allSent()) {
// All messages sent synchronously, routing can continue immediately.
}
Implementation Details and Algorithms
Atomic Counter Management:
Uses anAtomicIntegerinitialized to 1, incremented for each additional message sent, and decremented on each callback completion. When the count reaches zero, all sends have completed.Exception and Metadata Propagation:
Upon completion, exceptions and record metadata are recorded on the Camel message body using utility methods (ProducerUtil), enabling downstream processors to handle errors or access Kafka metadata.Thread Offloading:
Kafka's internal IO threads should not be used for Camel routing continuation. The class submits a routing continuation task to a dedicatedExecutorServiceworker pool, ensuring thread-safety and separation of concerns.Dual Callback Signaling:
The class supports both synchronous and asynchronous routing continuation via theallSent()method (which signals synchronous completion) and the asynchronouscallback.done(false)called insidedoContinueRouting().
Interaction with Other Components
KafkaProducer:
The KafkaProducer class creates and usesKafkaProducerCallBackinstances to manage asynchronous Kafka sends. It increments the callback's count for each message and passes it to Kafka'ssend()method.ProducerUtil:
Utility class that provides static methodssetException()andsetRecordMetadata()to attach exceptions and metadata to the Camel message body.Camel AsyncCallback:
KafkaProducerCallBackholds a reference to Camel'sAsyncCallbackand invokes it once all Kafka sends are finished to continue Camel route processing.ExecutorService Worker Pool:
Used to run routing continuation tasks asynchronously, preventing Kafka client threads from being blocked or misused.
Class Diagram
classDiagram
class KafkaProducerCallBack {
-Object body
-AsyncCallback callback
-AtomicInteger count
-ExecutorService workerPool
-boolean record
-List~RecordMetadata~ recordMetadataList
+KafkaProducerCallBack(body, callback, workerPool, record)
+void increment()
+boolean allSent()
+void onCompletion(RecordMetadata recordMetadata, Exception e)
-void doContinueRouting()
}
KafkaProducerCallBack ..> AsyncCallback : uses
KafkaProducerCallBack ..> ExecutorService : uses
KafkaProducerCallBack ..> RecordMetadata : holds
}
Summary
The `KafkaProducerCallBack` class is a critical utility for managing asynchronous Kafka message production in Apache Camel's Kafka component. It aggregates multiple Kafka send completions, handles exception and metadata recording, and ensures Camel routes continue only after all Kafka messages have been processed. Its design emphasizes thread-safety, separation of Kafka IO threads from routing threads, and integration with Camel’s asynchronous routing framework.
By using an atomic counter and worker pool, it efficiently supports batch sending scenarios and integrates seamlessly with Camel's asynchronous callback model.
Visual Workflow Summary (Sequence Diagram)
sequenceDiagram
participant CamelRoute as Camel Route
participant KafkaProducer as KafkaProducer
participant KafkaClient as Kafka Producer Client
participant Callback as KafkaProducerCallBack
participant WorkerPool as Worker Pool (ExecutorService)
CamelRoute->>KafkaProducer: process(exchange, AsyncCallback)
KafkaProducer->>Callback: new KafkaProducerCallBack(...)
loop for each Kafka message
KafkaProducer->>Callback: increment()
KafkaProducer->>KafkaClient: send(record, Callback)
end
KafkaClient-->>Callback: onCompletion(metadata, exception)
Callback->>Callback: decrement count, record metadata/exception
alt All messages sent
Callback->>WorkerPool: submit doContinueRouting()
WorkerPool->>CamelRoute: AsyncCallback.done()
end
This diagram illustrates the lifecycle of asynchronous Kafka message sending and routing continuation coordination performed by `KafkaProducerCallBack`.
References
KafkaProducerCallBack.java (source)
Apache Camel Kafka component documentation
Kafka Producer API (org.apache.kafka.clients.producer.Callback)
Apache Camel AsyncCallback interface
ProducerUtil utility class for metadata and exception handling