生命周期与 Hooks

📑 目录

概述

Hook 链执行顺序与中间件模式的设计哲学

Agents SDK 的生命周期 Hooks 允许开发者在 Agent 运行的关键节点插入自定义逻辑。与框架级别的中间件(如 FastAPI 的中间件链或 Django 的信号系统)不同,SDK 的 Hooks 是点对点的:每个 Hook 只在一个特定的事件点触发,而不是包裹整个请求-响应周期。

当前的 Hook 体系主要包括:

  • on_start:Agent 开始处理输入前触发,适合初始化资源或记录审计日志。
  • on_end:Agent 产生最终输出后触发,适合释放资源或发送 metrics。
  • on_tool_start / on_tool_end:单个工具调用前后触发,适合细粒度的性能监控和参数校验。
  • on_handoff:Handoff 发生时触发,适合记录路由决策和上下文摘要。

从执行顺序来看,Hooks 遵循深度优先原则:如果一次运行中发生了多次工具调用和 Handoff,每个嵌套事件的 Hooks 都会在父级事件完成前执行完毕。这种设计确保了资源管理的正确性(如数据库连接可以在最内层打开,在最外层关闭)。

然而,SDK 目前不支持 Hook 的链式注册(即一个事件点只能注册一个回调函数,后来的会覆盖先来的)。这与 FastAPI 的中间件栈(可以无限叠加)形成了鲜明对比。在实际项目中,如果需要多个独立的 Hook 逻辑(如一个负责日志,一个负责监控),建议自行实现一个分发器:

def composite_hook(*hooks):
    async def wrapper(ctx, *args, **kwargs):
        for h in hooks:
            await h(ctx, *args, **kwargs)
    return wrapper

Agent 生命周期事件:on_start、on_end、on_handoff 等 Hook 的注册与使用。

正文

相关阅读

参考文档

完整实战示例:全链路监控与审计的 Hook 中间件

以下示例展示了如何通过组合 Hook 实现日志、监控和审计的三重覆盖,并支持动态开关:

import time
import asyncio
from typing import Any, Callable
from agents import Agent, Runner, RunContextWrapper, function_tool


class HookMiddleware:
    """可组合的 Hook 中间件,支持日志、计时和审计。"""

    def __init__(self):
        self.audit_log: list[dict] = []
        self.enabled = {"logging": True, "timing": True, "audit": True}

    def log_hook(self, event: str) -> Callable:
        async def hook(ctx: RunContextWrapper[Any], *args, **kwargs):
            if self.enabled["logging"]:
                print(f"[LOG] {event} | run_id={ctx.run_id}")
        return hook

    def timing_hook(self, event: str) -> Callable:
        async def hook(ctx: RunContextWrapper[Any], *args, **kwargs):
            if self.enabled["timing"]:
                ctx.metadata = getattr(ctx, "metadata", {})
                ctx.metadata[f"{event}_start"] = time.perf_counter()
        return hook

    def timing_end_hook(self, event: str) -> Callable:
        async def hook(ctx: RunContextWrapper[Any], *args, **kwargs):
            if self.enabled["timing"]:
                start = getattr(ctx, "metadata", {}).get(f"{event}_start", 0)
                duration = time.perf_counter() - start
                print(f"[TIMING] {event} took {duration:.4f}s | run_id={ctx.run_id}")
        return hook

    def audit_hook(self) -> Callable:
        async def hook(ctx: RunContextWrapper[Any], *args, **kwargs):
            if self.enabled["audit"]:
                self.audit_log.append({
                    "run_id": ctx.run_id,
                    "timestamp": time.time(),
                    "context_type": type(ctx.context).__name__,
                })
        return hook

    def get_hooks(self) -> dict[str, Callable]:
        return {
            "on_start": self.log_hook("on_start"),
            "on_end": self.log_hook("on_end"),
            "on_tool_start": self.timing_hook("tool"),
            "on_tool_end": self.timing_end_hook("tool"),
            "on_handoff": self.audit_hook(),
        }


@function_tool
def slow_operation(query: str) -> str:
    """模拟慢速操作。"""
    import time
    time.sleep(0.5)
    return f"Result for {query}"


async def main():
    middleware = HookMiddleware()
    hooks = middleware.get_hooks()

    agent = Agent(
        name="HookedAgent",
        instructions="Use tools when needed.",
        tools=[slow_operation],
    )

Agent 生命周期与 Hook 触发点时序

