From e2db638982676899d4eadedb2614601ffe91ee33 Mon Sep 17 00:00:00 2001 From: decolua Date: Sun, 15 Feb 2026 12:02:53 +0700 Subject: [PATCH] feat: enhance request handling and error management in chatCore and streamToJsonConverter - Added detailed request logging and latency tracking in handleChatCore. - Improved error handling for SSE to JSON conversion and JSON parsing in streamToJsonConverter. - Introduced a safe JSON parsing utility to handle potential parsing errors gracefully in requestDetailsDb. Co-authored-by: zx --- open-sse/handlers/chatCore.js | 95 +++++++++++--- open-sse/transformer/streamToJsonConverter.js | 124 +++++++++--------- src/lib/requestDetailsDb.js | 38 ++++-- 3 files changed, 166 insertions(+), 91 deletions(-) diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index 16815d86..cd774630 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -606,6 +606,25 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred }).catch(() => { }); } + const totalLatency = Date.now() - requestStartTime; + saveRequestDetail({ + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { ttft: totalLatency, total: totalLatency }, + tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: null, + response: { + content: jsonResponse.output?.[0]?.content?.[0]?.text || null, + thinking: null, + finish_reason: jsonResponse.status || "unknown" + }, + status: "success" + }).catch(() => { }); + return { success: true, response: new Response(JSON.stringify(jsonResponse), { @@ -621,22 +640,60 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred } } else { // Chat Completions SSE → Chat Completions JSON - const sseText = await providerResponse.text(); - const parsed = parseSSEToOpenAIResponse(sseText, model); - if (parsed) { - if (onRequestSuccess) await onRequestSuccess(); - appendRequestLog({ model, provider, connectionId, tokens: parsed.usage, status: "200 OK" }).catch(() => { }); - return { - success: true, - response: new Response(JSON.stringify(parsed), { - headers: { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*" - } - }) - }; + try { + const sseText = await providerResponse.text(); + const parsed = parseSSEToOpenAIResponse(sseText, model); + if (parsed) { + if (onRequestSuccess) await onRequestSuccess(); + + const usage = parsed.usage || {}; + appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { }); + + if (usage && typeof usage === "object") { + saveRequestUsage({ + provider: provider || "unknown", + model: model || "unknown", + tokens: usage, + timestamp: new Date().toISOString(), + connectionId: connectionId || undefined, + apiKey: apiKey || undefined + }).catch(() => { }); + } + + const totalLatency = Date.now() - requestStartTime; + saveRequestDetail({ + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { ttft: totalLatency, total: totalLatency }, + tokens: usage, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: null, + response: { + content: parsed.choices?.[0]?.message?.content || null, + thinking: parsed.choices?.[0]?.message?.reasoning_content || null, + finish_reason: parsed.choices?.[0]?.finish_reason || "unknown" + }, + status: "success" + }).catch(() => { }); + + return { + success: true, + response: new Response(JSON.stringify(parsed), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" + } + }) + }; + } + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); + } catch (error) { + console.error("[ChatCore] Chat Completions SSE→JSON conversion failed:", error); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); } - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); } } } @@ -657,7 +714,13 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred } responseBody = parsedFromSSE; } else { - responseBody = await providerResponse.json(); + try { + responseBody = await providerResponse.json(); + } catch (parseError) { + appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { }); + console.error(`[ChatCore] Failed to parse JSON response from ${provider}:`, parseError.message); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`); + } } // Notify success - caller can clear error status if needed diff --git a/open-sse/transformer/streamToJsonConverter.js b/open-sse/transformer/streamToJsonConverter.js index 282f9cf6..92cb723b 100644 --- a/open-sse/transformer/streamToJsonConverter.js +++ b/open-sse/transformer/streamToJsonConverter.js @@ -4,24 +4,64 @@ * Used when client requests non-streaming but provider forces streaming (e.g., Codex) */ +/** + * Process a single SSE message and update state accordingly. + */ +function processSSEMessage(msg, state) { + if (!msg.trim()) return; + + const eventMatch = msg.match(/^event:\s*(.+)$/m); + const dataMatch = msg.match(/^data:\s*(.+)$/m); + if (!eventMatch || !dataMatch) return; + + const eventType = eventMatch[1].trim(); + const dataStr = dataMatch[1].trim(); + if (dataStr === "[DONE]") return; + + let parsed; + try { parsed = JSON.parse(dataStr); } + catch { return; } + + if (eventType === "response.created") { + state.responseId = parsed.response?.id || state.responseId; + state.created = parsed.response?.created_at || state.created; + } else if (eventType === "response.output_item.done") { + state.items.set(parsed.output_index ?? 0, parsed.item); + } else if (eventType === "response.completed") { + state.status = "completed"; + if (parsed.response?.usage) { + state.usage.input_tokens = parsed.response.usage.input_tokens || 0; + state.usage.output_tokens = parsed.response.usage.output_tokens || 0; + state.usage.total_tokens = parsed.response.usage.total_tokens || 0; + } + } else if (eventType === "response.failed") { + state.status = "failed"; + } +} + +const EMPTY_RESPONSE = { input_tokens: 0, output_tokens: 0, total_tokens: 0 }; + /** * Convert Responses API SSE stream to single JSON response * @param {ReadableStream} stream - SSE stream from provider * @returns {Promise} Final JSON response in Responses API format */ export async function convertResponsesStreamToJson(stream) { + if (!stream || typeof stream.getReader !== "function") { + return { id: `resp_${Date.now()}`, object: "response", created_at: Math.floor(Date.now() / 1000), status: "failed", output: [], usage: { ...EMPTY_RESPONSE } }; + } + const reader = stream.getReader(); const decoder = new TextDecoder(); - let buffer = ""; - let responseId = ""; - let output = []; - let created = Math.floor(Date.now() / 1000); - let status = "in_progress"; - let usage = { input_tokens: 0, output_tokens: 0, total_tokens: 0 }; - // Map of output_index -> item (for ordered output array) - const items = new Map(); + const state = { + responseId: "", + created: Math.floor(Date.now() / 1000), + status: "in_progress", + usage: { ...EMPTY_RESPONSE }, + items: new Map() + }; try { while (true) { @@ -29,75 +69,35 @@ export async function convertResponsesStreamToJson(stream) { if (done) break; buffer += decoder.decode(value, { stream: true }); - - // Split by double newline (SSE event separator) const messages = buffer.split("\n\n"); - buffer = messages.pop() || ""; // Keep incomplete message in buffer + buffer = messages.pop() || ""; for (const msg of messages) { - if (!msg.trim()) continue; - - // Parse SSE event - const eventMatch = msg.match(/^event:\s*(.+)$/m); - const dataMatch = msg.match(/^data:\s*(.+)$/m); - - if (!eventMatch || !dataMatch) continue; - - const eventType = eventMatch[1].trim(); - const dataStr = dataMatch[1].trim(); - - if (dataStr === "[DONE]") continue; - - let parsed; - try { - parsed = JSON.parse(dataStr); - } catch { - // Skip malformed JSON - continue; - } - - // Handle different event types - if (eventType === "response.created") { - responseId = parsed.response?.id || responseId; - created = parsed.response?.created_at || created; - } - else if (eventType === "response.output_item.done") { - const idx = parsed.output_index ?? 0; - items.set(idx, parsed.item); - } - else if (eventType === "response.completed") { - status = "completed"; - if (parsed.response?.usage) { - usage.input_tokens = parsed.response.usage.input_tokens || 0; - usage.output_tokens = parsed.response.usage.output_tokens || 0; - usage.total_tokens = parsed.response.usage.total_tokens || 0; - } - } - else if (eventType === "response.failed") { - status = "failed"; - } + processSSEMessage(msg, state); } } + + // Flush remaining buffer (last event may not end with \n\n) + if (buffer.trim()) { + processSSEMessage(buffer, state); + } } finally { reader.releaseLock(); } // Build output array from accumulated items (ordered by index) - const maxIndex = items.size > 0 ? Math.max(...items.keys()) : -1; + const output = []; + const maxIndex = state.items.size > 0 ? Math.max(...state.items.keys()) : -1; for (let i = 0; i <= maxIndex; i++) { - output.push(items.get(i) || { - type: "message", - content: [], - role: "assistant" - }); + output.push(state.items.get(i) || { type: "message", content: [], role: "assistant" }); } return { - id: responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, + id: state.responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, object: "response", - created_at: created, - status: status || "completed", + created_at: state.created, + status: state.status || "completed", output, - usage + usage: state.usage }; } diff --git a/src/lib/requestDetailsDb.js b/src/lib/requestDetailsDb.js index c7f9d438..b19fa0d1 100644 --- a/src/lib/requestDetailsDb.js +++ b/src/lib/requestDetailsDb.js @@ -298,7 +298,8 @@ function safeJsonStringify(obj, maxSize) { try { const str = JSON.stringify(obj); if (str.length > maxSize) { - return str.substring(0, maxSize) + "... (truncated due to size limit)"; + // Return valid JSON instead of truncated invalid string + return JSON.stringify({ _truncated: true, _originalSize: str.length, _preview: str.substring(0, 200) }); } return str; } catch (error) { @@ -456,6 +457,12 @@ export async function getRequestDetails(filter = {}) { const stmt = db.prepare(query); const rows = stmt.all(...params); + // Safe JSON parse — returns fallback on corrupt/truncated data + const safeJsonParse = (str, fallback = {}) => { + try { return JSON.parse(str || '{}'); } + catch { return fallback; } + }; + // Convert back to original format const details = rows.map(row => ({ id: row.id, @@ -464,12 +471,12 @@ export async function getRequestDetails(filter = {}) { connectionId: row.connection_id, timestamp: new Date(row.timestamp).toISOString(), status: row.status, - latency: JSON.parse(row.latency || '{}'), - tokens: JSON.parse(row.tokens || '{}'), - request: JSON.parse(row.request || '{}'), - providerRequest: JSON.parse(row.provider_request || '{}'), - providerResponse: JSON.parse(row.provider_response || '{}'), - response: JSON.parse(row.response || '{}') + latency: safeJsonParse(row.latency), + tokens: safeJsonParse(row.tokens), + request: safeJsonParse(row.request), + providerRequest: safeJsonParse(row.provider_request), + providerResponse: safeJsonParse(row.provider_response), + response: safeJsonParse(row.response) })); return { @@ -500,6 +507,11 @@ export async function getRequestDetailById(id) { if (!row) return null; + const safeJsonParse = (str, fallback = {}) => { + try { return JSON.parse(str || '{}'); } + catch { return fallback; } + }; + return { id: row.id, provider: row.provider, @@ -507,11 +519,11 @@ export async function getRequestDetailById(id) { connectionId: row.connection_id, timestamp: new Date(row.timestamp).toISOString(), status: row.status, - latency: JSON.parse(row.latency || '{}'), - tokens: JSON.parse(row.tokens || '{}'), - request: JSON.parse(row.request || '{}'), - providerRequest: JSON.parse(row.provider_request || '{}'), - providerResponse: JSON.parse(row.provider_response || '{}'), - response: JSON.parse(row.response || '{}') + latency: safeJsonParse(row.latency), + tokens: safeJsonParse(row.tokens), + request: safeJsonParse(row.request), + providerRequest: safeJsonParse(row.provider_request), + providerResponse: safeJsonParse(row.provider_response), + response: safeJsonParse(row.response) }; }