概述
OpenTelemetry 集成与 Span 上下文传播的分布式追踪
Tracing 系统是现代分布式应用的"黑匣子",它记录了请求在系统中的完整流转路径。在 Agents SDK 中,Tracing 不仅记录了 LLM 调用和工具执行,还覆盖了 Guardrails 校验、Handoff 交接和 Session 状态变更等 Agent 特有的生命周期事件。
从 OpenTelemetry 标准来看,一个 Trace 由多个 Span 组成,Span 之间通过 parent_span_id 形成树状结构。Agents SDK 的自动追踪会将以下事件映射为 Span:
- Agent 运行(Run):根 Span,覆盖从输入到最终输出的完整周期。
- LLM 调用(Generation):子 Span,记录模型请求/响应的延迟和 token 消耗。
- 工具执行(Tool Execution):子 Span,记录工具名称、参数、结果和耗时。
- Handoff(Transfer):子 Span,记录交接的源 Agent、目标 Agent 和上下文大小。
- Guardrails(Validation):子 Span,记录校验结果和是否触发 Tripwire。
Span 上下文传播是指当一个异步操作(如工具调用)发生时,当前 Span 的上下文需要被传递到子任务中,以确保子任务产生的 Span 能正确关联到父 Span。在 Python 的异步环境中,这通常通过 contextvars 实现。Agents SDK 内部已经处理了大部分传播逻辑,但如果你在自己的工具或 Hook 中启动了额外的 asyncio 任务,可能需要手动传播 Span 上下文。
采样策略(Sampling)是生产级追踪系统的另一个关键配置。全量追踪在调试时很有价值,但在高流量生产环境中会产生大量的数据和存储成本。常见的采样策略包括:
- 头部采样:在请求入口处随机决定是否采样(如 1% 的请求被追踪)。
- 尾部采样:先收集所有 Span,在请求完成后再根据某些条件(如延迟超过阈值、发生错误)决定是否保留。
- 基于特征的采样:只追踪特定用户、特定 Agent 或包含特定关键词的请求。
内置追踪:trace、agent_span、generation_span、function_span 的自动采集与查看。
正文
相关阅读
参考文档
完整实战示例:自定义追踪采样与指标导出
以下示例展示了如何在生产环境中实现一个支持条件采样、延迟阈值告警和自定义标签的追踪系统:
import asyncio
import time
import random
from dataclasses import dataclass
from typing import Any
from agents import Agent, Runner, function_tool, RunContextWrapper
@dataclass
class TraceConfig:
sample_rate: float = 0.1 # 10% 采样
slow_threshold_ms: float = 2000.0
error_sample_rate: float = 1.0 # 错误全量采样
class ProductionTracer:
"""生产级追踪器:采样 + 指标收集。"""
def __init__(self, config: TraceConfig):
self.config = config
self.metrics = {
"total_runs": 0,
"sampled_runs": 0,
"slow_runs": 0,
"error_runs": 0,
"total_latency_ms": 0.0,
}
def should_sample(self, is_error: bool = False) -> bool:
if is_error and self.config.error_sample_rate >= 1.0:
return True
return random.random() < self.config.sample_rate
async def trace_run(self, agent: Agent, user_input: str, context: Any = None) -> dict:
start = time.perf_counter()
self.metrics["total_runs"] += 1
trace_id = f"trace_{int(start * 1000)}_{random.randint(1000, 9999)}"
sampled = self.should_sample()
spans = []
if sampled:
spans.append({"name": "run_start", "timestamp": start, "trace_id": trace_id})
try:
result = await Runner.run(agent, user_input, context=context)
latency_ms = (time.perf_counter() - start) * 1000
self.metrics["total_latency_ms"] += latency_ms
is_slow = latency_ms > self.config.slow_threshold_ms
is_error = False
if is_slow:
self.metrics["slow_runs"] += 1
# 慢请求强制采样
if not sampled:
sampled = True
spans.append({"name": "run_start", "timestamp": start, "trace_id": trace_id})
if sampled:
spans.append({
"name": "run_complete",
"latency_ms": latency_ms,
"turns": len(result.raw_responses),
"output_length": len(result.final_output),
"trace_id": trace_id,
})
self.metrics["sampled_runs"] += 1
self._export_spans(spans)
return {"success": True, "output": result.final_output, "trace_id": trace_id}
except Exception as e:
latency_ms = (time.perf_counter() - start) * 1000
self.metrics["error_runs"] += 1
if self.should_sample(is_error=True):
spans.append({
"name": "run_error",
"error": str(e),
"latency_ms": latency_ms,
"trace_id": trace_id,
})
self._export_spans(spans)
raise
def _export_spans(self, spans: list[dict]):
"""导出 Span 到日志或 OTLP Collector。"""
for span in spans:
print(f"[TRACE] {span}")
def get_metrics(self) -> dict:
avg_latency = (
self.metrics["total_latency_ms"] / self.metrics["total_runs"]
if self.metrics["total_runs"] > 0 else 0)
分布式追踪的调用链模型
下图展示了多 Agent 协作场景下的分布式追踪调用链结构:
mermaid
flowchart TD
T[Trace: 用户请求] --> S1[Span: Router Agent]
S1 --> S2[Span: 工具调用 weather_tool]
S1 --> S3[Span: Handoff 到 Expert Agent]
S3 --> S4[Span: LLM 调用 gpt-5]
S3 --> S5[Span: 工具调用 query_db]
S4 --> S6[Span: Token 计费]
S5 --> S7[Span: 结果缓存查询]
style T fill:#e8d5b5,stroke:#5a4a3a,stroke-width:2px
style S1 fill:#c5e0b4,stroke:#5a4a3a
style S3 fill:#c5e0b4,stroke:#5a4a3a
每个 Span 记录了该操作的耗时、输入输出摘要和元数据。通过调用链,可以清晰地看到请求在各组件之间的传播路径和性能瓶颈所在。
## 追踪数据的存储与查询优化
追踪数据的特点是写入量大、查询模式多样(按 Trace ID 查询、按时间范围查询、按服务名查询)。选择合适的存储方案至关重要。
**存储方案对比**:
| 方案 | 写入性能 | 查询灵活性 | 成本 | 适用规模 |
|------|----------|------------|------|----------|
| Jaeger + Cassandra | 高 | 中 | 高 | 大型企业 |
| Zipkin + Elasticsearch | 高 | 高 | 中高 | 中型企业 |
| 自研 + ClickHouse | 极高 | 高 | 中 | 大型高并发 |
| 日志文件 + grep | 低 | 低 | 极低 | 小型项目 |
对于 Agents SDK 应用,推荐采用热数据 + 冷数据分层存储策略:最近 24 小时的追踪数据存储在内存或 Redis 中,支持毫秒级查询;历史数据按天批量压缩后存入对象存储(如 S3),查询时按需解压。
**采样策略的进阶技巧**:
除了固定比例采样,还可以实现错误偏置采样:正常请求按 1% 采样,但错误请求 100% 采样。
```python
def smart_sample(trace_context) -> bool:
if trace_context.has_error:
return True
if trace_context.duration_ms > 5000:
return True
return random.random() < 0.01追踪系统不仅是调试工具,也是性能优化的数据来源。定期分析追踪数据中的慢 Span 分布,可以识别出系统的性能瓶颈和优化机会。追踪数据的采样与成本控制同样重要。全量追踪在流量高峰时会产生海量数据,存储和查询成本不可忽略。采样策略需要在数据完整性和成本之间取得平衡:头部采样按固定比例随机采样,实现简单但可能遗漏关键请求;尾部采样先收集所有追踪数据,在请求完成后根据条件决定是否保留;自适应采样动态调整采样率,错误率上升时提高采样比例,平稳期降低。追踪数据应设置合理的保留期(如 30 天),超过期限后归档到低成本存储或聚合为指标,释放主存储空间。
常见问题与调试
问题一:Trace 中缺少自定义工具的 Span
如果自定义工具的执行没有被记录到 Trace 中,可能是因为工具内部的异步任务没有正确传播 Span 上下文。排查方法:
- 检查工具函数是否使用了
async def,同步函数在某些追踪实现中可能无法被正确包裹。 - 如果在工具内部创建了新的
asyncio.Task,确保使用trace.get_current_span()手动传递上下文。 - 升级 SDK 到最新版本,早期版本可能对某些工具类型(如 MCP 工具)的追踪支持不完整。
问题二:Trace 数据量过大导致存储成本飙升
全量追踪在大型系统中可能产生每天数 GB 的数据。成本控制策略:
- 实施头部采样,只追踪 1%-5% 的请求。
- 对成功且快速的请求,只保留根 Span,丢弃详细的子 Span。
- 设置 Trace 的 TTL(如 7 天),过期后自动归档或删除。
- 使用自建的低成本存储(如 ClickHouse、Elasticsearch)替代 SaaS 追踪平台。
问题三:分布式系统中的 Trace 断链
当 Agent 调用外部微服务或 MCP 工具时,Trace 可能在服务边界处断裂。修复方法:
- 在跨服务调用时,通过 HTTP Header(如
traceparent)传递 Trace ID 和 Span ID。 - 确保所有下游服务都集成了 OpenTelemetry 并正确解析传入的上下文。
- 对于 MCP 工具,在工具协议中增加 Trace 上下文传递的扩展字段。
与其他方案对比
| 维度 | Agents SDK Tracing | LangSmith | OpenTelemetry + Jaeger |
|---|---|---|---|
| 集成深度 | 原生(自动记录 Agent 事件) | 原生(针对 LLM 优化) | 通用(需手动埋点) |
| 可视化 | OpenAI Trace Viewer | LangSmith UI | Jaeger UI / Grafana |
| 成本 | 包含在 SDK 使用中 | 按追踪量收费 | 自建成本 |
| 生态兼容 | OpenAI 生态 | LangChain 生态 | 全栈通用 |
LangSmith 是目前 LLM 应用追踪领域最专业的平台,它提供了提示词版本管理、A/B 测试、反馈收集等高级功能,与 LangChain 生态深度集成。Agents SDK 的内置追踪则更适合已经在使用 OpenAI 平台的团队,无需额外的基础设施投入。对于需要统一追踪全栈(前端、后端、数据库、Agent)的团队,自建 OpenTelemetry + Jaeger 是最终方案,但需要更多的运维投入。
深度技术:Span 上下文传播与装饰器模式
在复杂的 Agent 系统中,追踪逻辑不应侵入业务代码。通过装饰器模式(Decorator Pattern),我们可以在不修改原有函数的前提下,为其自动附加追踪能力。Python 的 @wraps 和 contextvars 为这种无侵入埋点提供了语言级支持。
graph TD
A[用户请求] --> B[trace_run 根Span]
B --> C[agent_span Agent执行]
C --> D[generation_span LLM调用]
C --> E[function_span 工具执行]
D --> F[上下文传播
contextvars]
E --> F
F --> G[子任务异步执行]
G --> H[Span上报 OTLP]
style B fill:#e1f5fe
style F fill:#fff3e0装饰器的核心思想是将 Span 的生命周期管理封装在一个可复用的函数包装器中。当装饰器应用于 Agent 的 run 方法或工具函数时,它会自动创建对应类型的 Span,记录开始时间、输入参数,并在函数返回或抛出异常时完成 Span 的结束和上报。这种模式遵循开闭原则:对扩展开放(添加新的追踪类型),对修改封闭(不改动原有业务逻辑)。
以下是一个基于装饰器模式的通用追踪包装器实现,支持自动推断 Span 类型、动态采样和异常标记:
import functools
import time
import contextvars
from typing import Callable, Any
# 使用 contextvars 实现 Span 上下文在异步任务间的传播
current_span_id: contextvars.ContextVar[str] = contextvars.ContextVar("span_id")
def trace_span(span_type: str = "function", sample_rate: float = 1.0):
"""通用追踪装饰器:自动创建 Span 并传播上下文。"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
import random
if random.random() > sample_rate:
return await func(*args, **kwargs)
span_id = f"span_{span_type}_{int(time.time() * 1000)}"
token = current_span_id.set(span_id)
start = time.perf_counter()
try:
result = await func(*args, **kwargs)
_export_span(span_id, span_type, start, status="ok")
return result
except Exception as e:
_export_span(span_id, span_type, start, status="error", error=str(e))
raise
finally:
current_span_id.reset(token)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
import random
if random.random() > sample_rate:
return func(*args, **kwargs)
span_id = f"span_{span_type}_{int(time.time() * 1000)}"
token = current_span_id.set(span_id)
start = time.perf_counter()
try:
result = func(*args, **kwargs)
_export_span(span_id, span_type, start, status="ok")
return result
except Exception as e:
_export_span(span_id, span_type, start, status="error", error=str(e))
raise
finally:
current_span_id.reset(token)
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
def _export_span(span_id: str, span_type: str, start: float, status: str, error: str = None):
latency_ms = (time.perf_counter() - start) * 1000
parent_id = current_span_id.get(None)
print(f"[TRACE] span_id={span_id}, type={span_type}, latency={latency_ms:.2f}ms, status={status}, parent={parent_id}")
# 使用示例:为 Agent 工具添加追踪
@trace_span(span_type="tool", sample_rate=0.5)
async def search_database(query: str) -> list[dict]:
"""模拟数据库查询工具。"""
await asyncio.sleep(0.05)
return [{"id": 1, "content": f"Result for {query}"}]这段代码的关键在于 contextvars.ContextVar 的使用。在 Python 异步环境中,每个 asyncio.Task 都有自己独立的上下文拷贝,因此通过 current_span_id.set() 设置的父 Span ID 会自动跟随到子任务中,无需手动传递参数。这是 Agents SDK 内部实现 Span 上下文传播的底层机制。
装饰器模式与 SDK 内置追踪的关系是互补的:内置追踪覆盖 SDK 自身的生命周期事件(Run、Generation、Handoff),而自定义装饰器则可以覆盖你的业务代码、外部 API 调用和自定义工具。将两者结合,可以构建出覆盖全链路的统一追踪视图。
在分布式环境中,Span 上下文传播还需要考虑跨进程边界的问题。当你的 Agent 调用外部微服务或 MCP 工具时,Trace ID 和 Span ID 需要通过标准化的载体传递。OpenTelemetry 定义的 traceparent 头(格式为 00-trace_id-span_id-flags)是目前业界通用的解决方案。在 Agents SDK 中,如果你通过 HTTP 调用外部工具,应在请求头中注入当前 Span 的上下文:
import httpx
from agents.tracing import get_current_span
async def call_external_api(url: str, payload: dict) -> dict:
span = get_current_span()
headers = {}
if span:
# 将当前 Span 上下文注入 HTTP 请求头
headers["traceparent"] = f"00-{span.trace_id}-{span.span_id}-01"
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, headers=headers)
return response.json()这段代码展示了上下文传播(Context Propagation) 的标准做法。下游服务在收到请求后,解析 traceparent 头并恢复 Span 上下文,后续产生的 Span 就能自动关联到当前的 Trace 树中。对于不支持 OpenTelemetry 的遗留系统,你也可以通过自定义头(如 x-trace-id)实现简化的传播,只需要确保上下游的 ID 格式兼容即可。上下文传播的质量直接决定了分布式 Trace 的完整性,是生产级追踪系统中最容易被忽视但影响最大的环节。建议在每个跨服务调用点都添加上下文注入和提取的逻辑,并将这部分代码封装为可复用的中间件或请求拦截器,以避免遗漏。
生产环境部署与性能优化
追踪采样策略的实践要点
将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。
具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。
Span 异常检测的关键指标
监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。
除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。
海量追踪数据处理的架构考量
随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。
在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。
运维团队的协作建议
技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。
在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。
Tracing 数据的存储成本随流量线性增长。建议实施智能采样策略:对正常请求采样 1%,对错误请求采样 100%,并在采样决策中融入用户等级(VIP 用户全量追踪)。
追踪数据不仅用于故障排查,也是性能优化的重要依据。通过分析 Span 的耗时分布,可以识别系统中的热点路径(如某个工具调用耗时过长),并针对性地进行优化。