From 0103f45109ca46d6f2ba1cd4276e1bde8ff6279c Mon Sep 17 00:00:00 2001 From: claude-code-best Date: Fri, 19 Jun 2026 17:13:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=20ACP=20session/dele?= =?UTF-8?q?te=20+=20message-id=20=E4=B8=A4=E4=B8=AA=20UNSTABLE=20RFD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit session/delete(rfds/session-delete.mdx): - sessionCapabilities.delete: {} 能力广告(类型增强写入,SDK 0.19.0 早于该 RFD) - extMethod 钩子路由 session/delete → unstable_deleteSession - 硬删除 .jsonl 文件,ENOENT 视为成功(幂等) - 未知方法抛 RequestError.methodNotFound(JSON-RPC -32601) message-id(rfds/message-id.mdx): - agent_message_chunk / user_message_chunk / agent_thought_chunk 携带 messageId - forwardSessionUpdates 维护 currentAgentMessageId,lazy 生成 UUID - streaming text/thinking chunks 与最终 assistant message 共享同一 ID - replayHistoryMessages per-message 生成 UUID - PromptRequest.messageId → PromptResponse.userMessageId 回显 - tool_call / plan / subagent 不带 messageId(spec 仅规定 chunk 类型) 测试:ACP service 从 176 → 191 (+15) - bridge.test.ts: +9 个 message-id 测试 - agent.test.ts: +6 个 session/delete + userMessageId 测试 - 总测试 5851 → 5866,全通过 审计文档:新增附录 A.2 记录两个 UNSTABLE RFD 实现状态 Co-Authored-By: glm-5.2 --- docs/acp-compliance-audit.md | 25 ++ src/services/acp/__tests__/agent.test.ts | 102 ++++++++ src/services/acp/__tests__/bridge.test.ts | 276 ++++++++++++++++++++++ src/services/acp/agent/AcpAgent.ts | 111 ++++++--- src/services/acp/agent/promptFlow.ts | 12 +- src/services/acp/bridge/forwarding.ts | 53 ++++- src/services/acp/bridge/notifications.ts | 12 + 7 files changed, 560 insertions(+), 31 deletions(-) diff --git a/docs/acp-compliance-audit.md b/docs/acp-compliance-audit.md index ce4e667b0..646369c87 100644 --- a/docs/acp-compliance-audit.md +++ b/docs/acp-compliance-audit.md @@ -804,6 +804,31 @@ | unstable_setSessionModel | unstable_setSessionModel | UNSTABLE | 保留 | | session/update | sessionUpdate (notification) | stable | 保留(usage_update 为 UNSTABLE 但为 interop 保留,见 §4.1) | +## 附录 A.2: UNSTABLE RFD 实现记录(2026-06-19) + +下列 UNSTABLE RFD 不属于严格 v1 合规范围,但为提升 interop 与客户端 UX 已主动实现。所有字段均已存在于 SDK 0.19.0 bundled schema 的 unstable 区段,主要 ACP 客户端(Zed / Cursor / RCS Web UI)均实现。 + +### A.2.1 session/delete(rfds/session-delete.mdx)✅ 已实现 + +- **能力广告**: `sessionCapabilities.delete: {}`(通过类型增强写入,因 SDK 0.19.0 的 SessionCapabilities 类型早于该 RFD)。 +- **方法路由**: SDK 0.19.0 的方法分发器 `default` 分支调用 `agent.extMethod(method, params)`,因此 `session/delete` 通过 extMethod 钩子路由到 `unstable_deleteSession`。 +- **语义**: 硬删除(unlink `~/.claude/projects//.jsonl`)。spec 允许 soft/hard delete,选 hard delete 简化实现。 +- **幂等性**: 删不存在的 session 也成功(ENOENT 视为成功)。 +- **未知方法**: extMethod 对未识别方法抛 `RequestError.methodNotFound(method)`(JSON-RPC -32601)。 +- **测试覆盖**: 6 个测试用例(能力广播 / extMethod 路由 / 幂等 / 内存清理 / 缺 sessionId 拒绝 / 未知方法拒绝)。 + +### A.2.2 message-id(rfds/message-id.mdx)✅ 已实现 + +- **覆盖范围**: `agent_message_chunk` / `user_message_chunk` / `agent_thought_chunk` 三个 chunk update 携带 `messageId`(UUID)。同消息的所有 chunks 共享 ID,不同消息 ID 不同。 +- **不覆盖**: `tool_call` / `tool_call_update` / `plan` 不携带 messageId(spec 仅规定 chunk 类型)。 +- **生成策略**: + - **Assistant 消息**: 在 `forwardSessionUpdates` 中维护 `currentAgentMessageId: string | null`,在 `stream_event` 或 `assistant` SDK 消息(`parent_tool_use_id === null`)首次出现时 lazy 生成 UUID;assistant 消息处理完后 reset 为 null,下一条触发新 UUID。所有 chunks(包括 streaming text/thinking 和最终 assistant message 中的 text/image)共享同一个 ID。 + - **Subagent 消息**(`parent_tool_use_id !== null`): 不追踪 messageId,因 spec 中嵌套 tool 消息不属于顶层 chunk 流。 + - **历史重放**(`replayHistoryMessages`): 每条 replayed user/assistant 消息独立生成 UUID(JSONL 不保留原始 ACP messageId)。 +- **格式**: `crypto.randomUUID()`(不用 Anthropic 的 `message.id` —— 它是 `msg_xxx` 格式,不符合 spec 要求的 UUID)。 +- **PromptRequest.messageId → PromptResponse.userMessageId**: 仅当客户端传入 `params.messageId` 时回显(spec 用词为 MAY 自行生成 → 取保守做法,不自行生成)。 +- **测试覆盖**: 7 个测试用例(assistant chunk / 多消息不同 ID / streaming 共享 ID / tool_call 不带 ID / subagent 不带 ID / replay per-message UUID / replay 字符串内容带 ID)+ 2 个 prompt 回显测试(echo / omit)。 + ## 附录 B: 不修复项及理由 以下 finding 出于技术权衡或非合规范围,暂不修复: diff --git a/src/services/acp/__tests__/agent.test.ts b/src/services/acp/__tests__/agent.test.ts index 75081d7b8..706010d5d 100644 --- a/src/services/acp/__tests__/agent.test.ts +++ b/src/services/acp/__tests__/agent.test.ts @@ -297,6 +297,16 @@ describe('AcpAgent', () => { (res.agentCapabilities?._meta as any)?.claudeCode?.forkSession, ).toBe(true) }) + + test('advertises session/delete capability per session-delete RFD', async () => { + // UNSTABLE per session-delete.mdx: capability-gated session/delete. + // SDK 0.19.0's SessionCapabilities type predates this field; we advertise + // it via type augmentation so clients implementing the RFD can find it. + const agent = new AcpAgent(makeConn()) + const res = await agent.initialize({} as any) + const caps = res.agentCapabilities?.sessionCapabilities as any + expect(caps.delete).toEqual({}) + }) }) describe('authenticate', () => { @@ -634,6 +644,54 @@ describe('AcpAgent', () => { }) }) + describe('deleteSession (session/delete via extMethod)', () => { + test('extMethod routes session/delete to unstable_deleteSession', async () => { + const agent = new AcpAgent(makeConn()) + const result = await agent.extMethod('session/delete', { + sessionId: 'nonexistent-sid-for-delete-test', + }) + // Idempotent: returns empty object even when session doesn't exist + expect(result).toEqual({}) + }) + + test('rejects session/delete without sessionId', async () => { + const agent = new AcpAgent(makeConn()) + await expect(agent.extMethod('session/delete', {})).rejects.toThrow( + 'non-empty sessionId', + ) + }) + + test('rejects unknown methods with methodNotFound-style error', async () => { + const agent = new AcpAgent(makeConn()) + await expect( + agent.extMethod('totally/unknown/method', {}), + ).rejects.toThrow() + }) + + test('unstable_deleteSession is idempotent for missing session', async () => { + const agent = new AcpAgent(makeConn()) + // No file exists for this ID; both calls must succeed (per spec §Semantics) + const r1 = await agent.unstable_deleteSession({ + sessionId: 'definitely-missing-id-1', + }) + const r2 = await agent.unstable_deleteSession({ + sessionId: 'definitely-missing-id-2', + }) + expect(r1).toEqual({}) + expect(r2).toEqual({}) + }) + + test('unstable_deleteSession tears down active in-memory session', async () => { + const agent = new AcpAgent(makeConn()) + const { sessionId } = await agent.newSession({ cwd: '/tmp' } as any) + expect(agent.sessions.has(sessionId)).toBe(true) + // deleteSession should remove the in-memory entry even though there's + // no on-disk file (newSession doesn't persist immediately in tests). + await agent.unstable_deleteSession({ sessionId }) + expect(agent.sessions.has(sessionId)).toBe(false) + }) + }) + describe('setSessionModel', () => { test('updates model on queryEngine', async () => { const agent = new AcpAgent(makeConn()) @@ -716,6 +774,50 @@ describe('AcpAgent', () => { }) }) + describe('prompt userMessageId echo (message-id RFD)', () => { + test('echoes client-supplied messageId as userMessageId', async () => { + // Per rfds/message-id.mdx: when the client provides a `messageId` on + // PromptRequest, the Agent echoes it back as `userMessageId`. + const agent = new AcpAgent(makeConn()) + const { sessionId } = await agent.newSession({ cwd: '/tmp' } as any) + ;(forwardSessionUpdates as ReturnType).mockResolvedValueOnce( + { + stopReason: 'end_turn', + usage: { + inputTokens: 10, + outputTokens: 5, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + }, + ) + const clientMessageId = '11111111-2222-3333-4444-555555555555' + const res = await agent.prompt({ + sessionId, + prompt: [{ type: 'text', text: 'hello' }], + messageId: clientMessageId, + } as any) + expect((res as any).userMessageId).toBe(clientMessageId) + }) + + test('omits userMessageId when client does not supply messageId', async () => { + // Per rfds/message-id.mdx: agent MAY self-generate; we take the + // conservative approach of staying silent when the client didn't ask. + const agent = new AcpAgent(makeConn()) + const { sessionId } = await agent.newSession({ cwd: '/tmp' } as any) + ;(forwardSessionUpdates as ReturnType).mockResolvedValueOnce( + { + stopReason: 'end_turn', + }, + ) + const res = await agent.prompt({ + sessionId, + prompt: [{ type: 'text', text: 'hello' }], + } as any) + expect((res as any).userMessageId).toBeUndefined() + }) + }) + describe('prompt error handling', () => { test('returns cancelled when session was cancelled during prompt', async () => { const agent = new AcpAgent(makeConn()) diff --git a/src/services/acp/__tests__/bridge.test.ts b/src/services/acp/__tests__/bridge.test.ts index b6da315e8..76e82bd20 100644 --- a/src/services/acp/__tests__/bridge.test.ts +++ b/src/services/acp/__tests__/bridge.test.ts @@ -5,6 +5,7 @@ import { toolUpdateFromEditToolResponse, forwardSessionUpdates, nextSdkMessageOrAbort, + replayHistoryMessages, } from '../bridge.js' import { promptToQueryInput } from '../promptConversion.js' import { markdownEscape, toDisplayPath } from '../utils.js' @@ -1595,3 +1596,278 @@ describe('forwardSessionUpdates', () => { ).rejects.toThrow('stream exploded') }) }) + +// ── message-id (RFD) ────────────────────────────────────────────── +// +// Per rfds/message-id.mdx: agent_message_chunk / user_message_chunk / +// agent_thought_chunk MUST carry a `messageId` (UUID). All chunks of the +// same message share the ID; different messages get different IDs. tool_call +// and plan updates are out of scope and must NOT carry messageId. + +describe('forwardSessionUpdates — message-id (RFD)', () => { + test('attaches messageId to assistant text chunk (non-streaming)', async () => { + const conn = makeConn() + const msgs: SDKMessage[] = [ + { + type: 'assistant', + parent_tool_use_id: null, + message: { + content: [{ type: 'text', text: 'Hello!' }], + role: 'assistant', + }, + } as unknown as SDKMessage, + ] + await forwardSessionUpdates( + 's1', + makeStream(msgs), + conn, + new AbortController().signal, + {}, + ) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCall = calls.find( + (c: unknown[]) => + ((c[0] as Record>).update ?? {})[ + 'sessionUpdate' + ] === 'agent_message_chunk', + ) + expect(chunkCall).toBeDefined() + const update = (chunkCall![0] as { update: Record }).update + expect(typeof update.messageId).toBe('string') + // UUID format check (v4-ish, 36 chars with hyphens) + expect(update.messageId).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ) + }) + + test('different assistant messages get different messageIds', async () => { + const conn = makeConn() + const msgs: SDKMessage[] = [ + { + type: 'assistant', + parent_tool_use_id: null, + message: { + content: [{ type: 'text', text: 'First' }], + role: 'assistant', + }, + } as unknown as SDKMessage, + { + type: 'assistant', + parent_tool_use_id: null, + message: { + content: [{ type: 'text', text: 'Second' }], + role: 'assistant', + }, + } as unknown as SDKMessage, + ] + await forwardSessionUpdates( + 's1', + makeStream(msgs), + conn, + new AbortController().signal, + {}, + ) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCalls = calls.filter( + (c: unknown[]) => + ((c[0] as Record>).update ?? {})[ + 'sessionUpdate' + ] === 'agent_message_chunk', + ) + expect(chunkCalls.length).toBe(2) + const id1 = (chunkCalls[0][0] as { update: { messageId: string } }).update + .messageId + const id2 = (chunkCalls[1][0] as { update: { messageId: string } }).update + .messageId + expect(id1).not.toBe(id2) + }) + + test('streaming text + thinking chunks share the same messageId', async () => { + // stream_events for a single assistant message (text + thinking) must + // share one messageId, then the assistant message itself reuses it. + const conn = makeConn() + const msgs: SDKMessage[] = [ + { + type: 'stream_event', + parent_tool_use_id: null, + event: { + type: 'content_block_start', + content_block: { type: 'thinking', thinking: '' }, + }, + } as unknown as SDKMessage, + { + type: 'stream_event', + parent_tool_use_id: null, + event: { + type: 'content_block_delta', + delta: { type: 'thinking_delta', thinking: 'reasoning...' }, + }, + } as unknown as SDKMessage, + { + type: 'stream_event', + parent_tool_use_id: null, + event: { + type: 'content_block_start', + content_block: { type: 'text', text: '' }, + }, + } as unknown as SDKMessage, + { + type: 'stream_event', + parent_tool_use_id: null, + event: { + type: 'content_block_delta', + delta: { type: 'text_delta', text: 'Answer' }, + }, + } as unknown as SDKMessage, + { + type: 'assistant', + parent_tool_use_id: null, + message: { + content: [ + { type: 'thinking', thinking: 'reasoning...' }, + { type: 'text', text: 'Answer' }, + ], + role: 'assistant', + }, + } as unknown as SDKMessage, + ] + await forwardSessionUpdates( + 's1', + makeStream(msgs), + conn, + new AbortController().signal, + {}, + ) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCalls = calls + .map(c => (c[0] as { update: Record }).update) + .filter( + u => + u.sessionUpdate === 'agent_message_chunk' || + u.sessionUpdate === 'agent_thought_chunk', + ) + // streamingActive filters out the duplicate text/thinking from the + // final assistant message, so we only get the 4 streaming chunks here. + expect(chunkCalls.length).toBeGreaterThanOrEqual(4) + const ids = chunkCalls.map(u => u.messageId) + const uniqueIds = new Set(ids) + expect(uniqueIds.size).toBe(1) + expect(typeof ids[0]).toBe('string') + }) + + test('tool_call chunk does NOT carry messageId', async () => { + const conn = makeConn() + const msgs: SDKMessage[] = [ + { + type: 'assistant', + parent_tool_use_id: null, + message: { + content: [ + { + type: 'tool_use', + id: 'tu_mid', + name: 'Bash', + input: { command: 'ls' }, + }, + ], + role: 'assistant', + }, + } as unknown as SDKMessage, + ] + await forwardSessionUpdates( + 's1', + makeStream(msgs), + conn, + new AbortController().signal, + {}, + ) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const toolCall = calls + .map(c => (c[0] as { update: Record }).update) + .find(u => u.sessionUpdate === 'tool_call') + expect(toolCall).toBeDefined() + expect(toolCall!.messageId).toBeUndefined() + }) + + test('subagent stream_events do not carry messageId (parent_tool_use_id !== null)', async () => { + // Subagent messages are nested inside a tool call; per our scope decision + // we only track top-level messageIds, so subagent chunks must NOT carry one. + const conn = makeConn() + const msgs: SDKMessage[] = [ + { + type: 'stream_event', + parent_tool_use_id: 'tu_subagent', + event: { + type: 'content_block_delta', + delta: { type: 'text_delta', text: 'subagent text' }, + }, + } as unknown as SDKMessage, + ] + await forwardSessionUpdates( + 's1', + makeStream(msgs), + conn, + new AbortController().signal, + {}, + ) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCall = calls + .map(c => (c[0] as { update: Record }).update) + .find(u => u.sessionUpdate === 'agent_message_chunk') + expect(chunkCall).toBeDefined() + expect(chunkCall!.messageId).toBeUndefined() + }) +}) + +// ── replayHistoryMessages — message-id (RFD) ───────────────────── + +describe('replayHistoryMessages — message-id (RFD)', () => { + test('each replayed message gets its own messageId', async () => { + const conn = makeConn() + const messages: Array> = [ + { + type: 'user', + message: { content: [{ type: 'text', text: 'question' }] }, + }, + { + type: 'assistant', + message: { content: [{ type: 'text', text: 'answer' }] }, + }, + { + type: 'assistant', + message: { content: [{ type: 'text', text: 'follow-up' }] }, + }, + ] + await replayHistoryMessages('s1', messages, conn, {}, undefined, undefined) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCalls = calls + .map(c => (c[0] as { update: Record }).update) + .filter( + u => + u.sessionUpdate === 'agent_message_chunk' || + u.sessionUpdate === 'user_message_chunk', + ) + expect(chunkCalls.length).toBe(3) + const ids = chunkCalls.map(u => u.messageId) + expect(ids.every(id => typeof id === 'string')).toBe(true) + // All three IDs should be distinct (one per message) + expect(new Set(ids).size).toBe(3) + }) + + test('replayed string-content message carries messageId', async () => { + const conn = makeConn() + const messages: Array> = [ + { + type: 'assistant', + message: { content: 'plain string reply' }, + }, + ] + await replayHistoryMessages('s1', messages, conn, {}, undefined, undefined) + const calls = (conn.sessionUpdate as ReturnType).mock.calls + const chunkCall = calls + .map(c => (c[0] as { update: Record }).update) + .find(u => u.sessionUpdate === 'agent_message_chunk') + expect(chunkCall).toBeDefined() + expect(typeof chunkCall!.messageId).toBe('string') + }) +}) diff --git a/src/services/acp/agent/AcpAgent.ts b/src/services/acp/agent/AcpAgent.ts index 453115fb4..c57350a74 100644 --- a/src/services/acp/agent/AcpAgent.ts +++ b/src/services/acp/agent/AcpAgent.ts @@ -14,39 +14,42 @@ * `./index.js` imports those side-effect modules so the prototype is fully * populated before any AcpAgent instance is constructed. */ -import type { - Agent, - AgentSideConnection, - InitializeRequest, - InitializeResponse, - AuthenticateRequest, - AuthenticateResponse, - NewSessionRequest, - NewSessionResponse, - PromptRequest, - PromptResponse, - CancelNotification, - LoadSessionRequest, - LoadSessionResponse, - ListSessionsRequest, - ListSessionsResponse, - ResumeSessionRequest, - ResumeSessionResponse, - ForkSessionRequest, - ForkSessionResponse, - CloseSessionRequest, - CloseSessionResponse, - SetSessionModeRequest, - SetSessionModeResponse, - SetSessionModelRequest, - SetSessionModelResponse, - SetSessionConfigOptionRequest, - SetSessionConfigOptionResponse, - ClientCapabilities, +import { + RequestError, + type Agent, + type AgentSideConnection, + type InitializeRequest, + type InitializeResponse, + type AuthenticateRequest, + type AuthenticateResponse, + type NewSessionRequest, + type NewSessionResponse, + type PromptRequest, + type PromptResponse, + type CancelNotification, + type LoadSessionRequest, + type LoadSessionResponse, + type ListSessionsRequest, + type ListSessionsResponse, + type ResumeSessionRequest, + type ResumeSessionResponse, + type ForkSessionRequest, + type ForkSessionResponse, + type CloseSessionRequest, + type CloseSessionResponse, + type SetSessionModeRequest, + type SetSessionModeResponse, + type SetSessionModelRequest, + type SetSessionModelResponse, + type SetSessionConfigOptionRequest, + type SetSessionConfigOptionResponse, + type ClientCapabilities, } from '@agentclientprotocol/sdk' +import { unlink } from 'node:fs/promises' import type { Message } from '../../../types/message.js' import { sanitizeTitle } from '../utils.js' import { listSessionsImpl } from '../../../utils/listSessionsImpl.js' +import { resolveSessionFilePath } from '../../../utils/sessionStoragePortable.js' import type { AcpSession } from './sessionTypes.js' // ── Agent class ─────────────────────────────────────────────────── @@ -123,6 +126,11 @@ export class AcpAgent implements Agent { list: {}, resume: {}, close: {}, + // UNSTABLE per session-delete.mdx: capability-gated session/delete. + // SDK 0.19.0's SessionCapabilities type predates this field — clients + // implementing the RFD read `sessionCapabilities.delete`, so we + // advertise it at the standard path via type augmentation. + ...({ delete: {} } as { delete: Record }), }, }, } @@ -236,6 +244,51 @@ export class AcpAgent implements Agent { return {} } + // ── deleteSession (UNSTABLE, routed via extMethod) ────────────── + + async unstable_deleteSession(params: { + sessionId: string + }): Promise> { + // Per session-delete.mdx §Semantics: idempotent — deleting a session + // that doesn't exist (or was already deleted) MUST succeed silently. + const resolved = await resolveSessionFilePath(params.sessionId) + if (resolved) { + try { + await unlink(resolved.filePath) + } catch (err) { + // ENOENT is fine — file was concurrently removed. Any other error + // (EACCES, EISDIR, ...) we propagate. + if ((err as NodeJS.ErrnoException).code !== 'ENOENT') throw err + } + } + // Tear down in-memory session if present (e.g., session was active in + // another connection). teardownSession is a no-op if not loaded. + if (this.sessions.has(params.sessionId)) { + await this.teardownSession(params.sessionId) + } + return {} + } + + // ── extMethod (UNSTABLE method dispatch) ──────────────────────── + + async extMethod( + method: string, + params: Record, + ): Promise> { + // SDK 0.19.0 routes unknown methods here (acp.js:139 default branch). + // We surface UNSTABLE capabilities that the SDK hasn't typed yet. + if (method === 'session/delete') { + const sessionId = params.sessionId + if (typeof sessionId !== 'string' || sessionId.length === 0) { + throw new Error('session/delete requires a non-empty sessionId') + } + return this.unstable_deleteSession({ sessionId }) + } + // Unknown method — surface as JSON-RPC methodNotFound so clients see a + // standard error code (-32601) rather than a generic internal error. + throw RequestError.methodNotFound(method) + } + // ── cancel ──────────────────────────────────────────────────── async cancel(params: CancelNotification): Promise { diff --git a/src/services/acp/agent/promptFlow.ts b/src/services/acp/agent/promptFlow.ts index 981ebfbdb..32419bea1 100644 --- a/src/services/acp/agent/promptFlow.ts +++ b/src/services/acp/agent/promptFlow.ts @@ -43,6 +43,12 @@ async function prompt( throw new Error(`Session ${params.sessionId} not found`) } + // Per message-id.mdx RFD: if the client supplied a `messageId` on the + // PromptRequest, echo it back as `userMessageId` to confirm receipt. + // We do not self-generate when omitted — the spec makes that optional and + // staying quiet avoids surfacing IDs the client didn't ask to track. + const userMessageId = params.messageId ?? undefined + // Extract text/image content from the prompt const promptInput = promptToQueryInput(params.prompt) @@ -134,6 +140,7 @@ async function prompt( return { stopReason, usage: usagePayload, + ...(userMessageId ? { userMessageId } : {}), _meta: { claudeCode: { usage: usagePayload, @@ -141,7 +148,10 @@ async function prompt( }, } } - return { stopReason } + return { + stopReason, + ...(userMessageId ? { userMessageId } : {}), + } } catch (err: unknown) { // Treat AbortError / cancellation-shaped errors as a turn cancellation // regardless of the session.cancelled flag, to close the race window diff --git a/src/services/acp/bridge/forwarding.ts b/src/services/acp/bridge/forwarding.ts index c3c2b2c41..cd154ecab 100644 --- a/src/services/acp/bridge/forwarding.ts +++ b/src/services/acp/bridge/forwarding.ts @@ -5,6 +5,7 @@ // the notification converters, accumulating usage and mapping stop reasons. // `replayHistoryMessages` replays stored user/assistant history through // `toAcpNotifications`. +import { randomUUID } from 'node:crypto' import type { AgentSideConnection, ClientCapabilities, @@ -75,6 +76,16 @@ export async function forwardSessionUpdates( let lastContextWindowSize = 200000 let streamingActive = false + // Per message-id.mdx RFD: UUID identifying the current top-level agent + // message. Lazily generated on the first sign of a new assistant message + // (stream_event or assistant SDK message with parent_tool_use_id === null) + // and reset to null after the assistant message completes. All chunks of + // the same message share this ID; different messages get different IDs. + // Subagent messages (parent_tool_use_id !== null) don't get a tracked ID + // — they're nested inside a tool call and don't surface as top-level + // agent_message_chunk / agent_thought_chunk in the spec sense. + let currentAgentMessageId: string | null = null + try { while (!abortSignal.aborted) { // Race the next message against the abort signal so we unblock @@ -197,6 +208,19 @@ export async function forwardSessionUpdates( // ── Stream events ────────────────────────────────────────── case 'stream_event': { + // Lazily generate messageId for top-level assistant messages on the + // first stream event. Subagent stream_events (parent_tool_use_id !== + // null) don't get a tracked ID — they're nested inside a tool call. + const streamParent = msg.parent_tool_use_id + if (streamParent === null && currentAgentMessageId === null) { + currentAgentMessageId = randomUUID() + } + // After the lazy-generate above, currentAgentMessageId is a string + // when streamParent === null. Capture it locally so TS narrows. + const streamMessageId = + streamParent === null + ? (currentAgentMessageId ?? undefined) + : undefined const notifications = streamEventToAcpNotifications( msg, sessionId, @@ -205,6 +229,7 @@ export async function forwardSessionUpdates( { clientCapabilities, cwd, + messageId: streamMessageId, }, ) for (const notification of notifications) { @@ -245,6 +270,18 @@ export async function forwardSessionUpdates( lastAssistantModel = assistantMsg.model } + // Reuse the messageId already generated for stream_events of this + // top-level message; if no stream_events arrived (e.g., synthetic + // message without streaming), generate one now. Then reset so the + // next assistant message gets a fresh UUID. + let assistantMessageId: string | undefined + if (parentToolUseId === null) { + if (currentAgentMessageId === null) { + currentAgentMessageId = randomUUID() + } + assistantMessageId = currentAgentMessageId + } + const notifications = assistantMessageToAcpNotifications( msg, sessionId, @@ -255,11 +292,18 @@ export async function forwardSessionUpdates( cwd, parentToolUseId, streamingActive, + messageId: assistantMessageId, }, ) for (const notification of notifications) { await conn.sessionUpdate(notification) } + + // Reset after the top-level assistant message completes so the + // next message (stream_event or assistant) gets a fresh UUID. + if (parentToolUseId === null) { + currentAgentMessageId = null + } break } @@ -384,11 +428,16 @@ export async function replayHistoryMessages( if (typeof content === 'string') { if (!content.trim()) continue + // Per message-id.mdx RFD: each replayed message gets its own UUID + // (JSONL doesn't preserve the original ACP messageId). All chunks of + // the same message share the ID. + const replayMessageId = randomUUID() await conn.sessionUpdate({ sessionId, update: { sessionUpdate: role === 'assistant' ? 'agent_message_chunk' : 'user_message_chunk', + ...(replayMessageId ? { messageId: replayMessageId } : {}), content: { type: 'text', text: content }, }, }) @@ -396,6 +445,8 @@ export async function replayHistoryMessages( } if (Array.isArray(content)) { + // Each replayed message gets a fresh UUID independent of other messages. + const replayMessageId = randomUUID() const notifications = toAcpNotifications( content as Array>, role, @@ -403,7 +454,7 @@ export async function replayHistoryMessages( toolUseCache, conn, undefined, - { clientCapabilities, cwd }, + { clientCapabilities, cwd, messageId: replayMessageId }, ) for (const notification of notifications) { await conn.sessionUpdate(notification) diff --git a/src/services/acp/bridge/notifications.ts b/src/services/acp/bridge/notifications.ts index 6932943fc..f4e7a9592 100644 --- a/src/services/acp/bridge/notifications.ts +++ b/src/services/acp/bridge/notifications.ts @@ -40,6 +40,10 @@ export function toAcpNotifications( parentToolUseId?: string | null cwd?: string streamingActive?: boolean + // Per message-id.mdx RFD: UUID identifying the message these chunks + // belong to. Only attached to agent_message_chunk / user_message_chunk / + // agent_thought_chunk (spec scope). undefined = omit the field entirely. + messageId?: string }, ): SessionNotification[] { const output: SessionNotification[] = [] @@ -55,6 +59,7 @@ export function toAcpNotifications( update = { sessionUpdate: role === 'assistant' ? 'agent_message_chunk' : 'user_message_chunk', + ...(options?.messageId ? { messageId: options.messageId } : {}), content: { type: 'text', text }, } break @@ -65,6 +70,7 @@ export function toAcpNotifications( const thinking = (chunk.thinking as string) ?? '' update = { sessionUpdate: 'agent_thought_chunk', + ...(options?.messageId ? { messageId: options.messageId } : {}), content: { type: 'text', text: thinking }, } break @@ -78,6 +84,7 @@ export function toAcpNotifications( role === 'assistant' ? 'agent_message_chunk' : 'user_message_chunk', + ...(options?.messageId ? { messageId: options.messageId } : {}), content: { type: 'image', data: source.data as string, @@ -237,6 +244,7 @@ export function assistantMessageToAcpNotifications( parentToolUseId?: string | null cwd?: string streamingActive?: boolean + messageId?: string }, ): SessionNotification[] { const message = msg.message as Record | undefined @@ -255,6 +263,7 @@ export function assistantMessageToAcpNotifications( sessionId, update: { sessionUpdate: 'agent_message_chunk', + ...(options?.messageId ? { messageId: options.messageId } : {}), content: { type: 'text', text: content }, }, }, @@ -296,6 +305,7 @@ export function streamEventToAcpNotifications( clientCapabilities?: ClientCapabilities cwd?: string streamingActive?: boolean + messageId?: string }, ): SessionNotification[] { const event = (msg as unknown as { event: Record }).event @@ -318,6 +328,7 @@ export function streamEventToAcpNotifications( clientCapabilities: options?.clientCapabilities, parentToolUseId: msg.parent_tool_use_id as string | null | undefined, cwd: options?.cwd, + messageId: options?.messageId, }, ) } @@ -335,6 +346,7 @@ export function streamEventToAcpNotifications( clientCapabilities: options?.clientCapabilities, parentToolUseId: msg.parent_tool_use_id as string | null | undefined, cwd: options?.cwd, + messageId: options?.messageId, }, ) }