fix: harden bounded agent communication review fixes

CodeRabbit and Codecov surfaced real gaps in UDS framing, peer discovery, mailbox retention, and summary context coverage. This tightens those paths without suppressing review or coverage signals.

Constraint: PR #369 must address CodeRabbit and Codecov findings without warning suppression or fake fallbacks

Rejected: Suppress Codecov or CodeRabbit warnings | leaves real receive-path and test-isolation gaps

Rejected: Add unreachable feature-gated tests | bun:bundle keeps those branches compile-time gated in local tests

Confidence: high

Scope-risk: moderate

Directive: Keep UDS auth-token rejection outside feature flags; do not reintroduce inline token fallbacks

Tested: bun test --coverage --coverage-reporter lcov --coverage-dir coverage; bun run test:all; bun run lint; bun run build; bun run build:vite; bun audit; git diff --cached --check

Not-tested: Remote Codecov/CodeRabbit refreshed reports until pushed
This commit is contained in:
unraid
2026-04-27 10:32:18 +08:00
parent f353eb056a
commit ee0d788e58
15 changed files with 657 additions and 153 deletions

View File

@@ -3,8 +3,10 @@ import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'
import { mkdtempSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { dirname, join } from 'node:path'
import type { Message } from '../../types/message.js'
import {
compactMailboxMessages,
getLastPeerDmSummary,
getInboxPath,
markMessageAsReadByIndex,
markMessageAsReadByIdentity,
@@ -119,6 +121,23 @@ describe('compactMailboxMessages', () => {
])
})
test('does not prioritize malformed JSON-like unread messages as protocol', () => {
const compacted = compactMailboxMessages(
[
message('{not-json', false),
message('regular-1', false),
message('regular-2', false),
],
{
maxMessages: 1,
maxReadMessages: 0,
maxUnreadProtocolMessages: 10,
},
)
expect(compacted.map(m => m.text)).toEqual(['regular-2'])
})
test('caps unread protocol messages with an independent bound', () => {
const compacted = compactMailboxMessages(
Array.from(
@@ -308,3 +327,54 @@ describe('teammate mailbox retention', () => {
await expect(readMailbox('worker', 'alpha')).rejects.toThrow()
})
})
describe('getLastPeerDmSummary', () => {
test('extracts the final peer direct-message summary from assistant tool use', () => {
const messages = [
{ type: 'user', message: { content: 'wake up' } },
{
type: 'assistant',
message: {
content: [
{
type: 'tool_use',
name: 'SendMessage',
input: {
to: 'worker-1',
message: 'please check the UDS bounds',
summary: 'Checking UDS bounds',
},
},
],
},
},
] as unknown as Message[]
expect(getLastPeerDmSummary(messages)).toBe(
'[to worker-1] Checking UDS bounds',
)
})
test('stops peer direct-message summary search at the wake-up boundary', () => {
const messages = [
{
type: 'assistant',
message: {
content: [
{
type: 'tool_use',
name: 'SendMessage',
input: {
to: 'worker-1',
message: 'old message',
},
},
],
},
},
{ type: 'user', message: { content: 'new prompt' } },
] as unknown as Message[]
expect(getLastPeerDmSummary(messages)).toBeUndefined()
})
})

View File

@@ -1,5 +1,13 @@
import { afterEach, describe, expect, test } from 'bun:test'
import { chmod, mkdir, rm, stat, symlink, unlink } from 'node:fs/promises'
import { afterEach, beforeEach, describe, expect, test } from 'bun:test'
import {
chmod,
mkdir,
mkdtemp,
rm,
stat,
symlink,
unlink,
} from 'node:fs/promises'
import { createConnection, createServer } from 'node:net'
import { dirname, join } from 'node:path'
import { tmpdir } from 'node:os'
@@ -15,6 +23,9 @@ import {
stopUdsMessaging,
} from '../udsMessaging.js'
let previousConfigDir: string | undefined
let tempConfigDir = ''
function socketPath(label: string): string {
const suffix = `${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}-${label}`
if (process.platform === 'win32') {
@@ -52,10 +63,25 @@ async function waitForEnqueues(
setOnEnqueue(null)
}
beforeEach(async () => {
previousConfigDir = process.env.CLAUDE_CONFIG_DIR
tempConfigDir = await mkdtemp(join(tmpdir(), 'uds-messaging-home-'))
process.env.CLAUDE_CONFIG_DIR = tempConfigDir
})
afterEach(async () => {
setOnEnqueue(null)
drainInbox()
await stopUdsMessaging()
if (previousConfigDir === undefined) {
delete process.env.CLAUDE_CONFIG_DIR
} else {
process.env.CLAUDE_CONFIG_DIR = previousConfigDir
}
if (tempConfigDir) {
await rm(tempConfigDir, { recursive: true, force: true })
tempConfigDir = ''
}
})
async function closeServer(server: ReturnType<typeof createServer>): Promise<void> {
@@ -133,6 +159,57 @@ describe('UDS inbox retention', () => {
expect(drainInbox()).toEqual([])
})
test('udsClient helpers authenticate through the capability file', async () => {
const path = socketPath('uds-client')
await startUdsMessaging(path, { isExplicit: true })
const { isPeerAlive, sendToUdsSocket } = await import('../udsClient.js')
expect(await isPeerAlive(path)).toBe(true)
await waitForEnqueues(1, async () => {
await sendToUdsSocket(path, 'hello from client')
})
const drained = drainInbox()
expect(drained).toHaveLength(1)
expect(drained[0]?.message.data).toBe('hello from client')
expect(drained[0]?.message.meta).toBeUndefined()
})
test('udsClient peer probe fails closed on oversized pong frames', async () => {
const path = socketPath('uds-client-oversized-pong')
if (process.platform !== 'win32') {
await mkdir(dirname(path), { recursive: true })
}
const receiver = createServer(socket => {
socket.on('data', () => {
socket.write('x'.repeat(MAX_UDS_FRAME_BYTES + 1))
})
})
await new Promise<void>((resolve, reject) => {
receiver.on('error', reject)
receiver.listen(path, () => resolve())
})
try {
const { isPeerAlive } = await import('../udsClient.js')
expect(await isPeerAlive(path)).toBe(false)
} finally {
await closeServer(receiver)
if (process.platform !== 'win32') {
await unlink(path).catch(() => undefined)
}
}
})
test('udsClient send fails closed when no capability token exists', async () => {
const path = socketPath('uds-client-no-token')
const { sendToUdsSocket } = await import('../udsClient.js')
await expect(sendToUdsSocket(path, 'hello')).rejects.toThrow(
'No auth token found',
)
})
test('drained entries never expose the UDS auth token', async () => {
const path = socketPath('strip-token')
await startUdsMessaging(path, { isExplicit: true })
@@ -301,5 +378,24 @@ describe('UDS inbox retention', () => {
await rm(tempHome, { recursive: true, force: true })
}
})
test('fails closed when an explicit socket parent is not private', async () => {
const parent = join(
tmpdir(),
`uds-socket-parent-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`,
)
await mkdir(parent, { recursive: true, mode: 0o755 })
await chmod(parent, 0o755)
try {
await expect(
startUdsMessaging(join(parent, 'messaging.sock'), {
isExplicit: true,
}),
).rejects.toThrow('socket parent permissions are too broad')
} finally {
await rm(parent, { recursive: true, force: true })
}
})
}
})

View File

@@ -27,6 +27,7 @@ export function attachNdjsonFramer<T = unknown>(
options: NdjsonFramerOptions = {},
): void {
let buffer = ''
let bufferBytes = 0
const maxFrameBytes = options.maxFrameBytes ?? Number.POSITIVE_INFINITY
const rejectOversizedFrame = (bytes: number): void => {
@@ -37,41 +38,48 @@ export function attachNdjsonFramer<T = unknown>(
socket.destroy(error)
}
const emitLine = (line: string): void => {
if (!line.trim()) return
try {
onMessage(parse(line))
} catch {
// Malformed JSON — skip
}
}
socket.on('data', (chunk: Buffer) => {
let start = 0
for (let index = 0; index < chunk.length; index++) {
if (chunk[index] !== 0x0a) continue
const segmentBytes = index - start
if (
Number.isFinite(maxFrameBytes) &&
bufferBytes + segmentBytes > maxFrameBytes
) {
rejectOversizedFrame(bufferBytes + segmentBytes)
return
}
buffer += chunk.subarray(start, index).toString('utf8')
emitLine(buffer)
buffer = ''
bufferBytes = 0
start = index + 1
}
const tailBytes = chunk.length - start
if (
Number.isFinite(maxFrameBytes) &&
!chunk.includes(0x0a) &&
Buffer.byteLength(buffer, 'utf8') + chunk.byteLength > maxFrameBytes
bufferBytes + tailBytes > maxFrameBytes
) {
rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8') + chunk.byteLength)
rejectOversizedFrame(bufferBytes + tailBytes)
return
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.trim()) continue
if (
Number.isFinite(maxFrameBytes) &&
Buffer.byteLength(line, 'utf8') > maxFrameBytes
) {
rejectOversizedFrame(Buffer.byteLength(line, 'utf8'))
return
}
try {
onMessage(parse(line))
} catch {
// Malformed JSON — skip
}
}
if (
Number.isFinite(maxFrameBytes) &&
Buffer.byteLength(buffer, 'utf8') > maxFrameBytes
) {
rejectOversizedFrame(Buffer.byteLength(buffer, 'utf8'))
if (tailBytes > 0) {
buffer += chunk.subarray(start).toString('utf8')
bufferBytes += tailBytes
}
})
}

View File

@@ -8,7 +8,7 @@
*/
import { randomBytes } from 'crypto'
import { mkdir, readFile, rename, stat, writeFile } from 'fs/promises'
import { mkdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises'
import { join } from 'path'
import { z } from 'zod/v4'
import { TEAMMATE_MESSAGE_TAG } from '../constants/xml.js'
@@ -77,7 +77,7 @@ function shouldRetainUnreadAsProtocolMessage(
'type' in (parsed as Record<string, unknown>),
)
} catch {
return true
return false
}
}
@@ -160,8 +160,13 @@ async function writeMailboxAtomic(
)
}
const tempPath = `${inboxPath}.${process.pid}.${randomBytes(8).toString('hex')}.tmp`
await writeFile(tempPath, content, 'utf-8')
await rename(tempPath, inboxPath)
try {
await writeFile(tempPath, content, 'utf-8')
await rename(tempPath, inboxPath)
} catch (error) {
await unlink(tempPath).catch(() => undefined)
throw error
}
}
export function compactMailboxMessages(

View File

@@ -17,6 +17,7 @@ import { isProcessRunning } from './genericProcessUtils.js'
import { jsonParse, jsonStringify } from './slowOperations.js'
import type { SessionKind } from './concurrentSessions.js'
import { MAX_UDS_FRAME_BYTES, type UdsMessage } from './udsMessaging.js'
import { attachUdsResponseReader, getChunkBytes } from './udsResponseReader.js'
// ---------------------------------------------------------------------------
// Types
@@ -43,12 +44,6 @@ function getSessionsDir(): string {
return join(getClaudeConfigHomeDir(), 'sessions')
}
function getChunkBytes(chunk: string | Buffer): number {
return typeof chunk === 'string'
? Buffer.byteLength(chunk, 'utf8')
: chunk.byteLength
}
// ---------------------------------------------------------------------------
// Discovery
// ---------------------------------------------------------------------------
@@ -218,56 +213,33 @@ export async function sendToUdsSocket(
udsMsg.from = getUdsMessagingSocketPath()
return new Promise<void>((resolve, reject) => {
let buffer = ''
let settled = false
let conn: ReturnType<typeof createConnection>
const finish = (error?: Error): void => {
if (settled) return
settled = true
conn.end()
if (error) reject(error)
else resolve()
if (error) {
conn.destroy(error)
reject(error)
} else {
conn.end()
resolve()
}
}
const conn = createConnection(target.socketPath, () => {
conn = createConnection(target.socketPath, () => {
udsMsg.meta = { ...udsMsg.meta, authToken }
conn.write(jsonStringify(udsMsg) + '\n', err => {
if (err) finish(err)
})
})
conn.on('data', chunk => {
if (
Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) >
MAX_UDS_FRAME_BYTES
) {
finish(new Error('UDS response frame exceeded size limit'))
return
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.trim()) continue
let response: UdsMessage
try {
response = jsonParse(line) as UdsMessage
} catch {
continue
}
if (response.type === 'response') {
finish()
return
}
if (response.type === 'error') {
finish(new Error(response.data ?? 'UDS receiver rejected message'))
return
}
}
})
conn.on('error', err => {
finish(
attachUdsResponseReader(conn, {
maxFrameBytes: MAX_UDS_FRAME_BYTES,
onSettled: finish,
formatSocketError: err =>
new Error(
`Failed to connect to peer at ${target.socketPath}: ${errorMessage(err)}`,
),
)
})
conn.setTimeout(5000, () => {
finish(new Error('Connection timed out'))

View File

@@ -8,7 +8,7 @@
* but can be overridden via --messaging-socket-path.
*/
import { createHash, randomBytes } from 'crypto'
import { createHash, randomBytes, timingSafeEqual } from 'crypto'
import { createServer, type Server, type Socket } from 'net'
import {
chmod,
@@ -26,6 +26,7 @@ import { logForDebugging } from './debug.js'
import { errorMessage } from './errors.js'
import { getClaudeConfigHomeDir } from './envUtils.js'
import { attachNdjsonFramer } from './ndjsonFramer.js'
import { attachUdsResponseReader } from './udsResponseReader.js'
import { logError } from './log.js'
import { jsonParse, jsonStringify } from './slowOperations.js'
@@ -160,26 +161,36 @@ async function assertPrivateCapabilityDir(dir: string): Promise<void> {
stat = await lstat(dir)
}
assertPrivateDirectory(stat, dir, 'capability directory')
await chmod(dir, 0o700)
}
function assertPrivateDirectory(
stat: Awaited<ReturnType<typeof lstat>>,
dir: string,
label: string,
): void {
if (!stat.isDirectory() || stat.isSymbolicLink()) {
throw new Error(
`[udsMessaging] capability directory is not a private directory: ${dir}`,
`[udsMessaging] ${label} is not a private directory: ${dir}`,
)
}
if (process.platform !== 'win32') {
const broadMode = stat.mode & 0o077
const broadMode = Number(stat.mode) & 0o077
if (broadMode !== 0) {
throw new Error(
`[udsMessaging] capability directory permissions are too broad: ${dir}`,
`[udsMessaging] ${label} permissions are too broad: ${dir}`,
)
}
if (typeof process.getuid === 'function' && stat.uid !== process.getuid()) {
if (
typeof process.getuid === 'function' &&
Number(stat.uid) !== process.getuid()
) {
throw new Error(
`[udsMessaging] capability directory owner does not match current user: ${dir}`,
`[udsMessaging] ${label} owner does not match current user: ${dir}`,
)
}
}
await chmod(dir, 0o700)
}
async function writePrivateFileExclusive(
@@ -204,6 +215,7 @@ async function ensureSocketParent(path: string): Promise<void> {
`[udsMessaging] socket parent is not a directory: ${dir}`,
)
}
assertPrivateDirectory(stat, dir, 'socket parent')
return
} catch (error) {
if (!isNotFound(error)) throw error
@@ -314,7 +326,12 @@ function getMessageAuthToken(message: UdsMessage): string | undefined {
}
function isAuthorizedMessage(message: UdsMessage): boolean {
return getMessageAuthToken(message) === authToken
const provided = getMessageAuthToken(message)
if (!provided || !authToken) return false
const providedBuffer = Buffer.from(provided, 'utf8')
const expectedBuffer = Buffer.from(authToken, 'utf8')
if (providedBuffer.length !== expectedBuffer.length) return false
return timingSafeEqual(providedBuffer, expectedBuffer)
}
function writeSocketMessage(socket: Socket, message: UdsMessage): void {
@@ -554,20 +571,6 @@ export async function stopUdsMessaging(): Promise<void> {
}
}
function parseResponseLine(line: string): UdsMessage | null {
try {
return jsonParse(line) as UdsMessage
} catch {
return null
}
}
function getChunkBytes(chunk: string | Buffer): number {
return typeof chunk === 'string'
? Buffer.byteLength(chunk, 'utf8')
: chunk.byteLength
}
/**
* Send a UDS message to a specific socket path (outbound — used when this
* session wants to push a message to a peer's server).
@@ -592,46 +595,30 @@ export async function sendUdsMessage(
)
return new Promise<void>((resolve, reject) => {
let buffer = ''
let settled = false
let conn: ReturnType<typeof createConnection>
const finish = (error?: Error): void => {
if (settled) return
settled = true
conn.end()
if (error) reject(error)
else resolve()
if (error) {
conn.destroy(error)
reject(error)
} else {
conn.end()
resolve()
}
}
const conn = createConnection(targetSocketPath, () => {
conn = createConnection(targetSocketPath, () => {
conn.write(jsonStringify(outbound) + '\n', err => {
if (err) finish(err)
})
})
conn.on('data', chunk => {
if (
Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) >
MAX_UDS_FRAME_BYTES
) {
finish(new Error('UDS response frame exceeded size limit'))
return
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.trim()) continue
const response = parseResponseLine(line)
if (!response) continue
if (response.type === 'response' || response.type === 'pong') {
finish()
return
}
if (response.type === 'error') {
finish(new Error(response.data ?? 'UDS receiver rejected message'))
return
}
}
attachUdsResponseReader(conn, {
maxFrameBytes: MAX_UDS_FRAME_BYTES,
acceptPong: true,
onSettled: finish,
})
conn.on('error', err => finish(err))
// Timeout so we don't hang on unreachable sockets
conn.setTimeout(5000, () => {
finish(new Error('Connection timed out'))

View File

@@ -0,0 +1,81 @@
import type { Socket } from 'net'
import { errorMessage } from './errors.js'
import { jsonParse } from './slowOperations.js'
import type { UdsMessage } from './udsMessaging.js'
type UdsResponseReaderOptions = {
maxFrameBytes: number
acceptPong?: boolean
onSettled: (error?: Error) => void
formatSocketError?: (error: unknown) => Error
}
export function getChunkBytes(chunk: string | Buffer): number {
return typeof chunk === 'string'
? Buffer.byteLength(chunk, 'utf8')
: chunk.byteLength
}
function parseResponseLine(line: string): UdsMessage | null {
try {
return jsonParse(line) as UdsMessage
} catch {
return null
}
}
export function attachUdsResponseReader(
socket: Socket,
options: UdsResponseReaderOptions,
): void {
let buffer = ''
let settled = false
const finish = (error?: Error): void => {
if (settled) return
settled = true
if (error) {
socket.destroy(error)
} else {
socket.end()
}
options.onSettled(error)
}
socket.on('data', chunk => {
if (
Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) >
options.maxFrameBytes
) {
finish(new Error('UDS response frame exceeded size limit'))
return
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.trim()) continue
const response = parseResponseLine(line)
if (!response) continue
if (
response.type === 'response' ||
(options.acceptPong === true && response.type === 'pong')
) {
finish()
return
}
if (response.type === 'error') {
finish(new Error(response.data ?? 'UDS receiver rejected message'))
return
}
}
})
socket.on('error', error => {
finish(
options.formatSocketError?.(error) ??
(error instanceof Error ? error : new Error(errorMessage(error))),
)
})
}