KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
Overview
`KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java` is an integration test class within the Apache Camel Kafka component suite. This test specifically validates the **breakOnFirstError** functionality when consuming Kafka messages in batches, combined with manual offset commit management and retry semantics.
The test demonstrates how to:
Use manual offset commits (`allowManualCommit=true`) with `KafkaManualCommit` to control when offsets are committed.
Break processing on the first error in a batch (`breakOnFirstError=true`), ensuring that messages causing failures are retried.
Retry processing of problematic messages without committing their offsets, so the Kafka consumer re-processes them.
Use batch consumption (`maxPollRecords=3`) to consume multiple messages per poll and handle errors within these batches.
This class extends a base Kafka test support class and contains a Kafka route configured for manual commit and error handling, along with a test method verifying retry behavior when an error occurs.
Class: KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT
Description
This JUnit 5 test class verifies that Kafka consumer routes configured with [breakOnFirstError](/projects/289/68663) and manual commit correctly retry messages that fail processing within a batch, without committing their offsets until successful processing.
Important Annotations
`@Tags({ @Tag("breakOnFirstError") })`: categorizes the test.
`@EnabledOnOs(...)`: enables the test on specific operating systems only, due to known reliability issues on some platforms.
Constants
| Constant | Type | Description |
|---|---|---|
| `ROUTE_ID` | `String` | Identifier for the Camel route under test. |
| `TOPIC` | `String` | Kafka topic name used in this test. |
Fields
| Field Name | Type | Description |
|---|---|---|
| `to` | `MockEndpoint` | Mock endpoint to capture and assert messages from the route. |
| `producer` | `KafkaProducer<String, String>` | Kafka producer instance used to send test messages. |
Lifecycle Methods
before()
Purpose: Setup before each test execution.
Actions:
Initializes a Kafka producer with default properties.
Clears any previously captured records from the mock consumer interceptor.
after()
Purpose: Cleanup after each test execution.
Actions:
Closes the Kafka producer if initialized.
Deletes the test Kafka topic to clean up state.
Test Method
kafkaBreakOnFirstErrorBasicCapabilityRetry()
Description: Validates that when a message in a batch triggers an exception, the route breaks on the first error, does not commit the offset of the failed message, and retries that message on subsequent polls. Because `message-3` always fails in this test, the retries continue for as long as the route runs.
Steps:
Resets the mock endpoint.
Stops the route to publish messages first.
Publishes six messages (`message-0` through `message-5`) to Kafka.
Starts the route to begin consuming messages.
Uses Awaitility to wait up to 10 seconds for at least 8 messages to be consumed (including retries).
Asserts that messages `message-4` and `message-5` were never consumed because the route breaks on the first error.
Asserts that `message-3` was consumed multiple times (due to retry).
Verifies that mock endpoint conditions are met within 3 seconds.
Throws:
`Exception` if test conditions fail or a timeout occurs.
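The wait-then-assert part of the steps above can be sketched in plain Java. This is only an illustration of the pattern: `awaitUntil` is a hypothetical stand-in for what Awaitility provides, and `retryObserved` is an assumed helper that restates the assertions on `message-3`, `message-4`, and `message-5`. Neither is taken from the test source.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Illustrative stand-ins only; the real test uses Awaitility and MockEndpoint.
class RetryVerificationSketch {

    /** Polls the condition until it holds or the timeout elapses. */
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    /** True when message-3 was seen more than once and message-4/5 were never seen. */
    static boolean retryObserved(List<String> consumedPayloads) {
        long attemptsOnFailing = consumedPayloads.stream()
                .filter("message-3"::equals)
                .count();
        return attemptsOnFailing > 1
                && !consumedPayloads.contains("message-4")
                && !consumedPayloads.contains("message-5");
    }
}
```

In the test itself, the condition passed to the wait would be along the lines of "the mock endpoint has received at least 8 exchanges", with a 10 second limit.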
Route Definition
createRouteBuilder()
Returns:
`RouteBuilder` instance defining the Kafka consumer route.
Route Configuration:
Source: Kafka topic (`breakOnFirstErrorBatchRetryIT`) with parameters:
`groupId` set to the route ID
`autoOffsetReset=earliest`
`autoCommitEnable=false` (manual commit mode)
`allowManualCommit=true` (enables manual offset commit)
`breakOnFirstError=true` (stop processing batch on first error)
`maxPollRecords=3` (batch size)
`pollTimeoutMs=1000`
Key/value deserializers as String.
`interceptorClasses` set to a mock interceptor to capture records.
Error Handling:
`onException(Exception.class)` is configured as:
Not handled (`handled(false)`), so exceptions propagate.
Message routed to the mock endpoint for inspection.
No offset commit on error, enabling retries.
Processors:
Logs Kafka message consumption.
Throws an exception if the payload equals `"message-3"` (to simulate an error).
Sends processed message to the mock endpoint.
Commits the Kafka offset manually upon successful processing.
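To make the consumer configuration concrete, the sketch below assembles a Camel Kafka endpoint URI from the options listed above. It is an assumption-laden illustration, not the test's actual code: the `routeId` and `interceptorClass` arguments are placeholders because their values are not given here, and the `keyDeserializer`/`valueDeserializer` option names are assumed to be how the String deserializers are configured.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: builds the endpoint URI string only; it does not start a route.
class KafkaEndpointUriSketch {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static String buildUri(String topic, String routeId, String interceptorClass) {
        // LinkedHashMap keeps the options in the order they are listed above.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", routeId);            // consumer group named after the route
        options.put("autoOffsetReset", "earliest"); // start from the beginning of the topic
        options.put("autoCommitEnable", "false");   // no automatic offset commits
        options.put("allowManualCommit", "true");   // expose the manual commit handle
        options.put("breakOnFirstError", "true");   // stop the batch on the first failure
        options.put("maxPollRecords", "3");         // at most three records per poll
        options.put("pollTimeoutMs", "1000");
        options.put("keyDeserializer", STRING_DESERIALIZER);   // assumed option name
        options.put("valueDeserializer", STRING_DESERIALIZER); // assumed option name
        options.put("interceptorClasses", interceptorClass);   // placeholder class name

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }
}
```

A call such as `buildUri("breakOnFirstErrorBatchRetryIT", "my-route-id", "com.example.MyInterceptor")` yields a URI that could be passed to `from(...)` in a `RouteBuilder`; both the route ID and the interceptor class name in that call are made up for the example.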
Private Helper Methods
publishMessagesToKafka()
Description: Publishes six messages (`message-0` to `message-5`) to the test Kafka topic.
Usage: Called before starting the route to seed Kafka with test data.
doCommitOffset(Exchange exchange)
Parameters:
exchange: The Camel Exchange from the Kafka consumer route.
Description: Retrieves the `KafkaManualCommit` instance from the message header and commits the offset manually.
Implementation Detail:
Asserts the manual commit instance is not null.
Calls `commit()` to acknowledge message processing.
Usage: Called as the last step of the route, after a message has been processed successfully.
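The header lookup, null check, and commit described above follow a simple pattern, sketched here without Camel on the classpath. The `ManualCommit` interface is a hypothetical stand-in for `KafkaManualCommit` (only `commit()` is modeled), a plain `Map` stands in for the message headers, and the header key is assumed to match the value Camel exposes as `KafkaConstants.MANUAL_COMMIT`.

```java
import java.util.Map;
import java.util.Objects;

// Stand-in types only; the real helper works on a Camel Exchange.
class ManualCommitSketch {

    /** Hypothetical stand-in for Camel's KafkaManualCommit. */
    interface ManualCommit {
        void commit();
    }

    // Assumed header key for the manual commit handle.
    static final String MANUAL_COMMIT_HEADER = "CamelKafkaManualCommit";

    static void doCommitOffset(Map<String, Object> headers) {
        Object header = headers.get(MANUAL_COMMIT_HEADER);
        // Fail fast if manual commit was not enabled on the consumer.
        ManualCommit manual = (ManualCommit) Objects.requireNonNull(
                header, "manual commit header is missing");
        manual.commit(); // acknowledge the message that was just processed
    }
}
```

With Camel available, the equivalent lookup would read the header from the exchange's message and call `commit()` on the returned `KafkaManualCommit`.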
ifIsPayloadWithErrorThrowException(Exchange exchange)
Parameters:
exchange: The Camel Exchange from the Kafka consumer route.
Description: Throws a `RuntimeException` if the message body equals `"message-3"` to simulate a processing error and trigger retry logic.
Usage: Used to simulate failure for testing breakOnFirstError behavior.
Implementation Details & Algorithms
Batch Consumption with Error Handling: The consumer fetches up to 3 messages per poll (`maxPollRecords=3`). When processing of a message in the batch throws an exception, the route stops processing the rest of that batch and does not commit the offset of the failed message.
Manual Offset Commit: Offsets are committed explicitly via `KafkaManualCommit.commit()`. Because auto-commit is disabled, the offsets of messages that caused errors are never committed, so those messages remain eligible for retry.
Retry Logic: With `breakOnFirstError=true`, the consumer seeks back to the failed message, and because its offset was never committed, the same message is delivered again on the next poll. The test verifies that the message causing the exception (`message-3`) is retried multiple times.
Mock Interceptor Integration: The route uses a custom mock consumer interceptor to capture consumed records for testing and verification purposes.
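The interplay of batch size, manual commit, and break-on-first-error can be traced with a small plain-Java model. This is a simplified simulation under stated assumptions, not Camel's or Kafka's implementation: the topic is modeled as a list, a "poll" returns up to `maxPollRecords` entries from the current position, and a failure rewinds the position to the failed entry.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model; no Camel or Kafka classes are involved.
class BreakOnFirstErrorSimulation {

    static class Result {
        final List<String> attempted = new ArrayList<>(); // every payload handed to the processor
        int committedOffset = 0;                          // next offset to resume from
    }

    static Result run(List<String> topic, int maxPollRecords, String failingPayload, int maxPolls) {
        Result result = new Result();
        int position = 0; // next offset the consumer will read

        for (int poll = 0; poll < maxPolls && position < topic.size(); poll++) {
            int end = Math.min(position + maxPollRecords, topic.size());
            int next = end; // where the following poll starts if nothing fails
            for (int offset = position; offset < end; offset++) {
                String payload = topic.get(offset);
                result.attempted.add(payload);
                if (payload.equals(failingPayload)) {
                    // Break on first error: skip the rest of the batch, commit nothing,
                    // and rewind so the failed record is read again.
                    next = offset;
                    break;
                }
                result.committedOffset = offset + 1; // manual commit after success
            }
            position = next;
        }
        return result;
    }
}
```

Running the model on `message-0` through `message-5` with `maxPollRecords` of 3, `message-3` as the failing payload, and 6 polls gives `message-0`, `message-1`, `message-2` once each, then `message-3` five times. The committed offset stays at 3, and `message-4` and `message-5` are never attempted, which mirrors what the test asserts.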
Interaction with Other Components
BaseKafkaTestSupport: This class extends a base test class which likely provides Kafka test lifecycle management such as embedded Kafka brokers, topic creation, and Kafka admin client access.
Kafka Producer: Used to seed messages into Kafka before the route starts consuming.
Camel Context and RouteController: The test programmatically stops and starts the route to control message consumption timing.
MockEndpoint: Used to capture and assert messages passing through the route, enabling verification of expected behavior and message flow.
Awaitility: Used to wait asynchronously for route processing to complete or reach certain conditions.
Usage Example
This test class is primarily for automated integration testing within the Apache Camel Kafka component. To run the test:
Ensure the Kafka environment and dependencies are available.
Run the test with a JUnit 5 compatible test runner.
Observe that messages causing errors are retried, and that no offsets are committed for failed messages.
Class Diagram
classDiagram
class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT {
- static final String ROUTE_ID
- static final String TOPIC
- static final Logger LOG
- MockEndpoint to
- KafkaProducer<String,String> producer
+ void before()
+ void after()
+ void kafkaBreakOnFirstErrorBasicCapabilityRetry()
+ RouteBuilder createRouteBuilder()
- void publishMessagesToKafka()
- void doCommitOffset(Exchange)
- void ifIsPayloadWithErrorThrowException(Exchange)
}
KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT --|> BaseKafkaTestSupport
Summary
`KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java` is a focused integration test validating Kafka batch message consumption with manual offset commit and "break on first error" semantics in Camel routes. It helps ensure that messages causing exceptions trigger retries without prematurely committing offsets, thus preserving message processing guarantees in Kafka-Camel integrations.