SeekUtil.java


Overview

`SeekUtil.java` is a utility class within the Apache Camel Kafka component, specifically under the consumer error handling package (`org.apache.camel.component.kafka.consumer.errorhandler`). Its primary purpose is to provide a helper method that facilitates seeking to the next offset for Kafka consumer partitions during message consumption.

This matters when a consumer must skip a message or reposition itself after handling an error: seeking to the correct next offset lets processing continue without re-processing messages that were already consumed.

The class is designed as a final utility class with a private constructor to prevent instantiation and contains only static methods.
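The shape described here is the common Java utility-class idiom. As a minimal sketch, the hypothetical `OffsetMath` class below (an illustration only, not part of Camel) shows the pattern: a `final` class, a private constructor, and only static members.

```java
/** Hypothetical utility class illustrating the pattern; not part of Camel. */
final class OffsetMath {

    // Private constructor: the class only hosts static helpers and is never instantiated.
    private OffsetMath() {
    }

    /** Returns the offset that follows the given one. */
    static long next(long offset) {
        return offset + 1;
    }
}
```

Callers use the helper through the class name, for example `OffsetMath.next(100L)`.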


Class: SeekUtil

Description

A utility class providing methods to manipulate the offsets of Kafka consumer partitions. It currently contains a single method, `seekToNextOffset`, which seeks the consumer to the offset following either a given last processed offset or the consumer's current position.

Class Declaration

final class SeekUtil

Logger

private static final Logger LOG = LoggerFactory.getLogger(SeekUtil.class);

Used for logging informational messages about the seeking operation.


Methods

1. seekToNextOffset

public static void seekToNextOffset(Consumer<?, ?> consumer, long partitionLastOffset)

Purpose

Adjusts the Kafka consumer's position to the next offset to continue polling messages. It either uses the provided `partitionLastOffset` or, if it is invalid (`-1`), uses the consumer's current position to determine where to seek next.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `consumer` | `Consumer` | The Kafka consumer instance whose partition offsets are to be manipulated. |
| `partitionLastOffset` | `long` | The last offset processed for the partition(s). If `-1`, the method uses the current consumer position. |

Return Value

`void`. The method acts on the consumer and does not return a value.

Behavior and Implementation Details
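The decision logic shown in the flowchart under "Visual Diagram" can be approximated as follows. This is a sketch under stated assumptions, not the Camel source: `PartitionSeeker` is a hypothetical stand-in for the three Kafka consumer calls the flowchart names (`assignment()`, `position()`, `seek()`), partitions are represented as plain strings, `FakeSeeker` is an in-memory fake, and logging is omitted.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the three consumer calls named in the flowchart. */
interface PartitionSeeker {
    Set<String> assignment();                 // assigned partitions, e.g. "orders-0"
    long position(String partition);          // offset of the next record to fetch
    void seek(String partition, long offset); // move the fetch position
}

final class SeekSketch {
    private SeekSketch() {
    }

    static void seekToNextOffset(PartitionSeeker consumer, long partitionLastOffset) {
        Set<String> partitions = consumer.assignment();
        if (partitions == null) {
            return; // no assignment: nothing to seek
        }
        for (String tp : partitions) {
            // -1 means "no last offset known": fall back to the current position.
            long next = partitionLastOffset != -1
                    ? partitionLastOffset + 1
                    : consumer.position(tp) + 1;
            consumer.seek(tp, next);
        }
    }
}

/** In-memory fake (assumption for illustration) that stores one position per partition. */
final class FakeSeeker implements PartitionSeeker {
    private final Map<String, Long> positions = new LinkedHashMap<>();

    FakeSeeker with(String partition, long position) {
        positions.put(partition, position);
        return this;
    }

    @Override
    public Set<String> assignment() {
        return new LinkedHashSet<>(positions.keySet()); // copy, so seek() can update the map
    }

    @Override
    public long position(String partition) {
        return positions.get(partition);
    }

    @Override
    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}
```

With two partitions at positions 7 and 42, passing `100` moves both to 101, while passing `-1` moves them to 8 and 43 respectively.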

Usage Example

Consumer<String, String> kafkaConsumer = ...; // existing Kafka consumer
long lastProcessedOffset = 100L;

// Seek consumer to next offset after last processed offset
SeekUtil.seekToNextOffset(kafkaConsumer, lastProcessedOffset);

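This will reposition the consumer to offset 101 on all assigned partitions. Because `SeekUtil` is declared without a `public` modifier, the call compiles only from code in the same package.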
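To illustrate why seeking to "last offset + 1" lets consumption continue past a failed record, here is a small self-contained simulation. Everything in it is hypothetical: `TinyLog` models a single partition whose `poll` advances the position past the returned batch, and records whose value starts with `"bad"` stand in for processing failures. It does not use the Kafka or Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-partition log; poll() advances the position past the batch. */
final class TinyLog {
    private final List<String> records;
    private int position = 0;

    TinyLog(List<String> records) {
        this.records = records;
    }

    /** Returns up to max records starting at the current position and advances past them. */
    List<String> poll(int max) {
        int end = Math.min(position + max, records.size());
        List<String> batch = new ArrayList<>(records.subList(position, end));
        position = end;
        return batch;
    }

    int position() {
        return position;
    }

    void seek(int offset) {
        position = offset;
    }
}

final class SkipOnErrorDemo {
    private SkipOnErrorDemo() {
    }

    /** Drains the log in batches of three, skipping records whose value starts with "bad". */
    static List<String> drain(TinyLog log) {
        List<String> processed = new ArrayList<>();
        while (true) {
            int batchStart = log.position();
            List<String> batch = log.poll(3);
            if (batch.isEmpty()) {
                return processed;
            }
            for (int i = 0; i < batch.size(); i++) {
                String value = batch.get(i);
                if (value.startsWith("bad")) {
                    int failedOffset = batchStart + i;
                    // Same arithmetic as the usage example: resume at last offset + 1.
                    log.seek(failedOffset + 1);
                    break; // drop the rest of the batch; it is polled again from the new position
                }
                processed.add(value);
            }
        }
    }
}
```

Draining a log containing `a, b, bad-c, d, e` processes `a, b, d, e`: the failed record is skipped once, and no record is handled twice.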


Important Implementation Details


Interaction with Other Components


Visual Diagram

The following flowchart illustrates the workflow of the `seekToNextOffset` method, depicting the decision points and actions taken based on the input parameters and consumer state.

flowchart TD
    A["Start: Call seekToNextOffset(consumer, partitionLastOffset)"]
    B{"Is consumer.assignment() != null?"}
    C{"Is partitionLastOffset != -1?"}
    D["Calculate next offset = partitionLastOffset + 1"]
    E["For each TopicPartition tp"]
    F["Log: seeking to next offset"]
    G["consumer.seek(tp, next offset)"]
    L["For each TopicPartition tp"]
    H["Calculate next offset = consumer.position(tp) + 1"]
    I["Log: seeking to next offset (only once)"]
    J["consumer.seek(tp, next offset)"]
    K["End"]

    A --> B
    B -- No --> K
    B -- Yes --> C
    C -- Yes --> D
    D --> E
    E --> F
    F --> G
    G --> E
    E -->|All partitions processed| K
    C -- No --> L
    L --> H
    H --> I
    I --> J
    J --> L
    L -->|All partitions processed| K

Summary

`SeekUtil.java` is a small utility class with one job: advancing a Kafka consumer's assigned partitions to the next offset so that consumption can continue. It is intended for error handling within the Apache Camel Kafka component, where it helps the consumer resume at the right position without re-processing messages. Its API is a single static method, and it logs informational messages about each seek.