KafkaTransactionIT.java

Overview

`KafkaTransactionIT.java` is an integration test class for validating Kafka transactional message production and consumption within the Apache Camel Kafka component. It ensures that transactional guarantees and concurrency aspects are correctly handled when sending messages to Kafka topics using Camel routes configured for transactions.

The class extends `BaseKafkaTestSupport` and relies on Camel's testing framework and Kafka clients.

This file plays a critical role in assuring the reliability and correctness of transactional messaging in the Camel Kafka integration, especially under concurrent load conditions.


Detailed Explanation

Class: KafkaTransactionIT

Integration test class for Kafka transactional messaging.


Lifecycle Methods

@BeforeAll public static void before()

@AfterAll public static void after()


Overridden Methods

protected RouteBuilder createRouteBuilder()

Defines two Camel routes for testing transactional Kafka producers:

  1. Sequential Transaction Route (direct:startTransaction)

    • Sends messages to topic transaction with Kafka producer properties:

      • transactional.id=1234

      • enable.idempotence=true

      • retries=5

      • requestRequiredAcks=-1 (all in-sync replicas must acknowledge)

    • After producing the message, a processor checks the message body:

      • If the body contains "fail", it throws a RuntimeException to simulate failure and trigger transaction rollback.

    • The route ends with sending the exchange to a mock endpoint for result verification.

  2. Concurrent Transaction Route (seda:startTransaction)

    • Sends messages to topic concurrency_transaction with Kafka producer properties:

      • transactional.id=5678

      • enable.idempotence=true

      • retries=5

      • synchronous=true (ensures producer sends records synchronously)

    • No additional processing or failure simulation here.
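
The failure path of the sequential route can be sketched without a broker. The class below is a minimal in-memory model, not the Camel or Kafka API: `TransactionSketch`, `sendInTransaction`, and `failIfRequested` are hypothetical names, and the commit/rollback bookkeeping is an assumption about how a transacted producer looks from a consumer's point of view. It keeps the route's order (produce first, then check the body), so a body containing "fail" is sent but never becomes visible.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a transactional producer; NOT the Kafka client API. */
final class TransactionSketch {
    private final List<String> committed = new ArrayList<>(); // visible to consumers
    private final List<String> pending = new ArrayList<>();   // sent, not yet committed

    /** Mirrors the route order: send first, then run the failure check. */
    boolean sendInTransaction(String body) {
        pending.add(body);              // the "produce to Kafka" step
        try {
            failIfRequested(body);      // the processor that follows the producer
            committed.addAll(pending);  // success: commit makes the record visible
            return true;
        } catch (RuntimeException e) {
            return false;               // failure: rollback, nothing becomes visible
        } finally {
            pending.clear();            // the transaction is finished either way
        }
    }

    /** Hypothetical equivalent of the route's processor. */
    static void failIfRequested(String body) {
        if (body.contains("fail")) {
            throw new RuntimeException("simulated failure for body: " + body);
        }
    }

    List<String> committed() {
        return List.copyOf(committed);
    }

    public static void main(String[] args) {
        TransactionSketch tx = new TransactionSketch();
        tx.sendInTransaction("message-1");
        tx.sendInTransaction("fail");
        tx.sendInTransaction("message-2");
        System.out.println(tx.committed()); // prints [message-1, message-2]
    }
}
```

In a real Kafka cluster an aborted record is still written to the log; consumers reading with `isolation.level=read_committed` skip it, which the `committed` list approximates here.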


Test Methods

public void concurrencyProducedTransactionMessage() throws InterruptedException

Tests concurrent production of transactional messages.

**Usage Example:**

concurrencyProducedTransactionMessage();

This will verify that concurrent transactional message production and consumption work without message loss.
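
The shape of such a concurrency check can be illustrated with plain `java.util.concurrent` classes. This is a sketch under assumptions, not the test's actual body: the in-memory `sink` stands in for the `concurrency_transaction` topic, `sendConcurrently` is a hypothetical helper, and the 10-second timeout is arbitrary. The real test presumably sends through a Camel producer and counts records read back from Kafka.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrentSendSketch {
    /** Starts threadNum workers that each "send" one message; returns what the sink received. */
    static Queue<String> sendConcurrently(int threadNum) {
        Queue<String> sink = new ConcurrentLinkedQueue<>();        // hypothetical stand-in for the topic
        CountDownLatch delivered = new CountDownLatch(threadNum);  // one count per expected message
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            for (int i = 0; i < threadNum; i++) {
                final int id = i;
                pool.execute(() -> {
                    sink.add("message-" + id); // the "send" step
                    delivered.countDown();     // the "received" signal
                });
            }
            // Fail instead of hanging if any message goes missing.
            if (!delivered.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("missing messages: " + delivered.getCount());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(sendConcurrently(5).size()); // prints 5
    }
}
```

Waiting on the latch with a timeout turns a lost message into a test failure rather than a hung build.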


public void producedTransactionMassageIsReceivedByKafka() throws InterruptedException

Tests sequential transactional message production with simulated failure.

**Usage Example:**

producedTransactionMassageIsReceivedByKafka();

This test validates transactional rollback behavior on failure and confirms metadata properties of produced messages.


Private Helper Methods

private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId)

Creates and configures a Kafka consumer for string key/value deserialization.

**Kafka consumer configuration includes:**
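
The individual settings are not listed in this document. As a rough sketch, a consumer for string keys and values needs at least the entries below. The key names are standard Kafka consumer settings, while `ConsumerConfigSketch`, `stringConsumerProperties`, and the broker address in the usage line are hypothetical, and the real helper may set more options.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Builds a minimal configuration; the real helper's full set of options is not shown here. */
    static Properties stringConsumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // broker address (assumed parameter)
        props.setProperty("group.id", groupId);                   // consumer group from the method argument
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "example-group" are placeholder values.
        Properties props = stringConsumerProperties("localhost:9092", "example-group");
        System.out.println(props.getProperty("group.id")); // prints example-group
    }
}
```

With the Kafka client library on the classpath, a `Properties` object like this can be passed to the `KafkaConsumer<String, String>` constructor.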


private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch)

Consumes messages from the given Kafka topic until the `messagesLatch` count reaches zero.
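
A loop of this kind might look like the following. It is a sketch only: `poll` is a hypothetical stand-in for `KafkaConsumer.poll(Duration)` so that the example runs without a broker, and counting the latch down once per record is an assumption about how the helper tracks progress.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

final class LatchConsumerSketch {
    /**
     * Polls until the latch reaches zero and returns the number of records seen.
     * Note: like the behaviour described above, this loops forever if the
     * expected records never arrive; a real test would bound the wait.
     */
    static int consumeUntilDone(Supplier<List<String>> poll, CountDownLatch messagesLatch) {
        int seen = 0;
        while (messagesLatch.getCount() > 0) {
            // One poll may return zero or more records.
            for (String record : poll.get()) {
                messagesLatch.countDown(); // one count per received record
                seen++;
            }
        }
        return seen;
    }
}
```

Driving the loop from a latch lets the test thread wait on the same `CountDownLatch` and know exactly when the expected number of records has arrived.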


Implementation Details and Algorithms


Interaction with Other System Components


Visual Diagram

classDiagram
    class KafkaTransactionIT {
        +static final String SEQUENTIAL_TRANSACTION_URI
        +static final String CONCURRENT_TRANSACTION_URI
        +static final String TOPIC_TRANSACTION
        +static final String TOPIC_CONCURRENCY_TRANSACTION
        +static final int THREAD_NUM
        -static KafkaConsumer<String, String> stringsConsumerConn
        +static void before()
        +static void after()
        #RouteBuilder createRouteBuilder()
        +void concurrencyProducedTransactionMessage() throws InterruptedException
        +void producedTransactionMassageIsReceivedByKafka() throws InterruptedException
        -static KafkaConsumer<String, String> createStringKafkaConsumer(String groupId)
        -void createKafkaMessageConsumer(KafkaConsumer<String,String>, String, CountDownLatch)
    }

    KafkaTransactionIT ..> RouteBuilder : creates
    KafkaTransactionIT ..> KafkaConsumer : uses
    KafkaTransactionIT ..> ProducerTemplate : uses
    KafkaTransactionIT ..> MockEndpoint : verifies

Summary

`KafkaTransactionIT.java` is a critical integration test class validating Kafka transactional producer functionality within Apache Camel. It covers sequential and concurrent transaction scenarios, including failure handling and metadata verification. The file combines Camel route configuration, Kafka consumer polling, and multithreaded test execution to ensure reliable and correct transactional message processing. It interacts with Kafka, Camel testing utilities, and Kafka admin clients to manage test lifecycle and resource cleanup.