KafkaRecordStreamingProcessorFacade.java
Overview
`KafkaRecordStreamingProcessorFacade.java` is a core component of the Apache Camel Kafka consumer module, specifically designed to handle streaming processing of Kafka records. This class acts as a facade that orchestrates the processing of Kafka records retrieved during polling, managing the iteration over topic partitions and their records, delegating the actual processing to a specialized processor, and handling commit management and error handling through listeners.
The class processes polled Kafka records one at a time, in a streaming fashion, while keeping control over commit semantics and error handling behavior. It extends the abstract base class `AbstractKafkaRecordProcessorFacade`, which provides foundational behavior for record processing facades.
Classes and Methods
Class: KafkaRecordStreamingProcessorFacade
Package:
`org.apache.camel.component.kafka.consumer.support.streaming`
Extends:
`AbstractKafkaRecordProcessorFacade`
Description:
Implements Kafka record processing for streaming consumers by iterating over partitions and their records, processing each record, and managing commits and error handling.
Fields
| Field Name | Type | Description |
|---|---|---|
| `LOG` | `Logger` | Logger instance (`private static final`) for logging events and debug information. |
| `kafkaRecordProcessor` | `KafkaRecordStreamingProcessor` | The processor responsible for handling individual Kafka records. |
Constructor
public KafkaRecordStreamingProcessorFacade(
KafkaConsumer camelKafkaConsumer,
String threadId,
CommitManager commitManager,
KafkaConsumerListener consumerListener)
Parameters:
- `camelKafkaConsumer`: The Camel Kafka consumer instance.
- `threadId`: Identifier for the processing thread.
- `commitManager`: Manages offset commits.
- `consumerListener`: Listener for consumer lifecycle events and error handling.
Description:
Initializes the facade, setting up the internal `KafkaRecordStreamingProcessor` using the provided commit manager and consumer configuration.
Private Methods
private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager)
Parameters:
- `commitManager`: The commit manager that handles offset commits.
Returns:
A new instance of `KafkaRecordStreamingProcessor`.
Description:
Constructs the internal record processor with the consumer configuration, processor, and commit manager.
private ProcessingResult processRecord(
TopicPartition partition,
boolean partitionHasNext,
boolean recordHasNext,
KafkaRecordStreamingProcessor kafkaRecordProcessor,
ConsumerRecord<Object, Object> consumerRecord)
Parameters:
- `partition`: The Kafka topic partition the record belongs to.
- `partitionHasNext`: Indicates whether there are more partitions to process after this one.
- `recordHasNext`: Indicates whether there are more records in the current partition to process.
- `kafkaRecordProcessor`: The processor instance that handles the single record.
- `consumerRecord`: The Kafka consumer record to process.
Returns:
`ProcessingResult` indicating the outcome of processing the record.
Description:
Logs the record details and delegates processing to `kafkaRecordProcessor.processExchange()`. This method encapsulates the handling of one Kafka record.
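The two boolean flags are easiest to understand with a small example. The sketch below is a simplified stand-in model, not the Camel source: it assumes the flags are read from the partition and record iterators right after `next()` has been called, so each flag is `true` only while something remains. The class name `HasNextFlagsSketch` and the use of plain strings as records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in model; none of these types are the real Camel or Kafka classes.
final class HasNextFlagsSketch {

    /** Returns one line per record showing the flags that record would be processed with. */
    static List<String> describe(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        Iterator<List<String>> partitionIt = partitions.iterator();
        while (partitionIt.hasNext()) {
            Iterator<String> recordIt = partitionIt.next().iterator();
            while (recordIt.hasNext()) {
                String record = recordIt.next();
                // Both flags are evaluated after next(), so they describe what is left.
                out.add(record
                        + " partitionHasNext=" + partitionIt.hasNext()
                        + " recordHasNext=" + recordIt.hasNext());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two partitions: the first holds records a and b, the second holds c.
        describe(List.of(List.of("a", "b"), List.of("c"))).forEach(System.out::println);
        // prints:
        // a partitionHasNext=true recordHasNext=true
        // b partitionHasNext=true recordHasNext=false
        // c partitionHasNext=false recordHasNext=false
    }
}
```

Under this assumption, the last record of the last partition is the only one that sees both flags as `false`.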
Overridden Methods
@Override
public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords)
Parameters:
- `allRecords`: The batch of consumer records returned by a Kafka poll.
Returns:
`ProcessingResult` summarizing the processing outcome for the batch.
Description:
This method is the main entry point for processing Kafka records retrieved by the consumer. It:
- Logs the batch of records.
- Iterates over each partition in the batch.
- Iterates over each record in the partition.
- Processes each record using `processRecord()`.
- Checks for errors or stop signals.
- Uses a `KafkaConsumerListener` to potentially stop processing early.
- Commits offsets when all records in a partition are successfully processed.
Usage example:
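// camelKafkaConsumer is the Camel consumer (org.apache.camel.component.kafka.KafkaConsumer).
// kafkaClient is the Kafka client consumer (org.apache.kafka.clients.consumer.Consumer) that actually polls.
KafkaRecordStreamingProcessorFacade facade = new KafkaRecordStreamingProcessorFacade(camelKafkaConsumer, "thread-1", commitManager, listener);
ConsumerRecords<Object, Object> records = kafkaClient.poll(Duration.ofMillis(100));
ProcessingResult result = facade.processPolledRecords(records);
if (result.isBreakOnErrorHit()) {
    // Handle error scenario
}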
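The steps listed for `processPolledRecords` can be sketched without any Kafka dependency. The following is a minimal model of the control flow only, not the Camel implementation: partitions are plain strings, records are strings, `RecordingCommitter` is a hypothetical stand-in for `CommitManager`, and the `recordFails` predicate stands in for the per-record processor reporting a break-on-error condition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified stand-in model; none of these types are the real Camel or Kafka classes.
final class StreamingLoopSketch {

    /** Hypothetical stand-in for the commit manager: remembers which partitions were committed. */
    static final class RecordingCommitter {
        final List<String> committed = new ArrayList<>();

        void commit(String partition) {
            committed.add(partition);
        }
    }

    /**
     * Walks the polled records partition by partition and record by record.
     * Returns true if the break-on-error condition was hit.
     */
    static boolean processPolledRecords(Map<String, List<String>> recordsByPartition,
                                        Predicate<String> recordFails,
                                        RecordingCommitter committer) {
        boolean breakOnErrorHit = false;
        for (Map.Entry<String, List<String>> entry : recordsByPartition.entrySet()) {
            Iterator<String> records = entry.getValue().iterator();
            while (!breakOnErrorHit && records.hasNext()) {
                breakOnErrorHit = recordFails.test(records.next());
            }
            if (breakOnErrorHit) {
                break; // halt: nothing further is processed or committed
            }
            // Every record in this partition succeeded, so commit the partition.
            committer.commit(entry.getKey());
        }
        return breakOnErrorHit;
    }

    public static void main(String[] args) {
        Map<String, List<String>> polled = new LinkedHashMap<>();
        polled.put("orders-0", List.of("a", "b"));
        polled.put("orders-1", List.of("c", "bad", "d"));
        polled.put("orders-2", List.of("e"));

        RecordingCommitter committer = new RecordingCommitter();
        boolean hit = processPolledRecords(polled, "bad"::equals, committer);

        // prints: breakOnErrorHit=true, committed=[orders-0]
        System.out.println("breakOnErrorHit=" + hit + ", committed=" + committer.committed);
    }
}
```

In this model the partition containing the failing record is never committed, so its records would be delivered again on a later poll. That is the at-least-once trade-off the per-partition commit implies.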
Important Implementation Details and Algorithms
Streaming Processing:
The facade processes records in a streaming manner, iterating partition by partition and record by record, which suits continuous, real-time data streams.
Commit Management:
Offsets are committed per partition once all records in that partition have been processed successfully, which supports at-least-once processing semantics.
Error Handling:
If a processing error occurs (`isBreakOnErrorHit()` returns `true`), processing halts. In addition, the `KafkaConsumerListener` can return `false` from `afterProcess()` to stop further processing, in which case offsets are committed before the method returns.
Logging:
The class logs detailed debug information about the partitions and records being processed, which aids troubleshooting during development and production monitoring.
Integration with Camel Kafka Consumer:
This facade relies on the Camel Kafka consumer’s configuration and processor to handle the actual business logic of processing messages, keeping this class focused on orchestration.
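The listener-driven early stop can be illustrated in isolation. This is a hedged sketch of the behavior described under Error Handling, not the Camel code: the `afterProcess` predicate is a hypothetical stand-in for `KafkaConsumerListener.afterProcess()`, the event list replaces real logging and commits, and the sketch assumes the commit applies to the partition currently being processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in model; none of these types are the real Camel or Kafka classes.
final class ListenerStopSketch {

    /**
     * Processes one partition and records what happened in `events`.
     * Returns false if the listener asked to stop, true otherwise.
     */
    static boolean processPartition(String partition,
                                    List<String> records,
                                    Predicate<String> afterProcess,
                                    List<String> events) {
        for (String record : records) {
            events.add("process " + record);
            // A null listener means nobody can veto; otherwise false means "stop now".
            if (afterProcess != null && !afterProcess.test(record)) {
                events.add("commit " + partition); // assumed: commit before returning
                return false;
            }
        }
        events.add("commit " + partition); // normal path: the whole partition succeeded
        return true;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        // The hypothetical listener stops processing once it has seen record "b".
        boolean keepGoing = processPartition("orders-0", List.of("a", "b", "c"),
                record -> !record.equals("b"), events);

        // prints: keepGoing=false, events=[process a, process b, commit orders-0]
        System.out.println("keepGoing=" + keepGoing + ", events=" + events);
    }
}
```

Record `c` is never processed in this run, which is the point of the hook: the listener can end the batch after any record without treating it as an error.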
Interaction with Other Components
`KafkaConsumer` (Camel Kafka Consumer):
Provides the configuration and message processor used during record processing.
`CommitManager`:
Handles offset management and commits to Kafka. It is invoked after all records in a partition have been processed successfully, or when the listener stops processing early.
`KafkaConsumerListener`:
Allows for custom lifecycle or error handling, influencing whether processing continues after each record.
`KafkaRecordStreamingProcessor`:
The internal processor that performs the detailed work of converting a Kafka record into an exchange and invoking the Camel processor.
`AbstractKafkaRecordProcessorFacade`:
The base class from which this class inherits common functionality and lifecycle support for record processing facades.
Class Diagram
classDiagram
class KafkaRecordStreamingProcessorFacade {
- kafkaRecordProcessor: KafkaRecordStreamingProcessor
+ KafkaRecordStreamingProcessorFacade(KafkaConsumer, String, CommitManager, KafkaConsumerListener)
- buildKafkaRecordProcessor(CommitManager) KafkaRecordStreamingProcessor
- processRecord(TopicPartition, boolean, boolean, KafkaRecordStreamingProcessor, ConsumerRecord) ProcessingResult
+ processPolledRecords(ConsumerRecords) ProcessingResult
}
KafkaRecordStreamingProcessorFacade --|> AbstractKafkaRecordProcessorFacade
KafkaRecordStreamingProcessorFacade o-- KafkaRecordStreamingProcessor
KafkaRecordStreamingProcessorFacade ..> KafkaConsumer
KafkaRecordStreamingProcessorFacade ..> CommitManager
KafkaRecordStreamingProcessorFacade ..> KafkaConsumerListener
Summary
`KafkaRecordStreamingProcessorFacade.java` is a key orchestration class within the Apache Camel Kafka consumer module for streaming record processing. It manages the flow of records from polling through partitioned iteration, delegates detailed processing to a specialized processor, and controls offset commits and error handling. Its design emphasizes modularity, extensibility through listeners, and integration with Camel’s processing model, providing a robust foundation for building streaming Kafka consumers.