StopErrorStrategy.java


Overview

`StopErrorStrategy.java` defines a **polling exception strategy** for Kafka consumers in the Apache Camel Kafka component. Its purpose is to **halt the Kafka consumer's polling loop as soon as a polling exception occurs**. This conservative strategy keeps the consumer from retrying a poll that keeps failing, which could otherwise lead to resource exhaustion, inconsistent state, or message loss.

As part of the error handling framework for Kafka consumers, this strategy implements the `PollExceptionStrategy` interface and is invoked whenever a polling exception occurs during the Kafka polling cycle. Upon handling an error, it signals the consumer fetch task to stop further polling and marks the consumer as disconnected.


Class: StopErrorStrategy

Description

`StopErrorStrategy` encapsulates the logic to stop a Kafka consumer upon polling exceptions. When triggered, it disables retries and sets the consumer's connected status to `false`, effectively terminating the consumer’s fetch loop.

This strategy is suitable for critical error scenarios where continuing consumption might be harmful or when manual intervention is desired before resuming consumption.
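The state change described above can be modelled in a few lines. The following is a minimal sketch rather than the Camel source: `PollStrategySketch`, `FetcherSketch`, and `StopStrategySketch` are hypothetical stand-ins for `PollExceptionStrategy`, `KafkaFetchRecords`, and `StopErrorStrategy`, declared locally so the example compiles without Camel on the classpath. The body of `reset()` is an assumption based on the `retry` flag's initial value.

```java
// Stand-in for Camel's PollExceptionStrategy (hypothetical name, same three methods).
interface PollStrategySketch {
    void reset();
    boolean canContinue();
    void handle(long partitionLastOffset, Exception exception);
}

// Hypothetical stand-in for KafkaFetchRecords; only the connected flag is modelled.
class FetcherSketch {
    private boolean connected = true;

    void setConnected(boolean connected) { this.connected = connected; }
    boolean isConnected() { return connected; }
}

// Sketch of the stop-on-error behaviour: one flag plus a reference to the fetcher.
class StopStrategySketch implements PollStrategySketch {
    private final FetcherSketch recordFetcher;
    private boolean retry = true; // polling is allowed until the first handled exception

    StopStrategySketch(FetcherSketch recordFetcher) {
        this.recordFetcher = recordFetcher;
    }

    @Override
    public void reset() { retry = true; } // assumption: restores the initial value

    @Override
    public boolean canContinue() { return retry; }

    @Override
    public void handle(long partitionLastOffset, Exception exception) {
        retry = false;                     // disable further polling
        recordFetcher.setConnected(false); // mark the consumer as disconnected
    }
}

class StopStrategyDemo {
    public static void main(String[] args) {
        FetcherSketch fetcher = new FetcherSketch();
        StopStrategySketch strategy = new StopStrategySketch(fetcher);
        System.out.println(strategy.canContinue() + " " + fetcher.isConnected()); // prints "true true"
        strategy.handle(42L, new IllegalStateException("simulated poll failure"));
        System.out.println(strategy.canContinue() + " " + fetcher.isConnected()); // prints "false false"
    }
}
```

A single call to `handle` flips both flags, so there is no retry budget or back-off to configure.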

Package

org.apache.camel.component.kafka.consumer.errorhandler

Implements

PollExceptionStrategy

Fields

| Field | Type | Description |
| --- | --- | --- |
| `LOG` | `Logger` | Logger instance for logging warnings |
| `recordFetcher` | `KafkaFetchRecords` | Reference to the Kafka consumer fetch task, used to control consumer state |
| `retry` | `boolean` | Flag indicating if polling should continue; initially `true` |


Constructor

public StopErrorStrategy(KafkaFetchRecords recordFetcher)

Creates a strategy bound to the fetch task whose state it controls. Example:

KafkaFetchRecords fetcher = ...; // initialized elsewhere
StopErrorStrategy stopStrategy = new StopErrorStrategy(fetcher);

Methods

void reset()

stopErrorStrategy.reset();

Calling `reset()` allows a consumer that this strategy previously stopped to continue polling.
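The stop-then-reset sequence can be illustrated with the flag alone. This is a sketch under an assumption: it treats `reset()` as simply restoring `retry` to its initial value of `true`, which the field description suggests but this page does not show. `ResetLifecycleSketch` is a hypothetical class, not part of Camel.

```java
// Hypothetical model of the retry flag's lifecycle; not the Camel implementation.
class ResetLifecycleSketch {
    private boolean retry = true;

    boolean canContinue() { return retry; }

    // Mirrors the documented effect of handle(): polling is disabled.
    void handle(long partitionLastOffset, Exception exception) { retry = false; }

    // Assumption: reset() restores the flag to its initial value.
    void reset() { retry = true; }

    public static void main(String[] args) {
        ResetLifecycleSketch strategy = new ResetLifecycleSketch();
        strategy.handle(-1L, new RuntimeException("simulated poll failure"));
        System.out.println("after handle: " + strategy.canContinue()); // prints "after handle: false"
        strategy.reset();
        System.out.println("after reset: " + strategy.canContinue());  // prints "after reset: true"
    }
}
```

In this sketch `reset()` touches only the flag. Whether and how the fetch task itself is reconnected is outside what this page documents.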


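boolean canContinue()

Reports whether polling may continue, based on the `retry` flag: `true` initially, `false` once `handle` has been called.

if (!stopErrorStrategy.canContinue()) {
    // Stop polling and terminate consumer fetch loop
}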

void handle(long partitionLastOffset, Exception exception)

Typical call site:

try {
    // polling logic
} catch (Exception e) {
    stopErrorStrategy.handle(lastOffset, e);
}

Implementation (neither `partitionLastOffset` nor `exception` is used; any polling exception stops the consumer):

@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to stop based on polling exception strategy");
    retry = false;
    recordFetcher.setConnected(false);
}

Important Implementation Details


Interaction with Other Components


Usage Example

KafkaFetchRecords fetcher = ...; // initialized elsewhere
StopErrorStrategy stopStrategy = new StopErrorStrategy(fetcher);

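// Illustrative loop; `consumer` and `lastOffset` are assumed to be defined by the caller.
while (stopStrategy.canContinue()) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // process records
    } catch (Exception e) {
        // Sets retry = false and marks the fetcher as disconnected,
        // so canContinue() returns false and the loop exits.
        stopStrategy.handle(lastOffset, e);
    }
}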
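To see the loop terminate without a Kafka broker, the same control flow can be run against a simulated poller. This is a sketch under stated assumptions: `FetchLoopSketch`, its nested `StopOnError` class, and the `poll` helper are hypothetical and only mimic the behaviour described on this page. The poller throws on a chosen call so the stop path is exercised.

```java
import java.util.ArrayList;
import java.util.List;

class FetchLoopSketch {
    // Minimal stop-on-error strategy: one flag, cleared by the first handled exception.
    static class StopOnError {
        private boolean retry = true;

        boolean canContinue() { return retry; }

        void handle(long partitionLastOffset, Exception exception) { retry = false; }
    }

    // Hypothetical poller: returns one record per call and throws on call number failAt.
    static String poll(int call, int failAt) {
        if (call == failAt) {
            throw new IllegalStateException("simulated poll failure on call " + call);
        }
        return "record-" + call;
    }

    // Polls until the strategy refuses to continue or maxPolls is reached;
    // returns the records processed before the loop stopped.
    static List<String> run(int failAt, int maxPolls) {
        StopOnError strategy = new StopOnError();
        List<String> processed = new ArrayList<>();
        long lastOffset = -1L;
        int call = 0;
        while (strategy.canContinue() && call < maxPolls) {
            call++;
            try {
                processed.add(poll(call, failAt));
                lastOffset = call; // remember the last successfully processed position
            } catch (Exception e) {
                strategy.handle(lastOffset, e); // no retry: the loop condition now fails
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10)); // prints [record-1, record-2]
    }
}
```

The failing third poll is never retried: only the two records fetched before the exception are processed, and the loop exits on the next condition check.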

Visual Diagram

classDiagram
    class StopErrorStrategy {
        -KafkaFetchRecords recordFetcher
        -boolean retry
        +StopErrorStrategy(KafkaFetchRecords)
        +void reset()
        +boolean canContinue()
        +void handle(long partitionLastOffset, Exception exception)
    }

    StopErrorStrategy ..|> PollExceptionStrategy

    class PollExceptionStrategy {
        <<interface>>
        +void handle(long partitionLastOffset, Exception exception)
        +boolean canContinue()
        +void reset()
    }

Workflow Diagram: Polling Exception Handling with StopErrorStrategy

flowchart TD
    A[Start Kafka Consumer Fetch Loop] --> B[Poll Kafka for Records]
    B --> C{Polling Exception Occurs?}
    C -- No --> D[Process Records]
    D --> B
    C -- Yes --> E["Invoke StopErrorStrategy.handle()"]
    E --> F[Log Warning and Set retry = false]
    F --> G[Set recordFetcher.connected = false]
    G --> H[Stop Polling Loop]

Summary

`StopErrorStrategy` is a small but important component of Kafka consumer error handling in Apache Camel. It acts as a fail-safe by stopping the consumer as soon as an error occurs during polling, which avoids repeated failures and an unstable consumer state. That makes it a good fit for scenarios that require strict failure management and manual recovery.

By implementing the `PollExceptionStrategy` interface, it plugs into the Kafka consumer fetch loop and gives users a configurable way to enforce immediate consumer shutdown on polling exceptions.

