KafkaConsumerAsyncWithOffsetRepoCommitIT.java
Overview
`KafkaConsumerAsyncWithOffsetRepoCommitIT.java` is an integration test class designed to validate the behavior of the Apache Camel Kafka component when performing **asynchronous manual offset commits** using an external offset repository. Specifically, it tests that the offset repository is properly updated when Kafka consumer commits offsets asynchronously via the `DefaultKafkaManualAsyncCommitFactory`.
This class extends `BaseManualCommitTestSupport` (presumably providing common test utilities and Kafka setup) and focuses on verifying manual commit semantics with offset state management using an in-memory offset repository (`MemoryStateRepository`).
Classes and Methods
KafkaConsumerAsyncWithOffsetRepoCommitIT
This is the sole public class in the file. It contains the integration test logic.
Visibility | Type | Name | Description |
|---|---|---|---|
public | class | KafkaConsumerAsyncWithOffsetRepoCommitIT | Integration test for async manual commit with offset repository |
Fields
Visibility | Type | Name | Description |
|---|---|---|---|
private static final | `MemoryStateRepository` | `stateRepository` | In-memory offset state repository bean bound to registry |
Annotated with
@BindToRegistry("stateRepository")so that it can be referenced in the Kafka consumer URI.
Methods
after()
@AfterEach
public void after()
Purpose: Cleans up Kafka topic data after each test run.
Parameters: None
Returns: void
Usage: Invoked automatically by JUnit after each test method.
createRouteBuilder()
@Override
protected RouteBuilder createRouteBuilder()
Purpose: Defines and configures the Camel routes used in the test.
Parameters: None
Returns:
RouteBuilderinstance configuring two routes consuming from the same Kafka topic.Implementation details:
Two routes are created from the same Kafka topic
testAsyncCommitWithOffsetRepoTest.Kafka consumer URI parameters:
groupId=KafkaConsumerAsyncCommitIT- consumer group.pollTimeoutMs=1000- poll timeout in milliseconds.autoCommitEnable=false- disables Kafka's auto commit.offsetRepository=#bean:stateRepository- uses the bound in-memory offset repository.allowManualCommit=true- enables manual commit.autoOffsetReset=earliest- start consuming from earliest offset if none committed.kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory- uses async manual commit factory.
Route "foo":
Consumes messages from Kafka.
Sends messages to a mock endpoint (
KafkaTestUtil.MOCK_RESULT).Processes each message by retrieving the
KafkaManualCommitheader and callingcommit()to asynchronously commit offsets.Includes an assertion to ensure
KafkaManualCommitis not null.
Route "bar":
Consumes from the same Kafka topic, but with
autoStartup(false), so it does not start automatically.Sends messages to a different mock endpoint (
KafkaTestUtil.MOCK_RESULT_BAR).
kafkaManualCommitWithOffsetRepo()
@DisplayName("Tests that the offset repository gets updated when using in conjunction with the Async commit manager")
@Test
public void kafkaManualCommitWithOffsetRepo() throws Exception
Purpose: Main test method verifying that asynchronous commits update the offset repository as expected.
Parameters: None
Returns: void
Throws: Exception if test fails or encounters errors.
Usage: Standard JUnit test method.
Details:
Calls
kafkaManualCommitTestWithStateRepository(defined in the superclass) passing the topic and the state repository instance.This method presumably sends messages to Kafka, consumes them via the configured routes, performs manual commits, and verifies the state repository is correctly updated.
Important Implementation Details
Async Manual Commit Factory:
The test configures the Kafka consumer to useDefaultKafkaManualAsyncCommitFactoryfor manual commits. This factory enables committing offsets asynchronously, improving consumer throughput and avoiding blocking calls.Offset Repository Integration:
TheMemoryStateRepositoryinstance is used as an external offset repository bound to the Camel registry. This repository stores offset states outside Kafka's internal offset storage, allowing for custom offset management strategies.Manual Commit Handling:
The route processor explicitly retrieves theKafkaManualCommitobject from message headers and invokescommit()on it to trigger manual offset commits asynchronously.Test Isolation:
The@AfterEachmethod ensures that Kafka topic data is cleaned up after each test run, preventing interference between tests and allowing for consistent test results.
Interaction with Other Components
BaseManualCommitTestSupport: The superclass providing shared Kafka test utilities and setup, including methods likecleanupKafka()andkafkaManualCommitTestWithStateRepository().MemoryStateRepository: A simple in-memory state repository implementation from Apache Camel used here to persist offset states.Apache Camel Kafka Component: The core component under test, which integrates Kafka consumers with Camel routes, handling offset commits and message processing.
KafkaTestUtil: Provides mock endpoint URIs (likeMOCK_RESULTandMOCK_RESULT_BAR) used for testing message receipt.DefaultKafkaManualAsyncCommitFactory: Provides asynchronous manual offset commit capability, improving performance and responsiveness.
Usage Example
This file is an integration test and not intended for direct use in production code. However, it illustrates how to configure a Kafka consumer route with:
Manual asynchronous offset commits.
External offset state repository integration.
Custom consumer group and offset reset behavior.
Example Camel route snippet from `createRouteBuilder()`:
from("kafka:testAsyncCommitWithOffsetRepoTest?groupId=KafkaConsumerAsyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
+ "&offsetRepository=#bean:stateRepository&allowManualCommit=true&autoOffsetReset=earliest"
+ "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
.routeId("foo")
.to("mock:result")
.process(exchange -> {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commit();
}
});
Mermaid Class Diagram
classDiagram
class KafkaConsumerAsyncWithOffsetRepoCommitIT {
+static final String TOPIC
-static MemoryStateRepository stateRepository
+after()
+createRouteBuilder() RouteBuilder
+kafkaManualCommitWithOffsetRepo() void
}
KafkaConsumerAsyncWithOffsetRepoCommitIT --|> BaseManualCommitTestSupport
class MemoryStateRepository {
+get(String key)
+set(String key, Object value)
+clear()
}
class RouteBuilder {
+configure()
}
KafkaConsumerAsyncWithOffsetRepoCommitIT ..> MemoryStateRepository : uses
KafkaConsumerAsyncWithOffsetRepoCommitIT ..> RouteBuilder : returns
Summary
`KafkaConsumerAsyncWithOffsetRepoCommitIT.java` is a focused integration test validating the Apache Camel Kafka component's capability to perform asynchronous manual offset commits while persisting offset states externally via a state repository. It demonstrates configuring Kafka consumer routes with manual commit enabled, integrating an in-memory offset repository, and verifying that commits update this state correctly. The test ensures reliable offset management and message processing in scenarios requiring precise control over offset commits beyond Kafka's auto-commit mechanism.