StopErrorStrategy.java
Overview
`StopErrorStrategy.java` defines a **polling exception strategy** for Kafka consumers within the Apache Camel Kafka component. Its primary purpose is to **immediately halt the Kafka consumer's polling loop upon encountering a polling exception**. This conservative strategy prevents the consumer from repeatedly encountering errors, which could lead to resource exhaustion, inconsistent state, or message loss.
As part of the error handling framework for Kafka consumers, this strategy implements the `PollExceptionStrategy` interface and is invoked whenever a polling exception occurs during the Kafka polling cycle. Upon handling an error, it signals the consumer fetch task to stop further polling and marks the consumer as disconnected.
Class: StopErrorStrategy
Description
`StopErrorStrategy` encapsulates the logic to stop a Kafka consumer upon polling exceptions. When triggered, it disables retries and sets the consumer's connected status to `false`, effectively terminating the consumer’s fetch loop.
This strategy is suitable for critical error scenarios where continuing consumption might be harmful or when manual intervention is desired before resuming consumption.
Package
`org.apache.camel.component.kafka.consumer.errorhandler`
Implements
`PollExceptionStrategy`
Fields
| Field | Type | Description |
|---|---|---|
| `LOG` | `Logger` | Logger instance for logging warnings |
| `recordFetcher` | `KafkaFetchRecords` | Reference to the Kafka consumer fetch task, used to control consumer state |
| `retry` | `boolean` | Flag indicating if polling should continue; initially `true` |
Constructor
public StopErrorStrategy(KafkaFetchRecords recordFetcher)
Parameters:
- `recordFetcher` - The Kafka fetch task responsible for polling Kafka records. This is used internally to set the consumer connection state.
Description:
Initializes the `StopErrorStrategy` with a reference to the consumer fetcher. The fetcher will be instructed to stop consuming when an exception occurs.
Example:

    KafkaFetchRecords fetcher = ...; // initialized elsewhere
    StopErrorStrategy stopStrategy = new StopErrorStrategy(fetcher);

Methods
void reset()
Description:
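Resets the internal `retry` flag to `true`, so that `canContinue()` returns `true` again. Typically used when restarting or resetting the consumer.
Usage:

    stopErrorStrategy.reset();

After this call the strategy once more reports that polling may continue. As described here, `reset()` touches only the `retry` flag; it does not itself change the connected state that `handle` set on `recordFetcher`.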
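To make the effect of `reset()` concrete, the following is a minimal sketch rather than the Camel source. `FetcherStub` is a hypothetical stand-in for `KafkaFetchRecords` that models only a connected flag, and `StopStrategySketch` mirrors the three methods as this document describes them (logging omitted).

```java
// Hypothetical stand-in for KafkaFetchRecords: only the connected flag is modeled.
class FetcherStub {
    private boolean connected = true;

    void setConnected(boolean connected) { this.connected = connected; }

    boolean isConnected() { return connected; }
}

// Sketch of the behavior described in this document (not the actual Camel class).
class StopStrategySketch {
    private final FetcherStub recordFetcher;
    private boolean retry = true;

    StopStrategySketch(FetcherStub recordFetcher) { this.recordFetcher = recordFetcher; }

    void reset() { retry = true; }

    boolean canContinue() { return retry; }

    void handle(long partitionLastOffset, Exception exception) {
        retry = false;                      // no further polling attempts
        recordFetcher.setConnected(false);  // mark the fetcher as disconnected
    }
}

class ResetDemo {
    public static void main(String[] args) {
        FetcherStub fetcher = new FetcherStub();
        StopStrategySketch strategy = new StopStrategySketch(fetcher);

        strategy.handle(42L, new RuntimeException("simulated poll failure"));
        // prints "false false"
        System.out.println(strategy.canContinue() + " " + fetcher.isConnected());

        strategy.reset();
        // prints "true false": only the retry flag was restored
        System.out.println(strategy.canContinue() + " " + fetcher.isConnected());
    }
}
```

In this model the fetcher stays disconnected after `reset()`, so whatever restarts the consumer would also need to restore the connection state.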
boolean canContinue()
Description:
Indicates whether the polling loop can continue after an exception has been handled.
Returns:
- `true` if the consumer should continue polling.
- `false` once the strategy has stopped the consumer due to an exception.
Usage:

    if (!stopErrorStrategy.canContinue()) {
        // Stop polling and terminate consumer fetch loop
    }

void handle(long partitionLastOffset, Exception exception)
Parameters:
- `partitionLastOffset` - The last offset recorded for the partition before the exception occurred.
- `exception` - The exception thrown during polling.
Description:
Handles a polling exception by:
1. Logging a warning indicating the consumer will stop.
2. Setting the `retry` flag to `false` to prevent further polling attempts.
3. Marking the `recordFetcher` as disconnected, which stops the consumer's fetch loop.
Usage:

    try {
        // polling logic
    } catch (Exception e) {
        stopErrorStrategy.handle(lastOffset, e);
    }

Example Implementation:

    @Override
    public void handle(long partitionLastOffset, Exception exception) {
        LOG.warn("Requesting the consumer to stop based on polling exception strategy");
        retry = false;
        recordFetcher.setConnected(false);
    }

Important Implementation Details
- Retry Flag: The internal boolean `retry` controls whether the consumer polling loop should continue. It starts as `true` but is set to `false` once a polling exception occurs and is handled.
- Consumer Connection State: The strategy interacts directly with `KafkaFetchRecords` by setting its connected state to `false`. This signals the consumer fetcher to stop polling.
- Logging: Uses SLF4J to log a warning message when stopping the consumer, aiding in diagnostics.
- Thread Safety: The class holds only simple state (a single boolean flag) and is expected to be used within the Kafka consumer thread context.
Interaction with Other Components
- KafkaFetchRecords: The class holds a reference to the consumer fetcher (`KafkaFetchRecords`), which manages the Kafka consumer lifecycle. By calling `recordFetcher.setConnected(false)`, it instructs the fetcher to terminate polling.
- PollExceptionStrategy Interface: `StopErrorStrategy` implements this interface, allowing it to be plugged into the Kafka consumer's polling loop error handling mechanism.
- Kafka Consumer Poll Loop: When an exception is thrown during `consumer.poll()`, the consumer fetcher delegates exception handling to the configured `PollExceptionStrategy` (e.g., `StopErrorStrategy`). The strategy decides whether to continue or stop polling.
- Other Strategies: `StopErrorStrategy` contrasts with other strategies such as retrying or reconnecting. It opts for fail-fast termination rather than recovery attempts.
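The contrast between fail-fast termination and a strategy that keeps polling can be sketched with a small model. Everything below is an assumption made for illustration: `PollStrategyModel` is a local interface that copies the three method signatures listed in this document, `ConnectionFlag` stands in for the fetcher, and `KeepPollingModel` is a hypothetical strategy, not one of Camel's actual retry or reconnect implementations.

```java
// Local model of the three-method contract described in this document.
interface PollStrategyModel {
    void reset();

    boolean canContinue();

    void handle(long partitionLastOffset, Exception exception);
}

// Hypothetical stand-in for the fetcher's connected state.
class ConnectionFlag {
    boolean connected = true;
}

// Fail-fast: after one handled exception, polling may not continue.
class StopModel implements PollStrategyModel {
    private final ConnectionFlag fetcher;
    private boolean retry = true;

    StopModel(ConnectionFlag fetcher) { this.fetcher = fetcher; }

    public void reset() { retry = true; }

    public boolean canContinue() { return retry; }

    public void handle(long partitionLastOffset, Exception exception) {
        retry = false;
        fetcher.connected = false;
    }
}

// Hypothetical counterpart: ignores the error and lets the loop poll again.
class KeepPollingModel implements PollStrategyModel {
    public void reset() { }

    public boolean canContinue() { return true; }

    public void handle(long partitionLastOffset, Exception exception) { }
}

class StrategyContrast {
    // Feeds one simulated failure to the strategy and reports its decision.
    static boolean continuesAfterError(PollStrategyModel strategy) {
        strategy.handle(0L, new IllegalStateException("simulated poll failure"));
        return strategy.canContinue();
    }

    public static void main(String[] args) {
        System.out.println(continuesAfterError(new StopModel(new ConnectionFlag()))); // prints "false"
        System.out.println(continuesAfterError(new KeepPollingModel()));              // prints "true"
    }
}
```

Because both models share one interface, the polling loop needs no knowledge of which policy is configured; it only asks `canContinue()` after delegating the exception.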
Usage Example
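The snippet below is illustrative only; in practice the fetch loop lives inside `KafkaFetchRecords`, which delegates to the strategy.

    KafkaFetchRecords fetcher = ...; // initialized elsewhere
    StopErrorStrategy stopStrategy = new StopErrorStrategy(fetcher);

    // `consumer` (a KafkaConsumer<String, String>) and `lastOffset` are assumed
    // to be created and tracked elsewhere.
    while (stopStrategy.canContinue()) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // process records
        } catch (Exception e) {
            // Sets retry = false and marks the fetcher as disconnected,
            // so the loop condition fails on the next check.
            stopStrategy.handle(lastOffset, e);
        }
    }
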
Visual Diagram
classDiagram
class StopErrorStrategy {
-KafkaFetchRecords recordFetcher
-boolean retry
+StopErrorStrategy(KafkaFetchRecords)
+void reset()
+boolean canContinue()
+void handle(long partitionLastOffset, Exception exception)
}
StopErrorStrategy ..|> PollExceptionStrategy
class PollExceptionStrategy {
<<interface>>
+void handle(long partitionLastOffset, Exception exception)
+boolean canContinue()
+void reset()
}
Workflow Diagram: Polling Exception Handling with StopErrorStrategy
flowchart TD
A[Start Kafka Consumer Fetch Loop] --> B[Poll Kafka for Records]
B --> C{Polling Exception Occurs?}
C -- No --> D[Process Records]
D --> B
C -- Yes --> E["Invoke StopErrorStrategy.handle()"]
E --> F[Log Warning and Set retry = false]
F --> G[Set recordFetcher.connected = false]
G --> H[Stop Polling Loop]
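The workflow above can be simulated end to end without a Kafka broker. This is a sketch under stated assumptions: `LoopFetcher` and `LoopStopStrategy` are hypothetical stand-ins for `KafkaFetchRecords` and `StopErrorStrategy`, a "poll" is just a counter, and the `failAt` and `maxPolls` parameters exist only to drive the simulation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaFetchRecords: only the connected flag is modeled.
class LoopFetcher {
    private boolean connected = true;

    void setConnected(boolean connected) { this.connected = connected; }

    boolean isConnected() { return connected; }
}

// Sketch of the stop strategy as described in this document (logging omitted).
class LoopStopStrategy {
    private final LoopFetcher recordFetcher;
    private boolean retry = true;

    LoopStopStrategy(LoopFetcher recordFetcher) { this.recordFetcher = recordFetcher; }

    boolean canContinue() { return retry; }

    void handle(long partitionLastOffset, Exception exception) {
        retry = false;
        recordFetcher.setConnected(false);
    }
}

class PollLoopSimulation {
    // Polls up to maxPolls times; poll number failAt throws.
    // Returns the poll numbers whose records were processed.
    static List<Integer> run(LoopStopStrategy strategy, int failAt, int maxPolls) {
        List<Integer> processed = new ArrayList<>();
        int pollNumber = 0;
        while (strategy.canContinue() && pollNumber < maxPolls) {
            pollNumber++;
            try {
                if (pollNumber == failAt) {
                    throw new IllegalStateException("simulated poll failure");
                }
                processed.add(pollNumber);          // "Process Records"
            } catch (Exception e) {
                strategy.handle(pollNumber - 1, e); // "Invoke handle()"
            }
        }
        return processed;                           // "Stop Polling Loop"
    }

    public static void main(String[] args) {
        LoopFetcher fetcher = new LoopFetcher();
        LoopStopStrategy strategy = new LoopStopStrategy(fetcher);
        System.out.println(run(strategy, 3, 10));   // prints "[1, 2]"
        System.out.println(fetcher.isConnected());  // prints "false"
    }
}
```

The third poll fails, the strategy flips its flag, and the loop exits on the next condition check, so polls 4 through 10 never happen.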
Summary
`StopErrorStrategy` is a small but important component in Kafka consumer error handling within Apache Camel. It acts as a fail-fast safeguard by stopping the consumer as soon as an exception is handled during polling. This avoids repeated failures and the resource exhaustion or inconsistent consumer state they could cause, making it well suited to scenarios requiring strict failure management and manual recovery.
By implementing the `PollExceptionStrategy` interface, it integrates seamlessly into the Kafka consumer fetch loop, offering users a configurable way to enforce immediate consumer shutdown on critical exceptions.
References
- `PollExceptionStrategy` interface defines the contract for poll exception handling.
- `KafkaFetchRecords` manages Kafka consumer polling and state.
- Apache Camel Kafka Component Error Handling documentation for context and related strategies.