KafkaResumeAdapter.java
Overview
`KafkaResumeAdapter.java` is an implementation class in the Apache Camel Kafka component that supports resuming Kafka consumer processing from previously stored offsets. It acts as an adapter that bridges Kafka’s consumer offset management with Apache Camel’s resumable consumption framework.
This class implements the `ResumeAdapter`, `Deserializable`, and `Cacheable` interfaces, enabling it to:
Deserialize stored offset information from a cache or external storage.
Store and retrieve Kafka topic partition offsets.
Resume consumption from specific offsets by seeking the Kafka consumer to those positions.
In effect, `KafkaResumeAdapter` supports fault-tolerant message processing: a Kafka consumer can restart from the last processed message offset, which is tracked externally in a resume cache. Whether this yields at-least-once or effectively exactly-once behavior depends on when offsets are recorded relative to message processing.
Class and Interface Details
Class: KafkaResumeAdapter
public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cacheable
Package: `org.apache.camel.component.kafka.consumer.support.resume`
Purpose: Integrates Kafka offset management with Apache Camel’s resume framework.
Annotations:
`@JdkService("kafka-adapter-factory")`: Registers this class as a JDK service factory under the name `kafka-adapter-factory`.
Properties
| Property | Type | Description |
|---|---|---|
| `consumer` | `Consumer<?, ?>` | Kafka consumer instance used to seek to offsets. |
| `resumeCache` | `ResumeCache<TopicPartition>` | Cache storing topic partitions and their offsets. |
| `LOG` | `Logger` | Static logger for diagnostic messages and warnings. |
Methods
private boolean resume(TopicPartition topicPartition, Object value)
Description: Seeks the Kafka consumer to the specified offset for a given topic partition.
Parameters:
`topicPartition` – The Kafka topic and partition to seek.
`value` – The offset value (expected to be a `Long`).
Returns: `true` if seeking is performed.
Usage: Called internally during resume operations to reposition the consumer.
public void resume()
Description: Iterates over cached offsets and resumes consumption by seeking the Kafka consumer to each stored offset.
Parameters: None
Returns: void
Usage Example:
    KafkaResumeAdapter adapter = new KafkaResumeAdapter();
    adapter.setConsumer(kafkaConsumer);
    adapter.setCache(resumeCache);
    adapter.resume(); // Consumer seeks to cached offsets
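The iteration that `resume()` is described as performing can be illustrated without a broker. The following is a minimal sketch under stated assumptions, not the class's actual code: `TpSketch` and `SeekTarget` are hypothetical stand-ins for Kafka's `TopicPartition` and for the single `seek` operation of the consumer, and a plain `Map` stands in for the resume cache.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition (topic name plus partition number).
record TpSketch(String topic, int partition) {}

// Hypothetical stand-in for the one consumer operation used here: seek(partition, offset).
interface SeekTarget {
    void seek(TpSketch partition, long offset);
}

class ResumeLoopSketch {
    // Walks every cached entry and seeks the target to the stored offset.
    // Returns the number of seeks issued.
    static int resumeAll(Map<TpSketch, Long> cache, SeekTarget target) {
        int seeks = 0;
        for (Map.Entry<TpSketch, Long> entry : cache.entrySet()) {
            target.seek(entry.getKey(), entry.getValue());
            seeks++;
        }
        return seeks;
    }

    public static void main(String[] args) {
        // A plain insertion-ordered map plays the role of the resume cache (assumption).
        Map<TpSketch, Long> cache = new LinkedHashMap<>();
        cache.put(new TpSketch("orders", 0), 42L);
        cache.put(new TpSketch("orders", 1), 7L);

        // Record each seek instead of talking to a real consumer.
        List<String> log = new ArrayList<>();
        int seeks = resumeAll(cache, (tp, offset) ->
                log.add(tp.topic() + "/" + tp.partition() + "@" + offset));

        System.out.println(seeks + " seeks: " + log); // 2 seeks: [orders/0@42, orders/1@7]
    }
}
```

With a real consumer, the lambda would be replaced by a call to `Consumer.seek(TopicPartition, long)`; the partitions generally need to be assigned to the consumer before seeking.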
public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer)
Description: Converts serialized key and value buffers into runtime objects and populates the resume cache.
Parameters:
`keyBuffer` – Serialized key buffer expected to represent a `String` in the format `"topic/partition"`.
`valueBuffer` – Serialized value buffer expected to represent a `Long` offset.
Returns: `false` always (indicating no immediate resumption occurs during deserialization).
Behavior Details:
Deserializes the key and value.
Validates the key format (`topic/partition`).
Adds the `(TopicPartition, offset)` pair to the resume cache.
Logs warnings on invalid formats or types.
Usage: Typically invoked during cache loading or recovery from persistent storage.
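The validation steps listed above can be sketched as follows. This is an illustration under assumptions rather than the class's actual code: the key and value are taken as already decoded from their buffers (the binary layout is not covered here), `ParsedPartition` is a hypothetical stand-in for `TopicPartition`, a plain `Map` stands in for the resume cache, and `System.err` stands in for the logger. Unlike the documented `deserialize()`, which always returns `false`, the hypothetical `load` helper reports whether the entry was accepted so the behavior is easy to observe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Kafka's TopicPartition.
record ParsedPartition(String topic, int partition) {}

class ResumeKeyParserSketch {
    // Parses "topic/partition". Returns empty when the key does not have exactly
    // two parts, the topic is empty, or the partition is not a non-negative integer.
    static Optional<ParsedPartition> parseKey(String key) {
        if (key == null) {
            return Optional.empty();
        }
        String[] parts = key.split("/");
        if (parts.length != 2 || parts[0].isEmpty()) {
            return Optional.empty();
        }
        try {
            int partition = Integer.parseInt(parts[1]);
            return partition >= 0
                    ? Optional.of(new ParsedPartition(parts[0], partition))
                    : Optional.empty();
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Adds the pair to the cache when key and value have the expected types and
    // the key has the expected format; otherwise reports the problem and skips it.
    static boolean load(Map<ParsedPartition, Long> cache, Object key, Object value) {
        if (!(key instanceof String) || !(value instanceof Long)) {
            System.err.println("Skipping entry with unexpected types: " + key + " -> " + value);
            return false;
        }
        Optional<ParsedPartition> parsed = parseKey((String) key);
        if (parsed.isEmpty()) {
            System.err.println("Skipping entry with invalid key format: " + key);
            return false;
        }
        cache.put(parsed.get(), (Long) value);
        return true;
    }

    public static void main(String[] args) {
        Map<ParsedPartition, Long> cache = new HashMap<>();
        System.out.println(load(cache, "orders/3", 120L));  // true
        System.out.println(load(cache, "orders", 120L));    // false: no partition part
        System.out.println(load(cache, "orders/3", "120")); // false: value is not a Long
        System.out.println(cache.size());                   // 1
    }
}
```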
public boolean add(OffsetKey<?> key, Offset<?> offset)
Description: Adds a new offset entry to the resume cache.
Parameters:
`key` – An `OffsetKey` wrapping a `TopicPartition`.
`offset` – An `Offset` wrapping a `Long` offset value.
Returns: `true` if the offset was added successfully.
Usage: Used to update the resume cache with new offsets as consumption progresses.
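The unwrap-and-store step described for `add()` might look like the sketch below. Everything here is an assumption made for illustration: `KeyBox` and `OffsetBox` are hypothetical minimal stand-ins for Camel's `OffsetKey` and `Offset` wrappers, `AddTp` stands in for `TopicPartition`, and a plain `Map` stands in for the resume cache. Returning `false` for a rejected entry is a choice of this sketch and may not match the real method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition.
record AddTp(String topic, int partition) {}

// Hypothetical stand-ins for the OffsetKey and Offset wrappers: each only
// exposes the value it wraps.
interface KeyBox<K> {
    K getValue();
}

interface OffsetBox<T> {
    T getValue();
}

class AddOffsetSketch {
    private final Map<AddTp, Long> cache = new HashMap<>();

    // Stores the entry only when the wrapped key is a partition and the wrapped
    // offset is a Long. Returns whether the entry was stored.
    boolean add(KeyBox<?> key, OffsetBox<?> offset) {
        Object unwrappedKey = key.getValue();
        Object unwrappedOffset = offset.getValue();
        if (unwrappedKey instanceof AddTp && unwrappedOffset instanceof Long) {
            cache.put((AddTp) unwrappedKey, (Long) unwrappedOffset);
            return true;
        }
        return false;
    }

    // Returns the cached offset for the partition, or null when none is cached.
    Long offsetFor(AddTp partition) {
        return cache.get(partition);
    }

    public static void main(String[] args) {
        AddOffsetSketch sketch = new AddOffsetSketch();
        AddTp tp = new AddTp("orders", 0);

        System.out.println(sketch.add(() -> tp, () -> 41L));         // true
        System.out.println(sketch.add(() -> tp, () -> 42L));         // true, replaces 41
        System.out.println(sketch.add(() -> "orders/0", () -> 43L)); // false: key is not a partition
        System.out.println(sketch.offsetFor(tp));                    // 42
    }
}
```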
public void setCache(ResumeCache<?> cache)
Description: Sets the resume cache instance to be used.
Parameters:
`cache` – The `ResumeCache` instance to set (cast internally).
Returns: void
Usage Example:
    ResumeCache<TopicPartition> cache = new SomeResumeCacheImplementation<>();
    adapter.setCache(cache);
public ResumeCache<?> getCache()
Description: Returns the currently set resume cache.
Returns: The `ResumeCache<TopicPartition>` instance.
public void setConsumer(Consumer<?, ?> consumer)
Description: Sets the Kafka consumer instance that this adapter will control.
Parameters:
`consumer` – The Kafka consumer instance.
Returns: void
Usage: Must be called before `resume()` so that the adapter has a consumer to seek.
Important Implementation Details
The class assumes offset keys are strings of the form `"topic/partition"`, where partition is an integer.
Offsets are stored as `Long` values.
The `resume()` method uses Kafka’s `Consumer.seek(TopicPartition, long)` to reposition the consumer.
Logging is used to detect and report invalid deserialization scenarios.
The class uses Apache Camel’s generic `ResumeCache` interface to abstract offset storage, allowing flexible caching strategies.
It relies on unchecked casts when setting the cache and on runtime type checks when deserializing data, so type mismatches have to be handled explicitly.
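Why the cast in a `setCache(ResumeCache<?>)`-style setter needs care can be shown with a small, self-contained sketch. It rests on assumptions: `CastCacheSketch` is a hypothetical stand-in for `ResumeCache`, `CastTp` for `TopicPartition`, and `describe()` is an invented helper that merely reads the keys. Because generic type arguments are erased at runtime, the cast cannot verify the key type, so a mismatched cache is accepted and the error only surfaces when the keys are used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition.
record CastTp(String topic, int partition) {}

// Hypothetical stand-in for a generic resume cache.
class CastCacheSketch<K> {
    private final Map<K, Long> entries = new LinkedHashMap<>();

    void add(K key, long offset) {
        entries.put(key, offset);
    }

    Map<K, Long> entries() {
        return entries;
    }
}

class CastingAdapterSketch {
    private CastCacheSketch<CastTp> cache;

    @SuppressWarnings("unchecked")
    void setCache(CastCacheSketch<?> cache) {
        // Unchecked cast: type arguments are erased at runtime, so the JVM
        // cannot confirm that the keys really are CastTp instances.
        this.cache = (CastCacheSketch<CastTp>) cache;
    }

    CastCacheSketch<?> getCache() {
        return cache;
    }

    // Reads every key as a CastTp; this is where a mismatched cache fails.
    List<String> describe() {
        List<String> out = new ArrayList<>();
        for (CastTp tp : cache.entries().keySet()) {
            out.add(tp.topic() + "/" + tp.partition());
        }
        return out;
    }

    public static void main(String[] args) {
        CastingAdapterSketch adapter = new CastingAdapterSketch();

        CastCacheSketch<CastTp> good = new CastCacheSketch<>();
        good.add(new CastTp("orders", 0), 42L);
        adapter.setCache(good);
        System.out.println(adapter.describe()); // [orders/0]

        CastCacheSketch<String> bad = new CastCacheSketch<>();
        bad.add("orders/0", 42L);
        adapter.setCache(bad); // accepted: the cast cannot check the key type
        try {
            adapter.describe();
        } catch (ClassCastException e) {
            System.out.println("key type mismatch surfaced on use");
        }
    }
}
```

One consequence of this design is that callers are responsible for passing a cache keyed by the expected type; the setter itself cannot enforce it.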
Interaction with the System
With Kafka Consumer: The adapter controls the Kafka consumer's offset pointer, enabling resumption from specific offsets.
With Resume Cache: Acts as both a source and sink for cached offsets. It reads from and writes to the cache to maintain consumer state.
With Apache Camel Resume Framework: Implements `ResumeAdapter` to integrate Kafka offset management into Camel’s resumable consumption and recovery mechanisms.
With Serialization Layer: Implements `Deserializable` to convert persisted offset data back into usable runtime objects.
Usage Scenario
At startup, the adapter reads offset data from a persistent resume cache via `deserialize()`.
The `resume()` method is called to seek the Kafka consumer to the stored offsets.
During runtime, as messages are processed, updated offsets are stored via `add()` into the resume cache.
On failure/restart, the adapter again deserializes offsets and resumes consumption, ensuring minimal message reprocessing.
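The lifecycle above can be simulated end to end in memory. This is a sketch under assumptions, not the adapter's implementation: `LifecycleConsumer` is a hypothetical fake that only tracks a position per partition, `LifecycleTp` stands in for `TopicPartition`, persisted state is modeled as a `Map` from `"topic/partition"` strings to offsets, and keys are assumed to be well formed. What a stored offset means (last processed versus next to read) is left to the caller here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition.
record LifecycleTp(String topic, int partition) {}

// Hypothetical in-memory consumer: tracks only the current position per partition.
class LifecycleConsumer {
    private final Map<LifecycleTp, Long> positions = new HashMap<>();

    void seek(LifecycleTp partition, long offset) {
        positions.put(partition, offset);
    }

    long position(LifecycleTp partition) {
        return positions.getOrDefault(partition, 0L);
    }
}

class LifecycleAdapterSketch {
    private final Map<LifecycleTp, Long> cache = new LinkedHashMap<>();
    private final LifecycleConsumer consumer;

    LifecycleAdapterSketch(LifecycleConsumer consumer) {
        this.consumer = consumer;
    }

    // Step 1: load persisted "topic/partition" -> offset entries into the cache.
    // Assumes well-formed keys; validation is omitted to keep the sketch short.
    void load(Map<String, Long> persisted) {
        persisted.forEach((key, offset) -> {
            String[] parts = key.split("/");
            cache.put(new LifecycleTp(parts[0], Integer.parseInt(parts[1])), offset);
        });
    }

    // Step 2: seek the consumer to every cached offset.
    void resume() {
        cache.forEach(consumer::seek);
    }

    // Step 3: record a new offset as processing advances.
    void add(LifecycleTp partition, long offset) {
        cache.put(partition, offset);
    }

    // Exports the cache in the persisted key format, as a restart would read it.
    Map<String, Long> snapshot() {
        Map<String, Long> out = new LinkedHashMap<>();
        cache.forEach((tp, offset) -> out.put(tp.topic() + "/" + tp.partition(), offset));
        return out;
    }

    public static void main(String[] args) {
        LifecycleTp tp = new LifecycleTp("orders", 0);

        // First run: start from persisted state, then advance by one message.
        LifecycleConsumer first = new LifecycleConsumer();
        LifecycleAdapterSketch adapter = new LifecycleAdapterSketch(first);
        adapter.load(Map.of("orders/0", 10L));
        adapter.resume();
        System.out.println(first.position(tp)); // 10
        adapter.add(tp, 11L);

        // Simulated restart: a fresh consumer and adapter pick up the saved state.
        LifecycleConsumer second = new LifecycleConsumer();
        LifecycleAdapterSketch restarted = new LifecycleAdapterSketch(second);
        restarted.load(adapter.snapshot());
        restarted.resume();
        System.out.println(second.position(tp)); // 11
    }
}
```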
Mermaid Class Diagram
classDiagram
class KafkaResumeAdapter {
- Consumer<?, ?> consumer
- ResumeCache<TopicPartition> resumeCache
- static Logger LOG
+ void resume()
+ boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer)
+ boolean add(OffsetKey<?> key, Offset<?> offset)
+ void setCache(ResumeCache<?> cache)
+ ResumeCache<?> getCache()
+ void setConsumer(Consumer<?, ?> consumer)
- boolean resume(TopicPartition topicPartition, Object value)
}
KafkaResumeAdapter ..|> ResumeAdapter
KafkaResumeAdapter ..|> Deserializable
KafkaResumeAdapter ..|> Cacheable
class ResumeAdapter
class Deserializable
class Cacheable
KafkaResumeAdapter --> Consumer : controls
KafkaResumeAdapter --> ResumeCache : stores offsets
Summary
`KafkaResumeAdapter.java` is an adapter class enabling Kafka consumers in Apache Camel to resume message consumption from externally stored offsets. By implementing key interfaces (`ResumeAdapter`, `Deserializable`, and `Cacheable`), it supports serialization/deserialization of offset state, cache management, and consumer offset seeking. This class is essential in providing fault tolerance and reliable message processing guarantees in Kafka-based integration scenarios within Apache Camel.