KafkaFetchRecords.java
Overview
`KafkaFetchRecords` is a core component of the Apache Camel Kafka connector responsible for consuming messages from Kafka topics. It implements a dedicated, long-running thread task that:
Creates and manages a Kafka consumer instance.
Subscribes to topics or topic patterns, with support for rebalance listeners.
Continuously polls Kafka brokers for records.
Processes fetched records using either batch or streaming strategies.
Handles pause and resume requests in a thread-safe manner.
Manages offset commits via a commit manager.
Handles exceptions during polling with configurable error strategies.
Integrates with Camel’s Dev Console for metrics collection.
Provides health monitoring capabilities for readiness and error states.
This class is designed to handle the intricacies of Kafka consumer lifecycle management, error handling, and concurrency, abstracting these complexities from higher-level Camel Kafka consumer components. It runs independently for each consumer thread, enabling scalable parallel consumption.
Class: KafkaFetchRecords
Summary
Implements `Runnable` to represent a Kafka consumer fetch task running on its own thread. It manages the lifecycle of the Kafka consumer instance, polling loop, error handling, pause/resume state transitions, and metrics collection.
Key Fields
`kafkaConsumer`: Reference to the owning `KafkaConsumer` instance.
`consumer`: The underlying Kafka client consumer instance (`org.apache.kafka.clients.consumer.Consumer`).
`topicName` / `topicPattern`: Topic or pattern subscribed to.
`threadId`: Unique ID for logging and metrics (combines topic and thread number).
`kafkaProps`: Kafka consumer configuration properties.
`pollExceptionStrategy`: Strategy instance to handle exceptions during polling.
`lock`: `ReentrantLock` to serialize Kafka consumer access as it is not thread-safe.
`commitManager`: Manages offset commits.
`state`: An atomic enum representing consumer pause/resume state (`RUNNING`, `PAUSE_REQUESTED`, `PAUSED`, `RESUME_REQUESTED`).
`terminated`: Flag indicating if the fetch task has terminated.
`connected`: Flag indicating if the consumer is currently connected.
`reconnect`: Flag to indicate if a reconnect is needed.
`metricsCollector`: Collects metrics for Camel Dev Console integration.
`consumerListener`: Optional listener for consumer events and error handling.
Enum: State
Defines the consumer fetcher state machine for pause/resume:
`RUNNING`: Normal polling state.
`PAUSE_REQUESTED`: Pause requested, but not yet paused.
`PAUSED`: Consumer is paused.
`RESUME_REQUESTED`: Resume requested, but not yet resumed.
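The request-then-apply pattern behind these four states can be sketched in plain Java. This is a minimal illustration under assumptions, not the class's actual code: `PauseStateSketch` and `completePendingRequest()` are hypothetical names, and the real fetcher also calls into the Kafka consumer when it applies a request.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the fetcher's pause/resume state holder.
class PauseStateSketch {
    enum State { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    // Callable from any thread: only records the request.
    void pause() {
        state.set(State.PAUSE_REQUESTED);
    }

    // Callable from any thread: only records the request.
    void resume() {
        state.set(State.RESUME_REQUESTED);
    }

    // Called by the polling thread between polls. compareAndSet makes sure
    // a request that arrived in the meantime is not overwritten.
    void completePendingRequest() {
        if (!state.compareAndSet(State.PAUSE_REQUESTED, State.PAUSED)) {
            state.compareAndSet(State.RESUME_REQUESTED, State.RUNNING);
        }
    }

    boolean isPaused() {
        return state.get() == State.PAUSED;
    }

    String getState() {
        return state.get().name();
    }
}
```

Keeping the request and its completion as separate states lets callers return immediately while the polling thread, the only thread that touches the consumer, decides when the change takes effect.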
Constructor
KafkaFetchRecords(KafkaConsumer kafkaConsumer,
BridgeExceptionHandlerToErrorHandler bridge,
String topicName, Pattern topicPattern, String id,
Properties kafkaProps,
KafkaConsumerListener consumerListener)
Parameters:
`kafkaConsumer`: The owning `KafkaConsumer` instance.
`bridge`: Exception bridge to Camel error handler.
`topicName`: Kafka topic name to subscribe to.
`topicPattern`: Kafka topic pattern to subscribe to.
`id`: Thread ID suffix.
`kafkaProps`: Kafka consumer properties.
`consumerListener`: Listener for consumer events and errors.
Behavior: Initializes fields, determines if Dev Console metrics should be collected, and sets up metrics collector accordingly.
Public Methods
void run()
Entry point for the fetch task thread. It:
Checks if the consumer is allowed to run.
Attempts to create and subscribe the Kafka consumer with retry and backoff.
Enters the polling loop to fetch and process records.
Handles pause/resume state transitions.
Applies error handling strategy on exceptions.
Terminates and closes the consumer on fatal errors or shutdown.
**Usage Example:**
KafkaFetchRecords fetcher = new KafkaFetchRecords(...);
Thread thread = new Thread(fetcher);
thread.start();
void pause()
Requests a pause of the consumer after current processing.
Sets state to `PAUSE_REQUESTED`.
The polling loop will pause the Kafka consumer on next iteration.
void resume()
Requests a resume of the consumer after current processing.
Sets state to `RESUME_REQUESTED`.
The polling loop will resume the Kafka consumer on next iteration.
void stop()
Initiates a safe stop of the consumer:
Waits for ongoing processing to finish with a configurable timeout.
Calls `consumer.wakeup()` to break the poll loop.
boolean isConnected()
Returns whether the fetch task’s consumer is currently connected.
boolean isPaused()
Returns whether the consumer is currently paused (based on internal state).
String getState()
Returns the current state name (`RUNNING`, `PAUSED`, etc.).
DevConsoleMetricsCollector getMetricsCollector()
Returns the metrics collector instance used for Dev Console integration.
TaskHealthState healthState()
Returns a snapshot of the consumer’s health state, including:
Readiness (connection status and Kafka client state).
Termination status.
Recoverability (whether errors allow recovery).
Last error encountered.
Client ID.
Current backoff interval.
Kafka properties.
Protected and Private Methods
boolean createConsumerTask()
Attempts to create the underlying Kafka consumer instance and initialize the commit manager.
On failure, logs the error, sets the last error, and throws `TaskRunFailureException` to trigger retry.
boolean initializeConsumerTask()
Attempts to subscribe the consumer to topics and initialize the consumer state.
On failure, logs the error, sets the last error, and throws `TaskRunFailureException` to trigger retry.
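The retry behaviour of `createConsumerTask()` and `initializeConsumerTask()` can be pictured with a plain-Java loop. This is a rough sketch and deliberately does not use Camel's task API. `RetrySketch`, `runWithRetry`, and its parameters are hypothetical names, and a generic `RuntimeException` stands in for `TaskRunFailureException`.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating "retry with a fixed backoff".
class RetrySketch {
    // Runs the step until it returns true or the attempts are used up.
    // An exception thrown by the step counts as a failed attempt.
    static boolean runWithRetry(BooleanSupplier step, int maxAttempts, Duration backoff) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (step.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Assumption: the failure was already logged and recorded by the step.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt and give up retrying.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A caller would pass the creation or subscription step as the `BooleanSupplier` and treat a `false` result as "attempts exhausted".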
void createConsumer()
Creates the Kafka consumer instance using the Kafka client factory.
Sets client ID via reflection if not set.
Instantiates the poll exception strategy.
Applies Kerberos config if specified.
void initializeConsumer()
Subscribes the consumer and resets the poll exception strategy.
void subscribe()
Performs actual subscription to topics or patterns with a `ConsumerRebalanceListener`.
Uses a `PausePreservingRebalanceListener` wrapper to preserve pause state during partition assignment.
Resolves a custom or default `SubscribeAdapter` from Camel context.
void startPolling()
Main polling loop that:
Locks the consumer for thread safety.
Polls Kafka with configured timeout.
Processes records via batch or streaming processor facade.
Collects metrics.
Handles pause/resume transitions.
Applies error handling strategies on exceptions.
Commits offsets when disconnecting.
Unlocks on completion.
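The overall shape of such a loop (lock, poll, process, count, commit on exit, unlock) can be sketched without the Kafka client. Everything here is illustrative: `PollLoopSketch` and `RecordSource` are hypothetical stand-ins, records are plain strings, and a `running` flag replaces the wakeup-based exit.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

class PollLoopSketch {
    // Hypothetical stand-in for the Kafka client consumer.
    interface RecordSource {
        List<String> poll(Duration timeout);
        void commit();
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicLong processed = new AtomicLong();
    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    long processedCount() {
        return processed.get();
    }

    void startPolling(RecordSource source, Consumer<String> processor, Duration timeout) {
        lock.lock(); // only the polling thread may use the source
        try {
            while (running) {
                for (String record : source.poll(timeout)) {
                    processor.accept(record);
                    processed.incrementAndGet(); // simple metric
                }
            }
        } catch (RuntimeException e) {
            // Assumption: a real loop would hand this to an error strategy.
            running = false;
        } finally {
            source.commit(); // commit before disconnecting
            lock.unlock();
        }
    }
}
```

Putting the commit and unlock in `finally` means they run whether the loop ends normally or through an exception.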
KafkaRecordProcessorFacade createRecordProcessor()
Creates either a batch or streaming processing facade based on configuration.
void updateTaskState()
Handles state transitions for pause and resume:
On `PAUSE_REQUESTED`, calls `consumer.pause()`.
On `RESUME_REQUESTED`, seeks committed offsets and calls `consumer.resume()`.
void safeConsumerClose()
Closes the Kafka consumer safely, logging but ignoring exceptions.
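A "log but ignore" close can be illustrated with any `AutoCloseable`. The helper below is a generic sketch, not the method's actual body; `QuietCloseSketch` and `closeQuietly` are hypothetical names, and `System.err` stands in for a logger.

```java
// Hypothetical helper showing a close that never propagates exceptions.
class QuietCloseSketch {
    // Returns true if the resource closed cleanly (or was null).
    static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return true;
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // Report the failure, but do not let it interrupt shutdown.
            System.err.println("Error closing resource: " + e.getMessage());
            return false;
        }
    }
}
```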
void safeUnsubscribe()
Unsubscribes the consumer safely, handling possible closed state or exceptions.
void safeStop()
Waits for processing lock with a timeout and calls `consumer.wakeup()` to break poll loop.
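The wait-then-wakeup idea can be sketched with `ReentrantLock.tryLock` and a callback. This is an illustration under assumptions: `SafeStopSketch` is a hypothetical class, the `wakeup` `Runnable` stands in for `consumer.wakeup()`, and the timeout is passed in rather than read from configuration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class SafeStopSketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Exposed so the processing thread can hold the lock while it works.
    ReentrantLock processingLock() {
        return lock;
    }

    // Waits up to timeoutMillis for in-flight processing, then wakes the poller.
    // Returns true if processing finished within the timeout.
    boolean safeStop(long timeoutMillis, Runnable wakeup) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        } finally {
            wakeup.run(); // wake the poller whether or not the wait succeeded
            if (acquired) {
                lock.unlock();
            }
        }
        return acquired;
    }
}
```

The wakeup is issued even after a timeout, so a stuck poll is still interrupted. The return value only tells the caller whether the wait ended cleanly.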
Inner Class: PausePreservingRebalanceListener
Wraps a delegate `ConsumerRebalanceListener` to preserve pause state during partition assignment.
If partitions are assigned while paused, it sets state back to `PAUSE_REQUESTED` to re-pause consumer.
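The wrapper is needed because the Kafka client does not preserve pause state across a rebalance, so newly assigned partitions start unpaused. The decorator idea can be sketched as follows. This is a simplified illustration: `RebalanceSketch` and its nested `RebalanceListener` are hypothetical stand-ins for the Kafka interface, and partitions are plain strings.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

class RebalanceSketch {
    enum State { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

    // Hypothetical stand-in for Kafka's ConsumerRebalanceListener.
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    // Decorator: forwards every callback, then restores the pause intent.
    static final class PausePreservingListener implements RebalanceListener {
        private final RebalanceListener delegate;
        private final AtomicReference<State> state;

        PausePreservingListener(RebalanceListener delegate, AtomicReference<State> state) {
            this.delegate = delegate;
            this.state = state;
        }

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            delegate.onPartitionsRevoked(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            delegate.onPartitionsAssigned(partitions);
            // If we were paused, ask the polling loop to pause the new assignment.
            state.compareAndSet(State.PAUSED, State.PAUSE_REQUESTED);
        }
    }
}
```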
Important Implementation Details
Concurrency: Uses `ReentrantLock` to ensure only one thread accesses the Kafka consumer at a time, since the Kafka client is not thread-safe.
State Machine: The `State` enum and atomic reference ensure safe transitions for pause and resume requests without concurrency issues.
Error Handling: Uses `PollExceptionStrategy` to encapsulate all error handling and recovery logic during polling.
Retry & Backoff: Consumer creation and subscription use retry loops with configurable backoff intervals and max attempts, implemented with Camel’s `ForegroundTask` and `Budgets`.
Metrics & Health: Integrates with Camel’s Dev Console via `DevConsoleMetricsCollector` and exposes detailed health state for monitoring.
Reflection: Uses reflection to retrieve Kafka client ID and internal client network state for health checks.
Clean Shutdown: Implements a safe stop procedure that waits for processing to finish before waking up consumer and closing resources.
Interaction with Other System Components
KafkaConsumer: `KafkaFetchRecords` instances are created and managed by `KafkaConsumer`, which orchestrates overall consumer lifecycle and thread pool.
CommitManager: Used to manage offset commits, either automatically or manually controlled.
KafkaConsumerListener: Receives callbacks after message consumption and for error handling.
PollExceptionStrategy: Defines how exceptions during polling are handled (e.g., retry, reconnect, stop).
SubscribeAdapter: Abstracts subscription logic, allowing custom subscription mechanisms.
ResumeStrategy & RebalanceListeners: For fault-tolerant offset management during consumer rebalances.
Dev Console: Metrics collector integrates with Camel’s dev console for visualization and diagnostics.
CamelContext & Registry: Used to lookup custom subscription adapters and configure consumer.
Usage Example
This class is mostly used internally by `KafkaConsumer`, but a simplified usage scenario looks like this:
Properties kafkaProps = new Properties();
// populate kafkaProps with consumer configs
KafkaConsumerListener listener = new MyKafkaConsumerListener();
KafkaFetchRecords fetchRecords = new KafkaFetchRecords(kafkaConsumer, bridge, "my-topic", null, "1", kafkaProps, listener);
// run in dedicated thread
new Thread(fetchRecords).start();
// pause consumption
fetchRecords.pause();
// resume consumption
fetchRecords.resume();
// stop fetching and shutdown
fetchRecords.stop();
Visual Diagram: Class Structure
classDiagram
class KafkaFetchRecords {
-KafkaConsumer kafkaConsumer
-Consumer consumer
-String clientId
-String topicName
-Pattern topicPattern
-String threadId
-Properties kafkaProps
-PollExceptionStrategy pollExceptionStrategy
-BridgeExceptionHandlerToErrorHandler bridge
-ReentrantLock lock
-CommitManager commitManager
-volatile Exception lastError
-KafkaConsumerListener consumerListener
-volatile boolean terminated
-volatile boolean reconnect
-volatile boolean connected
-AtomicReference~State~ state
-DevConsoleMetricsCollector metricsCollector
+void run()
+void pause()
+void resume()
+void stop()
+boolean isConnected()
+boolean isPaused()
+String getState()
+TaskHealthState healthState()
-void createConsumer()
-void initializeConsumer()
-void subscribe()
-void startPolling()
-void updateTaskState()
-void safeConsumerClose()
-void safeUnsubscribe()
-void safeStop()
}
class PausePreservingRebalanceListener {
-ConsumerRebalanceListener delegate
+void onPartitionsRevoked(Collection~TopicPartition~ partitions)
+void onPartitionsAssigned(Collection~TopicPartition~ partitions)
+void onPartitionsLost(Collection~TopicPartition~ partitions)
}
KafkaFetchRecords --> PausePreservingRebalanceListener : uses
Summary
`KafkaFetchRecords` encapsulates the Kafka consumer thread for Apache Camel’s Kafka component. It manages the consumer lifecycle, polling, pause/resume state, error handling, and metrics, and it runs one instance per consumer thread so that consumption can scale in parallel. Its design serializes access to the Kafka client, which is not thread-safe, and integrates with Camel’s error handling and health check frameworks.