KafkaRecordStreamingProcessorFacade.java

Overview

`KafkaRecordStreamingProcessorFacade.java` belongs to the Apache Camel Kafka consumer module and handles streaming processing of Kafka records. The class is a facade over the records returned by a poll: it iterates over the topic partitions and the records within each partition, delegates the processing of each record to a `KafkaRecordStreamingProcessor`, and coordinates offset commits and error handling through the commit manager and the consumer listener.
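
The partition-then-record iteration described above can be illustrated with a minimal, library-free sketch. It is not the Camel implementation: `StreamingIterationSketch`, `Result`, and `processPolled` are hypothetical names, partitions are modelled as strings, and records are modelled as bare offsets. The only behavior it shows is a nested loop that stops at the first record the handler rejects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class StreamingIterationSketch {

    /** Hypothetical stand-in for a processing result; not Camel's ProcessingResult. */
    static final class Result {
        final boolean breakOnError;
        final String lastPartition;
        final long lastOffset;

        Result(boolean breakOnError, String lastPartition, long lastOffset) {
            this.breakOnError = breakOnError;
            this.lastPartition = lastPartition;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Walks the partitions in order, then the offsets within each partition,
     * and stops as soon as the handler rejects a record.
     */
    static Result processPolled(Map<String, List<Long>> recordsByPartition,
                                Predicate<Long> handler,
                                List<String> processedLog) {
        Result result = new Result(false, null, -1L);
        Iterator<Map.Entry<String, List<Long>>> partitions = recordsByPartition.entrySet().iterator();
        while (partitions.hasNext() && !result.breakOnError) {
            Map.Entry<String, List<Long>> partition = partitions.next();
            Iterator<Long> offsets = partition.getValue().iterator();
            while (offsets.hasNext() && !result.breakOnError) {
                long offset = offsets.next();
                boolean ok = handler.test(offset);
                if (ok) {
                    processedLog.add(partition.getKey() + "@" + offset);
                }
                // Remember where processing got to, and whether it has to stop here.
                result = new Result(!ok, partition.getKey(), offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> polled = new LinkedHashMap<>();
        polled.put("orders-0", Arrays.asList(10L, 11L, 12L));
        polled.put("orders-1", Arrays.asList(5L));

        List<String> processed = new ArrayList<>();
        // The handler rejects offset 11 to simulate a processing failure.
        Result result = processPolled(polled, offset -> offset != 11L, processed);
        System.out.println(processed + " breakOnError=" + result.breakOnError);
    }
}
```

In this sketch, a failure at offset 11 of `orders-0` means that offset 12 and the whole of `orders-1` are never visited, and the returned `Result` records where processing stopped.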

The class exists to process Kafka records as a stream while keeping control over commit semantics and error-handling behavior. It extends the abstract base class `AbstractKafkaRecordProcessorFacade`, which provides the foundational behavior for record processing facades.
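
To make "control over commit semantics and error handling" concrete, here is a small sketch under stated assumptions. `OffsetTracker` and `FailureListener` are hypothetical stand-ins and do not mirror the signatures of Camel's `CommitManager` or `KafkaConsumerListener`. The one Kafka fact it relies on is that a committed offset names the next record to consume, so a successfully processed record at offset `n` is tracked as `n + 1`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

public class CommitAndListenerSketch {

    /** Hypothetical listener: decides whether processing may continue after a failed record. */
    interface FailureListener {
        boolean continueAfterFailure(String partition, long offset);
    }

    /** Hypothetical commit tracker: remembers the next offset to consume per partition. */
    static final class OffsetTracker {
        final Map<String, Long> nextOffsets = new HashMap<>();

        void recordSuccess(String partition, long offset) {
            // Kafka convention: the committed offset is the offset of the next record to read.
            nextOffsets.put(partition, offset + 1);
        }
    }

    /**
     * Processes the offsets of one partition in order. Successful records advance the
     * tracked offset; on a failure the listener decides whether to go on.
     * Returns true if every record was visited, false if processing stopped early.
     */
    static boolean handle(String partition,
                          List<Long> offsets,
                          LongPredicate processor,
                          OffsetTracker tracker,
                          FailureListener listener) {
        for (long offset : offsets) {
            if (processor.test(offset)) {
                tracker.recordSuccess(partition, offset);
            } else if (!listener.continueAfterFailure(partition, offset)) {
                return false; // stop; the tracked offset still points at the failed record
            }
        }
        return true;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        // Offset 2 fails and the listener asks to stop.
        boolean finished = handle("orders-0", Arrays.asList(1L, 2L, 3L),
                offset -> offset != 2L, tracker, (partition, offset) -> false);
        System.out.println("finished=" + finished + " nextOffsets=" + tracker.nextOffsets);
    }
}
```

When the listener asks to stop, the tracked offset stays at the failed record, so a later poll that resumes from it would see that record again. When the listener allows processing to continue, the failed record is skipped and later successes move the tracked offset past it.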


Classes and Methods

Class: KafkaRecordStreamingProcessorFacade

Fields

| Field Name | Type | Description |
| --- | --- | --- |
| `private static final LOG` | `Logger` | Logger instance for logging events and debug info. |
| `private final kafkaRecordProcessor` | `KafkaRecordStreamingProcessor` | The processor responsible for handling individual Kafka records. |

Constructor

public KafkaRecordStreamingProcessorFacade(
    KafkaConsumer camelKafkaConsumer,
    String threadId,
    CommitManager commitManager,
    KafkaConsumerListener consumerListener)

Private Methods

private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager)

private ProcessingResult processRecord(
    TopicPartition partition,
    boolean partitionHasNext,
    boolean recordHasNext,
    KafkaRecordStreamingProcessor kafkaRecordProcessor,
    ConsumerRecord<Object, Object> consumerRecord)

Overridden Methods

@Override
public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords)
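
Usage Example

// camelKafkaConsumer is the Camel KafkaConsumer expected by the constructor.
// kafkaClient is assumed to be the underlying Apache Kafka client consumer
// (org.apache.kafka.clients.consumer.Consumer), which is the object that exposes poll().
KafkaRecordStreamingProcessorFacade facade =
    new KafkaRecordStreamingProcessorFacade(camelKafkaConsumer, "thread-1", commitManager, listener);
ConsumerRecords<Object, Object> records = kafkaClient.poll(Duration.ofMillis(100));
ProcessingResult result = facade.processPolledRecords(records);
if (result.isBreakOnErrorHit()) {
    // Handle error scenario
}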

Important Implementation Details and Algorithms


Interaction with Other Components


Class Diagram

classDiagram
    class KafkaRecordStreamingProcessorFacade {
        - kafkaRecordProcessor: KafkaRecordStreamingProcessor
        + KafkaRecordStreamingProcessorFacade(KafkaConsumer, String, CommitManager, KafkaConsumerListener)
        - buildKafkaRecordProcessor(CommitManager) KafkaRecordStreamingProcessor
        - processRecord(TopicPartition, boolean, boolean, KafkaRecordStreamingProcessor, ConsumerRecord) ProcessingResult
        + processPolledRecords(ConsumerRecords) ProcessingResult
    }

    KafkaRecordStreamingProcessorFacade --|> AbstractKafkaRecordProcessorFacade
    KafkaRecordStreamingProcessorFacade o-- KafkaRecordStreamingProcessor
    KafkaRecordStreamingProcessorFacade ..> KafkaConsumer
    KafkaRecordStreamingProcessorFacade ..> CommitManager
    KafkaRecordStreamingProcessorFacade ..> KafkaConsumerListener

Summary

`KafkaRecordStreamingProcessorFacade.java` is the orchestration class for streaming record processing in the Apache Camel Kafka consumer module. It takes the records returned by a poll, iterates over them partition by partition, delegates the processing of each record to a `KafkaRecordStreamingProcessor`, and controls offset commits and error handling. Listeners provide the extension point, and the class plugs into Camel’s processing model, which makes it a base for building streaming Kafka consumers.

