feat: 远程群控 (#243)

* feat: restore pipe IPC, LAN pipes, monitor tool, and PR-package features

Core IPC system (UDS_INBOX):
- PipeServer/PipeClient with UDS + TCP dual transport, NDJSON protocol
- PipeRegistry: machineId-based role assignment, file locking
- Master/slave attach, prompt relay, permission forwarding
- Heartbeat lifecycle with parallel isPipeAlive probes
- Commands: /pipes, /attach, /detach, /send, /claim-main, /pipe-status

LAN Pipes (LAN_PIPES):
- UDP multicast beacon (224.0.71.67:7101) for zero-config LAN discovery
- PipeServer TCP listener, PipeClient TCP connect mode
- Heartbeat auto-attaches LAN peers via TCP
- Cross-machine attach allowed regardless of role
- /pipes shows [LAN] peers with role + hostname/IP
- SendMessageTool supports tcp: scheme with user consent

Architecture — extracted hooks from REPL.tsx (~830 lines → ~20 lines):
- usePipeIpc: lifecycle (bootstrap, handlers, heartbeat, cleanup)
- usePipeRelay: slave→master message relay via module singleton
- usePipePermissionForward: permission request/cancel forwarding
- usePipeRouter: selected pipe input routing with role+IP labels
- Shared ndjsonFramer.ts replaces 3 duplicate NDJSON parsers

Key fixes applied during development:
- Multicast binds to correct LAN interface (not WSL/Docker)
- Beacon ref stored as module singleton (not Zustand state mutation)
- Heartbeat preserves LAN peers in discoveredPipes and selectedPipes
- Disconnect handler calls removeSlaveClient (fixes listener leak)
- cleanupStaleEntries probes without lock, writes briefly under lock
- getMachineId uses async execFile (not blocking execSync)
- globalThis.__pipeSendToMaster replaced with setPipeRelay singleton
- M key only toggles route mode when selector panel is expanded
- User prompt displayed in message list on pipe broadcast
- Broadcast notifications show [role] + hostname/IP for LAN peers

Other restored features:
- Monitor tool: /monitor command, MonitorTool, MonitorMcpTask lifecycle
- Daemon supervisor and remoteControlServer command
- Tools: SnipTool, SleepTool, ListPeersTool, SendUserFileTool,
  WebBrowserTool, WorkflowTool, and 10+ stub→implementation rewrites
- Feature flags: UDS_INBOX, LAN_PIPES, MONITOR_TOOL, FORK_SUBAGENT,
  KAIROS, COORDINATOR_MODE, WORKFLOW_SCRIPTS, HISTORY_SNIP

Tests: 2190 pass / 0 fail (15 new: lanBeacon 7, peerAddress 8)

* fix: resolve merge conflicts and fix all tsc/test errors after main merge

- Export ToolResultBlockParam from Tool.ts (14 tool files fixed)
- Migrate ink imports from ../../ink.js to @anthropic/ink (7 files)
- Fix toolUseID → toolUseId typo in monitor.ts and MonitorTool.tsx
- Add fallback values for string|undefined type errors (8 locations)
- Fix AppState type in assistant.ts, add NewInstallWizard stubs
- Fix ParsedRepository.repo → .name in subscribe-pr.ts
- Fix AgentId/string type mismatch in BackgroundTasksDialog.tsx
- Fix PipeRelayFn return type in pipePermissionRelay.ts
- Use PipeMessage type in usePipeRelay.ts
- Fix lanBeacon.test.ts mock type assertions
- Create missing MouseActionEvent class for ink package
- Use ansi: color format instead of bare "green"/"red"
- Resolve theme.permission access via getTheme()

Result: 0 tsc errors, 2496 tests pass, 0 fail

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: 恢复 /poor 的说明

---------

Co-authored-by: unraid <local@unraid.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
claude-code-best
2026-04-11 23:22:55 +08:00
committed by GitHub
parent 2fea429dc6
commit 09fc515edb
124 changed files with 10958 additions and 577 deletions

View File

@@ -0,0 +1,116 @@
import { afterEach, describe, expect, test } from 'bun:test'
import {
addSlaveClient,
applyPipeEntryToSlaveState,
getConnectedSlaveTargets,
resetSlaveClientsForTesting,
subscribePipeEntries,
} from '../useMasterMonitor.js'
afterEach(() => {
resetSlaveClientsForTesting()
})
describe('useMasterMonitor registry helpers', () => {
test('returns only attached and connected targets from a selection list', () => {
addSlaveClient('cli-a', { connected: true } as any)
addSlaveClient('cli-b', { connected: false } as any)
const targets = getConnectedSlaveTargets(['cli-a', 'cli-b', 'cli-c'])
expect(targets).toHaveLength(1)
expect(targets[0]?.name).toBe('cli-a')
expect(targets[0]?.client.connected).toBe(true)
})
test('returns an empty array when no selected targets are connected', () => {
addSlaveClient('cli-a', { connected: false } as any)
expect(getConnectedSlaveTargets(['cli-a', 'cli-missing'])).toEqual([])
})
test('applies prompt_ack as busy activity with a summary', () => {
const next = applyPipeEntryToSlaveState(
{
name: 'cli-a',
connectedAt: '2026-04-08T00:00:00.000Z',
status: 'idle',
unreadCount: 0,
history: [],
},
{
type: 'prompt_ack',
content: 'accepted',
from: 'cli-a',
timestamp: '2026-04-08T00:00:01.000Z',
},
)
expect(next.status).toBe('busy')
expect(next.lastEventType).toBe('prompt_ack')
expect(next.lastSummary).toBe('accepted')
expect(next.unreadCount).toBe(1)
})
test('applies done and error entries to terminal slave states', () => {
const doneState = applyPipeEntryToSlaveState(
{
name: 'cli-a',
connectedAt: '2026-04-08T00:00:00.000Z',
status: 'busy',
unreadCount: 1,
history: [],
},
{
type: 'done',
content: 'completed',
from: 'cli-a',
timestamp: '2026-04-08T00:00:02.000Z',
},
)
expect(doneState.status).toBe('idle')
expect(doneState.lastSummary).toBe('completed')
const errorState = applyPipeEntryToSlaveState(doneState, {
type: 'error',
content: 'failed',
from: 'cli-a',
timestamp: '2026-04-08T00:00:03.000Z',
})
expect(errorState.status).toBe('error')
expect(errorState.lastEventType).toBe('error')
expect(errorState.lastSummary).toBe('failed')
expect(errorState.unreadCount).toBe(3)
})
test('emits pipe entries immediately when connected clients receive messages', () => {
const handlers = new Map<string, (msg: any) => void>()
const client = {
connected: true,
on(event: string, handler: (msg: any) => void) {
handlers.set(event, handler)
},
removeListener(event: string) {
handlers.delete(event)
},
}
const seen: Array<{ name: string; type: string; content: string }> = []
const unsubscribe = subscribePipeEntries((name, entry) => {
seen.push({ name, type: entry.type, content: entry.content })
})
addSlaveClient('cli-a', client as any)
handlers.get('message')?.({
type: 'stream',
data: 'hello',
from: 'cli-a',
ts: '2026-04-08T00:00:04.000Z',
})
expect(seen).toEqual([{ name: 'cli-a', type: 'stream', content: 'hello' }])
unsubscribe()
})
})

View File

@@ -4,6 +4,7 @@ import { randomUUID } from 'crypto'
import { logForDebugging } from 'src/utils/debug.js'
import { getAllowedChannels } from '../../../bootstrap/state.js'
import type { BridgePermissionCallbacks } from '../../../bridge/bridgePermissionCallbacks.js'
import type { ToolUseConfirm } from '../../../components/permissions/PermissionRequest.js'
import { getTerminalFocused } from '@anthropic/ink'
import {
CHANNEL_PERMISSION_REQUEST_METHOD,
@@ -25,6 +26,11 @@ import {
setYoloClassifierApproval,
} from '../../../utils/classifierApprovals.js'
import { errorMessage } from '../../../utils/errors.js'
import {
forgetPipePermissionRequest,
notifyPipePermissionCancel,
tryRelayPipePermissionRequest,
} from '../../../utils/pipePermissionRelay.js'
import type { PermissionDecision } from '../../../utils/permissions/PermissionResult.js'
import type { PermissionUpdate } from '../../../utils/permissions/PermissionUpdateSchema.js'
import { hasPermissionsToUseTool } from '../../../utils/permissions/permissions.js'
@@ -82,6 +88,18 @@ function handleInteractivePermission(
const permissionPromptStartTimeMs = Date.now()
const displayInput = result.updatedInput ?? ctx.input
let pipePermissionRequestId: string | null = null
function forgetPipePermission(reason?: string): void {
notifyPipePermissionCancel(pipePermissionRequestId, reason)
forgetPipePermissionRequest(pipePermissionRequestId)
pipePermissionRequestId = null
}
function forgetPipePermissionSilently(): void {
forgetPipePermissionRequest(pipePermissionRequestId)
pipePermissionRequestId = null
}
function clearClassifierIndicator(): void {
if (feature('BASH_CLASSIFIER')) {
@@ -89,7 +107,7 @@ function handleInteractivePermission(
}
}
ctx.pushToQueue({
const toolUseConfirm: ToolUseConfirm = {
assistantMessage: ctx.assistantMessage,
tool: ctx.tool,
description,
@@ -136,6 +154,7 @@ function handleInteractivePermission(
},
onAbort() {
if (!claim()) return
forgetPipePermission('Permission request was aborted locally in sub.')
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.sendResponse(bridgeRequestId, {
behavior: 'deny',
@@ -158,6 +177,7 @@ function handleInteractivePermission(
contentBlocks?: ContentBlockParam[],
) {
if (!claim()) return // atomic check-and-mark before await
forgetPipePermission('Permission request was approved locally in sub.')
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.sendResponse(bridgeRequestId, {
@@ -182,6 +202,7 @@ function handleInteractivePermission(
},
onReject(feedback?: string, contentBlocks?: ContentBlockParam[]) {
if (!claim()) return
forgetPipePermission('Permission request was rejected locally in sub.')
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.sendResponse(bridgeRequestId, {
@@ -220,6 +241,7 @@ function handleInteractivePermission(
// a CCR-initiated mode switch, the very case this callback exists
// for after useReplBridge started calling it).
if (!claim()) return
forgetPipePermission('Permission request was resolved locally in sub.')
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.cancelRequest(bridgeRequestId)
}
@@ -229,7 +251,65 @@ function handleInteractivePermission(
resolveOnce(ctx.buildAllow(freshResult.updatedInput ?? ctx.input))
}
},
})
}
ctx.pushToQueue(toolUseConfirm)
pipePermissionRequestId = tryRelayPipePermissionRequest(
toolUseConfirm,
response => {
if (!claim()) return
forgetPipePermissionSilently()
clearClassifierChecking(ctx.toolUseID)
clearClassifierIndicator()
ctx.removeFromQueue()
channelUnsubscribe?.()
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.cancelRequest(bridgeRequestId)
}
if (response.behavior === 'allow') {
void (async () => {
if (response.permissionUpdates?.length) {
void ctx.persistPermissions(response.permissionUpdates)
}
ctx.logDecision(
{
decision: 'accept',
source: {
type: 'user',
permanent: !!response.permissionUpdates?.length,
},
},
{ permissionPromptStartTimeMs },
)
resolveOnce(
ctx.buildAllow(response.updatedInput ?? displayInput, {
acceptFeedback: response.feedback,
contentBlocks: response.contentBlocks,
}),
)
})()
} else {
ctx.logDecision(
{
decision: 'reject',
source: {
type: 'user_reject',
hasFeedback: !!response.feedback,
},
},
{ permissionPromptStartTimeMs },
)
resolveOnce(
ctx.cancelAndAbort(
response.feedback,
undefined,
response.contentBlocks,
),
)
}
},
)
// Race 4: Bridge permission response from CCR (claude.ai)
// When the bridge is connected, send the permission request to CCR and
@@ -257,6 +337,9 @@ function handleInteractivePermission(
bridgeRequestId,
response => {
if (!claim()) return // Local user/hook/classifier already responded
forgetPipePermission(
'Permission request was resolved by bridge before pipe response.',
)
signal.removeEventListener('abort', unsubscribe)
clearClassifierChecking(ctx.toolUseID)
clearClassifierIndicator()
@@ -364,6 +447,9 @@ function handleInteractivePermission(
channelRequestId,
response => {
if (!claim()) return // Another racer won
forgetPipePermission(
'Permission request was resolved by channel before pipe response.',
)
channelUnsubscribe?.() // both: map delete + listener remove
clearClassifierChecking(ctx.toolUseID)
clearClassifierIndicator()
@@ -421,6 +507,9 @@ function handleInteractivePermission(
permissionPromptStartTimeMs,
)
if (!hookDecision || !claim()) return
forgetPipePermission(
'Permission request was resolved by hook before pipe response.',
)
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.cancelRequest(bridgeRequestId)
}
@@ -453,6 +542,9 @@ function handleInteractivePermission(
},
onAllow: decisionReason => {
if (!claim()) return
forgetPipePermission(
'Permission request was auto-approved before pipe response.',
)
if (bridgeCallbacks && bridgeRequestId) {
bridgeCallbacks.cancelRequest(bridgeRequestId)
}

View File

@@ -61,15 +61,18 @@ function stepTeammateSelection(
* Custom hook that handles Shift+Up/Down keyboard navigation for background tasks.
* When teammates (swarm) are present, navigates between leader and teammates.
* When only non-teammate background tasks exist, opens the background tasks dialog.
* When pipe IPC is active (UDS_INBOX), Shift+Down toggles the pipe selector panel.
* Also handles Enter to confirm selection, 'f' to view transcript, and 'k' to kill.
*/
export function useBackgroundTaskNavigation(options?: {
onOpenBackgroundTasks?: () => void
onTogglePipeSelector?: () => void
}): { handleKeyDown: (e: KeyboardEvent) => void } {
const tasks = useAppState(s => s.tasks)
const viewSelectionMode = useAppState(s => s.viewSelectionMode)
const viewingAgentTaskId = useAppState(s => s.viewingAgentTaskId)
const selectedIPAgentIndex = useAppState(s => s.selectedIPAgentIndex)
const pipeIpc = useAppState(s => (s as any).pipeIpc)
const setAppState = useSetAppState()
// Filter to running teammates and sort alphabetically to match TeammateSpinnerTree display
@@ -177,12 +180,20 @@ export function useBackgroundTaskNavigation(options?: {
// Shift+Up/Down for teammate transcript switching (with wrapping)
// Index -1 represents the leader, 0+ are teammates
// When showSpinnerTree is true, index === teammateCount is the "hide" row
// Third case: when pipe IPC is active and no teammates/background tasks, toggle pipe selector
if (e.shift && (e.key === 'up' || e.key === 'down')) {
e.preventDefault()
if (teammateCount > 0) {
stepTeammateSelection(e.key === 'down' ? 1 : -1, setAppState)
} else if (hasNonTeammateBackgroundTasks) {
options?.onOpenBackgroundTasks?.()
} else if (
e.key === 'down' &&
pipeIpc?.statusVisible &&
options?.onTogglePipeSelector
) {
// Shift+Down opens pipe selector when pipe IPC is active and no other navigation targets
options.onTogglePipeSelector()
}
return
}

View File

@@ -0,0 +1,327 @@
/**
* useMasterMonitor — master-side slave registry helpers plus an optional hook
*
* The module-level registry helpers are the live integration point used by
* attach/send/status flows. The hook remains available for history syncing if
* a caller wants AppState to mirror slave session events.
*
* The master CLI itself remains fully functional — this hook only collects
* data from slaves for review via /history and /status commands.
*/
import { useEffect, useSyncExternalStore } from 'react'
import { useAppState, useSetAppState } from '../state/AppState.js'
import {
getPipeIpc,
type PipeClient,
type PipeMessage,
type PipeIpcSlaveState,
} from '../utils/pipeTransport.js'
import { logForDebugging } from '../utils/debug.js'
/** Session history entry for pipe IPC monitoring. */
export type SessionEntry = {
type: string
content: string
from: string
timestamp: string
meta?: Record<string, unknown>
}
function summarizePipeEntry(entry: SessionEntry): string | undefined {
const content = entry.content.trim()
switch (entry.type) {
case 'prompt':
return content ? `Queued: ${content}` : 'Queued prompt'
case 'prompt_ack':
return content || 'Accepted'
case 'stream':
return content || undefined
case 'tool_start':
return content ? `Tool: ${content}` : 'Tool started'
case 'tool_result':
return content ? `Tool result: ${content}` : 'Tool completed'
case 'done':
return content || 'Completed'
case 'error':
return content || 'Error'
default:
return content || undefined
}
}
function statusForPipeEntry(
currentStatus: PipeIpcSlaveState['status'],
entryType: SessionEntry['type'],
): PipeIpcSlaveState['status'] {
switch (entryType) {
case 'prompt':
case 'prompt_ack':
case 'stream':
case 'tool_start':
case 'tool_result':
return 'busy'
case 'done':
return 'idle'
case 'error':
return 'error'
default:
return currentStatus
}
}
export function applyPipeEntryToSlaveState(
slave: PipeIpcSlaveState,
entry: SessionEntry,
): PipeIpcSlaveState {
return {
...slave,
status: statusForPipeEntry(slave.status, entry.type),
lastActivityAt: entry.timestamp,
lastSummary: summarizePipeEntry(entry),
lastEventType: entry.type as PipeIpcSlaveState['lastEventType'],
unreadCount: (slave.unreadCount ?? 0) + 1,
history: [...slave.history, entry],
}
}
/**
* Module-level registry of connected slave PipeClients.
* Keyed by slave pipe name. Managed by /attach and /detach commands.
*/
const _slaveClients = new Map<string, PipeClient>()
const _slaveClientRegistryListeners = new Set<() => void>()
const _pipeEntryListeners = new Set<
(slaveName: string, entry: SessionEntry) => void
>()
const _pipeEntryHandlers = new Map<string, (msg: PipeMessage) => void>()
let _slaveClientRegistryVersion = 0
const MONITORED_PIPE_ENTRY_TYPES = [
'prompt_ack',
'stream',
'tool_start',
'tool_result',
'done',
'error',
'prompt',
'permission_request',
'permission_cancel',
]
function isMonitoredPipeEntryType(type: string): boolean {
return MONITORED_PIPE_ENTRY_TYPES.includes(type)
}
function pipeMessageToSessionEntry(
slaveName: string,
msg: PipeMessage,
): SessionEntry {
return {
type: msg.type as SessionEntry['type'],
content: msg.data ?? '',
from: msg.from ?? slaveName,
timestamp: msg.ts ?? new Date().toISOString(),
meta: msg.meta,
}
}
function emitPipeEntry(slaveName: string, entry: SessionEntry): void {
for (const listener of _pipeEntryListeners) {
listener(slaveName, entry)
}
}
export function subscribePipeEntries(
listener: (slaveName: string, entry: SessionEntry) => void,
): () => void {
_pipeEntryListeners.add(listener)
return () => {
_pipeEntryListeners.delete(listener)
}
}
function detachPipeEntryEmitter(name: string, client?: PipeClient): void {
const handler = _pipeEntryHandlers.get(name)
if (!handler) return
client?.removeListener?.('message', handler)
_pipeEntryHandlers.delete(name)
}
function attachPipeEntryEmitter(name: string, client: PipeClient): void {
detachPipeEntryEmitter(name, _slaveClients.get(name))
if (typeof client.on !== 'function') return
const handler = (msg: PipeMessage) => {
if (!isMonitoredPipeEntryType(msg.type)) return
emitPipeEntry(name, pipeMessageToSessionEntry(name, msg))
}
_pipeEntryHandlers.set(name, handler)
client.on('message', handler)
}
function emitSlaveClientRegistryChanged(): void {
_slaveClientRegistryVersion += 1
for (const listener of _slaveClientRegistryListeners) {
listener()
}
}
function subscribeToSlaveClientRegistry(listener: () => void): () => void {
_slaveClientRegistryListeners.add(listener)
return () => {
_slaveClientRegistryListeners.delete(listener)
}
}
function getSlaveClientRegistryVersion(): number {
return _slaveClientRegistryVersion
}
export function addSlaveClient(name: string, client: PipeClient): void {
attachPipeEntryEmitter(name, client)
_slaveClients.set(name, client)
emitSlaveClientRegistryChanged()
}
export function removeSlaveClient(name: string): PipeClient | undefined {
const client = _slaveClients.get(name)
detachPipeEntryEmitter(name, client)
_slaveClients.delete(name)
emitSlaveClientRegistryChanged()
return client
}
export function getSlaveClient(name: string): PipeClient | undefined {
return _slaveClients.get(name)
}
export function getAllSlaveClients(): Map<string, PipeClient> {
return _slaveClients
}
export type ConnectedSlaveTarget = {
name: string
client: PipeClient
}
/**
* Resolve a selection list to currently connected slave clients.
*
* The pipe selector can include discovered-but-not-attached names. Routing
* should only treat attached, connected clients as broadcast targets.
*/
export function getConnectedSlaveTargets(
selectedNames: string[],
): ConnectedSlaveTarget[] {
const targets: ConnectedSlaveTarget[] = []
for (const name of selectedNames) {
const client = _slaveClients.get(name)
if (client?.connected) {
targets.push({ name, client })
}
}
return targets
}
export function resetSlaveClientsForTesting(): void {
for (const [name, client] of _slaveClients.entries()) {
detachPipeEntryEmitter(name, client)
}
_slaveClients.clear()
emitSlaveClientRegistryChanged()
}
export function useMasterMonitor(): void {
const role = useAppState(s => getPipeIpc(s).role)
const setAppState = useSetAppState()
const registryVersion = useSyncExternalStore(
subscribeToSlaveClientRegistry,
getSlaveClientRegistryVersion,
getSlaveClientRegistryVersion,
)
useEffect(() => {
if (role !== 'master' && _slaveClients.size === 0) return
// Set up listeners for each connected slave client
const cleanups: (() => void)[] = []
for (const [slaveName, client] of _slaveClients.entries()) {
const handler = (msg: PipeMessage) => {
const entry = pipeMessageToSessionEntry(slaveName, msg)
// Only record relevant message types
if (!isMonitoredPipeEntryType(msg.type)) {
return
}
setAppState(prev => {
const slave = getPipeIpc(prev).slaves[slaveName]
if (!slave) return prev
const newStatus =
msg.type === 'done' || msg.type === 'error'
? 'idle'
: msg.type === 'prompt'
? 'busy'
: slave.status
return {
...prev,
pipeIpc: {
...getPipeIpc(prev),
slaves: {
...getPipeIpc(prev).slaves,
[slaveName]: applyPipeEntryToSlaveState(
{
...slave,
status: newStatus,
},
entry,
),
},
},
}
})
if (msg.type === 'done') {
logForDebugging(`[MasterMonitor] Slave "${slaveName}" turn complete`)
}
}
client.on('message', handler)
// Handle slave disconnect
const onDisconnect = () => {
logForDebugging(`[MasterMonitor] Slave "${slaveName}" disconnected`)
removeSlaveClient(slaveName)
setAppState(prev => {
const { [slaveName]: _removed, ...remainingSlaves } =
getPipeIpc(prev).slaves
const hasSlaves = Object.keys(remainingSlaves).length > 0
return {
...prev,
pipeIpc: {
...getPipeIpc(prev),
role: hasSlaves ? 'master' : 'main',
displayRole: hasSlaves ? 'master' : 'main',
slaves: remainingSlaves,
},
}
})
}
client.on('disconnect', onDisconnect)
cleanups.push(() => {
client.removeListener('message', handler)
client.removeListener('disconnect', onDisconnect)
})
}
return () => {
for (const cleanup of cleanups) {
cleanup()
}
}
}, [registryVersion, role, setAppState])
}

623
src/hooks/usePipeIpc.ts Normal file
View File

@@ -0,0 +1,623 @@
/**
* usePipeIpc — Pipe IPC lifecycle hook.
*
* Extracted from REPL.tsx's 575-line inline useEffect. Manages:
* 1. Server creation (UDS + optional TCP for LAN)
* 2. LAN beacon startup
* 3. Message handlers (ping, attach, prompt, permission, detach)
* 4. Heartbeat loop (main: auto-attach + cleanup; sub: detect main alive)
* 5. Cleanup on unmount
*
* Feature-gated by UDS_INBOX. LAN extensions gated by LAN_PIPES.
*/
import { feature } from 'bun:bundle'
import { useEffect } from 'react'
import type {
PipeMessage,
PipeServer,
PipeIpcState,
} from '../utils/pipeTransport.js'
// Lazy-loaded module accessors (cached by Bun/Node require)
/* eslint-disable @typescript-eslint/no-require-imports */
const pt = () =>
require('../utils/pipeTransport.js') as typeof import('../utils/pipeTransport.js')
const pr = () =>
require('../utils/pipeRegistry.js') as typeof import('../utils/pipeRegistry.js')
const mm = () =>
require('./useMasterMonitor.js') as typeof import('./useMasterMonitor.js')
const bs = () =>
require('../bootstrap/state.js') as typeof import('../bootstrap/state.js')
const lb = () =>
require('../utils/lanBeacon.js') as typeof import('../utils/lanBeacon.js')
const pp = () =>
require('../utils/pipePermissionRelay.js') as typeof import('../utils/pipePermissionRelay.js')
const osm = () => require('os') as typeof import('os')
/* eslint-enable @typescript-eslint/no-require-imports */
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
type StoreApi = {
getState: () => any
setState: (updater: (prev: any) => any) => void
}
export type UsePipeIpcOptions = {
store: StoreApi
handleIncomingPrompt: (content: string) => boolean
}
// ---------------------------------------------------------------------------
// Helper: remove a dead slave from registry + state
// ---------------------------------------------------------------------------
function removeDeadSlave(slaveName: string, store: StoreApi): void {
mm().removeSlaveClient(slaveName)
store.setState((prev: any) => {
const pipeIpc = pt().getPipeIpc(prev)
const { [slaveName]: _removed, ...remainingSlaves } = pipeIpc.slaves
return {
...prev,
pipeIpc: {
...pipeIpc,
role: Object.keys(remainingSlaves).length > 0 ? 'master' : 'main',
displayRole:
Object.keys(remainingSlaves).length > 0 ? 'master' : 'main',
slaves: remainingSlaves,
selectedPipes: (pipeIpc.selectedPipes ?? []).filter(
(name: string) => name !== slaveName,
),
discoveredPipes: (pipeIpc.discoveredPipes ?? []).filter(
(pipe: { pipeName: string }) => pipe.pipeName !== slaveName,
),
},
}
})
}
// ---------------------------------------------------------------------------
// Helper: refresh discovered pipes (local subs + LAN peers)
// ---------------------------------------------------------------------------
function refreshDiscoveredPipes(
pipeName: string,
aliveSubs: Array<{
id: string
pipeName: string
subIndex: number
machineId: string
ip: string
hostname: string
}>,
store: StoreApi,
): void {
const freshDiscovered = aliveSubs
.filter(sub => sub.pipeName !== pipeName)
.map(sub => ({
id: sub.id,
pipeName: sub.pipeName,
role: `sub-${sub.subIndex}`,
machineId: sub.machineId,
ip: sub.ip,
hostname: sub.hostname,
alive: true,
}))
// Include LAN beacon peers so they aren't wiped out by heartbeat
let lanDiscovered: typeof freshDiscovered = []
if (feature('LAN_PIPES')) {
const beacon = lb().getLanBeacon()
if (beacon) {
const localNames = new Set(freshDiscovered.map(p => p.pipeName))
localNames.add(pipeName)
for (const [pName, peer] of beacon.getPeers()) {
if (!localNames.has(pName)) {
lanDiscovered.push({
id: `lan-${pName}`,
pipeName: pName,
role: peer.role,
machineId: peer.machineId,
ip: peer.ip,
hostname: peer.hostname,
alive: true,
})
}
}
}
}
const allDiscovered = [...freshDiscovered, ...lanDiscovered]
// Only update state if the list actually changed
const prev = pt().getPipeIpc(store.getState())
const prevNames = (prev.discoveredPipes ?? [])
.map((p: any) => p.pipeName)
.join(',')
const newNames = allDiscovered.map(p => p.pipeName).join(',')
if (prevNames === newNames) return
store.setState((prev: any) => {
const pipeIpc = pt().getPipeIpc(prev)
const aliveNames = new Set(allDiscovered.map(pipe => pipe.pipeName))
return {
...prev,
pipeIpc: {
...pipeIpc,
discoveredPipes: allDiscovered,
selectedPipes: (pipeIpc.selectedPipes ?? []).filter((name: string) =>
aliveNames.has(name),
),
},
}
})
}
// ---------------------------------------------------------------------------
// Phase: Register message handlers on server
// ---------------------------------------------------------------------------
function registerMessageHandlers(
server: PipeServer,
pipeName: string,
machineId: string,
store: StoreApi,
handleIncomingPrompt: (content: string) => boolean,
): void {
// Auto-reply pings for health checks
server.onMessage((msg: PipeMessage, reply) => {
if (msg.type === 'ping') reply({ type: 'pong' })
})
// Handle attach requests
server.onMessage((msg: PipeMessage, reply) => {
if (msg.type !== 'attach_request') return
const state = store.getState()
const currentPipeState = pt().getPipeIpc(state)
if (pt().isPipeControlled(currentPipeState)) {
reply({ type: 'attach_reject', data: 'Already controlled' })
return
}
// Allow LAN peers (different machineId) to attach regardless of role.
const isLanPeer = msg.meta?.machineId && msg.meta.machineId !== machineId
if (!isLanPeer && currentPipeState.role !== 'sub') {
reply({
type: 'attach_reject',
data: 'Only sub sessions can be attached.',
})
return
}
reply({ type: 'attach_accept' })
const clients = Array.from((server as any).clients as Set<any>)
const masterSocket = clients[clients.length - 1]
pp().setPipeRelay((relayMsg: any) => {
if (masterSocket && !masterSocket.destroyed) {
relayMsg.from = relayMsg.from ?? pipeName
relayMsg.ts = relayMsg.ts ?? new Date().toISOString()
masterSocket.write(JSON.stringify(relayMsg) + '\n')
}
})
store.setState((prev: any) => ({
...prev,
pipeIpc: {
...pt().getPipeIpc(prev),
role: 'sub',
displayRole: pt().getPipeDisplayRole(pt().getPipeIpc(prev)),
attachedBy: msg.from ?? 'unknown',
},
}))
})
// Handle prompts from master
server.onMessage((msg: PipeMessage, reply) => {
if (msg.type === 'prompt' && msg.data) {
const accepted = handleIncomingPrompt(msg.data)
if (accepted) {
reply({ type: 'prompt_ack', data: 'accepted' })
} else {
reply({
type: 'error',
data: 'Slave is busy and could not accept the prompt.',
})
}
}
})
// Handle permission decisions from master
server.onMessage((msg: PipeMessage, _reply) => {
if (msg.type !== 'permission_response' && msg.type !== 'permission_cancel')
return
const { resolvePipePermissionResponse, cancelPipePermissionRequest } =
require('../utils/pipePermissionRelay.js') as typeof import('../utils/pipePermissionRelay.js')
try {
const payload = msg.data ? JSON.parse(msg.data) : undefined
if (!payload?.requestId) return
if (msg.type === 'permission_response') {
resolvePipePermissionResponse(payload)
} else {
cancelPipePermissionRequest(payload.requestId, payload.reason)
}
} catch {
// Malformed — ignore
}
})
// Handle detach
server.onMessage((msg: PipeMessage, _reply) => {
if (msg.type !== 'detach') return
const { clearPendingPipePermissions } =
require('../utils/pipePermissionRelay.js') as typeof import('../utils/pipePermissionRelay.js')
clearPendingPipePermissions('Pipe detached before permission was resolved.')
pp().setPipeRelay(null)
store.setState((prev: any) => ({
...prev,
pipeIpc: (() => {
const pipeIpc = pt().getPipeIpc(prev)
const nextRole = pipeIpc.subIndex != null ? 'sub' : 'main'
const nextPipeState = { ...pipeIpc, role: nextRole, attachedBy: null }
return {
...nextPipeState,
displayRole: pt().getPipeDisplayRole(nextPipeState as PipeIpcState),
}
})(),
}))
})
}
// ---------------------------------------------------------------------------
// Phase: Heartbeat
// ---------------------------------------------------------------------------
function runMainHeartbeat(
pipeName: string,
machineId: string,
store: StoreApi,
disposed: { current: boolean },
): void {
void (async () => {
try {
await pr().cleanupStaleEntries()
const aliveSubs = await pr().getAliveSubs()
refreshDiscoveredPipes(pipeName, aliveSubs, store)
const connectedSlaves = mm().getAllSlaveClients()
const aliveSubNames = new Set(aliveSubs.map(sub => sub.pipeName))
// Build unified attach target list: local subs + LAN peers
type AttachTarget = {
pipeName: string
tcpEndpoint?: { host: string; port: number }
}
const attachTargets: AttachTarget[] = aliveSubs.map(sub => ({
pipeName: sub.pipeName,
}))
// Add LAN peers as attach targets
if (feature('LAN_PIPES')) {
const beacon = lb().getLanBeacon()
if (beacon) {
const localNames = new Set(attachTargets.map(t => t.pipeName))
localNames.add(pipeName)
for (const [pName, peer] of beacon.getPeers()) {
if (!localNames.has(pName)) {
attachTargets.push({
pipeName: pName,
tcpEndpoint: { host: peer.ip, port: peer.tcpPort },
})
aliveSubNames.add(pName)
}
}
}
}
const currentPipeState = pt().getPipeIpc(store.getState())
for (const target of attachTargets) {
if (target.pipeName === pipeName) continue
if (connectedSlaves.has(target.pipeName)) continue
try {
const myName = currentPipeState.serverName ?? pipeName
const client = await pt().connectToPipe(
target.pipeName,
myName,
3000,
target.tcpEndpoint,
)
const attached = await new Promise<boolean>(resolve => {
const timeout = setTimeout(() => {
client.disconnect()
resolve(false)
}, 3000)
client.onMessage((msg: any) => {
if (msg.type === 'attach_accept') {
clearTimeout(timeout)
resolve(true)
} else if (msg.type === 'attach_reject') {
clearTimeout(timeout)
client.disconnect()
resolve(false)
}
})
client.send({
type: 'attach_request',
meta: { machineId },
})
})
if (attached && !disposed.current) {
mm().addSlaveClient(target.pipeName, client)
client.on('disconnect', () => {
removeDeadSlave(target.pipeName, store)
})
store.setState((prev: any) => ({
...prev,
pipeIpc: {
...pt().getPipeIpc(prev),
role: 'master',
displayRole: 'master',
slaves: {
...pt().getPipeIpc(prev).slaves,
[target.pipeName]: {
name: target.pipeName,
connectedAt: new Date().toISOString(),
status: 'idle',
unreadCount: 0,
history: [],
},
},
},
}))
}
} catch {
// Connection failed — skip this cycle
}
}
// Clean up slaves that are no longer alive
let lanPeerNames: Set<string> | null = null
if (feature('LAN_PIPES')) {
const beacon = lb().getLanBeacon()
if (beacon) {
lanPeerNames = new Set(beacon.getPeers().keys())
}
}
for (const [slaveName, client] of connectedSlaves.entries()) {
const inLocalRegistry = aliveSubNames.has(slaveName)
const inLanBeacon = lanPeerNames?.has(slaveName) ?? false
if (!client.connected || (!inLocalRegistry && !inLanBeacon)) {
removeDeadSlave(slaveName, store)
}
}
} catch {
// Heartbeat cycle error — non-fatal
}
})()
}
function runSubHeartbeat(
pipeName: string,
machineId: string,
entry: any,
store: StoreApi,
disposed: { current: boolean },
): void {
void (async () => {
try {
const mainAlive = await pr().isMainAlive()
if (!mainAlive && !disposed.current) {
const registry = await pr().readRegistry()
const isSameMachine = pr().isMainMachine(machineId, registry)
if (isSameMachine) {
await pr().registerAsMain(entry)
} else {
await pr().revertToIndependent(pipeName)
}
store.setState((prev: any) => ({
...prev,
pipeIpc: {
...pt().getPipeIpc(prev),
role: 'main',
subIndex: null,
displayRole: 'main',
attachedBy: null,
},
}))
pp().setPipeRelay(null)
}
} catch {
// Heartbeat check error — non-fatal
}
})()
}
// ---------------------------------------------------------------------------
// Main hook
// ---------------------------------------------------------------------------
export function usePipeIpc({
store,
handleIncomingPrompt,
}: UsePipeIpcOptions): void {
if (!feature('UDS_INBOX')) return
useEffect(() => {
const pipeName = `cli-${bs().getSessionId().slice(0, 8)}`
const disposed = { current: false }
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
let heartbeatBusy = false
let pipeServer: PipeServer | null = null
void (async () => {
try {
// --- Phase 1: Role determination ---
const machId = await pr().getMachineId()
const mac = pr().getMacAddress()
const localIp = pt().getLocalIp()
const host = osm().hostname()
const roleResult = await pr().determineRole(machId)
const entry = {
id: pipeName,
pid: process.pid,
machineId: machId,
startedAt: Date.now(),
ip: localIp,
mac,
hostname: host,
pipeName,
}
let initialRole: 'main' | 'sub' = 'main'
let subIndex: number | null = null
let displayRole = 'main'
if (roleResult.role === 'main' || roleResult.role === 'main-recover') {
await pr().registerAsMain(entry)
} else {
subIndex = roleResult.subIndex
await pr().registerAsSub(entry, subIndex)
initialRole = 'sub'
displayRole = `sub-${subIndex}`
}
// --- Phase 2: Server creation ---
const server = await pt().createPipeServer(
pipeName,
feature('LAN_PIPES') ? { enableTcp: true, tcpPort: 0 } : undefined,
)
pipeServer = server
if (disposed.current) {
await server.close()
await pr().unregister(pipeName)
return
}
// --- Phase 3: LAN beacon ---
if (feature('LAN_PIPES') && server.tcpAddress) {
const beacon = new (lb().LanBeacon)({
pipeName,
machineId: machId,
hostname: host,
ip: localIp,
tcpPort: server.tcpAddress.port,
role: initialRole,
})
beacon.start()
lb().setLanBeacon(beacon)
const entryWithTcp = {
...entry,
tcpPort: server.tcpAddress.port,
lanVisible: true,
}
if (initialRole === 'main') {
await pr().registerAsMain(entryWithTcp)
} else if (subIndex != null) {
await pr().registerAsSub(entryWithTcp, subIndex)
}
}
// Update store
store.setState((prev: any) => ({
...prev,
pipeIpc: {
...pt().getPipeIpc(prev),
serverName: pipeName,
role: initialRole,
subIndex,
displayRole,
localIp,
hostname: host,
machineId: machId,
mac,
},
}))
// --- Phase 4: Message handlers ---
registerMessageHandlers(
server,
pipeName,
machId,
store,
handleIncomingPrompt,
)
// --- Phase 5: Heartbeat ---
const HEARTBEAT_INTERVAL_MS = 5000
heartbeatTimer = setInterval(() => {
if (disposed.current || heartbeatBusy) return
heartbeatBusy = true
const currentPipeState = pt().getPipeIpc(store.getState())
if (
currentPipeState.role === 'main' ||
currentPipeState.role === 'master'
) {
runMainHeartbeat(pipeName, machId, store, disposed)
} else if (currentPipeState.role === 'sub') {
runSubHeartbeat(pipeName, machId, entry, store, disposed)
}
// Reset busy flag after a short delay to allow the async work to settle
setTimeout(() => {
heartbeatBusy = false
}, 4000)
}, HEARTBEAT_INTERVAL_MS)
} catch {
// PipeServer creation failed — non-fatal
}
})()
// --- Phase 6: Cleanup ---
return () => {
disposed.current = true
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
// Send detach to all slaves
const allClients = mm().getAllSlaveClients()
for (const [name, client] of allClients.entries()) {
try {
client.send({ type: 'detach' })
} catch {}
client.disconnect()
removeDeadSlave(name, store)
}
// Stop LAN beacon
const beacon = lb().getLanBeacon()
if (beacon) {
try {
beacon.stop()
} catch {}
lb().setLanBeacon(null)
}
// Unregister + close server
void pr()
.unregister(pipeName)
.catch(() => {})
if (pipeServer) {
void pipeServer.close().catch(() => {})
pipeServer = null
}
pp().setPipeRelay(null)
}
}, []) // eslint-disable-line react-hooks/exhaustive-deps
}

View File

@@ -0,0 +1,195 @@
/**
* usePipePermissionForward — Forward slave permission requests to master UI.
*
* Subscribes to slave pipe messages via subscribePipeEntries, and:
* 1. permission_request → enqueue into toolUseConfirmQueue for master approval
* 2. permission_cancel → remove from queue
* 3. stream/error/done → display as system messages
*/
import { feature } from 'bun:bundle'
import { useEffect } from 'react'
import type { Tool, ToolUseContext } from '../Tool.js'
import type { MessageType } from '../types/message.js'
type Deps = {
store: { getState: () => any }
tools: Tool<any, any>[]
setMessages: (action: React.SetStateAction<MessageType[]>) => void
setToolUseConfirmQueue: (action: React.SetStateAction<any[]>) => void
getToolUseContext: (...args: any[]) => ToolUseContext
mainLoopModel: string
}
export function usePipePermissionForward({
store,
tools,
setMessages,
setToolUseConfirmQueue,
getToolUseContext,
mainLoopModel,
}: Deps): void {
useEffect(() => {
if (!feature('UDS_INBOX')) return
/* eslint-disable @typescript-eslint/no-require-imports */
const { subscribePipeEntries, getSlaveClient } =
require('./useMasterMonitor.js') as typeof import('./useMasterMonitor.js')
const { getPipeIpc } =
require('../utils/pipeTransport.js') as typeof import('../utils/pipeTransport.js')
const { createAssistantMessage, createSystemMessage } =
require('../utils/messages.js') as typeof import('../utils/messages.js')
/* eslint-enable @typescript-eslint/no-require-imports */
return subscribePipeEntries(
(pipeName: string, entry: { type: string; content: string }) => {
const content = entry.content.trim()
const pipeIpcState = getPipeIpc(store.getState())
const peerInfo = (pipeIpcState.discoveredPipes ?? []).find(
(pipe: { pipeName: string }) => pipe.pipeName === pipeName,
)
const isLan = peerInfo?.ip && peerInfo.ip !== pipeIpcState.localIp
const displayRole = peerInfo
? isLan
? `${peerInfo.role} ${peerInfo.hostname}/${peerInfo.ip}`
: peerInfo.role
: pipeName
if (entry.type === 'permission_request') {
try {
const payload = JSON.parse(content)
const tool = tools.find(
candidate => candidate.name === payload.toolName,
)
const client = getSlaveClient(pipeName)
if (!client) return
if (!tool) {
client.send({
type: 'permission_response',
data: JSON.stringify({
requestId: payload.requestId,
behavior: 'deny',
feedback: `Tool "${payload.toolName}" is not available in main.`,
}),
})
return
}
const assistantMessage = createAssistantMessage({ content: '' })
const toolUseContext = getToolUseContext(
[],
[],
new AbortController(),
mainLoopModel,
)
setToolUseConfirmQueue((queue: any[]) => [
...queue,
{
assistantMessage,
tool,
description: payload.description,
input: payload.input,
toolUseContext,
toolUseID: `pipe:${payload.requestId}`,
permissionResult: payload.permissionResult,
permissionPromptStartTimeMs:
payload.permissionPromptStartTimeMs,
workerBadge: {
name: `${displayRole} / ${pipeName}`,
color: 'cyan',
},
onUserInteraction() {},
onAbort() {
client.send({
type: 'permission_response',
data: JSON.stringify({
requestId: payload.requestId,
behavior: 'deny',
feedback: 'Permission request was aborted in main.',
}),
})
},
onAllow(
updatedInput: any,
permissionUpdates: any,
feedback: any,
contentBlocks: any,
) {
client.send({
type: 'permission_response',
data: JSON.stringify({
requestId: payload.requestId,
behavior: 'allow',
updatedInput,
permissionUpdates,
feedback,
contentBlocks,
}),
})
},
onReject(feedback: any, contentBlocks: any) {
client.send({
type: 'permission_response',
data: JSON.stringify({
requestId: payload.requestId,
behavior: 'deny',
feedback,
contentBlocks,
}),
})
},
async recheckPermission() {},
},
])
} catch {
// Malformed permission request — ignore
}
return
}
if (entry.type === 'permission_cancel') {
try {
const payload = JSON.parse(content)
setToolUseConfirmQueue((queue: any[]) =>
queue.filter(
(item: any) => item.toolUseID !== `pipe:${payload.requestId}`,
),
)
} catch {
// Malformed — ignore
}
return
}
let message: any = null
if (entry.type === 'stream' && content) {
message = createSystemMessage(
`[${displayRole} / ${pipeName}] ${content}`,
'warning',
)
} else if (entry.type === 'error') {
message = createSystemMessage(
`[${displayRole} / ${pipeName}] Error: ${content || 'no output'}`,
'error',
)
} else if (entry.type === 'done') {
message = createSystemMessage(
`[${displayRole} / ${pipeName}] Completed`,
'warning',
)
}
if (message) {
setMessages((prev: MessageType[]) => [...prev, message])
}
},
)
}, [
getToolUseContext,
mainLoopModel,
setMessages,
setToolUseConfirmQueue,
store,
tools,
])
}

39
src/hooks/usePipeRelay.ts Normal file
View File

@@ -0,0 +1,39 @@
/**
* usePipeRelay — Pipe message relay utilities for slave → master communication.
*
* Provides `relayPipeMessage` and `pipeReturnHadErrorRef` for use in
* onQuery callbacks. The relay function reads from the module-level
* `getPipeRelay()` singleton set by usePipeIpc's attach handler.
*/
import { useRef, useCallback } from 'react'
import { getPipeRelay } from '../utils/pipePermissionRelay.js'
import type { PipeMessage } from '../utils/pipeTransport.js'
export type PipeRelayHandle = {
/** Send a relay message to the master. Returns false if no relay is active. */
relayPipeMessage: (message: PipeMessage) => boolean
/** Tracks whether an error was already relayed for this query turn. */
pipeReturnHadErrorRef: React.MutableRefObject<boolean>
}
/**
* Hook that provides pipe relay utilities. Safe to call unconditionally —
* when UDS_INBOX is off, the relay function is a no-op that returns false.
*/
export function usePipeRelay(): PipeRelayHandle {
const pipeReturnHadErrorRef = useRef(false)
const relayPipeMessage = useCallback(
(message: PipeMessage): boolean => {
const relay = getPipeRelay()
if (typeof relay !== 'function') {
return false
}
relay(message)
return true
},
[],
)
return { relayPipeMessage, pipeReturnHadErrorRef }
}

151
src/hooks/usePipeRouter.ts Normal file
View File

@@ -0,0 +1,151 @@
/**
* usePipeRouter — Route user input to selected pipe targets.
*
* Returns `routeToSelectedPipes(input)` which checks selectedPipes +
* routeMode and sends the prompt to all connected slave targets.
* Returns true if routed (caller should skip local execution).
*/
import { feature } from 'bun:bundle'
import { useCallback } from 'react'
type StoreApi = { getState: () => any }
type SetAppState = (updater: (prev: any) => any) => void
type AddNotification = (opts: {
key: string
text: string
color: string
priority: string
timeoutMs: number
}) => void
type Deps = {
store: StoreApi
setAppState: SetAppState
addNotification: AddNotification
}
/**
* Attempt to route user input to selected pipes.
* Returns true if routed to at least one pipe (skip local execution).
*/
export function usePipeRouter({ store, setAppState, addNotification }: Deps): {
routeToSelectedPipes: (input: string) => boolean
} {
const routeToSelectedPipes = useCallback(
(input: string): boolean => {
if (!feature('UDS_INBOX')) return false
if (!input.trim() || input.trim().startsWith('/')) return false
/* eslint-disable @typescript-eslint/no-require-imports */
const pipeState = (store.getState() as any).pipeIpc
const selectedPipes: string[] = pipeState?.selectedPipes ?? []
const routeMode: 'selected' | 'local' = pipeState?.routeMode ?? 'selected'
if (selectedPipes.length === 0 || routeMode === 'local') return false
const { getConnectedSlaveTargets } =
require('./useMasterMonitor.js') as typeof import('./useMasterMonitor.js')
const { getPipeIpc } =
require('../utils/pipeTransport.js') as typeof import('../utils/pipeTransport.js')
/* eslint-enable @typescript-eslint/no-require-imports */
const targets = getConnectedSlaveTargets(selectedPipes)
const pipeIpcForDisplay = getPipeIpc(store.getState())
const discovered: Array<{
pipeName: string
role: string
ip: string
hostname: string
}> = pipeIpcForDisplay.discoveredPipes ?? []
const sentTargetNames: string[] = []
const sentTargetLabels: string[] = []
const failedTargetNames: string[] = []
for (const { name, client } of targets) {
try {
client.send({ type: 'prompt', data: input.trim() })
sentTargetNames.push(name)
// Build display label: [role] hostname/ip for LAN, [role] for local
const info = discovered.find((d: any) => d.pipeName === name)
if (info) {
const isLan = info.ip && info.ip !== pipeIpcForDisplay.localIp
sentTargetLabels.push(
isLan
? `[${info.role}] ${info.hostname}/${info.ip}`
: `[${info.role}]`,
)
} else {
sentTargetLabels.push(name)
}
} catch {
failedTargetNames.push(name)
}
}
if (sentTargetNames.length > 0) {
const promptText = input.trim()
const promptTimestamp = new Date().toISOString()
setAppState((prev: any) => {
const pipeIpc = getPipeIpc(prev)
const nextSlaves = { ...pipeIpc.slaves }
for (const name of sentTargetNames) {
const slave = nextSlaves[name]
if (!slave) continue
nextSlaves[name] = {
...slave,
status: 'busy',
lastActivityAt: promptTimestamp,
lastSummary: `Queued: ${promptText}`,
lastEventType: 'prompt',
history: [
...slave.history,
{
type: 'prompt',
content: promptText,
from: pipeIpc.serverName ?? 'master',
timestamp: promptTimestamp,
},
],
}
}
return {
...prev,
pipeIpc: { ...pipeIpc, slaves: nextSlaves },
}
})
addNotification({
key: 'pipe-route-success',
text: `Routed to ${sentTargetLabels.join(', ')}; main can continue other tasks`,
color: 'success',
priority: 'immediate',
timeoutMs: 3000,
})
} else {
addNotification({
key: 'pipe-route-fallback',
text: 'Selected pipes are unavailable; processing locally.',
color: 'warning',
priority: 'immediate',
timeoutMs: 4000,
})
}
if (failedTargetNames.length > 0) {
addNotification({
key: 'pipe-route-partial-failure',
text: `Failed to send to: ${failedTargetNames.join(', ')}`,
color: 'warning',
priority: 'immediate',
timeoutMs: 4000,
})
}
return sentTargetNames.length > 0
},
[store, setAppState, addNotification],
)
return { routeToSelectedPipes }
}

View File

@@ -0,0 +1,122 @@
/**
* useSlaveNotifications — Real-time toast notifications for slave CLI events
*
* When role === 'master', watches slave session history for key events
* and shows toast notifications in the master CLI footer.
*/
import { useEffect, useRef } from 'react'
import { useNotifications } from '../context/notifications.js'
import { useAppState } from '../state/AppState.js'
import { getPipeIpc } from '../utils/pipeTransport.js'
import type { SessionEntry } from './useMasterMonitor.js'
import type { Notification } from '../context/notifications.js'
function foldSlaveNotif(
acc: Notification,
_incoming: Notification,
): Notification {
if (!('text' in acc)) return acc
const match = acc.text.match(/\((\d+)\)$/)
const count = match ? parseInt(match[1], 10) + 1 : 2
const base = acc.text.replace(/\s*\(\d+\)$/, '')
return {
...acc,
text: `${base} (${count})`,
fold: foldSlaveNotif,
}
}
function truncate(s: string, max: number): string {
if (s.length <= max) return s
return s.slice(0, max) + '…'
}
export function useSlaveNotifications(): void {
const role = useAppState(s => getPipeIpc(s).role)
const slaves = useAppState(s => getPipeIpc(s).slaves)
const { addNotification } = useNotifications()
const lastSeenRef = useRef<Record<string, number>>({})
useEffect(() => {
if (role !== 'master') return
for (const [name, slave] of Object.entries(slaves)) {
const lastSeen = lastSeenRef.current[name] ?? 0
const newEntries = slave.history.slice(lastSeen)
lastSeenRef.current[name] = slave.history.length
for (const entry of newEntries) {
const notification = makeNotification(name, entry)
if (notification) {
addNotification(notification)
}
}
}
for (const name of Object.keys(lastSeenRef.current)) {
if (!(name in slaves)) {
delete lastSeenRef.current[name]
}
}
}, [addNotification, role, slaves])
}
function makeNotification(
slaveName: string,
entry: SessionEntry,
): Notification | null {
const shortName =
slaveName.length > 16 ? `${slaveName.slice(0, 16)}` : slaveName
switch (entry.type) {
case 'prompt_ack':
return {
key: `slave-ack-${slaveName}`,
text: `[${shortName}] ✓ 已接收任务`,
priority: 'low',
timeoutMs: 2500,
fold: foldSlaveNotif,
}
case 'done':
return {
key: `slave-done-${slaveName}`,
text: `[${shortName}] ✓ 任务完成`,
priority: 'medium',
timeoutMs: 5000,
fold: foldSlaveNotif,
}
case 'error':
return {
key: `slave-error-${slaveName}`,
text: `[${shortName}] ✗ 错误: ${truncate(entry.content, 60)}`,
color: 'error',
priority: 'high',
timeoutMs: 8000,
}
case 'tool_start':
return {
key: `slave-tool-${slaveName}`,
text: `[${shortName}] 工具: ${truncate(entry.content, 40)}`,
priority: 'low',
timeoutMs: 3000,
fold: foldSlaveNotif,
}
case 'prompt':
return {
key: `slave-prompt-${slaveName}`,
text: `[${shortName}] ▶ 开始处理: ${truncate(entry.content, 50)}`,
priority: 'medium',
timeoutMs: 4000,
}
case 'stream':
case 'tool_result':
default:
return null
}
}