KafkaConsumerSyncWithOffsetRepoCommitIT.java
Overview
`KafkaConsumerSyncWithOffsetRepoCommitIT.java` is an integration test class designed to validate the behavior of the Apache Camel Kafka component when manual synchronous commits are used in conjunction with an offset repository. Specifically, it ensures that Kafka consumer offsets are correctly committed and stored using a state repository (`MemoryStateRepository`). This test verifies that the offset repository is properly updated when Kafka manual commit operations are performed synchronously.
The class extends a base test support class `BaseManualCommitTestSupport` and configures two Kafka consumer routes that consume messages from a Kafka topic with manual offset commit enabled but auto-commit disabled. The test verifies that manual commits are acknowledged and that the offset state is persisted in the repository.
Class: KafkaConsumerSyncWithOffsetRepoCommitIT
Description
Integration test class for testing Kafka consumer manual commits with synchronous commit manager and offset repository.
Inheritance
Extends:
BaseManualCommitTestSupport
Package
org.apache.camel.component.kafka.integration.commit
Fields
Field Name | Type | Description |
|---|---|---|
`TOPIC` | `String` (static final) | Kafka topic name used in the test. |
`stateRepository` | `MemoryStateRepository` (static final) | In-memory offset repository instance, bound to Camel registry with name "stateRepository". |
Annotations
@BindToRegistry("stateRepository")— Registers thestateRepositorybean in Camel's registry.@AfterEach— JUnit lifecycle method to clean up Kafka topic after each test.@DisplayName— Provides a human-readable name for the test.@Test— Marks the test method to be run as part of the test suite.
Methods
after()
@AfterEach
public void after()
Description: Cleans up the Kafka topic after each test run to ensure a fresh environment.
Parameters: None
Return: void
Usage: Automatically called by JUnit after each test method execution.
createRouteBuilder()
@Override
protected RouteBuilder createRouteBuilder()
Description: Creates and returns a
RouteBuilderconfiguring two Kafka consumer routes.Parameters: None
Return:
RouteBuilderinstance with Kafka consumer routes configured.Implementation Details:
Configures two routes consuming from the same Kafka topic with the following options:
groupId=KafkaConsumerSyncCommitITpollTimeoutMs=1000autoCommitEnable=false— disables automatic offset commits.offsetRepository=#bean:stateRepository— uses the registeredMemoryStateRepositoryinstance.allowManualCommit=true— enables manual offset commit in the route.autoOffsetReset=earliest— starts consuming from the earliest offset if no committed offset exists.kafkaManualCommitFactoryset toDefaultKafkaManualCommitFactory— uses the default factory for manual commits.
The first route (`routeId = "foo"`) sends consumed messages to `KafkaTestUtil.MOCK_RESULT` and processes each exchange by manually committing the offset synchronously using the `KafkaManualCommit` object retrieved from the message header.
The second route (`routeId = "bar"`) consumes from the same topic but is set to not auto-start and sends messages to `KafkaTestUtil.MOCK_RESULT_BAR`.
Usage Example:
RouteBuilder routeBuilder = kafkaConsumerSyncWithOffsetRepoCommitIT.createRouteBuilder();
context.addRoutes(routeBuilder);
kafkaManualCommitWithOffsetRepo()
@DisplayName("Tests that the offset repository gets updated when using in conjunction with the Sync commit manager")
@Test
public void kafkaManualCommitWithOffsetRepo() throws Exception
Description: Test method that verifies the Kafka manual commit mechanism works correctly with the offset repository.
Parameters: None
Return: void
Behavior: Calls a helper method
kafkaManualCommitTestWithStateRepository(TOPIC, stateRepository)inherited from the base class (BaseManualCommitTestSupport) to perform the actual test logic.Usage: Executed by JUnit as a test case.
Important Implementation Details and Algorithms
Manual Offset Commit: The route explicitly disables Kafka's auto-commit feature, opting instead to manually commit offsets by retrieving the
KafkaManualCommitobject from the message header and invokingcommit()synchronously.Offset Repository: Offsets are stored in a
MemoryStateRepository, which acts as an in-memory key-value store for offsets, allowing offset management to be externalized from Kafka itself.Route Configuration: The Kafka consumer URI is constructed with parameters that enable manual commit and specify the offset repository bean, ensuring that commits update the repository state.
Dual Routes: Two routes consume the same topic but only the first route commits offsets manually, with the second route disabled by default to isolate the test scenario.
Interactions with Other Parts of the System
BaseManualCommitTestSupport: This class inherits from a base test support class which likely provides utility methods and Kafka setup/teardown logic (such as
kafkaManualCommitTestWithStateRepositoryandcleanupKafka).KafkaTestUtil: Utilized for constants like
MOCK_RESULTandMOCK_RESULT_BAR, which represent endpoints for message verification in tests.Camel Registry: The
MemoryStateRepositoryinstance is registered in the Camel registry and referenced in the Kafka consumer URI withoffsetRepository=#bean:stateRepository.Kafka Component: This test directly exercises the Apache Camel Kafka component's manual commit feature and offset repository integration.
Usage Scenario
This test file is used during the build process or manually by developers to verify that manual offset commits in Kafka consumers using Apache Camel are correctly persisted to an offset repository when using synchronous commit logic. This ensures reliability of message processing and offset tracking in environments where offset management is externalized.
Visual Diagram
classDiagram
class KafkaConsumerSyncWithOffsetRepoCommitIT {
+static final String TOPIC
+static final MemoryStateRepository stateRepository
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaManualCommitWithOffsetRepo()
}
class RouteBuilder {
+void configure()
}
class KafkaManualCommit {
+void commit()
}
KafkaConsumerSyncWithOffsetRepoCommitIT --> RouteBuilder : createRouteBuilder() returns
RouteBuilder --> KafkaManualCommit : uses in processor
KafkaConsumerSyncWithOffsetRepoCommitIT --> MemoryStateRepository : stateRepository bean
Summary
Purpose: Integration test to verify Kafka manual synchronous commits with offset repository in Apache Camel.
Key Feature: Manual commits update an in-memory offset repository.
Tested Behavior: Synchronous manual commit functionality and offset repository persistence.
Configuration: Kafka consumer with manual commit enabled, auto-commit disabled, offset repository injected.
Verification: The offset repository reflects committed offsets after message processing.
Usage: Part of Kafka component integration tests ensuring correctness of offset management.
This documentation provides all necessary details for developers and testers to understand the purpose, implementation, and usage of `KafkaConsumerSyncWithOffsetRepoCommitIT.java`.