KafkaBreakOnFirstErrorReplayOldMessagesIT.java
Overview
`KafkaBreakOnFirstErrorReplayOldMessagesIT` is an integration test class designed to verify the behavior of the Apache Camel Kafka component when the [breakOnFirstError](/projects/289/68714) feature is enabled. This test specifically addresses a known issue (CAMEL-20044) related to incorrect handling of Kafka offset commits that could lead to replaying old messages erroneously.
The class mimics a real-world scenario where multiple Kafka messages are consumed with some intentionally causing errors. It validates that the route breaks on the first error and commits offsets correctly to prevent message replay. This ensures robustness when consuming Kafka messages in cases where some messages cause non-retryable exceptions.
The test performs the following high-level operations:
Create a Kafka topic with multiple partitions.
Produce a sequence of messages, including some that trigger errors.
Define a Camel route consuming from Kafka with manual offset commit and breakOnFirstError enabled.
Validate that the route stops processing on the first error and commits offsets correctly.
Confirm that no messages are replayed unnecessarily.
Classes and Methods
Class: KafkaBreakOnFirstErrorReplayOldMessagesIT
**Package:** `org.apache.camel.component.kafka.integration`
**Extends:** `BaseKafkaTestSupport`
This class contains JUnit 5 integration tests and related setup/teardown methods to validate Kafka consumer behavior under error conditions.
Fields
| Field Name | Type | Description |
|---|---|---|
| `ROUTE_ID` | `String` | The unique route ID used in the Camel route (`"breakOnFirstError-20044"`). |
| `TOPIC` | `String` | Kafka topic name used in tests (`"breakOnFirstError-20044"`). |
| (name not given) | `Logger` | Logger instance for debug and error messages. |
| `to` | `MockEndpoint` | Injected Camel mock endpoint to capture and assert messages received by the route. |
| `producer` | `KafkaProducer<String, String>` | Kafka producer used to send test messages into the topic. |
Annotations
`@Tags` and `@Tag("breakOnFirstError")`: Categorize the test.
`@DisabledIfSystemProperty`: Disables the test in certain CI environments due to unreliability.
`@EnabledOnOs`: Enables the test only on specific operating systems and architectures for stability.
Lifecycle Methods
@BeforeAll
`public static void setupTopic()`
Ensures the Kafka topic is created before any tests run.
Creates a topic with 3 partitions and a replication factor of 1.
Uses Kafka Admin Client utilities for topic management.
@BeforeEach
`public void init()`
Initializes the Kafka producer with default properties before each test.
Clears any previously captured records from the mock consumer interceptor.
@AfterEach
`public void after()`
Closes the Kafka producer after each test.
Deletes the test Kafka topic to clean up state.
Test Method
@Test
`void testCamel20044TestFix()`
Resets the mock endpoint and sets expectations on messages to be received.
Stops the route before publishing messages to Kafka.
Publishes a list of 13 messages, some of which contain the string `"ERROR"` to trigger exceptions.
Starts the route to begin consuming messages.
Uses Awaitility to poll until the assertions are satisfied, for up to 10 seconds.
Asserts that all expected messages (including those triggering errors) are received exactly once without replay.
**Expected Messages:** `"1", "2", "3", "4", "5", "ERROR", "6", "7", "ERROR", "8", "9", "10", "11"`
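The bounded wait in the test can be pictured as a poll-until-true loop with a deadline. The sketch below is a dependency-free stand-in for that idea, not Awaitility itself; `AwaitSketch`, `awaitUntil`, and the poll interval are hypothetical names and values chosen for illustration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a bounded wait; the real test uses Awaitility instead.
final class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition was met in time, false otherwise.
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true about 200 ms after start; allow up to 10 s, as in the test.
        boolean met = awaitUntil(() -> System.currentTimeMillis() - start >= 200, 10_000, 50);
        System.out.println("condition met: " + met);
    }
}
```

In the test, the condition being polled is that the mock endpoint's expectations are satisfied, so a slow consumer delays the assertion rather than failing it immediately.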
Route Definition
createRouteBuilder()
Returns an anonymous `RouteBuilder` that defines the Kafka consumer route with the following characteristics:
Kafka URI Parameters:
`groupId`: Set to `ROUTE_ID`.
`autoOffsetReset`: `earliest` (consume from the beginning if no committed offset exists).
`autoCommitEnable`: `false` (manual offset commit).
`allowManualCommit`: `true` (enables manual commit).
`breakOnFirstError`: `true` (stop processing on first error).
`maxPollRecords`: `1` (consume one record per poll).
`consumersCount`: `3` (number of consumer threads).
`pollTimeoutMs`: `1000` (poll timeout in ms).
`keyDeserializer` and `valueDeserializer`: Use String deserializers.
`interceptorClasses`: Uses `MockConsumerInterceptor` for monitoring.
Error Handling:
On `RuntimeException` (triggered by messages with an `"ERROR"` payload), the exception is not handled (`handled(false)`).
Before propagating the exception, the offset is committed manually via `doCommitOffset(exchange)` to avoid replay.
Processing Steps:
Logs consumed message details.
Sends the message to the `mock:result` endpoint (`to`).
Checks if the payload is `"ERROR"` and throws a `RuntimeException` to simulate failure.
Commits the Kafka offset manually after processing.
The route is configured with `autoStartup(false)` to allow manual control in tests.
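To make the option list concrete, the following sketch assembles an endpoint URI of the form `kafka:<topic>?<options>` from the parameters described in this section. It is plain string building, not the test's actual code: `KafkaUriSketch` and `buildUri` are hypothetical names, the deserializer value assumes the standard Kafka `StringDeserializer` class, and `interceptorClasses` is left out because the fully qualified interceptor class name is not given here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: assembles a Camel Kafka endpoint URI from the options listed above.
final class KafkaUriSketch {
    static final String ROUTE_ID = "breakOnFirstError-20044";
    static final String TOPIC = "breakOnFirstError-20044";
    // Assumption: the "String deserializers" are Kafka's standard StringDeserializer.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static String buildUri() {
        // LinkedHashMap keeps the options in insertion order for a stable URI.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", ROUTE_ID);
        options.put("autoOffsetReset", "earliest");
        options.put("autoCommitEnable", "false");
        options.put("allowManualCommit", "true");
        options.put("breakOnFirstError", "true");
        options.put("maxPollRecords", "1");
        options.put("consumersCount", "3");
        options.put("pollTimeoutMs", "1000");
        options.put("keyDeserializer", STRING_DESERIALIZER);
        options.put("valueDeserializer", STRING_DESERIALIZER);

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + TOPIC + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(buildUri());
    }
}
```

Keeping `autoCommitEnable=false` together with `allowManualCommit=true` is what hands control of the commit point to the route itself.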
Helper Methods
private void ifIsPayloadWithErrorThrowException(Exchange exchange)
Extracts the message body as a string from the Camel exchange.
Throws a `RuntimeException` with a fixed message if the payload equals `"ERROR"`.
This simulates a non-retryable error condition in the route.
**Parameters:**
`exchange`: The Camel `Exchange` containing the message.
**Throws:**
`RuntimeException` if the payload equals `"ERROR"`.
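The check itself is small. A minimal sketch, assuming the body has already been extracted as a `String` (the real helper reads it from the Camel `Exchange`); `PayloadCheckSketch`, `throwIfErrorPayload`, and the exception message are hypothetical:

```java
// Sketch only: the real helper takes a Camel Exchange; this stand-in takes the body directly.
final class PayloadCheckSketch {
    static final String ERROR_PAYLOAD = "ERROR";

    // Throws a RuntimeException when the payload is exactly "ERROR"; otherwise does nothing.
    static void throwIfErrorPayload(String body) {
        if (ERROR_PAYLOAD.equals(body)) {
            // Hypothetical message; the text of the real fixed message is not given here.
            throw new RuntimeException("simulated failure for payload " + body);
        }
    }

    public static void main(String[] args) {
        throwIfErrorPayload("1"); // passes silently
        try {
            throwIfErrorPayload("ERROR");
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Comparing with `ERROR_PAYLOAD.equals(body)` rather than `body.equals(...)` keeps the check safe for a null body.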
private void publishMessagesToKafka()
Publishes a fixed list of 13 messages to the Kafka topic.
Messages include normal string values and `"ERROR"` to trigger exceptions during consumption.
Uses the Kafka producer to send messages asynchronously.
**Messages Published:** `"1", "2", "3", "4", "5", "ERROR", "6", "7", "ERROR", "8", "9", "10", "11"`
private void doCommitOffset(Exchange exchange)
Extracts the `KafkaManualCommit` object from the exchange header `KafkaConstants.MANUAL_COMMIT`.
If present, calls its `commit()` method to manually commit the Kafka offset.
Logs debug information before committing.
Logs an error if the manual commit object is missing.
**Parameters:**
exchange: The Camel exchange containing the Kafka message and commit header.
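The commit logic described above follows a look-up, check, commit pattern. The sketch below mirrors it with stand-in types so that it runs without Camel on the classpath: a `Map` plays the role of the exchange headers and the nested `ManualCommit` interface plays the role of `KafkaManualCommit`. Both stand-ins, the class name, and the boolean return value are hypothetical, and the header key string is an assumption about the value of `KafkaConstants.MANUAL_COMMIT`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Sketch only: stand-in types replace Camel's Exchange and KafkaManualCommit.
final class ManualCommitSketch {
    private static final Logger LOG = Logger.getLogger("ManualCommitSketch");

    // Assumed value of KafkaConstants.MANUAL_COMMIT.
    static final String MANUAL_COMMIT = "CamelKafkaManualCommit";

    // Hypothetical stand-in for KafkaManualCommit.
    interface ManualCommit {
        void commit();
    }

    // Commits if the header carries a commit handle; returns whether a commit happened.
    static boolean commitIfPresent(Map<String, Object> headers) {
        Object header = headers.get(MANUAL_COMMIT);
        if (header instanceof ManualCommit) {
            LOG.fine("Committing offset manually");
            ((ManualCommit) header).commit();
            return true;
        }
        LOG.severe("Manual commit header is missing; offset not committed");
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        ManualCommit handle = () -> System.out.println("offset committed");
        headers.put(MANUAL_COMMIT, handle);
        System.out.println("committed: " + commitIfPresent(headers));
    }
}
```

Logging the missing-header case instead of throwing keeps the commit helper from masking the original route failure when it is called from an exception handler.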
Important Implementation Details
Manual Offset Commit: The test enforces manual offset commits to precisely control when offsets are committed, crucial for testing offset commit correctness when errors occur.
Break on First Error: With `breakOnFirstError=true`, the consumer breaks out of the current poll when an exchange fails instead of continuing with the next record. Consumption then resumes from the committed position, which is why the expected messages still include the records that follow each `"ERROR"`.
Error Handling Strategy: The route uses `onException` with `handled(false)` so that exceptions propagate and trigger the break, but commits the offset before failing so that neither the failed record nor earlier records are replayed.
Multi-Threaded Consumption: The test uses 3 consumers (`consumersCount=3`) to simulate concurrent consumption and verify that the issue does not arise in parallel processing scenarios.
Topic Setup and Teardown: Topic lifecycle management ensures clean state for each test run.
MockConsumerInterceptor: Used to monitor Kafka consumer behavior and captured records, aiding test assertions.
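The interplay between breaking on error and committing first can be illustrated with a toy model. The sketch below is not Camel's implementation: it models a single partition as a list, a single consumer, one record per poll, and a committed offset that decides where the next poll starts. `BreakOnFirstErrorModel`, `consume`, and `maxDeliveries` are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy single-partition model of "break on error, resume from the committed offset".
final class BreakOnFirstErrorModel {

    // Returns the records delivered to the route, in order.
    // maxDeliveries caps the run so a never-committed poison record cannot loop forever.
    static List<String> consume(List<String> log, boolean commitOnError, int maxDeliveries) {
        List<String> delivered = new ArrayList<>();
        int committed = 0; // offset of the next record to poll
        while (committed < log.size() && delivered.size() < maxDeliveries) {
            String body = log.get(committed); // one record per poll
            delivered.add(body);              // corresponds to sending to mock:result
            boolean failed = "ERROR".equals(body);
            if (!failed || commitOnError) {
                committed++; // the manual commit moves past this record
            }
            // On failure without a commit, the next poll re-reads the same offset.
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "1", "2", "3", "4", "5", "ERROR", "6", "7", "ERROR", "8", "9", "10", "11");
        System.out.println(consume(log, true, 100)); // commit before failing
        System.out.println(consume(log, false, 8));  // no commit on failure
    }
}
```

With `commitOnError` set to `true`, the model delivers all 13 records exactly once, matching the expected messages of the test. With it set to `false`, the model keeps redelivering the first `"ERROR"` record until the cap is reached, which is the kind of replay the commit in the exception handler is meant to prevent.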
Interaction with Other System Components
Apache Camel Kafka Component: This test directly exercises the Kafka consumer functionality within Apache Camel.
Kafka Broker: The test requires a running Kafka cluster to produce and consume messages.
Camel Route Controller: The test controls the lifecycle of the route by stopping and starting it to simulate real-world consumption scenarios.
KafkaAdminUtil: Utility class used for Kafka topic creation and deletion.
Mock Endpoint (`mock:result`): Used for verifying message receipt and contents within the Camel context.
Awaitility: Used to wait, with a timeout, until the test's completion conditions are met.
Testing Framework: The class uses JUnit 5 annotations and lifecycle methods for integration testing.
Usage Example
This is an integration test class and is typically run as part of the project's test suite. It is not designed to be invoked directly by application code.
To run the test:
mvn test -Dtest=KafkaBreakOnFirstErrorReplayOldMessagesIT
or within an IDE, run the test class as a JUnit test.
Mermaid Class Diagram
classDiagram
class KafkaBreakOnFirstErrorReplayOldMessagesIT {
<<Test Class>>
+static String ROUTE_ID
+static String TOPIC
-KafkaProducer<String,String> producer
-MockEndpoint to
+static void setupTopic()
+void init()
+void after()
+void testCamel20044TestFix()
+RouteBuilder createRouteBuilder()
-void ifIsPayloadWithErrorThrowException(Exchange)
-void publishMessagesToKafka()
-void doCommitOffset(Exchange)
}
KafkaBreakOnFirstErrorReplayOldMessagesIT --|> BaseKafkaTestSupport
Summary
`KafkaBreakOnFirstErrorReplayOldMessagesIT` is a critical integration test ensuring the Kafka consumer route in Apache Camel correctly handles offset commits when the [breakOnFirstError](/projects/289/68714) option is enabled. It validates that messages causing errors do not cause unintended replay of previously processed messages, thus preserving data processing integrity. The class uses a combination of Kafka topic management, manual offset commits, and controlled error injection to simulate and verify correct behavior under failure scenarios in a multi-threaded Kafka consumer environment.