diff --git a/scripts/e2e/lib/gateway-network/client.mjs b/scripts/e2e/lib/gateway-network/client.mjs index ecfb48516434..0ff784dd063b 100644 --- a/scripts/e2e/lib/gateway-network/client.mjs +++ b/scripts/e2e/lib/gateway-network/client.mjs @@ -2,6 +2,7 @@ import { WebSocket } from "ws"; import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js"; import { waitForWebSocketOpen } from "../websocket-open.mjs"; import { readGatewayNetworkClientConnectTimeoutMs } from "./limits.mjs"; +import { onceFrame } from "./ws-frames.mjs"; const url = process.env.GW_URL; const token = process.env.GW_TOKEN; @@ -23,25 +24,6 @@ async function openSocket(timeoutMs = 10_000) { return ws; } -function onceFrame(ws, filter, timeoutMs = 10_000) { - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - ws.off("message", handler); - reject(new Error("timeout")); - }, timeoutMs); - const handler = (data) => { - const obj = JSON.parse(String(data)); - if (!filter(obj)) { - return; - } - clearTimeout(timer); - ws.off("message", handler); - resolve(obj); - }; - ws.on("message", handler); - }); -} - function responseError(method, response) { const message = response.error?.message ?? "unknown"; return new Error(`${method} failed: ${message}`); @@ -50,6 +32,7 @@ function responseError(method, response) { function isRetryableStartupError(message) { return ( message.includes("gateway starting") || + message.includes("closed before frame") || message.includes("closed before open") || message.includes("ws open timeout") || message.includes("ECONNREFUSED") || diff --git a/scripts/e2e/lib/gateway-network/ws-frames.mjs b/scripts/e2e/lib/gateway-network/ws-frames.mjs new file mode 100644 index 000000000000..95c6b579f85f --- /dev/null +++ b/scripts/e2e/lib/gateway-network/ws-frames.mjs @@ -0,0 +1,66 @@ +function formatCloseValue(value) { + if (value === undefined || value === null) { + return ""; + } + if (typeof value === "string") { + return value; + } + if (typeof value === "number" || typeof value === "boolean" || typeof value === "bigint") { + return value.toString(); + } + if (value instanceof Uint8Array) { + return Buffer.from(value).toString(); + } + return JSON.stringify(value) ?? ""; +} + +export function onceFrame(ws, filter, timeoutMs = 10_000) { + return new Promise((resolve, reject) => { + let settled = false; + + const cleanup = () => { + clearTimeout(timer); + ws.off?.("message", onMessage); + ws.off?.("error", onError); + ws.off?.("close", onClose); + }; + const settle = (fn, value) => { + if (settled) { + return; + } + settled = true; + cleanup(); + fn(value); + }; + const onMessage = (data) => { + let obj; + try { + obj = JSON.parse(String(data)); + if (!filter(obj)) { + return; + } + } catch (error) { + settle(reject, error instanceof Error ? error : new Error(String(error))); + return; + } + settle(resolve, obj); + }; + const onError = (error) => + settle(reject, error instanceof Error ? error : new Error(String(error))); + const onClose = (code, reason) => { + const closeDetails = [formatCloseValue(code), formatCloseValue(reason)] + .filter(Boolean) + .join(" "); + const suffix = closeDetails ? `: ${closeDetails}` : ""; + settle(reject, new Error(`closed before frame${suffix}`)); + }; + const timer = setTimeout(() => { + settle(reject, new Error("timeout")); + }, timeoutMs); + timer.unref?.(); + + ws.on("message", onMessage); + ws.once("error", onError); + ws.once("close", onClose); + }); +} diff --git a/test/scripts/gateway-network-client.test.ts b/test/scripts/gateway-network-client.test.ts index 3810ceb30bb8..81e414a0d8ce 100644 --- a/test/scripts/gateway-network-client.test.ts +++ b/test/scripts/gateway-network-client.test.ts @@ -1,6 +1,8 @@ +import { EventEmitter } from "node:events"; import { readFileSync } from "node:fs"; import { describe, expect, it } from "vitest"; import { readGatewayNetworkClientConnectTimeoutMs } from "../../scripts/e2e/lib/gateway-network/limits.mjs"; +import { onceFrame } from "../../scripts/e2e/lib/gateway-network/ws-frames.mjs"; describe("gateway network WebSocket open guard", () => { it("rejects loose client timeout env values instead of parsing prefixes", () => { @@ -35,6 +37,54 @@ describe("gateway network WebSocket open guard", () => { ).toBe(3000); }); + it("resolves matching frames and ignores unrelated frames", async () => { + const ws = new EventEmitter(); + const frame = onceFrame(ws, (message) => message?.id === "target", 1000); + + ws.emit("message", JSON.stringify({ id: "noise" })); + ws.emit("message", JSON.stringify({ id: "target", ok: true })); + + await expect(frame).resolves.toEqual({ id: "target", ok: true }); + }); + + it("times out when no matching frame arrives", async () => { + const ws = new EventEmitter(); + const frame = onceFrame(ws, () => false, 10); + + ws.emit("message", JSON.stringify({ id: "noise" })); + + await expect(frame).rejects.toThrow("timeout"); + }); + + it("rejects frame waits immediately when the socket closes", async () => { + const ws = new EventEmitter(); + const startedAt = Date.now(); + const frame = onceFrame(ws, () => false, 1000); + + ws.emit("close", 1006, Buffer.from("bye")); + + await expect(frame).rejects.toThrow("closed before frame: 1006 bye"); + expect(Date.now() - startedAt).toBeLessThan(250); + }); + + it("rejects frame waits immediately on socket errors", async () => { + const ws = new EventEmitter(); + const frame = onceFrame(ws, () => false, 1000); + + ws.emit("error", new Error("socket exploded")); + + await expect(frame).rejects.toThrow("socket exploded"); + }); + + it("rejects invalid JSON frames instead of crashing the process", async () => { + const ws = new EventEmitter(); + const frame = onceFrame(ws, () => false, 1000); + + ws.emit("message", "{nope"); + + await expect(frame).rejects.toThrow(); + }); + it("proves health after the authenticated connect handshake", () => { const client = readFileSync("scripts/e2e/lib/gateway-network/client.mjs", "utf8"); const connectIndex = client.indexOf('method: "connect"'); @@ -44,5 +94,6 @@ describe("gateway network WebSocket open guard", () => { expect(healthIndex).toBeGreaterThan(connectIndex); expect(client).toContain('responseError("health", healthRes)'); expect(client).toContain('message.includes("closed before open")'); + expect(client).toContain('message.includes("closed before frame")'); }); });