BridgeErrorStrategy.java
Overview
`BridgeErrorStrategy` is a concrete implementation of the `PollExceptionStrategy` interface designed for Apache Camel's Kafka component. Its primary purpose is to delegate exception handling that occurs during Kafka consumer polling to Camel's built-in error handling infrastructure (the "bridge error handler"). This allows Kafka polling exceptions to be managed by Camel's routing error handlers, enabling integration with Camel's error handling features such as retries, dead-letter channels, and custom error processors.
The strategy also manages Kafka consumer offset handling by skipping poison messages (messages causing exceptions) to prevent consumer thread stalls. Additionally, it stops polling on critical security-related exceptions (authentication or authorization failures).
Class: BridgeErrorStrategy
Package
org.apache.camel.component.kafka.consumer.errorhandler
Purpose
Handles Kafka consumer poll exceptions by delegating to Camel's error handler bridge and managing consumer offsets, with logic to stop polling on security failures.
Declaration
public class BridgeErrorStrategy implements PollExceptionStrategy
Fields
Field | Type | Description |
|---|---|---|
`LOG` | `Logger` | SLF4J logger for logging warnings and info messages. |
`recordFetcher` | `KafkaFetchRecords` | Reference to the Kafka fetcher managing polling and offset operations. |
`consumer` | `Consumer` | Kafka consumer instance used to commit or seek offsets. |
`continueFlag` | `boolean` | Flag indicating whether polling should continue after an exception; initialized as `true`. |
Constructor
public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consumer)
Parameters:
recordFetcher: the Kafka fetcher instance that manages polling and error handling integration.consumer: the Kafka consumer instance whose offset will be manipulated.
Description:
Initializes the strategy with references to the record fetcher and Kafka consumer to enable delegation and offset management.
Methods
boolean canContinue()
Description:
Returns whether the consumer should continue polling after handling an exception.Returns:
trueif polling can continue;falseif polling should stop (e.g., in case of authentication or authorization failures).Usage Example:
if (!bridgeErrorStrategy.canContinue()) {
// Stop polling or shut down consumer
}
void handle(long partitionLastOffset, Exception exception)
Parameters:
partitionLastOffset: the last committed offset for the partition where the exception occurred.exception: theExceptioninstance thrown during polling.
Description:
Handles exceptions raised during Kafka consumer polling by:Logging a warning that handling is being delegated.
Delegating the exception to Camel's bridge error handler via
recordFetcher.getBridge().handleException(exception).Using
SeekUtil.seekToNextOffset(consumer, partitionLastOffset)to skip the problematic (poison) message by seeking to the next offset.Checking if the exception is a Kafka
AuthenticationExceptionorAuthorizationException; if so, setscontinueFlagtofalseto stop polling.
Usage Example:
try {
// Kafka consumer poll code...
} catch (Exception e) {
bridgeErrorStrategy.handle(lastOffset, e);
if (!bridgeErrorStrategy.canContinue()) {
// Handle shutdown
}
}
Important Implementation Details
Delegation to Camel's Error Handler:
The strategy leverages therecordFetcherto access Camel's error handling bridge, invokinghandleException(exception). This allows Kafka poll exceptions to be processed within Camel's routing error handling framework, supporting features like retries and dead-letter channels.Offset Management:
To avoid the consumer getting stuck on a poison message that caused the exception, the strategy advances the Kafka consumer offset to the next message usingSeekUtil.seekToNextOffset. This action skips the problematic message for subsequent polling.Security Exception Handling:
If the exception is a security-related Kafka exception (AuthenticationExceptionorAuthorizationException), the strategy signals that polling should stop by settingcontinueFlagtofalse. This prevents the consumer from repeatedly failing on unrecoverable security errors.Logging:
Warning logs are emitted to inform about the delegation and handling actions, aiding in debugging and monitoring.
Interaction with Other Components
KafkaFetchRecords (
recordFetcher):
TheBridgeErrorStrategyuses this fetcher to:Access Camel's error handler bridge.
Control consumer lifecycle indirectly by managing offsets.
Kafka Consumer (
consumer):
The strategy manipulates the Kafka consumer's offset viaSeekUtilto skip poison messages.SeekUtil:
A utility class responsible for seeking a Kafka consumer to a specific offset. Here, used to jump past the last offset on error.Camel Error Handler Bridge:
The error handler bridge within Camel handles the exception passed byBridgeErrorStrategy. This enables complex error handling scenarios defined in Camel routes.
Usage Summary
`BridgeErrorStrategy` is suitable when the developer wants to:
Integrate Kafka consumer polling exceptions into Camel's error handling mechanisms.
Automatically skip poison messages without stopping the consumer.
Stop polling on critical security exceptions.
Leverage Camel's error handling capabilities for retries, dead-letter queues, or other custom error policies.
Visual Diagram: Class Structure
classDiagram
class BridgeErrorStrategy {
- static final Logger LOG
- KafkaFetchRecords recordFetcher
- Consumer<?, ?> consumer
- boolean continueFlag
+ BridgeErrorStrategy(recordFetcher: KafkaFetchRecords, consumer: Consumer<?, ?>)
+ boolean canContinue()
+ void handle(partitionLastOffset: long, exception: Exception)
}
BridgeErrorStrategy ..|> PollExceptionStrategy
Sequence Diagram: Exception Handling Workflow
sequenceDiagram
participant KafkaConsumer as Kafka Consumer
participant BridgeStrategy as BridgeErrorStrategy
participant KafkaFetchRecords as Record Fetcher
participant CamelErrorHandler as Camel Error Handler Bridge
KafkaConsumer->>BridgeStrategy: Poll Exception Occurs (exception, partitionOffset)
BridgeStrategy->>KafkaFetchRecords: getBridge().handleException(exception)
KafkaFetchRecords->>CamelErrorHandler: handleException(exception)
BridgeStrategy->>KafkaConsumer: Seek to next offset (partitionOffset + 1)
alt Authentication/Authorization Exception
BridgeStrategy-->>KafkaConsumer: Signal stop polling (canContinue = false)
else Other Exception
BridgeStrategy-->>KafkaConsumer: Continue polling (canContinue = true)
end
Summary
The `BridgeErrorStrategy` class provides a robust mechanism to handle Kafka consumer poll exceptions by bridging them into Camel's error handling system. It effectively balances offset management to avoid consumer stalls with flexible error processing capabilities, enabling developers to leverage Camel's rich error-handling features for Kafka consumer exceptions. Its design ensures safe continuation or controlled shutdown of Kafka consumers based on the nature of encountered exceptions, particularly security-related ones.
Appendix: Example Usage in Context
KafkaFetchRecords fetcher = ...; // Kafka consumer fetcher instance
Consumer<byte[], byte[]> consumer = ...; // Kafka consumer instance
BridgeErrorStrategy strategy = new BridgeErrorStrategy(fetcher, consumer);
try {
// Kafka poll logic
} catch (Exception e) {
strategy.handle(lastCommittedOffset, e);
if (!strategy.canContinue()) {
// Shutdown or restart consumer logic
}
}
This usage pattern shows how `BridgeErrorStrategy` seamlessly integrates into the Kafka polling loop, providing error delegation and offset management with polling continuation control.