From a2bd1cef5990e46e667774d094ecd6ed5a581301 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Tue, 2 Jun 2026 15:38:03 -0700 Subject: [PATCH] clawdbot-587: add sqlite session lifecycle cleanup --- .../session-accessor.conformance.test.ts | 155 ++++++++++++ .../sessions/session-accessor.sqlite.ts | 220 ++++++++++++++++++ 2 files changed, 375 insertions(+) diff --git a/src/config/sessions/session-accessor.conformance.test.ts b/src/config/sessions/session-accessor.conformance.test.ts index f46f58b938bb..21609ff39e26 100644 --- a/src/config/sessions/session-accessor.conformance.test.ts +++ b/src/config/sessions/session-accessor.conformance.test.ts @@ -8,6 +8,7 @@ import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db import { appendTranscriptEvent, appendTranscriptMessage, + cleanupSessionLifecycleArtifacts, listSessionEntries, loadExactSessionEntry, loadSessionEntry, @@ -32,6 +33,7 @@ import { import { appendSqliteTranscriptEvent, appendSqliteTranscriptMessage, + cleanupSqliteSessionLifecycleArtifacts, listSqliteSessionEntries, loadExactSqliteSessionEntry, loadSqliteSessionEntry, @@ -72,6 +74,13 @@ type AccessorAdapter = { scope: SessionAccessScope, update: (entry: SessionEntry) => Partial | null, ): Promise; + cleanupSessionLifecycleArtifacts(params: { + storePath: string; + sessionKeySegmentPrefix: string; + transcriptContentMarker: string; + orphanTranscriptMinAgeMs: number; + nowMs?: number; + }): Promise<{ removedEntries: number; archivedTranscriptArtifacts: number }>; loadTranscriptEvents(scope: SessionTranscriptReadScope): Promise; appendTranscriptEvent(scope: SessionTranscriptAccessScope, event: TranscriptEvent): Promise; appendTranscriptMessage( @@ -117,6 +126,7 @@ const fileBackedAdapter: AccessorAdapter = { replaceSessionEntry, patchSessionEntry, updateSessionEntry, + cleanupSessionLifecycleArtifacts, loadTranscriptEvents, appendTranscriptEvent, appendTranscriptMessage, @@ -152,6 +162,7 @@ const sqliteAdapter: AccessorAdapter = { replaceSessionEntry: replaceSqliteSessionEntry, patchSessionEntry: patchSqliteSessionEntry, updateSessionEntry: updateSqliteSessionEntry, + cleanupSessionLifecycleArtifacts: cleanupSqliteSessionLifecycleArtifacts, loadTranscriptEvents: loadSqliteTranscriptEvents, appendTranscriptEvent: appendSqliteTranscriptEvent, appendTranscriptMessage: appendSqliteTranscriptMessage, @@ -285,6 +296,150 @@ describe.each([fileBackedAdapter, sqliteAdapter])( }); }); + it("conforms for lifecycle entry and transcript cleanup", async () => { + const nowMs = Date.now(); + const oldTimestamp = nowMs - 600_000; + const cleanupStorePath = + adapter.name === "sqlite" + ? path.join(paths.stateDir, "agents", "main", "sessions", "sessions.json") + : paths.storePath; + const scopedEntry = (sessionKey: string): SessionAccessScope => ({ + ...adapter.entryScope(paths), + sessionKey, + storePath: cleanupStorePath, + }); + const scopedTranscript = ( + sessionKey: string, + sessionId: string, + ): SessionTranscriptAccessScope => ({ + ...adapter.transcriptScope(paths, sessionId), + sessionKey, + storePath: cleanupStorePath, + }); + const writeTranscript = async (params: { + sessionKey: string; + sessionId: string; + old?: boolean; + }) => { + const timestamp = params.old ? oldTimestamp : nowMs; + const event = { + id: `${params.sessionId}-event`, + message: { role: "assistant", content: "cleanup" }, + runId: "lifecycle-marker-run", + timestamp: new Date(timestamp).toISOString(), + type: "message", + }; + if (adapter.name === "sqlite") { + await adapter.appendTranscriptEvent( + scopedTranscript(params.sessionKey, params.sessionId), + event, + ); + return; + } + const transcriptPath = path.join( + path.dirname(cleanupStorePath), + `${params.sessionId}.jsonl`, + ); + fs.writeFileSync(transcriptPath, `${JSON.stringify(event)}\n`, "utf-8"); + if (params.old) { + const oldDate = new Date(oldTimestamp); + fs.utimesSync(transcriptPath, oldDate, oldDate); + } + }; + + await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-missing"), { + sessionId: "missing-lifecycle", + updatedAt: oldTimestamp, + }); + await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-removed"), { + sessionId: "removed-lifecycle", + updatedAt: oldTimestamp, + }); + await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-fresh"), { + sessionId: "fresh-lifecycle", + updatedAt: nowMs, + }); + await adapter.upsertSessionEntry( + scopedEntry("agent:main:telegram:group:lifecycle-cleanup-room"), + { + sessionId: "kept-by-segment", + updatedAt: oldTimestamp, + }, + ); + await adapter.upsertSessionEntry(scopedEntry("agent:main:regular"), { + sessionId: "referenced", + updatedAt: oldTimestamp, + }); + await writeTranscript({ + sessionKey: "agent:main:lifecycle-cleanup-removed", + sessionId: "removed-lifecycle", + old: true, + }); + await writeTranscript({ + sessionKey: "agent:main:lifecycle-cleanup-fresh", + sessionId: "fresh-lifecycle", + }); + await writeTranscript({ + sessionKey: "agent:main:regular", + sessionId: "referenced", + old: true, + }); + await writeTranscript({ + sessionKey: "agent:main:orphan", + sessionId: "orphan-lifecycle", + old: true, + }); + + await expect( + adapter.cleanupSessionLifecycleArtifacts({ + storePath: cleanupStorePath, + sessionKeySegmentPrefix: "lifecycle-cleanup-", + transcriptContentMarker: "lifecycle-marker-", + orphanTranscriptMinAgeMs: 300_000, + nowMs, + }), + ).resolves.toEqual({ removedEntries: 2, archivedTranscriptArtifacts: 2 }); + + expect( + adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-missing")), + ).toBeUndefined(); + expect( + adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-removed")), + ).toBeUndefined(); + expect( + adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-fresh")), + ).toMatchObject({ + sessionId: "fresh-lifecycle", + }); + expect( + adapter.loadSessionEntry(scopedEntry("agent:main:telegram:group:lifecycle-cleanup-room")), + ).toMatchObject({ sessionId: "kept-by-segment" }); + expect(adapter.loadSessionEntry(scopedEntry("agent:main:regular"))).toMatchObject({ + sessionId: "referenced", + }); + if (adapter.name === "sqlite") { + expect(fs.existsSync(cleanupStorePath)).toBe(false); + await expect( + adapter.loadTranscriptEvents(scopedTranscript("agent:main:regular", "referenced")), + ).resolves.not.toEqual([]); + await expect( + adapter.loadTranscriptEvents( + scopedTranscript("agent:main:lifecycle-cleanup-removed", "removed-lifecycle"), + ), + ).resolves.toEqual([]); + } else { + const files = fs.readdirSync(path.dirname(cleanupStorePath)); + expect( + files.filter((file) => file.startsWith("removed-lifecycle.jsonl.deleted.")), + ).toHaveLength(1); + expect( + files.filter((file) => file.startsWith("orphan-lifecycle.jsonl.deleted.")), + ).toHaveLength(1); + expect(files).toContain("fresh-lifecycle.jsonl"); + expect(files).toContain("referenced.jsonl"); + } + }); + it("conforms for raw transcript event load and append", async () => { const scope = adapter.transcriptScope(paths); const readScope = adapter.transcriptReadScope(paths); diff --git a/src/config/sessions/session-accessor.sqlite.ts b/src/config/sessions/session-accessor.sqlite.ts index b21844cdb485..d387f4e7928b 100644 --- a/src/config/sessions/session-accessor.sqlite.ts +++ b/src/config/sessions/session-accessor.sqlite.ts @@ -27,6 +27,8 @@ import type { SessionEntryPatchContext, SessionEntryPatchOptions, SessionEntrySummary, + SessionLifecycleArtifactCleanupParams, + SessionLifecycleArtifactCleanupResult, SessionEntryUpdateOptions, SessionTranscriptAccessScope, SessionTranscriptReadScope, @@ -234,6 +236,34 @@ export async function updateSqliteSessionEntry( }); } +/** Cleans scoped session lifecycle rows and associated SQLite transcript state. */ +export async function cleanupSqliteSessionLifecycleArtifacts( + params: SessionLifecycleArtifactCleanupParams, +): Promise { + const sessionKeySegmentPrefix = params.sessionKeySegmentPrefix.trim(); + const transcriptContentMarker = params.transcriptContentMarker; + if (!sessionKeySegmentPrefix || !transcriptContentMarker) { + return { removedEntries: 0, archivedTranscriptArtifacts: 0 }; + } + + const resolved = resolveSqliteReadScope({ storePath: params.storePath }); + return await runExclusiveSqliteSessionWrite(resolved, async () => { + let result: SessionLifecycleArtifactCleanupResult = { + removedEntries: 0, + archivedTranscriptArtifacts: 0, + }; + runOpenClawAgentWriteTransaction((database) => { + result = cleanupSqliteSessionLifecycleArtifactsInTransaction(database, { + sessionKeySegmentPrefix, + transcriptContentMarker, + orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs, + nowMs: params.nowMs ?? Date.now(), + }); + }, toDatabaseOptions(resolved)); + return result; + }); +} + /** Loads raw transcript events from the additive SQLite transcript store. */ export async function loadSqliteTranscriptEvents( scope: SessionTranscriptReadScope, @@ -533,6 +563,196 @@ function readExactSessionEntryRow( return entry ? { entry, row } : undefined; } +function sessionKeySegmentStartsWith(sessionKey: string, prefix: string): boolean { + const firstSeparator = sessionKey.indexOf(":"); + if (firstSeparator < 0) { + return sessionKey.startsWith(prefix); + } + const secondSeparator = sessionKey.indexOf(":", firstSeparator + 1); + const sessionSegment = secondSeparator < 0 ? sessionKey : sessionKey.slice(secondSeparator + 1); + return sessionSegment.startsWith(prefix); +} + +function readSessionTranscriptUpdatedAt( + database: OpenClawAgentDatabase, + sessionId: string, +): number | undefined { + const db = getSessionKysely(database.db); + const row = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("transcript_events") + .select((eb) => eb.fn.max("created_at").as("updated_at")) + .where("session_id", "=", sessionId), + ); + if (row?.updated_at === null || row?.updated_at === undefined) { + return undefined; + } + return normalizeSqliteNumber(row.updated_at); +} + +function sqliteTranscriptStateIsReclaimable(params: { + database: OpenClawAgentDatabase; + sessionId: string; + nowMs: number; + orphanTranscriptMinAgeMs: number; +}): boolean { + const updatedAt = readSessionTranscriptUpdatedAt(params.database, params.sessionId); + return updatedAt === undefined || params.nowMs - updatedAt >= params.orphanTranscriptMinAgeMs; +} + +function sqliteTranscriptStateHasMarker(params: { + database: OpenClawAgentDatabase; + sessionId: string; + transcriptContentMarker: string; +}): boolean { + const db = getSessionKysely(params.database.db); + const row = executeSqliteQueryTakeFirstSync( + params.database.db, + db + .selectFrom("transcript_events") + .select("seq") + .where("session_id", "=", params.sessionId) + .where("event_json", "like", `%${params.transcriptContentMarker}%`) + .limit(1), + ); + return row !== undefined; +} + +function readReferencedSqliteSessionIds(database: OpenClawAgentDatabase): Set { + const db = getSessionKysely(database.db); + const rows = executeSqliteQuerySync( + database.db, + db.selectFrom("session_entries").select("session_id"), + ).rows; + return new Set(rows.map((row) => row.session_id)); +} + +function deleteSqliteSessionStateIfUnreferenced(params: { + database: OpenClawAgentDatabase; + referencedSessionIds: ReadonlySet; + sessionId: string; +}): number { + if (params.referencedSessionIds.has(params.sessionId)) { + return 0; + } + const hadTranscriptState = + readSessionTranscriptUpdatedAt(params.database, params.sessionId) !== undefined; + const db = getSessionKysely(params.database.db); + executeSqliteQuerySync( + params.database.db, + db.deleteFrom("sessions").where("session_id", "=", params.sessionId), + ); + return hadTranscriptState ? 1 : 0; +} + +function cleanupSqliteOrphanLifecycleTranscriptState(params: { + database: OpenClawAgentDatabase; + referencedSessionIds: ReadonlySet; + transcriptContentMarker: string; + orphanTranscriptMinAgeMs: number; + nowMs: number; +}): number { + const db = getSessionKysely(params.database.db); + const rows = executeSqliteQuerySync( + params.database.db, + db.selectFrom("sessions").select("session_id").orderBy("session_id", "asc"), + ).rows; + + let removed = 0; + // Orphan transcript state is represented by a sessions row without a live + // session entry. The marker keeps this scoped to the caller-owned lifecycle. + for (const row of rows) { + if (params.referencedSessionIds.has(row.session_id)) { + continue; + } + if ( + !sqliteTranscriptStateIsReclaimable({ + database: params.database, + sessionId: row.session_id, + nowMs: params.nowMs, + orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs, + }) || + !sqliteTranscriptStateHasMarker({ + database: params.database, + sessionId: row.session_id, + transcriptContentMarker: params.transcriptContentMarker, + }) + ) { + continue; + } + executeSqliteQuerySync( + params.database.db, + db.deleteFrom("sessions").where("session_id", "=", row.session_id), + ); + removed += 1; + } + return removed; +} + +function cleanupSqliteSessionLifecycleArtifactsInTransaction( + database: OpenClawAgentDatabase, + params: { + sessionKeySegmentPrefix: string; + transcriptContentMarker: string; + orphanTranscriptMinAgeMs: number; + nowMs: number; + }, +): SessionLifecycleArtifactCleanupResult { + const db = getSessionKysely(database.db); + const rows = executeSqliteQuerySync( + database.db, + db + .selectFrom("session_entries") + .select(["session_key", "session_id"]) + .orderBy("session_key", "asc"), + ).rows; + + const removedSessionIds = new Set(); + let removedEntries = 0; + // Delete matching lifecycle entries first; session/transcript state is only + // removed after we rebuild the post-delete reference set below. + for (const row of rows) { + if (!sessionKeySegmentStartsWith(row.session_key, params.sessionKeySegmentPrefix)) { + continue; + } + if ( + !sqliteTranscriptStateIsReclaimable({ + database, + sessionId: row.session_id, + nowMs: params.nowMs, + orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs, + }) + ) { + continue; + } + executeSqliteQuerySync( + database.db, + db.deleteFrom("session_entries").where("session_key", "=", row.session_key), + ); + removedSessionIds.add(row.session_id); + removedEntries += 1; + } + + const referencedSessionIds = readReferencedSqliteSessionIds(database); + let archivedTranscriptArtifacts = 0; + for (const sessionId of removedSessionIds) { + archivedTranscriptArtifacts += deleteSqliteSessionStateIfUnreferenced({ + database, + referencedSessionIds, + sessionId, + }); + } + archivedTranscriptArtifacts += cleanupSqliteOrphanLifecycleTranscriptState({ + database, + referencedSessionIds, + transcriptContentMarker: params.transcriptContentMarker, + orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs, + nowMs: params.nowMs, + }); + return { removedEntries, archivedTranscriptArtifacts }; +} + function writeSessionEntry( database: OpenClawAgentDatabase, sessionKey: string,