函数工具 Function Tool

📑 目录

基本用法

Schema 缓存与并发工具执行的内部优化

@function_tool 的 Schema 生成虽然便利,但并非没有成本。每次装饰器被应用时,SDK 需要调用 inspect.signaturegriffe 解析函数签名,再通过 pydantic 生成 JSON Schema。对于包含数十个工具的 Agent,这一过程的累计开销可能达到数十毫秒。

SDK 内部对 Schema 做了惰性缓存:一旦某个函数的 Schema 被生成,它会被存储在 FunctionTool 实例的属性中,后续重复使用同一工具时不会重新解析。然而,如果工具函数是在运行时动态生成的(如从配置文件加载),每次创建新的 FunctionTool 实例都会触发一次 Schema 生成。

在并发执行方面,Runner 默认会并行执行同一轮中的所有工具调用。这意味着如果你的 Agent 同时请求查询天气和搜索新闻,两个工具会在不同的 asyncio 任务中并发运行。这种设计显著降低了多工具场景下的总延迟,但也带来了并发安全的问题:

  1. 共享状态竞争:如果多个工具同时修改同一个全局变量或数据库记录,结果将不可预期。
  2. 资源耗尽:并发执行大量 I/O 密集型工具可能导致连接池耗尽或文件句柄超限。
  3. 副作用叠加:某些工具具有副作用(如发送邮件、扣款),并发执行可能导致重复操作。

对于需要严格串行执行的场景,目前的 workaround 是在工具实现内部使用 asyncio.Lock,或者在 instructions 中明确告诉模型"一次只调用一个工具"。

@function_tool 是 Agents SDK 中最常用的工具定义方式。它将任意 Python 函数转换为 Agent 可调用的工具。

from agents import Agent, Runner, function_tool

@function_tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    return f"The weather in {city} is sunny and 25°C."

agent = Agent(
    name="Weather Bot",
    instructions="Help users with weather queries.",
    tools=[get_weather],
)

result = Runner.run_sync(agent, "What's the weather in Tokyo?")
print(result.final_output)

关键规则

  1. 类型注解必需:所有参数必须有类型注解
  2. 文档字符串必需:函数的 docstring 会作为工具描述传递给 LLM
  3. 返回值:会被自动转换为字符串

结构化参数

使用 Pydantic 模型定义复杂参数:

from pydantic import BaseModel
from agents import function_tool

class Address(BaseModel):
    street: str
    city: str
    country: str

@function_tool
def calculate_shipping(address: Address, weight_kg: float) -> str:
    """Calculate shipping cost for an address."""
    return f"Shipping to {address.city} costs ${weight_kg * 2.5}"

延迟加载

当工具很多时,使用 defer_loading=True 减少初始 token:

@function_tool(defer_loading=True)
def get_customer_profile(customer_id: str) -> str:
    """Fetch customer profile from CRM."""
    return f"Profile for {customer_id}"

# 需要配合 ToolSearchTool 使用
agent = Agent(
    name="CRM Assistant",
    tools=[get_customer_profile, ToolSearchTool()],
)

异步工具

工具函数可以是 async 的:

import aiohttp

