task_service.py


Overview

task_service.py is a core backend service module in the InfiniFlow system responsible for managing document processing tasks. It provides an interface for creating, querying, updating, and cancelling tasks that process various document types, including PDFs and Excel files, within the context of knowledge bases and tenants.

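The file implements the TaskService class, which wraps database access for task records, together with module-level functions for queueing tasks, reusing chunks from earlier runs, and cancelling tasks through Redis.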

This module interacts heavily with the database models (Task, Document, Knowledgebase, etc.), document parsing utilities, storage abstractions, and Redis-based task queues to ensure reliable and scalable document processing workflows.


Classes and Functions

TaskService(CommonService)

Service class for managing document processing tasks. Inherits from CommonService, leveraging generic database operations and adding task-specific logic.

Attributes

model: the database model that the inherited CommonService operations act on (Task).

Class Methods


get_task(task_id: str) -> dict | None

Retrieve detailed task information by its unique ID, including related document, knowledge base, and tenant metadata.


get_tasks(doc_id: str) -> list[dict] | None

Retrieve all tasks associated with a specific document.


update_chunk_ids(id: str, chunk_ids: str) -> None

Update the chunk_ids field for a task, storing processed chunk identifiers as a space-separated string.


get_ongoing_doc_name() -> list[tuple]

Retrieve a list of document identifiers and locations for documents currently being processed.


do_cancel(id: str) -> bool

Determine if a task should be cancelled based on the status of its associated document.
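A minimal sketch of that decision, assuming the document carries a run status with a dedicated "cancelled" value and that a negative progress value marks a failed document. The DocumentState class and the CANCEL constant are hypothetical stand-ins for the real model and status enum.

```python
from dataclasses import dataclass

# Hypothetical status value; the real module uses its own status enum.
CANCEL = "2"


@dataclass
class DocumentState:
    run: str         # run status of the document
    progress: float  # parsing progress; a negative value is assumed to mean failure


def should_cancel(doc: DocumentState) -> bool:
    """Return True when the task's document was cancelled or has failed."""
    return doc.run == CANCEL or doc.progress < 0
```

Checking the document rather than the task means a single status change on the document stops every task derived from it.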


update_progress(id: str, info: dict) -> None

Update the progress message and percentage completion of a task, with platform-specific behavior and concurrency control.


Module-Level Functions


trim_header_by_lines(text: str, max_length: int) -> str

Helper function that trims a string by removing whole lines from its start (the "header"), so that the remaining text fits within max_length characters while line breaks stay intact.
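A minimal sketch of line-based head trimming, assuming the goal is to keep the most recent lines of a log. It cuts only at newline boundaries; when even the last line alone exceeds max_length, this sketch returns the text unchanged rather than splitting a line (a stricter variant could fall back to text[-max_length:]).

```python
def trim_header_by_lines(text: str, max_length: int) -> str:
    """Drop whole lines from the start of text until the rest fits in max_length.

    If no line boundary yields a short enough tail, the text is returned unchanged.
    """
    if len(text) <= max_length:
        return text
    for i, ch in enumerate(text):
        # Cutting just after this newline keeps len(text) - i - 1 characters.
        if ch == "\n" and len(text) - i - 1 <= max_length:
            return text[i + 1:]
    return text


print(trim_header_by_lines("line1\nline2\nline3", 11))  # line2, then line3 on the next line
```

Scanning from the front and stopping at the first qualifying newline keeps as many trailing lines as possible.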


queue_tasks(doc: dict, bucket: str, name: str, priority: int = 0) -> None

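Split a document into processing tasks according to its type and parser configuration (for example, page ranges for PDFs or row ranges for spreadsheets), then enqueue the tasks that still need processing.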
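The splitting step can be sketched as cutting a document's pages (or rows) into fixed-size half-open ranges, one task per range. The helper names split_range and make_tasks, the task fields, and the batch sizes used below are assumptions for illustration; persisting the tasks and pushing them to Redis are not shown.

```python
import uuid


def split_range(start: int, end: int, step: int) -> list[tuple[int, int]]:
    """Split [start, end) into consecutive slices of at most `step` units."""
    return [(p, min(p + step, end)) for p in range(start, end, step)]


def make_tasks(doc_id: str, total_units: int, unit_batch: int) -> list[dict]:
    """Build one hypothetical task dict per slice of pages (or rows)."""
    return [
        {
            "id": uuid.uuid4().hex,
            "doc_id": doc_id,
            "from_page": lo,
            "to_page": hi,
            "progress": 0.0,
        }
        for lo, hi in split_range(0, total_units, unit_batch)
    ]


# A 30-page PDF split into batches of 12 pages (batch size is an assumption).
pdf_tasks = make_tasks("doc-1", total_units=30, unit_batch=12)
print([(t["from_page"], t["to_page"]) for t in pdf_tasks])  # [(0, 12), (12, 24), (24, 30)]
```

Splitting into independent ranges lets several workers parse one large document in parallel, at the cost of more task records per document.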


reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict) -> int

Attempt to reuse the chunk IDs of a matching, previously completed task so that unchanged work is not processed again. Returns the number of chunks reused.
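The matching rule is not spelled out above. The sketch below assumes a previous task matches when it starts on the same page and has the same digest, a fingerprint of the chunking configuration plus the task's page range. task_digest is a hypothetical helper that uses SHA-256 over sorted JSON from the standard library; the real module may hash different fields with a different function.

```python
import hashlib
import json


def task_digest(chunking_config: dict, task: dict) -> str:
    """Hypothetical fingerprint of the chunking settings plus the task's page range."""
    payload = json.dumps(
        {
            "config": chunking_config,
            "range": [task.get("doc_id"), task.get("from_page"), task.get("to_page")],
        },
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict) -> int:
    """Copy chunk IDs from a matching, finished previous task; return how many were reused."""
    task["digest"] = task_digest(chunking_config, task)
    for prev in prev_tasks:
        if prev.get("from_page") != task.get("from_page") or prev.get("digest") != task["digest"]:
            continue
        if prev.get("progress", 0.0) < 1.0 or not prev.get("chunk_ids"):
            return 0  # the earlier run did not finish cleanly, so nothing is reused
        task["chunk_ids"] = prev["chunk_ids"]
        task["progress"] = 1.0  # assumed: 1.0 marks the task done, so it is not re-queued
        prev["chunk_ids"] = ""  # the new task now owns these chunks
        return len(task["chunk_ids"].split())
    return 0
```

Any change to the chunking configuration changes the digest, so stale chunks are never reused. Clearing the previous task's chunk_ids is presumably meant to keep a later cleanup of old tasks from deleting chunks that the new task now owns.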


cancel_all_task_of(doc_id: str) -> None

Set cancellation flags for all tasks associated with a document in Redis.


has_canceled(task_id: str) -> bool

Check whether a specific task has been marked as cancelled in Redis.


queue_dataflow(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str, priority: int, callback=None) -> tuple[bool, str]

Queue a dataflow task, a specialized task type defined by a domain-specific language (DSL) script.
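The description gives only the signature, so the sketch below is entirely hypothetical. It assumes the DSL is a JSON string, that the tuple[bool, str] return means (success, error message), and that the optional callback receives the task ID once the message is queued. An in-process queue.Queue stands in for the Redis queue, so priority is only recorded in the message rather than used to select a queue.

```python
import json
import queue
from typing import Callable, Optional


def queue_dataflow(
    q: "queue.Queue[str]",
    dsl: str,
    tenant_id: str,
    doc_id: str,
    task_id: str,
    flow_id: str,
    priority: int,
    callback: Optional[Callable[[str], None]] = None,
) -> tuple[bool, str]:
    """Validate the DSL, enqueue a dataflow message, and report (ok, error_message)."""
    try:
        json.loads(dsl)  # assumption: the DSL is serialized as JSON
    except json.JSONDecodeError as exc:
        return False, f"invalid DSL: {exc}"
    message = {
        "id": task_id,
        "doc_id": doc_id,
        "tenant_id": tenant_id,
        "dataflow_id": flow_id,
        "dsl": dsl,
        "priority": priority,
        "task_type": "dataflow",  # hypothetical marker distinguishing dataflow tasks
    }
    q.put(json.dumps(message))
    if callback is not None:
        callback(task_id)
    return True, ""
```

Returning a status pair instead of raising lets a caller report a malformed DSL to the user without handling exceptions.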


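Important Implementation Details and Algorithms

Work is split before it is queued: queue_tasks divides a document into tasks according to its type and parser configuration and places them on a Redis queue. When a document is re-parsed, reuse_prev_task_chunks lets a new task inherit the chunk IDs of a matching, completed previous task instead of being processed again. Cancellation is checked in two ways: do_cancel inspects the status of the owning document, while cancel_all_task_of and has_canceled use per-task flags stored in Redis. Progress messages accumulate on the task record, and trim_header_by_lines keeps that log bounded by dropping its oldest lines.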


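Interaction with Other Modules

TaskService relies on DocumentService for chunking configuration and document state (get_chunking_config, update_by_id, begin2parse, get_by_id), on the Redis connection for task queues and cancellation flags, and on the storage layer to read uploaded file contents. The class diagram below summarizes these dependencies.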


Visual Diagram

```mermaid
classDiagram
    class TaskService {
        +model
        +get_task(task_id)
        +get_tasks(doc_id)
        +update_chunk_ids(id, chunk_ids)
        +get_ongoing_doc_name()
        +do_cancel(id)
        +update_progress(id, info)
    }

    class DocumentService {
        +get_chunking_config(doc_id)
        +update_by_id(doc_id, updates)
        +get_knowledgebase_id(doc_id)
        +begin2parse(doc_id)
        +get_by_id(doc_id)
    }

    class RedisConnection {
        +queue_product(queue_name, message)
        +set(key, value)
        +get(key)
        +delete(ids, index_name, kb_id)
    }

    class Storage {
        +get(bucket, name)
    }

    TaskService ..> DocumentService : uses
    TaskService ..> RedisConnection : uses
    TaskService ..> Storage : uses

    %% Functions outside class
    class queue_tasks {
        +queue_tasks(doc, bucket, name, priority)
        +reuse_prev_task_chunks(task, prev_tasks, config)
        +cancel_all_task_of(doc_id)
        +has_canceled(task_id)
        +queue_dataflow(dsl, tenant_id, doc_id, task_id, flow_id, priority, callback)
    }

    queue_tasks ..> TaskService : uses
    queue_tasks ..> RedisConnection : uses
    queue_tasks ..> Storage : uses
    queue_tasks ..> DocumentService : uses
```

Summary

task_service.py is a foundational module in the InfiniFlow platform that orchestrates the lifecycle of document processing tasks. It handles task creation, splitting of documents into tasks according to their type, progress and retry tracking, and task queueing in Redis. Chunk reuse lets unchanged portions of a re-parsed document skip reprocessing, which reduces redundant work on large document collections. The module is tightly integrated with the database layer, storage abstractions, and document parsing utilities, and serves as a bridge between the data persistence layer and the asynchronous processing infrastructure.