kafka-component.adoc
Kafka Component
**Version:** Since Camel 2.13 **Artifact:** camel-kafka **Support level:** Stable **Description:** Send and receive messages to/from an Apache Kafka broker. **Component header:** Both producer and consumer are supported
Overview
The Kafka component provides integration between Apache Camel and the Apache Kafka message broker. It enables Camel routes to produce messages to Kafka topics and consume messages from Kafka topics with full support for both producers and consumers.
Apache Kafka is a distributed streaming platform widely used for building real-time data pipelines and streaming apps. This component simplifies interaction with Kafka by exposing a Camel endpoint that supports the configuration of Kafka producers and consumers.
Maven users must include the `camel-kafka` dependency with the same version as the Camel core to use this component.
URI Format
kafka:topic[?options]
topic: Kafka topic name or pattern to consume/produce messages. Patterns are supported whentopicIsPattern=true.options: Various component and endpoint options to configure connection, security, batching, error handling, etc.
Key Features
Support for both Kafka producers and consumers within Camel routes.
Configurable error handling strategies for consumer polling errors.
Support for manual and automatic offset commits.
Idempotent repository implementation backed by Kafka topics for exactly-once semantics.
Support for transactional Kafka producers.
Support for Kafka header serialization/deserialization with customizable strategies.
Supports pausable consumers and custom subscription adapters.
SSL/TLS and SASL authentication support, including OAuth.
Batching consumer support with configurable batch size and timeout.
Integration patterns to interoperate with JMS and other systems.
Implementation Details & Algorithms
Consumer Error Handling
Kafka consumers can encounter exceptions during polling, such as deserialization errors or transient broker issues. The component classifies exceptions as retriable (`RetriableException`) or not, and applies one of several strategies controlled by the `pollOnError` configuration:
DISCARD: Discards the problematic message and continues polling.
ERROR_HANDLER (default): Uses Camel's error handler for exception processing, then continues polling.
RECONNECT: Reconnects the consumer and retries polling.
RETRY: Retries polling the same message.
STOP: Stops the consumer until manually restarted.
Advanced users can implement `PollExceptionStrategy` for custom behavior.
The `breakOnFirstError` option controls whether the consumer commits offsets immediately after encountering an error, enabling retries of failed messages.
Kafka Idempotent Repository
The component offers a distributed idempotent repository backed by a Kafka topic that stores changes as events. Each instance maintains a local in-memory cache rebuilt on startup by consuming the entire topic. This allows multiple Camel processes to share idempotency state across distributed systems.
Key properties include:
topic: Kafka topic name used to broadcast idempotent state changes.bootstrapServers: Kafka brokers for producer and consumer.groupId: Consumer group ID for the repository.maxCacheSize: Size of local cache for recent keys.pollDurationMs: Duration of Kafka consumer poll for cache updates.producerConfigandconsumerConfig: To support advanced Kafka client configuration.
Manual Commit API
Supports manual offset commits using `KafkaManualCommit` API available in Camel message headers. Manual commit must be enabled via `allowManualCommit=true` and typically requires disabling auto-commit. Commit can be synchronous or asynchronous (via `KafkaManualCommitFactory`).
Users must be aware of thread safety restrictions: Kafka consumer commits must happen on the consumer thread to avoid concurrency exceptions.
Pausable Consumers
Consumers can be paused/resumed dynamically based on external conditions using the `.pausable()` DSL method. This allows graceful handling of transient failures or external system dependencies.
Kafka Transactions
Transactional Kafka producers can be enabled using `transacted=true` or specifying `transactionalId`. This allows atomic writes to Kafka topics, optionally integrated with JTA transactions for XA resource coordination.
Authentication and Security
Supports Kafka security protocols including SASL/PLAIN, SASL/OAUTHBEARER, and SSL. Configuration examples demonstrate how to set JAAS login modules for username/password and OAuth.
Batching Consumer
Enables batch consumption with options to control batch size (`maxPollRecords`), poll timeout (`pollTimeoutMs`), and batch interval (`batchingIntervalMs`). Supports automatic and manual offset commits for batch processing.
Usage Examples
Consuming messages from Kafka
Minimal route to consume from a Kafka topic:
from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}");
Subscribe to multiple topics:
from("kafka:test,test1,test2?brokers=localhost:9092")
.log("Message received from Kafka : ${body}");
Subscribe using pattern:
from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
.log("Message received from Kafka : ${body}");
Using a custom offset repository for manual offset management:
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("kafka:myTopic?brokers=localhost:9092&groupId=myGroup&autoOffsetReset=earliest&offsetRepository=#offsetRepo")
.to("mock:result");
}
});
Producing messages to Kafka
Minimal route to produce a message:
from("direct:start")
.setBody(constant("Message from Camel"))
.setHeader(KafkaConstants.KEY, constant("Camel"))
.to("kafka:test?brokers=localhost:9092");
SSL Configuration
Using endpoint parameters:
from("kafka:topic?brokers=localhost:9092&groupId=group1&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
.to("mock:result");
Using `SSLContextParameters` bean:
KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);
registry.bind("ssl", scp);
from("kafka:topic?brokers=localhost:9092&groupId=group1&sslContextParameters=#ssl&securityProtocol=SSL")
.to("mock:result");
Manual Commit Example
public void process(Exchange exchange) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commit();
}
}
Pausable Consumer Example
from("kafka:topic")
.pausable(new KafkaConsumerListener(), () -> canContinue())
.process(this::processMessage)
.to("some:destination");
Using Kafka Idempotent Repository
KafkaIdempotentRepository repo = new KafkaIdempotentRepository("idempotent-topic", "localhost:9092");
registry.bind("myIdemRepo", repo);
from("direct:input")
.idempotentConsumer(header("id")).idempotentRepository("myIdemRepo")
.to("mock:result");
Key Classes, Interfaces & Methods
KafkaComponent
The main component class that manages Kafka producer and consumer configuration.
Properties:
autoCommitEnable(boolean): Enable/disable auto offset commit.allowManualCommit(boolean): Enable manual commit API.breakOnFirstError(boolean): Whether to commit offset on first error to retry messages.kafkaManualCommitFactory(KafkaManualCommitFactory): Factory for manual commit implementations.setKerberosConfigLocation(String): Static method to specify Kerberos config file location.
Example:
KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
kafka.setBreakOnFirstError(true);
camelContext.addComponent("kafka", kafka);
KafkaManualCommit
Interface for manual offset commit control.
Method:
commit()- commits the current offset synchronously.Usage: obtained from Camel Exchange message header
KafkaConstants.MANUAL_COMMIT.
PollExceptionStrategy
Interface to customize error handling strategies during Kafka consumer poll exceptions.
Implementations control how to react to retriable and non-retriable exceptions.
SubscribeAdapter
Interface to customize subscription logic for Kafka consumers.
Method:
subscribe(Consumer, ConsumerRebalanceListener, TopicInfo)Allows subscription by topic list or pattern.
KafkaIdempotentRepository
Idempotent repository implementation backed by a Kafka topic.
Properties include
topic,bootstrapServers,groupId,maxCacheSize,pollDurationMs, etc.Usage requires registering in Camel registry.
Supports distributed caching and event sourcing for idempotency.
HeaderFilterStrategy
Strategy interface for filtering Kafka headers during propagation to/from Camel exchange.
Default filters out headers starting with
Camelororg.apache.camel.Can be customized via
headerFilterStrategyURI parameter.
Interaction with Other System Parts
Camel Routes: The Kafka component exposes endpoints for producers and consumers used in Camel routes.
Camel Registry: Beans such as
KafkaIdempotentRepository,SubscribeAdapter,HeaderFilterStrategyare registered here.Camel Context: The component is registered with the CamelContext, enabling routing and lifecycle management.
Kafka Broker: Communicates with Kafka brokers over configured protocols (PLAINTEXT, SSL, SASL).
JMS Component: Supports interoperability scenarios where Kafka messages with JMS headers are consumed or produced.
JTA Transaction Manager: Can be integrated for distributed transactions involving Kafka producers.
Important Notes
Kafka consumer commits must be done on the consumer thread due to Kafka client thread-safety constraints.
Manual commits and
breakOnFirstErrorrequire careful usage to avoid message loss or duplicate processing.Batch consumer settings balance latency and throughput; tuning
pollTimeoutMsandbatchingIntervalMsis critical.Kafka transactions require unique
transactionalIdper producer endpoint.Header propagation only supports certain data types (
String,Integer,Long,Double,Boolean,byte[]).OAuth and JAAS configuration can be complex; refer to Kafka documentation and Strimzi OAuth README for details.
Pausable consumers are intended as a reactive mechanism to transient failures; prefer RoutePolicy for general route control.
Mermaid Component Diagram
classDiagram
class KafkaComponent {
+autoCommitEnable: boolean
+allowManualCommit: boolean
+breakOnFirstError: boolean
+kafkaManualCommitFactory: KafkaManualCommitFactory
+setKerberosConfigLocation(location: String)
}
class KafkaManualCommit {
+commit()
}
class PollExceptionStrategy {
+handle(Exception e)
}
class SubscribeAdapter {
+subscribe(consumer, rebalanceListener, topicInfo)
}
class KafkaIdempotentRepository {
+topic: String
+bootstrapServers: String
+groupId: String
+maxCacheSize: int
+pollDurationMs: int
+producerConfig: Properties
+consumerConfig: Properties
}
KafkaComponent --> KafkaManualCommit : uses
KafkaComponent --> PollExceptionStrategy : configurable
KafkaComponent --> SubscribeAdapter : configurable
KafkaComponent --> KafkaIdempotentRepository : integrates with
Summary
This file documents the Camel Kafka component, which facilitates producing and consuming messages to/from Apache Kafka brokers. It covers extensive features including error handling, manual commits, batching, transactions, idempotent repositories, security configurations, and interoperability. The component integrates tightly with Camel’s routing and registry mechanisms, enabling powerful Kafka-based integration patterns with flexible customization options.
The documentation provides usage examples, configuration instructions, and guidance on advanced scenarios such as manual commit handling and idempotent message consumption to ensure reliable and performant Kafka messaging within Camel applications.