From ceee4c6b019c343cba004fa19bb3078a6539e873 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Fri, 5 Jun 2026 17:03:11 +0530 Subject: [PATCH] fix(sessions): mark transcript rewrites in registry --- src/agents/cli-session.ts | 7 +- src/agents/command/session.ts | 5 +- src/commands/agent.session.test.ts | 15 +++ src/config/sessions/transcript.ts | 2 +- src/gateway/server-methods/agent.test.ts | 78 +++++++++++++ src/gateway/server-methods/agent.ts | 110 +++++++++++++++--- .../chat.directive-tags.test.ts | 82 ++++++++----- src/gateway/server-methods/chat.ts | 31 ++++- 8 files changed, 281 insertions(+), 49 deletions(-) diff --git a/src/agents/cli-session.ts b/src/agents/cli-session.ts index eb7dd54755ab..bd875afeefd3 100644 --- a/src/agents/cli-session.ts +++ b/src/agents/cli-session.ts @@ -138,8 +138,13 @@ export function clearCliSession(entry: SessionEntry, provider: string): void { } } +type MutableCliSessionFields = Pick< + SessionEntry, + "cliSessionBindings" | "cliSessionIds" | "claudeCliSessionId" +>; + /** Remove every CLI session binding from a session entry. */ -export function clearAllCliSessions(entry: SessionEntry): void { +export function clearAllCliSessions(entry: Partial): void { entry.cliSessionBindings = undefined; entry.cliSessionIds = undefined; entry.claudeCliSessionId = undefined; diff --git a/src/agents/command/session.ts b/src/agents/command/session.ts index d15bc4f42897..cbcb05de496d 100644 --- a/src/agents/command/session.ts +++ b/src/agents/command/session.ts @@ -37,6 +37,7 @@ import { import { resolveSessionIdMatchSelection } from "../../sessions/session-id-resolution.js"; import { listAgentIds, resolveDefaultAgentId } from "../agent-scope.js"; import { clearBootstrapSnapshotOnSessionRollover } from "../bootstrap-cache.js"; +import { clearAllCliSessions } from "../cli-session.js"; /** Resolved command session identity plus backing store metadata. */ export type SessionResolution = { @@ -62,7 +63,7 @@ function clearRotatedTerminalMainSessionMetadata( if (!entry) { return undefined; } - return { + const next = { ...entry, sessionFile: undefined, status: undefined, @@ -73,6 +74,8 @@ function clearRotatedTerminalMainSessionMetadata( sessionStartedAt: undefined, lastInteractionAt: undefined, }; + clearAllCliSessions(next); + return next; } type SessionIdMatchSet = { diff --git a/src/commands/agent.session.test.ts b/src/commands/agent.session.test.ts index dcf4b049682c..facb8bf70d92 100644 --- a/src/commands/agent.session.test.ts +++ b/src/commands/agent.session.test.ts @@ -189,6 +189,15 @@ describe("agent session resolution", () => { lastInteractionAt: registryUpdatedAt - 30_000, startedAt: registryUpdatedAt - 1_000, endedAt: registryUpdatedAt - 100, + cliSessionBindings: { + "claude-cli": { sessionId: "old-claude-cli-session" }, + "codex-cli": { sessionId: "old-codex-cli-session" }, + }, + cliSessionIds: { + "claude-cli": "old-claude-cli-session", + "codex-cli": "old-codex-cli-session", + }, + claudeCliSessionId: "old-claude-cli-session", }, }); const cfg = mockConfig(home, store); @@ -205,6 +214,9 @@ describe("agent session resolution", () => { expect(resolution.sessionEntry?.runtimeMs).toBeUndefined(); expect(resolution.sessionEntry?.sessionStartedAt).toBeUndefined(); expect(resolution.sessionEntry?.lastInteractionAt).toBeUndefined(); + expect(resolution.sessionEntry?.cliSessionBindings).toBeUndefined(); + expect(resolution.sessionEntry?.cliSessionIds).toBeUndefined(); + expect(resolution.sessionEntry?.claudeCliSessionId).toBeUndefined(); const sessionStore = { [scenario.sessionKey]: resolution.sessionEntry!, @@ -245,6 +257,9 @@ describe("agent session resolution", () => { expect(persisted?.startedAt).toBeUndefined(); expect(persisted?.endedAt).toBeUndefined(); expect(persisted?.runtimeMs).toBeUndefined(); + expect(persisted?.cliSessionBindings).toBeUndefined(); + expect(persisted?.cliSessionIds).toBeUndefined(); + expect(persisted?.claudeCliSessionId).toBeUndefined(); expect(persisted?.sessionStartedAt).toBeGreaterThan(registryUpdatedAt); expect(persisted?.lastInteractionAt).toBeGreaterThan(registryUpdatedAt); }); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 87b6e2bded63..a3d4a0037590 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -302,7 +302,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { let transcriptMarkerUpdatedAt: number | undefined; const result = await runWithOwnedSessionTranscriptWriteLock( { sessionFile, sessionKey: resolved.normalizedKey }, - async () => { + async (): Promise => { const explicitIdempotencyKey = params.idempotencyKey ?? ((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index b03d686424ef..333564cee131 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -760,6 +760,15 @@ describe("gateway agent handler", () => { startedAt: now - 20_000, endedAt: now - 15_000, runtimeMs: 5_000, + cliSessionBindings: { + "claude-cli": { sessionId: "old-claude-cli-session" }, + "codex-cli": { sessionId: "old-codex-cli-session" }, + }, + cliSessionIds: { + "claude-cli": "old-claude-cli-session", + "codex-cli": "old-codex-cli-session", + }, + claudeCliSessionId: "old-claude-cli-session", }, canonicalKey: "agent:main:main", }); @@ -776,11 +785,80 @@ describe("gateway agent handler", () => { expect(capturedEntry?.endedAt).toBeUndefined(); expect(capturedEntry?.runtimeMs).toBeUndefined(); expect(capturedEntry?.sessionFile).toBeUndefined(); + expect(capturedEntry?.cliSessionBindings).toBeUndefined(); + expect(capturedEntry?.cliSessionIds).toBeUndefined(); + expect(capturedEntry?.claudeCliSessionId).toBeUndefined(); }, ); }, ); + it("reuses terminal main sessions when the fresh store row has the transcript marker", async () => { + const now = Date.parse("2026-05-18T09:47:30.000Z"); + vi.useFakeTimers({ toFake: ["Date"] }); + dateOnlyFakeClockActive = true; + vi.setSystemTime(now); + + await withTempDir({ prefix: "openclaw-gateway-terminal-main-fresh-marker-" }, async (root) => { + const sessionsDir = `${root}/sessions`; + await fs.mkdir(sessionsDir, { recursive: true }); + const sessionFile = "terminal-main-session.jsonl"; + const transcriptPath = `${sessionsDir}/${sessionFile}`; + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`, + "utf8", + ); + await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000)); + const staleEntry = { + sessionId: "terminal-main-session", + sessionFile, + status: "done", + updatedAt: now - 10_000, + cliSessionBindings: { + "claude-cli": { sessionId: "existing-claude-cli-session" }, + }, + cliSessionIds: { + "claude-cli": "existing-claude-cli-session", + }, + claudeCliSessionId: "existing-claude-cli-session", + }; + mocks.loadSessionEntry.mockReturnValue({ + cfg: {}, + storePath: `${sessionsDir}/sessions.json`, + entry: staleEntry, + canonicalKey: "agent:main:main", + }); + let capturedEntry: Record | undefined; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + const store = { + "agent:main:main": { + ...staleEntry, + updatedAt: now, + }, + }; + const result = await updater(store); + capturedEntry = result as Record; + return result; + }); + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); + + await runMainAgent("hi", "test-idem-terminal-main-fresh-marker"); + + const call = await waitForAgentCommandCall<{ sessionId?: string }>(); + expect(call.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionFile).toBe(sessionFile); + expect(capturedEntry?.cliSessionIds).toEqual({ + "claude-cli": "existing-claude-cli-session", + }); + expect(capturedEntry?.claudeCliSessionId).toBe("existing-claude-cli-session"); + }); + }); + it("honors explicit gateway session-id resumes for terminal main rows", async () => { const now = Date.parse("2026-05-18T09:48:00.000Z"); vi.useFakeTimers({ toFake: ["Date"] }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index abad4fa2a93b..c73928fe8980 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -30,6 +30,7 @@ import { consumeExecApprovalFollowupRuntimeHandoff, parseExecApprovalFollowupApprovalId, } from "../../agents/bash-tools.exec-approval-followup-state.js"; +import { clearAllCliSessions } from "../../agents/cli-session.js"; import type { AgentCommandOpts } from "../../agents/command/types.js"; import { isTimeoutError } from "../../agents/failover-error.js"; import { @@ -51,7 +52,7 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { agentCommandFromIngress } from "../../commands/agent.js"; import { evaluateSessionFreshness, - hasTerminalMainSessionTranscriptNewerThanRegistry, + hasTerminalMainSessionTranscriptNewerThanRegistrySync, mergeSessionEntry, resolveTerminalMainSessionTranscriptRegistryCheck, resolveChannelResetConfig, @@ -1636,7 +1637,7 @@ export const agentHandlers: GatewayRequestHandlers = { agentId: canonicalSessionAgentId, }) : undefined; - const freshness = entry + let freshness = entry ? evaluateSessionFreshness({ updatedAt: entry.updatedAt, ...lifecycleTimestamps, @@ -1644,20 +1645,25 @@ export const agentHandlers: GatewayRequestHandlers = { policy: resetPolicy, }) : undefined; - let failedSessionTranscriptMissing = false; - if (entry?.status === "failed" && entry.sessionId?.trim()) { + const resolveFailedSessionTranscriptMissingForEntry = ( + candidateEntry: SessionEntry | undefined, + ) => { + if (candidateEntry?.status !== "failed" || !candidateEntry.sessionId?.trim()) { + return false; + } try { const sessionPathOpts = resolveSessionFilePathOptions({ storePath, agentId: canonicalSessionAgentId, }); - failedSessionTranscriptMissing = !existsSync( - resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts), + return !existsSync( + resolveSessionFilePath(candidateEntry.sessionId, candidateEntry, sessionPathOpts), ); } catch { - failedSessionTranscriptMissing = true; + return true; } - } + }; + const failedSessionTranscriptMissing = resolveFailedSessionTranscriptMissingForEntry(entry); const mainSessionKeyForRequest = resolveAgentMainSessionKey({ cfg: cfgLocal, agentId: canonicalSessionAgentId, @@ -1680,7 +1686,7 @@ export const agentHandlers: GatewayRequestHandlers = { storePath, }); const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck - ? await hasTerminalMainSessionTranscriptNewerThanRegistry({ + ? hasTerminalMainSessionTranscriptNewerThanRegistrySync({ entry, sessionScope: cfgLocal.session?.scope, sessionKey: canonicalKey, @@ -1694,7 +1700,7 @@ export const agentHandlers: GatewayRequestHandlers = { (freshness?.fresh ?? false) && !failedSessionTranscriptMissing && !terminalMainTranscriptNewerThanRegistry; - const usableRequestedSessionId = + let usableRequestedSessionId = requestedSessionId && (!entry?.sessionId || canReuseSession) ? requestedSessionId : undefined; @@ -1705,7 +1711,7 @@ export const agentHandlers: GatewayRequestHandlers = { !entry || (!canReuseSession && !usableRequestedSessionId) || Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId); - const rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId); + let rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId); const touchInteraction = !isSystemGatewayRun && !request.internalEvents?.length; const sessionAgent = canonicalSessionAgentId; type AgentSessionPatchBuild = { @@ -1715,6 +1721,10 @@ export const agentHandlers: GatewayRequestHandlers = { groupChannel: string | undefined; groupSpace: string | undefined; freshSessionRotatedSinceLoad: boolean; + isNewSession: boolean; + rotatedSessionId: boolean; + usableRequestedSessionId: string | undefined; + freshness: typeof freshness; }; const requestDeliveryHint = normalizeDeliveryContext({ channel: request.channel?.trim(), @@ -1807,12 +1817,69 @@ export const agentHandlers: GatewayRequestHandlers = { const freshSessionRotatedSinceLoad = Boolean( entry?.sessionId && freshEntry?.sessionId && freshEntry.sessionId !== entry.sessionId, ); - const patchSessionId = freshSessionRotatedSinceLoad ? freshEntry?.sessionId : sessionId; - const shouldClearRotatedState = rotatedSessionId && !freshSessionRotatedSinceLoad; + const freshLifecycleTimestamps = freshEntry + ? resolveSessionLifecycleTimestamps({ + entry: freshEntry, + storePath, + agentId: sessionAgent, + }) + : undefined; + const freshFreshness = freshEntry + ? evaluateSessionFreshness({ + updatedAt: freshEntry.updatedAt, + ...freshLifecycleTimestamps, + now, + policy: resetPolicy, + }) + : undefined; + const freshRequestedSessionMatchesEntry = Boolean( + requestedSessionId && freshEntry?.sessionId?.trim() === requestedSessionId, + ); + const freshTerminalMainTranscriptNewerThanRegistry = + isSystemGatewayRun || freshRequestedSessionMatchesEntry + ? false + : hasTerminalMainSessionTranscriptNewerThanRegistrySync({ + entry: freshEntry, + sessionScope: cfgLocal.session?.scope, + sessionKey: canonicalKey, + agentId: sessionAgent, + mainKey: cfgLocal.session?.mainKey, + storePath, + }); + const freshFailedSessionTranscriptMissing = + resolveFailedSessionTranscriptMissingForEntry(freshEntry); + const freshCanReuseSession = + Boolean(freshEntry?.sessionId) && + (freshFreshness?.fresh ?? false) && + !freshFailedSessionTranscriptMissing && + !freshTerminalMainTranscriptNewerThanRegistry; + const freshUsableRequestedSessionId = + requestedSessionId && (!freshEntry?.sessionId || freshCanReuseSession) + ? requestedSessionId + : undefined; + const freshSessionId = freshUsableRequestedSessionId + ? freshUsableRequestedSessionId + : ((freshCanReuseSession ? freshEntry?.sessionId : undefined) ?? sessionId); + const freshIsNewSession = + !freshEntry || + (!freshCanReuseSession && !freshUsableRequestedSessionId) || + Boolean( + freshUsableRequestedSessionId && + freshEntry?.sessionId !== freshUsableRequestedSessionId, + ); + const freshRotatedSessionId = Boolean( + freshEntry?.sessionId && freshEntry.sessionId !== freshSessionId, + ); + const patchSessionId = freshSessionRotatedSinceLoad + ? freshEntry?.sessionId + : freshSessionId; + const shouldClearRotatedState = freshRotatedSessionId && !freshSessionRotatedSinceLoad; const patch: Partial = { sessionId: patchSessionId, updatedAt: now, - ...(isNewSession && !freshSessionRotatedSinceLoad ? { sessionStartedAt: now } : {}), + ...(freshIsNewSession && !freshSessionRotatedSinceLoad + ? { sessionStartedAt: now } + : {}), ...(touchInteraction ? { lastInteractionAt: now } : {}), ...(effectiveDeliveryFields.route ? { route: effectiveDeliveryFields.route } : {}), ...(effectiveDeliveryFields.deliveryContext @@ -1846,6 +1913,9 @@ export const agentHandlers: GatewayRequestHandlers = { } : {}), }; + if (shouldClearRotatedState) { + clearAllCliSessions(patch); + } return { patch, spawnedBy: freshSpawnedBy, @@ -1853,9 +1923,17 @@ export const agentHandlers: GatewayRequestHandlers = { groupChannel: nextGroup.groupChannel, groupSpace: nextGroup.groupSpace, freshSessionRotatedSinceLoad, + isNewSession: freshIsNewSession, + rotatedSessionId: freshRotatedSessionId, + usableRequestedSessionId: freshUsableRequestedSessionId, + freshness: freshFreshness, }; }; let patchBuild = buildSessionPatch(entry); + isNewSession = patchBuild.isNewSession; + rotatedSessionId = patchBuild.rotatedSessionId; + usableRequestedSessionId = patchBuild.usableRequestedSessionId; + freshness = patchBuild.freshness; sessionEntry = mergeSessionEntry(entry, patchBuild.patch); resolvedSessionId = sessionEntry?.sessionId ?? sessionId; const canonicalSessionKey = canonicalKey; @@ -1961,6 +2039,10 @@ export const agentHandlers: GatewayRequestHandlers = { return; } } + isNewSession = patchBuild.isNewSession; + rotatedSessionId = patchBuild.rotatedSessionId; + usableRequestedSessionId = patchBuild.usableRequestedSessionId; + freshness = patchBuild.freshness; spawnedByValue = patchBuild.spawnedBy; resolvedGroupId = patchBuild.groupId; resolvedGroupChannel = patchBuild.groupChannel; diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 364453c95acf..642f637b5ba1 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -1402,6 +1402,21 @@ describe("chat directive tag stripping for non-streaming final payloads", () => fs.writeFileSync(savedImagePath, Buffer.from(TINY_PNG_BASE64, "base64")); mockState.savedMediaResults = [{ path: savedImagePath, contentType: "image/png" }]; const mirrorIdempotencyKey = "idem-agent-source-reply-media:internal-source-reply:0"; + const updatedAt = Date.parse("2026-05-18T11:00:00.000Z"); + const rewrittenAt = Date.parse("2026-05-18T11:05:00.000Z"); + const storePath = path.join(path.dirname(mockState.transcriptPath), "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + main: { + sessionId: mockState.sessionId, + sessionFile: mockState.transcriptPath, + updatedAt, + status: "done", + }, + }), + "utf-8", + ); await appendSourceReplyMirrorEntry({ idempotencyKey: mirrorIdempotencyKey, text: "Codex source reply with media", @@ -1430,34 +1445,47 @@ describe("chat directive tag stripping for non-streaming final payloads", () => const respond = vi.fn(); const context = createChatContext(); - const broadcast = await runNonStreamingChatSend({ - context, - respond, - idempotencyKey: "idem-agent-source-reply-media", - message: "hello from codex", - }); + vi.useFakeTimers({ toFake: ["Date"] }); + vi.setSystemTime(rewrittenAt); + try { + const broadcast = await runNonStreamingChatSend({ + context, + respond, + idempotencyKey: "idem-agent-source-reply-media", + message: "hello from codex", + }); - expect(broadcast).toMatchObject({ - runId: "idem-agent-source-reply-media", - sessionKey: "main", - state: "final", - }); - expect(extractFirstTextBlock(broadcast)).toBe("Codex source reply with media"); - const broadcastContent = getMessageContent(broadcast); - expect(String(broadcastContent[1]?.url)).toContain("/api/chat/media/outgoing/"); - expect(String(broadcastContent[1]?.openUrl)).toContain("/api/chat/media/outgoing/"); - const assistantUpdates = mockState.emittedTranscriptUpdates.filter( - (update) => - typeof update.message === "object" && - update.message !== null && - (update.message as { role?: unknown }).role === "assistant", - ); - expect(assistantUpdates).toStrictEqual([]); - const assistantEntries = await readActiveAssistantTranscriptMessages(); - expect(assistantEntries).toHaveLength(1); - expect(assistantEntries[0]?.idempotencyKey).toBe(mirrorIdempotencyKey); - expect(JSON.stringify(assistantEntries[0])).toContain("/api/chat/media/outgoing/"); - expect(JSON.stringify(assistantEntries[0])).not.toContain(mediaUrl); + expect(broadcast).toMatchObject({ + runId: "idem-agent-source-reply-media", + sessionKey: "main", + state: "final", + }); + expect(extractFirstTextBlock(broadcast)).toBe("Codex source reply with media"); + const broadcastContent = getMessageContent(broadcast); + expect(String(broadcastContent[1]?.url)).toContain("/api/chat/media/outgoing/"); + expect(String(broadcastContent[1]?.openUrl)).toContain("/api/chat/media/outgoing/"); + const assistantUpdates = mockState.emittedTranscriptUpdates.filter( + (update) => + typeof update.message === "object" && + update.message !== null && + (update.message as { role?: unknown }).role === "assistant", + ); + expect(assistantUpdates).toStrictEqual([]); + const assistantEntries = await readActiveAssistantTranscriptMessages(); + expect(assistantEntries).toHaveLength(1); + expect(assistantEntries[0]?.idempotencyKey).toBe(mirrorIdempotencyKey); + expect(JSON.stringify(assistantEntries[0])).toContain("/api/chat/media/outgoing/"); + expect(JSON.stringify(assistantEntries[0])).not.toContain(mediaUrl); + const store = JSON.parse(fs.readFileSync(storePath, "utf-8")) as Record< + string, + { updatedAt?: number; status?: string } + >; + expect(store.main?.updatedAt).toBeGreaterThanOrEqual(rewrittenAt); + expect(store.main?.updatedAt).toBeGreaterThan(updatedAt); + expect(store.main?.status).toBe("done"); + } finally { + vi.useRealTimers(); + } }, ); }); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index c07e8932778e..fff7848f42fa 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1643,18 +1643,34 @@ async function appendAssistantTranscriptMessage(params: { ttsSupplement: params.ttsSupplement, config: params.cfg, }); - if (appended.ok && params.storePath) { - const transcriptMarkerUpdatedAt = Date.now(); - await updateSessionStoreEntry({ + if (appended.ok) { + await advanceSessionTranscriptMarker({ storePath: params.storePath, sessionKey: params.sessionKey, - update: (current) => - current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null, + sessionId: params.sessionId, }); } return appended; } +async function advanceSessionTranscriptMarker(params: { + storePath: string | undefined; + sessionKey: string; + sessionId: string; +}): Promise { + if (!params.storePath) { + return; + } + + const transcriptMarkerUpdatedAt = Date.now(); + await updateSessionStoreEntry({ + storePath: params.storePath, + sessionKey: params.sessionKey, + update: (current) => + current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null, + }); +} + function collectSessionAbortPartials(params: { chatAbortControllers: Map; chatRunBuffers: Map; @@ -4114,6 +4130,11 @@ export const chatHandlers: GatewayRequestHandlers = { }, }); if (result.changed) { + await advanceSessionTranscriptMarker({ + storePath: latestStorePath, + sessionKey, + sessionId, + }); for (const target of rewriteTargets) { const rewritten = await findSourceReplyTranscriptMirrorByIdempotencyKey(