Model Context Protocol - AI工具连接标准
负责数据传输,支持多种通信方式
AI可调用的函数或API
可访问的数据和文档
预定义的提示模板
LLM生成过程中的控制
权限和沙箱机制
| 消息类型 | 方向 | 说明 |
|---|---|---|
| initialize | 双向 | 建立连接 |
| notifications/init | Client→Server | 初始化完成 |
| tools/list | Client→Server | 列出工具 |
| tools/call | Client→Server | 调用工具 |
| resources/list | Client→Server | 列出资源 |
| resources/read | Client→Server | 读取资源 |
| sampling/create | Client→Server | 请求采样 |
| 特性 | MCP | 传统方案 |
|---|---|---|
| 标准化 | 统一协议 | 各自为政 |
| 工具发现 | 动态发现 | 静态注册 |
| 扩展性 | 即插即用 | 需修改代码 |
| 安全性 | 沙箱机制 | 依赖实现 |
| 互操作 | 框架无关 | 框架绑定 |
AI连接搜索引擎、向量库
访问代码库、API文档
连接数据库、BI工具
调用第三方API
读取PDF、Word等
编排复杂任务
"""
MCP协议示例:创建自定义工具Server
- 文件操作工具
- 数据库查询工具
- API调用工具
"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
import json
import sqlite3
import requests
from datetime import datetime
# 1. 定义工具Schema (符合MCP规范)
class ToolSchema(BaseModel):
name: str
description: str
inputSchema: Dict
class ToolResult(BaseModel):
content: Any
isError: bool = False
# 2. 文件操作工具
class FileOperationTool:
"""文件操作工具 - 符合MCP Tool规范"""
@staticmethod
def get_schema() -> Dict:
return {
"name": "file_operations",
"description": "读取、写入、列出文件",
"inputSchema": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["read", "write", "list", "delete"],
"description": "操作类型"
},
"path": {"type": "string", "description": "文件路径"},
"content": {"type": "string", "description": "写入内容"}
},
"required": ["operation", "path"]
}
}
def execute(self, operation: str, path: str, content: str = None) -> ToolResult:
try:
if operation == "read":
with open(path, "r", encoding="utf-8") as f:
return ToolResult(content=f.read())
elif operation == "write":
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return ToolResult(content=f"已写入: {path}")
elif operation == "list":
import os
files = os.listdir(path)
return ToolResult(content=json.dumps(files))
elif operation == "delete":
import os
if os.path.exists(path):
os.remove(path)
return ToolResult(content=f"已删除: {path}")
return ToolResult(content=f"文件不存在: {path}")
except Exception as e:
return ToolResult(content=str(e), isError=True)
# 3. 数据库查询工具
class DatabaseQueryTool:
"""数据库查询工具 - 符合MCP Tool规范"""
def __init__(self, db_path: str = ":memory:"):
self.conn = sqlite3.connect(db_path)
self.create_sample_data()
def create_sample_data(self):
cursor = self.conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
age INTEGER
)
""")
cursor.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (1, '张三', 'zhangsan@example.com', 25)")
cursor.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (2, '李四', 'lisi@example.com', 30)")
self.conn.commit()
def get_schema(self) -> Dict:
return {
"name": "database_query",
"description": "执行SQL查询",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL查询语句"},
"params": {"type": "array", "description": "查询参数"}
},
"required": ["query"]
}
}
def execute(self, query: str, params: List = None) -> ToolResult:
try:
cursor = self.conn.cursor()
cursor.execute(query, params or [])
if query.strip().upper().startswith("SELECT"):
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return ToolResult(content=json.dumps({
"columns": columns,
"rows": results,
"count": len(results)
}))
else:
self.conn.commit()
return ToolResult(content=f"影响行数: {cursor.rowcount}")
except Exception as e:
return ToolResult(content=str(e), isError=True)
# 4. API调用工具
class APICallTool:
"""API调用工具 - 符合MCP Tool规范"""
def get_schema(self) -> Dict:
return {
"name": "api_call",
"description": "发送HTTP请求",
"inputSchema": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "API地址"},
"method": {"type": "string", "enum": ["GET", "POST", "PUT", "DELETE"]},
"headers": {"type": "object", "description": "请求头"},
"data": {"type": "object", "description": "请求数据"}
},
"required": ["url", "method"]
}
}
def execute(self, url: str, method: str, headers: Dict = None, data: Dict = None) -> ToolResult:
try:
response = requests.request(
method=method,
url=url,
headers=headers or {},
json=data
)
return ToolResult(content=json.dumps({
"status_code": response.status_code,
"response": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
}))
except Exception as e:
return ToolResult(content=str(e), isError=True)
# 5. MCP Server实现
class MCPServer:
"""MCP Server - 管理所有工具"""
def __init__(self):
self.tools = [
FileOperationTool(),
DatabaseQueryTool(),
APICallTool()
]
self.tool_schemas = [t.get_schema() for t in self.tools]
def list_tools(self) -> List[Dict]:
"""返回所有工具Schema - MCP协议"""
return self.tool_schemas
def call_tool(self, name: str, arguments: Dict) -> Dict:
"""调用工具 - MCP协议核心"""
for tool in self.tools:
if tool.get_schema()["name"] == name:
result = tool.execute(**arguments)
return {
"content": [{"type": "text", "text": result.content}],
"isError": result.isError
}
return {"error": f"Tool not found: {name}", "isError": True}
# 使用示例
def main():
# 1. 创建MCP Server
server = MCPServer()
# 2. 列出可用工具 (MCP协议: tools/list)
print("📋 可用工具:")
for tool in server.list_tools():
print(f" - {tool['name']}: {tool['description']}")
# 3. 调用文件读取工具 (MCP协议: tools/call)
print("\n📄 读取文件:")
result = server.call_tool("file_operations", {
"operation": "list",
"path": "."
})
print(f" 结果: {result}")
# 4. 调用数据库查询工具
print("\n🗄️ 查询数据库:")
result = server.call_tool("database_query", {
"query": "SELECT * FROM users WHERE age > ?",
"params": [25]
})
print(f" 结果: {result}")
main()