KafkaErrorStrategies.java


Overview

The **KafkaErrorStrategies.java** file is a utility class within the Apache Camel Kafka component that provides a centralized factory method to obtain an appropriate error handling strategy (`PollExceptionStrategy`) for Kafka consumers during polling operations. It encapsulates the logic to select and instantiate different error strategies based on the Kafka endpoint configuration or the component-level default.

This class plays a crucial role in defining the behavior of Kafka consumers when exceptions occur during polling Kafka topics. It supports multiple predefined strategies such as retrying, reconnecting, delegating to Camel's error handler, discarding messages, or stopping the consumer. This flexibility allows developers to tailor the error handling behavior to their application requirements.


Package and Dependencies

The class depends on Kafka consumer APIs and Camel Kafka component classes related to endpoint configuration, record fetching, and error handling strategies.


Class: KafkaErrorStrategies

Description

`KafkaErrorStrategies` is a **final utility class** that cannot be instantiated (`private` constructor) and exposes a **single public static method** to determine and provide the correct poll exception strategy for Kafka consumers.

It acts as a factory for `PollExceptionStrategy` implementations based on the configured error handling mode (`PollOnError`) or any explicitly provided strategy at the component level.

Constructor

private KafkaErrorStrategies()

Static Method: strategies

public static PollExceptionStrategy strategies(
    KafkaFetchRecords recordFetcher,
    KafkaEndpoint endpoint,
    Consumer<?, ?> consumer)

Purpose

Determines and returns a `PollExceptionStrategy` instance which governs how polling exceptions are handled by a Kafka consumer.

Parameters

Parameter

Type

Description

`recordFetcher`

`KafkaFetchRecords`

The fetch task instance that polls and processes Kafka records. It is used by certain strategies that require access to the fetcher logic.

`endpoint`

`KafkaEndpoint`

The Camel Kafka endpoint that contains configuration and component references. Used to retrieve configuration options and default strategies.

`consumer`

`Consumer`

The Kafka consumer instance used for consumption. Required for some error strategies that interact directly with the consumer.

Returns

Throws

Description

  1. Null Check: Ensures the consumer parameter is not null.

  2. Component-level Strategy: Checks if the Kafka component has a globally configured PollExceptionStrategy. If present, returns this instance immediately.

  3. Endpoint-level Strategy: Reads the endpoint's PollOnError configuration enum value which specifies the strategy to apply.

  4. Strategy Selection: Uses a switch statement based on the PollOnError enum:

    • RETRY → returns a new RetryErrorStrategy()

    • RECONNECT → returns a new ReconnectErrorStrategy(recordFetcher)

    • ERROR_HANDLER → returns a new BridgeErrorStrategy(recordFetcher, consumer)

    • DISCARD → returns a new DiscardErrorStrategy(consumer)

    • STOP → returns a new StopErrorStrategy(recordFetcher)

  5. If no matching enum value is found, throws a RuntimeCamelException.


Usage Example

KafkaFetchRecords fetcher = ... // instance responsible for fetching Kafka records
KafkaEndpoint endpoint = ...    // configured Kafka endpoint
Consumer<?, ?> consumer = ...   // Kafka consumer instance

PollExceptionStrategy strategy = KafkaErrorStrategies.strategies(fetcher, endpoint, consumer);

// Use the strategy to handle poll exceptions in KafkaFetchRecords

This example shows how to obtain the appropriate error handling strategy for use in a Kafka fetch thread.


Implementation Details and Design Notes


Interaction with Other Parts of the System


Class Diagram

classDiagram
    class KafkaErrorStrategies {
        -KafkaErrorStrategies()
        +strategies(recordFetcher: KafkaFetchRecords, endpoint: KafkaEndpoint, consumer: Consumer<?, ?>): PollExceptionStrategy
    }

    KafkaErrorStrategies ..> PollExceptionStrategy : returns instance of
    KafkaErrorStrategies ..> KafkaFetchRecords : uses in strategy creation
    KafkaErrorStrategies ..> KafkaEndpoint : reads configuration
    KafkaErrorStrategies ..> Consumer : requires for some strategies

Summary

`KafkaErrorStrategies.java` is a concise, focused utility class that provides a factory method for selecting the appropriate Kafka poll error handling strategy based on endpoint and component configuration. It supports various recovery behaviors such as retrying, reconnecting, bridging to Camel's error handler, discarding problematic records, or stopping the consumer. This design supports flexibility and extensibility in how Kafka consumer errors are managed during polling operations, making it a key integration point between Kafka consumer lifecycle management and Camel's error handling framework.