extractor.py
Overview
The extractor.py file is a core utility component in the InfiniFlow system responsible for extracting, merging, and summarizing entities and relationships from chunks of textual data using a Large Language Model (LLM). It provides an asynchronous pipeline that processes multiple text chunks concurrently, identifies meaningful nodes (entities) and edges (relationships), merges redundant information, and uses LLM-powered summarization to condense descriptions. This file interfaces heavily with the LLM abstraction layer and graph data structures, integrating natural language understanding into graph-based knowledge representations.
Classes and Functions
Class: Extractor
The Extractor class encapsulates the logic to extract entities and relationships from text chunks by leveraging an LLM invoker, managing concurrency, caching, and summarization.
Initialization
def __init__(
    self,
    llm_invoker: CompletionLLM,
    language: str | None = "English",
    entity_types: list[str] | None = None,
)
Parameters:
llm_invoker (CompletionLLM): An instance of a chat-based completion LLM used to perform extraction and summarization tasks.
language (str | None, optional): Language context for extraction and summarization. Defaults to "English".
entity_types (list[str] | None, optional): List of entity types to recognize. Defaults to ["organization", "person", "geo", "event", "category"].
Description:
Initializes the extractor with the provided LLM instance, language, and entity types to focus on.
Private Method: _chat
@timeout(60 * 20)
def _chat(self, system, history, gen_conf={})
Parameters:
system (str): System prompt or instructions for the LLM.
history (list): Conversation history as a list of messages.
gen_conf (dict, optional): Configuration parameters for LLM generation.
Returns:
str: The response generated by the LLM.
Description:
Handles interaction with the LLM, including caching, retries (up to 3 attempts), and error handling. It strips out unwanted preamble (the </think> tag) and detects error messages in the response. A timeout decorator aborts the call if it exceeds 20 minutes.
Usage Example:
response = extractor._chat(system="You are a helpful assistant", history=[{"role": "user", "content": "Extract entities"}])
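The cache-then-retry behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: the in-memory _CACHE dictionary stands in for get_llm_cache and set_llm_cache, the names cache_key and chat_with_retry are hypothetical, and the way the </think> preamble is removed is an assumption. The timeout and the error-message detection are left out.

```python
import hashlib
import json

# Hypothetical in-memory stand-in for the get_llm_cache / set_llm_cache helpers.
_CACHE: dict[str, str] = {}


def cache_key(system: str, history: list, gen_conf: dict) -> str:
    """Build a deterministic key from everything that influences the reply."""
    payload = json.dumps([system, history, gen_conf], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def chat_with_retry(llm_call, system, history, gen_conf=None, attempts=3):
    """Return a cached reply if present; otherwise call the LLM up to `attempts` times."""
    gen_conf = gen_conf or {}
    key = cache_key(system, history, gen_conf)
    if key in _CACHE:
        return _CACHE[key]

    last_error = None
    for _ in range(attempts):
        try:
            response = llm_call(system, history, gen_conf)
            # Assumption: keep only the text that follows a reasoning preamble.
            response = response.split("</think>")[-1].strip()
            _CACHE[key] = response
            return response
        except Exception as exc:  # broad on purpose: any failure triggers a retry
            last_error = exc
    # Every attempt failed, so surface the last error to the caller.
    raise last_error
```

Passing the LLM call in as a plain callable keeps the sketch testable without a real model behind it.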
Private Method: _entities_and_relations
def _entities_and_relations(self, chunk_key: str, records: list, tuple_delimiter: str)
Parameters:
chunk_key (str): Identifier for the current chunk.
records (list[str]): List of raw string records extracted from the chunk.
tuple_delimiter (str): Delimiter used to split tuple elements in a record.
Returns:
Tuple of two dictionaries:
maybe_nodes (dict): Maps entity names to lists of entity data dictionaries.
maybe_edges (dict): Maps (src_id, tgt_id) tuples to lists of relationship data dictionaries.
Description:
Processes extracted records by splitting them and determining whether they represent entities or relationships, using the helper functions handle_single_entity_extraction and handle_single_relationship_extraction. Filters entities by type based on the configured entity types.
Usage Example:
nodes, edges = extractor._entities_and_relations("chunk1", ["org1|desc|type"], "|")
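To make the grouping step concrete, here is a small self-contained sketch of sorting delimited records into candidate nodes and edges. The record layouts, the dictionary keys, and the function name split_records are assumptions made for illustration; the real helpers handle_single_entity_extraction and handle_single_relationship_extraction may expect a different shape.

```python
from collections import defaultdict


def split_records(chunk_key, records, tuple_delimiter, entity_types):
    """Group parsed records into candidate nodes and edges.

    Assumed (hypothetical) layouts, with <d> as the tuple delimiter:
      entity<d>name<d>type<d>description
      relationship<d>source<d>target<d>description<d>weight
    """
    maybe_nodes = defaultdict(list)
    maybe_edges = defaultdict(list)
    allowed = {t.lower() for t in entity_types}

    for record in records:
        fields = [f.strip() for f in record.split(tuple_delimiter)]
        kind = fields[0].lower()
        if kind == "entity" and len(fields) >= 4:
            name, etype, desc = fields[1], fields[2].lower(), fields[3]
            if etype not in allowed:
                continue  # drop entities whose type is not configured
            maybe_nodes[name].append(
                {"entity_name": name, "entity_type": etype,
                 "description": desc, "source_id": chunk_key}
            )
        elif kind == "relationship" and len(fields) >= 5:
            src, tgt = fields[1], fields[2]
            try:
                weight = float(fields[4])
            except ValueError:
                weight = 1.0  # fall back when the weight is not numeric
            maybe_edges[(src, tgt)].append(
                {"src_id": src, "tgt_id": tgt, "description": fields[3],
                 "weight": weight, "source_id": chunk_key}
            )
        # Records that match neither layout are ignored.
    return dict(maybe_nodes), dict(maybe_edges)
```

Keying edges by a (source, target) tuple mirrors the return shape documented above and lets the later merge step collect all mentions of the same pair.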
Asynchronous Callable: __call__
async def __call__(self, doc_id: str, chunks: list[str], callback: Callable | None = None)
Parameters:
doc_id (str): Document identifier.
chunks (list[str]): List of text chunks to process.
callback (Callable | None, optional): Optional callback function for progress updates.
Returns:
Tuple:
all_entities_data (list[dict]): List of merged and summarized entity dictionaries.
all_relationships_data (list[dict]): List of merged and summarized relationship dictionaries.
Description:
Entry point to extract entities and relationships from multiple chunks concurrently. Uses trio to limit async concurrency and avoid overload. After extraction, merges entities and relationships asynchronously and summarizes descriptions with LLM calls. Reports progress via the optional callback.
Usage Example:
entities, relationships = await extractor(doc_id="doc123", chunks=["chunk1 text", "chunk2 text"])
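The bounded fan-out described above can be illustrated with a short sketch. Note the swap: the module uses trio, whereas this example uses the standard library's asyncio so that it runs without third-party packages. The names process_chunks and extract_one, and the msg keyword passed to the callback, are hypothetical.

```python
import asyncio


async def process_chunks(chunks, extract_one, max_concurrent=4, callback=None):
    """Run `extract_one` over all chunks with at most `max_concurrent` in flight."""
    limiter = asyncio.Semaphore(max_concurrent)
    results = [None] * len(chunks)
    done = 0

    async def worker(index, chunk):
        nonlocal done
        async with limiter:  # wait here while the limit is reached
            results[index] = await extract_one(chunk)
        done += 1
        if callback is not None:
            callback(msg=f"{done}/{len(chunks)} chunks processed")

    # Start one task per chunk; the semaphore bounds how many run at once.
    await asyncio.gather(*(worker(i, c) for i, c in enumerate(chunks)))
    return results
```

Writing each result into a pre-sized list keeps the output in chunk order even though chunks may finish out of order.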
Private Async Method: _merge_nodes
async def _merge_nodes(self, entity_name: str, entities: list[dict], all_relationships_data)
Parameters:
entity_name (str): Name of the entity to merge.
entities (list[dict]): List of entity data dicts to merge.
all_relationships_data (list[dict]): Aggregated list to which the merged entity data is appended.
Description:
Merges multiple entity extractions for the same entity name by choosing the most frequent entity type, combining descriptions, and consolidating source IDs. Calls _handle_entity_relation_summary to summarize the combined description before appending the merged entity data to the results list.
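As a rough illustration of the merge rules just described (majority vote on type, combined descriptions, consolidated source IDs), consider the sketch below. The separator value, the dictionary keys, and the function name merge_entity_mentions are assumptions; the summarization call is omitted.

```python
from collections import Counter

SEP = "<SEP>"  # hypothetical separator; the real constant may differ


def merge_entity_mentions(entity_name, entities):
    """Merge several extractions of one entity into a single record."""
    # Majority vote on the entity type; ties resolve to the type seen first.
    entity_type = Counter(e["entity_type"] for e in entities).most_common(1)[0][0]
    # Unique descriptions, sorted for a stable result, joined by the separator.
    description = SEP.join(sorted({e["description"] for e in entities}))
    # Unique source IDs, preserving first-seen order.
    source_ids = list(dict.fromkeys(e["source_id"] for e in entities))
    return {
        "entity_name": entity_name,
        "entity_type": entity_type,
        "description": description,
        "source_id": source_ids,
    }
```

Deduplicating descriptions before joining keeps the text that later goes to the summarizer as short as possible.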
Private Async Method: _merge_edges
async def _merge_edges(self, src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data=None)
Parameters:
src_id (str): Source entity ID.
tgt_id (str): Target entity ID.
edges_data (list[dict]): List of edge data dicts to merge.
all_relationships_data (list[dict], optional): Aggregated list to which the merged edge data is appended.
Description:
Merges multiple relationship extractions between the same source and target by summing weights, concatenating descriptions, consolidating keywords and source IDs, and summarizing the combined description using _handle_entity_relation_summary. The merged edge is appended to the results list.
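The edge merge follows the same pattern, with weights summed instead of voted on. The sketch below is illustrative only: the dictionary keys, the separator, and the name merge_edge_mentions are assumptions, and the summarization step is left out.

```python
def merge_edge_mentions(src_id, tgt_id, edges_data, sep="<SEP>"):
    """Merge several extractions of one relationship into a single record."""
    # Weights add up, so a relationship mentioned often becomes stronger.
    weight = sum(e["weight"] for e in edges_data)
    # Unique descriptions, sorted for a stable result.
    description = sep.join(sorted({e["description"] for e in edges_data}))
    # Flatten the per-mention keyword lists into one sorted, unique list.
    keywords = sorted({k for e in edges_data for k in e.get("keywords", [])})
    source_ids = sorted({e["source_id"] for e in edges_data})
    return {
        "src_id": src_id,
        "tgt_id": tgt_id,
        "weight": weight,
        "description": description,
        "keywords": keywords,
        "source_id": source_ids,
    }
```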
Private Async Method: _merge_graph_nodes
async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: GraphChange)
Parameters:
graph (networkx.Graph): Graph instance containing nodes and edges.
nodes (list[str]): List of node names to merge.
change (GraphChange): Object tracking added/removed nodes and edges.
Description:
Merges multiple nodes in a NetworkX graph into a single node (the first in the list). Descriptions and source IDs are combined. Neighboring edges are updated and merged where applicable. Summaries are refreshed after merging. Tracks graph changes in change for downstream processing.
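The "first node absorbs the rest" idea can be sketched without NetworkX. In this illustration plain dictionaries stand in for the graph (an undirected edge is keyed by a frozenset of its two endpoints), and the name merge_nodes_into_first is hypothetical. Change tracking and the summary refresh are omitted.

```python
def merge_nodes_into_first(nodes, edges, names, sep="<SEP>"):
    """Fold names[1:] into names[0], rewiring or merging their edges.

    nodes: {name: {"description": str, "source_id": list[str]}}
    edges: {frozenset({a, b}): {"weight": float, "description": str}}
    Both dictionaries are modified in place and also returned.
    """
    keep = names[0]
    for other in names[1:]:
        absorbed = nodes.pop(other)
        nodes[keep]["description"] += sep + absorbed["description"]
        for sid in absorbed["source_id"]:
            if sid not in nodes[keep]["source_id"]:
                nodes[keep]["source_id"].append(sid)

        # Snapshot the affected keys first, since the dict changes in the loop.
        for key in [k for k in edges if other in k]:
            attrs = edges.pop(key)
            neighbor = next(iter(key - {other}), None)
            if neighbor is None or neighbor == keep:
                continue  # self-loop or edge between the merged pair: drop it
            new_key = frozenset({keep, neighbor})
            if new_key in edges:
                # Shared neighbor: aggregate the two parallel edges.
                edges[new_key]["weight"] += attrs["weight"]
                edges[new_key]["description"] += sep + attrs["description"]
            else:
                edges[new_key] = attrs  # simply re-attach the edge to `keep`
    return nodes, edges
```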
Private Async Method: _handle_entity_relation_summary
async def _handle_entity_relation_summary(self, entity_or_relation_name: str, description: str) -> str
Parameters:
entity_or_relation_name (str): Name of the entity or relation being summarized.
description (str): Long concatenated description string.
Returns:
str: A summarized description string.
Description:
Truncates the description to a maximum token limit, splits it by separator, and, if the number of description segments exceeds a threshold, invokes the LLM with a summarization prompt (SUMMARIZE_DESCRIPTIONS_PROMPT). Uses concurrency limits to avoid LLM overload. Returns either the original truncated description or the summarized version.
Usage Example:
summary = await extractor._handle_entity_relation_summary("EntityX", "long description text")
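The decision of when to call the LLM can be reduced to a small threshold check, sketched below. Several simplifications are assumptions of this example: truncation is by characters rather than tokens, the threshold and limit values are invented, the function is synchronous, and the summarize callable stands in for the prompt-based LLM call.

```python
def summarize_if_needed(description, summarize, sep="<SEP>",
                        max_segments=12, max_chars=2000):
    """Return the description as-is when short; otherwise delegate to `summarize`."""
    # Character-based truncation as a stand-in for token-based truncation.
    truncated = description[:max_chars]
    segments = [s for s in truncated.split(sep) if s]
    if len(segments) <= max_segments:
        return truncated  # few enough segments: no LLM call needed
    return summarize(segments)
```

Skipping the LLM for short descriptions avoids spending a model call on text that is already compact.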
Important Implementation Details and Algorithms
Concurrency and Async Design:
Uses trio for asynchronous concurrency control, with semaphores to limit parallel chunk processing (MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK) and controlled concurrent LLM calls (via chat_limiter).
LLM Caching:
Results of LLM calls are cached using get_llm_cache and set_llm_cache to avoid redundant computation and reduce API calls.
Entity and Relationship Extraction:
Uses heuristic splitting and helper functions (handle_single_entity_extraction, handle_single_relationship_extraction) to parse raw text records into structured entities and edges.
Merging Strategy:
Entities and edges are merged by majority vote on entity types, concatenation of descriptions with separators, summing weights, and flattening unique source IDs and keywords.
Graph Node Merging:
When merging nodes in a graph, the first node absorbs the descriptions and edges of the others, and edges to shared neighbors are merged by aggregating attributes.
Summarization:
Large descriptions are summarized with a specialized prompt to condense information, improving downstream graph readability and analysis.
Error Handling and Retries:
The _chat method retries LLM calls up to 3 times on failure and raises an exception if all attempts fail.
Logging and Callbacks:
Uses logging for info and warning messages and supports optional callback functions for progress reporting.
Interaction with Other Parts of the System
LLM Interface:
Depends on CompletionLLM from rag.llm.chat_model for chat-based language model interactions.
Graph Utilities:
Uses networkx graph structures for node and edge management.
Utility Modules:
Relies on utility functions from graphrag.utils and api.utils.api_utils for tasks like caching, splitting, and concurrency limiting.
Prompts:
Uses predefined prompt templates such as SUMMARIZE_DESCRIPTIONS_PROMPT to instruct LLM summarization.
Environment Configuration:
Concurrency limits are configurable via the environment variable MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK.
Data Flow:
Typically, this module would be called by higher-level ingestion or processing pipelines that supply text chunks extracted from documents or other data sources. The resulting entities and relationships can then be fed into graph databases or knowledge graph services.
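For the environment-based concurrency limit mentioned in this section, a defensive reader might look like the sketch below. The helper name read_concurrency_limit and the fallback value of 10 are assumptions for illustration; the document does not state the module's actual default.

```python
import os


def read_concurrency_limit(name="MAX_CONCURRENT_PROCESS_AND_EXTRACT_CHUNK",
                           default=10):
    """Read a positive integer limit from the environment, else use `default`."""
    raw = os.environ.get(name, "")
    try:
        value = int(raw)
    except ValueError:
        return default  # unset or not a number
    # Zero or negative limits would stall processing, so fall back as well.
    return value if value > 0 else default
```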
Visual Diagram: Extractor Class Structure
classDiagram
class Extractor {
-_llm: CompletionLLM
-_language: str | None
-_entity_types: list[str]
+__init__(llm_invoker, language="English", entity_types=None)
+__call__(doc_id: str, chunks: list[str], callback: Callable | None = None)
-_chat(system, history, gen_conf={})
-_entities_and_relations(chunk_key: str, records: list, tuple_delimiter: str)
-_merge_nodes(entity_name: str, entities: list[dict], all_relationships_data)
-_merge_edges(src_id: str, tgt_id: str, edges_data: list[dict], all_relationships_data=None)
-_merge_graph_nodes(graph: nx.Graph, nodes: list[str], change: GraphChange)
-_handle_entity_relation_summary(entity_or_relation_name: str, description: str) -> str
}
Summary
The extractor.py module is a sophisticated, asynchronous entity and relationship extraction component leveraging LLMs for understanding and summarization. It manages text chunk processing at scale, merges overlapping entity and relationship data intelligently, and produces refined graph nodes and edges ready for integration into graph-based knowledge systems. Its design balances concurrency, caching, and error resilience, making it a critical part of the InfiniFlow pipeline.