KafkaConfiguration.java
Overview
`KafkaConfiguration` is a central configuration class within the Apache Camel Kafka component. It encapsulates all configurable properties related to Kafka producers and consumers used in Camel routes. This class provides a comprehensive set of options to customize Kafka client behavior, including connection details, serialization, security (SSL and SASL), consumer behavior, producer batching, retries, and more.
The class serves as a bridge between Camel and Kafka client configurations, enabling fine-grained control over Kafka interactions from Camel routes. It can build `java.util.Properties` objects for Kafka producers and consumers, which are used internally to instantiate Kafka clients.
Class: KafkaConfiguration
`KafkaConfiguration` implements `Cloneable` and `HeaderFilterStrategyAware`, allowing it to be cloned and to manage header filtering strategies for Kafka message headers.
Key Responsibilities:
- Store configuration parameters for Kafka producers and consumers.
- Provide getters and setters for all Kafka-related options.
- Generate `Properties` objects for Kafka producers and consumers based on configured options.
- Manage SSL and SASL security settings, including integration with Camel's `SSLContextParameters`.
- Support manual offset commits and batching for consumers.
- Support custom header serialization and deserialization strategies.

Detailed Class Structure
classDiagram
class KafkaConfiguration {
-String topic
-String brokers
-String clientId
-HeaderFilterStrategy headerFilterStrategy
-Integer retryBackoffMs
-Integer retryBackoffMaxMs
-boolean preValidateHostAndPort
-boolean topicIsPattern
-String groupId
-String groupInstanceId
-int consumersCount
-KafkaHeaderDeserializer headerDeserializer
-String interceptorClasses
-String keyDeserializer
-String valueDeserializer
-Integer fetchMinBytes
-Integer fetchMaxBytes
-Integer heartbeatIntervalMs
-Integer maxPartitionFetchBytes
-Integer sessionTimeoutMs
-Integer maxPollRecords
-Long pollTimeoutMs
-Integer maxPollIntervalMs
-String autoOffsetReset
-String partitionAssignor
-Integer consumerRequestTimeoutMs
-Integer autoCommitIntervalMs
-Boolean checkCrcs
-Integer fetchWaitMaxMs
-SeekPolicy seekTo
-boolean autoCommitEnable
-boolean allowManualCommit
-boolean breakOnFirstError
-StateRepository offsetRepository
-PollOnError pollOnError
-Long commitTimeoutMs
-String isolationLevel
-String partitioner
-boolean partitionerIgnoreKeys
-ExecutorService workerPool
-Integer workerPoolCoreSize
-Integer workerPoolMaxSize
-Integer queueBufferingMaxMessages
-String valueSerializer
-String keySerializer
-String key
-Integer partitionKey
-boolean useIterator
-String requestRequiredAcks
-Integer bufferMemorySize
-String compressionCodec
-Integer retries
-boolean batchWithIndividualHeaders
-Integer producerBatchSize
-Integer connectionMaxIdleMs
-Integer lingerMs
-Integer maxBlockMs
-Integer maxRequestSize
-Integer receiveBufferBytes
-Integer requestTimeoutMs
-Integer deliveryTimeoutMs
-Integer sendBufferBytes
-boolean recordMetadata
-Integer maxInFlightRequest
-Integer metadataMaxAgeMs
-String metricReporters
-Integer noOfMetricsSample
-Integer metricsSampleWindowMs
-Integer reconnectBackoffMs
-boolean enableIdempotence
-KafkaHeaderSerializer headerSerializer
-Integer reconnectBackoffMaxMs
-SSLContextParameters sslContextParameters
-String sslKeyPassword
-String sslKeystoreLocation
-String sslKeystorePassword
-String sslTruststoreLocation
-String sslTruststorePassword
-String sslEnabledProtocols
-String sslKeystoreType
-String sslProtocol
-String sslProvider
-String sslTruststoreType
-String sslCipherSuites
-String sslEndpointAlgorithm
-String sslKeymanagerAlgorithm
-String sslTrustmanagerAlgorithm
-String saslKerberosServiceName
-String securityProtocol
-String saslMechanism
-String kerberosInitCmd
-Integer kerberosBeforeReloginMinTime
-Double kerberosRenewJitter
-Double kerberosRenewWindowFactor
-String kerberosPrincipalToLocalRules
-String saslJaasConfig
-String schemaRegistryURL
-boolean specificAvroReader
-Map<String,Object> additionalProperties
-int shutdownTimeout
-boolean synchronous
-String kerberosConfigLocation
-boolean batching
-Integer batchingIntervalMs
-String transactionalId
-boolean transacted
+KafkaConfiguration()
+KafkaConfiguration copy()
+Properties createProducerProperties()
+Properties createConsumerProperties()
+boolean isPreValidateHostAndPort()
+void setPreValidateHostAndPort(boolean)
+boolean isTopicIsPattern()
+void setTopicIsPattern(boolean)
+String getGroupId()
+void setGroupId(String)
+String getGroupInstanceId()
+void setGroupInstanceId(String)
+String getPartitioner()
+void setPartitioner(String)
+boolean isPartitionerIgnoreKeys()
+void setPartitionerIgnoreKeys(boolean)
+String getTopic()
+void setTopic(String)
+int getConsumersCount()
+void setConsumersCount(int)
+String getClientId()
+void setClientId(String)
+boolean isAutoCommitEnable()
+boolean getAutoCommitEnable()
+void setAutoCommitEnable(boolean)
+boolean isAllowManualCommit()
+void setAllowManualCommit(boolean)
+int getShutdownTimeout()
+void setShutdownTimeout(int)
+StateRepository getOffsetRepository()
+void setOffsetRepository(StateRepository)
+Integer getAutoCommitIntervalMs()
+void setAutoCommitIntervalMs(Integer)
+Integer getFetchMinBytes()
+void setFetchMinBytes(Integer)
+Integer getFetchMaxBytes()
+void setFetchMaxBytes(Integer)
+Integer getFetchWaitMaxMs()
+void setFetchWaitMaxMs(Integer)
+String getAutoOffsetReset()
+void setAutoOffsetReset(String)
+boolean isBreakOnFirstError()
+void setBreakOnFirstError(boolean)
+String getBrokers()
+void setBrokers(String)
+String getSchemaRegistryURL()
+void setSchemaRegistryURL(String)
+boolean isSpecificAvroReader()
+void setSpecificAvroReader(boolean)
+String getCompressionCodec()
+void setCompressionCodec(String)
+Integer getRetryBackoffMs()
+void setRetryBackoffMs(Integer)
+Integer getRetryBackoffMaxMs()
+void setRetryBackoffMaxMs(Integer)
+Integer getSendBufferBytes()
+void setSendBufferBytes(Integer)
+Integer getRequestTimeoutMs()
+void setRequestTimeoutMs(Integer)
+Integer getDeliveryTimeoutMs()
+void setDeliveryTimeoutMs(Integer)
+Integer getQueueBufferingMaxMessages()
+void setQueueBufferingMaxMessages(Integer)
+String getValueSerializer()
+void setValueSerializer(String)
+String getKeySerializer()
+void setKeySerializer(String)
+String getKerberosInitCmd()
+void setKerberosInitCmd(String)
+Integer getKerberosBeforeReloginMinTime()
+void setKerberosBeforeReloginMinTime(Integer)
+Double getKerberosRenewJitter()
+void setKerberosRenewJitter(Double)
+Double getKerberosRenewWindowFactor()
+void setKerberosRenewWindowFactor(Double)
+String getKerberosPrincipalToLocalRules()
+void setKerberosPrincipalToLocalRules(String)
+String getSslCipherSuites()
+void setSslCipherSuites(String)
+String getSslEndpointAlgorithm()
+void setSslEndpointAlgorithm(String)
+String getSslKeymanagerAlgorithm()
+void setSslKeymanagerAlgorithm(String)
+String getSslTrustmanagerAlgorithm()
+void setSslTrustmanagerAlgorithm(String)
+String getSslEnabledProtocols()
+void setSslEnabledProtocols(String)
+String getSslKeystoreType()
+void setSslKeystoreType(String)
+String getSslProtocol()
+void setSslProtocol(String)
+String getSslProvider()
+void setSslProvider(String)
+String getSslTruststoreType()
+void setSslTruststoreType(String)
+String getSaslKerberosServiceName()
+void setSaslKerberosServiceName(String)
+String getSaslMechanism()
+void setSaslMechanism(String)
+String getSaslJaasConfig()
+void setSaslJaasConfig(String)
+String getSecurityProtocol()
+void setSecurityProtocol(String)
+SSLContextParameters getSslContextParameters()
+void setSslContextParameters(SSLContextParameters)
+String getSslKeyPassword()
+void setSslKeyPassword(String)
+String getSslKeystoreLocation()
+void setSslKeystoreLocation(String)
+String getSslKeystorePassword()
+void setSslKeystorePassword(String)
+String getSslTruststoreLocation()
+void setSslTruststoreLocation(String)
+String getSslTruststorePassword()
+void setSslTruststorePassword(String)
+Integer getBufferMemorySize()
+void setBufferMemorySize(Integer)
+String getKey()
+void setKey(String)
+Integer getPartitionKey()
+void setPartitionKey(Integer)
+boolean isUseIterator()
+void setUseIterator(boolean)
+String getRequestRequiredAcks()
+void setRequestRequiredAcks(String)
+Integer getRetries()
+void setRetries(Integer)
+Integer getProducerBatchSize()
+void setProducerBatchSize(Integer)
+boolean isBatchWithIndividualHeaders()
+void setBatchWithIndividualHeaders(boolean)
+Integer getConnectionMaxIdleMs()
+void setConnectionMaxIdleMs(Integer)
+Integer getLingerMs()
+void setLingerMs(Integer)
+Integer getMaxBlockMs()
+void setMaxBlockMs(Integer)
+Integer getMaxRequestSize()
+void setMaxRequestSize(Integer)
+Integer getReceiveBufferBytes()
+void setReceiveBufferBytes(Integer)
+Integer getMaxInFlightRequest()
+void setMaxInFlightRequest(Integer)
+Integer getMetadataMaxAgeMs()
+void setMetadataMaxAgeMs(Integer)
+String getMetricReporters()
+void setMetricReporters(String)
+Integer getNoOfMetricsSample()
+void setNoOfMetricsSample(Integer)
+Integer getMetricsSampleWindowMs()
+void setMetricsSampleWindowMs(Integer)
+Integer getReconnectBackoffMs()
+void setReconnectBackoffMs(Integer)
+boolean isEnableIdempotence()
+void setEnableIdempotence(boolean)
+Integer getReconnectBackoffMaxMs()
+void setReconnectBackoffMaxMs(Integer)
+HeaderFilterStrategy getHeaderFilterStrategy()
+void setHeaderFilterStrategy(HeaderFilterStrategy)
+KafkaHeaderDeserializer getHeaderDeserializer()
+void setHeaderDeserializer(KafkaHeaderDeserializer)
+KafkaHeaderSerializer getHeaderSerializer()
+void setHeaderSerializer(KafkaHeaderSerializer)
+void setAdditionalProperties(Map<String,Object>)
+Map<String,Object> getAdditionalProperties()
+boolean isSynchronous()
+void setSynchronous(boolean)
+PollOnError getPollOnError()
+void setPollOnError(PollOnError)
+Long getCommitTimeoutMs()
+void setCommitTimeoutMs(Long)
+String getIsolationLevel()
+void setIsolationLevel(String)
+String getKerberosConfigLocation()
+void setKerberosConfigLocation(String)
+boolean isBatching()
+void setBatching(boolean)
+Integer getBatchingIntervalMs()
+void setBatchingIntervalMs(Integer)
+boolean isTransacted()
+void setTransacted(boolean)
+String getTransactionalId()
+void setTransactionalId(String)
# void applySaslConfiguration(Properties)
# void applyProducerSslConfiguration(Properties)
# void applySslConsumerConfigurationFromOptions(Properties)
# void applySslConfigurationFromContext(Properties, SSLContextParameters)
# void applyAdditionalProperties(Properties, Map<String,Object>)
# boolean isSasl(String)
# static void addPropertyIfNotEmpty(Properties, String, T)
# static void addUpperCasePropertyIfNotEmpty(Properties, String, T)
# static void addPropertyIfNotNull(Properties, String, T)
# static void addPropertyIfNotFalse(Properties, String, boolean)
# static void addCommaSeparatedList(Properties, String, List<String>)
}
Core Methods
Constructor
public KafkaConfiguration()
Creates a new instance with default values.
copy
public KafkaConfiguration copy()
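Returns a copy of this configuration instance. The object is cloned, which is a field-by-field copy, and the additional properties map is then copied into a new map, so changes to the copy's additional properties do not affect the original.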
**Usage:**
KafkaConfiguration originalConfig = new KafkaConfiguration();
KafkaConfiguration clonedConfig = originalConfig.copy();
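To make the copy semantics concrete, the following sketch uses a hypothetical stand-in class, `SketchConfig`, rather than the real `KafkaConfiguration`. It assumes only the behavior described above: the object is cloned and the additional properties map is copied into a fresh map. Under that assumption, entries added to the copy's map do not appear in the original.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; this is not the Camel class.
final class SketchConfig implements Cloneable {
    String topic;
    Map<String, Object> additionalProperties = new HashMap<>();

    // Mirrors the described approach: clone the object, then copy the map.
    SketchConfig copy() {
        try {
            SketchConfig copy = (SketchConfig) clone();
            // Give the copy its own map so later edits stay local to it.
            copy.additionalProperties = new HashMap<>(this.additionalProperties);
            return copy;
        } catch (CloneNotSupportedException e) {
            // Not expected, because this class implements Cloneable.
            throw new IllegalStateException(e);
        }
    }
}
```

Without the explicit map copy, `clone()` alone would leave both instances pointing at the same map, which is the main reason a dedicated `copy()` method is useful here.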
createProducerProperties
public Properties createProducerProperties()
Generates a `Properties` object containing all configured Kafka producer properties. This includes serialization, batching, retries, security (SSL and SASL), and any additional properties set.
- Uses the configured serializers (`keySerializer`, `valueSerializer`).
- Applies SSL configuration either from `sslContextParameters` or from the individual SSL options.
- Applies SASL configuration if the `securityProtocol` indicates SASL usage.
- Adds any additional properties specified in `additionalProperties`.

**Returns:** `Properties` ready to be passed to the `KafkaProducer` constructor.
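The way configured options end up as producer properties can be sketched with plain `java.util.Properties`. The class below, `ProducerPropsSketch`, and its `addIfNotNull` helper are hypothetical and are not Camel's implementation. The property keys are standard Kafka producer configuration names; which options are included is an assumption made for brevity.

```java
import java.util.Properties;

// Hypothetical sketch; this is not the Camel implementation.
final class ProducerPropsSketch {

    // Adds the entry only when a value was actually configured.
    static <T> void addIfNotNull(Properties props, String key, T value) {
        if (value != null) {
            props.put(key, value.toString());
        }
    }

    // Builds producer properties from a few illustrative options.
    static Properties build(String brokers, String keySerializer, String valueSerializer,
                            String clientId, Integer retries, Integer lingerMs) {
        Properties props = new Properties();
        addIfNotNull(props, "bootstrap.servers", brokers);
        addIfNotNull(props, "key.serializer", keySerializer);
        addIfNotNull(props, "value.serializer", valueSerializer);
        addIfNotNull(props, "client.id", clientId);
        // Unset options are left out, so the Kafka client default applies.
        addIfNotNull(props, "retries", retries);
        addIfNotNull(props, "linger.ms", lingerMs);
        return props;
    }
}
```

Skipping null values keeps the generated properties small and avoids overriding Kafka client defaults with empty entries.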
createConsumerProperties
public Properties createConsumerProperties()
Generates a `Properties` object containing all configured Kafka consumer properties. This includes deserialization, fetch sizes, timeouts, security, auto commit settings, and additional properties.
- Uses the configured deserializers (`keyDeserializer`, `valueDeserializer`).
- Applies SSL configuration either from `sslContextParameters` or from the individual SSL options.
- Applies SASL configuration if the `securityProtocol` indicates SASL usage.
- Adds any additional properties specified in `additionalProperties`.

**Returns:** `Properties` ready to be passed to the `KafkaConsumer` constructor.
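For the consumer side, a comparable sketch shows the kind of keys a generated `Properties` object might hold. `ConsumerPropsSketch` is a hypothetical class, not Camel's code. The keys are standard Kafka consumer configuration names, and the mapping from options such as `autoCommitEnable` to `enable.auto.commit` is an assumption based on the option names.

```java
import java.util.Properties;

// Hypothetical sketch; this is not the Camel implementation.
final class ConsumerPropsSketch {

    // brokers, groupId, and deserializer must be non-null in this sketch,
    // because Properties rejects null values.
    static Properties build(String brokers, String groupId, String deserializer,
                            boolean autoCommitEnable, String autoOffsetReset,
                            Integer maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("enable.auto.commit", String.valueOf(autoCommitEnable));
        // Optional settings are added only when configured.
        if (autoOffsetReset != null) {
            props.put("auto.offset.reset", autoOffsetReset);
        }
        if (maxPollRecords != null) {
            props.put("max.poll.records", maxPollRecords.toString());
        }
        return props;
    }
}
```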
SSL and SASL Configuration Helpers
- `applySslConfigurationFromContext(Properties, SSLContextParameters)`: Applies SSL configuration based on Camel's `SSLContextParameters`.
- `applyProducerSslConfiguration(Properties)`: Applies SSL options for the producer based on the individual SSL properties.
- `applySslConsumerConfigurationFromOptions(Properties)`: Applies SSL options for the consumer based on the individual SSL properties.
- `applySaslConfiguration(Properties)`: Applies SASL configuration options.

These internal methods modularize SSL/SASL setup for Kafka clients.
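A rough sketch of the decision flow these helpers imply: set the security protocol, add SSL options, and add SASL options only when the protocol is SASL-based. `SecurityPropsSketch` is hypothetical and leaves out the `SSLContextParameters` path entirely. The assumption that only `SASL_PLAINTEXT` and `SASL_SSL` count as SASL protocols is mine; the property keys are standard Kafka client configuration names.

```java
import java.util.Properties;

// Hypothetical sketch of the security decision flow; not the Camel implementation.
final class SecurityPropsSketch {

    // Assumption: only the two SASL-based Kafka protocols count as SASL.
    static boolean isSasl(String securityProtocol) {
        return "SASL_PLAINTEXT".equals(securityProtocol)
                || "SASL_SSL".equals(securityProtocol);
    }

    static void putIfNotNull(Properties props, String key, String value) {
        if (value != null) {
            props.put(key, value);
        }
    }

    static Properties apply(String securityProtocol, String truststoreLocation,
                            String truststorePassword, String saslMechanism,
                            String saslJaasConfig) {
        Properties props = new Properties();
        putIfNotNull(props, "security.protocol", securityProtocol);
        // Individual SSL options, added when configured.
        putIfNotNull(props, "ssl.truststore.location", truststoreLocation);
        putIfNotNull(props, "ssl.truststore.password", truststorePassword);
        // SASL options are added only for SASL-based protocols.
        if (isSasl(securityProtocol)) {
            putIfNotNull(props, "sasl.mechanism", saslMechanism);
            putIfNotNull(props, "sasl.jaas.config", saslJaasConfig);
        }
        return props;
    }
}
```

Gating the SASL entries on the protocol keeps unused credentials out of the client properties when a plain `SSL` or `PLAINTEXT` connection is configured.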
Header Filter Strategy
@Override
public HeaderFilterStrategy getHeaderFilterStrategy()
Returns the current header filter strategy used to filter headers between Camel messages and Kafka messages.
@Override
public void setHeaderFilterStrategy(HeaderFilterStrategy strategy)
Sets a custom header filter strategy.
Additional Properties
public void setAdditionalProperties(Map<String,Object> additionalProperties)
Sets additional Kafka configuration properties not explicitly exposed by the class fields.
public Map<String,Object> getAdditionalProperties()
Returns the map of additional properties.
This feature supports Kafka properties that may be new or less commonly used and not yet supported directly by Camel Kafka component options.
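How extra entries could be merged into the generated client properties is sketched below. `AdditionalPropsSketch` is a hypothetical class, not Camel's code. It assumes the extras are applied after the explicit options, so an extra entry with the same key replaces the earlier value.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch; this is not the Camel implementation.
final class AdditionalPropsSketch {

    // Copies every non-null extra entry into the Kafka properties.
    // Assumption: extras are applied last, so they win on key collisions.
    static void applyAdditional(Properties props, Map<String, Object> additional) {
        if (additional == null) {
            return;
        }
        for (Map.Entry<String, Object> entry : additional.entrySet()) {
            if (entry.getValue() != null) {
                props.put(entry.getKey(), entry.getValue());
            }
        }
    }
}
```

For example, with `linger.ms` already set to `5` and an extra entry `linger.ms=20`, the resulting properties hold `20` under this sketch's ordering assumption.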
Important Configuration Properties (Selected)
| Property Name | Description | Default / Notes |
|---|---|---|
| `topic` | Kafka topic to consume from or produce to. | **Required** |
| `brokers` | Comma-separated list of Kafka brokers (bootstrap servers). | |
| `groupId` | Consumer group identifier. | Required for consumers |
| `clientId` | Identifier for the Kafka client, sent with each request for tracing. | |
| `keySerializer` | Class name of the key serializer. | Defaults to `KafkaConstants.KAFKA_DEFAULT_SERIALIZER` |
| `valueSerializer` | Class name of the value serializer. | Defaults to `KafkaConstants.KAFKA_DEFAULT_SERIALIZER` |
| `keyDeserializer` | Class name of the key deserializer. | Defaults to `KafkaConstants.KAFKA_DEFAULT_DESERIALIZER` |
| `valueDeserializer` | Class name of the value deserializer. | Defaults to `KafkaConstants.KAFKA_DEFAULT_DESERIALIZER` |
| `autoCommitEnable` | Enables auto commit of offsets for consumers. | `true` by default |
| `allowManualCommit` | Allows manual offset commits via the `KafkaManualCommit` API. | `false` by default |
| `sslContextParameters` | Camel `SSLContextParameters` for SSL configuration. | |
| `securityProtocol` | Kafka security protocol (`PLAINTEXT`, `SSL`, `SASL_SSL`, etc.). | Defaults to Kafka's default (`PLAINTEXT`) |
| `saslMechanism` | SASL authentication mechanism. | Defaults to Kafka's default (`GSSAPI`) |
| `retries` | Number of retries for producer message sends. | |
| `enableIdempotence` | Enables producer idempotence to avoid duplicate messages. | `true` by default |
| `batching` | Enables consumer batching mode. | `false` by default |
| `pollTimeoutMs` | Timeout for consumer polling. | 5000 ms by default |
| `transactionalId` | Enables producer transactional mode with this ID. | |
| `transacted` | Enables transacted endpoint behavior; a `transactional.id` is created automatically. | `false` by default |
Usage Examples
Creating a Kafka Producer Configuration
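import java.util.Properties;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.kafka.clients.producer.KafkaProducer;
// Configure the topic, brokers, serializers, and client ID
KafkaConfiguration config = new KafkaConfiguration();
config.setTopic("myTopic");
config.setBrokers("localhost:9092");
config.setKeySerializer("org.apache.kafka.common.serialization.StringSerializer");
config.setValueSerializer("org.apache.kafka.common.serialization.StringSerializer");
config.setClientId("myProducerClient");
// Build the Kafka producer properties from the configuration
Properties producerProps = config.createProducerProperties();
// Use producerProps to create a KafkaProducer; close the producer when it is no longer needed
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);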