KafkaRecordBatchingProcessor.java
Overview
`KafkaRecordBatchingProcessor` is a final class within the Apache Camel Kafka component that extends `KafkaRecordProcessor`. Its primary responsibility is to batch multiple Kafka consumer records into a single Camel `Exchange` and process them collectively. This batching mechanism optimizes throughput and resource utilization by reducing the frequency of processing calls and commits to Kafka.
Key features include:
Aggregation of individual Kafka `ConsumerRecord` instances into batches.
Processing of batches either when the batch size limit is reached or when a timeout/interval expires.
Support for both automatic and manual offset commit strategies.
Error handling and commit synchronization to ensure reliability during batch processing.
This class is designed to work closely with the `KafkaConsumer`, `CommitManager`, and Camel's `Processor` interfaces to facilitate Kafka message consumption in a batch-oriented manner within the Camel routing framework.
Class: KafkaRecordBatchingProcessor
Package
`org.apache.camel.component.kafka.consumer.support.batching`
Extends
`KafkaRecordProcessor`
Logger
`private static final Logger LOG`: used for debug and warning logs related to batch processing.
Fields
| Field Name | Type | Description |
|---|---|---|
| `configuration` | `KafkaConfiguration` | The Kafka configuration object holding consumer and batching settings. |
| `processor` | `Processor` | The Camel processor to which the batch exchanges are sent for processing. |
| `commitManager` | `CommitManager` | Manages offset commits, both automatic and manual. |
| `timeoutWatch` | `StopWatch` | Tracks elapsed time used to trigger batch processing on timeout. |
| `intervalWatch` | `StopWatch` | Tracks elapsed time used to trigger batch processing on the configured interval. |
| `exchangeList` | `Queue<Exchange>` | Holds the individual exchanges created from Kafka consumer records; its capacity is capped at the configured max poll records. |
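Both watches only need "time elapsed since the last restart" semantics. The sketch below models that behavior with a hypothetical `ElapsedTimer` class rather than Camel's own `StopWatch`; the method names and the injected clock are assumptions made so the timing logic can be checked without sleeping.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the timeoutWatch/intervalWatch fields (not Camel's StopWatch):
// measures milliseconds elapsed since construction or the last restart().
final class ElapsedTimer {
    private final LongSupplier clockMillis; // injectable clock so tests are deterministic
    private long start;

    ElapsedTimer(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        this.start = clockMillis.getAsLong();
    }

    ElapsedTimer() {
        this(System::currentTimeMillis); // real wall-clock time
    }

    // Milliseconds elapsed since the last restart.
    long taken() {
        return clockMillis.getAsLong() - start;
    }

    // Moves the reference point to "now".
    void restart() {
        start = clockMillis.getAsLong();
    }

    // Returns the elapsed time, then restarts the timer.
    long takenAndRestart() {
        long elapsed = taken();
        restart();
        return elapsed;
    }

    public static void main(String[] args) {
        long[] now = {1_000L}; // fake clock, in milliseconds
        ElapsedTimer timeoutWatch = new ElapsedTimer(() -> now[0]);
        now[0] += 250;
        System.out.println(timeoutWatch.taken()); // prints 250
        timeoutWatch.restart();
        System.out.println(timeoutWatch.taken()); // prints 0
    }
}
```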
Inner Class: CommitSynchronization
Implements
`Synchronization`
Description
Used as a callback that is invoked on batch exchange completion or failure. On successful completion, it triggers the commit of the Kafka offsets managed by `commitManager`. On failure, it handles exceptions by delegating to the configured `ExceptionHandler`.
Constructor
CommitSynchronization(ExceptionHandler exceptionHandler, int size)
exceptionHandler: Handles exceptions during synchronization.
size: Number of exchanges in the batch (for logging purposes).
Methods
void onComplete(Exchange exchange)
Called when batch processing completes successfully. Commits the Kafka offsets through `commitManager`.
void onFailure(Exchange exchange)
Called when batch processing fails. Logs the failure and delegates the exception to the `ExceptionHandler`.
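The callback contract can be illustrated without Camel on the classpath. In this sketch, `CompletionCallback`, `OffsetCommitter`, `ErrorReporter` and `CommitOnCompletion` are hypothetical, simplified stand-ins for `Synchronization`, `CommitManager`, `ExceptionHandler` and `CommitSynchronization`; only the idea (commit on success, report and skip the commit on failure) is taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified collaborators; Camel's Synchronization, CommitManager
// and ExceptionHandler interfaces have different and richer signatures.
interface CompletionCallback {
    void onComplete();
    void onFailure(Throwable cause);
}

interface OffsetCommitter {
    void commit();
}

interface ErrorReporter {
    void handle(String message, Throwable cause);
}

// Sketch of the CommitSynchronization idea: commit offsets when the batch succeeds,
// report the error (and commit nothing) when it fails.
final class CommitOnCompletion implements CompletionCallback {
    private final OffsetCommitter committer;
    private final ErrorReporter errorReporter;
    private final int size; // batch size, used only in messages

    CommitOnCompletion(OffsetCommitter committer, ErrorReporter errorReporter, int size) {
        this.committer = committer;
        this.errorReporter = errorReporter;
        this.size = size;
    }

    @Override
    public void onComplete() {
        committer.commit(); // only a successful batch advances the committed offsets
    }

    @Override
    public void onFailure(Throwable cause) {
        errorReporter.handle("Batch of " + size + " exchanges failed", cause); // no commit here
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletionCallback sync = new CommitOnCompletion(
                () -> log.add("commit"),
                (message, cause) -> log.add(message),
                2);
        sync.onComplete();
        sync.onFailure(new IllegalStateException("boom"));
        System.out.println(log); // prints [commit, Batch of 2 exchanges failed]
    }
}
```

Keeping the batch size on the callback object mirrors the `size` constructor parameter above, which is only needed for diagnostics.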
Constructors
KafkaRecordBatchingProcessor
public KafkaRecordBatchingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager)
configuration: Kafka consumer and batching configuration.
processor: The Camel `Processor` that processes the batched exchanges.
commitManager: Manages Kafka offset commits.
Initializes the `exchangeList` queue with capacity equal to the max poll records allowed.
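Sizing the queue to max poll records means it can hold exactly one full batch. A minimal sketch of such a bounded buffer follows; the use of `ArrayBlockingQueue` and the class name `BoundedBatchBuffer` are assumptions for illustration, not a statement about the actual field initializer.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Sketch of a batch buffer sized to the configured max poll records.
// ArrayBlockingQueue is an illustrative choice of bounded Queue.
final class BoundedBatchBuffer<T> {
    private final Queue<T> items;
    private final int capacity;

    BoundedBatchBuffer(int maxPollRecords) {
        // ArrayBlockingQueue itself rejects a capacity below 1 with IllegalArgumentException.
        this.items = new ArrayBlockingQueue<>(maxPollRecords);
        this.capacity = maxPollRecords;
    }

    // Queue.offer returns false (instead of throwing, as add would) when the queue is full.
    boolean offer(T item) {
        return items.offer(item);
    }

    boolean isFull() {
        return items.size() == capacity;
    }

    int size() {
        return items.size();
    }

    void clear() {
        items.clear();
    }

    public static void main(String[] args) {
        BoundedBatchBuffer<String> buffer = new BoundedBatchBuffer<>(2);
        System.out.println(buffer.offer("r1")); // prints true
        System.out.println(buffer.offer("r2")); // prints true
        System.out.println(buffer.offer("r3")); // prints false: capacity reached
        System.out.println(buffer.isFull());    // prints true
    }
}
```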
Public Methods
Exchange toExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord)
Creates a Camel `Exchange` from a single Kafka `ConsumerRecord`.
Parameters:
camelKafkaConsumer: The Kafka consumer instance creating the exchange.
topicPartition: Kafka topic partition of the record.
consumerRecord: The Kafka record to convert.
Returns:
A new `Exchange` populated with the consumer record data and headers. If manual commit is enabled, a `KafkaManualCommit` header is added.
Usage Example:
```java
Exchange exchange = batchingProcessor.toExchange(kafkaConsumer, topicPartition, consumerRecord);
```
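The conversion performed by `toExchange` can be sketched with plain Java types. `PolledRecord`, `SimpleMessage`, `CommitToken` and `RecordConverter` are hypothetical, and the header names are placeholders rather than the keys Camel really sets; the point is only the shape: value to body, metadata to headers, and a commit token added only when manual commit is enabled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for a Kafka ConsumerRecord, a Camel Message and a manual-commit token.
record PolledRecord(String topic, int partition, long offset, Object key, Object value) { }

record CommitToken(String topic, int partition, long offset) { }

final class SimpleMessage {
    final Map<String, Object> headers = new HashMap<>();
    Object body;
}

// Sketch of the toExchange idea: record value -> body, metadata -> headers,
// plus a commit token header only when manual commit is enabled.
final class RecordConverter {
    // Illustrative header names, not the constants Camel actually uses.
    static final String TOPIC = "topic";
    static final String PARTITION = "partition";
    static final String OFFSET = "offset";
    static final String KEY = "key";
    static final String MANUAL_COMMIT = "manualCommit";

    private final boolean allowManualCommit;

    RecordConverter(boolean allowManualCommit) {
        this.allowManualCommit = allowManualCommit;
    }

    SimpleMessage toMessage(PolledRecord rec) {
        SimpleMessage message = new SimpleMessage();
        message.body = rec.value();
        message.headers.put(TOPIC, rec.topic());
        message.headers.put(PARTITION, rec.partition());
        message.headers.put(OFFSET, rec.offset());
        if (rec.key() != null) {
            message.headers.put(KEY, rec.key()); // keys are optional in Kafka
        }
        if (allowManualCommit) {
            message.headers.put(MANUAL_COMMIT,
                    new CommitToken(rec.topic(), rec.partition(), rec.offset()));
        }
        return message;
    }

    public static void main(String[] args) {
        SimpleMessage m = new RecordConverter(true)
                .toMessage(new PolledRecord("orders", 0, 42L, "k1", "payload"));
        System.out.println(m.body + " " + m.headers.get(OFFSET)); // prints payload 42
    }
}
```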
ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords)
Processes a batch of Kafka `ConsumerRecords`.
Parameters:
camelKafkaConsumer: The Kafka consumer instance.
consumerRecords: The batch of consumer records fetched from Kafka.
Returns:
A `ProcessingResult` indicating the processing status (always unprocessed in this batch context).
Behavior:
Aggregates individual records into exchanges and stores them in `exchangeList`.
Checks whether the batch timeout or interval has expired; if so, processes the current batch.
Processes the batch when the max poll records threshold is reached.
Resets timers appropriately after batch processing.
Usage Example:
```java
ProcessingResult result = batchingProcessor.processExchange(kafkaConsumer, consumerRecords);
```
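The size-triggered part of `processExchange` reduces to a buffer-and-flush loop. This sketch leaves out the timeout/interval check and uses a hypothetical `SizeTriggeredBatcher` whose dispatcher stands in for batch processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the size-triggered path in processExchange: buffer each polled item and
// pass a full batch to a hypothetical dispatcher as soon as it reaches maxPollRecords.
final class SizeTriggeredBatcher<T> {
    private final int maxPollRecords;
    private final Consumer<List<T>> dispatcher;
    private final List<T> buffer = new ArrayList<>();

    SizeTriggeredBatcher(int maxPollRecords, Consumer<List<T>> dispatcher) {
        this.maxPollRecords = maxPollRecords;
        this.dispatcher = dispatcher;
    }

    // Called once per poll with the records that poll returned.
    void onPoll(List<T> polled) {
        for (T item : polled) {
            buffer.add(item);
            if (buffer.size() == maxPollRecords) {
                dispatcher.accept(new ArrayList<>(buffer)); // hand off a copy
                buffer.clear();
            }
        }
        // A remainder stays buffered until a later poll fills it or a timer expires.
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        SizeTriggeredBatcher<Integer> batcher =
                new SizeTriggeredBatcher<>(3, batch -> System.out.println("batch " + batch));
        batcher.onPoll(List.of(1, 2, 3, 4, 5)); // prints batch [1, 2, 3]
        batcher.onPoll(List.of(6, 7));          // prints batch [4, 5, 6]
        System.out.println(batcher.pending());  // prints 1
    }
}
```

Note that a full batch can be flushed in the middle of one poll's records, so a single poll may contribute to two batches.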
Private Methods
boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords)
Determines if the current batch should be processed due to timeout or batching interval expiration.
Parameters:
consumerRecords: The newly polled consumer records.
Returns:
`true` if a timeout or interval has expired and the batch must be processed, otherwise `false`.
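The expiry decision can be written as a pure function of the buffer size, the new poll, and two elapsed times. The parameter names, the optional interval modelled as a nullable `Long`, and the rule that the timeout only applies when a poll returns no records are assumptions for this sketch.

```java
// Sketch of the expiry decision as a pure function. All thresholds and the
// "timeout only on an empty poll" rule are illustrative assumptions.
final class ExpiryCheck {
    private ExpiryCheck() { }

    static boolean hasExpired(int bufferedCount,
                              int newlyPolledCount,
                              long msSinceBatchStarted,
                              long pollTimeoutMs,
                              long msSinceLastBatch,
                              Long batchingIntervalMs) {
        if (bufferedCount == 0) {
            return false; // nothing buffered, nothing to flush
        }
        // Timeout: the poll came back empty and the partial batch has waited long enough.
        boolean timeout = newlyPolledCount == 0 && msSinceBatchStarted >= pollTimeoutMs;
        // Interval: optional (null = disabled); fires once enough time has passed since the last batch.
        boolean interval = batchingIntervalMs != null && msSinceLastBatch >= batchingIntervalMs;
        return timeout || interval;
    }

    public static void main(String[] args) {
        System.out.println(hasExpired(5, 0, 6_000, 5_000, 6_000, null)); // prints true
    }
}
```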
void processBatch(KafkaConsumer camelKafkaConsumer)
Processes the currently accumulated batch of exchanges.
Creates a new Camel exchange containing a list of all individual exchanges as the message body.
Depending on configuration, invokes either manual or auto commit processing flows.
Releases the exchange after processing.
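The control flow of `processBatch` (wrap, choose a commit path, always release) is sketched below with hypothetical `BatchEnvelope` and `BatchRunner` types; the `released` flag stands in for releasing the exchange back to the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the batch exchange: its body is the list of child items.
final class BatchEnvelope<T> {
    final List<T> body;
    boolean released;

    BatchEnvelope(List<T> children) {
        this.body = new ArrayList<>(children);
    }
}

// Sketch of processBatch: build one batch from the buffered children, pick the
// manual or auto commit path, and release the batch on every path.
final class BatchRunner<T> {
    private final boolean allowManualCommit;
    private final Consumer<BatchEnvelope<T>> autoCommitPath;
    private final Consumer<BatchEnvelope<T>> manualCommitPath;

    BatchRunner(boolean allowManualCommit,
                Consumer<BatchEnvelope<T>> autoCommitPath,
                Consumer<BatchEnvelope<T>> manualCommitPath) {
        this.allowManualCommit = allowManualCommit;
        this.autoCommitPath = autoCommitPath;
        this.manualCommitPath = manualCommitPath;
    }

    BatchEnvelope<T> processBatch(List<T> buffered) {
        BatchEnvelope<T> batch = new BatchEnvelope<>(buffered);
        try {
            if (allowManualCommit) {
                manualCommitPath.accept(batch);
            } else {
                autoCommitPath.accept(batch);
            }
        } finally {
            batch.released = true; // released even if a path throws
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchRunner<String> runner = new BatchRunner<>(false,
                b -> System.out.println("auto path, " + b.body.size() + " children"),
                b -> System.out.println("manual path"));
        BatchEnvelope<String> batch = runner.processBatch(List.of("a", "b"));
        System.out.println(batch.released); // prints true
    }
}
```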
void autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange, int size)
Handles processing and committing offsets automatically after batch processing.
Adds a `CommitSynchronization` callback to the exchange's lifecycle.
Processes the exchange using the configured processor.
Handles exceptions if processing fails.
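The auto-commit flow follows a register-then-notify pattern: a callback is attached before processing and fired once the outcome is known. `UnitOfWork`, `Callback` and `BatchProcessor` below are hypothetical simplifications of Camel's exchange lifecycle, shown only to make the ordering concrete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a completion callback and the batch processor.
interface Callback {
    void onComplete();
    void onFailure(Exception cause);
}

interface BatchProcessor {
    void process(List<String> batch) throws Exception;
}

// Sketch of the auto-commit flow: register callbacks, run the processor,
// then notify each callback exactly once according to the outcome.
final class UnitOfWork {
    private final List<Callback> callbacks = new ArrayList<>();

    void addCallback(Callback callback) {
        callbacks.add(callback);
    }

    void run(BatchProcessor processor, List<String> batch) {
        Exception failure = null;
        try {
            processor.process(batch);
        } catch (Exception e) {
            failure = e; // captured rather than rethrown, so callbacks still fire
        }
        for (Callback cb : callbacks) {
            if (failure == null) {
                cb.onComplete();
            } else {
                cb.onFailure(failure);
            }
        }
    }

    public static void main(String[] args) {
        UnitOfWork unit = new UnitOfWork();
        unit.addCallback(new Callback() {
            @Override public void onComplete() { System.out.println("commit offsets"); }
            @Override public void onFailure(Exception cause) { System.out.println("skip commit: " + cause.getMessage()); }
        });
        unit.run(batch -> { throw new Exception("downstream error"); }, List.of("r1", "r2"));
        // prints: skip commit: downstream error
    }
}
```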
void manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange)
Handles processing when manual commit mode is enabled.
Processes the batch exchange using the configured processor.
Handles exceptions but leaves commit responsibility to the integration.
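With manual commit, the route decides when to commit. One possible route-side policy, sketched with hypothetical `ManualCommit`, `Child` and `ManualCommitRoute` types, is to process every child first and commit only if the whole batch succeeded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical manual-commit token: each one stands for a single record's position.
interface ManualCommit {
    void commit();
}

// A child item in the batch body: payload plus its commit token.
record Child(String payload, ManualCommit token) { }

// Sketch of a route-side policy under manual commit.
final class ManualCommitRoute {
    static void handleBatch(List<Child> batch, List<String> sink) {
        // Process every child first; a failure here aborts before any commit.
        for (Child child : batch) {
            if (child.payload().isEmpty()) {
                throw new IllegalArgumentException("empty payload"); // simulated processing error
            }
            sink.add(child.payload().toUpperCase());
        }
        // Commit each child's token only after the whole batch succeeded. Because each token
        // here covers one record, committing only the last one could miss other partitions.
        for (Child child : batch) {
            child.token().commit();
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        List<Child> batch = List.of(
                new Child("a", () -> System.out.println("commit a")),
                new Child("b", () -> System.out.println("commit b")));
        handleBatch(batch, out);
        System.out.println(out); // prints [A, B]
    }
}
```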
void processException(Exchange exchange, ExceptionHandler exceptionHandler)
Handles exceptions raised during exchange processing by delegating to the configured exception handler.
Important Implementation Details
Batching Strategy: Records are accumulated into `exchangeList` until either the configured max poll size is reached or a timeout/interval triggers batch processing.
Timers: Two `StopWatch` timers track elapsed time since the last batch processing for timeout and interval triggering.
Manual vs Auto Commit: Both commit strategies are supported:
Auto commit triggers commits automatically after successful processing using `CommitSynchronization`.
Manual commit passes a `KafkaManualCommit` token header to allow integration-level commit control.
Exchange Aggregation: The batch exchange contains a list of individual exchanges as its message body, enabling downstream processors to handle multiple records at once.
Error Handling: Uses Camel's `ExceptionHandler` to log and manage exceptions during batch processing.
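From a downstream processor's point of view, the batch body is simply a list to iterate. The hypothetical `ChildMessage` record below stands in for a child exchange's message, and summing numeric bodies is just an example workload.

```java
import java.util.List;

// Hypothetical stand-in for one child exchange's message: just its body.
record ChildMessage(Object body) { }

// Sketch of a downstream consumer of a batch body (a list of child messages).
final class BatchSummer {
    private BatchSummer() { }

    static long totalOfNumericBodies(List<ChildMessage> batchBody) {
        long total = 0;
        for (ChildMessage child : batchBody) {
            if (child.body() instanceof Number n) { // skip non-numeric or null bodies
                total += n.longValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<ChildMessage> batch = List.of(
                new ChildMessage(3), new ChildMessage("skip"), new ChildMessage(4L));
        System.out.println(totalOfNumericBodies(batch)); // prints 7
    }
}
```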
Interaction with Other Components
KafkaConsumer: The Camel Kafka consumer (not the Kafka client class of the same name); creates exchanges and releases them after processing.
Processor: The business logic or route processor that handles the batch of records.
CommitManager: Handles offset commits, either manual or auto, ensuring offsets are correctly committed after processing.
KafkaConfiguration: Provides batch size, timeout, interval, and commit-related configuration.
Exchange & Message: Core Camel abstractions used to carry Kafka records and metadata through the routing engine.
ExceptionHandler: Manages errors occurring during processing or commit operations.
Usage Scenario
The Kafka consumer polls records from Kafka.
`processExchange` is called with the polled records.
Records are converted to exchanges and added to the batch queue.
Once the batch size or timeout/interval conditions are met, `processBatch` is triggered.
The batch exchange containing multiple records is processed downstream.
Offsets are committed according to the manual or auto commit configuration.
The cycle repeats with newly polled records.
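The cycle above can be simulated end to end for a single partition. In this hypothetical `PollLoopSimulation`, an empty poll stands in for an expired timeout and every dispatched batch is assumed to succeed and be auto-committed; committing "last offset + 1" follows Kafka's convention that the committed offset is the next one to be consumed.

```java
import java.util.ArrayList;
import java.util.List;

// End-to-end sketch for one partition: poll, buffer, flush on size or on an
// empty poll (standing in for the timeout), then auto-commit the batch.
final class PollLoopSimulation {
    final List<List<Long>> dispatched = new ArrayList<>();
    long committed = 0; // next offset to consume for the partition
    private final List<Long> buffer = new ArrayList<>();
    private final int maxPollRecords;

    PollLoopSimulation(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    void onPoll(List<Long> offsets) {
        if (offsets.isEmpty() && !buffer.isEmpty()) {
            flush(); // empty poll: treat the partial batch as expired
            return;
        }
        for (long offset : offsets) {
            buffer.add(offset);
            if (buffer.size() == maxPollRecords) {
                flush(); // size trigger
            }
        }
    }

    private void flush() {
        List<Long> batch = new ArrayList<>(buffer);
        buffer.clear();
        dispatched.add(batch);                       // "process downstream" (assumed to succeed)
        committed = batch.get(batch.size() - 1) + 1; // commit the next offset to read
    }

    public static void main(String[] args) {
        PollLoopSimulation sim = new PollLoopSimulation(3);
        sim.onPoll(List.of(0L, 1L, 2L, 3L)); // size trigger dispatches [0, 1, 2]
        sim.onPoll(List.of());               // empty poll dispatches [3]
        System.out.println(sim.dispatched + " committed=" + sim.committed); // prints [[0, 1, 2], [3]] committed=4
    }
}
```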
Visual Diagram: Class Structure
```mermaid
classDiagram
class KafkaRecordBatchingProcessor {
- KafkaConfiguration configuration
- Processor processor
- CommitManager commitManager
- StopWatch timeoutWatch
- StopWatch intervalWatch
- Queue~Exchange~ exchangeList
+ KafkaRecordBatchingProcessor(KafkaConfiguration, Processor, CommitManager)
+ Exchange toExchange(KafkaConsumer, TopicPartition, ConsumerRecord)
+ ProcessingResult processExchange(KafkaConsumer, ConsumerRecords)
- boolean hasExpiredRecords(ConsumerRecords)
- void processBatch(KafkaConsumer)
- void autoCommitResultProcessing(KafkaConsumer, Exchange, int)
- void manualCommitResultProcessing(KafkaConsumer, Exchange)
- void processException(Exchange, ExceptionHandler)
}
class CommitSynchronization {
- ExceptionHandler exceptionHandler
- int size
+ CommitSynchronization(ExceptionHandler, int)
+ void onComplete(Exchange)
+ void onFailure(Exchange)
}
KafkaRecordBatchingProcessor "1" *-- "1" CommitSynchronization : uses >
```
Summary
`KafkaRecordBatchingProcessor` is a specialized processor class designed to batch Kafka consumer records within Apache Camel. It improves efficiency by aggregating records and managing commits based on batch completion or time-based triggers. The class integrates tightly with Camel’s exchange model and Kafka commit management, supporting both automatic and manual offset commits when processing Kafka messages in batch mode.
This batching processor is used by Camel Kafka consumers that require high throughput and controlled commit semantics within the Apache Camel Kafka component.