Tracing 追踪系统

📑 目录

概述

OpenTelemetry 集成与 Span 上下文传播的分布式追踪

Tracing 系统是现代分布式应用的"黑匣子",它记录了请求在系统中的完整流转路径。在 Agents SDK 中,Tracing 不仅记录了 LLM 调用和工具执行,还覆盖了 Guardrails 校验、Handoff 交接和 Session 状态变更等 Agent 特有的生命周期事件。

从 OpenTelemetry 标准来看,一个 Trace 由多个 Span 组成,Span 之间通过 parent_span_id 形成树状结构。Agents SDK 的自动追踪会将以下事件映射为 Span:

  1. Agent 运行(Run):根 Span,覆盖从输入到最终输出的完整周期。
  2. LLM 调用(Generation):子 Span,记录模型请求/响应的延迟和 token 消耗。
  3. 工具执行(Tool Execution):子 Span,记录工具名称、参数、结果和耗时。
  4. Handoff(Transfer):子 Span,记录交接的源 Agent、目标 Agent 和上下文大小。
  5. Guardrails(Validation):子 Span,记录校验结果和是否触发 Tripwire。

Span 上下文传播是指当一个异步操作(如工具调用)发生时,当前 Span 的上下文需要被传递到子任务中,以确保子任务产生的 Span 能正确关联到父 Span。在 Python 的异步环境中,这通常通过 contextvars 实现。Agents SDK 内部已经处理了大部分传播逻辑,但如果你在自己的工具或 Hook 中启动了额外的 asyncio 任务,可能需要手动传播 Span 上下文。

采样策略(Sampling)是生产级追踪系统的另一个关键配置。全量追踪在调试时很有价值,但在高流量生产环境中会产生大量的数据和存储成本。常见的采样策略包括:

  • 头部采样:在请求入口处随机决定是否采样(如 1% 的请求被追踪)。
  • 尾部采样:先收集所有 Span,在请求完成后再根据某些条件(如延迟超过阈值、发生错误)决定是否保留。
  • 基于特征的采样:只追踪特定用户、特定 Agent 或包含特定关键词的请求。

内置追踪:trace、agent_span、generation_span、function_span 的自动采集与查看。

正文

相关阅读

参考文档

完整实战示例:自定义追踪采样与指标导出

以下示例展示了如何在生产环境中实现一个支持条件采样、延迟阈值告警和自定义标签的追踪系统:

import asyncio
import time
import random
from dataclasses import dataclass
from typing import Any
from agents import Agent, Runner, function_tool, RunContextWrapper


@dataclass
class TraceConfig:
    sample_rate: float = 0.1  # 10% 采样
    slow_threshold_ms: float = 2000.0
    error_sample_rate: float = 1.0  # 错误全量采样


class ProductionTracer:
    """生产级追踪器:采样 + 指标收集。"""

    def __init__(self, config: TraceConfig):
        self.config = config
        self.metrics = {
            "total_runs": 0,
            "sampled_runs": 0,
            "slow_runs": 0,
            "error_runs": 0,
            "total_latency_ms": 0.0,
        }

    def should_sample(self, is_error: bool = False) -> bool:
        if is_error and self.config.error_sample_rate >= 1.0:
            return True
        return random.random() < self.config.sample_rate

    async def trace_run(self, agent: Agent, user_input: str, context: Any = None) -> dict:
        start = time.perf_counter()
        self.metrics["total_runs"] += 1
        trace_id = f"trace_{int(start * 1000)}_{random.randint(1000, 9999)}"
        sampled = self.should_sample()

        spans = []
        if sampled:
            spans.append({"name": "run_start", "timestamp": start, "trace_id": trace_id})

        try:
            result = await Runner.run(agent, user_input, context=context)
            latency_ms = (time.perf_counter() - start) * 1000
            self.metrics["total_latency_ms"] += latency_ms

            is_slow = latency_ms > self.config.slow_threshold_ms
            is_error = False

            if is_slow:
                self.metrics["slow_runs"] += 1
                # 慢请求强制采样
                if not sampled:
                    sampled = True
                    spans.append({"name": "run_start", "timestamp": start, "trace_id": trace_id})

            if sampled:
                spans.append({
                    "name": "run_complete",
                    "latency_ms": latency_ms,
                    "turns": len(result.raw_responses),
                    "output_length": len(result.final_output),
                    "trace_id": trace_id,
                })
                self.metrics["sampled_runs"] += 1
                self._export_spans(spans)

            return {"success": True, "output": result.final_output, "trace_id": trace_id}

        except Exception as e:
            latency_ms = (time.perf_counter() - start) * 1000
            self.metrics["error_runs"] += 1
            if self.should_sample(is_error=True):
                spans.append({
                    "name": "run_error",
                    "error": str(e),
                    "latency_ms": latency_ms,
                    "trace_id": trace_id,
                })
                self._export_spans(spans)
            raise

    def _export_spans(self, spans: list[dict]):
        """导出 Span 到日志或 OTLP Collector。"""
        for span in spans:
            print(f"[TRACE] {span}")

    def get_metrics(self) -> dict:
        avg_latency = (
            self.metrics["total_latency_ms"] / self.metrics["total_runs"]
            if self.metrics["total_runs"] > 0 else 0)

