Files
claude-code/docs/conversation/streaming.mdx
CyberScrubber 8b2532a9c1 docs: fix documentation deviations from source code (#220)
* docs: 修正 docs/conversation 文档与源码的偏差(multi-turn/streaming/the-loop)

- multi-turn: TranscriptWriter→Project 私有类, 会话路径改用 sanitized-cwd,
  补充 StoredCostState.lastDuration 字段, 模型切换改为 setModel(),
  QueryEngine 状态补全 loadedNestedMemoryPaths/hasHandledOrphanedPermission,
  行号改为符号引用
- streaming: STALL_THRESHOLD_MS 10s→30s, 新增 90s 主动空闲看门狗描述,
  非流式降级补充 didFallBackToNonStreaming/executeNonStreamingRequest,
  行号改为符号引用
- the-loop: 终止条件 7→11, 继续条件重整为 5 组层级结构,
  max_output_tokens 拆分 escalate/recovery 子阶段,
  prompt-too-long 拆分 collapse_drain/reactive_compact 子策略,
  State 类型修正 autoCompactTracking 为可选, 行号改为符号引用
- 全部: 添加 sourceRef 版本锚定(3ec5675)

* docs: 修正 docs/extensibility 文档与源码的偏差(custom-agents/hooks/skills)

- custom-agents: Verification 模型修正为 inherit, 补充 Plugin Agent 字段限制
  (permissionMode/hooks/mcpServers 被安全忽略, isolation 仅 worktree),
  加载流程修正为 6 层优先级, 补充 memory snapshot 门控条件
- hooks: 事件数 22→27(补充 Notification), Hook 类型定义位置修正为 3 个文件,
  行号改为符号引用, Zod schema 范围修正, 去重键修正为四部分复合键,
  registerFrontmatterHooks/clearSessionHooks 区分定义位置和调用位置
- skills: 字段数 17→16, 权限层级 4→5(补充 remote canonical auto-allow),
  SAFE_SKILL_PROPERTIES 28→30, skillUsageTracking 路径修正,
  行号改为符号引用
- mcp-protocol: 全部验证通过, 无需修改
- 全部: 添加 sourceRef 版本锚定(3ec5675)

* Revert "docs: 修正 docs/extensibility 文档与源码的偏差(custom-agents/hooks/skills)"

* docs: 修正 docs/extensibility 文档与源码的偏差(hooks/skills/mcp-protocol)

hooks:
- 事件数 22→27(补充 Notification 事件)
- Hook 类型定义位置修正为 3 个文件分布
  (schemas/hooks.ts / types/hooks.ts / utils/hooks/sessionHooks.ts)
- Zod schema 引用从硬编码行号改为符号引用
- hookSpecificOutput 表从 6 扩展至 15 个事件
  (补全 permissionDecisionReason / PostToolUseFailure / SubagentStart 等)
- 去重键从 pluginRoot\0command 修正为四部分复合键
  (pluginRoot\0shell\0command\0ifCondition)
- 全部硬编码行号改为符号引用以避免版本漂移

skills:
- parseSkillFrontmatterFields 字段数 17→16
- SAFE_SKILL_PROPERTIES 属性数 28→30
- checkPermissions 层级 4→5
- 第 2 层描述从"官方市场"修正为"远程 canonical"

mcp-protocol:
- 配置层级从"三级"修正为
  "enterprise 独占或合并 user/project/local + plugin + claude.ai"

* docs: 修正 system-prompt.mdx 中 Boundary 章节的层级与可读性

- Boundary 插入条件从 ### 降为 blockquote,不再打断三种分块模式的并列结构
- 表格中 Boundary 缓存策略列补充说明其分割作用
- 新增 Boundary 概念释义(blockquote),解释其分割静态区/动态区以实现全局缓存的设计意图
2026-04-09 17:53:11 +08:00

190 lines
8.7 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
title: "流式响应机制 - Claude Code 打字机效果原理"
description: "解析 Claude Code 流式响应实现:如何通过 SSE 逐 token 接收 AI 输出,实现实时打字机效果,提升用户等待体验。"
keywords: ["流式响应", "SSE", "streaming", "实时输出", "API streaming"]
sourceRef: "3ec5675 (2026-04-08)"
---
## 为什么需要流式
想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。
流式响应让用户**实时看到 AI 的思考过程**
- 文字逐字出现,用户能提前判断方向是否正确
- 工具调用的参数在生成过程中就能预览
- 长时间任务不会让用户觉得"卡死了"
## `BetaRawMessageStreamEvent` 核心事件类型
流式 API 返回的是一系列 `BetaRawMessageStreamEvent`,每种事件类型对应流式响应的不同阶段(`src/services/api/claude.ts`
```
message_start ← 消息开始,包含 model、usage 初始值
├── content_block_start ← 内容块开始text / tool_use / thinking
│ ├── content_block_delta ← 增量数据text_delta / input_json_delta / thinking_delta
│ ├── content_block_delta ← ... 持续到达
│ └── content_block_stop ← 内容块结束yield AssistantMessage
├── content_block_start ← 下一个内容块...
│ └── ...
└── message_delta ← stop_reason + 最终 usage
message_stop ← 消息结束
```
### 事件处理状态机
`src/services/api/claude.ts` 中 `queryStreamRaw()` 函数的事件处理循环实现了一个基于 `switch(part.type)` 的状态机:
| 事件类型 | 处理逻辑 | 状态变更 |
|----------|----------|----------|
| `message_start` | 初始化 `partialMessage`,记录 TTFT首字节延迟 | `usage` 初始化 |
| `content_block_start` | 按 `part.index` 创建对应类型的内容块 | `contentBlocks[index]` 初始化 |
| `content_block_delta` | 按子类型增量追加数据 | text / thinking / input 累加 |
| `content_block_stop` | 构建完整 `AssistantMessage` 并 yield | 消息推入 `newMessages` |
| `message_delta` | 更新 stop_reason 和最终 usage | 写回最后一条消息 |
| `message_stop` | 无操作(流结束标记) | — |
### 内容块类型及其增量数据
`content_block_start` 中的 `content_block.type` 决定了如何处理后续 delta
| 内容块类型 | Delta 类型 | 累加逻辑 |
|-----------|-----------|----------|
| `text` | `text_delta` | `text += delta.text` |
| `thinking` | `thinking_delta` + `signature_delta` | `thinking += delta.thinking``signature = delta.signature` |
| `tool_use` | `input_json_delta` | `input += delta.partial_json`JSON 字符串增量拼接) |
| `server_tool_use` | `input_json_delta` | 同 tool_use |
| `connector_text` | `connector_text_delta` | 特殊连接器文本feature flag 控制) |
关键设计:`content_block_start` 时所有文本字段初始化为空字符串,只通过 `content_block_delta` 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。
## 文本 chunk 和 tool_use block 的交织
一次 AI 响应可能包含多个内容块,交替出现:
```
content_block_start (text, index=0) "我来帮你修复这个 bug。"
content_block_delta (text_delta) "首先..."
content_block_stop (index=0)
content_block_start (tool_use, index=1) { name: "Read", input: "..." }
content_block_delta (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}'
content_block_stop (index=1)
content_block_start (text, index=2) "我已经看到了问题所在..."
content_block_stop (index=2)
```
每个 `content_block_stop` 触发一次 `yield`,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生**多条** `AssistantMessage`——文本消息和工具调用消息交替产出。
`stop_reason` 要等到 `message_delta` 才确定(可能是 `end_turn`、`tool_use`、`max_tokens` 等),所以最后一条消息的 `stop_reason` 是**回写**的:
```typescript
// claude.ts — stop_reason 回写逻辑(直接属性修改,不用对象替换)
// 因为 transcript 写队列持有 message.message 的引用
const lastMsg = newMessages.at(-1)
if (lastMsg) {
lastMsg.message.usage = usage
lastMsg.message.stop_reason = stopReason
}
```
## 流式中的错误处理
### 网络断开
流式连接依赖 SSEServer-Sent Events。当连接中断时系统有两层检测机制
1. **被动停滞检测**`src/services/api/claude.ts` 中 stall 检测逻辑当下一个事件到达时计算与上一个事件的时间间隔。超过阈值30 秒,`STALL_THRESHOLD_MS = 30_000`)记录为一次 stall累积计数并写入遥测日志。这是被动检测——仅在下一个 chunk 到达时才触发,不会主动中断流。
2. **主动空闲超时看门狗**`src/services/api/claude.ts` 中 `STREAM_IDLE_TIMEOUT_MS` 看门狗逻辑):使用 `setTimeout` 设置 90 秒(可通过 `CLAUDE_STREAM_IDLE_TIMEOUT_MS` 环境变量覆盖)的硬性超时。如果在此期间没有收到任何事件,主动终止流并抛出错误进入重试流程。
3. **非流式降级**:作为最后手段,设置 `didFallBackToNonStreaming` 标志,通过 `executeNonStreamingRequest()` 回退到非流式请求(一次性获取完整响应)。
```typescript
// claude.ts — 被动停滞检测
const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞
let totalStallTime = 0
let stallCount = 0
// claude.ts — 主动空闲超时
const STREAM_IDLE_TIMEOUT_MS =
parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
```
### API 限流
当 API 返回限流错误时,系统使用 `withRetry` 包装器进行指数退避重试。重试逻辑考虑了:
- 错误类型429 限流 vs 500 服务器错误)
- 重试次数上限
- 退避间隔
### Token 超限
两种 token 超限场景有不同的处理:
| 场景 | stop_reason | 处理方式 |
|------|------------|----------|
| **输出超限** | `max_tokens` | 生成错误消息,建议设置 `CLAUDE_CODE_MAX_OUTPUT_TOKENS` |
| **上下文窗口超限** | `model_context_window_exceeded` | 触发 compaction 压缩对话历史后重试 |
```typescript
// claude.ts — stop_reason 处理
if (stopReason === 'max_tokens') {
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
if (stopReason === 'model_context_window_exceeded') {
// 复用 max_output_tokens 的恢复路径
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
```
### 流式停滞检测
系统持续监控事件到达间隔,检测"停滞"stall
```typescript
// claude.ts — stall 检测逻辑
const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
stallCount++
totalStallTime += timeSinceLastEvent
logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... })
}
```
这是**被动检测**——仅在下一个 chunk 到达时才触发比较。与之互补的是 90 秒主动空闲超时看门狗(`STREAM_IDLE_TIMEOUT_MS`),会直接中断长时间无响应的流。
## 工具执行的流式反馈
BashTool 的命令执行也是流式的——通过 `onProgress` 回调逐行推送输出:
```
BashTool.call() → runShellCommand() → AsyncGenerator
├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...)
├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds }
└── return { code, stdout, interrupted, ... }
```
UI 层通过 `useToolCallProgress` hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(`shouldAutoBackground`)。
## 多 Provider 适配
| Provider | 流式协议 | 特殊处理 |
|----------|----------|----------|
| **Anthropic Direct** | 原生 SSE | 延迟最低TTFT 最快 |
| **AWS Bedrock** | AWS SDK 流式接口 | 需要额外的 beta header 和认证 |
| **Google Vertex** | gRPC → 事件流 | 通过 `getMergedBetas()` 适配 |
| **Azure** | Anthropic 兼容 API | 自定义 base URL |
所有 Provider 通过统一的 `Stream<BetaRawMessageStreamEvent>` 抽象层屏蔽差异。上层代码QueryEngine、REPL不需要关心底层用的是哪个 Provider。
### Provider 选择
`src/utils/model/providers.ts` 中的 `getAPIProvider()` 根据配置决定使用哪个 Provider
```typescript
// 根据 api_provider 配置选择:
// "anthropic" → 直连
// "bedrock" → AWS SDK
// "vertex" → Google SDK
// 第三方 base URL → 自动检测
```
每个 Provider 需要适配的细节包括认证方式、beta header、请求参数格式、错误码映射——但这些差异在 `claude.ts` 的 `queryStream()` 函数中被统一处理。