KafkaConsumerHealthCheckIT.java

Overview

`KafkaConsumerHealthCheckIT.java` is an integration test class within the Apache Camel Kafka component module. It focuses on verifying the health check mechanisms for Kafka consumers integrated with Camel routes. Specifically, the class tests the liveness and readiness health checks of Kafka consumers consuming messages from a Kafka topic. The tests ensure that the consumer is correctly processing messages, propagating headers, and reporting health statuses accurately under normal operation and simulated Kafka server shutdown scenarios.

The class extends `KafkaHealthCheckTestSupport`, leveraging a Kafka test infrastructure and Camel's testing utilities. It configures a Kafka consumer route, produces test messages, and validates health check results and message consumption behavior.


Class: KafkaConsumerHealthCheckIT

Purpose

Package

`org.apache.camel.component.kafka.integration.health`

Inheritance


Constants

Name

Type

Description

`TOPIC`

String

Kafka topic used for testing messages (`"test-health"`).

`SKIPPED_HEADER_KEY`

String

Header key that should be skipped during consumption.

`PROPAGATED_CUSTOM_HEADER`

String

Header key expected to be propagated to consumers.

`PROPAGATED_HEADER_VALUE`

byte[]

The value associated with the propagated header.


Fields

Name

Type

Description

`kafkaAdminClient`

AdminClient

Kafka Admin client instance (static, shared).

`LOG`

Logger

Logger for diagnostic messages.


Annotations on Class


Methods

createRouteBuilder() : RouteBuilder


configureContext(CamelContext context) : void


testReportUpWhenIsUp() : void


testReportReadyWhenReady() : void


testIO() : void


testLivenessWhenDown() : void


testReadinessWhenDown() : void


Important Implementation Details


Interaction with Other Components


Usage Summary

This test class is intended to be executed as part of the integration test suite to validate that Kafka consumers in Camel routes:


Visual Diagram

classDiagram
    class KafkaConsumerHealthCheckIT {
        +static final String TOPIC
        +static final String SKIPPED_HEADER_KEY
        +static final String PROPAGATED_CUSTOM_HEADER
        +static final byte[] PROPAGATED_HEADER_VALUE
        -static AdminClient kafkaAdminClient
        -static Logger LOG
        +RouteBuilder createRouteBuilder()
        +void configureContext(CamelContext context)
        +void testReportUpWhenIsUp()
        +void testReportReadyWhenReady()
        +void testIO() throws InterruptedException
        +void testLivenessWhenDown()
        +void testReadinessWhenDown()
    }
    KafkaConsumerHealthCheckIT --|> KafkaHealthCheckTestSupport

Example Test Workflow

  1. Setup: Kafka broker started by test support, Camel context initialized.

  2. Route Created: Kafka consumer route for topic test-health is added.

  3. Produce Messages: Test produces 5 messages with headers to Kafka.

  4. Consume & Validate: Route consumes messages, assertions on body and headers.

  5. Health Checks: Liveness and readiness checks invoked and validated.

  6. Shutdown Kafka: Kafka broker is shut down.

  7. Health Checks After Shutdown: Liveness should stay UP, readiness should go DOWN.


Summary

`KafkaConsumerHealthCheckIT.java` is a robust integration test validating Kafka consumer health checks within Apache Camel routes. It covers message flow correctness, header propagation, and health reporting under normal and failure scenarios, ensuring high reliability and observability of Kafka consumers in the Camel ecosystem.