KafkaRecordProcessor.java


Overview

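`KafkaRecordProcessor.java` is an abstract Java class within the Apache Camel Kafka component that helps turn Kafka consumer records into Camel exchanges. It provides foundational logic to populate a Camel message with record metadata, filter Kafka headers, and propagate the remaining headers onto the exchange.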

This class acts as a base for more specialized record processors that deal with Kafka consumer records in various ways within the Camel Kafka component. It encapsulates common processing tasks and header management to ensure a consistent and extensible approach to integrating Kafka with Camel routing and messaging.
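
The base-class arrangement described above can be sketched without Camel or Kafka on the classpath. This is only an illustration: `BaseRecordProcessor`, `BodyAwareProcessor`, and the `example.*` header names are hypothetical stand-ins, not types or constants from the component.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProcessorHierarchySketch {

    // Hypothetical base class: the shared helper is protected so that
    // specialized processors can reuse it without exposing it publicly.
    abstract static class BaseRecordProcessor {
        protected Map<String, Object> commonHeaders(String topic, long offset) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("example.TOPIC", topic);   // placeholder header name
            headers.put("example.OFFSET", offset); // placeholder header name
            return headers;
        }

        abstract Map<String, Object> process(String topic, long offset, Object value);
    }

    // Hypothetical specialized processor that builds on the shared helper.
    static final class BodyAwareProcessor extends BaseRecordProcessor {
        @Override
        Map<String, Object> process(String topic, long offset, Object value) {
            Map<String, Object> headers = commonHeaders(topic, offset);
            String type = value == null ? "null" : value.getClass().getSimpleName();
            headers.put("example.BODY_TYPE", type);
            return headers;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new BodyAwareProcessor().process("orders", 7L, "payload");
        System.out.println(headers);
    }
}
```

Keeping the shared logic in protected methods is also why the usage examples in this document only compile when called from a subclass or from the same package.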


Detailed Explanation

Package

package org.apache.camel.component.kafka.consumer.support;

This indicates the class is part of the Kafka consumer support utilities in the Apache Camel Kafka component.


Class: KafkaRecordProcessor

Description

An abstract helper class providing utility methods to convert Kafka `ConsumerRecord` instances into Camel `Exchange` messages by setting appropriate headers, filtering headers, and managing message bodies.

Logger

private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);

Used for debug-level logging, mainly when setting up exchanges.


Methods


1. setupExchangeMessage

protected void setupExchangeMessage(Message message, ConsumerRecord<Object, Object> consumerRecord)
Purpose

Populate a Camel `Message` with metadata from a Kafka `ConsumerRecord`, including partition, topic, offset, headers, timestamp, and the record's key and value.

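Parameters

`message` is the Camel `Message` to populate. `consumerRecord` is the Kafka `ConsumerRecord` that supplies the metadata, key, and value.

Behavior

Sets message headers for the record's partition, topic, offset, headers, timestamp, and key, and sets the message body to the record's value.
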
Usage Example
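ConsumerRecord<Object, Object> record = ...;
Exchange exchange = ...;
Message message = exchange.getIn();

// ConcreteKafkaRecordProcessor stands for any concrete subclass.
// setupExchangeMessage is protected, so this call must come from
// a subclass or from a class in the same package.
KafkaRecordProcessor processor = new ConcreteKafkaRecordProcessor();
processor.setupExchangeMessage(message, record);

// The exchange message now has Kafka metadata and body set.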
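
As a rough, dependency-free illustration of the mapping that `setupExchangeMessage` is described as performing, the sketch below copies record metadata into a header map and the record value into a body. `SimpleRecord` and `SimpleMessage` are hypothetical stand-ins for `ConsumerRecord` and `Message`, the record's own header collection is omitted for brevity, and the `example.*` header names are placeholders rather than the component's actual constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SetupExchangeSketch {

    // Hypothetical stand-in for Kafka's ConsumerRecord (not the real class).
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final Object key;
        final Object value;

        SimpleRecord(String topic, int partition, long offset, long timestamp, Object key, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a Camel Message: a header map plus a body.
    static final class SimpleMessage {
        final Map<String, Object> headers = new LinkedHashMap<>();
        Object body;
    }

    // Copies metadata and key into headers, and the value into the body.
    // Header names are illustrative placeholders only.
    static void setupMessage(SimpleMessage message, SimpleRecord rec) {
        message.headers.put("example.PARTITION", rec.partition);
        message.headers.put("example.TOPIC", rec.topic);
        message.headers.put("example.OFFSET", rec.offset);
        message.headers.put("example.TIMESTAMP", rec.timestamp);
        message.headers.put("example.KEY", rec.key);
        message.body = rec.value;
    }

    public static void main(String[] args) {
        SimpleMessage message = new SimpleMessage();
        setupMessage(message, new SimpleRecord("orders", 2, 42L, 1000L, "k1", "payload"));
        System.out.println(message.headers + " body=" + message.body);
    }
}
```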

2. shouldBeFiltered

protected boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy)
Purpose

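Determines whether a given Kafka header passes the header filter. Despite the method name, a return value of `true` means the header should be kept and copied to the Camel message headers; `false` means it is excluded.

Parameters

`header` is the Kafka header being evaluated. `exchange` is the current Camel `Exchange`. `headerFilterStrategy` is the `HeaderFilterStrategy` consulted for the decision.

Returns

`true` if the header should be kept, `false` if it should be dropped.

Details

Internally calls `headerFilterStrategy.applyFilterToExternalHeaders` and returns the negation of its result. The strategy reports whether a header must be excluded, so negating it yields whether the header should be kept.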

Usage Example
boolean keep = processor.shouldBeFiltered(kafkaHeader, exchange, headerFilterStrategy);
if (keep) {
    // propagate this header
}
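
The negation can be shown in isolation with a small sketch. `ExcludeFilter` is a hypothetical single-method stand-in for a filter strategy (it is not Camel's `HeaderFilterStrategy` interface), and `shouldBeKept` is an illustrative name chosen to make the polarity explicit.

```java
public class HeaderKeepSketch {

    // Hypothetical stand-in for a filter strategy:
    // returns true when a header must be EXCLUDED.
    interface ExcludeFilter {
        boolean exclude(String name, Object value);
    }

    // Negates the filter result, so true means "keep this header".
    static boolean shouldBeKept(String name, Object value, ExcludeFilter filter) {
        return !filter.exclude(name, value);
    }

    public static void main(String[] args) {
        // Example rule (an assumption for illustration): drop "internal." headers.
        ExcludeFilter dropInternal = (name, value) -> name.startsWith("internal.");

        System.out.println(shouldBeKept("trace-id", "abc", dropInternal));     // prints "true"
        System.out.println(shouldBeKept("internal.token", "x", dropInternal)); // prints "false"
    }
}
```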

3. propagateHeaders

protected void propagateHeaders(
    KafkaConfiguration configuration,
    ConsumerRecord<Object, Object> consumerRecord,
    Exchange exchange)
Purpose

Transfers Kafka record headers into Camel exchange message headers after applying filtering and deserialization.

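Parameters

`configuration` is the `KafkaConfiguration` in effect for the consumer. `consumerRecord` is the Kafka record whose headers are read. `exchange` is the Camel `Exchange` whose message receives the headers.

Behavior

Reads the record's headers, drops those rejected by the header filter, deserializes the remaining values, and sets them as headers on the exchange's message.
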
Usage Example
KafkaConfiguration config = ...;
ConsumerRecord<Object, Object> record = ...;
Exchange exchange = ...;

processor.propagateHeaders(config, record, exchange);

// exchange.getIn() now contains deserialized and filtered Kafka headers
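
The filter-then-deserialize flow can be approximated with plain collections. In this sketch a `Map<String, byte[]>` stands in for the record's headers (a simplification, since Kafka permits repeated header keys), the exclusion rule and the UTF-8 decoding are assumptions made for illustration, and `propagate` is a hypothetical helper rather than the component's method.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;

public class PropagateHeadersSketch {

    // Skips headers the filter excludes, deserializes the rest,
    // and returns them in their original iteration order.
    static Map<String, Object> propagate(
            Map<String, byte[]> recordHeaders,
            BiPredicate<String, byte[]> excludeFilter,
            Function<byte[], Object> deserializer) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : recordHeaders.entrySet()) {
            if (excludeFilter.test(entry.getKey(), entry.getValue())) {
                continue; // excluded by the filter
            }
            out.put(entry.getKey(), deserializer.apply(entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, byte[]> recordHeaders = new LinkedHashMap<>();
        recordHeaders.put("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        recordHeaders.put("internal.token", "secret".getBytes(StandardCharsets.UTF_8));

        Map<String, Object> result = propagate(
                recordHeaders,
                (name, value) -> name.startsWith("internal."),      // assumed exclusion rule
                bytes -> new String(bytes, StandardCharsets.UTF_8)); // assumed decoding

        System.out.println(result);
    }
}
```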

Important Implementation Details


Interaction With Other Parts of the System

This class acts as a bridge between Kafka's native consumer data and Camel's messaging model, ensuring that Kafka metadata and headers are correctly mapped to Camel exchanges for further routing and processing.


Mermaid Class Diagram

classDiagram
    class KafkaRecordProcessor {
        -static final Logger LOG
        #void setupExchangeMessage(Message message, ConsumerRecord consumerRecord)
        #boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy)
        #void propagateHeaders(KafkaConfiguration configuration, ConsumerRecord consumerRecord, Exchange exchange)
    }

Summary

`KafkaRecordProcessor.java` is a foundational abstraction within Apache Camel's Kafka component that standardizes how Kafka consumer records are translated into Camel messages and exchanges. It handles metadata extraction, header filtering, and header deserialization, enabling seamless integration of Kafka messaging semantics into Camel's routing and processing framework. The class is designed for reuse and extension by specialized Kafka record processors in the component.