Kafka Message Production

Overview

The **Kafka Message Production** module is responsible for sending messages from Apache Camel routes to Kafka topics. It provides flexible and configurable support for producing Kafka messages in multiple modes, including asynchronous, synchronous, and transactional sending. Additionally, it handles Kafka header propagation, serialization of keys and values, and optional metadata recording for auditing or routing purposes.

This module addresses the challenges of reliable, performant, and consistent message production to Kafka within Camel routes, accommodating various usage scenarios such as batch message sending, transactional guarantees, and message header compatibility.


Core Concepts and Purpose


How Kafka Message Production Works

KafkaProducer Class

The heart of this module is the `KafkaProducer` class (`KafkaProducer.java`), which extends Camel's `DefaultAsyncProducer` and implements the message sending logic.


Key Functionalities and Workflows

1. Topic, Key, and Partition Resolution

2. Header Propagation and Serialization

3. Sending Messages (Sync and Async)

4. Transactional Sending


Interactions with Other Components


Important Concepts and Design Patterns


Code Illustrations

Creating and Sending a Kafka Record Synchronously

ProducerRecord<Object, Object> record = createRecord(exchange, message);
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get(); // blocks until send completes

Asynchronous Message Sending with Callback Aggregation

KafkaProducerCallBack callback = new KafkaProducerCallBack(exchange, asyncCallback, workerPool, recordMetadata);
kafkaProducer.send(record, callback);

Transaction Start Before Sending

if (!unitOfWork.isTransactedBy(transactionId)) {
    unitOfWork.beginTransactedBy(transactionId);
    kafkaProducer.beginTransaction();
    unitOfWork.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
}

Header Propagation and Serialization

for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
    if (headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange)) {
        byte[] serialized = headerSerializer.serialize(key, value);
        if (serialized != null) {
            propagatedHeaders.add(new RecordHeader(key, serialized));
        }
    }
}

Mermaid Diagram: Kafka Message Production Workflow

sequenceDiagram
    participant CamelRoute as Camel Route
    participant KafkaProducer as KafkaProducer
    participant KafkaClient as Kafka Producer Client
    participant WorkerPool as Worker Pool (Async)
    participant KafkaBroker as Kafka Broker

    CamelRoute->>KafkaProducer: process(exchange)
    alt Transaction enabled
        KafkaProducer->>KafkaProducer: startKafkaTransaction()
    end

    alt Body is Iterable and async mode
        KafkaProducer->>KafkaProducer: createRecordIterable()
        loop for each record
            KafkaProducer->>KafkaClient: send(record, callback)
            KafkaClient-->>KafkaProducer: callback.onCompletion()
        end
        KafkaProducer->>WorkerPool: submit routing continuation
    else sync or single message
        KafkaProducer->>KafkaClient: send(record)
        KafkaClient-->>KafkaProducer: send returns Future
        KafkaProducer->>KafkaClient: future.get() (wait for send)
    end

    KafkaProducer->>CamelRoute: continue routing or set exception
    KafkaClient->>KafkaBroker: publish message

This documentation explains the Kafka Message Production topic, detailing the purpose, functionality, internal workflows, and interactions within the Apache Camel Kafka component. It highlights how the `KafkaProducer` class orchestrates message sending to Kafka with support for asynchronous, synchronous, and transactional modes while managing header propagation and metadata recording.