Fetch Task Management
Purpose
Fetch Task Management addresses the challenge of consuming messages from Kafka topics efficiently and reliably within the Kafka Consumer submodule. Because a Kafka consumer is not thread-safe and must be polled continuously, this subtopic runs each consumer on a dedicated thread, called a fetch task, that handles message polling, error recovery, lifecycle management, and rebalance events. Each fetch task pulls messages in a controlled manner, honors pause/resume requests, and applies the configured error handling strategy to keep consumption robust.
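The thread-confinement idea can be sketched in plain Java. This is a minimal model, not Camel's code. FakeConsumer, FetchTaskSketch, and FetchTaskDemo are hypothetical names, and the fake consumer only simulates the Kafka client's rule that a consumer instance must be used from a single thread. Each task creates its own consumer inside run(), so the consumer never leaves its fetch-task thread. Other threads interact only through a thread-safe stop flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Kafka consumer: like the real client, it must only be
// used by the thread that owns it, so it fails fast when touched from another thread.
final class FakeConsumer {
    private final Thread owner = Thread.currentThread();

    void poll(long timeoutMs) throws InterruptedException {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("consumer used from a second thread");
        }
        Thread.sleep(timeoutMs); // simulate waiting up to the poll timeout
    }
}

// Hypothetical fetch task: owns one consumer and polls it until asked to stop.
final class FetchTaskSketch implements Runnable {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicInteger polls = new AtomicInteger();

    @Override
    public void run() {
        FakeConsumer consumer = new FakeConsumer(); // created on, and confined to, this thread
        try {
            while (!stopRequested.get()) {
                consumer.poll(1);
                polls.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and leave the loop
        }
    }

    void stop() { stopRequested.set(true); } // safe to call from any thread

    int polls() { return polls.get(); }
}

final class FetchTaskDemo {
    // Runs taskCount fetch tasks, each on its own pool thread, for roughly runMillis.
    static List<Integer> runTasks(int taskCount, long runMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        List<FetchTaskSketch> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            FetchTaskSketch task = new FetchTaskSketch();
            tasks.add(task);
            pool.execute(task);
        }
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        tasks.forEach(FetchTaskSketch::stop);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> counts = new ArrayList<>();
        for (FetchTaskSketch task : tasks) {
            counts.add(task.polls());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("polls per fetch task: " + runTasks(3, 100));
    }
}
```

The sketch sizes the pool to the number of tasks so every fetch task keeps its own thread for its whole lifetime. That is the property that makes it safe to use a non-thread-safe consumer.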
Functionality
The core responsibility of Fetch Task Management is encapsulated in the [KafkaFetchRecords](/projects/289/68519) class, which implements a runnable task that:
Creates and initializes Kafka consumer instances with configurable retry/backoff policies to handle transient failures when connecting or subscribing.
Continuously polls Kafka brokers using the Kafka client’s poll method with a configurable timeout, fetching batches of messages.
Processes fetched records using either batch or streaming processors, depending on configuration, to deliver messages into Camel routes.
Manages consumer pause and resume states safely, using internal state transitions (RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED) and Kafka’s pause/resume APIs to avoid concurrency issues.
Handles consumer lifecycle events including clean shutdown, unsubscribe, and close operations to release resources gracefully.
Integrates an error handling strategy that governs behavior on exceptions during polling, including logging, retries, reconnection attempts, or termination.
Supports rebalance listeners that react to partition assignment and revocation, coordinating with commit managers and resume strategies to seek to the correct offsets.
Collects metrics for monitoring via an optional Dev Console integration.
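The choice between batch and streaming processing in the list above can be pictured as two implementations of one interface. The sketch is illustrative only. PolledRecordsProcessor, StreamingProcessor, BatchProcessor, and ProcessorDemo are hypothetical names, records are plain strings, and the route is modeled as a java.util.function.Consumer. In this sketch, streaming delivers one message per record and batching delivers the whole poll result as a single message. Camel's real processors build their exchanges differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical contract: receives everything returned by a single poll.
interface PolledRecordsProcessor {
    void process(List<String> polledRecords);
}

// Streaming mode: one delivery into the route per record.
final class StreamingProcessor implements PolledRecordsProcessor {
    private final Consumer<Object> route;

    StreamingProcessor(Consumer<Object> route) { this.route = route; }

    @Override
    public void process(List<String> polledRecords) {
        for (String record : polledRecords) {
            route.accept(record);
        }
    }
}

// Batch mode: one delivery per non-empty poll, carrying all of its records.
final class BatchProcessor implements PolledRecordsProcessor {
    private final Consumer<Object> route;

    BatchProcessor(Consumer<Object> route) { this.route = route; }

    @Override
    public void process(List<String> polledRecords) {
        if (!polledRecords.isEmpty()) {
            route.accept(new ArrayList<>(polledRecords)); // defensive copy of the batch
        }
    }
}

final class ProcessorDemo {
    // Returns how many messages reach the route for one poll result.
    static int deliveries(boolean batching, List<String> polledRecords) {
        List<Object> delivered = new ArrayList<>();
        PolledRecordsProcessor processor = batching
                ? new BatchProcessor(delivered::add)
                : new StreamingProcessor(delivered::add);
        processor.process(polledRecords);
        return delivered.size();
    }

    public static void main(String[] args) {
        List<String> poll = List.of("a", "b", "c");
        System.out.println("streaming: " + deliveries(false, poll)); // streaming: 3
        System.out.println("batching: " + deliveries(true, poll));   // batching: 1
    }
}
```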
Key workflows include:
Consumer Creation Loop: Tries to create a Kafka consumer instance with retries and backoff if failures occur.
Subscription Loop: Subscribes to topics or patterns with retry logic.
Polling Loop: Locks processing to avoid concurrency issues, polls Kafka, processes records, and reacts to pause/resume requests.
Error Handling: On exceptions, invokes the configured PollExceptionStrategy to decide between recovery and termination.
Pause/Resume Transitions: On a pause request, calls consumer.pause(); on a resume request, seeks to the committed offsets and calls consumer.resume().
Shutdown Procedure: Attempts to finish in-flight processing and commit offsets, then unsubscribes and closes the consumer.
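The consumer creation and subscription loops described above both follow a retry-with-backoff pattern. The sketch below shows that pattern generically in plain Java, assuming capped exponential backoff. BackoffRetry and its parameters are hypothetical, and Camel's actual retry and backoff configuration is not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not a Camel API.
final class BackoffRetry {
    // Calls action until it succeeds or maxAttempts is used up. Between attempts it sleeps,
    // starting at initialDelayMs and doubling each time, never exceeding maxDelayMs.
    static <T> T retry(Supplier<T> action, int maxAttempts, long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // no point sleeping after the final attempt
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated consumer creation: fails twice with a transient error, then succeeds.
        String consumer = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("broker not reachable");
            }
            return "consumer-" + calls.get();
        }, 5, 10, 100);
        System.out.println(consumer + " created after " + calls.get() + " attempts");
        // prints "consumer-3 created after 3 attempts"
    }
}
```

Capping the delay keeps a long outage from producing arbitrarily long waits between attempts. Wrapping the final failure as the cause preserves the original error for logging.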
Code Snippet Illustrating Polling Loop with Pause/Resume Handling
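// Keep polling while the task runs, stays connected, and the exception strategy allows it
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
    // Waits up to pollDuration for the next batch of records
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
    // Delivers the batch through the batch or streaming processor chosen by configuration
    ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
    updateTaskState(); // handles pause/resume state transitions
    // A break-on-error result marks the task for reconnection and ends this loop
    if (result != null && result.isBreakOnErrorHit()) {
        setReconnect(true);
        setConnected(false);
    }
}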
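The loop condition above consults pollExceptionStrategy.canContinue() on every iteration. The following is a hypothetical strategy, not one of Camel's built-in PollExceptionStrategy implementations. PollErrorStrategy, BoundedRetryStrategy, and PollLoopSketch are invented names. The policy is only one possible choice: it tolerates a fixed number of consecutive poll failures, resets the count after a successful poll, and then stops.

```java
// Hypothetical contract, modeled on the canContinue() check in the polling loop.
interface PollErrorStrategy {
    boolean canContinue();

    void handle(Exception e);

    void reset(); // called after a successful poll
}

// Hypothetical policy: allow up to maxConsecutiveFailures failures in a row, then stop.
final class BoundedRetryStrategy implements PollErrorStrategy {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures;
    private boolean stopped;

    BoundedRetryStrategy(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    @Override
    public boolean canContinue() { return !stopped; }

    @Override
    public void handle(Exception e) {
        consecutiveFailures++;
        if (consecutiveFailures > maxConsecutiveFailures) {
            stopped = true; // the polling loop exits on its next condition check
        }
    }

    @Override
    public void reset() { consecutiveFailures = 0; }
}

final class PollLoopSketch {
    // Simulates a polling loop; pollFails[i] == true means poll i throws.
    // Returns the number of polls that succeeded before the loop ended.
    static int successfulPolls(boolean[] pollFails, PollErrorStrategy strategy) {
        int succeeded = 0;
        for (int i = 0; i < pollFails.length && strategy.canContinue(); i++) {
            try {
                if (pollFails[i]) {
                    throw new RuntimeException("poll " + i + " failed");
                }
                succeeded++;
                strategy.reset();
            } catch (RuntimeException e) {
                strategy.handle(e);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        boolean[] pollFails = {false, true, false, true, true, true, false};
        BoundedRetryStrategy strategy = new BoundedRetryStrategy(2);
        int ok = successfulPolls(pollFails, strategy);
        System.out.println("successful polls: " + ok + ", can continue: " + strategy.canContinue());
        // prints "successful polls: 2, can continue: false"
    }
}
```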
State Management for Pause and Resume
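private void updateTaskState() {
    switch (state.get()) {
        case PAUSE_REQUESTED:
            // Stop fetching from all assigned partitions; later poll() calls return no records for them
            consumer.pause(consumer.assignment());
            state.set(State.PAUSED);
            break;
        case RESUME_REQUESTED:
            // Rewind each assigned partition to its last committed offset before resuming
            consumer.committed(this.consumer.assignment()).forEach((tp, offset) -> {
                // offset is null for partitions that have no committed offset yet
                if (offset != null) {
                    consumer.seek(tp, offset.offset());
                }
            });
            consumer.resume(consumer.assignment());
            state.set(State.RUNNING);
            break;
        default:
            // RUNNING and PAUSED need no action
            break;
    }
}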
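updateTaskState() above only applies a request that another thread has already recorded. One safe way to model the request side is compareAndSet on an AtomicReference, so a request is accepted only from the state in which it is valid. This is a hypothetical sketch. TaskState, PauseResumeSketch, and its methods are illustrative names, and the comments mark where the Kafka consumer calls from the method above would go.

```java
import java.util.concurrent.atomic.AtomicReference;

// Mirrors the four states named in this section.
enum TaskState { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

// Hypothetical sketch of the request side of pause/resume; not Camel's code.
final class PauseResumeSketch {
    private final AtomicReference<TaskState> state = new AtomicReference<>(TaskState.RUNNING);
    private int pauseCalls;  // touched only by the fetch-task thread
    private int resumeCalls; // touched only by the fetch-task thread

    // Safe to call from any thread: only records the request, never touches the consumer.
    boolean requestPause() {
        return state.compareAndSet(TaskState.RUNNING, TaskState.PAUSE_REQUESTED);
    }

    // Accepted only once the pause has actually taken effect.
    boolean requestResume() {
        return state.compareAndSet(TaskState.PAUSED, TaskState.RESUME_REQUESTED);
    }

    // Called on the fetch-task thread after each poll, in the role of updateTaskState().
    void applyPendingRequest() {
        if (state.compareAndSet(TaskState.PAUSE_REQUESTED, TaskState.PAUSED)) {
            pauseCalls++; // real task: consumer.pause(consumer.assignment())
        } else if (state.compareAndSet(TaskState.RESUME_REQUESTED, TaskState.RUNNING)) {
            resumeCalls++; // real task: seek to committed offsets, then consumer.resume(...)
        }
    }

    TaskState current() { return state.get(); }

    int pauseCalls() { return pauseCalls; }

    int resumeCalls() { return resumeCalls; }

    public static void main(String[] args) {
        PauseResumeSketch task = new PauseResumeSketch();
        System.out.println(task.requestPause());  // true
        System.out.println(task.requestResume()); // false: the pause has not been applied yet
        task.applyPendingRequest();
        System.out.println(task.current());       // PAUSED
        System.out.println(task.requestResume()); // true
        task.applyPendingRequest();
        System.out.println(task.current());       // RUNNING
    }
}
```

This sketch rejects a resume while a pause is still pending. A real implementation might instead cancel the pending pause.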
Integration
Fetch Task Management is a foundational mechanism within the Kafka Message Consumption parent topic. It provides the low-level threading and polling infrastructure that other subtopics build upon:
Pause and Resume: Fetch tasks respond to pause/resume commands by transitioning their internal state and invoking Kafka consumer pause/resume APIs, enabling fine-grained control over consumption flow.
Consumer Health Monitoring: Metrics collected during polling by fetch tasks feed into health checks and monitoring systems, exposing consumer readiness and error states.
Commit Management: Fetch tasks integrate with CommitManager instances to commit offsets after processing batches or individual records, ensuring reliable delivery semantics.
Error Handling: The polling loop uses PollExceptionStrategy implementations to decide how to react to exceptions, which keeps the consumer resilient.
Resume Strategies and Rebalance Listeners: When subscribing, fetch tasks install rebalance listeners that coordinate offset seeking and resume logic as partitions are assigned or revoked.
Manual Commit Support: When manual commits are enabled, fetch tasks attach commit handles to exchanges for explicit offset committing downstream.
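Offset commits in Kafka follow a convention worth making concrete. The committed offset for a partition is the offset of the next record to consume, that is, the last processed offset plus one. The sketch below is a hypothetical helper, not Camel's CommitManager API. It is called OffsetTracker and models partitions as strings such as "orders-0". It also shows what a rebalance listener might commit when partitions are revoked.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: remembers the highest processed offset per partition and
// turns it into the offsets Kafka expects to be committed (last processed + 1).
final class OffsetTracker {
    private final Map<String, Long> lastProcessed = new HashMap<>();

    void recordProcessed(String partition, long offset) {
        lastProcessed.merge(partition, offset, Long::max); // keep the highest offset seen
    }

    // Offsets to commit for every partition currently tracked.
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> commit = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> commit.put(partition, offset + 1));
        return commit;
    }

    // What a rebalance listener could commit on revocation, before the partitions move away.
    Map<String, Long> onPartitionsRevoked(Collection<String> revoked) {
        Map<String, Long> commit = new HashMap<>();
        for (String partition : revoked) {
            Long last = lastProcessed.remove(partition); // stop tracking the revoked partition
            if (last != null) {
                commit.put(partition, last + 1);
            }
        }
        return commit;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.recordProcessed("orders-0", 41);
        tracker.recordProcessed("orders-0", 40); // an older acknowledgement does not move the offset back
        tracker.recordProcessed("orders-1", 7);
        System.out.println(tracker.offsetsToCommit().get("orders-0"));                         // 42
        System.out.println(tracker.onPartitionsRevoked(List.of("orders-1")).get("orders-1")); // 8
        System.out.println(tracker.offsetsToCommit().containsKey("orders-1"));                // false
    }
}
```

Committing the highest processed offset assumes every earlier record in that partition has also been processed. That holds when each partition is processed sequentially.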
By encapsulating the threading, polling, and error resilience details, Fetch Task Management hides this complexity from higher-level consumption logic and lets that logic integrate cleanly with Camel's routing and processing models.
Diagram
flowchart TD
Start[Start Fetch Task Thread] --> CreateConsumer[Create Kafka Consumer Instance]
CreateConsumer -->|Success| Subscribe[Subscribe to Topics/Pattern]
Subscribe -->|Success| PollLoop[Polling Loop]
PollLoop --> PollKafka[Poll Records from Kafka]
PollKafka --> ProcessRecords[Process Fetched Records]
ProcessRecords --> CheckPauseResume{Pause or Resume Requested?}
CheckPauseResume -->|Pause Requested| PauseConsumer["Call consumer.pause()"]
PauseConsumer --> PollLoop
CheckPauseResume -->|Resume Requested| SeekOffsets["Seek Offsets and consumer.resume()"]
SeekOffsets --> PollLoop
CheckPauseResume -->|No| PollLoop
PollLoop -->|Exception| HandleError[Invoke PollExceptionStrategy]
HandleError -->|Can Continue| PollLoop
HandleError -->|Cannot Continue| Shutdown[Close Consumer and Stop Thread]
This flowchart visualizes the lifecycle and core processing loop of fetch tasks, focusing on consumer creation, subscription, continuous polling with pause/resume support, error handling, and graceful shutdown.