KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
Overview
`KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java` is an integration test class within the Apache Camel Kafka component suite. It validates the behavior of Kafka consumers configured with **breakOnFirstError=false** and **manual commit enabled** using `KafkaManualCommit` alongside a NOOP commit manager.
The test ensures that when a consumer encounters processing errors for some messages, it does **not halt** message consumption immediately but continues processing subsequent messages. It also verifies that offsets are committed manually only for successfully processed messages, demonstrating how Camel’s Kafka component handles error scenarios gracefully without stopping the route.
This class extends `BaseKafkaTestSupport`, leveraging the Camel testing framework and Kafka test utilities.
Class: KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT
Description
Test class that exercises Kafka consumer error handling when `breakOnFirstError` is disabled, and manual offset commits are performed programmatically using `KafkaManualCommit`. It publishes test messages to a Kafka topic, some of which deliberately trigger exceptions, and asserts that the consumer continues processing subsequent messages without stopping.
Annotations
@Tags({ @Tag("breakOnFirstError") }): Categorizes the test under "breakOnFirstError".@EnabledOnOs(...): Enables the test only on selected operating systems and CPU architectures, disabling it on unreliable platforms.
Constants
Name | Type | Description |
|---|---|---|
ROUTE_ID | String | Identifier for the Camel route under test (`breakOnFirstErrorOff`) |
TOPIC | String | Kafka topic name used for testing (`breakOnFirstErrorOff`) |
Fields
Name | Type | Description |
|---|---|---|
`to` | `MockEndpoint` | Camel Mock endpoint to assert received messages |
`producer` | Kafka producer instance to publish test messages |
Lifecycle Methods
@BeforeEach void before()
Initializes the Kafka producer with default properties and clears records captured by the mock consumer interceptor before each test run.@AfterEach void after()
Closes the Kafka producer and cleans up the test topic by deleting it via the Kafka admin client after each test run.
Test Methods
void kafkaBreakOnFirstErrorBasicCapability()
Purpose: Validates that the consumer continues processing messages even after encountering errors in some messages, since
breakOnFirstErroris set to false.Test Steps:
Reset and configure the mock endpoint to expect 4 messages.
Stop the Kafka consumer route.
Publish 6 messages to the Kafka topic (
message-0tomessage-5).Start the Kafka consumer route.
Await until at least 4 messages are received.
Assert that the expected messages were received, including skipping the problematic ones (
message-3,message-4) but still continuing tomessage-5.
Expected behavior: Messages with payload
message-3andmessage-4throw a runtime exception but do not stop message consumption. Offsets for successful messages are committed manually.
Key Methods and Utilities
RouteBuilder createRouteBuilder()
Purpose: Defines the Camel route used in the test, consuming from Kafka with specific options.
Route Configuration Details:
Kafka consumer endpoint with:
groupId=breakOnFirstErrorOffautoOffsetReset=earliestautoCommitEnable=false(manual commit enabled)allowManualCommit=truebreakOnFirstError=false(disables break on first error)maxPollRecords=1pollTimeoutMs=1000String deserializers for key and value
MockConsumerInterceptorto intercept records
Route ID set to
breakOnFirstErrorOff.Processing steps:
Logs the consumed message at debug level.
Invokes
ifIsPayloadWithErrorThrowException()to simulate failures on specific messages.Sends the message to the mock endpoint
to.Calls
doCommitOffset()to manually commit the Kafka offset for the message.
private void publishMessagesToKafka()
Publishes 6 messages (
message-0tomessage-5) to the Kafka topic used for testing.Uses the Kafka Java producer instance to send messages asynchronously.
**Usage Example:**
publishMessagesToKafka();
private void doCommitOffset(Exchange exchange)
Extracts the
KafkaManualCommitobject from the message headerKafkaConstants.MANUAL_COMMIT.Calls
commit()on theKafkaManualCommitobject to commit the offset manually for the processed message.Logs the commit operation.
**Parameters:**
Parameter | Type | Description |
|---|---|---|
exchange | Exchange | The Camel Exchange containing the Kafka message and headers |
**Throws:**
Assertion error if
KafkaManualCommitheader is missing (viaassertNotNull).
private void ifIsPayloadWithErrorThrowException(Exchange exchange)
Checks if the message body equals
"message-3"or"message-4".If yes, throws a
RuntimeExceptionto simulate a processing error.
**Parameters:**
Parameter | Type | Description |
|---|---|---|
exchange | Exchange | The Camel Exchange containing the Kafka message |
Important Implementation Details
Manual Offset Commit: The consumer disables automatic commits (
autoCommitEnable=false) and usesallowManualCommit=trueto manually commit offsets after successful processing.breakOnFirstError=false: The route is configured to not break the consumer on the first error encountered. This means that failures in processing individual messages do not stop the entire consumption process.
Error Simulation: Messages with payload
"message-3"and"message-4"deliberately cause exceptions to test the error handling behavior.MockConsumerInterceptor: Used to capture records for verification and to ensure the consumer behavior is as expected.
Route Lifecycle Management: The test explicitly stops and starts the route to control when messages are consumed.
Awaitility: Used to asynchronously wait for the expected number of messages to be received, ensuring test stability.
Interaction with Other Components
BaseKafkaTestSupport: The parent class likely provides Kafka cluster management, admin client, and Camel context setup, which this test relies on.
KafkaManualCommit: Part of the Kafka consumer API in Camel, enabling manual offset commit control.
MockEndpoint: Camel testing utility to assert received message counts and payloads.
Kafka Admin Client: Used in
@AfterEachto delete test topics and clean up Kafka resources.CamelKafkaUtil: Utility class for logging Kafka message details.
MockConsumerInterceptor: Kafka consumer interceptor used to track consumed records for testing purposes.
Usage Example
The following snippet illustrates how the test publishes messages and verifies consumption behavior:
// Stop the route to prepare for publishing messages
contextExtension.getContext().getRouteController().stopRoute(ROUTE_ID);
// Publish test messages to Kafka topic
publishMessagesToKafka();
// Start the route to begin consuming messages
contextExtension.getContext().getRouteController().startRoute(ROUTE_ID);
// Await until at least 4 messages are processed
Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> to.getExchanges().size() > 3);
// Assert that the expected messages were received
to.assertIsSatisfied(3000);
Mermaid Class Diagram
classDiagram
class KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT {
-static final String ROUTE_ID
-static final String TOPIC
-static final Logger LOG
-MockEndpoint to
-KafkaProducer<String,String> producer
+void before()
+void after()
+void kafkaBreakOnFirstErrorBasicCapability()
+RouteBuilder createRouteBuilder()
-void publishMessagesToKafka()
-void doCommitOffset(Exchange exchange)
-void ifIsPayloadWithErrorThrowException(Exchange exchange)
}
KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT --|> BaseKafkaTestSupport
Summary
This integration test class is critical for verifying the resilience and correctness of Kafka consumer error handling within Apache Camel when manual commit control is used and the consumer is configured **not** to break on the first error. It ensures that message flows continue despite processing errors and that offset commits are handled properly, preventing message loss or duplication.
By simulating error scenarios and asserting message consumption, this test provides confidence in the fault tolerance capabilities of the Camel Kafka component in real-world streaming environments.