refactor: share e2e websocket open helper

This commit is contained in:
Vincent Koc
2026-05-29 19:44:39 +02:00
parent bf3921dab7
commit 1fd5a90894
7 changed files with 60 additions and 138 deletions

View File

@@ -3,9 +3,9 @@ import { setTimeout as delay } from "node:timers/promises";
import { WebSocket } from "ws";
import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js";
import { renderBitmapTextPngBase64 } from "../../../../test/helpers/live-image-probe.ts";
import { waitForWebSocketOpen } from "../websocket-open.mjs";
import { createJsonlRequestTailer } from "./jsonl-request-tail.mjs";
import { readPositiveIntEnv } from "./limits.mjs";
import { waitForWebSocketOpen } from "./open-websocket.mjs";
const port = process.env.PORT;
const token = process.env.OPENCLAW_GATEWAY_TOKEN;

View File

@@ -1,40 +0,0 @@
export function waitForWebSocketOpen(ws, timeoutMs, message = "gateway ws open timeout") {
return new Promise((resolve, reject) => {
let settled = false;
const settle = (fn, value) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
ws.off?.("open", onOpen);
ws.off?.("error", onError);
fn(value);
};
const onOpen = () => settle(resolve);
const onError = (error) => settle(reject, error);
const timer = setTimeout(() => {
const consumeAbortError = () => {};
const removeAbortErrorConsumer = () => {
ws.off?.("error", consumeAbortError);
ws.off?.("close", removeAbortErrorConsumer);
};
try {
ws.off?.("error", onError);
ws.on?.("error", consumeAbortError);
ws.once?.("close", removeAbortErrorConsumer);
ws.terminate?.();
if (typeof ws.terminate !== "function") {
ws.close?.();
}
} finally {
settle(reject, new Error(message));
}
}, timeoutMs);
timer.unref?.();
ws.once("open", onOpen);
ws.once("error", onError);
});
}

View File

@@ -1,7 +1,7 @@
import { WebSocket } from "ws";
import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js";
import { waitForWebSocketOpen } from "../websocket-open.mjs";
import { readGatewayNetworkClientConnectTimeoutMs } from "./limits.mjs";
import { waitForWebSocketOpen } from "./open-websocket.mjs";
const url = process.env.GW_URL;
const token = process.env.GW_TOKEN;

View File

@@ -1,12 +1,10 @@
import { spawnSync } from "node:child_process";
import { EventEmitter } from "node:events";
import { appendFileSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createJsonlRequestTailer } from "../../scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs";
import { readPositiveIntEnv } from "../../scripts/e2e/lib/codex-media-path/limits.mjs";
import { waitForWebSocketOpen } from "../../scripts/e2e/lib/codex-media-path/open-websocket.mjs";
const tempRoots: string[] = [];
const writeConfigPath = path.resolve("scripts/e2e/lib/codex-media-path/write-config.mjs");
@@ -36,23 +34,6 @@ function runWriteConfig(root: string, env: Record<string, string> = {}) {
});
}
class FakeWebSocket extends EventEmitter {
terminated = false;
closed = false;
terminate(): void {
this.terminated = true;
queueMicrotask(() => {
this.emit("error", new Error("socket abort after terminate"));
this.emit("close");
});
}
close(): void {
this.closed = true;
}
}
afterEach(() => {
for (const root of tempRoots.splice(0)) {
rmSync(root, { recursive: true, force: true });
@@ -152,33 +133,3 @@ describe("codex media path JSONL tailer", () => {
expect(tailer.read()).toEqual([{ method: "turn/start" }]);
});
});
describe("codex media path WebSocket open guard", () => {
it("terminates sockets that never open", async () => {
const ws = new FakeWebSocket();
const keepAlive = setTimeout(() => {}, 100);
try {
await expect(waitForWebSocketOpen(ws, 1)).rejects.toThrow("gateway ws open timeout");
} finally {
clearTimeout(keepAlive);
}
expect(ws.terminated).toBe(true);
await new Promise((resolve) => setImmediate(resolve));
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
it("cleans listeners after successful opens", async () => {
const ws = new FakeWebSocket();
const opened = waitForWebSocketOpen(ws, 100);
ws.emit("open");
await expect(opened).resolves.toBeUndefined();
expect(ws.terminated).toBe(false);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
});

View File

@@ -0,0 +1,58 @@
import { EventEmitter } from "node:events";
import { describe, expect, it } from "vitest";
import { waitForWebSocketOpen } from "../../scripts/e2e/lib/websocket-open.mjs";
class FakeWebSocket extends EventEmitter {
terminated = false;
terminate(): void {
this.terminated = true;
queueMicrotask(() => {
this.emit("error", new Error("socket abort after terminate"));
this.emit("close");
});
}
}
describe("E2E WebSocket open guard", () => {
it("consumes abort errors after open timeouts", async () => {
const ws = new FakeWebSocket();
const keepAlive = setTimeout(() => {}, 100);
try {
await expect(waitForWebSocketOpen(ws, 1)).rejects.toThrow("ws open timeout");
} finally {
clearTimeout(keepAlive);
}
await new Promise((resolve) => setImmediate(resolve));
expect(ws.terminated).toBe(true);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
it("uses caller-specific timeout messages", async () => {
const ws = new FakeWebSocket();
const keepAlive = setTimeout(() => {}, 100);
try {
await expect(waitForWebSocketOpen(ws, 1, "gateway ws open timeout")).rejects.toThrow(
"gateway ws open timeout",
);
} finally {
clearTimeout(keepAlive);
}
});
it("cleans listeners after successful opens", async () => {
const ws = new FakeWebSocket();
const opened = waitForWebSocketOpen(ws, 100);
ws.emit("open");
await expect(opened).resolves.toBeUndefined();
expect(ws.terminated).toBe(false);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
});

View File

@@ -1,24 +1,5 @@
import { EventEmitter } from "node:events";
import { describe, expect, it } from "vitest";
import { readGatewayNetworkClientConnectTimeoutMs } from "../../scripts/e2e/lib/gateway-network/limits.mjs";
import { waitForWebSocketOpen } from "../../scripts/e2e/lib/gateway-network/open-websocket.mjs";
class FakeWebSocket extends EventEmitter {
terminated = false;
closed = false;
terminate(): void {
this.terminated = true;
queueMicrotask(() => {
this.emit("error", new Error("socket abort after terminate"));
this.emit("close");
});
}
close(): void {
this.closed = true;
}
}
describe("gateway network WebSocket open guard", () => {
it("rejects loose client timeout env values instead of parsing prefixes", () => {
@@ -52,32 +33,4 @@ describe("gateway network WebSocket open guard", () => {
}),
).toBe(3000);
});
it("consumes abort errors after open timeouts", async () => {
const ws = new FakeWebSocket();
const keepAlive = setTimeout(() => {}, 100);
try {
await expect(waitForWebSocketOpen(ws, 1)).rejects.toThrow("ws open timeout");
} finally {
clearTimeout(keepAlive);
}
await new Promise((resolve) => setImmediate(resolve));
expect(ws.terminated).toBe(true);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
it("cleans listeners after successful opens", async () => {
const ws = new FakeWebSocket();
const opened = waitForWebSocketOpen(ws, 100);
ws.emit("open");
await expect(opened).resolves.toBeUndefined();
expect(ws.terminated).toBe(false);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
});