mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-19 23:05:51 +00:00
fix: bound agent communication memory growth (#369)
* fix: bound agent communication memory growth UDS messaging now uses private local capabilities instead of exposing auth tokens through SDK metadata, environment variables, session registry, peer listing, or tool output. The receive path bounds NDJSON frames, response buffers, active clients, and pending inbox bytes, and strips auth metadata before messages enter the prompt queue. Teammate mailboxes now validate file and message sizes, fail closed on corrupt mutation inputs, compact by count and retained bytes, and use stable message identity for in-process acknowledgements. Agent summaries now fork only a bounded recent context using lazy size estimation and content fingerprints instead of retaining or serializing unbounded histories. Constraint: PR #361 was already merged; this branch is based on upstream/main@c2ac9a74. Rejected: Default-disabling COORDINATOR_MODE/TEAMMEM only | explicit feature enablement still hit unbounded paths. Rejected: Persisting UDS auth in SDK/env/session registry | bridge/remote metadata can leak local capability secrets. Rejected: Inline uds #token addresses | observable/tool/classifier paths can reflect raw addresses outside the UDS request frame. Rejected: Positional mailbox marking after compaction | compaction can shift indices across the lock boundary. Confidence: high Scope-risk: moderate Directive: Do not expose UDS capability tokens through SDK messages, environment variables, session registry, peer-list output, or SendMessage result/classifier surfaces. Directive: Do not reintroduce positional mailbox acknowledgements unless compaction is removed or read+mark is atomic under one lock. Tested: bun test src/utils/__tests__/ndjsonFramer.test.ts src/utils/__tests__/udsMessaging.test.ts packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts Tested: bunx tsc --noEmit --pretty false Tested: bun run lint Tested: bunx biome lint modified src/package files Tested: bun run test:all (3704 pass, 0 fail, 6734 expects) Tested: bun audit (No vulnerabilities found) Tested: bun run build Tested: bun run build:vite Tested: git diff --check Not-tested: End-to-end external UDS client driving a full production headless model turn. * fix: harden bounded agent communication review fixes CodeRabbit and Codecov surfaced real gaps in UDS framing, peer discovery, mailbox retention, and summary context coverage. This tightens those paths without suppressing review or coverage signals. Constraint: PR #369 must address CodeRabbit and Codecov findings without warning suppression or fake fallbacks Rejected: Suppress Codecov or CodeRabbit warnings | leaves real receive-path and test-isolation gaps Rejected: Add unreachable feature-gated tests | bun:bundle keeps those branches compile-time gated in local tests Confidence: high Scope-risk: moderate Directive: Keep UDS auth-token rejection outside feature flags; do not reintroduce inline token fallbacks Tested: bun test --coverage --coverage-reporter lcov --coverage-dir coverage; bun run test:all; bun run lint; bun run build; bun run build:vite; bun audit; git diff --cached --check Not-tested: Remote Codecov/CodeRabbit refreshed reports until pushed * fix: prevent agent communication bounds from hiding CI regressions Tighten the UDS auth, framing, and response-reader boundaries while keeping the AgentSummary lifecycle covered so Codecov and CI fail on real regressions instead of missing coverage. The poorMode settings mock mirrors unrelated real settings defaults to avoid Bun mock retention changing later permission tests. Constraint: PR #369 must fix Codecov/CI precisely without warning suppression, fallback masking, or mock pollution Rejected: Delete AgentSummary lifecycle coverage | would hide Codecov loss and stale-summary behavior Rejected: Store inline UDS rejection in a hidden input sentinel | cloned observable inputs can drop it and bypass rejection Rejected: Ignore malformed UDS frames until timeout | leaves client slots and SendMessage calls open to exhaustion Confidence: high Scope-risk: moderate Directive: Keep empty #token= markers rejected; do not require a non-empty token value in hasInlineUdsToken Tested: bun test packages/builtin-tools/src/tools/SendMessageTool/__tests__/udsRecipientSanitization.test.ts src/utils/__tests__/udsMessaging.test.ts src/utils/__tests__/udsResponseReader.test.ts src/utils/__tests__/ndjsonFramer.test.ts Tested: bunx tsc --noEmit --pretty false Tested: bun run lint Tested: bun test --coverage --coverage-reporter lcov --coverage-dir coverage Tested: bun run test:all Tested: bun audit Tested: bun run build Tested: bun run build:vite Not-tested: GitHub-hosted Codecov upload until pushed PR checks rerun --------- Co-authored-by: unraid <local@unraid.local>
This commit is contained in:
@@ -7,7 +7,8 @@
|
||||
* Note: Inboxes are keyed by agent name within a team.
|
||||
*/
|
||||
|
||||
import { mkdir, readFile, writeFile } from 'fs/promises'
|
||||
import { randomBytes } from 'crypto'
|
||||
import { mkdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises'
|
||||
import { join } from 'path'
|
||||
import { z } from 'zod/v4'
|
||||
import { TEAMMATE_MESSAGE_TAG } from '../constants/xml.js'
|
||||
@@ -40,6 +41,13 @@ const LOCK_OPTIONS = {
|
||||
},
|
||||
}
|
||||
|
||||
export const MAX_MAILBOX_MESSAGES = 1_000
|
||||
export const MAX_READ_MAILBOX_MESSAGES = 200
|
||||
export const MAX_UNREAD_PROTOCOL_MAILBOX_MESSAGES = 2_000
|
||||
export const MAX_MAILBOX_MESSAGE_TEXT_BYTES = 64 * 1024
|
||||
export const MAX_MAILBOX_RETAINED_BYTES = 2 * 1024 * 1024
|
||||
export const MAX_MAILBOX_FILE_BYTES = 4 * 1024 * 1024
|
||||
|
||||
export type TeammateMessage = {
|
||||
from: string
|
||||
text: string
|
||||
@@ -49,6 +57,223 @@ export type TeammateMessage = {
|
||||
summary?: string // 5-10 word summary shown as preview in the UI
|
||||
}
|
||||
|
||||
function isJsonLikeMessage(text: string): boolean {
|
||||
const trimmed = text.trimStart()
|
||||
return trimmed.startsWith('{') || trimmed.startsWith('[')
|
||||
}
|
||||
|
||||
function shouldRetainUnreadAsProtocolMessage(
|
||||
message: TeammateMessage,
|
||||
): boolean {
|
||||
if (message.read) return false
|
||||
if (isStructuredProtocolMessage(message.text)) return true
|
||||
if (!isJsonLikeMessage(message.text)) return false
|
||||
|
||||
try {
|
||||
const parsed = jsonParse(message.text)
|
||||
return Boolean(
|
||||
parsed &&
|
||||
typeof parsed === 'object' &&
|
||||
'type' in (parsed as Record<string, unknown>),
|
||||
)
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function sameMailboxMessage(a: TeammateMessage, b: TeammateMessage): boolean {
|
||||
return a.from === b.from && a.timestamp === b.timestamp && a.text === b.text
|
||||
}
|
||||
|
||||
function mailboxMessageStorageBytes(message: TeammateMessage): number {
|
||||
return Buffer.byteLength(jsonStringify(message), 'utf8')
|
||||
}
|
||||
|
||||
function assertMailboxMessageSize(message: TeammateMessage): void {
|
||||
const textBytes = Buffer.byteLength(message.text, 'utf8')
|
||||
if (textBytes > MAX_MAILBOX_MESSAGE_TEXT_BYTES) {
|
||||
throw new Error(
|
||||
`Mailbox message text exceeds ${MAX_MAILBOX_MESSAGE_TEXT_BYTES} bytes`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
function toMailboxMessage(value: unknown): TeammateMessage {
|
||||
if (!value || typeof value !== 'object') {
|
||||
throw new Error('Invalid mailbox message: expected object')
|
||||
}
|
||||
const record = value as Record<string, unknown>
|
||||
if (
|
||||
typeof record.from !== 'string' ||
|
||||
typeof record.text !== 'string' ||
|
||||
typeof record.timestamp !== 'string' ||
|
||||
typeof record.read !== 'boolean'
|
||||
) {
|
||||
throw new Error('Invalid mailbox message shape')
|
||||
}
|
||||
const message: TeammateMessage = {
|
||||
from: record.from,
|
||||
text: record.text,
|
||||
timestamp: record.timestamp,
|
||||
read: record.read,
|
||||
...(typeof record.color === 'string' ? { color: record.color } : {}),
|
||||
...(typeof record.summary === 'string' ? { summary: record.summary } : {}),
|
||||
}
|
||||
assertMailboxMessageSize(message)
|
||||
return message
|
||||
}
|
||||
|
||||
function parseMailboxMessages(content: string): TeammateMessage[] {
|
||||
const parsed = jsonParse(content)
|
||||
if (!Array.isArray(parsed)) {
|
||||
throw new Error('Invalid mailbox file: expected message array')
|
||||
}
|
||||
return parsed.map(toMailboxMessage)
|
||||
}
|
||||
|
||||
async function readMailboxFile(inboxPath: string): Promise<string> {
|
||||
const info = await stat(inboxPath)
|
||||
if (info.size > MAX_MAILBOX_FILE_BYTES) {
|
||||
throw new Error(
|
||||
`Mailbox file exceeds ${MAX_MAILBOX_FILE_BYTES} bytes: ${inboxPath}`,
|
||||
)
|
||||
}
|
||||
return readFile(inboxPath, 'utf-8')
|
||||
}
|
||||
|
||||
async function readMailboxForMutation(
|
||||
agentName: string,
|
||||
teamName?: string,
|
||||
): Promise<TeammateMessage[]> {
|
||||
const inboxPath = getInboxPath(agentName, teamName)
|
||||
return parseMailboxMessages(await readMailboxFile(inboxPath))
|
||||
}
|
||||
|
||||
async function writeMailboxAtomic(
|
||||
inboxPath: string,
|
||||
content: string,
|
||||
): Promise<void> {
|
||||
const bytes = Buffer.byteLength(content, 'utf8')
|
||||
if (bytes > MAX_MAILBOX_FILE_BYTES) {
|
||||
throw new Error(
|
||||
`Compacted mailbox still exceeds ${MAX_MAILBOX_FILE_BYTES} bytes`,
|
||||
)
|
||||
}
|
||||
const tempPath = `${inboxPath}.${process.pid}.${randomBytes(8).toString('hex')}.tmp`
|
||||
try {
|
||||
await writeFile(tempPath, content, 'utf-8')
|
||||
await rename(tempPath, inboxPath)
|
||||
} catch (error) {
|
||||
await unlink(tempPath).catch(() => undefined)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
export function compactMailboxMessages(
|
||||
messages: TeammateMessage[],
|
||||
limits: {
|
||||
maxMessages?: number
|
||||
maxReadMessages?: number
|
||||
maxUnreadProtocolMessages?: number
|
||||
maxRetainedBytes?: number
|
||||
} = {},
|
||||
): TeammateMessage[] {
|
||||
const maxMessages = limits.maxMessages ?? MAX_MAILBOX_MESSAGES
|
||||
const maxReadMessages = limits.maxReadMessages ?? MAX_READ_MAILBOX_MESSAGES
|
||||
const maxUnreadProtocolMessages =
|
||||
limits.maxUnreadProtocolMessages ?? MAX_UNREAD_PROTOCOL_MAILBOX_MESSAGES
|
||||
const maxRetainedBytes = limits.maxRetainedBytes ?? MAX_MAILBOX_RETAINED_BYTES
|
||||
|
||||
if (
|
||||
maxRetainedBytes <= 0 ||
|
||||
(maxMessages <= 0 && maxUnreadProtocolMessages <= 0)
|
||||
) {
|
||||
return []
|
||||
}
|
||||
|
||||
const keepIndexes = new Set<number>()
|
||||
let retainedBytes = 0
|
||||
let keptUnreadProtocolMessages = 0
|
||||
const tryKeep = (index: number): boolean => {
|
||||
if (keepIndexes.has(index)) return true
|
||||
const message = messages[index]
|
||||
if (!message) return false
|
||||
const bytes = mailboxMessageStorageBytes(message)
|
||||
if (bytes > maxRetainedBytes || retainedBytes + bytes > maxRetainedBytes) {
|
||||
return false
|
||||
}
|
||||
keepIndexes.add(index)
|
||||
retainedBytes += bytes
|
||||
return true
|
||||
}
|
||||
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const message = messages[i]
|
||||
if (!message || !shouldRetainUnreadAsProtocolMessage(message)) continue
|
||||
if (keptUnreadProtocolMessages >= maxUnreadProtocolMessages) continue
|
||||
if (tryKeep(i)) keptUnreadProtocolMessages++
|
||||
}
|
||||
|
||||
let keptNonProtocolMessages = 0
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
if (keptNonProtocolMessages >= maxMessages) break
|
||||
const message = messages[i]
|
||||
if (
|
||||
message &&
|
||||
!message.read &&
|
||||
!shouldRetainUnreadAsProtocolMessage(message)
|
||||
) {
|
||||
if (tryKeep(i)) keptNonProtocolMessages++
|
||||
}
|
||||
}
|
||||
|
||||
let keptReadMessages = 0
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
if (keptNonProtocolMessages >= maxMessages) break
|
||||
if (keptReadMessages >= maxReadMessages) break
|
||||
const message = messages[i]
|
||||
if (message?.read) {
|
||||
if (tryKeep(i)) {
|
||||
keptReadMessages++
|
||||
keptNonProtocolMessages++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return messages.filter((_message, index) => keepIndexes.has(index))
|
||||
}
|
||||
|
||||
function logUnreadMailboxEvictions(
|
||||
original: TeammateMessage[],
|
||||
compacted: TeammateMessage[],
|
||||
context: string,
|
||||
): void {
|
||||
const kept = new Set(compacted)
|
||||
const unreadEvicted = original.filter(message => {
|
||||
return !message.read && !kept.has(message)
|
||||
})
|
||||
if (unreadEvicted.length === 0) return
|
||||
|
||||
const protocolEvicted = count(unreadEvicted, message =>
|
||||
shouldRetainUnreadAsProtocolMessage(message),
|
||||
)
|
||||
logError(
|
||||
new Error(
|
||||
`[TeammateMailbox] Compacted ${unreadEvicted.length} unread message(s) in ${context}; protocol_or_unknown=${protocolEvicted}`,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
async function writeCompactedMailbox(
|
||||
inboxPath: string,
|
||||
messages: TeammateMessage[],
|
||||
context: string,
|
||||
): Promise<void> {
|
||||
const compacted = compactMailboxMessages(messages)
|
||||
logUnreadMailboxEvictions(messages, compacted, context)
|
||||
await writeMailboxAtomic(inboxPath, jsonStringify(compacted, null, 2))
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the path to a teammate's inbox file
|
||||
* Structure: ~/.claude/teams/{team_name}/inboxes/{agent_name}.json
|
||||
@@ -89,8 +314,7 @@ export async function readMailbox(
|
||||
logForDebugging(`[TeammateMailbox] readMailbox: path=${inboxPath}`)
|
||||
|
||||
try {
|
||||
const content = await readFile(inboxPath, 'utf-8')
|
||||
const messages = jsonParse(content) as TeammateMessage[]
|
||||
const messages = parseMailboxMessages(await readMailboxFile(inboxPath))
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] readMailbox: read ${messages.length} message(s)`,
|
||||
)
|
||||
@@ -103,7 +327,7 @@ export async function readMailbox(
|
||||
}
|
||||
logForDebugging(`Failed to read inbox for ${agentName}: ${error}`)
|
||||
logError(error)
|
||||
return []
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,7 +380,7 @@ export async function writeToMailbox(
|
||||
`[TeammateMailbox] writeToMailbox: failed to create inbox file: ${error}`,
|
||||
)
|
||||
logError(error)
|
||||
return
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,22 +392,23 @@ export async function writeToMailbox(
|
||||
})
|
||||
|
||||
// Re-read messages after acquiring lock to get the latest state
|
||||
const messages = await readMailbox(recipientName, teamName)
|
||||
const messages = await readMailboxForMutation(recipientName, teamName)
|
||||
|
||||
const newMessage: TeammateMessage = {
|
||||
const newMessage = toMailboxMessage({
|
||||
...message,
|
||||
read: false,
|
||||
}
|
||||
})
|
||||
|
||||
messages.push(newMessage)
|
||||
|
||||
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8')
|
||||
await writeCompactedMailbox(inboxPath, messages, 'writeToMailbox')
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] Wrote message to ${recipientName}'s inbox from ${message.from}`,
|
||||
)
|
||||
} catch (error) {
|
||||
logForDebugging(`Failed to write to inbox for ${recipientName}: ${error}`)
|
||||
logError(error)
|
||||
throw error
|
||||
} finally {
|
||||
if (release) {
|
||||
await release()
|
||||
@@ -222,7 +447,7 @@ export async function markMessageAsReadByIndex(
|
||||
logForDebugging(`[TeammateMailbox] markMessageAsReadByIndex: lock acquired`)
|
||||
|
||||
// Re-read messages after acquiring lock to get the latest state
|
||||
const messages = await readMailbox(agentName, teamName)
|
||||
const messages = await readMailboxForMutation(agentName, teamName)
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] markMessageAsReadByIndex: read ${messages.length} messages after lock`,
|
||||
)
|
||||
@@ -244,7 +469,7 @@ export async function markMessageAsReadByIndex(
|
||||
|
||||
messages[messageIndex] = { ...message, read: true }
|
||||
|
||||
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8')
|
||||
await writeCompactedMailbox(inboxPath, messages, 'markMessageAsReadByIndex')
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] markMessageAsReadByIndex: marked message at index ${messageIndex} as read`,
|
||||
)
|
||||
@@ -270,6 +495,46 @@ export async function markMessageAsReadByIndex(
|
||||
}
|
||||
}
|
||||
|
||||
export async function markMessageAsReadByIdentity(
|
||||
agentName: string,
|
||||
teamName: string | undefined,
|
||||
expectedMessage: TeammateMessage,
|
||||
): Promise<boolean> {
|
||||
const inboxPath = getInboxPath(agentName, teamName)
|
||||
const lockFilePath = `${inboxPath}.lock`
|
||||
|
||||
let release: (() => Promise<void>) | undefined
|
||||
try {
|
||||
release = await lockfile.lock(inboxPath, {
|
||||
lockfilePath: lockFilePath,
|
||||
...LOCK_OPTIONS,
|
||||
})
|
||||
|
||||
const messages = await readMailboxForMutation(agentName, teamName)
|
||||
const messageIndex = messages.findIndex(message => {
|
||||
return !message.read && sameMailboxMessage(message, expectedMessage)
|
||||
})
|
||||
if (messageIndex < 0) return false
|
||||
|
||||
messages[messageIndex] = { ...messages[messageIndex]!, read: true }
|
||||
await writeCompactedMailbox(
|
||||
inboxPath,
|
||||
messages,
|
||||
'markMessageAsReadByIdentity',
|
||||
)
|
||||
return true
|
||||
} catch (error) {
|
||||
const code = getErrnoCode(error)
|
||||
if (code === 'ENOENT') return false
|
||||
logError(error)
|
||||
return false
|
||||
} finally {
|
||||
if (release) {
|
||||
await release()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark all messages in a teammate's inbox as read
|
||||
* Uses file locking to prevent race conditions
|
||||
@@ -297,7 +562,7 @@ export async function markMessagesAsRead(
|
||||
logForDebugging(`[TeammateMailbox] markMessagesAsRead: lock acquired`)
|
||||
|
||||
// Re-read messages after acquiring lock to get the latest state
|
||||
const messages = await readMailbox(agentName, teamName)
|
||||
const messages = await readMailboxForMutation(agentName, teamName)
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] markMessagesAsRead: read ${messages.length} messages after lock`,
|
||||
)
|
||||
@@ -317,7 +582,7 @@ export async function markMessagesAsRead(
|
||||
// messages comes from jsonParse — fresh, unshared objects safe to mutate
|
||||
for (const m of messages) m.read = true
|
||||
|
||||
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8')
|
||||
await writeCompactedMailbox(inboxPath, messages, 'markMessagesAsRead')
|
||||
logForDebugging(
|
||||
`[TeammateMailbox] markMessagesAsRead: WROTE ${unreadCount} message(s) as read to ${inboxPath}`,
|
||||
)
|
||||
@@ -1114,7 +1379,7 @@ export async function markMessagesAsReadByPredicate(
|
||||
...LOCK_OPTIONS,
|
||||
})
|
||||
|
||||
const messages = await readMailbox(agentName, teamName)
|
||||
const messages = await readMailboxForMutation(agentName, teamName)
|
||||
if (messages.length === 0) {
|
||||
return
|
||||
}
|
||||
@@ -1123,7 +1388,11 @@ export async function markMessagesAsReadByPredicate(
|
||||
!m.read && predicate(m) ? { ...m, read: true } : m,
|
||||
)
|
||||
|
||||
await writeFile(inboxPath, jsonStringify(updatedMessages, null, 2), 'utf-8')
|
||||
await writeCompactedMailbox(
|
||||
inboxPath,
|
||||
updatedMessages,
|
||||
'markMessagesAsReadByPredicate',
|
||||
)
|
||||
} catch (error) {
|
||||
const code = getErrnoCode(error)
|
||||
if (code === 'ENOENT') {
|
||||
@@ -1161,7 +1430,12 @@ export function getLastPeerDmSummary(messages: Message[]): string | undefined {
|
||||
if (!Array.isArray(content)) continue
|
||||
for (const block of content) {
|
||||
if (typeof block === 'string') continue
|
||||
const b = block as unknown as { type: string; name?: string; input?: Record<string, unknown>; [key: string]: unknown }
|
||||
const b = block as unknown as {
|
||||
type: string
|
||||
name?: string
|
||||
input?: Record<string, unknown>
|
||||
[key: string]: unknown
|
||||
}
|
||||
if (
|
||||
b.type === 'tool_use' &&
|
||||
b.name === SEND_MESSAGE_TOOL_NAME &&
|
||||
@@ -1177,7 +1451,7 @@ export function getLastPeerDmSummary(messages: Message[]): string | undefined {
|
||||
const to = b.input.to as string
|
||||
const summary =
|
||||
'summary' in b.input && typeof b.input.summary === 'string'
|
||||
? b.input.summary as string
|
||||
? (b.input.summary as string)
|
||||
: (b.input.message as string).slice(0, 80)
|
||||
return `[to ${to}] ${summary}`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user