7 Design Patterns for Multi-Agent Collaboration: From Division to Emergence
在复杂的 AI Agent 应用中,单个 Agent 往往难以完成所有任务。多 Agent 协作模式通过组织多个 Agent 协同工作,实现远超单 Agent 的能力。本文将详细介绍 7 种核心的多 Agent 协作模式。
模式总览
1. Supervisor 模式(监督者模式)
架构
┌──────────────────┐
│ Supervisor │
│ (任务分解与调度) │
└────────┬─────────┘
│
┌───────────┼───────────┐
│ │ │
┌─────▼─────┐ ┌───▼───┐ ┌────▼────┐
│ Worker A │ │Worker B│ │Worker C │
│ (搜索) │ │(分析) │ │(写作) │
└───────────┘ └───────┘ └─────────┘
核心实现
from dataclasses import dataclass
from typing import List, Optional
import json
@dataclass
class Task:
id: str
description: str
assigned_to: Optional[str]
status: str # "pending", "in_progress", "completed", "failed"
result: Optional[str] = None
dependencies: List[str] = None
class SupervisorAgent:
"""监督者 Agent:负责任务分解、分配和协调"""
def __init__(self, llm_client, workers: dict):
self.llm = llm_client
self.workers = workers # {"name": WorkerAgent}
self.task_queue: List[Task] = []
self.completed_tasks: List[Task] = []
async def execute(self, objective: str) -> str:
"""执行目标"""
# 1. 分解任务
tasks = await self._decompose_tasks(objective)
# 2. 分配并执行
while any(t.status != "completed" for t in tasks):
# 找到可以执行的任务(依赖已完成)
ready_tasks = [
t for t in tasks
if t.status == "pending"
and all(
any(ct.id == dep and ct.status == "completed"
for ct in tasks)
for dep in (t.dependencies or [])
)
]
if not ready_tasks:
break
# 并行执行就绪的任务
import asyncio
results = await asyncio.gather(*[
self._execute_task(task) for task in ready_tasks
])
for task, result in zip(ready_tasks, results):
task.status = "completed"
task.result = result
self.completed_tasks.append(task)
# 3. 汇总结果
return await self._synthesize_results(objective, tasks)
async def _decompose_tasks(self, objective: str) -> List[Task]:
"""将目标分解为子任务"""
worker_descriptions = "\n".join(
f"- {name}: {worker.description}"
for name, worker in self.workers.items()
)
prompt = f"""将以下目标分解为可执行的子任务。
目标: {objective}
可用的 Worker:
{worker_descriptions}
返回 JSON 格式:
[{{
"id": "task_1",
"description": "任务描述",
"assigned_to": "worker_name",
"dependencies": ["task_id"]
}}]
注意:
每个任务应分配给最合适的 Worker
标注任务间的依赖关系
任务应足够具体,Worker 可以直接执行"""
response = await self.llm.generate(prompt)
task_dicts = json.loads(response)
return [
Task(
id=t["id"],
description=t["description"],
assigned_to=t["assigned_to"],
status="pending",
dependencies=t.get("dependencies", [])
)
for t in task_dicts
]
async def _execute_task(self, task: Task) -> str:
"""执行单个任务"""
worker = self.workers.get(task.assigned_to)
if not worker:
return f"错误:找不到 Worker {task.assigned_to}"
task.status = "in_progress"
# 收集依赖任务的结果作为上下文
context = ""
if task.dependencies:
dep_results = [
ct.result for ct in self.completed_tasks
if ct.id in task.dependencies
]
context = "\n".join(dep_results)
return await worker.execute(task.description, context)
async def _synthesize_results(self, objective: str,
tasks: List[Task]) -> str:
"""汇总所有任务结果"""
results_text = "\n\n".join(
f"## {t.description}\n{t.result}"
for t in tasks if t.result
)
prompt = f"""基于以下各子任务的结果,汇总生成最终回答。
目标: {objective}
子任务结果:
{results_text}
请提供一个完整、连贯的最终回答:"""
return await self.llm.generate(prompt)
class WorkerAgent:
"""Worker Agent:执行具体任务"""
def __init__(self, name: str, description: str,
llm_client, tools: list = None):
self.name = name
self.description = description
self.llm = llm_client
self.tools = tools or []
async def execute(self, task: str, context: str = "") -> str:
prompt = f"""你是一个 {self.name}。{self.description}
{"相关上下文:\n" + context if context else ""}
请执行以下任务:{task}"""
return await self.llm.generate(prompt)
2. Debate 模式(辩论模式)
架构
┌────────────────────────────────────────┐
│ Debate Arena │
│ │
│ ┌──────────┐ ┌──────────┐ ┌───────┐│
│ │Proponent │↔│Opponent │↔│Judge ││
│ │ (支持方) │ │ (反对方) │ │(裁判) ││
│ └──────────┘ └──────────┘ └───────┘│
│ ↑ ↑ ↑ │
│ └──────────────┴────────────┘ │
│ 多轮辩论 │
└────────────────────────────────────────┘
核心实现
class DebateAgent:
"""辩论 Agent 系统"""
def __init__(self, llm_client):
self.llm = llm_client
self.proponent = DebateParticipant("支持方", llm_client)
self.opponent = DebateParticipant("反对方", llm_client)
self.judge = JudgeAgent(llm_client)
async def debate(self, topic: str, rounds: int = 3) -> dict:
"""执行辩论"""
debate_history = []
# 初始陈述
proponent_arg = await self.proponent.initial_argument(topic, "support")
opponent_arg = await self.opponent.initial_argument(topic, "oppose")
debate_history.append({
"round": 0,
"proponent": proponent_arg,
"opponent": opponent_arg
})
# 多轮辩论
for round_num in range(1, rounds + 1):
proponent_arg = await self.proponent.rebuttal(
topic, opponent_arg, debate_history
)
opponent_arg = await self.opponent.rebuttal(
topic, proponent_arg, debate_history
)
debate_history.append({
"round": round_num,
"proponent": proponent_arg,
"opponent": opponent_arg
})
# 裁判判决
verdict = await self.judge.evaluate(topic, debate_history)
return {
"topic": topic,
"debate_history": debate_history,
"verdict": verdict
}
class DebateParticipant:
"""辩论参与者"""
def __init__(self, stance: str, llm_client):
self.stance = stance
self.llm = llm_client
async def initial_argument(self, topic: str, position: str) -> str:
prompt = f"""你是一位辩论选手,立场是{self.stance}。
辩题: {topic}
你的立场: {'支持' if position == 'support' else '反对'}
请提供你的初始论点,要求:
提出 2-3 个核心论据
每个论据有事实或逻辑支撑
语言有说服力但保持理性"""
return await self.llm.generate(prompt)
async def rebuttal(self, topic: str, opponent_arg: str,
history: list) -> str:
history_text = "\n\n".join(
f"第 {h['round']} 轮:\n支持方: {h['proponent']}\n反对方: {h['opponent']}"
for h in history
)
prompt = f"""辩题: {topic}
你的立场: {self.stance}
辩论历史:
{history_text}
对方最新论点:
{opponent_arg}
请反驳对方的论点,并加强你的立场。要求:
直接回应对方的关键论据
指出对方论证的漏洞
补充新的支持论据"""
return await self.llm.generate(prompt)
class JudgeAgent:
"""裁判 Agent"""
def __init__(self, llm_client):
self.llm = llm_client
async def evaluate(self, topic: str, debate_history: list) -> dict:
debate_text = "\n\n".join(
f"第 {h['round']} 轮:\n支持方: {h['proponent']}\n反对方: {h['opponent']}"
for h in debate_history
)
prompt = f"""你是一位公正的辩论裁判。
辩题: {topic}
完整辩论记录:
{debate_text}
请评估双方表现并给出判决。返回 JSON:
{{
"winner": "proponent|opponent|tie",
"reasoning": "判决理由",
"proponent_score": 0-10,
"opponent_score": 0-10,
"key_turning_points": ["关键转折点"],
"strengths": {{"proponent": "...", "opponent": "..."}},
"weaknesses": {{"proponent": "...", "opponent": "..."}}
}}"""
response = await self.llm.generate(prompt)
return json.loads(response)
3. Voting 模式(投票模式)
架构
┌─────────────┐
│ Voter 1 │
└──────┬──────┘
│
┌────────┐ ┌───▼───┐ ┌────────┐
│Voter 2 │→│Aggre- │←│Voter 3 │
└────────┘ │gator │ └────────┘
└───┬───┘
│
┌─────▼─────┐
│ 最终结果 │
└───────────┘
核心实现
class VotingSystem:
"""多 Agent 投票系统"""
def __init__(self, llm_client, num_voters: int = 3):
self.llm = llm_client
self.voters = [
VotingAgent(f"Voter_{i}", llm_client, perspective)
for i, perspective in enumerate([
"你是一个注重逻辑和事实的评估者",
"你是一个注重创新和可行性的评估者",
"你是一个注重用户体验和可读性的评估者"
][:num_voters])
]
async def vote(self, question: str, options: list,
criteria: list = None) -> dict:
"""执行投票"""
# 并行收集投票
import asyncio
votes = await asyncio.gather(*[
voter.cast_vote(question, options, criteria)
for voter in self.voters
])
# 聚合结果
return self._aggregate_votes(votes, options)
def _aggregate_votes(self, votes: list, options: list) -> dict:
"""聚合投票结果"""
vote_counts = {opt: 0 for opt in options}
all_reasoning = []
for vote in votes:
vote_counts[vote["choice"]] += 1
all_reasoning.append({
"voter": vote["voter"],
"choice": vote["choice"],
"reasoning": vote["reasoning"],
"confidence": vote["confidence"]
})
winner = max(vote_counts, key=vote_counts.get)
consensus = vote_counts[winner] / len(votes)
return {
"winner": winner,
"vote_counts": vote_counts,
"consensus": consensus,
"all_reasoning": all_reasoning,
"unanimous": consensus == 1.0
}
class VotingAgent:
"""投票 Agent"""
def __init__(self, name: str, llm_client, perspective: str):
self.name = name
self.llm = llm_client
self.perspective = perspective
async def cast_vote(self, question: str, options: list,
criteria: list = None) -> dict:
options_text = "\n".join(f"{i+1}. {opt}" for i, opt in enumerate(options))
criteria_text = "\n".join(f"- {c}" for c in (criteria or []))
prompt = f"""{self.perspective}
问题: {question}
选项:
{options_text}
{"评估标准:\n" + criteria_text if criteria else ""}
请从你的视角评估每个选项,并投票选出最佳选项。返回 JSON:
{{
"choice": "选项内容",
"reasoning": "投票理由",
"confidence": 0.0-1.0,
"evaluations": {{"选项1": "评价", "选项2": "评价"}}
}}"""
response = await self.llm.generate(prompt)
result = json.loads(response)
result["voter"] = self.name
return result
4. Competition 模式(竞争模式)
架构
┌──────────┐ ┌──────────┐ ┌──────────┐
│Contender │ │Contender │ │Contender │
│ A │ │ B │ │ C │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────┐
│ Evaluator │
│ (评估最优方案) │
└──────────────────────────────────────┘
核心实现
class CompetitionSystem:
"""竞争系统:多个 Agent 竞争生成最优方案"""
def __init__(self, llm_client, num_contenders: int = 3):
self.llm = llm_client
self.contenders = [
CreativeContender(f"Contender_{i}", llm_client, style)
for i, style in enumerate([
"保守稳健", "创新激进", "平衡折中"
][:num_contenders])
]
self.evaluator = CompetitionEvaluator(llm_client)
async def compete(self, challenge: str, evaluation_criteria: list) -> dict:
"""执行竞争"""
# 并行生成方案
import asyncio
solutions = await asyncio.gather(*[
contender.generate_solution(challenge)
for contender in self.contenders
])
# 评估所有方案
evaluation = await self.evaluator.evaluate(
challenge, solutions, evaluation_criteria
)
return {
"challenge": challenge,
"solutions": [
{"contender": c.name, "solution": s}
for c, s in zip(self.contenders, solutions)
],
"evaluation": evaluation
}
class CreativeContender:
"""创意竞争者"""
def __init__(self, name: str, llm_client, style: str):
self.name = name
self.llm = llm_client
self.style = style
async def generate_solution(self, challenge: str) -> str:
prompt = f"""你是一个{self.style}风格的问题解决者。
挑战: {challenge}
请提供你的解决方案。发挥你的{self.style}风格特色。"""
return await self.llm.generate(prompt)
class CompetitionEvaluator:
"""竞争评估者"""
def __init__(self, llm_client):
self.llm = llm_client
async def evaluate(self, challenge: str, solutions: list,
criteria: list) -> dict:
solutions_text = "\n\n".join(
f"方案 {i+1}:\n{sol}" for i, sol in enumerate(solutions)
)
criteria_text = "\n".join(f"- {c}" for c in criteria)
prompt = f"""挑战: {challenge}
评估标准:
{criteria_text}
方案:
{solutions_text}
请评估所有方案并选出最优。返回 JSON:
{{
"scores": [{{"solution": 1, "score": 0-10, "feedback": "..."}}, ...],
"winner": 1,
"reasoning": "选择理由",
"synthesized_best": "综合各方案优点的改进版本"
}}"""
response = await self.llm.generate(prompt)
return json.loads(response)
5. Pipeline 模式(流水线模式)
架构
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Stage1 │───▶│ Stage2 │───▶│ Stage3 │───▶│ Stage4 │
│ (采集) │ │ (清洗) │ │ (分析) │ │ (输出) │
└────────┘ └────────┘ └────────┘ └────────┘
核心实现
from abc import ABC, abstractmethod
class PipelineStage(ABC):
"""流水线阶段基类"""
@abstractmethod
async def process(self, input_data: dict) -> dict:
pass
class DataCollectionStage(PipelineStage):
"""数据采集阶段"""
def __init__(self, tools: list):
self.tools = tools
async def process(self, input_data: dict) -> dict:
query = input_data.get("query", "")
collected = []
for tool in self.tools:
result = await tool.run(query)
collected.extend(result)
return {**input_data, "raw_data": collected}
class AnalysisStage(PipelineStage):
"""分析阶段"""
def __init__(self, llm_client):
self.llm = llm_client
async def process(self, input_data: dict) -> dict:
data = input_data.get("raw_data", [])
prompt = f"分析以下数据并提取关键洞察:\n{data}"
analysis = await self.llm.generate(prompt)
return {**input_data, "analysis": analysis}
class ReportStage(PipelineStage):
"""报告生成阶段"""
def __init__(self, llm_client):
self.llm = llm_client
async def process(self, input_data: dict) -> dict:
analysis = input_data.get("analysis", "")
prompt = f"基于以下分析生成报告:\n{analysis}"
report = await self.llm.generate(prompt)
return {**input_data, "report": report}
class AgentPipeline:
"""Agent 流水线"""
def __init__(self, stages: List[PipelineStage]):
self.stages = stages
async def execute(self, initial_input: dict) -> dict:
data = initial_input
for stage in self.stages:
data = await stage.process(data)
return data
6. Hierarchical 模式(层级模式)
架构
┌─────────────┐
│ CEO Agent │
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
┌─────▼─────┐ ┌───▼────┐ ┌────▼────┐
│ Tech Lead │ │PM Lead │ │Ops Lead │
└─────┬─────┘ └───┬────┘ └────┬────┘
│ │ │
┌─────┼─────┐ ... ...
│ │ │
┌─▼─┐ ┌▼──┐ ┌▼──┐
│Dev│ │Dev│ │Dev│
└───┘ └───┘ └───┘
核心实现
class HierarchicalAgent:
"""层级 Agent 系统"""
def __init__(self, llm_client):
self.llm = llm_client
# 构建层级结构
self.ceo = ManagerAgent("CEO", llm_client, "最高决策者")
self.tech_lead = ManagerAgent("TechLead", llm_client, "技术团队负责人")
self.pm_lead = ManagerAgent("PMLead", llm_client, "产品团队负责人")
self.devs = [
WorkerAgent(f"Developer_{i}", "软件开发", llm_client)
for i in range(3)
]
# 建立报告关系
self.ceo.subordinates = [self.tech_lead, self.pm_lead]
self.tech_lead.subordinates = self.devs
async def execute(self, objective: str) -> dict:
"""执行目标"""
return await self.ceo.delegate(objective)
class ManagerAgent:
"""管理者 Agent"""
def __init__(self, name: str, llm_client, role: str):
self.name = name
self.llm = llm_client
self.role = role
self.subordinates = []
async def delegate(self, objective: str, context: str = "") -> dict:
"""委派任务"""
if not self.subordinates:
# 没有下属,自己执行
return {"result": await self._self_execute(objective)}
# 分解任务并委派
sub_objectives = await self._decompose_for_team(objective)
import asyncio
results = await asyncio.gather(*[
subordinate.delegate(sub_obj["objective"])
for subordinate, sub_obj in zip(self.subordinates, sub_objectives)
if sub_obj.get("assigned_to") == subordinate.name
])
# 汇总下属结果
return await self._synthesize(objective, results)
async def _decompose_for_team(self, objective: str) -> list:
team_desc = ", ".join(f"{s.name}({s.role})" for s in self.subordinates)
prompt = f"""作为 {self.name}({self.role}),将以下目标分解给团队成员。
目标: {objective}
团队: {team_desc}
返回 JSON: [{{"assigned_to": "...", "objective": "..."}}]"""
response = await self.llm.generate(prompt)
return json.loads(response)
async def _synthesize(self, objective: str, results: list) -> dict:
results_text = str(results)
prompt = f"综合以下团队结果,生成关于 '{objective}' 的总结:\n{results_text}"
summary = await self.llm.generate(prompt)
return {"result": summary, "sub_results": results}
7. Swarm 模式(群体模式)
架构
┌─────────────────────────────────────────────┐
│ Swarm Network │
│ │
│ ○─────○─────○ │
│ │╲ ╱│╲ ╱│ ○ = Agent │
│ │ ╲ ╱ │ ╲ ╱ │ ─ = 通信链路 │
│ │ ╳ │ ╳ │ │
│ │ ╱ ╲ │ ╱ ╲ │ 无中心节点 │
│ ○─────○─────○ 自组织协作 │
│ ╲ │ ╱ │
│ ○│○ │
└─────────────────────────────────────────────┘
核心实现
class SwarmAgent:
"""群体 Agent 系统"""
def __init__(self, llm_client, num_agents: int = 5):
self.llm = llm_client
self.agents = [
SwarmMember(f"Agent_{i}", llm_client, capabilities)
for i, capabilities in enumerate([
["搜索", "信息收集"],
["分析", "推理"],
["写作", "总结"],
["编程", "技术"],
["创意", "设计"]
][:num_agents])
]
self.message_bus = MessageBus()
async def swarm_solve(self, problem: str) -> dict:
"""群体协作解决问题"""
# 1. 广播问题
await self.message_bus.broadcast({
"type": "problem",
"content": problem
})
# 2. Agent 自组织响应
contributions = []
for agent in self.agents:
response = await agent.respond_to_problem(problem)
if response:
contributions.append({
"agent": agent.name,
"contribution": response
})
# 广播贡献
await self.message_bus.broadcast({
"type": "contribution",
"from": agent.name,
"content": response
})
# 3. 多轮协作
for round_num in range(3):
new_contributions = []
for agent in self.agents:
messages = await self.message_bus.get_messages_for(agent.name)
response = await agent.collaborate(messages)
if response:
new_contributions.append({
"agent": agent.name,
"contribution": response
})
await self.message_bus.broadcast({
"type": "refinement",
"from": agent.name,
"content": response
})
contributions.extend(new_contributions)
# 4. 汇总群体智慧
return await self._emerge_solution(problem, contributions)
async def _emerge_solution(self, problem: str,
contributions: list) -> str:
contrib_text = "\n".join(
f"[{c['agent']}]: {c['contribution']}" for c in contributions
)
prompt = f"""以下是一群 Agent 对问题的贡献,请综合这些贡献生成最终方案。
问题: {problem}
Agent 贡献:
{contrib_text}
请生成一个综合的、高质量的最终方案:"""
return await self.llm.generate(prompt)
class MessageBus:
"""消息总线"""
def __init__(self):
self.messages = []
self.subscriptions = {}
async def broadcast(self, message: dict):
self.messages.append({
**message,
"timestamp": time.time()
})
async def get_messages_for(self, agent_name: str,
since: float = 0) -> list:
return [
m for m in self.messages
if m["timestamp"] > since and m.get("from") != agent_name
]
模式选择指南
总结
多 Agent 协作模式为构建复杂 AI 系统提供了丰富的设计模式。从简单的 Pipeline 到复杂的 Swarm,每种模式都有其独特的价值。在实际项目中,往往需要组合多种模式来应对复杂的业务需求。选择合适的协作模式,是构建高效多 Agent 系统的关键。