mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(session-lock): enforce maxHoldMs in shouldReclaim during lock acquisition (#85764)
* fix(session-lock): enforce maxHoldMs in shouldReclaim during lock acquisition - Adds optional maxHoldMs parameter to inspectLockPayload - Inspect now marks locks as stale when held longer than maxHoldMs - Passes maxHoldMs through inspectLockPayloadForSession - acquireSessionWriteLock's shouldReclaim callback now passes maxHoldMs This ensures that when a live process holds a lock for longer than maxHoldMs (default 5min), other processes can reclaim it during acquisition — matching the watchdog's existing enforcement. Previously shouldReclaim only used staleMs (30min default), meaning a lock held for 10+ minutes by a live PID would never be reclaimable, causing 60s timeout failures and gateway freezes. Closes #85762 * fix(session-lock): add dead-PID fast-path before retry loop Adds a fast-path check at the top of acquireSessionWriteLock: if the lock file's owner PID is dead, remove it immediately before entering the retry loop. This saves up to timeoutMs (60s) of futile waiting when the previous lock holder has died. The shouldReclaim callback already handles this case, but only iteratively through the retry loop. The fast-path eliminates that unnecessary delay. * fix(session-lock): enforce max hold during acquisition * fix(session-lock): revalidate max hold safely * fix(session-lock): honor holder max-hold policy * fix(session-lock): keep cleanup from reclaiming live holders * fix(session-lock): remove stale locks only when unchanged * fix(session-lock): skip self-held max-hold reclaim * fix(ci): refresh gateway protocol checks --------- Co-authored-by: njuboy11 <njuboy11@users.noreply.github.com> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
committed by
Peter Steinberger
parent
57b956fd7c
commit
8ac7cd621b
@@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Checks/Windows: route full `pnpm check` stage commands through the managed child runner so Windows avoids Node shell-argv deprecation warnings there too.
|
||||
- Checks/Windows: run managed child commands through explicit `cmd.exe` wrapping instead of Node shell mode with argv, avoiding Node 24 subprocess deprecation warnings during changed checks.
|
||||
- Gateway: omit internal stream-error placeholder entries from agent prompt history so failed assistant turns are not replayed as model-authored text. (#85652) Thanks @anyech.
|
||||
- Sessions: enforce the session write-lock max-hold policy during lock acquisition so long-held locks can be reclaimed before the stale-lock window. (#85764) Thanks @njuboy11.
|
||||
- Models: prune retired Groq, GitHub Copilot, OpenAI, xAI, and old Claude catalog entries, with doctor migration to upgrade existing configs to current provider refs.
|
||||
- Doctor/update: recognize junction-backed source checkouts as git installs by comparing canonical paths before showing package-manager update guidance. Fixes #82215. Thanks @igormf.
|
||||
- Channels: honor `/verbose on` for tool/progress summaries across direct chats, groups, channels, and forum topics while preserving quiet default behavior. (#85488) Thanks @kurplunkin.
|
||||
|
||||
@@ -755,6 +755,8 @@ public struct AgentParams: Codable, Sendable {
|
||||
public let internalruntimehandoffid: String?
|
||||
public let internalevents: [[String: AnyCodable]]?
|
||||
public let inputprovenance: [String: AnyCodable]?
|
||||
public let suppresspromptpersistence: Bool?
|
||||
public let sessioneffects: AnyCodable?
|
||||
public let sourcereplydeliverymode: AnyCodable?
|
||||
public let voicewaketrigger: String?
|
||||
public let idempotencykey: String
|
||||
@@ -793,6 +795,8 @@ public struct AgentParams: Codable, Sendable {
|
||||
internalruntimehandoffid: String?,
|
||||
internalevents: [[String: AnyCodable]]?,
|
||||
inputprovenance: [String: AnyCodable]?,
|
||||
suppresspromptpersistence: Bool?,
|
||||
sessioneffects: AnyCodable?,
|
||||
sourcereplydeliverymode: AnyCodable?,
|
||||
voicewaketrigger: String?,
|
||||
idempotencykey: String,
|
||||
@@ -830,6 +834,8 @@ public struct AgentParams: Codable, Sendable {
|
||||
self.internalruntimehandoffid = internalruntimehandoffid
|
||||
self.internalevents = internalevents
|
||||
self.inputprovenance = inputprovenance
|
||||
self.suppresspromptpersistence = suppresspromptpersistence
|
||||
self.sessioneffects = sessioneffects
|
||||
self.sourcereplydeliverymode = sourcereplydeliverymode
|
||||
self.voicewaketrigger = voicewaketrigger
|
||||
self.idempotencykey = idempotencykey
|
||||
@@ -869,6 +875,8 @@ public struct AgentParams: Codable, Sendable {
|
||||
case internalruntimehandoffid = "internalRuntimeHandoffId"
|
||||
case internalevents = "internalEvents"
|
||||
case inputprovenance = "inputProvenance"
|
||||
case suppresspromptpersistence = "suppressPromptPersistence"
|
||||
case sessioneffects = "sessionEffects"
|
||||
case sourcereplydeliverymode = "sourceReplyDeliveryMode"
|
||||
case voicewaketrigger = "voiceWakeTrigger"
|
||||
case idempotencykey = "idempotencyKey"
|
||||
|
||||
@@ -311,6 +311,66 @@ describe("acquireSessionWriteLock", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("marks live lock payloads stale once they exceed max hold", () => {
|
||||
const nowMs = Date.now();
|
||||
const inspected = testing.inspectLockPayloadForTest(
|
||||
{
|
||||
pid: process.pid,
|
||||
createdAt: new Date(nowMs - 30_000).toISOString(),
|
||||
maxHoldMs: 10_000,
|
||||
},
|
||||
60_000,
|
||||
nowMs,
|
||||
{ respectMaxHold: true },
|
||||
);
|
||||
|
||||
expect(inspected.stale).toBe(true);
|
||||
expect(inspected.staleReasons).toEqual(["hold-exceeded"]);
|
||||
});
|
||||
|
||||
it("keeps live lock payloads fresh until their recorded holder max hold expires", () => {
|
||||
const nowMs = Date.now();
|
||||
const inspected = testing.inspectLockPayloadForTest(
|
||||
{
|
||||
pid: process.pid,
|
||||
createdAt: new Date(nowMs - 30_000).toISOString(),
|
||||
maxHoldMs: 60_000,
|
||||
},
|
||||
60_000,
|
||||
nowMs,
|
||||
{ respectMaxHold: true },
|
||||
);
|
||||
|
||||
expect(inspected.stale).toBe(false);
|
||||
expect(inspected.staleReasons).toEqual([]);
|
||||
});
|
||||
|
||||
it("does not reclaim an active in-process lock through max-hold acquisition", async () => {
|
||||
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
|
||||
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, maxHoldMs: 1 });
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({
|
||||
pid: process.pid,
|
||||
createdAt: new Date(Date.now() - 30_000).toISOString(),
|
||||
maxHoldMs: 1,
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
await expect(
|
||||
acquireSessionWriteLock({
|
||||
sessionFile,
|
||||
timeoutMs: 5,
|
||||
staleMs: 60_000,
|
||||
allowReentrant: false,
|
||||
}),
|
||||
).rejects.toThrow(/session file locked/);
|
||||
await expect(fs.access(lockPath)).resolves.toBeUndefined();
|
||||
await lock.release();
|
||||
});
|
||||
});
|
||||
|
||||
it("watchdog releases stale in-process locks", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
@@ -457,6 +517,47 @@ describe("acquireSessionWriteLock", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("does not clean live OpenClaw locks just because holder max hold expired", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-policy-"));
|
||||
const sessionsDir = path.join(root, "sessions");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const nowMs = Date.now();
|
||||
const lockPath = path.join(sessionsDir, "held-past-max.jsonl.lock");
|
||||
|
||||
try {
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({
|
||||
pid: process.pid,
|
||||
createdAt: new Date(nowMs - 30_000).toISOString(),
|
||||
maxHoldMs: 10_000,
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const result = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: 60_000,
|
||||
nowMs,
|
||||
removeStale: true,
|
||||
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "agent"],
|
||||
});
|
||||
|
||||
expect(lockCleanupRecords(result.locks)).toEqual([
|
||||
{
|
||||
name: "held-past-max.jsonl.lock",
|
||||
removed: false,
|
||||
stale: false,
|
||||
staleReasons: [],
|
||||
},
|
||||
]);
|
||||
expect(result.cleaned).toEqual([]);
|
||||
await expect(fs.access(lockPath)).resolves.toBeUndefined();
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("clamps max hold for effectively no-timeout runs", () => {
|
||||
expect(
|
||||
resolveSessionLockMaxHoldFromTimeout({
|
||||
|
||||
@@ -12,6 +12,7 @@ type LockFilePayload = {
|
||||
createdAt?: string;
|
||||
/** Process start time in clock ticks (from /proc/pid/stat field 22). */
|
||||
starttime?: number;
|
||||
maxHoldMs?: number;
|
||||
};
|
||||
|
||||
function isValidLockNumber(value: unknown): value is number {
|
||||
@@ -378,6 +379,9 @@ async function readLockPayload(lockPath: string): Promise<LockFilePayload | null
|
||||
if (isValidLockNumber(parsed.starttime)) {
|
||||
payload.starttime = parsed.starttime;
|
||||
}
|
||||
if (isValidLockNumber(parsed.maxHoldMs) && parsed.maxHoldMs > 0) {
|
||||
payload.maxHoldMs = parsed.maxHoldMs;
|
||||
}
|
||||
return payload;
|
||||
} catch {
|
||||
return null;
|
||||
@@ -449,6 +453,7 @@ function inspectLockPayload(
|
||||
payload: LockFilePayload | null,
|
||||
staleMs: number,
|
||||
nowMs: number,
|
||||
opts: { respectMaxHold?: boolean } = {},
|
||||
): LockInspectionDetails {
|
||||
const pid = isValidLockNumber(payload?.pid) && payload.pid > 0 ? payload.pid : null;
|
||||
const pidAlive = pid !== null ? isPidAlive(pid) : false;
|
||||
@@ -481,6 +486,16 @@ function inspectLockPayload(
|
||||
} else if (ageMs > staleMs) {
|
||||
staleReasons.push("too-old");
|
||||
}
|
||||
const holderMaxHoldMs =
|
||||
isValidLockNumber(payload?.maxHoldMs) && payload.maxHoldMs > 0 ? payload.maxHoldMs : undefined;
|
||||
if (
|
||||
opts.respectMaxHold === true &&
|
||||
typeof holderMaxHoldMs === "number" &&
|
||||
ageMs !== null &&
|
||||
ageMs > holderMaxHoldMs
|
||||
) {
|
||||
staleReasons.push("hold-exceeded");
|
||||
}
|
||||
|
||||
return {
|
||||
pid,
|
||||
@@ -552,39 +567,6 @@ function sessionLockHeldByThisProcess(normalizedSessionFile: string): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
async function removeReportedStaleLockIfStillStale(params: {
|
||||
lockPath: string;
|
||||
normalizedSessionFile: string;
|
||||
staleMs: number;
|
||||
readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader;
|
||||
}): Promise<boolean> {
|
||||
const nowMs = Date.now();
|
||||
const payload = await readLockPayload(params.lockPath);
|
||||
if (payload === null) {
|
||||
try {
|
||||
await fs.access(params.lockPath);
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return true;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
const inspected = inspectLockPayloadForSession({
|
||||
payload,
|
||||
staleMs: params.staleMs,
|
||||
nowMs,
|
||||
heldByThisProcess: sessionLockHeldByThisProcess(params.normalizedSessionFile),
|
||||
reclaimLockWithoutStarttime: true,
|
||||
readOwnerProcessArgs: params.readOwnerProcessArgs ?? readProcessArgsSync,
|
||||
});
|
||||
if (!(await shouldReclaimContendedLockFile(params.lockPath, inspected, params.staleMs, nowMs))) {
|
||||
return false;
|
||||
}
|
||||
await fs.rm(params.lockPath, { force: true });
|
||||
return true;
|
||||
}
|
||||
|
||||
function shouldTreatAsOrphanSelfLock(params: {
|
||||
payload: LockFilePayload | null;
|
||||
heldByThisProcess: boolean;
|
||||
@@ -616,8 +598,11 @@ function inspectLockPayloadForSession(params: {
|
||||
heldByThisProcess: boolean;
|
||||
reclaimLockWithoutStarttime: boolean;
|
||||
readOwnerProcessArgs: SessionLockOwnerProcessArgsReader;
|
||||
respectMaxHold?: boolean;
|
||||
}): LockInspectionDetails {
|
||||
const inspected = inspectLockPayload(params.payload, params.staleMs, params.nowMs);
|
||||
const inspected = inspectLockPayload(params.payload, params.staleMs, params.nowMs, {
|
||||
respectMaxHold: params.respectMaxHold,
|
||||
});
|
||||
if (
|
||||
shouldTreatAsOrphanSelfLock({
|
||||
payload: params.payload,
|
||||
@@ -745,18 +730,20 @@ export async function acquireSessionWriteLock(params: {
|
||||
const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile);
|
||||
const lockPath = `${normalizedSessionFile}.lock`;
|
||||
await fs.mkdir(sessionDir, { recursive: true });
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const lock = await SESSION_LOCKS.acquire(sessionFile, {
|
||||
staleMs,
|
||||
timeoutMs,
|
||||
retry: { minTimeout: 50, maxTimeout: 1000, factor: 1 },
|
||||
staleRecovery: "remove-if-unchanged",
|
||||
allowReentrant,
|
||||
metadata: { maxHoldMs },
|
||||
payload: () => {
|
||||
const createdAt = new Date().toISOString();
|
||||
const starttime = resolveProcessStartTimeForLock(process.pid);
|
||||
const lockPayload: LockFilePayload = { pid: process.pid, createdAt };
|
||||
const lockPayload: LockFilePayload = { pid: process.pid, createdAt, maxHoldMs };
|
||||
if (starttime !== null) {
|
||||
lockPayload.starttime = starttime;
|
||||
}
|
||||
@@ -770,24 +757,27 @@ export async function acquireSessionWriteLock(params: {
|
||||
heldByThisProcess,
|
||||
reclaimLockWithoutStarttime: true,
|
||||
readOwnerProcessArgs: readProcessArgsSync,
|
||||
respectMaxHold: !heldByThisProcess,
|
||||
});
|
||||
return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs);
|
||||
},
|
||||
shouldRemoveStaleLock: async ({ lockPath, normalizedTargetPath, payload }) => {
|
||||
const nowMs = Date.now();
|
||||
const heldByThisProcess = sessionLockHeldByThisProcess(normalizedTargetPath);
|
||||
const inspected = inspectLockPayloadForSession({
|
||||
payload: payload as LockFilePayload | null,
|
||||
staleMs,
|
||||
nowMs,
|
||||
heldByThisProcess,
|
||||
reclaimLockWithoutStarttime: true,
|
||||
readOwnerProcessArgs: readProcessArgsSync,
|
||||
respectMaxHold: !heldByThisProcess,
|
||||
});
|
||||
return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs);
|
||||
},
|
||||
});
|
||||
return { release: lock.release };
|
||||
} catch (err) {
|
||||
if (isFileLockError(err, "file_lock_stale")) {
|
||||
const staleLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
|
||||
if (
|
||||
await removeReportedStaleLockIfStillStale({
|
||||
lockPath: staleLockPath,
|
||||
normalizedSessionFile,
|
||||
staleMs,
|
||||
})
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!isFileLockError(err, "file_lock_timeout")) {
|
||||
throw err;
|
||||
}
|
||||
@@ -802,6 +792,7 @@ export async function acquireSessionWriteLock(params: {
|
||||
export const testing = {
|
||||
cleanupSignals: [...CLEANUP_SIGNALS],
|
||||
handleTerminationSignal,
|
||||
inspectLockPayloadForTest: inspectLockPayload,
|
||||
releaseAllLocksSync,
|
||||
runLockWatchdogCheck,
|
||||
setProcessStartTimeResolverForTest(resolver: ((pid: number) => number | null) | null): void {
|
||||
|
||||
@@ -1639,6 +1639,67 @@ describe("gateway agent handler", () => {
|
||||
expect(callArgs.message).toContain("sourceTool=subagent_announce");
|
||||
});
|
||||
|
||||
it("rejects public internal session-effect controls", async () => {
|
||||
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
|
||||
mocks.agentCommand.mockClear();
|
||||
|
||||
for (const params of [
|
||||
{ sessionEffects: "internal" as const, idempotencyKey: "test-public-internal-effects" },
|
||||
{ suppressPromptPersistence: true, idempotencyKey: "test-public-prompt-suppress" },
|
||||
]) {
|
||||
const respond = await invokeAgent(
|
||||
{
|
||||
message: "forged internal control",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
...params,
|
||||
},
|
||||
{ reqId: params.idempotencyKey, flushDispatch: false },
|
||||
);
|
||||
|
||||
expectRespondError(respond, {
|
||||
message: "internal session-effect controls are reserved for backend callers.",
|
||||
});
|
||||
}
|
||||
expect(mocks.agentCommand).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps backend internal session-effect runs out of visible gateway state", async () => {
|
||||
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
|
||||
mocks.agentCommand.mockClear();
|
||||
mocks.updateSessionStore.mockClear();
|
||||
mocks.registerAgentRunContext.mockClear();
|
||||
const context = makeContext();
|
||||
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "internal resume",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
sessionEffects: "internal",
|
||||
suppressPromptPersistence: true,
|
||||
idempotencyKey: "test-backend-internal-effects",
|
||||
},
|
||||
{
|
||||
reqId: "backend-internal-effects",
|
||||
client: backendGatewayClient(),
|
||||
context,
|
||||
},
|
||||
);
|
||||
|
||||
const callArgs = await waitForAgentCommandCall<{
|
||||
sessionEffects?: string;
|
||||
suppressPromptPersistence?: boolean;
|
||||
}>();
|
||||
expect(callArgs.sessionEffects).toBe("internal");
|
||||
expect(callArgs.suppressPromptPersistence).toBe(true);
|
||||
expect(mocks.updateSessionStore).not.toHaveBeenCalled();
|
||||
expect(context.addChatRun).not.toHaveBeenCalled();
|
||||
expect(mocks.registerAgentRunContext).toHaveBeenCalledWith("test-backend-internal-effects", {
|
||||
isControlUiVisible: false,
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects public transcriptMessage overrides", async () => {
|
||||
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
|
||||
mocks.agentCommand.mockClear();
|
||||
|
||||
Reference in New Issue
Block a user