From 57bed6ae0c4b96ad843d57274a79ceeef7c7146c Mon Sep 17 00:00:00 2001 From: Fermin Quant <14808645+ferminquant@users.noreply.github.com> Date: Fri, 5 Jun 2026 02:40:06 -0400 Subject: [PATCH] fix(sessions): cover terminal transcript markers --- .../src/bot-message-dispatch.runtime.ts | 1 + .../telegram/src/bot-message-dispatch.test.ts | 33 ++++- .../telegram/src/bot-message-dispatch.ts | 18 ++- src/agents/command/attempt-execution.ts | 1 + src/agents/command/session.ts | 28 ++-- src/auto-reply/reply/session.test.ts | 14 +- src/commands/agent.session.test.ts | 86 +++++++++++- src/config/sessions/lifecycle.ts | 5 +- src/config/sessions/transcript.test.ts | 85 ++++++++++++ src/config/sessions/transcript.ts | 15 ++- src/gateway/server-methods/agent.test.ts | 124 ++++++++++++++---- src/gateway/server-methods/agent.ts | 24 ++-- .../chat.directive-tags.test.ts | 48 +++++++ src/gateway/server-methods/chat.ts | 14 +- 14 files changed, 438 insertions(+), 58 deletions(-) diff --git a/extensions/telegram/src/bot-message-dispatch.runtime.ts b/extensions/telegram/src/bot-message-dispatch.runtime.ts index 4b952d4058bc..c844d1398505 100644 --- a/extensions/telegram/src/bot-message-dispatch.runtime.ts +++ b/extensions/telegram/src/bot-message-dispatch.runtime.ts @@ -4,6 +4,7 @@ export { readLatestAssistantTextFromSessionTranscript, resolveAndPersistSessionFile, resolveSessionStoreEntry, + updateSessionStoreEntry, } from "openclaw/plugin-sdk/session-store-runtime"; export { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime"; export { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime"; diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 5306e7aeea08..a758f2360bd3 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -70,7 +70,11 @@ const createChannelMessageReplyPipeline = vi.hoisted(() => ); const wasSentByBot = vi.hoisted(() => vi.fn(() => false)); const appendSessionTranscriptMessage = vi.hoisted(() => - vi.fn(async (_params: { message?: unknown }) => ({ messageId: "m1" })), + vi.fn(async ({ message }: { message?: unknown }) => ({ + messageId: "m1", + message, + appended: true, + })), ); const emitSessionTranscriptUpdate = vi.hoisted(() => vi.fn()); const loadSessionStore = vi.hoisted(() => vi.fn()); @@ -101,6 +105,7 @@ const resolveSessionStoreEntry = vi.hoisted(() => existing: store[sessionKey], })), ); +const updateSessionStoreEntry = vi.hoisted(() => vi.fn(async () => null)); vi.mock("./draft-stream.js", () => ({ createTelegramDraftStream, @@ -155,6 +160,7 @@ vi.mock("./bot-message-dispatch.runtime.js", () => ({ resolveMarkdownTableMode, resolveSessionStoreEntry, resolveStorePath, + updateSessionStoreEntry, })); vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({ @@ -268,11 +274,13 @@ describe("dispatchTelegramMessage draft streaming", () => { loadSessionStore.mockReset(); resolveStorePath.mockReset(); resolveAndPersistSessionFile.mockReset(); + updateSessionStoreEntry.mockReset(); generateTopicLabel.mockReset(); getAgentScopedMediaLocalRoots.mockClear(); resolveChunkMode.mockClear(); resolveMarkdownTableMode.mockClear(); resolveSessionStoreEntry.mockClear(); + updateSessionStoreEntry.mockClear(); describeStickerImage.mockReset(); loadModelCatalog.mockReset(); findModelInCatalog.mockReset(); @@ -1445,6 +1453,29 @@ describe("dispatchTelegramMessage draft streaming", () => { }); }); + it("advances the session marker after mirroring preview-finalized finals", async () => { + setupDraftStreams({ answerMessageId: 2001 }); + const context = createContext(); + context.ctxPayload.SessionKey = "agent:default:telegram:direct:123"; + loadSessionStore.mockReturnValue({ + "agent:default:telegram:direct:123": { sessionId: "s1" }, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" }); + return { queuedFinal: true }; + }); + + await dispatchWithContext({ context }); + + const markerUpdateCall = expectRecordFields(mockCallArg(updateSessionStoreEntry), { + storePath: "/tmp/sessions.json", + sessionKey: "agent:default:telegram:direct:123", + }); + const update = markerUpdateCall.update as (entry: { sessionId?: string }) => unknown; + expect(update({ sessionId: "s1" })).toEqual({ updatedAt: expect.any(Number) }); + expect(update({ sessionId: "new-session" })).toBeNull(); + }); + it("does not mirror non-final tool progress into the session transcript", async () => { const context = createContext(); context.ctxPayload.SessionKey = "agent:default:telegram:direct:123"; diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 9d1bf99da2b4..f14aba4876ee 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -79,6 +79,7 @@ import { resolveMarkdownTableMode, resolveAndPersistSessionFile, resolveSessionStoreEntry, + updateSessionStoreEntry, } from "./bot-message-dispatch.runtime.js"; import type { TelegramBotOptions } from "./bot.types.js"; import { deliverReplies, emitInternalMessageSentHook } from "./bot/delivery.js"; @@ -373,11 +374,26 @@ async function mirrorTelegramAssistantReplyToTranscript(params: { stopReason: "stop" as const, timestamp: Date.now(), }; - const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({ + const { + appended, + messageId, + message: appendedMessage, + } = await appendSessionTranscriptMessage({ transcriptPath: sessionFile, message, config: params.cfg, }); + if (appended) { + const transcriptMarkerUpdatedAt = Date.now(); + await updateSessionStoreEntry({ + storePath, + sessionKey: params.sessionKey, + update: (current) => + current.sessionId === sessionEntry.sessionId + ? { updatedAt: transcriptMarkerUpdatedAt } + : null, + }); + } emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 412a8238d5a9..93ee8379bd0b 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -48,6 +48,7 @@ import { claudeCliSessionTranscriptHasContent, resolveFallbackRetryPrompt, } from "./attempt-execution.helpers.js"; +import { persistSessionEntry } from "./attempt-execution.shared.js"; import { resolveAgentRunContext } from "./run-context.js"; import { clearCliSessionInStore } from "./session-store.js"; import type { AgentCommandOpts } from "./types.js"; diff --git a/src/agents/command/session.ts b/src/agents/command/session.ts index a2d41900b64f..d15bc4f42897 100644 --- a/src/agents/command/session.ts +++ b/src/agents/command/session.ts @@ -70,6 +70,8 @@ function clearRotatedTerminalMainSessionMetadata( endedAt: undefined, runtimeMs: undefined, abortedLastRun: undefined, + sessionStartedAt: undefined, + lastInteractionAt: undefined, }; } @@ -362,16 +364,18 @@ export function resolveSession(opts: { resetType, resetOverride: channelReset, }); - const terminalMainTranscriptNewerThanRegistry = sessionEntry - ? hasTerminalMainSessionTranscriptNewerThanRegistrySync({ - entry: sessionEntry, - sessionScope: sessionCfg?.scope, - sessionKey, - agentId: sessionAgentId, - mainKey: sessionCfg?.mainKey, - storePath, - }) - : false; + const requestedSessionId = opts.sessionId?.trim() || undefined; + const terminalMainTranscriptNewerThanRegistry = + sessionEntry && !requestedSessionId + ? hasTerminalMainSessionTranscriptNewerThanRegistrySync({ + entry: sessionEntry, + sessionScope: sessionCfg?.scope, + sessionKey, + agentId: sessionAgentId, + mainKey: sessionCfg?.mainKey, + storePath, + }) + : false; const fresh = sessionEntry ? !terminalMainTranscriptNewerThanRegistry && evaluateSessionFreshness({ @@ -386,8 +390,8 @@ export function resolveSession(opts: { }).fresh : false; const sessionId = - opts.sessionId?.trim() || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID(); - const isNewSession = !fresh && !opts.sessionId; + requestedSessionId || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID(); + const isNewSession = !fresh && !requestedSessionId; const resolvedSessionEntry = terminalMainTranscriptNewerThanRegistry ? clearRotatedTerminalMainSessionMetadata(sessionEntry) : sessionEntry; diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index b321b80c45ba..0445a54585f8 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -186,6 +186,7 @@ async function writeTerminalTranscriptSessionStore(params: { sessionKey: string; sessionId: string; status?: SessionEntry["status"]; + omitStatus?: boolean; updatedAt: number; endedAt: number; transcriptMtimeMs: number; @@ -198,6 +199,7 @@ async function writeTerminalTranscriptSessionStore(params: { "utf-8", ); await fs.utimes(transcriptPath, params.transcriptMtimeMs / 1000, params.transcriptMtimeMs / 1000); + const status = params.status ?? (params.omitStatus ? undefined : "done"); await writeSessionStoreFast(params.storePath, { [params.sessionKey]: { sessionId: params.sessionId, @@ -206,7 +208,7 @@ async function writeTerminalTranscriptSessionStore(params: { startedAt: params.endedAt - 10_000, endedAt: params.endedAt, runtimeMs: 9_000, - status: params.status ?? "done", + ...(status ? { status } : {}), }, }); } @@ -1641,6 +1643,15 @@ describe("initSessionState reset policy", () => { transcriptMtimeOffsetMs: 0, expectNewSession: true, }, + { + name: "main endedAt-only rows rotate when transcript is newer than updatedAt", + sessionKey: "agent:main:main", + updatedAtOffsetMs: -10_000, + endedAtOffsetMs: -11_000, + transcriptMtimeOffsetMs: 0, + omitStatus: true, + expectNewSession: true, + }, { name: "failed main terminal rows reuse when the transcript exists", sessionKey: "agent:main:main", @@ -1687,6 +1698,7 @@ describe("initSessionState reset policy", () => { sessionKey: scenario.sessionKey, sessionId: existingSessionId, status: scenario.status, + omitStatus: scenario.omitStatus, updatedAt: terminalUpdatedAt, endedAt: terminalEndedAt, transcriptMtimeMs: now + scenario.transcriptMtimeOffsetMs, diff --git a/src/commands/agent.session.test.ts b/src/commands/agent.session.test.ts index 2c6ac9c12318..dcf4b049682c 100644 --- a/src/commands/agent.session.test.ts +++ b/src/commands/agent.session.test.ts @@ -151,9 +151,20 @@ describe("agent session resolution", () => { it("rotates stale terminal main sessions whose transcript is newer than the registry", async () => { const scenarios = [ - { label: "canonical main", mainKey: "main", sessionKey: "agent:main:main" }, - { label: "raw main alias", mainKey: "main", sessionKey: "main" }, - { label: "custom main alias", mainKey: "work", sessionKey: "agent:main:main" }, + { + label: "canonical main", + mainKey: "main", + sessionKey: "agent:main:main", + status: "done" as const, + }, + { label: "raw main alias", mainKey: "main", sessionKey: "main", status: "done" as const }, + { + label: "custom main alias", + mainKey: "work", + sessionKey: "agent:main:main", + status: "done" as const, + }, + { label: "endedAt-only main", mainKey: "main", sessionKey: "agent:main:main" }, ]; for (const scenario of scenarios) { await withTempHome(async (home) => { @@ -173,7 +184,9 @@ describe("agent session resolution", () => { sessionId, sessionFile, updatedAt: registryUpdatedAt, - status: "done", + ...(scenario.status ? { status: scenario.status } : {}), + sessionStartedAt: registryUpdatedAt - 60_000, + lastInteractionAt: registryUpdatedAt - 30_000, startedAt: registryUpdatedAt - 1_000, endedAt: registryUpdatedAt - 100, }, @@ -190,6 +203,8 @@ describe("agent session resolution", () => { expect(resolution.sessionEntry?.startedAt).toBeUndefined(); expect(resolution.sessionEntry?.endedAt).toBeUndefined(); expect(resolution.sessionEntry?.runtimeMs).toBeUndefined(); + expect(resolution.sessionEntry?.sessionStartedAt).toBeUndefined(); + expect(resolution.sessionEntry?.lastInteractionAt).toBeUndefined(); const sessionStore = { [scenario.sessionKey]: resolution.sessionEntry!, @@ -230,10 +245,73 @@ describe("agent session resolution", () => { expect(persisted?.startedAt).toBeUndefined(); expect(persisted?.endedAt).toBeUndefined(); expect(persisted?.runtimeMs).toBeUndefined(); + expect(persisted?.sessionStartedAt).toBeGreaterThan(registryUpdatedAt); + expect(persisted?.lastInteractionAt).toBeGreaterThan(registryUpdatedAt); }); } }); + it("preserves explicit session-id resumes for stale terminal main rows", async () => { + await withTempHome(async (home) => { + const store = path.join(home, "sessions.json"); + const sessionFile = path.join(home, "explicit-terminal-main.jsonl"); + const sessionId = "explicit-terminal-main"; + const registryUpdatedAt = Date.now() - 10_000; + fs.writeFileSync(sessionFile, JSON.stringify({ type: "session", id: sessionId }) + "\n"); + fs.utimesSync( + sessionFile, + (registryUpdatedAt + 5_000) / 1000, + (registryUpdatedAt + 5_000) / 1000, + ); + writeSessionStoreSeed(store, { + "agent:main:main": { + sessionId, + sessionFile, + updatedAt: registryUpdatedAt, + status: "done", + startedAt: registryUpdatedAt - 1_000, + endedAt: registryUpdatedAt - 100, + runtimeMs: 900, + }, + }); + const cfg = mockConfig(home, store); + + const resolution = resolveSession({ cfg, sessionId }); + + expect(resolution.sessionKey).toBe("agent:main:main"); + expect(resolution.sessionId).toBe(sessionId); + expect(resolution.isNewSession).toBe(false); + expect(resolution.sessionEntry?.sessionFile).toBe(sessionFile); + expect(resolution.sessionEntry?.status).toBe("done"); + expect(resolution.sessionEntry?.startedAt).toBe(registryUpdatedAt - 1_000); + expect(resolution.sessionEntry?.endedAt).toBe(registryUpdatedAt - 100); + expect(resolution.sessionEntry?.runtimeMs).toBe(900); + + if (!resolution.sessionKey || !resolution.sessionStore) { + throw new Error("expected resolved explicit session store"); + } + const resolvedTranscript = await resolveSessionTranscriptFile({ + sessionId: resolution.sessionId, + sessionKey: resolution.sessionKey, + sessionEntry: resolution.sessionEntry, + sessionStore: resolution.sessionStore, + storePath: resolution.storePath, + agentId: "main", + }); + expect(resolvedTranscript.sessionFile).toBe(sessionFile); + + const persisted = loadSessionStore(resolution.storePath, { skipCache: true })[ + resolution.sessionKey + ]; + expect(persisted?.sessionId).toBe(sessionId); + expect(persisted?.sessionFile).toBe(sessionFile); + expect(persisted?.status).toBe("done"); + expect(persisted?.startedAt).toBe(registryUpdatedAt - 1_000); + expect(persisted?.endedAt).toBe(registryUpdatedAt - 100); + expect(persisted?.runtimeMs).toBe(900); + }); + }); + it("forwards resolved outbound session context when resuming by sessionId", async () => { await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => { const resolution = resolveSession({ cfg, sessionId }); diff --git a/src/config/sessions/lifecycle.ts b/src/config/sessions/lifecycle.ts index c7ada51fed0f..cf7fb07a7b94 100644 --- a/src/config/sessions/lifecycle.ts +++ b/src/config/sessions/lifecycle.ts @@ -159,7 +159,10 @@ export function resolveTerminalMainSessionTranscriptRegistryCheck( if (candidateSessionKey !== configuredMainSessionKey) { return undefined; } - if (!isTerminalSessionStatus(params.entry.status)) { + const hasTerminalLifecycle = + isTerminalSessionStatus(params.entry.status) || + resolvePositiveTimestamp(params.entry.endedAt) !== undefined; + if (!hasTerminalLifecycle) { return undefined; } if (params.entry.status === "failed") { diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index 4504add70010..28f27bcd45c3 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -138,6 +138,91 @@ describe("appendAssistantMessageToSessionTranscript", () => { } }); + it("advances the session registry marker after managed transcript appends", async () => { + const updatedAt = Date.parse("2026-05-18T09:00:00.000Z"); + const appendedAt = Date.parse("2026-05-18T09:05:00.000Z"); + const sessionFile = "managed-marker.jsonl"; + fs.writeFileSync( + fixture.storePath(), + JSON.stringify({ + [sessionKey]: { + sessionId, + sessionFile, + updatedAt, + status: "done", + }, + }), + "utf-8", + ); + vi.useFakeTimers({ toFake: ["Date"] }); + vi.setSystemTime(appendedAt); + try { + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Hello with registry marker", + storePath: fixture.storePath(), + }); + + expect(result.ok).toBe(true); + const store = JSON.parse(fs.readFileSync(fixture.storePath(), "utf-8")) as Record< + string, + { updatedAt?: number; status?: string } + >; + expect(store[sessionKey]?.updatedAt).toBe(appendedAt); + expect(store[sessionKey]?.status).toBe("done"); + } finally { + vi.useRealTimers(); + } + }); + + it("does not advance the registry marker for duplicate delivery mirror replays", async () => { + const updatedAt = Date.parse("2026-05-18T10:00:00.000Z"); + const firstAppendAt = Date.parse("2026-05-18T10:05:00.000Z"); + const duplicateReplayAt = Date.parse("2026-05-18T10:10:00.000Z"); + const sessionFile = "duplicate-marker.jsonl"; + fs.writeFileSync( + fixture.storePath(), + JSON.stringify({ + [sessionKey]: { + sessionId, + sessionFile, + updatedAt, + status: "done", + }, + }), + "utf-8", + ); + vi.useFakeTimers({ toFake: ["Date"] }); + try { + vi.setSystemTime(firstAppendAt); + const first = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Replay-safe marker", + storePath: fixture.storePath(), + }); + expect(first.ok).toBe(true); + + vi.setSystemTime(duplicateReplayAt); + const duplicate = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Replay-safe marker", + storePath: fixture.storePath(), + }); + expect(duplicate.ok).toBe(true); + + const store = JSON.parse(fs.readFileSync(fixture.storePath(), "utf-8")) as Record< + string, + { updatedAt?: number } + >; + expect(store[sessionKey]?.updatedAt).toBe(firstAppendAt); + if (first.ok && duplicate.ok) { + expect(duplicate.messageId).toBe(first.messageId); + } + } finally { + vi.useRealTimers(); + } + }); + it("uses spawned cwd when creating a missing transcript header", async () => { const taskCwd = path.join(fixture.sessionsDir(), "task-repo"); fs.mkdirSync(taskCwd, { recursive: true }); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index ae5e1f9bc1a2..c00a0e36b760 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -15,7 +15,7 @@ import { resolveSessionTranscriptPath, } from "./paths.js"; import { resolveAndPersistSessionFile } from "./session-file.js"; -import { loadSessionStore, resolveSessionStoreEntry } from "./store.js"; +import { loadSessionStore, resolveSessionStoreEntry, updateSessionStoreEntry } from "./store.js"; import { parseSessionThreadInfo } from "./thread-info.js"; import { appendSessionTranscriptMessage } from "./transcript-append.js"; import { createSessionTranscriptHeader } from "./transcript-header.js"; @@ -299,7 +299,8 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { }; } - return await runWithOwnedSessionTranscriptWriteLock( + let transcriptMarkerUpdatedAt: number | undefined; + const result = await runWithOwnedSessionTranscriptWriteLock( { sessionFile, sessionKey: resolved.normalizedKey }, async () => { const explicitIdempotencyKey = @@ -340,6 +341,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { if (!appended) { return { ok: true, sessionFile, messageId }; } + transcriptMarkerUpdatedAt = Date.now(); switch (params.updateMode ?? "inline") { case "inline": @@ -364,6 +366,15 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { return { ok: true, sessionFile, messageId }; }, ); + if (result.ok && transcriptMarkerUpdatedAt !== undefined) { + await updateSessionStoreEntry({ + storePath, + sessionKey: resolved.normalizedKey, + update: (current) => + current.sessionId === entry.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null, + }); + } + return result; } function isRedundantDeliveryMirror(message: SessionTranscriptAssistantMessage): boolean { diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 119975d51822..b03d686424ef 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -723,14 +723,72 @@ describe("gateway agent handler", () => { expect(capturedEntry?.sessionFile).toBeUndefined(); }); - it("rotates a terminal main session when its transcript is newer than the registry row", async () => { - const now = Date.parse("2026-05-18T09:47:00.000Z"); + it.each([ + { name: "status terminal row", status: "done" as const }, + { name: "endedAt-only terminal row" }, + ])( + "rotates a terminal main session from a $name when its transcript is newer", + async (scenario) => { + const now = Date.parse("2026-05-18T09:47:00.000Z"); + vi.useFakeTimers({ toFake: ["Date"] }); + dateOnlyFakeClockActive = true; + vi.setSystemTime(now); + + await withTempDir( + { prefix: "openclaw-gateway-terminal-main-newer-transcript-" }, + async (root) => { + const sessionsDir = `${root}/sessions`; + await fs.mkdir(sessionsDir, { recursive: true }); + const sessionFile = "terminal-main-session.jsonl"; + const transcriptPath = `${sessionsDir}/${sessionFile}`; + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`, + "utf8", + ); + await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000)); + mocks.loadSessionEntry.mockReturnValue({ + cfg: {}, + storePath: `${sessionsDir}/sessions.json`, + entry: { + sessionId: "terminal-main-session", + sessionFile, + ...(scenario.status ? { status: scenario.status } : {}), + updatedAt: now - 10_000, + sessionStartedAt: now - 60_000, + lastInteractionAt: now - 10_000, + startedAt: now - 20_000, + endedAt: now - 15_000, + runtimeMs: 5_000, + }, + canonicalKey: "agent:main:main", + }); + + const capturedEntry = await runMainAgentAndCaptureEntry( + "test-idem-terminal-main-newer-transcript", + ); + + const call = await waitForAgentCommandCall<{ sessionId?: string }>(); + expect(call.sessionId).not.toBe("terminal-main-session"); + expect(capturedEntry?.sessionId).not.toBe("terminal-main-session"); + expect(capturedEntry?.status).toBeUndefined(); + expect(capturedEntry?.startedAt).toBeUndefined(); + expect(capturedEntry?.endedAt).toBeUndefined(); + expect(capturedEntry?.runtimeMs).toBeUndefined(); + expect(capturedEntry?.sessionFile).toBeUndefined(); + }, + ); + }, + ); + + it("honors explicit gateway session-id resumes for terminal main rows", async () => { + const now = Date.parse("2026-05-18T09:48:00.000Z"); vi.useFakeTimers({ toFake: ["Date"] }); dateOnlyFakeClockActive = true; vi.setSystemTime(now); await withTempDir( - { prefix: "openclaw-gateway-terminal-main-newer-transcript-" }, + { prefix: "openclaw-gateway-terminal-main-explicit-resume-" }, async (root) => { const sessionsDir = `${root}/sessions`; await fs.mkdir(sessionsDir, { recursive: true }); @@ -742,35 +800,53 @@ describe("gateway agent handler", () => { "utf8", ); await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000)); + const existingEntry = { + sessionId: "terminal-main-session", + sessionFile, + status: "done", + updatedAt: now - 10_000, + sessionStartedAt: now - 60_000, + lastInteractionAt: now - 10_000, + startedAt: now - 20_000, + endedAt: now - 15_000, + runtimeMs: 5_000, + }; mocks.loadSessionEntry.mockReturnValue({ cfg: {}, storePath: `${sessionsDir}/sessions.json`, - entry: { - sessionId: "terminal-main-session", - sessionFile, - status: "done", - updatedAt: now - 10_000, - sessionStartedAt: now - 60_000, - lastInteractionAt: now - 10_000, - startedAt: now - 20_000, - endedAt: now - 15_000, - runtimeMs: 5_000, - }, + entry: existingEntry, canonicalKey: "agent:main:main", }); + let capturedEntry: Record | undefined; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + const store: Record = { + "agent:main:main": { ...existingEntry }, + }; + const result = await updater(store); + capturedEntry = result as Record; + return result; + }); + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); - const capturedEntry = await runMainAgentAndCaptureEntry( - "test-idem-terminal-main-newer-transcript", - ); + await invokeAgent({ + message: "resume terminal main", + agentId: "main", + sessionKey: "agent:main:main", + sessionId: "terminal-main-session", + idempotencyKey: "test-idem-terminal-main-explicit-resume", + } as AgentParams); const call = await waitForAgentCommandCall<{ sessionId?: string }>(); - expect(call.sessionId).not.toBe("terminal-main-session"); - expect(capturedEntry?.sessionId).not.toBe("terminal-main-session"); - expect(capturedEntry?.status).toBeUndefined(); - expect(capturedEntry?.startedAt).toBeUndefined(); - expect(capturedEntry?.endedAt).toBeUndefined(); - expect(capturedEntry?.runtimeMs).toBeUndefined(); - expect(capturedEntry?.sessionFile).toBeUndefined(); + expect(call.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionId).toBe("terminal-main-session"); + expect(capturedEntry?.sessionFile).toBe(sessionFile); + expect(capturedEntry?.status).toBe("done"); + expect(capturedEntry?.startedAt).toBe(now - 20_000); + expect(capturedEntry?.endedAt).toBe(now - 15_000); + expect(capturedEntry?.runtimeMs).toBe(5_000); }, ); }); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index cb8eb1ac42e1..abad4fa2a93b 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -1665,16 +1665,20 @@ export const agentHandlers: GatewayRequestHandlers = { const isSystemGatewayRun = request.bootstrapContextRunKind === "cron" || request.bootstrapContextRunKind === "heartbeat"; - const terminalMainTranscriptCheck = isSystemGatewayRun - ? undefined - : resolveTerminalMainSessionTranscriptRegistryCheck({ - entry, - sessionScope: cfgLocal.session?.scope, - sessionKey: canonicalKey, - agentId: canonicalSessionAgentId, - mainKey: cfgLocal.session?.mainKey, - storePath, - }); + const requestedSessionMatchesEntry = Boolean( + requestedSessionId && entry?.sessionId?.trim() === requestedSessionId, + ); + const terminalMainTranscriptCheck = + isSystemGatewayRun || requestedSessionMatchesEntry + ? undefined + : resolveTerminalMainSessionTranscriptRegistryCheck({ + entry, + sessionScope: cfgLocal.session?.scope, + sessionKey: canonicalKey, + agentId: canonicalSessionAgentId, + mainKey: cfgLocal.session?.mainKey, + storePath, + }); const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck ? await hasTerminalMainSessionTranscriptNewerThanRegistry({ entry, diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 1888ba665c86..364453c95acf 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -2328,6 +2328,54 @@ describe("chat directive tag stripping for non-streaming final payloads", () => expect(nodeSend?.[2].sessionKey).toBe("agent:main:canon"); }); + it("chat.inject advances the session registry marker after transcript append", async () => { + const fixtureDir = createTranscriptFixture("openclaw-chat-inject-registry-marker-"); + const updatedAt = Date.parse("2026-05-18T11:00:00.000Z"); + const appendedAt = Date.parse("2026-05-18T11:05:00.000Z"); + const storePath = path.join(path.dirname(mockState.transcriptPath), "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + main: { + sessionId: mockState.sessionId, + sessionFile: mockState.transcriptPath, + updatedAt, + status: "done", + }, + }), + "utf-8", + ); + const respond = vi.fn(); + const context = createChatContext(); + vi.useFakeTimers({ toFake: ["Date"] }); + vi.setSystemTime(appendedAt); + try { + await chatHandlers["chat.inject"]({ + params: { + sessionKey: "main", + message: "hello with registry marker", + }, + respond, + req: {} as never, + client: null as never, + isWebchatConnect: () => false, + context: context as GatewayRequestContext, + }); + + const response = lastRespondCall(respond); + expect(response?.[0]).toBe(true); + const store = JSON.parse(fs.readFileSync(storePath, "utf-8")) as Record< + string, + { updatedAt?: number; status?: string } + >; + expect(store.main?.updatedAt).toBe(appendedAt); + expect(store.main?.status).toBe("done"); + } finally { + vi.useRealTimers(); + fs.rmSync(fixtureDir, { recursive: true, force: true }); + } + }); + it("chat.inject scopes selected-agent global sessions before appending", async () => { createTranscriptFixture("openclaw-chat-inject-selected-global-"); mockState.config = { diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 4a9387003c0b..c07e8932778e 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -48,7 +48,7 @@ import { getReplyPayloadMetadata, type ReplyPayload } from "../../auto-reply/rep import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import { stageSandboxMedia } from "../../auto-reply/reply/stage-sandbox-media.js"; import type { MsgContext, TemplateContext } from "../../auto-reply/templating.js"; -import { resolveSessionFilePath } from "../../config/sessions.js"; +import { resolveSessionFilePath, updateSessionStoreEntry } from "../../config/sessions.js"; import { resolveMirroredTranscriptText } from "../../config/sessions/transcript-mirror.js"; import { CURRENT_SESSION_VERSION } from "../../config/sessions/version.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; @@ -1631,7 +1631,7 @@ async function appendAssistantTranscriptMessage(params: { } } - return await appendInjectedAssistantMessageToTranscript({ + const appended = await appendInjectedAssistantMessageToTranscript({ transcriptPath, sessionKey: params.sessionKey, ...(params.agentId ? { agentId: params.agentId } : {}), @@ -1643,6 +1643,16 @@ async function appendAssistantTranscriptMessage(params: { ttsSupplement: params.ttsSupplement, config: params.cfg, }); + if (appended.ok && params.storePath) { + const transcriptMarkerUpdatedAt = Date.now(); + await updateSessionStoreEntry({ + storePath: params.storePath, + sessionKey: params.sessionKey, + update: (current) => + current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null, + }); + } + return appended; } function collectSessionAbortPartials(params: {