Back to Blog
Multi-AgentPatternsArchitecture

7 Design Patterns for Multi-Agent Collaboration: From Division to Emergence

2026-04-0113 min

在复杂的 AI Agent 应用中,单个 Agent 往往难以完成所有任务。多 Agent 协作模式通过组织多个 Agent 协同工作,实现远超单 Agent 的能力。本文将详细介绍 7 种核心的多 Agent 协作模式。

模式总览

| 模式 | 核心思想 | 适用场景 | 复杂度 |
|------|---------|---------|--------|
| Supervisor | 中央调度 | 任务分解与分配 | 中 |
| Debate | 辩论求真 | 决策分析 | 中 |
| Voting | 民主投票 | 内容筛选 | 低 |
| Competition | 竞争优选 | 创意生成 | 低 |
| Pipeline | 流水线作业 | 内容生产 | 低 |
| Hierarchical | 层级管理 | 大型项目 | 高 |
| Swarm | 群体协作 | 探索性任务 | 高 |

1. Supervisor 模式(监督者模式)

架构

                 ┌──────────────────┐

│ Supervisor │

│ (任务分解与调度) │

└────────┬─────────┘

┌───────────┼───────────┐

│ │ │

┌─────▼─────┐ ┌───▼───┐ ┌────▼────┐

│ Worker A │ │Worker B│ │Worker C │

│ (搜索) │ │(分析) │ │(写作) │

└───────────┘ └───────┘ └─────────┘

核心实现

from dataclasses import dataclass

from typing import List, Optional

import json

@dataclass

class Task:

id: str

description: str

assigned_to: Optional[str]

status: str # "pending", "in_progress", "completed", "failed"

result: Optional[str] = None

dependencies: List[str] = None

class SupervisorAgent:

"""监督者 Agent:负责任务分解、分配和协调"""

def __init__(self, llm_client, workers: dict):

self.llm = llm_client

self.workers = workers # {"name": WorkerAgent}

self.task_queue: List[Task] = []

self.completed_tasks: List[Task] = []

async def execute(self, objective: str) -> str:

"""执行目标"""

# 1. 分解任务

tasks = await self._decompose_tasks(objective)

# 2. 分配并执行

while any(t.status != "completed" for t in tasks):

# 找到可以执行的任务(依赖已完成)

ready_tasks = [

t for t in tasks

if t.status == "pending"

and all(

any(ct.id == dep and ct.status == "completed"

for ct in tasks)

for dep in (t.dependencies or [])

)

]

if not ready_tasks:

break

# 并行执行就绪的任务

import asyncio

results = await asyncio.gather(*[

self._execute_task(task) for task in ready_tasks

])

for task, result in zip(ready_tasks, results):

task.status = "completed"

task.result = result

self.completed_tasks.append(task)

# 3. 汇总结果

return await self._synthesize_results(objective, tasks)

async def _decompose_tasks(self, objective: str) -> List[Task]:

"""将目标分解为子任务"""

worker_descriptions = "\n".join(

f"- {name}: {worker.description}"

for name, worker in self.workers.items()

)

