import { createLogger } from './logger.js' import { decodeJsonWsMessage, WsPayloadTooLargeError } from './ws-message.js' import { encodeWebSocketAuthProtocol } from './ws-auth.js' export interface RcsUpstreamConfig { rcsUrl: string // e.g. "http://localhost:3000" apiToken: string agentName: string channelGroupId?: string capabilities?: Record maxSessions?: number } export function buildRcsWsUrl(rcsUrl: string): string { let raw = rcsUrl raw = raw.replace(/^http:\/\//, 'ws://').replace(/^https:\/\//, 'wss://') const url = new URL(raw) const path = url.pathname.replace(/\/+$/, '') if (!path || path === '/') { url.pathname = '/acp/ws' } url.searchParams.delete('token') return url.toString() } /** * RCS upstream client — connects acp-link to a Remote Control Server. * * Lifecycle: * 1. connect() — opens WS to RCS * 2. Sends register message * 3. Waits for registered response * 4. Forwards all ACP events via send() * 5. Reconnects with exponential backoff on failure */ export class RcsUpstreamClient { private static log = createLogger('rcs-upstream') private ws: WebSocket | null = null private registered = false private reconnectAttempts = 0 private closed = false private readonly maxReconnectDelay = 30_000 private readonly baseReconnectDelay = 1_000 /** Agent ID obtained from REST registration */ private agentId: string | null = null /** Session ID from REST registration (ACP agents auto-create a session) */ private sessionId: string | undefined /** Handler for incoming ACP messages from RCS relay */ private messageHandler: ((message: Record) => void) | null = null constructor(private config: RcsUpstreamConfig) {} /** Get the agent ID from REST registration */ getAgentId(): string | null { return this.agentId } /** Set handler for incoming ACP messages from RCS relay */ setMessageHandler(handler: (message: Record) => void): void { this.messageHandler = handler } /** Register via REST API before establishing WS connection */ private async registerViaRest(): Promise { const baseUrl = this.config.rcsUrl .replace(/^ws:\/\//, 'http://') .replace(/^wss:\/\//, 'https://') .replace(/\/acp\/ws.*$/, '') .replace(/\/$/, '') const url = `${baseUrl}/v1/environments/bridge` RcsUpstreamClient.log.info({ url }, 'REST register') const resp = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.config.apiToken}`, }, body: JSON.stringify({ machine_name: this.config.agentName, worker_type: 'acp', bridge_id: this.config.channelGroupId || undefined, max_sessions: this.config.maxSessions, capabilities: this.config.capabilities, }), }) if (!resp.ok) { const text = await resp.text() throw new Error(`REST register failed (${resp.status}): ${text}`) } const data = (await resp.json()) as { environment_id: string environment_secret: string status: string session_id?: string } this.agentId = data.environment_id this.sessionId = data.session_id RcsUpstreamClient.log.info( { agentId: this.agentId, sessionId: this.sessionId }, 'REST register success', ) return data.environment_id } /** Normalize RCS URL: accept http(s) base URL and convert to ws(s) + /acp/ws path */ private buildWsUrl(): string { return buildRcsWsUrl(this.config.rcsUrl) } /** Open connection to RCS: REST register → WS identify */ async connect(): Promise { if (this.closed) return // Step 1: REST registration try { await this.registerViaRest() } catch (err) { RcsUpstreamClient.log.error({ err }, 'REST registration failed') if (!this.closed) { this.scheduleReconnect() } return } // Step 2: WebSocket connection with identify const wsUrl = this.buildWsUrl() RcsUpstreamClient.log.info({ url: wsUrl }, 'connecting WS') return new Promise((resolve, reject) => { try { this.ws = new WebSocket(wsUrl, [ encodeWebSocketAuthProtocol(this.config.apiToken), ]) this.ws.onopen = () => { RcsUpstreamClient.log.debug('ws open — sending identify') this.ws!.send( JSON.stringify({ type: 'identify', agent_id: this.agentId, }), ) } this.ws.onmessage = event => { let data: Record try { data = decodeJsonWsMessage(event.data) } catch (err) { if (err instanceof WsPayloadTooLargeError) { RcsUpstreamClient.log.warn( { error: err.message }, 'server message too large', ) this.ws?.close(1009, 'message too large') return } RcsUpstreamClient.log.warn( { raw: String(event.data).slice(0, 200) }, 'invalid JSON from server', ) return } if (data.type === 'identified') { RcsUpstreamClient.log.info( { agent_id: data.agent_id, channel_group_id: data.channel_group_id, }, 'identified', ) this.registered = true this.reconnectAttempts = 0 const webBase = this.config.rcsUrl .replace(/^ws:\/\//, 'http://') .replace(/^wss:\/\//, 'https://') .replace(/\/acp\/ws.*$/, '') .replace(/\/$/, '') console.log() console.log(` 🔗 Dashboard: ${webBase}/code/`) if (this.agentId) { console.log(` Agent ID: ${this.agentId}`) } console.log() resolve() } else if (data.type === 'registered') { // Legacy fallback: server still uses old register flow RcsUpstreamClient.log.info( { agent_id: data.agent_id }, 'registered (legacy)', ) this.agentId = (data.agent_id as string) || this.agentId this.registered = true this.reconnectAttempts = 0 resolve() } else if (data.type === 'error') { RcsUpstreamClient.log.error( { message: data.message }, 'server error', ) if (!this.registered) { reject(new Error(data.message as string)) } } else if (data.type === 'keep_alive') { // ignore keepalive } else { // Forward ACP protocol messages to handler (for RCS relay support) RcsUpstreamClient.log.debug( { type: data.type }, 'forwarding to relay handler', ) this.messageHandler?.(data) } } this.ws.onerror = () => { // onclose fires after onerror with the actual close code, so we log there if (!this.registered) { reject(new Error('WebSocket connection failed')) } } this.ws.onclose = event => { RcsUpstreamClient.log.info( { code: event.code, reason: event.reason || undefined }, 'ws closed', ) this.registered = false this.ws = null if (!this.closed) { this.scheduleReconnect() } } } catch (err) { RcsUpstreamClient.log.error({ err }, 'connect threw') reject(err) } }) } /** Send an ACP message to RCS for broadcast */ send(message: object): void { if (!this.ws || this.ws.readyState !== WebSocket.OPEN || !this.registered) { return } try { this.ws.send(JSON.stringify(message)) } catch (err) { RcsUpstreamClient.log.error({ err }, 'send failed') } } /** Check if registered with RCS */ isRegistered(): boolean { return ( this.registered && this.ws !== null && this.ws.readyState === WebSocket.OPEN ) } /** Close the RCS connection permanently */ async close(): Promise { this.closed = true this.registered = false if (this.ws) { this.ws.close(1000, 'client shutdown') this.ws = null } RcsUpstreamClient.log.info('closed') } private scheduleReconnect(): void { if (this.closed) return const delay = Math.min( this.baseReconnectDelay * 2 ** this.reconnectAttempts, this.maxReconnectDelay, ) const jitter = delay * Math.random() * 0.2 const actualDelay = delay + jitter this.reconnectAttempts++ RcsUpstreamClient.log.warn( { attempt: this.reconnectAttempts, delayMs: Math.round(actualDelay) }, 'reconnecting', ) setTimeout(async () => { if (this.closed) return try { await this.connect() } catch { // connect() itself logs the error; nothing to add here } }, actualDelay) } }