Kafka Consumer Error Handling
Overview
The Kafka Consumer Error Handling module defines a set of strategies that govern how the system responds to exceptions encountered during Kafka consumer polling operations. Polling exceptions can occur for various reasons such as transient network issues, poison messages, authentication failures, or authorization problems. This module provides configurable, pluggable strategies to decide whether to retry, skip, reconnect, stop, or delegate error handling when such exceptions happen.
This capability is crucial to maintaining robust and resilient Kafka consumer behavior within Apache Camel routes, ensuring that message consumption continues smoothly or fails gracefully depending on the scenario and configured strategy.
Core Concepts
Purpose and Problem Addressed
Kafka consumers poll Kafka brokers for messages in a continuous loop. During polling, exceptions may arise which, if unhandled or handled improperly, can cause consumer threads to halt, lose messages, or enter unstable states.
The Kafka Consumer Error Handling module abstracts these exception handling mechanisms into distinct **poll exception strategies**, each implementing a consistent interface. This abstraction helps:
Encapsulate different recovery or failure policies.
Allow users to configure the desired behavior per use case.
Simplify the consumer poll loop by delegating error handling.
Provide fine-grained control over poison message handling, connectivity issues, and security errors.
How the Module Works
PollExceptionStrategy Interface
All error handling strategies implement the `PollExceptionStrategy` interface, which defines:
`handle(long partitionLastOffset, Exception exception)`: Reacts to a poll exception, typically by performing recovery or cleanup actions.
`canContinue()`: Indicates whether the consumer should continue polling after the exception.
`reset()`: (optional) Resets internal state to allow retrying after an error.
This uniform interface enables the Kafka consumer's fetch loop to invoke error handling without coupling to specific strategy logic.
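The contract described above can be sketched as a small Java interface. This is a minimal sketch reconstructed from the method list in this section, not a copy of the component's source: making `reset()` a default no-op is an assumption based on it being described as optional, and `ContractSketch` and `CountingStrategy` are hypothetical names used only for illustration.

```java
// Illustrative sketch only. Apart from PollExceptionStrategy and its three
// methods, every name here is hypothetical.
final class ContractSketch {

    /** The strategy contract as described in this section. */
    interface PollExceptionStrategy {
        /** Reacts to a poll exception (recovery or cleanup). */
        void handle(long partitionLastOffset, Exception exception);

        /** Tells the poll loop whether it may keep polling. */
        boolean canContinue();

        /** Optional: clears internal state. Assumed here to be a no-op by default. */
        default void reset() {
        }
    }

    /** Hypothetical strategy: tolerates a fixed number of failures, then stops. */
    static final class CountingStrategy implements PollExceptionStrategy {
        private final int maxFailures;
        private int failures;

        CountingStrategy(int maxFailures) {
            this.maxFailures = maxFailures;
        }

        @Override
        public void handle(long partitionLastOffset, Exception exception) {
            failures++;
        }

        @Override
        public boolean canContinue() {
            return failures < maxFailures;
        }

        @Override
        public void reset() {
            failures = 0;
        }
    }

    public static void main(String[] args) {
        PollExceptionStrategy strategy = new CountingStrategy(2);
        strategy.handle(41L, new IllegalStateException("first failure"));
        System.out.println("continue after 1 failure: " + strategy.canContinue());
        strategy.handle(41L, new IllegalStateException("second failure"));
        System.out.println("continue after 2 failures: " + strategy.canContinue());
        strategy.reset();
        System.out.println("continue after reset: " + strategy.canContinue());
    }
}
```

The caller only ever sees the three methods, which is what lets the fetch loop stay independent of the concrete policy.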
Available Strategies
The module provides five main strategies, each tailored for a particular error handling policy:
1. Discard Strategy (DiscardErrorStrategy)
Goal: Skip the problematic message causing the exception and continue processing subsequent messages.
Behavior: Seeks the consumer to the next offset after the last known offset, effectively discarding the poison message.
Continuation: Always returns `true` for `canContinue()`, so polling continues.
Use case: When poison messages are rare and should be skipped without interrupting the consumer.
*Example snippet:*
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
    SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
}

@Override
public boolean canContinue() {
    return true;
}
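The effect of seeking past a poison message can be illustrated with a toy, in-memory model. This is a sketch under stated assumptions and does not use the Kafka client: `FakeConsumer` is a hypothetical single-partition stand-in, and the handler simply steps over the consumer's current position. How the real `SeekUtil.seekToNextOffset` derives its target from `partitionLastOffset` is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all names are hypothetical.
final class DiscardSketch {

    /** Hypothetical stand-in for a consumer assigned to a single partition. */
    static final class FakeConsumer {
        private final List<String> log;
        private long position;

        FakeConsumer(List<String> log) {
            this.log = log;
        }

        /** Returns the record at the current position, or null at the end of the log. */
        String poll() {
            if (position >= log.size()) {
                return null;
            }
            String value = log.get((int) position);
            if (value.startsWith("poison")) {
                // The position is not advanced, so the same record would fail again.
                throw new IllegalStateException("cannot read offset " + position);
            }
            position++;
            return value;
        }

        long position() {
            return position;
        }

        void seek(long offset) {
            position = offset;
        }
    }

    /** Polls to the end of the log, discarding any record that makes poll() throw. */
    static List<String> consumeAll(FakeConsumer consumer) {
        List<String> consumed = new ArrayList<>();
        while (true) {
            try {
                String value = consumer.poll();
                if (value == null) {
                    return consumed;
                }
                consumed.add(value);
            } catch (IllegalStateException e) {
                // Discard policy: step over the failing offset and keep polling.
                consumer.seek(consumer.position() + 1);
            }
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(List.of("a", "b", "poison", "c"));
        System.out.println(consumeAll(consumer)); // prints [a, b, c]
    }
}
```

Without the `seek` call the loop would fail on the same offset forever, which is the situation the discard policy is meant to avoid.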
2. Retry Strategy (RetryErrorStrategy)
Goal: Retry polling the same message without advancing the offset.
Behavior: Does not change the consumer offset; simply logs and signals to retry.
Continuation: Always returns `true` to keep polling.
Use case: For transient exceptions where retrying the same message may succeed later.
*Example snippet:*
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
}

@Override
public boolean canContinue() {
    return true;
}
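A retry policy pays off when the failure is transient. The toy model below is a hedged sketch, not component code: `FlakyConsumer` is a hypothetical stand-in whose `poll()` fails a fixed number of times before recovering, and the `maxAttempts` bound is an addition made only to keep the demonstration finite (the strategy described above has no such bound).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all names are hypothetical.
final class RetrySketch {

    /** Hypothetical consumer whose poll() fails a fixed number of times before recovering. */
    static final class FlakyConsumer {
        private final List<String> log;
        private int remainingFailures;
        private int position;

        FlakyConsumer(List<String> log, int failuresBeforeRecovery) {
            this.log = log;
            this.remainingFailures = failuresBeforeRecovery;
        }

        /** Returns the next record, or null at the end of the log. */
        String poll() {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new IllegalStateException("transient broker error");
            }
            return position < log.size() ? log.get(position++) : null;
        }
    }

    /** Retry policy: on failure, leave the position alone and poll again. */
    static List<String> consumeAll(FlakyConsumer consumer, int maxAttempts) {
        List<String> consumed = new ArrayList<>();
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                String value = consumer.poll();
                if (value == null) {
                    break;
                }
                consumed.add(value);
            } catch (IllegalStateException e) {
                // Nothing to do: the offset is unchanged, so the next poll() retries.
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        FlakyConsumer consumer = new FlakyConsumer(List.of("a", "b"), 2);
        System.out.println(consumeAll(consumer, 10));
    }
}
```

If the failure is caused by the record itself rather than by the environment, this policy never makes progress, which is why the discard and bridge strategies seek past the record instead.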
3. Reconnect Strategy (ReconnectErrorStrategy)
Goal: Attempt to recover by reconnecting the Kafka consumer client.
Behavior:
If the exception is an `AuthenticationException`, which is treated as unrecoverable, disables reconnect and marks the consumer as disconnected.
Otherwise, marks the consumer as disconnected and requests a reconnect on the next run.
After handling, signals that the current consumer should close.
Continuation: Returns `false` after handling, which stops the current polling loop; if reconnect was requested, a new consumer is then created.
Use case: For network or broker-side issues where reconnecting the client can resolve the problem.
*Example snippet:*
@Override
public void handle(long partitionLastOffset, Exception exception) {
    if (exception instanceof AuthenticationException) {
        LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect");
        recordFetcher.setReconnect(false);
        recordFetcher.setConnected(false);
    } else {
        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
        recordFetcher.setReconnect(true);
        recordFetcher.setConnected(false);
    }
    retry = false;
}

@Override
public boolean canContinue() {
    return retry;
}
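The two flags set by this strategy can be read as a small decision table. The sketch below is illustrative and rests on assumptions: `FakeFetcher` and `FakeAuthenticationException` are hypothetical stand-ins for the fetcher and for Kafka's `AuthenticationException`, and `nextAction` is a guess at how an outer loop might interpret the flags, not a description of what `KafkaFetchRecords` actually does.

```java
// Illustrative sketch only; all names are hypothetical stand-ins.
final class ReconnectSketch {

    /** Hypothetical stand-in for Kafka's AuthenticationException. */
    static final class FakeAuthenticationException extends RuntimeException {
        FakeAuthenticationException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the fetcher's lifecycle flags. */
    static final class FakeFetcher {
        private boolean connected = true;
        private boolean reconnect = true;

        void setConnected(boolean connected) {
            this.connected = connected;
        }

        void setReconnect(boolean reconnect) {
            this.reconnect = reconnect;
        }

        boolean isConnected() {
            return connected;
        }

        boolean isReconnect() {
            return reconnect;
        }
    }

    /** Mirrors the flag handling shown in the snippet above. */
    static final class ReconnectPolicy {
        private final FakeFetcher fetcher;
        private boolean retry = true;

        ReconnectPolicy(FakeFetcher fetcher) {
            this.fetcher = fetcher;
        }

        void handle(Exception exception) {
            // Reconnect only when the failure is not an authentication error.
            fetcher.setReconnect(!(exception instanceof FakeAuthenticationException));
            fetcher.setConnected(false);
            retry = false; // the current consumer should be closed either way
        }

        boolean canContinue() {
            return retry;
        }
    }

    /** Assumed interpretation of the flags by an outer loop. */
    static String nextAction(FakeFetcher fetcher) {
        if (fetcher.isConnected()) {
            return "keep polling";
        }
        return fetcher.isReconnect() ? "create a new consumer" : "shut down";
    }

    public static void main(String[] args) {
        FakeFetcher fetcher = new FakeFetcher();
        new ReconnectPolicy(fetcher).handle(new java.io.IOException("broker unreachable"));
        System.out.println(nextAction(fetcher));
    }
}
```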
4. Stop Strategy (StopErrorStrategy)
Goal: Terminate the consumer on encountering an exception.
Behavior: Marks the consumer as disconnected and signals polling to stop.
Continuation: Returns `false` from `canContinue()` after handling.
Use case: For critical errors where consumer operation must halt to avoid data corruption or inconsistent state.
*Example snippet:*
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Requesting the consumer to stop based on polling exception strategy");
    retry = false;
    recordFetcher.setConnected(false);
}

@Override
public boolean canContinue() {
    return retry;
}
5. Bridge Error Strategy (BridgeErrorStrategy)
Goal: Delegate exception handling to an external Camel error handler.
Behavior:
Calls the Camel route's bridge error handler to process the exception.
Seeks past the poison message to avoid infinite retries.
Stops polling if the exception is an `AuthenticationException` or `AuthorizationException`.
Continuation: Returns `false` for non-recoverable security exceptions; otherwise continues polling.
Use case: When users want to route poll exceptions into Camel's routing error handling mechanism.
*Example snippet:*
@Override
public void handle(long partitionLastOffset, Exception exception) {
    LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
    recordFetcher.getBridge().handleException(exception);
    SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
    if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
        continueFlag = false;
    }
}

@Override
public boolean canContinue() {
    return continueFlag;
}
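The three steps of the bridge strategy (delegate, seek, decide) can be exercised without Camel or Kafka on the classpath. The following is a minimal sketch under stated assumptions: `RecordingHandler` is a hypothetical stand-in for the error handler bridge, `FakeAuthorizationException` stands in for Kafka's security exceptions, and the seek is modeled as "last offset plus one", following the behavior described in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all names are hypothetical stand-ins.
final class BridgeSketch {

    /** Hypothetical stand-in for Kafka's AuthorizationException. */
    static final class FakeAuthorizationException extends RuntimeException {
        FakeAuthorizationException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the error handler bridge: records what it receives. */
    static final class RecordingHandler {
        final List<Exception> received = new ArrayList<>();

        void handleException(Exception exception) {
            received.add(exception);
        }
    }

    static final class BridgePolicy {
        private final RecordingHandler handler;
        private boolean continueFlag = true;
        private long seekTarget = -1; // where the consumer would be repositioned

        BridgePolicy(RecordingHandler handler) {
            this.handler = handler;
        }

        void handle(long partitionLastOffset, Exception exception) {
            handler.handleException(exception);      // 1. delegate to the error handler
            seekTarget = partitionLastOffset + 1;    // 2. move past the failing record
            if (exception instanceof FakeAuthorizationException) {
                continueFlag = false;                // 3. security errors stop polling
            }
        }

        boolean canContinue() {
            return continueFlag;
        }

        long seekTarget() {
            return seekTarget;
        }
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        BridgePolicy policy = new BridgePolicy(handler);
        policy.handle(7L, new IllegalStateException("bad payload"));
        System.out.println("delegated: " + handler.received.size()
                + ", seek to: " + policy.seekTarget()
                + ", continue: " + policy.canContinue());
    }
}
```

Note that the exception is delegated before the security check, so the error handler sees security exceptions as well as ordinary ones.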
Interaction with Other Modules
KafkaFetchRecords: The consumer fetch task manages the polling loop and invokes the configured `PollExceptionStrategy` upon exceptions during `consumer.poll()`. Strategies like `ReconnectErrorStrategy` and `StopErrorStrategy` manipulate the fetcher’s connected and reconnect flags to control consumer lifecycle.
SeekUtil: Utility class used by some strategies (e.g., `DiscardErrorStrategy`, `BridgeErrorStrategy`) to manually adjust consumer offsets, skipping poison messages.
Camel Route Error Handling: The `BridgeErrorStrategy` integrates with the Camel error handler bridge, allowing poll exceptions to be routed into Camel’s error handling framework for custom processing.
Kafka Consumer: The underlying Kafka consumer instance is manipulated by strategies to seek offsets or close connections as required by the error handling logic.
Sequence Diagram: Kafka Consumer Poll Exception Handling Workflow
sequenceDiagram
participant Fetcher as KafkaFetchRecords (Consumer Fetcher)
participant Consumer as Kafka Consumer
participant Strategy as PollExceptionStrategy
participant SeekUtil as Seek Utility
participant Camel as Camel Error Handler Bridge
Fetcher->>Consumer: poll()
alt Exception occurs during poll
Consumer-->>Fetcher: throws Exception
Fetcher->>Strategy: handle(partitionLastOffset, exception)
opt BridgeErrorStrategy
Strategy->>Camel: handleException(exception)
end
opt DiscardErrorStrategy or BridgeErrorStrategy
Strategy->>SeekUtil: seekToNextOffset(consumer, partitionLastOffset)
end
Strategy-->>Fetcher: canContinue()
alt canContinue = true
Fetcher->>Consumer: poll() (retry)
else canContinue = false
Fetcher->>Fetcher: stop polling or reconnect logic
end
else poll successful
Fetcher->>Fetcher: process records
Fetcher->>Consumer: poll() (next iteration)
end
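The workflow in the diagram can be condensed into a short loop. This is a sketch of the control flow only, under stated assumptions: polls are scripted as a list in which a `String` represents a successful batch and an `Exception` represents a failed poll, and `PollLoopSketch`, `run`, and `fixed` are hypothetical names that do not exist in the component.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; apart from the PollExceptionStrategy method names,
// everything here is hypothetical.
final class PollLoopSketch {

    interface PollExceptionStrategy {
        void handle(long partitionLastOffset, Exception exception);

        boolean canContinue();
    }

    /** Strategy whose canContinue() answer is fixed, to exercise both branches. */
    static PollExceptionStrategy fixed(boolean continueAfterError) {
        return new PollExceptionStrategy() {
            @Override
            public void handle(long partitionLastOffset, Exception exception) {
                // A real strategy would seek, reconnect, or delegate here.
            }

            @Override
            public boolean canContinue() {
                return continueAfterError;
            }
        };
    }

    /** Runs scripted poll outcomes and returns a trace of what the loop did. */
    static List<String> run(List<Object> scriptedPolls, PollExceptionStrategy strategy) {
        List<String> trace = new ArrayList<>();
        long lastOffset = -1; // offset of the last processed batch in this toy model
        for (Object outcome : scriptedPolls) {
            if (outcome instanceof Exception) {
                strategy.handle(lastOffset, (Exception) outcome);
                if (!strategy.canContinue()) {
                    trace.add("stopped");
                    break;
                }
                trace.add("handled");
            } else {
                trace.add("processed " + outcome);
                lastOffset++;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<Object> script = List.of("r0", new IllegalStateException("poll failed"), "r1");
        System.out.println(run(script, fixed(true)));
        System.out.println(run(script, fixed(false)));
    }
}
```

The loop never inspects the exception itself; every policy decision lives behind `handle` and `canContinue`, which is the decoupling the diagram is meant to show.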
Summary
The Kafka Consumer Error Handling module encapsulates the logic for responding to exceptions during Kafka consumer polling into modular strategies. Each strategy implements one policy: skipping the poison message, retrying, reconnecting, stopping, or delegating to Camel's error handling. This design lets consumer behavior be adapted to different operational scenarios, improving fault tolerance and message processing reliability within the Kafka component.