prompt = f"""将以下目标分解为可执行的子任务。

目标: {objective}

可用的 Worker:

{worker_descriptions}

返回 JSON 格式:

[{{

"id": "task_1",

"description": "任务描述",

"assigned_to": "worker_name",

"dependencies": ["task_id"]

}}]

注意:

  • 每个任务应分配给最合适的 Worker
  • 标注任务间的依赖关系
  • 任务应足够具体,Worker 可以直接执行"""
  • response = await self.llm.generate(prompt)

    task_dicts = json.loads(response)

    return [

    Task(

    id=t["id"],

    description=t["description"],

    assigned_to=t["assigned_to"],

    status="pending",

    dependencies=t.get("dependencies", [])

    )

    for t in task_dicts

    ]

    async def _execute_task(self, task: Task) -> str:

    """执行单个任务"""

    worker = self.workers.get(task.assigned_to)

    if not worker:

    return f"错误:找不到 Worker {task.assigned_to}"

    task.status = "in_progress"

    # 收集依赖任务的结果作为上下文

    context = ""

    if task.dependencies:

    dep_results = [

    ct.result for ct in self.completed_tasks

    if ct.id in task.dependencies

    ]

    context = "\n".join(dep_results)

    return await worker.execute(task.description, context)

    async def _synthesize_results(self, objective: str,

    tasks: List[Task]) -> str:

    """汇总所有任务结果"""

    results_text = "\n\n".join(

    f"## {t.description}\n{t.result}"

    for t in tasks if t.result

    )

    prompt = f"""基于以下各子任务的结果,汇总生成最终回答。

    目标: {objective}

    子任务结果:

    {results_text}

    请提供一个完整、连贯的最终回答:"""

    return await self.llm.generate(prompt)

    class WorkerAgent:

    """Worker Agent:执行具体任务"""

    def __init__(self, name: str, description: str,

    llm_client, tools: list = None):

    self.name = name

    self.description = description

    self.llm = llm_client

    self.tools = tools or []

    async def execute(self, task: str, context: str = "") -> str:

    prompt = f"""你是一个 {self.name}。{self.description}

    {"相关上下文:\n" + context if context else ""}

    请执行以下任务:{task}"""

    return await self.llm.generate(prompt)

    2. Debate 模式(辩论模式)

    架构

        ┌────────────────────────────────────────┐
    

    │ Debate Arena │

    │ │

    │ ┌──────────┐ ┌──────────┐ ┌───────┐│

    │ │Proponent │↔│Opponent │↔│Judge ││

    │ │ (支持方) │ │ (反对方) │ │(裁判) ││

    │ └──────────┘ └──────────┘ └───────┘│

    │ ↑ ↑ ↑ │

    │ └──────────────┴────────────┘ │

    │ 多轮辩论 │

    └────────────────────────────────────────┘

    核心实现

    class DebateAgent:
    

    """辩论 Agent 系统"""

    def __init__(self, llm_client):

    self.llm = llm_client

    self.proponent = DebateParticipant("支持方", llm_client)

    self.opponent = DebateParticipant("反对方", llm_client)

    self.judge = JudgeAgent(llm_client)

    async def debate(self, topic: str, rounds: int = 3) -> dict:

    """执行辩论"""

    debate_history = []

    # 初始陈述

    proponent_arg = await self.proponent.initial_argument(topic, "support")

    opponent_arg = await self.opponent.initial_argument(topic, "oppose")

    debate_history.append({

    "round": 0,

    "proponent": proponent_arg,

    "opponent": opponent_arg

    })

    # 多轮辩论

    for round_num in range(1, rounds + 1):

    proponent_arg = await self.proponent.rebuttal(

    topic, opponent_arg, debate_history

    )

    opponent_arg = await self.opponent.rebuttal(

    topic, proponent_arg, debate_history

    )

    debate_history.append({

    "round": round_num,

    "proponent": proponent_arg,

    "opponent": opponent_arg

    })

    # 裁判判决

    verdict = await self.judge.evaluate(topic, debate_history)

    return {

    "topic": topic,

    "debate_history": debate_history,

    "verdict": verdict

    }

    class DebateParticipant:

    """辩论参与者"""

    def __init__(self, stance: str, llm_client):

    self.stance = stance

    self.llm = llm_client

    async def initial_argument(self, topic: str, position: str) -> str:

    prompt = f"""你是一位辩论选手,立场是{self.stance}。

    辩题: {topic}

    你的立场: {'支持' if position == 'support' else '反对'}

    请提供你的初始论点,要求:

  • 提出 2-3 个核心论据
  • 每个论据有事实或逻辑支撑
  • 语言有说服力但保持理性"""
  • return await self.llm.generate(prompt)

    async def rebuttal(self, topic: str, opponent_arg: str,

    history: list) -> str:

    history_text = "\n\n".join(

    f"第 {h['round']} 轮:\n支持方: {h['proponent']}\n反对方: {h['opponent']}"

    for h in history

    )

    prompt = f"""辩题: {topic}

    你的立场: {self.stance}

    辩论历史:

    {history_text}

    对方最新论点:

    {opponent_arg}

    请反驳对方的论点,并加强你的立场。要求:

  • 直接回应对方的关键论据
  • 指出对方论证的漏洞
  • 补充新的支持论据"""
  • return await self.llm.generate(prompt)

    class JudgeAgent:

    """裁判 Agent"""

    def __init__(self, llm_client):

    self.llm = llm_client

    async def evaluate(self, topic: str, debate_history: list) -> dict:

    debate_text = "\n\n".join(

    f"第 {h['round']} 轮:\n支持方: {h['proponent']}\n反对方: {h['opponent']}"

    for h in debate_history

    )

    prompt = f"""你是一位公正的辩论裁判。

    辩题: {topic}

    完整辩论记录:

    {debate_text}

    请评估双方表现并给出判决。返回 JSON:

    {{

    "winner": "proponent|opponent|tie",

    "reasoning": "判决理由",

    "proponent_score": 0-10,

    "opponent_score": 0-10,

    "key_turning_points": ["关键转折点"],

    "strengths": {{"proponent": "...", "opponent": "..."}},

    "weaknesses": {{"proponent": "...", "opponent": "..."}}

    }}"""

    response = await self.llm.generate(prompt)

    return json.loads(response)

    3. Voting 模式(投票模式)

    架构

                  ┌─────────────┐
    

    │ Voter 1 │

    └──────┬──────┘

    ┌────────┐ ┌───▼───┐ ┌────────┐

    │Voter 2 │→│Aggre- │←│Voter 3 │

    └────────┘ │gator │ └────────┘

    └───┬───┘

    ┌─────▼─────┐

    │ 最终结果 │

    └───────────┘

    核心实现

    class VotingSystem:
    

    """多 Agent 投票系统"""

    def __init__(self, llm_client, num_voters: int = 3):

    self.llm = llm_client

    self.voters = [

    VotingAgent(f"Voter_{i}", llm_client, perspective)

    for i, perspective in enumerate([

    "你是一个注重逻辑和事实的评估者",

    "你是一个注重创新和可行性的评估者",

    "你是一个注重用户体验和可读性的评估者"

    ][:num_voters])

    ]

    async def vote(self, question: str, options: list,

    criteria: list = None) -> dict:

    """执行投票"""

    # 并行收集投票

    import asyncio

    votes = await asyncio.gather(*[

    voter.cast_vote(question, options, criteria)

    for voter in self.voters

    ])

    # 聚合结果

    return self._aggregate_votes(votes, options)

    def _aggregate_votes(self, votes: list, options: list) -> dict:

    """聚合投票结果"""

    vote_counts = {opt: 0 for opt in options}

    all_reasoning = []

    for vote in votes:

    vote_counts[vote["choice"]] += 1

    all_reasoning.append({

    "voter": vote["voter"],

    "choice": vote["choice"],

    "reasoning": vote["reasoning"],

    "confidence": vote["confidence"]

    })

    winner = max(vote_counts, key=vote_counts.get)

    consensus = vote_counts[winner] / len(votes)

    return {

    "winner": winner,

    "vote_counts": vote_counts,

    "consensus": consensus,

    "all_reasoning": all_reasoning,

    "unanimous": consensus == 1.0

    }

    class VotingAgent:

    """投票 Agent"""

    def __init__(self, name: str, llm_client, perspective: str):

    self.name = name

    self.llm = llm_client

    self.perspective = perspective

    async def cast_vote(self, question: str, options: list,

    criteria: list = None) -> dict:

    options_text = "\n".join(f"{i+1}. {opt}" for i, opt in enumerate(options))

    criteria_text = "\n".join(f"- {c}" for c in (criteria or []))

    prompt = f"""{self.perspective}

    问题: {question}

    选项:

    {options_text}

    {"评估标准:\n" + criteria_text if criteria else ""}

    请从你的视角评估每个选项,并投票选出最佳选项。返回 JSON:

    {{

    "choice": "选项内容",

    "reasoning": "投票理由",

    "confidence": 0.0-1.0,

    "evaluations": {{"选项1": "评价", "选项2": "评价"}}

    }}"""

    response = await self.llm.generate(prompt)

    result = json.loads(response)

    result["voter"] = self.name

    return result

    4. Competition 模式(竞争模式)

    架构

        ┌──────────┐  ┌──────────┐  ┌──────────┐
    

    │Contender │ │Contender │ │Contender │

    │ A │ │ B │ │ C │

    └────┬─────┘ └────┬─────┘ └────┬─────┘

    │ │ │

    ▼ ▼ ▼

    ┌──────────────────────────────────────┐

    │ Evaluator │

    │ (评估最优方案) │

    └──────────────────────────────────────┘

    核心实现

    class CompetitionSystem:
    

    """竞争系统:多个 Agent 竞争生成最优方案"""

    def __init__(self, llm_client, num_contenders: int = 3):

    self.llm = llm_client

    self.contenders = [

    CreativeContender(f"Contender_{i}", llm_client, style)

    for i, style in enumerate([

    "保守稳健", "创新激进", "平衡折中"

    ][:num_contenders])

    ]

    self.evaluator = CompetitionEvaluator(llm_client)

    async def compete(self, challenge: str, evaluation_criteria: list) -> dict:

    """执行竞争"""

    # 并行生成方案

    import asyncio

    solutions = await asyncio.gather(*[

    contender.generate_solution(challenge)

    for contender in self.contenders

    ])

    # 评估所有方案

    evaluation = await self.evaluator.evaluate(

    challenge, solutions, evaluation_criteria

    )

    return {

    "challenge": challenge,

    "solutions": [

    {"contender": c.name, "solution": s}

    for c, s in zip(self.contenders, solutions)

    ],

    "evaluation": evaluation

    }

    class CreativeContender:

    """创意竞争者"""

    def __init__(self, name: str, llm_client, style: str):

    self.name = name

    self.llm = llm_client

    self.style = style

    async def generate_solution(self, challenge: str) -> str:

    prompt = f"""你是一个{self.style}风格的问题解决者。

    挑战: {challenge}

    请提供你的解决方案。发挥你的{self.style}风格特色。"""

    return await self.llm.generate(prompt)

    class CompetitionEvaluator:

    """竞争评估者"""

    def __init__(self, llm_client):

    self.llm = llm_client

    async def evaluate(self, challenge: str, solutions: list,

    criteria: list) -> dict:

    solutions_text = "\n\n".join(

    f"方案 {i+1}:\n{sol}" for i, sol in enumerate(solutions)

    )

    criteria_text = "\n".join(f"- {c}" for c in criteria)

    prompt = f"""挑战: {challenge}

    评估标准:

    {criteria_text}

    方案:

    {solutions_text}

    请评估所有方案并选出最优。返回 JSON:

    {{

    "scores": [{{"solution": 1, "score": 0-10, "feedback": "..."}}, ...],

    "winner": 1,

    "reasoning": "选择理由",

    "synthesized_best": "综合各方案优点的改进版本"

    }}"""

    response = await self.llm.generate(prompt)

    return json.loads(response)

    5. Pipeline 模式(流水线模式)

    架构

    ┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
    

    │ Stage1 │───▶│ Stage2 │───▶│ Stage3 │───▶│ Stage4 │

    │ (采集) │ │ (清洗) │ │ (分析) │ │ (输出) │

    └────────┘ └────────┘ └────────┘ └────────┘

    核心实现

    from abc import ABC, abstractmethod
    
    

    class PipelineStage(ABC):

    """流水线阶段基类"""

    @abstractmethod

    async def process(self, input_data: dict) -> dict:

    pass

    class DataCollectionStage(PipelineStage):

    """数据采集阶段"""

    def __init__(self, tools: list):

    self.tools = tools

    async def process(self, input_data: dict) -> dict:

    query = input_data.get("query", "")

    collected = []

    for tool in self.tools:

    result = await tool.run(query)

    collected.extend(result)

    return {**input_data, "raw_data": collected}

    class AnalysisStage(PipelineStage):

    """分析阶段"""

    def __init__(self, llm_client):

    self.llm = llm_client

    async def process(self, input_data: dict) -> dict:

    data = input_data.get("raw_data", [])

    prompt = f"分析以下数据并提取关键洞察:\n{data}"

    analysis = await self.llm.generate(prompt)

    return {**input_data, "analysis": analysis}

    class ReportStage(PipelineStage):

    """报告生成阶段"""

    def __init__(self, llm_client):

    self.llm = llm_client

    async def process(self, input_data: dict) -> dict:

    analysis = input_data.get("analysis", "")

    prompt = f"基于以下分析生成报告:\n{analysis}"

    report = await self.llm.generate(prompt)

    return {**input_data, "report": report}

    class AgentPipeline:

    """Agent 流水线"""

    def __init__(self, stages: List[PipelineStage]):

    self.stages = stages

    async def execute(self, initial_input: dict) -> dict:

    data = initial_input

    for stage in self.stages:

    data = await stage.process(data)

    return data

    6. Hierarchical 模式(层级模式)

    架构

                        ┌─────────────┐
    

    │ CEO Agent │

    └──────┬──────┘

    ┌────────────┼────────────┐

    │ │ │

    ┌─────▼─────┐ ┌───▼────┐ ┌────▼────┐

    │ Tech Lead │ │PM Lead │ │Ops Lead │

    └─────┬─────┘ └───┬────┘ └────┬────┘

    │ │ │

    ┌─────┼─────┐ ... ...

    │ │ │

    ┌─▼─┐ ┌▼──┐ ┌▼──┐

    │Dev│ │Dev│ │Dev│

    └───┘ └───┘ └───┘

    核心实现

    class HierarchicalAgent:
    

    """层级 Agent 系统"""

    def __init__(self, llm_client):

    self.llm = llm_client

    # 构建层级结构

    self.ceo = ManagerAgent("CEO", llm_client, "最高决策者")

    self.tech_lead = ManagerAgent("TechLead", llm_client, "技术团队负责人")

    self.pm_lead = ManagerAgent("PMLead", llm_client, "产品团队负责人")

    self.devs = [

    WorkerAgent(f"Developer_{i}", "软件开发", llm_client)

    for i in range(3)

    ]

    # 建立报告关系

    self.ceo.subordinates = [self.tech_lead, self.pm_lead]

    self.tech_lead.subordinates = self.devs

    async def execute(self, objective: str) -> dict:

    """执行目标"""

    return await self.ceo.delegate(objective)

    class ManagerAgent:

    """管理者 Agent"""

    def __init__(self, name: str, llm_client, role: str):

    self.name = name

    self.llm = llm_client

    self.role = role

    self.subordinates = []

    async def delegate(self, objective: str, context: str = "") -> dict:

    """委派任务"""

    if not self.subordinates:

    # 没有下属,自己执行

    return {"result": await self._self_execute(objective)}

    # 分解任务并委派

    sub_objectives = await self._decompose_for_team(objective)

    import asyncio

    results = await asyncio.gather(*[

    subordinate.delegate(sub_obj["objective"])

    for subordinate, sub_obj in zip(self.subordinates, sub_objectives)

    if sub_obj.get("assigned_to") == subordinate.name

    ])

    # 汇总下属结果

    return await self._synthesize(objective, results)

    async def _decompose_for_team(self, objective: str) -> list:

    team_desc = ", ".join(f"{s.name}({s.role})" for s in self.subordinates)

    prompt = f"""作为 {self.name}({self.role}),将以下目标分解给团队成员。

    目标: {objective}

    团队: {team_desc}

    返回 JSON: [{{"assigned_to": "...", "objective": "..."}}]"""

    response = await self.llm.generate(prompt)

    return json.loads(response)

    async def _synthesize(self, objective: str, results: list) -> dict:

    results_text = str(results)

    prompt = f"综合以下团队结果,生成关于 '{objective}' 的总结:\n{results_text}"

    summary = await self.llm.generate(prompt)

    return {"result": summary, "sub_results": results}

    7. Swarm 模式(群体模式)

    架构

        ┌─────────────────────────────────────────────┐
    

    │ Swarm Network │

    │ │

    │ ○─────○─────○ │

    │ │╲ ╱│╲ ╱│ ○ = Agent │

    │ │ ╲ ╱ │ ╲ ╱ │ ─ = 通信链路 │

    │ │ ╳ │ ╳ │ │

    │ │ ╱ ╲ │ ╱ ╲ │ 无中心节点 │

    │ ○─────○─────○ 自组织协作 │

    │ ╲ │ ╱ │

    │ ○│○ │

    └─────────────────────────────────────────────┘

    核心实现

    class SwarmAgent:
    

    """群体 Agent 系统"""

    def __init__(self, llm_client, num_agents: int = 5):

    self.llm = llm_client

    self.agents = [

    SwarmMember(f"Agent_{i}", llm_client, capabilities)

    for i, capabilities in enumerate([

    ["搜索", "信息收集"],

    ["分析", "推理"],

    ["写作", "总结"],

    ["编程", "技术"],

    ["创意", "设计"]

    ][:num_agents])

    ]

    self.message_bus = MessageBus()

    async def swarm_solve(self, problem: str) -> dict:

    """群体协作解决问题"""

    # 1. 广播问题

    await self.message_bus.broadcast({

    "type": "problem",

    "content": problem

    })

    # 2. Agent 自组织响应

    contributions = []

    for agent in self.agents:

    response = await agent.respond_to_problem(problem)

    if response:

    contributions.append({

    "agent": agent.name,

    "contribution": response

    })

    # 广播贡献

    await self.message_bus.broadcast({

    "type": "contribution",

    "from": agent.name,

    "content": response

    })

    # 3. 多轮协作

    for round_num in range(3):

    new_contributions = []

    for agent in self.agents:

    messages = await self.message_bus.get_messages_for(agent.name)

    response = await agent.collaborate(messages)

    if response:

    new_contributions.append({

    "agent": agent.name,

    "contribution": response

    })

    await self.message_bus.broadcast({

    "type": "refinement",

    "from": agent.name,

    "content": response

    })

    contributions.extend(new_contributions)

    # 4. 汇总群体智慧

    return await self._emerge_solution(problem, contributions)

    async def _emerge_solution(self, problem: str,

    contributions: list) -> str:

    contrib_text = "\n".join(

    f"[{c['agent']}]: {c['contribution']}" for c in contributions

    )

    prompt = f"""以下是一群 Agent 对问题的贡献,请综合这些贡献生成最终方案。

    问题: {problem}

    Agent 贡献:

    {contrib_text}

    请生成一个综合的、高质量的最终方案:"""

    return await self.llm.generate(prompt)

    class MessageBus:

    """消息总线"""

    def __init__(self):

    self.messages = []

    self.subscriptions = {}

    async def broadcast(self, message: dict):

    self.messages.append({

    **message,

    "timestamp": time.time()

    })

    async def get_messages_for(self, agent_name: str,

    since: float = 0) -> list:

    return [

    m for m in self.messages

    if m["timestamp"] > since and m.get("from") != agent_name

    ]

    模式选择指南

    | 场景 | 推荐模式 | 原因 |
    |------|---------|------|
    | 产品需求分析 | Debate | 需要多方观点碰撞 |
    | 内容质量审核 | Voting | 需要多数共识 |
    | 广告文案生成 | Competition | 需要创意竞争 |
    | 数据处理流水线 | Pipeline | 串行处理流程 |
    | 大型项目管理 | Hierarchical | 明确的汇报关系 |
    | 开放性探索 | Swarm | 需要涌现式创新 |
    | 日常任务调度 | Supervisor | 灵活的任务分配 |

    总结

    多 Agent 协作模式为构建复杂 AI 系统提供了丰富的设计模式。从简单的 Pipeline 到复杂的 Swarm,每种模式都有其独特的价值。在实际项目中,往往需要组合多种模式来应对复杂的业务需求。选择合适的协作模式,是构建高效多 Agent 系统的关键。