From bd470b5ad46e1d6ed426edb1b001d23e38b25783 Mon Sep 17 00:00:00 2001 From: claude-code-best Date: Sun, 14 Jun 2026 11:11:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(workflow):=20agent=20=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E9=87=8D=E8=AF=95=E4=B8=80=E6=AC=A1=EF=BC=88?= =?UTF-8?q?dead=20=E6=88=96=E9=9D=9E=20abort=20throw=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - hooks.agent 包装 invokeBackend:第一次 dead 或非 abort throw → 重试一次 - WorkflowAbortedError(kill)不重试——是用户意图 - registry.resolve 配置错(AdapterNotFoundError 等)在 try 外直接上抛,不走重试—— 配置问题重试无意义且掩盖 bug - 重试仍失败:dead 保持 dead;throw 降级 dead(不击穿 workflow, 与 parallel/pipeline null-on-error 契约一致) - budget 不重复扣:dead 不 addOutputTokens,重试 ok 才扣一次 - 新增 7 项 hooks 层重试测试 + 1 项 service 层降级测试 Co-Authored-By: glm-5.2 --- .../src/__tests__/hooks.test.ts | 104 ++++++++++++++++++ packages/workflow-engine/src/engine/hooks.ts | 37 ++++++- src/workflow/__tests__/service.test.ts | 44 +++++--- 3 files changed, 169 insertions(+), 16 deletions(-) diff --git a/packages/workflow-engine/src/__tests__/hooks.test.ts b/packages/workflow-engine/src/__tests__/hooks.test.ts index 3f3324074..6d5432279 100644 --- a/packages/workflow-engine/src/__tests__/hooks.test.ts +++ b/packages/workflow-engine/src/__tests__/hooks.test.ts @@ -123,6 +123,110 @@ test('agent dead → null', async () => { expect(await hooks.agent('hi')).toBeNull() }) +// 重试:dead 或 非 abort throw 都给一次重试机会;WorkflowAbortedError(kill)不重试。 +// 重试仍失败:dead 保持 dead;throw 降级为 dead(不击穿 workflow,hooks.agent 返 null)。 +test('agent dead → 重试一次成功 → ok', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + return calls === 1 + ? { kind: 'dead' as const } + : { + kind: 'ok' as const, + output: 'recovered', + usage: { outputTokens: 5 }, + } + }, + }) + expect(await hooks.agent('p')).toBe('recovered') + expect(calls).toBe(2) +}) + +test('agent dead → 重试仍 dead → 最终 null(dead 保持 dead)', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + return { kind: 'dead' as const } + }, + loggerWarn: () => {}, + }) + expect(await hooks.agent('p')).toBeNull() + expect(calls).toBe(2) +}) + +test('agent 非 abort throw → 重试一次成功 → ok', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + if (calls === 1) throw new Error('transient network') + return { + kind: 'ok' as const, + output: 'recovered', + usage: { outputTokens: 3 }, + } + }, + loggerWarn: () => {}, + }) + expect(await hooks.agent('p')).toBe('recovered') + expect(calls).toBe(2) +}) + +test('agent 非 abort throw → 重试仍 throw → 降级 dead(返 null,不击穿 workflow)', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + throw new Error('persistent') + }, + loggerWarn: () => {}, + }) + expect(await hooks.agent('p')).toBeNull() + expect(calls).toBe(2) +}) + +test('agent throw WorkflowAbortedError → 不重试,直接 rethrow(kill 不容许重试)', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + throw new WorkflowAbortedError() + }, + }) + await expect(hooks.agent('p')).rejects.toBeInstanceOf(WorkflowAbortedError) + expect(calls).toBe(1) +}) + +test('agent ok → 不重试(calls=1,省一次 backend 往返)', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + return { + kind: 'ok' as const, + output: 'first-try', + usage: { outputTokens: 1 }, + } + }, + }) + expect(await hooks.agent('p')).toBe('first-try') + expect(calls).toBe(1) +}) + +test('agent skipped → 不重试(用户主动 skip,不重试)', async () => { + let calls = 0 + const { hooks } = buildCtx({ + runner: async () => { + calls++ + return { kind: 'skipped' as const } + }, + }) + expect(await hooks.agent('p')).toBeNull() + expect(calls).toBe(1) +}) + test('agent journal 命中时不调用 runner', async () => { let called = 0 const { emitter } = createBufferingEmitter() diff --git a/packages/workflow-engine/src/engine/hooks.ts b/packages/workflow-engine/src/engine/hooks.ts index 51472f467..923e21aca 100644 --- a/packages/workflow-engine/src/engine/hooks.ts +++ b/packages/workflow-engine/src/engine/hooks.ts @@ -154,9 +154,40 @@ export function makeHooks( : {}), } : null - const result = registry - ? await registry.resolve(params).run(params, adapterCtx!) - : await ctx.ports.agentRunner.runAgentToResult(params, ctx.host) + // resolve 在 try 外:配置错(AdapterNotFoundError 等)直接上抛,不走重试—— + // 这是 workflow 配置问题而非 backend 临时故障,重试无意义且掩盖 bug。 + const adapter = registry ? registry.resolve(params) : null + const invokeBackend = (): Promise => + adapter + ? adapter.run(params, adapterCtx!) + : ctx.ports.agentRunner.runAgentToResult(params, ctx.host) + + // 失败一次自动重试:dead(terminal API error after retries)或 非 abort 抛错 + // 都给一次重试机会;WorkflowAbortedError(kill)不重试——是用户意图。 + // 重试仍失败:dead 保持 dead;throw 降级为 dead(不让一个 agent 击穿 workflow)。 + // budget 不重复扣:dead 不 addOutputTokens;重试 ok 才扣一次(最终 ok 时)。 + let result: AgentRunResult + try { + result = await invokeBackend() + if (result.kind === 'dead') { + ctx.ports.logger.warn?.( + `agent "${label ?? `#${agentId}`}" returned dead; retrying once`, + ) + result = await invokeBackend() + } + } catch (e) { + if (e instanceof WorkflowAbortedError) throw e + ctx.ports.logger.warn?.( + `agent "${label ?? `#${agentId}`}" threw (${(e as Error).message}); retrying once`, + ) + try { + result = await invokeBackend() + } catch (e2) { + if (e2 instanceof WorkflowAbortedError) throw e2 + // 重试仍抛:降级 dead(保持 workflow 继续;hooks.agent 返 null) + result = { kind: 'dead' } + } + } if (result.kind === 'ok') { ctx.resources.budget.addOutputTokens(result.usage.outputTokens) } diff --git a/src/workflow/__tests__/service.test.ts b/src/workflow/__tests__/service.test.ts index 52e138251..7e9d46b31 100644 --- a/src/workflow/__tests__/service.test.ts +++ b/src/workflow/__tests__/service.test.ts @@ -53,6 +53,8 @@ function fakePorts( calls: RegistrarCall[] /** runId → (agentId → AbortController)。测试模拟 backend 注册用。 */ agentBindings: Map> + /** adapter.run 被调次数(重试时累加)。holder 引用,测试读 adapterCalls.value。 */ + adapterCallsRef: { value: number } } { const bus = createProgressBus() const store = createProgressStoreFromBus(bus) @@ -61,6 +63,10 @@ function fakePorts( const bindings = new Map() // agentId → AbortController(每个 runId 独立)。killAgent 据此精确中断。 const agentBindings = new Map>() + // adapter.run 被调次数(重试时累加)。用 holder object 避免 closure/getter + // 在 Bun test runner 里的快照语义问题——返回时 shorthand 取当前值(=0), + // 后续 outer 变量 ++ 不会反映到 returned object 字段。holder 引用稳定。 + const adapterCallsRef = { value: 0 } let seq = 0 const ports = { // hostFactory 实际不被 service.launch 路径调用(service 自建 host handle), @@ -78,14 +84,19 @@ function fakePorts( run: opts.adapterThrow !== undefined ? async (): Promise => { + adapterCallsRef.value++ throw new Error(opts.adapterThrow) } - : async (): Promise => - opts.adapterResult ?? { - kind: 'ok', - output: 'mock-out', - usage: { outputTokens: 1 }, - }, + : async (): Promise => { + adapterCallsRef.value++ + return ( + opts.adapterResult ?? { + kind: 'ok', + output: 'mock-out', + usage: { outputTokens: 1 }, + } + ) + }, }), }, agentRunner: { @@ -158,7 +169,7 @@ function fakePorts( warn: () => {}, }, } as unknown as WorkflowPorts - return { ports, store, killed, calls, agentBindings } + return { ports, store, killed, calls, agentBindings, adapterCallsRef } } const stubTUC = { agentId: 'a1', toolUseId: 'tu' } as never @@ -349,17 +360,24 @@ test('脚本运行抛错 → service 路由到 taskRegistrar.fail,带 error expect(fail?.kind === 'fail' && fail.error).toMatch(/script boom/) }) -test('adapter 抛错 → service 通过 .catch 路径路由到 taskRegistrar.fail', async () => { +test('adapter 抛错 → 重试仍抛 → 降级 dead → workflow completed(不 fail)', async () => { __resetWorkflowServiceForTests() - const { ports, store, calls } = fakePorts({ adapterThrow: 'adapter boom' }) + // 新语义:agent 非 abort 抛错 → 重试一次 → 仍抛 → 降级 dead(agent 返 null), + // workflow 继续并 completed。重试容许临时故障(429/网络),但一个 agent + // 永久坏也不击穿整个 workflow(与 parallel/pipeline 的 null-on-error 契约一致)。 + const { ports, store, calls, adapterCallsRef } = fakePorts({ + adapterThrow: 'adapter boom', + }) const svc = makeService(ports, store) await svc.launch({ script: `return agent('x')` }, stubTUC, stubCanUseTool) await settle() + // 重试一次 → adapter 被调 2 次 + expect(adapterCallsRef.value).toBe(2) + // workflow 正常 completed,未 failed + const complete = calls.find(c => c.kind === 'complete') + expect(complete).toBeDefined() const fail = calls.find(c => c.kind === 'fail') - expect(fail).toBeDefined() - // adapter throw → runWorkflow 的内部 try/catch 转 failed status,error 透传; - // 或透传到 detached promise 的 .catch。两者最终都进 taskRegistrar.fail。 - expect(fail?.kind === 'fail' && fail.error).toMatch(/adapter boom/) + expect(fail).toBeUndefined() }) test('脚本正常完成 → service 路由到 taskRegistrar.complete', async () => {