KafkaConsumerBadPortHealthCheckIT.java
Overview
`KafkaConsumerBadPortHealthCheckIT.java` is an integration test class designed to validate the health check functionality of Apache Camel's Kafka consumer component when configured with an invalid Kafka broker port. The test verifies that the Camel context correctly reports liveness and readiness status under failure conditions induced by a bad port configuration.
The class extends `KafkaHealthCheckTestSupport`, inheriting test support utilities for Kafka-related health checks. It programmatically configures a Kafka consumer endpoint with an invalid port, disables pre-validation to allow context startup, and then verifies health check behavior through JUnit 5 tests.
Detailed Explanation
Package and Imports
Package:
org.apache.camel.component.kafka.integration.healthKey imports:
Apache Camel core (
CamelContext,RouteBuilder,HealthCheck, etc.)Kafka client (
ProducerRecord,RecordHeader)JUnit 5 testing framework (
@Test,@Order,Assertions)Awaitility for asynchronous wait
SLF4J for logging
Class: KafkaConsumerBadPortHealthCheckIT
Description
This is a JUnit 5 integration test class that checks the health status of a Kafka consumer configured with an incorrect broker port. It verifies that:
The liveness probe reports UP when the application context is running.
The readiness probe reports
DOWNwith appropriate messaging about the bad port.The Kafka consumer does not receive any messages due to the bad port, confirming no successful connection.
Annotations
@TestMethodOrder(MethodOrderer.OrderAnnotation.class): Enforces execution order of tests.@TestInstance(TestInstance.Lifecycle.PER_CLASS): Uses a single test class instance for all tests.@DisabledIfSystemProperty: Disables the tests if a specific system property is set (to avoid conflicts with certain Kafka test environments).@Tags({@Tag("health")}): Categorizes tests under "health" for filtering.
Constants
TOPIC: Kafka topic used for testing ("test-health").LOG: SLF4J logger instance for this class.
Fields
@EndpointInject("mock:result") private MockEndpoint to: Camel Mock endpoint for capturing messages sent to the route during tests.
Methods
configureContext(CamelContext context)
Purpose:
Configures the Camel context before routes are created. Sets up a Kafka component with an intentionally incorrect broker port by appending123to the valid bootstrap servers URL.Parameters:
CamelContext context: The Camel runtime context.
Implementation Details:
Sets the properties location to
"ref:prop"(reference to external properties).Creates and initializes a new Kafka component.
Overrides the brokers to a wrong port by appending
123to the bootstrap server string.Disables pre-validation of host and port to allow the application to start despite the bad port.
Adds the Kafka component to the Camel context under the name
"kafka".
Usage:
This method is called automatically before route creation to setup the test environment.
createRouteBuilder()
Purpose:
Defines the Camel route used in this test which consumes messages from the Kafka topic configured with the bad port.Returns:
RouteBuilder: A new anonymous subclass defining the Kafka consumer route.
Route Details:
Source endpoint: Kafka consumer configured with group ID, deserializers, offset reset, commit settings, and a consumer interceptor class.
Processor: Logs the received message body at trace level.
Sink endpoint: A Camel Mock endpoint (
mock:result) for test assertions.
Usage:
Camel invokes this method to deploy the route in the context during test startup.
testReportUpWhenIsUp()
Purpose:
Verifies that the Camel context's liveness health check reports UP when the system is actually running.Test Details:
Retrieves the Camel context.
Invokes liveness health checks via
HealthCheckHelper.invokeLiveness().Asserts all health check results indicate state UP.
Annotations:
@Test,@Order(1),@DisplayName("Tests that liveness reports UP when it's actually up")
Usage Example:
This test ensures that the health endpoint correctly reports alive status despite Kafka consumer failing to connect.
testReportCorrectlyWhenDown()
Purpose:
Verifies that the readiness health check reportsDOWNwhen the Kafka consumer is not able to connect due to the bad port.Test Details:
Waits up to 20 seconds for readiness to settle using Awaitility.
Invokes readiness health checks via
HealthCheckHelper.invokeReadiness().Asserts all health checks report
DOWN.Validates that the error messages contain the word
"port"indicating the cause.
Annotations:
@Test,@Order(2),@DisplayName("Tests that readiness reports down when it's actually down")
Usage Example:
This test confirms that readiness probes detect and report the Kafka consumer's inability to connect.
kafkaConsumerHealthCheck()
Purpose:
Tests that no messages are consumed or received by the Kafka consumer when configured with a bad port.Test Details:
Defines a custom header key and value to simulate message headers.
Sets expectations on the mock endpoint for no messages or headers received.
Produces 5 messages with headers to the Kafka topic.
Asserts that the mock endpoint receives nothing within 3000ms.
Annotations:
@Test,@Order(3)
Usage Example:
This test simulates message sending to Kafka and verifies no consumption occurs because of the bad port.
Important Implementation Details
Bad Port Injection:
The key aspect is setting the Kafka component's broker list to an invalid port by appending123to the bootstrap servers string. This simulates a network failure scenario.Disabling Pre-validation:
Normally, Camel Kafka component validates host and port before startup. This is disabled (setPreValidateHostAndPort(false)) to allow context startup despite invalid configuration, so the health checks can be tested dynamically.Health Checks Usage:
Uses Camel'sHealthCheckHelperto programmatically invoke liveness and readiness probes and analyze their states.Awaitility for Async Assertions:
Readiness check uses Awaitility to wait until the health check reportsDOWN, accommodating the asynchronous nature of Kafka connection attempts.Mock Endpoint for Message Assertions:
Ensures no messages are received by the consumer due to the bad port, enforcing the negative test case.
Interaction with Other System Components
Kafka Broker:
The test interacts with a Kafka broker service (service.getBootstrapServers()), but intentionally misconfigures the broker port to simulate failure.Camel Context and Kafka Component:
This test manipulates the Camel context configuration, especially the Kafka component, to test health check behavior.Health Check Framework:
Uses Camel's health check framework to verify runtime health status of the Kafka consumer endpoint.Testing Infrastructure:
Relies on JUnit 5, Camel test infrastructure, and Awaitility for asynchronous testing.
Usage Example
// Example to run the full test suite
public class RunKafkaHealthCheckTests {
public static void main(String[] args) {
KafkaConsumerBadPortHealthCheckIT test = new KafkaConsumerBadPortHealthCheckIT();
// Initialize Camel Context and configure with bad port
CamelContext context = new DefaultCamelContext();
test.configureContext(context);
// Create routes and start context
context.addRoutes(test.createRouteBuilder());
context.start();
// Run health check tests (would normally be run by JUnit)
test.testReportUpWhenIsUp();
test.testReportCorrectlyWhenDown();
test.kafkaConsumerHealthCheck();
context.stop();
}
}
Mermaid Class Diagram
classDiagram
class KafkaConsumerBadPortHealthCheckIT {
+static final String TOPIC
+configureContext(CamelContext context) void
+createRouteBuilder() RouteBuilder
+testReportUpWhenIsUp() void
+testReportCorrectlyWhenDown() void
+kafkaConsumerHealthCheck() void
}
KafkaConsumerBadPortHealthCheckIT --|> KafkaHealthCheckTestSupport
Summary
`KafkaConsumerBadPortHealthCheckIT.java` is a focused integration test class that validates Apache Camel Kafka consumer health checks under failure conditions caused by a misconfigured Kafka broker port. It ensures the Camel context’s health reporting mechanisms correctly signal readiness and liveness status, and verifies no message consumption occurs due to connection failure. This test helps maintain robustness in Kafka integration by early detection of connectivity issues via health checks.