# document_service.py

---

## Overview

`document_service.py` is a core backend service module in the InfiniFlow system. It manages the document lifecycle: storage, retrieval, parsing, chunking, indexing, and progress tracking. It exposes an interface to `Document` entities in the database that covers filtered listing, health checks, insertion, deletion, and metadata updates, and it ties documents to knowledge bases and tenants.

The service also orchestrates the document upload and parsing workflow: chunking content, generating embeddings, and indexing the results for search. It queues tasks for advanced parsing techniques such as "raptor" and "graphrag", and it works closely with the knowledgebase, user, file, conversation, and task services. To scale document processing, the module relies on concurrency, hashing, and external storage and search infrastructure.

---

## Classes and Functions

### Class: `DocumentService(CommonService)`

Extends `CommonService` and provides document-specific database operations and business logic.

#### Properties:

* `model = Document`: the ORM model representing the `Document` entity.

#### Methods:

---

#### `get_cls_model_fields() -> list`

Returns a list of database fields (columns) of the `Document` model commonly used for queries.

**Usage:**

```python
fields = DocumentService.get_cls_model_fields()
```

#### `get_list(kb_id, page_number, items_per_page, orderby, desc, keywords, id, name) -> (list, int)`

Retrieves a paginated, optionally filtered list of the documents that belong to a knowledge base (`kb_id`). Supports filtering by document `id`, by `name`, and by keyword search on the name. Results can be ordered ascending or descending by a specified field.
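
The filter, sort, and slice sequence described above can be sketched over an in-memory list. This is only an illustration: the helper name `paginate`, the 1-based page numbering, the case-insensitive keyword match, and the choice to report the total before slicing are all assumptions, not the service's confirmed behavior.

```python
from typing import Any


def paginate(rows: list[dict[str, Any]], page_number: int, items_per_page: int,
             orderby: str, desc: bool, keywords: str = "") -> tuple[list[dict[str, Any]], int]:
    """Hypothetical helper: keyword-filter on name, sort, then slice one page."""
    if keywords:
        # Assumed: keyword search is a case-insensitive substring match on the name.
        rows = [r for r in rows if keywords.lower() in r["name"].lower()]
    total = len(rows)  # count of matches before pagination
    rows = sorted(rows, key=lambda r: r[orderby], reverse=desc)
    start = (page_number - 1) * items_per_page  # assumed 1-based page numbers
    return rows[start:start + items_per_page], total


docs = [
    {"id": "d1", "name": "Annual report", "create_time": 3},
    {"id": "d2", "name": "Meeting notes", "create_time": 2},
    {"id": "d3", "name": "Quarterly Report", "create_time": 1},
]
page, total = paginate(docs, page_number=1, items_per_page=1,
                       orderby="create_time", desc=True, keywords="report")
```

Returning the total alongside the page lets a caller render pagination controls without issuing a second query.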


#### `check_doc_health(tenant_id: str, filename: str) -> bool`

Checks whether a document upload meets constraints such as maximum allowed files per user and filename length limits.
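
A minimal sketch of this kind of pre-upload check follows. The limit values, the constant names, and the helper `check_upload_allowed` are hypothetical; the real limits and the way the service reports a violation are not stated here.

```python
MAX_FILES_PER_USER = 1000   # hypothetical limit, for illustration only
MAX_FILENAME_BYTES = 255    # hypothetical limit, for illustration only


def check_upload_allowed(current_doc_count: int, filename: str) -> bool:
    """Return True only if both assumed constraints are satisfied."""
    if current_doc_count >= MAX_FILES_PER_USER:
        return False  # the user already holds the maximum number of files
    if len(filename.encode("utf-8")) > MAX_FILENAME_BYTES:
        return False  # filename too long (measured in UTF-8 bytes here)
    return True


ok = check_upload_allowed(current_doc_count=3, filename="report.pdf")
```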


#### `get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix) -> (list, int)`

Fetches documents filtered by knowledgebase ID with additional filters on run status, document types, file suffixes, and keywords.

#### `get_filter_by_kb_id(kb_id, keywords, run_status, types, suffix) -> (dict, int)`

Returns aggregation counts of documents grouped by suffix and run status for documents matching filters, useful for UI filtering controls.
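
The grouped counts can be pictured with `collections.Counter`. The field names `suffix` and `run`, the shape of the returned dictionary, and the helper name `count_filters` are assumptions made for this sketch.

```python
from collections import Counter


def count_filters(docs: list[dict]) -> tuple[dict, int]:
    """Hypothetical helper: count matching documents per suffix and per run status."""
    suffix_counts = Counter(d["suffix"] for d in docs)
    status_counts = Counter(d["run"] for d in docs)
    return {"suffix": dict(suffix_counts), "run_status": dict(status_counts)}, len(docs)


docs = [
    {"suffix": "pdf", "run": "DONE"},
    {"suffix": "pdf", "run": "RUNNING"},
    {"suffix": "docx", "run": "DONE"},
]
filters, total = count_filters(docs)
```

A UI can use such counts to show how many documents each filter option would select.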


#### `count_by_kb_id(kb_id, keywords, run_status, types) -> int`

Returns the number of documents in a knowledgebase that match the optional filters.

#### `get_total_size_by_kb_id(kb_id, keywords="", run_status=[], types=[]) -> int`

Calculates the total size (the sum of document sizes) of the documents in a knowledgebase that match the optional filters.

#### `insert(doc: dict) -> Document`

Inserts a new document record into the database and increments the document count of the associated knowledgebase.

#### `remove_document(doc: Document, tenant_id: str) -> int`

Removes a document and all associated chunks, tasks, thumbnails, and search index entries.

#### `get_newly_uploaded() -> list`

Fetches recently uploaded documents that are valid, running, not virtual files, and have zero progress.

#### `get_unfinished_docs() -> list`

Returns documents that are valid, non-virtual, and have progress between 0 and 1 (incomplete processing).

#### `increment_chunk_num(doc_id, kb_id, token_num, chunk_num, duration) -> int`

Atomically increments token and chunk counts and process duration for a document and its knowledgebase.
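
One common way to get this kind of atomic counter update is to let the database do the arithmetic (`SET col = col + ?`) inside a single transaction. The sketch below uses the standard-library `sqlite3` module with a simplified, assumed schema; it illustrates the technique and does not reproduce the service's ORM code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, assumed schema; the real tables carry many more columns.
conn.executescript("""
    CREATE TABLE document (id TEXT PRIMARY KEY, token_num INTEGER,
                           chunk_num INTEGER, process_duration REAL);
    CREATE TABLE knowledgebase (id TEXT PRIMARY KEY, token_num INTEGER, chunk_num INTEGER);
    INSERT INTO document VALUES ('doc1', 0, 0, 0.0);
    INSERT INTO knowledgebase VALUES ('kb1', 0, 0);
""")


def increment_chunk_num(conn, doc_id, kb_id, token_num, chunk_num, duration) -> int:
    """Bump document and knowledgebase counters in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE document SET token_num = token_num + ?, chunk_num = chunk_num + ?, "
            "process_duration = process_duration + ? WHERE id = ?",
            (token_num, chunk_num, duration, doc_id),
        )
        if cur.rowcount == 0:
            raise LookupError(f"document {doc_id!r} not found")
        cur = conn.execute(
            "UPDATE knowledgebase SET token_num = token_num + ?, "
            "chunk_num = chunk_num + ? WHERE id = ?",
            (token_num, chunk_num, kb_id),
        )
        return cur.rowcount  # number of knowledgebase rows updated


updated = increment_chunk_num(conn, "doc1", "kb1", token_num=100, chunk_num=4, duration=1.5)
```

Because the addition happens in SQL rather than in Python, two concurrent workers cannot overwrite each other's increments with a stale value read earlier.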


#### `decrement_chunk_num(doc_id, kb_id, token_num, chunk_num, duration) -> int`

Atomically decrements token and chunk counts and adds process duration for a document and its knowledgebase.

#### `clear_chunk_num(doc_id) -> int`

Clears chunk and token counts and decrements the document count on the knowledgebase when a document is deleted.

#### `clear_chunk_num_when_rerun(doc_id) -> int`

Clears chunk and token counts on the knowledgebase without changing the document count. Used when a document is reprocessed.

#### `get_tenant_id(doc_id) -> str or None`

Gets the tenant ID associated with a document by joining through its knowledgebase.

Returns `None` if not found.

#### `get_knowledgebase_id(doc_id) -> str or None`

Gets the knowledgebase ID of a document.

#### `get_tenant_id_by_name(name) -> str or None`

Gets the tenant ID for a document by document name.

#### `accessible(doc_id, user_id) -> bool`

Checks whether a user has access to a document through tenant membership.

#### `accessible4deletion(doc_id, user_id) -> bool`

Checks whether a user has permission to delete a document, which requires specific roles and a valid user-tenant status.


#### `get_embd_id(doc_id) -> str or None`

Retrieves the embedding model ID associated with the document's knowledgebase.

#### `get_chunking_config(doc_id) -> dict or None`

Returns a combined configuration dictionary for chunking a document, including tenant, knowledgebase, and parser settings.

#### `get_doc_id_by_doc_name(doc_name) -> str or None`

Returns the ID of the document whose name exactly matches `doc_name`.

#### `get_doc_ids_by_doc_names(doc_names: list) -> list`

Returns a list of document IDs for the given list of document names.

#### `get_thumbnails(docids: list) -> list`

Returns a list of documents with their IDs, knowledgebase IDs, and thumbnails for the given document IDs.

#### `update_parser_config(id, config) -> None`

Updates the parser configuration for a document by recursively merging new config fields into the existing config.

If the "raptor" config is removed, it is also deleted from the stored config.
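
The recursive merge can be sketched as below. The helper names are hypothetical, and the rule used for "removed" (the incoming config has no non-empty `"raptor"` entry) is an assumed reading of the description above.

```python
import copy


def merge_config(old: dict, new: dict) -> None:
    """Recursively merge `new` into `old` in place; nested dicts are merged, not replaced."""
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(old.get(key), dict):
            merge_config(old[key], value)
        else:
            old[key] = value


def merged_parser_config(stored: dict, config: dict) -> dict:
    """Hypothetical helper: return the stored config updated with `config`."""
    result = copy.deepcopy(stored)  # leave the caller's dict untouched
    merge_config(result, config)
    # Assumption: a missing or empty "raptor" entry means the setting was removed.
    if not config.get("raptor"):
        result.pop("raptor", None)
    return result


stored = {"chunk_token_num": 128, "layout": {"a": 1, "b": 2}, "raptor": {"use_raptor": True}}
updated = merged_parser_config(stored, {"layout": {"b": 3}})
```

Merging key by key keeps settings that the update does not mention, such as `layout.a` in this example, instead of replacing whole sub-dictionaries.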


#### `get_doc_count(tenant_id) -> int`

Returns the count of documents associated with a tenant.

#### `begin2parse(docid) -> None`

Sets initial parsing progress state for a document, marking it as queued.

#### `update_meta_fields(doc_id, meta_fields) -> int`

Updates the metadata fields of a document.

#### `get_meta_by_kbs(kb_ids: list) -> dict`

Aggregates metadata fields across documents belonging to a list of knowledgebase IDs.

Returns a nested dictionary of metadata keys and values mapping to lists of document IDs.
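
The nested result shape (metadata key, then value, then document IDs) can be illustrated as follows. The record layout with `id` and `meta_fields` keys and the restriction to scalar, hashable metadata values are assumptions of this sketch.

```python
from collections import defaultdict


def aggregate_meta(docs: list[dict]) -> dict:
    """Hypothetical helper: map each metadata key and value to the documents that carry it."""
    meta = defaultdict(lambda: defaultdict(list))
    for doc in docs:
        # Documents without metadata contribute nothing.
        for key, value in (doc.get("meta_fields") or {}).items():
            meta[key][value].append(doc["id"])  # assumes `value` is hashable
    return {key: dict(values) for key, values in meta.items()}


docs = [
    {"id": "d1", "meta_fields": {"author": "Ann", "year": 2023}},
    {"id": "d2", "meta_fields": {"author": "Ann"}},
    {"id": "d3", "meta_fields": None},
]
meta = aggregate_meta(docs)
```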


#### `update_progress() -> None`

Updates progress and status of unfinished documents by aggregating progress from associated tasks.
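
One plausible aggregation rule is sketched below: average the task progress values, treat any negative value as a failure, and report completion once every task reaches 1. The averaging rule, the status labels, and the helper name are assumptions; only the use of negative progress to signal failure comes from this document.

```python
def aggregate_progress(task_progress: list[float]) -> tuple[float, str]:
    """Hypothetical helper: combine per-task progress into (progress, status)."""
    if not task_progress:
        return 0.0, "RUNNING"  # nothing reported yet
    if any(p < 0 for p in task_progress):
        return -1.0, "FAIL"    # a negative value marks a failed or canceled task
    if all(p >= 1 for p in task_progress):
        return 1.0, "DONE"
    mean = sum(min(p, 1.0) for p in task_progress) / len(task_progress)
    return mean, "RUNNING"


progress, status = aggregate_progress([1.0, 0.5])
```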


#### `get_kb_doc_count(kb_id) -> int`

Returns the count of documents belonging to a knowledgebase.

#### `do_cancel(doc_id) -> bool`

Checks whether a document's processing is canceled or marked as failed (progress < 0).

### Function: `queue_raptor_o_graphrag_tasks(doc: dict, ty: str, priority: int) -> None`

Queues a new task of type `ty` ("raptor" or "graphrag") for a document.

### Function: `get_queue_length(priority: int) -> int`

Returns the approximate length (lag) of the Redis queue for a given priority.

Returns 0 if no queue information is available.
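
The fallback to 0 can be sketched independently of Redis. Here the queue information is modeled as a plain dictionary with a `lag` entry, which is an assumption about its shape; the helper name `queue_lag` is hypothetical.

```python
from typing import Optional


def queue_lag(group_info: Optional[dict]) -> int:
    """Hypothetical helper: read the lag from queue info, defaulting to 0."""
    if not group_info:
        return 0  # no queue information available
    return int(group_info.get("lag") or 0)  # a missing or None lag also counts as 0


lag = queue_lag({"name": "workers", "lag": 7})
```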


### Function: `doc_upload_and_parse(conversation_id: str, file_objs: list, user_id: str) -> list`

Handles the overall workflow of uploading documents associated with a conversation and knowledgebase, parsing them into chunks, generating embeddings, indexing, and updating document chunk counts.
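
The stages named above (chunk, embed, index, count) can be pictured with a toy pipeline. Everything in this sketch is a stand-in: fixed-size character chunking, hash-derived IDs, a fake embedding, and an in-memory dictionary as the index are assumptions for illustration, not the service's actual parsers, models, or search backend.

```python
import hashlib


def chunk_text(text: str, size: int) -> list[str]:
    """Split text into fixed-size character chunks (a deliberately naive chunker)."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def fake_embed(chunk: str) -> list[float]:
    """Placeholder for an embedding model: four pseudo-features derived from a hash."""
    digest = hashlib.sha256(chunk.encode("utf-8")).digest()
    return [b / 255 for b in digest[:4]]


def upload_and_parse(files: dict[str, str], chunk_size: int = 10):
    """Toy pipeline: returns (doc_ids, index, chunk_counts)."""
    index: dict[str, dict] = {}
    chunk_counts: dict[str, int] = {}
    for name, text in files.items():
        doc_id = hashlib.sha1(name.encode("utf-8")).hexdigest()[:12]
        chunks = chunk_text(text, chunk_size)
        for position, chunk in enumerate(chunks):
            # Hash the document ID, position, and content into a stable chunk ID.
            key = f"{doc_id}:{position}:{chunk}".encode("utf-8")
            chunk_id = hashlib.sha1(key).hexdigest()
            index[chunk_id] = {"doc_id": doc_id, "content": chunk, "vector": fake_embed(chunk)}
        chunk_counts[doc_id] = len(chunks)  # the per-document chunk count to persist
    return list(chunk_counts), index, chunk_counts


doc_ids, index, chunk_counts = upload_and_parse({"notes.txt": "abcdefghijklmnopqrstuvwxyz"})
```

Deriving chunk IDs from content hashes makes re-indexing the same file idempotent in this sketch, since identical input always maps to the same keys.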


---

## Implementation Details and Algorithms

---

## Interaction with Other System Components


---

## Usage Examples

Fetch a paginated list of documents for a knowledgebase:

```python
docs, total = DocumentService.get_list(
    kb_id="kb123",
    page_number=1,
    items_per_page=10,
    orderby="create_time",
    desc=True,
    keywords="report",
    id=None,
    name=None,
)
```

Insert a new document record:

```python
doc_data = {
    "id": "doc456",
    "kb_id": "kb123",
    "name": "Annual Report 2023",
    "size": 123456,
    # ... other fields ...
}
new_doc = DocumentService.insert(doc_data)
```

Remove a document and all associated data:

```python
doc = Document.get_by_id("doc456")
DocumentService.remove_document(doc, tenant_id="tenant789")
```

Upload and parse files for a conversation:

```python
# file1 and file2 are the uploaded file objects to parse
uploaded_doc_ids = doc_upload_and_parse(
    conversation_id="conv001",
    file_objs=[file1, file2],
    user_id="user123",
)
```

---

## Visual Diagram

```mermaid
classDiagram
    class DocumentService {
        <<CommonService>>
        +model: Document
        +get_cls_model_fields()
        +get_list(kb_id, page_number, items_per_page, orderby, desc, keywords, id, name)
        +check_doc_health(tenant_id, filename)
        +get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix)
        +get_filter_by_kb_id(kb_id, keywords, run_status, types, suffix)
        +count_by_kb_id(kb_id, keywords, run_status, types)
        +get_total_size_by_kb_id(kb_id, keywords, run_status, types)
        +insert(doc)
        +remove_document(doc, tenant_id)
        +get_newly_uploaded()
        +get_unfinished_docs()
        +increment_chunk_num(doc_id, kb_id, token_num, chunk_num, duration)
        +decrement_chunk_num(doc_id, kb_id, token_num, chunk_num, duration)
        +clear_chunk_num(doc_id)
        +clear_chunk_num_when_rerun(doc_id)
        +get_tenant_id(doc_id)
        +get_knowledgebase_id(doc_id)
        +get_tenant_id_by_name(name)
        +accessible(doc_id, user_id)
        +accessible4deletion(doc_id, user_id)
        +get_embd_id(doc_id)
        +get_chunking_config(doc_id)
        +get_doc_id_by_doc_name(doc_name)
        +get_doc_ids_by_doc_names(doc_names)
        +get_thumbnails(docids)
        +update_parser_config(id, config)
        +get_doc_count(tenant_id)
        +begin2parse(docid)
        +update_meta_fields(doc_id, meta_fields)
        +get_meta_by_kbs(kb_ids)
        +update_progress()
        +get_kb_doc_count(kb_id)
        +do_cancel(doc_id)
    }

    class Task
    class Knowledgebase
    class Tenant
    class File
    class File2Document
    class UserTenant
    class StorageImpl
    class RedisConn

    DocumentService --> Task : manages tasks
    DocumentService --> Knowledgebase : updates counts
    DocumentService --> Tenant : retrieves tenant info
    DocumentService --> File : joins file info
    DocumentService --> File2Document : joins document-file mapping
    DocumentService --> UserTenant : checks user access
    DocumentService --> StorageImpl : stores/removes blobs
    DocumentService --> RedisConn : queues tasks
```

---

## Summary

`document_service.py` manages documents in the InfiniFlow platform. It encapsulates database operations, task management, and document parsing workflows, and it integrates with storage, embedding, and search services. Its methods cover document lifecycle management, access control, metadata handling, and progress tracking for document processing pipelines in a multi-tenant knowledge base environment.