SingleNodeKafkaResumeStrategy.java
Overview
The `SingleNodeKafkaResumeStrategy` class provides a **resume strategy** for Apache Camel Kafka integrations that persistently stores and recovers consumer offsets by publishing them to and consuming them from a dedicated Kafka topic. This implementation is optimized for **single-node environments** where offset state is managed locally but backed by Kafka for durability and fault tolerance.
This strategy enables the Kafka consumer to:
Persist offset state asynchronously by producing offset updates to a Kafka topic.
Rebuild its local offset cache on startup or recovery by consuming stored offset records from the same Kafka topic.
Handle consumer lifecycle, including startup, shutdown, and offset cache initialization with thread-safe operations.
Support rewind of consumer position during rebalance to repopulate the cache with recent offset history.
It integrates with Apache Camel’s `ResumeAdapter` abstraction to manage serialization, deserialization, and caching of offsets.
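The persist-and-rebuild cycle listed above can be pictured with a small, self-contained sketch. It is not the Camel implementation: an in-memory list stands in for the resume topic, and the class `OffsetLogReplay` with its `publish` and `rebuildCache` methods is hypothetical. It assumes that replaying the records in order lets a later record for a key overwrite an earlier one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: an in-memory append-only log plays the role of the resume topic.
class OffsetLogReplay {
    // One offset record: the key identifies the source, the offset says how far it was processed.
    static final class OffsetRecord {
        final String key;
        final long offset;

        OffsetRecord(String key, long offset) {
            this.key = key;
            this.offset = offset;
        }
    }

    private final List<OffsetRecord> log = new ArrayList<>();

    // "Persist": append an offset update to the log.
    void publish(String key, long offset) {
        log.add(new OffsetRecord(key, offset));
    }

    // "Recover": replay the log in order; a later record for a key overwrites an earlier one.
    Map<String, Long> rebuildCache() {
        Map<String, Long> cache = new LinkedHashMap<>();
        for (OffsetRecord record : log) {
            cache.put(record.key, record.offset);
        }
        return cache;
    }

    public static void main(String[] args) {
        OffsetLogReplay topic = new OffsetLogReplay();
        topic.publish("file-a", 10L);
        topic.publish("file-b", 4L);
        topic.publish("file-a", 25L);
        System.out.println(topic.rebuildCache());
    }
}
```

Because the log is only appended to, a restarted node can rebuild its cache from the stored records alone, which is the property the strategy relies on.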
Class: SingleNodeKafkaResumeStrategy
public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, CamelContextAware
Purpose
Implements a Kafka resume strategy that stores offsets in a Kafka topic for single-node Apache Camel Kafka integrations. It manages Kafka consumer and producer instances internally to asynchronously update and recover offsets.
Implements
`KafkaResumeStrategy` – Interface defining the resume strategy methods.
`CamelContextAware` – Interface for integrating with the Apache Camel runtime context.
Fields
| Field | Type | Description |
|---|---|---|
| `consumer` | `Consumer` | Kafka consumer instance used to consume offset records from the resume topic. |
| `producer` | `Producer` | Kafka producer instance used to asynchronously publish offset updates to the resume topic. |
| `pollDuration` | `Duration` | Duration to poll Kafka consumer for new offset records (default 1 second). |
| `subscribed` | `boolean` | Flag indicating if the consumer has subscribed to the resume topic. |
| `adapter` | `ResumeAdapter` | Adapter to serialize, deserialize, and cache offset data. |
| `resumeStrategyConfiguration` | `KafkaResumeStrategyConfiguration` | Configuration object containing Kafka properties and resume topic settings. |
| `executorService` | `ExecutorService` | Thread pool for asynchronous cache loading and refreshing. |
| `writeLock` | `ReentrantLock` | Lock for synchronizing producer send operations and producer lifecycle management. |
| `initLatch` | `CountDownLatch` | Latch used to coordinate asynchronous cache initialization completion. |
| `camelContext` | `CamelContext` | Apache Camel context for lifecycle and executor service management. |
Constructors
SingleNodeKafkaResumeStrategy()
Default no-argument constructor.
SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration)
Creates an instance with the provided resume strategy configuration.
**Parameters:**
`resumeStrategyConfiguration` – Configuration for the Kafka consumer/producer properties and the resume topic.
SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService)
Creates an instance with the provided configuration and executor service for asynchronous tasks.
**Parameters:**
`resumeStrategyConfiguration` – Configuration for the Kafka consumer/producer properties and the resume topic.
`executorService` – Executor service used to run cache loading and refreshing asynchronously.
Key Methods and Usage
void updateLastOffset(T offset) throws Exception
Updates the last offset asynchronously in Kafka.
**Parameters:**
`offset` – A `Resumable` instance containing the offset key and the last offset value.
**Throws:** `Exception`
**Usage Example:**
singleNodeStrategy.updateLastOffset(offsetInstance);
void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception
Updates the last offset asynchronously with a callback to notify about update completion or errors.
**Parameters:**
`offset` – A `Resumable` instance.
`updateCallBack` – Callback invoked after the produce operation completes.
**Throws:** `Exception`
void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception
Updates the last offset given an offset key and an offset value.
**Parameters:**
`offsetKey` – Key identifying the offset (serialized before it is sent).
`offset` – Offset value.
**Throws:** `Exception`
void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception
Core update method that serializes the offset key and value, asynchronously sends them to Kafka, and updates the local cache.
**Parameters:**
`offsetKey` – Offset key.
`offset` – Offset value.
`updateCallBack` – Optional callback for completion notification.
**Throws:** `Exception`
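The three steps of the core update method (serialize, send asynchronously, update the local cache) can be sketched without any Kafka dependency. Everything in this sketch is a hypothetical stand-in: `RecordSender` plays the role of the producer, `CompletionCallback` plays the role of the update callback, and the byte encoding is chosen only for illustration. The real adapter decides the actual serialization format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these types are the Camel or Kafka classes.
class AsyncOffsetUpdateSketch {
    // Stand-in for a completion callback: error is null when the send succeeded.
    interface CompletionCallback {
        void onUpdate(Throwable error);
    }

    // Stand-in for the producer: accepts raw bytes and completes the future later.
    interface RecordSender {
        CompletableFuture<Void> send(byte[] key, byte[] value);
    }

    private final RecordSender sender;
    private final Map<String, Long> cache = new ConcurrentHashMap<>();

    AsyncOffsetUpdateSketch(RecordSender sender) {
        this.sender = sender;
    }

    void updateLastOffset(String key, long offset, CompletionCallback callback) {
        // 1. Serialize key and value to bytes (illustrative encoding only).
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = ByteBuffer.allocate(Long.BYTES).putLong(offset).array();

        // 2. Send asynchronously; the optional callback fires when the send completes.
        sender.send(keyBytes, valueBytes).whenComplete((ignored, error) -> {
            if (callback != null) {
                callback.onUpdate(error);
            }
        });

        // 3. Update the local cache without waiting for the send to finish.
        cache.put(key, offset);
    }

    Long cached(String key) {
        return cache.get(key);
    }

    public static void main(String[] args) {
        RecordSender alwaysOk = (k, v) -> CompletableFuture.completedFuture(null);
        AsyncOffsetUpdateSketch strategy = new AsyncOffsetUpdateSketch(alwaysOk);
        strategy.updateLastOffset("file-a", 42L,
                error -> System.out.println(error == null ? "stored" : "failed: " + error));
        System.out.println(strategy.cached("file-a"));
    }
}
```

Passing `null` as the callback is allowed in this sketch, mirroring the "optional callback" wording above.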
void loadCache()
Starts asynchronous loading of existing offsets from the Kafka resume topic into the local cache.
**Behavior:**
Validates that the adapter supports deserialization.
Creates a `CountDownLatch` for initialization synchronization.
Submits an asynchronous task to refresh offsets from Kafka.
Uses Camel's executor service if no executor is provided.
**Usage Example:**
singleNodeStrategy.loadCache();
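The latch-plus-executor handoff that `loadCache()` describes can be sketched with JDK classes only. The class `CacheLoaderSketch` is hypothetical, the "load" step is a placeholder where the real strategy reads from the resume topic, and the timeout is passed in explicitly here rather than read from configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of asynchronous initialization guarded by a latch.
class CacheLoaderSketch {
    private final ExecutorService executor;
    private volatile CountDownLatch initLatch;
    private volatile boolean loaded;

    CacheLoaderSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Creates the latch, then hands the loading work to the executor.
    void loadCache() {
        final CountDownLatch latch = new CountDownLatch(1);
        initLatch = latch;
        executor.submit(() -> {
            try {
                loaded = true; // placeholder for reading the stored offsets
            } finally {
                latch.countDown(); // always release waiters, even on failure
            }
        });
    }

    // Returns true if loading finished within the timeout, false otherwise.
    boolean waitForInitialization(long timeout, TimeUnit unit) {
        CountDownLatch latch = initLatch;
        if (latch == null) {
            return false; // loadCache() has not been called yet
        }
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    boolean isLoaded() {
        return loaded;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CacheLoaderSketch loader = new CacheLoaderSketch(executor);
        loader.loadCache();
        System.out.println("initialized=" + loader.waitForInitialization(5, TimeUnit.SECONDS));
        executor.shutdown();
    }
}
```

Counting down in a `finally` block is a design choice of this sketch: it keeps a failed load from blocking callers until the timeout.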
void stop()
Gracefully stops the strategy by:
Closing the Kafka producer with locking to ensure thread safety.
Waking up and closing the Kafka consumer.
Shutting down the executor service.
**Usage:**
Call during shutdown or lifecycle stop to release resources cleanly.
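Closing the producer under a lock matters because a send may be in progress on another thread when shutdown begins. A minimal sketch of that pattern follows, assuming a hypothetical `GuardedWriterSketch` class in which an in-memory list stands in for the producer. It shows only the locking discipline, not the Kafka calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the same lock guards both sending and closing.
class GuardedWriterSketch {
    private final ReentrantLock writeLock = new ReentrantLock();
    private final List<String> sent = new ArrayList<>(); // stand-in for the producer
    private boolean closed;

    // Returns false instead of sending once the writer has been closed.
    boolean send(String record) {
        writeLock.lock();
        try {
            if (closed) {
                return false;
            }
            sent.add(record);
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    // Safe to call more than once; waits for any in-flight send to release the lock.
    void close() {
        writeLock.lock();
        try {
            closed = true;
        } finally {
            writeLock.unlock();
        }
    }

    int sentCount() {
        writeLock.lock();
        try {
            return sent.size();
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedWriterSketch writer = new GuardedWriterSketch();
        System.out.println(writer.send("offset-1"));
        writer.close();
        System.out.println(writer.send("offset-2"));
    }
}
```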
void start()
Starts the strategy by creating the Kafka producer instance.
ResumeAdapter getAdapter()
Returns the configured `ResumeAdapter`. If not yet initialized, waits for cache initialization to complete.
void setAdapter(ResumeAdapter adapter)
Sets the `ResumeAdapter` used for offset serialization, deserialization, and caching.
void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration)
Sets the configuration for the resume strategy. The argument must be a `KafkaResumeStrategyConfiguration` instance; a runtime exception is thrown if any other type is given.
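The type check in this setter follows a common pattern: accept the general interface, then narrow it to the one concrete type the class can work with. The sketch below uses hypothetical types (`ResumeConfig`, `KafkaConfig`, `OtherConfig`) and throws `IllegalArgumentException` as a stand-in. The exact runtime exception type used by the real class is not stated in this document.

```java
// Hypothetical sketch of a setter that only accepts one concrete configuration type.
class ConfigSetterSketch {
    interface ResumeConfig {
    }

    static final class KafkaConfig implements ResumeConfig {
        final String topic;

        KafkaConfig(String topic) {
            this.topic = topic;
        }
    }

    static final class OtherConfig implements ResumeConfig {
    }

    private KafkaConfig config;

    void setConfiguration(ResumeConfig candidate) {
        if (candidate instanceof KafkaConfig) {
            this.config = (KafkaConfig) candidate;
        } else {
            // Stand-in exception type; fail fast rather than keep an unusable configuration.
            throw new IllegalArgumentException("Expected KafkaConfig but got: "
                    + (candidate == null ? "null" : candidate.getClass().getSimpleName()));
        }
    }

    String topic() {
        return config == null ? null : config.topic;
    }

    public static void main(String[] args) {
        ConfigSetterSketch strategy = new ConfigSetterSketch();
        strategy.setConfiguration(new KafkaConfig("resume-topic"));
        System.out.println(strategy.topic());
    }
}
```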
Duration getPollDuration()
Returns the Kafka consumer poll duration.
void setPollDuration(Duration pollDuration)
Sets the Kafka consumer poll duration. Cannot be null.
Protected/Internal Methods
void produce(byte[] key, byte[] message, UpdateCallBack updateCallBack)
Asynchronously sends a Kafka record with the given key and message to the resume topic.
Uses the Kafka producer's `send` method with an asynchronous callback.
Logs errors if the send fails.
Notifies the `updateCallBack` if provided.
void doAdd(OffsetKey<?> key, Offset<?> offsetValue)
Adds the offset key/value pair to the local cache if the adapter implements `Cacheable`.
Consumer<byte[], byte[]> createConsumer()
Creates a new Kafka consumer instance using the configured consumer properties.
void createProducer()
Creates a new Kafka producer instance if one does not already exist.
void subscribe(Consumer<byte[], byte[]> consumer)
Subscribes the consumer to the resume topic. If the adapter's cache has capacity, subscribes with a rebalance listener to rewind offset consumption by the cache size; otherwise, subscribes normally.
void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic)
Subscribes the consumer to the topic if not already subscribed.
void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining)
Subscribes the consumer with a rebalance listener that seeks to rewind the consumer position by `remaining` messages on partition assignment.
ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining)
Returns a rebalance listener that seeks the consumer position backwards by `remaining` messages on partition assignment.
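The rewind arithmetic can be isolated from the Kafka API. The sketch below assumes the rewind is measured back from each partition's end offset, and that a partition holding fewer than `remaining` records is left at its current position. Both are assumptions of this sketch, since the text above does not specify them. `RewindSketch` and `seekTargets` are hypothetical names, and partitions are represented by plain integers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: compute where to seek so that the last `remaining` records are replayed.
class RewindSketch {
    // endOffsets maps a partition number to its end offset (the offset of the next record to be written).
    static Map<Integer, Long> seekTargets(Map<Integer, Long> endOffsets, long remaining) {
        Map<Integer, Long> targets = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long start = entry.getValue() - remaining;
            if (start >= 0) {
                targets.put(entry.getKey(), start);
            }
            // Negative start: fewer than `remaining` records exist, so no seek is planned (sketch choice).
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put(0, 100L);
        endOffsets.put(1, 5L);
        System.out.println(seekTargets(endOffsets, 10L));
    }
}
```

In a real listener, each computed target would be passed to the consumer's seek call for the matching partition during partition assignment.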
ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer)
Polls the Kafka consumer for records using `pollDuration`. Returns an empty result if no records are available.
ConsumerRecords<byte[], byte[]> consume(int retries, Consumer<byte[], byte[]> consumer)
Polls the Kafka consumer with retry logic, returning the records if any are found, or an empty result once the retries are exhausted.
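A bounded retry loop of this kind can be sketched generically. Here a `Supplier` stands in for one poll of the consumer and a `List` stands in for the returned records. `RetryingPollSketch` is a hypothetical name, and the sketch assumes the loop returns as soon as one poll yields a non-empty result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: poll up to `retries` times and return the first non-empty batch.
class RetryingPollSketch {
    static <T> List<T> consume(int retries, Supplier<List<T>> poll) {
        while (retries > 0) {
            List<T> records = poll.get();
            if (!records.isEmpty()) {
                return records;
            }
            retries--;
        }
        return Collections.emptyList(); // retries exhausted
    }

    public static void main(String[] args) {
        // Two empty polls followed by one batch of records.
        Iterator<List<String>> polls = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.<String>emptyList(),
                Arrays.asList("offset-1", "offset-2")).iterator();
        System.out.println(consume(3, polls::next));
    }
}
```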
void refresh(CountDownLatch latch)
Background task that:
Creates the Kafka consumer.
Subscribes to the resume topic.
Polls continuously for offset records.
Deserializes records into cache via the adapter.
Counts down the latch when initialization is done.
Handles `WakeupException` for safe shutdown.
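The shape of this background task can be sketched with JDK classes only. In the sketch below a `BlockingQueue` stands in for the resume topic and a thread interrupt stands in for the Kafka consumer wake-up signal. Both are substitutions made for illustration, not the mechanism the real class uses. The sketch also assumes initialization counts as done after the first empty poll, and `RefreshLoopSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a poll loop that fills a cache and exits on a wake-up signal.
class RefreshLoopSketch {
    private final BlockingQueue<String[]> topic; // each element is {key, value}
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final CountDownLatch initLatch = new CountDownLatch(1);
    private final Thread worker = new Thread(this::refresh, "refresh-sketch");

    RefreshLoopSketch(BlockingQueue<String[]> topic) {
        this.topic = topic;
    }

    void start() {
        worker.start();
    }

    private void refresh() {
        try {
            while (true) {
                String[] record = topic.poll(50, TimeUnit.MILLISECONDS);
                if (record == null) {
                    initLatch.countDown(); // caught up: treat initialization as done
                } else {
                    cache.put(record[0], record[1]);
                }
            }
        } catch (InterruptedException wakeUp) {
            // The interrupt plays the role of the wake-up signal: leave the loop quietly.
        } finally {
            initLatch.countDown(); // never leave waiters blocked
        }
    }

    boolean awaitInitialized(long timeoutMillis) {
        try {
            return initLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Signals the loop to stop and reports whether the worker thread has ended.
    boolean stop(long timeoutMillis) {
        worker.interrupt();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    String cached(String key) {
        return cache.get(key);
    }

    public static void main(String[] args) {
        BlockingQueue<String[]> topic = new LinkedBlockingQueue<>();
        topic.add(new String[] {"file-a", "25"});
        RefreshLoopSketch loop = new RefreshLoopSketch(topic);
        loop.start();
        System.out.println("initialized=" + loop.awaitInitialized(5000));
        System.out.println("stopped=" + loop.stop(5000));
    }
}
```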
void waitForInitialization()
Waits for asynchronous cache loading to complete or timeout based on configuration.
Implementation Details and Algorithms
Thread Safety:
Uses a `ReentrantLock` (`writeLock`) to guard Kafka producer send operations and the closing of the producer, preventing concurrent access issues.
Asynchronous Cache Initialization:
Cache loading is performed in a separate thread via an `ExecutorService`. A `CountDownLatch` signals when initialization is complete; callers waiting on it give up after the configured timeout.
Consumer Rebalance and Offset Rewind:
The consumer subscribes to the resume topic with a rebalance listener that rewinds offset positions by the cache size to repopulate the cache after restarts or rebalances.
Offset Serialization and Deserialization:
Delegated to the `ResumeAdapter`, which must implement the `Deserializable` and `Cacheable` interfaces to support cache loading and updating.
Error Handling:
Errors during producer sends or consumer polling are logged. The producer send callback reports failures asynchronously.
Lifecycle Integration:
Implements the `start()`, `stop()`, and `close()` methods, which manage the Kafka client lifecycle and the executor service shutdown.
Interaction with Other Components
ResumeAdapter:
Handles serialization/deserialization and caching of offsets, abstracting offset format and storage.
Kafka Consumer & Producer:
Dedicated Kafka clients managed internally for interaction with the resume topic.
CamelContext:
Provides lifecycle integration and executor service management for asynchronous tasks.
KafkaResumeStrategyConfiguration:
Provides configuration such as Kafka bootstrap servers, topic name, consumer group ID, poll durations, and retry settings.
Usage Example
// Create configuration
KafkaResumeStrategyConfiguration config = new KafkaResumeStrategyConfigurationBuilder()
        .withBootstrapServers("localhost:9092")
        .withTopic("resume-topic")
        .withGroupId("resume-group")
        .build();
// Create strategy instance
SingleNodeKafkaResumeStrategy strategy = new SingleNodeKafkaResumeStrategy(config);
// Set adapter (implementation of ResumeAdapter)
strategy.setAdapter(myResumeAdapter);
// Start the strategy (creates producer)
strategy.start();
// Load cache asynchronously
strategy.loadCache();
// Update offsets when needed
strategy.updateLastOffset(myOffset);
// On shutdown
strategy.stop();
Visual Diagram: Class Structure and Key Methods
classDiagram
class SingleNodeKafkaResumeStrategy {
-Consumer<byte[], byte[]> consumer
-Producer<byte[], byte[]> producer
-Duration pollDuration
-boolean subscribed
-ResumeAdapter adapter
-KafkaResumeStrategyConfiguration resumeStrategyConfiguration
-ExecutorService executorService
-ReentrantLock writeLock
-CountDownLatch initLatch
-CamelContext camelContext
+SingleNodeKafkaResumeStrategy()
+SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration)
+SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration, ExecutorService)
+void updateLastOffset(T offset) throws Exception
+void updateLastOffset(T offset, UpdateCallBack) throws Exception
+void updateLastOffset(OffsetKey<?>, Offset<?>) throws Exception
+void updateLastOffset(OffsetKey<?>, Offset<?>, UpdateCallBack) throws Exception
+void loadCache()
+void start()
+void stop()
+void close() throws IOException
+ResumeAdapter getAdapter()
+void setAdapter(ResumeAdapter)
+void setResumeStrategyConfiguration(ResumeStrategyConfiguration)
+ResumeStrategyConfiguration getResumeStrategyConfiguration()
+Duration getPollDuration()
+void setPollDuration(Duration)
}
Summary
The `SingleNodeKafkaResumeStrategy` class provides fault-tolerant Kafka offset management for single-node Apache Camel Kafka integrations. It uses Kafka itself as the persistent store for offset state, asynchronously publishing offset updates and consuming them to rebuild a local cache on startup. Key features include asynchronous cache loading, thread-safe producer usage, offset rewind on partition assignment, and integration with Camel's lifecycle and threading model. As part of the broader Resume Strategies framework, it enables resumable Kafka consumption without external offset storage.