自定义追踪处理器

📑 目录

概述

批处理优化与异步上报的数据管道设计

自定义追踪处理器的核心挑战在于吞吐量和延迟的权衡。如果每次 Agent 事件都立即同步上报到追踪后端,大量的网络 I/O 会严重拖慢 Agent 的运行速度。生产级的追踪系统通常采用异步批处理架构:事件先被写入内存缓冲区,由一个独立的后台线程或任务定期批量刷写到远端。

这种架构的关键设计参数包括:

  1. 批次大小(Batch Size):每批上报的最大事件数。过大的批次会增加延迟(需要等待更多事件),但能提高吞吐量(减少网络往返)。
  2. 刷新间隔(Flush Interval):即使批次未满,也强制刷新的最大等待时间。这确保了即使流量很低,事件也不会在缓冲区中滞留太久。
  3. 缓冲区上限(Buffer Capacity):当缓冲区满且网络不可用时,新事件的取舍策略(丢弃最老的、丢弃最新的、或阻塞等待)。
  4. 重试策略:上报失败时的退避策略(固定间隔、线性退避、指数退避)。

数据脱敏是自定义处理器的另一个重要职责。当追踪数据包含用户输入、工具参数或模型输出时,其中可能嵌套着 PII 或商业敏感信息。在将数据发送到第三方平台(如 Datadog、New Relic)之前,必须对敏感字段进行脱敏或哈希处理。

从设计模式来看,自定义追踪处理器可以实现观察者模式(Observer Pattern):SDK 在运行过程中产生事件,处理器订阅感兴趣的事件类型并进行处理。这种模式将追踪逻辑与业务逻辑完全解耦,使得追踪系统的演进不会影响 Agent 的核心行为。

BatchTraceProcessor、自定义 TraceProcessor:将追踪数据发送到自有监控系统。

正文

相关阅读

参考文档

完整实战示例:高性能异步追踪处理器

以下示例展示了如何构建一个支持批处理、异步上报、数据脱敏和背压控制的自定义追踪处理器:

import asyncio
import time
import copy
from dataclasses import dataclass
from typing import Any
from collections import deque
from agents import Agent, Runner, function_tool


@dataclass
class TracingEvent:
    event_type: str
    timestamp: float
    run_id: str
    agent_name: str
    details: dict


class AsyncBatchTracingProcessor:
    """异步批处理追踪处理器。"""

    def __init__(
        self,
        batch_size: int = 50,
        flush_interval: float = 5.0,
        max_buffer: int = 1000,
    ):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_buffer = max_buffer
        self.buffer: deque[TracingEvent] = deque()
        self._flush_task: asyncio.Task | None = None
        self._shutdown = False
        self._dropped_count = 0

    def start(self):
        self._flush_task = asyncio.create_task(self._flush_loop())

    def stop(self):
        self._shutdown = True
        if self._flush_task:
            self._flush_task.cancel()

    def on_event(self, event: TracingEvent):
        if len(self.buffer) >= self.max_buffer:
            self._dropped_count += 1
            return
        self.buffer.append(event)

    def _sanitize(self, event: TracingEvent) -> TracingEvent:
        """对事件中的敏感字段进行脱敏。"""
        sanitized = copy.deepcopy(event)
        details = sanitized.details
        # 对用户输入进行部分掩码
        if "user_input" in details:
            text = details["user_input"]
            details["user_input"] = text[:10] + "..." + text[-5:] if len(text) > 20 else "[REDACTED]"
        # 对工具参数中的 email 进行掩码
        if "tool_args" in details:
            for key in list(details["tool_args"].keys()):
                if "email" in key.lower():
                    details["tool_args"][key] = "[EMAIL_REDACTED]"
        return sanitized

    async def _flush_loop(self):
        while not self._shutdown:
            await asyncio.sleep(self.flush_interval)
            await self._flush_batch()

    async def _flush_batch(self):
        if not self.buffer:
            return
        batch = []
        while len(batch) < self.batch_size and self.buffer:
            batch.append(self.buffer.popleft())

        sanitized_batch = [self._sanitize(e) for e in batch]
        await self._send_to_backend(sanitized_batch)

    async def _send_to_backend(self, batch: list[TracingEvent]):
        """模拟发送到追踪后端。"""
        try:
            await asyncio.sleep(0.1)  # 模拟网络延迟
            print(f"[TRACES] Flushed {len(batch)} events. Dropped so far: {self._dropped_count}")
        except Exception as e:
            print(f"[TRACES] Send failed: {e}")
            # 失败时重新放入缓冲区(简化处理)
            for event in batch:
                if len(self.buffer) < self.max_buffer:
                    self.buffer.appendleft(event)


