KafkaConsumerRebalanceIT.java
Overview
`KafkaConsumerRebalanceIT.java` is an integration test class within the Apache Camel Kafka component module. It specifically tests the behavior of Kafka consumer offset management during partition rebalancing events. The test verifies that the custom offset state repository's `getState` method is invoked as expected when Kafka partitions are assigned to the consumer.
The test checks that Kafka consumer offset handling through a custom `StateRepository` implementation integrates correctly with Camel's Kafka component, especially during consumer group rebalancing. This supports reliable message consumption and offset tracking in distributed environments.
Classes and Components
KafkaConsumerRebalanceIT
This is the main test class extending `BaseKafkaTestSupport` (not shown here) that presumably provides common Kafka test setup utilities.
Fields
`private static final String TOPIC`: The Kafka topic used in this test, `"offset-rebalance"`.
`private final CountDownLatch messagesLatch`: Synchronization aid initialized with a count of 1. Used to wait for the `getState` method to be called.
`@BindToRegistry("offset") private final OffsetStateRepository offsetStateRepository`: A custom implementation of `StateRepository` bound to the Camel registry under the name `"offset"`. This repository is injected into the Kafka consumer route to manage offset state.
Methods
`public void offsetGetStateMustHaveBeenCalledTwice() throws Exception`
Purpose: Test method asserting that the `getState` method of the offset repository is called during consumer partition assignment. Despite the "Twice" in its name, the latch starts at 1, so a single matching `getState` call satisfies the test.
Behavior: Waits up to 30 seconds for the latch to count down, indicating `getState` was called.
Assertion: Uses JUnit `assertTrue` to verify the latch reached zero.
Usage Example:

    @Test
    public void offsetGetStateMustHaveBeenCalledTwice() throws Exception {
        // await returns true only if the latch reached zero within 30 seconds
        boolean offsetGetStateCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS);
        assertTrue(offsetGetStateCalled,
                "StateRepository.getState should have been called for topic " + TOPIC);
    }

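The wait-and-assert handshake used by this test can be reproduced with the JDK alone. The sketch below is not the test itself: the class name `LatchHandshakeSketch`, its helper methods, and the background thread standing in for the consumer's partition-assignment callback are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchHandshakeSketch {

    // Waits on the latch; returns false on timeout or interruption.
    private static boolean await(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A background thread (hypothetical stand-in for the code path that
    // would call getState) counts the latch down, so the wait succeeds.
    public static boolean awaitWithCallback(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        return await(latch, timeoutMillis);
    }

    // Nothing counts the latch down, so the wait times out.
    public static boolean awaitWithoutCallback(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        return await(latch, timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("with callback: " + awaitWithCallback(30_000));
        System.out.println("without callback: " + awaitWithoutCallback(100));
    }
}
```

The second helper shows the failure mode the assertion guards against: if `getState` is never invoked for the topic, `await` returns `false` and `assertTrue` fails.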
`public void after()`
Purpose: Cleanup method executed after each test.
Behavior: Deletes the test Kafka topic to clean state.
Interaction: Calls `kafkaAdminClient.deleteTopics()` with the test topic.
`protected RouteBuilder createRouteBuilder()`
Purpose: Defines a Camel route for the test.
Behavior: Creates a route consuming from the Kafka topic with specific consumer options:
`groupId`: group ID derived from the topic name (`TOPIC + "_GROUP"`).
`autoCommitIntervalMs`: 1000 ms.
`autoOffsetReset`: `"latest"`.
`consumersCount`: 1.
`offsetRepository`: references the custom offset repository bean (`#offset`).
The route forwards messages to a mock endpoint for verification.
Usage Example:

    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // Consume from the test topic; offset lookup is delegated to the "offset" registry bean
                from("kafka:" + TOPIC
                     + "?groupId=" + TOPIC + "_GROUP"
                     + "&autoCommitIntervalMs=1000"
                     + "&autoOffsetReset=latest"
                     + "&consumersCount=1"
                     + "&offsetRepository=#offset")
                        .routeId("consumer-rebalance-route")
                        .to(KafkaTestUtil.MOCK_RESULT);
            }
        };
    }

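To make the concatenated endpoint URI easier to read, the following JDK-only sketch assembles the same option list and prints the resulting string. The class `KafkaUriSketch` and its `buildUri` helper are hypothetical and exist only for illustration; the test itself builds the URI by plain string concatenation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaUriSketch {

    static final String TOPIC = "offset-rebalance";

    // Joins the options from the route into a single endpoint URI string.
    public static String buildUri() {
        // LinkedHashMap keeps insertion order, matching the order in the route.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", TOPIC + "_GROUP");
        options.put("autoCommitIntervalMs", "1000");
        options.put("autoOffsetReset", "latest");
        options.put("consumersCount", "1");
        options.put("offsetRepository", "#offset");

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + TOPIC + "?" + query;
    }

    public static void main(String[] args) {
        // prints kafka:offset-rebalance?groupId=offset-rebalance_GROUP&autoCommitIntervalMs=1000&autoOffsetReset=latest&consumersCount=1&offsetRepository=#offset
        System.out.println(buildUri());
    }
}
```

The `#offset` value is a registry reference, which is why the field is bound with `@BindToRegistry("offset")`.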
OffsetStateRepository
A static nested class implementing `StateRepository`, which provides custom offset state management for the Kafka consumer.
Fields
`private static final Logger LOG`: Logger instance for debug and trace logs.
`final CountDownLatch messagesLatch`: Reference to the latch used to signal when `getState` is called for the relevant topic.
Constructor
`public OffsetStateRepository(CountDownLatch messagesLatch)`: Initializes the repository with the latch instance for synchronization.
Methods
`public void start()`: Lifecycle method for repository startup. Empty implementation here.
`public void stop()`: Lifecycle method for repository shutdown. Empty implementation here.
`public String getState(String key)`
Purpose: Retrieve the stored offset state for a given key (topic partition).
Behavior: Logs the retrieval attempt. If the key contains the test topic name, counts down the latch. Returns the fixed string `"-1"` as a placeholder, since this test repository stores no offsets.
Parameters:
`String key`: the key representing a Kafka topic partition.
Returns:
`String` offset value (always `"-1"` here).
Usage Example:

    @Override
    public String getState(String key) {
        LOG.debug("Getting the state for {} from topic {}", key, TOPIC);
        // Only lookups for the test topic release the waiting test thread
        if (key.contains(TOPIC)) {
            LOG.trace("Topic matches, counting down");
            messagesLatch.countDown();
        }
        return "-1";
    }

`public void setState(String key, String value)`
Purpose: Store the offset state for a given key.
Behavior: No-op in this test implementation.
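The repository's behavior can be exercised in isolation, without Camel or a broker. The sketch below is a minimal stand-in under stated assumptions: `SimpleStateRepository` is a hypothetical two-method interface used in place of Camel's `StateRepository`, and the key format `"offset-rebalance/0"` (topic, slash, partition) is assumed for illustration rather than taken from the source.

```java
import java.util.concurrent.CountDownLatch;

public class OffsetRepositorySketch {

    static final String TOPIC = "offset-rebalance";

    // Hypothetical stand-in for Camel's StateRepository; not the real interface.
    interface SimpleStateRepository {
        String getState(String key);
        void setState(String key, String value);
    }

    // Mirrors the test repository: signals the latch on a matching key, stores nothing.
    static class LatchingRepository implements SimpleStateRepository {
        private final CountDownLatch latch;

        LatchingRepository(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public String getState(String key) {
            if (key.contains(TOPIC)) {
                latch.countDown();
            }
            return "-1";
        }

        @Override
        public void setState(String key, String value) {
            // Intentionally a no-op, as in the test implementation.
        }
    }

    // Returns the latch count left after a single lookup with the given key.
    public static long remainingAfterLookup(String key) {
        CountDownLatch latch = new CountDownLatch(1);
        new LatchingRepository(latch).getState(key);
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterLookup("offset-rebalance/0")); // prints 0
        System.out.println(remainingAfterLookup("other-topic/0"));      // prints 1
    }
}
```

Because the match uses `contains`, any key that embeds the topic name releases the latch; keys for unrelated topics leave the count unchanged.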
Important Implementation Details
CountDownLatch Usage:
The latch synchronizes the test with the offset repository's `getState` method, which is typically invoked during Kafka consumer partition assignment (rebalance). Awaiting the latch validates that offset retrieval logic is actually triggered.
Custom Offset Repository:
The `OffsetStateRepository` implements the `StateRepository` interface expected by the Camel Kafka component for offset storage. This test repository simulates offset retrieval without actual storage: it always returns `"-1"`. The test asserts only that the lookup happens, not where the consumer subsequently starts reading.
Route Configuration:
The test route uses the custom offset repository via the `offsetRepository=#offset` parameter, which resolves the repository from the Camel registry. This ensures the Kafka consumer uses the test repository during consumption.
Topic Cleanup:
After each test, the Kafka topic is deleted to maintain a clean environment for subsequent tests, preventing state leakage.
Interactions with Other System Components
Apache Camel Kafka Component:
The test integrates directly with the Camel Kafka consumer component by configuring a route consuming from Kafka with specific offset management settings.
Kafka Broker and Admin Client:
The test depends on a Kafka cluster (likely a test container or embedded Kafka) and uses Kafka's admin client to delete topics after tests.
BaseKafkaTestSupport:
This base class (not shown) likely provides Kafka cluster lifecycle management, test utilities, and Camel context setup.
KafkaTestUtil:
Provides a mock endpoint for assertions (`KafkaTestUtil.MOCK_RESULT`), aiding in message flow verification.
JUnit 5 Testing Framework:
Utilizes JUnit 5 annotations (`@Test`, `@AfterEach`) and assertions for test execution and validation.
Diagram: Class Structure and Relationships
classDiagram
class KafkaConsumerRebalanceIT {
-static final String TOPIC
-CountDownLatch messagesLatch
-OffsetStateRepository offsetStateRepository
+void offsetGetStateMustHaveBeenCalledTwice()
+void after()
+RouteBuilder createRouteBuilder()
}
class OffsetStateRepository {
-static final Logger LOG
-CountDownLatch messagesLatch
+OffsetStateRepository(CountDownLatch)
+void start()
+void stop()
+String getState(String)
+void setState(String, String)
}
KafkaConsumerRebalanceIT o-- OffsetStateRepository : contains
Summary
`KafkaConsumerRebalanceIT.java` is a focused integration test validating Kafka consumer offset state retrieval during partition rebalances within the Apache Camel Kafka component. It uses a custom `StateRepository` with synchronization primitives to assert that offset retrieval is triggered correctly. The test route is configured to use this repository, and cleanup ensures isolated test runs. This file ensures robustness in offset management integration, a key factor for reliable Kafka consumer operations in distributed, fault-tolerant systems.