KafkaComponent.java
Overview
`KafkaComponent.java` is a core component class within the Apache Camel Kafka integration module. Its primary responsibility is to serve as a factory for creating Kafka endpoints (`KafkaEndpoint`), which represent connections to Kafka topics for producing or consuming messages.
This component manages Kafka-specific configurations, lifecycle management, and advanced features such as backoff strategies for consumer creation and subscription, SSL context parameter usage, and custom factory injection for Kafka clients and manual commit handling.
It extends `HealthCheckComponent`, enabling health monitoring capabilities, and implements `SSLContextParametersAware` to integrate SSL context configuration, enhancing secure communication with Kafka brokers.
Class: KafkaComponent
Package
`org.apache.camel.component.kafka`
Inheritance
Extends:
HealthCheckComponentImplements:
SSLContextParametersAware
Purpose
Represents the Kafka component in Camel, responsible for:
Creating and configuring Kafka endpoints.
Managing component-level Kafka configurations.
Supporting SSL security parameters.
Allowing advanced customization via factories and exception strategies.
Handling retry/backoff policies for robust Kafka consumer lifecycle management.
Fields & Properties
Field | Type | Description |
|---|---|---|
`configuration` | `KafkaConfiguration` | Holds the shared configuration for Kafka endpoints. |
`useGlobalSslContextParameters` | `boolean` | Flag to indicate if global SSL context parameters should be used. |
`kafkaManualCommitFactory` | `KafkaManualCommitFactory` | Factory for creating manual commit instances for consumers requiring manual offset commits. |
`kafkaClientFactory` | `KafkaClientFactory` | Factory for creating Kafka clients (producers/consumers), allowing customization of clients. |
`pollExceptionStrategy` | `PollExceptionStrategy` | Strategy for handling exceptions during Kafka polling. |
`createConsumerBackoffMaxAttempts` | `int` | Max retry attempts for creating Kafka consumers before failing. |
`createConsumerBackoffInterval` | `long` | Delay interval (ms) between consumer creation retry attempts. |
`subscribeConsumerBackoffMaxAttempts` | `int` | Max retry attempts for subscribing Kafka consumers to topics before failing. |
`subscribeConsumerBackoffInterval` | `long` | Delay interval (ms) between consumer subscription retry attempts. |
`subscribeConsumerTopicMustExists` | `boolean` | If true, consumer subscription fails if topic doesn't exist on broker immediately. |
Constructors
KafkaComponent()
Default constructor initializing the component without a Camel context.
KafkaComponent(CamelContext context)
Constructor initializing the component with a given `CamelContext`.
Methods
protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception
Creates a new `KafkaEndpoint` instance configured with the specified URI, topic, and parameters.
Parameters:
uri- Full endpoint URI (e.g.,"kafka:my-topic").remaining- The part of the URI after the scheme, typically the Kafka topic name.parameters- Map of additional configuration properties for the endpoint.
Returns:
A fully configuredKafkaEndpointinstance.Throws:
IllegalArgumentExceptionif the topic is missing in the URI.Usage Example:
KafkaComponent kafkaComponent = new KafkaComponent(camelContext); KafkaEndpoint endpoint = kafkaComponent.createEndpoint("kafka:myTopic", "myTopic", paramsMap);Implementation Details:
Validates that the topic name is provided in the URI.
Extracts any
additionalProperties.*from the parameters and binds them.Copies the component-level configuration for reuse.
Applies SSL context parameters if not explicitly set on the endpoint.
Ensures the topic is set on the endpoint configuration if missing after property binding.
KafkaConfiguration getConfiguration()
Returns the shared Kafka configuration used by this component.
void setConfiguration(KafkaConfiguration configuration)
Sets the shared Kafka configuration for the component.
Usage:
Used to pre-configure common Kafka options that endpoints inherit.
boolean isUseGlobalSslContextParameters()
Returns whether the component uses global SSL context parameters.
void setUseGlobalSslContextParameters(boolean useGlobalSslContextParameters)
Enables or disables the use of global SSL context parameters.
KafkaManualCommitFactory getKafkaManualCommitFactory()
Gets the factory used to create manual commit instances.
void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory)
Sets a custom factory to create manual commit instances for advanced manual offset commit logic.
KafkaClientFactory getKafkaClientFactory()
Gets the factory used to create Kafka clients (producers and consumers).
void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory)
Sets a custom Kafka client factory allowing to extend or customize Kafka client creation.
PollExceptionStrategy getPollExceptionStrategy()
Gets the strategy used for handling exceptions when polling Kafka messages.
void setPollExceptionStrategy(PollExceptionStrategy pollExceptionStrategy)
Sets a custom strategy to control how consumer polling exceptions are handled.
int getCreateConsumerBackoffMaxAttempts()
Gets the maximum number of attempts to create a Kafka consumer before failing.
void setCreateConsumerBackoffMaxAttempts(int createConsumerBackoffMaxAttempts)
Sets the maximum attempts for consumer creation retry.
long getCreateConsumerBackoffInterval()
Gets the delay interval (milliseconds) before retrying consumer creation.
void setCreateConsumerBackoffInterval(long createConsumerBackoffInterval)
Sets the delay interval (milliseconds) before retrying consumer creation.
int getSubscribeConsumerBackoffMaxAttempts()
Gets the maximum number of attempts to subscribe the consumer to a topic before failing.
void setSubscribeConsumerBackoffMaxAttempts(int subscribeConsumerBackoffMaxAttempts)
Sets the maximum attempts for consumer subscription retry.
long getSubscribeConsumerBackoffInterval()
Gets the delay interval (milliseconds) before retrying topic subscription.
void setSubscribeConsumerBackoffInterval(long subscribeConsumerBackoffInterval)
Sets the delay interval (milliseconds) before retrying topic subscription.
boolean isSubscribeConsumerTopicMustExists()
Returns whether the consumer subscription must verify the topic exists on the broker immediately.
void setSubscribeConsumerTopicMustExists(boolean subscribeConsumerTopicMustExists)
Sets the behavior to require topic existence verification during subscription.
Lifecycle Methods
protected void doInit() throws Exception
Initialization hook called during component startup.
Ensures default Kafka client factory is created if none was autowired.
Warns if manual commit is allowed but no factory is set.
protected void doStart() throws Exception
Start hook called during component start.
Binds additional properties in the configuration with support for Camel property placeholders such as
#beanor#class.
Important Implementation Details
Endpoint Creation:
The component uses the URI scheme"kafka:<topic>"to create endpoints. The topic is mandatory and must be specified either in the URI or configuration.Configuration Copying:
Each endpoint receives a copy of the component's shared configuration, ensuring endpoint-level customization does not affect the component-wide defaults.Additional Properties:
Supports dynamic binding of additional Kafka client properties viaadditionalProperties.*parameters.SSL Context Integration:
Supports global SSL context parameters to secure Kafka connections. If an endpoint does not specify SSL parameters, it inherits the global ones.Backoff and Retry Strategies:
Configurable retry counts and intervals are available for consumer creation and subscription to handle transient errors (e.g., DNS issues or broker unavailability).Manual Commit Support:
Supports manual commit of Kafka offsets with a pluggable factory, allowing customized commit behavior.Health Checking:
ExtendsHealthCheckComponentto provide health checks for Kafka connectivity and operations.
Interaction with Other Parts of the System
KafkaEndpoint:
KafkaComponentcreates and configures instances ofKafkaEndpoint, which encapsulate Kafka producer/consumer behavior.KafkaConfiguration:
Holds all configuration options; shared across endpoints but can be overridden per endpoint.KafkaClientFactory:
Provides Kafka client instances (KafkaConsumerandKafkaProducer), allowing customization or extension of clients.KafkaManualCommitFactory:
Allows plugging in customized manual commit logic for Kafka consumers.PollExceptionStrategy:
Defines how exceptions during polling are handled, enabling custom error handling strategies.CamelContext:
Integrates with Apache Camel’s context for property binding, dependency injection, and lifecycle management.
Usage Example
// Create Camel context
CamelContext camelContext = new DefaultCamelContext();
// Create Kafka component and configure
KafkaComponent kafkaComponent = new KafkaComponent(camelContext);
KafkaConfiguration config = new KafkaConfiguration();
config.setBrokers("localhost:9092");
kafkaComponent.setConfiguration(config);
// Add component to Camel context
camelContext.addComponent("kafka", kafkaComponent);
// Create endpoint URI
String uri = "kafka:myTopic?groupId=myGroup";
// Create endpoint
KafkaEndpoint endpoint = (KafkaEndpoint) kafkaComponent.createEndpoint(uri, "myTopic", new HashMap<>());
// Use endpoint as needed in routes
Mermaid Class Diagram
classDiagram
class KafkaComponent {
-KafkaConfiguration configuration
-boolean useGlobalSslContextParameters
-KafkaManualCommitFactory kafkaManualCommitFactory
-KafkaClientFactory kafkaClientFactory
-PollExceptionStrategy pollExceptionStrategy
-int createConsumerBackoffMaxAttempts
-long createConsumerBackoffInterval
-int subscribeConsumerBackoffMaxAttempts
-long subscribeConsumerBackoffInterval
-boolean subscribeConsumerTopicMustExists
+KafkaComponent()
+KafkaComponent(CamelContext)
#KafkaEndpoint createEndpoint(String uri, String remaining, Map parameters)
+KafkaConfiguration getConfiguration()
+void setConfiguration(KafkaConfiguration configuration)
+boolean isUseGlobalSslContextParameters()
+void setUseGlobalSslContextParameters(boolean flag)
+KafkaManualCommitFactory getKafkaManualCommitFactory()
+void setKafkaManualCommitFactory(KafkaManualCommitFactory factory)
+KafkaClientFactory getKafkaClientFactory()
+void setKafkaClientFactory(KafkaClientFactory factory)
+PollExceptionStrategy getPollExceptionStrategy()
+void setPollExceptionStrategy(PollExceptionStrategy strategy)
+int getCreateConsumerBackoffMaxAttempts()
+void setCreateConsumerBackoffMaxAttempts(int attempts)
+long getCreateConsumerBackoffInterval()
+void setCreateConsumerBackoffInterval(long interval)
+int getSubscribeConsumerBackoffMaxAttempts()
+void setSubscribeConsumerBackoffMaxAttempts(int attempts)
+long getSubscribeConsumerBackoffInterval()
+void setSubscribeConsumerBackoffInterval(long interval)
+boolean isSubscribeConsumerTopicMustExists()
+void setSubscribeConsumerTopicMustExists(boolean flag)
#void doInit()
#void doStart()
}
KafkaComponent --|> HealthCheckComponent
KafkaComponent ..|> SSLContextParametersAware
Summary
`KafkaComponent` serves as the foundational entry point for Kafka integration in Apache Camel. It abstracts Kafka client and endpoint creation and provides a rich set of configuration options for secure, robust, and customizable Kafka messaging, including advanced features like manual commits, backoff retry policies, and exception strategies. This class tightly integrates with Camel’s lifecycle and property binding mechanisms, enabling seamless Kafka connectivity within Camel routes and applications.