KafkaPausableConsumerCircuitBreakerIT.java
Overview
`KafkaPausableConsumerCircuitBreakerIT.java` is an integration test class designed to validate the behavior of a Kafka consumer within the Apache Camel framework when combined with a circuit breaker and pausable consumption mechanism. The test ensures that the Kafka consumer pauses message processing based on a conditional logic (simulating failures) and resumes processing once conditions improve, leveraging the Resilience4j circuit breaker for fault tolerance.
This file serves as a practical demonstration of how to:
Use pausable Kafka consumers to control message consumption dynamically.
Integrate Resilience4j circuit breaker with Camel routes to manage fault tolerance.
Simulate downstream failures and recovery.
Validate message processing and header propagation in Kafka-Camel integration.
Detailed Explanation of Classes and Methods
Class: KafkaPausableConsumerCircuitBreakerIT
This class extends `BaseKafkaTestSupport`, which provides Kafka environment setup and utilities for testing.
Constants and Fields
Name | Type | Description |
|---|---|---|
`SOURCE_TOPIC` | `String` | Kafka topic used for testing the pausable consumer with circuit breaker. |
`LOG` | `Logger` | Logger instance for logging test events. |
`SIMULATED_FAILURES` | `int` | Number of simulated failures before allowing message processing to succeed. |
`count` | `LongAdder` | Thread-safe counter tracking the number of processing attempts or failures. |
`executorService` | `ScheduledExecutorService` | Scheduled executor for simulating downstream system availability checks during errors. |
`producer` | `KafkaProducer` | Kafka producer used to send test messages to `SOURCE_TOPIC`. |
Static Methods
canContinue() : boolean
Determines whether the pausable Kafka consumer should continue processing messages or pause.
Logic:
Allows processing for the first message (
count <= 1).Pauses processing for messages until the number of failures reaches
SIMULATED_FAILURES.Resumes processing once failures exceed or equal
SIMULATED_FAILURES.
Returns:
trueif processing should continue.falseif processing should pause.
Usage: Passed as a lambda to the
.pausable()DSL method in the Kafka route to dynamically control consumption.
if (count.intValue() <= 1) return true;
if (count.intValue() >= SIMULATED_FAILURES) return true;
return false;
increment() : void
Increments the failure count by 1.
Usage: Simulates downstream system checks by increasing the failure count periodically when the circuit breaker detects an error.
getCount() : int
Returns the current failure count.
Lifecycle Methods
before() : void
Runs before each test.
Initializes Kafka producer with default Kafka properties.
Clears any previously captured records in the mock interceptor.
after() : void
Runs after each test.
Closes the Kafka producer to free resources.
Deletes the test Kafka topic to clean state for subsequent tests.
Overridden Method
createRouteBuilder() : RouteBuilder
Creates and configures Camel routes for the test.
Defines a Resilience4j
CircuitBreakernamed"pausable".Registers event listeners on the circuit breaker:
onSuccess: Shuts down the scheduled executor service simulating downstream availability checks.
onError: Starts a scheduled task that increments the failure count every second.
Binds the circuit breaker to Camel's registry under the name
"pausableCircuit".Defines two Camel routes:
Kafka Consumer Route (
from("kafka:" + SOURCE_TOPIC + ...)):Uses
.pausable()withcanContinue()to control message consumption dynamically.Applies the circuit breaker configured with
"pausableCircuit".Logs incoming Kafka messages.
Routes messages to
"direct:intermediate"for further processing.
Intermediate Processor Route (
from("direct:intermediate")):Throws a
RuntimeCamelExceptionfor the firstSIMULATED_FAILURESmessages to simulate failures.Routes successfully processed messages to a mock endpoint (
KafkaTestUtil.MOCK_RESULT) for assertion.
Test Method
kafkaMessageIsConsumedByCamel() : void
Annotation:
@Testand@Timeout(1 minute max)Sends 5 messages to the Kafka topic
SOURCE_TOPICwith custom headers.Sets expectations on the mock endpoint:
Exactly 5 messages should be received.
Each message body should be
"message-0"through"message-4".The special Kafka header
LAST_RECORD_BEFORE_COMMITshould not be present due to auto-commit enabled.The "PropagatedCustomHeader" should be preserved in the message headers.
The "CamelSkippedHeader" should be omitted.
Uses Awaitility to wait until the failure count exceeds 5 (indicating processing has proceeded past simulated failures).
Asserts that messages and headers meet expectations.
Purpose: Verifies that the pausable consumer and circuit breaker mechanisms correctly handle message retries and header propagation.
Important Implementation Details and Algorithms
Pausable Consumer Mechanism: Uses a dynamic lambda
canContinue()to control whether Kafka consumer polls/processes new messages. This simulates a scenario where processing is paused during downstream failures.Circuit Breaker Integration: The Resilience4j circuit breaker monitors downstream processing. On error, it triggers a scheduled periodic task (simulating downstream system checks) that increments a failure counter. On success, it shuts down this task.
Failure Simulation: The intermediate processor route intentionally throws exceptions for the first
SIMULATED_FAILURESmessages to simulate downstream failure conditions and test circuit breaker's reaction.Header Propagation: The test verifies that Kafka headers are correctly propagated or filtered during message consumption and processing, ensuring important headers survive while others (like skipped headers) do not.
Concurrency Considerations: The use of
LongAdderensures thread-safe increments and reads of the failure count.
Interaction with Other System Components
Apache Camel Kafka Component: This test exercises the Kafka component's consumer capabilities, specifically testing the pausable consumer feature.
Resilience4j Circuit Breaker: Provides fault tolerance and resilience in the Camel route by controlling message processing retries and backoff.
Kafka Broker: The test produces and consumes messages from an actual Kafka topic, requiring a running Kafka broker (usually managed by the
BaseKafkaTestSupport).Mock Endpoint: The Camel mock endpoint captures processed messages for assertions in tests.
MockConsumerInterceptor: Used to intercept Kafka consumer records for additional testing utilities or validation.
Usage Example
This file is an integration test and is typically executed as part of the project's test suite. Running this test will:
Produce messages to Kafka.
Consume messages using a pausable consumer that is controlled by circuit breaker status.
Simulate downstream system failures and recovery.
Assert that only successfully processed messages reach the mock endpoint.
Validate Kafka message header propagation behavior.
Mermaid Class Diagram
classDiagram
class KafkaPausableConsumerCircuitBreakerIT {
+static final String SOURCE_TOPIC
-static final Logger LOG
-static final int SIMULATED_FAILURES
-static LongAdder count
-static ScheduledExecutorService executorService
-KafkaProducer<String, String> producer
+static boolean canContinue()
+static void increment()
+static int getCount()
+void before()
+void after()
+RouteBuilder createRouteBuilder()
+void kafkaMessageIsConsumedByCamel()
}
KafkaPausableConsumerCircuitBreakerIT --|> BaseKafkaTestSupport
Summary
`KafkaPausableConsumerCircuitBreakerIT.java` is a comprehensive integration test showcasing how to combine Apache Camel's Kafka component with a pausable consumer pattern and a Resilience4j circuit breaker. It simulates downstream failures, tests dynamic pausing and resuming of message consumption, and verifies proper message and header processing within Kafka-Camel routes. This test is vital for ensuring robust, fault-tolerant Kafka consumers in production-grade Camel applications.