概述
PII 检测算法与会话压缩策略的工程实现
输入过滤(Input Filter)在多 Agent 系统中扮演着"数据海关"的角色:它决定了在 Agent 交接或上下文传递时,哪些信息可以过境,哪些信息需要被拦截或脱敏。从工程角度看,输入过滤面临两个核心挑战:检测的准确性和处理的开销。
PII(个人身份信息)检测通常采用多层过滤架构:
规则层:使用正则表达式快速匹配常见的 PII 模式,如身份证号、手机号、信用卡号、邮箱地址。规则层的优势是速度快(O(n) 复杂度)、零误召回(不会漏掉匹配规则的内容),但缺点是误报率高(如一个16位数字可能是订单号而非信用卡号)。
模型层:使用专门的 NER(命名实体识别)模型或 LLM 来识别更复杂的 PII 类型,如姓名、地址、生物特征信息。模型层的准确率高,但延迟较大(通常需要 100-500ms),且成本更高。
上下文层:结合消息上下文判断某个实体是否为 PII。例如,"我的卡号是 1234" 中的数字显然是 PII,而"订单号是 1234"中的数字则不是。
会话压缩则是另一个关键优化点。随着对话轮数增长,上下文长度呈线性膨胀。如果不加控制,很快就会触达模型的上下文窗口上限。常见的压缩策略包括:
- 滑动窗口:只保留最近 N 轮对话。
- 摘要替换:将早期的多轮对话压缩为一段摘要文本。
- 选择性保留:识别并保留关键信息(如用户确认过的约束条件),丢弃闲聊内容。
Agents SDK 的 input_filter 为上述所有策略提供了钩子入口,开发者可以在交接时实施任意的压缩和脱敏逻辑。
HandoffInputData、input_filter 和 nest_handoff_history:控制交接时的上下文传递。
正文
相关阅读
参考文档
完整实战示例:多层输入过滤与会话压缩系统
以下示例展示了如何在生产环境中实现一个结合规则脱敏、模型检测和智能压缩的输入过滤系统:
import re
import asyncio
from dataclasses import dataclass, field
from typing import Any
from agents import Agent, Runner, handoff, RunContextWrapper
from agents.handoffs import HandoffInputData
@dataclass
class FilterConfig:
enable_regex: bool = True
enable_llm_check: bool = False # 生产环境可开启
max_history_length: int = 10
compress_threshold: int = 5
class SmartInputFilter:
"""智能输入过滤器:规则脱敏 + 历史压缩。"""
PATTERNS = {
"credit_card": re.compile(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"),
"phone": re.compile(r"\b1[3-9]\d{9}\b"),
"id_card": re.compile(r"\b\d{17}[\dXx]|\d{15}\b"),
"email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"),
}
def __init__(self, config: FilterConfig = None):
self.config = config or FilterConfig()
self.compression_log: list[dict] = []
def _regex_mask(self, text: str) -> str:
"""基于正则的 PII 脱敏。"""
for name, pattern in self.PATTERNS.items():
text = pattern.sub(lambda m: f"[{name.upper()}_MASKED]", text)
return text
def _compress_history(self, history: list) -> list:
"""当历史超过阈值时,压缩早期消息为摘要。"""
if len(history) <= self.config.max_history_length:
return history
# 保留最近 N 条,将更早的消息替换为摘要标记
keep = history[-self.config.max_history_length:]
compressed_count = len(history) - self.config.max_history_length
summary = f"[Earlier {compressed_count} messages summarized for brevity]"
# 简化处理:实际应使用 LLM 生成真实摘要
return [summary] + keep
def __call__(self, data: HandoffInputData) -> HandoffInputData:
# 1. 脱敏处理
if self.config.enable_regex:
sanitized = []
for item in data.input_history:
text = str(item)
masked = self._regex_mask(text)
sanitized.append(masked if masked != text else item)
data.input_history = sanitized
# 2. 历史压缩
data.input_history = self._compress_history(data.input_history)
self.compression_log.append({
"original_length": len(data.input_history) + self.config.max_history_length,
"compressed_length": len(data.input_history),
})
return data
# 创建过滤器和 Handoff
filter_config = FilterConfig(max_history_length=8, compress_threshold=4)
smart_filter = SmartInputFilter(filter_config)
billing_agent = Agent(name="Billing", instructions="Handle billing with PII awareness.")
sales_agent = Agent(name="Sales", instructions="Handle sales inquiries.")
triage_agent = Agent(
name="Triage",
instructions="Route users to the right department.",
handoffs=[
handoff(billing_agent, input_filter=smart_filter),
handoff(sales_agent, input_filter=smart_filter),
])
输入过滤的多层防御架构
下图展示了从网络边界到应用层的多层输入过滤架构:
mermaid
flowchart LR
WAF[WAF/防火墙] --> Rate[速率限制]
Rate --> Syntax[语法校验]
Syntax --> Keyword[关键词过滤]
Keyword --> Semantic[语义分析]
Semantic --> Guard[Guardrails Agent]
Guard --> App[Agent 应用]
WAF -->|阻断恶意IP| Block1[(阻断)]
Rate -->|超限| Block2[(阻断)]
Keyword -->|命中黑名单| Block3[(阻断)]
Semantic -->|分类恶意| Block4[(阻断)]
Guard -->|触发防护| Block5[(阻断)]
style WAF fill:#f4b183,stroke:#5a4a3a
style Rate fill:#f4b183,stroke:#5a4a3a
style Keyword fill:#e8d5b5,stroke:#5a4a3a
style Guard fill:#c5e0b4,stroke:#5a4a3a
纵深防御的核心思想是:每一层过滤都不追求完美,而是追求在成本可控的前提下拦截大部分攻击。
## 会话安全:身份认证与授权
输入过滤只是安全的第一道防线,会话层面的身份认证和授权同样重要。
**会话令牌设计**:
每个会话应关联一个不可伪造的 JWT 令牌,包含用户身份、权限范围和会话过期时间。Agent 在处理请求前必须验证令牌的有效性。
```python
import jwt
class SecureSession:
def __init__(self, token: str, secret: str):
self.payload = jwt.decode(token, secret, algorithms=["HS256"])
@property
def user_id(self) -> str:
return self.payload["sub"]
@property
def permissions(self) -> list[str]:
return self.payload.get("perms", [])
def can_use_tool(self, tool_name: str) -> bool:
return f"tool:{tool_name}" in self.permissions会话固定攻击防护:
攻击者可能试图劫持用户的会话。防护措施包括:
- 令牌绑定:将 JWT 令牌与客户端指纹(IP 地址、User-Agent)绑定,检测到异常变更时强制重新认证。
- 短期令牌:访问令牌有效期设置为 15 分钟,配合刷新令牌机制,减少令牌泄露后的风险窗口。
- 会话吊销:当检测到异常行为(如短时间内大量请求、来自不同地理位置)时,立即将会话加入黑名单,后续请求携带该令牌将被拒绝。
会话管理与输入过滤是相辅相成的:输入过滤阻止恶意内容进入系统,会话管理确保只有合法用户才能访问其有权访问的资源。输入过滤的绕过与防御也是关键课题。攻击者可能通过提示注入(Prompt Injection)绕过输入过滤。常见绕过手法包括角色扮演、编码混淆、分片注入。防御策略应采用多层过滤:第一层解码常见编码;第二层模式匹配(正则/关键词);第三层语义分类(小型分类模型)。没有任何单一防御手段是绝对安全的,安全策略的核心是纵深防御——即使一层被突破,后续层级仍能提供保护。
常见问题与调试
问题一:正则过滤误报导致正常信息被掩码
例如,一个 16 位订单号可能被误判为信用卡号。缓解策略:
- 为不同场景维护独立的正则规则集(如客服场景 vs 金融场景)。
- 使用白名单机制:已知安全的格式(如以特定前缀开头的订单号)跳过检测。
- 在掩码前记录原始值到安全日志,便于事后审计和纠错。
问题二:历史压缩导致关键约束丢失
如果早期对话中用户明确了一个重要约束(如"我对花生过敏"),压缩后这个信息可能丢失。解决方案:
- 实现"关键信息提取":在压缩前使用轻量级模型识别并提取用户约束,存储到独立的 key-value 上下文中。
- 在摘要中显式包含关键约束:"[Summary: User has peanut allergy. 3 earlier messages omitted.]"
- 对包含关键词(如"过敏"、"禁忌"、"必须")的消息标记为不可压缩。
问题三:LLM 检测层的延迟和成本
如果为每条消息都调用 LLM 进行 PII 检测,成本可能超过主业务逻辑。优化建议:
- 只在高风险场景(如医疗、金融)启用 LLM 检测层。
- 对已通过正则检测的消息跳过 LLM 检测。
- 使用本地部署的小模型(如 7B 参数的 NER 模型)替代 API 调用。
与其他方案对比
| 维度 | Agents SDK input_filter | LangChain Memory | Microsoft Presidio |
|---|---|---|---|
| 过滤时机 | Handoff/交接时 | 历史加载时 | 独立服务(任意时机) |
| PII 检测 | 需自行实现 | 基础支持 | 专业级(内置多语言) |
| 压缩策略 | 完全自定义 | 提供几种预设 | 不支持 |
| 适用场景 | 多 Agent 上下文管理 | 单 Agent 会话管理 | 企业级数据脱敏 |
Microsoft Presidio 是目前业界最成熟的 PII 检测和脱敏框架,支持 30+ 种实体类型和多语言,适合对数据隐私要求极高的企业。Agents SDK 的 input_filter 更专注于 Agent 上下文的管理场景,提供了最大的灵活性,但需要开发者自行实现检测算法。LangChain 的 Memory 系统在会话压缩方面提供了一些便利的预设(如 ConversationSummaryMemory),但其设计主要面向单 Agent 场景,在多 Agent 交接时不够灵活。
可组合的过滤管道与策略模式设计
在生产环境中,输入过滤的需求往往是多维且动态变化的。同一套系统在不同业务场景下可能需要不同的脱敏强度、不同的压缩策略和不同的历史保留规则。直接使用一个庞大的过滤函数会导致代码难以维护和测试。通过装饰器模式(Decorator Pattern)和策略模式(Strategy Pattern),可以构建出灵活可组合的过滤架构。
装饰器模式构建过滤管道
装饰器模式允许在不修改原有 Handoff 逻辑的前提下,动态地为输入过滤添加新能力。每个装饰器只关注一个横切关注点,主过滤逻辑保持纯粹。例如,你可以为核心过滤器依次包裹日志装饰器、指标装饰器和缓存装饰器,而无需改动核心过滤器的任何代码。
这种设计的核心价值在于职责分离。审计日志装饰器负责记录每次过滤前后的消息数量和命中规则,性能指标装饰器负责采集各层的延迟分布,缓存装饰器负责缓存已过滤的相同输入以避免重复计算。当某个横切关注点的需求变更时,只需要修改对应的装饰器即可,其他组件不受影响。
在实际部署中,建议通过配置文件控制装饰器的启用顺序和参数。这样在排查问题时,可以临时关闭非关键装饰器以缩小问题范围;在性能调优时,也可以单独调整某一层的行为而无需重新部署代码。
策略模式实现可插拔的压缩算法
不同的业务场景对会话压缩的需求差异很大。医疗咨询场景要求保留所有关键约束信息,不能因压缩导致禁忌用药提示丢失;而普通客服场景更关注响应速度,可以接受较激进的压缩策略。策略模式允许在运行时切换压缩算法,完美应对这种多变的需求。
滑动窗口策略实现最简单,但会丢失超出窗口的上下文信息。摘要替换策略通过 LLM 生成早期对话的摘要,保留的信息更丰富,但引入了额外的延迟和成本。选择性保留策略基于关键词识别关键消息,在信息保留和上下文长度之间取得了较好的平衡。在实际系统中,还可以根据当前负载动态切换策略:高峰期使用滑动窗口保证低延迟,闲时使用摘要替换保证信息完整。
策略模式的另一个优势是可测试性。每个压缩策略都是独立的类,可以单独编写单元测试,验证其在各种边界条件下的行为。例如测试 SelectiveRetentionStrategy 是否能正确识别包含"必须""禁止"等关键词的消息,并确保这些消息在任何压缩阈值下都不会被丢弃。
graph LR
A[原始输入] --> B[规则脱敏层]
B --> C[模型检测层]
C --> D[上下文判断层]
D --> E[会话压缩层]
E --> F[输出到Agent]
B -.->|命中规则| G[审计日志]
C -.->|发现PII| G
D -.->|上下文异常| H[安全告警]
style A fill:#4a90d9,color:#fff
style F fill:#5cb85c,color:#fff
style G fill:#f0ad4e,color:#fff
style H fill:#d9534f,color:#fff以下示例展示了装饰器模式和策略模式的组合实现:
from functools import wraps
from typing import Callable
from abc import ABC, abstractmethod
class FilterDecorator:
"""过滤器装饰器基类。"""
def __init__(self, filter_fn: Callable):
self.filter_fn = filter_fn
def __call__(self, data: HandoffInputData) -> HandoffInputData:
return self.filter_fn(data)
class LoggingDecorator(FilterDecorator):
"""审计日志装饰器。"""
def __call__(self, data: HandoffInputData) -> HandoffInputData:
before = len(data.input_history)
result = self.filter_fn(data)
after = len(result.input_history)
print(f"[AUDIT] Filtered from {before} to {after} messages")
return result
class MetricsDecorator(FilterDecorator):
"""性能指标装饰器。"""
def __init__(self, filter_fn: Callable, name: str):
super().__init__(filter_fn)
self.name = name
self.latency_ms: float = 0.0
def __call__(self, data: HandoffInputData) -> HandoffInputData:
import time
start = time.perf_counter()
result = self.filter_fn(data)
self.latency_ms = (time.perf_counter() - start) * 1000
print(f"[METRICS] {self.name} took {self.latency_ms:.2f}ms")
return result
class CompressionStrategy(ABC):
"""会话压缩策略接口。"""
@abstractmethod
def compress(self, history: list, max_length: int) -> list:
pass
class SlidingWindowStrategy(CompressionStrategy):
def compress(self, history: list, max_length: int) -> list:
return history[-max_length:]
class SummaryStrategy(CompressionStrategy):
def compress(self, history: list, max_length: int) -> list:
if len(history) <= max_length:
return history
keep = history[-(max_length - 1):]
summary = f"[{len(history) - len(keep)} earlier messages summarized]"
return [summary] + keep
class SelectiveRetentionStrategy(CompressionStrategy):
KEYWORDS = ["必须", "禁止", "重要", "注意", "约束"]
def compress(self, history: list, max_length: int) -> list:
critical = [msg for msg in history if any(kw in str(msg) for kw in self.KEYWORDS)]
non_critical = [msg for msg in history if msg not in critical]
slots = max_length - len(critical)
return critical + non_critical[-max(0, slots):]
class AdaptiveCompressor:
"""自适应压缩器。"""
def __init__(self, strategy: CompressionStrategy = None):
self.strategy = strategy or SlidingWindowStrategy()
def set_strategy(self, strategy: CompressionStrategy):
self.strategy = strategy
def compress(self, history: list, max_length: int) -> list:
return self.strategy.compress(history, max_length)生产环境部署与性能优化
敏感数据识别的实践要点
将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。
具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。
过滤命中率的关键指标
监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。
除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。
实时流处理的架构考量
随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。
在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。
运维团队的协作建议
技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。
在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。
输入过滤规则需要定期审计。攻击者的绕过手段不断进化,每季度应组织安全评审,更新黑名单和检测模型,确保防御体系跟上威胁变化。