diff --git a/src/cli/print.ts b/src/cli/print.ts index 6cddf7b08..8b0aedc46 100644 --- a/src/cli/print.ts +++ b/src/cli/print.ts @@ -2819,97 +2819,89 @@ function runHeadlessStreaming( let cronScheduler: import('../utils/cronScheduler.js').CronScheduler | null = null if (cronGate.isKairosCronEnabled()) { + // Shared dedup-claim → input-close-recheck → onSuccess pipeline for the + // three cron entry points (legacy onFire, onFireTask agent, onFireTask + // non-agent). Centralizing the cancel-on-late-shutdown contract here keeps + // the three branches from drifting on what happens between claim and + // dispatch. onSuccess receives the claimed QueuedCommand and decides + // whether to enqueue it (normal path) or mark the run failed (agent path). + const dispatchHeadlessCronCommand = (params: { + basePrompt: string + sourceId: string + sourceLabel: string + logSuffix: string + onSuccess: (command: QueuedCommand) => void | Promise<void> + }): void => { + if (inputClosed) return + void (async () => { + const command = await createAutonomyQueuedPromptIfNoActiveSource({ + basePrompt: params.basePrompt, + trigger: 'scheduled-task', + currentDir: cwd(), + sourceId: params.sourceId, + sourceLabel: params.sourceLabel, + workload: WORKLOAD_CRON, + shouldCreate: () => !inputClosed, + }) + if (!command) return + if (inputClosed) { + await cancelQueuedAutonomyCommands({ commands: [command] }) + return + } + await params.onSuccess(command) + })().catch(error => { + logError(error) + logForDebugging( + `[ScheduledTasks] failed to enqueue headless task${params.logSuffix}: ${error}`, + { level: 'error' }, + ) + }) + } + + const enqueueAndRun = (command: QueuedCommand): void => { + enqueue({ + ...command, + uuid: randomUUID(), + }) + void run() + } + cronScheduler = cronSchedulerModule.createCronScheduler({ onFire: prompt => { - if (inputClosed) return - void (async () => { - // Use the prompt itself as the dedup source: legacy KAIROS-style - // cron entries fire the same prompt repeatedly, 
and without a - // dedicated task id the prompt text is what uniquely identifies - // the entry. Without source-dedup, repeated fires would stack - // additional runs while an earlier one is still active. Match the - // onFireTask branch below to keep the two paths consistent. - const command = await createAutonomyQueuedPromptIfNoActiveSource({ - basePrompt: prompt, - trigger: 'scheduled-task', - currentDir: cwd(), - sourceId: prompt, - sourceLabel: prompt, - workload: WORKLOAD_CRON, - shouldCreate: () => !inputClosed, - }) - if (!command) return - if (inputClosed) { - await cancelQueuedAutonomyCommands({ commands: [command] }) - return - } - enqueue({ - ...command, - uuid: randomUUID(), - }) - void run() - })().catch(error => { - logError(error) - logForDebugging( - `[ScheduledTasks] failed to enqueue headless task: ${error}`, - { - level: 'error', - }, - ) + // Legacy KAIROS-style entries: the prompt text is what uniquely + // identifies the cron entry, so it doubles as both source id and + // source label for dedup. 
+ dispatchHeadlessCronCommand({ + basePrompt: prompt, + sourceId: prompt, + sourceLabel: prompt, + logSuffix: '', + onSuccess: enqueueAndRun, }) }, onFireTask: task => { - if (inputClosed) return - void (async () => { - if (task.agentId) { - const command = await createAutonomyQueuedPromptIfNoActiveSource({ - basePrompt: task.prompt, - trigger: 'scheduled-task', - currentDir: cwd(), - sourceId: task.id, - sourceLabel: task.prompt, - workload: WORKLOAD_CRON, - shouldCreate: () => !inputClosed, - }) - if (!command) return - if (inputClosed) { - await cancelQueuedAutonomyCommands({ commands: [command] }) - return - } - await markAutonomyRunFailed( - command.autonomy!.runId, - `No teammate runtime available for scheduled task owner ${task.agentId} in headless mode.`, - command.autonomy!.rootDir, - ) - return - } - const command = await createAutonomyQueuedPromptIfNoActiveSource({ + if (task.agentId) { + dispatchHeadlessCronCommand({ basePrompt: task.prompt, - trigger: 'scheduled-task', - currentDir: cwd(), sourceId: task.id, sourceLabel: task.prompt, - workload: WORKLOAD_CRON, - shouldCreate: () => !inputClosed, - }) - if (!command) return - if (inputClosed) { - await cancelQueuedAutonomyCommands({ commands: [command] }) - return - } - enqueue({ - ...command, - uuid: randomUUID(), - }) - void run() - })().catch(error => { - logError(error) - logForDebugging( - `[ScheduledTasks] failed to enqueue headless task ${task.id}: ${error}`, - { - level: 'error', + logSuffix: ` ${task.id}`, + onSuccess: async command => { + await markAutonomyRunFailed( + command.autonomy!.runId, + `No teammate runtime available for scheduled task owner ${task.agentId} in headless mode.`, + command.autonomy!.rootDir, + ) }, - ) + }) + return + } + dispatchHeadlessCronCommand({ + basePrompt: task.prompt, + sourceId: task.id, + sourceLabel: task.prompt, + logSuffix: ` ${task.id}`, + onSuccess: enqueueAndRun, }) }, isLoading: () => running || inputClosed, diff --git 
a/src/services/compact/postCompactCleanup.ts b/src/services/compact/postCompactCleanup.ts index a98806c4e..b89e3a0be 100644 --- a/src/services/compact/postCompactCleanup.ts +++ b/src/services/compact/postCompactCleanup.ts @@ -5,6 +5,7 @@ import { getUserContext } from '../../context.js' import { clearSpeculativeChecks } from '@claude-code-best/builtin-tools/tools/BashTool/bashPermissions.js' import { clearClassifierApprovals } from '../../utils/classifierApprovals.js' import { resetGetMemoryFilesCache } from '../../utils/claudemd.js' +import { logError } from '../../utils/log.js' import { clearSessionMessagesCache } from '../../utils/sessionStorage.js' import { clearBetaTracingState } from '../../utils/telemetry/betaSessionTracing.js' import { resetMicrocompactState } from './microCompact.js' @@ -75,9 +76,16 @@ export function runPostCompactCleanup(querySource?: QuerySource): void { // (REPL post-compact handler, /compact command, autoCompact) finish their // own state transitions without an extra microtask round-trip — the sweep // catches up on the next event-loop tick. - void import('../../utils/attributionHooks.js').then(m => - m.sweepFileContentCache(), - ) + // + // The .catch is required even though the current attributionHooks.ts is a + // no-op stub: without it, a future restored sweepFileContentCache that + // throws would surface as an unhandled promise rejection from a function + // whose synchronous signature gives callers no way to observe it. 
+ void import('../../utils/attributionHooks.js') + .then(m => m.sweepFileContentCache()) + .catch(error => { + logError(error) + }) } clearSessionMessagesCache() } diff --git a/src/utils/__tests__/autonomyRuns.test.ts b/src/utils/__tests__/autonomyRuns.test.ts index 03bb5923a..268b856fd 100644 --- a/src/utils/__tests__/autonomyRuns.test.ts +++ b/src/utils/__tests__/autonomyRuns.test.ts @@ -1,7 +1,5 @@ import { afterEach, beforeEach, describe, expect, test } from 'bun:test' -import { existsSync, readFileSync } from 'fs' -import { mkdir, writeFile } from 'fs/promises' -import { join, resolve as resolvePath } from 'path' +import { join, resolve as resolvePath } from 'node:path' import { resetStateForTests, setCwdState, @@ -42,11 +40,14 @@ import { cleanupTempDir, createTempDir, createTempSubdir, + readTempFile, + tempPathExists, writeTempFile, } from '../../../tests/mocks/file-system' const AGENTS_REL = join(AUTONOMY_DIR, 'AGENTS.md') const HEARTBEAT_REL = join(AUTONOMY_DIR, 'HEARTBEAT.md') +const RUNS_REL = join(AUTONOMY_DIR, 'runs.json') let tempDir = '' @@ -349,9 +350,9 @@ describe('autonomyRuns', () => { ) // Seed an active queued run for cron-1 so the next dedup attempt skips. 
- await mkdir(join(tempDir, AUTONOMY_DIR), { recursive: true }) - await writeFile( - resolveAutonomyRunsPath(tempDir), + await writeTempFile( + tempDir, + RUNS_REL, `${JSON.stringify( { runs: [ @@ -373,7 +374,6 @@ describe('autonomyRuns', () => { null, 2, )}\n`, - 'utf-8', ) const skipped = await createAutonomyQueuedPromptIfNoActiveSource({ @@ -400,9 +400,9 @@ describe('autonomyRuns', () => { }) test('createAutonomyQueuedPromptIfNoActiveSource recovers stale active runs from dead owner processes', async () => { - await mkdir(join(tempDir, AUTONOMY_DIR), { recursive: true }) - await writeFile( - resolveAutonomyRunsPath(tempDir), + await writeTempFile( + tempDir, + RUNS_REL, `${JSON.stringify( { runs: [ @@ -426,7 +426,6 @@ describe('autonomyRuns', () => { null, 2, )}\n`, - 'utf-8', ) await expect( @@ -483,7 +482,7 @@ describe('autonomyRuns', () => { await markAutonomyRunRunning(runId, tempDir, 100) const runsPath = resolveAutonomyRunsPath(tempDir) - const file = JSON.parse(readFileSync(runsPath, 'utf-8')) as { + const file = JSON.parse(await readTempFile(runsPath)) as { runs: Array> } file.runs = file.runs.map(run => @@ -491,7 +490,7 @@ describe('autonomyRuns', () => { ? 
{ ...run, ownerProcessId: 2_147_483_647 } : run, ) - await writeFile(runsPath, `${JSON.stringify(file, null, 2)}\n`, 'utf-8') + await writeTempFile(tempDir, RUNS_REL, `${JSON.stringify(file, null, 2)}\n`) const replacement = await createAutonomyQueuedPromptIfNoActiveSource({ basePrompt: 'replacement prompt', @@ -615,11 +614,10 @@ describe('autonomyRuns', () => { endedAt: 2_000 + index, })), ] - await mkdir(join(tempDir, AUTONOMY_DIR), { recursive: true }) - await writeFile( - resolveAutonomyRunsPath(tempDir), + await writeTempFile( + tempDir, + RUNS_REL, `${JSON.stringify({ runs }, null, 2)}\n`, - 'utf-8', ) await createAutonomyRun({ @@ -637,10 +635,9 @@ describe('autonomyRuns', () => { }) test('listAutonomyRuns keeps older persisted records by normalizing missing runtime and owner metadata', async () => { - const runsPath = resolveAutonomyRunsPath(tempDir) - await mkdir(join(tempDir, '.claude', 'autonomy'), { recursive: true }) - await writeFile( - runsPath, + await writeTempFile( + tempDir, + RUNS_REL, `${JSON.stringify( { runs: [ @@ -657,7 +654,6 @@ describe('autonomyRuns', () => { null, 2, )}\n`, - 'utf-8', ) const [legacy] = await listAutonomyRuns(tempDir) @@ -832,7 +828,7 @@ describe('autonomyRuns', () => { expect(recovered!.autonomy?.flowId).toBe(flow!.flowId) }) - test('STALE_ACTIVE_RUN_ERROR_PREFIX stays in sync with HEARTBEAT.md stale-recovery-health task', () => { + test('STALE_ACTIVE_RUN_ERROR_PREFIX stays in sync with HEARTBEAT.md stale-recovery-health task', async () => { // The HEARTBEAT.md stale-recovery-health task prompt embeds this prefix // as a literal string. Changing the constant without updating the // heartbeat prompt would silently break the monitor — this test fails @@ -846,12 +842,12 @@ describe('autonomyRuns', () => { 'autonomy', 'HEARTBEAT.md', ) - if (!existsSync(heartbeatPath)) { + if (!(await tempPathExists(heartbeatPath))) { // .claude/ may be absent in some checkout layouts (e.g., shallow clone // for npm pack). 
Skip rather than fail in that case. return } - const content = readFileSync(heartbeatPath, 'utf8') + const content = await readTempFile(heartbeatPath) expect(content).toContain(STALE_ACTIVE_RUN_ERROR_PREFIX) }) }) diff --git a/src/utils/autonomyFlows.ts b/src/utils/autonomyFlows.ts index 01334937e..989dd851f 100644 --- a/src/utils/autonomyFlows.ts +++ b/src/utils/autonomyFlows.ts @@ -3,7 +3,10 @@ import { mkdir, writeFile } from 'fs/promises' import { dirname, join, resolve } from 'path' import { getProjectRoot } from '../bootstrap/state.js' import { AUTONOMY_DIR, type AutonomyTriggerKind } from './autonomyAuthority.js' -import { withAutonomyPersistenceLock } from './autonomyPersistence.js' +import { + retainActiveFirst, + withAutonomyPersistenceLock, +} from './autonomyPersistence.js' import { getFsImplementation } from './fsOperations.js' const AUTONOMY_FLOWS_MAX = 100 @@ -170,26 +173,12 @@ function isManagedFlowStatusActive(status: AutonomyFlowStatus): boolean { function selectPersistedAutonomyFlows( flows: AutonomyFlowRecord[], ): AutonomyFlowRecord[] { - // Two-phase sort. Phase 1: priority sort (active flows first, then by - // updatedAt desc) selects the AUTONOMY_FLOWS_MAX most-relevant records to - // retain — active flows are guaranteed a slot before any inactive flow is - // considered. Phase 2: re-sort the retained slice by updatedAt desc only, - // so the persisted file is in plain reverse-chronological order regardless - // of activity status. Listings/UI consume the persisted order directly. - const retained = flows - .slice() - .map(cloneFlowRecord) - .sort((left, right) => { - const leftActive = isManagedFlowStatusActive(left.status) - const rightActive = isManagedFlowStatusActive(right.status) - if (leftActive !== rightActive) { - return leftActive ? 
-1 : 1 - } - return right.updatedAt - left.updatedAt - }) - .slice(0, AUTONOMY_FLOWS_MAX) - - return retained.sort((left, right) => right.updatedAt - left.updatedAt) + return retainActiveFirst( + flows.map(cloneFlowRecord), + flow => isManagedFlowStatusActive(flow.status), + flow => flow.updatedAt, + AUTONOMY_FLOWS_MAX, + ) } function defaultFlowSource(params: { diff --git a/src/utils/autonomyPersistence.ts b/src/utils/autonomyPersistence.ts index 649270bcc..4085a1a9a 100644 --- a/src/utils/autonomyPersistence.ts +++ b/src/utils/autonomyPersistence.ts @@ -4,6 +4,33 @@ import { lock } from './lockfile.js' const persistenceLocks = new Map>() +/** + * Two-phase persistence retention. Active records (queued/running, etc.) are + * always kept — capping them risks evicting in-flight work; that responsibility + * lives in caller-side leak detection. Inactive (terminal) records are ranked + * by `getTimestamp` desc and capped to fill the remaining budget below `max`. + * + * Returned list is sorted by `getTimestamp` desc regardless of activity, so + * the persisted file is plain reverse-chronological order — listings/UI can + * consume it directly without re-sorting. 
+ */ +export function retainActiveFirst<T>( + records: readonly T[], + isActive: (record: T) => boolean, + getTimestamp: (record: T) => number, + max: number, +): T[] { + const sortDesc = (left: T, right: T) => + getTimestamp(right) - getTimestamp(left) + const active = records.filter(isActive).slice().sort(sortDesc) + const history = records + .filter(record => !isActive(record)) + .slice() + .sort(sortDesc) + .slice(0, Math.max(0, max - active.length)) + return [...active, ...history].sort(sortDesc) +} + export function getAutonomyPersistenceLockCountForTests(): number { if (process.env.NODE_ENV !== 'test') { throw new Error( diff --git a/src/utils/autonomyRuns.ts b/src/utils/autonomyRuns.ts index 89970eaaf..d850be928 100644 --- a/src/utils/autonomyRuns.ts +++ b/src/utils/autonomyRuns.ts @@ -27,12 +27,22 @@ import { type AutonomyFlowSyncMode, type ManagedAutonomyFlowStepDefinition, } from './autonomyFlows.js' -import { withAutonomyPersistenceLock } from './autonomyPersistence.js' +import { + retainActiveFirst, + withAutonomyPersistenceLock, +} from './autonomyPersistence.js' import { getFsImplementation } from './fsOperations.js' import { isProcessRunning } from './genericProcessUtils.js' import { logError } from './log.js' const AUTONOMY_RUNS_MAX = 200 +// Diagnostic threshold for active (queued/running) runs. Active records are +// deliberately exempt from AUTONOMY_RUNS_MAX so a leak in finalization cannot +// silently evict in-flight work; that exemption only makes sense if a leak is +// loud when it appears. Crossing this threshold warns once per process so +// operators see the divergence in logs before runs.json grows pathologically. +const AUTONOMY_ACTIVE_RUNS_WARN_THRESHOLD = 100 +let warnedActiveRunsThresholdCrossed = false const AUTONOMY_RUNS_RELATIVE_PATH = join(AUTONOMY_DIR, 'runs.json') // Sentinel string surfaced to operators via runs.json error fields and // referenced literally by the HEARTBEAT.md `stale-recovery-health` task. 
@@ -130,17 +140,24 @@ function isAutonomyRunActive(run: AutonomyRunRecord): boolean { function selectPersistedAutonomyRuns( runs: AutonomyRunRecord[], ): AutonomyRunRecord[] { - const cloned = runs.slice().map(cloneRunRecord) - const active = cloned - .filter(isAutonomyRunActive) - .sort((left, right) => right.createdAt - left.createdAt) - const history = cloned - .filter(run => !isAutonomyRunActive(run)) - .sort((left, right) => right.createdAt - left.createdAt) - .slice(0, Math.max(0, AUTONOMY_RUNS_MAX - active.length)) - - return [...active, ...history].sort( - (left, right) => right.createdAt - left.createdAt, + const cloned = runs.map(cloneRunRecord) + const activeCount = cloned.filter(isAutonomyRunActive).length + if ( + !warnedActiveRunsThresholdCrossed && + activeCount >= AUTONOMY_ACTIVE_RUNS_WARN_THRESHOLD + ) { + warnedActiveRunsThresholdCrossed = true + logError( + new Error( + `autonomy: ${activeCount} active runs exceed warn threshold ${AUTONOMY_ACTIVE_RUNS_WARN_THRESHOLD}; check for finalize leaks`, + ), + ) + } + return retainActiveFirst( + cloned, + isAutonomyRunActive, + run => run.createdAt, + AUTONOMY_RUNS_MAX, ) } diff --git a/tests/mocks/file-system.ts b/tests/mocks/file-system.ts index e356ec015..c46defc6c 100644 --- a/tests/mocks/file-system.ts +++ b/tests/mocks/file-system.ts @@ -30,3 +30,21 @@ export async function createTempSubdir( await mkdir(path, { recursive: true }) return path } + +/** + * Read a file under the test temp dir as utf-8 text. Mirrors the node:fs + * `readFileSync(path, 'utf-8')` ergonomics but uses Bun's native file API so + * tests stay on the Bun-only runtime contract. + */ +export async function readTempFile(path: string): Promise<string> { + return Bun.file(path).text() +} + +/** + * Best-effort existence check for a file under the test temp dir. Uses Bun's + * native file API: `Bun.file(path).exists()` resolves true for regular files + * and false for directories or missing paths, so it cannot probe directories. 
+ */ +export async function tempPathExists(path: string): Promise<boolean> { + return Bun.file(path).exists() +}