mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(sessions): reconcile stale terminal main transcripts
This commit is contained in:
committed by
Ayaan Zaidi
parent
afa04d6454
commit
0c9ac48d2c
@@ -693,30 +693,62 @@ describe("CLI attempt execution", () => {
|
||||
|
||||
it("persists CLI replies into the session transcript", async () => {
|
||||
const sessionKey = "agent:main:subagent:cli-transcript";
|
||||
const sessionFile = path.join(tmpDir, "session-cli-transcript.jsonl");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session-cli-transcript",
|
||||
updatedAt: Date.now(),
|
||||
sessionFile,
|
||||
updatedAt: 1,
|
||||
status: "running",
|
||||
startedAt: 2,
|
||||
};
|
||||
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
|
||||
|
||||
const updatedEntry = await persistCliTurnTranscript({
|
||||
body: "persist this",
|
||||
result: makeCliResult("hello from cli"),
|
||||
sessionId: sessionEntry.sessionId,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
sessionAgentId: "main",
|
||||
sessionCwd: tmpDir,
|
||||
config: {},
|
||||
});
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
...sessionEntry,
|
||||
updatedAt: 5,
|
||||
status: "done",
|
||||
endedAt: 4,
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const sessionFile = updatedEntry?.sessionFile;
|
||||
if (!sessionFile) {
|
||||
const nowCalls: number[] = [];
|
||||
let nextNow = 10_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => {
|
||||
nextNow += 1_000;
|
||||
nowCalls.push(nextNow);
|
||||
return nextNow;
|
||||
});
|
||||
let updatedEntry: SessionEntry | undefined;
|
||||
try {
|
||||
updatedEntry = await persistCliTurnTranscript({
|
||||
body: "persist this",
|
||||
result: makeCliResult("hello from cli"),
|
||||
sessionId: sessionEntry.sessionId,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
storePath,
|
||||
sessionAgentId: "main",
|
||||
sessionCwd: tmpDir,
|
||||
config: {},
|
||||
});
|
||||
} finally {
|
||||
nowSpy.mockRestore();
|
||||
}
|
||||
|
||||
const updatedSessionFile = updatedEntry?.sessionFile;
|
||||
if (!updatedSessionFile) {
|
||||
throw new Error("expected CLI transcript persistence to create a session file");
|
||||
}
|
||||
expect(updatedSessionFile).toBe(sessionFile);
|
||||
const entries = await readSessionFileEntries(sessionFile);
|
||||
expectRecordFields(requireRecord(entries[0], "session entry"), {
|
||||
type: "session",
|
||||
@@ -744,6 +776,18 @@ describe("CLI attempt execution", () => {
|
||||
model: "opus",
|
||||
content: [{ type: "text", text: "hello from cli" }],
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
expect(persisted[sessionKey]?.sessionFile).toBe(sessionFile);
|
||||
expect(persisted[sessionKey]?.updatedAt).toBeGreaterThan(sessionEntry.updatedAt);
|
||||
expect(persisted[sessionKey]?.updatedAt).toBeLessThan(nowCalls.at(-1) ?? 0);
|
||||
expect(persisted[sessionKey]?.status).toBe("done");
|
||||
expect(persisted[sessionKey]?.endedAt).toBe(4);
|
||||
expect(persisted[sessionKey]?.startedAt).toBe(2);
|
||||
expect(sessionStore[sessionKey]?.updatedAt).toBe(persisted[sessionKey]?.updatedAt);
|
||||
});
|
||||
|
||||
it("embedded assistant gap-fill skips user mirror and dedupes identical assistant tails", async () => {
|
||||
|
||||
@@ -21,10 +21,16 @@ export type PersistSessionEntryParams = {
|
||||
storePath: string;
|
||||
entry: SessionEntry;
|
||||
clearedFields?: string[];
|
||||
preserveTranscriptMarkerUpdatedAt?: boolean;
|
||||
shouldPersist?: (entry: SessionEntry | undefined) => boolean;
|
||||
};
|
||||
|
||||
/** Persists one session entry while keeping the caller's in-memory store aligned. */
|
||||
|
||||
function normalizeTranscriptMarkerUpdatedAt(value: number | undefined): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined;
|
||||
}
|
||||
|
||||
export async function persistSessionEntry(
|
||||
params: PersistSessionEntryParams,
|
||||
): Promise<SessionEntry | undefined> {
|
||||
@@ -36,6 +42,13 @@ export async function persistSessionEntry(
|
||||
return current;
|
||||
}
|
||||
const merged = mergeSessionEntry(store[params.sessionKey], params.entry);
|
||||
if (params.preserveTranscriptMarkerUpdatedAt) {
|
||||
const currentUpdatedAt = normalizeTranscriptMarkerUpdatedAt(current?.updatedAt);
|
||||
const markerUpdatedAt = normalizeTranscriptMarkerUpdatedAt(params.entry.updatedAt);
|
||||
if (markerUpdatedAt !== undefined) {
|
||||
merged.updatedAt = Math.max(currentUpdatedAt ?? 0, markerUpdatedAt);
|
||||
}
|
||||
}
|
||||
for (const field of params.clearedFields ?? []) {
|
||||
// Cleared fields only apply when the replacement entry did not set the
|
||||
// field again; this preserves explicit false/null updates.
|
||||
|
||||
@@ -244,7 +244,9 @@ async function persistTextTurnTranscript(
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
allowReentrant: true,
|
||||
});
|
||||
let transcriptMarkerUpdatedAt: number | undefined;
|
||||
try {
|
||||
let wroteTranscript = false;
|
||||
const userMessage = params.userMessage;
|
||||
if (userMessage || promptText) {
|
||||
await appendUserTurnTranscriptMessage({
|
||||
@@ -264,6 +266,7 @@ async function persistTextTurnTranscript(
|
||||
}),
|
||||
updateMode: "none",
|
||||
});
|
||||
wroteTranscript = true;
|
||||
}
|
||||
|
||||
if (replyText) {
|
||||
@@ -293,18 +296,44 @@ async function persistTextTurnTranscript(
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
wroteTranscript = true;
|
||||
}
|
||||
}
|
||||
if (wroteTranscript) {
|
||||
transcriptMarkerUpdatedAt = Date.now();
|
||||
}
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
|
||||
let updatedSessionEntry = sessionEntry;
|
||||
if (params.sessionStore && params.storePath && transcriptMarkerUpdatedAt !== undefined) {
|
||||
const currentEntry = params.sessionStore[params.sessionKey] ?? sessionEntry;
|
||||
if (currentEntry?.sessionId === params.sessionId) {
|
||||
// Keep updatedAt as the registry marker for transcript writes we own.
|
||||
// Session reuse checks compare transcript mtime against this marker, not endedAt.
|
||||
updatedSessionEntry =
|
||||
(await persistSessionEntry({
|
||||
sessionStore: params.sessionStore,
|
||||
sessionKey: params.sessionKey,
|
||||
storePath: params.storePath,
|
||||
entry: {
|
||||
sessionId: params.sessionId,
|
||||
sessionFile,
|
||||
updatedAt: transcriptMarkerUpdatedAt,
|
||||
},
|
||||
preserveTranscriptMarkerUpdatedAt: true,
|
||||
shouldPersist: (current) => current?.sessionId === params.sessionId,
|
||||
})) ?? updatedSessionEntry;
|
||||
}
|
||||
}
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.sessionAgentId,
|
||||
});
|
||||
return sessionEntry;
|
||||
return updatedSessionEntry;
|
||||
}
|
||||
|
||||
function resolveCliTranscriptReplyText(result: EmbeddedAgentRunResult): string {
|
||||
|
||||
@@ -9,7 +9,10 @@ import {
|
||||
type ThinkLevel,
|
||||
type VerboseLevel,
|
||||
} from "../../auto-reply/thinking.js";
|
||||
import { resolveSessionLifecycleTimestamps } from "../../config/sessions/lifecycle.js";
|
||||
import {
|
||||
hasTerminalMainSessionTranscriptNewerThanRegistrySync,
|
||||
resolveSessionLifecycleTimestamps,
|
||||
} from "../../config/sessions/lifecycle.js";
|
||||
import {
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveExplicitAgentSessionKey,
|
||||
@@ -53,6 +56,23 @@ type SessionKeyResolution = {
|
||||
storePath: string;
|
||||
};
|
||||
|
||||
function clearRotatedTerminalMainSessionMetadata(
|
||||
entry: SessionEntry | undefined,
|
||||
): SessionEntry | undefined {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
...entry,
|
||||
sessionFile: undefined,
|
||||
status: undefined,
|
||||
startedAt: undefined,
|
||||
endedAt: undefined,
|
||||
runtimeMs: undefined,
|
||||
abortedLastRun: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
type SessionIdMatchSet = {
|
||||
matches: Array<[string, SessionEntry]>;
|
||||
primaryStoreMatches: Array<[string, SessionEntry]>;
|
||||
@@ -328,6 +348,9 @@ export function resolveSession(opts: {
|
||||
const now = Date.now();
|
||||
|
||||
const sessionEntry = sessionKey ? sessionStore[sessionKey] : undefined;
|
||||
const sessionAgentId = opts.agentId?.trim()
|
||||
? normalizeAgentId(opts.agentId)
|
||||
: resolveAgentIdFromSessionKey(sessionKey);
|
||||
|
||||
const resetType = resolveSessionResetType({ sessionKey });
|
||||
const channelReset = resolveChannelResetConfig({
|
||||
@@ -339,12 +362,23 @@ export function resolveSession(opts: {
|
||||
resetType,
|
||||
resetOverride: channelReset,
|
||||
});
|
||||
const terminalMainTranscriptNewerThanRegistry = sessionEntry
|
||||
? hasTerminalMainSessionTranscriptNewerThanRegistrySync({
|
||||
entry: sessionEntry,
|
||||
sessionScope: sessionCfg?.scope,
|
||||
sessionKey,
|
||||
agentId: sessionAgentId,
|
||||
mainKey: sessionCfg?.mainKey,
|
||||
storePath,
|
||||
})
|
||||
: false;
|
||||
const fresh = sessionEntry
|
||||
? evaluateSessionFreshness({
|
||||
? !terminalMainTranscriptNewerThanRegistry &&
|
||||
evaluateSessionFreshness({
|
||||
updatedAt: sessionEntry.updatedAt,
|
||||
...resolveSessionLifecycleTimestamps({
|
||||
entry: sessionEntry,
|
||||
agentId: opts.agentId,
|
||||
agentId: sessionAgentId,
|
||||
storePath,
|
||||
}),
|
||||
now,
|
||||
@@ -354,6 +388,9 @@ export function resolveSession(opts: {
|
||||
const sessionId =
|
||||
opts.sessionId?.trim() || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID();
|
||||
const isNewSession = !fresh && !opts.sessionId;
|
||||
const resolvedSessionEntry = terminalMainTranscriptNewerThanRegistry
|
||||
? clearRotatedTerminalMainSessionMetadata(sessionEntry)
|
||||
: sessionEntry;
|
||||
|
||||
clearBootstrapSnapshotOnSessionRollover({
|
||||
sessionKey,
|
||||
@@ -372,7 +409,7 @@ export function resolveSession(opts: {
|
||||
return {
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionEntry: resolvedSessionEntry,
|
||||
sessionStore,
|
||||
storePath,
|
||||
isNewSession,
|
||||
|
||||
@@ -181,6 +181,36 @@ async function writeSessionStoreFast(
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
}
|
||||
|
||||
async function writeTerminalTranscriptSessionStore(params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
sessionId: string;
|
||||
status?: SessionEntry["status"];
|
||||
updatedAt: number;
|
||||
endedAt: number;
|
||||
transcriptMtimeMs: number;
|
||||
}): Promise<void> {
|
||||
const sessionFile = `${params.sessionId}.jsonl`;
|
||||
const transcriptPath = path.join(path.dirname(params.storePath), sessionFile);
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({ type: "session", id: params.sessionId })}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, params.transcriptMtimeMs / 1000, params.transcriptMtimeMs / 1000);
|
||||
await writeSessionStoreFast(params.storePath, {
|
||||
[params.sessionKey]: {
|
||||
sessionId: params.sessionId,
|
||||
sessionFile,
|
||||
updatedAt: params.updatedAt,
|
||||
startedAt: params.endedAt - 10_000,
|
||||
endedAt: params.endedAt,
|
||||
runtimeMs: 9_000,
|
||||
status: params.status ?? "done",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function setMinimalCurrentConversationBindingRegistryForTests(): void {
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
@@ -1594,6 +1624,101 @@ describe("initSessionState reset policy", () => {
|
||||
expect(persisted[sessionKey]?.runtimeMs).toBe(9_000);
|
||||
});
|
||||
|
||||
it.each([
|
||||
{
|
||||
name: "non-main terminal rows ignore transcript mtime",
|
||||
sessionKey: "agent:main:whatsapp:dm:terminal-entry",
|
||||
updatedAtOffsetMs: -5_000,
|
||||
endedAtOffsetMs: -6_000,
|
||||
transcriptMtimeOffsetMs: -3_000,
|
||||
expectNewSession: false,
|
||||
},
|
||||
{
|
||||
name: "main terminal rows rotate when transcript is newer than updatedAt",
|
||||
sessionKey: "agent:main:main",
|
||||
updatedAtOffsetMs: -10_000,
|
||||
endedAtOffsetMs: -11_000,
|
||||
transcriptMtimeOffsetMs: 0,
|
||||
expectNewSession: true,
|
||||
},
|
||||
{
|
||||
name: "failed main terminal rows reuse when the transcript exists",
|
||||
sessionKey: "agent:main:main",
|
||||
status: "failed" as const,
|
||||
updatedAtOffsetMs: -10_000,
|
||||
endedAtOffsetMs: -11_000,
|
||||
transcriptMtimeOffsetMs: 0,
|
||||
expectNewSession: false,
|
||||
},
|
||||
{
|
||||
name: "main terminal rows reuse when updatedAt already reflects the transcript",
|
||||
sessionKey: "agent:main:main",
|
||||
updatedAtOffsetMs: -1_000,
|
||||
endedAtOffsetMs: -6_000,
|
||||
transcriptMtimeOffsetMs: -4_000,
|
||||
expectNewSession: false,
|
||||
},
|
||||
{
|
||||
name: "main terminal rows reuse when transcript mtime differs only by sub-millisecond precision",
|
||||
sessionKey: "agent:main:main",
|
||||
updatedAtOffsetMs: -4_000,
|
||||
endedAtOffsetMs: -6_000,
|
||||
transcriptMtimeOffsetMs: -3_999.5,
|
||||
expectNewSession: false,
|
||||
},
|
||||
{
|
||||
name: "main terminal rows reuse when transcript is not newer than updatedAt",
|
||||
sessionKey: "agent:main:main",
|
||||
updatedAtOffsetMs: -10_000,
|
||||
endedAtOffsetMs: -11_000,
|
||||
transcriptMtimeOffsetMs: -15_000,
|
||||
expectNewSession: false,
|
||||
},
|
||||
])("$name", async (scenario) => {
|
||||
vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0));
|
||||
const root = await makeCaseDir("openclaw-reset-terminal-entry-");
|
||||
const storePath = path.join(root, "sessions.json");
|
||||
const existingSessionId = "terminal-entry-old";
|
||||
const now = Date.now();
|
||||
const terminalUpdatedAt = now + scenario.updatedAtOffsetMs;
|
||||
const terminalEndedAt = now + scenario.endedAtOffsetMs;
|
||||
await writeTerminalTranscriptSessionStore({
|
||||
storePath,
|
||||
sessionKey: scenario.sessionKey,
|
||||
sessionId: existingSessionId,
|
||||
status: scenario.status,
|
||||
updatedAt: terminalUpdatedAt,
|
||||
endedAt: terminalEndedAt,
|
||||
transcriptMtimeMs: now + scenario.transcriptMtimeOffsetMs,
|
||||
});
|
||||
|
||||
const cfg = { session: { store: storePath } } as OpenClawConfig;
|
||||
const result = await initSessionState({
|
||||
ctx: { Body: "hello", SessionKey: scenario.sessionKey },
|
||||
cfg,
|
||||
commandAuthorized: true,
|
||||
});
|
||||
|
||||
expect(result.isNewSession).toBe(scenario.expectNewSession);
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
const entry = persisted[scenario.sessionKey];
|
||||
if (scenario.expectNewSession) {
|
||||
expect(result.sessionId).not.toBe(existingSessionId);
|
||||
expect(entry?.sessionId).not.toBe(existingSessionId);
|
||||
expect(entry?.status).toBeUndefined();
|
||||
expect(entry?.startedAt).toBeUndefined();
|
||||
expect(entry?.endedAt).toBeUndefined();
|
||||
expect(entry?.runtimeMs).toBeUndefined();
|
||||
} else {
|
||||
expect(result.sessionId).toBe(existingSessionId);
|
||||
expect(entry?.status).toBe(scenario.status ?? "done");
|
||||
expect(entry?.endedAt).toBe(terminalEndedAt);
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps the existing stale session for /reset soft", async () => {
|
||||
vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0));
|
||||
const root = await makeCaseDir("openclaw-reset-soft-stale-");
|
||||
|
||||
@@ -14,7 +14,10 @@ import { resetRegisteredAgentHarnessSessions } from "../../agents/harness/regist
|
||||
import { cleanupBrowserSessionsForLifecycleEnd } from "../../browser-lifecycle-cleanup.js";
|
||||
import { normalizeChatType } from "../../channels/chat-type.js";
|
||||
import { resolveGroupSessionKey } from "../../config/sessions/group.js";
|
||||
import { resolveSessionLifecycleTimestamps } from "../../config/sessions/lifecycle.js";
|
||||
import {
|
||||
hasTerminalMainSessionTranscriptNewerThanRegistry,
|
||||
resolveSessionLifecycleTimestamps,
|
||||
} from "../../config/sessions/lifecycle.js";
|
||||
import { canonicalizeMainSessionAlias } from "../../config/sessions/main-session.js";
|
||||
import { deriveSessionMetaPatch } from "../../config/sessions/metadata.js";
|
||||
import { resolveSessionTranscriptPath, resolveStorePath } from "../../config/sessions/paths.js";
|
||||
@@ -467,10 +470,20 @@ export async function initSessionState(params: {
|
||||
skipConfiguredFallbackWhenActiveSessionNonAcp: false,
|
||||
}) ?? "",
|
||||
);
|
||||
const terminalMainTranscriptNewerThanRegistry =
|
||||
!isSystemEvent &&
|
||||
(await hasTerminalMainSessionTranscriptNewerThanRegistry({
|
||||
entry,
|
||||
sessionScope,
|
||||
sessionKey,
|
||||
agentId,
|
||||
mainKey,
|
||||
storePath,
|
||||
}));
|
||||
const freshEntry =
|
||||
(isSystemEvent && canReuseExistingEntry) ||
|
||||
(entryFreshness?.fresh ?? false) ||
|
||||
(softResetAllowed && canReuseExistingEntry);
|
||||
(((entryFreshness?.fresh ?? false) || (softResetAllowed && canReuseExistingEntry)) &&
|
||||
!terminalMainTranscriptNewerThanRegistry);
|
||||
// Capture the current session entry before any reset so its transcript can be
|
||||
// archived afterward. We need to do this for both explicit resets (/new, /reset)
|
||||
// and for scheduled/daily resets where the session has become stale (!freshEntry).
|
||||
|
||||
@@ -4,8 +4,11 @@ import path from "node:path";
|
||||
import { withTempHome as withTempHomeBase } from "openclaw/plugin-sdk/test-env";
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import { resolveAgentDir, resolveSessionAgentId } from "../agents/agent-scope.js";
|
||||
import { updateSessionStoreAfterAgentRun } from "../agents/command/session-store.js";
|
||||
import { resolveSession } from "../agents/command/session.js";
|
||||
import { loadSessionStore } from "../config/sessions/store-load.js";
|
||||
import { clearSessionStoreCacheForTest } from "../config/sessions/store.js";
|
||||
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
|
||||
|
||||
@@ -146,6 +149,91 @@ describe("agent session resolution", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("rotates stale terminal main sessions whose transcript is newer than the registry", async () => {
|
||||
const scenarios = [
|
||||
{ label: "canonical main", mainKey: "main", sessionKey: "agent:main:main" },
|
||||
{ label: "raw main alias", mainKey: "main", sessionKey: "main" },
|
||||
{ label: "custom main alias", mainKey: "work", sessionKey: "agent:main:main" },
|
||||
];
|
||||
for (const scenario of scenarios) {
|
||||
await withTempHome(async (home) => {
|
||||
const store = path.join(home, "sessions.json");
|
||||
const sessionFile = path.join(home, `session-${scenario.label.replaceAll(" ", "-")}.jsonl`);
|
||||
const sessionId = `stale-terminal-${scenario.label.replaceAll(" ", "-")}`;
|
||||
const registryUpdatedAt = Date.now() - 10_000;
|
||||
fs.mkdirSync(path.dirname(sessionFile), { recursive: true });
|
||||
fs.writeFileSync(sessionFile, JSON.stringify({ type: "session", id: sessionId }) + "\n");
|
||||
fs.utimesSync(
|
||||
sessionFile,
|
||||
(registryUpdatedAt + 5_000) / 1000,
|
||||
(registryUpdatedAt + 5_000) / 1000,
|
||||
);
|
||||
writeSessionStoreSeed(store, {
|
||||
[scenario.sessionKey]: {
|
||||
sessionId,
|
||||
sessionFile,
|
||||
updatedAt: registryUpdatedAt,
|
||||
status: "done",
|
||||
startedAt: registryUpdatedAt - 1_000,
|
||||
endedAt: registryUpdatedAt - 100,
|
||||
},
|
||||
});
|
||||
const cfg = mockConfig(home, store);
|
||||
cfg.session = { ...cfg.session, mainKey: scenario.mainKey };
|
||||
|
||||
const resolution = resolveSession({ cfg, sessionKey: scenario.sessionKey });
|
||||
|
||||
expect(resolution.isNewSession).toBe(true);
|
||||
expect(resolution.sessionId).not.toBe(sessionId);
|
||||
expect(resolution.sessionEntry?.sessionFile).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.status).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.startedAt).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.endedAt).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.runtimeMs).toBeUndefined();
|
||||
|
||||
const sessionStore = {
|
||||
[scenario.sessionKey]: resolution.sessionEntry!,
|
||||
};
|
||||
await resolveSessionTranscriptFile({
|
||||
sessionId: resolution.sessionId,
|
||||
sessionKey: scenario.sessionKey,
|
||||
sessionEntry: resolution.sessionEntry,
|
||||
sessionStore,
|
||||
storePath: resolution.storePath,
|
||||
agentId: "main",
|
||||
});
|
||||
await updateSessionStoreAfterAgentRun({
|
||||
cfg,
|
||||
sessionId: resolution.sessionId,
|
||||
sessionKey: scenario.sessionKey,
|
||||
storePath: resolution.storePath,
|
||||
sessionStore,
|
||||
defaultProvider: "openai",
|
||||
defaultModel: "gpt-5.5",
|
||||
result: {
|
||||
payloads: [],
|
||||
meta: {
|
||||
aborted: false,
|
||||
agentMeta: {
|
||||
provider: "openai",
|
||||
model: "gpt-5.5",
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
const persisted = loadSessionStore(resolution.storePath, { skipCache: true })[
|
||||
scenario.sessionKey
|
||||
];
|
||||
expect(persisted?.sessionId).toBe(resolution.sessionId);
|
||||
expect(persisted?.sessionFile).not.toBe(sessionFile);
|
||||
expect(persisted?.status).toBeUndefined();
|
||||
expect(persisted?.startedAt).toBeUndefined();
|
||||
expect(persisted?.endedAt).toBeUndefined();
|
||||
expect(persisted?.runtimeMs).toBeUndefined();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it("forwards resolved outbound session context when resuming by sessionId", async () => {
|
||||
await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => {
|
||||
const resolution = resolveSession({ cfg, sessionId });
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
// Session lifecycle timestamps prefer store metadata and fall back to transcript headers.
|
||||
import fs from "node:fs";
|
||||
import fsp from "node:fs/promises";
|
||||
import { asDateTimestampMs } from "../../shared/number-coercion.js";
|
||||
import { canonicalizeMainSessionAlias } from "./main-session.js";
|
||||
import {
|
||||
resolveSessionFilePath,
|
||||
resolveSessionFilePathOptions,
|
||||
type SessionFilePathOptions,
|
||||
} from "./paths.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
import { isTerminalSessionStatus, type SessionEntry, type SessionScope } from "./types.js";
|
||||
|
||||
type SessionLifecycleEntry = Pick<
|
||||
SessionEntry,
|
||||
@@ -14,11 +16,31 @@ type SessionLifecycleEntry = Pick<
|
||||
>;
|
||||
|
||||
// Transcript headers are read lazily to recover startedAt without parsing full files.
|
||||
|
||||
type TerminalMainSessionTranscriptRegistryParams = {
|
||||
entry: SessionEntry | undefined;
|
||||
sessionScope?: SessionScope;
|
||||
sessionKey?: string;
|
||||
agentId: string;
|
||||
mainKey?: string;
|
||||
storePath?: string;
|
||||
};
|
||||
|
||||
type TerminalMainSessionTranscriptRegistryCheck = {
|
||||
sessionId: string;
|
||||
registryTimestampMs: number;
|
||||
};
|
||||
|
||||
function resolveTimestamp(value: number | undefined): number | undefined {
|
||||
const timestampMs = asDateTimestampMs(value);
|
||||
return timestampMs !== undefined && timestampMs >= 0 ? timestampMs : undefined;
|
||||
}
|
||||
|
||||
function resolvePositiveTimestamp(value: number | undefined): number | undefined {
|
||||
const timestampMs = resolveTimestamp(value);
|
||||
return timestampMs !== undefined && timestampMs > 0 ? timestampMs : undefined;
|
||||
}
|
||||
|
||||
function parseTimestampMs(value: unknown): number | undefined {
|
||||
if (typeof value === "number") {
|
||||
return resolveTimestamp(value);
|
||||
@@ -117,3 +139,99 @@ export function resolveSessionLifecycleTimestamps(params: {
|
||||
lastInteractionAt: resolveTimestamp(entry.lastInteractionAt),
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveTerminalMainSessionTranscriptRegistryCheck(
|
||||
params: TerminalMainSessionTranscriptRegistryParams,
|
||||
): TerminalMainSessionTranscriptRegistryCheck | undefined {
|
||||
if (!params.entry || !params.sessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
const configuredMainSessionKey = canonicalizeMainSessionAlias({
|
||||
cfg: { session: { scope: params.sessionScope, mainKey: params.mainKey } },
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.mainKey ?? "main",
|
||||
});
|
||||
const candidateSessionKey = canonicalizeMainSessionAlias({
|
||||
cfg: { session: { scope: params.sessionScope, mainKey: params.mainKey } },
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (candidateSessionKey !== configuredMainSessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
if (!isTerminalSessionStatus(params.entry.status)) {
|
||||
return undefined;
|
||||
}
|
||||
if (params.entry.status === "failed") {
|
||||
// Failed rows with a present transcript stay reusable for retry/recovery.
|
||||
// Callers already rotate failed rows when the transcript is missing.
|
||||
return undefined;
|
||||
}
|
||||
// updatedAt is touched after managed transcript appends; endedAt can predate
|
||||
// healthy post-run transcript writes and would rotate valid sessions.
|
||||
const registryTimestampMs = resolvePositiveTimestamp(params.entry.updatedAt);
|
||||
if (registryTimestampMs === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
const sessionId = typeof params.entry.sessionId === "string" ? params.entry.sessionId.trim() : "";
|
||||
if (!sessionId) {
|
||||
return undefined;
|
||||
}
|
||||
return { sessionId, registryTimestampMs };
|
||||
}
|
||||
|
||||
function isTranscriptMtimeNewerThanRegistry(params: {
|
||||
transcriptMtimeMs: number;
|
||||
registryTimestampMs: number;
|
||||
}): boolean {
|
||||
const transcriptMtimeMs = Math.floor(params.transcriptMtimeMs);
|
||||
const registryTimestampMs = Math.floor(params.registryTimestampMs);
|
||||
return Number.isFinite(transcriptMtimeMs) && transcriptMtimeMs > registryTimestampMs;
|
||||
}
|
||||
|
||||
export function hasTerminalMainSessionTranscriptNewerThanRegistrySync(
|
||||
params: TerminalMainSessionTranscriptRegistryParams,
|
||||
): boolean {
|
||||
const check = resolveTerminalMainSessionTranscriptRegistryCheck(params);
|
||||
if (!check) {
|
||||
return false;
|
||||
}
|
||||
const pathOptions = resolveSessionFilePathOptions({
|
||||
agentId: params.agentId,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
try {
|
||||
const sessionFile = resolveSessionFilePath(check.sessionId, params.entry, pathOptions);
|
||||
const stats = fs.statSync(sessionFile);
|
||||
return isTranscriptMtimeNewerThanRegistry({
|
||||
transcriptMtimeMs: stats.mtimeMs,
|
||||
registryTimestampMs: check.registryTimestampMs,
|
||||
});
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export async function hasTerminalMainSessionTranscriptNewerThanRegistry(
|
||||
params: TerminalMainSessionTranscriptRegistryParams,
|
||||
): Promise<boolean> {
|
||||
const check = resolveTerminalMainSessionTranscriptRegistryCheck(params);
|
||||
if (!check) {
|
||||
return false;
|
||||
}
|
||||
const pathOptions = resolveSessionFilePathOptions({
|
||||
agentId: params.agentId,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
try {
|
||||
// Session admission owns this bounded stat as the terminal-main reconciliation gate.
|
||||
const sessionFile = resolveSessionFilePath(check.sessionId, params.entry, pathOptions);
|
||||
const stats = await fsp.stat(sessionFile);
|
||||
return isTranscriptMtimeNewerThanRegistry({
|
||||
transcriptMtimeMs: stats.mtimeMs,
|
||||
registryTimestampMs: check.registryTimestampMs,
|
||||
});
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -723,6 +723,128 @@ describe("gateway agent handler", () => {
|
||||
expect(capturedEntry?.sessionFile).toBeUndefined();
|
||||
});
|
||||
|
||||
it("rotates a terminal main session when its transcript is newer than the registry row", async () => {
|
||||
const now = Date.parse("2026-05-18T09:47:00.000Z");
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
dateOnlyFakeClockActive = true;
|
||||
vi.setSystemTime(now);
|
||||
|
||||
await withTempDir(
|
||||
{ prefix: "openclaw-gateway-terminal-main-newer-transcript-" },
|
||||
async (root) => {
|
||||
const sessionsDir = `${root}/sessions`;
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const sessionFile = "terminal-main-session.jsonl";
|
||||
const transcriptPath = `${sessionsDir}/${sessionFile}`;
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`,
|
||||
"utf8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000));
|
||||
mocks.loadSessionEntry.mockReturnValue({
|
||||
cfg: {},
|
||||
storePath: `${sessionsDir}/sessions.json`,
|
||||
entry: {
|
||||
sessionId: "terminal-main-session",
|
||||
sessionFile,
|
||||
status: "done",
|
||||
updatedAt: now - 10_000,
|
||||
sessionStartedAt: now - 60_000,
|
||||
lastInteractionAt: now - 10_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 15_000,
|
||||
runtimeMs: 5_000,
|
||||
},
|
||||
canonicalKey: "agent:main:main",
|
||||
});
|
||||
|
||||
const capturedEntry = await runMainAgentAndCaptureEntry(
|
||||
"test-idem-terminal-main-newer-transcript",
|
||||
);
|
||||
|
||||
const call = await waitForAgentCommandCall<{ sessionId?: string }>();
|
||||
expect(call.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.status).toBeUndefined();
|
||||
expect(capturedEntry?.startedAt).toBeUndefined();
|
||||
expect(capturedEntry?.endedAt).toBeUndefined();
|
||||
expect(capturedEntry?.runtimeMs).toBeUndefined();
|
||||
expect(capturedEntry?.sessionFile).toBeUndefined();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it.each(["heartbeat", "cron"] as const)(
|
||||
"preserves terminal main session reuse for %s gateway runs",
|
||||
async (runKind) => {
|
||||
const now = Date.parse("2026-05-18T09:49:00.000Z");
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
dateOnlyFakeClockActive = true;
|
||||
vi.setSystemTime(now);
|
||||
|
||||
await withTempDir(
|
||||
{ prefix: `openclaw-gateway-terminal-main-${runKind}-reuse-` },
|
||||
async (root) => {
|
||||
const sessionsDir = `${root}/sessions`;
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const sessionFile = `terminal-main-${runKind}.jsonl`;
|
||||
const transcriptPath = `${sessionsDir}/${sessionFile}`;
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`,
|
||||
"utf8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000));
|
||||
const existingEntry = {
|
||||
sessionId: "terminal-main-session",
|
||||
sessionFile,
|
||||
status: "done",
|
||||
updatedAt: now - 10_000,
|
||||
sessionStartedAt: now - 60_000,
|
||||
lastInteractionAt: now - 10_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 15_000,
|
||||
runtimeMs: 5_000,
|
||||
};
|
||||
mocks.loadSessionEntry.mockReturnValue({
|
||||
cfg: {},
|
||||
storePath: `${sessionsDir}/sessions.json`,
|
||||
entry: existingEntry,
|
||||
canonicalKey: "agent:main:main",
|
||||
});
|
||||
|
||||
let capturedEntry: Record<string, unknown> | undefined;
|
||||
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
|
||||
const store: Record<string, unknown> = {
|
||||
"agent:main:main": { ...existingEntry },
|
||||
};
|
||||
const result = await updater(store);
|
||||
capturedEntry = result as Record<string, unknown>;
|
||||
return result;
|
||||
});
|
||||
mocks.agentCommand.mockResolvedValue({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: { durationMs: 100 },
|
||||
});
|
||||
|
||||
await invokeAgent({
|
||||
message: `${runKind} probe`,
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
bootstrapContextRunKind: runKind,
|
||||
idempotencyKey: `test-idem-terminal-main-${runKind}-reuse`,
|
||||
} as AgentParams);
|
||||
|
||||
const call = await waitForAgentCommandCall<{ sessionId?: string }>();
|
||||
expect(call.sessionId).toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionId).toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionFile).toBe(sessionFile);
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it("rotates a failed session when its default transcript is missing", async () => {
|
||||
const now = Date.parse("2026-05-18T09:48:00.000Z");
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
|
||||
@@ -51,7 +51,9 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||
import { agentCommandFromIngress } from "../../commands/agent.js";
|
||||
import {
|
||||
evaluateSessionFreshness,
|
||||
hasTerminalMainSessionTranscriptNewerThanRegistry,
|
||||
mergeSessionEntry,
|
||||
resolveTerminalMainSessionTranscriptRegistryCheck,
|
||||
resolveChannelResetConfig,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveExplicitAgentSessionKey,
|
||||
@@ -1656,10 +1658,38 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
failedSessionTranscriptMissing = true;
|
||||
}
|
||||
}
|
||||
const mainSessionKeyForRequest = resolveAgentMainSessionKey({
|
||||
cfg: cfgLocal,
|
||||
agentId: canonicalSessionAgentId,
|
||||
});
|
||||
const isSystemGatewayRun =
|
||||
request.bootstrapContextRunKind === "cron" ||
|
||||
request.bootstrapContextRunKind === "heartbeat";
|
||||
const terminalMainTranscriptCheck = isSystemGatewayRun
|
||||
? undefined
|
||||
: resolveTerminalMainSessionTranscriptRegistryCheck({
|
||||
entry,
|
||||
sessionScope: cfgLocal.session?.scope,
|
||||
sessionKey: canonicalKey,
|
||||
agentId: canonicalSessionAgentId,
|
||||
mainKey: cfgLocal.session?.mainKey,
|
||||
storePath,
|
||||
});
|
||||
const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck
|
||||
? await hasTerminalMainSessionTranscriptNewerThanRegistry({
|
||||
entry,
|
||||
sessionScope: cfgLocal.session?.scope,
|
||||
sessionKey: canonicalKey,
|
||||
agentId: canonicalSessionAgentId,
|
||||
mainKey: cfgLocal.session?.mainKey,
|
||||
storePath,
|
||||
})
|
||||
: false;
|
||||
const canReuseSession =
|
||||
Boolean(entry?.sessionId) &&
|
||||
(freshness?.fresh ?? false) &&
|
||||
!failedSessionTranscriptMissing;
|
||||
!failedSessionTranscriptMissing &&
|
||||
!terminalMainTranscriptNewerThanRegistry;
|
||||
const usableRequestedSessionId =
|
||||
requestedSessionId && (!entry?.sessionId || canReuseSession)
|
||||
? requestedSessionId
|
||||
@@ -1672,10 +1702,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
(!canReuseSession && !usableRequestedSessionId) ||
|
||||
Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId);
|
||||
const rotatedSessionId = Boolean(entry?.sessionId && entry.sessionId !== sessionId);
|
||||
const touchInteraction =
|
||||
request.bootstrapContextRunKind !== "cron" &&
|
||||
request.bootstrapContextRunKind !== "heartbeat" &&
|
||||
!request.internalEvents?.length;
|
||||
const touchInteraction = !isSystemGatewayRun && !request.internalEvents?.length;
|
||||
const sessionAgent = canonicalSessionAgentId;
|
||||
type AgentSessionPatchBuild = {
|
||||
patch: Partial<SessionEntry>;
|
||||
@@ -1831,10 +1858,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
resolvedSessionKey = canonicalSessionKey;
|
||||
const sessionAgentId = canonicalSessionAgentId;
|
||||
resolvedSessionAgentId = sessionAgentId;
|
||||
const mainSessionKey = resolveAgentMainSessionKey({
|
||||
cfg: cfgLocal,
|
||||
agentId: sessionAgentId,
|
||||
});
|
||||
const mainSessionKey = mainSessionKeyForRequest;
|
||||
// Legacy stores may lack sessionStartedAt entirely. Pre-compute a
|
||||
// JSONL-transcript-derived candidate outside the store lock; the
|
||||
// updater below only writes it when the freshly-loaded store still
|
||||
|
||||
Reference in New Issue
Block a user