KafkaBreakOnFirstErrorReleaseResourcesIT.java
Overview
`KafkaBreakOnFirstErrorReleaseResourcesIT.java` is an integration test class designed to validate the **breakOnFirstError** functionality in the Apache Camel Kafka component. It specifically addresses a resource leakage issue (notably heartbeat threads) identified in ticket **CAMEL-20563**, ensuring that resources are properly released when Kafka consumers reconnect after an error occurs.
This test class uses Apache Kafka and Camel's Kafka component to simulate scenarios where messages cause processing errors, then verifies that the consumers clean up resources correctly upon reconnecting. The test also validates the manual offset commit behavior and ensures the number of heartbeat threads matches the number of Kafka consumers, preventing thread leaks.
Detailed Explanation
Class: KafkaBreakOnFirstErrorReleaseResourcesIT
This class extends `BaseKafkaTestSupport` (a test base class likely providing Kafka test infrastructure) and uses JUnit 5 for testing. It is annotated with tags and OS conditions to control test execution based on environment reliability.
Key Characteristics:
Test Focus: breakOnFirstError functionality and resource cleanup (heartbeat threads).
Kafka Setup: Creates a topic with 3 partitions.
Consumer Configuration: Uses 3 consumers with manual commit and `breakOnFirstError=true`.
Message Flow: Sends messages including some that trigger errors, then verifies message consumption and resource cleanup.
Test Reliability: Disabled on some CI environments due to flakiness.
Constants and Fields
| Name | Type | Description |
|---|---|---|
| `ROUTE_ID` | `String` | Unique route ID with thread hash to isolate test runs. |
| `TOPIC` | `String` | Kafka topic name, unique per test run for isolation. |
| `LOG` | `Logger` | Logger instance for logging test information. |
| `CONSUMER_COUNT` | `int` | Number of Kafka consumers used in the route (3). |
| `to` | `MockEndpoint` | Mock endpoint to assert message reception in the route. |
| `producer` | `KafkaProducer<String,String>` | Kafka producer to send test messages to the topic. |
Lifecycle Methods
@BeforeAll static void setupTopic()
Purpose: Initializes Kafka admin client and creates the test topic with 3 partitions.
Behavior: Uses `KafkaAdminUtil` and the `service` context to create the topic before any tests run.
@BeforeEach void init()
Purpose: Initializes Kafka producer with default properties and clears captured records from interceptor.
Behavior: Runs before each test method to reset state.
@AfterEach void after()
Purpose: Closes the Kafka producer and deletes the test topic to clean up after each test.
Behavior: Ensures no side effects remain between test executions.
Test Method
void testCamel20563TestFix()
Purpose: Validates that breakOnFirstError works correctly and heartbeat threads are not leaked.
Test Steps:
Resets mock endpoint expectations.
Sets expected messages including "ERROR" payloads that will throw exceptions.
Stops the route to reset state.
Publishes a series of messages to Kafka, including some that trigger errors.
Starts the route and waits for message consumption.
Asserts that all expected messages are received (including retries around errors).
Counts heartbeat threads and asserts the count matches consumer count.
Assertions:
Messages received match expected bodies.
Heartbeat thread count equals the number of consumers, confirming no thread leakage.
Timeout: Waits up to 30 seconds with an 8-second initial delay to accommodate processing.
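The timing described above (an initial delay followed by repeated checks until a deadline) can be sketched with plain JDK classes. This is a stand-in for illustration only: the test itself relies on Awaitility, and the class name `PollingWaitSketch`, the method `waitUntil`, and the poll interval parameter are hypothetical.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class PollingWaitSketch {

    /**
     * Waits for initialDelay, then checks the condition every pollInterval
     * until it holds or atMost has elapsed (measured from the start of the call).
     * Returns true if the condition held before the deadline.
     */
    public static boolean waitUntil(Duration initialDelay, Duration atMost,
                                    Duration pollInterval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + atMost.toNanos();
        sleep(initialDelay);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            sleep(pollInterval);
        }
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }
}
```

With the values from this test, a call would look like `waitUntil(Duration.ofSeconds(8), Duration.ofSeconds(30), Duration.ofMillis(100), condition)`, where the 100 ms poll interval is an assumed value.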
Supporting Methods
int countHeartbeatThreads()
Purpose: Counts active threads related to Kafka consumer heartbeats matching the test route ID.
Implementation:
Enumerates all JVM threads.
Counts threads whose names contain `"heartbeat"` and the route-specific string.
Logs thread names for debugging.
Returns: Number of heartbeat threads found.
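A minimal sketch of this kind of thread count, using only the JDK, might look like the following. The class name `HeartbeatThreadCounter` and the `marker` parameter are illustrative and not taken from the test. The sketch assumes the Kafka client's heartbeat thread name includes the consumer group ID, which in this test is the route ID.

```java
import java.util.Set;

public class HeartbeatThreadCounter {

    /**
     * Counts live JVM threads whose names contain both "heartbeat"
     * and the given marker (for example, the route ID used as group ID).
     */
    public static int countHeartbeatThreads(String marker) {
        // getAllStackTraces() returns a snapshot keyed by every live thread.
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        int count = 0;
        for (Thread thread : threads) {
            String name = thread.getName();
            if (name.contains("heartbeat") && name.contains(marker)) {
                System.out.println("Heartbeat thread: " + name);
                count++;
            }
        }
        return count;
    }
}
```

Because the result is a point-in-time snapshot, a count taken while consumers are still reconnecting can be transiently off, which is one reason to poll for the expected value rather than check it once.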
RouteBuilder createRouteBuilder()
Purpose: Defines the Camel Kafka consumer route under test.
Route Details:
Consumes from the TOPIC with specific consumer configurations:
groupId set to `ROUTE_ID`
`autoOffsetReset=earliest`
Manual commit enabled (`allowManualCommit=true`, `autoCommitEnable=false`)
`breakOnFirstError=true` to stop on first processing error
3 consumers (`consumersCount=3`)
Poll timeout 1000 ms
Max poll records 1
Uses MockConsumerInterceptor to capture records
The route:
Logs consumption info
Sends messages to the `mock:result` endpoint for assertions
Throws RuntimeException if payload equals "ERROR"
Commits offsets manually on each message or on exception
The route is configured to not start automatically (`autoStartup(false)`).
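To make the consumer options concrete, the sketch below assembles a Camel Kafka endpoint URI from the settings listed above, using plain Java so it runs without Camel on the classpath. The class and method names are hypothetical. The option names `pollTimeoutMs` and `maxPollRecords` are assumed to be the ones behind "Poll timeout" and "Max poll records", and the interceptor option is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaEndpointUriSketch {

    /** Builds a consumer endpoint URI with the options described for the route. */
    public static String buildConsumerUri(String topic, String groupId, int consumersCount) {
        // LinkedHashMap keeps the options in insertion order for a stable URI.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("groupId", groupId);
        options.put("autoOffsetReset", "earliest");
        options.put("allowManualCommit", "true");
        options.put("autoCommitEnable", "false");
        options.put("breakOnFirstError", "true");
        options.put("consumersCount", String.valueOf(consumersCount));
        options.put("pollTimeoutMs", "1000");   // assumed option name
        options.put("maxPollRecords", "1");     // assumed option name

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumerUri("demo-topic", "demo-route", 3));
    }
}
```

In a Camel `RouteBuilder`, a string like this would typically be passed to `from(...)`, followed by `.routeId(...)` and `.autoStartup(false)`.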
void ifIsPayloadWithErrorThrowException(Exchange exchange)
Purpose: Checks message payload and throws a RuntimeException if it equals "ERROR".
Effect: Simulates a non-retryable error to test breakOnFirstError behavior.
void publishMessagesToKafka()
Purpose: Sends a predefined list of messages to the Kafka topic.
Messages: Includes normal strings and two "ERROR" messages to trigger exceptions.
void doCommitOffset(Exchange exchange)
Purpose: Commits Kafka consumer offsets manually.
Implementation:
Retrieves the KafkaManualCommit object from the exchange header.
Calls `commit()` on it if present.
Logs error if the commit object is missing.
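The null-safe commit pattern described here can be sketched without Camel as follows. `ManualCommit` is a hypothetical stand-in for Camel's `KafkaManualCommit`, a plain `Map` stands in for the exchange headers, and the header key is assumed to match the value of `KafkaConstants.MANUAL_COMMIT`.

```java
import java.util.Map;

public class ManualCommitSketch {

    /** Hypothetical stand-in for Camel's KafkaManualCommit. */
    public interface ManualCommit {
        void commit();
    }

    // Assumed to match the value of KafkaConstants.MANUAL_COMMIT in camel-kafka.
    public static final String MANUAL_COMMIT_HEADER = "CamelKafkaManualCommit";

    /**
     * Commits the offset if a manual-commit handle is present in the headers.
     * Returns true when a commit was issued, false when the handle was missing.
     */
    public static boolean doCommitOffset(Map<String, Object> headers) {
        Object header = headers.get(MANUAL_COMMIT_HEADER);
        if (header instanceof ManualCommit) {
            ((ManualCommit) header).commit();
            return true;
        }
        // Mirrors the documented behavior: log an error instead of failing.
        System.err.println("Manual commit handle is missing from the headers");
        return false;
    }
}
```

The boolean return value is an addition for this sketch so the outcome can be checked. The method documented above returns `void`.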
Important Implementation Details
Manual Offset Commit: Offsets are manually committed after processing or on exception to ensure message delivery semantics are maintained despite errors.
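breakOnFirstError: When enabled, the consumer breaks out of processing as soon as a message fails instead of moving on to the next message, and then reconnects. Because offsets are also committed on exception in this route, consumption can resume after the failed message rather than reprocessing it indefinitely.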
Heartbeat Threads: Kafka consumers maintain heartbeat threads to keep the group membership alive. This test ensures these threads are properly released after the route stops and restarts, preventing resource leaks.
Test Isolation: Usage of unique topic and route IDs via thread hashcode ensures test runs do not interfere.
Interceptor Usage: MockConsumerInterceptor collects records for verification and debugging.
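The interplay between breaking on error and committing manually can be illustrated with a toy model. The sketch below assumes a single partition and a single consumer, and it is not Camel's implementation. It only shows why every payload, including "ERROR", can reach the downstream endpoint exactly once when the offset is committed on both success and failure. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class BreakOnFirstErrorModel {

    /**
     * Replays the records through a simplified consumer loop and returns
     * the payloads delivered downstream (the mock endpoint in the real test).
     */
    public static List<String> consume(List<String> records) {
        List<String> delivered = new ArrayList<>();
        int committedOffset = 0; // next offset to read after a (re)connect

        // Each pass of the outer loop models one consumer session.
        while (committedOffset < records.size()) {
            for (int offset = committedOffset; offset < records.size(); offset++) {
                String payload = records.get(offset);
                delivered.add(payload);       // downstream sees the payload before the error check
                committedOffset = offset + 1; // committed on success and on exception alike
                if ("ERROR".equals(payload)) {
                    break;                    // break on first error; the next session resumes from the commit
                }
            }
        }
        return delivered;
    }
}
```

If the commit on exception were removed from this model, the loop would resume at the failed record every time and never progress, which is the situation the manual commit in the route avoids.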
Interaction with Other System Components
Kafka Cluster: The test directly interacts with a Kafka cluster (likely embedded or test container based) by creating topics, producing messages, and consuming them through Camel Kafka component.
Apache Camel Context: The route is managed within the Camel runtime, leveraging its route lifecycle and error handling.
Kafka Admin Client: Used to create and delete topics programmatically.
JUnit 5 & Awaitility: Testing and synchronization frameworks to manage test execution and timing.
MockEndpoint: Camel testing component to assert message receptions.
Logging: SLF4J Logger is used for debugging and informational output.
Usage Example
This is a test class, so its usage is primarily for integration testing. To run the test:
mvn test -Dtest=KafkaBreakOnFirstErrorReleaseResourcesIT
Or within an IDE:
Run `KafkaBreakOnFirstErrorReleaseResourcesIT` as a JUnit test.
Ensure Kafka is available or an embedded Kafka test environment is active.
Observe test logs confirming heartbeat thread counts and message assertions.
Mermaid Class Diagram
classDiagram
    class KafkaBreakOnFirstErrorReleaseResourcesIT {
        +static void setupTopic()
        +void init()
        +void after()
        +void testCamel20563TestFix()
        +int countHeartbeatThreads()
        +RouteBuilder createRouteBuilder()
        -void ifIsPayloadWithErrorThrowException(Exchange)
        -void publishMessagesToKafka()
        -void doCommitOffset(Exchange)
        -KafkaProducer<String,String> producer
        -MockEndpoint to
    }
    KafkaBreakOnFirstErrorReleaseResourcesIT --|> BaseKafkaTestSupport
Summary
`KafkaBreakOnFirstErrorReleaseResourcesIT.java` is a focused integration test ensuring that the Apache Camel Kafka consumer properly handles error scenarios with `breakOnFirstError=true` and does not leak consumer heartbeat threads during route stops and restarts. It combines Kafka admin operations, producer/consumer interaction, Camel routing with manual offset commits, and JUnit testing practices to verify robustness of the Kafka component in error conditions.