diff --git a/scripts/dev/gateway-ws-client.ts b/scripts/dev/gateway-ws-client.ts index 9c6ff5522257..9537a5883e21 100644 --- a/scripts/dev/gateway-ws-client.ts +++ b/scripts/dev/gateway-ws-client.ts @@ -1,173 +1 @@ -import { randomUUID } from "node:crypto"; -import WebSocket from "ws"; - -export type GatewayReqFrame = { type: "req"; id: string; method: string; params?: unknown }; -export type GatewayResFrame = { - type: "res"; - id: string; - ok: boolean; - payload?: unknown; - error?: unknown; -}; -export type GatewayEventFrame = { type: "event"; event: string; seq?: number; payload?: unknown }; -export type GatewayFrame = - | GatewayReqFrame - | GatewayResFrame - | GatewayEventFrame - | { type: string; [key: string]: unknown }; - -export function createArgReader(argv = process.argv.slice(2)) { - const get = (flag: string) => { - const idx = argv.indexOf(flag); - if (idx !== -1 && idx + 1 < argv.length) { - return argv[idx + 1]; - } - return undefined; - }; - const has = (flag: string) => argv.includes(flag); - return { argv, get, has }; -} - -export function resolveGatewayUrl(urlRaw: string): URL { - const url = new URL(urlRaw.includes("://") ? urlRaw : `wss://${urlRaw}`); - if (!url.port) { - url.port = url.protocol === "wss:" ? "443" : "80"; - } - return url; -} - -function toText(data: WebSocket.RawData): string { - if (typeof data === "string") { - return data; - } - if (data instanceof ArrayBuffer) { - return Buffer.from(data).toString("utf8"); - } - if (Array.isArray(data)) { - return Buffer.concat(data.map((chunk) => Buffer.from(chunk))).toString("utf8"); - } - return Buffer.from(data as Buffer).toString("utf8"); -} - -export function createGatewayWsClient(params: { - url: string; - handshakeTimeoutMs?: number; - openTimeoutMs?: number; - onEvent?: (evt: GatewayEventFrame) => void; -}) { - const ws = new WebSocket(params.url, { handshakeTimeout: params.handshakeTimeoutMs ?? 8000 }); - const pending = new Map< - string, - { - resolve: (res: GatewayResFrame) => void; - reject: (err: Error) => void; - timeout: ReturnType; - } - >(); - - const rejectPending = (error: Error) => { - for (const waiter of pending.values()) { - clearTimeout(waiter.timeout); - waiter.reject(error); - } - pending.clear(); - }; - - const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) => - new Promise((resolve, reject) => { - if (ws.readyState !== WebSocket.OPEN) { - reject(new Error(`gateway websocket is not open for ${method}`)); - return; - } - const id = randomUUID(); - const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj }; - const timeout = setTimeout(() => { - pending.delete(id); - reject(new Error(`timeout waiting for ${method}`)); - }, timeoutMs); - pending.set(id, { resolve, reject, timeout }); - try { - ws.send(JSON.stringify(frame), (err) => { - if (!err) { - return; - } - const waiter = pending.get(id); - if (!waiter) { - return; - } - pending.delete(id); - clearTimeout(waiter.timeout); - waiter.reject(err instanceof Error ? err : new Error(String(err))); - }); - } catch (err) { - pending.delete(id); - clearTimeout(timeout); - reject(err instanceof Error ? err : new Error(String(err))); - } - }); - - const waitOpen = () => - new Promise((resolve, reject) => { - const cleanup = () => { - clearTimeout(t); - ws.off("open", onOpen); - ws.off("error", onError); - }; - const onOpen = () => { - cleanup(); - resolve(); - }; - const onError = (err: Error) => { - cleanup(); - reject(err instanceof Error ? err : new Error(String(err))); - }; - const t = setTimeout(() => { - cleanup(); - ws.terminate(); - reject(new Error("ws open timeout")); - }, params.openTimeoutMs ?? 8000); - ws.once("open", onOpen); - ws.once("error", onError); - }); - - ws.on("message", (data) => { - const text = toText(data); - let frame: GatewayFrame | null; - try { - frame = JSON.parse(text) as GatewayFrame; - } catch { - return; - } - if (!frame || typeof frame !== "object" || !("type" in frame)) { - return; - } - if (frame.type === "res") { - const res = frame as GatewayResFrame; - const waiter = pending.get(res.id); - if (waiter) { - pending.delete(res.id); - clearTimeout(waiter.timeout); - waiter.resolve(res); - } - return; - } - if (frame.type === "event") { - const evt = frame as GatewayEventFrame; - params.onEvent?.(evt); - } - }); - ws.on("close", (code, reason) => { - const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : ""; - rejectPending(new Error(`gateway websocket closed (${code})${suffix}`)); - }); - ws.on("error", (err) => { - rejectPending(err instanceof Error ? err : new Error(String(err))); - }); - - const close = () => { - rejectPending(new Error("gateway websocket client closed")); - ws.close(); - }; - - return { ws, request, waitOpen, close }; -} +export * from "../lib/gateway-ws-client.ts"; diff --git a/scripts/e2e/lib/codex-media-path/client.mjs b/scripts/e2e/lib/codex-media-path/client.mjs index 02834c39fbe4..19bc2ad01d31 100644 --- a/scripts/e2e/lib/codex-media-path/client.mjs +++ b/scripts/e2e/lib/codex-media-path/client.mjs @@ -1,10 +1,9 @@ import { createHash, randomBytes, randomUUID } from "node:crypto"; import { setTimeout as delay } from "node:timers/promises"; -import { WebSocket } from "ws"; import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js"; import { renderBitmapTextPngBase64 } from "../../../../test/helpers/live-image-probe.ts"; +import { createGatewayWsClient } from "../../../lib/gateway-ws-client.ts"; import { resolveGatewaySuccessPayload } from "../gateway-frame-payload.mjs"; -import { waitForWebSocketOpen } from "../websocket-open.mjs"; import { createJsonlRequestTailer } from "./jsonl-request-tail.mjs"; import { readPositiveIntEnv } from "./limits.mjs"; @@ -49,77 +48,26 @@ async function waitFor(label, predicate, timeoutMs) { throw new Error(`timeout waiting for ${label}`); } -function wsDataToString(data) { - if (typeof data === "string") { - return data; - } - if (Buffer.isBuffer(data)) { - return data.toString("utf8"); - } - if (Array.isArray(data)) { - return Buffer.concat(data).toString("utf8"); - } - return Buffer.from(data).toString("utf8"); -} - async function connectGateway() { - const ws = new WebSocket(`ws://127.0.0.1:${port}`); - await waitForWebSocketOpen(ws, 45_000, "gateway ws open timeout"); - - const pending = new Map(); - ws.on("message", (data) => { - let frame; - try { - frame = JSON.parse(wsDataToString(data)); - } catch { - return; - } - if (frame?.type === "event" && typeof frame.event === "string") { - return; - } - if (frame?.type !== "res" || typeof frame.id !== "string") { - return; - } - const match = pending.get(frame.id); - if (!match) { - return; - } - pending.delete(frame.id); - if (frame.ok === true) { - match.resolve(resolveGatewaySuccessPayload(frame)); - return; - } - match.reject(new Error(frame.error?.message ?? "gateway request failed")); - }); - ws.once("close", (code, reason) => { - const error = new Error(`gateway closed (${code}): ${wsDataToString(reason)}`); - for (const entry of pending.values()) { - entry.reject(error); - } - pending.clear(); + const gatewayClient = createGatewayWsClient({ + handshakeTimeoutMs: 45_000, + openTimeoutMs: 45_000, + openTimeoutMessage: "gateway ws open timeout", + url: `ws://127.0.0.1:${port}`, }); + await gatewayClient.waitOpen(); - function request(method, params, opts = {}) { - const id = randomUUID(); + async function request(method, params, opts = {}) { const timeoutMs = opts.timeoutMs ?? 60_000; - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - pending.delete(id); - reject(new Error(`gateway request timeout: ${method}`)); - }, timeoutMs); - timer.unref?.(); - pending.set(id, { - resolve: (value) => { - clearTimeout(timer); - resolve(value); - }, - reject: (error) => { - clearTimeout(timer); - reject(toLintErrorObject(error, "Non-Error rejection")); - }, - }); - ws.send(JSON.stringify({ type: "req", id, method, params: params ?? {} })); - }); + const response = await gatewayClient.request(method, params ?? {}, timeoutMs); + if (response.ok) { + return resolveGatewaySuccessPayload(response); + } + throw new Error( + response.error && typeof response.error === "object" && "message" in response.error + ? String(response.error.message) + : "gateway request failed", + ); } await request( @@ -146,18 +94,7 @@ async function connectGateway() { return { request, async close() { - if (ws.readyState === WebSocket.CLOSED) { - return; - } - await new Promise((resolve) => { - const timer = setTimeout(resolve, 2_000); - timer.unref?.(); - ws.once("close", () => { - clearTimeout(timer); - resolve(); - }); - ws.close(); - }); + gatewayClient.close(); }, }; } @@ -237,17 +174,3 @@ try { } finally { await gateway.close(); } - -function toLintErrorObject(value, fallbackMessage) { - if (value instanceof Error) { - return value; - } - if (typeof value === "string") { - return new Error(value); - } - const error = new Error(fallbackMessage, { cause: value }); - if ((typeof value === "object" && value !== null) || typeof value === "function") { - Object.assign(error, value); - } - return error; -} diff --git a/scripts/e2e/mcp-channels-harness.ts b/scripts/e2e/mcp-channels-harness.ts index a624b5848600..330194e9eeda 100644 --- a/scripts/e2e/mcp-channels-harness.ts +++ b/scripts/e2e/mcp-channels-harness.ts @@ -1,22 +1,19 @@ // Shared MCP-channel Docker E2E harness helpers. // The mounted test harness imports packaged dist modules so bridge assertions run // against the OpenClaw npm tarball installed in the functional image. -import { randomUUID } from "node:crypto"; import process from "node:process"; import { setTimeout as delay } from "node:timers/promises"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; -import { WebSocket } from "ws"; import { z } from "zod"; import { PROTOCOL_VERSION } from "../../dist/gateway/protocol/index.js"; import { formatErrorMessage } from "../../dist/infra/errors.js"; -import { rawDataToString } from "../../dist/infra/ws.js"; import { readStringValue } from "../../dist/normalization-core/string-coerce.js"; +import { createGatewayWsClient, type GatewayEventFrame } from "../lib/gateway-ws-client.ts"; +import { resolveGatewaySuccessPayload } from "./lib/gateway-frame-payload.mjs"; import { readMcpChannelLimits } from "./mcp-channel-limits.ts"; import { createMcpClientTempState, type McpClientTempState } from "./mcp-client-temp-state.ts"; import { connectMcpWithTimeout } from "./mcp-connect-timeout.ts"; -import { resolveGatewaySuccessPayload } from "./lib/gateway-frame-payload.mjs"; -import { waitForWebSocketOpen } from "./mcp-websocket-open.ts"; export const ClaudeChannelNotificationSchema = z.object({ method: z.literal("notifications/claude/channel"), @@ -136,116 +133,43 @@ async function connectGatewayOnce(params: { url: string; token: string; }): Promise { - const ws = new WebSocket(params.url); - await waitForWebSocketOpen(ws, GATEWAY_WS_OPEN_TIMEOUT_MS, "gateway ws open timeout"); - - const pending = new Map< - string, - { - resolve: (value: unknown) => void; - reject: (error: Error) => void; - } - >(); const requestedScopes = ["operator.read", "operator.write", "operator.pairing", "operator.admin"]; const events: Array<{ event: string; payload: Record }> = []; - - ws.on("message", (data) => { - let frame: unknown; - try { - frame = JSON.parse(rawDataToString(data)); - } catch { - return; - } - if (!frame || typeof frame !== "object") { - return; - } - const typed = frame as { - type?: unknown; - event?: unknown; - payload?: unknown; - id?: unknown; - ok?: unknown; - result?: unknown; - error?: { message?: unknown } | null; - }; - if (typed.type === "event" && typeof typed.event === "string") { + const gatewayClient = createGatewayWsClient({ + handshakeTimeoutMs: GATEWAY_WS_OPEN_TIMEOUT_MS, + onEvent(event: GatewayEventFrame) { pushBounded( events, { - event: typed.event, + event: event.event, payload: - typed.payload && typeof typed.payload === "object" - ? (typed.payload as Record) + event.payload && typeof event.payload === "object" + ? (event.payload as Record) : {}, }, GATEWAY_EVENT_RETAIN_LIMIT, ); - return; - } - if (typed.type !== "res" || typeof typed.id !== "string") { - return; - } - const match = pending.get(typed.id); - if (!match) { - return; - } - pending.delete(typed.id); - if (typed.ok === true) { - match.resolve(resolveGatewaySuccessPayload(typed)); - return; - } - match.reject( - new Error( - typed.error && typeof typed.error.message === "string" - ? typed.error.message - : "gateway request failed", - ), - ); - }); - - ws.once("close", (code, reason) => { - const error = new Error(`gateway closed (${code}): ${rawDataToString(reason)}`); - for (const entry of pending.values()) { - entry.reject(error); - } - pending.clear(); + }, + openTimeoutMs: GATEWAY_WS_OPEN_TIMEOUT_MS, + openTimeoutMessage: "gateway ws open timeout", + url: params.url, }); + await gatewayClient.waitOpen(); const sendGatewayRequest = ( method: string, requestParams: unknown, timeoutMs: number, ): Promise => { - const id = randomUUID(); - const frame = JSON.stringify({ - type: "req", - id, - method, - params: requestParams ?? {}, - }); - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - pending.delete(id); - reject(new Error(`gateway request timeout: ${method}`)); - }, timeoutMs); - timeout.unref?.(); - pending.set(id, { - resolve: (value) => { - clearTimeout(timeout); - resolve(value as T); - }, - reject: (error) => { - clearTimeout(timeout); - reject(error); - }, - }); - try { - ws.send(frame); - } catch (error) { - clearTimeout(timeout); - pending.delete(id); - reject(error instanceof Error ? error : new Error(String(error))); + return gatewayClient.request(method, requestParams ?? {}, timeoutMs).then((response) => { + if (response.ok) { + return resolveGatewaySuccessPayload(response) as T; } + throw new Error( + response.error && typeof response.error === "object" && "message" in response.error + ? String(response.error.message) + : "gateway request failed", + ); }); }; @@ -281,18 +205,7 @@ async function connectGatewayOnce(params: { }, events, async close() { - if (ws.readyState === WebSocket.CLOSED) { - return; - } - await new Promise((resolve) => { - const timeout = setTimeout(resolve, 2_000); - timeout.unref?.(); - ws.once("close", () => { - clearTimeout(timeout); - resolve(); - }); - ws.close(); - }); + gatewayClient.close(); }, }; } @@ -304,6 +217,7 @@ function isRetryableGatewayConnectError(error: Error): boolean { message.includes("gateway connect timeout") || message.includes("closed before open") || message.includes("gateway closed") || + message.includes("gateway websocket closed") || message.includes("econnrefused") || message.includes("socket hang up") ); diff --git a/scripts/lib/gateway-ws-client.ts b/scripts/lib/gateway-ws-client.ts new file mode 100644 index 000000000000..965066d5fc1d --- /dev/null +++ b/scripts/lib/gateway-ws-client.ts @@ -0,0 +1,181 @@ +import { randomUUID } from "node:crypto"; +import WebSocket from "ws"; + +export type GatewayReqFrame = { type: "req"; id: string; method: string; params?: unknown }; +export type GatewayResFrame = { + type: "res"; + id: string; + ok: boolean; + payload?: unknown; + error?: unknown; +}; +export type GatewayEventFrame = { type: "event"; event: string; seq?: number; payload?: unknown }; +export type GatewayFrame = + | GatewayReqFrame + | GatewayResFrame + | GatewayEventFrame + | { type: string; [key: string]: unknown }; + +export function createArgReader(argv = process.argv.slice(2)) { + const get = (flag: string) => { + const idx = argv.indexOf(flag); + if (idx !== -1 && idx + 1 < argv.length) { + return argv[idx + 1]; + } + return undefined; + }; + const has = (flag: string) => argv.includes(flag); + return { argv, get, has }; +} + +export function resolveGatewayUrl(urlRaw: string): URL { + const url = new URL(urlRaw.includes("://") ? urlRaw : `wss://${urlRaw}`); + if (!url.port) { + url.port = url.protocol === "wss:" ? "443" : "80"; + } + return url; +} + +function toText(data: WebSocket.RawData): string { + if (typeof data === "string") { + return data; + } + if (data instanceof ArrayBuffer) { + return Buffer.from(data).toString("utf8"); + } + if (Array.isArray(data)) { + return Buffer.concat(data.map((chunk) => Buffer.from(chunk))).toString("utf8"); + } + return Buffer.from(data as Buffer).toString("utf8"); +} + +export function createGatewayWsClient(params: { + url: string; + handshakeTimeoutMs?: number; + openTimeoutMs?: number; + openTimeoutMessage?: string; + onEvent?: (evt: GatewayEventFrame) => void; +}) { + const ws = new WebSocket(params.url, { handshakeTimeout: params.handshakeTimeoutMs ?? 8000 }); + const pending = new Map< + string, + { + resolve: (res: GatewayResFrame) => void; + reject: (err: Error) => void; + timeout: ReturnType; + } + >(); + + const rejectPending = (error: Error) => { + for (const waiter of pending.values()) { + clearTimeout(waiter.timeout); + waiter.reject(error); + } + pending.clear(); + }; + + const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) => + new Promise((resolve, reject) => { + if (ws.readyState !== WebSocket.OPEN) { + reject(new Error(`gateway websocket is not open for ${method}`)); + return; + } + const id = randomUUID(); + const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj }; + const timeout = setTimeout(() => { + pending.delete(id); + reject(new Error(`timeout waiting for ${method}`)); + }, timeoutMs); + pending.set(id, { resolve, reject, timeout }); + try { + ws.send(JSON.stringify(frame), (err) => { + if (!err) { + return; + } + const waiter = pending.get(id); + if (!waiter) { + return; + } + pending.delete(id); + clearTimeout(waiter.timeout); + waiter.reject(err instanceof Error ? err : new Error(String(err))); + }); + } catch (err) { + pending.delete(id); + clearTimeout(timeout); + reject(err instanceof Error ? err : new Error(String(err))); + } + }); + + const waitOpen = () => + new Promise((resolve, reject) => { + const cleanup = () => { + clearTimeout(t); + ws.off("open", onOpen); + ws.off("error", onError); + ws.off("close", onClose); + }; + const onOpen = () => { + cleanup(); + resolve(); + }; + const onError = (err: Error) => { + cleanup(); + reject(err instanceof Error ? err : new Error(String(err))); + }; + const onClose = (code: number, reason: Buffer) => { + cleanup(); + const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : ""; + reject(new Error(`closed before open (${code})${suffix}`)); + }; + const t = setTimeout(() => { + cleanup(); + ws.terminate(); + reject(new Error(params.openTimeoutMessage ?? "ws open timeout")); + }, params.openTimeoutMs ?? 8000); + ws.once("open", onOpen); + ws.once("error", onError); + ws.once("close", onClose); + }); + + ws.on("message", (data) => { + const text = toText(data); + let frame: GatewayFrame | null; + try { + frame = JSON.parse(text) as GatewayFrame; + } catch { + return; + } + if (!frame || typeof frame !== "object" || !("type" in frame)) { + return; + } + if (frame.type === "res") { + const res = frame as GatewayResFrame; + const waiter = pending.get(res.id); + if (waiter) { + pending.delete(res.id); + clearTimeout(waiter.timeout); + waiter.resolve(res); + } + return; + } + if (frame.type === "event") { + const evt = frame as GatewayEventFrame; + params.onEvent?.(evt); + } + }); + ws.on("close", (code, reason) => { + const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : ""; + rejectPending(new Error(`gateway websocket closed (${code})${suffix}`)); + }); + ws.on("error", (err) => { + rejectPending(err instanceof Error ? err : new Error(String(err))); + }); + + const close = () => { + rejectPending(new Error("gateway websocket client closed")); + ws.close(); + }; + + return { ws, request, waitOpen, close }; +} diff --git a/test/scripts/gateway-ws-client.test.ts b/test/scripts/gateway-ws-client.test.ts index e026c04bf0a8..b54bfffce602 100644 --- a/test/scripts/gateway-ws-client.test.ts +++ b/test/scripts/gateway-ws-client.test.ts @@ -101,6 +101,33 @@ describe("createGatewayWsClient", () => { client.close(); }); + it("rejects pending RPC requests when the socket errors", async () => { + const url = await listen(() => {}); + const client = createGatewayWsClient({ url }); + await client.waitOpen(); + + const pending = client.request("health", {}, 1000); + client.ws.emit("error", new Error("socket exploded")); + + await expect(pending).rejects.toThrow("socket exploded"); + client.close(); + }); + + it("rejects websocket closes before opening", async () => { + const stalled = await listenStalledUpgrade(); + const client = createGatewayWsClient({ openTimeoutMs: 1000, url: stalled.url }); + const opened = client.waitOpen(); + + client.ws.emit("close", 1006, Buffer.from("bye")); + + try { + await expect(opened).rejects.toThrow("closed before open (1006): bye"); + } finally { + client.close(); + await stalled.close(); + } + }); + it("terminates stalled websocket handshakes after the open timeout", async () => { const stalled = await listenStalledUpgrade(); const client = createGatewayWsClient({ openTimeoutMs: 5, url: stalled.url }); @@ -112,6 +139,21 @@ describe("createGatewayWsClient", () => { await stalled.close(); } }); + + it("uses caller-specific websocket open timeout messages", async () => { + const stalled = await listenStalledUpgrade(); + const client = createGatewayWsClient({ + openTimeoutMessage: "gateway ws open timeout", + openTimeoutMs: 5, + url: stalled.url, + }); + try { + await expect(client.waitOpen()).rejects.toThrow("gateway ws open timeout"); + } finally { + client.close(); + await stalled.close(); + } + }); }); async function waitFor(condition: () => boolean, timeoutMs = 1_000) {