style: 完成所有文件的lint

This commit is contained in:
claude-code-best
2026-05-01 21:39:30 +08:00
parent d136872cc9
commit 6182015005
1333 changed files with 68255 additions and 77882 deletions

View File

@@ -1,72 +1,75 @@
import type { WSContext } from "hono/ws";
import {
findAcpConnectionByAgentId,
sendToAgentWs,
} from "./acp-ws-handler";
import { getAcpEventBus } from "./event-bus";
import type { SessionEvent } from "./event-bus";
import { log, error as logError } from "../logger";
import type { WSContext } from 'hono/ws'
import { findAcpConnectionByAgentId, sendToAgentWs } from './acp-ws-handler'
import { getAcpEventBus } from './event-bus'
import type { SessionEvent } from './event-bus'
import { log, error as logError } from '../logger'
// Per-relay connection state
interface RelayConnectionEntry {
agentId: string;
unsub: (() => void) | null;
keepalive: ReturnType<typeof setInterval> | null;
ws: WSContext;
openTime: number;
agentId: string
unsub: (() => void) | null
keepalive: ReturnType<typeof setInterval> | null
ws: WSContext
openTime: number
}
const relayConnections = new Map<string, RelayConnectionEntry>(); // key: relayWsId
const relayConnections = new Map<string, RelayConnectionEntry>() // key: relayWsId
const RELAY_KEEPALIVE_INTERVAL_MS = 20_000;
const RELAY_KEEPALIVE_INTERVAL_MS = 20_000
/** Send a JSON message to relay WS */
function sendToRelayWs(ws: WSContext, msg: object): void {
if (ws.readyState !== 1) return;
if (ws.readyState !== 1) return
try {
ws.send(JSON.stringify(msg));
ws.send(JSON.stringify(msg))
} catch (err) {
logError("[ACP-Relay] send error:", err);
logError('[ACP-Relay] send error:', err)
}
}
/** Called from onOpen — finds target agent and bridges connection */
export function handleRelayOpen(ws: WSContext, relayWsId: string, agentId: string): void {
log(`[ACP-Relay] Relay connection opened: relayWsId=${relayWsId} agentId=${agentId}`);
export function handleRelayOpen(
ws: WSContext,
relayWsId: string,
agentId: string,
): void {
log(
`[ACP-Relay] Relay connection opened: relayWsId=${relayWsId} agentId=${agentId}`,
)
// Check if agent is online
const agentConn = findAcpConnectionByAgentId(agentId);
const agentConn = findAcpConnectionByAgentId(agentId)
if (!agentConn) {
log(`[ACP-Relay] Agent ${agentId} not found or offline`);
sendToRelayWs(ws, { type: "error", message: "Agent not found or offline" });
ws.close(4004, "agent not found");
return;
log(`[ACP-Relay] Agent ${agentId} not found or offline`)
sendToRelayWs(ws, { type: 'error', message: 'Agent not found or offline' })
ws.close(4004, 'agent not found')
return
}
// Keepalive interval
const keepalive = setInterval(() => {
const entry = relayConnections.get(relayWsId);
const entry = relayConnections.get(relayWsId)
if (!entry || entry.ws.readyState !== 1) {
clearInterval(keepalive);
return;
clearInterval(keepalive)
return
}
sendToRelayWs(entry.ws, { type: "keep_alive" });
}, RELAY_KEEPALIVE_INTERVAL_MS);
sendToRelayWs(entry.ws, { type: 'keep_alive' })
}, RELAY_KEEPALIVE_INTERVAL_MS)
// Subscribe to channel group EventBus — forward agent responses to frontend
const channelGroupId = agentConn.channelGroupId;
const bus = getAcpEventBus(channelGroupId);
const channelGroupId = agentConn.channelGroupId
const bus = getAcpEventBus(channelGroupId)
const unsub = bus.subscribe((event: SessionEvent) => {
if (ws.readyState !== 1) return;
if (event.direction !== "inbound") return;
if (ws.readyState !== 1) return
if (event.direction !== 'inbound') return
// Handle agent disconnect specially: send status to frontend
if (event.type === "agent_disconnect") {
sendToRelayWs(ws, { type: "status", payload: { connected: false } });
return;
if (event.type === 'agent_disconnect') {
sendToRelayWs(ws, { type: 'status', payload: { connected: false } })
return
}
// Forward agent responses to the frontend WebSocket
sendToRelayWs(ws, event.payload as object);
});
sendToRelayWs(ws, event.payload as object)
})
relayConnections.set(relayWsId, {
agentId,
@@ -74,7 +77,7 @@ export function handleRelayOpen(ws: WSContext, relayWsId: string, agentId: strin
keepalive,
ws,
openTime: Date.now(),
});
})
// Don't send a synthetic status message here!
// The frontend sends a "connect" command, which acp-link processes
@@ -82,70 +85,83 @@ export function handleRelayOpen(ws: WSContext, relayWsId: string, agentId: strin
// Sending a fake status would make the frontend think it's connected
// before the agent process is actually ready.
log(`[ACP-Relay] Relay established: relayWsId=${relayWsId} → agentId=${agentId}`);
log(
`[ACP-Relay] Relay established: relayWsId=${relayWsId} → agentId=${agentId}`,
)
}
/** Called from onMessage — forwards frontend messages to acp-link */
export function handleRelayMessage(ws: WSContext, relayWsId: string, data: string): void {
const entry = relayConnections.get(relayWsId);
if (!entry) return;
export function handleRelayMessage(
ws: WSContext,
relayWsId: string,
data: string,
): void {
const entry = relayConnections.get(relayWsId)
if (!entry) return
const lines = data.split("\n").filter((l) => l.trim());
const lines = data.split('\n').filter(l => l.trim())
for (const line of lines) {
let msg: Record<string, unknown>;
let msg: Record<string, unknown>
try {
msg = JSON.parse(line);
msg = JSON.parse(line)
} catch {
logError("[ACP-Relay] parse error:", line);
continue;
logError('[ACP-Relay] parse error:', line)
continue
}
// Ignore keepalive responses
if (msg.type === "keep_alive") continue;
if (msg.type === 'keep_alive') continue
// Forward to acp-link agent
const sent = sendToAgentWs(entry.agentId, msg);
const sent = sendToAgentWs(entry.agentId, msg)
if (!sent) {
sendToRelayWs(ws, { type: "error", message: "Agent connection lost" });
return;
sendToRelayWs(ws, { type: 'error', message: 'Agent connection lost' })
return
}
}
}
/** Called from onClose — cleans up relay connection */
export function handleRelayClose(ws: WSContext, relayWsId: string, code?: number, reason?: string): void {
const entry = relayConnections.get(relayWsId);
if (!entry) return;
export function handleRelayClose(
ws: WSContext,
relayWsId: string,
code?: number,
reason?: string,
): void {
const entry = relayConnections.get(relayWsId)
if (!entry) return
const duration = Math.round((Date.now() - entry.openTime) / 1000);
log(`[ACP-Relay] Connection closed: relayWsId=${relayWsId} agentId=${entry.agentId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);
const duration = Math.round((Date.now() - entry.openTime) / 1000)
log(
`[ACP-Relay] Connection closed: relayWsId=${relayWsId} agentId=${entry.agentId} code=${code ?? 'none'} reason=${reason || '(none)'} duration=${duration}s`,
)
if (entry.unsub) {
entry.unsub();
entry.unsub()
}
if (entry.keepalive) {
clearInterval(entry.keepalive);
clearInterval(entry.keepalive)
}
relayConnections.delete(relayWsId);
relayConnections.delete(relayWsId)
}
/** Close all relay connections (for graceful shutdown) */
export function closeAllRelayConnections(): void {
if (relayConnections.size === 0) return;
if (relayConnections.size === 0) return
log(`[ACP-Relay] Closing ${relayConnections.size} relay connection(s)...`);
log(`[ACP-Relay] Closing ${relayConnections.size} relay connection(s)...`)
for (const [relayWsId, entry] of relayConnections) {
try {
if (entry.unsub) entry.unsub();
if (entry.keepalive) clearInterval(entry.keepalive);
if (entry.unsub) entry.unsub()
if (entry.keepalive) clearInterval(entry.keepalive)
if (entry.ws.readyState === 1) {
entry.ws.close(1001, "server_shutdown");
entry.ws.close(1001, 'server_shutdown')
}
} catch {
// ignore errors during shutdown
}
}
relayConnections.clear();
log("[ACP-Relay] All relay connections closed");
relayConnections.clear()
log('[ACP-Relay] All relay connections closed')
}

View File

@@ -1,19 +1,23 @@
import { log } from "../logger";
import type { Context } from "hono";
import type { SessionEvent } from "./event-bus";
import { getAcpEventBus } from "./event-bus";
import { log } from '../logger'
import type { Context } from 'hono'
import type { SessionEvent } from './event-bus'
import { getAcpEventBus } from './event-bus'
/** Create SSE response stream for an ACP channel group */
export function createAcpSSEStream(c: Context, channelGroupId: string, fromSeqNum = 0) {
const bus = getAcpEventBus(channelGroupId);
export function createAcpSSEStream(
c: Context,
channelGroupId: string,
fromSeqNum = 0,
) {
const bus = getAcpEventBus(channelGroupId)
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const encoder = new TextEncoder()
// Send historical events if reconnecting
if (fromSeqNum > 0) {
const missed = bus.getEventsSince(fromSeqNum);
const missed = bus.getEventsSince(fromSeqNum)
for (const event of missed) {
const data = JSON.stringify({
type: event.type,
@@ -21,60 +25,70 @@ export function createAcpSSEStream(c: Context, channelGroupId: string, fromSeqNu
direction: event.direction,
seqNum: event.seqNum,
channel_group_id: channelGroupId,
});
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
})
controller.enqueue(
encoder.encode(
`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`,
),
)
}
}
// Send initial keepalive
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
// Subscribe to new events
const unsub = bus.subscribe((event) => {
const unsub = bus.subscribe(event => {
const data = JSON.stringify({
type: event.type,
payload: event.payload,
direction: event.direction,
seqNum: event.seqNum,
channel_group_id: channelGroupId,
});
})
try {
log(`[ACP-SSE] -> subscriber: channelGroup=${channelGroupId} type=${event.type} seq=${event.seqNum}`);
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
log(
`[ACP-SSE] -> subscriber: channelGroup=${channelGroupId} type=${event.type} seq=${event.seqNum}`,
)
controller.enqueue(
encoder.encode(
`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`,
),
)
} catch {
unsub();
unsub()
}
});
})
// Keepalive interval
const keepalive = setInterval(() => {
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
} catch {
clearInterval(keepalive);
unsub();
clearInterval(keepalive)
unsub()
}
}, 15000);
}, 15000)
// Cleanup on abort
c.req.raw.signal.addEventListener("abort", () => {
unsub();
clearInterval(keepalive);
c.req.raw.signal.addEventListener('abort', () => {
unsub()
clearInterval(keepalive)
try {
controller.close();
controller.close()
} catch {
// already closed
}
});
})
},
});
})
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
});
})
}

View File

@@ -1,313 +1,343 @@
import type { WSContext } from "hono/ws";
import { randomUUID } from "node:crypto";
import { getAcpEventBus } from "./event-bus";
import type { SessionEvent } from "./event-bus";
import type { WSContext } from 'hono/ws'
import { randomUUID } from 'node:crypto'
import { getAcpEventBus } from './event-bus'
import type { SessionEvent } from './event-bus'
import {
storeCreateEnvironment,
storeGetEnvironment,
storeMarkAcpAgentOffline,
storeMarkAcpAgentOnline,
storeUpdateEnvironment,
} from "../store";
import { config } from "../config";
import { log, error as logError } from "../logger";
} from '../store'
import { config } from '../config'
import { log, error as logError } from '../logger'
// Per-connection state
interface AcpConnectionEntry {
agentId: string | null; // Set after register message
channelGroupId: string;
unsub: (() => void) | null;
keepalive: ReturnType<typeof setInterval> | null;
ws: WSContext;
openTime: number;
lastClientActivity: number;
capabilities: Record<string, unknown> | null;
agentId: string | null // Set after register message
channelGroupId: string
unsub: (() => void) | null
keepalive: ReturnType<typeof setInterval> | null
ws: WSContext
openTime: number
lastClientActivity: number
capabilities: Record<string, unknown> | null
}
const connections = new Map<string, AcpConnectionEntry>(); // key: wsId
const connections = new Map<string, AcpConnectionEntry>() // key: wsId
const SERVER_KEEPALIVE_INTERVAL_MS = config.wsKeepaliveInterval * 1000;
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;
const SERVER_KEEPALIVE_INTERVAL_MS = config.wsKeepaliveInterval * 1000
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3
/** Send a JSON message to a WS connection (NDJSON format) */
function sendToWs(ws: WSContext, msg: object): void {
if (ws.readyState !== 1) return;
if (ws.readyState !== 1) return
try {
ws.send(JSON.stringify(msg) + "\n");
ws.send(JSON.stringify(msg) + '\n')
} catch (err) {
logError("[ACP-WS] send error:", err);
logError('[ACP-WS] send error:', err)
}
}
/** Called from onOpen — initializes connection tracking */
export function handleAcpWsOpen(ws: WSContext, wsId: string): void {
log(`[ACP-WS] Connection opened: wsId=${wsId}`);
log(`[ACP-WS] Connection opened: wsId=${wsId}`)
const keepalive = setInterval(() => {
const entry = connections.get(wsId);
const entry = connections.get(wsId)
if (!entry || entry.ws.readyState !== 1) {
clearInterval(keepalive);
return;
clearInterval(keepalive)
return
}
const silenceMs = Date.now() - entry.lastClientActivity;
const silenceMs = Date.now() - entry.lastClientActivity
if (silenceMs > CLIENT_ACTIVITY_TIMEOUT_MS) {
log(`[ACP-WS] Client inactive for ${Math.round(silenceMs / 1000)}s, closing dead connection`);
log(
`[ACP-WS] Client inactive for ${Math.round(silenceMs / 1000)}s, closing dead connection`,
)
try {
entry.ws.close(1000, "client inactive");
entry.ws.close(1000, 'client inactive')
} catch {
clearInterval(keepalive);
clearInterval(keepalive)
}
return;
return
}
sendToWs(entry.ws, { type: "keep_alive" });
}, SERVER_KEEPALIVE_INTERVAL_MS);
sendToWs(entry.ws, { type: 'keep_alive' })
}, SERVER_KEEPALIVE_INTERVAL_MS)
connections.set(wsId, {
agentId: null,
channelGroupId: "",
channelGroupId: '',
unsub: null,
keepalive,
ws,
openTime: Date.now(),
lastClientActivity: Date.now(),
capabilities: null,
});
})
}
/** Handle register message — legacy WS-only registration (still supported) */
function handleRegister(wsId: string, msg: Record<string, unknown>): void {
const entry = connections.get(wsId);
if (!entry) return;
const entry = connections.get(wsId)
if (!entry) return
if (entry.agentId) {
sendToWs(entry.ws, { type: "error", message: "Already registered" });
return;
sendToWs(entry.ws, { type: 'error', message: 'Already registered' })
return
}
const agentName = (msg.agent_name as string) || "unknown";
const capabilities = msg.capabilities as Record<string, unknown> | undefined;
const channelGroupId = (msg.channel_group_id as string) || `group_${randomUUID().replace(/-/g, "").slice(0, 12)}`;
const acpLinkVersion = (msg.acp_link_version as string) || null;
const maxSessions = typeof msg.max_sessions === "number" ? msg.max_sessions : 1;
const agentName = (msg.agent_name as string) || 'unknown'
const capabilities = msg.capabilities as Record<string, unknown> | undefined
const channelGroupId =
(msg.channel_group_id as string) ||
`group_${randomUUID().replace(/-/g, '').slice(0, 12)}`
const acpLinkVersion = (msg.acp_link_version as string) || null
const maxSessions =
typeof msg.max_sessions === 'number' ? msg.max_sessions : 1
// Create EnvironmentRecord with workerType="acp"
const secret = config.apiKeys[0] || "";
const secret = config.apiKeys[0] || ''
const record = storeCreateEnvironment({
secret,
machineName: agentName,
workerType: "acp",
workerType: 'acp',
bridgeId: channelGroupId,
maxSessions,
capabilities: capabilities || undefined,
} as Parameters<typeof storeCreateEnvironment>[0]);
} as Parameters<typeof storeCreateEnvironment>[0])
// Store ACP-specific metadata via environment update
storeUpdateEnvironment(record.id, {
status: "active",
} as Parameters<typeof storeUpdateEnvironment>[1]);
status: 'active',
} as Parameters<typeof storeUpdateEnvironment>[1])
entry.agentId = record.id;
entry.channelGroupId = channelGroupId;
entry.capabilities = capabilities || null;
entry.agentId = record.id
entry.channelGroupId = channelGroupId
entry.capabilities = capabilities || null
// Subscribe to channel group EventBus — broadcast events to this WS
const bus = getAcpEventBus(channelGroupId);
const bus = getAcpEventBus(channelGroupId)
const unsub = bus.subscribe((event: SessionEvent) => {
if (entry.ws.readyState !== 1) return;
if (event.direction !== "outbound") return;
if (entry.ws.readyState !== 1) return
if (event.direction !== 'outbound') return
// Forward outbound events as raw ACP messages
sendToWs(entry.ws, event.payload as object);
});
entry.unsub = unsub;
sendToWs(entry.ws, event.payload as object)
})
entry.unsub = unsub
log(`[ACP-WS] Agent registered (legacy WS): agentId=${record.id} channelGroup=${channelGroupId} name=${agentName}`);
log(
`[ACP-WS] Agent registered (legacy WS): agentId=${record.id} channelGroup=${channelGroupId} name=${agentName}`,
)
sendToWs(entry.ws, {
type: "registered",
type: 'registered',
agent_id: record.id,
channel_group_id: channelGroupId,
});
})
}
/** Handle identify message — binds WS to an existing agent registered via REST */
function handleIdentify(wsId: string, msg: Record<string, unknown>): void {
const entry = connections.get(wsId);
if (!entry) return;
const entry = connections.get(wsId)
if (!entry) return
if (entry.agentId) {
sendToWs(entry.ws, { type: "error", message: "Already identified" });
return;
sendToWs(entry.ws, { type: 'error', message: 'Already identified' })
return
}
const agentId = msg.agent_id as string;
const agentId = msg.agent_id as string
if (!agentId) {
sendToWs(entry.ws, { type: "error", message: "Missing agent_id" });
return;
sendToWs(entry.ws, { type: 'error', message: 'Missing agent_id' })
return
}
// Look up the environment record (created via REST registration)
const record = storeGetEnvironment(agentId);
if (!record || record.workerType !== "acp") {
sendToWs(entry.ws, { type: "error", message: "Agent not found" });
return;
const record = storeGetEnvironment(agentId)
if (!record || record.workerType !== 'acp') {
sendToWs(entry.ws, { type: 'error', message: 'Agent not found' })
return
}
// Update status to active
storeMarkAcpAgentOnline(agentId);
storeMarkAcpAgentOnline(agentId)
const channelGroupId = record.bridgeId || `group_${randomUUID().replace(/-/g, "").slice(0, 12)}`;
const channelGroupId =
record.bridgeId || `group_${randomUUID().replace(/-/g, '').slice(0, 12)}`
entry.agentId = record.id;
entry.channelGroupId = channelGroupId;
entry.capabilities = record.capabilities || null;
entry.agentId = record.id
entry.channelGroupId = channelGroupId
entry.capabilities = record.capabilities || null
// Subscribe to channel group EventBus — broadcast events to this WS
const bus = getAcpEventBus(channelGroupId);
const bus = getAcpEventBus(channelGroupId)
const unsub = bus.subscribe((event: SessionEvent) => {
if (entry.ws.readyState !== 1) return;
if (event.direction !== "outbound") return;
sendToWs(entry.ws, event.payload as object);
});
entry.unsub = unsub;
if (entry.ws.readyState !== 1) return
if (event.direction !== 'outbound') return
sendToWs(entry.ws, event.payload as object)
})
entry.unsub = unsub
log(`[ACP-WS] Agent identified (REST+WS): agentId=${record.id} channelGroup=${channelGroupId}`);
log(
`[ACP-WS] Agent identified (REST+WS): agentId=${record.id} channelGroup=${channelGroupId}`,
)
sendToWs(entry.ws, {
type: "identified",
type: 'identified',
agent_id: record.id,
channel_group_id: channelGroupId,
});
})
}
/** Called from onMessage — processes NDJSON lines */
export function handleAcpWsMessage(ws: WSContext, wsId: string, data: string): void {
const entry = connections.get(wsId);
if (!entry) return;
export function handleAcpWsMessage(
ws: WSContext,
wsId: string,
data: string,
): void {
const entry = connections.get(wsId)
if (!entry) return
entry.lastClientActivity = Date.now();
entry.lastClientActivity = Date.now()
const lines = data.split("\n").filter((l) => l.trim());
const lines = data.split('\n').filter(l => l.trim())
for (const line of lines) {
let msg: Record<string, unknown>;
let msg: Record<string, unknown>
try {
msg = JSON.parse(line);
msg = JSON.parse(line)
} catch {
logError("[ACP-WS] parse error:", line);
continue;
logError('[ACP-WS] parse error:', line)
continue
}
// Handle keepalive
if (msg.type === "keep_alive") {
if (msg.type === 'keep_alive') {
// Update last activity timestamp (only if registered)
if (entry.agentId) {
storeUpdateEnvironment(entry.agentId, { lastPollAt: new Date() } as Parameters<typeof storeUpdateEnvironment>[1]);
storeUpdateEnvironment(entry.agentId, {
lastPollAt: new Date(),
} as Parameters<typeof storeUpdateEnvironment>[1])
}
continue;
continue
}
// Handle registration (legacy WS-only)
if (msg.type === "register") {
handleRegister(wsId, msg);
continue;
if (msg.type === 'register') {
handleRegister(wsId, msg)
continue
}
// Handle identify (REST registration + WS binding)
if (msg.type === "identify") {
handleIdentify(wsId, msg);
continue;
if (msg.type === 'identify') {
handleIdentify(wsId, msg)
continue
}
// Not registered yet — reject
if (!entry.agentId) {
sendToWs(entry.ws, { type: "error", message: "Not registered. Send register message first." });
continue;
sendToWs(entry.ws, {
type: 'error',
message: 'Not registered. Send register message first.',
})
continue
}
// Update agent activity
storeUpdateEnvironment(entry.agentId, { lastPollAt: new Date() } as Parameters<typeof storeUpdateEnvironment>[1]);
storeUpdateEnvironment(entry.agentId, {
lastPollAt: new Date(),
} as Parameters<typeof storeUpdateEnvironment>[1])
// Pass-through: publish to channel group EventBus as inbound
const bus = getAcpEventBus(entry.channelGroupId);
const bus = getAcpEventBus(entry.channelGroupId)
bus.publish({
id: randomUUID(),
sessionId: entry.channelGroupId,
type: (msg.type as string) || "acp_message",
type: (msg.type as string) || 'acp_message',
payload: msg,
direction: "inbound",
});
direction: 'inbound',
})
}
}
/** Called from onClose — marks agent offline and cleans up */
export function handleAcpWsClose(ws: WSContext, wsId: string, code?: number, reason?: string): void {
const entry = connections.get(wsId);
if (!entry) return;
export function handleAcpWsClose(
ws: WSContext,
wsId: string,
code?: number,
reason?: string,
): void {
const entry = connections.get(wsId)
if (!entry) return
const duration = Math.round((Date.now() - entry.openTime) / 1000);
log(`[ACP-WS] Connection closed: wsId=${wsId} agentId=${entry.agentId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);
const duration = Math.round((Date.now() - entry.openTime) / 1000)
log(
`[ACP-WS] Connection closed: wsId=${wsId} agentId=${entry.agentId} code=${code ?? 'none'} reason=${reason || '(none)'} duration=${duration}s`,
)
if (entry.unsub) {
entry.unsub();
entry.unsub()
}
if (entry.keepalive) {
clearInterval(entry.keepalive);
clearInterval(entry.keepalive)
}
// Mark agent as offline (don't delete record — allow reconnect)
if (entry.agentId) {
storeMarkAcpAgentOffline(entry.agentId);
storeMarkAcpAgentOffline(entry.agentId)
// Notify all relay connections that this agent is gone
if (entry.channelGroupId) {
const bus = getAcpEventBus(entry.channelGroupId);
const bus = getAcpEventBus(entry.channelGroupId)
bus.publish({
id: randomUUID(),
sessionId: entry.channelGroupId,
type: "agent_disconnect",
type: 'agent_disconnect',
payload: { agentId: entry.agentId },
direction: "inbound",
});
direction: 'inbound',
})
}
}
connections.delete(wsId);
connections.delete(wsId)
}
/** Find an active ACP connection by agent ID */
export function findAcpConnectionByAgentId(agentId: string): AcpConnectionEntry | null {
export function findAcpConnectionByAgentId(
agentId: string,
): AcpConnectionEntry | null {
for (const entry of connections.values()) {
if (entry.agentId === agentId && entry.ws.readyState === 1) {
return entry;
return entry
}
}
return null;
return null
}
/** Send a JSON message directly to an agent's WebSocket connection */
export function sendToAgentWs(agentId: string, msg: object): boolean {
const entry = findAcpConnectionByAgentId(agentId);
if (!entry) return false;
sendToWs(entry.ws, msg);
return true;
const entry = findAcpConnectionByAgentId(agentId)
if (!entry) return false
sendToWs(entry.ws, msg)
return true
}
/** Gracefully close all ACP WebSocket connections */
export function closeAllAcpConnections(): void {
if (connections.size === 0) return;
if (connections.size === 0) return
log(`[ACP-WS] Gracefully closing ${connections.size} ACP connection(s)...`);
log(`[ACP-WS] Gracefully closing ${connections.size} ACP connection(s)...`)
for (const [wsId, entry] of connections) {
try {
if (entry.unsub) entry.unsub();
if (entry.keepalive) clearInterval(entry.keepalive);
if (entry.unsub) entry.unsub()
if (entry.keepalive) clearInterval(entry.keepalive)
if (entry.ws.readyState === 1) {
entry.ws.close(1001, "server_shutdown");
entry.ws.close(1001, 'server_shutdown')
}
if (entry.agentId) {
storeMarkAcpAgentOffline(entry.agentId);
storeMarkAcpAgentOffline(entry.agentId)
}
} catch {
// ignore errors during shutdown
}
}
connections.clear();
log("[ACP-WS] All connections closed");
connections.clear()
log('[ACP-WS] All connections closed')
}

View File

@@ -1,74 +1,83 @@
import type { SessionEvent } from "./event-bus";
import type { SessionEvent } from './event-bus'
/**
* Convert an internal session event into the SDK/control message shape that
* bridge workers consume on both the legacy WS path and the v2 worker SSE path.
*/
export function toClientPayload(event: SessionEvent): Record<string, unknown> {
const payload = event.payload as Record<string, unknown> | null;
const payload = event.payload as Record<string, unknown> | null
const messageUuid =
typeof payload?.uuid === "string" && payload.uuid ? payload.uuid : event.id;
typeof payload?.uuid === 'string' && payload.uuid ? payload.uuid : event.id
if (event.type === "user" || event.type === "user_message") {
if (event.type === 'user' || event.type === 'user_message') {
return {
type: "user",
type: 'user',
uuid: messageUuid,
session_id: event.sessionId,
...(payload?.isSynthetic === true ? { isSynthetic: true } : {}),
message: {
role: "user",
content: payload?.content ?? payload?.message ?? "",
role: 'user',
content: payload?.content ?? payload?.message ?? '',
},
};
}
}
if (event.type === "permission_response" || event.type === "control_response") {
const approved = !!payload?.approved;
const existingResponse = payload?.response as Record<string, unknown> | undefined;
if (
event.type === 'permission_response' ||
event.type === 'control_response'
) {
const approved = !!payload?.approved
const existingResponse = payload?.response as
| Record<string, unknown>
| undefined
if (existingResponse) {
return { type: "control_response", response: existingResponse };
return { type: 'control_response', response: existingResponse }
}
const updatedInput = payload?.updated_input as Record<string, unknown> | undefined;
const updatedPermissions = payload?.updated_permissions as Record<string, unknown>[] | undefined;
const feedbackMessage = payload?.message as string | undefined;
const updatedInput = payload?.updated_input as
| Record<string, unknown>
| undefined
const updatedPermissions = payload?.updated_permissions as
| Record<string, unknown>[]
| undefined
const feedbackMessage = payload?.message as string | undefined
return {
type: "control_response",
type: 'control_response',
response: {
subtype: approved ? "success" : "error",
request_id: payload?.request_id ?? "",
subtype: approved ? 'success' : 'error',
request_id: payload?.request_id ?? '',
...(approved
? {
response: {
behavior: "allow" as const,
behavior: 'allow' as const,
...(updatedInput ? { updatedInput } : {}),
...(updatedPermissions ? { updatedPermissions } : {}),
},
}
: {
error: "Permission denied by user",
response: { behavior: "deny" as const },
error: 'Permission denied by user',
response: { behavior: 'deny' as const },
...(feedbackMessage ? { message: feedbackMessage } : {}),
}),
},
};
}
}
if (event.type === "interrupt") {
if (event.type === 'interrupt') {
return {
type: "control_request",
type: 'control_request',
request_id: event.id,
request: { subtype: "interrupt" },
};
request: { subtype: 'interrupt' },
}
}
if (event.type === "control_request") {
if (event.type === 'control_request') {
return {
type: "control_request",
type: 'control_request',
request_id: payload?.request_id ?? event.id,
request: payload?.request ?? payload,
};
}
}
return {
@@ -76,5 +85,5 @@ export function toClientPayload(event: SessionEvent): Record<string, unknown> {
uuid: messageUuid,
session_id: event.sessionId,
message: payload,
};
}
}

View File

@@ -1,116 +1,116 @@
import { log, error as logError } from "../logger";
import { log, error as logError } from '../logger'
export interface SessionEvent {
id: string;
sessionId: string;
type: string;
payload: unknown;
direction: "inbound" | "outbound";
seqNum: number;
createdAt: number;
id: string
sessionId: string
type: string
payload: unknown
direction: 'inbound' | 'outbound'
seqNum: number
createdAt: number
}
type Subscriber = (event: SessionEvent) => void;
type Subscriber = (event: SessionEvent) => void
const MAX_EVENTS_PER_BUS = 5000;
const MAX_EVENTS_PER_BUS = 5000
export class EventBus {
private subscribers = new Set<Subscriber>();
private events: SessionEvent[] = [];
private seqNum = 0;
private closed = false;
private subscribers = new Set<Subscriber>()
private events: SessionEvent[] = []
private seqNum = 0
private closed = false
subscribe(callback: Subscriber): () => void {
this.subscribers.add(callback);
return () => this.subscribers.delete(callback);
this.subscribers.add(callback)
return () => this.subscribers.delete(callback)
}
subscriberCount(): number {
return this.subscribers.size;
return this.subscribers.size
}
publish(event: Omit<SessionEvent, "seqNum" | "createdAt">): SessionEvent {
if (this.closed) throw new Error("EventBus is closed");
publish(event: Omit<SessionEvent, 'seqNum' | 'createdAt'>): SessionEvent {
if (this.closed) throw new Error('EventBus is closed')
const full: SessionEvent = {
...event,
seqNum: ++this.seqNum,
createdAt: Date.now(),
};
this.events.push(full);
}
this.events.push(full)
// Evict oldest events when exceeding limit
if (this.events.length > MAX_EVENTS_PER_BUS) {
this.events = this.events.slice(-Math.floor(MAX_EVENTS_PER_BUS / 2));
this.events = this.events.slice(-Math.floor(MAX_EVENTS_PER_BUS / 2))
}
log(
`[RC-DEBUG] bus publish: sessionId=${event.sessionId} type=${event.type} dir=${event.direction} seq=${full.seqNum} subscribers=${this.subscribers.size}`,
event.type === "error" ? `payload=${JSON.stringify(event.payload)}` : "",
);
event.type === 'error' ? `payload=${JSON.stringify(event.payload)}` : '',
)
for (const cb of this.subscribers) {
try {
cb(full);
cb(full)
} catch (err) {
logError(`[RC-DEBUG] bus subscriber error:`, err);
logError(`[RC-DEBUG] bus subscriber error:`, err)
}
}
return full;
return full
}
getLastSeqNum(): number {
return this.seqNum;
return this.seqNum
}
getEventsSince(seqNum: number): SessionEvent[] {
const idx = this.events.findIndex((e) => e.seqNum > seqNum);
if (idx === -1) return [];
return this.events.slice(idx);
const idx = this.events.findIndex(e => e.seqNum > seqNum)
if (idx === -1) return []
return this.events.slice(idx)
}
close() {
this.closed = true;
this.subscribers.clear();
this.closed = true
this.subscribers.clear()
}
}
/** Global registry of per-session event buses */
const buses = new Map<string, EventBus>();
const buses = new Map<string, EventBus>()
export function getEventBus(sessionId: string): EventBus {
let bus = buses.get(sessionId);
let bus = buses.get(sessionId)
if (!bus) {
bus = new EventBus();
buses.set(sessionId, bus);
bus = new EventBus()
buses.set(sessionId, bus)
}
return bus;
return bus
}
export function removeEventBus(sessionId: string) {
const bus = buses.get(sessionId);
const bus = buses.get(sessionId)
if (bus) {
bus.close();
buses.delete(sessionId);
bus.close()
buses.delete(sessionId)
}
}
export function getAllEventBuses(): Map<string, EventBus> {
return buses;
return buses
}
/** Global registry of per-channel-group ACP event buses */
const acpBuses = new Map<string, EventBus>();
const acpBuses = new Map<string, EventBus>()
export function getAcpEventBus(channelGroupId: string): EventBus {
let bus = acpBuses.get(channelGroupId);
let bus = acpBuses.get(channelGroupId)
if (!bus) {
bus = new EventBus();
acpBuses.set(channelGroupId, bus);
bus = new EventBus()
acpBuses.set(channelGroupId, bus)
}
return bus;
return bus
}
export function removeAcpEventBus(channelGroupId: string) {
const bus = acpBuses.get(channelGroupId);
const bus = acpBuses.get(channelGroupId)
if (bus) {
bus.close();
acpBuses.delete(channelGroupId);
bus.close()
acpBuses.delete(channelGroupId)
}
}

View File

@@ -1,163 +1,177 @@
import { log, error as logError } from "../logger";
import type { Context } from "hono";
import type { SessionEvent } from "./event-bus";
import { getEventBus } from "./event-bus";
import { toClientPayload } from "./client-payload";
import { log, error as logError } from '../logger'
import type { Context } from 'hono'
import type { SessionEvent } from './event-bus'
import { getEventBus } from './event-bus'
import { toClientPayload } from './client-payload'
export interface SSEWriter {
send(event: SessionEvent): void;
close(): void;
send(event: SessionEvent): void
close(): void
}
export function createSSEWriter(c: Context): SSEWriter {
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
c.req.raw.signal.addEventListener("abort", () => {
controller.close();
});
const encoder = new TextEncoder()
c.req.raw.signal.addEventListener('abort', () => {
controller.close()
})
// Store encoder and controller for later use
(c as any)._sseEncoder = encoder;
(c as any)._sseController = controller;
;(c as any)._sseEncoder = encoder
;(c as any)._sseController = controller
},
});
})
return {
send(event: SessionEvent) {
const encoder = (c as any)._sseEncoder as TextEncoder;
const controller = (c as any)._sseController as ReadableStreamDefaultController;
if (!encoder || !controller) return;
const encoder = (c as any)._sseEncoder as TextEncoder
const controller = (c as any)
._sseController as ReadableStreamDefaultController
if (!encoder || !controller) return
const data = JSON.stringify({
type: event.type,
payload: event.payload,
direction: event.direction,
seqNum: event.seqNum,
});
const msg = `id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`;
controller.enqueue(encoder.encode(msg));
})
const msg = `id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`
controller.enqueue(encoder.encode(msg))
},
close() {
const controller = (c as any)._sseController as ReadableStreamDefaultController;
controller?.close();
const controller = (c as any)
._sseController as ReadableStreamDefaultController
controller?.close()
},
};
}
}
/** Create SSE response stream for a session */
export function createSSEStream(c: Context, sessionId: string, fromSeqNum = 0) {
const bus = getEventBus(sessionId);
const bus = getEventBus(sessionId)
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const encoder = new TextEncoder()
// Send historical events if reconnecting
if (fromSeqNum > 0) {
const missed = bus.getEventsSince(fromSeqNum);
const missed = bus.getEventsSince(fromSeqNum)
for (const event of missed) {
const data = JSON.stringify({
type: event.type,
payload: event.payload,
direction: event.direction,
seqNum: event.seqNum,
});
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
})
controller.enqueue(
encoder.encode(
`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`,
),
)
}
}
// Send initial keepalive
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
// Subscribe to new events
const unsub = bus.subscribe((event) => {
const unsub = bus.subscribe(event => {
const data = JSON.stringify({
type: event.type,
payload: event.payload,
direction: event.direction,
seqNum: event.seqNum,
});
})
try {
log(`[RC-DEBUG] SSE -> web: sessionId=${sessionId} type=${event.type} dir=${event.direction} seq=${event.seqNum}`);
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
log(
`[RC-DEBUG] SSE -> web: sessionId=${sessionId} type=${event.type} dir=${event.direction} seq=${event.seqNum}`,
)
controller.enqueue(
encoder.encode(
`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`,
),
)
} catch {
unsub();
unsub()
}
});
})
// Keepalive interval
const keepalive = setInterval(() => {
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
} catch {
clearInterval(keepalive);
unsub();
clearInterval(keepalive)
unsub()
}
}, 15000);
}, 15000)
// Cleanup on abort
c.req.raw.signal.addEventListener("abort", () => {
unsub();
clearInterval(keepalive);
c.req.raw.signal.addEventListener('abort', () => {
unsub()
clearInterval(keepalive)
try {
controller.close();
controller.close()
} catch {
// already closed
}
});
})
},
});
})
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
});
})
}
function toWorkerClientPayload(event: SessionEvent): Record<string, unknown> {
if (
event.type === "permission_response" ||
event.type === "control_response" ||
event.type === "control_request" ||
event.type === "interrupt"
event.type === 'permission_response' ||
event.type === 'control_response' ||
event.type === 'control_request' ||
event.type === 'interrupt'
) {
return toClientPayload(event);
return toClientPayload(event)
}
const normalized =
event.payload && typeof event.payload === "object"
event.payload && typeof event.payload === 'object'
? (event.payload as Record<string, unknown>)
: undefined;
: undefined
const raw =
normalized?.raw && typeof normalized.raw === "object" && !Array.isArray(normalized.raw)
normalized?.raw &&
typeof normalized.raw === 'object' &&
!Array.isArray(normalized.raw)
? (normalized.raw as Record<string, unknown>)
: undefined;
: undefined
const payload: Record<string, unknown> = {
...(raw ?? normalized ?? {}),
type: event.type,
};
}
if (event.type === "user") {
const message = payload.message;
if (!message || typeof message !== "object" || !("content" in message)) {
if (event.type === 'user') {
const message = payload.message
if (!message || typeof message !== 'object' || !('content' in message)) {
const content =
typeof normalized?.content === "string"
typeof normalized?.content === 'string'
? normalized.content
: typeof payload.content === "string"
: typeof payload.content === 'string'
? payload.content
: typeof event.payload === "string"
: typeof event.payload === 'string'
? event.payload
: "";
payload.content = content;
payload.message = { content };
: ''
payload.content = content
payload.message = { content }
}
}
return payload;
return payload
}
function toWorkerClientFrame(event: SessionEvent): string {
@@ -165,70 +179,74 @@ function toWorkerClientFrame(event: SessionEvent): string {
event_id: event.id,
sequence_num: event.seqNum,
event_type: event.type,
source: "client",
source: 'client',
payload: toWorkerClientPayload(event),
created_at: new Date(event.createdAt).toISOString(),
});
return `id: ${event.seqNum}\nevent: client_event\ndata: ${data}\n\n`;
})
return `id: ${event.seqNum}\nevent: client_event\ndata: ${data}\n\n`
}
/** Create CCR worker SSE stream (client_event frames, outbound events only). */
export function createWorkerEventStream(c: Context, sessionId: string, fromSeqNum = 0) {
const bus = getEventBus(sessionId);
export function createWorkerEventStream(
c: Context,
sessionId: string,
fromSeqNum = 0,
) {
const bus = getEventBus(sessionId)
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const encoder = new TextEncoder()
if (fromSeqNum > 0) {
const missed = bus
.getEventsSince(fromSeqNum)
.filter((event) => event.direction === "outbound");
.filter(event => event.direction === 'outbound')
for (const event of missed) {
controller.enqueue(encoder.encode(toWorkerClientFrame(event)));
controller.enqueue(encoder.encode(toWorkerClientFrame(event)))
}
}
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
const unsub = bus.subscribe((event) => {
if (event.direction !== "outbound") {
return;
const unsub = bus.subscribe(event => {
if (event.direction !== 'outbound') {
return
}
try {
controller.enqueue(encoder.encode(toWorkerClientFrame(event)));
controller.enqueue(encoder.encode(toWorkerClientFrame(event)))
} catch {
unsub();
unsub()
}
});
})
const keepalive = setInterval(() => {
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
controller.enqueue(encoder.encode(': keepalive\n\n'))
} catch {
clearInterval(keepalive);
unsub();
clearInterval(keepalive)
unsub()
}
}, 15000);
}, 15000)
c.req.raw.signal.addEventListener("abort", () => {
unsub();
clearInterval(keepalive);
c.req.raw.signal.addEventListener('abort', () => {
unsub()
clearInterval(keepalive)
try {
controller.close();
controller.close()
} catch {
// already closed
}
});
})
},
});
})
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
});
})
}

