refactor: share script bounded response reader

This commit is contained in:
Vincent Koc
2026-05-29 22:53:38 +02:00
parent 95f9231136
commit 6fd8cfd5bb
5 changed files with 105 additions and 248 deletions

View File

@@ -6,6 +6,7 @@ import { GoogleGenAI, Modality } from "@google/genai";
import { chromium, type Browser } from "playwright";
import { createServer } from "vite";
import { buildOpenAIRealtimeVoiceProvider } from "../../extensions/openai/realtime-voice-provider.ts";
import { readBoundedResponseText } from "../lib/bounded-response.ts";
import {
parseStrictIntegerOption,
previewForDevToolLog,
@@ -50,95 +51,16 @@ function shortError(error: unknown): string {
return previewForDevToolLog(error instanceof Error ? error.message : String(error), 800);
}
function responseBodyTooLargeError(label: string, maxBytes: number): Error {
return new Error(`${label} response body exceeded ${maxBytes} bytes`);
}
async function readBoundedText(
response: Response,
label: string,
maxBytes = OPENAI_HTTP_RESPONSE_MAX_BYTES,
signal?: AbortSignal,
): Promise<string> {
const contentLength = Number(response.headers.get("content-length") ?? "");
if (Number.isSafeInteger(contentLength) && contentLength > maxBytes) {
await response.body?.cancel().catch(() => undefined);
throw responseBodyTooLargeError(label, maxBytes);
}
if (!response.body) {
return "";
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
const chunks: string[] = [];
let totalBytes = 0;
let canceled = false;
try {
for (;;) {
const { done, value } = await readResponseChunk(reader, label, signal, () => {
canceled = true;
});
if (done) {
const tail = decoder.decode();
if (tail) {
chunks.push(tail);
}
break;
}
totalBytes += value.byteLength;
if (totalBytes > maxBytes) {
canceled = true;
await reader.cancel().catch(() => undefined);
throw responseBodyTooLargeError(label, maxBytes);
}
chunks.push(decoder.decode(value, { stream: true }));
}
} finally {
if (!canceled) {
reader.releaseLock();
}
}
return chunks.join("");
}
async function readResponseChunk(
reader: ReadableStreamDefaultReader<Uint8Array>,
label: string,
signal: AbortSignal | undefined,
markCanceled: () => void,
): Promise<ReadableStreamReadResult<Uint8Array>> {
if (!signal) {
return await reader.read();
}
if (signal.aborted) {
markCanceled();
await reader.cancel().catch(() => undefined);
throw signal.reason instanceof Error ? signal.reason : new Error(`${label} request aborted`);
}
let removeAbortListener: (() => void) | undefined;
const abortPromise = new Promise<ReadableStreamReadResult<Uint8Array>>((_resolve, reject) => {
const onAbort = () => {
markCanceled();
void reader.cancel().catch(() => undefined);
reject(
signal.reason instanceof Error ? signal.reason : new Error(`${label} request aborted`),
);
};
signal.addEventListener("abort", onAbort, { once: true });
removeAbortListener = () => signal.removeEventListener("abort", onAbort);
return await readBoundedResponseText(response, label, maxBytes, {
createTooLargeError: (message) => new Error(message),
signal,
});
try {
return await Promise.race([reader.read(), abortPromise]);
} finally {
removeAbortListener?.();
}
}
async function readBoundedJsonResponse(