import { translateResponse, initState } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";
import { extractUsage, hasValidUsage, estimateUsage, logUsage, addBufferToUsage, filterUsageForFormat, COLORS } from "./usageTracking.js";
import { parseSSELine, hasValuableContent, fixInvalidId, formatSSE } from "./streamHelpers.js";

export { COLORS, formatSSE };

// sharedEncoder is stateless — safe to share across streams
const sharedEncoder = new TextEncoder();

/**
 * Stream modes
 */
const STREAM_MODE = {
  TRANSLATE: "translate",     // Full translation between formats
  PASSTHROUGH: "passthrough"  // No translation, normalize output, extract usage
};
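
// As used below: TRANSLATE re-encodes provider chunks (targetFormat) into the
// client's format (sourceFormat) via the translator, while PASSTHROUGH assumes
// OpenAI-style chunks on both sides and only repairs ids, missing required
// fields, and usage before forwarding.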

/**
 * Create unified SSE transform stream
 * @param {object} options
 * @param {string} options.mode - Stream mode: translate, passthrough
 * @param {string} options.targetFormat - Provider format (for translate mode)
 * @param {string} options.sourceFormat - Client format (for translate mode)
 * @param {string} options.provider - Provider name
 * @param {object} options.reqLogger - Request logger instance
 * @param {object} options.toolNameMap - Tool name map forwarded to translator state (for translate mode)
 * @param {string} options.model - Model name
 * @param {string} options.connectionId - Connection ID for usage tracking
 * @param {object} options.body - Request body (for input token estimation)
 * @param {function} options.onStreamComplete - Called when the stream completes: ({ content, thinking }, usage, ttftAt)
 * @param {string} options.apiKey - API key for usage tracking
 */
export function createSSEStream(options = {}) {
  const {
    mode = STREAM_MODE.TRANSLATE,
    targetFormat,
    sourceFormat,
    provider = null,
    reqLogger = null,
    toolNameMap = null,
    model = null,
    connectionId = null,
    body = null,
    onStreamComplete = null,
    apiKey = null
  } = options;

  let buffer = "";
  let usage = null;

  // Per-stream decoder with stream:true to correctly handle multi-byte chars split across chunks
  const decoder = new TextDecoder("utf-8", { fatal: false });

  const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider, toolNameMap, model } : null;

  let totalContentLength = 0;
  let accumulatedContent = "";
  let accumulatedThinking = "";
  let ttftAt = null;

  return new TransformStream({
    transform(chunk, controller) {
      if (!ttftAt) {
        ttftAt = Date.now();
      }
      const text = decoder.decode(chunk, { stream: true });
      buffer += text;
      reqLogger?.appendProviderChunk?.(text);

      const lines = buffer.split("\n");
      buffer = lines.pop() || "";

      for (const line of lines) {
        const trimmed = line.trim();

        // Passthrough mode: normalize and forward
        if (mode === STREAM_MODE.PASSTHROUGH) {
          let output;
          let injectedUsage = false;

          if (trimmed.startsWith("data:") && trimmed.slice(5).trim() !== "[DONE]") {
            try {
              const parsed = JSON.parse(trimmed.slice(5).trim());

              const idFixed = fixInvalidId(parsed);

              // Ensure OpenAI-required fields are present on streaming chunks (Letta compat)
              let fieldsInjected = false;
              if (parsed.choices !== undefined) {
                if (!parsed.object) { parsed.object = "chat.completion.chunk"; fieldsInjected = true; }
                if (!parsed.created) { parsed.created = Math.floor(Date.now() / 1000); fieldsInjected = true; }
              }

              // Strip Azure-specific non-standard fields from streaming chunks
              if (parsed.prompt_filter_results !== undefined) {
                delete parsed.prompt_filter_results;
                fieldsInjected = true;
              }
              if (parsed?.choices) {
                for (const choice of parsed.choices) {
                  if (choice.content_filter_results !== undefined) {
                    delete choice.content_filter_results;
                    fieldsInjected = true;
                  }
                }
              }

              if (!hasValuableContent(parsed, FORMATS.OPENAI)) {
                continue;
              }

              const delta = parsed.choices?.[0]?.delta;
              const content = delta?.content;
              const reasoning = delta?.reasoning_content;
              if (content && typeof content === "string") {
                totalContentLength += content.length;
                accumulatedContent += content;
              }
              if (reasoning && typeof reasoning === "string") {
                totalContentLength += reasoning.length;
                accumulatedThinking += reasoning;
              }

              const extracted = extractUsage(parsed);
              if (extracted) {
                usage = extracted;
              }

              const isFinishChunk = parsed.choices?.[0]?.finish_reason;
              if (isFinishChunk && !hasValidUsage(parsed.usage)) {
                const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
                parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
                output = `data: ${JSON.stringify(parsed)}\n`;
                usage = estimated;
                injectedUsage = true;
              } else if (isFinishChunk && usage) {
                const buffered = addBufferToUsage(usage);
                parsed.usage = filterUsageForFormat(buffered, FORMATS.OPENAI);
                output = `data: ${JSON.stringify(parsed)}\n`;
                injectedUsage = true;
              } else if (idFixed || fieldsInjected) {
                output = `data: ${JSON.stringify(parsed)}\n`;
                injectedUsage = true;
              }
            } catch { /* not JSON; fall through and forward the raw line below */ }
          }

          if (!injectedUsage) {
            if (line.startsWith("data:") && !line.startsWith("data: ")) {
              output = "data: " + line.slice(5) + "\n";
            } else {
              output = line + "\n";
            }
          }

          reqLogger?.appendConvertedChunk?.(output);
          controller.enqueue(sharedEncoder.encode(output));
          continue;
        }

        // Translate mode
        if (!trimmed) continue;

        const parsed = parseSSELine(trimmed, targetFormat);
        if (!parsed) continue;

        // For Ollama: done=true is the final chunk with finish_reason/usage, must translate
        // For other formats: done=true is the [DONE] sentinel, skip
        if (parsed && parsed.done && targetFormat !== FORMATS.OLLAMA) {
          const output = "data: [DONE]\n\n";
          reqLogger?.appendConvertedChunk?.(output);
          controller.enqueue(sharedEncoder.encode(output));
          continue;
        }

        // Claude format - content
        if (parsed.delta?.text) {
          totalContentLength += parsed.delta.text.length;
          accumulatedContent += parsed.delta.text;
        }
        // Claude format - thinking
        if (parsed.delta?.thinking) {
          totalContentLength += parsed.delta.thinking.length;
          accumulatedThinking += parsed.delta.thinking;
        }

        // OpenAI format - content
        if (parsed.choices?.[0]?.delta?.content) {
          totalContentLength += parsed.choices[0].delta.content.length;
          accumulatedContent += parsed.choices[0].delta.content;
        }
        // OpenAI format - reasoning
        if (parsed.choices?.[0]?.delta?.reasoning_content) {
          totalContentLength += parsed.choices[0].delta.reasoning_content.length;
          accumulatedThinking += parsed.choices[0].delta.reasoning_content;
        }

        // Gemini format
        if (parsed.candidates?.[0]?.content?.parts) {
          for (const part of parsed.candidates[0].content.parts) {
            if (part.text && typeof part.text === "string") {
              totalContentLength += part.text.length;
              // Check if this is thinking content
              if (part.thought === true) {
                accumulatedThinking += part.text;
              } else {
                accumulatedContent += part.text;
              }
            }
          }
        }

        // Extract usage
        const extracted = extractUsage(parsed);
        if (extracted) state.usage = extracted; // Keep original usage for logging

        // Translate: targetFormat -> openai -> sourceFormat
        const translated = translateResponse(targetFormat, sourceFormat, parsed, state);

        // Log OpenAI intermediate chunks (if available)
        if (translated?._openaiIntermediate) {
          for (const item of translated._openaiIntermediate) {
            const openaiOutput = formatSSE(item, FORMATS.OPENAI);
            reqLogger?.appendOpenAIChunk?.(openaiOutput);
          }
        }

        if (translated?.length > 0) {
          for (const item of translated) {
            // Filter empty chunks
            if (!hasValuableContent(item, sourceFormat)) {
              continue; // Skip this empty chunk
            }

            // Inject estimated usage if finish chunk has no valid usage
            const isFinishChunk = item.type === "message_delta" || item.choices?.[0]?.finish_reason;
            if (state.finishReason && isFinishChunk && !hasValidUsage(item.usage) && totalContentLength > 0) {
              const estimated = estimateUsage(body, totalContentLength, sourceFormat);
              item.usage = filterUsageForFormat(estimated, sourceFormat); // Filter + already has buffer
              state.usage = estimated;
            } else if (state.finishReason && isFinishChunk && state.usage) {
              // Add buffer and filter usage for client (but keep original in state.usage for logging)
              const buffered = addBufferToUsage(state.usage);
              item.usage = filterUsageForFormat(buffered, sourceFormat);
            }

            const output = formatSSE(item, sourceFormat);
            reqLogger?.appendConvertedChunk?.(output);
            controller.enqueue(sharedEncoder.encode(output));
          }
        }
      }
    },

    flush(controller) {
      trackPendingRequest(model, provider, connectionId, false);
      try {
        const remaining = decoder.decode();
        if (remaining) buffer += remaining;

        if (mode === STREAM_MODE.PASSTHROUGH) {
          if (buffer) {
            let output = buffer;
            if (buffer.startsWith("data:") && !buffer.startsWith("data: ")) {
              output = "data: " + buffer.slice(5);
            }
            reqLogger?.appendConvertedChunk?.(output);
            controller.enqueue(sharedEncoder.encode(output));
          }

          if (!hasValidUsage(usage) && totalContentLength > 0) {
            usage = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
          }

          if (hasValidUsage(usage)) {
            logUsage(provider, usage, model, connectionId, apiKey);
          } else {
            appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
          }

          // IMPORTANT: In passthrough mode we still must terminate the SSE stream.
          // Some clients (e.g. OpenClaw) expect the OpenAI-style sentinel:
          //   data: [DONE]\n\n
          // Without it they can hang until timeout and trigger failover.
          const doneOutput = "data: [DONE]\n\n";
          reqLogger?.appendConvertedChunk?.(doneOutput);
          controller.enqueue(sharedEncoder.encode(doneOutput));

          if (onStreamComplete) {
            onStreamComplete({
              content: accumulatedContent,
              thinking: accumulatedThinking
            }, usage, ttftAt);
          }
          return;
        }

        if (buffer.trim()) {
          // Parse any trailing partial line with the same provider format used in transform()
          const parsed = parseSSELine(buffer.trim(), targetFormat);
          if (parsed && !parsed.done) {
            const translated = translateResponse(targetFormat, sourceFormat, parsed, state);

            if (translated?._openaiIntermediate) {
              for (const item of translated._openaiIntermediate) {
                const openaiOutput = formatSSE(item, FORMATS.OPENAI);
                reqLogger?.appendOpenAIChunk?.(openaiOutput);
              }
            }

            if (translated?.length > 0) {
              for (const item of translated) {
                const output = formatSSE(item, sourceFormat);
                reqLogger?.appendConvertedChunk?.(output);
                controller.enqueue(sharedEncoder.encode(output));
              }
            }
          }
        }

        const flushed = translateResponse(targetFormat, sourceFormat, null, state);

        if (flushed?._openaiIntermediate) {
          for (const item of flushed._openaiIntermediate) {
            const openaiOutput = formatSSE(item, FORMATS.OPENAI);
            reqLogger?.appendOpenAIChunk?.(openaiOutput);
          }
        }

        if (flushed?.length > 0) {
          for (const item of flushed) {
            const output = formatSSE(item, sourceFormat);
            reqLogger?.appendConvertedChunk?.(output);
            controller.enqueue(sharedEncoder.encode(output));
          }
        }

        const doneOutput = "data: [DONE]\n\n";
        reqLogger?.appendConvertedChunk?.(doneOutput);
        controller.enqueue(sharedEncoder.encode(doneOutput));

        if (!hasValidUsage(state?.usage) && totalContentLength > 0) {
          state.usage = estimateUsage(body, totalContentLength, sourceFormat);
        }

        if (hasValidUsage(state?.usage)) {
          logUsage(state.provider || targetFormat, state.usage, model, connectionId, apiKey);
        } else {
          appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
        }

        if (onStreamComplete) {
          onStreamComplete({
            content: accumulatedContent,
            thinking: accumulatedThinking
          }, state?.usage, ttftAt);
        }
      } catch (error) {
        console.log("Error in flush:", error);
      }
    }
  });
}
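
/*
 * Illustrative wiring (a sketch, not code from this repo; the `providerUrl`,
 * headers, provider/model values, and route-handler shape are assumptions):
 *
 *   const upstream = await fetch(providerUrl, { method: "POST", headers, body: JSON.stringify(body) });
 *   const sse = createSSEStream({
 *     targetFormat: FORMATS.OLLAMA,   // format the provider emits
 *     sourceFormat: FORMATS.OPENAI,   // format the client expects
 *     provider: "ollama",
 *     model: body.model,
 *     body
 *   });
 *   return new Response(upstream.body.pipeThrough(sse), {
 *     headers: { "Content-Type": "text/event-stream" }
 *   });
 */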

export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null, onStreamComplete = null, apiKey = null) {
  return createSSEStream({
    mode: STREAM_MODE.TRANSLATE,
    targetFormat,
    sourceFormat,
    provider,
    reqLogger,
    toolNameMap,
    model,
    connectionId,
    body,
    onStreamComplete,
    apiKey
  });
}

export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null, onStreamComplete = null, apiKey = null) {
  return createSSEStream({
    mode: STREAM_MODE.PASSTHROUGH,
    provider,
    reqLogger,
    model,
    connectionId,
    body,
    onStreamComplete,
    apiKey
  });
}
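
/*
 * Illustrative passthrough wiring (a sketch; the fetch call, handler shape, and
 * `saveCompletion` callback are hypothetical, not from this repo):
 *
 *   const upstream = await fetch(openAICompatibleUrl, { method: "POST", headers, body: JSON.stringify(body) });
 *   const sse = createPassthroughStreamWithLogger(
 *     "openai-compatible", reqLogger, body.model, connectionId, body,
 *     ({ content, thinking }, usage, ttftAt) => saveCompletion(content, usage, ttftAt),
 *     apiKey
 *   );
 *   return new Response(upstream.body.pipeThrough(sse), { headers: { "Content-Type": "text/event-stream" } });
 */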