feat: enhance request handling and error management in chatCore and streamToJsonConverter

- Added detailed request logging and latency tracking in handleChatCore.
- Improved error handling for SSE to JSON conversion and JSON parsing in streamToJsonConverter.
- Introduced a safe JSON parsing utility to handle potential parsing errors gracefully in requestDetailsDb.

Co-authored-by: zx <me@char.moe>
This commit is contained in:
decolua
2026-02-15 12:02:53 +07:00
parent 3d29b86d44
commit e2db638982
3 changed files with 166 additions and 91 deletions

View File

@@ -606,6 +606,25 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
}).catch(() => { });
}
const totalLatency = Date.now() - requestStartTime;
saveRequestDetail({
provider: provider || "unknown",
model: model || "unknown",
connectionId: connectionId || undefined,
timestamp: new Date().toISOString(),
latency: { ttft: totalLatency, total: totalLatency },
tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
providerResponse: null,
response: {
content: jsonResponse.output?.[0]?.content?.[0]?.text || null,
thinking: null,
finish_reason: jsonResponse.status || "unknown"
},
status: "success"
}).catch(() => { });
return {
success: true,
response: new Response(JSON.stringify(jsonResponse), {
@@ -621,22 +640,60 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
}
} else {
// Chat Completions SSE → Chat Completions JSON
const sseText = await providerResponse.text();
const parsed = parseSSEToOpenAIResponse(sseText, model);
if (parsed) {
if (onRequestSuccess) await onRequestSuccess();
appendRequestLog({ model, provider, connectionId, tokens: parsed.usage, status: "200 OK" }).catch(() => { });
return {
success: true,
response: new Response(JSON.stringify(parsed), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
}
})
};
try {
const sseText = await providerResponse.text();
const parsed = parseSSEToOpenAIResponse(sseText, model);
if (parsed) {
if (onRequestSuccess) await onRequestSuccess();
const usage = parsed.usage || {};
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
if (usage && typeof usage === "object") {
saveRequestUsage({
provider: provider || "unknown",
model: model || "unknown",
tokens: usage,
timestamp: new Date().toISOString(),
connectionId: connectionId || undefined,
apiKey: apiKey || undefined
}).catch(() => { });
}
const totalLatency = Date.now() - requestStartTime;
saveRequestDetail({
provider: provider || "unknown",
model: model || "unknown",
connectionId: connectionId || undefined,
timestamp: new Date().toISOString(),
latency: { ttft: totalLatency, total: totalLatency },
tokens: usage,
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
providerResponse: null,
response: {
content: parsed.choices?.[0]?.message?.content || null,
thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
},
status: "success"
}).catch(() => { });
return {
success: true,
response: new Response(JSON.stringify(parsed), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
}
})
};
}
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
} catch (error) {
console.error("[ChatCore] Chat Completions SSE→JSON conversion failed:", error);
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
}
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
}
}
}
@@ -657,7 +714,13 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
}
responseBody = parsedFromSSE;
} else {
responseBody = await providerResponse.json();
try {
responseBody = await providerResponse.json();
} catch (parseError) {
appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
console.error(`[ChatCore] Failed to parse JSON response from ${provider}:`, parseError.message);
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`);
}
}
// Notify success - caller can clear error status if needed

View File

@@ -4,24 +4,64 @@
* Used when client requests non-streaming but provider forces streaming (e.g., Codex)
*/
/**
* Process a single SSE message and update state accordingly.
*/
function processSSEMessage(msg, state) {
if (!msg.trim()) return;
const eventMatch = msg.match(/^event:\s*(.+)$/m);
const dataMatch = msg.match(/^data:\s*(.+)$/m);
if (!eventMatch || !dataMatch) return;
const eventType = eventMatch[1].trim();
const dataStr = dataMatch[1].trim();
if (dataStr === "[DONE]") return;
let parsed;
try { parsed = JSON.parse(dataStr); }
catch { return; }
if (eventType === "response.created") {
state.responseId = parsed.response?.id || state.responseId;
state.created = parsed.response?.created_at || state.created;
} else if (eventType === "response.output_item.done") {
state.items.set(parsed.output_index ?? 0, parsed.item);
} else if (eventType === "response.completed") {
state.status = "completed";
if (parsed.response?.usage) {
state.usage.input_tokens = parsed.response.usage.input_tokens || 0;
state.usage.output_tokens = parsed.response.usage.output_tokens || 0;
state.usage.total_tokens = parsed.response.usage.total_tokens || 0;
}
} else if (eventType === "response.failed") {
state.status = "failed";
}
}
const EMPTY_RESPONSE = { input_tokens: 0, output_tokens: 0, total_tokens: 0 };
/**
* Convert Responses API SSE stream to single JSON response
* @param {ReadableStream} stream - SSE stream from provider
* @returns {Promise<Object>} Final JSON response in Responses API format
*/
export async function convertResponsesStreamToJson(stream) {
if (!stream || typeof stream.getReader !== "function") {
return { id: `resp_${Date.now()}`, object: "response", created_at: Math.floor(Date.now() / 1000), status: "failed", output: [], usage: { ...EMPTY_RESPONSE } };
}
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
let responseId = "";
let output = [];
let created = Math.floor(Date.now() / 1000);
let status = "in_progress";
let usage = { input_tokens: 0, output_tokens: 0, total_tokens: 0 };
// Map of output_index -> item (for ordered output array)
const items = new Map();
const state = {
responseId: "",
created: Math.floor(Date.now() / 1000),
status: "in_progress",
usage: { ...EMPTY_RESPONSE },
items: new Map()
};
try {
while (true) {
@@ -29,75 +69,35 @@ export async function convertResponsesStreamToJson(stream) {
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Split by double newline (SSE event separator)
const messages = buffer.split("\n\n");
buffer = messages.pop() || ""; // Keep incomplete message in buffer
buffer = messages.pop() || "";
for (const msg of messages) {
if (!msg.trim()) continue;
// Parse SSE event
const eventMatch = msg.match(/^event:\s*(.+)$/m);
const dataMatch = msg.match(/^data:\s*(.+)$/m);
if (!eventMatch || !dataMatch) continue;
const eventType = eventMatch[1].trim();
const dataStr = dataMatch[1].trim();
if (dataStr === "[DONE]") continue;
let parsed;
try {
parsed = JSON.parse(dataStr);
} catch {
// Skip malformed JSON
continue;
}
// Handle different event types
if (eventType === "response.created") {
responseId = parsed.response?.id || responseId;
created = parsed.response?.created_at || created;
}
else if (eventType === "response.output_item.done") {
const idx = parsed.output_index ?? 0;
items.set(idx, parsed.item);
}
else if (eventType === "response.completed") {
status = "completed";
if (parsed.response?.usage) {
usage.input_tokens = parsed.response.usage.input_tokens || 0;
usage.output_tokens = parsed.response.usage.output_tokens || 0;
usage.total_tokens = parsed.response.usage.total_tokens || 0;
}
}
else if (eventType === "response.failed") {
status = "failed";
}
processSSEMessage(msg, state);
}
}
// Flush remaining buffer (last event may not end with \n\n)
if (buffer.trim()) {
processSSEMessage(buffer, state);
}
} finally {
reader.releaseLock();
}
// Build output array from accumulated items (ordered by index)
const maxIndex = items.size > 0 ? Math.max(...items.keys()) : -1;
const output = [];
const maxIndex = state.items.size > 0 ? Math.max(...state.items.keys()) : -1;
for (let i = 0; i <= maxIndex; i++) {
output.push(items.get(i) || {
type: "message",
content: [],
role: "assistant"
});
output.push(state.items.get(i) || { type: "message", content: [], role: "assistant" });
}
return {
id: responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`,
id: state.responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`,
object: "response",
created_at: created,
status: status || "completed",
created_at: state.created,
status: state.status || "completed",
output,
usage
usage: state.usage
};
}

View File

@@ -298,7 +298,8 @@ function safeJsonStringify(obj, maxSize) {
try {
const str = JSON.stringify(obj);
if (str.length > maxSize) {
return str.substring(0, maxSize) + "... (truncated due to size limit)";
// Return valid JSON instead of truncated invalid string
return JSON.stringify({ _truncated: true, _originalSize: str.length, _preview: str.substring(0, 200) });
}
return str;
} catch (error) {
@@ -456,6 +457,12 @@ export async function getRequestDetails(filter = {}) {
const stmt = db.prepare(query);
const rows = stmt.all(...params);
// Safe JSON parse — returns fallback on corrupt/truncated data
const safeJsonParse = (str, fallback = {}) => {
try { return JSON.parse(str || '{}'); }
catch { return fallback; }
};
// Convert back to original format
const details = rows.map(row => ({
id: row.id,
@@ -464,12 +471,12 @@ export async function getRequestDetails(filter = {}) {
connectionId: row.connection_id,
timestamp: new Date(row.timestamp).toISOString(),
status: row.status,
latency: JSON.parse(row.latency || '{}'),
tokens: JSON.parse(row.tokens || '{}'),
request: JSON.parse(row.request || '{}'),
providerRequest: JSON.parse(row.provider_request || '{}'),
providerResponse: JSON.parse(row.provider_response || '{}'),
response: JSON.parse(row.response || '{}')
latency: safeJsonParse(row.latency),
tokens: safeJsonParse(row.tokens),
request: safeJsonParse(row.request),
providerRequest: safeJsonParse(row.provider_request),
providerResponse: safeJsonParse(row.provider_response),
response: safeJsonParse(row.response)
}));
return {
@@ -500,6 +507,11 @@ export async function getRequestDetailById(id) {
if (!row) return null;
const safeJsonParse = (str, fallback = {}) => {
try { return JSON.parse(str || '{}'); }
catch { return fallback; }
};
return {
id: row.id,
provider: row.provider,
@@ -507,11 +519,11 @@ export async function getRequestDetailById(id) {
connectionId: row.connection_id,
timestamp: new Date(row.timestamp).toISOString(),
status: row.status,
latency: JSON.parse(row.latency || '{}'),
tokens: JSON.parse(row.tokens || '{}'),
request: JSON.parse(row.request || '{}'),
providerRequest: JSON.parse(row.provider_request || '{}'),
providerResponse: JSON.parse(row.provider_response || '{}'),
response: JSON.parse(row.response || '{}')
latency: safeJsonParse(row.latency),
tokens: safeJsonParse(row.tokens),
request: safeJsonParse(row.request),
providerRequest: safeJsonParse(row.provider_request),
providerResponse: safeJsonParse(row.provider_response),
response: safeJsonParse(row.response)
};
}