A2A协议

Agent-to-Agent通信协议,实现多智能体协作

什么是A2A协议?
┌─────────────────────────────────────────────────────────────────────────┐ │ A2A (Agent-to-Agent) 协议 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 发布者:Anthropic(2025年) │ │ 定位:AI智能体之间的通信和协作标准化框架 │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Agent A │◄──►│ Agent B │◄──►│ Agent C │◄──►│ Agent D │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ └───────────────┴───────┬───────┴───────────────┘ │ │ │ │ │ ┌───────┴───────┐ │ │ │ Agent Registry │ │ │ │ (服务发现) │ │ │ └───────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘

核心目标

🔍 智能体发现

动态发现可用智能体服务

📋 任务委托

智能体间任务分配

📊 结果传递

任务结果共享

🎼 协作编排

多智能体工作流

A2A vs MCP

维度 A2A MCP
通信对象 智能体 ↔ 智能体 智能体 ↔ 工具/资源
主要功能 任务委托、结果共享 工具调用、资源访问
协议层次 应用层协作 工具层接入
复杂度 高(多智能体协调) 低(单一工具调用)
A2A协议架构
┌─────────────────────────────────────────────────────────────────────────┐ │ A2A 协议栈架构 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Application Layer │ │ │ │ (Agent应用逻辑、业务流程) │ │ │ ├─────────────────────────────────────────────────────────┤ │ │ │ A2A Protocol Layer │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ │ │ │ Task Mgmt │ │Message │ │ Agent Discovery │ │ │ │ │ │ (任务管理) │ │(消息传递) │ │ (服务发现) │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │ │ ├─────────────────────────────────────────────────────────┤ │ │ │ Transport Layer (HTTP/WebSocket) │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘

核心消息类型

task_request

请求其他智能体执行任务

包含:任务ID、参数、期望结果

task_response

返回任务执行结果

包含:结果数据、状态、错误信息

task_update

任务执行进度更新

包含:当前状态、百分比、完成度

task_cancel

取消正在执行的任务

包含:取消原因、任务ID

智能体状态管理

IDLE
接收任务
BUSY
执行任务
COMPLETED/FAILED

任务生命周期

1. 创建

任务定义

2. 调度

分配智能体

3. 执行

处理任务

4. 监控

跟踪进度

5. 完成

返回结果

使用场景

🏢 企业工作流

多部门智能体协作处理复杂业务流程

🔬 研究分析

不同专业智能体协同分析问题

🛠️ 自动化运维

监控、诊断、修复智能体配合工作

🎯 复杂任务分解

主智能体协调子智能体完成大任务

💻 代码示例

示例:多Agent协作系统
"""
A2A协议示例:多Agent协作系统
- 研究Agent:负责信息搜集
- 分析Agent:负责数据分析
- 写作Agent:负责生成报告
- 协调者:管理任务分配
"""
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import uuid

class AgentState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    task_id: str
    task_type: str
    description: str
    parameters: Dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None

@dataclass
class Message:
    message_id: str
    sender: str
    receiver: str
    content: Dict
    message_type: str

class BaseAgent:
    """A2A Agent 基类"""
    
    def __init__(self, name: str, capabilities: List[str]):
        self.agent_id = str(uuid.uuid4())[:8]
        self.name = name
        self.capabilities = capabilities
        self.state = AgentState.IDLE
        self.inbox: List[Message] = []
    
    def receive_message(self, message: Message):
        self.inbox.append(message)
    
    def send_message(self, receiver: str, content: Dict, msg_type: str):
        message = Message(
            message_id=str(uuid.uuid4()),
            sender=self.name,
            receiver=receiver,
            content=content,
            message_type=msg_type
        )
        return message
    
    def can_handle(self, task_type: str) -> bool:
        return task_type in self.capabilities


class ResearcherAgent(BaseAgent):
    """研究Agent"""
    
    def __init__(self):
        super().__init__("Researcher", ["research", "search", "collect"])
    
    def execute_task(self, task: Task) -> Dict:
        self.state = AgentState.BUSY
        print(f"🔍 {self.name} 正在研究: {task.description}")
        
        # 模拟研究过程
        result = {
            "findings": [
                f"关于{task.parameters.get('topic', '主题')}的关键信息1",
                f"关于{task.parameters.get('topic', '主题')}的关键信息2",
                f"关于{task.parameters.get('topic', '主题')}的关键信息3"
            ],
            "sources": ["source1", "source2", "source3"]
        }
        
        self.state = AgentState.IDLE
        return result


