es_conn.py
Overview
es_conn.py provides a robust, singleton-based connection and interface layer to Elasticsearch for the InfiniFlow system's document storage and search needs. It extends a generic DocStoreConnection abstract base class to implement Elasticsearch-specific features such as index management, CRUD (Create, Read, Update, Delete) operations, complex search queries with multi-modal matching (text, dense vectors, fusion), and SQL-like querying over Elasticsearch.
This file is designed to handle Elasticsearch connectivity, query construction, retries on failures, and response parsing. It integrates tightly with the system's configuration and utility modules while exposing high-level APIs for other components to interact with the document database transparently.
Classes and Methods
Class: ESConnection
Singleton subclass of DocStoreConnection that manages Elasticsearch operations.
Initialization: __init__(self)
Establishes connection to Elasticsearch cluster.
Validates cluster health and version (minimum version 8).
Loads index mappings from a JSON configuration file (conf/mapping.json).
Retries connection multiple times before raising exceptions.
Logs connection status and errors.
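The retry behaviour described above can be illustrated with a minimal sketch. The helper name connect_with_retries, the attempts and delay_s parameters, and the use of ConnectionError are assumptions made for illustration; the module's actual retry count and exception types may differ.

```python
import logging
import time
from typing import Callable


def connect_with_retries(connect: Callable[[], bool],
                         attempts: int = 3,
                         delay_s: float = 0.0) -> None:
    """Call `connect` until it returns True or the attempts run out.

    `connect` stands in for a method like `_connect`; `attempts` and
    `delay_s` are hypothetical knobs, not values taken from es_conn.py.
    """
    for attempt in range(1, attempts + 1):
        try:
            if connect():
                logging.info("Elasticsearch connected on attempt %d", attempt)
                return
        except Exception as exc:  # a real client raises more specific errors
            logging.warning("Connection attempt %d failed: %s", attempt, exc)
        time.sleep(delay_s)
    raise ConnectionError(f"Elasticsearch unreachable after {attempts} attempts")
```

Passing the connect callable in, rather than hard-coding a client, keeps the loop easy to exercise without a running cluster.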
Private Methods
_connect(self) -> bool
Connects to Elasticsearch using hosts, credentials from settings.
Sets up self.es client and populates self.info with cluster info.
Returns True if connection is successful, else False.
Public Methods
dbType(self) -> str
Returns the string "elasticsearch" indicating the type of document store.
health(self) -> dict
Returns cluster health information as a dictionary that includes the cluster status and the document store type.
Index/Table Operations
createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int)
Creates an Elasticsearch index if it does not exist.
Uses mappings/settings loaded from the predefined mapping.json.
Parameters:
indexName: Name of the index.
knowledgebaseId: ID of knowledge base (used for multi-tenant scenarios).
vectorSize: Dimensionality of vectors (not directly used here but part of signature).
Returns:
True if the index exists or creation succeeds; otherwise the error is logged and None is returned.
deleteIdx(self, indexName: str, knowledgebaseId: str)
Deletes an index only if the knowledgebaseId is empty (indicating all KBs under tenant deleted).
Otherwise, index remains to serve other KBs.
Handles exceptions silently if index not found.
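The sharing rule for deleteIdx can be sketched as follows. InMemoryIndices is a hypothetical stand-in for an indices client, and the boolean return value and the LookupError type are assumptions; only the rule itself (delete when knowledgebaseId is empty, ignore a missing index) comes from the description above.

```python
class InMemoryIndices:
    """Hypothetical stand-in for an Elasticsearch indices client."""

    def __init__(self, names):
        self.names = set(names)

    def delete(self, index: str) -> None:
        if index not in self.names:
            raise LookupError(f"index not found: {index}")
        self.names.remove(index)


def delete_index_if_unshared(indices, index_name: str, knowledgebase_id: str) -> bool:
    """Drop the index only when no knowledge base ID is given."""
    if knowledgebase_id:
        # Other knowledge bases may still live in this index, so keep it.
        return False
    try:
        indices.delete(index=index_name)
    except LookupError:
        # A missing index is treated as already deleted.
        pass
    return True
```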
indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool
Checks if the index exists in Elasticsearch.
Retries on connection timeout.
Returns True if exists, else False.
CRUD Operations
search(self, selectFields: list[str], highlightFields: list[str], condition: dict, matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int, indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None)
Performs a complex search query combining:
Boolean filters based on condition.
Multi-modal matching expressions (MatchTextExpr, MatchDenseExpr, FusionExpr).
Ranking with optional rank features.
Highlighting of matched terms.
Sorting by specified fields.
Aggregations on requested fields.
Parameters:
selectFields: Fields to retrieve (currently used internally).
highlightFields: Fields to highlight in results.
condition: Dictionary of filter conditions.
matchExprs: List of match expressions (text, dense vector, fusion).
orderBy: Ordering expression for sorting.
offset: Pagination offset.
limit: Number of results to return.
indexNames: String or list of index names to search in.
knowledgebaseIds: List of knowledge base IDs to filter on.
aggFields: List of fields to aggregate on.
rank_feature: Optional dictionary of fields and their rank feature scores.
Returns: Elasticsearch raw search response dictionary.
Raises exceptions on repeated failures.
Usage example:
res = es_conn.search(
    selectFields=["title", "content"],
    highlightFields=["content"],
    condition={"kb_id": ["kb1", "kb2"], "available_int": 1},
    matchExprs=[MatchTextExpr(fields=["content"], matching_text="example query", extra_options={})],
    orderBy=OrderByExpr(fields=[("page_num_int", 0)]),
    offset=0,
    limit=10,
    indexNames="index1",
    knowledgebaseIds=["kb1"]
)
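To show how a condition dictionary like the one above could become Boolean filters, here is a minimal sketch that emits a plain Elasticsearch query body. It assumes list values map to terms filters and scalar values to term filters; the helper name build_filter_query is hypothetical, and the module itself is described as using elasticsearch_dsl rather than raw dictionaries.

```python
from typing import Any


def build_filter_query(condition: dict[str, Any], offset: int, limit: int) -> dict[str, Any]:
    """Translate a condition dict into a bool/filter query body (illustrative only)."""
    filters = []
    for field, value in condition.items():
        if isinstance(value, list):
            # Match any of several values, e.g. multiple knowledge base IDs.
            filters.append({"terms": {field: value}})
        else:
            # Match a single exact value.
            filters.append({"term": {field: value}})
    return {
        "query": {"bool": {"filter": filters}},
        "from": offset,  # pagination offset
        "size": limit,   # page size
    }


body = build_filter_query({"kb_id": ["kb1", "kb2"], "available_int": 1}, offset=0, limit=10)
```

Filters do not affect scoring, which is why exact-match conditions fit in the filter clause while text and vector matches contribute to ranking.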
get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None
Retrieves a single document by its ID from the specified index.
Returns the document source dict with an added "id" key or None if not found.
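A minimal sketch of the return shape, assuming a get response that carries the usual found, _id, and _source keys. The helper name extract_chunk is hypothetical and the module's own handling may include extra checks.

```python
from typing import Any, Optional


def extract_chunk(get_response: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the document source with an added "id" key, or None if not found."""
    if not get_response.get("found"):
        return None
    chunk = dict(get_response["_source"])  # copy so the response is not mutated
    chunk["id"] = get_response["_id"]
    return chunk


hit = {"found": True, "_id": "chunk-1", "_source": {"content": "hello"}}
miss = {"found": False, "_id": "chunk-2"}
```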
insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]
Bulk inserts documents into an index.
Each document must contain an "id" field (used as _id in ES).
Returns a list of error strings if any operation failed, empty list otherwise.
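The mapping from documents to bulk operations can be sketched as below. The layout (an action line followed by the document source) follows the Elasticsearch bulk format; the helper name to_bulk_operations and the choice to raise ValueError for a missing "id" are assumptions for illustration.

```python
import copy
from typing import Any


def to_bulk_operations(documents: list[dict[str, Any]], index_name: str) -> list[dict[str, Any]]:
    """Build the alternating action/source list expected by a bulk request."""
    operations: list[dict[str, Any]] = []
    for doc in documents:
        if "id" not in doc:
            raise ValueError("every document needs an 'id' field")
        source = copy.deepcopy(doc)  # leave the caller's document untouched
        doc_id = source.pop("id")    # "id" becomes the Elasticsearch _id
        operations.append({"index": {"_index": index_name, "_id": doc_id}})
        operations.append(source)
    return operations


docs = [{"id": "c1", "content": "hello"}]
ops = to_bulk_operations(docs, "index1")
```

Using the document's own "id" as _id makes re-inserting the same chunk overwrite it instead of creating a duplicate.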
update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool
Updates documents matching condition with values from newValue.
Supports:
Single document update by "id".
Bulk updates using UpdateByQuery with scripting for field modifications, additions, or removals.
Retries on connection issues; returns True on success, False otherwise.
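As an illustration of scripted updates, the sketch below turns a newValue dictionary into a Painless script definition that sets each field from a parameter. The helper name build_update_script is hypothetical, field names are assumed to be trusted identifiers without quotes, and the module's real script (which also handles additions and removals) may look different.

```python
from typing import Any


def build_update_script(new_value: dict[str, Any]) -> dict[str, Any]:
    """Build a Painless script that assigns each field from script params."""
    statements = [
        f"ctx._source['{field}'] = params['{field}'];"
        for field in new_value
    ]
    return {
        "source": " ".join(statements),
        "lang": "painless",
        # Values travel as params rather than being inlined into the script text.
        "params": dict(new_value),
    }


script = build_update_script({"available_int": 0, "title": "New"})
```

Passing values through params keeps the script source stable across calls, which lets Elasticsearch reuse the compiled script.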
delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int
Deletes documents matching the condition from the index.
Supports deletion by list of IDs or complex boolean queries.
Returns number of documents deleted.
Search Result Helpers
These methods help parse and extract relevant data from Elasticsearch search results.
getTotal(self, res) -> int
Returns the total number of hits in the search result.
getChunkIds(self, res) -> list[str]
Returns a list of document IDs from the search hits.
getFields(self, res, fields: list[str]) -> dict[str, dict]
Returns a dictionary mapping document IDs to a sub-dictionary of requested fields and their values.
getHighlight(self, res, keywords: list[str], fieldnm: str) -> dict[str, str]
Extracts and returns highlighted snippets for matched documents keyed by ID.
getAggregation(self, res, fieldnm: str) -> list[tuple[str, int]]
Returns aggregation buckets (key, document count) for the given field from the search response.
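The helpers above can be sketched against a hand-written response dictionary. The function names are snake_case stand-ins, the sample data is invented for illustration, and the aggregation name aggs_docnm_kwd is an assumption about how the module names its aggregations.

```python
from typing import Any


def get_total(res: dict[str, Any]) -> int:
    total = res["hits"]["total"]
    # Recent Elasticsearch versions wrap the count in {"value": ..., "relation": ...}.
    return total["value"] if isinstance(total, dict) else total


def get_chunk_ids(res: dict[str, Any]) -> list[str]:
    return [hit["_id"] for hit in res["hits"]["hits"]]


def get_fields(res: dict[str, Any], fields: list[str]) -> dict[str, dict]:
    out = {}
    for hit in res["hits"]["hits"]:
        source = hit.get("_source", {})
        # Keep only the requested fields that the document actually has.
        out[hit["_id"]] = {f: source[f] for f in fields if f in source}
    return out


def get_aggregation(res: dict[str, Any], agg_name: str) -> list[tuple[str, int]]:
    buckets = res.get("aggregations", {}).get(agg_name, {}).get("buckets", [])
    return [(b["key"], b["doc_count"]) for b in buckets]


sample = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_id": "c1", "_source": {"title": "A", "content": "x"}},
            {"_id": "c2", "_source": {"title": "B"}},
        ],
    },
    "aggregations": {"aggs_docnm_kwd": {"buckets": [{"key": "a.pdf", "doc_count": 2}]}},
}
```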
SQL Interface
sql(self, sql: str, fetch_size: int, format: str)
Executes an SQL-like query via Elasticsearch SQL plugin.
Preprocesses the query to replace LIKE expressions on token fields with MATCH expressions for better full-text search.
Parameters:
sql: SQL query string.
fetch_size: Number of rows to fetch per request.
format: Response format (e.g., JSON).
Returns raw Elasticsearch SQL result or None on failure.
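The LIKE-to-MATCH preprocessing can be illustrated with a small regular expression. This sketch assumes token fields are recognisable by a _tks suffix and that the pattern is a single term wrapped in optional % wildcards; both are assumptions, and the module's actual rewriting (including tokenization of the search text) is likely more involved.

```python
import re

# Hypothetical convention: full-text token fields end in "_tks".
_LIKE_ON_TOKEN_FIELD = re.compile(
    r"\b(\w+_tks)\s+LIKE\s+'%?([^'%]*)%?'",
    re.IGNORECASE,
)


def like_to_match(sql: str) -> str:
    """Rewrite `<field>_tks LIKE '%text%'` into `MATCH(<field>_tks, 'text')`."""
    return _LIKE_ON_TOKEN_FIELD.sub(
        lambda m: f"MATCH({m.group(1)}, '{m.group(2)}')",
        sql,
    )


rewritten = like_to_match("SELECT doc_id FROM idx WHERE content_tks LIKE '%budget%'")
```

LIKE on an analyzed text field compares against the raw string, whereas MATCH runs a full-text query against the indexed tokens, which is usually what a user searching tokenized content intends.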
Important Implementation Details
Singleton Pattern:
Ensures a single instance of ESConnection is created and shared throughout the application for efficiency and consistency.
Retries and Robustness:
Most network operations retry up to ATTEMPT_TIME (2) times to handle transient failures like timeouts.
Query Construction:
Uses elasticsearch_dsl library to build complex boolean queries combining filters, full-text search, vector similarity (KNN), and rank features.
Fusion Matching:
Supports fusion of text and vector matches with weighted sums, enabling advanced multimodal search.
Mapping Configuration:
Loads index mapping and settings from a static JSON file, ensuring consistent schema management.
Scripted Updates:
Uses painless scripts for efficient bulk updates and field manipulations on documents.
Logging:
Extensive logging for debugging, error reporting, and monitoring Elasticsearch interactions.
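The singleton pattern mentioned above can be sketched with a class decorator. This is a generic illustration, not necessarily how the singleton utility in rag.utils is written; the Connection class is a hypothetical placeholder for ESConnection.

```python
def singleton(cls):
    """Class decorator that returns the same instance on every call."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            # Construct on first use; later calls reuse the cached object.
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Connection:
    """Hypothetical placeholder for a connection class."""

    def __init__(self):
        self.opened = True


first = Connection()
second = Connection()
```

Because the decorator replaces the class name with a factory function, callers obtain the shared instance simply by calling Connection(), and the expensive client setup runs once.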
Interaction with Other System Components
Settings Integration:
Reads Elasticsearch hosts, credentials, and other configurations from the settings module.
Utility Modules:
Uses utilities from rag.utils (e.g., singleton, get_float) and api.utils.file_utils to manage file paths.
DocStoreConnection Base Class:
Implements abstract methods defined in DocStoreConnection to provide polymorphic access to different document stores.
NLP Utilities:
Uses rag.nlp for language detection and tokenization during highlighting and SQL query preprocessing.
Match Expression Classes:
Relies on MatchExpr and subclasses (MatchTextExpr, MatchDenseExpr, FusionExpr) to define search matching logic.
Application Layer:
Serves as the backend for document retrieval, indexing, and search functionality used by the API layer and knowledge base modules.
Visual Diagram: Class Diagram of ESConnection
classDiagram
class ESConnection {
- info: dict
- es: Elasticsearch
- mapping: dict
+ __init__()
+ _connect() bool
+ dbType() str
+ health() dict
+ createIdx(indexName: str, knowledgebaseId: str, vectorSize: int)
+ deleteIdx(indexName: str, knowledgebaseId: str)
+ indexExist(indexName: str, knowledgebaseId: str = None) bool
+ search(selectFields: list[str], highlightFields: list[str], condition: dict, matchExprs: list[MatchExpr], orderBy: OrderByExpr, offset: int, limit: int, indexNames: str | list[str], knowledgebaseIds: list[str], aggFields: list[str] = [], rank_feature: dict | None = None)
+ get(chunkId: str, indexName: str, knowledgebaseIds: list[str]) dict | None
+ insert(documents: list[dict], indexName: str, knowledgebaseId: str = None) list[str]
+ update(condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) bool
+ delete(condition: dict, indexName: str, knowledgebaseId: str) int
+ getTotal(res) int
+ getChunkIds(res) list[str]
+ getFields(res, fields: list[str]) dict[str, dict]
+ getHighlight(res, keywords: list[str], fieldnm: str) dict[str, str]
+ getAggregation(res, fieldnm: str) list[tuple[str, int]]
+ sql(sql: str, fetch_size: int, format: str)
}
ESConnection --|> DocStoreConnection
ESConnection ..> Elasticsearch
ESConnection ..> MatchExpr
ESConnection ..> OrderByExpr
Summary
The es_conn.py module encapsulates all Elasticsearch related operations in the InfiniFlow system, providing a reliable, feature-rich, and extensible interface for document indexing, updating, searching, and management. It balances performance with robustness by implementing retry mechanisms and leveraging Elasticsearch's rich query DSL and SQL capabilities. This module is foundational for the system's knowledge base and document retrieval capabilities.