clawdbot-9c3: add session accessor seam

This commit is contained in:
Josh Lehman
2026-05-31 17:11:52 -07:00
parent c5d6764f56
commit d0819fd978
3 changed files with 339 additions and 0 deletions

View File

@@ -0,0 +1,159 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
appendTranscriptEvent,
listSessionEntries,
loadSessionEntry,
loadTranscriptEvents,
upsertSessionEntry,
} from "./session-accessor.js";
describe("session accessor file-backed seam", () => {
let tempDir: string;
let storePath: string;
let transcriptPath: string;
beforeEach(() => {
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-accessor-"));
storePath = path.join(tempDir, "sessions.json");
transcriptPath = path.join(tempDir, "session.jsonl");
});
afterEach(() => {
fs.rmSync(tempDir, { recursive: true, force: true });
});
it("loads, lists, and patches session entries without exposing the file store shape", async () => {
const scope = {
sessionKey: "agent:main:main",
storePath,
};
await upsertSessionEntry(scope, {
model: "gpt-5.5",
sessionId: "session-1",
updatedAt: 10,
});
expect(loadSessionEntry(scope)).toMatchObject({
model: "gpt-5.5",
sessionId: "session-1",
updatedAt: expect.any(Number),
});
expect(listSessionEntries({ storePath })).toEqual([
{
sessionKey: "agent:main:main",
entry: expect.objectContaining({
model: "gpt-5.5",
sessionId: "session-1",
updatedAt: expect.any(Number),
}),
},
]);
await upsertSessionEntry(scope, { model: "sonnet-4.6", updatedAt: 20 });
expect(loadSessionEntry(scope)).toMatchObject({
model: "sonnet-4.6",
sessionId: "session-1",
updatedAt: expect.any(Number),
});
});
it("creates durable session ids for metadata-only inserts", async () => {
const scope = {
sessionKey: "agent:main:main",
storePath,
};
const inserted = await upsertSessionEntry(scope, { model: "gpt-5.5" });
expect(inserted?.sessionId).toMatch(
/^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i,
);
expect(inserted?.sessionId).not.toBe(scope.sessionKey);
expect(loadSessionEntry(scope)?.sessionId).toBe(inserted?.sessionId);
});
it("loads and appends transcript events through a session scope", async () => {
const scope = {
sessionFile: transcriptPath,
sessionId: "session-1",
sessionKey: "agent:main:main",
storePath,
};
const event = {
id: "msg-1",
message: { role: "user", content: "hello" },
parentId: null,
type: "message",
};
await appendTranscriptEvent(scope, { type: "session", sessionId: "session-1" });
await appendTranscriptEvent(scope, event);
await expect(loadTranscriptEvents(scope)).resolves.toEqual([
{ type: "session", sessionId: "session-1" },
event,
]);
expect(fs.statSync(transcriptPath).mode & 0o777).toBe(0o600);
});
it("honors thread fallback paths when resolving transcript scope from the store", async () => {
const scope = {
agentId: "main",
sessionId: "session-1",
sessionKey: "agent:main:demo-channel:1234:thread:456",
storePath,
};
const event = {
id: "msg-1",
message: { role: "user", content: "hello" },
parentId: null,
type: "message",
};
await upsertSessionEntry(scope, {
sessionId: scope.sessionId,
updatedAt: 10,
});
await appendTranscriptEvent(scope, event);
const expectedTranscriptPath = path.join(tempDir, "session-1-topic-456.jsonl");
expect(fs.existsSync(expectedTranscriptPath)).toBe(true);
expect(fs.existsSync(path.join(tempDir, "session-1.jsonl"))).toBe(false);
expect(fs.realpathSync(loadSessionEntry(scope)?.sessionFile ?? "")).toBe(
fs.realpathSync(expectedTranscriptPath),
);
await expect(loadTranscriptEvents(scope)).resolves.toEqual([event]);
});
it("persists transcript metadata under the normalized session key", async () => {
const canonicalScope = {
sessionId: "session-1",
sessionKey: "agent:main:main",
storePath,
};
await upsertSessionEntry(canonicalScope, {
sessionId: canonicalScope.sessionId,
updatedAt: 10,
});
await appendTranscriptEvent(
{
agentId: "main",
sessionId: canonicalScope.sessionId,
sessionKey: "AGENT:MAIN:MAIN",
storePath,
},
{ id: "msg-1", type: "message" },
);
expect(listSessionEntries({ storePath }).map((entry) => entry.sessionKey)).toEqual([
canonicalScope.sessionKey,
]);
expect(loadSessionEntry(canonicalScope)?.sessionFile).toBeTruthy();
});
});

