KafkaResumeStrategyConfiguration.java


Overview

`KafkaResumeStrategyConfiguration.java` defines the configuration class for the Kafka-based resume strategies in Apache Camel: `KafkaResumeStrategy` and its implementations, such as `SingleNodeKafkaResumeStrategy`. The class holds the settings that control how a Kafka resume strategy connects to Kafka brokers to persist and restore consumer offset state.

The class extends the generic `ResumeStrategyConfiguration` base class and adds Kafka-specific parameters: producer and consumer properties, the resume topic name, and initialization limits (maximum retries and maximum duration). It serves as a single configuration holder through which users or other components customize Kafka connectivity and the behavior of offset persistence and recovery.


Class: KafkaResumeStrategyConfiguration

Description

A configuration container class providing Kafka-specific options for resume strategies that persist offset information into Kafka topics.

Superclass

`ResumeStrategyConfiguration`

Properties

| Property | Type | Description |
| --- | --- | --- |
| `producerProperties` | `Properties` | Kafka producer configuration properties used to create the Kafka producer instance. |
| `consumerProperties` | `Properties` | Kafka consumer configuration properties used to create the Kafka consumer instance. |
| `topic` | `String` | The Kafka topic name where resume offset records are produced and consumed. |
| `maxInitializationDuration` | `Duration` | Maximum allowed duration for initializing the offset cache during startup. |
| `maxInitializationRetries` | `int` | Maximum number of retries allowed for initialization attempts before failing. Must be ≥ 1. |
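The `≥ 1` constraint on `maxInitializationRetries` is the kind of rule a setter can enforce as soon as a value is supplied. The following is a minimal sketch, not the Camel source: `RetryLimitedConfig` is a hypothetical stand-in class, its default values are invented for illustration, and rejecting bad input with `IllegalArgumentException` or `NullPointerException` is an assumption about how such validation might look.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in, NOT the Camel class: it only illustrates eager
// validation of the two initialization limits described in the table.
class RetryLimitedConfig {
    // Assumed defaults, chosen only so the sketch starts in a valid state.
    private Duration maxInitializationDuration = Duration.ofSeconds(10);
    private int maxInitializationRetries = 1;

    Duration getMaxInitializationDuration() {
        return maxInitializationDuration;
    }

    void setMaxInitializationDuration(Duration duration) {
        // Assumption: a null duration is treated as a programming error.
        this.maxInitializationDuration = Objects.requireNonNull(duration, "duration");
    }

    int getMaxInitializationRetries() {
        return maxInitializationRetries;
    }

    void setMaxInitializationRetries(int retries) {
        // Enforce the documented lower bound (>= 1) at configuration time,
        // leaving the previous value untouched when the input is rejected.
        if (retries < 1) {
            throw new IllegalArgumentException(
                    "maxInitializationRetries must be >= 1, got " + retries);
        }
        this.maxInitializationRetries = retries;
    }
}
```

Validating in the setter means a misconfigured value fails where it is set, rather than later during startup.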

Methods

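Getters and Setters

Each property has a public getter and setter: `getProducerProperties()` / `setProducerProperties(Properties)`, `getConsumerProperties()` / `setConsumerProperties(Properties)`, `getTopic()` / `setTopic(String)`, `getMaxInitializationDuration()` / `setMaxInitializationDuration(Duration)`, and `getMaxInitializationRetries()` / `setMaxInitializationRetries(int)`.

Overridden Methods

`resumeStrategyService()` returns a `String` and overrides the method declared in `ResumeStrategyConfiguration`.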


Usage Example

```java
import java.time.Duration;
import java.util.Properties;

// Package as found in the camel-kafka component; verify against your Camel version.
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;

KafkaResumeStrategyConfiguration config = new KafkaResumeStrategyConfiguration();

// Producer settings: used to write resume offset records to the topic.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProducerProperties(producerProps);

// Consumer settings: used to read the stored offsets back during startup.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "resume-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("auto.offset.reset", "earliest");
config.setConsumerProperties(consumerProps);

// Resume topic and initialization limits.
config.setTopic("camel-resume-topic");
config.setMaxInitializationDuration(Duration.ofSeconds(10));
config.setMaxInitializationRetries(5);

// The config instance can now be passed to KafkaResumeStrategy implementations.
```
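How a strategy might honor the two initialization limits can be sketched as a bounded retry loop. This is an illustration only: `BoundedInitializer`, `tryInitialize`, and the `BooleanSupplier` attempt are hypothetical names, and the actual `KafkaResumeStrategy` implementations may count retries or apply the time budget differently.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Camel: repeats an initialization step
// until it succeeds, the attempt count is exhausted, or the time budget ends.
final class BoundedInitializer {
    private BoundedInitializer() {
    }

    static boolean tryInitialize(BooleanSupplier attempt, int maxRetries, Duration maxDuration) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1");
        }
        // System.nanoTime() is monotonic, so the deadline is safe against clock changes.
        final long deadline = System.nanoTime() + maxDuration.toNanos();
        for (int i = 0; i < maxRetries; i++) {
            if (attempt.getAsBoolean()) {
                return true; // initialization succeeded
            }
            // Stop early once the time budget is spent (overflow-safe comparison).
            if (System.nanoTime() - deadline >= 0) {
                break;
            }
        }
        return false; // limits reached without a successful attempt
    }
}
```

In this sketch `maxRetries` is treated as the total number of attempts, and the first attempt always runs even when the duration is zero. A caller holding the configuration from the usage example would pass `config.getMaxInitializationRetries()` and `config.getMaxInitializationDuration()` as the two limits.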

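Important Implementation Details

`maxInitializationRetries` must be at least 1, and `maxInitializationDuration` bounds how long the offset cache may take to initialize during startup.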


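Interaction with Other Parts of the System

The class extends `ResumeStrategyConfiguration` and is passed to `KafkaResumeStrategy` implementations such as `SingleNodeKafkaResumeStrategy`, which use its producer and consumer properties to connect to Kafka and its topic to persist and restore offsets.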


Visual Diagram: Class Structure of KafkaResumeStrategyConfiguration

```mermaid
classDiagram
    class ResumeStrategyConfiguration {
        <<abstract>>
        +resumeStrategyService() : String
    }

    class KafkaResumeStrategyConfiguration {
        -producerProperties: Properties
        -consumerProperties: Properties
        -topic: String
        -maxInitializationDuration: Duration
        -maxInitializationRetries: int
        +getProducerProperties() : Properties
        +setProducerProperties(producerProperties: Properties) : void
        +getConsumerProperties() : Properties
        +setConsumerProperties(consumerProperties: Properties) : void
        +getTopic() : String
        +setTopic(topic: String) : void
        +getMaxInitializationDuration() : Duration
        +setMaxInitializationDuration(maxInitializationDuration: Duration) : void
        +getMaxInitializationRetries() : int
        +setMaxInitializationRetries(maxInitializationRetries: int) : void
        +resumeStrategyService() : String
    }

    KafkaResumeStrategyConfiguration --|> ResumeStrategyConfiguration
```

Summary

`KafkaResumeStrategyConfiguration` holds the Kafka-specific settings that Kafka resume strategies in Apache Camel need for offset persistence and recovery: the producer and consumer properties, the resume topic, and the initialization limits that govern the offset cache at startup. Resume strategies read these settings to configure their Kafka clients and to bound initialization.

Keeping this configuration in its own class separates it from the strategy logic, which makes Kafka resume strategies easier to test and to deploy in both distributed and single-node Camel applications.