文章摘要 FakeGPT
加载中...|
概述
随着任务复杂度的增加,单个 Agent 往往难以应对。多 Agent 系统通过让多个专门化的 Agent 协作,可以解决更复杂的问题。本文将深入探讨多 Agent 协作模式、规划机制和主流框架。
为什么需要多 Agent
单 Agent 的局限
text
┌─────────────────────────────────────────────────────────┐
│ 单 Agent vs 多 Agent 对比 │
├─────────────────────────────────────────────────────────┤
│ │
│ 单 Agent: │
│ • 需要掌握所有技能 │
│ • 难以并行处理 │
│ • 容易出错(牵一发而动全身) │
│ • 难以扩展 │
│ • 上下文窗口限制 │
│ │
│ 多 Agent: │
│ • 专业化分工(每个 Agent 专注一个领域) │
│ • 并行执行任务 │
│ • 错误隔离(一个出错不影响其他) │
│ • 易于扩展 │
│ • 分布式处理 │
│ │
└─────────────────────────────────────────────────────────┘多 Agent 的优势
| 优势 | 说明 | 示例 |
|---|---|---|
| 专业化 | 每个 Agent 专注于特定领域 | 代码 Agent、写作 Agent、分析 Agent |
| 并行化 | 同时执行多个任务 | 同时搜索多个信息源 |
| 容错性 | 单个失败不影响整体 | 一个搜索失败,其他继续 |
| 可扩展 | 容易添加新能力 | 添加新的专业 Agent |
| 模块化 | 独立开发和测试 | 每个单元可单独优化 |
Agent 规划机制
规划的重要性
text
┌─────────────────────────────────────────────────────────┐
│ Agent 规划流程 │
├─────────────────────────────────────────────────────────┤
│ │
│ 用户任务 │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 1. 理解目标 │ │
│ │ 分析任务要求、约束条件 │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 2. 任务分解 │ │
│ │ 将复杂任务分解为子任务 │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 3. 分配资源 │ │
│ │ 决定哪些 Agent 执行哪些任务 │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 4. 执行与监控 │ │
│ │ 执行任务,检查进度,处理错误 │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 5. 结果整合 │ │
│ │ 合并各 Agent 结果,生成最终输出 │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘任务分解策略
python
from typing import List, Dict
from pydantic import BaseModel
class SubTask(BaseModel):
"""子任务"""
id: str
description: str
agent_type: str # 需要什么类型的 Agent
dependencies: List[str] # 依赖的其他任务
inputs: Dict # 输入参数
class TaskPlanner:
"""任务规划器"""
def __init__(self):
self.planner_llm = ChatOpenAI(model="gpt-4o")
def plan(self, task: str) -> List[SubTask]:
"""将任务分解为子任务"""
prompt = f"""将以下任务分解为多个子任务。
任务: {task}
请考虑:
1. 需要哪些步骤?
2. 每个步骤需要什么类型的 Agent?
3. 任务之间的依赖关系?
返回JSON格式的子任务列表。"""
response = self.planner_llm.invoke(prompt)
# 解析并返回子任务
# 简化示例
return self._parse_plan(response.content)
def _parse_plan(self, plan_text: str) -> List[SubTask]:
"""解析规划结果"""
# 实际应用中应该使用结构化输出
import json
try:
data = json.loads(plan_text)
return [SubTask(**task) for task in data.get("tasks", [])]
except:
# 返回默认规划
return [
SubTask(
id="1",
description="分析任务",
agent_type="analyst",
dependencies=[],
inputs={"task": task}
),
SubTask(
id="2",
description="执行任务",
agent_type="executor",
dependencies=["1"],
inputs={}
)
]
# 使用
planner = TaskPlanner()
subtasks = planner.plan("帮我分析一下苹果公司最近的财报")
for task in subtasks:
print(f"{task.id}: {task.description} (Agent: {task.agent_type})")动态规划
python
class AdaptivePlanner:
"""自适应规划器"""
def __init__(self):
self.planner = ChatOpenAI(model="gpt-4o")
def plan_and_execute(self, task: str, max_iterations: int = 5):
"""规划并执行,根据结果动态调整"""
context = {"task": task, "iteration": 0}
results = []
for i in range(max_iterations):
context["iteration"] = i
# 1. 生成/更新计划
plan = self._generate_plan(context)
# 2. 执行当前步骤
result = self._execute_step(plan, context)
results.append(result)
# 3. 检查是否完成
if self._is_complete(result, context):
break
# 4. 更新上下文
context["previous_results"] = results
return self._combine_results(results)
def _generate_plan(self, context: Dict) -> Dict:
"""生成或更新计划"""
previous_results = context.get("previous_results", [])
if not previous_results:
# 首次规划
prompt = f"""为以下任务制定执行计划:
任务: {context['task']}
返回当前应该执行的步骤。"""
else:
# 根据之前结果调整计划
prompt = f"""基于之前的结果调整计划:
原始任务: {context['task']}
之前的结果: {previous_results}
返回下一步应该执行的步骤。"""
response = self.planner.invoke(prompt)
return self._parse_plan(response.content)
def _execute_step(self, plan: Dict, context: Dict) -> Dict:
"""执行计划步骤"""
# 根据计划调用相应的 Agent
agent_type = plan.get("agent_type")
agent = self._get_agent(agent_type)
return agent.execute(plan.get("inputs", {}))
def _is_complete(self, result: Dict, context: Dict) -> bool:
"""检查任务是否完成"""
prompt = f"""判断以下任务是否已完成:
任务: {context['task']}
当前结果: {result}
返回 "yes" 或 "no"。"""
response = self.planner.invoke(prompt)
return "yes" in response.content.lower()多 Agent 通信模式
通信模式分类
text
┌─────────────────────────────────────────────────────────┐
│ 多 Agent 通信模式 │
├─────────────────────────────────────────────────────────┤
│ │
│ 1. 层次式 (Hierarchical) │
│ Manager Agent │
│ ├── Worker Agent 1 │
│ ├── Worker Agent 2 │
│ └── Worker Agent 3 │
│ │
│ 2. 平等式 (Flat) │
│ Agent 1 ←→ Agent 2 ←→ Agent 3 │
│ │
│ 3. 网络式 (Network) │
│ ┌─────┐ ┌─────┐ │
│ │ A1 │────▶│ A2 │ │
│ └─────┘ └─────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────┐ ┌─────┐ │
│ │ A3 │────▶│ A4 │ │
│ └─────┘ └─────┘ │
│ │
│ 4. 议会式 (Consensus) │
│ 所有 Agent 讨论并投票 │
│ │
└─────────────────────────────────────────────────────────┘层次式协作
python
from abc import ABC, abstractmethod
from typing import Any, Dict
class Agent(ABC):
"""Agent 基类"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def execute(self, task: str, context: Dict = None) -> Dict:
"""执行任务"""
pass
class ManagerAgent(Agent):
"""管理 Agent"""
def __init__(self, name: str, workers: List[Agent]):
super().__init__(name)
self.workers = workers
self.planner = ChatOpenAI(model="gpt-4o")
def execute(self, task: str, context: Dict = None) -> Dict:
"""分配任务并协调执行"""
context = context or {}
# 1. 任务分解
subtasks = self._decompose_task(task)
# 2. 分配给 Worker
worker_results = {}
for subtask in subtasks:
worker = self._find_worker(subtask["agent_type"])
if worker:
result = worker.execute(subtask["task"], context)
worker_results[subtask["id"]] = result
# 3. 整合结果
return self._integrate_results(task, worker_results)
def _decompose_task(self, task: str) -> List[Dict]:
"""分解任务"""
prompt = f"""将任务分解并分配给合适的 Agent:
可用 Agent 类型:
- researcher: 研究信息
- writer: 撰写内容
- analyst: 分析数据
- coder: 编写代码
任务: {task}
返回子任务列表,包含 agent_type 和 task 字段。"""
response = self.planner.invoke(prompt)
# 简化:返回解析后的子任务
return self._parse_subtasks(response.content)
def _find_worker(self, agent_type: str) -> Agent:
"""找到合适的 Worker"""
for worker in self.workers:
if worker.agent_type == agent_type:
return worker
return None
def _integrate_results(self, task: str, results: Dict) -> Dict:
"""整合结果"""
prompt = f"""整合以下 Agent 的执行结果:
原始任务: {task}
执行结果: {json.dumps(results, ensure_ascii=False)}
生成最终输出。"""
response = self.planner.invoke(prompt)
return {"final_output": response.content, "details": results}
class WorkerAgent(Agent):
"""Worker Agent"""
def __init__(self, name: str, agent_type: str):
super().__init__(name)
self.agent_type = agent_type
self.llm = ChatOpenAI(model="gpt-4o")
def execute(self, task: str, context: Dict = None) -> Dict:
"""执行分配的任务"""
prompt = f"""你是 {self.agent_type} Agent。执行以下任务:
任务: {task}
上下文: {context or '无'}
返回执行结果。"""
response = self.llm.invoke(prompt)
return {
"agent": self.name,
"type": self.agent_type,
"result": response.content
}
# 使用
researcher = WorkerAgent("研究专家", "researcher")
writer = WorkerAgent("写作专家", "writer")
analyst = WorkerAgent("分析专家", "analyst")
manager = ManagerAgent("项目经理", [researcher, writer, analyst])
result = manager.execute("帮我写一份关于AI发展趋势的报告")
print(result["final_output"])平等式协作
python
class FlatAgent(Agent):
"""平等 Agent,可以互相通信"""
def __init__(self, name: str, agent_type: str, peers: List['FlatAgent'] = None):
super().__init__(name)
self.agent_type = agent_type
self.peers = peers or []
self.llm = ChatOpenAI(model="gpt-4o")
self.message_queue = []
def send_message(self, recipient: str, message: str):
"""发送消息给其他 Agent"""
for peer in self.peers:
if peer.name == recipient:
peer.receive_message(self.name, message)
return True
return False
def receive_message(self, sender: str, message: str):
"""接收消息"""
self.message_queue.append({
"from": sender,
"message": message
})
def execute(self, task: str, context: Dict = None) -> Dict:
"""执行任务,可以与其他 Agent 通信"""
# 处理收到的消息
incoming_messages = self.message_queue.copy()
self.message_queue.clear()
# 构建提示
prompt = f"""你是 {self.name},类型是 {self.agent_type}。
任务: {task}
上下文: {context or '无'}
收到的消息:
{self._format_messages(incoming_messages)}
可以发送消息给其他 Agent: {[p.name for p in self.peers]}
执行任务,如果需要与其他 Agent 协作,说明要发送的消息。
格式:SEND <Agent名称> <消息内容>
返回结果。"""
response = self.llm.invoke(prompt)
result = response.content
# 处理发送消息的指令
self._process_send_commands(result)
return {
"agent": self.name,
"result": result
}
def _format_messages(self, messages: List[Dict]) -> str:
"""格式化消息"""
if not messages:
return "无"
return "\n".join([
f"- {msg['from']}: {msg['message']}"
for msg in messages
])
def _process_send_commands(self, text: str):
"""处理发送消息指令"""
import re
pattern = r'SEND (\w+) (.+)'
matches = re.findall(pattern, text)
for recipient, message in matches:
self.send_message(recipient, message)
class FlatMultiAgentSystem:
"""平等式多 Agent 系统"""
def __init__(self):
self.agents = []
def add_agent(self, agent: FlatAgent):
"""添加 Agent"""
self.agents.append(agent)
# 更新所有 Agent 的 peers
for a in self.agents:
a.peers = [other for other in self.agents if other != a]
def execute(self, task: str, max_rounds: int = 3) -> Dict:
"""执行任务,多轮协作"""
results = []
for round in range(max_rounds):
print(f"\n=== 第 {round + 1} 轮 ===")
round_results = {}
for agent in self.agents:
result = agent.execute(
task,
{"round": round, "previous_results": results}
)
round_results[agent.name] = result
results.append(round_results)
# 检查是否需要继续
if self._should_stop(results):
break
return self._combine_results(results)
def _should_stop(self, results: List[Dict]) -> bool:
"""判断是否应该停止"""
# 简化:如果所有 Agent 都没有发送消息,认为完成
last_round = results[-1]
for agent in self.agents:
if agent.message_queue:
return False
return True
# 使用
system = FlatMultiAgentSystem()
system.add_agent(FlatAgent("代码专家", "coder"))
system.add_agent(FlatAgent("测试专家", "tester"))
system.add_agent(FlatAgent("文档专家", "writer"))
result = system.execute("开发一个用户登录功能")
print(result)主流多 Agent 框架
框架对比
| 框架 | 语言 | 特点 | 适用场景 |
|---|---|---|---|
| AutoGen | Python | 微软开发,对话式 | 通用多 Agent |
| CrewAI | Python | 角色明确,流程清晰 | 企业流程自动化 |
| MetaGPT | Python | 模拟软件公司 | 软件开发 |
| LangGraph | Python | 状态机,可控制 | 复杂工作流 |
| AgentScope | Python | 阿里开发,支持多模态 | 多模态应用 |
AutoGen
python
# pip install pyautogen
import autogen
config_list = [
{
"model": "gpt-4o",
"api_key": "your-api-key"
}
]
# 定义 Agent
assistant = autogen.AssistantAgent(
name="assistant",
llm_config={
"config_list": config_list,
"temperature": 0
}
)
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config={
"work_dir": "coding",
"use_docker": False
}
)
# 创建对话
user_proxy.initiate_chat(
assistant,
message="帮我写一个Python快排算法"
)CrewAI
python
# pip install crewai
from crewai import Agent, Task, Crew
# 定义 Agent
researcher = Agent(
role="研究员",
goal="研究最新技术趋势",
backstory="你是一位经验丰富的技术研究员",
llm=ChatOpenAI(model="gpt-4o"),
verbose=True
)
writer = Agent(
role="技术作家",
goal="撰写清晰的技术文章",
backstory="你擅长将复杂的技术概念解释清楚",
llm=ChatOpenAI(model="gpt-4o"),
verbose=True
)
# 定义任务
research_task = Task(
description="研究 AI Agent 的发展趋势",
agent=researcher
)
write_task = Task(
description="根据研究结果撰写一篇技术文章",
agent=writer,
context=[research_task] # 依赖研究任务
)
# 创建 Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
verbose=True
)
# 执行
result = crew.kickoff()
print(result)LangGraph
python
# pip install langgraph
from langgraph.graph import StateGraph, END
from typing import TypedDict
class AgentState(TypedDict):
"""Agent 状态"""
task: str
research_result: str
draft: str
final_output: str
def research_node(state: AgentState):
"""研究节点"""
researcher = ChatOpenAI(model="gpt-4o")
result = researcher.invoke(f"研究: {state['task']}")
return {"research_result": result.content}
def write_node(state: AgentState):
"""写作节点"""
writer = ChatOpenAI(model="gpt-4o")
result = writer.invoke(
f"基于以下研究撰写文章: {state['research_result']}"
)
return {"draft": result.content}
def review_node(state: AgentState):
"""审核节点"""
reviewer = ChatOpenAI(model="gpt-4o")
result = reviewer.invoke(f"审核并改进: {state['draft']}")
return {"final_output": result.content}
# 构建图
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_node("review", review_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_edge("review", END)
# 编译
app = workflow.compile()
# 执行
result = app.invoke({"task": "写一篇关于 AI 的文章"})
print(result["final_output"])实战:构建研究小组 Agent
python
from typing import List, Dict, Optional
import json
from dataclasses import dataclass
@dataclass
class Message:
"""Agent 之间的消息"""
sender: str
receiver: str
content: str
round: int
class ResearchGroupAgent:
"""研究小组 Agent"""
def __init__(self, name: str, role: str, expertise: List[str]):
self.name = name
self.role = role
self.expertise = expertise
self.llm = ChatOpenAI(model="gpt-4o")
self.memory = [] # 对话记忆
self.findings = [] # 发现/贡献
def receive(self, message: Message) -> Optional[Message]:
"""接收消息并处理"""
self.memory.append(message)
# 构建上下文
context = self._build_context()
# 决定是否需要回复
decision = self._decide_response(message.content, context)
if decision["should_respond"]:
return Message(
sender=self.name,
receiver=message.sender,
content=decision["response"],
round=message.round + 1
)
return None
def _build_context(self) -> str:
"""构建对话上下文"""
recent = self.memory[-5:] # 最近5条消息
return "\n".join([
f"{msg.sender}: {msg.content}"
for msg in recent
])
def _decide_response(self, input_msg: str, context: str) -> Dict:
"""决定是否回复以及回复内容"""
prompt = f"""你是 {self.name},角色是 {self.role}。
专业领域: {', '.join(self.expertise)}
对话历史:
{context}
新消息: {input_msg}
决定是否需要回复。如果需要:
1. 提供你的专业见解
2. 或提出相关问题
3. 或总结当前进展
返回JSON格式:
{{
"should_respond": true/false,
"response": "回复内容"
}}"""
response = self.llm.invoke(prompt)
try:
return json.loads(response.content)
except:
return {"should_respond": False, "response": ""}
class ResearchGroup:
"""研究小组"""
def __init__(self, topic: str):
self.topic = topic
self.agents = []
self.messages = []
self.round = 0
def add_agent(self, agent: ResearchGroupAgent):
"""添加 Agent"""
self.agents.append(agent)
def discuss(self, max_rounds: int = 5) -> Dict:
"""进行讨论"""
# 初始提示
initial_msg = Message(
sender="Moderator",
receiver=self.agents[0].name,
content=f"请开始讨论研究主题: {self.topic}。首先由 {self.agents[0].role} 发言。",
round=0
)
self.messages.append(initial_msg)
# 多轮讨论
for self.round in range(max_rounds):
print(f"\n=== 第 {self.round + 1} 轮讨论 ===")
round_active = False
# 收集本轮所有回复
new_messages = []
for agent in self.agents:
# 找到发给该 Agent 的消息
for msg in self.messages:
if msg.receiver == agent.name and msg.round == self.round:
response = agent.receive(msg)
if response:
new_messages.append(response)
round_active = True
print(f"{response.sender} -> {response.receiver}: {response.content[:50]}...")
# 添加新消息
self.messages.extend(new_messages)
# 检查是否应该结束
if not round_active:
print("\n讨论结束")
break
# 生成总结
return self._generate_summary()
def _generate_summary(self) -> Dict:
"""生成讨论总结"""
summary_llm = ChatOpenAI(model="gpt-4o")
conversation = "\n".join([
f"{msg.sender}: {msg.content}"
for msg in self.messages
])
prompt = f"""总结以下关于 "{self.topic}" 的讨论:
对话内容:
{conversation}
请提供:
1. 主要观点
2. 达成的共识
3. 存在的分歧
4. 建议的后续行动
返回JSON格式。"""
response = summary_llm.invoke(prompt)
try:
return json.loads(response.content)
except:
return {"summary": response.content}
# 使用
group = ResearchGroup("如何提高 AI Agent 的可靠性")
group.add_agent(ResearchGroupAgent(
"Alice",
"AI研究员",
["AI架构", "大语言模型"]
))
group.add_agent(ResearchGroupAgent(
"Bob",
"软件工程师",
["软件测试", "代码质量"]
))
group.add_agent(ResearchGroupAgent(
"Charlie",
"产品经理",
["用户体验", "需求分析"]
))
# 开始讨论
summary = group.discuss(max_rounds=5)
print("\n=== 讨论总结 ===")
print(json.dumps(summary, ensure_ascii=False, indent=2))小结
多 Agent 系统能够解决更复杂的任务:
核心要点
多 Agent 优势
- 专业化分工
- 并行执行
- 错误隔离
- 易于扩展
规划机制
- 任务分解
- 动态调整
- 依赖管理
- 进度监控
通信模式
- 层次式:Manager 协调 Worker
- 平等式:Agent 互相通信
- 网络式:复杂拓扑结构
主流框架
- AutoGen:微软的对话式框架
- CrewAI:角色明确的流程框架
- LangGraph:状态机驱动的可控框架
实践建议
- 明确每个 Agent 的角色
- 设计清晰的通信协议
- 实现错误隔离和恢复
- 记录和分析 Agent 行为
下一篇文章将介绍 OpenAI 落地实践。
赞赏博主
评论 隐私政策