KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT.java
Overview
`KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT` is an integration test class designed to verify the behavior of the Apache Camel Kafka component when using the **breakOnFirstError** option combined with **manual offset commit** managed via a synchronous commit manager.
The test validates that when a message processing failure occurs (specifically on a designated payload), Camel breaks processing on the first error encountered within a batch of Kafka records, and retries the failed message indefinitely without committing its offset. This ensures no subsequent messages in the batch are processed until the error is resolved (simulated here by continuous failure). The test also confirms that manual offset commits are controlled correctly by Camel's [DefaultKafkaManualCommitFactory](/projects/289/68562).
This class extends `BaseKafkaTestSupport`, leveraging its Kafka test infrastructure to produce messages and manage Kafka topics.
Detailed Explanation
Package and Imports
Package: `org.apache.camel.component.kafka.integration`
Imports include JUnit 5 annotations for testing, Apache Camel classes for route building and mocking, Kafka client classes for producing messages, and Awaitility for asynchronous assertions.
Class: KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT
Description
This class tests the "break on first error" functionality in Kafka consumers with batch processing and manual commits using a synchronous commit manager.
Constants
| Name | Description |
|---|---|
| `ROUTE_ID` | String identifier for the route used in the test. |
| `TOPIC` | Kafka topic name used in the test. |
Logger
`LOG`: SLF4J logger used to log debug information during message consumption.
Fields
| Field | Type | Description |
|---|---|---|
| `errorPayloads` | `CopyOnWriteArrayList` | Thread-safe list capturing payloads that trigger errors during processing. |
| `to` | `MockEndpoint` (injected) | Apache Camel mock endpoint to assert consumed messages in the route. |
| `producer` | `KafkaProducer<String, String>` | Kafka producer instance used to send test messages to the Kafka topic. |
Lifecycle Methods
before()
Purpose: Setup method run before each test.
Functionality:
Initializes the Kafka producer with default properties.
Clears static records captured in MockConsumerInterceptor to ensure a clean state.
after()
Purpose: Cleanup method run after each test.
Functionality:
Closes the Kafka producer if initialized.
Deletes the Kafka topic used in the test to clean up resources.
Test Method
kafkaBreakOnFirstErrorBasicCapability()
Description: Verifies that when a processing error occurs on a message in a batch, the route breaks processing at the first error and retries the failed message indefinitely.
Steps:
Resets the mock endpoint and sets it to expect exactly 3 messages: `message-0`, `message-1`, and `message-2`.
Stops the Camel route to allow pre-loading messages into Kafka.
Publishes 5 messages (`message-0` through `message-4`) to Kafka.
Starts the Camel route, which consumes the messages.
Uses Awaitility to wait (up to 30 seconds) until the error payload list contains more than 3 entries, indicating repeated retries of the failing message.
Asserts that the mock endpoint received the expected 3 messages.
Asserts that all error payloads are `message-3`.
Outcome: Confirms that processing halts on the first error (`message-3`) and that the message is retried indefinitely without proceeding to the subsequent message (`message-4`).
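The expected outcome can be reproduced with a small plain-Java model of the consumer loop. This is only a sketch under simplifying assumptions: the class `BreakOnFirstErrorSimulation`, its `run` method, and the in-memory list standing in for the topic are hypothetical and are not part of Camel or of the test class. It models one behavior only: after a failure, the next poll starts again at the failing offset.

```java
import java.util.ArrayList;
import java.util.List;

public class BreakOnFirstErrorSimulation {

    /** Outcome of a simulated run: payloads delivered downstream and payloads that failed. */
    public static final class Result {
        public final List<String> delivered = new ArrayList<>();
        public final List<String> errors = new ArrayList<>();
    }

    /**
     * Simulates a consumer that reads up to maxPollRecords records per poll and,
     * on the first failure, goes back to the failing offset instead of moving on.
     * The list "topic" is a hypothetical in-memory stand-in for a Kafka partition.
     */
    public static Result run(List<String> topic, String failingPayload, int maxPollRecords, int polls) {
        Result result = new Result();
        int position = 0; // next offset to read; stands in for the consumer position
        for (int poll = 0; poll < polls; poll++) {
            int end = Math.min(position + maxPollRecords, topic.size());
            for (int offset = position; offset < end; offset++) {
                String payload = topic.get(offset);
                if (payload.equals(failingPayload)) {
                    result.errors.add(payload);
                    position = offset; // break on first error: re-read this offset on the next poll
                    break;
                }
                result.delivered.add(payload);
                position = offset + 1; // advance only past successfully processed records
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("message-0", "message-1", "message-2", "message-3", "message-4");
        Result result = run(topic, "message-3", 3, 6);
        System.out.println("delivered = " + result.delivered);
        System.out.println("errors    = " + result.errors);
    }
}
```

In this model the first poll delivers `message-0` to `message-2`, and every later poll fails on `message-3` again, so `message-4` is never delivered.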
Route Definition
Method: createRouteBuilder()
Returns: `RouteBuilder` instance defining the Kafka consumer route.
Route Configuration:
Consumes from Kafka topic `TOPIC`.
Configured with:
`groupId` set to the route ID.
`autoOffsetReset=earliest` to start from the beginning of the topic.
`autoCommitEnable=false` and `allowManualCommit=true` to disable automatic commits and allow manual offset commits.
`breakOnFirstError=true` to stop processing on the first error encountered.
`maxPollRecords=3` to consume batches of 3 messages.
`pollTimeoutMs=1000` to set the poll timeout.
Custom key and value deserializers for strings.
`kafkaManualCommitFactory` set to `DefaultKafkaManualCommitFactory` for sync manual commit handling.
`interceptorClasses` set to `MockConsumerInterceptor` to intercept consumer records for testing.
Processing steps:
Logs the consumed message.
Calls `ifIsPayloadWithErrorThrowException()` to inject an error on a specific payload.
Sends successful messages to the mock endpoint `to`.
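To make the option list concrete, the sketch below assembles an endpoint URI from the options named above using only the JDK. Several details are assumptions and not taken from the test source: the values of `ROUTE_ID` and `TOPIC`, the fully qualified class names of the commit factory, the interceptor, and the string deserializer, and the use of Camel's `#class:` prefix to reference the factory. In a real route, a string like this would be passed to `from(...)` inside the `RouteBuilder`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaEndpointUriSketch {

    // Hypothetical values; the real ROUTE_ID and TOPIC constants are not shown in this document.
    static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
    static final String TOPIC = "breakOnFirstErrorBatchIT-topic";

    /** Builds a consumer endpoint URI from the options listed in the route configuration. */
    public static String endpointUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", ROUTE_ID);
        options.put("autoOffsetReset", "earliest");
        options.put("autoCommitEnable", "false");
        options.put("allowManualCommit", "true");
        options.put("breakOnFirstError", "true");
        options.put("maxPollRecords", "3");
        options.put("pollTimeoutMs", "1000");
        // Assumed class names; check them against the Camel and Kafka versions in use.
        options.put("keyDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("valueDeserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("kafkaManualCommitFactory",
                "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory");
        options.put("interceptorClasses", "org.apache.camel.component.kafka.MockConsumerInterceptor");

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + TOPIC + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(endpointUri());
    }
}
```

Keeping the options in an insertion-ordered map makes the generated URI easy to compare against the list above, one option per entry.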
Helper Methods
publishMessagesToKafka()
Description: Publishes 5 test messages (`message-0` to `message-4`) to the Kafka topic using the producer.
Usage: Called before starting the route to populate Kafka with test data.
ifIsPayloadWithErrorThrowException(Exchange exchange)
Description: Checks if the message payload is `"message-3"`.
Behavior:
If yes, adds the payload to the `errorPayloads` list and throws a `RuntimeException` to simulate a processing failure.
Otherwise, processing continues normally.
Purpose: Simulates a processing error that triggers the "break on first error" behavior.
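A minimal sketch of this helper, written against a plain `String` payload so that it runs without Camel on the classpath. The real method takes an `Exchange`; the class name `ErrorPayloadCheck`, the accessor, and the exception message are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ErrorPayloadCheck {

    // Thread-safe list: the consumer thread appends while the test thread reads.
    private final List<String> errorPayloads = new CopyOnWriteArrayList<>();

    /** Records the payload and throws if it is the designated failing payload. */
    public void ifIsPayloadWithErrorThrowException(String payload) {
        if ("message-3".equals(payload)) {
            errorPayloads.add(payload);
            // Hypothetical message text; any unchecked exception marks the processing as failed.
            throw new RuntimeException("simulated failure for " + payload);
        }
        // Any other payload falls through and processing continues normally.
    }

    public List<String> errorPayloads() {
        return errorPayloads;
    }

    public static void main(String[] args) {
        ErrorPayloadCheck check = new ErrorPayloadCheck();
        check.ifIsPayloadWithErrorThrowException("message-0");
        try {
            check.ifIsPayloadWithErrorThrowException("message-3");
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println("recorded: " + check.errorPayloads());
    }
}
```

Recording the payload before throwing is what lets the test count retries: each redelivery of `message-3` adds one more entry to the list.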
Important Implementation Details
Manual Commit Management: The test disables Kafka's auto-commit and enables manual offset commit via DefaultKafkaManualCommitFactory. This allows Camel to control offset commits explicitly, crucial for the "break on first error" behavior to be effective.
Batch Processing: The Kafka consumer is configured to poll 3 records at a time (`maxPollRecords=3`). The test asserts that processing halts within a batch upon encountering an error.
Error Handling Logic: The route throws an exception intentionally when processing `message-3`. Because breakOnFirstError is enabled, consumption does not proceed beyond this message, and the offset is not committed, causing the message to be retried indefinitely.
Use of Awaitility: The test waits asynchronously for the error payload list to accumulate retries, confirming that the failing message is repeatedly reprocessed.
Platform Restrictions: The test is enabled only on certain operating systems and architectures due to reliability concerns on some platforms.
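The asynchronous wait described under "Use of Awaitility" amounts to polling a condition until it holds or a timeout expires. The sketch below shows that idea with the JDK only; it is a stand-in for illustration, not Awaitility's API, and the names `PollingWait` and `awaitUntil` are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

public class PollingWait {

    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    public static boolean awaitUntil(Duration timeout, Duration pollInterval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> errorPayloads = new CopyOnWriteArrayList<>();
        // Background thread standing in for the consumer that keeps retrying message-3.
        Thread retrier = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    errorPayloads.add("message-3");
                    Thread.sleep(20);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        retrier.start();
        boolean reached = awaitUntil(Duration.ofSeconds(30), Duration.ofMillis(10),
                () -> errorPayloads.size() > 3);
        retrier.join();
        System.out.println("more than 3 retries observed: " + reached);
    }
}
```

Waiting for more than 3 entries, and not just 1, is what distinguishes repeated retries of the same message from a single failure.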
Interaction with Other Parts of the System
BaseKafkaTestSupport: This class extends `BaseKafkaTestSupport`, which presumably provides Kafka test utilities such as default properties, Kafka admin clients, and Camel context extensions.
MockConsumerInterceptor: Used as a Kafka consumer interceptor to capture consumed records for testing purposes.
Apache Camel Kafka Component: The core component being tested, specifically its behavior in manual commit mode with batch consumption and error handling.
Camel Context and Route Controller: The test programmatically stops and starts the Camel route to control when messages are consumed.
Kafka Broker: The test interacts with a Kafka broker instance to produce and consume messages on a test topic.
Usage Example
This test class is primarily for automated testing and is run as part of the integration test suite. It can be executed using standard JUnit 5 test runners or build tools like Maven or Gradle.
Mermaid Class Diagram
classDiagram
class KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT {
-static final String ROUTE_ID
-static final String TOPIC
-static final Logger LOG
-List~String~ errorPayloads
-MockEndpoint to
-KafkaProducer~String, String~ producer
+void before()
+void after()
+void kafkaBreakOnFirstErrorBasicCapability()
+RouteBuilder createRouteBuilder()
-void publishMessagesToKafka()
-void ifIsPayloadWithErrorThrowException(Exchange)
}
KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT --|> BaseKafkaTestSupport
Summary
`KafkaBreakOnFirstErrorWithBatchUsingSyncCommitManagerIT` is a specialized integration test validating that Apache Camel's Kafka component correctly handles batch consumption with manual offset commits, halting and retrying on the first error within a batch. It ensures offsets are committed only for successfully processed messages and that error messages are retried indefinitely when [breakOnFirstError](/projects/289/68714) is enabled. The test uses a combination of Camel routes, Kafka producer/consumer settings, and JUnit/Awaitility assertions to simulate and verify this behavior.