OffsetPartitionAssignmentAdapter.java

Overview

`OffsetPartitionAssignmentAdapter.java` is a Java class that implements a partition assignment strategy for Apache Kafka consumers within the Apache Camel Kafka component. It lets Kafka consumer partitions resume from offsets stored externally in a `StateRepository`. After an interruption, consumption restarts from the position recorded in the repository rather than from Kafka's earliest or latest offset.

The class implements the `PartitionAssignmentAdapter` interface (not shown here), which lets custom logic run when the Kafka consumer is assigned partitions, such as after a rebalance.


Detailed Documentation

Class: OffsetPartitionAssignmentAdapter

Description

An adapter that resumes Kafka consumer partitions from offsets stored in an external state repository. It works with Kafka's `Consumer` and a `StateRepository` that holds the last consumed offsets as strings.

Properties

| Property | Type | Description |
| --- | --- | --- |
| `offsetRepository` | `StateRepository<String, String>` | Repository holding the last committed offsets, keyed by partition. |
| `consumer` | `Consumer<?, ?>` | Kafka consumer instance whose partition positions the adapter sets. |

Constructors

public OffsetPartitionAssignmentAdapter(StateRepository<String, String> offsetRepository)

Methods

public void setConsumer(Consumer<?, ?> consumer)

public void handlePartitionAssignment()

private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState)

public static String serializeOffsetKey(TopicPartition topicPartition)

public static long deserializeOffsetValue(String offset)
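
The two static helpers convert between a partition and the strings kept in the repository. The sketch below shows one way such helpers could look. It is an illustration under stated assumptions, not the class's actual source: `Partition` is a hypothetical stand-in for Kafka's `TopicPartition`, and the `topic/partition` key layout is assumed because this document does not specify the format.

```java
final class OffsetKeySketch {
    /** Minimal stand-in for Kafka's TopicPartition (illustration only). */
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Builds the repository key; the "topic/partition" layout is an assumption. */
    static String serializeOffsetKey(Partition p) {
        return p.topic + "/" + p.partition;
    }

    /** Parses a stored offset string; throws NumberFormatException on malformed state. */
    static long deserializeOffsetValue(String offset) {
        return Long.parseLong(offset);
    }

    public static void main(String[] args) {
        Partition p = new Partition("orders", 3);
        System.out.println(serializeOffsetKey(p));          // prints "orders/3"
        System.out.println(deserializeOffsetValue("1042")); // prints "1042"
    }
}
```

Keeping both the key and the value as strings matches the `StateRepository<String, String>` type used by the constructor, so any repository implementation can store them without knowing about Kafka types.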

Important Implementation Details


Interaction with Other System Components

In a typical workflow:

  1. The Kafka consumer component instantiates this adapter with a configured `StateRepository`.

  2. The component sets the Kafka consumer instance through `setConsumer`.

  3. The component invokes `handlePartitionAssignment()` after partitions are assigned (e.g., after a rebalance).

  4. The adapter resumes consumption from the offsets held in the repository.
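
The four steps above can be sketched without a Kafka dependency, as a rough model rather than the real implementation. Everything in the block is hypothetical: `SeekableConsumer` reduces the consumer to the two operations the resume step needs, a plain `Map` stands in for the `StateRepository`, and partitions are represented by their string keys. The `+ 1` assumes the repository holds the offset of the last processed record; if it holds the next offset to read, the increment should be dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ResumeWorkflowSketch {
    /** Hypothetical, reduced view of the consumer: only what the resume step needs. */
    interface SeekableConsumer {
        List<String> assignedKeys();                  // assigned partitions as repository keys
        void seek(String partitionKey, long offset);  // move the read position
    }

    /** Stand-in for the adapter; the repository is modeled as a plain Map. */
    static final class Adapter {
        private final Map<String, String> offsetRepository;
        private SeekableConsumer consumer;

        Adapter(Map<String, String> offsetRepository) {
            this.offsetRepository = offsetRepository;
        }

        void setConsumer(SeekableConsumer consumer) {
            this.consumer = consumer;
        }

        void handlePartitionAssignment() {
            for (String key : consumer.assignedKeys()) {
                String state = offsetRepository.get(key);
                // No stored state: leave the position to the consumer's defaults.
                if (state == null || state.isEmpty()) {
                    continue;
                }
                // Assumption: state is the last processed offset, so resume at the next one.
                consumer.seek(key, Long.parseLong(state) + 1);
            }
        }
    }

    /** Fake consumer that records seek calls so the outcome can be inspected. */
    static final class RecordingConsumer implements SeekableConsumer {
        final List<String> assigned = new ArrayList<>();
        final Map<String, Long> seeks = new LinkedHashMap<>();

        @Override
        public List<String> assignedKeys() {
            return assigned;
        }

        @Override
        public void seek(String partitionKey, long offset) {
            seeks.put(partitionKey, offset);
        }
    }

    public static void main(String[] args) {
        Map<String, String> repository = new HashMap<>();
        repository.put("orders/0", "41");               // state exists for partition 0 only

        RecordingConsumer consumer = new RecordingConsumer();
        consumer.assigned.add("orders/0");
        consumer.assigned.add("orders/1");

        Adapter adapter = new Adapter(repository);      // step 1
        adapter.setConsumer(consumer);                  // step 2
        adapter.handlePartitionAssignment();            // steps 3 and 4

        System.out.println(consumer.seeks);             // prints "{orders/0=42}"
    }
}
```

In this model a partition without stored state is skipped, so its starting position is left to whatever the consumer would otherwise use.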


Usage Example

// Assume offsetRepository is already implemented and initialized
StateRepository<String, String> offsetRepository = ...;
Consumer<String, String> kafkaConsumer = ...;

OffsetPartitionAssignmentAdapter adapter = new OffsetPartitionAssignmentAdapter(offsetRepository);
adapter.setConsumer(kafkaConsumer);

// After Kafka consumer rebalance/partition assignment event
adapter.handlePartitionAssignment();

This example creates the adapter, hands it the consumer, and, once partitions have been assigned, seeks each partition to its stored offset.


Mermaid Diagram: Class Structure

classDiagram
    class OffsetPartitionAssignmentAdapter {
        -StateRepository<String, String> offsetRepository
        -Consumer<?, ?> consumer
        +OffsetPartitionAssignmentAdapter(StateRepository<String,String>)
        +void setConsumer(Consumer<?, ?>)
        +void handlePartitionAssignment()
        -void resumeFromOffset(Consumer<?, ?>, TopicPartition, String)
        +static String serializeOffsetKey(TopicPartition)
        +static long deserializeOffsetValue(String)
    }

Summary

`OffsetPartitionAssignmentAdapter.java` is a utility class for Apache Camel Kafka consumers that resumes partition consumption from externally stored offsets. It uses Kafka's consumer API to seek to the stored offset after partition assignment, so that processing continues from a known position after a restart or rebalance. The class depends on the `StateRepository` abstraction for offset storage and provides static helper methods for key serialization and offset deserialization.