基本用法
Schema 缓存与并发工具执行的内部优化
@function_tool 的 Schema 生成虽然便利,但并非没有成本。每次装饰器被应用时,SDK 需要调用 inspect.signature 和 griffe 解析函数签名,再通过 pydantic 生成 JSON Schema。对于包含数十个工具的 Agent,这一过程的累计开销可能达到数十毫秒。
SDK 内部对 Schema 做了惰性缓存:一旦某个函数的 Schema 被生成,它会被存储在 FunctionTool 实例的属性中,后续重复使用同一工具时不会重新解析。然而,如果工具函数是在运行时动态生成的(如从配置文件加载),每次创建新的 FunctionTool 实例都会触发一次 Schema 生成。
在并发执行方面,Runner 默认会并行执行同一轮中的所有工具调用。这意味着如果你的 Agent 同时请求查询天气和搜索新闻,两个工具会在不同的 asyncio 任务中并发运行。这种设计显著降低了多工具场景下的总延迟,但也带来了并发安全的问题:
- 共享状态竞争:如果多个工具同时修改同一个全局变量或数据库记录,结果将不可预期。
- 资源耗尽:并发执行大量 I/O 密集型工具可能导致连接池耗尽或文件句柄超限。
- 副作用叠加:某些工具具有副作用(如发送邮件、扣款),并发执行可能导致重复操作。
对于需要严格串行执行的场景,目前的 workaround 是在工具实现内部使用 asyncio.Lock,或者在 instructions 中明确告诉模型"一次只调用一个工具"。
@function_tool 是 Agents SDK 中最常用的工具定义方式。它将任意 Python 函数转换为 Agent 可调用的工具。
from agents import Agent, Runner, function_tool
@function_tool
def get_weather(city: str) -> str:
"""Get the current weather for a city."""
return f"The weather in {city} is sunny and 25°C."
agent = Agent(
name="Weather Bot",
instructions="Help users with weather queries.",
tools=[get_weather],
)
result = Runner.run_sync(agent, "What's the weather in Tokyo?")
print(result.final_output)关键规则
- 类型注解必需:所有参数必须有类型注解
- 文档字符串必需:函数的 docstring 会作为工具描述传递给 LLM
- 返回值:会被自动转换为字符串
结构化参数
使用 Pydantic 模型定义复杂参数:
from pydantic import BaseModel
from agents import function_tool
class Address(BaseModel):
street: str
city: str
country: str
@function_tool
def calculate_shipping(address: Address, weight_kg: float) -> str:
"""Calculate shipping cost for an address."""
return f"Shipping to {address.city} costs ${weight_kg * 2.5}"延迟加载
当工具很多时,使用 defer_loading=True 减少初始 token:
@function_tool(defer_loading=True)
def get_customer_profile(customer_id: str) -> str:
"""Fetch customer profile from CRM."""
return f"Profile for {customer_id}"
# 需要配合 ToolSearchTool 使用
agent = Agent(
name="CRM Assistant",
tools=[get_customer_profile, ToolSearchTool()],
)异步工具
工具函数可以是 async 的:
import aiohttp
@function_tool
async def fetch_url(url: str) -> str:
"""Fetch content from a URL."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()工具配置
@function_tool(
name_override="weather_lookup", # 覆盖工具名
description_override="Look up weather data.", # 覆盖描述
needs_approval=True, # 需要人工审批
)
def get_weather(city: str) -> str:
...完整示例
from agents import Agent, Runner, function_tool
from typing import Literal
@function_tool
def search_products(
query: str,
category: Literal["electronics", "clothing", "food"],
max_price: float | None = None
) -> str:
"""Search products by query and category."""
results = [f"{category} item matching '{query}'"]
if max_price:
results.append(f"Under ${max_price}")
return "\n".join(results)
agent = Agent(
name="Shopping Assistant",
instructions="Help users find products.",
tools=[search_products],
)与 Guardrails 结合
from agents import input_guardrail, GuardrailFunctionOutput
@input_guardrail
async def validate_search(ctx, agent, input):
if "password" in str(input).lower():
return GuardrailFunctionOutput(
tripwire_triggered=True,
output_info="Search contains sensitive keyword",
)
return GuardrailFunctionOutput(tripwire_triggered=False)
agent = Agent(
name="Safe Shopping",
tools=[search_products],
input_guardrails=[validate_search],
)下一步
了解 OpenAI 托管工具 和 MCP 工具集成。
完整实战示例:企业级工具注册表与并发控制
以下示例展示了如何构建一个支持权限校验、速率限制和并发控制的工具注册表:
import asyncio
import time
from typing import Any, Callable
from functools import wraps
from agents import Agent, Runner, RunContextWrapper, function_tool
class ToolRegistry:
"""企业级工具注册表,支持权限、限流和审计。"""
def __init__(self):
self._tools: dict[str, Callable] = {}
self._permissions: dict[str, list[str]] = {} # tool_name -> [required_role]
self._rate_limits: dict[str, tuple[int, float]] = {} # tool_name -> (max_calls, window_sec)
self._call_counts: dict[str, list[float]] = {} # tool_name -> [timestamps]
self._locks: dict[str, asyncio.Lock] = {}
def register(
self,
fn: Callable,
name: str | None = None,
roles: list[str] | None = None,
rate_limit: tuple[int, float] | None = None,
serial: bool = False,
):
tool_name = name or fn.__name__
self._tools[tool_name] = fn
if roles:
self._permissions[tool_name] = roles
if rate_limit:
self._rate_limits[tool_name] = rate_limit
self._call_counts[tool_name] = []
if serial:
self._locks[tool_name] = asyncio.Lock()
return fn
async def execute(
self,
tool_name: str,
ctx: RunContextWrapper[Any],
*args,
**kwargs,
) -> str:
# 权限检查
user_roles = getattr(ctx.context, "roles", [])
required = self._permissions.get(tool_name, [])
if required and not any(r in user_roles for r in required):
raise PermissionError(f"Tool '{tool_name}' requires one of roles: {required}")
# 速率限制检查
if tool_name in self._rate_limits:
max_calls, window = self._rate_limits[tool_name]
now = time.time()
self._call_counts[tool_name] = [t for t in self._call_counts[tool_name] if now - t < window]
if len(self._call_counts[tool_name]) >= max_calls:
raise RuntimeError(f"Rate limit exceeded for tool '{tool_name}'")
self._call_counts[tool_name].append(now)
# 串行执行控制
fn = self._tools[tool_name]
if tool_name in self._locks:
async with self._locks[tool_name]:
return await fn(*args, **kwargs) if asyncio.iscoroutinefunction(fn) else fn(*args, **kwargs)
return await fn(*args, **kwargs) if asyncio.iscoroutinefunction(fn) else fn(*args, **kwargs)
registry = ToolRegistry()
async def query_database(sql: str) -> str:
"""模拟数据库查询。"""
await asyncio.sleep(0.3)
return f"Query result for: {sql[:20]}..."
async def send_email(to: str, subject: str) -> str:
"""模拟发送邮件。"""
await asyncio.sleep(0.2)
return f"Email sent to {to} with subject '{subject}'"
registry.register(query_database, roles=["admin", "analyst"], rate_limit=(10, 60.0))
registry.register(send_email, roles=["admin"], serial=True) # 邮件串行发送避免重复
async def main():
# 将注册表工具包装为 function_tool(简化示例)
agent = Agent(
name="RegistryAgent",
instructions="Use available tools to help the user.",
model="gpt-5-nano")
常见问题与调试
问题一:工具 Schema 生成失败或过于宽松
如果函数参数缺少类型注解,SDK 会生成 anyOf 或 {"type": "string"} 这样的宽松 Schema,导致模型调用质量下降。排查方法:
- 使用
mypy或pyright对工具函数进行静态类型检查。 - 在单元测试中验证
FunctionTool.params_json_schema的结构是否符合预期。 - 对于复杂类型,优先使用 Pydantic
BaseModel而非裸字典或TypedDict。
问题二:并发工具执行导致数据库连接池耗尽
当 Agent 在一次响应中请求调用 5 个工具,而每个工具都打开一个数据库连接时,可能瞬间耗尽连接池。解决方案:
- 在工具实现中使用连接池(如
asyncpg的 pool),而不是每次新建连接。 - 通过
RunConfig或自定义 Runner 限制每轮的最大并发工具数。 - 对于非关键工具,实现熔断机制,当连接池使用率超过 80% 时自动拒绝新请求。
问题三:RunContextWrapper 类型不匹配
如果 Agent 声明了 Agent[UserContext],但某个工具接受 RunContextWrapper[OtherContext],类型检查器会报错。运行时虽然可能通过(Python 的动态类型特性),但会导致上下文对象访问失败。务必确保整个运行链路使用统一的上下文类型。
与其他方案对比
| 维度 | Agents SDK @function_tool | LangChain @tool | CrewAI @tool |
|---|---|---|---|
| Schema 生成 | 自动(inspect + griffe) | 自动(inspect) | 自动(inspect) |
| 上下文注入 | 原生 RunContextWrapper | 依赖 RunnableConfig | 通过 agent 属性 |
| 并发执行 | 默认并行(每轮内) | 默认串行 | 默认串行 |
| 延迟加载 | 原生支持 defer_loading | 需自行实现 | 不支持 |
LangChain 的工具系统在生态丰富度上具有优势,有大量的预置工具(如 SerpAPI、SQLDatabase)。Agents SDK 的工具系统更轻量,但 defer_loading 和 ToolSearchTool 的组合在工具数量膨胀时(超过 20 个)能显著节省 token 成本。CrewAI 的工具封装更贴近"代理执行任务"的语义,但在底层控制粒度上不如前两者精细。
工具生命周期与 Schema 演进实战
mermaid
flowchart TD
A[Python 函数定义] --> B[inspect.signature 解析签名]
B --> C[griffe 提取文档字符串]
C --> D[pydantic 生成 JSON Schema]
D --> E[Schema 缓存到 FunctionTool]
E --> F[注册到 Agent.tools 列表]
F --> G[LLM 请求携带工具定义]
G --> H[模型输出 tool_calls]
H --> I{是否并发?}
I -->|是| J[asyncio.gather 并行执行]
I -->|否| K[串行调用]
J --> L[结果转为字符串]
K --> L
L --> M[追加到消息历史]
理解 `@function_tool` 的生命周期有助于在生产环境中做出正确的性能决策。整个流程可分为三个阶段:**Schema 生成期**、**注册与注入期**、**执行与回写期**。在生成期,SDK 依赖 `inspect` 模块读取类型注解,再通过 `griffe` 解析 docstring 的语义结构,最终由 pydantic 输出符合 OpenAI Function Calling 规范的 JSON Schema。这一阶段的耗时通常在 5 到 20 毫秒之间,但对于动态生成的工具(例如从数据库加载的插件或用户自定义脚本),重复生成 Schema 会成为明显的性能瓶颈,在工具数量超过 50 个时尤为突出。
解决此问题的核心思路是**预生成加序列化缓存**。你可以在应用启动时将全部工具的 Schema 导出为静态 JSON 文件,运行时直接加载,绕过整个解析链路。以下示例展示了如何实现这一策略:
```python
import json
import os
from agents import function_tool
from agents.tools.function_tool import FunctionTool
# 预生成阶段(CI/CD 或启动脚本中执行)
def export_schemas(tools: list, output_path: str):
schemas = {}
for t in tools:
ft = t if isinstance(t, FunctionTool) else t._tool
schemas[ft.name] = ft.params_json_schema
with open(output_path, "w", encoding="utf-8") as f:
json.dump(schemas, f, ensure_ascii=False, indent=2)
print(f"Exported {len(schemas)} schemas to {output_path}")
# 运行时加载阶段(生产环境使用)
def load_cached_schema(tool_name: str, cache_dir: str = "./tool_schemas") -> dict:
path = os.path.join(cache_dir, "schemas.json")
if not os.path.exists(path):
raise FileNotFoundError(f"Schema cache not found: {path}")
with open(path, "r", encoding="utf-8") as f:
all_schemas = json.load(f)
return all_schemas.get(tool_name, {})另一个实战需求是参数级别的自定义校验。虽然 pydantic 会自动校验基础类型,但业务规则校验(如城市名必须在白名单中、日期不能晚于今天、订单金额不能超过用户余额)需要开发者自行实现。推荐的做法是在函数体内前置校验逻辑,并通过抛出 ValueError 让模型收到清晰的错误反馈,模型往往能在下一轮对话中修正参数:
from datetime import datetime
from agents import function_tool
@function_tool
def book_flight(origin: str, destination: str, depart_date: str) -> str:
"""预订航班。depart_date 格式为 YYYY-MM-DD。"""
allowed_cities = {"PEK", "SHA", "CAN", "SZX", "CTU"}
if origin not in allowed_cities or destination not in allowed_cities:
raise ValueError(f"仅支持以下城市: {', '.join(allowed_cities)}")
try:
dt = datetime.strptime(depart_date, "%Y-%m-%d")
except ValueError:
raise ValueError("日期格式错误,请使用 YYYY-MM-DD")
if dt.date() < datetime.now().date():
raise ValueError("出发日期不能早于今天")
return f"成功预订 {origin} -> {destination} 的航班,日期 {depart_date}"在并发场景下,上述校验逻辑天然是线程安全的,因为它只访问本地变量和不可变集合。但如果校验需要查询外部服务(如验证用户额度或库存数量),则必须引入分布式锁或令牌桶限流,防止竞态条件导致超额预订或超卖。将工具函数设计为无状态且显式注入依赖,是确保并发安全的关键原则。此外,对于高频调用工具,建议在入口处添加指标埋点,记录校验失败率和各阶段的延迟分布,以便持续优化。长期来看,还应建立 Schema 版本管理机制,当工具参数发生变更时,通过灰度发布验证新 Schema 对模型调用准确率的影响,避免全量更新导致的功能回退。监控方面,可以通过 OpenTelemetry 为每个工具调用生成独立的 Span,追踪从 Schema 解析到结果返回的全链路耗时,并将这些数据对接到 Prometheus 和 Grafana 进行可视化分析,形成完整的可观测性闭环。
对于 Schema 演进中的破坏性变更,推荐采用蓝绿部署策略:新旧版本工具并行注册,通过流量切换比例逐步验证新版本的稳定性。当确认模型对新 Schema 的调用准确率达标后,再下线旧版本工具,确保线上服务零中断。
生产环境部署与性能优化
工具服务化的实践要点
将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。
具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。
工具调用指标的关键指标
监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。
除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。
工具并发控制的架构考量
随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。
在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。
运维团队的协作建议
技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。
在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。
工具函数的质量直接决定 Agent 的可靠性。建议为每个工具编写单元测试,覆盖正常路径、边界情况和异常路径。只有测试通过的工具才允许注册到生产环境的 Agent 中。