KafkaErrorStrategies.java
Overview
The **KafkaErrorStrategies.java** file is a utility class within the Apache Camel Kafka component that provides a centralized factory method to obtain an appropriate error handling strategy (`PollExceptionStrategy`) for Kafka consumers during polling operations. It encapsulates the logic to select and instantiate different error strategies based on the Kafka endpoint configuration or the component-level default.
This class plays a crucial role in defining the behavior of Kafka consumers when exceptions occur during polling Kafka topics. It supports multiple predefined strategies such as retrying, reconnecting, delegating to Camel's error handler, discarding messages, or stopping the consumer. This flexibility allows developers to tailor the error handling behavior to their application requirements.
Package and Dependencies
Package:
org.apache.camel.component.kafka.consumer.errorhandlerImports:
org.apache.camel.RuntimeCamelExceptionorg.apache.camel.component.kafka.KafkaEndpointorg.apache.camel.component.kafka.KafkaFetchRecordsorg.apache.camel.component.kafka.PollExceptionStrategyorg.apache.camel.component.kafka.PollOnErrororg.apache.camel.util.ObjectHelperorg.apache.kafka.clients.consumer.Consumer
The class depends on Kafka consumer APIs and Camel Kafka component classes related to endpoint configuration, record fetching, and error handling strategies.
Class: KafkaErrorStrategies
Description
`KafkaErrorStrategies` is a **final utility class** that cannot be instantiated (`private` constructor) and exposes a **single public static method** to determine and provide the correct poll exception strategy for Kafka consumers.
It acts as a factory for `PollExceptionStrategy` implementations based on the configured error handling mode (`PollOnError`) or any explicitly provided strategy at the component level.
Constructor
private KafkaErrorStrategies()
Access: private
Purpose: Prevent instantiation of this utility class.
Static Method: strategies
public static PollExceptionStrategy strategies(
KafkaFetchRecords recordFetcher,
KafkaEndpoint endpoint,
Consumer<?, ?> consumer)
Purpose
Determines and returns a `PollExceptionStrategy` instance which governs how polling exceptions are handled by a Kafka consumer.
Parameters
Parameter | Type | Description |
|---|---|---|
`recordFetcher` | `KafkaFetchRecords` | The fetch task instance that polls and processes Kafka records. It is used by certain strategies that require access to the fetcher logic. |
`endpoint` | `KafkaEndpoint` | The Camel Kafka endpoint that contains configuration and component references. Used to retrieve configuration options and default strategies. |
`consumer` | `Consumer` | The Kafka consumer instance used for consumption. Required for some error strategies that interact directly with the consumer. |
Returns
PollExceptionStrategy— an instance implementing the chosen error handling strategy.
Throws
RuntimeCamelExceptionif the configuredPollOnErrorstrategy is invalid or unsupported.
Description
Null Check: Ensures the
consumerparameter is not null.Component-level Strategy: Checks if the Kafka component has a globally configured
PollExceptionStrategy. If present, returns this instance immediately.Endpoint-level Strategy: Reads the endpoint's
PollOnErrorconfiguration enum value which specifies the strategy to apply.Strategy Selection: Uses a
switchstatement based on thePollOnErrorenum:RETRY→ returns a newRetryErrorStrategy()RECONNECT→ returns a newReconnectErrorStrategy(recordFetcher)ERROR_HANDLER→ returns a newBridgeErrorStrategy(recordFetcher, consumer)DISCARD→ returns a newDiscardErrorStrategy(consumer)STOP→ returns a newStopErrorStrategy(recordFetcher)
If no matching enum value is found, throws a
RuntimeCamelException.
Usage Example
KafkaFetchRecords fetcher = ... // instance responsible for fetching Kafka records
KafkaEndpoint endpoint = ... // configured Kafka endpoint
Consumer<?, ?> consumer = ... // Kafka consumer instance
PollExceptionStrategy strategy = KafkaErrorStrategies.strategies(fetcher, endpoint, consumer);
// Use the strategy to handle poll exceptions in KafkaFetchRecords
This example shows how to obtain the appropriate error handling strategy for use in a Kafka fetch thread.
Implementation Details and Design Notes
Strategy Pattern: The class implements a factory method pattern that encapsulates the creation logic of different error handling strategies, allowing easy extension or modification.
Error Handling Flexibility: By centralizing strategy selection, the consuming code (
KafkaFetchRecords) can remain agnostic of the specific error recovery logic, facilitating pluggable error management behaviors.Strong Typing with Enum: The use of the
PollOnErrorenum ensures that only validated, expected error handling modes are selectable, minimizing configuration errors.Component-Level Override: Supports a component-wide override of the poll exception strategy, enabling global behavior changes without modifying each endpoint's configuration.
Tight Coupling with Fetcher and Consumer: Some strategies require access to the fetch task or the Kafka consumer directly, indicating that error handling strategies are closely integrated with the Kafka consumption lifecycle.
Interaction with Other Parts of the System
KafkaFetchRecords: The fetch task that polls Kafka topics uses the strategy returned by this class to handle exceptions during polling. For example,
ReconnectErrorStrategymay trigger consumer reconnection logic, whileBridgeErrorStrategyforwards exceptions to Camel's error handler.KafkaEndpoint: Provides configuration including the error handling mode (
PollOnError) and optional component-level default strategy. This class reads these configurations to decide which strategy to apply.PollExceptionStrategy Implementations: These are the actual classes implementing different recovery behaviors:
RetryErrorStrategyReconnectErrorStrategyBridgeErrorStrategyDiscardErrorStrategyStopErrorStrategy
Kafka Consumer: Some strategies interact directly with the Kafka consumer instance to pause, resume, or close connections.
Class Diagram
classDiagram
class KafkaErrorStrategies {
-KafkaErrorStrategies()
+strategies(recordFetcher: KafkaFetchRecords, endpoint: KafkaEndpoint, consumer: Consumer<?, ?>): PollExceptionStrategy
}
KafkaErrorStrategies ..> PollExceptionStrategy : returns instance of
KafkaErrorStrategies ..> KafkaFetchRecords : uses in strategy creation
KafkaErrorStrategies ..> KafkaEndpoint : reads configuration
KafkaErrorStrategies ..> Consumer : requires for some strategies
Summary
`KafkaErrorStrategies.java` is a concise, focused utility class that provides a factory method for selecting the appropriate Kafka poll error handling strategy based on endpoint and component configuration. It supports various recovery behaviors such as retrying, reconnecting, bridging to Camel's error handler, discarding problematic records, or stopping the consumer. This design supports flexibility and extensibility in how Kafka consumer errors are managed during polling operations, making it a key integration point between Kafka consumer lifecycle management and Camel's error handling framework.