Session 会话管理

📑 目录

概述

会话存储格式与 TTL 过期策略的分布式一致性

Session 是 Agents SDK 中有状态对话的基石。Agent 本身是无状态的,Session 为运行注入了跨调用的记忆能力。从存储视角来看,Session 需要解决三个核心问题:序列化格式、存储后端和过期策略。

序列化格式方面,SDK 的 Session 接口通常将对话历史表示为 list[TResponseInputItem] 的序列化形式。这包括用户消息、助手消息、工具调用和工具结果。对于不同的存储后端,序列化方式也不同:

  • InMemorySession:直接持有 Python 对象,零序列化开销,但进程重启后数据丢失。
  • SQLiteSession:使用 JSON 文本存储,SQLite 的 WAL(Write-Ahead Logging)模式提供了良好的并发性能。
  • RedisSession:使用 Redis 的字符串或列表结构存储,支持分布式访问和 TTL 过期。

TTL(Time-To-Live)过期策略对于生产环境至关重要。如果没有过期机制,会话数据会无限增长,最终耗尽存储空间。合理的 TTL 设置需要权衡用户体验和存储成本:

  • 短期会话(如一次性查询):TTL 1-24 小时,过期后自动清理。
  • 中期会话(如客服对话):TTL 7-30 天,支持用户返回继续对话。
  • 长期会话(如个人助手):TTL 90-365 天,或永不过期但手动归档。

从分布式一致性来看,当多个服务实例共享同一个 Redis Session 时,需要考虑并发修改的问题。如果用户快速发送两条消息,两条请求可能被不同的服务实例处理,同时读取和修改同一个 Session,导致消息丢失或顺序错乱。解决方案包括:

  1. 乐观锁:为 Session 附加版本号,写回时检查版本号是否匹配。
  2. 分布式锁:使用 Redis Redlock 或类似机制,确保同一时刻只有一个实例在修改 Session。
  3. 会话亲和性(Session Affinity):通过负载均衡策略将同一用户的请求路由到同一实例。

SQLiteSession 和 RedisSession:自动管理跨多次运行的对话历史。

正文

相关阅读

参考文档

完整实战示例:企业级会话管理中间件

以下示例展示了如何构建一个支持多后端、TTL 管理和并发控制的会话管理中间件:

import asyncio
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, asdict
from typing import Any
from agents import Agent, Runner
from agents.sessions import InMemorySession, Session


class PersistentSession(ABC):
    """持久化会话抽象基类。"""

    @abstractmethod
    async def load(self, session_id: str) -> Session | None:
        pass

    @abstractmethod
    async def save(self, session_id: str, session: Session, ttl: int) -> None:
        pass

    @abstractmethod
    async def delete(self, session_id: str) -> None:
        pass


