KafkaConsumerUnresolvableHealthCheckIT.java
Overview
`KafkaConsumerUnresolvableHealthCheckIT.java` is an integration test class designed to verify the health check functionality of a Kafka consumer within an Apache Camel context. Specifically, it tests how Camel's health check framework reports the liveness and readiness of Kafka consumers when the Kafka broker endpoint is intentionally misconfigured (unresolvable). This enables validation that the health check system correctly reports UP when the consumer is healthy and DOWN when the consumer cannot connect to Kafka.
The test class extends a base support class (`KafkaHealthCheckTestSupport`) and configures a Kafka component with an invalid broker address to simulate failure scenarios. It defines Kafka consumer routes and performs health checks using Apache Camel’s health check API, ensuring the system behaves as expected under both normal and failure conditions.
Detailed Explanation
Package and Imports
Package:
org.apache.camel.component.kafka.integration.healthKey imports include Apache Camel core (
CamelContext,RouteBuilder), Kafka client classes (ProducerRecord,RecordHeader), JUnit 5 testing framework, and Apache Camel health check utilities.
Class: KafkaConsumerUnresolvableHealthCheckIT
Purpose
This class tests the health check system for Kafka consumers when the broker address is deliberately misconfigured to an unresolvable host. It ensures that:
The liveness health check reports UP when the consumer is operational.
The readiness health check reports DOWN when the consumer cannot connect to Kafka.
Kafka consumer correctly processes messages when configured.
Annotations
@TestMethodOrder(MethodOrderer.OrderAnnotation.class): Ensures tests run in a specified order.@TestInstance(TestInstance.Lifecycle.PER_CLASS): Uses a single test instance for all tests.@DisabledIfSystemProperty: Disables tests if Kafka is running in local Strimzi container mode due to concurrency issues.@Tags(@Tag("health")): Categorizes tests under the "health" tag.
Constants
Name | Type | Description |
|---|---|---|
TOPIC | String | Kafka topic name used for testing. |
public static final String TOPIC = "test-health";
Overridden Methods
configureContext(CamelContext context)
Configures the Camel context for the test:
Sets properties location.
Creates and initializes a Kafka component with a deliberately incorrect broker address (
localhostreplaced bylocaIhost).Disables host/port pre-validation to allow startup despite invalid config.
Registers the Kafka component under the name
"kafka"in the Camel context.
**Parameters:**
CamelContext context: The Camel runtime context to configure.
**Usage:**
This method is marked with `@ContextFixture` and is called as part of test setup to inject the faulty Kafka configuration.
createRouteBuilder()
Creates a Camel route builder that defines the Kafka consumer route:
Consumes messages from the Kafka
test-healthtopic.Uses specific Kafka consumer properties such as group ID, deserializers, auto offset reset, auto commit, and a mock interceptor.
Processes each message by logging it at trace level.
Sends the message to a test endpoint (
to).
**Returns:**
RouteBuilderinstance defining the route.
Test Methods
testReportUpWhenIsUp()
Order: 1
Description: Verifies that the liveness health check reports
UPwhen the consumer is actually considered healthy.Behavior: Invokes Camel's liveness health checks and asserts all results are
UP.
Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
Assertions.assertTrue(up, "liveness check");
testReportCorrectlyWhenDown()
Order: 2
Description: Verifies that the readiness health check reports
DOWNwhen the consumer cannot resolve the Kafka broker.Behavior: Uses Awaitility to wait up to 20 seconds for the readiness check to reflect
DOWNstatus, indicating failure to connect.
await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> readinessCheck(context));
Calls helper method
readinessCheckto assert readiness isDOWNand message contains "bootstrap".
readinessCheck(CamelContext context) (private static)
Performs readiness health check assertions:
Invokes readiness checks.
Asserts all checks report
DOWN.Checks that messages contain the word
"bootstrap"indicating failure to connect to Kafka bootstrap servers.
kafkaConsumerHealthCheck()
Order: 3
Description: An end-to-end I/O test that sends Kafka messages with headers, including a custom propagated header, to verify the consumer processes messages as expected.
Behavior:
Sends 5 messages to the Kafka topic.
Each message includes headers: one to be skipped, one custom propagated header.
Uses assertions on the test endpoint (
to) to verify message receipt and header behavior.
Important Implementation Details
Faulty Kafka Configuration: The test deliberately replaces
"localhost"with"locaIhost"in the broker list to simulate an unresolvable Kafka broker endpoint. This tests the readiness health check's ability to detect down states.Health Check Invocation: Uses
HealthCheckHelper.invokeLiveness()andHealthCheckHelper.invokeReadiness()to programmatically trigger health checks.Awaitility: Utilized to asynchronously wait for the readiness state to become
DOWN, accommodating Kafka client connection timeouts.Custom Headers in Kafka Messages: Demonstrates Kafka header propagation and filtering in integration tests.
Disabling Pre-Validation: Disabling host/port pre-validation on Kafka component allows the context to start even with invalid broker configuration, enabling health check testing.
Interaction with Other Parts of the System
Apache Camel: This test class interacts heavily with the Apache Camel framework, especially the Kafka component and health check system.
Kafka Broker: Simulates interaction with Kafka brokers by sending and receiving messages through Kafka topics.
KafkaHealthCheckTestSupport: Inherits from a base test support class that presumably provides helpers like
producer,to(mock endpoints), and possibly manages Kafka lifecycle.Health Check Framework: Uses Camel's health check APIs to verify the liveness and readiness states of Kafka consumers.
Test Infrastructure: The test is disabled when using local Strimzi containers due to concurrency conflicts, indicating integration with containerized Kafka test environments.
Usage Examples
Running the Tests
These tests are designed to be executed as part of an integration test suite using JUnit 5.
mvn test -Dtest=KafkaConsumerUnresolvableHealthCheckIT
Expected Behavior
The first test confirms that when the Kafka consumer is configured (even with faulty broker), the liveness health check reports UP.
The second test waits and confirms readiness reports DOWN because the broker is unreachable.
The third test sends messages to Kafka and verifies processing behavior through Camel routes.
Mermaid Class Diagram
classDiagram
class KafkaConsumerUnresolvableHealthCheckIT {
<<Test Class>>
+static final String TOPIC
+void configureContext(CamelContext context)
+RouteBuilder createRouteBuilder()
+void testReportUpWhenIsUp()
+void testReportCorrectlyWhenDown()
-static void readinessCheck(CamelContext context)
+void kafkaConsumerHealthCheck() throws InterruptedException
}
KafkaConsumerUnresolvableHealthCheckIT --|> KafkaHealthCheckTestSupport
Summary
`KafkaConsumerUnresolvableHealthCheckIT.java` is a targeted integration test class that validates Apache Camel's Kafka consumer health check reporting under failure conditions caused by unresolvable Kafka brokers. It verifies both liveness and readiness health states and performs message processing tests, ensuring the Camel Kafka component’s robustness and observability in degraded network scenarios. The class leverages Camel’s health check utilities, Kafka client APIs, and JUnit 5 testing features to accomplish these goals.