9router/open-sse/handlers/chatCore/sseToJsonHandler.js
decolua 3d4dbdc0e7 fix(chat): pick last non-empty message for Codex Responses SSE
Root cause: Codex/OpenAI Responses streams emit multiple alternating
reasoning and message output items. The first message block often has an
empty output_text; the visible answer lives in a later message. The
previous code used output.find(), which always picked the first (empty)
message block.

Fix: walk the message items from the end and use the last message whose
extracted text is non-empty; fall back to the final message if all are
empty.
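
Illustrative output shape (hypothetical values, not from a real stream):

    output = [
      { type: "reasoning", ... },
      { type: "message", content: [{ type: "output_text", text: "" }] },
      { type: "reasoning", ... },
      { type: "message", content: [{ type: "output_text", text: "The answer." }] }
    ]

output.find(i => i.type === "message") returns the first, empty block;
scanning from the end returns "The answer.".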

Note: Removed debug logging code from original PR #383 to keep implementation clean.

Co-authored-by: lokinh <locnh@uniultra.xyz>
Made-with: Cursor
2026-03-23 09:29:31 +07:00


import { convertResponsesStreamToJson } from "../../transformer/streamToJsonConverter.js";
import { createErrorResult } from "../../utils/error.js";
import { HTTP_STATUS } from "../../config/runtimeConfig.js";
import { FORMATS } from "../../translator/formats.js";
import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
import { saveRequestDetail, appendRequestLog } from "@/lib/usageDb.js";

function textFromResponsesMessageItem(item) {
  if (!item?.content || !Array.isArray(item.content)) return "";
  const byType = item.content.find((c) => c.type === "output_text");
  if (typeof byType?.text === "string") return byType.text;
  const anyText = item.content.find((c) => typeof c.text === "string");
  if (typeof anyText?.text === "string") return anyText.text;
  return "";
}
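
// Example (illustrative; the items below are hypothetical, not from a real response):
//   textFromResponsesMessageItem({
//     type: "message",
//     content: [{ type: "output_text", text: "Hello" }]
//   })                                                              // -> "Hello"
//   textFromResponsesMessageItem({ type: "message", content: [] })  // -> ""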

/**
 * Codex / Responses API may emit many alternating reasoning + message items.
 * Early message blocks often have empty output_text; the user-visible answer
 * is usually in the last non-empty message.
 */
function pickAssistantMessageForChatCompletion(output) {
  if (!Array.isArray(output)) return { msgItem: null, textContent: null };
  const messages = output.filter((item) => item?.type === "message");
  if (messages.length === 0) return { msgItem: null, textContent: null };
  for (let i = messages.length - 1; i >= 0; i--) {
    const text = textFromResponsesMessageItem(messages[i]);
    if (text.length > 0) return { msgItem: messages[i], textContent: text };
  }
  const last = messages[messages.length - 1];
  return { msgItem: last, textContent: textFromResponsesMessageItem(last) };
}
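
// Example (illustrative; hypothetical output array, not from a real stream):
//   const output = [
//     { type: "reasoning" },
//     { type: "message", content: [{ type: "output_text", text: "" }] },
//     { type: "message", content: [{ type: "output_text", text: "Final answer." }] }
//   ];
//   pickAssistantMessageForChatCompletion(output).textContent  // -> "Final answer."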

/**
 * Parse OpenAI-style SSE text into a single chat completion JSON.
 * Used when provider forces streaming but client wants non-streaming.
 */
export function parseSSEToOpenAIResponse(rawSSE, fallbackModel) {
  const chunks = [];
  for (const line of String(rawSSE || "").split("\n")) {
    const trimmed = line.trim();
    if (!trimmed.startsWith("data:")) continue;
    const payload = trimmed.slice(5).trim();
    if (!payload || payload === "[DONE]") continue;
    try { chunks.push(JSON.parse(payload)); } catch { /* ignore malformed lines */ }
  }
  if (chunks.length === 0) return null;
  const first = chunks[0];
  const contentParts = [];
  const reasoningParts = [];
  let finishReason = "stop";
  let usage = null;
  for (const chunk of chunks) {
    const choice = chunk?.choices?.[0];
    const delta = choice?.delta || {};
    if (typeof delta.content === "string" && delta.content.length > 0) contentParts.push(delta.content);
    if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) reasoningParts.push(delta.reasoning_content);
    if (choice?.finish_reason) finishReason = choice.finish_reason;
    if (chunk?.usage && typeof chunk.usage === "object") usage = chunk.usage;
  }
  const message = { role: "assistant", content: contentParts.join("") };
  if (reasoningParts.length > 0) message.reasoning_content = reasoningParts.join("");
  const result = {
    id: first.id || `chatcmpl-${Date.now()}`,
    object: "chat.completion",
    created: first.created || Math.floor(Date.now() / 1000),
    model: first.model || fallbackModel || "unknown",
    choices: [{ index: 0, message, finish_reason: finishReason }]
  };
  if (usage) result.usage = usage;
  return result;
}
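
// Example (illustrative; hypothetical SSE payload, not captured from a provider):
//   const sse = [
//     'data: {"id":"c1","model":"gpt-x","choices":[{"index":0,"delta":{"content":"Hel"}}]}',
//     'data: {"id":"c1","choices":[{"index":0,"delta":{"content":"lo"},"finish_reason":"stop"}]}',
//     "data: [DONE]"
//   ].join("\n");
//   parseSSEToOpenAIResponse(sse, "gpt-x")
//   // -> { object: "chat.completion", model: "gpt-x",
//   //      choices: [{ index: 0, message: { role: "assistant", content: "Hello" }, finish_reason: "stop" }], ... }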

/**
 * Handle case: provider forced streaming but client wants JSON.
 * Supports both Codex/Responses API SSE and standard Chat Completions SSE.
 */
export async function handleForcedSSEToJson({ providerResponse, sourceFormat, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, trackDone, appendLog }) {
  const contentType = providerResponse.headers.get("content-type") || "";
  const isSSE = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
  if (!isSSE) return null; // not handled here
  trackDone();
  const ctx = {
    provider, model, connectionId,
    request: extractRequestConfig(body, stream),
    providerRequest: finalBody || translatedBody || null
  };
  // Codex/Responses API SSE path
  const isCodexResponsesApi = provider === "codex" || sourceFormat === FORMATS.OPENAI_RESPONSES;
  if (isCodexResponsesApi) {
    try {
      const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
      if (onRequestSuccess) await onRequestSuccess();
      const usage = jsonResponse.usage || {};
      appendLog({ tokens: usage, status: "200 OK" });
      saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
      const { msgItem, textContent } = pickAssistantMessageForChatCompletion(jsonResponse.output);
      const totalLatency = Date.now() - requestStartTime;
      saveRequestDetail(buildRequestDetail({
        ...ctx,
        latency: { ttft: totalLatency, total: totalLatency },
        tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
        response: { content: textContent, thinking: null, finish_reason: jsonResponse.status || "unknown" },
        status: "success"
      }, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {});
      // Client is Responses API → return as-is
      if (sourceFormat === FORMATS.OPENAI_RESPONSES) {
        return { success: true, response: new Response(JSON.stringify(jsonResponse), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
      }
      // Build client-format response
      const inTokens = usage.input_tokens || 0;
      const outTokens = usage.output_tokens || 0;
      let finalResp;
      if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) {
        finalResp = {
          response: {
            candidates: [{ content: { role: "model", parts: [{ text: textContent || "" }] }, finishReason: "STOP", index: 0 }],
            usageMetadata: { promptTokenCount: inTokens, candidatesTokenCount: outTokens, totalTokenCount: inTokens + outTokens },
            modelVersion: model,
            responseId: jsonResponse.id || `resp_${Date.now()}`
          }
        };
      } else {
        finalResp = {
          id: jsonResponse.id || `chatcmpl-${Date.now()}`,
          object: "chat.completion",
          created: jsonResponse.created_at || Math.floor(Date.now() / 1000),
          model: jsonResponse.model || model,
          choices: [{ index: 0, message: { role: "assistant", content: textContent || "" }, finish_reason: jsonResponse.status === "completed" ? "stop" : (jsonResponse.status || "stop") }],
          usage: { prompt_tokens: inTokens, completion_tokens: outTokens, total_tokens: inTokens + outTokens }
        };
      }
      return { success: true, response: new Response(JSON.stringify(finalResp), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
    } catch (err) {
      console.error("[ChatCore] Responses API SSE→JSON failed:", err);
      return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
    }
  }
  // Standard Chat Completions SSE path
  try {
    const sseText = await providerResponse.text();
    const parsed = parseSSEToOpenAIResponse(sseText, model);
    if (!parsed) return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
    if (onRequestSuccess) await onRequestSuccess();
    const usage = parsed.usage || {};
    appendLog({ tokens: usage, status: "200 OK" });
    saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
    const totalLatency = Date.now() - requestStartTime;
    saveRequestDetail(buildRequestDetail({
      ...ctx,
      latency: { ttft: totalLatency, total: totalLatency },
      tokens: usage,
      response: {
        content: parsed.choices?.[0]?.message?.content || null,
        thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
        finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
      },
      status: "success"
    }, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {});
    return { success: true, response: new Response(JSON.stringify(parsed), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
  } catch (err) {
    console.error("[ChatCore] Chat Completions SSE→JSON failed:", err);
    return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
  }
}
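
// Example call site (illustrative sketch; the surrounding routing variables are
// hypothetical and not defined in this file):
//   const result = await handleForcedSSEToJson({
//     providerResponse, sourceFormat, provider: "codex", model, body,
//     stream: false, translatedBody, finalBody, requestStartTime: Date.now(),
//     connectionId, apiKey, clientRawRequest, onRequestSuccess, trackDone, appendLog
//   });
//   if (result === null) { /* response was not SSE; handle as plain JSON instead */ }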