fix(agent): abort accepted gateway runs on signal

This commit is contained in:
Kaspre
2026-05-19 21:08:39 -04:00
committed by Peter Steinberger
parent 192a782b99
commit 01fce88082
13 changed files with 2841 additions and 210 deletions

View File

@@ -55,6 +55,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
- Isolated cron runs also treat run-level agent failures as job errors even when no reply payload is produced, so model/provider failures increment error counters and trigger failure notifications instead of clearing the job as successful.
- When an isolated agent-turn job reaches `timeoutSeconds`, cron aborts the underlying agent run and gives it a short cleanup window. If the run does not drain, Gateway-owned cleanup force-clears that run's session ownership before cron records the timeout, so queued chat work is not left behind a stale processing session.
- If an isolated agent-turn stalls before the runner starts or before the first model call, cron records a phase-specific timeout such as `setup timed out before runner start` or `stalled before first model call (last phase: context-engine)`. These watchdogs cover embedded providers and CLI-backed providers before their external CLI process is actually started, and are capped independently from long `timeoutSeconds` values so cold-start/auth/context failures surface quickly instead of waiting for the full job budget.
- If you use system cron or another external scheduler to run `openclaw agent`, wrap it with a hard-kill escalation even though the CLI handles `SIGTERM`/`SIGINT`. Gateway-backed runs ask the Gateway to abort accepted runs; local and embedded fallback runs receive the same abort signal. For GNU `timeout`, prefer `timeout -k 60 600 openclaw agent ...` over plain `timeout 600 ...`; the `-k` value is the supervisor backstop if the process cannot drain. For systemd units, keep the same shape by using a `SIGTERM` stop signal plus a grace window such as `TimeoutStopSec` before any final kill. If a retry reuses a `--run-id` while the original Gateway run is still active, the duplicate is reported as in-flight instead of starting a second run.
<a id="maintenance"></a>

View File

@@ -65,6 +65,7 @@ openclaw agent --agent ops --message "Run locally" --local
- `--json` keeps stdout reserved for the JSON response. Gateway, plugin, and embedded-fallback diagnostics are routed to stderr so scripts can parse stdout directly.
- Embedded fallback JSON includes `meta.transport: "embedded"` and `meta.fallbackFrom: "gateway"` so scripts can distinguish fallback runs from Gateway runs.
- If the Gateway accepts an agent run but the CLI times out waiting for the final reply, embedded fallback uses a fresh explicit `gateway-fallback-*` session/run id and reports `meta.fallbackReason: "gateway_timeout"` plus the fallback session fields. This avoids racing the Gateway-owned transcript lock or silently replacing the original routed conversation session.
- For Gateway-backed runs, `SIGTERM` and `SIGINT` interrupt the waiting CLI request. If the Gateway has already accepted the run, the CLI also sends `chat.abort` for that accepted run id before exiting. Local `--local` runs and embedded fallback runs receive the same abort signal, but do not send `chat.abort`. If a duplicate `--run-id` reaches the Gateway while the original agent run is still active, the duplicate response reports `status: "in_flight"` and the non-JSON CLI prints a stderr diagnostic instead of an empty reply. For external cron/systemd wrappers, keep an outer hard-kill backstop such as `timeout -k 60 600 openclaw agent ...` so the supervisor can still reap the process if shutdown cannot drain.
- When this command triggers `models.json` regeneration, SecretRef-managed provider credentials are persisted as non-secret markers (for example env var names, `secretref-env:ENV_VAR_NAME`, or `secretref-managed`), not resolved secret plaintext.
- Marker writes are source-authoritative: OpenClaw persists markers from the active source config snapshot, not from resolved runtime secret values.

View File

