clawdbot-587: add sqlite session lifecycle cleanup

This commit is contained in:
Josh Lehman
2026-06-02 15:38:03 -07:00
parent 61aa3d17fb
commit a2bd1cef59
2 changed files with 375 additions and 0 deletions

View File

@@ -8,6 +8,7 @@ import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db
import {
appendTranscriptEvent,
appendTranscriptMessage,
cleanupSessionLifecycleArtifacts,
listSessionEntries,
loadExactSessionEntry,
loadSessionEntry,
@@ -32,6 +33,7 @@ import {
import {
appendSqliteTranscriptEvent,
appendSqliteTranscriptMessage,
cleanupSqliteSessionLifecycleArtifacts,
listSqliteSessionEntries,
loadExactSqliteSessionEntry,
loadSqliteSessionEntry,
@@ -72,6 +74,13 @@ type AccessorAdapter = {
scope: SessionAccessScope,
update: (entry: SessionEntry) => Partial<SessionEntry> | null,
): Promise<SessionEntry | null>;
cleanupSessionLifecycleArtifacts(params: {
storePath: string;
sessionKeySegmentPrefix: string;
transcriptContentMarker: string;
orphanTranscriptMinAgeMs: number;
nowMs?: number;
}): Promise<{ removedEntries: number; archivedTranscriptArtifacts: number }>;
loadTranscriptEvents(scope: SessionTranscriptReadScope): Promise<TranscriptEvent[]>;
appendTranscriptEvent(scope: SessionTranscriptAccessScope, event: TranscriptEvent): Promise<void>;
appendTranscriptMessage<TMessage>(
@@ -117,6 +126,7 @@ const fileBackedAdapter: AccessorAdapter = {
replaceSessionEntry,
patchSessionEntry,
updateSessionEntry,
cleanupSessionLifecycleArtifacts,
loadTranscriptEvents,
appendTranscriptEvent,
appendTranscriptMessage,
@@ -152,6 +162,7 @@ const sqliteAdapter: AccessorAdapter = {
replaceSessionEntry: replaceSqliteSessionEntry,
patchSessionEntry: patchSqliteSessionEntry,
updateSessionEntry: updateSqliteSessionEntry,
cleanupSessionLifecycleArtifacts: cleanupSqliteSessionLifecycleArtifacts,
loadTranscriptEvents: loadSqliteTranscriptEvents,
appendTranscriptEvent: appendSqliteTranscriptEvent,
appendTranscriptMessage: appendSqliteTranscriptMessage,
@@ -285,6 +296,150 @@ describe.each([fileBackedAdapter, sqliteAdapter])(
});
});
it("conforms for lifecycle entry and transcript cleanup", async () => {
const nowMs = Date.now();
const oldTimestamp = nowMs - 600_000;
const cleanupStorePath =
adapter.name === "sqlite"
? path.join(paths.stateDir, "agents", "main", "sessions", "sessions.json")
: paths.storePath;
const scopedEntry = (sessionKey: string): SessionAccessScope => ({
...adapter.entryScope(paths),
sessionKey,
storePath: cleanupStorePath,
});
const scopedTranscript = (
sessionKey: string,
sessionId: string,
): SessionTranscriptAccessScope => ({
...adapter.transcriptScope(paths, sessionId),
sessionKey,
storePath: cleanupStorePath,
});
const writeTranscript = async (params: {
sessionKey: string;
sessionId: string;
old?: boolean;
}) => {
const timestamp = params.old ? oldTimestamp : nowMs;
const event = {
id: `${params.sessionId}-event`,
message: { role: "assistant", content: "cleanup" },
runId: "lifecycle-marker-run",
timestamp: new Date(timestamp).toISOString(),
type: "message",
};
if (adapter.name === "sqlite") {
await adapter.appendTranscriptEvent(
scopedTranscript(params.sessionKey, params.sessionId),
event,
);
return;
}
const transcriptPath = path.join(
path.dirname(cleanupStorePath),
`${params.sessionId}.jsonl`,
);
fs.writeFileSync(transcriptPath, `${JSON.stringify(event)}\n`, "utf-8");
if (params.old) {
const oldDate = new Date(oldTimestamp);
fs.utimesSync(transcriptPath, oldDate, oldDate);
}
};
await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-missing"), {
sessionId: "missing-lifecycle",
updatedAt: oldTimestamp,
});
await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-removed"), {
sessionId: "removed-lifecycle",
updatedAt: oldTimestamp,
});
await adapter.upsertSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-fresh"), {
sessionId: "fresh-lifecycle",
updatedAt: nowMs,
});
await adapter.upsertSessionEntry(
scopedEntry("agent:main:telegram:group:lifecycle-cleanup-room"),
{
sessionId: "kept-by-segment",
updatedAt: oldTimestamp,
},
);
await adapter.upsertSessionEntry(scopedEntry("agent:main:regular"), {
sessionId: "referenced",
updatedAt: oldTimestamp,
});
await writeTranscript({
sessionKey: "agent:main:lifecycle-cleanup-removed",
sessionId: "removed-lifecycle",
old: true,
});
await writeTranscript({
sessionKey: "agent:main:lifecycle-cleanup-fresh",
sessionId: "fresh-lifecycle",
});
await writeTranscript({
sessionKey: "agent:main:regular",
sessionId: "referenced",
old: true,
});
await writeTranscript({
sessionKey: "agent:main:orphan",
sessionId: "orphan-lifecycle",
old: true,
});
await expect(
adapter.cleanupSessionLifecycleArtifacts({
storePath: cleanupStorePath,
sessionKeySegmentPrefix: "lifecycle-cleanup-",
transcriptContentMarker: "lifecycle-marker-",
orphanTranscriptMinAgeMs: 300_000,
nowMs,
}),
).resolves.toEqual({ removedEntries: 2, archivedTranscriptArtifacts: 2 });
expect(
adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-missing")),
).toBeUndefined();
expect(
adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-removed")),
).toBeUndefined();
expect(
adapter.loadSessionEntry(scopedEntry("agent:main:lifecycle-cleanup-fresh")),
).toMatchObject({
sessionId: "fresh-lifecycle",
});
expect(
adapter.loadSessionEntry(scopedEntry("agent:main:telegram:group:lifecycle-cleanup-room")),
).toMatchObject({ sessionId: "kept-by-segment" });
expect(adapter.loadSessionEntry(scopedEntry("agent:main:regular"))).toMatchObject({
sessionId: "referenced",
});
if (adapter.name === "sqlite") {
expect(fs.existsSync(cleanupStorePath)).toBe(false);
await expect(
adapter.loadTranscriptEvents(scopedTranscript("agent:main:regular", "referenced")),
).resolves.not.toEqual([]);
await expect(
adapter.loadTranscriptEvents(
scopedTranscript("agent:main:lifecycle-cleanup-removed", "removed-lifecycle"),
),
).resolves.toEqual([]);
} else {
const files = fs.readdirSync(path.dirname(cleanupStorePath));
expect(
files.filter((file) => file.startsWith("removed-lifecycle.jsonl.deleted.")),
).toHaveLength(1);
expect(
files.filter((file) => file.startsWith("orphan-lifecycle.jsonl.deleted.")),
).toHaveLength(1);
expect(files).toContain("fresh-lifecycle.jsonl");
expect(files).toContain("referenced.jsonl");
}
});
it("conforms for raw transcript event load and append", async () => {
const scope = adapter.transcriptScope(paths);
const readScope = adapter.transcriptReadScope(paths);

View File

@@ -27,6 +27,8 @@ import type {
SessionEntryPatchContext,
SessionEntryPatchOptions,
SessionEntrySummary,
SessionLifecycleArtifactCleanupParams,
SessionLifecycleArtifactCleanupResult,
SessionEntryUpdateOptions,
SessionTranscriptAccessScope,
SessionTranscriptReadScope,
@@ -234,6 +236,34 @@ export async function updateSqliteSessionEntry(
});
}
/** Cleans scoped session lifecycle rows and associated SQLite transcript state. */
export async function cleanupSqliteSessionLifecycleArtifacts(
params: SessionLifecycleArtifactCleanupParams,
): Promise<SessionLifecycleArtifactCleanupResult> {
const sessionKeySegmentPrefix = params.sessionKeySegmentPrefix.trim();
const transcriptContentMarker = params.transcriptContentMarker;
if (!sessionKeySegmentPrefix || !transcriptContentMarker) {
return { removedEntries: 0, archivedTranscriptArtifacts: 0 };
}
const resolved = resolveSqliteReadScope({ storePath: params.storePath });
return await runExclusiveSqliteSessionWrite(resolved, async () => {
let result: SessionLifecycleArtifactCleanupResult = {
removedEntries: 0,
archivedTranscriptArtifacts: 0,
};
runOpenClawAgentWriteTransaction((database) => {
result = cleanupSqliteSessionLifecycleArtifactsInTransaction(database, {
sessionKeySegmentPrefix,
transcriptContentMarker,
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
nowMs: params.nowMs ?? Date.now(),
});
}, toDatabaseOptions(resolved));
return result;
});
}
/** Loads raw transcript events from the additive SQLite transcript store. */
export async function loadSqliteTranscriptEvents(
scope: SessionTranscriptReadScope,
@@ -533,6 +563,196 @@ function readExactSessionEntryRow(
return entry ? { entry, row } : undefined;
}
function sessionKeySegmentStartsWith(sessionKey: string, prefix: string): boolean {
const firstSeparator = sessionKey.indexOf(":");
if (firstSeparator < 0) {
return sessionKey.startsWith(prefix);
}
const secondSeparator = sessionKey.indexOf(":", firstSeparator + 1);
const sessionSegment = secondSeparator < 0 ? sessionKey : sessionKey.slice(secondSeparator + 1);
return sessionSegment.startsWith(prefix);
}
function readSessionTranscriptUpdatedAt(
database: OpenClawAgentDatabase,
sessionId: string,
): number | undefined {
const db = getSessionKysely(database.db);
const row = executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("transcript_events")
.select((eb) => eb.fn.max<number | bigint>("created_at").as("updated_at"))
.where("session_id", "=", sessionId),
);
if (row?.updated_at === null || row?.updated_at === undefined) {
return undefined;
}
return normalizeSqliteNumber(row.updated_at);
}
function sqliteTranscriptStateIsReclaimable(params: {
database: OpenClawAgentDatabase;
sessionId: string;
nowMs: number;
orphanTranscriptMinAgeMs: number;
}): boolean {
const updatedAt = readSessionTranscriptUpdatedAt(params.database, params.sessionId);
return updatedAt === undefined || params.nowMs - updatedAt >= params.orphanTranscriptMinAgeMs;
}
function sqliteTranscriptStateHasMarker(params: {
database: OpenClawAgentDatabase;
sessionId: string;
transcriptContentMarker: string;
}): boolean {
const db = getSessionKysely(params.database.db);
const row = executeSqliteQueryTakeFirstSync(
params.database.db,
db
.selectFrom("transcript_events")
.select("seq")
.where("session_id", "=", params.sessionId)
.where("event_json", "like", `%${params.transcriptContentMarker}%`)
.limit(1),
);
return row !== undefined;
}
function readReferencedSqliteSessionIds(database: OpenClawAgentDatabase): Set<string> {
const db = getSessionKysely(database.db);
const rows = executeSqliteQuerySync(
database.db,
db.selectFrom("session_entries").select("session_id"),
).rows;
return new Set(rows.map((row) => row.session_id));
}
function deleteSqliteSessionStateIfUnreferenced(params: {
database: OpenClawAgentDatabase;
referencedSessionIds: ReadonlySet<string>;
sessionId: string;
}): number {
if (params.referencedSessionIds.has(params.sessionId)) {
return 0;
}
const hadTranscriptState =
readSessionTranscriptUpdatedAt(params.database, params.sessionId) !== undefined;
const db = getSessionKysely(params.database.db);
executeSqliteQuerySync(
params.database.db,
db.deleteFrom("sessions").where("session_id", "=", params.sessionId),
);
return hadTranscriptState ? 1 : 0;
}
function cleanupSqliteOrphanLifecycleTranscriptState(params: {
database: OpenClawAgentDatabase;
referencedSessionIds: ReadonlySet<string>;
transcriptContentMarker: string;
orphanTranscriptMinAgeMs: number;
nowMs: number;
}): number {
const db = getSessionKysely(params.database.db);
const rows = executeSqliteQuerySync(
params.database.db,
db.selectFrom("sessions").select("session_id").orderBy("session_id", "asc"),
).rows;
let removed = 0;
// Orphan transcript state is represented by a sessions row without a live
// session entry. The marker keeps this scoped to the caller-owned lifecycle.
for (const row of rows) {
if (params.referencedSessionIds.has(row.session_id)) {
continue;
}
if (
!sqliteTranscriptStateIsReclaimable({
database: params.database,
sessionId: row.session_id,
nowMs: params.nowMs,
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
}) ||
!sqliteTranscriptStateHasMarker({
database: params.database,
sessionId: row.session_id,
transcriptContentMarker: params.transcriptContentMarker,
})
) {
continue;
}
executeSqliteQuerySync(
params.database.db,
db.deleteFrom("sessions").where("session_id", "=", row.session_id),
);
removed += 1;
}
return removed;
}
function cleanupSqliteSessionLifecycleArtifactsInTransaction(
database: OpenClawAgentDatabase,
params: {
sessionKeySegmentPrefix: string;
transcriptContentMarker: string;
orphanTranscriptMinAgeMs: number;
nowMs: number;
},
): SessionLifecycleArtifactCleanupResult {
const db = getSessionKysely(database.db);
const rows = executeSqliteQuerySync(
database.db,
db
.selectFrom("session_entries")
.select(["session_key", "session_id"])
.orderBy("session_key", "asc"),
).rows;
const removedSessionIds = new Set<string>();
let removedEntries = 0;
// Delete matching lifecycle entries first; session/transcript state is only
// removed after we rebuild the post-delete reference set below.
for (const row of rows) {
if (!sessionKeySegmentStartsWith(row.session_key, params.sessionKeySegmentPrefix)) {
continue;
}
if (
!sqliteTranscriptStateIsReclaimable({
database,
sessionId: row.session_id,
nowMs: params.nowMs,
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
})
) {
continue;
}
executeSqliteQuerySync(
database.db,
db.deleteFrom("session_entries").where("session_key", "=", row.session_key),
);
removedSessionIds.add(row.session_id);
removedEntries += 1;
}
const referencedSessionIds = readReferencedSqliteSessionIds(database);
let archivedTranscriptArtifacts = 0;
for (const sessionId of removedSessionIds) {
archivedTranscriptArtifacts += deleteSqliteSessionStateIfUnreferenced({
database,
referencedSessionIds,
sessionId,
});
}
archivedTranscriptArtifacts += cleanupSqliteOrphanLifecycleTranscriptState({
database,
referencedSessionIds,
transcriptContentMarker: params.transcriptContentMarker,
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
nowMs: params.nowMs,
});
return { removedEntries, archivedTranscriptArtifacts };
}
function writeSessionEntry(
database: OpenClawAgentDatabase,
sessionKey: string,