server.py
Overview
server.py implements a Model Context Protocol (MCP) server for RAGFlow, a Retrieval-Augmented Generation (RAG) backend for querying large document datasets. The server exposes a retrieval tool through which clients query datasets and documents. It supports two transport protocols, Server-Sent Events (SSE) and Streamable HTTP, so responses can be streamed to clients asynchronously.
This file handles:
Connecting and authenticating with the RAGFlow backend service.
Caching dataset and document metadata to optimize repeated queries.
Implementing retrieval functionality that searches datasets/documents based on questions and returns relevant content chunks.
Providing an MCP server interface with tools registration and callable endpoints.
Configuring and running the server with command-line options and environment variable overrides.
Supporting multi-tenant (host) and self-host modes with API key management.
Providing middleware for authentication and request routing.
Integrating with the Starlette framework for async HTTP serving.
Classes and Functions
LaunchMode (StrEnum)
Enum defining the launch mode of the server.
Values:
SELF_HOST = "self-host": Single-tenant mode with a static API key.
HOST = "host": Multi-tenant mode in which clients must provide authorization headers.
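A minimal sketch of how the launch-mode enum could be declared and parsed. The documentation names StrEnum (Python 3.11+) as the base class; this sketch uses the str-plus-Enum mixin so it also runs on older interpreters, which is an assumption and not the file's actual code. The helper parse_mode is hypothetical and exists only for illustration.

```python
from enum import Enum


class LaunchMode(str, Enum):
    """Launch modes as documented; the base class here is an assumption."""

    SELF_HOST = "self-host"  # single tenant, static API key
    HOST = "host"            # multi-tenant, key supplied with each request


def parse_mode(raw: str) -> LaunchMode:
    """Hypothetical helper: map a CLI or environment string to a LaunchMode.

    Raises ValueError for values that are not a known mode.
    """
    return LaunchMode(raw.strip().lower())
```

Because the members are also strings, a comparison such as LaunchMode.HOST == "host" holds, which keeps checks against raw configuration values simple.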
Transport (StrEnum)
Enum representing supported transport protocols.
Values:
SSE = "sse": Server-Sent Events transport.
STEAMABLE_HTTP = "streamable-http": Streamable HTTP transport for asynchronous streaming.
RAGFlowConnector
Connector class that communicates with the RAGFlow backend API. It manages caching of datasets and document metadata and facilitates retrieval queries.
Properties
_MAX_DATASET_CACHE: Max entries in the dataset metadata cache (32).
_MAX_DOCUMENT_CACHE: Max entries in the document metadata cache (128).
_CACHE_TTL: Cache time-to-live in seconds (300).
_dataset_metadata_cache: LRU cache for dataset metadata.
_document_metadata_cache: LRU cache for document metadata, keyed by dataset.
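The properties above describe size-bounded LRU caches whose entries also expire. The sketch below shows one way such a cache can be built on OrderedDict. The class name TTLLRUCache, its method names, and the jitter_seconds parameter are hypothetical; the connector itself keeps this logic in private helper methods, and the size of its random offset is not documented here.

```python
import random
import time
from collections import OrderedDict
from typing import Any, Optional


class TTLLRUCache:
    """LRU cache whose entries also expire after a time-to-live (sketch)."""

    def __init__(self, max_entries: int, ttl_seconds: float, jitter_seconds: float = 30.0):
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._jitter = jitter_seconds  # assumed range for the random offset
        self._entries: "OrderedDict[str, tuple]" = OrderedDict()

    def _expiry_timestamp(self) -> float:
        # A random offset spreads expirations so entries do not all lapse together.
        return time.time() + self._ttl + random.uniform(0, self._jitter)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() >= expires_at:
            del self._entries[key]  # expired: drop it and report a miss
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (value, self._expiry_timestamp())
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the least recently used entry
```

With the documented limits, the two caches would correspond to TTLLRUCache(32, 300) for datasets and TTLLRUCache(128, 300) for documents.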
Initialization
def __init__(self, base_url: str, version="v1")
Parameters:
base_url (str): Base URL of the RAGFlow backend API.
version (str): API version string, default "v1".
Usage:
connector = RAGFlowConnector(base_url="http://127.0.0.1:9380")
Methods
bind_api_key(api_key: str) -> None
Binds an API key for authorization in requests.
_post(path: str, json=None, stream=False, files=None) -> requests.Response or None
Sends an authorized POST request to the backend.
_get(path: str, params=None, json=None) -> requests.Response
Sends a GET request to the backend.
_is_cache_valid(ts: float) -> bool
Checks whether a cached entry's timestamp is still valid (not expired).
_get_expiry_timestamp() -> float
Returns a TTL-based expiry timestamp with a random offset.
_get_cached_dataset_metadata(dataset_id: str) -> dict or None
Retrieves dataset metadata from the cache if it is still valid.
_set_cached_dataset_metadata(dataset_id: str, metadata: dict) -> None
Stores dataset metadata in the cache with a TTL.
_get_cached_document_metadata_by_dataset(dataset_id: str) -> dict or None
Retrieves document metadata for a dataset from the cache.
_set_cached_document_metadata_by_dataset(dataset_id: str, doc_id_meta_list: list) -> None
Stores the document metadata list for a dataset in the cache.
list_datasets(page=1, page_size=1000, orderby="create_time", desc=True, id=None, name=None) -> str
Queries the backend to list datasets and returns a JSON Lines string containing each dataset's description and id.
retrieval(...) -> list[types.TextContent]
Executes a retrieval query over the specified datasets/documents with the given search parameters. Returns a list of text content results containing the relevant chunks.
Parameters:
dataset_ids (list[str]): List of dataset IDs to search. If empty, searches all datasets.
document_ids (list[str], optional): List of document IDs to constrain the search.
question (str): Query string.
page (int): Pagination page number (default 1).
page_size (int): Number of results per page (default 30).
similarity_threshold (float): Minimum similarity threshold for matches (default 0.2).
vector_similarity_weight (float): Weight balancing vector vs. term similarity (default 0.3).
top_k (int): Max results to consider before ranking (default 1024).
rerank_id (str, optional): Identifier of the reranking model.
keyword (bool): Enable keyword-based search (default False).
force_refresh (bool): Force a refresh of cached metadata (default False).
_get_document_metadata_cache(dataset_ids: list[str], force_refresh=False) -> tuple[dict, dict]
Fetches and caches document metadata and dataset metadata for the given datasets.
_map_chunk_fields(chunk_data: dict, dataset_cache: dict, document_cache: dict) -> dict
Enhances retrieval chunk data with the dataset name and per-chunk document metadata.
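As an illustration of the JSON Lines output that list_datasets is described as returning, the sketch below serializes one object per dataset, one per line. The function name format_datasets_as_json_lines and the assumption that the backend returns a list of dicts with "description" and "id" keys are hypothetical.

```python
import json
from typing import Dict, List


def format_datasets_as_json_lines(datasets: List[Dict]) -> str:
    """Serialize each dataset's description and id as one JSON object per line."""
    lines = []
    for dataset in datasets:
        record = {
            "description": dataset.get("description"),
            "id": dataset.get("id"),
        }
        # ensure_ascii=False keeps non-ASCII descriptions readable.
        lines.append(json.dumps(record, ensure_ascii=False))
    return "\n".join(lines)
```

A client can recover the records by splitting the string on newlines and passing each line to json.loads.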
RAGFlowCtx
Simple context wrapper holding a RAGFlowConnector instance.
__init__(self, connector: RAGFlowConnector)
sse_lifespan(server: Server) -> AsyncIterator[dict]
Async context manager for the lifecycle of the legacy SSE application. Initializes the RAGFlowConnector context and logs startup/shutdown events.
with_api_key(required=True)
Decorator factory for API key injection and validation on endpoint handlers.
Parameters:
required(bool): Whether API key is mandatory.
Usage Example:
@with_api_key(required=True)
async def list_tools(*, connector):
    ...
Behavior:
In HOST mode, extracts the token from the Authorization or api_key headers.
In SELF_HOST mode, uses the static HOST_API_KEY.
Binds the API key to the RAGFlowConnector instance passed to decorated functions.
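The behavior above can be sketched as a decorator factory. This is a simplified illustration under several assumptions: the headers keyword argument, the StubConnector class, the extract_token helper, and the "Bearer" scheme are all hypothetical, and the real server presumably obtains headers and the connector from its request context instead of from keyword arguments.

```python
import asyncio
import functools
from typing import Optional

MODE = "host"      # assumed module-level setting: "host" or "self-host"
HOST_API_KEY = ""  # static key used in self-host mode


class StubConnector:
    """Stand-in for RAGFlowConnector that only records the bound key."""

    def __init__(self) -> None:
        self.api_key: Optional[str] = None

    def bind_api_key(self, api_key: str) -> None:
        self.api_key = api_key


def extract_token(headers: dict) -> Optional[str]:
    """Read a token from `authorization: Bearer <token>` or an `api_key` header."""
    auth = headers.get("authorization", "")
    if auth.lower().startswith("bearer "):
        return auth[7:].strip() or None
    return headers.get("api_key") or None


def with_api_key(required: bool = True):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, headers: Optional[dict] = None, connector, **kwargs):
            if MODE == "host":
                token = extract_token(headers or {})
            else:
                token = HOST_API_KEY or None
            if required and not token:
                raise PermissionError("API key is required")
            if token:
                connector.bind_api_key(token)
            return await func(*args, connector=connector, **kwargs)

        return wrapper

    return decorator
```

The decorated handler only ever sees a connector that already carries the caller's key, so the handler body stays free of authentication logic.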
MCP Server Endpoints
list_tools(*, connector) -> list[types.Tool]
Lists the tools available on the MCP server, currently exposing "ragflow_retrieval". The tool includes a detailed input schema describing the parameters supported by the retrieval API.
call_tool(name: str, arguments: dict, *, connector) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]
Handles invocation of named tools. Currently supports "ragflow_retrieval". Extracts the retrieval arguments and calls connector.retrieval(), returning the results.
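To make the relationship between the tool's input schema and the dispatch step concrete, here is a dependency-free sketch. The schema is reconstructed from the retrieval parameters and defaults documented for RAGFlowConnector; marking only question as required is an assumption, and the schema in server.py may differ. The build_retrieval_kwargs helper and the retrieval callback argument are hypothetical; the real handler returns MCP content types and calls connector.retrieval() directly.

```python
from typing import Any, Callable, Dict

# JSON Schema for the tool input, derived from the documented parameters.
RETRIEVAL_INPUT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "dataset_ids": {"type": "array", "items": {"type": "string"}},
        "document_ids": {"type": "array", "items": {"type": "string"}},
        "question": {"type": "string"},
        "page": {"type": "integer", "default": 1},
        "page_size": {"type": "integer", "default": 30},
        "similarity_threshold": {"type": "number", "default": 0.2},
        "vector_similarity_weight": {"type": "number", "default": 0.3},
        "top_k": {"type": "integer", "default": 1024},
        "rerank_id": {"type": "string"},
        "keyword": {"type": "boolean", "default": False},
        "force_refresh": {"type": "boolean", "default": False},
    },
    "required": ["question"],  # assumption: only the query string is mandatory
}


def build_retrieval_kwargs(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Check argument names against the schema and fill in documented defaults."""
    properties = RETRIEVAL_INPUT_SCHEMA["properties"]
    unknown = sorted(set(arguments) - set(properties))
    if unknown:
        raise ValueError(f"Unknown arguments: {unknown}")
    missing = [name for name in RETRIEVAL_INPUT_SCHEMA["required"] if name not in arguments]
    if missing:
        raise ValueError(f"Missing required arguments: {missing}")
    kwargs = {name: spec["default"] for name, spec in properties.items() if "default" in spec}
    kwargs.update(arguments)
    return kwargs


def call_tool(name: str, arguments: Dict[str, Any], retrieval: Callable[..., Any]) -> Any:
    """Dispatch a tool call by name; only "ragflow_retrieval" is supported."""
    if name != "ragflow_retrieval":
        raise ValueError(f"Tool not found: {name}")
    return retrieval(**build_retrieval_kwargs(arguments))
```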
create_starlette_app() -> Starlette
Creates a Starlette ASGI application configured with:
Optional authentication middleware (in HOST mode).
SSE transport routes (if enabled).
Streamable HTTP transport routes (if enabled).
Lifespan context managers for transport sessions.
Returns the configured Starlette app instance.
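The optional authentication layer can be pictured as a plain ASGI middleware that wraps the application. The sketch below avoids importing Starlette so that it runs on the standard library alone; the class name AuthMiddleware, the JSON error body, and the hello_app and status_for helpers are hypothetical, and the actual middleware may check different paths or return a different response.

```python
import asyncio
import json


class AuthMiddleware:
    """Pure-ASGI middleware that rejects HTTP requests lacking credentials."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Lifespan and other non-HTTP events pass through untouched.
            await self.app(scope, receive, send)
            return
        # ASGI delivers headers as a list of (name, value) byte pairs.
        headers = {
            key.decode("latin-1").lower(): value.decode("latin-1")
            for key, value in scope.get("headers", [])
        }
        if not (headers.get("authorization") or headers.get("api_key")):
            body = json.dumps({"error": "missing API key"}).encode()
            await send({
                "type": "http.response.start",
                "status": 401,
                "headers": [(b"content-type", b"application/json")],
            })
            await send({"type": "http.response.body", "body": body})
            return
        await self.app(scope, receive, send)


async def hello_app(scope, receive, send):
    """Trivial downstream app used to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


def status_for(headers):
    """Run one request through the middleware and return the response status."""
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    scope = {"type": "http", "method": "GET", "path": "/", "headers": headers}
    asyncio.run(AuthMiddleware(hello_app)(scope, receive, send))
    return sent[0]["status"]
```

Calling status_for with an empty header list exercises the rejection path, while passing an authorization or api_key header lets the request reach the wrapped app.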
main(...)
CLI entrypoint using click to parse options and environment variables. Sets global configuration variables, validates mode constraints, prints banner and status, and runs the server via uvicorn.
Options include:
--base-url: Backend API URL.
--host, --port: Network binding.
--mode: Launch mode (self-host or host).
--api-key: API key for self-host mode.
Flags to enable or disable transports and JSON response mode.
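The option handling can be sketched with the standard library. Note the substitutions: the source uses click, while this sketch uses argparse to stay dependency-free. The environment variable names (prefixed RAGFLOW_MCP_), the default values, and the rule that self-host mode requires an API key are assumptions made for illustration; only the flag names come from the list above.

```python
import argparse
import os
from typing import Mapping, Optional, Sequence

ENV_PREFIX = "RAGFLOW_MCP_"  # hypothetical prefix for the override variables


def load_settings(argv: Optional[Sequence[str]] = None,
                  env: Optional[Mapping[str, str]] = None) -> dict:
    """Parse CLI options, then let environment variables override them."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(description="RAGFlow MCP server (sketch)")
    parser.add_argument("--base-url", default="http://127.0.0.1:9380")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=9382)
    parser.add_argument("--mode", default="self-host")
    parser.add_argument("--api-key", default="")
    args = parser.parse_args(argv)

    settings = {
        "base_url": env.get(ENV_PREFIX + "BASE_URL", args.base_url),
        "host": env.get(ENV_PREFIX + "HOST", args.host),
        "port": int(env.get(ENV_PREFIX + "PORT", args.port)),
        "mode": env.get(ENV_PREFIX + "LAUNCH_MODE", args.mode),
        "api_key": env.get(ENV_PREFIX + "HOST_API_KEY", args.api_key),
    }
    # Assumed mode constraints; the exact checks in server.py are not documented here.
    if settings["mode"] not in ("self-host", "host"):
        raise ValueError(f"Unknown launch mode: {settings['mode']}")
    if settings["mode"] == "self-host" and not settings["api_key"]:
        raise ValueError("self-host mode requires an API key")
    return settings
```

Passing argv and env explicitly keeps the function testable without touching the real process arguments or environment.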
Implementation Details and Algorithms
Caching: Uses OrderedDict as an LRU cache to store dataset and document metadata with a TTL, reducing redundant backend calls.
Metadata Refresh: Supports forced refresh to bypass the cache and fetch fresh metadata.
Retrieval Logic: If no dataset IDs are provided, it automatically fetches all datasets and searches across them.
Chunk Enhancement: Each retrieval chunk is enhanced with dataset name and document metadata for richer client-side context.
Multi-transport Support: Supports legacy SSE and modern Streamable HTTP for flexible client compatibility.
Authentication Middleware: In HOST mode, intercepts HTTP requests to verify Authorization headers and API keys.
Async Server: Built on Starlette and uvicorn for high-performance async serving.
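The retrieval fallback and chunk enhancement steps listed above can be sketched as two small functions. The chunk keys ("dataset_id", "document_id"), the output keys ("dataset_name", "document_metadata"), and the list_all_ids callback are hypothetical; the real _map_chunk_fields may use different field names and cache layouts.

```python
from typing import Callable, Dict, List, Optional


def resolve_dataset_ids(dataset_ids: Optional[List[str]],
                        list_all_ids: Callable[[], List[str]]) -> List[str]:
    """Use the caller's dataset IDs, or fall back to every available dataset."""
    return list(dataset_ids) if dataset_ids else list_all_ids()


def map_chunk_fields(chunk: Dict, dataset_cache: Dict[str, Dict],
                     document_cache: Dict[str, Dict]) -> Dict:
    """Return a copy of the chunk enriched with dataset and document metadata."""
    enriched = dict(chunk)  # copy so the backend payload is left untouched
    dataset = dataset_cache.get(chunk.get("dataset_id"))
    if dataset:
        enriched["dataset_name"] = dataset.get("name")
    document_meta = document_cache.get(chunk.get("document_id"))
    if document_meta:
        enriched["document_metadata"] = document_meta
    return enriched
```

Chunks whose dataset or document is missing from the caches are returned unchanged apart from the copy, so a cache miss degrades the response gracefully instead of failing the whole query.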
Interaction with Other Components
RAGFlow Backend API: Communicates with the backend server at the configured BASE_URL for dataset and document metadata and for retrieval queries.
MCP Framework: Integrates with the MCP server framework via the Server class from mcp.server.lowlevel and exposes tools and callable endpoints.
Middleware and Transport Layers: Uses Starlette middleware and the transport modules (mcp.server.sse and mcp.server.streamable_http_manager) for communication protocols.
Types Module (mcp.types): Uses predefined data types for tools, text content, image content, and embedded resources.
Usage Examples
Starting the Server in Self-Host Mode
uv run mcp/server/server.py --host=127.0.0.1 --port=9382 --base-url=http://127.0.0.1:9380 --mode=self-host --api-key=ragflow-xxxxx
Calling the Retrieval Tool Programmatically
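from mcp.server.server import RAGFlowConnector

# Create a connector for the backend and bind the API key used for authorization.
connector = RAGFlowConnector(base_url="http://127.0.0.1:9380")
connector.bind_api_key("ragflow-xxxxx")

# Query a single dataset and request the first page of up to 10 chunks.
results = connector.retrieval(
    dataset_ids=["dataset1"],
    question="What is the capital of France?",
    page=1,
    page_size=10,
)

# Each result is a text content item; print its text payload.
for content in results:
    print(content.text)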
Mermaid Class Diagram
classDiagram
class RAGFlowConnector {
-_MAX_DATASET_CACHE: int
-_MAX_DOCUMENT_CACHE: int
-_CACHE_TTL: int
-_dataset_metadata_cache: OrderedDict
-_document_metadata_cache: OrderedDict
+__init__(base_url: str, version="v1")
+bind_api_key(api_key: str)
+list_datasets(page: int, page_size: int, orderby: str, desc: bool, id: str, name: str) str
+retrieval(dataset_ids, document_ids, question, page, page_size, similarity_threshold, vector_similarity_weight, top_k, rerank_id, keyword, force_refresh)
-_post(path, json, stream, files)
-_get(path, params, json)
-_is_cache_valid(ts)
-_get_expiry_timestamp()
-_get_cached_dataset_metadata(dataset_id)
-_set_cached_dataset_metadata(dataset_id, metadata)
-_get_cached_document_metadata_by_dataset(dataset_id)
-_set_cached_document_metadata_by_dataset(dataset_id, doc_id_meta_list)
-_get_document_metadata_cache(dataset_ids, force_refresh)
-_map_chunk_fields(chunk_data, dataset_cache, document_cache)
}
class RAGFlowCtx {
+__init__(connector: RAGFlowConnector)
-conn: RAGFlowConnector
}
RAGFlowCtx --> RAGFlowConnector
Summary
server.py provides an MCP server interface to the RAGFlow retrieval backend. It supports multi-tenant (host) and self-host modes over the SSE and Streamable HTTP transports, caches dataset and document metadata in size-bounded LRU caches with a TTL, enriches retrieved chunks with that metadata, and handles API keys according to the launch mode. Starlette and uvicorn provide the asynchronous serving layer.
This file is a core part of the RAGFlow system enabling external clients to query, retrieve, and interact with large document datasets in a real-time, scalable manner.