mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-17 22:05:50 +00:00
feat: harden autonomy lifecycle, OOM bounds, and provider-boundary finalization
This PR consolidates a coordinated batch of fixes around autonomy run/flow lifecycle, scheduled task deduplication, provider-boundary state finalization, and matching memory-bound treatments for adjacent long-running subsystems (REPL fullscreen scrollback, skill-search/skill-learning runtime activation). All changes were developed and reviewed together because they touched the same lifecycle invariants and were uncovered by the same long-running session reproductions.
## Lifecycle correctness
- Queued autonomy prompts are not injected unless the persisted run was successfully claimed; queued run claiming is now terminal-safe so a once-consumed/cancelled/failed run can not slip back into `queued`.
- Autonomy run/flow finalization happens on completion, provider error, generator close, and cancellation — not just the happy path. New `src/__tests__/queryAutonomyProviderBoundary.test.ts` covers these provider-boundary transitions.
- `requestManagedAutonomyFlowCancel` and `resumeManagedAutonomyFlowPrompt` carry `rootDir` and `currentDir` explicitly across detached async boundaries (proactive-tick, cron, daemon restart) instead of inferring from process state.
- Active runs/flows are protected from janitor pruning so a running step can not be garbage-collected mid-flight (`src/utils/autonomyAuthority.ts`).
- Heartbeat parser now ignores fenced code blocks; the two-phase commit window for autonomy state transitions is documented in `docs/internals/autonomy-jira.md`.
## Ownership and dedup
- `src/utils/autonomyRuns.ts`: ownership stamping (run id + rootDir carried end-to-end), source-based dedup against active runs.
- `src/hooks/useScheduledTasks.ts`: scheduled ticks deduplicate against runs already active on the same source label.
- `src/utils/processUserInput/processSlashCommand.tsx`: forked slash commands now thread the autonomy `runId` so completion finalizers can find the originating run for deferred completion.
- New `src/utils/autonomyQueueLifecycle.ts` and tests collect the queue-side lifecycle invariants in one place.
## Memory bounds (related, same review pass)
- `src/screens/REPL.tsx`: caps fullscreen scrollback after the compact boundary and updates trailing progress rows in place. Long-running fullscreen sessions could otherwise retain thousands of post-compaction messages and duplicate progress rows, keeping Ink trees alive long after their useful context had moved on.
- `src/services/skillSearch/*` and `src/services/skillLearning/*`: runtime activation is strictly opt-in via existing env toggles; session caches are capped so long-running processes can not grow them forever. Build presence is preserved so operators can still discover and opt into the slash commands.
## CI / test contract
- `tests/integration/dependency-overrides.test.ts`: smoke test no longer drives Mermaid's browser renderer; it validates the package-resolution contract directly so CI does not regress on unrelated browser timing.
- New `tests/integration/autonomy-lifecycle-user-flow.test.ts`: end-to-end CLI subprocess flow exercising `status --deep`, `flows`, `flow <id>`, `flow resume`, `flow cancel` against persisted state.
- `src/entrypoints/cli.tsx`: `claude autonomy …` routes through an entrypoint fast path that reuses the slash-command formatter without booting the full interactive CLI. Stdout is flushed before forced exit so coverage subprocesses do not terminate with empty stdout.
- `packages/builtin-tools/src/tools/RemoteTriggerTool/__tests__/RemoteTriggerTool.test.ts`: stabilized to prevent audit flake under coverage.
## Tests added
- `src/__tests__/queryAutonomyProviderBoundary.test.ts`
- `src/hooks/__tests__/useScheduledTasks.test.ts`
- `src/utils/__tests__/autonomyAuthority.test.ts`
- `src/utils/__tests__/autonomyFlows.test.ts` (extended)
- `src/utils/__tests__/autonomyPersistence.test.ts` (extended)
- `src/utils/__tests__/autonomyQueueLifecycle.test.ts`
- `src/utils/__tests__/autonomyRuns.test.ts` (extended)
- `src/utils/processUserInput/__tests__/processSlashCommand.test.ts`
- `tests/integration/autonomy-lifecycle-user-flow.test.ts`
## Docs
- `docs/agent/sur-loop-scheduled-oom.md`: System Understanding Report covering the scheduled/loop OOM problem, the call graphs investigated, and the lifecycle invariants this PR establishes.
- `docs/agent/sur-skill-overflow-bugs.md`: SUR for the related skill-overflow context.
- `docs/internals/autonomy-jira.md`: documents the two-phase commit window and ownership stamping invariants.
- `docs/memory-leak-audit.md`: audit notes covering the REPL/scrollback and skill-search bounds.
## Invariants this PR establishes
1. Queued autonomy prompts are not injected unless the persisted run was successfully claimed.
2. Terminal run/flow states are terminal — completion, failure, and cancellation all finalize state regardless of which provider/error path triggered them.
3. Autonomy run/flow `rootDir` is carried explicitly across detached async boundaries instead of inferred from a shared singleton.
4. State-only CLI subcommands (`autonomy status|runs|flows|flow …`) bypass full interactive bootstrap so they do not hold unrelated handles open.
5. REPL fullscreen scrollback and skill-search/skill-learning session caches are explicitly bounded.
## Validation
```bash
bun run typecheck
CI=true GITHUB_ACTIONS=true bun test # 3996 pass / 0 fail across 305 files
bun test src/__tests__/queryAutonomyProviderBoundary.test.ts \
src/hooks/__tests__/useScheduledTasks.test.ts \
src/utils/__tests__/autonomy{Runs,Flows,Authority,QueueLifecycle,Persistence}.test.ts \
src/utils/processUserInput/__tests__/processSlashCommand.test.ts \
tests/integration/autonomy-lifecycle-user-flow.test.ts
```
## Origin
This PR is the consolidated, upstream-targeted version of two fork-side review PRs (fix/loop-scheduled-autonomy-oom and fix/autonomy-lifecycle). The fork-side review history is preserved at https://github.com/amDosion/claude-code-bast/pull/7 . The fork's own internal `chore: keep fork current with upstream` sync commits and the `docs: update contributors` automation are intentionally not included in this PR.
The autonomy CLI handler `rootDir` threading that the fork added (78f64d8a, 98d04ddb) is intentionally omitted here because upstream `a2cfaf91` (fix: 修复 RemoteTriggerTool 和 autonomy 测试的全量运行失败) already performed the equivalent change with an additional `currentDir` option. Keeping the upstream version avoids regressing that improvement.
This commit is contained in:
152
src/query.ts
152
src/query.ts
@@ -71,10 +71,16 @@ const jobClassifier = feature('TEMPLATES')
|
||||
: null
|
||||
/* eslint-enable @typescript-eslint/no-require-imports */
|
||||
import {
|
||||
enqueue,
|
||||
remove as removeFromQueue,
|
||||
getCommandsByMaxPriority,
|
||||
isSlashCommand,
|
||||
} from './utils/messageQueueManager.js'
|
||||
import {
|
||||
type AutonomyTurnOutcome,
|
||||
claimConsumableQueuedAutonomyCommands,
|
||||
finalizeAutonomyCommandsForTurn,
|
||||
} from './utils/autonomyQueueLifecycle.js'
|
||||
import { notifyCommandLifecycle } from './utils/commandLifecycle.js'
|
||||
import { headlessProfilerCheckpoint } from './utils/headlessProfiler.js'
|
||||
import {
|
||||
@@ -92,6 +98,7 @@ import { SLEEP_TOOL_NAME } from '@claude-code-best/builtin-tools/tools/SleepTool
|
||||
import { executePostSamplingHooks } from './utils/hooks/postSamplingHooks.js'
|
||||
import { executeStopFailureHooks } from './utils/hooks.js'
|
||||
import type { QuerySource } from './constants/querySource.js'
|
||||
import type { QueuedCommand } from './types/textInputTypes.js'
|
||||
import { createDumpPromptsFetch } from './services/api/dumpPrompts.js'
|
||||
import { StreamingToolExecutor } from './services/tools/StreamingToolExecutor.js'
|
||||
import { queryCheckpoint } from './utils/queryProfiler.js'
|
||||
@@ -111,7 +118,11 @@ import {
|
||||
} from './bootstrap/state.js'
|
||||
import { createBudgetTracker, checkTokenBudget } from './query/tokenBudget.js'
|
||||
import { count } from './utils/array.js'
|
||||
import { createTrace, endTrace, isLangfuseEnabled } from './services/langfuse/index.js'
|
||||
import {
|
||||
createTrace,
|
||||
endTrace,
|
||||
isLangfuseEnabled,
|
||||
} from './services/langfuse/index.js'
|
||||
import { getAPIProvider } from './utils/model/providers.js'
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-require-imports */
|
||||
@@ -129,7 +140,11 @@ function* yieldMissingToolResultBlocks(
|
||||
) {
|
||||
for (const assistantMessage of assistantMessages) {
|
||||
// Extract all tool use blocks from this assistant message
|
||||
const toolUseBlocks = (Array.isArray(assistantMessage.message?.content) ? assistantMessage.message.content : []).filter(
|
||||
const toolUseBlocks = (
|
||||
Array.isArray(assistantMessage.message?.content)
|
||||
? assistantMessage.message.content
|
||||
: []
|
||||
).filter(
|
||||
(content: { type: string }) => content.type === 'tool_use',
|
||||
) as ToolUseBlock[]
|
||||
|
||||
@@ -181,6 +196,33 @@ function isWithheldMaxOutputTokens(
|
||||
return msg?.type === 'assistant' && msg.apiError === 'max_output_tokens'
|
||||
}
|
||||
|
||||
function getAutonomyTurnOutcome(params: {
|
||||
terminal?: Terminal
|
||||
thrownError?: unknown
|
||||
}): AutonomyTurnOutcome {
|
||||
if (params.thrownError !== undefined) {
|
||||
return { type: 'failed', error: params.thrownError }
|
||||
}
|
||||
|
||||
const terminal = params.terminal
|
||||
const reason = terminal?.reason
|
||||
switch (reason) {
|
||||
case 'completed':
|
||||
return { type: 'completed' }
|
||||
case undefined:
|
||||
case 'aborted_streaming':
|
||||
case 'aborted_tools':
|
||||
return { type: 'cancelled' }
|
||||
case 'model_error':
|
||||
return { type: 'failed', error: terminal.error }
|
||||
default:
|
||||
return {
|
||||
type: 'failed',
|
||||
message: `query ended without successful completion: ${reason}`,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export type QueryParams = {
|
||||
messages: Message[]
|
||||
systemPrompt: SystemPrompt
|
||||
@@ -230,6 +272,7 @@ export async function* query(
|
||||
Terminal
|
||||
> {
|
||||
const consumedCommandUuids: string[] = []
|
||||
const consumedAutonomyCommands: QueuedCommand[] = []
|
||||
|
||||
// Create Langfuse trace for this query turn (no-op if not configured).
|
||||
// When called as a sub-agent, langfuseTrace is already set by runAgent()
|
||||
@@ -238,8 +281,9 @@ export async function* query(
|
||||
logForDebugging(
|
||||
`[query] ownsTrace=${ownsTrace} incoming langfuseTrace=${params.toolUseContext.langfuseTrace ? 'present' : 'null/undefined'} isLangfuseEnabled=${isLangfuseEnabled()}`,
|
||||
)
|
||||
const langfuseTrace = params.toolUseContext.langfuseTrace
|
||||
?? (isLangfuseEnabled()
|
||||
const langfuseTrace =
|
||||
params.toolUseContext.langfuseTrace ??
|
||||
(isLangfuseEnabled()
|
||||
? createTrace({
|
||||
sessionId: getSessionId(),
|
||||
model: params.toolUseContext.options.mainLoopModel,
|
||||
@@ -258,9 +302,34 @@ export async function* query(
|
||||
: params
|
||||
|
||||
let terminal: Terminal | undefined
|
||||
let didThrow = false
|
||||
let thrownError: unknown
|
||||
try {
|
||||
terminal = yield* queryLoop(paramsWithTrace, consumedCommandUuids)
|
||||
terminal = yield* queryLoop(
|
||||
paramsWithTrace,
|
||||
consumedCommandUuids,
|
||||
consumedAutonomyCommands,
|
||||
)
|
||||
} catch (error) {
|
||||
didThrow = true
|
||||
thrownError = error
|
||||
throw error
|
||||
} finally {
|
||||
await finalizeAutonomyCommandsForTurn({
|
||||
commands: consumedAutonomyCommands,
|
||||
outcome: getAutonomyTurnOutcome({
|
||||
terminal,
|
||||
...(didThrow ? { thrownError } : {}),
|
||||
}),
|
||||
priority: 'later',
|
||||
})
|
||||
.then(nextCommands => {
|
||||
for (const command of nextCommands) {
|
||||
enqueue(command)
|
||||
}
|
||||
})
|
||||
.catch(logError)
|
||||
|
||||
// Only end the trace if we created it — sub-agents own their traces
|
||||
if (ownsTrace) {
|
||||
const isAborted =
|
||||
@@ -283,6 +352,7 @@ export async function* query(
|
||||
async function* queryLoop(
|
||||
params: QueryParams,
|
||||
consumedCommandUuids: string[],
|
||||
consumedAutonomyCommands: QueuedCommand[],
|
||||
): AsyncGenerator<
|
||||
| StreamEvent
|
||||
| RequestStartEvent
|
||||
@@ -790,7 +860,14 @@ async function* queryLoop(
|
||||
let yieldMessage: typeof message = message
|
||||
if (message.type === 'assistant') {
|
||||
const assistantMsg = message as AssistantMessage
|
||||
const contentArr = Array.isArray(assistantMsg.message?.content) ? assistantMsg.message.content as unknown as Array<{ type: string; input?: unknown; name?: string; [key: string]: unknown }> : []
|
||||
const contentArr = Array.isArray(assistantMsg.message?.content)
|
||||
? (assistantMsg.message.content as unknown as Array<{
|
||||
type: string
|
||||
input?: unknown
|
||||
name?: string
|
||||
[key: string]: unknown
|
||||
}>)
|
||||
: []
|
||||
let clonedContent: typeof contentArr | undefined
|
||||
for (let i = 0; i < contentArr.length; i++) {
|
||||
const block = contentArr[i]!
|
||||
@@ -826,7 +903,10 @@ async function* queryLoop(
|
||||
if (clonedContent) {
|
||||
yieldMessage = {
|
||||
...message,
|
||||
message: { ...(assistantMsg.message ?? {}), content: clonedContent },
|
||||
message: {
|
||||
...(assistantMsg.message ?? {}),
|
||||
content: clonedContent,
|
||||
},
|
||||
} as typeof message
|
||||
}
|
||||
}
|
||||
@@ -872,7 +952,11 @@ async function* queryLoop(
|
||||
const assistantMessage = message as AssistantMessage
|
||||
assistantMessages.push(assistantMessage)
|
||||
|
||||
const msgToolUseBlocks = (Array.isArray(assistantMessage.message?.content) ? assistantMessage.message.content : []).filter(
|
||||
const msgToolUseBlocks = (
|
||||
Array.isArray(assistantMessage.message?.content)
|
||||
? assistantMessage.message.content
|
||||
: []
|
||||
).filter(
|
||||
(content: { type: string }) => content.type === 'tool_use',
|
||||
) as ToolUseBlock[]
|
||||
if (msgToolUseBlocks.length > 0) {
|
||||
@@ -1005,7 +1089,10 @@ async function* queryLoop(
|
||||
logEvent('tengu_query_error', {
|
||||
assistantMessages: assistantMessages.length,
|
||||
toolUses: assistantMessages.flatMap(_ =>
|
||||
(Array.isArray(_.message?.content) ? _.message.content as Array<{ type: string }> : []).filter(content => content.type === 'tool_use'),
|
||||
(Array.isArray(_.message?.content)
|
||||
? (_.message.content as Array<{ type: string }>)
|
||||
: []
|
||||
).filter(content => content.type === 'tool_use'),
|
||||
).length,
|
||||
|
||||
queryChainId: queryChainIdForAnalytics,
|
||||
@@ -1307,7 +1394,10 @@ async function* queryLoop(
|
||||
// error → hook blocking → retry → error → …
|
||||
if (lastMessage?.isApiErrorMessage) {
|
||||
void executeStopFailureHooks(lastMessage, toolUseContext)
|
||||
return { reason: 'completed' }
|
||||
return {
|
||||
reason: 'model_error',
|
||||
error: lastMessage.error ?? lastMessage.apiError ?? 'api_error',
|
||||
}
|
||||
}
|
||||
|
||||
const stopHookResult = yield* handleStopHooks(
|
||||
@@ -1408,7 +1498,6 @@ async function* queryLoop(
|
||||
|
||||
queryCheckpoint('query_tool_execution_start')
|
||||
|
||||
|
||||
if (streamingToolExecutor) {
|
||||
logEvent('tengu_streaming_tool_execution_used', {
|
||||
tool_count: toolUseBlocks.length,
|
||||
@@ -1468,9 +1557,14 @@ async function* queryLoop(
|
||||
const lastAssistantMessage = assistantMessages.at(-1)
|
||||
let lastAssistantText: string | undefined
|
||||
if (lastAssistantMessage) {
|
||||
const textBlocks = (Array.isArray(lastAssistantMessage.message?.content) ? lastAssistantMessage.message.content as Array<{ type: string; text?: string }> : []).filter(
|
||||
block => block.type === 'text',
|
||||
)
|
||||
const textBlocks = (
|
||||
Array.isArray(lastAssistantMessage.message?.content)
|
||||
? (lastAssistantMessage.message.content as Array<{
|
||||
type: string
|
||||
text?: string
|
||||
}>)
|
||||
: []
|
||||
).filter(block => block.type === 'text')
|
||||
if (textBlocks.length > 0) {
|
||||
const lastTextBlock = textBlocks.at(-1)
|
||||
if (lastTextBlock && 'text' in lastTextBlock) {
|
||||
@@ -1622,12 +1716,32 @@ async function* queryLoop(
|
||||
// user prompts, even if someone stamps an agentId on one.
|
||||
return cmd.mode === 'task-notification' && cmd.agentId === currentAgentId
|
||||
})
|
||||
const queuedAutonomyClaim = await claimConsumableQueuedAutonomyCommands(
|
||||
queuedCommandsSnapshot,
|
||||
)
|
||||
if (queuedAutonomyClaim.staleCommands.length > 0) {
|
||||
removeFromQueue(queuedAutonomyClaim.staleCommands)
|
||||
}
|
||||
|
||||
const claimedConsumedCommands = queuedAutonomyClaim.claimedCommands.filter(
|
||||
cmd => cmd.mode === 'prompt' || cmd.mode === 'task-notification',
|
||||
)
|
||||
if (claimedConsumedCommands.length > 0) {
|
||||
consumedAutonomyCommands.push(...claimedConsumedCommands)
|
||||
for (const cmd of claimedConsumedCommands) {
|
||||
if (cmd.uuid) {
|
||||
consumedCommandUuids.push(cmd.uuid)
|
||||
notifyCommandLifecycle(cmd.uuid, 'started')
|
||||
}
|
||||
}
|
||||
removeFromQueue(claimedConsumedCommands)
|
||||
}
|
||||
|
||||
for await (const attachment of getAttachmentMessages(
|
||||
null,
|
||||
updatedToolUseContext,
|
||||
null,
|
||||
queuedCommandsSnapshot,
|
||||
queuedAutonomyClaim.attachmentCommands,
|
||||
[...messagesForQuery, ...assistantMessages, ...toolResults],
|
||||
querySource,
|
||||
)) {
|
||||
@@ -1659,7 +1773,6 @@ async function* queryLoop(
|
||||
pendingMemoryPrefetch.consumedOnIteration = turnCount - 1
|
||||
}
|
||||
|
||||
|
||||
// Inject prefetched skill discovery. collectSkillDiscoveryPrefetch emits
|
||||
// hidden_by_main_turn — true when the prefetch resolved before this point
|
||||
// (should be >98% at AKI@250ms / Haiku@573ms vs turn durations of 2-30s).
|
||||
@@ -1675,8 +1788,11 @@ async function* queryLoop(
|
||||
|
||||
// Remove only commands that were actually consumed as attachments.
|
||||
// Prompt and task-notification commands are converted to attachments above.
|
||||
const consumedCommands = queuedCommandsSnapshot.filter(
|
||||
cmd => cmd.mode === 'prompt' || cmd.mode === 'task-notification',
|
||||
const claimedCommandSet = new Set(claimedConsumedCommands)
|
||||
const consumedCommands = queuedAutonomyClaim.attachmentCommands.filter(
|
||||
cmd =>
|
||||
(cmd.mode === 'prompt' || cmd.mode === 'task-notification') &&
|
||||
!claimedCommandSet.has(cmd),
|
||||
)
|
||||
if (consumedCommands.length > 0) {
|
||||
for (const cmd of consumedCommands) {
|
||||
|
||||
Reference in New Issue
Block a user