mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
clawdbot-9c3: add session lifecycle cleanup seam
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
9ce72d763de6c95566e0167f99f5454b07c7c67940675533cb24c07058619a63 plugin-sdk-api-baseline.json
|
||||
e4dfccb85b985fe865145e24978255b729cdcbca0e26650a363a11bfcfc2e27b plugin-sdk-api-baseline.jsonl
|
||||
1fc413736b1320d11981317535e791cd40cb7b5ac14d6beef50cc032b4e28afb plugin-sdk-api-baseline.json
|
||||
7a375996b95a8fd4fb4eade436497ea7153faf7061f65a4e4b16b54ed6690561 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -6,6 +6,7 @@ import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import {
|
||||
appendTranscriptMessage,
|
||||
appendTranscriptEvent,
|
||||
cleanupSessionLifecycleArtifacts,
|
||||
listSessionEntries,
|
||||
loadExactSessionEntry,
|
||||
loadSessionEntry,
|
||||
@@ -266,6 +267,83 @@ describe("session accessor file-backed seam", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("cleans scoped lifecycle entries and unreferenced transcript artifacts", async () => {
|
||||
const nowMs = Date.now();
|
||||
const oldDate = new Date(nowMs - 600_000);
|
||||
const removedTranscriptPath = path.join(tempDir, "removed-lifecycle.jsonl");
|
||||
const customTranscriptPath = path.join(tempDir, "custom-lifecycle-old.jsonl");
|
||||
const freshDefaultTranscriptPath = path.join(tempDir, "custom-lifecycle.jsonl");
|
||||
const freshTranscriptPath = path.join(tempDir, "fresh-lifecycle.jsonl");
|
||||
const referencedTranscriptPath = path.join(tempDir, "referenced.jsonl");
|
||||
const orphanTranscriptPath = path.join(tempDir, "orphan-lifecycle.jsonl");
|
||||
|
||||
fs.writeFileSync(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"agent:main:lifecycle-cleanup-removed": {
|
||||
sessionId: "removed-lifecycle",
|
||||
},
|
||||
"agent:main:lifecycle-cleanup-fresh": {
|
||||
sessionId: "fresh-lifecycle",
|
||||
},
|
||||
"agent:main:lifecycle-cleanup-custom": {
|
||||
sessionFile: "custom-lifecycle-old.jsonl",
|
||||
sessionId: "custom-lifecycle",
|
||||
},
|
||||
"agent:main:telegram:group:lifecycle-cleanup-room": {
|
||||
sessionId: "kept-by-segment",
|
||||
},
|
||||
"agent:main:regular": {
|
||||
sessionId: "referenced",
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
fs.writeFileSync(removedTranscriptPath, '{"runId":"lifecycle-marker-removed"}\n', "utf-8");
|
||||
fs.writeFileSync(customTranscriptPath, '{"runId":"lifecycle-marker-custom"}\n', "utf-8");
|
||||
fs.writeFileSync(freshDefaultTranscriptPath, '{"runId":"lifecycle-marker-default"}\n', "utf-8");
|
||||
fs.writeFileSync(freshTranscriptPath, '{"runId":"lifecycle-marker-fresh"}\n', "utf-8");
|
||||
fs.writeFileSync(
|
||||
referencedTranscriptPath,
|
||||
'{"runId":"lifecycle-marker-referenced"}\n',
|
||||
"utf-8",
|
||||
);
|
||||
fs.writeFileSync(orphanTranscriptPath, '{"runId":"lifecycle-marker-orphan"}\n', "utf-8");
|
||||
fs.utimesSync(removedTranscriptPath, oldDate, oldDate);
|
||||
fs.utimesSync(customTranscriptPath, oldDate, oldDate);
|
||||
fs.utimesSync(referencedTranscriptPath, oldDate, oldDate);
|
||||
fs.utimesSync(orphanTranscriptPath, oldDate, oldDate);
|
||||
|
||||
const result = await cleanupSessionLifecycleArtifacts({
|
||||
storePath,
|
||||
sessionKeySegmentPrefix: "lifecycle-cleanup-",
|
||||
transcriptContentMarker: "lifecycle-marker-",
|
||||
orphanTranscriptMinAgeMs: 300_000,
|
||||
nowMs,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ removedEntries: 2, archivedTranscriptArtifacts: 3 });
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(loaded).not.toHaveProperty("agent:main:lifecycle-cleanup-removed");
|
||||
expect(loaded).not.toHaveProperty("agent:main:lifecycle-cleanup-custom");
|
||||
expect(loaded).toHaveProperty("agent:main:lifecycle-cleanup-fresh");
|
||||
expect(loaded).toHaveProperty("agent:main:telegram:group:lifecycle-cleanup-room");
|
||||
expect(loaded).toHaveProperty("agent:main:regular");
|
||||
const files = fs.readdirSync(tempDir);
|
||||
expect(
|
||||
files.filter((file) => file.startsWith("removed-lifecycle.jsonl.deleted.")),
|
||||
).toHaveLength(1);
|
||||
expect(files.filter((file) => file.startsWith("orphan-lifecycle.jsonl.deleted."))).toHaveLength(
|
||||
1,
|
||||
);
|
||||
expect(
|
||||
files.filter((file) => file.startsWith("custom-lifecycle-old.jsonl.deleted.")),
|
||||
).toHaveLength(1);
|
||||
expect(files).toContain("custom-lifecycle.jsonl");
|
||||
expect(files).toContain("fresh-lifecycle.jsonl");
|
||||
expect(files).toContain("referenced.jsonl");
|
||||
});
|
||||
|
||||
it("loads and appends transcript events through a session scope", async () => {
|
||||
const scope = {
|
||||
sessionFile: transcriptPath,
|
||||
|
||||
@@ -13,12 +13,15 @@ import {
|
||||
import { resolveAndPersistSessionFile } from "./session-file.js";
|
||||
import {
|
||||
getSessionEntry,
|
||||
cleanupSessionLifecycleArtifacts as cleanupFileSessionLifecycleArtifacts,
|
||||
listSessionEntries as listFileSessionEntries,
|
||||
loadSessionStore,
|
||||
patchSessionEntry as patchFileSessionEntry,
|
||||
readSessionUpdatedAt as readFileSessionUpdatedAt,
|
||||
resolveSessionStoreEntry,
|
||||
updateSessionStoreEntry as updateFileSessionStoreEntry,
|
||||
type SessionLifecycleArtifactCleanupParams,
|
||||
type SessionLifecycleArtifactCleanupResult,
|
||||
} from "./store.js";
|
||||
import { parseSessionThreadInfo } from "./thread-info.js";
|
||||
import {
|
||||
@@ -99,6 +102,8 @@ export type SessionEntryPatchContext = {
|
||||
existingEntry?: SessionEntry;
|
||||
};
|
||||
|
||||
export type { SessionLifecycleArtifactCleanupParams, SessionLifecycleArtifactCleanupResult };
|
||||
|
||||
/** Loads one session entry through the storage-neutral accessor seam. */
|
||||
export function loadSessionEntry(scope: SessionAccessScope): SessionEntry | undefined {
|
||||
if (scope.clone === false) {
|
||||
@@ -214,6 +219,13 @@ export async function updateSessionEntry(
|
||||
});
|
||||
}
|
||||
|
||||
/** Cleans scoped session lifecycle entries and transcript artifacts through the accessor seam. */
|
||||
export async function cleanupSessionLifecycleArtifacts(
|
||||
params: SessionLifecycleArtifactCleanupParams,
|
||||
): Promise<SessionLifecycleArtifactCleanupResult> {
|
||||
return await cleanupFileSessionLifecycleArtifacts(params);
|
||||
}
|
||||
|
||||
/** Loads raw transcript events through the storage-neutral accessor seam. */
|
||||
export async function loadTranscriptEvents(
|
||||
scope: SessionTranscriptReadScope,
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
// Runtime facade for session store mutation helpers.
|
||||
export {
|
||||
applySessionStoreEntryPatch,
|
||||
cleanupSessionLifecycleArtifacts,
|
||||
updateSessionStore,
|
||||
updateSessionStoreEntry,
|
||||
} from "./store.js";
|
||||
export type {
|
||||
SessionLifecycleArtifactCleanupParams,
|
||||
SessionLifecycleArtifactCleanupResult,
|
||||
} from "./store.js";
|
||||
|
||||
@@ -5,6 +5,7 @@ import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
import { writeTextAtomic } from "../../infra/json-files.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import {
|
||||
deliveryContextFromChannelRoute,
|
||||
deliveryContextFromSession,
|
||||
@@ -15,9 +16,10 @@ import {
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
|
||||
import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import { getRuntimeConfig } from "../io.js";
|
||||
import { formatSessionArchiveTimestamp } from "./artifacts.js";
|
||||
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
|
||||
import { deriveSessionMetaPatch } from "./metadata.js";
|
||||
import { resolveStorePath } from "./paths.js";
|
||||
import { resolveSessionFilePath, resolveStorePath } from "./paths.js";
|
||||
import {
|
||||
ensureSessionStorePromptBlobsForPersistence,
|
||||
isSessionSkillPromptBlobReadable,
|
||||
@@ -191,6 +193,24 @@ type SessionEntryWorkflowOptions = {
|
||||
storePath?: string;
|
||||
};
|
||||
|
||||
export type SessionLifecycleArtifactCleanupParams = {
|
||||
/** Session store to clean. */
|
||||
storePath: string;
|
||||
/** Matches the persisted session-key segment after `agent:<id>:`. */
|
||||
sessionKeySegmentPrefix: string;
|
||||
/** Marker that identifies transcript artifacts owned by this lifecycle. */
|
||||
transcriptContentMarker: string;
|
||||
/** Minimum age before a present transcript can be reclaimed or archived. */
|
||||
orphanTranscriptMinAgeMs: number;
|
||||
/** Testable clock override. */
|
||||
nowMs?: number;
|
||||
};
|
||||
|
||||
export type SessionLifecycleArtifactCleanupResult = {
|
||||
removedEntries: number;
|
||||
archivedTranscriptArtifacts: number;
|
||||
};
|
||||
|
||||
function cloneSessionEntry(entry: SessionEntry): SessionEntry {
|
||||
return cloneSessionStoreRecord({ entry }).entry;
|
||||
}
|
||||
@@ -503,6 +523,66 @@ function sessionEntriesHaveSameSerializedForm(
|
||||
return previous !== undefined && JSON.stringify(previous) === JSON.stringify(next);
|
||||
}
|
||||
|
||||
function normalizePathForLifecycleComparison(filePath: string): string {
|
||||
try {
|
||||
return path.normalize(fs.realpathSync(filePath));
|
||||
} catch {
|
||||
return path.normalize(path.resolve(filePath));
|
||||
}
|
||||
}
|
||||
|
||||
function sessionKeySegmentStartsWith(sessionKey: string, prefix: string): boolean {
|
||||
const firstSeparator = sessionKey.indexOf(":");
|
||||
if (firstSeparator < 0) {
|
||||
return sessionKey.startsWith(prefix);
|
||||
}
|
||||
const secondSeparator = sessionKey.indexOf(":", firstSeparator + 1);
|
||||
const sessionSegment = secondSeparator < 0 ? sessionKey : sessionKey.slice(secondSeparator + 1);
|
||||
return sessionSegment.startsWith(prefix);
|
||||
}
|
||||
|
||||
function resolveLifecycleTranscriptPath(params: {
|
||||
entry: SessionEntry | undefined;
|
||||
sessionsDir: string;
|
||||
}): string | null {
|
||||
const sessionId = params.entry?.sessionId?.trim();
|
||||
if (!sessionId) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return resolveSessionFilePath(sessionId, params.entry, { sessionsDir: params.sessionsDir });
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function lifecycleTranscriptIsReclaimable(params: {
|
||||
transcriptPath: string | null;
|
||||
nowMs: number;
|
||||
orphanTranscriptMinAgeMs: number;
|
||||
}): boolean {
|
||||
if (!params.transcriptPath || !fs.existsSync(params.transcriptPath)) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
const stat = fs.statSync(params.transcriptPath);
|
||||
return params.nowMs - stat.mtimeMs >= params.orphanTranscriptMinAgeMs;
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
function archiveExactLifecycleTranscriptPath(transcriptPath: string): number {
|
||||
const archivedPath = `${transcriptPath}.deleted.${formatSessionArchiveTimestamp()}`;
|
||||
try {
|
||||
fs.renameSync(transcriptPath, archivedPath);
|
||||
emitSessionTranscriptUpdate({ sessionFile: archivedPath });
|
||||
return 1;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function saveSessionStoreUnlocked(
|
||||
storePath: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
@@ -818,6 +898,158 @@ export async function updateSessionStore<T>(
|
||||
});
|
||||
}
|
||||
|
||||
async function archiveUnreferencedLifecycleTranscriptArtifacts(params: {
|
||||
storePath: string;
|
||||
transcriptContentMarker: string;
|
||||
orphanTranscriptMinAgeMs: number;
|
||||
nowMs: number;
|
||||
}): Promise<number> {
|
||||
const sessionsDir = path.dirname(path.resolve(params.storePath));
|
||||
return await runExclusiveSessionStoreWrite(params.storePath, async () => {
|
||||
const store = loadMutableSessionStoreForWriter(params.storePath);
|
||||
const referencedTranscriptPaths = new Set<string>();
|
||||
for (const entry of Object.values(store)) {
|
||||
const transcriptPath = resolveLifecycleTranscriptPath({ entry, sessionsDir });
|
||||
if (transcriptPath) {
|
||||
referencedTranscriptPaths.add(normalizePathForLifecycleComparison(transcriptPath));
|
||||
}
|
||||
}
|
||||
restoreUnchangedSessionStoreCache(params.storePath, store);
|
||||
|
||||
let entries: fs.Dirent[];
|
||||
try {
|
||||
entries = await fs.promises.readdir(sessionsDir, { withFileTypes: true });
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const { archiveSessionTranscripts } = await loadSessionArchiveRuntime();
|
||||
let archived = 0;
|
||||
// Only archive primary transcripts that are no longer referenced by the
|
||||
// current store and still carry the lifecycle marker supplied by the caller.
|
||||
for (const entry of entries) {
|
||||
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const transcriptPath = path.join(sessionsDir, entry.name);
|
||||
if (referencedTranscriptPaths.has(normalizePathForLifecycleComparison(transcriptPath))) {
|
||||
continue;
|
||||
}
|
||||
let stat: fs.Stats;
|
||||
try {
|
||||
stat = await fs.promises.stat(transcriptPath);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (params.nowMs - stat.mtimeMs < params.orphanTranscriptMinAgeMs) {
|
||||
continue;
|
||||
}
|
||||
let content: string;
|
||||
try {
|
||||
content = await fs.promises.readFile(transcriptPath, "utf-8");
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!content.includes(params.transcriptContentMarker)) {
|
||||
continue;
|
||||
}
|
||||
const sessionId = entry.name.slice(0, -".jsonl".length);
|
||||
archived += archiveSessionTranscripts({
|
||||
sessionId,
|
||||
storePath: params.storePath,
|
||||
sessionFile: transcriptPath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
}).length;
|
||||
}
|
||||
return archived;
|
||||
});
|
||||
}
|
||||
|
||||
/** Cleans scoped session lifecycle entries and their unreferenced transcript artifacts. */
|
||||
export async function cleanupSessionLifecycleArtifacts(
|
||||
params: SessionLifecycleArtifactCleanupParams,
|
||||
): Promise<SessionLifecycleArtifactCleanupResult> {
|
||||
const sessionKeySegmentPrefix = params.sessionKeySegmentPrefix.trim();
|
||||
const transcriptContentMarker = params.transcriptContentMarker;
|
||||
if (!sessionKeySegmentPrefix || !transcriptContentMarker) {
|
||||
return { removedEntries: 0, archivedTranscriptArtifacts: 0 };
|
||||
}
|
||||
|
||||
const nowMs = params.nowMs ?? Date.now();
|
||||
const storePath = path.resolve(params.storePath);
|
||||
const sessionsDir = path.dirname(storePath);
|
||||
const removedSessionFiles = new Map<string, string | undefined>();
|
||||
const removedTranscriptPaths: Array<{ sessionId: string; transcriptPath: string }> = [];
|
||||
let removedEntries = 0;
|
||||
let archivedTranscriptArtifacts = 0;
|
||||
|
||||
await runExclusiveSessionStoreWrite(storePath, async () => {
|
||||
const store = loadMutableSessionStoreForWriter(storePath);
|
||||
// Delete only rows owned by the named lifecycle. Orphan transcript cleanup
|
||||
// reacquires this writer lock later so its reference set cannot go stale.
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
const transcriptPath = resolveLifecycleTranscriptPath({ entry, sessionsDir });
|
||||
const matchesLifecycle = sessionKeySegmentStartsWith(sessionKey, sessionKeySegmentPrefix);
|
||||
if (
|
||||
matchesLifecycle &&
|
||||
lifecycleTranscriptIsReclaimable({
|
||||
transcriptPath,
|
||||
nowMs,
|
||||
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
|
||||
})
|
||||
) {
|
||||
rememberRemovedSessionFile(removedSessionFiles, entry);
|
||||
if (entry.sessionId && transcriptPath && fs.existsSync(transcriptPath)) {
|
||||
removedTranscriptPaths.push({ sessionId: entry.sessionId, transcriptPath });
|
||||
}
|
||||
delete store[sessionKey];
|
||||
removedEntries += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (removedEntries === 0) {
|
||||
restoreUnchangedSessionStoreCache(storePath, store);
|
||||
return;
|
||||
}
|
||||
|
||||
const referencedSessionIds = new Set(
|
||||
Object.values(store)
|
||||
.map((entry) => entry?.sessionId)
|
||||
.filter((sessionId): sessionId is string => Boolean(sessionId)),
|
||||
);
|
||||
// Archive only the exact transcript path that passed the age/missing guard.
|
||||
// Broader session-id candidate scans can include fresh sibling transcripts.
|
||||
for (const { sessionId: removedSessionId, transcriptPath } of removedTranscriptPaths) {
|
||||
if (referencedSessionIds.has(removedSessionId)) {
|
||||
continue;
|
||||
}
|
||||
archivedTranscriptArtifacts += archiveExactLifecycleTranscriptPath(transcriptPath);
|
||||
}
|
||||
const { removeRemovedSessionTrajectoryArtifacts } = await loadTrajectoryCleanupRuntime();
|
||||
await removeRemovedSessionTrajectoryArtifacts({
|
||||
removedSessionFiles,
|
||||
referencedSessionIds,
|
||||
storePath,
|
||||
restrictToStoreDir: true,
|
||||
});
|
||||
await saveSessionStoreUnlocked(storePath, store, { skipMaintenance: true });
|
||||
});
|
||||
|
||||
return {
|
||||
removedEntries,
|
||||
archivedTranscriptArtifacts:
|
||||
archivedTranscriptArtifacts +
|
||||
(await archiveUnreferencedLifecycleTranscriptArtifacts({
|
||||
storePath,
|
||||
transcriptContentMarker,
|
||||
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
|
||||
nowMs,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
export async function runQuotaSuspensionMaintenance(params: {
|
||||
storePath: string;
|
||||
now?: number;
|
||||
|
||||
@@ -22,6 +22,7 @@ export { resolveGroupSessionKey } from "../config/sessions/group.js";
|
||||
export { canonicalizeMainSessionAlias } from "../config/sessions/main-session.js";
|
||||
export {
|
||||
clearSessionStoreCacheForTest,
|
||||
cleanupSessionLifecycleArtifacts,
|
||||
getSessionEntry,
|
||||
listSessionEntries,
|
||||
patchSessionEntry,
|
||||
@@ -33,6 +34,10 @@ export {
|
||||
updateSessionStoreEntry,
|
||||
upsertSessionEntry,
|
||||
} from "../config/sessions/store.js";
|
||||
export type {
|
||||
SessionLifecycleArtifactCleanupParams,
|
||||
SessionLifecycleArtifactCleanupResult,
|
||||
} from "../config/sessions/store.js";
|
||||
export {
|
||||
evaluateSessionFreshness,
|
||||
resolveChannelResetConfig,
|
||||
|
||||
Reference in New Issue
Block a user