@function_tool
async def fetch_url(url: str) -> str:
    """Fetch content from a URL."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

工具配置

@function_tool(
    name_override="weather_lookup",  # 覆盖工具名
    description_override="Look up weather data.",  # 覆盖描述
    needs_approval=True,  # 需要人工审批
)
def get_weather(city: str) -> str:
    ...

完整示例

from agents import Agent, Runner, function_tool
from typing import Literal

@function_tool
def search_products(
    query: str,
    category: Literal["electronics", "clothing", "food"],
    max_price: float | None = None
) -> str:
    """Search products by query and category."""
    results = [f"{category} item matching '{query}'"]
    if max_price:
        results.append(f"Under ${max_price}")
    return "\n".join(results)

agent = Agent(
    name="Shopping Assistant",
    instructions="Help users find products.",
    tools=[search_products],
)

与 Guardrails 结合

from agents import input_guardrail, GuardrailFunctionOutput

@input_guardrail
async def validate_search(ctx, agent, input):
    if "password" in str(input).lower():
        return GuardrailFunctionOutput(
            tripwire_triggered=True,
            output_info="Search contains sensitive keyword",
        )
    return GuardrailFunctionOutput(tripwire_triggered=False)

agent = Agent(
    name="Safe Shopping",
    tools=[search_products],
    input_guardrails=[validate_search],
)

下一步

了解 OpenAI 托管工具MCP 工具集成

完整实战示例:企业级工具注册表与并发控制

以下示例展示了如何构建一个支持权限校验、速率限制和并发控制的工具注册表:

import asyncio
import time
from typing import Any, Callable
from functools import wraps
from agents import Agent, Runner, RunContextWrapper, function_tool


class ToolRegistry:
    """企业级工具注册表,支持权限、限流和审计。"""

    def __init__(self):
        self._tools: dict[str, Callable] = {}
        self._permissions: dict[str, list[str]] = {}  # tool_name -> [required_role]
        self._rate_limits: dict[str, tuple[int, float]] = {}  # tool_name -> (max_calls, window_sec)
        self._call_counts: dict[str, list[float]] = {}  # tool_name -> [timestamps]
        self._locks: dict[str, asyncio.Lock] = {}

    def register(
        self,
        fn: Callable,
        name: str | None = None,
        roles: list[str] | None = None,
        rate_limit: tuple[int, float] | None = None,
        serial: bool = False,
    ):
        tool_name = name or fn.__name__
        self._tools[tool_name] = fn
        if roles:
            self._permissions[tool_name] = roles
        if rate_limit:
            self._rate_limits[tool_name] = rate_limit
            self._call_counts[tool_name] = []
        if serial:
            self._locks[tool_name] = asyncio.Lock()
        return fn

    async def execute(
        self,
        tool_name: str,
        ctx: RunContextWrapper[Any],
        *args,
        **kwargs,
    ) -> str:
        # 权限检查
        user_roles = getattr(ctx.context, "roles", [])
        required = self._permissions.get(tool_name, [])
        if required and not any(r in user_roles for r in required):
            raise PermissionError(f"Tool '{tool_name}' requires one of roles: {required}")

        # 速率限制检查
        if tool_name in self._rate_limits:
            max_calls, window = self._rate_limits[tool_name]
            now = time.time()
            self._call_counts[tool_name] = [t for t in self._call_counts[tool_name] if now - t < window]
            if len(self._call_counts[tool_name]) >= max_calls:
                raise RuntimeError(f"Rate limit exceeded for tool '{tool_name}'")
            self._call_counts[tool_name].append(now)

        # 串行执行控制
        fn = self._tools[tool_name]
        if tool_name in self._locks:
            async with self._locks[tool_name]:
                return await fn(*args, **kwargs) if asyncio.iscoroutinefunction(fn) else fn(*args, **kwargs)
        return await fn(*args, **kwargs) if asyncio.iscoroutinefunction(fn) else fn(*args, **kwargs)


registry = ToolRegistry()


async def query_database(sql: str) -> str:
    """模拟数据库查询。"""
    await asyncio.sleep(0.3)
    return f"Query result for: {sql[:20]}..."


async def send_email(to: str, subject: str) -> str:
    """模拟发送邮件。"""
    await asyncio.sleep(0.2)
    return f"Email sent to {to} with subject '{subject}'"


registry.register(query_database, roles=["admin", "analyst"], rate_limit=(10, 60.0))
registry.register(send_email, roles=["admin"], serial=True)  # 邮件串行发送避免重复


async def main():
    # 将注册表工具包装为 function_tool(简化示例)
    agent = Agent(
        name="RegistryAgent",
        instructions="Use available tools to help the user.",
        model="gpt-5-nano")

常见问题与调试

问题一:工具 Schema 生成失败或过于宽松

如果函数参数缺少类型注解,SDK 会生成 anyOf{"type": "string"} 这样的宽松 Schema,导致模型调用质量下降。排查方法:

  1. 使用 mypypyright 对工具函数进行静态类型检查。
  2. 在单元测试中验证 FunctionTool.params_json_schema 的结构是否符合预期。
  3. 对于复杂类型,优先使用 Pydantic BaseModel 而非裸字典或 TypedDict

问题二:并发工具执行导致数据库连接池耗尽

当 Agent 在一次响应中请求调用 5 个工具,而每个工具都打开一个数据库连接时,可能瞬间耗尽连接池。解决方案:

  1. 在工具实现中使用连接池(如 asyncpg 的 pool),而不是每次新建连接。
  2. 通过 RunConfig 或自定义 Runner 限制每轮的最大并发工具数。
  3. 对于非关键工具,实现熔断机制,当连接池使用率超过 80% 时自动拒绝新请求。

问题三:RunContextWrapper 类型不匹配

如果 Agent 声明了 Agent[UserContext],但某个工具接受 RunContextWrapper[OtherContext],类型检查器会报错。运行时虽然可能通过(Python 的动态类型特性),但会导致上下文对象访问失败。务必确保整个运行链路使用统一的上下文类型。

与其他方案对比

维度Agents SDK @function_toolLangChain @toolCrewAI @tool
Schema 生成自动(inspect + griffe)自动(inspect)自动(inspect)
上下文注入原生 RunContextWrapper依赖 RunnableConfig通过 agent 属性
并发执行默认并行(每轮内)默认串行默认串行
延迟加载原生支持 defer_loading需自行实现不支持

LangChain 的工具系统在生态丰富度上具有优势,有大量的预置工具(如 SerpAPI、SQLDatabase)。Agents SDK 的工具系统更轻量,但 defer_loadingToolSearchTool 的组合在工具数量膨胀时(超过 20 个)能显著节省 token 成本。CrewAI 的工具封装更贴近"代理执行任务"的语义,但在底层控制粒度上不如前两者精细。

工具生命周期与 Schema 演进实战

mermaid
flowchart TD
A[Python 函数定义] --> B[inspect.signature 解析签名]
B --> C[griffe 提取文档字符串]
C --> D[pydantic 生成 JSON Schema]
D --> E[Schema 缓存到 FunctionTool]
E --> F[注册到 Agent.tools 列表]
F --> G[LLM 请求携带工具定义]
G --> H[模型输出 tool_calls]
H --> I{是否并发?}
I -->|是| J[asyncio.gather 并行执行]
I -->|否| K[串行调用]
J --> L[结果转为字符串]
K --> L
L --> M[追加到消息历史]


理解 `@function_tool` 的生命周期有助于在生产环境中做出正确的性能决策。整个流程可分为三个阶段:**Schema 生成期**、**注册与注入期**、**执行与回写期**。在生成期,SDK 依赖 `inspect` 模块读取类型注解,再通过 `griffe` 解析 docstring 的语义结构,最终由 pydantic 输出符合 OpenAI Function Calling 规范的 JSON Schema。这一阶段的耗时通常在 5 到 20 毫秒之间,但对于动态生成的工具(例如从数据库加载的插件或用户自定义脚本),重复生成 Schema 会成为明显的性能瓶颈,在工具数量超过 50 个时尤为突出。

解决此问题的核心思路是**预生成加序列化缓存**。你可以在应用启动时将全部工具的 Schema 导出为静态 JSON 文件,运行时直接加载,绕过整个解析链路。以下示例展示了如何实现这一策略:

```python
import json
import os
from agents import function_tool
from agents.tools.function_tool import FunctionTool

# 预生成阶段(CI/CD 或启动脚本中执行)
def export_schemas(tools: list, output_path: str):
    schemas = {}
    for t in tools:
        ft = t if isinstance(t, FunctionTool) else t._tool
        schemas[ft.name] = ft.params_json_schema
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(schemas, f, ensure_ascii=False, indent=2)
    print(f"Exported {len(schemas)} schemas to {output_path}")

# 运行时加载阶段(生产环境使用)
def load_cached_schema(tool_name: str, cache_dir: str = "./tool_schemas") -> dict:
    path = os.path.join(cache_dir, "schemas.json")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Schema cache not found: {path}")
    with open(path, "r", encoding="utf-8") as f:
        all_schemas = json.load(f)
    return all_schemas.get(tool_name, {})

另一个实战需求是参数级别的自定义校验。虽然 pydantic 会自动校验基础类型,但业务规则校验(如城市名必须在白名单中、日期不能晚于今天、订单金额不能超过用户余额)需要开发者自行实现。推荐的做法是在函数体内前置校验逻辑,并通过抛出 ValueError 让模型收到清晰的错误反馈,模型往往能在下一轮对话中修正参数:

from datetime import datetime
from agents import function_tool

@function_tool
def book_flight(origin: str, destination: str, depart_date: str) -> str:
    """预订航班。depart_date 格式为 YYYY-MM-DD。"""
    allowed_cities = {"PEK", "SHA", "CAN", "SZX", "CTU"}
    if origin not in allowed_cities or destination not in allowed_cities:
        raise ValueError(f"仅支持以下城市: {', '.join(allowed_cities)}")
    try:
        dt = datetime.strptime(depart_date, "%Y-%m-%d")
    except ValueError:
        raise ValueError("日期格式错误,请使用 YYYY-MM-DD")
    if dt.date() < datetime.now().date():
        raise ValueError("出发日期不能早于今天")
    return f"成功预订 {origin} -> {destination} 的航班,日期 {depart_date}"

在并发场景下,上述校验逻辑天然是线程安全的,因为它只访问本地变量和不可变集合。但如果校验需要查询外部服务(如验证用户额度或库存数量),则必须引入分布式锁或令牌桶限流,防止竞态条件导致超额预订或超卖。将工具函数设计为无状态且显式注入依赖,是确保并发安全的关键原则。此外,对于高频调用工具,建议在入口处添加指标埋点,记录校验失败率和各阶段的延迟分布,以便持续优化。长期来看,还应建立 Schema 版本管理机制,当工具参数发生变更时,通过灰度发布验证新 Schema 对模型调用准确率的影响,避免全量更新导致的功能回退。监控方面,可以通过 OpenTelemetry 为每个工具调用生成独立的 Span,追踪从 Schema 解析到结果返回的全链路耗时,并将这些数据对接到 Prometheus 和 Grafana 进行可视化分析,形成完整的可观测性闭环。

对于 Schema 演进中的破坏性变更,推荐采用蓝绿部署策略:新旧版本工具并行注册,通过流量切换比例逐步验证新版本的稳定性。当确认模型对新 Schema 的调用准确率达标后,再下线旧版本工具,确保线上服务零中断。

生产环境部署与性能优化

工具服务化的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

工具调用指标的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

工具并发控制的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。

工具函数的质量直接决定 Agent 的可靠性。建议为每个工具编写单元测试,覆盖正常路径、边界情况和异常路径。只有测试通过的工具才允许注册到生产环境的 Agent 中。