Pause and Resume

Purpose

This subtopic covers how an Apache Camel Kafka consumer temporarily halts and later resumes message consumption. Pausing helps manage transient processing issues, apply backpressure, or react to consumer-side errors without fully stopping the consumer. Resuming restores message flow once the condition that triggered the pause has been resolved.

Unlike stopping and restarting the consumer, pause and resume give fine-grained control over the consumption lifecycle, which allows smoother error recovery and more operational flexibility. The feature ties into the error handling and circuit breaker mechanisms to make the consumer more resilient.

Functionality

Core Workflow

Key Components and Behavior

Error and Seek Policy Handling

Integration with Circuit Breaker and Health Checks

Integration

Pause and resume complement the broader Kafka message consumption functionality by adding control over the consumer lifecycle without a full shutdown. They work alongside error handling, seek policies, and the circuit breaker and health check integration.

This subtopic introduces dynamic control over message flow in the consumer, extending beyond the simple start/stop semantics covered in the parent topic and other subtopics.

Diagram

sequenceDiagram
    participant KafkaConsumer
    participant FetchTask as KafkaFetchRecords Task
    participant KafkaBroker
    participant ConsumerListener as KafkaConsumerListener

    KafkaConsumer->>FetchTask: start polling
    FetchTask->>KafkaBroker: poll messages
    KafkaBroker-->>FetchTask: ConsumerRecords
    FetchTask->>KafkaConsumer: deliver records
    KafkaConsumer->>ConsumerListener: process records
    alt processing success
        ConsumerListener-->>FetchTask: continue polling
    else processing failure
        ConsumerListener->>FetchTask: pause consumption
        FetchTask-->>KafkaBroker: stop fetching from assigned partitions
        ConsumerListener->>FetchTask: seek offsets (beginning/end) if needed
        loop wait for recovery
            ConsumerListener->>ConsumerListener: evaluate resume predicate
            alt resume condition met
                ConsumerListener->>FetchTask: resume consumption
                FetchTask-->>KafkaBroker: fetch from assigned partitions again
                Note over ConsumerListener: exit wait loop
            else not ready
                ConsumerListener-->>ConsumerListener: remain paused
            end
        end
    end
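
The sequence in the diagram can be sketched in plain Java without a Kafka dependency. This is a minimal illustration, not Camel's implementation: `FakeConsumer`, `DemoSeekPolicy`, `PausingListener`, and `PauseResumeDemo` are hypothetical stand-ins, and the recovery condition (true on the third check) is made up for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the pause/resume/seek surface of a Kafka consumer. */
final class FakeConsumer {
    private boolean paused;
    final List<String> events = new ArrayList<>();

    void pause() { paused = true; events.add("pause"); }
    void resume() { paused = false; events.add("resume"); }
    void seekToBeginning() { events.add("seekToBeginning"); }
    void seekToEnd() { events.add("seekToEnd"); }
    boolean isPaused() { return paused; }
}

/** Hypothetical seek policy; NONE leaves offsets untouched. */
enum DemoSeekPolicy { NONE, BEGINNING, END }

/** Hypothetical listener mirroring the failure and recovery branches of the diagram. */
final class PausingListener {
    private final FakeConsumer consumer;
    private final DemoSeekPolicy seekPolicy;
    private final BooleanSupplier canResume;

    PausingListener(FakeConsumer consumer, DemoSeekPolicy seekPolicy, BooleanSupplier canResume) {
        this.consumer = consumer;
        this.seekPolicy = seekPolicy;
        this.canResume = canResume;
    }

    /** On failure: pause, then optionally reposition. Returns false when consumption was paused. */
    boolean afterProcess(boolean failed) {
        if (!failed) {
            return true;
        }
        consumer.pause();
        if (seekPolicy == DemoSeekPolicy.BEGINNING) {
            consumer.seekToBeginning();
        } else if (seekPolicy == DemoSeekPolicy.END) {
            consumer.seekToEnd();
        }
        return false;
    }

    /** While paused: resume only once the recovery condition holds. Returns true when consuming. */
    boolean afterConsume() {
        if (!consumer.isPaused()) {
            return true;
        }
        if (canResume.getAsBoolean()) {
            consumer.resume();
            return true;
        }
        return false;
    }
}

final class PauseResumeDemo {
    static List<String> run() {
        FakeConsumer consumer = new FakeConsumer();
        int[] checks = {0};
        // Made-up recovery condition: becomes true on the third evaluation.
        PausingListener listener =
                new PausingListener(consumer, DemoSeekPolicy.BEGINNING, () -> ++checks[0] >= 3);

        listener.afterProcess(true);            // processing failure: pause and seek
        while (!listener.afterConsume()) {      // wait-for-recovery loop
            consumer.events.add("still paused");
        }
        return consumer.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded events follow the diagram's order: a pause, an optional seek, zero or more checks that leave the consumer paused, and a single resume.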

Code Snippet Highlights

@Override
protected void doSuspend() throws Exception {
    for (KafkaFetchRecords task : tasks) {
        LOG.info("Pausing Kafka record fetcher task running client ID {}", task.healthState().getClientId());
        task.pause();
    }
    super.doSuspend();
}

@Override
protected void doResume() throws Exception {
    for (KafkaFetchRecords task : tasks) {
        LOG.info("Resuming Kafka record fetcher task running client ID {}", task.healthState().getClientId());
        task.resume();
    }
    super.doResume();
}

@Override
public boolean afterProcess(ProcessingResult result) {
    if (result.isFailed()) {
        LOG.warn("Pausing consumer due to error on the last processing");
        consumer.pause(consumer.assignment());
        paused = true;

        if (seekPolicy == SeekPolicy.BEGINNING) {
            consumer.seekToBeginning(consumer.assignment());
        } else if (seekPolicy == SeekPolicy.END) {
            consumer.seekToEnd(consumer.assignment());
        }
        return false;
    }
    return true;
}

@Override
public boolean afterConsume(Object ignored) {
    if (paused) {
        if (afterConsumeEval.test(null)) {
            LOG.warn("State changed, therefore resuming the consumer");
            consumer.resume(consumer.assignment());
            return true;
        }
        LOG.warn("The consumer is not yet resumable");
        return false;
    }
    return true;
}
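
The `afterConsumeEval.test(null)` call suggests that the resume condition is a `java.util.function.Predicate`. As one possible illustration of such a predicate, the sketch below reports the consumer as resumable once a cool-down has elapsed since the last recorded failure. `CooldownResumeCheck` and `CooldownResumeCheckDemo` are hypothetical names that are not part of Camel, and the clock is injected only so the behaviour can be checked without sleeping. The class is not thread-safe.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/** Hypothetical resume check: resumable once a cool-down has elapsed since the last failure. */
final class CooldownResumeCheck implements Predicate<Object> {
    private final long cooldownMillis;
    private final LongSupplier clock;      // supplies the current time in milliseconds
    private Long lastFailureMillis;        // null until a failure is recorded

    CooldownResumeCheck(Duration cooldown, LongSupplier clock) {
        this.cooldownMillis = cooldown.toMillis();
        this.clock = clock;
    }

    /** Call when processing fails, i.e. at the point where the consumer is paused. */
    void recordFailure() {
        lastFailureMillis = clock.getAsLong();
    }

    /** The argument is ignored, matching the test(null) call style in the listener above. */
    @Override
    public boolean test(Object ignored) {
        if (lastFailureMillis == null) {
            return true;                   // nothing has failed, so nothing blocks resuming
        }
        return clock.getAsLong() - lastFailureMillis >= cooldownMillis;
    }
}

final class CooldownResumeCheckDemo {
    static boolean[] run() {
        long[] now = {1_000L};             // fake clock, advanced manually
        CooldownResumeCheck check = new CooldownResumeCheck(Duration.ofSeconds(5), () -> now[0]);

        check.recordFailure();
        boolean immediately = check.test(null);   // 0 ms elapsed
        now[0] += 4_999L;
        boolean justBefore = check.test(null);    // 4999 ms elapsed
        now[0] += 1L;
        boolean atCooldown = check.test(null);    // 5000 ms elapsed
        return new boolean[] {immediately, justBefore, atCooldown};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

In production the clock would typically be `System::currentTimeMillis`. A time-based check is only one option; a predicate that probes the health of a downstream dependency fits the same shape.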

Together, these hooks pause the assigned partitions when processing fails, optionally reposition offsets according to the seek policy, and resume consumption once the resume predicate passes. This ties flow control to error handling and offset management, which supports stable and fault-tolerant streaming applications.