AbstractKafkaRecordProcessorFacade.java


Overview

`AbstractKafkaRecordProcessorFacade` is an abstract base class designed to encapsulate common functionality for processing Kafka consumer records within the Apache Camel Kafka component. It provides foundational utilities and state management for classes that implement the `KafkaRecordProcessorFacade` interface, streamlining the handling of Kafka consumer records, logging, and lifecycle awareness during consumption.

This class acts as a framework for concrete record processing implementations, offering consistent logging, consumer stopping state checks, and integration points with commit management and error handling mechanisms.


Detailed Documentation

Package

org.apache.camel.component.kafka.consumer.support

Class Declaration

public abstract class AbstractKafkaRecordProcessorFacade implements KafkaRecordProcessorFacade

Fields

Field Name

Type

Description

`camelKafkaConsumer`

`KafkaConsumer`

Reference to the Camel Kafka consumer owning this processor facade instance.

`threadId`

`String`

Identifier for the consumer thread, used for logging and tracking.

`commitManager`

`CommitManager`

Manages Kafka offset commits to ensure processed records are acknowledged properly.

`consumerListener`

`KafkaConsumerListener`

Listener interface for handling consumer events and errors during processing.

`LOG`

`Logger`

SLF4J logger instance scoped to this class for logging debug and trace information.


Constructor

protected AbstractKafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
                                            CommitManager commitManager, KafkaConsumerListener consumerListener)

Methods

protected boolean isStopping()

protected void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition)

protected void logRecords(ConsumerRecords<Object, Object> allRecords)

protected void logRecord(ConsumerRecord<Object, Object> consumerRecord)


Implementation Details


Interaction with Other Components

This class is part of the Kafka consumer support infrastructure inside Apache Camel and is extended by concrete Kafka record processors that implement processing logic tailored to different consumption strategies or error handling policies.


Usage Example (Hypothetical)

public class CustomKafkaRecordProcessor extends AbstractKafkaRecordProcessorFacade {

    public CustomKafkaRecordProcessor(KafkaConsumer consumer, String threadId,
                                      CommitManager commitManager, KafkaConsumerListener listener) {
        super(consumer, threadId, commitManager, listener);
    }

    public void processRecords(ConsumerRecords<Object, Object> records) {
        if (isStopping()) {
            return; // Skip processing when stopping
        }
        logRecords(records);
        records.partitions().forEach(partition -> {
            List<ConsumerRecord<Object, Object>> partitionRecords = records.records(partition);
            logRecordsInPartition(partitionRecords, partition);
            for (ConsumerRecord<Object, Object> record : partitionRecords) {
                logRecord(record);
                // Process the record...
            }
            // Commit offsets if needed
        });
    }
}

Mermaid Class Diagram

classDiagram
    class AbstractKafkaRecordProcessorFacade {
        -KafkaConsumer camelKafkaConsumer
        -String threadId
        -CommitManager commitManager
        -KafkaConsumerListener consumerListener
        -Logger LOG
        +AbstractKafkaRecordProcessorFacade(KafkaConsumer, String, CommitManager, KafkaConsumerListener)
        #boolean isStopping()
        #void logRecordsInPartition(List<ConsumerRecord>, TopicPartition)
        #void logRecords(ConsumerRecords)
        #void logRecord(ConsumerRecord)
    }
    AbstractKafkaRecordProcessorFacade ..|> KafkaRecordProcessorFacade

Summary

`AbstractKafkaRecordProcessorFacade` serves as a reusable foundation for processing Kafka consumer records within Apache Camel's Kafka component. It provides essential utilities such as logging, lifecycle state checks, and dependencies injection for commit management and error handling, while leaving the actual record processing implementation to subclasses. This design promotes clean code reuse, consistent logging practices, and easier maintenance of Kafka consumers in complex Camel integrations.