View File

@@ -0,0 +1,142 @@
import { randomUUID } from "node:crypto";
import path from "node:path";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { resolveSessionTranscriptPathInDir } from "./paths.js";
import { resolveAndPersistSessionFile } from "./session-file.js";
import {
getSessionEntry,
listSessionEntries as listFileSessionEntries,
loadSessionStore,
patchSessionEntry as patchFileSessionEntry,
resolveSessionStoreEntry,
} from "./store.js";
import { parseSessionThreadInfo } from "./thread-info.js";
import { appendSessionTranscriptEvent } from "./transcript-append.js";
import { streamSessionTranscriptLines } from "./transcript-stream.js";
import { resolveSessionTranscriptFile } from "./transcript.js";
import type { SessionEntry } from "./types.js";
export type SessionAccessScope = {
agentId?: string;
env?: NodeJS.ProcessEnv;
hydrateSkillPromptRefs?: boolean;
sessionKey: string;
storePath?: string;
};
export type SessionTranscriptAccessScope = SessionAccessScope & {
sessionFile?: string;
sessionId: string;
threadId?: string | number;
};
export type SessionEntrySummary = {
sessionKey: string;
entry: SessionEntry;
};
export type TranscriptEvent = unknown;
/** Loads one session entry through the storage-neutral accessor seam. */
export function loadSessionEntry(scope: SessionAccessScope): SessionEntry | undefined {
return getSessionEntry(scope);
}
/** Lists session entries through the storage-neutral accessor seam. */
export function listSessionEntries(
scope: Partial<Omit<SessionAccessScope, "sessionKey">> = {},
): SessionEntrySummary[] {
return listFileSessionEntries(scope);
}
/** Applies a partial entry update through the storage-neutral accessor seam. */
export async function upsertSessionEntry(
scope: SessionAccessScope,
patch: Partial<SessionEntry>,
): Promise<SessionEntry | null> {
return await patchFileSessionEntry({
...scope,
fallbackEntry: createFallbackSessionEntry(patch),
update: () => patch,
});
}
/** Loads raw transcript events through the storage-neutral accessor seam. */
export async function loadTranscriptEvents(
scope: SessionTranscriptAccessScope,
): Promise<TranscriptEvent[]> {
const transcript = await resolveTranscriptAccess(scope);
const events: TranscriptEvent[] = [];
for await (const line of streamSessionTranscriptLines(transcript.sessionFile)) {
events.push(JSON.parse(line) as TranscriptEvent);
}
return events;
}
/** Appends one raw transcript event through the storage-neutral accessor seam. */
export async function appendTranscriptEvent(
scope: SessionTranscriptAccessScope,
event: TranscriptEvent,
): Promise<void> {
const transcript = await resolveTranscriptAccess(scope);
await appendSessionTranscriptEvent({
event,
transcriptPath: transcript.sessionFile,
});
}
function createFallbackSessionEntry(patch: Partial<SessionEntry>): SessionEntry {
const now = Date.now();
return {
sessionId: patch.sessionId ?? randomUUID(),
updatedAt: patch.updatedAt ?? now,
...patch,
};
}
async function resolveTranscriptAccess(scope: SessionTranscriptAccessScope): Promise<{
sessionFile: string;
}> {
if (scope.sessionFile?.trim()) {
return { sessionFile: scope.sessionFile };
}
const agentId = scope.agentId ?? resolveAgentIdFromSessionKey(scope.sessionKey);
if (!agentId) {
throw new Error(`Cannot resolve transcript scope without an agent id: ${scope.sessionKey}`);
}
const sessionStore = scope.storePath
? loadSessionStore(scope.storePath, { skipCache: true })
: undefined;
const resolvedStoreEntry = sessionStore
? resolveSessionStoreEntry({ store: sessionStore, sessionKey: scope.sessionKey })
: undefined;
const sessionEntry = resolvedStoreEntry?.existing ?? loadSessionEntry(scope);
const sessionKey = resolvedStoreEntry?.normalizedKey ?? scope.sessionKey;
if (sessionStore && scope.storePath) {
const sessionsDir = path.dirname(path.resolve(scope.storePath));
const threadId = scope.threadId ?? parseSessionThreadInfo(scope.sessionKey).threadId;
const fallbackSessionFile =
!sessionEntry?.sessionFile && threadId !== undefined
? resolveSessionTranscriptPathInDir(scope.sessionId, sessionsDir, threadId)
: undefined;
return await resolveAndPersistSessionFile({
agentId,
fallbackSessionFile,
sessionEntry,
sessionId: scope.sessionId,
sessionKey,
sessionStore,
sessionsDir,
storePath: scope.storePath,
});
}
return await resolveSessionTranscriptFile({
agentId,
sessionEntry,
sessionId: scope.sessionId,
sessionKey: scope.sessionKey,
...(sessionStore ? { sessionStore } : {}),
...(scope.storePath ? { storePath: scope.storePath } : {}),
...(scope.threadId !== undefined ? { threadId: scope.threadId } : {}),
});
}

