opensearch_conn.py


Overview

The opensearch_conn.py file provides the Python interface between the InfiniFlow project and an OpenSearch cluster used for document storage and retrieval. It defines OSConnection, a singleton class that implements the generic DocStoreConnection interface for OpenSearch. It covers cluster health checks, index management, CRUD (Create, Read, Update, Delete) operations, searches that combine full-text matching with k-NN vector similarity, and SQL query execution.

The file encapsulates OpenSearch-specific functionality. This includes connection setup with retry logic, health checks, index existence checks, query building (text, dense vector, and fusion queries), and helper methods for processing search results. It is the adapter through which the larger system uses OpenSearch as a document engine backend.


Classes and Functions

Class: OSConnection

Singleton: one shared instance serves every caller in the process.
Extends DocStoreConnection to provide OpenSearch-specific implementations.
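Singleton behaviour is often implemented with a class decorator that caches one instance per class. The sketch below shows that pattern. The decorator name `singleton`, its thread-safe double-checked locking, and the `DemoConnection` stand-in are assumptions for illustration. They are not necessarily the helper this module actually uses.

```python
import threading
from typing import Any, Callable, Dict, Type, TypeVar

T = TypeVar("T")


def singleton(cls: Type[T]) -> Callable[..., T]:
    """Hypothetical decorator: return the same instance of `cls` on every call."""
    instances: Dict[type, Any] = {}
    lock = threading.Lock()

    def get_instance(*args: Any, **kwargs: Any) -> T:
        # Double-checked locking: only the first caller constructs the object.
        if cls not in instances:
            with lock:
                if cls not in instances:
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class DemoConnection:
    """Stand-in for OSConnection; counts how often __init__ actually runs."""

    init_calls = 0

    def __init__(self) -> None:
        # type(self) is the real class; the module-level name is now a function.
        type(self).init_calls += 1
```

One side effect of this design is that, after decoration, the class name refers to a function. As a result, `isinstance(x, DemoConnection)` no longer works against that name.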

Initialization: __init__(self)
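The overview mentions connection setup with retry logic. The following is a minimal sketch of that pattern. It assumes a zero-argument factory that builds a client, plus a `ping()` method that returns True once the cluster answers. opensearch-py's `OpenSearch` client does expose `ping()`, so a factory such as `lambda: OpenSearch(hosts=[...])` would fit. The attempt count and delay are illustrative values, not the module's real settings.

```python
import logging
import time
from typing import Callable, Optional, Protocol


class Pingable(Protocol):
    """Anything with a ping() health probe, e.g. an opensearch-py client."""

    def ping(self) -> bool: ...


def connect_with_retry(
    factory: Callable[[], Pingable],
    attempts: int = 24,
    delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Pingable:
    """Build a client, retrying until ping() succeeds or attempts run out."""
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            client = factory()
            if client.ping():
                return client
            last_error = ConnectionError("cluster did not answer ping")
        except Exception as exc:  # e.g. connection refused while the node starts
            last_error = exc
        logging.warning("OpenSearch not ready (attempt %d/%d): %s", attempt, attempts, last_error)
        if attempt < attempts:
            sleep(delay_s)  # injectable so tests can skip the wait
    raise ConnectionError(f"could not reach OpenSearch after {attempts} attempts") from last_error
```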


Method: dbType(self) -> str


Method: health(self) -> dict
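`dbType()` reports which engine backs the store, and `health()` returns a dict describing cluster state. The sketch below shows one way the two can fit together. It assumes that health() starts from the cluster-health response, as returned by opensearch-py's `client.cluster.health()`, and tags it with the engine name. The `"type"` key and the `"opensearch"` string are assumptions.

```python
from typing import Any, Callable, Dict


def db_type() -> str:
    # Assumed identifier; callers can branch on it to pick engine-specific code paths.
    return "opensearch"


def health(fetch_cluster_health: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
    """Copy the cluster-health response and tag it with the engine type."""
    health_dict = dict(fetch_cluster_health())  # copy: leave the client's object untouched
    health_dict["type"] = db_type()
    return health_dict
```

A readiness probe would typically inspect the `status` field of the cluster-health response, which is `green`, `yellow`, or `red`.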


Index (Table) Operations

Method: createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)
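Because `createIdx` takes a `vectorSize`, the index mapping presumably needs a vector field of that dimension. The sketch below builds a request body for the OpenSearch k-NN plugin: `index.knn: true` plus a `knn_vector` field with a `dimension`. The field-name pattern `q_<size>_vec` and the other properties are assumptions for illustration. They are not taken from the module's actual mapping.

```python
from typing import Any, Dict


def build_index_body(vector_size: int) -> Dict[str, Any]:
    """Request body for indices.create with one dense-vector field (hypothetical layout)."""
    if vector_size <= 0:
        raise ValueError("vectorSize must be positive")
    vector_field = f"q_{vector_size}_vec"  # assumed naming convention
    return {
        "settings": {"index": {"knn": True}},  # enables k-NN search on this index
        "mappings": {
            "properties": {
                "kb_id": {"type": "keyword"},  # exact-match filter field
                "content": {"type": "text"},  # full-text field
                vector_field: {"type": "knn_vector", "dimension": vector_size},
            }
        },
    }
```

With opensearch-py, the body would be passed as `client.indices.create(index=indexName, body=build_index_body(vectorSize))`.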

Method: deleteIdx(self, indexName: str, knowledgebaseId: str)

Method: indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool
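`indexExist` answers a yes/no question and `deleteIdx` removes an index. Together they allow an idempotent delete, sketched below. The sketch assumes a client namespace with opensearch-py's `indices.exists(index=...)` and `indices.delete(index=...)` methods. The `FakeIndices` class is a hypothetical stand-in that exists only to make the example runnable.

```python
from typing import Any, Dict, Set


class FakeIndices:
    """Hypothetical stand-in for opensearch-py's client.indices namespace."""

    def __init__(self, names: Set[str]) -> None:
        self.names = set(names)

    def exists(self, index: str) -> bool:
        return index in self.names

    def delete(self, index: str) -> Dict[str, Any]:
        self.names.remove(index)
        return {"acknowledged": True}


def index_exist(indices: Any, index_name: str) -> bool:
    return bool(indices.exists(index=index_name))


def delete_idx(indices: Any, index_name: str) -> bool:
    """Delete the index if present; return True if something was deleted."""
    if not index_exist(indices, index_name):
        return False  # already gone: treat as success rather than an error
    indices.delete(index=index_name)
    return True
```

A check-then-delete leaves a small race window if another process deletes the index in between. The stricter alternative is to call delete directly and catch opensearch-py's `NotFoundError`.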


CRUD Operations

Method: search(...)

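from rag.utils.doc_store_conn import FusionExpr, MatchDenseExpr, MatchTextExpr, OrderByExpr
from rag.utils.opensearch_conn import OSConnection

os_conn = OSConnection()  # singleton: every call returns the same instance
order_by = OrderByExpr()
order_by.asc("page_num_int")  # sort ascending by page number

res = os_conn.search(
    selectFields=["title", "content"],
    highlightFields=["content"],
    condition={"kb_id": ["kb1"], "available_int": 1},
    matchExprs=[
        MatchTextExpr(fields=["content"], matching_text="open search", topn=100),
        MatchDenseExpr(...),  # arguments elided in the original
        FusionExpr(...),  # arguments elided in the original
    ],
    orderBy=order_by,
    offset=0,
    limit=10,
    indexNames="my_index",
    knowledgebaseIds=["kb1"],
)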
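Internally, the `condition` dict and the text match have to become OpenSearch query DSL, and `offset`/`limit` map naturally onto `from`/`size`. One plausible translation, sketched below with plain dicts, works as follows:

- list values become `terms` filters;
- scalar values become `term` filters;
- the text match becomes a `query_string` clause over the given fields.

The clauses OSConnection actually emits through opensearch-dsl's `Q` and `Search` may differ. `build_bool_query` is a hypothetical helper.

```python
from typing import Any, Dict, List


def build_bool_query(
    condition: Dict[str, Any],
    text_fields: List[str],
    text: str,
    offset: int = 0,
    limit: int = 10,
) -> Dict[str, Any]:
    """Translate equality conditions plus one text match into a paged bool query."""
    filters: List[Dict[str, Any]] = []
    for field, value in condition.items():
        if isinstance(value, list):
            filters.append({"terms": {field: value}})  # match any listed value
        else:
            filters.append({"term": {field: value}})  # exact match on one value
    must: List[Dict[str, Any]] = []
    if text:
        must.append({"query_string": {"fields": text_fields, "query": text}})
    # filter clauses restrict results without affecting relevance scores
    return {
        "query": {"bool": {"filter": filters, "must": must}},
        "from": offset,
        "size": limit,
    }
```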

Method: get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None
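`get` returns a single chunk or None. The sketch below shows the response handling under two assumptions. First, it works on the raw result of an OpenSearch GET-by-id call, which carries `found`, `_id`, and `_source`. Second, it treats a chunk whose `kb_id` falls outside `knowledgebaseIds` as missing. Both the `kb_id` field name and that filtering rule are assumptions.

```python
from typing import Any, Dict, List, Optional


def chunk_from_get_response(
    resp: Dict[str, Any], knowledgebase_ids: List[str]
) -> Optional[Dict[str, Any]]:
    if not resp.get("found"):
        return None  # no document with that id
    chunk = dict(resp["_source"])
    chunk["id"] = resp["_id"]  # expose the document id alongside the fields
    if knowledgebase_ids and chunk.get("kb_id") not in knowledgebase_ids:
        return None  # hypothetical rule: hide chunks from other knowledge bases
    return chunk
```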


Method: insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]
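`insert` writes a batch of documents and returns `list[str]`. A common way to do this is the `_bulk` API, which takes one action line plus one source line per document. The sketch makes three assumptions about this module:

- each document carries its id under `"id"`;
- `knowledgebaseId` is stored as a `kb_id` field;
- the returned strings are per-document error messages, with an empty list meaning success.

```python
from typing import Any, Dict, List, Optional


def build_bulk_actions(
    documents: List[Dict[str, Any]], index_name: str, knowledgebase_id: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Interleave action metadata and document bodies in _bulk order."""
    actions: List[Dict[str, Any]] = []
    for doc in documents:
        body = {k: v for k, v in doc.items() if k != "id"}
        if knowledgebase_id is not None:
            body["kb_id"] = knowledgebase_id  # assumed field name
        actions.append({"index": {"_index": index_name, "_id": doc["id"]}})
        actions.append(body)
    return actions


def bulk_errors(resp: Dict[str, Any]) -> List[str]:
    """Collect '<id>:<error>' strings from a _bulk response."""
    errors: List[str] = []
    for item in resp.get("items", []):
        for op_result in item.values():  # keyed by the action: index, create, ...
            if "error" in op_result:
                errors.append(f"{op_result.get('_id')}:{op_result['error']}")
    return errors
```

With opensearch-py, the action list could be sent as `client.bulk(body=actions)`. The client serializes a list of dicts as newline-delimited JSON.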


Method: update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool
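`update` applies `newValue` to every document matching `condition` and returns a bool. One way to do that server-side is `_update_by_query` with a Painless script, and the class diagram does list opensearch-dsl's `UpdateByQuery` as a dependency. The helper below only builds the request body. Two choices in it are assumptions: translating conditions into `term`/`terms` filters, and restricting field names to identifiers.

```python
import re
from typing import Any, Dict

_FIELD = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_update_by_query(condition: Dict[str, Any], new_value: Dict[str, Any]) -> Dict[str, Any]:
    filters = [
        {"terms": {k: v}} if isinstance(v, list) else {"term": {k: v}}
        for k, v in condition.items()
    ]
    lines = []
    for field in new_value:
        if not _FIELD.match(field):
            raise ValueError(f"unsafe field name: {field!r}")  # keep script injection out
        lines.append(f"ctx._source.{field} = params.{field};")
    return {
        "query": {"bool": {"filter": filters}},
        # values travel as params, so the script text is identical across values
        "script": {"source": " ".join(lines), "params": dict(new_value), "lang": "painless"},
    }
```

Passing values through `params` instead of inlining them into `source` lets OpenSearch reuse one compiled script for many different values.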


Method: delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int
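`delete` returns an int, which fits the `deleted` counter in a `_delete_by_query` response. The sketch below builds the body and reads that count. Translating list values to `terms` and scalars to `term` is an assumption, not confirmed by the source. The guard against an empty condition matters because an empty `bool` filter matches every document.

```python
from typing import Any, Dict


def build_delete_by_query(condition: Dict[str, Any]) -> Dict[str, Any]:
    if not condition:
        # refuse an empty condition: it would match (and delete) every document
        raise ValueError("empty condition")
    filters = [
        {"terms": {k: v}} if isinstance(v, list) else {"term": {k: v}}
        for k, v in condition.items()
    ]
    return {"query": {"bool": {"filter": filters}}}


def deleted_count(resp: Dict[str, Any]) -> int:
    return int(resp.get("deleted", 0))
```

With opensearch-py, the body would go to `client.delete_by_query(index=indexName, body=body)`.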


Helper Functions for Search Result Processing
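The class diagram lists the helpers `getTotal`, `getChunkIds`, `getFields`, and `getAggregation`. The sketch below shows what each might do against a raw search response. The `aggs_<field>` aggregation name is an assumption. `hits.total` is handled both as an object (`{"value": n}`) and as a bare integer, because its shape depends on request parameters.

```python
from typing import Any, Dict, List, Tuple


def get_total(res: Dict[str, Any]) -> int:
    total = res["hits"]["total"]
    return total["value"] if isinstance(total, dict) else int(total)


def get_chunk_ids(res: Dict[str, Any]) -> List[str]:
    return [hit["_id"] for hit in res["hits"]["hits"]]


def get_fields(res: Dict[str, Any], fields: List[str]) -> Dict[str, Dict[str, Any]]:
    """Map chunk id -> selected source fields (missing fields are skipped)."""
    out: Dict[str, Dict[str, Any]] = {}
    for hit in res["hits"]["hits"]:
        src = hit.get("_source", {})
        out[hit["_id"]] = {f: src[f] for f in fields if f in src}
    return out


def get_aggregation(res: Dict[str, Any], fieldnm: str) -> List[Tuple[Any, int]]:
    # terms-aggregation buckets become (value, document count) pairs
    buckets = res.get("aggregations", {}).get(f"aggs_{fieldnm}", {}).get("buckets", [])
    return [(b["key"], b["doc_count"]) for b in buckets]
```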


SQL Query Support

Method: sql(self, sql: str, fetch_size: int, format: str)
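OpenSearch serves SQL through its SQL plugin at `POST /_plugins/_sql`. The response format is a query parameter (for example `jdbc`, `json`, `csv`, or `raw`), and `fetch_size` goes in the request body to enable paging. The sketch below assembles such a request. Whether OSConnection.sql issues exactly this call is an assumption. The parameter `format` is renamed `fmt` here to avoid shadowing the Python built-in.

```python
from typing import Any, Dict, Tuple

SUPPORTED_FORMATS = {"jdbc", "json", "csv", "raw"}


def build_sql_request(
    sql: str, fetch_size: int, fmt: str
) -> Tuple[str, str, Dict[str, str], Dict[str, Any]]:
    """Return (method, path, query params, body) for the SQL plugin endpoint."""
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported format: {fmt}")
    body: Dict[str, Any] = {"query": sql.strip()}
    if fetch_size > 0:
        body["fetch_size"] = fetch_size  # page size for cursor-based paging
    return "POST", "/_plugins/_sql", {"format": fmt}, body
```

With opensearch-py, the tuple could be sent through `client.transport.perform_request(method, path, params=params, body=body)`.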


Important Implementation Details and Algorithms
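The overview mentions fusion queries that combine text and dense-vector matching. A widely used technique is weighted-sum fusion: min-max normalize each retriever's scores to [0, 1], then add them with per-retriever weights. The sketch below shows that technique only; it is not OSConnection's confirmed algorithm. The default weights and the function names are illustrative assumptions.

```python
from typing import Dict, List, Tuple


def min_max(scores: Dict[str, float]) -> Dict[str, float]:
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {k: 1.0 for k in scores}  # all equal: treat every hit as top-ranked
    return {k: (v - lo) / (hi - lo) for k, v in scores.items()}


def weighted_sum_fusion(
    text_scores: Dict[str, float],
    vector_scores: Dict[str, float],
    text_weight: float = 0.3,
    vector_weight: float = 0.7,
) -> List[Tuple[str, float]]:
    t, v = min_max(text_scores), min_max(vector_scores)
    fused: Dict[str, float] = {}
    for doc_id in set(t) | set(v):
        # a document missing from one retriever contributes 0 for that side
        fused[doc_id] = text_weight * t.get(doc_id, 0.0) + vector_weight * v.get(doc_id, 0.0)
    # highest score first; ties broken by id for a deterministic order
    return sorted(fused.items(), key=lambda kv: (-kv[1], kv[0]))
```

Normalizing first matters because raw BM25 scores and vector similarities live on different scales. Without it, the weights would not mean what they appear to mean.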


Interaction with Other Components
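Callers depend on the DocStoreConnection interface rather than on OpenSearch directly; this is the `OSConnection --|> DocStoreConnection` edge in the class diagram. The sketch below is a minimal illustration of that arrangement. `DocStore`, `DemoOSStore`, and `count_chunks` are hypothetical names, and the real interface declares many more abstract methods.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class DocStore(ABC):
    """Hypothetical, trimmed-down stand-in for DocStoreConnection."""

    @abstractmethod
    def dbType(self) -> str: ...

    @abstractmethod
    def getTotal(self, res: Dict[str, Any]) -> int: ...


class DemoOSStore(DocStore):
    def dbType(self) -> str:
        return "opensearch"

    def getTotal(self, res: Dict[str, Any]) -> int:
        total = res["hits"]["total"]
        return total["value"] if isinstance(total, dict) else int(total)


def count_chunks(store: DocStore, res: Dict[str, Any]) -> str:
    # Application code sees only the abstract interface, so backends can be swapped.
    return f"{store.dbType()}: {store.getTotal(res)} hits"
```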


Visual Diagram

classDiagram
    class OSConnection {
        +info: dict
        +os: OpenSearch
        +mapping: dict
        +__init__()
        +dbType() str
        +health() dict
        +createIdx(indexName: str, knowledgebaseId: str, vectorSize: int)
        +deleteIdx(indexName: str, knowledgebaseId: str)
        +indexExist(indexName: str, knowledgebaseId: str = None) bool
        +search(selectFields: list[str], highlightFields: list[str], condition: dict, matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int, indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None)
        +get(chunkId: str, indexName: str, knowledgebaseIds: list[str]) dict | None
        +insert(documents: list[dict], indexName: str, knowledgebaseId: str = None) list[str]
        +update(condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) bool
        +delete(condition: dict, indexName: str, knowledgebaseId: str) int
        +getTotal(res) int
        +getChunkIds(res) list[str]
        +getFields(res, fields: list[str]) dict[str, dict]
        +getHighlight(res, keywords: list[str], fieldnm: str) dict[str, str]
        +getAggregation(res, fieldnm: str) list[tuple]
        +sql(sql: str, fetch_size: int, format: str)
    }

    OSConnection --|> DocStoreConnection
    OSConnection ..> OpenSearch
    OSConnection ..> UpdateByQuery
    OSConnection ..> Q
    OSConnection ..> Search
    OSConnection ..> Index

Summary

opensearch_conn.py is a core backend module in the InfiniFlow project that manages all interactions with OpenSearch as a document engine. It wraps connection management, index lifecycle operations, search with text and vector matching, and document CRUD, and it provides utility functions for interpreting results. Retry logic and error handling let it tolerate transient failures when communicating with the OpenSearch cluster. The module translates the application’s high-level document storage abstractions into OpenSearch DSL queries and API calls.