import type { Context } from "hono"; import type { SessionEvent } from "./event-bus"; import { getEventBus } from "./event-bus"; export interface SSEWriter { send(event: SessionEvent): void; close(): void; } export function createSSEWriter(c: Context): SSEWriter { const stream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); c.req.raw.signal.addEventListener("abort", () => { controller.close(); }); // Store encoder and controller for later use (c as any)._sseEncoder = encoder; (c as any)._sseController = controller; }, }); return { send(event: SessionEvent) { const encoder = (c as any)._sseEncoder as TextEncoder; const controller = (c as any)._sseController as ReadableStreamDefaultController; if (!encoder || !controller) return; const data = JSON.stringify({ type: event.type, payload: event.payload, direction: event.direction, seqNum: event.seqNum, }); const msg = `id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`; controller.enqueue(encoder.encode(msg)); }, close() { const controller = (c as any)._sseController as ReadableStreamDefaultController; controller?.close(); }, }; } /** Create SSE response stream for a session */ export function createSSEStream(c: Context, sessionId: string, fromSeqNum = 0) { const bus = getEventBus(sessionId); const stream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); // Send historical events if reconnecting if (fromSeqNum > 0) { const missed = bus.getEventsSince(fromSeqNum); for (const event of missed) { const data = JSON.stringify({ type: event.type, payload: event.payload, direction: event.direction, seqNum: event.seqNum, }); controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`)); } } // Send initial keepalive controller.enqueue(encoder.encode(": keepalive\n\n")); // Subscribe to new events const unsub = bus.subscribe((event) => { const data = JSON.stringify({ type: event.type, payload: event.payload, direction: event.direction, seqNum: event.seqNum, }); try { console.log(`[RC-DEBUG] SSE -> web: sessionId=${sessionId} type=${event.type} dir=${event.direction} seq=${event.seqNum}`); controller.enqueue(encoder.encode(`id: ${event.seqNum}\nevent: message\ndata: ${data}\n\n`)); } catch { unsub(); } }); // Keepalive interval const keepalive = setInterval(() => { try { controller.enqueue(encoder.encode(": keepalive\n\n")); } catch { clearInterval(keepalive); unsub(); } }, 15000); // Cleanup on abort c.req.raw.signal.addEventListener("abort", () => { unsub(); clearInterval(keepalive); try { controller.close(); } catch { // already closed } }); }, }); return new Response(stream, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", "X-Accel-Buffering": "no", }, }); }