KafkaEndpoint.java


Overview

`KafkaEndpoint.java` is a core class in the Apache Camel Kafka component, responsible for representing an endpoint that can send and receive messages to/from an Apache Kafka broker. It acts as a bridge between Camel's routing engine and Kafka's producer and consumer clients.

The class encapsulates Kafka-specific configurations and provides factory methods to create Kafka producers and consumers. It also supports advanced features such as custom Kafka client factories, manual commit strategies for consumers, and integration with Camel's executor service management for thread pool creation.

Key responsibilities include:

This class extends `DefaultEndpoint` and implements `MultipleConsumersSupport` and `EndpointServiceLocation` interfaces, integrating tightly with the Camel framework.


Class: KafkaEndpoint

public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, EndpointServiceLocation

Description

Represents a Kafka endpoint in Apache Camel, allowing routes to produce to or consume from Kafka topics.

Key Fields

Field

Type

Description

`configuration`

`KafkaConfiguration`

Holds all the Kafka-specific configuration options such as brokers, topic, client ID, etc.

`kafkaClientFactory`

`KafkaClientFactory`

Optional factory to create Kafka consumer and producer instances, allowing for custom clients.

`kafkaManualCommitFactory`

`KafkaManualCommitFactory`

Optional factory to create Kafka manual commit handlers for consumers requiring manual commits.

Constructor Summary

Constructor

Description

`KafkaEndpoint()`

Default no-arg constructor.

`KafkaEndpoint(String endpointUri, KafkaComponent component)`

Creates an endpoint with URI and parent component.

Important Methods

KafkaComponent getComponent()

String getServiceUrl()

String getServiceProtocol()

Map<String, String> getServiceMetadata()

KafkaConfiguration getConfiguration()

void setConfiguration(KafkaConfiguration configuration)

KafkaClientFactory getKafkaClientFactory()

void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)

KafkaManualCommitFactory getKafkaManualCommitFactory()

void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)

void doBuild() throws Exception

Consumer createConsumer(Processor processor) throws Exception

**Usage Example:**

Processor processor = exchange -> {
    // Processing logic
};
Consumer consumer = kafkaEndpoint.createConsumer(processor);

Producer createProducer() throws Exception

**Usage Example:**

Producer producer = kafkaEndpoint.createProducer();
// Use the producer to send messages

boolean isMultipleConsumersSupported()

<T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type)

void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type)

void updateClassProperties(Properties props)

ExecutorService createExecutor(Object source)

ExecutorService createProducerExecutor(Object source)

protected KafkaProducer createProducer(KafkaEndpoint endpoint)


Implementation Details and Algorithms


Interaction with Other Components


Usage Example

// Create Kafka endpoint
KafkaEndpoint endpoint = new KafkaEndpoint("kafka:myTopic", kafkaComponent);

// Configure endpoint
KafkaConfiguration config = new KafkaConfiguration();
config.setBrokers("localhost:9092");
config.setTopic("myTopic");
config.setClientId("myClientId");
endpoint.setConfiguration(config);

// Create producer and send message
Producer producer = endpoint.createProducer();
Exchange exchange = new DefaultExchange(camelContext);
exchange.getIn().setBody("Hello Kafka");
producer.process(exchange);

// Create consumer with a processor
Consumer consumer = endpoint.createConsumer(exchange -> {
    String body = exchange.getIn().getBody(String.class);
    System.out.println("Received: " + body);
});

Mermaid Class Diagram

classDiagram
    class KafkaEndpoint {
        - KafkaConfiguration configuration
        - KafkaClientFactory kafkaClientFactory
        - KafkaManualCommitFactory kafkaManualCommitFactory
        + KafkaEndpoint()
        + KafkaEndpoint(String endpointUri, KafkaComponent component)
        + KafkaComponent getComponent()
        + String getServiceUrl()
        + String getServiceProtocol()
        + Map<String,String> getServiceMetadata()
        + KafkaConfiguration getConfiguration()
        + void setConfiguration(KafkaConfiguration configuration)
        + KafkaClientFactory getKafkaClientFactory()
        + void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)
        + KafkaManualCommitFactory getKafkaManualCommitFactory()
        + void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)
        + void doBuild() throws Exception
        + Consumer createConsumer(Processor processor) throws Exception
        + Producer createProducer() throws Exception
        + boolean isMultipleConsumersSupported()
        + <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type)
        + void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type)
        + void updateClassProperties(Properties props)
        + ExecutorService createExecutor(Object source)
        + ExecutorService createProducerExecutor(Object source)
        # KafkaProducer createProducer(KafkaEndpoint endpoint)
    }
    
    KafkaEndpoint --|> DefaultEndpoint
    KafkaEndpoint ..|> MultipleConsumersSupport
    KafkaEndpoint ..|> EndpointServiceLocation

Summary

`KafkaEndpoint.java` is a pivotal class that integrates Apache Kafka with Apache Camel's routing framework by providing a configurable endpoint for message production and consumption. It encapsulates Kafka client creation, configuration management, and thread handling while supporting extensibility through custom factories and dynamic class loading. This design allows Camel applications to interact with Kafka brokers efficiently and flexibly within Camel routes.