KafkaBreakOnFirstErrorSeekIssueIT.java
Overview
`KafkaBreakOnFirstErrorSeekIssueIT.java` is an integration test class designed to verify the behavior of the Apache Camel Kafka component's **breakOnFirstError** functionality. This test specifically addresses a previously reported issue (CAMEL-19894) where offsets were not correctly committed during batch processing when using the synchronous commit manager in Kafka consumers.
The test mimics a real-world scenario where a Kafka consumer configured with multiple partitions and multiple consumer threads breaks processing on the first error encountered in a batch. It ensures that message offsets are correctly managed, and that messages causing exceptions are retried without blocking processing of other partitions.
This test is essential to validate the robustness of the Camel Kafka component when error handling and manual offset commits are used together under concurrent consumption scenarios.
Detailed Class and Method Descriptions
Class: KafkaBreakOnFirstErrorSeekIssueIT
Extends: `BaseKafkaTestSupport`
Package: `org.apache.camel.component.kafka.integration`
This class contains the integration test that validates the fix for the CAMEL-19894 issue.
Constants
| Name | Type | Description |
|---|---|---|
| `ROUTE_ID` | String | Unique route identifier combining a fixed prefix with a random UUID. Used as Kafka consumer group ID. |
| | String | Unique Kafka topic name combining a fixed prefix with a random UUID. |
| `PARTITION_COUNT` | int | Number of partitions created for the Kafka topic (set to 2). |
| `CONSUMERS_COUNT` | int | Number of Kafka consumers in the Camel route (set to 4, more than partitions to test concurrency). |
Properties
| Name | Type | Description |
|---|---|---|
| `errorPayloads` | `CopyOnWriteArrayList` | Thread-safe list to track payloads that triggered errors during message processing. |
| `to` | `MockEndpoint` | Camel mock endpoint injected for asserting test expectations on consumed messages. |
| `producer` | `KafkaProducer<String, String>` | Kafka producer used to publish test messages to the topic. |
Lifecycle Methods
@BeforeAll static void setupTopic()
Purpose: Initializes Kafka admin client before all tests run.
Details: Creates an admin client instance to manage Kafka topics.
Usage: Ensures Kafka topics can be created/deleted during tests.
@BeforeEach void init()
Purpose: Runs before each test to set up Kafka producer, clear interceptors, and create the test topic.
Implementation:
Creates Kafka producer with default properties.
Clears any previous records captured by `MockConsumerInterceptor`.
Creates a new topic with `PARTITION_COUNT` partitions.
Waits until partitions are fully created using Awaitility for reliable test setup.
Note: Topic creation moved here from `@BeforeAll` to avoid stale state between test runs.
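The wait-for-partitions step can be pictured with a plain-JDK polling loop. This is only a stand-in for illustration: it is not the Awaitility API, and `PollingWait`, `waitUntil`, and the simulated partition counter are hypothetical names that do not appear in the test.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper: a plain-JDK stand-in for the kind of wait Awaitility provides.
final class PollingWait {

    private PollingWait() {
    }

    // Re-checks the condition every pollInterval until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated partition count that grows on each check, standing in for
        // a describe-topic call against a real broker.
        AtomicInteger visiblePartitions = new AtomicInteger(0);
        int expectedPartitions = 2; // PARTITION_COUNT in the test

        boolean ready = waitUntil(
                () -> visiblePartitions.incrementAndGet() >= expectedPartitions,
                Duration.ofSeconds(5),
                Duration.ofMillis(10));

        System.out.println("partitions ready: " + ready);
    }
}
```

Polling with a deadline, rather than sleeping for a fixed time, keeps setup fast when the broker responds quickly and still fails cleanly when it does not.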
@AfterEach void after()
Purpose: Cleans up resources after each test.
Implementation:
Closes Kafka producer if initialized.
Deletes the test topic from Kafka to avoid interference with other tests.
Test Method
@Test void testCamel19894TestFix() throws Exception
Purpose: The core test validating break-on-first-error behavior.
Test Logic:
Configures the mock endpoint to expect exactly 5 messages with specific payloads.
Stops the Kafka route temporarily.
Verifies the topic has the expected partition count.
Publishes messages to the Kafka topic across partitions.
Starts the route to begin consuming.
Waits until error-triggering payloads ("3" and "8") are detected in the `errorPayloads` list.
Asserts that the mock endpoint received the expected messages, confirming correct processing and error handling.
Expected Behavior:
Messages "3" (partition 0) and "8" (partition 1) throw exceptions and are retried indefinitely.
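Messages preceding the failing record in each partition ("1", "2" and "5", "6", "7") are processed normally, which accounts for the 5 messages expected at the mock endpoint.
The consumer stops processing more records from a partition once an error is encountered due to `breakOnFirstError=true`.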
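The expected behavior can be reproduced with a small in-memory model. The sketch below is a simplification under stated assumptions, not Camel's consumer code: `BreakOnFirstErrorModel`, `poll`, and `nextOffset` are hypothetical names, and each partition is modeled as a plain list of payloads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of break-on-first-error for one partition; not Camel's implementation.
final class BreakOnFirstErrorModel {

    private final List<String> records; // payloads of one partition, in offset order
    private final Set<String> failing;  // payloads whose processing throws
    private int nextOffset = 0;         // offset from which the next poll resumes

    final List<String> delivered = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    BreakOnFirstErrorModel(List<String> records, Set<String> failing) {
        this.records = records;
        this.failing = failing;
    }

    // Processes one poll batch starting at nextOffset and stops at the first failure.
    void poll(int maxPollRecords) {
        int end = Math.min(records.size(), nextOffset + maxPollRecords);
        for (int offset = nextOffset; offset < end; offset++) {
            String body = records.get(offset);
            if (failing.contains(body)) {
                errors.add(body);
                return; // nextOffset stays on the failed record, so it is retried
            }
            delivered.add(body);
            nextOffset = offset + 1;
        }
    }

    public static void main(String[] args) {
        Set<String> failing = Set.of("3", "8");
        BreakOnFirstErrorModel p0 =
                new BreakOnFirstErrorModel(List.of("1", "2", "3", "4"), failing);
        BreakOnFirstErrorModel p1 =
                new BreakOnFirstErrorModel(List.of("5", "6", "7", "8", "9", "10", "11"), failing);

        // Three polls per partition: the failing record is hit again on every poll.
        for (int i = 0; i < 3; i++) {
            p0.poll(8);
            p1.poll(8);
        }

        System.out.println("partition 0 delivered " + p0.delivered + ", errors " + p0.errors);
        System.out.println("partition 1 delivered " + p1.delivered + ", errors " + p1.errors);
    }
}
```

In this model the two partitions deliver "1", "2" and "5", "6", "7" respectively, while "3" and "8" are recorded as errors once per poll. Records behind a failing record are never reached.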
Usage: The method is run by the JUnit 5 test runner together with the lifecycle methods above. Calling `testCamel19894TestFix()` directly on a new instance would bypass the setup performed in `setupTopic()` and `init()`.
Overridden Method
RouteBuilder createRouteBuilder()
Purpose: Configures the Camel route used in the test.
Route Details:
From Kafka topic with:
Group ID set to `ROUTE_ID`.
`autoOffsetReset=earliest`.
`autoCommitEnable=false` and manual commits enabled.
`breakOnFirstError=true` to stop on first error in batch.
`maxPollRecords=8`.
`consumersCount=4` (more consumers than partitions).
Heartbeat and metadata refresh intervals set to 1000 ms.
String deserializers for key and value.
Uses synchronous manual commit factory.
Interceptor class: `MockConsumerInterceptor`.
Route is not auto-started.
Processes each message with:
Logging of consumption.
Logic to throw an exception when the message body is "3" or "8".
Sends messages to a mock endpoint (`mock:result`).
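As a rough illustration of how these options might be combined into a Camel endpoint URI, the sketch below assembles the query string with plain JDK classes. The option names `groupId` and `allowManualCommit` are assumptions about how the group ID and manual commits are expressed, since the description above does not spell them out. The topic and group values are placeholders. The heartbeat, metadata, deserializer, commit factory, and interceptor settings are left out because their exact option names are not given here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that assembles a Kafka endpoint URI from the options listed above.
final class KafkaEndpointUriSketch {

    private KafkaEndpointUriSketch() {
    }

    static String buildUri(String topic, String groupId) {
        // LinkedHashMap keeps the options in insertion order for a readable URI.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", groupId);            // assumed option name for the group ID
        options.put("autoOffsetReset", "earliest");
        options.put("autoCommitEnable", "false");
        options.put("allowManualCommit", "true");   // assumed option name for manual commits
        options.put("breakOnFirstError", "true");
        options.put("maxPollRecords", "8");
        options.put("consumersCount", "4");

        StringJoiner query = new StringJoiner("&");
        options.forEach((name, value) -> query.add(name + "=" + value));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // Placeholder topic and group names; the test derives both from random UUIDs.
        System.out.println(buildUri("example-topic", "example-route-id"));
    }
}
```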
Private Helper Methods
void ifIsFifthRecordThrowException(Exchange e)
Purpose: Throws an exception for specific message bodies to simulate processing failure.
Parameters:
Exchange e: Camel exchange containing the message.
Behavior:
If the message body is `"3"` or `"8"`, adds it to `errorPayloads` and throws `RuntimeException`.
Effect: Causes the Camel route to break on the first error in the batch for these messages, triggering retries.
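A minimal sketch of this helper's logic is shown below. It assumes the body has already been extracted as a `String`, whereas the real helper receives a Camel `Exchange`. The class name, method name, and exception message are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of the error-triggering logic, with the message body passed in directly.
final class ErrorTriggerSketch {

    // Thread-safe because several consumer threads may record failures concurrently.
    final List<String> errorPayloads = new CopyOnWriteArrayList<>();

    // Records the payload and throws when the body is one of the two failing values.
    void throwIfFailingPayload(String body) {
        if ("3".equals(body) || "8".equals(body)) {
            errorPayloads.add(body);
            throw new RuntimeException("simulated failure for payload " + body);
        }
    }

    public static void main(String[] args) {
        ErrorTriggerSketch sketch = new ErrorTriggerSketch();
        for (String body : List.of("1", "2", "3")) {
            try {
                sketch.throwIfFailingPayload(body);
                System.out.println("processed " + body);
            } catch (RuntimeException e) {
                System.out.println("failed: " + e.getMessage());
            }
        }
        System.out.println("errorPayloads = " + sketch.errorPayloads);
    }
}
```

Recording the payload before throwing is what lets the test wait on `errorPayloads` to confirm that both failing records were actually reached.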
void publishMessagesToKafka()
Purpose: Publishes a predefined set of test messages into Kafka partitions.
Implementation:
Partition 0: Publishes messages `"1"`, `"2"`, `"3"`, `"4"`.
Partition 1: Publishes messages `"5"`, `"6"`, `"7"`, `"8"`, `"9"`, `"10"`, `"11"`.
Messages are sent with explicit partition keys ("k0" and "k1").
Usage: Used in the test method to inject messages to be consumed.
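The message layout can be written down as plain data. The sketch below assumes that key "k0" goes with partition 0 and "k1" with partition 1, which the description implies but does not state. `PlannedRecord` is a hypothetical holder for the partition, key, and value that each Kafka `ProducerRecord` would carry.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-data sketch of the records the helper publishes; no Kafka client involved.
final class TestMessagesSketch {

    // Hypothetical holder for the fields of one record to publish.
    static final class PlannedRecord {
        final int partition;
        final String key;
        final String value;

        PlannedRecord(int partition, String key, String value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "p" + partition + "/" + key + "/" + value;
        }
    }

    private TestMessagesSketch() {
    }

    static List<PlannedRecord> plannedRecords() {
        List<PlannedRecord> planned = new ArrayList<>();
        // Partition 0 carries the first failing payload ("3").
        for (String value : List.of("1", "2", "3", "4")) {
            planned.add(new PlannedRecord(0, "k0", value)); // key assignment is an assumption
        }
        // Partition 1 carries the second failing payload ("8").
        for (String value : List.of("5", "6", "7", "8", "9", "10", "11")) {
            planned.add(new PlannedRecord(1, "k1", value)); // key assignment is an assumption
        }
        return planned;
    }

    public static void main(String[] args) {
        plannedRecords().forEach(System.out::println);
    }
}
```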
Important Implementation Details and Algorithms
Break on First Error: The route uses the `breakOnFirstError=true` option, which instructs the Kafka consumer to stop processing further records within the current poll batch as soon as an error occurs. This protects against offset commits for partially processed batches, ensuring message processing consistency.
Manual Offset Commit: The route disables auto commit (`autoCommitEnable=false`) and uses manual commit with a synchronous commit manager to control when offsets are committed, which is crucial for testing correct offset management under errors.
Concurrency and Partitioning: Multiple consumers (4) are configured to consume from 2 partitions to test concurrent consumption and ensure that errors in one partition do not block processing of other partitions.
Error Triggering: The test deliberately triggers exceptions on specific messages ("3" and "8") to simulate failure scenarios and verify retry and offset management logic.
Use of Awaitility: The test uses Awaitility to wait asynchronously for conditions (topic creation completion, error payloads detection) to ensure reliable test execution without race conditions.
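To make the concurrency setup concrete: within one consumer group a partition is assigned to at most one consumer, so with 4 consumers and 2 partitions two consumers receive nothing. The sketch below uses a simple round-robin assignment for illustration only. Kafka's real assignors are more involved, and `AssignmentSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin illustration of partition assignment; not Kafka's actual assignor logic.
final class AssignmentSketch {

    private AssignmentSketch() {
    }

    // Returns, for each consumer index, the list of partitions assigned to it.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int consumer = 0; consumer < consumerCount; consumer++) {
            assignment.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            assignment.get(partition % consumerCount).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // PARTITION_COUNT = 2 and CONSUMERS_COUNT = 4, as in the test.
        List<List<Integer>> assignment = assign(2, 4);
        for (int consumer = 0; consumer < assignment.size(); consumer++) {
            System.out.println("consumer " + consumer + " -> partitions " + assignment.get(consumer));
        }
    }
}
```

Because each failing record sits on its own partition, the two active consumers hit their errors independently of each other.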
Interaction with Other System Components
Kafka Cluster: The test interacts directly with a Kafka cluster to create topics, produce messages, and consume messages using the Camel Kafka component.
Camel Kafka Component: This test validates the behavior of the Camel Kafka consumer, particularly focusing on the break-on-first-error feature and manual offset commit mechanisms.
MockConsumerInterceptor: Used as an interceptor to capture consumed records for verification/debugging purposes.
Camel Context and Route Controller: The test starts and stops the Kafka route programmatically to control test flow.
KafkaAdminClient: Used to create and delete Kafka topics dynamically during test setup and teardown.
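Usage Example
The calls below only illustrate the order in which JUnit 5 invokes the lifecycle and test methods. In practice the class is run by the test runner rather than instantiated by hand.
KafkaBreakOnFirstErrorSeekIssueIT.setupTopic(); // @BeforeAll (static): create the Kafka admin client
KafkaBreakOnFirstErrorSeekIssueIT test = new KafkaBreakOnFirstErrorSeekIssueIT();
test.init(); // @BeforeEach: create the producer and the topic
test.testCamel19894TestFix(); // @Test: run the test
test.after(); // @AfterEach: close the producer and delete the topic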
Mermaid Class Diagram
classDiagram
class KafkaBreakOnFirstErrorSeekIssueIT {
-List~String~ errorPayloads
-MockEndpoint to
-KafkaProducer~String,String~ producer
+static void setupTopic()
+void init()
+void after()
+void testCamel19894TestFix()
+RouteBuilder createRouteBuilder()
-void ifIsFifthRecordThrowException(Exchange)
-void publishMessagesToKafka()
}
KafkaBreakOnFirstErrorSeekIssueIT --|> BaseKafkaTestSupport
Summary
The `KafkaBreakOnFirstErrorSeekIssueIT` integration test class is a critical test validating the Camel Kafka component’s handling of batch processing errors with manual offset commits. It reproduces a previously reported bug, confirming that the consumer properly stops on the first error in a batch and retries the failed messages without committing offsets prematurely. The test involves multiple partitions and consumers, message production, and precise control over the route lifecycle, providing a robust verification of Kafka consumer error semantics in Camel.
This test helps maintain the reliability and correctness of message processing workflows in real-world fault scenarios within the Apache Camel Kafka integration.