index.py


Overview

The index.py file is a core asynchronous processing module within the InfiniFlow system. It builds, maintains, and enriches knowledge graphs derived from chunks of document data. It orchestrates a pipeline that extracts entities and relationships from document chunks, generates a subgraph per document, and merges it into the global knowledge graph. It can then optionally run entity resolution, which unifies duplicate or equivalent entities, and community extraction, which groups related entities and summarizes each group.

The file combines several specialized extractors and graph utilities, coordinates concurrent tasks through distributed locking, and reads from and writes to an external document store (e.g., Elasticsearch) through settings.docStoreConn. It uses the networkx library for graph manipulation and trio for asynchronous concurrency, with time limits enforced by the @timeout decorator applied to several of the functions below.
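Several functions in this module carry a @timeout decorator, e.g. @timeout(60 * 3) on merge_subgraph and @timeout(60 * 30, 1) on resolve_entities. Its real definition is not shown in this document, so the following is only a minimal sketch of the idea, assuming the first argument is a per-attempt limit in seconds and the optional second argument is a number of attempts. It uses asyncio.wait_for as a standard-library stand-in for trio's fail_after; the decorator name and behavior here are hypothetical.

```python
import asyncio
import functools


def timeout(seconds: float, attempts: int = 1):
    """Hypothetical decorator: abort an async call after `seconds`, retrying up to `attempts` times.

    Assumption: this mirrors only the *shape* of the module's @timeout(seconds, attempts);
    it is not the module's implementation.
    """
    tries = max(1, attempts)

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for _ in range(tries):
                try:
                    # Each attempt gets its own time budget; the slow call is cancelled on timeout.
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
                except asyncio.TimeoutError as exc:
                    last_exc = exc
            raise TimeoutError(
                f"{func.__name__} exceeded {seconds}s in each of {tries} attempt(s)"
            ) from last_exc
        return wrapper
    return decorator


@timeout(0.05, 2)
async def slow_step():
    await asyncio.sleep(1)  # always too slow for the 0.05 s budget
    return "done"


@timeout(1)
async def fast_step():
    await asyncio.sleep(0)
    return "done"
```

Under this assumption, @timeout(60 * 3) would give merge_subgraph a three-minute budget, and @timeout(60 * 30, 1) would give resolve_entities thirty minutes for a single attempt.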


Detailed Explanation of Classes, Functions, and Methods

1. async def run_graphrag(row: dict, language, with_resolution: bool, with_community: bool, chat_model, embedding_model, callback)

Purpose:
Top-level asynchronous function that coordinates the entire GraphRAG (Graph Retrieval-Augmented Generation) workflow for a given document. It retrieves the document's chunks, generates a subgraph from them, merges that subgraph into the global knowledge graph, and optionally performs entity resolution and community extraction.

Parameters:

Returns:

Usage Example:

await run_graphrag(
    row=document_metadata,
    language="en",
    with_resolution=True,
    with_community=True,
    chat_model=my_llm,
    embedding_model=my_embedder,
    callback=print
)

Implementation Details:
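The control flow described above can be outlined as follows. This is a minimal sketch, not the module's code: every step is a hypothetical stub (generate_subgraph_stub, run_pipeline), and the distributed lock mentioned in the overview is modeled by a per-KB asyncio.Lock, which only serializes work inside one process, whereas the real module needs a lock shared across workers.

```python
import asyncio
from collections import defaultdict


async def generate_subgraph_stub(doc_id, chunks):
    """Hypothetical stand-in for generate_subgraph: one node per chunk, tagged with its document."""
    await asyncio.sleep(0)
    return {f"{doc_id}:{i}": {"source_id": [doc_id]} for i, _ in enumerate(chunks)}


async def run_pipeline(kb_id, doc_id, chunks, graph, locks, log,
                       with_resolution=False, with_community=False):
    """Hypothetical outline of run_graphrag's control flow."""
    # Per-document extraction touches no shared state, so it runs outside the lock.
    subgraph = await generate_subgraph_stub(doc_id, chunks)
    if not subgraph:
        log.append(f"{doc_id}: empty subgraph, nothing to merge")
        return graph
    async with locks[kb_id]:  # serialize writes to this KB's shared graph
        log.append(f"{doc_id}: lock acquired")
        await asyncio.sleep(0)  # yield, so an unsafe interleaving would show up in the log
        graph.update(subgraph)  # stand-in for merge_subgraph
        if with_resolution:
            log.append(f"{doc_id}: resolve entities")  # stand-in for resolve_entities
        if with_community:
            log.append(f"{doc_id}: extract communities")  # stand-in for extract_community
        log.append(f"{doc_id}: lock released")
    return graph


async def main():
    locks = defaultdict(asyncio.Lock)  # created inside the running event loop
    graph, log = {}, []
    await asyncio.gather(
        run_pipeline("kb_456", "doc_a", ["c1", "c2"], graph, locks, log, with_resolution=True),
        run_pipeline("kb_456", "doc_b", ["c1"], graph, locks, log, with_community=True),
    )
    return graph, log


if __name__ == "__main__":
    graph, log = asyncio.run(main())
    print(*log, sep="\n")
```

Both documents extract concurrently, but their merge, resolution, and community steps on the shared graph never interleave.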


2. async def generate_subgraph(extractor: Extractor, tenant_id: str, kb_id: str, doc_id: str, chunks: list[str], language, entity_types, llm_bdl, embed_bdl, callback)

Purpose:
Generates a subgraph from document chunks using the specified extractor. This subgraph contains entities as nodes and relations as edges, annotated with metadata.

Parameters:

Returns:

Usage Example:

subgraph = await generate_subgraph(
    LightKGExt,
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    chunks=["chunk1 text", "chunk2 text"],
    language="en",
    entity_types=["Person", "Organization"],
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)

Implementation Details:
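To illustrate what "entities as nodes and relations as edges, annotated with metadata" can look like, here is a minimal sketch that folds extractor output into a graph. All of the following are hypothetical assumptions: the extractor returns plain entity and relation dicts with the field names shown; the graph is a pair of plain dicts rather than a networkx.Graph; entity names are upper-cased so repeated mentions collapse; and every node and edge records the document it came from in source_id.

```python
from dataclasses import dataclass, field


@dataclass
class SimpleGraph:
    """Plain-dict stand-in for an undirected networkx.Graph."""
    nodes: dict = field(default_factory=dict)  # name -> attribute dict
    edges: dict = field(default_factory=dict)  # frozenset({a, b}) -> attribute dict


def build_subgraph(doc_id, entities, relations):
    """Hypothetical: turn extracted entity/relation dicts into a per-document subgraph."""
    g = SimpleGraph()
    for ent in entities:
        name = ent["entity_name"].upper()  # hypothetical normalization of entity names
        node = g.nodes.setdefault(name, {"entity_type": ent["entity_type"],
                                         "description": [], "source_id": [doc_id]})
        node["description"].append(ent["description"])  # merge repeated mentions
    for rel in relations:
        a, b = rel["src_id"].upper(), rel["tgt_id"].upper()
        if a not in g.nodes or b not in g.nodes:
            continue  # skip relations whose endpoints were not extracted as entities
        edge = g.edges.setdefault(frozenset((a, b)), {"description": [], "weight": 0.0,
                                                      "source_id": [doc_id]})
        edge["description"].append(rel["description"])
        edge["weight"] += rel.get("weight", 1.0)  # hypothetical default weight
    return g
```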


3. @timeout(60 * 3) async def merge_subgraph(tenant_id: str, kb_id: str, doc_id: str, subgraph: nx.Graph, embedding_model, callback)

Purpose:
Merges the generated subgraph into the existing global knowledge graph for the tenant and knowledge base (KB), adding or updating nodes and edges and recalculating PageRank scores over the merged graph.
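PageRank assigns each entity a score that reflects how well connected it is within the graph. The module's exact call is not shown in this document; as a self-contained illustration, the sketch below implements the standard power-iteration form of PageRank on an undirected graph stored as an adjacency dict (networkx offers the same computation as networkx.pagerank). The adjacency format is an assumption made for the example.

```python
def pagerank(adj: dict[str, set[str]], damping: float = 0.85,
             tol: float = 1e-10, max_iter: int = 200) -> dict[str, float]:
    """Power-iteration PageRank for an undirected graph (each edge counts in both directions).

    `adj` must be symmetric and list every node as a key.
    """
    nodes = list(adj)
    n = len(nodes)
    if n == 0:
        return {}
    rank = {v: 1.0 / n for v in nodes}
    for _ in range(max_iter):
        # Rank held by isolated (dangling) nodes is spread evenly over all nodes.
        dangling = sum(rank[v] for v in nodes if not adj[v])
        new = {v: (1.0 - damping) / n + damping * dangling / n for v in nodes}
        for v in nodes:
            if adj[v]:
                share = damping * rank[v] / len(adj[v])  # split v's rank among its neighbors
                for u in adj[v]:
                    new[u] += share
        converged = sum(abs(new[v] - rank[v]) for v in nodes) < tol
        rank = new
        if converged:
            break
    return rank
```

The scores always sum to 1, so values stay comparable as the graph grows; well-connected hub entities end up with the highest scores.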

Parameters:

Returns:

Usage Example:

new_graph = await merge_subgraph(
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    subgraph=subgraph,
    embedding_model=my_embedder,
    callback=print,
)

Implementation Details:
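The merge can be pictured as a union of node and edge sets in which the attributes of entities that already exist are combined rather than overwritten. A minimal sketch under stated assumptions: graphs are plain attribute dicts (not networkx objects), descriptions are accumulated as lists, edge weights are summed, and source_id lists record every contributing document. The function name merge_into is hypothetical and is not the module's graph_merge.

```python
import copy


def merge_into(global_nodes, global_edges, sub_nodes, sub_edges):
    """Return merged (nodes, edges) without mutating the inputs.

    nodes: name -> {"description": [...], "source_id": [...]}
    edges: frozenset({a, b}) -> {"description": [...], "weight": float, "source_id": [...]}
    """
    nodes, edges = copy.deepcopy(global_nodes), copy.deepcopy(global_edges)
    for name, attrs in sub_nodes.items():
        if name not in nodes:
            nodes[name] = copy.deepcopy(attrs)  # brand-new entity
            continue
        # Existing entity: keep old descriptions, append new ones, union the sources.
        nodes[name]["description"] += attrs["description"]
        nodes[name]["source_id"] = sorted(set(nodes[name]["source_id"]) | set(attrs["source_id"]))
    for key, attrs in sub_edges.items():
        if key not in edges:
            edges[key] = copy.deepcopy(attrs)
            continue
        edges[key]["description"] += attrs["description"]
        edges[key]["weight"] += attrs["weight"]  # repeated relations strengthen the edge
        edges[key]["source_id"] = sorted(set(edges[key]["source_id"]) | set(attrs["source_id"]))
    return nodes, edges
```

Working on copies means a failed merge leaves the stored graph untouched; node scores would then be recomputed over the merged result.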


4. @timeout(60 * 30, 1) async def resolve_entities(graph, subgraph_nodes: set[str], tenant_id: str, kb_id: str, doc_id: str, llm_bdl, embed_bdl, callback)

Purpose:
Performs entity resolution on the graph to unify duplicate or semantically equivalent entities, refining the graph structure.

Parameters:

Returns:

Usage Example:

await resolve_entities(
    graph=global_graph,
    subgraph_nodes=set(subgraph.nodes()),
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)

Implementation Details:
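Entity resolution can be outlined in three parts: propose candidate pairs, decide which pairs denote the same entity, and collapse each group of matches onto one name. In the sketch below the decision is a plain string-similarity test (difflib.SequenceMatcher), standing in for whatever model-based check the real function performs with llm_bdl. The following are all assumptions: candidates are limited to pairs that share an entity type and involve at least one node from subgraph_nodes, the 0.85 threshold applies, and the alphabetically first name is kept. Groups are formed with union-find, so A~B and B~C land in one group.

```python
from difflib import SequenceMatcher
from itertools import combinations


def resolve(nodes: dict[str, str], new_nodes: set[str], threshold: float = 0.85) -> dict[str, str]:
    """Hypothetical: map every node name to its canonical name.

    nodes: name -> entity_type. new_nodes: names contributed by the latest subgraph.
    """
    parent = {n: n for n in nodes}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            # Keep the alphabetically first name as the representative (arbitrary rule).
            parent[max(ra, rb)] = min(ra, rb)

    for a, b in combinations(sorted(nodes), 2):
        if a not in new_nodes and b not in new_nodes:
            continue  # only pairs that touch the new subgraph are re-examined
        if nodes[a] != nodes[b]:
            continue  # never merge entities of different types
        if SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold:
            union(a, b)
    return {n: find(n) for n in nodes}
```

The returned mapping tells the caller which nodes to fold into which; the surviving node would then absorb the merged nodes' edges and descriptions.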


5. @timeout(60 * 30, 1) async def extract_community(graph, tenant_id: str, kb_id: str, doc_id: str, llm_bdl, embed_bdl, callback)

Purpose:
Extracts community structures (clusters or groups of related entities) from the knowledge graph, generates narrative reports for these communities, and indexes the reports as document chunks.

Parameters:

Returns:

Usage Example:

comm_struct, comm_reports = await extract_community(
    graph=global_graph,
    tenant_id="tenant_123",
    kb_id="kb_456",
    doc_id="doc_789",
    llm_bdl=my_llm,
    embed_bdl=my_embedder,
    callback=print,
)

Implementation Details:
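Community extraction can be sketched in two steps: partition the graph into groups of related entities, then turn each group into a report that is stored as a chunk. This document does not name the partitioning algorithm, so the sketch uses connected components, the simplest possible partition, purely as a stand-in; GraphRAG implementations commonly use a modularity-based method such as Leiden instead. The summarize callback replaces the LLM-written report, and the chunk fields (id, kb_id, kind, entities, content) are hypothetical.

```python
from collections import deque


def connected_components(adj: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes that are reachable from one another (BFS); each group is returned sorted."""
    seen, groups = set(), []
    for start in sorted(adj):
        if start in seen:
            continue
        seen.add(start)
        queue, group = deque([start]), []
        while queue:
            v = queue.popleft()
            group.append(v)
            for u in adj[v]:
                if u not in seen:
                    seen.add(u)
                    queue.append(u)
        groups.append(sorted(group))
    return groups


def community_report_chunks(adj, kb_id, summarize, min_size=2):
    """Build one hypothetical report chunk per community with at least `min_size` entities."""
    chunks = []
    communities = (c for c in connected_components(adj) if len(c) >= min_size)
    for i, members in enumerate(communities):
        chunks.append({
            "id": f"{kb_id}-community-{i}",  # hypothetical id scheme
            "kb_id": kb_id,
            "kind": "community_report",      # hypothetical marker field
            "entities": members,
            "content": summarize(members),   # stand-in for an LLM-written report
        })
    return chunks
```

Each resulting dict would then be embedded and written to the document store so that reports can be retrieved like ordinary chunks.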


Important Implementation Details and Algorithms


Interaction with Other Parts of the Application


Visual Diagram: Class and Function Structure

flowchart TD
    A[run_graphrag] --> B[generate_subgraph]
    A --> C[merge_subgraph]
    C --> D[set_graph]
    A --> E[resolve_entities]
    E --> D
    A --> F[extract_community]
    F --> G[CommunityReportsExtractor]
    F --> H[docStoreConn.insert]
    B --> I["Extractor (LightKGExt or GeneralKGExt)"]
    E --> J[EntityResolution]
    subgraph Utilities
        D[set_graph]
        K[get_graph]
        L[graph_merge]
        M[tidy_graph]
    end
    subgraph Storage
        H
        N[docStoreConn.delete]
    end
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:1.5px
    style C fill:#bbf,stroke:#333,stroke-width:1.5px
    style E fill:#bbf,stroke:#333,stroke-width:1.5px
    style F fill:#bbf,stroke:#333,stroke-width:1.5px

Summary

The index.py module orchestrates the knowledge graph lifecycle in the InfiniFlow platform, combining extractors, graph utilities, and the document store to extract, merge, and refine graph data derived from document collections. It protects shared graph state with distributed locking, improves graph quality through entity resolution, ranks entities with PageRank, and enriches the knowledge representation with human-readable community reports. In doing so, it brings together asynchronous task orchestration, graph analytics, and document indexing within a distributed system architecture.