utils.py
Overview
utils.py is a utility module that supports the management, processing, caching, and manipulation of knowledge graphs and related data structures within a larger system for knowledge bases, entity extraction, and natural language processing (NLP). It provides:
Utilities for string cleaning, variable replacement, and data validation.
Cache management functions for LLM (Large Language Model) outputs and embeddings using Redis.
Functions to build, merge, tidy, and rebuild graphs using NetworkX.
Helpers for converting graph nodes and edges into chunks suitable for downstream processing and storage.
Async functions that interact with external services for search, embedding generation, and document storage.
Support for concurrency control using Trio's CapacityLimiter.
Miscellaneous helper functions related to sequence merging, tuple operations, and format conversions.
This module acts as a backbone for various graph-related workflows such as graph construction, update propagation, caching, and persistence in a distributed environment.
Detailed Descriptions
Constants and Globals
GRAPH_FIELD_SEP: str = ""
Separator string used when concatenating graph node/edge description fields.
chat_limiter: trio.CapacityLimiter
Limits concurrency of chat-related async operations to a maximum set by the environment variable MAX_CONCURRENT_CHATS (default 10).
ErrorHandlerFn
Type alias for an error handler callback function signature.
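The bounded-concurrency pattern behind chat_limiter can be sketched with the standard library alone. This is a minimal sketch, not the module's code: it swaps in asyncio.Semaphore for trio.CapacityLimiter, and fake_chat and run_chats are hypothetical stand-ins for real chat calls. Only the MAX_CONCURRENT_CHATS variable and its default of 10 come from the description above.

```python
import asyncio
import os

# Same environment variable and default as described for chat_limiter.
MAX_CONCURRENT_CHATS = int(os.environ.get("MAX_CONCURRENT_CHATS", "10"))


async def fake_chat(i: int, limiter: asyncio.Semaphore, stats: dict) -> int:
    """Hypothetical stand-in for an LLM chat call; records peak concurrency."""
    async with limiter:  # at most `limit` coroutines run this body at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulate network latency
        stats["active"] -= 1
        return i


async def run_chats(n: int, limit: int) -> tuple[list[int], int]:
    limiter = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_chat(i, limiter, stats) for i in range(n)))
    return list(results), stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_chats(25, MAX_CONCURRENT_CHATS))
    print(len(results), peak)
```

All 25 calls complete, but never more than `limit` of them are inside the guarded section at the same time.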
Data Classes
GraphChange
import dataclasses
from typing import Set, Tuple

@dataclasses.dataclass
class GraphChange:
    removed_nodes: Set[str] = dataclasses.field(default_factory=set)
    added_updated_nodes: Set[str] = dataclasses.field(default_factory=set)
    removed_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
    added_updated_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
Description:
Tracks changes between graph versions. Used to record nodes and edges that are added, updated, or removed, facilitating incremental updates.
Attributes:
removed_nodes: Set of node identifiers removed from the graph.
added_updated_nodes: Set of node identifiers added or updated.
removed_edges: Set of edge tuples (source, target) removed.
added_updated_edges: Set of edge tuples (source, target) added or updated.
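To illustrate how a GraphChange might be populated, the sketch below compares two snapshots of node and edge attributes. The helper diff_snapshots is hypothetical (the module is not documented to compute diffs this way), and edge keys are assumed to be already normalized to (min, max) order, as get_from_to produces.

```python
import dataclasses
from typing import Dict, Set, Tuple


@dataclasses.dataclass
class GraphChange:
    removed_nodes: Set[str] = dataclasses.field(default_factory=set)
    added_updated_nodes: Set[str] = dataclasses.field(default_factory=set)
    removed_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
    added_updated_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)


def diff_snapshots(
    old_nodes: Dict[str, dict],
    new_nodes: Dict[str, dict],
    old_edges: Dict[Tuple[str, str], dict],
    new_edges: Dict[Tuple[str, str], dict],
) -> GraphChange:
    """Hypothetical helper: record what changed between two graph snapshots."""
    change = GraphChange()
    change.removed_nodes = set(old_nodes) - set(new_nodes)
    # A node counts as added/updated if it is new or its attributes differ.
    change.added_updated_nodes = {
        n for n, attrs in new_nodes.items() if old_nodes.get(n) != attrs
    }
    change.removed_edges = set(old_edges) - set(new_edges)
    change.added_updated_edges = {
        e for e, attrs in new_edges.items() if old_edges.get(e) != attrs
    }
    return change
```

default_factory gives each GraphChange its own fresh sets, so instances never share state.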
Functions
perform_variable_replacements(input: str, history: list[dict] | None = None, variables: dict | None = None) -> str
Replaces placeholders in the input string and optionally in a chat history log with variable values.
Parameters:
input: The input string containing placeholders like {var_name}.
history: Optional list of chat messages (dicts); messages with the system role also have replacements applied.
variables: A dictionary mapping variable names to their replacement string values.
Returns:
The input string with every {var_name} replaced by variables[var_name]. System messages in history are also mutated in place with the same replacements.
Usage Example:
text = "Hello, {user}!"
history = [{"role": "system", "content": "Welcome {user}!"}]
variables = {"user": "Alice"}  # avoid shadowing the built-in vars()
result = perform_variable_replacements(text, history, variables)
# result == "Hello, Alice!"
# history[0]["content"] == "Welcome Alice!"
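A minimal sketch of how perform_variable_replacements could be implemented, assuming plain {name} substitution via str.replace rather than str.format. It is consistent with the behavior described above but is not necessarily the module's exact code.

```python
from __future__ import annotations


def perform_variable_replacements(
    input: str,  # parameter name kept from the documented signature
    history: list[dict] | None = None,
    variables: dict | None = None,
) -> str:
    """Replace {name} placeholders in input and in system messages of history."""
    history = history if history is not None else []
    variables = variables if variables is not None else {}

    def replace_all(text: str) -> str:
        for key, value in variables.items():
            text = text.replace("{" + key + "}", str(value))
        return text

    # System messages are rewritten in place, mirroring the documented behavior.
    for message in history:
        if message.get("role") == "system":
            message["content"] = replace_all(message.get("content") or "")
    return replace_all(input)
```

Using str.replace means unknown placeholders and literal braces in prompts are left untouched, whereas str.format would raise KeyError on an unknown name.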
clean_str(input: Any) -> str
Cleans a string by unescaping HTML entities, trimming whitespace, and removing control characters.
Parameters:
input: Any input, usually a string.
Returns:
Cleaned string, or the original input unchanged if it is not a string.
Details:
Removes characters in the ranges \x00-\x1f and \x7f-\x9f, plus quotes.
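The cleaning steps above can be sketched with html.unescape and a single regular expression. This sketch assumes that "quotes" means the double-quote character; the module may treat other quote characters differently.

```python
import html
import re
from typing import Any

# Control characters U+0000-U+001F and U+007F-U+009F, plus the double quote
# (treating "quotes" as only the double quote is an assumption of this sketch).
_UNWANTED = re.compile(r'["\x00-\x1f\x7f-\x9f]')


def clean_str(input: Any) -> Any:
    """Trim whitespace, unescape HTML entities, then drop unwanted characters."""
    if not isinstance(input, str):
        return input  # non-strings pass through unchanged
    return _UNWANTED.sub("", html.unescape(input.strip()))
```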
dict_has_keys_with_types(data: dict, expected_fields: list[tuple[str, type]]) -> bool
Checks if a dictionary has keys with specified types.
Parameters:
data: The dictionary to check.
expected_fields: List of (key, type) tuples defining required keys and their expected data types.
Returns:
True if all keys exist and their values are instances of the specified types; otherwise, False.
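A short sketch of the key-and-type check, written to match the description above rather than copied from the module.

```python
from __future__ import annotations


def dict_has_keys_with_types(data: dict, expected_fields: list[tuple[str, type]]) -> bool:
    """True only if every key is present and its value has the expected type."""
    return all(
        key in data and isinstance(data[key], expected_type)
        for key, expected_type in expected_fields
    )
```

Because isinstance is used, subclasses pass: True satisfies an int check, since bool subclasses int.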
Caching Functions Using Redis
These functions generate cache keys by hashing concatenated inputs and use a Redis connection (REDIS_CONN) to get/set cached values.
get_llm_cache(llmnm, txt, history, genconf)
Get cached LLM generation output.
set_llm_cache(llmnm, txt, v, history, genconf)
Set cached LLM generation output with a 24-hour TTL.
get_embed_cache(llmnm, txt)
Get a cached embedding vector, parsed into a numpy array.
set_embed_cache(llmnm, txt, arr)
Cache an embedding vector with a 24-hour TTL (numpy arrays are serialized as JSON lists).
get_tags_from_cache(kb_ids)
Get cached tags related to knowledge base IDs.
set_tags_to_cache(kb_ids, tags)
Set the tags cache with a 10-minute TTL.
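The key-derivation and TTL behavior of these caching functions can be sketched without Redis. In this minimal sketch, TTLCache is a hypothetical in-memory stand-in for REDIS_CONN, hashlib.sha256 replaces xxhash, and get_embed_cache returns a plain list instead of a numpy array to keep the example dependency-free.

```python
import hashlib
import json
import time
from typing import Any, Optional


class TTLCache:
    """Hypothetical in-memory stand-in for REDIS_CONN (not the real client)."""

    def __init__(self) -> None:
        self._store: dict[str, tuple[float, str]] = {}

    def set(self, key: str, value: str, ex: int) -> None:
        self._store[key] = (time.monotonic() + ex, value)  # ex = TTL in seconds

    def get(self, key: str) -> Optional[str]:
        item = self._store.get(key)
        if item is None or item[0] < time.monotonic():
            self._store.pop(key, None)  # missing or expired
            return None
        return item[1]


def cache_key(*parts: Any) -> str:
    """Feed str() of each part into one hasher (sha256 swapped in for xxhash)."""
    hasher = hashlib.sha256()
    for part in parts:
        hasher.update(str(part).encode("utf-8"))
    return hasher.hexdigest()


CACHE = TTLCache()
DAY = 24 * 3600


def set_embed_cache(llmnm: str, txt: str, arr: list[float]) -> None:
    # Vectors are stored as JSON lists, mirroring the description above.
    CACHE.set(cache_key(llmnm, txt), json.dumps(list(arr)), ex=DAY)


def get_embed_cache(llmnm: str, txt: str) -> Optional[list[float]]:
    raw = CACHE.get(cache_key(llmnm, txt))
    return None if raw is None else json.loads(raw)
```

Feeding each part into the hasher in turn is equivalent to hashing their concatenation, so ("ab", "c") and ("a", "bc") map to the same key; inserting a separator between parts would rule that out.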
tidy_graph(graph: nx.Graph, callback: Callable | None, check_attribute: bool = True)
Cleans a graph by removing nodes and edges missing essential attributes (description and source_id).
Parameters:
graph: The NetworkX graph to tidy.
callback: Optional callback function for messaging/logging.
check_attribute: Whether to check for required attributes; defaults to True.
Behavior:
Removes nodes and edges missing required attributes, and adds an empty keywords list to any edge lacking that attribute. Calls callback with a summary message whenever removals occur.
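The tidying rules can be sketched with NetworkX as follows. The string-argument callback signature and the exact message wording are assumptions of this sketch.

```python
from typing import Callable, Optional

import networkx as nx

REQUIRED_ATTRS = ("description", "source_id")


def tidy_graph(
    graph: nx.Graph,
    callback: Optional[Callable[[str], None]],
    check_attribute: bool = True,
) -> None:
    """Sketch: purge items missing required attributes; default edge keywords."""

    def is_valid(attrs: dict) -> bool:
        return all(attr in attrs for attr in REQUIRED_ATTRS)

    if check_attribute:
        bad_nodes = [n for n, attrs in graph.nodes(data=True) if not is_valid(attrs)]
        graph.remove_nodes_from(bad_nodes)  # also drops their incident edges
        if bad_nodes and callback:
            callback(f"Purged {len(bad_nodes)} nodes missing essential attributes.")

    bad_edges = []
    for u, v, attrs in graph.edges(data=True):
        if check_attribute and not is_valid(attrs):
            bad_edges.append((u, v))
        attrs.setdefault("keywords", [])  # empty keywords list when missing
    graph.remove_edges_from(bad_edges)
    if bad_edges and callback:
        callback(f"Purged {len(bad_edges)} edges missing essential attributes.")
```

Removing nodes first means the edge pass only inspects edges between surviving nodes.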
get_from_to(node1: str, node2: str) -> Tuple[str, str]
Returns a sorted tuple (min(node1, node2), max(node1, node2)) to ensure consistent ordering of edge endpoints.
graph_merge(g1: nx.Graph, g2: nx.Graph, change: GraphChange) -> nx.Graph
Merges graph g2 into graph g1 in-place, updating change to record modifications.
Parameters:
g1: The target graph to merge into.
g2: The source graph to merge from.
change: A GraphChange instance that records added/updated nodes and edges.
Returns:
The merged graph g1.
Details:
Node descriptions are joined with GRAPH_FIELD_SEP, and source_id lists are concatenated.
Edge weights are summed; edge descriptions, keywords, and source_ids are concatenated.
Node degrees are recalculated into a rank attribute.
Graph-level source_id lists are merged.
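The merge rules listed above can be sketched with NetworkX. This is a sketch under stated assumptions: GRAPH_FIELD_SEP gets a placeholder value ("<SEP>"), missing edge weights and keywords default to 0 and [], and GraphChange and get_from_to are redefined locally so the block runs on its own.

```python
import dataclasses
from typing import Set, Tuple

import networkx as nx

GRAPH_FIELD_SEP = "<SEP>"  # placeholder value assumed for this sketch


@dataclasses.dataclass
class GraphChange:
    removed_nodes: Set[str] = dataclasses.field(default_factory=set)
    added_updated_nodes: Set[str] = dataclasses.field(default_factory=set)
    removed_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
    added_updated_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)


def get_from_to(node1: str, node2: str) -> Tuple[str, str]:
    return (min(node1, node2), max(node1, node2))


def graph_merge(g1: nx.Graph, g2: nx.Graph, change: GraphChange) -> nx.Graph:
    """Merge g2 into g1 in place and record every node/edge that g2 touched."""
    for name, attrs in g2.nodes(data=True):
        change.added_updated_nodes.add(name)
        if not g1.has_node(name):
            g1.add_node(name, **attrs)
            continue
        node = g1.nodes[name]
        node["description"] += GRAPH_FIELD_SEP + attrs["description"]
        node["source_id"] = node["source_id"] + attrs["source_id"]

    for u, v, attrs in g2.edges(data=True):
        change.added_updated_edges.add(get_from_to(u, v))
        edge = g1.get_edge_data(u, v)  # live attribute dict, or None
        if edge is None:
            g1.add_edge(u, v, **attrs)
            continue
        edge["weight"] = edge.get("weight", 0) + attrs.get("weight", 0)
        edge["description"] += GRAPH_FIELD_SEP + attrs["description"]
        edge["keywords"] = edge.get("keywords", []) + attrs.get("keywords", [])
        edge["source_id"] = edge["source_id"] + attrs["source_id"]

    # Node rank is the node's degree in the merged graph.
    for name, degree in g1.degree:
        g1.nodes[name]["rank"] = degree
    # Concatenate the graph-level source_id lists of both graphs.
    g1.graph["source_id"] = g1.graph.get("source_id", []) + g2.graph.get("source_id", [])
    return g1
```

Lists are combined with + rather than += so that source_id and keywords lists copied from g2 are never mutated through g1.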
compute_args_hash(*args) -> str
Computes an MD5 hash of the string representation of the input arguments tuple.
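A one-line sketch of the hashing described above; MD5 serves here as a fast fingerprint, not as a security measure.

```python
import hashlib


def compute_args_hash(*args) -> str:
    """MD5 hex digest of str(args), where args is the positional-arguments tuple."""
    return hashlib.md5(str(args).encode("utf-8")).hexdigest()
```

Because the key comes from the tuple's string form, compute_args_hash(1) and compute_args_hash("1") differ: str((1,)) is "(1,)" while str(("1",)) is "('1',)".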
handle_single_entity_extraction(record_attributes: list[str], chunk_key: str) -> dict | None
Parses a list representing an entity record and returns a dictionary describing the entity, or None if invalid.
Parameters:
record_attributes: List of strings representing entity attributes.
chunk_key: Source chunk identifier.
Returns:
Dictionary with keys entity_name, entity_type, description, and source_id.
Usage:
Extracts and cleans entity data from raw records for graph node creation.
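The parsing steps can be sketched as below. The record layout (a leading "entity" tag followed by name, type, and description), the upper-casing of names and types, and the _clean helper are all assumptions of this sketch; the module's actual record format is not documented here.

```python
from __future__ import annotations

import html
import re

_UNWANTED = re.compile(r'["\x00-\x1f\x7f-\x9f]')


def _clean(value: str) -> str:
    # Simplified clean_str: trim, unescape, drop control chars and double quotes.
    return _UNWANTED.sub("", html.unescape(value.strip()))


def handle_single_entity_extraction(record_attributes: list[str], chunk_key: str) -> dict | None:
    """Sketch: parse [tag, name, type, description] into a node dict."""
    # Assumed layout; shorter records or other tags are rejected.
    if len(record_attributes) < 4 or _clean(record_attributes[0]) != "entity":
        return None
    entity_name = _clean(record_attributes[1]).upper()  # upper-casing is assumed
    if not entity_name:
        return None
    return {
        "entity_name": entity_name,
        "entity_type": _clean(record_attributes[2]).upper(),
        "description": _clean(record_attributes[3]),
        "source_id": chunk_key,
    }
```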
handle_single_relationship_extraction(record_attributes: list[str], chunk_key: str) -> dict | None
Parses a list representing a relationship record and returns a dictionary describing the edge.
Parameters:
record_attributes: List of strings representing relationship attributes.
chunk_key: Source chunk identifier.
Returns:
Dictionary with keys src_id, tgt_id, weight, description, keywords, source_id, and metadata.
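A companion sketch for relationship records. The layout [tag, source, target, description, keywords, ..., weight], the default weight of 1.0 when the last field is not numeric, keeping keywords as a raw string, and the contents of metadata are all assumptions made for illustration.

```python
from __future__ import annotations

import re
import time

_FLOAT = re.compile(r"^[-+]?[0-9]*\.?[0-9]+$")


def _clean(value: str) -> str:
    # Simplified cleaner for this sketch: trim and drop double quotes.
    return value.strip().replace('"', "")


def handle_single_relationship_extraction(record_attributes: list[str], chunk_key: str) -> dict | None:
    """Sketch: parse a relationship record into an undirected edge dict."""
    # Assumed layout: [tag, source, target, description, keywords, ..., weight]
    if len(record_attributes) < 5 or _clean(record_attributes[0]) != "relationship":
        return None
    source = _clean(record_attributes[1]).upper()
    target = _clean(record_attributes[2]).upper()
    last = _clean(record_attributes[-1])
    weight = float(last) if _FLOAT.match(last) else 1.0  # default weight assumed
    src_id, tgt_id = sorted((source, target))  # same ordering as get_from_to
    return {
        "src_id": src_id,
        "tgt_id": tgt_id,
        "weight": weight,
        "description": _clean(record_attributes[3]),
        "keywords": _clean(record_attributes[4]),  # kept as a raw string here
        "source_id": chunk_key,
        "metadata": {"created_at": time.time()},  # hypothetical metadata content
    }
```

Sorting the endpoints means that A-to-B and B-to-A records land on the same undirected edge.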
pack_user_ass_to_openai_messages(*args: str) -> list[dict]
Converts alternating user and assistant message strings into a list of OpenAI-style message dicts.
Returns:
A list of dicts whose roles alternate between "user" and "assistant", starting with "user".
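The alternation can be expressed with enumerate and an index taken modulo 2; this sketch follows the documented behavior.

```python
def pack_user_ass_to_openai_messages(*args: str) -> list[dict]:
    """Even positions become "user" messages, odd positions "assistant"."""
    roles = ("user", "assistant")
    return [{"role": roles[i % 2], "content": text} for i, text in enumerate(args)]
```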
split_string_by_multi_markers(content: str, markers: list[str]) -> list[str]
Splits a string by multiple markers/strings.
Returns:
List of substrings trimmed of whitespace.
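A sketch of multi-marker splitting using a regex alternation of escaped markers. Ordering markers longest-first and dropping empty pieces are choices of this sketch.

```python
import re


def split_string_by_multi_markers(content: str, markers: list[str]) -> list[str]:
    """Split content on any of the literal markers, trimming each piece."""
    if not markers:
        return [content]
    # Longest markers first, so a marker that is a prefix of another
    # (e.g. "<|" vs "<|>") cannot win the alternation too early.
    ordered = sorted(markers, key=len, reverse=True)
    pattern = "|".join(re.escape(m) for m in ordered)
    pieces = (piece.strip() for piece in re.split(pattern, content))
    return [piece for piece in pieces if piece]  # drop empty pieces
```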
is_float_regex(value) -> bool
Regex-based check if a string represents a floating-point number.
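One plausible pattern for the regex check; the module's exact pattern is not given here, so treat it as an assumption.

```python
import re

# Optional sign, optional integer part, optional dot, at least one digit.
_FLOAT_RE = re.compile(r"[-+]?[0-9]*\.?[0-9]+")


def is_float_regex(value) -> bool:
    """True if value is a string that fully matches the float pattern."""
    return isinstance(value, str) and _FLOAT_RE.fullmatch(value) is not None
```

Unlike float(), this pattern rejects exponents such as "1e5", the strings "inf" and "nan", and surrounding whitespace.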
chunk_id(chunk) -> str
Generates a unique hash ID for a chunk based on its content and knowledge base ID.
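A minimal sketch of content-based chunk IDs. The field names content and kb_id are hypothetical, and hashlib.sha256 stands in for whatever hash the module actually uses.

```python
import hashlib


def chunk_id(chunk: dict) -> str:
    """Deterministic ID derived from a chunk's content plus its knowledge base ID."""
    # "content" and "kb_id" are hypothetical field names for this sketch.
    raw = chunk["content"] + chunk["kb_id"]
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

Including the knowledge base ID means identical text in two knowledge bases gets distinct IDs, while re-ingesting the same text into one knowledge base reproduces the same ID.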
Async Graph Chunking Helpers
graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks)
Creates a chunk dict representing a graph node and appends it to chunks. Retrieves a cached embedding or generates one asynchronously under concurrency limits.
graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, chunks)
Similar to graph_node_to_chunk, but for graph edges. The embedding is generated from a combined string of the source and target entity names plus the description.
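The get-or-compute embedding flow can be sketched with asyncio. Everything named here is hypothetical: FakeEmbedder, the dict cache standing in for Redis, asyncio.Semaphore and asyncio.wait_for in place of the module's Trio primitives, the chunk field names, and the "src->tgt: description" edge text format.

```python
import asyncio
from typing import Dict, List, Tuple


class FakeEmbedder:
    """Hypothetical model exposing llm_name and a blocking encode()."""

    llm_name = "fake-embedder"

    def __init__(self) -> None:
        self.calls = 0  # counts real (uncached) model invocations

    def encode(self, texts: List[str]) -> List[List[float]]:
        self.calls += 1
        return [[float(len(t)), float(sum(map(ord, t)) % 97)] for t in texts]


# In-memory stand-in for the Redis-backed embedding cache.
_EMBED_CACHE: Dict[Tuple[str, str], List[float]] = {}


async def embed_cached(model, text: str, limiter: asyncio.Semaphore, timeout: float = 30.0) -> List[float]:
    key = (model.llm_name, text)
    if key in _EMBED_CACHE:  # cache hit: skip the model entirely
        return _EMBED_CACHE[key]
    async with limiter:  # bound how many encode() calls run at once
        # Run the blocking encode() in a worker thread, with a timeout.
        vectors = await asyncio.wait_for(asyncio.to_thread(model.encode, [text]), timeout)
    _EMBED_CACHE[key] = vectors[0]
    return vectors[0]


async def node_to_chunk(kb_id: str, model, ent_name: str, meta: dict, chunks: list, limiter: asyncio.Semaphore) -> None:
    vector = await embed_cached(model, ent_name, limiter)
    chunks.append({"kb_id": kb_id, "entity": ent_name, "description": meta.get("description", ""), "vector": vector})


async def edge_to_chunk(kb_id: str, model, src: str, tgt: str, meta: dict, chunks: list, limiter: asyncio.Semaphore) -> None:
    # Edge text combines both entity names and the description (format assumed).
    text = f"{src}->{tgt}: {meta.get('description', '')}"
    vector = await embed_cached(model, text, limiter)
    chunks.append({"kb_id": kb_id, "from": src, "to": tgt, "vector": vector})
```

Two concurrent misses on the same key will both call the model; a per-key lock would deduplicate them at the cost of extra bookkeeping.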
Async Graph Storage and Retrieval
get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1)
Searches for relationships between the specified entities in a knowledge base.
does_graph_contains(tenant_id, kb_id, doc_id)
Checks whether a graph contains the given document ID as a source.
get_graph_doc_ids(tenant_id, kb_id) -> list[str]
Retrieves the document IDs associated with a graph in the knowledge base.
get_graph(tenant_id, kb_id, exclude_rebuild=None)
Retrieves a graph from storage, or rebuilds it if it is marked as removed.
set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback)
Stores the graph and related subgraphs in persistent storage, handling deletions and additions with concurrency and progress callbacks.
Miscellaneous Utilities
is_continuous_subsequence(subseq, seq)
Checks whether subseq is a continuous subsequence of seq.
merge_tuples(list1, list2)
Merges tuples from two lists by connecting matching end/start elements, avoiding cyclic overlaps.
get_entity_type2samples(idxnms, kb_ids: list)
Retrieves samples of entities, grouped by entity type, from the system's retrieval backend.
flat_uniq_list(arr, key)
Flattens a list of dicts by extracting the list or single value stored under key, and returns the unique values.
rebuild_graph(tenant_id, kb_id, exclude_rebuild=None)
Reconstructs a graph by composing all subgraphs stored in the knowledge base, optionally excluding certain sources.
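Simplified sketches of three of these helpers follow. merge_tuples here treats each tuple as a path and extends it with tuples from list2 that start at its last element, skipping extensions that would retrace an existing step in either direction; this is one reading of "avoiding cyclic overlaps", and the module's exact rules may differ. Keeping first-seen order in flat_uniq_list is also an assumption.

```python
from typing import Any, Dict, List, Sequence, Tuple


def is_continuous_subsequence(subseq: Sequence, seq: Sequence) -> bool:
    """True if subseq appears in seq as one contiguous run."""
    n = len(subseq)
    if n == 0:
        return True
    target = tuple(subseq)
    return any(tuple(seq[i:i + n]) == target for i in range(len(seq) - n + 1))


def merge_tuples(list1: List[Tuple], list2: List[Tuple]) -> List[Tuple]:
    """Extend each path in list1 with tuples from list2 that start at its end."""
    result = []
    for path in list1:
        last = path[-1]
        if last in path[:-1]:  # path already loops back on itself; keep as-is
            result.append(path)
            continue
        extended = False
        for step in list2:
            if step[0] != last:
                continue
            # Skip steps already present in the path, in either direction.
            if is_continuous_subsequence(step, path) or is_continuous_subsequence(step[::-1], path):
                continue
            result.append(path + tuple(step[1:]))
            extended = True
        if not extended:
            result.append(path)
    return result


def flat_uniq_list(arr: List[Dict[str, Any]], key: str) -> List[Any]:
    """Collect item[key] from each dict (flattening lists), then deduplicate."""
    values: List[Any] = []
    for item in arr:
        value = item[key]
        if isinstance(value, list):
            values.extend(value)
        else:
            values.append(value)
    return list(dict.fromkeys(values))  # unique values, first-seen order
```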
Important Implementation Details
Caching: Uses xxhash for fast hashing of inputs to generate cache keys for embeddings and LLM responses, which are stored in Redis with TTLs.
Concurrency Control: Uses trio.CapacityLimiter to limit concurrent embedding and chat operations, preventing resource saturation.
Graph Storage: Stores graphs and subgraphs as JSON node-link data in a document store, indexed by knowledge base ID and keywords.
Graph Merging: When merging graphs, node and edge metadata like descriptions and source IDs are concatenated or aggregated to preserve provenance.
Timeouts: Embedding and graph storage operations use configurable timeouts, with environment variable control.
Asynchronous Design: Heavy IO and computation tasks are offloaded to threads or run asynchronously with Trio, supporting efficient concurrency.
Interactions with Other System Components
settings: Provides configuration and access to external services such as the retrieval engine (settings.retrievaler) and the document storage connection (settings.docStoreConn).
rag modules: Uses rag_tokenizer for tokenization and search utilities for query construction.
api.utils: Depends on utility functions such as get_uuid for unique ID generation and decorators such as timeout.
Redis (REDIS_CONN): Used as a caching backend for embeddings, LLM outputs, and tags.
NetworkX (nx): Used extensively for graph data structures and operations.
Trio: Used for asynchronous concurrency management and cancellation.
Usage Examples
Merging Two Graphs
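from networkx import Graph
from utils import GraphChange, graph_merge  # import path assumed; adjust to where utils.py lives

g1 = Graph()
g1.add_node("A", description="Node A", source_id=["doc1"])
g2 = Graph()
g2.add_node("A", description="Additional info", source_id=["doc2"])
g2.add_node("B", description="Node B", source_id=["doc2"])
g2.add_edge("A", "B", weight=1.0, description="connects A and B", keywords=[], source_id=["doc2"])
change = GraphChange()
merged_graph = graph_merge(g1, g2, change)
# merged_graph is g1, which now also contains node B and edge A-B.
# Node A's description combines both inputs; its source_id becomes ["doc1", "doc2"].
# change.added_updated_nodes == {"A", "B"}; change.added_updated_edges == {("A", "B")}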
Cleaning a String
dirty_string = "Hello & welcome to \x01the system!"
cleaned = clean_str(dirty_string)
# cleaned == "Hello & welcome to the system!"
Using Caching for Embeddings
embedding_model = ...  # some embedding model with an llm_name attribute and an encode() method
text = "Sample text"
cached_embedding = get_embed_cache(embedding_model.llm_name, text)
if cached_embedding is None:
    embedding = embedding_model.encode([text])[0]
    set_embed_cache(embedding_model.llm_name, text, embedding)
Visual Diagram - Flowchart of Main Utility Functions and Their Relationships
flowchart TD
perform_variable_replacements --> clean_str
perform_variable_replacements --> dict_has_keys_with_types
clean_str -->|used by| handle_single_entity_extraction
clean_str -->|used by| handle_single_relationship_extraction
graph_merge --> tidy_graph
graph_merge --> get_from_to
get_llm_cache --> set_llm_cache
get_embed_cache --> set_embed_cache
graph_node_to_chunk --> get_embed_cache
graph_node_to_chunk --> set_embed_cache
graph_edge_to_chunk --> get_embed_cache
graph_edge_to_chunk --> set_embed_cache
set_graph --> graph_node_to_chunk
set_graph --> graph_edge_to_chunk
rebuild_graph --> graph_merge
merge_tuples --> is_continuous_subsequence
subgraph cache_ops [Cache Operations]
get_llm_cache
set_llm_cache
get_embed_cache
set_embed_cache
get_tags_from_cache
set_tags_to_cache
end
subgraph graph_ops [Graph Operations]
graph_merge
tidy_graph
rebuild_graph
set_graph
get_graph
end
subgraph data_extract [Data Extraction]
handle_single_entity_extraction
handle_single_relationship_extraction
end
subgraph util_checks [Utility Checks]
clean_str
dict_has_keys_with_types
is_float_regex
compute_args_hash
end
Summary
utils.py is a comprehensive utility library that supports the lifecycle of knowledge graphs and their associated data in a complex AI-driven knowledge system. It provides standardized methods for cleaning data, extracting and merging graph components, caching expensive computations, and asynchronously managing graph persistence and reconstruction. Its design emphasizes robustness, concurrency control, and integration with external services like Redis and Elasticsearch-like document stores.
This module is foundational for the overall system's efficiency and correctness in managing and querying knowledge graphs and their embeddings.