mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
clawdbot-587: fix lifecycle cleanup followups
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
19bdf1196ec771a00777a16fd1e9c3662b8fd788a81034e705c41a74ee79c7ec plugin-sdk-api-baseline.json
|
||||
43feff80c90adad0f821d1f1e184a9bff1e93d81e6d53a26a26fd9e2972be759 plugin-sdk-api-baseline.jsonl
|
||||
1fc413736b1320d11981317535e791cd40cb7b5ac14d6beef50cc032b4e28afb plugin-sdk-api-baseline.json
|
||||
7a375996b95a8fd4fb4eade436497ea7153faf7061f65a4e4b16b54ed6690561 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -607,16 +607,15 @@ function sqliteTranscriptStateHasMarker(params: {
|
||||
transcriptContentMarker: string;
|
||||
}): boolean {
|
||||
const db = getSessionKysely(params.database.db);
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
const rows = executeSqliteQuerySync(
|
||||
params.database.db,
|
||||
db
|
||||
.selectFrom("transcript_events")
|
||||
.select("seq")
|
||||
.select("event_json")
|
||||
.where("session_id", "=", params.sessionId)
|
||||
.where("event_json", "like", `%${params.transcriptContentMarker}%`)
|
||||
.limit(1),
|
||||
);
|
||||
return row !== undefined;
|
||||
.orderBy("seq", "asc"),
|
||||
).rows;
|
||||
return rows.some((row) => row.event_json.includes(params.transcriptContentMarker));
|
||||
}
|
||||
|
||||
function readReferencedSqliteSessionIds(database: OpenClawAgentDatabase): Set<string> {
|
||||
|
||||
@@ -876,60 +876,71 @@ export async function updateSessionStore<T>(
|
||||
}
|
||||
|
||||
async function archiveUnreferencedLifecycleTranscriptArtifacts(params: {
|
||||
referencedTranscriptPaths: ReadonlySet<string>;
|
||||
storePath: string;
|
||||
transcriptContentMarker: string;
|
||||
orphanTranscriptMinAgeMs: number;
|
||||
nowMs: number;
|
||||
}): Promise<number> {
|
||||
const sessionsDir = path.dirname(path.resolve(params.storePath));
|
||||
let entries: fs.Dirent[];
|
||||
try {
|
||||
entries = await fs.promises.readdir(sessionsDir, { withFileTypes: true });
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
return await runExclusiveSessionStoreWrite(params.storePath, async () => {
|
||||
const store = loadMutableSessionStoreForWriter(params.storePath);
|
||||
const referencedTranscriptPaths = new Set<string>();
|
||||
for (const entry of Object.values(store)) {
|
||||
const transcriptPath = resolveLifecycleTranscriptPath({ entry, sessionsDir });
|
||||
if (transcriptPath) {
|
||||
referencedTranscriptPaths.add(normalizePathForLifecycleComparison(transcriptPath));
|
||||
}
|
||||
}
|
||||
restoreUnchangedSessionStoreCache(params.storePath, store);
|
||||
|
||||
const { archiveSessionTranscripts } = await loadSessionArchiveRuntime();
|
||||
let archived = 0;
|
||||
// Only archive primary transcripts that are no longer referenced by the
|
||||
// current store and still carry the lifecycle marker supplied by the caller.
|
||||
for (const entry of entries) {
|
||||
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const transcriptPath = path.join(sessionsDir, entry.name);
|
||||
if (params.referencedTranscriptPaths.has(normalizePathForLifecycleComparison(transcriptPath))) {
|
||||
continue;
|
||||
}
|
||||
let stat: fs.Stats;
|
||||
let entries: fs.Dirent[];
|
||||
try {
|
||||
stat = await fs.promises.stat(transcriptPath);
|
||||
entries = await fs.promises.readdir(sessionsDir, { withFileTypes: true });
|
||||
} catch {
|
||||
continue;
|
||||
return 0;
|
||||
}
|
||||
if (params.nowMs - stat.mtimeMs < params.orphanTranscriptMinAgeMs) {
|
||||
continue;
|
||||
|
||||
const { archiveSessionTranscripts } = await loadSessionArchiveRuntime();
|
||||
let archived = 0;
|
||||
// Only archive primary transcripts that are no longer referenced by the
|
||||
// current store and still carry the lifecycle marker supplied by the caller.
|
||||
for (const entry of entries) {
|
||||
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const transcriptPath = path.join(sessionsDir, entry.name);
|
||||
if (referencedTranscriptPaths.has(normalizePathForLifecycleComparison(transcriptPath))) {
|
||||
continue;
|
||||
}
|
||||
let stat: fs.Stats;
|
||||
try {
|
||||
stat = await fs.promises.stat(transcriptPath);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (params.nowMs - stat.mtimeMs < params.orphanTranscriptMinAgeMs) {
|
||||
continue;
|
||||
}
|
||||
let content: string;
|
||||
try {
|
||||
content = await fs.promises.readFile(transcriptPath, "utf-8");
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!content.includes(params.transcriptContentMarker)) {
|
||||
continue;
|
||||
}
|
||||
const sessionId = entry.name.slice(0, -".jsonl".length);
|
||||
archived += archiveSessionTranscripts({
|
||||
sessionId,
|
||||
storePath: params.storePath,
|
||||
sessionFile: transcriptPath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
}).length;
|
||||
}
|
||||
let content: string;
|
||||
try {
|
||||
content = await fs.promises.readFile(transcriptPath, "utf-8");
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!content.includes(params.transcriptContentMarker)) {
|
||||
continue;
|
||||
}
|
||||
const sessionId = entry.name.slice(0, -".jsonl".length);
|
||||
archived += archiveSessionTranscripts({
|
||||
sessionId,
|
||||
storePath: params.storePath,
|
||||
sessionFile: transcriptPath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
}).length;
|
||||
}
|
||||
return archived;
|
||||
return archived;
|
||||
});
|
||||
}
|
||||
|
||||
/** Cleans scoped session lifecycle entries and their unreferenced transcript artifacts. */
|
||||
@@ -945,15 +956,14 @@ export async function cleanupSessionLifecycleArtifacts(
|
||||
const nowMs = params.nowMs ?? Date.now();
|
||||
const storePath = path.resolve(params.storePath);
|
||||
const sessionsDir = path.dirname(storePath);
|
||||
const referencedTranscriptPaths = new Set<string>();
|
||||
const removedSessionFiles = new Map<string, string | undefined>();
|
||||
let removedEntries = 0;
|
||||
let archivedTranscriptArtifacts = 0;
|
||||
|
||||
await runExclusiveSessionStoreWrite(storePath, async () => {
|
||||
const store = loadMutableSessionStoreForWriter(storePath);
|
||||
// Delete only rows owned by the named lifecycle, then build the remaining
|
||||
// transcript reference set while the store snapshot is writer-owned.
|
||||
// Delete only rows owned by the named lifecycle. Orphan transcript cleanup
|
||||
// reacquires this writer lock later so its reference set cannot go stale.
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
const transcriptPath = resolveLifecycleTranscriptPath({ entry, sessionsDir });
|
||||
const matchesLifecycle = sessionKeySegmentStartsWith(sessionKey, sessionKeySegmentPrefix);
|
||||
@@ -970,9 +980,6 @@ export async function cleanupSessionLifecycleArtifacts(
|
||||
removedEntries += 1;
|
||||
continue;
|
||||
}
|
||||
if (transcriptPath) {
|
||||
referencedTranscriptPaths.add(normalizePathForLifecycleComparison(transcriptPath));
|
||||
}
|
||||
}
|
||||
|
||||
if (removedEntries === 0) {
|
||||
@@ -1015,7 +1022,6 @@ export async function cleanupSessionLifecycleArtifacts(
|
||||
archivedTranscriptArtifacts:
|
||||
archivedTranscriptArtifacts +
|
||||
(await archiveUnreferencedLifecycleTranscriptArtifacts({
|
||||
referencedTranscriptPaths,
|
||||
storePath,
|
||||
transcriptContentMarker,
|
||||
orphanTranscriptMinAgeMs: params.orphanTranscriptMinAgeMs,
|
||||
|
||||
Reference in New Issue
Block a user