mirror of
https://github.com/decolua/9router.git
synced 2026-05-08 12:01:28 +00:00
- Tracks endpoints like /v1/chat/completions, /v1/messages, /v1/responses - New sortable/groupable table in usage dashboard with expandable groups - Enhanced usage database aggregation by endpoint + model + provider - Added endpoint tracking to all saveRequestUsage/saveRequestDetail calls - Maintains backward compatibility with existing data structure
964 lines
35 KiB
JavaScript
964 lines
35 KiB
JavaScript
import { detectFormat, getTargetFormat } from "../services/provider.js";
|
|
import { translateRequest, needsTranslation } from "../translator/index.js";
|
|
import { FORMATS } from "../translator/formats.js";
|
|
import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger, COLORS } from "../utils/stream.js";
|
|
import { createStreamController, pipeWithDisconnect } from "../utils/streamHandler.js";
|
|
import { addBufferToUsage, filterUsageForFormat } from "../utils/usageTracking.js";
|
|
import { refreshWithRetry } from "../services/tokenRefresh.js";
|
|
import { createRequestLogger } from "../utils/requestLogger.js";
|
|
import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
|
|
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
|
|
import { HTTP_STATUS } from "../config/constants.js";
|
|
import { handleBypassRequest } from "../utils/bypassHandler.js";
|
|
import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
|
|
import { getExecutor } from "../executors/index.js";
|
|
import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js";
|
|
|
|
/**
|
|
* Translate non-streaming response to OpenAI format
|
|
* Handles different provider response formats (Gemini, Claude, etc.)
|
|
*/
|
|
function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) {
|
|
// If already in source format (usually OpenAI), return as-is
|
|
if (targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) {
|
|
return responseBody;
|
|
}
|
|
|
|
// Handle Gemini/Antigravity format
|
|
if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) {
|
|
const response = responseBody.response || responseBody;
|
|
if (!response?.candidates?.[0]) {
|
|
return responseBody; // Can't translate, return raw
|
|
}
|
|
|
|
const candidate = response.candidates[0];
|
|
const content = candidate.content;
|
|
const usage = response.usageMetadata || responseBody.usageMetadata;
|
|
|
|
// Build message content
|
|
let textContent = "";
|
|
const toolCalls = [];
|
|
let reasoningContent = "";
|
|
|
|
if (content?.parts) {
|
|
for (const part of content.parts) {
|
|
// Handle thinking/reasoning
|
|
if (part.thought === true && part.text) {
|
|
reasoningContent += part.text;
|
|
}
|
|
// Regular text
|
|
else if (part.text !== undefined) {
|
|
textContent += part.text;
|
|
}
|
|
// Function calls
|
|
if (part.functionCall) {
|
|
toolCalls.push({
|
|
id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`,
|
|
type: "function",
|
|
function: {
|
|
name: part.functionCall.name,
|
|
arguments: JSON.stringify(part.functionCall.args || {})
|
|
}
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build OpenAI format message
|
|
const message = { role: "assistant" };
|
|
if (textContent) {
|
|
message.content = textContent;
|
|
}
|
|
if (reasoningContent) {
|
|
message.reasoning_content = reasoningContent;
|
|
}
|
|
if (toolCalls.length > 0) {
|
|
message.tool_calls = toolCalls;
|
|
}
|
|
// If no content at all, set content to empty string
|
|
if (!message.content && !message.tool_calls) {
|
|
message.content = "";
|
|
}
|
|
|
|
// Determine finish reason
|
|
let finishReason = (candidate.finishReason || "stop").toLowerCase();
|
|
if (finishReason === "stop" && toolCalls.length > 0) {
|
|
finishReason = "tool_calls";
|
|
}
|
|
|
|
const result = {
|
|
id: `chatcmpl-${response.responseId || Date.now()}`,
|
|
object: "chat.completion",
|
|
created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000),
|
|
model: response.modelVersion || "gemini",
|
|
choices: [{
|
|
index: 0,
|
|
message,
|
|
finish_reason: finishReason
|
|
}]
|
|
};
|
|
|
|
// Add usage if available (match streaming translator: add thoughtsTokenCount to prompt_tokens)
|
|
if (usage) {
|
|
result.usage = {
|
|
prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0),
|
|
completion_tokens: usage.candidatesTokenCount || 0,
|
|
total_tokens: usage.totalTokenCount || 0
|
|
};
|
|
if (usage.thoughtsTokenCount > 0) {
|
|
result.usage.completion_tokens_details = {
|
|
reasoning_tokens: usage.thoughtsTokenCount
|
|
};
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
// Handle Claude format
|
|
if (targetFormat === FORMATS.CLAUDE) {
|
|
if (!responseBody.content) {
|
|
return responseBody; // Can't translate, return raw
|
|
}
|
|
|
|
let textContent = "";
|
|
let thinkingContent = "";
|
|
const toolCalls = [];
|
|
|
|
for (const block of responseBody.content) {
|
|
if (block.type === "text") {
|
|
textContent += block.text;
|
|
} else if (block.type === "thinking") {
|
|
thinkingContent += block.thinking || "";
|
|
} else if (block.type === "tool_use") {
|
|
toolCalls.push({
|
|
id: block.id,
|
|
type: "function",
|
|
function: {
|
|
name: block.name,
|
|
arguments: JSON.stringify(block.input || {})
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
const message = { role: "assistant" };
|
|
if (textContent) {
|
|
message.content = textContent;
|
|
}
|
|
if (thinkingContent) {
|
|
message.reasoning_content = thinkingContent;
|
|
}
|
|
if (toolCalls.length > 0) {
|
|
message.tool_calls = toolCalls;
|
|
}
|
|
if (!message.content && !message.tool_calls) {
|
|
message.content = "";
|
|
}
|
|
|
|
let finishReason = responseBody.stop_reason || "stop";
|
|
if (finishReason === "end_turn") finishReason = "stop";
|
|
if (finishReason === "tool_use") finishReason = "tool_calls";
|
|
|
|
const result = {
|
|
id: `chatcmpl-${responseBody.id || Date.now()}`,
|
|
object: "chat.completion",
|
|
created: Math.floor(Date.now() / 1000),
|
|
model: responseBody.model || "claude",
|
|
choices: [{
|
|
index: 0,
|
|
message,
|
|
finish_reason: finishReason
|
|
}]
|
|
};
|
|
|
|
if (responseBody.usage) {
|
|
result.usage = {
|
|
prompt_tokens: responseBody.usage.input_tokens || 0,
|
|
completion_tokens: responseBody.usage.output_tokens || 0,
|
|
total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0)
|
|
};
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
// Unknown format, return as-is
|
|
return responseBody;
|
|
}
|
|
|
|
/**
|
|
* Extract usage from non-streaming response body
|
|
* Handles different provider response formats
|
|
*/
|
|
function extractUsageFromResponse(responseBody, provider) {
|
|
if (!responseBody || typeof responseBody !== 'object') return null;
|
|
|
|
// Claude format - check first to avoid conflict with OpenAI check
|
|
if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.input_tokens !== undefined) {
|
|
return {
|
|
prompt_tokens: responseBody.usage.input_tokens || 0,
|
|
completion_tokens: responseBody.usage.output_tokens || 0,
|
|
cache_read_input_tokens: responseBody.usage.cache_read_input_tokens,
|
|
cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens
|
|
};
|
|
}
|
|
|
|
// OpenAI format
|
|
if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.prompt_tokens !== undefined) {
|
|
return {
|
|
prompt_tokens: responseBody.usage.prompt_tokens || 0,
|
|
completion_tokens: responseBody.usage.completion_tokens || 0,
|
|
cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens,
|
|
reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens
|
|
};
|
|
}
|
|
|
|
// Gemini format
|
|
if (responseBody.usageMetadata && typeof responseBody.usageMetadata === 'object') {
|
|
return {
|
|
prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0,
|
|
completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0,
|
|
reasoning_tokens: responseBody.usageMetadata.thoughtsTokenCount
|
|
};
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Extract full request configuration from body
|
|
* Captures all relevant parameters for request details
|
|
*/
|
|
function extractRequestConfig(body, stream) {
|
|
const config = {
|
|
messages: body.messages || [],
|
|
model: body.model,
|
|
stream: stream
|
|
};
|
|
|
|
// Add all optional configuration parameters
|
|
const optionalParams = [
|
|
'temperature', 'top_p', 'top_k',
|
|
'max_tokens', 'max_completion_tokens',
|
|
'thinking', 'reasoning', 'enable_thinking',
|
|
'presence_penalty', 'frequency_penalty',
|
|
'seed', 'stop', 'tools', 'tool_choice',
|
|
'response_format', 'prediction', 'store', 'metadata',
|
|
'n', 'logprobs', 'top_logprobs', 'logit_bias',
|
|
'user', 'parallel_tool_calls'
|
|
];
|
|
|
|
for (const param of optionalParams) {
|
|
if (body[param] !== undefined) {
|
|
config[param] = body[param];
|
|
}
|
|
}
|
|
|
|
return config;
|
|
}
|
|
|
|
/**
|
|
* Convert OpenAI-style SSE chunks into a single non-streaming JSON response.
|
|
* Used as a fallback when upstream returns text/event-stream for stream=false.
|
|
*/
|
|
function parseSSEToOpenAIResponse(rawSSE, fallbackModel) {
|
|
const lines = String(rawSSE || "").split("\n");
|
|
const chunks = [];
|
|
|
|
for (const line of lines) {
|
|
const trimmed = line.trim();
|
|
if (!trimmed.startsWith("data:")) continue;
|
|
const payload = trimmed.slice(5).trim();
|
|
if (!payload || payload === "[DONE]") continue;
|
|
try {
|
|
chunks.push(JSON.parse(payload));
|
|
} catch {
|
|
// Ignore malformed SSE lines and continue best-effort parsing.
|
|
}
|
|
}
|
|
|
|
if (chunks.length === 0) return null;
|
|
|
|
const first = chunks[0];
|
|
const contentParts = [];
|
|
const reasoningParts = [];
|
|
let finishReason = "stop";
|
|
let usage = null;
|
|
|
|
for (const chunk of chunks) {
|
|
const choice = chunk?.choices?.[0];
|
|
const delta = choice?.delta || {};
|
|
|
|
if (typeof delta.content === "string" && delta.content.length > 0) {
|
|
contentParts.push(delta.content);
|
|
}
|
|
if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) {
|
|
reasoningParts.push(delta.reasoning_content);
|
|
}
|
|
if (choice?.finish_reason) {
|
|
finishReason = choice.finish_reason;
|
|
}
|
|
if (chunk?.usage && typeof chunk.usage === "object") {
|
|
usage = chunk.usage;
|
|
}
|
|
}
|
|
|
|
const message = {
|
|
role: "assistant",
|
|
content: contentParts.join("")
|
|
};
|
|
if (reasoningParts.length > 0) {
|
|
message.reasoning_content = reasoningParts.join("");
|
|
}
|
|
|
|
const result = {
|
|
id: first.id || `chatcmpl-${Date.now()}`,
|
|
object: "chat.completion",
|
|
created: first.created || Math.floor(Date.now() / 1000),
|
|
model: first.model || fallbackModel || "unknown",
|
|
choices: [
|
|
{
|
|
index: 0,
|
|
message,
|
|
finish_reason: finishReason
|
|
}
|
|
]
|
|
};
|
|
|
|
if (usage) {
|
|
result.usage = usage;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Core chat handler - shared between SSE and Worker
|
|
* Returns { success, response, status, error } for caller to handle fallback
|
|
* @param {object} options
|
|
* @param {object} options.body - Request body
|
|
* @param {object} options.modelInfo - { provider, model }
|
|
* @param {object} options.credentials - Provider credentials
|
|
* @param {object} options.log - Logger instance (optional)
|
|
* @param {function} options.onCredentialsRefreshed - Callback when credentials are refreshed
|
|
* @param {function} options.onRequestSuccess - Callback when request succeeds (to clear error status)
|
|
* @param {function} options.onDisconnect - Callback when client disconnects
|
|
* @param {string} options.connectionId - Connection ID for usage tracking
|
|
* @param {string} options.apiKey - API key for usage tracking
|
|
*/
|
|
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey }) {
|
|
const { provider, model } = modelInfo;
|
|
const requestStartTime = Date.now();
|
|
|
|
const sourceFormat = detectFormat(body);
|
|
|
|
// Check for bypass patterns (warmup, skip) - return fake response
|
|
const bypassResponse = handleBypassRequest(body, model, userAgent);
|
|
if (bypassResponse) {
|
|
return bypassResponse;
|
|
}
|
|
|
|
// Detect source format and get target format
|
|
// Model-specific targetFormat takes priority over provider default
|
|
|
|
const alias = PROVIDER_ID_TO_ALIAS[provider] || provider;
|
|
const modelTargetFormat = getModelTargetFormat(alias, model);
|
|
const targetFormat = modelTargetFormat || getTargetFormat(provider);
|
|
|
|
// Track if client actually wants streaming (before we force it for providers)
|
|
const clientRequestedStreaming = body.stream === true;
|
|
const providerRequiresStreaming = provider === 'openai' || provider === 'codex';
|
|
|
|
// Force streaming for OpenAI/Codex models (they don't support non-streaming mode properly)
|
|
const stream = providerRequiresStreaming ? true : (body.stream !== false);
|
|
|
|
// Create request logger for this session: sourceFormat_targetFormat_model
|
|
const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
|
|
|
|
// 0. Log client raw request (before any conversion)
|
|
if (clientRawRequest) {
|
|
reqLogger.logClientRawRequest(
|
|
clientRawRequest.endpoint,
|
|
clientRawRequest.body,
|
|
clientRawRequest.headers
|
|
);
|
|
}
|
|
|
|
// 1. Log raw request from client
|
|
reqLogger.logRawRequest(body);
|
|
|
|
log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`);
|
|
|
|
// Translate request (pass reqLogger for intermediate logging)
|
|
let translatedBody = body;
|
|
translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger);
|
|
|
|
// Extract toolNameMap for response translation (Claude OAuth)
|
|
const toolNameMap = translatedBody._toolNameMap;
|
|
delete translatedBody._toolNameMap;
|
|
|
|
// Update model in body
|
|
translatedBody.model = model;
|
|
|
|
// Get executor for this provider
|
|
const executor = getExecutor(provider);
|
|
|
|
// Track pending request
|
|
trackPendingRequest(model, provider, connectionId, true);
|
|
|
|
// Log start
|
|
appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => { });
|
|
|
|
const msgCount = translatedBody.messages?.length
|
|
|| translatedBody.contents?.length
|
|
|| translatedBody.request?.contents?.length
|
|
|| 0;
|
|
log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`);
|
|
|
|
// Create stream controller for disconnect detection
|
|
const streamController = createStreamController({
|
|
onDisconnect: (reason) => {
|
|
// Track request finished (disconnected)
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
if (onDisconnect) onDisconnect(reason);
|
|
},
|
|
onError: (error) => {
|
|
// Track request finished (error/zombie)
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
},
|
|
log,
|
|
provider,
|
|
model
|
|
});
|
|
|
|
// Execute request using executor (handles URL building, headers, fallback, transform)
|
|
let providerResponse;
|
|
let providerUrl;
|
|
let providerHeaders;
|
|
let finalBody;
|
|
|
|
try {
|
|
const result = await executor.execute({
|
|
model,
|
|
body: translatedBody,
|
|
stream,
|
|
credentials,
|
|
signal: streamController.signal,
|
|
log
|
|
});
|
|
|
|
providerResponse = result.response;
|
|
providerUrl = result.url;
|
|
providerHeaders = result.headers;
|
|
finalBody = result.transformedBody;
|
|
|
|
// Log target request (final request to provider)
|
|
reqLogger.logTargetRequest(providerUrl, providerHeaders, finalBody);
|
|
|
|
} catch (error) {
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
|
|
|
|
const errorDetail = {
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: { ttft: 0, total: Date.now() - requestStartTime },
|
|
tokens: { prompt_tokens: 0, completion_tokens: 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: translatedBody || null,
|
|
providerResponse: null,
|
|
response: {
|
|
error: error.message || String(error),
|
|
status: error.name === "AbortError" ? 499 : 502,
|
|
thinking: null
|
|
},
|
|
status: "error"
|
|
};
|
|
saveRequestDetail(errorDetail).catch(() => {});
|
|
|
|
if (error.name === "AbortError") {
|
|
streamController.handleError(error);
|
|
return createErrorResult(499, "Request aborted");
|
|
}
|
|
const errMsg = formatProviderError(error, provider, model, HTTP_STATUS.BAD_GATEWAY);
|
|
console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg);
|
|
}
|
|
|
|
// Handle 401/403 - try token refresh using executor
|
|
if (providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN) {
|
|
const newCredentials = await refreshWithRetry(
|
|
() => executor.refreshCredentials(credentials, log),
|
|
3,
|
|
log
|
|
);
|
|
|
|
if (newCredentials?.accessToken || newCredentials?.copilotToken) {
|
|
log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`);
|
|
|
|
// Update credentials
|
|
Object.assign(credentials, newCredentials);
|
|
|
|
// Notify caller about refreshed credentials
|
|
if (onCredentialsRefreshed && newCredentials) {
|
|
await onCredentialsRefreshed(newCredentials);
|
|
}
|
|
|
|
// Retry with new credentials
|
|
try {
|
|
const retryResult = await executor.execute({
|
|
model,
|
|
body: translatedBody,
|
|
stream,
|
|
credentials,
|
|
signal: streamController.signal,
|
|
log
|
|
});
|
|
|
|
if (retryResult.response.ok) {
|
|
providerResponse = retryResult.response;
|
|
providerUrl = retryResult.url;
|
|
}
|
|
} catch (retryError) {
|
|
log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`);
|
|
}
|
|
} else {
|
|
log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`);
|
|
}
|
|
}
|
|
|
|
// Check provider response - return error info for fallback handling
|
|
if (!providerResponse.ok) {
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
const { statusCode, message, retryAfterMs } = await parseUpstreamError(providerResponse, provider);
|
|
appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => { });
|
|
|
|
const errorDetail = {
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: { ttft: 0, total: Date.now() - requestStartTime },
|
|
tokens: { prompt_tokens: 0, completion_tokens: 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: null,
|
|
response: {
|
|
error: message,
|
|
status: statusCode,
|
|
thinking: null
|
|
},
|
|
status: "error"
|
|
};
|
|
saveRequestDetail(errorDetail).catch(() => {});
|
|
|
|
const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
|
|
console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
|
|
|
|
// Log Antigravity retry time if available
|
|
if (retryAfterMs && provider === "antigravity") {
|
|
const retrySeconds = Math.ceil(retryAfterMs / 1000);
|
|
log?.debug?.("RETRY", `Antigravity quota reset in ${retrySeconds}s (${retryAfterMs}ms)`);
|
|
}
|
|
|
|
// Log error with full request body for debugging
|
|
reqLogger.logError(new Error(message), finalBody || translatedBody);
|
|
|
|
return createErrorResult(statusCode, errMsg, retryAfterMs);
|
|
}
|
|
|
|
// Provider forced streaming but client wants JSON - convert SSE to JSON
|
|
if (!clientRequestedStreaming && providerRequiresStreaming) {
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
const contentType = providerResponse.headers.get("content-type") || "";
|
|
|
|
// Treat as SSE if content-type says so OR if it's empty/missing
|
|
// (Codex API doesn't always set Content-Type on streaming responses)
|
|
const isSSEResponse = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
|
|
if (isSSEResponse) {
|
|
const isResponsesApi = sourceFormat === 'openai-responses';
|
|
|
|
if (isResponsesApi) {
|
|
// Responses API SSE → Responses API JSON (for pydantic_ai, OpenAI SDK, etc.)
|
|
try {
|
|
const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
|
|
log?.info?.("STREAM", `Converted Responses API SSE → JSON for non-streaming client`);
|
|
|
|
if (onRequestSuccess) await onRequestSuccess();
|
|
|
|
const usage = jsonResponse.usage || {};
|
|
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
|
|
|
|
if (usage && typeof usage === 'object') {
|
|
const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.input_tokens || 0} | out=${usage?.output_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
|
|
console.log(`${COLORS.green}${msg}${COLORS.reset}`);
|
|
|
|
saveRequestUsage({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
|
|
timestamp: new Date().toISOString(),
|
|
connectionId: connectionId || undefined,
|
|
apiKey: apiKey || undefined
|
|
}).catch(() => { });
|
|
}
|
|
|
|
const totalLatency = Date.now() - requestStartTime;
|
|
saveRequestDetail({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: { ttft: totalLatency, total: totalLatency },
|
|
tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: null,
|
|
response: {
|
|
content: jsonResponse.output?.[0]?.content?.[0]?.text || null,
|
|
thinking: null,
|
|
finish_reason: jsonResponse.status || "unknown"
|
|
},
|
|
status: "success",
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
}).catch(() => { });
|
|
|
|
return {
|
|
success: true,
|
|
response: new Response(JSON.stringify(jsonResponse), {
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
"Access-Control-Allow-Origin": "*"
|
|
}
|
|
})
|
|
};
|
|
} catch (error) {
|
|
console.error("[ChatCore] Responses API SSE→JSON conversion failed:", error);
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
|
|
}
|
|
} else {
|
|
// Chat Completions SSE → Chat Completions JSON
|
|
try {
|
|
const sseText = await providerResponse.text();
|
|
const parsed = parseSSEToOpenAIResponse(sseText, model);
|
|
if (parsed) {
|
|
if (onRequestSuccess) await onRequestSuccess();
|
|
|
|
const usage = parsed.usage || {};
|
|
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
|
|
|
|
if (usage && typeof usage === "object") {
|
|
saveRequestUsage({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
tokens: usage,
|
|
timestamp: new Date().toISOString(),
|
|
connectionId: connectionId || undefined,
|
|
apiKey: apiKey || undefined,
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
}).catch(() => { });
|
|
}
|
|
|
|
const totalLatency = Date.now() - requestStartTime;
|
|
saveRequestDetail({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: { ttft: totalLatency, total: totalLatency },
|
|
tokens: usage,
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: null,
|
|
response: {
|
|
content: parsed.choices?.[0]?.message?.content || null,
|
|
thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
|
|
finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
|
|
},
|
|
status: "success",
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
}).catch(() => { });
|
|
|
|
return {
|
|
success: true,
|
|
response: new Response(JSON.stringify(parsed), {
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
"Access-Control-Allow-Origin": "*"
|
|
}
|
|
})
|
|
};
|
|
}
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
|
|
} catch (error) {
|
|
console.error("[ChatCore] Chat Completions SSE→JSON conversion failed:", error);
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Non-streaming response
|
|
if (!stream) {
|
|
trackPendingRequest(model, provider, connectionId, false);
|
|
const contentType = providerResponse.headers.get("content-type") || "";
|
|
let responseBody;
|
|
|
|
if (contentType.includes("text/event-stream")) {
|
|
// Upstream returned SSE even though stream=false; convert best-effort to JSON.
|
|
const sseText = await providerResponse.text();
|
|
const parsedFromSSE = parseSSEToOpenAIResponse(sseText, model);
|
|
if (!parsedFromSSE) {
|
|
appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
|
|
}
|
|
responseBody = parsedFromSSE;
|
|
} else {
|
|
try {
|
|
responseBody = await providerResponse.json();
|
|
} catch (parseError) {
|
|
appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
|
|
console.error(`[ChatCore] Failed to parse JSON response from ${provider}:`, parseError.message);
|
|
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`);
|
|
}
|
|
}
|
|
|
|
// Log provider response (raw response from provider)
|
|
reqLogger.logProviderResponse(
|
|
providerResponse.status,
|
|
providerResponse.statusText,
|
|
providerResponse.headers,
|
|
responseBody
|
|
);
|
|
|
|
// Notify success - caller can clear error status if needed
|
|
if (onRequestSuccess) {
|
|
await onRequestSuccess();
|
|
}
|
|
|
|
// Log usage for non-streaming responses
|
|
const usage = extractUsageFromResponse(responseBody, provider);
|
|
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
|
|
if (usage && typeof usage === 'object') {
|
|
const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
|
|
console.log(`${COLORS.green}${msg}${COLORS.reset}`);
|
|
|
|
saveRequestUsage({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
tokens: usage,
|
|
timestamp: new Date().toISOString(),
|
|
connectionId: connectionId || undefined,
|
|
apiKey: apiKey || undefined,
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
}).catch(err => {
|
|
console.error("Failed to save usage stats:", err.message);
|
|
});
|
|
}
|
|
|
|
// Translate response to client's expected format (usually OpenAI)
|
|
const translatedResponse = needsTranslation(targetFormat, sourceFormat)
|
|
? translateNonStreamingResponse(responseBody, targetFormat, sourceFormat)
|
|
: responseBody;
|
|
|
|
// Ensure OpenAI-required fields are present (needed for Letta and other strict clients)
|
|
if (!translatedResponse.object) translatedResponse.object = "chat.completion";
|
|
if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000);
|
|
|
|
// Strip Azure-specific non-standard fields
|
|
if (translatedResponse.prompt_filter_results !== undefined) {
|
|
delete translatedResponse.prompt_filter_results;
|
|
}
|
|
if (translatedResponse?.choices) {
|
|
for (const choice of translatedResponse.choices) {
|
|
if (choice.content_filter_results !== undefined) {
|
|
delete choice.content_filter_results;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add buffer and filter usage for client (to prevent CLI context errors)
|
|
if (translatedResponse?.usage) {
|
|
const buffered = addBufferToUsage(translatedResponse.usage);
|
|
translatedResponse.usage = filterUsageForFormat(buffered, sourceFormat);
|
|
}
|
|
|
|
// Log converted response (final response to client)
|
|
reqLogger.logConvertedResponse(translatedResponse);
|
|
|
|
const totalLatency = Date.now() - requestStartTime;
|
|
const requestDetail = {
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: {
|
|
ttft: totalLatency,
|
|
total: totalLatency
|
|
},
|
|
tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: responseBody || null,
|
|
response: {
|
|
content: translatedResponse?.choices?.[0]?.message?.content ||
|
|
translatedResponse?.content ||
|
|
null,
|
|
thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content ||
|
|
translatedResponse?.reasoning_content ||
|
|
null,
|
|
finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown"
|
|
},
|
|
status: "success",
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
};
|
|
|
|
// Async save (don't block response)
|
|
saveRequestDetail(requestDetail).catch(err => {
|
|
console.error("[RequestDetail] Failed to save:", err.message);
|
|
});
|
|
|
|
return {
|
|
success: true,
|
|
response: new Response(JSON.stringify(translatedResponse), {
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
"Access-Control-Allow-Origin": "*"
|
|
}
|
|
})
|
|
};
|
|
}
|
|
|
|
// Streaming response
|
|
|
|
// Notify success - caller can clear error status if needed
|
|
if (onRequestSuccess) {
|
|
await onRequestSuccess();
|
|
}
|
|
|
|
const responseHeaders = {
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"Access-Control-Allow-Origin": "*"
|
|
};
|
|
|
|
let streamContent = "";
|
|
let streamUsage = null;
|
|
const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
|
|
|
|
const onStreamComplete = (contentObj, usage, ttftAt) => {
|
|
// contentObj is object { content, thinking }
|
|
streamUsage = usage;
|
|
|
|
const updatedDetail = {
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: {
|
|
ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime,
|
|
total: Date.now() - requestStartTime
|
|
},
|
|
tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: contentObj.content || "[Empty streaming response]",
|
|
response: {
|
|
content: contentObj.content || "[Empty streaming response]",
|
|
thinking: contentObj.thinking || null,
|
|
type: "streaming"
|
|
},
|
|
status: "success",
|
|
id: streamDetailId
|
|
};
|
|
|
|
saveRequestDetail(updatedDetail).catch(err => {
|
|
console.error("[RequestDetail] Failed to update streaming content:", err.message);
|
|
});
|
|
|
|
// Save usage stats for dashboard
|
|
if (usage && typeof usage === 'object') {
|
|
const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [STREAM USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
|
|
console.log(`${COLORS.green}${msg}${COLORS.reset}`);
|
|
|
|
saveRequestUsage({
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
tokens: usage,
|
|
timestamp: new Date().toISOString(),
|
|
connectionId: connectionId || undefined,
|
|
apiKey: apiKey || undefined,
|
|
endpoint: clientRawRequest?.endpoint || null
|
|
}).catch(err => {
|
|
console.error("Failed to save streaming usage stats:", err.message);
|
|
});
|
|
}
|
|
};
|
|
|
|
let transformStream;
|
|
const isDroidCLI = userAgent?.toLowerCase().includes('droid') || userAgent?.toLowerCase().includes('codex-cli');
|
|
const needsCodexTranslation = provider === 'codex'
|
|
&& targetFormat === 'openai-responses'
|
|
&& !isDroidCLI;
|
|
|
|
if (needsCodexTranslation) {
|
|
log?.debug?.("STREAM", `Codex translation mode: openai-responses → openai`);
|
|
transformStream = createSSETransformStreamWithLogger('openai-responses', 'openai', provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
|
|
} else if (needsTranslation(targetFormat, sourceFormat)) {
|
|
log?.debug?.("STREAM", `Translation mode: ${targetFormat} → ${sourceFormat}`);
|
|
transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
|
|
} else {
|
|
log?.debug?.("STREAM", `Standard passthrough mode`);
|
|
transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey);
|
|
}
|
|
|
|
const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController);
|
|
|
|
const totalLatency = Date.now() - requestStartTime;
|
|
const streamingDetail = {
|
|
provider: provider || "unknown",
|
|
model: model || "unknown",
|
|
connectionId: connectionId || undefined,
|
|
timestamp: new Date().toISOString(),
|
|
latency: {
|
|
ttft: 0,
|
|
total: Date.now() - requestStartTime
|
|
},
|
|
tokens: { prompt_tokens: 0, completion_tokens: 0 },
|
|
request: extractRequestConfig(body, stream),
|
|
providerRequest: finalBody || translatedBody || null,
|
|
providerResponse: "[Streaming - raw response not captured]",
|
|
response: {
|
|
content: "[Streaming in progress...]",
|
|
thinking: null,
|
|
type: "streaming"
|
|
},
|
|
status: "success",
|
|
id: streamDetailId
|
|
};
|
|
|
|
saveRequestDetail(streamingDetail).catch(err => {
|
|
console.error("[RequestDetail] Failed to save streaming request:", err.message);
|
|
});
|
|
|
|
return {
|
|
success: true,
|
|
response: new Response(transformedBody, {
|
|
headers: responseHeaders
|
|
})
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Check if token is expired or about to expire
|
|
*/
|
|
export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
|
|
if (!expiresAt) return false;
|
|
const expiresAtMs = new Date(expiresAt).getTime();
|
|
return expiresAtMs - Date.now() < bufferMs;
|
|
}
|