Agent 作为工具

📑 目录

概述

递归调用限制与上下文传递的内部机制

agent.as_tool() 将一个 Agent 封装为另一个 Agent 可调用的工具。从实现层面看,被调用的 Agent 会启动一个子运行(sub-run):Runner 会创建新的运行上下文,将被调用 Agent 的 instructions 和输入注入其中,执行完毕后通过 custom_output_extractor 将结果提取为字符串返回给父 Agent。

这种机制的关键特性在于上下文隔离:子 Agent 默认看不到父 Agent 的对话历史,只能收到通过 input 参数传入的信息。这种隔离既是安全特性(防止信息泄露),也是限制(子 Agent 可能缺乏必要的上下文来做出正确决策)。

递归深度是另一个需要关注的点。虽然 SDK 没有硬编码的递归深度限制,但每一层嵌套都会增加 LLM 调用次数,从而线性增长延迟和成本。如果一个 Manager Agent 调用了 Expert Agent,而 Expert Agent 又通过 as_tool() 调用了 Sub-Expert Agent,三层嵌套意味着至少需要 3 次 LLM API 调用(不包括各层内部可能的工具调用)。

从设计哲学来看,as_tool() 模式体现了命令-查询分离原则:Manager Agent 负责决策和协调(命令),Expert Agent 负责执行具体的子任务并返回结果(查询)。这与函数式编程中的纯函数概念类似——Expert Agent 应该尽可能无副作用,只根据输入产生输出。

与 Handoff 模式相比,as_tool() 不会转移对话控制权,Manager 始终掌控全局。这适合" bounded subtask "(有界子任务)场景:任务有明确的输入和期望的输出格式,不需要子 Agent 与用户直接交互。

Agent.as_tool() 模式:让一个 Agent 成为另一个 Agent 的工具,实现管理器模式。

正文

相关阅读

参考文档

完整实战示例:分层任务分解与结果聚合

以下示例展示了如何在生产环境中使用 Agent-as-Tool 构建一个分层分析系统,包含输入校验、超时控制和结果格式化:

import asyncio
import json
from agents import Agent, Runner, function_tool


# 第一层:数据分析专家
analysis_agent = Agent(
    name="DataAnalyst",
    instructions="""
你是一个数据分析专家。给定一组数字  #计算:
1. 平均值(mean)
2. 中位数(median)
3. 标准差(std)
4. 趋势判断(上升/下降/平稳)

必须返回严格的 JSON 格式:
{"mean": float, "median": float, "std": float, "trend": str}
""".strip(),
    model="gpt-5-nano")

常见问题与调试

问题一:子 Agent 输出格式不符合预期

由于子 Agent 独立运行,它可能忽略父 Agent 传递的格式要求。解决方案:

  1. 在子 Agent 的 instructions 中使用强格式约束(如 "你必须只返回 JSON,不要添加任何解释")。
  2. 利用 custom_output_extractor 对子 Agent 的输出进行后处理和清洗。
  3. 在父 Agent 的 instructions 中提供子 Agent 输出的处理示例。

问题二:嵌套调用导致上下文爆炸

多层 as_tool() 嵌套会产生大量的中间输出,这些输出都会被保留在父 Agent 的对话历史中。调试方法:

  1. 限制 Manager Agent 的 max_turns,防止无限循环。
  2. custom_output_extractor 中对子 Agent 的输出进行摘要压缩。
  3. 如果子任务输出非常长,考虑只提取关键字段传给父 Agent。

问题三:子 Agent 陷入无限循环

子 Agent 内部也可能发生工具循环(如反复调用同一个工具)。由于子运行是独立的,父 Agent 无法直接干预。建议:

  1. 为每个子 Agent 设置保守的 max_turns(如 3-5)。
  2. 在子 Agent 的 instructions 中明确限制工具调用次数。
  3. 通过 trace 系统监控子运行的实际轮数,发现异常及时告警。

与其他方案对比

维度Agents SDK as_tool()CrewAI Task + AgentAutoGen Nested Chat
调用方式工具调用(同步返回)任务分配(异步执行)嵌套对话(完整历史)
上下文隔离强(默认隔离)中(通过上下文共享)弱(可传递完整历史)
结果控制custom_output_extractoroutput_callback对话结束条件
适用场景有界子任务工作流任务链深度协作讨论

CrewAI 的任务系统更侧重于"谁做什么"的分工,通过 Process 控制执行顺序。AutoGen 的嵌套对话则允许子 Agent 拥有更完整的上下文,适合需要深度协作的场景。Agents SDK 的 as_tool() 模式是最轻量的分层方案,适合已经将系统建模为"管理器-专家"架构的团队。

嵌套 Agent 架构的超时传递与结果聚合

mermaid
flowchart TD
subgraph 第一层
M[Manager Agent] -->|as_tool| W1[Worker A]
M -->|as_tool| W2[Worker B]
end
subgraph 第二层
W1 -->|as_tool| S1[Sub-expert 1]
W2 -->|as_tool| S2[Sub-expert 2]
end
M -->|并行 gather| P[结果聚合器]
W1 --> P
W2 --> P
P --> M
style M fill:#f9f,stroke:#333
style P fill:#bbf,stroke:#333


