feat(workflow): run 终态落盘 state.json 支持跨重启恢复

终态 RunProgress(含 returnValue/error)此前只在内存 ProgressStore,进程
重启即丢失。本次让其落盘到 .claude/workflow-runs/<runId>/state.json,使
(a) 重启后可按 runId 取 return、(b) /workflows 面板跨重启展示历史 run。
跨进程 resume 明确不在范围。

- persistence.ts: getRunsDir/writeRunState/readRunState/listPersistedRuns
  + attachRunStatePersistence;原子覆盖写(tmp+rename),读容错(缺文件/
  损坏/schemaVersion 不符 → null),写 best-effort(IO 失败只 log warn)
- progress/store.ts: 加 hydrate(run) 直接注入磁盘 run(已存在 runId 跳过,
  内存优先)
- service.ts: getWorkflowService() 接线 attachRunStatePersistence(bus,
  store) 订阅 run_done(completed/failed/killed 三态共用,shutdown-kill
  也走同路径,无需额外钩子);WorkflowService 加 getRunAsync(id) 内存
  miss→读盘 fallback(不注入内存)+ loadPersistedRuns() 扫盘 hydrate
  (persistedLoaded flag 守护幂等)
- panel/WorkflowsPanel.tsx: mount 时调一次 loadPersistedRuns(重 mount
  不重复)
- ports.ts: runsDir 改用 getRunsDir() 消除拼接重复
- 测试: persistence.test.ts(11)/runStatePersistence.test.ts(5)/
  progressStore(2)/service(5)/WorkflowsPanel(1) 共 24 个新测试;
  precheck 5629 pass / 0 fail

设计偏离: 计划原写 monkey-patch getRunsDir 指向 tmpdir,Bun ESM namespace
不可变不可行;改用可选 runsDirProvider 参数(默认 getRunsDir)DI 注入,
加到 attachRunStatePersistence 与 makeService(cwdOverride 之后第 4 参),
与现有 cwdOverride 模式一致。makeService 的 cwdOverride 保持不变,不破坏
inline 持久化特性。

Co-Authored-By: glm-5.2 <zai-org@claude-code-best.win>
This commit is contained in:
claude-code-best
2026-06-13 23:37:52 +08:00
parent 54d2bf6f12
commit b5ead59e72
10 changed files with 821 additions and 3 deletions

View File

