📡 Agent通信协议
1. 直接消息传递
直接消息传递是最简单、最直接的Agent间通信方式。一个Agent直接向另一个Agent发送消息,接收方处理后返回结果。 这种模式类似于微服务中的同步RPC调用。
📨 直接消息传递的特点
| 特性 | 说明 |
|---|---|
| 通信拓扑 | 点对点(Peer-to-Peer) |
| 耦合度 | 紧耦合:发送方需知道接收方地址/标识 |
| 同步性 | 通常为同步请求-响应模式,也支持异步 |
| 可靠性 | 依赖底层传输协议(如HTTP/gRPC/WebSocket) |
| 适用场景 | Agent数量少、通信关系固定、需要即时响应的场景 |
| 典型实现 | REST API、gRPC、函数直接调用 |
示例(Python伪代码):
# Agent A 直接调用 Agent B
result = agent_b.process({
"task": "analyze_code",
"code": code_snippet,
"context": {"language": "python", "focus": "security"}
})
# result: {"findings": [...], "score": 85, "confidence": 0.92}
2. 发布/订阅模式(Pub/Sub)
发布/订阅模式通过一个消息中间件(Message Broker)解耦发送方和接收方。 Agent将消息发布到特定Topic,所有订阅了该Topic的Agent都会收到消息。
📡 Pub/Sub的核心概念
| 概念 | 说明 |
|---|---|
| Topic(主题) | 消息的逻辑分类,如 task.assigned, task.completed, agent.status |
| Publisher(发布者) | 向Topic发送消息的Agent,不关心谁来消费 |
| Subscriber(订阅者) | 订阅Topic的Agent,接收并处理消息 |
| Broker(中间件) | 消息路由中枢,如 Redis Pub/Sub、RabbitMQ、Kafka、NATS |
| 消息持久化 | 可选:Broker存储消息,确保离线Agent重新上线后仍能接收 |
💡 Pub/Sub的典型应用
- 事件广播:某个Agent完成关键任务后,广播通知所有关注者
- Worker池调度:发布任务消息,多个同类型Worker竞争消费
- 状态同步:Agent的状态变化通过Topic通知其他Agent
- 日志/监控:所有Agent的日志统一发布到监控Topic
3. 共享黑板模式(Blackboard)
共享黑板模式是一种基于共享知识空间的协作方式。 所有Agent可以读写一个公共的数据结构("黑板"),通过观察黑板上的信息来决定自己的行动。
📋 黑板模式工作机制
| 阶段 | 描述 |
|---|---|
| 1. 初始化 | 用户请求写入黑板,标记为待处理 |
| 2. 观察 | 各Agent持续观察黑板,匹配自己能处理的任务 |
| 3. 认领 | Agent认领任务并标记为处理中 |
| 4. 写入 | Agent将处理结果写入黑板的相关区域 |
| 5. 触发 | 新写入的信息可能触发其他Agent开始工作(级联协作) |
| 6. 完成 | 当所有相关任务完成后,最终结果从黑板提取 |
黑板的数据结构:
// 黑板示例结构
Blackboard = {
"request": { "id": "req-001", "content": "...", "status": "processing" },
"analysis": {
"security": { "status": "done", "findings": [...], "by": "agent-security" },
"performance": { "status": "pending" }
},
"decisions": [],
"context": { "project_lang": "python", "deadline": "2026-06-01" }
}
4. Agent间消息格式
标准化的消息格式是Agent之间高效通信的基础。以下是推荐的通用消息结构:
📦 推荐消息格式
{
"header": {
"message_id": "uuid-1234", // 全局唯一消息ID
"timestamp": "2026-05-26T10:00:00Z", // ISO 8601时间戳
"sender": "agent-orchestrator", // 发送方Agent ID
"receiver": "agent-code-reviewer", // 接收方Agent ID(直接消息时)/ topic(Pub/Sub时)
"message_type": "task_request", // 消息类型
"protocol_version": "1.0", // 协议版本
"priority": "high" // 优先级: low/normal/high/critical
},
"body": {
"task_type": "code_review", // 任务类型
"payload": { ... }, // 任务具体数据
"context": { ... }, // 共享上下文
"expected_format": "json", // 期望的响应格式
"timeout_seconds": 60 // 超时时间
},
"trace": {
"correlation_id": "req-5678", // 关联ID,用于追踪完整请求链路
"parent_message_id": "uuid-999", // 父消息ID(如果此消息是响应)
"span_id": "span-001" // 分布式追踪Span ID
}
}
5. 通信延迟与可靠性
多Agent系统中,通信的延迟和可靠性直接影响系统整体性能和用户体验。
| 挑战 | 影响 | 解决方案 |
|---|---|---|
| 网络延迟 | Agent等待响应的时间增加,整体任务耗时变长 | 就近部署、连接池、批量通信、异步化 |
| 消息丢失 | 子任务被遗漏,导致最终结果不完整 | 消息持久化、ACK确认机制、至少一次投递保证 |
| 消息重复 | 同一子任务被多次执行,浪费资源且可能产生副作用 | 幂等性设计、去重ID、Exactly-Once语义(代价较高) |
| 消息乱序 | 有依赖关系的任务执行顺序错乱 | 消息排序(Kafka分区有序)、版本号/序列号、显式依赖声明 |
| Agent不可达 | 目标Agent下线或过载,消息发送失败 | 健康检查、心跳机制、自动重试、死信队列 |
| 消息积压 | 消费者处理速度跟不上生产者,队列堆积 | 背压机制、动态扩容、优先级队列、消息TTL |
⚠️ 长链路通信的风险
在多Agent链路中,如果Agent A调用Agent B,Agent B又调用Agent C(链式调用),延迟和故障风险会逐级放大。每增加一跳,整体延迟增加2-5秒(LLM推理耗时),且任一跳失败都可能导致整条链路失败。建议:
- 限制调用链深度(建议不超过3跳)
- 设置每跳的超时(默认30-60秒)
- 实现链路级别的熔断和降级
6. 通信模式对比
| 对比维度 | 直接消息传递 | 发布/订阅 | 共享黑板 | 对话式 |
|---|---|---|---|---|
| 耦合度 | 紧耦合 | 松耦合 | 松耦合 | 松耦合 |
| 延迟 | 低 | 中 | 中-高 | 高 |
| 可扩展性 | 差 | 好 | 好 | 中 |
| 可靠性 | 中(依赖重试) | 高(Broker保证) | 中(需自己实现) | 低 |
| 消息格式 | 结构化(JSON/Proto) | 结构化或半结构化 | 自由格式 | 自然语言 |
| 调试难度 | 低 | 中 | 高 | 高 |
| 实现复杂度 | 低 | 中 | 中-高 | 低 |
| Token消耗 | 低 | 低-中 | 中 | 高 |
| 典型场景 | 确定性任务链、Orchestrator→Worker | 事件驱动、广播通知、状态同步 | 协同问题求解、创意协作 | 人机混合、开放域讨论 |
| 代表框架 | AutoGen (直接调用) | 基于Celery/RabbitMQ的Agent系统 | LangGraph (State传递类似黑板) | CrewAI (对话任务) |
🎯 通信模式选择建议
- 启动阶段:从直接消息传递开始,简单可控,快速验证核心逻辑
- 规模增长:Agent数量超过5个时,考虑引入Pub/Sub解耦
- 复杂协作:当Agent需要基于知识空间协作时,引入黑板模式
- 混合使用:大多数生产系统同时使用多种模式:控制面用直接消息,数据面用Pub/Sub,共享状态用黑板