mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(scripts): stop rpc rtt process groups
This commit is contained in:
@@ -54,6 +54,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Release/CI/E2E: bound release candidate GitHub API calls so stalled network requests cannot wedge workflow and artifact polling.
|
||||
- Release/CI/E2E: bound Discord smoke API calls in cross-OS release checks so host-side round trips cannot hang on stalled fetches.
|
||||
- Release/CI/E2E: bound RPC RTT gateway readiness probes so a half-open local HTTP response cannot stall cleanup past the readiness deadline.
|
||||
- Release/CI/E2E: stop RPC RTT gateway process groups so pnpm wrapper children cannot survive measurement cleanup.
|
||||
- Scripts/UI: stop descendant processes from wrapped non-interactive commands when `run-with-env` receives shutdown signals.
|
||||
- Release/CI/E2E: write multi-node update Docker artifacts to unique per-run directories by default so parallel runs cannot overwrite evidence.
|
||||
- Release/CI/E2E: write package Telegram Docker artifacts to unique per-run directories by default so parallel live/RTT runs cannot overwrite evidence.
|
||||
|
||||
@@ -11,6 +11,7 @@ const DEFAULT_METHODS = ["health", "config.get"];
|
||||
const DEFAULT_ITERATIONS = 10;
|
||||
export const READY_TIMEOUT_MS = 120_000;
|
||||
export const READY_PROBE_TIMEOUT_MS = 1_000;
|
||||
const PARENT_TERMINATION_SIGNALS = ["SIGHUP", "SIGINT", "SIGTERM"];
|
||||
const IS_DIRECT_RUN =
|
||||
typeof process.argv[1] === "string" &&
|
||||
path.resolve(process.argv[1]) === fileURLToPath(import.meta.url);
|
||||
@@ -143,20 +144,117 @@ export async function waitForGatewayReady({
|
||||
throw new Error(`gateway did not become ready after ${readyTimeoutMs}ms\n${stderr.slice(-4000)}`);
|
||||
}
|
||||
|
||||
async function stopGateway(child) {
|
||||
if (child.exitCode !== null || child.signalCode !== null) {
|
||||
function isProcessAlreadyExitedError(error) {
|
||||
return error && typeof error === "object" && error.code === "ESRCH";
|
||||
}
|
||||
|
||||
function defaultKillProcess(pid, signal) {
|
||||
return process.kill(pid, signal);
|
||||
}
|
||||
|
||||
async function defaultOpen(filePath, flags) {
|
||||
return await fs.open(filePath, flags);
|
||||
}
|
||||
|
||||
export function signalGatewayProcess(child, signal, killProcess = defaultKillProcess) {
|
||||
if (process.platform !== "win32" && typeof child.pid === "number") {
|
||||
try {
|
||||
killProcess(-child.pid, signal);
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (isProcessAlreadyExitedError(error)) {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
try {
|
||||
return child.kill(signal);
|
||||
} catch (error) {
|
||||
if (isProcessAlreadyExitedError(error)) {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export function isGatewayProcessAlive(child, killProcess = defaultKillProcess) {
|
||||
if (process.platform !== "win32" && typeof child.pid === "number") {
|
||||
try {
|
||||
killProcess(-child.pid, 0);
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (isProcessAlreadyExitedError(error)) {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
return child.exitCode === null && child.signalCode === null;
|
||||
}
|
||||
|
||||
function signalGatewayProcessForParentExit(child, signal, killProcess) {
|
||||
try {
|
||||
signalGatewayProcess(child, signal, killProcess);
|
||||
} catch {
|
||||
// Parent shutdown cleanup is best effort; the original signal should win.
|
||||
}
|
||||
}
|
||||
|
||||
export function installGatewayParentCleanup(
|
||||
child,
|
||||
{ killProcess = defaultKillProcess, processLike = process } = {},
|
||||
) {
|
||||
const signalHandlers = new Map();
|
||||
const cleanup = (signal) => {
|
||||
signalGatewayProcessForParentExit(child, signal, killProcess);
|
||||
if (process.platform !== "win32") {
|
||||
signalGatewayProcessForParentExit(child, "SIGKILL", killProcess);
|
||||
}
|
||||
};
|
||||
const exitHandler = () => {
|
||||
cleanup("SIGTERM");
|
||||
};
|
||||
const removeHandlers = () => {
|
||||
processLike.off?.("exit", exitHandler);
|
||||
for (const [signal, handler] of signalHandlers) {
|
||||
processLike.off?.(signal, handler);
|
||||
}
|
||||
signalHandlers.clear();
|
||||
};
|
||||
processLike.once("exit", exitHandler);
|
||||
for (const signal of PARENT_TERMINATION_SIGNALS) {
|
||||
const handler = () => {
|
||||
cleanup(signal);
|
||||
removeHandlers();
|
||||
processLike.kill?.(processLike.pid, signal);
|
||||
};
|
||||
signalHandlers.set(signal, handler);
|
||||
processLike.once(signal, handler);
|
||||
}
|
||||
return removeHandlers;
|
||||
}
|
||||
|
||||
async function waitForGatewayExit(child, timeoutMs, killProcess = defaultKillProcess) {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() <= deadline) {
|
||||
if (!isGatewayProcessAlive(child, killProcess)) {
|
||||
return true;
|
||||
}
|
||||
await sleep(Math.min(25, Math.max(0, deadline - Date.now())));
|
||||
}
|
||||
return !isGatewayProcessAlive(child, killProcess);
|
||||
}
|
||||
|
||||
export async function stopGateway(child, options = {}) {
|
||||
if (!isGatewayProcessAlive(child, options.killProcess)) {
|
||||
return;
|
||||
}
|
||||
child.kill("SIGTERM");
|
||||
const exited = await new Promise((resolve) => {
|
||||
const timer = setTimeout(() => resolve(false), 1_500);
|
||||
child.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve(true);
|
||||
});
|
||||
});
|
||||
if (!exited && child.exitCode === null && child.signalCode === null) {
|
||||
child.kill("SIGKILL");
|
||||
const killGraceMs = Math.max(0, options.killGraceMs ?? 1_500);
|
||||
signalGatewayProcess(child, "SIGTERM", options.killProcess);
|
||||
const exited = await waitForGatewayExit(child, killGraceMs, options.killProcess);
|
||||
if (!exited) {
|
||||
signalGatewayProcess(child, "SIGKILL", options.killProcess);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,7 +269,7 @@ async function closeFileHandles(handles) {
|
||||
export async function startGateway({
|
||||
configPath,
|
||||
env = process.env,
|
||||
openImpl = fs.open,
|
||||
openImpl = defaultOpen,
|
||||
port,
|
||||
repoRoot,
|
||||
spawnImpl = spawn,
|
||||
@@ -207,6 +305,7 @@ export async function startGateway({
|
||||
],
|
||||
{
|
||||
cwd: repoRoot,
|
||||
detached: process.platform !== "win32",
|
||||
env: {
|
||||
...env,
|
||||
HOME: path.join(tempRoot, "home"),
|
||||
@@ -417,6 +516,7 @@ async function main() {
|
||||
const stdoutPath = path.join(tempRoot, "gateway.stdout.log");
|
||||
const stderrPath = path.join(tempRoot, "gateway.stderr.log");
|
||||
let gatewayChild;
|
||||
let removeGatewayParentCleanup = () => {};
|
||||
let status = "fail";
|
||||
let details = "";
|
||||
let measurement;
|
||||
@@ -449,6 +549,7 @@ async function main() {
|
||||
tempRoot,
|
||||
token,
|
||||
});
|
||||
removeGatewayParentCleanup = installGatewayParentCleanup(gatewayChild);
|
||||
await waitForGatewayReady({ child: gatewayChild, port, stderrPath });
|
||||
|
||||
const requireFromOpenClaw = createRequire(path.join(repoRoot, "package.json"));
|
||||
@@ -534,8 +635,12 @@ async function main() {
|
||||
} catch (error) {
|
||||
details = error instanceof Error ? (error.stack ?? error.message) : String(error);
|
||||
} finally {
|
||||
if (gatewayChild) {
|
||||
await stopGateway(gatewayChild).catch(() => {});
|
||||
try {
|
||||
if (gatewayChild) {
|
||||
await stopGateway(gatewayChild).catch(() => {});
|
||||
}
|
||||
} finally {
|
||||
removeGatewayParentCleanup();
|
||||
}
|
||||
try {
|
||||
await cleanupTempRoot(tempRoot);
|
||||
|
||||
@@ -2,7 +2,11 @@ import { EventEmitter } from "node:events";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
cleanupTempRoot,
|
||||
installGatewayParentCleanup,
|
||||
isGatewayProcessAlive,
|
||||
signalGatewayProcess,
|
||||
startGateway,
|
||||
stopGateway,
|
||||
waitForGatewayReady,
|
||||
} from "../../scripts/measure-rpc-rtt.mjs";
|
||||
|
||||
@@ -49,6 +53,7 @@ describe("scripts/measure-rpc-rtt.mjs", () => {
|
||||
],
|
||||
expect.objectContaining({
|
||||
cwd: "/repo",
|
||||
detached: process.platform !== "win32",
|
||||
env: expect.objectContaining({
|
||||
HOME: "/tmp/rpc-rtt/home",
|
||||
OPENCLAW_CONFIG_PATH: "/tmp/openclaw.json",
|
||||
@@ -63,6 +68,162 @@ describe("scripts/measure-rpc-rtt.mjs", () => {
|
||||
expect(stderr.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("signals the gateway process group on POSIX so pnpm children do not leak", () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: null,
|
||||
kill: vi.fn(),
|
||||
pid: 12345,
|
||||
signalCode: null,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
expect(signalGatewayProcess(child, "SIGTERM", kill)).toBe(true);
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(child.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
} else {
|
||||
expect(kill).toHaveBeenCalledWith(-12345, "SIGTERM");
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
it("treats missing gateway process groups as already exited", () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: null,
|
||||
kill: vi.fn(() => false),
|
||||
pid: 12345,
|
||||
signalCode: null,
|
||||
});
|
||||
const kill = vi.fn(() => {
|
||||
throw Object.assign(new Error("no such process"), { code: "ESRCH" });
|
||||
});
|
||||
|
||||
expect(signalGatewayProcess(child, "SIGTERM", kill)).toBe(false);
|
||||
});
|
||||
|
||||
it("checks process group liveness instead of only the pnpm wrapper", () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: 0,
|
||||
kill: vi.fn(),
|
||||
pid: 12345,
|
||||
signalCode: null,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(isGatewayProcessAlive(child, kill)).toBe(false);
|
||||
} else {
|
||||
expect(isGatewayProcessAlive(child, kill)).toBe(true);
|
||||
expect(kill).toHaveBeenCalledWith(-12345, 0);
|
||||
}
|
||||
});
|
||||
|
||||
it("force-kills the gateway process group after the graceful stop window", async () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: null,
|
||||
kill: vi.fn(),
|
||||
pid: 12346,
|
||||
signalCode: null,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
await stopGateway(child, { killGraceMs: 1, killProcess: kill });
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(child.kill).toHaveBeenNthCalledWith(1, "SIGTERM");
|
||||
expect(child.kill).toHaveBeenNthCalledWith(2, "SIGKILL");
|
||||
} else {
|
||||
expect(kill).toHaveBeenNthCalledWith(1, -12346, 0);
|
||||
expect(kill).toHaveBeenNthCalledWith(2, -12346, "SIGTERM");
|
||||
expect(kill).toHaveBeenLastCalledWith(-12346, "SIGKILL");
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not trust a finished pnpm wrapper while the process group is alive", async () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: 0,
|
||||
kill: vi.fn(),
|
||||
pid: 12347,
|
||||
signalCode: null,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
await stopGateway(child, { killGraceMs: 1, killProcess: kill });
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
} else {
|
||||
expect(kill).toHaveBeenNthCalledWith(1, -12347, 0);
|
||||
expect(kill).toHaveBeenNthCalledWith(2, -12347, "SIGTERM");
|
||||
expect(kill).toHaveBeenLastCalledWith(-12347, "SIGKILL");
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
it("cleans up the gateway process group before re-raising parent signals", () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: null,
|
||||
kill: vi.fn(),
|
||||
pid: 12348,
|
||||
signalCode: null,
|
||||
});
|
||||
const processLike = Object.assign(new EventEmitter(), {
|
||||
kill: vi.fn(),
|
||||
pid: 98765,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
const removeCleanup = installGatewayParentCleanup(child, {
|
||||
killProcess: kill,
|
||||
processLike,
|
||||
});
|
||||
processLike.emit("SIGTERM");
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(child.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
} else {
|
||||
expect(kill).toHaveBeenNthCalledWith(1, -12348, "SIGTERM");
|
||||
expect(kill).toHaveBeenNthCalledWith(2, -12348, "SIGKILL");
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
}
|
||||
expect(processLike.kill).toHaveBeenCalledWith(98765, "SIGTERM");
|
||||
expect(processLike.listenerCount("SIGTERM")).toBe(0);
|
||||
|
||||
removeCleanup();
|
||||
});
|
||||
|
||||
it("cleans up the gateway process group on parent exit", () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: null,
|
||||
kill: vi.fn(),
|
||||
pid: 12349,
|
||||
signalCode: null,
|
||||
});
|
||||
const processLike = Object.assign(new EventEmitter(), {
|
||||
kill: vi.fn(),
|
||||
pid: 98766,
|
||||
});
|
||||
const kill = vi.fn(() => true);
|
||||
|
||||
const removeCleanup = installGatewayParentCleanup(child, {
|
||||
killProcess: kill,
|
||||
processLike,
|
||||
});
|
||||
processLike.emit("exit");
|
||||
|
||||
if (process.platform === "win32") {
|
||||
expect(child.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
} else {
|
||||
expect(kill).toHaveBeenNthCalledWith(1, -12349, "SIGTERM");
|
||||
expect(kill).toHaveBeenNthCalledWith(2, -12349, "SIGKILL");
|
||||
expect(child.kill).not.toHaveBeenCalled();
|
||||
}
|
||||
expect(processLike.kill).not.toHaveBeenCalled();
|
||||
|
||||
removeCleanup();
|
||||
});
|
||||
|
||||
it("fails readiness immediately when the gateway already exited", async () => {
|
||||
const child = Object.assign(new EventEmitter(), {
|
||||
exitCode: 1,
|
||||
|
||||
Reference in New Issue
Block a user