KafkaComponent.java


Overview

`KafkaComponent.java` is a core component class within the Apache Camel Kafka integration module. Its primary responsibility is to serve as a factory for creating Kafka endpoints (`KafkaEndpoint`), which represent connections to Kafka topics for producing or consuming messages.

This component manages Kafka-specific configuration and its own lifecycle. It also supports advanced features: backoff strategies for consumer creation and subscription, SSL context parameters, and injection of custom factories for Kafka clients and manual commit handling.

It extends `HealthCheckComponent`, which enables health monitoring, and implements `SSLContextParametersAware`, which lets it apply SSL context configuration for secure communication with Kafka brokers.


Class: KafkaComponent

Package

`org.apache.camel.component.kafka`

Inheritance

Extends `HealthCheckComponent` and implements `SSLContextParametersAware`.

Purpose

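Represents the Kafka component in Camel. It is responsible for creating `KafkaEndpoint` instances, holding the shared `KafkaConfiguration`, and exposing the backoff, SSL, client factory, manual commit, and poll exception settings described in this document.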


Fields & Properties

| Field | Type | Description |
| --- | --- | --- |
| `configuration` | `KafkaConfiguration` | Holds the shared configuration for Kafka endpoints. |
| `useGlobalSslContextParameters` | `boolean` | Flag to indicate if global SSL context parameters should be used. |
| `kafkaManualCommitFactory` | `KafkaManualCommitFactory` | Factory for creating manual commit instances for consumers requiring manual offset commits. |
| `kafkaClientFactory` | `KafkaClientFactory` | Factory for creating Kafka clients (producers/consumers), allowing customization of clients. |
| `pollExceptionStrategy` | `PollExceptionStrategy` | Strategy for handling exceptions during Kafka polling. |
| `createConsumerBackoffMaxAttempts` | `int` | Max retry attempts for creating Kafka consumers before failing. |
| `createConsumerBackoffInterval` | `long` | Delay interval (ms) between consumer creation retry attempts. |
| `subscribeConsumerBackoffMaxAttempts` | `int` | Max retry attempts for subscribing Kafka consumers to topics before failing. |
| `subscribeConsumerBackoffInterval` | `long` | Delay interval (ms) between consumer subscription retry attempts. |
| `subscribeConsumerTopicMustExists` | `boolean` | If true, consumer subscription fails if topic doesn't exist on broker immediately. |
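The backoff fields pair a maximum attempt count with a fixed delay in milliseconds. The following is a minimal sketch of how such a pair is commonly interpreted, written in plain Java. `BackoffSketch` and `retry` are hypothetical names used only for illustration; this is not the component's actual retry code, and it assumes a positive attempt limit (how the component treats a limit of zero or less is not covered here).

```java
import java.util.function.Supplier;

// Hypothetical sketch of a fixed-interval retry loop; not Camel's implementation.
final class BackoffSketch {

    /**
     * Runs the task until it succeeds or maxAttempts is reached,
     * sleeping intervalMillis between failed attempts.
     */
    static <T> T retry(Supplier<T> task, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the delay
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated consumer creation that fails twice before succeeding.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker not reachable yet");
            }
            return "consumer-created";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In this sketch the delay is only applied between attempts, so the final failure is reported immediately instead of after one more sleep.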


Constructors

KafkaComponent()

Default constructor initializing the component without a Camel context.

KafkaComponent(CamelContext context)

Constructor initializing the component with a given `CamelContext`.


Methods

protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception

Creates a new `KafkaEndpoint` configured from the full endpoint URI (`uri`), the topic name taken from the URI (`remaining`), and the endpoint options (`parameters`).
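To make the relationship between `uri`, `remaining`, and `parameters` concrete, here is a rough sketch that splits an endpoint URI such as `kafka:myTopic?groupId=myGroup` into those parts. `EndpointUriSketch` is a hypothetical helper written for this illustration. Camel performs its own URI parsing before calling `createEndpoint`, and this sketch skips details such as URL decoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel: models how an endpoint URI is split.
final class EndpointUriSketch {

    /** Returns the part after "scheme:" and before "?", here the topic name. */
    static String remaining(String uri) {
        int colon = uri.indexOf(':');
        int query = uri.indexOf('?');
        int end = query >= 0 ? query : uri.length();
        return uri.substring(colon + 1, end);
    }

    /** Parses "k1=v1&k2=v2" after "?" into an ordered map (no URL decoding). */
    static Map<String, Object> parameters(String uri) {
        Map<String, Object> params = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0 || query == uri.length() - 1) {
            return params; // no query string present
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String uri = "kafka:myTopic?groupId=myGroup";
        System.out.println("remaining  = " + remaining(uri));
        System.out.println("parameters = " + parameters(uri));
    }
}
```

For the example URI, `remaining` yields the topic `myTopic` and `parameters` holds a single `groupId` entry.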


KafkaConfiguration getConfiguration()

Returns the shared Kafka configuration used by this component.


void setConfiguration(KafkaConfiguration configuration)

Sets the shared Kafka configuration for the component.


boolean isUseGlobalSslContextParameters()

Returns whether the component uses global SSL context parameters.


void setUseGlobalSslContextParameters(boolean useGlobalSslContextParameters)

Enables or disables the use of global SSL context parameters.


KafkaManualCommitFactory getKafkaManualCommitFactory()

Gets the factory used to create manual commit instances.


void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)

Sets a custom factory to create manual commit instances for advanced manual offset commit logic.


KafkaClientFactory getKafkaClientFactory()

Gets the factory used to create Kafka clients (producers and consumers).


void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)

Sets a custom Kafka client factory, which allows Kafka client creation to be extended or customized.


PollExceptionStrategy getPollExceptionStrategy()

Gets the strategy used for handling exceptions when polling Kafka messages.


void setPollExceptionStrategy(PollExceptionStrategy pollExceptionStrategy)

Sets a custom strategy to control how consumer polling exceptions are handled.


int getCreateConsumerBackoffMaxAttempts()

Gets the maximum number of attempts to create a Kafka consumer before failing.


void setCreateConsumerBackoffMaxAttempts(int createConsumerBackoffMaxAttempts)

Sets the maximum attempts for consumer creation retry.


long getCreateConsumerBackoffInterval()

Gets the delay interval (milliseconds) before retrying consumer creation.


void setCreateConsumerBackoffInterval(long createConsumerBackoffInterval)

Sets the delay interval (milliseconds) before retrying consumer creation.


int getSubscribeConsumerBackoffMaxAttempts()

Gets the maximum number of attempts to subscribe the consumer to a topic before failing.


void setSubscribeConsumerBackoffMaxAttempts(int subscribeConsumerBackoffMaxAttempts)

Sets the maximum attempts for consumer subscription retry.


long getSubscribeConsumerBackoffInterval()

Gets the delay interval (milliseconds) before retrying topic subscription.


void setSubscribeConsumerBackoffInterval(long subscribeConsumerBackoffInterval)

Sets the delay interval (milliseconds) before retrying topic subscription.


boolean isSubscribeConsumerTopicMustExists()

Returns whether the consumer subscription must verify the topic exists on the broker immediately.


void setSubscribeConsumerTopicMustExists(boolean subscribeConsumerTopicMustExists)

Sets the behavior to require topic existence verification during subscription.


Lifecycle Methods

protected void doInit() throws Exception

Initialization hook called during component startup.

protected void doStart() throws Exception

Start hook called during component start.


Important Implementation Details


Interaction with Other Parts of the System


Usage Example

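import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

// Create Camel context
CamelContext camelContext = new DefaultCamelContext();

// Create Kafka component and configure
KafkaComponent kafkaComponent = new KafkaComponent(camelContext);
KafkaConfiguration config = new KafkaConfiguration();
config.setBrokers("localhost:9092");
kafkaComponent.setConfiguration(config);

// Add component to Camel context
camelContext.addComponent("kafka", kafkaComponent);

// Create endpoint URI
String uri = "kafka:myTopic?groupId=myGroup";

// Resolve the endpoint through the context. createEndpoint(uri, remaining, parameters)
// is protected, so it is invoked by Camel during resolution rather than called directly.
KafkaEndpoint endpoint = camelContext.getEndpoint(uri, KafkaEndpoint.class);

// Use endpoint as needed in routes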

Mermaid Class Diagram

classDiagram
    class KafkaComponent {
        -KafkaConfiguration configuration
        -boolean useGlobalSslContextParameters
        -KafkaManualCommitFactory kafkaManualCommitFactory
        -KafkaClientFactory kafkaClientFactory
        -PollExceptionStrategy pollExceptionStrategy
        -int createConsumerBackoffMaxAttempts
        -long createConsumerBackoffInterval
        -int subscribeConsumerBackoffMaxAttempts
        -long subscribeConsumerBackoffInterval
        -boolean subscribeConsumerTopicMustExists
        +KafkaComponent()
        +KafkaComponent(CamelContext)
        #KafkaEndpoint createEndpoint(String uri, String remaining, Map parameters)
        +KafkaConfiguration getConfiguration()
        +void setConfiguration(KafkaConfiguration configuration)
        +boolean isUseGlobalSslContextParameters()
        +void setUseGlobalSslContextParameters(boolean flag)
        +KafkaManualCommitFactory getKafkaManualCommitFactory()
        +void setKafkaManualCommitFactory(KafkaManualCommitFactory factory)
        +KafkaClientFactory getKafkaClientFactory()
        +void setKafkaClientFactory(KafkaClientFactory factory)
        +PollExceptionStrategy getPollExceptionStrategy()
        +void setPollExceptionStrategy(PollExceptionStrategy strategy)
        +int getCreateConsumerBackoffMaxAttempts()
        +void setCreateConsumerBackoffMaxAttempts(int attempts)
        +long getCreateConsumerBackoffInterval()
        +void setCreateConsumerBackoffInterval(long interval)
        +int getSubscribeConsumerBackoffMaxAttempts()
        +void setSubscribeConsumerBackoffMaxAttempts(int attempts)
        +long getSubscribeConsumerBackoffInterval()
        +void setSubscribeConsumerBackoffInterval(long interval)
        +boolean isSubscribeConsumerTopicMustExists()
        +void setSubscribeConsumerTopicMustExists(boolean flag)
        #void doInit()
        #void doStart()
    }
    
    KafkaComponent --|> HealthCheckComponent
    KafkaComponent ..|> SSLContextParametersAware

Summary

`KafkaComponent` serves as the foundational entry point for Kafka integration in Apache Camel. It abstracts Kafka client and endpoint creation and provides a rich set of configuration options for secure, robust, and customizable Kafka messaging, including advanced features like manual commits, backoff retry policies, and exception strategies. This class tightly integrates with Camel’s lifecycle and property binding mechanisms, enabling seamless Kafka connectivity within Camel routes and applications.