KafkaConsumerSyncCommitIT.java
Overview
`KafkaConsumerSyncCommitIT.java` is an integration test class designed to validate the synchronous manual commit functionality of the Apache Camel Kafka component consumer. The test ensures that Kafka consumer offsets can be committed manually in a synchronous manner rather than relying on automatic offset commits. This is critical for applications requiring precise control over message processing and offset management to avoid message loss or duplication.
The class extends `BaseManualCommitTestSupport`, which presumably provides foundational Kafka setup, teardown, and utility methods tailored for testing manual commit semantics.
Detailed Description
Package
package org.apache.camel.component.kafka.integration.commit;
This class resides in the `integration.commit` package under the Kafka component of Apache Camel, indicating it is part of integration tests focusing on commit behaviors.
Class: KafkaConsumerSyncCommitIT
public class KafkaConsumerSyncCommitIT extends BaseManualCommitTestSupport
Purpose
Implements integration tests for synchronous manual commits in Kafka consumers using Apache Camel.
Verifies that manual commits can be successfully invoked on Kafka messages during consumption.
Uses Camel routes to consume messages from a Kafka topic and explicitly commit offsets manually.
Constants
public static final String TOPIC = "testManualCommitSyncTest";
Defines the Kafka topic used in the test to isolate test data.
Lifecycle Methods
@AfterEach public void after()
Invoked after each test execution.
Calls `cleanupKafka(TOPIC)` from the base class, which likely clears the Kafka topic to ensure a clean state for subsequent tests.
Overridden Methods
protected RouteBuilder createRouteBuilder()
Returns a `RouteBuilder` that defines two Kafka consumer routes configured for manual synchronous commits.
**Route Configuration Details:**
Kafka Endpoint URI:
kafka:testManualCommitSyncTest
    ?groupId=KafkaConsumerSyncCommitIT
    &pollTimeoutMs=1000
    &autoCommitEnable=false
    &allowManualCommit=true
    &autoOffsetReset=earliest
    &kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
`groupId`: Consumer group identifier.
`pollTimeoutMs=1000`: Kafka consumer polling timeout in milliseconds.
`autoCommitEnable=false`: Disables Kafka auto-commit of offsets.
`allowManualCommit=true`: Enables manual offset commit.
`autoOffsetReset=earliest`: Starts consuming from the earliest offset if no committed offset is found.
`kafkaManualCommitFactory`: Specifies the factory that creates manual commit instances (`DefaultKafkaManualCommitFactory`).
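As a minimal sketch of how the endpoint URI above is put together, the helper below joins the same options into a single string. `KafkaUriSketch` and its methods are hypothetical names introduced for illustration; they are not part of Camel or of the test class, which may simply use a concatenated string literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper (hypothetical, not a Camel API) that assembles a Kafka endpoint URI. */
final class KafkaUriSketch {

    private KafkaUriSketch() {
    }

    /** Joins the options as key=value pairs in insertion order, separated by '&'. */
    static String build(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? "kafka:" + topic : "kafka:" + topic + "?" + query;
    }

    /** The option set documented for this test, in the same order. */
    static Map<String, String> syncCommitOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", "KafkaConsumerSyncCommitIT");
        options.put("pollTimeoutMs", "1000");
        options.put("autoCommitEnable", "false");   // no automatic offset commits
        options.put("allowManualCommit", "true");   // expose the manual commit header
        options.put("autoOffsetReset", "earliest");
        options.put("kafkaManualCommitFactory",
                "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory");
        return options;
    }
}
```

Calling `KafkaUriSketch.build("testManualCommitSyncTest", KafkaUriSketch.syncCommitOptions())` yields the URI as a single line with no whitespace between the options, which is the form a Camel `from(...)` call expects.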
**Routes:**
Route "foo":
From the Kafka endpoint.
Routes messages to a mock endpoint (`KafkaTestUtil.MOCK_RESULT`) for assertions or verification.
Processes each exchange to retrieve the `KafkaManualCommit` instance from the message header and performs a synchronous commit by invoking `manual.commit()`.
Uses a lambda processor that asserts the presence of `KafkaManualCommit` and calls commit.
Route "bar":
Same Kafka endpoint as "foo".
Routes messages to another mock endpoint (`KafkaTestUtil.MOCK_RESULT_BAR`).
The route is configured with `.autoStartup(false)`, meaning it will not start automatically with the Camel context. This could be for testing scenarios where the route is started manually or remains inactive during certain tests.
Test Method
@RepeatedTest(1) public void kafkaManualCommit() throws Exception
Executes the manual commit test once (specified by `@RepeatedTest(1)`).
Invokes `kafkaManualCommitTest(TOPIC)` from the base class, which handles test logic such as producing messages to the topic, consuming them, and verifying that manual commits behave as expected.
Important Implementation Details
Manual Commit Logic:
The key focus of this test is the manual commit of Kafka offsets via the `KafkaManualCommit` interface. After consuming a message, the test explicitly retrieves this object from the exchange header and calls `commit()` synchronously. This ensures that the offset is committed only after the message has been processed, so a failure during processing leads to redelivery rather than message loss. A failure after processing but before the commit can still cause the same message to be delivered again.
Route Separation:
Defining two routes on the same Kafka topic but with different behaviors (one committing manually and one inactive) facilitates testing different consumption scenarios without interference.
Usage of `DefaultKafkaManualCommitFactory`:
This factory class provides the mechanism to create `KafkaManualCommit` instances which handle the commit operation. Naming it explicitly in the endpoint URI makes clear which commit implementation the test exercises.
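The commit-after-processing ordering described under Manual Commit Logic can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions (one partition, one consumer, a simulated crash); `CommitOrderSketch` and its methods are hypothetical and do not use the Kafka or Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified in-memory model of one partition read by one consumer group.
 * All names are hypothetical; this is not the Kafka or Camel API.
 */
final class CommitOrderSketch {

    private final List<String> log;     // records stored in the partition
    private int committedOffset = 0;    // next offset to read after a restart

    CommitOrderSketch(List<String> records) {
        this.log = new ArrayList<>(records);
    }

    /**
     * Consumes from the last committed offset, committing after each record is processed.
     * If failAtOffset is a valid offset, the run "crashes" right after processing that
     * record and before committing it. Pass -1 for a run without failures.
     * Returns the records handed to the processor during this run.
     */
    List<String> run(int failAtOffset) {
        List<String> processed = new ArrayList<>();
        for (int offset = committedOffset; offset < log.size(); offset++) {
            processed.add(log.get(offset));   // process the record first
            if (offset == failAtOffset) {
                return processed;             // simulated crash: no commit for this record
            }
            committedOffset = offset + 1;     // commit: the committed value is the next offset to consume
        }
        return processed;
    }

    int committedOffset() {
        return committedOffset;
    }
}
```

With records `a`, `b`, `c` and a simulated crash at offset 1, the first run processes `a` and `b` but commits only up to offset 1. A second run then starts at `b` again and continues to `c`. Nothing is lost, but `b` is processed twice, which is the at-least-once behavior that committing after processing provides.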
Interaction with Other Components
Base Class: `BaseManualCommitTestSupport`
Provides Kafka topic setup, cleanup, and possibly utility methods for producing and consuming messages, and assertions for the tests.
`KafkaTestUtil`
Defines mock endpoints (`MOCK_RESULT`, `MOCK_RESULT_BAR`) used for verifying message receipt and processing in the routes.
Apache Camel Kafka Component
The test exercises the Kafka consumer capabilities of the Camel Kafka component, especially the manual commit feature.
JUnit Jupiter
Utilizes JUnit 5 annotations such as `@AfterEach` and `@RepeatedTest` for test lifecycle management and repeated execution.
Usage Example
This class is a test class and is not intended for direct production use. However, it demonstrates how to configure a Camel Kafka consumer route for manual synchronous commits:
from("kafka:yourTopic?groupId=yourGroup&autoCommitEnable=false&allowManualCommit=true")
    .process(exchange -> {
        // The manual commit handle is exposed as a message header when allowManualCommit=true
        KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual != null) {
            manual.commit(); // Synchronously commit the offset after processing
        }
    })
    .to("yourOutputEndpoint");
Mermaid Class Diagram
classDiagram
    class KafkaConsumerSyncCommitIT {
        +static final String TOPIC
        +void after()
        +RouteBuilder createRouteBuilder()
        +void kafkaManualCommit()
    }
    KafkaConsumerSyncCommitIT --|> BaseManualCommitTestSupport
`KafkaConsumerSyncCommitIT` extends `BaseManualCommitTestSupport`.
Key methods: `after()`, `createRouteBuilder()`, `kafkaManualCommit()`.
Constant: `TOPIC`.
Summary
`KafkaConsumerSyncCommitIT.java` is an integration test validating synchronous manual offset commits in Apache Camel Kafka consumers. It defines Kafka consumer routes with manual commit enabled, processes incoming messages, and explicitly commits offsets via the `KafkaManualCommit` interface. The class uses JUnit 5 for testing and interacts with utility classes for Kafka topic management and mock endpoints. This test helps ensure that applications using Camel Kafka consumers can commit offsets only after processing has completed, which is the basis for at-least-once message processing.