fix(remote-control): harden self-hosted session flows (#278)

Co-authored-by: chengzifeng <chengzifeng@meituan.com>
This commit is contained in:
Cheng Zi Feng
2026-04-16 10:46:31 +08:00
committed by GitHub
parent 5a4c820e1d
commit fe08cacf8d
24 changed files with 1252 additions and 162 deletions

View File

@@ -115,3 +115,109 @@ export function createSSEStream(c: Context, sessionId: string, fromSeqNum = 0) {
},
});
}
function toWorkerClientPayload(event: SessionEvent): Record<string, unknown> {
const normalized =
event.payload && typeof event.payload === "object"
? (event.payload as Record<string, unknown>)
: undefined;
const raw =
normalized?.raw && typeof normalized.raw === "object" && !Array.isArray(normalized.raw)
? (normalized.raw as Record<string, unknown>)
: undefined;
const payload: Record<string, unknown> = {
...(raw ?? normalized ?? {}),
type: event.type,
};
if (event.type === "user") {
const message = payload.message;
if (!message || typeof message !== "object" || !("content" in message)) {
const content =
typeof normalized?.content === "string"
? normalized.content
: typeof payload.content === "string"
? payload.content
: typeof event.payload === "string"
? event.payload
: "";
payload.content = content;
payload.message = { content };
}
}
return payload;
}
function toWorkerClientFrame(event: SessionEvent): string {
const data = JSON.stringify({
event_id: event.id,
sequence_num: event.seqNum,
event_type: event.type,
source: "client",
payload: toWorkerClientPayload(event),
created_at: new Date(event.createdAt).toISOString(),
});
return `id: ${event.seqNum}\nevent: client_event\ndata: ${data}\n\n`;
}
/** Create CCR worker SSE stream (client_event frames, outbound events only). */
export function createWorkerEventStream(c: Context, sessionId: string, fromSeqNum = 0) {
const bus = getEventBus(sessionId);
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
if (fromSeqNum > 0) {
const missed = bus
.getEventsSince(fromSeqNum)
.filter((event) => event.direction === "outbound");
for (const event of missed) {
controller.enqueue(encoder.encode(toWorkerClientFrame(event)));
}
}
controller.enqueue(encoder.encode(": keepalive\n\n"));
const unsub = bus.subscribe((event) => {
if (event.direction !== "outbound") {
return;
}
try {
controller.enqueue(encoder.encode(toWorkerClientFrame(event)));
} catch {
unsub();
}
});
const keepalive = setInterval(() => {
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
} catch {
clearInterval(keepalive);
unsub();
}
}, 15000);
c.req.raw.signal.addEventListener("abort", () => {
unsub();
clearInterval(keepalive);
try {
controller.close();
} catch {
// already closed
}
});
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
});
}

View File

@@ -24,13 +24,14 @@ const SERVER_KEEPALIVE_INTERVAL_MS = 60_000;
*/
function toSDKMessage(event: SessionEvent): string {
const payload = event.payload as Record<string, unknown> | null;
const messageUuid = typeof payload?.uuid === "string" && payload.uuid ? payload.uuid : event.id;
let msg: Record<string, unknown>;
if (event.type === "user" || event.type === "user_message") {
msg = {
type: "user",
uuid: event.id,
uuid: messageUuid,
session_id: event.sessionId,
message: {
role: "user",
@@ -82,7 +83,7 @@ function toSDKMessage(event: SessionEvent): string {
} else {
msg = {
type: event.type,
uuid: event.id,
uuid: messageUuid,
session_id: event.sessionId,
message: payload,
};