# 简单的追踪代理层
class TracedRunner:
    def __init__(self, processor: AsyncBatchTracingProcessor):
        self.processor = processor

    async def run(self, agent: Agent, user_input: str) -> Any:
        run_id = f"run_{int(time.time())}"
        self.processor.on_event(TracingEvent(
            event_type="run_start",
            timestamp=time.time(),
            run_id=run_id,
            agent_name=agent.name,
            details={"user_input": user_input},
        ))

        result = await Runner.run(agent, user_input)

        self.processor.on_event(TracingEvent(
            event_type="run_end",
            timestamp=time.time(),
            run_id=run_id,
            agent_name=agent.name,
            details={"output_length": len(result.final_output), "turns": len(result.raw_responses)},
        ))
        return result


@function_tool
def search_docs(query: str) -> str:
    return f"Docs about {query}: [placeholder]"


async def main():
    processor = AsyncBatchTracingProcessor(batch_size=3, flush_interval=2.0)
    processor.start()

    runner = TracedRunner(processor)
    agent = Agent(name="DocAgent", instructions="Search docs for users.", tools=[search_docs], model="gpt-5-nano")

    for i in range(5):
        await runner.run(agent, f"Query about topic {i}")

    # 等待最终刷新
    await asyncio.sleep(3)
    processor.stop()


if __name__ == "__main__":
    asyncio.run(main())

这个处理器的设计亮点在于背压感知:当缓冲区满时,新事件被丢弃而不是阻塞 Agent 运行。同时,脱敏逻辑确保了即使追踪数据泄露,也不会暴露用户的敏感信息。

追踪处理器的事件驱动架构

下图展示了追踪处理器在 Span 生命周期各阶段接收事件的流程:

flowchart TD
    S[Span 开始] --> P1[Processor.on_span_start]
    P1 --> E[Span 执行中]
    E --> P2[Processor.on_event]
    P2 --> E
    E --> F[Span 结束]
    F --> P3[Processor.on_span_end]
    P3 --> P4{是否缓冲?}
    P4 -->|是| B[写入缓冲区]
    P4 -->|否| D[直接输出]
    B --> FL{缓冲区满或定时触发?}
    FL -->|是| D
    FL -->|否| B
    D --> DEST[(存储目的地)]
    style P3 fill:#e8d5b5,stroke:#5a4a3a
    style B fill:#bdd7ee,stroke:#5a4a3a

事件驱动架构使得追踪处理器可以灵活组合,每个处理器只关注自己关心的事件类型,互不干扰。

构建生产级追踪管道

自定义追踪处理器的真正价值在于构建符合企业需求的追踪管道。以下是一个典型生产环境的追踪管道设计:

管道组件

  1. 日志处理器:将 Span 数据格式化为结构化日志(JSON),输出到 stdout 供日志采集器收集。
  2. 指标聚合器:从 Span 中提取关键指标(P50/P95/P99 延迟、错误率、QPS),实时推送到 Prometheus。
  3. 告警触发器:检测异常模式(如错误率突增、延迟超过 SLA),触发 PagerDuty 或企业微信告警。
  4. 存储归档器:将原始 Span 数据批量写入 ClickHouse 或 BigQuery,支持 Ad-hoc 查询和历史分析。

背压处理

当追踪数据量超过下游处理能力时,系统可能出现内存溢出或数据丢失。背压(Backpressure)机制确保系统在高负载下优雅降级:

class BackpressureProcessor:
    def __init__(self, max_queue_size: int = 10000):
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.dropped = 0

    async def on_span_end(self, span: Span):
        try:
            self.queue.put_nowait(span)
        except asyncio.QueueFull:
            self.dropped += 1
            if self.dropped % 1000 == 1:
                logger.warning(f"追踪队列已满,已丢弃 {self.dropped} 个 Span")

