mcp_tool_call_conn.py
Overview
The mcp_tool_call_conn.py file provides an asynchronous client session implementation for interacting with MCP (Model Context Protocol) servers over one of two transports: SSE or Streamable HTTP. It primarily defines the MCPToolCallSession class, which manages communication with an MCP server to list the available tools and call them asynchronously. It also includes utility functions for managing multiple MCP sessions and for converting MCP tool metadata into OpenAI-compatible tool descriptions.
This module is the integration layer between the client-side application and MCP servers. It covers tool discovery, concurrent tool invocation, error handling, and session lifecycle management.
Classes and Functions
Class: MCPToolCallSession
MCPToolCallSession extends ToolCallSession from the rag.llm.chat_model module to provide:
Connection management to an MCP server over SSE or Streamable HTTP.
An event loop running in a dedicated thread for asynchronous communication.
Queued task processing for listing tools and calling tools on the MCP server.
Timeout and error handling for MCP interactions.
Synchronous wrappers to interface with asynchronous methods.
Global tracking of all active instances via a weak reference set.
Attributes
| Attribute | Type | Description |
|---|---|---|
| | weakref.WeakSet | Class-level registry of all active instances. |
| _mcp_server | Any | MCP server configuration (with …). |
| _server_variables | dict | Variables for header template substitution. |
| _queue | asyncio.Queue | Queue for MCP tasks (tool listing, tool calls). |
| _close | bool | Flag indicating if the session is closing. |
| _event_loop | asyncio.AbstractEventLoop | Dedicated event loop running in a separate thread. |
| _thread_pool | ThreadPoolExecutor | Thread pool managing the event loop thread. |
Methods
__init__(self, mcp_server: Any, server_variables: dict[str, Any] | None = None)
Initializes the session, sets up the event loop and thread pool, and starts the MCP server communication loop.
Parameters:
mcp_server: Object containing MCP server configuration.
server_variables (optional): Variables for header templating.
Usage:
session = MCPToolCallSession(mcp_server=my_mcp_server, server_variables={"token": "abc123"})
async def _mcp_server_loop(self) -> None
Main asynchronous loop that establishes connection to the MCP server based on its type (SSE or Streamable HTTP), initializes the client session, and processes tasks.
Handles authentication headers via templating.
Manages connection exceptions and initialization timeouts.
Routes tasks to be processed via _process_mcp_tasks().
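The header templating step can be illustrated with a minimal sketch built on Python's string.Template. The helper name render_headers, the Authorization header, and the ${token} placeholder are assumptions made for illustration and are not taken from the module. The sketch uses safe_substitute, which leaves unknown placeholders untouched; the module itself may handle missing variables differently.

```python
from string import Template
from typing import Any, Optional


def render_headers(
    headers: dict[str, str], variables: Optional[dict[str, Any]] = None
) -> dict[str, str]:
    """Substitute ${name} placeholders in header names and values (hypothetical helper)."""
    variables = variables or {}
    rendered: dict[str, str] = {}
    for key, value in headers.items():
        # safe_substitute keeps unknown placeholders instead of raising KeyError.
        new_key = Template(key).safe_substitute(variables)
        new_value = Template(value).safe_substitute(variables)
        rendered[new_key] = new_value
    return rendered


headers = render_headers({"Authorization": "Bearer ${token}"}, {"token": "abc123"})
unresolved = render_headers({"X-Tenant": "${tenant}"}, None)
print(headers)
print(unresolved)
```

Rendering the headers once, before the connection is opened, keeps the transport code independent of where the variable values come from.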
async def _process_mcp_tasks(self, client_session: ClientSession | None, error_message: str | None = None) -> None
Continuously processes tasks pulled from _queue. Tasks include:
"list_tools": Retrieve available tools from MCP server.
"tool_call": Invoke a specific tool with arguments.
If the client session is invalid or there is an error, returns the error to the task's result queue.
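The consumer pattern described above might look roughly like the following sketch. It is not the module's code: FakeClient stands in for the real ClientSession, the (task_type, kwargs, result_queue) tuple layout is an assumption, and the "stop" sentinel is a hypothetical way to end the loop (the text describes a closing flag instead).

```python
import asyncio
from typing import Any, Optional


class FakeClient:
    """Hypothetical stand-in for the MCP ClientSession, for illustration only."""

    async def list_tools(self) -> list[str]:
        return ["echo"]

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> str:
        return f"{name} called with {arguments}"


async def process_tasks(
    queue: asyncio.Queue, client: Optional[FakeClient], error_message: Optional[str] = None
) -> None:
    while True:
        task_type, kwargs, result_queue = await queue.get()
        if task_type == "stop":  # hypothetical sentinel used only in this sketch
            break
        if client is None or error_message:
            # No usable session: hand the error back to the waiting caller.
            await result_queue.put(ValueError(error_message or "no client session"))
            continue
        try:
            if task_type == "list_tools":
                result: Any = await client.list_tools()
            elif task_type == "tool_call":
                result = await client.call_tool(**kwargs)
            else:
                result = ValueError(f"unknown task type: {task_type}")
        except Exception as exc:  # errors travel back through the result queue
            result = exc
        await result_queue.put(result)


async def demo(client: Optional[FakeClient], error_message: Optional[str] = None) -> list[Any]:
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(process_tasks(queue, client, error_message))
    outputs = []
    requests = [("list_tools", {}), ("tool_call", {"name": "echo", "arguments": {"x": 1}})]
    for task_type, kwargs in requests:
        result_queue: asyncio.Queue = asyncio.Queue()
        await queue.put((task_type, kwargs, result_queue))
        outputs.append(await result_queue.get())
    await queue.put(("stop", {}, None))
    await worker
    return outputs


results = asyncio.run(demo(FakeClient()))
errors = asyncio.run(demo(None, "connection failed"))
print(results, errors)
```

Giving each request its own result queue lets several callers wait concurrently without their answers being mixed up.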
async def _call_mcp_server(self, task_type: MCPTaskType, timeout: float | int = 8, **kwargs) -> Any
Helper to enqueue a task to _queue and wait for its result asynchronously with a timeout.
Parameters:
task_type: One of "list_tools" or "tool_call".
timeout: Maximum wait time in seconds.
**kwargs: Arguments specific to the task.
Returns: Result of the MCP task. Raises an exception on failure or timeout.
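A minimal sketch of the enqueue-and-wait step, assuming tasks are passed as (task_type, kwargs, result_queue) tuples and that failures arrive as exception objects on the result queue. The function name call_with_timeout and the tuple layout are illustrative assumptions, not the module's actual internals.

```python
import asyncio
from typing import Any


async def call_with_timeout(
    queue: asyncio.Queue, task_type: str, timeout: float = 8, **kwargs: Any
) -> Any:
    """Enqueue a task and wait for its result, up to `timeout` seconds (hypothetical helper)."""
    result_queue: asyncio.Queue = asyncio.Queue()
    await queue.put((task_type, kwargs, result_queue))
    try:
        result = await asyncio.wait_for(result_queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(
            f"MCP task '{task_type}' timed out after {timeout}s"
        ) from None
    if isinstance(result, Exception):
        raise result  # surface errors reported by the worker
    return result


async def demo() -> tuple[Any, str]:
    queue: asyncio.Queue = asyncio.Queue()

    async def responder() -> None:
        # Answers exactly one request, then exits.
        task_type, _kwargs, result_queue = await queue.get()
        await result_queue.put(f"handled {task_type}")

    worker = asyncio.create_task(responder())
    ok = await call_with_timeout(queue, "list_tools", timeout=1)
    await worker
    try:
        # Nobody is consuming the queue any more, so this request times out.
        await call_with_timeout(queue, "tool_call", timeout=0.05, name="echo")
        outcome = "no timeout"
    except asyncio.TimeoutError:
        outcome = "timed out"
    return ok, outcome


ok, outcome = asyncio.run(demo())
print(ok, outcome)
```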
async def _call_mcp_tool(self, name: str, arguments: dict[str, Any], timeout: float | int = 10) -> str
Invokes a tool call on the MCP server using _call_mcp_server and processes the result.
Returns the text content of the tool's response or an error message.
async def _get_tools_from_mcp_server(self, timeout: float | int = 8) -> list[Tool]
Fetches the list of available tools from the MCP server asynchronously.
def get_tools(self, timeout: float | int = 10) -> list[Tool]
Synchronous wrapper that runs _get_tools_from_mcp_server in the event loop thread and returns the tool list.
Raises RuntimeError on timeout or logs errors.
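The synchronous wrappers rely on handing a coroutine to an event loop that runs in another thread. The sketch below shows one way to do that with asyncio.run_coroutine_threadsafe. LoopRunner, run_sync, and fetch_tools are hypothetical names, and mapping a timeout to RuntimeError follows the description above rather than the module's source.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


class LoopRunner:
    """Runs an asyncio event loop on a single worker thread (illustrative helper)."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._pool.submit(self._loop.run_forever)

    def run_sync(self, coro, timeout: float = 10):
        # Schedule the coroutine on the background loop and block for the result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout=timeout)
        except FutureTimeoutError:
            future.cancel()
            raise RuntimeError(f"operation timed out after {timeout}s") from None

    def stop(self) -> None:
        async def _cancel_pending() -> None:
            current = asyncio.current_task()
            pending = [t for t in asyncio.all_tasks() if t is not current]
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)

        # Let leftover tasks finish cancelling before the loop is stopped.
        asyncio.run_coroutine_threadsafe(_cancel_pending(), self._loop).result()
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._pool.shutdown(wait=True)
        self._loop.close()


async def fetch_tools() -> list[str]:
    await asyncio.sleep(0.01)  # stands in for a network round trip
    return ["echo", "search"]


runner = LoopRunner()
tools = runner.run_sync(fetch_tools(), timeout=1)
try:
    runner.run_sync(asyncio.sleep(1), timeout=0.05)
    timeout_error = None
except RuntimeError as exc:
    timeout_error = str(exc)
runner.stop()
print(tools, timeout_error)
```

Because the caller only blocks on a concurrent.futures.Future, it never touches the loop directly, so synchronous code on any thread can use the wrapper.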
@override def tool_call(self, name: str, arguments: dict[str, Any], timeout: float | int = 10) -> str
Synchronous override of the ToolCallSession method to call a tool by name with arguments.
Runs _call_mcp_tool asynchronously.
Handles timeouts and exceptions gracefully.
async def close(self) -> None
Closes the session asynchronously by stopping the event loop and thread pool, and removes the instance from the global registry.
def close_sync(self, timeout: float | int = 5) -> None
Synchronous method that closes the session, waiting for completion until the timeout elapses.
Function: close_multiple_mcp_toolcall_sessions(sessions: list[MCPToolCallSession]) -> None
Closes multiple MCPToolCallSession instances concurrently.
Starts a new event loop and thread to run closure coroutines.
Waits for all sessions to close, then stops the event loop and joins the thread.
Logs progress and final cleanup status.
Usage:
close_multiple_mcp_toolcall_sessions([session1, session2, session3])
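The steps listed above (new loop, new thread, concurrent close, stop, join) can be sketched as follows. DummySession and close_many are hypothetical stand-ins, and the use of asyncio.gather with return_exceptions=True is an assumption about how one failing close() might be kept from blocking the others.

```python
import asyncio
import threading


class DummySession:
    """Hypothetical stand-in that exposes an async close(), like the sessions described above."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    async def close(self) -> None:
        await asyncio.sleep(0.01)  # simulate asynchronous cleanup
        self.closed = True


def close_many(sessions: list[DummySession], timeout: float = 5) -> None:
    if not sessions:
        return
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    async def _close_all() -> None:
        # return_exceptions=True keeps one failing close() from aborting the rest.
        await asyncio.gather(*(s.close() for s in sessions), return_exceptions=True)

    try:
        asyncio.run_coroutine_threadsafe(_close_all(), loop).result(timeout=timeout)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


sessions = [DummySession("a"), DummySession("b"), DummySession("c")]
close_many(sessions)
print([s.closed for s in sessions])
```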
Function: shutdown_all_mcp_sessions() -> None
Gracefully shuts down all active MCPToolCallSession instances tracked globally.
Uses close_multiple_mcp_toolcall_sessions internally.
Logs if there are no active sessions.
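Global tracking with a weak reference set can be sketched as below. TrackedSession, the _instances attribute name, and shutdown_all are illustrative assumptions; only the use of weakref.WeakSet comes from the description in this document.

```python
import weakref


class TrackedSession:
    # Class-level registry; an entry disappears once its instance is garbage collected.
    _instances: "weakref.WeakSet[TrackedSession]" = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False
        TrackedSession._instances.add(self)

    def close(self) -> None:
        self.closed = True
        TrackedSession._instances.discard(self)


def shutdown_all() -> int:
    """Close every tracked session and return how many were closed."""
    sessions = list(TrackedSession._instances)  # snapshot, since close() mutates the set
    for session in sessions:
        session.close()
    return len(sessions)


first = TrackedSession("first")
second = TrackedSession("second")
closed_count = shutdown_all()
remaining = len(TrackedSession._instances)
print(closed_count, remaining)
```

A weak set means the registry never keeps a session alive on its own, so sessions that are simply dropped do not accumulate in it.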
Function: mcp_tool_metadata_to_openai_tool(mcp_tool: Tool | dict) -> dict[str, Any]
Converts MCP tool metadata into a dictionary formatted for OpenAI tools.
Supports Tool instances or plain dictionaries.
Outputs a dictionary with type "function" including name, description, and parameters.
Example:
openai_tool = mcp_tool_metadata_to_openai_tool(mcp_tool)
# {
#     "type": "function",
#     "function": {
#         "name": "tool_name",
#         "description": "Tool description",
#         "parameters": {...}
#     }
# }
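For the plain-dictionary case, the conversion could look roughly like this sketch. The function name to_openai_tool is hypothetical, and reading the schema from an inputSchema key (the field name used for tool schemas in the MCP specification) and the fallback defaults are assumptions about how the real function treats dictionaries.

```python
from typing import Any


def to_openai_tool(mcp_tool: dict[str, Any]) -> dict[str, Any]:
    """Map MCP-style tool metadata to an OpenAI-style function tool (hypothetical helper)."""
    return {
        "type": "function",
        "function": {
            "name": mcp_tool["name"],
            # Assumed fallbacks for metadata that omits optional fields.
            "description": mcp_tool.get("description", ""),
            "parameters": mcp_tool.get(
                "inputSchema", {"type": "object", "properties": {}}
            ),
        },
    }


example = to_openai_tool(
    {
        "name": "echo",
        "description": "Echo the input text.",
        "inputSchema": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }
)
print(example)
```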
Implementation Details & Algorithms
Dedicated Event Loop in Thread:
Each MCPToolCallSession creates its own asyncio event loop running inside a dedicated thread via ThreadPoolExecutor. This design isolates MCP communication from the main thread, allowing synchronous interfaces to safely interact with asynchronous operations.
Task Queue with Async Communication:
MCP tasks (list_tools, tool_call) are represented as tuples placed into an asyncio queue. The _process_mcp_tasks coroutine continuously consumes tasks, calls the appropriate MCP client methods, and returns results via a result queue.
Header Templating:
Authentication and other headers can include templated variables, substituted at runtime using Python's string.Template. This allows dynamic headers based on the server_variables dictionary.
Transport Protocol Handling:
Supports two MCP server types:
SSE (Server-Sent Events): Uses sse_client to receive event streams.
Streamable HTTP: Uses streamablehttp_client for bidirectional communication streams.
Timeouts & Error Handling:
Initialization and task requests have configurable timeouts. Exceptions are caught and logged. Errors are propagated back to callers wrapped in exceptions or error messages.
Global Instance Tracking:
Uses a weakref.WeakSet to track all current instances, enabling global batch operations such as graceful shutdown.
Interaction With Other System Components
MCP Server Configuration (mcp_server):
Passed into the session; contains the server URL, headers, server type (SSE or Streamable HTTP), and a unique ID.
ClientSession:
Represents the active session with the MCP server, handling protocol-specific communication details. This file manages the ClientSession lifecycle and makes calls on it.
MCP Client Modules:
Imports sse_client and streamablehttp_client to establish connections depending on server type.
RAG LLM Chat Model (ToolCallSession):
The MCPToolCallSession class derives from ToolCallSession, integrating MCP tool calls into the broader LLM chat framework.
Tool & Result Types:
Uses types like Tool, CallToolResult, ListToolsResult, and TextContent from MCP and RAG modules to represent data exchanges.
Visual Diagram
classDiagram
class MCPToolCallSession {
- _mcp_server: Any
- _server_variables: dict
- _queue: asyncio.Queue
- _close: bool
- _event_loop: asyncio.AbstractEventLoop
- _thread_pool: ThreadPoolExecutor
+ __init__(mcp_server, server_variables=None)
+ get_tools(timeout=10) list~Tool~
+ tool_call(name, arguments, timeout=10) str
+ close_sync(timeout=5)
+ close() coroutine
- _mcp_server_loop() coroutine
- _process_mcp_tasks(client_session, error_message=None) coroutine
- _call_mcp_server(task_type, timeout=8, **kwargs) coroutine
- _call_mcp_tool(name, arguments, timeout=10) coroutine
- _get_tools_from_mcp_server(timeout=8) coroutine
}
MCPToolCallSession o-- ClientSession : uses
MCPToolCallSession ..> ThreadPoolExecutor : manages event loop thread
MCPToolCallSession ..> asyncio.Queue : manages tasks
Summary
mcp_tool_call_conn.py implements an asynchronous client session for MCP servers reachable over SSE or Streamable HTTP. It exposes thread-safe, synchronous interfaces for tool listing and invocation, manages the session lifecycle, and applies timeouts and error handling to each request. The module also supports closing several sessions at once and gracefully shutting down all tracked sessions, which makes it suitable for larger systems that issue dynamic tool calls over MCP.