fix(sessions): mark transcript rewrites in registry

This commit is contained in:
Ayaan Zaidi
2026-06-05 17:03:11 +05:30
parent e22e857ddd
commit ceee4c6b01
8 changed files with 281 additions and 49 deletions

View File

@@ -138,8 +138,13 @@ export function clearCliSession(entry: SessionEntry, provider: string): void {
}
}
type MutableCliSessionFields = Pick<
SessionEntry,
"cliSessionBindings" | "cliSessionIds" | "claudeCliSessionId"
>;
/** Remove every CLI session binding from a session entry. */
export function clearAllCliSessions(entry: SessionEntry): void {
export function clearAllCliSessions(entry: Partial<MutableCliSessionFields>): void {
entry.cliSessionBindings = undefined;
entry.cliSessionIds = undefined;
entry.claudeCliSessionId = undefined;

View File

@@ -37,6 +37,7 @@ import {
import { resolveSessionIdMatchSelection } from "../../sessions/session-id-resolution.js";
import { listAgentIds, resolveDefaultAgentId } from "../agent-scope.js";
import { clearBootstrapSnapshotOnSessionRollover } from "../bootstrap-cache.js";
import { clearAllCliSessions } from "../cli-session.js";
/** Resolved command session identity plus backing store metadata. */
export type SessionResolution = {
@@ -62,7 +63,7 @@ function clearRotatedTerminalMainSessionMetadata(
if (!entry) {
return undefined;
}
return {
const next = {
...entry,
sessionFile: undefined,
status: undefined,
@@ -73,6 +74,8 @@ function clearRotatedTerminalMainSessionMetadata(
sessionStartedAt: undefined,
lastInteractionAt: undefined,
};
clearAllCliSessions(next);
return next;
}
type SessionIdMatchSet = {

View File

@@ -189,6 +189,15 @@ describe("agent session resolution", () => {
lastInteractionAt: registryUpdatedAt - 30_000,
startedAt: registryUpdatedAt - 1_000,
endedAt: registryUpdatedAt - 100,
cliSessionBindings: {
"claude-cli": { sessionId: "old-claude-cli-session" },
"codex-cli": { sessionId: "old-codex-cli-session" },
},
cliSessionIds: {
"claude-cli": "old-claude-cli-session",
"codex-cli": "old-codex-cli-session",
},
claudeCliSessionId: "old-claude-cli-session",
},
});
const cfg = mockConfig(home, store);
@@ -205,6 +214,9 @@ describe("agent session resolution", () => {
expect(resolution.sessionEntry?.runtimeMs).toBeUndefined();
expect(resolution.sessionEntry?.sessionStartedAt).toBeUndefined();
expect(resolution.sessionEntry?.lastInteractionAt).toBeUndefined();
expect(resolution.sessionEntry?.cliSessionBindings).toBeUndefined();
expect(resolution.sessionEntry?.cliSessionIds).toBeUndefined();
expect(resolution.sessionEntry?.claudeCliSessionId).toBeUndefined();
const sessionStore = {
[scenario.sessionKey]: resolution.sessionEntry!,
@@ -245,6 +257,9 @@ describe("agent session resolution", () => {
expect(persisted?.startedAt).toBeUndefined();
expect(persisted?.endedAt).toBeUndefined();
expect(persisted?.runtimeMs).toBeUndefined();
expect(persisted?.cliSessionBindings).toBeUndefined();
expect(persisted?.cliSessionIds).toBeUndefined();
expect(persisted?.claudeCliSessionId).toBeUndefined();
expect(persisted?.sessionStartedAt).toBeGreaterThan(registryUpdatedAt);
expect(persisted?.lastInteractionAt).toBeGreaterThan(registryUpdatedAt);
});

View File

@@ -302,7 +302,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
let transcriptMarkerUpdatedAt: number | undefined;
const result = await runWithOwnedSessionTranscriptWriteLock<SessionTranscriptAppendResult>(
{ sessionFile, sessionKey: resolved.normalizedKey },
async () => {
async (): Promise<SessionTranscriptAppendResult> => {
const explicitIdempotencyKey =
params.idempotencyKey ??
((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined);

View File

@@ -760,6 +760,15 @@ describe("gateway agent handler", () => {
startedAt: now - 20_000,
endedAt: now - 15_000,
runtimeMs: 5_000,
cliSessionBindings: {
"claude-cli": { sessionId: "old-claude-cli-session" },
"codex-cli": { sessionId: "old-codex-cli-session" },
},
cliSessionIds: {
"claude-cli": "old-claude-cli-session",
"codex-cli": "old-codex-cli-session",
},
claudeCliSessionId: "old-claude-cli-session",
},
canonicalKey: "agent:main:main",
});
@@ -776,11 +785,80 @@ describe("gateway agent handler", () => {
expect(capturedEntry?.endedAt).toBeUndefined();
expect(capturedEntry?.runtimeMs).toBeUndefined();
expect(capturedEntry?.sessionFile).toBeUndefined();
expect(capturedEntry?.cliSessionBindings).toBeUndefined();
expect(capturedEntry?.cliSessionIds).toBeUndefined();
expect(capturedEntry?.claudeCliSessionId).toBeUndefined();
},
);
},
);
it("reuses terminal main sessions when the fresh store row has the transcript marker", async () => {
const now = Date.parse("2026-05-18T09:47:30.000Z");
vi.useFakeTimers({ toFake: ["Date"] });
dateOnlyFakeClockActive = true;
vi.setSystemTime(now);
await withTempDir({ prefix: "openclaw-gateway-terminal-main-fresh-marker-" }, async (root) => {
const sessionsDir = `${root}/sessions`;
await fs.mkdir(sessionsDir, { recursive: true });
const sessionFile = "terminal-main-session.jsonl";
const transcriptPath = `${sessionsDir}/${sessionFile}`;
await fs.writeFile(
transcriptPath,
`${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`,
"utf8",
);
await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000));
const staleEntry = {
sessionId: "terminal-main-session",
sessionFile,
status: "done",
updatedAt: now - 10_000,
cliSessionBindings: {
"claude-cli": { sessionId: "existing-claude-cli-session" },
},
cliSessionIds: {
"claude-cli": "existing-claude-cli-session",
},
claudeCliSessionId: "existing-claude-cli-session",
};
mocks.loadSessionEntry.mockReturnValue({
cfg: {},
storePath: `${sessionsDir}/sessions.json`,
entry: staleEntry,
canonicalKey: "agent:main:main",
});
let capturedEntry: Record<string, unknown> | undefined;
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
const store = {
"agent:main:main": {
...staleEntry,
updatedAt: now,
},
};
const result = await updater(store);
capturedEntry = result as Record<string, unknown>;
return result;
});
mocks.agentCommand.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { durationMs: 100 },
});
await runMainAgent("hi", "test-idem-terminal-main-fresh-marker");
const call = await waitForAgentCommandCall<{ sessionId?: string }>();
expect(call.sessionId).toBe("terminal-main-session");
expect(capturedEntry?.sessionId).toBe("terminal-main-session");
expect(capturedEntry?.sessionFile).toBe(sessionFile);
expect(capturedEntry?.cliSessionIds).toEqual({
"claude-cli": "existing-claude-cli-session",
});
expect(capturedEntry?.claudeCliSessionId).toBe("existing-claude-cli-session");
});
});
it("honors explicit gateway session-id resumes for terminal main rows", async () => {
const now = Date.parse("2026-05-18T09:48:00.000Z");
vi.useFakeTimers({ toFake: ["Date"] });

View File

@@ -30,6 +30,7 @@ import {
consumeExecApprovalFollowupRuntimeHandoff,
parseExecApprovalFollowupApprovalId,
} from "../../agents/bash-tools.exec-approval-followup-state.js";
import { clearAllCliSessions } from "../../agents/cli-session.js";
import type { AgentCommandOpts } from "../../agents/command/types.js";
import { isTimeoutError } from "../../agents/failover-error.js";
import {
@@ -51,7 +52,7 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { agentCommandFromIngress } from "../../commands/agent.js";
import {
evaluateSessionFreshness,
hasTerminalMainSessionTranscriptNewerThanRegistry,
hasTerminalMainSessionTranscriptNewerThanRegistrySync,
mergeSessionEntry,
resolveTerminalMainSessionTranscriptRegistryCheck,
resolveChannelResetConfig,
@@ -1636,7 +1637,7 @@ export const agentHandlers: GatewayRequestHandlers = {
agentId: canonicalSessionAgentId,
})
: undefined;
const freshness = entry
let freshness = entry
? evaluateSessionFreshness({
updatedAt: entry.updatedAt,
...lifecycleTimestamps,
@@ -1644,20 +1645,25 @@ export const agentHandlers: GatewayRequestHandlers = {
policy: resetPolicy,
})
: undefined;
let failedSessionTranscriptMissing = false;
if (entry?.status === "failed" && entry.sessionId?.trim()) {
const resolveFailedSessionTranscriptMissingForEntry = (
candidateEntry: SessionEntry | undefined,
) => {
if (candidateEntry?.status !== "failed" || !candidateEntry.sessionId?.trim()) {
return false;
}
try {
const sessionPathOpts = resolveSessionFilePathOptions({
storePath,
agentId: canonicalSessionAgentId,
});
failedSessionTranscriptMissing = !existsSync(
resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts),
return !existsSync(
resolveSessionFilePath(candidateEntry.sessionId, candidateEntry, sessionPathOpts),
);
} catch {
failedSessionTranscriptMissing = true;
return true;
}
}
};
const failedSessionTranscriptMissing = resolveFailedSessionTranscriptMissingForEntry(entry);
const mainSessionKeyForRequest = resolveAgentMainSessionKey({
cfg: cfgLocal,
agentId: canonicalSessionAgentId,
@@ -1680,7 +1686,7 @@ export const agentHandlers: GatewayRequestHandlers = {
storePath,
});
const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck
? await hasTerminalMainSessionTranscriptNewerThanRegistry({
? hasTerminalMainSessionTranscriptNewerThanRegistrySync({
entry,
sessionScope: cfgLocal.session?.scope,
sessionKey: canonicalKey,
@@ -1694,7 +1700,7 @@ export const agentHandlers: GatewayRequestHandlers = {
(freshness?.fresh ?? false) &&
!failedSessionTranscriptMissing &&
!terminalMainTranscriptNewerThanRegistry;
const usableRequestedSessionId =
let usableRequestedSessionId =
requestedSessionId && (!entry?.sessionId || canReuseSession)
? requestedSessionId
: undefined;
@@ -1705,7 +1711,7 @@ export const agentHandlers: GatewayRequestHandlers = {
!entry ||
(!canReuseSession && !usableRequestedSessionId) ||
Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId);
const rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId);
let rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId);
const touchInteraction = !isSystemGatewayRun && !request.internalEvents?.length;
const sessionAgent = canonicalSessionAgentId;
type AgentSessionPatchBuild = {
@@ -1715,6 +1721,10 @@ export const agentHandlers: GatewayRequestHandlers = {
groupChannel: string | undefined;
groupSpace: string | undefined;
freshSessionRotatedSinceLoad: boolean;
isNewSession: boolean;
rotatedSessionId: boolean;
usableRequestedSessionId: string | undefined;
freshness: typeof freshness;
};
const requestDeliveryHint = normalizeDeliveryContext({
channel: request.channel?.trim(),
@@ -1807,12 +1817,69 @@ export const agentHandlers: GatewayRequestHandlers = {
const freshSessionRotatedSinceLoad = Boolean(
entry?.sessionId && freshEntry?.sessionId && freshEntry.sessionId !== entry.sessionId,
);
const patchSessionId = freshSessionRotatedSinceLoad ? freshEntry?.sessionId : sessionId;
const shouldClearRotatedState = rotatedSessionId && !freshSessionRotatedSinceLoad;
const freshLifecycleTimestamps = freshEntry
? resolveSessionLifecycleTimestamps({
entry: freshEntry,
storePath,
agentId: sessionAgent,
})
: undefined;
const freshFreshness = freshEntry
? evaluateSessionFreshness({
updatedAt: freshEntry.updatedAt,
...freshLifecycleTimestamps,
now,
policy: resetPolicy,
})
: undefined;
const freshRequestedSessionMatchesEntry = Boolean(
requestedSessionId && freshEntry?.sessionId?.trim() === requestedSessionId,
);
const freshTerminalMainTranscriptNewerThanRegistry =
isSystemGatewayRun || freshRequestedSessionMatchesEntry
? false
: hasTerminalMainSessionTranscriptNewerThanRegistrySync({
entry: freshEntry,
sessionScope: cfgLocal.session?.scope,
sessionKey: canonicalKey,
agentId: sessionAgent,
mainKey: cfgLocal.session?.mainKey,
storePath,
});
const freshFailedSessionTranscriptMissing =
resolveFailedSessionTranscriptMissingForEntry(freshEntry);
const freshCanReuseSession =
Boolean(freshEntry?.sessionId) &&
(freshFreshness?.fresh ?? false) &&
!freshFailedSessionTranscriptMissing &&
!freshTerminalMainTranscriptNewerThanRegistry;
const freshUsableRequestedSessionId =
requestedSessionId && (!freshEntry?.sessionId || freshCanReuseSession)
? requestedSessionId
: undefined;
const freshSessionId = freshUsableRequestedSessionId
? freshUsableRequestedSessionId
: ((freshCanReuseSession ? freshEntry?.sessionId : undefined) ?? sessionId);
const freshIsNewSession =
!freshEntry ||
(!freshCanReuseSession && !freshUsableRequestedSessionId) ||
Boolean(
freshUsableRequestedSessionId &&
freshEntry?.sessionId !== freshUsableRequestedSessionId,
);
const freshRotatedSessionId = Boolean(
freshEntry?.sessionId && freshEntry.sessionId !== freshSessionId,
);
const patchSessionId = freshSessionRotatedSinceLoad
? freshEntry?.sessionId
: freshSessionId;
const shouldClearRotatedState = freshRotatedSessionId && !freshSessionRotatedSinceLoad;
const patch: Partial<SessionEntry> = {
sessionId: patchSessionId,
updatedAt: now,
...(isNewSession && !freshSessionRotatedSinceLoad ? { sessionStartedAt: now } : {}),
...(freshIsNewSession && !freshSessionRotatedSinceLoad
? { sessionStartedAt: now }
: {}),
...(touchInteraction ? { lastInteractionAt: now } : {}),
...(effectiveDeliveryFields.route ? { route: effectiveDeliveryFields.route } : {}),
...(effectiveDeliveryFields.deliveryContext
@@ -1846,6 +1913,9 @@ export const agentHandlers: GatewayRequestHandlers = {
}
: {}),
};
if (shouldClearRotatedState) {
clearAllCliSessions(patch);
}
return {
patch,
spawnedBy: freshSpawnedBy,
@@ -1853,9 +1923,17 @@ export const agentHandlers: GatewayRequestHandlers = {
groupChannel: nextGroup.groupChannel,
groupSpace: nextGroup.groupSpace,
freshSessionRotatedSinceLoad,
isNewSession: freshIsNewSession,
rotatedSessionId: freshRotatedSessionId,
usableRequestedSessionId: freshUsableRequestedSessionId,
freshness: freshFreshness,
};
};
let patchBuild = buildSessionPatch(entry);
isNewSession = patchBuild.isNewSession;
rotatedSessionId = patchBuild.rotatedSessionId;
usableRequestedSessionId = patchBuild.usableRequestedSessionId;
freshness = patchBuild.freshness;
sessionEntry = mergeSessionEntry(entry, patchBuild.patch);
resolvedSessionId = sessionEntry?.sessionId ?? sessionId;
const canonicalSessionKey = canonicalKey;
@@ -1961,6 +2039,10 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
}
isNewSession = patchBuild.isNewSession;
rotatedSessionId = patchBuild.rotatedSessionId;
usableRequestedSessionId = patchBuild.usableRequestedSessionId;
freshness = patchBuild.freshness;
spawnedByValue = patchBuild.spawnedBy;
resolvedGroupId = patchBuild.groupId;
resolvedGroupChannel = patchBuild.groupChannel;

View File

@@ -1402,6 +1402,21 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
fs.writeFileSync(savedImagePath, Buffer.from(TINY_PNG_BASE64, "base64"));
mockState.savedMediaResults = [{ path: savedImagePath, contentType: "image/png" }];
const mirrorIdempotencyKey = "idem-agent-source-reply-media:internal-source-reply:0";
const updatedAt = Date.parse("2026-05-18T11:00:00.000Z");
const rewrittenAt = Date.parse("2026-05-18T11:05:00.000Z");
const storePath = path.join(path.dirname(mockState.transcriptPath), "sessions.json");
fs.writeFileSync(
storePath,
JSON.stringify({
main: {
sessionId: mockState.sessionId,
sessionFile: mockState.transcriptPath,
updatedAt,
status: "done",
},
}),
"utf-8",
);
await appendSourceReplyMirrorEntry({
idempotencyKey: mirrorIdempotencyKey,
text: "Codex source reply with media",
@@ -1430,34 +1445,47 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
const respond = vi.fn();
const context = createChatContext();
const broadcast = await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-agent-source-reply-media",
message: "hello from codex",
});
vi.useFakeTimers({ toFake: ["Date"] });
vi.setSystemTime(rewrittenAt);
try {
const broadcast = await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-agent-source-reply-media",
message: "hello from codex",
});
expect(broadcast).toMatchObject({
runId: "idem-agent-source-reply-media",
sessionKey: "main",
state: "final",
});
expect(extractFirstTextBlock(broadcast)).toBe("Codex source reply with media");
const broadcastContent = getMessageContent(broadcast);
expect(String(broadcastContent[1]?.url)).toContain("/api/chat/media/outgoing/");
expect(String(broadcastContent[1]?.openUrl)).toContain("/api/chat/media/outgoing/");
const assistantUpdates = mockState.emittedTranscriptUpdates.filter(
(update) =>
typeof update.message === "object" &&
update.message !== null &&
(update.message as { role?: unknown }).role === "assistant",
);
expect(assistantUpdates).toStrictEqual([]);
const assistantEntries = await readActiveAssistantTranscriptMessages();
expect(assistantEntries).toHaveLength(1);
expect(assistantEntries[0]?.idempotencyKey).toBe(mirrorIdempotencyKey);
expect(JSON.stringify(assistantEntries[0])).toContain("/api/chat/media/outgoing/");
expect(JSON.stringify(assistantEntries[0])).not.toContain(mediaUrl);
expect(broadcast).toMatchObject({
runId: "idem-agent-source-reply-media",
sessionKey: "main",
state: "final",
});
expect(extractFirstTextBlock(broadcast)).toBe("Codex source reply with media");
const broadcastContent = getMessageContent(broadcast);
expect(String(broadcastContent[1]?.url)).toContain("/api/chat/media/outgoing/");
expect(String(broadcastContent[1]?.openUrl)).toContain("/api/chat/media/outgoing/");
const assistantUpdates = mockState.emittedTranscriptUpdates.filter(
(update) =>
typeof update.message === "object" &&
update.message !== null &&
(update.message as { role?: unknown }).role === "assistant",
);
expect(assistantUpdates).toStrictEqual([]);
const assistantEntries = await readActiveAssistantTranscriptMessages();
expect(assistantEntries).toHaveLength(1);
expect(assistantEntries[0]?.idempotencyKey).toBe(mirrorIdempotencyKey);
expect(JSON.stringify(assistantEntries[0])).toContain("/api/chat/media/outgoing/");
expect(JSON.stringify(assistantEntries[0])).not.toContain(mediaUrl);
const store = JSON.parse(fs.readFileSync(storePath, "utf-8")) as Record<
string,
{ updatedAt?: number; status?: string }
>;
expect(store.main?.updatedAt).toBeGreaterThanOrEqual(rewrittenAt);
expect(store.main?.updatedAt).toBeGreaterThan(updatedAt);
expect(store.main?.status).toBe("done");
} finally {
vi.useRealTimers();
}
},
);
});

View File

@@ -1643,18 +1643,34 @@ async function appendAssistantTranscriptMessage(params: {
ttsSupplement: params.ttsSupplement,
config: params.cfg,
});
if (appended.ok && params.storePath) {
const transcriptMarkerUpdatedAt = Date.now();
await updateSessionStoreEntry({
if (appended.ok) {
await advanceSessionTranscriptMarker({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: (current) =>
current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null,
sessionId: params.sessionId,
});
}
return appended;
}
async function advanceSessionTranscriptMarker(params: {
storePath: string | undefined;
sessionKey: string;
sessionId: string;
}): Promise<void> {
if (!params.storePath) {
return;
}
const transcriptMarkerUpdatedAt = Date.now();
await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: (current) =>
current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null,
});
}
function collectSessionAbortPartials(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunBuffers: Map<string, string>;
@@ -4114,6 +4130,11 @@ export const chatHandlers: GatewayRequestHandlers = {
},
});
if (result.changed) {
await advanceSessionTranscriptMarker({
storePath: latestStorePath,
sessionKey,
sessionId,
});
for (const target of rewriteTargets) {
const rewritten =
await findSourceReplyTranscriptMirrorByIdempotencyKey(