上一篇我用 TypeScript 写了一个最小 Agent Loop。

那一版能跑通工具调用,但体验还很原始:每次请求模型,都要等它完整生成以后才有输出。

天气工具这种小例子还好。如果换成 coding agent,让它读文件、执行命令、整理结果,中间可能要等好几秒。用户看着终端不动,很难判断程序是在正常工作,还是已经卡住了。

所以第二步我先给它加流式输出。

目标很简单:模型生成文字时,终端马上开始打印;如果模型要调用工具,程序也能从流式返回里正确拼出工具名和参数。

这篇只讲 lv4-stream.ts 这一层。

txt
用户输入
-> messages 历史
-> LLM 流式返回 chunk
-> 普通文字直接打印
-> 工具调用参数先拼接
-> 本地执行工具
-> 工具结果写回 messages
-> 再请求 LLM

非流式的问题#

第一版 Agent Loop 大概是这样:

ts
const res = await client.chat.completions.create({
  model,
  messages,
  tools,
  tool_choice: 'auto',
})
 
const message = res.choices[0]?.message

这段代码的特点是简单。请求发出去以后,等服务端生成完整响应,然后一次性拿到 message

缺点也很明显:等待期间没有任何反馈。

如果模型最后决定调用工具,用户在等待时也看不到“它正在准备调用工具”。如果模型最后直接回答,用户也只能等完整回答生成完。

流式输出解决的是等待体验。它不一定让模型更快完成,但会让用户更早看到进展。

开启 stream#

OpenAI-compatible 的 Chat Completions 里,开启流式只要加一个字段:

ts
const stream = await client.chat.completions.create({
  model,
  messages,
  tools,
  tool_choice: 'auto',
  stream: true,
})

加上 stream: true 以后,返回值就不再是一个完整 response,而是一个可以 for await...of 遍历的流。

在 HTTP 层,它是 SSE。服务端会不断发这种片段:

txt
data: {"delta":{"content":"你"}}
data: {"delta":{"content":"好"}}
data: {"delta":{"content":","}}
data: [DONE]

OpenAI SDK 已经把 SSE 解析掉了。在代码里,我只需要遍历每个 chunk

ts
let streamedText = ''
 
for await (const chunk of stream) {
  const choice = chunk.choices[0]
  if (!choice) continue
 
  const delta = choice.delta
 
  if (delta.content) {
    process.stdout.write(delta.content)
    streamedText += delta.content
  }
}

这里同时做了两件事。

process.stdout.write(delta.content) 负责实时打印。它不会自动换行,所以适合做终端里的打字机效果。

streamedText += delta.content 负责保存完整回答。流结束以后,我还要把这条 assistant 消息放回 messages

如果只打印不保存,下一轮模型就看不到自己刚才说过什么。

普通文字很简单#

普通文本的流式处理几乎没有难度。

来一个 delta.content,打印一个。最后拿到完整的 streamedText,构造一条 assistant message:

ts
const assistantMessage = {
  role: 'assistant',
  content: streamedText,
}

问题出在工具调用。

非流式时,工具调用是一整个对象。模型如果要执行 exec_shell,我拿到的通常是这种结构:

json
{
  "tool_calls": [
    {
      "id": "call_abc",
      "type": "function",
      "function": {
        "name": "exec_shell",
        "arguments": "{\"cmd\":\"date\"}"
      }
    }
  ]
}

arguments 虽然是字符串,但至少是完整字符串。直接 JSON.parse 就能得到参数对象。

流式返回里,情况会麻烦一点。

工具名、工具 id、参数字符串都可能分散在不同 chunk 里。尤其是 arguments,它经常是一段一段到达。

txt
chunk1: { index: 0, id: "call_abc", function: { name: "exec_shell" } }
chunk2: { index: 0, function: { arguments: "{\"c" } }
chunk3: { index: 0, function: { arguments: "md" } }
chunk4: { index: 0, function: { arguments: "\":\"date\"}" } }

这几段拼起来才是:

json
{"cmd":"date"}

如果在 chunk2 的时候就解析,JSON.parse('{"c') 一定会报错。

所以流式工具调用要先把参数拼完整,再解析。

用 Map 拼工具调用#

我在 lv4-stream.ts 里加了一个累积结构:

ts
type ToolCallAccumulator = {
  id: string
  name: string
  arguments: string
}
 
const pendingToolCalls = new Map<number, ToolCallAccumulator>()

这里用 Map,key 是 chunk 里的 index

为什么不用数组?因为一轮里可能有多个工具调用,不同工具的 chunk 可能交错到达。用 index 做 key,更像是在给每个工具调用建一条单独的缓冲区。

处理逻辑是这样:

ts
if (delta.tool_calls) {
  for (const tcDelta of delta.tool_calls) {
    const idx = tcDelta.index
 
    if (!pendingToolCalls.has(idx)) {
      pendingToolCalls.set(idx, { id: '', name: '', arguments: '' })
    }
 
    const pendingCall = pendingToolCalls.get(idx)!
 
    if (tcDelta.id) {
      pendingCall.id = tcDelta.id
    }
 
    if (tcDelta.function?.name) {
      pendingCall.name = tcDelta.function.name
    }
 
    if (tcDelta.function?.arguments) {
      pendingCall.arguments += tcDelta.function.arguments
    }
  }
}

最容易写错的是最后一行。

这里必须是 +=,不能是 =

因为每个 chunk 只带一小段参数。用 = 会把前面已经收到的片段覆盖掉。等流结束时,手里只剩最后一段,当然解析不出完整 JSON。

流结束以后再决定结果类型#

流式响应里,每个 chunk 只是局部信息。

