KafkaConsumerListener.java
Overview
`KafkaConsumerListener.java` defines the `KafkaConsumerListener` class, a key component in the Apache Camel Kafka consumer error handling mechanism. This class implements the `ConsumerListener` interface and is responsible for managing the pause and resume behavior of Kafka consumers based on processing outcomes.
The listener monitors message consumption and processing results to detect failures. When a failure occurs, it pauses the Kafka consumer to prevent further message consumption and optionally seeks to the beginning or end of the assigned partitions depending on a configured `SeekPolicy`. It also evaluates conditions for resuming the consumer, enabling a controlled and safe continuation of message flow.
This class plays an essential role in integrating error handling with the Kafka consumer lifecycle, ensuring robust and fault-tolerant message processing within Camel Kafka routes.
Class: KafkaConsumerListener
Package
org.apache.camel.component.kafka.consumer.errorhandler
Implements
ConsumerListener<Object, ProcessingResult>
Purpose
To observe Kafka consumer events related to message consumption and processing, and manage the consumer's pause and resume state based on processing success or failure.
Properties
Property | Type | Description |
|---|---|---|
`consumer` | `Consumer` | The Kafka consumer instance controlled by this listener. |
`seekPolicy` | `SeekPolicy` | Policy that dictates where to seek in the topic partitions upon failure (`BEGINNING` or `END`). |
`afterConsumeEval` | `Predicate` | A predicate evaluated after consumption to decide if the consumer should be resumed. |
`paused` | `boolean` | Internal flag indicating whether the consumer is currently paused. |
`LOG` | `Logger` | Logger instance for logging events and warnings. |
Methods
Getter and Setter for consumer
public Consumer<?, ?> getConsumer()
Returns the Kafka consumer instance associated with this listener.
public void setConsumer(Consumer<?, ?> consumer)
Assigns the Kafka consumer instance to this listener.
Getter and Setter for seekPolicy
public SeekPolicy getSeekPolicy()
Returns the current seek policy.
public void setSeekPolicy(SeekPolicy seekPolicy)
Sets the seek policy, which determines where the consumer seeks upon failure.
setResumableCheck
@Override
public void setResumableCheck(Predicate<?> afterConsumeEval)
Sets the predicate used to evaluate whether the consumer can resume after being paused.
Parameters:
afterConsumeEval: APredicate<?>that returnstrueif the consumer should resume.
afterConsume
@Override
public boolean afterConsume(Object ignored)
Called after message consumption but before processing.
If the consumer is paused, it evaluates the
afterConsumeEvalpredicate:If the predicate returns
true, the consumer is resumed, andtrueis returned.Otherwise, the consumer remains paused and
falseis returned.
If the consumer is not paused, returns
trueto allow continued processing.Parameters:
ignored: Unused parameter, typically the consumed record or context.
Returns:
trueif the consumer should continue processing.falseif the consumer is paused and should not continue yet.
Usage Example:
KafkaConsumerListener listener = new KafkaConsumerListener();
listener.setConsumer(kafkaConsumer);
listener.setResumableCheck(() -> {
// logic to determine if consumer can resume, e.g., external flag check
return isSystemRecovered();
});
boolean canContinue = listener.afterConsume(null);
if (!canContinue) {
// wait or log pause state
}
afterProcess
@Override
public boolean afterProcess(ProcessingResult result)
Called after message processing completes.
If processing failed (
result.isFailed()), this method:Pauses the Kafka consumer.
Sets the internal
pausedflag.Seeks to the beginning or end of the assigned partitions depending on
seekPolicy.Returns
falseto indicate processing should halt.
If processing succeeded, returns
trueto continue.Parameters:
result: AProcessingResultobject indicating success or failure of processing.
Returns:
falseif processing failed and consumer is paused.trueif processing succeeded.
Usage Example:
ProcessingResult result = processMessage(record);
if (!listener.afterProcess(result)) {
// stop further processing, consumer is paused
}
Important Implementation Details
Pause and Seek Logic: When processing fails, the listener uses Kafka's
pause()on assigned partitions to halt consumption. It then seeks to the configured position (BEGINNINGorEND) to reprocess or skip messages.Resuming Condition: The class relies on an externally provided
Predicate<?>to determine if the pause condition has been resolved and the consumer can safely resume.Type Safety: The
afterConsumemethod uses a suppressed warning for unused parameter because the interface demands an argument, but the listener does not use it.Logging: Uses SLF4J logger to warn about pause/resume events and state changes, aiding diagnostics.
Interaction with Other Components
Kafka Consumer: The listener directly controls the Kafka consumer instance by invoking
pause(),resume(), and seek operations.SeekPolicy Enum: Guides offset repositioning on failure, critical for correct reprocessing semantics.
ProcessingResult: Represents the outcome of message processing, used to decide pause/resume actions.
ConsumerListener Interface: This class is used as a callback interface by the Kafka consumption framework (e.g.,
KafkaFetchRecords) to hook into consumption and processing lifecycle events.Pause and Resume Mechanism: Integrates with higher-level pause/resume lifecycle management of fetch tasks and consumers, enabling coordinated consumer control in the Camel Kafka component.
Usage Scenario
`KafkaConsumerListener` is typically instantiated and configured within the Kafka consumer infrastructure in Camel. It is assigned the Kafka consumer instance and optionally a `SeekPolicy` and a predicate to control resumption.
When a message fails processing, the listener pauses the consumer and seeks to the configured position, preventing further message consumption. It then periodically evaluates the predicate to determine if conditions have improved (e.g., error resolved, manual intervention done) to resume consumption safely.
Visual Diagram
classDiagram
class KafkaConsumerListener {
-Consumer<?, ?> consumer
-SeekPolicy seekPolicy
-Predicate<?> afterConsumeEval
-boolean paused
+getConsumer(): Consumer<?, ?>
+setConsumer(Consumer<?, ?>)
+getSeekPolicy(): SeekPolicy
+setSeekPolicy(SeekPolicy)
+setResumableCheck(Predicate<?>)
+afterConsume(Object): boolean
+afterProcess(ProcessingResult): boolean
}
KafkaConsumerListener ..> Consumer : controls
KafkaConsumerListener ..> SeekPolicy : uses
KafkaConsumerListener ..> Predicate : evaluates
KafkaConsumerListener ..> ProcessingResult : checks processing outcome
Summary
`KafkaConsumerListener` is a concise, yet crucial class implementing pause and resume behavior in the Camel Kafka consumer framework. By responding to processing results, it controls the Kafka consumer lifecycle to halt message consumption on failures and resume consumption based on configurable conditions. Its design ensures error handling is tightly integrated with offset management and consumer control, promoting resilient, controlled Kafka message processing.
Appendix: Related Concepts
Concept | Description |
|---|---|
**SeekPolicy** | Enum defining whether to seek to the beginning or end of partitions after a failure. |
**ProcessingResult** | Encapsulates the outcome of message processing (success or failure). |
**ConsumerListener** | Interface for listening to consumer events such as after consume and after process. |
**KafkaFetchRecords** | Worker thread class that polls Kafka and delegates to the listener for pause/resume control. |