多 Agent 编排策略

📑 目录

概述

拓扑排序算法与死锁检测的分布式系统设计

多 Agent 编排的本质是分布式任务调度。当系统中存在多个 Agent,且它们之间可以相互调用或交接时,就形成了一个分布式系统。这个系统面临的核心挑战与传统分布式系统相同:死锁、活锁、消息丢失和状态不一致。

从图论视角来看,Agent 之间的调用关系可以建模为一个有向图:节点是 Agent,边是调用/交接关系。如果这个图中存在环(如 A -> B -> C -> A),就意味着理论上存在无限循环的可能。编排策略的好坏,很大程度上取决于它如何处理图中的环和瓶颈节点。

常见的多 Agent 拓扑结构包括:

  1. 星型拓扑:一个中心 Manager 连接多个 Worker。结构简单,但 Manager 是单点瓶颈。
  2. 链式拓扑:Agent A -> Agent B -> Agent C -> …。适合流水线式处理,但容错性差,任何一环失败都会导致整个链条中断。
  3. 网状拓扑:任意 Agent 都可以与其他 Agent 交互。灵活性最高,但也最难调试和控制。
  4. 分层拓扑:多层 Manager,每层管理下一层的 Agent。适合大规模系统,但延迟较高。

死锁检测在多 Agent 系统中尤为重要。典型的死锁场景是:Agent A 持有资源 X 并等待 Agent B 释放资源 Y,而 Agent B 持有资源 Y 并等待 Agent A 释放资源 X。在 Agents SDK 中,"资源"通常表现为上下文对象或共享的 Session 状态。

SDK 目前没有内置的死锁检测机制,这需要开发者在架构层面预防:

  1. 超时机制:为每个子任务设置严格的超时,防止无限等待。
  2. 资源有序分配:如果多个 Agent 需要访问共享资源,规定统一的获取顺序。
  3. 会话隔离:为每个子任务创建独立的上下文,避免共享状态导致的竞态条件。

LLM 编排 vs 代码编排:两种策略的适用场景、优缺点和混合使用方案。

正文

相关阅读

参考文档

完整实战示例:分层编排的软件开发助手

以下示例展示了如何使用分层拓扑构建一个软件开发助手系统,包含需求分析、架构设计、代码实现和测试验证四个层级:

import asyncio
from agents import Agent, Runner, handoff


# 第一层:需求分析 Agent
requirement_agent = Agent(
    name="RequirementAnalyst",
    instructions="""
你是需求分析专家。将用户的模糊需求转化为清晰的软件功能规格说明。
输出格式:
- 功能列表(编号)
- 非功能需求(性能、安全)
- 优先级(P0/P1/P2)
""".strip(),
    model="gpt-5-nano")

多 Agent 编排模式对比

下图展示了四种常见编排模式的执行流程差异:
mermaid
flowchart TD
subgraph Pipeline["流水线"]
P1[A1] --> P2[A2] --> P3[A3] --> P4[输出]
end
subgraph Vote["投票"]
V1[输入] --> V2[A1]
V1 --> V3[A2]
V1 --> V4[A3]
V2 --> V5[聚合器]
V3 --> V5
V4 --> V5
V5 --> V6[输出]
end
subgraph Map["Map-Reduce"]
M1[输入] --> M2[拆分]
M2 --> M3[A1处理块1]
M2 --> M4[A2处理块2]
M2 --> M5[A3处理块3]
M3 --> M6[汇总]
M4 --> M6
M5 --> M6
M6 --> M7[输出]
end
subgraph HandoffC["Handoff链"]
H1[输入] --> H2{A1能处理?}
H2 -->|能| H3[A1输出]
H2 -->|不能| H4{A2能处理?}
H4 -->|能| H5[A2输出]
H4 -->|不能| H6[A3输出]
end


流水线模式适合有明确步骤的任务,投票模式适合需要高可靠性的场景,Map-Reduce 适合大规模并行处理,Handoff 链适合渐进式问题求解。

## 编排系统的可观测性设计

多 Agent 系统的调试难度远高于单 Agent 系统。当输出不符合预期时,需要追踪整个编排链路中的每一步决策。

**链路追踪**:

每个 Agent 的输入、输出和执行时间都应被记录到追踪系统中。对于编排模式,还需要记录编排器本身的决策逻辑(如为什么选择了 Agent B 而非 Agent C)。

