mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-20 07:15:51 +00:00
Merge branch 'fix/workflow-run-accumulation' into fix/acp-protocol
This commit is contained in:
@@ -10,6 +10,7 @@ import {
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import {
|
||||
cleanupOldRuns,
|
||||
getRunsDir,
|
||||
listPersistedRuns,
|
||||
readRunState,
|
||||
@@ -197,3 +198,108 @@ test('getRunsDir returns <projectRoot>/.claude/workflow-runs shape', () => {
|
||||
// do not hard-code projectRoot (differs across machines), only check suffix structure
|
||||
expect(dir.endsWith(`${join('.claude', 'workflow-runs')}`)).toBe(true)
|
||||
})
|
||||
|
||||
test('listPersistedRuns limit N returns the N newest by updatedAt desc', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
try {
|
||||
await writeRunState(dir, makeRun({ runId: 'old', updatedAt: 1000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'mid', updatedAt: 2000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'new', updatedAt: 3000 }))
|
||||
|
||||
expect((await listPersistedRuns(dir, 0)).map(r => r.runId)).toEqual([])
|
||||
expect((await listPersistedRuns(dir, 1)).map(r => r.runId)).toEqual(['new'])
|
||||
expect((await listPersistedRuns(dir, 2)).map(r => r.runId)).toEqual([
|
||||
'new',
|
||||
'mid',
|
||||
])
|
||||
// limit larger than total → returns all (no padding)
|
||||
expect((await listPersistedRuns(dir, 99)).map(r => r.runId)).toEqual([
|
||||
'new',
|
||||
'mid',
|
||||
'old',
|
||||
])
|
||||
// undefined → unchanged "load everything" semantics (back-compat)
|
||||
expect((await listPersistedRuns(dir)).map(r => r.runId)).toEqual([
|
||||
'new',
|
||||
'mid',
|
||||
'old',
|
||||
])
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test('cleanupOldRuns keeps the newest keepMax runs and removes the rest', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
try {
|
||||
await writeRunState(dir, makeRun({ runId: 'old', updatedAt: 1000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'mid', updatedAt: 2000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'new', updatedAt: 3000 }))
|
||||
|
||||
const removed = await cleanupOldRuns(dir, 1)
|
||||
expect(removed).toBe(2)
|
||||
const remaining = (await listPersistedRuns(dir)).map(r => r.runId)
|
||||
expect(remaining).toEqual(['new'])
|
||||
// pruned dirs are fully gone (state.json included)
|
||||
await expect(readRunState(dir, 'old')).resolves.toBeNull()
|
||||
await expect(readRunState(dir, 'mid')).resolves.toBeNull()
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test('cleanupOldRuns prunes orphan dirs (no state.json) first', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
try {
|
||||
await writeRunState(dir, makeRun({ runId: 'r1', updatedAt: 1000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'r2', updatedAt: 2000 }))
|
||||
// orphan: no state.json → treated as updatedAt=0, sorted last, pruned first
|
||||
await mkdir(join(dir, 'orphan'), { recursive: true })
|
||||
|
||||
const removed = await cleanupOldRuns(dir, 2)
|
||||
expect(removed).toBe(1)
|
||||
const entries = await readdir(dir)
|
||||
expect(entries).not.toContain('orphan')
|
||||
expect(entries).toContain('r1')
|
||||
expect(entries).toContain('r2')
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test('cleanupOldRuns under keepMax is a no-op', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
try {
|
||||
await writeRunState(dir, makeRun({ runId: 'r1', updatedAt: 1000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'r2', updatedAt: 2000 }))
|
||||
|
||||
const removed = await cleanupOldRuns(dir, 5)
|
||||
expect(removed).toBe(0)
|
||||
expect((await listPersistedRuns(dir)).map(r => r.runId)).toEqual([
|
||||
'r2',
|
||||
'r1',
|
||||
])
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test('cleanupOldRuns on missing dir returns 0 (no throw)', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
await expect(cleanupOldRuns(dir, 5)).resolves.toBe(0)
|
||||
})
|
||||
|
||||
test('cleanupOldRuns negative keepMax is clamped to 0 (removes everything, no slice(-N) inversion)', async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-'))
|
||||
try {
|
||||
await writeRunState(dir, makeRun({ runId: 'r1', updatedAt: 1000 }))
|
||||
await writeRunState(dir, makeRun({ runId: 'r2', updatedAt: 2000 }))
|
||||
|
||||
// Without the clamp, slice(-1) would keep 1 entry — violating "keep 0 means keep none".
|
||||
await expect(cleanupOldRuns(dir, -1)).resolves.toBe(2)
|
||||
expect(await listPersistedRuns(dir)).toEqual([])
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
@@ -220,6 +220,41 @@ test('launch inline script → returns scriptPath (persisted to cwdOverride dir)
|
||||
}
|
||||
})
|
||||
|
||||
test('launch inline script with title → workflowName comes from title (not the "workflow" default)', async () => {
|
||||
__resetWorkflowServiceForTests()
|
||||
const { ports, store } = fakePorts()
|
||||
const svc = makeService(ports, store)
|
||||
const { runId } = await svc.launch(
|
||||
{ script: `return agent('x')`, title: 'Review PR #42' },
|
||||
stubTUC,
|
||||
stubCanUseTool,
|
||||
)
|
||||
await settle()
|
||||
const r = svc.getRun(runId)
|
||||
expect(r).toBeDefined()
|
||||
expect(r!.workflowName).toBe('Review PR #42')
|
||||
})
|
||||
|
||||
test('launch scriptPath with title → workflowName still honors title', async () => {
|
||||
__resetWorkflowServiceForTests()
|
||||
const dir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
|
||||
try {
|
||||
const file = join(dir, 'wf.js')
|
||||
await writeFile(file, `return agent('x')`)
|
||||
const { ports, store } = fakePorts()
|
||||
const svc = makeService(ports, store)
|
||||
const { runId } = await svc.launch(
|
||||
{ scriptPath: file, title: 'From File' },
|
||||
stubTUC,
|
||||
stubCanUseTool,
|
||||
)
|
||||
await settle()
|
||||
expect(svc.getRun(runId)!.workflowName).toBe('From File')
|
||||
} finally {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test('kill goes through taskRegistrar.kill', async () => {
|
||||
__resetWorkflowServiceForTests()
|
||||
const { ports, store, killed } = fakePorts()
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
import { mkdir, readFile, readdir, rename, writeFile } from 'node:fs/promises'
|
||||
import {
|
||||
mkdir,
|
||||
readFile,
|
||||
readdir,
|
||||
rename,
|
||||
rm,
|
||||
writeFile,
|
||||
} from 'node:fs/promises'
|
||||
import { join } from 'node:path'
|
||||
import { getProjectRoot } from '../bootstrap/state.js'
|
||||
import { logForDebugging } from '../utils/debug.js'
|
||||
@@ -10,6 +17,13 @@ const SCHEMA_VERSION = 1
|
||||
const STATE_FILE = 'state.json'
|
||||
const STATE_TMP = 'state.json.tmp'
|
||||
|
||||
/**
|
||||
* Hard ceiling on persisted run directories on disk. Beyond this, the oldest runs (by updatedAt)
|
||||
* are pruned by cleanupOldRuns. Set generously above LOAD_PERSISTED_LIMIT so runs hidden from the
|
||||
* panel can still be resumed manually before aging out.
|
||||
*/
|
||||
const KEEP_MAX_RUNS = 50
|
||||
|
||||
/**
|
||||
* Single source for runsDir: shares the same root as ports.ts journalStore (${projectRoot}/.claude/workflow-runs).
|
||||
* Extracted as a function: eliminates duplicated path concatenation between ports.ts and persistence logic, staying in the same root when entering worktree/subdirectory.
|
||||
@@ -86,9 +100,12 @@ export async function readRunState(
|
||||
* - A subdirectory without state.json (half-written run) → skip
|
||||
* - A subdirectory whose state.json is corrupted → skip that single one, keep scanning the rest
|
||||
* - Sort by updatedAt descending (consistent with store.list() ordering)
|
||||
* - Optional limit: keep only the first N newest (used by loadPersistedRuns so the panel
|
||||
* doesn't drown under months of history; full scan stays available by omitting the arg).
|
||||
*/
|
||||
export async function listPersistedRuns(
|
||||
runsDir: string,
|
||||
limit?: number,
|
||||
): Promise<RunProgress[]> {
|
||||
let entries: string[]
|
||||
try {
|
||||
@@ -101,7 +118,56 @@ export async function listPersistedRuns(
|
||||
const run = await readRunState(runsDir, name)
|
||||
if (run) runs.push(run)
|
||||
}
|
||||
return runs.sort((a, b) => b.updatedAt - a.updatedAt)
|
||||
runs.sort((a, b) => b.updatedAt - a.updatedAt)
|
||||
return limit !== undefined && limit >= 0 ? runs.slice(0, limit) : runs
|
||||
}
|
||||
|
||||
/**
|
||||
* Garbage-collect stale run directories: sort subdirs of runsDir by their state.json.updatedAt
|
||||
* (newest first), then recursively remove everything past keepMax. Subdirs without state.json are
|
||||
* treated as oldest (they're orphans — half-written, killed-mid-write, or pre-schema leftovers) so
|
||||
* they get pruned first.
|
||||
*
|
||||
* Best-effort: per-dir failures only log, do not abort the sweep. Safe to call repeatedly
|
||||
* (idempotent — once under the cap, it's a no-op).
|
||||
*
|
||||
* @returns number of directories actually removed.
|
||||
*/
|
||||
export async function cleanupOldRuns(
|
||||
runsDir: string,
|
||||
keepMax: number = KEEP_MAX_RUNS,
|
||||
): Promise<number> {
|
||||
let entries: string[]
|
||||
try {
|
||||
entries = await readdir(runsDir)
|
||||
} catch {
|
||||
return 0
|
||||
}
|
||||
type Candidate = { name: string; updatedAt: number }
|
||||
const candidates: Candidate[] = []
|
||||
for (const name of entries) {
|
||||
const run = await readRunState(runsDir, name)
|
||||
// updatedAt=0 → orphan dir without parseable state.json; sorts first → pruned first.
|
||||
candidates.push({ name, updatedAt: run?.updatedAt ?? 0 })
|
||||
}
|
||||
// Newest first; orphans (updatedAt=0) sink to the tail and get pruned first.
|
||||
candidates.sort((a, b) => b.updatedAt - a.updatedAt)
|
||||
// Guard against negative keepMax: slice(-N) would invert semantics and keep N newest instead of
|
||||
// pruning them, which contradicts the contract. Clamp to 0 so a bad caller at worst wipes everything.
|
||||
const cap = Math.max(0, Math.trunc(keepMax))
|
||||
const victims = candidates.slice(cap)
|
||||
let removed = 0
|
||||
for (const v of victims) {
|
||||
try {
|
||||
await rm(join(runsDir, v.name), { recursive: true, force: true })
|
||||
removed++
|
||||
} catch (e) {
|
||||
logForDebugging(
|
||||
`[workflow warn] cleanupOldRuns failed to remove ${v.name}: ${(e as Error).message}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -113,6 +179,10 @@ export async function listPersistedRuns(
|
||||
* Disk write is best-effort: writeRunState swallows IO exceptions and only logs, does not propagate —
|
||||
* so other bus subscribers (store, etc.) are not affected by persistence failures.
|
||||
*
|
||||
* Also fires-and-forgets cleanupOldRuns so the runs directory stays bounded across long-lived
|
||||
* sessions (KEEP_MAX_RUNS). The cleanup runs *after* the new state is written, guaranteeing the
|
||||
* just-finished run is already on disk and counted as newest — never swept out from under itself.
|
||||
*
|
||||
* @param runsDirProvider Optional runsDir resolver (defaults to getRunsDir).
|
||||
* Production path uses the default; tests inject a tmpdir to avoid writing to the real project directory (Bun ESM module namespace is read-only,
|
||||
* cannot monkey-patch getRunsDir itself).
|
||||
@@ -126,6 +196,15 @@ export function attachRunStatePersistence(
|
||||
if (event.type !== 'run_done') return
|
||||
const run = store.get(event.runId)
|
||||
if (!run) return
|
||||
void writeRunState(runsDirProvider(), run)
|
||||
const dir = runsDirProvider()
|
||||
void writeRunState(dir, run).then(() => {
|
||||
// Sweep only after the new state lands on disk — avoids a race where the just-finished run
|
||||
// itself gets pruned because its state.json wasn't counted yet.
|
||||
void cleanupOldRuns(dir).catch(e => {
|
||||
logForDebugging(
|
||||
`[workflow warn] cleanupOldRuns after run_done threw: ${(e as Error).message}`,
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -21,6 +21,13 @@ import {
|
||||
listPersistedRuns,
|
||||
readRunState,
|
||||
} from './persistence.js'
|
||||
|
||||
/**
|
||||
* How many newest persisted runs to hydrate into the store on panel open. Tuned to cover a normal
|
||||
* day's worth of workflow iterations without overrunning the panel tab row; anything older stays
|
||||
* on disk and is still resumable via getRunAsync until cleanupOldRuns reclaims it.
|
||||
*/
|
||||
const LOAD_PERSISTED_LIMIT = 20
|
||||
import { createProgressBus } from './progress/bus.js'
|
||||
import {
|
||||
createProgressStoreFromBus,
|
||||
@@ -135,19 +142,23 @@ export function makeService(
|
||||
script?: string
|
||||
name?: string
|
||||
scriptPath?: string
|
||||
title?: string
|
||||
}): Promise<{
|
||||
script: string
|
||||
workflowFile?: string
|
||||
workflowName: string
|
||||
}> {
|
||||
// Mirrors WorkflowTool.ts: name takes priority over title; only fall back to the literal
|
||||
// 'workflow' when neither is supplied (so /workflows tabs don't pile up under a same default name).
|
||||
const workflowName = input.name ?? input.title ?? 'workflow'
|
||||
if (input.script) {
|
||||
return { script: input.script, workflowName: 'workflow' }
|
||||
return { script: input.script, workflowName }
|
||||
}
|
||||
if (input.scriptPath) {
|
||||
return {
|
||||
script: await readFile(input.scriptPath, 'utf-8'),
|
||||
workflowFile: input.scriptPath,
|
||||
workflowName: 'workflow',
|
||||
workflowName,
|
||||
}
|
||||
}
|
||||
if (input.name) {
|
||||
@@ -280,7 +291,13 @@ export function makeService(
|
||||
if (persistedLoaded) return
|
||||
persistedLoaded = true
|
||||
try {
|
||||
const runs = await listPersistedRuns(runsDirProvider())
|
||||
// Cap hydration at LOAD_PERSISTED_LIMIT newest runs so the panel tab row doesn't drown
|
||||
// under accumulated history. Older state.json files stay on disk (within KEEP_MAX_RUNS,
|
||||
// maintained by cleanupOldRuns) and remain resumable via getRunAsync.
|
||||
const runs = await listPersistedRuns(
|
||||
runsDirProvider(),
|
||||
LOAD_PERSISTED_LIMIT,
|
||||
)
|
||||
for (const run of runs) store.hydrate(run)
|
||||
} catch (e) {
|
||||
// Scan failure does not block the panel: log + reset flag to allow next retry
|
||||
|
||||
Reference in New Issue
Block a user