KafkaRecordProcessor.java
Overview
`KafkaRecordProcessor.java` is an abstract Java class within the Apache Camel Kafka component designed to assist in processing Kafka consumer records into Camel exchanges. It provides foundational logic to:
Extract metadata and payload from Kafka
ConsumerRecordobjects.Populate Camel
MessageandExchangeobjects with Kafka-specific headers, keys, values, and timestamps.Filter and propagate Kafka headers into Camel message headers using a configurable strategy and deserializers.
This class acts as a base for more specialized record processors that deal with Kafka consumer records in various ways within the Camel Kafka component. It encapsulates common processing tasks and header management to ensure a consistent and extensible approach to integrating Kafka with Camel routing and messaging.
Detailed Explanation
Package
package org.apache.camel.component.kafka.consumer.support;
This indicates the class is part of the Kafka consumer support utilities in the Apache Camel Kafka component.
Class: KafkaRecordProcessor
Description
An abstract helper class providing utility methods to convert Kafka `ConsumerRecord` instances into Camel `Exchange` messages by setting appropriate headers, filtering headers, and managing message bodies.
Logger
private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
Used for debug-level logging, mainly when setting up exchanges.
Methods
1. setupExchangeMessage
protected void setupExchangeMessage(Message message, ConsumerRecord<Object, Object> consumerRecord)
Purpose
Populate a Camel `Message` with metadata from a Kafka `ConsumerRecord`, including partition, topic, offset, headers, timestamp, and the record's key and value.
Parameters
message- The CamelMessageinstance to be populated.consumerRecord- The KafkaConsumerRecordfrom which to extract data.
Behavior
Sets Kafka-specific headers like partition, topic, offset, headers collection, and timestamps.
Sets the Camel
Exchange.MESSAGE_TIMESTAMPheader.If the Kafka record key is non-null, it adds it as a header.
Sets the message body to the Kafka record value.
Logs a debug message about the partition and offset.
Usage Example
ConsumerRecord<Object, Object> record = ...;
Exchange exchange = ...;
Message message = exchange.getIn();
KafkaRecordProcessor processor = new ConcreteKafkaRecordProcessor();
processor.setupExchangeMessage(message, record);
// The exchange message now has Kafka metadata and body set.
2. shouldBeFiltered
protected boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy)
Purpose
Determines whether a given Kafka header should be filtered out (excluded) from the Camel message headers.
Parameters
header- The KafkaHeaderto evaluate.exchange- The CamelExchangecontext.headerFilterStrategy- Strategy for filtering headers.
Returns
trueif the header should be propagated (i.e., not filtered).falseif the header should be excluded.
Details
Internally calls `headerFilterStrategy.applyFilterToExternalHeaders` and returns the negation. Essentially, it inverts the filter's boolean to indicate whether to keep the header.
Usage Example
boolean keep = processor.shouldBeFiltered(kafkaHeader, exchange, headerFilterStrategy);
if (keep) {
// propagate this header
}
3. propagateHeaders
protected void propagateHeaders(
KafkaConfiguration configuration,
ConsumerRecord<Object, Object> consumerRecord,
Exchange exchange)
Purpose
Transfers Kafka record headers into Camel exchange message headers after applying filtering and deserialization.
Parameters
configuration- TheKafkaConfigurationproviding header filter and deserializer.consumerRecord- The KafkaConsumerRecordcontaining headers.exchange- The CamelExchangewhose "in" message headers will be populated.
Behavior
Retrieves the
HeaderFilterStrategyandKafkaHeaderDeserializerfrom the provided configuration.Iterates over all Kafka headers in the record.
Filters headers using
shouldBeFiltered.Deserializes header values and sets them into the Camel message headers.
Usage Example
KafkaConfiguration config = ...;
ConsumerRecord<Object, Object> record = ...;
Exchange exchange = ...;
processor.propagateHeaders(config, record, exchange);
// exchange.getIn() now contains deserialized and filtered Kafka headers
Important Implementation Details
Uses Apache Camel's
HeaderFilterStrategyto determine which Kafka headers should be propagated.Relies on
KafkaHeaderDeserializerto convert Kafka header byte arrays into Java objects.Uses Java Stream API (
StreamSupport.stream) to efficiently process Kafka headers iterable.The class is abstract, suggesting it is intended to be extended by concrete processors that implement additional behavior.
Interaction With Other Parts of the System
KafkaConfiguration: Supplies configuration options, especially header filtering and deserialization strategies.
Camel Exchange and Message: This class populates these Camel messaging abstractions to integrate Kafka records into Camel routes.
Kafka ConsumerRecord: The source data structure from the Kafka client library representing a consumed record.
HeaderFilterStrategy: An SPI in Camel to customize header propagation policies.
KafkaHeaderDeserializer: Responsible for converting raw Kafka header bytes into usable Java types.
Logging: Uses SLF4J for logging debug information about processing actions.
This class acts as a bridge between Kafka's native consumer data and Camel's messaging model, ensuring that Kafka metadata and headers are correctly mapped to Camel exchanges for further routing and processing.
Mermaid Class Diagram
classDiagram
class KafkaRecordProcessor {
-static final Logger LOG
+void setupExchangeMessage(Message message, ConsumerRecord consumerRecord)
+boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy)
+void propagateHeaders(KafkaConfiguration configuration, ConsumerRecord consumerRecord, Exchange exchange)
}
Summary
`KafkaRecordProcessor.java` is a foundational abstraction within Apache Camel's Kafka component that standardizes how Kafka consumer records are translated into Camel messages and exchanges. It handles metadata extraction, header filtering, and header deserialization, enabling seamless integration of Kafka messaging semantics into Camel's routing and processing framework. The class is designed for reuse and extension by specialized Kafka record processors in the component.