refactor: add transcript runtime identity contract

This commit is contained in:
Josh Lehman
2026-06-01 10:16:14 -07:00
parent da45b43789
commit a0e06d48a7
8 changed files with 428 additions and 39 deletions

View File

@@ -14,6 +14,10 @@ import {
TranscriptFileState,
writeTranscriptFileAtomic,
} from "./transcript-file-state.js";
import {
resolveRuntimeTranscriptTarget,
type RuntimeTranscriptScope,
} from "./transcript-runtime-state.js";
type ReadonlySessionManagerForRotation = Pick<
TranscriptFileState,
@@ -99,6 +103,24 @@ export async function rotateTranscriptFileAfterCompaction(params: {
});
}
/**
* Rotates a runtime transcript after compaction using agent/session identity.
*/
export async function rotateRuntimeTranscriptAfterCompaction(params: {
sessionManager?: ReadonlySessionManagerForRotation;
scope: RuntimeTranscriptScope;
now?: () => Date;
}): Promise<CompactionTranscriptRotation> {
const target = await resolveRuntimeTranscriptTarget(params.scope);
const sessionManager =
params.sessionManager ?? (await readTranscriptFileState(target.sessionFile));
return await rotateTranscriptAfterCompaction({
sessionManager,
sessionFile: target.sessionFile,
...(params.now ? { now: params.now } : {}),
});
}
function findLatestCompactionIndex(entries: SessionEntry[]): number {
for (let index = entries.length - 1; index >= 0; index -= 1) {
if (entries[index]?.type === "compaction") {

View File

@@ -25,6 +25,10 @@ import {
rewriteTranscriptEntriesInSessionManager,
rewriteTranscriptEntriesInState,
} from "./transcript-rewrite.js";
import {
resolveRuntimeTranscriptTarget,
type RuntimeTranscriptScope,
} from "./transcript-runtime-state.js";
/**
* Maximum share of the context window a single tool result should occupy.
@@ -818,6 +822,49 @@ export function truncateOversizedToolResultsInSessionManager(params: {
}
}
/**
* Truncates oversized tool results for a runtime transcript scope.
*/
export async function truncateOversizedToolResultsInRuntimeTranscript(params: {
scope: RuntimeTranscriptScope;
contextWindowTokens: number;
maxCharsOverride?: number;
aggregateMaxCharsOverride?: number;
config?: SessionWriteLockAcquireTimeoutConfig;
}): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> {
let sessionLock: Awaited<ReturnType<typeof acquireSessionWriteLock>> | undefined;
try {
const target = await resolveRuntimeTranscriptTarget(params.scope);
sessionLock = await acquireSessionWriteLock({
sessionFile: target.sessionFile,
...resolveSessionWriteLockOptions(params.config),
});
const state = await readTranscriptFileState(target.sessionFile);
return await truncateOversizedToolResultsInTranscriptState({
state,
contextWindowTokens: params.contextWindowTokens,
maxCharsOverride: params.maxCharsOverride,
aggregateMaxCharsOverride: params.aggregateMaxCharsOverride,
sessionFile: target.sessionFile,
sessionId: target.sessionId,
sessionKey: target.sessionKey,
agentId: target.agentId,
config: params.config,
});
} catch (err) {
const errMsg = formatErrorMessage(err);
log.warn(`[tool-result-truncation] Failed to truncate: ${errMsg}`);
return { truncated: false, truncatedCount: 0, reason: errMsg };
} finally {
await sessionLock?.release();
}
}
/**
* Truncates a named transcript file artifact. Runtime callers should prefer
* truncateOversizedToolResultsInRuntimeTranscript with agent/session scope.
*/
export async function truncateOversizedToolResultsInSession(params: {
sessionFile: string;
contextWindowTokens: number;

View File

@@ -22,6 +22,11 @@ import {
readTranscriptFileState,
type TranscriptFileState,
} from "./transcript-file-state.js";
import {
persistRuntimeTranscriptStateMutation,
resolveRuntimeTranscriptTarget,
type RuntimeTranscriptScope,
} from "./transcript-runtime-state.js";
type SessionManagerLike = ReturnType<typeof SessionManager.open>;
type SessionBranchEntry = ReturnType<SessionManagerLike["getBranch"]>[number];
@@ -372,8 +377,65 @@ export function rewriteTranscriptEntriesInState(params: {
}
/**
* Open a transcript file, rewrite message entries on the active branch, and
* emit a transcript update when the active branch changed.
* Rewrites message entries for a runtime transcript without using the
* file-backed path as caller identity.
*/
export async function rewriteTranscriptEntriesInRuntimeTranscript(params: {
scope: RuntimeTranscriptScope;
request: TranscriptRewriteRequest;
config?: SessionWriteLockAcquireTimeoutConfig;
}): Promise<TranscriptRewriteResult> {
let sessionLock: Awaited<ReturnType<typeof acquireSessionWriteLock>> | undefined;
try {
const target = await resolveRuntimeTranscriptTarget(params.scope);
sessionLock = await acquireSessionWriteLock({
sessionFile: target.sessionFile,
...resolveSessionWriteLockOptions(params.config),
});
const state = await readTranscriptFileState(target.sessionFile);
const result = rewriteTranscriptEntriesInState({
state,
replacements: params.request.replacements,
...(params.request.allowedRewriteSuffixEntryIds
? { allowedRewriteSuffixEntryIds: params.request.allowedRewriteSuffixEntryIds }
: {}),
});
if (result.changed) {
await persistRuntimeTranscriptStateMutation({
target,
state,
appendedEntries: result.appendedEntries,
});
emitSessionTranscriptUpdate({
sessionFile: target.sessionFile,
sessionKey: target.sessionKey,
agentId: target.agentId,
});
log.info(
`[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` +
`${result.rewrittenEntries === 1 ? "y" : "ies"} ` +
`bytesFreed=${result.bytesFreed} ` +
`sessionKey=${target.sessionKey}`,
);
}
return result;
} catch (err) {
const reason = formatErrorMessage(err);
log.warn(`[transcript-rewrite] failed: ${reason}`);
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
reason,
};
} finally {
await sessionLock?.release();
}
}
/**
* Rewrites a named transcript file artifact. Runtime callers should prefer
* rewriteTranscriptEntriesInRuntimeTranscript with agent/session scope.
*/
export async function rewriteTranscriptEntriesInSessionFile(params: {
sessionFile: string;

View File

@@ -0,0 +1,63 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
appendTranscriptMessage,
upsertSessionEntry,
} from "../../config/sessions/session-accessor.js";
import {
deleteRuntimeTranscript,
readRuntimeTranscriptState,
runtimeTranscriptExists,
} from "./transcript-runtime-state.js";
describe("runtime transcript state", () => {
let tempDir: string;
let storePath: string;
beforeEach(() => {
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-runtime-transcript-"));
storePath = path.join(tempDir, "sessions.json");
});
afterEach(() => {
fs.rmSync(tempDir, { recursive: true, force: true });
});
it("reads and deletes transcript state through runtime scope", async () => {
const scope = {
agentId: "main",
sessionId: "session-1",
sessionKey: "agent:main:main",
storePath,
};
await upsertSessionEntry(scope, {
sessionId: scope.sessionId,
updatedAt: 10,
});
await appendTranscriptMessage(scope, {
cwd: tempDir,
message: {
content: "hello",
role: "user",
},
});
await expect(runtimeTranscriptExists(scope)).resolves.toBe(true);
const { state, target } = await readRuntimeTranscriptState(scope);
expect(fs.realpathSync(target.sessionFile)).toBe(
fs.realpathSync(path.join(tempDir, "session-1.jsonl")),
);
expect(state.getBranch()).toEqual([
expect.objectContaining({
message: expect.objectContaining({ content: "hello" }),
type: "message",
}),
]);
await expect(deleteRuntimeTranscript(scope)).resolves.toBe(true);
await expect(runtimeTranscriptExists(scope)).resolves.toBe(false);
});
});

View File

@@ -0,0 +1,100 @@
import fs from "node:fs/promises";
import type {
SessionTranscriptRuntimeScope,
SessionTranscriptRuntimeTarget,
} from "../../config/sessions/session-accessor.js";
import { resolveSessionTranscriptRuntimeTarget } from "../../config/sessions/session-accessor.js";
import type { SessionEntry, SessionHeader } from "../sessions/index.js";
import {
persistTranscriptStateMutation,
readTranscriptFileState,
type TranscriptFileState,
writeTranscriptFileAtomic,
} from "./transcript-file-state.js";
export type RuntimeTranscriptScope = SessionTranscriptRuntimeScope;
export type RuntimeTranscriptTarget = SessionTranscriptRuntimeTarget;
export type RuntimeTranscriptState = {
state: TranscriptFileState;
target: RuntimeTranscriptTarget;
};
/**
* Resolves the current file-backed transcript target for runtime state
* operations. The returned path is an implementation detail, not identity.
*/
export async function resolveRuntimeTranscriptTarget(
scope: RuntimeTranscriptScope,
): Promise<RuntimeTranscriptTarget> {
return await resolveSessionTranscriptRuntimeTarget(scope);
}
/**
* Reads transcript state through the runtime transcript identity contract.
*/
export async function readRuntimeTranscriptState(
scope: RuntimeTranscriptScope,
): Promise<RuntimeTranscriptState> {
const target = await resolveRuntimeTranscriptTarget(scope);
return {
state: await readTranscriptFileState(target.sessionFile),
target,
};
}
/**
* Persists an append or migration rewrite for a resolved runtime transcript.
*/
export async function persistRuntimeTranscriptStateMutation(params: {
appendedEntries: SessionEntry[];
state: TranscriptFileState;
target: RuntimeTranscriptTarget;
}): Promise<void> {
await persistTranscriptStateMutation({
sessionFile: params.target.sessionFile,
state: params.state,
appendedEntries: params.appendedEntries,
});
}
/**
* Atomically replaces the file-backed transcript for a runtime transcript.
*/
export async function replaceRuntimeTranscriptEntries(params: {
entries: Array<SessionHeader | SessionEntry>;
target: RuntimeTranscriptTarget;
}): Promise<void> {
await writeTranscriptFileAtomic(params.target.sessionFile, params.entries);
}
/**
* Checks existence of the current runtime transcript without exposing path
* identity to callers.
*/
export async function runtimeTranscriptExists(scope: RuntimeTranscriptScope): Promise<boolean> {
const target = await resolveRuntimeTranscriptTarget(scope);
try {
const stat = await fs.stat(target.sessionFile);
return stat.isFile();
} catch {
return false;
}
}
/**
* Deletes the current runtime transcript. This remains file-backed until the
* SQLite implementation owns transcript deletion in 3.2.
*/
export async function deleteRuntimeTranscript(scope: RuntimeTranscriptScope): Promise<boolean> {
const target = await resolveRuntimeTranscriptTarget(scope);
try {
await fs.unlink(target.sessionFile);
return true;
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
return false;
}
throw err;
}
}

View File

@@ -15,6 +15,7 @@ import {
publishTranscriptUpdate,
readSessionUpdatedAt,
replaceSessionEntry,
resolveSessionTranscriptRuntimeTarget,
updateSessionEntry,
upsertSessionEntry,
} from "./session-accessor.js";
@@ -534,6 +535,31 @@ describe("session accessor file-backed seam", () => {
await expect(loadTranscriptEvents(scope)).resolves.toEqual([event]);
});
it("resolves runtime transcript targets from scope without caller-owned paths", async () => {
const scope = {
agentId: "main",
sessionId: "session-1",
sessionKey: "agent:main:main",
storePath,
};
await upsertSessionEntry(scope, {
sessionId: scope.sessionId,
updatedAt: 10,
});
const target = await resolveSessionTranscriptRuntimeTarget(scope);
expect(target).toMatchObject({
agentId: "main",
sessionId: "session-1",
sessionKey: "agent:main:main",
});
expect(fs.realpathSync(path.dirname(target.sessionFile))).toBe(fs.realpathSync(tempDir));
expect(path.basename(target.sessionFile)).toBe("session-1.jsonl");
expect(loadSessionEntry(scope)?.sessionFile).toBe(target.sessionFile);
});
it("persists transcript metadata under the normalized session key", async () => {
const canonicalScope = {
sessionId: "session-1",

View File

@@ -52,6 +52,11 @@ export type SessionTranscriptAccessScope = SessionTranscriptReadScope & {
sessionKey: string;
};
export type SessionTranscriptRuntimeScope = SessionAccessScope & {
sessionId: string;
threadId?: string | number;
};
export type SessionTranscriptWriteScope = Omit<SessionTranscriptAccessScope, "sessionId"> & {
sessionId?: string;
};
@@ -87,6 +92,13 @@ export type TranscriptMessageAppendResult<TMessage> = {
export type TranscriptUpdatePayload = Omit<SessionTranscriptUpdate, "sessionFile">;
export type SessionTranscriptRuntimeTarget = {
agentId: string;
sessionFile: string;
sessionId: string;
sessionKey: string;
};
export type SessionEntryUpdateOptions = {
skipMaintenance?: boolean;
takeCacheOwnership?: boolean;
@@ -309,6 +321,67 @@ export async function publishTranscriptUpdate(
});
}
/**
* Resolves the current file-backed target for a storage-neutral runtime
* transcript scope. Callers use the scope as identity; sessionFile is returned
* only for current file-backed implementation details such as locks/events.
*/
export async function resolveSessionTranscriptRuntimeTarget(
scope: SessionTranscriptRuntimeScope,
): Promise<SessionTranscriptRuntimeTarget> {
const agentId = scope.agentId ?? resolveAgentIdFromSessionKey(scope.sessionKey);
if (!agentId) {
throw new Error(`Cannot resolve transcript scope without an agent id: ${scope.sessionKey}`);
}
const sessionStore = scope.storePath
? loadSessionStore(scope.storePath, { skipCache: true })
: undefined;
const resolvedStoreEntry = sessionStore
? resolveSessionStoreEntry({ store: sessionStore, sessionKey: scope.sessionKey })
: undefined;
const sessionEntry = resolvedStoreEntry?.existing ?? loadSessionEntry(scope);
const sessionKey = resolvedStoreEntry?.normalizedKey ?? scope.sessionKey;
if (sessionStore && scope.storePath) {
const sessionsDir = path.dirname(path.resolve(scope.storePath));
const threadId = scope.threadId ?? parseSessionThreadInfo(scope.sessionKey).threadId;
const fallbackSessionFile =
!sessionEntry?.sessionFile && threadId !== undefined
? resolveSessionTranscriptPathInDir(scope.sessionId, sessionsDir, threadId)
: undefined;
const resolved = await resolveAndPersistSessionFile({
agentId,
fallbackSessionFile,
sessionEntry,
sessionId: scope.sessionId,
sessionKey,
sessionStore,
sessionsDir,
storePath: scope.storePath,
});
return {
agentId,
sessionFile: resolved.sessionFile,
sessionId: scope.sessionId,
sessionKey,
};
}
const resolved = await resolveSessionTranscriptFile({
agentId,
sessionEntry,
sessionId: scope.sessionId,
sessionKey: scope.sessionKey,
...(sessionStore ? { sessionStore } : {}),
...(scope.storePath ? { storePath: scope.storePath } : {}),
...(scope.threadId !== undefined ? { threadId: scope.threadId } : {}),
});
return {
agentId,
sessionFile: resolved.sessionFile,
sessionId: scope.sessionId,
sessionKey,
};
}
function createFallbackSessionEntry(patch: Partial<SessionEntry>): SessionEntry {
const now = Date.now();
return {
@@ -364,43 +437,8 @@ async function resolveTranscriptAccess(scope: SessionTranscriptWriteScope): Prom
if (!scope.sessionId) {
throw new Error(`Cannot resolve transcript scope without a session id: ${scope.sessionKey}`);
}
const agentId = scope.agentId ?? resolveAgentIdFromSessionKey(scope.sessionKey);
if (!agentId) {
throw new Error(`Cannot resolve transcript scope without an agent id: ${scope.sessionKey}`);
}
const sessionStore = scope.storePath
? loadSessionStore(scope.storePath, { skipCache: true })
: undefined;
const resolvedStoreEntry = sessionStore
? resolveSessionStoreEntry({ store: sessionStore, sessionKey: scope.sessionKey })
: undefined;
const sessionEntry = resolvedStoreEntry?.existing ?? loadSessionEntry(scope);
const sessionKey = resolvedStoreEntry?.normalizedKey ?? scope.sessionKey;
if (sessionStore && scope.storePath) {
const sessionsDir = path.dirname(path.resolve(scope.storePath));
const threadId = scope.threadId ?? parseSessionThreadInfo(scope.sessionKey).threadId;
const fallbackSessionFile =
!sessionEntry?.sessionFile && threadId !== undefined
? resolveSessionTranscriptPathInDir(scope.sessionId, sessionsDir, threadId)
: undefined;
return await resolveAndPersistSessionFile({
agentId,
fallbackSessionFile,
sessionEntry,
sessionId: scope.sessionId,
sessionKey,
sessionStore,
sessionsDir,
storePath: scope.storePath,
});
}
return await resolveSessionTranscriptFile({
agentId,
sessionEntry,
return await resolveSessionTranscriptRuntimeTarget({
...scope,
sessionId: scope.sessionId,
sessionKey: scope.sessionKey,
...(sessionStore ? { sessionStore } : {}),
...(scope.storePath ? { storePath: scope.storePath } : {}),
...(scope.threadId !== undefined ? { threadId: scope.threadId } : {}),
});
}

View File

@@ -13,6 +13,10 @@ import type {
SessionEntry,
} from "../config/sessions.js";
import { isCompactionCheckpointTranscriptFileName } from "../config/sessions/artifacts.js";
import {
resolveSessionTranscriptRuntimeTarget,
type SessionTranscriptRuntimeScope,
} from "../config/sessions/session-accessor.js";
import { streamSessionTranscriptLines } from "../config/sessions/transcript-stream.js";
import { CURRENT_SESSION_VERSION } from "../config/sessions/version.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -327,6 +331,17 @@ export async function readSessionLeafIdFromTranscriptAsync(
return null;
}
/**
* Reads the latest leaf id for a runtime transcript scope.
*/
export async function readRuntimeSessionLeafIdFromTranscriptAsync(
scope: SessionTranscriptRuntimeScope,
maxBytes = MAX_COMPACTION_CHECKPOINT_LEAF_SCAN_BYTES,
): Promise<string | null> {
const target = await resolveSessionTranscriptRuntimeTarget(scope);
return await readSessionLeafIdFromTranscriptAsync(target.sessionFile, maxBytes);
}
export async function forkCompactionCheckpointTranscriptAsync(params: {
sourceFile: string;
sourceLeafId?: string;
@@ -423,6 +438,22 @@ export async function captureCompactionCheckpointSnapshotAsync(params: {
};
}
/**
* Captures checkpoint metadata for a runtime transcript scope.
*/
export async function captureRuntimeCompactionCheckpointSnapshotAsync(params: {
sessionManager?: Pick<SessionManager, "getLeafId">;
scope: SessionTranscriptRuntimeScope;
maxBytes?: number;
}): Promise<CapturedCompactionCheckpointSnapshot | null> {
const target = await resolveSessionTranscriptRuntimeTarget(params.scope);
return await captureCompactionCheckpointSnapshotAsync({
sessionManager: params.sessionManager,
sessionFile: target.sessionFile,
maxBytes: params.maxBytes,
});
}
export async function cleanupCompactionCheckpointSnapshot(
snapshot: CapturedCompactionCheckpointSnapshot | null | undefined,
): Promise<void> {