Files
9router/open-sse/utils/stream.js
2026-03-14 09:37:29 +07:00

393 lines
14 KiB
JavaScript

import { translateResponse, initState } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";
import { extractUsage, hasValidUsage, estimateUsage, logUsage, addBufferToUsage, filterUsageForFormat, COLORS } from "./usageTracking.js";
import { parseSSELine, hasValuableContent, fixInvalidId, formatSSE } from "./streamHelpers.js";
export { COLORS, formatSSE };
// sharedEncoder is stateless — safe to share across streams
const sharedEncoder = new TextEncoder();
/**
* Stream modes
*/
const STREAM_MODE = {
TRANSLATE: "translate", // Full translation between formats
PASSTHROUGH: "passthrough" // No translation, normalize output, extract usage
};
/**
* Create unified SSE transform stream
* @param {object} options
* @param {string} options.mode - Stream mode: translate, passthrough
* @param {string} options.targetFormat - Provider format (for translate mode)
* @param {string} options.sourceFormat - Client format (for translate mode)
* @param {string} options.provider - Provider name
* @param {object} options.reqLogger - Request logger instance
* @param {string} options.model - Model name
* @param {string} options.connectionId - Connection ID for usage tracking
* @param {object} options.body - Request body (for input token estimation)
* @param {function} options.onStreamComplete - Callback when stream completes (content, usage)
* @param {string} options.apiKey - API key for usage tracking
*/
export function createSSEStream(options = {}) {
const {
mode = STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider = null,
reqLogger = null,
toolNameMap = null,
model = null,
connectionId = null,
body = null,
onStreamComplete = null,
apiKey = null
} = options;
let buffer = "";
let usage = null;
// Per-stream decoder with stream:true to correctly handle multi-byte chars split across chunks
const decoder = new TextDecoder("utf-8", { fatal: false });
const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider, toolNameMap, model } : null;
let totalContentLength = 0;
let accumulatedContent = "";
let accumulatedThinking = "";
let ttftAt = null;
return new TransformStream({
transform(chunk, controller) {
if (!ttftAt) {
ttftAt = Date.now();
}
const text = decoder.decode(chunk, { stream: true });
buffer += text;
reqLogger?.appendProviderChunk?.(text);
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const trimmed = line.trim();
// Passthrough mode: normalize and forward
if (mode === STREAM_MODE.PASSTHROUGH) {
let output;
let injectedUsage = false;
if (trimmed.startsWith("data:") && trimmed.slice(5).trim() !== "[DONE]") {
try {
const parsed = JSON.parse(trimmed.slice(5).trim());
const idFixed = fixInvalidId(parsed);
// Ensure OpenAI-required fields are present on streaming chunks (Letta compat)
let fieldsInjected = false;
if (parsed.choices !== undefined) {
if (!parsed.object) { parsed.object = "chat.completion.chunk"; fieldsInjected = true; }
if (!parsed.created) { parsed.created = Math.floor(Date.now() / 1000); fieldsInjected = true; }
}
// Strip Azure-specific non-standard fields from streaming chunks
if (parsed.prompt_filter_results !== undefined) {
delete parsed.prompt_filter_results;
fieldsInjected = true;
}
if (parsed?.choices) {
for (const choice of parsed.choices) {
if (choice.content_filter_results !== undefined) {
delete choice.content_filter_results;
fieldsInjected = true;
}
}
}
if (!hasValuableContent(parsed, FORMATS.OPENAI)) {
continue;
}
const delta = parsed.choices?.[0]?.delta;
const content = delta?.content;
const reasoning = delta?.reasoning_content;
if (content && typeof content === "string") {
totalContentLength += content.length;
accumulatedContent += content;
}
if (reasoning && typeof reasoning === "string") {
totalContentLength += reasoning.length;
accumulatedThinking += reasoning;
}
const extracted = extractUsage(parsed);
if (extracted) {
usage = extracted;
}
const isFinishChunk = parsed.choices?.[0]?.finish_reason;
if (isFinishChunk && !hasValidUsage(parsed.usage)) {
const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
output = `data: ${JSON.stringify(parsed)}\n`;
usage = estimated;
injectedUsage = true;
} else if (isFinishChunk && usage) {
const buffered = addBufferToUsage(usage);
parsed.usage = filterUsageForFormat(buffered, FORMATS.OPENAI);
output = `data: ${JSON.stringify(parsed)}\n`;
injectedUsage = true;
} else if (idFixed || fieldsInjected) {
output = `data: ${JSON.stringify(parsed)}\n`;
injectedUsage = true;
}
} catch { }
}
if (!injectedUsage) {
if (line.startsWith("data:") && !line.startsWith("data: ")) {
output = "data: " + line.slice(5) + "\n";
} else {
output = line + "\n";
}
}
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
continue;
}
// Translate mode
if (!trimmed) continue;
const parsed = parseSSELine(trimmed, targetFormat);
if (!parsed) continue;
// For Ollama: done=true is the final chunk with finish_reason/usage, must translate
// For other formats: done=true is the [DONE] sentinel, skip
if (parsed && parsed.done && targetFormat !== FORMATS.OLLAMA) {
const output = "data: [DONE]\n\n";
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
continue;
}
// Claude format - content
if (parsed.delta?.text) {
totalContentLength += parsed.delta.text.length;
accumulatedContent += parsed.delta.text;
}
// Claude format - thinking
if (parsed.delta?.thinking) {
totalContentLength += parsed.delta.thinking.length;
accumulatedThinking += parsed.delta.thinking;
}
// OpenAI format - content
if (parsed.choices?.[0]?.delta?.content) {
totalContentLength += parsed.choices[0].delta.content.length;
accumulatedContent += parsed.choices[0].delta.content;
}
// OpenAI format - reasoning
if (parsed.choices?.[0]?.delta?.reasoning_content) {
totalContentLength += parsed.choices[0].delta.reasoning_content.length;
accumulatedThinking += parsed.choices[0].delta.reasoning_content;
}
// Gemini format
if (parsed.candidates?.[0]?.content?.parts) {
for (const part of parsed.candidates[0].content.parts) {
if (part.text && typeof part.text === "string") {
totalContentLength += part.text.length;
// Check if this is thinking content
if (part.thought === true) {
accumulatedThinking += part.text;
} else {
accumulatedContent += part.text;
}
}
}
}
// Extract usage
const extracted = extractUsage(parsed);
if (extracted) state.usage = extracted; // Keep original usage for logging
// Translate: targetFormat -> openai -> sourceFormat
const translated = translateResponse(targetFormat, sourceFormat, parsed, state);
// Log OpenAI intermediate chunks (if available)
if (translated?._openaiIntermediate) {
for (const item of translated._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (translated?.length > 0) {
for (const item of translated) {
// Filter empty chunks
if (!hasValuableContent(item, sourceFormat)) {
continue; // Skip this empty chunk
}
// Inject estimated usage if finish chunk has no valid usage
const isFinishChunk = item.type === "message_delta" || item.choices?.[0]?.finish_reason;
if (state.finishReason && isFinishChunk && !hasValidUsage(item.usage) && totalContentLength > 0) {
const estimated = estimateUsage(body, totalContentLength, sourceFormat);
item.usage = filterUsageForFormat(estimated, sourceFormat); // Filter + already has buffer
state.usage = estimated;
} else if (state.finishReason && isFinishChunk && state.usage) {
// Add buffer and filter usage for client (but keep original in state.usage for logging)
const buffered = addBufferToUsage(state.usage);
item.usage = filterUsageForFormat(buffered, sourceFormat);
}
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
}
},
flush(controller) {
trackPendingRequest(model, provider, connectionId, false);
try {
const remaining = decoder.decode();
if (remaining) buffer += remaining;
if (mode === STREAM_MODE.PASSTHROUGH) {
if (buffer) {
let output = buffer;
if (buffer.startsWith("data:") && !buffer.startsWith("data: ")) {
output = "data: " + buffer.slice(5);
}
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
if (!hasValidUsage(usage) && totalContentLength > 0) {
usage = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
}
if (hasValidUsage(usage)) {
logUsage(provider, usage, model, connectionId, apiKey);
} else {
appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
}
// IMPORTANT: In passthrough mode we still must terminate the SSE stream.
// Some clients (e.g. OpenClaw) expect the OpenAI-style sentinel:
// data: [DONE]\n\n
// Without it they can hang until timeout and trigger failover.
const doneOutput = "data: [DONE]\n\n";
reqLogger?.appendConvertedChunk?.(doneOutput);
controller.enqueue(sharedEncoder.encode(doneOutput));
if (onStreamComplete) {
onStreamComplete({
content: accumulatedContent,
thinking: accumulatedThinking
}, usage, ttftAt);
}
return;
}
if (buffer.trim()) {
const parsed = parseSSELine(buffer.trim());
if (parsed && !parsed.done) {
const translated = translateResponse(targetFormat, sourceFormat, parsed, state);
if (translated?._openaiIntermediate) {
for (const item of translated._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (translated?.length > 0) {
for (const item of translated) {
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
}
}
const flushed = translateResponse(targetFormat, sourceFormat, null, state);
if (flushed?._openaiIntermediate) {
for (const item of flushed._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (flushed?.length > 0) {
for (const item of flushed) {
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
const doneOutput = "data: [DONE]\n\n";
reqLogger?.appendConvertedChunk?.(doneOutput);
controller.enqueue(sharedEncoder.encode(doneOutput));
if (!hasValidUsage(state?.usage) && totalContentLength > 0) {
state.usage = estimateUsage(body, totalContentLength, sourceFormat);
}
if (hasValidUsage(state?.usage)) {
logUsage(state.provider || targetFormat, state.usage, model, connectionId, apiKey);
} else {
appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
}
if (onStreamComplete) {
onStreamComplete({
content: accumulatedContent,
thinking: accumulatedThinking
}, state?.usage, ttftAt);
}
} catch (error) {
console.log("Error in flush:", error);
}
}
});
}
export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null, onStreamComplete = null, apiKey = null) {
return createSSEStream({
mode: STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider,
reqLogger,
toolNameMap,
model,
connectionId,
body,
onStreamComplete,
apiKey
});
}
export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null, onStreamComplete = null, apiKey = null) {
return createSSEStream({
mode: STREAM_MODE.PASSTHROUGH,
provider,
reqLogger,
model,
connectionId,
body,
onStreamComplete,
apiKey
});
}