Fetch Task Management

Purpose

Fetch Task Management covers how the Kafka Consumer submodule consumes messages from Kafka topics efficiently and reliably. Because Kafka consumers are not thread-safe and must be polled continuously, each consumer runs on a dedicated thread, called a fetch task, that handles message polling, error recovery, lifecycle management, and rebalance events. Fetch tasks honor pause/resume requests and apply the configured error-handling strategy so that consumption stays robust.

Functionality

The core responsibility of Fetch Task Management is encapsulated in the [KafkaFetchRecords](/projects/289/68519) class, which implements a runnable task. Its key workflows are:

  1. Consumer Creation Loop: Tries to create a Kafka consumer instance with retries and backoff if failures occur.

  2. Subscription Loop: Subscribes to topics or patterns with retry logic.

  3. Polling Loop: Locks processing to avoid concurrency issues, polls Kafka, processes records, and reacts to pause/resume requests.

  4. Error Handling: On exceptions, invokes a configured PollExceptionStrategy to determine recovery or termination.

  5. Pause/Resume Transitions: On a pause request, calls consumer.pause() on the assigned partitions; on a resume request, seeks each assigned partition to its last committed offset and then calls consumer.resume().

  6. Shutdown Procedure: Attempts to finish in-flight processing and commit offsets, then unsubscribes and closes the consumer.
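The retry-with-backoff behavior described for the creation and subscription loops can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code in KafkaFetchRecords: the class RetryWithBackoffSketch, the method callWithRetry, and the parameters maxAttempts and backoff are hypothetical names, and a string stands in for the real Kafka consumer so that the example runs without a broker.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Camel): retries an action with a fixed backoff. */
final class RetryWithBackoffSketch {

    /**
     * Calls the action until it succeeds or maxAttempts is exhausted,
     * sleeping for the backoff duration between failed attempts.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, Duration backoff) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                throw e; // never retry an interrupt; let the thread stop
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff.toMillis()); // back off before the next attempt
                }
            }
        }
        throw last; // every attempt failed; surface the last error
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for consumer creation: fails twice, then succeeds.
        String consumer = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "consumer-ready";
        }, 5, Duration.ofMillis(10));
        System.out.println(consumer + " after " + calls[0] + " attempts");
        // prints "consumer-ready after 3 attempts"
    }
}
```

The same helper shape would apply to the subscription step: only the action passed in changes, while the attempt limit and backoff stay configurable.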

Code Snippet Illustrating Polling Loop with Pause/Resume Handling

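// Keep polling while the task is running, the connection is up, and the strategy allows it
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
    // Blocks for up to pollDuration waiting for records
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
    ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
    updateTaskState(); // handles pause/resume state transitions
    // A break-on-error result ends the loop and flags the task for reconnection
    if (result != null && result.isBreakOnErrorHit()) {
        setReconnect(true);
        setConnected(false);
    }
}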
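To show how the canContinue() check in the loop condition interacts with error handling, here is a simplified sketch. It is an assumption-laden illustration rather than Camel's actual contract: the interface SketchPollExceptionStrategy, the class TolerateErrors, and the method runLoop are hypothetical, poll outcomes are simulated with a boolean array, and errors are handled inside the loop for brevity.

```java
/** Hypothetical sketch of a poll loop gated by an error-handling strategy. */
final class PollStrategySketch {

    /** Simplified stand-in for the strategy described above (assumed shape). */
    interface SketchPollExceptionStrategy {
        boolean canContinue();
        void handle(Exception error);
    }

    /** Allows the loop to continue until maxErrors failures have been seen. */
    static final class TolerateErrors implements SketchPollExceptionStrategy {
        private final int maxErrors;
        private int errors;

        TolerateErrors(int maxErrors) {
            this.maxErrors = maxErrors;
        }

        @Override
        public boolean canContinue() {
            return errors < maxErrors;
        }

        @Override
        public void handle(Exception error) {
            errors++;
        }
    }

    /** Runs simulated polls; returns how many succeeded before the loop ended. */
    static int runLoop(SketchPollExceptionStrategy strategy, boolean[] pollFails) {
        int successfulPolls = 0;
        for (int i = 0; i < pollFails.length && strategy.canContinue(); i++) {
            try {
                if (pollFails[i]) {
                    throw new IllegalStateException("simulated poll failure");
                }
                successfulPolls++;
            } catch (Exception e) {
                strategy.handle(e); // the strategy decides whether the loop may go on
            }
        }
        return successfulPolls;
    }

    public static void main(String[] args) {
        boolean[] outcomes = {false, true, false, true, false};
        // The second failure exhausts the budget, so the fifth poll never runs.
        System.out.println(runLoop(new TolerateErrors(2), outcomes)); // prints 2
    }
}
```

Keeping the decision behind an interface lets the stop, retry, or reconnect policy change without touching the loop itself.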

State Management for Pause and Resume

private void updateTaskState() {
    switch (state.get()) {
        case PAUSE_REQUESTED:
            // Stop fetching from every assigned partition; poll() keeps running
            consumer.pause(consumer.assignment());
            state.set(State.PAUSED);
            break;
        case RESUME_REQUESTED:
            // Rewind each partition to its last committed offset, if it has one
            consumer.committed(consumer.assignment()).forEach((tp, offset) -> {
                if (offset != null) consumer.seek(tp, offset.offset());
            });
            consumer.resume(consumer.assignment());
            state.set(State.RUNNING);
            break;
        default:
            break;
    }
}
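The method above only reacts to requests; the requesting side is not shown. A minimal sketch of how other threads might post those requests without touching the consumer follows. It assumes an AtomicReference-backed state with compare-and-set transitions; the class PauseResumeStateSketch and the methods requestPause, requestResume, and applyPendingTransition are hypothetical names, and the consumer calls are replaced by comments.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the request side of the pause/resume state machine. */
final class PauseResumeStateSketch {

    enum State { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Safe to call from any thread: records the request, never touches the consumer. */
    boolean requestPause() {
        return state.compareAndSet(State.RUNNING, State.PAUSE_REQUESTED);
    }

    /** Only succeeds once the fetch thread has actually paused. */
    boolean requestResume() {
        return state.compareAndSet(State.PAUSED, State.RESUME_REQUESTED);
    }

    /** Called only by the fetch thread, once per poll iteration. */
    void applyPendingTransition() {
        switch (state.get()) {
            case PAUSE_REQUESTED:
                // the real task would call consumer.pause(...) here
                state.compareAndSet(State.PAUSE_REQUESTED, State.PAUSED);
                break;
            case RESUME_REQUESTED:
                // the real task would seek and call consumer.resume(...) here
                state.compareAndSet(State.RESUME_REQUESTED, State.RUNNING);
                break;
            default:
                break;
        }
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        PauseResumeStateSketch task = new PauseResumeStateSketch();
        System.out.println(task.requestPause());  // true: RUNNING -> PAUSE_REQUESTED
        System.out.println(task.requestResume()); // false: not yet PAUSED
        task.applyPendingTransition();            // fetch thread applies the pause
        System.out.println(task.current());       // PAUSED
    }
}
```

Because only the fetch thread calls applyPendingTransition, all consumer operations stay on a single thread, which is what the non-thread-safe consumer requires.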

Integration

Fetch Task Management is a foundational mechanism within the Kafka Message Consumption parent topic. It provides the low-level threading and polling infrastructure that other subtopics build upon.

By encapsulating the threading, polling, and error resilience details, Fetch Task Management abstracts complexity from higher-level consumption logic, allowing seamless integration with Camel's routing and processing models.

Diagram

flowchart TD
    Start[Start Fetch Task Thread] --> CreateConsumer[Create Kafka Consumer Instance]
    CreateConsumer -->|Success| Subscribe[Subscribe to Topics/Pattern]
    Subscribe -->|Success| PollLoop[Polling Loop]
    PollLoop --> PollKafka[Poll Records from Kafka]
    PollKafka --> ProcessRecords[Process Fetched Records]
    ProcessRecords --> CheckPauseResume{Pause or Resume Requested?}
    CheckPauseResume -->|Pause Requested| PauseConsumer["Call consumer.pause()"]
    PauseConsumer --> PollLoop
    CheckPauseResume -->|Resume Requested| SeekOffsets["Seek Offsets and Call consumer.resume()"]
    SeekOffsets --> PollLoop
    CheckPauseResume -->|No| PollLoop
    PollLoop -->|Exception| HandleError[Invoke PollExceptionStrategy]
    HandleError -->|Can Continue| PollLoop
    HandleError -->|Cannot Continue| Shutdown[Close Consumer and Stop Thread]

This flowchart visualizes the lifecycle and core processing loop of fetch tasks, focusing on consumer creation, subscription, continuous polling with pause/resume support, error handling, and graceful shutdown.