entity_resolution.py
Overview
The entity_resolution.py file is a core module of the InfiniFlow project responsible for entity resolution within graphs. It extends an Extractor base class to identify and merge nodes in a graph that refer to the same real-world entity. This is achieved by leveraging a language model (LLM) to evaluate candidate pairs of entities, decide whether they represent the same entity, and then merging the graph nodes accordingly.
The module supports asynchronous batch processing of candidate pairs with concurrency control, uses heuristics to pre-filter pairs likely to be duplicates, and updates the graph structure and node rankings (PageRank) as a result of the resolution.
Classes and Functions
Class: EntityResolutionResult
@dataclass
class EntityResolutionResult:
    graph: nx.Graph
    change: GraphChange
Purpose: Encapsulates the result of the entity resolution process.
Attributes:
graph (networkx.Graph): The updated graph after resolution and merging.
change (GraphChange): An object tracking changes made to the graph during merging.
Class: EntityResolution
class EntityResolution(Extractor):
Inheritance: Extends Extractor (from graphrag.general.extractor).
Purpose: Implements entity resolution logic on a graph using an LLM for decision-making.
Main Responsibilities:
Identify candidate entity pairs for resolution.
Query the LLM asynchronously to decide if pairs represent the same entity.
Merge nodes representing the same entity.
Update graph metrics (e.g., PageRank).
Constructor: __init__
def __init__(self, llm_invoker: CompletionLLM)
Parameters:
llm_invoker (CompletionLLM): An instance of a language model completion interface used to perform entity resolution queries.
Behavior: Initializes prompt templates and delimiter keys used for parsing the LLM's output.
Method: __call__
async def __call__(
    self,
    graph: nx.Graph,
    subgraph_nodes: set[str],
    prompt_variables: dict[str, Any] | None = None,
    callback: Callable | None = None,
) -> EntityResolutionResult
Purpose: Main entry point to perform entity resolution on a given graph.
Parameters:
graph (nx.Graph): The graph to resolve entities within.
subgraph_nodes (set[str]): Set of node identifiers to restrict the resolution scope.
prompt_variables (dict, optional): Variables for customizing LLM prompt templates.
callback (Callable, optional): Function to receive progress updates as text messages.
Returns:
EntityResolutionResult containing the updated graph and the changes applied.
Workflow:
Group entities into clusters by entity type.
Generate candidate pairs for resolution using heuristics (is_similarity method).
Asynchronously batch query the LLM to resolve candidate pairs (with concurrency limits).
Collect pairs confirmed as duplicates and build a connectivity graph.
Merge connected components of duplicate nodes asynchronously into single nodes.
Recompute PageRank scores for all nodes.
Return results.
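The first steps of the workflow above (grouping by type, pre-filtering pairs, and grouping confirmed duplicates) can be sketched in a few lines. This is a minimal illustration, not the module's code: the function names candidate_pairs and duplicate_groups are hypothetical, the similarity check is passed in as a plain callable, and a small union-find stands in for the connectivity graph that the module is described as building.

```python
from collections import defaultdict
from itertools import combinations


def candidate_pairs(node_types, is_similar):
    """Group node names by entity type, then keep only similar-looking pairs."""
    by_type = defaultdict(list)
    for name, entity_type in node_types.items():
        by_type[entity_type].append(name)
    return {
        entity_type: [
            (a, b) for a, b in combinations(sorted(names), 2) if is_similar(a, b)
        ]
        for entity_type, names in by_type.items()
    }


def duplicate_groups(confirmed_pairs):
    """Union-find over confirmed pairs; each returned set would become one node."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps lookups short
            x = parent[x]
        return x

    for a, b in confirmed_pairs:
        parent[find(a)] = find(b)

    groups = defaultdict(set)
    for node in parent:
        groups[find(node)].add(node)
    return list(groups.values())


# Toy data: node name -> entity type (illustrative values only).
node_types = {"Apple Inc": "organization", "Apple Inc.": "organization", "Paris": "geo"}
pairs = candidate_pairs(node_types, lambda a, b: a.rstrip(".") == b.rstrip("."))
groups = duplicate_groups([("a", "b"), ("b", "c"), ("x", "y")])
```

Pairs are only formed within one entity type, so the number of LLM questions grows with the size of each type cluster rather than with the whole graph.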
Method: _resolve_candidate
async def _resolve_candidate(
    self,
    candidate_resolution_i: tuple[str, list[tuple[str, str]]],
    resolution_result: set[str],
    resolution_result_lock: trio.Lock
)
Purpose: Interact with the LLM to resolve whether candidate pairs of a specific entity type represent the same entity.
Parameters:
candidate_resolution_i (tuple[str, list[tuple[str, str]]]): A tuple where the first element is the entity type string, and the second is a list of candidate pairs (nodeA, nodeB) to resolve.
resolution_result (set): Shared set to store pairs confirmed as duplicates.
resolution_result_lock (trio.Lock): Lock that serializes access to resolution_result across concurrent tasks.
Behavior:
Constructs a prompt describing the candidate pairs.
Sends the prompt to the LLM with concurrency and timeout controls.
Parses the LLM's response with _process_results.
Updates resolution_result with pairs confirmed as duplicates.
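The pattern described above (a bounded number of concurrent LLM calls, a per-call timeout, and a lock around the shared result set) can be sketched as follows. This is an assumption-laden illustration: it uses the standard library's asyncio as a stand-in for trio, fake_llm is a hypothetical placeholder for the real LLM call, and skipping a batch on timeout is this sketch's choice rather than documented behavior.

```python
import asyncio

MAX_CONCURRENT = 5     # the text mentions a limit of 5 concurrent tasks
TIMEOUT_SECONDS = 280  # the text mentions a 280-second timeout per query


async def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for the real LLM call; it answers 'yes' to everything."""
    await asyncio.sleep(0)
    return "yes"


async def resolve_batch(pairs, confirmed, lock, limiter):
    async with limiter:  # at most MAX_CONCURRENT batches query the LLM at once
        try:
            answer = await asyncio.wait_for(
                fake_llm(f"Same entity? {pairs}"), TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            return  # this sketch simply skips a batch that times out
    if answer == "yes":
        async with lock:  # guard the shared result set
            confirmed.update(pairs)


async def resolve_all(batches):
    confirmed = set()
    lock = asyncio.Lock()
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    await asyncio.gather(
        *(resolve_batch(batch, confirmed, lock, limiter) for batch in batches)
    )
    return confirmed


confirmed = asyncio.run(resolve_all([[("a", "b")], [("c", "d")]]))
```

In trio the same shape would use a nursery with trio.Semaphore, trio.Lock, and a timeout scope; the structure of the control flow stays the same.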
Method: _process_results
def _process_results(
    self,
    records_length: int,
    results: str,
    record_delimiter: str,
    entity_index_delimiter: str,
    resolution_result_delimiter: str
) -> list
Purpose: Parse the LLM response text to extract which candidate pairs are duplicates.
Parameters:
records_length (int): Number of candidate pairs sent for resolution.
results (str): Raw string response from the LLM.
record_delimiter (str): Delimiter string to split records.
entity_index_delimiter (str): Delimiter string to identify candidate pair indices.
resolution_result_delimiter (str): Delimiter string to identify resolution decision.
Returns: List of tuples (pair_index, "yes") for confirmed duplicate pairs.
Implementation Details:
Splits response by records.
Uses regular expressions to extract pair indices and yes/no decisions.
Filters and returns only positive ("yes") matches.
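A parser along the lines described above might look like the sketch below. The function name parse_resolution_output and the delimiter values in the example ("##", "<|>", "&&") are assumptions made for illustration; the real delimiters come from the prompt configuration, and the module's handling of malformed records may differ.

```python
import re


def parse_resolution_output(records_length, results, record_delimiter,
                            entity_index_delimiter, resolution_result_delimiter):
    """Return (pair_index, "yes") for every record the LLM marked as a duplicate."""
    index_pattern = re.compile(
        re.escape(entity_index_delimiter) + r"(\d+)" + re.escape(entity_index_delimiter)
    )
    decision_pattern = re.compile(
        re.escape(resolution_result_delimiter)
        + r"([a-zA-Z]+)"
        + re.escape(resolution_result_delimiter)
    )
    confirmed = []
    for record in results.split(record_delimiter):
        index_match = index_pattern.search(record)
        decision_match = decision_pattern.search(record)
        if not index_match or not decision_match:
            continue  # ignore records that do not follow the expected layout
        pair_index = int(index_match.group(1))
        if not 1 <= pair_index <= records_length:
            continue  # ignore indices that do not map to a submitted pair
        if decision_match.group(1).lower() == "yes":
            confirmed.append((pair_index, "yes"))
    return confirmed


# Hypothetical LLM output using assumed delimiters.
raw = (
    "Question <|>1<|>: same entity? &&yes&&##"
    "Question <|>2<|>: same entity? &&no&&##"
    "Question <|>3<|>: same entity? &&Yes&&"
)
parsed = parse_resolution_output(3, raw, "##", "<|>", "&&")
```

Escaping the delimiters with re.escape matters because delimiters such as "<|>" contain characters that are regex operators.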
Method: _has_digit_in_2gram_diff
def _has_digit_in_2gram_diff(self, a: str, b: str) -> bool
Purpose: Heuristic to quickly reject pairs that differ in 2-grams containing digits.
Parameters:
a, b (str): Two entity names.
Returns:
True if any 2-gram difference contains a digit, else False.
Usage: Used to exclude pairs that are likely different based on numeric differences.
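A minimal sketch of this heuristic is shown below. It assumes that "2-gram difference" means the symmetric difference of the two names' character 2-gram sets; the helper name two_grams is hypothetical.

```python
def two_grams(text):
    """Set of all adjacent character pairs in text."""
    return {text[i:i + 2] for i in range(len(text) - 1)}


def has_digit_in_2gram_diff(a, b):
    """True if any 2-gram found in only one of the names contains a digit."""
    differing = two_grams(a) ^ two_grams(b)  # symmetric difference (assumption)
    return any(ch.isdigit() for gram in differing for ch in gram)


version_clash = has_digit_in_2gram_diff("iPhone 12", "iPhone 13")
punctuation_only = has_digit_in_2gram_diff("Apple Inc", "Apple Inc.")
```

Names such as "iPhone 12" and "iPhone 13" differ only in the 2-grams "12" and "13", so the pair is rejected before any LLM call, while a trailing period does not trigger the check.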
Method: is_similarity
def is_similarity(self, a: str, b: str) -> bool
Purpose: Heuristic to decide if two entity names are similar enough to be considered candidate duplicates.
Parameters:
a, b (str): Two entity names.
Returns:
True if the two names are similar, else False.
Algorithm:
Reject immediately if digit-containing 2-gram differences exist.
If both strings are English, use an edit distance threshold (<= half the length of the shorter string).
Otherwise, compare character sets, requiring at least 80% overlap for longer strings or at least 2 common characters for short strings.
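The algorithm above can be sketched end to end with the standard library only. Several details here are assumptions, not taken from the module: looks_english is a hypothetical ASCII-only check, edit_distance is a plain Levenshtein implementation standing in for whatever library the module uses, "half the length" is taken as integer division, and the cutoff of 4 distinct characters separating "short" from "longer" strings is a guess.

```python
def has_digit_in_2gram_diff(a, b):
    grams = lambda s: {s[i:i + 2] for i in range(len(s) - 1)}
    return any(ch.isdigit() for gram in grams(a) ^ grams(b) for ch in gram)


def edit_distance(a, b):
    """Levenshtein distance computed row by row."""
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,               # deletion
                current[j - 1] + 1,            # insertion
                previous[j - 1] + (ca != cb),  # substitution or match
            ))
        previous = current
    return previous[-1]


def looks_english(text):
    # Hypothetical stand-in for the module's language check.
    return text.isascii()


def is_similar(a, b, short_cutoff=4):
    if has_digit_in_2gram_diff(a, b):
        return False
    if looks_english(a) and looks_english(b):
        return edit_distance(a, b) <= min(len(a), len(b)) // 2
    chars_a, chars_b = set(a), set(b)
    longest = max(len(chars_a), len(chars_b))
    common = len(chars_a & chars_b)
    if longest < short_cutoff:  # cutoff value is an assumption
        return common >= 2
    return common / longest >= 0.8


typo_pair = is_similar("Microsoft", "Microsft")
unrelated_pair = is_similar("Apple", "Google")
digit_pair = is_similar("Model 3", "Model S")
```

The heuristic is intentionally permissive: it only decides which pairs are worth sending to the LLM, and the LLM makes the final call.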
Important Implementation Details and Algorithms
Asynchronous Batch Processing: Uses the trio concurrency framework to batch entity pairs and resolve them concurrently, with a semaphore limiting execution to 5 concurrent tasks.
Timeouts: Each LLM query has a timeout of 280 seconds (configurable via environment variable).
Graph Merging: After resolution, connected components of confirmed duplicate edges are merged asynchronously into single nodes using _merge_graph_nodes (inherited or implemented elsewhere).
PageRank Update: After merging, PageRank values are recalculated and stored as node attributes to reflect the new graph structure.
Prompt Construction: Constructs natural language prompts for the LLM describing pairs to resolve, including domain-specific instructions.
Candidate Pair Filtering: Uses heuristics and entity types to reduce the number of pairs sent to the LLM, improving efficiency.
Result Parsing: Uses regex to parse structured LLM output based on configurable delimiters.
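To make the PageRank update mentioned in this list concrete, the sketch below recomputes scores on a toy graph and stores them per node. It is a dependency-free power iteration written for illustration, standing in for a library PageRank routine; the attribute name "pagerank" and the plain-dict graph representation are assumptions.

```python
def pagerank(adjacency, damping=0.85, max_iterations=100, tolerance=1e-10):
    """Power-iteration PageRank over an undirected adjacency mapping."""
    nodes = list(adjacency)
    count = len(nodes)
    if count == 0:
        return {}
    rank = {node: 1.0 / count for node in nodes}
    for _ in range(max_iterations):
        # Rank held by isolated nodes is spread evenly over all nodes.
        dangling = sum(rank[node] for node in nodes if not adjacency[node])
        base = (1.0 - damping) / count + damping * dangling / count
        updated = {node: base for node in nodes}
        for node in nodes:
            neighbours = adjacency[node]
            for neighbour in neighbours:
                updated[neighbour] += damping * rank[node] / len(neighbours)
        change = sum(abs(updated[node] - rank[node]) for node in nodes)
        rank = updated
        if change < tolerance:
            break
    return rank


# Toy graph after a merge: "Apple Inc" is linked to three other nodes.
adjacency = {
    "Apple Inc": {"iPhone", "Tim Cook", "Cupertino"},
    "iPhone": {"Apple Inc"},
    "Tim Cook": {"Apple Inc"},
    "Cupertino": {"Apple Inc"},
}
node_attributes = {node: {} for node in adjacency}
for node, score in pagerank(adjacency).items():
    node_attributes[node]["pagerank"] = score  # attribute name is an assumption
```

Merging duplicates concentrates edges on the surviving node, which is why scores computed before the merge no longer describe the graph.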
Interactions with Other System Components
LLM Interface: Uses CompletionLLM from rag.llm.chat_model as the language model backend for entity resolution decisions.
Graph Utilities: Operates on networkx.Graph instances, relying on node attributes like "entity_type".
Prompt Templates: Uses ENTITY_RESOLUTION_PROMPT from graphrag.entity_resolution_prompt to format LLM queries.
Utility Functions:
perform_variable_replacements from graphrag.utils to inject variables into prompts.
chat_limiter (likely a concurrency or rate limiter) to regulate LLM chat usage.
Base Extractor: Inherits from Extractor (graphrag.general.extractor), integrating with broader extraction workflows.
Usage Example
import networkx as nx
import trio
from rag.llm.chat_model import Base as CompletionLLM
from entity_resolution import EntityResolution
# Assume graph is an existing networkx Graph whose nodes carry 'entity_type' attributes
graph = nx.Graph()
# ... populate graph ...
# LLM invoker instance (implementation dependent)
llm = CompletionLLM(...)
# Create EntityResolution instance
entity_resolver = EntityResolution(llm)
# Restrict resolution to the nodes of interest (here, every node)
subgraph_nodes = set(graph.nodes)
# Define optional callback to track progress
def progress_callback(msg):
    print(msg)
# The resolver is awaited, so it must run inside an async function
async def resolve_entities():
    result = await entity_resolver(graph, subgraph_nodes, callback=progress_callback)
    resolved_graph = result.graph
    changes = result.change
    # Use resolved_graph and changes as needed
trio.run(resolve_entities)
Mermaid Class Diagram
classDiagram
class EntityResolutionResult {
+graph: nx.Graph
+change: GraphChange
}
class EntityResolution {
-_resolution_prompt: str
-_output_formatter_prompt: str
-_record_delimiter_key: str
-_entity_index_delimiter_key: str
-_resolution_result_delimiter_key: str
-_llm: CompletionLLM
+__init__(llm_invoker: CompletionLLM)
+__call__(graph: nx.Graph, subgraph_nodes: set[str], prompt_variables: dict | None, callback: Callable | None) async EntityResolutionResult
-_resolve_candidate(candidate_resolution_i: tuple, resolution_result: set, resolution_result_lock: trio.Lock) async
-_process_results(records_length: int, results: str, record_delimiter: str, entity_index_delimiter: str, resolution_result_delimiter: str) list
-_has_digit_in_2gram_diff(a: str, b: str) bool
+is_similarity(a: str, b: str) bool
}
Extractor <|-- EntityResolution
EntityResolution ..> EntityResolutionResult : returns
Summary
The entity_resolution.py module deduplicates and merges entities in graph data structures by combining LLM inference with name-similarity heuristics and asynchronous batch processing. It hides the details of prompting the LLM for entity comparison, bounds concurrency with a semaphore and per-query timeouts, and updates the graph structure and PageRank metadata to reflect the resolved entities. The module integrates tightly with InfiniFlow's graph processing and LLM infrastructure.