es_conn.py


Overview

es_conn.py provides a singleton-based connection and interface layer to Elasticsearch for the InfiniFlow system's document storage and search. It extends the abstract base class DocStoreConnection with Elasticsearch-specific implementations of index management, CRUD (Create, Read, Update, Delete) operations, search queries that combine full-text matching, dense-vector matching, and fusion of the two, and SQL queries executed through Elasticsearch's SQL interface.

The module handles Elasticsearch connectivity, query construction, retries on failure, and response parsing. It relies on the system's configuration and utility modules and exposes high-level APIs through which other components use the document store without depending on Elasticsearch details.


Classes and Methods

Class: ESConnection

Singleton subclass of DocStoreConnection that manages Elasticsearch operations.
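Because ESConnection is a singleton, every construction returns the same shared instance, so the process holds one Elasticsearch client. A minimal sketch of a decorator-based singleton, assuming that approach (the `singleton` decorator and `FakeConnection` class here are hypothetical stand-ins, not the module's own code):

```python
import threading
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def singleton(cls: type[T]) -> Callable[..., T]:
    """Hypothetical decorator: construct `cls` once, then return the cached instance."""
    instances: dict[type, Any] = {}
    lock = threading.Lock()

    def get_instance(*args: Any, **kwargs: Any) -> T:
        # Double-checked locking so concurrent first calls build only one instance.
        if cls not in instances:
            with lock:
                if cls not in instances:
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class FakeConnection:
    """Stand-in for ESConnection; counts how many times __init__ runs."""
    init_calls = 0

    def __init__(self) -> None:
        # type(self) is the real class; the name FakeConnection now refers to get_instance.
        type(self).init_calls += 1


a = FakeConnection()
b = FakeConnection()
```

A decorator like this replaces the class name with a factory function, so `isinstance(x, FakeConnection)` no longer works; a metaclass or `__new__`-based singleton avoids that at the cost of a little more machinery.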

Initialization: __init__(self)


Private Methods

_connect(self) -> bool
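_connect returns a bool, which fits a pattern where one connection attempt reports success and a caller retries on failure. The loop below sketches that pattern under assumptions: `connect_with_retry`, the attempt count, and the delay are hypothetical, and a stub replaces the real client (with elasticsearch-py, `Elasticsearch.ping()` returning True or False would be a natural probe).

```python
import logging
import time
from typing import Callable


def connect_with_retry(connect: Callable[[], bool], attempts: int = 3, delay_s: float = 0.01) -> bool:
    """Hypothetical helper: call `connect` until it returns True or attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            if connect():
                return True
            logging.warning("connection attempt %d/%d failed", attempt, attempts)
        except Exception as exc:  # a real client may raise connection errors instead
            logging.warning("connection attempt %d/%d raised: %s", attempt, attempts, exc)
        if attempt < attempts:
            time.sleep(delay_s)  # back off before the next attempt
    return False


# Stub that fails twice, then succeeds, standing in for a real ping().
outcomes = iter([False, False, True])
ok = connect_with_retry(lambda: next(outcomes), attempts=3)
```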


Public Methods

dbType(self) -> str

health(self) -> dict


Index/Table Operations

createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)

deleteIdx(self, indexName: str, knowledgebaseId: str)

indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool
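createIdx takes a vectorSize, and the class keeps a `mapping` dict, which suggests the index mapping reserves a dense-vector field of that dimension. A sketch of such a request body, assuming a vector field named `q_<dims>_vec` and cosine similarity (both assumptions; the module's own mapping may differ):

```python
import json


def build_index_body(vector_size: int, shards: int = 1) -> dict:
    """Hypothetical createIdx request body with a dense_vector field sized to vector_size."""
    if vector_size <= 0:
        raise ValueError("vector_size must be positive")
    vec_field = f"q_{vector_size}_vec"  # assumed naming convention
    return {
        "settings": {"number_of_shards": shards},
        "mappings": {
            "properties": {
                "kb_id": {"type": "keyword"},         # exact-match filtering by knowledge base
                "content": {"type": "text"},          # analyzed for full-text matching
                "available_int": {"type": "integer"},
                vec_field: {
                    "type": "dense_vector",
                    "dims": vector_size,
                    "index": True,                    # enable approximate kNN search
                    "similarity": "cosine",
                },
            }
        },
    }


body = build_index_body(768)
print(json.dumps(body["mappings"]["properties"]["q_768_vec"]))
```

With the 8.x Python client, a body like this could be sent as `es.indices.create(index=name, settings=body["settings"], mappings=body["mappings"])`.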


CRUD Operations

search(self, selectFields: list[str], highlightFields: list[str], condition: dict, matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int, indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None)

Usage example:

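# Import paths assume the RAGFlow source layout (rag/utils/...); adjust to your checkout.
from rag.utils.es_conn import ESConnection
from rag.utils.doc_store_conn import MatchTextExpr, OrderByExpr

es_conn = ESConnection()  # singleton: every call returns the same connection

res = es_conn.search(
    selectFields=["title", "content"],
    highlightFields=["content"],
    condition={"kb_id": ["kb1", "kb2"], "available_int": 1},
    matchExprs=[MatchTextExpr(fields=["content"], matching_text="example query", topn=10, extra_options={})],
    orderBy=OrderByExpr().asc("page_num_int"),  # ascending by page number
    offset=0,
    limit=10,
    indexNames="index1",
    knowledgebaseIds=["kb1", "kb2"]  # keep consistent with condition["kb_id"]
)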
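The condition argument is a field-to-value dict. One plausible translation into Elasticsearch bool-query filter clauses, with lists becoming `terms` and scalars becoming `term`, is sketched below. It is an illustration under that assumption, not the module's exact logic, which may special-case fields such as available_int.

```python
from typing import Any


def condition_to_bool_query(condition: dict[str, Any]) -> dict:
    """Hypothetical translation of a condition dict into an ES bool query with filters."""
    filters: list[dict] = []
    for field, value in condition.items():
        if value is None:
            continue  # skip unset conditions
        if isinstance(value, (list, tuple, set)):
            # Match any of several values, e.g. several knowledge base IDs.
            filters.append({"terms": {field: list(value)}})
        else:
            filters.append({"term": {field: value}})
    # Filter context: clauses restrict results but do not affect relevance scoring.
    return {"bool": {"filter": filters}}


query = condition_to_bool_query({"kb_id": ["kb1", "kb2"], "available_int": 1})
```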

get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None

insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]

update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool

delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int
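insert accepts a batch of documents, which maps naturally onto the Elasticsearch bulk API: each document becomes an action entry followed by its source. A sketch of building that payload, assuming each document carries its ID under an `id` key that becomes the Elasticsearch `_id` (the key name and helper are hypothetical):

```python
from typing import Any


def build_bulk_operations(documents: list[dict[str, Any]], index_name: str) -> list[dict[str, Any]]:
    """Hypothetical: interleave bulk 'index' actions with document bodies."""
    operations: list[dict[str, Any]] = []
    for doc in documents:
        source = dict(doc)          # copy so the caller's dict is not mutated
        doc_id = source.pop("id")   # assumed ID key; raises KeyError if missing
        operations.append({"index": {"_index": index_name, "_id": doc_id}})
        operations.append(source)
    return operations


ops = build_bulk_operations(
    [{"id": "c1", "content": "alpha"}, {"id": "c2", "content": "beta"}],
    index_name="index1",
)
```

With the 8.x Python client the list could be passed as `es.bulk(operations=ops)`; per-item failures are reported in the response's `items`, which is one way a method returning list[str] could surface error messages.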


Search Result Helpers

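These methods parse Elasticsearch search results and extract the relevant data:

getTotal(self, res) -> int

getChunkIds(self, res) -> list[str]

getFields(self, res, fields: list[str]) -> dict[str, dict]

getHighlight(self, res, keywords: list[str], fieldnm: str) -> dict[str, str]

getAggregation(self, res, fieldnm: str) -> list[tuple[str, int]]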
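Assuming these helpers operate on the raw JSON shape returned by the Elasticsearch search API, three of them could look roughly like this. The snake_case names and the aggregation name `by_doc` are illustrative, not the module's own.

```python
from typing import Any


def get_total(res: dict[str, Any]) -> int:
    """Total hit count; ES 7+ returns {"value": n, "relation": ...}, older versions an int."""
    total = res["hits"]["total"]
    return total["value"] if isinstance(total, dict) else int(total)


def get_chunk_ids(res: dict[str, Any]) -> list[str]:
    """Document IDs of the returned hits, in ranking order."""
    return [hit["_id"] for hit in res["hits"]["hits"]]


def get_aggregation(res: dict[str, Any], agg_name: str) -> list[tuple[str, int]]:
    """(key, doc_count) pairs from a terms aggregation named agg_name."""
    buckets = res.get("aggregations", {}).get(agg_name, {}).get("buckets", [])
    return [(b["key"], b["doc_count"]) for b in buckets]


# Minimal response in the search API's shape (illustrative values).
sample = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [{"_id": "c1", "_source": {}}, {"_id": "c2", "_source": {}}],
    },
    "aggregations": {"by_doc": {"buckets": [{"key": "doc1", "doc_count": 2}]}},
}
```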


SQL Interface

sql(self, sql: str, fetch_size: int, format: str)
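sql forwards a query to Elasticsearch's SQL endpoint, where fetch_size bounds the rows per page and format selects the response encoding. With format="json", the response carries a `columns` list and a `rows` list of positional values. A sketch of turning that into row dicts, assuming the json format (the helper name and sample values are hypothetical):

```python
from typing import Any


def sql_rows_to_dicts(resp: dict[str, Any]) -> list[dict[str, Any]]:
    """Zip the column names of an ES SQL JSON response with each row."""
    names = [col["name"] for col in resp.get("columns", [])]
    return [dict(zip(names, row)) for row in resp.get("rows", [])]


# Shape of a format="json" response from the SQL endpoint (values are made up).
resp = {
    "columns": [{"name": "doc_id", "type": "keyword"}, {"name": "cnt", "type": "long"}],
    "rows": [["d1", 3], ["d2", 5]],
}
records = sql_rows_to_dicts(resp)
```

Only the first page of a paginated SQL response includes `columns`; later pages fetched with the returned `cursor` carry rows only, so the names from the first page would need to be reused.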


Important Implementation Details


Interaction with Other System Components


Visual Diagram: Class Diagram of ESConnection

classDiagram
    class ESConnection {
        - info: dict
        - es: Elasticsearch
        - mapping: dict
        + __init__()
        + _connect() bool
        + dbType() str
        + health() dict
        + createIdx(indexName: str, knowledgebaseId: str, vectorSize: int)
        + deleteIdx(indexName: str, knowledgebaseId: str)
        + indexExist(indexName: str, knowledgebaseId: str = None) bool
        + search(selectFields: list[str], highlightFields: list[str], condition: dict,
                 matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int,
                 indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None)
        + get(chunkId: str, indexName: str, knowledgebaseIds: list[str]) dict | None
        + insert(documents: list[dict], indexName: str, knowledgebaseId: str = None) list[str]
        + update(condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) bool
        + delete(condition: dict, indexName: str, knowledgebaseId: str) int
        + getTotal(res) int
        + getChunkIds(res) list[str]
        + getFields(res, fields: list[str]) dict[str, dict]
        + getHighlight(res, keywords: list[str], fieldnm: str) dict[str, str]
        + getAggregation(res, fieldnm: str) list[tuple[str, int]]
        + sql(sql: str, fetch_size: int, format: str)
    }

    ESConnection --|> DocStoreConnection
    ESConnection ..> Elasticsearch
    ESConnection ..> MatchExpr
    ESConnection ..> OrderByExpr

Summary

The es_conn.py module encapsulates all Elasticsearch-related operations in the InfiniFlow system, providing a single interface for document indexing, updating, searching, and index management. It pairs retry logic for failed operations with Elasticsearch's query DSL and SQL capabilities. This module underpins the system's knowledge base and document retrieval features.