KafkaAdminUtil.java
Overview
`KafkaAdminUtil.java` is a utility class designed to facilitate administration-related operations on Apache Kafka clusters within the Apache Camel Kafka integration testing environment. This class provides helper methods to create Kafka `AdminClient` instances (both standard and SASL-authenticated), retrieve consumer group details, and assert the presence of consumer groups in a Kafka cluster.
Primarily intended for use in integration tests, `KafkaAdminUtil` abstracts common Kafka Admin operations, simplifying test setup and validation when interacting with Kafka clusters managed by different Kafka services (e.g., plain or authenticated Kafka containers).
Class: KafkaAdminUtil
A final utility class containing static methods to interact with Kafka's AdminClient API. It cannot be instantiated.
Constructor
private KafkaAdminUtil()
Purpose: Private constructor to prevent instantiation of this utility class.
Methods
createAdminClient
public static AdminClient createAdminClient(KafkaService service)
Purpose: Creates a standard (non-authenticated) Kafka
AdminClientconnected to the Kafka cluster specified by the givenKafkaService.Parameters:
service(KafkaService): The Kafka service instance providing the bootstrap servers' address.
Returns:
AdminClient— A Kafka AdminClient configured to communicate with the Kafka cluster.Usage Example:
KafkaService kafkaService = ...; // Initialized Kafka service
AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService);
// Use adminClient to perform admin operations
Implementation Details:
The method creates aPropertiesobject and sets thebootstrap.serversconfiguration with the Kafka service's bootstrap servers. Then it creates a KafkaAdminClient using these properties.
getConsumerGroupInfo
public static Map<String, ConsumerGroupDescription> getConsumerGroupInfo(String groupId, AdminClient kafkaAdminClient)
throws InterruptedException, ExecutionException, TimeoutException
Purpose: Retrieves detailed information about a specific Kafka consumer group from the Kafka cluster.
Parameters:
groupId(String): The consumer group ID to query.kafkaAdminClient(AdminClient): The Kafka AdminClient instance used to communicate with the Kafka cluster.
Returns:
Map<String, ConsumerGroupDescription>— A map containing the group ID as the key and its description as the value.Throws:
InterruptedExceptionExecutionExceptionTimeoutException
Usage Example:
AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService);
try {
Map<String, ConsumerGroupDescription> groupInfo = KafkaAdminUtil.getConsumerGroupInfo("my-group", adminClient);
ConsumerGroupDescription description = groupInfo.get("my-group");
// Process description
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
Implementation Details:
UsesAdminClient.describeConsumerGroupsAPI to asynchronously fetch the group description for the specified group ID, waiting up to 30 seconds for the operation to complete.
assertGroupIsConnected
public static void assertGroupIsConnected(String groupId, AdminClient kafkaAdminClient)
Purpose: Asserts (in testing context) that a Kafka consumer group with the specified ID is present and connected in the Kafka cluster.
Parameters:
groupId(String): The consumer group ID to check.kafkaAdminClient(AdminClient): The Kafka AdminClient used to query the cluster.
Returns: Void
Behavior:
Retrieves consumer group info for the specified group ID.
Asserts that there is at least one group with the given ID.
Specifically checks that the group
"KafkaConsumerAuthIT"exists and is not null.
Usage Example:
AdminClient adminClient = KafkaAdminUtil.createAdminClient(kafkaService);
KafkaAdminUtil.assertGroupIsConnected("KafkaConsumerAuthIT", adminClient);
Implementation Details:
Uses JUnit assertions to verify group presence. The method usesassertDoesNotThrowto safely callgetConsumerGroupInfowithout propagating checked exceptions during assertion.Note:
The method is hardcoded to check for the group"KafkaConsumerAuthIT"regardless of thegroupIdparameter, which may be an implementation oversight or intentional test-specific behavior.
createAuthAdminClient
public static AdminClient createAuthAdminClient(KafkaService service)
Purpose: Creates a Kafka
AdminClientconfigured for SASL/PLAIN authentication to connect to a secured Kafka cluster.Parameters:
service(KafkaService): The Kafka service instance providing the bootstrap servers' address and authentication info.
Returns:
AdminClient— A Kafka AdminClient configured with SASL authentication.Usage Example:
KafkaService kafkaService = ...; // Authenticated Kafka service
AdminClient authAdminClient = KafkaAdminUtil.createAuthAdminClient(kafkaService);
// Use authAdminClient to perform admin operations with authentication
Implementation Details:
Sets the following properties on the Kafka AdminClient:bootstrap.serversto connect to the Kafka cluster.security.protocolto"SASL_PLAINTEXT"for SASL authentication over plaintext.sasl.mechanismto"PLAIN".sasl.jaas.configto a JAAS configuration string generated byContainerLocalAuthKafkaService.generateSimpleSaslJaasConfigwith hardcoded username"admin"and password"admin-secret".
This setup enables the AdminClient to authenticate against Kafka brokers requiring SASL/PLAIN.
Implementation Details and Algorithms
The utility class leverages Kafka's
AdminClientAPI to perform administrative queries asynchronously.It uses Java concurrency constructs such as
Future.get(timeout, TimeUnit)to block on results with a timeout.The SASL authentication setup relies on static JAAS config generation provided by
ContainerLocalAuthKafkaService.JUnit 5's assertion utilities are used to integrate with test frameworks, ensuring that exceptions during admin calls are properly handled in tests.
Interaction with Other System Components
KafkaService Interface:
The utility depends onKafkaServiceimplementations to provide Kafka cluster connection details, such as bootstrap servers. This allows flexibility to use different Kafka environments (e.g., local containers, remote clusters).ContainerLocalAuthKafkaService:
Used specifically increateAuthAdminClientto generate SASL JAAS configuration strings for authentication. This implies a dependency on a service that manages secured Kafka container setups.JUnit Testing Framework:
The assertions inassertGroupIsConnectedindicate that this class is primarily used in integration tests to verify Kafka consumer group connectivity.Kafka Admin API:
Directly uses Kafka's AdminClient and related classes to query consumer group metadata.
Visual Diagram: Flowchart of KafkaAdminUtil Static Methods and Their Relationships
flowchart TD
A[KafkaAdminUtil] --> B[createAdminClient(service)]
A --> C[getConsumerGroupInfo(groupId, kafkaAdminClient)]
A --> D[assertGroupIsConnected(groupId, kafkaAdminClient)]
A --> E[createAuthAdminClient(service)]
D --> C
B -->|returns| kafkaAdminClient1[AdminClient]
E -->|returns| kafkaAdminClient2[AdminClient]
C -->|returns| consumerGroupInfo[Map<String, ConsumerGroupDescription>]
classDef util fill:#f9f,stroke:#333,stroke-width:2px;
class A util
Summary
`KafkaAdminUtil.java` is a concise and focused utility class tailored to simplify Kafka Admin client creation and consumer group information retrieval in the context of Apache Camel Kafka integration tests. It provides both unauthenticated and SASL-authenticated AdminClient creation, enabling flexible testing against different Kafka cluster configurations. The class supports test assertions to verify the existence and connectivity of Kafka consumer groups, facilitating robust integration test development.
This class abstracts Kafka AdminClient boilerplate and error handling, promoting cleaner test code and easier maintenance of Kafka-related test utilities within the project.