运行状态与恢复

📑 目录

概述

状态序列化格式与检查点机制的幂等性保证

Agent 运行的状态恢复能力是构建长时运行(long-running)系统的基石。当 Agent 执行被中断(如服务重启、人工审批等待、或分布式调度器抢占)时,必须能够从断点恢复,而不是从头开始。这要求运行状态具备可序列化性可重入性

Agents SDK 通过 result.to_state() 提供了状态的序列化能力。这个状态对象通常包含:

  1. 消息历史:当前运行中已产生的所有用户消息、助手消息、工具调用和工具结果。
  2. 活跃 Agent:当前正在处理输入的 Agent 标识。
  3. 上下文对象:运行时的业务上下文(如用户 ID、会话状态)。
  4. 待处理项:如待审批的工具调用、未完成的 Handoff 等。

从检查点(Checkpoint)设计的角度来看,状态的序列化格式需要满足以下要求:

  • 向后兼容:当 SDK 升级或模型切换时,旧版本保存的状态应能被新版本正确加载。
  • 最小化:只保存恢复运行所必需的信息,避免冗余数据导致序列化体积膨胀。
  • 加密敏感字段:如果状态中包含 PII 或业务敏感信息,序列化前应进行字段级加密。

幂等性(Idempotency)是状态恢复的另一个关键属性。如果恢复操作被执行了两次(如网络超时导致客户端重试),结果应与执行一次相同。在 Agent 运行场景中,幂等性主要关注工具调用的副作用:如果某个工具在检查点之前已经执行(如已经发送了邮件),恢复后不应再次执行。实现幂等性的常见方法是为每个工具调用分配唯一的 tool_call_id,并在工具实现中校验该 ID 是否已处理过。

RunState 序列化、to_state/from_state:中断运行的保存、恢复和继续执行。

正文

相关阅读

参考文档

完整实战示例:可恢复的长时运行 Agent 系统

以下示例展示了如何在生产环境中构建一个支持检查点保存、中断恢复和幂等性保证的 Agent 系统:

import asyncio
import json
import time
import hashlib
from dataclasses import dataclass, asdict
from typing import Any
from agents import Agent, Runner, function_tool, RunContextWrapper


# 已处理工具调用的幂等性记录
_processed_tool_calls: set[str] = set()


@function_tool
def send_notification(user_id: str, message: str) -> str:
    """发送通知给用户。具备幂等性保护。"""
    call_id = hashlib.sha256(f"{user_id}:{message}".encode()).hexdigest()[:16]
    if call_id in _processed_tool_calls:
        return f"[IDEMPOTENT] Notification already sent (id={call_id})"
    _processed_tool_calls.add(call_id)
    # 实际发送逻辑
    print(f"[SEND] To {user_id}: {message}")
    return f"Notification sent (id={call_id})"


@function_tool
def query_database(sql: str) -> str:
    """查询数据库。无副作用,天然幂等。"""
    return f"Query result for: {sql[:30]}..."


class CheckpointStore:
    """检查点存储(简化版,生产环境应使用数据库)。"""
    def __init__(self):
        self._checkpoints: dict[str, dict] = {}

    async def save(self, run_id: str, state: dict, metadata: dict) -> None:
        self._checkpoints[run_id] = {
            "state": state,
            "metadata": metadata,
            "saved_at": time.time(),
        }

    async def load(self, run_id: str) -> dict | None:
        cp = self._checkpoints.get(run_id)
        return cp["state"] if cp else None


class ResilientRunner:
    """具备恢复能力的运行器。"""

    def __init__(self, checkpoint_store: CheckpointStore):
        self.store = checkpoint_store

    async def run_or_resume(
        self,
        agent: Agent,
        user_input: str,
        run_id: str | None = None,
        context: Any = None,
    ) -> dict[str, Any]:
        run_id = run_id or f"run_{int(time.time())}_{id(agent)}"

        # 尝试恢复已有状态
        saved_state = await self.store.load(run_id)
        if saved_state:
            print(f"[RESUME] Restoring run {run_id} from checkpoint")
            # 简化:实际应使用 Runner.run_continuation 或类似机制
            result = await Runner.run(agent, user_input, context=context)
        else:
            print(f"[START] New run {run_id}")
            result = await Runner.run(agent, user_input, context=context)

        # 保存检查点
        state = result.to_state() if hasattr(result, "to_state") else {"output": result.final_output}
        await self.store.save(run_id, state, {"input": user_input, "turns": len(result.raw_responses)})

        return {
            "run_id": run_id,
            "output": result.final_output,
            "restored": saved_state is not None,
        }


