Files
claude-code/src/services/tools/StreamingToolExecutor.ts
claude-code-best 4f1649e249 feature: 20260429 代码巡检 (#383)
* fix: 实现 snipCompact/snipProjection 存根,修复 QueryEngine mutableMessages 不收缩的内存泄漏

将 snipCompact.ts 和 snipProjection.ts 从纯存根替换为完整实现:
- snipCompactIfNeeded: 检测 snip_boundary 消息,按 removedUuids 过滤消息,释放旧消息内存
- isSnipBoundaryMessage/projectSnippedView: 边界检测与视图投影
- isSnipMarkerMessage/isSnipRuntimeEnabled/shouldNudgeForSnips: 辅助函数
- 28 个测试覆盖边界检测、消息过滤、空输入、多边界等场景

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: 完善 StreamingToolExecutor.discard() 释放内部状态,修复 NO_FLICKER 模式内存泄漏

discard() 原先仅设置 flag,不释放 tools 数组、siblingAbortController 和 turnSpan。
NO_FLICKER 模式 API 重试时旧工具结果堆积无法被 GC 回收。

修复内容:
- 中止 siblingAbortController 以取消运行中的工具子进程
- 清空 tools 数组释放 TrackedTool 引用(block、assistantMessage、results、pendingProgress)
- 清理 progressAvailableResolve 和 turnSpan
- 添加 7 个测试覆盖 discard 后的各种状态验证

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: 清理 useReplBridge pendingPermissionHandlers,修复 RC 权限条目保留内存泄漏

pendingPermissionHandlers Map 原定义在 async IIFE 内部,组件卸载时
cleanup 函数无法访问。修复方案:
- 将 Map 提升至 useEffect 顶层作用域
- cleanup 时显式调用 pendingPermissionHandlers.clear() 释放闭包引用
- 添加 8 个测试覆盖 handler 注册/取消/响应/cleanup 模式

同时确认 #4 空闲渲染循环已完整实现(所有 10 个 useAnimationFrame
调用者均正确传递 null 暂停时钟)。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: 确认 #11 LRU 缓存键已完整实现,添加 FileStateCache 测试 + 修复类型错误

审计确认 #11 FileStateCache 已完整实现(LRU 双重限制 max+maxSize +
sizeCalculation),归类从"未实现"修正为"已确认完整"。
- 添加 16 个 FileStateCache 测试覆盖 LRU 驱逐、大小计算、路径归一化
- 添加 6 个 coerceToolContentToString 测试覆盖类型强制转换
- 修复 replBridgePermissionHandlers 测试的类型断言错误

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs: 完成内存泄漏审计,标记所有条目已处理

