mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(whastapp): bound connection startup waits (#90486)
* fix: add timeout to waitForWaConnection to prevent indefinite hangs If Baileys fails to emit a 'connection.update' event with either 'open' or 'close' status (e.g. due to network issues or internal errors), the waitForWaConnection promise hangs forever, blocking the entire monitor loop. Add a configurable timeout (default 60s) that rejects the promise and cleans up the event listener if no connection state is received in time. The timeout is backward-compatible as an optional parameter with a sensible default. * test: add coverage for waitForWaConnection timeout path - Test that promise rejects with descriptive error after timeout - Test that event listener is cleaned up after timeout - Test that timer is cleared when connection opens before timeout * fix: default timeoutMs to 0 to preserve QR login behavior The 60s default broke the QR login flow in login-qr.ts, which calls waitForWaConnection without a timeout and expects to wait up to 3 minutes while the user scans. Change the default to 0 (wait forever, matching original behavior) and pass the 60s timeout explicitly at the monitor callsite where it's actually needed. * fix: bound whatsapp connection startup waits * fix: align web channel wait contract * fix: retry whatsapp setup timeouts * fix: satisfy whatsapp status lint * fix: preserve whatsapp wait compatibility --------- Co-authored-by: MMMMSSSS8899 <praelovk@gmail.com>
This commit is contained in:
@@ -26,6 +26,7 @@ import {
|
||||
setRuntimeConfigSourceSnapshotMock,
|
||||
startWebAutoReplyMonitor,
|
||||
} from "./auto-reply.test-harness.js";
|
||||
import { waitForWaConnection } from "./session.js";
|
||||
|
||||
type DrainSelectionEntry = {
|
||||
channel: string;
|
||||
@@ -225,6 +226,33 @@ describe("web auto-reply connection", () => {
|
||||
expectErrorContaining(runtime.error, "2/2 attempts");
|
||||
});
|
||||
|
||||
it("retries opening-phase connection wait timeouts through the reconnect policy", async () => {
|
||||
vi.mocked(waitForWaConnection).mockRejectedValueOnce({ output: { statusCode: 408 } });
|
||||
const listenerFactory = vi.fn(async () => createMockWebListener());
|
||||
const sleep = vi.fn(async () => {});
|
||||
const { runtime, controller, run } = startWebAutoReplyMonitor({
|
||||
monitorWebChannelFn: monitorWebChannel as never,
|
||||
listenerFactory,
|
||||
sleep,
|
||||
reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 2, factor: 1.1 },
|
||||
});
|
||||
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(1);
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
controller.abort();
|
||||
await run;
|
||||
|
||||
expect(waitForWaConnection).toHaveBeenCalledTimes(2);
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(1);
|
||||
expect(sleep).toHaveBeenCalled();
|
||||
expectErrorContaining(runtime.error, "status 408");
|
||||
expectErrorContaining(runtime.error, "Retry 1/2");
|
||||
});
|
||||
|
||||
it("keeps post-open Baileys 428 on the reconnect path", async () => {
|
||||
const sleep = vi.fn(async () => {});
|
||||
const scripted = createScriptedWebListenerFactory();
|
||||
|
||||
@@ -32,13 +32,7 @@ import {
|
||||
resolveReconnectPolicy,
|
||||
sleepWithAbort,
|
||||
} from "../reconnect.js";
|
||||
import {
|
||||
formatError,
|
||||
getStatusCode,
|
||||
getWebAuthAgeMs,
|
||||
logoutWeb,
|
||||
readWebSelfId,
|
||||
} from "../session.js";
|
||||
import { formatError, getWebAuthAgeMs, logoutWeb, readWebSelfId } from "../session.js";
|
||||
import { resolveWhatsAppSocketTiming } from "../socket-timing.js";
|
||||
import { getRuntimeConfig, getRuntimeConfigSourceSnapshot } from "./config.runtime.js";
|
||||
import { whatsappHeartbeatLog, whatsappLog } from "./loggers.js";
|
||||
@@ -407,45 +401,73 @@ export async function monitorWebChannel(
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (getStatusCode(error) === 428) {
|
||||
const retryDecision = controller.consumeReconnectAttempt();
|
||||
statusController.noteReconnectAttempts(retryDecision.reconnectAttempts);
|
||||
const setupDecision = controller.resolveSetupErrorDecision(error);
|
||||
if (setupDecision === "aborted") {
|
||||
await controller.shutdown();
|
||||
break;
|
||||
}
|
||||
if (setupDecision) {
|
||||
statusController.noteReconnectAttempts(setupDecision.reconnectAttempts);
|
||||
statusController.noteClose({
|
||||
statusCode: 428,
|
||||
statusCode: setupDecision.normalized.statusCode,
|
||||
error: formatError(error),
|
||||
reconnectAttempts: retryDecision.reconnectAttempts,
|
||||
healthState: retryDecision.healthState,
|
||||
reconnectAttempts: setupDecision.reconnectAttempts,
|
||||
healthState: setupDecision.healthState,
|
||||
});
|
||||
if (retryDecision.action === "stop") {
|
||||
if (setupDecision.action === "stop") {
|
||||
reconnectLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
status: 428,
|
||||
reconnectAttempts: retryDecision.reconnectAttempts,
|
||||
status: setupDecision.normalized.statusLabel,
|
||||
reconnectAttempts: setupDecision.reconnectAttempts,
|
||||
maxAttempts: reconnectPolicy.maxAttempts,
|
||||
},
|
||||
"web reconnect: 428 during opening; max attempts reached",
|
||||
);
|
||||
runtime.error(
|
||||
`WhatsApp Web connection closed during setup (status 428) after ${retryDecision.reconnectAttempts}/${reconnectPolicy.maxAttempts} attempts. Relink with \`${formatCliCommand("openclaw channels login --channel whatsapp")}\` if the issue persists.`,
|
||||
"web reconnect: setup status error; max attempts reached",
|
||||
);
|
||||
if (setupDecision.healthState === "logged-out") {
|
||||
await clearTerminalWebAuthState({
|
||||
account,
|
||||
runtime,
|
||||
statusLabel: setupDecision.normalized.statusLabel,
|
||||
healthState: setupDecision.healthState,
|
||||
log: reconnectLogger,
|
||||
});
|
||||
runtime.error(
|
||||
`WhatsApp session logged out during setup. Run \`${formatCliCommand("openclaw channels login --channel whatsapp")}\` to relink.`,
|
||||
);
|
||||
} else if (setupDecision.healthState === "conflict") {
|
||||
await clearTerminalWebAuthState({
|
||||
account,
|
||||
runtime,
|
||||
statusLabel: setupDecision.normalized.statusLabel,
|
||||
healthState: setupDecision.healthState,
|
||||
log: reconnectLogger,
|
||||
});
|
||||
runtime.error(
|
||||
`WhatsApp Web connection closed during setup (status ${setupDecision.normalized.statusLabel}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel whatsapp")}\`. Stopping web monitoring.`,
|
||||
);
|
||||
} else {
|
||||
runtime.error(
|
||||
`WhatsApp Web connection closed during setup (status ${setupDecision.normalized.statusLabel}) after ${setupDecision.reconnectAttempts}/${reconnectPolicy.maxAttempts} attempts. Relink with \`${formatCliCommand("openclaw channels login --channel whatsapp")}\` if the issue persists.`,
|
||||
);
|
||||
}
|
||||
await controller.shutdown();
|
||||
break;
|
||||
}
|
||||
reconnectLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
status: 428,
|
||||
reconnectAttempts: retryDecision.reconnectAttempts,
|
||||
delayMs: retryDecision.delayMs,
|
||||
status: setupDecision.normalized.statusLabel,
|
||||
reconnectAttempts: setupDecision.reconnectAttempts,
|
||||
delayMs: setupDecision.delayMs,
|
||||
},
|
||||
"web reconnect: 428 during opening; retrying",
|
||||
"web reconnect: setup status error; retrying",
|
||||
);
|
||||
runtime.error(
|
||||
`WhatsApp Web connection closed during setup (status 428). Retry ${retryDecision.reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(retryDecision.delayMs ?? 0)}.`,
|
||||
`WhatsApp Web connection closed during setup (status ${setupDecision.normalized.statusLabel}). Retry ${setupDecision.reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(setupDecision.delayMs ?? 0)}.`,
|
||||
);
|
||||
try {
|
||||
await controller.waitBeforeRetry(retryDecision.delayMs ?? 0);
|
||||
await controller.waitBeforeRetry(setupDecision.delayMs ?? 0);
|
||||
} catch {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
} from "./connection-controller.js";
|
||||
import type { WhatsAppSendKind, WhatsAppSendResult } from "./inbound/send-result.js";
|
||||
import { createWaSocket, waitForWaConnection } from "./session.js";
|
||||
import { DEFAULT_WHATSAPP_SOCKET_TIMING } from "./socket-timing.js";
|
||||
|
||||
vi.mock("./session.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./session.js")>("./session.js");
|
||||
@@ -130,6 +131,9 @@ describe("WhatsAppConnectionController", () => {
|
||||
});
|
||||
|
||||
expect(callOrder).toEqual(["create", "wait-for-connection"]);
|
||||
expect(waitForWaConnectionMock).toHaveBeenCalledWith(expect.anything(), {
|
||||
timeoutMs: DEFAULT_WHATSAPP_SOCKET_TIMING.connectTimeoutMs,
|
||||
});
|
||||
});
|
||||
|
||||
it("restarts login once on status 408 and preserves replacement socket options", async () => {
|
||||
@@ -180,6 +184,8 @@ describe("WhatsAppConnectionController", () => {
|
||||
});
|
||||
expect(onQr).toHaveBeenCalledWith("qr-after-timeout");
|
||||
expect(onSocketReplaced).toHaveBeenCalledWith(replacementSock);
|
||||
expect(waitForConnection).toHaveBeenNthCalledWith(1, initialSock, { timeout: "none" });
|
||||
expect(waitForConnection).toHaveBeenNthCalledWith(2, replacementSock, { timeout: "none" });
|
||||
});
|
||||
|
||||
it("still honors the post-pairing 515 restart after a status 408 recovery", async () => {
|
||||
@@ -213,6 +219,11 @@ describe("WhatsAppConnectionController", () => {
|
||||
});
|
||||
expect(createSocket).toHaveBeenCalledTimes(2);
|
||||
expect(waitForConnection).toHaveBeenCalledTimes(3);
|
||||
expect(waitForConnection).toHaveBeenNthCalledWith(1, initialSock, { timeout: "none" });
|
||||
expect(waitForConnection).toHaveBeenNthCalledWith(2, afterTimeoutSock, { timeout: "none" });
|
||||
expect(waitForConnection).toHaveBeenNthCalledWith(3, afterPairingRestartSock, {
|
||||
timeout: "none",
|
||||
});
|
||||
expect(initialSock.end).toHaveBeenCalledOnce();
|
||||
expect(afterTimeoutSock.end).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { DisconnectReason, type WASocket } from "baileys";
|
||||
import type { WASocket } from "baileys";
|
||||
import { info } from "openclaw/plugin-sdk/runtime-env";
|
||||
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
|
||||
import {
|
||||
@@ -14,11 +14,14 @@ import {
|
||||
logoutWeb,
|
||||
waitForWaConnection,
|
||||
} from "./session.js";
|
||||
import type { WhatsAppSocketTimingOptions } from "./socket-timing.js";
|
||||
import {
|
||||
DEFAULT_WHATSAPP_SOCKET_TIMING,
|
||||
type WhatsAppSocketTimingOptions,
|
||||
} from "./socket-timing.js";
|
||||
|
||||
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
|
||||
const LOGGED_OUT_STATUS = 401;
|
||||
const POST_PAIRING_RESTART_STATUS = 515;
|
||||
const TIMED_OUT_STATUS = DisconnectReason?.timedOut ?? 408;
|
||||
const TIMED_OUT_STATUS = 408;
|
||||
const WHATSAPP_LOGIN_RESTART_MESSAGE =
|
||||
"WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…";
|
||||
const WHATSAPP_LOGIN_TIMEOUT_RESTART_MESSAGE =
|
||||
@@ -228,7 +231,7 @@ export async function waitForWhatsAppLoginResult(params: {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
await wait(currentSock);
|
||||
await wait(currentSock, { timeout: "none" });
|
||||
return {
|
||||
outcome: "connected",
|
||||
restarted: postPairingRestarted || timeoutRestarted,
|
||||
@@ -306,7 +309,7 @@ export class WhatsAppConnectionController {
|
||||
private readonly abortSignal?: AbortSignal;
|
||||
private readonly sleep: (ms: number, signal?: AbortSignal) => Promise<void>;
|
||||
private readonly isNonRetryableStatus: (statusCode: unknown) => boolean;
|
||||
private readonly socketTiming: WhatsAppSocketTimingOptions;
|
||||
private readonly socketTiming: Required<WhatsAppSocketTimingOptions>;
|
||||
private readonly abortPromise?: Promise<"aborted">;
|
||||
private readonly disconnectRetryController = new AbortController();
|
||||
|
||||
@@ -342,7 +345,10 @@ export class WhatsAppConnectionController {
|
||||
this.abortSignal = params.abortSignal;
|
||||
this.sleep = params.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal));
|
||||
this.isNonRetryableStatus = params.isNonRetryableStatus ?? (() => false);
|
||||
this.socketTiming = params.socketTiming ?? {};
|
||||
this.socketTiming = {
|
||||
...DEFAULT_WHATSAPP_SOCKET_TIMING,
|
||||
...params.socketTiming,
|
||||
};
|
||||
this.socketRef = { current: null };
|
||||
this.abortPromise =
|
||||
params.abortSignal &&
|
||||
@@ -445,7 +451,7 @@ export class WhatsAppConnectionController {
|
||||
authDir: this.authDir,
|
||||
...this.socketTiming,
|
||||
});
|
||||
await waitForWaConnection(sock);
|
||||
await waitForWaConnection(sock, { timeoutMs: this.socketTiming.connectTimeoutMs });
|
||||
|
||||
this.socketRef.current = sock;
|
||||
const placeholderListener = {} as ManagedWhatsAppListener;
|
||||
@@ -565,6 +571,19 @@ export class WhatsAppConnectionController {
|
||||
};
|
||||
}
|
||||
|
||||
resolveSetupErrorDecision(error: unknown): WhatsAppConnectionCloseDecision | "aborted" | null {
|
||||
const statusCode = getStatusCode(error);
|
||||
if (typeof statusCode !== "number") {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.resolveCloseDecision({
|
||||
status: statusCode,
|
||||
isLoggedOut: statusCode === LOGGED_OUT_STATUS,
|
||||
error,
|
||||
});
|
||||
}
|
||||
|
||||
consumeReconnectAttempt(): WhatsAppReconnectAttemptDecision {
|
||||
this.reconnectAttempts += 1;
|
||||
if (
|
||||
|
||||
@@ -221,6 +221,7 @@ vi.mock("./session.js", async () => {
|
||||
let monitorWebInbox: typeof import("./inbound.js").monitorWebInbox;
|
||||
let resetWebInboundDedupe: typeof import("./inbound.js").resetWebInboundDedupe;
|
||||
let createWaSocket: typeof import("./session.js").createWaSocket;
|
||||
let waitForWaConnection: typeof import("./session.js").waitForWaConnection;
|
||||
|
||||
async function waitForMessage(onMessage: ReturnType<typeof vi.fn>) {
|
||||
await vi.waitFor(() => expect(onMessage).toHaveBeenCalledTimes(1), {
|
||||
@@ -262,7 +263,7 @@ describe("web inbound media saves with extension", () => {
|
||||
beforeAll(async () => {
|
||||
await fs.rm(HOME, { recursive: true, force: true });
|
||||
({ monitorWebInbox, resetWebInboundDedupe } = await import("./inbound.js"));
|
||||
({ createWaSocket } = await import("./session.js"));
|
||||
({ createWaSocket, waitForWaConnection } = await import("./session.js"));
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
@@ -274,6 +275,30 @@ describe("web inbound media saves with extension", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("closes the socket when connection wait fails before inbox attach", async () => {
|
||||
const error = new Error("connection timeout");
|
||||
vi.mocked(waitForWaConnection).mockRejectedValueOnce(error);
|
||||
|
||||
await expect(
|
||||
monitorWebInbox({
|
||||
cfg: {
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
messages: { messagePrefix: undefined, responsePrefix: undefined },
|
||||
web: { whatsapp: { connectTimeoutMs: 12_345 } },
|
||||
} as never,
|
||||
verbose: false,
|
||||
onMessage: vi.fn(),
|
||||
accountId: "default",
|
||||
authDir: path.join(HOME, "wa-auth"),
|
||||
}),
|
||||
).rejects.toThrow("connection timeout");
|
||||
|
||||
expect(vi.mocked(waitForWaConnection)).toHaveBeenCalledWith(currentMockSocket, {
|
||||
timeoutMs: 12_345,
|
||||
});
|
||||
expect(currentMockSocket?.ws.close).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("stores image extension and keeps document filename", async () => {
|
||||
const onMessage = vi.fn();
|
||||
const listener = await monitorWebInbox({
|
||||
|
||||
@@ -1294,11 +1294,17 @@ export async function attachWebInboxToSocket(
|
||||
}
|
||||
|
||||
export async function monitorWebInbox(options: MonitorWebInboxOptions) {
|
||||
const socketTiming = resolveWhatsAppSocketTiming(options.cfg);
|
||||
const sock = await createWaSocket(false, options.verbose, {
|
||||
authDir: options.authDir,
|
||||
...resolveWhatsAppSocketTiming(options.cfg),
|
||||
...socketTiming,
|
||||
});
|
||||
await waitForWaConnection(sock);
|
||||
try {
|
||||
await waitForWaConnection(sock, { timeoutMs: socketTiming.connectTimeoutMs });
|
||||
} catch (err) {
|
||||
closeInboundMonitorSocket(sock);
|
||||
throw err;
|
||||
}
|
||||
return attachWebInboxToSocket({
|
||||
...options,
|
||||
sock,
|
||||
|
||||
@@ -87,8 +87,7 @@ describe("startWhatsAppQaDriverSession", () => {
|
||||
await session.close();
|
||||
});
|
||||
|
||||
it("clears the connection timeout after a successful connection", async () => {
|
||||
vi.useFakeTimers();
|
||||
it("passes the connection timeout to the shared connection waiter", async () => {
|
||||
const sock = createMockSocket();
|
||||
mocks.createWaSocket.mockResolvedValue(sock);
|
||||
mocks.waitForWaConnection.mockResolvedValue(undefined);
|
||||
@@ -98,30 +97,26 @@ describe("startWhatsAppQaDriverSession", () => {
|
||||
connectionTimeoutMs: 45_000,
|
||||
});
|
||||
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
expect(mocks.waitForWaConnection).toHaveBeenCalledWith(sock, { timeoutMs: 45_000 });
|
||||
|
||||
await session.close();
|
||||
});
|
||||
|
||||
it("closes the socket and removes listeners when connection setup times out", async () => {
|
||||
vi.useFakeTimers();
|
||||
const sock = createMockSocket();
|
||||
const timeoutError = new Error("timed out waiting for WhatsApp QA driver session");
|
||||
mocks.createWaSocket.mockResolvedValue(sock);
|
||||
mocks.waitForWaConnection.mockReturnValue(new Promise(() => {}));
|
||||
mocks.waitForWaConnection.mockRejectedValue(timeoutError);
|
||||
|
||||
const started = startWhatsAppQaDriverSession({
|
||||
authDir: "/tmp/openclaw-whatsapp-auth",
|
||||
connectionTimeoutMs: 10,
|
||||
});
|
||||
const rejection = started.catch((error: unknown) => error);
|
||||
await expect(
|
||||
startWhatsAppQaDriverSession({
|
||||
authDir: "/tmp/openclaw-whatsapp-auth",
|
||||
connectionTimeoutMs: 10,
|
||||
}),
|
||||
).rejects.toThrow("timed out waiting for WhatsApp QA driver session");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
|
||||
const error = await rejection;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toContain("timed out waiting for WhatsApp QA driver session");
|
||||
expect(mocks.waitForWaConnection).toHaveBeenCalledWith(sock, { timeoutMs: 10 });
|
||||
expect(sock.ev.listenerCount("messages.upsert")).toBe(0);
|
||||
expect(sock.end).toHaveBeenCalledOnce();
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -126,27 +126,13 @@ export async function startWhatsAppQaDriverSession(params: {
|
||||
};
|
||||
|
||||
sock.ev.on("messages.upsert", onMessagesUpsert);
|
||||
let connectionTimeout: NodeJS.Timeout | undefined;
|
||||
try {
|
||||
await Promise.race([
|
||||
waitForWaConnection(sock),
|
||||
new Promise<never>((_, reject) => {
|
||||
connectionTimeout = setTimeout(
|
||||
() => reject(new Error("timed out waiting for WhatsApp QA driver session")),
|
||||
params.connectionTimeoutMs ?? 45_000,
|
||||
);
|
||||
connectionTimeout.unref?.();
|
||||
}),
|
||||
]);
|
||||
await waitForWaConnection(sock, { timeoutMs: params.connectionTimeoutMs ?? 45_000 });
|
||||
} catch (error) {
|
||||
closeSessionResources(
|
||||
error instanceof Error ? error : new Error("failed starting WhatsApp QA driver session"),
|
||||
);
|
||||
throw error;
|
||||
} finally {
|
||||
if (connectionTimeout) {
|
||||
clearTimeout(connectionTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
const sendApi = createWebSendApi({
|
||||
|
||||
@@ -508,6 +508,16 @@ describe("web session", () => {
|
||||
});
|
||||
|
||||
it("waits for connection open", async () => {
|
||||
const ev = new EventEmitter();
|
||||
const promise = waitForWaConnection(
|
||||
{ ev } as unknown as ReturnType<typeof baileys.makeWASocket>,
|
||||
{ timeout: "none" },
|
||||
);
|
||||
ev.emit("connection.update", { connection: "open" });
|
||||
await expect(promise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps one-argument callers on the old no-timeout wait policy", async () => {
|
||||
const ev = new EventEmitter();
|
||||
const promise = waitForWaConnection({ ev } as unknown as ReturnType<
|
||||
typeof baileys.makeWASocket
|
||||
@@ -518,9 +528,10 @@ describe("web session", () => {
|
||||
|
||||
it("rejects when connection closes", async () => {
|
||||
const ev = new EventEmitter();
|
||||
const promise = waitForWaConnection({ ev } as unknown as ReturnType<
|
||||
typeof baileys.makeWASocket
|
||||
>);
|
||||
const promise = waitForWaConnection(
|
||||
{ ev } as unknown as ReturnType<typeof baileys.makeWASocket>,
|
||||
{ timeout: "none" },
|
||||
);
|
||||
ev.emit("connection.update", {
|
||||
connection: "close",
|
||||
lastDisconnect: new Error("bye"),
|
||||
@@ -528,6 +539,35 @@ describe("web session", () => {
|
||||
await expect(promise).rejects.toBeInstanceOf(Error);
|
||||
});
|
||||
|
||||
it("rejects after timeout with no connection event", async () => {
|
||||
vi.useFakeTimers();
|
||||
const ev = new EventEmitter();
|
||||
const promise = waitForWaConnection(
|
||||
{ ev } as unknown as ReturnType<typeof baileys.makeWASocket>,
|
||||
{ timeoutMs: 100 },
|
||||
);
|
||||
vi.advanceTimersByTime(100);
|
||||
const error = await promise.catch((err: unknown) => err);
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toContain("timed out after 100ms");
|
||||
expect(error).toMatchObject({ output: { statusCode: 408 } });
|
||||
expect(ev.listenerCount("connection.update")).toBe(0);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("clears timeout when connection opens before timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
const ev = new EventEmitter();
|
||||
const promise = waitForWaConnection(
|
||||
{ ev } as unknown as ReturnType<typeof baileys.makeWASocket>,
|
||||
{ timeoutMs: 5000 },
|
||||
);
|
||||
ev.emit("connection.update", { connection: "open" });
|
||||
await expect(promise).resolves.toBeUndefined();
|
||||
expect(ev.listenerCount("connection.update")).toBe(0);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("logWebSelfId prints cached E.164 when creds exist", () => {
|
||||
const authDir = createTempAuthDir("openclaw-wa-log-self");
|
||||
fsSync.writeFileSync(
|
||||
|
||||
@@ -27,7 +27,6 @@ import {
|
||||
import { renderQrTerminal } from "./qr-terminal.js";
|
||||
import { getStatusCode } from "./session-errors.js";
|
||||
import {
|
||||
DisconnectReason,
|
||||
fetchLatestBaileysVersion,
|
||||
makeCacheableSignalKeyStore,
|
||||
makeWASocket,
|
||||
@@ -63,7 +62,7 @@ export {
|
||||
} from "./creds-persistence.js";
|
||||
export type { CredsQueueWaitResult } from "./creds-persistence.js";
|
||||
|
||||
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
|
||||
const LOGGED_OUT_STATUS = 401;
|
||||
const WHATSAPP_WEBSOCKET_PROXY_TARGET = "https://mmg.whatsapp.net/";
|
||||
const CREDS_FLUSH_TIMEOUT_MESSAGE =
|
||||
"Queued WhatsApp creds save did not finish before auth bootstrap; skipping repair and continuing with primary creds.";
|
||||
@@ -311,21 +310,41 @@ function normalizeEnvProxyValue(value: string | undefined): string | null | unde
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
export async function waitForWaConnection(sock: ReturnType<typeof makeWASocket>) {
|
||||
export type WhatsAppConnectionWaitOptions =
|
||||
| {
|
||||
timeout: "none";
|
||||
}
|
||||
| {
|
||||
timeoutMs: number;
|
||||
};
|
||||
|
||||
export async function waitForWaConnection(
|
||||
sock: ReturnType<typeof makeWASocket>,
|
||||
options: WhatsAppConnectionWaitOptions = { timeout: "none" },
|
||||
) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
type OffCapable = {
|
||||
off?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
};
|
||||
const evWithOff = sock.ev as unknown as OffCapable;
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
const cleanup = () => {
|
||||
evWithOff.off?.("connection.update", handler);
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const handler = (...args: unknown[]) => {
|
||||
const update = (args[0] ?? {}) as Partial<import("baileys").ConnectionState>;
|
||||
if (update.connection === "open") {
|
||||
evWithOff.off?.("connection.update", handler);
|
||||
cleanup();
|
||||
resolve();
|
||||
}
|
||||
if (update.connection === "close") {
|
||||
evWithOff.off?.("connection.update", handler);
|
||||
cleanup();
|
||||
reject(
|
||||
toLintErrorObject(
|
||||
update.lastDisconnect ?? new Error("Connection closed"),
|
||||
@@ -336,6 +355,15 @@ export async function waitForWaConnection(sock: ReturnType<typeof makeWASocket>)
|
||||
};
|
||||
|
||||
sock.ev.on("connection.update", handler);
|
||||
|
||||
if ("timeoutMs" in options) {
|
||||
const timeoutMs = options.timeoutMs;
|
||||
timer = setTimeout(() => {
|
||||
cleanup();
|
||||
reject(createConnectionTimeoutError(timeoutMs));
|
||||
}, timeoutMs);
|
||||
timer.unref?.();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -356,3 +384,13 @@ function toLintErrorObject(value: unknown, fallbackMessage: string): Error {
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
||||
function createConnectionTimeoutError(timeoutMs: number): Error {
|
||||
const error = new Error(`WhatsApp connection timed out after ${timeoutMs}ms`);
|
||||
Object.assign(error, {
|
||||
output: {
|
||||
statusCode: 408,
|
||||
},
|
||||
});
|
||||
return error;
|
||||
}
|
||||
|
||||
@@ -61,4 +61,24 @@ describe("runtime web channel plugin", () => {
|
||||
"web channel plugin runtime is missing export 'resolveDefaultWebAuthDir'",
|
||||
);
|
||||
});
|
||||
|
||||
it("forwards the explicit connection wait policy to the heavy runtime", async () => {
|
||||
const waitForWaConnection = vi.fn().mockResolvedValue(undefined);
|
||||
const sock = { id: "socket" };
|
||||
|
||||
vi.doMock("./runtime-plugin-boundary.js", () => ({
|
||||
loadPluginBoundaryModule: () => ({ waitForWaConnection }),
|
||||
resolvePluginRuntimeModulePath: () => "/tmp/runtime-api.js",
|
||||
resolvePluginRuntimeRecordByEntryBaseNames: () => ({
|
||||
origin: "bundled",
|
||||
source: "test",
|
||||
}),
|
||||
}));
|
||||
|
||||
const { waitForWebChannelConnection } = await import("./runtime-web-channel-plugin.js");
|
||||
|
||||
await waitForWebChannelConnection(sock, { timeoutMs: 12_345 });
|
||||
|
||||
expect(waitForWaConnection).toHaveBeenCalledWith(sock, { timeoutMs: 12_345 });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -26,6 +26,14 @@ type WebChannelPluginRecord = {
|
||||
source: string;
|
||||
};
|
||||
|
||||
type WebChannelConnectionWaitOptions =
|
||||
| {
|
||||
timeout: "none";
|
||||
}
|
||||
| {
|
||||
timeoutMs: number;
|
||||
};
|
||||
|
||||
type WebChannelLightRuntimeModule = {
|
||||
getActiveWebListener: (accountId?: string | null) => unknown;
|
||||
getWebAuthAgeMs: (authDir?: string) => number | null;
|
||||
@@ -101,7 +109,7 @@ type WebChannelHeavyRuntimeModule = {
|
||||
monitorWebChannel: (...args: unknown[]) => Promise<unknown>;
|
||||
monitorWebInbox: (...args: unknown[]) => Promise<unknown>;
|
||||
startWebLoginWithQr: (...args: unknown[]) => Promise<unknown>;
|
||||
waitForWaConnection: (sock: unknown) => Promise<void>;
|
||||
waitForWaConnection: (sock: unknown, options: WebChannelConnectionWaitOptions) => Promise<void>;
|
||||
waitForWebLogin: (...args: unknown[]) => Promise<unknown>;
|
||||
extractMediaPlaceholder: (...args: unknown[]) => unknown;
|
||||
extractText: (...args: unknown[]) => unknown;
|
||||
|
||||
Reference in New Issue
Block a user