import type { WSContext } from "hono/ws";
import { randomUUID } from "node:crypto";
import { getAcpEventBus } from "./event-bus";
import type { SessionEvent } from "./event-bus";
import {
  storeCreateEnvironment,
  storeGetEnvironment,
  storeMarkAcpAgentOffline,
  storeMarkAcpAgentOnline,
  storeUpdateEnvironment,
} from "../store";
import { config } from "../config";
import { log, error as logError } from "../logger";

/** Per-connection state, stored in `connections` under the connection's wsId. */
interface AcpConnectionEntry {
  /** Environment record id; null until a register/identify message succeeds. */
  agentId: string | null;
  /** Channel group whose EventBus this connection is bound to ("" until bound). */
  channelGroupId: string;
  /** Unsubscribe callback for the EventBus subscription, if any. */
  unsub: (() => void) | null;
  /** Handle of the server keepalive timer started in handleAcpWsOpen. */
  keepalive: ReturnType<typeof setInterval> | null;
  ws: WSContext;
  /** Date.now() at connection open; used for the duration log on close. */
  openTime: number;
  /** Date.now() of the last inbound frame; drives the inactivity check. */
  lastClientActivity: number;
  // NOTE(review): value type assumed to be `unknown`; confirm against the
  // capabilities type used by the store's environment record.
  capabilities: Record<string, unknown> | null;
}

const connections = new Map<string, AcpConnectionEntry>(); // key: wsId

/** WebSocket readyState value for an open connection. */
const WS_OPEN = 1;

const SERVER_KEEPALIVE_INTERVAL_MS = config.wsKeepaliveInterval * 1000;
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;

