diff --git a/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts b/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts index 48219edc7..f6c52ea80 100644 --- a/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts +++ b/packages/builtin-tools/src/tools/ListPeersTool/ListPeersTool.ts @@ -84,19 +84,27 @@ Use this tool to discover messaging targets before sending cross-session message // UDS socket directory. The implementation scans for live sockets // and optionally includes Remote Control bridge peers. const peers: PeerInfo[] = [] + const seen = new Set() + const addPeer = (peer: PeerInfo): void => { + if (seen.has(peer.address)) return + seen.add(peer.address) + peers.push(peer) + } /* eslint-disable @typescript-eslint/no-require-imports */ const udsMessaging = require('src/utils/udsMessaging.js') as typeof import('src/utils/udsMessaging.js') const udsClient = require('src/utils/udsClient.js') as typeof import('src/utils/udsClient.js') + const bridgePeers = + require('src/bridge/peerSessions.js') as typeof import('src/bridge/peerSessions.js') /* eslint-enable @typescript-eslint/no-require-imports */ const messagingSocketPath = udsMessaging.getUdsMessagingSocketPath() if (messagingSocketPath) { // Self entry for reference if (_input.include_self) { - peers.push({ + addPeer({ address: udsMessaging.formatUdsAddress(messagingSocketPath), name: 'self', pid: process.pid, @@ -106,7 +114,7 @@ Use this tool to discover messaging targets before sending cross-session message for (const peer of await udsClient.listPeers()) { if (!peer.messagingSocketPath) continue - peers.push({ + addPeer({ address: udsMessaging.formatUdsAddress(peer.messagingSocketPath), name: peer.name ?? peer.kind, cwd: peer.cwd, @@ -114,6 +122,10 @@ Use this tool to discover messaging targets before sending cross-session message }) } + for (const peer of await bridgePeers.listBridgePeers()) { + addPeer(peer) + } + return { data: { peers }, } diff --git a/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts b/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts index 3548544fc..e4868bc53 100644 --- a/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts +++ b/packages/builtin-tools/src/tools/SendMessageTool/SendMessageTool.ts @@ -672,17 +672,15 @@ export const SendMessageTool: Tool = errorCode: 9, } } - if (feature('UDS_INBOX')) { - if ( - addr.scheme === 'uds' && - (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) - ) { - return { - result: false, - message: - 'uds addresses must not include inline auth tokens; use the ListPeers address', - errorCode: 9, - } + if ( + addr.scheme === 'uds' && + (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) + ) { + return { + result: false, + message: + 'uds addresses must not include inline auth tokens; use the ListPeers address', + errorCode: 9, } } if (input.to.includes('@')) { @@ -808,6 +806,22 @@ export const SendMessageTool: Tool = }, async call(input, context, canUseTool, assistantMessage) { + if (typeof input.message === 'string') { + const addr = parseAddress(input.to) + if ( + addr.scheme === 'uds' && + (hasInlineUdsToken(input.to) || wasInlineUdsTokenRejected(input)) + ) { + return { + data: { + success: false, + message: + 'uds addresses must not include inline auth tokens; use the ListPeers address', + }, + } + } + } + if (feature('UDS_INBOX') && typeof input.message === 'string') { const addr = parseAddress(input.to) if (addr.scheme === 'bridge') { @@ -827,10 +841,10 @@ export const SendMessageTool: Tool = const { postInterClaudeMessage } = require('src/bridge/peerSessions.js') as typeof import('src/bridge/peerSessions.js') /* eslint-enable @typescript-eslint/no-require-imports */ - const result = await postInterClaudeMessage( + const result = (await postInterClaudeMessage( addr.target, input.message, - ) as { ok: boolean; error?: string } + )) as { ok: boolean; error?: string } const preview = input.summary || truncate(input.message, 50) return { data: { diff --git a/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts b/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts index 20124b6c3..a0ab2af0d 100644 --- a/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts +++ b/packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts @@ -1,8 +1,4 @@ -import { describe, expect, mock, test } from 'bun:test' - -mock.module('bun:bundle', () => ({ - feature: (name: string) => name === 'UDS_INBOX', -})) +import { describe, expect, test } from 'bun:test' describe('SendMessageTool UDS recipient handling', () => { test('redacts inline UDS tokens before classifier and observable paths', async () => { @@ -25,6 +21,62 @@ describe('SendMessageTool UDS recipient handling', () => { ).toBe('to uds:/tmp/peer.sock: hello') }) + test('keeps redacted UDS token rejection through observable backfill', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const observableInput = { + to: 'uds:/tmp/peer.sock#token=secret-token', + message: { + type: 'plan_approval_response', + request_id: 'req-1', + approve: false, + reason: 'needs tests', + }, + } as Record + + SendMessageTool.backfillObservableInput!(observableInput) + + expect(observableInput.to).toBe('uds:/tmp/peer.sock') + expect(observableInput.recipient).toBe('uds:/tmp/peer.sock') + expect(observableInput.type).toBe('plan_approval_response') + expect(observableInput.request_id).toBe('req-1') + expect(observableInput.approve).toBe(false) + expect(observableInput.content).toBe('needs tests') + expect(JSON.stringify(observableInput)).not.toContain('secret-token') + + const result = await SendMessageTool.validateInput!( + observableInput as never, + {} as never, + ) + + expect(result.result).toBe(false) + if (result.result !== false) { + throw new Error('expected validation to reject redacted inline UDS token') + } + expect(result.message).toContain('inline auth tokens') + }) + + test('redacts UDS tokens in structured classifier text', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const to = 'uds:/tmp/peer.sock#token=secret-token' + + expect( + SendMessageTool.toAutoClassifierInput({ + to, + message: { type: 'shutdown_request' }, + }), + ).toBe('shutdown_request to uds:/tmp/peer.sock') + expect( + SendMessageTool.toAutoClassifierInput({ + to, + message: { + type: 'plan_approval_response', + request_id: 'req-1', + approve: true, + }, + }), + ).toBe('plan_approval approve to uds:/tmp/peer.sock') + }) + test('rejects inline UDS tokens during validation', async () => { const { SendMessageTool } = await import('../SendMessageTool.js') const result = await SendMessageTool.validateInput!( @@ -36,6 +88,26 @@ describe('SendMessageTool UDS recipient handling', () => { ) expect(result.result).toBe(false) + if (result.result !== false) { + throw new Error('expected validation to reject inline UDS token') + } + expect(result.message).toContain('inline auth tokens') + expect(JSON.stringify(result)).not.toContain('secret-token') + }) + + test('rejects inline UDS tokens during execution without leaking them', async () => { + const { SendMessageTool } = await import('../SendMessageTool.js') + const result = await SendMessageTool.call( + { + to: 'uds:/tmp/peer.sock#token=secret-token', + message: 'hello', + }, + {} as never, + undefined as never, + undefined as never, + ) + + expect(result.data.success).toBe(false) expect(JSON.stringify(result)).not.toContain('secret-token') }) }) diff --git a/src/bridge/peerSessions.ts b/src/bridge/peerSessions.ts index c194c9b62..716c879de 100644 --- a/src/bridge/peerSessions.ts +++ b/src/bridge/peerSessions.ts @@ -6,6 +6,38 @@ import { getBridgeAccessToken } from './bridgeConfig.js' import { getReplBridgeHandle } from './replBridgeHandle.js' import { toCompatSessionId } from './sessionIdCompat.js' +export type BridgePeerSession = { + address: string + name?: string + cwd?: string + pid?: number +} + +/** + * List locally registered sessions that have published a Remote Control + * session ID. The PID registry is the local source of truth for bridge peers + * already known to this machine; SendMessage can use these bridge: + * addresses when the current process has an active bridge handle. + */ +export async function listBridgePeers(): Promise { + const { listAllLiveSessions } = await import('../utils/udsClient.js') + const sessions = await listAllLiveSessions() + const peers: BridgePeerSession[] = [] + + for (const session of sessions) { + if (session.pid === process.pid || !session.bridgeSessionId) continue + const compatId = toCompatSessionId(session.bridgeSessionId) + peers.push({ + address: `bridge:${compatId}`, + name: session.name ?? session.kind, + cwd: session.cwd, + pid: session.pid, + }) + } + + return peers +} + /** * Send a plain-text message to another Claude session via the bridge API. * diff --git a/src/cli/print.ts b/src/cli/print.ts index 7a291fb50..c4e8c4569 100644 --- a/src/cli/print.ts +++ b/src/cli/print.ts @@ -2773,7 +2773,7 @@ function runHeadlessStreaming( const value = typeof entry.message.data === 'string' ? entry.message.data - : jsonStringify(entry.message) + : jsonStringify(entry.message.data) enqueue({ mode: 'prompt', value, diff --git a/src/services/AgentSummary/__tests__/agentSummary.test.ts b/src/services/AgentSummary/__tests__/agentSummary.test.ts new file mode 100644 index 000000000..0ab070080 --- /dev/null +++ b/src/services/AgentSummary/__tests__/agentSummary.test.ts @@ -0,0 +1,133 @@ +import { + afterAll, + afterEach, + beforeEach, + describe, + expect, + mock, + test, +} from 'bun:test' +import { debugMock } from '../../../../tests/mocks/debug' +import { logMock } from '../../../../tests/mocks/log' +import { asAgentId } from '../../../types/ids.js' +import type { CacheSafeParams } from '../../../utils/forkedAgent.js' + +const transcriptMessages = [ + { type: 'user', message: { content: 'start' }, uuid: 'u1' }, + { + type: 'assistant', + message: { content: [{ type: 'text', text: 'working' }] }, + uuid: 'a1', + }, + { type: 'user', message: { content: 'continue' }, uuid: 'u2' }, +] + +let poorModeActive = false +let forkCalls = 0 +let updateCalls: Array<{ taskId: string; summary: string }> = [] +let transcript = { messages: transcriptMessages } +const sessionStorageSnapshot = { + ...(require('../../../utils/sessionStorage.ts') as Record), +} + +mock.module('src/commands/poor/poorMode.js', () => ({ + isPoorModeActive: () => poorModeActive, +})) + +mock.module('src/tasks/LocalAgentTask/LocalAgentTask.js', () => ({ + updateAgentSummary: (taskId: string, summary: string) => { + updateCalls.push({ taskId, summary }) + }, +})) + +mock.module( + '@claude-code-best/builtin-tools/tools/AgentTool/runAgent.js', + () => ({ + filterIncompleteToolCalls: (messages: T) => messages, + }), +) + +mock.module('src/utils/debug.js', debugMock) +mock.module('src/utils/log.js', logMock) + +mock.module('src/utils/forkedAgent.js', () => ({ + runForkedAgent: async () => { + forkCalls += 1 + return { + messages: [ + { + type: 'assistant', + message: { + content: [{ type: 'text', text: 'Reading udsClient.ts' }], + }, + }, + ], + } + }, +})) + +mock.module('src/utils/sessionStorage.js', () => ({ + ...sessionStorageSnapshot, + getAgentTranscript: async () => transcript, +})) + +afterAll(() => { + mock.module('src/utils/sessionStorage.js', () => + require('../../../utils/sessionStorage.ts'), + ) +}) + +describe('startAgentSummarization', () => { + const realSetTimeout = globalThis.setTimeout + const realClearTimeout = globalThis.clearTimeout + let scheduled: + | ((...args: Parameters void)>) => void) + | undefined + + beforeEach(() => { + poorModeActive = false + forkCalls = 0 + updateCalls = [] + transcript = { messages: transcriptMessages } + scheduled = undefined + globalThis.setTimeout = ((callback: TimerHandler) => { + scheduled = callback as (...args: unknown[]) => void + return 1 as unknown as ReturnType + }) as unknown as typeof setTimeout + globalThis.clearTimeout = (() => undefined) as typeof clearTimeout + }) + + afterEach(() => { + globalThis.setTimeout = realSetTimeout + globalThis.clearTimeout = realClearTimeout + }) + + test('summarizes bounded transcript once and skips unchanged fingerprints', async () => { + const { startAgentSummarization } = await import('../agentSummary.js') + + const handle = startAgentSummarization( + 'task-1', + asAgentId('a0000000000000000'), + { + forkContextMessages: [{ type: 'user', message: { content: 'old' } }], + model: 'claude-test', + } as unknown as CacheSafeParams, + () => undefined, + ) + + expect(typeof scheduled).toBe('function') + await scheduled!() + + expect(forkCalls).toBe(1) + expect(updateCalls).toEqual([ + { taskId: 'task-1', summary: 'Reading udsClient.ts' }, + ]) + + await scheduled!() + + expect(forkCalls).toBe(1) + expect(updateCalls).toHaveLength(1) + + handle.stop() + }) +}) diff --git a/src/services/AgentSummary/__tests__/summaryContext.test.ts b/src/services/AgentSummary/__tests__/summaryContext.test.ts index 3ffa55964..1c701d14b 100644 --- a/src/services/AgentSummary/__tests__/summaryContext.test.ts +++ b/src/services/AgentSummary/__tests__/summaryContext.test.ts @@ -2,6 +2,7 @@ import { describe, expect, test } from 'bun:test' import type { Message } from '../../../types/message.js' import { getSummaryContextFingerprint, + MAX_SUMMARY_CONTEXT_CHARS, selectSummaryContextMessages, } from '../summaryContext.js' @@ -101,6 +102,10 @@ describe('selectSummaryContextMessages', () => { }) describe('getSummaryContextFingerprint', () => { + test('returns null for an empty transcript', () => { + expect(getSummaryContextFingerprint([])).toBeNull() + }) + test('changes when the transcript grows', () => { const messages = [ makeMessage('user', 'u1', 'first prompt'), @@ -129,4 +134,16 @@ describe('getSummaryContextFingerprint', () => { expect(first).not.toBe(second) }) + + test('includes a truncation marker for oversized primitive values', () => { + const prefix = 'x'.repeat(MAX_SUMMARY_CONTEXT_CHARS + 100) + const first = getSummaryContextFingerprint([ + makeMessage('assistant', 'a1', `${prefix}a`), + ]) + const second = getSummaryContextFingerprint([ + makeMessage('assistant', 'a1', `${prefix}b`), + ]) + + expect(first).not.toBe(second) + }) }) diff --git a/src/services/AgentSummary/summaryContext.ts b/src/services/AgentSummary/summaryContext.ts index 4d9f6a6ce..894a21e36 100644 --- a/src/services/AgentSummary/summaryContext.ts +++ b/src/services/AgentSummary/summaryContext.ts @@ -55,10 +55,15 @@ function updateFingerprintHash( if (limit.remaining <= 0) return if (value === null || typeof value !== 'object') { const text = String(value) + const consumed = Math.min(text.length, limit.remaining) + if (consumed <= 0) return hash.update(typeof value) hash.update(':') - hash.update(text.slice(0, limit.remaining)) - limit.remaining -= text.length + hash.update(text.slice(0, consumed)) + if (consumed < text.length) { + hash.update(`#truncated:${text.length}:${text.slice(-64)}`) + } + limit.remaining -= consumed return } if (seen.has(value)) { diff --git a/src/utils/__tests__/teammateMailbox.test.ts b/src/utils/__tests__/teammateMailbox.test.ts index 577c4331f..7f479ed36 100644 --- a/src/utils/__tests__/teammateMailbox.test.ts +++ b/src/utils/__tests__/teammateMailbox.test.ts @@ -3,8 +3,10 @@ import { mkdir, readFile, rm, writeFile } from 'node:fs/promises' import { mkdtempSync } from 'node:fs' import { tmpdir } from 'node:os' import { dirname, join } from 'node:path' +import type { Message } from '../../types/message.js' import { compactMailboxMessages, + getLastPeerDmSummary, getInboxPath, markMessageAsReadByIndex, markMessageAsReadByIdentity, @@ -119,6 +121,23 @@ describe('compactMailboxMessages', () => { ]) }) + test('does not prioritize malformed JSON-like unread messages as protocol', () => { + const compacted = compactMailboxMessages( + [ + message('{not-json', false), + message('regular-1', false), + message('regular-2', false), + ], + { + maxMessages: 1, + maxReadMessages: 0, + maxUnreadProtocolMessages: 10, + }, + ) + + expect(compacted.map(m => m.text)).toEqual(['regular-2']) + }) + test('caps unread protocol messages with an independent bound', () => { const compacted = compactMailboxMessages( Array.from( @@ -308,3 +327,54 @@ describe('teammate mailbox retention', () => { await expect(readMailbox('worker', 'alpha')).rejects.toThrow() }) }) + +describe('getLastPeerDmSummary', () => { + test('extracts the final peer direct-message summary from assistant tool use', () => { + const messages = [ + { type: 'user', message: { content: 'wake up' } }, + { + type: 'assistant', + message: { + content: [ + { + type: 'tool_use', + name: 'SendMessage', + input: { + to: 'worker-1', + message: 'please check the UDS bounds', + summary: 'Checking UDS bounds', + }, + }, + ], + }, + }, + ] as unknown as Message[] + + expect(getLastPeerDmSummary(messages)).toBe( + '[to worker-1] Checking UDS bounds', + ) + }) + + test('stops peer direct-message summary search at the wake-up boundary', () => { + const messages = [ + { + type: 'assistant', + message: { + content: [ + { + type: 'tool_use', + name: 'SendMessage', + input: { + to: 'worker-1', + message: 'old message', + }, + }, + ], + }, + }, + { type: 'user', message: { content: 'new prompt' } }, + ] as unknown as Message[] + + expect(getLastPeerDmSummary(messages)).toBeUndefined() + }) +}) diff --git a/src/utils/__tests__/udsMessaging.test.ts b/src/utils/__tests__/udsMessaging.test.ts index 52cb57abc..ef943cb76 100644 --- a/src/utils/__tests__/udsMessaging.test.ts +++ b/src/utils/__tests__/udsMessaging.test.ts @@ -1,5 +1,13 @@ -import { afterEach, describe, expect, test } from 'bun:test' -import { chmod, mkdir, rm, stat, symlink, unlink } from 'node:fs/promises' +import { afterEach, beforeEach, describe, expect, test } from 'bun:test' +import { + chmod, + mkdir, + mkdtemp, + rm, + stat, + symlink, + unlink, +} from 'node:fs/promises' import { createConnection, createServer } from 'node:net' import { dirname, join } from 'node:path' import { tmpdir } from 'node:os' @@ -15,6 +23,9 @@ import { stopUdsMessaging, } from '../udsMessaging.js' +let previousConfigDir: string | undefined +let tempConfigDir = '' + function socketPath(label: string): string { const suffix = `${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}-${label}` if (process.platform === 'win32') { @@ -52,10 +63,25 @@ async function waitForEnqueues( setOnEnqueue(null) } +beforeEach(async () => { + previousConfigDir = process.env.CLAUDE_CONFIG_DIR + tempConfigDir = await mkdtemp(join(tmpdir(), 'uds-messaging-home-')) + process.env.CLAUDE_CONFIG_DIR = tempConfigDir +}) + afterEach(async () => { setOnEnqueue(null) drainInbox() await stopUdsMessaging() + if (previousConfigDir === undefined) { + delete process.env.CLAUDE_CONFIG_DIR + } else { + process.env.CLAUDE_CONFIG_DIR = previousConfigDir + } + if (tempConfigDir) { + await rm(tempConfigDir, { recursive: true, force: true }) + tempConfigDir = '' + } }) async function closeServer(server: ReturnType): Promise { @@ -133,6 +159,57 @@ describe('UDS inbox retention', () => { expect(drainInbox()).toEqual([]) }) + test('udsClient helpers authenticate through the capability file', async () => { + const path = socketPath('uds-client') + await startUdsMessaging(path, { isExplicit: true }) + const { isPeerAlive, sendToUdsSocket } = await import('../udsClient.js') + + expect(await isPeerAlive(path)).toBe(true) + await waitForEnqueues(1, async () => { + await sendToUdsSocket(path, 'hello from client') + }) + + const drained = drainInbox() + expect(drained).toHaveLength(1) + expect(drained[0]?.message.data).toBe('hello from client') + expect(drained[0]?.message.meta).toBeUndefined() + }) + + test('udsClient peer probe fails closed on oversized pong frames', async () => { + const path = socketPath('uds-client-oversized-pong') + if (process.platform !== 'win32') { + await mkdir(dirname(path), { recursive: true }) + } + const receiver = createServer(socket => { + socket.on('data', () => { + socket.write('x'.repeat(MAX_UDS_FRAME_BYTES + 1)) + }) + }) + await new Promise((resolve, reject) => { + receiver.on('error', reject) + receiver.listen(path, () => resolve()) + }) + + try { + const { isPeerAlive } = await import('../udsClient.js') + expect(await isPeerAlive(path)).toBe(false) + } finally { + await closeServer(receiver) + if (process.platform !== 'win32') { + await unlink(path).catch(() => undefined) + } + } + }) + + test('udsClient send fails closed when no capability token exists', async () => { + const path = socketPath('uds-client-no-token') + const { sendToUdsSocket } = await import('../udsClient.js') + + await expect(sendToUdsSocket(path, 'hello')).rejects.toThrow( + 'No auth token found', + ) + }) + test('drained entries never expose the UDS auth token', async () => { const path = socketPath('strip-token') await startUdsMessaging(path, { isExplicit: true }) @@ -301,5 +378,24 @@ describe('UDS inbox retention', () => { await rm(tempHome, { recursive: true, force: true }) } }) + + test('fails closed when an explicit socket parent is not private', async () => { + const parent = join( + tmpdir(), + `uds-socket-parent-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`, + ) + await mkdir(parent, { recursive: true, mode: 0o755 }) + await chmod(parent, 0o755) + + try { + await expect( + startUdsMessaging(join(parent, 'messaging.sock'), { + isExplicit: true, + }), + ).rejects.toThrow('socket parent permissions are too broad') + } finally { + await rm(parent, { recursive: true, force: true }) + } + }) } }) diff --git a/src/utils/ndjsonFramer.ts b/src/utils/ndjsonFramer.ts index ecaa04dc8..7832e9303 100644 --- a/src/utils/ndjsonFramer.ts +++ b/src/utils/ndjsonFramer.ts @@ -27,6 +27,7 @@ export function attachNdjsonFramer( options: NdjsonFramerOptions = {}, ): void { let buffer = '' + let bufferBytes = 0 const maxFrameBytes = options.maxFrameBytes ?? Number.POSITIVE_INFINITY const rejectOversizedFrame = (bytes: number): void => { @@ -37,41 +38,48 @@ export function attachNdjsonFramer( socket.destroy(error) } + const emitLine = (line: string): void => { + if (!line.trim()) return + try { + onMessage(parse(line)) + } catch { + // Malformed JSON — skip + } + } + socket.on('data', (chunk: Buffer) => { + let start = 0 + for (let index = 0; index < chunk.length; index++) { + if (chunk[index] !== 0x0a) continue + + const segmentBytes = index - start + if ( + Number.isFinite(maxFrameBytes) && + bufferBytes + segmentBytes > maxFrameBytes + ) { + rejectOversizedFrame(bufferBytes + segmentBytes) + return + } + + buffer += chunk.subarray(start, index).toString('utf8') + emitLine(buffer) + buffer = '' + bufferBytes = 0 + start = index + 1 + } + + const tailBytes = chunk.length - start if ( Number.isFinite(maxFrameBytes) && - !chunk.includes(0x0a) && - Buffer.byteLength(buffer, 'utf8') + chunk.byteLength > maxFrameBytes + bufferBytes + tailBytes > maxFrameBytes ) { - rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8') + chunk.byteLength) + rejectOversizedFrame(bufferBytes + tailBytes) return } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - - for (const line of lines) { - if (!line.trim()) continue - if ( - Number.isFinite(maxFrameBytes) && - Buffer.byteLength(line, 'utf8') > maxFrameBytes - ) { - rejectOversizedFrame(Buffer.byteLength(line, 'utf8')) - return - } - try { - onMessage(parse(line)) - } catch { - // Malformed JSON — skip - } - } - - if ( - Number.isFinite(maxFrameBytes) && - Buffer.byteLength(buffer, 'utf8') > maxFrameBytes - ) { - rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8')) + if (tailBytes > 0) { + buffer += chunk.subarray(start).toString('utf8') + bufferBytes += tailBytes } }) } diff --git a/src/utils/teammateMailbox.ts b/src/utils/teammateMailbox.ts index 6c18fd721..ad9b22f93 100644 --- a/src/utils/teammateMailbox.ts +++ b/src/utils/teammateMailbox.ts @@ -8,7 +8,7 @@ */ import { randomBytes } from 'crypto' -import { mkdir, readFile, rename, stat, writeFile } from 'fs/promises' +import { mkdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises' import { join } from 'path' import { z } from 'zod/v4' import { TEAMMATE_MESSAGE_TAG } from '../constants/xml.js' @@ -77,7 +77,7 @@ function shouldRetainUnreadAsProtocolMessage( 'type' in (parsed as Record), ) } catch { - return true + return false } } @@ -160,8 +160,13 @@ async function writeMailboxAtomic( ) } const tempPath = `${inboxPath}.${process.pid}.${randomBytes(8).toString('hex')}.tmp` - await writeFile(tempPath, content, 'utf-8') - await rename(tempPath, inboxPath) + try { + await writeFile(tempPath, content, 'utf-8') + await rename(tempPath, inboxPath) + } catch (error) { + await unlink(tempPath).catch(() => undefined) + throw error + } } export function compactMailboxMessages( diff --git a/src/utils/udsClient.ts b/src/utils/udsClient.ts index f08bee696..e33ef3fdb 100644 --- a/src/utils/udsClient.ts +++ b/src/utils/udsClient.ts @@ -17,6 +17,7 @@ import { isProcessRunning } from './genericProcessUtils.js' import { jsonParse, jsonStringify } from './slowOperations.js' import type { SessionKind } from './concurrentSessions.js' import { MAX_UDS_FRAME_BYTES, type UdsMessage } from './udsMessaging.js' +import { attachUdsResponseReader, getChunkBytes } from './udsResponseReader.js' // --------------------------------------------------------------------------- // Types @@ -43,12 +44,6 @@ function getSessionsDir(): string { return join(getClaudeConfigHomeDir(), 'sessions') } -function getChunkBytes(chunk: string | Buffer): number { - return typeof chunk === 'string' - ? Buffer.byteLength(chunk, 'utf8') - : chunk.byteLength -} - // --------------------------------------------------------------------------- // Discovery // --------------------------------------------------------------------------- @@ -218,56 +213,33 @@ export async function sendToUdsSocket( udsMsg.from = getUdsMessagingSocketPath() return new Promise((resolve, reject) => { - let buffer = '' let settled = false + let conn: ReturnType const finish = (error?: Error): void => { if (settled) return settled = true - conn.end() - if (error) reject(error) - else resolve() + if (error) { + conn.destroy(error) + reject(error) + } else { + conn.end() + resolve() + } } - const conn = createConnection(target.socketPath, () => { + + conn = createConnection(target.socketPath, () => { udsMsg.meta = { ...udsMsg.meta, authToken } conn.write(jsonStringify(udsMsg) + '\n', err => { if (err) finish(err) }) }) - conn.on('data', chunk => { - if ( - Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > - MAX_UDS_FRAME_BYTES - ) { - finish(new Error('UDS response frame exceeded size limit')) - return - } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - for (const line of lines) { - if (!line.trim()) continue - let response: UdsMessage - try { - response = jsonParse(line) as UdsMessage - } catch { - continue - } - if (response.type === 'response') { - finish() - return - } - if (response.type === 'error') { - finish(new Error(response.data ?? 'UDS receiver rejected message')) - return - } - } - }) - conn.on('error', err => { - finish( + attachUdsResponseReader(conn, { + maxFrameBytes: MAX_UDS_FRAME_BYTES, + onSettled: finish, + formatSocketError: err => new Error( `Failed to connect to peer at ${target.socketPath}: ${errorMessage(err)}`, ), - ) }) conn.setTimeout(5000, () => { finish(new Error('Connection timed out')) diff --git a/src/utils/udsMessaging.ts b/src/utils/udsMessaging.ts index 7efa7fbf6..94b73dcd6 100644 --- a/src/utils/udsMessaging.ts +++ b/src/utils/udsMessaging.ts @@ -8,7 +8,7 @@ * but can be overridden via --messaging-socket-path. */ -import { createHash, randomBytes } from 'crypto' +import { createHash, randomBytes, timingSafeEqual } from 'crypto' import { createServer, type Server, type Socket } from 'net' import { chmod, @@ -26,6 +26,7 @@ import { logForDebugging } from './debug.js' import { errorMessage } from './errors.js' import { getClaudeConfigHomeDir } from './envUtils.js' import { attachNdjsonFramer } from './ndjsonFramer.js' +import { attachUdsResponseReader } from './udsResponseReader.js' import { logError } from './log.js' import { jsonParse, jsonStringify } from './slowOperations.js' @@ -160,26 +161,36 @@ async function assertPrivateCapabilityDir(dir: string): Promise { stat = await lstat(dir) } + assertPrivateDirectory(stat, dir, 'capability directory') + await chmod(dir, 0o700) +} + +function assertPrivateDirectory( + stat: Awaited>, + dir: string, + label: string, +): void { if (!stat.isDirectory() || stat.isSymbolicLink()) { throw new Error( - `[udsMessaging] capability directory is not a private directory: ${dir}`, + `[udsMessaging] ${label} is not a private directory: ${dir}`, ) } if (process.platform !== 'win32') { - const broadMode = stat.mode & 0o077 + const broadMode = Number(stat.mode) & 0o077 if (broadMode !== 0) { throw new Error( - `[udsMessaging] capability directory permissions are too broad: ${dir}`, + `[udsMessaging] ${label} permissions are too broad: ${dir}`, ) } - if (typeof process.getuid === 'function' && stat.uid !== process.getuid()) { + if ( + typeof process.getuid === 'function' && + Number(stat.uid) !== process.getuid() + ) { throw new Error( - `[udsMessaging] capability directory owner does not match current user: ${dir}`, + `[udsMessaging] ${label} owner does not match current user: ${dir}`, ) } } - - await chmod(dir, 0o700) } async function writePrivateFileExclusive( @@ -204,6 +215,7 @@ async function ensureSocketParent(path: string): Promise { `[udsMessaging] socket parent is not a directory: ${dir}`, ) } + assertPrivateDirectory(stat, dir, 'socket parent') return } catch (error) { if (!isNotFound(error)) throw error @@ -314,7 +326,12 @@ function getMessageAuthToken(message: UdsMessage): string | undefined { } function isAuthorizedMessage(message: UdsMessage): boolean { - return getMessageAuthToken(message) === authToken + const provided = getMessageAuthToken(message) + if (!provided || !authToken) return false + const providedBuffer = Buffer.from(provided, 'utf8') + const expectedBuffer = Buffer.from(authToken, 'utf8') + if (providedBuffer.length !== expectedBuffer.length) return false + return timingSafeEqual(providedBuffer, expectedBuffer) } function writeSocketMessage(socket: Socket, message: UdsMessage): void { @@ -554,20 +571,6 @@ export async function stopUdsMessaging(): Promise { } } -function parseResponseLine(line: string): UdsMessage | null { - try { - return jsonParse(line) as UdsMessage - } catch { - return null - } -} - -function getChunkBytes(chunk: string | Buffer): number { - return typeof chunk === 'string' - ? Buffer.byteLength(chunk, 'utf8') - : chunk.byteLength -} - /** * Send a UDS message to a specific socket path (outbound — used when this * session wants to push a message to a peer's server). @@ -592,46 +595,30 @@ export async function sendUdsMessage( ) return new Promise((resolve, reject) => { - let buffer = '' let settled = false + let conn: ReturnType const finish = (error?: Error): void => { if (settled) return settled = true - conn.end() - if (error) reject(error) - else resolve() + if (error) { + conn.destroy(error) + reject(error) + } else { + conn.end() + resolve() + } } - const conn = createConnection(targetSocketPath, () => { + + conn = createConnection(targetSocketPath, () => { conn.write(jsonStringify(outbound) + '\n', err => { if (err) finish(err) }) }) - conn.on('data', chunk => { - if ( - Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > - MAX_UDS_FRAME_BYTES - ) { - finish(new Error('UDS response frame exceeded size limit')) - return - } - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() ?? '' - for (const line of lines) { - if (!line.trim()) continue - const response = parseResponseLine(line) - if (!response) continue - if (response.type === 'response' || response.type === 'pong') { - finish() - return - } - if (response.type === 'error') { - finish(new Error(response.data ?? 'UDS receiver rejected message')) - return - } - } + attachUdsResponseReader(conn, { + maxFrameBytes: MAX_UDS_FRAME_BYTES, + acceptPong: true, + onSettled: finish, }) - conn.on('error', err => finish(err)) // Timeout so we don't hang on unreachable sockets conn.setTimeout(5000, () => { finish(new Error('Connection timed out')) diff --git a/src/utils/udsResponseReader.ts b/src/utils/udsResponseReader.ts new file mode 100644 index 000000000..bb8d21f40 --- /dev/null +++ b/src/utils/udsResponseReader.ts @@ -0,0 +1,81 @@ +import type { Socket } from 'net' +import { errorMessage } from './errors.js' +import { jsonParse } from './slowOperations.js' +import type { UdsMessage } from './udsMessaging.js' + +type UdsResponseReaderOptions = { + maxFrameBytes: number + acceptPong?: boolean + onSettled: (error?: Error) => void + formatSocketError?: (error: unknown) => Error +} + +export function getChunkBytes(chunk: string | Buffer): number { + return typeof chunk === 'string' + ? Buffer.byteLength(chunk, 'utf8') + : chunk.byteLength +} + +function parseResponseLine(line: string): UdsMessage | null { + try { + return jsonParse(line) as UdsMessage + } catch { + return null + } +} + +export function attachUdsResponseReader( + socket: Socket, + options: UdsResponseReaderOptions, +): void { + let buffer = '' + let settled = false + + const finish = (error?: Error): void => { + if (settled) return + settled = true + if (error) { + socket.destroy(error) + } else { + socket.end() + } + options.onSettled(error) + } + + socket.on('data', chunk => { + if ( + Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) > + options.maxFrameBytes + ) { + finish(new Error('UDS response frame exceeded size limit')) + return + } + + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + for (const line of lines) { + if (!line.trim()) continue + const response = parseResponseLine(line) + if (!response) continue + if ( + response.type === 'response' || + (options.acceptPong === true && response.type === 'pong') + ) { + finish() + return + } + if (response.type === 'error') { + finish(new Error(response.data ?? 'UDS receiver rejected message')) + return + } + } + }) + + socket.on('error', error => { + finish( + options.formatSocketError?.(error) ?? + (error instanceof Error ? error : new Error(errorMessage(error))), + ) + }) +}