Kafka Consumer Error Handling

Overview

The Kafka Consumer Error Handling module defines a set of strategies that govern how the system responds to exceptions encountered during Kafka consumer polling operations. Polling exceptions can occur for various reasons such as transient network issues, poison messages, authentication failures, or authorization problems. This module provides configurable, pluggable strategies to decide whether to retry, skip, reconnect, stop, or delegate error handling when such exceptions happen.

This capability is crucial to maintaining robust and resilient Kafka consumer behavior within Apache Camel routes, ensuring that message consumption continues smoothly or fails gracefully depending on the scenario and configured strategy.


Core Concepts

Purpose and Problem Addressed

Kafka consumers poll Kafka brokers for messages in a continuous loop. During polling, exceptions may arise which, if unhandled or handled improperly, can cause consumer threads to halt, lose messages, or enter unstable states.

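The Kafka Consumer Error Handling module abstracts these exception-handling mechanisms into distinct **poll exception strategies**, each implementing a common interface. This abstraction keeps the polling loop independent of any particular error-handling policy and lets the policy be configured or swapped without changing the loop itself.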


How the Module Works

PollExceptionStrategy Interface

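All error handling strategies implement the `PollExceptionStrategy` interface, which defines two methods: `handle(long partitionLastOffset, Exception exception)`, called when a poll fails so that the strategy can take its recovery action (seeking past a record, flagging a reconnect, or forwarding the exception), and `canContinue()`, which the fetch loop then queries to decide whether to keep polling.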

This uniform interface enables the Kafka consumer's fetch loop to invoke error handling without coupling to specific strategy logic.
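
The contract can be illustrated with a small, self-contained Java sketch. It mirrors only the two methods shown in the snippets that follow; `FetchLoopSketch`, `SimulatedConsumer`, `runLoop`, and `GiveUpAfter` are hypothetical names, and the in-memory consumer stands in for a real `KafkaConsumer`. The point is that `runLoop` never inspects which strategy it holds, so swapping strategies changes behavior without touching the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified mirror of the two methods every strategy in this section implements.
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

public class FetchLoopSketch {

    // Hypothetical in-memory partition standing in for a KafkaConsumer.
    static class SimulatedConsumer {
        final List<String> records;
        final Set<Long> poison; // offsets whose poll() throws, e.g. undeserializable records
        long position = 0;

        SimulatedConsumer(List<String> records, Set<Long> poison) {
            this.records = records;
            this.poison = poison;
        }

        boolean hasMore() { return position < records.size(); }

        String poll() {
            if (poison.contains(position)) {
                throw new IllegalStateException("cannot read offset " + position);
            }
            String record = records.get((int) position);
            position++;
            return record;
        }
    }

    // The loop depends only on the interface: it reports the failing offset,
    // then asks whether it may keep polling.
    static List<String> runLoop(SimulatedConsumer consumer, PollExceptionStrategy strategy) {
        List<String> processed = new ArrayList<>();
        while (consumer.hasMore() && strategy.canContinue()) {
            long current = consumer.position;
            try {
                processed.add(consumer.poll());
            } catch (IllegalStateException e) {
                strategy.handle(current, e);
            }
        }
        return processed;
    }

    // Hypothetical custom strategy (not one of the five built-in ones): tolerate a
    // fixed number of failures without seeking, then stop.
    static class GiveUpAfter implements PollExceptionStrategy {
        private final int maxFailures;
        private int failures = 0;

        GiveUpAfter(int maxFailures) { this.maxFailures = maxFailures; }

        @Override
        public void handle(long partitionLastOffset, Exception exception) { failures++; }

        @Override
        public boolean canContinue() { return failures < maxFailures; }

        int failures() { return failures; }
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer(List.of("a", "b", "c"), Set.of(1L));
        GiveUpAfter strategy = new GiveUpAfter(3);
        System.out.println(runLoop(consumer, strategy)); // [a]
        System.out.println(strategy.failures());          // 3
    }
}
```

`GiveUpAfter` never seeks, so it re-reads offset 1 until its failure budget is spent; the five strategies below plug into the same two-method contract.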

Available Strategies

The module provides five main strategies, each tailored for a particular error handling policy:

1. Discard Strategy (DiscardErrorStrategy)

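Logs a warning, seeks past the offending record with `SeekUtil.seekToNextOffset`, and always allows polling to continue. It suits cases where skipping a poison message is preferable to blocking consumption.

*Example snippet:*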

```java
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
    // Skip the offending record by moving the consumer past partitionLastOffset
    SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
}

@Override
public boolean canContinue() {
    // Discarding never stops the polling loop
    return true;
}
```
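
The discard behavior can be simulated in memory. This sketch assumes, for simplicity, that `partitionLastOffset` is the offset that failed, so "next offset" means that offset plus one; the real `SeekUtil.seekToNextOffset` operates on a Kafka `Consumer`, and `DiscardSketch`, `SimulatedConsumer`, and `consumeAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

public class DiscardSketch {

    // Hypothetical single-partition consumer: poll() throws on a poison offset.
    static class SimulatedConsumer {
        final List<String> records;
        final Set<Long> poison;
        long position = 0;

        SimulatedConsumer(List<String> records, Set<Long> poison) {
            this.records = records;
            this.poison = poison;
        }

        String poll() {
            if (poison.contains(position)) {
                throw new IllegalStateException("poison record at offset " + position);
            }
            String record = records.get((int) position);
            position++;
            return record;
        }
    }

    // Mirrors the discard snippet: skip past the failing offset and always continue.
    static class DiscardStrategy implements PollExceptionStrategy {
        private final SimulatedConsumer consumer;

        DiscardStrategy(SimulatedConsumer consumer) { this.consumer = consumer; }

        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            consumer.position = partitionLastOffset + 1; // stand-in for SeekUtil.seekToNextOffset
        }

        @Override
        public boolean canContinue() { return true; }
    }

    static List<String> consumeAll(SimulatedConsumer consumer, PollExceptionStrategy strategy) {
        List<String> processed = new ArrayList<>();
        while (consumer.position < consumer.records.size() && strategy.canContinue()) {
            long current = consumer.position;
            try {
                processed.add(consumer.poll());
            } catch (IllegalStateException e) {
                strategy.handle(current, e);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer(List.of("a", "b", "c", "d"), Set.of(1L, 2L));
        // Offsets 1 and 2 are discarded and polling carries on to offset 3
        System.out.println(consumeAll(consumer, new DiscardStrategy(consumer))); // [a, d]
    }
}
```

The trade-off is visible in the output: consumption never stalls, but records "b" and "c" are never processed.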

2. Retry Strategy (RetryErrorStrategy)

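Logs a warning and does nothing else: no seek is performed, so the next poll reads the same position again, and `canContinue()` always returns `true`. This suits transient failures; a message that fails on every attempt would be polled again on every iteration.

*Example snippet:*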

```java
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
    // No seek: the next poll starts from the same position
}

@Override
public boolean canContinue() {
    return true;
}
```
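
Because the retry strategy neither seeks nor stops, the next poll simply reads the same position again. The sketch below uses a hypothetical `FlakyConsumer` whose poll fails a fixed number of times before succeeding, which models a transient error; it is an illustration, not Camel code.

```java
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

public class RetrySketch {

    // Hypothetical consumer whose poll fails a fixed number of times before succeeding,
    // e.g. a short network blip.
    static class FlakyConsumer {
        private int remainingFailures;
        long position = 0;

        FlakyConsumer(int failuresBeforeSuccess) { this.remainingFailures = failuresBeforeSuccess; }

        String poll() {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new IllegalStateException("transient failure at offset " + position);
            }
            return "record@" + position++;
        }
    }

    // Mirrors the retry snippet: take no action (no seek) and keep polling.
    static class RetryStrategy implements PollExceptionStrategy {
        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            // Intentionally empty: the position is unchanged, so the same offset is polled again
        }

        @Override
        public boolean canContinue() { return true; }
    }

    // Polls until one record is returned and reports how many attempts that took.
    static int attemptsUntilSuccess(FlakyConsumer consumer, PollExceptionStrategy strategy) {
        int attempts = 0;
        while (strategy.canContinue()) {
            attempts++;
            long current = consumer.position;
            try {
                consumer.poll();
                return attempts;
            } catch (IllegalStateException e) {
                strategy.handle(current, e);
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        FlakyConsumer consumer = new FlakyConsumer(2);
        System.out.println(attemptsUntilSuccess(consumer, new RetryStrategy())); // 3
        System.out.println(consumer.position); // 1: the record at offset 0 was eventually read
    }
}
```

If the failure were permanent (a poison message), `attemptsUntilSuccess` would never return, which is why retrying is best reserved for errors expected to clear on their own.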

3. Reconnect Strategy (ReconnectErrorStrategy)

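Marks the fetcher as disconnected and ends the current polling loop (`canContinue()` returns `false`). For most exceptions it also requests a reconnect on the next run; an `AuthenticationException` is treated as non-recoverable, so no reconnect is requested.

*Example snippet:*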

```java
@Override
public void handle(long partitionLastOffset, Exception exception) {
    if (exception instanceof AuthenticationException) {
        // Bad credentials will not be fixed by reconnecting, so do not try
        LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect");
        recordFetcher.setReconnect(false);
        recordFetcher.setConnected(false);
    } else {
        // Any other error: drop the connection and reconnect on the next run
        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
        recordFetcher.setReconnect(true);
        recordFetcher.setConnected(false);
    }
    // In both cases the current polling loop ends
    retry = false;
}

@Override
public boolean canContinue() {
    return retry;
}
```
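
The reconnect decision depends only on the exception type. In this sketch, `FetcherState` is a hypothetical stand-in for the flags set on `recordFetcher`, and the local `AuthenticationException` class replaces Kafka's `org.apache.kafka.common.errors.AuthenticationException` so the example compiles without the Kafka client.

```java
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

// Local stand-in for org.apache.kafka.common.errors.AuthenticationException.
class AuthenticationException extends RuntimeException {
    AuthenticationException(String message) { super(message); }
}

public class ReconnectSketch {

    // Hypothetical stand-in for the flags the snippet sets on recordFetcher.
    static class FetcherState {
        boolean reconnect = false;
        boolean connected = true;
    }

    static class ReconnectStrategy implements PollExceptionStrategy {
        private final FetcherState fetcher;
        private boolean retry = true;

        ReconnectStrategy(FetcherState fetcher) { this.fetcher = fetcher; }

        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            // Only non-authentication errors ask for a reconnect
            fetcher.reconnect = !(exception instanceof AuthenticationException);
            fetcher.connected = false; // the current connection is abandoned either way
            retry = false;             // and the current polling loop ends
        }

        @Override
        public boolean canContinue() { return retry; }
    }

    public static void main(String[] args) {
        FetcherState network = new FetcherState();
        new ReconnectStrategy(network).handle(-1, new IllegalStateException("network error"));
        System.out.println(network.reconnect + " " + network.connected); // true false

        FetcherState auth = new FetcherState();
        new ReconnectStrategy(auth).handle(-1, new AuthenticationException("bad credentials"));
        System.out.println(auth.reconnect + " " + auth.connected); // false false
    }
}
```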

4. Stop Strategy (StopErrorStrategy)

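Marks the fetcher as disconnected and ends polling, whatever the exception type. Unlike the reconnect strategy, it does not request a reconnect.

*Example snippet:*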

```java
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to stop based on polling exception strategy");
    // End the polling loop and mark the fetcher disconnected; no reconnect is requested
    retry = false;
    recordFetcher.setConnected(false);
}

@Override
public boolean canContinue() {
    return retry;
}
```
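
A minimal sketch of the stop behavior, again using a hypothetical `FetcherState` in place of `recordFetcher`. It shows that polling ends and the fetcher is marked disconnected, while the reconnect flag is never touched.

```java
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

public class StopSketch {

    // Hypothetical stand-in for the connection flags on recordFetcher.
    static class FetcherState {
        boolean reconnect = false;
        boolean connected = true;
    }

    // Mirrors the stop snippet: mark the fetcher disconnected and end polling.
    static class StopStrategy implements PollExceptionStrategy {
        private final FetcherState fetcher;
        private boolean retry = true;

        StopStrategy(FetcherState fetcher) { this.fetcher = fetcher; }

        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            retry = false;
            fetcher.connected = false;
            // fetcher.reconnect is deliberately left unchanged
        }

        @Override
        public boolean canContinue() { return retry; }
    }

    public static void main(String[] args) {
        FetcherState fetcher = new FetcherState();
        StopStrategy strategy = new StopStrategy(fetcher);
        strategy.handle(-1, new IllegalStateException("any poll error"));
        System.out.println(strategy.canContinue() + " " + fetcher.connected + " " + fetcher.reconnect); // false false false
    }
}
```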

5. Bridge Error Strategy (BridgeErrorStrategy)

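Forwards the exception to Camel's error handling through the bridge returned by `recordFetcher.getBridge()`, then seeks past the offending record. Polling continues unless the exception is an `AuthenticationException` or an `AuthorizationException`, in which case `canContinue()` returns `false`.

*Example snippet:*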

```java
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
    // Route the exception through Camel's error handling
    recordFetcher.getBridge().handleException(exception);
    // Skip the offending record
    SeekUtil.seekToNextOffset(consumer, partitionLastOffset);

    // Security errors would recur on every poll, so stop instead of continuing
    if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
        continueFlag = false;
    }
}

@Override
public boolean canContinue() {
    return continueFlag;
}
```
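
The bridge strategy combines three steps: hand the exception to the error handler, skip the record, and stop only on security errors. In this sketch a plain `java.util.function.Consumer<Exception>` stands in for the bridge returned by `recordFetcher.getBridge()`, the hypothetical field `seekPosition` records where a real consumer would be repositioned (assuming, as a simplification, `partitionLastOffset + 1`), and the two exception classes are local stand-ins for Kafka's `AuthenticationException` and `AuthorizationException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);
    boolean canContinue();
}

// Local stand-ins for org.apache.kafka.common.errors.AuthenticationException / AuthorizationException.
class AuthenticationException extends RuntimeException {
    AuthenticationException(String message) { super(message); }
}

class AuthorizationException extends RuntimeException {
    AuthorizationException(String message) { super(message); }
}

public class BridgeSketch {

    static class BridgeStrategy implements PollExceptionStrategy {
        private final Consumer<Exception> errorHandler; // stand-in for recordFetcher.getBridge()
        long seekPosition = -1;                          // where the consumer would be told to seek
        private boolean continueFlag = true;

        BridgeStrategy(Consumer<Exception> errorHandler) { this.errorHandler = errorHandler; }

        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            errorHandler.accept(exception);         // 1. let the error handler see the exception
            seekPosition = partitionLastOffset + 1; // 2. skip the record (stand-in for SeekUtil)
            // 3. stop polling only for security failures
            if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
                continueFlag = false;
            }
        }

        @Override
        public boolean canContinue() { return continueFlag; }
    }

    public static void main(String[] args) {
        List<String> routed = new ArrayList<>();
        BridgeStrategy strategy = new BridgeStrategy(e -> routed.add(e.getMessage()));

        strategy.handle(4, new IllegalStateException("cannot deserialize"));
        System.out.println(strategy.canContinue() + " " + strategy.seekPosition); // true 5

        strategy.handle(5, new AuthorizationException("topic not authorized"));
        System.out.println(strategy.canContinue() + " " + routed); // false [cannot deserialize, topic not authorized]
    }
}
```

Both exceptions reach the handler; only the authorization failure ends polling.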

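Interaction with Other Modules

The strategies do not poll Kafka themselves; they act on objects owned by the consumer fetch loop (`KafkaFetchRecords`). Through `recordFetcher` they change the connection state (`setConnected`, `setReconnect`) and reach the Camel error-handler bridge (`getBridge()`), and they use `SeekUtil.seekToNextOffset` to reposition `consumer`, the Kafka consumer being polled. The sequence diagram below shows how these pieces interact when a poll fails.
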
Sequence Diagram: Kafka Consumer Poll Exception Handling Workflow

```mermaid
sequenceDiagram
    participant Fetcher as KafkaFetchRecords (Consumer Fetcher)
    participant Consumer as Kafka Consumer
    participant Strategy as PollExceptionStrategy
    participant SeekUtil as Seek Utility
    participant Camel as Camel Error Handler Bridge

    Fetcher->>Consumer: poll()
    alt Exception occurs during poll
        Consumer-->>Fetcher: throws Exception
        Fetcher->>Strategy: handle(partitionLastOffset, exception)
        opt BridgeErrorStrategy
            Strategy->>Camel: handleException(exception)
        end
        opt DiscardErrorStrategy or BridgeErrorStrategy
            Strategy->>SeekUtil: seekToNextOffset(consumer, partitionLastOffset)
        end
        opt ReconnectErrorStrategy or StopErrorStrategy
            Strategy->>Fetcher: setConnected(false)
        end
        Fetcher->>Strategy: canContinue()
        Strategy-->>Fetcher: true / false
        alt canContinue = true
            Fetcher->>Consumer: poll() (retry)
        else canContinue = false
            Fetcher->>Fetcher: stop polling or reconnect logic
        end
    else poll successful
        Consumer-->>Fetcher: records
        Fetcher->>Fetcher: process records
        Fetcher->>Consumer: poll() (next iteration)
    end
```

Summary

The Kafka Consumer Error Handling module encapsulates the response to Kafka consumer polling exceptions in modular strategies. Each strategy takes a distinct approach: skipping poison messages, retrying, reconnecting, stopping, or delegating error handling to Camel routes. This design lets consumer behavior be adapted to different operational scenarios, improving fault tolerance and the reliability of message processing within the Kafka component.