概述
审批状态机与 UI 集成方案的交互设计
人工介入(Human-in-the-loop)是 AI 系统从"演示级"迈向"生产级"的关键门槛。完全自主的 Agent 在涉及资金、数据删除、权限变更等高风险操作时,必须引入人类的最终审批。Agents SDK 通过 needs_approval=True 为这一需求提供了底层支持,但生产环境的实现远比装饰器参数复杂。
从状态机视角来看,一个需要审批的工具调用会经历以下状态转换:
PENDING -> WAITING_FOR_APPROVAL -> APPROVED -> EXECUTING -> COMPLETED
|
+-> REJECTED -> ABORTED这个状态机的工程挑战在于持久化和异步通知:
- 持久化:审批请求必须被持久化存储,因为审批者可能在几分钟甚至几小时后才会响应。如果在这期间服务重启,未完成的审批不能丢失。
- 异步通知:系统需要主动通知审批者(如推送通知、邮件、钉钉消息),而不是让审批者不断轮询。
- 并发控制:如果一个审批请求被多个审批者同时处理,必须确保只有一个审批结果生效(乐观锁或分布式锁)。
从 UI 集成来看,人工介入需要前后端的紧密配合:
- 后端:暴露审批 API(
POST /approvals/{id}/approve和/reject),维护审批队列,与 Agent 的运行状态同步。 - 前端:在聊天界面中渲染待审批卡片,显示操作详情(如"即将删除用户 u123 的所有数据"),提供"批准"和"拒绝"按钮。
- Agent:在检测到待审批项时,生成友好的提示语告知用户"您的请求已提交审批,通常需要 1-5 分钟"。
性能方面,人工介入会显著增加请求的尾延迟(P99 可能从秒级增长到分钟级)。对于用户体验,关键是管理预期:让用户清楚地知道正在等待什么、需要多久、以及当前状态。
needs_approval、interruptions 和 RunState:在关键操作前暂停等待人工审批。
正文
相关阅读
参考文档
完整实战示例:异步审批系统与状态恢复
以下示例展示了如何在生产环境中构建一个支持异步审批、状态持久化和通知的完整人工介入系统:
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Any
from agents import Agent, Runner, function_tool, RunContextWrapper
from agents.run_context import RunContextWrapper as RCW
@dataclass
class ApprovalRequest:
request_id: str
run_id: str
tool_name: str
tool_args: dict
requested_at: float
status: str = "pending" # pending, approved, rejected, expired
resolved_at: float | None = None
class AsyncApprovalStore:
"""异步审批存储(简化版,生产环境应使用数据库)。"""
def __init__(self):
self._requests: dict[str, ApprovalRequest] = {}
self._callbacks: dict[str, asyncio.Event] = {}
async def create(self, req: ApprovalRequest) -> None:
self._requests[req.request_id] = req
self._callbacks[req.request_id] = asyncio.Event()
async def approve(self, request_id: str) -> bool:
req = self._requests.get(request_id)
if not req or req.status != "pending":
return False
req.status = "approved"
req.resolved_at = time.time()
self._callbacks[request_id].set()
return True
async def reject(self, request_id: str) -> bool:
req = self._requests.get(request_id)
if not req or req.status != "pending":
return False
req.status = "rejected"
req.resolved_at = time.time()
self._callbacks[request_id].set()
return True
async def wait_for_resolution(self, request_id: str, timeout: float = 300.0) -> ApprovalRequest | None:
event = self._callbacks.get(request_id)
if not event:
return None
try:
await asyncio.wait_for(event.wait(), timeout=timeout)
return self._requests.get(request_id)
except asyncio.TimeoutError:
req = self._requests.get(request_id)
if req:
req.status = "expired"
return req
approval_store = AsyncApprovalStore()
@function_tool(needs_approval=True)
def delete_user_data(user_id: str, scope: str = "all") -> str:
"""删除用户数据。此操作不可逆,需要人工审批。"""
return f"User data for {user_id} (scope={scope}) has been permanently deleted."
@function_tool(needs_approval=True)
def transfer_funds(to_account: str, amount: float, currency: str = "USD") -> str:
"""转账操作。需要人工审批。"""
return f"Transferred {amount} {currency} to {to_account}."
class HumanInTheLoopRunner:
"""支持人工介入的运行器包装。"""
async def run(
self,
agent: Agent,
user_input: str,
timeout: float = 600.0,
) -> dict[str, Any]:
# 简化示例:实际应使用 Runner 的 state 序列化/恢复机制
result = await Runner.run(agent, user_input)
# 检查是否有待审批项
interruptions = getattr(result, "interruptions", [])
if not interruptions:
return {"status": "completed", "output": result.final_output}
pending = []
for interruption in interruptions:
req = ApprovalRequest(
request_id=str(uuid.uuid4()),
run_id=getattr(result, "run_id", "unknown"),
tool_name=getattr(interruption, "tool_name", "unknown"),
tool_args=getattr(interruption, "tool_args", {}),
requested_at=time.time())
人机协作决策流程
下图展示了 Agent 在何时、以何种方式请求人工介入的决策流程:
mermaid
flowchart TD
A[Agent 处理请求] --> B{置信度 > 0.9?}
B -->|是| C[直接输出]
B -->|否| D{涉及敏感操作?}
D -->|是| E[请求人工审批]
D -->|否| F{多次尝试失败?}
F -->|是| E
F -->|否| G[输出+免责声明]
E --> H[人工审核界面]
H --> I{审批结果?}
I -->|批准| C
I -->|拒绝| J[返回拒绝理由]
I -->|需补充| K[补充信息后重试]
K --> A
style E fill:#e8d5b5,stroke:#5a4a3a
style H fill:#c5e0b4,stroke:#5a4a3a
style I fill:#f4b183,stroke:#5a4a3a
人工介入不应是最后手段,而应是系统设计的一部分。在关键决策点主动引入人类判断,可以显著提升系统的可靠性和用户信任度。
## 人工介入的效率优化
人工介入虽然提升了可靠性,但也引入了延迟。在客服场景中,用户可能需要等待数分钟才能获得人工响应,这严重影响体验。以下是提升人机协作效率的策略:
**预填充决策上下文**:
当请求被路由到人工审核时,不要只传递原始请求,而是让 Agent 预先生成一份决策摘要,包含建议操作、风险评估和关键数据。审核员可以在 10 秒内理解情况并做出决策,而非花费 2 分钟阅读完整上下文。
```python
decision_summary = {
"request_type": "退款申请",
"amount": "¥500",
"user_history": "3年老用户,过往退款率 0%",
"agent_recommendation": "批准退款",
"risk_factors": ["金额超过自动审批阈值"],
"supporting_docs": ["订单截图", "支付凭证"],
}批量审批:
对于低风险、高频率的操作(如小额退款、常规权限变更),可以引入批量审批机制。系统收集一段时间的待审批请求,审核员一次性批量处理,效率提升 5-10 倍。
智能路由:
不是所有人工审核都需要高级专家处理。建立审核员分级体系:初级审核员处理标准化请求(如退款金额 < ¥100),高级审核员处理复杂争议(如欺诈嫌疑、大额交易)。智能路由根据请求特征自动分配给合适的审核员,减少专家资源浪费。
反馈闭环:
人工审核的结果应该反馈给 Agent,用于持续学习。如果审核员频繁修正 Agent 的推荐,说明 Agent 的决策逻辑存在偏差,需要通过 prompt 优化或微调来改进。
人机协作的 UX 设计同样重要。设计良好的人机协作界面需要考虑:上下文传递将 Agent 的推理过程、工具调用记录和相关数据完整展示给人类操作员;操作简化提供一键批准、一键拒绝和自定义回复三种选项;时效提示显示请求等待时长和 SLA 剩余时间。介入审批的审计日志应永久保存,满足合规要求。日志中需记录操作员身份、审批时间、原始请求和最终决策,支持事后追溯。
常见问题与调试
问题一:审批状态丢失导致运行无法恢复
如果服务在审批期间重启,未完成的审批请求和对应的 Agent 状态可能丢失。解决方案:
- 将审批请求和运行状态持久化到数据库(如 PostgreSQL),而非内存。
- 使用消息队列(如 RabbitMQ、Kafka)保证审批事件的不丢失传递。
- 实现状态恢复的幂等性:即使恢复操作被执行多次,结果也一致。
问题二:用户长时间等待没有反馈
如果审批者迟迟不响应,用户可能认为系统卡住了。用户体验优化:
- 在审批等待期间,定期向用户发送状态更新(如"您的请求正在审批中,已等待 2 分钟")。
- 提供取消选项:允许用户撤销 pending 的审批请求。
- 设置合理的超时(如 5 分钟),超时后自动拒绝并建议用户联系人工客服。
问题三:审批 UI 与 Agent 上下文不同步
审批者在 UI 中看到的工具参数可能与 Agent 实际要执行的参数不一致(如时序差导致参数被更新)。防范措施:
- 在创建审批请求时冻结参数快照,审批 UI 展示的是冻结值而非实时值。
- 如果参数在审批期间发生了变化,自动作废原审批请求并创建新的审批。
- 在审批通过后执行前,再次校验参数的有效性(如账户余额是否仍然充足)。
与其他方案对比
| 维度 | Agents SDK HITL | CAMEL 协作框架 | 自建审批工作流 |
|---|---|---|---|
| 实现层级 | 工具装饰器级别 | 消息协议级别 | 应用架构级别 |
| 持久化支持 | 需自行扩展 | 基础支持 | 完全可控 |
| 通知集成 | 需自行实现 | 需自行实现 | 完全可控 |
| UI 适配难度 | 中 | 高 | 高 |
CAMEL 框架通过角色扮演和协作消息协议实现了更自然的"人机协作"模式,适合研究人机交互的学术场景。Agents SDK 的 needs_approval 则是一个务实的工程方案,最小侵入式地解决了高风险操作的人工把关问题。对于大多数企业应用来说,在 SDK 基础上自建审批工作流(结合现有的 OA 系统或工单系统)可能是最可行的路径,虽然开发成本较高,但能与现有 IT 基础设施深度集成。
异步审批系统的观察者模式与参数快照
人工介入系统的核心挑战在于如何将同步的 Agent 运行流程与异步的人类审批流程无缝衔接。从事件驱动的视角来看,审批请求的产生、通知、响应和恢复构成了一条完整的事件流。为了降低系统耦合度,**观察者模式(Observer Pattern)**是实现通知机制的理想选择:当审批请求被创建时,所有注册的观察者(邮件通知器、钉钉机器人、WebSocket 推送服务)都会收到事件并独立处理。这种模式使得新的通知渠道可以零侵入地接入系统,审批事件的发布者无需关心具体有哪些消费者在监听,从而实现了发布与订阅的完全解耦。
sequenceDiagram
participant U as 用户
participant A as Agent
participant S as 审批存储
participant N as 通知服务
participant H as 审批者
participant R as 恢复引擎
U->>A: 发起高风险请求
A->>S: 创建审批请求
S->>N: 触发通知事件
N->>H: 推送审批通知
H->>S: 审批通过/拒绝
S->>R: 触发恢复事件
R->>A: 恢复 Agent 运行
A->>U: 返回最终结果参数快照是防止审批期间数据漂移的关键技术。当审批请求被创建时,系统必须冻结工具参数的当前值,而非保存引用。如果保存的是字典引用,在审批等待期间,Agent 或其他并发流程可能修改该字典,导致审批者看到的是旧参数,实际执行的是新参数,产生严重的安全隐患。以下是一个安全的参数冻结实现:
import copy
import json
import time
from dataclasses import dataclass
@dataclass(frozen=True)
class FrozenToolCall:
tool_name: str
tool_args: str # JSON 序列化后的参数
snapshot_at: float
@classmethod
def freeze(cls, tool_name: str, args: dict, timestamp: float):
# 深拷贝 + JSON 序列化双重保险
copied = copy.deepcopy(args)
return cls(
tool_name=tool_name,
tool_args=json.dumps(copied, sort_keys=True),
snapshot_at=timestamp
)
def get_args(self) -> dict:
return json.loads(self.tool_args)
class ParamSnapshotStore:
"""参数快照存储:确保审批期间参数不可变。"""
def __init__(self):
self._snapshots: dict[str, FrozenToolCall] = {}
def create_snapshot(self, request_id: str, tool_name: str, args: dict) -> FrozenToolCall:
snapshot = FrozenToolCall.freeze(tool_name, args, time.time())
self._snapshots[request_id] = snapshot
return snapshot
def get_snapshot(self, request_id: str) -> FrozenToolCall | None:
return self._snapshots.get(request_id)在超时处理方面,建议采用分级超时策略:初级超时(如 2 分钟)触发提醒通知给审批者;中级超时(如 5 分钟)通知备选审批者或上级;终极超时(如 10 分钟)自动拒绝并引导用户转人工客服。这种策略既避免了无限等待,又给予了充分的审批响应时间。审批结果恢复时,应再次校验参数的时效性,例如检查账户余额是否仍然充足、目标资源是否仍然存在,确保执行时的上下文与审批时的上下文一致。对于涉及资金操作的审批,强烈建议在执行前引入二次确认机制,由系统再次向审批者推送即将执行的操作摘要,审批者确认后才真正触发工具执行。这种双重确认在金融和医疗等高风险领域是行业标准做法,能够最大程度地降低误操作风险。此外,审批日志应作为审计追踪长期保留,满足合规要求并为事后责任认定提供依据。
从用户体验的角度,审批等待期间应提供清晰的状态反馈。建议在聊天界面中展示审批进度条或状态徽章,让用户随时了解当前处于哪个环节。对于被自动拒绝的请求,系统应给出明确的替代方案,例如引导用户修改请求参数后重新提交,或直接转接到人工客服通道。良好的审批体验不仅是技术问题,更是产品设计的核心考量。
生产环境部署与性能优化
审批工作流的实践要点
将本章节的技术应用到生产环境时,首要考虑的是稳定性与可观测性。建议采用渐进式 rollout 策略:先在开发环境验证核心逻辑,再迁移到预发布环境进行压力测试,最后才全量上线。部署过程中应配置完善的日志收集和指标监控,确保任何问题都能被快速发现和定位。
具体来说,需要在基础设施层面做好以下准备:容器资源限制(CPU/内存)、网络策略配置(防火墙规则、服务网格)、持久化存储选型(SSD vs 标准盘)以及备份恢复方案。对于高可用要求严格的场景,建议部署多实例并配置负载均衡,避免单点故障导致服务中断。
审批时效指标的关键指标
监控是生产系统的生命线。针对本章节涉及的功能,建议重点跟踪以下指标:请求延迟(P50/P95/P99)、错误率(4xx/5xx/超时)、吞吐量(QPS/TPS)以及资源利用率(CPU/内存/磁盘/网络)。这些指标应接入统一的监控大盘,并设置合理的告警阈值。
除了基础指标,还应关注业务层面的指标。例如功能成功率、用户满意度、成本消耗趋势等。通过将技术指标与业务指标关联分析,可以更准确地评估系统改进的实际价值,避免陷入"为了优化而优化"的陷阱。
多级审批路由的架构考量
随着业务规模增长,单实例部署很快会成为瓶颈。扩展性设计应在项目初期就纳入考量,而非事后补救。水平扩展通常比垂直扩展更具成本效益,但也引入了分布式系统的复杂性(数据一致性、服务发现、负载均衡等)。
在扩展过程中,建议遵循"无状态优先"原则:将状态外置到独立的存储层(如 Redis、PostgreSQL),使计算层可以随时水平扩容。对于无法避免的状态(如会话、缓存),采用分布式一致性协议或最终一致性模型来管理。定期进行容量规划和压力测试,确保系统在流量峰值时仍能稳定运行。
运维团队的协作建议
技术方案的落地离不开高效的团队协作。建议建立清晰的运维手册(Runbook),涵盖常见故障的诊断步骤、应急处理流程和升级路径。同时,通过定期的复盘会议,将线上事故转化为团队的学习素材,持续完善系统的健壮性。
在工具链方面,推荐将本章节的配置和脚本纳入版本控制(Git),并使用 Infrastructure as Code(IaC)工具(如 Terraform、Ansible)管理基础设施变更。这不仅能提高部署效率,还能确保环境一致性,减少"在我机器上能跑"的问题。