diff --git a/src/agents/command/attempt-execution.cli.test.ts b/src/agents/command/attempt-execution.cli.test.ts index 13a80f2bcabf..68ce1f987d8a 100644 --- a/src/agents/command/attempt-execution.cli.test.ts +++ b/src/agents/command/attempt-execution.cli.test.ts @@ -693,30 +693,62 @@ describe("CLI attempt execution", () => { it("persists CLI replies into the session transcript", async () => { const sessionKey = "agent:main:subagent:cli-transcript"; + const sessionFile = path.join(tmpDir, "session-cli-transcript.jsonl"); const sessionEntry: SessionEntry = { sessionId: "session-cli-transcript", - updatedAt: Date.now(), + sessionFile, + updatedAt: 1, + status: "running", + startedAt: 2, }; const sessionStore: Record = { [sessionKey]: sessionEntry }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8"); - - const updatedEntry = await persistCliTurnTranscript({ - body: "persist this", - result: makeCliResult("hello from cli"), - sessionId: sessionEntry.sessionId, - sessionKey, - sessionEntry, - sessionStore, + await fs.writeFile( storePath, - sessionAgentId: "main", - sessionCwd: tmpDir, - config: {}, - }); + JSON.stringify( + { + [sessionKey]: { + ...sessionEntry, + updatedAt: 5, + status: "done", + endedAt: 4, + }, + }, + null, + 2, + ), + "utf-8", + ); - const sessionFile = updatedEntry?.sessionFile; - if (!sessionFile) { + const nowCalls: number[] = []; + let nextNow = 10_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => { + nextNow += 1_000; + nowCalls.push(nextNow); + return nextNow; + }); + let updatedEntry: SessionEntry | undefined; + try { + updatedEntry = await persistCliTurnTranscript({ + body: "persist this", + result: makeCliResult("hello from cli"), + sessionId: sessionEntry.sessionId, + sessionKey, + sessionEntry, + sessionStore, + storePath, + sessionAgentId: "main", + sessionCwd: tmpDir, + config: {}, + }); + } finally { + nowSpy.mockRestore(); + } + + const updatedSessionFile = updatedEntry?.sessionFile; + if (!updatedSessionFile) { throw new Error("expected CLI transcript persistence to create a session file"); } + expect(updatedSessionFile).toBe(sessionFile); const entries = await readSessionFileEntries(sessionFile); expectRecordFields(requireRecord(entries[0], "session entry"), { type: "session", @@ -744,6 +776,18 @@ describe("CLI attempt execution", () => { model: "opus", content: [{ type: "text", text: "hello from cli" }], }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + SessionEntry + >; + expect(persisted[sessionKey]?.sessionFile).toBe(sessionFile); + expect(persisted[sessionKey]?.updatedAt).toBeGreaterThan(sessionEntry.updatedAt); + expect(persisted[sessionKey]?.updatedAt).toBeLessThan(nowCalls.at(-1) ?? 0); + expect(persisted[sessionKey]?.status).toBe("done"); + expect(persisted[sessionKey]?.endedAt).toBe(4); + expect(persisted[sessionKey]?.startedAt).toBe(2); + expect(sessionStore[sessionKey]?.updatedAt).toBe(persisted[sessionKey]?.updatedAt); }); it("embedded assistant gap-fill skips user mirror and dedupes identical assistant tails", async () => { diff --git a/src/agents/command/attempt-execution.shared.ts b/src/agents/command/attempt-execution.shared.ts index f77f41f168ad..3d7f808c8937 100644 --- a/src/agents/command/attempt-execution.shared.ts +++ b/src/agents/command/attempt-execution.shared.ts @@ -21,10 +21,16 @@ export type PersistSessionEntryParams = { storePath: string; entry: SessionEntry; clearedFields?: string[]; + preserveTranscriptMarkerUpdatedAt?: boolean; shouldPersist?: (entry: SessionEntry | undefined) => boolean; }; /** Persists one session entry while keeping the caller's in-memory store aligned. */ + +function normalizeTranscriptMarkerUpdatedAt(value: number | undefined): number | undefined { + return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined; +} + export async function persistSessionEntry( params: PersistSessionEntryParams, ): Promise { @@ -36,6 +42,13 @@ export async function persistSessionEntry( return current; } const merged = mergeSessionEntry(store[params.sessionKey], params.entry); + if (params.preserveTranscriptMarkerUpdatedAt) { + const currentUpdatedAt = normalizeTranscriptMarkerUpdatedAt(current?.updatedAt); + const markerUpdatedAt = normalizeTranscriptMarkerUpdatedAt(params.entry.updatedAt); + if (markerUpdatedAt !== undefined) { + merged.updatedAt = Math.max(currentUpdatedAt ?? 0, markerUpdatedAt); + } + } for (const field of params.clearedFields ?? []) { // Cleared fields only apply when the replacement entry did not set the // field again; this preserves explicit false/null updates. diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 5b560aad01bf..412a8238d5a9 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -244,7 +244,9 @@ async function persistTextTurnTranscript( ...resolveSessionWriteLockOptions(params.config), allowReentrant: true, }); + let transcriptMarkerUpdatedAt: number | undefined; try { + let wroteTranscript = false; const userMessage = params.userMessage; if (userMessage || promptText) { await appendUserTurnTranscriptMessage({ @@ -264,6 +266,7 @@ async function persistTextTurnTranscript( }), updateMode: "none", }); + wroteTranscript = true; } if (replyText) { @@ -293,18 +296,44 @@ async function persistTextTurnTranscript( timestamp: Date.now(), }, }); + wroteTranscript = true; } } + if (wroteTranscript) { + transcriptMarkerUpdatedAt = Date.now(); + } } finally { await lock.release(); } + let updatedSessionEntry = sessionEntry; + if (params.sessionStore && params.storePath && transcriptMarkerUpdatedAt !== undefined) { + const currentEntry = params.sessionStore[params.sessionKey] ?? sessionEntry; + if (currentEntry?.sessionId === params.sessionId) { + // Keep updatedAt as the registry marker for transcript writes we own. + // Session reuse checks compare transcript mtime against this marker, not endedAt. + updatedSessionEntry = + (await persistSessionEntry({ + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + entry: { + sessionId: params.sessionId, + sessionFile, + updatedAt: transcriptMarkerUpdatedAt, + }, + preserveTranscriptMarkerUpdatedAt: true, + shouldPersist: (current) => current?.sessionId === params.sessionId, + })) ?? updatedSessionEntry; + } + } + emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, agentId: params.sessionAgentId, }); - return sessionEntry; + return updatedSessionEntry; } function resolveCliTranscriptReplyText(result: EmbeddedAgentRunResult): string { diff --git a/src/agents/command/session.ts b/src/agents/command/session.ts index 46bbb3d2fe03..a2d41900b64f 100644 --- a/src/agents/command/session.ts +++ b/src/agents/command/session.ts @@ -9,7 +9,10 @@ import { type ThinkLevel, type VerboseLevel, } from "../../auto-reply/thinking.js"; -import { resolveSessionLifecycleTimestamps } from "../../config/sessions/lifecycle.js"; +import { + hasTerminalMainSessionTranscriptNewerThanRegistrySync, + resolveSessionLifecycleTimestamps, +} from "../../config/sessions/lifecycle.js"; import { resolveAgentIdFromSessionKey, resolveExplicitAgentSessionKey, @@ -53,6 +56,23 @@ type SessionKeyResolution = { storePath: string; }; +function clearRotatedTerminalMainSessionMetadata( + entry: SessionEntry | undefined, +): SessionEntry | undefined { + if (!entry) { + return undefined; + } + return { + ...entry, + sessionFile: undefined, + status: undefined, + startedAt: undefined, + endedAt: undefined, + runtimeMs: undefined, + abortedLastRun: undefined, + }; +} + type SessionIdMatchSet = { matches: Array<[string, SessionEntry]>; primaryStoreMatches: Array<[string, SessionEntry]>; @@ -328,6 +348,9 @@ export function resolveSession(opts: { const now = Date.now(); const sessionEntry = sessionKey ? sessionStore[sessionKey] : undefined; + const sessionAgentId = opts.agentId?.trim() + ? normalizeAgentId(opts.agentId) + : resolveAgentIdFromSessionKey(sessionKey); const resetType = resolveSessionResetType({ sessionKey }); const channelReset = resolveChannelResetConfig({ @@ -339,12 +362,23 @@ export function resolveSession(opts: { resetType, resetOverride: channelReset, }); + const terminalMainTranscriptNewerThanRegistry = sessionEntry + ? hasTerminalMainSessionTranscriptNewerThanRegistrySync({ + entry: sessionEntry, + sessionScope: sessionCfg?.scope, + sessionKey, + agentId: sessionAgentId, + mainKey: sessionCfg?.mainKey, + storePath, + }) + : false; const fresh = sessionEntry - ? evaluateSessionFreshness({ + ? !terminalMainTranscriptNewerThanRegistry && + evaluateSessionFreshness({ updatedAt: sessionEntry.updatedAt, ...resolveSessionLifecycleTimestamps({ entry: sessionEntry, - agentId: opts.agentId, + agentId: sessionAgentId, storePath, }), now, @@ -354,6 +388,9 @@ export function resolveSession(opts: { const sessionId = opts.sessionId?.trim() || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID(); const isNewSession = !fresh && !opts.sessionId; + const resolvedSessionEntry = terminalMainTranscriptNewerThanRegistry + ? clearRotatedTerminalMainSessionMetadata(sessionEntry) + : sessionEntry; clearBootstrapSnapshotOnSessionRollover({ sessionKey, @@ -372,7 +409,7 @@ export function resolveSession(opts: { return { sessionId, sessionKey, - sessionEntry, + sessionEntry: resolvedSessionEntry, sessionStore, storePath, isNewSession, diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index f06c4b452b45..b321b80c45ba 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -181,6 +181,36 @@ async function writeSessionStoreFast( await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); } +async function writeTerminalTranscriptSessionStore(params: { + storePath: string; + sessionKey: string; + sessionId: string; + status?: SessionEntry["status"]; + updatedAt: number; + endedAt: number; + transcriptMtimeMs: number; +}): Promise { + const sessionFile = `${params.sessionId}.jsonl`; + const transcriptPath = path.join(path.dirname(params.storePath), sessionFile); + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "session", id: params.sessionId })}\n`, + "utf-8", + ); + await fs.utimes(transcriptPath, params.transcriptMtimeMs / 1000, params.transcriptMtimeMs / 1000); + await writeSessionStoreFast(params.storePath, { + [params.sessionKey]: { + sessionId: params.sessionId, + sessionFile, + updatedAt: params.updatedAt, + startedAt: params.endedAt - 10_000, + endedAt: params.endedAt, + runtimeMs: 9_000, + status: params.status ?? "done", + }, + }); +} + function setMinimalCurrentConversationBindingRegistryForTests(): void { setActivePluginRegistry( createTestRegistry([ @@ -1594,6 +1624,101 @@ describe("initSessionState reset policy", () => { expect(persisted[sessionKey]?.runtimeMs).toBe(9_000); }); + it.each([ + { + name: "non-main terminal rows ignore transcript mtime", + sessionKey: "agent:main:whatsapp:dm:terminal-entry", + updatedAtOffsetMs: -5_000, + endedAtOffsetMs: -6_000, + transcriptMtimeOffsetMs: -3_000, + expectNewSession: false, + }, + { + name: "main terminal rows rotate when transcript is newer than updatedAt", + sessionKey: "agent:main:main", + updatedAtOffsetMs: -10_000, + endedAtOffsetMs: -11_000, + transcriptMtimeOffsetMs: 0, + expectNewSession: true, + }, + { + name: "failed main terminal rows reuse when the transcript exists", + sessionKey: "agent:main:main", + status: "failed" as const, + updatedAtOffsetMs: -10_000, + endedAtOffsetMs: -11_000, + transcriptMtimeOffsetMs: 0, + expectNewSession: false, + }, + { + name: "main terminal rows reuse when updatedAt already reflects the transcript", + sessionKey: "agent:main:main", + updatedAtOffsetMs: -1_000, + endedAtOffsetMs: -6_000, + transcriptMtimeOffsetMs: -4_000, + expectNewSession: false, + }, + { + name: "main terminal rows reuse when transcript mtime differs only by sub-millisecond precision", + sessionKey: "agent:main:main", + updatedAtOffsetMs: -4_000, + endedAtOffsetMs: -6_000, + transcriptMtimeOffsetMs: -3_999.5, + expectNewSession: false, + }, + { + name: "main terminal rows reuse when transcript is not newer than updatedAt", + sessionKey: "agent:main:main", + updatedAtOffsetMs: -10_000, + endedAtOffsetMs: -11_000, + transcriptMtimeOffsetMs: -15_000, + expectNewSession: false, + }, + ])("$name", async (scenario) => { + vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0)); + const root = await makeCaseDir("openclaw-reset-terminal-entry-"); + const storePath = path.join(root, "sessions.json"); + const existingSessionId = "terminal-entry-old"; + const now = Date.now(); + const terminalUpdatedAt = now + scenario.updatedAtOffsetMs; + const terminalEndedAt = now + scenario.endedAtOffsetMs; + await writeTerminalTranscriptSessionStore({ + storePath, + sessionKey: scenario.sessionKey, + sessionId: existingSessionId, + status: scenario.status, + updatedAt: terminalUpdatedAt, + endedAt: terminalEndedAt, + transcriptMtimeMs: now + scenario.transcriptMtimeOffsetMs, + }); + + const cfg = { session: { store: storePath } } as OpenClawConfig; + const result = await initSessionState({ + ctx: { Body: "hello", SessionKey: scenario.sessionKey }, + cfg, + commandAuthorized: true, + }); + + expect(result.isNewSession).toBe(scenario.expectNewSession); + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + SessionEntry + >; + const entry = persisted[scenario.sessionKey]; + if (scenario.expectNewSession) { + expect(result.sessionId).not.toBe(existingSessionId); + expect(entry?.sessionId).not.toBe(existingSessionId); + expect(entry?.status).toBeUndefined(); + expect(entry?.startedAt).toBeUndefined(); + expect(entry?.endedAt).toBeUndefined(); + expect(entry?.runtimeMs).toBeUndefined(); + } else { + expect(result.sessionId).toBe(existingSessionId); + expect(entry?.status).toBe(scenario.status ?? "done"); + expect(entry?.endedAt).toBe(terminalEndedAt); + } + }); + it("keeps the existing stale session for /reset soft", async () => { vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0)); const root = await makeCaseDir("openclaw-reset-soft-stale-"); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index af7ba1d6e39c..b36e7a17dd0f 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -14,7 +14,10 @@ import { resetRegisteredAgentHarnessSessions } from "../../agents/harness/regist import { cleanupBrowserSessionsForLifecycleEnd } from "../../browser-lifecycle-cleanup.js"; import { normalizeChatType } from "../../channels/chat-type.js"; import { resolveGroupSessionKey } from "../../config/sessions/group.js"; -import { resolveSessionLifecycleTimestamps } from "../../config/sessions/lifecycle.js"; +import { + hasTerminalMainSessionTranscriptNewerThanRegistry, + resolveSessionLifecycleTimestamps, +} from "../../config/sessions/lifecycle.js"; import { canonicalizeMainSessionAlias } from "../../config/sessions/main-session.js"; import { deriveSessionMetaPatch } from "../../config/sessions/metadata.js"; import { resolveSessionTranscriptPath, resolveStorePath } from "../../config/sessions/paths.js"; @@ -467,10 +470,20 @@ export async function initSessionState(params: { skipConfiguredFallbackWhenActiveSessionNonAcp: false, }) ?? "", ); + const terminalMainTranscriptNewerThanRegistry = + !isSystemEvent && + (await hasTerminalMainSessionTranscriptNewerThanRegistry({ + entry, + sessionScope, + sessionKey, + agentId, + mainKey, + storePath, + })); const freshEntry = (isSystemEvent && canReuseExistingEntry) || - (entryFreshness?.fresh ?? false) || - (softResetAllowed && canReuseExistingEntry); + (((entryFreshness?.fresh ?? false) || (softResetAllowed && canReuseExistingEntry)) && + !terminalMainTranscriptNewerThanRegistry); // Capture the current session entry before any reset so its transcript can be // archived afterward. We need to do this for both explicit resets (/new, /reset) // and for scheduled/daily resets where the session has become stale (!freshEntry). diff --git a/src/commands/agent.session.test.ts b/src/commands/agent.session.test.ts index 2e459e504b58..2c6ac9c12318 100644 --- a/src/commands/agent.session.test.ts +++ b/src/commands/agent.session.test.ts @@ -4,8 +4,11 @@ import path from "node:path"; import { withTempHome as withTempHomeBase } from "openclaw/plugin-sdk/test-env"; import { beforeEach, describe, expect, it } from "vitest"; import { resolveAgentDir, resolveSessionAgentId } from "../agents/agent-scope.js"; +import { updateSessionStoreAfterAgentRun } from "../agents/command/session-store.js"; import { resolveSession } from "../agents/command/session.js"; +import { loadSessionStore } from "../config/sessions/store-load.js"; import { clearSessionStoreCacheForTest } from "../config/sessions/store.js"; +import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; @@ -146,6 +149,91 @@ describe("agent session resolution", () => { }); }); + it("rotates stale terminal main sessions whose transcript is newer than the registry", async () => { + const scenarios = [ + { label: "canonical main", mainKey: "main", sessionKey: "agent:main:main" }, + { label: "raw main alias", mainKey: "main", sessionKey: "main" }, + { label: "custom main alias", mainKey: "work", sessionKey: "agent:main:main" }, + ]; + for (const scenario of scenarios) { + await withTempHome(async (home) => { + const store = path.join(home, "sessions.json"); + const sessionFile = path.join(home, `session-${scenario.label.replaceAll(" ", "-")}.jsonl`); + const sessionId = `stale-terminal-${scenario.label.replaceAll(" ", "-")}`; + const registryUpdatedAt = Date.now() - 10_000; + fs.mkdirSync(path.dirname(sessionFile), { recursive: true }); + fs.writeFileSync(sessionFile, JSON.stringify({ type: "session", id: sessionId }) + "\n"); + fs.utimesSync( + sessionFile, + (registryUpdatedAt + 5_000) / 1000, + (registryUpdatedAt + 5_000) / 1000, + ); + writeSessionStoreSeed(store, { + [scenario.sessionKey]: { + sessionId, + sessionFile, + updatedAt: registryUpdatedAt, + status: "done", + startedAt: registryUpdatedAt - 1_000, + endedAt: registryUpdatedAt - 100, + }, + }); + const cfg = mockConfig(home, store); + cfg.session = { ...cfg.session, mainKey: scenario.mainKey }; + + const resolution = resolveSession({ cfg, sessionKey: scenario.sessionKey }); + + expect(resolution.isNewSession).toBe(true); + expect(resolution.sessionId).not.toBe(sessionId); + expect(resolution.sessionEntry?.sessionFile).toBeUndefined(); + expect(resolution.sessionEntry?.status).toBeUndefined(); + expect(resolution.sessionEntry?.startedAt).toBeUndefined(); + expect(resolution.sessionEntry?.endedAt).toBeUndefined(); + expect(resolution.sessionEntry?.runtimeMs).toBeUndefined(); + + const sessionStore = { + [scenario.sessionKey]: resolution.sessionEntry!, + }; + await resolveSessionTranscriptFile({ + sessionId: resolution.sessionId, + sessionKey: scenario.sessionKey, + sessionEntry: resolution.sessionEntry, + sessionStore, + storePath: resolution.storePath, + agentId: "main", + }); + await updateSessionStoreAfterAgentRun({ + cfg, + sessionId: resolution.sessionId, + sessionKey: scenario.sessionKey, + storePath: resolution.storePath, + sessionStore, + defaultProvider: "openai", + defaultModel: "gpt-5.5", + result: { + payloads: [], + meta: { + aborted: false, + agentMeta: { + provider: "openai", + model: "gpt-5.5", + }, + }, + } as never, + }); + const persisted = loadSessionStore(resolution.storePath, { skipCache: true })[ + scenario.sessionKey + ]; + expect(persisted?.sessionId).toBe(resolution.sessionId); + expect(persisted?.sessionFile).not.toBe(sessionFile); + expect(persisted?.status).toBeUndefined(); + expect(persisted?.startedAt).toBeUndefined(); + expect(persisted?.endedAt).toBeUndefined(); + expect(persisted?.runtimeMs).toBeUndefined(); + }); + } + }); + it("forwards resolved outbound session context when resuming by sessionId", async () => { await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => { const resolution = resolveSession({ cfg, sessionId }); diff --git a/src/config/sessions/lifecycle.ts b/src/config/sessions/lifecycle.ts index d528d759471c..c7ada51fed0f 100644 --- a/src/config/sessions/lifecycle.ts +++ b/src/config/sessions/lifecycle.ts @@ -1,12 +1,14 @@ // Session lifecycle timestamps prefer store metadata and fall back to transcript headers. import fs from "node:fs"; +import fsp from "node:fs/promises"; import { asDateTimestampMs } from "../../shared/number-coercion.js"; +import { canonicalizeMainSessionAlias } from "./main-session.js"; import { resolveSessionFilePath, resolveSessionFilePathOptions, type SessionFilePathOptions, } from "./paths.js"; -import type { SessionEntry } from "./types.js"; +import { isTerminalSessionStatus, type SessionEntry, type SessionScope } from "./types.js"; type SessionLifecycleEntry = Pick< SessionEntry, @@ -14,11 +16,31 @@ type SessionLifecycleEntry = Pick< >; // Transcript headers are read lazily to recover startedAt without parsing full files. + +type TerminalMainSessionTranscriptRegistryParams = { + entry: SessionEntry | undefined; + sessionScope?: SessionScope; + sessionKey?: string; + agentId: string; + mainKey?: string; + storePath?: string; +}; + +type TerminalMainSessionTranscriptRegistryCheck = { + sessionId: string; + registryTimestampMs: number; +}; + function resolveTimestamp(value: number | undefined): number | undefined { const timestampMs = asDateTimestampMs(value); return timestampMs !== undefined && timestampMs >= 0 ? timestampMs : undefined; } +function resolvePositiveTimestamp(value: number | undefined): number | undefined { + const timestampMs = resolveTimestamp(value); + return timestampMs !== undefined && timestampMs > 0 ? timestampMs : undefined; +} + function parseTimestampMs(value: unknown): number | undefined { if (typeof value === "number") { return resolveTimestamp(value); @@ -117,3 +139,99 @@ export function resolveSessionLifecycleTimestamps(params: { lastInteractionAt: resolveTimestamp(entry.lastInteractionAt), }; } + +export function resolveTerminalMainSessionTranscriptRegistryCheck( + params: TerminalMainSessionTranscriptRegistryParams, +): TerminalMainSessionTranscriptRegistryCheck | undefined { + if (!params.entry || !params.sessionKey) { + return undefined; + } + const configuredMainSessionKey = canonicalizeMainSessionAlias({ + cfg: { session: { scope: params.sessionScope, mainKey: params.mainKey } }, + agentId: params.agentId, + sessionKey: params.mainKey ?? "main", + }); + const candidateSessionKey = canonicalizeMainSessionAlias({ + cfg: { session: { scope: params.sessionScope, mainKey: params.mainKey } }, + agentId: params.agentId, + sessionKey: params.sessionKey, + }); + if (candidateSessionKey !== configuredMainSessionKey) { + return undefined; + } + if (!isTerminalSessionStatus(params.entry.status)) { + return undefined; + } + if (params.entry.status === "failed") { + // Failed rows with a present transcript stay reusable for retry/recovery. + // Callers already rotate failed rows when the transcript is missing. + return undefined; + } + // updatedAt is touched after managed transcript appends; endedAt can predate + // healthy post-run transcript writes and would rotate valid sessions. + const registryTimestampMs = resolvePositiveTimestamp(params.entry.updatedAt); + if (registryTimestampMs === undefined) { + return undefined; + } + const sessionId = typeof params.entry.sessionId === "string" ? params.entry.sessionId.trim() : ""; + if (!sessionId) { + return undefined; + } + return { sessionId, registryTimestampMs }; +} + +function isTranscriptMtimeNewerThanRegistry(params: { + transcriptMtimeMs: number; + registryTimestampMs: number; +}): boolean { + const transcriptMtimeMs = Math.floor(params.transcriptMtimeMs); + const registryTimestampMs = Math.floor(params.registryTimestampMs); + return Number.isFinite(transcriptMtimeMs) && transcriptMtimeMs > registryTimestampMs; +} + +export function hasTerminalMainSessionTranscriptNewerThanRegistrySync( + params: TerminalMainSessionTranscriptRegistryParams, +): boolean { + const check = resolveTerminalMainSessionTranscriptRegistryCheck(params); + if (!check) { + return false; + } + const pathOptions = resolveSessionFilePathOptions({ + agentId: params.agentId, + storePath: params.storePath, + }); + try { + const sessionFile = resolveSessionFilePath(check.sessionId, params.entry, pathOptions); + const stats = fs.statSync(sessionFile); + return isTranscriptMtimeNewerThanRegistry({ + transcriptMtimeMs: stats.mtimeMs, + registryTimestampMs: check.registryTimestampMs, + }); + } catch { + return false; + } +} + +export async function hasTerminalMainSessionTranscriptNewerThanRegistry( + params: TerminalMainSessionTranscriptRegistryParams, +): Promise { + const check = resolveTerminalMainSessionTranscriptRegistryCheck(params); + if (!check) { + return false; + } + const pathOptions = resolveSessionFilePathOptions({ + agentId: params.agentId, + storePath: params.storePath, + }); + try { + // Session admission owns this bounded stat as the terminal-main reconciliation gate. + const sessionFile = resolveSessionFilePath(check.sessionId, params.entry, pathOptions); + const stats = await fsp.stat(sessionFile); + return isTranscriptMtimeNewerThanRegistry({ + transcriptMtimeMs: stats.mtimeMs, + registryTimestampMs: check.registryTimestampMs, + }); + } catch { + return false; + } +} diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index a018fd4e7a04..119975d51822 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -723,6 +723,128 @@ describe("gateway agent handler", () => { expect(capturedEntry?.sessionFile).toBeUndefined(); }); + it("rotates a terminal main session when its transcript is newer than the registry row", async () => { + const now = Date.parse("2026-05-18T09:47:00.000Z"); + vi.useFakeTimers({ toFake: ["Date"] }); + dateOnlyFakeClockActive = true; + vi.setSystemTime(now); + + await withTempDir( + { prefix: "openclaw-gateway-terminal-main-newer-transcript-" }, + async (root) => { + const sessionsDir = `${root}/sessions`; + await fs.mkdir(sessionsDir, { recursive: true }); + const sessionFile = "terminal-main-session.jsonl"; + const transcriptPath = `${sessionsDir}/${sessionFile}`; + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`, + "utf8", + ); + await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000)); + mocks.loadSessionEntry.mockReturnValue({ + cfg: {}, + storePath: `${sessionsDir}/sessions.json`, + entry: { + sessionId: "terminal-main-session", + sessionFile, + status: "done", + updatedAt: now - 10_000, + sessionStartedAt: now - 60_000, + lastInteractionAt: now - 10_000, + startedAt: now - 20_000, + endedAt: now - 15_000, + runtimeMs: 5_000, + }, + canonicalKey: "agent:main:main", + }); + + const capturedEntry = await runMainAgentAndCaptureEntry( + "test-idem-terminal-main-newer-transcript", + ); + + const call = await waitForAgentCommandCall<{ sessionId?: string }>(); + expect(call.sessionId).not.toBe("terminal-main-session"); + expect(capturedEntry?.sessionId).not.toBe("terminal-main-session"); + expect(capturedEntry?.status).toBeUndefined(); + expect(capturedEntry?.startedAt).toBeUndefined(); + expect(capturedEntry?.endedAt).toBeUndefined(); + expect(capturedEntry?.runtimeMs).toBeUndefined(); + expect(capturedEntry?.sessionFile).toBeUndefined(); + }, + ); + }); + + it.each(["heartbeat", "cron"] as const)( + "preserves terminal main session reuse for %s gateway runs", + async (runKind) => { + const now = Date.parse("2026-05-18T09:49:00.000Z"); + vi.useFakeTimers({ toFake: ["Date"] }); + dateOnlyFakeClockActive = true; + vi.setSystemTime(now); + + await withTempDir( + { prefix: `openclaw-gateway-terminal-main-${runKind}-reuse-` }, + async (root) => { + const sessionsDir = `${root}/sessions`; + await fs.mkdir(sessionsDir, { recursive: true }); + const sessionFile = `terminal-main-${runKind}.jsonl`; + const transcriptPath = `${sessionsDir}/${sessionFile}`; + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`, + "utf8", + ); + await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000)); + const existingEntry = { + sessionId: "terminal-main-session", + sessionFile, + status: "done", + updatedAt: now - 10_000, + sessionStartedAt: now - 60_000, + lastInteractionAt: now - 10_000, + startedAt: now - 20_000, + endedAt: now - 15_000, + runtimeMs: 5_000, + }; + mocks.loadSessionEntry.mockReturnValue({ + cfg: {}, + storePath: `${sessionsDir}/sessions.json`, + entry: existingEntry, + canonicalKey: "agent:main:main", + }); + + let capturedEntry: Record | undefined; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + const store: Record = { + "agent:main:main": { ...existingEntry }, + }; + const result = await updater(store); + capturedEntry = result as Record; + return result; + }); + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); + + await invokeAgent({ + message: `${runKind} probe`, + agentId: "main", + sessionKey: "agent:main:main", + bootstrapContextRunKind: runKind, + idempotencyKey: `test-idem-terminal-main-${runKind}-reuse`, + } as AgentParams); + + const call = await waitForAgentCommandCall<{ sessionId?: string }>(); + expect(call.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionFile).toBe(sessionFile); + }, + ); + }, + ); + it("rotates a failed session when its default transcript is missing", async () => { const now = Date.parse("2026-05-18T09:48:00.000Z"); vi.useFakeTimers({ toFake: ["Date"] }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 82c7f2df132b..cb8eb1ac42e1 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -51,7 +51,9 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { agentCommandFromIngress } from "../../commands/agent.js"; import { evaluateSessionFreshness, + hasTerminalMainSessionTranscriptNewerThanRegistry, mergeSessionEntry, + resolveTerminalMainSessionTranscriptRegistryCheck, resolveChannelResetConfig, resolveAgentIdFromSessionKey, resolveExplicitAgentSessionKey, @@ -1656,10 +1658,38 @@ export const agentHandlers: GatewayRequestHandlers = { failedSessionTranscriptMissing = true; } } + const mainSessionKeyForRequest = resolveAgentMainSessionKey({ + cfg: cfgLocal, + agentId: canonicalSessionAgentId, + }); + const isSystemGatewayRun = + request.bootstrapContextRunKind === "cron" || + request.bootstrapContextRunKind === "heartbeat"; + const terminalMainTranscriptCheck = isSystemGatewayRun + ? undefined + : resolveTerminalMainSessionTranscriptRegistryCheck({ + entry, + sessionScope: cfgLocal.session?.scope, + sessionKey: canonicalKey, + agentId: canonicalSessionAgentId, + mainKey: cfgLocal.session?.mainKey, + storePath, + }); + const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck + ? await hasTerminalMainSessionTranscriptNewerThanRegistry({ + entry, + sessionScope: cfgLocal.session?.scope, + sessionKey: canonicalKey, + agentId: canonicalSessionAgentId, + mainKey: cfgLocal.session?.mainKey, + storePath, + }) + : false; const canReuseSession = Boolean(entry?.sessionId) && (freshness?.fresh ?? false) && - !failedSessionTranscriptMissing; + !failedSessionTranscriptMissing && + !terminalMainTranscriptNewerThanRegistry; const usableRequestedSessionId = requestedSessionId && (!entry?.sessionId || canReuseSession) ? requestedSessionId @@ -1672,10 +1702,7 @@ export const agentHandlers: GatewayRequestHandlers = { (!canReuseSession && !usableRequestedSessionId) || Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId); const rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId); - const touchInteraction = - request.bootstrapContextRunKind !== "cron" && - request.bootstrapContextRunKind !== "heartbeat" && - !request.internalEvents?.length; + const touchInteraction = !isSystemGatewayRun && !request.internalEvents?.length; const sessionAgent = canonicalSessionAgentId; type AgentSessionPatchBuild = { patch: Partial; @@ -1831,10 +1858,7 @@ export const agentHandlers: GatewayRequestHandlers = { resolvedSessionKey = canonicalSessionKey; const sessionAgentId = canonicalSessionAgentId; resolvedSessionAgentId = sessionAgentId; - const mainSessionKey = resolveAgentMainSessionKey({ - cfg: cfgLocal, - agentId: sessionAgentId, - }); + const mainSessionKey = mainSessionKeyForRequest; // Legacy stores may lack sessionStartedAt entirely. Pre-compute a // JSONL-transcript-derived candidate outside the store lock; the // updater below only writes it when the freshly-loaded store still