# document_service.py
---
## Overview
`document_service.py` is a core backend service module in the InfiniFlow system that manages the document lifecycle: storage, retrieval, parsing, chunking, indexing, and progress tracking. It provides the interface for working with `Document` entities in the database. This covers filtered document listing, upload validation, insertion, deletion, metadata updates, and integration with knowledge bases and tenants.
The service also orchestrates the complex workflow of document upload and parsing, including chunking content, embedding generation, and indexing for search. It manages task queueing for advanced parsing techniques like "raptor" and "graphrag" and interacts closely with other services such as knowledgebase, user, file, conversation, and task services. The module uses concurrency, hashing, and external storage and search infrastructure to support scalable document processing.
---
## Classes and Functions
### Class: `DocumentService(CommonService)`
Extends `CommonService` and provides document-specific database operations and business logic.
#### Properties:
* `model = Document`: the ORM model representing the `Document` entity.
#### Methods:
---
#### `get_cls_model_fields() -> list`
Returns a list of database fields (columns) of the `Document` model commonly used for queries.
**Usage:**
```python
fields = DocumentService.get_cls_model_fields()
```
#### `get_list(kb_id, page_number, items_per_page, orderby, desc, keywords, id, name) -> (list, int)`
Retrieves a paginated, optionally filtered list of documents belonging to the knowledge base `kb_id`. Supports exact filtering by document `id` or `name` and a keyword search on the document name. Results are ordered ascending or descending by the field named in `orderby`.

**Parameters:**
* `kb_id` (int/str): Knowledge base ID to filter documents.
* `page_number` (int): Page number for pagination.
* `items_per_page` (int): Number of items per page.
* `orderby` (str): Field name to order by.
* `desc` (bool): Whether to order descending.
* `keywords` (str or None): Keyword to match against document names.
* `id` (str or None): Exact document ID filter.
* `name` (str or None): Exact document name filter.

**Returns:**
A tuple `(docs, total)`: a list of document dicts and the total count.
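The sequence described above is filter, then count, then order, then paginate. The following is a minimal in-memory sketch of that sequence. It works on a plain list of dicts rather than the `Document` table. The case-insensitive keyword match and the helper name `get_list_sketch` are assumptions, not the service's actual query.

```python
from typing import Optional


def get_list_sketch(rows: list[dict], page_number: int, items_per_page: int,
                    orderby: str, desc: bool, keywords: Optional[str] = None,
                    id: Optional[str] = None, name: Optional[str] = None) -> tuple[list[dict], int]:
    """Hypothetical in-memory illustration of filter -> count -> order -> paginate."""
    matched = rows
    if id:
        matched = [r for r in matched if r["id"] == id]  # exact ID filter
    if name:
        matched = [r for r in matched if r["name"] == name]  # exact name filter
    if keywords:
        kw = keywords.lower()  # case-insensitive match is an assumption
        matched = [r for r in matched if kw in r["name"].lower()]
    total = len(matched)  # count before slicing so callers can build page controls
    ordered = sorted(matched, key=lambda r: r[orderby], reverse=desc)
    start = (page_number - 1) * items_per_page  # pages are 1-based
    return ordered[start:start + items_per_page], total
```

The count is taken before slicing. That is what lets a caller render page controls from `total` while receiving only one page of rows.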
#### `check_doc_health(tenant_id: str, filename: str) -> bool`
Checks whether a document upload meets constraints such as the maximum number of files allowed per user and the filename length limit.

**Raises:**
`RuntimeError` if a constraint is exceeded.

**Returns:**
`True` if all checks pass.
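Here is a minimal sketch of the two upload constraints. It assumes the caller supplies the current document count. The limits `max_files` and `name_len_limit` are hypothetical, because the real values and where they are configured are not specified here. Measuring the filename in UTF-8 bytes is also an assumption.

```python
def check_doc_health_sketch(current_doc_count: int, filename: str,
                            max_files: int = 0, name_len_limit: int = 255) -> bool:
    """Validate an upload against hypothetical per-user limits."""
    # max_files <= 0 is treated as "no per-user cap" (assumption).
    if max_files > 0 and current_doc_count >= max_files:
        raise RuntimeError("Exceeded the maximum number of files for this user.")
    # The filename length is measured in UTF-8 bytes (assumption).
    if len(filename.encode("utf-8")) > name_len_limit:
        raise RuntimeError("Exceeded the maximum filename length.")
    return True
```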
#### `get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix) -> (list, int)`
Fetches the documents in a knowledge base, with additional filters on run status, document type, file suffix, and keywords. Supports ordering and pagination.

**Returns:**
A tuple of the list of documents and the total count.
#### `get_filter_by_kb_id(kb_id, keywords, run_status, types, suffix) -> (dict, int)`
Returns document counts for the documents matching the filters, grouped by file suffix and by run status. This is useful for populating UI filter controls.

**Returns:**
A dictionary whose `"suffix"` and `"run_status"` keys hold the grouped counts, plus the total count.
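The grouped counts can be pictured with `collections.Counter`. This sketch assumes each matching document is a dict with hypothetical `suffix` and `run` keys. The real method computes the aggregation in the database.

```python
from collections import Counter


def filter_counts_sketch(docs: list[dict]) -> tuple[dict, int]:
    """Group already-filtered documents by suffix and by run status."""
    counts = {
        "suffix": dict(Counter(d["suffix"] for d in docs)),   # e.g. {"pdf": 2, ...}
        "run_status": dict(Counter(d["run"] for d in docs)),  # field name assumed
    }
    return counts, len(docs)
```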
#### `count_by_kb_id(kb_id, keywords, run_status, types) -> int`
Returns the number of documents in a knowledge base that match the optional filters.
#### `get_total_size_by_kb_id(kb_id, keywords="", run_status=[], types=[]) -> int`
Returns the total size (sum of document sizes) of the documents in a knowledge base that match the optional filters.
#### `insert(doc: dict) -> Document`
Inserts a new document record into the database and increments the document count of the associated knowledge base.

**Raises:**
`RuntimeError` on database errors.

**Returns:**
The created `Document` instance.
#### `remove_document(doc: Document, tenant_id: str) -> int`
Removes a document together with all associated chunks, tasks, thumbnails, and search index entries:
* Cleans up storage objects via `STORAGE_IMPL`.
* Deletes related tasks using `TaskService`.
* Updates knowledge graph indexes if relevant.

**Returns:**
The number of deleted document records.
#### `get_newly_uploaded() -> list`
Fetches recently uploaded documents that meet all of these conditions: valid, running, not virtual files, and zero progress.
#### `get_unfinished_docs() -> list`
Returns documents that are valid, non-virtual, and have progress between 0 and 1 (processing not yet complete).
#### `increment_chunk_num(doc_id, kb_id, token_num, chunk_num, duration) -> int`
Atomically increments the token count, chunk count, and processing duration for a document and its knowledge base.

**Raises:**
`LookupError` if the document is not found.
#### `decrement_chunk_num(doc_id, kb_id, token_num, chunk_num, duration) -> int`
Atomically decrements the token and chunk counts for a document and its knowledge base, and adds `duration` to the processing duration.

**Raises:**
`LookupError` if the document is not found.
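These counters are atomic because they use relative updates (`column = column + ?`), which the database evaluates itself, instead of a read-modify-write in Python. The sketch below shows the idea with the standard-library `sqlite3` module in place of Peewee. The following are assumptions:
* the table and column names;
* wrapping both updates in one transaction;
* the helper name `increment_chunk_num_sketch`.

```python
import sqlite3


def increment_chunk_num_sketch(conn: sqlite3.Connection, doc_id: str, kb_id: str,
                               token_num: int, chunk_num: int, duration: float) -> int:
    """Hypothetical sketch: bump document and knowledge base counters atomically."""
    with conn:  # commits both updates together, or rolls back on an exception
        # Relative UPDATEs avoid races between concurrent workers.
        cur = conn.execute(
            "UPDATE document SET token_num = token_num + ?, chunk_num = chunk_num + ?, "
            "process_duration = process_duration + ? WHERE id = ?",
            (token_num, chunk_num, duration, doc_id),
        )
        if cur.rowcount == 0:
            raise LookupError(f"Document {doc_id} not found")
        cur = conn.execute(
            "UPDATE knowledgebase SET token_num = token_num + ?, chunk_num = chunk_num + ? "
            "WHERE id = ?",
            (token_num, chunk_num, kb_id),
        )
    return cur.rowcount  # number of knowledge base rows updated
```

If the document row is missing, the function raises `LookupError` before touching the knowledge base, and the `with conn:` block rolls the transaction back.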
#### `clear_chunk_num(doc_id) -> int`
Removes the document's chunk and token counts from its knowledge base's totals and decrements the knowledge base's document count. It is used when a document is deleted. The document must exist.
#### `clear_chunk_num_when_rerun(doc_id) -> int`
Removes the document's chunk and token counts from its knowledge base's totals without changing the document count. It is used when a document is reprocessed.
#### `get_tenant_id(doc_id) -> str or None`
Gets the tenant ID associated with a document by joining through its knowledge base. Returns `None` if not found.
#### `get_knowledgebase_id(doc_id) -> str or None`
Gets the knowledge base ID of a document.
#### `get_tenant_id_by_name(name) -> str or None`
Gets the tenant ID for a document, looked up by document name.
#### `accessible(doc_id, user_id) -> bool`
Checks whether a user can access a document through tenant membership.
#### `accessible4deletion(doc_id, user_id) -> bool`
Checks whether a user may delete a document. This requires specific roles and a valid user-tenant status.
#### `get_embd_id(doc_id) -> str or None`
Retrieves the embedding model ID of the document's knowledge base.
#### `get_chunking_config(doc_id) -> dict or None`
Returns a combined configuration dictionary for chunking the document, including tenant, knowledge base, and parser settings.
#### `get_doc_id_by_doc_name(doc_name) -> str or None`
Returns the ID of the document whose name exactly matches `doc_name`.
#### `get_doc_ids_by_doc_names(doc_names: list) -> list`
Returns the document IDs for the given list of document names.
#### `get_thumbnails(docids: list) -> list`
Returns the ID, knowledge base ID, and thumbnail of each document in `docids`.
#### `update_parser_config(id, config) -> None`
Updates a document's parser configuration by recursively merging the fields of `config` into the existing configuration. If the new `config` omits the `"raptor"` entry, that entry is also removed from the stored configuration.
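The recursive merge can be sketched as follows. The function name mirrors the `dfs_update` helper mentioned under Implementation Details. Its exact behavior here is an assumption: nested dicts merge, and all other values overwrite. The wrapper `update_parser_config_sketch` is hypothetical.

```python
def dfs_update(old: dict, new: dict) -> None:
    """Recursively merge `new` into `old` in place (sketch)."""
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(old.get(key), dict):
            dfs_update(old[key], value)  # descend into nested sections
        else:
            old[key] = value  # new keys and non-dict values overwrite


def update_parser_config_sketch(stored: dict, config: dict) -> dict:
    """Hypothetical wrapper: merge, then drop "raptor" if the new config omits it."""
    dfs_update(stored, config)
    if not config.get("raptor") and stored.get("raptor"):
        del stored["raptor"]
    return stored
```

Merging in place keeps unrelated settings that the caller did not send, which is why a partial `config` is enough to change one nested field.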
#### `get_doc_count(tenant_id) -> int`
Returns the number of documents associated with a tenant.
#### `begin2parse(docid) -> None`
Sets the initial parsing progress state for a document, marking it as queued.
#### `update_meta_fields(doc_id, meta_fields) -> int`
Updates the metadata fields of a document.
#### `get_meta_by_kbs(kb_ids: list) -> dict`
Aggregates metadata fields across the documents belonging to the given knowledge base IDs. Returns a nested dictionary that maps each metadata key to its values, and each value to the list of IDs of the documents that carry it.
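Here is a sketch of the nested key → value → document-ID index. It makes two assumptions about the input:
* each document is a hypothetical dict with an `id` and a flat `meta_fields` dict of hashable values;
* the documents have already been filtered to the requested knowledge bases.

```python
from collections import defaultdict


def get_meta_by_kbs_sketch(docs: list[dict]) -> dict:
    """Build {meta_key: {meta_value: [doc_id, ...]}} from already-filtered docs."""
    meta: dict = defaultdict(lambda: defaultdict(list))
    for doc in docs:
        # Documents without metadata contribute nothing.
        for key, value in (doc.get("meta_fields") or {}).items():
            meta[key][value].append(doc["id"])
    # Convert nested defaultdicts to plain dicts for serialization.
    return {key: dict(values) for key, values in meta.items()}
```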
#### `update_progress() -> None`
Updates the progress and status of unfinished documents by aggregating the progress of their associated tasks. Specifically, it:
* Queues `"raptor"` or `"graphrag"` tasks when they are configured but not yet created.
* Updates progress messages and estimated queue lengths.
* Logs exceptions and continues with the remaining documents.
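Here is a simplified sketch of how task progress might roll up into a document's progress. It makes these assumptions:
* negative progress marks a failed task;
* progress is averaged, with failed tasks counted as 0;
* the status labels are hypothetical.

The raptor/graphrag queueing step is omitted.

```python
def aggregate_progress_sketch(task_progress: list[float]) -> tuple[float, str]:
    """Roll per-task progress up into one document-level (progress, status)."""
    if not task_progress:
        raise ValueError("a document with no tasks has nothing to aggregate")
    # A task is still running while its progress is in [0, 1).
    finished = not any(0 <= p < 1 for p in task_progress)
    # Negative progress is treated as failure (assumption).
    failed = sum(1 for p in task_progress if p < 0)
    if finished and failed:
        return -1.0, "FAIL"
    if finished:
        return 1.0, "DONE"
    # Average, with failed tasks contributing 0.
    avg = sum(max(p, 0.0) for p in task_progress) / len(task_progress)
    return avg, "RUNNING"
```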
#### `get_kb_doc_count(kb_id) -> int`
Returns the number of documents in a knowledge base.
#### `do_cancel(doc_id) -> bool`
Checks whether a document's processing has been canceled or marked as failed (progress < 0).
### Function: `queue_raptor_o_graphrag_tasks(doc: dict, ty: str, priority: int) -> None`
Queues a new task of type `ty` (`"raptor"` or `"graphrag"`) for a document. It:
* Creates a unique digest for the task using `xxhash` over the task properties and the document's chunking configuration.
* Inserts the task into the database.
* Pushes the task message onto a Redis queue whose name depends on `priority`.

**Raises:**
`AssertionError` if the Redis queue is inaccessible.
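The digest step can be sketched with the standard library alone. Two substitutions are made:
* `hashlib.blake2b` stands in for `xxhash`, so the example has no third-party dependency;
* the set of hashed task fields is an assumption.

The key point is that config fields are hashed in sorted key order, so equal configurations always produce the same digest.

```python
import hashlib


def task_digest_sketch(chunking_config: dict, task: dict, ty: str) -> str:
    """Deterministic digest over chunking config, task properties, and task type."""
    # blake2b is a stand-in for xxhash.xxh64; an 8-byte digest gives 16 hex chars.
    hasher = hashlib.blake2b(digest_size=8)
    for field in sorted(chunking_config):  # sorted keys: insertion order doesn't matter
        hasher.update(str(chunking_config[field]).encode("utf-8"))
    for field in ("doc_id", "from_page", "to_page"):  # assumed task properties
        hasher.update(str(task.get(field, "")).encode("utf-8"))
    hasher.update(ty.encode("utf-8"))  # "raptor" and "graphrag" get distinct digests
    return hasher.hexdigest()
```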
### Function: `get_queue_length(priority: int) -> int`
Returns the approximate length (consumer-group lag) of the Redis queue for the given priority, or 0 if no queue information is available.
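Here is a sketch of the lag lookup. It assumes the queue information arrives as a dict shaped like a Redis Streams `XINFO GROUPS` entry. The `lag` field in such an entry (Redis 7.0+) can be missing or nil. The function name is hypothetical.

```python
from typing import Optional


def queue_length_sketch(group_info: Optional[dict]) -> int:
    """Return the consumer-group lag, or 0 when no info is available."""
    if not group_info:
        return 0
    # `lag` may be absent or None, so missing values become 0.
    return int(group_info.get("lag") or 0)
```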
### Function: `doc_upload_and_parse(conversation_id: str, file_objs: list, user_id: str) -> list`
Handles the end-to-end workflow for documents uploaded in a conversation. It uploads them to the conversation's knowledge base, parses them into chunks, generates embeddings, indexes the results, and updates document chunk counts. The workflow:
* Resolves the conversation and its knowledge base.
* Uploads the files and obtains their blob content.
* Parses files concurrently in a thread pool, selecting parser modules by parser ID. Specialized parsers handle presentations, pictures, audio, and emails; everything else falls back to naive parsing.
* Generates mind maps with `MindMapExtractor` (using an LLM) for non-picture documents.
* Encodes chunks into embeddings using the tenant's embedding model.
* Inserts chunks and embeddings into the document store index, creating the index if needed.
* Updates the document and knowledge base chunk/token counts.
* Stores images in external storage if present.

**Returns:**
The list of uploaded document IDs.
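The concurrent, parser-keyed dispatch can be sketched with `concurrent.futures.ThreadPoolExecutor`. The parser functions and the `PARSERS` registry below are hypothetical stand-ins for the real parser modules. Only the dispatch-with-fallback pattern is the point.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for parser modules; each returns a list of chunk dicts.
def parse_naive(name: str, blob: bytes) -> list[dict]:
    return [{"doc": name, "text": blob.decode("utf-8", errors="ignore")}]


def parse_picture(name: str, blob: bytes) -> list[dict]:
    return [{"doc": name, "text": f"<image {len(blob)} bytes>"}]


PARSERS = {"picture": parse_picture}  # other parser IDs omitted for brevity


def parse_all(files: list[tuple[str, str, bytes]], max_workers: int = 4) -> dict[str, list[dict]]:
    """Parse (name, parser_id, blob) triples concurrently; unknown IDs use parse_naive."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            name: pool.submit(PARSERS.get(parser_id, parse_naive), name, blob)
            for name, parser_id, blob in files
        }
        # .result() waits for each parser and re-raises any exception it threw.
        return {name: fut.result() for name, fut in futures.items()}
```

Unknown parser IDs fall back to `parse_naive`, mirroring the naive fallback described above.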
---
## Implementation Details and Algorithms
* **Database Access:** Uses the Peewee ORM with explicit connection context management (`@DB.connection_context()` decorators).
* **Query Efficiency:** Joins related tables (`File2Document`, `File`, `Knowledgebase`, `Tenant`) to fetch related data in single queries.
* **Chunking & Embedding:** Uses NLP and embedding components (`rag.nlp`, `LLMBundle`) to tokenize and vectorize document chunks.
* **Task Queueing:** Queues tasks in Redis, managing priorities and consumer-group lag.
* **Hashing:** Uses `xxhash` for fast, consistent hashing of chunking configurations and task properties to generate unique task digests.
* **Concurrency:** Uses Python's `ThreadPoolExecutor` to parse documents in parallel.
* **Error Handling:** Uses exceptions extensively for validation and flow control, and logs exceptions on non-critical paths.
* **Recursive Config Update:** Deep-merges new parser configuration dictionaries into existing configurations with the `dfs_update` helper.
* **Document/Knowledge Base Counters:** Keeps token, chunk, and document counts synchronized to track processing progress and resource usage.
* **Document Store Interaction:** Inserts and deletes document chunks in an external document store, with index management.
---
## Interaction with Other System Components
* **Database Models:** Interacts primarily with the `Document`, `Knowledgebase`, `Tenant`, `File`, `Task`, and `UserTenant` ORM models.
* **Other Services:** Uses `KnowledgebaseService`, `TaskService`, `FileService`, `ConversationService`, `DialogService`, `API4ConversationService`, and `TenantService` for multi-entity coordination.
* **Document Store:** Uses `settings.docStoreConn` to interact with the document storage and search index system.
* **Storage Layer:** Uses the `STORAGE_IMPL` implementation to store and remove binary objects (e.g., images).
* **Task Queue:** Uses the Redis connection `REDIS_CONN` to queue document processing tasks with priority.
* **NLP & Embedding:** Uses `rag.nlp`, `rag_tokenizer`, and tenant-specific LLM bundles for tokenization and embedding creation.
* **Parsers:** Supports multiple parser modules for different document types (presentation, picture, audio, email, with a naive fallback).
* **Knowledge Graph:** Updates knowledge-graph-related indexes on document removal.
* **Logging:** Uses the standard Python `logging` module for error reporting.
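---
## Usage Examples
Fetch a paginated list of documents for a knowledge base:
```python
# Import path assumes the RAGFlow repository layout.
from api.db.services.document_service import DocumentService

docs, total = DocumentService.get_list(
    kb_id="kb123",
    page_number=1,
    items_per_page=10,
    orderby="create_time",
    desc=True,
    keywords="report",
    id=None,
    name=None,
)
```
Insert a new document record:
```python
doc_data = {
    "id": "doc456",
    "kb_id": "kb123",
    "name": "Annual Report 2023",
    "size": 123456,
    # ... other fields ...
}
new_doc = DocumentService.insert(doc_data)
```
Remove a document and all associated data:
```python
from api.db.db_models import Document  # import path assumed

doc = Document.get_by_id("doc456")  # Peewee lookup; raises Document.DoesNotExist if missing
DocumentService.remove_document(doc, tenant_id="tenant789")
```
Upload and parse files for a conversation:
```python
from api.db.services.document_service import doc_upload_and_parse

# file1, file2: uploaded file objects (placeholders)
uploaded_doc_ids = doc_upload_and_parse(conversation_id="conv001", file_objs=[file1, file2], user_id="user123")
```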
---
## Visual Diagram
```mermaid
classDiagram
    class CommonService
    class DocumentService {
        +model: Document
        +get_cls_model_fields()
        +get_list(kb_id, page_number, items_per_page, orderby, desc, keywords, id, name)
        +check_doc_health(tenant_id, filename)
        +get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix)
        +get_filter_by_kb_id(kb_id, keywords, run_status, types, suffix)
        +count_by_kb_id(kb_id, keywords, run_status, types)
        +get_total_size_by_kb_id(kb_id, keywords, run_status, types)
        +insert(doc)
        +remove_document(doc, tenant_id)
        +get_newly_uploaded()
        +get_unfinished_docs()
        +increment_chunk_num(doc_id, kb_id, token_num, chunk_num, duration)
        +decrement_chunk_num(doc_id, kb_id, token_num, chunk_num, duration)
        +clear_chunk_num(doc_id)
        +clear_chunk_num_when_rerun(doc_id)
        +get_tenant_id(doc_id)
        +get_knowledgebase_id(doc_id)
        +get_tenant_id_by_name(name)
        +accessible(doc_id, user_id)
        +accessible4deletion(doc_id, user_id)
        +get_embd_id(doc_id)
        +get_chunking_config(doc_id)
        +get_doc_id_by_doc_name(doc_name)
        +get_doc_ids_by_doc_names(doc_names)
        +get_thumbnails(docids)
        +update_parser_config(id, config)
        +get_doc_count(tenant_id)
        +begin2parse(docid)
        +update_meta_fields(doc_id, meta_fields)
        +get_meta_by_kbs(kb_ids)
        +update_progress()
        +get_kb_doc_count(kb_id)
        +do_cancel(doc_id)
    }
    class Task
    class Knowledgebase
    class Tenant
    class File
    class File2Document
    class UserTenant
    class StorageImpl
    class RedisConn
    CommonService <|-- DocumentService
    DocumentService --> Task : manages tasks
    DocumentService --> Knowledgebase : updates counts
    DocumentService --> Tenant : retrieves tenant info
    DocumentService --> File : joins file info
    DocumentService --> File2Document : joins document-file mapping
    DocumentService --> UserTenant : checks user access
    DocumentService --> StorageImpl : stores/removes blobs
    DocumentService --> RedisConn : queues tasks
```
---
## Summary
`document_service.py` is a pivotal module for managing documents in the InfiniFlow platform. It encapsulates database operations, task management, and document parsing workflows, and it integrates with storage, embedding, and search services. Its methods cover document lifecycle management, access control, metadata handling, and real-time progress tracking. Together, these support robust document processing pipelines in a multi-tenant knowledge base environment.