task_executor.py


Overview

task_executor.py is a core asynchronous worker module of the InfiniFlow RAG (Retrieval-Augmented Generation) system. It executes document processing tasks taken from a distributed queue: it parses documents into chunks, generates embeddings for them, and indexes them in the document store.

This module is the bridge between task queueing, document processing pipelines, and vector search indexing, enabling scalable and fault-tolerant content ingestion for the RAG system.


Classes and Exceptions

TaskCanceledException

Custom exception raised when a task is detected as canceled during processing.

Usage Example:

if has_canceled(task_id):
    raise TaskCanceledException("Task has been canceled")

Functions and Methods

signal_handler(sig, frame)

Handles termination signals (SIGINT, SIGTERM). It logs the shutdown signal, sets the stop event to terminate the main loop gracefully, waits briefly, and then exits the process.
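A minimal sketch of this shutdown pattern, assuming a module-level threading.Event named stop_event that the main loop polls. The one-second grace period, the log text, and the install_signal_handlers helper are illustrative assumptions, not the module's actual code.

```python
import logging
import signal
import sys
import threading
import time

# Assumed: the worker's main loop polls this event and exits once it is set.
stop_event = threading.Event()

def signal_handler(sig, frame, grace_seconds=1.0):
    """Log the signal, ask the main loop to stop, wait briefly, then exit."""
    logging.info("Received %s, shutting down...", signal.Signals(sig).name)
    stop_event.set()           # the main loop sees this on its next check
    time.sleep(grace_seconds)  # short grace period for in-flight work (value is an assumption)
    sys.exit(0)                # raises SystemExit(0) in the calling thread

def install_signal_handlers():
    # Hypothetical helper; signal.signal must be called from the main thread.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
```

Setting an event rather than exiting immediately gives the loop a chance to notice the shutdown before the process terminates.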


start_tracemalloc_and_snapshot(signum, frame)

Starts Python's tracemalloc memory tracing if not already started, takes a memory snapshot, and saves it to a timestamped file inside the logs directory. Logs current and peak memory usage.


stop_tracemalloc(signum, frame)

Stops tracemalloc tracing if active and logs the action.
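The two handlers can be sketched with the standard tracemalloc module. The LOG_DIR value, the snapshot file-name pattern, and returning the path are assumptions made for this illustration; the calls themselves (is_tracing, start, take_snapshot, Snapshot.dump, get_traced_memory, stop) are standard-library APIs.

```python
import logging
import os
import time
import tracemalloc

LOG_DIR = "logs"  # assumed location of the logs directory

def start_tracemalloc_and_snapshot(signum, frame):
    """Start tracing if needed, dump a snapshot to a timestamped file, log usage."""
    if not tracemalloc.is_tracing():
        tracemalloc.start()
        logging.info("tracemalloc started")
    os.makedirs(LOG_DIR, exist_ok=True)
    stamp = time.strftime("%Y%m%d_%H%M%S")
    # Hypothetical naming scheme: pid plus timestamp keeps files from colliding.
    path = os.path.join(LOG_DIR, f"snapshot_{os.getpid()}_{stamp}.trace")
    tracemalloc.take_snapshot().dump(path)
    current, peak = tracemalloc.get_traced_memory()  # both values are in bytes
    logging.info("Snapshot saved to %s; current=%.2f MiB, peak=%.2f MiB",
                 path, current / 2**20, peak / 2**20)
    return path

def stop_tracemalloc(signum, frame):
    """Stop tracing only if it is active."""
    if tracemalloc.is_tracing():
        tracemalloc.stop()
        logging.info("tracemalloc stopped")
```

A saved snapshot can later be reloaded with tracemalloc.Snapshot.load(path) and compared against another snapshot offline.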


set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...")

Updates a task's progress value and status message in the database. It also checks whether the task has been canceled and, if so, raises TaskCanceledException.

Usage Example:

set_progress(task_id="123", from_page=0, to_page=10, prog=0.5, msg="Halfway done")
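A minimal sketch of the update-then-check behavior, assuming an in-memory dict in place of the task table and a set in place of the cancel flags (TASKS, CANCELED, and this has_canceled are hypothetical). The real function may also format the message using the page range; that detail is not modeled here.

```python
class TaskCanceledException(Exception):
    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg

# Hypothetical in-memory stand-ins for the task table and the cancel flags.
TASKS = {"123": {"progress": 0.0, "progress_msg": ""}}
CANCELED = set()

def has_canceled(task_id):
    return task_id in CANCELED

def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
    # from_page/to_page are kept for signature parity; this sketch ignores them.
    update = {"progress_msg": msg}
    if prog is not None:
        update["progress"] = prog       # leave progress untouched when prog is None
    TASKS[task_id].update(update)       # stands in for the database write
    if has_canceled(task_id):
        raise TaskCanceledException(f"Task {task_id} has been canceled")
```

Because every progress update passes through this check, a long-running task notices a cancellation at its next progress report.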

async collect() -> (redis_msg, task)

Fetches a task message from the Redis queues through a consumer group. It first takes messages that this consumer received earlier but never acknowledged, then reads new messages. It then checks that the referenced task still exists and has not been canceled.
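The pending-first ordering can be sketched without a Redis server. In Redis Streams, XREADGROUP with ID "0" returns the entries already delivered to this consumer but not yet acknowledged, and ID ">" returns entries never delivered to the group. The InMemoryGroupQueue class, the task_id field, and the TASKS/CANCELED tables below are hypothetical stand-ins for that behavior and for the database.

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class InMemoryGroupQueue:
    """Hypothetical stand-in for one Redis stream read through a consumer group."""
    new: list = field(default_factory=list)      # never delivered (like XREADGROUP ">")
    pending: list = field(default_factory=list)  # delivered, not acked (like XREADGROUP "0")
    acked: list = field(default_factory=list)

    def read_pending(self):
        return self.pending[0] if self.pending else None

    def read_new(self):
        if not self.new:
            return None
        msg = self.new.pop(0)
        self.pending.append(msg)  # a delivered message stays pending until acknowledged
        return msg

    def ack(self, msg):
        self.pending.remove(msg)
        self.acked.append(msg)

# Hypothetical task table and cancel flags.
TASKS = {"t1": {"id": "t1", "doc_id": "d1"}, "t2": {"id": "t2", "doc_id": "d2"}}
CANCELED = {"t2"}

async def collect(queue):
    msg = queue.read_pending() or queue.read_new()  # unacknowledged work first
    if msg is None:
        await asyncio.sleep(0)                      # nothing queued; yield to the loop
        return None, None
    task = TASKS.get(msg["task_id"])
    if task is None or msg["task_id"] in CANCELED:
        queue.ack(msg)                              # drop missing or canceled work
        return None, None
    return msg, task
```

In this sketch the caller acknowledges a message only after its task finishes, so a worker that crashes mid-task finds the same message in its pending list on restart.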


async get_storage_binary(bucket, name) -> bytes

Fetches a binary object from object storage (MinIO) asynchronously, running the blocking storage call in a thread pool so that the event loop is not blocked.
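A sketch of the same idea with asyncio.to_thread (Python 3.9+), assuming a hypothetical blocking_get function in place of the real storage client and an explicit limiter argument in the role the diagram gives minio_limiter. The extra limiter parameter is an assumption of this sketch, not part of the documented signature.

```python
import asyncio

# Hypothetical blocking storage client backed by a dict.
_OBJECTS = {("kb-bucket", "report.pdf"): b"%PDF-1.7 ..."}

def blocking_get(bucket, name):
    return _OBJECTS[(bucket, name)]

async def get_storage_binary(bucket, name, limiter):
    # The limiter caps concurrent downloads; to_thread keeps the event loop responsive.
    async with limiter:
        return await asyncio.to_thread(blocking_get, bucket, name)

async def demo():
    limiter = asyncio.Semaphore(2)  # assumed cap on concurrent storage reads
    return await asyncio.gather(
        *(get_storage_binary("kb-bucket", "report.pdf", limiter) for _ in range(3))
    )
```

Three downloads are requested but at most two run in worker threads at once; the third waits on the semaphore.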


async build_chunks(task: dict, progress_callback) -> list[dict]

Downloads a document file from storage, parses it into chunks with the parser associated with the task, and uploads any images embedded in the chunks back to storage. It can also enrich the chunks through LLM calls: keyword extraction, question proposal, and content tagging.
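The download, parse, and enrich flow can be outlined as follows. Every helper here (fetch_file, parse, llm_keywords, llm_questions), the auto_keywords/auto_questions flags, and the keywords/questions chunk fields are hypothetical stand-ins; image upload and tagging are omitted to keep the sketch short.

```python
import asyncio

# Hypothetical stand-ins: a storage fetch, a parser, and two LLM-backed enrichers.
async def fetch_file(bucket, name):
    return b"First paragraph.\n\nSecond paragraph."

def parse(binary):
    return [{"content": p} for p in binary.decode().split("\n\n") if p.strip()]

async def llm_keywords(text):
    return sorted({w.strip(".").lower() for w in text.split()})

async def llm_questions(text):
    return [f"What is said in: {text}?"]

async def build_chunks(task, progress_callback):
    binary = await fetch_file(task["bucket"], task["name"])
    progress_callback(0.1, "File downloaded")
    chunks = parse(binary)
    progress_callback(0.4, f"Parsed {len(chunks)} chunks")
    cfg = task.get("parser_config", {})  # assumed enrichment flags
    if cfg.get("auto_keywords"):
        # Enrich all chunks concurrently; results come back in chunk order.
        results = await asyncio.gather(*(llm_keywords(c["content"]) for c in chunks))
        for chunk, kws in zip(chunks, results):
            chunk["keywords"] = kws
    if cfg.get("auto_questions"):
        results = await asyncio.gather(*(llm_questions(c["content"]) for c in chunks))
        for chunk, qs in zip(chunks, results):
            chunk["questions"] = qs
    progress_callback(1.0, "Chunks ready")
    return chunks
```

Each enrichment step runs only when its flag is set, so plain parsing costs no LLM calls.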


init_kb(row: dict, vector_size: int) -> object

Initializes the document-store index for the tenant and knowledge base ID given in row, configured for embedding vectors of dimension vector_size.
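A sketch of per-tenant index initialization against a hypothetical in-memory store. The ragflow_&lt;tenant_id&gt; index name, the q_&lt;dim&gt;_vec vector-field name, and the create_idx method are assumptions for illustration, not the real document-store API.

```python
class InMemoryDocStore:
    """Hypothetical document store that records which indices exist."""
    def __init__(self):
        self.indices = {}

    def create_idx(self, index_name, kb_id, vector_size):
        # Idempotent: repeated calls reuse the existing index entry.
        idx = self.indices.setdefault(index_name, {"kb_ids": set(), "vector_fields": set()})
        idx["kb_ids"].add(kb_id)
        idx["vector_fields"].add(f"q_{vector_size}_vec")  # assumed: one field per dimension
        return idx

def index_name(tenant_id):
    return f"ragflow_{tenant_id}"  # assumed per-tenant naming scheme

def init_kb(row, vector_size, store):
    return store.create_idx(index_name(row["tenant_id"]), row.get("kb_id", ""), vector_size)
```

Encoding the dimension in the field name lets one tenant index hold vectors from embedding models of different sizes without mixing them.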


async embedding(docs: list, mdl, parser_config=None, callback=None) -> (int, int)

Generates embedding vectors for a list of document chunks with the embedding model mdl. Each chunk's vector is a weighted combination of the title embedding and the chunk's content embedding; chunks are encoded in batches, and progress is reported through callback.
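The weighting can be written per chunk as v = w * t + (1 - w) * c, where t is the title embedding, c is the content embedding, and w is the title weight. The sketch below uses a toy embedding model; the filename_embd_weight key, its 0.1 default, the title field, the q_&lt;dim&gt;_vec output field, and returning (token count, dimension) are all assumptions of this sketch.

```python
import asyncio

class ToyEmbedder:
    """Hypothetical model: vector = (text length, vowel count); tokens = word count."""
    def encode(self, texts):
        vecs = [[float(len(t)), float(sum(ch in "aeiou" for ch in t))] for t in texts]
        return vecs, sum(len(t.split()) for t in texts)

async def embedding(docs, mdl, parser_config=None, callback=None, batch_size=16):
    parser_config = parser_config or {}
    w = parser_config.get("filename_embd_weight", 0.1)  # assumed key and default
    title = docs[0].get("title", "")                    # assumed: chunks share one title
    title_vecs, token_count = mdl.encode([title])       # embed the title only once
    t_vec = title_vecs[0]
    contents = [d["content"] for d in docs]
    c_vecs = []
    for i in range(0, len(contents), batch_size):
        vecs, n = mdl.encode(contents[i:i + batch_size])
        c_vecs.extend(vecs)
        token_count += n
        if callback:
            callback(min(1.0, (i + batch_size) / len(contents)), "Embedding chunks")
        await asyncio.sleep(0)                          # yield between batches
    dim = len(t_vec)
    for d, c_vec in zip(docs, c_vecs):
        # v = w * title + (1 - w) * content, element-wise
        d[f"q_{dim}_vec"] = [w * t + (1 - w) * c for t, c in zip(t_vec, c_vec)]
    return token_count, dim
```

With w = 0.5, title "ab" (vector [2, 1]) and content "hello" (vector [5, 2]) combine to [3.5, 1.5].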


async run_dataflow(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str, callback=None)

Executes the dataflow pipeline defined by the DSL string dsl for the given tenant, document, task, and flow IDs.
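The DSL format is not described here, so the following only illustrates the general shape of a DSL-driven pipeline: a JSON document lists components in order, and each component transforms the output of the previous one. The {"steps": [...]} schema, the Splitter/Upper components, and the extra data parameter are all hypothetical.

```python
import asyncio
import json

# Hypothetical components; the real pipeline's components and schema are not shown here.
async def splitter(data, params):
    return [part for part in data.split(params.get("sep", "\n")) if part]

async def uppercaser(data, params):
    return [part.upper() for part in data]

COMPONENTS = {"Splitter": splitter, "Upper": uppercaser}

async def run_dataflow(dsl, tenant_id, doc_id, task_id, flow_id, callback=None, data=""):
    steps = json.loads(dsl)["steps"]  # assumed DSL shape: {"steps": [...]}
    for i, step in enumerate(steps, start=1):
        component = COMPONENTS[step["component"]]
        data = await component(data, step.get("params", {}))  # output feeds the next step
        if callback:
            callback(i / len(steps), f"[{flow_id}] {step['component']} finished")
    return data
```

Keeping the components in a registry means a new flow only needs a new DSL document, not new executor code.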


async run_raptor(row: dict, chat_mdl, embd_mdl, vector_size: int, callback=None) -> (list, int)

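Runs the Recursive Abstractive Processing for Tree-organized Retrieval (RAPTOR) algorithm on a document's chunks: it clusters the chunks, summarizes each cluster with the chat model, embeds the summaries, and repeats on the summaries, building a tree of increasingly abstract content.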
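A simplified sketch of the recursive summarize-and-embed loop. The RAPTOR paper clusters UMAP-reduced embeddings with a Gaussian mixture model; this sketch swaps that for fixed-size grouping of consecutive nodes, and summarize/embed are hypothetical stand-ins for chat_mdl and embd_mdl. Here the returned integer counts summary nodes; this document does not say what the real function's integer represents.

```python
import asyncio

async def summarize(texts):  # hypothetical stand-in for the chat model
    return "SUM(" + "+".join(texts) + ")"

def embed(text):             # hypothetical stand-in for the embedding model
    return [float(len(text))]

async def run_raptor(chunks, group_size=2, max_levels=5):
    """chunks: list of strings. Returns (all tree nodes, number of summaries created)."""
    layer = list(chunks)
    nodes = [{"text": t, "level": 0, "vec": embed(t)} for t in layer]
    made, level = 0, 0
    while len(layer) > 1 and level < max_levels:
        level += 1
        # Grouping step (RAPTOR would cluster by embedding similarity instead).
        groups = [layer[i:i + group_size] for i in range(0, len(layer), group_size)]
        summaries = await asyncio.gather(*(summarize(g) for g in groups))
        nodes += [{"text": s, "level": level, "vec": embed(s)} for s in summaries]
        made += len(summaries)
        layer = list(summaries)  # the next level summarizes these summaries
    return nodes, made
```

Both the original chunks and every summary are kept and embedded, so retrieval can match either fine-grained detail or a higher-level summary.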


async do_handle_task(task: dict)

Main logic for processing a single task.
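As a rough illustration only, a single-task handler could route each task to the functions described in this section. The task_type field, its values, the INDEX list, and all stage stubs are hypothetical; the real handler's steps are not listed in this document.

```python
import asyncio

# Hypothetical stubs for the pipeline stages described in this document.
async def build_chunks(task, cb):
    return [{"content": "chunk"}]

async def embedding(chunks, cb):
    for c in chunks:
        c["vec"] = [0.0]
    return len(chunks)

async def run_raptor(task, cb):
    return [{"content": "summary"}]

async def run_dataflow(task, cb):
    return []

INDEX = []  # stands in for the document store

async def do_handle_task(task, progress=print):
    kind = task.get("task_type", "")  # assumed field and values
    if kind == "dataflow":
        await run_dataflow(task, progress)
        return 0
    # Only the chosen branch's coroutine is created and awaited.
    chunks = await (run_raptor(task, progress) if kind == "raptor" else build_chunks(task, progress))
    if not chunks:
        progress(1.0, "No chunk built")
        return 0
    await embedding(chunks, progress)
    INDEX.extend(chunks)  # stands in for inserting into the index
    progress(1.0, f"Indexed {len(chunks)} chunks")
    return len(chunks)
```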


async handle_task()

Wrapper to collect a task and execute it with concurrency limits. Updates global counters for done and failed tasks.


async report_status()

Periodically reports the worker's heartbeat status, including task counts and current running tasks, to Redis sorted sets. Also cleans up expired worker entries.
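A sketch of the heartbeat pattern: each report is added to a sorted set scored by its timestamp, and entries older than a time-to-live are removed by score range, mirroring the Redis ZADD and ZREMRANGEBYSCORE commands. InMemorySortedSet is a hypothetical stand-in whose method signatures are simplified (they do not match the redis-py client); the TTL, interval, seq field, and bounded rounds are assumptions that keep the sketch short and terminating.

```python
import asyncio
import json
import time

class InMemorySortedSet:
    """Hypothetical stand-in for one Redis sorted set (member -> score)."""
    def __init__(self):
        self.items = {}

    def zadd(self, member, score):
        self.items[member] = score

    def zremrangebyscore(self, lo, hi):
        for m in [m for m, s in self.items.items() if lo <= s <= hi]:
            del self.items[m]

async def report_status(zset, consumer, counters, interval=0.01, ttl=60.0, rounds=3):
    # Bounded by `rounds` for the sketch; a worker would loop until shutdown.
    for seq in range(rounds):
        now = time.time()
        beat = {"name": consumer, "seq": seq, "now": now, **counters}
        zset.zadd(json.dumps(beat), now)     # newest heartbeat, scored by timestamp
        zset.zremrangebyscore(0, now - ttl)  # drop heartbeats older than the TTL
        await asyncio.sleep(interval)
```

Scoring by timestamp makes expiry a single range deletion instead of a scan with per-entry checks.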


async task_manager()

Wrapper that runs handle_task and releases the concurrency semaphore afterward.


async main()

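The entrypoint for the task executor worker. It starts the periodic status reporter and keeps launching task_manager coroutines, bounded by the concurrency semaphore, until the stop event is set.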
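The interplay of main, task_manager, and handle_task can be sketched with an asyncio.Semaphore: main acquires a slot before spawning each task_manager, and task_manager releases it when handle_task finishes, whether or not the task succeeded. The in-memory queue, the stats dict, MAX_CONCURRENT_TASKS = 2, and stopping once the queue is drained are assumptions for this demo; a real worker would stop only when a signal sets the stop event.

```python
import asyncio

MAX_CONCURRENT_TASKS = 2  # assumed limit for this sketch
stats = {"done": 0, "failed": 0, "in_flight": 0, "peak": 0}

async def handle_task(queue, stop_event):
    try:
        task = queue.get_nowait()      # stands in for collect()
    except asyncio.QueueEmpty:
        stop_event.set()               # demo-only: stop once the queue is drained
        return
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    try:
        await asyncio.sleep(0.01)      # stands in for do_handle_task(task)
        if task.get("fail"):
            raise RuntimeError("simulated failure")
        stats["done"] += 1
    except Exception:
        stats["failed"] += 1
    finally:
        stats["in_flight"] -= 1

async def task_manager(queue, stop_event, limiter):
    try:
        await handle_task(queue, stop_event)
    finally:
        limiter.release()              # free the slot whether the task succeeded or not

async def main(tasks):
    queue = asyncio.Queue()
    for t in tasks:
        queue.put_nowait(t)
    limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    stop_event = asyncio.Event()       # a signal handler would set this in a real worker
    running = set()
    while not stop_event.is_set():
        await limiter.acquire()        # waits while MAX_CONCURRENT_TASKS are in flight
        if stop_event.is_set():
            limiter.release()
            break
        job = asyncio.create_task(task_manager(queue, stop_event, limiter))
        running.add(job)
        job.add_done_callback(running.discard)
    await asyncio.gather(*running)
```

Releasing in a finally block matters: a slot leaked by a failing task would permanently lower the worker's concurrency.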


Implementation Details and Algorithms


Interaction with Other Components


Visual Diagram

classDiagram
    class TaskCanceledException {
        +msg: str
    }

    class TaskExecutor {
        -CONSUMER_NAME: str
        -MAX_CONCURRENT_TASKS: int
        -task_limiter: Semaphore
        -chunk_limiter: CapacityLimiter
        -embed_limiter: CapacityLimiter
        -minio_limiter: CapacityLimiter
        -kg_limiter: CapacityLimiter
        +signal_handler(sig, frame)
        +start_tracemalloc_and_snapshot(signum, frame)
        +stop_tracemalloc(signum, frame)
        +set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...")
        +collect() async
        +get_storage_binary(bucket, name) async
        +build_chunks(task, progress_callback) async
        +init_kb(row, vector_size)
        +embedding(docs, mdl, parser_config=None, callback=None) async
        +run_dataflow(dsl, tenant_id, doc_id, task_id, flow_id, callback=None) async
        +run_raptor(row, chat_mdl, embd_mdl, vector_size, callback=None) async
        +do_handle_task(task) async
        +handle_task() async
        +report_status() async
        +task_manager() async
        +main() async
    }

    TaskExecutor --> TaskCanceledException : raises
    TaskExecutor ..> TaskService : uses (api.db.services.task_service)
    TaskExecutor ..> DocumentService : uses (api.db.services.document_service)
    TaskExecutor ..> LLMBundle : uses (api.db.services.llm_service)
    TaskExecutor ..> log_utils : uses (api.utils.log_utils)
    TaskExecutor ..> REDIS_CONN : uses (rag.utils.redis_conn)
    TaskExecutor ..> RecursiveAbstractiveProcessing4TreeOrganizedRetrieval : uses (rag.raptor)
    TaskExecutor ..> run_graphrag : uses (graphrag.general.index)
    TaskExecutor ..> Pipeline : uses (rag.flow.pipeline)

Usage Example

The script is intended to be run as a standalone worker process. For example:

python task_executor.py 1

This starts a consumer worker named task_executor_1, which listens to the task queues, processes tasks concurrently, and reports its status.
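The trailing argument becomes part of the consumer name, which is how several workers can run side by side: in a Redis consumer group, each distinct consumer name keeps its own list of pending messages. The consumer_name helper and the default of "0" when the argument is omitted are assumptions of this sketch.

```python
import sys

def consumer_name(argv):
    # Assumed default: worker number "0" when none is given on the command line.
    worker_no = argv[1] if len(argv) > 1 else "0"
    return f"task_executor_{worker_no}"

if __name__ == "__main__":
    print(consumer_name(sys.argv))
```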


Summary

task_executor.py is the asynchronous task execution engine of the InfiniFlow RAG system. It integrates distributed task consumption, document chunking and parsing, embedding generation, and indexing workflows, and relies on concurrency limits, LLM-powered enrichment, and fault-tolerance mechanisms such as re-processing unacknowledged queue messages and checking for cancellation during progress updates. It is the backend component that prepares documents for downstream retrieval and generation.