Kafka Message Production
Overview
The **Kafka Message Production** module is responsible for sending messages from Apache Camel routes to Kafka topics. It provides flexible and configurable support for producing Kafka messages in multiple modes, including asynchronous, synchronous, and transactional sending. Additionally, it handles Kafka header propagation, serialization of keys and values, and optional metadata recording for auditing or routing purposes.
This module addresses the challenges of reliable, performant, and consistent message production to Kafka within Camel routes, accommodating various usage scenarios such as batch message sending, transactional guarantees, and message header compatibility.
Core Concepts and Purpose
Reliable Message Sending: Ensures messages from Camel exchanges are correctly packaged and sent to Kafka topics, with options for synchronous confirmation or asynchronous delivery.
Transactional Sending: Supports Kafka transactions to enable atomic writes when multiple messages are involved or when transactional guarantees are required.
Header Propagation: Converts and propagates Camel message headers into Kafka record headers, enabling context and metadata to flow alongside message payloads.
Flexible Topic and Key Resolution: Allows static or dynamic resolution of Kafka topics, keys, and partition keys from configuration or Camel message headers.
Metadata Recording: Optionally records Kafka metadata (such as partition and offset info) back into Camel messages for downstream use.
How Kafka Message Production Works
KafkaProducer Class
The heart of this module is the `KafkaProducer` class (`KafkaProducer.java`), which extends Camel's `DefaultAsyncProducer` and implements the message sending logic.
Initialization and Configuration:
On startup (`doStart()`), the producer builds the Kafka client properties from `KafkaConfiguration`, including broker addresses, serializers, and transactional settings. If a transactional ID is configured, it initializes Kafka transactions with `kafkaProducer.initTransactions()`.
Worker Pool Management:
For asynchronous sending, the producer manages an `ExecutorService` worker pool. Kafka runs send callbacks on the producer client's I/O thread, which should not be blocked, so the producer continues routing the exchange on a worker-pool thread instead.
Message Preparation:
When processing an exchange, the producer evaluates the target Kafka topic, partition key, and message key. Each of these can come from Camel message headers or from static configuration.
Header Serialization:
Camel message headers are filtered and serialized into Kafka headers using a pluggable `KafkaHeaderSerializer`. Headers that pass the filter are converted to `RecordHeader` instances and included in the Kafka record.
Creating Kafka Records:
Messages (or collections of messages) are transformed into `ProducerRecord` instances that carry the topic, partition, key, serialized value, timestamp, and headers.
Sending Messages:
Synchronous Sending:
In synchronous mode (`process()` with `isSynchronous() == true`), messages are sent with `kafkaProducer.send()` followed by `.get()` on the returned `Future`, which blocks until the send completes or fails. Batch sending is supported by iterating over collections of records.
Asynchronous Sending:
In asynchronous mode (`process(Exchange, AsyncCallback)`), the producer sends messages without blocking and uses callbacks (`KafkaProducerCallBack`) to track completion and errors. Once all sends have completed, a worker-pool thread continues routing.
Transactional Support:
If transactions are enabled, the producer begins a Kafka transaction per exchange (`startKafkaTransaction()`) and registers a synchronization that commits or aborts the transaction when the exchange completes.
Metadata Handling:
If configured, the producer records the `RecordMetadata` of each sent message back into the Camel exchange or message under the header `KafkaConstants.KAFKA_RECORD_META`.
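The metadata-recording step can be pictured with the following minimal, JDK-only sketch. It is illustrative, not the component's code: `Meta` is a hypothetical stand-in for Kafka's `RecordMetadata`, the header name `recordMeta` is a placeholder for `KafkaConstants.KAFKA_RECORD_META`, and a plain `Map` plays the role of the Camel message headers. The list is synchronized on the assumption that completion callbacks run on the Kafka client's I/O thread while the route later reads the header from another thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of metadata recording; not Camel's KafkaProducerMetadataCallBack.
final class MetadataRecordingSketch {
    // Minimal stand-in for Kafka's RecordMetadata.
    static final class Meta {
        final String topic;
        final int partition;
        final long offset;

        Meta(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Placeholder for the value of KafkaConstants.KAFKA_RECORD_META.
    static final String RECORD_META_HEADER = "recordMeta";

    // Creates the metadata list up front and stores it in the headers map.
    static List<Meta> prepare(Map<String, Object> headers) {
        // Synchronized because callbacks may add entries from a different thread than the route.
        List<Meta> list = Collections.synchronizedList(new ArrayList<>());
        headers.put(RECORD_META_HEADER, list);
        return list;
    }

    // Called from a send's completion callback: only successful sends are recorded.
    static void onCompletion(List<Meta> target, Meta metadata, Exception exception) {
        if (exception == null && metadata != null) {
            target.add(metadata);
        }
    }
}
```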
Key Functionalities and Workflows
1. Topic, Key, and Partition Resolution
The topic is determined by checking for a header override (`KafkaConstants.OVERRIDE_TOPIC`), then the configuration, then the remainder path of the endpoint URI. The key and partition key can be supplied either through headers (`KafkaConstants.KEY`, `KafkaConstants.PARTITION_KEY`) or statically in the configuration. Values are converted to the types expected by the configured serializers using `ProducerUtil.tryConvertToSerializedType()`.
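A minimal sketch of the topic resolution order described above, using only the JDK. The class name and header string are hypothetical placeholders (the real header name is whatever `KafkaConstants.OVERRIDE_TOPIC` holds), and a `Map` stands in for the Camel message headers. Removing the override header once it has been read, so that it is not propagated as a Kafka header, is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical sketch of topic resolution; the header name is a placeholder.
final class TopicResolutionSketch {
    // Placeholder for the value of KafkaConstants.OVERRIDE_TOPIC.
    static final String OVERRIDE_TOPIC = "overrideTopic";

    // Header override first, then the configured topic, then the endpoint URI path.
    static String resolveTopic(Map<String, Object> headers, String configuredTopic, String endpointPath) {
        // Assumption: the override header is consumed so it is not propagated to Kafka.
        Object override = headers.remove(OVERRIDE_TOPIC);
        if (override != null) {
            return override.toString();
        }
        if (configuredTopic != null) {
            return configuredTopic;
        }
        return endpointPath;
    }
}
```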
2. Header Propagation and Serialization
The `getPropagatedHeaders()` method extracts Camel message headers, filters them through the `HeaderFilterStrategy`, and serializes the remaining ones with the `KafkaHeaderSerializer` into Kafka headers (`RecordHeader`). Headers that cannot be serialized are skipped.
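To make "headers that cannot be serialized are skipped" concrete, here is a hypothetical serializer in the spirit of a pluggable `KafkaHeaderSerializer`. The supported types and their byte encodings are assumptions chosen for illustration; the component's default serializer may handle a different set. Returning `null` signals an unsupported value, which the propagation loop treats as "skip this header".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical header serializer; types and encodings are illustrative assumptions.
final class SimpleHeaderSerializerSketch {
    // `key` is unused here but kept to mirror a (key, value) serializer signature.
    static byte[] serialize(String key, Object value) {
        if (value instanceof byte[]) {
            return (byte[]) value; // raw bytes pass through unchanged
        } else if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array(); // 8 bytes, big-endian
        } else if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array(); // 4 bytes, big-endian
        } else if (value instanceof Boolean) {
            return value.toString().getBytes(StandardCharsets.UTF_8); // "true" / "false"
        }
        return null; // unsupported type: the caller skips this header
    }
}
```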
3. Sending Messages (Sync and Async)
Synchronous Processing (`process()`):
Processes a single message or an iterable collection of messages. For a single record:
```java
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
future.get(); // waits for completion
```
Batch processing sends all records, collects the returned futures, and then waits for them to complete, optionally recording metadata.
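The batch behaviour (send everything first, then wait) can be sketched generically. Here `send` is a hypothetical stand-in for `kafkaProducer.send(record)`, so the example runs without a Kafka client. The ordering is the point: issuing every send before the first `get()` lets the client group records into batches instead of waiting for one acknowledgement per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of synchronous batch sending; `send` stands in for KafkaProducer.send.
final class SyncBatchSketch {
    static <R, M> List<M> sendAllThenWait(List<R> records, Function<R, Future<M>> send)
            throws InterruptedException, ExecutionException {
        List<Future<M>> futures = new ArrayList<>();
        for (R record : records) {
            futures.add(send.apply(record)); // non-blocking: only enqueues the record
        }
        List<M> metadata = new ArrayList<>();
        for (Future<M> future : futures) {
            // Blocks until this send finishes; a failed send surfaces as ExecutionException.
            metadata.add(future.get());
        }
        return metadata; // results in the same order as the input records
    }
}
```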
Asynchronous Processing (`process(Exchange, AsyncCallback)`):
Uses `KafkaProducerCallBack` to track outstanding sends and continue routing once they complete:
```java
kafkaProducer.send(record, delegatingCallback);
```
where `delegatingCallback` combines metadata recording and callback counting.
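Callback counting is the subtle part of asynchronous sending. The following hypothetical `AggregatingCallback` is not Camel's `KafkaProducerCallBack` (whose internals may differ); it counts outstanding sends, keeps the first error, and hands the continuation to a worker pool exactly once. An `Executor` stands in for the worker pool and a `Consumer<Exception>` for Camel's `AsyncCallback`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of callback aggregation; not Camel's KafkaProducerCallBack.
final class AggregatingCallback {
    // Starts at 1 so the continuation cannot fire while records are still being handed to send().
    private final AtomicInteger pending = new AtomicInteger(1);
    // Keeps the first failure reported by any send.
    private final AtomicReference<Exception> firstError = new AtomicReference<>();
    private final Executor workerPool;
    private final Consumer<Exception> continuation; // receives null when every send succeeded

    AggregatingCallback(Executor workerPool, Consumer<Exception> continuation) {
        this.workerPool = workerPool;
        this.continuation = continuation;
    }

    // Call once before each send(record, callback).
    void increment() {
        pending.incrementAndGet();
    }

    // Call from each send's completion callback; exception is null on success.
    void onCompletion(Exception exception) {
        if (exception != null) {
            firstError.compareAndSet(null, exception);
        }
        decrement();
    }

    // Call once after every send has been issued; returns true if all sends had already completed.
    boolean allSent() {
        return decrement();
    }

    private boolean decrement() {
        if (pending.decrementAndGet() == 0) {
            // Continue routing on the worker pool, not on whichever thread finished last.
            workerPool.execute(() -> continuation.accept(firstError.get()));
            return true;
        }
        return false;
    }
}
```

Starting the counter at 1 and releasing that extra count in `allSent()` avoids a race in which the first callback arrives before the second record has been sent and the continuation fires too early.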
4. Transactional Sending
If transactions are enabled, the producer starts one by invoking `kafkaProducer.beginTransaction()` and associates it with the Camel `UnitOfWork`. A `KafkaTransactionSynchronization` is registered to commit or abort the transaction depending on whether the exchange completes successfully or fails.
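The begin-once, commit-or-abort lifecycle can be sketched without Kafka or Camel on the classpath. `TransactionalClient` is a hypothetical interface whose method names mirror `beginTransaction()`, `commitTransaction()`, and `abortTransaction()` on Kafka's `Producer`; the `Set` of active IDs stands in for the `UnitOfWork` bookkeeping, and the `failed` flag stands in for Camel's completion versus failure callbacks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the transactional methods of Kafka's Producer interface.
interface TransactionalClient {
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

// Hypothetical sketch of per-exchange transaction handling; not Camel's KafkaTransactionSynchronization.
final class ExchangeTxSketch {
    private final TransactionalClient client;
    // Stands in for UnitOfWork.isTransactedBy(...) / beginTransactedBy(...).
    private final Set<String> activeTxIds = new HashSet<>();

    ExchangeTxSketch(TransactionalClient client) {
        this.client = client;
    }

    // Begins at most one Kafka transaction per transaction ID.
    void beginIfNeeded(String txId) {
        if (activeTxIds.add(txId)) { // add() returns false when the ID is already active
            client.beginTransaction();
        }
    }

    // Called once when the exchange finishes: commit on success, abort on failure.
    void onExchangeDone(String txId, boolean failed) {
        if (!activeTxIds.remove(txId)) {
            return; // no transaction was started for this ID
        }
        if (failed) {
            client.abortTransaction();
        } else {
            client.commitTransaction();
        }
    }
}
```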
Interactions with Other Components
KafkaEndpoint and KafkaConfiguration:
`KafkaProducer` retrieves configuration and endpoint details to resolve topics, keys, serializers, and connection settings.
Producer Support Classes:
`KafkaProducerCallBack`: Manages callback completion and error handling for async sends.
`KafkaProducerMetadataCallBack`: Records Kafka metadata into Camel messages.
`DelegatingCallback`: Combines multiple callbacks so that all of them are invoked.
`KeyValueHolderIterator`: Iterates over message collections, producing Kafka records with proper serialization and header propagation.
`ProducerUtil`: Utility for serialization conversion and metadata/exception handling.
Camel Routing:
`KafkaProducer` integrates with Camel's routing lifecycle, supporting synchronous and asynchronous routing continuation, transactional context propagation, and exception propagation.
Health Checks:
`KafkaProducerHealthCheck` monitors the readiness of the Kafka producer client.
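The collection handling attributed to `KeyValueHolderIterator` can be approximated as follows. This is a hypothetical, eager simplification: `Rec` stands in for Kafka's `ProducerRecord`, and per-element headers, keys, and serialization are ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: expands a message body into one or more record holders.
final class RecordExpansionSketch {
    // Minimal stand-in for Kafka's ProducerRecord.
    static final class Rec {
        final String topic;
        final Object key;
        final Object value;

        Rec(String topic, Object key, Object value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    static List<Rec> toRecords(String topic, Object key, Object body) {
        if (body instanceof Iterable) {
            List<Rec> records = new ArrayList<>();
            for (Object element : (Iterable<?>) body) {
                records.add(new Rec(topic, key, element)); // one record per element
            }
            return records;
        }
        return Collections.singletonList(new Rec(topic, key, body)); // single message
    }
}
```

A lazy `Iterator`, as the name `KeyValueHolderIterator` suggests, would avoid materializing the whole batch in memory; the eager list only keeps the sketch short.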
Important Concepts and Design Patterns
Pluggable Serialization:
Serialization of keys, values, and headers is configurable and extensible, allowing custom serializers.
Callback Aggregation:
Asynchronous sends aggregate multiple Kafka callbacks into a single callback (`KafkaProducerCallBack`) that manages exchange continuation.
Thread Context ClassLoader Management:
The producer temporarily switches the thread context class loader while creating the Kafka client, so that the client can load classes such as authentication providers correctly.
Use of Exchange UnitOfWork for Transactions:
Leverages Camel's `UnitOfWork` lifecycle to manage Kafka transactions consistently within route processing.
Dynamic Message Handling:
Supports sending collections or iterables of messages within a single exchange, enabling batch-like semantics.
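The class-loader switch follows a standard save, set, restore pattern. The helper below is a hypothetical sketch, not the component's code; the `finally` block guarantees that the previous loader is restored even if client creation throws.

```java
import java.util.function.Supplier;

// Hypothetical helper for the thread-context-class-loader switch.
final class ClassLoaderSwitchSketch {
    // Runs `factory` with `loader` as the context class loader, then restores the previous one.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> factory) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(loader);
            return factory.get();
        } finally {
            current.setContextClassLoader(previous); // restored on success and on failure
        }
    }
}
```

In the Kafka case, `loader` would typically be the loader that loaded the Kafka client classes (for example `org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()`), and `factory` would construct the client.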
Code Illustrations
Creating and Sending a Kafka Record Synchronously
```java
ProducerRecord<Object, Object> record = createRecord(exchange, message);
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get(); // blocks until send completes; failures surface as ExecutionException
```
Asynchronous Message Sending with Callback Aggregation
```java
KafkaProducerCallBack callback = new KafkaProducerCallBack(exchange, asyncCallback, workerPool, recordMetadata);
kafkaProducer.send(record, callback);
```
Transaction Start Before Sending
```java
// Begin at most one Kafka transaction per UnitOfWork
if (!unitOfWork.isTransactedBy(transactionId)) {
    unitOfWork.beginTransactedBy(transactionId);
    kafkaProducer.beginTransaction();
    unitOfWork.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
}
```
Header Propagation and Serialization
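```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

List<Header> propagatedHeaders = new ArrayList<>();
for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
    String key = header.getKey();
    Object value = header.getValue();
    // applyFilterToCamelHeaders returns true when the header must NOT be propagated
    if (!headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange)) {
        byte[] serialized = headerSerializer.serialize(key, value);
        if (serialized != null) { // null means the value type is unsupported: skip it
            propagatedHeaders.add(new RecordHeader(key, serialized));
        }
    }
}
```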
Mermaid Diagram: Kafka Message Production Workflow
```mermaid
sequenceDiagram
    participant CamelRoute as Camel Route
    participant KafkaProducer as KafkaProducer
    participant KafkaClient as Kafka Producer Client
    participant WorkerPool as Worker Pool (Async)
    participant KafkaBroker as Kafka Broker
    CamelRoute->>KafkaProducer: process(exchange)
    alt Transaction enabled
        KafkaProducer->>KafkaProducer: startKafkaTransaction()
    end
    opt Body is Iterable
        KafkaProducer->>KafkaProducer: createRecordIterable()
    end
    alt Async mode
        loop for each record
            KafkaProducer->>KafkaClient: send(record, callback)
            KafkaClient->>KafkaBroker: publish message
            KafkaBroker-->>KafkaClient: acknowledgement
            KafkaClient-->>KafkaProducer: callback.onCompletion()
        end
        KafkaProducer->>WorkerPool: submit routing continuation
        WorkerPool->>CamelRoute: continue routing or set exception
    else Sync mode
        loop for each record
            KafkaProducer->>KafkaClient: send(record)
            KafkaClient-->>KafkaProducer: returns Future
        end
        KafkaProducer->>KafkaClient: future.get() (blocks)
        KafkaClient->>KafkaBroker: publish message(s)
        KafkaBroker-->>KafkaClient: acknowledgement
        KafkaClient-->>KafkaProducer: RecordMetadata
        KafkaProducer->>CamelRoute: continue routing or set exception
    end
```
This documentation explains the Kafka Message Production topic, detailing the purpose, functionality, internal workflows, and interactions within the Apache Camel Kafka component. It highlights how the `KafkaProducer` class orchestrates message sending to Kafka with support for asynchronous, synchronous, and transactional modes while managing header propagation and metadata recording.