KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
Overview
`KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java` is an integration test class within the Apache Camel Kafka component project. It tests the behavior of Kafka consumer routes configured with the [breakOnFirstError](/projects/289/68714) option enabled, combined with manual offset commit management via `KafkaManualCommit`.
The primary focus of this test is to verify that when a consumer route configured for batch processing encounters an error in one message, the route breaks processing on that first error and handles offset commits manually to avoid continuous retries of the failing message. This test ensures that the manual commit mechanism works correctly with the [breakOnFirstError](/projects/289/68714) flag and batch consumption, preventing infinite processing loops of problematic messages.
The test uses an embedded Kafka environment, Camel routes, and mocks to simulate Kafka message production and consumption, validating message processing order, error handling, and offset commits.
Detailed Explanation
Class: KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT
Package:
org.apache.camel.component.kafka.integrationExtends:
BaseKafkaTestSupport(presumably a Kafka integration test base class providing Kafka client setup and utilities)
Purpose
This class tests Camel Kafka consumer route behavior when:
breakOnFirstError=true— the consumer stops processing the batch upon the first error.allowManualCommit=true — offset commits are manually controlled in the route.
KafkaManualCommitis used to commit offsets explicitly.Batch consumption is enabled via
maxPollRecords=3.
Constants
Constant | Description |
|---|---|
`ROUTE_ID` | The identifier of the Camel Kafka route. |
`TOPIC` | The Kafka topic used for testing. |
Fields
Field | Type | Description |
|---|---|---|
`to` | `MockEndpoint` | Mock endpoint to assert message reception. Injected by Camel. |
`producer` | `KafkaProducer` | Kafka producer to send test messages into the topic. |
Lifecycle Methods
@BeforeEach before()Initializes Kafka producer with default properties.
Clears static records captured by MockConsumerInterceptor (used for Kafka consumer interception during tests).
@AfterEach after()Closes the Kafka producer.
Deletes the Kafka topic used for testing to clean up.
Test Method
@Test kafkaBreakOnFirstErrorBasicCapability()Resets and sets expectations on the
MockEndpointto receive 7 messages.Comments explain the behavior difference before and after a Camel fix (CAMEL-20044):
Old behavior: The message causing error (
message-3) would be retried once.New behavior with NOOP Commit Manager: The error message is processed once; offsets are committed manually to avoid retries.
Stops the route before publishing messages.
Publishes 6 messages (
message-0tomessage-5) to the Kafka topic.Starts the route again.
Uses Awaitility to wait until at least 6 messages are received by the mock endpoint.
Asserts the mock endpoint received the expected messages.
Route Configuration
Defined in overridden
createRouteBuilder()method.Route Properties:
From Kafka topic
TOPICwith:groupId=ROUTE_IDautoOffsetReset=earliestautoCommitEnable=false(disables Kafka auto commit)allowManualCommit=true (enables manual commit in Camel)
breakOnFirstError=true(break batch processing on first error)maxPollRecords=3(batch size)pollTimeoutMs=1000(poll timeout)String deserializers for key and value
MockConsumerInterceptor attached for intercepting consumer records
Route Steps:
Log consumed message.
Check payload via
ifIsPayloadWithErrorThrowException: throws an exception if message body is"message-3".Send message to
mock:resultendpoint.Commit offsets manually using
doCommitOffset.
Error Handling:
On any
Exception:Error is not handled (
handled(false)), meaning the exception propagates.Sends the error exchange to
mock:result.Calls
doCommitOffsetto commit offsets manually to avoid continuous retries.
Private Helper Methods
publishMessagesToKafka()Sends 6 messages (
message-0tomessage-5) to the Kafka topic using the producer.
doCommitOffset(Exchange exchange)Retrieves the
KafkaManualCommitheader from the exchange.Asserts it is not null.
Calls
commit()on theKafkaManualCommitinstance to commit offsets manually.Logs the commit action.
ifIsPayloadWithErrorThrowException(Exchange exchange)Extracts the message body as a string.
Throws a
RuntimeExceptionif the message equals"message-3"to simulate a processing error.
Important Implementation Details
Manual Offset Commit with
KafkaManualCommit:Unlike default Kafka consumer behavior which can use auto-commit or a NOOP commit manager, this test explicitly disables auto-commit and requires the route to manually commit offsets after processing each batch of messages (or after error handling). This is critical when breakOnFirstError is enabled to prevent the Kafka consumer from reprocessing the same failing message endlessly.
Batch Processing and breakOnFirstError:
The Kafka consumer is configured to poll up to 3 messages per batch (
maxPollRecords=3). WithbreakOnFirstError=true, if any message in the batch throws an exception (heremessage-3), the batch processing breaks immediately, and the error handling logic commits offsets manually to avoid retry loops.Error Handling and Retry Behavior:
The test verifies that when an error occurs, the message causing the error is processed once (no infinite retry), and subsequent messages in the batch continue to be processed after the route restarts.
Integration with MockConsumerInterceptor:
The interceptor is used to monitor consumer records during the test, which may assist in verifying consumed messages or debugging.
Platform Specific Test Execution:
The test is annotated to run only on certain operating systems and architectures due to reliability issues on some platforms.
Interaction with Other Parts of the System
BaseKafkaTestSupportProvides Kafka client utilities, embedded Kafka setup, and administrative client access (
kafkaAdminClient) for topic management.
Apache Camel Kafka Component
This test validates the Kafka component's consumer behavior with manual commit and error handling features.
MockEndpointUsed to verify messages received by the route, ensuring expected messages are processed despite errors.
KafkaManualCommitCore to offset commit management in this test, ensuring explicit control over Kafka offsets.
Used for capturing Kafka consumer records for testing or debugging purposes.
Usage Example
This test class is intended to be executed as part of the integration test suite during Kafka component development. It ensures that changes to the [breakOnFirstError](/projects/289/68714) and manual commit features maintain expected behavior.
To run the test:
mvn test -Dtest=KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT
Mermaid Class Diagram
classDiagram
class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT {
- static final String ROUTE_ID
- static final String TOPIC
- Logger LOG
- MockEndpoint to
- KafkaProducer<String, String> producer
+ void before()
+ void after()
+ void kafkaBreakOnFirstErrorBasicCapability()
+ RouteBuilder createRouteBuilder()
- void publishMessagesToKafka()
- void doCommitOffset(Exchange)
- void ifIsPayloadWithErrorThrowException(Exchange)
}
KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT --|> BaseKafkaTestSupport
Summary
`KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java` is a focused integration test ensuring the Apache Camel Kafka consumer route behaves correctly with manual offset commits and batch error handling enabled. It validates that message processing halts on the first error per batch, commits offsets explicitly to avoid infinite retries, and resumes processing subsequent messages correctly. This test safeguards critical Kafka consumer features pivotal for robust message processing in fault scenarios.