Resume Strategies
Overview
Resume strategies in this project define mechanisms for Kafka consumers to maintain and restore their consumption offset state across restarts or failures. They make Kafka message consumption fault-tolerant and resumable by persisting offset information externally. A restarted consumer instance can then continue from the last recorded offsets instead of reprocessing from the beginning or skipping unprocessed messages.
The **Resume Strategies** module focuses on the design and implementation of offset state storage and retrieval using Kafka topics themselves as the persistent medium. This approach leverages Kafka's durability and distributed nature to maintain offset data in a topic dedicated to resume information, thus integrating offset persistence tightly with the Kafka messaging infrastructure.
Core Concepts and Purpose
Offset Persistence in Kafka Topics: Instead of relying solely on Kafka's consumer group offset mechanism or external storage, this module persists offset information explicitly into a Kafka topic. This enables more flexible and customizable offset management strategies that can work across nodes or in single-node environments.
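Fault Tolerance and Recovery: By storing offsets in a Kafka topic, the consumer can recover its last known offset even after crashes or restarts. Recording an offset after its message is processed gives at-least-once behavior, and recording it before processing gives at-most-once behavior. Because the resume-topic write is not atomic with message processing, it does not by itself guarantee exactly-once semantics.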
Caching and Asynchronous Updates: To optimize performance, offsets are cached locally and updates are published asynchronously to the Kafka resume topic. Lookups are served from the local cache rather than the broker, and callers do not block on each write. Durability comes from the records stored in the resume topic.
Single Node Strategy: The module includes a concrete implementation optimized for single-node integrations, simplifying assumptions about concurrency and distribution.
How the Module Works
SingleNodeKafkaResumeStrategy
The primary implementation provided is the `SingleNodeKafkaResumeStrategy`. This class manages offset persistence by publishing offset updates as Kafka records to a configured resume topic and consuming from that same topic to rebuild the local cache on startup.
Key Functionalities:
Offset Update Publishing:
When a new offset is available, the strategy serializes the offset key and value into byte arrays and asynchronously sends them as a record to the resume topic via a Kafka producer. This happens inside the `updateLastOffset()` methods, which hold a lock while the record is handed to the producer to ensure thread safety.
```java
ByteBuffer keyBuffer = offsetKey.serialize();
ByteBuffer valueBuffer = offset.serialize();
produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
```
Offset Cache Management:
The strategy maintains a local cache of offset state using a `ResumeAdapter` that implements `Cacheable`. After the offset is sent to Kafka, the local cache is updated synchronously to reflect the latest state.
Cache Loading on Startup:
On initialization, the strategy creates a Kafka consumer subscribed to the resume topic. It continuously polls that topic and deserializes the offset records to rebuild the local cache. This runs asynchronously, and a countdown latch signals whether cache loading completed or timed out.
```java
executorService.submit(() -> refresh(initLatch));
```
Consumer Subscription with Rewind:
When subscribing to the resume topic, the strategy can rewind the consumer to an earlier position based on the cache size. This ensures enough historical offset records are read to fill the cache completely. The rewind is handled by a `ConsumerRebalanceListener` that seeks the consumer position backwards on partition assignment.
Thread Safety and Lifecycle Management:
The strategy uses a reentrant lock (`writeLock`) to guard producer send operations and the producer shutdown. On stop, it calls `wakeup()` on the consumer and shuts down the executor service so that the polling thread terminates gracefully.
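The publish-then-cache order described above can be sketched without a broker. This is a minimal sketch under stated assumptions, not the Camel implementation:

- `InMemoryResumeLog` and `OffsetPublisher` are hypothetical names.
- An in-memory list stands in for both the resume topic and the asynchronous producer.
- Keys are UTF-8 strings and offsets are 8-byte longs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the resume topic and producer: an append-only list of key/value records. */
final class InMemoryResumeLog {
    private final List<byte[][]> records = new ArrayList<>();

    void append(byte[] key, byte[] value) {
        records.add(new byte[][] {key, value});
    }

    List<byte[][]> records() {
        return Collections.unmodifiableList(records);
    }
}

/** Hypothetical publisher following the order: serialize, send under a lock, then update the cache. */
final class OffsetPublisher {
    private final ReentrantLock writeLock = new ReentrantLock();
    private final InMemoryResumeLog log;
    private final Map<String, Long> cache = new ConcurrentHashMap<>();

    OffsetPublisher(InMemoryResumeLog log) {
        this.log = log;
    }

    void updateLastOffset(String offsetKey, long offset) {
        // Serialize key and value into byte arrays before handing them to the "producer"
        byte[] keyBytes = offsetKey.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = ByteBuffer.allocate(Long.BYTES).putLong(offset).array();

        // Only one thread at a time may hand a record to the producer
        writeLock.lock();
        try {
            log.append(keyBytes, valueBytes);
        } finally {
            writeLock.unlock();
        }

        // Update the local cache after the record has been handed off
        cache.put(offsetKey, offset);
    }

    Long lastOffset(String offsetKey) {
        return cache.get(offsetKey);
    }
}
```

With a real asynchronous producer, the record may not yet be durable in Kafka at the moment the cache is updated.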
Configuration and Extensibility:
The strategy uses a `KafkaResumeStrategyConfiguration` object that holds Kafka producer/consumer properties, the resume topic name, initialization timeouts, and retry counts.
It accepts an external `ResumeAdapter` implementation responsible for serialization/deserialization and caching logic. This lets it adapt to different offset data formats or cache policies.
The class implements `CamelContextAware` to integrate with Camel's executor service management.
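To illustrate the adapter split, the sketch below replays resume-topic records through an adapter that alone knows the record format. It makes these assumptions:

- `OffsetAdapter`, `StringLongAdapter`, and `CacheReplayer` are hypothetical. They are not Camel's `ResumeAdapter`, `Deserializable`, or `Cacheable` interfaces.
- Each record is modeled as a `byte[][]` key/value pair.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical adapter contract: the strategy core only hands it raw key/value bytes. */
interface OffsetAdapter {
    void deserialize(ByteBuffer key, ByteBuffer value);

    Long lastOffset(String key);
}

/** Hypothetical adapter: UTF-8 string keys, 8-byte big-endian long offsets, later records overwrite earlier ones. */
final class StringLongAdapter implements OffsetAdapter {
    private final Map<String, Long> cache = new ConcurrentHashMap<>();

    @Override
    public void deserialize(ByteBuffer key, ByteBuffer value) {
        String decodedKey = StandardCharsets.UTF_8.decode(key).toString();
        cache.put(decodedKey, value.getLong());
    }

    @Override
    public Long lastOffset(String key) {
        return cache.get(key);
    }
}

/** Hypothetical strategy-side replay loop: it never interprets the bytes itself. */
final class CacheReplayer {
    static void replay(List<byte[][]> records, OffsetAdapter adapter) {
        for (byte[][] record : records) {
            adapter.deserialize(ByteBuffer.wrap(record[0]), ByteBuffer.wrap(record[1]));
        }
    }
}
```

Because the replay loop depends only on the adapter contract, a different key or offset encoding needs a new adapter, not a change to the loop.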
Interaction with Other System Components
ResumeAdapter:
The `SingleNodeKafkaResumeStrategy` uses a `ResumeAdapter` to abstract how offsets are serialized, deserialized, cached, and retrieved. The adapter acts as a bridge between raw Kafka records and the internal offset state representation.
Kafka Consumer and Producer:
The strategy manages its own Kafka consumer and producer instances dedicated to the resume topic. These clients operate independently of the main message consumer and producer managed elsewhere in the system.
ResumeRebalanceListener:
When the consumer is subscribed to the resume topic, a rebalance listener (e.g., `ResumeRebalanceListener`) may be used to handle partition assignment and to seek backwards by the rewind amount. This keeps the cache complete after startup or rebalance events.
Commit Management:
This resume strategy complements commit managers by providing an alternative source of truth for offsets. Commit managers handle committing offsets back to Kafka or external repositories, while resume strategies focus on caching and restoring offsets from a Kafka topic.
Camel Context and Executor Service:
The strategy uses Camel's executor service to run asynchronous cache loading and refresh tasks, integrating smoothly into the Camel lifecycle and threading model.
Important Concepts and Design Patterns
Asynchronous Offset Publishing:
Offset updates are sent asynchronously to avoid blocking message processing threads, improving throughput and responsiveness.
Local Cache with Kafka Topic Synchronization:
The strategy caches offset state locally for fast access and rebuilds this cache by consuming the resume topic records, keeping it consistent with the durable copy stored in Kafka.
Reentrant Locking for Producer Send Operations:
To maintain thread safety in concurrent scenarios, a reentrant lock guards the producer send logic and resource cleanup.
Use of Countdown Latch for Startup Synchronization:
The asynchronous cache loading thread uses a countdown latch to signal initialization completion or timeout to the main thread. The main thread therefore waits for the cache to load, or for the timeout, before processing begins.
Consumer Rebalance Listener to Handle Offset Rewind:
The strategy implements partition assignment hooks that rewind the consumer position by the cache size, so enough historical records are read to populate the cache fully.
Pluggability via ResumeAdapter Interface:
The design separates offset serialization and caching logic into adapters, allowing different implementations to be plugged in without changing the resume strategy core.
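The latch-based startup handshake can be sketched with plain `java.util.concurrent` types. It makes these assumptions:

- `BackgroundCacheLoader` is a hypothetical name.
- The records are already in memory rather than polled from Kafka.
- The latch count is 1 for simplicity. The `loadCache()` code instead sizes its latch from `getMaxInitializationRetries()`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical loader: fills a cache on a background thread and signals completion through a latch. */
final class BackgroundCacheLoader {
    final Map<String, Long> cache = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Returns true if loading finished within the timeout, false if the wait timed out. */
    boolean loadAndWait(List<Map.Entry<String, Long>> records, long timeoutMillis) throws InterruptedException {
        CountDownLatch initLatch = new CountDownLatch(1);
        executor.submit(() -> {
            for (Map.Entry<String, Long> record : records) {
                // Later records for the same key overwrite earlier ones
                cache.put(record.getKey(), record.getValue());
            }
            // Tell the waiting thread that the cache is ready
            initLatch.countDown();
        });
        // Block the caller until the cache is loaded or the timeout expires
        return initLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void stop() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }
}
```

A `false` result tells the caller that the cache may be incomplete. The caller can then decide whether to continue or fail startup.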
Key Code References
Offset Update and Asynchronous Produce:
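```java
public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
    OffsetKey<?> key = offset.getOffsetKey();
    Offset<?> offsetValue = offset.getLastOffset();

    // Serialize the key and value into the byte arrays stored in the resume topic
    ByteBuffer keyBuffer = key.serialize();
    ByteBuffer valueBuffer = offsetValue.serialize();

    // Acquire the lock before the try block so unlock() only runs if lock() succeeded
    writeLock.lock();
    try {
        produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
    } finally {
        writeLock.unlock();
    }

    // Update the local cache once the record has been handed to the producer
    doAdd(key, offsetValue);
}
```
Cache Loading via Background Thread: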
```java
public void loadCache() {
    if (!(adapter instanceof Deserializable)) {
        throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
    }

    initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
    if (executorService == null) {
        executorService = camelContext.getExecutorServiceManager()
                .newSingleThreadExecutor(this, "SingleNodeKafkaResumeStrategy");
    }

    // Rebuild the cache on a background thread; refresh() receives the latch the caller waits on
    executorService.submit(() -> refresh(initLatch));
}
```
Consumer Subscription with Offset Rewind:
```java
public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining) {
    if (!subscribed) {
        consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(consumer, remaining));
        subscribed = true;
    }
}

private ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining) {
    return new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Required by the interface; no action is needed on revocation
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
            for (TopicPartition assignment : assignments) {
                // Rewind by `remaining` records from the current position
                final long endPosition = consumer.position(assignment);
                final long startPosition = endPosition - remaining;

                // Skip the seek when the rewind would go below offset 0
                if (startPosition >= 0) {
                    consumer.seek(assignment, startPosition);
                }
            }
        }
    };
}
```
Lifecycle Management and Shutdown:
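```java
@Override
public void stop() {
    boolean locked = false;
    try {
        // Give an in-flight send up to one second to finish before closing the producer
        locked = writeLock.tryLock(1, TimeUnit.SECONDS);
        if (!locked) {
            LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

    try {
        IOHelper.close(producer, "Kafka producer", LOG);
    } finally {
        // Release the lock only if this thread actually acquired it
        if (locked) {
            writeLock.unlock();
        }
    }

    try {
        // Break out of a blocking poll() so the cache refresh thread can exit
        consumer.wakeup();
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
```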
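The rewind rule used in `onPartitionsAssigned()` can be checked in isolation, without a broker. It makes these assumptions:

- `RewindPlanner` is a hypothetical helper.
- Partitions are represented by plain string labels instead of `TopicPartition`.
- End positions are supplied directly rather than read with `consumer.position()`.

As in the listener, no seek happens when the computed start position would be negative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper isolating the rewind arithmetic from the Kafka consumer. */
final class RewindPlanner {
    /** Position to seek to, or empty when rewinding would go below offset 0 (the seek is skipped). */
    static OptionalLong startPosition(long endPosition, long remaining) {
        long start = endPosition - remaining;
        return start >= 0 ? OptionalLong.of(start) : OptionalLong.empty();
    }

    /** Applies the rule to every assigned partition, keyed by a plain partition label. */
    static Map<String, OptionalLong> plan(Map<String, Long> endPositions, long remaining) {
        Map<String, OptionalLong> plan = new LinkedHashMap<>();
        endPositions.forEach((partition, end) -> plan.put(partition, startPosition(end, remaining)));
        return plan;
    }
}
```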
Configuration
The `KafkaResumeStrategyConfigurationBuilder` class facilitates building configuration for the resume strategy, allowing fine-tuning of Kafka consumer/producer properties, resume topic name, retry counts, and cache fill policies.
The builder sets up byte-array serializers and deserializers by default.
Example configuration properties include group ID, bootstrap servers, and auto-commit settings.
The cache fill policy determines the consumer's offset reset behavior (`earliest` vs `latest`).
Example builder usage:
```java
KafkaResumeStrategyConfigurationBuilder builder = KafkaResumeStrategyConfigurationBuilder.newBuilder()
        .withBootstrapServers("localhost:9092")
        .withTopic("camel-resume-topic")
        .withGroupId("resume-consumer-group")
        .withEnableAutoCommit(true)
        .withMaxInitializationRetries(5)
        .withMaxInitializationDuration(Duration.ofSeconds(10));

KafkaResumeStrategyConfiguration config = builder.build();
```
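The effect of the cache fill policy on the resume consumer can be sketched as plain Kafka client properties. The property keys and the `ByteArrayDeserializer` class name are standard Kafka client settings. The following are assumptions based on the description above:

- `FillPolicy` and `ResumeConsumerProps` are hypothetical names.
- A maximizing policy maps to `earliest` and a minimizing one to `latest`.

```java
import java.util.Properties;

/** Hypothetical fill policy: read the whole resume topic, or only records written from now on. */
enum FillPolicy { MAXIMIZING, MINIMIZING }

/** Hypothetical helper assembling resume-consumer properties with standard Kafka config keys. */
final class ResumeConsumerProps {
    static Properties build(String bootstrapServers, String groupId, FillPolicy policy) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Resume records are raw byte arrays; the adapter interprets them
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Assumed mapping: filling the cache completely means starting from the earliest record
        props.setProperty("auto.offset.reset", policy == FillPolicy.MAXIMIZING ? "earliest" : "latest");
        return props;
    }
}
```

Starting from `earliest` lets the consumer see every stored offset record. The cost is a longer startup when the resume topic is large.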
Visual Diagram: Resume Strategy Offset Flow
```mermaid
sequenceDiagram
    participant Consumer as ResumeStrategy Consumer
    participant Cache as Local Offset Cache
    participant Producer as ResumeStrategy Producer
    participant Kafka as Kafka Resume Topic

    Note over Consumer, Kafka: Startup: Load offsets from Kafka topic
    Consumer->>Kafka: Poll resume topic records
    Kafka-->>Consumer: ConsumerRecords
    Consumer->>Cache: Deserialize and store offsets

    Note over Cache, Producer: Offset update
    Cache->>Producer: Serialize offset update
    Producer->>Kafka: Produce offset update record (async)

    Note over Kafka, Consumer: Continuous offset updates
    Kafka-->>Consumer: Offset records consumed to refresh cache
```
Summary
The **Resume Strategies** module provides a robust mechanism for Kafka consumers within the Apache Camel Kafka component to persist and restore their offset state using Kafka topics as durable storage. The `SingleNodeKafkaResumeStrategy` implements this via asynchronous Kafka producer and consumer clients dedicated to a resume topic, coupled with a local cache synchronized through consuming offset records. This design enables fault-tolerant, resumable consumption with configurable initialization, caching, and concurrency controls. The module integrates closely with other offset management facilities and Camel's threading model, offering flexibility and reliability in managing Kafka consumer offset state beyond default Kafka capabilities.