KafkaConsumerUtil.java
Overview
`KafkaConsumerUtil.java` is a utility class within the Apache Camel Kafka idempotent processor module. It provides helper methods related to the Kafka consumer API, specifically for checking consumer offsets against target offsets on assigned topic partitions.
The core functionality centers on verifying whether a Kafka consumer has read messages up to specified target offsets across its assigned partitions. This utility is useful in scenarios where consumers need to confirm progress or completion of message processing up to certain offsets, for example in idempotent message consumption workflows or offset management processes.
Class: KafkaConsumerUtil
A utility class that offers static methods to work with Kafka consumers, focusing on offset verification.
Method: isReachedOffsets
public static <K, V> boolean isReachedOffsets(Consumer<K, V> consumer, Map<TopicPartition, Long> targetOffsets)
Purpose
Determines if the given Kafka consumer has reached or surpassed the specified target offsets for all its assigned topic partitions.
Parameters
`consumer` (`Consumer<K, V>`): A Kafka consumer instance. It is expected that the consumer is assigned to at least one topic partition at the time of this call.
`targetOffsets` (`Map<TopicPartition, Long>`): A non-empty map specifying the target offsets for particular topic partitions. The keys are `TopicPartition` instances; the values are the target offsets (`long`) that the consumer should have reached or exceeded.
Returns
`boolean`: Returns `true` if, for every assigned partition, the current consumer position is greater than or equal to the respective target offset. Returns `false` if the consumer has not yet reached the target offset on at least one assigned partition.
Throws
`IllegalArgumentException` if `targetOffsets` is `null` or empty.
Detailed Behavior
The method first validates that the `targetOffsets` map is neither `null` nor empty, throwing an `IllegalArgumentException` otherwise.
It retrieves the current set of assigned partitions from the consumer using `consumer.assignment()`.
For any partition assigned to the consumer but missing from `targetOffsets`, the method adds an entry with `Long.MIN_VALUE` as the target offset to a working copy of the map. This effectively means those partitions are considered already reached, as any consumer position will be greater than or equal to `Long.MIN_VALUE`.
Finally, it checks whether the consumer's current position for each assigned partition is at least the target offset.
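The four steps above can be sketched in plain Java without the Kafka client on the classpath. This is a minimal illustration under stated assumptions, not the Camel source: the class `OffsetCheckSketch` and the method `reachedOffsets` are hypothetical names, the type parameter `P` stands in for `TopicPartition`, the `assigned` set stands in for the result of `consumer.assignment()`, and the `position` function stands in for `consumer.position(partition)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/** Illustrative sketch only; names and types are stand-ins, not the Camel source. */
final class OffsetCheckSketch {

    static <P> boolean reachedOffsets(Set<P> assigned,
                                      ToLongFunction<P> position,
                                      Map<P, Long> targetOffsets) {
        // Step 1: reject a null or empty target map.
        if (targetOffsets == null || targetOffsets.isEmpty()) {
            throw new IllegalArgumentException("targetOffsets must be non-empty");
        }
        // Step 2 is represented by the 'assigned' parameter (consumer.assignment()).
        // Step 3: work on a copy; assigned partitions without a target get Long.MIN_VALUE.
        Map<P, Long> extended = new HashMap<>(targetOffsets);
        for (P partition : assigned) {
            extended.putIfAbsent(partition, Long.MIN_VALUE);
        }
        // Step 4: every assigned partition must be at or past its target.
        for (P partition : assigned) {
            if (position.applyAsLong(partition) < extended.get(partition)) {
                return false;
            }
        }
        return true;
    }
}
```

Because the final check iterates over the assigned partitions only, two consequences follow in this sketch: entries in `targetOffsets` for partitions that are not assigned are never compared, and an empty assignment yields `true` vacuously. The latter is one reason the consumer is expected to hold an assignment before the call.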
Usage Example
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.HashMap;
KafkaConsumer<String, String> consumer = ...; // initialized and subscribed/assigned
Map<TopicPartition, Long> targetOffsets = new HashMap<>();
targetOffsets.put(new TopicPartition("my-topic", 0), 100L);
targetOffsets.put(new TopicPartition("my-topic", 1), 50L);
boolean reached = KafkaConsumerUtil.isReachedOffsets(consumer, targetOffsets);
if (reached) {
    System.out.println("Consumer has reached the target offsets.");
} else {
    System.out.println("Consumer has not yet reached the target offsets.");
}
Important Implementation Details
Handling Missing Partitions in Target Offsets:
The method handles the scenario where the consumer is assigned to partitions not explicitly listed in `targetOffsets`. It treats these partitions as already reached by assigning them a target offset of `Long.MIN_VALUE` to avoid false negatives.
Consumer Position Query:
Uses `consumer.position(partition)` to get the current offset position for each partition. The call is only valid for partitions currently assigned to the consumer, and it may block while the consumer contacts the broker if no position is known yet for that partition.
Immutability of Input:
The method creates a new `HashMap` copy of the input `targetOffsets` to avoid mutating the caller's map.
Generics Support:
The method is generic over the key and value types of the Kafka consumer, denoted as `<K, V>`. This makes it compatible with any consumer key and value deserialization scheme.
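The defensive copy and the `Long.MIN_VALUE` default can be illustrated in isolation. The sketch below is an assumption-laden example rather than the actual implementation: `TargetOffsetsCopySketch` and `withDefaults` are hypothetical names, and plain `String` keys stand in for `TopicPartition` so that the snippet runs without the Kafka client library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only; String keys stand in for TopicPartition. */
final class TargetOffsetsCopySketch {

    /** Returns a new map with defaults filled in; the caller's map is never modified. */
    static <P> Map<P, Long> withDefaults(Set<P> assigned, Map<P, Long> targetOffsets) {
        Map<P, Long> extended = new HashMap<>(targetOffsets); // defensive copy
        for (P partition : assigned) {
            // Long.MIN_VALUE is <= every long, so "position >= target" always holds.
            extended.putIfAbsent(partition, Long.MIN_VALUE);
        }
        return extended;
    }

    public static void main(String[] args) {
        // Map.of returns an immutable map, so any attempt to mutate it would throw.
        Map<String, Long> callerTargets = Map.of("my-topic-0", 100L);
        Map<String, Long> extended =
                withDefaults(Set.of("my-topic-0", "my-topic-1"), callerTargets);
        System.out.println(callerTargets.size());                         // 1
        System.out.println(extended.get("my-topic-1") == Long.MIN_VALUE); // true
    }
}
```

Copying before filling in defaults means callers can pass immutable or shared maps safely, at the cost of one small allocation per call.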
Interaction with Other System Components
This utility is designed to be used alongside Kafka consumers within the Apache Camel Kafka idempotent repository or processor components, which track consumed messages so that duplicates can be detected and skipped.
It interacts indirectly with Kafka broker state via the `Consumer` API, which manages assignments, fetches, and offset commits.
The utility method could be called by higher-level processors or services that require confirmation of consumer progress before triggering subsequent processing steps or offset commits.
It depends on Apache Kafka client libraries (`org.apache.kafka.clients.consumer.Consumer` and `org.apache.kafka.common.TopicPartition`).
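One way a higher-level caller might use such a check is a bounded catch-up loop: poll, re-check progress, and stop once the targets are reached. The following is a hedged sketch of that pattern, not code taken from Camel. `CatchUpLoopSketch` and `catchUp` are hypothetical names, the `reached` supplier stands in for a call such as `KafkaConsumerUtil.isReachedOffsets(consumer, targetOffsets)`, and `pollOnce` stands in for `consumer.poll(...)` plus record handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Illustrative sketch only; the suppliers stand in for real consumer calls. */
final class CatchUpLoopSketch {

    /** Calls pollOnce until reached reports true or maxPolls is exhausted. */
    static boolean catchUp(BooleanSupplier reached, Runnable pollOnce, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (reached.getAsBoolean()) {
                return true;
            }
            pollOnce.run();
        }
        // One last check after the final poll.
        return reached.getAsBoolean();
    }

    public static void main(String[] args) {
        // In-memory stand-in for consumer positions; each simulated poll advances by 25.
        Map<String, Long> positions = new HashMap<>(Map.of("my-topic-0", 0L));
        long target = 100L;
        boolean done = catchUp(
                () -> positions.get("my-topic-0") >= target,
                () -> positions.merge("my-topic-0", 25L, Long::sum),
                10);
        System.out.println(done); // true
    }
}
```

Bounding the number of polls is a design choice for this sketch: it keeps the loop from spinning forever when a target offset is never reached, for example when it lies beyond the end of the log.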
Visual Diagram
flowchart TD
A[isReachedOffsets Method] --> B[Validate targetOffsets non-empty]
B --> C[Get consumer assigned partitions]
C --> D["Copy targetOffsets; add missing partitions with Long.MIN_VALUE"]
D --> E[For each partition]
E --> F{"consumer.position(partition) >= targetOffsets.get(partition)?"}
F -- Yes --> G[Check next partition]
F -- No --> H[Return false]
G --> I{All partitions checked?}
I -- Yes --> J[Return true]
I -- No --> E
Summary
`KafkaConsumerUtil.java` provides a focused utility for checking whether a Kafka consumer has read up to specified offsets across its assigned partitions, which supports reliability and idempotency in Kafka-based message processing pipelines. Its single method, `isReachedOffsets`, rejects a `null` or empty target map, leaves the caller's map unmodified, and treats assigned partitions without a target as already reached. It plays a supporting role in the broader Apache Camel Kafka integration.