`Agent.as_tool()` 的分层架构虽然强大,但每一层嵌套都会引入额外的延迟和故障点。在生产环境中,最常见的三类问题是超时雪崩、结果格式不一致、错误传播失控。本节提供一套经过验证的工程实践来解决这些问题,确保多层 Agent 系统在高负载下依然稳定可靠。

首先是超时预算分配。当 Manager Agent 设置了总超时(如 15 秒),它需要将这一预算合理分配给各个 Worker。一种有效的策略是对半递减:Manager 自身保留 50% 的时间用于决策和聚合,剩余 50% 分配给直接子 Agent;子 Agent 再按同样规则向下分配。以下代码展示了如何实现这一策略:

```python
import asyncio
from dataclasses import dataclass
from agents import Agent, Runner

@dataclass
class TimeoutBudget:
    total_ms: int
    depth: int = 0
    max_depth: int = 3

    def allocate_for_child(self) -> "TimeoutBudget":
        if self.depth >= self.max_depth:
            raise RuntimeError("超过最大嵌套深度限制")
        child_total = self.total_ms // 2
        if child_total < 1000:
            raise RuntimeError("子 Agent 超时不足 1 秒")
        return TimeoutBudget(total_ms=child_total, depth=self.depth + 1)

async def run_with_budget(agent: Agent, input_text: str, budget: TimeoutBudget):
    try:
        return await asyncio.wait_for(
            Runner.run(agent, input_text),
            timeout=budget.total_ms / 1000.0
        )
    except asyncio.TimeoutError:
        return "[操作超时,请简化请求后重试]"

其次是结构化结果聚合。当 Manager 同时调用多个 Worker 时,每个 Worker 可能返回不同格式的输出。在聚合层统一格式可以避免 Manager 被混乱的输入干扰,从而做出更准确的后续决策:

import json
from typing import List

class ResultAggregator:
    """将多个 Worker 的输出聚合为统一结构。"""

    @staticmethod
    def aggregate(results: List[str], workers: List[str]) -> str:
        entries = []
        for worker_name, raw in zip(workers, results):
            try:
                data = json.loads(raw)
                entries.append({"source": worker_name, "data": data, "ok": True})
            except json.JSONDecodeError:
                entries.append({"source": worker_name, "data": raw, "ok": False})
        return json.dumps({"results": entries}, ensure_ascii=False, indent=2)

async def parallel_dispatch(manager_input: str, workers: List[Agent], budget: TimeoutBudget):
    tasks = []
    for w in workers:
        tool = w.as_tool()
        tasks.append(tool(manager_input))
    raw_results = await asyncio.gather(*tasks, return_exceptions=True)
    safe_results = [
        r if not isinstance(r, Exception) else f"[错误: {str(r)}]"
        for r in raw_results
    ]
    return ResultAggregator.aggregate(safe_results, [w.name for w in workers])

最后是错误传播控制与上下文压缩。return_exceptions=True 是 asyncio.gather 的关键参数,它能确保单个 Worker 失败不会导致整个聚合流程中断。在更严格的场景中,你还可以为每个 Worker 配置独立的重试策略和熔断器,防止某个子 Agent 的无限循环拖垮整个请求链路。由于嵌套调用会将每一层的结果追加到父 Agent 的消息历史中,多层嵌套很容易导致上下文窗口溢出,因此建议在 custom_output_extractor 中对子 Agent 输出进行摘要压缩,只保留关键结论和指标。同时,建议在所有嵌套调用点记录 span 级别的 trace 数据,包含深度、耗时和异常类型,便于后续排查性能瓶颈和优化调用拓扑。对于需要跨层级共享上下文的场景,可以通过 RunContextWrapper 显式注入必要的全局状态,而不是依赖完整的历史传递,这样既能保证上下文隔离的安全性,又能满足特定业务的数据共享需求。在极端高并发场景下,还应考虑为不同层级的 Agent 分配独立的模型实例和速率限制配额,防止底层子 Agent 耗尽全局令牌池,从而保障 Manager 层的决策能力始终可用。

另一个容易被忽视的问题是子 Agent 的指令漂移。由于子 Agent 独立运行,其 instructions 中的行为约束可能随着模型版本更新而失效。建议将子 Agent 的 instructions 纳入版本控制,并在每次模型升级后运行回归测试集,验证子 Agent 的输出格式和决策逻辑是否符合预期。同时,建立父子 Agent 之间的契约文档,明确输入输出格式、超时预算和异常处理规则,降低协作出错的可能性。

生产环境部署与性能优化

递归调用限制的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

嵌套深度监控的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

子 Agent 资源池的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。

Agent-as-Tool 的嵌套深度需要严格限制。建议设置全局最大嵌套层级(如 3 层),超过限制时直接返回错误。这可以防止因逻辑缺陷导致的无限递归和资源耗尽。

子 Agent 的超时设置应与父 Agent 协调。如果父 Agent 设置了 10 秒超时,而子 Agent 设置了 8 秒超时,那么留给其他逻辑的时间只有 2 秒。建议子 Agent 的超时不超过父 Agent 的 50%。

当 Manager Agent 需要同时调用多个 Worker Agent 时,建议使用 asyncio.gather 实现并行调用,而不是串行等待。并行化可以显著降低总延迟,但需要注意并发数控制,避免触发下游服务的限流。
Agent 作为工具的调用链路追踪需要特别关注。由于嵌套调用会形成调用树,建议在追踪数据中记录父子 Span 关系,便于后续分析性能瓶颈和错误传播路径。