fix: prevent SSE listener leak in console-logs stream (#751)

The ReadableStream cancel() callback is not reliably invoked on client
disconnect under Next.js, causing emitter listeners (line/clear) and the
keepalive interval to accumulate, eventually triggering
MaxListenersExceededWarning.

Use request.signal as the primary disconnect trigger, with cancel() and
enqueue failures as fallbacks. Cleanup is idempotent via state.closed.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
zerray
2026-05-03 16:11:47 +08:00
committed by GitHub
parent bb260a8c65
commit 52ad3b6630

View File

@@ -4,10 +4,23 @@ export const dynamic = "force-dynamic";
initConsoleLogCapture();
export async function GET() {
export async function GET(request) {
const encoder = new TextEncoder();
const emitter = getConsoleEmitter();
const state = { closed: false, send: null, keepalive: null };
const state = { closed: false, send: null, sendClear: null, keepalive: null };
// Idempotent: safe to call from request.signal abort, cancel(), or enqueue failure.
const cleanup = () => {
if (state.closed) return;
state.closed = true;
if (state.send) emitter.off("line", state.send);
if (state.sendClear) emitter.off("clear", state.sendClear);
if (state.keepalive) clearInterval(state.keepalive);
};
// request.signal fires reliably on client disconnect; ReadableStream.cancel()
// is not always invoked in Next.js, which caused listeners to accumulate.
request.signal.addEventListener("abort", cleanup, { once: true });
const stream = new ReadableStream({
start(controller) {
@@ -23,7 +36,7 @@ export async function GET() {
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: "line", line })}\n\n`));
} catch {
state.closed = true;
cleanup();
}
};
@@ -33,7 +46,7 @@ export async function GET() {
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: "clear" })}\n\n`));
} catch {
state.closed = true;
cleanup();
}
};
@@ -46,17 +59,13 @@ export async function GET() {
try {
controller.enqueue(encoder.encode(": ping\n\n"));
} catch {
state.closed = true;
clearInterval(state.keepalive);
cleanup();
}
}, 25000);
},
cancel() {
state.closed = true;
emitter.off("line", state.send);
emitter.off("clear", state.sendClear);
clearInterval(state.keepalive);
cleanup();
},
});