KafkaConsumerBadPortSupervisingHealthCheckIT.java
Overview
`KafkaConsumerBadPortSupervisingHealthCheckIT.java` is an integration test class designed to validate the health check mechanisms of a Kafka consumer within the Apache Camel framework when configured with an incorrect Kafka broker port. The test ensures that Apache Camel's Supervising Route Controller can detect and manage route failures caused by bad broker configurations, and that Camel's health check system correctly reports the liveness and readiness of the Kafka consumer routes.
The test class specifically:
Configures a Kafka consumer route with an intentionally incorrect Kafka broker port.
Uses Camel's Supervising Route Controller to automatically handle route restarts and backoff strategies.
Validates the health check system by asserting that:
Liveness reports UP when the route is active.
Readiness reports DOWN when the route cannot connect to Kafka due to the bad port.
Verifies that no messages are consumed due to the incorrect broker configuration, ensuring the supervising controller and health checks behave as expected.
This test is part of the Kafka component integration tests under the health check suite.
Class and Method Details
Class: KafkaConsumerBadPortSupervisingHealthCheckIT
Extends: `KafkaHealthCheckTestSupport` (not shown here but assumed to provide Kafka test infrastructure and Camel context support)
This class contains several JUnit 5 tests that verify the health check behavior of a Kafka consumer route managed by the Supervising Route Controller when configured with a non-existent Kafka broker port.
Fields
Name | Type | Description |
|---|---|---|
`TOPIC` | `String` | Kafka topic used for testing (`"test-health"`). |
`LOG` | `Logger` | SLF4J Logger for logging trace messages. |
Methods
void configureContext(CamelContext context)
Purpose:
Configures the Camel context for the test environment.Behavior:
Sets a properties component location reference
"ref:prop".Installs a
DefaultSupervisingRouteControllerwith specified backoff parameters (delay, max attempts, initial delay).Creates and configures a Kafka component:
Sets the Kafka broker to an invalid port (
service.getBootstrapServers() + 123).Disables pre-validation of host and port to allow startup despite invalid broker.
Adds the Kafka component to the Camel context.
Parameters:
context- the CamelContext to configure.
Usage Example:
Called automatically by the test framework before route creation.
RouteBuilder createRouteBuilder()
Purpose:
Defines the Camel route for the Kafka consumer under test.Behavior:
Creates a route consuming messages from the Kafka topic
test-healthusing a group IDKafkaConsumerBadPortSupervisingHealthCheckIT.Configures deserializers for keys and values as
StringDeserializer.Enables auto commit with a 1000 ms interval.
Adds a consumer interceptor class for testing purposes.
The route logs consumed messages at TRACE level.
Routes messages to a mock endpoint
KafkaTestUtil.MOCK_RESULT.
Returns:
A configured
RouteBuilderinstance.
Usage Example:
Automatically used by the Camel testing framework to install routes during test setup.
void testReportUpWhenIsUp()
Test Annotation:
@Test,@Order(1),@DisplayName("Tests that liveness reports UP when it's actually up")Purpose:
Verifies that the liveness health check reports UP state when the route is running.Behavior:
Invokes Camel's health check helper to get liveness results.
Asserts all health check results are UP.
Throws:
None explicitly.
Example:
Run automatically as part of the test suite to confirm liveness.
void testReportCorrectlyWhenDown()
Test Annotation:
@Test,@Order(2),@DisplayName("Tests that readiness reports down when it's actually down")Purpose:
Validates that the readiness health check reports DOWN state when the Kafka consumer cannot connect due to a bad broker port.Behavior:
Uses Awaitility to repeatedly check readiness status for up to 20 seconds.
Calls
readinessCheckhelper method to assert readiness is DOWN with messages containing "port".
Throws:
None explicitly.
static void readinessCheck(CamelContext context)
Purpose:
Helper method to check readiness health check results.Behavior:
Invokes readiness health checks.
Asserts all results are DOWN.
Asserts that all diagnostic messages contain the word "port" indicating connection failure due to bad port.
Parameters:
context- CamelContext to query health checks.
Usage:
Called fromtestReportCorrectlyWhenDown().
void kafkaConsumerHealthCheck()
Test Annotation:
@Test,@Order(3),@DisplayName("I/O test to ensure everything is working as expected")Purpose:
Tests that no messages are consumed or processed due to the bad Kafka broker port. Ensures that the route does not receive messages and no headers are propagated.Behavior:
Prepares a mock endpoint to expect zero messages and no headers.
Sends 5
ProducerRecordmessages to the Kafka topic with custom headers.Asserts that the mock endpoint did not receive any messages within 3 seconds.
Throws:
InterruptedExceptionif the test is interrupted during waiting.
Important Implementation Details
Supervising Route Controller:
The test configures theDefaultSupervisingRouteControllerwhich allows Camel to supervise routes, restarting them on failure with backoff strategies. This controller is key to managing routes connected to Kafka brokers that may be temporarily unavailable.Kafka Broker Misconfiguration:
The Kafka component is deliberately configured with an invalid broker port (service.getBootstrapServers() + 123). This simulates failure scenarios where the consumer cannot connect, allowing validation of health check and supervising controller behavior.Health Checks:
Uses Apache Camel's health check framework (HealthCheckHelper) to programmatically check liveness and readiness states of the routes. The distinction between liveness (route/process up) and readiness (route ready to serve) is validated.Awaitility Library:
For asynchronous readiness checking, Awaitility waits up to 20 seconds for the readiness state to become DOWN, accommodating the supervising controller's retry and backoff delays.Mock Endpoint:
Used to assert that no messages are received since the consumer cannot connect to Kafka due to the bad port.
Interactions with Other System Parts
Apache Camel Context:
The test manipulates and verifies the Camel context, routes, and components.Kafka Component:
Directly configures and tests the Kafka component integration within Camel.Supervising Route Controller:
Tests how this controller manages route lifecycle and failure recovery.Health Check Framework:
Utilizes Camel's health check SPI to verify the health state of routes.Kafka Test Utilities (
KafkaTestUtil):
Uses utilities and constants for Kafka testing, e.g., mock endpoint names.Kafka Producer:
Sends test messages to the Kafka topic to simulate actual Kafka traffic (though messages are not received due to bad port).
Usage Example (Test Execution Flow)
Setup:
Configure Camel context with the Kafka component set to an invalid broker port and supervising route controller.Route Creation:
A Kafka consumer route is created consuming fromtest-healthtopic.Health Check - Liveness:
Verify that the route is considered alive (liveness UP), even if Kafka broker is unreachable.Health Check - Readiness:
Verify that the route readiness correctly reports DOWN due to connection failure.Message Consumption:
Send messages to Kafka and assert no messages are consumed or processed.
Mermaid Class Diagram
classDiagram
class KafkaConsumerBadPortSupervisingHealthCheckIT {
+static final String TOPIC
+static final Logger LOG
+void configureContext(CamelContext context)
+RouteBuilder createRouteBuilder()
+void testReportUpWhenIsUp()
+void testReportCorrectlyWhenDown()
+static void readinessCheck(CamelContext context)
+void kafkaConsumerHealthCheck() throws InterruptedException
}
KafkaConsumerBadPortSupervisingHealthCheckIT --|> KafkaHealthCheckTestSupport
Summary
`KafkaConsumerBadPortSupervisingHealthCheckIT` is a critical integration test ensuring robust health monitoring and automatic failure recovery of Kafka consumer routes within Apache Camel. By simulating broker connectivity failure through an invalid port, it validates the Supervising Route Controller's ability to manage route restarts and demonstrates how Camel's health checks accurately reflect route states under failure conditions. This test reinforces system resilience and observability in production environments where Kafka brokers may be intermittently unavailable.