diff --git a/packages/workflow-engine/src/__tests__/agentAdapter.test.ts b/packages/workflow-engine/src/__tests__/agentAdapter.test.ts index f868b55b4..0de4d92da 100644 --- a/packages/workflow-engine/src/__tests__/agentAdapter.test.ts +++ b/packages/workflow-engine/src/__tests__/agentAdapter.test.ts @@ -33,6 +33,7 @@ const CTX = { host: createHostHandle(null), signal: new AbortController().signal, runId: 'r', + agentId: 1, } test('resolve 默认走 default adapter,run 返回结果', async () => { diff --git a/packages/workflow-engine/src/__tests__/hooks.test.ts b/packages/workflow-engine/src/__tests__/hooks.test.ts index cef17ab0b..3f3324074 100644 --- a/packages/workflow-engine/src/__tests__/hooks.test.ts +++ b/packages/workflow-engine/src/__tests__/hooks.test.ts @@ -24,6 +24,14 @@ type CtxOverrides = Partial<{ truncated: string[] agentAdapterRegistry: AgentAdapterRegistry loggerWarn: (msg: string) => void + // taskRegistrar 的 agent 级 abort 绑定(agent kill 桥接)。 + // 提供后 buildCtx 注入到 ports.taskRegistrar;hooks.agent 把闭包塞进 adapterCtx。 + registerAgentAbort: ( + runId: string, + agentId: number, + ac: AbortController, + ) => void + unregisterAgentAbort: (runId: string, agentId: number) => void }> function buildCtx(overrides: CtxOverrides = {}): { @@ -50,6 +58,12 @@ function buildCtx(overrides: CtxOverrides = {}): { fail: () => {}, kill: () => {}, pendingAction: () => overrides.pending ?? null, + ...(overrides.registerAgentAbort + ? { registerAgentAbort: overrides.registerAgentAbort } + : {}), + ...(overrides.unregisterAgentAbort + ? { unregisterAgentAbort: overrides.unregisterAgentAbort } + : {}), }, journalStore: { read: async () => [], @@ -424,3 +438,73 @@ test('agentAdapterRegistry resolve 抛错 → agent 上抛(workflow failed)' }) await expect(hooks.agent('x')).rejects.toThrow() }) + +// service.kill(runId, agentId) 桥接:hooks.agent 必须把 taskRegistrar 的 +// registerAgentAbort/unregisterAgentAbort 注入 adapterCtx(绑定当前 runId)。 +// backend 据此把 agentAbort controller 塞进 Map,service.kill 据 agentId 精确 abort。 +test('agentAdapter ctx 注入 registerAgentAbort/unregisterAgentAbort(绑定 runId 转发 taskRegistrar)', async () => { + const registered: Array<{ + runId: string + agentId: number + controller: AbortController + }> = [] + const unregistered: Array<{ runId: string; agentId: number }> = [] + // 捕获 hooks 传给 adapter 的 ctx(验证 register/unregister 已注入且绑定 runId) + let capturedCtx: { + registerAgentAbort?: (id: number, ac: AbortController) => void + unregisterAgentAbort?: (id: number) => void + agentId: number + runId: string + } | null = null + const registry = new AgentAdapterRegistry() + .register({ + id: 'ad', + capabilities: { structuredOutput: true }, + async run(_params, ctx) { + capturedCtx = ctx + return { kind: 'ok', output: 'x', usage: { outputTokens: 1 } } + }, + }) + .default('ad') + const { hooks } = buildCtx({ + agentAdapterRegistry: registry, + registerAgentAbort: (runId, agentId, controller) => + registered.push({ runId, agentId, controller }), + unregisterAgentAbort: (runId, agentId) => + unregistered.push({ runId, agentId }), + }) + await hooks.agent('x') + // ctx 含 register/unregister(闭包绑定 runId='r1') + expect(capturedCtx).not.toBeNull() + expect(typeof capturedCtx!.registerAgentAbort).toBe('function') + expect(typeof capturedCtx!.unregisterAgentAbort).toBe('function') + // 模拟 backend 调用:注入的闭包把 (agentId, controller) 转发到 taskRegistrar, + // 并自动补 runId='r1'(backend 不需要知道 runId) + const ac = new AbortController() + capturedCtx!.registerAgentAbort!(7, ac) + capturedCtx!.unregisterAgentAbort!(7) + expect(registered).toEqual([{ runId: 'r1', agentId: 7, controller: ac }]) + expect(unregistered).toEqual([{ runId: 'r1', agentId: 7 }]) +}) + +test('taskRegistrar 未提供 registerAgentAbort → adapterCtx 也不含(hooks 不报错)', async () => { + // 不传 registerAgentAbort/unregisterAgentAbort overrides → buildCtx 也不注入 taskRegistrar + // hooks 用 optional chaining 跳过,adapterCtx 不含这两个字段 + let capturedCtx: object | null = null + const registry = new AgentAdapterRegistry() + .register({ + id: 'ad', + capabilities: { structuredOutput: true }, + async run(_params, ctx) { + capturedCtx = ctx + return { kind: 'ok', output: 'x', usage: { outputTokens: 1 } } + }, + }) + .default('ad') + const { hooks } = buildCtx({ agentAdapterRegistry: registry }) + await hooks.agent('x') + expect(capturedCtx).not.toBeNull() + expect( + (capturedCtx! as Record).registerAgentAbort, + ).toBeUndefined() +}) diff --git a/packages/workflow-engine/src/agentAdapter.ts b/packages/workflow-engine/src/agentAdapter.ts index 193ae463a..7639414ea 100644 --- a/packages/workflow-engine/src/agentAdapter.ts +++ b/packages/workflow-engine/src/agentAdapter.ts @@ -25,11 +25,29 @@ export type AgentAdapterContext = { signal: AbortSignal /** 当前 workflow runId(日志/追踪用)。 */ runId: string + /** + * 引擎层 agent 序号(hooks.agentIdSeq 递增;面板 RunProgress.agents[].id 同源)。 + * 注意:与 backend 内部创建的 core AgentId(字符串,子 agent 跟踪用)是两个不同概念, + * 不可混用。本字段用于 registerAgentAbort/unregisterAgentAbort 的 key,让 service + * .kill(runId, agentId) 能精确路由到 backend 创建的 AbortController。 + */ + agentId: number /** * 运行中进度上报(后端循环累计 token/tool 时调用)。可选:独立后端可不实现; * 引擎据此发 agent_progress 事件(闭包带 agentId/runId 关联),面板实时刷新。 */ onProgress?: (update: AgentProgressUpdate) => void + /** + * 注册 agent 级 AbortController(可选)。后端创建 controller 后调此注入 Map, + * 让 service.kill(runId, agentId) 能精确中断单个 agent 而不影响其他。 + * 由 hooks.agent 在 backend.run 调用前注入。 + */ + registerAgentAbort?: (agentId: number, ac: AbortController) => void + /** + * 注销 agent 级 AbortController(agent 完成或失败时调;幂等)。 + * 与 registerAgentAbort 配对。 + */ + unregisterAgentAbort?: (agentId: number) => void } /** diff --git a/packages/workflow-engine/src/engine/hooks.ts b/packages/workflow-engine/src/engine/hooks.ts index a16bf9533..51472f467 100644 --- a/packages/workflow-engine/src/engine/hooks.ts +++ b/packages/workflow-engine/src/engine/hooks.ts @@ -117,13 +117,45 @@ export function makeHooks( const onProgress = (update: AgentProgressUpdate): void => { emit({ type: 'agent_progress', agentId, label, phase, ...update }) } - const result = registry - ? await registry.resolve(params).run(params, { + // 注入 agent 级 AbortController 注册/注销:backend 创建 controller 后调 + // registerAgentAbort 注入 ports 层 bindings,service.kill(runId, agentId) 据此 + // 精确中断单个 agent。registry 不存在(agentRunner 兜底路径)时无 backend 中间层, + // ports 层 agentAbortControllers 永远空——单 agent kill 在该路径降级为 no-op。 + const adapterCtx = registry + ? { host: ctx.host, signal: ctx.signal, runId: ctx.runId, + agentId, onProgress, - }) + ...(ctx.ports.taskRegistrar.registerAgentAbort + ? { + registerAgentAbort: ( + id: number, + ac: AbortController, + ): void => { + ctx.ports.taskRegistrar.registerAgentAbort?.( + ctx.runId, + id, + ac, + ) + }, + } + : {}), + ...(ctx.ports.taskRegistrar.unregisterAgentAbort + ? { + unregisterAgentAbort: (id: number): void => { + ctx.ports.taskRegistrar.unregisterAgentAbort?.( + ctx.runId, + id, + ) + }, + } + : {}), + } + : null + const result = registry + ? await registry.resolve(params).run(params, adapterCtx!) : await ctx.ports.agentRunner.runAgentToResult(params, ctx.host) if (result.kind === 'ok') { ctx.resources.budget.addOutputTokens(result.usage.outputTokens) diff --git a/packages/workflow-engine/src/ports.ts b/packages/workflow-engine/src/ports.ts index 1b4778a20..140355c9e 100644 --- a/packages/workflow-engine/src/ports.ts +++ b/packages/workflow-engine/src/ports.ts @@ -69,6 +69,21 @@ export type TaskRegistrar = { complete(runId: string, summary?: string): void fail(runId: string, error: string): void kill(runId: string): void + /** + * 注册 agent 级 AbortController。backend 启动 agent 时调用,让 service + * .kill(runId, agentId) 能精确中断单个 agent(不影响同 run 其他 agent)。 + * 幂等:同 agentId 重复注册覆盖。 + */ + registerAgentAbort?(runId: string, agentId: number, ac: AbortController): void + /** + * 注销 agent 级 AbortController(agent 完成/失败时调;幂等)。 + */ + unregisterAgentAbort?(runId: string, agentId: number): void + /** + * 中断单个 agent。返回是否命中(false = agent 已完成/不存在)。 + * 不影响同 run 其他 agent,workflow 继续跑(被中断 agent 返回 dead → null)。 + */ + killAgent?(runId: string, agentId: number): boolean /** 返回当前待处理的 skip/retry 动作,或 null。 */ pendingAction(runId: string): { kind: 'skip' | 'retry' } | null } diff --git a/src/workflow/__tests__/claudeCodeBackend.test.ts b/src/workflow/__tests__/claudeCodeBackend.test.ts index e16666098..1e396768f 100644 --- a/src/workflow/__tests__/claudeCodeBackend.test.ts +++ b/src/workflow/__tests__/claudeCodeBackend.test.ts @@ -76,6 +76,7 @@ mock.module('src/utils/worktree.js', () => ({ }, })) +import { WorkflowAbortedError } from '@claude-code-best/workflow-engine' import { claudeCodeBackend, resolveAgentDefinition, @@ -108,6 +109,7 @@ function ctx() { }), signal: new AbortController().signal, runId: 'r1', + agentId: 1, } } @@ -188,6 +190,85 @@ test('runAgent 抛错 → dead', async () => { expect(res.kind).toBe('dead') }) +// 下面三组测试覆盖 'x' 无效修复:backend 必须把 ctx.signal 桥接到 runAgent.override +// .abortController,并把 AbortError 识别为 abort(throw WorkflowAbortedError,而非吞成 dead)。 +// 还要验证 registerAgentAbort 注入,让 service.kill(runId, agentId) 能精确中断单个 agent。 + +test('ctx.signal 预 abort → backend 桥接:override.abortController.signal.aborted=true', async () => { + // 用 capturedOverride 暴露 backend 创建的 agentAbort(mock 收到的 override.abortController) + let capturedController: AbortController | undefined + mock.module( + '@claude-code-best/builtin-tools/tools/AgentTool/runAgent.js', + () => ({ + runAgent: async function* (opts: { + override?: { abortController?: AbortController } + }) { + capturedController = opts.override?.abortController + yield { + type: 'assistant', + message: { content: [{ type: 'text', text: 'x' }] }, + } + }, + }), + ) + const parentAbort = new AbortController() + parentAbort.abort() + // mock 不抛 → backend 走正常返回路径;但桥接 `if (ctx.signal.aborted) agentAbort.abort()` + // 已同步触发,capturedController.signal.aborted 必为 true(kill 桥接根因) + await claudeCodeBackend.run( + { prompt: 'pre-aborted' }, + { ...ctx(), signal: parentAbort.signal }, + ) + expect(capturedController?.signal.aborted).toBe(true) +}) + +test('runAgent 抛 AbortError → backend throw WorkflowAbortedError(不吞成 dead)', async () => { + mock.module( + '@claude-code-best/builtin-tools/tools/AgentTool/runAgent.js', + () => ({ + // biome-ignore lint/correctness/useYield: 故意抛 AbortError 测识别分支 + runAgent: async function* () { + const e = new Error('aborted by parent') + e.name = 'AbortError' + throw e + }, + }), + ) + await expect( + claudeCodeBackend.run({ prompt: 'abort' }, ctx()), + ).rejects.toBeInstanceOf(WorkflowAbortedError) +}) + +test('registerAgentAbort/unregisterAgentAbort 注入:key=ctx.agentId(数字),controller 来自桥接', async () => { + // 恢复默认 mock(上一个测试把它改成抛 AbortError 了) + mock.module( + '@claude-code-best/builtin-tools/tools/AgentTool/runAgent.js', + () => ({ + runAgent: async function* () { + yield { + type: 'assistant', + message: { content: [{ type: 'text', text: 'agent-text' }] }, + } + }, + }), + ) + const registered: Array<{ id: number; controller: AbortController }> = [] + const unregistered: number[] = [] + await claudeCodeBackend.run( + { prompt: 'wiring' }, + { + ...ctx(), + agentId: 42, + registerAgentAbort: (id, ac) => registered.push({ id, controller: ac }), + unregisterAgentAbort: id => unregistered.push(id), + }, + ) + expect(registered).toHaveLength(1) + expect(registered[0]?.id).toBe(42) // 引擎数字 agentId(非 coreAgentId 字符串) + expect(registered[0]?.controller).toBeInstanceOf(AbortController) + expect(unregistered).toEqual([42]) // finally 清理幂等 +}) + test('id 与 capabilities 形状', () => { expect(claudeCodeBackend.id).toBe('claude-code') expect(claudeCodeBackend.capabilities.structuredOutput).toBe(true) diff --git a/src/workflow/__tests__/ports.test.ts b/src/workflow/__tests__/ports.test.ts index 95847da6c..630f3177f 100644 --- a/src/workflow/__tests__/ports.test.ts +++ b/src/workflow/__tests__/ports.test.ts @@ -93,6 +93,95 @@ test('taskRegistrar.register/complete/kill 经 RunBinding 路由(真 setAppSta ports.taskRegistrar.kill(runId) }) +// agent 级 kill 桥接:register → killAgent 精确中断;kill(runId) 顺带 abort 所有 agent。 +test('taskRegistrar agentAbortControllers:register/killAgent 精确中断;kill(runId) 批量 abort', () => { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + const ports = createWorkflowPorts({ bus, store }) + // 实现always provides these — cast 把 optional 拍平为 required(避免每行 ! 断言) + const tr = ports.taskRegistrar as Required + + const state = { tasks: {} } as unknown as AppState + const setAppState: SetAppState = f => { + Object.assign(state, f(state)) + } + const hostCtx = ports.hostFactory({ + context: { agentId: 'a-1', toolUseId: 'tu-1', setAppState }, + canUseTool: (() => Promise.resolve({ behavior: 'allow' })) as never, + parentMessage: {} as never, + }) + const { runId } = tr.register( + { + workflowName: 'wf', + summary: 'summary', + workflowFile: 'wf.ts', + toolUseId: 'tu-1', + }, + hostCtx.handle, + ) + + // 注册两个 agent 的 AbortController(模拟 backend 在启动 agent 时调用) + const ac1 = new AbortController() + const ac2 = new AbortController() + tr.registerAgentAbort(runId, 1, ac1) + tr.registerAgentAbort(runId, 2, ac2) + expect(ac1.signal.aborted).toBe(false) + expect(ac2.signal.aborted).toBe(false) + + // killAgent 精确中断 agent #1:仅 ac1 abort,ac2 不受影响 + expect(tr.killAgent(runId, 1)).toBe(true) + expect(ac1.signal.aborted).toBe(true) + expect(ac2.signal.aborted).toBe(false) + // 重复 kill 同 agent:controller 已 delete,返回 false(幂等) + expect(tr.killAgent(runId, 1)).toBe(false) + + // 未知 agentId / 未知 runId 安全返回 false + expect(tr.killAgent(runId, 999)).toBe(false) + expect(tr.killAgent('nope', 1)).toBe(false) + + // kill(runId) 批量 abort 剩余 agent(ac2) + tr.kill(runId) + expect(ac2.signal.aborted).toBe(true) + + // run 终态后 binding 已回收:再 killAgent 返回 false + expect(tr.killAgent(runId, 2)).toBe(false) +}) + +test('unregisterAgentAbort 从 Map 删除(backend finally 清理幂等)', () => { + const bus = createProgressBus() + const store = createProgressStoreFromBus(bus) + const ports = createWorkflowPorts({ bus, store }) + const tr = ports.taskRegistrar as Required + + const state = { tasks: {} } as unknown as AppState + const setAppState: SetAppState = f => { + Object.assign(state, f(state)) + } + const hostCtx = ports.hostFactory({ + context: { agentId: 'a-1', toolUseId: 'tu-1', setAppState }, + canUseTool: (() => Promise.resolve({ behavior: 'allow' })) as never, + parentMessage: {} as never, + }) + const { runId } = tr.register( + { + workflowName: 'wf', + summary: 'summary', + workflowFile: 'wf.ts', + toolUseId: 'tu-1', + }, + hostCtx.handle, + ) + const ac = new AbortController() + tr.registerAgentAbort(runId, 5, ac) + // 注销后 killAgent 无目标,返 false(不抛) + tr.unregisterAgentAbort(runId, 5) + expect(tr.killAgent(runId, 5)).toBe(false) + // 重复注销幂等(backend finally 不抛) + expect(() => tr.unregisterAgentAbort(runId, 5)).not.toThrow() + // 未知 runId 安全 no-op + expect(() => tr.unregisterAgentAbort('nope', 5)).not.toThrow() +}) + test('hostFactory.cwd 与 journalStore 同根(getProjectRoot)—— 修复 K 回归', () => { // 历史 bug:hostFactory.cwd 用 getCwd()、journalStore 用 getProjectRoot(), // 用户进入 worktree/子目录时两者不同 → 命名 workflow 解析与 journal 落盘不同步。 diff --git a/src/workflow/__tests__/service.test.ts b/src/workflow/__tests__/service.test.ts index 216090c53..52e138251 100644 --- a/src/workflow/__tests__/service.test.ts +++ b/src/workflow/__tests__/service.test.ts @@ -27,6 +27,14 @@ type RegistrarCall = | { kind: 'complete'; runId: string; summary?: string } | { kind: 'fail'; runId: string; error?: string } | { kind: 'kill'; runId: string } + | { + kind: 'registerAgentAbort' + runId: string + agentId: number + controller: AbortController + } + | { kind: 'unregisterAgentAbort'; runId: string; agentId: number } + | { kind: 'killAgent'; runId: string; agentId: number } function fakePorts( opts: { @@ -41,14 +49,18 @@ function fakePorts( ports: WorkflowPorts store: ReturnType killed: string[] - /** taskRegistrar 调用记录(complete/fail/kill)。 */ + /** taskRegistrar 调用记录(complete/fail/kill/registerAgentAbort/...)。 */ calls: RegistrarCall[] + /** runId → (agentId → AbortController)。测试模拟 backend 注册用。 */ + agentBindings: Map> } { const bus = createProgressBus() const store = createProgressStoreFromBus(bus) const killed: string[] = [] const calls: RegistrarCall[] = [] const bindings = new Map() + // agentId → AbortController(每个 runId 独立)。killAgent 据此精确中断。 + const agentBindings = new Map>() let seq = 0 const ports = { // hostFactory 实际不被 service.launch 路径调用(service 自建 host handle), @@ -93,6 +105,7 @@ function fakePorts( seq += 1 const runId = `run-${seq}` bindings.set(runId, { abort }) + agentBindings.set(runId, new Map()) return { runId, signal: abort.signal } }, complete: (runId: string, summary?: string) => { @@ -106,6 +119,31 @@ function fakePorts( calls.push({ kind: 'kill', runId }) bindings.get(runId)?.abort.abort() }, + registerAgentAbort: ( + runId: string, + agentId: number, + controller: AbortController, + ) => { + calls.push({ + kind: 'registerAgentAbort', + runId, + agentId, + controller, + }) + agentBindings.get(runId)?.set(agentId, controller) + }, + unregisterAgentAbort: (runId: string, agentId: number) => { + calls.push({ kind: 'unregisterAgentAbort', runId, agentId }) + agentBindings.get(runId)?.delete(agentId) + }, + killAgent: (runId: string, agentId: number) => { + calls.push({ kind: 'killAgent', runId, agentId }) + const ac = agentBindings.get(runId)?.get(agentId) + if (!ac) return false + ac.abort() + agentBindings.get(runId)!.delete(agentId) + return true + }, pendingAction: () => null, }, journalStore: { @@ -120,7 +158,7 @@ function fakePorts( warn: () => {}, }, } as unknown as WorkflowPorts - return { ports, store, killed, calls } + return { ports, store, killed, calls, agentBindings } } const stubTUC = { agentId: 'a1', toolUseId: 'tu' } as never @@ -184,6 +222,33 @@ test('kill 走 taskRegistrar.kill', async () => { expect(killed).toContain(runId) }) +test('killAgent 走 taskRegistrar.killAgent:精确中断单个 agent', async () => { + __resetWorkflowServiceForTests() + const { ports, store, calls, agentBindings } = fakePorts() + const svc = makeService(ports, store) + const { runId } = await svc.launch( + { script: `return agent('x')` }, + stubTUC, + stubCanUseTool, + ) + // 模拟 backend 启动 agent 时注册 AbortController + const ac = new AbortController() + agentBindings.get(runId)!.set(7, ac) + // service.killAgent 路由到 taskRegistrar.killAgent,后者真 abort 对应 controller + expect(svc.killAgent(runId, 7)).toBe(true) + expect(ac.signal.aborted).toBe(true) + expect( + calls.some( + c => c.kind === 'killAgent' && c.runId === runId && c.agentId === 7, + ), + ).toBe(true) + // 已 abort 后 controller 从 Map 删除:再次 killAgent 同 agent 返 false(幂等) + expect(svc.killAgent(runId, 7)).toBe(false) + // 未知 agentId / 未知 runId 安全返 false + expect(svc.killAgent(runId, 999)).toBe(false) + expect(svc.killAgent('nope', 1)).toBe(false) +}) + test('listRuns/subscribe 来自 store', () => { __resetWorkflowServiceForTests() const { ports, store } = fakePorts() diff --git a/src/workflow/__tests__/useWorkflowKeyboard.test.ts b/src/workflow/__tests__/useWorkflowKeyboard.test.ts index 9ea8fa55e..196d90138 100644 --- a/src/workflow/__tests__/useWorkflowKeyboard.test.ts +++ b/src/workflow/__tests__/useWorkflowKeyboard.test.ts @@ -11,12 +11,27 @@ test('q / Esc → quit', () => { expect(routeWorkflowKey('', { escape: true })).toBe('quit') }) -test('x → kill;r → resume;n → newRun', () => { - expect(routeWorkflowKey('x', {})).toBe('kill') +test('x → killAgent;K → killWorkflow;r → resume;n → newRun', () => { + expect(routeWorkflowKey('x', {})).toBe('killAgent') + expect(routeWorkflowKey('K', {})).toBe('killWorkflow') expect(routeWorkflowKey('r', {})).toBe('resume') expect(routeWorkflowKey('n', {})).toBe('newRun') }) +test('confirm 模式:y/Enter → confirmYes;n/Esc/q → confirmNo;其他键 → null', () => { + expect(routeWorkflowKey('y', {}, 'confirm')).toBe('confirmYes') + expect(routeWorkflowKey('Y', {}, 'confirm')).toBe('confirmYes') + expect(routeWorkflowKey('', { return: true }, 'confirm')).toBe('confirmYes') + expect(routeWorkflowKey('n', {}, 'confirm')).toBe('confirmNo') + expect(routeWorkflowKey('N', {}, 'confirm')).toBe('confirmNo') + expect(routeWorkflowKey('', { escape: true }, 'confirm')).toBe('confirmNo') + expect(routeWorkflowKey('q', {}, 'confirm')).toBe('confirmNo') + // confirm 模式吞掉导航/编辑键,防误触 + expect(routeWorkflowKey('x', {}, 'confirm')).toBeNull() + expect(routeWorkflowKey('', { tab: true }, 'confirm')).toBeNull() + expect(routeWorkflowKey('', { upArrow: true }, 'confirm')).toBeNull() +}) + test('←/→ 切焦点列;↑/↓ 列内移动', () => { expect(routeWorkflowKey('', { leftArrow: true })).toBe('focusLeft') expect(routeWorkflowKey('', { rightArrow: true })).toBe('focusRight') diff --git a/src/workflow/backends/claudeCodeBackend.ts b/src/workflow/backends/claudeCodeBackend.ts index b9af9193d..d208d1870 100644 --- a/src/workflow/backends/claudeCodeBackend.ts +++ b/src/workflow/backends/claudeCodeBackend.ts @@ -5,6 +5,7 @@ import { type AgentAdapterContext, type AgentRunParams, type AgentRunResult, + WorkflowAbortedError, } from '@claude-code-best/workflow-engine' import { assembleToolPool } from '../../tools.js' import { finalizeAgentTool } from '@claude-code-best/builtin-tools/tools/AgentTool/agentToolUtils.js' @@ -146,14 +147,16 @@ export const claudeCodeBackend: AgentAdapter = { const appState = toolUseContext.getAppState() const agentDef = resolveAgentDefinition(params.agentType, toolUseContext) const model = mapWorkflowModel(params.model) - const agentId = createAgentId() + // coreAgentId:core 层子 agent 跟踪 ID(字符串,runAgent 内部用)。 + // 与 ctx.agentId(引擎 number seq,用于面板/killAgent 路由)是两个不同概念,不可混用。 + const coreAgentId = createAgentId() // isolation:'worktree' — 在独立 git worktree 里跑 agent,并发写互不冲突。 let worktreeInfo: WorkflowWorktreeInfo | null = null if (params.isolation === 'worktree') { try { worktreeInfo = await createAgentWorktree( - makeWorkflowWorktreeSlug(ctx.runId, agentId), + makeWorkflowWorktreeSlug(ctx.runId, coreAgentId), ) } catch (e) { // fail-closed:隔离未达成不静默退化为共享 cwd(否则并发写数据竞争) @@ -170,6 +173,21 @@ export const claudeCodeBackend: AgentAdapter = { runWithCwdOverride(worktreeInfo!.worktreePath, fn) : (fn: () => T): T => fn() + // 桥接 ctx.signal → runAgent.override.abortController。否则 workflow 被 kill + // 时 runAgent 不知道('x' 无效根因):abort 信号到不了内部 fetch,agent 跑到完成。 + // 单 agent kill 走 service.kill(runId, agentId) → ports.taskRegistrar.killAgent → + // agentAbortControllers.get(agentId).abort();同一 controller 接管两条路径。 + const agentAbort = new AbortController() + const onParentAbort = (): void => agentAbort.abort() + if (ctx.signal.aborted) { + agentAbort.abort() + } else { + ctx.signal.addEventListener('abort', onParentAbort, { once: true }) + } + if (typeof ctx.registerAgentAbort === 'function') { + ctx.registerAgentAbort(ctx.agentId, agentAbort) + } + const workerPermissionContext = { ...appState.toolPermissionContext, mode: agentDef.permissionMode ?? 'acceptEdits', @@ -201,9 +219,10 @@ export const claudeCodeBackend: AgentAdapter = { isAsync: true, querySource: toolUseContext.options.querySource ?? 'workflow', availableTools: workerTools, - override: { agentId }, + // override 同一对象:coreAgentId(core 子 agent 跟踪)+ abortController(kill 桥接)。 // runAgent 的 model 是顶层 ModelAlias;workflow 的 model 是任意别名串, // 类型上不兼容,运行时由 provider 层解析。双重断言透传(优于 as any/never)。 + override: { agentId: coreAgentId, abortController: agentAbort }, ...(model ? { model: model as unknown as ModelAlias } : {}), ...(worktreeInfo ? { worktreePath: worktreeInfo.worktreePath } : {}), })) { @@ -224,12 +243,23 @@ export const claudeCodeBackend: AgentAdapter = { } }) } catch (e) { + // abort(kill workflow / kill agent):识别后必须重抛 WorkflowAbortedError, + // 否则 hooks.agent 会把 abort 当作普通失败吞成 dead,workflow 不知道被 kill + // (kill 路径 'x' 无效的另一面:信号虽然到了,但结果被伪装成正常完成)。 + if (agentAbort.signal.aborted || (e as Error)?.name === 'AbortError') { + throw new WorkflowAbortedError() + } logForDebugging( `workflow sub-agent error (${agentDef.agentType}): ${(e as Error).message}`, ) logEvent('tengu_workflow_agent', { ok: 0 }) return { kind: 'dead' } } finally { + // 清理(幂等):listener removeEventListener / Map.delete 重复调用安全。 + if (typeof ctx.unregisterAgentAbort === 'function') { + ctx.unregisterAgentAbort(ctx.agentId) + } + ctx.signal.removeEventListener('abort', onParentAbort) if (worktreeInfo) { const info = worktreeInfo worktreeInfo = null @@ -237,7 +267,7 @@ export const claudeCodeBackend: AgentAdapter = { } } - const finalized = finalizeAgentTool(messages, agentId, { + const finalized = finalizeAgentTool(messages, coreAgentId, { prompt: params.prompt, resolvedAgentModel: toolUseContext.options.mainLoopModel, isBuiltInAgent: isBuiltInAgent(agentDef), diff --git a/src/workflow/panel/WorkflowsPanel.tsx b/src/workflow/panel/WorkflowsPanel.tsx index 19e383bf9..be6a3570a 100644 --- a/src/workflow/panel/WorkflowsPanel.tsx +++ b/src/workflow/panel/WorkflowsPanel.tsx @@ -1,5 +1,5 @@ import React, { useEffect, useState, useSyncExternalStore } from 'react'; -import { Box, Text, useAnimationFrame } from '@anthropic/ink'; +import { Box, Dialog, Text, useAnimationFrame } from '@anthropic/ink'; import type { Theme } from '@anthropic/ink'; import type { LocalJSXCommandContext, LocalJSXCommandOnDone } from '../../types/command.js'; import { getWorkflowService } from '../service.js'; @@ -47,6 +47,9 @@ export function WorkflowsPanel({ const [focusColumn, setFocusColumn] = useState('phases'); const [selectedPhaseIndex, setSelectedPhaseIndex] = useState(0); const [selectedAgentIndex, setSelectedAgentIndex] = useState(0); + // kill 二次确认。null = 无弹窗;'workflow' = 杀整个 run;'agent' = 杀当前选中 agent。 + // 非 null 时键盘进入 confirm 模式(仅 y/Enter/n/Esc/q 响应)。 + const [confirmKill, setConfirmKill] = useState(null); // mount 时触发一次扫盘 hydrate 历史 run(service 内部 persistedLoaded flag 守护幂等)。 // 重 mount/重渲染不会重复扫盘(flag 进程单例守护)。svc 引用稳定(getWorkflowService 单例)。 @@ -110,8 +113,18 @@ export function WorkflowsPanel({ if (focusColumn === 'phases') setSelectedPhaseIndex(s => clampSelected(s + 1, phaseRowCount)); else setSelectedAgentIndex(s => clampSelected(s + 1, visibleAgents.length)); }, - killFocused: () => { - if (focused) svc.kill(focused.runId); + killAgent: () => { + // 仅在 agents 列聚焦时弹 agent 确认(在 phases 列按 x 无目标,no-op)。 + // 选中 agent 由 visibleAgents[clampedAgent] 决定;保存到 confirmKill 后由 + // confirmYes 实际执行——避免在两次渲染间 visibleAgents 变化导致误杀。 + if (focusColumn !== 'agents' || !focused) return; + const agent = visibleAgents[clampedAgent]; + if (!agent) return; + setConfirmKill('agent'); + }, + killWorkflow: () => { + if (!focused) return; + setConfirmKill('workflow'); }, resumeFocused: () => { if (!focused) return; @@ -125,9 +138,27 @@ export function WorkflowsPanel({ .catch(e => onDone(`resume failed: ${(e as Error).message}`)); }, newRun: () => onDone('Tip: start a named workflow with /, or pass name via the Workflow tool.'), - quit: () => onDone(), + quit: () => { + // confirm 模式下 q = 取消确认(routeWorkflowKey 已路由到 confirmNo); + // 非 confirm 模式才真退出面板。 + if (confirmKill !== null) { + setConfirmKill(null); + return; + } + onDone(); + }, + confirmYes: () => { + if (confirmKill === 'workflow' && focused) { + svc.kill(focused.runId); + } else if (confirmKill === 'agent' && focused) { + const agent = visibleAgents[clampedAgent]; + if (agent) svc.killAgent(focused.runId, agent.id); + } + setConfirmKill(null); + }, + confirmNo: () => setConfirmKill(null), }; - useWorkflowKeyboard(handlers); + useWorkflowKeyboard(handlers, confirmKill !== null ? 'confirm' : 'normal'); const running = runs.filter(r => r.status === 'running').length; const done = runs.length - running; @@ -182,8 +213,31 @@ export function WorkflowsPanel({ - Tab switch run · ←/→ focus · ↑/↓ move · x kill · r resume · q quit + + {confirmKill !== null + ? 'Confirm: y kill · n/Esc cancel' + : 'Tab switch run · ←/→ focus · ↑/↓ move · x kill agent · K kill workflow · r resume · q quit'} + + + {confirmKill !== null ? ( + setConfirmKill(null)} + color="warning" + > + Press y to confirm, or n/Esc to cancel. + + ) : null} ); } diff --git a/src/workflow/panel/useWorkflowKeyboard.ts b/src/workflow/panel/useWorkflowKeyboard.ts index b7b4987f6..4298a6023 100644 --- a/src/workflow/panel/useWorkflowKeyboard.ts +++ b/src/workflow/panel/useWorkflowKeyboard.ts @@ -3,11 +3,15 @@ import { useInput } from '@anthropic/ink' /** 焦点所在列。 */ export type FocusColumn = 'phases' | 'agents' +/** 键盘模式:normal=正常导航;confirm=弹了 Dialog,等用户 y/n 确认。 */ +export type WorkflowKeyboardMode = 'normal' | 'confirm' + /** useInput 的 key 对象子集(仅声明用到的字段,避免耦合 ink Key 类型)。 */ type KeyEvent = { tab?: boolean shift?: boolean escape?: boolean + return?: boolean leftArrow?: boolean rightArrow?: boolean upArrow?: boolean @@ -22,19 +26,34 @@ export type WorkflowKeyAction = | 'focusRight' | 'moveUp' | 'moveDown' - | 'kill' + | 'killAgent' + | 'killWorkflow' | 'resume' | 'newRun' | 'quit' + | 'confirmYes' + | 'confirmNo' export function routeWorkflowKey( input: string, key: KeyEvent, + mode: WorkflowKeyboardMode = 'normal', ): WorkflowKeyAction | null { + // confirm 模式:仅 y/Enter 确认,n/Esc/q 取消,其他键吞掉(防误触) + if (mode === 'confirm') { + if (input === 'y' || input === 'Y' || key.return) return 'confirmYes' + if (input === 'n' || input === 'N' || key.escape || input === 'q') { + return 'confirmNo' + } + return null + } // @anthropic/ink 的 key.tab 对 Tab 键置 true;个别环境回落到 '\t' if (key.tab || input === '\t') return key.shift ? 'prevTab' : 'nextTab' if (key.escape || input === 'q') return 'quit' - if (input === 'x') return 'kill' + // 大写 K = 杀整个 workflow;小写 x = 杀当前选中 agent(仅 agents 列)。 + // 大小写区分避免 x 误触发 workflow kill;K 显式需要 Shift 暗示"重操作"。 + if (input === 'K') return 'killWorkflow' + if (input === 'x') return 'killAgent' if (input === 'r') return 'resume' if (input === 'n') return 'newRun' if (key.leftArrow) return 'focusLeft' @@ -52,10 +71,17 @@ export type WorkflowKeyboardHandlers = { focusRight: () => void moveUp: () => void moveDown: () => void - killFocused: () => void + /** 请求杀当前选中 agent(panel 弹 Dialog 二次确认)。 */ + killAgent: () => void + /** 请求杀整个 workflow(panel 弹 Dialog 二次确认)。 */ + killWorkflow: () => void resumeFocused: () => void newRun: () => void quit: () => void + /** confirm 模式下用户确认(y/Enter)。 */ + confirmYes: () => void + /** confirm 模式下用户取消(n/Esc/q)。 */ + confirmNo: () => void } /** @@ -63,11 +89,16 @@ export type WorkflowKeyboardHandlers = { * - Tab / Shift+Tab:切顶部 run tab * - ← / →:phases ↔ agents 焦点切换 * - ↑ / ↓:当前焦点列内移动 - * - x kill · r resume · n new · q / Esc quit + * - x kill 单 agent · K kill 整个 workflow(带 Dialog 二次确认) · r resume · n new · q / Esc quit + * + * @param mode confirm 时只接受 y/n/Esc/q,其他键吞掉——避免在确认弹窗里误导航。 */ -export function useWorkflowKeyboard(h: WorkflowKeyboardHandlers): void { +export function useWorkflowKeyboard( + h: WorkflowKeyboardHandlers, + mode: WorkflowKeyboardMode = 'normal', +): void { useInput((input, key) => { - const action = routeWorkflowKey(input, key as KeyEvent) + const action = routeWorkflowKey(input, key as KeyEvent, mode) if (action === null) return switch (action) { case 'nextTab': @@ -88,8 +119,11 @@ export function useWorkflowKeyboard(h: WorkflowKeyboardHandlers): void { case 'moveDown': h.moveDown() break - case 'kill': - h.killFocused() + case 'killAgent': + h.killAgent() + break + case 'killWorkflow': + h.killWorkflow() break case 'resume': h.resumeFocused() @@ -100,6 +134,12 @@ export function useWorkflowKeyboard(h: WorkflowKeyboardHandlers): void { case 'quit': h.quit() break + case 'confirmYes': + h.confirmYes() + break + case 'confirmNo': + h.confirmNo() + break } }) } diff --git a/src/workflow/ports.ts b/src/workflow/ports.ts index c6bb62299..bfeda9049 100644 --- a/src/workflow/ports.ts +++ b/src/workflow/ports.ts @@ -34,6 +34,8 @@ type RunBinding = { setAppState: SetAppState abortController: AbortController workflowName: string + /** agentId → AbortController。backend 启动 agent 时注册;killAgent 据此精确中断。 */ + agentAbortControllers: Map } /** 每次工具调用从 toolUseContext 构造 WorkflowHostContext。 */ @@ -107,6 +109,7 @@ export function createWorkflowPorts(opts: { setAppState, abortController, workflowName: regOpts.workflowName, + agentAbortControllers: new Map(), }) logForDebugging( `workflow task registered: ${runId} (${regOpts.workflowName})`, @@ -131,8 +134,40 @@ export function createWorkflowPorts(opts: { const b = bindings.get(runId) if (!b) return killWorkflowTask(b.taskId, b.setAppState) // 内部 abort controller + // 杀 run 同时中断所有 in-flight agent(防止 backend 没接到 task abort 的极端时序) + for (const ac of b.agentAbortControllers.values()) { + try { + ac.abort() + } catch { + // no-op:abort 内部不会抛,但 fail-closed + } + } + b.agentAbortControllers.clear() bindings.delete(runId) }, + registerAgentAbort(runId, agentId, ac) { + const b = bindings.get(runId) + if (!b) return + b.agentAbortControllers.set(agentId, ac) + }, + unregisterAgentAbort(runId, agentId) { + const b = bindings.get(runId) + if (!b) return + b.agentAbortControllers.delete(agentId) + }, + killAgent(runId, agentId) { + const b = bindings.get(runId) + if (!b) return false + const ac = b.agentAbortControllers.get(agentId) + if (!ac) return false + try { + ac.abort() + } catch { + // no-op + } + b.agentAbortControllers.delete(agentId) + return true + }, pendingAction() { return null // v1:skip/retry 不接线(seam 保留) }, diff --git a/src/workflow/service.ts b/src/workflow/service.ts index 4d4e09dfb..3e473dd7d 100644 --- a/src/workflow/service.ts +++ b/src/workflow/service.ts @@ -59,6 +59,11 @@ export type WorkflowService = { canUseTool: CanUseToolFn, ): Promise<{ runId: string; scriptPath?: string }> kill(runId: string): void + /** + * 中断单个 agent(不影响同 run 其他 agent,workflow 继续跑)。 + * 返回是否命中(false = agent 已完成/不存在)。agent 被 abort 后返回 dead → null。 + */ + killAgent(runId: string, agentId: number): boolean /** * 进程退出 / 配置卸载时清理:杀掉所有 running run,避免孤儿 task。 * 已完成/失败的 run 不受影响。幂等——多次调用安全。 @@ -243,6 +248,9 @@ export function makeService( kill(runId) { ports.taskRegistrar.kill(runId) }, + killAgent(runId, agentId) { + return ports.taskRegistrar.killAgent?.(runId, agentId) ?? false + }, shutdown() { // 仅杀 running:已完成/失败的 run taskRegistrar 已回收 binding,kill 是 no-op。