```python
from agents.tracing import Span

async def pipeline_orchestrator(input_text: str) -> str:
    with Span(name="pipeline", input=input_text) as span:
        step1 = await Runner.run(agents["extractor"], input_text)
        span.child("extract", output=step1.final_output)
        step2 = await Runner.run(agents["transformer"], step1.final_output)
        span.child("transform", output=step2.final_output)
        step3 = await Runner.run(agents["loader"], step2.final_output)
        span.child("load", output=step3.final_output)
        return step3.final_output

调试面板

为编排系统构建一个可视化调试面板,实时展示各 Agent 的状态、输入输出和耗时。面板应支持:

  1. 链路回放:重放某次请求在编排系统中的完整执行过程
  2. 对比模式:并排展示不同编排策略对同一输入的处理结果
  3. 瓶颈分析:自动识别链路中的慢节点(高延迟 Agent、频繁重试的工具调用)

编排系统的另一个挑战是错误传播。当链路中某个 Agent 失败时,应该采取何种恢复策略?选项包括:跳过该步骤继续执行、使用默认值替代、降级到备用 Agent、或中断整个链路返回错误。策略的选择应基于业务容错性要求。不同编排策略在延迟、吞吐量和可维护性方面各有优劣:流水线模式的延迟等于各阶段延迟之和,适合对实时性要求不高的离线任务;投票模式虽然消耗更多 Token,但能显著降低单一 Agent 幻觉导致的错误率,在医疗诊断、金融审核等高风险场景值得投入;Map-Reduce 模式的 Reduce 阶段如果由 LLM 执行,需要注意上下文窗口限制,对于数百个 Map 结果,可能需要多级 Reduce,形成树状聚合结构。

常见问题与调试

问题一:层级过多导致延迟累积

每增加一个层级,就至少增加一次 LLM 调用延迟。四层系统在最坏情况下可能需要 4-8 秒才能产生第一个 token。优化方法:

  1. 对非关键层级使用更轻量的模型(如 gpt-5-nano)。
  2. 将相邻的轻量层级合并(如将需求分析和架构设计合并为一个 Agent)。
  3. 对独立子任务使用并行调用,而非串行流水线。

问题二:上下文在层级传递中丢失或变形

每一层 Agent 都可能对输入进行"理解-重构",导致原始信息在传递过程中逐渐失真(类似"传话游戏")。解决方案:

  1. 在层级间传递原始输入的摘要,而非让上层 Agent 重新描述。
  2. 使用结构化数据(如 JSON)传递关键信息,减少自然语言的歧义。
  3. 在关键节点增加校验 Agent,检查传递的信息是否完整准确。

问题三:循环依赖导致编排失败

如果设计不当,Agent A 可能通过 Handoff 到 B,B 又通过 as_tool 调用 A,形成循环。排查方法:

  1. 绘制 Agent 调用关系图,检查是否存在环。
  2. 为每个 Agent 的运行深度设置上限(如最多嵌套 3 层)。
  3. 在架构评审中强制要求说明各 Agent 的职责边界和调用约束。

与其他方案对比

维度Agents SDK 编排AutoGen GroupChatCrewAI Crew
拓扑灵活性高(任意图结构)中(群体聊天)低(预设流程)
延迟控制需手动优化较差(群体讨论慢)较好(流程固定)
可调试性中(trace 追踪)低(对话发散)高(步骤明确)
适用团队技术型团队研究型团队业务型团队

AutoGen 的 GroupChat 在模拟开放式协作方面无人能及,适合研究多智能体涌现行为。CrewAI 则通过高度预设的流程降低了使用门槛,适合业务团队快速搭建工作流。Agents SDK 的编排方式提供了最大的架构自由度,但也要求开发者具备更强的系统设计能力,能够自行处理死锁、超时和失败恢复等分布式系统经典问题。

设计模式在 Agent 编排中的工程实践

将多 Agent 系统从原型推向生产时,单纯依靠 SDK 提供的基础编排能力往往不够。引入成熟的设计模式可以显著提升系统的可维护性和扩展性,使代码在面对需求变更时更加从容。

状态机模式与工作流编排

当 Agent 的调用关系存在明确的阶段和状态转换时,**状态机模式(State Machine Pattern)**是最自然的抽象。例如一个软件开发助手系统可以定义为以下状态:需求收集、架构设计、代码编写、测试验证和部署交付。每个状态对应一个专门的 Agent,状态之间的转换由明确的业务规则控制。

状态机的优势在于显式化流转约束:任何非法的状态跳转都会在运行时被拦截,这比隐藏在复杂 if-else 中的逻辑更容易审查和测试。更重要的是,状态机天然支持可视化:你可以将当前状态和历史轨迹输出到监控面板,让运维人员一眼看出系统处于哪个阶段。当编排流程出现卡顿时,状态历史也是排查问题的关键线索。

在实现层面,建议将状态定义和转换规则分离。状态定义描述系统有哪些阶段,转换规则描述何时可以从 A 状态进入 B 状态。这种分离使得新增一个状态或修改转换条件时,不需要重写整个编排逻辑。对于需要人工介入的环节,状态机还可以引入暂停状态,等待外部信号后继续执行。

责任链模式与动态路由

当多个 Agent 都可以处理同一类请求,但优先级或专业能力不同时,**责任链模式(Chain of Responsibility Pattern)**提供了一种优雅的解耦方案。不同于硬编码的 if-elif 分支,责任链将每个处理者串联起来,请求沿着链条传递,直到某个处理者认为自己能够处理为止。

这种模式特别适合客服分诊、技术问题分级等场景。例如,用户提问"如何重置密码",链条上的第一个处理者是账户安全 Agent,它通过关键词匹配判断置信度。如果置信度超过阈值,就直接处理;否则传递给下一个通用客服 Agent。责任链的另一个优势是支持运行时动态重组:在促销期间,你可以将促销咨询 Agent 插入链条前端,而在平时则将其移除,完全不需要修改其他 Agent 的代码。

从性能角度考虑,责任链中的每个节点都应该配备快速的预筛选机制。使用嵌入向量的余弦相似度或 TF-IDF 加权的词频匹配,可以在毫秒级完成置信度估算,避免不必要的 LLM 调用。同时,建议为整个链条设置超时上限,防止请求在链条中无限传递。

模式组合的最佳实践

状态机和责任链并非互斥,在实际系统中常常组合使用。状态机负责宏观阶段管理,责任链负责微观任务分发。例如在架构设计阶段,状态机确保系统已经收集了足够的需求信息,然后触发责任链将设计任务分发给前端、后端和数据库三个专业 Agent。这种分层设计使得系统既具备全局可控性,又保留了局部的灵活性。

graph TD
    A[用户请求] --> B{需求分析Agent}
    B -->|需求清晰| C[架构设计Agent]
    B -->|需求模糊| D[澄清问题Agent]
    D --> B
    C --> E{复杂度评估}
    E -->|简单| F[代码实现Agent]
    E -->|复杂| G[模块拆分Agent]
    G --> H[并行编码Agent群]
    H --> I[集成测试Agent]
    F --> I
    I --> J{测试通过?}
    J -->|否| K[调试修复Agent]
    K --> I
    J -->|是| L[交付结果]
    
    style A fill:#4a90d9,color:#fff
    style L fill:#5cb85c,color:#fff
    style J fill:#f0ad4e,color:#fff

以下示例展示了状态机和责任链的组合实现:

from enum import Enum, auto
from abc import ABC, abstractmethod
from typing import Optional

class DevPhase(Enum):
    REQUIREMENT = auto()
    DESIGN = auto()
    CODING = auto()
    TESTING = auto()
    DEPLOYMENT = auto()

class DevWorkflow:
    """基于状态机的开发工作流编排器。"""
    
    TRANSITIONS = {
        DevPhase.REQUIREMENT: [DevPhase.DESIGN],
        DevPhase.DESIGN: [DevPhase.CODING],
        DevPhase.CODING: [DevPhase.TESTING],
        DevPhase.TESTING: [DevPhase.CODING, DevPhase.DEPLOYMENT],
        DevPhase.DEPLOYMENT: [],
    }
    
    def __init__(self):
        self.state = DevPhase.REQUIREMENT
        self.agents: dict[DevPhase, Agent] = dict()
        self.history: list[DevPhase] = []
    
    def register_agent(self, phase: DevPhase, agent: Agent):
        self.agents[phase] = agent
    
    def can_transition_to(self, next_phase: DevPhase) -> bool:
        return next_phase in self.TRANSITIONS.get(self.state, [])
    
    async def step(self, user_input: str) -> str:
        agent = self.agents.get(self.state)
        if not agent:
            raise RuntimeError(f"No agent for phase")
        result = await Runner.run(agent, user_input)
        next_phase = self._infer_next_phase(result.final_output)
        if self.can_transition_to(next_phase):
            self.history.append(self.state)
            self.state = next_phase
        return result.final_output

class Handler(ABC):
    def __init__(self):
        self._next: Optional[Handler] = None
    
    def set_next(self, handler: "Handler") -> "Handler":
        self._next = handler
        return handler
    
    @abstractmethod
    async def handle(self, request: str, context: dict) -> Optional[str]:
        pass

class PriorityAgentHandler(Handler):
    """优先级 Agent 处理器。"""
    
    def __init__(self, agent: Agent, threshold: float = 0.7):
        super().__init__()
        self.agent = agent
        self.threshold = threshold
    
    async def handle(self, request: str, context: dict) -> Optional[str]:
        confidence = await self._estimate_confidence(request)
        if confidence >= self.threshold:
            result = await Runner.run(self.agent, request, context=context)
            return result.final_output
        if self._next:
            return await self._next.handle(request, context)
        return None
    
    async def _estimate_confidence(self, request: str) -> float:
        keywords = self.agent.instructions.lower().split()
        score = sum(1 for kw in keywords if kw in request.lower())
        return min(score / max(len(keywords), 1) * 2, 1.0)

生产环境部署与性能优化

工作流引擎的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

编排延迟分析的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

分布式 Agent 协调的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。
编排策略的选择应基于业务指标而非技术偏好。建议在上线初期同时运行多种策略,通过 A/B 测试对比解决率、延迟和成本,用数据驱动最终决策。