import { detectFormat, getTargetFormat } from "../services/provider.js";
import { translateRequest } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { COLORS } from "../utils/stream.js";
import { createStreamController } from "../utils/streamHandler.js";
import { refreshWithRetry } from "../services/tokenRefresh.js";
import { createRequestLogger } from "../utils/requestLogger.js";
import { getModelTargetFormat, getModelStrip, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
import { HTTP_STATUS } from "../config/runtimeConfig.js";
import { handleBypassRequest } from "../utils/bypassHandler.js";
import { trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
import { getExecutor } from "../executors/index.js";
import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js";
import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js";
import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js";
import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js";
import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.js";
import { injectCaveman } from "../rtk/caveman.js";
import { compressMessages, formatRtkLog } from "../rtk/index.js";

/** Non-standard status this handler reports when the upstream call is aborted (AbortError). */
const CLIENT_CLOSED_REQUEST = 499;

/** Providers whose upstream calls are always made in streaming mode. */
const STREAM_ONLY_PROVIDERS = new Set(["openai", "codex", "commandcode"]);

/**
 * Apply the provider-level thinking override without overriding anything the client set.
 * - "on" / "off" use the extended-thinking shape (`body.thinking`).
 * - Any other non-"auto" mode (e.g. none/low/medium/high) uses `body.reasoning_effort`.
 * @param {object} body - Client request body (not mutated).
 * @param {{ mode?: string } | undefined} providerThinking - Provider thinking config; unset or "auto" disables it.
 * @param {object} [log] - Optional logger.
 * @returns {object} The original body, or a shallow copy with the override applied.
 */
function applyProviderThinking(body, providerThinking, log) {
  const mode = providerThinking?.mode;
  if (!mode || mode === "auto") return body;

  if (mode === "on" || mode === "off") {
    // on/off must never leak into reasoning_effort ("on"/"off" are not effort levels),
    // so a client-provided `thinking` simply wins here.
    if (body.thinking) return body;
    log?.debug?.("THINKING", `provider override: ${mode}`);
    const thinking = mode === "on" ? { type: "enabled", budget_tokens: 10000 } : { type: "disabled" };
    return { ...body, thinking };
  }

  if (body.reasoning_effort) return body;
  return { ...body, reasoning_effort: mode };
}

/**
 * Decide whether the upstream call streams, and record what the client asked for.
 * An `Accept: application/json` header (without text/event-stream, and without
 * `body.stream === true`) switches to non-streaming for AI SDK compatibility, but
 * never for stream-only providers: those keep streaming upstream and rely on the
 * forced SSE→JSON conversion instead.
 * @returns {{ stream: boolean, clientRequestedStreaming: boolean, providerRequiresStreaming: boolean }}
 */
function resolveStreamMode({ body, provider, sourceFormat, clientRawRequest }) {
  const clientRequestedStreaming = body.stream === true
    || sourceFormat === FORMATS.ANTIGRAVITY
    || sourceFormat === FORMATS.GEMINI
    || sourceFormat === FORMATS.GEMINI_CLI;
  const providerRequiresStreaming = STREAM_ONLY_PROVIDERS.has(provider);

  let stream = providerRequiresStreaming || body.stream !== false;

  const acceptHeader = clientRawRequest?.headers?.accept || "";
  const clientPrefersJson = acceptHeader.includes("application/json");
  const clientPrefersSSE = acceptHeader.includes("text/event-stream");
  if (!providerRequiresStreaming && clientPrefersJson && !clientPrefersSSE && body.stream !== true) {
    stream = false;
  }

  return { stream, clientRequestedStreaming, providerRequiresStreaming };
}

/**
 * Reduce a proxy URL to `protocol//host[:port]` so credentials never reach the logs.
 * If the URL cannot be parsed, any `user:pass@` segment is still stripped.
 * @param {string} rawUrl
 * @returns {string}
 */
function maskProxyUrl(rawUrl) {
  try {
    const parsed = new URL(rawUrl);
    const port = parsed.port ? `:${parsed.port}` : "";
    return `${parsed.protocol || "http:"}//${parsed.hostname || ""}${port}`;
  } catch {
    return rawUrl.replace(/^([^/]*\/\/)?[^@]*@/, "$1");
  }
}

/** Log which proxy / relay route this request will use (relay wins over connection proxy). */
function logProxyConfig({ log, provider, model, credentials, proxyOptions }) {
  if (!log) return;
  const connectionName = credentials?.connectionName || credentials?.connectionId || "unknown";
  const poolId = credentials?.providerSpecificData?.connectionProxyPoolId || "none";
  const prefix = `${provider.toUpperCase()} | ${model} | conn=${connectionName}`;

  if (proxyOptions.vercelRelayUrl) {
    log.info?.("PROXY", `${prefix} | pool=${poolId} | vercel-relay=${proxyOptions.vercelRelayUrl}`);
  } else if (proxyOptions.connectionProxyEnabled && proxyOptions.connectionProxyUrl) {
    log.info?.("PROXY", `${prefix} | pool=${poolId} | url=${maskProxyUrl(proxyOptions.connectionProxyUrl)}`);
  }

  if (proxyOptions.connectionProxyEnabled && proxyOptions.connectionNoProxy) {
    log.debug?.("PROXY", `${prefix} | no_proxy=${proxyOptions.connectionNoProxy}`);
  }
}

/**
 * Refresh credentials after a 401/403 and re-run the request once.
 * `credentials` is updated in place and `onCredentialsRefreshed` is notified
 * (its failure is logged, not propagated). Every failure is logged and swallowed
 * so the caller falls back to the original error response.
 * @returns {Promise<object|null>} The executor result of a successful (ok) retry, otherwise null.
 */
async function refreshAndRetry({ executor, credentials, onCredentialsRefreshed, executeRequest, provider, log }) {
  try {
    const newCredentials = await refreshWithRetry(() => executor.refreshCredentials(credentials, log), 3, log);
    if (!newCredentials?.accessToken && !newCredentials?.copilotToken) {
      log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`);
      return null;
    }

    log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`);
    Object.assign(credentials, newCredentials);
    if (onCredentialsRefreshed) {
      try {
        await onCredentialsRefreshed(newCredentials);
      } catch (e) {
        log?.warn?.("TOKEN", `onCredentialsRefreshed failed: ${e.message}`);
      }
    }

    try {
      const retryResult = await executeRequest();
      return retryResult.response.ok ? retryResult : null;
    } catch {
      log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`);
      return null;
    }
  } catch (e) {
    log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh threw: ${e.message}`);
    return null;
  }
}

/** Persist a failed request to the request log and request-detail store (best effort). */
function recordFailure({ model, provider, connectionId, body, stream, requestStartTime, providerRequest, status, errorMessage }) {
  appendRequestLog({ model, provider, connectionId, status: `FAILED ${status}` }).catch(() => {});
  saveRequestDetail(buildRequestDetail({
    provider,
    model,
    connectionId,
    latency: { ttft: 0, total: Date.now() - requestStartTime },
    tokens: { prompt_tokens: 0, completion_tokens: 0 },
    request: extractRequestConfig(body, stream),
    providerRequest,
    response: { error: errorMessage, status, thinking: null },
    status: "error",
  })).catch(() => {});
}

/**
 * Core chat handler - shared between SSE and Worker.
 *
 * Detects the source format, applies provider-level overrides, translates the
 * request (skipped for native passthrough client/provider pairs), applies token
 * savers, dispatches through the provider executor, retries once after a
 * credential refresh on 401/403, and hands the upstream response to the
 * forced-SSE→JSON, non-streaming, or streaming handler.
 *
 * @param {object} options
 * @param {object} options.body - Request body
 * @param {object} options.modelInfo - { provider, model }
 * @param {object} options.credentials - Provider credentials (updated in place on token refresh)
 * @param {object} [options.log] - Optional logger (debug/info/warn)
 * @param {Function} [options.onCredentialsRefreshed] - Awaited with the refreshed credentials
 * @param {Function} [options.onRequestSuccess] - Forwarded to the response handlers
 * @param {Function} [options.onDisconnect] - Called with the disconnect reason from the stream controller
 * @param {object} [options.clientRawRequest] - { endpoint, body, headers } of the original client request
 * @param {string} [options.sourceFormatOverride] - Override detected source format (e.g. "openai-responses")
 * @param {{ mode?: string }} [options.providerThinking] - Provider thinking override; unset or "auto" disables it
 * @returns {Promise<*>} A bypass response, a createErrorResult() value, or the selected handler's result.
 */
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, rtkEnabled, cavemanEnabled, cavemanLevel, sourceFormatOverride, providerThinking }) {
  const { provider, model } = modelInfo;
  const requestStartTime = Date.now();
  const sourceFormat = sourceFormatOverride || detectFormat(body);

  // Check for bypass patterns (warmup, skip, cc naming)
  const bypassResponse = handleBypassRequest(body, model, userAgent, ccFilterNaming);
  if (bypassResponse) return bypassResponse;

  const alias = PROVIDER_ID_TO_ALIAS[provider] || provider;
  const targetFormat = getModelTargetFormat(alias, model) || getTargetFormat(provider);
  const stripList = getModelStrip(alias, model);

  body = applyProviderThinking(body, providerThinking, log);

  const { stream, clientRequestedStreaming, providerRequiresStreaming } = resolveStreamMode({ body, provider, sourceFormat, clientRawRequest });

  const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
  if (clientRawRequest) reqLogger.logClientRawRequest(clientRawRequest.endpoint, clientRawRequest.body, clientRawRequest.headers);
  reqLogger.logRawRequest(body);
  log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`);

  // Native passthrough: CLI tool and provider are the same ecosystem.
  // Skip all translation/normalization — only model and Bearer are swapped.
  const clientTool = detectClientTool(clientRawRequest?.headers || {}, body);
  const passthrough = isNativePassthrough(clientTool, provider);

  let translatedBody;
  let toolNameMap;
  if (passthrough) {
    log?.debug?.("PASSTHROUGH", `${clientTool} → ${provider} | native lossless`);
    translatedBody = { ...body, model };
  } else {
    translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger, stripList, connectionId, clientTool);
    if (!translatedBody) {
      trackPendingRequest(model, provider, connectionId, false, true);
      return createErrorResult(HTTP_STATUS.BAD_REQUEST, `Failed to translate request for ${sourceFormat} → ${targetFormat}`);
    }
    toolNameMap = translatedBody._toolNameMap;
    delete translatedBody._toolNameMap;
    translatedBody.model = model;
  }

  // Token savers run on the final body just before dispatch, so they see the
  // source shape for passthrough and the target shape for translated flows.
  const finalFormat = passthrough ? sourceFormat : targetFormat;

  // RTK: compress tool_result content
  const rtkLine = formatRtkLog(compressMessages(translatedBody, rtkEnabled));
  if (rtkLine) console.log(rtkLine);

  // Caveman: inject terse-style system prompt
  if (cavemanEnabled && cavemanLevel) {
    injectCaveman(translatedBody, finalFormat, cavemanLevel);
    log?.debug?.("CAVEMAN", `${cavemanLevel} | ${finalFormat}`);
  }

  const executor = getExecutor(provider);
  trackPendingRequest(model, provider, connectionId, true);
  appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {});

  const msgCount = translatedBody.messages?.length
    || translatedBody.input?.length
    || translatedBody.contents?.length
    || translatedBody.request?.contents?.length
    || 0;
  log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`);

  const streamController = createStreamController({
    onDisconnect: (reason) => {
      trackPendingRequest(model, provider, connectionId, false);
      if (onDisconnect) onDisconnect(reason);
    },
    onError: () => trackPendingRequest(model, provider, connectionId, false),
    log,
    provider,
    model,
  });

  const providerData = credentials?.providerSpecificData;
  const proxyOptions = {
    connectionProxyEnabled: providerData?.connectionProxyEnabled === true,
    connectionProxyUrl: providerData?.connectionProxyUrl || "",
    connectionNoProxy: providerData?.connectionNoProxy || "",
    vercelRelayUrl: providerData?.vercelRelayUrl || "",
  };
  logProxyConfig({ log, provider, model, credentials, proxyOptions });

  // `credentials` is read at call time, so a retry picks up refreshed tokens.
  const executeRequest = () => executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log, proxyOptions });
  const failureCtx = { model, provider, connectionId, body, stream, requestStartTime };

  // Execute request
  let providerResponse;
  let providerUrl;
  let providerHeaders;
  let finalBody;
  try {
    const result = await executeRequest();
    providerResponse = result.response;
    providerUrl = result.url;
    providerHeaders = result.headers;
    finalBody = result.transformedBody;
    reqLogger.logTargetRequest(providerUrl, providerHeaders, finalBody);
  } catch (error) {
    const isAbort = error?.name === "AbortError";
    const failedStatus = isAbort ? CLIENT_CLOSED_REQUEST : HTTP_STATUS.BAD_GATEWAY;
    trackPendingRequest(model, provider, connectionId, false, true);
    recordFailure({ ...failureCtx, providerRequest: translatedBody || null, status: failedStatus, errorMessage: error?.message || String(error) });

    if (isAbort) {
      streamController.handleError(error);
      return createErrorResult(CLIENT_CLOSED_REQUEST, "Request aborted");
    }
    const errMsg = formatProviderError(error, provider, model, HTTP_STATUS.BAD_GATEWAY);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg);
  }

  // Handle 401/403 - try token refresh (skip for noAuth providers)
  if (!executor.noAuth && (providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN)) {
    const retryResult = await refreshAndRetry({ executor, credentials, onCredentialsRefreshed, executeRequest, provider, log });
    if (retryResult) {
      // Keep all request metadata in sync with the response we actually use.
      providerResponse = retryResult.response;
      providerUrl = retryResult.url;
      providerHeaders = retryResult.headers;
      finalBody = retryResult.transformedBody ?? finalBody;
    }
  }

  // Provider returned error
  if (!providerResponse.ok) {
    trackPendingRequest(model, provider, connectionId, false, true);
    const { statusCode, message, resetsAtMs } = await parseUpstreamError(providerResponse, executor);
    recordFailure({ ...failureCtx, providerRequest: finalBody || translatedBody || null, status: statusCode, errorMessage: message });

    const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    reqLogger.logError(new Error(message), finalBody || translatedBody);
    return createErrorResult(statusCode, errMsg, resetsAtMs);
  }

  const sharedCtx = { provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess };
  const appendLog = (extra) => appendRequestLog({ model, provider, connectionId, ...extra }).catch(() => {});
  const trackDone = () => trackPendingRequest(model, provider, connectionId, false);

  // Provider forced streaming but client wants JSON
  if (!clientRequestedStreaming && providerRequiresStreaming) {
    const result = await handleForcedSSEToJson({ ...sharedCtx, providerResponse, sourceFormat, trackDone, appendLog });
    if (result) {
      streamController.handleComplete();
      return result;
    }
  }

  // True non-streaming response
  if (!stream) {
    const result = await handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, toolNameMap, trackDone, appendLog });
    streamController.handleComplete();
    return result;
  }

  // Streaming response
  const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
  return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
}

/**
 * Report whether a token expiry falls within `bufferMs` of now (or is already past).
 * @param {*} expiresAt - Passed to `new Date()`; presumably an ISO string or epoch-ms value (TODO confirm).
 *   A falsy value returns false; an unparseable value yields NaN arithmetic and therefore also false.
 * @param {number} [bufferMs=300000] - Safety window in milliseconds (5 minutes by default).
 * @returns {boolean}
 */
export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
  if (!expiresAt) return false;
  return new Date(expiresAt).getTime() - Date.now() < bufferMs;
}