infinity_conn.py


Overview

infinity_conn.py implements the document store connection adapter for the Infinity vector search engine within the InfiniFlow system. It covers database, table, and document CRUD operations, search with composite match expressions, and utility functions for handling Infinity-specific data formats and query conditions.

This file wraps Infinity's native Python client and manages connection pooling, schema migrations, query construction, and result post-processing, so that higher-level components such as RAG (retrieval-augmented generation) modules can use Infinity as their document store.


Classes and Functions

Utility Functions

field_keyword(field_name: str) -> bool

Checks if a given field name should be treated as a "keyword" field (usually string fields that are not lists).

equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None

Converts a dictionary condition into a SQL-like filter string compatible with Infinity, considering field types and special keyword fields.
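The conversion described above can be illustrated with a minimal sketch. It is not the module's implementation: the name condition_to_filter, the quoting rules, and the choice of IN for list values are assumptions made for illustration, and the sketch ignores the table_instance argument and the special handling of keyword fields.

```python
from typing import Optional


def _quote(value) -> str:
    """Render a Python value as a literal for the filter string (assumed rules)."""
    if isinstance(value, str):
        # Escape single quotes by doubling them, as in standard SQL.
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def condition_to_filter(condition: dict) -> Optional[str]:
    """Hypothetical helper: turn {field: value} into an AND-joined filter string."""
    clauses = []
    for field, value in condition.items():
        if isinstance(value, list):
            if not value:
                continue  # an empty list contributes no clause in this sketch
            items = ", ".join(_quote(v) for v in value)
            clauses.append(f"{field} IN ({items})")
        else:
            clauses.append(f"{field} = {_quote(value)}")
    # Return None when nothing constrains the query, mirroring the str | None signature.
    return " AND ".join(clauses) if clauses else None
```

Returning None for an empty condition lets the caller skip the filter step entirely instead of passing an empty string to the engine.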

concat_dataframes(df_list: list[pd.DataFrame], selectFields: list[str]) -> pd.DataFrame

Concatenates a list of pandas DataFrames, handling empty DataFrames and ensuring the columns match the expected schema.
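A rough sketch of this behavior follows, assuming that empty frames are skipped and that an all-empty input yields an empty frame with the requested columns. The name concat_frames is hypothetical, and the real function may treat schemas differently.

```python
import pandas as pd


def concat_frames(df_list, select_fields):
    """Hypothetical helper: concatenate non-empty frames, else return an empty schema."""
    non_empty = [df for df in df_list if not df.empty]
    if non_empty:
        # ignore_index gives the merged result a clean 0..n-1 row index.
        return pd.concat(non_empty, axis=0, ignore_index=True)
    # No rows anywhere: return an empty frame that still carries the expected columns.
    return pd.DataFrame(columns=list(select_fields))
```

Dropping empty frames before calling pd.concat keeps their placeholder dtypes from influencing the column types of the merged result.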


Class: InfinityConnection

A singleton class extending DocStoreConnection that provides the connection and operation layer on top of Infinity.
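How the singleton behavior is achieved is not described here. One common approach in Python is a class decorator that caches the first instance, sketched below. The decorator and the DemoConnection class are illustrative assumptions, not code from infinity_conn.py.

```python
def singleton(cls):
    """Class decorator: build the instance once, then keep returning it."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class DemoConnection:
    """Hypothetical stand-in for a connection class."""

    def __init__(self):
        # Stands in for an expensive resource such as a connection pool.
        self.pool = object()
```

With this pattern every call to DemoConnection() returns the same object, so the pool is created only once per process. The sketch is not thread-safe; a lock around the instance check would be needed if construction can race.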

Initialization: __init__(self)

Private Method: _migrate_db(self, inf_conn)


Database Operations

dbType(self) -> str

health(self) -> dict


Table Operations

createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)

deleteIdx(self, indexName: str, knowledgebaseId: str)

indexExist(self, indexName: str, knowledgebaseId: str) -> bool


CRUD Operations

search(self, selectFields, highlightFields, condition, matchExprs, orderBy, offset, limit, indexNames, knowledgebaseIds, aggFields=[], rank_feature=None) -> tuple[pd.DataFrame, int]

Performs search queries across multiple indexes and knowledge bases, combining filter conditions, match expressions (matchExprs), ordering (orderBy), and pagination (offset, limit).

get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None

Fetches a single document by its ID from specified indexes and knowledge bases.

insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]

Inserts multiple documents into the specified table. If the table doesn’t exist, it creates it automatically.

update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool

Updates documents matching a condition with new values.

delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int

Deletes documents matching a filter condition from a table.
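The contract of insert, update, and delete can be illustrated with a toy in-memory store. This is a sketch under stated assumptions, not the Infinity-backed implementation: the class InMemoryDocStore is hypothetical, conditions are reduced to equality and list membership, and returning the inserted IDs from insert is an assumption, since the meaning of the returned list[str] is not stated above.

```python
class InMemoryDocStore:
    """Toy stand-in for a document store; it does not talk to Infinity."""

    def __init__(self):
        self.tables = {}  # table name -> {document id -> document}

    @staticmethod
    def _matches(doc, condition):
        # A list value means "field is one of these"; anything else means equality.
        for field, expected in condition.items():
            if isinstance(expected, list):
                if doc.get(field) not in expected:
                    return False
            elif doc.get(field) != expected:
                return False
        return True

    def insert(self, documents, table_name):
        # setdefault creates the table on first use, mirroring auto-creation.
        table = self.tables.setdefault(table_name, {})
        for doc in documents:
            table[doc["id"]] = dict(doc)
        return [doc["id"] for doc in documents]  # assumption: return the ids

    def update(self, condition, new_value, table_name):
        for doc in self.tables.get(table_name, {}).values():
            if self._matches(doc, condition):
                doc.update(new_value)
        return True

    def delete(self, condition, table_name):
        table = self.tables.get(table_name, {})
        doomed = [doc_id for doc_id, doc in table.items() if self._matches(doc, condition)]
        for doc_id in doomed:
            del table[doc_id]
        return len(doomed)  # number of deleted documents
```

Collecting the matching IDs before deleting avoids mutating the dictionary while iterating over it, and deleting from a table that does not exist simply reports zero rows.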


Helper Functions for Search Result Processing

getTotal(self, res) -> int

getChunkIds(self, res) -> list[str]

getFields(self, res, fields: list[str]) -> dict[str, dict]

getHighlight(self, res, keywords: list[str], fieldnm: str) -> dict

getAggregation(self, res, fieldnm: str)


SQL Operations

sql(sql: str, fetch_size: int, format: str)


Important Implementation Details


Interactions with Other System Components


Visual Diagram

classDiagram
    class InfinityConnection {
        - dbName: str
        - connPool: ConnectionPool
        + __init__()
        - _migrate_db(inf_conn)
        + dbType() str
        + health() dict
        + createIdx(indexName, knowledgebaseId, vectorSize)
        + deleteIdx(indexName, knowledgebaseId)
        + indexExist(indexName, knowledgebaseId) bool
        + search(selectFields, highlightFields, condition, matchExprs, orderBy, offset, limit, indexNames, knowledgebaseIds, aggFields, rank_feature) tuple
        + get(chunkId, indexName, knowledgebaseIds) dict|None
        + insert(documents, indexName, knowledgebaseId) list
        + update(condition, newValue, indexName, knowledgebaseId) bool
        + delete(condition, indexName, knowledgebaseId) int
        + getTotal(res) int
        + getChunkIds(res) list
        + getFields(res, fields) dict
        + getHighlight(res, keywords, fieldnm) dict
        + getAggregation(res, fieldnm) list
        + sql(sql, fetch_size, format)
    }

    class MatchExpr
    class MatchTextExpr
    class MatchDenseExpr
    class FusionExpr
    class OrderByExpr

    InfinityConnection ..> ConnectionPool : uses
    InfinityConnection ..> infinity.common.ConflictType : uses
    InfinityConnection ..> MatchExpr : processes
    InfinityConnection ..> MatchTextExpr
    InfinityConnection ..> MatchDenseExpr
    InfinityConnection ..> FusionExpr
    InfinityConnection ..> OrderByExpr

    class UtilityFunctions {
        + field_keyword(field_name) bool
        + equivalent_condition_to_str(condition, table_instance) str|None
        + concat_dataframes(df_list, selectFields) pd.DataFrame
    }

Summary

The infinity_conn.py file is the abstraction layer between the InfiniFlow ecosystem and the Infinity vector search engine. It handles connection pooling, schema management, query construction, and result processing, which lets document retrieval and knowledge base systems use Infinity's vector and text search capabilities.

The module is designed to tolerate Infinity server status changes, to handle a range of data types and query expressions, and to run searches across multiple indexes and knowledge bases.