From b1e5c9d7fa1690be688a7a22c4ae46cb776ffe80 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 30 May 2026 00:56:20 +0200 Subject: [PATCH] fix(agents): centralize terminal run outcome precedence (#88136) * fix(agents): centralize terminal run outcome precedence * docs(agents): explain terminal outcome precedence * docs(agents): note terminal outcome helper * fix(agents): preserve pending hard timeout over late completion * test(agents): align global session scoping expectation * Revert "test(agents): align global session scoping expectation" This reverts commit 9b4a0c3cb1b3885299eea7081d97f7142c415dc2. * test(infra): stabilize CONNECT timeout cap test * fix(agents): prioritize hard timeout terminal evidence * fix(gateway): preserve pending hard timeout snapshots --- AGENTS.md | 1 + src/agents/agent-run-terminal-outcome.test.ts | 127 +++++++++++ src/agents/agent-run-terminal-outcome.ts | 171 +++++++++++++++ src/agents/run-wait.ts | 22 +- src/gateway/server-methods/agent-job.ts | 117 +++++++++-- .../server-methods/agent-wait-dedupe.test.ts | 143 ++++++++++++- .../server-methods/agent-wait-dedupe.ts | 86 +++++--- .../server-methods/server-methods.test.ts | 198 ++++++++++++++++++ src/infra/net/http-connect-tunnel.test.ts | 4 +- 9 files changed, 815 insertions(+), 54 deletions(-) create mode 100644 src/agents/agent-run-terminal-outcome.test.ts create mode 100644 src/agents/agent-run-terminal-outcome.ts diff --git a/AGENTS.md b/AGENTS.md index 47f22032d82c..5b2cfe2ecba9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -72,6 +72,7 @@ Skills own workflows; root owns hard policy and routing. - Plugin SDK exception: shipped external API gets new API first plus named compat/deprecation, small tests/docs if useful, removal plan. - Migrate internal/bundled callers to modern API in the same change. Do not let internal compat become permanent architecture. - Channels are implementation under `src/channels/**`; plugin authors get SDK seams. Providers own auth/catalog/runtime hooks; core owns generic loop. +- Agent run terminal state: normalize/merge via `src/agents/agent-run-terminal-outcome.ts`; do not rederive timeout/cancel precedence in projections. - Hot paths should carry prepared facts forward: provider id, model ref, channel id, target, capability family, attachment class. Do not rediscover with broad plugin/provider/channel/capability loaders. - Do not fix repeated request-time discovery with scattered caches. Move the canonical fact earlier; reuse prepared runtime objects; delete duplicate lookup branches. - Gateway/plugin metadata is process-stable: installs, manifests, catalogs, generated paths, bundled metadata. Changes require restart or explicit owner reload/install/doctor flow. diff --git a/src/agents/agent-run-terminal-outcome.test.ts b/src/agents/agent-run-terminal-outcome.test.ts new file mode 100644 index 000000000000..0c3fa9c38ea9 --- /dev/null +++ b/src/agents/agent-run-terminal-outcome.test.ts @@ -0,0 +1,127 @@ +import { describe, expect, it } from "vitest"; +import { + buildAgentRunTerminalOutcome, + isHardAgentRunTimeoutPhase, + mergeAgentRunTerminalOutcome, +} from "./agent-run-terminal-outcome.js"; + +describe("agent run terminal outcome", () => { + it("treats provider/preflight/post-turn timeout phases as hard run timeouts", () => { + expect(isHardAgentRunTimeoutPhase("preflight")).toBe(true); + expect(isHardAgentRunTimeoutPhase("provider")).toBe(true); + expect(isHardAgentRunTimeoutPhase("post_turn")).toBe(true); + expect(isHardAgentRunTimeoutPhase("queue")).toBe(false); + expect(isHardAgentRunTimeoutPhase("gateway_draining")).toBe(false); + }); + + it("keeps queue and gateway draining timeouts non-sticky", () => { + expect( + buildAgentRunTerminalOutcome({ + status: "timeout", + }).reason, + ).toBe("timed_out"); + expect( + buildAgentRunTerminalOutcome({ + status: "timeout", + timeoutPhase: "queue", + }).reason, + ).toBe("timed_out"); + expect( + buildAgentRunTerminalOutcome({ + status: "timeout", + timeoutPhase: "gateway_draining", + }).reason, + ).toBe("timed_out"); + }); + + it("keeps explicit rpc and stop cancellations sticky even with queue attribution", () => { + const rpcCancel = buildAgentRunTerminalOutcome({ + status: "timeout", + stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + endedAt: 100, + }); + const lateCompletion = buildAgentRunTerminalOutcome({ + status: "ok", + endedAt: 200, + }); + + expect(rpcCancel.reason).toBe("cancelled"); + expect(rpcCancel.status).toBe("timeout"); + expect(mergeAgentRunTerminalOutcome(rpcCancel, lateCompletion)).toBe(rpcCancel); + expect( + buildAgentRunTerminalOutcome({ + status: "timeout", + stopReason: "stop", + timeoutPhase: "gateway_draining", + }).reason, + ).toBe("cancelled"); + }); + + it("does not treat successful model stop metadata as cancellation", () => { + expect( + buildAgentRunTerminalOutcome({ + status: "ok", + stopReason: "stop", + }), + ).toEqual({ + reason: "completed", + status: "ok", + stopReason: "stop", + }); + }); + + it("prefers hard timeout evidence over default rpc cancellation metadata", () => { + const timeout = buildAgentRunTerminalOutcome({ + status: "timeout", + stopReason: "rpc", + timeoutPhase: "provider", + providerStarted: true, + endedAt: 200, + }); + const earlierCompletion = buildAgentRunTerminalOutcome({ + status: "ok", + endedAt: 190, + }); + + expect(timeout.reason).toBe("hard_timeout"); + expect(timeout.status).toBe("timeout"); + expect(mergeAgentRunTerminalOutcome(timeout, earlierCompletion)).toBe(earlierCompletion); + }); + + it("keeps a hard timeout over later aborts or failures for the same run", () => { + const timeout = buildAgentRunTerminalOutcome({ + status: "timeout", + timeoutPhase: "provider", + endedAt: 200, + }); + const lateAbort = buildAgentRunTerminalOutcome({ + status: "error", + stopReason: "aborted", + endedAt: 250, + }); + const lateFailure = buildAgentRunTerminalOutcome({ + status: "error", + error: "late rejection", + endedAt: 260, + }); + + expect(mergeAgentRunTerminalOutcome(timeout, lateAbort)).toBe(timeout); + expect(mergeAgentRunTerminalOutcome(timeout, lateFailure)).toBe(timeout); + }); + + it("lets an earlier proven completion correct a provisional timeout", () => { + const timeout = buildAgentRunTerminalOutcome({ + status: "timeout", + timeoutPhase: "provider", + endedAt: 200, + }); + const earlierCompletion = buildAgentRunTerminalOutcome({ + status: "ok", + endedAt: 190, + }); + + expect(mergeAgentRunTerminalOutcome(timeout, earlierCompletion)).toBe(earlierCompletion); + }); +}); diff --git a/src/agents/agent-run-terminal-outcome.ts b/src/agents/agent-run-terminal-outcome.ts new file mode 100644 index 000000000000..141ca7d556ba --- /dev/null +++ b/src/agents/agent-run-terminal-outcome.ts @@ -0,0 +1,171 @@ +import { formatBlockedLivenessError, isBlockedLivenessState } from "../shared/agent-liveness.js"; +import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "./run-termination.js"; +import { + normalizeAgentRunTimeoutPhase, + normalizeProviderStarted, + type AgentRunTimeoutPhase, +} from "./run-timeout-attribution.js"; + +export type AgentRunWaitStatus = "ok" | "error" | "timeout"; + +export type AgentRunTerminalReason = + | "completed" + | "hard_timeout" + | "timed_out" + | "cancelled" + | "aborted" + | "blocked" + | "failed"; + +export type AgentRunTerminalOutcome = { + reason: AgentRunTerminalReason; + status: AgentRunWaitStatus; + error?: string; + stopReason?: string; + livenessState?: string; + timeoutPhase?: AgentRunTimeoutPhase; + providerStarted?: boolean; + startedAt?: number; + endedAt?: number; +}; + +export type AgentRunTerminalInput = { + status: AgentRunWaitStatus; + error?: unknown; + stopReason?: unknown; + livenessState?: unknown; + timeoutPhase?: unknown; + providerStarted?: unknown; + startedAt?: unknown; + endedAt?: unknown; +}; + +const HARD_TIMEOUT_PHASES = new Set(["preflight", "provider", "post_turn"]); + +function asFiniteTimestamp(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function asNonEmptyString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value : undefined; +} + +export function isHardAgentRunTimeoutPhase(value: unknown): value is AgentRunTimeoutPhase { + const phase = normalizeAgentRunTimeoutPhase(value); + return phase !== undefined && HARD_TIMEOUT_PHASES.has(phase); +} + +export function isHardAgentRunTimeoutOutcome( + outcome: AgentRunTerminalOutcome | undefined | null, +): boolean { + return outcome?.reason === "hard_timeout"; +} + +export function isStickyAgentRunTerminalOutcome( + outcome: AgentRunTerminalOutcome | undefined | null, +): boolean { + return outcome?.reason === "hard_timeout" || outcome?.reason === "cancelled"; +} + +function isCancellationStopReason(value: string | undefined): boolean { + return value === "rpc" || value === "stop"; +} + +export function buildAgentRunTerminalOutcome( + input: AgentRunTerminalInput, +): AgentRunTerminalOutcome { + const stopReason = asNonEmptyString(input.stopReason); + const livenessState = asNonEmptyString(input.livenessState); + const timeoutPhase = normalizeAgentRunTimeoutPhase(input.timeoutPhase); + const providerStarted = normalizeProviderStarted(input.providerStarted); + const rawError = asNonEmptyString(input.error); + // Queue and gateway-draining timeouts are wait-layer uncertainty. Only + // provider-started or provider-phase timeouts are sticky child-run facts. + const hardTimeout = + input.status === "timeout" && + (isHardAgentRunTimeoutPhase(timeoutPhase) || providerStarted === true); + const aborted = isAbortedAgentStopReason(stopReason); + // ACP/model `stop` can be a normal successful finish. Treat rpc/stop as + // cancellation only for non-success terminal payloads from abort paths. + const cancelled = input.status !== "ok" && isCancellationStopReason(stopReason); + const blocked = isBlockedLivenessState(livenessState); + const error = blocked + ? formatBlockedLivenessError(rawError) + : aborted && !rawError + ? AGENT_RUN_ABORTED_ERROR + : rawError; + const reason: AgentRunTerminalReason = blocked + ? "blocked" + : hardTimeout + ? "hard_timeout" + : aborted + ? "aborted" + : cancelled + ? "cancelled" + : input.status === "timeout" + ? "timed_out" + : input.status === "error" + ? "failed" + : "completed"; + return { + reason, + status: + reason === "completed" + ? "ok" + : input.status === "timeout" && + (reason === "hard_timeout" || reason === "timed_out" || reason === "cancelled") + ? "timeout" + : "error", + ...(error ? { error } : {}), + ...(stopReason ? { stopReason } : {}), + ...(livenessState ? { livenessState } : {}), + ...(timeoutPhase ? { timeoutPhase } : {}), + ...(providerStarted !== undefined ? { providerStarted } : {}), + ...(asFiniteTimestamp(input.startedAt) !== undefined + ? { startedAt: asFiniteTimestamp(input.startedAt) } + : {}), + ...(asFiniteTimestamp(input.endedAt) !== undefined + ? { endedAt: asFiniteTimestamp(input.endedAt) } + : {}), + }; +} + +function completedBeforeOrAtTimeout(params: { + completed: AgentRunTerminalOutcome; + timeout: AgentRunTerminalOutcome; +}): boolean { + return ( + params.completed.reason === "completed" && + typeof params.completed.endedAt === "number" && + typeof params.timeout.endedAt === "number" && + params.completed.endedAt <= params.timeout.endedAt + ); +} + +export function mergeAgentRunTerminalOutcome( + current: AgentRunTerminalOutcome | undefined, + incoming: AgentRunTerminalOutcome, +): AgentRunTerminalOutcome { + if (!current) { + return incoming; + } + if (current.reason === "cancelled") { + return current; + } + // A hard timeout owns the run unless later evidence proves completion ended + // before that timeout; late abort/error cleanup must not downgrade it. + if (isHardAgentRunTimeoutOutcome(current)) { + return completedBeforeOrAtTimeout({ completed: incoming, timeout: current }) + ? incoming + : current; + } + if (incoming.reason === "cancelled") { + return incoming; + } + if (isHardAgentRunTimeoutOutcome(incoming)) { + return completedBeforeOrAtTimeout({ completed: current, timeout: incoming }) + ? current + : incoming; + } + return incoming; +} diff --git a/src/agents/run-wait.ts b/src/agents/run-wait.ts index 0b41e3b24331..96013d75ef4c 100644 --- a/src/agents/run-wait.ts +++ b/src/agents/run-wait.ts @@ -2,7 +2,7 @@ import { callGateway } from "../gateway/call.js"; import { formatErrorMessage } from "../infra/errors.js"; import { normalizeBlockedLivenessWaitStatus } from "../shared/agent-liveness.js"; import { parseFiniteNumber } from "../shared/number-coercion.js"; -import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "./run-termination.js"; +import { buildAgentRunTerminalOutcome } from "./agent-run-terminal-outcome.js"; import { normalizeAgentRunTimeoutPhase, normalizeProviderStarted, @@ -66,13 +66,23 @@ function normalizeAgentWaitResult( wait?: RawAgentWaitResponse, ): AgentWaitResult { const stopReason = typeof wait?.stopReason === "string" ? wait.stopReason : undefined; - const abortedStopReason = isAbortedAgentStopReason(stopReason); - const error = - abortedStopReason && typeof wait?.error !== "string" ? AGENT_RUN_ABORTED_ERROR : wait?.error; + const terminalOutcome = + status === "pending" + ? undefined + : buildAgentRunTerminalOutcome({ + status, + error: wait?.error, + stopReason, + livenessState: wait?.livenessState, + timeoutPhase: wait?.timeoutPhase, + providerStarted: wait?.providerStarted, + startedAt: wait?.startedAt, + endedAt: wait?.endedAt, + }); const normalized = normalizeBlockedLivenessWaitStatus({ - status: abortedStopReason ? "error" : status, + status: terminalOutcome?.status ?? status, livenessState: wait?.livenessState, - error, + error: terminalOutcome?.error, }); return { status: normalized.status, diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index 732c1fd06cff..70177440b042 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -1,11 +1,9 @@ -import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "../../agents/run-termination.js"; import { - normalizeAgentRunTimeoutPhase, - normalizeProviderStarted, - type AgentRunTimeoutPhase, -} from "../../agents/run-timeout-attribution.js"; + buildAgentRunTerminalOutcome, + mergeAgentRunTerminalOutcome, + type AgentRunTerminalOutcome, +} from "../../agents/agent-run-terminal-outcome.js"; import { onAgentEvent } from "../../infra/agent-events.js"; -import { formatBlockedLivenessError, isBlockedLivenessState } from "../../shared/agent-liveness.js"; import { setSafeTimeout } from "../../utils/timer-delay.js"; const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000; @@ -40,7 +38,7 @@ type AgentRunSnapshot = { livenessState?: string; yielded?: boolean; pendingError?: boolean; - timeoutPhase?: AgentRunTimeoutPhase; + timeoutPhase?: AgentRunTerminalOutcome["timeoutPhase"]; providerStarted?: boolean; ts: number; }; @@ -63,9 +61,44 @@ function pruneAgentRunCache(now = Date.now()) { function recordAgentRunSnapshot(entry: AgentRunSnapshot) { pruneAgentRunCache(entry.ts); + const existing = agentRunCache.get(entry.runId); + if (existing && shouldPreserveTerminalSnapshot(existing, entry)) { + agentRunCache.set(entry.runId, { + ...existing, + ts: entry.ts, + }); + return; + } agentRunCache.set(entry.runId, entry); } +function shouldPreserveTerminalSnapshot( + existing: AgentRunSnapshot, + incoming: AgentRunSnapshot, +): boolean { + const existingOutcome = terminalOutcomeFromSnapshot(existing); + const incomingOutcome = terminalOutcomeFromSnapshot(incoming); + if (!existingOutcome) { + return false; + } + if (!incomingOutcome) { + return false; + } + const terminalOutcome = mergeAgentRunTerminalOutcome(existingOutcome, incomingOutcome); + return terminalOutcome === existingOutcome; +} + +function terminalOutcomeFromSnapshot( + snapshot: AgentRunSnapshot, +): AgentRunTerminalOutcome | undefined { + if (snapshot.pendingError) { + // Pending errors are still inside retry grace; a lifecycle start can cancel + // them, so they must not participate in sticky terminal precedence. + return undefined; + } + return buildAgentRunTerminalOutcome(snapshot); +} + function clearPendingAgentRunError(runId: string) { const pending = pendingAgentRunErrors.get(runId); if (!pending) { @@ -85,6 +118,12 @@ function clearPendingAgentRunTimeout(runId: string) { } function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) { + const pendingTimeout = pendingAgentRunTimeouts.get(snapshot.runId); + if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) { + // A late rejection can race in before the timeout grace publishes. Keep the + // pending hard timeout so waiters observe the original terminal cause. + return; + } clearPendingAgentRunTimeout(snapshot.runId); clearPendingAgentRunError(snapshot.runId); const dueAt = Date.now() + AGENT_RUN_ERROR_RETRY_GRACE_MS; @@ -101,6 +140,12 @@ function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) { } function schedulePendingAgentRunTimeout(snapshot: AgentRunSnapshot) { + const pendingTimeout = pendingAgentRunTimeouts.get(snapshot.runId); + if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) { + // Keep the first hard timeout through retry grace; later timeout-shaped + // cleanup events may lose provider attribution before the cache publishes. + return; + } clearPendingAgentRunError(snapshot.runId); clearPendingAgentRunTimeout(snapshot.runId); const dueAt = Date.now() + AGENT_RUN_TIMEOUT_RETRY_GRACE_MS; @@ -166,25 +211,30 @@ function createSnapshotFromLifecycleEvent(params: { const error = typeof data?.error === "string" ? data.error : undefined; const stopReason = typeof data?.stopReason === "string" ? data.stopReason : undefined; const livenessState = typeof data?.livenessState === "string" ? data.livenessState : undefined; - const blocked = isBlockedLivenessState(livenessState); - const abortedStopReason = isAbortedAgentStopReason(stopReason); - const status = - phase === "error" || blocked || abortedStopReason ? "error" : data?.aborted ? "timeout" : "ok"; - const resolvedError = abortedStopReason && !error ? AGENT_RUN_ABORTED_ERROR : error; - const timeoutPhase = - status === "timeout" ? normalizeAgentRunTimeoutPhase(data?.timeoutPhase) : undefined; - const providerStarted = normalizeProviderStarted(data?.providerStarted); - return { - runId, + const status = phase === "error" ? "error" : data?.aborted ? "timeout" : "ok"; + const terminalOutcome = buildAgentRunTerminalOutcome({ status, + error, + stopReason, + livenessState, + timeoutPhase: data?.timeoutPhase, + providerStarted: data?.providerStarted, startedAt, endedAt, - error: blocked ? formatBlockedLivenessError(resolvedError) : resolvedError, + }); + return { + runId, + status: terminalOutcome.status, + startedAt, + endedAt, + error: terminalOutcome.error, stopReason, livenessState, ...(data?.yielded === true ? { yielded: true } : {}), - ...(timeoutPhase ? { timeoutPhase } : {}), - ...(providerStarted !== undefined ? { providerStarted } : {}), + ...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}), + ...(terminalOutcome.providerStarted !== undefined + ? { providerStarted: terminalOutcome.providerStarted } + : {}), ts: Date.now(), }; } @@ -229,6 +279,10 @@ function ensureAgentRunListener() { schedulePendingAgentRunTimeout(snapshot); return; } + const pendingTimeout = pendingAgentRunTimeouts.get(evt.runId); + if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) { + return; + } clearPendingAgentRunError(evt.runId); clearPendingAgentRunTimeout(evt.runId); recordAgentRunSnapshot(snapshot); @@ -277,6 +331,7 @@ export async function waitForAgentJob(params: { let settled = false; let pendingErrorTimer: NodeJS.Timeout | undefined; let pendingTimeoutTimer: NodeJS.Timeout | undefined; + let pendingTimeoutSnapshot: AgentRunSnapshot | undefined; let onAbort: (() => void) | undefined; let removeWaiter = () => {}; @@ -294,6 +349,7 @@ export async function waitForAgentJob(params: { } clearTimeout(pendingTimeoutTimer); pendingTimeoutTimer = undefined; + pendingTimeoutSnapshot = undefined; }; const finish = (entry: AgentRunSnapshot | null) => { @@ -317,6 +373,14 @@ export async function waitForAgentJob(params: { snapshot: AgentRunSnapshot, delayMs: number, ) => { + if ( + pendingTimeoutSnapshot && + shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, snapshot) + ) { + // Mirror the shared pending map: while this waiter holds a hard timeout + // in grace, late terminal events must not replace the original cause. + return; + } clearPendingErrorTimer(); clearPendingTimeoutTimer(); const timerRef = setSafeTimeout(() => { @@ -333,6 +397,7 @@ export async function waitForAgentJob(params: { pendingErrorTimer = timerRef; } else { pendingTimeoutTimer = timerRef; + pendingTimeoutSnapshot = snapshot; } }; @@ -379,6 +444,12 @@ export async function waitForAgentJob(params: { } const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId); if (latest) { + if ( + pendingTimeoutSnapshot && + shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, latest) + ) { + return; + } finish(latest); return; } @@ -395,6 +466,12 @@ export async function waitForAgentJob(params: { scheduleTimeoutFinish(snapshot); return; } + if ( + pendingTimeoutSnapshot && + shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, snapshot) + ) { + return; + } recordAgentRunSnapshot(snapshot); finish(snapshot); }); diff --git a/src/gateway/server-methods/agent-wait-dedupe.test.ts b/src/gateway/server-methods/agent-wait-dedupe.test.ts index 6d0b1e2084f3..ced2392a44a2 100644 --- a/src/gateway/server-methods/agent-wait-dedupe.test.ts +++ b/src/gateway/server-methods/agent-wait-dedupe.test.ts @@ -448,7 +448,14 @@ describe("agent wait dedupe helper", () => { kind: "agent", runId, ts: 100, - payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 }, + payload: { + runId, + status: "timeout", + stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + endedAt: 100, + }, }); setRunEntry({ dedupe, @@ -468,6 +475,129 @@ describe("agent wait dedupe helper", () => { endedAt: 100, error: undefined, stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + }); + }); + + it("preserves an RPC cancel snapshot when a later accepted write reuses the key", () => { + const dedupe = new Map(); + const runId = "run-cancel-wins-over-accepted"; + + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 100, + payload: { + runId, + status: "timeout", + stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + endedAt: 100, + }, + }); + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 200, + payload: { runId, status: "accepted" }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "timeout", + endedAt: 100, + error: undefined, + stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + }); + }); + + it("lets an earlier terminal completion correct a provisional timeout snapshot", () => { + const dedupe = new Map(); + const runId = "run-earlier-completion-wins"; + + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 200, + payload: { + runId, + status: "timeout", + timeoutPhase: "provider", + startedAt: 100, + endedAt: 200, + }, + }); + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 250, + payload: { + runId, + status: "ok", + startedAt: 100, + endedAt: 190, + }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "ok", + startedAt: 100, + endedAt: 190, + error: undefined, + }); + }); + + it("does not make bare queue timeouts sticky", () => { + const dedupe = new Map(); + const runId = "run-queue-timeout-replaced"; + + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 100, + payload: { + runId, + status: "timeout", + timeoutPhase: "queue", + providerStarted: false, + endedAt: 100, + }, + }); + setRunEntry({ + dedupe, + kind: "agent", + runId, + ts: 200, + payload: { runId, status: "ok", endedAt: 200 }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "ok", + endedAt: 200, + error: undefined, }); }); @@ -480,7 +610,14 @@ describe("agent wait dedupe helper", () => { kind: "chat", runId, ts: 100, - payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 }, + payload: { + runId, + status: "timeout", + stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, + endedAt: 100, + }, }); setRunEntry({ dedupe, @@ -501,6 +638,8 @@ describe("agent wait dedupe helper", () => { endedAt: 100, error: undefined, stopReason: "rpc", + timeoutPhase: "queue", + providerStarted: false, }); }); diff --git a/src/gateway/server-methods/agent-wait-dedupe.ts b/src/gateway/server-methods/agent-wait-dedupe.ts index cd4e64741d48..68e5aabe2cd1 100644 --- a/src/gateway/server-methods/agent-wait-dedupe.ts +++ b/src/gateway/server-methods/agent-wait-dedupe.ts @@ -1,9 +1,9 @@ -import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "../../agents/run-termination.js"; import { - normalizeAgentRunTimeoutPhase, - normalizeProviderStarted, - type AgentRunTimeoutPhase, -} from "../../agents/run-timeout-attribution.js"; + buildAgentRunTerminalOutcome, + isStickyAgentRunTerminalOutcome, + mergeAgentRunTerminalOutcome, + type AgentRunTerminalOutcome, +} from "../../agents/agent-run-terminal-outcome.js"; import { normalizeBlockedLivenessWaitStatus } from "../../shared/agent-liveness.js"; import { isNonTerminalAgentRunStatus } from "../../shared/agent-run-status.js"; import { asFiniteNumber } from "../../shared/number-coercion.js"; @@ -20,7 +20,7 @@ export type AgentWaitTerminalSnapshot = { livenessState?: string; yielded?: boolean; pendingError?: boolean; - timeoutPhase?: AgentRunTimeoutPhase; + timeoutPhase?: AgentRunTerminalOutcome["timeoutPhase"]; providerStarted?: boolean; }; @@ -106,28 +106,27 @@ function readTerminalSnapshotFromDedupeEntry(entry: DedupeEntry): AgentWaitTermi const stopReason = asString(payload?.stopReason) ?? asString(resultMeta?.stopReason); const livenessState = asString(payload?.livenessState) ?? asString(resultMeta?.livenessState); const yielded = payload?.yielded === true || resultMeta?.yielded === true; - const timeoutPhase = - normalizeAgentRunTimeoutPhase(payload?.timeoutPhase) ?? - normalizeAgentRunTimeoutPhase(resultMeta?.timeoutPhase); - const providerStarted = - normalizeProviderStarted(payload?.providerStarted) ?? - normalizeProviderStarted(resultMeta?.providerStarted); + const timeoutPhase = payload?.timeoutPhase ?? resultMeta?.timeoutPhase; + const providerStarted = payload?.providerStarted ?? resultMeta?.providerStarted; const errorMessage = typeof payload?.error === "string" ? payload.error : typeof payload?.summary === "string" ? payload.summary : entry.error?.message; - const abortedStopReason = isAbortedAgentStopReason(stopReason); - const normalizedError = - abortedStopReason && !errorMessage ? AGENT_RUN_ABORTED_ERROR : errorMessage; if (status === "ok" || status === "timeout") { - const normalized = normalizeBlockedLivenessWaitStatus({ - status: abortedStopReason ? "error" : status, + const terminalOutcome = buildAgentRunTerminalOutcome({ + status, livenessState, - error: normalizedError, + error: errorMessage, + stopReason, + timeoutPhase, + providerStarted, + startedAt, + endedAt, }); + const normalized = normalizeBlockedLivenessWaitStatus(terminalOutcome); return { status: normalized.status, startedAt, @@ -136,31 +135,54 @@ function readTerminalSnapshotFromDedupeEntry(entry: DedupeEntry): AgentWaitTermi normalized.status === "error" ? normalized.error : normalized.status === "timeout" - ? normalizedError + ? terminalOutcome.error : undefined, stopReason, livenessState, ...(yielded ? { yielded } : {}), - ...(timeoutPhase ? { timeoutPhase } : {}), - ...(providerStarted !== undefined ? { providerStarted } : {}), + ...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}), + ...(terminalOutcome.providerStarted !== undefined + ? { providerStarted: terminalOutcome.providerStarted } + : {}), }; } if (status === "error" || !entry.ok) { + const terminalOutcome = buildAgentRunTerminalOutcome({ + status: "error", + livenessState, + error: errorMessage, + stopReason, + timeoutPhase, + providerStarted, + startedAt, + endedAt, + }); return { status: "error", startedAt, endedAt, - error: errorMessage, + error: terminalOutcome.error, stopReason, livenessState, ...(yielded ? { yielded } : {}), - ...(timeoutPhase ? { timeoutPhase } : {}), - ...(providerStarted !== undefined ? { providerStarted } : {}), + ...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}), + ...(terminalOutcome.providerStarted !== undefined + ? { providerStarted: terminalOutcome.providerStarted } + : {}), }; } return null; } +function terminalOutcomeFromWaitSnapshot( + snapshot: AgentWaitTerminalSnapshot, +): AgentRunTerminalOutcome | undefined { + if (snapshot.pendingError) { + return undefined; + } + return buildAgentRunTerminalOutcome(snapshot); +} + export function readTerminalSnapshotFromGatewayDedupe(params: { dedupe: Map; runId: string; @@ -264,9 +286,23 @@ export function setGatewayDedupeEntry(params: { const existing = params.dedupe.get(params.key); const existingSnapshot = existing ? readTerminalSnapshotFromDedupeEntry(existing) : null; const incomingSnapshot = readTerminalSnapshotFromDedupeEntry(params.entry); - if (existingSnapshot?.status === "timeout" && existingSnapshot.stopReason === "rpc") { + const existingOutcome = existingSnapshot + ? terminalOutcomeFromWaitSnapshot(existingSnapshot) + : undefined; + const incomingOutcome = incomingSnapshot + ? terminalOutcomeFromWaitSnapshot(incomingSnapshot) + : undefined; + if (existingOutcome && isStickyAgentRunTerminalOutcome(existingOutcome) && !incomingOutcome) { + // Accepted/in-flight rewrites are not evidence against a terminal hard + // timeout or explicit cancellation already stored for this run id. return; } + if (existingOutcome && incomingOutcome && isStickyAgentRunTerminalOutcome(existingOutcome)) { + const merged = mergeAgentRunTerminalOutcome(existingOutcome, incomingOutcome); + if (merged === existingOutcome) { + return; + } + } params.dedupe.set(params.key, params.entry); const runId = parseRunIdFromDedupeKey(params.key); if (!runId) { diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 7606e25ddbc0..1d6197e04199 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -146,6 +146,204 @@ describe("waitForAgentJob", () => { } }); + it("keeps a recorded hard timeout when a later lifecycle error arrives", async () => { + vi.useFakeTimers(); + try { + const runId = `run-timeout-late-error-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const firstWait = waitForAgentJob({ runId, timeoutMs: 20_000 }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 100 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 200, + aborted: true, + timeoutPhase: "provider", + }, + }); + + await vi.advanceTimersByTimeAsync(15_000); + expectRecordFields(await firstWait, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt: 100, + endedAt: 250, + error: "late rejection", + }, + }); + await vi.advanceTimersByTimeAsync(15_000); + + const secondWait = await waitForAgentJob({ runId, timeoutMs: 1_000 }); + expectRecordFields(secondWait, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps a pending hard timeout when a late lifecycle error arrives during grace", async () => { + vi.useFakeTimers(); + try { + const runId = `run-pending-timeout-late-error-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 100 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 200, + aborted: true, + timeoutPhase: "provider", + }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt: 100, + endedAt: 250, + error: "late rejection", + }, + }); + + await vi.advanceTimersByTimeAsync(15_000); + expectRecordFields(await waitPromise, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps a pending hard timeout when a late softer timeout arrives during grace", async () => { + vi.useFakeTimers(); + try { + const runId = `run-pending-hard-timeout-late-soft-timeout-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 100 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 200, + aborted: true, + timeoutPhase: "provider", + }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 250, + aborted: true, + timeoutPhase: "gateway_draining", + }, + }); + + await vi.advanceTimersByTimeAsync(15_000); + expectRecordFields(await waitPromise, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + + const cached = await waitForAgentJob({ runId, timeoutMs: 1_000 }); + expectRecordFields(cached, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps a pending hard timeout when a late lifecycle completion arrives during grace", async () => { + vi.useFakeTimers(); + try { + const runId = `run-pending-timeout-late-completion-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 }); + + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 100 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 200, + aborted: true, + timeoutPhase: "provider", + }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 100, + endedAt: 250, + }, + }); + + await vi.advanceTimersByTimeAsync(15_000); + expectRecordFields(await waitPromise, { + status: "timeout", + startedAt: 100, + endedAt: 200, + timeoutPhase: "provider", + }); + } finally { + vi.useRealTimers(); + } + }); + it("keeps non-aborted lifecycle end events as ok", async () => { const snapshot = await runLifecycleScenario({ runIdPrefix: "run-ok", diff --git a/src/infra/net/http-connect-tunnel.test.ts b/src/infra/net/http-connect-tunnel.test.ts index 62ae91559d3f..df4bd59a078f 100644 --- a/src/infra/net/http-connect-tunnel.test.ts +++ b/src/infra/net/http-connect-tunnel.test.ts @@ -353,6 +353,7 @@ describe("openHttpConnectTunnel", () => { it("caps oversized CONNECT timeouts before arming the watchdog", async () => { vi.useFakeTimers(); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); const proxySocket = new FakeSocket(); setNextNetSocket(proxySocket); const { openHttpConnectTunnel } = await import("./http-connect-tunnel.js"); @@ -366,11 +367,12 @@ describe("openHttpConnectTunnel", () => { const rejected = expect(tunnel).rejects.toThrow( `Proxy CONNECT failed via http://proxy.example:8080: Proxy CONNECT timed out after ${MAX_TIMER_TIMEOUT_MS}ms`, ); + expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), MAX_TIMER_TIMEOUT_MS); await vi.advanceTimersByTimeAsync(1); expect(proxySocket.destroyed).toBe(false); - await vi.advanceTimersByTimeAsync(MAX_TIMER_TIMEOUT_MS - 1); + await vi.advanceTimersToNextTimerAsync(); await rejected; expect(proxySocket.destroyed).toBe(true); });