/** True for any non-null object (including arrays). */
function isObjectLike(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/** Returns `value` when it is a non-empty string, otherwise undefined. */
function nonEmptyString(value: unknown): string | undefined {
  return typeof value === "string" && value !== "" ? value : undefined;
}

/** Generates a fresh channel group id: "group_" + 12 hex chars of a UUID. */
function newChannelGroupId(): string {
  return `group_${randomUUID().replace(/-/g, "").slice(0, 12)}`;
}

/** Records activity for an agent by stamping `lastPollAt` on its environment. */
function touchAgent(agentId: string): void {
  storeUpdateEnvironment(agentId, { lastPollAt: new Date() } as Parameters<typeof storeUpdateEnvironment>[1]);
}

/**
 * Send a JSON message to a WS connection (NDJSON format).
 *
 * @returns true when the frame was handed to the socket; false when the
 *          socket is not open or `send` threw (the error is logged).
 */
function sendToWs(ws: WSContext, msg: object): boolean {
  if (ws.readyState !== WS_OPEN) return false;
  try {
    ws.send(JSON.stringify(msg) + "\n");
    return true;
  } catch (err) {
    logError("[ACP-WS] send error:", err);
    return false;
  }
}

/**
 * Subscribes the connection to its channel group's EventBus and forwards
 * every outbound event payload to the socket as a raw ACP message.
 * Stores the unsubscribe callback on the entry.
 */
function subscribeToChannelGroup(entry: AcpConnectionEntry, channelGroupId: string): void {
  const bus = getAcpEventBus(channelGroupId);
  entry.unsub = bus.subscribe((event: SessionEvent) => {
    if (entry.ws.readyState !== WS_OPEN) return;
    if (event.direction !== "outbound") return;
    sendToWs(entry.ws, event.payload as object);
  });
}

/**
 * Called from onOpen — initializes connection tracking.
 *
 * Starts a keepalive timer that sends `{ type: "keep_alive" }` every
 * SERVER_KEEPALIVE_INTERVAL_MS and closes the socket once the client has been
 * silent for longer than CLIENT_ACTIVITY_TIMEOUT_MS.
 */
export function handleAcpWsOpen(ws: WSContext, wsId: string): void {
  log(`[ACP-WS] Connection opened: wsId=${wsId}`);

  const keepalive = setInterval(() => {
    const entry = connections.get(wsId);
    // Entry removed or socket no longer open: nothing left to keep alive.
    if (!entry || entry.ws.readyState !== WS_OPEN) {
      clearInterval(keepalive);
      return;
    }

    const silenceMs = Date.now() - entry.lastClientActivity;
    if (silenceMs > CLIENT_ACTIVITY_TIMEOUT_MS) {
      log(`[ACP-WS] Client inactive for ${Math.round(silenceMs / 1000)}s, closing dead connection`);
      try {
        entry.ws.close(1000, "client inactive");
      } catch {
        // close() failed; stop the timer so we do not retry forever.
        clearInterval(keepalive);
      }
      return;
    }

    sendToWs(entry.ws, { type: "keep_alive" });
  }, SERVER_KEEPALIVE_INTERVAL_MS);

  const now = Date.now();
  connections.set(wsId, {
    agentId: null,
    channelGroupId: "",
    unsub: null,
    keepalive,
    ws,
    openTime: now,
    lastClientActivity: now,
    capabilities: null,
  });
}

/**
 * Handle register message — legacy WS-only registration (still supported).
 *
 * Creates an environment record with workerType "acp", binds the connection
 * to a channel group (client-supplied or freshly generated) and replies with
 * a `registered` message. Client-supplied fields are type-checked at runtime;
 * fields of the wrong type fall back to their defaults.
 */
function handleRegister(wsId: string, msg: Record<string, unknown>): void {
  const entry = connections.get(wsId);
  if (!entry) return;

  if (entry.agentId) {
    sendToWs(entry.ws, { type: "error", message: "Already registered" });
    return;
  }

  const agentName = nonEmptyString(msg.agent_name) ?? "unknown";
  const capabilities =
    isObjectLike(msg.capabilities) && !Array.isArray(msg.capabilities) ? msg.capabilities : undefined;
  const channelGroupId = nonEmptyString(msg.channel_group_id) ?? newChannelGroupId();
  // Parsed for parity with the protocol; not currently stored anywhere.
  const acpLinkVersion = nonEmptyString(msg.acp_link_version) ?? null;
  void acpLinkVersion;
  const maxSessions = typeof msg.max_sessions === "number" ? msg.max_sessions : 1;

  // Create EnvironmentRecord with workerType="acp"
  const secret = config.apiKeys[0] || "";
  const record = storeCreateEnvironment({
    secret,
    machineName: agentName,
    workerType: "acp",
    bridgeId: channelGroupId,
    maxSessions,
    capabilities,
  } as Parameters<typeof storeCreateEnvironment>[0]);

  // Store ACP-specific metadata via environment update
  storeUpdateEnvironment(record.id, {
    status: "active",
  } as Parameters<typeof storeUpdateEnvironment>[1]);

  entry.agentId = record.id;
  entry.channelGroupId = channelGroupId;
  entry.capabilities = capabilities ?? null;

  // Subscribe to channel group EventBus — broadcast events to this WS
  subscribeToChannelGroup(entry, channelGroupId);

  log(`[ACP-WS] Agent registered (legacy WS): agentId=${record.id} channelGroup=${channelGroupId} name=${agentName}`);

  sendToWs(entry.ws, {
    type: "registered",
    agent_id: record.id,
    channel_group_id: channelGroupId,
  });
}

/**
 * Handle identify message — binds WS to an existing agent registered via REST.
 *
 * Looks up the environment record by `agent_id`, marks it online, binds the
 * connection to the record's channel group (`bridgeId`, or a generated id
 * when absent) and replies with an `identified` message.
 */
function handleIdentify(wsId: string, msg: Record<string, unknown>): void {
  const entry = connections.get(wsId);
  if (!entry) return;

  if (entry.agentId) {
    sendToWs(entry.ws, { type: "error", message: "Already identified" });
    return;
  }

  const agentId = nonEmptyString(msg.agent_id);
  if (!agentId) {
    sendToWs(entry.ws, { type: "error", message: "Missing agent_id" });
    return;
  }

  // Look up the environment record (created via REST registration)
  const record = storeGetEnvironment(agentId);
  if (!record || record.workerType !== "acp") {
    sendToWs(entry.ws, { type: "error", message: "Agent not found" });
    return;
  }

  // Update status to active
  storeMarkAcpAgentOnline(agentId);

  const channelGroupId = record.bridgeId || newChannelGroupId();
  entry.agentId = record.id;
  entry.channelGroupId = channelGroupId;
  entry.capabilities = record.capabilities || null;

  // Subscribe to channel group EventBus — broadcast events to this WS
  subscribeToChannelGroup(entry, channelGroupId);

  log(`[ACP-WS] Agent identified (REST+WS): agentId=${record.id} channelGroup=${channelGroupId}`);

  sendToWs(entry.ws, {
    type: "identified",
    agent_id: record.id,
    channel_group_id: channelGroupId,
  });
}

/**
 * Called from onMessage — processes NDJSON lines.
 *
 * Each non-blank line is parsed as JSON. `keep_alive`, `register` and
 * `identify` are handled here; everything else is published to the channel
 * group EventBus as an inbound event once the connection is bound to an
 * agent. Lines that fail to parse, or that parse to a non-object value
 * (e.g. `null`, `42`), are logged and skipped instead of throwing.
 */
export function handleAcpWsMessage(ws: WSContext, wsId: string, data: string): void {
  const entry = connections.get(wsId);
  if (!entry) return;

  entry.lastClientActivity = Date.now();

  for (const line of data.split("\n")) {
    if (!line.trim()) continue;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      logError("[ACP-WS] parse error:", line);
      continue;
    }

    // JSON.parse may yield null or a primitive; property access on null throws.
    if (!isObjectLike(parsed)) {
      logError("[ACP-WS] ignoring non-object message:", line);
      continue;
    }
    const msg = parsed;

    // Handle keepalive
    if (msg.type === "keep_alive") {
      // Update last activity timestamp (only if registered)
      if (entry.agentId) touchAgent(entry.agentId);
      continue;
    }

    // Handle registration (legacy WS-only)
    if (msg.type === "register") {
      handleRegister(wsId, msg);
      continue;
    }

    // Handle identify (REST registration + WS binding)
    if (msg.type === "identify") {
      handleIdentify(wsId, msg);
      continue;
    }

    // Not registered yet — reject
    if (!entry.agentId) {
      sendToWs(entry.ws, { type: "error", message: "Not registered. Send register message first." });
      continue;
    }

    // Update agent activity
    touchAgent(entry.agentId);

    // Pass-through: publish to channel group EventBus as inbound
    getAcpEventBus(entry.channelGroupId).publish({
      id: randomUUID(),
      sessionId: entry.channelGroupId,
      type: nonEmptyString(msg.type) ?? "acp_message",
      payload: msg,
      direction: "inbound",
    });
  }
}

