const server = new Server({ name: "example-server", version: "1.0.0"}, { capabilities: {}});const transport = new StdioServerTransport();await server.connect(transport);
Copy
const server = new Server({ name: "example-server", version: "1.0.0"}, { capabilities: {}});const transport = new StdioServerTransport();await server.connect(transport);
Copy
const client = new Client({ name: "example-client", version: "1.0.0"}, { capabilities: {}});const transport = new StdioClientTransport({ command: "./server", args: ["--option", "value"]});await client.connect(transport);
Copy
app = Server("example-server")async with stdio_server() as streams: await app.run( streams[0], streams[1], app.create_initialization_options() )
Copy
params = StdioServerParameters( command="./server", args=["--option", "value"])async with stdio_client(params) as streams: async with ClientSession(streams[0], streams[1]) as session: await session.initialize()
SSE 传输支持服务器到客户端的流式传输,同时使用 HTTP POST 请求进行客户端到服务器的通信。
在以下情况下使用 SSE:
仅需要服务器到客户端的流式传输
在受限网络环境中工作
实现简单的更新
Copy
import express from "express";const app = express();const server = new Server({ name: "example-server", version: "1.0.0"}, { capabilities: {}});let transport: SSEServerTransport | null = null;app.get("/sse", (req, res) => { transport = new SSEServerTransport("/messages", res); server.connect(transport);});app.post("/messages", (req, res) => { if (transport) { transport.handlePostMessage(req, res); }});app.listen(3000);
Copy
import express from "express";const app = express();const server = new Server({ name: "example-server", version: "1.0.0"}, { capabilities: {}});let transport: SSEServerTransport | null = null;app.get("/sse", (req, res) => { transport = new SSEServerTransport("/messages", res); server.connect(transport);});app.post("/messages", (req, res) => { if (transport) { transport.handlePostMessage(req, res); }});app.listen(3000);
Copy
const client = new Client({ name: "example-client", version: "1.0.0"}, { capabilities: {}});const transport = new SSEClientTransport( new URL("http://localhost:3000/sse"));await client.connect(transport);
interface Transport { // Start processing messages start(): Promise<void>; // Send a JSON-RPC message send(message: JSONRPCMessage): Promise<void>; // Close the connection close(): Promise<void>; // Callbacks onclose?: () => void; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void;}
Copy
interface Transport { // Start processing messages start(): Promise<void>; // Send a JSON-RPC message send(message: JSONRPCMessage): Promise<void>; // Close the connection close(): Promise<void>; // Callbacks onclose?: () => void; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void;}
Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with anyio for wider compatibility.
Copy
@contextmanagerasync def create_transport( read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception], write_stream: MemoryObjectSendStream[JSONRPCMessage]): """ Transport interface for MCP. Args: read_stream: Stream to read incoming messages from write_stream: Stream to write outgoing messages to """ async with anyio.create_task_group() as tg: try: # Start processing messages tg.start_soon(lambda: process_messages(read_stream)) # Send messages async with write_stream: yield write_stream except Exception as exc: # Handle errors raise exc finally: # Clean up tg.cancel_scope.cancel() await write_stream.aclose() await read_stream.aclose()
Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with anyio for wider compatibility.
Copy
@contextmanagerasync def example_transport(scope: Scope, receive: Receive, send: Send): try: # Create streams for bidirectional communication read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) async def message_handler(): try: async with read_stream_writer: # Message handling logic pass except Exception as exc: logger.error(f"Failed to handle message: {exc}") raise exc async with anyio.create_task_group() as tg: tg.start_soon(message_handler) try: # Yield streams for communication yield read_stream, write_stream except Exception as exc: logger.error(f"Transport error: {exc}") raise exc finally: tg.cancel_scope.cancel() await write_stream.aclose() await read_stream.aclose() except Exception as exc: logger.error(f"Failed to initialize transport: {exc}") raise exc