@@ -107,6 +107,34 @@ function requireRecord(value: unknown, label: string): Record<string, unknown> {
return value as Record<string, unknown>;
}
function createSignalProcess() {
type SignalName = "SIGINT" | "SIGTERM";
const listeners = new Map<SignalName, Set<() => void>>();
const processLike = {
on(signal: SignalName, handler: () => void) {
const current = listeners.get(signal) ?? new Set<() => void>();
current.add(handler);
listeners.set(signal, current);
return processLike;
},
off(signal: SignalName, handler: () => void) {
listeners.get(signal)?.delete(handler);
return processLike;
},
};
return {
processLike,
emit(signal: SignalName) {
for (const handler of listeners.get(signal) ?? []) {
handler();
}
},
listenerCount(signal: SignalName) {
return listeners.get(signal)?.size ?? 0;
},
};
}
function mockMessages(mock: unknown): string[] {
const calls = (mock as { mock?: { calls?: unknown[][] } }).mock?.calls ?? [];
return calls.map(([message]) => String(message));
@@ -361,6 +389,638 @@ describe("agentCliCommand", () => {
);
});
it("does not treat lazy channel deps as the process signal source", async () => {
await withTempStore(async () => {
mockGatewaySuccessReply();
const deps = new Proxy(
{},
{
get(target, property, receiver) {
if (property === "process") {
return async () => undefined;
}
return Reflect.get(target, property, receiver);
},
},
);
await agentCliCommand({ message: "hi", to: "+1555" }, runtime, deps);
expect(callGateway).toHaveBeenCalledTimes(1);
expect(agentCommand).not.toHaveBeenCalled();
});
});
it("exits for successful gateway runs when SIGTERM arrives before return", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
mockGatewaySuccessReply();
const signalRuntime: RuntimeEnv = {
log: vi.fn(() => {
signals.emit("SIGTERM");
}),
error: vi.fn(),
exit: vi.fn(),
};
const result = await agentCliCommand({ message: "hi", to: "+1555" }, signalRuntime, {
process: signals.processLike,
});
expect(result).toBeUndefined();
expect(callGateway).toHaveBeenCalledTimes(1);
expect(agentCommand).not.toHaveBeenCalled();
expect(signalRuntime.log).toHaveBeenCalledWith("hello");
expect(signalRuntime.exit).toHaveBeenCalledWith(143);
});
});
it.each([
["SIGTERM", 143],
["SIGINT", 130],
] as const)(
"aborts an accepted gateway run using the accepted session key when %s interrupts the CLI",
async (signalName, exitCode) => {
await withTempStore(async () => {
const signals = createSignalProcess();
let sameConnectionAbort:
| { method: string; params: unknown; opts?: { timeoutMs?: number | null } }
| undefined;
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined;
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
onAccepted?.({
status: "accepted",
runId: "run-signal",
sessionKey: "agent:main:explicit:reset-run",
});
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAbort = { method, params, opts };
return { ok: true, aborted: true, runIds: ["run-signal"] };
});
const err = new Error("gateway request aborted for agent");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
signals.emit(signalName);
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
await run;
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(exitCode);
expect(sameConnectionAbort?.method).toBe("chat.abort");
expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 });
expect(sameConnectionAbort?.params).toEqual({
sessionKey: "agent:main:explicit:reset-run",
runId: "run-signal",
});
});
},
);
it("aborts a gateway run by idempotency key before the accepted ack", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
let sameConnectionAbort:
| { method: string; params: unknown; opts?: { timeoutMs?: number | null } }
| undefined;
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
const params = requireRecord(request.params, "gateway agent params");
expect(params.idempotencyKey).toBe("pre-accepted-run");
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAbort = { method, params, opts };
return { ok: true, aborted: true, runIds: ["pre-accepted-run"] };
});
const err = new Error("gateway request aborted before accepted ack");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand(
{ message: "hi", sessionId: "pre-session", runId: "pre-accepted-run" },
runtime,
{
process: signals.processLike,
},
);
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(sameConnectionAbort?.method).toBe("chat.abort");
expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 });
expect(sameConnectionAbort?.params).toEqual({
sessionKey: "agent:main:explicit:pre-session",
runId: "pre-accepted-run",
});
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("skips fallback abort when SIGTERM interrupts before the gateway request starts", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
const signal = request.signal as AbortSignal | undefined;
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
const err = new Error("gateway request aborted before start");
err.name = "AbortError";
reject(err);
},
{ once: true },
);
});
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("retries same-connection abort before falling back to a new Gateway call", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
const sameConnectionAborts: Array<{
method: string;
params: unknown;
opts?: { timeoutMs?: number | null };
}> = [];
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
const params = requireRecord(request.params, "gateway agent params");
expect(params.idempotencyKey).toBe("pre-accepted-run");
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAborts.push({ method, params, opts });
return sameConnectionAborts.length < 3
? { ok: true, aborted: false, runIds: [] }
: { ok: true, aborted: true, runIds: ["pre-accepted-run"] };
});
const err = new Error("gateway request aborted before registration");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand(
{ message: "hi", to: "+1555", runId: "pre-accepted-run" },
runtime,
{
process: signals.processLike,
},
);
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(sameConnectionAborts).toHaveLength(3);
expect(sameConnectionAborts.at(-1)).toEqual({
method: "chat.abort",
opts: { timeoutMs: 2_000 },
params: {
sessionKey: "agent:main:main",
runId: "pre-accepted-run",
},
});
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("falls back to a new Gateway call when the same-connection abort is not confirmed", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
const sameConnectionAborts: Array<{
method: string;
params: unknown;
opts?: { timeoutMs?: number | null };
}> = [];
let fallbackAbort: Record<string, unknown> | undefined;
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
const params = requireRecord(request.params, "gateway agent params");
expect(params.idempotencyKey).toBe("pre-accepted-run");
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAborts.push({ method, params, opts });
return { ok: true, aborted: false, runIds: [] };
});
const err = new Error("gateway request aborted before registration");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
if (request.method === "chat.abort") {
fallbackAbort = request;
return { ok: true, aborted: true, runIds: ["pre-accepted-run"] };
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand(
{ message: "hi", to: "+1555", runId: "pre-accepted-run" },
runtime,
{
process: signals.processLike,
},
);
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(2);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(sameConnectionAborts).toHaveLength(5);
expect(sameConnectionAborts.at(-1)).toEqual({
method: "chat.abort",
opts: { timeoutMs: 2_000 },
params: {
sessionKey: "agent:main:main",
runId: "pre-accepted-run",
},
});
expect(fallbackAbort?.method).toBe("chat.abort");
expect(fallbackAbort?.timeoutMs).toBe(2_000);
expect(fallbackAbort?.params).toEqual({
sessionKey: "agent:main:main",
runId: "pre-accepted-run",
});
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("preserves backend admin authority when SIGTERM aborts a model override run", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
let sameConnectionAbort:
| { method: string; params: unknown; opts?: { timeoutMs?: number | null } }
| undefined;
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
expect(request.clientName).toBe("gateway-client");
expect(request.mode).toBe("backend");
expect(request.scopes).toEqual(["operator.admin"]);
const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined;
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
onAccepted?.({ status: "accepted", runId: "run-model-sigterm" });
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAbort = { method, params, opts };
return { ok: true, aborted: true, runIds: ["run-model-sigterm"] };
});
const err = new Error("gateway request aborted for model override agent");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand(
{ message: "hi", to: "+1555", model: "ollama/qwen3.5:9b" },
runtime,
{
process: signals.processLike,
},
);
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(sameConnectionAbort?.method).toBe("chat.abort");
expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 });
expect(sameConnectionAbort?.params).toEqual({
sessionKey: "agent:main:main",
runId: "run-model-sigterm",
});
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("preserves backend admin authority for model override fallback aborts", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
const sameConnectionAborts: Array<{
method: string;
params: unknown;
opts?: { timeoutMs?: number | null };
}> = [];
let fallbackAbort: Record<string, unknown> | undefined;
callGateway.mockImplementation(async (requestValue: unknown) => {
const request = requireRecord(requestValue, "gateway request");
if (request.method === "agent") {
expect(request.clientName).toBe("gateway-client");
expect(request.mode).toBe("backend");
expect(request.scopes).toEqual(["operator.admin"]);
const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined;
const onSignalAbort = request.onSignalAbort as
| ((
request: (
method: string,
params?: unknown,
opts?: { timeoutMs?: number | null },
) => Promise<unknown>,
) => Promise<void>)
| undefined;
const signal = request.signal as AbortSignal | undefined;
onAccepted?.({ status: "accepted", runId: "run-model-fallback" });
return await new Promise((_, reject) => {
signal?.addEventListener(
"abort",
() => {
void (async () => {
await onSignalAbort?.(async (method, params, opts) => {
sameConnectionAborts.push({ method, params, opts });
return { ok: true, aborted: false, runIds: [] };
});
const err = new Error("gateway request aborted for model override agent");
err.name = "AbortError";
reject(err);
})();
},
{ once: true },
);
});
}
if (request.method === "chat.abort") {
fallbackAbort = request;
return { ok: true, aborted: true, runIds: ["run-model-fallback"] };
}
throw new Error(`unexpected gateway method ${String(request.method)}`);
});
const run = agentCliCommand(
{ message: "hi", to: "+1555", model: "ollama/qwen3.5:9b" },
runtime,
{
process: signals.processLike,
},
);
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).toHaveBeenCalledTimes(2);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(sameConnectionAborts).toHaveLength(5);
expect(fallbackAbort?.method).toBe("chat.abort");
expect(fallbackAbort?.timeoutMs).toBe(2_000);
expect(fallbackAbort?.clientName).toBe("gateway-client");
expect(fallbackAbort?.mode).toBe("backend");
expect(fallbackAbort?.scopes).toEqual(["operator.admin"]);
expect(fallbackAbort?.params).toEqual({
sessionKey: "agent:main:main",
runId: "run-model-fallback",
});
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("passes SIGTERM abort signals into local agent runs", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
agentCommand.mockImplementationOnce(async (opts: { abortSignal?: AbortSignal }) => {
expect(opts.abortSignal).toBeInstanceOf(AbortSignal);
return await new Promise((_, reject) => {
opts.abortSignal?.addEventListener(
"abort",
() => {
const err = new Error("local agent aborted");
err.name = "AbortError";
reject(err);
},
{ once: true },
);
});
});
const run = agentCliCommand({ message: "hi", to: "+1555", local: true }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
signals.emit("SIGTERM");
await run;
expect(callGateway).not.toHaveBeenCalled();
expect(agentCommand).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
expect(signals.listenerCount("SIGTERM")).toBe(0);
expect(signals.listenerCount("SIGINT")).toBe(0);
});
});
it("exits for local runs that resolve after SIGTERM aborts them", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
agentCommand.mockImplementationOnce(async (opts: { abortSignal?: AbortSignal }) => {
return await new Promise((resolve) => {
opts.abortSignal?.addEventListener(
"abort",
() => {
resolve({
payloads: [],
meta: { aborted: true },
} as unknown as Awaited<ReturnType<typeof AgentCommand>>);
},
{ once: true },
);
});
});
const run = agentCliCommand({ message: "hi", to: "+1555", local: true }, runtime, {
process: signals.processLike,
});
await Promise.resolve();
signals.emit("SIGTERM");
await expect(run).resolves.toBeUndefined();
expect(callGateway).not.toHaveBeenCalled();
expect(agentCommand).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
});
});
it("exits for embedded fallback runs that resolve after SIGTERM aborts them", async () => {
await withTempStore(async () => {
const signals = createSignalProcess();
callGateway.mockRejectedValueOnce(createGatewayClosedError());
let resolveFallback: ((value: Awaited<ReturnType<typeof AgentCommand>>) => void) | undefined;
agentCommand.mockImplementationOnce(async (_opts: { abortSignal?: AbortSignal }) => {
return await new Promise((resolve) => {
resolveFallback = resolve;
});
});
const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
process: signals.processLike,
});
for (let attempt = 0; attempt < 10 && agentCommand.mock.calls.length === 0; attempt += 1) {
await Promise.resolve();
}
expect(agentCommand).toHaveBeenCalledTimes(1);
signals.emit("SIGTERM");
resolveFallback?.({
payloads: [],
meta: { aborted: true },
} as unknown as Awaited<ReturnType<typeof AgentCommand>>);
await expect(run).resolves.toBeUndefined();
expect(callGateway).toHaveBeenCalledTimes(1);
expect(runtime.exit).toHaveBeenCalledWith(143);
});
});
it("does not route abort errors through embedded gateway fallback classification", async () => {
await withTempStore(async () => {
const err = new Error("gateway request aborted for agent");
err.name = "AbortError";
callGateway.mockRejectedValueOnce(err);
await expect(agentCliCommand({ message: "hi", to: "+1555" }, runtime)).rejects.toThrow(
"gateway request aborted for agent",
);
expect(isGatewayTransportError).not.toHaveBeenCalled();
expect(agentCommand).not.toHaveBeenCalled();
});
});
it("stays silent when the gateway returns an intentional empty reply", async () => {
await withTempStore(async () => {
callGateway.mockResolvedValue({
@@ -397,6 +1057,23 @@ describe("agentCliCommand", () => {
});
});
it("surfaces duplicate in-flight gateway runs without pretending a reply arrived", async () => {
await withTempStore(async () => {
callGateway.mockResolvedValue({
runId: "idem-1",
status: "in_flight",
sessionKey: "agent:main:main",
});
await agentCliCommand({ message: "hi", to: "+1555", runId: "idem-1" }, runtime);
expect(runtime.error).toHaveBeenCalledWith(
"Agent run idem-1 is already in flight; not starting a duplicate run.",
);
expect(runtime.log).not.toHaveBeenCalledWith("No reply from agent.");
});
});
it("passes model overrides through gateway requests", async () => {
await withTempStore(async () => {
mockGatewaySuccessReply();

View File

@@ -6,7 +6,12 @@ import type { CliDeps } from "../cli/deps.types.js";
import { withProgress } from "../cli/progress.js";
import { getRuntimeConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { callGateway, isGatewayTransportError, randomIdempotencyKey } from "../gateway/call.js";
import {
callGateway,
isGatewayTransportError,
randomIdempotencyKey,
type GatewayRequestFunction,
} from "../gateway/call.js";
import { ADMIN_SCOPE } from "../gateway/operator-scopes.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js";
import { routeLogsToStderr } from "../logging/console.js";
@@ -71,6 +76,27 @@ type AgentCliOpts = {
local?: boolean;
};
type AgentCliSignal = "SIGINT" | "SIGTERM";
type AgentCliProcessLike = {
on(signal: AgentCliSignal, handler: () => void): unknown;
off(signal: AgentCliSignal, handler: () => void): unknown;
};
type AgentCliDeps = CliDeps & {
process?: AgentCliProcessLike;
};
type AgentGatewayCallIdentity = Pick<
Parameters<typeof callGateway>[0],
"clientName" | "mode" | "scopes"
>;
const AGENT_CLI_SIGNALS: readonly AgentCliSignal[] = ["SIGINT", "SIGTERM"];
const GATEWAY_ABORT_RETRY_DELAYS_MS = [50, 150, 300, 600] as const;
const GATEWAY_ABORT_REQUEST_TIMEOUT_MS = 2_000;
const AGENT_CLI_SIGNAL_EXIT_CODES: Record<AgentCliSignal, number> = {
SIGINT: 130,
SIGTERM: 143,
};
function protectJsonStdout(opts: Pick<AgentCliOpts, "json">): void {
if (opts.json === true) {
routeLogsToStderr();
@@ -173,6 +199,211 @@ function normalizeSessionKeyOptsForDispatch(opts: AgentCliOpts): AgentCliOpts {
};
}
function isAbortError(err: unknown): boolean {
return err instanceof Error && err.name === "AbortError";
}
function readAcceptedRunContext(payload: unknown): {
runId?: string;
sessionKey?: string;
} {
if (!payload || typeof payload !== "object") {
return {};
}
const runId = (payload as { runId?: unknown }).runId;
const sessionKey = (payload as { sessionKey?: unknown }).sessionKey;
const status = (payload as { status?: unknown }).status;
if (status !== "accepted") {
return {};
}
return {
runId: typeof runId === "string" && runId.trim() ? runId.trim() : undefined,
sessionKey: typeof sessionKey === "string" && sessionKey.trim() ? sessionKey.trim() : undefined,
};
}
function createAgentCliSignalBridge(processLike: AgentCliProcessLike = process) {
const controller = new AbortController();
let receivedSignal: AgentCliSignal | undefined;
const handlers = new Map<AgentCliSignal, () => void>();
const detachHandlers = () => {
for (const [signal, handler] of handlers) {
processLike.off(signal, handler);
}
handlers.clear();
};
for (const signal of AGENT_CLI_SIGNALS) {
const handler = () => {
receivedSignal = signal;
if (!controller.signal.aborted) {
// runtime.exit may bypass finally cleanup, so first-signal self-detach is load-bearing.
controller.abort();
detachHandlers();
}
};
handlers.set(signal, handler);
processLike.on(signal, handler);
}
return {
signal: controller.signal,
getReceivedSignal: () => receivedSignal,
dispose: detachHandlers,
};
}
function isAgentCliProcessLike(value: unknown): value is AgentCliProcessLike {
return (
Boolean(value) &&
typeof value === "object" &&
typeof (value as { on?: unknown }).on === "function" &&
typeof (value as { off?: unknown }).off === "function"
);
}
function resolveAgentCliProcessLike(deps: AgentCliDeps | undefined): AgentCliProcessLike {
if (!deps || !Object.prototype.hasOwnProperty.call(deps, "process")) {
return process;
}
const processLike = (deps as { process?: unknown }).process;
return isAgentCliProcessLike(processLike) ? processLike : process;
}
function delayMs(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function isConfirmedChatAbortResponseForRun(value: unknown, runId: string): boolean {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return false;
}
const response = value as { aborted?: unknown; runIds?: unknown };
if (response.aborted !== true) {
return false;
}
if (response.runIds === undefined) {
return true;
}
return Array.isArray(response.runIds) && response.runIds.includes(runId);
}
async function abortAcceptedGatewayAgentRunWithRequest(params: {
runId: string | undefined;
sessionKey: string | undefined;
signal: AgentCliSignal | undefined;
runtime: RuntimeEnv;
request: GatewayRequestFunction;
logFailure?: boolean;
}): Promise<boolean> {
if (!params.signal || !params.runId || !params.sessionKey) {
return false;
}
try {
const response = await params.request(
"chat.abort",
{
sessionKey: params.sessionKey,
runId: params.runId,
},
{ timeoutMs: GATEWAY_ABORT_REQUEST_TIMEOUT_MS },
);
if (isConfirmedChatAbortResponseForRun(response, params.runId)) {
return true;
}
if (params.logFailure !== false) {
params.runtime.error?.(
`Interrupted by ${params.signal}; Gateway run ${params.runId} was not confirmed aborted.`,
);
}
return false;
} catch (err) {
if (params.logFailure !== false) {
params.runtime.error?.(
`Interrupted by ${params.signal}; failed to abort Gateway run ${params.runId}: ${String(
err,
)}`,
);
}
return false;
}
}
async function abortAcceptedGatewayAgentRunWithGatewayCall(params: {
runId: string | undefined;
sessionKey: string | undefined;
signal: AgentCliSignal | undefined;
runtime: RuntimeEnv;
gatewayIdentity: AgentGatewayCallIdentity;
}): Promise<void> {
const request: GatewayRequestFunction = async <T = Record<string, unknown>>(
method: string,
requestParams?: unknown,
opts?: Parameters<GatewayRequestFunction>[2],
): Promise<T> =>
await callGateway<T>({
method,
params: requestParams,
timeoutMs: opts?.timeoutMs ?? undefined,
expectFinal: opts?.expectFinal,
...params.gatewayIdentity,
});
for (const [attempt, retryDelayMs] of [...GATEWAY_ABORT_RETRY_DELAYS_MS, 0].entries()) {
const isFinalAttempt = attempt === GATEWAY_ABORT_RETRY_DELAYS_MS.length;
const aborted = await abortAcceptedGatewayAgentRunWithRequest({
runId: params.runId,
sessionKey: params.sessionKey,
signal: params.signal,
runtime: params.runtime,
request,
logFailure: isFinalAttempt,
});
if (aborted || isFinalAttempt) {
return;
}
await delayMs(retryDelayMs);
}
}
async function abortAcceptedGatewayAgentRunOnActiveConnection(params: {
runId: string | undefined;
sessionKey: string | undefined;
signal: AgentCliSignal | undefined;
runtime: RuntimeEnv;
request: GatewayRequestFunction;
}): Promise<boolean> {
for (const [attempt, retryDelayMs] of [...GATEWAY_ABORT_RETRY_DELAYS_MS, 0].entries()) {
const isFinalAttempt = attempt === GATEWAY_ABORT_RETRY_DELAYS_MS.length;
const aborted = await abortAcceptedGatewayAgentRunWithRequest({
runId: params.runId,
sessionKey: params.sessionKey,
signal: params.signal,
runtime: params.runtime,
request: params.request,
logFailure: false,
});
if (aborted || isFinalAttempt) {
return aborted;
}
await delayMs(retryDelayMs);
}
return false;
}
function exitForReceivedSignal(signal: AgentCliSignal | undefined, runtime: RuntimeEnv): boolean {
if (!signal) {
return false;
}
runtime.exit(AGENT_CLI_SIGNAL_EXIT_CODES[signal]);
return true;
}
function returnAfterSignalExit<T>(
value: T,
signal: AgentCliSignal | undefined,
runtime: RuntimeEnv,
): T | undefined {
return exitForReceivedSignal(signal, runtime) ? undefined : value;
}
function createGatewayTimeoutFallbackSessionId(): string {
return `${GATEWAY_TIMEOUT_FALLBACK_SESSION_PREFIX}${randomUUID()}`;
}
@@ -227,7 +458,21 @@ function buildGatewayJsonResponse(response: GatewayAgentResponse): GatewayAgentR
};
}
async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) {
function isInFlightGatewayAgentResponse(response: GatewayAgentResponse): boolean {
return response.status === "in_flight";
}
function formatInFlightGatewayAgentMessage(response: GatewayAgentResponse): string {
return response.runId
? `Agent run ${response.runId} is already in flight; not starting a duplicate run.`
: "Agent run is already in flight; not starting a duplicate run.";
}
async function agentViaGatewayCommand(
opts: AgentCliOpts,
runtime: RuntimeEnv,
signalBridge: ReturnType<typeof createAgentCliSignalBridge>,
) {
protectJsonStdout(opts);
const body = (opts.message ?? "").trim();
const explicitSessionKey = opts.sessionKey?.trim();
@@ -271,44 +516,90 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) {
const idempotencyKey = normalizeOptionalString(opts.runId) || randomIdempotencyKey();
const modelOverride = normalizeOptionalString(opts.model);
const hasModelOverride = Boolean(modelOverride);
const gatewayIdentity: AgentGatewayCallIdentity = hasModelOverride
? {
clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT,
mode: GATEWAY_CLIENT_MODES.BACKEND,
scopes: [ADMIN_SCOPE],
}
: {
clientName: GATEWAY_CLIENT_NAMES.CLI,
mode: GATEWAY_CLIENT_MODES.CLI,
};
const response: GatewayAgentResponse = await withProgress(
{
label: "Waiting for agent reply…",
indeterminate: true,
enabled: opts.json !== true,
},
async () =>
await callGateway({
method: "agent",
params: {
message: body,
agentId,
model: modelOverride,
to: opts.to,
replyTo: opts.replyTo,
sessionId: opts.sessionId,
sessionKey,
thinking: opts.thinking,
deliver: Boolean(opts.deliver),
channel,
replyChannel: opts.replyChannel,
replyAccountId: opts.replyAccount,
bestEffortDeliver: opts.bestEffortDeliver,
timeout: timeoutSeconds,
lane: opts.lane,
extraSystemPrompt: opts.extraSystemPrompt,
idempotencyKey,
},
expectFinal: true,
timeoutMs: gatewayTimeoutMs,
clientName: hasModelOverride
? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT
: GATEWAY_CLIENT_NAMES.CLI,
mode: hasModelOverride ? GATEWAY_CLIENT_MODES.BACKEND : GATEWAY_CLIENT_MODES.CLI,
...(hasModelOverride ? { scopes: [ADMIN_SCOPE] } : {}),
}),
);
let acceptedRunId: string | undefined = idempotencyKey;
let acceptedSessionKey: string | undefined = sessionKey;
let acceptedGatewayRun = false;
let activeConnectionAbortAttempted = false;
let activeConnectionAbortSucceeded = false;
let response: GatewayAgentResponse;
try {
response = await withProgress(
{
label: "Waiting for agent reply…",
indeterminate: true,
enabled: opts.json !== true,
},
async () =>
await callGateway({
method: "agent",
params: {
message: body,
agentId,
model: modelOverride,
to: opts.to,
replyTo: opts.replyTo,
sessionId: opts.sessionId,
sessionKey,
thinking: opts.thinking,
deliver: Boolean(opts.deliver),
channel,
replyChannel: opts.replyChannel,
replyAccountId: opts.replyAccount,
bestEffortDeliver: opts.bestEffortDeliver,
timeout: timeoutSeconds,
lane: opts.lane,
extraSystemPrompt: opts.extraSystemPrompt,
idempotencyKey,
},
expectFinal: true,
timeoutMs: gatewayTimeoutMs,
signal: signalBridge.signal,
onAccepted: (payload) => {
acceptedGatewayRun = true;
const accepted = readAcceptedRunContext(payload);
acceptedRunId = accepted.runId ?? acceptedRunId;
acceptedSessionKey = accepted.sessionKey ?? acceptedSessionKey;
},
onSignalAbort: async (request) => {
activeConnectionAbortAttempted = true;
activeConnectionAbortSucceeded = await abortAcceptedGatewayAgentRunOnActiveConnection({
runId: acceptedRunId,
sessionKey: acceptedSessionKey,
signal: signalBridge.getReceivedSignal(),
runtime,
request,
});
},
...gatewayIdentity,
}),
);
} catch (err) {
if (
isAbortError(err) &&
!activeConnectionAbortSucceeded &&
(acceptedGatewayRun || activeConnectionAbortAttempted)
) {
await abortAcceptedGatewayAgentRunWithGatewayCall({
runId: acceptedRunId,
sessionKey: acceptedSessionKey,
signal: signalBridge.getReceivedSignal(),
runtime,
gatewayIdentity,
});
}
throw err;
}
if (opts.json) {
writeRuntimeJson(runtime, buildGatewayJsonResponse(response));
@@ -318,6 +609,11 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) {
const result = response?.result;
const payloads = result?.payloads ?? [];
if (isInFlightGatewayAgentResponse(response)) {
runtime.error?.(formatInFlightGatewayAgentMessage(response));
return response;
}
if (payloads.length === 0) {
if (response?.status !== "ok") {
runtime.log(response?.summary ? response.summary : "No reply from agent.");
@@ -335,62 +631,87 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) {
return response;
}
export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, deps?: CliDeps) {
export async function agentCliCommand(
opts: AgentCliOpts,
runtime: RuntimeEnv,
deps?: AgentCliDeps,
) {
protectJsonStdout(opts);
const dispatchOpts = normalizeSessionKeyOptsForDispatch(opts);
validateExplicitSessionKeyForDispatch(dispatchOpts);
const signalBridge = createAgentCliSignalBridge(resolveAgentCliProcessLike(deps));
const localOpts = {
...dispatchOpts,
agentId: dispatchOpts.agent,
replyAccountId: dispatchOpts.replyAccount,
cleanupBundleMcpOnRunEnd: true,
cleanupCliLiveSessionOnRunEnd: true,
abortSignal: signalBridge.signal,
};
if (dispatchOpts.local === true) {
return await agentCommand(localOpts, runtime, deps);
}
try {
return await agentViaGatewayCommand(dispatchOpts, runtime);
} catch (err) {
if (isGatewayAgentTimeoutError(err)) {
const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts);
const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId);
if (dispatchOpts.local === true) {
const result = await agentCommand(localOpts, runtime, deps);
return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime);
}
try {
const result = await agentViaGatewayCommand(dispatchOpts, runtime, signalBridge);
return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime);
} catch (err) {
if (isAbortError(err)) {
if (exitForReceivedSignal(signalBridge.getReceivedSignal(), runtime)) {
return undefined;
}
throw err;
}
if (isGatewayAgentTimeoutError(err)) {
const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts);
const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId);
runtime.error?.(
`EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`,
);
const result = await agentCommand(
{
...localOpts,
sessionId: fallbackSession.sessionId,
sessionKey: fallbackSession.sessionKey,
runId: fallbackSession.sessionId,
resultMetaOverrides: {
...EMBEDDED_FALLBACK_META,
fallbackReason: "gateway_timeout",
fallbackSessionId: fallbackSession.sessionId,
fallbackSessionKey: fallbackSession.sessionKey,
},
},
runtime,
deps,
);
return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime);
}
if (!isGatewayAgentEmbeddedFallbackError(err)) {
throw err;
}
runtime.error?.(
`EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`,
`EMBEDDED FALLBACK: Gateway agent failed; running embedded agent: ${String(err)}`,
);
return await agentCommand(
const result = await agentCommand(
{
...localOpts,
sessionId: fallbackSession.sessionId,
sessionKey: fallbackSession.sessionKey,
runId: fallbackSession.sessionId,
resultMetaOverrides: {
...EMBEDDED_FALLBACK_META,
fallbackReason: "gateway_timeout",
fallbackSessionId: fallbackSession.sessionId,
fallbackSessionKey: fallbackSession.sessionKey,
},
resultMetaOverrides: EMBEDDED_FALLBACK_META,
},
runtime,
deps,
);
return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime);
}
if (!isGatewayAgentEmbeddedFallbackError(err)) {
throw err;
} catch (err) {
if (isAbortError(err) && exitForReceivedSignal(signalBridge.getReceivedSignal(), runtime)) {
return undefined;
}
runtime.error?.(
`EMBEDDED FALLBACK: Gateway agent failed; running embedded agent: ${String(err)}`,
);
return await agentCommand(
{
...localOpts,
resultMetaOverrides: EMBEDDED_FALLBACK_META,
},
runtime,
deps,
);
throw err;
} finally {
signalBridge.dispose();
}
}

View File

@@ -75,7 +75,12 @@ let lastClientOptions: {
let lastRequestOptions: {
method?: string;
params?: unknown;
opts?: { expectFinal?: boolean; timeoutMs?: number | null };
opts?: {
expectFinal?: boolean;
timeoutMs?: number | null;
signal?: AbortSignal;
onAccepted?: (payload: unknown) => void;
};
} | null = null;
type StartMode = "hello" | "close" | "connect-error" | "silent" | "startup-retry-then-hello";
let startMode: StartMode = "hello";
@@ -185,7 +190,12 @@ class StubGatewayClient {
async request(
method: string,
params: unknown,
opts?: { expectFinal?: boolean; timeoutMs?: number | null },
opts?: {
expectFinal?: boolean;
timeoutMs?: number | null;
signal?: AbortSignal;
onAccepted?: (payload: unknown) => void;
},
) {
lastRequestOptions = { method, params, opts };
return { ok: true };
@@ -1195,6 +1205,166 @@ describe("callGateway error details", () => {
expect(lastRequestOptions?.opts?.timeoutMs).toBe(45_000);
});
it("forwards caller abort signal and accepted callback to client requests", async () => {
setLocalLoopbackGatewayConfig();
const controller = new AbortController();
const onAccepted = vi.fn();
await callGateway({
method: "agent",
expectFinal: true,
signal: controller.signal,
onAccepted,
});
expect(lastRequestOptions?.method).toBe("agent");
expect(lastRequestOptions?.opts?.signal).toBe(controller.signal);
expect(lastRequestOptions?.opts?.onAccepted).toBe(onAccepted);
});
it("runs the signal abort hook on the active gateway connection before teardown", async () => {
setLocalLoopbackGatewayConfig();
const controller = new AbortController();
const abortRequests: Array<{
method: string;
params: unknown;
opts?: { timeoutMs?: number | null };
}> = [];
let stopStarted = false;
testing.setDepsForTests({
createGatewayClient: (opts) =>
({
async request(
method: string,
params: unknown,
requestOpts?: {
expectFinal?: boolean;
timeoutMs?: number | null;
signal?: AbortSignal;
},
) {
lastRequestOptions = { method, params, opts: requestOpts };
if (method === "agent") {
return await new Promise((_, reject) => {
requestOpts?.signal?.addEventListener(
"abort",
() => {
const err = new Error("gateway request aborted for agent");
err.name = "AbortError";
reject(err);
},
{ once: true },
);
});
}
abortRequests.push({ method, params, opts: requestOpts });
return { ok: true };
},
start() {
opts.onHelloOk?.({
features: {
methods: helloMethods ?? [],
events: [],
},
} as unknown as Parameters<NonNullable<typeof opts.onHelloOk>>[0]);
},
stop() {},
async stopAndWait() {
stopStarted = true;
},
}) as never,
getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig,
loadOrCreateDeviceIdentity: () => deviceIdentityState.value,
resolveGatewayPort: resolveGatewayPort as unknown as (
cfg?: OpenClawConfig,
env?: NodeJS.ProcessEnv,
) => number,
});
const promise = callGateway({
method: "agent",
expectFinal: true,
signal: controller.signal,
onSignalAbort: async (request) => {
await request("chat.abort", { sessionKey: "main", runId: "run-1" }, { timeoutMs: 5_000 });
},
});
await vi.waitFor(() => {
expect(lastRequestOptions?.method).toBe("agent");
});
controller.abort();
await expect(promise).rejects.toThrow("gateway request aborted for agent");
expect(abortRequests).toEqual([
{
method: "chat.abort",
params: { sessionKey: "main", runId: "run-1" },
opts: { timeoutMs: 5_000 },
},
]);
expect(stopStarted).toBe(true);
});
it("skips the signal abort hook before the primary request starts", async () => {
setLocalLoopbackGatewayConfig();
const controller = new AbortController();
const onSignalAbort = vi.fn(async () => undefined);
let startCalled = false;
let stopStarted = false;
testing.setDepsForTests({
createGatewayClient: () =>
({
async request(
method: string,
params: unknown,
requestOpts?: {
expectFinal?: boolean;
timeoutMs?: number | null;
signal?: AbortSignal;
},
) {
lastRequestOptions = { method, params, opts: requestOpts };
return { ok: true };
},
start() {
startCalled = true;
},
stop() {},
async stopAndWait() {
stopStarted = true;
},
}) as never,
getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig,
loadOrCreateDeviceIdentity: () => deviceIdentityState.value,
resolveGatewayPort: resolveGatewayPort as unknown as (
cfg?: OpenClawConfig,
env?: NodeJS.ProcessEnv,
) => number,
});
const promise = callGateway({
method: "agent",
expectFinal: true,
signal: controller.signal,
onSignalAbort,
});
await vi.waitFor(() => {
expect(startCalled).toBe(true);
});
controller.abort();
await expect(promise).rejects.toThrow("gateway request aborted for agent");
expect(onSignalAbort).not.toHaveBeenCalled();
expect(lastRequestOptions).toBeNull();
expect(stopStarted).toBe(true);
});
it("passes configured gateway handshake timeout to the client watchdog", async () => {
getRuntimeConfig.mockReturnValue({
gateway: { mode: "local", bind: "loopback", handshakeTimeoutMs: 30_000 },

View File

@@ -24,6 +24,7 @@ import {
GatewayClient,
isGatewayConnectAssemblyError,
type GatewayClientOptions,
type GatewayClientRequestOptions,
} from "./client.js";
import {
buildGatewayConnectionDetailsWithResolvers,
@@ -49,6 +50,12 @@ import {
import { MIN_CLIENT_PROTOCOL_VERSION, PROTOCOL_VERSION } from "./protocol/index.js";
export type { GatewayConnectionDetails };
export type GatewayRequestFunction = <T = Record<string, unknown>>(
method: string,
params?: unknown,
opts?: GatewayClientRequestOptions,
) => Promise<T>;
type CallGatewayBaseOptions = {
url?: string;
token?: string;
@@ -59,6 +66,9 @@ type CallGatewayBaseOptions = {
params?: unknown;
expectFinal?: boolean;
timeoutMs?: number;
signal?: AbortSignal;
onAccepted?: GatewayClientRequestOptions["onAccepted"];
onSignalAbort?: (request: GatewayRequestFunction) => Promise<void> | void;
clientName?: GatewayClientName;
clientDisplayName?: string;
clientVersion?: string;
@@ -619,6 +629,12 @@ function createGatewayTimeoutTransportError(params: {
});
}
function createGatewayRequestAbortError(method: string): Error {
const err = new Error(`gateway request aborted for ${method}`);
err.name = "AbortError";
return err;
}
function ensureGatewaySupportsRequiredMethods(params: {
requiredMethods: string[] | undefined;
methods: string[] | undefined;
@@ -672,26 +688,75 @@ async function executeGatewayRequestWithScopes<T>(params: {
safeTimerTimeoutMs,
} = params;
return await new Promise<T>((resolve, reject) => {
if (opts.signal?.aborted) {
reject(createGatewayRequestAbortError(opts.method));
return;
}
let settled = false;
let ignoreClose = false;
const startAbort = new AbortController();
const stop = (err?: Error, value?: T) => {
if (settled) {
return;
}
settled = true;
let abortHandler: (() => void) | undefined;
let client: GatewayClient | undefined;
let timer: NodeJS.Timeout | undefined;
let primaryRequestStarted = false;
const cleanup = () => {
startAbort.abort();
clearTimeout(timer);
void stopGatewayClient(client).finally(() => {
if (abortHandler) {
opts.signal?.removeEventListener("abort", abortHandler);
}
if (timer) {
clearTimeout(timer);
}
};
const stopClientThenSettle = (
activeClient: GatewayClient | undefined,
err?: Error,
value?: T,
) => {
const complete = () => {
if (err) {
reject(err);
} else {
resolve(value as T);
}
});
};
if (!activeClient) {
complete();
return;
}
void stopGatewayClient(activeClient).finally(complete);
};
const stop = (err?: Error, value?: T) => {
if (settled) {
return;
}
settled = true;
cleanup();
stopClientThenSettle(client, err, value);
};
abortHandler = () => {
if (settled) {
return;
}
ignoreClose = true;
settled = true;
cleanup();
const err = createGatewayRequestAbortError(opts.method);
const activeClient = client;
const stopAfterAbortHook = () => stopClientThenSettle(activeClient, err);
if (!activeClient || !opts.onSignalAbort || !primaryRequestStarted) {
stopAfterAbortHook();
return;
}
const request: GatewayRequestFunction = activeClient.request.bind(activeClient);
void Promise.resolve()
.then(() => opts.onSignalAbort?.(request))
.catch(() => {})
.finally(stopAfterAbortHook);
};
opts.signal?.addEventListener("abort", abortHandler, { once: true });
const client = gatewayCallDeps.createGatewayClient({
client = gatewayCallDeps.createGatewayClient({
url,
token,
password,
@@ -719,9 +784,16 @@ async function executeGatewayRequestWithScopes<T>(params: {
methods: hello.features?.methods,
attemptedMethod: opts.method,
});
const result = await client.request<T>(opts.method, opts.params, {
const activeClient = client;
if (!activeClient) {
throw new Error("gateway client not initialized");
}
primaryRequestStarted = true;
const result = await activeClient.request<T>(opts.method, opts.params, {
expectFinal: opts.expectFinal,
timeoutMs: opts.timeoutMs,
signal: opts.signal,
onAccepted: opts.onAccepted,
});
ignoreClose = true;
stop(undefined, result);
@@ -752,7 +824,7 @@ async function executeGatewayRequestWithScopes<T>(params: {
},
});
const timer = setTimeout(() => {
timer = setTimeout(() => {
ignoreClose = true;
stop(
createGatewayTimeoutTransportError({

View File

@@ -61,6 +61,17 @@ type Pending = {
reject: (err: unknown) => void;
expectFinal: boolean;
timeout: NodeJS.Timeout | null;
cleanup?: () => void;
onAccepted?: (payload: unknown) => void;
acceptedNotified?: boolean;
};
export type GatewayClientRequestOptions = {
expectFinal?: boolean;
timeoutMs?: number | null;
signal?: AbortSignal;
/** Called once for expectFinal requests after an accepted response, before the final result. */
onAccepted?: (payload: unknown) => void;
};
type GatewayClientErrorShape = {
@@ -1031,12 +1042,20 @@ export class GatewayClient {
const payload = parsed.payload as { status?: unknown } | undefined;
const status = payload?.status;
if (pending.expectFinal && status === "accepted") {
if (!pending.acceptedNotified) {
pending.acceptedNotified = true;
try {
pending.onAccepted?.(parsed.payload);
} catch (err) {
logDebug(
`gateway client accepted callback error: ${formatGatewayClientErrorForLog(err)}`,
);
}
}
return;
}
this.pending.delete(parsed.id);
if (pending.timeout) {
clearTimeout(pending.timeout);
}
pending.cleanup?.();
if (parsed.ok) {
pending.resolve(parsed.payload);
} else {
@@ -1120,9 +1139,7 @@ export class GatewayClient {
private flushPendingErrors(err: Error) {
for (const [, p] of this.pending) {
if (p.timeout) {
clearTimeout(p.timeout);
}
p.cleanup?.();
p.reject(err);
}
this.pending.clear();
@@ -1185,11 +1202,14 @@ export class GatewayClient {
async request<T = Record<string, unknown>>(
method: string,
params?: unknown,
opts?: { expectFinal?: boolean; timeoutMs?: number | null },
opts?: GatewayClientRequestOptions,
): Promise<T> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("gateway not connected");
}
if (opts?.signal?.aborted) {
throw createGatewayRequestAbortError(method);
}
const id = randomUUID();
const frame: RequestFrame = { type: "req", id, method, params };
if (!validateRequestFrame(frame)) {
@@ -1206,22 +1226,49 @@ export class GatewayClient {
: expectFinal
? null
: this.requestTimeoutMs;
const signal = opts?.signal;
const p = new Promise<T>((resolve, reject) => {
let abortHandler: (() => void) | undefined;
const timeout =
timeoutMs === null
? null
: setTimeout(() => {
const pending = this.pending.get(id);
this.pending.delete(id);
pending?.cleanup?.();
reject(new Error(`gateway request timeout for ${method}`));
}, timeoutMs);
const cleanup = () => {
if (timeout) {
clearTimeout(timeout);
}
if (signal && abortHandler) {
signal.removeEventListener("abort", abortHandler);
}
};
abortHandler = () => {
const pending = this.pending.get(id);
this.pending.delete(id);
pending?.cleanup?.();
reject(createGatewayRequestAbortError(method));
};
this.pending.set(id, {
resolve: (value) => resolve(value as T),
reject,
expectFinal,
timeout,
cleanup,
onAccepted: opts?.onAccepted,
});
signal?.addEventListener("abort", abortHandler, { once: true });
});
this.ws.send(JSON.stringify(frame));
return p;
}
}
function createGatewayRequestAbortError(method: string): Error {
const err = new Error(`gateway request aborted for ${method}`);
err.name = "AbortError";
return err;
}

View File

@@ -307,6 +307,90 @@ describe("GatewayClient", () => {
}
});
test("notifies accepted expectFinal requests while continuing to wait for final", async () => {
const client = new GatewayClient({
requestTimeoutMs: 25,
});
const send = vi.fn();
(
client as unknown as {
ws: WebSocket | { readyState: number; send: (data: string) => void; close: () => void };
}
).ws = {
readyState: WebSocket.OPEN,
send,
close: vi.fn(),
};
const onAccepted = vi.fn();
const requestPromise = client.request<{ status: string }>("agent", undefined, {
expectFinal: true,
onAccepted,
});
const frame = JSON.parse(String(send.mock.calls[0]?.[0])) as { id: string };
(
client as unknown as {
handleMessage: (raw: string) => void;
}
).handleMessage(
JSON.stringify({
type: "res",
id: frame.id,
ok: true,
payload: { status: "accepted", runId: "run-1" },
}),
);
expect(onAccepted).toHaveBeenCalledWith({ status: "accepted", runId: "run-1" });
expect((client as unknown as { pending: Map<string, unknown> }).pending.size).toBe(1);
(
client as unknown as {
handleMessage: (raw: string) => void;
}
).handleMessage(
JSON.stringify({
type: "res",
id: frame.id,
ok: true,
payload: { status: "ok" },
}),
);
await expect(requestPromise).resolves.toEqual({ status: "ok" });
expect((client as unknown as { pending: Map<string, unknown> }).pending.size).toBe(0);
});
test("aborts in-flight requests from caller AbortSignal", async () => {
const client = new GatewayClient({
requestTimeoutMs: 25,
});
const send = vi.fn();
(
client as unknown as {
ws: WebSocket | { readyState: number; send: () => void; close: () => void };
}
).ws = {
readyState: WebSocket.OPEN,
send,
close: vi.fn(),
};
const controller = new AbortController();
const requestPromise = client.request("status", undefined, {
signal: controller.signal,
timeoutMs: null,
});
expect(send).toHaveBeenCalledTimes(1);
expect((client as unknown as { pending: Map<string, unknown> }).pending.size).toBe(1);
controller.abort();
await expect(requestPromise).rejects.toThrow("gateway request aborted for status");
expect((client as unknown as { pending: Map<string, unknown> }).pending.size).toBe(0);
});
test("clamps oversized explicit request timeouts before scheduling", async () => {
vi.useFakeTimers();
try {

View File

@@ -306,6 +306,43 @@ describe("startGatewayMaintenanceTimers", () => {
stopMaintenanceTimers(timers);
});
it("keeps pending accepted agent dedupe entries until their run expiry", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
const now = Date.now();
deps.dedupe.set("agent:pending-agent", {
ts: now - DEDUPE_TTL_MS - 1,
ok: true,
payload: {
runId: "pending-agent",
sessionKey: "agent:main:main",
status: "accepted",
expiresAtMs: now + 120_000,
},
});
deps.dedupe.set("agent:expired-pending-agent", {
ts: now - DEDUPE_TTL_MS - 1,
ok: true,
payload: {
runId: "expired-pending-agent",
sessionKey: "agent:main:main",
status: "accepted",
expiresAtMs: now - 1,
},
});
const timers = startGatewayMaintenanceTimers(deps);
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.dedupe.has("agent:pending-agent")).toBe(true);
expect(deps.dedupe.has("agent:expired-pending-agent")).toBe(false);
stopMaintenanceTimers(timers);
});
it("keeps active exec approval dedupe aliases past the normal ttl", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));

View File

@@ -106,6 +106,20 @@ export function startGatewayMaintenanceTimers(params: {
: undefined
: undefined;
};
const isPendingAcceptedAgentDedupeKey = (key: string, dedupeEntry: DedupeEntry) => {
if (!key.startsWith("agent:")) {
return false;
}
const payload = dedupeEntry.payload;
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
return false;
}
if ((payload as { status?: unknown }).status !== "accepted") {
return false;
}
const expiresAtMs = (payload as { expiresAtMs?: unknown }).expiresAtMs;
return typeof expiresAtMs === "number" && Number.isFinite(expiresAtMs) && expiresAtMs > now;
};
const isActiveRunDedupeKey = (key: string, dedupeEntry: DedupeEntry) => {
if (!key.startsWith("agent:") && !key.startsWith("chat:")) {
return false;
@@ -118,7 +132,7 @@ export function startGatewayMaintenanceTimers(params: {
return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent";
};
for (const [k, v] of params.dedupe) {
if (isActiveRunDedupeKey(k, v)) {
if (isActiveRunDedupeKey(k, v) || isPendingAcceptedAgentDedupeKey(k, v)) {
continue;
}
if (now - v.ts > DEDUPE_TTL_MS) {
@@ -128,7 +142,10 @@ export function startGatewayMaintenanceTimers(params: {
if (params.dedupe.size > DEDUPE_MAX) {
const excess = params.dedupe.size - DEDUPE_MAX;
const oldestKeys = [...params.dedupe.entries()]
.filter(([key, entry]) => !isActiveRunDedupeKey(key, entry))
.filter(
([key, entry]) =>
!isActiveRunDedupeKey(key, entry) && !isPendingAcceptedAgentDedupeKey(key, entry),
)
.toSorted(([, left], [, right]) => left.ts - right.ts)
.slice(0, excess)
.map(([key]) => key);

View File

@@ -1,5 +1,5 @@
import fs from "node:fs/promises";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
registerExecApprovalFollowupRuntimeHandoff,
resetExecApprovalFollowupRuntimeHandoffsForTests,
@@ -91,6 +91,15 @@ vi.mock("../../config/config.js", async () => {
vi.mock("../../agents/agent-scope.js", () => ({
listAgentIds: mocks.listAgentIds,
resolveDefaultAgentId: () => "main",
resolveSessionAgentId: ({
sessionKey,
}: {
sessionKey?: string | null;
config?: Record<string, unknown>;
}) => {
const m = /^agent:([^:]+):/.exec((sessionKey ?? "").trim());
return m?.[1] ?? "main";
},
resolveAgentConfig: (cfg: { agents?: { list?: Array<{ id?: string }> } }, agentId: string) =>
cfg.agents?.list?.find((agent) => agent.id === agentId),
resolveAgentWorkspaceDir: (cfg: { agents?: { defaults?: { workspace?: string } } }) =>
@@ -1847,7 +1856,10 @@ describe("gateway agent handler", () => {
await flushScheduledDispatchStep();
expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1);
expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true });
expect(mockCallArg(secondRespond, 0, 3)).toEqual({
cached: true,
runId: firstRegistration.idempotencyKey,
});
});
it("reserves exec approval followup dedupe before awaited session work", async () => {
@@ -1932,9 +1944,12 @@ describe("gateway agent handler", () => {
expect(sessionWriteCalls).toBe(1);
expect(mockCallArg(secondRespond, 0, 1)).toMatchObject({
runId: firstRegistration.idempotencyKey,
status: "accepted",
status: "in_flight",
});
expect(mockCallArg(secondRespond, 0, 3)).toEqual({
cached: true,
runId: firstRegistration.idempotencyKey,
});
expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true });
releaseFirstSessionWrite?.();
await first;
@@ -3508,10 +3523,38 @@ describe("gateway agent handler", () => {
});
describe("gateway agent handler chat.abort integration", () => {
afterEach(() => {
function resetIntegrationState() {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetDetachedTaskLifecycleRuntimeForTests();
resetTaskRegistryForTests();
mocks.agentCommand.mockReset();
mocks.loadConfigReturn = {};
mocks.loadGatewaySessionRow.mockReset();
mocks.loadSessionEntry.mockReset();
mocks.updateSessionStore.mockReset();
mocks.getLatestSubagentRunByChildSessionKey.mockReset();
mocks.replaceSubagentRunAfterSteer.mockReset();
mocks.resolveExplicitAgentSessionKey.mockReset().mockReturnValue(undefined);
mocks.resolveBareResetBootstrapFileAccess.mockReset().mockReturnValue(true);
mocks.listAgentIds.mockReset().mockReturnValue(["main"]);
mocks.loadVoiceWakeRoutingConfig.mockReset();
mocks.resolveVoiceWakeRouteByTrigger.mockReset();
mocks.resolveSendPolicy.mockReset().mockReturnValue("allow");
dateOnlyFakeClockActive = false;
vi.useRealTimers();
resetExecApprovalFollowupRuntimeHandoffsForTests();
}
beforeEach(() => {
resetIntegrationState();
});
afterEach(() => {
resetIntegrationState();
});
function prime(sessionId = "existing-session-id", cfg: Record<string, unknown> = {}) {
@@ -3553,6 +3596,7 @@ describe("gateway agent handler chat.abort integration", () => {
prime();
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-yield-before-dispatch";
const pending = invokeAgent(
@@ -3562,17 +3606,25 @@ describe("gateway agent handler chat.abort integration", () => {
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ respond, reqId: runId, flushDispatch: false },
{ context, respond, reqId: runId, flushDispatch: false },
);
await Promise.resolve();
await Promise.resolve();
expect(mockCallArg(respond)).toBe(true);
expectRecordFields(mockCallArg(respond, 0, 1), {
const acceptedPayload = expectRecordFields(mockCallArg(respond, 0, 1), {
runId,
status: "accepted",
});
expect(acceptedPayload).not.toHaveProperty("dedupeKeys");
expect(acceptedPayload).not.toHaveProperty("ownerConnId");
expect(acceptedPayload).not.toHaveProperty("ownerDeviceId");
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
status: "accepted",
dedupeKeys: [`agent:${runId}`],
});
expect(mockCallArg(respond, 0, 2)).toBeUndefined();
expect(mockCallArg(respond, 0, 3)).toEqual({ runId });
expect(mocks.agentCommand).not.toHaveBeenCalled();
@@ -3585,11 +3637,669 @@ describe("gateway agent handler chat.abort integration", () => {
expect(mocks.agentCommand).toHaveBeenCalledTimes(1);
});
it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => {
it("does not dispatch when chat.abort lands during the accepted ack yield", async () => {
prime();
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-abort-before-dispatch";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, respond, reqId: runId, flushDispatch: false },
);
expectRecordFields(mockCallArg(respond, 0, 1), {
runId,
sessionKey: "agent:main:main",
status: "accepted",
});
expect(context.chatAbortControllers.has(runId)).toBe(true);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expect(context.chatAbortControllers.has(runId)).toBe(false);
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
status: "timeout",
summary: "aborted",
stopReason: "rpc",
});
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "rpc",
});
});
it("does not dispatch when chat.abort lands during pre-accept setup", async () => {
prime();
const requestedSessionKey = "agent:main:legacy-main";
let releaseSessionWrite: (() => void) | undefined;
let sessionWriteCalls = 0;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
sessionWriteCalls += 1;
if (sessionWriteCalls === 1) {
await new Promise<void>((resolve) => {
releaseSessionWrite = resolve;
});
}
const store = {
"agent:main:main": buildExistingMainStoreEntry(),
};
return await updater(store);
});
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-abort-before-registration";
const pending = invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: requestedSessionKey,
idempotencyKey: runId,
},
{ context, respond, reqId: runId, flushDispatch: false },
);
await waitForAssertion(() => expect(sessionWriteCalls).toBe(1));
expect(context.chatAbortControllers.has(runId)).toBe(false);
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: requestedSessionKey,
status: "accepted",
});
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: requestedSessionKey, runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: requestedSessionKey,
status: "timeout",
summary: "aborted",
stopReason: "rpc",
});
releaseSessionWrite?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(context.chatAbortControllers.has(runId)).toBe(false);
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "rpc",
});
});
it("does not dispatch when a stop command lands during pre-accept setup", async () => {
prime();
const requestedSessionKey = "agent:main:legacy-main";
let releaseSessionWrite: (() => void) | undefined;
let sessionWriteCalls = 0;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
sessionWriteCalls += 1;
if (sessionWriteCalls === 1) {
await new Promise<void>((resolve) => {
releaseSessionWrite = resolve;
});
}
const store = {
"agent:main:main": buildExistingMainStoreEntry(),
};
return await updater(store);
});
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-stop-before-registration";
const pending = invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: requestedSessionKey,
idempotencyKey: runId,
},
{ context, respond, reqId: runId, flushDispatch: false },
);
await waitForAssertion(() => expect(sessionWriteCalls).toBe(1));
expect(context.chatAbortControllers.has(runId)).toBe(false);
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: requestedSessionKey,
status: "accepted",
});
const stopRespond = vi.fn();
await chatHandlers["chat.send"]({
params: {
sessionKey: requestedSessionKey,
message: "/stop",
idempotencyKey: "idem-stop-command-before-registration",
},
respond: stopRespond as never,
context,
req: { type: "req", id: "stop-req", method: "chat.send" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(stopRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: requestedSessionKey,
status: "timeout",
summary: "aborted",
stopReason: "stop",
});
releaseSessionWrite?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(context.chatAbortControllers.has(runId)).toBe(false);
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "stop",
});
});
it("does not dispatch when session-level chat.abort lands during pre-accept setup", async () => {
prime();
let releaseSessionWrite: (() => void) | undefined;
let sessionWriteCalls = 0;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
sessionWriteCalls += 1;
if (sessionWriteCalls === 1) {
await new Promise<void>((resolve) => {
releaseSessionWrite = resolve;
});
}
const store = {
"agent:main:main": buildExistingMainStoreEntry(),
};
return await updater(store);
});
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-session-level-abort-before-registration";
const pending = invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, respond, reqId: runId, flushDispatch: false },
);
await waitForAssertion(() => expect(sessionWriteCalls).toBe(1));
expect(context.chatAbortControllers.has(runId)).toBe(false);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main" },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "timeout",
summary: "aborted",
stopReason: "rpc",
});
releaseSessionWrite?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(context.chatAbortControllers.has(runId)).toBe(false);
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "rpc",
});
});
it("does not dispatch when chat.abort lands during slow attachment setup", async () => {
mockMainSessionEntry({
sessionId: "existing-session-id",
model: "vision-model",
modelProvider: "test",
});
mocks.updateSessionStore.mockResolvedValue(undefined);
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
let releaseCatalog: (() => void) | undefined;
const context = {
...makeContext(),
loadGatewayModelCatalog: vi.fn(
async () =>
await new Promise((resolve) => {
releaseCatalog = () =>
resolve([
{
id: "vision-model",
name: "vision-model",
provider: "test",
input: ["image"],
},
]);
}),
),
} as unknown as GatewayRequestContext;
const respond = vi.fn();
const runId = "idem-abort-during-attachment-setup";
const pending = invokeAgent(
{
message: "inspect this",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
attachments: [
{
type: "file",
mimeType: "image/png",
fileName: "pixel.png",
content: Buffer.from("not really a png").toString("base64"),
},
],
},
{ context, respond, reqId: runId, flushDispatch: false },
);
await waitForAssertion(() =>
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "accepted",
}),
);
expect(context.chatAbortControllers.has(runId)).toBe(false);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "timeout",
summary: "aborted",
stopReason: "rpc",
});
releaseCatalog?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(context.chatAbortControllers.has(runId)).toBe(false);
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "rpc",
});
});
it("does not dispatch when chat.abort lands before voice wake reroutes the session", async () => {
let releaseRouting: (() => void) | undefined;
mocks.loadVoiceWakeRoutingConfig.mockImplementation(
async () =>
await new Promise((resolve) => {
releaseRouting = () =>
resolve({
version: 1,
defaultTarget: { mode: "current" },
routes: [],
updatedAtMs: 0,
});
}),
);
mocks.resolveVoiceWakeRouteByTrigger.mockReturnValue({ sessionKey: "agent:main:voice" });
mocks.loadSessionEntry.mockImplementation((sessionKey: string) => ({
cfg: {},
storePath: "/tmp/sessions.json",
entry: {
sessionId: sessionKey === "agent:main:voice" ? "voice-session-id" : "main-session-id",
updatedAt: Date.now(),
},
canonicalKey: sessionKey === "agent:main:voice" ? "agent:main:voice" : "agent:main:main",
}));
mocks.updateSessionStore.mockResolvedValue(undefined);
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-abort-before-voice-route";
const pending = invokeAgent(
{
message: "wake up",
sessionKey: "agent:main:main",
voiceWakeTrigger: "robot wake",
idempotencyKey: runId,
},
{ context, respond, reqId: runId, flushDispatch: false },
);
await waitForAssertion(() =>
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "accepted",
}),
);
expect(context.chatAbortControllers.has(runId)).toBe(false);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "timeout",
summary: "aborted",
stopReason: "rpc",
});
releaseRouting?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(context.chatAbortControllers.has(runId)).toBe(false);
const finalResponse = respond.mock.calls.find(
(call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout",
);
expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], {
runId,
status: "timeout",
stopReason: "rpc",
});
});
it("rejects unauthorized chat.abort during pre-accept setup", async () => {
prime();
let releaseSessionWrite: (() => void) | undefined;
let sessionWriteCalls = 0;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
sessionWriteCalls += 1;
if (sessionWriteCalls === 1) {
await new Promise<void>((resolve) => {
releaseSessionWrite = resolve;
});
}
const store = {
"agent:main:main": buildExistingMainStoreEntry(),
};
return await updater(store);
});
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-abort-before-registration-unauthorized";
const pending = invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{
context,
respond,
reqId: runId,
flushDispatch: false,
client: { connId: "owner-conn" } as AgentHandlerArgs["client"],
},
);
await waitForAssertion(() => expect(sessionWriteCalls).toBe(1));
expect(context.chatAbortControllers.has(runId)).toBe(false);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: { connId: "other-conn" } as AgentHandlerArgs["client"],
isWebchatConnect: () => false,
});
expect(mockCallArg(abortRespond, 0, 0)).toBe(false);
expectRecordFields(mockCallArg(abortRespond, 0, 2), {
code: "INVALID_REQUEST",
message: "unauthorized",
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
sessionKey: "agent:main:main",
status: "accepted",
});
releaseSessionWrite?.();
await pending;
await flushScheduledDispatchStep();
expect(mocks.agentCommand).toHaveBeenCalledTimes(1);
expect(context.chatAbortControllers.has(runId)).toBe(true);
});
it("updates exec approval followup aliases when chat.abort lands during pre-accept setup", async () => {
const bashElevated = {
enabled: true,
allowed: true,
defaultLevel: "on" as const,
};
const firstRegistration = registerExecApprovalFollowupRuntimeHandoff({
approvalId: "req-elevated-preaccept-abort",
sessionKey: "agent:main:telegram:direct:123",
bashElevated,
});
const secondRegistration = registerExecApprovalFollowupRuntimeHandoff({
approvalId: "req-elevated-preaccept-abort",
sessionKey: "agent:main:telegram:direct:123",
bashElevated,
});
if (!firstRegistration || !secondRegistration) {
throw new Error("expected runtime handoff ids");
}
mockMainSessionEntry({
sessionId: "existing-session-id",
lastChannel: "telegram",
lastTo: "123",
});
let releaseSessionWrite: (() => void) | undefined;
let sessionWriteCalls = 0;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
sessionWriteCalls += 1;
if (sessionWriteCalls === 1) {
await new Promise<void>((resolve) => {
releaseSessionWrite = resolve;
});
}
const store = {
"agent:main:main": buildExistingMainStoreEntry({
lastChannel: "telegram",
lastTo: "123",
}),
};
return await updater(store);
});
mocks.agentCommand.mockImplementation(() => new Promise(() => {}));
const context = makeContext();
const runId = firstRegistration.idempotencyKey;
const aliasKey = "agent:exec-approval-followup:req-elevated-preaccept-abort";
const pending = invokeAgent(
{
message: "exec followup",
sessionKey: "agent:main:telegram:direct:123",
channel: "telegram",
idempotencyKey: runId,
internalRuntimeHandoffId: firstRegistration.handoffId,
},
{
reqId: "exec-followup-preaccept-abort-1",
client: backendGatewayClient(),
context,
flushDispatch: false,
},
);
await waitForAssertion(() => expect(sessionWriteCalls).toBe(1));
expectRecordFields(context.dedupe.get(aliasKey)?.payload, {
runId,
sessionKey: "agent:main:telegram:direct:123",
status: "accepted",
});
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:telegram:direct:123", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: backendGatewayClient(),
isWebchatConnect: () => false,
});
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
status: "timeout",
stopReason: "rpc",
});
expectRecordFields(context.dedupe.get(aliasKey)?.payload, {
runId,
status: "timeout",
stopReason: "rpc",
});
releaseSessionWrite?.();
await pending;
const retryRespond = await invokeAgent(
{
message: "exec followup duplicate",
sessionKey: "agent:main:telegram:direct:123",
channel: "telegram",
idempotencyKey: secondRegistration.idempotencyKey,
internalRuntimeHandoffId: secondRegistration.handoffId,
},
{
reqId: "exec-followup-preaccept-abort-2",
client: backendGatewayClient(),
context,
flushDispatch: false,
},
);
expect(mockCallArg(retryRespond, 0, 1)).toMatchObject({
runId,
status: "timeout",
stopReason: "rpc",
});
expect(mocks.agentCommand).not.toHaveBeenCalled();
});
it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => {
prime();
mocks.agentCommand.mockImplementation(() => new Promise(() => {}));
const context = makeContext();
const respond = vi.fn();
const runId = "idem-abort-no-timeout";
await invokeAgent(
{
@@ -3599,7 +4309,7 @@ describe("gateway agent handler chat.abort integration", () => {
idempotencyKey: runId,
timeout: 0,
},
{ context, reqId: runId },
{ context, respond, reqId: runId },
);
const entry = context.chatAbortControllers.get(runId);
@@ -3682,6 +4392,56 @@ describe("gateway agent handler chat.abort integration", () => {
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
it("chat.abort by runId allows the owner connection to use a stale session key", async () => {
prime();
const pending = new Promise(() => {});
let capturedSignal: AbortSignal | undefined;
mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
capturedSignal = opts.abortSignal;
return pending;
});
const context = makeContext();
const runId = "idem-abort-stale-session-key";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{
context,
reqId: runId,
client: { connId: "owner-conn" } as AgentHandlerArgs["client"],
},
);
const active = requireValue(context.chatAbortControllers.get(runId), "active run missing");
context.chatAbortControllers.set(runId, {
...active,
sessionKey: "agent:main:canonical",
});
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: { connId: "owner-conn" } as AgentHandlerArgs["client"],
isWebchatConnect: () => false,
});
expect(mockCallArg(abortRespond)).toBe(true);
expectRecordFields(mockCallArg(abortRespond, 0, 1), {
aborted: true,
runIds: [runId],
});
expect(capturedSignal?.aborted).toBe(true);
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
it("keeps the sessions.abort wait snapshot after late agent completion", async () => {
prime();
let capturedSignal: AbortSignal | undefined;
@@ -3901,10 +4661,61 @@ describe("gateway agent handler chat.abort integration", () => {
);
expect(context.chatAbortControllers.get(runId)).toBe(preExisting);
expect(context.dedupe.has(`agent:${runId}`)).toBe(false);
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(respond).toHaveBeenCalledWith(true, { runId, status: "in_flight" }, undefined, {
cached: true,
runId,
});
});
it("returns in_flight instead of replaying cached accepted agent replies", async () => {
prime();
mocks.agentCommand.mockImplementationOnce(
() =>
new Promise(() => {
// Keep the first run pending so the dedupe entry remains accepted.
}),
);
const context = makeContext();
const runId = "idem-cached-accepted";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId, flushDispatch: false },
);
expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, {
runId,
status: "accepted",
sessionKey: "agent:main:main",
});
const duplicateRespond = vi.fn();
await invokeAgent(
{
message: "hi again",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: `${runId}-duplicate`, respond: duplicateRespond },
);
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(duplicateRespond).toHaveBeenCalledWith(
true,
{ runId, status: "in_flight", sessionKey: "agent:main:main" },
undefined,
{
cached: true,
runId,
},
);
});
});

View File

@@ -449,6 +449,68 @@ function readGatewayDedupeEntry(params: {
return undefined;
}
function isAcceptedAgentDedupePayload(payload: unknown): payload is {
acceptedAt?: unknown;
dedupeKeys?: unknown;
expiresAtMs?: unknown;
ownerConnId?: unknown;
ownerDeviceId?: unknown;
runId?: unknown;
sessionKey?: unknown;
status: "accepted";
} {
return (
typeof payload === "object" &&
payload !== null &&
(payload as { status?: unknown }).status === "accepted"
);
}
function isPreRegistrationAbortedAgentDedupePayload(payload: unknown): payload is {
runId?: unknown;
sessionKey?: unknown;
status: "timeout";
stopReason?: unknown;
} {
const stopReason = (payload as { stopReason?: unknown } | null)?.stopReason;
return (
typeof payload === "object" &&
payload !== null &&
(payload as { status?: unknown }).status === "timeout" &&
(stopReason === "rpc" || stopReason === "stop")
);
}
function isPreRegistrationAbortedAgentDedupeEntryForSession(params: {
entry: ReturnType<typeof readGatewayDedupeEntry> | undefined;
runId: string;
sessionKey?: string;
alternateSessionKeys?: Array<string | undefined>;
}): boolean {
if (!params.entry?.ok || !isPreRegistrationAbortedAgentDedupePayload(params.entry.payload)) {
return false;
}
const payload = params.entry.payload;
const payloadRunId = typeof payload.runId === "string" ? payload.runId.trim() : "";
if (payloadRunId && payloadRunId !== params.runId) {
return false;
}
const payloadSessionKey =
typeof payload.sessionKey === "string" && payload.sessionKey.trim()
? payload.sessionKey.trim()
: undefined;
const expectedSessionKeys = new Set(
[params.sessionKey, ...(params.alternateSessionKeys ?? [])].filter((value): value is string =>
Boolean(value?.trim()),
),
);
return (
!payloadSessionKey ||
expectedSessionKeys.size === 0 ||
expectedSessionKeys.has(payloadSessionKey)
);
}
function setGatewayDedupeEntries(params: {
dedupe: GatewayRequestContext["dedupe"];
keys: readonly string[];
@@ -463,6 +525,28 @@ function setGatewayDedupeEntries(params: {
}
}
function setAbortedAgentDedupeEntries(params: {
dedupe: GatewayRequestContext["dedupe"];
keys: readonly string[];
runId: string;
stopReason: string;
}) {
setGatewayDedupeEntries({
dedupe: params.dedupe,
keys: params.keys,
entry: {
ts: Date.now(),
ok: true,
payload: {
runId: params.runId,
status: "timeout" as const,
summary: "aborted",
stopReason: params.stopReason,
},
},
});
}
function deleteGatewayDedupeEntries(params: {
dedupe: GatewayRequestContext["dedupe"];
keys: readonly string[];
@@ -716,6 +800,30 @@ export const agentHandlers: GatewayRequestHandlers = {
keys: agentDedupeKeys,
});
if (cached) {
if (cached.ok && isAcceptedAgentDedupePayload(cached.payload)) {
const cachedRunId =
typeof cached.payload.runId === "string" && cached.payload.runId.trim()
? cached.payload.runId.trim()
: runId;
const cachedSessionKey =
typeof cached.payload.sessionKey === "string" && cached.payload.sessionKey.trim()
? cached.payload.sessionKey.trim()
: undefined;
respond(
true,
{
runId: cachedRunId,
status: "in_flight" as const,
...(cachedSessionKey ? { sessionKey: cachedSessionKey } : {}),
},
undefined,
{
cached: true,
runId: cachedRunId,
},
);
return;
}
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
@@ -723,20 +831,36 @@ export const agentHandlers: GatewayRequestHandlers = {
}
let agentDedupeReserved = false;
let agentRunAccepted = false;
const reserveExecApprovalFollowupDedupe = () => {
if (agentDedupeReserved || !execApprovalFollowupApprovalId) {
const ownerConnId = typeof client?.connId === "string" ? client.connId : undefined;
const ownerDeviceId =
typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined;
const reservePreAcceptedAgentDedupe = (sessionKey?: string) => {
if (agentDedupeReserved || !sessionKey) {
return;
}
const acceptedAt = Date.now();
const pendingTimeoutMs = resolveAgentTimeoutMs({
cfg,
overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined,
});
setGatewayDedupeEntries({
dedupe: context.dedupe,
keys: agentDedupeKeys,
entry: {
ts: Date.now(),
ts: acceptedAt,
ok: true,
payload: {
runId,
status: "accepted" as const,
acceptedAt: Date.now(),
sessionKey,
acceptedAt,
dedupeKeys: agentDedupeKeys,
expiresAtMs: resolveAgentRunExpiresAtMs({
now: acceptedAt,
timeoutMs: pendingTimeoutMs,
}),
ownerConnId,
ownerDeviceId,
},
},
});
@@ -746,6 +870,18 @@ export const agentHandlers: GatewayRequestHandlers = {
if (!agentDedupeReserved || agentRunAccepted) {
return;
}
const reservedEntry = readGatewayDedupeEntry({
dedupe: context.dedupe,
keys: agentDedupeKeys,
});
if (
isPreRegistrationAbortedAgentDedupeEntryForSession({
entry: reservedEntry,
runId,
})
) {
return;
}
deleteGatewayDedupeEntries({
dedupe: context.dedupe,
keys: agentDedupeKeys,
@@ -756,91 +892,6 @@ export const agentHandlers: GatewayRequestHandlers = {
const requestedBestEffortDeliver =
typeof request.bestEffortDeliver === "boolean" ? request.bestEffortDeliver : undefined;
let message = (request.message ?? "").trim();
if (!isRawModelRun) {
message = annotateInterSessionPromptText(message, inputProvenance);
}
let images: Array<{ type: "image"; data: string; mimeType: string }> = [];
let imageOrder: PromptImageOrderEntry[] = [];
if (normalizedAttachments.length > 0) {
const requestedSessionKeyRaw =
typeof request.sessionKey === "string" && request.sessionKey.trim()
? request.sessionKey.trim()
: undefined;
let baseProvider: string | undefined;
let baseModel: string | undefined;
if (requestedSessionKeyRaw) {
const { cfg: sessCfg, entry: sessEntry } = loadSessionEntry(requestedSessionKeyRaw);
const sessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKeyRaw);
const modelRef = resolveSessionModelRef(sessCfg, sessEntry, sessionAgentId);
baseProvider = modelRef.provider;
baseModel = modelRef.model;
}
const effectiveProvider = providerOverride || baseProvider;
const effectiveModel = modelOverride || baseModel;
const supportsInlineImages = await resolveGatewayModelSupportsImages({
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
provider: effectiveProvider,
model: effectiveModel,
});
try {
const parsed = await parseMessageWithAttachments(message, normalizedAttachments, {
maxBytes: resolveChatAttachmentMaxBytes(cfg),
log: context.logGateway,
supportsInlineImages,
// agent.run does not yet wire a ctx.MediaPaths stage path, so reject
// non-image attachments explicitly (UnsupportedAttachmentError)
// instead of saving them where the agent cannot reach them.
acceptNonImage: false,
});
message = parsed.message.trim();
images = parsed.images;
imageOrder = parsed.imageOrder;
// offloadedRefs are appended as text markers to `message`; the agent
// runner will resolve them via detectAndLoadPromptImages.
} catch (err) {
// MediaOffloadError indicates a server-side storage fault (ENOSPC, EPERM,
// etc.). Map it to UNAVAILABLE so clients can retry without treating it as
// a bad request. All other errors are input-validation failures → 4xx.
logAttachmentFailure(context.logGateway, "agent attachment parse failed", err);
const isServerFault = err instanceof MediaOffloadError;
respond(
false,
undefined,
errorShape(
isServerFault ? ErrorCodes.UNAVAILABLE : ErrorCodes.INVALID_REQUEST,
String(err),
),
);
return;
}
}
// Accept internal non-delivery sources (heartbeat, cron, webhook) as valid
// channel hints so subagent spawns from those parent runs are not rejected.
const isKnownGatewayChannel = (value: string): boolean =>
isGatewayMessageChannel(value) || isInternalNonDeliveryChannel(value);
const channelHints = [request.channel, request.replyChannel]
.filter((value): value is string => typeof value === "string")
.map((value) => value.trim())
.filter(Boolean);
for (const rawChannel of channelHints) {
const normalized = normalizeMessageChannel(rawChannel);
if (normalized && normalized !== "last" && !isKnownGatewayChannel(normalized)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent params: unknown channel: ${normalized}`,
),
);
return;
}
}
const knownAgents = listAgentIds(cfg);
const agentIdRaw = normalizeOptionalString(request.agentId) ?? "";
let agentId = agentIdRaw ? normalizeAgentId(agentIdRaw) : undefined;
@@ -894,10 +945,92 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
}
// Exec approval followups can retry with a fresh nonce for the same approval id.
// Reserve the stable alias before awaited session/delivery work so overlaps dedupe.
reserveExecApprovalFollowupDedupe();
// Reserve the run before awaited attachment/session/delivery work so duplicate calls dedupe and
// pre-registration chat.abort can be made durable by idempotency key.
const preAcceptedReservedSessionKey = requestedSessionKey;
reservePreAcceptedAgentDedupe(preAcceptedReservedSessionKey);
try {
let message = (request.message ?? "").trim();
if (!isRawModelRun) {
message = annotateInterSessionPromptText(message, inputProvenance);
}
let images: Array<{ type: "image"; data: string; mimeType: string }> = [];
let imageOrder: PromptImageOrderEntry[] = [];
if (normalizedAttachments.length > 0) {
let baseProvider: string | undefined;
let baseModel: string | undefined;
if (requestedSessionKeyRaw) {
const { cfg: sessCfg, entry: sessEntry } = loadSessionEntry(requestedSessionKeyRaw);
const sessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKeyRaw);
const modelRef = resolveSessionModelRef(sessCfg, sessEntry, sessionAgentId);
baseProvider = modelRef.provider;
baseModel = modelRef.model;
}
const effectiveProvider = providerOverride || baseProvider;
const effectiveModel = modelOverride || baseModel;
const supportsInlineImages = await resolveGatewayModelSupportsImages({
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
provider: effectiveProvider,
model: effectiveModel,
});
try {
const parsed = await parseMessageWithAttachments(message, normalizedAttachments, {
maxBytes: resolveChatAttachmentMaxBytes(cfg),
log: context.logGateway,
supportsInlineImages,
// agent.run does not yet wire a ctx.MediaPaths stage path, so reject
// non-image attachments explicitly (UnsupportedAttachmentError)
// instead of saving them where the agent cannot reach them.
acceptNonImage: false,
});
message = parsed.message.trim();
images = parsed.images;
imageOrder = parsed.imageOrder;
// offloadedRefs are appended as text markers to `message`; the agent
// runner will resolve them via detectAndLoadPromptImages.
} catch (err) {
// MediaOffloadError indicates a server-side storage fault (ENOSPC, EPERM,
// etc.). Map it to UNAVAILABLE so clients can retry without treating it as
// a bad request. All other errors are input-validation failures → 4xx.
logAttachmentFailure(context.logGateway, "agent attachment parse failed", err);
const isServerFault = err instanceof MediaOffloadError;
respond(
false,
undefined,
errorShape(
isServerFault ? ErrorCodes.UNAVAILABLE : ErrorCodes.INVALID_REQUEST,
String(err),
),
);
return;
}
}
// Accept internal non-delivery sources (heartbeat, cron, webhook) as valid
// channel hints so subagent spawns from those parent runs are not rejected.
const isKnownGatewayChannel = (value: string): boolean =>
isGatewayMessageChannel(value) || isInternalNonDeliveryChannel(value);
const channelHints = [request.channel, request.replyChannel]
.filter((value): value is string => typeof value === "string")
.map((value) => value.trim())
.filter(Boolean);
for (const rawChannel of channelHints) {
const normalized = normalizeMessageChannel(rawChannel);
if (normalized && normalized !== "last" && !isKnownGatewayChannel(normalized)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent params: unknown channel: ${normalized}`,
),
);
return;
}
}
const voiceWakeTrigger = normalizeOptionalString(request.voiceWakeTrigger) ?? "";
const replyTo = normalizeOptionalString(request.replyTo) ?? "";
const to = normalizeOptionalString(request.to) ?? "";
@@ -1438,6 +1571,26 @@ export const agentHandlers: GatewayRequestHandlers = {
const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL;
const preRegistrationAbort = readGatewayDedupeEntry({
dedupe: context.dedupe,
keys: agentDedupeKeys,
});
if (
isPreRegistrationAbortedAgentDedupeEntryForSession({
entry: preRegistrationAbort,
runId,
sessionKey: resolvedSessionKey,
alternateSessionKeys: [preAcceptedReservedSessionKey, requestedSessionKey],
})
) {
agentRunAccepted = true;
respond(true, preRegistrationAbort?.payload, undefined, {
cached: true,
runId,
});
return;
}
// Register before the accepted ack so an immediate chat.abort/sessions.abort
// cannot race the active-run entry. Agent RPC runs use the agent timeout;
// chat.send keeps the shorter chat cleanup cap.
@@ -1466,15 +1619,15 @@ export const agentHandlers: GatewayRequestHandlers = {
timeoutMs,
now,
expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }),
ownerConnId: typeof client?.connId === "string" ? client.connId : undefined,
ownerDeviceId:
typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined,
ownerConnId,
ownerDeviceId,
providerId: activeModelProvider,
authProviderId: activeAuthProvider,
kind: "agent",
});
if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) {
agentRunAccepted = true;
const existingRunAbort = context.chatAbortControllers.get(runId);
if (!activeRunAbort.registered && existingRunAbort) {
agentRunAccepted = existingRunAbort.kind === "agent";
respond(true, { runId, status: "in_flight" as const }, undefined, {
cached: true,
runId,
@@ -1484,9 +1637,16 @@ export const agentHandlers: GatewayRequestHandlers = {
const accepted = {
runId,
sessionKey: resolvedSessionKey,
status: "accepted" as const,
acceptedAt: Date.now(),
};
const acceptedDedupePayload = {
...accepted,
dedupeKeys: agentDedupeKeys,
ownerConnId,
ownerDeviceId,
};
agentRunAccepted = true;
// Store an in-flight ack so retries do not spawn a second run.
setGatewayDedupeEntries({
@@ -1495,7 +1655,7 @@ export const agentHandlers: GatewayRequestHandlers = {
entry: {
ts: Date.now(),
ok: true,
payload: accepted,
payload: acceptedDedupePayload,
},
});
respond(true, accepted, undefined, { runId });
@@ -1508,6 +1668,27 @@ export const agentHandlers: GatewayRequestHandlers = {
let dispatched = false;
try {
if (activeRunAbort.controller.signal.aborted) {
setAbortedAgentDedupeEntries({
dedupe: context.dedupe,
keys: agentDedupeKeys,
runId,
stopReason: "rpc",
});
respond(
true,
{
runId,
status: "timeout" as const,
summary: "aborted",
stopReason: "rpc",
},
undefined,
{ runId },
);
return;
}
if (resolvedSessionKey) {
await reactivateCompletedSubagentSession({
sessionKey: resolvedSessionKey,

View File

@@ -167,6 +167,25 @@ type ChatAbortRequester = {
isAdmin: boolean;
};
type PreRegisteredAgentDedupePayload = {
dedupeKeys?: unknown;
ownerConnId?: unknown;
ownerDeviceId?: unknown;
runId?: unknown;
sessionKey?: unknown;
status?: unknown;
};
type PreRegisteredAgentRun = {
runId: string;
sessionKey: string;
payload: PreRegisteredAgentDedupePayload;
};
function normalizeUnknownText(value: unknown): string | undefined {
return typeof value === "string" ? normalizeOptionalText(value) : undefined;
}
/** True when a reply payload carries at least one media reference (mediaUrl or mediaUrls). */
function isMediaBearingPayload(payload: ReplyPayload): boolean {
if (payload.isReasoning === true) {
@@ -1642,6 +1661,153 @@ function canRequesterAbortChatRun(
return false;
}
function canRequesterAbortChatRunWithoutSessionMatch(
entry: ChatAbortControllerEntry,
requester: ChatAbortRequester,
): boolean {
if (requester.isAdmin) {
return true;
}
const ownerDeviceId = normalizeOptionalText(entry.ownerDeviceId);
const ownerConnId = normalizeOptionalText(entry.ownerConnId);
return Boolean(
(ownerDeviceId && requester.deviceId && ownerDeviceId === requester.deviceId) ||
(ownerConnId && requester.connId && ownerConnId === requester.connId),
);
}
function readPreRegisteredAgentDedupePayloadForSession(params: {
entry: GatewayRequestContext["dedupe"] extends Map<string, infer T> ? T | undefined : never;
runId: string;
sessionKey: string;
}): PreRegisteredAgentDedupePayload | undefined {
if (!params.entry?.ok) {
return undefined;
}
const payload = params.entry.payload as PreRegisteredAgentDedupePayload | undefined;
if (payload?.status !== "accepted") {
return undefined;
}
const payloadRunId = normalizeUnknownText(payload.runId);
if (payloadRunId && payloadRunId !== params.runId) {
return undefined;
}
return normalizeUnknownText(payload.sessionKey) === params.sessionKey ? payload : undefined;
}
function readPreRegisteredAgentRun(params: {
key: string;
entry: GatewayRequestContext["dedupe"] extends Map<string, infer T> ? T | undefined : never;
}): PreRegisteredAgentRun | undefined {
if (!params.key.startsWith("agent:") || !params.entry?.ok) {
return undefined;
}
const payload = params.entry.payload as PreRegisteredAgentDedupePayload | undefined;
if (payload?.status !== "accepted") {
return undefined;
}
const runId = normalizeUnknownText(payload.runId) ?? normalizeOptionalText(params.key.slice(6));
const sessionKey = normalizeUnknownText(payload.sessionKey);
if (!runId || !sessionKey) {
return undefined;
}
return { runId, sessionKey, payload };
}
function canRequesterAbortPreRegisteredAgentRun(
payload: PreRegisteredAgentDedupePayload,
requester: ChatAbortRequester,
): boolean {
return canRequesterAbortChatRun(
{
controller: new AbortController(),
sessionId: "",
sessionKey: normalizeUnknownText(payload.sessionKey) ?? "",
startedAtMs: 0,
expiresAtMs: 0,
ownerConnId: normalizeUnknownText(payload.ownerConnId),
ownerDeviceId: normalizeUnknownText(payload.ownerDeviceId),
kind: "agent",
},
requester,
);
}
function resolvePreRegisteredAgentDedupeKeys(
payload: PreRegisteredAgentDedupePayload,
runId: string,
): string[] {
const keys = [`agent:${runId}`];
const payloadKeys = Array.isArray(payload.dedupeKeys) ? payload.dedupeKeys : [];
for (const key of payloadKeys) {
const normalized = normalizeUnknownText(key);
if (normalized?.startsWith("agent:")) {
keys.push(normalized);
}
}
return [...new Set(keys)];
}
function writePreRegisteredAgentAbort(params: {
context: GatewayRequestContext;
runId: string;
sessionKey: string;
payload: PreRegisteredAgentDedupePayload;
stopReason: string;
endedAt?: number;
}) {
const endedAt = params.endedAt ?? Date.now();
for (const key of resolvePreRegisteredAgentDedupeKeys(params.payload, params.runId)) {
setGatewayDedupeEntry({
dedupe: params.context.dedupe,
key,
entry: {
ts: endedAt,
ok: true,
payload: {
runId: params.runId,
sessionKey: params.sessionKey,
status: "timeout" as const,
summary: "aborted",
stopReason: params.stopReason,
endedAt,
},
},
});
}
}
function resolveAuthorizedPreRegisteredAgentRunsForSessionKeys(params: {
context: GatewayRequestContext;
sessionKeys: Iterable<string>;
requester: ChatAbortRequester;
}) {
const sessionKeys = new Set(
Array.from(params.sessionKeys, (sessionKey) => normalizeOptionalText(sessionKey)).filter(
(sessionKey): sessionKey is string => Boolean(sessionKey),
),
);
const authorizedByRunId = new Map<string, PreRegisteredAgentRun>();
let matchedSessionRuns = 0;
for (const [key, entry] of params.context.dedupe) {
const run = readPreRegisteredAgentRun({ key, entry });
if (!run || !sessionKeys.has(run.sessionKey)) {
continue;
}
if (params.context.chatAbortControllers.has(run.runId)) {
continue;
}
matchedSessionRuns += 1;
if (canRequesterAbortPreRegisteredAgentRun(run.payload, params.requester)) {
authorizedByRunId.set(run.runId, run);
}
}
return {
matchedSessionRuns,
authorizedRuns: [...authorizedByRunId.values()],
};
}
function resolveAuthorizedRunsForSessionKeys(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
sessionKeys: Iterable<string>;
@@ -1686,17 +1852,26 @@ async function abortChatRunsForSessionKeyWithPartials(params: {
stopReason?: string;
requester: ChatAbortRequester;
}): Promise<{ aborted: boolean; runIds: string[]; unauthorized: boolean }> {
const sessionKeys = [params.sessionKey, ...(params.sessionKeyAliases ?? [])];
const { matchedSessionRuns, authorizedRuns } = resolveAuthorizedRunsForSessionKeys({
chatAbortControllers: params.context.chatAbortControllers,
sessionKeys: [params.sessionKey, ...(params.sessionKeyAliases ?? [])],
sessionKeys,
sessionIds: [params.sessionId],
requester: params.requester,
});
if (authorizedRuns.length === 0) {
const {
matchedSessionRuns: matchedPendingAgentRuns,
authorizedRuns: authorizedPendingAgentRuns,
} = resolveAuthorizedPreRegisteredAgentRunsForSessionKeys({
context: params.context,
sessionKeys,
requester: params.requester,
});
if (authorizedRuns.length === 0 && authorizedPendingAgentRuns.length === 0) {
return {
aborted: false,
runIds: [],
unauthorized: matchedSessionRuns > 0,
unauthorized: matchedSessionRuns > 0 || matchedPendingAgentRuns > 0,
};
}
const authorizedRunIdSet = new Set(authorizedRuns.map((run) => run.runId));
@@ -1717,6 +1892,19 @@ async function abortChatRunsForSessionKeyWithPartials(params: {
runIds.push(runId);
}
}
const endedAt = Date.now();
const stopReason = params.stopReason ?? "rpc";
for (const { runId, sessionKey, payload } of authorizedPendingAgentRuns) {
writePreRegisteredAgentAbort({
context: params.context,
runId,
sessionKey,
payload,
stopReason,
endedAt,
});
runIds.push(runId);
}
const res = { aborted: runIds.length > 0, runIds, unauthorized: false };
if (res.aborted) {
await persistAbortedPartials({
@@ -1926,10 +2114,34 @@ export const chatHandlers: GatewayRequestHandlers = {
const active = context.chatAbortControllers.get(runId);
if (!active) {
const pendingAgentEntry = context.dedupe.get(`agent:${runId}`);
const pendingAgentPayload = readPreRegisteredAgentDedupePayloadForSession({
entry: pendingAgentEntry,
runId,
sessionKey: rawSessionKey,
});
if (pendingAgentPayload) {
if (!canRequesterAbortPreRegisteredAgentRun(pendingAgentPayload, requester)) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"));
return;
}
writePreRegisteredAgentAbort({
context,
runId,
sessionKey: rawSessionKey,
payload: pendingAgentPayload,
stopReason: "rpc",
});
respond(true, { ok: true, aborted: true, runIds: [runId] });
return;
}
respond(true, { ok: true, aborted: false, runIds: [] });
return;
}
if (active.sessionKey !== rawSessionKey) {
if (
active.sessionKey !== rawSessionKey &&
!canRequesterAbortChatRunWithoutSessionMatch(active, requester)
) {
respond(
false,
undefined,
@@ -1945,13 +2157,13 @@ export const chatHandlers: GatewayRequestHandlers = {
const partialText = context.chatRunBuffers.get(runId);
const res = abortChatRunById(ops, {
runId,
sessionKey: rawSessionKey,
sessionKey: active.sessionKey,
stopReason: "rpc",
});
if (res.aborted && partialText && partialText.trim()) {
await persistAbortedPartials({
context,
sessionKey: rawSessionKey,
sessionKey: active.sessionKey,
snapshots: [
{
runId,