KafkaProducer.java


Overview

The `KafkaProducer` class is a core component of the Apache Camel Kafka integration, responsible for producing and sending messages from Camel routes to Apache Kafka topics. It extends Camel’s `DefaultAsyncProducer` and implements `RouteIdAware` to integrate with Camel routing and lifecycle.

This class manages Kafka producer client creation and lifecycle, message preparation (including serialization and header propagation), and supports synchronous, asynchronous, and transactional sending modes.

Additionally, it supports dynamic topic/key/partition resolution, header filtering and serialization, and optional recording of Kafka metadata back into Camel exchanges.


Class: KafkaProducer

Package

`org.apache.camel.component.kafka`

Inherits

Extends `DefaultAsyncProducer`; implements `RouteIdAware`.

Fields

| Field Name | Type | Description |
| --- | --- | --- |
| `kafkaProducer` | `org.apache.kafka.clients.producer.Producer` (raw type, warnings suppressed) | The underlying Kafka producer client instance used to send messages. |
| `producerHealthCheck` | `KafkaProducerHealthCheck` | Optional health check instance monitoring Kafka producer readiness. |
| `healthCheckRepository` | `WritableHealthCheckRepository` | Repository used to register and unregister health checks. |
| `clientId` | `String` | Kafka client ID identifying the producer instance. |
| `transactionId` | `String` | Kafka transactional ID used for transactional message sending. |
| `endpoint` | `KafkaEndpoint` | Reference to the `KafkaEndpoint` from which this producer was created. |
| `configuration` | `KafkaConfiguration` | Holds configuration options for the Kafka producer (serializers, brokers, transactional settings). |
| `workerPool` | `ExecutorService` | Thread pool used for asynchronous callback processing. |
| `shutdownWorkerPool` | `boolean` | Whether the worker pool should be shut down on stop (true if it was created internally). |
| `closeKafkaProducer` | `volatile boolean` | Whether the Kafka producer client should be closed on stop. |
| `endpointTopic` | `String` | Default topic extracted from the endpoint URI. |
| `configPartitionKey` | `Integer` | Configured static partition key, if any. |
| `configKey` | `String` | Configured static key, if any. |
| `routeId` | `String` | The ID of the route this producer is associated with. |


Constructors

KafkaProducer(KafkaEndpoint endpoint)


Key Methods

Lifecycle Methods

protected void doStart() throws Exception

protected void doStop() throws Exception
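
The `shutdownWorkerPool` and `closeKafkaProducer` flags in the field table suggest an ownership rule: on stop, release only what the producer created itself. The following is a minimal sketch of that rule for the worker pool, using only the JDK. The class `OwnedResourcesSketch`, its method names, and the pool size are hypothetical and are not taken from the Camel source.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified model of the ownership flag; an illustration, not the Camel implementation. */
class OwnedResourcesSketch {
    private ExecutorService workerPool;  // may be injected by the caller before start()
    private boolean shutdownWorkerPool;  // true only when this object created the pool

    void setWorkerPool(ExecutorService pool) {
        this.workerPool = pool;
    }

    ExecutorService getWorkerPool() {
        return workerPool;
    }

    void start() {
        if (workerPool == null) {
            // Nothing was injected, so create a pool and remember that we own it.
            workerPool = Executors.newFixedThreadPool(2);
            shutdownWorkerPool = true;
        }
    }

    void stop() {
        if (shutdownWorkerPool && workerPool != null) {
            workerPool.shutdown();       // only shut down a pool we created
        }
        workerPool = null;
        shutdownWorkerPool = false;
    }
}
```

With this rule, a pool shared between several producers survives the stop of any single one of them, while an internally created pool does not leak.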


Kafka Producer Client Accessors

`getKafkaProducer()` and `setKafkaProducer(...)` allow the internal Kafka producer client to be retrieved or replaced with an externally created instance.


Message Sending Methods

public void process(Exchange exchange) throws Exception

**Example usage:**

ProducerRecord<Object, Object> record = createRecord(exchange, message);
Future<RecordMetadata> future = kafkaProducer.send(record);
future.get(); // wait for completion
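
When an exchange yields several records, one way to keep the synchronous path efficient is to hand off every record first and only then wait for the acknowledgements. The sketch below shows that pattern with JDK types only. `SyncSendSketch`, `sendAndWait`, and the `sender` function are hypothetical stand-ins for the Kafka client's `send` call, and wrapping failures in `IllegalStateException` is a choice made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

class SyncSendSketch {
    /** Submits every record first, then blocks until each acknowledgement arrives. */
    static <R, M> List<M> sendAndWait(List<R> records, Function<R, Future<M>> sender) {
        List<Future<M>> pending = new ArrayList<>();
        for (R record : records) {
            pending.add(sender.apply(record));   // non-blocking hand-off
        }
        List<M> acks = new ArrayList<>();
        for (Future<M> future : pending) {
            try {
                acks.add(future.get());          // blocks until this record is acknowledged
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for ack", e);
            } catch (ExecutionException e) {
                // Surface the underlying send failure to the caller.
                throw new IllegalStateException("send failed", e.getCause());
            }
        }
        return acks;
    }
}
```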

public boolean process(Exchange exchange, AsyncCallback callback)
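
The boolean returned by the asynchronous `process` follows Camel's `AsyncCallback` contract: `true` means the work finished before the method returned, `false` means the callback will be invoked later. A common way to honor that contract when several sends are in flight is a pending counter with a guard value. The sketch below is an illustration using JDK types, not the Camel source: `AsyncSendSketch` is hypothetical, `Consumer<Boolean>` stands in for `AsyncCallback.done(boolean)`, and error handling is omitted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class AsyncSendSketch {
    /**
     * Returns true if every send completed before this method returned
     * (callback already invoked with true); otherwise returns false and
     * the callback is invoked with false once the last send completes.
     */
    static <R> boolean process(List<R> records,
                               Function<R, CompletableFuture<?>> sender,
                               Consumer<Boolean> callback) {
        // Start at 1 so the callback cannot fire while records are still being submitted.
        AtomicInteger pending = new AtomicInteger(1);
        for (R record : records) {
            pending.incrementAndGet();
            sender.apply(record).whenComplete((ack, error) -> {
                if (pending.decrementAndGet() == 0) {
                    callback.accept(false);      // last ack arrived after process() returned
                }
            });
        }
        // Release the guard; if nothing is outstanding, the work finished synchronously.
        if (pending.decrementAndGet() == 0) {
            callback.accept(true);
            return true;
        }
        return false;
    }
}
```

Because exactly one decrement can bring the counter to zero, the callback fires once, whether the final acknowledgement arrives on the calling thread or on another one.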


Message Preparation Methods

protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message)

protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message)
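
The two methods above differ in cardinality: `createRecord` yields one record, while `createRecordIterable` yields an iterator so that a multi-element body can become several records. A minimal sketch of that normalization step follows, assuming that `Iterable` and `Iterator` bodies are expanded and anything else is treated as a single element. `RecordIterableSketch` and `toRecords` are hypothetical, and a `Map.Entry` of topic and value stands in for `ProducerRecord`.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

class RecordIterableSketch {
    /** Lazily maps a message body to (topic, value) pairs, one per element. */
    static Iterator<Map.Entry<String, Object>> toRecords(String topic, Object body) {
        Iterator<?> source;
        if (body instanceof Iterable) {
            source = ((Iterable<?>) body).iterator();          // one record per element
        } else if (body instanceof Iterator) {
            source = (Iterator<?>) body;
        } else {
            source = Collections.singletonList(body).iterator(); // single-record case
        }
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public Map.Entry<String, Object> next() {
                // Map.entry rejects null values; a real record type would not need to.
                return Map.entry(topic, source.next());
            }
        };
    }
}
```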


Header Propagation

public List<Header> getPropagatedHeaders(Exchange exchange, Message message)
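
Header propagation combines two steps mentioned in the overview: filtering out headers that should not leave Camel, and serializing the remaining values to bytes, since Kafka header values are byte arrays. The sketch below illustrates both steps with JDK types. The prefix-based filter and the set of supported value types are assumptions made for this example; the actual filter strategy and header serializer are configurable in Camel and may behave differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class HeaderPropagationSketch {
    /** Returns the byte form of a header value, or null if the type is unsupported. */
    static byte[] serialize(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
        }
        if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return null;
    }

    /** Assumed rule: names starting with "camel" or "kafka." are internal and not propagated. */
    static boolean isInternal(String name) {
        String lower = name.toLowerCase(Locale.ROOT);
        return lower.startsWith("camel") || lower.startsWith("kafka.");
    }

    /** Keeps insertion order; skips internal headers and values that cannot be serialized. */
    static Map<String, byte[]> propagated(Map<String, Object> headers) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (isInternal(entry.getKey())) {
                continue;
            }
            byte[] bytes = serialize(entry.getValue());
            if (bytes != null) {
                out.put(entry.getKey(), bytes);
            }
        }
        return out;
    }
}
```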


Topic, Key, and Partition Resolution
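
The overview mentions dynamic topic, key, and partition resolution, and the field table lists `endpointTopic`, `configKey`, and `configPartitionKey` as the statically configured values. One plausible way to combine static configuration with per-message headers is sketched below. The precedence order shown (a header overrides the endpoint topic, while a configured key or partition wins over a header) and the header names are assumptions for illustration and should be checked against `KafkaConstants` and the Camel source for the version in use.

```java
import java.util.Map;

class TargetResolutionSketch {
    // Header names assumed for this example.
    static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
    static final String KEY = "kafka.KEY";
    static final String PARTITION_KEY = "kafka.PARTITION_KEY";

    /** A per-message override header, if present, replaces the endpoint's default topic. */
    static String resolveTopic(Map<String, Object> headers, String endpointTopic) {
        Object override = headers.get(OVERRIDE_TOPIC);
        return override != null ? override.toString() : endpointTopic;
    }

    /** Assumed precedence: a statically configured key wins over the message header. */
    static Object resolveKey(Map<String, Object> headers, String configKey) {
        return configKey != null ? configKey : headers.get(KEY);
    }

    /** Assumed precedence: a configured partition wins; otherwise use an Integer header. */
    static Integer resolvePartition(Map<String, Object> headers, Integer configPartitionKey) {
        if (configPartitionKey != null) {
            return configPartitionKey;
        }
        Object header = headers.get(PARTITION_KEY);
        return header instanceof Integer ? (Integer) header : null;
    }
}
```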


Transaction Support

private void startKafkaTransaction(Exchange exchange)
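
Transactional sending brackets a group of sends between a begin and a commit, and aborts if anything fails. The sketch below shows that bracket in its simplest form. `TxClient` is a hypothetical interface exposing only the three transaction calls (the Kafka `Producer` interface has methods with these names), and the try/catch structure is an assumption made for the example. How `startKafkaTransaction` ties commit and abort to the exchange is not described in this document.

```java
class TransactionSketch {
    /** Hypothetical stand-in exposing only the transaction calls used here. */
    interface TxClient {
        void beginTransaction();
        void commitTransaction();
        void abortTransaction();
    }

    /** Runs the sends inside one transaction: commit on success, abort on failure. */
    static void inTransaction(TxClient client, Runnable sends) {
        client.beginTransaction();
        try {
            sends.run();
            client.commitTransaction();
        } catch (RuntimeException e) {
            // Roll back so consumers reading committed data never see a partial batch.
            client.abortTransaction();
            throw e;
        }
    }
}
```

With a real Kafka client, some failures (for example a fenced producer) require closing the client rather than aborting, which this sketch does not model.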


Internal Helpers


Important Implementation Details


Usage Examples

Example 1: Sending a Single Message Synchronously

Exchange exchange = ...; // Camel exchange with message body and headers
KafkaProducer producer = new KafkaProducer(endpoint);

producer.start(); // public lifecycle method; invokes the protected doStart()

producer.process(exchange); // blocks until the message is sent and acknowledged

producer.stop(); // invokes the protected doStop() for cleanup

Example 2: Sending Multiple Messages Asynchronously with Callback

Exchange exchange = ...; // Camel exchange with Iterable body
KafkaProducer producer = new KafkaProducer(endpoint);
AsyncCallback callback = ...; // Camel async callback

producer.start(); // public lifecycle method; invokes the protected doStart()

boolean doneSync = producer.process(exchange, callback);

if (!doneSync) {
    // async processing in progress; callback.done(false) is invoked when it completes
}

// Stop the producer only after the callback has fired
producer.stop();

Interaction with Other Components


Mermaid Class Diagram

classDiagram
    class KafkaProducer {
        - org.apache.kafka.clients.producer.Producer kafkaProducer
        - KafkaProducerHealthCheck producerHealthCheck
        - WritableHealthCheckRepository healthCheckRepository
        - String clientId
        - String transactionId
        - KafkaEndpoint endpoint
        - KafkaConfiguration configuration
        - ExecutorService workerPool
        - boolean shutdownWorkerPool
        - volatile boolean closeKafkaProducer
        - String endpointTopic
        - Integer configPartitionKey
        - String configKey
        - String routeId
        + KafkaProducer(KafkaEndpoint endpoint)
        + KafkaEndpoint getEndpoint()
        + Properties getProps()
        + boolean isReady()
        + org.apache.kafka.clients.producer.Producer getKafkaProducer()
        + void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer)
        + ExecutorService getWorkerPool()
        + void setWorkerPool(ExecutorService workerPool)
        # void doStart() throws Exception
        # void doStop() throws Exception
        + void process(Exchange exchange) throws Exception
        + boolean process(Exchange exchange, AsyncCallback callback)
        # ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message)
        # Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message)
        + List<Header> getPropagatedHeaders(Exchange exchange, Message message)
        + String getRouteId()
        + void setRouteId(String routeId)
    }

Summary

The `KafkaProducer` class is the configurable producer that Camel routes use to publish messages to Kafka. It wraps the Kafka client and offers synchronous, asynchronous, and transactional sending modes, along with dynamic topic and key resolution, header propagation with serialization, and health checking.

It manages the Kafka producer lifecycle, the worker thread pool, and transactions within the Camel routing context.

