index.py
Overview
The index.py file is a core asynchronous processing module within the InfiniFlow system, responsible for constructing, maintaining, and enriching knowledge graphs derived from chunks of document data. It orchestrates a pipeline that extracts entities and relationships from document chunks, generates subgraphs, merges them into global knowledge graphs, performs entity resolution to unify duplicate or related entities, and optionally extracts community structures within graphs for advanced analysis.
The file integrates various specialized extractors and utilities, manages concurrency through distributed locking, and interacts heavily with external data stores (e.g., Elasticsearch via settings.docStoreConn). It leverages the networkx library for graph manipulations and uses trio for async concurrency management with timeout controls.
Detailed Explanation of Classes, Functions, and Methods
1. async def run_graphrag(row: dict, language, with_resolution: bool, with_community: bool, chat_model, embedding_model, callback)
Purpose:
Top-level asynchronous function that coordinates the entire GraphRAG (Graph Retrieval-Augmented Generation) workflow for a given document. It extracts document chunks, generates subgraphs, merges them into the global knowledge graph, and optionally performs entity resolution and community extraction.
Parameters:
row (dict): Metadata and configuration for the document, including tenant, knowledge base (KB) ID, document ID, and parsing configurations.
language: Language code or object specifying the language context for extraction.
with_resolution (bool): Whether to perform entity resolution after merging.
with_community (bool): Whether to extract community structures from the graph.
chat_model: Large language model (LLM) backend used for entity extraction and resolution.
embedding_model: Embedding model backend for vector representations.
callback (Callable): Function for logging or progress reporting messages.
Returns:
None. The function returns early if no subgraph is generated.
Usage Example:
await run_graphrag(
    row=document_metadata,
    language="en",
    with_resolution=True,
    with_community=True,
    chat_model=my_llm,
    embedding_model=my_embedder,
    callback=print,
)
Implementation Details:
Retrieves document chunks from settings.retrievaler.
Selects either a general or light graph extractor based on KB parser config.
Enforces a timeout for subgraph generation, adjustable via environment variable.
Uses a Redis distributed lock keyed by KB ID to ensure exclusive processing.
Calls generate_subgraph(), merge_subgraph(), resolve_entities(), and extract_community() in sequence.
Logs timing and progress via the callback.
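The sequencing described above can be sketched as follows. This is a minimal illustration and not the module's code: it uses the standard-library asyncio in place of trio, an in-process asyncio.Lock in place of the Redis distributed lock, and stub coroutines for the four stages. The environment variable name SUBGRAPH_TIMEOUT_SECONDS, the helper names kb_lock and run_pipeline, and the exact point at which the lock is taken are all assumptions made for the example.

```python
import asyncio
import os

# Hypothetical variable name; the real one is not given in this document.
SUBGRAPH_TIMEOUT_S = float(os.environ.get("SUBGRAPH_TIMEOUT_SECONDS", "600"))

# Stand-in for the Redis distributed lock: one in-process lock per KB ID.
_kb_locks: dict = {}


def kb_lock(kb_id: str) -> asyncio.Lock:
    """Return the lock for a KB, creating it on first use."""
    if kb_id not in _kb_locks:
        _kb_locks[kb_id] = asyncio.Lock()
    return _kb_locks[kb_id]


async def run_pipeline(kb_id, generate, merge, resolve, community,
                       with_resolution, with_community, callback):
    """Run the stages in order; stop early when no subgraph is produced."""
    # Bound subgraph generation so a stuck extraction cannot block forever.
    subgraph = await asyncio.wait_for(generate(), timeout=SUBGRAPH_TIMEOUT_S)
    if subgraph is None:
        callback("no subgraph generated, nothing to merge")
        return
    # Only one task at a time may modify a given KB's graph.
    async with kb_lock(kb_id):
        graph = await merge(subgraph)
        if with_resolution:
            await resolve(graph)
        if with_community:
            await community(graph)
    callback("done")
```

Passing the stages in as coroutines keeps the sketch independent of any extractor or storage backend, so only the ordering, the timeout, and the early return are on display.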
2. async def generate_subgraph(extractor: Extractor, tenant_id: str, kb_id: str, doc_id: str, chunks: list[str], language, entity_types, llm_bdl, embed_bdl, callback)
Purpose:
Generates a subgraph from document chunks using the specified extractor. This subgraph contains entities as nodes and relations as edges, annotated with metadata.
Parameters:
extractor (Extractor): Extractor class (e.g., LightKGExt or GeneralKGExt) to parse chunks.
tenant_id (str): Tenant identifier.
kb_id (str): Knowledge base identifier.
doc_id (str): Document identifier.
chunks (list[str]): List of document text chunks.
language: Language context.
entity_types (list): Types of entities to extract.
llm_bdl: LLM backend driver.
embed_bdl: Embedding backend driver (unused directly here).
callback (Callable): Logger for progress messages.
Returns:
nx.Graph: the subgraph if generated successfully, or None if the graph already contains the document.
Usage Example:
subgraph = await generate_subgraph(
    LightKGExt,
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    chunks=["chunk1 text", "chunk2 text"],
    language="en",
    entity_types=["Person", "Organization"],
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)
Implementation Details:
Checks if graph already contains the document node (does_graph_contains).
Uses the extractor to asynchronously extract entities and relationships.
Builds a NetworkX graph, adding nodes with entity attributes and edges with relation attributes.
Ignores relations referencing non-existent nodes.
Calls tidy_graph() to clean and normalize the graph structure.
Stores the serialized subgraph chunk in the document store after deleting previous subgraph chunks for the document.
Reports the number of ignored relations and timing.
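To make the "ignore relations referencing non-existent nodes" step concrete, here is a small sketch that builds a graph from extracted records. It uses plain dictionaries as a stand-in for a NetworkX graph so that it runs without third-party packages. The function name build_subgraph and the record fields (name, type, description, src, tgt) are hypothetical and chosen only for illustration.

```python
def build_subgraph(entities, relations, doc_id):
    """Build {"nodes": ..., "edges": ...} and count the relations that were skipped."""
    nodes = {}
    for ent in entities:
        nodes[ent["name"]] = {
            "entity_type": ent["type"],
            "description": ent["description"],
            "source_id": [doc_id],
        }

    edges = {}
    ignored = 0
    for rel in relations:
        src, tgt = rel["src"], rel["tgt"]
        # A relation is only kept when both endpoints were extracted as entities.
        if src not in nodes or tgt not in nodes:
            ignored += 1
            continue
        # The graph is undirected, so store each pair under one canonical key.
        key = tuple(sorted((src, tgt)))
        edges[key] = {"description": rel["description"], "source_id": [doc_id]}
    return {"nodes": nodes, "edges": edges}, ignored
```

Returning the ignored count alongside the graph mirrors the reporting step above: the caller can pass it straight to the progress callback.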
3. @timeout(60 * 3) async def merge_subgraph(tenant_id: str, kb_id: str, doc_id: str, subgraph: nx.Graph, embedding_model, callback)
Purpose:
Merges the generated subgraph into the existing global knowledge graph for the tenant and KB, updating node and edge sets and recalculating PageRank scores.
Parameters:
tenant_id (str): Tenant identifier.
kb_id (str): Knowledge base identifier.
doc_id (str): Document identifier.
subgraph (nx.Graph): Newly generated subgraph to merge.
embedding_model: Embedding model for downstream processing.
callback (Callable): Logger for progress messages.
Returns:
nx.Graph: the updated global knowledge graph.
Usage Example:
new_graph = await merge_subgraph(
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    subgraph=subgraph,
    embedding_model=my_embedder,
    callback=print,
)
Implementation Details:
Retrieves the old graph if it exists; otherwise, uses the subgraph as the new global graph.
Uses the graph_merge() utility to combine old and new graphs, tracking changes.
Calculates PageRank for importance scoring of nodes.
Persists the updated graph and changes via set_graph().
Enforces a 3-minute timeout on this operation.
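The merge-with-change-tracking idea can be illustrated with the sketch below. It is not the graph_merge() utility itself, whose exact behavior is not described here. It assumes a dictionary-based graph (a stand-in for nx.Graph) and assumes that merging an existing node or edge means appending the new description and taking the union of source IDs. The names merge_graphs and _absorb are hypothetical.

```python
def _absorb(target: dict, attrs: dict) -> None:
    """Fold attrs into target: append new description text, union the source IDs."""
    if attrs["description"] != target["description"]:
        target["description"] += " " + attrs["description"]
    for source in attrs["source_id"]:
        if source not in target["source_id"]:
            target["source_id"].append(source)


def merge_graphs(old, new):
    """Merge `new` into `old`; return (graph, changed_nodes, changed_edges)."""
    if old is None:
        # No global graph yet: the subgraph becomes the global graph.
        return new, set(new["nodes"]), set(new["edges"])

    changed = {"nodes": set(), "edges": set()}
    for kind in ("nodes", "edges"):
        for key, attrs in new[kind].items():
            if key in old[kind]:
                _absorb(old[kind][key], attrs)
            else:
                # Copy the list so later edits do not leak back into `new`.
                old[kind][key] = {**attrs, "source_id": list(attrs["source_id"])}
            changed[kind].add(key)
    return old, changed["nodes"], changed["edges"]
```

Tracking the changed keys lets a persistence step rewrite only the nodes and edges that were touched, rather than the whole graph.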
4. @timeout(60 * 30, 1) async def resolve_entities(graph, subgraph_nodes: set[str], tenant_id: str, kb_id: str, doc_id: str, llm_bdl, embed_bdl, callback)
Purpose:
Performs entity resolution on the graph to unify duplicate or semantically equivalent entities, refining the graph structure.
Parameters:
graph (nx.Graph): Knowledge graph to resolve entities in.
subgraph_nodes (set[str]): Nodes from the newly added subgraph to focus resolution on.
tenant_id (str): Tenant identifier.
kb_id (str): Knowledge base identifier.
doc_id (str): Document identifier.
llm_bdl: LLM backend driver for resolution logic.
embed_bdl: Embedding backend driver for persistence.
callback (Callable): Logger for progress messages.
Returns:
None (side effect: updates graph storage).
Usage Example:
await resolve_entities(
    graph=global_graph,
    subgraph_nodes=set(subgraph.nodes()),
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)
Implementation Details:
Instantiates the EntityResolution component with the LLM backend.
Executes resolution asynchronously, which modifies the graph and returns a GraphChange object.
Logs the number of removed nodes and edges.
Updates PageRank scores and persists the graph state.
Timeout set to 30 minutes with 1 retry.
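The graph-editing half of entity resolution can be sketched as follows. In the module the decision about which entities are duplicates comes from the LLM-backed EntityResolution component. Here the duplicate pairs are simply passed in, and the graph is a plain dictionary rather than an nx.Graph. The function name merge_duplicates and the (keep, drop) pair format are assumptions for illustration only.

```python
def merge_duplicates(graph, duplicate_pairs):
    """Collapse each (keep, drop) pair in place; return (removed_nodes, removed_edges)."""
    removed_nodes, removed_edges = [], []
    for keep, drop in duplicate_pairs:
        if keep == drop or keep not in graph["nodes"] or drop not in graph["nodes"]:
            continue  # unknown name, or the pair was already merged
        dropped = graph["nodes"].pop(drop)
        graph["nodes"][keep]["description"] += " " + dropped["description"]
        removed_nodes.append(drop)

        # Re-point every edge that touched the dropped node at the kept node.
        for edge in [e for e in graph["edges"] if drop in e]:
            attrs = graph["edges"].pop(edge)
            removed_edges.append(edge)
            other = edge[0] if edge[1] == drop else edge[1]
            if other in (keep, drop):
                continue  # the redirected edge would be a self-loop
            redirected = tuple(sorted((keep, other)))
            # Keep an existing edge between the same endpoints if there is one.
            graph["edges"].setdefault(redirected, attrs)
    return removed_nodes, removed_edges
```

The two returned lists correspond to the removed-node and removed-edge counts that the function logs after resolution.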
5. @timeout(60 * 30, 1) async def extract_community(graph, tenant_id: str, kb_id: str, doc_id: str, llm_bdl, embed_bdl, callback)
Purpose:
Extracts community structures (clusters or groups of related entities) from the knowledge graph, generates narrative reports for these communities, and indexes the reports as document chunks.
Parameters:
graph (nx.Graph): Knowledge graph on which to run community detection.
tenant_id (str): Tenant identifier.
kb_id (str): Knowledge base identifier.
doc_id (str): Document identifier.
llm_bdl: LLM backend driver for report generation.
embed_bdl: Embedding backend driver for document indexing.
callback (Callable): Logger for progress messages.
Returns:
Tuple (community_structure, community_reports) containing structured community data and textual reports.
Usage Example:
comm_struct, comm_reports = await extract_community(
    graph=global_graph,
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)
Implementation Details:
Uses CommunityReportsExtractor to identify communities and produce reports.
Constructs document chunks summarizing each community, including tokens for indexing.
Deletes previous community reports in the document store before bulk inserting new ones.
Bulk insertion is batched (size=4) for efficiency.
Logs timing and count of extracted communities.
Timeout set to 30 minutes with 1 retry.
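The batched insertion mentioned above amounts to slicing the list of report chunks into groups of four and writing one group at a time. The sketch below shows only that slicing. The insert callable stands in for the document store's insert method, whose real signature is not shown in this document, and the name insert_in_batches is hypothetical.

```python
def insert_in_batches(chunks, insert, batch_size=4):
    """Pass `chunks` to `insert` in slices of `batch_size`; return how many were sent."""
    sent = 0
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        insert(batch)  # one store request per batch
        sent += len(batch)
    return sent
```

With ten chunks and the default batch size this issues three requests carrying four, four, and two chunks.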
Important Implementation Details and Algorithms
Graph Construction and Manipulation: Uses the NetworkX library to build and manipulate knowledge graphs, representing entities as nodes and relations as edges. Attributes such as descriptions and source IDs are stored as node/edge metadata.
Entity Extraction and Resolution: Employs specialized extractors (LightKGExt, GeneralKGExt) to parse document chunks and extract graph components. Entity resolution uses an LLM-powered approach to identify and merge duplicate entities across the graph.
PageRank Calculation: After graph merges and resolution, PageRank scores are computed to rank nodes by importance within the graph.
Concurrency and Locking: Distributed locking via Redis (RedisDistributedLock) ensures that concurrent processes do not simultaneously modify the same knowledge base graph, preventing race conditions.
Timeout Decorators: Critical long-running functions have enforced timeouts to avoid indefinite blocking, with some allowing retries.
Document Store Interaction: The system persists intermediate and final graph data, subgraphs, and community reports to an external document store (settings.docStoreConn), integrated with Elasticsearch indices per tenant.
Tokenization for Indexing: Community report contents are tokenized using the rag_tokenizer for fine-grained text indexing.
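For readers unfamiliar with PageRank, the scoring step can be illustrated with a short power-iteration sketch. This document does not say which routine the module calls, so the code below is a generic textbook version written against plain Python containers, not the module's implementation. The function name, the damping factor of 0.85, and the fixed iteration count are assumptions.

```python
def pagerank(nodes, edges, damping=0.85, iterations=50):
    """Power-iteration PageRank on an undirected graph of node names and edge pairs."""
    neighbours = {node: set() for node in nodes}
    for a, b in edges:
        if a != b:  # self-loops carry no information here
            neighbours[a].add(b)
            neighbours[b].add(a)

    count = len(neighbours)
    if count == 0:
        return {}

    rank = {node: 1.0 / count for node in neighbours}
    for _ in range(iterations):
        # Rank held by isolated nodes is spread evenly over the whole graph.
        dangling = sum(rank[v] for v, adj in neighbours.items() if not adj)
        base = (1.0 - damping) / count + damping * dangling / count
        new_rank = {node: base for node in neighbours}
        # Every node shares its rank equally among its neighbours.
        for v, adj in neighbours.items():
            for u in adj:
                new_rank[u] += damping * rank[v] / len(adj)
        rank = new_rank
    return rank
```

On a star-shaped graph the hub ends up with the highest score, which matches the intuition that heavily connected entities are the more important ones.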
Interaction with Other Parts of the Application
Settings and Configuration: Utilizes global settings for retrieval, document store connection, and environment variables.
API Utilities: Uses utility functions such as get_uuid, timeout, and graph utilities like graph_merge, tidy_graph, set_graph.
Extractor Modules: Relies on multiple extractor classes from the graphrag package to perform domain-specific knowledge graph extraction and community detection.
RAG NLP Module: Employs rag_tokenizer for tokenization in community report indexing.
Redis Distributed Locking: Coordinates concurrency control across distributed processes.
Document Store (Elasticsearch): Reads from and writes to Elasticsearch-backed document stores for graph chunks and reports, maintaining index consistency per tenant and KB.
Visual Diagram: Class and Function Structure
flowchart TD
A[run_graphrag] --> B[generate_subgraph]
A --> C[merge_subgraph]
C --> D[set_graph]
A --> E[resolve_entities]
E --> D
A --> F[extract_community]
F --> G[CommunityReportsExtractor]
F --> H[docStoreConn.insert]
B --> I["Extractor (LightKGExt or GeneralKGExt)"]
E --> J[EntityResolution]
subgraph Utilities
D[set_graph]
K[get_graph]
L[graph_merge]
M[tidy_graph]
end
subgraph Storage
H
N[docStoreConn.delete]
end
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:1.5px
style C fill:#bbf,stroke:#333,stroke-width:1.5px
style E fill:#bbf,stroke:#333,stroke-width:1.5px
style F fill:#bbf,stroke:#333,stroke-width:1.5px
Summary
The index.py module orchestrates the knowledge graph lifecycle in the InfiniFlow platform: it extracts graph data from document chunks, merges it into a per-KB global graph, and refines the result. It keeps graph updates consistent through distributed locking, improves graph quality via entity resolution and PageRank, and adds human-readable community reports that are indexed alongside the documents. The module combines asynchronous task execution with timeouts, graph analytics, and batched document indexing.