KafkaResumeStrategyConfigurationBuilder.java
Overview
`KafkaResumeStrategyConfigurationBuilder` is a fluent builder class designed to create and customize configuration objects for the `SingleNodeKafkaResumeStrategy` within Apache Camel’s Kafka resume strategies module. This configuration builder facilitates the setup of Kafka producer and consumer properties, resume topic, cache fill policies, initialization parameters, and other details needed to configure a Kafka-based offset resume strategy.
The builder hides the details of configuring the Kafka clients and the resume strategy parameters. It supplies defaults (byte-array serialization, a 10-second initialization limit, and 5 initialization retries) and helper methods for overriding them.
Class: KafkaResumeStrategyConfigurationBuilder
public class KafkaResumeStrategyConfigurationBuilder
extends BasicResumeStrategyConfigurationBuilder<KafkaResumeStrategyConfigurationBuilder, KafkaResumeStrategyConfiguration> {
...
}
Purpose
- To construct an instance of `KafkaResumeStrategyConfiguration`, which holds all configuration settings required by the `SingleNodeKafkaResumeStrategy`.
- To provide an easy-to-use API for configuring Kafka consumer and producer properties, topic names, retry policies, and cache fill policies.
- To establish default Kafka serialization configurations for byte-array keys and values, suitable for Kafka resume offset records.
- To ensure integration with the Camel resume strategy framework by extending `BasicResumeStrategyConfigurationBuilder`.
Fields
Field | Type | Description |
|---|---|---|
`producerProperties` | `Properties` | Kafka producer configuration properties. |
`consumerProperties` | `Properties` | Kafka consumer configuration properties. |
`topic` | `String` | The Kafka topic name used for persisting resume offset data. |
`maxInitializationDuration` | `Duration` | Maximum allowed duration for initialization of the resume strategy (default 10 seconds). |
`maxInitializationRetries` | `int` | Maximum number of retries to initialize the resume strategy (default 5). |
Constructors
KafkaResumeStrategyConfigurationBuilder()
Private default constructor to prevent uninitialized builder creation externally.
KafkaResumeStrategyConfigurationBuilder(Properties producerProperties, Properties consumerProperties)
Constructs the builder with required Kafka producer and consumer properties.
Both parameters are mandatory and validated for non-null.
Parameters
Parameter | Type | Description |
|---|---|---|
`producerProperties` | `Properties` | Kafka producer configuration. |
`consumerProperties` | `Properties` | Kafka consumer configuration. |
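
The non-null validation described for this constructor can be illustrated with a small, dependency-free sketch. This is not the Camel source: `NonNullBuilderSketch` is a hypothetical name, and the JDK's `Objects.requireNonNull` is used as a stand-in because this document does not say which helper or exception type the real constructor uses.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical stand-in for the builder's public constructor; not the Camel source.
class NonNullBuilderSketch {
    private final Properties producerProperties;
    private final Properties consumerProperties;

    NonNullBuilderSketch(Properties producerProperties, Properties consumerProperties) {
        // Fail fast, naming the missing argument in the exception message.
        this.producerProperties = Objects.requireNonNull(producerProperties, "producerProperties");
        this.consumerProperties = Objects.requireNonNull(consumerProperties, "consumerProperties");
    }

    public static void main(String[] args) {
        try {
            new NonNullBuilderSketch(null, new Properties());
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating in the constructor means a misconfigured builder fails at creation time rather than later, when `build()` is called or the Kafka clients are started.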
Methods
withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy)
Overrides the base method to additionally set Kafka consumer offset reset behavior based on cache fill policy.
- If the policy is `MINIMIZING`, the consumer offset reset config is set to `"latest"` (consume only new messages).
- Otherwise (e.g., `MAXIMIZING`), it is set to `"earliest"` (consume from the beginning).
Returns `this` for fluent chaining.
Parameters
Parameter | Type | Description |
|---|---|---|
`cacheFillPolicy` | `Cacheable.FillPolicy` | Policy to control cache fill behavior. |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withCacheFillPolicy(Cacheable.FillPolicy.MINIMIZING);
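
The policy-to-offset mapping described above can be sketched without any Kafka or Camel dependency. The snippet is a minimal illustration, not the Camel implementation: `FillPolicy` is a local stand-in for `Cacheable.FillPolicy`, and `OffsetResetSketch` and `applyFillPolicy` are hypothetical names. The only Kafka detail relied on is the consumer property key `auto.offset.reset` and its `earliest` and `latest` values.

```java
import java.util.Properties;

// Hypothetical stand-in for Camel's Cacheable.FillPolicy, for illustration only.
enum FillPolicy { MAXIMIZING, MINIMIZING }

class OffsetResetSketch {
    // Kafka consumer property that controls where reading starts
    // when the group has no committed offset.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Mirrors the documented behavior: MINIMIZING reads only new records,
    // any other policy replays the topic from the beginning.
    static Properties applyFillPolicy(Properties consumerProperties, FillPolicy policy) {
        String reset = (policy == FillPolicy.MINIMIZING) ? "latest" : "earliest";
        consumerProperties.setProperty(AUTO_OFFSET_RESET, reset);
        return consumerProperties;
    }

    public static void main(String[] args) {
        Properties props = applyFillPolicy(new Properties(), FillPolicy.MINIMIZING);
        System.out.println(props.getProperty(AUTO_OFFSET_RESET));
    }
}
```

Because the setting is written into the consumer properties, a later call that sets `auto.offset.reset` directly would replace whatever the policy chose in this sketch.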
withProducerProperty(String key, Object value)
Adds or overrides a Kafka producer property.
Parameters
Parameter | Type | Description |
|---|---|---|
`key` | `String` | Producer property key |
`value` | `Object` | Producer property value |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withProducerProperty("acks", "all");
withConsumerProperty(String key, Object value)
Adds or overrides a Kafka consumer property.
Parameters
Parameter | Type | Description |
|---|---|---|
`key` | `String` | Consumer property key |
`value` | `Object` | Consumer property value |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withConsumerProperty("max.poll.records", 500);
withGroupId(String value)
Sets the Kafka consumer group ID.
Parameters
Parameter | Type | Description |
|---|---|---|
`value` | `String` | Kafka consumer group ID |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withGroupId("resume-consumer-group");
withEnableAutoCommit(boolean value)
Enables or disables Kafka consumer auto-commit of offsets.
Parameters
Parameter | Type | Description |
|---|---|---|
`value` | `boolean` | True to enable auto commit. |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withEnableAutoCommit(false);
withBootstrapServers(String value)
Sets the Kafka bootstrap servers for both consumer and producer.
Parameters
Parameter | Type | Description |
|---|---|---|
`value` | `String` | Comma-separated Kafka bootstrap servers |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withBootstrapServers("localhost:9092");
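
The convenience setters documented so far (`withGroupId`, `withEnableAutoCommit`, `withBootstrapServers`) can be read as shortcuts for well-known Kafka property keys. The sketch below illustrates that idea under stated assumptions: `ConvenienceSettersSketch` and its accessors are hypothetical, and storing the auto-commit flag as a string is a guess, since this document does not show how the real builder stores it. The keys `group.id`, `enable.auto.commit`, and `bootstrap.servers` are standard Kafka client settings.

```java
import java.util.Properties;

// Hypothetical sketch of how convenience setters could map onto Kafka property keys.
class ConvenienceSettersSketch {
    private final Properties producerProperties = new Properties();
    private final Properties consumerProperties = new Properties();

    ConvenienceSettersSketch withGroupId(String value) {
        // The group ID only applies to the consumer side.
        consumerProperties.setProperty("group.id", value);
        return this;
    }

    ConvenienceSettersSketch withEnableAutoCommit(boolean value) {
        // Assumption: stored as a string here; the real builder may store a Boolean.
        consumerProperties.setProperty("enable.auto.commit", Boolean.toString(value));
        return this;
    }

    ConvenienceSettersSketch withBootstrapServers(String value) {
        // Both clients must reach the same cluster, so the value is written twice.
        consumerProperties.setProperty("bootstrap.servers", value);
        producerProperties.setProperty("bootstrap.servers", value);
        return this;
    }

    Properties consumerProperties() { return consumerProperties; }

    Properties producerProperties() { return producerProperties; }
}
```

A call such as `new ConvenienceSettersSketch().withBootstrapServers("localhost:9092").withGroupId("g")` leaves the producer properties with only `bootstrap.servers` set, while the consumer properties carry both keys.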
withTopic(String value)
Sets the Kafka topic used for resume offset storage.
Parameters
Parameter | Type | Description |
|---|---|---|
`value` | `String` | Resume topic name. |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withTopic("camel-resume-topic");
withMaxInitializationDuration(Duration duration)
Sets the maximum duration allowed for the initialization phase.
Parameters
Parameter | Type | Description |
|---|---|---|
`duration` | `Duration` | Duration for max initialization time. |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withMaxInitializationDuration(Duration.ofSeconds(15));
withMaxInitializationRetries(int retries)
Sets the maximum number of initialization retries.
Parameters
Parameter | Type | Description |
|---|---|---|
`retries` | `int` | Number of retries allowed. |
Returns
`KafkaResumeStrategyConfigurationBuilder`
Usage Example
builder.withMaxInitializationRetries(10);
build()
Builds and returns a `KafkaResumeStrategyConfiguration` instance populated with all configured properties.
- Calls `buildCommonConfiguration` from the superclass to apply common base settings.
- Sets Kafka consumer/producer properties, topic, max initialization duration, and retries.
Returns
Type | Description |
|---|---|
`KafkaResumeStrategyConfiguration` | Fully constructed configuration object |
Usage Example
KafkaResumeStrategyConfiguration config = builder.build();
Static Utility Methods
createConsumerProperties()
Creates default Kafka consumer properties configured to use byte array deserializers for keys and values.
Returns
`Properties` - Default consumer properties.
Usage Example
Properties consumerProps = KafkaResumeStrategyConfigurationBuilder.createConsumerProperties();
createProducerProperties()
Creates default Kafka producer properties configured to use byte array serializers for keys and values.
Returns
`Properties` - Default producer properties.
Usage Example
Properties producerProps = KafkaResumeStrategyConfigurationBuilder.createProducerProperties();
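
For readers who want to see what such defaults look like, the following sketch builds equivalent `Properties` objects with the JDK alone. It is an approximation, not the Camel source: `DefaultPropertiesSketch` is a hypothetical class, and the serializers are given as fully qualified class-name strings, which Kafka accepts, although the real methods may pass class objects or constants instead.

```java
import java.util.Properties;

// Hypothetical sketch of byte-array defaults for the resume topic clients.
class DefaultPropertiesSketch {
    static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    static Properties createConsumerProperties() {
        Properties config = new Properties();
        // Keys and values are read back as raw bytes.
        config.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        config.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return config;
    }

    static Properties createProducerProperties() {
        Properties config = new Properties();
        // Keys and values are written as raw bytes.
        config.setProperty("key.serializer", BYTE_ARRAY_SERIALIZER);
        config.setProperty("value.serializer", BYTE_ARRAY_SERIALIZER);
        return config;
    }
}
```

Each call returns a fresh `Properties` instance, so callers can add settings such as `bootstrap.servers` without affecting other builders.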
Static Factory Methods
newBuilder()
Creates a pre-configured builder instance with:
- Default byte array serializers/deserializers.
- Random UUID generated group ID.
- Auto-commit enabled.
- Cache fill policy set to `MAXIMIZING`.
- Logs the group ID at debug level.
Returns
`KafkaResumeStrategyConfigurationBuilder` - pre-configured builder.
Usage Example
KafkaResumeStrategyConfigurationBuilder builder = KafkaResumeStrategyConfigurationBuilder.newBuilder();
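
The consumer-side defaults that `newBuilder()` is described as applying can be approximated in plain Java. The sketch below is illustrative only: `FactoryDefaultsSketch` and `newConsumerDefaults` are hypothetical names, and the values are written as strings for simplicity. It relies on the documented behavior that `MAXIMIZING` maps to `auto.offset.reset=earliest`.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical sketch of the consumer defaults described for newBuilder().
class FactoryDefaultsSketch {
    static Properties newConsumerDefaults() {
        Properties consumer = new Properties();
        // A random UUID gives every builder its own consumer group.
        consumer.setProperty("group.id", UUID.randomUUID().toString());
        consumer.setProperty("enable.auto.commit", "true");
        // MAXIMIZING is documented as reading the topic from the beginning.
        consumer.setProperty("auto.offset.reset", "earliest");
        return consumer;
    }

    public static void main(String[] args) {
        Properties first = newConsumerDefaults();
        Properties second = newConsumerDefaults();
        // Two builders never share a group, so they do not share committed offsets.
        System.out.println(first.getProperty("group.id").equals(second.getProperty("group.id")));
    }
}
```

A fresh group ID matters because Kafka applies `auto.offset.reset` only when the group has no committed offset. A brand-new group therefore always starts where the cache fill policy says it should.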
newEmptyBuilder()
Creates an empty builder with:
- Empty Kafka producer and consumer properties.
- Random UUID generated group ID.
- Auto-commit enabled.
- Logs the group ID at debug level.

Use this when you want to start with a clean slate for Kafka properties.
Returns
`KafkaResumeStrategyConfigurationBuilder` - empty builder.
Usage Example
KafkaResumeStrategyConfigurationBuilder emptyBuilder = KafkaResumeStrategyConfigurationBuilder.newEmptyBuilder();
Implementation Details and Algorithms
- Cache Fill Policy Influence: The builder sets the Kafka consumer property `auto.offset.reset` according to the cache fill policy. This determines whether the consumer begins reading from the earliest or latest offset on startup, which in turn determines how the offset cache is populated.
- Default Serialization Setup: The default Kafka consumer and producer properties use `ByteArrayDeserializer` and `ByteArraySerializer` respectively. This aligns with the requirement that offset keys and values are serialized as byte arrays for Kafka communication.
- Random Group ID Generation: The factory methods `newBuilder()` and `newEmptyBuilder()` assign a unique consumer group ID from a generated UUID string, ensuring isolated consumption for resume topics.
- Fluent API Design: All `withX()` methods return `this`, enabling chaining for concise and readable configuration building.
- Integration with Base Builder: By extending `BasicResumeStrategyConfigurationBuilder`, common resume strategy configurations (like cache policies) are managed uniformly, allowing reuse and consistency across different resume strategies.
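
The class declaration passes the builder's own type as the first type argument of `BasicResumeStrategyConfigurationBuilder`. A common reason for that shape is to let inherited `withX()` methods return the concrete builder type, so chaining can continue with subclass methods. The sketch below shows the pattern in isolation. All names in it (`BaseBuilderSketch`, `TopicBuilderSketch`, `withCachePolicy`) are hypothetical, and how Camel implements its base builder internally is not shown in this document.

```java
// Hypothetical base builder: T is the concrete builder type, C the built configuration type.
abstract class BaseBuilderSketch<T extends BaseBuilderSketch<T, C>, C> {
    protected String cachePolicy = "MAXIMIZING";

    @SuppressWarnings("unchecked")
    T withCachePolicy(String policy) {
        this.cachePolicy = policy;
        // Safe as long as each subclass passes itself as T.
        return (T) this;
    }

    abstract C build();
}

// Hypothetical concrete builder that adds one setting of its own.
class TopicBuilderSketch extends BaseBuilderSketch<TopicBuilderSketch, String> {
    private String topic;

    TopicBuilderSketch withTopic(String topic) {
        this.topic = topic;
        return this;
    }

    @Override
    String build() {
        // A plain string stands in for a real configuration object.
        return topic + ":" + cachePolicy;
    }

    public static void main(String[] args) {
        // The base-class method comes first, yet withTopic() is still reachable.
        String config = new TopicBuilderSketch()
                .withCachePolicy("MINIMIZING")
                .withTopic("camel-resume-topic")
                .build();
        System.out.println(config);
    }
}
```

Without the self-referential type parameter, `withCachePolicy` would return the base type and the call to `withTopic` in the chain would not compile.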
Interaction with Other Components
- `KafkaResumeStrategyConfiguration`: The built configuration object is used to instantiate and configure the `SingleNodeKafkaResumeStrategy`, which implements the actual resume strategy logic for Kafka offset persistence.
- `SingleNodeKafkaResumeStrategy`: This builder is the primary means to prepare and customize the configuration that the strategy consumes to set up Kafka clients, topics, retry policies, and cache filling behavior.
- Apache Camel Framework: The builder is part of Apache Camel's resume strategy support, where it integrates with the resume cache, adapters, and lifecycle management.
Usage Example
KafkaResumeStrategyConfiguration config = KafkaResumeStrategyConfigurationBuilder.newBuilder()
.withBootstrapServers("localhost:9092")
.withTopic("camel-resume-topic")
.withGroupId("my-resume-group")
.withEnableAutoCommit(true)
.withCacheFillPolicy(Cacheable.FillPolicy.MINIMIZING)
.withMaxInitializationDuration(Duration.ofSeconds(15))
.withMaxInitializationRetries(3)
.build();
This example builds a `KafkaResumeStrategyConfiguration` ready to be used by a `SingleNodeKafkaResumeStrategy` instance.
Mermaid Class Diagram
classDiagram
class KafkaResumeStrategyConfigurationBuilder {
-Properties producerProperties
-Properties consumerProperties
-String topic
-Duration maxInitializationDuration
-int maxInitializationRetries
+KafkaResumeStrategyConfigurationBuilder(Properties, Properties)
+withCacheFillPolicy(Cacheable.FillPolicy): KafkaResumeStrategyConfigurationBuilder
+withProducerProperty(String, Object): KafkaResumeStrategyConfigurationBuilder
+withConsumerProperty(String, Object): KafkaResumeStrategyConfigurationBuilder
+withGroupId(String): KafkaResumeStrategyConfigurationBuilder
+withEnableAutoCommit(boolean): KafkaResumeStrategyConfigurationBuilder
+withBootstrapServers(String): KafkaResumeStrategyConfigurationBuilder
+withTopic(String): KafkaResumeStrategyConfigurationBuilder
+withMaxInitializationDuration(Duration): KafkaResumeStrategyConfigurationBuilder
+withMaxInitializationRetries(int): KafkaResumeStrategyConfigurationBuilder
+build(): KafkaResumeStrategyConfiguration
+static createConsumerProperties(): Properties
+static createProducerProperties(): Properties
+static newBuilder(): KafkaResumeStrategyConfigurationBuilder
+static newEmptyBuilder(): KafkaResumeStrategyConfigurationBuilder
}
KafkaResumeStrategyConfigurationBuilder --|> BasicResumeStrategyConfigurationBuilder
Summary
`KafkaResumeStrategyConfigurationBuilder` is a dedicated builder class that simplifies the construction of configurations for Kafka-based resume strategies in Apache Camel. It encapsulates detailed Kafka client configurations, topic settings, cache policies, and initialization parameters, providing a fluent API to produce well-formed `KafkaResumeStrategyConfiguration` instances. This class is essential in enabling the `SingleNodeKafkaResumeStrategy` to reliably maintain Kafka offset resume state with flexible and robust configuration options.