import { MAX_ITEMS_PER_CALL, MAX_TOTAL_AGENTS } from '../constants.js' import type { AgentRunParams, AgentRunResult, JournalEntry, ProgressEvent, } from '../types.js' import type { EngineContext } from './context.js' import { WorkflowAbortedError, WorkflowError } from './errors.js' import { agentCallKey } from './journal.js' import type { WorkflowHooks } from './script.js' /** workflow() 钩子的子 workflow 执行器(由 runWorkflow 注入,避免循环依赖)。 */ export type SubWorkflowRunner = (opts: { name?: string scriptPath?: string script?: string args?: unknown }) => Promise type HookProgressInit = | { type: 'phase_started'; phase: string } | { type: 'phase_done'; phase: string } | { type: 'agent_started'; agentId: number; label?: string; phase?: string } | { type: 'agent_done' agentId: number label?: string phase?: string result: AgentRunResult } | { type: 'log'; message: string } export function makeHooks( ctx: EngineContext, runSubWorkflow: SubWorkflowRunner, ): WorkflowHooks { // 所有进度事件自动注入 runId,供 adapter 路由到对应 task(多并发 workflow) const emit = (init: HookProgressInit): void => { ctx.ports.progressEmitter.emit({ runId: ctx.runId, ...init, } as ProgressEvent) } const agent: WorkflowHooks['agent'] = async (prompt, opts = {}) => { const r = ctx.resources if (r.agentCountBox.value >= MAX_TOTAL_AGENTS) { throw new WorkflowError( `workflow 超过 agent 总数上限 (${MAX_TOTAL_AGENTS})`, ) } // 每次 agent() 调用分配唯一 id(含 journal 命中),盖戳 started/done 供 reducer 精确关联 const agentId = r.agentIdSeq.value++ const params: AgentRunParams = { prompt, ...opts } const key = agentCallKey(prompt, params) const label = opts.label as string | undefined const phase = (opts.phase as string | undefined) ?? ctx.currentPhase ?? undefined // journal 命中 → 直接返回缓存 if (!ctx.journalInvalidated && ctx.journalIndex < ctx.journal.length) { const entry = ctx.journal[ctx.journalIndex]! if (entry.key === key) { ctx.journalIndex++ emit({ type: 'agent_done', agentId, label, phase, result: entry.result, }) return resultToOutput(entry.result) } // 发散:丢弃后续 journal,后续全部现场跑 ctx.journalInvalidated = true ctx.journal = ctx.journal.slice(0, ctx.journalIndex) await ctx.ports.journalStore.truncate(ctx.runId) } let release: () => void try { release = await ctx.resources.semaphore.acquire(ctx.signal) } catch { // abort 期间在队列中等待:semaphore 已把 waiter 移除、未消耗 permit throw new WorkflowAbortedError() } try { if (ctx.signal.aborted) throw new WorkflowAbortedError() // 预算检查在 semaphore 临界区内:queued waiter 被唤醒后看到最新 spent, // 否则 N 个 waiter 入队时 spent=0 全过检,唤醒后无 re-check 全部超支。 // journal 命中路径不扣预算,无需检查。 r.budget.assertCanSpend() const pending = ctx.ports.taskRegistrar.pendingAction(ctx.runId) if (pending?.kind === 'skip') { const result: AgentRunResult = { kind: 'skipped' } emit({ type: 'agent_done', agentId, label, phase, result }) return null } ctx.resources.agentCountBox.value++ emit({ type: 'agent_started', agentId, label, phase }) const registry = ctx.ports.agentAdapterRegistry const result = registry ? await registry.resolve(params).run(params, { host: ctx.host, signal: ctx.signal, runId: ctx.runId, }) : await ctx.ports.agentRunner.runAgentToResult(params, ctx.host) if (result.kind === 'ok') { ctx.resources.budget.addOutputTokens(result.usage.outputTokens) } emit({ type: 'agent_done', agentId, label, phase, result }) const entry: JournalEntry = { key, seq: agentId, result } // 关键:push 顺序 = 完成顺序(非调用顺序);read() 已按 seq 重排, // 因此 resume 时调用顺序与 journal 顺序对齐,key 索引稳定。 ctx.journal.push(entry) ctx.journalIndex++ await ctx.ports.journalStore.append(ctx.runId, entry) return resultToOutput(result) } finally { release() } } const parallel: WorkflowHooks['parallel'] = async thunks => { if (thunks.length > MAX_ITEMS_PER_CALL) { throw new WorkflowError( `parallel 超过单次调用 items 上限 (${MAX_ITEMS_PER_CALL})`, ) } return Promise.all( thunks.map(async (t, i) => { try { return await t() } catch (e) { // "null on error"契约不变,但应 log——否则 workflow 作者无法定位为何 agent 失败 ctx.ports.logger.warn?.( `parallel thunk #${i} failed: ${(e as Error).message}`, ) return null } }), ) } const pipeline: WorkflowHooks['pipeline'] = async ( items: readonly T[], ...stages: Array< (prev: unknown, item: T, index: number) => Promise > ): Promise> => { if (items.length > MAX_ITEMS_PER_CALL) { throw new WorkflowError( `pipeline 超过单次调用 items 上限 (${MAX_ITEMS_PER_CALL})`, ) } return Promise.all( items.map(async (item, index): Promise => { try { let prev: unknown = item for (const stage of stages) { prev = await stage(prev, item, index) } return prev as R } catch (e) { ctx.ports.logger.warn?.( `pipeline item #${index} failed: ${(e as Error).message}`, ) return null } }), ) } const phase: WorkflowHooks['phase'] = title => { if (ctx.currentPhase) { emit({ type: 'phase_done', phase: ctx.currentPhase }) } ctx.currentPhase = title emit({ type: 'phase_started', phase: title }) } const log: WorkflowHooks['log'] = message => { emit({ type: 'log', message }) } const workflow: WorkflowHooks['workflow'] = async (nameOrRef, args) => { if (ctx.resources.depth >= 1) { throw new WorkflowError('workflow() 嵌套仅允许一层') } const sub: Parameters[0] = typeof nameOrRef === 'string' ? { name: nameOrRef } : { scriptPath: nameOrRef.scriptPath } return runSubWorkflow({ ...sub, args }) } return { agent, parallel, pipeline, phase, log, workflow } } function resultToOutput(result: AgentRunResult): unknown { return result.kind === 'ok' ? result.output : null }