复杂任务选型:多Agent + Router + Critic
适用场景与复杂度评估
复杂任务通常具备以下特征:
- 需要多专业领域协作
- 高度不确定的任务环境
- 需要质量保证和风险控制
- 涉及复杂的决策流程
- 需要人机协作
# 复杂任务架构:多Agent协作模式
class MultiAgentOrchestrator:
def __init__(self):
self.router = TaskRouter()
self.workers = {
"developer": DeveloperAgent(),
"analyst": AnalystAgent(),
"designer": DesignerAgent(),
"qa": QAAgent()
}
self.critic = CriticAgent()
self.coordinator = AgentCoordinator()
async def process(self, complex_task: ComplexTask) -> TaskResult:
# 1. 任务分解与路由
subtasks = await self.router.decompose(complex_task)
agent_assignments = await self.router.route(subtasks)
# 2. 并行执行
agent_results = await self.coordinator.execute_parallel(
agent_assignments, self.workers
)
# 3. 结果聚合
aggregated_result = await self.coordinator.aggregate_results(
agent_results
)
# 4. 质量评估与修正
quality_score = await self.critic.evaluate(aggregated_result)
if quality_score < QUALITY_THRESHOLD:
# 重新分配或修正
corrected_result = await self.critic.correct(
aggregated_result, agent_results
)
return corrected_result
return aggregated_result
class TaskRouter:
def __init__(self):
self.skill_registry = {}
self.load_balancer = LoadBalancer()
self.quality_predictor = QualityPredictor()
async def decompose(self, task: ComplexTask) -> List[SubTask]:
"""将复杂任务分解为可执行的子任务"""
subtasks = []
for requirement in task.requirements:
subtasks.append(SubTask(
id=f"sub_{len(subtasks)}",
description=requirement,
required_skills=self._infer_skills(requirement)
))
return subtasks
async def route(self, subtasks: List[SubTask]) -> Dict[str, List[SubTask]]:
"""将子任务路由到合适的Agent"""
assignments = {}
for subtask in subtasks:
agent_type = self._select_agent_type(subtask)
assignments.setdefault(agent_type, []).append(subtask)
return assignments
def _infer_skills(self, requirement: str) -> List[str]:
"""从需求描述中推断所需技能"""
skill_keywords = {
"code": ["编程", "开发", "代码", "调试"],
"analysis": ["分析", "统计", "数据", "报告"],
"design": ["设计", "UI", "架构", "原型"],
"qa": ["测试", "质量", "验证", "审查"]
}
matched_skills = []
for skill, keywords in skill_keywords.items():
if any(kw in requirement for kw in keywords):
matched_skills.append(skill)
return matched_skills if matched_skills else ["general"]
def _select_agent_type(self, subtask: SubTask) -> str:
"""基于子任务需求选择Agent类型"""
skills = subtask.required_skills
if "code" in skills:
return "developer"
elif "analysis" in skills:
return "analyst"
elif "design" in skills:
return "designer"
elif "qa" in skills:
return "qa"
return "general"基于安全要求选型
在LLM Agent架构设计中,安全要求是影响架构选型的核心维度之一。不同的安全级别决定了我们对权限控制、数据隔离、审计追踪等安全控制措施的严格程度。本文将从安全威胁模型出发,分析不同安全要求下的架构选型策略与具体实现方案。
安全威胁模型与风险评估
核心安全威胁
提示注入攻击(Prompt Injection)
- 恶意用户通过精心构造的输入让Agent执行非预期操作
- 包括越权访问、数据泄露、系统破坏等
工具调用越权(Tool Escalation)
- 绕过权限控制调用敏感工具或API
- 组合调用多个工具实现更复杂的攻击
数据泄露与隐私风险
- 模型记忆或响应中泄露敏感信息
- 通过提示探测获取训练数据或内部知识
身份冒充与认证绕过
- 伪装身份访问系统资源
- 利用会话状态进行身份盗用
风险评估矩阵
| 威胁类型 | 发生概率 | 影响程度 | 风险等级 | 优先级 |
|---|---|---|---|---|
| 提示注入 | 高 | 中 | 高 | P0 |
| 工具越权 | 中 | 高 | 高 | P0 |
| 数据泄露 | 中 | 高 | 高 | P0 |
| 身份冒充 | 低 | 高 | 中 | P1 |
| 社工攻击 | 低 | 中 | 低 | P2 |
安全级别划分与架构选型
三个安全级别定义
1. 标准安全级别(Standard Security)
- 适用场景:内部工具、一般业务系统
- 核心要求:基础权限控制、日志记录
- 安全控制:用户认证、基础权限、访问日志
2. 高安全级别(High Security)
- 适用场景:金融、医疗、政务系统
- 核心要求:严格权限控制、完整审计、数据隔离
- 安全控制:多级认证、细粒度权限、完整审计链路
3. 极高安全级别(Extreme Security)
- 适用场景:国家安全、核心机密系统
- 核心要求:最高安全标准、物理隔离、零信任架构
- 安全控制:物理隔离、多因素认证、硬件安全模块
标准安全级别架构选型
推荐架构模式
用户界面 → 认证网关 → 编排层 → 工具层 → 数据层
↓ ↓ ↓ ↓ ↓
基础权限 基础日志 单Agent 基础工具 标准存储核心安全控制实现
1. 基础权限管理
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional
import json
class PermissionLevel(Enum):
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
@dataclass
class SecurityContext:
user_id: str
session_id: str
permissions: List[PermissionLevel]
resource_access: Dict[str, PermissionLevel]
audit_required: bool = True
class BasicPermissionManager:
def __init__(self):
self.user_permissions = {}
self.resource_permissions = {}
def check_permission(self, context: SecurityContext,
resource: str, action: PermissionLevel) -> bool:
"""检查用户对资源的权限"""
# 检查用户是否对资源有权限
if resource not in context.resource_access:
return False
user_level = context.resource_access[resource]
return self._has_permission(user_level, action)
def _has_permission(self, user_level: PermissionLevel,
required_level: PermissionLevel) -> bool:
"""权限级别检查"""
level_hierarchy = {
PermissionLevel.READ: 1,
PermissionLevel.WRITE: 2,
PermissionLevel.EXECUTE: 3,
PermissionLevel.ADMIN: 4
}
return level_hierarchy[user_level] >= level_hierarchy[required_level]
# 使用示例
def create_secure_agent_tool():
permission_manager = BasicPermissionManager()
def secure_tool_call(context: SecurityContext, tool_name: str,
parameters: Dict, **kwargs):
# 权限检查
if not permission_manager.check_permission(context, tool_name,
PermissionLevel.EXECUTE):
raise PermissionError(f"User {context.user_id} lacks permission to execute {tool_name}")
# 基础安全检查
if tool_name in ["delete_data", "execute_sql"]:
audit_log = {
"user_id": context.user_id,
"session_id": context.session_id,
"tool_name": tool_name,
"parameters": parameters,
"timestamp": datetime.utcnow().isoformat()
}
print(f"AUDIT: {json.dumps(audit_log)}")
# 执行工具调用
return execute_tool(tool_name, parameters)
return secure_tool_call2. 基础提示安全控制
import re
from typing import List
class BasicPromptSecurityFilter:
def __init__(self):
# 基础敏感词检测
self.dangerous_patterns = [
r"ignore.*previous.*instruction",
r"forget.*everything.*above",
r"system.*prompt",
r"jailbreak",
r"sudo.*execute"
]
# 禁止执行的命令模式
self.forbidden_commands = [
"rm -rf", "format", "del /s", "shutdown", "reboot"
]
def filter_prompt(self, user_input: str) -> str:
"""过滤不安全的提示"""
filtered_input = user_input
# 检查危险模式
for pattern in self.dangerous_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
raise SecurityException("Detected potentially malicious prompt pattern")
# 检查禁止命令
for cmd in self.forbidden_commands:
if cmd in user_input.lower():
raise SecurityException("Detected forbidden command pattern")
return filtered_input
def sanitize_tool_output(self, output: str) -> str:
"""清理工具输出中的敏感信息"""
# 移除可能的密码、密钥模式
sensitive_patterns = [
r"password[:\s]+[^\s]+",
r"key[:\s]+[A-Za-z0-9+/]+={0,2}",
r"token[:\s]+[A-Za-z0-9+/]+={0,2}"
]
for pattern in sensitive_patterns:
output = re.sub(pattern, "[REDACTED]", output, flags=re.IGNORECASE)
return output
# 基础安全Agent实现
class BasicSecureAgent:
def __init__(self, tool_manager, permission_manager):
self.tool_manager = tool_manager
self.permission_manager = permission_manager
self.security_filter = BasicPromptSecurityFilter()
def process_request(self, context: SecurityContext, user_input: str) -> str:
try:
# 1. 安全过滤
safe_input = self.security_filter.filter_prompt(user_input)
# 2. 基本响应生成
response = self._generate_response(safe_input)
# 3. 输出清理
safe_response = self.security_filter.sanitize_tool_output(response)
return safe_response
except SecurityException as e:
return f"Security check failed: {str(e)}"高安全级别架构选型
推荐架构模式
用户界面 → 多级认证 → 安全编排层 → 权限控制中心 → 工具沙箱 → 安全数据层
↓ ↓ ↓ ↓ ↓ ↓
强化认证 审计追踪 多Agent 细粒度权限 沙箱执行 加密存储核心安全控制实现
1. 细粒度权限控制系统
from abc import ABC, abstractmethod
from typing import Set, Dict, Any
import json
import hmac
import hashlib
class SecurityPolicy(ABC):
@abstractmethod
def evaluate(self, context: SecurityContext, resource: str,
action: str, parameters: Dict[str, Any]) -> bool:
pass
class RBACPolicy(SecurityPolicy):
"""基于角色的访问控制策略"""
def __init__(self):
self.roles = {}
self.permissions = {}
def add_role(self, role_name: str, permissions: Set[str]):
self.roles[role_name] = permissions
def evaluate(self, context: SecurityContext, resource: str,
action: str, parameters: Dict[str, Any]) -> bool:
role = getattr(context, "role", None)
if role is None:
return False
allowed = self.roles.get(role, set())
return action in allowed基于性能要求选型
在LLM Agent架构设计中,性能要求是决定架构选型的关键因素。不同的业务场景对延迟、准确性、吞吐量和成本有着不同的权衡需求。本节将深入分析基于性能要求的架构选型策略,并提供具体的技术实现方案和对比分析。
性能维度的架构影响分析
在深入讨论具体选型策略前,我们需要理解不同性能维度对架构设计的直接影响:
graph TB
A[性能要求] --> B[低延迟要求]
A --> C[高质量要求]
A --> D[高吞吐量要求]
A --> E[成本敏感要求]
A --> F[可扩展性要求]
B --> B1[轻量模型]
B --> B2[减少规划步数]
B --> B3[并行执行]
B --> B4[缓存策略]
C --> C1[复杂规划策略]
C --> C2[多模型集成]
C --> C3[质量验证机制]
C --> C4[反思优化]
D --> D1[微服务架构]
D --> D2[负载均衡]
D --> D3[队列系统]
D --> D4[异步处理]
E --> E1[模型压缩]
E --> E2[资源优化]
E --> E3[缓存复用]
E --> E4[按需调用]
F --> F1[模块化设计]
F --> F2[容器化部署]
F --> F3[水平扩展]
F --> F4[服务发现]低延迟场景架构选型
1. 轻量级单Agent架构
当业务场景对响应时间有严格要求(如实时对话、智能推荐)时,推荐采用轻量级单Agent架构:
# 低延迟Agent架构示例
class LowLatencyAgent:
def __init__(self):
self.model = "gpt-3.5-turbo" # 轻量模型
self.cache = RedisCache() # 高速缓存
self.tools = {
"search": FastSearchTool(),
"weather": WeatherAPI()
}
async def process(self, user_input: str) -> str:
# 检查缓存
cache_key = self._generate_cache_key(user_input)
if cached_result := await self.cache.get(cache_key):
return cached_result
# 单步直接执行
result = await self._direct_execute(user_input)
# 缓存结果
await self.cache.set(cache_key, result, ttl=300)
return result
async def _direct_execute(self, query: str) -> str:
# 最小化工具调用
if "weather" in query:
return await self.tools["weather"].get_current(query)
else:
return await self._llm_call(query)性能优化策略:
- 模型选择:优先使用推理速度快的轻量模型
- 缓存策略:多层缓存(Redis + 内存缓存)
- 减少规划步数:直接执行而非复杂的多步规划
- 工具优化:预加载常用工具,减少调用延迟
2. 并行执行架构
对于可以并行处理的复杂任务,采用并行执行架构:
class ParallelAgent:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=5)
self.sub_agents = [
SearchAgent(),
AnalysisAgent(),
SynthesisAgent()
]
async def process(self, query: str) -> str:
# 并行执行子任务
tasks = [
self.executor.submit(agent.analyze, query)
for agent in self.sub_agents
]
# 收集结果
results = await asyncio.gather(*tasks)
# 快速合成最终结果
return self._quick_synthesize(results)