KafkaEndpoint.java
Overview
`KafkaEndpoint.java` is a core class in the Apache Camel Kafka component, responsible for representing an endpoint that can send and receive messages to/from an Apache Kafka broker. It acts as a bridge between Camel's routing engine and Kafka's producer and consumer clients.
The class encapsulates Kafka-specific configurations and provides factory methods to create Kafka producers and consumers. It also supports advanced features such as custom Kafka client factories, manual commit strategies for consumers, and integration with Camel's executor service management for thread pool creation.
Key responsibilities include:
Managing Kafka configuration parameters.
Creating and configuring Kafka producers and consumers.
Supporting multiple consumers on the same endpoint.
Providing metadata about the Kafka service (broker URLs, client IDs).
Handling class loading for Kafka serializers, deserializers, partitioners, and SASL callback handlers.
This class extends `DefaultEndpoint` and implements `MultipleConsumersSupport` and `EndpointServiceLocation` interfaces, integrating tightly with the Camel framework.
Class: KafkaEndpoint
public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, EndpointServiceLocation
Description
Represents a Kafka endpoint in Apache Camel, allowing routes to produce to or consume from Kafka topics.
Key Fields
| Field | Type | Description |
|---|---|---|
| `configuration` | `KafkaConfiguration` | Holds all the Kafka-specific configuration options such as brokers, topic, client ID, etc. |
| `kafkaClientFactory` | `KafkaClientFactory` | Optional factory to create Kafka consumer and producer instances, allowing for custom clients. |
| `kafkaManualCommitFactory` | `KafkaManualCommitFactory` | Optional factory to create Kafka manual commit handlers for consumers requiring manual commits. |
Constructor Summary
| Constructor | Description |
|---|---|
| `KafkaEndpoint()` | Default no-arg constructor. |
| `KafkaEndpoint(String endpointUri, KafkaComponent component)` | Creates an endpoint with URI and parent component. |
Important Methods
KafkaComponent getComponent()
Returns the parent `KafkaComponent` for this endpoint.
Overrides to cast from the generic `Component` type.
String getServiceUrl()
Returns the Kafka brokers URL(s) configured for this endpoint.
Used for service location purposes.
String getServiceProtocol()
Returns the protocol string `"kafka"`.
Map<String, String> getServiceMetadata()
Returns metadata about the service, such as the client ID if configured.
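As a rough illustration of the three service-location methods above, the sketch below models an endpoint that reports its broker list, the `kafka` protocol, and a client ID entry only when one is configured. `ServiceLocationSketch`, the empty-map fallback, and the `clientId` map key are assumptions made for this example; they are not taken from the Camel source.

```java
import java.util.Map;

// Hypothetical stand-in for the endpoint's service-location methods (not a Camel class).
final class ServiceLocationSketch {
    private final String brokers;
    private final String clientId; // null when no client ID is configured

    ServiceLocationSketch(String brokers, String clientId) {
        this.brokers = brokers;
        this.clientId = clientId;
    }

    // The broker list doubles as the service URL.
    String getServiceUrl() {
        return brokers;
    }

    String getServiceProtocol() {
        return "kafka";
    }

    // Assumption: metadata carries the client ID only when it is set; the key name is illustrative.
    Map<String, String> getServiceMetadata() {
        return clientId != null ? Map.of("clientId", clientId) : Map.of();
    }
}
```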
KafkaConfiguration getConfiguration()
Returns the current Kafka configuration object.
void setConfiguration(KafkaConfiguration configuration)
Sets the Kafka configuration.
KafkaClientFactory getKafkaClientFactory()
Returns the Kafka client factory if set.
void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)
Sets a custom Kafka client factory to override the default Kafka clients.
KafkaManualCommitFactory getKafkaManualCommitFactory()
Returns the factory used to create manual commit handlers.
void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)
Sets a custom manual commit factory.
void doBuild() throws Exception
Lifecycle method called during endpoint build.
Ensures that if the client factories are not set, they are inherited from the parent component.
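The fallback rule described for `doBuild()` can be sketched with plain Java stand-ins. `ClientFactorySketch`, `ComponentSketch`, and `EndpointSketch` are hypothetical names used only to model the rule "keep a factory set on the endpoint, otherwise inherit the component's"; the real classes carry more state.

```java
// Hypothetical stand-ins: these are not the Camel classes, only a model of the fallback rule.
interface ClientFactorySketch {
    String name();
}

final class ComponentSketch {
    ClientFactorySketch clientFactory;
}

final class EndpointSketch {
    private final ComponentSketch component;
    ClientFactorySketch clientFactory; // null until set explicitly or inherited

    EndpointSketch(ComponentSketch component) {
        this.component = component;
    }

    // Keep a factory that was set on the endpoint; otherwise inherit the component's.
    void doBuild() {
        if (clientFactory == null) {
            clientFactory = component.clientFactory;
        }
    }
}
```

An endpoint-level factory therefore always wins over the component-level one, which is what makes per-endpoint customization possible.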
Consumer createConsumer(Processor processor) throws Exception
Creates a new `KafkaConsumer` bound to this endpoint and configured with the provided `Processor`.
Configures the consumer with endpoint settings.
**Usage Example:**
```java
Processor processor = exchange -> {
    // Processing logic
};
Consumer consumer = kafkaEndpoint.createConsumer(processor);
```
Producer createProducer() throws Exception
Creates a new `KafkaProducer` for this endpoint.
If configured for synchronous operation, wraps the producer in a `SynchronousDelegateProducer`.
**Usage Example:**
```java
Producer producer = kafkaEndpoint.createProducer();
// Use the producer to send messages
```
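To make the idea of synchronous wrapping concrete, here is a minimal sketch of a delegating wrapper that blocks until an asynchronous send has finished. `AsyncSenderSketch` and `SyncDelegateSketch` are hypothetical types built on `CompletableFuture`; this models the pattern only and is not how `SynchronousDelegateProducer` is implemented.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical asynchronous sender contract (not Camel's Producer interface).
interface AsyncSenderSketch {
    CompletableFuture<Void> send(String body);
}

// Delegating wrapper: the caller's thread blocks until the wrapped send has completed.
final class SyncDelegateSketch {
    private final AsyncSenderSketch delegate;

    SyncDelegateSketch(AsyncSenderSketch delegate) {
        this.delegate = delegate;
    }

    void send(String body) {
        // join() waits for completion and rethrows any failure as an unchecked exception.
        delegate.send(body).join();
    }
}
```

The wrapper leaves the underlying sender untouched, so the same sender can serve both asynchronous and synchronous callers.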
boolean isMultipleConsumersSupported()
Returns `true`, indicating that this endpoint supports multiple concurrent consumers.
<T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type)
Utility method that resolves a `Class` from an object that is either a `Class` already or a `String` holding the class name.
Tries multiple class loaders for resolution.
Used primarily to load Kafka serializer/deserializer classes dynamically.
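The resolution strategy described above can be approximated with the JDK alone. In this sketch a list of `ClassLoader` instances stands in for Camel's `ClassResolver`, and `ClassLoadingSketch` is a hypothetical helper; the real method's fallback order and return type may differ.

```java
import java.util.List;

// Hypothetical helper: a list of class loaders stands in for Camel's ClassResolver.
final class ClassLoadingSketch {

    // Returns the Class for `o`, which may already be a Class or may be a class name.
    // Returns null when `o` is null or no loader can find the name.
    // asSubclass throws ClassCastException if the class is not a subtype of `type`.
    static <T> Class<? extends T> loadClass(Object o, List<ClassLoader> loaders, Class<T> type) {
        if (o == null) {
            return null;
        }
        if (o instanceof Class) {
            return ((Class<?>) o).asSubclass(type);
        }
        String name = o.toString();
        for (ClassLoader loader : loaders) {
            try {
                // Do not initialize the class; only locate it.
                return Class.forName(name, false, loader).asSubclass(type);
            } catch (ClassNotFoundException e) {
                // Not visible to this loader; try the next one.
            }
        }
        return null;
    }
}
```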
void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type)
Helper to replace a property value (string or class) in Kafka client properties with an actual `Class` instance.
void updateClassProperties(Properties props)
Updates Kafka client properties by resolving configured serializer, deserializer, partitioner, and SASL callback handler classes.
Uses Camel's `ClassResolver` for flexible class loading.
Exceptions during resolution are logged but otherwise ignored, so that the Kafka client's own error handling can report the problem.
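The replace-and-ignore behavior can be sketched as follows. `ClassPropertiesSketch` is a hypothetical helper that uses a single `ClassLoader` in place of Camel's `ClassResolver` and writes to `System.err` where the real code would use a logger; `key.serializer` and `value.serializer` are standard Kafka property names, while the class names in the usage note are JDK stand-ins.

```java
import java.util.Properties;

// Hypothetical helper modelling "replace a class name with its Class, ignore failures".
final class ClassPropertiesSketch {

    // Replaces a String class name stored under `key` with the resolved Class object.
    static void replaceWithClass(Properties props, String key, ClassLoader loader, Class<?> type) {
        Object value = props.get(key);
        if (!(value instanceof String)) {
            return; // absent, or already a Class
        }
        try {
            Class<?> resolved = Class.forName((String) value, false, loader);
            if (type.isAssignableFrom(resolved)) {
                props.put(key, resolved);
            }
        } catch (ClassNotFoundException e) {
            // Keep the original value so whoever consumes the properties can report the problem.
            System.err.println("Could not resolve " + value + " for " + key);
        }
    }
}
```

For example, a property set to `"java.lang.StringBuilder"` with `CharSequence.class` as the expected type ends up holding `StringBuilder.class`, whereas an unknown class name is left as the original string.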
ExecutorService createExecutor(Object source)
Creates a fixed thread pool executor for the Kafka consumer using Camel's executor service manager.
Thread pool size is based on the configured number of consumers.
ExecutorService createProducerExecutor(Object source)
Creates a thread pool executor for Kafka producers based on configured core and max pool sizes.
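The difference between the two executors can be shown with plain `java.util.concurrent` classes. This sketch does not use Camel's `ExecutorServiceManager`; `ExecutorSketch`, the 60-second keep-alive, and the queue capacity of 1000 are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of the two executors; Camel's ExecutorServiceManager is not used here.
final class ExecutorSketch {

    // Consumer side: one thread per configured consumer, so the pool size is fixed.
    static ExecutorService createExecutor(int consumersCount) {
        return Executors.newFixedThreadPool(consumersCount);
    }

    // Producer side: the pool starts at coreSize and grows toward maxSize once the queue is full.
    // Keep-alive and queue capacity are illustrative assumptions.
    static ExecutorService createProducerExecutor(int coreSize, int maxSize) {
        return new ThreadPoolExecutor(coreSize, maxSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000));
    }
}
```

Both pools should be shut down when the owning consumer or producer stops; in Camel that lifecycle is handled through the executor service manager rather than by calling `shutdown()` directly.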
protected KafkaProducer createProducer(KafkaEndpoint endpoint)
Factory method to instantiate a `KafkaProducer`.
Can be overridden by subclasses for custom producer creation.
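A subclass hook of this kind follows the usual factory-method pattern, sketched below with hypothetical stand-in classes (`ProducerSketch`, `EndpointFactorySketch`, `AuditingEndpointSketch`). None of these names exist in Camel; they only show how overriding the protected method changes what the public entry point returns.

```java
// Hypothetical stand-ins modelling a protected, overridable factory method.
class ProducerSketch {
    String describe() {
        return "default producer";
    }
}

class EndpointFactorySketch {
    // Public entry point delegates to the overridable factory method.
    final ProducerSketch newProducer() {
        return createProducer(this);
    }

    protected ProducerSketch createProducer(EndpointFactorySketch endpoint) {
        return new ProducerSketch();
    }
}

// A subclass swaps in its own producer without touching the public entry point.
class AuditingEndpointSketch extends EndpointFactorySketch {
    @Override
    protected ProducerSketch createProducer(EndpointFactorySketch endpoint) {
        return new ProducerSketch() {
            @Override
            String describe() {
                return "auditing producer";
            }
        };
    }
}
```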
Implementation Details and Algorithms
Dynamic Class Loading: The endpoint supports dynamic resolution of Kafka-related classes (serializers, deserializers, partitioners, SASL handlers) using Camel's `ClassResolver`. This allows users to specify class names as strings in configuration, and the endpoint will load and inject proper `Class` objects into Kafka properties.
Custom Factories: Through `KafkaClientFactory` and `KafkaManualCommitFactory`, the implementation supports dependency injection of custom Kafka clients and manual commit logic. This enables extension and customization of Kafka client behavior without modifying the endpoint itself.
Thread Pool Management: Thread pools for consumers and producers are created through Camel's `ExecutorServiceManager`, allowing centralized lifecycle management and configuration consistency across Camel routes.
Synchronous Producer Wrapping: If the configuration specifies synchronous operation, the Kafka producer is wrapped in `SynchronousDelegateProducer` to ensure synchronous message sending semantics.
Interaction with Other Components
KafkaComponent: The `KafkaEndpoint` is created and managed by `KafkaComponent`, which acts as the factory and configuration holder for Kafka endpoints in Camel.
KafkaConfiguration: Holds all Kafka-specific configuration options, injected into the endpoint.
KafkaProducer and KafkaConsumer: Endpoint creates instances of these classes which implement Camel's `Producer` and `Consumer` interfaces, respectively. They handle the actual Kafka client interactions.
CamelContext: Used for resolving classes and creating executors, integrating the endpoint tightly with the Camel runtime environment.
KafkaManualCommit: Used for manual offset commit logic in Kafka consumers when enabled.
Usage Example
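```java
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.support.DefaultExchange;

// Assumes existing `kafkaComponent` (KafkaComponent) and `camelContext` (CamelContext) instances.

// Create Kafka endpoint
KafkaEndpoint endpoint = new KafkaEndpoint("kafka:myTopic", kafkaComponent);

// Configure endpoint
KafkaConfiguration config = new KafkaConfiguration();
config.setBrokers("localhost:9092");
config.setTopic("myTopic");
config.setClientId("myClientId");
endpoint.setConfiguration(config);

// Create producer, start it, and send a message
Producer producer = endpoint.createProducer();
producer.start();
Exchange exchange = new DefaultExchange(camelContext);
exchange.getIn().setBody("Hello Kafka");
producer.process(exchange);

// Create consumer with a processor and start it
// (the lambda parameter must not reuse the name of the local `exchange` variable)
Consumer consumer = endpoint.createConsumer(received -> {
    String body = received.getIn().getBody(String.class);
    System.out.println("Received: " + body);
});
consumer.start();
```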
Mermaid Class Diagram
```mermaid
classDiagram
    class KafkaEndpoint {
        - KafkaConfiguration configuration
        - KafkaClientFactory kafkaClientFactory
        - KafkaManualCommitFactory kafkaManualCommitFactory
        + KafkaEndpoint()
        + KafkaEndpoint(String endpointUri, KafkaComponent component)
        + KafkaComponent getComponent()
        + String getServiceUrl()
        + String getServiceProtocol()
        + Map<String,String> getServiceMetadata()
        + KafkaConfiguration getConfiguration()
        + void setConfiguration(KafkaConfiguration configuration)
        + KafkaClientFactory getKafkaClientFactory()
        + void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)
        + KafkaManualCommitFactory getKafkaManualCommitFactory()
        + void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)
        + void doBuild() throws Exception
        + Consumer createConsumer(Processor processor) throws Exception
        + Producer createProducer() throws Exception
        + boolean isMultipleConsumersSupported()
        + <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type)
        + void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type)
        + void updateClassProperties(Properties props)
        + ExecutorService createExecutor(Object source)
        + ExecutorService createProducerExecutor(Object source)
        # KafkaProducer createProducer(KafkaEndpoint endpoint)
    }
    KafkaEndpoint --|> DefaultEndpoint
    KafkaEndpoint ..|> MultipleConsumersSupport
    KafkaEndpoint ..|> EndpointServiceLocation
```
Summary
`KafkaEndpoint.java` is a pivotal class that integrates Apache Kafka with Apache Camel's routing framework by providing a configurable endpoint for message production and consumption. It encapsulates Kafka client creation, configuration management, and thread handling while supporting extensibility through custom factories and dynamic class loading. This design allows Camel applications to interact with Kafka brokers efficiently and flexibly within Camel routes.