KafkaConsumerHealthCheckIT.java
Overview
`KafkaConsumerHealthCheckIT.java` is an integration test class within the Apache Camel Kafka component module. It focuses on verifying the health check mechanisms for Kafka consumers integrated with Camel routes. Specifically, the class tests the liveness and readiness health checks of Kafka consumers consuming messages from a Kafka topic. The tests ensure that the consumer is correctly processing messages, propagating headers, and reporting health statuses accurately under normal operation and simulated Kafka server shutdown scenarios.
The class extends `KafkaHealthCheckTestSupport`, leveraging a Kafka test infrastructure and Camel's testing utilities. It configures a Kafka consumer route, produces test messages, and validates health check results and message consumption behavior.
Class: KafkaConsumerHealthCheckIT
Purpose
Verifies Kafka consumer health checks (liveness and readiness) in an Apache Camel context.
Validates message processing correctness including header propagation.
Ensures proper health status reporting when Kafka broker is up or down.
Package
`org.apache.camel.component.kafka.integration.health`
Inheritance
Extends: `KafkaHealthCheckTestSupport` (likely provides the Kafka test service and Camel context setup)
Constants
| Name | Type | Description |
|---|---|---|
| `TOPIC` | String | Kafka topic used for testing messages (`"test-health"`). |
| `SKIPPED_HEADER_KEY` | String | Header key that should be skipped during consumption. |
| `PROPAGATED_CUSTOM_HEADER` | String | Header key expected to be propagated to consumers. |
| `PROPAGATED_HEADER_VALUE` | byte[] | The value associated with the propagated header. |
Fields
| Name | Type | Description |
|---|---|---|
| `kafkaAdminClient` | AdminClient | Kafka Admin client instance (static, shared). |
| `LOG` | Logger | Logger for diagnostic messages. |
Annotations on Class
`@Timeout(30)`: Each test has a 30-second timeout.
`@TestInstance(TestInstance.Lifecycle.PER_CLASS)`: Uses a single test instance per class.
`@TestMethodOrder(MethodOrderer.OrderAnnotation.class)`: Executes tests in the order specified by `@Order`.
`@DisabledIfSystemProperty`: Disables tests if the Kafka instance runs in a specific container mode.
`@Tags({ @Tag("health") })`: Tagged for health-related testing.
`@EnabledOnOs`: Enables tests only on specific operating systems and architectures.
Methods
createRouteBuilder() : RouteBuilder
Purpose: Configures the Camel route that consumes from the Kafka topic `test-health`.
Details:
Sets up a Kafka consumer endpoint with:
Bootstrap servers from the test service.
Group ID: `KafkaConsumerHealthCheckIT`.
Auto offset reset to `earliest`.
String deserializers for key and value.
Auto commit enabled with a 1000 ms interval.
Poll timeout set to 1000 ms.
Uses `MockConsumerInterceptor` for intercepting consumed messages.
The route logs the consumed message body at trace level.
Sends messages to a mock result endpoint (`KafkaTestUtil.MOCK_RESULT`) for assertions.
Usage example:
`RouteBuilder routeBuilder = kafkaConsumerHealthCheckIT.createRouteBuilder();`
`camelContext.addRoutes(routeBuilder);`
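To make the consumer options listed above concrete, the following is a minimal sketch of how they could be assembled into a Camel Kafka endpoint URI. It is plain Java and does not use Camel itself. The option names (`brokers`, `groupId`, `autoOffsetReset`, `autoCommitEnable`, `autoCommitIntervalMs`, `pollTimeoutMs`, `interceptorClasses`) follow the Camel Kafka component documentation as far as I know and should be checked against the Camel version in use. The class name `KafkaConsumerUriSketch`, the broker address, and the interceptor's package are assumptions made for illustration; the actual test may set the brokers elsewhere.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the test class described here.
final class KafkaConsumerUriSketch {

    // Assumption: fully qualified name of the test interceptor; adjust to the real package.
    static final String INTERCEPTOR = "org.apache.camel.component.kafka.MockConsumerInterceptor";
    static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

    /** Builds a consumer endpoint URI string from the options described in this section. */
    static String consumerUri(String topic, String brokers) {
        Map<String, String> options = new LinkedHashMap<>(); // preserves insertion order
        options.put("brokers", brokers);
        options.put("groupId", "KafkaConsumerHealthCheckIT");
        options.put("autoOffsetReset", "earliest");
        options.put("keyDeserializer", STRING_DESERIALIZER);
        options.put("valueDeserializer", STRING_DESERIALIZER);
        options.put("autoCommitEnable", "true");
        options.put("autoCommitIntervalMs", "1000");
        options.put("pollTimeoutMs", "1000");
        options.put("interceptorClasses", INTERCEPTOR);

        // Join the options as key=value pairs separated by '&'.
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // The broker address is a placeholder; the test obtains it from the test service.
        System.out.println(consumerUri("test-health", "localhost:9092"));
    }
}
```

In a route, a string like this would typically be passed to `from(...)`, followed by the logging step and the mock result endpoint.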
configureContext(CamelContext context) : void
Purpose: Placeholder for additional Camel context configuration.
Implementation: No operation (`// NO-OP`).
testReportUpWhenIsUp() : void
Test: Verifies that the liveness health check reports state `UP` when the Kafka consumer is running.
Details:
Invokes the Camel liveness health check.
Asserts that all results are `UP`.
testReportReadyWhenReady() : void
Test: Verifies that the readiness health check reports state `UP` when the Kafka consumer is ready.
Details:
Uses Awaitility to wait up to 20 seconds for readiness to be `UP`.
Asserts that all readiness health check results are `UP`.
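The wait-until-ready step is a poll-with-timeout pattern. The sketch below models that pattern in plain Java so the control flow is visible; it is not Awaitility's API and not the test's actual code. The names `ReadinessPollSketch` and `awaitCondition` are hypothetical, and the lambda in `main` stands in for "all readiness checks report `UP`".

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating polling with a timeout.
final class ReadinessPollSketch {

    /**
     * Re-evaluates the condition until it holds or the timeout elapses.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    static boolean awaitCondition(Duration timeout, Duration interval, BooleanSupplier condition) {
        long start = System.nanoTime();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Compare elapsed time rather than absolute values to stay overflow-safe.
            if (System.nanoTime() - start >= timeout.toNanos()) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in condition: becomes true on the third poll.
        AtomicInteger polls = new AtomicInteger();
        boolean ready = awaitCondition(Duration.ofSeconds(20), Duration.ofMillis(100),
                () -> polls.incrementAndGet() >= 3);
        System.out.println("ready=" + ready + ", polls=" + polls.get());
    }
}
```

Polling matters here because the consumer connects asynchronously, so a single readiness check immediately after startup could observe a state that is not yet `UP`.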
testIO() : void
Test: Verifies Kafka consumer I/O behavior, including message consumption and header propagation.
Steps:
Expects to receive 5 messages with bodies `"message-0"` to `"message-4"`.
Validates that the header `KafkaConstants.LAST_RECORD_BEFORE_COMMIT` is expected with some null values.
Checks that the propagated custom header is present with the expected value.
Produces 5 messages to Kafka:
Each message has the key `"1"`.
Each message contains two headers: `CamelSkippedHeader` (should be skipped) and `PropagatedCustomHeader` (should be propagated).
Asserts the mock endpoint is satisfied within 3 seconds.
Checks that the interceptor captured all 5 records.
Validates that the skipped header is not present in received messages.
Validates that the propagated header is present.
Usage example:
testIO();
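The header expectations in this test can be pictured as a filter applied between the Kafka record and the Camel message. The sketch below is a simplified model in plain Java, assuming the rule is "drop headers whose key starts with `Camel`"; the component's real filter strategy may apply additional rules. The class name `HeaderPropagationSketch` and the header values are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of header filtering; not the component's actual implementation.
final class HeaderPropagationSketch {

    /** Returns a copy of the headers without entries whose key starts with the skipped prefix. */
    static Map<String, byte[]> propagate(Map<String, byte[]> headers, String skippedPrefix) {
        Map<String, byte[]> propagated = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : headers.entrySet()) {
            if (!entry.getKey().startsWith(skippedPrefix)) {
                propagated.put(entry.getKey(), entry.getValue());
            }
        }
        return propagated;
    }

    public static void main(String[] args) {
        // Header values are placeholders chosen for this example.
        Map<String, byte[]> recordHeaders = new LinkedHashMap<>();
        recordHeaders.put("CamelSkippedHeader", "skipped".getBytes(StandardCharsets.UTF_8));
        recordHeaders.put("PropagatedCustomHeader", "propagated".getBytes(StandardCharsets.UTF_8));

        Map<String, byte[]> received = propagate(recordHeaders, "Camel");
        System.out.println(received.keySet());
    }
}
```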
testLivenessWhenDown() : void
Test: Verifies that the liveness health check still reports `UP` even when the Kafka broker is down.
Steps:
Shuts down the Kafka test service.
Asserts that the liveness state is `UP` (liveness should not fail just because Kafka is down).
testReadinessWhenDown() : void
Test: Verifies that the readiness health check reports `DOWN` when the Kafka broker is down.
Steps:
Shuts down the Kafka test service.
Uses Awaitility to wait up to 20 seconds for readiness to report `DOWN`.
Asserts that at least one health check result is `DOWN`.
Checks that the message contains "KafkaConsumer is not ready".
Asserts health check details include the topic and route id.
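The two shutdown tests differ in the predicate applied to the collection of health check results: liveness requires every result to be `UP`, while readiness looks for at least one `DOWN` result with a specific message and details. The sketch below models those predicates with a small stand-in result type rather than Camel's own `HealthCheck.Result`. The detail keys `topic` and `route.id` and the route id `test-health-it` are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; the real test works with Camel's health check results.
final class HealthAssertionsSketch {

    enum State { UP, DOWN }

    static final class Result {
        final State state;
        final String message;               // may be null for UP results
        final Map<String, String> details;

        Result(State state, String message, Map<String, String> details) {
            this.state = state;
            this.message = message;
            this.details = details;
        }
    }

    /** Liveness-style predicate: every result is UP (vacuously true for an empty list). */
    static boolean allUp(List<Result> results) {
        return results.stream().allMatch(r -> r.state == State.UP);
    }

    /** Readiness-style predicate: some DOWN result names the consumer, topic, and route. */
    static boolean consumerNotReady(List<Result> results, String topic, String routeId) {
        return results.stream().anyMatch(r -> r.state == State.DOWN
                && r.message != null
                && r.message.contains("KafkaConsumer is not ready")
                && topic.equals(r.details.get("topic"))         // assumed detail key
                && routeId.equals(r.details.get("route.id")));  // assumed detail key
    }

    public static void main(String[] args) {
        List<Result> readiness = List.of(
                new Result(State.UP, null, Map.of()),
                new Result(State.DOWN, "KafkaConsumer is not ready",
                        Map.of("topic", "test-health", "route.id", "test-health-it")));

        System.out.println("allUp=" + allUp(readiness));
        System.out.println("consumerNotReady="
                + consumerNotReady(readiness, "test-health", "test-health-it"));
    }
}
```

Keeping liveness `UP` while readiness goes `DOWN` reflects the usual split between the two probes: a broker outage means the consumer cannot serve traffic, but it does not by itself mean the application should be restarted.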
Important Implementation Details
The tests use Apache Camel's `HealthCheckHelper` to invoke health checks for liveness and readiness.
The Kafka consumer endpoint configuration includes a consumer interceptor (`MockConsumerInterceptor`) that tracks consumed records for assertions.
Headers are explicitly tested for propagation or being skipped to verify header handling behavior across Kafka and Camel.
The Awaitility library is used to handle asynchronous health check state changes.
The Kafka producer is created with properties from `KafkaTestUtil`, which abstracts the Kafka test environment configuration.
The tests simulate a Kafka broker shutdown by invoking `service.shutdown()` on the embedded Kafka service.
Interaction with Other Components
`KafkaHealthCheckTestSupport`: Provides Kafka test infrastructure and Camel context setup.
`MockConsumerInterceptor`: Intercepts Kafka consumer records for validation.
`KafkaTestUtil`: Utility class providing Kafka properties and constants (e.g., mock endpoint name).
Apache Camel Core: Used for route creation, health checks, and endpoint management.
Kafka Clients: Used to produce test messages and manage Kafka admin client.
JUnit 5: Testing framework for organizing and running tests.
Awaitility: For asynchronous wait on health check state changes.
Usage Summary
This test class is intended to be executed as part of the integration test suite to validate that Kafka consumers in Camel routes:
Report correct health statuses for liveness and readiness.
Properly consume and process messages including correct header handling.
Maintain health reporting behavior when Kafka brokers become unavailable.
Visual Diagram
classDiagram
class KafkaConsumerHealthCheckIT {
+static final String TOPIC
+static final String SKIPPED_HEADER_KEY
+static final String PROPAGATED_CUSTOM_HEADER
+static final byte[] PROPAGATED_HEADER_VALUE
-static AdminClient kafkaAdminClient
-static Logger LOG
+RouteBuilder createRouteBuilder()
+void configureContext(CamelContext context)
+void testReportUpWhenIsUp()
+void testReportReadyWhenReady()
+void testIO() throws InterruptedException
+void testLivenessWhenDown()
+void testReadinessWhenDown()
}
KafkaConsumerHealthCheckIT --|> KafkaHealthCheckTestSupport
Example Test Workflow
Setup: Kafka broker started by test support, Camel context initialized.
Route Created: Kafka consumer route for topic `test-health` is added.
Produce Messages: Test produces 5 messages with headers to Kafka.
Consume & Validate: Route consumes messages, assertions on body and headers.
Health Checks: Liveness and readiness checks invoked and validated.
Shutdown Kafka: Kafka broker is shut down.
Health Checks After Shutdown: Liveness should stay UP, readiness should go DOWN.
Summary
`KafkaConsumerHealthCheckIT.java` is an integration test that validates Kafka consumer health checks within Apache Camel routes. It covers message flow correctness, header propagation, and health reporting both under normal operation and after a Kafka broker shutdown.