mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-25 09:35:52 +00:00
refactor: 拆分 3 个过大 ACP 文件为模块化子文件(每个 <500 行)
通过 4 阶段 workflow(分析 → 计划 → 重构 → 验证)将 3 个超大的 ACP 源文件拆分为 28 个模块化子文件,每个均严格小于 500 行,且完整保留 所有公共 API(barrel 模式重导出)。 变更概要: - packages/acp-link/src/server.ts: 1800 → 20 行(barrel),新增 11 个子模块 (server/types、payload-decode、permission-mode、runtime-state、dispatch、 handlers-agent、handlers-session、acp-client、client-send、start-server、 testing-internals) - src/services/acp/agent.ts: 1297 → 33 行(barrel),新增 9 个子模块 (agent/AcpAgent、sessionTypes、permissionMode、configOptions、promptQueue、 internalAccessors、createSessionMethod、sessionLifecycle、promptFlow) - src/services/acp/bridge.ts: 1516 → 29 行(barrel),新增 8 个子模块 (bridge/types、paths、contentBlocks、toolInfo、toolResults、modelUsage、 notifications、forwarding) 验证: - bun run precheck 全通过(typecheck + lint + 5851 tests) - ACP service tests: 176 pass / 0 fail - ACP link tests: 47 pass / 0 fail - 所有外部消费者(entry.ts、permissions.ts、__tests__/)的 import 路径不变 - 测试文件零修改 迁移计划详见 docs/acp-refactor-plan.md。 Co-Authored-By: glm-5.2 <zai-org@claude-code-best.win>
This commit is contained in:
402
src/services/acp/bridge/forwarding.ts
Normal file
402
src/services/acp/bridge/forwarding.ts
Normal file
@@ -0,0 +1,402 @@
|
||||
// Stream replay + forwarding loop.
|
||||
//
|
||||
// `nextSdkMessageOrAbort` races an async generator against an AbortSignal.
|
||||
// `forwardSessionUpdates` consumes the SDKMessage stream and dispatches into
|
||||
// the notification converters, accumulating usage and mapping stop reasons.
|
||||
// `replayHistoryMessages` replays stored user/assistant history through
|
||||
// `toAcpNotifications`.
|
||||
import type {
|
||||
AgentSideConnection,
|
||||
ClientCapabilities,
|
||||
StopReason,
|
||||
} from '@agentclientprotocol/sdk'
|
||||
import type { SDKMessage } from '../../../entrypoints/sdk/coreTypes.generated.js'
|
||||
import type { BridgeSDKMessage, SessionUsage, ToolUseCache } from './types.js'
|
||||
import {
|
||||
assistantMessageToAcpNotifications,
|
||||
streamEventToAcpNotifications,
|
||||
toAcpNotifications,
|
||||
} from './notifications.js'
|
||||
import { getMatchingModelUsage } from './modelUsage.js'
|
||||
|
||||
// Top-level const alias retained from the original module. Only the
|
||||
// forwardSessionUpdates default branch and replayHistoryMessages reference it.
|
||||
const logger: { debug: (...args: unknown[]) => void } = console
|
||||
|
||||
export function nextSdkMessageOrAbort(
|
||||
sdkMessages: AsyncGenerator<SDKMessage, void, unknown>,
|
||||
abortSignal: AbortSignal,
|
||||
): Promise<IteratorResult<SDKMessage, void>> {
|
||||
if (abortSignal.aborted) {
|
||||
return Promise.resolve({ done: true, value: undefined })
|
||||
}
|
||||
let abortHandler: (() => void) | undefined
|
||||
const abortPromise = new Promise<IteratorResult<SDKMessage, void>>(
|
||||
resolve => {
|
||||
abortHandler = () => resolve({ done: true, value: undefined })
|
||||
abortSignal.addEventListener('abort', abortHandler, { once: true })
|
||||
},
|
||||
)
|
||||
return Promise.race([sdkMessages.next(), abortPromise]).finally(() => {
|
||||
if (abortHandler) {
|
||||
abortSignal.removeEventListener('abort', abortHandler)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ── Main forwarding function ──────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Iterates SDKMessages from QueryEngine.submitMessage(), converts each
|
||||
* to ACP SessionUpdate notifications, and sends them via conn.sessionUpdate().
|
||||
* Returns the final StopReason and accumulated usage for the prompt turn.
|
||||
*/
|
||||
export async function forwardSessionUpdates(
|
||||
sessionId: string,
|
||||
sdkMessages: AsyncGenerator<SDKMessage, void, unknown>,
|
||||
conn: AgentSideConnection,
|
||||
abortSignal: AbortSignal,
|
||||
toolUseCache: ToolUseCache,
|
||||
clientCapabilities?: ClientCapabilities,
|
||||
cwd?: string,
|
||||
isCancelled?: () => boolean,
|
||||
): Promise<{ stopReason: StopReason; usage?: SessionUsage }> {
|
||||
let stopReason: StopReason = 'end_turn'
|
||||
const accumulatedUsage: SessionUsage = {
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
cachedReadTokens: 0,
|
||||
cachedWriteTokens: 0,
|
||||
}
|
||||
|
||||
// Track last assistant usage/model for context window size computation
|
||||
let lastAssistantTotalUsage: number | null = null
|
||||
let lastAssistantModel: string | null = null
|
||||
let lastContextWindowSize = 200000
|
||||
let streamingActive = false
|
||||
|
||||
try {
|
||||
while (!abortSignal.aborted) {
|
||||
// Race the next message against the abort signal so we unblock
|
||||
// immediately when cancelled, even if the generator is waiting for
|
||||
// a slow API response.
|
||||
const nextResult = await nextSdkMessageOrAbort(sdkMessages, abortSignal)
|
||||
if (nextResult.done || abortSignal.aborted) break
|
||||
const rawMsg = nextResult.value
|
||||
if (rawMsg == null) continue
|
||||
const msg = rawMsg as BridgeSDKMessage
|
||||
|
||||
switch (msg.type) {
|
||||
// ── System messages ────────────────────────────────────────
|
||||
case 'system': {
|
||||
const subtype = msg.subtype
|
||||
|
||||
if (subtype === 'compact_boundary') {
|
||||
// Reset assistant usage tracking after compaction
|
||||
lastAssistantTotalUsage = 0
|
||||
// NOTE: usage_update is an UNSTABLE SessionUpdate discriminator (not in
|
||||
// stable v1 schema). Token/cost info has no v1-stable carrier; we drop
|
||||
// it from session/update and rely on PromptResponse._meta for clients
|
||||
// that need it (see audit §4.1).
|
||||
await conn.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: 'agent_message_chunk',
|
||||
content: { type: 'text', text: '\n\nCompacting completed.' },
|
||||
},
|
||||
})
|
||||
}
|
||||
// api_retry, local_command_output — skip for now
|
||||
break
|
||||
}
|
||||
|
||||
// ── Result messages ────────────────────────────────────────
|
||||
case 'result': {
|
||||
const usage = msg.usage
|
||||
|
||||
if (usage) {
|
||||
accumulatedUsage.inputTokens += usage.input_tokens ?? 0
|
||||
accumulatedUsage.outputTokens += usage.output_tokens ?? 0
|
||||
accumulatedUsage.cachedReadTokens +=
|
||||
usage.cache_read_input_tokens ?? 0
|
||||
accumulatedUsage.cachedWriteTokens +=
|
||||
usage.cache_creation_input_tokens ?? 0
|
||||
}
|
||||
|
||||
// Resolve context window size from modelUsage via prefix matching
|
||||
const modelUsage = msg.modelUsage
|
||||
if (modelUsage && lastAssistantModel) {
|
||||
const match = getMatchingModelUsage(modelUsage, lastAssistantModel)
|
||||
if (match?.contextWindow) {
|
||||
lastContextWindowSize = match.contextWindow
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: usage_update was removed — it is an UNSTABLE SessionUpdate
|
||||
// discriminator not present in the stable v1 schema (audit §4.1). Token
|
||||
// and cost information is returned via PromptResponse._meta.claudeCode.usage
|
||||
// instead.
|
||||
|
||||
// Determine stop reason
|
||||
const subtype = msg.subtype
|
||||
const isError = msg.is_error
|
||||
|
||||
if (abortSignal.aborted) {
|
||||
stopReason = 'cancelled'
|
||||
break
|
||||
}
|
||||
|
||||
switch (subtype) {
|
||||
case 'success': {
|
||||
// Map Anthropic stop_reason to ACP StopReason. Branches are mutually
|
||||
// exclusive so a max_tokens termination that is also flagged isError
|
||||
// no longer silently flips to end_turn (audit §3.3, §3.4). refusal
|
||||
// (safety refusal) is a first-class ACP stop reason that must surface
|
||||
// to the client instead of being misreported as end_turn.
|
||||
const r = msg.stop_reason
|
||||
if (r === 'max_tokens') stopReason = 'max_tokens'
|
||||
else if (r === 'refusal') stopReason = 'refusal'
|
||||
else stopReason = 'end_turn'
|
||||
if (isError) stopReason = 'end_turn'
|
||||
break
|
||||
}
|
||||
case 'error_during_execution': {
|
||||
// Mutually exclusive: max_tokens wins when reported, otherwise the
|
||||
// error path falls back to end_turn. Avoids the prior two-if
|
||||
// sequence that overwrote max_tokens with end_turn (audit §3.4).
|
||||
if (msg.stop_reason === 'max_tokens') {
|
||||
stopReason = 'max_tokens'
|
||||
} else {
|
||||
stopReason = 'end_turn'
|
||||
}
|
||||
break
|
||||
}
|
||||
case 'error_max_budget_usd':
|
||||
case 'error_max_turns':
|
||||
case 'error_max_structured_output_retries':
|
||||
if (isError) {
|
||||
stopReason = 'max_turn_requests'
|
||||
} else {
|
||||
stopReason = 'max_turn_requests'
|
||||
}
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// ── Stream events ──────────────────────────────────────────
|
||||
case 'stream_event': {
|
||||
const notifications = streamEventToAcpNotifications(
|
||||
msg,
|
||||
sessionId,
|
||||
toolUseCache,
|
||||
conn,
|
||||
{
|
||||
clientCapabilities,
|
||||
cwd,
|
||||
},
|
||||
)
|
||||
for (const notification of notifications) {
|
||||
await conn.sessionUpdate(notification)
|
||||
}
|
||||
streamingActive = true
|
||||
break
|
||||
}
|
||||
|
||||
// ── Assistant messages ─────────────────────────────────────
|
||||
case 'assistant': {
|
||||
// Track last assistant total usage for context window computation
|
||||
// (only for top-level messages, not subagents)
|
||||
const assistantMsg = msg.message
|
||||
const parentToolUseId = msg.parent_tool_use_id
|
||||
if (assistantMsg?.usage && parentToolUseId === null) {
|
||||
const usage = assistantMsg.usage
|
||||
lastAssistantTotalUsage =
|
||||
(typeof usage.input_tokens === 'number'
|
||||
? usage.input_tokens
|
||||
: 0) +
|
||||
(typeof usage.output_tokens === 'number'
|
||||
? usage.output_tokens
|
||||
: 0) +
|
||||
(typeof usage.cache_read_input_tokens === 'number'
|
||||
? usage.cache_read_input_tokens
|
||||
: 0) +
|
||||
(typeof usage.cache_creation_input_tokens === 'number'
|
||||
? usage.cache_creation_input_tokens
|
||||
: 0)
|
||||
}
|
||||
// Track the current top-level model for context window size lookup
|
||||
if (
|
||||
parentToolUseId === null &&
|
||||
assistantMsg?.model &&
|
||||
assistantMsg.model !== '<synthetic>'
|
||||
) {
|
||||
lastAssistantModel = assistantMsg.model
|
||||
}
|
||||
|
||||
const notifications = assistantMessageToAcpNotifications(
|
||||
msg,
|
||||
sessionId,
|
||||
toolUseCache,
|
||||
conn,
|
||||
{
|
||||
clientCapabilities,
|
||||
cwd,
|
||||
parentToolUseId,
|
||||
streamingActive,
|
||||
},
|
||||
)
|
||||
for (const notification of notifications) {
|
||||
await conn.sessionUpdate(notification)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// ── User messages ──────────────────────────────────────────
|
||||
case 'user': {
|
||||
// In ACP mode, user messages from replay/synthetic are typically skipped
|
||||
// The client already knows what the user sent
|
||||
break
|
||||
}
|
||||
|
||||
// ── Progress messages ──────────────────────────────────────
|
||||
case 'progress': {
|
||||
const progressData = msg.data
|
||||
if (!progressData) break
|
||||
|
||||
// Handle agent/skill subagent progress
|
||||
const progressType = progressData.type
|
||||
if (
|
||||
progressType === 'agent_progress' ||
|
||||
progressType === 'skill_progress'
|
||||
) {
|
||||
const progressMessage = progressData.message
|
||||
if (progressMessage) {
|
||||
const content = progressMessage.content as
|
||||
| Array<Record<string, unknown>>
|
||||
| undefined
|
||||
if (content) {
|
||||
for (const block of content) {
|
||||
if (block.type === 'text') {
|
||||
await conn.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: 'agent_message_chunk',
|
||||
content: { type: 'text', text: block.text as string },
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// ── Tool use summary ───────────────────────────────────────
|
||||
case 'tool_use_summary': {
|
||||
// Skip for now — not critical for basic functionality
|
||||
break
|
||||
}
|
||||
|
||||
// ── Attachment messages ────────────────────────────────────
|
||||
case 'attachment': {
|
||||
// Skip — handled by QueryEngine internally
|
||||
break
|
||||
}
|
||||
|
||||
// ── Compact boundary ───────────────────────────────────────
|
||||
case 'compact_boundary': {
|
||||
lastAssistantTotalUsage = 0
|
||||
// NOTE: usage_update removed — UNSTABLE discriminator, not in v1 stable
|
||||
// schema (audit §4.1). Token info flows through PromptResponse._meta.
|
||||
await conn.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: 'agent_message_chunk',
|
||||
content: { type: 'text', text: '\n\nCompacting completed.' },
|
||||
},
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
logger.debug('Ignoring unknown SDK message type')
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If we exited the loop because abort fired or cancel was requested, return cancelled
|
||||
if (abortSignal.aborted || isCancelled?.()) {
|
||||
return { stopReason: 'cancelled', usage: accumulatedUsage }
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
if (abortSignal.aborted) {
|
||||
return { stopReason: 'cancelled', usage: accumulatedUsage }
|
||||
}
|
||||
throw err
|
||||
}
|
||||
|
||||
return { stopReason, usage: accumulatedUsage }
|
||||
}
|
||||
|
||||
// ── History replay ──────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Replays conversation history messages to the ACP client as session updates.
|
||||
* Used when resuming/loading a session to show the client the previous conversation.
|
||||
*/
|
||||
export async function replayHistoryMessages(
|
||||
sessionId: string,
|
||||
messages: Array<Record<string, unknown>>,
|
||||
conn: AgentSideConnection,
|
||||
toolUseCache: ToolUseCache,
|
||||
clientCapabilities?: ClientCapabilities,
|
||||
cwd?: string,
|
||||
): Promise<void> {
|
||||
for (const rawMsg of messages) {
|
||||
const msg = rawMsg as BridgeSDKMessage
|
||||
// Skip non-conversation messages
|
||||
if (msg.type !== 'user' && msg.type !== 'assistant') {
|
||||
logger.debug('Ignoring unknown SDK message type')
|
||||
continue
|
||||
}
|
||||
// Skip meta messages (synthetic continuation prompts)
|
||||
if (msg.isMeta === true) continue
|
||||
|
||||
const messageData = msg.message
|
||||
const content = messageData?.content
|
||||
if (!content) continue
|
||||
|
||||
const role: 'assistant' | 'user' =
|
||||
msg.type === 'assistant' ? 'assistant' : 'user'
|
||||
|
||||
if (typeof content === 'string') {
|
||||
if (!content.trim()) continue
|
||||
await conn.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate:
|
||||
role === 'assistant' ? 'agent_message_chunk' : 'user_message_chunk',
|
||||
content: { type: 'text', text: content },
|
||||
},
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if (Array.isArray(content)) {
|
||||
const notifications = toAcpNotifications(
|
||||
content as Array<Record<string, unknown>>,
|
||||
role,
|
||||
sessionId,
|
||||
toolUseCache,
|
||||
conn,
|
||||
undefined,
|
||||
{ clientCapabilities, cwd },
|
||||
)
|
||||
for (const notification of notifications) {
|
||||
await conn.sessionUpdate(notification)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user