From 66131a7b765ea030a0ecfc7a2a7ccccef695efbb Mon Sep 17 00:00:00 2001 From: claude-code-best Date: Mon, 20 Apr 2026 00:32:48 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E9=87=8D=E5=86=99=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=93=8D=E5=BA=94=EF=BC=8C=E7=A7=BB=E9=99=A4=20API=20=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E7=B1=BB=E5=9E=8B=E6=9E=9A=E4=B8=BE=E5=92=8C=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E7=89=87=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 从核心设计选择讲起(流式不只是打字机效果), 将错误处理重组为三层防护体系, 增加 Provider 适配策略对比表和设计考量分析。 Co-Authored-By: Claude Opus 4.6 --- docs/conversation/streaming.mdx | 211 ++++++++++---------------------- 1 file changed, 65 insertions(+), 146 deletions(-) diff --git a/docs/conversation/streaming.mdx b/docs/conversation/streaming.mdx index 2d9c18811..17afa240b 100644 --- a/docs/conversation/streaming.mdx +++ b/docs/conversation/streaming.mdx @@ -1,192 +1,111 @@ --- -title: "流式响应机制 - Claude Code 打字机效果原理" -description: "解析 Claude Code 流式响应实现:如何通过 SSE 逐 token 接收 AI 输出,实现实时打字机效果,提升用户等待体验。" -keywords: ["流式响应", "SSE", "streaming", "实时输出", "API streaming"] -sourceRef: "3ec5675 (2026-04-08)" +title: "流式响应" +description: "为什么流式是 Claude Code 的核心设计选择?理解流式传输的设计考量、错误处理策略和多 Provider 适配。" +keywords: ["流式响应", "SSE", "streaming", "API streaming"] --- -## 为什么需要流式 +## 为什么流式是核心设计 想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。 -流式响应让用户**实时看到 AI 的思考过程**: -- 文字逐字出现,用户能提前判断方向是否正确 -- 工具调用的参数在生成过程中就能预览 -- 长时间任务不会让用户觉得"卡死了" +但流式不仅仅是为了"打字机效果"。在 Claude Code 中,流式是整个系统架构的基础假设: -## `BetaRawMessageStreamEvent` 核心事件类型 +- **工具并行执行**:AI 在流式输出过程中就可能发出工具调用,系统可以立即开始执行,不必等整个响应结束。这使得"AI 边想边做"成为可能 +- **实时反馈**:用户看到 AI 的思考方向后可以提前判断是否正确,必要时提前中断 +- **可取消性**:流式架构天然支持用户中断——随时可以终止正在进行的流 +- **工具执行反馈**:不仅是 AI 输出是流式的,工具执行(如 shell 命令)的输出也是流式的——用户实时看到命令输出 -流式 API 返回的是一系列 `BetaRawMessageStreamEvent`,每种事件类型对应流式响应的不同阶段(`src/services/api/claude.ts`): +**代价**:流式架构比一次性响应复杂得多——需要处理连接中断、部分数据、乱序事件等边界情况。 + +## 流式响应的概念模型 + +一次流式响应不是"一个完整的回答",而是一系列按顺序到达的事件流: ``` -message_start ← 消息开始,包含 model、usage 初始值 - ├── content_block_start ← 内容块开始(text / tool_use / thinking) - │ ├── content_block_delta ← 增量数据(text_delta / input_json_delta / thinking_delta) - │ ├── content_block_delta ← ... 持续到达 - │ └── content_block_stop ← 内容块结束,yield AssistantMessage - ├── content_block_start ← 下一个内容块... - │ └── ... - └── message_delta ← stop_reason + 最终 usage -message_stop ← 消息结束 +消息开始 + ├── 内容块 1:文本 "我来帮你修复这个 bug。" + ├── 内容块 2:工具调用 { name: "Read", input: "..." } + ├── 内容块 3:文本 "我看到了问题..." + └── ... +消息结束(包含停止原因和 token 用量) ``` -### 事件处理状态机 +关键设计点: -`src/services/api/claude.ts` 中 `queryModelWithStreaming()` 函数的事件处理循环实现了一个基于 `switch(part.type)` 的状态机: +### 内容块的增量累加 -| 事件类型 | 处理逻辑 | 状态变更 | -|----------|----------|----------| -| `message_start` | 初始化 `partialMessage`,记录 TTFT(首字节延迟) | `usage` 初始化 | -| `content_block_start` | 按 `part.index` 创建对应类型的内容块 | `contentBlocks[index]` 初始化 | -| `content_block_delta` | 按子类型增量追加数据 | text / thinking / input 累加 | -| `content_block_stop` | 构建完整 `AssistantMessage` 并 yield | 消息推入 `newMessages` | -| `message_delta` | 更新 stop_reason 和最终 usage | 写回最后一条消息 | -| `message_stop` | 无操作(流结束标记) | — | +每个内容块(文本、思考、工具调用)都是通过增量数据逐步构建的。文本逐字到达,工具调用的 JSON 参数逐段到达。系统需要在内存中持续累加这些片段,直到一个内容块完整后才传递给消费者。 -### 内容块类型及其增量数据 +### 多消息产出 -`content_block_start` 中的 `content_block.type` 决定了如何处理后续 delta: +因为一次 AI 响应可能包含多个内容块(文本和工具调用交替出现),每个完整的内容块都会触发一次消息传递。这意味着一个 API 响应会产生**多条消息**——文本消息和工具调用消息交替产出。 -| 内容块类型 | Delta 类型 | 累加逻辑 | -|-----------|-----------|----------| -| `text` | `text_delta` | `text += delta.text` | -| `thinking` | `thinking_delta` + `signature_delta` | `thinking += delta.thinking`,`signature = delta.signature` | -| `tool_use` | `input_json_delta` | `input += delta.partial_json`(JSON 字符串增量拼接) | -| `server_tool_use` | `input_json_delta` | 同 tool_use | -| `connector_text` | `connector_text_delta` | 特殊连接器文本(feature flag 控制) | +### 停止原因的回写 -关键设计:`content_block_start` 时所有文本字段初始化为空字符串,只通过 `content_block_delta` 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。 +AI 最终是"回答完毕"还是"需要调用工具",这个信息要到最后才知道。所以停止原因是在消息结束时**回写**到最后一条消息上的——消费者在收到中间消息时还不知道整轮对话是否结束。 -## 文本 chunk 和 tool_use block 的交织 +## 错误处理:三层防护 -一次 AI 响应可能包含多个内容块,交替出现: +流式连接比一次性请求脆弱得多——网络波动、服务器过载、连接超时都可能导致中断。系统设计了三层防护: -``` -content_block_start (text, index=0) "我来帮你修复这个 bug。" -content_block_delta (text_delta) "首先..." -content_block_stop (index=0) -content_block_start (tool_use, index=1) { name: "Read", input: "..." } -content_block_delta (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}' -content_block_stop (index=1) -content_block_start (text, index=2) "我已经看到了问题所在..." -content_block_stop (index=2) -``` +### 第一层:被动停滞检测 -每个 `content_block_stop` 触发一次 `yield`,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生**多条** `AssistantMessage`——文本消息和工具调用消息交替产出。 +系统记录每个事件到达的时间间隔。当间隔超过 30 秒时,记录为一次"停滞"并写入遥测。这是被动检测——只在下一个事件到达时才能发现之前的停滞,不会主动中断流。 -`stop_reason` 要等到 `message_delta` 才确定(可能是 `end_turn`、`tool_use`、`max_tokens` 等),所以最后一条消息的 `stop_reason` 是**回写**的: +**设计考量**:为什么不立即中断?因为 API 可能在做长时间的计算(如复杂推理),短暂的无响应不一定意味着故障。被动检测提供了观测能力,而不影响正常流程。 -```typescript -// claude.ts — stop_reason 回写逻辑(直接属性修改,不用对象替换) -// 因为 transcript 写队列持有 message.message 的引用 -const lastMsg = newMessages.at(-1) -if (lastMsg) { - lastMsg.message.usage = usage - lastMsg.message.stop_reason = stopReason -} -``` +### 第二层:主动空闲超时 -## 流式中的错误处理 +如果 90 秒内没有收到任何事件(可通过环境变量配置),系统主动终止流并进入重试流程。 -### 网络断开 +**设计考量**:这是兜底机制。真正的故障不能靠被动检测发现——因为被动检测依赖于"下一个事件到达",而如果连接已经死了,下一个事件永远不会到达。 -流式连接依赖 SSE(Server-Sent Events)。当连接中断时,系统有两层检测机制: +### 第三层:非流式降级 -1. **被动停滞检测**(`src/services/api/claude.ts` 中 stall 检测逻辑):当下一个事件到达时,计算与上一个事件的时间间隔。超过阈值(30 秒,`STALL_THRESHOLD_MS = 30_000`)记录为一次 stall,累积计数并写入遥测日志。这是被动检测——仅在下一个 chunk 到达时才触发,不会主动中断流。 -2. **主动空闲超时看门狗**(`src/services/api/claude.ts` 中 `STREAM_IDLE_TIMEOUT_MS` 看门狗逻辑):使用 `setTimeout` 设置 90 秒(可通过 `CLAUDE_STREAM_IDLE_TIMEOUT_MS` 环境变量覆盖)的硬性超时。如果在此期间没有收到任何事件,主动终止流并抛出错误进入重试流程。 -3. **非流式降级**:作为最后手段,设置 `didFallBackToNonStreaming` 标志,通过 `executeNonStreamingRequest()` 回退到非流式请求(一次性获取完整响应)。 +作为最后手段,系统可以回退到非流式请求——一次性获取完整响应。失去了实时性,但保证了功能可用性。 -```typescript -// claude.ts — 被动停滞检测 -const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞 -let totalStallTime = 0 -let stallCount = 0 +### 降级策略对比 -// claude.ts — 主动空闲超时 -const STREAM_IDLE_TIMEOUT_MS = - parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000 -``` +| 策略 | 检测方式 | 响应时间 | 用户体验 | +|------|----------|----------|----------| +| 正常流式 | — | 最低延迟 | 逐字显示 | +| 被动停滞检测 | 下一个事件到达时 | 不变 | 无感知 | +| 主动超时中断 | 定时器触发 | 中断后重试延迟 | 短暂停顿后恢复 | +| 非流式降级 | 重试失败后 | 等待完整响应 | 等待后一次性显示 | -### API 限流 +## Token 超限的两种场景 -当 API 返回限流错误时,系统使用 `withRetry` 包装器进行指数退避重试。重试逻辑考虑了: -- 错误类型(429 限流 vs 500 服务器错误) -- 重试次数上限 -- 退避间隔 +两种不同的 token 超限需要不同的处理策略: -### Token 超限 +| 场景 | 含义 | 处理方式 | +|------|------|----------| +| **输出超限** | AI 话说了一半被切断 | 提升输出上限重试,或提示 AI "接着说" | +| **上下文窗口超限** | 整个对话历史太长,塞不进 API | 触发自动压缩,用 AI 摘要替代原始对话 | -两种 token 超限场景有不同的处理: - -| 场景 | stop_reason | 处理方式 | -|------|------------|----------| -| **输出超限** | `max_tokens` | 生成错误消息,建议设置 `CLAUDE_CODE_MAX_OUTPUT_TOKENS` | -| **上下文窗口超限** | `model_context_window_exceeded` | 触发 compaction 压缩对话历史后重试 | - -```typescript -// claude.ts — stop_reason 处理 -if (stopReason === 'max_tokens') { - yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... }) -} -if (stopReason === 'model_context_window_exceeded') { - // 复用 max_output_tokens 的恢复路径 - yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... }) -} -``` - -### 流式停滞检测 - -系统持续监控事件到达间隔,检测"停滞"(stall): - -```typescript -// claude.ts — stall 检测逻辑 -const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞 -if (timeSinceLastEvent > STALL_THRESHOLD_MS) { - stallCount++ - totalStallTime += timeSinceLastEvent - logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... }) -} -``` - -这是**被动检测**——仅在下一个 chunk 到达时才触发比较。与之互补的是 90 秒主动空闲超时看门狗(`STREAM_IDLE_TIMEOUT_MS`),会直接中断长时间无响应的流。 +关键区别:输出超限是"AI 话太多",可以通过调整上限解决;上下文超限是"给 AI 看的东西太多",必须通过压缩或删除来减少。 ## 工具执行的流式反馈 -BashTool 的命令执行也是流式的——通过 `onProgress` 回调逐行推送输出: +不仅是 API 响应是流式的,工具执行本身也是流式的。例如 BashTool 执行 shell 命令时,命令的标准输出会实时推送给 UI——用户不需要等命令完全结束才能看到结果。 -``` -BashTool.call() → runShellCommand() → AsyncGenerator - ├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...) - ├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds } - └── return { code, stdout, interrupted, ... } -``` - -UI 层通过 `useToolCallProgress` hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(`shouldAutoBackground`)。 +长时间运行的命令还支持**自动后台化**:如果命令执行超过一定时间,系统自动将其移到后台,AI 可以继续处理其他任务,命令完成后再回调结果。 ## 多 Provider 适配 -| Provider | 流式协议 | 特殊处理 | -|----------|----------|----------| -| **firstParty** (Anthropic Direct) | 原生 SSE | 延迟最低,TTFT 最快 | -| **AWS Bedrock** | AWS SDK 流式接口 | 需要额外的 beta header 和认证 | -| **Google Vertex** | gRPC → 事件流 | 通过 `getMergedBetas()` 适配 | -| **foundry** | Anthropic 兼容 API | 内部部署 | -| **openai** | OpenAI 流式适配器 | 转换为 Anthropic 内部格式 | -| **gemini** | Gemini 流式适配器 | 转换为 Anthropic 内部格式 | -| **grok** (xAI) | Grok 流式适配器 | 转换为 Anthropic 内部格式 | +系统支持 7 种 API Provider,每种有不同的流式协议和认证方式: -所有 Provider 通过统一的 `Stream` 抽象层屏蔽差异。上层代码(QueryEngine、REPL)不需要关心底层用的是哪个 Provider。 +| Provider 类别 | 流式方式 | 设计挑战 | +|---------------|----------|----------| +| Anthropic 直连 | 原生 SSE | 基准实现,其他 Provider 对齐它 | +| 云平台(Bedrock/Vertex) | SDK 封装的流式接口 | 需要适配认证、beta header、参数格式 | +| 第三方兼容(OpenAI/Gemini/Grok) | 各自的流式协议 | 需要转换为 Anthropic 内部格式 | -### Provider 选择 +**设计策略**:所有 Provider 通过统一的流式抽象层屏蔽差异。上层代码(编排层、交互层)不需要关心底层用的是哪个 Provider——它们只看到统一的事件流。 -`src/utils/model/providers.ts` 中的 `getAPIProvider()` 根据配置决定使用哪个 Provider: +这意味着切换 Provider 不需要修改任何业务逻辑,只需要在通信层适配新的协议。这也是为什么"通信层"是独立的一层。 -```typescript -// 根据 api_provider 配置选择: -// "anthropic" → 直连 -// "bedrock" → AWS SDK -// "vertex" → Google SDK -// 第三方 base URL → 自动检测 -``` +## 接下来 -每个 Provider 需要适配的细节包括:认证方式、beta header、请求参数格式、错误码映射——但这些差异在 `claude.ts` 的 `queryStream()` 函数中被统一处理。 +- **多轮对话** — 理解跨迭代的上下文管理 +- **上下文压缩** — 深入了解 token 超限时的自动压缩机制 +- **工具系统** — 了解工具执行的并行策略