KafkaFetchRecords.java


Overview

`KafkaFetchRecords` is a core component of the Apache Camel Kafka connector responsible for consuming messages from Kafka topics. It implements a dedicated, long-running thread task that:

This class is designed to handle the intricacies of Kafka consumer lifecycle management, error handling, and concurrency, abstracting these complexities from higher-level Camel Kafka consumer components. It runs independently for each consumer thread, enabling scalable parallel consumption.


Class: KafkaFetchRecords

Summary

Implements `Runnable` to represent a Kafka consumer fetch task running on its own thread. It manages the lifecycle of the Kafka consumer instance, polling loop, error handling, pause/resume state transitions, and metrics collection.

Key Fields

Enum: State

Defines the consumer fetcher state machine for pause/resume:


Constructor

KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                  BridgeExceptionHandlerToErrorHandler bridge,
                  String topicName, Pattern topicPattern, String id,
                  Properties kafkaProps,
                  KafkaConsumerListener consumerListener)

Public Methods

void run()

Entry point for the fetch task thread. It:

**Usage Example:**

KafkaFetchRecords fetcher = new KafkaFetchRecords(...);
Thread thread = new Thread(fetcher);
thread.start();

void pause()

Requests a pause of the consumer after current processing.

void resume()

Requests a resume of the consumer after current processing.

void stop()

Initiates a safe stop of the consumer:

boolean isConnected()

Returns whether the fetch task’s consumer is currently connected.

boolean isPaused()

Returns whether the consumer is currently paused (based on internal state).

String getState()

Returns the current state name (`RUNNING`, `PAUSED`, etc.).

DevConsoleMetricsCollector getMetricsCollector()

Returns the metrics collector instance used for Dev Console integration.

TaskHealthState healthState()

Returns a snapshot of the consumer’s health state, including:


Protected and Private Methods

boolean createConsumerTask()

Attempts to create the underlying Kafka consumer instance and initialize the commit manager.

boolean initializeConsumerTask()

Attempts to subscribe the consumer to topics and initialize the consumer state.

void createConsumer()

Creates the Kafka consumer instance using the Kafka client factory.

void initializeConsumer()

Subscribes the consumer and resets the poll exception strategy.

void subscribe()

Performs actual subscription to topics or patterns with a `ConsumerRebalanceListener`.

void startPolling()

Main polling loop that:

KafkaRecordProcessorFacade createRecordProcessor()

Creates either a batch or streaming processing facade based on configuration.

void updateTaskState()

Handles state transitions for pause and resume:

void safeConsumerClose()

Closes the Kafka consumer safely, logging but ignoring exceptions.

void safeUnsubscribe()

Unsubscribes the consumer safely, handling possible closed state or exceptions.

void safeStop()

Waits for processing lock with a timeout and calls `consumer.wakeup()` to break poll loop.


Inner Class: PausePreservingRebalanceListener

Wraps a delegate `ConsumerRebalanceListener` to preserve pause state during partition assignment.


Important Implementation Details


Interaction with Other System Components


Usage Example

This class is mostly used internally by `KafkaConsumer`, but a simplified usage scenario:

Properties kafkaProps = new Properties();
// populate kafkaProps with consumer configs

KafkaConsumerListener listener = new MyKafkaConsumerListener();

KafkaFetchRecords fetchRecords = new KafkaFetchRecords(kafkaConsumer, bridge, "my-topic", null, "1", kafkaProps, listener);

// run in dedicated thread
new Thread(fetchRecords).start();

// pause consumption
fetchRecords.pause();

// resume consumption
fetchRecords.resume();

// stop fetching and shutdown
fetchRecords.stop();

Visual Diagram: Class Structure

classDiagram
    class KafkaFetchRecords {
        -KafkaConsumer kafkaConsumer
        -Consumer consumer
        -String clientId
        -String topicName
        -Pattern topicPattern
        -String threadId
        -Properties kafkaProps
        -PollExceptionStrategy pollExceptionStrategy
        -BridgeExceptionHandlerToErrorHandler bridge
        -ReentrantLock lock
        -CommitManager commitManager
        -volatile Exception lastError
        -KafkaConsumerListener consumerListener
        -volatile boolean terminated
        -volatile boolean reconnect
        -volatile boolean connected
        -AtomicReference~State~ state
        -DevConsoleMetricsCollector metricsCollector
        +void run()
        +void pause()
        +void resume()
        +void stop()
        +boolean isConnected()
        +boolean isPaused()
        +String getState()
        +TaskHealthState healthState()
        -void createConsumer()
        -void initializeConsumer()
        -void subscribe()
        -void startPolling()
        -void updateTaskState()
        -void safeConsumerClose()
        -void safeUnsubscribe()
        -void safeStop()
    }

    class PausePreservingRebalanceListener {
        -ConsumerRebalanceListener delegate
        +void onPartitionsRevoked(Collection~TopicPartition~ partitions)
        +void onPartitionsAssigned(Collection~TopicPartition~ partitions)
        +void onPartitionsLost(Collection~TopicPartition~ partitions)
    }

    KafkaFetchRecords --> PausePreservingRebalanceListener : uses

Summary

`KafkaFetchRecords` encapsulates the vital Kafka consumer thread functionality for Apache Camel’s Kafka component. By managing consumer lifecycle, polling, pause/resume state, error handling, and metrics, it enables robust, scalable, and fault-tolerant Kafka message consumption. Its design addresses Kafka client thread-safety constraints and integrates tightly with Camel’s error handling and health check frameworks, making it a foundational element of Kafka message consumption in Camel routes.