opensearch_conn.py
Overview
The opensearch_conn.py file provides a comprehensive Python interface for interacting with an OpenSearch cluster, specifically tailored for document storage and retrieval in the InfiniFlow project. It implements a singleton class OSConnection that extends the generic DocStoreConnection interface to perform database operations, index management, CRUD (Create, Read, Update, Delete) operations, complex searches including KNN vector similarity search, and SQL-like query execution on OpenSearch.
This file encapsulates OpenSearch-specific functionality, including connection setup with retry logic, health checks, index existence verification, advanced query building (text, dense vector, and fusion queries), and helper methods for processing search results. It is a critical component for integrating OpenSearch as a document engine backend in the larger system.
Classes and Functions
Class: OSConnection
A singleton (only one instance is created) that extends DocStoreConnection to provide OpenSearch-specific implementations.
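The singleton behavior can be illustrated with a class decorator. This is a minimal sketch that assumes the common dictionary-cache idiom; the actual decorator in rag.utils may differ (for example in how it keys instances or handles threads), and DemoConnection is a hypothetical stand-in for OSConnection.

```python
def singleton(cls):
    """Class decorator: the first call builds the instance, later calls reuse it."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class DemoConnection:
    """Hypothetical stand-in for OSConnection that counts constructor runs."""

    init_calls = 0

    def __init__(self):
        # type(self) is the real class; the module-level name now points at get_instance.
        type(self).init_calls += 1


first = DemoConnection()
second = DemoConnection()
print(first is second, type(first).init_calls)  # True 1
```

Because the expensive setup (connecting, health checks, loading the mapping) lives in __init__, caching the instance means it runs once no matter how many callers ask for a connection.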
Initialization: __init__(self)
Purpose: Establishes a connection to OpenSearch using settings from the rag.settings module. It verifies the cluster health and the OpenSearch version (must be >= 2), then loads the index mapping from a JSON configuration file.
Retries: Attempts the connection up to ATTEMPT_TIME (2) times, waiting between attempts when a connection fails.
Exceptions: Raises an exception if the cluster is unhealthy, the version is unsupported, or the index mapping file is missing.
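The connection steps above can be sketched as two small functions. This is a hedged sketch rather than the module's code: make_client is a hypothetical factory (in practice it would build an OpenSearch client from the rag.settings values), the 5-second delay is an assumed value, and the version check assumes the info() payload carries version.number, as the OpenSearch root endpoint returns.

```python
import time
from typing import Any, Callable

ATTEMPT_TIME = 2  # attempt count taken from the description above


def connect_with_retry(make_client: Callable[[], Any],
                       attempts: int = ATTEMPT_TIME,
                       delay_s: float = 5.0) -> Any:
    """Build a client and prove it can reach the cluster, pausing between failures."""
    last_error = None
    for _ in range(attempts):
        try:
            client = make_client()
            client.info()  # one cheap round trip: fails fast if the cluster is unreachable
            return client
        except Exception as exc:  # sketch only: real code should catch narrower errors
            last_error = exc
            time.sleep(delay_s)
    raise RuntimeError(f"cluster unreachable after {attempts} attempts") from last_error


def check_major_version(info: dict, minimum: int = 2) -> int:
    """Return the major version from an info() payload, rejecting anything below `minimum`."""
    major = int(info["version"]["number"].split(".")[0])
    if major < minimum:
        raise RuntimeError(f"OpenSearch version must be >= {minimum}, got {major}")
    return major
```

Keeping the factory injectable makes the retry loop testable without a live cluster.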
Method: dbType(self) -> str
Returns: "opensearch".
Purpose: Identifies the database type for this connection.
Method: health(self) -> dict
Returns: A dictionary of cluster health information, augmented with "type": "opensearch".
Purpose: Provides cluster health metrics.
Index (Table) Operations
Method: createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)
Parameters:
indexName: Name of the OpenSearch index.
knowledgebaseId: Knowledge base identifier.
vectorSize: Dimensionality of the vector embeddings (currently unused).
Returns: True if the index already exists; otherwise the result of the create call. If creation fails, the exception is logged rather than raised.
Purpose: Creates an index with the predefined mapping if it does not already exist.
Method: deleteIdx(self, indexName: str, knowledgebaseId: str)
Parameters: indexName and knowledgebaseId, as in createIdx.
Returns: None.
Purpose: Deletes the index only if knowledgebaseId is empty; otherwise skips deletion, because multiple knowledge bases share one index.
Exception Handling: Ignores NotFoundError.
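The deletion guard can be sketched as follows. This is an illustration only: NotFoundError here is a hypothetical local stand-in for the client's exception, and indices is any object with a delete(index=...) method, such as a fake used for testing.

```python
class NotFoundError(Exception):
    """Hypothetical stand-in for the client's not-found exception."""


def delete_index_if_unshared(indices, index_name: str, knowledgebase_id: str) -> bool:
    """Delete `index_name` only when no specific knowledge base is named.

    Returns True if a delete was attempted, False if it was skipped.
    """
    if knowledgebase_id:
        # The index is shared by several knowledge bases, so removing one KB
        # must not drop the whole index.
        return False
    try:
        indices.delete(index=index_name)
    except NotFoundError:
        pass  # an already-missing index counts as success
    return True
```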
Method: indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool
Parameters:
indexName: Index to check for existence.knowledgebaseId: Optional knowledge base ID.
Returns: Boolean indicating whether the index exists.
Implementation: Uses OpenSearch Index.exists() with retry on timeout or conflict errors.
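The retry-on-transient-error pattern used by indexExist can be sketched with an injected check function. This is a sketch under assumptions: it classifies errors by their message text ("Timeout" or "Conflict"), mirroring the description above, and falls back to False when every attempt fails; check stands in for a call like Index.exists().

```python
from typing import Callable

ATTEMPT_TIME = 2  # attempt count taken from the description above


def exists_with_retry(check: Callable[[], bool], attempts: int = ATTEMPT_TIME) -> bool:
    """Call `check`, retrying only when the error message mentions a timeout or conflict."""
    for _ in range(attempts):
        try:
            return check()
        except Exception as exc:
            message = str(exc)
            if "Timeout" in message or "Conflict" in message:
                continue  # transient error: try again
            break  # any other error: give up immediately
    return False
```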
CRUD Operations
Method: search(...)
Parameters:
selectFields (list of str): Fields to return in the results.
highlightFields (list of str): Fields to highlight in the results.
condition (dict): Filter conditions.
matchExprs (list of MatchExpr): Match expressions used to build the query.
orderBy (OrderByExpr): Sort order.
offset (int): Result offset.
limit (int): Number of results to return.
indexNames (str or list of str): Target indices.
knowledgebaseIds (list of str): Knowledge bases to filter by.
aggFields (list of str): Fields to aggregate on.
rank_feature (dict or None): Fields and boosts for rank_feature queries.
Returns: Raw OpenSearch search response dict.
Purpose: Constructs and executes a complex OpenSearch query supporting:
Boolean filters on conditions.
Text matching with query_string.
Dense vector KNN search for embedding similarity.
Fusion of text and vector match scores with weighted sum.
Highlighting of matched terms.
Sorting with appropriate unmapped type handling.
Aggregations on specified fields.
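The sorting and aggregation parts of the query can be sketched as plain dict builders. This sketch assumes a few conventions not stated above: a direction of 0 means ascending, fields ending in _int or _flt are numeric, and aggregations are named aggs_<field>; the size of 1000 is illustrative. unmapped_type lets a sort succeed on indices that lack the field instead of failing.

```python
def build_sort(fields: list[tuple[str, int]]) -> list[dict]:
    """Translate (field, direction) pairs into sort clauses with an unmapped_type fallback."""
    clauses = []
    for field, direction in fields:
        order = "asc" if direction == 0 else "desc"  # 0 = ascending (assumed convention)
        if field.endswith("_int") or field.endswith("_flt"):
            info = {"order": order, "unmapped_type": "float"}  # numeric by naming rule
        else:
            info = {"order": order, "unmapped_type": "keyword"}
        clauses.append({field: info})
    return clauses


def build_aggs(agg_fields: list[str], size: int = 1000) -> dict:
    """One terms aggregation per field, named aggs_<field> (naming assumed)."""
    return {f"aggs_{fld}": {"terms": {"field": fld, "size": size}} for fld in agg_fields}
```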
Implementation Details:
If matchExprs include a FusionExpr using a weighted sum, its weights are extracted to balance text similarity against vector similarity.
KNN search is constructed manually in the query DSL, because the OpenSearch Python client does not provide an encapsulated KNN method.
Retry logic on timeouts.
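The query-building steps can be sketched as plain dict construction. This is a hedged sketch, not the module's code: it expresses the weighted sum as boosts inside one bool query (1 - w on the text clause, w on the k-NN clause), which is one standard way to fuse the two scores; the real module may arrange the clauses differently. The "text,vector" weight-string format and the kb_id field name are assumptions.

```python
def build_filters(condition: dict, kb_ids: list[str]) -> list[dict]:
    """Turn a condition dict into bool-filter clauses: lists become terms, scalars term."""
    filters = [{"terms": {"kb_id": kb_ids}}]  # always scope the search to the given KBs
    for field, value in condition.items():
        if field == "kb_id" or value is None or value == "" or value == []:
            continue  # kb_id is already handled; empty values add no constraint
        if isinstance(value, list):
            filters.append({"terms": {field: value}})
        elif isinstance(value, (str, int)):
            filters.append({"term": {field: value}})
        else:
            raise TypeError(f"unsupported value for {field!r}: {value!r}")
    return filters


def parse_vector_weight(weights: str, default: float = 0.5) -> float:
    """Read the vector share from a "text,vector" weight string such as "0.3,0.7"."""
    parts = weights.split(",")
    return float(parts[1]) if len(parts) == 2 else default


def build_hybrid_query(text: str, text_fields: list[str], vector_column: str,
                       vector: list[float], k: int, filters: list[dict],
                       vec_weight: float) -> dict:
    """Weighted-sum fusion via boosts: bool adds up the scores of matching should clauses."""
    return {
        "bool": {
            # Restricts every hit; k-NN candidates are filtered after retrieval here.
            "filter": filters,
            "should": [
                {"query_string": {"query": text, "fields": text_fields,
                                  "boost": 1.0 - vec_weight}},
                {"knn": {vector_column: {"vector": vector, "k": k,
                                         "boost": vec_weight}}},
            ],
            "minimum_should_match": 1,  # require a text or vector hit, not just the filter
        }
    }
```

Because the bool-level filter runs after the approximate neighbors are found, a strict filter can leave fewer than k vector hits; pushing the filter into the knn clause itself (where the k-NN engine supports it) is the usual alternative.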
Usage Example:
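```python
# Import paths assume the RAGFlow source layout; adjust them to your checkout.
from rag.utils.opensearch_conn import OSConnection
from rag.utils.doc_store_conn import MatchTextExpr, MatchDenseExpr, FusionExpr, OrderByExpr

os_conn = OSConnection()  # singleton: repeated calls return the same instance
order_by = OrderByExpr()
order_by.asc("page_num_int")  # sort ascending by page number
res = os_conn.search(
    selectFields=["title", "content"],
    highlightFields=["content"],
    condition={"kb_id": ["kb1"], "available_int": 1},
    # topn=100 is illustrative; the MatchDenseExpr/FusionExpr arguments are elided
    matchExprs=[MatchTextExpr(fields=["content"], matching_text="open search", topn=100), MatchDenseExpr(...), FusionExpr(...)],
    orderBy=order_by,
    offset=0,
    limit=10,
    indexNames="my_index",
    knowledgebaseIds=["kb1"],
)
```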
Method: get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None
Parameters:
chunkId: Document ID.
indexName: Index to query.
knowledgebaseIds: List of knowledge base IDs (accepted but not used by this method).
Returns: The document's _source as a dict with an added "id" key, or None if the document is not found.
Purpose: Retrieves a single document by ID.
Retries: On timeout, up to ATTEMPT_TIME attempts.
Exceptions: Returns None, rather than raising, if the document is not found.
Method: insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]
Parameters:
documents: List of documents to insert. Each must have an "id" key, which becomes the document's _id.
indexName: Target index.
knowledgebaseId: Optional knowledge base ID.
Returns: List of error strings if any occurred during bulk insertion, empty list on success.
Purpose: Bulk inserts documents using OpenSearch Bulk API.
Implementation Detail: Deep-copies each document so the caller's data is not modified, and moves its "id" into the bulk action metadata as _id.
Retries: Retries on timeout after a short pause.
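The bulk request and its error collection can be sketched as follows. The Bulk API takes alternating action and source entries and reports per-item failures under items when errors is true; the function names and the "<id>:<error>" string format are assumptions for illustration.

```python
import copy


def build_bulk_operations(documents: list[dict], index_name: str) -> list[dict]:
    """Build the alternating action/source list expected by the Bulk API."""
    operations = []
    for doc in documents:
        if "id" not in doc:
            raise ValueError("every document needs an 'id' key")
        body = copy.deepcopy(doc)  # leave the caller's dict untouched
        doc_id = body.pop("id")  # the id travels in the action metadata, not the source
        operations.append({"index": {"_index": index_name, "_id": doc_id}})
        operations.append(body)
    return operations


def bulk_errors(bulk_response: dict) -> list[str]:
    """Collect "<id>:<error>" strings for failed items; empty when errors is false."""
    if not bulk_response.get("errors"):
        return []
    failures = []
    for item in bulk_response["items"]:
        for result in item.values():  # keyed by action: index, create, update, delete
            if "error" in result:
                failures.append(f"{result['_id']}:{result['error']}")
    return failures
```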
Method: update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool
Parameters:
condition: Filter conditions for the documents to update. Supports id for a single-document update, or boolean filters for multiple documents.
newValue: Fields and values to update; the special keys "add" and "remove" support array operations.
indexName: Target index.
knowledgebaseId: Knowledge base ID.
Returns: True if the update succeeded, else False.
Purpose: Updates matching documents, either directly (by id) or via UpdateByQuery with scripts.
Implementation Details:
Uses Painless scripts to set field values or remove fields.
Supports updating arrays by adding/removing elements.
Handles complex filtering including "exists" conditions.
Retries on timeout, connection failures, and conflict exceptions.
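How a newValue dict might be compiled into a Painless script can be sketched as below. The key conventions ("remove" with a field name or a {field: item} dict, "add" with a {field: item} dict) follow the description above, while the params prefixes rm_, add_, and set_ are hypothetical. Passing values as script params instead of splicing them into the source avoids quoting and injection problems.

```python
def build_update_script(new_value: dict) -> tuple[str, dict]:
    """Compile a newValue dict into (Painless source, params) for update-by-query.

    "remove": "field"          -> drop the field entirely
    "remove": {"field": item}  -> remove one element from an array field
    "add":    {"field": item}  -> append one element to an array field
    any other key              -> assign the value
    """
    lines, params = [], {}
    for key, value in new_value.items():
        if key == "remove" and isinstance(value, str):
            lines.append(f"ctx._source.remove('{value}');")
        elif key == "remove" and isinstance(value, dict):
            for field, item in value.items():
                params[f"rm_{field}"] = item
                # A per-field variable name avoids redeclaring the same local twice.
                lines.append(
                    f"int idx_{field} = ctx._source.{field}.indexOf(params.rm_{field}); "
                    f"if (idx_{field} >= 0) {{ ctx._source.{field}.remove(idx_{field}); }}"
                )
        elif key == "add" and isinstance(value, dict):
            for field, item in value.items():
                params[f"add_{field}"] = item
                lines.append(f"ctx._source.{field}.add(params.add_{field});")
        else:
            params[f"set_{key}"] = value
            lines.append(f"ctx._source.{key} = params.set_{key};")
    return " ".join(lines), params
```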
Method: delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int
Parameters:
condition: Delete conditions. Supports deletion by id (a single ID or a list) or by boolean-query filters.
indexName: Target index.
knowledgebaseId: Knowledge base ID.
Returns: Number of documents deleted.
Purpose: Deletes documents matching the condition using OpenSearch delete_by_query.
Implementation Details:
Supports "exists" and "must_not" filters.
Retries on timeout or connection errors.
Returns 0 if no documents are found.
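The condition-to-query translation for delete can be sketched as follows. The condition key names used here ("id", "exists", "must_not") are hypothetical spellings based on the description above; the output uses the standard ids, exists, term, and terms query types in the body shape that delete_by_query accepts.

```python
def build_delete_query(condition: dict) -> dict:
    """Translate a delete condition into a delete_by_query request body."""
    if "id" in condition:
        ids = condition["id"] if isinstance(condition["id"], list) else [condition["id"]]
        return {"query": {"ids": {"values": ids}}}
    must, must_not = [], []
    for key, value in condition.items():
        if key == "exists":
            must.append({"exists": {"field": value}})  # only docs that have the field
        elif key == "must_not" and isinstance(value, dict):
            for sub_key, sub_value in value.items():
                if sub_key == "exists":
                    must_not.append({"exists": {"field": sub_value}})  # docs lacking it
        elif isinstance(value, list):
            must.append({"terms": {key: value}})
        else:
            must.append({"term": {key: value}})
    return {"query": {"bool": {"must": must, "must_not": must_not}}}
```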
Helper Functions for Search Result Processing
getTotal(self, res) -> int
Returns the total hit count from a search response.
getChunkIds(self, res) -> list[str]
Extracts document IDs from the search hits.
getFields(self, res, fields: list[str]) -> dict[str, dict]
Returns a dictionary mapping document IDs to the requested fields and their values.
getHighlight(self, res, keywords: list[str], fieldnm: str) -> dict[str, str]
Extracts and formats highlighted snippets for matched keywords from the search results.
getAggregation(self, res, fieldnm: str) -> list[tuple]
Returns a list of (key, doc_count) tuples from the aggregation results.
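The result helpers can be sketched as plain functions over a raw search response. This sketch assumes the standard hits layout, where hits.total is either an int or a {"value": n, "relation": ...} object, and an aggregation naming scheme of aggs_<field>, which is an assumption.

```python
def get_total(res: dict) -> int:
    total = res["hits"]["total"]
    # Newer engines report {"value": n, "relation": "eq"}; older ones a bare int.
    return total["value"] if isinstance(total, dict) else total


def get_chunk_ids(res: dict) -> list[str]:
    return [hit["_id"] for hit in res["hits"]["hits"]]


def get_fields(res: dict, fields: list[str]) -> dict[str, dict]:
    out = {}
    for hit in res["hits"]["hits"]:
        source = hit["_source"]
        picked = {f: source[f] for f in fields if source.get(f) is not None}
        if picked:  # skip documents that have none of the requested fields
            out[hit["_id"]] = picked
    return out


def get_aggregation(res: dict, field: str) -> list[tuple]:
    buckets = res.get("aggregations", {}).get(f"aggs_{field}", {}).get("buckets", [])
    return [(b["key"], b["doc_count"]) for b in buckets]
```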
SQL Query Support
Method: sql(self, sql: str, fetch_size: int, format: str)
Parameters:
sql: SQL query string.
fetch_size: Number of rows to fetch.
format: Response format (e.g., JSON).
Returns: The raw SQL query response from OpenSearch, or None on failure.
Purpose: Executes SQL queries against OpenSearch, rewriting LIKE clauses on token fields into OpenSearch MATCH queries with fine-grained tokenization.
Implementation Details:
Performs regex replacements to translate SQL LIKE on token fields to OpenSearch MATCH syntax with specific operators.
Retries on connection timeouts.
Logs exceptions.
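The LIKE-to-MATCH rewrite can be sketched with a regular expression. Several details are assumptions: token fields are recognized by a _tks or _ltks suffix, tokenize is a hypothetical stand-in for the project's tokenizer, and the MATCH option string shown follows an Elasticsearch-SQL-style format that may need adjusting for the OpenSearch SQL plugin.

```python
import re
from typing import Callable

# Token fields are assumed to end in _tks or _ltks.
TOKEN_LIKE = re.compile(r" ([a-z_]+_l?tks)( like | ?= ?)'([^']+)'")


def rewrite_token_like(sql: str, tokenize: Callable[[str], str]) -> str:
    """Replace `<token_field> like '<text>'` with a full-text MATCH(...) call."""
    sql = re.sub(r"[ `]+", " ", sql).replace("%", "")  # normalize spacing, drop wildcards

    def to_match(m: re.Match) -> str:
        field, text = m.group(1), m.group(3)
        return f" MATCH({field}, '{tokenize(text)}', 'operator=OR;minimum_should_match=30%')"

    return TOKEN_LIKE.sub(to_match, sql)
```

Using a function as the replacement keeps the tokenized text out of re.sub's backslash handling.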
Important Implementation Details and Algorithms
Singleton pattern: Ensures only one connection instance is created.
Retry logic: Multiple methods retry on transient errors like timeouts or connection issues.
KNN vector similarity search: Implemented manually in the query DSL, since the OpenSearch Python client lacks a built-in KNN helper. Supports combining vector similarity with traditional text matching through weighted fusion expressions.
Painless scripting: Used in update operations to perform partial updates, add/remove elements in array fields, and complex field manipulations.
Highlight post-processing: Highlights keywords in English and non-English text differently to improve result readability.
SQL translation: Converts SQL LIKE clauses on token fields into OpenSearch MATCH queries over tokenized text, enhancing full-text search capabilities.
Index mapping loading: Loads JSON index mapping from a config file to ensure consistent schema.
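The highlight post-processing listed above can be sketched as two functions. This is a rough sketch under assumptions: the English branch wraps whole-word keyword hits in <em> tags and keeps only sentences with a hit, splitting on . ? ! ; and joining with "..."; the non-English branch returns the engine's own fragments. is_english is passed in as a flag here rather than detected, since the project's language-detection helper is not shown.

```python
import re


def highlight_english(text: str, keywords: list[str]) -> str:
    """Wrap whole-word keyword hits in <em> tags; keep only sentences that got a hit."""
    kept = []
    for sentence in re.split(r"[.?!;\n]", text):
        for word in keywords:
            sentence = re.sub(
                r"(^|[\s.,;:!?'\"()/-])(%s)(?=[\s.,;:!?'\"()/-]|$)" % re.escape(word),
                r"\1<em>\2</em>",
                sentence,
                flags=re.IGNORECASE,
            )
        if "<em>" in sentence:
            kept.append(sentence.strip())
    return "...".join(kept)


def post_process(fragments: list[str], source_text: str, keywords: list[str],
                 is_english: bool) -> str:
    """Use engine fragments for non-English text; re-highlight English text locally."""
    if not is_english:
        return "...".join(fragments)
    # Fall back to the engine's fragments if no whole-word hit survives.
    return highlight_english(source_text, keywords) or "...".join(fragments)
```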
Interaction with Other Components
Settings (rag.settings): Reads OpenSearch hosts, credentials, and field names (TAG_FLD, PAGERANK_FLD).
Utility modules (rag.utils, api.utils.file_utils): Uses the singleton decorator, base directory retrieval, and document store connection interfaces.
NLP utilities (rag.nlp): Uses language detection and tokenization for text processing and SQL translation.
OpenSearch Python client: Core dependency for all OpenSearch operations.
DocStoreConnection base class: Defines the interface contract for document store connections, ensuring compatibility within the system's storage abstraction.
Visual Diagram
```mermaid
classDiagram
class OSConnection {
+info: dict
+os: OpenSearch
+mapping: dict
+__init__()
+dbType() str
+health() dict
+createIdx(indexName: str, knowledgebaseId: str, vectorSize: int)
+deleteIdx(indexName: str, knowledgebaseId: str)
+indexExist(indexName: str, knowledgebaseId: str = None) bool
+search(selectFields: list[str], highlightFields: list[str], condition: dict, matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int, indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None) dict
+get(chunkId: str, indexName: str, knowledgebaseIds: list[str]) dict | None
+insert(documents: list[dict], indexName: str, knowledgebaseId: str = None) list[str]
+update(condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) bool
+delete(condition: dict, indexName: str, knowledgebaseId: str) int
+getTotal(res) int
+getChunkIds(res) list[str]
+getFields(res, fields: list[str]) dict[str, dict]
+getHighlight(res, keywords: list[str], fieldnm: str) dict[str, str]
+getAggregation(res, fieldnm: str) list[tuple]
+sql(sql: str, fetch_size: int, format: str) dict | None
}
OSConnection --|> DocStoreConnection
OSConnection ..> OpenSearch
OSConnection ..> UpdateByQuery
OSConnection ..> Q
OSConnection ..> Search
OSConnection ..> Index
```
Summary
opensearch_conn.py is a core backend module in the InfiniFlow project that manages all interactions with OpenSearch as a document engine. It wraps connection management, index lifecycle operations, advanced search capabilities including text and vector matching, and document CRUD, while providing utility functions to interpret results. Its robust retry mechanisms and error handling ensure resilient communication with the OpenSearch cluster. This file bridges the gap between the application’s high-level document storage abstractions and the specifics of OpenSearch DSL and APIs.