KafkaRecordStreamingProcessor.java
Overview
`KafkaRecordStreamingProcessor` is a final class within the Apache Camel Kafka component that specializes in processing Kafka consumer records in a streaming context. It extends the abstract functionality of `KafkaRecordProcessor` to handle each Kafka `ConsumerRecord` by converting it into a Camel `Exchange`, applying user-defined processing logic, and managing offset commits in accordance with configured commit strategies.
This processor supports fine-grained control over Kafka message consumption, including manual offset commits, error handling policies, and message header propagation, making it suitable for real-time streaming applications where precise offset management and error resilience are critical.
Class: KafkaRecordStreamingProcessor
Purpose
Processes individual Kafka consumer records into Camel exchanges.
Manages message headers and commit semantics based on configuration.
Handles exceptions during processing with configurable policies.
Coordinates with a
CommitManagerto commit offsets either automatically or manually.
Package
`org.apache.camel.component.kafka.consumer.support.streaming`
Inheritance
Extends
KafkaRecordProcessor
Fields
Field | Type | Description |
|---|---|---|
`Logger` | SLF4J Logger for logging processing events and exceptions. | |
`autoCommitEnabled` | `boolean` | Flag indicating if Kafka consumer auto-commit is enabled (`true`) or manual commit is used (`false`). |
`configuration` | `KafkaConfiguration` | Holds Kafka consumer configuration settings. |
`processor` | `Processor` | The Camel Processor that defines how to process the exchange created from a Kafka record. |
`commitManager` | `CommitManager` | Manages offset commits, supporting both manual and automatic commit modes. |
Constructor
KafkaRecordStreamingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager)
Initializes the streaming processor with required components.
Parameters:
configuration- Kafka consumer configuration instance.processor- Camel Processor to process each Kafka exchange.commitManager- Manager responsible for committing offsets.
Behavior:
Reads the auto-commit setting from
configurationto determine commit behavior.Stores references to the processor and commit manager for use during processing.
Methods
ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> consumerRecord)
Processes a single Kafka consumer record within a streaming workflow.
Parameters:
camelKafkaConsumer- The Camel Kafka consumer instance managing the consumption.topicPartition- The Kafka topic partition of the current record.partitionHasNext- Flag indicating if more records exist in the current partition.recordHasNext- Flag indicating if more records exist in the current poll batch.consumerRecord- The Kafka record to be processed.
Returns:
ProcessingResult- Encapsulates the outcome of processing, including error status and offset info.
Behavior:
Creates a new Camel
Exchangefor the record.Prepares the
Exchangeby copying Kafka record data and propagating headers.Adds special headers if auto-commit is disabled, signaling if this is the last record before commit.
If manual commit is allowed, adds a
KafkaManualCommitinstance to the exchange headers.Invokes the configured Camel
Processorto handle the exchange.Catches and handles exceptions, invoking
processExceptionif needed.Updates offset tracking in
commitManagerif no break-on-error condition occurred.Releases the
Exchangeback to the consumer.
Usage Example:
ProcessingResult result = processor.processExchange( camelKafkaConsumer, topicPartition, partitionHasNext, recordHasNext, consumerRecord); if(result.isBreakOnErrorHit()) { // Handle break on error scenario }
private boolean processException(Exchange exchange, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler)
Handles exceptions thrown during exchange processing based on configuration.
Parameters:
exchange- The Camel exchange where the exception occurred.topicPartition- The topic partition related to the failing record.consumerRecord- The Kafka record being processed when the error occurred.exceptionHandler- The Camel exception handler to delegate exception processing.
Returns:
boolean-trueif processing should break and stop further consumption;falseotherwise.
Behavior:
If
breakOnFirstErroris enabled in the configuration:Logs a warning with details.
Performs a commit of the current offset via
commitManager.Returns
trueto signal the consumption loop to halt processing.
Otherwise:
Delegates exception handling to the provided
exceptionHandler.Continues processing by returning
false.
Important Implementation Details
Exchange Preparation:
The processor sets up the Camel
Exchangewith message content and headers derived from the KafkaConsumerRecord.Headers such as
LAST_RECORD_BEFORE_COMMITandLAST_POLL_RECORDprovide information about the record's position in the current batch and partition, which is critical for managing commits when auto-commit is disabled.
Commit Management:
The class works closely with the
CommitManagerwhich centralizes offset commit logic.Supports both automatic commits (when enabled) and manual commits via
KafkaManualCommitobjects attached to the exchange header.Upon successful processing, offsets are recorded with the commit manager to be committed later or immediately depending on strategy.
Error Handling Strategy:
The class supports a "break on first error" strategy, which causes the consumer to stop processing further records on an error.
Alternatively, it supports continuing processing after logging and handling exceptions without interrupting consumption.
This flexibility allows integration with different error recovery mechanisms.
Concurrency and Exchange Management:
Exchanges are created for each record and released after processing to manage resource lifecycle.
The design assumes a streaming consumption model where records are processed one at a time in sequence.
Interaction with Other Components
KafkaConsumer:
The processor uses the
KafkaConsumerinstance to create exchanges and access exception handlers.It also calls back to release exchanges once processing is complete.
CommitManager:
Coordinates the commit lifecycle of Kafka offsets.
Provides manual commit objects when allowed, enabling users to manually control offset commits.
Processor:
User-defined processing logic that operates on the Camel Exchange created from Kafka records.
This is where business logic or routing rules are applied.
KafkaConfiguration:
Supplies configuration options such as auto-commit flag, manual commit allowance, and error handling policies.
ExceptionHandler:
Provides customizable exception handling, used when processing fails and
breakOnFirstErroris disabled.
Visual Diagram
classDiagram
class KafkaRecordStreamingProcessor {
-boolean autoCommitEnabled
-KafkaConfiguration configuration
-Processor processor
-CommitManager commitManager
+KafkaRecordStreamingProcessor(configuration, processor, commitManager)
+ProcessingResult processExchange(camelKafkaConsumer, topicPartition, partitionHasNext, recordHasNext, consumerRecord)
-boolean processException(exchange, topicPartition, consumerRecord, exceptionHandler)
}
KafkaRecordStreamingProcessor --|> KafkaRecordProcessor
KafkaRecordStreamingProcessor ..> KafkaConfiguration : uses
KafkaRecordStreamingProcessor ..> Processor : uses
KafkaRecordStreamingProcessor ..> CommitManager : uses
KafkaRecordStreamingProcessor ..> KafkaConsumer : interacts with
KafkaRecordStreamingProcessor ..> ProcessingResult : returns
KafkaRecordStreamingProcessor ..> ExceptionHandler : interacts with
Summary
`KafkaRecordStreamingProcessor` is a specialized Kafka consumer record processor designed for streaming contexts in Apache Camel. It handles creating Camel exchanges from Kafka records, propagates headers, manages offset commits (both auto and manual), and implements flexible error handling policies. This class provides a critical bridge between Kafka's low-level consumer API and Camel's high-level routing and processing framework, enabling robust and configurable Kafka stream processing within Camel routes.