@@ -1,10 +1,13 @@
import { expect, test } from 'bun:test';
import { PassThrough } from 'node:stream';
import React from 'react';
import { wrappedRender as render } from '@anthropic/ink';
import { SentryErrorBoundary } from '../../components/SentryErrorBoundary.js';
import type { RunProgress } from '../progress/store.js';
import { call as panelCall } from '../panel/panelCall.js';
import { clampSelected, WorkflowsPanel } from '../panel/WorkflowsPanel.js';
import { STATUS_DOT } from '../panel/status.js';
import { __resetWorkflowServiceForTests, getWorkflowService } from '../service.js';
// 纯函数:选中夹紧到有效区间(与面板内 clampSelected 同源)。
test('clampSelected空列表→0越界→末位负/NaN→0正常→原值', () => {
@@ -104,3 +107,40 @@ test('panelCall 用 SentryErrorBoundary 包裹 WorkflowsPanel修复 M 回归
expect(React.isValidElement(child)).toBe(true);
expect(typeof child.props.onDone).toBe('function');
});
// ---- Task 6: mounting the panel triggers loadPersistedRuns exactly once ----
// The service singleton's loadPersistedRuns is swapped for a counting stub, the
// panel is rendered into a PassThrough stream, and the test waits briefly for
// the mount effect to fire before checking the call count.
test('WorkflowsPanel mount 触发一次 loadPersistedRuns', async () => {
  __resetWorkflowServiceForTests();
  const service = getWorkflowService();
  const originalLoad = service.loadPersistedRuns.bind(service);
  let loadCalls = 0;
  service.loadPersistedRuns = async () => {
    loadCalls += 1;
  };

  // Drain the stream so the multiple frames written by render never back up.
  const sink = new PassThrough();
  sink.on('data', () => {});

  let mounted: { unmount: () => void; waitUntilExit: () => Promise<void> } | undefined;
  try {
    const panel = React.createElement(WorkflowsPanel, {
      onDone: () => {},
      context: { canUseTool: undefined } as never,
    });
    mounted = await render(panel, {
      stdout: sink as unknown as NodeJS.WriteStream,
      patchConsole: false,
    });
    // The mount effect runs asynchronously; give React a moment to commit.
    await new Promise(resolve => setTimeout(resolve, 30));
    expect(loadCalls).toBe(1);
  } finally {
    // Always restore the real method and the singleton for later tests.
    mounted?.unmount();
    service.loadPersistedRuns = originalLoad;
    __resetWorkflowServiceForTests();
  }
});

View File

@@ -0,0 +1,199 @@
import { expect, test } from 'bun:test'
import {
mkdir,
mkdtemp,
readFile,
readdir,
rm,
writeFile as fsWriteFile,
} from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import {
getRunsDir,
listPersistedRuns,
readRunState,
writeRunState,
} from '../persistence.js'
import type { RunProgress } from '../progress/store.js'
/**
 * Builds a RunProgress fixture for these tests: by default a completed run
 * `r1` of workflow `w` with no phases and no agents.
 *
 * @param over Fields that replace the corresponding defaults.
 * @returns The defaults merged with `over` (override values win).
 */
function makeRun(over: Partial<RunProgress> = {}): RunProgress {
  const defaults = {
    runId: 'r1',
    workflowName: 'w',
    status: 'completed',
    phases: [],
    declaredPhases: [],
    currentPhase: null,
    agents: [],
    agentCount: 0,
    startedAt: 1000,
    updatedAt: 2000,
  }
  return { ...defaults, ...over } as RunProgress
}
// Round trip: a run written with an object returnValue reads back unchanged.
test('writeRunState → readRunState 往返一致returnValue 为对象)', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    const payload = { confirmedCount: 2, items: ['a', 'b'] }
    await writeRunState(runsDir, makeRun({ returnValue: payload }))

    const loaded = await readRunState(runsDir, 'r1')
    expect(loaded).not.toBeNull()
    expect(loaded!.runId).toBe('r1')
    // Compare against a fresh literal rather than the object handed to the writer.
    expect(loaded!.returnValue).toEqual({
      confirmedCount: 2,
      items: ['a', 'b'],
    })
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// Reading a runId that was never written yields null instead of throwing.
test('readRunState 缺文件 → null', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    const loaded = await readRunState(runsDir, 'never-exists')
    expect(loaded).toBeNull()
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// A state.json that is not parseable JSON is treated as absent (null).
test('readRunState 损坏 JSON → null', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    const runDir = join(runsDir, 'rX')
    await mkdir(runDir, { recursive: true })
    await fsWriteFile(join(runDir, 'state.json'), '{not valid json', 'utf-8')

    expect(await readRunState(runsDir, 'rX')).toBeNull()
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// A well-formed envelope carrying an unexpected schemaVersion is rejected (null).
test('readRunState schemaVersion 不符 → null', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    const runDir = join(runsDir, 'rX')
    const envelope = JSON.stringify({
      schemaVersion: 999,
      run: makeRun({ runId: 'rX' }),
    })
    await mkdir(runDir, { recursive: true })
    await fsWriteFile(join(runDir, 'state.json'), envelope, 'utf-8')

    expect(await readRunState(runsDir, 'rX')).toBeNull()
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// Atomic write: after a successful write nothing but the final file remains.
// The directory listings are compared exactly instead of probing for one
// specific temp-file name, so a leftover is caught regardless of how the
// writer names its temporary file.
test('writeRunState 原子写:成功后无 tmp 残留', async () => {
  const dir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    await writeRunState(dir, makeRun({ runId: 'rAtom' }))
    // The run directory holds exactly the final state file...
    expect(await readdir(join(dir, 'rAtom'))).toEqual(['state.json'])
    // ...and the runs root holds exactly that one run directory.
    expect(await readdir(dir)).toEqual(['rAtom'])
  } finally {
    await rm(dir, { recursive: true, force: true })
  }
})
// Listing scans every run directory, ignores one that has no state.json, and
// returns the runs newest-first by updatedAt.
test('listPersistedRuns 扫多子目录、跳过无 state.json 的目录、按 updatedAt 降序', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    // Three valid runs, written oldest to newest...
    const fixtures: Array<[string, number]> = [
      ['old', 1000],
      ['mid', 2000],
      ['new', 3000],
    ]
    for (const [runId, updatedAt] of fixtures) {
      await writeRunState(runsDir, makeRun({ runId, updatedAt }))
    }
    // ...plus a run directory that contains no state.json at all.
    await mkdir(join(runsDir, 'half-broken'), { recursive: true })

    const listed = await listPersistedRuns(runsDir)
    expect(listed.map(run => run.runId)).toEqual(['new', 'mid', 'old'])
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// One corrupt state.json is skipped without aborting the rest of the scan.
test('listPersistedRuns 扫到损坏 state.json → 跳过该单个,继续扫其余', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    await writeRunState(runsDir, makeRun({ runId: 'good' }))

    const corruptDir = join(runsDir, 'bad')
    await mkdir(corruptDir, { recursive: true })
    await fsWriteFile(join(corruptDir, 'state.json'), 'corrupt', 'utf-8')

    const listed = await listPersistedRuns(runsDir)
    expect(listed.map(run => run.runId)).toEqual(['good'])
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// returnValue survives the round trip when it is null, a string, or an array.
test('writeRunState 不抛 returnValue 为 null/字符串/数组', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    // Write all three variants first, then read each one back.
    await writeRunState(runsDir, makeRun({ runId: 'n', returnValue: null }))
    await writeRunState(runsDir, makeRun({ runId: 's', returnValue: 'text' }))
    await writeRunState(runsDir, makeRun({ runId: 'a', returnValue: [1, 2, 3] }))

    const nullRun = await readRunState(runsDir, 'n')
    expect(nullRun!.returnValue).toBeNull()

    const stringRun = await readRunState(runsDir, 's')
    expect(stringRun!.returnValue).toBe('text')

    const arrayRun = await readRunState(runsDir, 'a')
    expect(arrayRun!.returnValue).toEqual([1, 2, 3])
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// Writing the same runId twice leaves only the second write on disk.
test('writeRunState 覆盖写:同 runId 二次写覆盖旧内容', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    const runId = 'rOV'
    await writeRunState(runsDir, makeRun({ runId, status: 'running' }))
    await writeRunState(runsDir, makeRun({ runId, status: 'completed' }))

    const latest = await readRunState(runsDir, runId)
    expect(latest!.status).toBe('completed')
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// An agent entry (label, phase, status, outputShape, token/tool counts, model)
// is persisted and read back field-for-field.
test('writeRunState 写入完整 AgentProgress不含 output 内容,含 label/phase/token 等)', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-'))
  try {
    // `as const` keeps the literal types so the copy below still type-checks
    // as an agent entry.
    const agentFields = {
      id: 1,
      label: 'review:hooks',
      phase: 'Review',
      status: 'done',
      outputShape: 'object',
      tokenCount: 12345,
      toolCount: 3,
      model: 'claude-sonnet-4-6',
    } as const

    // The writer receives a copy, so the expectation object stays untouched.
    const fixture = makeRun({
      runId: 'rAg',
      agents: [{ ...agentFields }],
      agentCount: 1,
    })
    await writeRunState(runsDir, fixture)

    const loaded = await readRunState(runsDir, 'rAg')
    expect(loaded!.agents).toHaveLength(1)
    expect(loaded!.agents[0]).toEqual(agentFields)
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// Only the trailing path structure is checked: the project root differs from
// machine to machine, so it is deliberately not hard-coded here.
test('getRunsDir 返回 <projectRoot>/.claude/workflow-runs 形态', () => {
  const expectedSuffix = join('.claude', 'workflow-runs')
  expect(getRunsDir().endsWith(expectedSuffix)).toBe(true)
})

View File

@@ -1,6 +1,9 @@
import { expect, test } from 'bun:test'
import { createProgressBus, type ProgressBus } from '../progress/bus.js'
import { createProgressStoreFromBus } from '../progress/store.js'
import {
createProgressStoreFromBus,
type RunProgress,
} from '../progress/store.js'
import type { AgentRunResult } from '@claude-code-best/workflow-engine'
const ok = (o: string): AgentRunResult => ({
@@ -229,3 +232,58 @@ test('agent_done 落地 model/tokenCount/toolCountok 变体)', () => {
expect(a.tokenCount).toBe(22900)
expect(a.toolCount).toBe(1)
})
// ---- hydrate: inject a historical run read from disk (restore across restarts) ----
// Hydrating an unknown runId makes it visible through get and list, and
// notifies subscribers.
test('hydrate 注入新 run → get 命中 + list 含该项 + 通知 listener', () => {
  const { store } = newStore()
  let notifications = 0
  store.subscribe(() => notifications++)

  const fromDisk: RunProgress = {
    runId: 'hist-1',
    workflowName: 'old-job',
    status: 'completed',
    phases: [],
    declaredPhases: [],
    currentPhase: null,
    agents: [],
    agentCount: 5,
    returnValue: { summary: 'past' },
    startedAt: 1,
    updatedAt: 2,
  }
  store.hydrate(fromDisk)

  // The very same object is stored (identity, not a copy).
  expect(store.get('hist-1')).toBe(fromDisk)
  const listedIds = store.list().map(run => run.runId)
  expect(listedIds).toContain('hist-1')
  expect(notifications).toBeGreaterThan(0)
})
// Hydrating a runId the store already tracks is a no-op: the in-memory run
// wins over the copy coming from disk.
test('hydrate 已存在的 runId → 跳过(内存优先,不被磁盘覆盖)', () => {
  const { bus, store } = newStore()
  const runId = 'r1'
  // Register a live run through the bus first.
  bus.emit({ type: 'run_started', runId, workflowName: 'live', meta: null })

  const staleFromDisk: RunProgress = {
    runId,
    workflowName: 'STALE-SHOULD-NOT-WIN',
    status: 'completed',
    phases: [],
    declaredPhases: [],
    currentPhase: null,
    agents: [],
    agentCount: 0,
    startedAt: 1,
    updatedAt: 2,
  }
  store.hydrate(staleFromDisk)

  const current = store.get(runId)!
  expect(current.workflowName).toBe('live')
  expect(current.status).toBe('running')
})

View File

@@ -0,0 +1,177 @@
import { expect, test } from 'bun:test'
import { mkdtemp, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { attachRunStatePersistence, readRunState } from '../persistence.js'
import { createProgressBus } from '../progress/bus.js'
import { createProgressStoreFromBus } from '../progress/store.js'
/**
 * attachRunStatePersistence 的契约测试(调整后的 Task 4):
 * 直接测 bus + store 组合,不走 makeService(makeService 原有的
 * (ports, store, cwdOverride?) 三个参数保持不变,仅在其后新增可选的 runsDirProvider)。
 *
 * runsDir 通过 attachRunStatePersistence 的第三个参数 runsDirProvider 注入 tmpdir,
 * 避免写真实项目目录(Bun ESM 模块命名空间只读,无法 monkey-patch getRunsDir)。
 */
// A completed run_done event results in a state.json carrying the final
// status and the returnValue.
test('run_done completed → 写盘 state.jsonreturnValue 一致', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-persist-'))
  try {
    const progressBus = createProgressBus()
    const progressStore = createProgressStoreFromBus(progressBus)
    attachRunStatePersistence(progressBus, progressStore, () => runsDir)

    const runId = 'rW'
    progressBus.emit({ type: 'run_started', runId, workflowName: 'w', meta: null })
    progressBus.emit({
      type: 'run_done',
      runId,
      status: 'completed',
      returnValue: { ok: true, n: 3 },
    })
    // The subscriber does not await its write, so wait for it to land on disk.
    await new Promise(resolve => setTimeout(resolve, 50))

    const persisted = await readRunState(runsDir, runId)
    expect(persisted).not.toBeNull()
    expect(persisted!.status).toBe('completed')
    expect(persisted!.returnValue).toEqual({ ok: true, n: 3 })
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// A failed run_done event is persisted with status=failed and its error text.
test('run_done failed → 写盘 status=failed + error 字段', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-persist-'))
  try {
    const progressBus = createProgressBus()
    const progressStore = createProgressStoreFromBus(progressBus)
    attachRunStatePersistence(progressBus, progressStore, () => runsDir)

    const runId = 'rF'
    progressBus.emit({ type: 'run_started', runId, workflowName: 'w', meta: null })
    progressBus.emit({ type: 'run_done', runId, status: 'failed', error: 'boom' })
    // Wait for the un-awaited write to finish.
    await new Promise(resolve => setTimeout(resolve, 50))

    const persisted = await readRunState(runsDir, runId)
    expect(persisted).not.toBeNull()
    expect(persisted!.status).toBe('failed')
    expect(persisted!.error).toBe('boom')
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// A killed run_done event is persisted with status=killed.
test('run_done killed → 写盘 status=killed', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-persist-'))
  try {
    const progressBus = createProgressBus()
    const progressStore = createProgressStoreFromBus(progressBus)
    attachRunStatePersistence(progressBus, progressStore, () => runsDir)

    const runId = 'rK'
    progressBus.emit({ type: 'run_started', runId, workflowName: 'w', meta: null })
    progressBus.emit({ type: 'run_done', runId, status: 'killed' })
    // Wait for the un-awaited write to finish.
    await new Promise(resolve => setTimeout(resolve, 50))

    const persisted = await readRunState(runsDir, runId)
    expect(persisted?.status).toBe('killed')
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// Persistence is best-effort: an IO failure inside the write must not escape
// from bus.emit nor prevent other subscribers from seeing the events.
test('writeRunState 内部 IO 异常被吞掉attachRunStatePersistence 不传播bus emit 不中断', async () => {
  const blockerDir = await mkdtemp(join(tmpdir(), 'wf-persist-'))
  try {
    // Fixture setup lives inside the try so the temp dir is removed even when
    // creating the blocker file itself fails.
    // The runs dir is pointed at a regular FILE, so creating a run
    // sub-directory beneath it has to fail.
    const blockerFile = join(blockerDir, 'not-a-dir.txt')
    await writeFile(blockerFile, 'blocker', 'utf-8')

    const bus = createProgressBus()
    const store = createProgressStoreFromBus(bus)
    attachRunStatePersistence(bus, store, () => blockerFile)

    // An extra subscriber proves event delivery continues past the
    // persistence listener.
    let otherNotified = 0
    bus.subscribe(() => otherNotified++)

    // Emitting must not throw even though the write is doomed.
    expect(() => {
      bus.emit({
        type: 'run_started',
        runId: 'rErr',
        workflowName: 'w',
        meta: null,
      })
      bus.emit({
        type: 'run_done',
        runId: 'rErr',
        status: 'completed',
        returnValue: 'x',
      })
    }).not.toThrow()

    // Give the un-awaited write time to fail (and be swallowed).
    await new Promise(r => setTimeout(r, 50))

    // Both events reached the extra subscriber, and the store still recorded
    // the terminal state.
    expect(otherNotified).toBeGreaterThanOrEqual(2)
    expect(store.get('rErr')?.status).toBe('completed')
  } finally {
    await rm(blockerDir, { recursive: true, force: true })
  }
})
// The function returned by attachRunStatePersistence detaches the listener:
// runs finishing afterwards are no longer written to disk.
test('attachRunStatePersistence 返回 unsubscribe调用后不再写盘', async () => {
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-persist-'))
  try {
    const progressBus = createProgressBus()
    const progressStore = createProgressStoreFromBus(progressBus)
    const detach = attachRunStatePersistence(
      progressBus,
      progressStore,
      () => runsDir,
    )

    // While attached, a finished run is persisted.
    progressBus.emit({ type: 'run_started', runId: 'r1', workflowName: 'w', meta: null })
    progressBus.emit({ type: 'run_done', runId: 'r1', status: 'completed' })
    await new Promise(resolve => setTimeout(resolve, 50))
    const whileAttached = await readRunState(runsDir, 'r1')
    expect(whileAttached).not.toBeNull()

    // After detaching, a second finished run leaves no file behind.
    detach()
    progressBus.emit({ type: 'run_started', runId: 'r2', workflowName: 'w', meta: null })
    progressBus.emit({ type: 'run_done', runId: 'r2', status: 'completed' })
    await new Promise(resolve => setTimeout(resolve, 50))
    const afterDetach = await readRunState(runsDir, 'r2')
    expect(afterDetach).toBeNull()
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})

View File

@@ -9,7 +9,10 @@ import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { makeService, __resetWorkflowServiceForTests } from '../service.js'
import { createProgressBus } from '../progress/bus.js'
import { createProgressStoreFromBus } from '../progress/store.js'
import {
createProgressStoreFromBus,
type RunProgress,
} from '../progress/store.js'
import type {
AgentRunResult,
ProgressEvent,
@@ -356,3 +359,153 @@ test('shutdown 不重复杀已完成 run幂等多次调用安全', asyn
// 幂等
expect(() => svc.shutdown()).not.toThrow()
})
// ---- Task 5: loadPersistedRuns + getRunAsync fallback ----
// The runs directory is injected through makeService's optional fourth
// argument (runsDirProvider) so nothing is written to the real project
// directory. Per the original note, getRunsDir cannot be monkey-patched because
// Bun's ESM module namespace is read-only.
test('loadPersistedRuns 扫盘 hydrate 历史 run已有内存 run 不被覆盖', async () => {
  __resetWorkflowServiceForTests()
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
  try {
    const { writeRunState } = await import('../persistence.js')

    // Fresh empty collections for every fixture, so the two runs share nothing.
    const emptyProgress = () => ({
      phases: [],
      declaredPhases: [],
      currentPhase: null,
      agents: [],
    })

    // Two historical runs already sit on disk before the service exists.
    const completedOnDisk = {
      runId: 'hA',
      workflowName: 'old-A',
      status: 'completed',
      ...emptyProgress(),
      agentCount: 1,
      returnValue: 'a',
      startedAt: 10,
      updatedAt: 20,
    } as RunProgress
    const failedOnDisk = {
      runId: 'hB',
      workflowName: 'old-B',
      status: 'failed',
      ...emptyProgress(),
      agentCount: 2,
      error: 'x',
      startedAt: 30,
      updatedAt: 40,
    } as RunProgress
    await writeRunState(runsDir, completedOnDisk)
    await writeRunState(runsDir, failedOnDisk)

    // A run from the current session lives in memory only; it gets there by
    // emitting through the ports' progress emitter.
    const { ports, store } = fakePorts()
    ports.progressEmitter.emit({
      type: 'run_started',
      runId: 'live',
      workflowName: 'live-w',
      meta: null,
    })

    const service = makeService(ports, store, undefined, () => runsDir)
    await service.loadPersistedRuns()

    const listedIds = service.listRuns().map(run => run.runId)
    expect(listedIds).toContain('hA')
    expect(listedIds).toContain('hB')
    expect(listedIds).toContain('live')
    // The in-memory run is left as it was, and the disk runs keep their data.
    expect(service.getRun('live')!.status).toBe('running')
    expect(service.getRun('hA')!.returnValue).toBe('a')
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// loadPersistedRuns scans the disk once per service instance. To actually
// observe that, a run is written AFTER the first load: if later calls
// rescanned the directory, that run would show up in listRuns.
test('loadPersistedRuns 重复调用仅扫盘一次persistedLoaded flag', async () => {
  __resetWorkflowServiceForTests()
  const dir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
  try {
    const { writeRunState } = await import('../persistence.js')
    const { ports, store } = fakePorts()
    const svc = makeService(ports, store, undefined, () => dir)

    // First call scans the (still empty) directory.
    await svc.loadPersistedRuns()
    expect(svc.listRuns()).toEqual([])

    // A run that reaches the disk only after the first scan.
    const lateRun = {
      runId: 'late',
      workflowName: 'late-w',
      status: 'completed',
      phases: [],
      declaredPhases: [],
      currentPhase: null,
      agents: [],
      agentCount: 0,
      startedAt: 1,
      updatedAt: 2,
    } as RunProgress
    await writeRunState(dir, lateRun)

    // Repeated calls neither throw nor rescan, so the late run stays out.
    await svc.loadPersistedRuns()
    await svc.loadPersistedRuns()
    expect(svc.listRuns()).toEqual([])
  } finally {
    await rm(dir, { recursive: true, force: true })
  }
})
// A run already known to the in-memory store is returned by getRunAsync.
// Only the returned value is checked here; nothing exists on disk for this id.
test('getRunAsync 内存命中 → 不读盘', async () => {
  __resetWorkflowServiceForTests()
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
  try {
    const { ports, store } = fakePorts()
    const service = makeService(ports, store, undefined, () => runsDir)

    const runId = 'live'
    ports.progressEmitter.emit({
      type: 'run_started',
      runId,
      workflowName: 'w',
      meta: null,
    })

    const found = await service.getRunAsync(runId)
    expect(found?.runId).toBe('live')
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// On a memory miss getRunAsync falls back to the disk copy without hydrating
// it into the store, so listRuns stays unaffected and a second lookup still
// succeeds.
test('getRunAsync 内存 miss + 磁盘命中 → 返回磁盘值,且不注入内存(再次 get 仍读盘)', async () => {
  __resetWorkflowServiceForTests()
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
  try {
    const { writeRunState } = await import('../persistence.js')
    const runId = 'hist-only'
    const onDiskOnly = {
      runId,
      workflowName: 'old',
      status: 'completed',
      phases: [],
      declaredPhases: [],
      currentPhase: null,
      agents: [],
      agentCount: 0,
      returnValue: { x: 1 },
      startedAt: 1,
      updatedAt: 2,
    } as RunProgress
    await writeRunState(runsDir, onDiskOnly)

    const { ports, store } = fakePorts()
    const service = makeService(ports, store, undefined, () => runsDir)

    const firstLookup = await service.getRunAsync(runId)
    expect(firstLookup?.returnValue).toEqual({ x: 1 })

    // The fallback must not leak into the in-memory list.
    const listedIds = service.listRuns().map(run => run.runId)
    expect(listedIds).not.toContain('hist-only')

    // A repeated lookup still resolves the run.
    const secondLookup = await service.getRunAsync(runId)
    expect(secondLookup?.returnValue).toEqual({ x: 1 })
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})
// A runId unknown to both the store and the disk resolves to undefined.
test('getRunAsync 内存 miss + 磁盘 miss → undefined', async () => {
  __resetWorkflowServiceForTests()
  const runsDir = await mkdtemp(join(tmpdir(), 'wf-svc-'))
  try {
    const { ports, store } = fakePorts()
    const service = makeService(ports, store, undefined, () => runsDir)

    const missing = await service.getRunAsync('no-such-run')
    expect(missing).toBeUndefined()
  } finally {
    await rm(runsDir, { recursive: true, force: true })
  }
})