Agent-to-Agent通信协议,实现多智能体协作
动态发现可用智能体服务
智能体间任务分配
任务结果共享
多智能体工作流
| 维度 | A2A | MCP |
|---|---|---|
| 通信对象 | 智能体 ↔ 智能体 | 智能体 ↔ 工具/资源 |
| 主要功能 | 任务委托、结果共享 | 工具调用、资源访问 |
| 协议层次 | 应用层协作 | 工具层接入 |
| 复杂度 | 高(多智能体协调) | 低(单一工具调用) |
请求其他智能体执行任务
返回任务执行结果
任务执行进度更新
取消正在执行的任务
任务定义
分配智能体
处理任务
跟踪进度
返回结果
多部门智能体协作处理复杂业务流程
不同专业智能体协同分析问题
监控、诊断、修复智能体配合工作
主智能体协调子智能体完成大任务
"""
A2A协议示例:多Agent协作系统
- 研究Agent:负责信息搜集
- 分析Agent:负责数据分析
- 写作Agent:负责生成报告
- 协调者:管理任务分配
"""
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import uuid
class AgentState(Enum):
IDLE = "idle"
BUSY = "busy"
OFFLINE = "offline"
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
task_id: str
task_type: str
description: str
parameters: Dict[str, Any]
status: TaskStatus = TaskStatus.PENDING
result: Any = None
@dataclass
class Message:
message_id: str
sender: str
receiver: str
content: Dict
message_type: str
class BaseAgent:
"""A2A Agent 基类"""
def __init__(self, name: str, capabilities: List[str]):
self.agent_id = str(uuid.uuid4())[:8]
self.name = name
self.capabilities = capabilities
self.state = AgentState.IDLE
self.inbox: List[Message] = []
def receive_message(self, message: Message):
self.inbox.append(message)
def send_message(self, receiver: str, content: Dict, msg_type: str):
message = Message(
message_id=str(uuid.uuid4()),
sender=self.name,
receiver=receiver,
content=content,
message_type=msg_type
)
return message
def can_handle(self, task_type: str) -> bool:
return task_type in self.capabilities
class ResearcherAgent(BaseAgent):
"""研究Agent"""
def __init__(self):
super().__init__("Researcher", ["research", "search", "collect"])
def execute_task(self, task: Task) -> Dict:
self.state = AgentState.BUSY
print(f"🔍 {self.name} 正在研究: {task.description}")
# 模拟研究过程
result = {
"findings": [
f"关于{task.parameters.get('topic', '主题')}的关键信息1",
f"关于{task.parameters.get('topic', '主题')}的关键信息2",
f"关于{task.parameters.get('topic', '主题')}的关键信息3"
],
"sources": ["source1", "source2", "source3"]
}
self.state = AgentState.IDLE
return result
class AnalyzerAgent(BaseAgent):
"""分析Agent"""
def __init__(self):
super().__init__("Analyzer", ["analyze", "summarize", "compare"])
def execute_task(self, task: Task) -> Dict:
self.state = AgentState.BUSY
print(f"📊 {self.name} 正在分析: {task.description}")
data = task.parameters.get("data", [])
result = {
"summary": f"分析了{len(data)}条数据",
"insights": ["洞察1", "洞察2"],
"trends": ["趋势1", "趋势2"]
}
self.state = AgentState.IDLE
return result
class WriterAgent(BaseAgent):
"""写作Agent"""
def __init__(self):
super().__init__("Writer", ["write", "format", "edit"])
def execute_task(self, task: Task) -> Dict:
self.state = AgentState.BUSY
print(f"📝 {self.name} 正在写作: {task.description}")
content = task.parameters.get("content", {})
result = {
"report": f"# {content.get('topic', '主题')}研究报告\n\n" +
f"基于以下分析:{content.get('summary', '')}",
"length": "2000字",
"format": "markdown"
}
self.state = AgentState.IDLE
return result
class CoordinatorAgent(BaseAgent):
"""协调者Agent - A2A协议核心"""
def __init__(self, agents: List[BaseAgent]):
super().__init__("Coordinator", ["coordinate", "delegate", "monitor"])
self.agents = {agent.name: agent for agent in agents}
self.task_history: Dict[str, Task] = {}
def delegate_task(self, task: Task) -> str:
"""任务委派 - A2A协议核心功能"""
print(f"🎯 {self.name} 正在委派任务: {task.task_type}")
# 选择合适的Agent
for agent_name, agent in self.agents.items():
if agent.can_handle(task.task_type):
task.status = TaskStatus.IN_PROGRESS
self.task_history[task.task_id] = task
# 发送任务消息
message = self.send_message(
agent_name,
{"task_id": task.task_id, "description": task.description},
"task_request"
)
print(f"📤 任务已委派给 {agent_name}")
return agent_name
return None
def collect_result(self, task_id: str) -> Any:
"""收集任务结果"""
task = self.task_history.get(task_id)
if task and task.status == TaskStatus.COMPLETED:
return task.result
return None
# 使用示例
def main():
# 1. 创建Agent
researcher = ResearcherAgent()
analyzer = AnalyzerAgent()
writer = WriterAgent()
coordinator = CoordinatorAgent([researcher, analyzer, writer])
# 2. 协调者委派任务
research_task = Task(
task_id="task-001",
task_type="research",
description="研究AI发展趋势",
parameters={"topic": "人工智能"}
)
# A2A协议:任务委派
assigned_agent = coordinator.delegate_task(research_task)
# 3. Agent执行任务
agent = coordinator.agents[assigned_agent]
result = agent.execute_task(research_task)
# 4. 返回结果
research_task.result = result
research_task.status = TaskStatus.COMPLETED
print("\n✅ 任务完成!")
print(f"研究发现: {result['findings']}")
main()