Asynchronous Sending

Purpose

Asynchronous Sending addresses the need for non-blocking, efficient dispatch of Kafka messages within Apache Camel routes. It enables messages to be sent to Kafka brokers without forcing the Camel route to wait for the Kafka broker's acknowledgment, thereby improving throughput and responsiveness. This subtopic specifically solves the challenge of integrating Kafka's asynchronous send API with Camel's routing and callback mechanisms, ensuring that routing continues only after Kafka confirms message receipt or failure.

Functionality

The core workflow of asynchronous sending involves creating Kafka `ProducerRecord` instances from Camel messages and invoking Kafka's `send` method with a callback handler to track completion. The subtopic introduces a dedicated callback class (`KafkaProducerCallBack`) that manages concurrency and integrates with Camel’s asynchronous routing model.

Key aspects include:

Key Workflow Snippet

@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    KafkaProducerCallBack producerCallBack = new KafkaProducerCallBack(exchange, callback, workerPool, configuration.isRecordMetadata());
    Message message = exchange.getMessage();
    Object body = message.getBody();

    if (transactionId != null) {
        startKafkaTransaction(exchange);
    }

    if (endpoint.getConfiguration().isUseIterator() && isIterable(body)) {
        processIterableAsync(exchange, producerCallBack, message);
    } else {
        ProducerRecord<Object, Object> record = createRecord(exchange, message);
        doSend(exchange, record, producerCallBack);
    }

    return producerCallBack.allSent();
}

Here, the `KafkaProducerCallBack` orchestrates completion notification, and `doSend` submits messages to Kafka with the callback attached.

Integration

Asynchronous Sending complements the parent topic of Kafka Message Production by providing a non-blocking sending mechanism alongside synchronous and transactional sending modes. It integrates tightly with:

This subtopic extends the parent by enabling high-throughput, low-latency message production scenarios without blocking route execution, a capability not covered by synchronous or transactional sending alone.

Diagram

sequenceDiagram
    participant CamelRoute as Camel Route
    participant KafkaProducer as KafkaProducer
    participant KafkaClient as Kafka Client
    participant WorkerPool as Worker Pool (ExecutorService)
    participant Callback as KafkaProducerCallBack

    CamelRoute->>KafkaProducer: process(exchange, AsyncCallback)
    KafkaProducer->>KafkaClient: send(ProducerRecord, Callback)
    Note right of KafkaClient: Kafka sends message asynchronously
    KafkaClient-->>Callback: onCompletion(metadata, exception)
    Callback->>Callback: decrement count, collect metadata
    alt All messages sent
        Callback->>WorkerPool: submit continuation task
        WorkerPool->>CamelRoute: AsyncCallback.done()
    end

This sequence depicts how Camel initiates the async send, Kafka client processes it, the callback collects results, and routing continues asynchronously once all sends complete.


Asynchronous Sending enables efficient, scalable Kafka message production by bridging Kafka's native async API with Camel's routing lifecycle, ensuring seamless integration and reliable callback handling.