feat(remote-control): 优化 Web 展示、状态同步与桥接控制流程 (#288)

Co-authored-by: chengzifeng <chengzifeng@meituan.com>
This commit is contained in:
Cheng Zi Feng
2026-04-17 16:21:27 +08:00
committed by GitHub
parent b5c299f5d2
commit 72a2093cd6
64 changed files with 4138 additions and 312 deletions

View File

@@ -1,11 +1,17 @@
import { feature } from 'bun:bundle'
import { type FSWatcher, watch } from 'fs'
import React, { useCallback, useEffect, useRef } from 'react'
import { setMainLoopModelOverride } from '../bootstrap/state.js'
import {
type BridgePermissionCallbacks,
type BridgePermissionResponse,
isBridgePermissionResponse,
parseBridgePermissionResponse,
} from '../bridge/bridgePermissionCallbacks.js'
import { handleRemoteInterrupt } from '../bridge/remoteInterruptHandling.js'
import {
isTranscriptResetResultReady,
shouldDeferBridgeResult,
} from '../bridge/bridgeResultScheduling.js'
import { buildBridgeConnectUrl } from '../bridge/bridgeStatusUtil.js'
import { extractInboundMessageFields } from '../bridge/inboundMessages.js'
import type { BridgeState, ReplBridgeHandle } from '../bridge/replBridge.js'
@@ -36,6 +42,10 @@ import {
createBridgeStatusMessage,
createSystemMessage,
} from '../utils/messages.js'
import {
buildTaskStateMessage,
getTaskStateSnapshotKey,
} from '../utils/taskStateMessage.js'
import {
getAutoModeUnavailableNotification,
getAutoModeUnavailableReason,
@@ -44,8 +54,17 @@ import {
transitionPermissionMode,
} from '../utils/permissions/permissionSetup.js'
import { getLeaderToolUseConfirmQueue } from '../utils/swarm/leaderPermissionBridge.js'
import {
getTaskListId,
getTasksDir,
listTasks,
onTasksUpdated,
} from '../utils/tasks.js'
import { ContentBlockParam } from '@anthropic-ai/sdk/resources'
const TASK_STATE_DEBOUNCE_MS = 50
const TASK_STATE_POLL_MS = 5000
/** How long after a failure before replBridgeEnabled is auto-cleared (stops retries). */
export const BRIDGE_FAILURE_DISMISS_MS = 10_000
@@ -81,6 +100,8 @@ export function useReplBridge(
const handleRef = useRef<ReplBridgeHandle | null>(null)
const teardownPromiseRef = useRef<Promise<void> | undefined>(undefined)
const lastWrittenIndexRef = useRef(0)
const pendingResultAfterFlushRef = useRef(false)
const transcriptResetPendingRef = useRef(false)
// Tracks UUIDs already flushed as initial messages. Persists across
// bridge reconnections so Bridge #2+ only sends new messages — sending
// duplicate UUIDs causes the server to kill the WebSocket.
@@ -109,6 +130,10 @@ export function useReplBridge(
? // biome-ignore lint/correctness/useHookAtTopLevel: feature() is a compile-time constant
useAppState(s => s.replBridgeConnected)
: false
const replBridgeSessionActive = feature('BRIDGE_MODE')
? // biome-ignore lint/correctness/useHookAtTopLevel: feature() is a compile-time constant
useAppState(s => s.replBridgeSessionActive)
: false
const replBridgeOutboundOnly = feature('BRIDGE_MODE')
? // biome-ignore lint/correctness/useHookAtTopLevel: feature() is a compile-time constant
useAppState(s => s.replBridgeOutboundOnly)
@@ -454,25 +479,24 @@ export function useReplBridge(
)
return
}
pendingPermissionHandlers.delete(requestId)
// Extract the permission decision from the control_response payload
const inner = msg.response
if (
inner.subtype === 'success' &&
inner.response &&
isBridgePermissionResponse(inner.response)
) {
handler(inner.response)
const parsed = parseBridgePermissionResponse(msg)
if (!parsed) {
logForDebugging(
`[bridge:repl] Ignoring unrecognized control_response request_id=${requestId}`,
)
return
}
pendingPermissionHandlers.delete(requestId)
handler(parsed)
}
const handle = await initReplBridge({
const rawHandle = await initReplBridge({
outboundOnly,
tags: outboundOnly ? ['ccr-mirror'] : undefined,
onInboundMessage: handleInboundMessage,
onPermissionResponse: handlePermissionResponse,
onInterrupt() {
abortControllerRef.current?.abort()
handleRemoteInterrupt(abortControllerRef.current)
},
onSetModel(model) {
const resolved = model === 'default' ? null : (model ?? null)
@@ -565,6 +589,16 @@ export function useReplBridge(
initialName: replBridgeInitialName,
perpetual,
})
const handle = rawHandle
? {
...rawHandle,
markTranscriptReset() {
transcriptResetPendingRef.current = true
pendingResultAfterFlushRef.current = false
lastWrittenIndexRef.current = 0
},
}
: null
if (cancelled) {
// Effect was cancelled while initReplBridge was in flight.
// Tear down the handle to avoid leaking resources (poll loop,
@@ -816,6 +850,8 @@ export function useReplBridge(
}
})
lastWrittenIndexRef.current = 0
pendingResultAfterFlushRef.current = false
transcriptResetPendingRef.current = false
}
}
}, [
@@ -864,15 +900,152 @@ export function useReplBridge(
if (newMessages.length > 0) {
handle.writeMessages(newMessages)
transcriptResetPendingRef.current = false
}
if (
pendingResultAfterFlushRef.current &&
isTranscriptResetResultReady(
transcriptResetPendingRef.current,
messages.length,
)
) {
transcriptResetPendingRef.current = false
pendingResultAfterFlushRef.current = false
handle.sendResult()
return
}
if (
pendingResultAfterFlushRef.current &&
!transcriptResetPendingRef.current
) {
pendingResultAfterFlushRef.current = false
handle.sendResult()
}
}
}, [messages, replBridgeConnected])
useEffect(() => {
if (feature('BRIDGE_MODE')) {
if (!replBridgeSessionActive || replBridgeOutboundOnly) return
let cancelled = false
let debounceTimer: ReturnType<typeof setTimeout> | undefined
let pollTimer: ReturnType<typeof setInterval> | undefined
let watcher: FSWatcher | null = null
let watchedDir: string | null = null
let lastPublishedSnapshotKey: string | null = null
let lastPublishedHandle: ReplBridgeHandle | null = null
const rewatch = (dir: string): void => {
if (dir === watchedDir && watcher !== null) return
watcher?.close()
watcher = null
watchedDir = dir
try {
watcher = watch(dir, schedulePublish)
watcher.unref()
} catch {
// Writers ensure the directory exists; if it does not yet, the
// poll timer and in-process task signal still converge the snapshot.
}
}
const publishTaskState = async (): Promise<void> => {
const handle = handleRef.current
if (!handle) return
const taskListId = getTaskListId()
rewatch(getTasksDir(taskListId))
try {
const tasks = await listTasks(taskListId)
if (cancelled || handleRef.current !== handle) return
const snapshotKey = getTaskStateSnapshotKey(taskListId, tasks)
if (
snapshotKey === lastPublishedSnapshotKey &&
handle === lastPublishedHandle
) {
return
}
handle.writeSdkMessages([buildTaskStateMessage(taskListId, tasks)])
lastPublishedSnapshotKey = snapshotKey
lastPublishedHandle = handle
} catch (err) {
logForDebugging(
`[bridge:repl] Failed to publish task_state: ${errorMessage(err)}`,
{ level: 'error' },
)
}
}
const schedulePublish = (): void => {
if (debounceTimer) clearTimeout(debounceTimer)
debounceTimer = setTimeout(() => {
debounceTimer = undefined
void publishTaskState()
}, TASK_STATE_DEBOUNCE_MS)
debounceTimer.unref?.()
}
void publishTaskState()
const unsubscribe = onTasksUpdated(schedulePublish)
pollTimer = setInterval(() => {
void publishTaskState()
}, TASK_STATE_POLL_MS)
pollTimer.unref?.()
return () => {
cancelled = true
unsubscribe()
if (debounceTimer) clearTimeout(debounceTimer)
if (pollTimer) clearInterval(pollTimer)
watcher?.close()
}
}
}, [replBridgeSessionActive, replBridgeOutboundOnly])
const sendBridgeResult = useCallback(() => {
if (feature('BRIDGE_MODE')) {
handleRef.current?.sendResult()
const handle = handleRef.current
if (!handle) {
pendingResultAfterFlushRef.current = true
return
}
if (
isTranscriptResetResultReady(
transcriptResetPendingRef.current,
messagesRef.current.length,
)
) {
transcriptResetPendingRef.current = false
pendingResultAfterFlushRef.current = false
handle.sendResult()
return
}
// Message mirroring happens in a separate effect. When the turn completes
// before that effect flushes the latest transcript rows, hold the result
// so remote state transitions after the final mirrored messages instead
// of bouncing back to "running" on local slash commands like /clear.
if (
transcriptResetPendingRef.current ||
shouldDeferBridgeResult({
hasHandle: true,
isConnected: replBridgeConnected,
lastWrittenIndex: lastWrittenIndexRef.current,
messageCount: messagesRef.current.length,
})
) {
pendingResultAfterFlushRef.current = true
return
}
handle.sendResult()
}
}, [])
}, [replBridgeConnected])
return { sendBridgeResult }
}