diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index d34c57318d24..5dd94b81bc09 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -55,6 +55,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t - Isolated cron runs also treat run-level agent failures as job errors even when no reply payload is produced, so model/provider failures increment error counters and trigger failure notifications instead of clearing the job as successful. - When an isolated agent-turn job reaches `timeoutSeconds`, cron aborts the underlying agent run and gives it a short cleanup window. If the run does not drain, Gateway-owned cleanup force-clears that run's session ownership before cron records the timeout, so queued chat work is not left behind a stale processing session. - If an isolated agent-turn stalls before the runner starts or before the first model call, cron records a phase-specific timeout such as `setup timed out before runner start` or `stalled before first model call (last phase: context-engine)`. These watchdogs cover embedded providers and CLI-backed providers before their external CLI process is actually started, and are capped independently from long `timeoutSeconds` values so cold-start/auth/context failures surface quickly instead of waiting for the full job budget. +- If you use system cron or another external scheduler to run `openclaw agent`, wrap it with a hard-kill escalation even though the CLI handles `SIGTERM`/`SIGINT`. Gateway-backed runs ask the Gateway to abort accepted runs; local and embedded fallback runs receive the same abort signal. For GNU `timeout`, prefer `timeout -k 60 600 openclaw agent ...` over plain `timeout 600 ...`; the `-k` value is the supervisor backstop if the process cannot drain. For systemd units, keep the same shape by using a `SIGTERM` stop signal plus a grace window such as `TimeoutStopSec` before any final kill. If a retry reuses a `--run-id` while the original Gateway run is still active, the duplicate is reported as in-flight instead of starting a second run. diff --git a/docs/cli/agent.md b/docs/cli/agent.md index de5bab7fdc20..b9399d1915d3 100644 --- a/docs/cli/agent.md +++ b/docs/cli/agent.md @@ -65,6 +65,7 @@ openclaw agent --agent ops --message "Run locally" --local - `--json` keeps stdout reserved for the JSON response. Gateway, plugin, and embedded-fallback diagnostics are routed to stderr so scripts can parse stdout directly. - Embedded fallback JSON includes `meta.transport: "embedded"` and `meta.fallbackFrom: "gateway"` so scripts can distinguish fallback runs from Gateway runs. - If the Gateway accepts an agent run but the CLI times out waiting for the final reply, embedded fallback uses a fresh explicit `gateway-fallback-*` session/run id and reports `meta.fallbackReason: "gateway_timeout"` plus the fallback session fields. This avoids racing the Gateway-owned transcript lock or silently replacing the original routed conversation session. +- For Gateway-backed runs, `SIGTERM` and `SIGINT` interrupt the waiting CLI request. If the Gateway has already accepted the run, the CLI also sends `chat.abort` for that accepted run id before exiting. Local `--local` runs and embedded fallback runs receive the same abort signal, but do not send `chat.abort`. If a duplicate `--run-id` reaches the Gateway while the original agent run is still active, the duplicate response reports `status: "in_flight"` and the non-JSON CLI prints a stderr diagnostic instead of an empty reply. For external cron/systemd wrappers, keep an outer hard-kill backstop such as `timeout -k 60 600 openclaw agent ...` so the supervisor can still reap the process if shutdown cannot drain. - When this command triggers `models.json` regeneration, SecretRef-managed provider credentials are persisted as non-secret markers (for example env var names, `secretref-env:ENV_VAR_NAME`, or `secretref-managed`), not resolved secret plaintext. - Marker writes are source-authoritative: OpenClaw persists markers from the active source config snapshot, not from resolved runtime secret values. diff --git a/src/commands/agent-via-gateway.test.ts b/src/commands/agent-via-gateway.test.ts index 3f6155526f4b..92335542223b 100644 --- a/src/commands/agent-via-gateway.test.ts +++ b/src/commands/agent-via-gateway.test.ts @@ -107,6 +107,34 @@ function requireRecord(value: unknown, label: string): Record { return value as Record; } +function createSignalProcess() { + type SignalName = "SIGINT" | "SIGTERM"; + const listeners = new Map void>>(); + const processLike = { + on(signal: SignalName, handler: () => void) { + const current = listeners.get(signal) ?? new Set<() => void>(); + current.add(handler); + listeners.set(signal, current); + return processLike; + }, + off(signal: SignalName, handler: () => void) { + listeners.get(signal)?.delete(handler); + return processLike; + }, + }; + return { + processLike, + emit(signal: SignalName) { + for (const handler of listeners.get(signal) ?? []) { + handler(); + } + }, + listenerCount(signal: SignalName) { + return listeners.get(signal)?.size ?? 0; + }, + }; +} + function mockMessages(mock: unknown): string[] { const calls = (mock as { mock?: { calls?: unknown[][] } }).mock?.calls ?? []; return calls.map(([message]) => String(message)); @@ -361,6 +389,638 @@ describe("agentCliCommand", () => { ); }); + it("does not treat lazy channel deps as the process signal source", async () => { + await withTempStore(async () => { + mockGatewaySuccessReply(); + const deps = new Proxy( + {}, + { + get(target, property, receiver) { + if (property === "process") { + return async () => undefined; + } + return Reflect.get(target, property, receiver); + }, + }, + ); + + await agentCliCommand({ message: "hi", to: "+1555" }, runtime, deps); + + expect(callGateway).toHaveBeenCalledTimes(1); + expect(agentCommand).not.toHaveBeenCalled(); + }); + }); + + it("exits for successful gateway runs when SIGTERM arrives before return", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + mockGatewaySuccessReply(); + const signalRuntime: RuntimeEnv = { + log: vi.fn(() => { + signals.emit("SIGTERM"); + }), + error: vi.fn(), + exit: vi.fn(), + }; + + const result = await agentCliCommand({ message: "hi", to: "+1555" }, signalRuntime, { + process: signals.processLike, + }); + + expect(result).toBeUndefined(); + expect(callGateway).toHaveBeenCalledTimes(1); + expect(agentCommand).not.toHaveBeenCalled(); + expect(signalRuntime.log).toHaveBeenCalledWith("hello"); + expect(signalRuntime.exit).toHaveBeenCalledWith(143); + }); + }); + + it.each([ + ["SIGTERM", 143], + ["SIGINT", 130], + ] as const)( + "aborts an accepted gateway run using the accepted session key when %s interrupts the CLI", + async (signalName, exitCode) => { + await withTempStore(async () => { + const signals = createSignalProcess(); + let sameConnectionAbort: + | { method: string; params: unknown; opts?: { timeoutMs?: number | null } } + | undefined; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined; + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + onAccepted?.({ + status: "accepted", + runId: "run-signal", + sessionKey: "agent:main:explicit:reset-run", + }); + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAbort = { method, params, opts }; + return { ok: true, aborted: true, runIds: ["run-signal"] }; + }); + const err = new Error("gateway request aborted for agent"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, { + process: signals.processLike, + }); + await Promise.resolve(); + signals.emit(signalName); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + + await run; + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(exitCode); + expect(sameConnectionAbort?.method).toBe("chat.abort"); + expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 }); + expect(sameConnectionAbort?.params).toEqual({ + sessionKey: "agent:main:explicit:reset-run", + runId: "run-signal", + }); + }); + }, + ); + + it("aborts a gateway run by idempotency key before the accepted ack", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + let sameConnectionAbort: + | { method: string; params: unknown; opts?: { timeoutMs?: number | null } } + | undefined; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + const params = requireRecord(request.params, "gateway agent params"); + expect(params.idempotencyKey).toBe("pre-accepted-run"); + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAbort = { method, params, opts }; + return { ok: true, aborted: true, runIds: ["pre-accepted-run"] }; + }); + const err = new Error("gateway request aborted before accepted ack"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand( + { message: "hi", sessionId: "pre-session", runId: "pre-accepted-run" }, + runtime, + { + process: signals.processLike, + }, + ); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(sameConnectionAbort?.method).toBe("chat.abort"); + expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 }); + expect(sameConnectionAbort?.params).toEqual({ + sessionKey: "agent:main:explicit:pre-session", + runId: "pre-accepted-run", + }); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("skips fallback abort when SIGTERM interrupts before the gateway request starts", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + const signal = request.signal as AbortSignal | undefined; + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + const err = new Error("gateway request aborted before start"); + err.name = "AbortError"; + reject(err); + }, + { once: true }, + ); + }); + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, { + process: signals.processLike, + }); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("retries same-connection abort before falling back to a new Gateway call", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + const sameConnectionAborts: Array<{ + method: string; + params: unknown; + opts?: { timeoutMs?: number | null }; + }> = []; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + const params = requireRecord(request.params, "gateway agent params"); + expect(params.idempotencyKey).toBe("pre-accepted-run"); + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAborts.push({ method, params, opts }); + return sameConnectionAborts.length < 3 + ? { ok: true, aborted: false, runIds: [] } + : { ok: true, aborted: true, runIds: ["pre-accepted-run"] }; + }); + const err = new Error("gateway request aborted before registration"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand( + { message: "hi", to: "+1555", runId: "pre-accepted-run" }, + runtime, + { + process: signals.processLike, + }, + ); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(sameConnectionAborts).toHaveLength(3); + expect(sameConnectionAborts.at(-1)).toEqual({ + method: "chat.abort", + opts: { timeoutMs: 2_000 }, + params: { + sessionKey: "agent:main:main", + runId: "pre-accepted-run", + }, + }); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("falls back to a new Gateway call when the same-connection abort is not confirmed", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + const sameConnectionAborts: Array<{ + method: string; + params: unknown; + opts?: { timeoutMs?: number | null }; + }> = []; + let fallbackAbort: Record | undefined; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + const params = requireRecord(request.params, "gateway agent params"); + expect(params.idempotencyKey).toBe("pre-accepted-run"); + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAborts.push({ method, params, opts }); + return { ok: true, aborted: false, runIds: [] }; + }); + const err = new Error("gateway request aborted before registration"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + if (request.method === "chat.abort") { + fallbackAbort = request; + return { ok: true, aborted: true, runIds: ["pre-accepted-run"] }; + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand( + { message: "hi", to: "+1555", runId: "pre-accepted-run" }, + runtime, + { + process: signals.processLike, + }, + ); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(2); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(sameConnectionAborts).toHaveLength(5); + expect(sameConnectionAborts.at(-1)).toEqual({ + method: "chat.abort", + opts: { timeoutMs: 2_000 }, + params: { + sessionKey: "agent:main:main", + runId: "pre-accepted-run", + }, + }); + expect(fallbackAbort?.method).toBe("chat.abort"); + expect(fallbackAbort?.timeoutMs).toBe(2_000); + expect(fallbackAbort?.params).toEqual({ + sessionKey: "agent:main:main", + runId: "pre-accepted-run", + }); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("preserves backend admin authority when SIGTERM aborts a model override run", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + let sameConnectionAbort: + | { method: string; params: unknown; opts?: { timeoutMs?: number | null } } + | undefined; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + expect(request.clientName).toBe("gateway-client"); + expect(request.mode).toBe("backend"); + expect(request.scopes).toEqual(["operator.admin"]); + const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined; + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + onAccepted?.({ status: "accepted", runId: "run-model-sigterm" }); + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAbort = { method, params, opts }; + return { ok: true, aborted: true, runIds: ["run-model-sigterm"] }; + }); + const err = new Error("gateway request aborted for model override agent"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand( + { message: "hi", to: "+1555", model: "ollama/qwen3.5:9b" }, + runtime, + { + process: signals.processLike, + }, + ); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(sameConnectionAbort?.method).toBe("chat.abort"); + expect(sameConnectionAbort?.opts).toEqual({ timeoutMs: 2_000 }); + expect(sameConnectionAbort?.params).toEqual({ + sessionKey: "agent:main:main", + runId: "run-model-sigterm", + }); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("preserves backend admin authority for model override fallback aborts", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + const sameConnectionAborts: Array<{ + method: string; + params: unknown; + opts?: { timeoutMs?: number | null }; + }> = []; + let fallbackAbort: Record | undefined; + callGateway.mockImplementation(async (requestValue: unknown) => { + const request = requireRecord(requestValue, "gateway request"); + if (request.method === "agent") { + expect(request.clientName).toBe("gateway-client"); + expect(request.mode).toBe("backend"); + expect(request.scopes).toEqual(["operator.admin"]); + const onAccepted = request.onAccepted as ((payload: unknown) => void) | undefined; + const onSignalAbort = request.onSignalAbort as + | (( + request: ( + method: string, + params?: unknown, + opts?: { timeoutMs?: number | null }, + ) => Promise, + ) => Promise) + | undefined; + const signal = request.signal as AbortSignal | undefined; + onAccepted?.({ status: "accepted", runId: "run-model-fallback" }); + return await new Promise((_, reject) => { + signal?.addEventListener( + "abort", + () => { + void (async () => { + await onSignalAbort?.(async (method, params, opts) => { + sameConnectionAborts.push({ method, params, opts }); + return { ok: true, aborted: false, runIds: [] }; + }); + const err = new Error("gateway request aborted for model override agent"); + err.name = "AbortError"; + reject(err); + })(); + }, + { once: true }, + ); + }); + } + if (request.method === "chat.abort") { + fallbackAbort = request; + return { ok: true, aborted: true, runIds: ["run-model-fallback"] }; + } + throw new Error(`unexpected gateway method ${String(request.method)}`); + }); + + const run = agentCliCommand( + { message: "hi", to: "+1555", model: "ollama/qwen3.5:9b" }, + runtime, + { + process: signals.processLike, + }, + ); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).toHaveBeenCalledTimes(2); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(sameConnectionAborts).toHaveLength(5); + expect(fallbackAbort?.method).toBe("chat.abort"); + expect(fallbackAbort?.timeoutMs).toBe(2_000); + expect(fallbackAbort?.clientName).toBe("gateway-client"); + expect(fallbackAbort?.mode).toBe("backend"); + expect(fallbackAbort?.scopes).toEqual(["operator.admin"]); + expect(fallbackAbort?.params).toEqual({ + sessionKey: "agent:main:main", + runId: "run-model-fallback", + }); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("passes SIGTERM abort signals into local agent runs", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + agentCommand.mockImplementationOnce(async (opts: { abortSignal?: AbortSignal }) => { + expect(opts.abortSignal).toBeInstanceOf(AbortSignal); + return await new Promise((_, reject) => { + opts.abortSignal?.addEventListener( + "abort", + () => { + const err = new Error("local agent aborted"); + err.name = "AbortError"; + reject(err); + }, + { once: true }, + ); + }); + }); + + const run = agentCliCommand({ message: "hi", to: "+1555", local: true }, runtime, { + process: signals.processLike, + }); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await run; + expect(callGateway).not.toHaveBeenCalled(); + expect(agentCommand).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + expect(signals.listenerCount("SIGTERM")).toBe(0); + expect(signals.listenerCount("SIGINT")).toBe(0); + }); + }); + + it("exits for local runs that resolve after SIGTERM aborts them", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + agentCommand.mockImplementationOnce(async (opts: { abortSignal?: AbortSignal }) => { + return await new Promise((resolve) => { + opts.abortSignal?.addEventListener( + "abort", + () => { + resolve({ + payloads: [], + meta: { aborted: true }, + } as unknown as Awaited>); + }, + { once: true }, + ); + }); + }); + + const run = agentCliCommand({ message: "hi", to: "+1555", local: true }, runtime, { + process: signals.processLike, + }); + await Promise.resolve(); + signals.emit("SIGTERM"); + + await expect(run).resolves.toBeUndefined(); + expect(callGateway).not.toHaveBeenCalled(); + expect(agentCommand).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + }); + }); + + it("exits for embedded fallback runs that resolve after SIGTERM aborts them", async () => { + await withTempStore(async () => { + const signals = createSignalProcess(); + callGateway.mockRejectedValueOnce(createGatewayClosedError()); + let resolveFallback: ((value: Awaited>) => void) | undefined; + agentCommand.mockImplementationOnce(async (_opts: { abortSignal?: AbortSignal }) => { + return await new Promise((resolve) => { + resolveFallback = resolve; + }); + }); + + const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, { + process: signals.processLike, + }); + for (let attempt = 0; attempt < 10 && agentCommand.mock.calls.length === 0; attempt += 1) { + await Promise.resolve(); + } + expect(agentCommand).toHaveBeenCalledTimes(1); + signals.emit("SIGTERM"); + resolveFallback?.({ + payloads: [], + meta: { aborted: true }, + } as unknown as Awaited>); + + await expect(run).resolves.toBeUndefined(); + expect(callGateway).toHaveBeenCalledTimes(1); + expect(runtime.exit).toHaveBeenCalledWith(143); + }); + }); + + it("does not route abort errors through embedded gateway fallback classification", async () => { + await withTempStore(async () => { + const err = new Error("gateway request aborted for agent"); + err.name = "AbortError"; + callGateway.mockRejectedValueOnce(err); + + await expect(agentCliCommand({ message: "hi", to: "+1555" }, runtime)).rejects.toThrow( + "gateway request aborted for agent", + ); + + expect(isGatewayTransportError).not.toHaveBeenCalled(); + expect(agentCommand).not.toHaveBeenCalled(); + }); + }); it("stays silent when the gateway returns an intentional empty reply", async () => { await withTempStore(async () => { callGateway.mockResolvedValue({ @@ -397,6 +1057,23 @@ describe("agentCliCommand", () => { }); }); + it("surfaces duplicate in-flight gateway runs without pretending a reply arrived", async () => { + await withTempStore(async () => { + callGateway.mockResolvedValue({ + runId: "idem-1", + status: "in_flight", + sessionKey: "agent:main:main", + }); + + await agentCliCommand({ message: "hi", to: "+1555", runId: "idem-1" }, runtime); + + expect(runtime.error).toHaveBeenCalledWith( + "Agent run idem-1 is already in flight; not starting a duplicate run.", + ); + expect(runtime.log).not.toHaveBeenCalledWith("No reply from agent."); + }); + }); + it("passes model overrides through gateway requests", async () => { await withTempStore(async () => { mockGatewaySuccessReply(); diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index b64d31cd19c0..0111953e15e7 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -6,7 +6,12 @@ import type { CliDeps } from "../cli/deps.types.js"; import { withProgress } from "../cli/progress.js"; import { getRuntimeConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { callGateway, isGatewayTransportError, randomIdempotencyKey } from "../gateway/call.js"; +import { + callGateway, + isGatewayTransportError, + randomIdempotencyKey, + type GatewayRequestFunction, +} from "../gateway/call.js"; import { ADMIN_SCOPE } from "../gateway/operator-scopes.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js"; import { routeLogsToStderr } from "../logging/console.js"; @@ -71,6 +76,27 @@ type AgentCliOpts = { local?: boolean; }; +type AgentCliSignal = "SIGINT" | "SIGTERM"; +type AgentCliProcessLike = { + on(signal: AgentCliSignal, handler: () => void): unknown; + off(signal: AgentCliSignal, handler: () => void): unknown; +}; +type AgentCliDeps = CliDeps & { + process?: AgentCliProcessLike; +}; +type AgentGatewayCallIdentity = Pick< + Parameters[0], + "clientName" | "mode" | "scopes" +>; + +const AGENT_CLI_SIGNALS: readonly AgentCliSignal[] = ["SIGINT", "SIGTERM"]; +const GATEWAY_ABORT_RETRY_DELAYS_MS = [50, 150, 300, 600] as const; +const GATEWAY_ABORT_REQUEST_TIMEOUT_MS = 2_000; +const AGENT_CLI_SIGNAL_EXIT_CODES: Record = { + SIGINT: 130, + SIGTERM: 143, +}; + function protectJsonStdout(opts: Pick): void { if (opts.json === true) { routeLogsToStderr(); @@ -173,6 +199,211 @@ function normalizeSessionKeyOptsForDispatch(opts: AgentCliOpts): AgentCliOpts { }; } +function isAbortError(err: unknown): boolean { + return err instanceof Error && err.name === "AbortError"; +} + +function readAcceptedRunContext(payload: unknown): { + runId?: string; + sessionKey?: string; +} { + if (!payload || typeof payload !== "object") { + return {}; + } + const runId = (payload as { runId?: unknown }).runId; + const sessionKey = (payload as { sessionKey?: unknown }).sessionKey; + const status = (payload as { status?: unknown }).status; + if (status !== "accepted") { + return {}; + } + return { + runId: typeof runId === "string" && runId.trim() ? runId.trim() : undefined, + sessionKey: typeof sessionKey === "string" && sessionKey.trim() ? sessionKey.trim() : undefined, + }; +} + +function createAgentCliSignalBridge(processLike: AgentCliProcessLike = process) { + const controller = new AbortController(); + let receivedSignal: AgentCliSignal | undefined; + const handlers = new Map void>(); + const detachHandlers = () => { + for (const [signal, handler] of handlers) { + processLike.off(signal, handler); + } + handlers.clear(); + }; + for (const signal of AGENT_CLI_SIGNALS) { + const handler = () => { + receivedSignal = signal; + if (!controller.signal.aborted) { + // runtime.exit may bypass finally cleanup, so first-signal self-detach is load-bearing. + controller.abort(); + detachHandlers(); + } + }; + handlers.set(signal, handler); + processLike.on(signal, handler); + } + return { + signal: controller.signal, + getReceivedSignal: () => receivedSignal, + dispose: detachHandlers, + }; +} + +function isAgentCliProcessLike(value: unknown): value is AgentCliProcessLike { + return ( + Boolean(value) && + typeof value === "object" && + typeof (value as { on?: unknown }).on === "function" && + typeof (value as { off?: unknown }).off === "function" + ); +} + +function resolveAgentCliProcessLike(deps: AgentCliDeps | undefined): AgentCliProcessLike { + if (!deps || !Object.prototype.hasOwnProperty.call(deps, "process")) { + return process; + } + const processLike = (deps as { process?: unknown }).process; + return isAgentCliProcessLike(processLike) ? processLike : process; +} + +function delayMs(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function isConfirmedChatAbortResponseForRun(value: unknown, runId: string): boolean { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return false; + } + const response = value as { aborted?: unknown; runIds?: unknown }; + if (response.aborted !== true) { + return false; + } + if (response.runIds === undefined) { + return true; + } + return Array.isArray(response.runIds) && response.runIds.includes(runId); +} + +async function abortAcceptedGatewayAgentRunWithRequest(params: { + runId: string | undefined; + sessionKey: string | undefined; + signal: AgentCliSignal | undefined; + runtime: RuntimeEnv; + request: GatewayRequestFunction; + logFailure?: boolean; +}): Promise { + if (!params.signal || !params.runId || !params.sessionKey) { + return false; + } + try { + const response = await params.request( + "chat.abort", + { + sessionKey: params.sessionKey, + runId: params.runId, + }, + { timeoutMs: GATEWAY_ABORT_REQUEST_TIMEOUT_MS }, + ); + if (isConfirmedChatAbortResponseForRun(response, params.runId)) { + return true; + } + if (params.logFailure !== false) { + params.runtime.error?.( + `Interrupted by ${params.signal}; Gateway run ${params.runId} was not confirmed aborted.`, + ); + } + return false; + } catch (err) { + if (params.logFailure !== false) { + params.runtime.error?.( + `Interrupted by ${params.signal}; failed to abort Gateway run ${params.runId}: ${String( + err, + )}`, + ); + } + return false; + } +} + +async function abortAcceptedGatewayAgentRunWithGatewayCall(params: { + runId: string | undefined; + sessionKey: string | undefined; + signal: AgentCliSignal | undefined; + runtime: RuntimeEnv; + gatewayIdentity: AgentGatewayCallIdentity; +}): Promise { + const request: GatewayRequestFunction = async >( + method: string, + requestParams?: unknown, + opts?: Parameters[2], + ): Promise => + await callGateway({ + method, + params: requestParams, + timeoutMs: opts?.timeoutMs ?? undefined, + expectFinal: opts?.expectFinal, + ...params.gatewayIdentity, + }); + for (const [attempt, retryDelayMs] of [...GATEWAY_ABORT_RETRY_DELAYS_MS, 0].entries()) { + const isFinalAttempt = attempt === GATEWAY_ABORT_RETRY_DELAYS_MS.length; + const aborted = await abortAcceptedGatewayAgentRunWithRequest({ + runId: params.runId, + sessionKey: params.sessionKey, + signal: params.signal, + runtime: params.runtime, + request, + logFailure: isFinalAttempt, + }); + if (aborted || isFinalAttempt) { + return; + } + await delayMs(retryDelayMs); + } +} + +async function abortAcceptedGatewayAgentRunOnActiveConnection(params: { + runId: string | undefined; + sessionKey: string | undefined; + signal: AgentCliSignal | undefined; + runtime: RuntimeEnv; + request: GatewayRequestFunction; +}): Promise { + for (const [attempt, retryDelayMs] of [...GATEWAY_ABORT_RETRY_DELAYS_MS, 0].entries()) { + const isFinalAttempt = attempt === GATEWAY_ABORT_RETRY_DELAYS_MS.length; + const aborted = await abortAcceptedGatewayAgentRunWithRequest({ + runId: params.runId, + sessionKey: params.sessionKey, + signal: params.signal, + runtime: params.runtime, + request: params.request, + logFailure: false, + }); + if (aborted || isFinalAttempt) { + return aborted; + } + await delayMs(retryDelayMs); + } + return false; +} + +function exitForReceivedSignal(signal: AgentCliSignal | undefined, runtime: RuntimeEnv): boolean { + if (!signal) { + return false; + } + runtime.exit(AGENT_CLI_SIGNAL_EXIT_CODES[signal]); + return true; +} + +function returnAfterSignalExit( + value: T, + signal: AgentCliSignal | undefined, + runtime: RuntimeEnv, +): T | undefined { + return exitForReceivedSignal(signal, runtime) ? undefined : value; +} + function createGatewayTimeoutFallbackSessionId(): string { return `${GATEWAY_TIMEOUT_FALLBACK_SESSION_PREFIX}${randomUUID()}`; } @@ -227,7 +458,21 @@ function buildGatewayJsonResponse(response: GatewayAgentResponse): GatewayAgentR }; } -async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) { +function isInFlightGatewayAgentResponse(response: GatewayAgentResponse): boolean { + return response.status === "in_flight"; +} + +function formatInFlightGatewayAgentMessage(response: GatewayAgentResponse): string { + return response.runId + ? `Agent run ${response.runId} is already in flight; not starting a duplicate run.` + : "Agent run is already in flight; not starting a duplicate run."; +} + +async function agentViaGatewayCommand( + opts: AgentCliOpts, + runtime: RuntimeEnv, + signalBridge: ReturnType, +) { protectJsonStdout(opts); const body = (opts.message ?? "").trim(); const explicitSessionKey = opts.sessionKey?.trim(); @@ -271,44 +516,90 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) { const idempotencyKey = normalizeOptionalString(opts.runId) || randomIdempotencyKey(); const modelOverride = normalizeOptionalString(opts.model); const hasModelOverride = Boolean(modelOverride); + const gatewayIdentity: AgentGatewayCallIdentity = hasModelOverride + ? { + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + mode: GATEWAY_CLIENT_MODES.BACKEND, + scopes: [ADMIN_SCOPE], + } + : { + clientName: GATEWAY_CLIENT_NAMES.CLI, + mode: GATEWAY_CLIENT_MODES.CLI, + }; - const response: GatewayAgentResponse = await withProgress( - { - label: "Waiting for agent reply…", - indeterminate: true, - enabled: opts.json !== true, - }, - async () => - await callGateway({ - method: "agent", - params: { - message: body, - agentId, - model: modelOverride, - to: opts.to, - replyTo: opts.replyTo, - sessionId: opts.sessionId, - sessionKey, - thinking: opts.thinking, - deliver: Boolean(opts.deliver), - channel, - replyChannel: opts.replyChannel, - replyAccountId: opts.replyAccount, - bestEffortDeliver: opts.bestEffortDeliver, - timeout: timeoutSeconds, - lane: opts.lane, - extraSystemPrompt: opts.extraSystemPrompt, - idempotencyKey, - }, - expectFinal: true, - timeoutMs: gatewayTimeoutMs, - clientName: hasModelOverride - ? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT - : GATEWAY_CLIENT_NAMES.CLI, - mode: hasModelOverride ? GATEWAY_CLIENT_MODES.BACKEND : GATEWAY_CLIENT_MODES.CLI, - ...(hasModelOverride ? { scopes: [ADMIN_SCOPE] } : {}), - }), - ); + let acceptedRunId: string | undefined = idempotencyKey; + let acceptedSessionKey: string | undefined = sessionKey; + let acceptedGatewayRun = false; + let activeConnectionAbortAttempted = false; + let activeConnectionAbortSucceeded = false; + let response: GatewayAgentResponse; + try { + response = await withProgress( + { + label: "Waiting for agent reply…", + indeterminate: true, + enabled: opts.json !== true, + }, + async () => + await callGateway({ + method: "agent", + params: { + message: body, + agentId, + model: modelOverride, + to: opts.to, + replyTo: opts.replyTo, + sessionId: opts.sessionId, + sessionKey, + thinking: opts.thinking, + deliver: Boolean(opts.deliver), + channel, + replyChannel: opts.replyChannel, + replyAccountId: opts.replyAccount, + bestEffortDeliver: opts.bestEffortDeliver, + timeout: timeoutSeconds, + lane: opts.lane, + extraSystemPrompt: opts.extraSystemPrompt, + idempotencyKey, + }, + expectFinal: true, + timeoutMs: gatewayTimeoutMs, + signal: signalBridge.signal, + onAccepted: (payload) => { + acceptedGatewayRun = true; + const accepted = readAcceptedRunContext(payload); + acceptedRunId = accepted.runId ?? acceptedRunId; + acceptedSessionKey = accepted.sessionKey ?? acceptedSessionKey; + }, + onSignalAbort: async (request) => { + activeConnectionAbortAttempted = true; + activeConnectionAbortSucceeded = await abortAcceptedGatewayAgentRunOnActiveConnection({ + runId: acceptedRunId, + sessionKey: acceptedSessionKey, + signal: signalBridge.getReceivedSignal(), + runtime, + request, + }); + }, + ...gatewayIdentity, + }), + ); + } catch (err) { + if ( + isAbortError(err) && + !activeConnectionAbortSucceeded && + (acceptedGatewayRun || activeConnectionAbortAttempted) + ) { + await abortAcceptedGatewayAgentRunWithGatewayCall({ + runId: acceptedRunId, + sessionKey: acceptedSessionKey, + signal: signalBridge.getReceivedSignal(), + runtime, + gatewayIdentity, + }); + } + throw err; + } if (opts.json) { writeRuntimeJson(runtime, buildGatewayJsonResponse(response)); @@ -318,6 +609,11 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) { const result = response?.result; const payloads = result?.payloads ?? []; + if (isInFlightGatewayAgentResponse(response)) { + runtime.error?.(formatInFlightGatewayAgentMessage(response)); + return response; + } + if (payloads.length === 0) { if (response?.status !== "ok") { runtime.log(response?.summary ? response.summary : "No reply from agent."); @@ -335,62 +631,87 @@ async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: RuntimeEnv) { return response; } -export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, deps?: CliDeps) { +export async function agentCliCommand( + opts: AgentCliOpts, + runtime: RuntimeEnv, + deps?: AgentCliDeps, +) { protectJsonStdout(opts); const dispatchOpts = normalizeSessionKeyOptsForDispatch(opts); validateExplicitSessionKeyForDispatch(dispatchOpts); + const signalBridge = createAgentCliSignalBridge(resolveAgentCliProcessLike(deps)); const localOpts = { ...dispatchOpts, agentId: dispatchOpts.agent, replyAccountId: dispatchOpts.replyAccount, cleanupBundleMcpOnRunEnd: true, cleanupCliLiveSessionOnRunEnd: true, + abortSignal: signalBridge.signal, }; - if (dispatchOpts.local === true) { - return await agentCommand(localOpts, runtime, deps); - } - try { - return await agentViaGatewayCommand(dispatchOpts, runtime); - } catch (err) { - if (isGatewayAgentTimeoutError(err)) { - const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts); - const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId); + if (dispatchOpts.local === true) { + const result = await agentCommand(localOpts, runtime, deps); + return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime); + } + + try { + const result = await agentViaGatewayCommand(dispatchOpts, runtime, signalBridge); + return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime); + } catch (err) { + if (isAbortError(err)) { + if (exitForReceivedSignal(signalBridge.getReceivedSignal(), runtime)) { + return undefined; + } + throw err; + } + if (isGatewayAgentTimeoutError(err)) { + const fallbackAgentId = resolveAgentIdForGatewayTimeoutFallback(dispatchOpts); + const fallbackSession = createGatewayTimeoutFallbackSession(fallbackAgentId); + runtime.error?.( + `EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`, + ); + const result = await agentCommand( + { + ...localOpts, + sessionId: fallbackSession.sessionId, + sessionKey: fallbackSession.sessionKey, + runId: fallbackSession.sessionId, + resultMetaOverrides: { + ...EMBEDDED_FALLBACK_META, + fallbackReason: "gateway_timeout", + fallbackSessionId: fallbackSession.sessionId, + fallbackSessionKey: fallbackSession.sessionKey, + }, + }, + runtime, + deps, + ); + return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime); + } + + if (!isGatewayAgentEmbeddedFallbackError(err)) { + throw err; + } + runtime.error?.( - `EMBEDDED FALLBACK: Gateway agent timed out; running embedded agent with fresh session ${fallbackSession.sessionId}: ${String(err)}`, + `EMBEDDED FALLBACK: Gateway agent failed; running embedded agent: ${String(err)}`, ); - return await agentCommand( + const result = await agentCommand( { ...localOpts, - sessionId: fallbackSession.sessionId, - sessionKey: fallbackSession.sessionKey, - runId: fallbackSession.sessionId, - resultMetaOverrides: { - ...EMBEDDED_FALLBACK_META, - fallbackReason: "gateway_timeout", - fallbackSessionId: fallbackSession.sessionId, - fallbackSessionKey: fallbackSession.sessionKey, - }, + resultMetaOverrides: EMBEDDED_FALLBACK_META, }, runtime, deps, ); + return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime); } - - if (!isGatewayAgentEmbeddedFallbackError(err)) { - throw err; + } catch (err) { + if (isAbortError(err) && exitForReceivedSignal(signalBridge.getReceivedSignal(), runtime)) { + return undefined; } - - runtime.error?.( - `EMBEDDED FALLBACK: Gateway agent failed; running embedded agent: ${String(err)}`, - ); - return await agentCommand( - { - ...localOpts, - resultMetaOverrides: EMBEDDED_FALLBACK_META, - }, - runtime, - deps, - ); + throw err; + } finally { + signalBridge.dispose(); } } diff --git a/src/gateway/call.test.ts b/src/gateway/call.test.ts index ff7529490aec..95d239c14b33 100644 --- a/src/gateway/call.test.ts +++ b/src/gateway/call.test.ts @@ -75,7 +75,12 @@ let lastClientOptions: { let lastRequestOptions: { method?: string; params?: unknown; - opts?: { expectFinal?: boolean; timeoutMs?: number | null }; + opts?: { + expectFinal?: boolean; + timeoutMs?: number | null; + signal?: AbortSignal; + onAccepted?: (payload: unknown) => void; + }; } | null = null; type StartMode = "hello" | "close" | "connect-error" | "silent" | "startup-retry-then-hello"; let startMode: StartMode = "hello"; @@ -185,7 +190,12 @@ class StubGatewayClient { async request( method: string, params: unknown, - opts?: { expectFinal?: boolean; timeoutMs?: number | null }, + opts?: { + expectFinal?: boolean; + timeoutMs?: number | null; + signal?: AbortSignal; + onAccepted?: (payload: unknown) => void; + }, ) { lastRequestOptions = { method, params, opts }; return { ok: true }; @@ -1195,6 +1205,166 @@ describe("callGateway error details", () => { expect(lastRequestOptions?.opts?.timeoutMs).toBe(45_000); }); + it("forwards caller abort signal and accepted callback to client requests", async () => { + setLocalLoopbackGatewayConfig(); + const controller = new AbortController(); + const onAccepted = vi.fn(); + + await callGateway({ + method: "agent", + expectFinal: true, + signal: controller.signal, + onAccepted, + }); + + expect(lastRequestOptions?.method).toBe("agent"); + expect(lastRequestOptions?.opts?.signal).toBe(controller.signal); + expect(lastRequestOptions?.opts?.onAccepted).toBe(onAccepted); + }); + + it("runs the signal abort hook on the active gateway connection before teardown", async () => { + setLocalLoopbackGatewayConfig(); + + const controller = new AbortController(); + const abortRequests: Array<{ + method: string; + params: unknown; + opts?: { timeoutMs?: number | null }; + }> = []; + let stopStarted = false; + + testing.setDepsForTests({ + createGatewayClient: (opts) => + ({ + async request( + method: string, + params: unknown, + requestOpts?: { + expectFinal?: boolean; + timeoutMs?: number | null; + signal?: AbortSignal; + }, + ) { + lastRequestOptions = { method, params, opts: requestOpts }; + if (method === "agent") { + return await new Promise((_, reject) => { + requestOpts?.signal?.addEventListener( + "abort", + () => { + const err = new Error("gateway request aborted for agent"); + err.name = "AbortError"; + reject(err); + }, + { once: true }, + ); + }); + } + abortRequests.push({ method, params, opts: requestOpts }); + return { ok: true }; + }, + start() { + opts.onHelloOk?.({ + features: { + methods: helloMethods ?? [], + events: [], + }, + } as unknown as Parameters>[0]); + }, + stop() {}, + async stopAndWait() { + stopStarted = true; + }, + }) as never, + getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig, + loadOrCreateDeviceIdentity: () => deviceIdentityState.value, + resolveGatewayPort: resolveGatewayPort as unknown as ( + cfg?: OpenClawConfig, + env?: NodeJS.ProcessEnv, + ) => number, + }); + + const promise = callGateway({ + method: "agent", + expectFinal: true, + signal: controller.signal, + onSignalAbort: async (request) => { + await request("chat.abort", { sessionKey: "main", runId: "run-1" }, { timeoutMs: 5_000 }); + }, + }); + + await vi.waitFor(() => { + expect(lastRequestOptions?.method).toBe("agent"); + }); + controller.abort(); + + await expect(promise).rejects.toThrow("gateway request aborted for agent"); + expect(abortRequests).toEqual([ + { + method: "chat.abort", + params: { sessionKey: "main", runId: "run-1" }, + opts: { timeoutMs: 5_000 }, + }, + ]); + expect(stopStarted).toBe(true); + }); + + it("skips the signal abort hook before the primary request starts", async () => { + setLocalLoopbackGatewayConfig(); + + const controller = new AbortController(); + const onSignalAbort = vi.fn(async () => undefined); + let startCalled = false; + let stopStarted = false; + + testing.setDepsForTests({ + createGatewayClient: () => + ({ + async request( + method: string, + params: unknown, + requestOpts?: { + expectFinal?: boolean; + timeoutMs?: number | null; + signal?: AbortSignal; + }, + ) { + lastRequestOptions = { method, params, opts: requestOpts }; + return { ok: true }; + }, + start() { + startCalled = true; + }, + stop() {}, + async stopAndWait() { + stopStarted = true; + }, + }) as never, + getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig, + loadOrCreateDeviceIdentity: () => deviceIdentityState.value, + resolveGatewayPort: resolveGatewayPort as unknown as ( + cfg?: OpenClawConfig, + env?: NodeJS.ProcessEnv, + ) => number, + }); + + const promise = callGateway({ + method: "agent", + expectFinal: true, + signal: controller.signal, + onSignalAbort, + }); + + await vi.waitFor(() => { + expect(startCalled).toBe(true); + }); + controller.abort(); + + await expect(promise).rejects.toThrow("gateway request aborted for agent"); + expect(onSignalAbort).not.toHaveBeenCalled(); + expect(lastRequestOptions).toBeNull(); + expect(stopStarted).toBe(true); + }); + it("passes configured gateway handshake timeout to the client watchdog", async () => { getRuntimeConfig.mockReturnValue({ gateway: { mode: "local", bind: "loopback", handshakeTimeoutMs: 30_000 }, diff --git a/src/gateway/call.ts b/src/gateway/call.ts index 417d977a01b0..b56894d88d35 100644 --- a/src/gateway/call.ts +++ b/src/gateway/call.ts @@ -24,6 +24,7 @@ import { GatewayClient, isGatewayConnectAssemblyError, type GatewayClientOptions, + type GatewayClientRequestOptions, } from "./client.js"; import { buildGatewayConnectionDetailsWithResolvers, @@ -49,6 +50,12 @@ import { import { MIN_CLIENT_PROTOCOL_VERSION, PROTOCOL_VERSION } from "./protocol/index.js"; export type { GatewayConnectionDetails }; +export type GatewayRequestFunction = >( + method: string, + params?: unknown, + opts?: GatewayClientRequestOptions, +) => Promise; + type CallGatewayBaseOptions = { url?: string; token?: string; @@ -59,6 +66,9 @@ type CallGatewayBaseOptions = { params?: unknown; expectFinal?: boolean; timeoutMs?: number; + signal?: AbortSignal; + onAccepted?: GatewayClientRequestOptions["onAccepted"]; + onSignalAbort?: (request: GatewayRequestFunction) => Promise | void; clientName?: GatewayClientName; clientDisplayName?: string; clientVersion?: string; @@ -619,6 +629,12 @@ function createGatewayTimeoutTransportError(params: { }); } +function createGatewayRequestAbortError(method: string): Error { + const err = new Error(`gateway request aborted for ${method}`); + err.name = "AbortError"; + return err; +} + function ensureGatewaySupportsRequiredMethods(params: { requiredMethods: string[] | undefined; methods: string[] | undefined; @@ -672,26 +688,75 @@ async function executeGatewayRequestWithScopes(params: { safeTimerTimeoutMs, } = params; return await new Promise((resolve, reject) => { + if (opts.signal?.aborted) { + reject(createGatewayRequestAbortError(opts.method)); + return; + } let settled = false; let ignoreClose = false; const startAbort = new AbortController(); - const stop = (err?: Error, value?: T) => { - if (settled) { - return; - } - settled = true; + let abortHandler: (() => void) | undefined; + let client: GatewayClient | undefined; + let timer: NodeJS.Timeout | undefined; + let primaryRequestStarted = false; + const cleanup = () => { startAbort.abort(); - clearTimeout(timer); - void stopGatewayClient(client).finally(() => { + if (abortHandler) { + opts.signal?.removeEventListener("abort", abortHandler); + } + if (timer) { + clearTimeout(timer); + } + }; + const stopClientThenSettle = ( + activeClient: GatewayClient | undefined, + err?: Error, + value?: T, + ) => { + const complete = () => { if (err) { reject(err); } else { resolve(value as T); } - }); + }; + if (!activeClient) { + complete(); + return; + } + void stopGatewayClient(activeClient).finally(complete); }; + const stop = (err?: Error, value?: T) => { + if (settled) { + return; + } + settled = true; + cleanup(); + stopClientThenSettle(client, err, value); + }; + abortHandler = () => { + if (settled) { + return; + } + ignoreClose = true; + settled = true; + cleanup(); + const err = createGatewayRequestAbortError(opts.method); + const activeClient = client; + const stopAfterAbortHook = () => stopClientThenSettle(activeClient, err); + if (!activeClient || !opts.onSignalAbort || !primaryRequestStarted) { + stopAfterAbortHook(); + return; + } + const request: GatewayRequestFunction = activeClient.request.bind(activeClient); + void Promise.resolve() + .then(() => opts.onSignalAbort?.(request)) + .catch(() => {}) + .finally(stopAfterAbortHook); + }; + opts.signal?.addEventListener("abort", abortHandler, { once: true }); - const client = gatewayCallDeps.createGatewayClient({ + client = gatewayCallDeps.createGatewayClient({ url, token, password, @@ -719,9 +784,16 @@ async function executeGatewayRequestWithScopes(params: { methods: hello.features?.methods, attemptedMethod: opts.method, }); - const result = await client.request(opts.method, opts.params, { + const activeClient = client; + if (!activeClient) { + throw new Error("gateway client not initialized"); + } + primaryRequestStarted = true; + const result = await activeClient.request(opts.method, opts.params, { expectFinal: opts.expectFinal, timeoutMs: opts.timeoutMs, + signal: opts.signal, + onAccepted: opts.onAccepted, }); ignoreClose = true; stop(undefined, result); @@ -752,7 +824,7 @@ async function executeGatewayRequestWithScopes(params: { }, }); - const timer = setTimeout(() => { + timer = setTimeout(() => { ignoreClose = true; stop( createGatewayTimeoutTransportError({ diff --git a/src/gateway/client.ts b/src/gateway/client.ts index 8e2996531299..61b15c4aa624 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -61,6 +61,17 @@ type Pending = { reject: (err: unknown) => void; expectFinal: boolean; timeout: NodeJS.Timeout | null; + cleanup?: () => void; + onAccepted?: (payload: unknown) => void; + acceptedNotified?: boolean; +}; + +export type GatewayClientRequestOptions = { + expectFinal?: boolean; + timeoutMs?: number | null; + signal?: AbortSignal; + /** Called once for expectFinal requests after an accepted response, before the final result. */ + onAccepted?: (payload: unknown) => void; }; type GatewayClientErrorShape = { @@ -1031,12 +1042,20 @@ export class GatewayClient { const payload = parsed.payload as { status?: unknown } | undefined; const status = payload?.status; if (pending.expectFinal && status === "accepted") { + if (!pending.acceptedNotified) { + pending.acceptedNotified = true; + try { + pending.onAccepted?.(parsed.payload); + } catch (err) { + logDebug( + `gateway client accepted callback error: ${formatGatewayClientErrorForLog(err)}`, + ); + } + } return; } this.pending.delete(parsed.id); - if (pending.timeout) { - clearTimeout(pending.timeout); - } + pending.cleanup?.(); if (parsed.ok) { pending.resolve(parsed.payload); } else { @@ -1120,9 +1139,7 @@ export class GatewayClient { private flushPendingErrors(err: Error) { for (const [, p] of this.pending) { - if (p.timeout) { - clearTimeout(p.timeout); - } + p.cleanup?.(); p.reject(err); } this.pending.clear(); @@ -1185,11 +1202,14 @@ export class GatewayClient { async request>( method: string, params?: unknown, - opts?: { expectFinal?: boolean; timeoutMs?: number | null }, + opts?: GatewayClientRequestOptions, ): Promise { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { throw new Error("gateway not connected"); } + if (opts?.signal?.aborted) { + throw createGatewayRequestAbortError(method); + } const id = randomUUID(); const frame: RequestFrame = { type: "req", id, method, params }; if (!validateRequestFrame(frame)) { @@ -1206,22 +1226,49 @@ export class GatewayClient { : expectFinal ? null : this.requestTimeoutMs; + const signal = opts?.signal; const p = new Promise((resolve, reject) => { + let abortHandler: (() => void) | undefined; const timeout = timeoutMs === null ? null : setTimeout(() => { + const pending = this.pending.get(id); this.pending.delete(id); + pending?.cleanup?.(); reject(new Error(`gateway request timeout for ${method}`)); }, timeoutMs); + const cleanup = () => { + if (timeout) { + clearTimeout(timeout); + } + if (signal && abortHandler) { + signal.removeEventListener("abort", abortHandler); + } + }; + abortHandler = () => { + const pending = this.pending.get(id); + this.pending.delete(id); + pending?.cleanup?.(); + reject(createGatewayRequestAbortError(method)); + }; this.pending.set(id, { resolve: (value) => resolve(value as T), reject, expectFinal, timeout, + cleanup, + onAccepted: opts?.onAccepted, }); + signal?.addEventListener("abort", abortHandler, { once: true }); }); this.ws.send(JSON.stringify(frame)); return p; } } + +function createGatewayRequestAbortError(method: string): Error { + const err = new Error(`gateway request aborted for ${method}`); + err.name = "AbortError"; + return err; +} diff --git a/src/gateway/client.watchdog.test.ts b/src/gateway/client.watchdog.test.ts index 3c3bf5b7b1d5..6a985f824ec1 100644 --- a/src/gateway/client.watchdog.test.ts +++ b/src/gateway/client.watchdog.test.ts @@ -307,6 +307,90 @@ describe("GatewayClient", () => { } }); + test("notifies accepted expectFinal requests while continuing to wait for final", async () => { + const client = new GatewayClient({ + requestTimeoutMs: 25, + }); + const send = vi.fn(); + ( + client as unknown as { + ws: WebSocket | { readyState: number; send: (data: string) => void; close: () => void }; + } + ).ws = { + readyState: WebSocket.OPEN, + send, + close: vi.fn(), + }; + + const onAccepted = vi.fn(); + const requestPromise = client.request<{ status: string }>("agent", undefined, { + expectFinal: true, + onAccepted, + }); + const frame = JSON.parse(String(send.mock.calls[0]?.[0])) as { id: string }; + + ( + client as unknown as { + handleMessage: (raw: string) => void; + } + ).handleMessage( + JSON.stringify({ + type: "res", + id: frame.id, + ok: true, + payload: { status: "accepted", runId: "run-1" }, + }), + ); + + expect(onAccepted).toHaveBeenCalledWith({ status: "accepted", runId: "run-1" }); + expect((client as unknown as { pending: Map }).pending.size).toBe(1); + + ( + client as unknown as { + handleMessage: (raw: string) => void; + } + ).handleMessage( + JSON.stringify({ + type: "res", + id: frame.id, + ok: true, + payload: { status: "ok" }, + }), + ); + + await expect(requestPromise).resolves.toEqual({ status: "ok" }); + expect((client as unknown as { pending: Map }).pending.size).toBe(0); + }); + + test("aborts in-flight requests from caller AbortSignal", async () => { + const client = new GatewayClient({ + requestTimeoutMs: 25, + }); + const send = vi.fn(); + ( + client as unknown as { + ws: WebSocket | { readyState: number; send: () => void; close: () => void }; + } + ).ws = { + readyState: WebSocket.OPEN, + send, + close: vi.fn(), + }; + + const controller = new AbortController(); + const requestPromise = client.request("status", undefined, { + signal: controller.signal, + timeoutMs: null, + }); + expect(send).toHaveBeenCalledTimes(1); + expect((client as unknown as { pending: Map }).pending.size).toBe(1); + + controller.abort(); + + await expect(requestPromise).rejects.toThrow("gateway request aborted for status"); + expect((client as unknown as { pending: Map }).pending.size).toBe(0); + }); + test("clamps oversized explicit request timeouts before scheduling", async () => { vi.useFakeTimers(); try { diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index 3c0c0925069a..8404d65bca87 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -306,6 +306,43 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + it("keeps pending accepted agent dedupe entries until their run expiry", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const now = Date.now(); + deps.dedupe.set("agent:pending-agent", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { + runId: "pending-agent", + sessionKey: "agent:main:main", + status: "accepted", + expiresAtMs: now + 120_000, + }, + }); + deps.dedupe.set("agent:expired-pending-agent", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { + runId: "expired-pending-agent", + sessionKey: "agent:main:main", + status: "accepted", + expiresAtMs: now - 1, + }, + }); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.dedupe.has("agent:pending-agent")).toBe(true); + expect(deps.dedupe.has("agent:expired-pending-agent")).toBe(false); + + stopMaintenanceTimers(timers); + }); + it("keeps active exec approval dedupe aliases past the normal ttl", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 011bbd9599b6..cb03d31cc720 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -106,6 +106,20 @@ export function startGatewayMaintenanceTimers(params: { : undefined : undefined; }; + const isPendingAcceptedAgentDedupeKey = (key: string, dedupeEntry: DedupeEntry) => { + if (!key.startsWith("agent:")) { + return false; + } + const payload = dedupeEntry.payload; + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + return false; + } + if ((payload as { status?: unknown }).status !== "accepted") { + return false; + } + const expiresAtMs = (payload as { expiresAtMs?: unknown }).expiresAtMs; + return typeof expiresAtMs === "number" && Number.isFinite(expiresAtMs) && expiresAtMs > now; + }; const isActiveRunDedupeKey = (key: string, dedupeEntry: DedupeEntry) => { if (!key.startsWith("agent:") && !key.startsWith("chat:")) { return false; @@ -118,7 +132,7 @@ export function startGatewayMaintenanceTimers(params: { return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent"; }; for (const [k, v] of params.dedupe) { - if (isActiveRunDedupeKey(k, v)) { + if (isActiveRunDedupeKey(k, v) || isPendingAcceptedAgentDedupeKey(k, v)) { continue; } if (now - v.ts > DEDUPE_TTL_MS) { @@ -128,7 +142,10 @@ export function startGatewayMaintenanceTimers(params: { if (params.dedupe.size > DEDUPE_MAX) { const excess = params.dedupe.size - DEDUPE_MAX; const oldestKeys = [...params.dedupe.entries()] - .filter(([key, entry]) => !isActiveRunDedupeKey(key, entry)) + .filter( + ([key, entry]) => + !isActiveRunDedupeKey(key, entry) && !isPendingAcceptedAgentDedupeKey(key, entry), + ) .toSorted(([, left], [, right]) => left.ts - right.ts) .slice(0, excess) .map(([key]) => key); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 35d81f92ead0..3f90efd291b9 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -1,5 +1,5 @@ import fs from "node:fs/promises"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { registerExecApprovalFollowupRuntimeHandoff, resetExecApprovalFollowupRuntimeHandoffsForTests, @@ -91,6 +91,15 @@ vi.mock("../../config/config.js", async () => { vi.mock("../../agents/agent-scope.js", () => ({ listAgentIds: mocks.listAgentIds, resolveDefaultAgentId: () => "main", + resolveSessionAgentId: ({ + sessionKey, + }: { + sessionKey?: string | null; + config?: Record; + }) => { + const m = /^agent:([^:]+):/.exec((sessionKey ?? "").trim()); + return m?.[1] ?? "main"; + }, resolveAgentConfig: (cfg: { agents?: { list?: Array<{ id?: string }> } }, agentId: string) => cfg.agents?.list?.find((agent) => agent.id === agentId), resolveAgentWorkspaceDir: (cfg: { agents?: { defaults?: { workspace?: string } } }) => @@ -1847,7 +1856,10 @@ describe("gateway agent handler", () => { await flushScheduledDispatchStep(); expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1); - expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true }); + expect(mockCallArg(secondRespond, 0, 3)).toEqual({ + cached: true, + runId: firstRegistration.idempotencyKey, + }); }); it("reserves exec approval followup dedupe before awaited session work", async () => { @@ -1932,9 +1944,12 @@ describe("gateway agent handler", () => { expect(sessionWriteCalls).toBe(1); expect(mockCallArg(secondRespond, 0, 1)).toMatchObject({ runId: firstRegistration.idempotencyKey, - status: "accepted", + status: "in_flight", + }); + expect(mockCallArg(secondRespond, 0, 3)).toEqual({ + cached: true, + runId: firstRegistration.idempotencyKey, }); - expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true }); releaseFirstSessionWrite?.(); await first; @@ -3508,10 +3523,38 @@ describe("gateway agent handler", () => { }); describe("gateway agent handler chat.abort integration", () => { - afterEach(() => { + function resetIntegrationState() { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetDetachedTaskLifecycleRuntimeForTests(); + resetTaskRegistryForTests(); mocks.agentCommand.mockReset(); + mocks.loadConfigReturn = {}; + mocks.loadGatewaySessionRow.mockReset(); + mocks.loadSessionEntry.mockReset(); + mocks.updateSessionStore.mockReset(); mocks.getLatestSubagentRunByChildSessionKey.mockReset(); mocks.replaceSubagentRunAfterSteer.mockReset(); + mocks.resolveExplicitAgentSessionKey.mockReset().mockReturnValue(undefined); + mocks.resolveBareResetBootstrapFileAccess.mockReset().mockReturnValue(true); + mocks.listAgentIds.mockReset().mockReturnValue(["main"]); + mocks.loadVoiceWakeRoutingConfig.mockReset(); + mocks.resolveVoiceWakeRouteByTrigger.mockReset(); + mocks.resolveSendPolicy.mockReset().mockReturnValue("allow"); + dateOnlyFakeClockActive = false; + vi.useRealTimers(); + resetExecApprovalFollowupRuntimeHandoffsForTests(); + } + + beforeEach(() => { + resetIntegrationState(); + }); + + afterEach(() => { + resetIntegrationState(); }); function prime(sessionId = "existing-session-id", cfg: Record = {}) { @@ -3553,6 +3596,7 @@ describe("gateway agent handler chat.abort integration", () => { prime(); mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + const context = makeContext(); const respond = vi.fn(); const runId = "idem-yield-before-dispatch"; const pending = invokeAgent( @@ -3562,17 +3606,25 @@ describe("gateway agent handler chat.abort integration", () => { sessionKey: "agent:main:main", idempotencyKey: runId, }, - { respond, reqId: runId, flushDispatch: false }, + { context, respond, reqId: runId, flushDispatch: false }, ); await Promise.resolve(); await Promise.resolve(); expect(mockCallArg(respond)).toBe(true); - expectRecordFields(mockCallArg(respond, 0, 1), { + const acceptedPayload = expectRecordFields(mockCallArg(respond, 0, 1), { runId, status: "accepted", }); + expect(acceptedPayload).not.toHaveProperty("dedupeKeys"); + expect(acceptedPayload).not.toHaveProperty("ownerConnId"); + expect(acceptedPayload).not.toHaveProperty("ownerDeviceId"); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + status: "accepted", + dedupeKeys: [`agent:${runId}`], + }); expect(mockCallArg(respond, 0, 2)).toBeUndefined(); expect(mockCallArg(respond, 0, 3)).toEqual({ runId }); expect(mocks.agentCommand).not.toHaveBeenCalled(); @@ -3585,11 +3637,669 @@ describe("gateway agent handler chat.abort integration", () => { expect(mocks.agentCommand).toHaveBeenCalledTimes(1); }); - it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => { + it("does not dispatch when chat.abort lands during the accepted ack yield", async () => { prime(); mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-abort-before-dispatch"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + + expectRecordFields(mockCallArg(respond, 0, 1), { + runId, + sessionKey: "agent:main:main", + status: "accepted", + }); + expect(context.chatAbortControllers.has(runId)).toBe(true); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expect(context.chatAbortControllers.has(runId)).toBe(false); + + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + status: "timeout", + summary: "aborted", + stopReason: "rpc", + }); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "rpc", + }); + }); + + it("does not dispatch when chat.abort lands during pre-accept setup", async () => { + prime(); + const requestedSessionKey = "agent:main:legacy-main"; + let releaseSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry(), + }; + return await updater(store); + }); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-abort-before-registration"; + const pending = invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: requestedSessionKey, + idempotencyKey: runId, + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + expect(context.chatAbortControllers.has(runId)).toBe(false); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: requestedSessionKey, + status: "accepted", + }); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: requestedSessionKey, runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: requestedSessionKey, + status: "timeout", + summary: "aborted", + stopReason: "rpc", + }); + + releaseSessionWrite?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(context.chatAbortControllers.has(runId)).toBe(false); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "rpc", + }); + }); + + it("does not dispatch when a stop command lands during pre-accept setup", async () => { + prime(); + const requestedSessionKey = "agent:main:legacy-main"; + let releaseSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry(), + }; + return await updater(store); + }); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-stop-before-registration"; + const pending = invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: requestedSessionKey, + idempotencyKey: runId, + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + expect(context.chatAbortControllers.has(runId)).toBe(false); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: requestedSessionKey, + status: "accepted", + }); + + const stopRespond = vi.fn(); + await chatHandlers["chat.send"]({ + params: { + sessionKey: requestedSessionKey, + message: "/stop", + idempotencyKey: "idem-stop-command-before-registration", + }, + respond: stopRespond as never, + context, + req: { type: "req", id: "stop-req", method: "chat.send" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(stopRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: requestedSessionKey, + status: "timeout", + summary: "aborted", + stopReason: "stop", + }); + + releaseSessionWrite?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(context.chatAbortControllers.has(runId)).toBe(false); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "stop", + }); + }); + + it("does not dispatch when session-level chat.abort lands during pre-accept setup", async () => { + prime(); + let releaseSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry(), + }; + return await updater(store); + }); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-session-level-abort-before-registration"; + const pending = invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + expect(context.chatAbortControllers.has(runId)).toBe(false); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main" }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "timeout", + summary: "aborted", + stopReason: "rpc", + }); + + releaseSessionWrite?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(context.chatAbortControllers.has(runId)).toBe(false); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "rpc", + }); + }); + + it("does not dispatch when chat.abort lands during slow attachment setup", async () => { + mockMainSessionEntry({ + sessionId: "existing-session-id", + model: "vision-model", + modelProvider: "test", + }); + mocks.updateSessionStore.mockResolvedValue(undefined); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + let releaseCatalog: (() => void) | undefined; + const context = { + ...makeContext(), + loadGatewayModelCatalog: vi.fn( + async () => + await new Promise((resolve) => { + releaseCatalog = () => + resolve([ + { + id: "vision-model", + name: "vision-model", + provider: "test", + input: ["image"], + }, + ]); + }), + ), + } as unknown as GatewayRequestContext; + const respond = vi.fn(); + const runId = "idem-abort-during-attachment-setup"; + const pending = invokeAgent( + { + message: "inspect this", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + attachments: [ + { + type: "file", + mimeType: "image/png", + fileName: "pixel.png", + content: Buffer.from("not really a png").toString("base64"), + }, + ], + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + + await waitForAssertion(() => + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "accepted", + }), + ); + expect(context.chatAbortControllers.has(runId)).toBe(false); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "timeout", + summary: "aborted", + stopReason: "rpc", + }); + + releaseCatalog?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(context.chatAbortControllers.has(runId)).toBe(false); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "rpc", + }); + }); + + it("does not dispatch when chat.abort lands before voice wake reroutes the session", async () => { + let releaseRouting: (() => void) | undefined; + mocks.loadVoiceWakeRoutingConfig.mockImplementation( + async () => + await new Promise((resolve) => { + releaseRouting = () => + resolve({ + version: 1, + defaultTarget: { mode: "current" }, + routes: [], + updatedAtMs: 0, + }); + }), + ); + mocks.resolveVoiceWakeRouteByTrigger.mockReturnValue({ sessionKey: "agent:main:voice" }); + mocks.loadSessionEntry.mockImplementation((sessionKey: string) => ({ + cfg: {}, + storePath: "/tmp/sessions.json", + entry: { + sessionId: sessionKey === "agent:main:voice" ? "voice-session-id" : "main-session-id", + updatedAt: Date.now(), + }, + canonicalKey: sessionKey === "agent:main:voice" ? "agent:main:voice" : "agent:main:main", + })); + mocks.updateSessionStore.mockResolvedValue(undefined); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-abort-before-voice-route"; + const pending = invokeAgent( + { + message: "wake up", + sessionKey: "agent:main:main", + voiceWakeTrigger: "robot wake", + idempotencyKey: runId, + }, + { context, respond, reqId: runId, flushDispatch: false }, + ); + + await waitForAssertion(() => + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "accepted", + }), + ); + expect(context.chatAbortControllers.has(runId)).toBe(false); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "timeout", + summary: "aborted", + stopReason: "rpc", + }); + + releaseRouting?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(context.chatAbortControllers.has(runId)).toBe(false); + const finalResponse = respond.mock.calls.find( + (call: unknown[]) => (call[1] as { status?: unknown } | undefined)?.status === "timeout", + ); + expectRecordFields(requireValue(finalResponse, "terminal response missing")[1], { + runId, + status: "timeout", + stopReason: "rpc", + }); + }); + + it("rejects unauthorized chat.abort during pre-accept setup", async () => { + prime(); + let releaseSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry(), + }; + return await updater(store); + }); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); + const runId = "idem-abort-before-registration-unauthorized"; + const pending = invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { + context, + respond, + reqId: runId, + flushDispatch: false, + client: { connId: "owner-conn" } as AgentHandlerArgs["client"], + }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + expect(context.chatAbortControllers.has(runId)).toBe(false); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: { connId: "other-conn" } as AgentHandlerArgs["client"], + isWebchatConnect: () => false, + }); + + expect(mockCallArg(abortRespond, 0, 0)).toBe(false); + expectRecordFields(mockCallArg(abortRespond, 0, 2), { + code: "INVALID_REQUEST", + message: "unauthorized", + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + sessionKey: "agent:main:main", + status: "accepted", + }); + + releaseSessionWrite?.(); + await pending; + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(1); + expect(context.chatAbortControllers.has(runId)).toBe(true); + }); + + it("updates exec approval followup aliases when chat.abort lands during pre-accept setup", async () => { + const bashElevated = { + enabled: true, + allowed: true, + defaultLevel: "on" as const, + }; + const firstRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-preaccept-abort", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + const secondRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-preaccept-abort", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + if (!firstRegistration || !secondRegistration) { + throw new Error("expected runtime handoff ids"); + } + mockMainSessionEntry({ + sessionId: "existing-session-id", + lastChannel: "telegram", + lastTo: "123", + }); + let releaseSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry({ + lastChannel: "telegram", + lastTo: "123", + }), + }; + return await updater(store); + }); + mocks.agentCommand.mockImplementation(() => new Promise(() => {})); + const context = makeContext(); + const runId = firstRegistration.idempotencyKey; + const aliasKey = "agent:exec-approval-followup:req-elevated-preaccept-abort"; + + const pending = invokeAgent( + { + message: "exec followup", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: runId, + internalRuntimeHandoffId: firstRegistration.handoffId, + }, + { + reqId: "exec-followup-preaccept-abort-1", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + expectRecordFields(context.dedupe.get(aliasKey)?.payload, { + runId, + sessionKey: "agent:main:telegram:direct:123", + status: "accepted", + }); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:telegram:direct:123", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: backendGatewayClient(), + isWebchatConnect: () => false, + }); + + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + status: "timeout", + stopReason: "rpc", + }); + expectRecordFields(context.dedupe.get(aliasKey)?.payload, { + runId, + status: "timeout", + stopReason: "rpc", + }); + + releaseSessionWrite?.(); + await pending; + + const retryRespond = await invokeAgent( + { + message: "exec followup duplicate", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: secondRegistration.idempotencyKey, + internalRuntimeHandoffId: secondRegistration.handoffId, + }, + { + reqId: "exec-followup-preaccept-abort-2", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + + expect(mockCallArg(retryRespond, 0, 1)).toMatchObject({ + runId, + status: "timeout", + stopReason: "rpc", + }); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + }); + + it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => { + prime(); + mocks.agentCommand.mockImplementation(() => new Promise(() => {})); + + const context = makeContext(); + const respond = vi.fn(); const runId = "idem-abort-no-timeout"; await invokeAgent( { @@ -3599,7 +4309,7 @@ describe("gateway agent handler chat.abort integration", () => { idempotencyKey: runId, timeout: 0, }, - { context, reqId: runId }, + { context, respond, reqId: runId }, ); const entry = context.chatAbortControllers.get(runId); @@ -3682,6 +4392,56 @@ describe("gateway agent handler chat.abort integration", () => { expect(context.chatAbortControllers.has(runId)).toBe(false); }); + it("chat.abort by runId allows the owner connection to use a stale session key", async () => { + prime(); + const pending = new Promise(() => {}); + let capturedSignal: AbortSignal | undefined; + mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => { + capturedSignal = opts.abortSignal; + return pending; + }); + + const context = makeContext(); + const runId = "idem-abort-stale-session-key"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { + context, + reqId: runId, + client: { connId: "owner-conn" } as AgentHandlerArgs["client"], + }, + ); + + const active = requireValue(context.chatAbortControllers.get(runId), "active run missing"); + context.chatAbortControllers.set(runId, { + ...active, + sessionKey: "agent:main:canonical", + }); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: { connId: "owner-conn" } as AgentHandlerArgs["client"], + isWebchatConnect: () => false, + }); + + expect(mockCallArg(abortRespond)).toBe(true); + expectRecordFields(mockCallArg(abortRespond, 0, 1), { + aborted: true, + runIds: [runId], + }); + expect(capturedSignal?.aborted).toBe(true); + expect(context.chatAbortControllers.has(runId)).toBe(false); + }); + it("keeps the sessions.abort wait snapshot after late agent completion", async () => { prime(); let capturedSignal: AbortSignal | undefined; @@ -3901,10 +4661,61 @@ describe("gateway agent handler chat.abort integration", () => { ); expect(context.chatAbortControllers.get(runId)).toBe(preExisting); + expect(context.dedupe.has(`agent:${runId}`)).toBe(false); expect(mocks.agentCommand).not.toHaveBeenCalled(); expect(respond).toHaveBeenCalledWith(true, { runId, status: "in_flight" }, undefined, { cached: true, runId, }); }); + + it("returns in_flight instead of replaying cached accepted agent replies", async () => { + prime(); + mocks.agentCommand.mockImplementationOnce( + () => + new Promise(() => { + // Keep the first run pending so the dedupe entry remains accepted. + }), + ); + + const context = makeContext(); + const runId = "idem-cached-accepted"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId, flushDispatch: false }, + ); + + expectRecordFields(context.dedupe.get(`agent:${runId}`)?.payload, { + runId, + status: "accepted", + sessionKey: "agent:main:main", + }); + + const duplicateRespond = vi.fn(); + await invokeAgent( + { + message: "hi again", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: `${runId}-duplicate`, respond: duplicateRespond }, + ); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(duplicateRespond).toHaveBeenCalledWith( + true, + { runId, status: "in_flight", sessionKey: "agent:main:main" }, + undefined, + { + cached: true, + runId, + }, + ); + }); }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 64aaef9c9e15..dd7c3399a519 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -449,6 +449,68 @@ function readGatewayDedupeEntry(params: { return undefined; } +function isAcceptedAgentDedupePayload(payload: unknown): payload is { + acceptedAt?: unknown; + dedupeKeys?: unknown; + expiresAtMs?: unknown; + ownerConnId?: unknown; + ownerDeviceId?: unknown; + runId?: unknown; + sessionKey?: unknown; + status: "accepted"; +} { + return ( + typeof payload === "object" && + payload !== null && + (payload as { status?: unknown }).status === "accepted" + ); +} + +function isPreRegistrationAbortedAgentDedupePayload(payload: unknown): payload is { + runId?: unknown; + sessionKey?: unknown; + status: "timeout"; + stopReason?: unknown; +} { + const stopReason = (payload as { stopReason?: unknown } | null)?.stopReason; + return ( + typeof payload === "object" && + payload !== null && + (payload as { status?: unknown }).status === "timeout" && + (stopReason === "rpc" || stopReason === "stop") + ); +} + +function isPreRegistrationAbortedAgentDedupeEntryForSession(params: { + entry: ReturnType | undefined; + runId: string; + sessionKey?: string; + alternateSessionKeys?: Array; +}): boolean { + if (!params.entry?.ok || !isPreRegistrationAbortedAgentDedupePayload(params.entry.payload)) { + return false; + } + const payload = params.entry.payload; + const payloadRunId = typeof payload.runId === "string" ? payload.runId.trim() : ""; + if (payloadRunId && payloadRunId !== params.runId) { + return false; + } + const payloadSessionKey = + typeof payload.sessionKey === "string" && payload.sessionKey.trim() + ? payload.sessionKey.trim() + : undefined; + const expectedSessionKeys = new Set( + [params.sessionKey, ...(params.alternateSessionKeys ?? [])].filter((value): value is string => + Boolean(value?.trim()), + ), + ); + return ( + !payloadSessionKey || + expectedSessionKeys.size === 0 || + expectedSessionKeys.has(payloadSessionKey) + ); +} + function setGatewayDedupeEntries(params: { dedupe: GatewayRequestContext["dedupe"]; keys: readonly string[]; @@ -463,6 +525,28 @@ function setGatewayDedupeEntries(params: { } } +function setAbortedAgentDedupeEntries(params: { + dedupe: GatewayRequestContext["dedupe"]; + keys: readonly string[]; + runId: string; + stopReason: string; +}) { + setGatewayDedupeEntries({ + dedupe: params.dedupe, + keys: params.keys, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId: params.runId, + status: "timeout" as const, + summary: "aborted", + stopReason: params.stopReason, + }, + }, + }); +} + function deleteGatewayDedupeEntries(params: { dedupe: GatewayRequestContext["dedupe"]; keys: readonly string[]; @@ -716,6 +800,30 @@ export const agentHandlers: GatewayRequestHandlers = { keys: agentDedupeKeys, }); if (cached) { + if (cached.ok && isAcceptedAgentDedupePayload(cached.payload)) { + const cachedRunId = + typeof cached.payload.runId === "string" && cached.payload.runId.trim() + ? cached.payload.runId.trim() + : runId; + const cachedSessionKey = + typeof cached.payload.sessionKey === "string" && cached.payload.sessionKey.trim() + ? cached.payload.sessionKey.trim() + : undefined; + respond( + true, + { + runId: cachedRunId, + status: "in_flight" as const, + ...(cachedSessionKey ? { sessionKey: cachedSessionKey } : {}), + }, + undefined, + { + cached: true, + runId: cachedRunId, + }, + ); + return; + } respond(cached.ok, cached.payload, cached.error, { cached: true, }); @@ -723,20 +831,36 @@ export const agentHandlers: GatewayRequestHandlers = { } let agentDedupeReserved = false; let agentRunAccepted = false; - const reserveExecApprovalFollowupDedupe = () => { - if (agentDedupeReserved || !execApprovalFollowupApprovalId) { + const ownerConnId = typeof client?.connId === "string" ? client.connId : undefined; + const ownerDeviceId = + typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined; + const reservePreAcceptedAgentDedupe = (sessionKey?: string) => { + if (agentDedupeReserved || !sessionKey) { return; } + const acceptedAt = Date.now(); + const pendingTimeoutMs = resolveAgentTimeoutMs({ + cfg, + overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined, + }); setGatewayDedupeEntries({ dedupe: context.dedupe, keys: agentDedupeKeys, entry: { - ts: Date.now(), + ts: acceptedAt, ok: true, payload: { runId, status: "accepted" as const, - acceptedAt: Date.now(), + sessionKey, + acceptedAt, + dedupeKeys: agentDedupeKeys, + expiresAtMs: resolveAgentRunExpiresAtMs({ + now: acceptedAt, + timeoutMs: pendingTimeoutMs, + }), + ownerConnId, + ownerDeviceId, }, }, }); @@ -746,6 +870,18 @@ export const agentHandlers: GatewayRequestHandlers = { if (!agentDedupeReserved || agentRunAccepted) { return; } + const reservedEntry = readGatewayDedupeEntry({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + }); + if ( + isPreRegistrationAbortedAgentDedupeEntryForSession({ + entry: reservedEntry, + runId, + }) + ) { + return; + } deleteGatewayDedupeEntries({ dedupe: context.dedupe, keys: agentDedupeKeys, @@ -756,91 +892,6 @@ export const agentHandlers: GatewayRequestHandlers = { const requestedBestEffortDeliver = typeof request.bestEffortDeliver === "boolean" ? request.bestEffortDeliver : undefined; - let message = (request.message ?? "").trim(); - if (!isRawModelRun) { - message = annotateInterSessionPromptText(message, inputProvenance); - } - let images: Array<{ type: "image"; data: string; mimeType: string }> = []; - let imageOrder: PromptImageOrderEntry[] = []; - if (normalizedAttachments.length > 0) { - const requestedSessionKeyRaw = - typeof request.sessionKey === "string" && request.sessionKey.trim() - ? request.sessionKey.trim() - : undefined; - - let baseProvider: string | undefined; - let baseModel: string | undefined; - if (requestedSessionKeyRaw) { - const { cfg: sessCfg, entry: sessEntry } = loadSessionEntry(requestedSessionKeyRaw); - const sessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKeyRaw); - const modelRef = resolveSessionModelRef(sessCfg, sessEntry, sessionAgentId); - baseProvider = modelRef.provider; - baseModel = modelRef.model; - } - const effectiveProvider = providerOverride || baseProvider; - const effectiveModel = modelOverride || baseModel; - const supportsInlineImages = await resolveGatewayModelSupportsImages({ - loadGatewayModelCatalog: context.loadGatewayModelCatalog, - provider: effectiveProvider, - model: effectiveModel, - }); - - try { - const parsed = await parseMessageWithAttachments(message, normalizedAttachments, { - maxBytes: resolveChatAttachmentMaxBytes(cfg), - log: context.logGateway, - supportsInlineImages, - // agent.run does not yet wire a ctx.MediaPaths stage path, so reject - // non-image attachments explicitly (UnsupportedAttachmentError) - // instead of saving them where the agent cannot reach them. - acceptNonImage: false, - }); - message = parsed.message.trim(); - images = parsed.images; - imageOrder = parsed.imageOrder; - // offloadedRefs are appended as text markers to `message`; the agent - // runner will resolve them via detectAndLoadPromptImages. - } catch (err) { - // MediaOffloadError indicates a server-side storage fault (ENOSPC, EPERM, - // etc.). Map it to UNAVAILABLE so clients can retry without treating it as - // a bad request. All other errors are input-validation failures → 4xx. - logAttachmentFailure(context.logGateway, "agent attachment parse failed", err); - const isServerFault = err instanceof MediaOffloadError; - respond( - false, - undefined, - errorShape( - isServerFault ? ErrorCodes.UNAVAILABLE : ErrorCodes.INVALID_REQUEST, - String(err), - ), - ); - return; - } - } - - // Accept internal non-delivery sources (heartbeat, cron, webhook) as valid - // channel hints so subagent spawns from those parent runs are not rejected. - const isKnownGatewayChannel = (value: string): boolean => - isGatewayMessageChannel(value) || isInternalNonDeliveryChannel(value); - const channelHints = [request.channel, request.replyChannel] - .filter((value): value is string => typeof value === "string") - .map((value) => value.trim()) - .filter(Boolean); - for (const rawChannel of channelHints) { - const normalized = normalizeMessageChannel(rawChannel); - if (normalized && normalized !== "last" && !isKnownGatewayChannel(normalized)) { - respond( - false, - undefined, - errorShape( - ErrorCodes.INVALID_REQUEST, - `invalid agent params: unknown channel: ${normalized}`, - ), - ); - return; - } - } - const knownAgents = listAgentIds(cfg); const agentIdRaw = normalizeOptionalString(request.agentId) ?? ""; let agentId = agentIdRaw ? normalizeAgentId(agentIdRaw) : undefined; @@ -894,10 +945,92 @@ export const agentHandlers: GatewayRequestHandlers = { return; } } - // Exec approval followups can retry with a fresh nonce for the same approval id. - // Reserve the stable alias before awaited session/delivery work so overlaps dedupe. - reserveExecApprovalFollowupDedupe(); + // Reserve the run before awaited attachment/session/delivery work so duplicate calls dedupe and + // pre-registration chat.abort can be made durable by idempotency key. + const preAcceptedReservedSessionKey = requestedSessionKey; + reservePreAcceptedAgentDedupe(preAcceptedReservedSessionKey); + try { + let message = (request.message ?? "").trim(); + if (!isRawModelRun) { + message = annotateInterSessionPromptText(message, inputProvenance); + } + let images: Array<{ type: "image"; data: string; mimeType: string }> = []; + let imageOrder: PromptImageOrderEntry[] = []; + if (normalizedAttachments.length > 0) { + let baseProvider: string | undefined; + let baseModel: string | undefined; + if (requestedSessionKeyRaw) { + const { cfg: sessCfg, entry: sessEntry } = loadSessionEntry(requestedSessionKeyRaw); + const sessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKeyRaw); + const modelRef = resolveSessionModelRef(sessCfg, sessEntry, sessionAgentId); + baseProvider = modelRef.provider; + baseModel = modelRef.model; + } + const effectiveProvider = providerOverride || baseProvider; + const effectiveModel = modelOverride || baseModel; + const supportsInlineImages = await resolveGatewayModelSupportsImages({ + loadGatewayModelCatalog: context.loadGatewayModelCatalog, + provider: effectiveProvider, + model: effectiveModel, + }); + + try { + const parsed = await parseMessageWithAttachments(message, normalizedAttachments, { + maxBytes: resolveChatAttachmentMaxBytes(cfg), + log: context.logGateway, + supportsInlineImages, + // agent.run does not yet wire a ctx.MediaPaths stage path, so reject + // non-image attachments explicitly (UnsupportedAttachmentError) + // instead of saving them where the agent cannot reach them. + acceptNonImage: false, + }); + message = parsed.message.trim(); + images = parsed.images; + imageOrder = parsed.imageOrder; + // offloadedRefs are appended as text markers to `message`; the agent + // runner will resolve them via detectAndLoadPromptImages. + } catch (err) { + // MediaOffloadError indicates a server-side storage fault (ENOSPC, EPERM, + // etc.). Map it to UNAVAILABLE so clients can retry without treating it as + // a bad request. All other errors are input-validation failures → 4xx. + logAttachmentFailure(context.logGateway, "agent attachment parse failed", err); + const isServerFault = err instanceof MediaOffloadError; + respond( + false, + undefined, + errorShape( + isServerFault ? ErrorCodes.UNAVAILABLE : ErrorCodes.INVALID_REQUEST, + String(err), + ), + ); + return; + } + } + + // Accept internal non-delivery sources (heartbeat, cron, webhook) as valid + // channel hints so subagent spawns from those parent runs are not rejected. + const isKnownGatewayChannel = (value: string): boolean => + isGatewayMessageChannel(value) || isInternalNonDeliveryChannel(value); + const channelHints = [request.channel, request.replyChannel] + .filter((value): value is string => typeof value === "string") + .map((value) => value.trim()) + .filter(Boolean); + for (const rawChannel of channelHints) { + const normalized = normalizeMessageChannel(rawChannel); + if (normalized && normalized !== "last" && !isKnownGatewayChannel(normalized)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid agent params: unknown channel: ${normalized}`, + ), + ); + return; + } + } + const voiceWakeTrigger = normalizeOptionalString(request.voiceWakeTrigger) ?? ""; const replyTo = normalizeOptionalString(request.replyTo) ?? ""; const to = normalizeOptionalString(request.to) ?? ""; @@ -1438,6 +1571,26 @@ export const agentHandlers: GatewayRequestHandlers = { const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL; + const preRegistrationAbort = readGatewayDedupeEntry({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + }); + if ( + isPreRegistrationAbortedAgentDedupeEntryForSession({ + entry: preRegistrationAbort, + runId, + sessionKey: resolvedSessionKey, + alternateSessionKeys: [preAcceptedReservedSessionKey, requestedSessionKey], + }) + ) { + agentRunAccepted = true; + respond(true, preRegistrationAbort?.payload, undefined, { + cached: true, + runId, + }); + return; + } + // Register before the accepted ack so an immediate chat.abort/sessions.abort // cannot race the active-run entry. Agent RPC runs use the agent timeout; // chat.send keeps the shorter chat cleanup cap. @@ -1466,15 +1619,15 @@ export const agentHandlers: GatewayRequestHandlers = { timeoutMs, now, expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }), - ownerConnId: typeof client?.connId === "string" ? client.connId : undefined, - ownerDeviceId: - typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined, + ownerConnId, + ownerDeviceId, providerId: activeModelProvider, authProviderId: activeAuthProvider, kind: "agent", }); - if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) { - agentRunAccepted = true; + const existingRunAbort = context.chatAbortControllers.get(runId); + if (!activeRunAbort.registered && existingRunAbort) { + agentRunAccepted = existingRunAbort.kind === "agent"; respond(true, { runId, status: "in_flight" as const }, undefined, { cached: true, runId, @@ -1484,9 +1637,16 @@ export const agentHandlers: GatewayRequestHandlers = { const accepted = { runId, + sessionKey: resolvedSessionKey, status: "accepted" as const, acceptedAt: Date.now(), }; + const acceptedDedupePayload = { + ...accepted, + dedupeKeys: agentDedupeKeys, + ownerConnId, + ownerDeviceId, + }; agentRunAccepted = true; // Store an in-flight ack so retries do not spawn a second run. setGatewayDedupeEntries({ @@ -1495,7 +1655,7 @@ export const agentHandlers: GatewayRequestHandlers = { entry: { ts: Date.now(), ok: true, - payload: accepted, + payload: acceptedDedupePayload, }, }); respond(true, accepted, undefined, { runId }); @@ -1508,6 +1668,27 @@ export const agentHandlers: GatewayRequestHandlers = { let dispatched = false; try { + if (activeRunAbort.controller.signal.aborted) { + setAbortedAgentDedupeEntries({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + runId, + stopReason: "rpc", + }); + respond( + true, + { + runId, + status: "timeout" as const, + summary: "aborted", + stopReason: "rpc", + }, + undefined, + { runId }, + ); + return; + } + if (resolvedSessionKey) { await reactivateCompletedSubagentSession({ sessionKey: resolvedSessionKey, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 8c0ba0504c4f..e1ca8bbf0842 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -167,6 +167,25 @@ type ChatAbortRequester = { isAdmin: boolean; }; +type PreRegisteredAgentDedupePayload = { + dedupeKeys?: unknown; + ownerConnId?: unknown; + ownerDeviceId?: unknown; + runId?: unknown; + sessionKey?: unknown; + status?: unknown; +}; + +type PreRegisteredAgentRun = { + runId: string; + sessionKey: string; + payload: PreRegisteredAgentDedupePayload; +}; + +function normalizeUnknownText(value: unknown): string | undefined { + return typeof value === "string" ? normalizeOptionalText(value) : undefined; +} + /** True when a reply payload carries at least one media reference (mediaUrl or mediaUrls). */ function isMediaBearingPayload(payload: ReplyPayload): boolean { if (payload.isReasoning === true) { @@ -1642,6 +1661,153 @@ function canRequesterAbortChatRun( return false; } +function canRequesterAbortChatRunWithoutSessionMatch( + entry: ChatAbortControllerEntry, + requester: ChatAbortRequester, +): boolean { + if (requester.isAdmin) { + return true; + } + const ownerDeviceId = normalizeOptionalText(entry.ownerDeviceId); + const ownerConnId = normalizeOptionalText(entry.ownerConnId); + return Boolean( + (ownerDeviceId && requester.deviceId && ownerDeviceId === requester.deviceId) || + (ownerConnId && requester.connId && ownerConnId === requester.connId), + ); +} + +function readPreRegisteredAgentDedupePayloadForSession(params: { + entry: GatewayRequestContext["dedupe"] extends Map ? T | undefined : never; + runId: string; + sessionKey: string; +}): PreRegisteredAgentDedupePayload | undefined { + if (!params.entry?.ok) { + return undefined; + } + const payload = params.entry.payload as PreRegisteredAgentDedupePayload | undefined; + if (payload?.status !== "accepted") { + return undefined; + } + const payloadRunId = normalizeUnknownText(payload.runId); + if (payloadRunId && payloadRunId !== params.runId) { + return undefined; + } + return normalizeUnknownText(payload.sessionKey) === params.sessionKey ? payload : undefined; +} + +function readPreRegisteredAgentRun(params: { + key: string; + entry: GatewayRequestContext["dedupe"] extends Map ? T | undefined : never; +}): PreRegisteredAgentRun | undefined { + if (!params.key.startsWith("agent:") || !params.entry?.ok) { + return undefined; + } + const payload = params.entry.payload as PreRegisteredAgentDedupePayload | undefined; + if (payload?.status !== "accepted") { + return undefined; + } + const runId = normalizeUnknownText(payload.runId) ?? normalizeOptionalText(params.key.slice(6)); + const sessionKey = normalizeUnknownText(payload.sessionKey); + if (!runId || !sessionKey) { + return undefined; + } + return { runId, sessionKey, payload }; +} + +function canRequesterAbortPreRegisteredAgentRun( + payload: PreRegisteredAgentDedupePayload, + requester: ChatAbortRequester, +): boolean { + return canRequesterAbortChatRun( + { + controller: new AbortController(), + sessionId: "", + sessionKey: normalizeUnknownText(payload.sessionKey) ?? "", + startedAtMs: 0, + expiresAtMs: 0, + ownerConnId: normalizeUnknownText(payload.ownerConnId), + ownerDeviceId: normalizeUnknownText(payload.ownerDeviceId), + kind: "agent", + }, + requester, + ); +} + +function resolvePreRegisteredAgentDedupeKeys( + payload: PreRegisteredAgentDedupePayload, + runId: string, +): string[] { + const keys = [`agent:${runId}`]; + const payloadKeys = Array.isArray(payload.dedupeKeys) ? payload.dedupeKeys : []; + for (const key of payloadKeys) { + const normalized = normalizeUnknownText(key); + if (normalized?.startsWith("agent:")) { + keys.push(normalized); + } + } + return [...new Set(keys)]; +} + +function writePreRegisteredAgentAbort(params: { + context: GatewayRequestContext; + runId: string; + sessionKey: string; + payload: PreRegisteredAgentDedupePayload; + stopReason: string; + endedAt?: number; +}) { + const endedAt = params.endedAt ?? Date.now(); + for (const key of resolvePreRegisteredAgentDedupeKeys(params.payload, params.runId)) { + setGatewayDedupeEntry({ + dedupe: params.context.dedupe, + key, + entry: { + ts: endedAt, + ok: true, + payload: { + runId: params.runId, + sessionKey: params.sessionKey, + status: "timeout" as const, + summary: "aborted", + stopReason: params.stopReason, + endedAt, + }, + }, + }); + } +} + +function resolveAuthorizedPreRegisteredAgentRunsForSessionKeys(params: { + context: GatewayRequestContext; + sessionKeys: Iterable; + requester: ChatAbortRequester; +}) { + const sessionKeys = new Set( + Array.from(params.sessionKeys, (sessionKey) => normalizeOptionalText(sessionKey)).filter( + (sessionKey): sessionKey is string => Boolean(sessionKey), + ), + ); + const authorizedByRunId = new Map(); + let matchedSessionRuns = 0; + for (const [key, entry] of params.context.dedupe) { + const run = readPreRegisteredAgentRun({ key, entry }); + if (!run || !sessionKeys.has(run.sessionKey)) { + continue; + } + if (params.context.chatAbortControllers.has(run.runId)) { + continue; + } + matchedSessionRuns += 1; + if (canRequesterAbortPreRegisteredAgentRun(run.payload, params.requester)) { + authorizedByRunId.set(run.runId, run); + } + } + return { + matchedSessionRuns, + authorizedRuns: [...authorizedByRunId.values()], + }; +} + function resolveAuthorizedRunsForSessionKeys(params: { chatAbortControllers: Map; sessionKeys: Iterable; @@ -1686,17 +1852,26 @@ async function abortChatRunsForSessionKeyWithPartials(params: { stopReason?: string; requester: ChatAbortRequester; }): Promise<{ aborted: boolean; runIds: string[]; unauthorized: boolean }> { + const sessionKeys = [params.sessionKey, ...(params.sessionKeyAliases ?? [])]; const { matchedSessionRuns, authorizedRuns } = resolveAuthorizedRunsForSessionKeys({ chatAbortControllers: params.context.chatAbortControllers, - sessionKeys: [params.sessionKey, ...(params.sessionKeyAliases ?? [])], + sessionKeys, sessionIds: [params.sessionId], requester: params.requester, }); - if (authorizedRuns.length === 0) { + const { + matchedSessionRuns: matchedPendingAgentRuns, + authorizedRuns: authorizedPendingAgentRuns, + } = resolveAuthorizedPreRegisteredAgentRunsForSessionKeys({ + context: params.context, + sessionKeys, + requester: params.requester, + }); + if (authorizedRuns.length === 0 && authorizedPendingAgentRuns.length === 0) { return { aborted: false, runIds: [], - unauthorized: matchedSessionRuns > 0, + unauthorized: matchedSessionRuns > 0 || matchedPendingAgentRuns > 0, }; } const authorizedRunIdSet = new Set(authorizedRuns.map((run) => run.runId)); @@ -1717,6 +1892,19 @@ async function abortChatRunsForSessionKeyWithPartials(params: { runIds.push(runId); } } + const endedAt = Date.now(); + const stopReason = params.stopReason ?? "rpc"; + for (const { runId, sessionKey, payload } of authorizedPendingAgentRuns) { + writePreRegisteredAgentAbort({ + context: params.context, + runId, + sessionKey, + payload, + stopReason, + endedAt, + }); + runIds.push(runId); + } const res = { aborted: runIds.length > 0, runIds, unauthorized: false }; if (res.aborted) { await persistAbortedPartials({ @@ -1926,10 +2114,34 @@ export const chatHandlers: GatewayRequestHandlers = { const active = context.chatAbortControllers.get(runId); if (!active) { + const pendingAgentEntry = context.dedupe.get(`agent:${runId}`); + const pendingAgentPayload = readPreRegisteredAgentDedupePayloadForSession({ + entry: pendingAgentEntry, + runId, + sessionKey: rawSessionKey, + }); + if (pendingAgentPayload) { + if (!canRequesterAbortPreRegisteredAgentRun(pendingAgentPayload, requester)) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized")); + return; + } + writePreRegisteredAgentAbort({ + context, + runId, + sessionKey: rawSessionKey, + payload: pendingAgentPayload, + stopReason: "rpc", + }); + respond(true, { ok: true, aborted: true, runIds: [runId] }); + return; + } respond(true, { ok: true, aborted: false, runIds: [] }); return; } - if (active.sessionKey !== rawSessionKey) { + if ( + active.sessionKey !== rawSessionKey && + !canRequesterAbortChatRunWithoutSessionMatch(active, requester) + ) { respond( false, undefined, @@ -1945,13 +2157,13 @@ export const chatHandlers: GatewayRequestHandlers = { const partialText = context.chatRunBuffers.get(runId); const res = abortChatRunById(ops, { runId, - sessionKey: rawSessionKey, + sessionKey: active.sessionKey, stopReason: "rpc", }); if (res.aborted && partialText && partialText.trim()) { await persistAbortedPartials({ context, - sessionKey: rawSessionKey, + sessionKey: active.sessionKey, snapshots: [ { runId,