import { feature } from 'bun:bundle' import { type FSWatcher, watch } from 'fs' import React, { useCallback, useEffect, useRef } from 'react' import { setMainLoopModelOverride } from '../bootstrap/state.js' import { type BridgePermissionCallbacks, type BridgePermissionResponse, parseBridgePermissionResponse, } from '../bridge/bridgePermissionCallbacks.js' import { handleRemoteInterrupt } from '../bridge/remoteInterruptHandling.js' import { isTranscriptResetResultReady, shouldDeferBridgeResult, } from '../bridge/bridgeResultScheduling.js' import { buildBridgeConnectUrl } from '../bridge/bridgeStatusUtil.js' import { extractInboundMessageFields } from '../bridge/inboundMessages.js' import type { BridgeState, ReplBridgeHandle } from '../bridge/replBridge.js' import { setReplBridgeHandle } from '../bridge/replBridgeHandle.js' import type { Command } from '../commands.js' import { getSlashCommandToolSkills, isBridgeSafeCommand } from '../commands.js' import { getRemoteSessionUrl } from '../constants/product.js' import { useNotifications } from '../context/notifications.js' import type { PermissionMode, SDKMessage, } from '../entrypoints/agentSdkTypes.js' import type { SDKControlResponse } from '../entrypoints/sdk/controlTypes.js' import { Text } from '@anthropic/ink' import { getFeatureValue_CACHED_MAY_BE_STALE } from '../services/analytics/growthbook.js' import { useAppState, useAppStateStore, useSetAppState, } from '../state/AppState.js' import type { Message } from '../types/message.js' import { getCwd } from '../utils/cwd.js' import { logForDebugging } from '../utils/debug.js' import { errorMessage } from '../utils/errors.js' import { enqueue } from '../utils/messageQueueManager.js' import { buildSystemInitMessage } from '../utils/messages/systemInit.js' import { createBridgeStatusMessage, createSystemMessage, } from '../utils/messages.js' import { buildTaskStateMessage, getTaskStateSnapshotKey, } from '../utils/taskStateMessage.js' import { getAutoModeUnavailableNotification, getAutoModeUnavailableReason, isAutoModeGateEnabled, isBypassPermissionsModeDisabled, transitionPermissionMode, } from '../utils/permissions/permissionSetup.js' import { getLeaderToolUseConfirmQueue } from '../utils/swarm/leaderPermissionBridge.js' import { getTaskListId, getTasksDir, listTasks, onTasksUpdated, } from '../utils/tasks.js' import { ContentBlockParam } from '@anthropic-ai/sdk/resources' const TASK_STATE_DEBOUNCE_MS = 50 const TASK_STATE_POLL_MS = 5000 /** How long after a failure before replBridgeEnabled is auto-cleared (stops retries). */ export const BRIDGE_FAILURE_DISMISS_MS = 10_000 /** * Max consecutive initReplBridge failures before the hook stops re-attempting * for the session lifetime. Guards against paths that flip replBridgeEnabled * back on after auto-disable (settings sync, /remote-control, config tool) * when the underlying OAuth is unrecoverable — each re-attempt is another * guaranteed 401 against POST /v1/environments/bridge. Datadog 2026-03-08: * top stuck client generated 2,879 × 401/day alone (17% of all 401s on the * route). */ const MAX_CONSECUTIVE_INIT_FAILURES = 3 /** * Hook that initializes an always-on bridge connection in the background * and writes new user/assistant messages to the bridge session. * * Silently skips if bridge is not enabled or user is not OAuth-authenticated. * * Watches AppState.replBridgeEnabled — when toggled off (via /config or footer), * the bridge is torn down. When toggled back on, it re-initializes. * * Inbound messages from claude.ai are injected into the REPL via queuedCommands. */ export function useReplBridge( messages: Message[], setMessages: (action: React.SetStateAction) => void, abortControllerRef: React.RefObject, commands: readonly Command[], mainLoopModel: string, ): { sendBridgeResult: () => void } { const handleRef = useRef(null) const teardownPromiseRef = useRef | undefined>(undefined) const lastWrittenIndexRef = useRef(0) const pendingResultAfterFlushRef = useRef(false) const transcriptResetPendingRef = useRef(false) // Tracks UUIDs already flushed as initial messages. Persists across // bridge reconnections so Bridge #2+ only sends new messages — sending // duplicate UUIDs causes the server to kill the WebSocket. const flushedUUIDsRef = useRef(new Set()) const failureTimeoutRef = useRef | undefined>( undefined, ) // Persists across effect re-runs (unlike the effect's local state). Reset // only on successful init. Hits MAX_CONSECUTIVE_INIT_FAILURES → fuse blown // for the session, regardless of replBridgeEnabled re-toggling. const consecutiveFailuresRef = useRef(0) const setAppState = useSetAppState() const commandsRef = useRef(commands) commandsRef.current = commands const mainLoopModelRef = useRef(mainLoopModel) mainLoopModelRef.current = mainLoopModel const messagesRef = useRef(messages) messagesRef.current = messages const store = useAppStateStore() const { addNotification } = useNotifications() const replBridgeEnabled = feature('BRIDGE_MODE') ? useAppState(s => s.replBridgeEnabled) : false const replBridgeConnected = feature('BRIDGE_MODE') ? useAppState(s => s.replBridgeConnected) : false const replBridgeSessionActive = feature('BRIDGE_MODE') ? useAppState(s => s.replBridgeSessionActive) : false const replBridgeOutboundOnly = feature('BRIDGE_MODE') ? useAppState(s => s.replBridgeOutboundOnly) : false const replBridgeInitialName = feature('BRIDGE_MODE') ? useAppState(s => s.replBridgeInitialName) : undefined // Initialize/teardown bridge when enabled state changes. // Passes current messages as initialMessages so the remote session // starts with the existing conversation context (e.g. from /bridge). useEffect(() => { // feature() check must use positive pattern for dead code elimination — // negative pattern (if (!feature(...)) return) does NOT eliminate // dynamic imports below. if (feature('BRIDGE_MODE')) { if (!replBridgeEnabled) return const outboundOnly = replBridgeOutboundOnly function notifyBridgeFailed(detail?: string): void { if (outboundOnly) return addNotification({ key: 'bridge-failed', jsx: ( <> Remote Control failed {detail && · {detail}} ), priority: 'immediate', }) } if (consecutiveFailuresRef.current >= MAX_CONSECUTIVE_INIT_FAILURES) { logForDebugging( `[bridge:repl] Hook: ${consecutiveFailuresRef.current} consecutive init failures, not retrying this session`, ) // Clear replBridgeEnabled so /remote-control doesn't mistakenly show // BridgeDisconnectDialog for a bridge that never connected. const fuseHint = 'disabled after repeated failures · restart to retry' notifyBridgeFailed(fuseHint) setAppState(prev => { if (prev.replBridgeError === fuseHint && !prev.replBridgeEnabled) return prev return { ...prev, replBridgeError: fuseHint, replBridgeEnabled: false, } }) return } let cancelled = false // Capture messages.length now so we don't re-send initial messages // through writeMessages after the bridge connects. const initialMessageCount = messages.length void (async () => { try { // Wait for any in-progress teardown to complete before registering // a new environment. Without this, the deregister HTTP call from // the previous teardown races with the new register call, and the // server may tear down the freshly-created environment. if (teardownPromiseRef.current) { logForDebugging( '[bridge:repl] Hook: waiting for previous teardown to complete before re-init', ) await teardownPromiseRef.current teardownPromiseRef.current = undefined logForDebugging( '[bridge:repl] Hook: previous teardown complete, proceeding with re-init', ) } if (cancelled) return // Dynamic import so the module is tree-shaken in external builds const { initReplBridge } = await import('../bridge/initReplBridge.js') const { shouldShowAppUpgradeMessage } = await import( '../bridge/envLessBridgeConfig.js' ) // Assistant mode: perpetual bridge session — claude.ai shows one // continuous conversation across CLI restarts instead of a new // session per invocation. initBridgeCore reads bridge-pointer.json // (the same crash-recovery file #20735 added) and reuses its // {environmentId, sessionId} via reuseEnvironmentId + // api.reconnectSession(). Teardown skips archive/deregister/ // pointer-clear so the session survives clean exits, not just // crashes. Non-assistant bridges clear the pointer on teardown // (crash-recovery only). let perpetual = false if (feature('KAIROS')) { const { isAssistantMode } = await import('../assistant/index.js') perpetual = isAssistantMode() } // When a user message arrives from claude.ai, inject it into the REPL. // Preserves the original UUID so that when the message is forwarded // back to CCR, it matches the original — avoiding duplicate messages. // // Async because file_attachments (if present) need a network fetch + // disk write before we enqueue with the @path prefix. Caller doesn't // await — messages with attachments just land in the queue slightly // later, which is fine (web messages aren't rapid-fire). async function handleInboundMessage(msg: SDKMessage): Promise { try { const fields = extractInboundMessageFields(msg) if (!fields) return const { uuid } = fields // Dynamic import keeps the bridge code out of non-BRIDGE_MODE builds. const { resolveAndPrepend } = await import( '../bridge/inboundAttachments.js' ) const rawContent = fields.content let sanitized: string | Array<{ type: string; [key: string]: unknown }> = typeof rawContent === 'string' ? rawContent : rawContent as unknown as Array<{ type: string; [key: string]: unknown }> if (feature('KAIROS_GITHUB_WEBHOOKS')) { /* eslint-disable @typescript-eslint/no-require-imports */ const { sanitizeInboundWebhookContent } = require('../bridge/webhookSanitizer.js') as typeof import('../bridge/webhookSanitizer.js') /* eslint-enable @typescript-eslint/no-require-imports */ if (typeof sanitized === 'string') { sanitized = sanitizeInboundWebhookContent(sanitized) } } const content = await resolveAndPrepend(msg, sanitized as string | ContentBlockParam[]) const preview = typeof content === 'string' ? content.slice(0, 80) : `[${content.length} content blocks]` logForDebugging( `[bridge:repl] Injecting inbound user message: ${preview}${uuid ? ` uuid=${uuid}` : ''}`, ) enqueue({ value: content, mode: 'prompt' as const, uuid, // skipSlashCommands stays true as defense-in-depth — // processUserInputBase overrides it internally when bridgeOrigin // is set AND the resolved command passes isBridgeSafeCommand. // This keeps exit-word suppression and immediate-command blocks // intact for any code path that checks skipSlashCommands directly. skipSlashCommands: true, bridgeOrigin: true, }) } catch (e) { logForDebugging( `[bridge:repl] handleInboundMessage failed: ${e}`, { level: 'error' }, ) } } // State change callback — maps bridge lifecycle events to AppState. function handleStateChange( state: BridgeState, detail?: string, ): void { if (cancelled) return if (outboundOnly) { logForDebugging( `[bridge:repl] Mirror state=${state}${detail ? ` detail=${detail}` : ''}`, ) // Sync replBridgeConnected so the forwarding effect starts/stops // writing as the transport comes up or dies. if (state === 'failed') { setAppState(prev => { if (!prev.replBridgeConnected) return prev return { ...prev, replBridgeConnected: false } }) } else if (state === 'ready' || state === 'connected') { setAppState(prev => { if (prev.replBridgeConnected) return prev return { ...prev, replBridgeConnected: true } }) } return } const handle = handleRef.current switch (state) { case 'ready': setAppState(prev => { const connectUrl = handle && handle.environmentId !== '' ? buildBridgeConnectUrl( handle.environmentId, handle.sessionIngressUrl, ) : prev.replBridgeConnectUrl const sessionUrl = handle ? getRemoteSessionUrl( handle.bridgeSessionId, handle.sessionIngressUrl, ) : prev.replBridgeSessionUrl const envId = handle?.environmentId const sessionId = handle?.bridgeSessionId if ( prev.replBridgeConnected && !prev.replBridgeSessionActive && !prev.replBridgeReconnecting && prev.replBridgeConnectUrl === connectUrl && prev.replBridgeSessionUrl === sessionUrl && prev.replBridgeEnvironmentId === envId && prev.replBridgeSessionId === sessionId ) { return prev } return { ...prev, replBridgeConnected: true, replBridgeSessionActive: false, replBridgeReconnecting: false, replBridgeConnectUrl: connectUrl, replBridgeSessionUrl: sessionUrl, replBridgeEnvironmentId: envId, replBridgeSessionId: sessionId, replBridgeError: undefined, } }) break case 'connected': { setAppState(prev => { if (prev.replBridgeSessionActive) return prev return { ...prev, replBridgeConnected: true, replBridgeSessionActive: true, replBridgeReconnecting: false, replBridgeError: undefined, } }) // Send system/init so remote clients (web/iOS/Android) get // session metadata. REPL uses query() directly — never hits // QueryEngine's SDKMessage layer — so this is the only path // to put system/init on the REPL-bridge wire. Skills load is // async (memoized, cheap after REPL startup); fire-and-forget // so the connected-state transition isn't blocked. if ( getFeatureValue_CACHED_MAY_BE_STALE( 'tengu_bridge_system_init', false, ) ) { void (async () => { try { const skills = await getSlashCommandToolSkills(getCwd()) if (cancelled) return const state = store.getState() handleRef.current?.writeSdkMessages([ buildSystemInitMessage({ // tools/mcpClients/plugins redacted for REPL-bridge: // MCP-prefixed tool names and server names leak which // integrations the user has wired up; plugin paths leak // raw filesystem paths (username, project structure). // CCR v2 persists SDK messages to Spanner — users who // tap "Connect from phone" may not expect these on // Anthropic's servers. QueryEngine (SDK) still emits // full lists — SDK consumers expect full telemetry. tools: [], mcpClients: [], model: mainLoopModelRef.current, permissionMode: state.toolPermissionContext .mode as PermissionMode, // TODO: avoid the cast // Remote clients can only invoke bridge-safe commands — // advertising unsafe ones (local-jsx, unallowed local) // would let mobile/web attempt them and hit errors. commands: commandsRef.current.filter(isBridgeSafeCommand), agents: state.agentDefinitions.activeAgents, skills, plugins: [], fastMode: state.fastMode, }), ]) } catch (err) { logForDebugging( `[bridge:repl] Failed to send system/init: ${errorMessage(err)}`, { level: 'error' }, ) } })() } break } case 'reconnecting': setAppState(prev => { if (prev.replBridgeReconnecting) return prev return { ...prev, replBridgeReconnecting: true, replBridgeSessionActive: false, } }) break case 'failed': // Clear any previous failure dismiss timer clearTimeout(failureTimeoutRef.current) notifyBridgeFailed(detail) setAppState(prev => ({ ...prev, replBridgeError: detail, replBridgeReconnecting: false, replBridgeSessionActive: false, replBridgeConnected: false, })) // Auto-disable after timeout so the hook stops retrying. failureTimeoutRef.current = setTimeout(() => { if (cancelled) return failureTimeoutRef.current = undefined setAppState(prev => { if (!prev.replBridgeError) return prev return { ...prev, replBridgeEnabled: false, replBridgeError: undefined, } }) }, BRIDGE_FAILURE_DISMISS_MS) break } } // Map of pending bridge permission response handlers, keyed by request_id. // Each entry is an onResponse handler waiting for CCR to reply. const pendingPermissionHandlers = new Map< string, (response: BridgePermissionResponse) => void >() // Dispatch incoming control_response messages to registered handlers function handlePermissionResponse(msg: SDKControlResponse): void { const requestId = msg.response?.request_id if (!requestId) return const handler = pendingPermissionHandlers.get(requestId) if (!handler) { logForDebugging( `[bridge:repl] No handler for control_response request_id=${requestId}`, ) return } const parsed = parseBridgePermissionResponse(msg) if (!parsed) { logForDebugging( `[bridge:repl] Ignoring unrecognized control_response request_id=${requestId}`, ) return } pendingPermissionHandlers.delete(requestId) handler(parsed) } const rawHandle = await initReplBridge({ outboundOnly, tags: outboundOnly ? ['ccr-mirror'] : undefined, onInboundMessage: handleInboundMessage, onPermissionResponse: handlePermissionResponse, onInterrupt() { handleRemoteInterrupt(abortControllerRef.current) }, onSetModel(model) { const resolved = model === 'default' ? null : (model ?? null) setMainLoopModelOverride(resolved) setAppState(prev => { if (prev.mainLoopModelForSession === resolved) return prev return { ...prev, mainLoopModelForSession: resolved } }) }, onSetMaxThinkingTokens(maxTokens) { const enabled = maxTokens !== null setAppState(prev => { if (prev.thinkingEnabled === enabled) return prev return { ...prev, thinkingEnabled: enabled } }) }, onSetPermissionMode(mode) { // Policy guards MUST fire before transitionPermissionMode — // its internal auto-gate check is a defensive throw (with a // setAutoModeActive(true) side-effect BEFORE the throw) rather // than a graceful reject. Letting that throw escape would: // (1) leave STATE.autoModeActive=true while the mode is // unchanged (3-way invariant violation per src/CLAUDE.md) // (2) fail to send a control_response → server kills WS // These mirror print.ts handleSetPermissionMode; the bridge // can't import the checks directly (bootstrap-isolation), so // it relies on this verdict to emit the error response. if (mode === 'bypassPermissions') { if (isBypassPermissionsModeDisabled()) { return { ok: false, error: 'Cannot set permission mode to bypassPermissions because it is disabled by settings or configuration', } } if ( !store.getState().toolPermissionContext .isBypassPermissionsModeAvailable ) { return { ok: false, error: 'Cannot set permission mode to bypassPermissions because the session was not launched with --dangerously-skip-permissions', } } } if ( feature('TRANSCRIPT_CLASSIFIER') && mode === 'auto' && !isAutoModeGateEnabled() ) { const reason = getAutoModeUnavailableReason() return { ok: false, error: reason ? `Cannot set permission mode to auto: ${getAutoModeUnavailableNotification(reason)}` : 'Cannot set permission mode to auto', } } // Guards passed — apply via the centralized transition so // prePlanMode stashing and auto-mode state sync all fire. setAppState(prev => { const current = prev.toolPermissionContext.mode if (current === mode) return prev const next = transitionPermissionMode( current, mode, prev.toolPermissionContext, ) return { ...prev, toolPermissionContext: { ...next, mode }, } }) // Recheck queued permission prompts now that mode changed. setImmediate(() => { getLeaderToolUseConfirmQueue()?.(currentQueue => { currentQueue.forEach(item => { void item.recheckPermission() }) return currentQueue }) }) return { ok: true } }, onStateChange: handleStateChange, initialMessages: messages.length > 0 ? messages : undefined, getMessages: () => messagesRef.current, previouslyFlushedUUIDs: flushedUUIDsRef.current, initialName: replBridgeInitialName, perpetual, }) const handle = rawHandle ? { ...rawHandle, markTranscriptReset() { transcriptResetPendingRef.current = true pendingResultAfterFlushRef.current = false lastWrittenIndexRef.current = 0 }, } : null if (cancelled) { // Effect was cancelled while initReplBridge was in flight. // Tear down the handle to avoid leaking resources (poll loop, // WebSocket, registered environment, cleanup callback). logForDebugging( `[bridge:repl] Hook: init cancelled during flight, tearing down${handle ? ` env=${handle.environmentId}` : ''}`, ) if (handle) { void handle.teardown() } return } if (!handle) { // initReplBridge returned null — a precondition failed. For most // cases (no_oauth, policy_denied, etc.) onStateChange('failed') // already fired with a specific hint. The GrowthBook-gate-off case // is intentionally silent — not a failure, just not rolled out. consecutiveFailuresRef.current++ logForDebugging( `[bridge:repl] Init returned null (precondition or session creation failed); consecutive failures: ${consecutiveFailuresRef.current}`, ) clearTimeout(failureTimeoutRef.current) setAppState(prev => ({ ...prev, replBridgeError: prev.replBridgeError ?? 'check debug logs for details', })) failureTimeoutRef.current = setTimeout(() => { if (cancelled) return failureTimeoutRef.current = undefined setAppState(prev => { if (!prev.replBridgeError) return prev return { ...prev, replBridgeEnabled: false, replBridgeError: undefined, } }) }, BRIDGE_FAILURE_DISMISS_MS) return } handleRef.current = handle setReplBridgeHandle(handle) consecutiveFailuresRef.current = 0 // Skip initial messages in the forwarding effect — they were // already loaded as session events during creation. lastWrittenIndexRef.current = initialMessageCount if (outboundOnly) { setAppState(prev => { if ( prev.replBridgeConnected && prev.replBridgeSessionId === handle.bridgeSessionId ) return prev return { ...prev, replBridgeConnected: true, replBridgeSessionId: handle.bridgeSessionId, replBridgeSessionUrl: undefined, replBridgeConnectUrl: undefined, replBridgeError: undefined, } }) logForDebugging( `[bridge:repl] Mirror initialized, session=${handle.bridgeSessionId}`, ) } else { // Build bridge permission callbacks so the interactive permission // handler can race bridge responses against local user interaction. const permissionCallbacks: BridgePermissionCallbacks = { sendRequest( requestId, toolName, input, toolUseId, description, permissionSuggestions, blockedPath, ) { handle.sendControlRequest({ type: 'control_request', request_id: requestId, request: { subtype: 'can_use_tool', tool_name: toolName, input, tool_use_id: toolUseId, description, ...(permissionSuggestions ? { permission_suggestions: permissionSuggestions } : {}), ...(blockedPath ? { blocked_path: blockedPath } : {}), }, }) }, sendResponse(requestId, response) { const payload: Record = { ...response } handle.sendControlResponse({ type: 'control_response', response: { subtype: 'success', request_id: requestId, response: payload, }, }) }, cancelRequest(requestId) { handle.sendControlCancelRequest(requestId) }, onResponse(requestId, handler) { pendingPermissionHandlers.set(requestId, handler) return () => { pendingPermissionHandlers.delete(requestId) } }, } setAppState(prev => ({ ...prev, replBridgePermissionCallbacks: permissionCallbacks, })) const url = getRemoteSessionUrl( handle.bridgeSessionId, handle.sessionIngressUrl, ) // environmentId === '' signals the v2 env-less path. buildBridgeConnectUrl // builds an env-specific connect URL, which doesn't exist without an env. const hasEnv = handle.environmentId !== '' const connectUrl = hasEnv ? buildBridgeConnectUrl( handle.environmentId, handle.sessionIngressUrl, ) : undefined setAppState(prev => { if ( prev.replBridgeConnected && prev.replBridgeSessionUrl === url ) { return prev } return { ...prev, replBridgeConnected: true, replBridgeSessionUrl: url, replBridgeConnectUrl: connectUrl ?? prev.replBridgeConnectUrl, replBridgeEnvironmentId: handle.environmentId, replBridgeSessionId: handle.bridgeSessionId, replBridgeError: undefined, } }) // Show bridge status with URL in the transcript. perpetual (KAIROS // assistant mode) falls back to v1 at initReplBridge.ts — skip the // v2-only upgrade nudge for them. Own try/catch so a cosmetic // GrowthBook hiccup doesn't hit the outer init-failure handler. const upgradeNudge = !perpetual ? await shouldShowAppUpgradeMessage().catch(() => false) : false if (cancelled) return setMessages(prev => [ ...prev, createBridgeStatusMessage( url, upgradeNudge ? 'Please upgrade to the latest version of the Claude mobile app to see your Remote Control sessions.' : undefined, ), ]) logForDebugging( `[bridge:repl] Hook initialized, session=${handle.bridgeSessionId}`, ) } } catch (err) { // Never crash the REPL — surface the error in the UI. // Check cancelled first (symmetry with the !handle path at line ~386): // if initReplBridge threw during rapid toggle-off (in-flight network // error), don't count that toward the fuse or spam a stale error // into the UI. Also fixes pre-existing spurious setAppState/ // setMessages on cancelled throws. if (cancelled) return consecutiveFailuresRef.current++ const errMsg = errorMessage(err) logForDebugging( `[bridge:repl] Init failed: ${errMsg}; consecutive failures: ${consecutiveFailuresRef.current}`, ) clearTimeout(failureTimeoutRef.current) notifyBridgeFailed(errMsg) setAppState(prev => ({ ...prev, replBridgeError: errMsg, })) failureTimeoutRef.current = setTimeout(() => { if (cancelled) return failureTimeoutRef.current = undefined setAppState(prev => { if (!prev.replBridgeError) return prev return { ...prev, replBridgeEnabled: false, replBridgeError: undefined, } }) }, BRIDGE_FAILURE_DISMISS_MS) if (!outboundOnly) { setMessages(prev => [ ...prev, createSystemMessage( `Remote Control failed to connect: ${errMsg}`, 'warning', ), ]) } } })() return () => { cancelled = true clearTimeout(failureTimeoutRef.current) failureTimeoutRef.current = undefined if (handleRef.current) { logForDebugging( `[bridge:repl] Hook cleanup: starting teardown for env=${handleRef.current.environmentId} session=${handleRef.current.bridgeSessionId}`, ) teardownPromiseRef.current = handleRef.current.teardown() handleRef.current = null setReplBridgeHandle(null) } setAppState(prev => { if ( !prev.replBridgeConnected && !prev.replBridgeSessionActive && !prev.replBridgeError ) { return prev } return { ...prev, replBridgeConnected: false, replBridgeSessionActive: false, replBridgeReconnecting: false, replBridgeConnectUrl: undefined, replBridgeSessionUrl: undefined, replBridgeEnvironmentId: undefined, replBridgeSessionId: undefined, replBridgeError: undefined, replBridgePermissionCallbacks: undefined, } }) lastWrittenIndexRef.current = 0 pendingResultAfterFlushRef.current = false transcriptResetPendingRef.current = false } } }, [ replBridgeEnabled, replBridgeOutboundOnly, setAppState, setMessages, addNotification, ]) // Write new messages as they appear. // Also re-runs when replBridgeConnected changes (bridge finishes init), // so any messages that arrived before the bridge was ready get written. useEffect(() => { // Positive feature() guard — see first useEffect comment if (feature('BRIDGE_MODE')) { if (!replBridgeConnected) return const handle = handleRef.current if (!handle) return // Clamp the index in case messages were compacted (array shortened). // After compaction the ref could exceed messages.length, and without // clamping no new messages would be forwarded. if (lastWrittenIndexRef.current > messages.length) { logForDebugging( `[bridge:repl] Compaction detected: lastWrittenIndex=${lastWrittenIndexRef.current} > messages.length=${messages.length}, clamping`, ) } const startIndex = Math.min(lastWrittenIndexRef.current, messages.length) // Collect new messages since last write const newMessages: Message[] = [] for (let i = startIndex; i < messages.length; i++) { const msg = messages[i] if ( msg && (msg.type === 'user' || msg.type === 'assistant' || (msg.type === 'system' && msg.subtype === 'local_command')) ) { newMessages.push(msg) } } lastWrittenIndexRef.current = messages.length if (newMessages.length > 0) { handle.writeMessages(newMessages) transcriptResetPendingRef.current = false } if ( pendingResultAfterFlushRef.current && isTranscriptResetResultReady( transcriptResetPendingRef.current, messages.length, ) ) { transcriptResetPendingRef.current = false pendingResultAfterFlushRef.current = false handle.sendResult() return } if ( pendingResultAfterFlushRef.current && !transcriptResetPendingRef.current ) { pendingResultAfterFlushRef.current = false handle.sendResult() } } }, [messages, replBridgeConnected]) useEffect(() => { if (feature('BRIDGE_MODE')) { if (!replBridgeSessionActive || replBridgeOutboundOnly) return let cancelled = false let debounceTimer: ReturnType | undefined let pollTimer: ReturnType | undefined let watcher: FSWatcher | null = null let watchedDir: string | null = null let lastPublishedSnapshotKey: string | null = null let lastPublishedHandle: ReplBridgeHandle | null = null const rewatch = (dir: string): void => { if (dir === watchedDir && watcher !== null) return watcher?.close() watcher = null watchedDir = dir try { watcher = watch(dir, schedulePublish) watcher.unref() } catch { // Writers ensure the directory exists; if it does not yet, the // poll timer and in-process task signal still converge the snapshot. } } const publishTaskState = async (): Promise => { const handle = handleRef.current if (!handle) return const taskListId = getTaskListId() rewatch(getTasksDir(taskListId)) try { const tasks = await listTasks(taskListId) if (cancelled || handleRef.current !== handle) return const snapshotKey = getTaskStateSnapshotKey(taskListId, tasks) if ( snapshotKey === lastPublishedSnapshotKey && handle === lastPublishedHandle ) { return } handle.writeSdkMessages([buildTaskStateMessage(taskListId, tasks)]) lastPublishedSnapshotKey = snapshotKey lastPublishedHandle = handle } catch (err) { logForDebugging( `[bridge:repl] Failed to publish task_state: ${errorMessage(err)}`, { level: 'error' }, ) } } const schedulePublish = (): void => { if (debounceTimer) clearTimeout(debounceTimer) debounceTimer = setTimeout(() => { debounceTimer = undefined void publishTaskState() }, TASK_STATE_DEBOUNCE_MS) debounceTimer.unref?.() } void publishTaskState() const unsubscribe = onTasksUpdated(schedulePublish) pollTimer = setInterval(() => { void publishTaskState() }, TASK_STATE_POLL_MS) pollTimer.unref?.() return () => { cancelled = true unsubscribe() if (debounceTimer) clearTimeout(debounceTimer) if (pollTimer) clearInterval(pollTimer) watcher?.close() } } }, [replBridgeSessionActive, replBridgeOutboundOnly]) const sendBridgeResult = useCallback(() => { if (feature('BRIDGE_MODE')) { const handle = handleRef.current if (!handle) { pendingResultAfterFlushRef.current = true return } if ( isTranscriptResetResultReady( transcriptResetPendingRef.current, messagesRef.current.length, ) ) { transcriptResetPendingRef.current = false pendingResultAfterFlushRef.current = false handle.sendResult() return } // Message mirroring happens in a separate effect. When the turn completes // before that effect flushes the latest transcript rows, hold the result // so remote state transitions after the final mirrored messages instead // of bouncing back to "running" on local slash commands like /clear. if ( transcriptResetPendingRef.current || shouldDeferBridgeResult({ hasHandle: true, isConnected: replBridgeConnected, lastWrittenIndex: lastWrittenIndexRef.current, messageCount: messagesRef.current.length, }) ) { pendingResultAfterFlushRef.current = true return } handle.sendResult() } }, [replBridgeConnected]) return { sendBridgeResult } }