From ee0d788e5823a8cf43341e582de19b794dcdc661 Mon Sep 17 00:00:00 2001 From: unraid Date: Mon, 27 Apr 2026 10:32:18 +0800 Subject: [PATCH] fix: harden bounded agent communication review fixes CodeRabbit and Codecov surfaced real gaps in UDS framing, peer discovery, mailbox retention, and summary context coverage. This tightens those paths without suppressing review or coverage signals. Constraint: PR #369 must address CodeRabbit and Codecov findings without warning suppression or fake fallbacks Rejected: Suppress Codecov or CodeRabbit warnings | leaves real receive-path and test-isolation gaps Rejected: Add unreachable feature-gated tests | bun:bundle keeps those branches compile-time gated in local tests Confidence: high Scope-risk: moderate Directive: Keep UDS auth-token rejection outside feature flags; do not reintroduce inline token fallbacks Tested: bun test --coverage --coverage-reporter lcov --coverage-dir coverage; bun run test:all; bun run lint; bun run build; bun run build:vite; bun audit; git diff --cached --check Not-tested: Remote Codecov/CodeRabbit refreshed reports until pushed --- .../src/tools/ListPeersTool/ListPeersTool.ts | 16 ++- .../tools/SendMessageTool/SendMessageTool.ts | 40 ++++-- .../udsRecipientSanitization.test.ts | 82 ++++++++++- src/bridge/peerSessions.ts | 32 +++++ src/cli/print.ts | 2 +- .../__tests__/agentSummary.test.ts | 133 ++++++++++++++++++ .../__tests__/summaryContext.test.ts | 17 +++ src/services/AgentSummary/summaryContext.ts | 9 +- src/utils/__tests__/teammateMailbox.test.ts | 70 +++++++++ src/utils/__tests__/udsMessaging.test.ts | 100 ++++++++++++- src/utils/ndjsonFramer.ts | 64 +++++---- src/utils/teammateMailbox.ts | 13 +- src/utils/udsClient.ts | 58 ++------ src/utils/udsMessaging.ts | 93 ++++++------ src/utils/udsResponseReader.ts | 81 +++++++++++ 15 files changed, 657 insertions(+), 153 deletions(-) create mode 100644 src/services/AgentSummary/__tests__/agentSummary.test.ts create mode 100644 src/utils/udsResponseReader.ts diff --git a/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts b/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts index 48219edc7..f6c52ea80 100644 --- a/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts +++ b/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts @@ -84,19 +84,27 @@ Use this tool to discover messaging targets before sending cross-session message // UDS socket directory. The implementation scans for live sockets // and optionally includes Remote Control bridge peers. const peers: PeerInfo[] = [] + const seen = new Set() + const addPeer = (peer: PeerInfo): void => { + if (seen.has(peer.address)) return + seen.add(peer.address) + peers.push(peer) + } /* eslint-disable @typescript-eslint/no-require-imports */ const udsMessaging = require('src/utils/udsMessaging.js') as typeof import('src/utils/udsMessaging.js') const udsClient = require('src/utils/udsClient.js') as typeof import('src/utils/udsClient.js') + const bridgePeers = + require('src/bridge/peerSessions.js') as typeof import('src/bridge/peerSessions.js') /* eslint-enable @typescript-eslint/no-require-imports */ const messagingSocketPath = udsMessaging.getUdsMessagingSocketPath() if (messagingSocketPath) { // Self entry for reference if (_input.include_self) { - peers.push({ + addPeer({ address: udsMessaging.formatUdsAddress(messagingSocketPath), name: 'self', pid: process.pid, @@ -106,7 +114,7 @@ Use this tool to discover messaging targets before sending cross-session message for (const peer of await udsClient.listPeers()) { if (!peer.messagingSocketPath) continue - peers.push({ + addPeer({ address: udsMessaging.formatUdsAddress(peer.messagingSocketPath), name: peer.name ?? peer.kind, cwd: peer.cwd, @@ -114,6 +122,10 @@ Use this tool to discover messaging targets before sending cross-session message }) } + for (const peer of await bridgePeers.listBridgePeers()) { + addPeer(peer) + } + return { data: { peers }, } diff --git a/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts b/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts index 3548544fc..e4868bc53 100644 --- a/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts +++ b/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts @@ -672,17 +672,15 @@ export const SendMessageTool: Tool = errorCode: 9, } } - if (feature('UDS_INBOX')) { - if ( - addr.scheme === 'uds' && - (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) - ) { - return { - result: false, - message: - 'uds addresses must not include inline auth tokens; use the ListPeers address', - errorCode: 9, - } + if ( + addr.scheme === 'uds' && + (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) + ) { + return { + result: false, + message: + 'uds addresses must not include inline auth tokens; use the ListPeers address', + errorCode: 9, } } if (input.to.includes('@')) { @@ -808,6 +806,22 @@ export const SendMessageTool: Tool = }, async call(input, context, canUseTool, assistantMessage) { + if (typeof input.message === 'string') { + const addr = parseAddress(input.to) + if ( + addr.scheme === 'uds' && + (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) + ) { + return { + data: { + success: false, + message: + 'uds addresses must not include inline auth tokens; use the ListPeers address', + }, + } + } + } + if (feature('UDS_INBOX') && typeof input.message === 'string') { const addr = parseAddress(input.to) if (addr.scheme === 'bridge') { @@ -827,10 +841,10 @@ export const SendMessageTool: Tool = const { postInterClaudeMessage } = require('src/bridge/peerSessions.js') as typeof import('src/bridge/peerSessions.js') /* eslint-enable @typescript-eslint/no-require-imports */ - const result = await postInterClaudeMessage( + const result = (await postInterClaudeMessage( addr.target, input.message, - ) as { ok: boolean; error?: string } + )) as { ok: boolean; error?: string } const preview = input.summary || truncate(input.message, 50) return { data: { diff --git a/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts b/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts index 20124b6c3..a0ab2af0d 100644 --- a/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts +++ b/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts @@ -1,8 +1,4 @@ -import { describe, expect, mock, test } from 'bun:test' - -mock.module('bun:bundle', () => ({ - feature: (name: string) => name === 'UDS_INBOX', -})) +import { describe, expect, test } from 'bun:test' describe('SendMessageTool UDS recipient handling', () => { test('redacts inline UDS tokens before classifier and observable paths', async () => { @@ -25,6 +21,62 @@ describe('SendMessageTool UDS recipient handling', () => { ).toBe('to uds:/tmp/peer.sock: hello') }) + test('keeps redacted UDS token rejection through observable backfill', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const observableInput = { + to: 'uds:/tmp/peer.sock#token=secret-token', + message: { + type: 'plan_approval_response', + request_id: 'req-1', + approve: false, + reason: 'needs tests', + }, + } as Record + + SendMessageTool.backfillObservableInput!(observableInput) + + expect(observableInput.to).toBe('uds:/tmp/peer.sock') + expect(observableInput.recipient).toBe('uds:/tmp/peer.sock') + expect(observableInput.type).toBe('plan_approval_response') + expect(observableInput.request_id).toBe('req-1') + expect(observableInput.approve).toBe(false) + expect(observableInput.content).toBe('needs tests') + expect(JSON.stringify(observableInput)).not.toContain('secret-token') + + const result = await SendMessageTool.validateInput!( + observableInput as never, + {} as never, + ) + + expect(result.result).toBe(false) + if (result.result !== false) { + throw new Error('expected validation to reject redacted inline UDS token') + } + expect(result.message).toContain('inline auth tokens') + }) + + test('redacts UDS tokens in structured classifier text', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const to = 'uds:/tmp/peer.sock#token=secret-token' + + expect( + SendMessageTool.toAutoClassifierInput({ + to, + message: { type: 'shutdown_request' }, + }), + ).toBe('shutdown_request to uds:/tmp/peer.sock') + expect( + SendMessageTool.toAutoClassifierInput({ + to, + message: { + type: 'plan_approval_response', + request_id: 'req-1', + approve: true, + }, + }), + ).toBe('plan_approval approve to uds:/tmp/peer.sock') + }) + test('rejects inline UDS tokens during validation', async () => { const { SendMessageTool } = await import('../SendMessageTool.js') const result = await SendMessageTool.validateInput!( @@ -36,6 +88,26 @@ describe('SendMessageTool UDS recipient handling', () => { ) expect(result.result).toBe(false) + if (result.result !== false) { + throw new Error('expected validation to reject inline UDS token') + } + expect(result.message).toContain('inline auth tokens') + expect(JSON.stringify(result)).not.toContain('secret-token') + }) + + test('rejects inline UDS tokens during execution without leaking them', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const result = await SendMessageTool.call( + { + to: 'uds:/tmp/peer.sock#token=secret-token', + message: 'hello', + }, + {} as never, + undefined as never, + undefined as never, + ) + + expect(result.data.success).toBe(false) expect(JSON.stringify(result)).not.toContain('secret-token') }) }) diff --git a/src/bridge/peerSessions.ts b/src/bridge/peerSessions.ts index c194c9b62..716c879de 100644 --- a/src/bridge/peerSessions.ts +++ b/src/bridge/peerSessions.ts @@ -6,6 +6,38 @@ import { getBridgeAccessToken } from './bridgeConfig.js' import { getReplBridgeHandle } from './replBridgeHandle.js' import { toCompatSessionId } from './sessionIdCompat.js' +export type BridgePeerSession = { + address: string + name?: string + cwd?: string + pid?: number +} + +/** + * List locally registered sessions that have published a Remote Control + * session ID. The PID registry is the local source of truth for bridge peers + * already known to this machine; SendMessage can use these bridge: + * addresses when the current process has an active bridge handle. + */ +export async function listBridgePeers(): Promise { + const { listAllLiveSessions } = await import('../utils/udsClient.js') + const sessions = await listAllLiveSessions() + const peers: BridgePeerSession[] = [] + + for (const session of sessions) { + if (session.pid === process.pid || !session.bridgeSessionId) continue + const compatId = toCompatSessionId(session.bridgeSessionId) + peers.push({ + address: `bridge:${compatId}`, + name: session.name ?? session.kind, + cwd: session.cwd, + pid: session.pid, + }) + } + + return peers +} + /** * Send a plain-text message to another Claude session via the bridge API. * diff --git a/src/cli/print.ts b/src/cli/print.ts index 7a291fb50..c4e8c4569 100644 --- a/src/cli/print.ts +++ b/src/cli/print.ts @@ -2773,7 +2773,7 @@ function runHeadlessStreaming( const value = typeof entry.message.data === 'string' ? entry.message.data - : jsonStringify(entry.message) + : jsonStringify(entry.message.data) enqueue({ mode: 'prompt', value, diff --git a/src/services/AgentSummary/__tests__/agentSummary.test.ts b/src/services/AgentSummary/__tests__/agentSummary.test.ts new file mode 100644 index 000000000..0ab070080 --- /dev/null +++ b/src/services/AgentSummary/__tests__/agentSummary.test.ts @@ -0,0 +1,133 @@ +import { + afterAll, + afterEach, + beforeEach, + describe, + expect, + mock, + test, +} from 'bun:test' +import { debugMock } from '../../../../tests/mocks/debug' +import { logMock } from '../../../../tests/mocks/log' +import { asAgentId } from '../../../types/ids.js' +import type { CacheSafeParams } from '../../../utils/forkedAgent.js' + +const transcriptMessages = [ + { type: 'user', message: { content: 'start' }, uuid: 'u1' }, + { + type: 'assistant', + message: { content: [{ type: 'text', text: 'working' }] }, + uuid: 'a1', + }, + { type: 'user', message: { content: 'continue' }, uuid: 'u2' }, +] + +let poorModeActive = false +let forkCalls = 0 +let updateCalls: Array<{ taskId: string; summary: string }> = [] +let transcript = { messages: transcriptMessages } +const sessionStorageSnapshot = { + ...(require('../../../utils/sessionStorage.ts') as Record), +} + +mock.module('src/commands/poor/poorMode.js', () => ({ + isPoorModeActive: () => poorModeActive, +})) + +mock.module('src/tasks/LocalAgentTask/LocalAgentTask.js', () => ({ + updateAgentSummary: (taskId: string, summary: string) => { + updateCalls.push({ taskId, summary }) + }, +})) + +mock.module( + '@claude-code-best/builtin-tools/tools/AgentTool/runAgent.js', + () => ({ + filterIncompleteToolCalls: (messages: T) => messages, + }), +) + +mock.module('src/utils/debug.js', debugMock) +mock.module('src/utils/log.js', logMock) + +mock.module('src/utils/forkedAgent.js', () => ({ + runForkedAgent: async () => { + forkCalls += 1 + return { + messages: [ + { + type: 'assistant', + message: { + content: [{ type: 'text', text: 'Reading udsClient.ts' }], + }, + }, + ], + } + }, +})) + +mock.module('src/utils/sessionStorage.js', () => ({ + ...sessionStorageSnapshot, + getAgentTranscript: async () => transcript, +})) + +afterAll(() => { + mock.module('src/utils/sessionStorage.js', () => + require('../../../utils/sessionStorage.ts'), + ) +}) + +describe('startAgentSummarization', () => { + const realSetTimeout = globalThis.setTimeout + const realClearTimeout = globalThis.clearTimeout + let scheduled: + | ((...args: Parameters void)>) => void) + | undefined + + beforeEach(() => { + poorModeActive = false + forkCalls = 0 + updateCalls = [] + transcript = { messages: transcriptMessages } + scheduled = undefined + globalThis.setTimeout = ((callback: TimerHandler) => { + scheduled = callback as (...args: unknown[]) => void + return 1 as unknown as ReturnType + }) as unknown as typeof setTimeout + globalThis.clearTimeout = (() => undefined) as typeof clearTimeout + }) + + afterEach(() => { + globalThis.setTimeout = realSetTimeout + globalThis.clearTimeout = realClearTimeout + }) + + test('summarizes bounded transcript once and skips unchanged fingerprints', async () => { + const { startAgentSummarization } = await import('../agentSummary.js') + + const handle = startAgentSummarization( + 'task-1', + asAgentId('a0000000000000000'), + { + forkContextMessages: [{ type: 'user', message: { content: 'old' } }], + model: 'claude-test', + } as unknown as CacheSafeParams, + () => undefined, + ) + + expect(typeof scheduled).toBe('function') + await scheduled!() + + expect(forkCalls).toBe(1) + expect(updateCalls).toEqual([ + { taskId: 'task-1', summary: 'Reading udsClient.ts' }, + ]) + + await scheduled!() + + expect(forkCalls).toBe(1) + expect(updateCalls).toHaveLength(1) + + handle.stop() + }) +}) diff --git a/src/services/AgentSummary/__tests__/summaryContext.test.ts b/src/services/AgentSummary/__tests__/summaryContext.test.ts index 3ffa55964..1c701d14b 100644 --- a/src/services/AgentSummary/__tests__/summaryContext.test.ts +++ b/src/services/AgentSummary/__tests__/summaryContext.test.ts @@ -2,6 +2,7 @@ import { describe, expect, test } from 'bun:test' import type { Message } from '../../../types/message.js' import { getSummaryContextFingerprint, + MAX_SUMMARY_CONTEXT_CHARS, selectSummaryContextMessages, } from '../summaryContext.js' @@ -101,6 +102,10 @@ describe('selectSummaryContextMessages', () => { }) describe('getSummaryContextFingerprint', () => { + test('returns null for an empty transcript', () => { + expect(getSummaryContextFingerprint([])).toBeNull() + }) + test('changes when the transcript grows', () => { const messages = [ makeMessage('user', 'u1', 'first prompt'), @@ -129,4 +134,16 @@ describe('getSummaryContextFingerprint', () => { expect(first).not.toBe(second) }) + + test('includes a truncation marker for oversized primitive values', () => { + const prefix = 'x'.repeat(MAX_SUMMARY_CONTEXT_CHARS + 100) + const first = getSummaryContextFingerprint([ + makeMessage('assistant', 'a1', `${prefix}a`), + ]) + const second = getSummaryContextFingerprint([ + makeMessage('assistant', 'a1', `${prefix}b`), + ]) + + expect(first).not.toBe(second) + }) }) diff --git a/src/services/AgentSummary/summaryContext.ts b/src/services/AgentSummary/summaryContext.ts index 4d9f6a6ce..894a21e36 100644 --- a/src/services/AgentSummary/summaryContext.ts +++ b/src/services/AgentSummary/summaryContext.ts @@ -55,10 +55,15 @@ function updateFingerprintHash( if (limit.remaining <= 0) return if (value === null || typeof value !== 'object') { const text = String(value) + const consumed = Math.min(text.length, limit.remaining) + if (consumed <= 0) return hash.update(typeof value) hash.update(':') - hash.update(text.slice(0, limit.remaining)) - limit.remaining -= text.length + hash.update(text.slice(0, consumed)) + if (consumed < text.length) { + hash.update(`#truncated:${text.length}:${text.slice(-64)}`) + } + limit.remaining -= consumed return } if (seen.has(value)) { diff --git a/src/utils/__tests__/teammateMailbox.test.ts b/src/utils/__tests__/teammateMailbox.test.ts index 577c4331f..7f479ed36 100644 --- a/src/utils/__tests__/teammateMailbox.test.ts +++ b/src/utils/__tests__/teammateMailbox.test.ts @@ -3,8 +3,10 @@ import { mkdir, readFile, rm, writeFile } from 'node:fs/promises' import { mkdtempSync } from 'node:fs' import { tmpdir } from 'node:os' import { dirname, join } from 'node:path' +import type { Message } from '../../types/message.js' import { compactMailboxMessages, + getLastPeerDmSummary, getInboxPath, markMessageAsReadByIndex, markMessageAsReadByIdentity, @@ -119,6 +121,23 @@ describe('compactMailboxMessages', () => { ]) }) + test('does not prioritize malformed JSON-like unread messages as protocol', () => { + const compacted = compactMailboxMessages( + [ + message('{not-json', false), + message('regular-1', false), + message('regular-2', false), + ], + { + maxMessages: 1, + maxReadMessages: 0, + maxUnreadProtocolMessages: 10, + }, + ) + + expect(compacted.map(m => m.text)).toEqual(['regular-2']) + }) + test('caps unread protocol messages with an independent bound', () => { const compacted = compactMailboxMessages( Array.from( @@ -308,3 +327,54 @@ describe('teammate mailbox retention', () => { await expect(readMailbox('worker', 'alpha')).rejects.toThrow() }) }) + +describe('getLastPeerDmSummary', () => { + test('extracts the final peer direct-message summary from assistant tool use', () => { + const messages = [ + { type: 'user', message: { content: 'wake up' } }, + { + type: 'assistant', + message: { + content: [ + { + type: 'tool_use', + name: 'SendMessage', + input: { + to: 'worker-1', + message: 'please check the UDS bounds', + summary: 'Checking UDS bounds', + }, + }, + ], + }, + }, + ] as unknown as Message[] + + expect(getLastPeerDmSummary(messages)).toBe( + '[to worker-1] Checking UDS bounds', + ) + }) + + test('stops peer direct-message summary search at the wake-up boundary', () => { + const messages = [ + { + type: 'assistant', + message: { + content: [ + { + type: 'tool_use', + name: 'SendMessage', + input: { + to: 'worker-1', + message: 'old message', + }, + }, + ], + }, + }, + { type: 'user', message: { content: 'new prompt' } }, + ] as unknown as Message[] + + expect(getLastPeerDmSummary(messages)).toBeUndefined() + }) +}) diff --git a/src/utils/__tests__/udsMessaging.test.ts b/src/utils/__tests__/udsMessaging.test.ts index 52cb57abc..ef943cb76 100644 --- a/src/utils/__tests__/udsMessaging.test.ts +++ b/src/utils/__tests__/udsMessaging.test.ts @@ -1,5 +1,13 @@ -import { afterEach, describe, expect, test } from 'bun:test' -import { chmod, mkdir, rm, stat, symlink, unlink } from 'node:fs/promises' +import { afterEach, beforeEach, describe, expect, test } from 'bun:test' +import { + chmod, + mkdir, + mkdtemp, + rm, + stat, + symlink, + unlink, +} from 'node:fs/promises' import { createConnection, createServer } from 'node:net' import { dirname, join } from 'node:path' import { tmpdir } from 'node:os' @@ -15,6 +23,9 @@ import { stopUdsMessaging, } from '../udsMessaging.js' +let previousConfigDir: string | undefined +let tempConfigDir = '' + function socketPath(label: string): string { const suffix = `${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}-${label}` if (process.platform === 'win32') { @@ -52,10 +63,25 @@ async function waitForEnqueues( setOnEnqueue(null) } +beforeEach(async () => { + previousConfigDir = process.env.CLAUDE_CONFIG_DIR + tempConfigDir = await mkdtemp(join(tmpdir(), 'uds-messaging-home-')) + process.env.CLAUDE_CONFIG_DIR = tempConfigDir +}) + afterEach(async () => { setOnEnqueue(null) drainInbox() await stopUdsMessaging() + if (previousConfigDir === undefined) { + delete process.env.CLAUDE_CONFIG_DIR + } else { + process.env.CLAUDE_CONFIG_DIR = previousConfigDir + } + if (tempConfigDir) { + await rm(tempConfigDir, { recursive: true, force: true }) + tempConfigDir = '' + } }) async function closeServer(server: ReturnType): Promise { @@ -133,6 +159,57 @@ describe('UDS inbox retention', () => { expect(drainInbox()).toEqual([]) }) + test('udsClient helpers authenticate through the capability file', async () => { + const path = socketPath('uds-client') + await startUdsMessaging(path, { isExplicit: true }) + const { isPeerAlive, sendToUdsSocket } = await import('../udsClient.js') + + expect(await isPeerAlive(path)).toBe(true) + await waitForEnqueues(1, async () => { + await sendToUdsSocket(path, 'hello from client') + }) + + const drained = drainInbox() + expect(drained).toHaveLength(1) + expect(drained[0]?.message.data).toBe('hello from client') + expect(drained[0]?.message.meta).toBeUndefined() + }) + + test('udsClient peer probe fails closed on oversized pong frames', async () => { + const path = socketPath('uds-client-oversized-pong') + if (process.platform !== 'win32') { + await mkdir(dirname(path), { recursive: true }) + } + const receiver = createServer(socket => { + socket.on('data', () => { + socket.write('x'.repeat(MAX_UDS_FRAME_BYTES + 1)) + }) + }) + await new Promise((resolve, reject) => { + receiver.on('error', reject) + receiver.listen(path, () => resolve()) + }) + + try { + const { isPeerAlive } = await import('../udsClient.js') + expect(await isPeerAlive(path)).toBe(false) + } finally { + await closeServer(receiver) + if (process.platform !== 'win32') { + await unlink(path).catch(() => undefined) + } + } + }) + + test('udsClient send fails closed when no capability token exists', async () => { + const path = socketPath('uds-client-no-token') + const { sendToUdsSocket } = await import('../udsClient.js') + + await expect(sendToUdsSocket(path, 'hello')).rejects.toThrow( + 'No auth token found', + ) + }) + test('drained entries never expose the UDS auth token', async () => { const path = socketPath('strip-token') await startUdsMessaging(path, { isExplicit: true }) @@ -301,5 +378,24 @@ describe('UDS inbox retention', () => { await rm(tempHome, { recursive: true, force: true }) } }) + + test('fails closed when an explicit socket parent is not private', async () => { + const parent = join( + tmpdir(), + `uds-socket-parent-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`, + ) + await mkdir(parent, { recursive: true, mode: 0o755 }) + await chmod(parent, 0o755) + + try { + await expect( + startUdsMessaging(join(parent, 'messaging.sock'), { + isExplicit: true, + }), + ).rejects.toThrow('socket parent permissions are too broad') + } finally { + await rm(parent, { recursive: true, force: true }) + } + }) } }) diff --git a/src/utils/ndjsonFramer.ts b/src/utils/ndjsonFramer.ts index ecaa04dc8..7832e9303 100644 --- a/src/utils/ndjsonFramer.ts +++ b/src/utils/ndjsonFramer.ts @@ -27,6 +27,7 @@ export function attachNdjsonFramer( options: NdjsonFramerOptions = {}, ): void { let buffer = '' + let bufferBytes = 0 const maxFrameBytes = options.maxFrameBytes ?? Number.POSITIVE_INFINITY const rejectOversizedFrame = (bytes: number): void => { @@ -37,41 +38,48 @@ export function attachNdjsonFramer( socket.destroy(error) } + const emitLine = (line: string): void => { + if (!line.trim()) return + try { + onMessage(parse(line)) + } catch { + // Malformed JSON — skip + } + } + socket.on('data', (chunk: Buffer) => { + let start = 0 + for (let index = 0; index < chunk.length; index++) { + if (chunk[index] !== 0x0a) continue + + const segmentBytes = index - start + if ( + Number.isFinite(maxFrameBytes) && + bufferBytes + segmentBytes > maxFrameBytes + ) { + rejectOversizedFrame(bufferBytes + segmentBytes) + return + } + + buffer += chunk.subarray(start, index).toString('utf8') + emitLine(buffer) + buffer = '' + bufferBytes = 0 + start = index + 1 + } + + const tailBytes = chunk.length - start if ( Number.isFinite(maxFrameBytes) && - !chunk.includes(0x0a) && - Buffer.byteLength(buffer, 'utf8') + chunk.byteLength > maxFrameBytes + bufferBytes + tailBytes > maxFrameBytes ) { - rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8') + chunk.byteLength) + rejectOversizedFrame(bufferBytes + tailBytes) return } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - - for (const line of lines) { - if (!line.trim()) continue - if ( - Number.isFinite(maxFrameBytes) && - Buffer.byteLength(line, 'utf8') > maxFrameBytes - ) { - rejectOversizedFrame(Buffer.byteLength(line, 'utf8')) - return - } - try { - onMessage(parse(line)) - } catch { - // Malformed JSON — skip - } - } - - if ( - Number.isFinite(maxFrameBytes) && - Buffer.byteLength(buffer, 'utf8') > maxFrameBytes - ) { - rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8')) + if (tailBytes > 0) { + buffer += chunk.subarray(start).toString('utf8') + bufferBytes += tailBytes } }) } diff --git a/src/utils/teammateMailbox.ts b/src/utils/teammateMailbox.ts index 6c18fd721..ad9b22f93 100644 --- a/src/utils/teammateMailbox.ts +++ b/src/utils/teammateMailbox.ts @@ -8,7 +8,7 @@ */ import { randomBytes } from 'crypto' -import { mkdir, readFile, rename, stat, writeFile } from 'fs/promises' +import { mkdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises' import { join } from 'path' import { z } from 'zod/v4' import { TEAMMATE_MESSAGE_TAG } from '../constants/xml.js' @@ -77,7 +77,7 @@ function shouldRetainUnreadAsProtocolMessage( 'type' in (parsed as Record), ) } catch { - return true + return false } } @@ -160,8 +160,13 @@ async function writeMailboxAtomic( ) } const tempPath = `${inboxPath}.${process.pid}.${randomBytes(8).toString('hex')}.tmp` - await writeFile(tempPath, content, 'utf-8') - await rename(tempPath, inboxPath) + try { + await writeFile(tempPath, content, 'utf-8') + await rename(tempPath, inboxPath) + } catch (error) { + await unlink(tempPath).catch(() => undefined) + throw error + } } export function compactMailboxMessages( diff --git a/src/utils/udsClient.ts b/src/utils/udsClient.ts index f08bee696..e33ef3fdb 100644 --- a/src/utils/udsClient.ts +++ b/src/utils/udsClient.ts @@ -17,6 +17,7 @@ import { isProcessRunning } from './genericProcessUtils.js' import { jsonParse, jsonStringify } from './slowOperations.js' import type { SessionKind } from './concurrentSessions.js' import { MAX_UDS_FRAME_BYTES, type UdsMessage } from './udsMessaging.js' +import { attachUdsResponseReader, getChunkBytes } from './udsResponseReader.js' // --------------------------------------------------------------------------- // Types @@ -43,12 +44,6 @@ function getSessionsDir(): string { return join(getClaudeConfigHomeDir(), 'sessions') } -function getChunkBytes(chunk: string | Buffer): number { - return typeof chunk === 'string' - ? Buffer.byteLength(chunk, 'utf8') - : chunk.byteLength -} - // --------------------------------------------------------------------------- // Discovery // --------------------------------------------------------------------------- @@ -218,56 +213,33 @@ export async function sendToUdsSocket( udsMsg.from = getUdsMessagingSocketPath() return new Promise((resolve, reject) => { - let buffer = '' let settled = false + let conn: ReturnType const finish = (error?: Error): void => { if (settled) return settled = true - conn.end() - if (error) reject(error) - else resolve() + if (error) { + conn.destroy(error) + reject(error) + } else { + conn.end() + resolve() + } } - const conn = createConnection(target.socketPath, () => { + + conn = createConnection(target.socketPath, () => { udsMsg.meta = { ...udsMsg.meta, authToken } conn.write(jsonStringify(udsMsg) + '\n', err => { if (err) finish(err) }) }) - conn.on('data', chunk => { - if ( - Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > - MAX_UDS_FRAME_BYTES - ) { - finish(new Error('UDS response frame exceeded size limit')) - return - } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - for (const line of lines) { - if (!line.trim()) continue - let response: UdsMessage - try { - response = jsonParse(line) as UdsMessage - } catch { - continue - } - if (response.type === 'response') { - finish() - return - } - if (response.type === 'error') { - finish(new Error(response.data ?? 'UDS receiver rejected message')) - return - } - } - }) - conn.on('error', err => { - finish( + attachUdsResponseReader(conn, { + maxFrameBytes: MAX_UDS_FRAME_BYTES, + onSettled: finish, + formatSocketError: err => new Error( `Failed to connect to peer at ${target.socketPath}: ${errorMessage(err)}`, ), - ) }) conn.setTimeout(5000, () => { finish(new Error('Connection timed out')) diff --git a/src/utils/udsMessaging.ts b/src/utils/udsMessaging.ts index 7efa7fbf6..94b73dcd6 100644 --- a/src/utils/udsMessaging.ts +++ b/src/utils/udsMessaging.ts @@ -8,7 +8,7 @@ * but can be overridden via --messaging-socket-path. */ -import { createHash, randomBytes } from 'crypto' +import { createHash, randomBytes, timingSafeEqual } from 'crypto' import { createServer, type Server, type Socket } from 'net' import { chmod, @@ -26,6 +26,7 @@ import { logForDebugging } from './debug.js' import { errorMessage } from './errors.js' import { getClaudeConfigHomeDir } from './envUtils.js' import { attachNdjsonFramer } from './ndjsonFramer.js' +import { attachUdsResponseReader } from './udsResponseReader.js' import { logError } from './log.js' import { jsonParse, jsonStringify } from './slowOperations.js' @@ -160,26 +161,36 @@ async function assertPrivateCapabilityDir(dir: string): Promise { stat = await lstat(dir) } + assertPrivateDirectory(stat, dir, 'capability directory') + await chmod(dir, 0o700) +} + +function assertPrivateDirectory( + stat: Awaited>, + dir: string, + label: string, +): void { if (!stat.isDirectory() || stat.isSymbolicLink()) { throw new Error( - `[udsMessaging] capability directory is not a private directory: ${dir}`, + `[udsMessaging] ${label} is not a private directory: ${dir}`, ) } if (process.platform !== 'win32') { - const broadMode = stat.mode & 0o077 + const broadMode = Number(stat.mode) & 0o077 if (broadMode !== 0) { throw new Error( - `[udsMessaging] capability directory permissions are too broad: ${dir}`, + `[udsMessaging] ${label} permissions are too broad: ${dir}`, ) } - if (typeof process.getuid === 'function' && stat.uid !== process.getuid()) { + if ( + typeof process.getuid === 'function' && + Number(stat.uid) !== process.getuid() + ) { throw new Error( - `[udsMessaging] capability directory owner does not match current user: ${dir}`, + `[udsMessaging] ${label} owner does not match current user: ${dir}`, ) } } - - await chmod(dir, 0o700) } async function writePrivateFileExclusive( @@ -204,6 +215,7 @@ async function ensureSocketParent(path: string): Promise { `[udsMessaging] socket parent is not a directory: ${dir}`, ) } + assertPrivateDirectory(stat, dir, 'socket parent') return } catch (error) { if (!isNotFound(error)) throw error @@ -314,7 +326,12 @@ function getMessageAuthToken(message: UdsMessage): string | undefined { } function isAuthorizedMessage(message: UdsMessage): boolean { - return getMessageAuthToken(message) === authToken + const provided = getMessageAuthToken(message) + if (!provided || !authToken) return false + const providedBuffer = Buffer.from(provided, 'utf8') + const expectedBuffer = Buffer.from(authToken, 'utf8') + if (providedBuffer.length !== expectedBuffer.length) return false + return timingSafeEqual(providedBuffer, expectedBuffer) } function writeSocketMessage(socket: Socket, message: UdsMessage): void { @@ -554,20 +571,6 @@ export async function stopUdsMessaging(): Promise { } } -function parseResponseLine(line: string): UdsMessage | null { - try { - return jsonParse(line) as UdsMessage - } catch { - return null - } -} - -function getChunkBytes(chunk: string | Buffer): number { - return typeof chunk === 'string' - ? Buffer.byteLength(chunk, 'utf8') - : chunk.byteLength -} - /** * Send a UDS message to a specific socket path (outbound — used when this * session wants to push a message to a peer's server). @@ -592,46 +595,30 @@ export async function sendUdsMessage( ) return new Promise((resolve, reject) => { - let buffer = '' let settled = false + let conn: ReturnType const finish = (error?: Error): void => { if (settled) return settled = true - conn.end() - if (error) reject(error) - else resolve() + if (error) { + conn.destroy(error) + reject(error) + } else { + conn.end() + resolve() + } } - const conn = createConnection(targetSocketPath, () => { + + conn = createConnection(targetSocketPath, () => { conn.write(jsonStringify(outbound) + '\n', err => { if (err) finish(err) }) }) - conn.on('data', chunk => { - if ( - Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > - MAX_UDS_FRAME_BYTES - ) { - finish(new Error('UDS response frame exceeded size limit')) - return - } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - for (const line of lines) { - if (!line.trim()) continue - const response = parseResponseLine(line) - if (!response) continue - if (response.type === 'response' || response.type === 'pong') { - finish() - return - } - if (response.type === 'error') { - finish(new Error(response.data ?? 'UDS receiver rejected message')) - return - } - } + attachUdsResponseReader(conn, { + maxFrameBytes: MAX_UDS_FRAME_BYTES, + acceptPong: true, + onSettled: finish, }) - conn.on('error', err => finish(err)) // Timeout so we don't hang on unreachable sockets conn.setTimeout(5000, () => { finish(new Error('Connection timed out')) diff --git a/src/utils/udsResponseReader.ts b/src/utils/udsResponseReader.ts new file mode 100644 index 000000000..bb8d21f40 --- /dev/null +++ b/src/utils/udsResponseReader.ts @@ -0,0 +1,81 @@ +import type { Socket } from 'net' +import { errorMessage } from './errors.js' +import { jsonParse } from './slowOperations.js' +import type { UdsMessage } from './udsMessaging.js' + +type UdsResponseReaderOptions = { + maxFrameBytes: number + acceptPong?: boolean + onSettled: (error?: Error) => void + formatSocketError?: (error: unknown) => Error +} + +export function getChunkBytes(chunk: string | Buffer): number { + return typeof chunk === 'string' + ? Buffer.byteLength(chunk, 'utf8') + : chunk.byteLength +} + +function parseResponseLine(line: string): UdsMessage | null { + try { + return jsonParse(line) as UdsMessage + } catch { + return null + } +} + +export function attachUdsResponseReader( + socket: Socket, + options: UdsResponseReaderOptions, +): void { + let buffer = '' + let settled = false + + const finish = (error?: Error): void => { + if (settled) return + settled = true + if (error) { + socket.destroy(error) + } else { + socket.end() + } + options.onSettled(error) + } + + socket.on('data', chunk => { + if ( + Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > + options.maxFrameBytes + ) { + finish(new Error('UDS response frame exceeded size limit')) + return + } + + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + for (const line of lines) { + if (!line.trim()) continue + const response = parseResponseLine(line) + if (!response) continue + if ( + response.type === 'response' || + (options.acceptPong === true && response.type === 'pong') + ) { + finish() + return + } + if (response.type === 'error') { + finish(new Error(response.data ?? 'UDS receiver rejected message')) + return + } + } + }) + + socket.on('error', error => { + finish( + options.formatSocketError?.(error) ?? + (error instanceof Error ? error : new Error(errorMessage(error))), + ) + }) +}