KafkaBreakOnFirstErrorSeekIssueIT.java


Overview

`KafkaBreakOnFirstErrorSeekIssueIT.java` is an integration test class designed to verify the behavior of the Apache Camel Kafka component's **breakOnFirstError** functionality. This test specifically addresses a previously reported issue (CAMEL-19894) where offsets were not correctly committed during batch processing when using the synchronous commit manager in Kafka consumers.

The test mimics a real-world scenario in which a Camel route consumes a multi-partition Kafka topic with several consumer threads and stops processing a batch at the first error it encounters. It verifies that message offsets are managed correctly and that a message that causes an exception is retried without blocking the processing of other partitions.

This test is essential to validate the robustness of the Camel Kafka component when error handling and manual offset commits are used together under concurrent consumption scenarios.
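The retry behavior described above can be illustrated with a toy model that uses no Kafka or Camel classes. This is only a sketch under stated assumptions: the class `BreakOnFirstErrorSketch`, the method `pollAndProcess`, and the failing payload `"5"` are hypothetical names chosen for illustration, and the loop mimics the general idea (commit after each success, re-read from the failed offset) rather than Camel's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition consumed with break-on-first-error semantics. */
final class BreakOnFirstErrorSketch {

    /** Offset of the next record to read; doubles as the committed position. */
    private int committedOffset = 0;
    private final List<String> partition;
    final List<String> delivered = new ArrayList<>();
    final List<String> errorPayloads = new ArrayList<>();

    BreakOnFirstErrorSketch(List<String> partition) {
        this.partition = partition;
    }

    /** Reads up to maxRecords from the committed offset and stops at the first failure. */
    void pollAndProcess(int maxRecords) {
        int end = Math.min(partition.size(), committedOffset + maxRecords);
        for (int offset = committedOffset; offset < end; offset++) {
            String payload = partition.get(offset);
            if ("5".equals(payload)) { // hypothetical failing payload
                // Leave the committed offset on the failing record so that
                // the next poll starts there and redelivers it.
                errorPayloads.add(payload);
                return;
            }
            delivered.add(payload);
            committedOffset = offset + 1; // commit only after success
        }
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        BreakOnFirstErrorSketch consumer =
                new BreakOnFirstErrorSketch(List.of("1", "2", "3", "4", "5", "6", "7"));
        for (int i = 0; i < 3; i++) {
            consumer.pollAndProcess(8);
        }
        System.out.println("delivered=" + consumer.delivered);
        System.out.println("errors=" + consumer.errorPayloads);
        System.out.println("committedOffset=" + consumer.committedOffset());
    }
}
```

In this model the records before the failing one are delivered once, the failing record is retried on every poll, and the records behind it on the same partition are never reached. A second instance of the class, standing in for another partition, would be unaffected.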


Detailed Class and Method Descriptions

Class: KafkaBreakOnFirstErrorSeekIssueIT

Extends: `BaseKafkaTestSupport`

Package: `org.apache.camel.component.kafka.integration`

This class contains the integration test that validates the fix for the CAMEL-19894 issue.


Constants

| Name | Type | Description |
|------|------|-------------|
| `ROUTE_ID` | `String` | Unique route identifier combining a fixed prefix with a random UUID. Used as the Kafka consumer group ID. |
| `TOPIC` | `String` | Unique Kafka topic name combining a fixed prefix with a random UUID. |
| `PARTITION_COUNT` | `int` | Number of partitions created for the Kafka topic (set to 2). |
| `CONSUMERS_COUNT` | `int` | Number of Kafka consumers in the Camel route (set to 4, more than the number of partitions, to test concurrency). |
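The constants above follow a common pattern for isolating integration test runs. The sketch below shows one way to build such names and to reason about the consumer-to-partition ratio. It assumes `java.util.UUID` as the UUID source, and the class name `TestNamingSketch`, the prefix `example-prefix-`, and the helper `idleConsumers` are hypothetical names for illustration, not values taken from the test.

```java
import java.util.UUID;

/** Sketch of collision-resistant test names and the partition/consumer arithmetic. */
final class TestNamingSketch {

    // Hypothetical prefix; the real test's prefix is not reproduced here.
    static final String PREFIX = "example-prefix-";

    /** Appends a random UUID so that separate runs do not collide in practice. */
    static String uniqueName(String prefix) {
        return prefix + UUID.randomUUID();
    }

    /**
     * Within one consumer group, a partition is assigned to at most one consumer,
     * so consumers beyond the partition count receive no assignment.
     */
    static int idleConsumers(int partitionCount, int consumersCount) {
        return Math.max(0, consumersCount - partitionCount);
    }

    public static void main(String[] args) {
        System.out.println("topic   = " + uniqueName(PREFIX));
        System.out.println("groupId = " + uniqueName(PREFIX));
        System.out.println("idle consumers = " + idleConsumers(2, 4));
    }
}
```

With 2 partitions and 4 consumers, two consumers in the group stay idle, which is the over-provisioned situation the constants set up.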


Properties

| Name | Type | Description |
|------|------|-------------|
| `errorPayloads` | `CopyOnWriteArrayList` | Thread-safe list to track payloads that triggered errors during message processing. |
| `to` | `MockEndpoint` | Camel mock endpoint injected for asserting test expectations on consumed messages. |
| `producer` | `KafkaProducer` | Kafka producer used to publish test messages to the topic. |
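Because several consumer threads may record failures at the same time, the error list needs to be thread-safe. The following sketch shows why `CopyOnWriteArrayList` fits that role: concurrent `add` calls from a thread pool are all retained without external locking. The class `ErrorPayloadTrackerSketch` and the method `recordConcurrently` are hypothetical names, and the worker threads merely stand in for consumer threads.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: recording failing payloads from several threads at once. */
final class ErrorPayloadTrackerSketch {

    /** Each worker appends perThread entries; returns the shared list afterwards. */
    static List<String> recordConcurrently(int threads, int perThread) {
        List<String> errorPayloads = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    errorPayloads.add("5"); // hypothetical failing payload
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return errorPayloads;
    }

    public static void main(String[] args) {
        // 4 workers x 100 entries: no additions are lost.
        System.out.println(recordConcurrently(4, 100).size());
    }
}
```

`CopyOnWriteArrayList` copies its backing array on every write, which is costly for large lists but reasonable for a test that records only a handful of errors.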


Lifecycle Methods

@BeforeAll static void setupTopic()


@BeforeEach void init()


@AfterEach void after()


Test Method

@Test void testCamel19894TestFix() throws Exception


Overridden Method

RouteBuilder createRouteBuilder()


Private Helper Methods

void ifIsFifthRecordThrowException(Exchange e)


void publishMessagesToKafka()


Important Implementation Details and Algorithms


Interaction with Other System Components


Usage Example

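// JUnit 5 normally drives this lifecycle; the manual calls only illustrate the order.
KafkaBreakOnFirstErrorSeekIssueIT.setupTopic();  // @BeforeAll: static, so called on the class
KafkaBreakOnFirstErrorSeekIssueIT test = new KafkaBreakOnFirstErrorSeekIssueIT();
test.init();                   // @BeforeEach: initialize producer and topic
test.testCamel19894TestFix();  // @Test: run the test
test.after();                  // @AfterEach: cleanup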

Mermaid Class Diagram

classDiagram
    class KafkaBreakOnFirstErrorSeekIssueIT {
        -List~String~ errorPayloads
        -MockEndpoint to
        -KafkaProducer~String,String~ producer
        +static void setupTopic()
        +void init()
        +void after()
        +void testCamel19894TestFix()
        +RouteBuilder createRouteBuilder()
        -void ifIsFifthRecordThrowException(Exchange)
        -void publishMessagesToKafka()
    }
    KafkaBreakOnFirstErrorSeekIssueIT --|> BaseKafkaTestSupport

Summary

The `KafkaBreakOnFirstErrorSeekIssueIT` integration test class validates the Camel Kafka component’s handling of batch processing errors with manual offset commits. It reproduces a previously reported bug (CAMEL-19894) and confirms that the consumer stops on the first error in a batch and retries the failed message without committing offsets prematurely. The test combines multiple partitions and consumers, message production, and explicit control over the route lifecycle to verify Kafka consumer error semantics in Camel.

This test helps maintain the reliability and correctness of message processing workflows in real-world fault scenarios within the Apache Camel Kafka integration.