在前面的章节中,我们陆续拆解了 Claude Code 的命令系统、状态管理、权限模型和工具链。但如果把 Claude Code 比作一个人体,QueryEngine 就是那颗持续跳动的心脏——它驱动着每一次用户输入到模型响应的完整生命周期,管理着 LLM 对话、工具调用决策、流式响应处理、上下文压缩和错误恢复。本文将深入解析这个约 46KB 的核心文件,厘清它的职责边界、内部架构以及与周边模块的协作关系。
一、QueryEngine 的定位:为什么是"心脏模块"
在 Claude Code 的源码仓库中,src/QueryEngine.ts 是一个约 1296 行、46KB 的核心文件。与它相邻的 src/query.ts 更大(约 68KB),但两者的职责有明确分工:
- QueryEngine.ts:负责对话生命周期和会话状态管理。它是一个可实例化的类,每个对话对应一个
QueryEngine实例,状态(消息历史、文件缓存、用量统计等)在多次submitMessage()调用之间持久化。 - query.ts:负责单次查询的循环执行。它是一个纯函数(
query()),在while(true)中驱动 LLM API 调用、工具执行、上下文压缩和错误恢复,但不持有跨轮次的状态。
这种分层设计体现了清晰的架构思想:QueryEngine 是"有状态的管理器",query.ts 是"无状态的执行器"。QueryEngine 将外部输入转换为内部状态更新,然后委托 query.ts 完成一轮具体的模型交互;query.ts 执行完毕后,QueryEngine 再将结果整合回会话状态。
从源码注释中可以印证这一点:
/**
* QueryEngine owns the query lifecycle and session state for a conversation.
* It extracts the core logic from ask() into a standalone class that can be
* used by both the headless/SDK path and (in a future phase) the REPL.
*
* One QueryEngine per conversation. Each submitMessage() call starts a new
* turn within the same conversation. State persists across turns.
*/来源:
src/QueryEngine.ts,第 97–105 行。
export class QueryEngine {
private config: QueryEngineConfig
private mutableMessages: Message[]
private abortController: AbortController
private permissionDenials: SDKPermissionDenial[]
private totalUsage: NonNullableUsage
private hasHandledOrphanedPermission = false
private readFileState: FileStateCache
private discoveredSkillNames = new Set<string>()
private loadedNestedMemoryPaths = new Set<string>()来源:
src/QueryEngine.ts,第 107–120 行。
这六个私有字段就是 QueryEngine 的"记忆":消息历史、中断控制器、权限拒绝记录、累计 Token 用量、文件读取缓存,以及技能发现追踪。
二、QueryEngine 在系统中的位置
为了理解 QueryEngine 与周边模块的关系,我们先用一张架构图来展示它的位置:
flowchart TB
subgraph Entrypoints["入口层"]
REPL["REPL (交互式)"]
SDK["SDK (headless)"]
AgentTool["AgentTool (子代理)"]
end
subgraph EngineLayer["引擎层"]
QueryEngine["QueryEngine.ts\n(会话状态管理)"]
end
subgraph LoopLayer["循环层"]
Query["query.ts\n(单次查询循环)"]
end
subgraph QueryInfra["查询基础设施"]
Config["query/config.ts"]
Deps["query/deps.ts"]
StopHooks["query/stopHooks.ts"]
TokenBudget["query/tokenBudget.ts"]
end
subgraph ToolLayer["工具层"]
Tool["Tool.ts\n(工具定义与类型)"]
Tools["tools.ts\n(工具注册)"]
ToolOrchestration["services/tools/\ntoolOrchestration.ts"]
StreamingExecutor["services/tools/\nStreamingToolExecutor.ts"]
end
subgraph StateLayer["状态层"]
AppState["state/AppState.ts"]
Context["utils/queryContext.ts\n(系统提示构建)"]
end
subgraph APILayer["API 层"]
ClaudeAPI["services/api/claude.ts\n(流式调用)"]
end
REPL -->|ask()| QueryEngine
SDK -->|ask()| QueryEngine
AgentTool -->|ask()| QueryEngine
QueryEngine -->|submitMessage()\n委托| Query
QueryEngine -->|读取/更新| AppState
QueryEngine -->|构建系统提示| Context
Query -->|调用| ClaudeAPI
Query -->|注入依赖| Deps
Query -->|读取配置| Config
Query -->|执行后处理| StopHooks
Query -->|检查预算| TokenBudget
Query -->|调度工具| ToolOrchestration
Query -->|流式执行| StreamingExecutor
ToolOrchestration -->|使用| Tool
StreamingExecutor -->|使用| Tool
QueryEngine -->|传递| Tool这张图展示了 QueryEngine 处于"承上启下"的关键位置:
- 向上:为 REPL、SDK 和 AgentTool 提供统一的对话入口。
ask()函数(在QueryEngine.ts末尾)是一个便利包装器,创建QueryEngine实例并调用submitMessage()。 - 向下:将具体的一次查询委托给
query.ts,同时提供必要的上下文(消息历史、系统提示、工具上下文等)。 - 横向:与
AppState交互以读取和更新全局状态,与Tool.ts交互以获取工具定义和权限策略。
三、QueryEngine 与 query.ts 的分工
如果把一次完整的对话比作一场足球比赛,QueryEngine 是球队经理,query.ts 是场上教练。
QueryEngine 的职责
- 会话状态持久化:
mutableMessages跨轮次累积,用户的前一条提问和模型的回答会成为下一条提问的上下文。 - 系统提示构建:每轮调用前,QueryEngine 都会调用
fetchSystemPromptParts()重新组装系统提示,注入工具列表、MCP 客户端信息、自定义提示词等。 - 用户输入预处理:通过
processUserInput()处理斜杠命令、附件、内存加载等。 - 用量与成本追踪:累加每轮对话的
usage,计算总成本。 - 权限拒绝追踪:记录被模型拒绝的工具调用,供 SDK 上报。
- 流式响应分发:将
query.ts生成的内部消息转换为SDKMessage格式 yield 给调用方。
query.ts 的职责
- LLM 流式调用:通过
deps.callModel()(即queryModelWithStreaming)发起 API 请求,处理message_start、content_block_delta、message_delta、message_stop等事件。 - 上下文压缩决策:在每次 API 调用前,按顺序执行 snip compact、microcompact、autocompact 和 context collapse,确保提示词不超过模型窗口。
- 工具调用执行:检测 assistant 消息中的
tool_use块,调度runTools()或StreamingToolExecutor执行工具,生成tool_result。 - 错误恢复:处理
prompt_too_long、max_output_tokens、模型 fallback、流式中断等异常情况。 - 停止钩子(Stop Hooks):在 assistant 响应完成后,执行各类钩子(如任务完成检测、队友空闲通知等)。
- Token 预算检查:检查当前轮次的 Token 消耗是否超出预算,决定是否继续或停止。
这种分工意味着:QueryEngine 关心"这是谁的比赛",query.ts 关心"这一回合怎么踢"。
四、QueryEngine 内部模块划分
QueryEngine 虽然是一个类,但其内部逻辑可以划分为几个清晰的阶段:
flowchart LR
subgraph SubmitMessage["submitMessage() 执行流程"]
direction TB
A["1. 初始化阶段"] --> B["2. 系统提示构建"]
B --> C["3. 用户输入处理"]
C --> D["4. 委托 query()"]
D --> E["5. 流式响应处理"]
E --> F["6. 结果归一化"]
end
subgraph Init["1. 初始化阶段"]
Init1["清除 discoveredSkillNames"]
Init2["设置 cwd"]
Init3["包装 canUseTool\n(追踪权限拒绝)"]
Init4["确定主循环模型"]
end
subgraph SysPrompt["2. 系统提示构建"]
Sys1["fetchSystemPromptParts()"]
Sys2["加载 memoryMechanicsPrompt"]
Sys3["asSystemPrompt() 组装"]
Sys4["注册结构化输出强制"]
end
subgraph UserInput["3. 用户输入处理"]
UI1["构建 ProcessUserInputContext"]
UI2["处理 orphan permission"]
UI3["processUserInput()"]
UI4["写入 transcript"]
UI5["生成 replayableMessages"]
end
subgraph Delegate["4. 委托 query()"]
Del1["构建 buildSystemInitMessage"]
Del2["调用 query() AsyncGenerator"]
end
subgraph Stream["5. 流式响应处理"]
St1["assistant → normalizeMessage"]
St2["progress → 记录+转发"]
St3["user → 累加 turnCount"]
St4["stream_event → 更新 usage"]
St5["attachment → 提取 structured_output"]
St6["system → compact_boundary\n/api_retry"]
end
subgraph Result["6. 结果归一化"]
Res1["检查 USD 预算超限"]
Res2["检查结构化输出重试次数"]
Res3["判断 isResultSuccessful"]
Res4["构建 success/error result"]
end
A -.-> Init
B -.-> SysPrompt
C -.-> UserInput
D -.-> Delegate
E -.-> Stream
F -.-> Result这个流程图展示了 submitMessage() 的六个阶段。其中第 4 阶段是最关键的" delegation 点"——QueryEngine 将已准备好的 messages、systemPrompt、userContext 等参数传递给 query(),进入真正的模型交互循环。
五、QueryEngineConfig:连接外部世界的接口
QueryEngine 的构造函数接收一个庞大的配置对象 QueryEngineConfig,它定义了 QueryEngine 与外部系统的所有连接点:
export type QueryEngineConfig = {
cwd: string
tools: Tools
commands: Command[]
mcpClients: MCPServerConnection[]
agents: AgentDefinition[]
canUseTool: CanUseToolFn
getAppState: () => AppState
setAppState: (f: (prev: AppState) => AppState) => void
initialMessages?: Message[]
readFileCache: FileStateCache
customSystemPrompt?: string
appendSystemPrompt?: string
userSpecifiedModel?: string
fallbackModel?: string
thinkingConfig?: ThinkingConfig
maxTurns?: number
maxBudgetUsd?: number
taskBudget?: { total: number }
jsonSchema?: Record<string, unknown>
verbose?: boolean
replayUserMessages?: boolean
handleElicitation?: ToolUseContext['handleElicitation']
includePartialMessages?: boolean
setSDKStatus?: (status: SDKStatus) => void
abortController?: AbortController
orphanedPermission?: OrphanedPermission
snipReplay?: (
yieldedSystemMsg: Message,
store: Message[],
) => { messages: Message[]; executed: boolean } | undefined
}来源:
src/QueryEngine.ts,第 56–95 行。
这个配置对象的设计非常有讲究:
- 环境相关:
cwd(工作目录)、tools、commands、mcpClients定义了当前对话的"工作环境"。 - 状态访问器:
getAppState/setAppState是 QueryEngine 读取和修改全局状态的桥梁,避免了直接导入全局状态导致的测试困难。 - 模型配置:
userSpecifiedModel、fallbackModel、thinkingConfig控制模型行为。 - 预算与限制:
maxTurns、maxBudgetUsd、taskBudget是防止对话无限膨胀的安全阀。 - 结构化输出:
jsonSchema配合SYNTHETIC_OUTPUT_TOOL_NAME实现 JSON Schema 强制输出。 - 历史裁剪:
snipReplay是一个注入的回调,用于处理HISTORY_SNIP特性下的历史消息裁剪,保持 QueryEngine 本身不包含特性开关字符串(便于 tree-shaking 和测试)。
六、submitMessage():核心方法的解剖
submitMessage() 是 QueryEngine 唯一的核心公共方法,它返回一个 AsyncGenerator<SDKMessage>,这意味着调用方通过 for await...of 消费流式消息。其签名如下:
async *submitMessage(
prompt: string | ContentBlockParam[],
options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown>来源:
src/QueryEngine.ts,第 135–137 行。
6.1 初始化与 canUseTool 包装
在方法开头,QueryEngine 会从配置中解构出大量参数,并做两件关键的事:
- 清除 turn-scoped 状态:
this.discoveredSkillNames.clear()确保每轮对话的技能发现记录不会无限增长。 - 包装
canUseTool:原始canUseTool被包装为wrappedCanUseTool,在返回结果前检查behavior !== 'allow',并将拒绝记录推入permissionDenials数组。这是 SDK 上报权限拒绝的数据来源。
const wrappedCanUseTool: CanUseToolFn = async (tool, input, toolUseContext, assistantMessage, toolUseID, forceDecision) => {
const result = await canUseTool(tool, input, toolUseContext, assistantMessage, toolUseID, forceDecision)
if (result.behavior !== 'allow') {
this.permissionDenials.push({
tool_name: sdkCompatToolName(tool.name),
tool_use_id: toolUseID,
tool_input: input,
})
}
return result
}来源:
src/QueryEngine.ts,第 159–174 行。
6.2 系统提示的组装
QueryEngine 调用 fetchSystemPromptParts() 获取默认系统提示、用户上下文和系统上下文。如果用户提供了自定义系统提示且设置了 CLAUDE_COWORK_MEMORY_PATH_OVERRIDE,还会注入记忆机制提示词:
const memoryMechanicsPrompt =
customPrompt !== undefined && hasAutoMemPathOverride()
? await loadMemoryPrompt()
: null
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
])来源:
src/QueryEngine.ts,第 192–207 行。
6.3 ProcessUserInputContext 的构建
QueryEngine 构建了一个巨大的 ProcessUserInputContext 对象,传递给 processUserInput() 和后续的 query()。这个上下文对象包含:
messages和setMessages:允许斜杠命令修改消息数组。getAppState/setAppState:状态访问。abortController:中断信号。readFileState:文件读取缓存。updateFileHistoryState/updateAttributionState:文件历史追踪和归属状态更新。setSDKStatus:SDK 状态回调。
值得注意的是,setMessages 在第一个 processUserInputContext 中是有效的(允许斜杠命令修改消息),但在第二个重建的上下文(斜杠命令处理完成后)中被替换为 no-op,确保后续流程不再修改消息数组。
6.4 对 query() 的委托与响应处理
QueryEngine 调用 query() 后,进入一个巨大的 for await...of 循环,处理 query() yield 的每一种消息类型:
| 消息类型 | 处理方式 |
|---|---|
assistant | 推入 mutableMessages,通过 normalizeMessage() 转换为 SDK 格式 yield |
user | 推入 mutableMessages,yield 给 SDK,累加 turnCount |
progress | 推入 mutableMessages,写入 transcript,yield |
stream_event | 更新 currentMessageUsage,累加到 totalUsage;如启用 includePartialMessages 则 yield |
attachment | 推入 mutableMessages,提取 structured_output,处理 max_turns_reached |
system | 处理 compact_boundary(释放前置消息用于 GC)、api_error(转换为 api_retry SDK 消息) |
tool_use_summary | 直接 yield 给 SDK |
tombstone | 跳过(内部控制信号) |
stream_request_start | 跳过 |
来源:
src/QueryEngine.ts,第 316–510 行。
这个 switch-case 是 QueryEngine 的"消息路由器",它将 query.ts 的内部消息格式翻译为 SDKMessage 格式,同时维护 mutableMessages 的完整性。
6.5 预算与重试检查
在循环结束后,QueryEngine 会检查多个终止条件:
- USD 预算超限:如果
getTotalCost() >= maxBudgetUsd,yielderror_max_budget_usd结果。 - 结构化输出重试超限:如果
jsonSchema启用且SYNTHETIC_OUTPUT_TOOL_NAME调用次数超过MAX_STRUCTURED_OUTPUT_RETRIES(默认 5 次),yielderror_max_structured_output_retries结果。 - 执行失败诊断:如果最后一条消息不是成功的 assistant/user 消息,yield
error_during_execution,并附带诊断信息和 turn-scoped 的错误日志。
if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) {
yield { type: 'result', subtype: 'error_max_budget_usd', ... }
return
}来源:
src/QueryEngine.ts,第 430–455 行。
6.6 成功结果的构建
如果一切正常,QueryEngine 从最后一条 assistant 消息中提取文本结果,构建 success 类型的结果消息:
yield {
type: 'result',
subtype: 'success',
is_error: isApiError,
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
num_turns: turnCount,
result: textResult,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
structured_output: structuredOutputFromTool,
fast_mode_state: getFastModeState(mainLoopModel, initialAppState.fastMode),
uuid: randomUUID(),
}来源:
src/QueryEngine.ts,第 545–563 行。
七、query.ts:单次查询循环的详细解剖
如果说 QueryEngine 是"会话经理",那么 query.ts 就是"回合导演"。它包含两个主要导出:
export type QueryParams = { ... }
export async function* query(params: QueryParams): AsyncGenerator<...> { ... }来源:
src/query.ts,第 67–89 行。
7.1 queryLoop 与状态机
query() 内部调用 queryLoop(),后者维护一个 State 对象作为跨迭代状态:
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number
hasAttemptedReactiveCompact: boolean
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined
turnCount: number
transition: Continue | undefined
}来源:
src/query.ts,第 93–105 行。
transition 字段记录了上一轮继续的原因(如 next_turn、max_output_tokens_recovery、stop_hook_blocking 等),这让测试可以断言恢复路径是否被触发,而无需检查消息内容。
7.2 上下文压缩流水线
在每次 API 调用前,queryLoop 会按顺序执行多个压缩阶段:
- Tool Result Budget:对工具结果大小进行限制,替换超长内容为摘要。
- Snip Compact(
HISTORY_SNIP):裁剪历史消息中可被移除的片段,释放 Token。 - Microcompact:基于缓存的微型压缩,利用
cache_deleted_input_tokens优化。 - Context Collapse(
CONTEXT_COLLAPSE):将已折叠的上下文视图投影到消息数组。 - Autocompact:当 Token 数超过阈值时,调用模型生成对话摘要,替换历史消息。
messagesForQuery = await applyToolResultBudget(messagesForQuery, ...)
// snip
messagesForQuery = snipResult.messages
// microcompact
messagesForQuery = microcompactResult.messages
// context collapse
messagesForQuery = collapseResult.messages
// autocompact
const { compactionResult, consecutiveFailures } = await deps.autocompact(...)来源:
src/query.ts,第 220–290 行。
这个流水线的设计体现了渐进式压缩的思想:先用廉价的方法(snip、microcompact)释放 Token,只有在必要时才调用昂贵的模型摘要(autocompact)。
7.3 流式 API 调用与 Fallback
queryLoop 通过 deps.callModel() 发起流式调用,并在一个内层 while 循环中支持模型 fallback:
while (attemptWithFallback) {
attemptWithFallback = false
try {
for await (const message of deps.callModel({ ... })) {
// 处理流式消息...
if (streamingFallbackOccured) {
// 清除孤儿消息、重置状态、重试
}
}
} catch (innerError) {
if (innerError instanceof FallbackTriggeredError && fallbackModel) {
currentModel = fallbackModel
attemptWithFallback = true
// 重置 assistantMessages、toolResults、toolUseBlocks
continue
}
throw innerError
}
}来源:
src/query.ts,第 380–450 行。
当主模型(如 Claude 3.5 Sonnet)因高负载不可用时,系统会自动切换到 fallback 模型(如 Claude 3 Haiku),并重新发起请求。这一过程中,streamingToolExecutor 会被 discard 并重建,防止旧的 tool_use_id 泄漏到新的请求中。
7.4 工具执行与 StreamingToolExecutor
如果 assistant 消息包含 tool_use 块,queryLoop 会进入工具执行阶段:
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
if (update.message) {
yield update.message
toolResults.push(...)
}
if (update.newContext) {
updatedToolUseContext = { ...update.newContext, queryTracking }
}
}来源:
src/query.ts,第 620–640 行。
这里有两个执行路径:
- StreamingToolExecutor:在流式响应过程中就开始执行工具(实验性功能,由
streamingToolExecutiongate 控制)。 - runTools:传统的批处理模式,等 assistant 消息完全接收后再执行工具。
工具执行完成后,queryLoop 还会生成 toolUseSummary(由 Haiku 模型异步生成,不阻塞下一轮 API 调用),并处理附件消息(如文件变更通知、队列命令等)。
7.5 递归与终止条件
queryLoop 的尾部是一个"递归"——不是真正的函数递归,而是通过 continue 跳转到 while(true) 的下一次迭代:
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
toolUseContext: toolUseContextWithQueryTracking,
autoCompactTracking: tracking,
turnCount: nextTurnCount,
maxOutputTokensRecoveryCount: 0,
hasAttemptedReactiveCompact: false,
pendingToolUseSummary: nextPendingToolUseSummary,
maxOutputTokensOverride: undefined,
stopHookActive,
transition: { reason: 'next_turn' },
}
state = next来源:
src/query.ts,第 780–792 行。
终止条件包括:
- 用户中断(
abortController.signal.aborted) - 达到
maxTurns限制 - Stop Hook 阻止继续
- Token Budget 决定停止
max_output_tokens恢复次数超限prompt_too_long且无法恢复
八、src/query/ 目录生态
src/query/ 目录是 query.ts 的基础设施层,包含四个关键文件:
8.1 config.ts:运行时配置快照
export type QueryConfig = {
sessionId: SessionId
gates: {
streamingToolExecution: boolean
emitToolUseSummaries: boolean
isAnt: boolean
fastModeEnabled: boolean
}
}
export function buildQueryConfig(): QueryConfig {
return {
sessionId: getSessionId(),
gates: {
streamingToolExecution: checkStatsigFeatureGate_CACHED_MAY_BE_STALE('tengu_streaming_tool_execution2'),
emitToolUseSummaries: isEnvTruthy(process.env.CLAUDE_CODE_EMIT_TOOL_USE_SUMMARIES),
isAnt: process.env.USER_TYPE === 'ant',
fastModeEnabled: !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_FAST_MODE),
},
}
}来源:
src/query/config.ts,第 10–35 行。
设计要点:
- 有意排除
feature()gate:feature()是编译期 tree-shaking 边界,必须留在守卫块内部;buildQueryConfig只包含运行时 gate(环境变量、Statsig)。 - 单次快照:在
query()入口处一次性捕获,避免在长达 5–30 秒的流式响应期间 gate 值翻转。
8.2 deps.ts:依赖注入
export type QueryDeps = {
callModel: typeof queryModelWithStreaming
microcompact: typeof microcompactMessages
autocompact: typeof autoCompactIfNeeded
uuid: () => string
}
export function productionDeps(): QueryDeps {
return {
callModel: queryModelWithStreaming,
microcompact: microcompactMessages,
autocompact: autoCompactIfNeeded,
uuid: randomUUID,
}
}来源:
src/query/deps.ts,第 15–38 行。
这是一个刻意的窄范围依赖注入(仅 4 个依赖),让测试可以直接传入 mock,而不必对每个模块做 spyOn。注释中提到这 4 个依赖每个都在 6–8 个测试文件中被 spy,注入模式消除了这些样板代码。
8.3 stopHooks.ts:停止钩子 orchestration
handleStopHooks() 在 assistant 响应完成后执行,负责:
- 保存缓存安全参数(供
/btw命令和side_question读取)。 - 执行模板作业分类(
TEMPLATESgate)。 - 执行
executeStopHooks、executeTaskCompletedHooks、executeTeammateIdleHooks。 - 返回
blockingErrors(阻止继续的错误)和preventContinuation(是否阻止继续)。
export async function* handleStopHooks(
messagesForQuery: Message[],
assistantMessages: AssistantMessage[],
systemPrompt: SystemPrompt,
userContext: { [k: string]: string },
systemContext: { [k: string]: string },
toolUseContext: ToolUseContext,
querySource: QuerySource,
stopHookActive?: boolean,
): AsyncGenerator<..., StopHookResult>来源:
src/query/stopHooks.ts,第 42–58 行。
停止钩子是 Claude Code 实现"智能终止"的关键:当模型完成一个任务后,钩子系统可以检测到任务完成并阻止无意义的继续,或者在检测到队友空闲时发送通知。
8.4 tokenBudget.ts:Token 预算决策
export type BudgetTracker = {
continuationCount: number
lastDeltaTokens: number
lastGlobalTurnTokens: number
startedAt: number
}
export function checkTokenBudget(
tracker: BudgetTracker,
agentId: string | undefined,
budget: number | null,
globalTurnTokens: number,
): TokenBudgetDecision来源:
src/query/tokenBudget.ts,第 10–30 行。
checkTokenBudget 实现了两种决策:
- ContinueDecision:如果当前 Token 数低于预算的 90%(
COMPLETION_THRESHOLD),且没有出现"收益递减"(连续 3 次继续且每次增量低于 500 Token),则生成一个 nudge 消息让模型继续。 - StopDecision:如果达到阈值或出现收益递减,则停止并记录完成事件。
const isDiminishing =
tracker.continuationCount >= 3 &&
deltaSinceLastCheck < DIMINISHING_THRESHOLD &&
tracker.lastDeltaTokens < DIMINISHING_THRESHOLD来源:
src/query/tokenBudget.ts,第 70–73 行。
这个设计防止了模型在"几乎没有进展"的情况下无限循环,同时也允许真正有进展的对话继续到预算上限。
九、错误恢复机制全景
Claude Code 面对 LLM 生产环境的各种不确定性,构建了一套多层错误恢复体系:
| 错误类型 | 恢复策略 | 实现位置 |
|---|---|---|
prompt_too_long | 1. Context Collapse drain 2. Reactive Compact 3. 直接报错 | query.ts 520–560 行 |
max_output_tokens | 1. 升级到 64k 上限 2. 注入恢复消息继续 3. 3 次后停止 | query.ts 570–610 行 |
| 模型 Fallback | 切换到 fallback 模型,重置状态重试 | query.ts 420–460 行 |
| API 异常 | yield api_retry 系统消息,由上层重试 | QueryEngine.ts 480–490 行 |
| 流式中断 | 生成 synthetic tool_result,yield 中断消息 | query.ts 470–490 行 |
| 图像过大 | Reactive Compact 的媒体恢复 | query.ts 530 行 |
这套恢复机制的核心哲学是:尽可能在内部消化异常,避免将中间错误暴露给最终用户。例如 prompt_too_long 的错误消息会被"扣留"(withhold),直到所有恢复尝试都失败后才 surfaced;max_output_tokens 的恢复消息直接告诉模型"不要道歉,直接继续",以最小化 Token 浪费。
十、总结:QueryEngine 的架构设计思想
回顾 QueryEngine 和 query.ts 的设计,我们可以提炼出几个关键的架构思想:
状态与逻辑的分离:QueryEngine 持有跨轮次状态,query.ts 处理单次轮次逻辑。这种分离使得 query.ts 可以被独立测试(通过
QueryDeps注入),而 QueryEngine 可以被 REPL 和 SDK 复用。生成器驱动的流式架构:从
submitMessage()到query()再到deps.callModel(),整个链路都是AsyncGenerator。这种设计天然支持流式输出、中途取消和背压控制。渐进式降级:面对上下文超限,系统不会直接报错,而是依次尝试 snip → microcompact → context collapse → autocompact → reactive compact,只有在所有手段都失败后才向用户暴露错误。
预算驱动的自控:通过
maxTurns、maxBudgetUsd、taskBudget和tokenBudget四层预算,系统可以在失控前主动停止,避免"AI 烧钱"的灾难场景。Tree-shaking 友好的特性开关:
feature()gate 被刻意留在内联条件中,HISTORY_SNIP和CONTEXT_COLLAPSE等特性的字符串不会出现在未启用的构建产物中。QueryEngine 通过注入snipReplay回调来避免直接引用特性模块。
QueryEngine 不是 Claude Code 中最复杂的模块(query.ts 比它更大),但它是最核心的粘合层——它将用户输入、系统状态、模型 API、工具链和错误恢复编织成一个连贯的对话体验。理解 QueryEngine,就理解了 Claude Code 的心脏如何跳动。