KafkaProducerCallBack.java

Overview

`KafkaProducerCallBack.java` defines the `KafkaProducerCallBack` class, a key utility for managing asynchronous Kafka message production within Apache Camel routes. Its primary responsibility is to track multiple Kafka send operations and coordinate the continuation of Camel routing only after all Kafka messages have been sent successfully or have encountered errors.

Kafka's asynchronous producer API sends messages in a non-blocking manner, invoking a callback upon completion. `KafkaProducerCallBack` aggregates these callbacks for potentially multiple messages (e.g., batch sends), handles exception and metadata recording, and signals Camel's asynchronous routing mechanism to continue once all sends complete.

The class coordinates threads safely with an atomic counter, and it submits routing continuation tasks to an executor service instead of running them on the Kafka client's internal threads, so Camel routing neither contends with nor misuses Kafka's IO threads.


Class: KafkaProducerCallBack

public final class KafkaProducerCallBack implements Callback

Purpose

Implements Kafka's `Callback` interface to receive send completion notifications. Tracks outstanding Kafka sends with an internal count, records exceptions and Kafka metadata back into the Camel message body, and signals Camel's routing callback once all messages complete.

Key Responsibilities


Fields

| Field Name | Type | Description |
| --- | --- | --- |
| `body` | `Object` | The Camel message body or exchange object where exception and metadata are recorded. |
| `callback` | `AsyncCallback` | The Camel asynchronous callback invoked when routing can continue. |
| `count` | `AtomicInteger` | Tracks the number of outstanding Kafka send operations yet to complete. |
| `workerPool` | `ExecutorService` | Thread pool used to execute routing continuation asynchronously, off Kafka IO threads. |
| `record` | `boolean` | Flag indicating whether to record Kafka send metadata. |
| `recordMetadataList` | `List<RecordMetadata>` | Kafka `RecordMetadata` objects collected for each successful send when metadata recording is enabled. |


Constructor

public KafkaProducerCallBack(Object body, AsyncCallback callback, ExecutorService workerPool, boolean record)

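Parameters:

- `body`: the Camel message body or exchange object on which exceptions and metadata are recorded.
- `callback`: the Camel `AsyncCallback` to invoke when routing can continue.
- `workerPool`: the `ExecutorService` that runs the routing continuation off Kafka IO threads.
- `record`: whether to record Kafka send metadata.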

Behavior:


Methods

void increment()

Increments the internal outstanding send counter by 1.

Usage:

Call this before sending an additional Kafka message to track it.

producerCallBack.increment();

boolean allSent()

Decrements the outstanding send count by 1 and returns `true` if the count has reached zero, meaning every send has completed.

Usage:

Call this once after all sends have been queued to check whether every send has already completed and routing can continue.

if (producerCallBack.allSent()) {
    // proceed with routing
}
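The interplay between `increment()`, `allSent()`, and the completion callback is easier to see with the counter isolated. The following is a minimal sketch, not the Camel source: `SendCounterDemo`, `SendCounter`, and `sendCompleted()` are hypothetical names, and the initial count of 1 is an assumption inferred from the usage pattern (one extra unit held by the sending thread until it calls `allSent()`).

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SendCounterDemo {

    /** Hypothetical stand-in for the counting logic only; not the Camel class. */
    static final class SendCounter {
        // Assumption: the sending thread holds one unit until it calls allSent().
        private final AtomicInteger count = new AtomicInteger(1);

        void increment() {
            count.incrementAndGet();
        }

        /** The sender releases its unit; true means nothing is outstanding. */
        boolean allSent() {
            return count.decrementAndGet() == 0;
        }

        /** One send finished; true means it was the last outstanding unit. */
        boolean sendCompleted() {
            return count.decrementAndGet() == 0;
        }
    }

    public static void main(String[] args) {
        // Ordering A: both sends finish before the sender calls allSent().
        SendCounter a = new SendCounter();
        a.increment();
        a.increment();
        System.out.println(a.sendCompleted()); // false
        System.out.println(a.sendCompleted()); // false
        System.out.println(a.allSent());       // true

        // Ordering B: the sender calls allSent() while sends are still in flight.
        SendCounter b = new SendCounter();
        b.increment();
        b.increment();
        System.out.println(b.allSent());       // false
        System.out.println(b.sendCompleted()); // false
        System.out.println(b.sendCompleted()); // true
    }
}
```

Under this assumption exactly one decrement observes zero in either ordering, so the continuation is triggered once regardless of whether the sender or a completion finishes last.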

void onCompletion(RecordMetadata recordMetadata, Exception e)

Kafka producer's callback method invoked when a send completes (successfully or with error).

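Parameters:

- `recordMetadata`: metadata for the record that was sent.
- `e`: the exception raised by the send, or `null` if the send succeeded.

Behavior:

- Records the exception, if any, and, when metadata recording is enabled, the `RecordMetadata`.
- Decrements the outstanding send count.
- When the count reaches zero, submits `doContinueRouting()` to the worker pool.

Important: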

The routing continuation is submitted asynchronously to avoid using Kafka IO threads for Camel routing.
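The thread handoff can be illustrated without Kafka or Camel on the classpath. This sketch assumes a single-threaded executor standing in for the Kafka client's IO thread; `HandoffDemo`, `runOnce()`, and the thread names `fake-kafka-io` and `worker-pool` are hypothetical and exist only for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class HandoffDemo {

    /**
     * Simulates one completion callback and returns
     * { thread that ran the callback, thread that ran the continuation }.
     */
    static String[] runOnce() {
        // Hypothetical thread names, chosen only to make the output readable.
        ExecutorService fakeIoThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-kafka-io"));
        ExecutorService workerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-pool"));
        CompletableFuture<String> callbackThread = new CompletableFuture<>();
        CompletableFuture<String> continuationThread = new CompletableFuture<>();
        Runnable continuation =
                () -> continuationThread.complete(Thread.currentThread().getName());
        try {
            fakeIoThread.execute(() -> {
                // Callback body: only cheap bookkeeping happens on the IO thread ...
                callbackThread.complete(Thread.currentThread().getName());
                // ... and the continuation is handed to the worker pool.
                workerPool.submit(continuation);
            });
            return new String[] {
                callbackThread.get(5, TimeUnit.SECONDS),
                continuationThread.get(5, TimeUnit.SECONDS)
            };
        } catch (Exception e) {
            throw new IllegalStateException("handoff did not complete", e);
        } finally {
            fakeIoThread.shutdown();
            workerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runOnce();
        System.out.println("callback ran on:     " + threads[0]); // fake-kafka-io
        System.out.println("continuation ran on: " + threads[1]); // worker-pool
    }
}
```

The callback returns as soon as the task is submitted, so the simulated IO thread is free again while the continuation runs elsewhere.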


private void doContinueRouting()

Internal method invoked asynchronously by the worker pool once all Kafka sends have completed.


Usage Example

// Setup callback with message body, Camel async callback, worker pool, and metadata recording enabled
KafkaProducerCallBack producerCallBack = new KafkaProducerCallBack(body, camelAsyncCallback, workerPool, true);

// For each Kafka message sent asynchronously:
producerCallBack.increment();
kafkaProducer.send(kafkaRecord, producerCallBack);

// After queuing all messages:
if (producerCallBack.allSent()) {
    // Every send had already completed by this point, so routing can continue immediately.
}
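For readers who want to run the pattern end to end, here is a self-contained simulation of the usage above. It is a sketch under stated assumptions, not the Camel implementation: `BatchSendDemo`, `BatchCallback`, and `sendBatch` are hypothetical, plain strings stand in for `RecordMetadata`, a `Runnable` stands in for Camel's `AsyncCallback`, a single-threaded executor plays the Kafka client, and the counter is assumed to start at 1.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BatchSendDemo {

    /** Hypothetical stand-in that mirrors the documented method names. */
    static final class BatchCallback {
        private final AtomicInteger count = new AtomicInteger(1); // assumed start value
        private final List<String> metadata = new CopyOnWriteArrayList<>();
        private final ExecutorService workerPool;
        private final Runnable continueRouting; // stands in for AsyncCallback

        BatchCallback(ExecutorService workerPool, Runnable continueRouting) {
            this.workerPool = workerPool;
            this.continueRouting = continueRouting;
        }

        void increment() {
            count.incrementAndGet();
        }

        boolean allSent() {
            return count.decrementAndGet() == 0;
        }

        // Strings stand in for RecordMetadata so the sketch needs no Kafka jars.
        void onCompletion(String recordMetadata, Exception e) {
            if (e == null) {
                metadata.add(recordMetadata);
            }
            if (count.decrementAndGet() == 0) {
                workerPool.submit(continueRouting);
            }
        }
    }

    /** Sends fake records and returns how many metadata entries were recorded. */
    static int sendBatch(int messages) {
        ExecutorService fakeIoThread = Executors.newSingleThreadExecutor();
        ExecutorService workerPool = Executors.newSingleThreadExecutor();
        CountDownLatch routed = new CountDownLatch(1);
        try {
            BatchCallback callback = new BatchCallback(workerPool, routed::countDown);
            for (int i = 0; i < messages; i++) {
                String fakeMetadata = "demo-topic-0@" + i;
                callback.increment();
                // Stand-in for kafkaProducer.send(record, callback).
                fakeIoThread.execute(() -> callback.onCompletion(fakeMetadata, null));
            }
            if (callback.allSent()) {
                routed.countDown(); // every send had already completed
            }
            if (!routed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("routing never continued");
            }
            return callback.metadata.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            fakeIoThread.shutdown();
            workerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendBatch(3)); // 3
    }
}
```

Because each completion records its metadata before decrementing the counter, all entries are in place by the time the continuation fires, whichever thread performs the final decrement.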

Implementation Details and Algorithms


Interaction with Other Components


Class Diagram

classDiagram
    class KafkaProducerCallBack {
        -Object body
        -AsyncCallback callback
        -AtomicInteger count
        -ExecutorService workerPool
        -boolean record
        -List~RecordMetadata~ recordMetadataList
        +KafkaProducerCallBack(body, callback, workerPool, record)
        +void increment()
        +boolean allSent()
        +void onCompletion(RecordMetadata recordMetadata, Exception e)
        -void doContinueRouting()
    }
    KafkaProducerCallBack ..> AsyncCallback : uses
    KafkaProducerCallBack ..> ExecutorService : uses
    KafkaProducerCallBack ..> RecordMetadata : holds

Summary

The `KafkaProducerCallBack` class is a critical utility for managing asynchronous Kafka message production in Apache Camel's Kafka component. It aggregates multiple Kafka send completions, handles exception and metadata recording, and ensures Camel routes continue only after all Kafka messages have been processed. Its design emphasizes thread-safety, separation of Kafka IO threads from routing threads, and integration with Camel’s asynchronous routing framework.

The atomic counter lets a single callback instance serve a whole batch of sends, and the worker pool keeps Camel's asynchronous callback from running on Kafka's threads.


Visual Workflow Summary (Sequence Diagram)

sequenceDiagram
    participant CamelRoute as Camel Route
    participant KafkaProducer as KafkaProducer
    participant KafkaClient as Kafka Producer Client
    participant Callback as KafkaProducerCallBack
    participant WorkerPool as Worker Pool (ExecutorService)

    CamelRoute->>KafkaProducer: process(exchange, AsyncCallback)
    KafkaProducer->>Callback: new KafkaProducerCallBack(...)
    loop for each Kafka message
        KafkaProducer->>Callback: increment()
        KafkaProducer->>KafkaClient: send(record, Callback)
    end
    KafkaClient-->>Callback: onCompletion(metadata, exception)
    Callback->>Callback: decrement count, record metadata/exception
    alt All messages sent
        Callback->>WorkerPool: submit doContinueRouting()
        WorkerPool->>CamelRoute: AsyncCallback.done()
    end

This diagram illustrates the lifecycle of asynchronous Kafka message sending and routing continuation coordination performed by `KafkaProducerCallBack`.

