diff --git a/.gitignore b/.gitignore index 450bc617..acb837cc 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,6 @@ # production /build -cloud/* # misc .DS_Store diff --git a/cloud/.gitignore b/cloud/.gitignore new file mode 100644 index 00000000..03d963cb --- /dev/null +++ b/cloud/.gitignore @@ -0,0 +1,3 @@ +.wrangler/* +node_modules/* +**/node_modules/* diff --git a/cloud/README.md b/cloud/README.md new file mode 100644 index 00000000..5b096dd3 --- /dev/null +++ b/cloud/README.md @@ -0,0 +1,25 @@ +# 9Router Cloud Worker + +Deploy your own Cloudflare Worker to access 9Router from anywhere. + +## Setup + +```bash +# 1. Login to Cloudflare +npm install -g wrangler +wrangler login + +# 2. Install dependencies +cd app/cloud +npm install + +# 3. Create KV & D1, then paste IDs into wrangler.toml +wrangler kv namespace create KV +wrangler d1 create proxy-db + +# 4. Init database & deploy +wrangler d1 execute proxy-db --remote --file=./migrations/0001_init.sql +npm run deploy +``` + +Copy your Worker URL → 9Router Dashboard → **Endpoint** → **Setup Cloud** → paste → **Save** → **Enable Cloud**. 
diff --git a/cloud/jsconfig.json b/cloud/jsconfig.json new file mode 100644 index 00000000..6f1c2a85 --- /dev/null +++ b/cloud/jsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "baseUrl": ".", + "paths": { + "open-sse": ["../open-sse"], + "open-sse/*": ["../open-sse/*"] + }, + "module": "ESNext", + "moduleResolution": "bundler", + "target": "ESNext" + } +} diff --git a/cloud/migrations/0001_init.sql b/cloud/migrations/0001_init.sql new file mode 100644 index 00000000..ce13021a --- /dev/null +++ b/cloud/migrations/0001_init.sql @@ -0,0 +1,9 @@ +-- Migration: Create machines table +CREATE TABLE IF NOT EXISTS machines ( + machineId TEXT PRIMARY KEY, + data TEXT NOT NULL, + updatedAt TEXT NOT NULL +); + +-- Index for faster lookups +CREATE INDEX IF NOT EXISTS idx_machines_updatedAt ON machines(updatedAt); diff --git a/cloud/package.json b/cloud/package.json new file mode 100644 index 00000000..ee1ba8aa --- /dev/null +++ b/cloud/package.json @@ -0,0 +1,17 @@ +{ + "name": "9router-cloud", + "version": "0.2.13", + "private": true, + "type": "module", + "description": "9Router Cloud Worker - Self-hosted Cloudflare Worker proxy", + "scripts": { + "dev": "wrangler dev", + "deploy": "wrangler deploy" + }, + "dependencies": { + "open-sse": "file:../open-sse" + }, + "devDependencies": { + "wrangler": "^3.0.0" + } +} diff --git a/cloud/src/handlers/cache.js b/cloud/src/handlers/cache.js new file mode 100644 index 00000000..b5a50b8d --- /dev/null +++ b/cloud/src/handlers/cache.js @@ -0,0 +1,37 @@ +import { errorResponse } from "open-sse/utils/error.js"; +import { extractBearerToken, parseApiKey } from "../utils/apiKey.js"; +import * as log from "../utils/logger.js"; + +export async function handleCacheClear(request, env) { + const apiKey = extractBearerToken(request); + if (!apiKey) { + return errorResponse(401, "Missing API key"); + } + + try { + const body = await request.json().catch(() => ({})); + + // Get machineId from API key or body + let machineId = 
body.machineId; + if (!machineId) { + const parsed = await parseApiKey(apiKey); + machineId = parsed?.machineId; + } + + if (!machineId) { + return errorResponse(400, "Missing machineId"); + } + + // No cache layer to clear anymore + log.info("CACHE", `Cache clear requested for machine: ${machineId} (no-op)`); + + return new Response(JSON.stringify({ success: true, machineId, message: "No cache layer" }), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" + } + }); + } catch (error) { + return errorResponse(500, error.message); + } +} \ No newline at end of file diff --git a/cloud/src/handlers/chat.js b/cloud/src/handlers/chat.js new file mode 100644 index 00000000..0b4a206d --- /dev/null +++ b/cloud/src/handlers/chat.js @@ -0,0 +1,305 @@ +import { getModelInfoCore } from "open-sse/services/model.js"; +import { handleChatCore } from "open-sse/handlers/chatCore.js"; +import { errorResponse } from "open-sse/utils/error.js"; +import { checkFallbackError, isAccountUnavailable, getUnavailableUntil, getEarliestRateLimitedUntil, formatRetryAfter } from "open-sse/services/accountFallback.js"; +import { getComboModelsFromData, handleComboChat } from "open-sse/services/combo.js"; +import { HTTP_STATUS } from "open-sse/config/constants.js"; +import * as log from "../utils/logger.js"; +import { refreshTokenByProvider } from "../services/tokenRefresh.js"; +import { parseApiKey, extractBearerToken } from "../utils/apiKey.js"; +import { getMachineData, saveMachineData } from "../services/storage.js"; + +const TOKEN_EXPIRY_BUFFER_MS = 5 * 60 * 1000; + +async function getModelInfo(modelStr, machineId, env) { + const data = await getMachineData(machineId, env); + return getModelInfoCore(modelStr, data?.modelAliases || {}); +} + +/** + * Handle chat request + * @param {Request} request + * @param {Object} env + * @param {Object} ctx + * @param {string|null} machineIdOverride - machineId from URL (old format) or null (new format - extract from 
key) + */ +export async function handleChat(request, env, ctx, machineIdOverride = null) { + if (request.method === "OPTIONS") { + return new Response(null, { + headers: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "*" + } + }); + } + + // Determine machineId: from URL (old) or from API key (new) + let machineId = machineIdOverride; + + if (!machineId) { + // New format: extract machineId from API key + const apiKey = extractBearerToken(request); + if (!apiKey) return errorResponse(HTTP_STATUS.UNAUTHORIZED, "Missing API key"); + + const parsed = await parseApiKey(apiKey); + if (!parsed) return errorResponse(HTTP_STATUS.UNAUTHORIZED, "Invalid API key format"); + + if (!parsed.isNewFormat || !parsed.machineId) { + return errorResponse(HTTP_STATUS.BAD_REQUEST, "API key does not contain machineId. Use /{machineId}/v1/... endpoint for old format keys."); + } + + machineId = parsed.machineId; + } + + if (!await validateApiKey(request, machineId, env)) { + return errorResponse(HTTP_STATUS.UNAUTHORIZED, "Invalid API key"); + } + + let body; + try { + body = await request.json(); + } catch { + return errorResponse(HTTP_STATUS.BAD_REQUEST, "Invalid JSON body"); + } + + log.info("CHAT", `${machineId} | ${body.model}`, { stream: body.stream !== false }); + + const modelStr = body.model; + if (!modelStr) return errorResponse(HTTP_STATUS.BAD_REQUEST, "Missing model"); + + // Check if model is a combo + const data = await getMachineData(machineId, env); + const comboModels = getComboModelsFromData(modelStr, data?.combos || []); + + if (comboModels) { + log.info("COMBO", `"${modelStr}" with ${comboModels.length} models`); + return handleComboChat({ + body, + models: comboModels, + handleSingleModel: (reqBody, model) => handleSingleModelChat(reqBody, model, machineId, env), + log + }); + } + + // Single model request + return handleSingleModelChat(body, modelStr, machineId, env); +} + +/** + * 
Handle single model chat request + */ +async function handleSingleModelChat(body, modelStr, machineId, env) { + const modelInfo = await getModelInfo(modelStr, machineId, env); + if (!modelInfo.provider) return errorResponse(HTTP_STATUS.BAD_REQUEST, "Invalid model format"); + + const { provider, model } = modelInfo; + log.info("MODEL", `${provider.toUpperCase()} | ${model}`); + + let excludeConnectionId = null; + let lastError = null; + let lastStatus = null; + + while (true) { + const credentials = await getProviderCredentials(machineId, provider, env, excludeConnectionId); + if (!credentials || credentials.allRateLimited) { + if (credentials?.allRateLimited) { + const retryAfterSec = Math.ceil((new Date(credentials.retryAfter).getTime() - Date.now()) / 1000); + const errorMsg = lastError || credentials.lastError || "Unavailable"; + const msg = `[${provider}/${model}] ${errorMsg} (${credentials.retryAfterHuman})`; + const status = lastStatus || Number(credentials.lastErrorCode) || HTTP_STATUS.SERVICE_UNAVAILABLE; + log.warn("CHAT", `${provider.toUpperCase()} | ${msg}`); + return new Response( + JSON.stringify({ error: { message: msg } }), + { status, headers: { "Content-Type": "application/json", "Retry-After": String(Math.max(retryAfterSec, 1)) } } + ); + } + if (!excludeConnectionId) { + return errorResponse(HTTP_STATUS.BAD_REQUEST, `No credentials for provider: ${provider}`); + } + log.warn("CHAT", `${provider.toUpperCase()} | no more accounts`); + return new Response( + JSON.stringify({ error: lastError || "All accounts unavailable" }), + { status: lastStatus || HTTP_STATUS.SERVICE_UNAVAILABLE, headers: { "Content-Type": "application/json" } } + ); + } + + log.debug("CHAT", `account=${credentials.id}`, { provider }); + + const refreshedCredentials = await checkAndRefreshToken(machineId, provider, credentials, env); + + // Use shared chatCore + const result = await handleChatCore({ + body, + modelInfo: { provider, model }, + credentials: refreshedCredentials, + 
log, + onCredentialsRefreshed: async (newCreds) => { + await updateCredentials(machineId, credentials.id, newCreds, env); + }, + onRequestSuccess: async () => { + // Clear error status only if currently has error (optimization) + await clearAccountError(machineId, credentials.id, credentials, env); + } + }); + + if (result.success) return result.response; + + const { shouldFallback } = checkFallbackError(result.status, result.error); + + if (shouldFallback) { + log.warn("FALLBACK", `${provider.toUpperCase()} | ${credentials.id} | ${result.status}`); + await markAccountUnavailable(machineId, credentials.id, result.status, result.error, env); + excludeConnectionId = credentials.id; + lastError = result.error; + lastStatus = result.status; + continue; + } + + return result.response; + } +} + +async function checkAndRefreshToken(machineId, provider, credentials, env) { + if (!credentials.expiresAt) return credentials; + + const expiresAt = new Date(credentials.expiresAt).getTime(); + if (expiresAt - Date.now() >= TOKEN_EXPIRY_BUFFER_MS) return credentials; + + log.debug("TOKEN", `${provider.toUpperCase()} | expiring, refreshing`); + + const newCredentials = await refreshTokenByProvider(provider, credentials); + if (newCredentials?.accessToken) { + await updateCredentials(machineId, credentials.id, newCredentials, env); + return { + ...credentials, + accessToken: newCredentials.accessToken, + refreshToken: newCredentials.refreshToken || credentials.refreshToken, + expiresAt: newCredentials.expiresIn + ? 
new Date(Date.now() + newCredentials.expiresIn * 1000).toISOString() + : credentials.expiresAt + }; + } + + return credentials; +} + +async function validateApiKey(request, machineId, env) { + const authHeader = request.headers.get("Authorization"); + if (!authHeader?.startsWith("Bearer ")) return false; + + const apiKey = authHeader.slice(7); + const data = await getMachineData(machineId, env); + return data?.apiKeys?.some(k => k.key === apiKey) || false; +} + +async function getProviderCredentials(machineId, provider, env, excludeConnectionId = null) { + const data = await getMachineData(machineId, env); + if (!data?.providers) return null; + + const providerConnections = Object.entries(data.providers) + .filter(([connId, conn]) => { + if (conn.provider !== provider || !conn.isActive) return false; + if (excludeConnectionId && connId === excludeConnectionId) return false; + if (isAccountUnavailable(conn.rateLimitedUntil)) return false; + return true; + }) + .sort((a, b) => (a[1].priority || 999) - (b[1].priority || 999)); + + if (providerConnections.length === 0) { + // Check if accounts exist but all rate limited + const allConnections = Object.entries(data.providers) + .filter(([, conn]) => conn.provider === provider && conn.isActive) + .map(([, conn]) => conn); + const earliest = getEarliestRateLimitedUntil(allConnections); + if (earliest) { + const rateLimitedConns = allConnections.filter(c => c.rateLimitedUntil && new Date(c.rateLimitedUntil).getTime() > Date.now()); + const earliestConn = rateLimitedConns.sort((a, b) => new Date(a.rateLimitedUntil) - new Date(b.rateLimitedUntil))[0]; + return { + allRateLimited: true, + retryAfter: earliest, + retryAfterHuman: formatRetryAfter(earliest), + lastError: earliestConn?.lastError || null, + lastErrorCode: earliestConn?.errorCode || null + }; + } + return null; + } + + const [connectionId, connection] = providerConnections[0]; + + return { + id: connectionId, + apiKey: connection.apiKey, + accessToken: 
connection.accessToken, + refreshToken: connection.refreshToken, + expiresAt: connection.expiresAt, + projectId: connection.projectId, + copilotToken: connection.providerSpecificData?.copilotToken, + providerSpecificData: connection.providerSpecificData, + // Include current status for optimization check + status: connection.status, + lastError: connection.lastError, + rateLimitedUntil: connection.rateLimitedUntil + }; +} + +async function markAccountUnavailable(machineId, connectionId, status, errorText, env) { + const data = await getMachineData(machineId, env); + if (!data?.providers?.[connectionId]) return; + + const conn = data.providers[connectionId]; + const backoffLevel = conn.backoffLevel || 0; + const { cooldownMs, newBackoffLevel } = checkFallbackError(status, errorText, backoffLevel); + const rateLimitedUntil = getUnavailableUntil(cooldownMs); + const reason = typeof errorText === "string" ? errorText.slice(0, 100) : "Provider error"; + + data.providers[connectionId].rateLimitedUntil = rateLimitedUntil; + data.providers[connectionId].status = "unavailable"; + data.providers[connectionId].lastError = reason; + data.providers[connectionId].errorCode = status || null; + data.providers[connectionId].lastErrorAt = new Date().toISOString(); + data.providers[connectionId].backoffLevel = newBackoffLevel ?? backoffLevel; + data.providers[connectionId].updatedAt = new Date().toISOString(); + + await saveMachineData(machineId, data, env); + log.warn("ACCOUNT", `${connectionId} | unavailable until ${rateLimitedUntil} (backoff=${newBackoffLevel ?? 
backoffLevel})`); +} + +async function clearAccountError(machineId, connectionId, currentCredentials, env) { + // Only update if currently has error status (optimization) + const hasError = currentCredentials.status === "unavailable" || + currentCredentials.lastError || + currentCredentials.rateLimitedUntil; + + if (!hasError) return; // Skip if already clean + + const data = await getMachineData(machineId, env); + if (!data?.providers?.[connectionId]) return; + + data.providers[connectionId].status = "active"; + data.providers[connectionId].lastError = null; + data.providers[connectionId].lastErrorAt = null; + data.providers[connectionId].rateLimitedUntil = null; + data.providers[connectionId].backoffLevel = 0; + data.providers[connectionId].updatedAt = new Date().toISOString(); + + await saveMachineData(machineId, data, env); + log.info("ACCOUNT", `${connectionId} | error cleared`); +} + +async function updateCredentials(machineId, connectionId, newCredentials, env) { + const data = await getMachineData(machineId, env); + if (!data?.providers?.[connectionId]) return; + + data.providers[connectionId].accessToken = newCredentials.accessToken; + if (newCredentials.refreshToken) data.providers[connectionId].refreshToken = newCredentials.refreshToken; + if (newCredentials.expiresIn) { + data.providers[connectionId].expiresAt = new Date(Date.now() + newCredentials.expiresIn * 1000).toISOString(); + data.providers[connectionId].expiresIn = newCredentials.expiresIn; + } + data.providers[connectionId].updatedAt = new Date().toISOString(); + + await saveMachineData(machineId, data, env); + log.debug("TOKEN", `credentials updated | ${connectionId}`); +} diff --git a/cloud/src/handlers/cleanup.js b/cloud/src/handlers/cleanup.js new file mode 100644 index 00000000..3c638455 --- /dev/null +++ b/cloud/src/handlers/cleanup.js @@ -0,0 +1,34 @@ +import * as log from "../utils/logger.js"; + +const RETENTION_DAYS = 7; + +/** + * Cleanup old machine data from D1 + * Runs daily via 
cron trigger + */ +export async function handleCleanup(env) { + const cutoffDate = new Date(Date.now() - RETENTION_DAYS * 24 * 60 * 60 * 1000).toISOString(); + + log.info("CLEANUP", `Deleting records older than ${cutoffDate}`); + + try { + const result = await env.DB.prepare("DELETE FROM machines WHERE updatedAt < ?") + .bind(cutoffDate) + .run(); + + log.info("CLEANUP", `Deleted ${result.meta?.changes || 0} old records`); + + return { + success: true, + deleted: result.meta?.changes || 0, + cutoffDate + }; + } catch (error) { + log.error("CLEANUP", error.message); + return { + success: false, + error: error.message + }; + } +} + diff --git a/cloud/src/handlers/countTokens.js b/cloud/src/handlers/countTokens.js new file mode 100644 index 00000000..bfb2cf11 --- /dev/null +++ b/cloud/src/handlers/countTokens.js @@ -0,0 +1,46 @@ +import { errorResponse } from "open-sse/utils/error.js"; + +const CORS_HEADERS = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "POST, OPTIONS", + "Access-Control-Allow-Headers": "*" +}; + +/** + * Handle POST /{machineId}/v1/messages/count_tokens + * Mock token count response based on content length + */ +export async function handleCountTokens(request, env) { + let body; + try { + body = await request.json(); + } catch { + return errorResponse(400, "Invalid JSON body"); + } + + // Estimate token count based on content length + const messages = body.messages || []; + let totalChars = 0; + + for (const msg of messages) { + if (typeof msg.content === "string") { + totalChars += msg.content.length; + } else if (Array.isArray(msg.content)) { + for (const part of msg.content) { + if (part.type === "text" && part.text) { + totalChars += part.text.length; + } + } + } + } + + // Rough estimate: ~4 chars per token + const inputTokens = Math.ceil(totalChars / 4); + + return new Response(JSON.stringify({ + input_tokens: inputTokens + }), { + headers: { "Content-Type": "application/json", ...CORS_HEADERS } + }); +} + diff 
--git a/cloud/src/handlers/forward.js b/cloud/src/handlers/forward.js new file mode 100644 index 00000000..ab89aec4 --- /dev/null +++ b/cloud/src/handlers/forward.js @@ -0,0 +1,75 @@ +// CF headers to remove +const CF_HEADERS = [ + "cf-connecting-ip", "cf-connecting-ip6", "cf-ray", "cf-visitor", + "cf-ipcountry", "cf-tracking-id", "cf-connecting-ip6-policy", + "x-real-ip", "x-forwarded-for", "x-forwarded-proto", "x-forwarded-host" +]; + +// Forward request to any endpoint +export async function handleForward(request) { + try { + const url = new URL(request.url); + const clientIp = request.headers.get("CF-Connecting-IP") || ""; + const { targetUrl, headers = {}, body } = await request.json(); + + if (!targetUrl) { + return new Response(JSON.stringify({ error: "targetUrl is required" }), { + status: 400, + headers: { "Content-Type": "application/json" } + }); + } + + // Filter out CF headers from input + const cleanHeaders = {}; + for (const [key, value] of Object.entries(headers)) { + if (!CF_HEADERS.includes(key.toLowerCase())) { + cleanHeaders[key] = value; + } + } + + // Set standard forwarding headers + cleanHeaders["X-Client-IP"] = clientIp; + cleanHeaders["X-Forwarded-Proto"] = url.protocol.replace(":", ""); + cleanHeaders["X-Forwarded-Host"] = url.host; + cleanHeaders["X-From-Worker"] = "1"; + + console.log("[FORWARD] Target:", targetUrl); + console.log("[FORWARD] Headers:", JSON.stringify(cleanHeaders)); + + // Create Request object to have more control over headers + const outgoingRequest = new Request(targetUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...cleanHeaders + }, + body: JSON.stringify(body) + }); + + // Use fetch with cf options to minimize auto-added headers + const response = await fetch(outgoingRequest, { + cf: { + // Disable automatic features that add headers + scrapeShield: false, + minify: false, + mirage: false, + polish: "off" + } + }); + + // Stream response back to client + return new 
Response(response.body, { + status: response.status, + headers: { + "Content-Type": response.headers.get("Content-Type") || "application/json", + "Access-Control-Allow-Origin": "*" + } + }); + } catch (error) { + console.error("[FORWARD] Error:", error.message); + return new Response(JSON.stringify({ error: error.message }), { + status: 500, + headers: { "Content-Type": "application/json" } + }); + } +} diff --git a/cloud/src/handlers/forwardRaw.js b/cloud/src/handlers/forwardRaw.js new file mode 100644 index 00000000..87d34193 --- /dev/null +++ b/cloud/src/handlers/forwardRaw.js @@ -0,0 +1,173 @@ +import { connect } from "cloudflare:sockets"; + +// Forward request via raw TCP socket (bypasses CF auto headers) +export async function handleForwardRaw(request) { + try { + const { targetUrl, headers = {}, body } = await request.json(); + + if (!targetUrl) { + return new Response(JSON.stringify({ error: "targetUrl is required" }), { + status: 400, + headers: { "Content-Type": "application/json" } + }); + } + + const url = new URL(targetUrl); + const host = url.hostname; + const port = url.port || (url.protocol === "https:" ? 443 : 80); + const path = url.pathname + url.search; + const isHttps = url.protocol === "https:"; + + console.log("[FORWARD_RAW] Connecting to:", host, port, isHttps ? 
"(TLS)" : ""); + + // Connect to target server + let secureSocket; + if (isHttps) { + // For HTTPS, connect directly with TLS enabled + console.log("[FORWARD_RAW] Creating TLS socket..."); + secureSocket = connect({ + hostname: host, + port: parseInt(port), + secureTransport: "on" + }); + console.log("[FORWARD_RAW] TLS socket created"); + } else { + secureSocket = connect({ hostname: host, port: parseInt(port) }); + } + + console.log("[FORWARD_RAW] Socket object:", secureSocket); + console.log("[FORWARD_RAW] Socket opened:", secureSocket.opened); + + // Wait for socket to be ready + try { + console.log("[FORWARD_RAW] Waiting for socket to open..."); + await secureSocket.opened; + console.log("[FORWARD_RAW] Socket opened successfully"); + } catch (openError) { + console.error("[FORWARD_RAW] Socket open error:", openError.message); + throw openError; + } + + console.log("[FORWARD_RAW] Getting writer and reader..."); + const writer = secureSocket.writable.getWriter(); + const reader = secureSocket.readable.getReader(); + console.log("[FORWARD_RAW] Writer and reader obtained"); + + // Build raw HTTP request + const bodyStr = JSON.stringify(body); + const requestHeaders = { + "Host": host, + "Content-Type": "application/json", + "Content-Length": new TextEncoder().encode(bodyStr).length.toString(), + "Connection": "close", + ...headers + }; + + // Build HTTP request string + let httpRequest = `POST ${path} HTTP/1.1\r\n`; + for (const [key, value] of Object.entries(requestHeaders)) { + httpRequest += `${key}: ${value}\r\n`; + } + httpRequest += `\r\n${bodyStr}`; + + console.log("[FORWARD_RAW] Sending request:", httpRequest.substring(0, 300)); + console.log("[FORWARD_RAW] Full request length:", httpRequest.length); + + // Send request + try { + console.log("[FORWARD_RAW] Writing to socket..."); + await writer.write(new TextEncoder().encode(httpRequest)); + console.log("[FORWARD_RAW] Write complete, closing writer..."); + await writer.close(); + console.log("[FORWARD_RAW] 
Writer closed"); + } catch (writeError) { + console.error("[FORWARD_RAW] Write error:", writeError.message); + throw writeError; + } + + // Read response with timeout + console.log("[FORWARD_RAW] Starting to read response..."); + let responseData = new Uint8Array(0); + let attempts = 0; + const maxAttempts = 100; // 10 seconds max + + while (attempts < maxAttempts) { + console.log("[FORWARD_RAW] Reading attempt:", attempts); + const { done, value } = await reader.read(); + console.log("[FORWARD_RAW] Read result - done:", done, "value length:", value?.length); + if (done) break; + if (value) { + const newData = new Uint8Array(responseData.length + value.length); + newData.set(responseData); + newData.set(value, responseData.length); + responseData = newData; + + // Check if we have complete response (has headers end marker) + const text = new TextDecoder().decode(responseData); + if (text.includes("\r\n\r\n")) { + // Check if we have Content-Length and received all body + const headerEnd = text.indexOf("\r\n\r\n"); + const headers = text.substring(0, headerEnd).toLowerCase(); + const contentLengthMatch = headers.match(/content-length:\s*(\d+)/); + if (contentLengthMatch) { + const expectedLength = parseInt(contentLengthMatch[1]); + const bodyReceived = text.length - headerEnd - 4; + if (bodyReceived >= expectedLength) { + console.log("[FORWARD_RAW] Complete response received"); + break; + } + } + } + } + attempts++; + } + + console.log("[FORWARD_RAW] Read loop finished, total bytes:", responseData.length); + + const responseText = new TextDecoder().decode(responseData); + console.log("[FORWARD_RAW] Response received:", responseText.substring(0, 500)); + + // Parse HTTP response + const headerEndIndex = responseText.indexOf("\r\n\r\n"); + if (headerEndIndex === -1) { + console.log("[FORWARD_RAW] Full response data:", responseText); + throw new Error("Invalid HTTP response - no header end found"); + } + + const headerPart = responseText.substring(0, headerEndIndex); + 
const bodyPart = responseText.substring(headerEndIndex + 4); + + // Parse status line + const statusLine = headerPart.split("\r\n")[0]; + const statusMatch = statusLine.match(/HTTP\/[\d.]+ (\d+)/); + const status = statusMatch ? parseInt(statusMatch[1]) : 200; + + // Parse headers + const responseHeaders = {}; + const headerLines = headerPart.split("\r\n").slice(1); + for (const line of headerLines) { + const colonIndex = line.indexOf(":"); + if (colonIndex > 0) { + const key = line.substring(0, colonIndex).trim(); + const value = line.substring(colonIndex + 1).trim(); + responseHeaders[key.toLowerCase()] = value; + } + } + + return new Response(bodyPart, { + status, + headers: { + "Content-Type": responseHeaders["content-type"] || "application/json", + "Access-Control-Allow-Origin": "*" + } + }); + + } catch (error) { + console.error("[FORWARD_RAW] Error:", error.message, error.stack); + return new Response(JSON.stringify({ error: error.message }), { + status: 500, + headers: { "Content-Type": "application/json" } + }); + } +} + diff --git a/cloud/src/handlers/sync.js b/cloud/src/handlers/sync.js new file mode 100644 index 00000000..93d4b766 --- /dev/null +++ b/cloud/src/handlers/sync.js @@ -0,0 +1,227 @@ +import * as log from "../utils/logger.js"; +import { getMachineData, saveMachineData, deleteMachineData } from "../services/storage.js"; + +const CORS_HEADERS = { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" +}; + +// Removed: WORKER_FIELDS and WORKER_SPECIFIC_FIELDS +// Now syncing entire provider based on updatedAt (simpler logic) + +export async function handleSync(request, env, ctx) { + const url = new URL(request.url); + const machineId = url.pathname.split("/")[2]; // /sync/:machineId + + // Handle CORS preflight + if (request.method === "OPTIONS") { + return new Response(null, { + headers: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS", + "Access-Control-Allow-Headers": 
"*" + } + }); + } + + if (!machineId) { + log.warn("SYNC", "Missing machineId in path"); + return jsonResponse({ error: "Missing machineId" }, 400); + } + + // Route by method + switch (request.method) { + case "GET": + return handleGet(machineId, env); + case "POST": + return handlePost(request, machineId, env); + case "DELETE": + return handleDelete(machineId, env); + default: + return jsonResponse({ error: "Method not allowed" }, 405); + } +} + +/** + * GET /sync/:machineId - Return merged data for Web to update + */ +async function handleGet(machineId, env) { + const data = await getMachineData(machineId, env); + + if (!data) { + log.warn("SYNC", "No data found", { machineId }); + return jsonResponse({ error: "No data found" }, 404); + } + + log.info("SYNC", "Data retrieved", { machineId }); + return jsonResponse({ + success: true, + data + }); +} + +/** + * POST /sync/:machineId - Merge Web data with Worker data + * providers stored by ID (supports multiple connections per provider) + */ +async function handlePost(request, machineId, env) { + let body; + try { + body = await request.json(); + } catch { + log.warn("SYNC", "Invalid JSON body", { machineId }); + return jsonResponse({ error: "Invalid JSON body" }, 400); + } + + // Validate required fields + if (!body.providers || !Array.isArray(body.providers)) { + log.warn("SYNC", "Missing or invalid providers array", { machineId }); + return jsonResponse({ error: "Missing providers array" }, 400); + } + + const existingData = await getMachineData(machineId, env) || { providers: {}, modelAliases: {}, apiKeys: [] }; + + // Merge providers by ID + const mergedProviders = {}; + const changes = { updated: [], fromWorker: [] }; + + for (const webProvider of body.providers) { + const providerId = webProvider.id; + if (!providerId) { + log.warn("SYNC", "Provider missing id", { provider: webProvider.provider }); + continue; + } + + const workerProvider = existingData.providers[providerId]; + + if (workerProvider) { + // 
Merge: token fields from Worker, config fields from Web + mergedProviders[providerId] = mergeProvider(webProvider, workerProvider, changes, providerId); + } else { + // New provider from Web + mergedProviders[providerId] = formatProviderData(webProvider); + changes.updated.push(providerId); + } + } + + // Prepare final data - modelAliases, apiKeys, combos always from Web + const finalData = { + providers: mergedProviders, + modelAliases: body.modelAliases || existingData.modelAliases || {}, + combos: body.combos || existingData.combos || [], + apiKeys: body.apiKeys || existingData.apiKeys || [], + updatedAt: new Date().toISOString() + }; + + // Store in D1 + invalidate cache + await saveMachineData(machineId, finalData, env); + + log.info("SYNC", "Data synced successfully", { + machineId, + providerCount: Object.keys(mergedProviders).length, + changes + }); + + return jsonResponse({ + success: true, + data: finalData, + changes + }); +} + +/** + * DELETE /sync/:machineId - Clear cache when Worker is disabled + */ +async function handleDelete(machineId, env) { + await deleteMachineData(machineId, env); + + log.info("SYNC", "Data deleted", { machineId }); + return jsonResponse({ + success: true, + message: "Data deleted successfully" + }); +} + +/** + * Merge provider data: compare updatedAt to decide which source to use + * Simple logic: newer wins (sync entire provider) + */ +function mergeProvider(webProvider, workerProvider, changes, providerId) { + const webTime = new Date(webProvider.updatedAt || 0).getTime(); + const workerTime = new Date(workerProvider.updatedAt || 0).getTime(); + + let merged; + + if (workerTime > webTime) { + // Cloud has newer data - use entire Cloud provider + merged = formatProviderData(workerProvider); + changes.fromWorker.push(providerId); + } else { + // Server has newer data - use entire Server provider + merged = formatProviderData(webProvider); + changes.updated.push(providerId); + } + + // Always update timestamp + 
merged.updatedAt = new Date().toISOString(); + return merged; +} + +/** + * Format provider data for storage + */ +function formatProviderData(provider) { + return { + id: provider.id, + provider: provider.provider, + authType: provider.authType, + name: provider.name, + displayName: provider.displayName, + email: provider.email, + priority: provider.priority, + globalPriority: provider.globalPriority, + defaultModel: provider.defaultModel, + accessToken: provider.accessToken, + refreshToken: provider.refreshToken, + expiresAt: provider.expiresAt, + expiresIn: provider.expiresIn, + tokenType: provider.tokenType, + scope: provider.scope, + idToken: provider.idToken, + projectId: provider.projectId, + apiKey: provider.apiKey, + providerSpecificData: provider.providerSpecificData || {}, + isActive: provider.isActive, + status: provider.status || "active", + lastError: provider.lastError || null, + lastErrorAt: provider.lastErrorAt || null, + errorCode: provider.errorCode || null, + rateLimitedUntil: provider.rateLimitedUntil || null, + createdAt: provider.createdAt, + updatedAt: provider.updatedAt || new Date().toISOString() + }; +} + +/** + * Update provider status (called when token refresh fails or API errors) + */ +export function updateProviderStatus(providers, providerId, status, error = null, errorCode = null) { + if (providers[providerId]) { + providers[providerId].status = status; + providers[providerId].lastError = error; + providers[providerId].lastErrorAt = error ? 
new Date().toISOString() : null;
    providers[providerId].errorCode = errorCode;
    providers[providerId].updatedAt = new Date().toISOString();
  }
  return providers;
}

/**
 * Helper to create JSON response
 *
 * @param {Object} data - Payload to serialize
 * @param {number} [status=200] - HTTP status code
 * @returns {Response} JSON response carrying the shared CORS_HEADERS
 */
function jsonResponse(data, status = 200) {
  return new Response(JSON.stringify(data), {
    status,
    headers: CORS_HEADERS
  });
}
diff --git a/cloud/src/handlers/verify.js b/cloud/src/handlers/verify.js
new file mode 100644
index 00000000..cf5e4a7a
--- /dev/null
+++ b/cloud/src/handlers/verify.js
@@ -0,0 +1,60 @@
import { parseApiKey, extractBearerToken } from "../utils/apiKey.js";
import { getMachineData } from "../services/storage.js";

/**
 * Verify API key endpoint
 * @param {Request} request
 * @param {Object} env
 * @param {string|null} machineIdOverride - machineId from URL (old format) or null (new format)
 * @returns {Promise<Response>} 200 with { valid, machineId, providersCount } when the
 *   key matches a stored key; 400/401/404 JSON error responses otherwise
 */
export async function handleVerify(request, env, machineIdOverride = null) {
  const apiKey = extractBearerToken(request);
  if (!apiKey) {
    return jsonResponse({ error: "Missing or invalid Authorization header" }, 401);
  }

  // Determine machineId: from URL (old) or from API key (new)
  let machineId = machineIdOverride;

  if (!machineId) {
    const parsed = await parseApiKey(apiKey);
    if (!parsed) {
      return jsonResponse({ error: "Invalid API key format" }, 401);
    }

    if (!parsed.isNewFormat || !parsed.machineId) {
      return jsonResponse({ error: "API key does not contain machineId" }, 400);
    }

    machineId = parsed.machineId;
  }

  const data = await getMachineData(machineId, env);

  if (!data) {
    return jsonResponse({ error: "Machine not found" }, 404);
  }

  // NOTE(review): plain === comparison is not constant-time; if these keys are
  // high-value secrets, consider a timing-safe compare — confirm threat model.
  const isValid = data.apiKeys?.some(k => k.key === apiKey) || false;

  if (!isValid) {
    return jsonResponse({ error: "Invalid API key" }, 401);
  }

  return jsonResponse({
    valid: true,
    machineId,
    providersCount: Object.keys(data.providers || {}).length
  });
}

// Local helper: JSON response with permissive CORS (this file does not share
// the sync handler's CORS_HEADERS constant).
function jsonResponse(data, status = 200) {
  return new Response(JSON.stringify(data), {
    status,
    headers: {
      "Content-Type": "application/json",
      "Access-Control-Allow-Origin": "*"
    }
  });
}

diff --git a/cloud/src/index.js b/cloud/src/index.js
new file mode 100644
index 00000000..d33dd57b
--- /dev/null
+++ b/cloud/src/index.js
@@ -0,0 +1,217 @@
import { initTranslators } from "open-sse/translator/index.js";
import { ollamaModels } from "open-sse/config/ollamaModels.js";
import { transformToOllama } from "open-sse/utils/ollamaTransform.js";
import * as log from "./utils/logger.js";

// Static imports for handlers (avoid dynamic import CPU cost)
import { handleCleanup } from "./handlers/cleanup.js";
import { handleCacheClear } from "./handlers/cache.js";
import { handleSync } from "./handlers/sync.js";
import { handleChat } from "./handlers/chat.js";
import { handleVerify } from "./handlers/verify.js";
import { handleTestClaude } from "./handlers/testClaude.js";
import { handleForward } from "./handlers/forward.js";
import { handleForwardRaw } from "./handlers/forwardRaw.js";
import { createLandingPageResponse } from "./services/landingPage.js";

// Initialize translators at module load (static imports)
initTranslators();

// Helper to add CORS headers to response
// Rebuilds the Response because headers on an existing Response are immutable.
function addCorsHeaders(response) {
  const newHeaders = new Headers(response.headers);
  newHeaders.set("Access-Control-Allow-Origin", "*");
  newHeaders.set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS");
  newHeaders.set("Access-Control-Allow-Headers", "*");
  return new Response(response.body, {
    status: response.status,
    statusText: response.statusText,
    headers: newHeaders
  });
}

const worker = {
  // Cron-trigger entry point: runs the periodic cleanup job.
  async scheduled(event, env, ctx) {
    const result = await handleCleanup(env);
    log.info("SCHEDULED", "Cleanup completed", result);
  },

  // HTTP entry point: routes every request by pathname + method.
  async fetch(request, env, ctx) {
    const startTime = Date.now();
    const url = new URL(request.url);
    let path = url.pathname;

    // Normalize /v1/v1/* → /v1/*
    if
(path.startsWith("/v1/v1/")) { + path = path.replace("/v1/v1/", "/v1/"); + } else if (path === "/v1/v1") { + path = "/v1"; + } + + log.request(request.method, path); + + // CORS preflight + if (request.method === "OPTIONS") { + return new Response(null, { + headers: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS", + "Access-Control-Allow-Headers": "*" + } + }); + } + + try { + // Routes + + // Landing page + if (path === "/" && request.method === "GET") { + const response = createLandingPageResponse(); + log.response(response.status, Date.now() - startTime); + return response; + } + + if (path === "/health" && request.method === "GET") { + log.response(200, Date.now() - startTime); + return new Response(JSON.stringify({ status: "ok" }), { + headers: { "Content-Type": "application/json" } + }); + } + + // Ollama compatible - list models + if (path === "/api/tags" && request.method === "GET") { + log.response(200, Date.now() - startTime); + return new Response(JSON.stringify(ollamaModels), { + headers: { "Content-Type": "application/json" } + }); + } + + if (path === "/cache/clear" && request.method === "POST") { + const response = await handleCacheClear(request, env); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Sync provider data by machineId (GET, POST, DELETE) + if (path.startsWith("/sync/") && ["GET", "POST", "DELETE"].includes(request.method)) { + const response = await handleSync(request, env, ctx); + log.response(response.status, Date.now() - startTime); + return response; + } + + // ========== NEW FORMAT: /v1/... 
(machineId in API key) ========== + + // New format: /v1/chat/completions + if (path === "/v1/chat/completions" && request.method === "POST") { + const response = await handleChat(request, env, ctx, null); + log.response(response.status, Date.now() - startTime); + return addCorsHeaders(response); + } + + // New format: /v1/messages (Claude format) + if (path === "/v1/messages" && request.method === "POST") { + const response = await handleChat(request, env, ctx, null); + log.response(response.status, Date.now() - startTime); + return addCorsHeaders(response); + } + + // New format: /v1/responses (OpenAI Responses API - Codex CLI) + if (path === "/v1/responses" && request.method === "POST") { + const response = await handleChat(request, env, ctx, null); + log.response(response.status, Date.now() - startTime); + return response; + } + + // New format: /v1/verify + if (path === "/v1/verify" && request.method === "GET") { + const response = await handleVerify(request, env, null); + log.response(response.status, Date.now() - startTime); + return addCorsHeaders(response); + } + + // New format: /v1/api/chat (Ollama format) + if (path === "/v1/api/chat" && request.method === "POST") { + const clonedReq = request.clone(); + const body = await clonedReq.json(); + const response = await handleChat(request, env, ctx, null); + const ollamaResponse = transformToOllama(response, body.model || "llama3.2"); + log.response(200, Date.now() - startTime); + return ollamaResponse; + } + + // ========== OLD FORMAT: /{machineId}/v1/... 
========== + + // Machine ID based chat endpoint + if (path.match(/^\/[^\/]+\/v1\/chat\/completions$/) && request.method === "POST") { + const machineId = path.split("/")[1]; + const response = await handleChat(request, env, ctx, machineId); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Machine ID based messages endpoint (Claude format) + if (path.match(/^\/[^\/]+\/v1\/messages$/) && request.method === "POST") { + const machineId = path.split("/")[1]; + const response = await handleChat(request, env, ctx, machineId); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Machine ID based api/chat endpoint (Ollama format) + if (path.match(/^\/[^\/]+\/v1\/api\/chat$/) && request.method === "POST") { + const machineId = path.split("/")[1]; + const clonedReq = request.clone(); + const body = await clonedReq.json(); + const response = await handleChat(request, env, ctx, machineId); + const ollamaResponse = transformToOllama(response, body.model || "llama3.2"); + log.response(200, Date.now() - startTime); + return ollamaResponse; + } + + // Machine ID based verify endpoint + if (path.match(/^\/[^\/]+\/v1\/verify$/) && request.method === "GET") { + const machineId = path.split("/")[1]; + const response = await handleVerify(request, env, machineId); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Test Claude - forward to Anthropic API + if (path === "/testClaude" && request.method === "POST") { + const response = await handleTestClaude(request); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Forward request to any endpoint + if (path === "/forward" && request.method === "POST") { + const response = await handleForward(request); + log.response(response.status, Date.now() - startTime); + return response; + } + + // Forward request via raw TCP socket (bypasses CF auto headers) + if (path === "/forward-raw" && request.method 
=== "POST") { + const response = await handleForwardRaw(request); + log.response(response.status, Date.now() - startTime); + return response; + } + + log.warn("ROUTER", "Not found", { path }); + return new Response(JSON.stringify({ error: "Not Found" }), { + status: 404, + headers: { "Content-Type": "application/json" } + }); + + } catch (error) { + log.error("ROUTER", error.message, { stack: error.stack }); + return new Response(JSON.stringify({ error: error.message }), { + status: 500, + headers: { "Content-Type": "application/json" } + }); + } + } +}; + +export default worker; + diff --git a/cloud/src/services/landingPage.js b/cloud/src/services/landingPage.js new file mode 100644 index 00000000..dd6e91ae --- /dev/null +++ b/cloud/src/services/landingPage.js @@ -0,0 +1,27 @@ +/** + * Landing Page Service + * Simple health check page for self-hosted worker + */ + +/** + * Create landing page response + * @returns {Response} HTML response + */ +export function createLandingPageResponse() { + const html = ` +
Worker is running. Configure this URL in your 9Router dashboard.
+