class AnalyzerAgent(BaseAgent):
    """分析Agent"""
    
    def __init__(self):
        super().__init__("Analyzer", ["analyze", "summarize", "compare"])
    
    def execute_task(self, task: Task) -> Dict:
        self.state = AgentState.BUSY
        print(f"📊 {self.name} 正在分析: {task.description}")
        
        data = task.parameters.get("data", [])
        result = {
            "summary": f"分析了{len(data)}条数据",
            "insights": ["洞察1", "洞察2"],
            "trends": ["趋势1", "趋势2"]
        }
        
        self.state = AgentState.IDLE
        return result


class WriterAgent(BaseAgent):
    """写作Agent"""
    
    def __init__(self):
        super().__init__("Writer", ["write", "format", "edit"])
    
    def execute_task(self, task: Task) -> Dict:
        self.state = AgentState.BUSY
        print(f"📝 {self.name} 正在写作: {task.description}")
        
        content = task.parameters.get("content", {})
        result = {
            "report": f"# {content.get('topic', '主题')}研究报告\n\n" + 
                     f"基于以下分析:{content.get('summary', '')}",
            "length": "2000字",
            "format": "markdown"
        }
        
        self.state = AgentState.IDLE
        return result


class CoordinatorAgent(BaseAgent):
    """协调者Agent - A2A协议核心"""
    
    def __init__(self, agents: List[BaseAgent]):
        super().__init__("Coordinator", ["coordinate", "delegate", "monitor"])
        self.agents = {agent.name: agent for agent in agents}
        self.task_history: Dict[str, Task] = {}
    
    def delegate_task(self, task: Task) -> str:
        """任务委派 - A2A协议核心功能"""
        print(f"🎯 {self.name} 正在委派任务: {task.task_type}")
        
        # 选择合适的Agent
        for agent_name, agent in self.agents.items():
            if agent.can_handle(task.task_type):
                task.status = TaskStatus.IN_PROGRESS
                self.task_history[task.task_id] = task
                
                # 发送任务消息
                message = self.send_message(
                    agent_name,
                    {"task_id": task.task_id, "description": task.description},
                    "task_request"
                )
                
                print(f"📤 任务已委派给 {agent_name}")
                return agent_name
        
        return None
    
    def collect_result(self, task_id: str) -> Any:
        """收集任务结果"""
        task = self.task_history.get(task_id)
        if task and task.status == TaskStatus.COMPLETED:
            return task.result
        return None


# 使用示例
def main():
    # 1. 创建Agent
    researcher = ResearcherAgent()
    analyzer = AnalyzerAgent()
    writer = WriterAgent()
    coordinator = CoordinatorAgent([researcher, analyzer, writer])
    
    # 2. 协调者委派任务
    research_task = Task(
        task_id="task-001",
        task_type="research",
        description="研究AI发展趋势",
        parameters={"topic": "人工智能"}
    )
    
    # A2A协议:任务委派
    assigned_agent = coordinator.delegate_task(research_task)
    
    # 3. Agent执行任务
    agent = coordinator.agents[assigned_agent]
    result = agent.execute_task(research_task)
    
    # 4. 返回结果
    research_task.result = result
    research_task.status = TaskStatus.COMPLETED
    
    print("\n✅ 任务完成!")
    print(f"研究发现: {result['findings']}")

main()
代码说明

🤖 Agent类型

  • Researcher - 信息搜集
  • Analyzer - 数据分析
  • Writer - 内容生成
  • Coordinator - 任务协调

📡 A2A核心功能

  • 任务委派 (delegate_task)
  • 消息传递 (send_message)
  • 结果收集 (collect_result)
  • 状态管理 (AgentState)

🔄 工作流程

  • 协调者接收任务
  • 选择合适Agent
  • 委派任务
  • 收集结果