这是「GSD 全景代码解析」专题的第 43 篇。
专题引言:在 GSD(Get Shit Done)全景代码解析系列中,我们已深入剖析了 Task 调度、状态机、Context 传递等核心机制。本篇将视角转向「查询层与事件流」—— 这是 SDK 与外部世界交互的桥梁。无论是 IDE 的实时反馈,还是 CLI 的终端交互,都依赖一套统一的 Transport 抽象与事件流设计。理解这一层,是掌握 GSD SDK 运行时全貌的关键。
在 AI Agent 工具链中,查询层(Query Layer) 负责处理外部请求,而 事件流(Event Stream) 则构成了 Agent 与外部环境之间实时通信的血脉。GSD SDK 将这两部分设计得高度解耦:查询层专注于请求路由与响应构造,事件流则统一了各类通信通道的消息传递语义。
本文将深入解析 sdk/src/query/ 目录的代码结构、event-stream.ts 的核心实现,以及 WebSocket Transport 与 CLI Transport 的设计哲学。
一、query/ 目录结构
query/ 目录是 GSD SDK 中处理外部查询请求的集中入口。其结构如下:
sdk/src/query/
├── index.ts # 查询层统一入口,导出核心 API
├── types.ts # 查询相关的类型定义(QueryRequest、QueryResponse 等)
├── parser.ts # 查询请求解析器,处理输入参数校验与规范化
├── router.ts # 查询路由,将请求分发到对应的处理器
├── handlers/
│ ├── task.ts # Task 相关查询处理器
│ ├── context.ts # Context 查询处理器
│ ├── state.ts # 状态查询处理器
│ └── index.ts # 处理器聚合导出
├── middleware/
│ ├── auth.ts # 认证中间件
│ ├── logging.ts # 日志中间件
│ └── error-handler.ts # 错误处理中间件
└── utils.ts # 查询层工具函数1.1 查询层的职责边界
query 层的设计遵循 "薄层控制器" 原则:它本身不包含业务逻辑,而是作为外部请求与 SDK 内部服务之间的翻译层。
// sdk/src/query/index.ts
export class QueryLayer {
private router: QueryRouter;
private middlewares: QueryMiddleware[] = [];
constructor(options: QueryLayerOptions) {
this.router = new QueryRouter(options.handlers);
this.setupDefaultMiddlewares();
}
async execute(request: QueryRequest): Promise<QueryResponse> {
const context = await this.runMiddlewares(request);
return this.router.dispatch(context);
}
}查询请求的生命周期分为四个阶段:
- 接收(Receive):从 Transport 层接收原始请求
- 解析(Parse):
parser.ts将原始输入转换为结构化的QueryRequest - 路由(Route):
router.ts根据请求类型匹配 Handler - 响应(Respond):将 Handler 结果序列化为
QueryResponse返回
flowchart LR
A[Transport 层] -->|原始请求| B[QueryLayer]
B -->|解析| C[parser.ts]
C -->|QueryRequest| D[router.ts]
D -->|分发| E[handlers/*.ts]
E -->|结果| F[QueryResponse]
F -->|序列化| A1.2 类型系统的严谨设计
types.ts 中定义了完整的查询类型体系:
// sdk/src/query/types.ts
export interface QueryRequest<T = unknown> {
id: string; // 请求唯一标识,用于链路追踪
type: QueryType; // 查询类型枚举
payload: T; // 请求载荷
metadata: {
timestamp: number;
source: TransportSource; // 请求来源:ws | cli | http
auth?: AuthToken;
};
}
export interface QueryResponse<T = unknown> {
id: string; // 对应请求 ID
status: 'success' | 'error' | 'pending';
data?: T;
error?: QueryError;
metadata: {
timestamp: number;
duration: number; // 处理耗时(ms)
};
}
export enum QueryType {
TASK_LIST = 'task:list',
TASK_GET = 'task:get',
TASK_CREATE = 'task:create',
CONTEXT_GET = 'context:get',
STATE_SNAPSHOT = 'state:snapshot',
// ... 其他查询类型
}这种设计确保了所有查询都具有统一的结构,无论来自 WebSocket、CLI 还是未来可能扩展的 HTTP 通道。
二、event-stream.ts:14KB 的事件流心脏
event-stream.ts 是整个 SDK 中最核心的通信基础设施之一,约 14KB 的代码量承载了事件订阅、发布、过滤、转换的全部能力。
2.1 事件流架构总览
事件流采用 发布-订阅(Pub-Sub) 模型,支持多播与背压控制:
flowchart TD
subgraph EventStream
A[事件发布者] -->|Event| B[EventBus]
B -->|路由| C[过滤器链]
C -->|筛选后| D[订阅者 1]
C -->|筛选后| E[订阅者 2]
C -->|筛选后| F[订阅者 N]
end
G[Transport WS] -->|反序列化| A
H[Transport CLI] -->|反序列化| A
I[内部状态变更] -->|触发| A
D -->|序列化| J[客户端 WS]
E -->|序列化| K[CLI 输出]2.2 核心类:EventStream
// sdk/src/query/event-stream.ts
export class EventStream<T = unknown> {
private subscribers = new Map<string, Set<EventSubscriber<T>>>();
private middlewareChain: EventMiddleware<T>[] = [];
private buffer: RingBuffer<Event<T>>; // 环形缓冲区用于背压控制
private transformer?: EventTransformer<T>;
constructor(options: EventStreamOptions<T> = {}) {
this.buffer = new RingBuffer(options.bufferSize || 1000);
this.transformer = options.transformer;
}
// 订阅特定事件类型
subscribe(
eventType: string | string[],
handler: EventHandler<T>,
options: SubscribeOptions = {}
): Subscription {
const types = Array.isArray(eventType) ? eventType : [eventType];
const subscriber: EventSubscriber<T> = {
id: generateId(),
handler,
filter: options.filter,
priority: options.priority || 0,
once: options.once || false,
};
for (const type of types) {
if (!this.subscribers.has(type)) {
this.subscribers.set(type, new Set());
}
this.subscribers.get(type)!.add(subscriber);
}
return new Subscription(() => this.unsubscribe(subscriber, types));
}
// 发布事件
async emit(event: Event<T>): Promise<void> {
// 1. 经过中间件链处理
const processedEvent = await this.runMiddlewares(event);
if (!processedEvent) return; // 中间件可拦截事件
// 2. 写入缓冲区(背压控制)
if (!this.buffer.push(processedEvent)) {
this.handleBackpressure(processedEvent);
return;
}
// 3. 路由到订阅者
await this.dispatch(processedEvent);
}
}2.3 事件类型与格式
GSD 定义了一套完整的事件类型体系,分为 系统事件、任务事件 和 传输事件 三大类:
// sdk/src/query/event-stream.ts
// 基础事件接口
export interface Event<T = unknown> {
type: EventType;
payload: T;
timestamp: number;
sequence: number; // 全局序列号,保证有序性
source: EventSource; // 事件来源
traceId: string; // 链路追踪 ID
}
// 事件类型枚举
export enum EventType {
// 系统事件
SYSTEM_READY = 'system:ready',
SYSTEM_ERROR = 'system:error',
SYSTEM_HEARTBEAT = 'system:heartbeat',
// 任务生命周期事件
TASK_CREATED = 'task:created',
TASK_STARTED = 'task:started',
TASK_PROGRESS = 'task:progress',
TASK_COMPLETED = 'task:completed',
TASK_FAILED = 'task:failed',
TASK_CANCELLED = 'task:cancelled',
// 状态变更事件
STATE_CHANGED = 'state:changed',
CONTEXT_UPDATED = 'context:updated',
// 传输层事件
TRANSPORT_CONNECTED = 'transport:connected',
TRANSPORT_DISCONNECTED = 'transport:disconnected',
TRANSPORT_MESSAGE = 'transport:message',
}
// 事件来源
export interface EventSource {
type: 'internal' | 'websocket' | 'cli' | 'hook';
id: string; // 来源实例标识
}2.4 事件订阅与发布机制
订阅优先级与过滤
事件流支持基于优先级的有序分发和细粒度过滤:
// 带过滤条件的订阅示例
const subscription = eventStream.subscribe(
['task:progress', 'task:completed'],
(event) => {
console.log(`Task ${event.payload.taskId}: ${event.payload.progress}%`);
},
{
filter: (event) => event.payload.taskId.startsWith('agent-'),
priority: 10, // 数值越大优先级越高
}
);
// 取消订阅
subscription.unsubscribe();背压控制策略
当事件产生速率超过消费速率时,event-stream.ts 提供了三种背压策略:
// sdk/src/query/event-stream.ts
enum BackpressureStrategy {
DROP_OLDEST = 'drop-oldest', // 丢弃最旧的事件(默认)
DROP_LATEST = 'drop-latest', // 丢弃最新的事件
BLOCK = 'block', // 阻塞生产者(慎用)
THROTTLE = 'throttle', // 节流采样
}
class RingBuffer<T> {
private buffer: T[];
private head = 0;
private tail = 0;
private count = 0;
constructor(private capacity: number) {
this.buffer = new Array(capacity);
}
push(item: T): boolean {
if (this.count === this.capacity) {
return false; // 缓冲区已满
}
this.buffer[this.tail] = item;
this.tail = (this.tail + 1) % this.capacity;
this.count++;
return true;
}
shift(): T | undefined {
if (this.count === 0) return undefined;
const item = this.buffer[this.head];
this.head = (this.head + 1) % this.capacity;
this.count--;
return item;
}
}2.5 事件转换与管道
event-stream.ts 支持声明式的事件转换管道,便于 Transport 层进行协议适配:
// 定义事件转换器,将内部事件转换为外部协议格式
const wsTransformer: EventTransformer<WSMessage> = {
incoming: (raw: WSMessage): Event => ({
type: raw.type as EventType,
payload: raw.data,
timestamp: Date.now(),
sequence: raw.seq,
source: { type: 'websocket', id: raw.clientId },
traceId: raw.traceId || generateId(),
}),
outgoing: (event: Event): WSMessage => ({
type: event.type,
data: event.payload,
seq: event.sequence,
clientId: event.source.id,
traceId: event.traceId,
ts: event.timestamp,
}),
};
// 创建带转换器的事件流实例
const wsEventStream = new EventStream({
transformer: wsTransformer,
bufferSize: 500,
backpressure: BackpressureStrategy.DROP_OLDEST,
});三、WebSocket Transport:实时通信机制
WebSocket Transport 是 GSD SDK 与 IDE 插件、Web 界面通信的主要通道,实现了全双工的实时事件流。
3.1 架构与连接模型
flowchart TD
subgraph Client
A[IDE 插件]
B[Web 控制台]
end
subgraph Server["GSD SDK Server"]
C[WebSocket Server]
D[ConnectionManager]
E[WebSocketTransport]
F[EventStream]
end
A <-->|ws://| C
B <-->|wss://| C
C -->|管理| D
D -->|封装| E
E <-->|事件| F3.2 连接管理
WebSocketTransport 实现了完善的连接生命周期管理:
// sdk/src/query/transport/websocket.ts
export class WebSocketTransport implements Transport {
private wss: WebSocketServer;
private connections = new Map<string, WebSocketConnection>();
private heartbeatInterval: NodeJS.Timer;
private eventStream: EventStream;
constructor(options: WebSocketTransportOptions) {
this.eventStream = options.eventStream;
this.setupServer(options.port);
this.startHeartbeat();
}
private setupServer(port: number): void {
this.wss = new WebSocketServer({ port });
this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => {
const connId = generateId();
const connection: WebSocketConnection = {
id: connId,
ws,
state: 'connecting',
metadata: {
remoteAddress: req.socket.remoteAddress,
userAgent: req.headers['user-agent'],
connectedAt: Date.now(),
},
subscriptions: new Set(),
};
this.connections.set(connId, connection);
this.handleConnection(connection);
});
}
private handleConnection(conn: WebSocketConnection): void {
conn.state = 'connected';
// 通知事件流有新连接
this.eventStream.emit({
type: EventType.TRANSPORT_CONNECTED,
payload: { connectionId: conn.id, metadata: conn.metadata },
timestamp: Date.now(),
sequence: this.getNextSequence(),
source: { type: 'websocket', id: conn.id },
traceId: generateId(),
});
// 绑定消息处理器
conn.ws.on('message', (data: RawData) => {
this.handleMessage(conn, data);
});
// 连接关闭处理
conn.ws.on('close', (code: number, reason: Buffer) => {
this.handleDisconnection(conn, code, reason.toString());
});
// 错误处理
conn.ws.on('error', (error: Error) => {
this.handleError(conn, error);
});
}
}3.3 心跳与连接保活
为防止长时间无通信导致的连接中断,WebSocket Transport 实现了双向心跳机制:
// sdk/src/query/transport/websocket.ts
private startHeartbeat(): void {
const HEARTBEAT_INTERVAL = 30000; // 30 秒
const HEARTBEAT_TIMEOUT = 60000; // 60 秒超时
this.heartbeatInterval = setInterval(() => {
for (const [connId, conn] of this.connections) {
if (conn.state !== 'connected') continue;
// 检查上次 pong 时间
if (Date.now() - conn.lastPong > HEARTBEAT_TIMEOUT) {
this.closeConnection(connId, 'heartbeat timeout');
continue;
}
// 发送 ping
conn.ws.ping();
conn.lastPing = Date.now();
}
}, HEARTBEAT_INTERVAL);
}3.4 消息序列化与协议
WebSocket Transport 采用 JSON 作为默认序列化格式,同时支持可插拔的序列化器:
// sdk/src/query/transport/websocket.ts
export interface WSMessage {
type: string;
data: unknown;
seq: number;
traceId: string;
ts: number;
// 可选的元数据字段
ack?: number; // 确认序列号(可靠传输)
channel?: string; // 多路复用通道标识
}
export class WebSocketTransport implements Transport {
private serializer: MessageSerializer;
constructor(options: WebSocketTransportOptions) {
this.serializer = options.serializer || new JSONSerializer();
}
private handleMessage(conn: WebSocketConnection, data: RawData): void {
try {
const message = this.serializer.deserialize(data.toString());
// 转换为核心事件并注入事件流
const event = this.transformer.incoming({
...message,
clientId: conn.id,
});
this.eventStream.emit(event);
} catch (error) {
this.sendError(conn, 'PARSE_ERROR', 'Failed to parse message');
}
}
// 向指定连接发送消息
async send(connId: string, event: Event): Promise<void> {
const conn = this.connections.get(connId);
if (!conn || conn.state !== 'connected') {
throw new TransportError(`Connection ${connId} not available`);
}
const message = this.transformer.outgoing(event);
const payload = this.serializer.serialize(message);
return new Promise((resolve, reject) => {
conn.ws.send(payload, (err) => {
if (err) reject(err);
else resolve();
});
});
}
// 广播到所有连接或指定过滤条件的连接
async broadcast(event: Event, filter?: ConnectionFilter): Promise<void> {
const targets = filter
? Array.from(this.connections.values()).filter(filter)
: Array.from(this.connections.values());
await Promise.all(
targets.map((conn) => this.send(conn.id, event).catch(console.error))
);
}
}四、CLI Transport:命令行交互通道
CLI Transport 为 GSD SDK 提供了与终端用户直接交互的能力,是本地开发和调试的核心通道。
4.1 交互模型
flowchart LR
A[用户输入] -->|stdin| B[CLI Transport]
B -->|解析| C[Command Parser]
C -->|QueryRequest| D[QueryLayer]
D -->|QueryResponse| E[Output Formatter]
E -->|stdout| F[终端显示]
B <-->|事件流| G[EventStream]4.2 输入输出处理
CLI Transport 采用 行模式(Line Mode) 读取用户输入,支持命令历史和 Tab 补全:
// sdk/src/query/transport/cli.ts
export class CLITransport implements Transport {
private rl: readline.Interface;
private eventStream: EventStream;
private outputFormat: OutputFormat = 'pretty';
private commandHistory: string[] = [];
constructor(options: CLITransportOptions) {
this.eventStream = options.eventStream;
this.rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
prompt: 'gsd> ',
completer: this.completer.bind(this),
});
this.setupHandlers();
}
private setupHandlers(): void {
// 命令行输入处理
this.rl.on('line', async (input: string) => {
const trimmed = input.trim();
if (!trimmed) {
this.rl.prompt();
return;
}
this.commandHistory.push(trimmed);
await this.executeCommand(trimmed);
this.rl.prompt();
});
// SIGINT 优雅退出
this.rl.on('SIGINT', () => {
this.shutdown();
});
// 事件流订阅:将内部事件渲染到终端
this.eventStream.subscribe(
[EventType.TASK_PROGRESS, EventType.TASK_COMPLETED, EventType.SYSTEM_ERROR],
(event) => {
this.renderEvent(event);
}
);
}
private async executeCommand(input: string): Promise<void> {
try {
const parsed = this.parseCommand(input);
const request: QueryRequest = {
id: generateId(),
type: parsed.type,
payload: parsed.args,
metadata: {
timestamp: Date.now(),
source: { type: 'cli', id: 'cli-session' },
},
};
// 通过 QueryLayer 执行
const response = await this.queryLayer.execute(request);
this.renderResponse(response);
} catch (error) {
this.renderError(error);
}
}
}4.3 与 IDE 的集成
GSD 的 CLI Transport 不仅服务于独立终端,还通过 JSON-RPC 模式 与 IDE 集成:
// sdk/src/query/transport/cli.ts
interface IDEIntegrationOptions {
mode: 'interactive' | 'json-rpc';
rpcVersion?: string;
}
export class CLITransport implements Transport {
private ideMode: boolean = false;
// 当检测到 IDE 环境变量时,自动切换为 JSON-RPC 模式
private detectIDEMode(): void {
if (process.env.GSD_IDE_MODE === '1' || process.env.VSCODE_PID) {
this.ideMode = true;
this.outputFormat = 'json';
this.setupJSONRPCHandlers();
}
}
private setupJSONRPCHandlers(): void {
// JSON-RPC 2.0 协议适配
this.rl.removeAllListeners('line');
this.rl.on('line', async (input: string) => {
try {
const rpcRequest: JSONRPCRequest = JSON.parse(input);
const result = await this.handleJSONRPC(rpcRequest);
this.writeJSONRPCResponse(rpcRequest.id, result);
} catch (error) {
this.writeJSONRPCError(null, -32700, 'Parse error');
}
});
}
// 将事件流输出为 JSON-RPC Notification
private renderEventAsJSONRPC(event: Event): void {
const notification: JSONRPCNotification = {
jsonrpc: '2.0',
method: 'event',
params: {
type: event.type,
data: event.payload,
timestamp: event.timestamp,
},
};
console.log(JSON.stringify(notification));
}
}4.4 终端渲染引擎
对于交互式模式,CLI Transport 内置了轻量级的终端渲染引擎:
// sdk/src/query/transport/cli.ts
class TerminalRenderer {
private spinnerFrames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
private spinnerIndex = 0;
private activeSpinners = new Map<string, NodeJS.Timer>();
renderTaskProgress(taskId: string, progress: number, message?: string): void {
const bar = this.renderProgressBar(progress, 30);
const line = `${this.spinnerFrames[this.spinnerIndex]} [${bar}] ${progress}% ${message || ''}`;
this.updateLine(taskId, line);
}
private renderProgressBar(progress: number, width: number): string {
const filled = Math.round((progress / 100) * width);
const empty = width - filled;
return '█'.repeat(filled) + '░'.repeat(empty);
}
// ANSI 转义序列控制光标位置,实现动态更新
private updateLine(id: string, content: string): void {
const row = this.getRowForId(id);
process.stdout.write(`\x1B[${row}H\x1B[K${content}`);
}
}五、Transport 抽象层设计
GSD SDK 将 WebSocket、CLI 等不同通信通道统一抽象为 Transport 接口,这是整个查询层与事件流解耦的关键。
5.1 Transport 接口定义
// sdk/src/query/transport/types.ts
export interface Transport {
readonly id: string;
readonly type: TransportType;
readonly state: TransportState;
// 启动与关闭
start(): Promise<void>;
shutdown(): Promise<void>;
// 消息发送
send(event: Event): Promise<void>;
// 事件流绑定
bind(eventStream: EventStream): void;
// 状态查询
isReady(): boolean;
}
export enum TransportType {
WEBSOCKET = 'websocket',
CLI = 'cli',
HTTP = 'http',
IPC = 'ipc',
}
export enum TransportState {
IDLE = 'idle',
STARTING = 'starting',
READY = 'ready',
CLOSING = 'closing',
CLOSED = 'closed',
ERROR = 'error',
}5.2 TransportManager:统一管理多通道
flowchart TD
A[TransportManager] -->|管理| B[WebSocketTransport]
A -->|管理| C[CLITransport]
A -->|管理| D[HTTPTransport]
A -->|管理| E[FutureTransport]
B <-->|事件| F[共享 EventStream]
C <-->|事件| F
D <-->|事件| F
E <-->|事件| F// sdk/src/query/transport/manager.ts
export class TransportManager {
private transports = new Map<string, Transport>();
private eventStream: EventStream;
constructor(eventStream: EventStream) {
this.eventStream = eventStream;
}
register(transport: Transport): void {
if (this.transports.has(transport.id)) {
throw new Error(`Transport ${transport.id} already registered`);
}
transport.bind(this.eventStream);
this.transports.set(transport.id, transport);
// 监听 Transport 状态变化
this.eventStream.subscribe(
[EventType.TRANSPORT_CONNECTED, EventType.TRANSPORT_DISCONNECTED],
(event) => {
this.handleTransportStateChange(event);
}
);
}
async startAll(): Promise<void> {
await Promise.all(
Array.from(this.transports.values()).map((t) => t.start())
);
}
async broadcast(event: Event): Promise<void> {
const readyTransports = Array.from(this.transports.values()).filter(
(t) => t.isReady()
);
await Promise.all(
readyTransports.map((t) =>
t.send(event).catch((err) => {
console.error(`Failed to send to transport ${t.id}:`, err);
})
)
);
}
}5.3 设计哲学总结
Transport 抽象层的设计体现了 GSD SDK 的几个核心原则:
| 设计原则 | 体现 |
|---|---|
| 统一接口 | 所有 Transport 实现相同的 Transport 接口,便于扩展 |
| 事件驱动 | 通过共享 EventStream 解耦 Transport 与业务逻辑 |
| 背压感知 | 内置 RingBuffer 与背压策略,避免内存溢出 |
| 协议无关 | 序列化器与转换器可插拔,支持 JSON、Protobuf 等 |
| 链路追踪 | 每个事件携带 traceId,跨 Transport 追踪请求全链路 |
六、总结
本文深入解析了 GSD SDK 的查询层与事件流架构:
query/目录 作为薄层控制器,通过parser→router→handler的分层设计,实现了外部请求的统一入口与业务解耦。event-stream.ts以约 14KB 的代码量提供了生产级的 Pub-Sub 能力,包括优先级订阅、过滤链、背压控制、事件转换等特性,是整个 SDK 的通信心脏。WebSocket Transport 实现了完整的连接生命周期管理、双向心跳保活、以及可扩展的消息序列化机制,是 IDE 集成与 Web 控制台的主要通道。
CLI Transport 同时支持交互式终端与 JSON-RPC 模式,既能提供友好的命令行体验,也能无缝集成到 VS Code 等 IDE 中。
Transport 抽象层 通过统一的接口与
TransportManager,让多通道并存、事件广播、链路追踪变得简洁而强大。
理解这一层的设计,有助于我们在后续章节中更好地掌握 GSD 的 Hook 系统与运行时扩展机制。
下一篇预告:
下一篇预告:第 44 篇《钩子系统总览》
我们将深入解析 GSD SDK 的 Hook 系统,包括生命周期钩子的注册与执行机制、beforeTask、afterTask、onError 等核心钩子点的实现原理,以及如何通过钩子实现插件化的运行时扩展。钩子系统是 GSD「可扩展架构」的基石,也是自定义 Agent 行为的主要入口,敬请期待。