task_service.py
Overview
task_service.py is a core backend service module in the InfiniFlow system responsible for managing document processing tasks. It provides an interface for creating, querying, updating, and cancelling tasks that process various document types, including PDFs and Excel files, within the context of knowledge bases and tenants.
The file implements:
A TaskService class that extends a common service for database operations on task entities.
Functions to queue and chunk document processing tasks efficiently.
Mechanisms to reuse previously processed chunks to optimize performance.
Support for task cancellation and progress tracking.
Integration with Redis queues for task distribution and asynchronous processing.
Handling of different document types and parser configurations with tailored task chunking logic.
This module interacts heavily with the database models (Task, Document, Knowledgebase, etc.), document parsing utilities, storage abstractions, and Redis-based task queues to ensure reliable and scalable document processing workflows.
Classes and Functions
TaskService(CommonService)
Service class for managing document processing tasks. Inherits from CommonService, leveraging generic database operations and adding task-specific logic.
Attributes
model: The database model class representing tasks (Task).
Class Methods
get_task(task_id: str) -> dict | None
Retrieve detailed task information by its unique ID, including related document, knowledge base, and tenant metadata.
Parameters:
task_id (str): Unique identifier of the task.
Returns:
dict containing comprehensive task information and related metadata, or None if the task is not found or its retry limit is exceeded.
Details:
Joins Task with the Document, Knowledgebase, and Tenant tables.
Implements retry logic: abandons tasks after 3 retries.
Updates progress message and retry count atomically.
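The retry rule above can be sketched without a database. This is a minimal illustration, not the module's code: TaskRecord and fetch_for_processing are hypothetical names, the record is an in-memory stand-in for a task row, and marking an abandoned task with progress -1 is an assumption borrowed from the error sentinel described under update_progress.

```python
from dataclasses import dataclass
from typing import Optional

MAX_RETRIES = 3  # retry limit stated in the text above


@dataclass
class TaskRecord:
    """Hypothetical in-memory stand-in for a row in the task table."""
    id: str
    retry_count: int = 0
    progress: float = 0.0
    progress_msg: str = ""


def fetch_for_processing(task: TaskRecord) -> Optional[TaskRecord]:
    """Return the task if it may run again, or None once it is abandoned."""
    if task.retry_count >= MAX_RETRIES:
        # Assumption: an abandoned task is flagged with the error value -1.
        task.progress = -1
        task.progress_msg += "\nERROR: task abandoned after 3 retries."
        return None
    # Count this attempt and record it in the progress message.
    task.retry_count += 1
    task.progress_msg += f"\nTask picked up (attempt {task.retry_count})."
    return task
```

In the real service the message and the counter are updated together in one database statement; the sketch only shows the decision itself.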
Usage Example:
task_info = TaskService.get_task("task-uuid-1234")
if task_info:
    print("Task found:", task_info)
else:
    print("Task not found or abandoned due to retries")
get_tasks(doc_id: str) -> list[dict] | None
Retrieve all tasks associated with a specific document.
Parameters:
doc_id (str): Document identifier.
Returns:
List of task dictionaries, or None if no tasks are found.
Details:
Orders tasks by page number and creation time.
Includes progress and chunk IDs.
Usage Example:
tasks = TaskService.get_tasks("doc-uuid-5678")
if tasks:
    for task in tasks:
        print(task)
else:
    print("No tasks found for document")
update_chunk_ids(id: str, chunk_ids: str) -> None
Update the chunk_ids field for a task, storing processed chunk identifiers as a space-separated string.
Parameters:
id (str): Task identifier.
chunk_ids (str): Space-separated chunk IDs.
Usage Example:
TaskService.update_chunk_ids("task-uuid", "chunk1 chunk2 chunk3")
get_ongoing_doc_name() -> list[tuple]
Retrieve a list of document identifiers and locations for documents currently being processed.
Returns:
List of tuples (parent_id/kb_id, location) representing ongoing documents.
Details:
Uses DB-level locking to ensure thread safety.
Filters documents that are valid, running, and not virtual.
Only includes tasks with progress less than 100% and created recently.
Usage Example:
ongoing_docs = TaskService.get_ongoing_doc_name()
for kb_or_parent_id, location in ongoing_docs:
    print(kb_or_parent_id, location)
do_cancel(id: str) -> bool
Determine if a task should be cancelled based on the status of its associated document.
Parameters:
id (str): Task identifier.
Returns:
True if the document status indicates cancellation or negative progress; otherwise False.
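The cancellation test described above reduces to a two-condition check. The sketch below is illustrative only: RunStatus, its values, and should_cancel are hypothetical names, since the text does not list the actual status constants.

```python
from enum import Enum


class RunStatus(Enum):
    """Hypothetical document run states; the values are illustrative."""
    RUNNING = "running"
    CANCEL = "cancel"
    DONE = "done"


def should_cancel(doc_run: RunStatus, doc_progress: float) -> bool:
    """True if the document was cancelled or its progress is negative."""
    return doc_run is RunStatus.CANCEL or doc_progress < 0
```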
Usage Example:
if TaskService.do_cancel("task-uuid-1234"):
    print("Task should be cancelled")
update_progress(id: str, info: dict) -> None
Update the progress message and percentage completion of a task, with platform-specific behavior and concurrency control.
Parameters:
id (str): Task identifier.
info (dict): Dictionary containing keys:
  progress_msg (str, optional): Message to append.
  progress (float, optional): Progress value in [0.0, 1.0], or -1 for error.
Details:
Appends progress messages, trimming the accumulated text to at most 3000 characters (see trim_header_by_lines).
Updates progress only if new progress is greater or error (-1), avoiding regressions.
Uses DB lock for thread safety on non-macOS platforms.
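The forward-only progress rule can be written as a small pure function. This is a sketch under stated assumptions: next_progress is a hypothetical name, and keeping a task in the error state once it has failed is an assumption the text does not confirm.

```python
ERROR = -1  # error sentinel described above


def next_progress(current: float, new: float) -> float:
    """Return the progress value to store, never moving backwards."""
    if current == ERROR:
        # Assumption: once a task has failed it stays failed.
        return current
    if new == ERROR or new > current:
        # Accept the error sentinel or any forward movement.
        return new
    # Ignore stale or regressing updates.
    return current
```

For example, next_progress(0.5, 0.3) keeps 0.5, while next_progress(0.5, -1) records the error.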
Usage Example:
TaskService.update_progress("task-uuid-1234", {"progress_msg": "50% done", "progress": 0.5})
trim_header_by_lines(text: str, max_length: int) -> str
Helper function to trim a string preserving line breaks, ensuring the text length does not exceed max_length.
Parameters:
text (str): Input text.
max_length (int): Maximum length allowed.
Returns:
Trimmed text string.
Usage Example:
trimmed = trim_header_by_lines(long_text, 3000)
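One way such a helper could work is to drop whole lines from the start of the text until the remainder fits. The sketch below is not the module's implementation: the function name is hypothetical, and both the choice to discard the oldest lines first and the fallback to a hard cut when no line break helps are assumptions.

```python
def trim_header_by_lines_sketch(text: str, max_length: int) -> str:
    """Drop leading lines until the text is at most max_length characters."""
    if len(text) <= max_length:
        return text
    # Cut at the first line break whose remaining tail fits the limit.
    for i, ch in enumerate(text):
        if ch == "\n" and len(text) - (i + 1) <= max_length:
            return text[i + 1:]
    # Assumption: if no line break gives a short enough tail,
    # keep only the last max_length characters.
    return text[len(text) - max_length:]
```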
queue_tasks(doc: dict, bucket: str, name: str, priority: int = 0) -> None
Create and enqueue document processing tasks based on document type and configuration.
Parameters:
doc (dict): Document metadata, including type, parser config, and IDs.
bucket (str): Storage bucket name.
name (str): Document filename.
priority (int): Priority for task queueing (default 0).
Details:
Supports PDF and Excel with chunked tasks by page or row ranges.
Calculates task digests for caching and reuse.
Attempts to reuse previously processed chunks.
Updates document chunk count and triggers parsing start.
Enqueues unfinished tasks into Redis queue.
Usage Example:
queue_tasks(doc_metadata, "my_bucket", "file.pdf", priority=5)
reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict) -> int
Attempt to reuse chunk IDs from previous completed tasks to optimize processing.
Parameters:
task (dict): Current task.
prev_tasks (list[dict]): List of prior tasks.
chunking_config (dict): Chunking configuration.
Returns:
Number of chunks reused (int), or 0 if none reused.
Details:
Matches previous tasks by page range and digest.
Only reuses if previous task is complete and has chunks.
Updates progress and progress message accordingly.
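The matching rules above can be illustrated with plain dictionaries. This is a simplified sketch rather than the actual function: the field names (from_page, to_page, digest, progress, chunk_ids) are assumptions, the chunking_config argument is omitted, and progress 1.0 is taken to mean a completed task.

```python
def reuse_chunks(task: dict, prev_tasks: list) -> int:
    """Copy chunk IDs from a matching finished task; return how many were reused."""
    for prev in prev_tasks:
        same_range = (prev["from_page"], prev["to_page"]) == (
            task["from_page"],
            task["to_page"],
        )
        if not same_range or prev["digest"] != task["digest"]:
            continue  # different slice or different configuration
        if prev["progress"] != 1.0 or not prev["chunk_ids"]:
            continue  # previous task unfinished or produced no chunks
        # Reuse: copy the space-separated chunk IDs and mark the task done.
        task["chunk_ids"] = prev["chunk_ids"]
        task["progress"] = 1.0
        task["progress_msg"] = "Reused chunks from a previous task."
        return len(prev["chunk_ids"].split())
    return 0
```

A task that reuses chunks this way is already complete, so it does not need to be enqueued again.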
Usage Example:
reused_chunks = reuse_prev_task_chunks(new_task, old_tasks, chunk_config)
cancel_all_task_of(doc_id: str) -> None
Set cancellation flags for all tasks associated with a document in Redis.
Parameters:
doc_id (str): Document ID.
Usage Example:
cancel_all_task_of("doc-uuid-1234")
has_canceled(task_id: str) -> bool
Check whether a specific task has been marked as cancelled in Redis.
Parameters:
task_id (str): Task ID.
Returns:
True if task is cancelled, False otherwise.
Usage Example:
if has_canceled("task-uuid-1234"):
    print("Task was cancelled")
queue_dataflow(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str, priority: int, callback=None) -> tuple[bool, str]
Queue a dataflow task, which is a specialized task type defined by a DSL (Domain Specific Language) script.
Parameters:
dsl (str): DSL script defining the dataflow.
tenant_id (str): Tenant identifier.
doc_id (str): Document identifier.
task_id (str): Optional existing task ID; if empty, a new one is generated.
flow_id (str): Optional flow ID; if empty, a new one is generated.
priority (int): Task priority.
callback: Optional callback (unused).
Returns:
Tuple (success: bool, error_message: str).
Details:
Deletes any existing task with the same ID.
Inserts the new dataflow task into the database.
Verifies knowledge base existence.
Enqueues the task into Redis.
Usage Example:
success, err = queue_dataflow(dsl_script, tenant_id, doc_id, "", "", 1)
if not success:
    print("Failed to queue dataflow:", err)
Important Implementation Details and Algorithms
Task Chunking:
PDF documents are chunked by pages, configurable by task_page_size and parser type.
Excel documents are chunked by row ranges (default 3000 rows per task).
Other documents default to a single large task.
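Both the page-based and the row-based splitting come down to cutting a total count into fixed-size ranges. The helper below is a hypothetical sketch of that arithmetic: split_ranges is not a name from the module, half-open (start, end) ranges are an assumption, and the page size of 12 in the usage lines is only an illustrative value.

```python
def split_ranges(total: int, size: int) -> list:
    """Split [0, total) into consecutive half-open ranges of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [(start, min(start + size, total)) for start in range(0, total, size)]


# A 30-page PDF with an illustrative task_page_size of 12 yields three tasks.
pdf_tasks = split_ranges(30, 12)

# A 7000-row spreadsheet with the default of 3000 rows per task yields three tasks.
excel_tasks = split_ranges(7000, 3000)
```

Each resulting range would become one task, so the last task of a document is usually smaller than the others.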
Task Digest Calculation:
Uses xxhash 64-bit hashing to generate a digest based on the chunking config and task parameters.
The digest is used to detect task duplication and enable chunk reuse.
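The idea behind the digest can be shown with the standard library alone. Note the substitution: the module uses xxhash, while this sketch swaps in hashlib.blake2b with an 8-byte digest so it runs without third-party packages. The function name, the JSON serialisation, and the set of excluded fields are all assumptions made for illustration.

```python
import hashlib
import json

# Assumption: fields that differ between runs are left out of the digest.
VOLATILE_FIELDS = {"id", "progress", "progress_msg", "chunk_ids", "create_time"}


def task_digest(chunking_config: dict, task: dict) -> str:
    """Return a 64-bit hex digest of the config plus the stable task fields."""
    stable = {k: v for k, v in task.items() if k not in VOLATILE_FIELDS}
    payload = json.dumps({"config": chunking_config, "task": stable}, sort_keys=True)
    # blake2b with digest_size=8 stands in for a 64-bit xxhash digest.
    return hashlib.blake2b(payload.encode("utf-8"), digest_size=8).hexdigest()
```

Two tasks over the same page range with the same configuration get the same digest even if their IDs differ, which is what makes chunk reuse possible.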
Task Retry and Progress:
Tasks retry up to 3 times; after that, they are abandoned.
Progress updates are atomic and only allow forward progress or error state (-1).
Progress messages are appended and trimmed to prevent overflow.
Concurrency and Locking:
Uses database-level locks around critical update operations (update_progress and get_ongoing_doc_name) to ensure consistency.
On macOS, locking is skipped due to environment-specific behavior.
Redis Integration:
Tasks are enqueued into Redis queues with priority-based queue names.
Cancellation flags are managed in Redis keys.
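The queueing and cancellation pattern can be sketched against an in-memory stand-in so that it runs without a Redis server. Everything named here is hypothetical: InMemoryRedis only mimics the queue_product, set, and get methods listed in the diagram further down, and the queue-name and cancel-key formats are invented for illustration.

```python
from collections import defaultdict


class InMemoryRedis:
    """Dict-backed stand-in exposing the three methods used below."""

    def __init__(self):
        self.queues = defaultdict(list)
        self.keys = {}

    def queue_product(self, queue_name, message):
        self.queues[queue_name].append(message)
        return True

    def set(self, key, value):
        self.keys[key] = value

    def get(self, key):
        return self.keys.get(key)


def queue_name_for(priority: int) -> str:
    # Hypothetical naming scheme: one queue per priority level.
    return f"task_queue.{priority}"


def cancel_key(task_id: str) -> str:
    # Hypothetical key format for a task's cancellation flag.
    return f"{task_id}-cancel"


def enqueue(conn, task: dict, priority: int = 0) -> bool:
    """Push a task message onto the queue for its priority."""
    return conn.queue_product(queue_name_for(priority), task)


def cancel_tasks(conn, task_ids) -> None:
    """Set the cancellation flag for every given task."""
    for task_id in task_ids:
        conn.set(cancel_key(task_id), "x")


def is_canceled(conn, task_id: str) -> bool:
    """A task counts as cancelled as soon as its flag key exists."""
    return conn.get(cancel_key(task_id)) is not None
```

Because the flag lives in a shared key store rather than in the worker's memory, a worker on another machine can poll it between processing steps and stop early.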
Reuse of Previous Task Chunks:
To optimize processing, completed task chunks from previous runs are reused if task digest and page ranges match.
This avoids redundant chunk processing and speeds up re-parsing.
Interaction with Other Modules
Database Models:
Uses the Task, Document, Knowledgebase, Tenant, File2Document, and File Peewee ORM models for data retrieval and updates.
Document Parsing:
Integrates with PdfParser and RAGFlowExcelParser to determine document dimensions (pages, rows).
Storage:
Uses STORAGE_IMPL to fetch document binaries from storage buckets.
Redis:
Uses REDIS_CONN for task queueing and cancellation signaling.
Utilities:
Uses utilities for UUID generation, timestamps, and server queue naming.
Settings:
Reads configurations from api.settings and rag.settings.
Other Services:
Calls DocumentService for document-related operations like chunking config and knowledge base retrieval.
Uses CommonService for generic database CRUD operations.
Visual Diagram
classDiagram
class TaskService {
+model
+get_task(task_id)
+get_tasks(doc_id)
+update_chunk_ids(id, chunk_ids)
+get_ongoing_doc_name()
+do_cancel(id)
+update_progress(id, info)
}
class DocumentService {
+get_chunking_config(doc_id)
+update_by_id(doc_id, updates)
+get_knowledgebase_id(doc_id)
+begin2parse(doc_id)
+get_by_id(doc_id)
}
class RedisConnection {
+queue_product(queue_name, message)
+set(key, value)
+get(key)
+delete(ids, index_name, kb_id)
}
class Storage {
+get(bucket, name)
}
TaskService ..> DocumentService : uses
TaskService ..> RedisConnection : uses
TaskService ..> Storage : uses
%% Functions outside class
class queue_tasks {
+queue_tasks(doc, bucket, name, priority)
+reuse_prev_task_chunks(task, prev_tasks, config)
+cancel_all_task_of(doc_id)
+has_canceled(task_id)
+queue_dataflow(dsl, tenant_id, doc_id, task_id, flow_id, priority, callback)
}
queue_tasks ..> TaskService : uses
queue_tasks ..> RedisConnection : uses
queue_tasks ..> Storage : uses
queue_tasks ..> DocumentService : uses
Summary
task_service.py is a foundational module in the InfiniFlow platform that orchestrates the lifecycle of document processing tasks. It manages task creation, chunking based on document type, progress and retry handling, and task queueing in Redis. Chunk reuse avoids redundant processing when a document is re-parsed, and forward-only progress updates combined with a retry limit keep task state consistent under large document processing workloads. The file is tightly integrated with the database layer, storage abstractions, and document parsing utilities, serving as a bridge between the data persistence layer and the asynchronous processing infrastructure.