分布式追踪的调用链模型

下图展示了多 Agent 协作场景下的分布式追踪调用链结构:
mermaid
flowchart TD
T[Trace: 用户请求] --> S1[Span: Router Agent]
S1 --> S2[Span: 工具调用 weather_tool]
S1 --> S3[Span: Handoff 到 Expert Agent]
S3 --> S4[Span: LLM 调用 gpt-5]
S3 --> S5[Span: 工具调用 query_db]
S4 --> S6[Span: Token 计费]
S5 --> S7[Span: 结果缓存查询]
style T fill:#e8d5b5,stroke:#5a4a3a,stroke-width:2px
style S1 fill:#c5e0b4,stroke:#5a4a3a
style S3 fill:#c5e0b4,stroke:#5a4a3a


每个 Span 记录了该操作的耗时、输入输出摘要和元数据。通过调用链,可以清晰地看到请求在各组件之间的传播路径和性能瓶颈所在。

## 追踪数据的存储与查询优化

追踪数据的特点是写入量大、查询模式多样(按 Trace ID 查询、按时间范围查询、按服务名查询)。选择合适的存储方案至关重要。

**存储方案对比**:

| 方案 | 写入性能 | 查询灵活性 | 成本 | 适用规模 |
|------|----------|------------|------|----------|
| Jaeger + Cassandra | 高 | 中 | 高 | 大型企业 |
| Zipkin + Elasticsearch | 高 | 高 | 中高 | 中型企业 |
| 自研 + ClickHouse | 极高 | 高 | 中 | 大型高并发 |
| 日志文件 + grep | 低 | 低 | 极低 | 小型项目 |

对于 Agents SDK 应用,推荐采用热数据 + 冷数据分层存储策略:最近 24 小时的追踪数据存储在内存或 Redis 中,支持毫秒级查询;历史数据按天批量压缩后存入对象存储(如 S3),查询时按需解压。

**采样策略的进阶技巧**:

除了固定比例采样,还可以实现错误偏置采样:正常请求按 1% 采样,但错误请求 100% 采样。