12 项审计条目全部处理完毕:
- 11 项已确认完整实现(含 4 项主动修复:#8 StreamingToolExecutor、#9 RC 权限、#12 snipCompact、#4 确认完整)
- 1 项已知限制(#7 Bun --compile 兼容性)
- 65 个测试覆盖所有修复项
- 验证报告确认所有修复代码正确实现

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: highlight.js 按需注册 26 个常用语言,减少 ~80% 语法内存占用

将 `import hljs from 'highlight.js'`(190+ 语言,~5-15MB)改为
`import hljs from 'highlight.js/lib/core'` + 静态导入并注册 26 个
常用语言(TypeScript、Python、Bash、Go、Rust 等)。静态 import
在 Bun --compile 模式下正常工作,避免了 createRequire 的路径问题。

内存从 ~5-15MB 降至 ~1-2MB。添加 7 个测试验证语言注册和
highlight 功能,现有 17 个 color-diff 测试全部通过。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: 修复 inProcessRunner 权限响应后未 cleanup 的 interval 泄漏

权限请求得到响应后(批准/拒绝),pollInterval 和 abort listener
未被清理,导致 setInterval 永远运行。在长时间运行的 swarm 会话
中,每次权限请求都会泄漏一个 interval 和一个 listener。

修复:在成功/拒绝路径中调用 cleanup() 以清理 interval、
unregister callback 和移除 abort listener。添加 6 个测试
覆盖 permission callback 注册/处理/清理生命周期。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: LSP openedFiles Map 在 compaction 后未清理,添加 closeAllFiles() 集成

LSPServerManager 的 openedFiles Map 持续增长(代码注释标注为 TODO),
长时间会话中每次文件操作都追加条目但从不清理。添加 closeAllFiles()
方法并在 postCompactCleanup 中调用,compaction 后释放所有 LSP 服务器端
文件状态。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: 修复 language-registration 测试在全量运行时因 hljs 单例污染而失败

cliHighlight.ts 导入全量 highlight.js(192 语言),与 color-diff-napi
使用的 highlight.js/lib/core 共享同一单例。全量测试运行时全量包先加载,
导致断言"未注册语言"和"不超过 30 个语言"失败。

改为验证目标 26 个语言全部存在,而非检查总数。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-29 09:14:26 +08:00

558 lines
18 KiB
TypeScript

import type { ToolUseBlock } from '@anthropic-ai/sdk/resources/index.mjs'
import {
createUserMessage,
REJECT_MESSAGE,
withMemoryCorrectionHint,
} from 'src/utils/messages.js'
import type { CanUseToolFn } from '../../hooks/useCanUseTool.js'
import { findToolByName, type Tools, type ToolUseContext } from '../../Tool.js'
import { BASH_TOOL_NAME } from '@claude-code-best/builtin-tools/tools/BashTool/toolName.js'
import type { AssistantMessage, Message } from '../../types/message.js'
import { createChildAbortController } from '../../utils/abortController.js'
import { runToolUse } from './toolExecution.js'
import { createToolBatchSpan, endToolBatchSpan } from '../langfuse/index.js'
import type { LangfuseSpan } from '../langfuse/index.js'
type MessageUpdate = {
message?: Message
newContext?: ToolUseContext
}
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
type TrackedTool = {
id: string
block: ToolUseBlock
assistantMessage: AssistantMessage
status: ToolStatus
isConcurrencySafe: boolean
promise?: Promise<void>
results?: Message[]
// Progress messages are stored separately and yielded immediately
pendingProgress: Message[]
contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
/**
* Executes tools as they stream in with concurrency control.
* - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
* - Non-concurrent tools must execute alone (exclusive access)
* - Results are buffered and emitted in the order tools were received
*/
export class StreamingToolExecutor {
private tools: TrackedTool[] = []
private toolUseContext: ToolUseContext
private hasErrored = false
private erroredToolDescription = ''
private siblingAbortController: AbortController
private discarded = false
private progressAvailableResolve?: () => void
private turnSpan: LangfuseSpan | null = null
constructor(
private readonly toolDefinitions: Tools,
private readonly canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
) {
this.toolUseContext = toolUseContext
this.siblingAbortController = createChildAbortController(
toolUseContext.abortController,
)
}
/**
* Discards all pending and in-progress tools. Called when streaming fallback
* occurs and results from the failed attempt should be abandoned.
* Queued tools won't start, and in-progress tools will receive synthetic errors.
*
* Releases all internal references (tools array, abort controller, context)
* so that the discarded executor and its buffered results can be garbage-collected.
* Without this, repeated API retries in NO_FLICKER mode accumulate leaked
* TrackedTool objects (each holding assistantMessage, results, pendingProgress).
*/
discard(): void {
this.discarded = true
// Abort running tool subprocesses (Bash spawns, etc.) so they don't
// continue producing results after the executor is replaced.
this.siblingAbortController.abort('streaming_fallback')
// Release references to allow GC of tool blocks, messages, and promises.
this.tools.length = 0
this.progressAvailableResolve = undefined
if (this.turnSpan) {
endToolBatchSpan(this.turnSpan)
this.turnSpan = null
}
}
/**
* Add a tool to the execution queue. Will start executing immediately if conditions allow.
*/
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
// Create turn span on first tool — will be ended in getRemainingResults
if (this.tools.length === 0 && this.turnSpan === null) {
this.turnSpan = createToolBatchSpan(
this.toolUseContext.langfuseTrace ?? null,
{ toolNames: [block.name], batchIndex: 0 },
)
if (this.turnSpan) {
this.toolUseContext = { ...this.toolUseContext, langfuseBatchSpan: this.turnSpan }
}
}
const toolDefinition = findToolByName(this.toolDefinitions, block.name)
if (!toolDefinition) {
this.tools.push({
id: block.id,
block,
assistantMessage,
status: 'completed',
isConcurrencySafe: true,
pendingProgress: [],
results: [
createUserMessage({
content: [
{
type: 'tool_result',
content: `<tool_use_error>Error: No such tool available: ${block.name}</tool_use_error>`,
is_error: true,
tool_use_id: block.id,
},
],
toolUseResult: `Error: No such tool available: ${block.name}`,
sourceToolAssistantUUID: assistantMessage.uuid,
}),
],
})
return
}
const parsedInput = toolDefinition.inputSchema.safeParse(block.input)
const isConcurrencySafe = parsedInput?.success
? (() => {
try {
return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
} catch {
return false
}
})()
: false
this.tools.push({
id: block.id,
block,
assistantMessage,
status: 'queued',
isConcurrencySafe,
pendingProgress: [],
})
void this.processQueue()
}
/**
* Check if a tool can execute based on current concurrency state
*/
private canExecuteTool(isConcurrencySafe: boolean): boolean {
const executingTools = this.tools.filter(t => t.status === 'executing')
return (
executingTools.length === 0 ||
(isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
)
}
/**
* Process the queue, starting tools when concurrency conditions allow
*/
private async processQueue(): Promise<void> {
for (const tool of this.tools) {
if (tool.status !== 'queued') continue
if (this.canExecuteTool(tool.isConcurrencySafe)) {
await this.executeTool(tool)
} else {
// Can't execute this tool yet, and since we need to maintain order for non-concurrent tools, stop here
if (!tool.isConcurrencySafe) break
}
}
}
private createSyntheticErrorMessage(
toolUseId: string,
reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
assistantMessage: AssistantMessage,
): Message {
// For user interruptions (ESC to reject), use REJECT_MESSAGE so the UI shows
// "User rejected edit" instead of "Error editing file"
if (reason === 'user_interrupted') {
return createUserMessage({
content: [
{
type: 'tool_result',
content: withMemoryCorrectionHint(REJECT_MESSAGE),
is_error: true,
tool_use_id: toolUseId,
},
],
toolUseResult: 'User rejected tool use',
sourceToolAssistantUUID: assistantMessage.uuid,
})
}
if (reason === 'streaming_fallback') {
return createUserMessage({
content: [
{
type: 'tool_result',
content:
'<tool_use_error>Error: Streaming fallback - tool execution discarded</tool_use_error>',
is_error: true,
tool_use_id: toolUseId,
},
],
toolUseResult: 'Streaming fallback - tool execution discarded',
sourceToolAssistantUUID: assistantMessage.uuid,
})
}
const desc = this.erroredToolDescription
const msg = desc
? `Cancelled: parallel tool call ${desc} errored`
: 'Cancelled: parallel tool call errored'
return createUserMessage({
content: [
{
type: 'tool_result',
content: `<tool_use_error>${msg}</tool_use_error>`,
is_error: true,
tool_use_id: toolUseId,
},
],
toolUseResult: msg,
sourceToolAssistantUUID: assistantMessage.uuid,
})
}
/**
* Determine why a tool should be cancelled.
*/
private getAbortReason(
tool: TrackedTool,
): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
if (this.discarded) {
return 'streaming_fallback'
}
if (this.hasErrored) {
return 'sibling_error'
}
if (this.toolUseContext.abortController.signal.aborted) {
// 'interrupt' means the user typed a new message while tools were
// running. Only cancel tools whose interruptBehavior is 'cancel';
// 'block' tools shouldn't reach here (abort isn't fired).
if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
return this.getToolInterruptBehavior(tool) === 'cancel'
? 'user_interrupted'
: null
}
return 'user_interrupted'
}
return null
}
private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
const definition = findToolByName(this.toolDefinitions, tool.block.name)
if (!definition?.interruptBehavior) return 'block'
try {
return definition.interruptBehavior()
} catch {
return 'block'
}
}
private getToolDescription(tool: TrackedTool): string {
const input = tool.block.input as Record<string, unknown> | undefined
const summary = input?.command ?? input?.file_path ?? input?.pattern ?? ''
if (typeof summary === 'string' && summary.length > 0) {
const truncated =
summary.length > 40 ? summary.slice(0, 40) + '\u2026' : summary
return `${tool.block.name}(${truncated})`
}
return tool.block.name
}
private updateInterruptibleState(): void {
const executing = this.tools.filter(t => t.status === 'executing')
this.toolUseContext.setHasInterruptibleToolInProgress?.(
executing.length > 0 &&
executing.every(t => this.getToolInterruptBehavior(t) === 'cancel'),
)
}
/**
* Execute a tool and collect its results
*/
private async executeTool(tool: TrackedTool): Promise<void> {
tool.status = 'executing'
this.toolUseContext.setInProgressToolUseIDs(prev =>
new Set(prev).add(tool.id),
)
this.updateInterruptibleState()
const messages: Message[] = []
const contextModifiers: Array<(context: ToolUseContext) => ToolUseContext> =
[]
const collectResults = async () => {
// If already aborted (by error or user), generate synthetic error block instead of running the tool
const initialAbortReason = this.getAbortReason(tool)
if (initialAbortReason) {
messages.push(
this.createSyntheticErrorMessage(
tool.id,
initialAbortReason,
tool.assistantMessage,
),
)
tool.results = messages
tool.contextModifiers = contextModifiers
tool.status = 'completed'
this.updateInterruptibleState()
return
}
// Per-tool child controller. Lets siblingAbortController kill running
// subprocesses (Bash spawns listen to this signal) when a Bash error
// cascades. Permission-dialog rejection also aborts this controller
// (PermissionContext.ts cancelAndAbort) — that abort must bubble up to
// the query controller so the query loop's post-tool abort check ends
// the turn. Without bubble-up, ExitPlanMode "clear context + auto"
// sends REJECT_MESSAGE to the model instead of aborting (#21056 regression).
const toolAbortController = createChildAbortController(
this.siblingAbortController,
)
toolAbortController.signal.addEventListener(
'abort',
() => {
if (
toolAbortController.signal.reason !== 'sibling_error' &&
!this.toolUseContext.abortController.signal.aborted &&
!this.discarded
) {
this.toolUseContext.abortController.abort(
toolAbortController.signal.reason,
)
}
},
{ once: true },
)
const generator = runToolUse(
tool.block,
tool.assistantMessage,
this.canUseTool,
{ ...this.toolUseContext, abortController: toolAbortController },
)
// Track if this specific tool has produced an error result.
// This prevents the tool from receiving a duplicate "sibling error"
// message when it is the one that caused the error.
let thisToolErrored = false
for await (const update of generator) {
// Check if we were aborted by a sibling tool error or user interruption.
// Only add the synthetic error if THIS tool didn't produce the error.
const abortReason = this.getAbortReason(tool)
if (abortReason && !thisToolErrored) {
messages.push(
this.createSyntheticErrorMessage(
tool.id,
abortReason,
tool.assistantMessage,
),
)
break
}
const isErrorResult =
update.message.type === 'user' &&
Array.isArray(update.message.message!.content) &&
update.message.message!.content.some(
_ => _.type === 'tool_result' && _.is_error === true,
)
if (isErrorResult) {
thisToolErrored = true
// Only Bash errors cancel siblings. Bash commands often have implicit
// dependency chains (e.g. mkdir fails → subsequent commands pointless).
// Read/WebFetch/etc are independent — one failure shouldn't nuke the rest.
if (tool.block.name === BASH_TOOL_NAME) {
this.hasErrored = true
this.erroredToolDescription = this.getToolDescription(tool)
this.siblingAbortController.abort('sibling_error')
}
}
if (update.message) {
// Progress messages go to pendingProgress for immediate yielding
if (update.message.type === 'progress') {
tool.pendingProgress.push(update.message)
// Signal that progress is available
if (this.progressAvailableResolve) {
this.progressAvailableResolve()
this.progressAvailableResolve = undefined
}
} else {
messages.push(update.message)
}
}
if (update.contextModifier) {
contextModifiers.push(update.contextModifier.modifyContext)
}
}
tool.results = messages
tool.contextModifiers = contextModifiers
tool.status = 'completed'
this.updateInterruptibleState()
// NOTE: we currently don't support context modifiers for concurrent
// tools. None are actively being used, but if we want to use
// them in concurrent tools, we need to support that here.
if (!tool.isConcurrencySafe && contextModifiers.length > 0) {
for (const modifier of contextModifiers) {
this.toolUseContext = modifier(this.toolUseContext)
}
}
}
const promise = collectResults()
tool.promise = promise
// Process more queue when done
void promise.finally(() => {
void this.processQueue()
})
}
/**
* Get any completed results that haven't been yielded yet (non-blocking)
* Maintains order where necessary
* Also yields any pending progress messages immediately
*/
*getCompletedResults(): Generator<MessageUpdate, void> {
if (this.discarded) {
return
}
for (const tool of this.tools) {
// Always yield pending progress messages immediately, regardless of tool status
while (tool.pendingProgress.length > 0) {
const progressMessage = tool.pendingProgress.shift()!
yield { message: progressMessage, newContext: this.toolUseContext }
}
if (tool.status === 'yielded') {
continue
}
if (tool.status === 'completed' && tool.results) {
tool.status = 'yielded'
for (const message of tool.results) {
yield { message, newContext: this.toolUseContext }
}
markToolUseAsComplete(this.toolUseContext, tool.id)
} else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
break
}
}
}
/**
* Check if any tool has pending progress messages
*/
private hasPendingProgress(): boolean {
return this.tools.some(t => t.pendingProgress.length > 0)
}
/**
* Wait for remaining tools and yield their results as they complete
* Also yields progress messages as they become available
*/
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
if (this.discarded) {
return
}
while (this.hasUnfinishedTools()) {
await this.processQueue()
for (const result of this.getCompletedResults()) {
yield result
}
// If we still have executing tools but nothing completed, wait for any to complete
// OR for progress to become available
if (
this.hasExecutingTools() &&
!this.hasCompletedResults() &&
!this.hasPendingProgress()
) {
const executingPromises = this.tools
.filter(t => t.status === 'executing' && t.promise)
.map(t => t.promise!)
// Also wait for progress to become available
const progressPromise = new Promise<void>(resolve => {
this.progressAvailableResolve = resolve
})
if (executingPromises.length > 0) {
await Promise.race([...executingPromises, progressPromise])
}
}
}
for (const result of this.getCompletedResults()) {
yield result
}
endToolBatchSpan(this.turnSpan)
this.turnSpan = null
}
/**
* Check if there are any completed results ready to yield
*/
private hasCompletedResults(): boolean {
return this.tools.some(t => t.status === 'completed')
}
/**
* Check if there are any tools still executing
*/
private hasExecutingTools(): boolean {
return this.tools.some(t => t.status === 'executing')
}
/**
* Check if there are any unfinished tools
*/
private hasUnfinishedTools(): boolean {
return this.tools.some(t => t.status !== 'yielded')
}
/**
* Get the current tool use context (may have been modified by context modifiers)
*/
getUpdatedContext(): ToolUseContext {
return this.toolUseContext
}
}
function markToolUseAsComplete(
toolUseContext: ToolUseContext,
toolUseID: string,
) {
toolUseContext.setInProgressToolUseIDs(prev => {
const next = new Set(prev)
next.delete(toolUseID)
return next
})
}