ReconnectErrorStrategy.java
Overview
`ReconnectErrorStrategy.java` defines an error handling strategy for Kafka consumers within the Apache Camel Kafka component. This strategy implements the `PollExceptionStrategy` interface and focuses on recovering from polling exceptions by requesting a consumer reconnect, except in the case of non-recoverable authentication errors.
The primary goal of this strategy is to enhance consumer resilience by allowing automatic recovery from transient network or broker issues through reconnection cycles, while preventing futile retries on unrecoverable authentication failures.
Class: ReconnectErrorStrategy
Package
org.apache.camel.component.kafka.consumer.errorhandler
Dependencies
`KafkaFetchRecords`: manages the Kafka consumer polling lifecycle and state.
`PollExceptionStrategy`: interface defining the contract for poll exception handling strategies.
`AuthenticationException`: Kafka-specific exception signaling authentication failures.
`Logger` / `LoggerFactory`: used for logging warning messages.
Purpose
The `ReconnectErrorStrategy` class implements a polling exception strategy that reacts to Kafka consumer poll failures by:
Determining whether the exception is recoverable.
Requesting the Kafka consumer to reconnect on the next poll cycle if recoverable.
Disabling reconnect attempts and marking the consumer disconnected if the exception is an `AuthenticationException` (non-recoverable).
Controlling whether the polling loop should continue immediately or wait for the reconnect cycle.
Class Diagram
classDiagram
class ReconnectErrorStrategy {
- KafkaFetchRecords recordFetcher
- boolean retry
+ ReconnectErrorStrategy(KafkaFetchRecords)
+ void reset()
+ boolean canContinue()
+ void handle(long partitionLastOffset, Exception exception)
}
ReconnectErrorStrategy ..|> PollExceptionStrategy
Detailed Class Description
Fields
| Field | Type | Description |
|---|---|---|
| `recordFetcher` | `KafkaFetchRecords` | Reference to the fetching component controlling consumer state and lifecycle. |
| `retry` | `boolean` | Internal flag indicating if polling can continue immediately after handling an exception. |
Constructor
public ReconnectErrorStrategy(KafkaFetchRecords recordFetcher)
Parameters:
`recordFetcher`: An instance of `KafkaFetchRecords` that this strategy will control for reconnect behavior.
Description: Initializes the strategy with the Kafka fetcher used to manage consumer state.
Methods
void reset()
Description: Resets the internal retry flag to `true`, allowing polling to continue again. Typically called when a new poll cycle or consumer is initialized.
Usage Example:
ReconnectErrorStrategy strategy = new ReconnectErrorStrategy(fetcher);
// After handling an error and stopping retry, reset to allow retrying again
strategy.reset();
boolean canContinue()
Returns:
`true` if the strategy allows the consumer to continue polling immediately; `false` otherwise.
Description: Indicates to the caller whether the current poll cycle can proceed or should be halted (e.g., to close and reconnect).
Usage Example:
if (!strategy.canContinue()) {
    // Stop current poll loop and trigger reconnect lifecycle
}
void handle(long partitionLastOffset, Exception exception)
Parameters:
`partitionLastOffset`: The last offset of the Kafka partition being processed (not used directly in this strategy).
`exception`: The exception thrown during the Kafka consumer poll.
Description: Handles the exception by deciding whether to reconnect or not based on the exception type.
Behavior:
If the exception is an instance of `AuthenticationException`:
Logs a warning indicating a non-recoverable authentication error.
Sets the fetcher to not reconnect (`setReconnect(false)`).
Marks the fetcher as disconnected (`setConnected(false)`).
For all other exceptions:
Logs a warning requesting a reconnect on the next poll.
Sets the fetcher to reconnect (`setReconnect(true)`).
Marks the fetcher as disconnected (`setConnected(false)`).
In both cases, sets the internal retry flag to `false` to stop the current poll retry.
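The branching described above can be sketched as follows. This is a minimal, self-contained illustration rather than the Camel source: `FetcherFlags` is a hypothetical stand-in for `KafkaFetchRecords`, `AuthFailure` is a hypothetical stand-in for Kafka's `AuthenticationException`, and logging is omitted so the example compiles without Camel or Kafka on the classpath.

```java
// Hypothetical stand-in for Kafka's AuthenticationException.
class AuthFailure extends RuntimeException {
    AuthFailure(String message) { super(message); }
}

// Hypothetical stand-in for the two KafkaFetchRecords flags the strategy touches.
class FetcherFlags {
    private boolean reconnect;
    private boolean connected = true;

    void setReconnect(boolean reconnect) { this.reconnect = reconnect; }
    void setConnected(boolean connected) { this.connected = connected; }
    boolean isReconnect() { return reconnect; }
    boolean isConnected() { return connected; }
}

// Sketch of the reconnect decision; it mirrors the documented behavior only.
class ReconnectStrategySketch {
    private final FetcherFlags fetcher;
    private boolean retry = true;

    ReconnectStrategySketch(FetcherFlags fetcher) { this.fetcher = fetcher; }

    void reset() { retry = true; }

    boolean canContinue() { return retry; }

    void handle(long partitionLastOffset, Exception exception) {
        // partitionLastOffset is accepted to match the documented signature but is unused.
        boolean recoverable = !(exception instanceof AuthFailure);
        fetcher.setReconnect(recoverable); // reconnect only when recovery is plausible
        fetcher.setConnected(false);       // in both branches the connection is marked down
        retry = false;                     // stop the current poll cycle
    }

    public static void main(String[] args) {
        FetcherFlags flags = new FetcherFlags();
        ReconnectStrategySketch strategy = new ReconnectStrategySketch(flags);
        strategy.handle(0L, new IllegalStateException("broker unavailable"));
        System.out.println("reconnect requested: " + flags.isReconnect());
        System.out.println("can continue: " + strategy.canContinue());
    }
}
```

Collapsing the two branches into a single `recoverable` flag is a choice made for this sketch; it relies on both branches differing only in the value passed to `setReconnect`.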
Usage Example:
try {
    // poll takes a java.time.Duration timeout; 100 ms is an illustrative value
    consumer.poll(Duration.ofMillis(100));
} catch (Exception e) {
    strategy.handle(lastOffset, e);
    if (!strategy.canContinue()) {
        // Close consumer and reconnect on next cycle
    }
}
Important Implementation Details
Retry Flag:
The `retry` boolean flag controls whether the poll loop should immediately continue after an exception. After handling an exception, `retry` is set to `false`, signaling that the current poll cycle should stop, which allows the consumer fetcher to close the current consumer and prepare for a reconnect if enabled.
AuthenticationException Handling:
Recognizes `AuthenticationException` as a fatal error where reconnect attempts are pointless (e.g., invalid credentials). Therefore, it disables reconnect and leaves the consumer disconnected until external intervention.
Consumer Lifecycle Control:
The strategy sets the `KafkaFetchRecords` flags through `setReconnect` and `setConnected` to orchestrate consumer shutdown and reconnection logic within the fetcher's polling thread.
Interaction with Other System Components
KafkaFetchRecords:
This class is the consumer fetch task managing Kafka polling loops. Upon catching poll exceptions, it delegates error handling to `PollExceptionStrategy` instances like `ReconnectErrorStrategy`. The fetcher responds to the strategy’s signals by closing or reconnecting the Kafka consumer.
Kafka Consumer:
The underlying Kafka consumer is closed and re-created based on reconnect flags controlled by this strategy.
Polling Loop:
The polling loop checks `canContinue()` to decide whether to retry immediately or halt and reconnect.
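The interaction between the fetcher, the consumer, and the polling loop might look roughly like the sketch below. It is an assumption-laden simplification, not the actual `KafkaFetchRecords` code: `FetcherLoopSketch`, `StrategySketch`, `AuthError`, the scripted failure queue, and the poll budget are all hypothetical names introduced so the loop can run without a broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for Kafka's AuthenticationException.
class AuthError extends RuntimeException {
    AuthError(String message) { super(message); }
}

// Simplified stand-in for the strategy described in this document.
class StrategySketch {
    private final FetcherLoopSketch fetcher;
    private boolean retry = true;

    StrategySketch(FetcherLoopSketch fetcher) { this.fetcher = fetcher; }

    void reset() { retry = true; }
    boolean canContinue() { return retry; }

    void handle(long partitionLastOffset, Exception exception) {
        fetcher.setReconnect(!(exception instanceof AuthError));
        fetcher.setConnected(false);
        retry = false;
    }
}

// Simplified stand-in for KafkaFetchRecords; not the Camel implementation.
class FetcherLoopSketch {
    private final Deque<RuntimeException> scriptedFailures; // each entry makes one poll fail
    private final StrategySketch strategy = new StrategySketch(this);
    private boolean reconnect = true; // true initially so the first consumer is created
    private boolean connected;
    int consumersCreated;
    int successfulPolls;

    FetcherLoopSketch(List<? extends RuntimeException> failures) {
        this.scriptedFailures = new ArrayDeque<>(failures);
    }

    void setReconnect(boolean reconnect) { this.reconnect = reconnect; }
    void setConnected(boolean connected) { this.connected = connected; }
    boolean isConnected() { return connected; }

    // Runs until the strategy forbids reconnecting or the poll budget is spent.
    void run(int pollBudget) {
        int polls = 0;
        while (reconnect && polls < pollBudget) {
            consumersCreated++;  // "create" a fresh consumer
            connected = true;
            reconnect = false;
            strategy.reset();    // allow polling on the new consumer

            while (strategy.canContinue() && polls < pollBudget) {
                polls++;
                try {
                    pollOnce();
                    successfulPolls++;
                } catch (RuntimeException e) {
                    strategy.handle(-1L, e); // delegate the decision to the strategy
                }
            }
            connected = false;   // inner loop ended: "close" the current consumer
        }
    }

    private void pollOnce() {
        RuntimeException failure = scriptedFailures.poll();
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        FetcherLoopSketch fetcher =
                new FetcherLoopSketch(List.of(new IllegalStateException("broker unavailable")));
        fetcher.run(5);
        System.out.println("consumers created: " + fetcher.consumersCreated);
        System.out.println("successful polls: " + fetcher.successfulPolls);
    }
}
```

In this sketch a transient failure ends the inner loop and the outer loop creates a new consumer, whereas a scripted `AuthError` clears the reconnect flag so the outer loop exits after the first consumer.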
Usage in Application Context
When configured as the active poll exception strategy in Apache Camel’s Kafka component, `ReconnectErrorStrategy` ensures that the consumer:
Attempts to recover automatically from transient failures by reconnecting.
Avoids pointless retries when authentication failures occur.
Provides controlled consumer lifecycle transitions to maintain robust message consumption.
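In Camel's Kafka component, poll error behavior is chosen with the `pollOnError` endpoint option, and its `RECONNECT` value is expected to select this strategy. The sketch below only assembles such an endpoint URI; the `ReconnectEndpoint` helper, the topic name, and the broker address are illustrative assumptions, and the comment shows where the URI would be used in a route.

```java
// Hypothetical helper; only the pollOnError option and its RECONNECT value come from Camel.
class ReconnectEndpoint {

    // Builds a Kafka endpoint URI that asks the consumer to reconnect on poll errors.
    static String uri(String topic, String brokers) {
        return "kafka:" + topic + "?brokers=" + brokers + "&pollOnError=RECONNECT";
    }

    public static void main(String[] args) {
        // Inside a Camel RouteBuilder.configure() this would be used as:
        //   from(ReconnectEndpoint.uri("orders", "localhost:9092")).to("log:received");
        System.out.println(uri("orders", "localhost:9092"));
    }
}
```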
Visual Diagram: Sequence Diagram of Exception Handling Workflow
sequenceDiagram
participant Fetcher as KafkaFetchRecords
participant Strategy as ReconnectErrorStrategy
participant Kafka as Kafka Broker
Fetcher->>Kafka: poll()
Kafka-->>Fetcher: throws Exception
Fetcher->>Strategy: handle(partitionLastOffset, exception)
alt AuthenticationException (Non-recoverable)
Strategy->>Fetcher: setReconnect(false)
Strategy->>Fetcher: setConnected(false)
Strategy->>Strategy: retry = false
else Other Exceptions (Recoverable)
Strategy->>Fetcher: setReconnect(true)
Strategy->>Fetcher: setConnected(false)
Strategy->>Strategy: retry = false
end
Fetcher->>Fetcher: close consumer
Fetcher->>Fetcher: on next poll cycle, reconnect if enabled
Summary
`ReconnectErrorStrategy` is a poll error handling strategy for the Kafka consumer in Apache Camel. It enables the consumer to recover from transient problems by requesting a reconnect, while identifying and halting on non-recoverable authentication errors.
By manipulating consumer connection flags and signaling poll loop continuation, it helps maintain a resilient message consumption process that can self-heal from many common Kafka consumer disruptions.