KafkaConsumerAsyncWithOffsetRepoCommitIT.java

Overview

`KafkaConsumerAsyncWithOffsetRepoCommitIT.java` is an integration test class that validates the behavior of the Apache Camel Kafka component when performing **asynchronous manual offset commits** with an external offset repository. Specifically, it tests that the offset repository is updated when the Kafka consumer commits offsets asynchronously via the `DefaultKafkaManualAsyncCommitFactory`.

This class extends `BaseManualCommitTestSupport` (presumably providing common test utilities and Kafka setup) and focuses on verifying manual commit semantics with offset state management using an in-memory offset repository (`MemoryStateRepository`).
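
To make the role of the offset repository concrete, here is a minimal plain-Java sketch of the idea. It is not Camel's `MemoryStateRepository`: the class name `InMemoryOffsetStore`, the `topic/partition` key format, and the use of `CompletableFuture` to stand in for the broker's asynchronous acknowledgement are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an offset state repository; not a Camel class.
class InMemoryOffsetStore {
    private final Map<String, String> state = new ConcurrentHashMap<>();

    // The "topic/partition" key format is an assumption for this sketch.
    static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    void setState(String key, String value) {
        state.put(key, value);
    }

    String getState(String key) {
        return state.get(key);
    }

    // Simulates an asynchronous commit: the store is updated on another
    // thread, so callers must wait on the returned future (or poll) before
    // they can expect to read the new offset.
    CompletableFuture<Void> commitAsync(String topic, int partition, long offset) {
        return CompletableFuture.runAsync(
                () -> setState(key(topic, partition), String.valueOf(offset)));
    }
}
```

The point of the sketch is the timing: a caller that reads the store immediately after `commitAsync(...)` may still see the old value, which is why an asynchronous commit path needs its own test.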


Classes and Methods

KafkaConsumerAsyncWithOffsetRepoCommitIT

This is the sole public class in the file. It contains the integration test logic.

| Visibility | Type | Name | Description |
| --- | --- | --- | --- |
| public | class | KafkaConsumerAsyncWithOffsetRepoCommitIT | Integration test for async manual commit with offset repository |

Fields

| Visibility | Type | Name | Description |
| --- | --- | --- | --- |
| private static final | `MemoryStateRepository` | `stateRepository` | In-memory offset state repository bean bound to registry |

Methods


after()
@AfterEach
public void after()

createRouteBuilder()
@Override
protected RouteBuilder createRouteBuilder()

kafkaManualCommitWithOffsetRepo()
@DisplayName("Tests that the offset repository gets updated when using in conjunction with the Async commit manager")
@Test
public void kafkaManualCommitWithOffsetRepo() throws Exception
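
Because the commit is asynchronous, a test of this kind cannot read the repository right after the messages are sent; it has to wait until the expected state appears or a timeout expires. The following plain-Java sketch shows that polling pattern. The `StateAwaiter` class, the `awaitState` helper, its parameters, and the 20 ms polling interval are hypothetical and are not taken from the test itself, which may well rely on a testing library for the same purpose.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical helper; illustrates waiting for an asynchronously updated value.
class StateAwaiter {
    // Polls the supplier until it yields the expected value or the timeout
    // elapses. Returns true if the expected value was observed in time.
    static boolean awaitState(Supplier<String> actual, String expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (Objects.equals(expected, actual.get())) {
                return true;
            }
            try {
                Thread.sleep(20); // polling interval; arbitrary for this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        // Final check so a value that arrived during the last sleep is not missed.
        return Objects.equals(expected, actual.get());
    }
}
```

A caller would pass a lambda that reads the repository, for example `StateAwaiter.awaitState(() -> repo.getState(someKey), "4", Duration.ofSeconds(10))`, where `repo`, `someKey`, and the expected value are placeholders.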

Important Implementation Details


Interaction with Other Components


Usage Example

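This file is an integration test and not intended for direct use in production code. However, it illustrates how to configure a Kafka consumer route with auto-commit disabled and manual commits enabled (`autoCommitEnable=false`, `allowManualCommit=true`), an external offset repository (`offsetRepository=#bean:stateRepository`), and the asynchronous manual commit factory (`kafkaManualCommitFactory=#class:...DefaultKafkaManualAsyncCommitFactory`).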

Example Camel route snippet from `createRouteBuilder()`:

// Auto-commit is disabled; offsets are committed manually and recorded in the offset repository.
from("kafka:testAsyncCommitWithOffsetRepoTest?groupId=KafkaConsumerAsyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
     + "&offsetRepository=#bean:stateRepository&allowManualCommit=true&autoOffsetReset=earliest"
     + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
    .routeId("foo")
    .to("mock:result")
    .process(exchange -> {
        // The manual-commit handle is exposed as a header because allowManualCommit=true.
        KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual != null) {
            // With the async commit factory configured, this commit is issued asynchronously.
            manual.commit();
        }
    });
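
The endpoint URI in this route packs several independent options into one string. As a reading aid, the sketch below assembles the same URI from a map so that each option appears on its own line. The `EndpointUriSketch` class and its `kafkaUri` helper are hypothetical: they only concatenate strings, apply no URL encoding, and are not part of Camel.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not a Camel API.
class EndpointUriSketch {
    // Joins the options as key=value pairs separated by '&'.
    static String kafkaUri(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    // Rebuilds the URI used by the route, one option per line.
    static String exampleUri() {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("groupId", "KafkaConsumerAsyncCommitIT");
        options.put("pollTimeoutMs", "1000");
        options.put("autoCommitEnable", "false");          // no automatic commits
        options.put("offsetRepository", "#bean:stateRepository");
        options.put("allowManualCommit", "true");          // enables manual commit
        options.put("autoOffsetReset", "earliest");
        options.put("kafkaManualCommitFactory",
                "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory");
        return kafkaUri("testAsyncCommitWithOffsetRepoTest", options);
    }
}
```

Writing the options this way changes nothing about the route; it only makes it easier to see which settings turn off auto-commit, which one wires in the repository bean, and which one selects the asynchronous commit factory.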

Mermaid Class Diagram

classDiagram
    class KafkaConsumerAsyncWithOffsetRepoCommitIT {
        +static final String TOPIC
        -static MemoryStateRepository stateRepository
        +after()
        #createRouteBuilder() RouteBuilder
        +kafkaManualCommitWithOffsetRepo() void
    }
    KafkaConsumerAsyncWithOffsetRepoCommitIT --|> BaseManualCommitTestSupport

    class MemoryStateRepository {
        +getState(String key) String
        +setState(String key, String value)
    }

    class RouteBuilder {
        +configure()
    }

    KafkaConsumerAsyncWithOffsetRepoCommitIT ..> MemoryStateRepository : uses
    KafkaConsumerAsyncWithOffsetRepoCommitIT ..> RouteBuilder : returns

Summary

`KafkaConsumerAsyncWithOffsetRepoCommitIT.java` is a focused integration test validating the Apache Camel Kafka component's ability to perform asynchronous manual offset commits while recording offset state externally via a state repository. It demonstrates configuring a Kafka consumer route with manual commit enabled, integrating an in-memory offset repository, and verifying that commits update this state correctly. The test targets scenarios that require precise control over offset commits beyond Kafka's auto-commit mechanism.