fix(agents): report stale session locks without cleanup

Report live-owned stale session locks as typed acquisition failures instead of auto-removing them, while preserving safe reclaim for dead/orphaned lock files. Propagate the new stale-lock acquisition error through embedded runner takeover handling, failover/cache/delivery classifiers, and QA retry detection.

Refs #87779
This commit is contained in:
Peter Steinberger
2026-05-31 15:28:54 +01:00
committed by GitHub
parent fb7e21796d
commit 7ca77124fe
10 changed files with 240 additions and 36 deletions

View File

@@ -75,6 +75,28 @@ describe("qa suite runtime agent session helpers", () => {
);
});
it("retries transient session store stale locks while creating sessions", async () => {
  // First sessions.create call rejects with an error carrying the stale-lock code;
  // the second call resolves with a key that is padded with whitespace.
  const staleLockFailure = new Error("SessionWriteLockStaleError: session file lock stale");
  Object.assign(staleLockFailure, { code: "OPENCLAW_SESSION_WRITE_LOCK_STALE" });
  gatewayCall.mockRejectedValueOnce(staleLockFailure);
  gatewayCall.mockResolvedValueOnce({ key: " session-3 " });

  vi.useFakeTimers();
  const sessionKeyPromise = createSession(env, "Retry Stale Session", "agent:qa:stale-retry");
  // Advance the fake clock so the pending retry can proceed.
  await vi.advanceTimersByTimeAsync(1);

  // The resolved key is expected without the surrounding whitespace.
  await expect(sessionKeyPromise).resolves.toBe("session-3");
  expect(gatewayCall).toHaveBeenCalledTimes(2);

  // The retry must repeat the same sessions.create request.
  const expectedRequest = { label: "Retry Stale Session", key: "agent:qa:stale-retry" };
  const expectedOptions = expect.objectContaining({ timeoutMs: expect.any(Number) });
  expect(gatewayCall).toHaveBeenNthCalledWith(2, "sessions.create", expectedRequest, expectedOptions);
});
it("reads effective tool ids once and drops blanks", async () => {
gatewayCall.mockResolvedValueOnce({
groups: [

View File

@@ -35,8 +35,11 @@ function isSessionStoreLockTimeout(error: unknown) {
const text = formatErrorMessage(error);
return (
text.includes("OPENCLAW_SESSION_WRITE_LOCK_TIMEOUT") ||
text.includes("OPENCLAW_SESSION_WRITE_LOCK_STALE") ||
text.includes("SessionWriteLockTimeoutError") ||
text.includes("session file locked")
text.includes("SessionWriteLockStaleError") ||
text.includes("session file locked") ||
text.includes("session file lock stale")
);
}

View File

@@ -11,7 +11,7 @@ import { streamWithPayloadPatch } from "../../llm/providers/stream-wrappers/stre
import type { Model } from "../../llm/types.js";
import { buildGuardedModelFetch } from "../provider-transport-fetch.js";
import type { StreamFn } from "../runtime/index.js";
import { isSessionWriteLockTimeoutError } from "../session-write-lock-error.js";
import { isSessionWriteLockAcquireError } from "../session-write-lock-error.js";
import { stableStringify } from "../stable-stringify.js";
import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js";
import { mergeTransportHeaders, sanitizeTransportPayloadText } from "../transport-stream-shared.js";
@@ -179,7 +179,7 @@ async function appendGooglePromptCacheEntry(
try {
await sessionManager.appendCustomEntry(GOOGLE_PROMPT_CACHE_CUSTOM_TYPE, entry);
} catch (err) {
if (err instanceof EmbeddedAttemptSessionTakeoverError || isSessionWriteLockTimeoutError(err)) {
if (err instanceof EmbeddedAttemptSessionTakeoverError || isSessionWriteLockAcquireError(err)) {
throw err;
}
// ignore persistence failures

View File

@@ -8,7 +8,10 @@ import {
runWithOwnedSessionTranscriptWritePublication,
withOwnedSessionTranscriptWrites,
} from "../../../config/sessions/transcript-write-context.js";
import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
import {
SessionWriteLockStaleError,
SessionWriteLockTimeoutError,
} from "../../session-write-lock-error.js";
import {
acquireSessionWriteLock,
resetSessionWriteLockStateForTest,
@@ -1178,6 +1181,36 @@ describe("embedded attempt session lock lifecycle", () => {
expect(releases).toEqual(["prep"]);
});
it("skips cleanup lock reacquisition after a post-prompt stale lock", async () => {
  const releaseLog: string[] = [];
  // Acquisition #1 (prep) succeeds; acquisition #2 rejects with the typed stale-lock error.
  const prepLock = { release: vi.fn(async () => releaseLog.push("prep")) };
  const staleLockError = new SessionWriteLockStaleError({
    owner: "pid=789 alive=true ageMs=1800001",
    lockPath: `${lockOptions.sessionFile}.lock`,
    staleReasons: ["too-old"],
  });
  const acquireMock = vi
    .fn()
    .mockResolvedValueOnce(prepLock)
    .mockRejectedValueOnce(staleLockError);

  const controller = await createEmbeddedAttemptSessionLockController({
    acquireSessionWriteLock: acquireMock,
    lockOptions,
  });
  await controller.releaseForPrompt();

  // The late write attempts the second acquisition and surfaces the stale error.
  const lateWrite = controller.withSessionWriteLock(() => "late-write");
  await expect(lateWrite).rejects.toBeInstanceOf(SessionWriteLockStaleError);

  // Cleanup still hands back a releasable lock object...
  const cleanupLock = await controller.acquireForCleanup();
  await cleanupLock.release();

  // ...but without a third acquisition, and the controller records the takeover.
  expect(acquireMock).toHaveBeenCalledTimes(2);
  expect(controller.hasSessionTakeover()).toBe(true);
  expect(releaseLog).toEqual(["prep"]);
});
it("wraps provider stream submission with queued transcript drain and lock release", async () => {
const events: string[] = [];
const streamFn = vi.fn(async (..._args: unknown[]) => {

View File

@@ -5,7 +5,7 @@ import { isDeepStrictEqual } from "node:util";
import { normalizeStringEntries } from "@openclaw/normalization-core/string-normalization";
import { withOwnedSessionTranscriptWrites } from "../../../config/sessions/transcript-write-context.js";
import { resolveGlobalSingleton } from "../../../shared/global-singleton.js";
import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
import { isSessionWriteLockAcquireError } from "../../session-write-lock-error.js";
import type { acquireSessionWriteLock } from "../../session-write-lock.js";
import { resolveEmbeddedSessionFileKey } from "../session-file-key.js";
@@ -651,7 +651,7 @@ export async function createEmbeddedAttemptSessionLockController(params: {
try {
return { lock: await acquireLock(), owned: true };
} catch (err) {
if (isSessionWriteLockTimeoutError(err)) {
if (isSessionWriteLockAcquireError(err)) {
takeoverDetected = true;
}
throw err;
@@ -865,7 +865,7 @@ export async function createEmbeddedAttemptSessionLockController(params: {
try {
return await acquireLock();
} catch (err) {
if (isSessionWriteLockTimeoutError(err)) {
if (isSessionWriteLockAcquireError(err)) {
takeoverDetected = true;
return undefined;
}

View File

@@ -9,7 +9,7 @@ import {
} from "./embedded-agent-helpers/errors.js";
import { isTimeoutErrorMessage } from "./embedded-agent-helpers/errors.js";
import type { FailoverReason } from "./embedded-agent-helpers/types.js";
import { isSessionWriteLockTimeoutError } from "./session-write-lock-error.js";
import { isSessionWriteLockAcquireError } from "./session-write-lock-error.js";
const ABORT_TIMEOUT_RE = /request was aborted|request aborted/i;
const MAX_FAILOVER_CAUSE_DEPTH = 25;
@@ -262,8 +262,8 @@ function normalizeDirectErrorSignal(err: unknown): FailoverSignal {
};
}
function hasSessionWriteLockTimeout(err: unknown, seen: Set<object> = new Set()): boolean {
if (isSessionWriteLockTimeoutError(err)) {
function hasSessionWriteLockContention(err: unknown, seen: Set<object> = new Set()): boolean {
if (isSessionWriteLockAcquireError(err)) {
return true;
}
if (!err || typeof err !== "object") {
@@ -275,9 +275,9 @@ function hasSessionWriteLockTimeout(err: unknown, seen: Set<object> = new Set())
seen.add(err);
const candidate = err as { error?: unknown; cause?: unknown; reason?: unknown };
return (
hasSessionWriteLockTimeout(candidate.error, seen) ||
hasSessionWriteLockTimeout(candidate.cause, seen) ||
hasSessionWriteLockTimeout(candidate.reason, seen)
hasSessionWriteLockContention(candidate.error, seen) ||
hasSessionWriteLockContention(candidate.cause, seen) ||
hasSessionWriteLockContention(candidate.reason, seen)
);
}
@@ -315,7 +315,7 @@ function hasEmbeddedAttemptSessionTakeover(err: unknown, seen: Set<object> = new
* See #83510.
*/
export function isNonProviderRuntimeCoordinationError(err: unknown): boolean {
if (!hasSessionWriteLockTimeout(err) && !hasEmbeddedAttemptSessionTakeover(err)) {
if (!hasSessionWriteLockContention(err) && !hasEmbeddedAttemptSessionTakeover(err)) {
return false;
}
if (isFailoverError(err)) {
@@ -331,7 +331,7 @@ function hasTimeoutHint(err: unknown): boolean {
if (!err) {
return false;
}
if (hasSessionWriteLockTimeout(err)) {
if (hasSessionWriteLockContention(err)) {
return false;
}
if (readErrorName(err) === "TimeoutError") {
@@ -351,7 +351,7 @@ export function isTimeoutError(err: unknown): boolean {
if (readErrorName(err) !== "AbortError") {
return false;
}
if (hasSessionWriteLockTimeout(err)) {
if (hasSessionWriteLockContention(err)) {
return false;
}
const message = getErrorMessage(err);
@@ -460,7 +460,7 @@ function resolveFailoverClassificationFromErrorInternal(
const hasExplicitFailoverMetadata =
typeof inferSignalStatus(signal) === "number" ||
(codeReason !== null && codeReason !== "timeout");
const hasSessionLock = hasSessionWriteLockTimeout(err);
const hasSessionLock = hasSessionWriteLockContention(err);
const classification = classifyFailoverSignal(signal);
const nestedCandidates = getNestedErrorCandidates(err);

View File

@@ -1,4 +1,5 @@
const SESSION_WRITE_LOCK_TIMEOUT_CODE = "OPENCLAW_SESSION_WRITE_LOCK_TIMEOUT";
const SESSION_WRITE_LOCK_STALE_CODE = "OPENCLAW_SESSION_WRITE_LOCK_STALE";
export class SessionWriteLockTimeoutError extends Error {
readonly code = SESSION_WRITE_LOCK_TIMEOUT_CODE;
@@ -17,6 +18,24 @@ export class SessionWriteLockTimeoutError extends Error {
}
}
/**
 * Typed failure for a session write lock that was found stale during acquisition.
 *
 * Carries the stable `code` used by the code-based classifier, a human-readable
 * `owner` description, the `lockPath` of the contended lock file, and the list
 * of `staleReasons` that is also embedded in the error message.
 */
export class SessionWriteLockStaleError extends Error {
  readonly code = SESSION_WRITE_LOCK_STALE_CODE;
  readonly owner: string;
  readonly lockPath: string;
  readonly staleReasons: string[];

  /**
   * @param params.owner Description of the lock owner, included verbatim in the message.
   * @param params.lockPath Path of the stale lock file, included verbatim in the message.
   * @param params.staleReasons Reasons the lock is considered stale; a missing or
   *   empty list is reported as `["unknown"]`.
   */
  constructor(params: { owner: string; lockPath: string; staleReasons?: string[] }) {
    // Copy the caller's array so later mutations of it cannot make the stored
    // reasons diverge from the ones already baked into the message.
    const staleReasons = params.staleReasons?.length ? [...params.staleReasons] : ["unknown"];
    super(
      `session file lock stale (${staleReasons.join(", ")}): ${params.owner} ${params.lockPath}`,
    );
    this.name = "SessionWriteLockStaleError";
    this.owner = params.owner;
    this.lockPath = params.lockPath;
    this.staleReasons = staleReasons;
  }
}
export function isSessionWriteLockTimeoutError(err: unknown): boolean {
return (
err instanceof SessionWriteLockTimeoutError ||
@@ -27,3 +46,18 @@ export function isSessionWriteLockTimeoutError(err: unknown): boolean {
)
);
}
/**
 * Reports whether `err` represents a stale session write lock.
 *
 * Matches either a real `SessionWriteLockStaleError` instance or any non-null
 * object whose `code` property equals the stale-lock code.
 */
export function isSessionWriteLockStaleError(err: unknown): boolean {
  if (err instanceof SessionWriteLockStaleError) {
    return true;
  }
  if (typeof err !== "object" || err === null) {
    return false;
  }
  const { code } = err as { code?: unknown };
  return code === SESSION_WRITE_LOCK_STALE_CODE;
}
/**
 * Reports whether `err` is either kind of session write lock acquisition
 * failure: a timeout or a stale lock.
 */
export function isSessionWriteLockAcquireError(err: unknown): boolean {
  if (isSessionWriteLockTimeoutError(err)) {
    return true;
  }
  return isSessionWriteLockStaleError(err);
}

View File

@@ -1,9 +1,11 @@
import { spawn } from "node:child_process";
import fsSync from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { MAX_TIMER_TIMEOUT_MS } from "@openclaw/normalization-core/number-coercion";
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import { SessionWriteLockStaleError } from "./session-write-lock-error.js";
const FAKE_STARTTIME = 12345;
let testing: typeof import("./session-write-lock.js").testing;
@@ -119,12 +121,13 @@ async function withSymlinkedSessionPaths(
async function expectActiveInProcessLockIsNotReclaimed(params?: {
legacyStarttime?: unknown;
createdAt?: string;
}): Promise<void> {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
const lockPayload = {
pid: process.pid,
createdAt: new Date().toISOString(),
createdAt: params?.createdAt ?? new Date().toISOString(),
...(params && "legacyStarttime" in params ? { starttime: params.legacyStarttime } : {}),
};
await fs.writeFile(lockPath, JSON.stringify(lockPayload), "utf8");
@@ -410,6 +413,46 @@ describe("acquireSessionWriteLock", () => {
});
});
it("does not report or remove active in-process locks that pass staleMs", async () => {
  // Backdate the lock payload's createdAt by 120 seconds before running the shared check.
  const backdatedCreatedAt = new Date(Date.now() - 120_000).toISOString();
  await expectActiveInProcessLockIsNotReclaimed({ createdAt: backdatedCreatedAt });
});
// Verifies that a stale lock whose owner process is still running is surfaced as a
// SessionWriteLockStaleError and that the lock file is left on disk.
it("reports live OpenClaw-owned stale locks without removing them", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
// Long-lived child process that plays the live lock owner. The trailing "openclaw"
// argv entry presumably makes its process args look OpenClaw-owned — TODO confirm
// against the owner-args inspection.
const owner = spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)", "openclaw"], {
stdio: "ignore",
});
if (!owner.pid) {
throw new Error("missing lock owner pid");
}
// Lock payload names the child's pid and is backdated by 120s, well past the
// staleMs of 10 used for the acquisitions below.
await fs.writeFile(
lockPath,
JSON.stringify({
pid: owner.pid,
createdAt: new Date(Date.now() - 120_000).toISOString(),
}),
"utf8",
);
try {
// Acquisition must reject with the typed stale error and the "too-old" reason.
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 }),
).rejects.toMatchObject({
name: "SessionWriteLockStaleError",
staleReasons: ["too-old"],
});
// A second attempt rejects the same way, this time checked by class identity.
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 }),
).rejects.toBeInstanceOf(SessionWriteLockStaleError);
// fs.access resolving shows the lock file still exists after both attempts.
await expect(fs.access(lockPath)).resolves.toBeUndefined();
} finally {
// Always terminate the helper process, even when an assertion above fails.
owner.kill("SIGTERM");
}
});
});
it("watchdog releases stale in-process locks", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);

View File

@@ -6,7 +6,10 @@ import { MAX_TIMER_TIMEOUT_MS } from "@openclaw/normalization-core/number-coerci
import { createFileLockManager } from "../infra/file-lock-manager.js";
import { readGatewayProcessArgsSync as readProcessArgsSync } from "../infra/gateway-processes.js";
import { getProcessStartTime, isPidAlive } from "../shared/pid-alive.js";
import { SessionWriteLockTimeoutError } from "./session-write-lock-error.js";
import {
SessionWriteLockStaleError,
SessionWriteLockTimeoutError,
} from "./session-write-lock-error.js";
type LockFilePayload = {
pid?: number;
@@ -563,7 +566,34 @@ function lockInspectionNeedsMtimeStaleFallback(details: LockInspectionDetails):
);
}
async function shouldReclaimContendedLockFile(
/**
 * Decides whether a contended lock file should be reported as stale.
 *
 * Returns `false` when the inspection did not mark the lock stale or when the
 * lock is held by this process. When the inspection needs the mtime fallback,
 * the lock file's mtime age must exceed the smaller of `staleMs` and
 * `orphanPayloadGraceMs`; a lock file that no longer exists (`ENOENT`) is not
 * reported, while any other stat failure is reported. Otherwise returns `true`.
 */
async function shouldReportContendedLockStale(params: {
  lockPath: string;
  details: LockInspectionDetails;
  heldByThisProcess: boolean;
  staleMs: number;
  nowMs: number;
  orphanPayloadGraceMs: number;
}): Promise<boolean> {
  const { lockPath, details, heldByThisProcess, staleMs, nowMs, orphanPayloadGraceMs } = params;

  if (!details.stale || heldByThisProcess) {
    return false;
  }
  if (!lockInspectionNeedsMtimeStaleFallback(details)) {
    return true;
  }

  try {
    const { mtimeMs } = await fs.stat(lockPath);
    // Clamp at zero so an mtime ahead of nowMs counts as age 0.
    const lockAgeMs = Math.max(0, nowMs - mtimeMs);
    const ageThresholdMs = Math.min(staleMs, orphanPayloadGraceMs);
    return lockAgeMs > ageThresholdMs;
  } catch (statError) {
    const errorCode = (statError as { code?: string } | null)?.code;
    return errorCode !== "ENOENT";
  }
}
async function shouldRemoveContendedLockFile(
lockPath: string,
details: LockInspectionDetails,
staleMs: number,
@@ -573,6 +603,9 @@ async function shouldReclaimContendedLockFile(
if (!details.stale) {
return false;
}
if (details.staleReasons.every((reason) => REPORT_ONLY_STALE_LOCK_REASONS.has(reason))) {
return false;
}
if (!lockInspectionNeedsMtimeStaleFallback(details)) {
return true;
}
@@ -602,10 +635,7 @@ async function shouldRemoveLockDuringCleanup(
if (!details.stale) {
return false;
}
if (details.staleReasons.every((reason) => REPORT_ONLY_STALE_LOCK_REASONS.has(reason))) {
return false;
}
return await shouldReclaimContendedLockFile(lockPath, details, staleMs, nowMs);
return await shouldRemoveContendedLockFile(lockPath, details, staleMs, nowMs);
}
function sessionLockHeldByThisProcess(normalizedSessionFile: string): boolean {
@@ -638,6 +668,25 @@ function shouldTreatAsOrphanSelfLock(params: {
return currentStarttime !== null && currentStarttime === storedStarttime;
}
/**
 * Builds the space-separated owner description embedded in lock error messages.
 *
 * Prefers the inspected pid (with its liveness flag), falls back to the raw
 * payload pid, and otherwise reports `owner=unknown`. A numeric inspected age
 * is appended as a whole number of milliseconds.
 */
function describeLockOwnerForError(params: {
  payload: LockFilePayload | null;
  inspected: LockInspectionDetails;
}): string {
  const { payload, inspected } = params;

  let ownerSummary: string;
  if (inspected.pid !== null) {
    const aliveLabel = inspected.pidAlive ? "true" : "false";
    ownerSummary = `pid=${inspected.pid} alive=${aliveLabel}`;
  } else if (typeof payload?.pid === "number") {
    ownerSummary = `pid=${payload.pid}`;
  } else {
    ownerSummary = "owner=unknown";
  }

  return typeof inspected.ageMs === "number"
    ? `${ownerSummary} ageMs=${Math.floor(inspected.ageMs)}`
    : ownerSummary;
}
function inspectLockPayloadForSession(params: {
payload: LockFilePayload | null;
staleMs: number;
@@ -828,13 +877,14 @@ export async function acquireSessionWriteLock(params: {
readOwnerProcessArgs: readProcessArgsSync,
respectMaxHold: !heldByThisProcess,
});
return await shouldReclaimContendedLockFile(
return await shouldReportContendedLockStale({
lockPath,
inspected,
details: inspected,
heldByThisProcess,
staleMs,
nowMs,
orphanPayloadGraceMs,
);
});
},
shouldRemoveStaleLock: async ({ lockPath, normalizedTargetPath, payload }) => {
await yieldEventLoop();
@@ -849,7 +899,7 @@ export async function acquireSessionWriteLock(params: {
readOwnerProcessArgs: readProcessArgsSync,
respectMaxHold: !heldByThisProcess,
});
return await shouldReclaimContendedLockFile(
return await shouldRemoveContendedLockFile(
lockPath,
inspected,
staleMs,
@@ -860,13 +910,31 @@ export async function acquireSessionWriteLock(params: {
});
return { release: lock.release };
} catch (err) {
if (!isFileLockError(err, "file_lock_timeout")) {
if (!isFileLockError(err, "file_lock_timeout") && !isFileLockError(err, "file_lock_stale")) {
throw err;
}
const timeoutLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
const payload = await readLockPayload(timeoutLockPath);
const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown";
throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: timeoutLockPath });
const errorLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
const payload = await readLockPayload(errorLockPath);
const nowMs = Date.now();
const heldByThisProcess = sessionLockHeldByThisProcess(normalizedSessionFile);
const inspected = inspectLockPayloadForSession({
payload,
staleMs,
nowMs,
heldByThisProcess,
reclaimLockWithoutStarttime: true,
readOwnerProcessArgs: readProcessArgsSync,
respectMaxHold: !heldByThisProcess,
});
const owner = describeLockOwnerForError({ payload, inspected });
if (isFileLockError(err, "file_lock_stale")) {
throw new SessionWriteLockStaleError({
owner,
lockPath: errorLockPath,
staleReasons: inspected.staleReasons,
});
}
throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: errorLockPath });
}
}
}

View File

@@ -43,7 +43,7 @@ import type { EmbeddedAgentQueueMessageOptions } from "./embedded-agent-runner/r
import type { EmbeddedAgentQueueMessageOutcome } from "./embedded-agent-runner/runs.js";
import { mediaUrlsFromGeneratedAttachments } from "./generated-attachments.js";
import type { AgentInternalEvent } from "./internal-events.js";
import { isSessionWriteLockTimeoutError } from "./session-write-lock-error.js";
import { isSessionWriteLockAcquireError } from "./session-write-lock-error.js";
import {
callGateway,
createBoundDeliveryRouter,
@@ -398,12 +398,13 @@ function isIncompleteAnnounceAgentResultError(error: unknown): boolean {
}
function isSessionWriteLockAnnounceAgentError(error: unknown): boolean {
if (isSessionWriteLockTimeoutError(error)) {
if (isSessionWriteLockAcquireError(error)) {
return true;
}
const message = summarizeDeliveryError(error);
return (
/\bSessionWriteLockTimeoutError\b/.test(message) || /\bsession file locked\b/i.test(message)
/\bSessionWriteLock(?:Timeout|Stale)Error\b/.test(message) ||
/\bsession file lock(?:ed| stale)\b/i.test(message)
);
}