KafkaRecordStreamingProcessor.java


Overview

`KafkaRecordStreamingProcessor` is a final class within the Apache Camel Kafka component that specializes in processing Kafka consumer records in a streaming context. It extends the abstract functionality of `KafkaRecordProcessor` to handle each Kafka `ConsumerRecord` by converting it into a Camel `Exchange`, applying user-defined processing logic, and managing offset commits in accordance with configured commit strategies.

This processor supports fine-grained control over Kafka message consumption, including manual offset commits, error handling policies, and message header propagation, making it suitable for real-time streaming applications where precise offset management and error resilience are critical.


Class: KafkaRecordStreamingProcessor

Purpose

Package

`org.apache.camel.component.kafka.consumer.support.streaming`

Inheritance


Fields

Field

Type

Description

LOG

`Logger`

SLF4J Logger for logging processing events and exceptions.

`autoCommitEnabled`

`boolean`

Flag indicating if Kafka consumer auto-commit is enabled (`true`) or manual commit is used (`false`).

`configuration`

`KafkaConfiguration`

Holds Kafka consumer configuration settings.

`processor`

`Processor`

The Camel Processor that defines how to process the exchange created from a Kafka record.

`commitManager`

`CommitManager`

Manages offset commits, supporting both manual and automatic commit modes.


Constructor

KafkaRecordStreamingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager)

Initializes the streaming processor with required components.


Methods

ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> consumerRecord)

Processes a single Kafka consumer record within a streaming workflow.


private boolean processException(Exchange exchange, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler)

Handles exceptions thrown during exchange processing based on configuration.


Important Implementation Details


Interaction with Other Components


Visual Diagram

classDiagram
    class KafkaRecordStreamingProcessor {
        -boolean autoCommitEnabled
        -KafkaConfiguration configuration
        -Processor processor
        -CommitManager commitManager
        +KafkaRecordStreamingProcessor(configuration, processor, commitManager)
        +ProcessingResult processExchange(camelKafkaConsumer, topicPartition, partitionHasNext, recordHasNext, consumerRecord)
        -boolean processException(exchange, topicPartition, consumerRecord, exceptionHandler)
    }

    KafkaRecordStreamingProcessor --|> KafkaRecordProcessor

    KafkaRecordStreamingProcessor ..> KafkaConfiguration : uses
    KafkaRecordStreamingProcessor ..> Processor : uses
    KafkaRecordStreamingProcessor ..> CommitManager : uses
    KafkaRecordStreamingProcessor ..> KafkaConsumer : interacts with
    KafkaRecordStreamingProcessor ..> ProcessingResult : returns
    KafkaRecordStreamingProcessor ..> ExceptionHandler : interacts with

Summary

`KafkaRecordStreamingProcessor` is a specialized Kafka consumer record processor designed for streaming contexts in Apache Camel. It handles creating Camel exchanges from Kafka records, propagates headers, manages offset commits (both auto and manual), and implements flexible error handling policies. This class provides a critical bridge between Kafka's low-level consumer API and Camel's high-level routing and processing framework, enabling robust and configurable Kafka stream processing within Camel routes.