fix(e2e): share gateway websocket request handling

This commit is contained in:
Vincent Koc
2026-06-04 08:23:31 +02:00
parent 88b27c378d
commit d0f05d98d2
5 changed files with 265 additions and 377 deletions

View File

@@ -1,173 +1 @@
import { randomUUID } from "node:crypto";
import WebSocket from "ws";
export type GatewayReqFrame = { type: "req"; id: string; method: string; params?: unknown };
export type GatewayResFrame = {
type: "res";
id: string;
ok: boolean;
payload?: unknown;
error?: unknown;
};
export type GatewayEventFrame = { type: "event"; event: string; seq?: number; payload?: unknown };
export type GatewayFrame =
| GatewayReqFrame
| GatewayResFrame
| GatewayEventFrame
| { type: string; [key: string]: unknown };
export function createArgReader(argv = process.argv.slice(2)) {
const get = (flag: string) => {
const idx = argv.indexOf(flag);
if (idx !== -1 && idx + 1 < argv.length) {
return argv[idx + 1];
}
return undefined;
};
const has = (flag: string) => argv.includes(flag);
return { argv, get, has };
}
export function resolveGatewayUrl(urlRaw: string): URL {
const url = new URL(urlRaw.includes("://") ? urlRaw : `wss://${urlRaw}`);
if (!url.port) {
url.port = url.protocol === "wss:" ? "443" : "80";
}
return url;
}
function toText(data: WebSocket.RawData): string {
if (typeof data === "string") {
return data;
}
if (data instanceof ArrayBuffer) {
return Buffer.from(data).toString("utf8");
}
if (Array.isArray(data)) {
return Buffer.concat(data.map((chunk) => Buffer.from(chunk))).toString("utf8");
}
return Buffer.from(data as Buffer).toString("utf8");
}
export function createGatewayWsClient(params: {
url: string;
handshakeTimeoutMs?: number;
openTimeoutMs?: number;
onEvent?: (evt: GatewayEventFrame) => void;
}) {
const ws = new WebSocket(params.url, { handshakeTimeout: params.handshakeTimeoutMs ?? 8000 });
const pending = new Map<
string,
{
resolve: (res: GatewayResFrame) => void;
reject: (err: Error) => void;
timeout: ReturnType<typeof setTimeout>;
}
>();
const rejectPending = (error: Error) => {
for (const waiter of pending.values()) {
clearTimeout(waiter.timeout);
waiter.reject(error);
}
pending.clear();
};
const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) =>
new Promise<GatewayResFrame>((resolve, reject) => {
if (ws.readyState !== WebSocket.OPEN) {
reject(new Error(`gateway websocket is not open for ${method}`));
return;
}
const id = randomUUID();
const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj };
const timeout = setTimeout(() => {
pending.delete(id);
reject(new Error(`timeout waiting for ${method}`));
}, timeoutMs);
pending.set(id, { resolve, reject, timeout });
try {
ws.send(JSON.stringify(frame), (err) => {
if (!err) {
return;
}
const waiter = pending.get(id);
if (!waiter) {
return;
}
pending.delete(id);
clearTimeout(waiter.timeout);
waiter.reject(err instanceof Error ? err : new Error(String(err)));
});
} catch (err) {
pending.delete(id);
clearTimeout(timeout);
reject(err instanceof Error ? err : new Error(String(err)));
}
});
const waitOpen = () =>
new Promise<void>((resolve, reject) => {
const cleanup = () => {
clearTimeout(t);
ws.off("open", onOpen);
ws.off("error", onError);
};
const onOpen = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
cleanup();
reject(err instanceof Error ? err : new Error(String(err)));
};
const t = setTimeout(() => {
cleanup();
ws.terminate();
reject(new Error("ws open timeout"));
}, params.openTimeoutMs ?? 8000);
ws.once("open", onOpen);
ws.once("error", onError);
});
ws.on("message", (data) => {
const text = toText(data);
let frame: GatewayFrame | null;
try {
frame = JSON.parse(text) as GatewayFrame;
} catch {
return;
}
if (!frame || typeof frame !== "object" || !("type" in frame)) {
return;
}
if (frame.type === "res") {
const res = frame as GatewayResFrame;
const waiter = pending.get(res.id);
if (waiter) {
pending.delete(res.id);
clearTimeout(waiter.timeout);
waiter.resolve(res);
}
return;
}
if (frame.type === "event") {
const evt = frame as GatewayEventFrame;
params.onEvent?.(evt);
}
});
ws.on("close", (code, reason) => {
const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : "";
rejectPending(new Error(`gateway websocket closed (${code})${suffix}`));
});
ws.on("error", (err) => {
rejectPending(err instanceof Error ? err : new Error(String(err)));
});
const close = () => {
rejectPending(new Error("gateway websocket client closed"));
ws.close();
};
return { ws, request, waitOpen, close };
}
export * from "../lib/gateway-ws-client.ts";

