This commit is contained in:
decolua
2026-01-16 13:09:13 +07:00
parent 6b22b1f490
commit 1d481c2862

View File

@@ -52,15 +52,15 @@ export class KiroExecutor extends BaseExecutor {
// For Kiro, we need to transform the binary EventStream to SSE
// Create a TransformStream to convert binary to SSE text
const transformedResponse = this.transformEventStreamToSSE(response, model);
return { response: transformedResponse, url, headers, transformedBody };
}
/**
* Transform AWS EventStream binary response to SSE text stream
* Using TransformStream instead of ReadableStream.pull() to avoid Workers timeout
*/
transformEventStreamToSSE(response, model) {
const reader = response.body.getReader();
let buffer = new Uint8Array(0);
let chunkIndex = 0;
const responseId = `chatcmpl-${Date.now()}`;
@@ -70,16 +70,172 @@ export class KiroExecutor extends BaseExecutor {
finishEmitted: false,
hasToolCalls: false,
toolCallIndex: 0,
seenToolIds: new Map() // Map toolUseId -> index
seenToolIds: new Map()
};
const stream = new ReadableStream({
async pull(controller) {
try {
const { done, value } = await reader.read();
if (done) {
// Emit finish_reason chunk if not already sent
const transformStream = new TransformStream({
async transform(chunk, controller) {
// Append to buffer
const newBuffer = new Uint8Array(buffer.length + chunk.length);
newBuffer.set(buffer);
newBuffer.set(chunk, buffer.length);
buffer = newBuffer;
// Parse events from buffer
let iterations = 0;
const maxIterations = 1000;
while (buffer.length >= 16 && iterations < maxIterations) {
iterations++;
const view = new DataView(buffer.buffer, buffer.byteOffset);
const totalLength = view.getUint32(0, false);
if (totalLength < 16 || totalLength > buffer.length || buffer.length < totalLength) break;
const eventData = buffer.slice(0, totalLength);
buffer = buffer.slice(totalLength);
const event = parseEventFrame(eventData);
if (!event) continue;
const eventType = event.headers[":event-type"] || "";
// Handle assistantResponseEvent
if (eventType === "assistantResponseEvent" && event.payload?.content) {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: chunkIndex === 0
? { role: "assistant", content: event.payload.content }
: { content: event.payload.content },
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Handle codeEvent
if (eventType === "codeEvent" && event.payload?.content) {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: { content: event.payload.content },
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Handle toolUseEvent
if (eventType === "toolUseEvent" && event.payload) {
state.hasToolCalls = true;
const toolUse = event.payload;
const toolUses = Array.isArray(toolUse) ? toolUse : [toolUse];
for (const singleToolUse of toolUses) {
const toolCallId = singleToolUse.toolUseId || `call_${Date.now()}`;
const toolName = singleToolUse.name || "";
const toolInput = singleToolUse.input;
let toolIndex;
const isNewTool = !state.seenToolIds.has(toolCallId);
if (isNewTool) {
toolIndex = state.toolCallIndex++;
state.seenToolIds.set(toolCallId, toolIndex);
const startChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {
...(chunkIndex === 0 ? { role: "assistant" } : {}),
tool_calls: [{
index: toolIndex,
id: toolCallId,
type: "function",
function: {
name: toolName,
arguments: ""
}
}]
},
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(startChunk)}\n\n`));
} else {
toolIndex = state.seenToolIds.get(toolCallId);
}
if (toolInput !== undefined) {
let argumentsStr;
if (typeof toolInput === 'string') {
argumentsStr = toolInput;
} else if (typeof toolInput === 'object') {
argumentsStr = JSON.stringify(toolInput);
} else {
continue;
}
const argsChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {
tool_calls: [{
index: toolIndex,
function: {
arguments: argumentsStr
}
}]
},
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(argsChunk)}\n\n`));
}
}
}
// Handle messageStopEvent
if (eventType === "messageStopEvent") {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {},
finish_reason: state.hasToolCalls ? "tool_calls" : "stop"
}]
};
state.finishEmitted = true;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Detect end of stream
if ((eventType === "meteringEvent" || eventType === "contextUsageEvent") && !state.endDetected) {
state.endDetected = true;
if (!state.finishEmitted) {
state.finishEmitted = true;
const finishChunk = {
@@ -95,217 +251,41 @@ export class KiroExecutor extends BaseExecutor {
};
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finishChunk)}\n\n`));
}
// Send final done message
controller.enqueue(new TextEncoder().encode("data: [DONE]\n\n"));
controller.close();
return;
}
}
// Append to buffer
const newBuffer = new Uint8Array(buffer.length + value.length);
newBuffer.set(buffer);
newBuffer.set(value, buffer.length);
buffer = newBuffer;
// Parse events from buffer
while (buffer.length >= 16) {
const view = new DataView(buffer.buffer, buffer.byteOffset);
const totalLength = view.getUint32(0, false);
if (totalLength < 16 || buffer.length < totalLength) break;
// Extract event
const eventData = buffer.slice(0, totalLength);
buffer = buffer.slice(totalLength);
// Parse event headers and payload
const event = parseEventFrame(eventData);
if (!event) continue;
const eventType = event.headers[":event-type"] || "";
// Handle assistantResponseEvent
if (eventType === "assistantResponseEvent" && event.payload?.content) {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: chunkIndex === 0
? { role: "assistant", content: event.payload.content }
: { content: event.payload.content },
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Handle codeEvent
if (eventType === "codeEvent" && event.payload?.content) {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: { content: event.payload.content },
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Handle toolUseEvent
if (eventType === "toolUseEvent" && event.payload) {
state.hasToolCalls = true; // Track that we have tool calls
const toolUse = event.payload;
// AWS Kiro sends toolUse as object or array
// If it's an array, process each tool separately
const toolUses = Array.isArray(toolUse) ? toolUse : [toolUse];
for (const singleToolUse of toolUses) {
const toolCallId = singleToolUse.toolUseId || `call_${Date.now()}`;
const toolName = singleToolUse.name || "";
const toolInput = singleToolUse.input; // Can be undefined, string, or object
// Get or assign tool call index
let toolIndex;
const isNewTool = !state.seenToolIds.has(toolCallId);
if (isNewTool) {
// NEW TOOL: Create start chunk
toolIndex = state.toolCallIndex++;
state.seenToolIds.set(toolCallId, toolIndex);
const startChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {
...(chunkIndex === 0 ? { role: "assistant" } : {}),
tool_calls: [{
index: toolIndex,
id: toolCallId,
type: "function",
function: {
name: toolName,
arguments: ""
}
}]
},
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(startChunk)}\n\n`));
} else {
// EXISTING TOOL: Get its index
toolIndex = state.seenToolIds.get(toolCallId);
}
// Emit arguments chunk if input exists
// AWS Kiro streams input as: undefined (first event) → string chunks
if (toolInput !== undefined) {
let argumentsStr;
if (typeof toolInput === 'string') {
// AWS Kiro sends partial JSON as STRING
argumentsStr = toolInput;
} else if (typeof toolInput === 'object') {
// Fallback: if it's an object, stringify it
argumentsStr = JSON.stringify(toolInput);
} else {
// Skip if not string or object
continue;
}
const argsChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {
tool_calls: [{
index: toolIndex,
function: {
arguments: argumentsStr
}
}]
},
finish_reason: null
}]
};
chunkIndex++;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(argsChunk)}\n\n`));
}
}
}
// Handle messageStopEvent
if (eventType === "messageStopEvent") {
const chunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {},
finish_reason: state.hasToolCalls ? "tool_calls" : "stop"
}]
};
state.finishEmitted = true;
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
// Detect end of stream: meteringEvent + contextUsageEvent usually come last
// Kiro doesn't always send messageStopEvent, so we need to detect completion
if ((eventType === "meteringEvent" || eventType === "contextUsageEvent") && !state.endDetected) {
state.endDetected = true;
// Schedule finish chunk emission after a short delay
setTimeout(() => {
if (!state.finishEmitted) {
state.finishEmitted = true;
const finishChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {},
finish_reason: state.hasToolCalls ? "tool_calls" : "stop"
}]
};
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finishChunk)}\n\n`));
}
}, 100); // 100ms delay to check for more events
}
}
} catch (error) {
controller.error(error);
if (iterations >= maxIterations) {
console.warn("[Kiro] Max iterations reached in event parsing");
}
},
cancel() {
reader.cancel();
flush(controller) {
// Emit finish chunk if not already sent
if (!state.finishEmitted) {
state.finishEmitted = true;
const finishChunk = {
id: responseId,
object: "chat.completion.chunk",
created,
model,
choices: [{
index: 0,
delta: {},
finish_reason: state.hasToolCalls ? "tool_calls" : "stop"
}]
};
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finishChunk)}\n\n`));
}
// Send final done message
controller.enqueue(new TextEncoder().encode("data: [DONE]\n\n"));
}
});
// Create new response with SSE headers
return new Response(stream, {
// Pipe response body through transform stream
const transformedStream = response.body.pipeThrough(transformStream);
return new Response(transformedStream, {
status: response.status,
statusText: response.statusText,
headers: {
@@ -342,7 +322,7 @@ function parseEventFrame(data) {
try {
const view = new DataView(data.buffer, data.byteOffset);
const headersLength = view.getUint32(4, false);
// Parse headers
const headers = {};
let offset = 12; // After prelude
@@ -363,7 +343,7 @@ function parseEventFrame(data) {
const valueLen = (data[offset] << 8) | data[offset + 1];
offset += 2;
if (offset + valueLen > data.length) break;
const value = new TextDecoder().decode(data.slice(offset, offset + valueLen));
offset += valueLen;
headers[name] = value;
@@ -375,16 +355,16 @@ function parseEventFrame(data) {
// Parse payload
const payloadStart = 12 + headersLength;
const payloadEnd = data.length - 4; // Exclude message CRC
let payload = null;
if (payloadEnd > payloadStart) {
const payloadStr = new TextDecoder().decode(data.slice(payloadStart, payloadEnd));
// Skip empty or whitespace-only payloads
if (!payloadStr || !payloadStr.trim()) {
return { headers, payload: null };
}
try {
payload = JSON.parse(payloadStr);
} catch (parseError) {