diff --git a/src/workflow/__tests__/WorkflowsPanel.test.tsx b/src/workflow/__tests__/WorkflowsPanel.test.tsx index b354b2c36..4306cf72a 100644 --- a/src/workflow/__tests__/WorkflowsPanel.test.tsx +++ b/src/workflow/__tests__/WorkflowsPanel.test.tsx @@ -1,10 +1,13 @@ import { expect, test } from 'bun:test'; +import { PassThrough } from 'node:stream'; import React from 'react'; +import { wrappedRender as render } from '@anthropic/ink'; import { SentryErrorBoundary } from '../../components/SentryErrorBoundary.js'; import type { RunProgress } from '../progress/store.js'; import { call as panelCall } from '../panel/panelCall.js'; import { clampSelected, WorkflowsPanel } from '../panel/WorkflowsPanel.js'; import { STATUS_DOT } from '../panel/status.js'; +import { __resetWorkflowServiceForTests, getWorkflowService } from '../service.js'; // 纯函数:选中夹紧到有效区间(与面板内 clampSelected 同源)。 test('clampSelected:空列表→0;越界→末位;负/NaN→0;正常→原值', () => { @@ -104,3 +107,40 @@ test('panelCall 用 SentryErrorBoundary 包裹 WorkflowsPanel(修复 M 回归 expect(React.isValidElement(child)).toBe(true); expect(typeof child.props.onDone).toBe('function'); }); + +// ---- Task 6: 面板 mount 触发一次 loadPersistedRuns ---- +// 验证 WorkflowsPanel mount 时调 svc.loadPersistedRuns() 恰好一次。 +// service 内部 persistedLoaded flag 守护幂等;重渲染/重 mount 不重复调用。 +// 用 spy 替换单例的 loadPersistedRuns,渲染到 PassThrough 流,等 useEffect 触发。 + +test('WorkflowsPanel mount 触发一次 loadPersistedRuns', async () => { + __resetWorkflowServiceForTests(); + const svc = getWorkflowService(); + let calls = 0; + const orig = svc.loadPersistedRuns.bind(svc); + svc.loadPersistedRuns = async () => { + calls++; + }; + + const stdout = new PassThrough(); + // 消费 data 避免 buffer 撑爆(render 会写多帧) + stdout.on('data', () => {}); + let instance: { unmount: () => void; waitUntilExit: () => Promise } | undefined; + try { + instance = await render( + React.createElement(WorkflowsPanel, { + onDone: () => {}, + context: { canUseTool: undefined } as never, + }), + { stdout: stdout as unknown as NodeJS.WriteStream, patchConsole: false }, + ); + // mount 后 useEffect 异步触发;等 tick 让 React commit + effect 跑完 + await new Promise(r => setTimeout(r, 30)); + + expect(calls).toBe(1); + } finally { + instance?.unmount(); + svc.loadPersistedRuns = orig; + __resetWorkflowServiceForTests(); + } +}); diff --git a/src/workflow/__tests__/persistence.test.ts b/src/workflow/__tests__/persistence.test.ts new file mode 100644 index 000000000..d976370dc --- /dev/null +++ b/src/workflow/__tests__/persistence.test.ts @@ -0,0 +1,199 @@ +import { expect, test } from 'bun:test' +import { + mkdir, + mkdtemp, + readFile, + readdir, + rm, + writeFile as fsWriteFile, +} from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { + getRunsDir, + listPersistedRuns, + readRunState, + writeRunState, +} from '../persistence.js' +import type { RunProgress } from '../progress/store.js' + +function makeRun(over: Partial = {}): RunProgress { + return { + runId: 'r1', + workflowName: 'w', + status: 'completed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 0, + startedAt: 1000, + updatedAt: 2000, + ...over, + } as RunProgress +} + +test('writeRunState → readRunState 往返一致(returnValue 为对象)', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + const run = makeRun({ + returnValue: { confirmedCount: 2, items: ['a', 'b'] }, + }) + await writeRunState(dir, run) + const got = await readRunState(dir, 'r1') + expect(got).not.toBeNull() + expect(got!.runId).toBe('r1') + expect(got!.returnValue).toEqual({ confirmedCount: 2, items: ['a', 'b'] }) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('readRunState 缺文件 → null', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + const got = await readRunState(dir, 'never-exists') + expect(got).toBeNull() + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('readRunState 损坏 JSON → null', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await mkdir(join(dir, 'rX'), { recursive: true }) + await fsWriteFile(join(dir, 'rX', 'state.json'), '{not valid json', 'utf-8') + const got = await readRunState(dir, 'rX') + expect(got).toBeNull() + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('readRunState schemaVersion 不符 → null', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await mkdir(join(dir, 'rX'), { recursive: true }) + await fsWriteFile( + join(dir, 'rX', 'state.json'), + JSON.stringify({ schemaVersion: 999, run: makeRun({ runId: 'rX' }) }), + 'utf-8', + ) + const got = await readRunState(dir, 'rX') + expect(got).toBeNull() + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('writeRunState 原子写:成功后无 tmp 残留', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await writeRunState(dir, makeRun({ runId: 'rAtom' })) + const sub = await readdir(join(dir, 'rAtom')) + expect(sub).toContain('state.json') + expect(sub).not.toContain('state.json.tmp') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('listPersistedRuns 扫多子目录、跳过无 state.json 的目录、按 updatedAt 降序', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + // 三个有效 run + 一个只有 journal 没 state.json 的半残目录 + await writeRunState(dir, makeRun({ runId: 'old', updatedAt: 1000 })) + await writeRunState(dir, makeRun({ runId: 'mid', updatedAt: 2000 })) + await writeRunState(dir, makeRun({ runId: 'new', updatedAt: 3000 })) + await mkdir(join(dir, 'half-broken'), { recursive: true }) + + const runs = await listPersistedRuns(dir) + expect(runs.map(r => r.runId)).toEqual(['new', 'mid', 'old']) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('listPersistedRuns 扫到损坏 state.json → 跳过该单个,继续扫其余', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await writeRunState(dir, makeRun({ runId: 'good' })) + await mkdir(join(dir, 'bad'), { recursive: true }) + await fsWriteFile(join(dir, 'bad', 'state.json'), 'corrupt', 'utf-8') + + const runs = await listPersistedRuns(dir) + expect(runs.map(r => r.runId)).toEqual(['good']) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('writeRunState 不抛 returnValue 为 null/字符串/数组', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await writeRunState(dir, makeRun({ runId: 'n', returnValue: null })) + await writeRunState(dir, makeRun({ runId: 's', returnValue: 'text' })) + await writeRunState(dir, makeRun({ runId: 'a', returnValue: [1, 2, 3] })) + expect((await readRunState(dir, 'n'))!.returnValue).toBeNull() + expect((await readRunState(dir, 's'))!.returnValue).toBe('text') + expect((await readRunState(dir, 'a'))!.returnValue).toEqual([1, 2, 3]) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('writeRunState 覆盖写:同 runId 二次写覆盖旧内容', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + await writeRunState(dir, makeRun({ runId: 'rOV', status: 'running' })) + await writeRunState(dir, makeRun({ runId: 'rOV', status: 'completed' })) + const got = await readRunState(dir, 'rOV') + expect(got!.status).toBe('completed') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('writeRunState 写入完整 AgentProgress(不含 output 内容,含 label/phase/token 等)', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-')) + try { + const run = makeRun({ + runId: 'rAg', + agents: [ + { + id: 1, + label: 'review:hooks', + phase: 'Review', + status: 'done', + outputShape: 'object', + tokenCount: 12345, + toolCount: 3, + model: 'claude-sonnet-4-6', + }, + ], + agentCount: 1, + }) + await writeRunState(dir, run) + const got = await readRunState(dir, 'rAg') + expect(got!.agents).toHaveLength(1) + expect(got!.agents[0]).toEqual({ + id: 1, + label: 'review:hooks', + phase: 'Review', + status: 'done', + outputShape: 'object', + tokenCount: 12345, + toolCount: 3, + model: 'claude-sonnet-4-6', + }) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('getRunsDir 返回 /.claude/workflow-runs 形态', () => { + const dir = getRunsDir() + // 不 hard-code projectRoot(跨机器不同),只校验后缀结构 + expect(dir.endsWith(`${join('.claude', 'workflow-runs')}`)).toBe(true) +}) diff --git a/src/workflow/__tests__/progressStore.test.ts b/src/workflow/__tests__/progressStore.test.ts index bd223eba9..09c64c4d4 100644 --- a/src/workflow/__tests__/progressStore.test.ts +++ b/src/workflow/__tests__/progressStore.test.ts @@ -1,6 +1,9 @@ import { expect, test } from 'bun:test' import { createProgressBus, type ProgressBus } from '../progress/bus.js' -import { createProgressStoreFromBus } from '../progress/store.js' +import { + createProgressStoreFromBus, + type RunProgress, +} from '../progress/store.js' import type { AgentRunResult } from '@claude-code-best/workflow-engine' const ok = (o: string): AgentRunResult => ({ @@ -229,3 +232,58 @@ test('agent_done 落地 model/tokenCount/toolCount(ok 变体)', () => { expect(a.tokenCount).toBe(22900) expect(a.toolCount).toBe(1) }) + +// ---- hydrate:从磁盘注入历史 run(跨重启恢复)---- + +test('hydrate 注入新 run → get 命中 + list 含该项 + 通知 listener', () => { + const { store } = newStore() + let notified = 0 + store.subscribe(() => notified++) + + const historical: RunProgress = { + runId: 'hist-1', + workflowName: 'old-job', + status: 'completed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 5, + returnValue: { summary: 'past' }, + startedAt: 1, + updatedAt: 2, + } + store.hydrate(historical) + + expect(store.get('hist-1')).toBe(historical) + expect(store.list().map(r => r.runId)).toContain('hist-1') + expect(notified).toBeGreaterThan(0) +}) + +test('hydrate 已存在的 runId → 跳过(内存优先,不被磁盘覆盖)', () => { + const { bus, store } = newStore() + bus.emit({ + type: 'run_started', + runId: 'r1', + workflowName: 'live', + meta: null, + }) + + const stale: RunProgress = { + runId: 'r1', + workflowName: 'STALE-SHOULD-NOT-WIN', + status: 'completed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 0, + startedAt: 1, + updatedAt: 2, + } + store.hydrate(stale) + + const got = store.get('r1')! + expect(got.workflowName).toBe('live') + expect(got.status).toBe('running') +}) diff --git a/src/workflow/__tests__/runStatePersistence.test.ts b/src/workflow/__tests__/runStatePersistence.test.ts new file mode 100644 index 000000000..3a6962992 --- /dev/null +++ b/src/workflow/__tests__/runStatePersistence.test.ts @@ -0,0 +1,177 @@ +import { expect, test } from 'bun:test' +import { mkdtemp, rm, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { attachRunStatePersistence, readRunState } from '../persistence.js' +import { createProgressBus } from '../progress/bus.js' +import { createProgressStoreFromBus } from '../progress/store.js' + +/** + * attachRunStatePersistence 的契约测试(调整后 Task 4): + * 直接测 bus + store 组合,不走 makeService(保持 makeService 签名 (ports, store, cwdOverride?) 不变)。 + * + * runsDir 通过 attachRunStatePersistence 的第三个参数 runsDirProvider 注入 tmpdir, + * 避免写真实项目目录(Bun ESM 模块命名空间只读,无法 monkey-patch getRunsDir)。 + */ + +test('run_done completed → 写盘 state.json,returnValue 一致', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-persist-')) + try { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + attachRunStatePersistence(bus, store, () => dir) + + bus.emit({ + type: 'run_started', + runId: 'rW', + workflowName: 'w', + meta: null, + }) + bus.emit({ + type: 'run_done', + runId: 'rW', + status: 'completed', + returnValue: { ok: true, n: 3 }, + }) + + // writeRunState 是 async(订阅里 void writeRunState(...));让 microtask 跑完 + await new Promise(r => setTimeout(r, 50)) + + const got = await readRunState(dir, 'rW') + expect(got).not.toBeNull() + expect(got!.status).toBe('completed') + expect(got!.returnValue).toEqual({ ok: true, n: 3 }) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('run_done failed → 写盘 status=failed + error 字段', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-persist-')) + try { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + attachRunStatePersistence(bus, store, () => dir) + + bus.emit({ + type: 'run_started', + runId: 'rF', + workflowName: 'w', + meta: null, + }) + bus.emit({ + type: 'run_done', + runId: 'rF', + status: 'failed', + error: 'boom', + }) + await new Promise(r => setTimeout(r, 50)) + + const got = await readRunState(dir, 'rF') + expect(got).not.toBeNull() + expect(got!.status).toBe('failed') + expect(got!.error).toBe('boom') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('run_done killed → 写盘 status=killed', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-persist-')) + try { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + attachRunStatePersistence(bus, store, () => dir) + + bus.emit({ + type: 'run_started', + runId: 'rK', + workflowName: 'w', + meta: null, + }) + bus.emit({ type: 'run_done', runId: 'rK', status: 'killed' }) + await new Promise(r => setTimeout(r, 50)) + + const got = await readRunState(dir, 'rK') + expect(got?.status).toBe('killed') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('writeRunState 内部 IO 异常被吞掉:attachRunStatePersistence 不传播,bus emit 不中断', async () => { + const blockerDir = await mkdtemp(join(tmpdir(), 'wf-persist-')) + // 先创建一个同名文件,让子路径 mkdir 失败 → writeRunState 内部 catch 吞掉 + await writeFile(join(blockerDir, 'not-a-dir.txt'), 'blocker', 'utf-8') + try { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + // runsDir 指向一个父路径是文件的目录:mkdir recursive 失败 + attachRunStatePersistence(bus, store, () => + join(blockerDir, 'not-a-dir.txt'), + ) + + // 额外的订阅者,验证它仍被通知(bus emit 不应因持久化 listener 内部异常中断) + let otherNotified = 0 + bus.subscribe(() => otherNotified++) + + // bus.emit 不应抛——writeRunState 内部吞异常 + expect(() => { + bus.emit({ + type: 'run_started', + runId: 'rErr', + workflowName: 'w', + meta: null, + }) + bus.emit({ + type: 'run_done', + runId: 'rErr', + status: 'completed', + returnValue: 'x', + }) + }).not.toThrow() + + // 让 writeRunState 的 microtask 跑完(异常在内部被吞) + await new Promise(r => setTimeout(r, 50)) + + // store 这条订阅者仍正常工作(收到了 run_started + run_done 两次事件) + expect(otherNotified).toBeGreaterThanOrEqual(2) + expect(store.get('rErr')?.status).toBe('completed') + } finally { + await rm(blockerDir, { recursive: true, force: true }) + } +}) + +test('attachRunStatePersistence 返回 unsubscribe;调用后不再写盘', async () => { + const dir = await mkdtemp(join(tmpdir(), 'wf-persist-')) + try { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + const unsub = attachRunStatePersistence(bus, store, () => dir) + + // 先发一个 run_done,验证写盘生效 + bus.emit({ + type: 'run_started', + runId: 'r1', + workflowName: 'w', + meta: null, + }) + bus.emit({ type: 'run_done', runId: 'r1', status: 'completed' }) + await new Promise(r => setTimeout(r, 50)) + expect(await readRunState(dir, 'r1')).not.toBeNull() + + // unsubscribe 后再发 run_done,不应再写盘 + unsub() + bus.emit({ + type: 'run_started', + runId: 'r2', + workflowName: 'w', + meta: null, + }) + bus.emit({ type: 'run_done', runId: 'r2', status: 'completed' }) + await new Promise(r => setTimeout(r, 50)) + expect(await readRunState(dir, 'r2')).toBeNull() + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) diff --git a/src/workflow/__tests__/service.test.ts b/src/workflow/__tests__/service.test.ts index 0ba7d659a..216090c53 100644 --- a/src/workflow/__tests__/service.test.ts +++ b/src/workflow/__tests__/service.test.ts @@ -9,7 +9,10 @@ import { tmpdir } from 'node:os' import { join } from 'node:path' import { makeService, __resetWorkflowServiceForTests } from '../service.js' import { createProgressBus } from '../progress/bus.js' -import { createProgressStoreFromBus } from '../progress/store.js' +import { + createProgressStoreFromBus, + type RunProgress, +} from '../progress/store.js' import type { AgentRunResult, ProgressEvent, @@ -356,3 +359,153 @@ test('shutdown 不重复杀已完成 run;幂等(多次调用安全)', asyn // 幂等 expect(() => svc.shutdown()).not.toThrow() }) + +// ---- Task 5: loadPersistedRuns + getRunAsync fallback ---- +// runsDirProvider 作为 makeService 第四个可选参数注入 tmpdir,避免写真实项目目录 +// (Bun ESM 模块命名空间只读,无法 monkey-patch getRunsDir)。 + +test('loadPersistedRuns 扫盘 hydrate 历史 run;已有内存 run 不被覆盖', async () => { + __resetWorkflowServiceForTests() + const dir = await mkdtemp(join(tmpdir(), 'wf-svc-')) + try { + // 磁盘先有两个历史 run + const { writeRunState } = await import('../persistence.js') + const historicalA = { + runId: 'hA', + workflowName: 'old-A', + status: 'completed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 1, + returnValue: 'a', + startedAt: 10, + updatedAt: 20, + } as RunProgress + const historicalB = { + runId: 'hB', + workflowName: 'old-B', + status: 'failed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 2, + error: 'x', + startedAt: 30, + updatedAt: 40, + } as RunProgress + await writeRunState(dir, historicalA) + await writeRunState(dir, historicalB) + + const { ports, store } = fakePorts() + // 内存先有一个本次会话 run(通过 ports.progressEmitter.emit 走 bus → store) + ports.progressEmitter.emit({ + type: 'run_started', + runId: 'live', + workflowName: 'live-w', + meta: null, + }) + const svc = makeService(ports, store, undefined, () => dir) + + await svc.loadPersistedRuns() + + const ids = svc.listRuns().map(r => r.runId) + expect(ids).toContain('hA') + expect(ids).toContain('hB') + expect(ids).toContain('live') + // 内存优先:live 仍是 running(不被磁盘覆盖;磁盘里没有 live 也不会注入 STALE) + expect(svc.getRun('live')!.status).toBe('running') + expect(svc.getRun('hA')!.returnValue).toBe('a') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('loadPersistedRuns 重复调用仅扫盘一次(persistedLoaded flag)', async () => { + __resetWorkflowServiceForTests() + const dir = await mkdtemp(join(tmpdir(), 'wf-svc-')) + try { + const { ports, store } = fakePorts() + const svc = makeService(ports, store, undefined, () => dir) + + await svc.loadPersistedRuns() + await svc.loadPersistedRuns() + await svc.loadPersistedRuns() + + // 重复调用不抛错、不改变 listRuns 结果(空目录) + expect(svc.listRuns()).toEqual([]) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('getRunAsync 内存命中 → 不读盘', async () => { + __resetWorkflowServiceForTests() + const dir = await mkdtemp(join(tmpdir(), 'wf-svc-')) + try { + const { ports, store } = fakePorts() + const svc = makeService(ports, store, undefined, () => dir) + ports.progressEmitter.emit({ + type: 'run_started', + runId: 'live', + workflowName: 'w', + meta: null, + }) + + const got = await svc.getRunAsync('live') + expect(got?.runId).toBe('live') + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('getRunAsync 内存 miss + 磁盘命中 → 返回磁盘值,且不注入内存(再次 get 仍读盘)', async () => { + __resetWorkflowServiceForTests() + const dir = await mkdtemp(join(tmpdir(), 'wf-svc-')) + try { + const { writeRunState } = await import('../persistence.js') + const historical = { + runId: 'hist-only', + workflowName: 'old', + status: 'completed', + phases: [], + declaredPhases: [], + currentPhase: null, + agents: [], + agentCount: 0, + returnValue: { x: 1 }, + startedAt: 1, + updatedAt: 2, + } as RunProgress + await writeRunState(dir, historical) + + const { ports, store } = fakePorts() + const svc = makeService(ports, store, undefined, () => dir) + + const got = await svc.getRunAsync('hist-only') + expect(got?.returnValue).toEqual({ x: 1 }) + // 不注入内存:内存 list 不含(未 hydrate) + expect(svc.listRuns().map(r => r.runId)).not.toContain('hist-only') + // 再次 get 仍能返回(每次走 readRunState fallback) + const got2 = await svc.getRunAsync('hist-only') + expect(got2?.returnValue).toEqual({ x: 1 }) + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) + +test('getRunAsync 内存 miss + 磁盘 miss → undefined', async () => { + __resetWorkflowServiceForTests() + const dir = await mkdtemp(join(tmpdir(), 'wf-svc-')) + try { + const { ports, store } = fakePorts() + const svc = makeService(ports, store, undefined, () => dir) + + const got = await svc.getRunAsync('no-such-run') + expect(got).toBeUndefined() + } finally { + await rm(dir, { recursive: true, force: true }) + } +}) diff --git a/src/workflow/panel/WorkflowsPanel.tsx b/src/workflow/panel/WorkflowsPanel.tsx index 3c5f9bce5..84a332956 100644 --- a/src/workflow/panel/WorkflowsPanel.tsx +++ b/src/workflow/panel/WorkflowsPanel.tsx @@ -48,6 +48,12 @@ export function WorkflowsPanel({ const [selectedPhaseIndex, setSelectedPhaseIndex] = useState(0); const [selectedAgentIndex, setSelectedAgentIndex] = useState(0); + // mount 时触发一次扫盘 hydrate 历史 run(service 内部 persistedLoaded flag 守护幂等)。 + // 重 mount/重渲染不会重复扫盘(flag 进程单例守护)。svc 引用稳定(getWorkflowService 单例)。 + useEffect(() => { + void svc.loadPersistedRuns(); + }, [svc]); + // runs 变化时:activeRunId 失效(被 kill / 首次)→ 夹紧到首个 useEffect(() => { if (runs.length === 0) { diff --git a/src/workflow/persistence.ts b/src/workflow/persistence.ts new file mode 100644 index 000000000..5925302b4 --- /dev/null +++ b/src/workflow/persistence.ts @@ -0,0 +1,131 @@ +import { mkdir, readFile, readdir, rename, writeFile } from 'node:fs/promises' +import { join } from 'node:path' +import { getProjectRoot } from '../bootstrap/state.js' +import { logForDebugging } from '../utils/debug.js' +import type { ProgressBus } from './progress/bus.js' +import type { ProgressStore, RunProgress } from './progress/store.js' + +/** state.json 当前 schema 版本;升级时引入迁移链。 */ +const SCHEMA_VERSION = 1 +const STATE_FILE = 'state.json' +const STATE_TMP = 'state.json.tmp' + +/** + * runsDir 统一来源:与 ports.ts journalStore 同根(${projectRoot}/.claude/workflow-runs)。 + * 提取为函数:消除 ports.ts 与持久化逻辑的路径拼接重复,进入 worktree/子目录时保持同根。 + * 测试用 monkey-patch 本函数指向 tmpdir。 + */ +export function getRunsDir(): string { + return join(getProjectRoot(), '.claude', 'workflow-runs') +} + +type StateFile = { + schemaVersion: number + run: RunProgress +} + +/** + * 原子覆盖写终态 RunProgress 到 //state.json。 + * 原子性:writeFile(tmp) → rename(tmp, target),rename 原子;最坏留 tmp,下次写覆盖。 + * 失败 best-effort:IO 异常只 log warn,不抛(workflow 已成功,持久化失败只意味着重启后取不到)。 + */ +export async function writeRunState( + runsDir: string, + run: RunProgress, +): Promise { + const dir = join(runsDir, run.runId) + const target = join(dir, STATE_FILE) + const tmp = join(dir, STATE_TMP) + const payload: StateFile = { schemaVersion: SCHEMA_VERSION, run } + try { + await mkdir(dir, { recursive: true }) + await writeFile(tmp, JSON.stringify(payload), 'utf-8') + await rename(tmp, target) + } catch (e) { + logForDebugging( + `[workflow warn] writeRunState failed for ${run.runId}: ${(e as Error).message}`, + ) + } +} + +/** + * 读 //state.json,容错: + * - 文件不存在 → null(调用方按 miss 处理) + * - JSON 解析失败 / schema 结构不符 / schemaVersion 不符 → null(log warn,不崩) + */ +export async function readRunState( + runsDir: string, + runId: string, +): Promise { + const target = join(runsDir, runId, STATE_FILE) + let raw: string + try { + raw = await readFile(target, 'utf-8') + } catch { + return null + } + try { + const parsed = JSON.parse(raw) as Partial + if (parsed.schemaVersion !== SCHEMA_VERSION) return null + const run = parsed.run + if (!run || typeof run !== 'object') return null + if (typeof run.runId !== 'string') return null + if (typeof run.status !== 'string') return null + return run as RunProgress + } catch (e) { + logForDebugging( + `[workflow warn] readRunState parse failed for ${runId}: ${(e as Error).message}`, + ) + return null + } +} + +/** + * 扫描 runsDir 下所有子目录,读取每个 state.json,返回非空 RunProgress 列表。 + * - runsDir 不存在 → 空数组 + * - 某子目录无 state.json(半残 run)→ 跳过 + * - 某子目录 state.json 损坏 → 跳过该单个,继续扫其余 + * - 按 updatedAt 降序(与 store.list() 排序一致) + */ +export async function listPersistedRuns( + runsDir: string, +): Promise { + let entries: string[] + try { + entries = await readdir(runsDir) + } catch { + return [] + } + const runs: RunProgress[] = [] + for (const name of entries) { + const run = await readRunState(runsDir, name) + if (run) runs.push(run) + } + return runs.sort((a, b) => b.updatedAt - a.updatedAt) +} + +/** + * 订阅 bus 的 run_done 事件,把终态 RunProgress 写到磁盘 state.json。 + * 覆盖 completed/failed/killed 三态(shutdown-kill 也走 run_done killed)。 + * store 先于本订阅注册到 bus,故 listener 执行时 store.get(runId) 已是终态。 + * 返回 unsubscribe 函数(测试清理用)。 + * + * 写盘 best-effort:writeRunState 内部吞 IO 异常只 log,不传播—— + * 因此 bus 的其他订阅者(store 等)不受持久化失败影响。 + * + * @param runsDirProvider 可选的 runsDir 解析器(默认 getRunsDir)。 + * 生产路径走默认值;测试注入 tmpdir 避免写真实项目目录(Bun ESM 模块命名空间只读, + * 无法 monkey-patch getRunsDir 本身)。 + */ +export function attachRunStatePersistence( + bus: ProgressBus, + store: ProgressStore, + runsDirProvider: () => string = getRunsDir, +): () => void { + return bus.subscribe(event => { + if (event.type !== 'run_done') return + const run = store.get(event.runId) + if (!run) return + void writeRunState(runsDirProvider(), run) + }) +} diff --git a/src/workflow/ports.ts b/src/workflow/ports.ts index 680518d96..c6bb62299 100644 --- a/src/workflow/ports.ts +++ b/src/workflow/ports.ts @@ -5,6 +5,7 @@ import { } from '@claude-code-best/workflow-engine' import { logForDebugging } from '../utils/debug.js' import { getProjectRoot } from '../bootstrap/state.js' +import { getRunsDir } from './persistence.js' import { type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, logEvent, @@ -69,7 +70,7 @@ export function createWorkflowPorts(opts: { store: ProgressStore }): WorkflowPorts { const bindings = new Map() - const runsDir = `${getProjectRoot()}/.claude/workflow-runs` + const runsDir = getRunsDir() const registry = buildRegistry() // 遥测订阅(独立于 store)。LogEventMetadata 只接受 boolean/number/undefined, diff --git a/src/workflow/progress/store.ts b/src/workflow/progress/store.ts index f69e8d7e8..e056092ea 100644 --- a/src/workflow/progress/store.ts +++ b/src/workflow/progress/store.ts @@ -41,6 +41,8 @@ export type ProgressStore = { apply(event: ProgressEvent): void list(): RunProgress[] get(runId: string): RunProgress | undefined + /** 直接注入磁盘读出的 run(绕过 bus);已存在的 runId 跳过——内存优先。 */ + hydrate(run: RunProgress): void /** 供 useSyncExternalStore:返回稳定引用,无变更时同一数组。 */ subscribe(listener: () => void): () => void getSnapshot(): RunProgress[] @@ -184,6 +186,11 @@ export function createProgressStoreFromBus(bus: ProgressBus): ProgressStore { apply, list: () => snapshot, get: id => byId.get(id), + hydrate(run) { + if (byId.has(run.runId)) return + byId.set(run.runId, run) + notify() + }, subscribe: fn => { listeners.add(fn) return () => listeners.delete(fn) diff --git a/src/workflow/service.ts b/src/workflow/service.ts index 928c9548a..622b3b415 100644 --- a/src/workflow/service.ts +++ b/src/workflow/service.ts @@ -15,6 +15,12 @@ import { getProjectRoot } from '../bootstrap/state.js' import { logForDebugging } from '../utils/debug.js' import { buildHostBundle, makeHostHandle } from './hostHandle.js' import { installWorkflowNotifications } from './notifications.js' +import { + attachRunStatePersistence, + getRunsDir, + listPersistedRuns, + readRunState, +} from './persistence.js' import { createProgressBus } from './progress/bus.js' import { createProgressStoreFromBus, @@ -59,6 +65,16 @@ export type WorkflowService = { shutdown(): void listRuns(): RunProgress[] getRun(runId: string): RunProgress | undefined + /** + * 异步按 runId 查:内存命中则返回;miss 读盘 state.json(不注入内存)。 + * 供"按 runId 取历史 return"场景;面板展示请走 loadPersistedRuns + listRuns。 + */ + getRunAsync(runId: string): Promise + /** + * 扫盘把所有历史 run 的 state.json hydrate 进 store(已存在 runId 跳过)。 + * 进程单例内仅实际扫盘一次(persistedLoaded flag);重复调用立即返回。 + */ + loadPersistedRuns(): Promise subscribe(listener: () => void): () => void listNamed(workflowDir?: string): Promise } @@ -72,6 +88,9 @@ export function getWorkflowService(): WorkflowService { const store = createProgressStoreFromBus(bus) const ports = createWorkflowPorts({ bus, store }) const service = makeService(ports, store) + // 订阅 run_done 写终态快照到磁盘(completed/failed/killed 三态共用入口,shutdown-kill 也走此路径)。 + // store 先于本订阅注册到 bus,故 listener 执行时 store.get(runId) 已是终态。 + attachRunStatePersistence(bus, store) // 安装状态变更通知桥接(commit 0768d4dc 承诺但旧实现落空的"完成时自动通知") installWorkflowNotifications(service) cached = service @@ -83,11 +102,15 @@ export function getWorkflowService(): WorkflowService { * * 生产路径用 {@link getWorkflowService};测试用本函数直接注入 fake ports, * 避免触碰真实的 getProjectRoot/getCwd/analytics 等模块级副作用。 + * + * @param cwdOverride 仅供测试注入临时目录(避免 inline 持久化写真实项目目录)。 + * @param runsDirProvider 仅供测试注入 tmpdir(Bun ESM 模块命名空间只读,无法 monkey-patch getRunsDir)。 */ export function makeService( ports: WorkflowPorts, store: ProgressStore, cwdOverride?: string, + runsDirProvider: () => string = getRunsDir, ): WorkflowService { const buildHost = ( toolUseContext: ToolUseContext, @@ -138,6 +161,10 @@ export function makeService( throw new Error('必须提供 script、name 或 scriptPath 之一') } + // loadPersistedRuns 的进程单例 flag:首次调用后置 true,后续重复调用立即返回。 + // 扫盘失败时复位允许下次重试。每个 makeService 调用独立闭包变量(测试构造新 service 时重置)。 + let persistedLoaded = false + return { ports, @@ -232,6 +259,25 @@ export function makeService( listRuns: () => store.list(), getRun: id => store.get(id), + async getRunAsync(id) { + const mem = store.get(id) + if (mem) return mem + return (await readRunState(runsDirProvider(), id)) ?? undefined + }, + async loadPersistedRuns() { + if (persistedLoaded) return + persistedLoaded = true + try { + const runs = await listPersistedRuns(runsDirProvider()) + for (const run of runs) store.hydrate(run) + } catch (e) { + // 扫盘失败不阻断面板:log + 复位 flag 允许下次重试 + logForDebugging( + `[workflow warn] loadPersistedRuns failed: ${(e as Error).message}`, + ) + persistedLoaded = false + } + }, subscribe: fn => store.subscribe(fn), async listNamed(workflowDir) {