```python
def smart_sample(trace_context) -> bool:
    if trace_context.has_error:
        return True
    if trace_context.duration_ms > 5000:
        return True
    return random.random() < 0.01

追踪系统不仅是调试工具,也是性能优化的数据来源。定期分析追踪数据中的慢 Span 分布,可以识别出系统的性能瓶颈和优化机会。追踪数据的采样与成本控制同样重要。全量追踪在流量高峰时会产生海量数据,存储和查询成本不可忽略。采样策略需要在数据完整性和成本之间取得平衡:头部采样按固定比例随机采样,实现简单但可能遗漏关键请求;尾部采样先收集所有追踪数据,在请求完成后根据条件决定是否保留;自适应采样动态调整采样率,错误率上升时提高采样比例,平稳期降低。追踪数据应设置合理的保留期(如 30 天),超过期限后归档到低成本存储或聚合为指标,释放主存储空间。

常见问题与调试

问题一:Trace 中缺少自定义工具的 Span

如果自定义工具的执行没有被记录到 Trace 中,可能是因为工具内部的异步任务没有正确传播 Span 上下文。排查方法:

  1. 检查工具函数是否使用了 async def,同步函数在某些追踪实现中可能无法被正确包裹。
  2. 如果在工具内部创建了新的 asyncio.Task,确保使用 trace.get_current_span() 手动传递上下文。
  3. 升级 SDK 到最新版本,早期版本可能对某些工具类型(如 MCP 工具)的追踪支持不完整。

问题二:Trace 数据量过大导致存储成本飙升

全量追踪在大型系统中可能产生每天数 GB 的数据。成本控制策略:

  1. 实施头部采样,只追踪 1%-5% 的请求。
  2. 对成功且快速的请求,只保留根 Span,丢弃详细的子 Span。
  3. 设置 Trace 的 TTL(如 7 天),过期后自动归档或删除。
  4. 使用自建的低成本存储(如 ClickHouse、Elasticsearch)替代 SaaS 追踪平台。

问题三:分布式系统中的 Trace 断链

当 Agent 调用外部微服务或 MCP 工具时,Trace 可能在服务边界处断裂。修复方法:

  1. 在跨服务调用时,通过 HTTP Header(如 traceparent)传递 Trace ID 和 Span ID。
  2. 确保所有下游服务都集成了 OpenTelemetry 并正确解析传入的上下文。
  3. 对于 MCP 工具,在工具协议中增加 Trace 上下文传递的扩展字段。

与其他方案对比

维度Agents SDK TracingLangSmithOpenTelemetry + Jaeger
集成深度原生(自动记录 Agent 事件)原生(针对 LLM 优化)通用(需手动埋点)
可视化OpenAI Trace ViewerLangSmith UIJaeger UI / Grafana
成本包含在 SDK 使用中按追踪量收费自建成本
生态兼容OpenAI 生态LangChain 生态全栈通用

LangSmith 是目前 LLM 应用追踪领域最专业的平台,它提供了提示词版本管理、A/B 测试、反馈收集等高级功能,与 LangChain 生态深度集成。Agents SDK 的内置追踪则更适合已经在使用 OpenAI 平台的团队,无需额外的基础设施投入。对于需要统一追踪全栈(前端、后端、数据库、Agent)的团队,自建 OpenTelemetry + Jaeger 是最终方案,但需要更多的运维投入。

深度技术:Span 上下文传播与装饰器模式

在复杂的 Agent 系统中,追踪逻辑不应侵入业务代码。通过装饰器模式(Decorator Pattern),我们可以在不修改原有函数的前提下,为其自动附加追踪能力。Python 的 @wrapscontextvars 为这种无侵入埋点提供了语言级支持。

graph TD
    A[用户请求] --> B[trace_run 根Span]
    B --> C[agent_span Agent执行]
    C --> D[generation_span LLM调用]
    C --> E[function_span 工具执行]
    D --> F[上下文传播
contextvars] E --> F F --> G[子任务异步执行] G --> H[Span上报 OTLP] style B fill:#e1f5fe style F fill:#fff3e0

装饰器的核心思想是将 Span 的生命周期管理封装在一个可复用的函数包装器中。当装饰器应用于 Agent 的 run 方法或工具函数时,它会自动创建对应类型的 Span,记录开始时间、输入参数,并在函数返回或抛出异常时完成 Span 的结束和上报。这种模式遵循开闭原则:对扩展开放(添加新的追踪类型),对修改封闭(不改动原有业务逻辑)。

以下是一个基于装饰器模式的通用追踪包装器实现,支持自动推断 Span 类型、动态采样和异常标记:

import functools
import time
import contextvars
from typing import Callable, Any

# 使用 contextvars 实现 Span 上下文在异步任务间的传播
current_span_id: contextvars.ContextVar[str] = contextvars.ContextVar("span_id")


def trace_span(span_type: str = "function", sample_rate: float = 1.0):
    """通用追踪装饰器:自动创建 Span 并传播上下文。"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            import random
            if random.random() > sample_rate:
                return await func(*args, **kwargs)

            span_id = f"span_{span_type}_{int(time.time() * 1000)}"
            token = current_span_id.set(span_id)
            start = time.perf_counter()

            try:
                result = await func(*args, **kwargs)
                _export_span(span_id, span_type, start, status="ok")
                return result
            except Exception as e:
                _export_span(span_id, span_type, start, status="error", error=str(e))
                raise
            finally:
                current_span_id.reset(token)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> Any:
            import random
            if random.random() > sample_rate:
                return func(*args, **kwargs)

            span_id = f"span_{span_type}_{int(time.time() * 1000)}"
            token = current_span_id.set(span_id)
            start = time.perf_counter()

            try:
                result = func(*args, **kwargs)
                _export_span(span_id, span_type, start, status="ok")
                return result
            except Exception as e:
                _export_span(span_id, span_type, start, status="error", error=str(e))
                raise
            finally:
                current_span_id.reset(token)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator


def _export_span(span_id: str, span_type: str, start: float, status: str, error: str = None):
    latency_ms = (time.perf_counter() - start) * 1000
    parent_id = current_span_id.get(None)
    print(f"[TRACE] span_id={span_id}, type={span_type}, latency={latency_ms:.2f}ms, status={status}, parent={parent_id}")


# 使用示例:为 Agent 工具添加追踪
@trace_span(span_type="tool", sample_rate=0.5)
async def search_database(query: str) -> list[dict]:
    """模拟数据库查询工具。"""
    await asyncio.sleep(0.05)
    return [{"id": 1, "content": f"Result for {query}"}]

这段代码的关键在于 contextvars.ContextVar 的使用。在 Python 异步环境中,每个 asyncio.Task 都有自己独立的上下文拷贝,因此通过 current_span_id.set() 设置的父 Span ID 会自动跟随到子任务中,无需手动传递参数。这是 Agents SDK 内部实现 Span 上下文传播的底层机制。

装饰器模式与 SDK 内置追踪的关系是互补的:内置追踪覆盖 SDK 自身的生命周期事件(Run、Generation、Handoff),而自定义装饰器则可以覆盖你的业务代码、外部 API 调用和自定义工具。将两者结合,可以构建出覆盖全链路的统一追踪视图。

在分布式环境中,Span 上下文传播还需要考虑跨进程边界的问题。当你的 Agent 调用外部微服务或 MCP 工具时,Trace ID 和 Span ID 需要通过标准化的载体传递。OpenTelemetry 定义的 traceparent 头(格式为 00-trace_id-span_id-flags)是目前业界通用的解决方案。在 Agents SDK 中,如果你通过 HTTP 调用外部工具,应在请求头中注入当前 Span 的上下文:

import httpx
from agents.tracing import get_current_span

async def call_external_api(url: str, payload: dict) -> dict:
    span = get_current_span()
    headers = {}
    if span:
        # 将当前 Span 上下文注入 HTTP 请求头
        headers["traceparent"] = f"00-{span.trace_id}-{span.span_id}-01"

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload, headers=headers)
        return response.json()

这段代码展示了上下文传播(Context Propagation) 的标准做法。下游服务在收到请求后,解析 traceparent 头并恢复 Span 上下文,后续产生的 Span 就能自动关联到当前的 Trace 树中。对于不支持 OpenTelemetry 的遗留系统,你也可以通过自定义头(如 x-trace-id)实现简化的传播,只需要确保上下游的 ID 格式兼容即可。上下文传播的质量直接决定了分布式 Trace 的完整性,是生产级追踪系统中最容易被忽视但影响最大的环节。建议在每个跨服务调用点都添加上下文注入和提取的逻辑,并将这部分代码封装为可复用的中间件或请求拦截器,以避免遗漏。

生产环境部署与性能优化

追踪采样策略的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

Span 异常检测的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

海量追踪数据处理的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。

Tracing 数据的存储成本随流量线性增长。建议实施智能采样策略:对正常请求采样 1%,对错误请求采样 100%,并在采样决策中融入用户等级(VIP 用户全量追踪)。

追踪数据不仅用于故障排查,也是性能优化的重要依据。通过分析 Span 的耗时分布,可以识别系统中的热点路径(如某个工具调用耗时过长),并针对性地进行优化。