async def main():
    store = CheckpointStore()
    runner = ResilientRunner(store)

    agent = Agent(
        name="ResilientAgent",
        instructions="Help users with database queries and notifications.",
        tools=[send_notification, query_database],
        model="gpt-5-nano")

检查点与恢复状态机

下图展示了基于检查点的运行状态管理和恢复流程:
mermaid
stateDiagram-v2
[] --> Running: 开始执行
Running --> Checkpoint: 达到检查点
Checkpoint --> Saved: 持久化成功
Saved --> Running: 继续执行
Running --> Failed: 异常中断
Failed --> Restoring: 加载最近检查点
Restoring --> Running: 恢复执行
Running --> Completed: 任务完成
Completed --> [
]
Failed --> Aborted: 重试耗尽
Aborted --> [*]


检查点的粒度是关键设计参数。过于频繁的检查点会增加 I/O 开销,过于稀疏的检查点则会导致恢复时丢失过多进度。

## 检查点设计的工程实践

检查点机制看似简单,但在分布式和高并发场景下有许多细节需要考虑。

**检查点内容裁剪**:

完整的运行状态可能包含大量数据(如完整的对话历史、工具结果、大型文件内容)。并非所有数据都需要保存到检查点中。

```python
def create_checkpoint(state: RunState) -> dict:
    return {
        "conversation_summary": summarize(state.messages),
        "completed_tools": [t.name for t in state.completed_tools],
        "pending_tool": state.pending_tool.id if state.pending_tool else None,
        "user_context": {
            "user_id": state.context.get("user_id"),
            "session_id": state.context.get("session_id"),
        },
    }

检查点一致性

如果检查点保存过程中系统崩溃,可能导致检查点文件不完整。解决方案是:先写入临时文件,确认写入成功后再原子重命名为正式文件名。

import os

def atomic_write(path: str, data: bytes):
    temp_path = path + ".tmp"
    with open(temp_path, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    os.rename(temp_path, path)

恢复时的状态验证

加载检查点后,不应直接恢复执行,而应先验证状态的完整性。检查项目包括:会话是否已过期、依赖的外部资源是否仍然可用、已完成的工具结果是否仍然有效。

检查点机制与事务系统的预写日志(WAL)思想类似:在执行状态变更前先记录意图,确保即使发生故障也能恢复到一致状态。这是构建可靠长运行 Agent 系统的基石。检查点策略与存储选型也很重要。不同存储介质在写入延迟、恢复速度、可靠性方面各有差异:本地文件适合开发调试;Redis 适合高频短任务;PostgreSQL 适合长任务且需审计;S3/OSS 适合灾难恢复。对于关键业务流程,建议采用多级检查点策略:内存中保持最新状态,Redis 中保存最近 5 分钟的状态,PostgreSQL 中持久化所有历史状态。恢复时优先从内存加载,失败则逐级回退。

常见问题与调试

问题一:状态序列化失败导致检查点丢失

如果上下文对象包含不可序列化的类型(如数据库连接、文件句柄、锁对象),to_state() 会抛出异常。解决方案:

  1. 在上下文类中实现自定义的序列化方法(__getstate__ / __setstate__),排除不可序列化的字段。
  2. 使用 DTO(Data Transfer Object)模式:上下文只包含纯数据,所有资源句柄在恢复后重新初始化。
  3. 在保存检查点前进行序列化预演(json.dumps 试序列化),提前发现问题。

问题二:恢复后行为与首次运行不一致

由于模型输出的随机性,恢复后的运行可能选择不同的工具或生成不同的文本,导致最终输出与首次运行不同。缓解方法:

  1. 使用固定的 seed 参数(如果模型支持)来降低随机性。
  2. 将首次运行的关键决策(如工具选择)也保存到检查点中,恢复时强制复用。
  3. 对于需要严格一致性的场景,在首次运行时就确定完整的执行计划,保存为"剧本",恢复时按剧本执行。

问题三:检查点体积过大影响存储和恢复速度

长对话的完整消息历史可能达到数 MB,频繁保存会消耗大量存储和带宽。优化建议:

  1. 增量检查点:只保存自上次检查点以来的增量状态。
  2. 压缩:使用 gzip 或 zstd 对序列化数据进行压缩。
  3. 分层存储:近期的检查点保存在高速存储(如 Redis),历史检查点归档到低成本存储(如 S3)。

与其他方案对比

维度Agents SDK StateTemporal 工作流LangChain Checkpoint
持久化粒度运行级活动级(更细)运行级
恢复语义应用层实现原生支持应用层实现
幂等性保证需自行实现原生支持需自行实现
适用场景Agent 对话恢复复杂业务流程LLM 链恢复

Temporal 是目前最成熟的长时运行工作流引擎,它原生支持检查点、重试、幂等性和补偿事务(Saga),适合构建复杂的业务流程(如订单履约、金融交易)。Agents SDK 的状态恢复更轻量,适合对话类应用的断点续传。LangChain 的 Checkpoint 系统主要面向 LCEL 链的恢复,与 Agents SDK 的 Agent 运行模型有所不同。

检查点机制与备忘录模式的工程实现

Agent 运行的状态恢复能力是构建长时运行系统的基石,而**备忘录模式(Memento Pattern)**正是实现这一能力的经典设计模式。备忘录模式的核心思想是在不破坏对象封装性的前提下,捕获对象的内部状态并保存在外部,以便日后恢复。在 Agents SDK 中,result.to_state()Runner.run_continuation 构成了备忘录模式的两个关键操作:前者创建备忘录(状态快照),后者根据备忘录恢复对象状态。这种分离使得状态管理逻辑与业务运行逻辑各司其职,便于独立测试和维护,也符合面向对象设计中的单一职责原则。

sequenceDiagram
    participant U as 用户/调度器
    participant R as Runner
    participant A as Agent
    participant C as CheckpointStore
    participant T as 工具执行

    U->>R: run_or_resume(input)
    R->>C: load(run_id)
    alt 存在检查点
        C-->>R: 返回 saved_state
        R->>A: 从断点恢复对话
    else 新运行
        R->>A: 从头开始执行
    end
    A->>T: 调用工具
    T-->>A: 返回结果
    A-->>R: 返回运行结果
    R->>C: save(run_id, state)
    R-->>U: 返回输出

对于长对话场景,全量保存每次检查点会造成存储膨胀和 I/O 压力。**增量检查点(Incremental Checkpoint)**是解决这一问题的有效手段。增量检查点只保存自上次检查点以来的变化部分(delta),恢复时通过叠加增量来重建完整状态。以下是一个简化版的增量检查点实现:

import copy
from typing import Any

class IncrementalCheckpointStore:
    """增量检查点存储。"""
    
    def __init__(self):
        self._base: dict[str, dict] = {}
        self._deltas: dict[str, list[dict]] = {}
    
    def _compute_delta(self, old_state: dict, new_state: dict) -> dict:
        """计算两个状态之间的差异。"""
        delta = {}
        for key, new_val in new_state.items():
            old_val = old_state.get(key)
            if old_val != new_val:
                delta[key] = new_val
        return delta
    
    async def save(self, run_id: str, state: dict, seq: int) -> None:
        if seq == 0 or run_id not in self._base:
            # 第一个检查点:保存全量
            self._base[run_id] = copy.deepcopy(state)
            self._deltas[run_id] = []
        else:
            # 后续检查点:保存增量
            delta = self._compute_delta(self._base[run_id], state)
            self._deltas[run_id].append(delta)
            # 定期合并增量,防止恢复时叠加过多
            if len(self._deltas[run_id]) >= 10:
                self._base[run_id] = copy.deepcopy(state)
                self._deltas[run_id] = []
    
    async def load(self, run_id: str, seq: int | None = None) -> dict | None:
        base = self._base.get(run_id)
        if base is None:
            return None
        if seq is None:
            # 恢复到最新状态
            result = copy.deepcopy(base)
            for delta in self._deltas.get(run_id, []):
                result.update(delta)
            return result
        return copy.deepcopy(base)

幂等性保证是状态恢复的另一个关键属性。如果恢复操作被执行了两次(如网络超时导致客户端重试),结果应与执行一次相同。实现幂等性的常见方法是为每个工具调用分配唯一的 tool_call_id,并在工具实现中校验该 ID 是否已处理过。对于外部系统调用,还应使用业务键幂等策略:根据业务参数生成确定性 ID(如用户 ID 加操作类型加时间戳的哈希),确保同一业务意图不会被重复执行。检查点的保存频率也需要权衡:过于频繁会增加开销,过于稀疏则会导致恢复时重复执行过多步骤。一般建议在每次工具调用后、每次 LLM 响应后以及等待外部事件(如人工审批)前保存检查点。对于关键业务系统,还应定期执行恢复演练,模拟服务中断并验证检查点的完整性和恢复成功率,这是灾难恢复规划中的重要环节,也是通过生产环境可靠性审查的必要条件。

在分布式部署场景下,检查点的存储后端选择尤为关键。Redis 适合高频读写但容量有限的检查点,PostgreSQL 适合需要复杂查询和长期保留的审计类检查点,而对象存储则适合大规模归档。建议根据检查点的访问频率和保留期限,设计自动迁移策略:新检查点写入 Redis,超过七天的检查点自动转存到对象存储,既保证恢复速度又控制存储成本。

生产环境部署与性能优化

检查点持久化的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

恢复成功率的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

跨实例状态迁移的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。