mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-17 22:05:50 +00:00
feat: 支持 acp-link 包进行 acp 通用的 remote-control (#292)
* fix: 修复超时问题 * feat: 添加 acp-link 代码 * refactor: 样式重构完成 * feat: RCS 添加 ACP 后端支持 - 新增 ACP WebSocket handler (agent 注册、EventBus 订阅) - 新增 relay handler (前端 WS → acp-link 透传 + EventBus inbound 转发) - 新增 SSE event stream 供外部消费者订阅 channel group 事件 - ACP REST 接口无鉴权 (agents、channel-groups) - WebSocket 端点保留 token 鉴权 - SPA 路由 /acp/ 指向 acp.html Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: 添加 ACP 专属前端界面 - 新增 /acp/ SPA 页面 (agent 列表 + 实时交互) - Agent 列表按 channel group 分组,显示在线状态 - 通过 RCS WebSocket relay 与 agent 通信 - Vite multi-page 构建 (index.html + acp.html) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: acp-link 支持 RCS relay 双向通信 - rcs-upstream 新增 messageHandler 转发非控制消息 - server.ts 新增虚拟 WS + relay client state 处理 relay ACP 消息 - newSession/loadSession 补充 mcpServers 参数 - 连接成功后显示 ACP Dashboard URL Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: 移除 FileExplorer 及文件操作相关代码 - 删除 FileExplorer 组件 - ACPMain 移除 Files tab,仅保留 Chat 和 History - client.ts 移除 listDir/readFile/onFileChanges 等方法 - types.ts 移除 FileItem/FileContent/FileChange 等类型 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: 修复类型问题 * feat: RCS 后端统一 ACP/Bridge 注册逻辑 - store: EnvironmentRecord 增加 capabilities 字段、storeFindEnvironmentByMachineName 复用逻辑 - store: 新增 storeGetSessionOwners,支持未绑定 session 自动 claim - environment: registerEnvironment 支持 ACP 复用已有记录,返回 session_id - session: resolveOwnedWebSessionId 支持无 owner session 自动绑定 - acp-ws-handler: 新增 handleIdentify 支持 REST+WS 两步注册 - acp routes: /acp/relay 和 /acp/agents 支持 UUID 认证 - event-bus: 增加 error 类型 payload 日志 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: acp-link 改 REST 注册 + WS identify 两步流程 - rcs-upstream: 新增 registerViaRest() 通过 POST /v1/environments/bridge 注册 - rcs-upstream: WS 连接后发送 identify 替代 register,携带 agentId - rcs-upstream: 入口链接改为 /code/?sid=${sessionId} 实现用户绑定 - server: 修复心跳跳过 relay 虚拟连接的 bug - server: maxSessions 配置传入 RCS upstream Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: 前端统一 Chat 组件 + ACP 聊天界面重构 - 新增 chat/ 组件: ChatView, ChatInput, MessageBubble, ToolCallGroup, PermissionPanel, SessionSidebar, CommandMenu - ACPMain: 重构支持完整 ACP 协议交互(session/prompt/permission) - rcs-chat-adapter: 统一 bridge session SSE 适配器 - ACPClient: 增强 session 管理、permission 流程、streaming 支持 - index.css: 新增 chat 相关样式、动画、布局 - useCommands: 新增快捷命令 hook Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: 删除 /acp/ 独立页面,ACP 聊天统一到 /code/:sessionId - 删除 acp.html、acp-main.tsx 入口文件和 pages/acp/ 目录 - SessionDetail: ACP session 在同一页面渲染 ACPSessionDetail 组件 - App.tsx: ?sid= 参数自动调用 apiBind 绑定用户 UUID - Dashboard: 统一 session 列表导航,ACP 显示紫色标签 - relay-client: 改用 UUID 认证替代 API token - EnvironmentList: 显示 workerType 标签(ACP Agent / Claude Code) - index.ts: 移除 /acp/ SPA 路由,vite.config 移除 acp 入口 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * build: 更新构建及测试修复 --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,13 @@ export const config = {
|
||||
heartbeatInterval: parseInt(process.env.RCS_HEARTBEAT_INTERVAL || "20"),
|
||||
jwtExpiresIn: parseInt(process.env.RCS_JWT_EXPIRES_IN || "3600"),
|
||||
disconnectTimeout: parseInt(process.env.RCS_DISCONNECT_TIMEOUT || "300"),
|
||||
/** Bun WebSocket idle timeout (seconds). Bun sends protocol-level pings after
|
||||
* this many seconds of no received data. Must be shorter than any reverse
|
||||
* proxy's idle timeout (nginx default 60s, Cloudflare 100s). Default 30s. */
|
||||
wsIdleTimeout: parseInt(process.env.RCS_WS_IDLE_TIMEOUT || "30"),
|
||||
/** Server→client keep_alive data-frame interval (seconds). Keeps reverse
|
||||
* proxies from closing idle connections. Default 20s. */
|
||||
wsKeepaliveInterval: parseInt(process.env.RCS_WS_KEEPALIVE_INTERVAL || "20"),
|
||||
} as const;
|
||||
|
||||
export function getBaseUrl(): string {
|
||||
|
||||
@@ -4,15 +4,20 @@ import { logger } from "hono/logger";
|
||||
import { serveStatic } from "hono/bun";
|
||||
import { config } from "./config";
|
||||
import { closeAllConnections } from "./transport/ws-handler";
|
||||
import { closeAllAcpConnections } from "./transport/acp-ws-handler";
|
||||
import { closeAllRelayConnections } from "./transport/acp-relay-handler";
|
||||
import { startDisconnectMonitor } from "./services/disconnect-monitor";
|
||||
import { dirname, resolve } from "node:path";
|
||||
import { existsSync } from "node:fs";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import acpRoutes from "./routes/acp";
|
||||
|
||||
// Routes
|
||||
import v1Environments from "./routes/v1/environments";
|
||||
import v1EnvironmentsWork from "./routes/v1/environments.work";
|
||||
import v1Sessions from "./routes/v1/sessions";
|
||||
import v1SessionIngress, { websocket } from "./routes/v1/session-ingress";
|
||||
import v1SessionIngress from "./routes/v1/session-ingress";
|
||||
import { websocket } from "./transport/ws-shared";
|
||||
import v2CodeSessions from "./routes/v2/code-sessions";
|
||||
import v2Worker from "./routes/v2/worker";
|
||||
import v2WorkerEventsStream from "./routes/v2/worker-events-stream";
|
||||
@@ -33,9 +38,11 @@ app.use("/web/*", cors());
|
||||
// Health check
|
||||
app.get("/health", (c) => c.json({ status: "ok", version: config.version }));
|
||||
|
||||
// Static files — serve web/ directory under /code path
|
||||
// Static files — serve built web UI under /code path
|
||||
// Uses web/dist/ if it exists (production), otherwise falls back to web/ (dev/fallback)
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const webDir = resolve(__dirname, "../web");
|
||||
const distDir = resolve(__dirname, "../web/dist");
|
||||
const webDir = existsSync(resolve(distDir, "index.html")) ? distDir : resolve(__dirname, "../web");
|
||||
|
||||
const stripCodePrefix = (p: string) => p.replace(/^\/code/, "");
|
||||
|
||||
@@ -70,6 +77,10 @@ app.route("/web", webSessions);
|
||||
app.route("/web", webControl);
|
||||
app.route("/web", webEnvironments);
|
||||
|
||||
// ACP protocol routes
|
||||
console.log("[RCS] ACP support enabled");
|
||||
app.route("/acp", acpRoutes);
|
||||
|
||||
const port = config.port;
|
||||
const host = config.host;
|
||||
|
||||
@@ -77,6 +88,8 @@ console.log(`[RCS] Remote Control Server starting on ${host}:${port}`);
|
||||
console.log("[RCS] API key configuration loaded");
|
||||
console.log(`[RCS] Base URL: ${config.baseUrl || `http://localhost:${port}`}`);
|
||||
console.log(`[RCS] Disconnect timeout: ${config.disconnectTimeout}s`);
|
||||
console.log(`[RCS] WebSocket idle timeout: ${config.wsIdleTimeout}s (protocol-level pings)`);
|
||||
console.log(`[RCS] WebSocket keepalive interval: ${config.wsKeepaliveInterval}s (data frames)`);
|
||||
|
||||
// Start disconnect monitor
|
||||
startDisconnectMonitor();
|
||||
@@ -87,15 +100,17 @@ export default {
|
||||
fetch: app.fetch,
|
||||
websocket: {
|
||||
...websocket,
|
||||
idleTimeout: 255, // WS idle timeout (seconds) — must be inside websocket object
|
||||
idleTimeout: config.wsIdleTimeout, // Bun sends protocol pings after this many seconds of silence
|
||||
},
|
||||
idleTimeout: 255, // HTTP server idle timeout (seconds) — needed for long-polling endpoints
|
||||
idleTimeout: config.wsIdleTimeout, // HTTP server idle timeout (seconds)
|
||||
};
|
||||
|
||||
// Graceful shutdown
|
||||
async function gracefulShutdown(signal: string) {
|
||||
console.log(`\n[RCS] Received ${signal}, shutting down...`);
|
||||
closeAllConnections();
|
||||
closeAllAcpConnections();
|
||||
closeAllRelayConnections();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
|
||||
214
packages/remote-control-server/src/routes/acp/index.ts
Normal file
214
packages/remote-control-server/src/routes/acp/index.ts
Normal file
@@ -0,0 +1,214 @@
|
||||
import { Hono } from "hono";
|
||||
import { upgradeWebSocket } from "../../transport/ws-shared";
|
||||
import { apiKeyAuth } from "../../auth/middleware";
|
||||
import { validateApiKey } from "../../auth/api-key";
|
||||
import {
|
||||
handleAcpWsOpen,
|
||||
handleAcpWsMessage,
|
||||
handleAcpWsClose,
|
||||
} from "../../transport/acp-ws-handler";
|
||||
import {
|
||||
handleRelayOpen,
|
||||
handleRelayMessage,
|
||||
handleRelayClose,
|
||||
} from "../../transport/acp-relay-handler";
|
||||
import {
|
||||
storeListAcpAgents,
|
||||
storeListAcpAgentsByChannelGroup,
|
||||
storeGetEnvironment,
|
||||
} from "../../store";
|
||||
import { createAcpSSEStream } from "../../transport/acp-sse-writer";
|
||||
import { log, error as logError } from "../../logger";
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
/** Maximum WebSocket message size: 10 MB */
|
||||
const MAX_WS_MESSAGE_SIZE = 10 * 1024 * 1024;
|
||||
|
||||
/** Response shape for an ACP agent */
|
||||
function toAcpAgentResponse(env: ReturnType<typeof storeGetEnvironment> & {}) {
|
||||
if (!env) return null;
|
||||
return {
|
||||
id: env.id,
|
||||
agent_name: env.machineName,
|
||||
channel_group_id: env.bridgeId,
|
||||
status: env.status === "active" ? "online" : "offline",
|
||||
max_sessions: env.maxSessions,
|
||||
last_seen_at: env.lastPollAt ? env.lastPollAt.getTime() / 1000 : null,
|
||||
created_at: env.createdAt.getTime() / 1000,
|
||||
};
|
||||
}
|
||||
|
||||
/** GET /acp/agents — List all registered ACP agents (UUID or API key auth) */
|
||||
app.get("/agents", async (c) => {
|
||||
// Require at least UUID auth
|
||||
const uuid = c.req.query("uuid");
|
||||
const authHeader = c.req.header("Authorization");
|
||||
const queryToken = c.req.query("token");
|
||||
const token = authHeader?.replace("Bearer ", "") || queryToken;
|
||||
if (!uuid && !(token && validateApiKey(token))) {
|
||||
return c.json({ error: { type: "unauthorized", message: "Missing auth" } }, 401);
|
||||
}
|
||||
const agents = storeListAcpAgents();
|
||||
return c.json(agents.map((a) => toAcpAgentResponse(a)).filter(Boolean));
|
||||
});
|
||||
|
||||
/** GET /acp/channel-groups — List all channel groups with member agents (UUID or API key auth) */
|
||||
app.get("/channel-groups", async (c) => {
|
||||
const uuid = c.req.query("uuid");
|
||||
const authHeader = c.req.header("Authorization");
|
||||
const queryToken = c.req.query("token");
|
||||
const token = authHeader?.replace("Bearer ", "") || queryToken;
|
||||
if (!uuid && !(token && validateApiKey(token))) {
|
||||
return c.json({ error: { type: "unauthorized", message: "Missing auth" } }, 401);
|
||||
}
|
||||
const agents = storeListAcpAgents();
|
||||
const groupMap = new Map<string, typeof agents>();
|
||||
for (const agent of agents) {
|
||||
const groupId = agent.bridgeId || "default";
|
||||
if (!groupMap.has(groupId)) {
|
||||
groupMap.set(groupId, []);
|
||||
}
|
||||
groupMap.get(groupId)!.push(agent);
|
||||
}
|
||||
const groups = [...groupMap.entries()].map(([id, members]) => ({
|
||||
channel_group_id: id,
|
||||
member_count: members.length,
|
||||
members: members.map((m) => toAcpAgentResponse(m)).filter(Boolean),
|
||||
}));
|
||||
return c.json(groups);
|
||||
});
|
||||
|
||||
/** GET /acp/channel-groups/:id — Specific channel group detail (no auth for web UI) */
|
||||
app.get("/channel-groups/:id", async (c) => {
|
||||
const groupId = c.req.param("id")!;
|
||||
const members = storeListAcpAgentsByChannelGroup(groupId);
|
||||
if (members.length === 0) {
|
||||
return c.json({ error: { type: "not_found", message: "Channel group not found" } }, 404);
|
||||
}
|
||||
return c.json({
|
||||
channel_group_id: groupId,
|
||||
member_count: members.length,
|
||||
members: members.map((m) => toAcpAgentResponse(m)).filter(Boolean),
|
||||
});
|
||||
});
|
||||
|
||||
/** SSE /acp/channel-groups/:id/events — Event stream for external consumers (no auth for web UI) */
|
||||
app.get("/channel-groups/:id/events", async (c) => {
|
||||
const groupId = c.req.param("id")!;
|
||||
|
||||
// Support Last-Event-ID / from_sequence_num for reconnection
|
||||
const lastEventId = c.req.header("Last-Event-ID");
|
||||
const fromSeq = c.req.query("from_sequence_num");
|
||||
const fromSeqNum = fromSeq ? parseInt(fromSeq) : lastEventId ? parseInt(lastEventId) : 0;
|
||||
|
||||
return createAcpSSEStream(c, groupId, fromSeqNum);
|
||||
});
|
||||
|
||||
/** WS /acp/ws — WebSocket endpoint for acp-link connections */
|
||||
app.get(
|
||||
"/ws",
|
||||
upgradeWebSocket(async (c) => {
|
||||
// Authenticate via API key in query param or header
|
||||
const authHeader = c.req.header("Authorization");
|
||||
const queryToken = c.req.query("token");
|
||||
const token = authHeader?.replace("Bearer ", "") || queryToken;
|
||||
|
||||
if (!token || !validateApiKey(token)) {
|
||||
log("[ACP-WS] Upgrade rejected: unauthorized");
|
||||
return {
|
||||
onOpen(_evt: any, ws: any) {
|
||||
ws.close(4003, "unauthorized");
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Generate unique wsId for this connection
|
||||
const { v4: uuid } = await import("uuid");
|
||||
const wsId = `acp_ws_${uuid().replace(/-/g, "")}`;
|
||||
|
||||
log(`[ACP-WS] Upgrade accepted: wsId=${wsId}`);
|
||||
return {
|
||||
onOpen(_evt: any, ws: any) {
|
||||
handleAcpWsOpen(ws, wsId);
|
||||
},
|
||||
onMessage(evt: any, ws: any) {
|
||||
const data =
|
||||
typeof evt.data === "string"
|
||||
? evt.data
|
||||
: new TextDecoder().decode(evt.data as ArrayBuffer);
|
||||
if (data.length > MAX_WS_MESSAGE_SIZE) {
|
||||
logError(`[ACP-WS] Message too large on wsId=${wsId}: ${data.length} bytes`);
|
||||
ws.close(1009, "message too large");
|
||||
return;
|
||||
}
|
||||
handleAcpWsMessage(ws, wsId, data);
|
||||
},
|
||||
onClose(evt: any, ws: any) {
|
||||
const closeEvt = evt as unknown as CloseEvent;
|
||||
handleAcpWsClose(ws, wsId, closeEvt?.code, closeEvt?.reason);
|
||||
},
|
||||
onError(evt: any, ws: any) {
|
||||
logError(`[ACP-WS] Error on wsId=${wsId}:`, evt);
|
||||
handleAcpWsClose(ws, wsId, 1006, "websocket error");
|
||||
},
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
/** WS /acp/relay/:agentId — WebSocket relay for frontend to interact with an agent */
|
||||
app.get(
|
||||
"/relay/:agentId",
|
||||
upgradeWebSocket(async (c) => {
|
||||
// Authenticate via UUID (web frontend) or API key (legacy)
|
||||
const clientUuid = c.req.query("uuid");
|
||||
const authHeader = c.req.header("Authorization");
|
||||
const queryToken = c.req.query("token");
|
||||
const token = authHeader?.replace("Bearer ", "") || queryToken;
|
||||
|
||||
const hasUuid = !!clientUuid;
|
||||
const hasApiKey = !!token && validateApiKey(token);
|
||||
|
||||
if (!hasUuid && !hasApiKey) {
|
||||
log("[ACP-Relay] Upgrade rejected: unauthorized");
|
||||
return {
|
||||
onOpen(_evt: any, ws: any) {
|
||||
ws.close(4003, "unauthorized");
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const agentId = c.req.param("agentId")!;
|
||||
const { v4: uuid } = await import("uuid");
|
||||
const relayWsId = `relay_${uuid().replace(/-/g, "")}`;
|
||||
|
||||
log(`[ACP-Relay] Upgrade accepted: relayWsId=${relayWsId} agentId=${agentId}`);
|
||||
return {
|
||||
onOpen(_evt: any, ws: any) {
|
||||
handleRelayOpen(ws, relayWsId, agentId);
|
||||
},
|
||||
onMessage(evt: any, ws: any) {
|
||||
const data =
|
||||
typeof evt.data === "string"
|
||||
? evt.data
|
||||
: new TextDecoder().decode(evt.data as ArrayBuffer);
|
||||
if (data.length > MAX_WS_MESSAGE_SIZE) {
|
||||
logError(`[ACP-Relay] Message too large on relayWsId=${relayWsId}: ${data.length} bytes`);
|
||||
ws.close(1009, "message too large");
|
||||
return;
|
||||
}
|
||||
handleRelayMessage(ws, relayWsId, data);
|
||||
},
|
||||
onClose(evt: any, ws: any) {
|
||||
const closeEvt = evt as unknown as CloseEvent;
|
||||
handleRelayClose(ws, relayWsId, closeEvt?.code, closeEvt?.reason);
|
||||
},
|
||||
onError(evt: any, ws: any) {
|
||||
logError(`[ACP-Relay] Error on relayWsId=${relayWsId}:`, evt);
|
||||
handleRelayClose(ws, relayWsId, 1006, "websocket error");
|
||||
},
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
export default app;
|
||||
@@ -1,6 +1,6 @@
|
||||
import { log, error as logError } from "../../logger";
|
||||
import { Hono } from "hono";
|
||||
import { createBunWebSocket } from "hono/bun";
|
||||
import { upgradeWebSocket, websocket } from "../../transport/ws-shared";
|
||||
import { validateApiKey } from "../../auth/api-key";
|
||||
import { verifyWorkerJwt } from "../../auth/jwt";
|
||||
import {
|
||||
@@ -11,8 +11,6 @@ import {
|
||||
} from "../../transport/ws-handler";
|
||||
import { getSession, resolveExistingSessionId } from "../../services/session";
|
||||
|
||||
const { upgradeWebSocket, websocket } = createBunWebSocket();
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
/** Authenticate via API key or worker JWT in Authorization header or ?token= query param */
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { log, error as logError } from "../logger";
|
||||
import { storeListActiveEnvironments, storeUpdateEnvironment } from "../store";
|
||||
import { storeListActiveEnvironments, storeUpdateEnvironment, storeMarkAcpAgentOffline } from "../store";
|
||||
import { storeListSessions } from "../store";
|
||||
import { config } from "../config";
|
||||
import { updateSessionStatus } from "./session";
|
||||
@@ -10,6 +10,14 @@ export function runDisconnectMonitorSweep(now = Date.now()) {
|
||||
// Check environment heartbeat timeout
|
||||
const envs = storeListActiveEnvironments();
|
||||
for (const env of envs) {
|
||||
// Skip ACP agents — they use WS keepalive, not polling
|
||||
if (env.workerType === "acp") {
|
||||
if (env.lastPollAt && now - env.lastPollAt.getTime() > timeoutMs) {
|
||||
log(`[RCS] ACP agent ${env.id} timed out (no activity for ${Math.round((now - env.lastPollAt.getTime()) / 1000)}s)`);
|
||||
storeMarkAcpAgentOffline(env.id);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (env.lastPollAt && now - env.lastPollAt.getTime() > timeoutMs) {
|
||||
log(`[RCS] Environment ${env.id} timed out (no poll for ${Math.round((now - env.lastPollAt.getTime()) / 1000)}s)`);
|
||||
storeUpdateEnvironment(env.id, { status: "disconnected" });
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { config } from "../config";
|
||||
import {
|
||||
storeCreateEnvironment,
|
||||
storeCreateSession,
|
||||
storeGetEnvironment,
|
||||
storeUpdateEnvironment,
|
||||
storeListActiveEnvironments,
|
||||
@@ -18,6 +19,8 @@ function toResponse(row: EnvironmentRecord): EnvironmentResponse {
|
||||
status: row.status,
|
||||
username: row.username,
|
||||
last_poll_at: row.lastPollAt ? row.lastPollAt.getTime() / 1000 : null,
|
||||
worker_type: row.workerType,
|
||||
capabilities: row.capabilities,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -34,9 +37,21 @@ export function registerEnvironment(req: RegisterEnvironmentRequest & { metadata
|
||||
workerType,
|
||||
bridgeId: req.bridge_id,
|
||||
username: req.username,
|
||||
capabilities: req.capabilities,
|
||||
});
|
||||
|
||||
return { environment_id: record.id, environment_secret: record.secret, status: record.status as "active" };
|
||||
let sessionId: string | undefined;
|
||||
// ACP agents: auto-create a session so they appear in the dashboard sessions list
|
||||
if (workerType === "acp") {
|
||||
const session = storeCreateSession({
|
||||
environmentId: record.id,
|
||||
title: req.machine_name || "ACP Agent",
|
||||
source: "acp",
|
||||
});
|
||||
sessionId = session.id;
|
||||
}
|
||||
|
||||
return { environment_id: record.id, environment_secret: record.secret, status: record.status as "active", session_id: sessionId };
|
||||
}
|
||||
|
||||
export function deregisterEnvironment(envId: string) {
|
||||
|
||||
@@ -2,6 +2,8 @@ import {
|
||||
storeCreateSession,
|
||||
storeGetSession,
|
||||
storeIsSessionOwner,
|
||||
storeGetSessionOwners,
|
||||
storeBindSession,
|
||||
storeUpdateSession,
|
||||
storeListSessions,
|
||||
storeListSessionsByUsername,
|
||||
@@ -106,6 +108,16 @@ export function resolveOwnedWebSessionId(sessionId: string, uuid: string): strin
|
||||
return compatibleCodeSessionId;
|
||||
}
|
||||
|
||||
// Auto-bind: if the session exists but has no owner, claim it for the requesting user
|
||||
const existingId = resolveExistingSessionId(sessionId);
|
||||
if (existingId) {
|
||||
const owners = storeGetSessionOwners(existingId);
|
||||
if (!owners || owners.size === 0) {
|
||||
storeBindSession(existingId, uuid);
|
||||
return existingId;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ export interface EnvironmentRecord {
|
||||
maxSessions: number;
|
||||
workerType: string;
|
||||
bridgeId: string | null;
|
||||
capabilities: Record<string, unknown> | null;
|
||||
status: string;
|
||||
username: string | null;
|
||||
lastPollAt: Date | null;
|
||||
@@ -97,6 +98,21 @@ export function storeDeleteToken(token: string): boolean {
|
||||
|
||||
// ---------- Environment ----------
|
||||
|
||||
/** Find an active environment by machineName (optionally filtered by workerType) */
|
||||
export function storeFindEnvironmentByMachineName(
|
||||
machineName: string,
|
||||
workerType?: string,
|
||||
): EnvironmentRecord | undefined {
|
||||
for (const rec of environments.values()) {
|
||||
if (rec.machineName === machineName && rec.status === "active") {
|
||||
if (!workerType || rec.workerType === workerType) {
|
||||
return rec;
|
||||
}
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function storeCreateEnvironment(req: {
|
||||
secret: string;
|
||||
machineName?: string;
|
||||
@@ -107,7 +123,25 @@ export function storeCreateEnvironment(req: {
|
||||
workerType?: string;
|
||||
bridgeId?: string;
|
||||
username?: string;
|
||||
capabilities?: Record<string, unknown>;
|
||||
}): EnvironmentRecord {
|
||||
// ACP: reuse existing active record by machineName
|
||||
if (req.workerType === "acp" && req.machineName) {
|
||||
const existing = storeFindEnvironmentByMachineName(req.machineName, "acp");
|
||||
if (existing) {
|
||||
Object.assign(existing, {
|
||||
status: "active",
|
||||
lastPollAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
maxSessions: req.maxSessions ?? existing.maxSessions,
|
||||
bridgeId: req.bridgeId ?? existing.bridgeId,
|
||||
capabilities: req.capabilities ?? existing.capabilities,
|
||||
username: req.username ?? existing.username,
|
||||
});
|
||||
return existing;
|
||||
}
|
||||
}
|
||||
|
||||
const id = `env_${uuid().replace(/-/g, "")}`;
|
||||
const now = new Date();
|
||||
const record: EnvironmentRecord = {
|
||||
@@ -120,6 +154,7 @@ export function storeCreateEnvironment(req: {
|
||||
maxSessions: req.maxSessions ?? 1,
|
||||
workerType: req.workerType ?? "claude_code",
|
||||
bridgeId: req.bridgeId ?? null,
|
||||
capabilities: req.capabilities ?? null,
|
||||
status: "active",
|
||||
username: req.username ?? null,
|
||||
lastPollAt: now,
|
||||
@@ -134,7 +169,7 @@ export function storeGetEnvironment(id: string): EnvironmentRecord | undefined {
|
||||
return environments.get(id);
|
||||
}
|
||||
|
||||
export function storeUpdateEnvironment(id: string, patch: Partial<Pick<EnvironmentRecord, "status" | "lastPollAt" | "updatedAt">>): boolean {
|
||||
export function storeUpdateEnvironment(id: string, patch: Partial<Pick<EnvironmentRecord, "status" | "lastPollAt" | "updatedAt" | "capabilities" | "machineName" | "maxSessions" | "bridgeId">>): boolean {
|
||||
const rec = environments.get(id);
|
||||
if (!rec) return false;
|
||||
Object.assign(rec, patch, { updatedAt: new Date() });
|
||||
@@ -272,6 +307,10 @@ export function storeIsSessionOwner(sessionId: string, uuid: string): boolean {
|
||||
return owners ? owners.has(uuid) : false;
|
||||
}
|
||||
|
||||
export function storeGetSessionOwners(sessionId: string): Set<string> | undefined {
|
||||
return sessionOwners.get(sessionId);
|
||||
}
|
||||
|
||||
export function storeListSessionsByOwnerUuid(uuid: string): SessionRecord[] {
|
||||
const result: SessionRecord[] = [];
|
||||
for (const [sessionId, owners] of sessionOwners) {
|
||||
@@ -325,6 +364,43 @@ export function storeUpdateWorkItem(id: string, patch: Partial<Pick<WorkItemReco
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------- ACP Agent (reuses EnvironmentRecord with workerType="acp") ----------
|
||||
|
||||
/** List all ACP agents (environments with workerType="acp") */
|
||||
export function storeListAcpAgents(): EnvironmentRecord[] {
|
||||
return [...environments.values()].filter((e) => e.workerType === "acp");
|
||||
}
|
||||
|
||||
/** List ACP agents by channel group (stored in bridgeId field) */
|
||||
export function storeListAcpAgentsByChannelGroup(channelGroupId: string): EnvironmentRecord[] {
|
||||
return [...environments.values()].filter(
|
||||
(e) => e.workerType === "acp" && e.bridgeId === channelGroupId,
|
||||
);
|
||||
}
|
||||
|
||||
/** List online ACP agents */
|
||||
export function storeListOnlineAcpAgents(): EnvironmentRecord[] {
|
||||
return [...environments.values()].filter(
|
||||
(e) => e.workerType === "acp" && e.status === "active",
|
||||
);
|
||||
}
|
||||
|
||||
/** Mark an ACP agent as offline */
|
||||
export function storeMarkAcpAgentOffline(id: string): boolean {
|
||||
const rec = environments.get(id);
|
||||
if (!rec || rec.workerType !== "acp") return false;
|
||||
Object.assign(rec, { status: "offline", updatedAt: new Date() });
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Mark an ACP agent as online (on reconnect) */
|
||||
export function storeMarkAcpAgentOnline(id: string): boolean {
|
||||
const rec = environments.get(id);
|
||||
if (!rec || rec.workerType !== "acp") return false;
|
||||
Object.assign(rec, { status: "active", lastPollAt: new Date(), updatedAt: new Date() });
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------- Reset (for tests) ----------
|
||||
|
||||
export function storeReset() {
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
import type { WSContext } from "hono/ws";
|
||||
import {
|
||||
findAcpConnectionByAgentId,
|
||||
sendToAgentWs,
|
||||
} from "./acp-ws-handler";
|
||||
import { getAcpEventBus } from "./event-bus";
|
||||
import type { SessionEvent } from "./event-bus";
|
||||
import { log, error as logError } from "../logger";
|
||||
|
||||
// Per-relay connection state
|
||||
interface RelayConnectionEntry {
|
||||
agentId: string;
|
||||
unsub: (() => void) | null;
|
||||
keepalive: ReturnType<typeof setInterval> | null;
|
||||
ws: WSContext;
|
||||
openTime: number;
|
||||
}
|
||||
|
||||
const relayConnections = new Map<string, RelayConnectionEntry>(); // key: relayWsId
|
||||
|
||||
const RELAY_KEEPALIVE_INTERVAL_MS = 20_000;
|
||||
|
||||
/** Send a JSON message to relay WS */
|
||||
function sendToRelayWs(ws: WSContext, msg: object): void {
|
||||
if (ws.readyState !== 1) return;
|
||||
try {
|
||||
ws.send(JSON.stringify(msg));
|
||||
} catch (err) {
|
||||
logError("[ACP-Relay] send error:", err);
|
||||
}
|
||||
}
|
||||
|
||||
/** Called from onOpen — finds target agent and bridges connection */
|
||||
export function handleRelayOpen(ws: WSContext, relayWsId: string, agentId: string): void {
|
||||
log(`[ACP-Relay] Relay connection opened: relayWsId=${relayWsId} agentId=${agentId}`);
|
||||
|
||||
// Check if agent is online
|
||||
const agentConn = findAcpConnectionByAgentId(agentId);
|
||||
if (!agentConn) {
|
||||
log(`[ACP-Relay] Agent ${agentId} not found or offline`);
|
||||
sendToRelayWs(ws, { type: "error", message: "Agent not found or offline" });
|
||||
ws.close(4004, "agent not found");
|
||||
return;
|
||||
}
|
||||
|
||||
// Keepalive interval
|
||||
const keepalive = setInterval(() => {
|
||||
const entry = relayConnections.get(relayWsId);
|
||||
if (!entry || entry.ws.readyState !== 1) {
|
||||
clearInterval(keepalive);
|
||||
return;
|
||||
}
|
||||
sendToRelayWs(entry.ws, { type: "keep_alive" });
|
||||
}, RELAY_KEEPALIVE_INTERVAL_MS);
|
||||
|
||||
// Subscribe to channel group EventBus — forward agent responses to frontend
|
||||
const channelGroupId = agentConn.channelGroupId;
|
||||
const bus = getAcpEventBus(channelGroupId);
|
||||
const unsub = bus.subscribe((event: SessionEvent) => {
|
||||
if (ws.readyState !== 1) return;
|
||||
if (event.direction !== "inbound") return;
|
||||
// Handle agent disconnect specially: send status to frontend
|
||||
if (event.type === "agent_disconnect") {
|
||||
sendToRelayWs(ws, { type: "status", payload: { connected: false } });
|
||||
return;
|
||||
}
|
||||
// Forward agent responses to the frontend WebSocket
|
||||
sendToRelayWs(ws, event.payload as object);
|
||||
});
|
||||
|
||||
relayConnections.set(relayWsId, {
|
||||
agentId,
|
||||
unsub,
|
||||
keepalive,
|
||||
ws,
|
||||
openTime: Date.now(),
|
||||
});
|
||||
|
||||
// Don't send a synthetic status message here!
|
||||
// The frontend sends a "connect" command, which acp-link processes
|
||||
// and responds with a real status message including capabilities.
|
||||
// Sending a fake status would make the frontend think it's connected
|
||||
// before the agent process is actually ready.
|
||||
|
||||
log(`[ACP-Relay] Relay established: relayWsId=${relayWsId} → agentId=${agentId}`);
|
||||
}
|
||||
|
||||
/** Called from onMessage — forwards frontend messages to acp-link */
|
||||
export function handleRelayMessage(ws: WSContext, relayWsId: string, data: string): void {
|
||||
const entry = relayConnections.get(relayWsId);
|
||||
if (!entry) return;
|
||||
|
||||
const lines = data.split("\n").filter((l) => l.trim());
|
||||
for (const line of lines) {
|
||||
let msg: Record<string, unknown>;
|
||||
try {
|
||||
msg = JSON.parse(line);
|
||||
} catch {
|
||||
logError("[ACP-Relay] parse error:", line);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ignore keepalive responses
|
||||
if (msg.type === "keep_alive") continue;
|
||||
|
||||
// Forward to acp-link agent
|
||||
const sent = sendToAgentWs(entry.agentId, msg);
|
||||
if (!sent) {
|
||||
sendToRelayWs(ws, { type: "error", message: "Agent connection lost" });
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Called from onClose — cleans up relay connection */
|
||||
export function handleRelayClose(ws: WSContext, relayWsId: string, code?: number, reason?: string): void {
|
||||
const entry = relayConnections.get(relayWsId);
|
||||
if (!entry) return;
|
||||
|
||||
const duration = Math.round((Date.now() - entry.openTime) / 1000);
|
||||
log(`[ACP-Relay] Connection closed: relayWsId=${relayWsId} agentId=${entry.agentId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);
|
||||
|
||||
if (entry.unsub) {
|
||||
entry.unsub();
|
||||
}
|
||||
if (entry.keepalive) {
|
||||
clearInterval(entry.keepalive);
|
||||
}
|
||||
|
||||
relayConnections.delete(relayWsId);
|
||||
}
|
||||
|
||||
/** Close all relay connections (for graceful shutdown) */
|
||||
export function closeAllRelayConnections(): void {
|
||||
if (relayConnections.size === 0) return;
|
||||
|
||||
log(`[ACP-Relay] Closing ${relayConnections.size} relay connection(s)...`);
|
||||
for (const [relayWsId, entry] of relayConnections) {
|
||||
try {
|
||||
if (entry.unsub) entry.unsub();
|
||||
if (entry.keepalive) clearInterval(entry.keepalive);
|
||||
if (entry.ws.readyState === 1) {
|
||||
entry.ws.close(1001, "server_shutdown");
|
||||
}
|
||||
} catch {
|
||||
// ignore errors during shutdown
|
||||
}
|
||||
}
|
||||
relayConnections.clear();
|
||||
log("[ACP-Relay] All relay connections closed");
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
import { log } from "../logger";
|
||||
import type { Context } from "hono";
|
||||
import type { SessionEvent } from "./event-bus";
|
||||
import { getAcpEventBus } from "./event-bus";
|
||||
|
||||
/** Create SSE response stream for an ACP channel group */
|
||||
export function createAcpSSEStream(c: Context, channelGroupId: string, fromSeqNum = 0) {
|
||||
const bus = getAcpEventBus(channelGroupId);
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
// Send historical events if reconnecting
|
||||
if (fromSeqNum > 0) {
|
||||
const missed = bus.getEventsSince(fromSeqNum);
|
||||
for (const event of missed) {
|
||||
const data = JSON.stringify({
|
||||
type: event.type,
|
||||
payload: event.payload,
|
||||
direction: event.direction,
|
||||
seqNum: event.seqNum,
|
||||
channel_group_id: channelGroupId,
|
||||
});
|
||||
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
|
||||
}
|
||||
}
|
||||
|
||||
// Send initial keepalive
|
||||
controller.enqueue(encoder.encode(": keepalive\n\n"));
|
||||
|
||||
// Subscribe to new events
|
||||
const unsub = bus.subscribe((event) => {
|
||||
const data = JSON.stringify({
|
||||
type: event.type,
|
||||
payload: event.payload,
|
||||
direction: event.direction,
|
||||
seqNum: event.seqNum,
|
||||
channel_group_id: channelGroupId,
|
||||
});
|
||||
try {
|
||||
log(`[ACP-SSE] -> subscriber: channelGroup=${channelGroupId} type=${event.type} seq=${event.seqNum}`);
|
||||
controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`));
|
||||
} catch {
|
||||
unsub();
|
||||
}
|
||||
});
|
||||
|
||||
// Keepalive interval
|
||||
const keepalive = setInterval(() => {
|
||||
try {
|
||||
controller.enqueue(encoder.encode(": keepalive\n\n"));
|
||||
} catch {
|
||||
clearInterval(keepalive);
|
||||
unsub();
|
||||
}
|
||||
}, 15000);
|
||||
|
||||
// Cleanup on abort
|
||||
c.req.raw.signal.addEventListener("abort", () => {
|
||||
unsub();
|
||||
clearInterval(keepalive);
|
||||
try {
|
||||
controller.close();
|
||||
} catch {
|
||||
// already closed
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
});
|
||||
}
|
||||
313
packages/remote-control-server/src/transport/acp-ws-handler.ts
Normal file
313
packages/remote-control-server/src/transport/acp-ws-handler.ts
Normal file
@@ -0,0 +1,313 @@
|
||||
import type { WSContext } from "hono/ws";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { getAcpEventBus } from "./event-bus";
|
||||
import type { SessionEvent } from "./event-bus";
|
||||
import {
|
||||
storeCreateEnvironment,
|
||||
storeGetEnvironment,
|
||||
storeMarkAcpAgentOffline,
|
||||
storeMarkAcpAgentOnline,
|
||||
storeUpdateEnvironment,
|
||||
} from "../store";
|
||||
import { config } from "../config";
|
||||
import { log, error as logError } from "../logger";
|
||||
|
||||
// Per-connection state
|
||||
interface AcpConnectionEntry {
|
||||
agentId: string | null; // Set after register message
|
||||
channelGroupId: string;
|
||||
unsub: (() => void) | null;
|
||||
keepalive: ReturnType<typeof setInterval> | null;
|
||||
ws: WSContext;
|
||||
openTime: number;
|
||||
lastClientActivity: number;
|
||||
capabilities: Record<string, unknown> | null;
|
||||
}
|
||||
|
||||
const connections = new Map<string, AcpConnectionEntry>(); // key: wsId
|
||||
|
||||
const SERVER_KEEPALIVE_INTERVAL_MS = config.wsKeepaliveInterval * 1000;
|
||||
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;
|
||||
|
||||
/** Send a JSON message to a WS connection (NDJSON format) */
|
||||
function sendToWs(ws: WSContext, msg: object): void {
|
||||
if (ws.readyState !== 1) return;
|
||||
try {
|
||||
ws.send(JSON.stringify(msg) + "\n");
|
||||
} catch (err) {
|
||||
logError("[ACP-WS] send error:", err);
|
||||
}
|
||||
}
|
||||
|
||||
/** Called from onOpen — initializes connection tracking */
|
||||
export function handleAcpWsOpen(ws: WSContext, wsId: string): void {
|
||||
log(`[ACP-WS] Connection opened: wsId=${wsId}`);
|
||||
|
||||
const keepalive = setInterval(() => {
|
||||
const entry = connections.get(wsId);
|
||||
if (!entry || entry.ws.readyState !== 1) {
|
||||
clearInterval(keepalive);
|
||||
return;
|
||||
}
|
||||
const silenceMs = Date.now() - entry.lastClientActivity;
|
||||
if (silenceMs > CLIENT_ACTIVITY_TIMEOUT_MS) {
|
||||
log(`[ACP-WS] Client inactive for ${Math.round(silenceMs / 1000)}s, closing dead connection`);
|
||||
try {
|
||||
entry.ws.close(1000, "client inactive");
|
||||
} catch {
|
||||
clearInterval(keepalive);
|
||||
}
|
||||
return;
|
||||
}
|
||||
sendToWs(entry.ws, { type: "keep_alive" });
|
||||
}, SERVER_KEEPALIVE_INTERVAL_MS);
|
||||
|
||||
connections.set(wsId, {
|
||||
agentId: null,
|
||||
channelGroupId: "",
|
||||
unsub: null,
|
||||
keepalive,
|
||||
ws,
|
||||
openTime: Date.now(),
|
||||
lastClientActivity: Date.now(),
|
||||
capabilities: null,
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle register message — legacy WS-only registration (still supported) */
|
||||
function handleRegister(wsId: string, msg: Record<string, unknown>): void {
|
||||
const entry = connections.get(wsId);
|
||||
if (!entry) return;
|
||||
|
||||
if (entry.agentId) {
|
||||
sendToWs(entry.ws, { type: "error", message: "Already registered" });
|
||||
return;
|
||||
}
|
||||
|
||||
const agentName = (msg.agent_name as string) || "unknown";
|
||||
const capabilities = msg.capabilities as Record<string, unknown> | undefined;
|
||||
const channelGroupId = (msg.channel_group_id as string) || `group_${uuid().replace(/-/g, "").slice(0, 12)}`;
|
||||
const acpLinkVersion = (msg.acp_link_version as string) || null;
|
||||
const maxSessions = typeof msg.max_sessions === "number" ? msg.max_sessions : 1;
|
||||
|
||||
// Create EnvironmentRecord with workerType="acp"
|
||||
const secret = config.apiKeys[0] || "";
|
||||
const record = storeCreateEnvironment({
|
||||
secret,
|
||||
machineName: agentName,
|
||||
workerType: "acp",
|
||||
bridgeId: channelGroupId,
|
||||
maxSessions,
|
||||
capabilities: capabilities || undefined,
|
||||
} as Parameters<typeof storeCreateEnvironment>[0]);
|
||||
|
||||
// Store ACP-specific metadata via environment update
|
||||
storeUpdateEnvironment(record.id, {
|
||||
status: "active",
|
||||
} as Parameters<typeof storeUpdateEnvironment>[1]);
|
||||
|
||||
entry.agentId = record.id;
|
||||
entry.channelGroupId = channelGroupId;
|
||||
entry.capabilities = capabilities || null;
|
||||
|
||||
// Subscribe to channel group EventBus — broadcast events to this WS
|
||||
const bus = getAcpEventBus(channelGroupId);
|
||||
const unsub = bus.subscribe((event: SessionEvent) => {
|
||||
if (entry.ws.readyState !== 1) return;
|
||||
if (event.direction !== "outbound") return;
|
||||
// Forward outbound events as raw ACP messages
|
||||
sendToWs(entry.ws, event.payload as object);
|
||||
});
|
||||
entry.unsub = unsub;
|
||||
|
||||
log(`[ACP-WS] Agent registered (legacy WS): agentId=${record.id} channelGroup=${channelGroupId} name=${agentName}`);
|
||||
sendToWs(entry.ws, {
|
||||
type: "registered",
|
||||
agent_id: record.id,
|
||||
channel_group_id: channelGroupId,
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle identify message — binds WS to an existing agent registered via REST */
|
||||
function handleIdentify(wsId: string, msg: Record<string, unknown>): void {
|
||||
const entry = connections.get(wsId);
|
||||
if (!entry) return;
|
||||
|
||||
if (entry.agentId) {
|
||||
sendToWs(entry.ws, { type: "error", message: "Already identified" });
|
||||
return;
|
||||
}
|
||||
|
||||
const agentId = msg.agent_id as string;
|
||||
if (!agentId) {
|
||||
sendToWs(entry.ws, { type: "error", message: "Missing agent_id" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Look up the environment record (created via REST registration)
|
||||
const record = storeGetEnvironment(agentId);
|
||||
if (!record || record.workerType !== "acp") {
|
||||
sendToWs(entry.ws, { type: "error", message: "Agent not found" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Update status to active
|
||||
storeMarkAcpAgentOnline(agentId);
|
||||
|
||||
const channelGroupId = record.bridgeId || `group_${uuid().replace(/-/g, "").slice(0, 12)}`;
|
||||
|
||||
entry.agentId = record.id;
|
||||
entry.channelGroupId = channelGroupId;
|
||||
entry.capabilities = record.capabilities || null;
|
||||
|
||||
// Subscribe to channel group EventBus — broadcast events to this WS
|
||||
const bus = getAcpEventBus(channelGroupId);
|
||||
const unsub = bus.subscribe((event: SessionEvent) => {
|
||||
if (entry.ws.readyState !== 1) return;
|
||||
if (event.direction !== "outbound") return;
|
||||
sendToWs(entry.ws, event.payload as object);
|
||||
});
|
||||
entry.unsub = unsub;
|
||||
|
||||
log(`[ACP-WS] Agent identified (REST+WS): agentId=${record.id} channelGroup=${channelGroupId}`);
|
||||
sendToWs(entry.ws, {
|
||||
type: "identified",
|
||||
agent_id: record.id,
|
||||
channel_group_id: channelGroupId,
|
||||
});
|
||||
}
|
||||
|
||||
/** Called from onMessage — processes NDJSON lines */
|
||||
export function handleAcpWsMessage(ws: WSContext, wsId: string, data: string): void {
|
||||
const entry = connections.get(wsId);
|
||||
if (!entry) return;
|
||||
|
||||
entry.lastClientActivity = Date.now();
|
||||
|
||||
const lines = data.split("\n").filter((l) => l.trim());
|
||||
for (const line of lines) {
|
||||
let msg: Record<string, unknown>;
|
||||
try {
|
||||
msg = JSON.parse(line);
|
||||
} catch {
|
||||
logError("[ACP-WS] parse error:", line);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle keepalive
|
||||
if (msg.type === "keep_alive") {
|
||||
// Update last activity timestamp (only if registered)
|
||||
if (entry.agentId) {
|
||||
storeUpdateEnvironment(entry.agentId, { lastPollAt: new Date() } as Parameters<typeof storeUpdateEnvironment>[1]);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle registration (legacy WS-only)
|
||||
if (msg.type === "register") {
|
||||
handleRegister(wsId, msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle identify (REST registration + WS binding)
|
||||
if (msg.type === "identify") {
|
||||
handleIdentify(wsId, msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Not registered yet — reject
|
||||
if (!entry.agentId) {
|
||||
sendToWs(entry.ws, { type: "error", message: "Not registered. Send register message first." });
|
||||
continue;
|
||||
}
|
||||
|
||||
// Update agent activity
|
||||
storeUpdateEnvironment(entry.agentId, { lastPollAt: new Date() } as Parameters<typeof storeUpdateEnvironment>[1]);
|
||||
|
||||
// Pass-through: publish to channel group EventBus as inbound
|
||||
const bus = getAcpEventBus(entry.channelGroupId);
|
||||
bus.publish({
|
||||
id: uuid(),
|
||||
sessionId: entry.channelGroupId,
|
||||
type: (msg.type as string) || "acp_message",
|
||||
payload: msg,
|
||||
direction: "inbound",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/** Called from onClose — marks agent offline and cleans up */
|
||||
export function handleAcpWsClose(ws: WSContext, wsId: string, code?: number, reason?: string): void {
|
||||
const entry = connections.get(wsId);
|
||||
if (!entry) return;
|
||||
|
||||
const duration = Math.round((Date.now() - entry.openTime) / 1000);
|
||||
log(`[ACP-WS] Connection closed: wsId=${wsId} agentId=${entry.agentId} code=${code ?? "none"} reason=${reason || "(none)"} duration=${duration}s`);
|
||||
|
||||
if (entry.unsub) {
|
||||
entry.unsub();
|
||||
}
|
||||
if (entry.keepalive) {
|
||||
clearInterval(entry.keepalive);
|
||||
}
|
||||
|
||||
// Mark agent as offline (don't delete record — allow reconnect)
|
||||
if (entry.agentId) {
|
||||
storeMarkAcpAgentOffline(entry.agentId);
|
||||
|
||||
// Notify all relay connections that this agent is gone
|
||||
if (entry.channelGroupId) {
|
||||
const bus = getAcpEventBus(entry.channelGroupId);
|
||||
bus.publish({
|
||||
id: uuid(),
|
||||
sessionId: entry.channelGroupId,
|
||||
type: "agent_disconnect",
|
||||
payload: { agentId: entry.agentId },
|
||||
direction: "inbound",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
connections.delete(wsId);
|
||||
}
|
||||
|
||||
/** Find an active ACP connection by agent ID */
|
||||
export function findAcpConnectionByAgentId(agentId: string): AcpConnectionEntry | null {
|
||||
for (const entry of connections.values()) {
|
||||
if (entry.agentId === agentId && entry.ws.readyState === 1) {
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Send a JSON message directly to an agent's WebSocket connection */
|
||||
export function sendToAgentWs(agentId: string, msg: object): boolean {
|
||||
const entry = findAcpConnectionByAgentId(agentId);
|
||||
if (!entry) return false;
|
||||
sendToWs(entry.ws, msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Gracefully close all ACP WebSocket connections */
|
||||
export function closeAllAcpConnections(): void {
|
||||
if (connections.size === 0) return;
|
||||
|
||||
log(`[ACP-WS] Gracefully closing ${connections.size} ACP connection(s)...`);
|
||||
for (const [wsId, entry] of connections) {
|
||||
try {
|
||||
if (entry.unsub) entry.unsub();
|
||||
if (entry.keepalive) clearInterval(entry.keepalive);
|
||||
if (entry.ws.readyState === 1) {
|
||||
entry.ws.close(1001, "server_shutdown");
|
||||
}
|
||||
if (entry.agentId) {
|
||||
storeMarkAcpAgentOffline(entry.agentId);
|
||||
}
|
||||
} catch {
|
||||
// ignore errors during shutdown
|
||||
}
|
||||
}
|
||||
connections.clear();
|
||||
log("[ACP-WS] All connections closed");
|
||||
}
|
||||
@@ -12,6 +12,8 @@ export interface SessionEvent {
|
||||
|
||||
type Subscriber = (event: SessionEvent) => void;
|
||||
|
||||
const MAX_EVENTS_PER_BUS = 5000;
|
||||
|
||||
export class EventBus {
|
||||
private subscribers = new Set<Subscriber>();
|
||||
private events: SessionEvent[] = [];
|
||||
@@ -35,7 +37,14 @@ export class EventBus {
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
this.events.push(full);
|
||||
log(`[RC-DEBUG] bus publish: sessionId=${event.sessionId} type=${event.type} dir=${event.direction} seq=${full.seqNum} subscribers=${this.subscribers.size}`);
|
||||
// Evict oldest events when exceeding limit
|
||||
if (this.events.length > MAX_EVENTS_PER_BUS) {
|
||||
this.events = this.events.slice(-Math.floor(MAX_EVENTS_PER_BUS / 2));
|
||||
}
|
||||
log(
|
||||
`[RC-DEBUG] bus publish: sessionId=${event.sessionId} type=${event.type} dir=${event.direction} seq=${full.seqNum} subscribers=${this.subscribers.size}`,
|
||||
event.type === "error" ? `payload=${JSON.stringify(event.payload)}` : "",
|
||||
);
|
||||
for (const cb of this.subscribers) {
|
||||
try {
|
||||
cb(full);
|
||||
@@ -85,3 +94,23 @@ export function removeEventBus(sessionId: string) {
|
||||
export function getAllEventBuses(): Map<string, EventBus> {
|
||||
return buses;
|
||||
}
|
||||
|
||||
/** Global registry of per-channel-group ACP event buses */
|
||||
const acpBuses = new Map<string, EventBus>();
|
||||
|
||||
export function getAcpEventBus(channelGroupId: string): EventBus {
|
||||
let bus = acpBuses.get(channelGroupId);
|
||||
if (!bus) {
|
||||
bus = new EventBus();
|
||||
acpBuses.set(channelGroupId, bus);
|
||||
}
|
||||
return bus;
|
||||
}
|
||||
|
||||
export function removeAcpEventBus(channelGroupId: string) {
|
||||
const bus = acpBuses.get(channelGroupId);
|
||||
if (bus) {
|
||||
bus.close();
|
||||
acpBuses.delete(channelGroupId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import type { SessionEvent } from "./event-bus";
|
||||
import { publishSessionEvent } from "../services/transport";
|
||||
import { log, error as logError } from "../logger";
|
||||
import { toClientPayload } from "./client-payload";
|
||||
import { config } from "../config";
|
||||
|
||||
// Per-connection cleanup, keyed by sessionId (only one WS per session)
|
||||
interface CleanupEntry {
|
||||
@@ -11,15 +12,20 @@ interface CleanupEntry {
|
||||
keepalive: ReturnType<typeof setInterval>;
|
||||
ws: WSContext;
|
||||
openTime: number;
|
||||
lastClientActivity: number;
|
||||
}
|
||||
const cleanupBySession = new Map<string, CleanupEntry>();
|
||||
|
||||
// Track all active WS connections for graceful shutdown
|
||||
const activeConnections = new Set<WSContext>();
|
||||
|
||||
// Bridge sends keep_alive data frames every 120s. Send server-side keep_alive
|
||||
// every 60s to ensure the connection stays alive even without user messages.
|
||||
const SERVER_KEEPALIVE_INTERVAL_MS = 60_000;
|
||||
// Server-side keepalive interval (configurable via RCS_WS_KEEPALIVE_INTERVAL).
|
||||
// Sends data frames to keep reverse proxies from closing idle connections.
|
||||
const SERVER_KEEPALIVE_INTERVAL_MS = (config.wsKeepaliveInterval || 20) * 1000;
|
||||
|
||||
// If no client data received within this threshold, the connection is
|
||||
// considered dead. Set to 3x keepalive to tolerate one missed interval.
|
||||
const CLIENT_ACTIVITY_TIMEOUT_MS = SERVER_KEEPALIVE_INTERVAL_MS * 3;
|
||||
|
||||
/**
|
||||
* Convert internal EventBus event -> SDK message for bridge client.
|
||||
@@ -33,6 +39,7 @@ function toSDKMessage(event: SessionEvent): string {
|
||||
/** Called from onOpen — subscribes to event bus, forwards outbound events to bridge WS */
|
||||
export function handleWebSocketOpen(ws: WSContext, sessionId: string) {
|
||||
const openTime = Date.now();
|
||||
const lastClientActivity = Date.now();
|
||||
log(`[RC-DEBUG] [WS] Open session=${sessionId}`);
|
||||
activeConnections.add(ws);
|
||||
|
||||
@@ -79,6 +86,17 @@ export function handleWebSocketOpen(ws: WSContext, sessionId: string) {
|
||||
clearInterval(keepalive);
|
||||
return;
|
||||
}
|
||||
// Check if client is still alive — close if no data received for too long
|
||||
const silenceMs = Date.now() - lastClientActivity;
|
||||
if (silenceMs > CLIENT_ACTIVITY_TIMEOUT_MS) {
|
||||
log(`[WS] Client inactive for ${Math.round(silenceMs / 1000)}s on session=${sessionId}, closing dead connection`);
|
||||
try {
|
||||
ws.close(1000, "client inactive");
|
||||
} catch {
|
||||
clearInterval(keepalive);
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ws.send('{"type":"keep_alive"}\n');
|
||||
} catch {
|
||||
@@ -86,13 +104,18 @@ export function handleWebSocketOpen(ws: WSContext, sessionId: string) {
|
||||
}
|
||||
}, SERVER_KEEPALIVE_INTERVAL_MS);
|
||||
|
||||
cleanupBySession.set(sessionId, { unsub, keepalive, ws, openTime });
|
||||
cleanupBySession.set(sessionId, { unsub, keepalive, ws, openTime, lastClientActivity });
|
||||
}
|
||||
|
||||
/**
|
||||
* Called from onMessage — bridge sends newline-delimited JSON.
|
||||
*/
|
||||
export function handleWebSocketMessage(ws: WSContext, sessionId: string, data: string) {
|
||||
// Track client activity for dead-connection detection
|
||||
const entry = cleanupBySession.get(sessionId);
|
||||
if (entry) {
|
||||
entry.lastClientActivity = Date.now();
|
||||
}
|
||||
const lines = data.split("\n").filter((l) => l.trim());
|
||||
for (const line of lines) {
|
||||
try {
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
export { upgradeWebSocket, websocket } from "hono/bun";
|
||||
@@ -19,6 +19,7 @@ export interface RegisterEnvironmentRequest {
|
||||
max_sessions?: number;
|
||||
worker_type?: string;
|
||||
bridge_id?: string;
|
||||
capabilities?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface RegisterEnvironmentResponse {
|
||||
@@ -105,6 +106,8 @@ export interface EnvironmentResponse {
|
||||
status: string;
|
||||
username: string | null;
|
||||
last_poll_at: number | null;
|
||||
worker_type?: string;
|
||||
capabilities?: Record<string, unknown> | null;
|
||||
}
|
||||
|
||||
export interface SessionSummaryResponse {
|
||||
|
||||
Reference in New Issue
Block a user