KafkaProducer.java
Overview
The `KafkaProducer` class is a core component of the Apache Camel Kafka integration, responsible for producing and sending messages from Camel routes to Apache Kafka topics. It extends Camel’s `DefaultAsyncProducer` and implements `RouteIdAware` to integrate with Camel routing and lifecycle.
This class manages Kafka producer client creation and lifecycle, message preparation (including serialization and header propagation), and supports multiple sending modes:
Synchronous sending: blocking calls waiting for Kafka acknowledgments.
Asynchronous sending: non-blocking calls leveraging callbacks and worker threads.
Transactional sending: leveraging Kafka’s transactional APIs for atomic multi-message publishes.
Additionally, it supports dynamic topic/key/partition resolution, header filtering and serialization, and optional recording of Kafka metadata back into Camel exchanges.
Class: KafkaProducer
Package
`org.apache.camel.component.kafka`
Inherits
org.apache.camel.support.DefaultAsyncProducerImplements
org.apache.camel.spi.RouteIdAware
Fields
Field Name | Type | Description |
|---|---|---|
`kafkaProducer` | `org.apache.kafka.clients.producer.Producer` (generic raw type suppressed) | The underlying Kafka producer client instance used to send messages. |
`producerHealthCheck` | `KafkaProducerHealthCheck` | Optional health check instance monitoring Kafka producer readiness. |
`healthCheckRepository` | `WritableHealthCheckRepository` | Repository to register and unregister health checks. |
`clientId` | `String` | Kafka client ID string for identifying the producer instance. |
`transactionId` | `String` | Kafka transactional ID used for transactional message sending. |
`endpoint` | `KafkaEndpoint` | Reference to the KafkaEndpoint from which this producer was created. |
`configuration` | `KafkaConfiguration` | Holds configuration options for the Kafka producer (serializers, brokers, transactional settings). |
`workerPool` | `ExecutorService` | Thread pool used for asynchronous callback processing. |
`shutdownWorkerPool` | `boolean` | Flag indicating if the worker pool should be shutdown on stop (if internally created). |
`closeKafkaProducer` | `volatile boolean` | Flag indicating if the Kafka producer client should be closed on stop. |
`endpointTopic` | `String` | Default topic extracted from the endpoint URI. |
`configPartitionKey` | `Integer` | Configured static partition key, if any. |
`configKey` | `String` | Configured static key, if any. |
`routeId` | `String` | The route ID this producer is associated with. |
Constructors
KafkaProducer(KafkaEndpoint endpoint)
Parameters:
endpoint- TheKafkaEndpointinstance this producer is associated with.
Description:
Constructs a newKafkaProducerlinked to the given Kafka endpoint. It extracts topic and configuration defaults from the endpoint.
Key Methods
Lifecycle Methods
protected void doStart() throws Exception
Starts the Kafka producer:
Initializes Kafka producer properties.
Creates the Kafka client producer if not already provided.
Initializes Kafka transactions if configured.
Creates or reuses a worker pool for asynchronous sending.
Retrieves client ID via properties or reflection.
Registers health check if enabled.
protected void doStop() throws Exception
Stops the Kafka producer:
Removes health check registration.
Closes Kafka producer client if owned.
Gracefully shuts down the worker pool if created internally.
Kafka Producer Client Accessors
public org.apache.kafka.clients.producer.Producer getKafkaProducer()public void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer)
Allows external injection or retrieval of the internal Kafka producer client.
Message Sending Methods
public void process(Exchange exchange) throws Exception
Synchronous processing method called by Camel when endpoint is synchronous.
Supports single message or iterable body.
If transactions are enabled, starts a Kafka transaction.
Sends messages synchronously, blocking until Kafka acknowledges.
**Example usage:**
ProducerRecord<Object, Object> record = createRecord(exchange, message);
Future<RecordMetadata> future = kafkaProducer.send(record);
future.get(); // wait for completion
public boolean process(Exchange exchange, AsyncCallback callback)
Asynchronous processing method called when endpoint is asynchronous.
Uses
KafkaProducerCallBackto manage completion.Supports sending collections of messages asynchronously.
Starts Kafka transactions if configured.
Returns immediately; routing continues after Kafka callbacks complete.
Message Preparation Methods
protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message)
Creates a Kafka
ProducerRecordfrom the Camel message.Resolves topic, partition key, message key, and timestamp (supports override headers).
Serializes key and value according to configured serializers.
Propagates filtered and serialized Camel headers as Kafka headers.
protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message)
For iterable message bodies, creates an iterator of Kafka
ProducerRecordwrappers.Supports batch sending of multiple messages within a single exchange.
Header Propagation
public List<Header> getPropagatedHeaders(Exchange exchange, Message message)
Extracts Camel message headers.
Filters headers using the configured
HeaderFilterStrategy.Serializes header values with
KafkaHeaderSerializer.Creates Kafka
RecordHeaderinstances for inclusion in Kafka messages.
Topic, Key, and Partition Resolution
Topic is resolved in order:
Override topic header (
KafkaConstants.OVERRIDE_TOPIC)Configuration topic
Endpoint URI remainder path
Key and partition key can be overridden via headers or static configuration.
Transaction Support
private void startKafkaTransaction(Exchange exchange)
Starts a Kafka transaction if not already begun for this exchange.
Registers a
KafkaTransactionSynchronizationwith Camel’sUnitOfWorkfor commit/abort handling.
Internal Helpers
private boolean isIterable(Object body): Checks if message body is iterable or iterator.private Iterator<Object> getObjectIterator(Object body): Converts body to iterator.private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, Exchange exchange): Serializes and filters headers.private Object getOverrideKey(Message message): Resolves message key.private Integer getOverridePartitionKey(Message message): Resolves partition key.private String evaluateTopic(Message message): Resolves topic.
Important Implementation Details
Thread Context ClassLoader Management:
During Kafka producer creation, the thread context class loader is temporarily switched to Kafka's class loader to support Kafka's internal reflection-based authentication loading.Health Checking:
The producer registers with Camel’s health check framework if enabled, using reflection to inspect Kafka client readiness.Callback Aggregation for Async Sends:
UsesKafkaProducerCallBackandDelegatingCallbackto combine Kafka callbacks and signal Camel once all messages are sent.Support for Iterable Message Bodies:
The class supports batch sending by iterating over collections or iterators in the message body.Transactional Integration with Camel UnitOfWork:
Transactions are tightly coupled with Camel unit of work lifecycle, ensuring consistent commit or rollback semantics aligned with route processing.
Usage Examples
Example 1: Sending a Single Message Synchronously
Exchange exchange = ...; // Camel exchange with message body and headers
KafkaProducer producer = new KafkaProducer(endpoint);
producer.doStart(); // initialize
producer.process(exchange); // blocks until message is sent and acknowledged
producer.doStop(); // cleanup
Example 2: Sending Multiple Messages Asynchronously with Callback
Exchange exchange = ...; // Camel exchange with Iterable body
KafkaProducer producer = new KafkaProducer(endpoint);
AsyncCallback callback = ...; // Camel async callback
producer.doStart();
boolean doneSync = producer.process(exchange, callback);
if (!doneSync) {
// async processing in progress
}
// Once callback.done() is called, processing completes
producer.doStop();
Interaction with Other Components
KafkaEndpoint: Supplies configuration, Kafka client factory, and endpoint-specific details.
KafkaConfiguration: Provides producer properties, serializers, transactional settings.
Producer Support Classes:
KafkaProducerCallBackmanages async send callbacks.KafkaProducerMetadataCallBackrecords Kafka metadata into Camel messages.DelegatingCallbackaggregates multiple callbacks.KeyValueHolderIteratoriterates over message bodies producing Kafka records.ProducerUtilassists with serialization conversion and metadata handling.
Camel Core: Integrates with Camel routing, exchanges, unit of work, and health check repositories.
Kafka Client Library: Uses Kafka’s producer API for message sending, transaction management, and client lifecycle.
Mermaid Class Diagram
classDiagram
class KafkaProducer {
- org.apache.kafka.clients.producer.Producer kafkaProducer
- KafkaProducerHealthCheck producerHealthCheck
- WritableHealthCheckRepository healthCheckRepository
- String clientId
- String transactionId
- KafkaEndpoint endpoint
- KafkaConfiguration configuration
- ExecutorService workerPool
- boolean shutdownWorkerPool
- volatile boolean closeKafkaProducer
- String endpointTopic
- Integer configPartitionKey
- String configKey
- String routeId
+ KafkaProducer(KafkaEndpoint endpoint)
+ KafkaEndpoint getEndpoint()
+ Properties getProps()
+ boolean isReady()
+ org.apache.kafka.clients.producer.Producer getKafkaProducer()
+ void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer)
+ ExecutorService getWorkerPool()
+ void setWorkerPool(ExecutorService workerPool)
+ void doStart() throws Exception
+ void doStop() throws Exception
+ void process(Exchange exchange) throws Exception
+ boolean process(Exchange exchange, AsyncCallback callback)
+ ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message)
+ Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message)
+ List<Header> getPropagatedHeaders(Exchange exchange, Message message)
+ String getRouteId()
+ void setRouteId(String routeId)
}
Summary
The `KafkaProducer` class is a sophisticated, configurable Kafka message producer for Apache Camel routes. It abstracts Kafka client details, providing seamless integration of synchronous, asynchronous, and transactional sending modes. It supports dynamic topic and key resolution, message header propagation with serialization, and health checking.
By managing Kafka producer lifecycle, threading, and transactions within the Camel routing context, it ensures robust, scalable, and reliable Kafka message production aligned with Camel’s enterprise integration patterns.