fix(agents): centralize terminal run outcome precedence (#88136)

* fix(agents): centralize terminal run outcome precedence

* docs(agents): explain terminal outcome precedence

* docs(agents): note terminal outcome helper

* fix(agents): preserve pending hard timeout over late completion

* test(agents): align global session scoping expectation

* Revert "test(agents): align global session scoping expectation"

This reverts commit 9b4a0c3cb1b3885299eea7081d97f7142c415dc2.

* test(infra): stabilize CONNECT timeout cap test

* fix(agents): prioritize hard timeout terminal evidence

* fix(gateway): preserve pending hard timeout snapshots
This commit is contained in:
Peter Steinberger
2026-05-30 00:56:20 +02:00
committed by GitHub
parent ba3eae5518
commit b1e5c9d7fa
9 changed files with 815 additions and 54 deletions

View File

@@ -72,6 +72,7 @@ Skills own workflows; root owns hard policy and routing.
- Plugin SDK exception: shipped external API gets new API first plus named compat/deprecation, small tests/docs if useful, removal plan.
- Migrate internal/bundled callers to modern API in the same change. Do not let internal compat become permanent architecture.
- Channels are implementation under `src/channels/**`; plugin authors get SDK seams. Providers own auth/catalog/runtime hooks; core owns generic loop.
- Agent run terminal state: normalize/merge via `src/agents/agent-run-terminal-outcome.ts`; do not rederive timeout/cancel precedence in projections.
- Hot paths should carry prepared facts forward: provider id, model ref, channel id, target, capability family, attachment class. Do not rediscover with broad plugin/provider/channel/capability loaders.
- Do not fix repeated request-time discovery with scattered caches. Move the canonical fact earlier; reuse prepared runtime objects; delete duplicate lookup branches.
- Gateway/plugin metadata is process-stable: installs, manifests, catalogs, generated paths, bundled metadata. Changes require restart or explicit owner reload/install/doctor flow.

View File

@@ -0,0 +1,127 @@
import { describe, expect, it } from "vitest";
import {
buildAgentRunTerminalOutcome,
isHardAgentRunTimeoutPhase,
mergeAgentRunTerminalOutcome,
} from "./agent-run-terminal-outcome.js";
describe("agent run terminal outcome", () => {
it("treats provider/preflight/post-turn timeout phases as hard run timeouts", () => {
expect(isHardAgentRunTimeoutPhase("preflight")).toBe(true);
expect(isHardAgentRunTimeoutPhase("provider")).toBe(true);
expect(isHardAgentRunTimeoutPhase("post_turn")).toBe(true);
expect(isHardAgentRunTimeoutPhase("queue")).toBe(false);
expect(isHardAgentRunTimeoutPhase("gateway_draining")).toBe(false);
});
it("keeps queue and gateway draining timeouts non-sticky", () => {
expect(
buildAgentRunTerminalOutcome({
status: "timeout",
}).reason,
).toBe("timed_out");
expect(
buildAgentRunTerminalOutcome({
status: "timeout",
timeoutPhase: "queue",
}).reason,
).toBe("timed_out");
expect(
buildAgentRunTerminalOutcome({
status: "timeout",
timeoutPhase: "gateway_draining",
}).reason,
).toBe("timed_out");
});
it("keeps explicit rpc and stop cancellations sticky even with queue attribution", () => {
const rpcCancel = buildAgentRunTerminalOutcome({
status: "timeout",
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
endedAt: 100,
});
const lateCompletion = buildAgentRunTerminalOutcome({
status: "ok",
endedAt: 200,
});
expect(rpcCancel.reason).toBe("cancelled");
expect(rpcCancel.status).toBe("timeout");
expect(mergeAgentRunTerminalOutcome(rpcCancel, lateCompletion)).toBe(rpcCancel);
expect(
buildAgentRunTerminalOutcome({
status: "timeout",
stopReason: "stop",
timeoutPhase: "gateway_draining",
}).reason,
).toBe("cancelled");
});
it("does not treat successful model stop metadata as cancellation", () => {
expect(
buildAgentRunTerminalOutcome({
status: "ok",
stopReason: "stop",
}),
).toEqual({
reason: "completed",
status: "ok",
stopReason: "stop",
});
});
it("prefers hard timeout evidence over default rpc cancellation metadata", () => {
const timeout = buildAgentRunTerminalOutcome({
status: "timeout",
stopReason: "rpc",
timeoutPhase: "provider",
providerStarted: true,
endedAt: 200,
});
const earlierCompletion = buildAgentRunTerminalOutcome({
status: "ok",
endedAt: 190,
});
expect(timeout.reason).toBe("hard_timeout");
expect(timeout.status).toBe("timeout");
expect(mergeAgentRunTerminalOutcome(timeout, earlierCompletion)).toBe(earlierCompletion);
});
it("keeps a hard timeout over later aborts or failures for the same run", () => {
const timeout = buildAgentRunTerminalOutcome({
status: "timeout",
timeoutPhase: "provider",
endedAt: 200,
});
const lateAbort = buildAgentRunTerminalOutcome({
status: "error",
stopReason: "aborted",
endedAt: 250,
});
const lateFailure = buildAgentRunTerminalOutcome({
status: "error",
error: "late rejection",
endedAt: 260,
});
expect(mergeAgentRunTerminalOutcome(timeout, lateAbort)).toBe(timeout);
expect(mergeAgentRunTerminalOutcome(timeout, lateFailure)).toBe(timeout);
});
it("lets an earlier proven completion correct a provisional timeout", () => {
const timeout = buildAgentRunTerminalOutcome({
status: "timeout",
timeoutPhase: "provider",
endedAt: 200,
});
const earlierCompletion = buildAgentRunTerminalOutcome({
status: "ok",
endedAt: 190,
});
expect(mergeAgentRunTerminalOutcome(timeout, earlierCompletion)).toBe(earlierCompletion);
});
});

View File

@@ -0,0 +1,171 @@
import { formatBlockedLivenessError, isBlockedLivenessState } from "../shared/agent-liveness.js";
import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "./run-termination.js";
import {
normalizeAgentRunTimeoutPhase,
normalizeProviderStarted,
type AgentRunTimeoutPhase,
} from "./run-timeout-attribution.js";
export type AgentRunWaitStatus = "ok" | "error" | "timeout";
export type AgentRunTerminalReason =
| "completed"
| "hard_timeout"
| "timed_out"
| "cancelled"
| "aborted"
| "blocked"
| "failed";
export type AgentRunTerminalOutcome = {
reason: AgentRunTerminalReason;
status: AgentRunWaitStatus;
error?: string;
stopReason?: string;
livenessState?: string;
timeoutPhase?: AgentRunTimeoutPhase;
providerStarted?: boolean;
startedAt?: number;
endedAt?: number;
};
export type AgentRunTerminalInput = {
status: AgentRunWaitStatus;
error?: unknown;
stopReason?: unknown;
livenessState?: unknown;
timeoutPhase?: unknown;
providerStarted?: unknown;
startedAt?: unknown;
endedAt?: unknown;
};
const HARD_TIMEOUT_PHASES = new Set<AgentRunTimeoutPhase>(["preflight", "provider", "post_turn"]);
function asFiniteTimestamp(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function asNonEmptyString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value : undefined;
}
export function isHardAgentRunTimeoutPhase(value: unknown): value is AgentRunTimeoutPhase {
const phase = normalizeAgentRunTimeoutPhase(value);
return phase !== undefined && HARD_TIMEOUT_PHASES.has(phase);
}
export function isHardAgentRunTimeoutOutcome(
outcome: AgentRunTerminalOutcome | undefined | null,
): boolean {
return outcome?.reason === "hard_timeout";
}
export function isStickyAgentRunTerminalOutcome(
outcome: AgentRunTerminalOutcome | undefined | null,
): boolean {
return outcome?.reason === "hard_timeout" || outcome?.reason === "cancelled";
}
function isCancellationStopReason(value: string | undefined): boolean {
return value === "rpc" || value === "stop";
}
export function buildAgentRunTerminalOutcome(
input: AgentRunTerminalInput,
): AgentRunTerminalOutcome {
const stopReason = asNonEmptyString(input.stopReason);
const livenessState = asNonEmptyString(input.livenessState);
const timeoutPhase = normalizeAgentRunTimeoutPhase(input.timeoutPhase);
const providerStarted = normalizeProviderStarted(input.providerStarted);
const rawError = asNonEmptyString(input.error);
// Queue and gateway-draining timeouts are wait-layer uncertainty. Only
// provider-started or provider-phase timeouts are sticky child-run facts.
const hardTimeout =
input.status === "timeout" &&
(isHardAgentRunTimeoutPhase(timeoutPhase) || providerStarted === true);
const aborted = isAbortedAgentStopReason(stopReason);
// ACP/model `stop` can be a normal successful finish. Treat rpc/stop as
// cancellation only for non-success terminal payloads from abort paths.
const cancelled = input.status !== "ok" && isCancellationStopReason(stopReason);
const blocked = isBlockedLivenessState(livenessState);
const error = blocked
? formatBlockedLivenessError(rawError)
: aborted && !rawError
? AGENT_RUN_ABORTED_ERROR
: rawError;
const reason: AgentRunTerminalReason = blocked
? "blocked"
: hardTimeout
? "hard_timeout"
: aborted
? "aborted"
: cancelled
? "cancelled"
: input.status === "timeout"
? "timed_out"
: input.status === "error"
? "failed"
: "completed";
return {
reason,
status:
reason === "completed"
? "ok"
: input.status === "timeout" &&
(reason === "hard_timeout" || reason === "timed_out" || reason === "cancelled")
? "timeout"
: "error",
...(error ? { error } : {}),
...(stopReason ? { stopReason } : {}),
...(livenessState ? { livenessState } : {}),
...(timeoutPhase ? { timeoutPhase } : {}),
...(providerStarted !== undefined ? { providerStarted } : {}),
...(asFiniteTimestamp(input.startedAt) !== undefined
? { startedAt: asFiniteTimestamp(input.startedAt) }
: {}),
...(asFiniteTimestamp(input.endedAt) !== undefined
? { endedAt: asFiniteTimestamp(input.endedAt) }
: {}),
};
}
function completedBeforeOrAtTimeout(params: {
completed: AgentRunTerminalOutcome;
timeout: AgentRunTerminalOutcome;
}): boolean {
return (
params.completed.reason === "completed" &&
typeof params.completed.endedAt === "number" &&
typeof params.timeout.endedAt === "number" &&
params.completed.endedAt <= params.timeout.endedAt
);
}
export function mergeAgentRunTerminalOutcome(
current: AgentRunTerminalOutcome | undefined,
incoming: AgentRunTerminalOutcome,
): AgentRunTerminalOutcome {
if (!current) {
return incoming;
}
if (current.reason === "cancelled") {
return current;
}
// A hard timeout owns the run unless later evidence proves completion ended
// before that timeout; late abort/error cleanup must not downgrade it.
if (isHardAgentRunTimeoutOutcome(current)) {
return completedBeforeOrAtTimeout({ completed: incoming, timeout: current })
? incoming
: current;
}
if (incoming.reason === "cancelled") {
return incoming;
}
if (isHardAgentRunTimeoutOutcome(incoming)) {
return completedBeforeOrAtTimeout({ completed: current, timeout: incoming })
? current
: incoming;
}
return incoming;
}

View File

@@ -2,7 +2,7 @@ import { callGateway } from "../gateway/call.js";
import { formatErrorMessage } from "../infra/errors.js";
import { normalizeBlockedLivenessWaitStatus } from "../shared/agent-liveness.js";
import { parseFiniteNumber } from "../shared/number-coercion.js";
import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "./run-termination.js";
import { buildAgentRunTerminalOutcome } from "./agent-run-terminal-outcome.js";
import {
normalizeAgentRunTimeoutPhase,
normalizeProviderStarted,
@@ -66,13 +66,23 @@ function normalizeAgentWaitResult(
wait?: RawAgentWaitResponse,
): AgentWaitResult {
const stopReason = typeof wait?.stopReason === "string" ? wait.stopReason : undefined;
const abortedStopReason = isAbortedAgentStopReason(stopReason);
const error =
abortedStopReason && typeof wait?.error !== "string" ? AGENT_RUN_ABORTED_ERROR : wait?.error;
const terminalOutcome =
status === "pending"
? undefined
: buildAgentRunTerminalOutcome({
status,
error: wait?.error,
stopReason,
livenessState: wait?.livenessState,
timeoutPhase: wait?.timeoutPhase,
providerStarted: wait?.providerStarted,
startedAt: wait?.startedAt,
endedAt: wait?.endedAt,
});
const normalized = normalizeBlockedLivenessWaitStatus({
status: abortedStopReason ? "error" : status,
status: terminalOutcome?.status ?? status,
livenessState: wait?.livenessState,
error,
error: terminalOutcome?.error,
});
return {
status: normalized.status,

View File

@@ -1,11 +1,9 @@
import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "../../agents/run-termination.js";
import {
normalizeAgentRunTimeoutPhase,
normalizeProviderStarted,
type AgentRunTimeoutPhase,
} from "../../agents/run-timeout-attribution.js";
buildAgentRunTerminalOutcome,
mergeAgentRunTerminalOutcome,
type AgentRunTerminalOutcome,
} from "../../agents/agent-run-terminal-outcome.js";
import { onAgentEvent } from "../../infra/agent-events.js";
import { formatBlockedLivenessError, isBlockedLivenessState } from "../../shared/agent-liveness.js";
import { setSafeTimeout } from "../../utils/timer-delay.js";
const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000;
@@ -40,7 +38,7 @@ type AgentRunSnapshot = {
livenessState?: string;
yielded?: boolean;
pendingError?: boolean;
timeoutPhase?: AgentRunTimeoutPhase;
timeoutPhase?: AgentRunTerminalOutcome["timeoutPhase"];
providerStarted?: boolean;
ts: number;
};
@@ -63,9 +61,44 @@ function pruneAgentRunCache(now = Date.now()) {
function recordAgentRunSnapshot(entry: AgentRunSnapshot) {
pruneAgentRunCache(entry.ts);
const existing = agentRunCache.get(entry.runId);
if (existing && shouldPreserveTerminalSnapshot(existing, entry)) {
agentRunCache.set(entry.runId, {
...existing,
ts: entry.ts,
});
return;
}
agentRunCache.set(entry.runId, entry);
}
function shouldPreserveTerminalSnapshot(
existing: AgentRunSnapshot,
incoming: AgentRunSnapshot,
): boolean {
const existingOutcome = terminalOutcomeFromSnapshot(existing);
const incomingOutcome = terminalOutcomeFromSnapshot(incoming);
if (!existingOutcome) {
return false;
}
if (!incomingOutcome) {
return false;
}
const terminalOutcome = mergeAgentRunTerminalOutcome(existingOutcome, incomingOutcome);
return terminalOutcome === existingOutcome;
}
function terminalOutcomeFromSnapshot(
snapshot: AgentRunSnapshot,
): AgentRunTerminalOutcome | undefined {
if (snapshot.pendingError) {
// Pending errors are still inside retry grace; a lifecycle start can cancel
// them, so they must not participate in sticky terminal precedence.
return undefined;
}
return buildAgentRunTerminalOutcome(snapshot);
}
function clearPendingAgentRunError(runId: string) {
const pending = pendingAgentRunErrors.get(runId);
if (!pending) {
@@ -85,6 +118,12 @@ function clearPendingAgentRunTimeout(runId: string) {
}
function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) {
const pendingTimeout = pendingAgentRunTimeouts.get(snapshot.runId);
if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) {
// A late rejection can race in before the timeout grace publishes. Keep the
// pending hard timeout so waiters observe the original terminal cause.
return;
}
clearPendingAgentRunTimeout(snapshot.runId);
clearPendingAgentRunError(snapshot.runId);
const dueAt = Date.now() + AGENT_RUN_ERROR_RETRY_GRACE_MS;
@@ -101,6 +140,12 @@ function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) {
}
function schedulePendingAgentRunTimeout(snapshot: AgentRunSnapshot) {
const pendingTimeout = pendingAgentRunTimeouts.get(snapshot.runId);
if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) {
// Keep the first hard timeout through retry grace; later timeout-shaped
// cleanup events may lose provider attribution before the cache publishes.
return;
}
clearPendingAgentRunError(snapshot.runId);
clearPendingAgentRunTimeout(snapshot.runId);
const dueAt = Date.now() + AGENT_RUN_TIMEOUT_RETRY_GRACE_MS;
@@ -166,25 +211,30 @@ function createSnapshotFromLifecycleEvent(params: {
const error = typeof data?.error === "string" ? data.error : undefined;
const stopReason = typeof data?.stopReason === "string" ? data.stopReason : undefined;
const livenessState = typeof data?.livenessState === "string" ? data.livenessState : undefined;
const blocked = isBlockedLivenessState(livenessState);
const abortedStopReason = isAbortedAgentStopReason(stopReason);
const status =
phase === "error" || blocked || abortedStopReason ? "error" : data?.aborted ? "timeout" : "ok";
const resolvedError = abortedStopReason && !error ? AGENT_RUN_ABORTED_ERROR : error;
const timeoutPhase =
status === "timeout" ? normalizeAgentRunTimeoutPhase(data?.timeoutPhase) : undefined;
const providerStarted = normalizeProviderStarted(data?.providerStarted);
return {
runId,
const status = phase === "error" ? "error" : data?.aborted ? "timeout" : "ok";
const terminalOutcome = buildAgentRunTerminalOutcome({
status,
error,
stopReason,
livenessState,
timeoutPhase: data?.timeoutPhase,
providerStarted: data?.providerStarted,
startedAt,
endedAt,
error: blocked ? formatBlockedLivenessError(resolvedError) : resolvedError,
});
return {
runId,
status: terminalOutcome.status,
startedAt,
endedAt,
error: terminalOutcome.error,
stopReason,
livenessState,
...(data?.yielded === true ? { yielded: true } : {}),
...(timeoutPhase ? { timeoutPhase } : {}),
...(providerStarted !== undefined ? { providerStarted } : {}),
...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}),
...(terminalOutcome.providerStarted !== undefined
? { providerStarted: terminalOutcome.providerStarted }
: {}),
ts: Date.now(),
};
}
@@ -229,6 +279,10 @@ function ensureAgentRunListener() {
schedulePendingAgentRunTimeout(snapshot);
return;
}
const pendingTimeout = pendingAgentRunTimeouts.get(evt.runId);
if (pendingTimeout && shouldPreserveTerminalSnapshot(pendingTimeout.snapshot, snapshot)) {
return;
}
clearPendingAgentRunError(evt.runId);
clearPendingAgentRunTimeout(evt.runId);
recordAgentRunSnapshot(snapshot);
@@ -277,6 +331,7 @@ export async function waitForAgentJob(params: {
let settled = false;
let pendingErrorTimer: NodeJS.Timeout | undefined;
let pendingTimeoutTimer: NodeJS.Timeout | undefined;
let pendingTimeoutSnapshot: AgentRunSnapshot | undefined;
let onAbort: (() => void) | undefined;
let removeWaiter = () => {};
@@ -294,6 +349,7 @@ export async function waitForAgentJob(params: {
}
clearTimeout(pendingTimeoutTimer);
pendingTimeoutTimer = undefined;
pendingTimeoutSnapshot = undefined;
};
const finish = (entry: AgentRunSnapshot | null) => {
@@ -317,6 +373,14 @@ export async function waitForAgentJob(params: {
snapshot: AgentRunSnapshot,
delayMs: number,
) => {
if (
pendingTimeoutSnapshot &&
shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, snapshot)
) {
// Mirror the shared pending map: while this waiter holds a hard timeout
// in grace, late terminal events must not replace the original cause.
return;
}
clearPendingErrorTimer();
clearPendingTimeoutTimer();
const timerRef = setSafeTimeout(() => {
@@ -333,6 +397,7 @@ export async function waitForAgentJob(params: {
pendingErrorTimer = timerRef;
} else {
pendingTimeoutTimer = timerRef;
pendingTimeoutSnapshot = snapshot;
}
};
@@ -379,6 +444,12 @@ export async function waitForAgentJob(params: {
}
const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId);
if (latest) {
if (
pendingTimeoutSnapshot &&
shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, latest)
) {
return;
}
finish(latest);
return;
}
@@ -395,6 +466,12 @@ export async function waitForAgentJob(params: {
scheduleTimeoutFinish(snapshot);
return;
}
if (
pendingTimeoutSnapshot &&
shouldPreserveTerminalSnapshot(pendingTimeoutSnapshot, snapshot)
) {
return;
}
recordAgentRunSnapshot(snapshot);
finish(snapshot);
});

View File

@@ -448,7 +448,14 @@ describe("agent wait dedupe helper", () => {
kind: "agent",
runId,
ts: 100,
payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 },
payload: {
runId,
status: "timeout",
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
endedAt: 100,
},
});
setRunEntry({
dedupe,
@@ -468,6 +475,129 @@ describe("agent wait dedupe helper", () => {
endedAt: 100,
error: undefined,
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
});
});
it("preserves an RPC cancel snapshot when a later accepted write reuses the key", () => {
const dedupe = new Map();
const runId = "run-cancel-wins-over-accepted";
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 100,
payload: {
runId,
status: "timeout",
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
endedAt: 100,
},
});
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 200,
payload: { runId, status: "accepted" },
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "timeout",
endedAt: 100,
error: undefined,
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
});
});
it("lets an earlier terminal completion correct a provisional timeout snapshot", () => {
const dedupe = new Map();
const runId = "run-earlier-completion-wins";
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 200,
payload: {
runId,
status: "timeout",
timeoutPhase: "provider",
startedAt: 100,
endedAt: 200,
},
});
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 250,
payload: {
runId,
status: "ok",
startedAt: 100,
endedAt: 190,
},
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "ok",
startedAt: 100,
endedAt: 190,
error: undefined,
});
});
it("does not make bare queue timeouts sticky", () => {
const dedupe = new Map();
const runId = "run-queue-timeout-replaced";
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 100,
payload: {
runId,
status: "timeout",
timeoutPhase: "queue",
providerStarted: false,
endedAt: 100,
},
});
setRunEntry({
dedupe,
kind: "agent",
runId,
ts: 200,
payload: { runId, status: "ok", endedAt: 200 },
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "ok",
endedAt: 200,
error: undefined,
});
});
@@ -480,7 +610,14 @@ describe("agent wait dedupe helper", () => {
kind: "chat",
runId,
ts: 100,
payload: { runId, status: "timeout", stopReason: "rpc", endedAt: 100 },
payload: {
runId,
status: "timeout",
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
endedAt: 100,
},
});
setRunEntry({
dedupe,
@@ -501,6 +638,8 @@ describe("agent wait dedupe helper", () => {
endedAt: 100,
error: undefined,
stopReason: "rpc",
timeoutPhase: "queue",
providerStarted: false,
});
});

