feat(responses): respect client streaming preference + string input support (#121)

- Remove forced stream=true from responsesHandler
- Add stream-to-JSON converter for non-streaming clients (Codex)
- Accept string input in Responses API (normalize to array)
- Codex SSE header fallback for missing Content-Type
- Refactor: extract shared normalizeResponsesInput()

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
apeltekci
2026-02-15 11:47:55 +07:00
committed by decolua
parent 202fee714b
commit ac7cedd27e
7 changed files with 263 additions and 26 deletions

View File

@@ -1,6 +1,7 @@
import { BaseExecutor } from "./base.js";
import { CODEX_DEFAULT_INSTRUCTIONS } from "../config/codexInstructions.js";
import { PROVIDERS } from "../config/constants.js";
import { normalizeResponsesInput } from "../translator/helpers/responsesApiHelper.js";
/**
* Codex Executor - handles OpenAI Codex API (Responses API format)
@@ -24,6 +25,10 @@ export class CodexExecutor extends BaseExecutor {
* Transform request before sending - inject default instructions if missing
*/
transformRequest(model, body, stream, credentials) {
// Convert string input to array format (Codex API requires input as array)
const normalized = normalizeResponsesInput(body.input);
if (normalized) body.input = normalized;
// Ensure input is present and non-empty (Codex API rejects empty input)
if (!body.input || (Array.isArray(body.input) && body.input.length === 0)) {
body.input = [{ type: "message", role: "user", content: [{ type: "input_text", text: "..." }] }];

View File

@@ -12,6 +12,7 @@ import { HTTP_STATUS } from "../config/constants.js";
import { handleBypassRequest } from "../utils/bypassHandler.js";
import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
import { getExecutor } from "../executors/index.js";
import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js";
/**
* Translate non-streaming response to OpenAI format
@@ -365,8 +366,12 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
const modelTargetFormat = getModelTargetFormat(alias, model);
const targetFormat = modelTargetFormat || getTargetFormat(provider);
// Track if client actually wants streaming (before we force it for providers)
const clientRequestedStreaming = body.stream === true;
const providerRequiresStreaming = provider === 'openai' || provider === 'codex';
// Force streaming for OpenAI/Codex models (they don't support non-streaming mode properly)
const stream = (provider === 'openai' || provider === 'codex') ? true : (body.stream !== false);
const stream = providerRequiresStreaming ? true : (body.stream !== false);
// Create request logger for this session: sourceFormat_targetFormat_model
const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
@@ -552,6 +557,77 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
return createErrorResult(statusCode, errMsg, retryAfterMs);
}
// Provider forced streaming but client wants JSON - convert SSE to JSON
if (!clientRequestedStreaming && providerRequiresStreaming) {
trackPendingRequest(model, provider, connectionId, false);
const contentType = providerResponse.headers.get("content-type") || "";
// Treat as SSE if content-type says so OR if it's empty/missing
// (Codex API doesn't always set Content-Type on streaming responses)
const isSSEResponse = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
if (isSSEResponse) {
const isResponsesApi = sourceFormat === 'openai-responses';
if (isResponsesApi) {
// Responses API SSE → Responses API JSON (for pydantic_ai, OpenAI SDK, etc.)
try {
const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
log?.info?.("STREAM", `Converted Responses API SSE → JSON for non-streaming client`);
if (onRequestSuccess) await onRequestSuccess();
const usage = jsonResponse.usage || {};
appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
if (usage && typeof usage === 'object') {
const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.input_tokens || 0} | out=${usage?.output_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
console.log(`${COLORS.green}${msg}${COLORS.reset}`);
saveRequestUsage({
provider: provider || "unknown",
model: model || "unknown",
tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
timestamp: new Date().toISOString(),
connectionId: connectionId || undefined,
apiKey: apiKey || undefined
}).catch(() => { });
}
return {
success: true,
response: new Response(JSON.stringify(jsonResponse), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
}
})
};
} catch (error) {
console.error("[ChatCore] Responses API SSE→JSON conversion failed:", error);
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
}
} else {
// Chat Completions SSE → Chat Completions JSON
const sseText = await providerResponse.text();
const parsed = parseSSEToOpenAIResponse(sseText, model);
if (parsed) {
if (onRequestSuccess) await onRequestSuccess();
appendRequestLog({ model, provider, connectionId, tokens: parsed.usage, status: "200 OK" }).catch(() => { });
return {
success: true,
response: new Response(JSON.stringify(parsed), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
}
})
};
}
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
}
}
}
// Non-streaming response
if (!stream) {
trackPendingRequest(model, provider, connectionId, false);

View File

@@ -6,6 +6,7 @@
import { handleChatCore } from "./chatCore.js";
import { convertResponsesApiFormat } from "../translator/helpers/responsesApiHelper.js";
import { createResponsesApiTransformStream } from "../transformer/responsesTransformer.js";
import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js";
/**
* Handle /v1/responses request
@@ -24,8 +25,12 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
// Convert Responses API format to Chat Completions format
const convertedBody = convertResponsesApiFormat(body);
// Ensure stream is enabled
convertedBody.stream = true;
// Preserve client's stream preference (matches OpenClaw behavior)
// When the client omits `stream`, default to non-streaming
const clientRequestedStreaming = convertedBody.stream === true;
if (convertedBody.stream === undefined) {
convertedBody.stream = false;
}
// Call chat core handler
const result = await handleChatCore({
@@ -46,26 +51,52 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
const response = result.response;
const contentType = response.headers.get("Content-Type") || "";
// If not SSE or error, return as-is
if (!contentType.includes("text/event-stream") || response.status !== 200) {
return result;
// Case 1: Client wants non-streaming, but got SSE (provider forced it, e.g., Codex)
if (!clientRequestedStreaming && contentType.includes("text/event-stream")) {
try {
const jsonResponse = await convertResponsesStreamToJson(response.body);
return {
success: true,
response: new Response(JSON.stringify(jsonResponse), {
status: 200,
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Access-Control-Allow-Origin": "*"
}
})
};
} catch (error) {
console.error("[Responses API] Stream-to-JSON conversion failed:", error);
return {
success: false,
status: 500,
error: "Failed to convert streaming response to JSON"
};
}
}
// Transform SSE stream to Responses API format (no logging in worker)
const transformStream = createResponsesApiTransformStream(null);
const transformedBody = response.body.pipeThrough(transformStream);
// Case 2: Client wants streaming, got SSE - transform it
if (clientRequestedStreaming && contentType.includes("text/event-stream")) {
const transformStream = createResponsesApiTransformStream(null);
const transformedBody = response.body.pipeThrough(transformStream);
return {
success: true,
response: new Response(transformedBody, {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
})
};
return {
success: true,
response: new Response(transformedBody, {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
})
};
}
// Case 3: Non-SSE response (error or non-streaming from provider) - return as-is
return result;
}

View File

@@ -36,8 +36,9 @@ function buildAnthropicCompatibleUrl(baseUrl) {
// Detect request format from body structure
export function detectFormat(body) {
// OpenAI Responses API: has input[] array instead of messages[]
if (body.input && Array.isArray(body.input)) {
// OpenAI Responses API: has input (array or string) instead of messages[]
// The Responses API accepts both input as array and input as a plain string
if (body.input && (Array.isArray(body.input) || typeof body.input === "string") && !body.messages) {
return "openai-responses";
}

View File

@@ -0,0 +1,103 @@
/**
* Stream-to-JSON Converter
* Converts Responses API SSE stream to single JSON response
* Used when client requests non-streaming but provider forces streaming (e.g., Codex)
*/
/**
* Convert Responses API SSE stream to single JSON response
* @param {ReadableStream} stream - SSE stream from provider
* @returns {Promise<Object>} Final JSON response in Responses API format
*/
export async function convertResponsesStreamToJson(stream) {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
let responseId = "";
let output = [];
let created = Math.floor(Date.now() / 1000);
let status = "in_progress";
let usage = { input_tokens: 0, output_tokens: 0, total_tokens: 0 };
// Map of output_index -> item (for ordered output array)
const items = new Map();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Split by double newline (SSE event separator)
const messages = buffer.split("\n\n");
buffer = messages.pop() || ""; // Keep incomplete message in buffer
for (const msg of messages) {
if (!msg.trim()) continue;
// Parse SSE event
const eventMatch = msg.match(/^event:\s*(.+)$/m);
const dataMatch = msg.match(/^data:\s*(.+)$/m);
if (!eventMatch || !dataMatch) continue;
const eventType = eventMatch[1].trim();
const dataStr = dataMatch[1].trim();
if (dataStr === "[DONE]") continue;
let parsed;
try {
parsed = JSON.parse(dataStr);
} catch {
// Skip malformed JSON
continue;
}
// Handle different event types
if (eventType === "response.created") {
responseId = parsed.response?.id || responseId;
created = parsed.response?.created_at || created;
}
else if (eventType === "response.output_item.done") {
const idx = parsed.output_index ?? 0;
items.set(idx, parsed.item);
}
else if (eventType === "response.completed") {
status = "completed";
if (parsed.response?.usage) {
usage.input_tokens = parsed.response.usage.input_tokens || 0;
usage.output_tokens = parsed.response.usage.output_tokens || 0;
usage.total_tokens = parsed.response.usage.total_tokens || 0;
}
}
else if (eventType === "response.failed") {
status = "failed";
}
}
}
} finally {
reader.releaseLock();
}
// Build output array from accumulated items (ordered by index)
const maxIndex = items.size > 0 ? Math.max(...items.keys()) : -1;
for (let i = 0; i <= maxIndex; i++) {
output.push(items.get(i) || {
type: "message",
content: [],
role: "assistant"
});
}
return {
id: responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`,
object: "response",
created_at: created,
status: status || "completed",
output,
usage
};
}

View File

@@ -1,3 +1,18 @@
/**
 * Normalize Responses API input to array format.
 * Strings are wrapped in a single user message item; arrays pass through
 * unchanged; anything else yields null.
 * @param {string|Array} input - raw input from Responses API body
 * @returns {Array|null} normalized array of message items, or null if invalid
 */
export function normalizeResponsesInput(input) {
  if (Array.isArray(input)) {
    return input;
  }
  if (typeof input !== "string") {
    return null;
  }
  // Whitespace-only strings are replaced with a placeholder so the provider
  // never receives an empty message.
  const text = input.trim().length > 0 ? input : "...";
  return [
    { type: "message", role: "user", content: [{ type: "input_text", text }] }
  ];
}
/**
* Convert OpenAI Responses API format to standard chat completions format
* Responses API uses: { input: [...], instructions: "..." }
@@ -19,7 +34,10 @@ export function convertResponsesApiFormat(body) {
let pendingToolCalls = [];
let pendingToolResults = [];
for (const item of body.input) {
const inputItems = normalizeResponsesInput(body.input);
if (!inputItems) return body;
for (const item of inputItems) {
// Determine item type - Droid CLI sends role-based items without 'type' field
// Fallback: if no type but has role property, treat as message
const itemType = item.type || (item.role ? "message" : null);

View File

@@ -6,6 +6,7 @@
*/
import { register } from "../index.js";
import { FORMATS } from "../formats.js";
import { normalizeResponsesInput } from "../helpers/responsesApiHelper.js";
/**
* Convert OpenAI Responses API request to OpenAI Chat Completions format
@@ -25,7 +26,10 @@ export function openaiResponsesToOpenAIRequest(model, body, stream, credentials)
let currentAssistantMsg = null;
let pendingToolResults = [];
for (const item of body.input) {
const inputItems = normalizeResponsesInput(body.input);
if (!inputItems) return body;
for (const item of inputItems) {
// Determine item type - Droid CLI sends role-based items without 'type' field
// Fallback: if no type but has role property, treat as message
const itemType = item.type || (item.role ? "message" : null);
@@ -234,4 +238,3 @@ export function openaiToOpenAIResponsesRequest(model, body, stream, credentials)
// Register both directions
register(FORMATS.OPENAI_RESPONSES, FORMATS.OPENAI, openaiResponsesToOpenAIRequest, null);
register(FORMATS.OPENAI, FORMATS.OPENAI_RESPONSES, openaiToOpenAIResponsesRequest, null);