BaseManualCommitTestSupport.java
Overview
`BaseManualCommitTestSupport.java` is an **abstract base test class** designed for integration testing of Apache Camel Kafka components focusing on **manual offset commit behavior**. It extends `BaseKafkaTestSupport` and provides reusable utility methods and test logic to verify correct manual commit functionality when consuming Kafka messages in a Camel route.
This class facilitates testing scenarios such as:
Sending a batch of messages to Kafka, asserting their consumption.
Stopping and restarting the route to verify offset commit and recovery.
Using a `StateRepository` to store and verify committed offsets.
It is primarily used as a superclass for concrete test classes targeting manual commit behaviors in Kafka consumer routes within the Apache Camel Kafka component integration tests.
Detailed Class Explanation
BaseManualCommitTestSupport
An abstract test support class for Kafka manual commit integration tests in Apache Camel.
Fields
| Field Name | Type | Description |
|---|---|---|
| `to` | `MockEndpoint` | Mock endpoint for verifying messages consumed from Kafka topic "foo". Injected via Camel's `@EndpointInject`. |
| `toBar` | `MockEndpoint` | Mock endpoint for verifying messages consumed from Kafka topic "bar". Injected via Camel's `@EndpointInject`. *(Not used in provided code but available for extension)* |
| `producer` | `KafkaProducer<String, String>` | Kafka producer client used to send test messages to Kafka topics. Initialized before each test. |
Methods
void createClient()
Description:
Initializes the Kafka producer client with default properties before each test. The JUnit `@BeforeEach` annotation triggers this setup.
Parameters: None
Returns: void
Usage:
Automatically invoked before each test to ensure a fresh Kafka producer instance.
void cleanupKafka(String topic)
Description:
Cleans up Kafka resources after a test by closing the Kafka producer and deleting the specified topic, so that subsequent tests start from a clean state.
Parameters:
`topic` (String): Kafka topic name to be deleted.
Returns: void
Usage Example:
cleanupKafka("my-test-topic");
void kafkaManualCommitTest(String topic) throws Exception
Description:
Executes a manual commit test scenario:
Sets pre-execution expectations for messages.
Sends 5 messages to the Kafka topic.
Asserts that those messages are consumed.
Stops the Camel route consuming the topic (simulating downtime).
Sends 3 more messages while the route is stopped.
Restarts the route.
Asserts that only the 3 new messages are consumed, which shows that the offsets of the first 5 were committed correctly.
Parameters:
`topic` (String): The Kafka topic name to test.
Returns: void
Throws:
Exception
Usage Example:
kafkaManualCommitTest("foo");
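The scenario above can be modelled without a broker. The following is a minimal sketch, assuming a toy in-memory partition and consumer; `ManualCommitSimulation`, `Partition`, and `ManualCommitConsumer` are hypothetical names invented for illustration and are not part of Camel or Kafka. It only shows why a consumer that commits after processing should see just the 3 new messages after a restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and a consumer that commits offsets manually. */
class ManualCommitSimulation {

    /** Append-only list standing in for a single Kafka partition (hypothetical). */
    static final class Partition {
        private final List<String> records = new ArrayList<>();

        void append(String body) {
            records.add(body);
        }
    }

    /** Consumer whose committed offset survives a restart (hypothetical). */
    static final class ManualCommitConsumer {
        private final Partition partition;
        private int committedOffset = 0; // next offset to read after a restart
        private int position = 0;        // in-memory read position

        ManualCommitConsumer(Partition partition) {
            this.partition = partition;
        }

        /** Reads everything available from the current position, then commits. */
        List<String> pollAndCommit() {
            List<String> received = new ArrayList<>();
            while (position < partition.records.size()) {
                received.add(partition.records.get(position++));
            }
            committedOffset = position; // manual commit after processing
            return received;
        }

        /** Simulates stopping and restarting the route: resume from the commit. */
        void restart() {
            position = committedOffset;
        }
    }

    /** Runs the 5-then-3 scenario and returns what is consumed after the restart. */
    static List<String> run() {
        Partition partition = new Partition();
        ManualCommitConsumer consumer = new ManualCommitConsumer(partition);

        for (int i = 0; i < 5; i++) {
            partition.append("message-" + i);
        }
        consumer.pollAndCommit(); // consumes message-0 .. message-4

        // Route is "stopped": records arrive but nothing is consumed.
        for (int i = 5; i < 8; i++) {
            partition.append("message-" + i);
        }

        consumer.restart();
        return consumer.pollAndCommit();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the commit in `pollAndCommit()` were skipped, `restart()` would reset the position to 0 and all 8 messages would be redelivered, which is the failure the real test is meant to catch.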
void kafkaManualCommitTestWithStateRepository(String topic, StateRepository<String, String> stateRepository) throws Exception
Description:
Similar to `kafkaManualCommitTest`, but additionally verifies that the committed offset is stored in the provided `StateRepository`. It waits until the offset state is updated and asserts that the expected offset is stored after message consumption.
Parameters:
`topic` (String): Kafka topic name.
`stateRepository` (StateRepository<String, String>): State repository instance used to persist offsets.
Returns: void
Throws:
Exception
Usage Example:
kafkaManualCommitTestWithStateRepository("foo", myStateRepository);
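The "wait until the offset state is updated" step is a poll-with-timeout pattern. The sketch below illustrates that pattern with the JDK only; it is not the Awaitility API and not the class's actual code. A `ConcurrentHashMap` stands in for the `StateRepository`, and the key `"demo-key"` and value `"demo-offset"` are placeholders, not the format Camel uses for stored offsets.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper showing how a test can wait for an asynchronous commit. */
class OffsetPollingSketch {

    /** Polls the condition until it holds or the timeout elapses. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Waits for a value to appear under the key, then returns it. */
    static String awaitOffset(Map<String, String> repository, String key, long timeoutMillis) {
        boolean stored = waitUntil(() -> repository.get(key) != null, timeoutMillis, 20);
        if (!stored) {
            throw new IllegalStateException("No offset stored for " + key + " within timeout");
        }
        return repository.get(key);
    }

    public static void main(String[] args) {
        // Map standing in for a StateRepository; key and value are placeholders.
        Map<String, String> repository = new ConcurrentHashMap<>();
        ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
        // Simulate a commit that lands a little after consumption finished.
        committer.schedule(() -> { repository.put("demo-key", "demo-offset"); },
                100, TimeUnit.MILLISECONDS);
        try {
            System.out.println(awaitOffset(repository, "demo-key", 2000));
        } finally {
            committer.shutdown();
        }
    }
}
```

Asserting on the repository immediately after the mock endpoint is satisfied could race with the commit, which is why a bounded wait precedes the assertion.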
void setupPostExecutionExpectations()
Description:
Sets expectations on the `to` `MockEndpoint` for messages expected after restarting the route:
Expects 3 messages received.
Expected bodies are `"message-5"`, `"message-6"`, `"message-7"` in any order.
Parameters: None
Returns: void
Usage:
Used internally after route restart to assert correct message consumption from the committed offset.
void sendRecords(int startIndex, int lastIndex, String topic)
Description:
Sends a sequence of Kafka messages to the specified topic. Every message uses the fixed key `"1"`, and each body is `"message-" + position`.
Parameters:
`startIndex` (int): Start index (inclusive) of messages to send.
`lastIndex` (int): End index (exclusive) of messages to send.
`topic` (String): Kafka topic name.
Returns: void
Usage Example:
sendRecords(0, 5, "foo"); // Sends message-0 to message-4
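The half-open index range is easy to get wrong by one. As a rough illustration, the sketch below builds the key/body pairs that such a call would produce, using `Map.Entry` as a stand-in for a Kafka record so that it runs without a broker; `RecordRangeSketch` and `buildRecords` are hypothetical names, and the real method sends the records through the producer instead of returning them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the key/body pairs described for sendRecords. */
class RecordRangeSketch {

    /** Builds pairs for positions startIndex (inclusive) to lastIndex (exclusive). */
    static List<Map.Entry<String, String>> buildRecords(int startIndex, int lastIndex) {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        for (int position = startIndex; position < lastIndex; position++) {
            // Fixed key "1", body "message-" + position, as described above.
            records.add(Map.entry("1", "message-" + position));
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(buildRecords(0, 5)); // first batch
        System.out.println(buildRecords(5, 8)); // batch sent while the route is stopped
    }
}
```

With these ranges, the two batches in the manual commit scenario are `(0, 5)` and `(5, 8)`, which together cover `message-0` through `message-7` with no gap or overlap.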
void setupPreExecutionExpectations()
Description:
Sets expectations on the `to` `MockEndpoint` before the first batch of messages is sent:
Expects 5 messages.
Expected bodies are `"message-0"` through `"message-4"` in any order.
Asserts that each message contains a non-null `KafkaConstants.LAST_RECORD_BEFORE_COMMIT` header, which is important for signaling manual commits.
Parameters: None
Returns: void
Usage:
Called before sending initial messages to assert correct message receipt and manual commit header presence.
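The two checks described here, bodies matched in any order and a header that must be present on every message, can be sketched in plain Java. This is only an illustration of the semantics, not how `MockEndpoint` is implemented; `AnyOrderExpectationSketch` is a hypothetical class, and the header name `"example.header"` is a placeholder, not the value of the Camel constant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model of an order-insensitive body check plus a header check. */
class AnyOrderExpectationSketch {

    /** True when both lists hold the same bodies with the same counts, ignoring order. */
    static boolean matchesInAnyOrder(List<String> expected, List<String> received) {
        List<String> sortedExpected = new ArrayList<>(expected);
        List<String> sortedReceived = new ArrayList<>(received);
        Collections.sort(sortedExpected);
        Collections.sort(sortedReceived);
        return sortedExpected.equals(sortedReceived);
    }

    /** True when every message carries a non-null value for the given header. */
    static boolean allHaveHeader(List<Map<String, Object>> headersPerMessage, String headerName) {
        for (Map<String, Object> headers : headersPerMessage) {
            if (headers.get(headerName) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("message-0", "message-1", "message-2");
        List<String> received = List.of("message-2", "message-0", "message-1");
        System.out.println(matchesInAnyOrder(expected, received));

        // "example.header" is a placeholder header name for this sketch.
        List<Map<String, Object>> headers = List.of(
                Map.of("example.header", Boolean.FALSE),
                Map.of("example.header", Boolean.TRUE));
        System.out.println(allHaveHeader(headers, "example.header"));
    }
}
```

Sorting copies of both lists keeps duplicates significant, so a received list with a repeated body and a missing one does not pass by accident.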
Important Implementation Details
Manual Offset Commit Verification:
The tests simulate a real-world scenario in which a Kafka consumer manually commits offsets after processing messages. The class verifies that, after the route is stopped and restarted, the consumer resumes from the last committed offset rather than from the beginning of the topic.
State Repository Usage:
The `kafkaManualCommitTestWithStateRepository` method tests integration with a `StateRepository`, a key-value store used to persist offsets externally. This is important for fault tolerance and recovery in distributed systems.
Use of Awaitility:
The method uses Awaitility to poll until the state repository has been updated with the committed offset, since the update may not be instantaneous.
MockEndpoints:
Camel's `MockEndpoint` is used to define expected message counts and bodies, so that message flow and content can be verified during integration tests.
Producer Management:
The Kafka producer is created fresh before each test and closed during cleanup to avoid resource leaks.
Interaction with Other Components
Extends:
`BaseKafkaTestSupport`, which likely provides Kafka cluster setup, common utilities, and base integration test scaffolding.
Uses:
Apache Camel's routing context and route controller to stop and start routes (`contextExtension.getContext().getRouteController()`).
Kafka's `KafkaProducer` to send messages during tests.
`MockEndpoint` to assert message consumption.
`StateRepository` interface for offset persistence abstraction.
Integration:
This class is part of the Apache Camel Kafka component integration tests, verifying manual commit semantics in Kafka consumer routes. It interacts with Kafka brokers, Camel routing engine, and state repositories to simulate and verify message consumption workflows.
Usage Example (Pseudo)
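import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.processor.state.MemoryStateRepository;
import org.junit.jupiter.api.Test;

// Sketch only: a concrete subclass must also define a route that consumes
// from the topic with manual commits enabled and sends to the mock endpoint.
public class ManualCommitTest extends BaseManualCommitTestSupport {

    @Test
    public void testManualCommitScenario() throws Exception {
        String topic = "foo";
        kafkaManualCommitTest(topic);
    }

    @Test
    public void testManualCommitWithStateRepo() throws Exception {
        String topic = "foo";
        // MemoryStateRepository is Camel's in-memory StateRepository<String, String>;
        // the consuming route must be configured to use this same instance.
        StateRepository<String, String> stateRepo = new MemoryStateRepository();
        kafkaManualCommitTestWithStateRepository(topic, stateRepo);
    }
}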
Mermaid Class Diagram
classDiagram
class BaseManualCommitTestSupport {
<<abstract>>
-MockEndpoint to
-MockEndpoint toBar
-KafkaProducer<String,String> producer
+void createClient()
+void cleanupKafka(String topic)
+void kafkaManualCommitTest(String topic)
+void kafkaManualCommitTestWithStateRepository(String topic, StateRepository<String,String> stateRepository)
+void setupPostExecutionExpectations()
+void sendRecords(int startIndex, int lastIndex, String topic)
+void setupPreExecutionExpectations()
}
BaseManualCommitTestSupport --|> BaseKafkaTestSupport
Summary
`BaseManualCommitTestSupport.java` serves as a foundational test class that simplifies and standardizes manual offset commit testing for Kafka consumers in Apache Camel. It encapsulates essential test workflows — sending messages, asserting consumption, stopping/starting routes, and verifying offsets with or without external state repositories — enabling robust validation of Kafka manual commit functionality across multiple test scenarios.