fix: bound agent communication memory growth

UDS messaging now uses private local capabilities instead of exposing auth tokens through SDK metadata, environment variables, session registry, peer listing, or tool output. The receive path bounds NDJSON frames, response buffers, active clients, and pending inbox bytes, and strips auth metadata before messages enter the prompt queue.

Teammate mailboxes now validate file and message sizes, fail closed on corrupt mutation inputs, compact by count and retained bytes, and use stable message identity for in-process acknowledgements. Agent summaries now fork only a bounded recent context using lazy size estimation and content fingerprints instead of retaining or serializing unbounded histories.

Constraint: PR #361 was already merged; this branch is based on upstream/main@c2ac9a74.
Rejected: Default-disabling COORDINATOR_MODE/TEAMMEM only | explicit feature enablement still hit unbounded paths.
Rejected: Persisting UDS auth in SDK/env/session registry | bridge/remote metadata can leak local capability secrets.
Rejected: Inline uds #token addresses | observable/tool/classifier paths can reflect raw addresses outside the UDS request frame.
Rejected: Positional mailbox marking after compaction | compaction can shift indices across the lock boundary.
Confidence: high
Scope-risk: moderate
Directive: Do not expose UDS capability tokens through SDK messages, environment variables, session registry, peer-list output, or SendMessage result/classifier surfaces.
Directive: Do not reintroduce positional mailbox acknowledgements unless compaction is removed or read+mark is atomic under one lock.
Tested: bun test src/utils/__tests__/ndjsonFramer.test.ts src/utils/__tests__/udsMessaging.test.ts packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts
Tested: bunx tsc --noEmit --pretty false
Tested: bun run lint
Tested: bunx biome lint modified src/package files
Tested: bun run test:all (3704 pass, 0 fail, 6734 expects)
Tested: bun audit (No vulnerabilities found)
Tested: bun run build
Tested: bun run build:vite
Tested: git diff --check
Not-tested: End-to-end external UDS client driving a full production headless model turn.
This commit is contained in:
unraid
2026-04-26 21:44:42 +08:00
parent c2ac9a74c1
commit f353eb056a
17 changed files with 2087 additions and 137 deletions

View File

