Transactional Sending

Purpose

Transactional sending addresses the need for atomicity and consistency when producing messages to Kafka within Apache Camel routes. In scenarios where multiple messages must be sent as a single unit of work, transactional sending ensures that either all messages are successfully published or none are, preventing partial updates and maintaining data integrity. This subtopic enables Kafka producers to participate in Kafka's native transactions, providing commit and abort capabilities tied to the lifecycle of a Camel exchange.

Functionality

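This subtopic implements Kafka transaction support by integrating Kafka’s transactional API with Camel’s unit of work model. The two snippets below show the key interactions: starting a transaction for an exchange, and committing or aborting it when the exchange completes.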

Critical Interaction Snippet: Starting a Kafka Transaction

private void startKafkaTransaction(Exchange exchange) {
    UnitOfWork uow = exchange.getUnitOfWork();

    // Begin a transaction only if this unit of work has not already joined one with this id.
    if (!uow.isTransactedBy(transactionId)) {
        LOG.debug("Starting kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
        uow.beginTransactedBy(transactionId);
        kafkaProducer.beginTransaction();
        // Register the callback that commits or aborts once the exchange is done.
        uow.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
    } else {
        LOG.debug("Using existing kafka transaction {} with exchange {}.", transactionId, exchange.getExchangeId());
    }
}

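This method starts a Kafka transaction at most once per exchange. If the unit of work is not yet transacted by transactionId, it marks the unit of work, calls beginTransaction() on the producer, and registers a KafkaTransactionSynchronization that commits or aborts the transaction when the exchange completes. Otherwise it reuses the transaction already in progress.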
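The begin-once guard in the method above can be exercised in isolation. The following is a minimal sketch under stated assumptions, not Camel's implementation: FakeUnitOfWork and FakeProducer are hypothetical in-memory stand-ins for Camel's UnitOfWork and the Kafka producer, and the string added to the synchronization list merely represents the registered callback. Only the isTransactedBy/beginTransactedBy check mirrors the snippet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the "begin once per exchange" guard.
// None of these types are Camel or Kafka classes.
class TransactionStartSketch {

    /** Stand-in for Camel's UnitOfWork: tracks which transaction ids are active. */
    static class FakeUnitOfWork {
        private final Set<String> transactedBy = new HashSet<>();
        final List<String> synchronizations = new ArrayList<>();

        boolean isTransactedBy(String id) { return transactedBy.contains(id); }
        void beginTransactedBy(String id) { transactedBy.add(id); }
        void endTransactedBy(String id) { transactedBy.remove(id); }
    }

    /** Stand-in for the Kafka producer: only counts beginTransaction() calls. */
    static class FakeProducer {
        int beginCount = 0;
        void beginTransaction() { beginCount++; }
    }

    /** Mirrors the guard: begin and register a callback only on first use. */
    static void startTransaction(FakeUnitOfWork uow, FakeProducer producer, String transactionId) {
        if (!uow.isTransactedBy(transactionId)) {
            uow.beginTransactedBy(transactionId);
            producer.beginTransaction();
            uow.synchronizations.add("sync-for-" + transactionId);
        }
        // Otherwise the transaction already in progress is reused.
    }

    public static void main(String[] args) {
        FakeUnitOfWork uow = new FakeUnitOfWork();
        FakeProducer producer = new FakeProducer();

        // Two sends within the same exchange share one transaction.
        startTransaction(uow, producer, "tx-1");
        startTransaction(uow, producer, "tx-1");

        System.out.println("beginTransaction calls: " + producer.beginCount);             // 1
        System.out.println("registered synchronizations: " + uow.synchronizations.size()); // 1
    }
}
```

In this model a second send in the same exchange neither begins a new transaction nor registers a second callback, which is why a single commit or abort at the end covers every message sent during the exchange.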

Transaction Synchronization Behavior

@Override
public void onDone(Exchange exchange) {
    try {
        // The exchange failed or was explicitly marked rollback-only.
        if (exchange.getException() != null || exchange.isRollbackOnly()) {
            if (exchange.getException() instanceof KafkaException) {
                // Kafka-level failure: close the producer rather than abort.
                LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(), transactionId);
                kafkaProducer.close();
            } else {
                LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
                kafkaProducer.abortTransaction();
            }
        } else {
            LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
            kafkaProducer.commitTransaction();
        }
    } catch (KafkaException e) {
        // A Kafka failure during commit, abort, or close is recorded on the exchange.
        exchange.setException(e);
    } catch (Exception e) {
        exchange.setException(e);
        LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(), e.getMessage(), e);
        kafkaProducer.abortTransaction();
    } finally {
        // Always release the unit of work from this transaction.
        exchange.getUnitOfWork().endTransactedBy(transactionId);
    }
}

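This callback runs when the exchange completes. It commits the Kafka transaction if the exchange succeeded, and aborts it if the exchange failed or was marked rollback-only. When the failure is itself a KafkaException, the producer is closed instead of aborted. A KafkaException thrown while committing, aborting, or closing is recorded on the exchange, and in every case the finally block releases the unit of work from the transaction.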
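The branching in onDone reduces to a small mapping from exchange outcome to producer action. The sketch below restates only that mapping as a pure function so each case can be checked on its own. It is an illustration, not Camel source: the Action enum, the decide method, and FakeKafkaException (standing in for Kafka's KafkaException) are hypothetical names introduced here.

```java
// Hypothetical sketch of the outcome-to-action mapping; not Camel source code.
class TransactionOutcomeSketch {

    /** The three things the callback can do with the producer. */
    enum Action { COMMIT, ABORT, CLOSE_PRODUCER }

    /** Stand-in for org.apache.kafka.common.KafkaException. */
    static class FakeKafkaException extends RuntimeException {
        FakeKafkaException(String message) { super(message); }
    }

    /**
     * Chooses the action for a completed exchange.
     *
     * @param failure      the exception recorded on the exchange, or null
     * @param rollbackOnly whether the exchange was marked rollback-only
     */
    static Action decide(Throwable failure, boolean rollbackOnly) {
        if (failure != null || rollbackOnly) {
            // A Kafka-level failure closes the producer; anything else aborts.
            return (failure instanceof FakeKafkaException) ? Action.CLOSE_PRODUCER : Action.ABORT;
        }
        return Action.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, false));                              // COMMIT
        System.out.println(decide(null, true));                               // ABORT
        System.out.println(decide(new IllegalStateException("boom"), false)); // ABORT
        System.out.println(decide(new FakeKafkaException("fenced"), false));  // CLOSE_PRODUCER
    }
}
```

Note that a rollback-only exchange with no exception still aborts, since the rollback flag alone is enough to enter the failure branch.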

Integration

Transactional sending builds on the Kafka message production capabilities by layering transactional boundaries around message sends. It adds transaction lifecycle management tied to the Camel exchange, which neither the parent topic’s general production path nor the other subtopics cover.

Diagram

sequenceDiagram
    participant CamelRoute as Camel Route
    participant KafkaProducer as Kafka Producer
    participant UnitOfWork as Camel UnitOfWork
    participant KafkaBroker as Kafka Broker

    CamelRoute->>KafkaProducer: process(exchange)
    KafkaProducer->>UnitOfWork: beginTransactedBy(transactionId) if needed
    KafkaProducer->>KafkaProducer: beginTransaction()
    Note right of KafkaProducer: Create ProducerRecords and send messages
    KafkaProducer->>KafkaBroker: send(ProducerRecord)
    KafkaBroker-->>KafkaProducer: ack
    CamelRoute->>UnitOfWork: completeExchange()
    UnitOfWork->>KafkaTransactionSynchronization: onDone()
    alt exchange successful
        KafkaTransactionSynchronization->>KafkaProducer: commitTransaction()
    else exchange failed or rollback
        KafkaTransactionSynchronization->>KafkaProducer: abortTransaction()
    end

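This sequence diagram illustrates the core transactional sending workflow: starting the transaction, sending messages, and committing or aborting based on the exchange outcome. It omits the case where the failure is a KafkaException and the producer is closed instead of aborting.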

In summary, transactional sending extends Kafka message production with atomicity. Kafka’s transactional API is bound to Camel’s exchange lifecycle, so the messages sent during an exchange are committed together when it succeeds and aborted together when it fails.