View File

@@ -15,6 +15,7 @@ import { redactSecrets } from "../../logging/redact.js";
import { createSessionTranscriptHeader } from "./transcript-header.js";
import {
appendJsonlEntry,
serializeJsonlEntry,
serializeJsonlLine,
writeJsonlEntry,
writeJsonlLines,
@@ -289,6 +290,31 @@ export async function appendSessionTranscriptMessage<TMessage>(
);
}
export type AppendSessionTranscriptEventParams = {
config?: OpenClawConfig;
event: unknown;
transcriptPath: string;
};
/** Appends a raw transcript event using the same write lock and FIFO as message appends. */
export async function appendSessionTranscriptEvent(
params: AppendSessionTranscriptEventParams,
): Promise<void> {
const activeLockRunner = resolveOwnedSessionTranscriptWriteLockRunner({
sessionFile: params.transcriptPath,
});
if (activeLockRunner) {
return await activeLockRunner(() =>
withTranscriptAppendQueue(params.transcriptPath, () =>
appendSessionTranscriptEventLocked(params),
),
);
}
return await withTranscriptAppendQueue(params.transcriptPath, () =>
withSessionTranscriptWriteLock(params, () => appendSessionTranscriptEventLocked(params)),
);
}
async function withSessionTranscriptWriteLock<T>(
params: Pick<AppendSessionTranscriptMessageParams, "transcriptPath" | "config">,
run: () => Promise<T> | T,
@@ -305,6 +331,18 @@ async function withSessionTranscriptWriteLock<T>(
}
}
async function appendSessionTranscriptEventLocked(
params: AppendSessionTranscriptEventParams,
): Promise<void> {
await fs.mkdir(path.dirname(params.transcriptPath), { recursive: true });
const handle = await fs.open(params.transcriptPath, "a", 0o600);
try {
await handle.appendFile(serializeJsonlEntry(params.event), "utf-8");
} finally {
await handle.close();
}
}
async function appendSessionTranscriptMessageLocked<TMessage>(
params: AppendSessionTranscriptMessageParams<TMessage>,
): Promise<AppendSessionTranscriptMessageResult<TMessage> | undefined> {