mirror of
https://github.com/decolua/9router.git
synced 2026-05-08 12:01:28 +00:00
244 lines
13 KiB
JavaScript
244 lines
13 KiB
JavaScript
import { detectFormat, getTargetFormat } from "../services/provider.js";
|
|
import { translateRequest } from "../translator/index.js";
|
|
import { FORMATS } from "../translator/formats.js";
|
|
import { COLORS } from "../utils/stream.js";
|
|
import { createStreamController } from "../utils/streamHandler.js";
|
|
import { refreshWithRetry } from "../services/tokenRefresh.js";
|
|
import { createRequestLogger } from "../utils/requestLogger.js";
|
|
import { getModelTargetFormat, getModelStrip, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
|
|
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
|
|
import { HTTP_STATUS } from "../config/runtimeConfig.js";
|
|
import { handleBypassRequest } from "../utils/bypassHandler.js";
|
|
import { trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
|
|
import { getExecutor } from "../executors/index.js";
|
|
import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js";
|
|
import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js";
|
|
import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js";
|
|
import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js";
|
|
import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.js";
|
|
|
|
/**
 * Apply the provider-level thinking override to a request body.
 *
 * "on"/"off" control the extended-thinking block (`body.thinking`); any other
 * non-"auto" mode is treated as an effort level (`body.reasoning_effort`).
 * A value already set by the client is never overwritten, and an "on"/"off"
 * mode is never written into `reasoning_effort`.
 *
 * @param {object} body - Client request body (not mutated)
 * @param {{ mode?: string }} [providerThinking] - Provider-level thinking config
 * @param {object} [log] - Optional logger
 * @returns {object} The original body, or a shallow copy with the override applied
 */
function applyProviderThinking(body, providerThinking, log) {
  const mode = providerThinking?.mode;
  if (!mode || mode === "auto") return body;

  if (mode === "on" || mode === "off") {
    // Client-supplied thinking config always wins.
    if (body.thinking) return body;
    if (mode === "off") return { ...body, thinking: { type: "disabled" } };
    log?.debug?.("THINKING", "Injecting provider-level thinking config override: on");
    return { ...body, thinking: { type: "enabled", budget_tokens: 10000 } };
  }

  if (body.reasoning_effort) return body;
  return { ...body, reasoning_effort: mode };
}

/**
 * Reduce a proxy URL to something safe to log.
 *
 * Parseable URLs are reduced to `protocol//host[:port]`. When parsing fails the
 * raw string is kept for diagnostics, but any `userinfo@` prefix is replaced so
 * proxy credentials never reach the log.
 *
 * @param {string} rawUrl - Configured proxy URL
 * @returns {string} Redacted URL
 */
function redactProxyUrl(rawUrl) {
  try {
    const { protocol, hostname, port } = new URL(rawUrl);
    return `${protocol}//${hostname}${port ? `:${port}` : ""}`;
  } catch {
    return String(rawUrl).replace(/^([a-z][a-z0-9+.-]*:\/\/)?[^/?#]*@/i, "$1***@");
  }
}

/**
 * Log which proxy route (relay, connection proxy, no_proxy list) applies to this request.
 *
 * @param {object} args
 * @param {object} args.proxyOptions - Proxy options built from the credentials
 * @param {object} [args.credentials] - Provider credentials (used for connection name / pool id)
 * @param {string} args.provider - Provider id
 * @param {string} args.model - Model id
 * @param {object} [args.log] - Optional logger
 */
function logProxyRouting({ proxyOptions, credentials, provider, model, log }) {
  const { connectionProxyEnabled, connectionProxyUrl, connectionNoProxy, vercelRelayUrl } = proxyOptions;
  const connectionName = credentials?.connectionName || credentials?.connectionId || "unknown";
  const poolId = credentials?.providerSpecificData?.connectionProxyPoolId || "none";

  // The relay takes precedence over the per-connection proxy in the log output.
  if (vercelRelayUrl) {
    log?.info?.("PROXY", `${provider.toUpperCase()} | ${model} | conn=${connectionName} | pool=${poolId} | vercel-relay=${vercelRelayUrl}`);
  } else if (connectionProxyEnabled && connectionProxyUrl) {
    log?.info?.("PROXY", `${provider.toUpperCase()} | ${model} | conn=${connectionName} | pool=${poolId} | url=${redactProxyUrl(connectionProxyUrl)}`);
  }

  if (connectionProxyEnabled && connectionNoProxy) {
    log?.debug?.("PROXY", `${provider.toUpperCase()} | ${model} | conn=${connectionName} | no_proxy=${connectionNoProxy}`);
  }
}

/**
 * Best-effort cancel of a response body that will never be read, so the
 * underlying connection is not held open waiting for a consumer.
 *
 * @param {object} [response] - Response-like object; anything without `body.cancel` is ignored
 */
function discardResponseBody(response) {
  try {
    response?.body?.cancel?.()?.catch?.(() => {});
  } catch {
    // Best effort only: nothing useful can be done if cancellation itself fails.
  }
}

/**
 * Core chat handler - shared between SSE and Worker.
 *
 * Flow: detect the source format, short-circuit bypass requests, apply the
 * provider-level thinking override, decide streaming mode, translate the
 * request (or pass it through natively), execute it, retry once after a
 * credential refresh on 401/403, then hand the upstream response to the
 * forced-SSE-to-JSON, non-streaming, or streaming handler.
 *
 * @param {object} options
 * @param {object} options.body - Request body
 * @param {object} options.modelInfo - { provider, model }
 * @param {object} options.credentials - Provider credentials; refreshed tokens are merged into this object in place
 * @param {object} [options.log] - Logger; every call is optional-chained
 * @param {Function} [options.onCredentialsRefreshed] - Awaited with the refreshed credentials; failures are logged, not thrown
 * @param {Function} [options.onRequestSuccess] - Forwarded to the response handlers
 * @param {Function} [options.onDisconnect] - Called with the disconnect reason from the stream controller
 * @param {object} [options.clientRawRequest] - Raw client request; `endpoint`, `body` and `headers` are read here
 * @param {string} [options.connectionId] - Connection id used for pending-request tracking and request logs
 * @param {string} [options.userAgent] - Client user agent
 * @param {string} [options.apiKey] - Forwarded to the response handlers
 * @param {*} [options.ccFilterNaming] - Forwarded to the bypass handler
 * @param {string} [options.sourceFormatOverride] - Override detected source format (e.g. "openai-responses")
 * @param {{ mode?: string }} [options.providerThinking] - Provider-level thinking override ("auto" or unset disables it)
 * @returns {Promise<*>} The bypass response, an error result from `createErrorResult`, or the selected response handler's result
 */
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, sourceFormatOverride, providerThinking }) {
  const { provider, model } = modelInfo;
  const requestStartTime = Date.now();

  const sourceFormat = sourceFormatOverride || detectFormat(body);

  // Check for bypass patterns (warmup, skip, cc naming)
  const bypassResponse = handleBypassRequest(body, model, userAgent, ccFilterNaming);
  if (bypassResponse) return bypassResponse;

  const alias = PROVIDER_ID_TO_ALIAS[provider] || provider;
  const modelTargetFormat = getModelTargetFormat(alias, model);
  const targetFormat = modelTargetFormat || getTargetFormat(provider);
  const stripList = getModelStrip(alias, model);

  // Inject provider-level thinking config override (only if client hasn't set it).
  const requestBody = applyProviderThinking(body, providerThinking, log);

  const clientRequestedStreaming = requestBody.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI;
  const providerRequiresStreaming = provider === "openai" || provider === "codex";
  let stream = providerRequiresStreaming ? true : (requestBody.stream !== false);

  // Check client Accept header preference for non-streaming requests.
  // This fixes AI SDK compatibility where clients send Accept: application/json
  const acceptHeader = clientRawRequest?.headers?.accept || "";
  const clientPrefersJson = acceptHeader.includes("application/json");
  const clientPrefersSSE = acceptHeader.includes("text/event-stream");
  if (clientPrefersJson && !clientPrefersSSE && requestBody.stream !== true) {
    stream = false;
  }

  const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
  if (clientRawRequest) reqLogger.logClientRawRequest(clientRawRequest.endpoint, clientRawRequest.body, clientRawRequest.headers);
  reqLogger.logRawRequest(requestBody);
  log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`);

  // Native passthrough: CLI tool and provider are the same ecosystem.
  // Skip all translation/normalization — only model and Bearer are swapped
  const clientTool = detectClientTool(clientRawRequest?.headers || {}, requestBody);
  const passthrough = isNativePassthrough(clientTool, provider);

  let translatedBody;
  let toolNameMap;
  if (passthrough) {
    log?.debug?.("PASSTHROUGH", `${clientTool} → ${provider} | native lossless`);
    translatedBody = { ...requestBody, model };
  } else {
    translatedBody = translateRequest(sourceFormat, targetFormat, model, requestBody, stream, credentials, provider, reqLogger, stripList, connectionId);
    if (!translatedBody) {
      trackPendingRequest(model, provider, connectionId, false, true);
      return createErrorResult(HTTP_STATUS.BAD_REQUEST, `Failed to translate request for ${sourceFormat} → ${targetFormat}`);
    }
    // The tool-name map is translator bookkeeping and must not be sent upstream.
    toolNameMap = translatedBody._toolNameMap;
    delete translatedBody._toolNameMap;
    translatedBody.model = model;
  }

  const executor = getExecutor(provider);
  trackPendingRequest(model, provider, connectionId, true);
  appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {});

  const msgCount = translatedBody.messages?.length || translatedBody.input?.length || translatedBody.contents?.length || translatedBody.request?.contents?.length || 0;
  log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`);

  const streamController = createStreamController({
    onDisconnect: (reason) => {
      trackPendingRequest(model, provider, connectionId, false);
      if (onDisconnect) onDisconnect(reason);
    },
    onError: () => trackPendingRequest(model, provider, connectionId, false),
    log, provider, model
  });

  const proxyOptions = {
    connectionProxyEnabled: credentials?.providerSpecificData?.connectionProxyEnabled === true,
    connectionProxyUrl: credentials?.providerSpecificData?.connectionProxyUrl || "",
    connectionNoProxy: credentials?.providerSpecificData?.connectionNoProxy || "",
    vercelRelayUrl: credentials?.providerSpecificData?.vercelRelayUrl || "",
  };
  logProxyRouting({ proxyOptions, credentials, provider, model, log });

  // Execute request
  let providerResponse;
  let finalBody;
  try {
    const result = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log, proxyOptions });
    providerResponse = result.response;
    finalBody = result.transformedBody;
    reqLogger.logTargetRequest(result.url, result.headers, finalBody);
  } catch (error) {
    const aborted = error?.name === "AbortError";
    // 499 = client closed request; everything else is reported as a gateway failure.
    const failureStatus = aborted ? 499 : HTTP_STATUS.BAD_GATEWAY;

    trackPendingRequest(model, provider, connectionId, false, true);
    appendRequestLog({ model, provider, connectionId, status: `FAILED ${failureStatus}` }).catch(() => {});
    saveRequestDetail(buildRequestDetail({
      provider, model, connectionId,
      latency: { ttft: 0, total: Date.now() - requestStartTime },
      tokens: { prompt_tokens: 0, completion_tokens: 0 },
      request: extractRequestConfig(requestBody, stream),
      providerRequest: translatedBody || null,
      response: { error: error?.message || String(error), status: failureStatus, thinking: null },
      status: "error"
    })).catch(() => {});

    if (aborted) {
      streamController.handleError(error);
      return createErrorResult(499, "Request aborted");
    }
    const errMsg = formatProviderError(error, provider, model, HTTP_STATUS.BAD_GATEWAY);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg);
  }

  // Handle 401/403 - try token refresh (skip for noAuth providers)
  const authRejected = providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN;
  if (!executor.noAuth && authRejected) {
    try {
      const newCredentials = await refreshWithRetry(() => executor.refreshCredentials(credentials, log), 3, log);
      if (newCredentials?.accessToken || newCredentials?.copilotToken) {
        log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`);
        // Merge in place so the caller's credentials object carries the refreshed token.
        Object.assign(credentials, newCredentials);
        if (onCredentialsRefreshed) {
          try {
            await onCredentialsRefreshed(newCredentials);
          } catch (e) {
            log?.warn?.("TOKEN", `onCredentialsRefreshed failed: ${e.message}`);
          }
        }
        try {
          const retryResult = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log, proxyOptions });
          if (retryResult.response.ok) {
            // The rejected response is superseded; release it before swapping.
            discardResponseBody(providerResponse);
            providerResponse = retryResult.response;
            finalBody = retryResult.transformedBody ?? finalBody;
          } else {
            // Keep reporting the original auth failure; the retry response is never read.
            discardResponseBody(retryResult.response);
          }
        } catch {
          log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`);
        }
      } else {
        log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`);
      }
    } catch (e) {
      log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh threw: ${e.message}`);
    }
  }

  // Provider returned error
  if (!providerResponse.ok) {
    trackPendingRequest(model, provider, connectionId, false, true);
    const { statusCode, message } = await parseUpstreamError(providerResponse);
    appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => {});
    saveRequestDetail(buildRequestDetail({
      provider, model, connectionId,
      latency: { ttft: 0, total: Date.now() - requestStartTime },
      tokens: { prompt_tokens: 0, completion_tokens: 0 },
      request: extractRequestConfig(requestBody, stream),
      providerRequest: finalBody || translatedBody || null,
      response: { error: message, status: statusCode, thinking: null },
      status: "error"
    })).catch(() => {});

    const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    reqLogger.logError(new Error(message), finalBody || translatedBody);
    return createErrorResult(statusCode, errMsg);
  }

  const sharedCtx = { provider, model, body: requestBody, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess };
  const appendLog = (extra) => appendRequestLog({ model, provider, connectionId, ...extra }).catch(() => {});
  const trackDone = () => trackPendingRequest(model, provider, connectionId, false);

  // Provider forced streaming but client wants JSON
  if (!clientRequestedStreaming && providerRequiresStreaming) {
    const result = await handleForcedSSEToJson({ ...sharedCtx, providerResponse, sourceFormat, trackDone, appendLog });
    if (result) {
      streamController.handleComplete();
      return result;
    }
  }

  // True non-streaming response
  if (!stream) {
    const result = await handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, toolNameMap, trackDone, appendLog });
    streamController.handleComplete();
    return result;
  }

  // Streaming response
  const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
  return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
}
|
|
|
|
/**
 * Tell whether a token's expiry falls within a safety buffer from now.
 *
 * A falsy `expiresAt` yields `false`. A value that `Date` cannot parse
 * produces NaN, so the comparison is also `false`.
 *
 * @param {string|number|Date} expiresAt - Expiry timestamp accepted by `new Date()`
 * @param {number} [bufferMs=300000] - Buffer in milliseconds (defaults to five minutes)
 * @returns {boolean} `true` when less than `bufferMs` remains before expiry
 */
export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
  if (!expiresAt) {
    return false;
  }

  const expiryMs = new Date(expiresAt).getTime();
  const remainingMs = expiryMs - Date.now();
  return remainingMs < bufferMs;
}
|