fix(types): replace all as any with proper type assertions

Eliminate unsafe `as any` casts across 21 non-test source files,
replacing them with specific type annotations:

- Bridge transport: use StdoutMessage type for write/writeBatch calls
- print.ts: type msg.request as Record<string, unknown> for unknown
  SDK control subtypes; use StdoutMessage for output.enqueue()
- API providers (openai/grok/gemini): import ChatCompletion types,
  type streams as AsyncIterable<ChatCompletionChunk>, type request
  bodies as ChatCompletionCreateParamsStreaming
- Computer use executor: use Partial<ResolvePrepareCaptureResult>
  for cross-platform screenshot result
- Components: replace Ink color string casts with proper typing
- Win32 bridge: type stdin as Writable after null check

All 2453 tests pass with 0 failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
claude-code-best
2026-04-09 23:51:33 +08:00
parent a14d3dc8f0
commit 34bbc1d403
19 changed files with 317 additions and 231 deletions

View File

@@ -1128,7 +1128,7 @@ function runHeadlessStreaming(
rate_limit_info: rateLimitInfo,
uuid: randomUUID(),
session_id: getSessionId(),
})
} as unknown as Parameters<typeof output.enqueue>[0])
}
}
statusListeners.add(rateLimitListener)
@@ -1237,7 +1237,7 @@ function runHeadlessStreaming(
uuid: crumb.uuid,
timestamp: crumb.timestamp,
isReplay: true,
} as SDKUserMessageReplay)
} as SDKUserMessageReplay as StdoutMessage)
}
}
}
@@ -1974,7 +1974,7 @@ function runHeadlessStreaming(
parent_tool_use_id: null,
uuid: c.uuid as string,
isReplay: true,
} as SDKUserMessageReplay)
} as SDKUserMessageReplay as StdoutMessage)
}
}
}
@@ -2200,7 +2200,7 @@ function runHeadlessStreaming(
output.enqueue({
type: 'system',
subtype: 'status',
status,
status: status as 'compacting' | null,
session_id: getSessionId(),
uuid: randomUUID(),
})
@@ -2227,10 +2227,10 @@ function runHeadlessStreaming(
isBackgroundTask(t),
)
) {
heldBackResult = message
heldBackResult = message as StdoutMessage
} else {
heldBackResult = null
output.enqueue(message)
output.enqueue(message as StdoutMessage)
}
} else {
// Flush SDK events (task_started, task_progress) so background
@@ -2238,7 +2238,7 @@ function runHeadlessStreaming(
for (const event of drainSdkEvents()) {
output.enqueue(event)
}
output.enqueue(message)
output.enqueue(message as StdoutMessage)
}
}
}) // end runWithWorkload
@@ -2256,11 +2256,12 @@ function runHeadlessStreaming(
{ turnStartTime } as import('src/utils/filePersistence/types.js').TurnStartTime,
abortController.signal,
result => {
const filesResult = result as { persistedFiles: { filename: string; file_id: string }[]; failedFiles: { filename: string; error: string }[] }
output.enqueue({
type: 'system' as const,
subtype: 'files_persisted' as const,
files: (result as any).persistedFiles,
failed: (result as any).failedFiles,
files: filesResult.persistedFiles,
failed: filesResult.failedFiles,
processed_at: new Date().toISOString(),
uuid: randomUUID(),
session_id: getSessionId(),
@@ -2730,7 +2731,7 @@ function runHeadlessStreaming(
}
const sendControlResponseSuccess = function (
message: SDKControlRequest,
message: { request_id: string } | SDKControlRequest,
response?: Record<string, unknown>,
) {
output.enqueue({
@@ -2744,7 +2745,7 @@ function runHeadlessStreaming(
}
const sendControlResponseError = function (
message: SDKControlRequest,
message: { request_id: string } | SDKControlRequest,
errorMessage: string,
) {
output.enqueue({
@@ -2820,11 +2821,21 @@ function runHeadlessStreaming(
message.type !== 'user' &&
message.type !== 'control_response'
) {
notifyCommandLifecycle(eventId, 'completed')
notifyCommandLifecycle(eventId as string, 'completed')
}
if (message.type === 'control_request') {
if (message.request.subtype === 'interrupt') {
// Type assertion: structuredInput yields StdinMessage | SDKMessage, but
// when type === 'control_request' the object has request_id and request.
// The union with SDKMessage (typed as `any`) causes request to be `unknown`.
// Cast to SDKControlRequest (via unknown) for type safety on known subtypes,
// and use Record<string, unknown> for subtypes not in the zod schema union.
const msg = message as unknown as SDKControlRequest
// Wider-typed alias for request properties on subtypes not in the zod schema.
// The schema union doesn't include end_session, channel_enable, mcp_authenticate,
// claude_authenticate, etc. so accessing their properties narrows to `never`.
const req = msg.request as Record<string, unknown>
if (msg.request.subtype === 'interrupt') {
// Track escapes for attribution (ant-only feature)
if (feature('COMMIT_ATTRIBUTION')) {
setAppState(prev => ({
@@ -2842,10 +2853,10 @@ function runHeadlessStreaming(
suggestionState.abortController = null
suggestionState.lastEmitted = null
suggestionState.pendingSuggestion = null
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'end_session') {
sendControlResponseSuccess(msg)
} else if (req.subtype === 'end_session') {
logForDebugging(
`[print.ts] end_session received, reason=${message.request.reason ?? 'unspecified'}`,
`[print.ts] end_session received, reason=${req.reason ?? 'unspecified'}`,
)
if (abortController) {
abortController.abort()
@@ -2854,16 +2865,16 @@ function runHeadlessStreaming(
suggestionState.abortController = null
suggestionState.lastEmitted = null
suggestionState.pendingSuggestion = null
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
break // exits for-await → falls through to inputClosed=true drain below
} else if (message.request.subtype === 'initialize') {
} else if (msg.request.subtype === 'initialize') {
// SDK MCP server names from the initialize message
// Populated by both browser and ProcessTransport sessions
if (
message.request.sdkMcpServers &&
message.request.sdkMcpServers.length > 0
msg.request.sdkMcpServers &&
msg.request.sdkMcpServers.length > 0
) {
for (const serverName of message.request.sdkMcpServers) {
for (const serverName of msg.request.sdkMcpServers) {
// Create placeholder config for SDK MCP servers
// The actual server connection is managed by the SDK Query class
sdkMcpConfigs[serverName] = {
@@ -2874,8 +2885,8 @@ function runHeadlessStreaming(
}
await handleInitializeRequest(
message.request,
message.request_id,
msg.request,
msg.request_id,
initialized,
output,
commands,
@@ -2890,7 +2901,7 @@ function runHeadlessStreaming(
// Enable prompt suggestions in AppState when SDK consumer opts in.
// shouldEnablePromptSuggestion() returns false for non-interactive
// sessions, but the SDK consumer explicitly requested suggestions.
if (message.request.promptSuggestions) {
if (msg.request.promptSuggestions) {
setAppState(prev => {
if (prev.promptSuggestionEnabled) return prev
return { ...prev, promptSuggestionEnabled: true }
@@ -2898,7 +2909,7 @@ function runHeadlessStreaming(
}
if (
message.request.agentProgressSummaries &&
msg.request.agentProgressSummaries &&
getFeatureValue_CACHED_MAY_BE_STALE('tengu_slate_prism', true)
) {
setSdkAgentProgressSummariesEnabled(true)
@@ -2911,13 +2922,13 @@ function runHeadlessStreaming(
if (hasCommandsInQueue()) {
void run()
}
} else if (message.request.subtype === 'set_permission_mode') {
const m = message.request // for typescript (TODO: use readonly types to avoid this)
} else if (msg.request.subtype === 'set_permission_mode') {
const m = msg.request // for typescript (TODO: use readonly types to avoid this)
setAppState(prev => ({
...prev,
toolPermissionContext: handleSetPermissionMode(
m,
message.request_id,
msg.request_id,
prev.toolPermissionContext,
output,
),
@@ -2926,8 +2937,8 @@ function runHeadlessStreaming(
// handleSetPermissionMode sends the control_response; the
// notifySessionMetadataChanged that used to follow here is
// now fired by onChangeAppState (with externalized mode name).
} else if (message.request.subtype === 'set_model') {
const requestedModel = message.request.model ?? 'default'
} else if (msg.request.subtype === 'set_model') {
const requestedModel = msg.request.model ?? 'default'
const model =
requestedModel === 'default'
? getDefaultMainLoopModel()
@@ -2937,24 +2948,24 @@ function runHeadlessStreaming(
notifySessionMetadataChanged({ model })
injectModelSwitchBreadcrumbs(requestedModel, model)
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'set_max_thinking_tokens') {
if (message.request.max_thinking_tokens === null) {
sendControlResponseSuccess(msg)
} else if (msg.request.subtype === 'set_max_thinking_tokens') {
if (msg.request.max_thinking_tokens === null) {
options.thinkingConfig = undefined
} else if (message.request.max_thinking_tokens === 0) {
} else if (msg.request.max_thinking_tokens === 0) {
options.thinkingConfig = { type: 'disabled' }
} else {
options.thinkingConfig = {
type: 'enabled',
budgetTokens: message.request.max_thinking_tokens,
budgetTokens: msg.request.max_thinking_tokens,
}
}
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'mcp_status') {
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg)
} else if (msg.request.subtype === 'mcp_status') {
sendControlResponseSuccess(msg, {
mcpServers: buildMcpServerStatuses(),
})
} else if (message.request.subtype === 'get_context_usage') {
} else if (msg.request.subtype === 'get_context_usage') {
try {
const appState = getAppState()
const data = await collectContextData({
@@ -2968,13 +2979,13 @@ function runHeadlessStreaming(
appendSystemPrompt: options.appendSystemPrompt,
},
})
sendControlResponseSuccess(message, { ...data })
sendControlResponseSuccess(msg, { ...data })
} catch (error) {
sendControlResponseError(message, errorMessage(error))
sendControlResponseError(msg, errorMessage(error))
}
} else if (message.request.subtype === 'mcp_message') {
} else if (msg.request.subtype === 'mcp_message') {
// Handle MCP notifications from SDK servers
const mcpRequest = message.request
const mcpRequest = msg.request as Record<string, unknown>
const sdkClient = sdkClients.find(
client => client.name === mcpRequest.server_name,
)
@@ -2985,32 +2996,32 @@ function runHeadlessStreaming(
sdkClient.type === 'connected' &&
sdkClient.client?.transport?.onmessage
) {
sdkClient.client.transport.onmessage(mcpRequest.message)
sdkClient.client.transport.onmessage(mcpRequest.message as import('@modelcontextprotocol/sdk/types.js').JSONRPCMessage)
}
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'rewind_files') {
sendControlResponseSuccess(msg)
} else if (msg.request.subtype === 'rewind_files') {
const appState = getAppState()
const result = await handleRewindFiles(
message.request.user_message_id as UUID,
msg.request.user_message_id as UUID,
appState,
setAppState,
message.request.dry_run ?? false,
msg.request.dry_run ?? false,
)
if (result.canRewind || message.request.dry_run) {
sendControlResponseSuccess(message, result)
if (result.canRewind || msg.request.dry_run) {
sendControlResponseSuccess(msg, result)
} else {
sendControlResponseError(
message,
msg,
(result.error as string) ?? 'Unexpected error',
)
}
} else if (message.request.subtype === 'cancel_async_message') {
const targetUuid = message.request.message_uuid
} else if (msg.request.subtype === 'cancel_async_message') {
const targetUuid = msg.request.message_uuid
const removed = dequeueAllMatching(cmd => cmd.uuid === targetUuid)
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
cancelled: removed.length > 0,
})
} else if (message.request.subtype === 'seed_read_state') {
} else if (msg.request.subtype === 'seed_read_state') {
// Client observed a Read that was later removed from context (e.g.
// by snip), so transcript-based seeding missed it. Queued into
// pendingSeeds; applied at the next clone-replace boundary.
@@ -3018,7 +3029,7 @@ function runHeadlessStreaming(
// expandPath: all other readFileState writers normalize (~, relative,
// session cwd vs process cwd). FileEditTool looks up by expandPath'd
// key — a verbatim client path would miss.
const normalizedPath = expandPath(message.request.path)
const normalizedPath = expandPath(msg.request.path)
// Check disk mtime before reading content. If the file changed
// since the client's observation, readFile would return C_current
// but we'd store it with the client's M_observed — getChangedFiles
@@ -3028,7 +3039,7 @@ function runHeadlessStreaming(
// makes Edit fail "file not read yet" → forces a fresh Read.
// Math.floor matches FileReadTool and getFileModificationTime.
const diskMtime = Math.floor((await stat(normalizedPath)).mtimeMs)
if (diskMtime <= message.request.mtime) {
if (diskMtime <= msg.request.mtime) {
const raw = await readFile(normalizedPath, 'utf-8')
// Strip BOM + normalize CRLF→LF to match readFileInRange and
// readFileSyncWithMetadata. FileEditTool's content-compare
@@ -3047,18 +3058,18 @@ function runHeadlessStreaming(
} catch {
// ENOENT etc — skip seeding but still succeed
}
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'mcp_set_servers') {
sendControlResponseSuccess(msg)
} else if (msg.request.subtype === 'mcp_set_servers') {
const { response, sdkServersChanged } = await applyMcpServerChanges(
message.request.servers,
msg.request.servers as Record<string, McpServerConfigForProcessTransport>,
)
sendControlResponseSuccess(message, response)
sendControlResponseSuccess(msg, response)
// Connect SDK servers AFTER response to avoid deadlock
if (sdkServersChanged) {
void updateSdkMcp()
}
} else if (message.request.subtype === 'reload_plugins') {
} else if (msg.request.subtype === 'reload_plugins') {
try {
if (
feature('DOWNLOAD_USER_SETTINGS') &&
@@ -3106,7 +3117,7 @@ function runHeadlessStreaming(
logError(pluginsR.reason)
}
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
commands: currentCommands
.filter(cmd => cmd.userInvocable !== false)
.map(cmd => ({
@@ -3120,15 +3131,15 @@ function runHeadlessStreaming(
model: a.model === 'inherit' ? undefined : a.model,
})),
plugins,
mcpServers: buildMcpServerStatuses(),
mcpServers: buildMcpServerStatuses() as SDKControlReloadPluginsResponse['mcpServers'],
error_count: r.error_count,
} satisfies SDKControlReloadPluginsResponse)
} catch (error) {
sendControlResponseError(message, errorMessage(error))
sendControlResponseError(msg, errorMessage(error))
}
} else if (message.request.subtype === 'mcp_reconnect') {
} else if (msg.request.subtype === 'mcp_reconnect') {
const currentAppState = getAppState()
const { serverName } = message.request
const { serverName } = msg.request
elicitationRegistered.delete(serverName)
// Config-existence gate must cover the SAME sources as the
// operations below. SDK-injected servers (query({mcpServers:{...}}))
@@ -3144,7 +3155,7 @@ function runHeadlessStreaming(
?.config ??
null
if (!config) {
sendControlResponseError(message, `Server not found: ${serverName}`)
sendControlResponseError(msg, `Server not found: ${serverName}`)
} else {
const result = await reconnectMcpServerImpl(serverName, config)
// Update appState.mcp with the new client, tools, commands, and resources
@@ -3190,18 +3201,18 @@ function runHeadlessStreaming(
if (result.client.type === 'connected') {
registerElicitationHandlers([result.client])
reregisterChannelHandlerAfterReconnect(result.client)
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
} else {
const errorMessage =
result.client.type === 'failed'
? (result.client.error ?? 'Connection failed')
: `Server status: ${result.client.type}`
sendControlResponseError(message, errorMessage)
sendControlResponseError(msg, errorMessage)
}
}
} else if (message.request.subtype === 'mcp_toggle') {
} else if (msg.request.subtype === 'mcp_toggle') {
const currentAppState = getAppState()
const { serverName, enabled } = message.request
const { serverName, enabled } = msg.request
elicitationRegistered.delete(serverName)
// Gate must match the client-lookup spread below (which
// includes sdkClients and dynamicMcpState.clients). Same fix as
@@ -3216,7 +3227,7 @@ function runHeadlessStreaming(
null
if (!config) {
sendControlResponseError(message, `Server not found: ${serverName}`)
sendControlResponseError(msg, `Server not found: ${serverName}`)
} else if (!enabled) {
// Disabling: persist + disconnect (matches TUI toggleMcpServer behavior)
setMcpServerEnabled(serverName, false)
@@ -3247,7 +3258,7 @@ function runHeadlessStreaming(
resources: omit(prev.mcp.resources, serverName),
},
}))
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
} else {
// Enabling: persist + reconnect
setMcpServerEnabled(serverName, true)
@@ -3281,20 +3292,20 @@ function runHeadlessStreaming(
if (result.client.type === 'connected') {
registerElicitationHandlers([result.client])
reregisterChannelHandlerAfterReconnect(result.client)
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
} else {
const errorMessage =
result.client.type === 'failed'
? (result.client.error ?? 'Connection failed')
: `Server status: ${result.client.type}`
sendControlResponseError(message, errorMessage)
sendControlResponseError(msg, errorMessage)
}
}
} else if (message.request.subtype === 'channel_enable') {
} else if (req.subtype === 'channel_enable') {
const currentAppState = getAppState()
handleChannelEnable(
message.request_id,
message.request.serverName,
msg.request_id,
req.serverName as string,
// Pool spread matches mcp_status — all three client sources.
[
...currentAppState.mcp.clients,
@@ -3303,8 +3314,8 @@ function runHeadlessStreaming(
],
output,
)
} else if (message.request.subtype === 'mcp_authenticate') {
const { serverName } = message.request
} else if (req.subtype === 'mcp_authenticate') {
const { serverName } = req
const currentAppState = getAppState()
const config =
getMcpConfigByName(serverName) ??
@@ -3313,10 +3324,10 @@ function runHeadlessStreaming(
?.config ??
null
if (!config) {
sendControlResponseError(message, `Server not found: ${serverName}`)
sendControlResponseError(msg, `Server not found: ${serverName}`)
} else if (config.type !== 'sse' && config.type !== 'http') {
sendControlResponseError(
message,
msg,
`Server type "${config.type}" does not support OAuth authentication`,
)
} else {
@@ -3353,12 +3364,12 @@ function runHeadlessStreaming(
])
if (authUrl) {
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
authUrl,
requiresUserAction: true,
})
} else {
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
requiresUserAction: false,
})
}
@@ -3453,11 +3464,11 @@ function runHeadlessStreaming(
})
void fullFlowPromise
} catch (error) {
sendControlResponseError(message, errorMessage(error))
sendControlResponseError(msg, errorMessage(error))
}
}
} else if (message.request.subtype === 'mcp_oauth_callback_url') {
const { serverName, callbackUrl } = message.request
} else if (req.subtype === 'mcp_oauth_callback_url') {
const { serverName, callbackUrl } = req
const submit = oauthCallbackSubmitters.get(serverName)
if (submit) {
// Validate the callback URL before submitting. The submit
@@ -3475,7 +3486,7 @@ function runHeadlessStreaming(
}
if (!hasCodeOrError) {
sendControlResponseError(
message,
msg,
'Invalid callback URL: missing authorization code. Please paste the full redirect URL including the code parameter.',
)
} else {
@@ -3488,32 +3499,32 @@ function runHeadlessStreaming(
if (authPromise) {
try {
await authPromise
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
} catch (error) {
sendControlResponseError(
message,
msg,
error instanceof Error
? error.message
: 'OAuth authentication failed',
)
}
} else {
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
}
}
} else {
sendControlResponseError(
message,
msg,
`No active OAuth flow for server: ${serverName}`,
)
}
} else if (message.request.subtype === 'claude_authenticate') {
} else if (req.subtype === 'claude_authenticate') {
// Anthropic OAuth over the control channel. The SDK client owns
// the user's browser (we're headless in -p mode); we hand back
// both URLs and wait. Automatic URL → localhost listener catches
// the redirect if the browser is on this host; manual URL → the
// success page shows "code#state" for claude_oauth_callback.
const { loginWithClaudeAi } = message.request
const { loginWithClaudeAi } = req
// Clean up any prior flow. cleanup() closes the localhost listener
// and nulls the manual resolver. The prior `flow` promise is left
@@ -3594,30 +3605,30 @@ function runHeadlessStreaming(
)
}),
])
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
manualUrl,
automaticUrl,
})
} catch (error) {
sendControlResponseError(message, errorMessage(error))
sendControlResponseError(msg, errorMessage(error))
}
} else if (
message.request.subtype === 'claude_oauth_callback' ||
message.request.subtype === 'claude_oauth_wait_for_completion'
req.subtype === 'claude_oauth_callback' ||
req.subtype === 'claude_oauth_wait_for_completion'
) {
if (!claudeOAuth) {
sendControlResponseError(
message,
msg,
'No active claude_authenticate flow',
)
} else {
// Inject the manual code synchronously — must happen in stdin
// message order so a subsequent claude_authenticate doesn't
// replace the service before this code lands.
if (message.request.subtype === 'claude_oauth_callback') {
if (req.subtype === 'claude_oauth_callback') {
claudeOAuth.service.handleManualAuthCodeInput({
authorizationCode: message.request.authorizationCode,
state: message.request.state,
authorizationCode: req.authorizationCode as string,
state: req.state as string,
})
}
// Detach the await — the stdin reader is serial and blocking
@@ -3629,7 +3640,7 @@ function runHeadlessStreaming(
void flow.then(
() => {
const accountInfo = getAccountInformation()
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
account: {
email: accountInfo?.email,
organization: accountInfo?.organization,
@@ -3641,11 +3652,11 @@ function runHeadlessStreaming(
})
},
(error: unknown) =>
sendControlResponseError(message, errorMessage(error)),
sendControlResponseError(msg, errorMessage(error)),
)
}
} else if (message.request.subtype === 'mcp_clear_auth') {
const { serverName } = message.request
} else if (req.subtype === 'mcp_clear_auth') {
const { serverName } = req
const currentAppState = getAppState()
const config =
getMcpConfigByName(serverName) ??
@@ -3654,10 +3665,10 @@ function runHeadlessStreaming(
?.config ??
null
if (!config) {
sendControlResponseError(message, `Server not found: ${serverName}`)
sendControlResponseError(msg, `Server not found: ${serverName}`)
} else if (config.type !== 'sse' && config.type !== 'http') {
sendControlResponseError(
message,
msg,
`Cannot clear auth for server type "${config.type}"`,
)
} else {
@@ -3690,16 +3701,16 @@ function runHeadlessStreaming(
: omit(prev.mcp.resources, serverName),
},
}))
sendControlResponseSuccess(message, {})
sendControlResponseSuccess(msg, {})
}
} else if (message.request.subtype === 'apply_flag_settings') {
} else if (msg.request.subtype === 'apply_flag_settings') {
// Snapshot the current model before applying — we need to detect
// model switches so we can inject breadcrumbs and notify listeners.
const prevModel = getMainLoopModel()
// Merge the provided settings into the in-memory flag settings
const existing = getFlagSettingsInline() ?? {}
const incoming = message.request.settings
const incoming = msg.request.settings
// Shallow-merge top-level keys; getSettingsForSource handles
// the deep merge with file-based flag settings via mergeWith.
// JSON serialization drops `undefined`, so callers use `null`
@@ -3748,8 +3759,8 @@ function runHeadlessStreaming(
injectModelSwitchBreadcrumbs(modelArg, newModel)
}
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'get_settings') {
sendControlResponseSuccess(msg)
} else if (msg.request.subtype === 'get_settings') {
const currentAppState = getAppState()
const model = getMainLoopModel()
// modelSupportsEffort gate matches claude.ts — applied.effort must
@@ -3757,7 +3768,7 @@ function runHeadlessStreaming(
const effort = modelSupportsEffort(model)
? resolveAppliedEffort(model, currentAppState.effortValue)
: undefined
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
...getSettingsWithSources(),
applied: {
model,
@@ -3765,22 +3776,22 @@ function runHeadlessStreaming(
effort: typeof effort === 'string' ? effort : null,
},
})
} else if (message.request.subtype === 'stop_task') {
const { task_id: taskId } = message.request
} else if (msg.request.subtype === 'stop_task') {
const { task_id: taskId } = msg.request
try {
await stopTask(taskId, {
getAppState,
setAppState,
})
sendControlResponseSuccess(message, {})
sendControlResponseSuccess(msg, {})
} catch (error) {
sendControlResponseError(message, errorMessage(error))
sendControlResponseError(msg, errorMessage(error))
}
} else if (message.request.subtype === 'generate_session_title') {
} else if (req.subtype === 'generate_session_title') {
// Fire-and-forget so the Haiku call does not block the stdin loop
// (which would delay processing of subsequent user messages /
// interrupts for the duration of the API roundtrip).
const { description, persist } = message.request
const { description, persist } = req
// Reuse the live controller only if it has not already been aborted
// (e.g. by interrupt()); an aborted signal would cause queryHaiku to
// immediately throw APIUserAbortError → {title: null}.
@@ -3799,16 +3810,16 @@ function runHeadlessStreaming(
logError(e)
}
}
sendControlResponseSuccess(message, { title })
sendControlResponseSuccess(msg, { title })
} catch (e) {
// Unreachable in practice — generateSessionTitle wraps its
// own body and returns null, saveAiGeneratedTitle is wrapped
// above. Propagate (not swallow) so unexpected failures are
// visible to the SDK caller (hostComms.ts catches and logs).
sendControlResponseError(message, errorMessage(e))
sendControlResponseError(msg, errorMessage(e))
}
})()
} else if (message.request.subtype === 'side_question') {
} else if (req.subtype === 'side_question') {
// Same fire-and-forget pattern as generate_session_title above —
// the forked agent's API roundtrip must not block the stdin loop.
//
@@ -3824,7 +3835,7 @@ function runHeadlessStreaming(
// matches in the common case. May still miss the cache for
// coordinator mode or memory-mechanics extras — acceptable, the
// alternative is the side question failing entirely.
const { question } = message.request
const { question } = req
void (async () => {
try {
const saved = getLastCacheSafeParams()
@@ -3863,16 +3874,16 @@ function runHeadlessStreaming(
question,
cacheSafeParams,
})
sendControlResponseSuccess(message, { response: result.response })
sendControlResponseSuccess(msg, { response: result.response })
} catch (e) {
sendControlResponseError(message, errorMessage(e))
sendControlResponseError(msg, errorMessage(e))
}
})()
} else if (
(feature('PROACTIVE') || feature('KAIROS')) &&
(message.request as { subtype: string }).subtype === 'set_proactive'
(msg.request as { subtype: string }).subtype === 'set_proactive'
) {
const req = message.request as unknown as {
const req = msg.request as unknown as {
subtype: string
enabled: boolean
}
@@ -3884,12 +3895,12 @@ function runHeadlessStreaming(
} else {
proactiveModule!.deactivateProactive()
}
sendControlResponseSuccess(message)
} else if (message.request.subtype === 'remote_control') {
if (message.request.enabled) {
sendControlResponseSuccess(msg)
} else if (req.subtype === 'remote_control') {
if (req.enabled as boolean) {
if (bridgeHandle) {
// Already connected
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
session_url: getRemoteSessionUrl(
bridgeHandle.bridgeSessionId,
bridgeHandle.sessionIngressUrl,
@@ -3972,7 +3983,7 @@ function runHeadlessStreaming(
})
if (!handle) {
sendControlResponseError(
message,
msg,
bridgeFailureDetail ??
'Remote Control initialization failed',
)
@@ -3988,7 +3999,7 @@ function runHeadlessStreaming(
structuredIO.setOnControlRequestResolved(requestId => {
handle.sendControlCancelRequest(requestId)
})
sendControlResponseSuccess(message, {
sendControlResponseSuccess(msg, {
session_url: getRemoteSessionUrl(
handle.bridgeSessionId,
handle.sessionIngressUrl,
@@ -4001,7 +4012,7 @@ function runHeadlessStreaming(
})
}
} catch (err) {
sendControlResponseError(message, errorMessage(err))
sendControlResponseError(msg, errorMessage(err))
}
}
} else {
@@ -4012,21 +4023,21 @@ function runHeadlessStreaming(
await bridgeHandle.teardown()
bridgeHandle = null
}
sendControlResponseSuccess(message)
sendControlResponseSuccess(msg)
}
} else {
// Unknown control request subtype — send an error response so
// the caller doesn't hang waiting for a reply that never comes.
sendControlResponseError(
message,
`Unsupported control request subtype: ${(message.request as { subtype: string }).subtype}`,
msg,
`Unsupported control request subtype: ${(msg.request as { subtype: string }).subtype}`,
)
}
continue
} else if (message.type === 'control_response') {
// Replay control_response messages when replay mode is enabled
if (options.replayUserMessages) {
output.enqueue(message)
output.enqueue(message as StdoutMessage)
}
continue
} else if (message.type === 'keep_alive') {
@@ -4038,11 +4049,11 @@ function runHeadlessStreaming(
} else if (message.type === 'assistant' || message.type === 'system') {
// History replay from bridge: inject into mutableMessages as
// conversation context so the model sees prior turns.
const internalMsgs = toInternalMessages([message])
const internalMsgs = toInternalMessages([message as SDKMessage])
mutableMessages.push(...internalMsgs)
// Echo assistant messages back so CCR displays them
if (message.type === 'assistant' && options.replayUserMessages) {
output.enqueue(message)
output.enqueue(message as StdoutMessage)
}
continue
}
@@ -4051,58 +4062,61 @@ function runHeadlessStreaming(
if (message.type !== 'user') {
continue
}
// Type assertion: after the type guard, message is a user message.
// The union with SDKMessage (any) prevents proper narrowing.
const userMsg = message as SDKUserMessage
// First prompt message implicitly initializes if not already done.
initialized = true
// Check for duplicate user message - skip if already processed
if (message.uuid) {
if (userMsg.uuid) {
const sessionId = getSessionId() as UUID
const existsInSession = await doesMessageExistInSession(
sessionId,
message.uuid,
userMsg.uuid as UUID,
)
// Check both historical duplicates (from file) and runtime duplicates (this session)
if (existsInSession || receivedMessageUuids.has(message.uuid)) {
logForDebugging(`Skipping duplicate user message: ${message.uuid}`)
if (existsInSession || receivedMessageUuids.has(userMsg.uuid as UUID)) {
logForDebugging(`Skipping duplicate user message: ${userMsg.uuid}`)
// Send acknowledgment for duplicate message if replay mode is enabled
if (options.replayUserMessages) {
logForDebugging(
`Sending acknowledgment for duplicate user message: ${message.uuid}`,
`Sending acknowledgment for duplicate user message: ${userMsg.uuid}`,
)
output.enqueue({
type: 'user',
content: message.message?.content ?? '',
message: message.message,
content: (userMsg.message as { content?: string })?.content ?? '',
message: userMsg.message as { role: string; content: unknown },
session_id: sessionId,
parent_tool_use_id: null,
uuid: message.uuid,
timestamp: message.timestamp,
uuid: userMsg.uuid as string,
timestamp: (userMsg as { timestamp?: string }).timestamp,
isReplay: true,
} as unknown as SDKUserMessageReplay)
} as unknown as SDKUserMessageReplay as StdoutMessage)
}
// Historical dup = transcript already has this turn's output, so it
// ran but its lifecycle was never closed (interrupted before ack).
// Runtime dups don't need this — the original enqueue path closes them.
if (existsInSession) {
notifyCommandLifecycle(message.uuid, 'completed')
notifyCommandLifecycle(userMsg.uuid as string, 'completed')
}
// Don't enqueue duplicate messages for execution
continue
}
// Track this UUID to prevent runtime duplicates
trackReceivedMessageUuid(message.uuid)
trackReceivedMessageUuid(userMsg.uuid as UUID)
}
enqueue({
mode: 'prompt' as const,
// file_attachments rides the protobuf catchall from the web composer.
// Same-ref no-op when absent (no 'file_attachments' key).
value: await resolveAndPrepend(message, message.message.content),
uuid: message.uuid,
priority: message.priority,
value: await resolveAndPrepend(userMsg, (userMsg.message as { content: ContentBlockParam[] }).content),
uuid: userMsg.uuid as `${string}-${string}-${string}-${string}-${string}`,
priority: (userMsg as { priority?: string }).priority as import('src/types/textInputTypes.js').QueuePriority,
})
// Increment prompt count for attribution tracking and save snapshot
// The snapshot persists promptCount so it survives compaction
@@ -4463,7 +4477,7 @@ async function handleInitializeRequest(
})),
output_style: outputStyle,
available_output_styles: Object.keys(availableOutputStyles),
models: modelInfos,
models: modelInfos as unknown as SDKControlInitializeResponse['models'],
account: {
email: accountInfo?.email,
organization: accountInfo?.organization,
@@ -4473,7 +4487,7 @@ async function handleInitializeRequest(
// getAccountInformation() returns undefined under 3P providers, so the
// other fields are all absent. apiProvider disambiguates "not logged
// in" (firstParty + tokenSource:none) from "3P, login not applicable".
apiProvider: getAPIProvider(),
apiProvider: getAPIProvider() as 'firstParty' | 'bedrock' | 'vertex' | 'foundry',
},
pid: process.pid,
}

View File

@@ -355,7 +355,7 @@ export class StructuredIO {
// Used by bridge session runner for auth token refresh
// (CLAUDE_CODE_SESSION_ACCESS_TOKEN) which must be readable
// by the REPL process itself, not just child Bash commands.
const variables = message.variables as Record<string, string>
const variables = message.variables ?? {}
const keys = Object.keys(variables)
for (const [key, value] of Object.entries(variables)) {
process.env[key] = value
@@ -377,7 +377,8 @@ export class StructuredIO {
if (uuid) {
notifyCommandLifecycle(uuid, 'completed')
}
const request = this.pendingRequests.get(message.response.request_id)
const resp = message.response as { request_id: string; subtype: string; response?: Record<string, unknown>; error?: string }
const request = this.pendingRequests.get(resp.request_id)
if (!request) {
// Check if this tool_use was already resolved through the normal
// permission flow. Duplicate control_response deliveries (e.g. from
@@ -385,8 +386,8 @@ export class StructuredIO {
// re-processing them would push duplicate assistant messages into
// the conversation, causing API 400 errors.
const responsePayload =
message.response.subtype === 'success'
? message.response.response
resp.subtype === 'success'
? resp.response
: undefined
const toolUseID = responsePayload?.toolUseID
if (
@@ -394,31 +395,31 @@ export class StructuredIO {
this.resolvedToolUseIds.has(toolUseID)
) {
logForDebugging(
`Ignoring duplicate control_response for already-resolved toolUseID=${toolUseID} request_id=${message.response.request_id}`,
`Ignoring duplicate control_response for already-resolved toolUseID=${toolUseID} request_id=${resp.request_id}`,
)
return undefined
}
if (this.unexpectedResponseCallback) {
await this.unexpectedResponseCallback(message)
await this.unexpectedResponseCallback(message as SDKControlResponse & { uuid?: string })
}
return undefined // Ignore responses for requests we don't know about
}
this.trackResolvedToolUseId(request.request)
this.pendingRequests.delete(message.response.request_id)
this.pendingRequests.delete(resp.request_id)
// Notify the bridge when the SDK consumer resolves a can_use_tool
// request, so it can cancel the stale permission prompt on claude.ai.
if (
(request.request.request as { subtype?: string }).subtype === 'can_use_tool' &&
this.onControlRequestResolved
) {
this.onControlRequestResolved(message.response.request_id)
this.onControlRequestResolved(resp.request_id)
}
if (message.response.subtype === 'error') {
request.reject(new Error(message.response.error))
if (resp.subtype === 'error') {
request.reject(new Error(resp.error ?? 'Unknown error'))
return undefined
}
const result = message.response.response
const result = resp.response
if (request.schema) {
try {
request.resolve(request.schema.parse(result))
@@ -454,9 +455,9 @@ export class StructuredIO {
if (message.type === 'assistant' || message.type === 'system') {
return message
}
if (message.message.role !== 'user') {
if ((message as { message?: { role?: string } }).message?.role !== 'user') {
exitWithMessage(
`Error: Expected message role 'user', got '${message.message.role}'`,
`Error: Expected message role 'user', got '${(message as { message?: { role?: string } }).message?.role}'`,
)
}
return message
@@ -678,7 +679,7 @@ export class StructuredIO {
{
subtype: 'hook_callback',
callback_id: callbackId,
input,
input: input as Parameters<HookCallback['callback']>[0],
tool_use_id: toolUseID || undefined,
},
hookJSONOutputSchema(),