task_executor.py
Overview
task_executor.py is a core asynchronous worker module for the InfiniFlow RAG (Retrieval-Augmented Generation) system, responsible for executing document processing tasks from a distributed queue. It:
Consumes tasks from Redis queue(s) using consumer groups.
Downloads document files from MinIO object storage.
Parses and chunks documents into smaller pieces using a variety of parser types.
Generates embeddings for document chunks using specified embedding models.
Optionally generates keywords, questions, and content tags for chunks using LLMs.
Indexes chunks into a vector search system (like Elasticsearch or Infinity).
Supports advanced workflows like dataflow pipelines, RAPTOR clustering, and graph-based chunking (graphrag).
Monitors and reports task execution status and worker heartbeats.
Handles task cancellation and failure reporting.
Manages concurrency and resource limits using Trio synchronization primitives.
This module is the bridge between task queueing, document processing pipelines, and vector search indexing, enabling scalable and fault-tolerant content ingestion for the RAG system.
Classes and Exceptions
TaskCanceledException
Custom exception raised when a task is detected as canceled during processing.
Usage Example:
if has_canceled(task_id):
    raise TaskCanceledException("Task has been canceled")
Functions and Methods
signal_handler(sig, frame)
Handles termination signals (SIGINT, SIGTERM). It logs the shutdown signal, sets the stop event to terminate the main loop gracefully, waits briefly, and then exits the process.
Parameters:
sig: Signal number.
frame: Current stack frame.
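The shutdown sequence described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: the `stop_event` object, the `grace_seconds` parameter, and the `install_handlers` helper are assumptions introduced for the example.

```python
import logging
import signal
import sys
import threading
import time

# Assumed stand-in for the module's stop event; the real object may differ.
stop_event = threading.Event()


def signal_handler(sig, frame, grace_seconds=1.0):
    """Log the signal, ask the main loop to stop, wait briefly, then exit."""
    logging.info("Received signal %s, shutting down", sig)
    stop_event.set()
    time.sleep(grace_seconds)  # give in-flight work a moment to wind down
    sys.exit(0)


def install_handlers():
    """Register the handler for the two termination signals named above."""
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
```

A main loop would then poll `stop_event.is_set()` between iterations and return once it is set.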
start_tracemalloc_and_snapshot(signum, frame)
Starts Python's tracemalloc memory tracing if not already started, takes a memory snapshot, and saves it to a timestamped file inside the logs directory. Logs current and peak memory usage.
Parameters:
signum: Signal number.
frame: Current stack frame.
stop_tracemalloc(signum, frame)
Stops tracemalloc tracing if active and logs the action.
Parameters:
signum: Signal number.
frame: Current stack frame.
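As a rough sketch of what the two tracemalloc handlers do, the example below uses only the standard `tracemalloc` API. The function names `take_snapshot` and `stop_tracing`, the file naming scheme, and the `log_dir` argument are assumptions for illustration; the real handlers take `(signum, frame)` and choose their own log path.

```python
import logging
import os
import time
import tracemalloc


def take_snapshot(log_dir):
    """Start tracing if needed, dump a snapshot to a timestamped file."""
    if not tracemalloc.is_tracing():
        tracemalloc.start()
    os.makedirs(log_dir, exist_ok=True)
    stamp = time.strftime("%Y%m%d_%H%M%S")
    path = os.path.join(log_dir, f"snapshot_{stamp}.trace")  # hypothetical name
    tracemalloc.take_snapshot().dump(path)
    current, peak = tracemalloc.get_traced_memory()
    logging.info("Memory: current=%d bytes, peak=%d bytes", current, peak)
    return path


def stop_tracing():
    """Stop tracing if it is active; return whether anything was stopped."""
    if tracemalloc.is_tracing():
        tracemalloc.stop()
        logging.info("tracemalloc stopped")
        return True
    return False
```

A dumped snapshot can later be reloaded with `tracemalloc.Snapshot.load(path)` for offline comparison.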
set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...")
Updates the progress and status message of a task in the database. Checks if the task has been canceled, in which case it raises TaskCanceledException.
Parameters:
task_id (str): Identifier of the task.
from_page (int): Starting page number for progress reporting.
to_page (int): Ending page number.
prog (float|int|None): Progress value; a negative value indicates an error.
msg (str): Status message string.
Raises:
TaskCanceledException if the task is canceled.
Example Usage:
set_progress(task_id="123", from_page=0, to_page=10, prog=0.5, msg="Halfway done")
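To make the update-then-check behaviour concrete, here is a minimal sketch that swaps the database for an in-memory dictionary. The `TASKS` store, the `has_canceled` body, and the page-range message prefix are all assumptions made for the example, not the module's real storage or message format.

```python
class TaskCanceledException(Exception):
    """Raised when a task is found to be canceled."""

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg


# Hypothetical in-memory stand-in for the task table.
TASKS = {"123": {"canceled": False, "progress": 0.0, "progress_msg": ""}}


def has_canceled(task_id):
    return TASKS[task_id]["canceled"]


def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
    record = TASKS[task_id]
    if to_page > 0:
        # Illustrative formatting only; the real message layout may differ.
        msg = f"Page({from_page}~{to_page}): {msg}"
    record["progress_msg"] = msg
    if prog is not None:
        record["progress"] = prog
    # Cancellation is surfaced to the caller as an exception.
    if has_canceled(task_id):
        raise TaskCanceledException(f"Task {task_id} has been canceled")
```

Raising from inside the progress call means any long-running step that reports progress also becomes a cancellation checkpoint.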
async collect() -> (redis_msg, task)
Fetches a task message from Redis queues using consumer groups. Handles unacknowledged messages first, then new messages. Checks if the task exists and is not canceled.
Returns:
redis_msg: Redis message object for later acknowledgment.
task (dict): Task data from DB.
Returns (None, None) if no valid task is found.
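The ordering rule (unacknowledged messages first, then new ones, then validation against the task table) can be illustrated without a Redis server. In this sketch two deques stand in for the pending and new entries of a consumer group and a dict stands in for the database; the real function is async and talks to Redis, so every name here is an assumption.

```python
from collections import deque


def collect(pending, fresh, tasks):
    """Return (msg, task), or (None, None) when nothing valid is available.

    pending: deque of delivered-but-unacknowledged messages (retried first).
    fresh:   deque of messages that were never delivered.
    tasks:   dict mapping task id to task data, standing in for the DB.
    """
    if pending:
        msg = pending.popleft()
    elif fresh:
        msg = fresh.popleft()
    else:
        return None, None

    task = tasks.get(msg["id"])
    if task is None or task.get("canceled"):
        # A real worker would acknowledge the message here so it is not retried.
        return None, None
    return msg, task
```

Draining pending messages first is what lets a restarted worker resume tasks that a crashed worker had claimed but never acknowledged.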
async get_storage_binary(bucket, name) -> bytes
Fetches a binary file from object storage (MinIO) asynchronously using a thread pool.
Parameters:
bucket (str): Storage bucket name.
name (str): Object name (file path).
Returns:
File content bytes.
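The pattern of pushing a blocking storage read onto a worker thread looks roughly like the sketch below. Note the substitution: the module uses Trio, whose equivalent call is `trio.to_thread.run_sync`, while this example uses the standard library's `asyncio.to_thread` so that it runs without extra dependencies. `FAKE_STORAGE` and `blocking_get` are hypothetical stand-ins for the MinIO client.

```python
import asyncio

# Hypothetical in-memory object store keyed by (bucket, name).
FAKE_STORAGE = {("docs", "a.pdf"): b"%PDF-1.7"}


def blocking_get(bucket, name):
    """Synchronous read, standing in for a blocking storage client call."""
    return FAKE_STORAGE[(bucket, name)]


async def get_storage_binary(bucket, name):
    """Run the blocking read in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(blocking_get, bucket, name)
```

Calling `asyncio.run(get_storage_binary("docs", "a.pdf"))` returns the stored bytes.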
async build_chunks(task: dict, progress_callback) -> list[dict]
Downloads a document file from storage, parses it into chunks using the associated parser, and uploads any images embedded in chunks back to storage. Supports keyword extraction, question proposal, and content tagging for the chunks using LLM calls.
Parameters:
task (dict): Task details including document ID, parser ID, page ranges, language, and configs.
progress_callback (callable): Function to update progress.
Returns:
List of chunk dictionaries, each representing a piece of the document with metadata and content.
Raises:
TaskCanceledException if the task is canceled.
Various exceptions on download or chunking failure.
init_kb(row: dict, vector_size: int) -> object
Initializes the knowledge base index for a tenant and knowledge base ID with the specified vector size in the document store.
Parameters:
row (dict): Task or document metadata containing tenant_id and kb_id.
vector_size (int): Dimensionality of embedding vectors.
Returns:
Index object from the document store.
async embedding(docs: list, mdl, parser_config=None, callback=None) -> (int, int)
Generates embedding vectors for a list of document chunks using the embedding model. It computes weighted embeddings combining title and content embeddings, supports batching, and calls back with progress updates.
Parameters:
docs (list): List of chunk dictionaries.
mdl: Embedding model bundle instance.
parser_config (dict|None): Parser configuration options (e.g., filename embedding weight).
callback (callable|None): Progress callback function.
Returns:
tk_count (int): Total token count processed.
vector_size (int): Embedding vector size.
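A weighted title/content combination with batching might look like the following sketch. The encoder is a toy, the `title_weight` default and `batch_size` are assumed values, and the convex combination `w * title + (1 - w) * content` is one plausible reading of "weighted embeddings", not a confirmed formula from the module.

```python
def fake_encode(texts):
    """Hypothetical encoder: returns (vectors, token_count) for a batch."""
    vectors = [[float(len(t)), float(t.count(" ") + 1)] for t in texts]
    tokens = sum(len(t.split()) for t in texts)
    return vectors, tokens


def embed_chunks(docs, encode=fake_encode, title_weight=0.1, batch_size=2):
    """Attach a combined vector to each chunk; return (tk_count, vector_size)."""
    tk_count = 0
    vector_size = 0
    for start in range(0, len(docs), batch_size):
        batch = docs[start:start + batch_size]
        title_vecs, title_tokens = encode([d["title"] for d in batch])
        content_vecs, content_tokens = encode([d["content"] for d in batch])
        tk_count += title_tokens + content_tokens
        for doc, tv, cv in zip(batch, title_vecs, content_vecs):
            # Assumed combination rule: weighted sum of title and content.
            doc["vector"] = [
                title_weight * t + (1 - title_weight) * c for t, c in zip(tv, cv)
            ]
            vector_size = len(doc["vector"])
    return tk_count, vector_size
```

Keeping the title weight small lets the chunk body dominate the vector while the document name still nudges similar files together.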
async run_dataflow(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str, callback=None)
Executes a dataflow pipeline based on a DSL string, tenant, document, task, and flow IDs.
Parameters:
dsl (str): Domain-specific language pipeline description.
tenant_id (str): Tenant identifier.
doc_id (str): Document identifier.
task_id (str): Task identifier.
flow_id (str): Flow identifier.
callback (callable|None): Optional progress callback.
async run_raptor(row: dict, chat_mdl, embd_mdl, vector_size: int, callback=None) -> (list, int)
Runs the Recursive Abstractive Processing for Tree-organized Retrieval (RAPTOR) algorithm on document chunks to cluster and abstract content.
Parameters:
row (dict): Task/document metadata including parser config.
chat_mdl: Chat LLM model bundle.
embd_mdl: Embedding model bundle.
vector_size (int): Vector embedding dimension.
callback (callable|None): Progress callback.
Returns:
List of new or updated chunks.
Token count processed.
async do_handle_task(task: dict)
Main logic to process a single task. It:
Checks task cancellation.
Initializes models and knowledge base index.
Depending on the task type (dataflow, raptor, graphrag, or standard), calls the appropriate processing flow.
Chunks documents, generates embeddings, and indexes them.
Updates progress and handles errors.
Parameters:
task(dict): Task metadata and parameters.
Raises:
Exceptions on failures, which propagate to caller.
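The branching on task type can be pictured as a small dispatch table. The sketch below is synchronous and reduced to the routing decision only; the `task_type` key, the handler names, and the string return values are assumptions chosen for illustration.

```python
def run_dataflow_flow(task):
    return "dataflow"


def run_raptor_flow(task):
    return "raptor"


def run_graphrag_flow(task):
    return "graphrag"


def run_standard_flow(task):
    # Standard path: chunk, embed, and index the document.
    return "standard"


# Hypothetical mapping from task type to processing flow.
HANDLERS = {
    "dataflow": run_dataflow_flow,
    "raptor": run_raptor_flow,
    "graphrag": run_graphrag_flow,
}


def dispatch(task, is_canceled=lambda task_id: False):
    """Check cancellation first, then route to the matching flow."""
    if is_canceled(task["id"]):
        return "canceled"
    handler = HANDLERS.get(task.get("task_type", ""), run_standard_flow)
    return handler(task)
```

Anything without a recognised task type falls through to the standard chunk, embed, and index path.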
async handle_task()
Wrapper to collect a task and execute it with concurrency limits. Updates global counters for done and failed tasks.
async report_status()
Periodically reports the worker's heartbeat status, including task counts and current running tasks, to Redis sorted sets. Also cleans up expired worker entries.
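The heartbeat-plus-cleanup idea can be shown with a plain list standing in for a Redis sorted set, where each entry is scored by its timestamp. The payload fields, the `max_age` default, and the function signature are assumptions; the real coroutine writes to Redis and loops on a timer.

```python
import json


def report_status(sorted_set, now, done, failed, current, max_age=1800.0):
    """Append a heartbeat scored by timestamp and drop entries older than max_age.

    sorted_set: list of (score, payload) tuples, a stand-in for a Redis sorted set.
    Returns the number of heartbeats kept.
    """
    payload = json.dumps(
        {"now": now, "done": done, "failed": failed, "current": current}
    )
    sorted_set.append((now, payload))
    cutoff = now - max_age
    # Equivalent in spirit to removing a score range from a sorted set.
    sorted_set[:] = [(score, p) for score, p in sorted_set if score >= cutoff]
    return len(sorted_set)
```

Scoring by timestamp makes expiry a simple range operation, and a worker whose newest heartbeat is older than the cutoff can be treated as gone.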
async task_manager()
Wrapper that runs handle_task and releases the concurrency semaphore afterward.
async main()
The entrypoint for the task executor worker:
Initializes logging and environment.
Sets up signal handlers.
Starts background coroutine to report status.
Runs an event loop that continually acquires concurrency permits and starts task processing coroutines.
Runs until stopped by external signal.
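The acquire-in-main, release-in-task_manager pattern can be sketched as below. Two caveats: the module uses Trio, while this example uses the standard library's `asyncio` so it runs without extra dependencies, and the loop here iterates over a finite list instead of running until a stop signal. `MAX_CONCURRENT_TASKS = 2` and the `state` bookkeeping are assumptions for the demonstration.

```python
import asyncio

MAX_CONCURRENT_TASKS = 2  # hypothetical limit


async def handle_task(task_id, state):
    """Pretend to process one task while tracking concurrency."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)  # simulated work
    state["active"] -= 1
    state["done"].append(task_id)


async def task_manager(task_id, limiter, state):
    """Run one task and always give the permit back afterwards."""
    try:
        await handle_task(task_id, state)
    finally:
        limiter.release()


async def main(task_ids):
    limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    state = {"active": 0, "peak": 0, "done": []}
    running = []
    for task_id in task_ids:
        await limiter.acquire()  # blocks once the limit is reached
        running.append(asyncio.create_task(task_manager(task_id, limiter, state)))
    await asyncio.gather(*running)
    return state
```

Acquiring the permit before spawning, and releasing it in a `finally` block, keeps the number of in-flight tasks bounded even when a task raises.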
Implementation Details and Algorithms
Concurrency Control: Uses Trio semaphores and capacity limiters to throttle simultaneous tasks, chunk building, embedding generation, and MinIO uploads.
Task Queue Consumption: Uses Redis consumer groups with unacknowledged message handling for reliability.
Document Parsing: Supports multiple parser types selected dynamically via a factory dictionary based on parser_id.
Chunking: Documents are chunked asynchronously with progress callbacks; chunks may contain images saved separately.
Embedding: Uses batch encoding for embeddings, combining title and content embeddings with configurable weights.
LLM-Powered Features: Uses caching and concurrency-limited calls to LLMs for keyword extraction, question proposal, and content tagging.
RAPTOR Algorithm: Implements recursive clustering and abstraction of chunks to improve retrieval quality.
Graphrag: Supports graph-based chunking workflows for knowledge graph construction.
Task Cancellation: Periodically checks for task cancellation and aborts processing gracefully.
Progress Reporting: Updates task progress in the database with timestamps and detailed messages.
Fault Handling: Logs exceptions, reports errors in task progress, and handles missing files or model failures.
Memory Profiling: Supports signal-triggered memory snapshots via tracemalloc.
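The factory-dictionary selection mentioned under Document Parsing can be sketched as follows. The parser IDs `naive` and `qa`, the toy chunking rules, and the lowercase normalisation are assumptions for illustration; the real FACTORY maps parser IDs to full parser modules.

```python
def chunk_naive(text):
    """Toy parser: one chunk per blank-line-separated paragraph."""
    return [p.strip() for p in text.split("\n\n") if p.strip()]


def chunk_qa(text):
    """Toy parser: one chunk per line that looks like a question."""
    return [line.strip() for line in text.splitlines() if line.strip().endswith("?")]


# Hypothetical factory: parser_id -> chunking callable.
FACTORY = {
    "naive": chunk_naive,
    "qa": chunk_qa,
}


def chunk_document(parser_id, text):
    """Look up the parser by ID and apply it to the document text."""
    try:
        chunker = FACTORY[parser_id.lower()]
    except KeyError:
        raise ValueError(f"Unknown parser_id: {parser_id}") from None
    return chunker(text)
```

Adding a parser type then only requires registering one more entry in the dictionary.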
Interaction with Other Components
Redis (via REDIS_CONN): Task queue backend and status reporting.
MinIO (via STORAGE_IMPL): Object storage for raw document files and chunk images.
Database Services (TaskService, DocumentService): Task metadata management and document statistics updating.
LLM Bundles (LLMBundle): Provides chat and embedding model interfaces for NLP tasks.
Document Parsers (FACTORY): Selects parser modules for chunking documents.
Vector Search (settings.docStoreConn): Indexes chunks into the vector search engine.
RAGFlow Pipeline: Runs dataflow pipelines for advanced workflows.
RAPTOR & Graphrag: Implements recursive clustering and graph-based chunking.
Logging: Uses Python logging with configured root logger.
Task Cancellation and Progress: Uses DB flags and progress updates for control and feedback.
Trio Library: For asynchronous concurrency primitives and cooperative multitasking.
Visual Diagram
classDiagram
class TaskCanceledException {
+msg: str
}
class TaskExecutor {
-CONSUMER_NAME: str
-MAX_CONCURRENT_TASKS: int
-task_limiter: Semaphore
-chunk_limiter: CapacityLimiter
-embed_limiter: CapacityLimiter
-minio_limiter: CapacityLimiter
-kg_limiter: CapacityLimiter
+signal_handler(sig, frame)
+start_tracemalloc_and_snapshot(signum, frame)
+stop_tracemalloc(signum, frame)
+set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...")
+collect() async
+get_storage_binary(bucket, name) async
+build_chunks(task, progress_callback) async
+init_kb(row, vector_size)
+embedding(docs, mdl, parser_config=None, callback=None) async
+run_dataflow(dsl, tenant_id, doc_id, task_id, flow_id, callback=None) async
+run_raptor(row, chat_mdl, embd_mdl, vector_size, callback=None) async
+do_handle_task(task) async
+handle_task() async
+report_status() async
+task_manager() async
+main() async
}
TaskExecutor --> TaskCanceledException : raises
TaskExecutor ..> "api.db.services.task_service.TaskService" : uses
TaskExecutor ..> "api.db.services.document_service.DocumentService" : uses
TaskExecutor ..> "api.db.services.llm_service.LLMBundle" : uses
TaskExecutor ..> "api.utils.log_utils" : uses
TaskExecutor ..> "rag.utils.redis_conn.REDIS_CONN" : uses
TaskExecutor ..> "rag.raptor.RecursiveAbstractiveProcessing4TreeOrganizedRetrieval" : uses
TaskExecutor ..> "graphrag.general.index.run_graphrag" : uses
TaskExecutor ..> "rag.flow.pipeline.Pipeline" : uses
Usage Example
The script is intended to be run as a standalone worker process. For example:
python task_executor.py 1
This will start a consumer worker named task_executor_1 which listens to task queues, processes tasks concurrently, and reports status.
Summary
task_executor.py is the asynchronous task execution engine of the InfiniFlow RAG system. It combines distributed task consumption, document parsing and chunking, embedding generation, and indexing with concurrency limits, LLM-powered enrichment, and failure reporting, so that documents are processed and indexed for downstream retrieval and generation.