DiscardErrorStrategy.java


Overview

`DiscardErrorStrategy.java` implements a **poll exception handling strategy** for Kafka consumers within the Apache Camel Kafka component. Its primary role is to define how the Kafka consumer should behave when an exception occurs during the polling of messages from Kafka brokers.

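Specifically, the `DiscardErrorStrategy` logs a warning, seeks the consumer to the offset after the last one it was given so that the failing message is skipped, and reports through `canContinue()` that polling should resume.

This approach keeps the Kafka consumer from stalling or retrying indefinitely on poison messages, so consumption continues in environments where occasional malformed or problematic messages occur.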


Class: DiscardErrorStrategy

public class DiscardErrorStrategy implements PollExceptionStrategy

Purpose

Implements the `PollExceptionStrategy` interface to provide a discard-on-error strategy for Kafka polling exceptions.


Properties

| Property | Type | Description |
|---|---|---|
| `consumer` | `Consumer` | The Kafka consumer instance to manipulate offsets. |
| `LOG` | `Logger` | Logger instance for warning and informational logs. |


Constructor

public DiscardErrorStrategy(Consumer<?, ?> consumer)

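Parameters

`consumer` (`Consumer<?, ?>`): the Kafka consumer whose position the strategy moves.

Description

Initializes the strategy with the consumer instance that it will later seek forward in order to skip messages that cause exceptions.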


Methods

handle(long partitionLastOffset, Exception exception)

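@Override
public void handle(long partitionLastOffset, Exception exception)

Logs a warning, then seeks the consumer to the offset after `partitionLastOffset` so that polling resumes past the message that triggered `exception`.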

**Usage Example:**

DiscardErrorStrategy strategy = new DiscardErrorStrategy(consumer);
try {
    // Poll messages
} catch (Exception e) {
    long lastOffset = ...; // obtain last offset processed
    strategy.handle(lastOffset, e);
}

canContinue()

@Override
public boolean canContinue()

Returns `true`, telling the fetch loop to keep polling after the exception has been handled.

Implementation Details
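
The class body is not reproduced in this document, so the following is only a minimal sketch of the behaviour described here, not the actual Camel source. `OffsetSeeker` is a hypothetical stand-in for the Kafka `Consumer` (so the example compiles without the Kafka client library), `java.util.logging` stands in for whatever logger the real class uses, and reading "next offset" as `partitionLastOffset + 1` is an assumption.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the single capability this sketch needs from
// the Kafka Consumer: moving the read position. Not a Kafka or Camel type.
interface OffsetSeeker {
    void seek(long offset);
}

// Shape inferred from the two overridden methods this page documents.
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);

    boolean canContinue();
}

final class DiscardErrorStrategySketch implements PollExceptionStrategy {
    private static final Logger LOG =
            Logger.getLogger(DiscardErrorStrategySketch.class.getName());

    private final OffsetSeeker consumer;

    DiscardErrorStrategySketch(OffsetSeeker consumer) {
        this.consumer = consumer;
    }

    @Override
    public void handle(long partitionLastOffset, Exception exception) {
        LOG.warning("Discarding the failing message and continuing: " + exception);
        // Assumption: "next offset" means the offset directly after the one given.
        consumer.seek(partitionLastOffset + 1);
    }

    @Override
    public boolean canContinue() {
        // Always continue: this strategy never asks the fetch loop to stop.
        return true;
    }
}
```

Calling `handle(41L, e)` on this sketch asks the stand-in consumer to seek to offset 42, and `canContinue()` then returns `true`.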


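Interaction with Other System Components

The strategy is invoked by the Kafka fetch loop when a poll fails, acts on the Kafka consumer it was constructed with, and plugs into the component through the `PollExceptionStrategy` interface.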


Example Workflow

When a poll exception occurs during Kafka message consumption:

  1. The fetch loop catches the exception.

  2. It calls DiscardErrorStrategy.handle(lastOffset, exception).

  3. The strategy logs a warning.

  4. It instructs the Kafka consumer to seek to the next offset after partitionLastOffset.

  5. The fetch loop queries canContinue(), which returns true.

  6. The fetch loop resumes polling from the new offset, effectively skipping the problematic message.
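
The six steps above can be simulated without a broker. The sketch below is an illustration under stated assumptions, not the Camel fetch loop: `FakePartition` is a hypothetical in-memory stand-in for a Kafka consumer, records starting with `POISON` simulate a poll failure, and the loop passes the offset at which the poll failed so that seeking to the following offset skips that record. How the real fetch loop derives `partitionLastOffset` is not shown in this document.

```java
import java.util.ArrayList;
import java.util.List;

// Shape inferred from the two methods this page documents.
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);

    boolean canContinue();
}

// Hypothetical in-memory partition standing in for a Kafka consumer.
final class FakePartition {
    private final List<String> records;
    private long position = 0;

    FakePartition(List<String> records) {
        this.records = records;
    }

    boolean hasNext() {
        return position < records.size();
    }

    long position() {
        return position;
    }

    void seek(long offset) {
        position = offset;
    }

    // Returns the record at the current position and advances;
    // throws on a poison record and leaves the position unchanged.
    String poll() {
        String value = records.get((int) position);
        if (value.startsWith("POISON")) {
            throw new IllegalStateException("cannot read record at offset " + position);
        }
        position++;
        return value;
    }
}

final class FetchLoopSketch {
    static List<String> run(List<String> input) {
        FakePartition partition = new FakePartition(input);
        PollExceptionStrategy strategy = new PollExceptionStrategy() {
            @Override
            public void handle(long partitionLastOffset, Exception e) {
                System.err.println("WARN discarding record: " + e.getMessage()); // step 3
                partition.seek(partitionLastOffset + 1);                         // step 4
            }

            @Override
            public boolean canContinue() {
                return true;
            }
        };

        List<String> processed = new ArrayList<>();
        while (partition.hasNext()) {
            try {
                processed.add(partition.poll());
            } catch (Exception e) {                        // step 1
                strategy.handle(partition.position(), e);  // step 2
                if (!strategy.canContinue()) {             // step 5
                    break;
                }
            }                                              // step 6: loop resumes
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "POISON-b", "c"))); // prints [a, c]
    }
}
```

The poison record at offset 1 is dropped and the records on either side of it are still processed, which is the trade-off this strategy makes.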


Visual Diagram

flowchart TD
    A[Kafka Consumer Poll] --> B{Exception Occurs?}
    B -- Yes --> C["DiscardErrorStrategy.handle()"]
    C --> D[Seek Consumer to Next Offset]
    D --> E[Continue Polling Messages]
    E --> A
    B -- No --> F[Process Messages Normally]
    F --> A

Summary

`DiscardErrorStrategy.java` provides a poll exception handling strategy for Kafka consumers that should skip problematic messages rather than retry them or stop processing. It advances the consumer offset past the poison message and reports that polling can continue, so consumption is not interrupted by occasional bad records.

This strategy suits use cases where losing malformed or poison messages is an acceptable trade-off for continuous processing.


Appendix: Related Interface

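PollExceptionStrategy (brief)

`PollExceptionStrategy` is the interface through which the fetch loop reports poll failures. The two methods this class overrides, `handle(long partitionLastOffset, Exception exception)` and `canContinue()`, are the ones documented on this page.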
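
To make the role of `canContinue()` concrete, here is a small sketch of how a caller might use its return value to decide whether to poll again. The interface shape is inferred from the overridden method signatures on this page; `ContinueGateSketch`, `fixed`, and the always-failing poll are hypothetical and exist only for illustration.

```java
// Shape inferred from the overridden signatures documented on this page.
interface PollExceptionStrategy {
    void handle(long partitionLastOffset, Exception exception);

    boolean canContinue();
}

final class ContinueGateSketch {
    // Counts the poll attempts a loop makes when every poll fails.
    // maxAttempts caps the loop so a "continue" strategy cannot spin forever.
    static int attempts(PollExceptionStrategy strategy, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                throw new IllegalStateException("simulated poll failure");
            } catch (Exception e) {
                strategy.handle(-1L, e);
                if (!strategy.canContinue()) {
                    break; // the strategy asked the loop to stop
                }
            }
        }
        return attempts;
    }

    // Hypothetical helper: a strategy that ignores the error and returns a fixed answer.
    static PollExceptionStrategy fixed(boolean answer) {
        return new PollExceptionStrategy() {
            @Override
            public void handle(long partitionLastOffset, Exception exception) {
                // no-op in this sketch
            }

            @Override
            public boolean canContinue() {
                return answer;
            }
        };
    }
}
```

With a strategy that returns `true`, as the discard strategy does, the loop keeps polling until the cap; with one that returns `false`, it stops after the first failure.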


License

Licensed under the Apache License, Version 2.0. See the license header in the source code for details.