KafkaBatchingProcessingAutoCommitErrorHandlingIT.java
Overview
`KafkaBatchingProcessingAutoCommitErrorHandlingIT.java` is an integration test class designed to verify Kafka batch processing behavior when using **auto-commit** mode with **error handling** in Apache Camel routes. It specifically tests how the system behaves when part of a batch fails processing but the batch is auto-committed regardless.
This file extends a support class `BatchingProcessingITSupport` (not shown here) and focuses on:
Consuming Kafka messages in batches.
Handling exceptions in batch processing without stopping the offset commits.
Using Camel's `onException` with `continued(true)` to allow the route to continue processing subsequent batches even if some records fail.
Verifying that batch processing and error handling behave as expected during the test run.
The test simulates a failure on the 4th record in a batch, checking that other records process successfully and that the batch offset is still committed.
Class Details
KafkaBatchingProcessingAutoCommitErrorHandlingIT
**Package:** `org.apache.camel.component.kafka.integration.batching`
**Extends:** `BatchingProcessingITSupport`
This is the main integration test class verifying Kafka batch processing with auto-commit and error handling.
Fields
| Field Name | Type | Description |
|---|---|---|
| `LOG` | Logger | Logger instance for this class. |
| `TOPIC` | String | Kafka topic name used in tests. |
| `invalidExchangeFormat` | boolean (volatile) | Flag to track if the exchange body format is invalid during processing. |
Methods and Their Usage
after()
@AfterEach
public void after()
Purpose: Cleanup Kafka topic after each test run.
Behavior: Calls `cleanupKafka(TOPIC)` to clear test data and avoid cross-test contamination.
Parameters: None
Return: void
Usage: JUnit lifecycle method annotated with `@AfterEach`, automatically called after each test.
createRouteBuilder()
@Override
protected RouteBuilder createRouteBuilder()
Purpose: Defines the Camel route used for batch processing in this test.
Return: An instance of `RouteBuilder` configured with a Kafka batch consumer and error handling logic.
Details:
Consumes from Kafka topic `TOPIC` with parameters:
`groupId=KafkaBatchingProcessingIT`
`pollTimeoutMs=1000`
`batching=true` (enables batch consumption)
`maxPollRecords=10` (at most 10 records per poll, and therefore per batch)
`autoOffsetReset=earliest` (start from the earliest offset if no committed offset is found)
Configures exception handling for `IllegalArgumentException` with `.continued(true)`, meaning the exception is handled and routing continues as if it had not occurred, so subsequent batches are still processed and offsets are still auto-committed.
The route processes each batch by iterating over the list of `Exchange` objects representing each Kafka record.
On processing the 4th record in the batch, it deliberately throws an `IllegalArgumentException` to simulate a failure.
Successfully processed exchanges are sent to the mock endpoint `KafkaTestUtil.MOCK_RESULT` for assertions.
Usage: Called internally by the Camel test framework to define route behavior.
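The endpoint options listed above end up in a single Kafka endpoint URI. The following is a minimal, Camel-free sketch of how such a URI could be assembled from those options. The class `KafkaBatchEndpointUriSketch`, the method `buildUri`, and the topic name `example-topic` are hypothetical and exist only for illustration; the real route most likely writes the URI inline.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the test class: it only illustrates
// how the documented consumer options map onto an endpoint URI.
final class KafkaBatchEndpointUriSketch {

    // Builds "kafka:<topic>?key=value&..." from the options listed above.
    static String buildUri(String topic) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", "KafkaBatchingProcessingIT");
        options.put("pollTimeoutMs", "1000");
        options.put("batching", "true");
        options.put("maxPollRecords", "10");
        options.put("autoOffsetReset", "earliest");

        StringJoiner query = new StringJoiner("&");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // "example-topic" is a placeholder; the actual TOPIC value is not shown here.
        System.out.println(buildUri("example-topic"));
    }
}
```

In a real `RouteBuilder`, a string of this shape would be passed to `from(...)`.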
kafkaAutoCommit()
@Test
public void kafkaAutoCommit() throws Exception
Purpose: JUnit test method that initiates the Kafka batch processing test and verifies results.
Behavior:
Calls `kafkaManualCommitTest(TOPIC)` (inherited from `BatchingProcessingITSupport`) to produce test messages and start route processing.
Asserts that `invalidExchangeFormat` remains `false`, ensuring the received body is a list of `Exchange` objects as expected.
Parameters: None
Return: void
Usage: Executes the test scenario validating auto-commit behavior and error handling for batch processing.
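The `invalidExchangeFormat` assertion relies on the route flagging any batch body that is not a list of exchanges. Below is a rough sketch of such a check in plain Java. `RecordExchange` is a stand-in for Camel's `Exchange` (an assumption, so the sketch compiles without Camel), and the class and method names are hypothetical rather than taken from the test.

```java
import java.util.Arrays;
import java.util.List;

final class BatchBodyFormatCheckSketch {

    // Stand-in for org.apache.camel.Exchange (assumption; keeps the sketch Camel-free).
    static final class RecordExchange {
        final String body;

        RecordExchange(String body) {
            this.body = body;
        }
    }

    // Plays the role of the invalidExchangeFormat field. It is volatile,
    // presumably because the consumer thread writes it and the test thread reads it.
    private volatile boolean invalidExchangeFormat;

    // Flags the body unless it is a List whose elements are all RecordExchange.
    void inspect(Object body) {
        if (!(body instanceof List)) {
            invalidExchangeFormat = true;
            return;
        }
        for (Object element : (List<?>) body) {
            if (!(element instanceof RecordExchange)) {
                invalidExchangeFormat = true;
                return;
            }
        }
    }

    boolean isInvalidExchangeFormat() {
        return invalidExchangeFormat;
    }

    public static void main(String[] args) {
        BatchBodyFormatCheckSketch check = new BatchBodyFormatCheckSketch();
        check.inspect(Arrays.asList(new RecordExchange("a"), new RecordExchange("b")));
        System.out.println("invalid after well-formed batch: " + check.isInvalidExchangeFormat());
        check.inspect("not a list");
        System.out.println("invalid after malformed body: " + check.isInvalidExchangeFormat());
    }
}
```

Once set, the flag stays `true`, which matches an assertion that only runs at the end of the test.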
Important Implementation Details
Batch Processing with Auto-commit and Error Handling
Batching enabled: The Kafka consumer is configured with `batching=true` and `maxPollRecords=10`, so it consumes up to 10 messages per poll as a batch.
Exchange body: Each batch is delivered as a list of `Exchange` objects, each representing a Kafka record.
Error handling with continued processing:
The route uses `onException(IllegalArgumentException.class).continued(true)`, which means that if an `IllegalArgumentException` is thrown during batch processing, the exception is caught, logged, and the route continues without marking the batch as failed.
This simulates a scenario where partial batch failures do not stop the offset commit for the entire batch.
Simulated failure:
The code throws an `IllegalArgumentException` on the 4th record processed within a batch to test error handling.
Despite the exception, the batch is still auto-committed: auto-commit does not depend on the processing outcome, and `.continued(true)` keeps the route running.
Result endpoint:
Processed exchanges are sent to the mock endpoint `KafkaTestUtil.MOCK_RESULT` for assertion or verification during the test.
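The control flow described above can be approximated without Kafka or Camel. The sketch below is a plain-Java simulation, not the test's actual code: the `try`/`catch` stands in for `onException(...).continued(true)`, the `committedBatches` counter stands in for auto-commit, and all names are hypothetical. In this simulation the exception ends the loop over the current batch, so records after the 4th in that batch are not visited.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ContinuedBatchSimulation {

    final List<String> processed = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
    int committedBatches = 0;

    // Processes one batch and throws on the 4th record, mirroring the simulated failure.
    void processBatch(List<String> batch) {
        int position = 0;
        for (String message : batch) {
            position++;
            if (position == 4) {
                throw new IllegalArgumentException("Failed to process record " + message);
            }
            processed.add(message);
        }
    }

    // Stands in for continued(true) plus auto-commit: the failure is recorded,
    // and the batch is counted as committed whether or not it failed.
    void consume(List<String> batch) {
        try {
            processBatch(batch);
        } catch (IllegalArgumentException e) {
            errors.add(e.getMessage());
        }
        committedBatches++;
    }

    public static void main(String[] args) {
        ContinuedBatchSimulation sim = new ContinuedBatchSimulation();
        sim.consume(Arrays.asList("r1", "r2", "r3", "r4", "r5"));
        sim.consume(Arrays.asList("r6", "r7"));
        System.out.println("processed=" + sim.processed);
        System.out.println("errors=" + sim.errors);
        System.out.println("committedBatches=" + sim.committedBatches);
    }
}
```

The point of the simulation is that the second batch is still consumed and both batches count as committed, even though the first one failed partway through.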
Interaction with Other Parts of the System
Extends `BatchingProcessingITSupport`: This base class likely provides common Kafka setup, message production, and utility methods such as `kafkaManualCommitTest` and `cleanupKafka`.
Uses `KafkaTestUtil`: Provides static utilities such as the `MOCK_RESULT` endpoint for validating test outcomes.
Logging: Uses an SLF4J `Logger` for logging processing information and exception details.
Camel Kafka component: Leverages the Apache Camel Kafka component to configure and consume Kafka topics with batch processing and offset commit semantics.
JUnit 5: Uses JUnit 5 annotations and assertions for integration testing.
Usage Example
This class is primarily for internal automated testing and is not invoked directly by users. When running the integration tests, it:
Creates Kafka topics and produces messages using inherited utilities.
Starts a Camel route consuming messages in batches from Kafka.
Simulates a failure on the 4th message in the batch.
Checks that the batch is still committed (auto-commit) and that processing continues.
Verifies no invalid exchange formats were encountered.
Mermaid Class Diagram
classDiagram
class KafkaBatchingProcessingAutoCommitErrorHandlingIT {
-static final Logger LOG
-static final String TOPIC
-volatile boolean invalidExchangeFormat
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaAutoCommit()
}
KafkaBatchingProcessingAutoCommitErrorHandlingIT --|> BatchingProcessingITSupport
Summary
`KafkaBatchingProcessingAutoCommitErrorHandlingIT.java` is a focused integration test validating Apache Camel's Kafka batch consumer behavior when auto-commit is enabled and part of a batch fails processing. It demonstrates how to configure error handling to continue processing and commit offsets even if some records fail, a useful pattern in production systems where partial failures should not block progress.
This test is intended to verify that:
Batches are properly consumed as lists of `Exchange` objects.
Exception handling does not interrupt the Kafka offset commit.
The route handles partial failures gracefully.
The system logs and tracks error conditions for failed records.
The file integrates tightly with the broader Kafka integration test infrastructure and Camel Kafka component, relying on inherited utilities and conventions for setup and verification.