SeekPolicyPartitionAssignmentAdapter.java
Overview
The `SeekPolicyPartitionAssignmentAdapter` class provides a partition assignment strategy for Kafka consumers within the Apache Camel Kafka component. This adapter implements a resume strategy based on Camel's `SeekPolicy` configuration, allowing a Kafka consumer to seek to either the beginning or the end of assigned partitions upon partition assignment.
In Kafka consumer processing, when partitions are assigned to a consumer (such as during a rebalance), the consumer can control where to start consuming messages. This class encapsulates the logic for seeking based on a configured policy (`BEGINNING` or `END`), enabling flexible and controlled resumption of consumption.
Class: SeekPolicyPartitionAssignmentAdapter
Package
`org.apache.camel.component.kafka.consumer.support.classic`
Implements
`PartitionAssignmentAdapter`
Purpose
Handles Kafka partition assignment events by seeking the consumer's position according to a `SeekPolicy`. This allows starting consumption from the beginning or end of the assigned partitions, based on the configured policy.
Properties
| Property | Type | Description |
|---|---|---|
| `seekPolicy` | `SeekPolicy` | The configured seek policy (`BEGINNING` or `END`). |
| `consumer` | `Consumer<?, ?>` | The Kafka consumer instance to control. |
| `LOG` | `Logger` | Logger instance for debug messages. |
Constructor
`public SeekPolicyPartitionAssignmentAdapter(SeekPolicy seekPolicy)`
Parameters:
`seekPolicy` - The `SeekPolicy` to apply when partitions are assigned. Determines whether to seek to the beginning or end of partitions.
Description:
Initializes the adapter with the specified seek policy.
Methods
`void setConsumer(Consumer<?, ?> consumer)`
Parameters:
`consumer` - The Kafka consumer instance to be controlled by this adapter.
Description:
Assigns the consumer instance to this adapter, enabling it to perform seek operations on partition assignments.
Usage Example:
SeekPolicyPartitionAssignmentAdapter adapter = new SeekPolicyPartitionAssignmentAdapter(SeekPolicy.BEGINNING);
adapter.setConsumer(myKafkaConsumer);
`void handlePartitionAssignment()`
Description:
Invoked when partitions are assigned to the consumer. This method applies the configured seek policy to reposition the consumer's offset in each assigned partition.
Behavior:
- If the `seekPolicy` is `BEGINNING`, seeks to the earliest offset of all assigned partitions.
- If the `seekPolicy` is `END`, seeks to the latest offset of all assigned partitions.
- If the `seekPolicy` is neither, no action is taken.
Implementation Detail:
- Uses the Kafka Consumer API methods `consumer.seekToBeginning(Collection<TopicPartition>)` and `consumer.seekToEnd(Collection<TopicPartition>)`.
- Obtains the assigned partitions via `consumer.assignment()`, which returns a `Set<TopicPartition>`.
Logging:
Logs debug messages indicating which seek operation is performed.
Usage Example:
adapter.handlePartitionAssignment();
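The dispatch described for `handlePartitionAssignment()` can be sketched without a broker. This is a minimal illustration only: `Policy`, `SeekableConsumer`, and `RecordingConsumer` are hypothetical stand-ins invented for this example (partitions are plain strings), not the real Camel `SeekPolicy` or Kafka `Consumer` types, and the real class may differ in detail.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Stand-in sketch: none of these types are the real Kafka or Camel classes.
class SeekPolicySketch {

    // Hypothetical mirror of the two policies named in the text.
    enum Policy { BEGINNING, END }

    // Hypothetical slice of a consumer: only the three calls the text mentions.
    interface SeekableConsumer {
        Set<String> assignment();
        void seekToBeginning(Collection<String> partitions);
        void seekToEnd(Collection<String> partitions);
    }

    // Fake consumer that records each seek call instead of contacting a broker.
    static final class RecordingConsumer implements SeekableConsumer {
        final Set<String> assigned;
        final List<String> calls = new ArrayList<>();

        RecordingConsumer(Set<String> assigned) {
            this.assigned = assigned;
        }

        @Override
        public Set<String> assignment() {
            return assigned;
        }

        @Override
        public void seekToBeginning(Collection<String> partitions) {
            // TreeSet gives a stable, sorted rendering of the partition names.
            calls.add("seekToBeginning" + new TreeSet<>(partitions));
        }

        @Override
        public void seekToEnd(Collection<String> partitions) {
            calls.add("seekToEnd" + new TreeSet<>(partitions));
        }
    }

    // Dispatch on the policy; any other value (including null) is a no-op.
    static void handlePartitionAssignment(Policy policy, SeekableConsumer consumer) {
        if (policy == Policy.BEGINNING) {
            consumer.seekToBeginning(consumer.assignment());
        } else if (policy == Policy.END) {
            consumer.seekToEnd(consumer.assignment());
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer(Set.of("orders-0", "orders-1"));
        handlePartitionAssignment(Policy.BEGINNING, consumer);
        handlePartitionAssignment(null, consumer); // no policy: nothing is recorded
        System.out.println(consumer.calls);
    }
}
```

Recording the calls in a fake consumer keeps the example runnable with only the JDK; against a real consumer the same branches would call the Kafka seek methods directly.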
Important Implementation Details
- The adapter depends on the Kafka consumer's assignment state (`consumer.assignment()`) to identify the partitions currently assigned.
- It leverages Kafka's built-in seek methods to reposition the consumer offset.
- Only two seek policies are handled: `BEGINNING` and `END`. Any other `SeekPolicy` value results in no seek operation.
- SLF4J logging (`LOG.debug`) provides visibility into the seek operations at runtime, which is useful for troubleshooting and auditing consumer behavior.
Interaction with Other System Components
- Kafka Consumer (`org.apache.kafka.clients.consumer.Consumer`): This class directly manipulates the Kafka consumer instance to control offset positioning.
- Apache Camel Kafka Component: The adapter is part of the Camel Kafka consumer infrastructure. It plugs into the consumer lifecycle to handle partition assignment events using the configured `SeekPolicy`.
- SeekPolicy Enum (`org.apache.camel.component.kafka.SeekPolicy`): Defines the possible seek strategies (`BEGINNING`, `END`, etc.) that influence this adapter's behavior.
- PartitionAssignmentAdapter Interface: `SeekPolicyPartitionAssignmentAdapter` implements this interface, enabling polymorphism and integration into a larger partition assignment handling framework within Camel Kafka.
Usage Scenario Example
Suppose you have a Kafka consumer integrated with Apache Camel, and you want to ensure that whenever partitions are assigned (e.g., after a rebalance), the consumer starts reading messages from the beginning of the partitions:
SeekPolicyPartitionAssignmentAdapter adapter = new SeekPolicyPartitionAssignmentAdapter(SeekPolicy.BEGINNING);
adapter.setConsumer(kafkaConsumer);
adapter.handlePartitionAssignment();
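This repositions the consumer at the earliest available offset of each assigned partition, so the next poll starts reading from there. Kafka evaluates the seek lazily: the position actually changes on the next call to `poll` or `position`.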
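In the scenario above the adapter is called by hand. In a running consumer the call would more likely come from an assignment callback. The sketch below shows that wiring under stated assumptions: `AssignmentAdapter` and `AssignmentListener` are hypothetical names invented for this example, not Camel or Kafka types, and the sketch assumes the adapter is invoked once for every assignment event.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch: hypothetical types, not the real Camel or Kafka API.
class RebalanceSketch {

    // Hypothetical counterpart of the adapter contract described in this page.
    interface AssignmentAdapter {
        void handlePartitionAssignment();
    }

    // Hypothetical listener that a consumer loop would register for rebalances.
    static final class AssignmentListener {
        private final AssignmentAdapter adapter;

        AssignmentListener(AssignmentAdapter adapter) {
            this.adapter = adapter;
        }

        // Called once per assignment event with the newly assigned partitions.
        void onPartitionsAssigned(List<String> partitions) {
            adapter.handlePartitionAssignment();
        }
    }

    // Runs two simulated assignment events and returns what the adapter did.
    static List<String> simulate() {
        List<String> log = new ArrayList<>();
        // Stand-in for an adapter configured with the BEGINNING policy.
        AssignmentAdapter beginning = () -> log.add("seek to beginning");
        AssignmentListener listener = new AssignmentListener(beginning);
        listener.onPartitionsAssigned(List.of("orders-0"));
        listener.onPartitionsAssigned(List.of("orders-0", "orders-1"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Under that assumption the seek is applied on each assignment event, not only at startup, which is worth keeping in mind when choosing `BEGINNING` for a consumer that rebalances often.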
Mermaid Class Diagram
classDiagram
class SeekPolicyPartitionAssignmentAdapter {
-seekPolicy: SeekPolicy
-consumer: Consumer<?, ?>
+SeekPolicyPartitionAssignmentAdapter(seekPolicy: SeekPolicy)
+setConsumer(consumer: Consumer<?, ?>): void
+handlePartitionAssignment(): void
}
SeekPolicyPartitionAssignmentAdapter ..|> PartitionAssignmentAdapter
class SeekPolicy {
<<enumeration>>
+BEGINNING
+END
}
SeekPolicyPartitionAssignmentAdapter --> "1" SeekPolicy
SeekPolicyPartitionAssignmentAdapter --> "1" Consumer
Summary
`SeekPolicyPartitionAssignmentAdapter.java` provides a focused implementation of a partition assignment strategy based on Camel's `SeekPolicy`. By repositioning the Kafka consumer at partition assignment time, it lets consumption resume from either the beginning or the end of each assigned partition, depending on configuration.