View File

@@ -1,9 +1,9 @@
import { AGENT_RUN_ABORTED_ERROR, isAbortedAgentStopReason } from "../../agents/run-termination.js";
import {
normalizeAgentRunTimeoutPhase,
normalizeProviderStarted,
type AgentRunTimeoutPhase,
} from "../../agents/run-timeout-attribution.js";
buildAgentRunTerminalOutcome,
isStickyAgentRunTerminalOutcome,
mergeAgentRunTerminalOutcome,
type AgentRunTerminalOutcome,
} from "../../agents/agent-run-terminal-outcome.js";
import { normalizeBlockedLivenessWaitStatus } from "../../shared/agent-liveness.js";
import { isNonTerminalAgentRunStatus } from "../../shared/agent-run-status.js";
import { asFiniteNumber } from "../../shared/number-coercion.js";
@@ -20,7 +20,7 @@ export type AgentWaitTerminalSnapshot = {
livenessState?: string;
yielded?: boolean;
pendingError?: boolean;
timeoutPhase?: AgentRunTimeoutPhase;
timeoutPhase?: AgentRunTerminalOutcome["timeoutPhase"];
providerStarted?: boolean;
};
@@ -106,28 +106,27 @@ function readTerminalSnapshotFromDedupeEntry(entry: DedupeEntry): AgentWaitTermi
const stopReason = asString(payload?.stopReason) ?? asString(resultMeta?.stopReason);
const livenessState = asString(payload?.livenessState) ?? asString(resultMeta?.livenessState);
const yielded = payload?.yielded === true || resultMeta?.yielded === true;
const timeoutPhase =
normalizeAgentRunTimeoutPhase(payload?.timeoutPhase) ??
normalizeAgentRunTimeoutPhase(resultMeta?.timeoutPhase);
const providerStarted =
normalizeProviderStarted(payload?.providerStarted) ??
normalizeProviderStarted(resultMeta?.providerStarted);
const timeoutPhase = payload?.timeoutPhase ?? resultMeta?.timeoutPhase;
const providerStarted = payload?.providerStarted ?? resultMeta?.providerStarted;
const errorMessage =
typeof payload?.error === "string"
? payload.error
: typeof payload?.summary === "string"
? payload.summary
: entry.error?.message;
const abortedStopReason = isAbortedAgentStopReason(stopReason);
const normalizedError =
abortedStopReason && !errorMessage ? AGENT_RUN_ABORTED_ERROR : errorMessage;
if (status === "ok" || status === "timeout") {
const normalized = normalizeBlockedLivenessWaitStatus({
status: abortedStopReason ? "error" : status,
const terminalOutcome = buildAgentRunTerminalOutcome({
status,
livenessState,
error: normalizedError,
error: errorMessage,
stopReason,
timeoutPhase,
providerStarted,
startedAt,
endedAt,
});
const normalized = normalizeBlockedLivenessWaitStatus(terminalOutcome);
return {
status: normalized.status,
startedAt,
@@ -136,31 +135,54 @@ function readTerminalSnapshotFromDedupeEntry(entry: DedupeEntry): AgentWaitTermi
normalized.status === "error"
? normalized.error
: normalized.status === "timeout"
? normalizedError
? terminalOutcome.error
: undefined,
stopReason,
livenessState,
...(yielded ? { yielded } : {}),
...(timeoutPhase ? { timeoutPhase } : {}),
...(providerStarted !== undefined ? { providerStarted } : {}),
...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}),
...(terminalOutcome.providerStarted !== undefined
? { providerStarted: terminalOutcome.providerStarted }
: {}),
};
}
if (status === "error" || !entry.ok) {
const terminalOutcome = buildAgentRunTerminalOutcome({
status: "error",
livenessState,
error: errorMessage,
stopReason,
timeoutPhase,
providerStarted,
startedAt,
endedAt,
});
return {
status: "error",
startedAt,
endedAt,
error: errorMessage,
error: terminalOutcome.error,
stopReason,
livenessState,
...(yielded ? { yielded } : {}),
...(timeoutPhase ? { timeoutPhase } : {}),
...(providerStarted !== undefined ? { providerStarted } : {}),
...(terminalOutcome.timeoutPhase ? { timeoutPhase: terminalOutcome.timeoutPhase } : {}),
...(terminalOutcome.providerStarted !== undefined
? { providerStarted: terminalOutcome.providerStarted }
: {}),
};
}
return null;
}
function terminalOutcomeFromWaitSnapshot(
snapshot: AgentWaitTerminalSnapshot,
): AgentRunTerminalOutcome | undefined {
if (snapshot.pendingError) {
return undefined;
}
return buildAgentRunTerminalOutcome(snapshot);
}
export function readTerminalSnapshotFromGatewayDedupe(params: {
dedupe: Map<string, DedupeEntry>;
runId: string;
@@ -264,9 +286,23 @@ export function setGatewayDedupeEntry(params: {
const existing = params.dedupe.get(params.key);
const existingSnapshot = existing ? readTerminalSnapshotFromDedupeEntry(existing) : null;
const incomingSnapshot = readTerminalSnapshotFromDedupeEntry(params.entry);
if (existingSnapshot?.status === "timeout" && existingSnapshot.stopReason === "rpc") {
const existingOutcome = existingSnapshot
? terminalOutcomeFromWaitSnapshot(existingSnapshot)
: undefined;
const incomingOutcome = incomingSnapshot
? terminalOutcomeFromWaitSnapshot(incomingSnapshot)
: undefined;
if (existingOutcome && isStickyAgentRunTerminalOutcome(existingOutcome) && !incomingOutcome) {
// Accepted/in-flight rewrites are not evidence against a terminal hard
// timeout or explicit cancellation already stored for this run id.
return;
}
if (existingOutcome && incomingOutcome && isStickyAgentRunTerminalOutcome(existingOutcome)) {
const merged = mergeAgentRunTerminalOutcome(existingOutcome, incomingOutcome);
if (merged === existingOutcome) {
return;
}
}
params.dedupe.set(params.key, params.entry);
const runId = parseRunIdFromDedupeKey(params.key);
if (!runId) {

View File

@@ -146,6 +146,204 @@ describe("waitForAgentJob", () => {
}
});
it("keeps a recorded hard timeout when a later lifecycle error arrives", async () => {
vi.useFakeTimers();
try {
const runId = `run-timeout-late-error-${Date.now()}-${Math.random().toString(36).slice(2)}`;
const firstWait = waitForAgentJob({ runId, timeoutMs: 20_000 });
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "start", startedAt: 100 },
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 200,
aborted: true,
timeoutPhase: "provider",
},
});
await vi.advanceTimersByTimeAsync(15_000);
expectRecordFields(await firstWait, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt: 100,
endedAt: 250,
error: "late rejection",
},
});
await vi.advanceTimersByTimeAsync(15_000);
const secondWait = await waitForAgentJob({ runId, timeoutMs: 1_000 });
expectRecordFields(secondWait, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
} finally {
vi.useRealTimers();
}
});
it("keeps a pending hard timeout when a late lifecycle error arrives during grace", async () => {
vi.useFakeTimers();
try {
const runId = `run-pending-timeout-late-error-${Date.now()}-${Math.random().toString(36).slice(2)}`;
const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 });
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "start", startedAt: 100 },
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 200,
aborted: true,
timeoutPhase: "provider",
},
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt: 100,
endedAt: 250,
error: "late rejection",
},
});
await vi.advanceTimersByTimeAsync(15_000);
expectRecordFields(await waitPromise, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
} finally {
vi.useRealTimers();
}
});
it("keeps a pending hard timeout when a late softer timeout arrives during grace", async () => {
vi.useFakeTimers();
try {
const runId = `run-pending-hard-timeout-late-soft-timeout-${Date.now()}-${Math.random().toString(36).slice(2)}`;
const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 });
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "start", startedAt: 100 },
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 200,
aborted: true,
timeoutPhase: "provider",
},
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 250,
aborted: true,
timeoutPhase: "gateway_draining",
},
});
await vi.advanceTimersByTimeAsync(15_000);
expectRecordFields(await waitPromise, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
const cached = await waitForAgentJob({ runId, timeoutMs: 1_000 });
expectRecordFields(cached, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
} finally {
vi.useRealTimers();
}
});
it("keeps a pending hard timeout when a late lifecycle completion arrives during grace", async () => {
vi.useFakeTimers();
try {
const runId = `run-pending-timeout-late-completion-${Date.now()}-${Math.random().toString(36).slice(2)}`;
const waitPromise = waitForAgentJob({ runId, timeoutMs: 20_000 });
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "start", startedAt: 100 },
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 200,
aborted: true,
timeoutPhase: "provider",
},
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 100,
endedAt: 250,
},
});
await vi.advanceTimersByTimeAsync(15_000);
expectRecordFields(await waitPromise, {
status: "timeout",
startedAt: 100,
endedAt: 200,
timeoutPhase: "provider",
});
} finally {
vi.useRealTimers();
}
});
it("keeps non-aborted lifecycle end events as ok", async () => {
const snapshot = await runLifecycleScenario({
runIdPrefix: "run-ok",

View File

@@ -353,6 +353,7 @@ describe("openHttpConnectTunnel", () => {
it("caps oversized CONNECT timeouts before arming the watchdog", async () => {
vi.useFakeTimers();
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
const proxySocket = new FakeSocket();
setNextNetSocket(proxySocket);
const { openHttpConnectTunnel } = await import("./http-connect-tunnel.js");
@@ -366,11 +367,12 @@ describe("openHttpConnectTunnel", () => {
const rejected = expect(tunnel).rejects.toThrow(
`Proxy CONNECT failed via http://proxy.example:8080: Proxy CONNECT timed out after ${MAX_TIMER_TIMEOUT_MS}ms`,
);
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), MAX_TIMER_TIMEOUT_MS);
await vi.advanceTimersByTimeAsync(1);
expect(proxySocket.destroyed).toBe(false);
await vi.advanceTimersByTimeAsync(MAX_TIMER_TIMEOUT_MS - 1);
await vi.advanceTimersToNextTimerAsync();
await rejected;
expect(proxySocket.destroyed).toBe(true);
});