KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT.java
Overview
`KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT.java` is an integration test class within the Apache Camel Kafka component module. Its primary purpose is to **verify the behavior of Kafka consumer routes configured with the `breakOnFirstError` option enabled, using asynchronous manual offset commit management in batch processing mode**.
This test ensures that when a message processing error occurs, the route properly breaks processing on the first error and repeatedly retries the failing message without committing its offset. This behavior prevents subsequent messages from being consumed until the error is resolved, a key feature for reliable message processing in Kafka.
The class leverages:
Apache Camel routes configured with Kafka consumer endpoint options such as `allowManualCommit`, `breakOnFirstError`, and a custom asynchronous commit manager.
A Kafka producer to send test messages.
A `MockEndpoint` to assert expectations on consumed messages.
Awaitility for asynchronous waiting on expected conditions.
JUnit 5 testing framework with OS restrictions to ensure reliable execution on supported platforms.
Class: KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT
Description
Integration test class extending `BaseKafkaTestSupport` to validate Kafka consumer behavior with batch consumption and error handling using async commit manager in Camel.
Key Constants
| Constant | Description |
|---|---|
| `ROUTE_ID` | The fixed route identifier string `"breakOnFirstErrorBatchIT"` used to manage the lifecycle of the Camel route. |
| `TOPIC` | Unique Kafka topic name generated by appending a random UUID to `"breakOnFirstErrorBatchIT"`, which keeps test data isolated. |
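As a rough illustration of how such constants might be declared, the sketch below builds a per-run topic name. It assumes `java.util.UUID` as the UUID source, which the actual test may not use, and the holder class `TopicNames` is hypothetical.

```java
import java.util.UUID;

// Hypothetical holder class; only the two constant names and the prefix come from the test.
final class TopicNames {
    static final String ROUTE_ID = "breakOnFirstErrorBatchIT";

    // Assumption: java.util.UUID stands in for whatever UUID source the test really uses.
    static final String TOPIC = "breakOnFirstErrorBatchIT" + UUID.randomUUID();

    public static void main(String[] args) {
        System.out.println("route id: " + ROUTE_ID);
        System.out.println("topic:    " + TOPIC);
    }
}
```

Because the suffix changes on every JVM start, records left over from an earlier run cannot leak into the current one.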
Logger
`LOG`: Logger instance used for debug output about consumed Kafka messages.
Fields
| Field | Type | Description |
|---|---|---|
| `errorPayloads` | `List<String>` | Thread-safe list collecting payloads of messages that triggered errors during processing. |
| `to` | `MockEndpoint` | Injected mock endpoint to which the Camel route sends successfully processed messages, used for assertions. |
| `producer` | `KafkaProducer<String, String>` | Kafka producer instance used to send test messages to the Kafka topic before starting the consumer route. |
Lifecycle Methods
before()
Purpose: Setup method executed before each test.
Behavior:
Initializes a Kafka producer with default properties.
Clears records captured by `MockConsumerInterceptor` to ensure test isolation.
Parameters: None
Returns: void
after()
Purpose: Cleanup method executed after each test.
Behavior:
Closes the Kafka producer if it was initialized.
Deletes the Kafka topic used for the test to avoid interference with other tests.
Uses Awaitility to wait for topic deletion to complete, with a timeout of 180 seconds.
Parameters: None
Returns: void
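The wait for topic deletion can be pictured as a poll-until-true loop with a deadline. The sketch below is a standard-library stand-in and not the Awaitility API; `PollingWait`, `awaitUntil`, and the 200 ms "deletion" condition are all hypothetical.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Stdlib stand-in for an Awaitility-style wait; it does not use Awaitility itself.
final class PollingWait {
    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition became true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Hypothetical condition: pretend the topic disappears after roughly 200 ms.
        boolean deleted = awaitUntil(
                () -> System.currentTimeMillis() - start > 200,
                Duration.ofSeconds(180),
                Duration.ofMillis(50));
        System.out.println("topic deleted: " + deleted);
    }
}
```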
Test Methods
kafkaBreakOnFirstErrorBasicCapability()
Purpose: Tests that the route properly breaks on the first error message and retries it indefinitely without processing subsequent messages.
Behavior:
Resets the `MockEndpoint` and sets expectations:
Exactly 3 messages should be received: `"message-0"`, `"message-1"`, `"message-2"`.
`"message-3"` and any later message should not be received because of the break on error.
Stops the route so that messages can be published before consumption begins.
Publishes 5 test messages (`"message-0"` to `"message-4"`) to Kafka.
Starts the route to begin message consumption.
Waits asynchronously until more than 3 error payloads have accumulated (each triggered by a retry of `"message-3"`).
Asserts that the `MockEndpoint` expectations are met.
Verifies that all error payloads correspond to `"message-3"`.
Parameters: None
Returns: void
Throws: Exception if the test fails or times out.
Route Configuration
Method: createRouteBuilder()
Purpose: Defines the Camel route used during the test.
Route URI:
kafka:<TOPIC>?groupId=<ROUTE_ID>&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true&maxPollRecords=3&pollTimeoutMs=1000&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor
Key Route Options:
`autoCommitEnable=false` and `allowManualCommit=true`: disable auto commit and enable manual offset commits.
`breakOnFirstError=true`: the route breaks processing on the first error.
`maxPollRecords=3`: maximum number of records returned per poll, which acts as the batch size.
`pollTimeoutMs=1000`: poll timeout, in milliseconds, for fetching messages.
`kafkaManualCommitFactory`: uses an asynchronous manual commit factory for offset management.
`interceptorClasses`: attaches a mock consumer interceptor for testing.
Route Steps:
Logs consumed message details at debug level.
Invokes `ifIsPayloadWithErrorThrowException(exchange)` to simulate an error on a specific payload.
Routes the message to the `mock:result` endpoint for validation.
Returns: `RouteBuilder` instance.
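To make the long endpoint URI easier to read, the following sketch assembles it from a map of the options listed above. The helper class `KafkaEndpointUri` and its `build` method are hypothetical and exist only for illustration; the actual test may simply concatenate strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that assembles the consumer endpoint URI from its options.
final class KafkaEndpointUri {
    static String build(String topic, String groupId) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", groupId);
        options.put("autoOffsetReset", "earliest");
        options.put("autoCommitEnable", "false");
        options.put("allowManualCommit", "true");
        options.put("breakOnFirstError", "true");
        options.put("maxPollRecords", "3");
        options.put("pollTimeoutMs", "1000");
        options.put("keyDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("valueDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("kafkaManualCommitFactory",
                "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory");
        options.put("interceptorClasses", "org.apache.camel.component.kafka.MockConsumerInterceptor");

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(build("breakOnFirstErrorBatchIT-demo", "breakOnFirstErrorBatchIT"));
    }
}
```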
Helper Methods
publishMessagesToKafka()
Purpose: Sends five test messages, `"message-0"` through `"message-4"`, to the Kafka topic using the producer.
Parameters: None
Returns: void
Usage: Called before starting the route to populate Kafka with test data.
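The payload generation can be sketched without a broker. In the example below, a `Consumer<String>` is a hypothetical stand-in for the Kafka producer's send call, and the class name `TestMessages` is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stdlib sketch: the Consumer<String> is a hypothetical stand-in for the Kafka producer.
final class TestMessages {
    /** Hands the payloads "message-0" .. "message-(count-1)" to the sender, in order. */
    static void publish(int count, Consumer<String> sender) {
        for (int i = 0; i < count; i++) {
            sender.accept("message-" + i);
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        publish(5, sent::add);
        System.out.println(sent); // prints [message-0, message-1, message-2, message-3, message-4]
    }
}
```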
ifIsPayloadWithErrorThrowException(Exchange exchange)
Purpose: Simulates an error during message processing for the payload `"message-3"`.
Behavior:
Extracts the message body as a string.
If the body equals `"message-3"`, adds it to `errorPayloads` and throws a `RuntimeException` to trigger route error handling.
Parameters:
`exchange` - Apache Camel `Exchange` object representing the current message.
Returns: void
Usage: Invoked within the route's processor step to induce controlled failure.
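A minimal sketch of this helper follows. To stay runnable without Camel, it takes the body as a plain `String` rather than an `Exchange`; that signature, the class name `PoisonPayloadCheck`, and the exception message are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch only: accepts the body as a String instead of a Camel Exchange.
final class PoisonPayloadCheck {
    // Thread-safe list, mirroring the role of errorPayloads in the test.
    static final List<String> errorPayloads = new CopyOnWriteArrayList<>();

    static void ifIsPayloadWithErrorThrowException(String body) {
        if ("message-3".equals(body)) {
            errorPayloads.add(body); // record the payload before failing
            throw new RuntimeException("simulated failure for " + body);
        }
    }

    public static void main(String[] args) {
        ifIsPayloadWithErrorThrowException("message-1"); // passes through silently
        try {
            ifIsPayloadWithErrorThrowException("message-3");
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println("recorded errors: " + errorPayloads);
    }
}
```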
Important Implementation Details
Error Handling & Offset Management:
The test uses the `breakOnFirstError=true` option combined with `allowManualCommit=true` and an asynchronous manual commit factory.
This setup allows Camel to control exactly when offsets are committed.
When a message (in this case `"message-3"`) causes an error, the route breaks processing, and the failing message is retried indefinitely without committing its offset.
This ensures no subsequent messages are processed until the error is resolved, preserving message order and processing guarantees.
Batch Processing:
The Kafka consumer polls messages in batches of up to 3 (`maxPollRecords=3`).
The test asserts that only the 3 messages preceding the error are processed, demonstrating batch behavior with error break.
Asynchronous Commit Manager:
Uses `DefaultKafkaManualAsyncCommitFactory` to create a commit manager that commits offsets asynchronously.
This avoids blocking the consumer thread while a commit is in flight, which can improve throughput.
Testing Environment Constraints:
The test is restricted to specific operating systems and architectures via the `@EnabledOnOs` annotation, because it does not run reliably on some platforms.
Uses Awaitility to wait asynchronously for the retried errors to accumulate and for topic deletion to complete.
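The interplay of batching, break-on-first-error, and uncommitted offsets described in this section can be modelled in memory. The sketch below is a simplified, synchronous simulation with no Kafka or Camel involved: the class `BreakOnFirstErrorSimulation`, the fixed poll count, and the commit-after-each-success policy are assumptions chosen to reproduce the behavior the test asserts, not the component's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of break-on-first-error with batched polling (an assumption-laden sketch).
final class BreakOnFirstErrorSimulation {
    final List<String> delivered = new ArrayList<>();
    final List<String> errorPayloads = new ArrayList<>();
    private int committedOffset = 0; // offset the next poll starts reading from

    /** Runs a fixed number of polls, reading at most maxPollRecords records per poll. */
    void run(List<String> log, int maxPollRecords, int polls) {
        for (int poll = 0; poll < polls; poll++) {
            int end = Math.min(committedOffset + maxPollRecords, log.size());
            for (int offset = committedOffset; offset < end; offset++) {
                String payload = log.get(offset);
                try {
                    process(payload);
                    delivered.add(payload);
                    committedOffset = offset + 1; // commit only after success
                } catch (RuntimeException e) {
                    errorPayloads.add(payload);
                    break; // offset stays uncommitted, so the next poll re-reads this record
                }
            }
        }
    }

    private static void process(String payload) {
        if ("message-3".equals(payload)) {
            throw new RuntimeException("simulated failure for " + payload);
        }
    }

    public static void main(String[] args) {
        BreakOnFirstErrorSimulation sim = new BreakOnFirstErrorSimulation();
        sim.run(List.of("message-0", "message-1", "message-2", "message-3", "message-4"), 3, 6);
        System.out.println("delivered: " + sim.delivered);         // [message-0, message-1, message-2]
        System.out.println("errors:    " + sim.errorPayloads.size()); // 5
    }
}
```

With six polls, the first delivers the three healthy messages and each of the remaining five fails on `"message-3"`, so `"message-4"` is never reached.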
Interactions with Other Components
BaseKafkaTestSupport:
The test extends this base class, which likely provides Kafka client configuration, embedded Kafka broker setup, and utility methods such as `getDefaultProperties()`.
MockConsumerInterceptor:
Attached to the Kafka consumer to capture records for test verification.
Camel Context and Route Controller:
The test manipulates the lifecycle of the Camel route (`stopRoute` and `startRoute`) to coordinate message publishing and consumption.
Kafka Admin Client:
Used to delete the test topic after each test to keep the test environment clean.
Apache Camel Kafka Component:
The file tests features specific to the Kafka component, focusing on manual offset commit and error handling capabilities.
Usage Example
This integration test is not designed for direct reuse but as a template illustrating how to:
Configure a Kafka consumer route in Apache Camel with manual asynchronous commits.
Enable break-on-first-error functionality to handle poison messages.
Produce test messages for consumption.
Assert expected message flow and error handling behavior using Camel’s `MockEndpoint`.
Visual Diagram
classDiagram
class KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT {
- List<String> errorPayloads
- MockEndpoint to
- KafkaProducer<String, String> producer
+ void before()
+ void after()
+ void kafkaBreakOnFirstErrorBasicCapability()
+ RouteBuilder createRouteBuilder()
- void publishMessagesToKafka()
- void ifIsPayloadWithErrorThrowException(Exchange)
}
KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT --|> BaseKafkaTestSupport
class RouteBuilder {
+ void configure()
}
KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT o-- RouteBuilder : creates >
Summary
`KafkaBreakOnFirstErrorWithBatchUsingAsyncCommitManagerIT.java` is a focused integration test validating Apache Camel Kafka consumer behavior under batch processing with manual asynchronous commits and strict error handling. It demonstrates how Camel can be configured to halt consumer progress on the first error and retry the problematic message indefinitely, so that a failing message blocks progress instead of being skipped.