以下时序图展示了 Agent 执行过程中各 Hook 的触发时机:
mermaid
sequenceDiagram
participant U as 用户
participant BR as before_run
participant A as Agent
participant BL as before_llm
participant L as LLM API
participant AL as after_llm
participant BT as before_tool
participant T as 工具
participant AT as after_tool
participant AR as after_run
U->>BR: 触发
BR->>A: 上下文准备
A->>BL: 触发
BL->>L: 请求发送前
L–>>AL: 响应接收后
AL->>A: 解析响应
A->>BT: 如需调用工具
BT->>T: 执行前
T–>>AT: 执行后
AT->>A: 回填结果
A->>AR: 触发
AR–>>U: 返回结果


Hooks 的强大之处在于它们可以修改执行流程中的任何数据。例如,before_llm Hook 可以动态调整 temperature 参数,after_tool Hook 可以对工具结果进行后处理或缓存。

## Hook 的高级用法:AOP 编程模式

Hooks 可以实现面向切面编程(AOP),将横切关注点(日志、监控、认证、限流)从业务逻辑中分离出来。

**自动日志记录**:为所有 Agent 调用添加统一的日志记录,无需在每个 Agent 中重复代码。

```python
import time
from agents import RunContext

async def logging_hook(ctx: RunContext):
    ctx.context["_start_time"] = time.time()
    ctx.tracing.span("agent_start", {"agent": ctx.agent.name})

async def logging_after_hook(ctx: RunContext):
    duration = time.time() - ctx.context.get("_start_time", 0)
    ctx.tracing.span("agent_end", {
        "duration_ms": round(duration * 1000, 2),
        "output_length": len(ctx.result.final_output),
    })

动态限流:在 before_run 中检查当前用户的请求频率,超限则直接返回错误,避免浪费 LLM API 调用额度。

import asyncio

rate_limit_store = {}

async def rate_limit_hook(ctx: RunContext):
    user_id = ctx.context.get("user_id", "anonymous")
    now = asyncio.get_event_loop().time()
    window = rate_limit_store.setdefault(user_id, [])
    window[:] = [t for t in window if now - t < 60]
    if len(window) >= 10:
        raise RateLimitError("请求过于频繁,请稍后再试")
    window.append(now)

这种 AOP 模式的优势在于关注点分离:业务开发者专注于 Agent 的核心逻辑,基础设施开发者通过 Hooks 注入运维需求,两者互不干扰。

Hooks 的测试容易被忽视。由于 Hook 在 Agent 执行流程中触发,单元测试需要模拟完整的 RunContext。建议为每个 Hook 编写独立的异步测试用例,覆盖正常和异常路径。多个 Hook 注册到同一生命周期事件时,它们的执行顺序遵循注册先后顺序。若某个 Hook 抛出异常,默认行为是中断后续 Hook 执行并将异常抛出给调用方。建议在 Hook 内部捕获异常并记录日志,避免单个 Hook 的失败影响整个 Agent 的执行流程。

常见问题与调试

问题一:Hook 中抛出异常导致运行中断

如果 Hook 回调函数中发生了未捕获的异常,默认行为是中断当前 Agent 的运行。这在生产环境中非常危险——一个监控 Hook 的 bug 不应该导致用户请求失败。建议:

  1. 在所有 Hook 内部包裹 try/except,将异常记录到日志而不是抛出。
  2. 对于非关键 Hook(如 metrics),设置 suppress_exceptions=True(如果 SDK 支持)或自行实现类似逻辑。
  3. 在 CI/CD 中增加 Hook 逻辑的单元测试,避免运行时才发现问题。

问题二:Hook 中的异步操作死锁

如果 Hook 内部调用了阻塞式 I/O(如同步的数据库查询),而 Runner 运行在单线程事件循环中,就会导致整个运行流程卡住。务必确保所有 Hook 都是 async def,且内部只使用异步库(如 aiomysqlaiohttp)。

问题三:Hook 执行顺序不可预期

由于 SDK 的 Hook 注册机制不支持优先级排序,当多个模块各自注册 Hook 时,执行顺序取决于导入顺序。建议:

  1. 在一个中心化的模块中统一管理所有 Hook 的注册。
  2. 使用上述的 composite_hook 模式显式控制执行顺序。
  3. 避免在多个地方对同一个事件点重复注册 Hook。

与其他方案对比

维度Agents SDK HooksFastAPI 中间件Django 信号
触发粒度事件点(离散)请求-响应(连续)事件广播(离散)
链式支持否(单回调)是(栈式中间件)是(多接收器)
异常传播默认中断运行可控制传播行为独立执行,互不干扰
适用场景Agent 运行监控HTTP 请求处理模型层事件响应

FastAPI 的中间件链提供了更优雅的切面编程能力,但它是围绕 HTTP 请求设计的,无法直接套用到 Agent 运行流程中。Django 的信号系统与 Agents SDK 的 Hooks 最为相似,但 Django 信号支持多个接收器并行执行,而 SDK 目前只支持单回调。对于需要复杂 Hook 编排的项目,建议在 SDK 之上自行构建一个轻量级的事件总线。

Agent生命周期Hook调用时序

---
title: Agent生命周期Hook调用时序图
---
sequenceDiagram
    autonumber
    participant U as 用户
    participant R as Runner
    participant A as Agent
    participant H as Hook中间件
    participant T as 工具

    U->>R: 提交输入
    R->>H: 触发on_start
    H-->>R: 初始化Span/日志
    R->>A: 调用Agent
    A->>H: 触发on_tool_start
    H-->>A: 记录工具调用开始
    A->>T: 执行工具
    T-->>A: 返回结果
    A->>H: 触发on_tool_end
    H-->>A: 记录工具调用耗时
    A->>H: 触发on_handoff
    H-->>A: 记录路由决策
    A-->>R: 返回最终输出
    R->>H: 触发on_end
    H-->>R: 结束Span/释放资源
    R-->>U: 返回响应

分布式追踪上下文传递与OpenTelemetry集成

在生产环境中,单个用户请求可能触发多个Agent的串行或并行调用,每个Agent又会执行多个工具。如果没有统一的追踪标识,定位性能瓶颈或故障根因将极为困难。Hooks是注入分布式追踪能力的天然切入点——on_start时创建Span,on_end时结束Span,工具调用前后记录子Span。追踪数据应接入Jaeger或Zipkin等可视化平台,便于运维团队快速定位延迟异常点。上下文传递是分布式追踪的关键:当Agent调用另一个Agent时,trace_id和span_id必须跨进程边界传播,否则追踪链会断裂成孤立的片段。

以下是一个将Agents SDK Hooks与OpenTelemetry集成的实现示例:

import json
from typing import Any
from agents import RunContextWrapper
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode


tracer = trace.get_tracer("agents.sdk")


class TracingHooks:
    """OpenTelemetry分布式追踪Hook集合。"""
    
    def __init__(self):
        self._spans: dict[str, trace.Span] = {}
        self._tool_spans: dict[str, trace.Span] = {}
    
    async def on_start(self, ctx: RunContextWrapper[Any], agent: Any):
        span = tracer.start_span(
            name=f"agent.{agent.name}",
            attributes={
                "agent.name": agent.name,
                "run.id": ctx.run_id,
                "context.type": type(ctx.context).__name__,
            }
        )
        self._spans[ctx.run_id] = span
    
    async def on_end(self, ctx: RunContextWrapper[Any], agent: Any,
                     output: Any):
        span = self._spans.pop(ctx.run_id, None)
        if span:
            if hasattr(output, "content"):
                span.set_attribute("output.length", len(output.content))
            span.set_status(Status(StatusCode.OK))
            span.end()
    
    async def on_tool_start(self, ctx: RunContextWrapper[Any], tool: Any,
                            inputs: dict):
        parent_span = self._spans.get(ctx.run_id)
        if parent_span:
            ctx_token = trace.set_span_in_context(parent_span)
            with tracer.start_as_current_span(
                name=f"tool.{tool.name}",
                context=ctx_token,
                attributes={"tool.inputs": json.dumps(inputs)}
            ) as span:
                self._tool_spans[f"{ctx.run_id}:{tool.name}"] = span
    
    async def on_tool_end(self, ctx: RunContextWrapper[Any], tool: Any,
                          output: Any):
        span = self._tool_spans.pop(f"{ctx.run_id}:{tool.name}", None)
        if span:
            if isinstance(output, str):
                span.set_attribute("output.length", len(output))
            span.set_status(Status(StatusCode.OK))
            span.end()
    
    async def on_handoff(self, ctx: RunContextWrapper[Any], from_agent: Any,
                         to_agent: Any):
        span = self._spans.get(ctx.run_id)
        if span:
            span.add_event(
                "handoff",
                {
                    "from.agent": from_agent.name,
                    "to.agent": to_agent.name,
                }
            )

Hook性能开销量化与异步资源池管理

Hook中的逻辑会阻塞Agent的主执行流程,因此其性能开销必须被量化监控。常见的性能陷阱包括:同步数据库查询、未设置超时的外部HTTP调用、以及大数据结构的深拷贝操作。建议为每个Hook设置独立的超时策略,并通过资源池复用昂贵的连接对象。当Hook执行时间超过Agent本身运行时间的10%时,就应考虑将逻辑拆分到异步任务队列中处理。异常隔离同样重要:一个监控Hook的bug不应导致用户请求失败,所有Hook内部必须包裹try/except块,确保即使追踪系统本身故障也不会影响主业务流程。

以下是一个带超时保护和连接池复用的Hook包装器:

import asyncio
import time
from typing import Callable, Any
from concurrent.futures import ThreadPoolExecutor


class PerformanceAwareHook:
    """性能感知Hook包装器,支持超时控制和资源池复用。"""
    
    def __init__(self, max_workers: int = 4,
                 default_timeout: float = 1.0):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.default_timeout = default_timeout
        self._metrics: dict[str, list[float]] = {}
    
    def wrap(self, hook_fn: Callable, name: str, timeout: float = None):
        timeout = timeout or self.default_timeout
        
        async def wrapper(ctx, *args, **kwargs):
            start = time.perf_counter()
            try:
                if not asyncio.iscoroutinefunction(hook_fn):
                    loop = asyncio.get_event_loop()
                    await asyncio.wait_for(
                        loop.run_in_executor(
                            self.executor, hook_fn, ctx, *args, **kwargs
                        ),
                        timeout=timeout
                    )
                else:
                    await asyncio.wait_for(
                        hook_fn(ctx, *args, **kwargs), timeout=timeout
                    )
            except asyncio.TimeoutError:
                print(f"Hook '{name}' timed out after {timeout}s")
            except Exception as e:
                print(f"Hook '{name}' failed: {e}")
            finally:
                duration = time.perf_counter() - start
                self._metrics.setdefault(name, []).append(duration)
        
        return wrapper
    
    def report(self):
        for name, durations in self._metrics.items():
            avg = sum(durations) / len(durations)
            p99 = sorted(durations)[int(len(durations) * 0.99)]                 if len(durations) > 1 else avg
            print(f"Hook '{name}': avg={avg:.4f}s, p99={p99:.4f}s, "
                  f"calls={len(durations)}")

Hook注册顺序与并发安全

当多个模块各自注册Hook时,执行顺序可能因导入时序而变得不可预期。这在生产环境中是危险的:如果一个负责获取锁的Hook在释放锁的Hook之后执行,就会导致死锁。解决方案是在一个中心化的配置模块中显式定义Hook的执行顺序,使用composite_hook模式将多个回调组合成一个有序链。对于共享可变状态(如计数器、缓存),必须使用asyncio.Lock进行保护,避免并发执行导致的数据竞争。

要点总结

  1. 使用run_id作为Span的键,确保同一运行周期内的Span可正确关联;工具调用Span应作为Agent Span的子Span,形成完整的调用链。
  2. Handoff事件建议以Span Event形式记录,而非创建新Span,避免过度碎片化;所有Span必须在on_end中正确结束,否则会导致追踪数据泄漏和内存累积。
  3. Hook性能开销必须通过超时机制加以约束;同步操作应投递到线程池执行,避免阻塞asyncio事件循环;建议设置默认超时不超过1秒。
  4. 建议为每个Hook配备独立的性能指标收集,定期输出avg和p99延迟;当Hook延迟超过Agent总耗时10%时应触发告警。
  5. 资源池的max_workers应根据CPU核心数和外部服务并发能力调整,避免线程过多导致上下文切换开销过大;异常隔离是生产环境的硬性要求。
  6. Hook注册顺序应中心化管控,共享状态必须加锁保护;composite_hook模式是控制多回调执行顺序的最佳实践。

生产环境部署与性能优化

Hook 链编排的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

生命周期事件追踪的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

异步 Hook 性能的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。

Hooks 的性能开销容易被忽视。如果在 Hook 中执行了同步数据库查询或外部 HTTP 调用,整个 Agent 的运行会被阻塞。建议对所有 Hook 操作进行异步化改造。

Hook 的调试难度较高,因为问题可能出现在任何生命周期阶段。建议为每个 Hook 配备独立的单元测试,并在测试环境中模拟各种边界条件(如空输入、超长输入、特殊字符)。

Hook 的注册顺序会影响执行结果。如果多个 Hook 操作共享资源(如数据库连接),建议按照获取锁、执行业务、释放锁的顺序排列,避免因顺序不当导致的死锁或资源泄漏。
Hook 的测试容易被忽视。由于 Hook 在 Agent 执行流程中触发,单元测试需要模拟完整的 RunContext。建议为每个 Hook 编写独立的异步测试用例,覆盖正常和异常路径。