KafkaConsumerUtil.java

Overview

`KafkaConsumerUtil.java` is a utility class within the Apache Camel Kafka idempotent processor module. It provides helper methods related to the Kafka consumer API, specifically for checking consumer offsets against target offsets on assigned topic partitions.

Its core function verifies whether a Kafka consumer has read up to specified target offsets on its assigned partitions. This is useful when a consumer must confirm that it has caught up to a known point, for example in idempotent message consumption workflows or offset management processes.

Class: KafkaConsumerUtil

A utility class that offers static methods to work with Kafka consumers, focusing on offset verification.


Method: isReachedOffsets

public static <K, V> boolean isReachedOffsets(Consumer<K, V> consumer, Map<TopicPartition, Long> targetOffsets)

Purpose

Determines whether the given Kafka consumer's position is at or beyond the target offset on every one of its assigned topic partitions. Assigned partitions that have no entry in `targetOffsets` are treated as already reached.

Parameters

`consumer`: the Kafka consumer whose positions are checked. It is expected to be initialized and to have topic partitions assigned.

`targetOffsets`: the target offset for each topic partition, as a `Map<TopicPartition, Long>`. It must be non-empty.

Returns

`true` if the consumer's position is greater than or equal to the target offset on every assigned partition; `false` if at least one assigned partition is still behind its target.

Throws

An exception if `targetOffsets` is empty. The method validates this before it reads any consumer positions.

Detailed Behavior

The method first validates that `targetOffsets` is non-empty, then reads the consumer's current partition assignment. Any assigned partition that is missing from `targetOffsets` is given a target of `Long.MIN_VALUE`, so it always passes the check. Finally, `consumer.position(partition)` is compared with the target for each assigned partition: a partition whose position is below its target makes the method return `false`, and `true` is returned once all partitions have passed.

Usage Example

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

// The consumer must already be initialized and subscribed/assigned.
KafkaConsumer<String, String> consumer = ...;

// Target offset per partition. position() is the offset of the next record
// to fetch, so a target of 100 is reached when that offset is 100 or later.
Map<TopicPartition, Long> targetOffsets = new HashMap<>();
targetOffsets.put(new TopicPartition("my-topic", 0), 100L);
targetOffsets.put(new TopicPartition("my-topic", 1), 50L);

// True only if every assigned partition is at or past its target.
boolean reached = KafkaConsumerUtil.isReachedOffsets(consumer, targetOffsets);

if (reached) {
    System.out.println("Consumer has reached the target offsets.");
} else {
    System.out.println("Consumer has not yet reached the target offsets.");
}
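
A check like this is often paired with a catch-up loop that keeps polling until the target offsets are reached. The sketch below illustrates that pattern without any Kafka dependency so that it runs on its own. `FakeConsumer`, `CatchUpSketch`, `reached`, and `catchUp` are hypothetical stand-ins invented for this illustration, not Kafka or Camel APIs, and partitions are represented as plain strings. With a real consumer, the loop body would presumably call the consumer's own poll method and `KafkaConsumerUtil.isReachedOffsets(consumer, targetOffsets)` instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a Kafka consumer: one position per partition name. */
final class FakeConsumer {
    private final Map<String, Long> logEnds;
    private final Map<String, Long> positions = new HashMap<>();

    FakeConsumer(Map<String, Long> logEnds) {
        this.logEnds = new HashMap<>(logEnds);
        for (String partition : logEnds.keySet()) {
            positions.put(partition, 0L);
        }
    }

    /** Simulates one poll: each partition advances by up to batchSize, capped at its log end. */
    void poll(long batchSize) {
        for (Map.Entry<String, Long> end : logEnds.entrySet()) {
            long next = Math.min(positions.get(end.getKey()) + batchSize, end.getValue());
            positions.put(end.getKey(), next);
        }
    }

    long position(String partition) {
        return positions.get(partition);
    }
}

final class CatchUpSketch {

    /** Simplified check: every partition in targets is at or past its target offset. */
    static boolean reached(FakeConsumer consumer, Map<String, Long> targets) {
        for (Map.Entry<String, Long> target : targets.entrySet()) {
            if (consumer.position(target.getKey()) < target.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Polls until the targets are reached; returns the polls used, or -1 after maxPolls. */
    static int catchUp(FakeConsumer consumer, Map<String, Long> targets, int maxPolls, long batchSize) {
        int polls = 0;
        while (!reached(consumer, targets)) {
            if (polls == maxPolls) {
                return -1; // give up instead of looping forever
            }
            consumer.poll(batchSize);
            polls++;
        }
        return polls;
    }

    public static void main(String[] args) {
        Map<String, Long> targets = new HashMap<>();
        targets.put("my-topic-0", 100L);
        targets.put("my-topic-1", 50L);
        // Positions after each poll of 40: (40, 40), (80, 50), (100, 50).
        System.out.println(catchUp(new FakeConsumer(targets), targets, 10, 40L)); // prints 3
    }
}
```

Bounding the loop with `maxPolls` is a design choice of this sketch: if a target is never reached, the caller gets a clear failure value rather than a hang.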

Important Implementation Details


Interaction with Other System Components


Visual Diagram

flowchart TD
    A[isReachedOffsets Method] --> B[Validate targetOffsets non-empty]
    B --> C[Get consumer assigned partitions]
    C --> D["Extend targetOffsets with missing partitions -> Long.MIN_VALUE"]
    D --> E[For each partition]
    E --> F{"consumer.position(partition) >= targetOffsets.get(partition)?"}
    F -- Yes --> G[Check next partition]
    F -- No --> H[Return false]
    G --> I{All partitions checked?}
    I -- Yes --> J[Return true]
    I -- No --> E
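
The flow in the diagram above can be sketched in a few lines of plain Java. This is a dependency-free illustration of the described steps, not the actual Camel source. Partitions are plain strings rather than `TopicPartition`, the consumer is reduced to an assignment set plus a position lookup, and the class name `OffsetCheckSketch` and the exception type are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/** Dependency-free sketch of the offset check; partitions are plain strings here. */
final class OffsetCheckSketch {

    static boolean isReachedOffsets(Set<String> assignment,
                                    ToLongFunction<String> position,
                                    Map<String, Long> targetOffsets) {
        // Step 1: reject an empty target map (the exception type is an assumption).
        if (targetOffsets == null || targetOffsets.isEmpty()) {
            throw new IllegalArgumentException("targetOffsets must be non-empty");
        }
        // Step 2: assigned partitions without a target get Long.MIN_VALUE, so they always pass.
        Map<String, Long> extended = new HashMap<>(targetOffsets);
        for (String partition : assignment) {
            extended.putIfAbsent(partition, Long.MIN_VALUE);
        }
        // Step 3: every assigned partition must be at or past its target.
        for (String partition : assignment) {
            if (position.applyAsLong(partition) < extended.get(partition)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>();
        positions.put("my-topic-0", 100L);
        positions.put("my-topic-1", 49L);
        positions.put("my-topic-2", 0L); // assigned, but no target given

        Map<String, Long> targets = new HashMap<>();
        targets.put("my-topic-0", 100L);
        targets.put("my-topic-1", 50L);

        // Partition 1 is at 49, one short of its target of 50.
        System.out.println(isReachedOffsets(positions.keySet(), positions::get, targets)); // prints false
        positions.put("my-topic-1", 50L);
        System.out.println(isReachedOffsets(positions.keySet(), positions::get, targets)); // prints true
    }
}
```

Note that `my-topic-2` never blocks the result: it has no explicit target, so its effective target is `Long.MIN_VALUE` and any position satisfies it.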

Summary

`KafkaConsumerUtil.java` provides a single static method, `isReachedOffsets`, that verifies whether a Kafka consumer has read up to specified offsets on its assigned partitions. This supports reliability and idempotency in message processing pipelines that use Kafka. Assigned partitions without a target offset are treated as already reached, because their target defaults to `Long.MIN_VALUE`, so a partial target map does not produce a false negative. The class plays a supporting role in the broader Apache Camel Kafka integration ecosystem.