/**
 * Called from onClose — marks agent offline and cleans up.
 *
 * Unsubscribes from the EventBus, stops the keepalive timer, marks the bound
 * agent offline (the record is kept so the agent can reconnect), publishes an
 * inbound `agent_disconnect` event on the channel group, and drops the entry.
 */
export function handleAcpWsClose(ws: WSContext, wsId: string, code?: number, reason?: string): void {
  const entry = connections.get(wsId);
  if (!entry) return;

  const duration = Math.round((Date.now() - entry.openTime) / 1000);
  log(`[ACP-WS] Connection closed: wsId=${wsId} agentId=${entry.agentId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);

  if (entry.unsub) {
    entry.unsub();
  }
  if (entry.keepalive) {
    clearInterval(entry.keepalive);
  }

  // Mark agent as offline (don't delete record — allow reconnect)
  if (entry.agentId) {
    storeMarkAcpAgentOffline(entry.agentId);

    // Notify all relay connections that this agent is gone
    if (entry.channelGroupId) {
      getAcpEventBus(entry.channelGroupId).publish({
        id: randomUUID(),
        sessionId: entry.channelGroupId,
        type: "agent_disconnect",
        payload: { agentId: entry.agentId },
        direction: "inbound",
      });
    }
  }

  connections.delete(wsId);
}

/**
 * Find an active ACP connection by agent ID.
 *
 * @returns the first tracked entry bound to `agentId` whose socket is open,
 *          or null when there is none.
 */
export function findAcpConnectionByAgentId(agentId: string): AcpConnectionEntry | null {
  for (const entry of connections.values()) {
    if (entry.agentId === agentId && entry.ws.readyState === WS_OPEN) {
      return entry;
    }
  }
  return null;
}

/**
 * Send a JSON message directly to an agent's WebSocket connection.
 *
 * @returns true when the message was handed to an open socket; false when no
 *          open connection exists for `agentId` or the send failed.
 */
export function sendToAgentWs(agentId: string, msg: object): boolean {
  const entry = findAcpConnectionByAgentId(agentId);
  if (!entry) return false;
  return sendToWs(entry.ws, msg);
}

/**
 * Gracefully close all ACP WebSocket connections.
 *
 * Best-effort: a failure while tearing down one connection is logged and does
 * not prevent its agent from being marked offline, nor the remaining
 * connections from being closed.
 */
export function closeAllAcpConnections(): void {
  if (connections.size === 0) return;

  log(`[ACP-WS] Gracefully closing ${connections.size} ACP connection(s)...`);

  for (const entry of connections.values()) {
    if (entry.keepalive) clearInterval(entry.keepalive);

    try {
      entry.unsub?.();
      if (entry.ws.readyState === WS_OPEN) {
        entry.ws.close(1001, "server_shutdown");
      }
    } catch (err) {
      logError("[ACP-WS] error closing connection during shutdown:", err);
    }

    // Kept outside the try above so a failed close still marks the agent offline.
    if (entry.agentId) {
      try {
        storeMarkAcpAgentOffline(entry.agentId);
      } catch (err) {
        logError("[ACP-WS] error marking agent offline during shutdown:", err);
      }
    }
  }

  connections.clear();
  log("[ACP-WS] All connections closed");
}