KafkaTransactionSynchronization.java


Overview

`KafkaTransactionSynchronization.java` is a core internal class within the Apache Camel Kafka component designed to integrate Kafka's native transactional model with Camel's **Unit of Work** lifecycle. It acts as a synchronization callback that manages Kafka transaction boundaries (commit or abort) based on the outcome of a Camel `Exchange`.

The class listens to the completion event of an `Exchange` and performs the appropriate Kafka transactional operation:

This mechanism ensures atomicity and consistency of Kafka message production within Camel routes, tightly coupling Kafka transactions with Camel's routing and error handling semantics.


Class: KafkaTransactionSynchronization

Package

org.apache.camel.component.kafka

Description

`KafkaTransactionSynchronization` extends Camel's `SynchronizationAdapter` to hook into the lifecycle of a Camel `Exchange`. It provides transactional semantics for Kafka message producing by committing or aborting the Kafka transaction on exchange completion.

Properties

Property

Type

Description

`transactionId`

`String`

Unique identifier for the Kafka transaction. Used to mark the transaction within Camel's UnitOfWork.

`kafkaProducer`

`Producer` (Kafka API)

The Kafka producer instance managing the Kafka transaction.

Constructor

public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer)
Producer<String, String> producer = ...; // Initialized Kafka Producer with transactions enabled
String txId = "tx12345";
KafkaTransactionSynchronization sync = new KafkaTransactionSynchronization(txId, producer);
exchange.getUnitOfWork().addSynchronization(sync);

Methods

void onDone(Exchange exchange)

This method is called by Camel when the associated `Exchange` completes—whether successfully or with failure.

if (exchange failed or rollback) {
    if (exception is KafkaException) {
        close kafkaProducer;
    } else {
        abort kafkaProducer transaction;
    }
} else {
    commit kafkaProducer transaction;
}
end transactional unit of work;

Implementation Details and Algorithms


Interaction with Other Components


Usage Example

// Inside Kafka Producer component or route processor managing transactions
Producer<String, String> kafkaProducer = ...; // initialized with transactions enabled
String transactionId = "tx-001";

// Begin the Kafka transaction explicitly
kafkaProducer.beginTransaction();

// Register synchronization on Camel exchange's unit of work
exchange.getUnitOfWork().addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));

// Proceed with sending messages within the exchange

// Upon exchange completion, KafkaTransactionSynchronization will commit or abort transaction accordingly

Mermaid Class Diagram

classDiagram
    class KafkaTransactionSynchronization {
        -String transactionId
        -Producer kafkaProducer
        +KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer)
        +void onDone(Exchange exchange)
    }
    KafkaTransactionSynchronization --|> SynchronizationAdapter

Summary

`KafkaTransactionSynchronization.java` is a focused utility class that bridges Apache Camel's routing lifecycle with Kafka's transactional message production. By hooking into Camel's UnitOfWork completion events, it guarantees Kafka transactions are committed or aborted in accordance with the success or failure of the Camel exchange, thereby ensuring atomic, consistent, and reliable message delivery in Kafka-backed Camel routes.


Additional References


This documentation provides a comprehensive understanding of the role and implementation of `KafkaTransactionSynchronization.java` within the Apache Camel Kafka component ecosystem.