我不能看到第一个文字 chunk 就说“这轮是文本回答”,也不能看到第一个工具参数 chunk 就立刻执行工具。要等这一轮流结束,再根据最终的 finish_reason 判断。

代码里用一个变量记录结束信号:

ts
let finalSignal = ''
 
for await (const chunk of stream) {
  const choice = chunk.choices[0]
  if (!choice) continue
 
  // 处理 delta.content 和 delta.tool_calls
 
  if (choice.finish_reason) {
    finalSignal = choice.finish_reason
  }
}

流结束后,把 pendingToolCalls 整理成数组:

ts
const toolCalls = [...pendingToolCalls.entries()]
  .sort(([a], [b]) => a - b)
  .map(([, acc]) => acc)

然后返回两种结果之一:

ts
if (finalSignal === 'tool_calls') {
  return { type: 'tool_calls', calls: toolCalls, assistantMessage }
}
 
return { type: 'text', content: streamedText, assistantMessage }

streamOneTurn 对外不暴露一堆零散 chunk,只返回结构化结果。

上层 Agent Loop 不需要关心底层是不是 SSE,也不需要知道参数是怎么拼出来的。它只看结果类型:文本,或者工具调用。

还要补一条 assistant message#

非流式接口会直接给我一条完整的 message

流式接口不会。它只给我很多 chunk。

但 Agent Loop 仍然需要一条完整的 assistant message 放回 messages,否则下一轮上下文会断。

所以我在流结束后手动构造:

ts
const assistantMessage =
  finalSignal === 'tool_calls'
    ? {
        role: 'assistant',
        content: streamedText || null,
        tool_calls: toolCalls.map(tc => ({
          id: tc.id,
          type: 'function' as const,
          function: {
            name: tc.name,
            arguments: tc.arguments,
          },
        })),
      }
    : {
        role: 'assistant',
        content: streamedText,
      }

这条消息给下一轮模型看。

尤其是工具调用场景,顺序必须是:

txt
assistant: 我要调用 exec_shell({"cmd":"date"})
tool: 命令执行结果
assistant: 根据命令结果回答用户

如果少了第一条 assistant 消息,工具结果就像凭空出现的一段文本。模型可能还能猜到一点,但上下文已经不完整了。

Agent Loop 基本不用改#

把流式读取封装进 streamOneTurn 以后,外层循环还是原来的形状:

ts
async function runStreamingAgent(messages) {
  for (let round = 1; round <= 10; round++) {
    const result = await streamOneTurn(messages)
    messages.push(result.assistantMessage)
 
    if (result.type === 'text') {
      console.log('\n')
      return
    }
 
    for (const tc of result.calls) {
      const parsed = parseToolArguments(tc.name, tc.arguments)
      const handler = toolHandlers[tc.name]
 
      const toolResult = parsed.ok
        ? handler ? handler(parsed.args) : `[error] 未知工具: ${tc.name}`
        : parsed.error
 
      messages.push({
        role: 'tool',
        tool_call_id: tc.id,
        content: toolResult,
      })
    }
  }
}

这段和最小 Agent Loop 的思路一样:

txt
问模型
-> 模型直接回答:结束
-> 模型请求工具:执行工具
-> 工具结果写回 messages
-> 继续问模型

区别只是“问模型”这一步换成了流式实现。

这也是我现在喜欢的写法:把复杂的协议细节包在 streamOneTurn 里,让 Agent Loop 继续像状态机一样读。

参数解析也要防御#

工具参数拼完以后,才可以解析:

ts
function parseToolArguments(toolName: string, rawArguments: string) {
  try {
    return { ok: true, args: JSON.parse(rawArguments) }
  } catch {
    return {
      ok: false,
      error: `[error] 工具 ${toolName} 的参数不是合法 JSON: ${rawArguments}`,
    }
  }
}

这里没有让程序直接崩掉。

如果参数不是合法 JSON,我把错误当成工具结果放回 messages。这样模型下一轮能看到失败原因,有机会自己修正。

这是最小 demo 里也值得保留的习惯。模型输出永远可能不符合预期,尤其是在工具多、参数复杂的时候。

这版踩到的坑#

第一个坑是用 console.log 打印流式文字。

console.log 会自动换行。每个 token 来一次 console.log,终端里就会变成一列字。流式输出应该用 process.stdout.write

第二个坑是工具参数用 = 拼接。

arguments 是碎片。每来一段都要追加到原来的字符串后面。

第三个坑还是 messages.push(result.assistantMessage)

从非流式改到流式以后,很容易以为“文字已经打印了”,就忘了保存 assistant 消息。打印给用户看是一回事,保存给模型看是另一回事。

第四个坑是太早执行工具。

工具调用要等流结束再执行。否则参数可能还没到齐,本地函数拿到的是半截 JSON。

怎么运行#

kk-agent-lab 里跑:

bash
npm run lv4

启动以后可以问:

txt
当前时间是多少?

如果模型决定调用 exec_shell,你会看到它先流式处理工具调用,程序再执行本地命令,最后模型根据命令结果回答。

也可以问文件相关问题,比如:

txt
读取 ./package.json,告诉我项目名。

这时流程会更像一个 coding agent:

txt
模型请求 read_file
-> 本地读取 package.json
-> 工具结果写回 messages
-> 模型生成最终回答

下一步#

加上流式以后,这个 Agent 已经更像一个能用的终端助手。

messages 还在一直变长。每次用户输入、模型回答、工具调用、工具结果,都会追加进去。短任务没问题,长任务会遇到上下文越来越大的问题。

所以下一层我准备写 context compact。

也就是:保留最近几轮,把更早的历史压缩成任务摘要,再继续让 Agent 工作。