mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(agent-job): preserve grace for pending error diagnostics
Preserve pending agent-job error diagnostics as non-terminal timeout snapshots so the retry grace path can still recover when the lifecycle later starts and completes. Local proof: - node scripts/run-vitest.mjs packages/sdk/src/index.test.ts src/gateway/server-methods/server-methods.test.ts src/gateway/server.chat.gateway-server-chat.test.ts src/agents/run-wait.test.ts src/agents/openclaw-tools.sessions.test.ts - node scripts/run-oxlint.mjs packages/sdk/src/client.ts packages/sdk/src/index.test.ts src/gateway/server-methods/agent-job.ts src/gateway/server-methods/agent.ts src/gateway/server-methods/agent-wait-dedupe.ts src/agents/run-wait.ts src/agents/tools/sessions-send-tool.ts src/gateway/server-methods/server-methods.test.ts src/gateway/server.chat.gateway-server-chat.test.ts src/agents/run-wait.test.ts src/agents/openclaw-tools.sessions.test.ts - autoreview --mode local: no accepted/actionable findings - CI run 26536599850: success Co-authored-by: Martin Garramon <martin@yulicreative.ai>
This commit is contained in:
@@ -58,9 +58,10 @@ function runStatusFromWaitPayload(payload: unknown): RunResult["status"] {
|
||||
: {};
|
||||
const status = typeof record.status === "string" ? record.status.toLowerCase() : undefined;
|
||||
const stopReason = typeof record.stopReason === "string" ? record.stopReason.toLowerCase() : "";
|
||||
const pendingError = record.pendingError === true;
|
||||
const hasTerminalTimeoutMetadata =
|
||||
readOptionalTimestamp(record.endedAt) !== undefined ||
|
||||
readOptionalString(record.error) !== undefined ||
|
||||
(!pendingError && readOptionalString(record.error) !== undefined) ||
|
||||
stopReason.length > 0 ||
|
||||
typeof record.livenessState === "string" ||
|
||||
record.yielded === true;
|
||||
|
||||
@@ -174,6 +174,24 @@ describe("OpenClaw SDK", () => {
|
||||
expect(result.error).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps pending-error wait deadlines non-terminal", async () => {
|
||||
const transport = new FakeTransport({
|
||||
"agent.wait": {
|
||||
status: "timeout",
|
||||
runId: "run_pending_error",
|
||||
error: "429 RESOURCE_EXHAUSTED",
|
||||
pendingError: true,
|
||||
},
|
||||
});
|
||||
const oc = new OpenClaw({ transport });
|
||||
|
||||
const result = await oc.runs.wait("run_pending_error");
|
||||
|
||||
expect(result.runId).toBe("run_pending_error");
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.error?.message).toBe("429 RESOURCE_EXHAUSTED");
|
||||
});
|
||||
|
||||
it("maps terminal runtime timeout snapshots to timed_out", async () => {
|
||||
const transport = new FakeTransport({
|
||||
"agent.wait": {
|
||||
|
||||
@@ -1018,6 +1018,57 @@ describe("sessions tools", () => {
|
||||
expect(sendCallCount).toBe(0);
|
||||
});
|
||||
|
||||
it("sessions_send returns pending agent error diagnostics on timeout", async () => {
|
||||
const calls: Array<{ method?: string; params?: unknown }> = [];
|
||||
callGatewayMock.mockImplementation(async (opts: unknown) => {
|
||||
const request = opts as { method?: string; params?: unknown };
|
||||
calls.push(request);
|
||||
if (request.method === "chat.history") {
|
||||
return { messages: [] };
|
||||
}
|
||||
if (request.method === "agent") {
|
||||
return {
|
||||
runId: "run-pending-model-error",
|
||||
status: "accepted",
|
||||
acceptedAt: 1234,
|
||||
};
|
||||
}
|
||||
if (request.method === "agent.wait") {
|
||||
return {
|
||||
runId: "run-pending-model-error",
|
||||
status: "timeout",
|
||||
error: "429 RESOURCE_EXHAUSTED",
|
||||
pendingError: true,
|
||||
};
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
const tool = createOpenClawTools({
|
||||
agentSessionKey: "discord:group:req",
|
||||
agentChannel: "discord",
|
||||
}).find((candidate) => candidate.name === "sessions_send");
|
||||
if (!tool) {
|
||||
throw new Error("missing sessions_send tool");
|
||||
}
|
||||
|
||||
const result = await tool.execute("call-pending-error", {
|
||||
sessionKey: "main",
|
||||
message: "check status",
|
||||
timeoutSeconds: 1,
|
||||
});
|
||||
|
||||
const details = sessionsSendDetails(result.details);
|
||||
expect(details.status).toBe("timeout");
|
||||
expect(details.error).toBe("429 RESOURCE_EXHAUSTED");
|
||||
expect(details.runId).toBe("run-pending-model-error");
|
||||
expect(details.delivery?.status).toBe("pending");
|
||||
expect(calls.filter((call) => call.method === "agent")).toHaveLength(1);
|
||||
await vi.waitFor(() =>
|
||||
expect(calls.filter((call) => call.method === "agent.wait").length).toBeGreaterThanOrEqual(2),
|
||||
);
|
||||
});
|
||||
|
||||
it("sessions_send resolves sessionId inputs", async () => {
|
||||
const sessionId = "sess-send";
|
||||
const targetKey = "agent:main:discord:channel:123";
|
||||
|
||||
@@ -220,6 +220,22 @@ describe("waitForAgentRun", () => {
|
||||
expect(result).toEqual({ status: "pending" });
|
||||
});
|
||||
|
||||
it("preserves pending error diagnostics on wait timeouts", async () => {
|
||||
callGatewayMock.mockResolvedValue({
|
||||
status: "timeout",
|
||||
error: "429 RESOURCE_EXHAUSTED",
|
||||
pendingError: true,
|
||||
});
|
||||
|
||||
const result = await waitForAgentRun({ runId: "run-pending-error", timeoutMs: 500 });
|
||||
|
||||
expect(result).toEqual({
|
||||
status: "timeout",
|
||||
error: "429 RESOURCE_EXHAUSTED",
|
||||
pendingError: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("normalizes wait timeouts before sending agent.wait", async () => {
|
||||
callGatewayMock.mockResolvedValue({ status: "ok" });
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ export type AgentWaitResult = {
|
||||
stopReason?: string;
|
||||
livenessState?: string;
|
||||
yielded?: boolean;
|
||||
pendingError?: boolean;
|
||||
timeoutPhase?: AgentRunTimeoutPhase;
|
||||
providerStarted?: boolean;
|
||||
};
|
||||
@@ -50,6 +51,7 @@ type RawAgentWaitResponse = {
|
||||
stopReason?: unknown;
|
||||
livenessState?: unknown;
|
||||
yielded?: unknown;
|
||||
pendingError?: unknown;
|
||||
timeoutPhase?: unknown;
|
||||
providerStarted?: unknown;
|
||||
};
|
||||
@@ -75,6 +77,7 @@ function normalizeAgentWaitResult(
|
||||
stopReason,
|
||||
livenessState: typeof wait?.livenessState === "string" ? wait.livenessState : undefined,
|
||||
yielded: wait?.yielded === true ? true : undefined,
|
||||
pendingError: wait?.pendingError === true ? true : undefined,
|
||||
timeoutPhase: normalizeAgentRunTimeoutPhase(wait?.timeoutPhase),
|
||||
providerStarted: normalizeProviderStarted(wait?.providerStarted),
|
||||
};
|
||||
|
||||
@@ -161,6 +161,12 @@ function isTerminalAgentWaitTimeout(result: AgentWaitResult): boolean {
|
||||
return result.endedAt !== undefined || Boolean(result.stopReason || result.livenessState);
|
||||
}
|
||||
|
||||
function isPendingErrorAgentWaitTimeout(result: AgentWaitResult): boolean {
|
||||
return (
|
||||
result.pendingError === true && typeof result.error === "string" && result.error.trim() !== ""
|
||||
);
|
||||
}
|
||||
|
||||
async function startAgentRun(params: {
|
||||
callGateway: GatewayCaller;
|
||||
runId: string;
|
||||
@@ -620,6 +626,16 @@ export function createSessionsSendTool(opts?: {
|
||||
});
|
||||
|
||||
if (result.status === "timeout") {
|
||||
if (isPendingErrorAgentWaitTimeout(result)) {
|
||||
startA2AFlow(undefined, runId);
|
||||
return jsonResult({
|
||||
runId,
|
||||
status: "timeout",
|
||||
error: result.error,
|
||||
sessionKey: displayKey,
|
||||
delivery,
|
||||
});
|
||||
}
|
||||
if (!isTerminalAgentWaitTimeout(result)) {
|
||||
startA2AFlow(undefined, runId);
|
||||
return jsonResult({
|
||||
|
||||
@@ -39,6 +39,7 @@ type AgentRunSnapshot = {
|
||||
stopReason?: string;
|
||||
livenessState?: string;
|
||||
yielded?: boolean;
|
||||
pendingError?: boolean;
|
||||
timeoutPhase?: AgentRunTimeoutPhase;
|
||||
providerStarted?: boolean;
|
||||
ts: number;
|
||||
@@ -137,6 +138,22 @@ function getPendingAgentRunTimeout(runId: string) {
|
||||
};
|
||||
}
|
||||
|
||||
function createPendingErrorTimeoutSnapshot(snapshot: AgentRunSnapshot): AgentRunSnapshot {
|
||||
// Keep this non-terminal: the retry grace can still be canceled by a later
|
||||
// lifecycle start, so omit terminal fields such as endedAt and stopReason.
|
||||
return {
|
||||
runId: snapshot.runId,
|
||||
status: "timeout",
|
||||
startedAt: snapshot.startedAt,
|
||||
error: snapshot.error,
|
||||
pendingError: true,
|
||||
...(snapshot.providerStarted !== undefined
|
||||
? { providerStarted: snapshot.providerStarted }
|
||||
: {}),
|
||||
ts: Date.now(),
|
||||
};
|
||||
}
|
||||
|
||||
function createSnapshotFromLifecycleEvent(params: {
|
||||
runId: string;
|
||||
phase: "end" | "error";
|
||||
@@ -383,7 +400,10 @@ export async function waitForAgentJob(params: {
|
||||
});
|
||||
removeWaiter = addAgentRunWaiter(runId);
|
||||
|
||||
const timer = setSafeTimeout(() => finish(null), timeoutMs);
|
||||
const timer = setSafeTimeout(() => {
|
||||
const pendingError = getPendingAgentRunError(runId);
|
||||
finish(pendingError ? createPendingErrorTimeoutSnapshot(pendingError.snapshot) : null);
|
||||
}, timeoutMs);
|
||||
onAbort = () => finish(null);
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
|
||||
@@ -19,6 +19,7 @@ export type AgentWaitTerminalSnapshot = {
|
||||
stopReason?: string;
|
||||
livenessState?: string;
|
||||
yielded?: boolean;
|
||||
pendingError?: boolean;
|
||||
timeoutPhase?: AgentRunTimeoutPhase;
|
||||
providerStarted?: boolean;
|
||||
};
|
||||
|
||||
@@ -2240,6 +2240,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
stopReason: cachedGatewaySnapshot.stopReason,
|
||||
livenessState: cachedGatewaySnapshot.livenessState,
|
||||
yielded: cachedGatewaySnapshot.yielded,
|
||||
pendingError: cachedGatewaySnapshot.pendingError,
|
||||
timeoutPhase: cachedGatewaySnapshot.timeoutPhase,
|
||||
providerStarted: cachedGatewaySnapshot.providerStarted,
|
||||
});
|
||||
@@ -2302,6 +2303,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
stopReason: snapshot.stopReason,
|
||||
livenessState: snapshot.livenessState,
|
||||
yielded: snapshot.yielded,
|
||||
pendingError: snapshot.pendingError,
|
||||
timeoutPhase: snapshot.timeoutPhase,
|
||||
providerStarted: snapshot.providerStarted,
|
||||
});
|
||||
|
||||
@@ -336,6 +336,52 @@ describe("waitForAgentJob", () => {
|
||||
expect(fresh?.startedAt).toBe(200);
|
||||
expect(fresh?.endedAt).toBe(210);
|
||||
});
|
||||
|
||||
it("surfaces pending error diagnostics when outer timeout fires before error grace period", async () => {
|
||||
// Preserve the retry grace: the caller timeout may carry the pending error
|
||||
// reason, but it must not cache a terminal error before a later start can
|
||||
// cancel the pending snapshot.
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const runId = `run-pending-error-${Date.now()}-${Math.random().toString(36).slice(2)}`;
|
||||
const waitPromise = waitForAgentJob({ runId, timeoutMs: 5_000 });
|
||||
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "error", error: "transient-auth-failure" },
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(6_000);
|
||||
|
||||
const result = await waitPromise;
|
||||
expect(result).not.toBeNull();
|
||||
expect(result?.status).toBe("timeout");
|
||||
expect(result?.error).toBe("transient-auth-failure");
|
||||
expect(result?.pendingError).toBe(true);
|
||||
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "start", startedAt: 12_000 },
|
||||
});
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", startedAt: 12_000, endedAt: 12_100 },
|
||||
});
|
||||
|
||||
const recovered = await waitForAgentJob({ runId, timeoutMs: 1_000 });
|
||||
expectRecordFields(recovered, {
|
||||
status: "ok",
|
||||
startedAt: 12_000,
|
||||
endedAt: 12_100,
|
||||
});
|
||||
} finally {
|
||||
vi.clearAllTimers();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("augmentChatHistoryWithCanvasBlocks", () => {
|
||||
|
||||
@@ -147,9 +147,13 @@ describe("gateway server chat", () => {
|
||||
return actual.runId as string;
|
||||
};
|
||||
|
||||
const expectAgentWaitTimeout = (res: Awaited<ReturnType<typeof rpcReq>>) => {
|
||||
const expectAgentWaitTimeout = (res: Awaited<ReturnType<typeof rpcReq>>, error?: string) => {
|
||||
expect(res.ok).toBe(true);
|
||||
expect(res.payload?.status).toBe("timeout");
|
||||
if (error !== undefined) {
|
||||
expect(res.payload?.error).toBe(error);
|
||||
expect(res.payload?.pendingError).toBe(true);
|
||||
}
|
||||
};
|
||||
|
||||
const expectAgentWaitStartedAt = (res: Awaited<ReturnType<typeof rpcReq>>, startedAt: number) => {
|
||||
@@ -1665,7 +1669,7 @@ describe("gateway server chat", () => {
|
||||
});
|
||||
|
||||
const res = await waitP;
|
||||
expectAgentWaitTimeout(res);
|
||||
expectAgentWaitTimeout(res, "boom");
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user