// Mirror of https://github.com/decolua/9router.git (synced 2026-05-08 12:01:28 +00:00)
// JavaScript, 193 lines, 10 KiB
import { detectFormat, getTargetFormat } from "../services/provider.js";
|
|
import { translateRequest } from "../translator/index.js";
|
|
import { FORMATS } from "../translator/formats.js";
|
|
import { COLORS } from "../utils/stream.js";
|
|
import { createStreamController } from "../utils/streamHandler.js";
|
|
import { refreshWithRetry } from "../services/tokenRefresh.js";
|
|
import { createRequestLogger } from "../utils/requestLogger.js";
|
|
import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
|
|
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
|
|
import { HTTP_STATUS } from "../config/runtimeConfig.js";
|
|
import { handleBypassRequest } from "../utils/bypassHandler.js";
|
|
import { trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
|
|
import { getExecutor } from "../executors/index.js";
|
|
import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js";
|
|
import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js";
|
|
import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js";
|
|
import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js";
|
|
|
|
/**
 * Reduce a proxy URL to `protocol//host[:port]` for logging, dropping any
 * userinfo, path and query. When the value cannot be parsed as a URL the raw
 * string is returned unchanged.
 * @param {string} rawUrl - Proxy URL as configured on the connection
 * @returns {string} Masked URL, or `rawUrl` if parsing fails
 */
function maskProxyUrl(rawUrl) {
  try {
    const parsed = new URL(rawUrl);
    const host = parsed.hostname || "";
    const port = parsed.port ? `:${parsed.port}` : "";
    const protocol = parsed.protocol || "http:";
    return `${protocol}//${host}${port}`;
  } catch {
    // Keep raw if URL parsing fails
    return rawUrl;
  }
}

/**
 * Emit the PROXY log lines for a request: an info line with the masked proxy
 * URL and pool id, and a debug line with the no_proxy list. Nothing is logged
 * unless the connection proxy is enabled.
 * @param {object} ctx
 * @param {object} ctx.proxyOptions - { connectionProxyEnabled, connectionProxyUrl, connectionNoProxy }
 * @param {object} ctx.credentials - Provider credentials (read for connection name / pool id)
 * @param {string} ctx.provider
 * @param {string} ctx.model
 * @param {object} [ctx.log] - Optional logger exposing info/debug
 */
function logProxyUsage({ proxyOptions, credentials, provider, model, log }) {
  if (!proxyOptions.connectionProxyEnabled) return;

  const connectionName = credentials?.connectionName || credentials?.connectionId || "unknown";

  if (proxyOptions.connectionProxyUrl) {
    const maskedProxyUrl = maskProxyUrl(proxyOptions.connectionProxyUrl);
    const poolId = credentials?.providerSpecificData?.connectionProxyPoolId || "none";
    log?.info?.("PROXY", `${provider.toUpperCase()} | ${model} | conn=${connectionName} | pool=${poolId} | url=${maskedProxyUrl}`);
  }

  if (proxyOptions.connectionNoProxy) {
    log?.debug?.("PROXY", `${provider.toUpperCase()} | ${model} | conn=${connectionName} | no_proxy=${proxyOptions.connectionNoProxy}`);
  }
}

/**
 * Core chat handler - shared between SSE and Worker.
 *
 * Flow: detect the source format, short-circuit bypass requests, translate the
 * body to the provider's target format, execute it through the provider's
 * executor, retry once after a credential refresh on 401/403, then hand the
 * upstream response to the forced-SSE-to-JSON, non-streaming or streaming
 * handler.
 *
 * @param {object} options
 * @param {object} options.body - Request body
 * @param {object} options.modelInfo - { provider, model }
 * @param {object} options.credentials - Provider credentials; mutated in place when a token refresh succeeds
 * @param {object} [options.log] - Optional logger (debug/info/warn are called if present)
 * @param {Function} [options.onCredentialsRefreshed] - Awaited with the new credentials after a successful refresh
 * @param {Function} [options.onRequestSuccess] - Forwarded to the response handlers
 * @param {Function} [options.onDisconnect] - Called with the reason when the stream controller reports a disconnect
 * @param {object} [options.clientRawRequest] - { endpoint, body, headers } of the original client request, for logging
 * @param {string} [options.connectionId] - Connection identifier used for pending-request tracking and request logs
 * @param {string} [options.userAgent] - Client user agent (used by the bypass check and streaming handler)
 * @param {string} [options.apiKey] - Forwarded to the response handlers
 * @param {string} [options.sourceFormatOverride] - Override detected source format (e.g. "openai-responses")
 * @returns {Promise<*>} The bypass response, an error result from createErrorResult, or whatever the selected response handler returns
 */
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, sourceFormatOverride }) {
  const { provider, model } = modelInfo;
  const requestStartTime = Date.now();

  const sourceFormat = sourceFormatOverride || detectFormat(body);

  // Check for bypass patterns (warmup, skip)
  const bypassResponse = handleBypassRequest(body, model, userAgent);
  if (bypassResponse) return bypassResponse;

  // A per-model target format takes precedence over the provider-wide one.
  const alias = PROVIDER_ID_TO_ALIAS[provider] || provider;
  const modelTargetFormat = getModelTargetFormat(alias, model);
  const targetFormat = modelTargetFormat || getTargetFormat(provider);

  const clientRequestedStreaming = body.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI;
  const providerRequiresStreaming = provider === "openai" || provider === "codex";
  // Upstream streaming is on unless the client explicitly sent stream:false,
  // and always on for providers that require it.
  const stream = providerRequiresStreaming ? true : (body.stream !== false);

  const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
  if (clientRawRequest) reqLogger.logClientRawRequest(clientRawRequest.endpoint, clientRawRequest.body, clientRawRequest.headers);
  reqLogger.logRawRequest(body);
  log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`);

  const translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger);
  // _toolNameMap is translator bookkeeping: keep it for the streaming handler
  // but strip it from the body that is sent upstream.
  const toolNameMap = translatedBody._toolNameMap;
  delete translatedBody._toolNameMap;
  translatedBody.model = model;

  const executor = getExecutor(provider);
  trackPendingRequest(model, provider, connectionId, true);
  appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {});

  const msgCount = translatedBody.messages?.length || translatedBody.input?.length || translatedBody.contents?.length || translatedBody.request?.contents?.length || 0;
  log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`);

  const streamController = createStreamController({
    onDisconnect: (reason) => {
      trackPendingRequest(model, provider, connectionId, false);
      if (onDisconnect) onDisconnect(reason);
    },
    onError: () => trackPendingRequest(model, provider, connectionId, false),
    log, provider, model
  });

  const proxyOptions = {
    connectionProxyEnabled: credentials?.providerSpecificData?.connectionProxyEnabled === true,
    connectionProxyUrl: credentials?.providerSpecificData?.connectionProxyUrl || "",
    connectionNoProxy: credentials?.providerSpecificData?.connectionNoProxy || "",
  };
  logProxyUsage({ proxyOptions, credentials, provider, model, log });

  // Execute request
  let providerResponse, providerUrl, providerHeaders, finalBody;
  try {
    const result = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log, proxyOptions });
    providerResponse = result.response;
    providerUrl = result.url;
    providerHeaders = result.headers;
    finalBody = result.transformedBody;
    reqLogger.logTargetRequest(providerUrl, providerHeaders, finalBody);
  } catch (error) {
    const isAbort = error.name === "AbortError";
    // One status for both the request log and the saved detail: 499 for an
    // aborted request, otherwise bad gateway.
    const failureStatus = isAbort ? 499 : HTTP_STATUS.BAD_GATEWAY;

    trackPendingRequest(model, provider, connectionId, false, true);
    appendRequestLog({ model, provider, connectionId, status: `FAILED ${failureStatus}` }).catch(() => {});
    saveRequestDetail(buildRequestDetail({
      provider, model, connectionId,
      latency: { ttft: 0, total: Date.now() - requestStartTime },
      tokens: { prompt_tokens: 0, completion_tokens: 0 },
      request: extractRequestConfig(body, stream),
      providerRequest: translatedBody || null,
      response: { error: error.message || String(error), status: failureStatus, thinking: null },
      status: "error"
    })).catch(() => {});

    if (isAbort) {
      streamController.handleError(error);
      return createErrorResult(499, "Request aborted");
    }
    const errMsg = formatProviderError(error, provider, model, HTTP_STATUS.BAD_GATEWAY);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg);
  }

  // Handle 401/403 - try token refresh
  if (providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN) {
    const newCredentials = await refreshWithRetry(() => executor.refreshCredentials(credentials, log), 3, log);
    if (newCredentials?.accessToken || newCredentials?.copilotToken) {
      log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`);
      Object.assign(credentials, newCredentials);
      if (onCredentialsRefreshed) await onCredentialsRefreshed(newCredentials);
      try {
        const retryResult = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log, proxyOptions });
        if (retryResult.response.ok) {
          // Adopt the whole retry result so the URL, headers and body recorded
          // downstream describe the request that actually succeeded, not the
          // rejected first attempt.
          providerResponse = retryResult.response;
          providerUrl = retryResult.url;
          providerHeaders = retryResult.headers ?? providerHeaders;
          finalBody = retryResult.transformedBody ?? finalBody;
        }
        // A non-ok retry is ignored: the original 401/403 response is reported below.
      } catch {
        log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`);
      }
    } else {
      log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`);
    }
  }

  // Provider returned error
  if (!providerResponse.ok) {
    trackPendingRequest(model, provider, connectionId, false, true);
    const { statusCode, message, retryAfterMs } = await parseUpstreamError(providerResponse, provider);
    appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => {});
    saveRequestDetail(buildRequestDetail({
      provider, model, connectionId,
      latency: { ttft: 0, total: Date.now() - requestStartTime },
      tokens: { prompt_tokens: 0, completion_tokens: 0 },
      request: extractRequestConfig(body, stream),
      providerRequest: finalBody || translatedBody || null,
      response: { error: message, status: statusCode, thinking: null },
      status: "error"
    })).catch(() => {});

    const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
    console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
    if (retryAfterMs && provider === "antigravity") {
      log?.debug?.("RETRY", `Antigravity quota reset in ${Math.ceil(retryAfterMs / 1000)}s`);
    }
    reqLogger.logError(new Error(message), finalBody || translatedBody);
    return createErrorResult(statusCode, errMsg, retryAfterMs);
  }

  const sharedCtx = { provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess };
  const appendLog = (extra) => appendRequestLog({ model, provider, connectionId, ...extra }).catch(() => {});
  const trackDone = () => trackPendingRequest(model, provider, connectionId, false);

  // Provider forced streaming but client wants JSON
  if (!clientRequestedStreaming && providerRequiresStreaming) {
    const result = await handleForcedSSEToJson({ ...sharedCtx, providerResponse, sourceFormat, trackDone, appendLog });
    // A falsy result falls through to the handlers below.
    if (result) return result;
  }

  // True non-streaming response
  if (!stream) {
    return handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, trackDone, appendLog });
  }

  // Streaming response
  const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
  return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
}
|
|
|
|
/**
 * Report whether a token expires within the given buffer window (or has
 * already expired).
 *
 * @param {string|number|Date} expiresAt - Expiry in any form accepted by `new Date()`
 * @param {number} [bufferMs=300000] - Window before expiry, in milliseconds (default 5 minutes)
 * @returns {boolean} `true` when the time left until expiry is less than `bufferMs`;
 *   `false` when `expiresAt` is falsy or cannot be parsed as a date
 */
export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
  if (!expiresAt) return false;

  const expiryMs = new Date(expiresAt).getTime();
  // An unparseable expiry yields NaN; report "not expiring" explicitly rather
  // than relying on NaN comparisons evaluating to false.
  if (Number.isNaN(expiryMs)) return false;

  return expiryMs - Date.now() < bufferMs;
}
|