KafkaResumable.java
Overview
`KafkaResumable.java` defines the `KafkaResumable` class, a utility within the Apache Camel Kafka component designed to support resumable consumption of Kafka messages. Its primary purpose is to encapsulate the Kafka consumer state (specifically the topic partition and offset) in a standardized "resumable" object. This allows Camel routes to track the last processed offset and resume consumption from it after a failure or restart, which helps limit message loss and duplication.
The class implements the `Resumable` interface from the Camel resume framework, providing methods to retrieve a unique offset key and the last consumed offset. It also includes a factory method to create instances directly from a Camel `Exchange` with Kafka-specific headers.
Class: KafkaResumable
public final class KafkaResumable implements Resumable
Description
`KafkaResumable` is an immutable class representing the state of a Kafka consumer at a particular topic partition and offset. It implements `Resumable`, which is a contract for objects that can provide an offset key and last offset to support resumption logic in Camel.
Fields
| Field | Type | Description |
|---|---|---|
| `addressable` | `String` | A unique identifier combining topic and partition (formatted as `"topic/partition"`). |
| `offset` | `Long` | The last consumed offset in the Kafka partition. |
Constructor
private KafkaResumable(String addressable, Long offset)
Parameters:
- `addressable`: Unique identifier for the Kafka topic partition.
- `offset`: Last consumed offset.
Access: Private, enforcing use of the static factory method for creation.
Methods
OffsetKey<?> getOffsetKey()
- Implements `Resumable.getOffsetKey()`.
- Returns an immutable offset key constructed from the `addressable` string.
- This key uniquely identifies the Kafka partition to which the offset applies.

Returns:
- `OffsetKey<?>`: An immutable offset key representing the topic partition.
Usage Example:
KafkaResumable resumable = KafkaResumable.of(exchange);
OffsetKey<?> key = resumable.getOffsetKey();
// key can be used to retrieve or store offset state
Offset<?> getLastOffset()
- Implements `Resumable.getLastOffset()`.
- Returns the offset wrapped as an `Offset` object.
- Represents the last successfully processed Kafka offset for the given partition.

Returns:
- `Offset<?>`: The last consumed offset.
Usage Example:
KafkaResumable resumable = KafkaResumable.of(exchange);
Offset<?> lastOffset = resumable.getLastOffset();
// lastOffset wraps the last consumed offset for this partition
static KafkaResumable of(Exchange exchange)
Description: Factory method to create a `KafkaResumable` instance from a Camel `Exchange`.
- Extracts the Kafka-specific headers `KafkaConstants.TOPIC`, `KafkaConstants.PARTITION`, and `KafkaConstants.OFFSET`.
- Composes a unique addressable string as `"topic/partition"`.
- Constructs a new `KafkaResumable` with this addressable and the offset.

Parameters:
- `exchange` (`Exchange`): Camel message exchange containing Kafka headers.

Returns:
- `KafkaResumable`: A new instance representing the current Kafka consumption state.
Usage Example:
KafkaResumable resumable = KafkaResumable.of(exchange);
Important: The method expects the exchange to have the Kafka headers set (`topic`, `partition`, `offset`). If any are missing, it may result in a `NullPointerException`.
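The header extraction described for `of(Exchange)` can be sketched without a Camel dependency. In the minimal sketch below, a plain `Map<String, Object>` stands in for the exchange headers. The class name `ResumableSketch` and the header keys `"topic"`, `"partition"`, and `"offset"` are hypothetical stand-ins, not the real `KafkaConstants` values or the component's source. The eager null checks are a choice made for this example and are not confirmed behavior of `KafkaResumable.of`.

```java
import java.util.Map;
import java.util.Objects;

/** Illustrative stand-in for KafkaResumable; this is not the Camel class. */
final class ResumableSketch {
    // Hypothetical header keys; the real component reads KafkaConstants.TOPIC,
    // KafkaConstants.PARTITION and KafkaConstants.OFFSET from the exchange.
    static final String TOPIC = "topic";
    static final String PARTITION = "partition";
    static final String OFFSET = "offset";

    private final String addressable;
    private final Long offset;

    // Private constructor: instances are created only through of(...).
    private ResumableSketch(String addressable, Long offset) {
        this.addressable = addressable;
        this.offset = offset;
    }

    /** Builds an instance from a header map, failing fast if a header is missing. */
    static ResumableSketch of(Map<String, Object> headers) {
        String topic = (String) Objects.requireNonNull(headers.get(TOPIC), "missing topic header");
        Integer partition = (Integer) Objects.requireNonNull(headers.get(PARTITION), "missing partition header");
        Long offset = (Long) Objects.requireNonNull(headers.get(OFFSET), "missing offset header");
        // Compose the "topic/partition" identifier described in the Fields table.
        return new ResumableSketch(topic + "/" + partition, offset);
    }

    String addressable() {
        return addressable;
    }

    Long offset() {
        return offset;
    }
}
```

With this sketch, `ResumableSketch.of(Map.of("topic", "orders", "partition", 3, "offset", 42L))` yields an instance whose addressable is `orders/3`, while a map lacking any of the three keys raises a `NullPointerException` with a message naming the missing header.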
Implementation Details
- The class is marked `final` to prevent inheritance, which helps keep instances immutable and their behavior consistent.
- The internal unique key (`addressable`) concatenates topic and partition with a slash (`topic/partition`). This is a simple but effective way to uniquely identify a Kafka partition.
- Uses Apache Camel's resume framework classes `OffsetKey` and `Offset` to fit into Camel's generic offset tracking and resumption system.
- The factory method tightly couples this class to Kafka exchanges, making it straightforward to create resumable state from Kafka messages.
- No complex algorithms are used; the class mainly acts as an adapter between Kafka exchange headers and the resume framework.
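The `topic/partition` key is unambiguous because Kafka restricts topic names to ASCII letters, digits, `.`, `_`, and `-`, so a slash can never appear inside the topic part. The sketch below shows the composition and a round trip back to the two parts. `PartitionKey` and its methods are hypothetical helpers written for illustration; `KafkaResumable` itself only builds the key and does not parse it.

```java
/** Hypothetical helper illustrating the "topic/partition" key format. */
final class PartitionKey {
    private PartitionKey() {
    }

    /** Joins topic and partition with a slash, as described for the addressable field. */
    static String compose(String topic, int partition) {
        return topic + "/" + partition;
    }

    /** Recovers the topic: everything before the last slash. */
    static String topicOf(String key) {
        return key.substring(0, key.lastIndexOf('/'));
    }

    /** Recovers the partition number: everything after the last slash. */
    static int partitionOf(String key) {
        return Integer.parseInt(key.substring(key.lastIndexOf('/') + 1));
    }
}
```

For example, `PartitionKey.compose("orders.v1", 12)` produces `orders.v1/12`, from which `topicOf` and `partitionOf` recover `orders.v1` and `12`.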
Integration and Interaction
- Apache Camel Kafka Component: `KafkaResumable` is used internally by the Kafka consumer component of Apache Camel to track offsets.
- Camel Resume Framework: Implements the `Resumable` interface, allowing the Kafka consumer to leverage Camel's generic resumption mechanism.
- Exchange: Uses Camel `Exchange` objects that carry Kafka message metadata (topic, partition, offset).
- Offset Storage: The offset key and offset returned by this class can be stored in persistent offset repositories or checkpointing mechanisms used by Camel to resume consumption on restart.
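To make the offset storage idea concrete, here is a minimal in-memory sketch of recording an offset under its key and reading it back. `InMemoryOffsetStore` and its `commit` and `lastOffset` methods are hypothetical and are not part of Camel's resume API; a real route would rely on one of Camel's resume strategies and a persistent store. Keeping only the highest offset per key is an assumption of this example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store keyed by the "topic/partition" addressable. */
final class InMemoryOffsetStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    /** Records the offset, keeping the highest value seen for each key. */
    void commit(String addressable, long offset) {
        offsets.merge(addressable, offset, Long::max);
    }

    /** Returns the last recorded offset, or empty if the partition was never seen. */
    Optional<Long> lastOffset(String addressable) {
        return Optional.ofNullable(offsets.get(addressable));
    }
}
```

After `commit("orders/0", 10L)` followed by `commit("orders/0", 7L)`, `lastOffset("orders/0")` still holds `10`, so an out-of-order update does not move the recorded position backwards.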
Visual Diagram
classDiagram
class KafkaResumable {
-String addressable
-Long offset
+static KafkaResumable of(Exchange exchange)
+OffsetKey<?> getOffsetKey()
+Offset<?> getLastOffset()
}
KafkaResumable ..|> Resumable
class Resumable {
<<interface>>
+OffsetKey<?> getOffsetKey()
+Offset<?> getLastOffset()
}
class Exchange {
+Message getMessage()
}
class Message {
+<T> T getHeader(String name, Class<T> type)
}
KafkaResumable --> Exchange : Uses for creation
Summary
`KafkaResumable.java` provides a clean, immutable representation of Kafka consumer state tailored to Apache Camel's resume framework. It extracts the necessary Kafka metadata from exchanges and packages it into a form that is easily tracked and persisted. This lets Kafka consumers in Camel routes resume from a recorded position, which supports fault tolerance. The class only carries state; end-to-end delivery guarantees such as exactly-once processing depend on how the route and the offset store are configured.
This class is a key building block in the Kafka component’s offset management, bridging Kafka-specific details with Camel’s generalized resume mechanism.