utils.py


Overview

utils.py is a utility module that supports the management, processing, caching, and manipulation of knowledge graphs and related data structures within a larger system for knowledge bases, entity extraction, and natural language processing (NLP).

The module supports graph-related workflows such as graph construction, update propagation, caching, and persistence in a distributed environment.


Detailed Descriptions

Constants and Globals


Data Classes

GraphChange

@dataclasses.dataclass
class GraphChange:
    removed_nodes: Set[str] = dataclasses.field(default_factory=set)
    added_updated_nodes: Set[str] = dataclasses.field(default_factory=set)
    removed_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
    added_updated_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)

Description:
Tracks changes between graph versions. Used to record nodes and edges that are added, updated, or removed, facilitating incremental updates.

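Attributes:

removed_nodes: names of nodes removed from the graph.
added_updated_nodes: names of nodes that were added or updated.
removed_edges: (node, node) pairs identifying edges removed from the graph.
added_updated_edges: (node, node) pairs identifying edges that were added or updated.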


Functions

perform_variable_replacements(input: str, history: list[dict] | None = None, variables: dict | None = None) -> str

Replaces placeholders in the input string and optionally in a chat history log with variable values.

text = "Hello, {user}!"
history = [{"role": "system", "content": "Welcome {user}!"}]
variables = {"user": "Alice"}  # named "variables" to avoid shadowing the built-in vars()
result = perform_variable_replacements(text, history, variables)
# result == "Hello, Alice!"
# history[0]["content"] == "Welcome Alice!"

clean_str(input: Any) -> str

Cleans a string by unescaping HTML entities, trimming whitespace, and removing control characters.
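
The three steps described above can be sketched with the standard library alone. This is a minimal illustration, not the module's actual code: the name clean_str_sketch, the exact control-character range, and the pass-through of non-string input are all assumptions.

```python
import html
import re
from typing import Any

# Assumed range: ASCII control characters plus the C1 control block.
_CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f-\x9f]")


def clean_str_sketch(value: Any) -> Any:
    """Trim whitespace, unescape HTML entities, and drop control characters."""
    if not isinstance(value, str):
        # Assumption: non-string input is returned unchanged.
        return value
    unescaped = html.unescape(value.strip())
    return _CONTROL_CHARS.sub("", unescaped)


print(clean_str_sketch("  Tom &amp; Jerry\x01 "))  # Tom & Jerry
```

Note that this range also strips newlines and tabs, which may or may not match the real function.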


dict_has_keys_with_types(data: dict, expected_fields: list[tuple[str, type]]) -> bool

Checks if a dictionary has keys with specified types.
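
A check of this kind could look roughly like the following. The function name dict_has_keys_with_types_sketch is hypothetical, and the use of isinstance (which also accepts subclasses) is an assumption about the intended semantics.

```python
def dict_has_keys_with_types_sketch(data: dict, expected_fields: list[tuple[str, type]]) -> bool:
    """Return True only if every expected key exists and holds the expected type."""
    for key, expected_type in expected_fields:
        if key not in data:
            return False
        if not isinstance(data[key], expected_type):
            return False
    return True


record = {"name": "Alice", "weight": 1.0}
print(dict_has_keys_with_types_sketch(record, [("name", str), ("weight", float)]))  # True
print(dict_has_keys_with_types_sketch(record, [("name", int)]))  # False
```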


Caching Functions Using Redis

These functions generate cache keys by hashing concatenated inputs and use a Redis connection (REDIS_CONN) to get/set cached values.
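
The key-generation and get/set pattern can be illustrated without a running Redis server. In the sketch below an in-memory class stands in for REDIS_CONN, SHA-256 stands in for whatever hash the module really uses, and every name (InMemoryStore, make_cache_key, the "embed" prefix, the *_sketch functions) is hypothetical.

```python
import hashlib
import json


class InMemoryStore:
    """Hypothetical stand-in for REDIS_CONN exposing only get and set."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


STORE = InMemoryStore()


def make_cache_key(*parts: str) -> str:
    """Hash the concatenated inputs into a fixed-length key."""
    digest = hashlib.sha256()
    for part in parts:
        # Plain concatenation, as described above; no separator is added.
        digest.update(part.encode("utf-8"))
    return digest.hexdigest()


def get_embed_cache_sketch(model_name: str, text: str):
    raw = STORE.get(make_cache_key("embed", model_name, text))
    return None if raw is None else json.loads(raw)


def set_embed_cache_sketch(model_name: str, text: str, embedding) -> None:
    # Values are serialized to JSON because a key-value store holds strings or bytes.
    STORE.set(make_cache_key("embed", model_name, text), json.dumps(list(embedding)))


set_embed_cache_sketch("demo-model", "Sample text", [0.1, 0.2])
print(get_embed_cache_sketch("demo-model", "Sample text"))  # [0.1, 0.2]
```

Because the parts are concatenated without a separator, inputs such as ("ab", "c") and ("a", "bc") would hash to the same key; a real implementation may or may not guard against that.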


tidy_graph(graph: nx.Graph, callback: Callable | None, check_attribute: bool = True)

Cleans a graph by removing nodes and edges missing essential attributes (description and source_id).


get_from_to(node1: str, node2: str) -> Tuple[str, str]

Returns a sorted tuple (min(node1, node2), max(node1, node2)) to ensure consistent ordering of edge endpoints.
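
The practical effect is that both orientations of an undirected edge map to the same key. A minimal sketch (the name get_from_to_sketch and the edge_weights dictionary are illustrative only):

```python
def get_from_to_sketch(node1: str, node2: str) -> tuple[str, str]:
    """Return the two endpoints in sorted order."""
    return (node1, node2) if node1 < node2 else (node2, node1)


# Both orientations of the edge resolve to the same dictionary key.
edge_weights: dict[tuple[str, str], float] = {}
edge_weights[get_from_to_sketch("B", "A")] = 1.0
edge_weights[get_from_to_sketch("A", "B")] += 0.5
print(edge_weights)  # {('A', 'B'): 1.5}
```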


graph_merge(g1: nx.Graph, g2: nx.Graph, change: GraphChange) -> nx.Graph

Merges graph g2 into graph g1 in place, updating change to record the modifications.
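
To show how an in-place merge and change tracking fit together, here is a dependency-free sketch that uses plain dictionaries in place of nx.Graph and handles nodes only. The merge rules (joining descriptions with a newline, concatenating source_id lists) and the name merge_nodes_sketch are assumptions for illustration, not the module's confirmed behavior.

```python
import dataclasses
from typing import Dict, Set, Tuple


@dataclasses.dataclass
class GraphChange:
    removed_nodes: Set[str] = dataclasses.field(default_factory=set)
    added_updated_nodes: Set[str] = dataclasses.field(default_factory=set)
    removed_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)
    added_updated_edges: Set[Tuple[str, str]] = dataclasses.field(default_factory=set)


def merge_nodes_sketch(nodes1: Dict[str, dict], nodes2: Dict[str, dict], change: GraphChange) -> Dict[str, dict]:
    """Merge nodes2 into nodes1 in place and record every node that was touched."""
    for name, attrs in nodes2.items():
        change.added_updated_nodes.add(name)
        if name not in nodes1:
            nodes1[name] = dict(attrs)  # new node: copy its attributes
            continue
        existing = nodes1[name]
        # Assumed merge rule: join descriptions, concatenate source ids.
        existing["description"] = existing["description"] + "\n" + attrs["description"]
        existing["source_id"] = existing["source_id"] + attrs["source_id"]
    return nodes1


nodes_a = {"A": {"description": "Node A", "source_id": ["doc1"]}}
nodes_b = {
    "A": {"description": "Additional info", "source_id": ["doc2"]},
    "B": {"description": "Node B", "source_id": ["doc2"]},
}
change = GraphChange()
merged = merge_nodes_sketch(nodes_a, nodes_b, change)
print(sorted(change.added_updated_nodes))  # ['A', 'B']
```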


compute_args_hash(*args) -> str

Computes an MD5 hash of the string representation of the tuple of input arguments.
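
One way to realize this with hashlib is shown below. The name compute_args_hash_sketch and the UTF-8 encoding step are assumptions; the real function may encode the string differently.

```python
import hashlib


def compute_args_hash_sketch(*args) -> str:
    """Hash str(args), i.e. the string form of the whole argument tuple."""
    return hashlib.md5(str(args).encode("utf-8")).hexdigest()


key_one = compute_args_hash_sketch("model-x", "prompt text", 3)
key_two = compute_args_hash_sketch("model-x", "prompt text", 4)
print(len(key_one), key_one != key_two)  # 32 True
```

Because the hash is taken over the string form of the tuple, argument order and types both affect the result: ("1",) and (1,) produce different digests.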


handle_single_entity_extraction(record_attributes: list[str], chunk_key: str) -> dict | None

Parses a list representing an entity record and returns a dictionary describing the entity, or None if invalid.


handle_single_relationship_extraction(record_attributes: list[str], chunk_key: str) -> dict | None

Parses a list representing a relationship record and returns a dictionary describing the edge, or None if the record is invalid.


pack_user_ass_to_openai_messages(*args: str) -> list[dict]

Converts alternating user and assistant message strings into a list of OpenAI-style message dicts.
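
The conversion can be sketched as follows, assuming the first string is a user message and roles alternate from there. The function name carries a _sketch suffix to mark it as illustrative.

```python
def pack_user_ass_to_openai_messages_sketch(*args: str) -> list[dict]:
    """Assign alternating roles to the given strings, starting with "user"."""
    roles = ("user", "assistant")
    return [{"role": roles[i % 2], "content": content} for i, content in enumerate(args)]


messages = pack_user_ass_to_openai_messages_sketch("Hi", "Hello! How can I help?", "Summarize this.")
for message in messages:
    print(message["role"], "->", message["content"])
```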


split_string_by_multi_markers(content: str, markers: list[str]) -> list[str]

Splits a string by multiple markers/strings.
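
A regex-based approach is one plausible way to split on several markers at once. In this sketch the markers are escaped so they are treated literally; stripping whitespace and dropping empty pieces are assumptions about the desired output, and the marker strings in the example are made up.

```python
import re


def split_string_by_multi_markers_sketch(content: str, markers: list[str]) -> list[str]:
    """Split content on any of the markers, returning non-empty trimmed pieces."""
    escaped = [re.escape(marker) for marker in markers if marker]
    if not escaped:
        return [content]
    pieces = re.split("|".join(escaped), content)
    return [piece.strip() for piece in pieces if piece.strip()]


print(split_string_by_multi_markers_sketch("alpha<|>beta ## gamma", ["<|>", "##"]))
# ['alpha', 'beta', 'gamma']
```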


is_float_regex(value) -> bool

Uses a regular expression to check whether a string represents a floating-point number.
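
As a rough illustration, a pattern like the one below accepts an optional sign, optional leading digits, an optional decimal point, and at least one trailing digit. The pattern itself is an assumption and the real function may accept a different set of inputs (this one rejects exponent notation such as "1e5").

```python
import re

# Assumed pattern: optional sign, optional integer part, optional dot, required digits.
_FLOAT_PATTERN = re.compile(r"[-+]?[0-9]*\.?[0-9]+")


def is_float_regex_sketch(value) -> bool:
    """Return True if str(value) matches the float pattern in full."""
    return bool(_FLOAT_PATTERN.fullmatch(str(value)))


for candidate in ["3.14", "-2", ".5", "abc", "1.2.3"]:
    print(candidate, is_float_regex_sketch(candidate))
```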


chunk_id(chunk) -> str

Generates a unique hash ID for a chunk based on its content and knowledge base ID.
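
The idea of deriving an ID from content plus knowledge base ID can be sketched like this. The dictionary keys "content" and "kb_id", the SHA-256 hash, and the name chunk_id_sketch are all assumptions; the actual field names and hash function are not given here.

```python
import hashlib


def chunk_id_sketch(chunk: dict) -> str:
    """Derive a deterministic ID from the chunk's content and knowledge base ID."""
    # Hypothetical field names; adjust to the real chunk schema.
    payload = (chunk["content"] + chunk["kb_id"]).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


same_text_kb1 = chunk_id_sketch({"content": "Node A description", "kb_id": "kb-1"})
same_text_kb2 = chunk_id_sketch({"content": "Node A description", "kb_id": "kb-2"})
print(same_text_kb1 != same_text_kb2)  # True
```

Including the knowledge base ID in the hash means identical text stored in two knowledge bases gets two distinct IDs, while re-processing the same chunk in the same knowledge base yields the same ID.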


Async Graph Chunking Helpers


Async Graph Storage and Retrieval


Miscellaneous Utilities


Important Implementation Details


Interactions with Other System Components


Usage Examples

Merging Two Graphs

from networkx import Graph
from utils import GraphChange, graph_merge  # adjust to the module's package path in your project

g1 = Graph()
g1.add_node("A", description="Node A", source_id=["doc1"])
g2 = Graph()
g2.add_node("A", description="Additional info", source_id=["doc2"])
g2.add_node("B", description="Node B", source_id=["doc2"])
g2.add_edge("A", "B", weight=1.0, description="connects A and B", keywords=[], source_id=["doc2"])

change = GraphChange()
merged_graph = graph_merge(g1, g2, change)

Cleaning a String

dirty_string = "Hello &amp; welcome to \x01the system!"  # HTML entity plus a control character
cleaned = clean_str(dirty_string)
# cleaned == "Hello & welcome to the system!"

Using Caching for Embeddings

embedding_model = ...  # some embedding model with llm_name attribute and encode() method
text = "Sample text"
embedding = get_embed_cache(embedding_model.llm_name, text)
if embedding is None:
    # Cache miss: compute the embedding once and store it for later calls
    embedding = embedding_model.encode([text])[0]
    set_embed_cache(embedding_model.llm_name, text, embedding)

Visual Diagram - Flowchart of Main Utility Functions and Their Relationships

flowchart TD
    perform_variable_replacements --> clean_str
    perform_variable_replacements --> dict_has_keys_with_types

    clean_str -->|used by| handle_single_entity_extraction
    clean_str -->|used by| handle_single_relationship_extraction

    graph_merge --> tidy_graph
    graph_merge --> get_from_to

    get_llm_cache --> set_llm_cache
    get_embed_cache --> set_embed_cache

    graph_node_to_chunk --> get_embed_cache
    graph_node_to_chunk --> set_embed_cache

    graph_edge_to_chunk --> get_embed_cache
    graph_edge_to_chunk --> set_embed_cache

    set_graph --> graph_node_to_chunk
    set_graph --> graph_edge_to_chunk

    rebuild_graph --> graph_merge

    merge_tuples --> is_continuous_subsequence

    subgraph cache_ops [Cache Operations]
        get_llm_cache
        set_llm_cache
        get_embed_cache
        set_embed_cache
        get_tags_from_cache
        set_tags_to_cache
    end

    subgraph graph_ops [Graph Operations]
        graph_merge
        tidy_graph
        rebuild_graph
        set_graph
        get_graph
    end

    subgraph data_extract [Data Extraction]
        handle_single_entity_extraction
        handle_single_relationship_extraction
    end

    subgraph util_checks [Utility Checks]
        clean_str
        dict_has_keys_with_types
        is_float_regex
        compute_args_hash
    end

Summary

utils.py supports the lifecycle of knowledge graphs and their associated data within the knowledge system. It provides methods for cleaning data, extracting and merging graph components, caching expensive computations, and asynchronously managing graph persistence and reconstruction. Its design emphasizes robustness, concurrency control, and integration with external services such as Redis and Elasticsearch-like document stores.

The rest of the system depends on this module to manage and query knowledge graphs and their embeddings efficiently and correctly.