View File

@@ -1,10 +1,9 @@
import { createHash, randomBytes, randomUUID } from "node:crypto";
import { setTimeout as delay } from "node:timers/promises";
import { WebSocket } from "ws";
import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js";
import { renderBitmapTextPngBase64 } from "../../../../test/helpers/live-image-probe.ts";
import { createGatewayWsClient } from "../../../lib/gateway-ws-client.ts";
import { resolveGatewaySuccessPayload } from "../gateway-frame-payload.mjs";
import { waitForWebSocketOpen } from "../websocket-open.mjs";
import { createJsonlRequestTailer } from "./jsonl-request-tail.mjs";
import { readPositiveIntEnv } from "./limits.mjs";
@@ -49,77 +48,26 @@ async function waitFor(label, predicate, timeoutMs) {
throw new Error(`timeout waiting for ${label}`);
}
function wsDataToString(data) {
if (typeof data === "string") {
return data;
}
if (Buffer.isBuffer(data)) {
return data.toString("utf8");
}
if (Array.isArray(data)) {
return Buffer.concat(data).toString("utf8");
}
return Buffer.from(data).toString("utf8");
}
async function connectGateway() {
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
await waitForWebSocketOpen(ws, 45_000, "gateway ws open timeout");
const pending = new Map();
ws.on("message", (data) => {
let frame;
try {
frame = JSON.parse(wsDataToString(data));
} catch {
return;
}
if (frame?.type === "event" && typeof frame.event === "string") {
return;
}
if (frame?.type !== "res" || typeof frame.id !== "string") {
return;
}
const match = pending.get(frame.id);
if (!match) {
return;
}
pending.delete(frame.id);
if (frame.ok === true) {
match.resolve(resolveGatewaySuccessPayload(frame));
return;
}
match.reject(new Error(frame.error?.message ?? "gateway request failed"));
});
ws.once("close", (code, reason) => {
const error = new Error(`gateway closed (${code}): ${wsDataToString(reason)}`);
for (const entry of pending.values()) {
entry.reject(error);
}
pending.clear();
const gatewayClient = createGatewayWsClient({
handshakeTimeoutMs: 45_000,
openTimeoutMs: 45_000,
openTimeoutMessage: "gateway ws open timeout",
url: `ws://127.0.0.1:${port}`,
});
await gatewayClient.waitOpen();
function request(method, params, opts = {}) {
const id = randomUUID();
async function request(method, params, opts = {}) {
const timeoutMs = opts.timeoutMs ?? 60_000;
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
pending.delete(id);
reject(new Error(`gateway request timeout: ${method}`));
}, timeoutMs);
timer.unref?.();
pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
resolve(value);
},
reject: (error) => {
clearTimeout(timer);
reject(toLintErrorObject(error, "Non-Error rejection"));
},
});
ws.send(JSON.stringify({ type: "req", id, method, params: params ?? {} }));
});
const response = await gatewayClient.request(method, params ?? {}, timeoutMs);
if (response.ok) {
return resolveGatewaySuccessPayload(response);
}
throw new Error(
response.error && typeof response.error === "object" && "message" in response.error
? String(response.error.message)
: "gateway request failed",
);
}
await request(
@@ -146,18 +94,7 @@ async function connectGateway() {
return {
request,
async close() {
if (ws.readyState === WebSocket.CLOSED) {
return;
}
await new Promise((resolve) => {
const timer = setTimeout(resolve, 2_000);
timer.unref?.();
ws.once("close", () => {
clearTimeout(timer);
resolve();
});
ws.close();
});
gatewayClient.close();
},
};
}
@@ -237,17 +174,3 @@ try {
} finally {
await gateway.close();
}
function toLintErrorObject(value, fallbackMessage) {
if (value instanceof Error) {
return value;
}
if (typeof value === "string") {
return new Error(value);
}
const error = new Error(fallbackMessage, { cause: value });
if ((typeof value === "object" && value !== null) || typeof value === "function") {
Object.assign(error, value);
}
return error;
}

View File

@@ -1,22 +1,19 @@
// Shared MCP-channel Docker E2E harness helpers.
// The mounted test harness imports packaged dist modules so bridge assertions run
// against the OpenClaw npm tarball installed in the functional image.
import { randomUUID } from "node:crypto";
import process from "node:process";
import { setTimeout as delay } from "node:timers/promises";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { WebSocket } from "ws";
import { z } from "zod";
import { PROTOCOL_VERSION } from "../../dist/gateway/protocol/index.js";
import { formatErrorMessage } from "../../dist/infra/errors.js";
import { rawDataToString } from "../../dist/infra/ws.js";
import { readStringValue } from "../../dist/normalization-core/string-coerce.js";
import { createGatewayWsClient, type GatewayEventFrame } from "../lib/gateway-ws-client.ts";
import { resolveGatewaySuccessPayload } from "./lib/gateway-frame-payload.mjs";
import { readMcpChannelLimits } from "./mcp-channel-limits.ts";
import { createMcpClientTempState, type McpClientTempState } from "./mcp-client-temp-state.ts";
import { connectMcpWithTimeout } from "./mcp-connect-timeout.ts";
import { resolveGatewaySuccessPayload } from "./lib/gateway-frame-payload.mjs";
import { waitForWebSocketOpen } from "./mcp-websocket-open.ts";
export const ClaudeChannelNotificationSchema = z.object({
method: z.literal("notifications/claude/channel"),
@@ -136,116 +133,43 @@ async function connectGatewayOnce(params: {
url: string;
token: string;
}): Promise<GatewayRpcClient> {
const ws = new WebSocket(params.url);
await waitForWebSocketOpen(ws, GATEWAY_WS_OPEN_TIMEOUT_MS, "gateway ws open timeout");
const pending = new Map<
string,
{
resolve: (value: unknown) => void;
reject: (error: Error) => void;
}
>();
const requestedScopes = ["operator.read", "operator.write", "operator.pairing", "operator.admin"];
const events: Array<{ event: string; payload: Record<string, unknown> }> = [];
ws.on("message", (data) => {
let frame: unknown;
try {
frame = JSON.parse(rawDataToString(data));
} catch {
return;
}
if (!frame || typeof frame !== "object") {
return;
}
const typed = frame as {
type?: unknown;
event?: unknown;
payload?: unknown;
id?: unknown;
ok?: unknown;
result?: unknown;
error?: { message?: unknown } | null;
};
if (typed.type === "event" && typeof typed.event === "string") {
const gatewayClient = createGatewayWsClient({
handshakeTimeoutMs: GATEWAY_WS_OPEN_TIMEOUT_MS,
onEvent(event: GatewayEventFrame) {
pushBounded(
events,
{
event: typed.event,
event: event.event,
payload:
typed.payload && typeof typed.payload === "object"
? (typed.payload as Record<string, unknown>)
event.payload && typeof event.payload === "object"
? (event.payload as Record<string, unknown>)
: {},
},
GATEWAY_EVENT_RETAIN_LIMIT,
);
return;
}
if (typed.type !== "res" || typeof typed.id !== "string") {
return;
}
const match = pending.get(typed.id);
if (!match) {
return;
}
pending.delete(typed.id);
if (typed.ok === true) {
match.resolve(resolveGatewaySuccessPayload(typed));
return;
}
match.reject(
new Error(
typed.error && typeof typed.error.message === "string"
? typed.error.message
: "gateway request failed",
),
);
});
ws.once("close", (code, reason) => {
const error = new Error(`gateway closed (${code}): ${rawDataToString(reason)}`);
for (const entry of pending.values()) {
entry.reject(error);
}
pending.clear();
},
openTimeoutMs: GATEWAY_WS_OPEN_TIMEOUT_MS,
openTimeoutMessage: "gateway ws open timeout",
url: params.url,
});
await gatewayClient.waitOpen();
const sendGatewayRequest = <T = unknown>(
method: string,
requestParams: unknown,
timeoutMs: number,
): Promise<T> => {
const id = randomUUID();
const frame = JSON.stringify({
type: "req",
id,
method,
params: requestParams ?? {},
});
return new Promise<T>((resolve, reject) => {
const timeout = setTimeout(() => {
pending.delete(id);
reject(new Error(`gateway request timeout: ${method}`));
}, timeoutMs);
timeout.unref?.();
pending.set(id, {
resolve: (value) => {
clearTimeout(timeout);
resolve(value as T);
},
reject: (error) => {
clearTimeout(timeout);
reject(error);
},
});
try {
ws.send(frame);
} catch (error) {
clearTimeout(timeout);
pending.delete(id);
reject(error instanceof Error ? error : new Error(String(error)));
return gatewayClient.request(method, requestParams ?? {}, timeoutMs).then((response) => {
if (response.ok) {
return resolveGatewaySuccessPayload(response) as T;
}
throw new Error(
response.error && typeof response.error === "object" && "message" in response.error
? String(response.error.message)
: "gateway request failed",
);
});
};
@@ -281,18 +205,7 @@ async function connectGatewayOnce(params: {
},
events,
async close() {
if (ws.readyState === WebSocket.CLOSED) {
return;
}
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 2_000);
timeout.unref?.();
ws.once("close", () => {
clearTimeout(timeout);
resolve();
});
ws.close();
});
gatewayClient.close();
},
};
}
@@ -304,6 +217,7 @@ function isRetryableGatewayConnectError(error: Error): boolean {
message.includes("gateway connect timeout") ||
message.includes("closed before open") ||
message.includes("gateway closed") ||
message.includes("gateway websocket closed") ||
message.includes("econnrefused") ||
message.includes("socket hang up")
);

View File

@@ -0,0 +1,181 @@
import { randomUUID } from "node:crypto";
import WebSocket from "ws";
export type GatewayReqFrame = { type: "req"; id: string; method: string; params?: unknown };
export type GatewayResFrame = {
type: "res";
id: string;
ok: boolean;
payload?: unknown;
error?: unknown;
};
export type GatewayEventFrame = { type: "event"; event: string; seq?: number; payload?: unknown };
export type GatewayFrame =
| GatewayReqFrame
| GatewayResFrame
| GatewayEventFrame
| { type: string; [key: string]: unknown };
export function createArgReader(argv = process.argv.slice(2)) {
const get = (flag: string) => {
const idx = argv.indexOf(flag);
if (idx !== -1 && idx + 1 < argv.length) {
return argv[idx + 1];
}
return undefined;
};
const has = (flag: string) => argv.includes(flag);
return { argv, get, has };
}
export function resolveGatewayUrl(urlRaw: string): URL {
const url = new URL(urlRaw.includes("://") ? urlRaw : `wss://${urlRaw}`);
if (!url.port) {
url.port = url.protocol === "wss:" ? "443" : "80";
}
return url;
}
function toText(data: WebSocket.RawData): string {
if (typeof data === "string") {
return data;
}
if (data instanceof ArrayBuffer) {
return Buffer.from(data).toString("utf8");
}
if (Array.isArray(data)) {
return Buffer.concat(data.map((chunk) => Buffer.from(chunk))).toString("utf8");
}
return Buffer.from(data as Buffer).toString("utf8");
}
export function createGatewayWsClient(params: {
url: string;
handshakeTimeoutMs?: number;
openTimeoutMs?: number;
openTimeoutMessage?: string;
onEvent?: (evt: GatewayEventFrame) => void;
}) {
const ws = new WebSocket(params.url, { handshakeTimeout: params.handshakeTimeoutMs ?? 8000 });
const pending = new Map<
string,
{
resolve: (res: GatewayResFrame) => void;
reject: (err: Error) => void;
timeout: ReturnType<typeof setTimeout>;
}
>();
const rejectPending = (error: Error) => {
for (const waiter of pending.values()) {
clearTimeout(waiter.timeout);
waiter.reject(error);
}
pending.clear();
};
const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) =>
new Promise<GatewayResFrame>((resolve, reject) => {
if (ws.readyState !== WebSocket.OPEN) {
reject(new Error(`gateway websocket is not open for ${method}`));
return;
}
const id = randomUUID();
const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj };
const timeout = setTimeout(() => {
pending.delete(id);
reject(new Error(`timeout waiting for ${method}`));
}, timeoutMs);
pending.set(id, { resolve, reject, timeout });
try {
ws.send(JSON.stringify(frame), (err) => {
if (!err) {
return;
}
const waiter = pending.get(id);
if (!waiter) {
return;
}
pending.delete(id);
clearTimeout(waiter.timeout);
waiter.reject(err instanceof Error ? err : new Error(String(err)));
});
} catch (err) {
pending.delete(id);
clearTimeout(timeout);
reject(err instanceof Error ? err : new Error(String(err)));
}
});
const waitOpen = () =>
new Promise<void>((resolve, reject) => {
const cleanup = () => {
clearTimeout(t);
ws.off("open", onOpen);
ws.off("error", onError);
ws.off("close", onClose);
};
const onOpen = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
cleanup();
reject(err instanceof Error ? err : new Error(String(err)));
};
const onClose = (code: number, reason: Buffer) => {
cleanup();
const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : "";
reject(new Error(`closed before open (${code})${suffix}`));
};
const t = setTimeout(() => {
cleanup();
ws.terminate();
reject(new Error(params.openTimeoutMessage ?? "ws open timeout"));
}, params.openTimeoutMs ?? 8000);
ws.once("open", onOpen);
ws.once("error", onError);
ws.once("close", onClose);
});
ws.on("message", (data) => {
const text = toText(data);
let frame: GatewayFrame | null;
try {
frame = JSON.parse(text) as GatewayFrame;
} catch {
return;
}
if (!frame || typeof frame !== "object" || !("type" in frame)) {
return;
}
if (frame.type === "res") {
const res = frame as GatewayResFrame;
const waiter = pending.get(res.id);
if (waiter) {
pending.delete(res.id);
clearTimeout(waiter.timeout);
waiter.resolve(res);
}
return;
}
if (frame.type === "event") {
const evt = frame as GatewayEventFrame;
params.onEvent?.(evt);
}
});
ws.on("close", (code, reason) => {
const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : "";
rejectPending(new Error(`gateway websocket closed (${code})${suffix}`));
});
ws.on("error", (err) => {
rejectPending(err instanceof Error ? err : new Error(String(err)));
});
const close = () => {
rejectPending(new Error("gateway websocket client closed"));
ws.close();
};
return { ws, request, waitOpen, close };
}

View File

@@ -101,6 +101,33 @@ describe("createGatewayWsClient", () => {
client.close();
});
it("rejects pending RPC requests when the socket errors", async () => {
const url = await listen(() => {});
const client = createGatewayWsClient({ url });
await client.waitOpen();
const pending = client.request("health", {}, 1000);
client.ws.emit("error", new Error("socket exploded"));
await expect(pending).rejects.toThrow("socket exploded");
client.close();
});
it("rejects websocket closes before opening", async () => {
const stalled = await listenStalledUpgrade();
const client = createGatewayWsClient({ openTimeoutMs: 1000, url: stalled.url });
const opened = client.waitOpen();
client.ws.emit("close", 1006, Buffer.from("bye"));
try {
await expect(opened).rejects.toThrow("closed before open (1006): bye");
} finally {
client.close();
await stalled.close();
}
});
it("terminates stalled websocket handshakes after the open timeout", async () => {
const stalled = await listenStalledUpgrade();
const client = createGatewayWsClient({ openTimeoutMs: 5, url: stalled.url });
@@ -112,6 +139,21 @@ describe("createGatewayWsClient", () => {
await stalled.close();
}
});
it("uses caller-specific websocket open timeout messages", async () => {
const stalled = await listenStalledUpgrade();
const client = createGatewayWsClient({
openTimeoutMessage: "gateway ws open timeout",
openTimeoutMs: 5,
url: stalled.url,
});
try {
await expect(client.waitOpen()).rejects.toThrow("gateway ws open timeout");
} finally {
client.close();
await stalled.close();
}
});
});
async function waitFor(condition: () => boolean, timeoutMs = 1_000) {