class RedisSessionStore(PersistentSession):
    """Redis 会话存储(简化示例)。"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.lock_timeout = 5

    async def load(self, session_id: str) -> Session | None:
        data = await self.redis.get(f"session:{session_id}")
        if not data:
            return None
        items = json.loads(data)
        return InMemorySession(items)  # 简化重建

    async def save(self, session_id: str, session: Session, ttl: int) -> None:
        items = session.to_input_list() if hasattr(session, "to_input_list") else []
        await self.redis.setex(f"session:{session_id}", ttl, json.dumps(items, default=str))

    async def delete(self, session_id: str) -> None:
        await self.redis.delete(f"session:{session_id}")


class SessionManager:
    """会话管理器:封装加载、保存和并发控制。"""

    def __init__(self, store: PersistentSession, default_ttl: int = 3600):
        self.store = store
        self.default_ttl = default_ttl
        self._local_locks: dict[str, asyncio.Lock] = {}

    def _get_lock(self, session_id: str) -> asyncio.Lock:
        if session_id not in self._local_locks:
            self._local_locks[session_id] = asyncio.Lock()
        return self._local_locks[session_id]

    async def get_or_create(self, session_id: str | None) -> tuple[str, Session]:
        sid = session_id or str(uuid.uuid4())
        session = await self.store.load(sid)
        if session is None:
            session = InMemorySession()
        return sid, session

    async def run_with_session(
        self,
        agent: Agent,
        user_input: str,
        session_id: str | None = None,
        ttl: int | None = None,
    ) -> dict:
        sid, session = await self.get_or_create(session_id)
        async with self._get_lock(sid):
            result = await Runner.run(agent, user_input, session=session)
            await self.store.save(sid, session, ttl or self.default_ttl)

        return {
            "session_id": sid,
            "output": result.final_output,
            "history_length": len(session.to_input_list()) if hasattr(session, "to_input_list") else 0,
        }


# 模拟 Redis 的内存实现
class FakeRedis:
    def __init__(self):
        self._data: dict[str, tuple[str, float]] = {}  # value -> (data, expire_at)

    async def get(self, key: str) -> str | None:
        if key in self._data:
            data, expire_at = self._data[key]
            if time.time() < expire_at:
                return data
            del self._data[key]
        return None

    async def setex(self, key: str, seconds: int, value: str) -> None:
        self._data[key] = (value, time.time() + seconds)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)


async def main():
    fake_redis = FakeRedis()
    store = RedisSessionStore(fake_redis)
    manager = SessionManager(store, default_ttl=300)

    agent = Agent(name="Chatty", instructions="You are a helpful assistant. Remember our conversation.", model="gpt-5-nano")

    # 第一轮
    r1 = await manager.run_with_session(agent, "My name is Alice")
    print(f"Session: {r1['session_id']}, History: {r1['history_length']}")

    # 第二轮(使用同一 session)
    r2 = await manager.run_with_session(agent, "What's my name?", session_id=r1["session_id"])
    print(f"History: {r2['history_length']}, Output: {r2['output']}")


if __name__ == "__main__":
    asyncio.run(main())

这个中间件的设计体现了存储无关性:通过抽象基类 PersistentSession,你可以轻松切换 Redis、PostgreSQL 或云存储后端,而业务逻辑保持不变。本地锁机制确保了单机并发安全,分布式场景下可以替换为 Redis 分布式锁。

会话状态机与生命周期

下图展示了会话从创建到销毁的完整状态转换:

stateDiagram-v2
    [*] --> Created: 首次请求
    Created --> Active: 对话进行中
    Active --> Idle: 用户无操作
    Idle --> Active: 新消息到达
    Idle --> Expired: 超过 TTL
    Active --> Persisted: 显式保存
    Persisted --> Restored: 会话恢复
    Expired --> [*]: 清理资源
    Restored --> Active: 继续对话

会话的生命周期管理直接影响用户体验和系统资源。过短的 TTL 导致用户频繁重新建立上下文,过长的 TTL 则占用不必要的存储空间。

会话数据的隐私与合规

会话数据通常包含敏感的用户对话内容,其存储和处理必须遵守数据保护法规(如 GDPR、CCPA、个人信息保护法)。

数据最小化原则

只存储业务必需的会话数据。例如,如果系统只需要保留最近 3 轮对话来生成回复,就不要存储完整的 50 轮历史。定期清理过期数据,设置合理的 TTL。

数据脱敏

在将会话数据写入日志或追踪系统前,应对敏感字段进行脱敏处理:

import re

SENSITIVE_PATTERNS = [
    (r'\b\d{18}\b', '[ID_CARD]'),
    (r'1[3-9]\d{9}', '[PHONE]'),
    (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]'),
]

def desensitize(text: str) -> str:
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text

用户数据主权

法规通常要求用户有权查看、导出和删除自己的数据。会话管理系统应提供相应的 API:

  • GET /sessions/{user_id}:列出用户的所有会话
  • GET /sessions/{session_id}/export:导出会话内容(JSON/CSV 格式)
  • DELETE /sessions/{session_id}:删除会话及其所有关联数据

删除操作应传播到所有存储副本,包括主数据库、缓存、日志归档和备份。这通常通过逻辑删除标记实现:先标记为已删除并停止服务,等待备份轮换周期后再物理清除。

分布式会话一致性在多实例部署场景下尤为重要。Redis 是常见的选择,但需要注意以下问题:序列化格式方面,Pydantic 对象默认不可 JSON 序列化,需使用 model_dump_json() 转换;并发更新方面,多个实例可能同时读写同一会话,需使用 Redis 事务或 Lua 脚本保证原子性;过期策略方面,设置合理的 TTL,避免僵尸会话占用内存。对于跨地域部署,可考虑会话亲和性(Session Affinity)将同一用户固定路由到同一实例,减少分布式会话的读写开销。

常见问题与调试

问题一:会话数据在实例间不同步

当使用 InMemorySession 并部署多个服务实例时,每个实例持有独立的内存数据,导致用户的会话在实例间"漂移"。解决方案:

  1. 生产环境务必使用 RedisSession 或外置数据库。
  2. 在负载均衡器上启用 Session Affinity(粘性会话),确保同一用户始终路由到同一实例。
  3. 定期将会话数据从内存刷新到共享存储,降低漂移风险。

问题二:会话历史过长导致 token 爆炸

随着对话轮数增长,Session 中的消息历史会线性膨胀,最终导致 LLM 调用的 token 费用激增甚至超出上下文窗口。缓解策略:

  1. Session 层实现自动压缩:当历史超过阈值时,将早期消息摘要化。
  2. 为每个用户设置最大会话长度,超过后强制开启新会话。
  3. 使用滑动窗口策略,只保留最近 N 轮对话,旧消息定期归档。

问题三:TTL 过期导致用户对话中断

如果用户离开一段时间后返回,发现之前的对话上下文丢失了。平衡策略:

  1. 为活跃用户延长 TTL(每次交互时重置过期时间)。
  2. 在会话即将过期前发送提醒(如推送通知)。
  3. 实现会话归档和恢复机制,长期不活跃的会话被压缩存储,用户返回时可以快速恢复摘要。

与其他方案对比

维度Agents SDK SessionLangChain Memory自建数据库会话
实现复杂度低(内置多种后端)中(需选择适配器)高(从零设计)
扩展性中(接口固定)高(可自定义 Memory 类)极高(完全可控)
分布式支持RedisSession 原生支持依赖外部存储配置依赖自建架构
压缩策略基础(需自行扩展)丰富(多种预设)完全自定义

LangChain 的 Memory 生态系统提供了更多开箱即用的策略(如 ConversationBufferWindowMemoryVectorStoreRetrieverMemory),在会话压缩和长期记忆方面更为成熟。Agents SDK 的 Session 设计更简洁,与 Runner 的集成更紧密,适合追求简单直接的团队。对于需要复杂会话管理(如多轮状态机、跨会话学习)的场景,自建数据库会话可能是最终归宿,但开发和维护成本也最高。

会话存储架构与滑动窗口压缩机制

Session 作为 Agent 的记忆载体,其存储架构直接影响系统的可扩展性和响应延迟。从架构视角来看,一个完整的会话管理系统应包含接入层、处理层和存储层三个层次。接入层负责接收用户请求并提取会话标识;处理层执行会话的加载、压缩和保存逻辑;存储层则提供持久化能力。这种分层设计使得每一层都可以独立扩展和替换,符合单一职责原则。在高并发场景下,接入层可以采用无状态设计,所有会话状态外置到共享存储,从而实现计算层的水平扩容。这种架构也是现代云原生应用的设计共识,状态外置后,容器可以任意启停而不丢失用户上下文。

graph LR
    A[用户请求] --> B[接入层]
    B --> C[处理层]
    C --> D{存储后端}
    D --> E[InMemory]
    D --> F[SQLite]
    D --> G[Redis]
    C --> H[压缩引擎]
    H --> I[滑动窗口]
    H --> J[摘要生成]
    H --> K[Token 预算]

当会话历史不断增长时,Token 消耗会成为主要的成本瓶颈。实现滑动窗口压缩是解决这一问题的关键策略。其核心思想是:保留最近 N 轮对话的完整内容,将更早的历史通过 LLM 压缩为摘要,从而在不丢失核心上下文的前提下控制 Token 数量。以下是一个基于 Token 预算的会话压缩器实现:

from dataclasses import dataclass
from typing import Any

@dataclass
class TokenBudget:
    max_tokens: int = 4000
    reserve_tokens: int = 500  # 为当前轮次预留
    
    @property
    def available_for_history(self) -> int:
        return self.max_tokens - self.reserve_tokens

class SlidingWindowCompressor:
    """滑动窗口会话压缩器。"""
    
    def __init__(self, budget: TokenBudget, window_size: int = 6):
        self.budget = budget
        self.window_size = window_size
        self._summary: str = ""
    
    def compress(self, history: list[dict]) -> list[dict]:
        if len(history) <= self.window_size:
            return history
        
        # 保留最近 window_size 条完整消息
        recent = history[-self.window_size:]
        
        # 将更早的消息压缩为摘要(简化示例)
        older = history[:-self.window_size]
        if older:
            self._summary = self._generate_summary(older)
        
        result = []
        if self._summary:
            result.append({
                "role": "system",
                "content": f"[历史摘要] {self._summary}"
            })
        result.extend(recent)
        return result
    
    def _generate_summary(self, messages: list[dict]) -> str:
        topics = set()
        for m in messages:
            content = m.get("content", "")
            # 简化:提取关键词作为摘要
            if len(content) > 20:
                topics.add(content[:30] + "...")
        return "; ".join(list(topics)[:5])

除了滑动窗口,另一种高级策略是分层存储:将会话数据按照活跃度和时间维度划分为热数据、温数据和冷数据。热数据(最近一小时)保存在内存中,温数据(最近一天)保存在 Redis 中,冷数据则归档到对象存储(如 S3)。这种分层架构在成本和性能之间取得了良好平衡。对于需要长期记忆的场景,还可以将关键信息提取到向量数据库中,通过语义检索实现跨会话的知识召回,突破传统 Session 的轮次限制。生产环境中,建议在 Session 层实现统一的TTL 刷新策略:每次读写操作都重置 Redis 的过期时间,确保活跃会话不会因空闲而被意外清除,同时让长期不活跃的会话自然过期以释放存储空间。这种策略通常被称为惰性续约,是分布式缓存管理的最佳实践,能够有效避免用户在短暂离开后丢失上下文的糟糕体验。在多租户场景下,还应为不同租户配置独立的命名空间和配额限制,防止单个租户的会话数据膨胀影响整体系统稳定性。

生产环境部署与性能优化

会话存储选型的实践要点

将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。

具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。

会话活跃度的关键指标

监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。

除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。

分布式会话同步的架构考量

随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。

在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。

运维团队的协作建议

技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。

在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。