KafkaTestUtil.java
Overview
`KafkaTestUtil.java` is a utility class designed to facilitate integration testing with Apache Kafka in the context of the Apache Camel Kafka component. It provides helper methods for configuring Kafka clients and components, managing Kafka topics, and setting up test environments that interact with Kafka clusters. This class is primarily used in integration tests to abstract and simplify repetitive Kafka setup tasks such as creating admin clients, configuring producer properties, and creating topics with specific partition counts.
This utility class encapsulates key Kafka administrative functions and Camel component configurations, enabling consistent and reliable test setups when working with Kafka within the Apache Camel testing infrastructure.
Classes and Methods
Class: KafkaTestUtil
A final utility class with static methods and constants. It cannot be instantiated or extended.
Constants
Constant Name | Type | Description |
|---|---|---|
`MOCK_RESULT` | String | Endpoint name `"mock:result"` used in tests as a mock consumer endpoint. |
`MOCK_RESULT_BAR` | String | Endpoint name `"mock:resultBar"` for alternate mock consumers. |
`MOCK_DLQ` | String | Endpoint name `"mock:dlq"` representing a dead letter queue in tests. |
Methods
private KafkaTestUtil()
Description: Private constructor to prevent instantiation of this utility class.
public static void setServiceProperties(KafkaService service)
Purpose: Sets a system property
bootstrapServerswith the bootstrap server addresses of the provided KafkaService instance.Parameters:
service: An instance ofKafkaService, representing a Kafka cluster for testing.
Returns:
voidUsage Example:
KafkaTestUtil.setServiceProperties(kafkaService); String bootstrap = System.getProperty("bootstrapServers");Details: Logs the Kafka cluster broker list and sets the system property for use in tests or components that rely on this property.
public static AdminClient createAdminClient(KafkaService service)
Purpose: Creates and returns a Kafka
AdminClientconnected to the Kafka cluster.Parameters:
service:KafkaServiceinstance representing the Kafka cluster.
Returns:
AdminClientinstance configured with the Kafka bootstrap servers.Usage Example:
AdminClient adminClient = KafkaTestUtil.createAdminClient(kafkaService);Details: Uses the
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIGproperty to point to the Kafka cluster. The returnedAdminClientcan perform cluster and topic administrative operations.
public static Properties getDefaultProperties(String bootstrapService)
Purpose: Generates a default set of Kafka producer properties for connecting to a given Kafka bootstrap server.
Parameters:
bootstrapService:Stringspecifying Kafka bootstrap server addresses.
Returns:
Propertiesobject with default Kafka producer configurations.Usage Example:
Properties props = KafkaTestUtil.getDefaultProperties("localhost:9092");Details:
Sets key and value serializers to Kafka's default serializer.
Sets
acksconfiguration to"1"for leader acknowledgment.Logs the connection target.
public static Properties getDefaultProperties(KafkaService service)
Purpose: Overloaded method that returns default Kafka producer properties using a
KafkaServiceinstance.Parameters:
service:KafkaServiceinstance.
Returns:
Propertiesconfigured for the Kafka bootstrap servers provided by the service.Usage Example:
Properties props = KafkaTestUtil.getDefaultProperties(kafkaService);Details: Calls the above method, passing the bootstrap servers obtained from the
KafkaService.
public static void configureKafkaComponent(CamelContext context, String bootstrapServers)
Purpose: Configures and adds a Kafka component to the provided Apache Camel context, pointing it to the Kafka cluster.
Parameters:
context:CamelContextinstance where the Kafka component will be registered.bootstrapServers:StringKafka bootstrap server addresses.
Returns:
voidUsage Example:
KafkaTestUtil.configureKafkaComponent(camelContext, "localhost:9092");Details:
Sets the properties component location for Camel to
"ref:prop".Creates and initializes a
KafkaComponent.Sets the brokers list on the Kafka component configuration.
Adds the configured Kafka component to the Camel context under the name
"kafka".Logs the bootstrap address being configured.
public static void createTopic(KafkaService service, String topic, int numPartitions)
Purpose: Creates a Kafka topic with a specified number of partitions, verifying its creation.
Parameters:
service:KafkaServiceinstance to connect to Kafka.topic:Stringname of the topic to create.numPartitions:intnumber of partitions to create for the topic.
Returns:
voidUsage Example:
KafkaTestUtil.createTopic(kafkaService, "myTopic", 3);Details:
Uses
AdminClientto create a topic with no replication (replication factor set to 0 usingCreateTopicsRequest.NO_REPLICATION_FACTOR).After topic creation, retrieves the topic description asynchronously.
Waits up to 5 seconds for the topic description to verify the number of partitions.
Uses JUnit assertions to fail the test if the partition count does not match or if an exception occurs.
Implementation Details and Algorithms
AdminClient Creation: Uses Kafka's
KafkaAdminClient.create(Properties)to create administrative clients for topic management.Topic Creation: Uses
NewTopicclass with specified partitions and no replication factor (likely because the test Kafka cluster runs with a single broker or for simplicity).Verification of Topic: Verifies the topic creation synchronously by waiting on a
KafkaFuturewith a timeout.Camel Kafka Component Configuration: Instantiates and initializes the Kafka component programmatically, which is important for integration tests to ensure the component uses the test Kafka cluster bootstrap addresses.
Logging: Uses SLF4J logger to provide runtime information about bootstrap servers and configuration steps.
Interaction with Other System Components
KafkaService: This class depends on
KafkaService, which represents an embedded or test Kafka cluster instance, providing the bootstrap server address and Kafka lifecycle management. It interacts with this class to obtain connection details.Apache Camel: The utility configures
KafkaComponentinstances in the Camel context, allowing Camel routes and tests to produce and consume Kafka messages.Kafka Clients (AdminClient and Producer): Uses Kafka client APIs (
AdminClient,NewTopic,TopicDescription) to perform cluster administration and topic management as part of the test setup.JUnit: Uses JUnit Jupiter assertions (
assertEquals,fail) to verify conditions during topic creation, ensuring test failures reflect Kafka setup issues early.Logging Framework: Uses SLF4J for logging key steps and debugging information.
Usage Scenario Example
A typical usage scenario in an integration test might look like this:
KafkaService kafkaService = ...; // Initialized test Kafka cluster
KafkaTestUtil.setServiceProperties(kafkaService);
Properties producerProps = KafkaTestUtil.getDefaultProperties(kafkaService);
KafkaTestUtil.createTopic(kafkaService, "test-topic", 3);
CamelContext context = new DefaultCamelContext();
KafkaTestUtil.configureKafkaComponent(context, kafkaService.getBootstrapServers());
// Camel routes can now use "kafka:test-topic" with the configured Kafka component and properties.
Mermaid Diagram
The following diagram illustrates the structure of the `KafkaTestUtil` utility class, focusing on its static methods and constants.
classDiagram
class KafkaTestUtil {
<<utility>>
+String MOCK_RESULT
+String MOCK_RESULT_BAR
+String MOCK_DLQ
+void setServiceProperties(KafkaService service)
+AdminClient createAdminClient(KafkaService service)
+Properties getDefaultProperties(String bootstrapService)
+Properties getDefaultProperties(KafkaService service)
+void configureKafkaComponent(CamelContext context, String bootstrapServers)
+void createTopic(KafkaService service, String topic, int numPartitions)
}
Summary
`KafkaTestUtil.java` is a focused utility class tailored for Kafka integration testing within Apache Camel. It abstracts Kafka cluster connection setup, topic creation, and Camel Kafka component configuration, enabling test code to be cleaner, more maintainable, and less error-prone. Its use of Kafka AdminClient and Camel APIs demonstrates best practices in automated Kafka environment setup for reliable and repeatable integration tests.