View File

@@ -1,31 +1,31 @@
import type { WSContext } from "hono/ws";
import { getEventBus } from "./event-bus";
import type { SessionEvent } from "./event-bus";
import { publishSessionEvent } from "../services/transport";
import { log, error as logError } from "../logger";
import { toClientPayload } from "./client-payload";
import { config } from "../config";
import type { WSContext } from 'hono/ws'
import { getEventBus } from './event-bus'
import type { SessionEvent } from './event-bus'
import { publishSessionEvent } from '../services/transport'
import { log, error as logError } from '../logger'
import { toClientPayload } from './client-payload'
import { config } from '../config'
// Per-connection cleanup, keyed by sessionId (only one WS per session)
interface CleanupEntry {
unsub: () => void;
keepalive: ReturnType<typeof setInterval>;
ws: WSContext;
openTime: number;
lastClientActivity: number;
unsub: () => void
keepalive: ReturnType<typeof setInterval>
ws: WSContext
openTime: number
lastClientActivity: number
}
const cleanupBySession = new Map<string, CleanupEntry>();
const cleanupBySession = new Map<string, CleanupEntry>()
// Track all active WS connections for graceful shutdown
const activeConnections = new Set<WSContext>();
const activeConnections = new Set<WSContext>()
// Server-side keepalive interval (configurable via RCS_WS_KEEPALIVE_INTERVAL).
// Sends data frames to keep reverse proxies from closing idle connections.
const SERVER_KEEPALIVE_INTERVAL_MS = (config.wsKeepaliveInterval || 20) * 1000;
const SERVER_KEEPALIVE_INTERVAL_MS = (config.wsKeepaliveInterval || 20) * 1000
// If no client data received within this threshold, the connection is
// considered dead. Set to 3x keepalive to tolerate one missed interval.
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3
/**
* Convert internal EventBus event -> SDK message for bridge client.
@@ -33,36 +33,36 @@ const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;
function toSDKMessage(event: SessionEvent): string {
// NDJSON format: each message MUST end with \n so the child process's
// line-based parser can split messages correctly.
return JSON.stringify(toClientPayload(event)) + "\n";
return JSON.stringify(toClientPayload(event)) + '\n'
}
/** Called from onOpen — subscribes to event bus, forwards outbound events to bridge WS */
export function handleWebSocketOpen(ws: WSContext, sessionId: string) {
const openTime = Date.now();
const lastClientActivity = Date.now();
log(`[RC-DEBUG] [WS] Open session=${sessionId}`);
activeConnections.add(ws);
const openTime = Date.now()
const lastClientActivity = Date.now()
log(`[RC-DEBUG] [WS] Open session=${sessionId}`)
activeConnections.add(ws)
// If there's an existing connection for this session, clean it up first
const existing = cleanupBySession.get(sessionId);
const existing = cleanupBySession.get(sessionId)
if (existing) {
log(`[WS] Replacing existing connection for session=${sessionId}`);
existing.unsub();
clearInterval(existing.keepalive);
activeConnections.delete(existing.ws);
log(`[WS] Replacing existing connection for session=${sessionId}`)
existing.unsub()
clearInterval(existing.keepalive)
activeConnections.delete(existing.ws)
}
const bus = getEventBus(sessionId);
const bus = getEventBus(sessionId)
// Replay ALL events (inbound + outbound) so the bridge can reconstruct
// the full conversation history — assistant replies are inbound events.
const missed = bus.getEventsSince(0);
const missed = bus.getEventsSince(0)
if (missed.length > 0) {
log(`[WS] Replaying ${missed.length} missed event(s)`);
log(`[WS] Replaying ${missed.length} missed event(s)`)
for (const event of missed) {
if (ws.readyState !== 1) break;
if (ws.readyState !== 1) break
try {
ws.send(toSDKMessage(event));
ws.send(toSDKMessage(event))
} catch {
// ignore send errors during replay
}
@@ -70,75 +70,96 @@ export function handleWebSocketOpen(ws: WSContext, sessionId: string) {
}
const unsub = bus.subscribe((event: SessionEvent) => {
if (ws.readyState !== 1) return;
if (event.direction !== "outbound") return;
if (ws.readyState !== 1) return
if (event.direction !== 'outbound') return
try {
const sdkMsg = toSDKMessage(event);
log(`[RC-DEBUG] [WS] -> bridge (outbound): type=${event.type} len=${sdkMsg.length} msg=${sdkMsg.slice(0, 300)}`);
ws.send(sdkMsg);
const sdkMsg = toSDKMessage(event)
log(
`[RC-DEBUG] [WS] -> bridge (outbound): type=${event.type} len=${sdkMsg.length} msg=${sdkMsg.slice(0, 300)}`,
)
ws.send(sdkMsg)
} catch (err) {
logError("[RC-DEBUG] [WS] send error:", err);
logError('[RC-DEBUG] [WS] send error:', err)
}
});
})
const keepalive = setInterval(() => {
if (ws.readyState !== 1) {
clearInterval(keepalive);
return;
clearInterval(keepalive)
return
}
// Check if client is still alive — close if no data received for too long
const silenceMs = Date.now() - lastClientActivity;
const silenceMs = Date.now() - lastClientActivity
if (silenceMs > CLIENT_ACTIVITY_TIMEOUT_MS) {
log(`[WS] Client inactive for ${Math.round(silenceMs / 1000)}s on session=${sessionId}, closing dead connection`);
log(
`[WS] Client inactive for ${Math.round(silenceMs / 1000)}s on session=${sessionId}, closing dead connection`,
)
try {
ws.close(1000, "client inactive");
ws.close(1000, 'client inactive')
} catch {
clearInterval(keepalive);
clearInterval(keepalive)
}
return;
return
}
try {
ws.send('{"type":"keep_alive"}\n');
ws.send('{"type":"keep_alive"}\n')
} catch {
clearInterval(keepalive);
clearInterval(keepalive)
}
}, SERVER_KEEPALIVE_INTERVAL_MS);
}, SERVER_KEEPALIVE_INTERVAL_MS)
cleanupBySession.set(sessionId, { unsub, keepalive, ws, openTime, lastClientActivity });
cleanupBySession.set(sessionId, {
unsub,
keepalive,
ws,
openTime,
lastClientActivity,
})
}
/**
* Called from onMessage — bridge sends newline-delimited JSON.
*/
export function handleWebSocketMessage(ws: WSContext, sessionId: string, data: string) {
export function handleWebSocketMessage(
ws: WSContext,
sessionId: string,
data: string,
) {
// Track client activity for dead-connection detection
const entry = cleanupBySession.get(sessionId);
const entry = cleanupBySession.get(sessionId)
if (entry) {
entry.lastClientActivity = Date.now();
entry.lastClientActivity = Date.now()
}
const lines = data.split("\n").filter((l) => l.trim());
const lines = data.split('\n').filter(l => l.trim())
for (const line of lines) {
try {
ingestBridgeMessage(sessionId, JSON.parse(line));
ingestBridgeMessage(sessionId, JSON.parse(line))
} catch (err) {
logError("[WS] parse error:", err);
logError('[WS] parse error:', err)
}
}
}
/** Called from onClose — unsubscribes from event bus */
export function handleWebSocketClose(ws: WSContext, sessionId: string, code?: number, reason?: string) {
activeConnections.delete(ws);
export function handleWebSocketClose(
ws: WSContext,
sessionId: string,
code?: number,
reason?: string,
) {
activeConnections.delete(ws)
const entry = cleanupBySession.get(sessionId);
const duration = entry ? Math.round((Date.now() - entry.openTime) / 1000) : -1;
const entry = cleanupBySession.get(sessionId)
const duration = entry ? Math.round((Date.now() - entry.openTime) / 1000) : -1
log(`[WS] Close session=${sessionId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);
log(
`[WS] Close session=${sessionId} code=${code ?? 'none'} reason=${reason || '(none)'} duration=${duration}s`,
)
if (entry) {
entry.unsub();
clearInterval(entry.keepalive);
cleanupBySession.delete(sessionId);
entry.unsub()
clearInterval(entry.keepalive)
cleanupBySession.delete(sessionId)
}
}
@@ -150,88 +171,104 @@ export function handleWebSocketClose(ws: WSContext, sessionId: string, code?: nu
* {"subtype":"success","uuid":"...","result":"..."} → type "result"
*/
function deriveEventType(msg: Record<string, unknown>): string {
if (msg.type && typeof msg.type === "string") return msg.type;
if (msg.type && typeof msg.type === 'string') return msg.type
// Child process stream-json format: message.role determines type
const message = msg.message as Record<string, unknown> | undefined;
if (message && typeof message.role === "string") {
return message.role; // "user", "assistant", "system"
const message = msg.message as Record<string, unknown> | undefined
if (message && typeof message.role === 'string') {
return message.role // "user", "assistant", "system"
}
// Result message
if (msg.subtype || msg.result !== undefined) return "result";
if (msg.subtype || msg.result !== undefined) return 'result'
// System/init message
if (msg.session_id) return "system";
if (msg.session_id) return 'system'
return "unknown";
return 'unknown'
}
/**
* Parse a single SDK message from bridge -> publish to EventBus as inbound.
*/
export function ingestBridgeMessage(sessionId: string, msg: Record<string, unknown>) {
if (msg.type === "keep_alive") return;
export function ingestBridgeMessage(
sessionId: string,
msg: Record<string, unknown>,
) {
if (msg.type === 'keep_alive') return
const eventType = deriveEventType(msg);
const eventType = deriveEventType(msg)
log(`[RC-DEBUG] [WS] <- bridge (inbound): sessionId=${sessionId} type=${eventType}${msg.uuid ? ` uuid=${msg.uuid}` : ""} msg=${JSON.stringify(msg).slice(0, 300)}`);
log(
`[RC-DEBUG] [WS] <- bridge (inbound): sessionId=${sessionId} type=${eventType}${msg.uuid ? ` uuid=${msg.uuid}` : ''} msg=${JSON.stringify(msg).slice(0, 300)}`,
)
let payload: unknown;
let payload: unknown
if (eventType === "assistant" || eventType === "partial_assistant") {
const message = msg.message as Record<string, unknown> | undefined;
const content = message?.content;
if (eventType === 'assistant' || eventType === 'partial_assistant') {
const message = msg.message as Record<string, unknown> | undefined
const content = message?.content
// Extract text from content blocks for simple display
let text = "";
if (typeof content === "string") {
text = content;
let text = ''
if (typeof content === 'string') {
text = content
} else if (Array.isArray(content)) {
text = content
.filter((b: unknown) => b && typeof b === "object" && "type" in (b as Record<string, unknown>) && (b as Record<string, unknown>).type === "text")
.map((b: Record<string, unknown>) => (b as Record<string, unknown>).text || "")
.join("");
.filter(
(b: unknown) =>
b &&
typeof b === 'object' &&
'type' in (b as Record<string, unknown>) &&
(b as Record<string, unknown>).type === 'text',
)
.map(
(b: Record<string, unknown>) =>
(b as Record<string, unknown>).text || '',
)
.join('')
}
payload = { message: msg.message, uuid: msg.uuid, content: text };
} else if (eventType === "user" || eventType === "system") {
payload = { message: msg.message, uuid: msg.uuid, content: text }
} else if (eventType === 'user' || eventType === 'system') {
payload = {
message: msg.message,
uuid: msg.uuid,
...(typeof msg.isSynthetic === "boolean" ? { isSynthetic: msg.isSynthetic } : {}),
};
} else if (eventType === "control_request") {
payload = { request_id: msg.request_id, request: msg.request };
} else if (eventType === "control_response") {
payload = { response: msg.response };
} else if (eventType === "result" || eventType === "result_success") {
payload = { subtype: msg.subtype, uuid: msg.uuid, result: msg.result };
...(typeof msg.isSynthetic === 'boolean'
? { isSynthetic: msg.isSynthetic }
: {}),
}
} else if (eventType === 'control_request') {
payload = { request_id: msg.request_id, request: msg.request }
} else if (eventType === 'control_response') {
payload = { response: msg.response }
} else if (eventType === 'result' || eventType === 'result_success') {
payload = { subtype: msg.subtype, uuid: msg.uuid, result: msg.result }
} else {
payload = msg;
payload = msg
}
publishSessionEvent(sessionId, eventType, payload, "inbound");
publishSessionEvent(sessionId, eventType, payload, 'inbound')
}
/**
* Gracefully close all active WebSocket connections.
*/
export function closeAllConnections(): void {
const count = activeConnections.size;
if (count === 0) return;
const count = activeConnections.size
if (count === 0) return
log(`[WS] Gracefully closing ${count} active connection(s)...`);
log(`[WS] Gracefully closing ${count} active connection(s)...`)
for (const [sessionId, entry] of cleanupBySession) {
try {
entry.unsub();
clearInterval(entry.keepalive);
entry.unsub()
clearInterval(entry.keepalive)
if (entry.ws.readyState === 1) {
entry.ws.close(1001, "server_shutdown");
entry.ws.close(1001, 'server_shutdown')
}
} catch {
// ignore errors during shutdown
}
}
cleanupBySession.clear();
activeConnections.clear();
log("[WS] All connections closed");
cleanupBySession.clear()
activeConnections.clear()
log('[WS] All connections closed')
}

View File

@@ -1,39 +1,42 @@
import { Buffer } from "node:buffer";
import type { WSContext } from "hono/ws";
import { error as logError } from "../logger";
import { Buffer } from 'node:buffer'
import type { WSContext } from 'hono/ws'
import { error as logError } from '../logger'
const textDecoder = new TextDecoder();
const textDecoder = new TextDecoder()
export const MAX_WS_MESSAGE_SIZE = 10 * 1024 * 1024;
export const MAX_WS_MESSAGE_SIZE = 10 * 1024 * 1024
export type DecodedWsMessage =
| { ok: true; data: string; size: number }
| { ok: false; reason: string; size?: number };
| { ok: false; reason: string; size?: number }
export function decodeWsPayload(data: unknown): DecodedWsMessage {
if (typeof data === "string") {
return { ok: true, data, size: Buffer.byteLength(data, "utf8") };
if (typeof data === 'string') {
return { ok: true, data, size: Buffer.byteLength(data, 'utf8') }
}
if (data instanceof ArrayBuffer) {
if (data.byteLength > MAX_WS_MESSAGE_SIZE) {
return { ok: false, reason: "message too large", size: data.byteLength };
return { ok: false, reason: 'message too large', size: data.byteLength }
}
return { ok: true, data: textDecoder.decode(data), size: data.byteLength };
return { ok: true, data: textDecoder.decode(data), size: data.byteLength }
}
if (data instanceof Uint8Array) {
if (data.byteLength > MAX_WS_MESSAGE_SIZE) {
return { ok: false, reason: "message too large", size: data.byteLength };
return { ok: false, reason: 'message too large', size: data.byteLength }
}
return { ok: true, data: textDecoder.decode(data), size: data.byteLength };
return { ok: true, data: textDecoder.decode(data), size: data.byteLength }
}
if (typeof SharedArrayBuffer !== "undefined" && data instanceof SharedArrayBuffer) {
const bytes = new Uint8Array(data);
if (
typeof SharedArrayBuffer !== 'undefined' &&
data instanceof SharedArrayBuffer
) {
const bytes = new Uint8Array(data)
if (bytes.byteLength > MAX_WS_MESSAGE_SIZE) {
return { ok: false, reason: "message too large", size: bytes.byteLength };
return { ok: false, reason: 'message too large', size: bytes.byteLength }
}
return { ok: true, data: textDecoder.decode(bytes), size: bytes.byteLength };
return { ok: true, data: textDecoder.decode(bytes), size: bytes.byteLength }
}
return { ok: false, reason: typeof data };
return { ok: false, reason: typeof data }
}
export function handleSizedWsPayload(
@@ -43,22 +46,28 @@ export function handleSizedWsPayload(
payload: unknown,
handleMessage: (data: string) => void,
): boolean {
const decoded = decodeWsPayload(payload);
const decoded = decodeWsPayload(payload)
if (!decoded.ok) {
if (decoded.reason === "message too large" && decoded.size !== undefined) {
logError(`${logPrefix} Message too large on ${label}: size=${decoded.size} limit=${MAX_WS_MESSAGE_SIZE}`);
ws.close(1009, "message too large");
return false;
if (decoded.reason === 'message too large' && decoded.size !== undefined) {
logError(
`${logPrefix} Message too large on ${label}: size=${decoded.size} limit=${MAX_WS_MESSAGE_SIZE}`,
)
ws.close(1009, 'message too large')
return false
}
logError(`${logPrefix} Unsupported message payload on ${label}: ${decoded.reason}`);
ws.close(1003, "unsupported message payload");
return false;
logError(
`${logPrefix} Unsupported message payload on ${label}: ${decoded.reason}`,
)
ws.close(1003, 'unsupported message payload')
return false
}
if (decoded.size > MAX_WS_MESSAGE_SIZE) {
logError(`${logPrefix} Message too large on ${label}: size=${decoded.size} limit=${MAX_WS_MESSAGE_SIZE}`);
ws.close(1009, "message too large");
return false;
logError(
`${logPrefix} Message too large on ${label}: size=${decoded.size} limit=${MAX_WS_MESSAGE_SIZE}`,
)
ws.close(1009, 'message too large')
return false
}
handleMessage(decoded.data);
return true;
handleMessage(decoded.data)
return true
}

View File

@@ -1 +1 @@
export { upgradeWebSocket, websocket } from "hono/bun";
export { upgradeWebSocket, websocket } from 'hono/bun'