概述
上下文窗口压缩与滑动窗口策略的信息保持
LLM 的上下文窗口虽然不断增大,但"窗口大"不等于"用得好"。随着对话轮数增长,上下文中的信息密度会逐渐稀释——早期的关键约束可能被淹没在大量的闲聊和工具调用记录中。上下文管理的核心目标是在有限的窗口内,最大化信息价值密度。
滑动窗口(Sliding Window)是最简单的上下文管理策略:只保留最近 N 轮对话,丢弃更早的内容。它的优点是实现简单、计算开销低,但缺点是会丢失历史中的关键信息。例如,用户在对话开头提到的"我对花生过敏",可能在 10 轮后被滑动窗口丢弃,导致 Agent 在推荐餐厅时忽略这一关键约束。
更高级的策略是关键信息提取与保持:在对话过程中,使用一个轻量级模型或规则引擎识别并提取关键事实(如用户偏好、约束条件、已确认的信息),将这些事实存储在一个独立的"事实库"中。每次构建 prompt 时,将事实库与最近的对话历史拼接,既保留了关键信息,又控制了总长度。
上下文压缩则是另一种思路:将多轮对话总结为一段摘要文本,用摘要替代原始对话。摘要的质量取决于压缩算法——简单的截断会丢失信息,而基于 LLM 的摘要生成虽然质量高,但本身也消耗 token 和延迟。
从信息论的角度来看,上下文管理是一个有损压缩问题:我们在降低比特率(token 数量)的同时,尽量保持信息熵(关键事实的完整性)。最优策略取决于具体场景——客服场景需要保留用户约束和订单信息,而创意写作场景可能更关注最近的风格指示。
RunContextWrapper:在 Agent、工具和 Handoff 之间共享应用状态和依赖。
正文
相关阅读
参考文档
完整实战示例:智能上下文压缩与关键信息保持系统
以下示例展示了如何在生产环境中实现一个结合滑动窗口、关键信息提取和动态压缩的上下文管理系统:
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any
from agents import Agent, Runner, RunContextWrapper
@dataclass
class FactEntry:
key: str
value: str
confidence: float # 0-1
timestamp: float
class SmartContextManager:
"""智能上下文管理器:滑动窗口 + 事实库 + 动态压缩。"""
def __init__(self, max_history_turns: int = 6, fact_ttl: float = 3600.0):
self.max_history_turns = max_history_turns
self.fact_ttl = fact_ttl
self.history: list[dict] = []
self.facts: list[FactEntry] = []
self.compression_count = 0
def add_turn(self, role: str, content: str):
self.history.append({"role": role, "content": content, "timestamp": asyncio.get_event_loop().time()})
if len(self.history) > self.max_history_turns * 2:
self._compress_oldest()
def add_fact(self, key: str, value: str, confidence: float = 0.8):
now = asyncio.get_event_loop().time()
# 去重:如果已有相同 key 且置信度更高,保留旧的
existing = [f for f in self.facts if f.key == key]
if existing and existing[0].confidence >= confidence:
return
self.facts = [f for f in self.facts if f.key != key]
self.facts.append(FactEntry(key, value, confidence, now))
def _compress_oldest(self):
"""将最旧的两轮对话压缩为摘要。"""
to_compress = self.history[:2]
self.history = self.history[2:]
summary = f"[Earlier: {to_compress[0]['role']} said '{to_compress[0]['content'][:50]}...', then {to_compress[1]['role']} responded.]"
self.history.insert(0, {"role": "system", "content": summary, "timestamp": 0})
self.compression_count += 1
def _cleanup_expired_facts(self):
now = asyncio.get_event_loop().time()
self.facts = [f for f in self.facts if now - f.timestamp < self.fact_ttl]
def build_context(self) -> str:
self._cleanup_expired_facts()
parts = []
# 1. 事实库
if self.facts:
facts_text = "\n".join(f"- {f.key}: {f.value}" for f in self.facts)
parts.append(f"[Known Facts]\n{facts_text}")
# 2. 历史对话(含压缩摘要)
for turn in self.history:
role_label = "User" if turn["role"] == "user" else "Assistant"
if turn["role"] == "system":
role_label = "Summary"
parts.append(f"[{role_label}] {turn['content']}")
return "\n\n".join(parts)
def get_stats(self) -> dict:
return {
"history_length": len(self.history),
"facts_count": len(self.facts),
"compression_count": self.compression_count,
}
async def main():
ctx_mgr = SmartContextManager(max_history_turns=4)
# 模拟对话
ctx_mgr.add_turn("user", "Hi, I'm vegetarian and I need restaurant recommendations in Shanghai.")
ctx_mgr.add_fact("dietary_restrictions", "vegetarian", 0.95)
ctx_mgr.add_fact("location", "Shanghai", 0.9)
ctx_mgr.add_turn("assistant", "Great! Shanghai has many excellent vegetarian restaurants. What type of cuisine do you prefer?")
ctx_mgr.add_turn("user", "I love Sichuan food, but it must be vegetarian.")
ctx_mgr.add_fact("cuisine_preference", "Sichuan", 0.85)
ctx_mgr.add_turn("assistant", "Here are some top vegetarian Sichuan restaurants in Shanghai...")
ctx_mgr.add_turn("user", "Also, I have a peanut allergy. Very important.")
ctx_mgr.add_fact("allergy", "peanut", 0.98)
ctx_mgr.add_turn("assistant", "Noted! I'll make sure to only recommend peanut-free options.")
ctx_mgr.add_turn("user", "What about the one on Nanjing Road?")
print("=== 构建的上下文 ===")
print(ctx_mgr.build_context())
print(f"\nStats: {ctx_mgr.get_stats()}")
if __name__ == "__main__":
asyncio.run(main())这个上下文管理器的设计体现了分层记忆的思想:短期记忆(最近对话)保持高保真,长期记忆(事实库)保持高稳定性。即使原始对话被压缩或丢弃,关键事实仍然通过事实库传递给模型。
上下文压缩策略对比
下图对比了三种上下文管理策略的执行流程:
flowchart TD
subgraph Sliding["滑动窗口"]
S1[保留最近 N 轮] --> S2[丢弃早期消息]
S2 --> S3[可能丢失关键信息]
end
subgraph Summary["摘要替换"]
M1[早期对话] --> M2[生成摘要]
M2 --> M3[用摘要替代原文]
M3 --> M4[保留近期完整消息]
end
subgraph Selective["选择性保留"]
E1[标记关键消息] --> E2[永久保留关键消息]
E3[非关键消息] --> E4[压缩或丢弃]
E2 --> E5[混合上下文]
E4 --> E5
end
style Sliding fill:#f4b183,stroke:#5a4a3a
style Summary fill:#e8d5b5,stroke:#5a4a3a
style Selective fill:#c5e0b4,stroke:#5a4a3a没有一种策略适用于所有场景。滑动窗口实现最简单但信息丢失最多;摘要替换平衡了信息量和完整性;选择性保留最精准但需要额外的关键消息识别逻辑。
上下文管理的工程实现
生产级上下文管理需要考虑多个维度:Token 预算、信息完整性和计算开销。
Token 预算分配:
将模型的上下文窗口划分为几个区域,每个区域有固定的 Token 配额:
CONTEXT_BUDGET = {
"system_instructions": 500,
"user_profile": 300,
"conversation_summary": 800,
"recent_messages": 2000,
"tool_results": 500,
}当某个区域超限时,按照优先级压缩:先压缩工具结果(只保留关键字段),再压缩近期消息(减少保留轮数),最后压缩对话摘要(使用更简洁的语言)。
增量摘要更新:
每次新增一轮对话后,不需要重新生成整个摘要,而是基于上一轮摘要做增量更新:
async def incremental_summary(prev_summary: str, new_messages: list) -> str:
updater = Agent(
name="SummaryUpdater",
instructions="基于已有摘要和新消息,生成更新后的摘要",
model="gpt-5-nano"
)
prompt = f"上一版摘要:{prev_summary}\\n新增消息:{new_messages}\\n请生成更新后的摘要,保持简洁(不超过 300 字)。"
result = await Runner.run(updater, prompt)
return result.final_output增量更新的计算成本远低于全量重生成,特别适合高频对话场景。
跨会话上下文继承:
在多轮客服场景中,用户可能在不同会话中联系客服。如果每次会话都从零开始,用户体验会大打折扣。实现跨会话上下文继承需要:
- 用户级上下文存储:将用户的长期偏好、历史问题和已确认事实存储到用户档案中。
- 会话启动注入:新会话开始时,自动加载用户档案作为系统消息的一部分。
- 档案更新机制:每次会话结束后,将会话中的新信息合并到用户档案中。
async def load_user_context(user_id: str) -> str:
profile = await user_store.get(user_id)
return f"用户档案:{profile['preferences']}\\n历史问题:{profile['common_issues']}\\n"跨会话继承需要特别注意隐私合规——用户有权要求删除自己的档案数据,系统设计时应支持一键清除所有关联的上下文信息。上下文压缩与摘要技术方面,当对话历史超过模型上下文窗口时,需要进行压缩。常见压缩策略包括滑动窗口、摘要替换和选择性保留。摘要的质量直接影响多轮对话的连贯性。建议在摘要中保留关键实体、用户意图和已确认的事实,避免压缩后丢失重要上下文。
常见问题与调试
问题一:关键事实未被提取导致约束被违反
如果事实提取算法不够准确,某些关键约束可能没有被捕获到事实库中。排查方法:
- 在事实提取后增加一个校验步骤:使用 LLM 判断最近的用户消息中是否包含未被记录的新约束。
- 维护一个"关键关键词"列表(如"过敏"、"禁止"、"必须"),当用户消息包含这些词时,强制进行事实提取。
- 定期对事实库进行人工审计,评估提取的准确率和召回率。
问题二:事实库过大导致 prompt 膨胀
如果对话涉及大量事实(如长文档讨论、复杂项目规划),事实库本身可能超过上下文窗口。优化策略:
- 对事实进行分层:高频访问的事实放在 prompt 开头,低频事实放入向量数据库,按需检索。
- 对相似事实进行合并和去重(如"喜欢红色"和"偏好红色"合并为一条)。
- 为事实设置优先级和 TTL,低优先级或过期事实自动归档。
问题三:压缩摘要丢失关键细节
基于截断或简单摘要的压缩可能丢失精确的数值、名称或条件。缓解措施:
- 在压缩前扫描待压缩文本中的实体(如人名、地名、数字),将这些实体提取到事实库中。
- 使用更智能的摘要模型(如专门微调的压缩模型)生成高质量的摘要。
- 对压缩过程进行可逆标记:保留指向原始消息的引用,需要时可以回溯查看完整内容。
与其他方案对比
| 维度 | 自定义上下文管理 | LangChain Memory | RAG 检索增强 |
|---|---|---|---|
| 信息来源 | 对话历史 | 对话历史 | 外部知识库 |
| 更新方式 | 实时提取 | 预设策略 | 预索引/实时索引 |
| 复杂度 | 中(需设计提取逻辑) | 低(开箱即用) | 高(需维护向量库) |
| 适用场景 | 长期对话记忆 | 短期会话管理 | 知识密集型问答 |
RAG(检索增强生成)与上下文管理是互补而非竞争关系:RAG 解决的是"模型不知道的外部知识",而上下文管理解决的是"模型知道但可能遗忘的对话历史"。在生产系统中,通常同时使用两者:RAG 提供领域知识,上下文管理提供个性化记忆。LangChain 的 Memory 系统提供了一些便利的预设策略,但对于需要深度定制的场景(如医疗对话中的关键禁忌提取),自定义上下文管理仍然是必要的。
策略模式实现可插拔的上下文压缩算法
上下文管理系统的核心价值在于"在有限窗口内保留最有价值的信息",但"什么是最有价值的"因场景而异。客服对话需要保留用户约束和订单信息,创意写作需要保留风格指示和角色设定,代码助手需要保留函数签名和类型定义。为不同场景硬编码压缩逻辑会导致系统僵化,而**策略模式(Strategy Pattern)**提供了一种优雅的解耦方案。
flowchart TD
A[用户输入] --> B{上下文长度检查}
B -->|未超限| C[直接追加到历史]
B -->|已超限| D[策略选择器]
D --> E[滑动窗口策略]
D --> F[关键信息提取策略]
D --> G[LLM 摘要策略]
D --> H[混合分层策略]
E --> I[更新上下文]
F --> I
G --> I
H --> I
I --> J[构建最终 Prompt]
J --> K[调用模型]策略模式的核心是将"压缩算法"这一变化点抽象为独立的策略接口,上下文管理器只负责调用策略,而不关心策略的具体实现。这使得团队可以并行开发多种压缩策略,并在运行时根据场景动态切换:
from abc import ABC, abstractmethod
from typing import List, Dict
import asyncio
class CompressionStrategy(ABC):
"""上下文压缩策略抽象基类。"""
@abstractmethod
async def compress(self, history: List[Dict], facts: List[Dict]) -> tuple:
"""返回 (compressed_history, compressed_facts)。"""
pass
@abstractmethod
def get_name(self) -> str:
pass
class SlidingWindowStrategy(CompressionStrategy):
"""滑动窗口策略:保留最近 N 轮。"""
def __init__(self, max_turns: int = 6):
self.max_turns = max_turns
async def compress(self, history, facts):
keep = self.max_turns * 2
return history[-keep:], facts
def get_name(self):
return "sliding_window"
class FactExtractionStrategy(CompressionStrategy):
"""关键信息提取策略:保留事实库,压缩历史为摘要。"""
async def compress(self, history, facts):
# 保留最近 2 轮,其余用轻量模型摘要
recent = history[-4:] if len(history) >= 4 else history
older = history[:-4] if len(history) >= 4 else []
summary = await self._summarize(older)
compressed = [{"role": "system", "content": f"[摘要] {summary}"}] + recent
return compressed, facts
async def _summarize(self, turns):
# 简化:实际应调用轻量 LLM
topics = [t['content'][:20] + "..." for t in turns[::2]]
return "先前讨论了: " + "; ".join(topics)
def get_name(self):
return "fact_extraction"
class HybridStrategy(CompressionStrategy):
"""混合分层策略:按信息优先级分层处理。"""
def __init__(self, strategies: List[CompressionStrategy]):
self.strategies = strategies
async def compress(self, history, facts):
# 装饰器模式:依次应用多个策略
h, f = history, facts
for s in self.strategies:
h, f = await s.compress(h, f)
return h, f
def get_name(self):
return f"hybrid({','.join(s.get_name() for s in self.strategies)})"
class ContextManagerWithStrategy:
"""基于策略模式的上下文管理器。"""
def __init__(self, strategy: CompressionStrategy):
self.strategy = strategy
self.history = []
self.facts = []
async def add_turn(self, role: str, content: str):
self.history.append({"role": role, "content": content})
# 当超过阈值时触发压缩
if len(self.history) > 12:
self.history, self.facts = await self.strategy.compress(
self.history, self.facts
)
def build_prompt(self) -> str:
parts = []
if self.facts:
parts.append("[已知事实]\n" + "\n".join(f"- {f['key']}: {f['value']}" for f in self.facts))
for turn in self.history:
parts.append(f"[{turn['role']}] {turn['content']}")
return "\n\n".join(parts)上述代码展示了策略模式与**装饰器模式(Decorator Pattern)**的结合。HybridStrategy 将多个策略组合在一起,按顺序应用,形成一条处理链。这种组合方式比继承更灵活——你可以自由搭配滑动窗口和事实提取,而无需创建新的子类。对于需要深度定制的场景(如医疗对话中的禁忌信息必须保留),可以新增一个 MedicalConstraintStrategy,直接插入策略链中即可。
策略选择本身也可以智能化。例如,通过监控历史对话的类型分布(问答比例、工具调用频率),动态推荐最优策略;或者在 A/B 测试框架中,同时运行多种策略,根据用户满意度指标自动选择 winner。这种"元策略"层的设计,使得上下文管理系统具备了自我进化的能力。
生产环境部署与性能优化
上下文压缩策略的实践要点
将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。
具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。
窗口命中率的关键指标
监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。
除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。
长会话归档的架构考量
随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。
在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。
运维团队的协作建议
技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。
在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。