OffsetPartitionAssignmentAdapter.java
Overview
`OffsetPartitionAssignmentAdapter.java` is a Java class that implements a partition assignment strategy for Apache Kafka consumers within the Apache Camel Kafka component. Its primary purpose is to enable Kafka consumer partitions to resume consumption from their last committed offsets stored externally in a `StateRepository`. This approach facilitates reliable and consistent recovery from interruptions by ensuring that message consumption does not restart from the earliest or latest Kafka offsets but from the exact position last processed.
The class implements the `PartitionAssignmentAdapter` interface (not shown here), which supports custom logic to handle partition assignments — typically invoked when the Kafka consumer is assigned partitions, such as after a rebalance.
Detailed Documentation
Class: OffsetPartitionAssignmentAdapter
Description
A resume strategy adapter that resumes Kafka consumer partitions from offsets stored in an external state repository. It interacts with Kafka's `Consumer` and a `StateRepository` that keeps track of the last consumed offsets as string states.
Properties
Property | Type | Description |
|---|---|---|
`offsetRepository` | `StateRepository` | Repository holding the last committed offsets keyed by partition. |
`consumer` | `Consumer` | Kafka consumer instance to control partition offset position. |
Constructors
public OffsetPartitionAssignmentAdapter(StateRepository<String, String> offsetRepository)
Parameters:
offsetRepository: An instance ofStateRepositoryused to retrieve saved offsets for partitions.
Description:
Initializes the adapter with the given offset repository.
Methods
void setConsumer(Consumer<?, ?> consumer)
Parameters:
consumer: The Kafka consumer instance to assign partitions and seek offsets.
Description:
Sets the Kafka consumer used by this adapter to manipulate partition offsets.
Usage:
This method must be called before handling partition assignments to ensure the adapter has access to the consumer instance.
void handlePartitionAssignment()
Description:
Called when the consumer is assigned partitions. For each assigned partition, it retrieves the last saved offset from the
offsetRepository. If an offset exists, it seeks the consumer to the next offset (last offset + 1) to resume consumption.
Behavior:
Iterates over the currently assigned partitions (
consumer.assignment()).For each partition:
Retrieves the saved offset state using a serialized key.
If a valid offset is found, calls
resumeFromOffsetto seek the consumer.
Usage Example:
OffsetPartitionAssignmentAdapter adapter = new OffsetPartitionAssignmentAdapter(stateRepository); adapter.setConsumer(kafkaConsumer); adapter.handlePartitionAssignment();
private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState)
Parameters:
consumer: Kafka consumer instance.topicPartition: The KafkaTopicPartitionto resume.offsetState: The serialized offset string retrieved from the repository.
Description:
Deserializes the stored offset string to a long value.
Seeks the consumer to the offset immediately after the stored offset (offset + 1) to avoid duplicate processing.
Logs the action at debug level.
Implementation Detail:
Assumes the stored offset corresponds to the last successfully processed message, so it resumes from the subsequent offset.
public static String serializeOffsetKey(TopicPartition topicPartition)
Parameters:
topicPartition: KafkaTopicPartitioninstance.
Returns:
A
Stringkey used to store/retrieve offsets in theStateRepository.
Description:
Converts a
TopicPartitioninto a string key of the format"topicName/partitionNumber".
Example:
TopicPartition tp = new TopicPartition("orders", 3); String key = OffsetPartitionAssignmentAdapter.serializeOffsetKey(tp); // key = "orders/3"
public static long deserializeOffsetValue(String offset)
Parameters:
offset: The offset value stored as a string.
Returns:
Parsed
longoffset value.
Description:
Parses the offset string into a long integer.
Exception:
Throws
NumberFormatExceptionif the string cannot be parsed as a long.
Important Implementation Details
Offset Resuming Logic: The adapter resumes consumption from the offset after the last stored offset to prevent reprocessing the last consumed message.
External Offset Storage: Offsets are stored externally in a
StateRepositorykeyed by"topic/partition". This allows offset management outside Kafka's default offset storage mechanisms.Consumer Interaction: The class depends on the Kafka
ConsumerAPI’sassignment()to detect partitions andseek()to set the consumer’s position.Logging: Uses SLF4J for debug-level logs to trace offset resuming activities.
Interaction with Other System Components
Kafka Consumer: The adapter requires a
Consumerinstance to manipulate partition offsets.StateRepository: An abstraction that holds offset states as strings. This repository could be backed by persistent storage (e.g., a database, file store).
PartitionAssignmentAdapter Interface: This class implements the
PartitionAssignmentAdapterinterface, which defines the contract for partition assignment handling strategies used by the Kafka consumer component in Apache Camel.
In a typical workflow, the Kafka consumer component:
Instantiates this adapter with a configured
StateRepository.Sets the Kafka consumer instance.
Invokes
handlePartitionAssignment()after partitions are assigned (e.g., after rebalance).The adapter resumes consumption from the correct offsets.
Usage Example
// Assume offsetRepository is already implemented and initialized
StateRepository<String, String> offsetRepository = ...;
Consumer<String, String> kafkaConsumer = ...;
OffsetPartitionAssignmentAdapter adapter = new OffsetPartitionAssignmentAdapter(offsetRepository);
adapter.setConsumer(kafkaConsumer);
// After Kafka consumer rebalance/partition assignment event
adapter.handlePartitionAssignment();
This example sets up the adapter, assigns the consumer, and resumes consumption from stored offsets.
Mermaid Diagram: Class Structure
classDiagram
class OffsetPartitionAssignmentAdapter {
-StateRepository<String, String> offsetRepository
-Consumer<?, ?> consumer
+OffsetPartitionAssignmentAdapter(StateRepository<String,String>)
+void setConsumer(Consumer<?, ?>)
+void handlePartitionAssignment()
-void resumeFromOffset(Consumer<?, ?>, TopicPartition, String)
+static String serializeOffsetKey(TopicPartition)
+static long deserializeOffsetValue(String)
}
Summary
`OffsetPartitionAssignmentAdapter.java` is a utility class for Apache Camel Kafka consumers that manages resuming partition consumption from externally stored offsets. It leverages Kafka’s consumer API to seek to the correct offset position after partition assignments, ensuring reliable and consistent message processing in distributed and fault-tolerant environments. The class is tightly coupled with the `StateRepository` abstraction for offset state storage and provides static helper methods for key and offset serialization.