AbstractKafkaRecordProcessorFacade.java
Overview
`AbstractKafkaRecordProcessorFacade` is an abstract base class designed to encapsulate common functionality for processing Kafka consumer records within the Apache Camel Kafka component. It provides foundational utilities and state management for classes that implement the `KafkaRecordProcessorFacade` interface, streamlining the handling of Kafka consumer records, logging, and lifecycle awareness during consumption.
This class acts as a framework for concrete record processing implementations, offering consistent logging, consumer stopping state checks, and integration points with commit management and error handling mechanisms.
Detailed Documentation
Package
org.apache.camel.component.kafka.consumer.support
Class Declaration
public abstract class AbstractKafkaRecordProcessorFacade implements KafkaRecordProcessorFacade
Abstract class: Cannot be instantiated directly; meant to be extended by concrete classes.
Implements:
KafkaRecordProcessorFacadeinterface, which defines the contract for Kafka record processors in Camel Kafka consumers.
Fields
Field Name | Type | Description |
|---|---|---|
`camelKafkaConsumer` | `KafkaConsumer` | Reference to the Camel Kafka consumer owning this processor facade instance. |
`threadId` | `String` | Identifier for the consumer thread, used for logging and tracking. |
`commitManager` | `CommitManager` | Manages Kafka offset commits to ensure processed records are acknowledged properly. |
`consumerListener` | `KafkaConsumerListener` | Listener interface for handling consumer events and errors during processing. |
`LOG` | `Logger` | SLF4J logger instance scoped to this class for logging debug and trace information. |
Constructor
protected AbstractKafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
CommitManager commitManager, KafkaConsumerListener consumerListener)
Parameters:
camelKafkaConsumer: The Kafka consumer instance from Apache Camel.threadId: Unique identifier of the consumer thread.commitManager: Offset commit manager instance.consumerListener: Listener for consumer events.
Description: Initializes the facade with necessary references to the consumer, commit manager, and listener to facilitate record processing and lifecycle management.
Methods
protected boolean isStopping()
Description: Checks whether the underlying Camel Kafka consumer is in the process of stopping.
Returns:
boolean—trueif the consumer is stopping, otherwisefalse.Usage Example:
if (isStopping()) { // Perform graceful shutdown or skip further processing }
protected void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition)
Description: Logs debug-level information about the number of consumer records received for a specific Kafka topic partition.
Parameters:
partitionRecords: List ofConsumerRecord<Object, Object>representing records from a particular partition.partition:TopicPartitionobject identifying the Kafka topic and partition.
Usage: Helps trace the volume of data received per partition during processing.
Logging Level: Debug
protected void logRecords(ConsumerRecords<Object, Object> allRecords)
Description: Logs debug-level information about the total number of records retrieved in the last Kafka poll operation on the current consumer thread.
Parameters:
allRecords:ConsumerRecords<Object, Object>representing all records fetched in one poll.
Usage: Useful for monitoring throughput and activity in consumer threads.
Logging Level: Debug
protected void logRecord(ConsumerRecord<Object, Object> consumerRecord)
Description: Logs trace-level detailed information about an individual Kafka record, including partition, offset, key, and value.
Parameters:
consumerRecord: SingleConsumerRecord<Object, Object>instance to log.
Usage: Provides fine-grained visibility into individual message consumption for debugging.
Logging Level: Trace
Implementation Details
Logging: Uses SLF4J for logging at different granularities (debug and trace), enabling detailed runtime diagnostics without affecting production performance when disabled.
Thread Awareness: The class maintains a
threadIdfor contextual logging aligned with Kafka consumer threads.Lifecycle Awareness: The
isStopping()method checks for graceful shutdown signals, enabling subclasses to stop consumption safely.Commit Management: Holds a reference to a
CommitManager, though actual commit logic is expected in subclasses or collaborators.Error Handling: Contains a listener reference (
KafkaConsumerListener) to handle errors and events, promoting separation of concerns.
Interaction with Other Components
KafkaConsumer (Camel): The core consumer instance that this facade helps manage and monitor.
CommitManager: Works alongside to handle offset commits based on processed records.
KafkaConsumerListener: Provides hooks for error handling and consumer event notifications during record processing.
KafkaRecordProcessorFacade: The interface that this abstract class partially implements, specifying the contract for record processing facades.
This class is part of the Kafka consumer support infrastructure inside Apache Camel and is extended by concrete Kafka record processors that implement processing logic tailored to different consumption strategies or error handling policies.
Usage Example (Hypothetical)
public class CustomKafkaRecordProcessor extends AbstractKafkaRecordProcessorFacade {
public CustomKafkaRecordProcessor(KafkaConsumer consumer, String threadId,
CommitManager commitManager, KafkaConsumerListener listener) {
super(consumer, threadId, commitManager, listener);
}
public void processRecords(ConsumerRecords<Object, Object> records) {
if (isStopping()) {
return; // Skip processing when stopping
}
logRecords(records);
records.partitions().forEach(partition -> {
List<ConsumerRecord<Object, Object>> partitionRecords = records.records(partition);
logRecordsInPartition(partitionRecords, partition);
for (ConsumerRecord<Object, Object> record : partitionRecords) {
logRecord(record);
// Process the record...
}
// Commit offsets if needed
});
}
}
Mermaid Class Diagram
classDiagram
class AbstractKafkaRecordProcessorFacade {
-KafkaConsumer camelKafkaConsumer
-String threadId
-CommitManager commitManager
-KafkaConsumerListener consumerListener
-Logger LOG
+AbstractKafkaRecordProcessorFacade(KafkaConsumer, String, CommitManager, KafkaConsumerListener)
#boolean isStopping()
#void logRecordsInPartition(List<ConsumerRecord>, TopicPartition)
#void logRecords(ConsumerRecords)
#void logRecord(ConsumerRecord)
}
AbstractKafkaRecordProcessorFacade ..|> KafkaRecordProcessorFacade
Summary
`AbstractKafkaRecordProcessorFacade` serves as a reusable foundation for processing Kafka consumer records within Apache Camel's Kafka component. It provides essential utilities such as logging, lifecycle state checks, and dependencies injection for commit management and error handling, while leaving the actual record processing implementation to subclasses. This design promotes clean code reuse, consistent logging practices, and easier maintenance of Kafka consumers in complex Camel integrations.