@@ -8,14 +8,25 @@
* but can be overridden via --messaging-socket-path.
*/
import { createHash, randomBytes } from 'crypto'
import { createServer, type Server, type Socket } from 'net'
import { mkdir, unlink } from 'fs/promises'
import {
chmod,
lstat,
mkdir,
open,
readFile,
rename,
unlink,
} from 'fs/promises'
import { dirname, join } from 'path'
import { tmpdir } from 'os'
import { registerCleanup } from './cleanupRegistry.js'
import { logForDebugging } from './debug.js'
import { errorMessage } from './errors.js'
import { getClaudeConfigHomeDir } from './envUtils.js'
import { attachNdjsonFramer } from './ndjsonFramer.js'
import { logError } from './log.js'
import { jsonParse, jsonStringify } from './slowOperations.js'
// ---------------------------------------------------------------------------
@@ -27,6 +38,7 @@ export type UdsMessageType =
| 'notification'
| 'query'
| 'response'
| 'error'
| 'ping'
| 'pong'
@@ -60,6 +72,15 @@ let onEnqueueCb: (() => void) | null = null
const clients = new Set<Socket>()
const inbox: UdsInboxEntry[] = []
let nextId = 1
let defaultSocketPath: string | null = null
let authToken: string | null = null
let capabilityFilePath: string | null = null
let inboxBytes = 0
export const MAX_UDS_INBOX_ENTRIES = 1_000
export const MAX_UDS_FRAME_BYTES = 64 * 1024
export const MAX_UDS_INBOX_BYTES = 2 * 1024 * 1024
export const MAX_UDS_CLIENTS = 128
// ---------------------------------------------------------------------------
// Public API — socket path helpers
@@ -74,10 +95,19 @@ let nextId = 1
* transparently, but we use the pipe format on Windows for Node.js compat.
*/
export function getDefaultUdsSocketPath(): string {
if (defaultSocketPath) return defaultSocketPath
const nonce = randomBytes(16).toString('hex')
if (process.platform === 'win32') {
return `\\\\.\\pipe\\claude-code-${process.pid}`
defaultSocketPath = `\\\\.\\pipe\\claude-code-${process.pid}-${nonce}`
return defaultSocketPath
}
return join(tmpdir(), 'claude-code-socks', `${process.pid}.sock`)
defaultSocketPath = join(
tmpdir(),
'claude-code-socks',
`${process.pid}-${nonce}`,
'messaging.sock',
)
return defaultSocketPath
}
/**
@@ -88,6 +118,142 @@ export function getUdsMessagingSocketPath(): string | undefined {
return socketPath ?? undefined
}
export function formatUdsAddress(socket: string): string {
return `uds:${socket}`
}
export function parseUdsTarget(target: string): {
socketPath: string
} {
if (target.includes('#token=')) {
throw new Error(
'UDS target must not include an inline auth token; use the ListPeers address',
)
}
return { socketPath: target }
}
function getCapabilityDir(): string {
return join(getClaudeConfigHomeDir(), 'messaging-capabilities')
}
function getCapabilityPath(socket: string): string {
const digest = createHash('sha256').update(socket).digest('hex')
return join(getCapabilityDir(), `${digest}.json`)
}
function isNotFound(error: unknown): boolean {
return (
typeof error === 'object' &&
error !== null &&
(error as NodeJS.ErrnoException).code === 'ENOENT'
)
}
async function assertPrivateCapabilityDir(dir: string): Promise<void> {
let stat: Awaited<ReturnType<typeof lstat>>
try {
stat = await lstat(dir)
} catch (error) {
if (!isNotFound(error)) throw error
await mkdir(dir, { recursive: true, mode: 0o700 })
stat = await lstat(dir)
}
if (!stat.isDirectory() || stat.isSymbolicLink()) {
throw new Error(
`[udsMessaging] capability directory is not a private directory: ${dir}`,
)
}
if (process.platform !== 'win32') {
const broadMode = stat.mode & 0o077
if (broadMode !== 0) {
throw new Error(
`[udsMessaging] capability directory permissions are too broad: ${dir}`,
)
}
if (typeof process.getuid === 'function' && stat.uid !== process.getuid()) {
throw new Error(
`[udsMessaging] capability directory owner does not match current user: ${dir}`,
)
}
}
await chmod(dir, 0o700)
}
async function writePrivateFileExclusive(
path: string,
content: string,
): Promise<void> {
const handle = await open(path, 'wx', 0o600)
try {
await handle.writeFile(content, 'utf-8')
} finally {
await handle.close()
}
await chmod(path, 0o600)
}
async function ensureSocketParent(path: string): Promise<void> {
const dir = dirname(path)
try {
const stat = await lstat(dir)
if (!stat.isDirectory() || stat.isSymbolicLink()) {
throw new Error(
`[udsMessaging] socket parent is not a directory: ${dir}`,
)
}
return
} catch (error) {
if (!isNotFound(error)) throw error
}
await mkdir(dir, { recursive: true, mode: 0o700 })
await chmod(dir, 0o700)
}
async function writeCapabilityFile(
socket: string,
token: string,
): Promise<void> {
const dir = getCapabilityDir()
await assertPrivateCapabilityDir(dir)
const target = getCapabilityPath(socket)
const temp = `${target}.${process.pid}.${randomBytes(8).toString('hex')}.tmp`
try {
await writePrivateFileExclusive(
temp,
jsonStringify({ socketPath: socket, authToken: token }),
)
await rename(temp, target)
} catch (error) {
try {
await unlink(temp)
} catch {
// Temp file may not exist if exclusive creation failed.
}
throw error
}
capabilityFilePath = target
}
export async function readUdsCapabilityToken(
socket: string,
): Promise<string | undefined> {
try {
const parsed = jsonParse(
await readFile(getCapabilityPath(socket), 'utf-8'),
) as Record<string, unknown>
if (parsed.socketPath === socket && typeof parsed.authToken === 'string') {
return parsed.authToken
}
} catch {
// Missing or unreadable capability file means the peer is not addressable.
}
return undefined
}
// ---------------------------------------------------------------------------
// Inbox
// ---------------------------------------------------------------------------
@@ -101,16 +267,79 @@ export function setOnEnqueue(cb: (() => void) | null): void {
}
/**
* Drain all pending inbox messages, marking them processed.
* Drain all pending inbox messages and release retained history.
*/
export function drainInbox(): UdsInboxEntry[] {
const pending = inbox.filter(e => e.status === 'pending')
const pending = inbox.splice(0, inbox.length)
inboxBytes = 0
for (const entry of pending) {
entry.status = 'processed'
}
return pending
}
function getMessageBytes(message: UdsMessage): number {
return Buffer.byteLength(jsonStringify(message), 'utf8')
}
function enqueueInboxEntry(entry: UdsInboxEntry): boolean {
const entryBytes = getMessageBytes(entry.message)
if (
entryBytes > MAX_UDS_FRAME_BYTES ||
inbox.length >= MAX_UDS_INBOX_ENTRIES ||
inboxBytes + entryBytes > MAX_UDS_INBOX_BYTES
) {
logError(
new Error(
`[udsMessaging] inbox full (${inbox.length}/${MAX_UDS_INBOX_ENTRIES}, ${inboxBytes}/${MAX_UDS_INBOX_BYTES} bytes); dropping message type=${entry.message.type}`,
),
)
return false
}
inbox.push(entry)
inboxBytes += entryBytes
return true
}
function ensureAuthToken(): string {
if (!authToken) {
authToken = randomBytes(32).toString('hex')
}
return authToken
}
function getMessageAuthToken(message: UdsMessage): string | undefined {
const token = message.meta?.authToken
return typeof token === 'string' ? token : undefined
}
function isAuthorizedMessage(message: UdsMessage): boolean {
return getMessageAuthToken(message) === authToken
}
function writeSocketMessage(socket: Socket, message: UdsMessage): void {
if (socket.destroyed) return
socket.write(jsonStringify(message) + '\n')
}
function stripAuthToken(message: UdsMessage): UdsMessage {
const { authToken: _authToken, ...metaWithoutAuth } = message.meta ?? {}
return {
...message,
meta: Object.keys(metaWithoutAuth).length > 0 ? metaWithoutAuth : undefined,
}
}
function withRequestAuthToken(message: UdsMessage, token: string): UdsMessage {
return {
...message,
meta: {
...message.meta,
authToken: token,
},
}
}
// ---------------------------------------------------------------------------
// Server
// ---------------------------------------------------------------------------
@@ -132,7 +361,7 @@ export async function startUdsMessaging(
// Ensure parent directory exists (skip on Windows — pipe paths aren't files)
if (process.platform !== 'win32') {
await mkdir(dirname(path), { recursive: true })
await ensureSocketParent(path)
}
// Clean up stale socket file (skip on Windows — pipe paths aren't files)
@@ -144,69 +373,134 @@ export async function startUdsMessaging(
}
}
socketPath = path
const token = ensureAuthToken()
try {
await writeCapabilityFile(path, token)
socketPath = path
await new Promise<void>((resolve, reject) => {
const srv = createServer(socket => {
clients.add(socket)
logForDebugging(
`[udsMessaging] client connected (total: ${clients.size})`,
)
attachNdjsonFramer<UdsMessage>(
socket,
msg => {
// Handle ping with automatic pong
if (msg.type === 'ping') {
const pong: UdsMessage = {
type: 'pong',
from: socketPath ?? undefined,
ts: new Date().toISOString(),
}
if (!socket.destroyed) {
socket.write(jsonStringify(pong) + '\n')
}
return
}
// Enqueue into inbox
const entry: UdsInboxEntry = {
id: `uds-${nextId++}`,
message: msg,
receivedAt: Date.now(),
status: 'pending',
}
inbox.push(entry)
await new Promise<void>((resolve, reject) => {
const srv = createServer(socket => {
if (clients.size >= MAX_UDS_CLIENTS) {
logForDebugging(
`[udsMessaging] enqueued message type=${msg.type} from=${msg.from ?? 'unknown'}`,
`[udsMessaging] rejected client: ${clients.size}/${MAX_UDS_CLIENTS} clients already connected`,
)
onEnqueueCb?.()
},
text => jsonParse(text) as UdsMessage,
)
socket.destroy()
return
}
clients.add(socket)
logForDebugging(
`[udsMessaging] client connected (total: ${clients.size})`,
)
socket.on('close', () => {
clients.delete(socket)
attachNdjsonFramer<UdsMessage>(
socket,
msg => {
if (!isAuthorizedMessage(msg)) {
logForDebugging(
`[udsMessaging] rejected unauthenticated message type=${msg.type}`,
)
if (!socket.destroyed) {
socket.write(
jsonStringify({
type: 'error',
data: 'unauthorized',
ts: new Date().toISOString(),
} satisfies UdsMessage) + '\n',
)
}
return
}
// Handle ping with automatic pong
if (msg.type === 'ping') {
writeSocketMessage(socket, {
type: 'pong',
from: socketPath ?? undefined,
ts: new Date().toISOString(),
})
return
}
// Enqueue into inbox
const sanitizedMessage = stripAuthToken(msg)
const entry: UdsInboxEntry = {
id: `uds-${nextId++}`,
message: sanitizedMessage,
receivedAt: Date.now(),
status: 'pending',
}
if (!enqueueInboxEntry(entry)) {
writeSocketMessage(socket, {
type: 'error',
data: 'inbox full',
ts: new Date().toISOString(),
})
return
}
logForDebugging(
`[udsMessaging] enqueued message type=${msg.type} from=${msg.from ?? 'unknown'}`,
)
writeSocketMessage(socket, {
type: 'response',
data: 'ok',
ts: new Date().toISOString(),
meta: { id: entry.id },
})
onEnqueueCb?.()
},
text => jsonParse(text) as UdsMessage,
{
maxFrameBytes: MAX_UDS_FRAME_BYTES,
onFrameError: error => {
logForDebugging(`[udsMessaging] ${error.message}`)
},
},
)
socket.on('close', () => {
clients.delete(socket)
})
socket.on('error', err => {
clients.delete(socket)
logForDebugging(`[udsMessaging] client error: ${errorMessage(err)}`)
})
})
socket.on('error', err => {
clients.delete(socket)
logForDebugging(`[udsMessaging] client error: ${errorMessage(err)}`)
srv.on('error', reject)
srv.listen(path, () => {
void (async () => {
try {
if (process.platform !== 'win32') {
await chmod(path, 0o600)
}
server = srv
// Export so child processes can discover the socket
process.env.CLAUDE_CODE_MESSAGING_SOCKET = path
logForDebugging(
`[udsMessaging] server listening on ${path}${opts?.isExplicit ? ' (explicit)' : ''}`,
)
resolve()
} catch (error) {
srv.close(() => reject(error))
}
})()
})
})
srv.on('error', reject)
srv.listen(path, () => {
server = srv
// Export so child processes can discover the socket
process.env.CLAUDE_CODE_MESSAGING_SOCKET = path
logForDebugging(
`[udsMessaging] server listening on ${path}${opts?.isExplicit ? ' (explicit)' : ''}`,
)
resolve()
})
})
} catch (error) {
if (capabilityFilePath) {
try {
await unlink(capabilityFilePath)
} catch {
// Already gone.
}
capabilityFilePath = null
}
socketPath = null
authToken = null
throw error
}
// Register cleanup so the socket file is removed on exit
registerCleanup(async () => {
@@ -230,6 +524,9 @@ export async function stopUdsMessaging(): Promise<void> {
server!.close(() => resolve())
})
server = null
inbox.length = 0
inboxBytes = 0
onEnqueueCb = null
// Remove socket file (skip on Windows — pipe paths aren't files)
if (socketPath) {
@@ -245,7 +542,30 @@ export async function stopUdsMessaging(): Promise<void> {
`[udsMessaging] server stopped, socket removed: ${socketPath}`,
)
socketPath = null
authToken = null
}
if (capabilityFilePath) {
try {
await unlink(capabilityFilePath)
} catch {
// Already gone
}
capabilityFilePath = null
}
}
function parseResponseLine(line: string): UdsMessage | null {
try {
return jsonParse(line) as UdsMessage
} catch {
return null
}
}
function getChunkBytes(chunk: string | Buffer): number {
return typeof chunk === 'string'
? Buffer.byteLength(chunk, 'utf8')
: chunk.byteLength
}
/**
@@ -255,23 +575,66 @@ export async function stopUdsMessaging(): Promise<void> {
export async function sendUdsMessage(
targetSocketPath: string,
message: UdsMessage,
opts: { authToken?: string } = {},
): Promise<void> {
const { createConnection } = await import('net')
message.from = message.from ?? socketPath ?? undefined
message.ts = message.ts ?? new Date().toISOString()
const token = opts.authToken ?? authToken
if (!token) {
throw new Error('Cannot send UDS message without auth token')
}
const outbound = withRequestAuthToken(
{
...message,
from: message.from ?? socketPath ?? undefined,
ts: message.ts ?? new Date().toISOString(),
},
token,
)
return new Promise<void>((resolve, reject) => {
let buffer = ''
let settled = false
const finish = (error?: Error): void => {
if (settled) return
settled = true
conn.end()
if (error) reject(error)
else resolve()
}
const conn = createConnection(targetSocketPath, () => {
conn.write(jsonStringify(message) + '\n', err => {
conn.end()
if (err) reject(err)
else resolve()
conn.write(jsonStringify(outbound) + '\n', err => {
if (err) finish(err)
})
})
conn.on('error', reject)
conn.on('data', chunk => {
if (
Buffer.byteLength(buffer, 'utf8') + getChunkBytes(chunk) >
MAX_UDS_FRAME_BYTES
) {
finish(new Error('UDS response frame exceeded size limit'))
return
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.trim()) continue
const response = parseResponseLine(line)
if (!response) continue
if (response.type === 'response' || response.type === 'pong') {
finish()
return
}
if (response.type === 'error') {
finish(new Error(response.data ?? 'UDS receiver rejected message'))
return
}
}
})
conn.on('error', err => finish(err))
// Timeout so we don't hang on unreachable sockets
conn.setTimeout(5000, () => {
conn.destroy(new Error('Connection timed out'))
finish(new Error('Connection timed out'))
})
})
}