上一篇我用 TypeScript 写了一个最小 Agent Loop。
那一版能跑通工具调用,但体验还很原始:每次请求模型,都要等它完整生成以后才有输出。
天气工具这种小例子还好。如果换成 coding agent,让它读文件、执行命令、整理结果,中间可能要等好几秒。用户看着终端不动,很难判断程序是在正常工作,还是已经卡住了。
所以第二步我先给它加流式输出。
目标很简单:模型生成文字时,终端马上开始打印;如果模型要调用工具,程序也能从流式返回里正确拼出工具名和参数。
这篇只讲 lv4-stream.ts 这一层。
用户输入
-> messages 历史
-> LLM 流式返回 chunk
-> 普通文字直接打印
-> 工具调用参数先拼接
-> 本地执行工具
-> 工具结果写回 messages
-> 再请求 LLM非流式的问题#
第一版 Agent Loop 大概是这样:
const res = await client.chat.completions.create({
model,
messages,
tools,
tool_choice: 'auto',
})
const message = res.choices[0]?.message这段代码的特点是简单。请求发出去以后,等服务端生成完整响应,然后一次性拿到 message。
缺点也很明显:等待期间没有任何反馈。
如果模型最后决定调用工具,用户在等待时也看不到“它正在准备调用工具”。如果模型最后直接回答,用户也只能等完整回答生成完。
流式输出解决的是等待体验。它不一定让模型更快完成,但会让用户更早看到进展。
开启 stream#
OpenAI-compatible 的 Chat Completions 里,开启流式只要加一个字段:
const stream = await client.chat.completions.create({
model,
messages,
tools,
tool_choice: 'auto',
stream: true,
})加上 stream: true 以后,返回值就不再是一个完整 response,而是一个可以 for await...of 遍历的流。
在 HTTP 层,它是 SSE。服务端会不断发这种片段:
data: {"delta":{"content":"你"}}
data: {"delta":{"content":"好"}}
data: {"delta":{"content":","}}
data: [DONE]OpenAI SDK 已经把 SSE 解析掉了。在代码里,我只需要遍历每个 chunk:
let streamedText = ''
for await (const chunk of stream) {
const choice = chunk.choices[0]
if (!choice) continue
const delta = choice.delta
if (delta.content) {
process.stdout.write(delta.content)
streamedText += delta.content
}
}这里同时做了两件事。
process.stdout.write(delta.content) 负责实时打印。它不会自动换行,所以适合做终端里的打字机效果。
streamedText += delta.content 负责保存完整回答。流结束以后,我还要把这条 assistant 消息放回 messages。
如果只打印不保存,下一轮模型就看不到自己刚才说过什么。
普通文字很简单#
普通文本的流式处理几乎没有难度。
来一个 delta.content,打印一个。最后拿到完整的 streamedText,构造一条 assistant message:
const assistantMessage = {
role: 'assistant',
content: streamedText,
}问题出在工具调用。
非流式时,工具调用是一整个对象。模型如果要执行 exec_shell,我拿到的通常是这种结构:
{
"tool_calls": [
{
"id": "call_abc",
"type": "function",
"function": {
"name": "exec_shell",
"arguments": "{\"cmd\":\"date\"}"
}
}
]
}arguments 虽然是字符串,但至少是完整字符串。直接 JSON.parse 就能得到参数对象。
流式返回里,情况会麻烦一点。
工具名、工具 id、参数字符串都可能分散在不同 chunk 里。尤其是 arguments,它经常是一段一段到达。
chunk1: { index: 0, id: "call_abc", function: { name: "exec_shell" } }
chunk2: { index: 0, function: { arguments: "{\"c" } }
chunk3: { index: 0, function: { arguments: "md" } }
chunk4: { index: 0, function: { arguments: "\":\"date\"}" } }这几段拼起来才是:
{"cmd":"date"}如果在 chunk2 的时候就解析,JSON.parse('{"c') 一定会报错。
所以流式工具调用要先把参数拼完整,再解析。
用 Map 拼工具调用#
我在 lv4-stream.ts 里加了一个累积结构:
type ToolCallAccumulator = {
id: string
name: string
arguments: string
}
const pendingToolCalls = new Map<number, ToolCallAccumulator>()这里用 Map,key 是 chunk 里的 index。
为什么不用数组?因为一轮里可能有多个工具调用,不同工具的 chunk 可能交错到达。用 index 做 key,更像是在给每个工具调用建一条单独的缓冲区。
处理逻辑是这样:
if (delta.tool_calls) {
for (const tcDelta of delta.tool_calls) {
const idx = tcDelta.index
if (!pendingToolCalls.has(idx)) {
pendingToolCalls.set(idx, { id: '', name: '', arguments: '' })
}
const pendingCall = pendingToolCalls.get(idx)!
if (tcDelta.id) {
pendingCall.id = tcDelta.id
}
if (tcDelta.function?.name) {
pendingCall.name = tcDelta.function.name
}
if (tcDelta.function?.arguments) {
pendingCall.arguments += tcDelta.function.arguments
}
}
}最容易写错的是最后一行。
这里必须是 +=,不能是 =。
因为每个 chunk 只带一小段参数。用 = 会把前面已经收到的片段覆盖掉。等流结束时,手里只剩最后一段,当然解析不出完整 JSON。
流结束以后再决定结果类型#
流式响应里,每个 chunk 只是局部信息。
我不能看到第一个文字 chunk 就说“这轮是文本回答”,也不能看到第一个工具参数 chunk 就立刻执行工具。要等这一轮流结束,再根据最终的 finish_reason 判断。
代码里用一个变量记录结束信号:
let finalSignal = ''
for await (const chunk of stream) {
const choice = chunk.choices[0]
if (!choice) continue
// 处理 delta.content 和 delta.tool_calls
if (choice.finish_reason) {
finalSignal = choice.finish_reason
}
}流结束后,把 pendingToolCalls 整理成数组:
const toolCalls = [...pendingToolCalls.entries()]
.sort(([a], [b]) => a - b)
.map(([, acc]) => acc)然后返回两种结果之一:
if (finalSignal === 'tool_calls') {
return { type: 'tool_calls', calls: toolCalls, assistantMessage }
}
return { type: 'text', content: streamedText, assistantMessage }streamOneTurn 对外不暴露一堆零散 chunk,只返回结构化结果。
上层 Agent Loop 不需要关心底层是不是 SSE,也不需要知道参数是怎么拼出来的。它只看结果类型:文本,或者工具调用。
还要补一条 assistant message#
非流式接口会直接给我一条完整的 message。
流式接口不会。它只给我很多 chunk。
但 Agent Loop 仍然需要一条完整的 assistant message 放回 messages,否则下一轮上下文会断。
所以我在流结束后手动构造:
const assistantMessage =
finalSignal === 'tool_calls'
? {
role: 'assistant',
content: streamedText || null,
tool_calls: toolCalls.map(tc => ({
id: tc.id,
type: 'function' as const,
function: {
name: tc.name,
arguments: tc.arguments,
},
})),
}
: {
role: 'assistant',
content: streamedText,
}这条消息给下一轮模型看。
尤其是工具调用场景,顺序必须是:
assistant: 我要调用 exec_shell({"cmd":"date"})
tool: 命令执行结果
assistant: 根据命令结果回答用户如果少了第一条 assistant 消息,工具结果就像凭空出现的一段文本。模型可能还能猜到一点,但上下文已经不完整了。
Agent Loop 基本不用改#
把流式读取封装进 streamOneTurn 以后,外层循环还是原来的形状:
async function runStreamingAgent(messages) {
for (let round = 1; round <= 10; round++) {
const result = await streamOneTurn(messages)
messages.push(result.assistantMessage)
if (result.type === 'text') {
console.log('\n')
return
}
for (const tc of result.calls) {
const parsed = parseToolArguments(tc.name, tc.arguments)
const handler = toolHandlers[tc.name]
const toolResult = parsed.ok
? handler ? handler(parsed.args) : `[error] 未知工具: ${tc.name}`
: parsed.error
messages.push({
role: 'tool',
tool_call_id: tc.id,
content: toolResult,
})
}
}
}这段和最小 Agent Loop 的思路一样:
问模型
-> 模型直接回答:结束
-> 模型请求工具:执行工具
-> 工具结果写回 messages
-> 继续问模型区别只是“问模型”这一步换成了流式实现。
这也是我现在喜欢的写法:把复杂的协议细节包在 streamOneTurn 里,让 Agent Loop 继续像状态机一样读。
参数解析也要防御#
工具参数拼完以后,才可以解析:
function parseToolArguments(toolName: string, rawArguments: string) {
try {
return { ok: true, args: JSON.parse(rawArguments) }
} catch {
return {
ok: false,
error: `[error] 工具 ${toolName} 的参数不是合法 JSON: ${rawArguments}`,
}
}
}这里没有让程序直接崩掉。
如果参数不是合法 JSON,我把错误当成工具结果放回 messages。这样模型下一轮能看到失败原因,有机会自己修正。
这是最小 demo 里也值得保留的习惯。模型输出永远可能不符合预期,尤其是在工具多、参数复杂的时候。
这版踩到的坑#
第一个坑是用 console.log 打印流式文字。
console.log 会自动换行。每个 token 来一次 console.log,终端里就会变成一列字。流式输出应该用 process.stdout.write。
第二个坑是工具参数用 = 拼接。
arguments 是碎片。每来一段都要追加到原来的字符串后面。
第三个坑还是 messages.push(result.assistantMessage)。
从非流式改到流式以后,很容易以为“文字已经打印了”,就忘了保存 assistant 消息。打印给用户看是一回事,保存给模型看是另一回事。
第四个坑是太早执行工具。
工具调用要等流结束再执行。否则参数可能还没到齐,本地函数拿到的是半截 JSON。
怎么运行#
在 kk-agent-lab 里跑:
npm run lv4启动以后可以问:
当前时间是多少?如果模型决定调用 exec_shell,你会看到它先流式处理工具调用,程序再执行本地命令,最后模型根据命令结果回答。
也可以问文件相关问题,比如:
读取 ./package.json,告诉我项目名。这时流程会更像一个 coding agent:
模型请求 read_file
-> 本地读取 package.json
-> 工具结果写回 messages
-> 模型生成最终回答下一步#
加上流式以后,这个 Agent 已经更像一个能用的终端助手。
但 messages 还在一直变长。每次用户输入、模型回答、工具调用、工具结果,都会追加进去。短任务没问题,长任务会遇到上下文越来越大的问题。
所以下一层我准备写 context compact。
也就是:保留最近几轮,把更早的历史压缩成任务摘要,再继续让 Agent 工作。