KafkaAdminUtil.java

Overview

`KafkaAdminUtil.java` is a utility class for administrative operations on Apache Kafka clusters in the Apache Camel Kafka integration testing environment. It provides helper methods to create Kafka `AdminClient` instances (both standard and SASL-authenticated), retrieve consumer group details, and assert that a consumer group is present in a Kafka cluster.

Primarily intended for use in integration tests, `KafkaAdminUtil` abstracts common Kafka Admin operations, simplifying test setup and validation when interacting with Kafka clusters managed by different Kafka services (e.g., plain or authenticated Kafka containers).


Class: KafkaAdminUtil

A final utility class containing static methods to interact with Kafka's AdminClient API. It cannot be instantiated.

Constructor

private KafkaAdminUtil()

Methods


createAdminClient

public static AdminClient createAdminClient(KafkaService service)

Example:

KafkaService kafkaService = ...; // Initialized Kafka service
AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService);
// Use adminClient to perform admin operations, then close it to release its resources
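
For readers unfamiliar with how an `AdminClient` is usually configured, here is a minimal sketch of the configuration a helper like `createAdminClient` might assemble. It uses only `java.util.Properties`, so it runs without the Kafka client on the classpath. `bootstrap.servers` is the standard Kafka client configuration key; the class name `PlainAdminConfigSketch` and the `localhost:9092` address are illustrative assumptions and are not taken from the utility's source.

```java
import java.util.Properties;

// Hypothetical helper: illustrates the configuration only, not KafkaAdminUtil's actual code.
final class PlainAdminConfigSketch {

    private PlainAdminConfigSketch() {
        // static helpers only
    }

    // Builds the minimal configuration for an unauthenticated admin connection.
    static Properties plainAdminConfig(String bootstrapServers) {
        Properties props = new Properties();
        // "bootstrap.servers" is the key behind AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Assumed address; in a test it would come from the KafkaService instance.
        Properties props = plainAdminConfig("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

With the Kafka client library available, a `Properties` object like this can be passed to `AdminClient.create(props)`.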

getConsumerGroupInfo

public static Map<String, ConsumerGroupDescription> getConsumerGroupInfo(String groupId, AdminClient kafkaAdminClient)
    throws InterruptedException, ExecutionException, TimeoutException

Example:

// try-with-resources closes the client once the lookup is done
try (AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService)) {
    Map<String, ConsumerGroupDescription> groupInfo = KafkaAdminUtil.getConsumerGroupInfo("my-group", adminClient);
    ConsumerGroupDescription description = groupInfo.get("my-group");
    // Process description
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
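
The `throws` clause above matches the checked exceptions of `Future.get(long, TimeUnit)`, which suggests that the lookup blocks on an asynchronous result with a timeout. That is an inference from the signature, not something this page states. A minimal sketch of that bounded-wait pattern follows, with `CompletableFuture` standing in for Kafka's own future type so the example runs without the Kafka client. The `BoundedWaitSketch` class, the `await` helper, the timeout values, and the string payload are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: shows the bounded-wait pattern, not the utility's real body.
final class BoundedWaitSketch {

    private BoundedWaitSketch() {
        // static helpers only
    }

    // Waits at most timeoutMillis for the result and surfaces the same three
    // checked exceptions that getConsumerGroupInfo declares.
    static <T> T await(Future<T> pending, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // A result that is already available is returned immediately.
        Future<Map<String, String>> ready =
                CompletableFuture.completedFuture(Collections.singletonMap("my-group", "description"));
        System.out.println(await(ready, 1000).keySet());

        // A result that never arrives ends in a TimeoutException instead of blocking forever.
        try {
            await(new CompletableFuture<String>(), 50);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Bounding the wait keeps an integration test from hanging indefinitely when the broker is unreachable; the caller sees a `TimeoutException` and can fail the test quickly.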

assertGroupIsConnected

public static void assertGroupIsConnected(String groupId, AdminClient kafkaAdminClient)

Example:

AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService);
KafkaAdminUtil.assertGroupIsConnected("KafkaConsumerAuthIT", adminClient);
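
The following plain-Java sketch shows the kind of check such an assertion could perform: look the group up in the map returned by the lookup and fail if it is absent. The `GroupPresenceSketch` class, the `requireGroup` helper, the `Map<String, String>` stand-in for `Map<String, ConsumerGroupDescription>`, and the failure message are all assumptions. The real method may check more than presence and may rely on a test framework's assertions instead of throwing `AssertionError` directly.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in: mirrors the idea of the assertion, not its actual implementation.
final class GroupPresenceSketch {

    private GroupPresenceSketch() {
        // static helpers only
    }

    // Returns the description for groupId, or fails the test if the group is unknown.
    static <V> V requireGroup(Map<String, V> groups, String groupId) {
        V description = groups.get(groupId);
        if (description == null) {
            throw new AssertionError("No consumer group named " + groupId);
        }
        return description;
    }

    public static void main(String[] args) {
        // In a real test this map would come from the consumer group lookup.
        Map<String, String> groups = Collections.singletonMap("KafkaConsumerAuthIT", "description");
        System.out.println(requireGroup(groups, "KafkaConsumerAuthIT"));
    }
}
```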

createAuthAdminClient

public static AdminClient createAuthAdminClient(KafkaService service)

Example:

KafkaService kafkaService = ...; // Authenticated Kafka service
AdminClient authAdminClient = KafkaAdminUtil.createAuthAdminClient(kafkaService);
// Use authAdminClient to perform admin operations with authentication
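
To make "SASL-authenticated" concrete, here is a minimal sketch of the extra settings an authenticated admin client typically needs on top of `bootstrap.servers`. The keys `security.protocol`, `sasl.mechanism`, and `sasl.jaas.config` are standard Kafka client settings, and `PlainLoginModule` is Kafka's login module for the PLAIN mechanism. The choice of `SASL_PLAINTEXT` with `PLAIN`, the `SaslAdminConfigSketch` class, and the sample credentials are assumptions; this page does not say which mechanism or credentials `createAuthAdminClient` uses.

```java
import java.util.Properties;

// Hypothetical helper: illustrates SASL client settings, not KafkaAdminUtil's actual code.
final class SaslAdminConfigSketch {

    private SaslAdminConfigSketch() {
        // static helpers only
    }

    // Formats a JAAS entry for Kafka's PLAIN login module.
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    // Builds admin client settings for SASL over an unencrypted connection (assumed setup).
    static Properties saslAdminConfig(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Sample values only; a real test would take them from the authenticated KafkaService.
        Properties props = saslAdminConfig("localhost:9092", "test-user", "test-secret");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```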

Implementation Details and Algorithms


Interaction with Other System Components


Visual Diagram: Flowchart of KafkaAdminUtil Static Methods and Their Relationships

flowchart TD
    A[KafkaAdminUtil] --> B["createAdminClient(service)"]
    A --> C["getConsumerGroupInfo(groupId, kafkaAdminClient)"]
    A --> D["assertGroupIsConnected(groupId, kafkaAdminClient)"]
    A --> E["createAuthAdminClient(service)"]

    D --> C
    B -->|returns| kafkaAdminClient1[AdminClient]
    E -->|returns| kafkaAdminClient2[AdminClient]

    C -->|returns| consumerGroupInfo["Map#lt;String, ConsumerGroupDescription#gt;"]

    classDef util fill:#f9f,stroke:#333,stroke-width:2px;
    class A util

Summary

`KafkaAdminUtil.java` is a small utility class that simplifies Kafka `AdminClient` creation and consumer group lookup in Apache Camel Kafka integration tests. It creates both unauthenticated and SASL-authenticated clients, so tests can run against differently configured Kafka clusters, and it provides an assertion that a given consumer group exists and is connected.

This class abstracts Kafka AdminClient boilerplate and error handling, promoting cleaner test code and easier maintenance of Kafka-related test utilities within the project.