--- title: "流式响应机制 - Claude Code 打字机效果原理" description: "解析 Claude Code 流式响应实现:如何通过 SSE 逐 token 接收 AI 输出,实现实时打字机效果,提升用户等待体验。" keywords: ["流式响应", "SSE", "streaming", "实时输出", "API streaming"] sourceRef: "3ec5675 (2026-04-08)" --- ## 为什么需要流式 想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。 流式响应让用户**实时看到 AI 的思考过程**: - 文字逐字出现,用户能提前判断方向是否正确 - 工具调用的参数在生成过程中就能预览 - 长时间任务不会让用户觉得"卡死了" ## `BetaRawMessageStreamEvent` 核心事件类型 流式 API 返回的是一系列 `BetaRawMessageStreamEvent`,每种事件类型对应流式响应的不同阶段(`src/services/api/claude.ts`): ``` message_start ← 消息开始,包含 model、usage 初始值 ├── content_block_start ← 内容块开始(text / tool_use / thinking) │ ├── content_block_delta ← 增量数据(text_delta / input_json_delta / thinking_delta) │ ├── content_block_delta ← ... 持续到达 │ └── content_block_stop ← 内容块结束,yield AssistantMessage ├── content_block_start ← 下一个内容块... │ └── ... └── message_delta ← stop_reason + 最终 usage message_stop ← 消息结束 ``` ### 事件处理状态机 `src/services/api/claude.ts` 中 `queryModelWithStreaming()` 函数的事件处理循环实现了一个基于 `switch(part.type)` 的状态机: | 事件类型 | 处理逻辑 | 状态变更 | |----------|----------|----------| | `message_start` | 初始化 `partialMessage`,记录 TTFT(首字节延迟) | `usage` 初始化 | | `content_block_start` | 按 `part.index` 创建对应类型的内容块 | `contentBlocks[index]` 初始化 | | `content_block_delta` | 按子类型增量追加数据 | text / thinking / input 累加 | | `content_block_stop` | 构建完整 `AssistantMessage` 并 yield | 消息推入 `newMessages` | | `message_delta` | 更新 stop_reason 和最终 usage | 写回最后一条消息 | | `message_stop` | 无操作(流结束标记) | — | ### 内容块类型及其增量数据 `content_block_start` 中的 `content_block.type` 决定了如何处理后续 delta: | 内容块类型 | Delta 类型 | 累加逻辑 | |-----------|-----------|----------| | `text` | `text_delta` | `text += delta.text` | | `thinking` | `thinking_delta` + `signature_delta` | `thinking += delta.thinking`,`signature = delta.signature` | | `tool_use` | `input_json_delta` | `input += delta.partial_json`(JSON 字符串增量拼接) | | `server_tool_use` | `input_json_delta` | 同 tool_use | | `connector_text` | `connector_text_delta` | 特殊连接器文本(feature flag 控制) | 关键设计:`content_block_start` 时所有文本字段初始化为空字符串,只通过 `content_block_delta` 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。 ## 文本 chunk 和 tool_use block 的交织 一次 AI 响应可能包含多个内容块,交替出现: ``` content_block_start (text, index=0) "我来帮你修复这个 bug。" content_block_delta (text_delta) "首先..." content_block_stop (index=0) content_block_start (tool_use, index=1) { name: "Read", input: "..." } content_block_delta (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}' content_block_stop (index=1) content_block_start (text, index=2) "我已经看到了问题所在..." content_block_stop (index=2) ``` 每个 `content_block_stop` 触发一次 `yield`,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生**多条** `AssistantMessage`——文本消息和工具调用消息交替产出。 `stop_reason` 要等到 `message_delta` 才确定(可能是 `end_turn`、`tool_use`、`max_tokens` 等),所以最后一条消息的 `stop_reason` 是**回写**的: ```typescript // claude.ts — stop_reason 回写逻辑(直接属性修改,不用对象替换) // 因为 transcript 写队列持有 message.message 的引用 const lastMsg = newMessages.at(-1) if (lastMsg) { lastMsg.message.usage = usage lastMsg.message.stop_reason = stopReason } ``` ## 流式中的错误处理 ### 网络断开 流式连接依赖 SSE(Server-Sent Events)。当连接中断时,系统有两层检测机制: 1. **被动停滞检测**(`src/services/api/claude.ts` 中 stall 检测逻辑):当下一个事件到达时,计算与上一个事件的时间间隔。超过阈值(30 秒,`STALL_THRESHOLD_MS = 30_000`)记录为一次 stall,累积计数并写入遥测日志。这是被动检测——仅在下一个 chunk 到达时才触发,不会主动中断流。 2. **主动空闲超时看门狗**(`src/services/api/claude.ts` 中 `STREAM_IDLE_TIMEOUT_MS` 看门狗逻辑):使用 `setTimeout` 设置 90 秒(可通过 `CLAUDE_STREAM_IDLE_TIMEOUT_MS` 环境变量覆盖)的硬性超时。如果在此期间没有收到任何事件,主动终止流并抛出错误进入重试流程。 3. **非流式降级**:作为最后手段,设置 `didFallBackToNonStreaming` 标志,通过 `executeNonStreamingRequest()` 回退到非流式请求(一次性获取完整响应)。 ```typescript // claude.ts — 被动停滞检测 const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞 let totalStallTime = 0 let stallCount = 0 // claude.ts — 主动空闲超时 const STREAM_IDLE_TIMEOUT_MS = parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000 ``` ### API 限流 当 API 返回限流错误时,系统使用 `withRetry` 包装器进行指数退避重试。重试逻辑考虑了: - 错误类型(429 限流 vs 500 服务器错误) - 重试次数上限 - 退避间隔 ### Token 超限 两种 token 超限场景有不同的处理: | 场景 | stop_reason | 处理方式 | |------|------------|----------| | **输出超限** | `max_tokens` | 生成错误消息,建议设置 `CLAUDE_CODE_MAX_OUTPUT_TOKENS` | | **上下文窗口超限** | `model_context_window_exceeded` | 触发 compaction 压缩对话历史后重试 | ```typescript // claude.ts — stop_reason 处理 if (stopReason === 'max_tokens') { yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... }) } if (stopReason === 'model_context_window_exceeded') { // 复用 max_output_tokens 的恢复路径 yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... }) } ``` ### 流式停滞检测 系统持续监控事件到达间隔,检测"停滞"(stall): ```typescript // claude.ts — stall 检测逻辑 const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞 if (timeSinceLastEvent > STALL_THRESHOLD_MS) { stallCount++ totalStallTime += timeSinceLastEvent logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... }) } ``` 这是**被动检测**——仅在下一个 chunk 到达时才触发比较。与之互补的是 90 秒主动空闲超时看门狗(`STREAM_IDLE_TIMEOUT_MS`),会直接中断长时间无响应的流。 ## 工具执行的流式反馈 BashTool 的命令执行也是流式的——通过 `onProgress` 回调逐行推送输出: ``` BashTool.call() → runShellCommand() → AsyncGenerator ├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...) ├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds } └── return { code, stdout, interrupted, ... } ``` UI 层通过 `useToolCallProgress` hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(`shouldAutoBackground`)。 ## 多 Provider 适配 | Provider | 流式协议 | 特殊处理 | |----------|----------|----------| | **firstParty** (Anthropic Direct) | 原生 SSE | 延迟最低,TTFT 最快 | | **AWS Bedrock** | AWS SDK 流式接口 | 需要额外的 beta header 和认证 | | **Google Vertex** | gRPC → 事件流 | 通过 `getMergedBetas()` 适配 | | **foundry** | Anthropic 兼容 API | 内部部署 | | **openai** | OpenAI 流式适配器 | 转换为 Anthropic 内部格式 | | **gemini** | Gemini 流式适配器 | 转换为 Anthropic 内部格式 | | **grok** (xAI) | Grok 流式适配器 | 转换为 Anthropic 内部格式 | 所有 Provider 通过统一的 `Stream` 抽象层屏蔽差异。上层代码(QueryEngine、REPL)不需要关心底层用的是哪个 Provider。 ### Provider 选择 `src/utils/model/providers.ts` 中的 `getAPIProvider()` 根据配置决定使用哪个 Provider: ```typescript // 根据 api_provider 配置选择: // "anthropic" → 直连 // "bedrock" → AWS SDK // "vertex" → Google SDK // 第三方 base URL → 自动检测 ``` 每个 Provider 需要适配的细节包括:认证方式、beta header、请求参数格式、错误码映射——但这些差异在 `claude.ts` 的 `queryStream()` 函数中被统一处理。