KafkaConsumerListener.java


Overview

`KafkaConsumerListener.java` defines the `KafkaConsumerListener` class, a key component in the Apache Camel Kafka consumer error handling mechanism. This class implements the `ConsumerListener` interface and is responsible for managing the pause and resume behavior of Kafka consumers based on processing outcomes.

The listener monitors message consumption and processing results to detect failures. When a failure occurs, it pauses the Kafka consumer to prevent further message consumption and optionally seeks to the beginning or end of the assigned partitions depending on a configured `SeekPolicy`. It also evaluates conditions for resuming the consumer, enabling a controlled and safe continuation of message flow.

This class ties error handling to the Kafka consumer lifecycle: a processing failure stops further consumption in a Camel Kafka route until the configured resume condition is met.


Class: KafkaConsumerListener

Package

org.apache.camel.component.kafka.consumer.errorhandler

Implements

ConsumerListener<Object, ProcessingResult>

Purpose

To observe Kafka consumer events related to message consumption and processing, and manage the consumer's pause and resume state based on processing success or failure.


Properties

| Property | Type | Description |
| --- | --- | --- |
| `consumer` | `Consumer` | The Kafka consumer instance controlled by this listener. |
| `seekPolicy` | `SeekPolicy` | Policy that dictates where to seek in the topic partitions upon failure (`BEGINNING` or `END`). |
| `afterConsumeEval` | `Predicate` | A predicate evaluated after consumption to decide if the consumer should be resumed. |
| `paused` | `boolean` | Internal flag indicating whether the consumer is currently paused. |
| `LOG` | `Logger` | Logger instance for logging events and warnings. |


Methods

Getter and Setter for consumer

public Consumer<?, ?> getConsumer()
public void setConsumer(Consumer<?, ?> consumer)

Getter and Setter for seekPolicy

public SeekPolicy getSeekPolicy()
public void setSeekPolicy(SeekPolicy seekPolicy)

setResumableCheck

@Override
public void setResumableCheck(Predicate<?> afterConsumeEval)
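
A resumable check is an ordinary predicate whose argument can be ignored. The following is a minimal sketch, assuming the listener evaluates the predicate with a `null` argument; `ResumableCheckSketch`, `fromFlag`, and `evaluate` are hypothetical helper names for illustration and are not part of Camel.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical helper (not part of Camel): builds a resumable check from a flag.
final class ResumableCheckSketch {
    private ResumableCheckSketch() {
    }

    // The predicate ignores its argument, so it is safe to call it with null.
    static Predicate<?> fromFlag(AtomicBoolean recovered) {
        return ignored -> recovered.get();
    }

    // A holder of Predicate<?> can only pass null, because null is the one
    // value that type-checks against an unknown wildcard parameter.
    static boolean evaluate(Predicate<?> check) {
        return check.test(null);
    }
}
```

A predicate built this way could be passed to `setResumableCheck`, with some other part of the application flipping the flag once the failure has been dealt with.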

afterConsume

@Override
public boolean afterConsume(Object ignored)

Example usage:

KafkaConsumerListener listener = new KafkaConsumerListener();
listener.setConsumer(kafkaConsumer);
// A Predicate takes one argument; it is ignored here.
listener.setResumableCheck(ignored -> {
    // logic to determine if consumer can resume, e.g., external flag check
    return isSystemRecovered();
});
boolean canContinue = listener.afterConsume(null);
if (!canContinue) {
    // wait or log pause state
}

afterProcess

@Override
public boolean afterProcess(ProcessingResult result)

Example usage:

ProcessingResult result = processMessage(record);
if (!listener.afterProcess(result)) {
    // stop further processing, consumer is paused
}
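
The pause-and-seek behavior described in the overview can be sketched without a Kafka dependency. This is an illustration under stated assumptions, not the class's actual source: `PausableConsumer` and `RecordingConsumer` are hypothetical stand-ins for the Kafka `Consumer`, the nested `SeekPolicy` enum only mirrors the two values named in this document, and the order of operations (pause first, then seek) is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types; the real listener works against the Kafka Consumer API.
final class PauseOnFailureSketch {
    enum SeekPolicy { BEGINNING, END }

    // The subset of consumer operations this sketch needs.
    interface PausableConsumer {
        void pause();
        void seekToBeginning();
        void seekToEnd();
    }

    // Test double that records the calls it receives, in order.
    static final class RecordingConsumer implements PausableConsumer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void pause() {
            calls.add("pause");
        }

        @Override
        public void seekToBeginning() {
            calls.add("seekToBeginning");
        }

        @Override
        public void seekToEnd() {
            calls.add("seekToEnd");
        }
    }

    // Returns true when processing may continue, false when the consumer was paused.
    static boolean afterProcess(boolean failed, SeekPolicy policy, PausableConsumer consumer) {
        if (!failed) {
            return true;
        }
        consumer.pause();
        // Seeking is optional: a null policy leaves the position untouched.
        if (policy == SeekPolicy.BEGINNING) {
            consumer.seekToBeginning();
        } else if (policy == SeekPolicy.END) {
            consumer.seekToEnd();
        }
        return false;
    }
}
```

With a `RecordingConsumer`, a failed result and `SeekPolicy.BEGINNING` record `pause` followed by `seekToBeginning`, while a successful result records nothing.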

Important Implementation Details


Interaction with Other Components


Usage Scenario

`KafkaConsumerListener` is typically instantiated and configured within the Kafka consumer infrastructure in Camel. It is assigned the Kafka consumer instance and optionally a `SeekPolicy` and a predicate to control resumption.

When a message fails processing, the listener pauses the consumer and, if a `SeekPolicy` is configured, seeks to the corresponding position, which prevents further message consumption. The predicate is then evaluated on subsequent `afterConsume` calls to determine whether conditions have improved (e.g., error resolved, manual intervention done) and consumption can resume safely.
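
The control flow of this scenario can be sketched as a simplified polling loop. This is an assumption-laden illustration rather than Camel's actual worker code: `PollLoopSketch`, its `Listener` interface, and the `run` method are hypothetical, the outcomes are simulated booleans instead of Kafka records, and the failed record is not re-read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a worker loop driving the two listener callbacks.
final class PollLoopSketch {
    interface Listener {
        // true: the resume condition holds and the consumer may proceed
        boolean afterConsume();

        // false: processing failed and the consumer was paused
        boolean afterProcess(boolean failed);
    }

    // outcomes holds one entry per record, true meaning "processing failed".
    // Returns a trace of what happened on each poll.
    static List<String> run(List<Boolean> outcomes, Listener listener, int maxPolls) {
        List<String> trace = new ArrayList<>();
        boolean paused = false;
        int next = 0;
        for (int poll = 0; poll < maxPolls && next < outcomes.size(); poll++) {
            if (paused) {
                if (!listener.afterConsume()) {
                    trace.add("waiting");
                    continue;
                }
                paused = false;
                trace.add("resumed");
            }
            boolean failed = outcomes.get(next++);
            trace.add(failed ? "failed" : "processed");
            if (!listener.afterProcess(failed)) {
                paused = true;
            }
        }
        return trace;
    }
}
```

The `maxPolls` bound keeps the simulation from spinning forever when the resume condition never becomes true; a real worker would instead keep polling at its configured interval.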


Visual Diagram

classDiagram
    class KafkaConsumerListener {
        -Consumer<?, ?> consumer
        -SeekPolicy seekPolicy
        -Predicate<?> afterConsumeEval
        -boolean paused
        +getConsumer(): Consumer<?, ?>
        +setConsumer(Consumer<?, ?>)
        +getSeekPolicy(): SeekPolicy
        +setSeekPolicy(SeekPolicy)
        +setResumableCheck(Predicate<?>)
        +afterConsume(Object): boolean
        +afterProcess(ProcessingResult): boolean
    }
    KafkaConsumerListener ..> Consumer : controls
    KafkaConsumerListener ..> SeekPolicy : uses
    KafkaConsumerListener ..> Predicate : evaluates
    KafkaConsumerListener ..> ProcessingResult : checks processing outcome

Summary

`KafkaConsumerListener` is a small class that implements pause and resume behavior for the Camel Kafka consumer. It pauses consumption when processing fails, optionally seeks according to the configured `SeekPolicy`, and resumes once the configured predicate allows it. This keeps error handling, offset positioning, and consumer control in one place.


Appendix: Related Concepts

| Concept | Description |
| --- | --- |
| **SeekPolicy** | Enum defining whether to seek to the beginning or end of partitions after a failure. |
| **ProcessingResult** | Encapsulates the outcome of message processing (success or failure). |
| **ConsumerListener** | Interface for listening to consumer events such as after consume and after process. |
| **KafkaFetchRecords** | Worker thread class that polls Kafka and delegates to the listener for pause/resume control. |

