KafkaResumeStrategyConfiguration.java
Overview
`KafkaResumeStrategyConfiguration.java` defines a configuration class tailored for use with the Kafka-based resume strategies in Apache Camel, specifically the `KafkaResumeStrategy` and its implementations such as `SingleNodeKafkaResumeStrategy`. This configuration class encapsulates all necessary settings that control how the Kafka resume strategy interacts with Kafka brokers to persist and restore consumer offset states.
The class extends a generic `ResumeStrategyConfiguration` base class and adds Kafka-specific parameters: Kafka producer and consumer properties, the resume topic name, and initialization controls (maximum retries and maximum duration). It acts as a centralized configuration holder, allowing users or other components to customize Kafka connectivity and behavior for offset persistence and recovery.
Class: KafkaResumeStrategyConfiguration
Description
A configuration container class providing Kafka-specific options for resume strategies that persist offset information into Kafka topics.
Superclass
ResumeStrategyConfiguration
Properties
| Property | Type | Description |
|---|---|---|
| `producerProperties` | `Properties` | Kafka producer configuration properties used to create the Kafka producer instance. |
| `consumerProperties` | `Properties` | Kafka consumer configuration properties used to create the Kafka consumer instance. |
| `topic` | `String` | The Kafka topic name where resume offset records are produced and consumed. |
| `maxInitializationDuration` | `Duration` | Maximum allowed duration for initializing the offset cache during startup. |
| `maxInitializationRetries` | `int` | Maximum number of retries allowed for initialization attempts before failing. Must be ≥ 1. |
Methods
Getters and Setters
`public Properties getProducerProperties()`
Returns the current Kafka producer properties.
`void setProducerProperties(Properties producerProperties)`
Sets the Kafka producer properties. Must not be null.
Parameters:
`producerProperties`: Non-null Kafka producer configuration properties.
`public Properties getConsumerProperties()`
Returns the current Kafka consumer properties.
`void setConsumerProperties(Properties consumerProperties)`
Sets the Kafka consumer properties. Must not be null.
Parameters:
`consumerProperties`: Non-null Kafka consumer configuration properties.
`public String getTopic()`
Returns the Kafka resume topic name.
`void setTopic(String topic)`
Sets the Kafka resume topic name. Must not be null.
Parameters:
`topic`: Non-null string representing the Kafka topic name.
`public Duration getMaxInitializationDuration()`
Returns the maximum duration allowed for initialization.
`public void setMaxInitializationDuration(Duration maxInitializationDuration)`
Sets the maximum initialization duration.
Parameters:
`maxInitializationDuration`: Duration for the initialization timeout.
`public int getMaxInitializationRetries()`
Returns the maximum number of initialization retries.
`public void setMaxInitializationRetries(int maxInitializationRetries)`
Sets the maximum number of retries allowed during initialization.
Parameters:
`maxInitializationRetries`: Integer ≥ 1; throws `IllegalArgumentException` if less than 1.
**Throws:**
`IllegalArgumentException` if the argument is less than 1.
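
The retry validation described above might look roughly like the following. This is a minimal sketch rather than the actual Camel source: `RetryLimitSketch` is a hypothetical stand-in class, and both the default value and the exception message are assumptions made for illustration.

```java
/**
 * Hypothetical stand-in that mirrors only the documented retry-limit
 * validation; it is not the actual Camel class.
 */
final class RetryLimitSketch {
    // Default chosen for illustration only (assumption).
    private int maxInitializationRetries = 1;

    int getMaxInitializationRetries() {
        return maxInitializationRetries;
    }

    void setMaxInitializationRetries(int maxInitializationRetries) {
        // Reject values below 1, matching the documented contract.
        if (maxInitializationRetries < 1) {
            // The message text is an assumption.
            throw new IllegalArgumentException(
                    "The maximum number of initialization retries must be at least 1");
        }
        this.maxInitializationRetries = maxInitializationRetries;
    }
}
```

Because the check runs before the assignment, a rejected value leaves the previously configured limit untouched.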
Overridden Methods
`@Override public String resumeStrategyService()`
Returns a unique identifier string for this resume strategy service.
Returns:
`"kafka-resume-strategy"` (constant string identifier)
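
If the identifier is used as a lookup key, the pattern might resemble the sketch below. The abstract base class, the `Map`-based registry, and every name in it are hypothetical stand-ins for illustration; how Camel actually resolves the service from this string is not described in this document.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the generic configuration base class. */
abstract class ConfigSketch {
    abstract String resumeStrategyService();
}

/** Hypothetical Kafka configuration returning the documented identifier. */
final class KafkaConfigSketch extends ConfigSketch {
    @Override
    String resumeStrategyService() {
        return "kafka-resume-strategy";
    }
}

/** Hypothetical registry keyed by the service identifier (assumption). */
final class StrategyLookupSketch {
    private static final Map<String, Supplier<String>> REGISTRY = new HashMap<>();

    static {
        // A placeholder description stands in for a real strategy instance.
        REGISTRY.put("kafka-resume-strategy", () -> "Kafka-backed strategy (placeholder)");
    }

    static String resolve(ConfigSketch config) {
        Supplier<String> factory = REGISTRY.get(config.resumeStrategyService());
        if (factory == null) {
            throw new IllegalArgumentException(
                    "No strategy registered for " + config.resumeStrategyService());
        }
        return factory.get();
    }
}
```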
Usage Example
import java.time.Duration;
import java.util.Properties;
// Note: per the signatures above, the producer, consumer, and topic setters are not declared public,
// so this snippet is assumed to run from code in the same package as the configuration class.
KafkaResumeStrategyConfiguration config = new KafkaResumeStrategyConfiguration();
// Producer settings: used to publish offset records to the resume topic.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProducerProperties(producerProps);
// Consumer settings: used to read offset records back during initialization.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "resume-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("auto.offset.reset", "earliest");
config.setConsumerProperties(consumerProps);
// Resume topic and initialization limits.
config.setTopic("camel-resume-topic");
config.setMaxInitializationDuration(Duration.ofSeconds(10));
config.setMaxInitializationRetries(5);
// The config instance can now be passed to KafkaResumeStrategy implementations.
Important Implementation Details
Null Checks on Setters:
The setters for `producerProperties`, `consumerProperties`, and `topic` check for null using Java `assert` statements. Because assertions are evaluated only when the JVM runs with them enabled (for example with `-ea`), these checks catch mistakes during development and testing rather than guaranteeing non-null values in production.
Validation on Retries:
The `maxInitializationRetries` setter enforces that the value must be at least 1, throwing an `IllegalArgumentException` otherwise. This prevents invalid retry configuration.
Extensibility:
This class extends a generic `ResumeStrategyConfiguration` base class, indicating that it fits into a larger framework of pluggable resume strategies. It specializes the configuration for Kafka-based strategies.
Integration Point:
The `resumeStrategyService()` method returns the constant string `"kafka-resume-strategy"`, which likely serves as a unique key or identifier used by the system to select or register this strategy configuration.
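
Java evaluates `assert` statements only when assertions are enabled, so an assert-based null check behaves differently from an always-on check. The sketch below contrasts the two styles. `NullCheckSketch` is a hypothetical class, and the `Objects.requireNonNull` variant is shown only as a comparison; nothing here indicates that the actual class uses it.

```java
import java.util.Objects;
import java.util.Properties;

/** Hypothetical class contrasting two null-check styles; not the Camel source. */
final class NullCheckSketch {
    private Properties producerProperties;
    private String topic;

    // Assert-based check: evaluated only when the JVM is started with -ea.
    // With assertions disabled (the JVM default), a null value is stored silently.
    void setProducerProperties(Properties producerProperties) {
        assert producerProperties != null : "producerProperties must not be null";
        this.producerProperties = producerProperties;
    }

    // Always-on alternative (assumption, shown for comparison only):
    // throws NullPointerException regardless of JVM flags.
    void setTopic(String topic) {
        this.topic = Objects.requireNonNull(topic, "topic must not be null");
    }

    Properties getProducerProperties() {
        return producerProperties;
    }

    String getTopic() {
        return topic;
    }
}
```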
Interaction with Other Parts of the System
KafkaResumeStrategy and Implementations:
This configuration class is designed to be used by the `KafkaResumeStrategy` class and any of its specialized implementations (e.g., `SingleNodeKafkaResumeStrategy`). These strategies use the configuration to instantiate Kafka consumer and producer clients and control initialization and retry behavior.
Kafka Producer and Consumer Clients:
The `producerProperties` and `consumerProperties` are passed directly to Kafka client instances, determining how these clients connect to Kafka brokers, serialize/deserialize keys and values, and manage offsets.
Offset Persistence Topic:
The `topic` property specifies the Kafka topic used to store offset information for resuming consumption. The strategy publishes offset updates to this topic and consumes from it.
Initialization Control:
The `maxInitializationDuration` and `maxInitializationRetries` control how long and how many times the resume strategy attempts to initialize its local offset cache by consuming from the resume topic before giving up.
ResumeStrategyConfiguration Base Class:
Being a subclass of `ResumeStrategyConfiguration`, this class fits within the broader resume strategy framework, allowing Apache Camel components to work with different offset resume strategies generically.
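
One way the two initialization limits could interact is sketched below. Everything in it is hypothetical: `BoundedInitSketch`, the `pollOnce` callback, and the order in which the limits are checked are assumptions made for illustration. The actual strategy consumes from the resume topic with a Kafka consumer and may apply the limits differently.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper showing a retry- and time-bounded initialization loop. */
final class BoundedInitSketch {

    /**
     * Calls pollOnce until it reports completion, the retry budget is used up,
     * or the overall time budget has elapsed.
     *
     * @return true if initialization completed within both limits
     */
    static boolean initialize(BooleanSupplier pollOnce, Duration maxDuration, int maxRetries) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be at least 1");
        }
        long deadline = System.nanoTime() + maxDuration.toNanos();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (pollOnce.getAsBoolean()) {
                return true; // cache loaded
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // time budget exhausted before the retry budget
            }
        }
        return false; // retry budget exhausted
    }
}
```

In this sketch the first attempt always runs, and whichever limit is reached first ends the loop.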
Visual Diagram: Class Structure of KafkaResumeStrategyConfiguration
classDiagram
class ResumeStrategyConfiguration {
<<abstract>>
+resumeStrategyService() : String
}
class KafkaResumeStrategyConfiguration {
-producerProperties: Properties
-consumerProperties: Properties
-topic: String
-maxInitializationDuration: Duration
-maxInitializationRetries: int
+getProducerProperties() : Properties
+setProducerProperties(producerProperties: Properties) : void
+getConsumerProperties() : Properties
+setConsumerProperties(consumerProperties: Properties) : void
+getTopic() : String
+setTopic(topic: String) : void
+getMaxInitializationDuration() : Duration
+setMaxInitializationDuration(maxInitializationDuration: Duration) : void
+getMaxInitializationRetries() : int
+setMaxInitializationRetries(maxInitializationRetries: int) : void
+resumeStrategyService() : String
}
KafkaResumeStrategyConfiguration --|> ResumeStrategyConfiguration
Summary
`KafkaResumeStrategyConfiguration.java` is a focused configuration class that encapsulates Kafka-specific settings to support offset persistence and recovery in Kafka resume strategies within Apache Camel. It holds the Kafka producer and consumer properties, the resume topic, and the initialization parameters that bound the offset cache startup. The resume strategies create their Kafka clients from these settings and limit initialization to the configured duration and number of retries.
This clear separation of configuration concerns promotes modularity, testability, and flexibility in how Kafka resume strategies are deployed and managed in distributed or single-node Apache Camel applications.