infinity_conn.py
Overview
infinity_conn.py provides a comprehensive interface to the Infinity vector search engine, acting as a document store connection adapter within the InfiniFlow system. It implements database, table, and document CRUD operations, search functionalities with complex match expressions, and utility methods to handle Infinity-specific data formats and query conditions.
This file wraps Infinity's native Python client and manages connection pooling, schema migrations, query construction, and result post-processing to integrate Infinity seamlessly with higher-level components such as RAG (retrieval-augmented generation) modules.
Classes and Functions
Utility Functions
field_keyword(field_name: str) -> bool
Checks if a given field name should be treated as a "keyword" field (usually string fields that are not lists).
Parameters:
field_name: Name of the field.
Returns:
True if the field is a keyword field, False otherwise.
Example Usage:
is_kwd = field_keyword("source_id")  # True
is_kwd = field_keyword("docnm_kwd")  # False
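A minimal sketch of how such a check could work, assuming keyword fields are recognised by a `_kwd` suffix, with `source_id` always included and `docnm_kwd` excluded so that the two results in the example hold. The suffix rule, the two name sets, and the function name `is_keyword_field` are assumptions made for illustration, not the module's confirmed logic.

```python
# Hypothetical re-implementation, for illustration only.
ALWAYS_KEYWORD = {"source_id"}   # treated as keyword fields regardless of suffix
NEVER_KEYWORD = {"docnm_kwd"}    # assumed to hold a single string, not a list


def is_keyword_field(field_name: str) -> bool:
    """Return True when the field should be handled as a keyword field."""
    if field_name in ALWAYS_KEYWORD:
        return True
    return field_name.endswith("_kwd") and field_name not in NEVER_KEYWORD


print(is_keyword_field("source_id"))
print(is_keyword_field("docnm_kwd"))
```

Keeping the exceptions in named sets makes it easy to see which fields deviate from the suffix rule.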
equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None
Converts a dictionary condition into a SQL-like filter string compatible with Infinity, considering field types and special keyword fields.
Parameters:
condition: A dictionary of field conditions (e.g., filters).
table_instance: (Optional) Table metadata to infer column types.
Returns:
A string representing the condition in Infinity's expected syntax.
Details:
Handles full-text filters for keyword fields.
Supports must_not and exists clauses.
Escapes string values appropriately.
Example Usage:
cond_str = equivalent_condition_to_str({"status": "active", "source_id": ["src1", "src2"]})
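The general idea of turning a condition dictionary into a filter string can be sketched as follows. This is a simplified stand-in, not the module's function: the name `condition_to_filter` is hypothetical, and it leaves out the full-text handling of keyword fields and the must_not and exists clauses. The exact syntax Infinity expects is not reproduced here.

```python
def condition_to_filter(condition: dict) -> str:
    """Build a SQL-like filter string from a flat dict of field conditions."""

    def quote(value) -> str:
        # Strings are single-quoted, with embedded single quotes doubled.
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    clauses = []
    for field, value in condition.items():
        if isinstance(value, list):
            if not value:
                continue  # an empty list adds no constraint
            clauses.append(f"{field} IN ({', '.join(quote(v) for v in value)})")
        else:
            clauses.append(f"{field}={quote(value)}")
    # Fall back to an always-true filter when nothing constrains the query.
    return " AND ".join(clauses) if clauses else "1=1"


print(condition_to_filter({"status": "active", "doc_id": ["d1", "d2"]}))
```

Doubling single quotes is the usual SQL-style escape; whether Infinity needs further escaping is not covered by this sketch.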
concat_dataframes(df_list: list[pd.DataFrame], selectFields: list[str]) -> pd.DataFrame
Concatenates a list of Pandas DataFrames, handling empty DataFrames and ensuring columns match expected schema.
Parameters:
df_list: List of DataFrames to concatenate.
selectFields: List of selected fields/columns.
Returns:
A single concatenated DataFrame with correct columns.
Example Usage:
combined_df = concat_dataframes([df1, df2], ["id", "score()", "content"])
Class: InfinityConnection
A singleton class extending DocStoreConnection to provide a robust connection and operation layer on top of Infinity.
Initialization: __init__(self)
Initializes connection pool to Infinity using settings.
Waits for Infinity to be healthy (up to 120 seconds).
Performs automatic schema migration based on infinity_mapping.json.
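The startup wait can be pictured as a polling loop with a deadline. The sketch below is an assumption about the general shape, not the module's code: `wait_until_healthy`, its parameters, and the 5-second polling interval are hypothetical, and the health probe is passed in as a plain callable instead of calling the Infinity client.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    timeout: float = 120.0,
    interval: float = 5.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `check` until it returns True or `timeout` seconds have elapsed."""
    deadline = clock() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # a failed probe is treated like an unhealthy response
        if clock() >= deadline:
            return False
        sleep(interval)


# Stand-in probe that reports healthy on the third attempt.
attempts = iter([False, False, True])
print(wait_until_healthy(lambda: next(attempts), timeout=1.0, interval=0.0))
```

Injecting `clock` and `sleep` keeps the loop testable without real waiting.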
Private Method: _migrate_db(self, inf_conn)
Ensures the database and tables conform to the latest schema.
Adds missing columns and creates necessary indexes (e.g., full-text indices).
Uses project configuration file conf/infinity_mapping.json.
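The "add missing columns" step amounts to comparing the columns a table already has with those listed in the mapping. The sketch below assumes a mapping layout of column name to type and default; that layout, the sample columns, and the helper `missing_columns` are illustrative assumptions. The calls that would actually add columns through the Infinity client are not shown.

```python
import json

# Hypothetical excerpt of a mapping file: column name -> type and default.
MAPPING_JSON = """
{
  "id": {"type": "varchar", "default": ""},
  "doc_id": {"type": "varchar", "default": ""},
  "page_num": {"type": "integer", "default": 0}
}
"""


def missing_columns(existing: set[str], mapping: dict) -> dict:
    """Return the mapping entries whose columns the table does not have yet."""
    return {name: spec for name, spec in mapping.items() if name not in existing}


mapping = json.loads(MAPPING_JSON)
to_add = missing_columns({"id", "doc_id"}, mapping)
print(to_add)
```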
Database Operations
dbType(self) -> str
Returns the database type string "infinity".
health(self) -> dict
Checks the health status of the Infinity server.
Returns a dictionary with type, status ("green" or "red"), and error messages if any.
Table Operations
createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)
Creates a new table in Infinity with the specified index and knowledge base ID.
Sets up vector fields and indexes (HNSW for vector search, full-text for text fields).
Loads schema from infinity_mapping.json.
deleteIdx(self, indexName: str, knowledgebaseId: str)
Drops a table corresponding to the given index and knowledge base ID.
indexExist(self, indexName: str, knowledgebaseId: str) -> bool
Checks if a specific index table exists in the database.
CRUD Operations
search(self, selectFields, highlightFields, condition, matchExprs, orderBy, offset, limit, indexNames, knowledgebaseIds, aggFields=[], rank_feature=None) -> tuple[pd.DataFrame, int]
Performs complex search queries across multiple indexes and knowledge bases with support for:
Text matching (MatchTextExpr)
Dense vector matching (MatchDenseExpr)
Fusion of multiple match expressions (FusionExpr)
Filtering, sorting, pagination, and aggregation fields.
Parameters:
selectFields: List of fields to retrieve.
highlightFields: Fields to highlight (currently not supported).
condition: Filter conditions as a dict.
matchExprs: List of match expressions (text/dense/fusion).
orderBy: Ordering expression.
offset: Pagination offset.
limit: Number of results to return.
indexNames: Index names (string or list).
knowledgebaseIds: List of knowledge base IDs.
aggFields: Fields to aggregate.
rank_feature: Optional ranking feature weights.
Returns:
Tuple of a Pandas DataFrame of results and total hit count.
Usage Example:
results, total = infinity_conn.search(
    selectFields=["id", "content"],
    highlightFields=["content"],
    condition={"status": "active"},
    matchExprs=[MatchTextExpr(fields=["content"], matching_text="AI")],
    orderBy=OrderByExpr(fields=[("score", 1)]),
    offset=0,
    limit=10,
    indexNames="myindex",
    knowledgebaseIds=["kb1"],
)
get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None
Fetches a single document by its ID from specified indexes and knowledge bases.
Returns:
Document as a dictionary, or None if not found.
insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]
Inserts multiple documents into the specified table. If the table doesn’t exist, it creates it automatically.
Handles special field serialization (e.g., keyword fields, embeddings).
Deletes existing documents with the same IDs before insertion.
Returns:
An empty list (currently a placeholder for a potential list of IDs).
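The delete-then-insert behaviour described above can be illustrated with an in-memory stand-in. `ToyTable` and `replace_documents` are hypothetical names and do not reflect the Infinity client API; the sketch only shows why removing rows with matching IDs first keeps re-inserted documents from being duplicated.

```python
class ToyTable:
    """In-memory stand-in for a table; not the Infinity client API."""

    def __init__(self) -> None:
        self.rows: list[dict] = []

    def delete_ids(self, ids: set[str]) -> int:
        before = len(self.rows)
        self.rows = [row for row in self.rows if row["id"] not in ids]
        return before - len(self.rows)

    def insert(self, docs: list[dict]) -> None:
        self.rows.extend(dict(doc) for doc in docs)


def replace_documents(table: ToyTable, docs: list[dict]) -> int:
    """Delete rows sharing an ID with `docs`, insert `docs`, return rows replaced."""
    replaced = table.delete_ids({doc["id"] for doc in docs})
    table.insert(docs)
    return replaced


table = ToyTable()
replace_documents(table, [{"id": "c1", "content": "old"}])
replaced = replace_documents(
    table, [{"id": "c1", "content": "new"}, {"id": "c2", "content": "x"}]
)
print(replaced, table.rows)
```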
update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool
Updates documents matching a condition with new values.
Supports complex field updates including removal of items from list fields.
Converts fields into proper formats for Infinity storage.
Returns:
True on success.
delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int
Deletes documents matching a filter condition from a table.
Returns:
Number of rows deleted.
Helper Functions for Search Result Processing
getTotal(self, res) -> int
Returns total hit count from search results (tuple or DataFrame).
getChunkIds(self, res) -> list[str]
Extracts document IDs from the search results.
getFields(self, res, fields: list[str]) -> dict[str, dict]
Converts search results DataFrame into a dictionary keyed by document ID.
Handles deserialization of keyword fields, JSON fields, and integer encoding fields.
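A rough sketch of this kind of post-processing, using plain dictionaries in place of DataFrame rows. The storage formats are assumptions made for illustration: keyword lists joined with `###`, JSON text in fields ending in `_json`, and integer arrays stored as `_`-joined hexadecimal words in fields ending in `_int`. The helper name `rows_to_fields` is hypothetical as well.

```python
import json

KEYWORD_SEPARATOR = "###"  # assumed delimiter for serialized keyword lists


def rows_to_fields(rows: list[dict], fields: list[str]) -> dict[str, dict]:
    """Index rows by ID, keeping only `fields` and decoding assumed formats."""
    result = {}
    for row in rows:
        doc = {}
        for name in fields:
            value = row.get(name)
            if value is None:
                continue
            if name.endswith("_kwd"):
                # Assumed: a keyword list is stored as one delimited string.
                doc[name] = [v for v in value.split(KEYWORD_SEPARATOR) if v]
            elif name.endswith("_json"):
                doc[name] = json.loads(value)
            elif name.endswith("_int"):
                # Assumed: an integer array is stored as "_"-joined hex words.
                doc[name] = [int(word, 16) for word in value.split("_") if word]
            else:
                doc[name] = value
        result[row["id"]] = doc
    return result


rows = [{"id": "c1", "content": "hello", "tag_kwd": "a###b", "pos_int": "0000000a_000000ff"}]
print(rows_to_fields(rows, ["content", "tag_kwd", "pos_int"]))
```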
getHighlight(self, res, keywords: list[str], fieldnm: str) -> dict
Generates simple HTML-highlighted snippets of matched keywords in text fields.
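Simple keyword highlighting of this kind can be done with a regular expression. The sketch below is an assumption about the approach, not the method's actual code: the function name `highlight`, the `<em>` tag, and the whole-word, case-insensitive matching are illustrative choices.

```python
import re


def highlight(text: str, keywords: list[str], tag: str = "em") -> str:
    """Wrap whole-word, case-insensitive keyword matches in an HTML tag."""
    terms = [re.escape(keyword) for keyword in keywords if keyword]
    if not terms:
        return text
    # Longest first, so a longer keyword wins over a shorter one it contains.
    terms.sort(key=len, reverse=True)
    pattern = re.compile(r"\b(" + "|".join(terms) + r")\b", re.IGNORECASE)
    return pattern.sub(lambda m: f"<{tag}>{m.group(0)}</{tag}>", text)


print(highlight("Vector search with Infinity", ["vector", "infinity"]))
```

Escaping each keyword with `re.escape` keeps characters such as `+` or `.` in a query from being interpreted as regex syntax.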
getAggregation(self, res, fieldnm: str)
Manually aggregates tag counts from result sets as Infinity does not support native aggregation.
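Client-side aggregation of this sort reduces to counting values across the returned rows. A minimal sketch, assuming each row stores its tags as a single `###`-delimited string; the delimiter, the function name `aggregate_tags`, and the sort order are assumptions, not confirmed details of the module.

```python
from collections import Counter

TAG_SEPARATOR = "###"  # assumed delimiter between tags in one row


def aggregate_tags(values: list[str]) -> list[tuple[str, int]]:
    """Count tags across rows; each row holds zero or more delimited tags."""
    counts = Counter()
    for value in values:
        counts.update(tag for tag in value.split(TAG_SEPARATOR) if tag)
    # Most frequent first; ties are broken alphabetically for a stable order.
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


print(aggregate_tags(["ai###search", "ai", "", "search###ai"]))
```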
SQL Operations
sql(sql: str, fetch_size: int, format: str)
Not implemented; raises NotImplementedError.
Important Implementation Details
Singleton Pattern: InfinityConnection is decorated with @singleton to ensure a single instance.
Connection Pooling: Uses infinity.connection_pool.ConnectionPool for managing multiple connections efficiently.
Schema Migration: Automatically updates table schemas by reading from a centralized JSON mapping file.
Complex Query Building: Supports multiple match expressions and fusion queries, combining text and vector search.
Field Serialization: Special handling for list fields, embedding vectors, and hexadecimal encoding for certain integer arrays.
Result Post-processing: Converts Infinity's raw results into user-friendly formats (lists, dicts, JSON-loaded fields).
Error Handling: Gracefully handles table not found errors, waiting for Infinity readiness, and logs warnings for unexpected issues.
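To make the singleton pattern from the list above concrete, here is one common way to write such a class decorator. How the project's own @singleton is implemented is not shown in this document, so this is only a sketch; the `Connection` class is a hypothetical placeholder, and the version below is not thread-safe.

```python
def singleton(cls):
    """Class decorator: the first call builds the instance, later calls reuse it."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Connection:
    """Hypothetical class standing in for an expensive-to-create connection."""

    def __init__(self) -> None:
        self.pool = object()  # placeholder for a shared resource


a, b = Connection(), Connection()
print(a is b)
```

Because every caller receives the same object, resources created in `__init__`, such as a connection pool, are created only once.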
Interactions with Other System Components
Integration with RAG Modules: Implements DocStoreConnection interface used by RAG components for document retrieval and storage.
Settings: Reads configurations from rag.settings and rag.settings.PAGERANK_FLD, TAG_FLD.
Utility Modules: Uses helper utilities from rag.utils, api.utils.file_utils, and rag.utils.doc_store_conn for connection management and query expression handling.
Infinity SDK: Wraps and extends functionality from the native Infinity Python client libraries (infinity, infinity.common, infinity.index, etc.).
Logging: Integrates with the ragflow.infinity_conn logger for detailed debug/info logs.
Visual Diagram
classDiagram
class InfinityConnection {
- dbName: str
- connPool: ConnectionPool
+ __init__()
+ _migrate_db(inf_conn)
+ dbType() str
+ health() dict
+ createIdx(indexName, knowledgebaseId, vectorSize)
+ deleteIdx(indexName, knowledgebaseId)
+ indexExist(indexName, knowledgebaseId) bool
+ search(selectFields, highlightFields, condition, matchExprs, orderBy, offset, limit, indexNames, knowledgebaseIds, aggFields, rank_feature) tuple
+ get(chunkId, indexName, knowledgebaseIds) dict|None
+ insert(documents, indexName, knowledgebaseId) list
+ update(condition, newValue, indexName, knowledgebaseId) bool
+ delete(condition, indexName, knowledgebaseId) int
+ getTotal(res) int
+ getChunkIds(res) list
+ getFields(res, fields) dict
+ getHighlight(res, keywords, fieldnm) dict
+ getAggregation(res, fieldnm) list
+ sql(sql, fetch_size, format)
}
class MatchExpr
class MatchTextExpr
class MatchDenseExpr
class FusionExpr
class OrderByExpr
InfinityConnection ..> ConnectionPool : uses
InfinityConnection ..> infinity.common.ConflictType : uses
InfinityConnection ..> MatchExpr : processes
InfinityConnection ..> MatchTextExpr
InfinityConnection ..> MatchDenseExpr
InfinityConnection ..> FusionExpr
InfinityConnection ..> OrderByExpr
class UtilityFunctions {
+ field_keyword(field_name) bool
+ equivalent_condition_to_str(condition, table_instance) str|None
+ concat_dataframes(df_list, selectFields) pd.DataFrame
}
Summary
infinity_conn.py is the adapter between InfiniFlow and the Infinity vector search engine. It handles connection pooling, schema migration, query construction, and result post-processing, so that document retrieval and knowledge base components can use Infinity's vector and text search through the DocStoreConnection interface.
The module waits for the Infinity server to become healthy at startup, tolerates missing tables, converts between application data types and Infinity's storage formats, and runs searches across multiple indexes and knowledge bases.