kafka-component.adoc

Kafka Component

**Version:** Since Camel 2.13 **Artifact:** camel-kafka **Support level:** Stable **Description:** Send and receive messages to/from an Apache Kafka broker. **Component header:** Both producer and consumer are supported

Overview

The Kafka component integrates Apache Camel with the Apache Kafka message broker. Camel routes can use it to produce messages to Kafka topics and to consume messages from them.

Apache Kafka is a distributed streaming platform widely used for building real-time data pipelines and streaming applications. This component simplifies interaction with Kafka by exposing a Camel endpoint through which Kafka producers and consumers are configured.

Maven users must include the `camel-kafka` dependency with the same version as the Camel core to use this component.

URI Format

kafka:topic[?options]

Key Features

Implementation Details & Algorithms

Consumer Error Handling

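Kafka consumers can encounter exceptions during polling, such as deserialization errors or transient broker issues. Retriable exceptions (Kafka's `RetriableException`) are retried on the next poll. For all other exceptions, the component applies the strategy selected by the `pollOnError` option: `DISCARD` (drop the message and continue with the next one), `ERROR_HANDLER` (pass the exception to Camel's error handler, then continue), `RECONNECT` (reconnect the consumer and poll again), `RETRY` (poll the same message again), or `STOP` (stop the consumer).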

Advanced users can implement `PollExceptionStrategy` for custom behavior.

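The `breakOnFirstError` option controls what happens when processing of a consumed message fails. When it is `false`, the consumer continues with the next message. When it is `true`, the consumer stops processing the current batch and seeks back to the offset of the failed message, so that the message is polled and processed again.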
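The effect of `breakOnFirstError` on a polled batch can be illustrated with a small model in plain Java. This is a simplified sketch and not the component's implementation: the class `BreakOnFirstErrorModel`, its `Result` holder, and the `succeeds` predicate are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/** Simplified model of one pass over a polled batch; not Camel's actual code. */
class BreakOnFirstErrorModel {

    /** Hypothetical result holder: offsets handled successfully and where to seek next. */
    static final class Result {
        final List<Long> processed;
        final long seekTo; // -1 means no seek is needed

        Result(List<Long> processed, long seekTo) {
            this.processed = processed;
            this.seekTo = seekTo;
        }

        @Override
        public String toString() {
            return "processed=" + processed + ", seekTo=" + seekTo;
        }
    }

    /**
     * Walks the offsets in order. The 'succeeds' predicate stands in for route processing.
     * With breakOnFirstError=true the pass ends at the first failure and reports that
     * offset as the position to seek back to; with false the failed record is skipped.
     */
    static Result processBatch(List<Long> offsets, LongPredicate succeeds, boolean breakOnFirstError) {
        List<Long> processed = new ArrayList<>();
        for (long offset : offsets) {
            if (succeeds.test(offset)) {
                processed.add(offset);
            } else if (breakOnFirstError) {
                return new Result(processed, offset);
            }
        }
        return new Result(processed, -1L);
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(10L, 11L, 12L, 13L);
        LongPredicate failsAt12 = offset -> offset != 12L;
        System.out.println(processBatch(batch, failsAt12, true));
        System.out.println(processBatch(batch, failsAt12, false));
    }
}
```

In this model, a failure at offset 12 with `breakOnFirstError=true` leaves offsets 10 and 11 processed and requests a seek back to 12, while `false` processes 10, 11, and 13 and never revisits 12.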

Kafka Idempotent Repository

The component offers a distributed idempotent repository backed by a Kafka topic that stores changes as events. Each instance maintains a local in-memory cache rebuilt on startup by consuming the entire topic. This allows multiple Camel processes to share idempotency state across distributed systems.
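The startup rebuild described above amounts to replaying an event log into a set. The following plain-Java sketch models that idea only. It assumes the topic carries "add" and "remove" events per key, and the names `IdempotentCacheModel`, `Action`, and `Event` are hypothetical; the real repository's record format is not shown here.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of rebuilding a local cache from a change-event topic. */
class IdempotentCacheModel {

    /** Assumed event types; the actual wire format is not specified here. */
    enum Action { ADD, REMOVE }

    static final class Event {
        final Action action;
        final String key;

        Event(Action action, String key) {
            this.action = action;
            this.key = key;
        }
    }

    /** Replays every event from the beginning of the log, in order, into a fresh cache. */
    static Set<String> rebuild(List<Event> topicLog) {
        Set<String> cache = new LinkedHashSet<>();
        for (Event event : topicLog) {
            if (event.action == Action.ADD) {
                cache.add(event.key);
            } else {
                cache.remove(event.key);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
            new Event(Action.ADD, "order-1"),
            new Event(Action.ADD, "order-2"),
            new Event(Action.REMOVE, "order-1"));
        System.out.println(rebuild(log));
    }
}
```

Because every instance replays the same log, all instances converge on the same set of seen keys, which is what lets separate Camel processes share idempotency state.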

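Key properties include `topic`, `bootstrapServers`, `groupId`, `maxCacheSize`, `pollDurationMs`, `producerConfig`, and `consumerConfig`.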

Manual Commit API

The component supports manual offset commits through the `KafkaManualCommit` API, which is exposed as a Camel message header (`KafkaConstants.MANUAL_COMMIT`). Manual commit must be enabled with `allowManualCommit=true` and typically also requires disabling auto-commit (`autoCommitEnable=false`). Commits can be synchronous or asynchronous, depending on the configured `KafkaManualCommitFactory`.

Users must be aware of thread safety restrictions: Kafka consumer commits must happen on the consumer thread to avoid concurrency exceptions.
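One general way to respect this restriction is to let other threads only request a commit and have the consumer thread perform it between polls. The sketch below shows that hand-off pattern in plain Java. It is an illustration under stated assumptions, not a description of how the component implements asynchronous commits; `CommitHandoffModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Generic hand-off pattern: any thread requests a commit, only the consumer thread performs it. */
class CommitHandoffModel {

    // Thread-safe queue of offsets waiting to be committed.
    private final Queue<Long> pendingCommits = new ConcurrentLinkedQueue<>();

    // Stands in for the Kafka consumer's committed state; touched only by the consumer thread.
    private final List<Long> committed = new ArrayList<>();

    /** Safe to call from any thread. */
    void requestCommit(long offset) {
        pendingCommits.add(offset);
    }

    /** Must be called only from the consumer thread, for example between two polls. */
    int drainOnConsumerThread() {
        int count = 0;
        Long offset;
        while ((offset = pendingCommits.poll()) != null) {
            committed.add(offset); // a real consumer would commit the offset here
            count++;
        }
        return count;
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) throws InterruptedException {
        CommitHandoffModel model = new CommitHandoffModel();
        Thread worker = new Thread(() -> model.requestCommit(42L));
        worker.start();
        worker.join();
        // The main thread plays the role of the consumer thread.
        System.out.println("commits performed: " + model.drainOnConsumerThread());
    }
}
```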

Pausable Consumers

Consumers can be paused/resumed dynamically based on external conditions using the `.pausable()` DSL method. This allows graceful handling of transient failures or external system dependencies.

Kafka Transactions

Transactional Kafka producers can be enabled by setting `transacted=true` or by specifying a `transactionalId`. This allows atomic writes to Kafka topics, optionally coordinated with JTA transactions that involve XA resources.

Authentication and Security

The component supports Kafka's security features, including SSL and SASL authentication with the PLAIN and OAUTHBEARER mechanisms. SASL authentication is configured through a JAAS login module, for either username/password or OAuth credentials.
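As a sketch of the username/password case, the helper below assembles a JAAS configuration string for Kafka's `PlainLoginModule` and embeds it in an endpoint URI. The class `KafkaSaslUri`, its method names, and the sample values are hypothetical. The sketch assumes the endpoint options `securityProtocol`, `saslMechanism`, and `saslJaasConfig`, and wraps the JAAS value in Camel's `RAW(...)` syntax so that it is not URI-decoded; credentials containing quotes or parentheses would need extra handling that is not shown.

```java
/** Builds a SASL/PLAIN endpoint URI; a sketch with hypothetical helper names. */
class KafkaSaslUri {

    /** JAAS entry for Kafka's PlainLoginModule with the given credentials. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    /** Endpoint URI using SASL over TLS; RAW(...) keeps the JAAS value verbatim. */
    static String endpointUri(String topic, String brokers, String jaasConfig) {
        return "kafka:" + topic
            + "?brokers=" + brokers
            + "&securityProtocol=SASL_SSL"
            + "&saslMechanism=PLAIN"
            + "&saslJaasConfig=RAW(" + jaasConfig + ")";
    }

    public static void main(String[] args) {
        String jaas = plainJaasConfig("alice", "secret");
        System.out.println(endpointUri("orders", "localhost:9093", jaas));
    }
}
```

In practice the credentials would come from a secret store or property placeholder rather than from string literals.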

Batching Consumer

The batching consumer delivers records in batches, with options to control the batch size (`maxPollRecords`), the poll timeout (`pollTimeoutMs`), and the batch interval (`batchingIntervalMs`). Both automatic and manual offset commits are supported for batch processing.
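The interplay of a size limit and a time limit can be sketched with a small accumulator that completes a batch when either limit is reached. This is a generic model in plain Java, not the component's implementation: `BatchAccumulator` is a hypothetical class, its `maxRecords` and `intervalMs` fields only loosely correspond to `maxPollRecords` and `batchingIntervalMs`, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Generic size-or-time batching model; not Camel's actual batching code. */
class BatchAccumulator {

    private final int maxRecords;
    private final long intervalMs;
    private final List<String> current = new ArrayList<>();
    private long batchStartMs;

    BatchAccumulator(int maxRecords, long intervalMs) {
        this.maxRecords = maxRecords;
        this.intervalMs = intervalMs;
    }

    /**
     * Adds a record observed at time nowMs. Returns the completed batch when the size
     * limit is reached or the interval since the first record has elapsed; otherwise
     * returns an empty list.
     */
    List<String> add(String record, long nowMs) {
        if (current.isEmpty()) {
            batchStartMs = nowMs; // the interval is measured from the first record
        }
        current.add(record);
        boolean full = current.size() >= maxRecords;
        boolean expired = nowMs - batchStartMs >= intervalMs;
        if (full || expired) {
            List<String> done = new ArrayList<>(current);
            current.clear();
            return done;
        }
        return List.of();
    }

    public static void main(String[] args) {
        BatchAccumulator accumulator = new BatchAccumulator(3, 1000);
        System.out.println(accumulator.add("a", 0));
        System.out.println(accumulator.add("b", 10));
        System.out.println(accumulator.add("c", 20));
    }
}
```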

Usage Examples

Consuming messages from Kafka

Minimal route to consume from a Kafka topic:

from("kafka:test?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}");

Subscribe to multiple topics:

from("kafka:test,test1,test2?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}");

Subscribe using pattern:

from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
    .log("Message received from Kafka : ${body}");

Using a custom offset repository for manual offset management:

FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);

DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("kafka:myTopic?brokers=localhost:9092&groupId=myGroup&autoOffsetReset=earliest&offsetRepository=#offsetRepo")
            .to("mock:result");
    }
});

Producing messages to Kafka

Minimal route to produce a message:

from("direct:start")
    .setBody(constant("Message from Camel"))
    .setHeader(KafkaConstants.KEY, constant("Camel"))
    .to("kafka:test?brokers=localhost:9092");

SSL Configuration

Using endpoint parameters:

from("kafka:topic?brokers=localhost:9092&groupId=group1&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
    .to("mock:result");

Using `SSLContextParameters` bean:

KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);

registry.bind("ssl", scp);

from("kafka:topic?brokers=localhost:9092&groupId=group1&sslContextParameters=#ssl&securityProtocol=SSL")
    .to("mock:result");

Manual Commit Example

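public void process(Exchange exchange) {
    // The header is only present when manual commit is enabled (allowManualCommit=true).
    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    if (manual != null) {
        // Commit the offset for the record carried by this exchange.
        manual.commit();
    }
}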

Pausable Consumer Example

from("kafka:topic")
    .pausable(new KafkaConsumerListener(), () -> canContinue())
    .process(this::processMessage)
    .to("some:destination");

Using Kafka Idempotent Repository

KafkaIdempotentRepository repo = new KafkaIdempotentRepository("idempotent-topic", "localhost:9092");
registry.bind("myIdemRepo", repo);

from("direct:input")
    .idempotentConsumer(header("id")).idempotentRepository("myIdemRepo")
    .to("mock:result");

Key Classes, Interfaces & Methods

KafkaComponent

Example:

KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
kafka.setBreakOnFirstError(true);
camelContext.addComponent("kafka", kafka);

KafkaManualCommit

PollExceptionStrategy

SubscribeAdapter

KafkaIdempotentRepository

HeaderFilterStrategy

Interaction with Other System Parts

Important Notes


Mermaid Component Diagram

classDiagram
    class KafkaComponent {
        +autoCommitEnable: boolean
        +allowManualCommit: boolean
        +breakOnFirstError: boolean
        +kafkaManualCommitFactory: KafkaManualCommitFactory
        +setKerberosConfigLocation(location: String)
    }

    class KafkaManualCommit {
        +commit()
    }

    class PollExceptionStrategy {
        +handle(Exception e)
    }

    class SubscribeAdapter {
        +subscribe(consumer, rebalanceListener, topicInfo)
    }

    class KafkaIdempotentRepository {
        +topic: String
        +bootstrapServers: String
        +groupId: String
        +maxCacheSize: int
        +pollDurationMs: int
        +producerConfig: Properties
        +consumerConfig: Properties
    }

    KafkaComponent --> KafkaManualCommit : uses
    KafkaComponent --> PollExceptionStrategy : configurable
    KafkaComponent --> SubscribeAdapter : configurable
    KafkaComponent --> KafkaIdempotentRepository : integrates with

Summary

This file documents the Camel Kafka component, which produces messages to and consumes messages from Apache Kafka brokers. It covers consumer error handling, manual commits, batching, transactions, the Kafka-backed idempotent repository, and security configuration. The component integrates with Camel's routing and registry mechanisms, so its behavior can be customized through endpoint options and registered beans.

The usage examples and configuration notes also address advanced scenarios such as manual commit handling and idempotent message consumption.