BridgeErrorStrategy.java


Overview

`BridgeErrorStrategy` is a concrete implementation of the `PollExceptionStrategy` interface in Apache Camel's Kafka component. It hands exceptions raised during Kafka consumer polling to Camel's built-in error handling infrastructure (the "bridge error handler"), so that the route's error handlers can apply features such as retries, dead-letter channels, and custom error processors.

After delegating, the strategy seeks past the poison message (the message that caused the exception) so that the consumer thread does not stall on it. It also stops polling on security-related exceptions, namely authentication or authorization failures.


Class: BridgeErrorStrategy

Package

org.apache.camel.component.kafka.consumer.errorhandler

Purpose

Handles Kafka consumer poll exceptions by delegating to Camel's error handler bridge and managing consumer offsets, with logic to stop polling on security failures.

Declaration

public class BridgeErrorStrategy implements PollExceptionStrategy

Fields

| Field | Type | Description |
| --- | --- | --- |
| `LOG` | `Logger` | SLF4J logger for logging warnings and info messages. |
| `recordFetcher` | `KafkaFetchRecords` | Reference to the Kafka fetcher managing polling and offset operations. |
| `consumer` | `Consumer` | Kafka consumer instance used to commit or seek offsets. |
| `continueFlag` | `boolean` | Flag indicating whether polling should continue after an exception; initialized as `true`. |

Constructor

public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consumer)

Methods

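boolean canContinue()

Reports whether polling should continue after an exception has been handled. It reflects `continueFlag`, which starts as `true`. A caller can check it like this: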

if (!bridgeErrorStrategy.canContinue()) {
    // Stop polling or shut down consumer
}

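void handle(long partitionLastOffset, Exception exception)

Passes `exception` to Camel's bridge error handler, seeks the consumer to `partitionLastOffset + 1` to skip the failing message, and marks polling as stopped when the exception is an authentication or authorization failure. A typical call site: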

try {
    // Kafka consumer poll code...
} catch (Exception e) {
    bridgeErrorStrategy.handle(lastOffset, e);
    if (!bridgeErrorStrategy.canContinue()) {
        // Handle shutdown
    }
}

Important Implementation Details


Interaction with Other Components


Usage Summary

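`BridgeErrorStrategy` is suitable when the developer wants to route Kafka poll exceptions through Camel's error handlers, skip poison messages instead of stalling the consumer, and stop polling on authentication or authorization failures.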


Visual Diagram: Class Structure

classDiagram
    class BridgeErrorStrategy {
        - static final Logger LOG
        - KafkaFetchRecords recordFetcher
        - Consumer<?, ?> consumer
        - boolean continueFlag
        + BridgeErrorStrategy(recordFetcher: KafkaFetchRecords, consumer: Consumer<?, ?>)
        + boolean canContinue()
        + void handle(partitionLastOffset: long, exception: Exception)
    }
    BridgeErrorStrategy ..|> PollExceptionStrategy

Sequence Diagram: Exception Handling Workflow

sequenceDiagram
    participant KafkaConsumer as Kafka Consumer
    participant BridgeStrategy as BridgeErrorStrategy
    participant KafkaFetchRecords as Record Fetcher
    participant CamelErrorHandler as Camel Error Handler Bridge

    KafkaConsumer->>BridgeStrategy: Poll Exception Occurs (exception, partitionOffset)
    BridgeStrategy->>KafkaFetchRecords: getBridge().handleException(exception)
    KafkaFetchRecords->>CamelErrorHandler: handleException(exception)
    BridgeStrategy->>KafkaConsumer: Seek to next offset (partitionOffset + 1)
    alt Authentication/Authorization Exception
        BridgeStrategy-->>KafkaConsumer: Signal stop polling (canContinue = false)
    else Other Exception
        BridgeStrategy-->>KafkaConsumer: Continue polling (canContinue = true)
    end
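The workflow in the sequence diagram above can be sketched in plain Java. This is a minimal illustration, not the Camel source: `PollExceptionStrategySketch`, `RecordingBridge`, `SeekableConsumer`, `AuthenticationFailure`, and `AuthorizationFailure` are hypothetical stand-ins for the Camel and Kafka types, introduced so the example compiles without either library.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for PollExceptionStrategy, reduced to the two methods documented above.
interface PollExceptionStrategySketch {
    boolean canContinue();

    void handle(long partitionLastOffset, Exception exception);
}

// Hypothetical stand-ins for Kafka's authentication and authorization exceptions.
class AuthenticationFailure extends RuntimeException {
    AuthenticationFailure(String message) {
        super(message);
    }
}

class AuthorizationFailure extends RuntimeException {
    AuthorizationFailure(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the bridge error handler; it only records what it receives.
class RecordingBridge {
    final List<Exception> received = new ArrayList<>();

    void handleException(Exception exception) {
        received.add(exception);
    }
}

// Hypothetical stand-in for the Kafka consumer; it only tracks the seek position.
class SeekableConsumer {
    long position;

    void seek(long offset) {
        position = offset;
    }
}

class BridgeStrategySketch implements PollExceptionStrategySketch {
    private final RecordingBridge bridge;
    private final SeekableConsumer consumer;
    private boolean continueFlag = true; // polling continues until a security failure

    BridgeStrategySketch(RecordingBridge bridge, SeekableConsumer consumer) {
        this.bridge = bridge;
        this.consumer = consumer;
    }

    @Override
    public boolean canContinue() {
        return continueFlag;
    }

    @Override
    public void handle(long partitionLastOffset, Exception exception) {
        // Step 1: delegate the exception to the bridge error handler.
        bridge.handleException(exception);
        // Step 2: skip the failing message by seeking to the next offset.
        consumer.seek(partitionLastOffset + 1);
        // Step 3: stop polling on authentication or authorization failures.
        if (exception instanceof AuthenticationFailure
                || exception instanceof AuthorizationFailure) {
            continueFlag = false;
        }
    }
}
```

In this sketch the seek happens for every exception, including security failures, which matches the order of the arrows in the diagram. Only the flag differs between the two branches.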

Summary

The `BridgeErrorStrategy` class bridges Kafka consumer poll exceptions into Camel's error handling system, so that Camel's retries, dead-letter channels, and custom error processors apply to them. After delegating an exception it seeks past the failing offset so the consumer does not stall, and it signals the polling loop to stop when the exception is an authentication or authorization failure. For any other exception, polling continues.


Appendix: Example Usage in Context

KafkaFetchRecords fetcher = ...; // Kafka consumer fetcher instance
Consumer<byte[], byte[]> consumer = ...; // Kafka consumer instance

BridgeErrorStrategy strategy = new BridgeErrorStrategy(fetcher, consumer);

try {
    // Kafka poll logic
} catch (Exception e) {
    // partitionLastOffset: last offset seen for the partition; the strategy seeks to the next one
    strategy.handle(partitionLastOffset, e);
    if (!strategy.canContinue()) {
        // Shutdown or restart consumer logic
    }
}

This usage pattern places `BridgeErrorStrategy` in the catch block of the Kafka polling loop: `handle` delegates the error and advances the offset, and `canContinue` tells the loop whether to keep polling.
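To experiment with the calling side of this pattern without a Kafka broker, the loop can be simulated with plain Java. The following is a sketch under stated assumptions: `PollErrorPolicy` is a hypothetical stand-in for the strategy contract (only `canContinue` and `handle`), `PollLoopSketch` is a hypothetical driver, and each simulated poll is a `Callable<Long>` that either returns the last offset it delivered or throws.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the strategy contract used by the loop.
interface PollErrorPolicy {
    boolean canContinue();

    void handle(long partitionLastOffset, Exception exception);
}

class PollLoopSketch {
    /**
     * Runs the simulated polls in order and returns how many were attempted.
     * The loop stops early once the policy reports that polling must not continue.
     */
    static int run(List<Callable<Long>> polls, PollErrorPolicy policy) {
        long lastOffset = -1L; // no record seen yet
        int attempted = 0;
        for (Callable<Long> poll : polls) {
            if (!policy.canContinue()) {
                break; // the policy asked the loop to stop
            }
            attempted++;
            try {
                // A successful poll reports the last offset it delivered.
                lastOffset = poll.call();
            } catch (Exception e) {
                // A failed poll is handed to the policy with the last known offset.
                policy.handle(lastOffset, e);
            }
        }
        return attempted;
    }
}
```

The driver checks `canContinue` before each poll rather than only inside the catch block, so a stop signal raised by one failure prevents every later poll from running.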