KafkaRecordBatchingProcessor.java

Overview

`KafkaRecordBatchingProcessor` is a final class in the Apache Camel Kafka component that extends `KafkaRecordProcessor`. Its primary responsibility is to collect multiple Kafka consumer records into a single Camel `Exchange` and process them together. Batching improves throughput and resource utilization because the route is invoked, and offsets are committed, once per batch rather than once per record.

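Key features include collecting multiple records into one exchange, processing a batch when it reaches the max poll records limit or when a timeout or batching interval expires, committing offsets in either auto or manual commit mode, and delegating processing failures to the configured `ExceptionHandler`.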

This class is designed to work closely with the `KafkaConsumer`, `CommitManager`, and Camel's `Processor` interfaces to facilitate Kafka message consumption in a batch-oriented manner within the Camel routing framework.


Class: KafkaRecordBatchingProcessor

Package

`org.apache.camel.component.kafka.consumer.support.batching`

Extends

`KafkaRecordProcessor`

Logger


Fields

| Field Name | Type | Description |
| --- | --- | --- |
| `configuration` | `KafkaConfiguration` | The Kafka configuration object holding consumer and batching settings. |
| `processor` | `Processor` | The Camel processor to which batch exchanges are sent for processing. |
| `commitManager` | `CommitManager` | Manages offset commits in both auto and manual commit modes. |
| `timeoutWatch` | `StopWatch` | Tracks elapsed time used to trigger batch processing on timeout. |
| `intervalWatch` | `StopWatch` | Tracks elapsed time used to trigger batch processing at the configured batching interval. |
| `exchangeList` | `Queue<Exchange>` | Holds the exchanges created from individual Kafka consumer records; its capacity equals the configured max poll records. |


Inner Class: CommitSynchronization

Implements

`Synchronization`

Description

Registered as a completion callback on the batch exchange. When the exchange completes successfully, it commits the Kafka offsets through `commitManager`. When the exchange fails, it passes the exception to the configured `ExceptionHandler` instead of committing.
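The commit-on-success / delegate-on-failure pattern can be sketched without Camel on the classpath. Every type below (`BatchResult`, `CompletionCallback`, `OffsetCommitter`, `FailureHandler`, `CommitCallback`) is a hypothetical stand-in: Camel's real `Synchronization` also declares `onComplete(Exchange)` and `onFailure(Exchange)`, but operates on a Camel `Exchange`. This is an illustration of the control flow only, not the component's source.

```java
// Hypothetical stand-in for a finished batch exchange; exception is null on success.
final class BatchResult {
    final Exception exception;

    BatchResult(Exception exception) {
        this.exception = exception;
    }
}

// Hypothetical callback contract, modelled on the two methods of Camel's Synchronization.
interface CompletionCallback {
    void onComplete(BatchResult result);

    void onFailure(BatchResult result);
}

// Hypothetical stand-in for CommitManager.
interface OffsetCommitter {
    void commit();
}

// Hypothetical stand-in for Camel's ExceptionHandler.
interface FailureHandler {
    void handle(String message, Exception cause);
}

final class CommitCallback implements CompletionCallback {
    private final FailureHandler failureHandler;
    private final OffsetCommitter committer;
    private final int size; // number of records in the batch, used in the error message

    CommitCallback(FailureHandler failureHandler, OffsetCommitter committer, int size) {
        this.failureHandler = failureHandler;
        this.committer = committer;
        this.size = size;
    }

    @Override
    public void onComplete(BatchResult result) {
        // The whole batch was processed successfully, so its offsets can be committed.
        committer.commit();
    }

    @Override
    public void onFailure(BatchResult result) {
        // Leave the offsets uncommitted and report the error to the handler.
        failureHandler.handle("Error processing a batch of " + size + " records", result.exception);
    }
}
```

Keeping the commit inside a completion callback means the commit happens only after the downstream route has finished with the whole batch.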

Constructor

CommitSynchronization(ExceptionHandler exceptionHandler, int size)

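Methods

void onComplete(Exchange exchange)

Invoked when the batch exchange completes successfully; triggers the offset commit through `commitManager`.

void onFailure(Exchange exchange)

Invoked when the batch exchange fails; delegates the exception to the configured `ExceptionHandler`.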


Constructors

KafkaRecordBatchingProcessor

public KafkaRecordBatchingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager)

Stores the configuration, processor, and commit manager, and initializes the `exchangeList` queue with a capacity equal to the configured maximum number of records per poll.
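A queue whose capacity equals the max poll records limit can be modelled with `java.util.concurrent.ArrayBlockingQueue`. The class `BoundedBatch` and its methods are hypothetical, and this document does not state which queue implementation the component actually uses. The sketch relies on `offer` returning `false` once the queue is full, which gives a natural "batch is complete" signal without blocking the poll thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical holder for the per-batch queue; capacity plays the role of max poll records.
final class BoundedBatch<T> {
    private final BlockingQueue<T> items;

    BoundedBatch(int maxPollRecords) {
        // ArrayBlockingQueue rejects a capacity below 1 with IllegalArgumentException.
        this.items = new ArrayBlockingQueue<>(maxPollRecords);
    }

    // Non-blocking: returns false instead of waiting when the batch is already full.
    boolean add(T item) {
        return items.offer(item);
    }

    boolean isFull() {
        return items.remainingCapacity() == 0;
    }

    int size() {
        return items.size();
    }

    // Moves every queued item into a list, leaving the queue empty for the next batch.
    List<T> drain() {
        List<T> out = new ArrayList<>(items.size());
        items.drainTo(out);
        return out;
    }
}
```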


Public Methods

Exchange toExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord)

Creates a Camel `Exchange` from a single Kafka `ConsumerRecord`.

```java
Exchange exchange = batchingProcessor.toExchange(kafkaConsumer, topicPartition, consumerRecord);
```
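Conceptually, converting a record means putting the record value in the message body and recording where the record came from in headers. The sketch below uses hypothetical `SimpleRecord`, `SimpleExchange`, and `RecordMapper` types and plain header names (`topic`, `partition`, `offset`, `key`); the real component works with Camel's `Exchange` and its own header constants, which are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified view of a Kafka consumer record.
record SimpleRecord(String topic, int partition, long offset, Object key, Object value) {}

// Hypothetical, simplified stand-in for a Camel exchange and its message.
final class SimpleExchange {
    private final Map<String, Object> headers = new HashMap<>();
    private Object body;

    Map<String, Object> headers() {
        return headers;
    }

    Object body() {
        return body;
    }

    void setBody(Object body) {
        this.body = body;
    }
}

final class RecordMapper {
    private RecordMapper() {}

    // Copies the record value into the body and the record's coordinates into headers.
    static SimpleExchange toExchange(SimpleRecord record) {
        SimpleExchange exchange = new SimpleExchange();
        exchange.setBody(record.value());
        exchange.headers().put("topic", record.topic());
        exchange.headers().put("partition", record.partition());
        exchange.headers().put("offset", record.offset());
        if (record.key() != null) {
            exchange.headers().put("key", record.key()); // keys are optional in Kafka
        }
        return exchange;
    }
}
```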

ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords)

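Processes the records returned by one poll: each record is converted to an exchange and added to `exchangeList`, and the accumulated batch is processed once the queue is full or the timeout or batching interval has expired.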

```java
ProcessingResult result = batchingProcessor.processExchange(kafkaConsumer, consumerRecords);
```
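One plausible shape for `processExchange`, following the usage scenario in this document (convert each record, enqueue it, flush when the batch is full or expired), is sketched below. `BatchAccumulator`, its `Consumer<List<T>>` flush callback, and the boolean `expired` argument are hypothetical simplifications standing in for `processBatch` and `hasExpiredRecords`; the exact order of checks in the real method may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical accumulator illustrating size- and expiry-based flushing of a batch.
final class BatchAccumulator<T> {
    private final int maxBatchSize;        // plays the role of max poll records
    private final Consumer<List<T>> flush; // plays the role of processBatch
    private final List<T> pending = new ArrayList<>();

    BatchAccumulator(int maxBatchSize, Consumer<List<T>> flush) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.flush = flush;
    }

    // Handles one poll result; 'expired' stands in for the timeout/interval check.
    void onPoll(List<T> polled, boolean expired) {
        if (expired && !pending.isEmpty()) {
            flushPending(); // hand over records that have been waiting too long
        }
        for (T item : polled) {
            pending.add(item);
            if (pending.size() >= maxBatchSize) {
                flushPending(); // batch is full: process it immediately
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }

    private void flushPending() {
        flush.accept(new ArrayList<>(pending)); // copy so the callback owns its list
        pending.clear();
    }
}
```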

Private Methods

boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords)

Determines if the current batch should be processed due to timeout or batching interval expiration.
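A time-based check of this kind is easiest to reason about against an injectable clock. `ExpiryCheck`, `timeoutMs`, `intervalMs`, and the `LongSupplier` clock are hypothetical; this document does not specify which configuration options drive the real timeout and interval, nor the exact condition used. In this sketch a non-empty batch expires when it has been open for at least `timeoutMs`, or when at least `intervalMs` has passed since the last flush.

```java
import java.util.function.LongSupplier;

// Hypothetical expiry check combining a per-batch timeout and a fixed flush interval.
final class ExpiryCheck {
    private final long timeoutMs;       // maximum age of an open batch
    private final long intervalMs;      // maximum time between flushes; <= 0 disables it
    private final LongSupplier clockMs; // injectable clock, e.g. System::currentTimeMillis
    private long batchStartedAt;
    private long lastFlushAt;

    ExpiryCheck(long timeoutMs, long intervalMs, LongSupplier clockMs) {
        this.timeoutMs = timeoutMs;
        this.intervalMs = intervalMs;
        this.clockMs = clockMs;
        long now = clockMs.getAsLong();
        this.batchStartedAt = now;
        this.lastFlushAt = now;
    }

    // Call when the first record of a new batch is queued.
    void batchStarted() {
        batchStartedAt = clockMs.getAsLong();
    }

    // Call after a batch has been handed downstream.
    void flushed() {
        lastFlushAt = clockMs.getAsLong();
    }

    boolean isExpired(int pendingRecords) {
        if (pendingRecords == 0) {
            return false; // an empty batch never needs flushing
        }
        long now = clockMs.getAsLong();
        boolean timedOut = now - batchStartedAt >= timeoutMs;
        boolean intervalElapsed = intervalMs > 0 && now - lastFlushAt >= intervalMs;
        return timedOut || intervalElapsed;
    }
}
```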


void processBatch(KafkaConsumer camelKafkaConsumer)

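Wraps the currently accumulated exchanges in a single batch exchange and sends it to `processor`, using auto or manual commit handling depending on the configuration.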
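The batching step can be pictured as draining the queue into one envelope and sending that envelope downstream once. `BatchDispatcher`, `Envelope`, and the `downstream` consumer are hypothetical stand-ins for the batch exchange and the route's `Processor`; skipping empty batches is a choice made in this sketch, not a documented behaviour of the component.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical batch envelope: one object carrying every record of the batch.
record Envelope<T>(List<T> items) {}

final class BatchDispatcher<T> {
    private final Queue<T> queued = new ArrayDeque<>();
    private final Consumer<Envelope<T>> downstream; // stands in for the route's Processor

    BatchDispatcher(Consumer<Envelope<T>> downstream) {
        this.downstream = downstream;
    }

    void enqueue(T item) {
        queued.add(item);
    }

    // Wraps everything queued so far into a single envelope, empties the queue, then sends it.
    void processBatch() {
        if (queued.isEmpty()) {
            return; // in this sketch, an empty batch is not dispatched
        }
        Envelope<T> batch = new Envelope<>(List.copyOf(queued)); // immutable snapshot
        queued.clear();
        downstream.accept(batch);
    }

    int queuedCount() {
        return queued.size();
    }
}
```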


void autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange, int size)

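Sends the batch exchange to the processor in auto commit mode, registering a `CommitSynchronization` so that offsets are committed when the exchange completes; `size` is the number of records in the batch.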
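The auto-commit decision reduces to: if the batch was processed without an exception, commit once for the whole batch; otherwise report the error and leave the offsets uncommitted. `AutoCommitPolicy`, `Committer`, and `ErrorSink` are illustrative names, not Camel API, and the sketch inlines the commit instead of using a completion callback.

```java
import java.util.function.Consumer;

// Hypothetical sketch of auto-commit handling for one batch.
final class AutoCommitPolicy<B> {
    interface Committer {
        void commit(int recordCount);
    }

    interface ErrorSink {
        void report(Exception e);
    }

    private final Consumer<B> processor; // downstream processing of the batch
    private final Committer committer;
    private final ErrorSink errors;

    AutoCommitPolicy(Consumer<B> processor, Committer committer, ErrorSink errors) {
        this.processor = processor;
        this.committer = committer;
        this.errors = errors;
    }

    // Returns true if the batch was processed and its offsets committed.
    boolean process(B batch, int recordCount) {
        try {
            processor.accept(batch);
        } catch (RuntimeException e) {
            errors.report(e); // failure: report it and leave the offsets uncommitted
            return false;
        }
        committer.commit(recordCount); // success: one commit covers the whole batch
        return true;
    }
}
```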


void manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange)

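Sends the batch exchange to the processor when manual commit mode is enabled. The processor does not commit offsets itself; committing is left to the route.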
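In manual commit mode the batching code runs the route but never commits on its own. The sketch models this with a hypothetical `ManualCommitHandle` passed to the route alongside the batch; how the real component exposes commit control to a route is not described in this document, so `ManualCommitHandle` and `ManualCommitProcessing` are illustrative only.

```java
import java.util.function.BiConsumer;

// Hypothetical handle the route can use to commit a batch explicitly.
final class ManualCommitHandle {
    private boolean committed;

    void commit() {
        committed = true; // a real handle would commit the batch's offsets to Kafka here
    }

    boolean isCommitted() {
        return committed;
    }
}

final class ManualCommitProcessing<B> {
    private final BiConsumer<B, ManualCommitHandle> route; // user logic decides when to commit
    private Exception lastError;

    ManualCommitProcessing(BiConsumer<B, ManualCommitHandle> route) {
        this.route = route;
    }

    // Runs the route; this method itself never commits.
    ManualCommitHandle process(B batch) {
        ManualCommitHandle handle = new ManualCommitHandle();
        try {
            route.accept(batch, handle);
        } catch (RuntimeException e) {
            lastError = e; // would be passed to the exception handler in the real component
        }
        return handle;
    }

    Exception lastError() {
        return lastError;
    }
}
```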


void processException(Exchange exchange, ExceptionHandler exceptionHandler)

Handles exceptions raised during exchange processing by delegating to the configured exception handler.
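Delegating to a handler keeps the error policy (log, retry, stop) out of the batching code. The sketch uses a hypothetical `ErrorHandler` interface, loosely modelled on Camel's `ExceptionHandler` (whose `handleException` overloads accept a message, an optional exchange, and the cause), and a hypothetical `BatchOutcome` record. Only outcomes that actually carry an exception are forwarded.

```java
// Hypothetical result of processing a batch: an identifier plus an optional failure.
record BatchOutcome(String batchId, Throwable failure) {}

// Hypothetical handler contract, loosely modelled on Camel's ExceptionHandler.
interface ErrorHandler {
    void handleException(String message, BatchOutcome outcome, Throwable cause);
}

final class ExceptionDelegation {
    private ExceptionDelegation() {}

    // Forwards a failed outcome to the handler; returns true if something was handled.
    static boolean processException(BatchOutcome outcome, ErrorHandler handler) {
        Throwable cause = outcome.failure();
        if (cause == null) {
            return false; // nothing to report
        }
        handler.handleException("Error processing batch " + outcome.batchId(), outcome, cause);
        return true;
    }
}
```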


Important Implementation Details


Interaction with Other Components


Usage Scenario

  1. The Kafka consumer polls records from Kafka.

  2. `processExchange` is called with the polled records.

  3. Each record is converted to an exchange and added to the batch queue.

  4. Once the batch is full or the timeout or interval condition is met, `processBatch` is triggered.

  5. The batch exchange, which contains multiple records, is processed downstream.

  6. Offsets are committed according to the auto or manual commit configuration.

  7. The cycle repeats with newly polled records.
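The seven steps above can be simulated end to end with plain Java collections. Everything here (`BatchingLoop`, the list of offsets standing in for polled records, the `maxBatch` limit) is hypothetical. The committed offset follows Kafka's convention of pointing at the next record to consume, and the sketch illustrates that the commit advances once per processed batch rather than once per record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical end-to-end simulation of poll -> batch -> process -> commit.
final class BatchingLoop {
    private final int maxBatch;
    private final List<Integer> pending = new ArrayList<>();
    private final List<List<Integer>> processedBatches = new ArrayList<>();
    private long committedOffset = 0; // next offset to consume, as in Kafka commits

    BatchingLoop(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // Steps 2-6: queue each polled offset; process and commit whenever the batch is full.
    void onPoll(List<Integer> polledOffsets) {
        for (Integer offset : polledOffsets) {
            pending.add(offset);
            if (pending.size() >= maxBatch) {
                flush();
            }
        }
    }

    // Step 4 alternative: a timeout or interval forces out a partial batch.
    void onTimeout() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        List<Integer> batch = List.copyOf(pending);
        pending.clear();
        processedBatches.add(batch);                       // step 5: process downstream
        committedOffset = batch.get(batch.size() - 1) + 1; // step 6: one commit per batch
    }

    List<List<Integer>> processedBatches() {
        return processedBatches;
    }

    long committedOffset() {
        return committedOffset;
    }

    int pendingCount() {
        return pending.size();
    }
}
```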


Visual Diagram: Class Structure

```mermaid
classDiagram
    class KafkaRecordBatchingProcessor {
        - KafkaConfiguration configuration
        - Processor processor
        - CommitManager commitManager
        - StopWatch timeoutWatch
        - StopWatch intervalWatch
        - Queue~Exchange~ exchangeList
        + KafkaRecordBatchingProcessor(KafkaConfiguration, Processor, CommitManager)
        + Exchange toExchange(KafkaConsumer, TopicPartition, ConsumerRecord)
        + ProcessingResult processExchange(KafkaConsumer, ConsumerRecords)
        - boolean hasExpiredRecords(ConsumerRecords)
        - void processBatch(KafkaConsumer)
        - void autoCommitResultProcessing(KafkaConsumer, Exchange, int)
        - void manualCommitResultProcessing(KafkaConsumer, Exchange)
        - void processException(Exchange, ExceptionHandler)
    }

    class CommitSynchronization {
        - ExceptionHandler exceptionHandler
        - int size
        + CommitSynchronization(ExceptionHandler, int)
        + void onComplete(Exchange)
        + void onFailure(Exchange)
    }

    KafkaRecordBatchingProcessor ..> CommitSynchronization : uses
```

Summary

`KafkaRecordBatchingProcessor` batches Kafka consumer records within Apache Camel. It aggregates records into a single exchange, processes that exchange when the batch is full or a timeout or batching interval expires, and commits offsets per batch, either automatically or, in manual commit mode, under the route's control. The class integrates closely with Camel's exchange model and with Kafka offset management through `CommitManager`.

This batching processor is a core part of Kafka consumers in the Camel Kafka component that need high throughput and coarser-grained, per-batch commit semantics.