SingleNodeKafkaResumeStrategy.java


Overview

The `SingleNodeKafkaResumeStrategy` class provides a **resume strategy** for Apache Camel Kafka integrations that persistently stores and recovers consumer offsets by publishing them to and consuming them from a dedicated Kafka topic. This implementation is optimized for **single-node environments** where offset state is managed locally but backed by Kafka for durability and fault tolerance.

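This strategy enables the Kafka consumer to publish offset updates asynchronously and to rebuild a local offset cache from the resume topic on startup.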

It integrates with Apache Camel’s `ResumeAdapter` abstraction to manage serialization, deserialization, and caching of offsets.


Class: SingleNodeKafkaResumeStrategy

```java
public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, CamelContextAware
```

Purpose

Implements a Kafka resume strategy that stores offsets in a Kafka topic for single-node Apache Camel Kafka integrations. It manages Kafka consumer and producer instances internally to asynchronously update and recover offsets.

Implements

`KafkaResumeStrategy`, `CamelContextAware`


Fields

| Field | Type | Description |
| --- | --- | --- |
| `consumer` | `Consumer` | Kafka consumer instance used to consume offset records from the resume topic. |
| `producer` | `Producer` | Kafka producer instance used to asynchronously publish offset updates to the resume topic. |
| `pollDuration` | `Duration` | How long each poll of the Kafka consumer waits for new offset records (default: 1 second). |
| `subscribed` | `boolean` | Flag indicating whether the consumer has subscribed to the resume topic. |
| `adapter` | `ResumeAdapter` | Adapter that serializes, deserializes, and caches offset data. |
| `resumeStrategyConfiguration` | `KafkaResumeStrategyConfiguration` | Configuration object containing Kafka properties and resume topic settings. |
| `executorService` | `ExecutorService` | Thread pool for asynchronous cache loading and refreshing. |
| `writeLock` | `ReentrantLock` | Lock for synchronizing producer send operations and producer lifecycle management. |
| `initLatch` | `CountDownLatch` | Latch used to coordinate completion of asynchronous cache initialization. |
| `camelContext` | `CamelContext` | Apache Camel context for lifecycle and executor service management. |


Constructors

SingleNodeKafkaResumeStrategy()

Default no-argument constructor.


SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration)

Creates an instance with the provided resume strategy configuration.

**Parameters:** `resumeStrategyConfiguration` (the Kafka properties and resume topic settings to use)


SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService)

Creates an instance with the provided configuration and executor service for asynchronous tasks.

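**Parameters:** `resumeStrategyConfiguration` (the Kafka properties and resume topic settings to use), `executorService` (the thread pool that runs the asynchronous cache loading and refresh tasks)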


Key Methods and Usage


void updateLastOffset(T offset) throws Exception

Updates the last offset asynchronously in Kafka.

**Parameters:** `offset` (the value to record as the last offset)

**Throws:** `Exception`

**Usage Example:**

```java
singleNodeStrategy.updateLastOffset(offsetInstance);
```

void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception

Updates the last offset asynchronously with a callback to notify about update completion or errors.

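**Parameters:** `offset` (the value to record as the last offset), `updateCallBack` (notified when the update completes or fails)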

**Throws:** `Exception`


void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception

Updates the last offset, given an offset key and an offset value.

**Parameters:** `offsetKey` (the offset key), `offset` (the offset value)

**Throws:** `Exception`


void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception

Core update method that serializes the offset key and value, asynchronously sends them to Kafka, and updates the local cache.

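**Parameters:** `offsetKey` (the offset key), `offset` (the offset value), `updateCallBack` (notified when the update completes or fails)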

**Throws:** `Exception`
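
The serialize, send, and cache steps described for this method can be sketched without a Kafka client. The following is a minimal illustration, not the class's actual code: `OffsetUpdateSketch`, its `fakeTopic` map, and the string-key/long-offset encoding are all assumptions made for the example, and a `CompletableFuture` stands in for the producer's asynchronous send.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the update path; no Kafka client is involved.
class OffsetUpdateSketch {
    // Plays the role of the adapter's local cache (assumed: String keys, long offsets).
    final Map<String, Long> cache = new ConcurrentHashMap<>();
    // Plays the role of the resume topic: latest serialized offset per key.
    final Map<String, byte[]> fakeTopic = new ConcurrentHashMap<>();

    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeOffset(long offset) {
        return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    }

    static long deserializeOffset(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    // Serializes the pair, "sends" it asynchronously, then updates the cache.
    CompletableFuture<Void> updateLastOffset(String key, long offset) {
        byte[] rawKey = serializeKey(key);
        byte[] rawOffset = serializeOffset(offset);
        CompletableFuture<Void> sent = CompletableFuture.runAsync(
                () -> fakeTopic.put(new String(rawKey, StandardCharsets.UTF_8), rawOffset));
        cache.put(key, offset);
        return sent;
    }
}
```

Note that in this sketch the cache is updated without waiting for the send to finish, so the caller only learns about a failed send through the returned future.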


void loadCache()

Starts asynchronous loading of existing offsets from Kafka resume topic into the local cache.

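**Behavior:** The load runs as a background task on the executor service, and `initLatch` signals when the initial load has completed.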

**Usage Example:**

```java
singleNodeStrategy.loadCache();
```

void stop()

Gracefully stops the strategy and releases the resources it holds.

**Usage:**

Call during shutdown or lifecycle stop to release resources cleanly.


void start()

Starts the strategy by creating the Kafka producer instance.


ResumeAdapter getAdapter()

Returns the configured `ResumeAdapter`. If not yet initialized, waits for cache initialization to complete.


void setAdapter(ResumeAdapter adapter)

Sets the `ResumeAdapter` used for offset serialization, deserialization, and caching.


void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration)

Sets the configuration for the resume strategy. Expects a `KafkaResumeStrategyConfiguration` instance and throws a runtime exception if given any other type.
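
A setter that accepts a general type but requires a specific subtype is commonly written as an `instanceof` check followed by a cast. The sketch below shows that pattern with hypothetical stand-in classes (`BaseConfiguration`, `KafkaConfiguration`); the choice of `IllegalArgumentException` is an assumption, since the exact exception type is not stated here.

```java
// Hypothetical stand-ins for ResumeStrategyConfiguration and its Kafka subtype.
class ConfigTypeCheckSketch {
    static class BaseConfiguration { }

    static class KafkaConfiguration extends BaseConfiguration { }

    private KafkaConfiguration configuration;

    // Accepts the general type but keeps only the Kafka-specific subtype.
    void setConfiguration(BaseConfiguration candidate) {
        if (candidate instanceof KafkaConfiguration) {
            this.configuration = (KafkaConfiguration) candidate;
        } else {
            // Assumption: the real exception type may differ from this one.
            throw new IllegalArgumentException(
                    "Invalid resume strategy configuration: " + candidate);
        }
    }

    KafkaConfiguration getConfiguration() {
        return configuration;
    }
}
```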


Duration getPollDuration()

Returns the Kafka consumer poll duration.


void setPollDuration(Duration pollDuration)

Sets the Kafka consumer poll duration. The value must not be null.


Protected/Internal Methods


void produce(byte[] key, byte[] message, UpdateCallBack updateCallBack)

Asynchronously sends a Kafka record with the given key and message to the resume topic.
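
Since `writeLock` is described as synchronizing producer sends, the send path plausibly looks like a lock-guarded write followed by a callback notification. The sketch below illustrates that shape only: a list replaces the Kafka producer, the send is synchronous for simplicity, and `SendCallback` is a hypothetical interface, not Camel's `UpdateCallBack`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a lock-guarded send; a list replaces the Kafka producer.
class GuardedSendSketch {
    // Assumed callback shape: receives null on success, the failure otherwise.
    interface SendCallback {
        void onUpdate(Throwable error);
    }

    private final ReentrantLock writeLock = new ReentrantLock();
    // Each entry is a {key, message} pair, as if appended to the resume topic.
    final List<byte[][]> sent = new ArrayList<>();

    void produce(byte[] key, byte[] message, SendCallback callback) {
        Throwable failure = null;
        writeLock.lock();
        try {
            sent.add(new byte[][] {key, message});
        } catch (RuntimeException e) {
            failure = e;
        } finally {
            writeLock.unlock();
        }
        // Notify outside the lock so a slow callback cannot block other writers.
        if (callback != null) {
            callback.onUpdate(failure);
        }
    }
}
```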


void doAdd(OffsetKey<?> key, Offset<?> offsetValue)

Adds the offset key/value pair to the local cache if the adapter implements `Cacheable`.
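
The conditional caching described here amounts to a capability check on the adapter. A minimal sketch, using hypothetical interfaces (`Adapter`, `CacheableAdapter`) in place of `ResumeAdapter` and `Cacheable`, whose real method signatures may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for ResumeAdapter and Cacheable.
class CacheableCheckSketch {
    interface Adapter { }

    interface CacheableAdapter extends Adapter {
        void add(Object key, Object offset);
    }

    // An adapter that can cache, backed by a plain map.
    static class MapBackedAdapter implements CacheableAdapter {
        final Map<Object, Object> cache = new HashMap<>();

        @Override
        public void add(Object key, Object offset) {
            cache.put(key, offset);
        }
    }

    // An adapter without any caching capability.
    static class PlainAdapter implements Adapter { }

    // Returns true when the pair was cached, false when the adapter has no cache.
    static boolean doAdd(Adapter adapter, Object key, Object offset) {
        if (adapter instanceof CacheableAdapter) {
            ((CacheableAdapter) adapter).add(key, offset);
            return true;
        }
        return false;
    }
}
```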


Consumer<byte[], byte[]> createConsumer()

Creates a new Kafka consumer instance using the configured consumer properties.


void createProducer()

Creates a new Kafka producer instance if one does not already exist.
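
"Create only if absent" is typically an idempotent, lock-guarded initialization. The sketch below assumes the creation happens under `writeLock` (the field table describes that lock as covering producer lifecycle management) and uses a plain `Object` as a hypothetical stand-in for the producer.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for idempotent producer creation.
class LazyProducerSketch {
    private final ReentrantLock writeLock = new ReentrantLock();
    private Object producer; // stands in for the Kafka producer instance
    int creations;           // counts how many times a producer was built

    void createProducer() {
        writeLock.lock();
        try {
            // Only the first call builds a producer; later calls are no-ops.
            if (producer == null) {
                producer = new Object();
                creations++;
            }
        } finally {
            writeLock.unlock();
        }
    }
}
```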


void subscribe(Consumer<byte[], byte[]> consumer)

Subscribes the consumer to the resume topic. If the adapter's cache has capacity, subscribes with a rebalance listener to rewind offset consumption by the cache size; otherwise, subscribes normally.


void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic)

Subscribes the consumer to the topic if not already subscribed.


void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining)

Subscribes the consumer with a rebalance listener that seeks to rewind the consumer position by `remaining` messages on partition assignment.


ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining)

Returns a rebalance listener that seeks the consumer position backwards by `remaining` messages on partition assignment.
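
The rewind itself reduces to simple arithmetic on the consumer's current position. The helper below is a hypothetical illustration of that calculation; it assumes that a negative target (for example, on a near-empty topic) results in no seek at all rather than a seek to offset zero, which this document does not confirm.

```java
import java.util.OptionalLong;

// Hypothetical helper showing the rewind arithmetic for one partition.
class RewindSketch {
    // Returns the offset to seek to, or empty when the rewind would go below zero.
    static OptionalLong rewindTarget(long currentPosition, long remaining) {
        long target = currentPosition - remaining;
        return target >= 0 ? OptionalLong.of(target) : OptionalLong.empty();
    }
}
```

With a current position of 100 and `remaining` of 30, the consumer would seek to offset 70 and re-read the last 30 records to refill the cache.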


ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer)

Polls the Kafka consumer for records using `pollDuration`. Returns an empty record set if none are available.


ConsumerRecords<byte[], byte[]> consume(int retries, Consumer<byte[], byte[]> consumer)

Polls the Kafka consumer with retry logic, returning the records as soon as a poll finds any, or an empty record set once the retries are exhausted.
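
A bounded retry loop of this kind can be sketched independently of Kafka. In the example below, a `Supplier` stands in for a single `poll` call and a `List` for `ConsumerRecords`; both substitutions, and the class name `RetryPollSketch`, are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for polling with a retry budget.
class RetryPollSketch {
    // Polls up to `retries` times and returns the first non-empty batch.
    static <T> List<T> consume(int retries, Supplier<List<T>> poll) {
        while (retries > 0) {
            List<T> records = poll.get();
            if (!records.isEmpty()) {
                return records;
            }
            retries--;
        }
        // Retry budget exhausted without finding any record.
        return Collections.emptyList();
    }
}
```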


void refresh(CountDownLatch latch)

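Background task that polls the resume topic and loads the recovered offsets into the adapter's cache, counting down `latch` when the initial load is done. It handles `WakeupException` for safe shutdown.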


void waitForInitialization()

Waits for asynchronous cache loading to complete, or for the configured timeout to elapse.
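
The interplay between background loading and a bounded wait can be illustrated with a `CountDownLatch` and an executor. This is a sketch under stated assumptions: `LatchInitSketch` is hypothetical, the "load" is a single flag assignment, and the timeout is passed in directly instead of being read from the configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for latch-coordinated cache initialization.
class LatchInitSketch {
    private final CountDownLatch initLatch = new CountDownLatch(1);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    volatile boolean cacheLoaded;

    // Starts the load in the background and returns immediately.
    void loadCache() {
        executor.submit(() -> {
            try {
                cacheLoaded = true; // stands in for reading the resume topic
            } finally {
                initLatch.countDown(); // release waiters even if loading fails
            }
        });
    }

    // Returns true if loading finished in time, false on timeout or interruption.
    boolean waitForInitialization(long timeoutMillis) {
        try {
            return initLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        executor.shutdown();
    }
}
```

Counting down in a `finally` block is a deliberate choice in this sketch: a failed load should not leave callers blocked until the timeout.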


Implementation Details and Algorithms


Interaction with Other Components


Usage Example

```java
// Create configuration
KafkaResumeStrategyConfiguration config = new KafkaResumeStrategyConfigurationBuilder()
    .withBootstrapServers("localhost:9092")
    .withTopic("resume-topic")
    .withGroupId("resume-group")
    .build();

// Create strategy instance
SingleNodeKafkaResumeStrategy strategy = new SingleNodeKafkaResumeStrategy(config);

// Set adapter (implementation of ResumeAdapter)
strategy.setAdapter(myResumeAdapter);

// Start the strategy (creates producer)
strategy.start();

// Load cache asynchronously
strategy.loadCache();

// Update offsets when needed
strategy.updateLastOffset(myOffset);

// On shutdown
strategy.stop();
```

Visual Diagram: Class Structure and Key Methods

```mermaid
classDiagram
    class SingleNodeKafkaResumeStrategy {
        -Consumer<byte[], byte[]> consumer
        -Producer<byte[], byte[]> producer
        -Duration pollDuration
        -boolean subscribed
        -ResumeAdapter adapter
        -KafkaResumeStrategyConfiguration resumeStrategyConfiguration
        -ExecutorService executorService
        -ReentrantLock writeLock
        -CountDownLatch initLatch
        -CamelContext camelContext
        +SingleNodeKafkaResumeStrategy()
        +SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration)
        +SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration, ExecutorService)
        +void updateLastOffset(T offset) throws Exception
        +void updateLastOffset(T offset, UpdateCallBack) throws Exception
        +void updateLastOffset(OffsetKey<?>, Offset<?>) throws Exception
        +void updateLastOffset(OffsetKey<?>, Offset<?>, UpdateCallBack) throws Exception
        +void loadCache()
        +void start()
        +void stop()
        +void close() throws IOException
        +ResumeAdapter getAdapter()
        +void setAdapter(ResumeAdapter)
        +void setResumeStrategyConfiguration(ResumeStrategyConfiguration)
        +ResumeStrategyConfiguration getResumeStrategyConfiguration()
        +Duration getPollDuration()
        +void setPollDuration(Duration)
    }
```

Summary

The `SingleNodeKafkaResumeStrategy` class provides fault-tolerant offset management for single-node Apache Camel Kafka integrations. It uses Kafka itself as the persistent store for offset state: offset updates are published asynchronously, and the resume topic is consumed on startup to rebuild a local cache. Key features include asynchronous cache loading, thread-safe producer usage, offset rewind on partition assignment, and integration with Camel's lifecycle and threading model. Within the broader Resume Strategies framework, it enables resumable Kafka consumption without external offset storage.