mcp_tool_call_conn.py


Overview

The mcp_tool_call_conn.py file provides an asynchronous client session implementation for interacting with MCP (Model Context Protocol) servers over one of two transports: SSE or Streamable HTTP. It primarily defines the MCPToolCallSession class, which manages communication with an MCP server to list the available tools and call them. It also includes utility functions for closing multiple MCP sessions and for converting MCP tool metadata into OpenAI-compatible tool descriptions.

This module is the integration layer between the client-side application and MCP servers. It handles tool discovery and tool invocation, returns connection and session errors to the caller, and manages the session lifecycle.


Classes and Functions

Class: MCPToolCallSession

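MCPToolCallSession extends ToolCallSession from the rag.llm.chat_model module. It exposes synchronous tool listing and tool calls on top of an asynchronous MCP client session that runs on a dedicated event loop thread.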

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| _ALL_INSTANCES | weakref.WeakSet[MCPToolCallSession] | Class-level registry of all active instances. |
| _mcp_server | Any | MCP server configuration (with url, headers, server_type, id). |
| _server_variables | dict[str, Any] | Variables for header template substitution. |
| _queue | asyncio.Queue | Queue for MCP tasks (tool listing, tool calls). |
| _close | bool | Flag indicating if the session is closing. |
| _event_loop | asyncio.AbstractEventLoop | Dedicated event loop running in a separate thread. |
| _thread_pool | ThreadPoolExecutor | Thread pool managing the event loop thread. |
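The header template substitution mentioned for _server_variables can be illustrated with a small sketch. It assumes `${name}`-style placeholders handled by the standard library's string.Template. The placeholder syntax and the render_headers helper are assumptions for illustration, not something this document confirms about the module.

```python
from string import Template
from typing import Any


def render_headers(headers: dict[str, str], variables: dict[str, Any]) -> dict[str, str]:
    """Substitute ${name} placeholders in header names and values (hypothetical helper)."""
    rendered: dict[str, str] = {}
    for key, value in headers.items():
        # safe_substitute leaves unknown placeholders untouched instead of raising KeyError.
        new_key = Template(key).safe_substitute(variables)
        new_value = Template(value).safe_substitute(variables)
        rendered[new_key] = new_value
    return rendered


headers = render_headers({"Authorization": "Bearer ${token}"}, {"token": "abc123"})
untouched = render_headers({"X-Trace": "${missing}"}, {})
```

Using safe_substitute here is a design choice for the sketch: a missing variable yields a header with the placeholder still in it rather than an exception during session setup.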

Methods

__init__(self, mcp_server: Any, server_variables: dict[str, Any] | None = None)

Initializes the session, sets up the event loop and thread pool, and starts the MCP server communication loop.
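A minimal sketch of the setup described above, assuming the event loop is created with asyncio.new_event_loop() and kept alive by a single-worker ThreadPoolExecutor. The LoopOwner class and its method names are hypothetical and only mirror the _event_loop and _thread_pool attributes.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class LoopOwner:
    """Hypothetical stand-in for the loop/thread setup done in __init__."""

    def __init__(self) -> None:
        self._event_loop = asyncio.new_event_loop()
        self._thread_pool = ThreadPoolExecutor(max_workers=1)
        # The single worker thread is occupied by run_forever until the loop is stopped.
        self._thread_pool.submit(self._event_loop.run_forever)

    def run(self, coro, timeout: float = 5):
        # Schedule the coroutine on the background loop and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._event_loop)
        return future.result(timeout=timeout)

    def stop(self) -> None:
        # stop() must be requested from inside the loop's own thread.
        self._event_loop.call_soon_threadsafe(self._event_loop.stop)
        self._thread_pool.shutdown(wait=True)
        self._event_loop.close()


async def add(a: int, b: int) -> int:
    return a + b


owner = LoopOwner()
result = owner.run(add(2, 3))
owner.stop()
```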

async def _mcp_server_loop(self) -> None

Main asynchronous loop that establishes connection to the MCP server based on its type (SSE or Streamable HTTP), initializes the client session, and processes tasks.

async def _process_mcp_tasks(self, client_session: ClientSession | None, error_message: str | None = None) -> None

Continuously processes tasks pulled from _queue. Tasks are either tool-listing requests or tool calls.

If the client session is invalid or an error message was passed in, the error is returned through the task's result queue instead of a result.
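The task loop and its error path can be sketched as follows. The task tuple layout, the None sentinel used to stop the loop, and the callable standing in for the client session are all assumptions made for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Client = Callable[[str, dict], Awaitable[Any]]  # hypothetical stand-in for ClientSession


async def process_tasks(queue: asyncio.Queue, client: Optional[Client],
                        error_message: Optional[str] = None) -> None:
    while True:
        task = await queue.get()
        if task is None:  # hypothetical sentinel that ends the loop
            break
        name, arguments, result_queue = task
        if client is None or error_message:
            # No usable session: hand the error back to the waiting caller.
            await result_queue.put(ValueError(error_message or "no client session"))
            continue
        try:
            await result_queue.put(await client(name, arguments))
        except Exception as exc:  # failures travel back through the result queue
            await result_queue.put(exc)


async def demo():
    async def echo_client(name: str, arguments: dict) -> str:
        return f"{name} -> {arguments}"

    outcomes = []
    for client, error in ((echo_client, None), (None, "connection failed")):
        queue, results = asyncio.Queue(), asyncio.Queue()
        await queue.put(("echo", {"x": 1}, results))
        await queue.put(None)
        await process_tasks(queue, client, error)
        outcomes.append(await results.get())
    return outcomes


ok_result, error_result = asyncio.run(demo())
```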

async def _call_mcp_server(self, task_type: MCPTaskType, timeout: float | int = 8, **kwargs) -> Any

Helper to enqueue a task to _queue and wait for its result asynchronously with a timeout.
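One way to picture the enqueue-and-wait pattern is a per-call result queue combined with asyncio.wait_for. This is a sketch under assumptions: the payload shape, the call_with_timeout name, and the toy worker are illustrative only.

```python
import asyncio


async def call_with_timeout(queue: asyncio.Queue, payload: str, timeout: float = 8):
    result_queue: asyncio.Queue = asyncio.Queue()
    await queue.put((payload, result_queue))
    try:
        result = await asyncio.wait_for(result_queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"MCP task timed out after {timeout}s") from None
    if isinstance(result, Exception):
        raise result  # errors are delivered through the result queue
    return result


async def worker(queue: asyncio.Queue) -> None:
    # Toy worker: answers exactly one task by upper-casing the payload.
    payload, result_queue = await queue.get()
    await result_queue.put(payload.upper())


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue))
    answer = await call_with_timeout(queue, "ping", timeout=1)
    await worker_task
    try:
        # Nothing consumes this queue, so the wait has to time out.
        await call_with_timeout(asyncio.Queue(), "ping", timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return answer, timed_out


answer, timed_out = asyncio.run(demo())
```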

async def _call_mcp_tool(self, name: str, arguments: dict[str, Any], timeout: float | int = 10) -> str

Invokes a tool call on the MCP server using _call_mcp_server and processes the result.

async def _get_tools_from_mcp_server(self, timeout: float | int = 8) -> list[Tool]

Fetches the list of available tools from the MCP server asynchronously.

def get_tools(self, timeout: float | int = 10) -> list[Tool]

Synchronous wrapper that runs _get_tools_from_mcp_server in the event loop thread and returns the tool list.
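A synchronous wrapper of this kind is commonly built on asyncio.run_coroutine_threadsafe. The sketch below assumes that approach. fetch_tools, get_tools_sync, and the choice to return an empty list on timeout are illustrative assumptions, not the module's confirmed behavior.

```python
import asyncio
import threading
from concurrent.futures import TimeoutError as FutureTimeoutError

# Background loop standing in for the session's dedicated event loop thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def fetch_tools(delay: float) -> list[str]:
    # Hypothetical stand-in for _get_tools_from_mcp_server.
    await asyncio.sleep(delay)
    return ["search", "fetch"]


def get_tools_sync(delay: float, timeout: float = 10) -> list[str]:
    future = asyncio.run_coroutine_threadsafe(fetch_tools(delay), loop)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        future.cancel()  # ask the loop to cancel the still-running coroutine
        return []        # illustrative choice; raising an error is equally valid


tools = get_tools_sync(0.01)
slow = get_tools_sync(1.0, timeout=0.05)
```

The calling thread never touches the loop directly. It only blocks on the concurrent.futures.Future returned by run_coroutine_threadsafe, which is what makes the wrapper safe to call from ordinary synchronous code.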

@override def tool_call(self, name: str, arguments: dict[str, Any], timeout: float | int = 10) -> str

Synchronous override of the ToolCallSession method to call a tool by name with arguments.

async def close(self) -> None

Closes the session asynchronously by stopping the event loop and thread pool, and removes the instance from the global registry.

def close_sync(self, timeout: float | int = 5) -> None

Synchronous counterpart to close(). It blocks until the session has closed or the timeout elapses.


Function: close_multiple_mcp_toolcall_sessions(sessions: list[MCPToolCallSession]) -> None

Closes multiple MCPToolCallSession instances concurrently.
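Concurrent closing can be sketched with asyncio.gather, assuming each session exposes the async close() described earlier. FakeSession and close_all are hypothetical names, and the real function may coordinate the work differently.

```python
import asyncio


class FakeSession:
    """Hypothetical session with an async close(), used only for this sketch."""

    def __init__(self, fail: bool = False) -> None:
        self.closed = False
        self.fail = fail

    async def close(self) -> None:
        await asyncio.sleep(0.01)
        if self.fail:
            raise RuntimeError("close failed")
        self.closed = True


async def close_all(sessions: list[FakeSession]) -> list:
    # return_exceptions=True keeps one failing close from aborting the others.
    return await asyncio.gather(*(s.close() for s in sessions), return_exceptions=True)


sessions = [FakeSession(), FakeSession(fail=True), FakeSession()]
results = asyncio.run(close_all(sessions))
```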


Function: shutdown_all_mcp_sessions() -> None

Gracefully shuts down all active MCPToolCallSession instances tracked globally.
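The global tracking relies on a class-level weakref.WeakSet, as listed under _ALL_INSTANCES. A minimal sketch of that registry pattern follows. The Session class and shutdown_all function are simplified, hypothetical stand-ins.

```python
import weakref


class Session:
    # Weak references: a session that is garbage collected drops out on its own.
    _ALL_INSTANCES: "weakref.WeakSet[Session]" = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False
        Session._ALL_INSTANCES.add(self)

    def close_sync(self) -> None:
        self.closed = True
        Session._ALL_INSTANCES.discard(self)


def shutdown_all() -> int:
    # Snapshot first: closing a session mutates the registry being iterated.
    sessions = list(Session._ALL_INSTANCES)
    for session in sessions:
        session.close_sync()
    return len(sessions)


a, b = Session("a"), Session("b")
closed_count = shutdown_all()
```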


Function: mcp_tool_metadata_to_openai_tool(mcp_tool: Tool | dict) -> dict[str, Any]

Converts MCP tool metadata into a dictionary formatted for OpenAI tools.
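A sketch of such a conversion, assuming the MCP tool exposes name, description, and inputSchema (as attributes or dict keys) and that the target is the OpenAI function-tool layout. The to_openai_tool name and the fallback values for a missing description or schema are assumptions.

```python
from types import SimpleNamespace
from typing import Any


def to_openai_tool(mcp_tool: Any) -> dict[str, Any]:
    """Hypothetical converter; accepts a dict or an object with matching attributes."""
    if isinstance(mcp_tool, dict):
        name = mcp_tool["name"]
        description = mcp_tool.get("description")
        schema = mcp_tool.get("inputSchema")
    else:
        name = mcp_tool.name
        description = getattr(mcp_tool, "description", None)
        schema = getattr(mcp_tool, "inputSchema", None)
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description or "",
            # Fallback schema for tools that declare no input (an assumption).
            "parameters": schema or {"type": "object", "properties": {}},
        },
    }


schema = {"type": "object", "properties": {"q": {"type": "string"}}, "required": ["q"]}
from_dict = to_openai_tool({"name": "search", "description": "Web search", "inputSchema": schema})
from_obj = to_openai_tool(SimpleNamespace(name="ping", description=None, inputSchema=None))
```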


Implementation Details & Algorithms


Interaction With Other System Components


Visual Diagram

classDiagram
    class MCPToolCallSession {
        - _mcp_server: Any
        - _server_variables: dict
        - _queue: asyncio.Queue
        - _close: bool
        - _event_loop: asyncio.AbstractEventLoop
        - _thread_pool: ThreadPoolExecutor
        + __init__(mcp_server, server_variables=None)
        + get_tools(timeout=10) list~Tool~
        + tool_call(name, arguments, timeout=10) str
        + close_sync(timeout=5)
        + close() coroutine
        - _mcp_server_loop() coroutine
        - _process_mcp_tasks(client_session, error_message=None) coroutine
        - _call_mcp_server(task_type, timeout=8, **kwargs) coroutine
        - _call_mcp_tool(name, arguments, timeout=10) coroutine
        - _get_tools_from_mcp_server(timeout=8) coroutine
    }

    MCPToolCallSession o-- ClientSession : uses
    MCPToolCallSession ..> ThreadPoolExecutor : manages event loop thread
    MCPToolCallSession ..> asyncio.Queue : manages tasks

Summary

mcp_tool_call_conn.py implements an asynchronous client session for MCP servers over SSE or Streamable HTTP. It exposes synchronous interfaces (get_tools, tool_call, close_sync) on top of a dedicated event loop thread, returns connection and call errors to the caller, and provides helpers for closing several sessions at once and for shutting down every tracked session.