处理器链的故障隔离

管道中的某个处理器失败不应影响其他处理器。例如,存储归档器因网络中断暂时不可用时,日志处理器和告警触发器应继续正常工作。

async def safe_process(processor, span):
    try:
        await processor.on_span_end(span)
    except Exception as e:
        logger.error(f"处理器 {processor} 失败: {e}")
        dead_letter_queue.put((processor, span))

这种设计确保了追踪系统的可靠性——它本身不应成为系统的单点故障。自定义处理器应实现优雅关闭,在进程终止前 flush 所有缓冲数据,避免追踪丢失。典型处理器组合包括:日志处理器、指标处理器、告警处理器和存储处理器。通过链式组合,每个处理器负责不同的关注点,实现模块化的追踪架构。

常见问题与调试

问题一:追踪事件顺序错乱

由于批处理和异步发送,事件到达后端的顺序可能与产生顺序不同。如果后端依赖事件顺序(如计算持续时间),就会导致错误。解决方案:

  1. 在每个事件中包含精确的时间戳(毫秒级),后端按时间戳排序而非到达顺序。
  2. 使用单调时钟(time.monotonic())计算持续时间,而非绝对时间差。
  3. 在批次内部保持事件的产生顺序,只在批次之间允许乱序。

问题二:进程退出时缓冲区中的事件丢失

如果应用崩溃或被优雅关闭时,缓冲区中还有未发送的事件,这些数据会丢失。缓解措施:

  1. 在进程退出信号(SIGTERM)处理函数中调用 processor.stop() 并执行一次同步刷新。
  2. 将缓冲区持久化到本地磁盘(如 SQLite),启动时恢复并重新发送。
  3. 对于关键事件,提供同步发送的选项(牺牲性能换取可靠性)。

问题三:脱敏规则遗漏导致数据泄露

追踪数据中的敏感信息可能以意想不到的形式出现(如嵌套在 JSON 字符串中)。安全建议:

  1. 维护一个敏感字段的中央注册表,所有追踪处理器共享同一套脱敏规则。
  2. 使用正则表达式进行深度扫描,而不仅仅是字段名匹配。
  3. 在开发环境中运行"敏感数据扫描"测试,模拟各种攻击性的输入数据,验证脱敏效果。

与其他方案对比

维度自定义 SDK ProcessorOpenTelemetry CollectorLangSmith 自动上报
控制力极高(完全自定义)高(配置化)低(固定行为)
开发成本
性能优化空间
数据主权完全可控可控(自建 Collector)依赖第三方

OpenTelemetry Collector 是一个优秀的中间层选择:它在应用和最终存储之间提供了缓冲、批处理、采样和转换能力,且完全通过配置管理,无需编写代码。如果你的团队已经在使用 OpenTelemetry 监控其他服务,将 Agents SDK 的追踪数据接入 Collector 是最自然的扩展路径。自定义 SDK Processor 则适合对追踪数据有高度定制需求(如特殊的脱敏算法、与内部审计系统的集成)的场景。

深度技术:责任链模式与多级处理管道

当追踪事件需要经过多个处理阶段(如脱敏、格式化、采样、压缩、上报)时,责任链模式(Chain of Responsibility Pattern) 是一种优雅的架构选择。每个处理器只负责一个单一的职责,事件依次流经处理链,直到被最终消费或丢弃。

graph LR
    A[原始追踪事件] --> B[脱敏处理器]
    B --> C[采样处理器]
    C --> D[格式化处理器]
    D --> E[压缩处理器]
    E --> F[上报处理器]
    F --> G[后端存储]
    B -.丢弃.-> H[审计日志]
    style B fill:#ffebee
    style F fill:#e8f5e9

责任链模式的优势在于可组合性可测试性。你可以像搭积木一样组合不同的处理器,根据环境需求灵活调整处理链的组成。例如,开发环境可以跳过脱敏和压缩以保留完整信息,而生产环境则启用全链路处理。每个处理器都是独立的单元测试目标,不需要依赖其他处理器或真实后端。

以下是一个基于责任链模式的多级追踪处理器实现,支持条件短路(某个处理器丢弃事件后终止链条)和异步并发处理:

from abc import ABC, abstractmethod
from typing import Any
from dataclasses import dataclass


@dataclass
class TraceEvent:
    event_type: str
    payload: dict
    metadata: dict


class TraceProcessor(ABC):
    """追踪处理器抽象基类。"""

    def __init__(self, next_processor: "TraceProcessor" = None):
        self._next = next_processor

    @abstractmethod
    async def process(self, event: TraceEvent) -> TraceEvent | None:
        """处理事件,返回处理后的事件或 None 表示丢弃。"""
        pass

    async def handle(self, event: TraceEvent) -> None:
        result = await self.process(event)
        if result is not None and self._next is not None:
            await self._next.handle(result)


class SanitizationProcessor(TraceProcessor):
    """脱敏处理器:对敏感字段进行掩码。"""

    async def process(self, event: TraceEvent) -> TraceEvent | None:
        payload = dict(event.payload)
        if "user_email" in payload:
            email = payload["user_email"]
            payload["user_email"] = email[:2] + "***@" + email.split("@")[-1]
        return TraceEvent(event.event_type, payload, event.metadata)


class SamplingProcessor(TraceProcessor):
    """采样处理器:按策略决定是否继续传递。"""

    def __init__(self, sample_rate: float, next_processor: TraceProcessor = None):
        super().__init__(next_processor)
        self.sample_rate = sample_rate

    async def process(self, event: TraceEvent) -> TraceEvent | None:
        import random
        if event.metadata.get("force_trace") or random.random() < self.sample_rate:
            return event
        return None  # 丢弃事件,终止链条


class BatchExportProcessor(TraceProcessor):
    """批量上报处理器:累积事件后批量发送。"""

    def __init__(self, batch_size: int = 50, next_processor: TraceProcessor = None):
        super().__init__(next_processor)
        self.batch_size = batch_size
        self._buffer = []

    async def process(self, event: TraceEvent) -> TraceEvent | None:
        self._buffer.append(event)
        if len(self._buffer) >= self.batch_size:
            await self._flush()
        return event

    async def _flush(self):
        if not self._buffer:
            return
        print(f"[EXPORT] Sending batch of {len(self._buffer)} events")
        self._buffer.clear()


# 组装处理链:脱敏 -> 采样 -> 批量上报
pipeline = SanitizationProcessor(
    SamplingProcessor(0.1, BatchExportProcessor(batch_size=10))
)

责任链模式与简单的批处理上报相比,提供了更强的扩展性。当未来需要增加新的处理阶段(如事件聚合、异常检测、成本计算)时,只需实现一个新的 TraceProcessor 子类并插入链条中的适当位置,无需改动现有代码。这种架构特别适合构建企业级的追踪平台,其中不同团队可能负责不同的处理阶段。

在实际生产环境中,处理链的可靠性同样重要。当某个处理器失败时,应有明确的降级策略。例如,脱敏处理器失败不应阻止事件上报(可以标记为"未脱敏"后继续传递),而采样处理器失败则应保守地选择全量采样(避免丢失关键故障信息)。这种故障降级(Fail-safe) 设计可以通过在处理器的 process 方法中增加异常捕获和默认行为来实现:

class ResilientSamplingProcessor(TraceProcessor):
    """具备故障降级能力的采样处理器。"""

    async def process(self, event: TraceEvent) -> TraceEvent | None:
        try:
            if event.metadata.get("force_trace"):
                return event
            import random
            return event if random.random() < self.sample_rate else None
        except Exception:
            # 采样逻辑失败时,保守策略:保留事件
            print("[WARN] Sampling failed, preserving event")
            return event

此外,处理链的性能监控也值得关注。建议在每个处理器前后记录时间戳,计算出每个阶段的处理延迟。如果某个处理器的延迟持续升高,可能意味着其内部逻辑存在性能瓶颈(如脱敏正则过于复杂、批量上报的网络超时)。通过为处理链添加可观测性埋点,可以及时发现并优化性能热点,确保追踪系统本身不会成为应用的瓶颈。处理链的延迟应控制在 Agent 总运行时间的 1% 以内,这是生产环境中的一条实用经验法则。

生产环境部署与性能优化

处理器插件化的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

上报队列深度的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

批处理优化的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。