Add per-agent SQLite cache store (#88349)

* feat: add per-agent sqlite cache store

* fix: preserve sqlite cache adapter scope

* chore: mark sqlite cache scaffold intentional
This commit is contained in:
Peter Steinberger
2026-05-30 17:00:24 +01:00
committed by GitHub
parent f6b40861f7
commit fc50f949d4
12 changed files with 1238 additions and 1 deletions

View File

@@ -39,6 +39,7 @@ const rawSqliteAllowPathGroups = {
"src/infra/sqlite-pragma.test-support.ts",
"src/infra/sqlite-transaction.ts",
"src/infra/sqlite-wal.ts",
"src/state/openclaw-agent-db.ts",
"src/state/openclaw-state-db.ts",
"src/state/sqlite-schema-shape.test-support.ts",
],

View File

@@ -1,7 +1,15 @@
// Intentional Knip unused-file findings. These are dynamic entrypoints,
// generated/build inputs, manifest-discovered plugin surfaces, live-test
// helpers, or package bridge files that static production scanning cannot see.
export const KNIP_UNUSED_FILE_ALLOWLIST = [];
export const KNIP_UNUSED_FILE_ALLOWLIST = [
// Per-agent SQLite scaffold is intentionally landed before runtime migration
// callers so the schema and scoped cache API can be reviewed together.
"src/agents/cache/agent-cache-store.sqlite.ts",
"src/agents/cache/agent-cache-store.ts",
"src/state/openclaw-agent-db.paths.ts",
"src/state/openclaw-agent-db.ts",
"src/state/openclaw-agent-schema.generated.ts",
];
// Knip can disagree across supported local/CI platforms for files that are
// only reachable through test-only import graphs, sparse-checkout proof

View File

@@ -12,6 +12,13 @@ const SCHEMAS = [
schemaOutFile: "src/state/openclaw-state-schema.generated.ts",
schemaExport: "OPENCLAW_STATE_SCHEMA_SQL",
},
{
name: "openclaw-agent",
schema: "src/state/openclaw-agent-schema.sql",
outFile: "src/state/openclaw-agent-db.generated.d.ts",
schemaOutFile: "src/state/openclaw-agent-schema.generated.ts",
schemaExport: "OPENCLAW_AGENT_SCHEMA_SQL",
},
];
const verify = process.argv.includes("--verify") || process.argv.includes("--check");

View File

@@ -0,0 +1,230 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
closeOpenClawAgentDatabasesForTest,
listOpenClawRegisteredAgentDatabases,
} from "../../state/openclaw-agent-db.js";
import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db.js";
import {
clearExpiredSqliteAgentCacheEntries,
clearSqliteAgentCacheEntries,
createSqliteAgentCacheStore,
deleteSqliteAgentCacheEntry,
listSqliteAgentCacheEntries,
readSqliteAgentCacheEntry,
writeSqliteAgentCacheEntry,
} from "./agent-cache-store.sqlite.js";
function createTempStateDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-cache-"));
}
afterEach(() => {
closeOpenClawAgentDatabasesForTest();
closeOpenClawStateDatabaseForTest();
});
describe("SQLite agent cache store", () => {
it("stores scoped JSON values and blobs in the agent database", () => {
const env = { OPENCLAW_STATE_DIR: createTempStateDir() };
expect(
writeSqliteAgentCacheEntry({
env,
agentId: "Main",
scope: "run:one",
key: "payload",
value: { status: "ok" },
blob: "bytes",
now: () => 1000,
}),
).toEqual({
agentId: "main",
scope: "run:one",
key: "payload",
value: { status: "ok" },
blob: Buffer.from("bytes"),
expiresAt: null,
updatedAt: 1000,
});
writeSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "run:two",
key: "payload",
value: { status: "other" },
});
expect(
readSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "run:one",
key: "payload",
}),
).toEqual({
agentId: "main",
scope: "run:one",
key: "payload",
value: { status: "ok" },
blob: Buffer.from("bytes"),
expiresAt: null,
updatedAt: 1000,
});
expect(listSqliteAgentCacheEntries({ env, agentId: "main", scope: "run:one" })).toEqual([
expect.objectContaining({
key: "payload",
value: { status: "ok" },
}),
]);
});
it("hides expired entries and clears expired rows", () => {
const env = { OPENCLAW_STATE_DIR: createTempStateDir() };
writeSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "runtime",
key: "old",
value: "stale",
expiresAt: 1000,
now: () => 900,
});
writeSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "runtime",
key: "fresh",
value: "ok",
ttlMs: 10_000,
now: () => 2000,
});
writeSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "other",
key: "old",
value: "kept",
expiresAt: 1000,
});
expect(
readSqliteAgentCacheEntry({
env,
agentId: "main",
scope: "runtime",
key: "old",
now: () => 2000,
}),
).toBeNull();
expect(
listSqliteAgentCacheEntries({ env, agentId: "main", scope: "runtime", now: () => 2000 }),
).toEqual([
expect.objectContaining({
key: "fresh",
value: "ok",
expiresAt: 12_000,
}),
]);
expect(
clearExpiredSqliteAgentCacheEntries({
env,
agentId: "main",
scope: "runtime",
currentTime: 2000,
}),
).toBe(1);
expect(
clearExpiredSqliteAgentCacheEntries({
env,
agentId: "main",
scope: "other",
currentTime: 2000,
}),
).toBe(1);
});
it("exposes a scoped runtime cache adapter", () => {
const env = { OPENCLAW_STATE_DIR: createTempStateDir() };
const cache = createSqliteAgentCacheStore({
env,
agentId: "main",
scope: "run:adapter",
now: () => 3000,
});
cache.write({
key: "result",
value: ["a", "b"],
blob: Buffer.from([1, 2]),
});
expect(cache.read("result")).toEqual(
expect.objectContaining({
agentId: "main",
scope: "run:adapter",
key: "result",
value: ["a", "b"],
blob: Buffer.from([1, 2]),
}),
);
expect(
deleteSqliteAgentCacheEntry({ env, agentId: "main", scope: "run:adapter", key: "result" }),
).toBe(true);
expect(cache.read("result")).toBeNull();
cache.write({ key: "next", value: true });
expect(clearSqliteAgentCacheEntries({ env, agentId: "main", scope: "run:adapter" })).toBe(1);
});
it("does not let loose write options override the scoped adapter owner", () => {
const env = { OPENCLAW_STATE_DIR: createTempStateDir() };
const cache = createSqliteAgentCacheStore({
env,
agentId: "main",
scope: "safe",
now: () => 4000,
});
cache.write({
key: "result",
value: "ok",
...({
agentId: "other",
scope: "unsafe",
} as Record<string, unknown>),
});
expect(
readSqliteAgentCacheEntry({ env, agentId: "main", scope: "safe", key: "result" }),
).toEqual(expect.objectContaining({ value: "ok" }));
expect(
readSqliteAgentCacheEntry({ env, agentId: "other", scope: "unsafe", key: "result" }),
).toBeNull();
});
it("honors explicit per-agent database paths", () => {
const stateDir = createTempStateDir();
const env = { OPENCLAW_STATE_DIR: stateDir };
const dbPath = path.join(stateDir, "custom", "worker.sqlite");
writeSqliteAgentCacheEntry({
env,
path: dbPath,
agentId: "worker",
scope: "runtime",
key: "entry",
value: { ok: true },
});
expect(fs.existsSync(dbPath)).toBe(true);
expect(
listOpenClawRegisteredAgentDatabases({ env }).find((entry) => entry.path === dbPath),
).toMatchObject({
agentId: "worker",
path: dbPath,
});
});
});

View File

@@ -0,0 +1,309 @@
import type { Selectable } from "kysely";
import {
executeSqliteQuerySync,
executeSqliteQueryTakeFirstSync,
getNodeSqliteKysely,
} from "../../infra/kysely-sync.js";
import { normalizeAgentId } from "../../routing/session-key.js";
import type { DB as OpenClawAgentKyselyDatabase } from "../../state/openclaw-agent-db.generated.js";
import {
openOpenClawAgentDatabase,
runOpenClawAgentWriteTransaction,
type OpenClawAgentDatabaseOptions,
} from "../../state/openclaw-agent-db.js";
import type {
AgentRuntimeCacheStore,
AgentRuntimeCacheValue,
AgentRuntimeCacheWriteOptions,
} from "./agent-cache-store.js";
export type SqliteAgentCacheStoreOptions = OpenClawAgentDatabaseOptions & {
scope: string;
now?: () => number;
};
export type WriteSqliteAgentCacheEntryOptions = SqliteAgentCacheStoreOptions &
AgentRuntimeCacheWriteOptions;
type CacheEntriesTable = OpenClawAgentKyselyDatabase["cache_entries"];
type AgentCacheDatabase = Pick<OpenClawAgentKyselyDatabase, "cache_entries">;
type AgentCacheRow = Selectable<CacheEntriesTable>;
function normalizeScopeValue(value: string): string {
const scope = value.trim();
if (!scope) {
throw new Error("SQLite agent cache scope is required.");
}
if (scope.includes("\0")) {
throw new Error("SQLite agent cache scope must not contain NUL bytes.");
}
return scope;
}
function normalizeKey(value: string): string {
const key = value.trim();
if (!key) {
throw new Error("SQLite agent cache key is required.");
}
if (key.includes("\0")) {
throw new Error("SQLite agent cache key must not contain NUL bytes.");
}
return key;
}
function normalizeScope(options: SqliteAgentCacheStoreOptions): {
agentId: string;
scope: string;
} {
return {
agentId: normalizeAgentId(options.agentId),
scope: normalizeScopeValue(options.scope),
};
}
function toDatabaseOptions(options: SqliteAgentCacheStoreOptions): OpenClawAgentDatabaseOptions {
return {
agentId: options.agentId,
...(options.env ? { env: options.env } : {}),
...(options.path ? { path: options.path } : {}),
};
}
function asNumber(value: number | bigint | null): number | null {
if (value === null) {
return null;
}
return typeof value === "bigint" ? Number(value) : value;
}
function parseValue(raw: string | null): unknown {
if (raw === null) {
return null;
}
try {
return JSON.parse(raw) as unknown;
} catch {
return null;
}
}
function isExpired(row: AgentCacheRow, now: number): boolean {
const expiresAt = asNumber(row.expires_at);
return expiresAt !== null && expiresAt <= now;
}
function rowToCacheValue(
row: AgentCacheRow,
scope: { agentId: string; scope: string },
): AgentRuntimeCacheValue {
return {
agentId: scope.agentId,
scope: scope.scope,
key: row.key,
value: parseValue(row.value_json),
...(row.blob ? { blob: Buffer.from(row.blob) } : {}),
expiresAt: asNumber(row.expires_at),
updatedAt: asNumber(row.updated_at) ?? 0,
};
}
function resolveExpiresAt(options: AgentRuntimeCacheWriteOptions, now: number): number | null {
if (typeof options.ttlMs === "number") {
if (!Number.isFinite(options.ttlMs) || options.ttlMs <= 0) {
throw new Error("SQLite agent cache ttlMs must be a positive finite number.");
}
return now + options.ttlMs;
}
return options.expiresAt ?? null;
}
export function writeSqliteAgentCacheEntry(
options: WriteSqliteAgentCacheEntryOptions,
): AgentRuntimeCacheValue {
const scope = normalizeScope(options);
const key = normalizeKey(options.key);
const updatedAt = options.now?.() ?? Date.now();
const expiresAt = resolveExpiresAt(options, updatedAt);
const valueJson = options.value === undefined ? null : JSON.stringify(options.value);
const blob =
options.blob === undefined
? null
: Buffer.isBuffer(options.blob)
? options.blob
: Buffer.from(options.blob);
runOpenClawAgentWriteTransaction((database) => {
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
executeSqliteQuerySync(
database.db,
db
.insertInto("cache_entries")
.values({
scope: scope.scope,
key,
value_json: valueJson,
blob,
expires_at: expiresAt,
updated_at: updatedAt,
})
.onConflict((conflict) =>
conflict.columns(["scope", "key"]).doUpdateSet({
value_json: valueJson,
blob,
expires_at: expiresAt,
updated_at: updatedAt,
}),
),
);
}, toDatabaseOptions(options));
return {
agentId: scope.agentId,
scope: scope.scope,
key,
value: options.value ?? null,
...(blob ? { blob: Buffer.from(blob) } : {}),
expiresAt,
updatedAt,
};
}
export function readSqliteAgentCacheEntry(
options: SqliteAgentCacheStoreOptions & { key: string },
): AgentRuntimeCacheValue | null {
const scope = normalizeScope(options);
const key = normalizeKey(options.key);
const database = openOpenClawAgentDatabase(toDatabaseOptions(options));
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
const row =
executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("cache_entries")
.select(["scope", "key", "value_json", "blob", "expires_at", "updated_at"])
.where("scope", "=", scope.scope)
.where("key", "=", key),
) ?? null;
if (!row || isExpired(row, options.now?.() ?? Date.now())) {
return null;
}
return rowToCacheValue(row, scope);
}
export function listSqliteAgentCacheEntries(
options: SqliteAgentCacheStoreOptions,
): AgentRuntimeCacheValue[] {
const scope = normalizeScope(options);
const now = options.now?.() ?? Date.now();
const database = openOpenClawAgentDatabase(toDatabaseOptions(options));
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
return executeSqliteQuerySync(
database.db,
db
.selectFrom("cache_entries")
.select(["scope", "key", "value_json", "blob", "expires_at", "updated_at"])
.where("scope", "=", scope.scope)
.orderBy("key", "asc"),
)
.rows.filter((row) => !isExpired(row, now))
.map((row) => rowToCacheValue(row, scope));
}
export function deleteSqliteAgentCacheEntry(
options: SqliteAgentCacheStoreOptions & { key: string },
): boolean {
const scope = normalizeScope(options);
const key = normalizeKey(options.key);
return runOpenClawAgentWriteTransaction((database) => {
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
const result = executeSqliteQuerySync(
database.db,
db.deleteFrom("cache_entries").where("scope", "=", scope.scope).where("key", "=", key),
);
return Number(result.numAffectedRows ?? 0) > 0;
}, toDatabaseOptions(options));
}
export function clearSqliteAgentCacheEntries(options: SqliteAgentCacheStoreOptions): number {
const scope = normalizeScope(options);
return runOpenClawAgentWriteTransaction((database) => {
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
const result = executeSqliteQuerySync(
database.db,
db.deleteFrom("cache_entries").where("scope", "=", scope.scope),
);
return Number(result.numAffectedRows ?? 0);
}, toDatabaseOptions(options));
}
export function clearExpiredSqliteAgentCacheEntries(
options: SqliteAgentCacheStoreOptions & { currentTime?: number },
): number {
const scope = normalizeScope(options);
const currentTime = options.currentTime ?? options.now?.() ?? Date.now();
return runOpenClawAgentWriteTransaction((database) => {
const db = getNodeSqliteKysely<AgentCacheDatabase>(database.db);
const result = executeSqliteQuerySync(
database.db,
db
.deleteFrom("cache_entries")
.where("scope", "=", scope.scope)
.where("expires_at", "is not", null)
.where("expires_at", "<=", currentTime),
);
return Number(result.numAffectedRows ?? 0);
}, toDatabaseOptions(options));
}
export class SqliteAgentCacheStore implements AgentRuntimeCacheStore {
readonly #options: SqliteAgentCacheStoreOptions;
constructor(options: SqliteAgentCacheStoreOptions) {
this.#options = options;
}
write(options: AgentRuntimeCacheWriteOptions): AgentRuntimeCacheValue {
const { blob, expiresAt, key, ttlMs, value } = options;
return writeSqliteAgentCacheEntry({
...this.#options,
key,
...(value === undefined ? {} : { value }),
...(blob === undefined ? {} : { blob }),
...(expiresAt === undefined ? {} : { expiresAt }),
...(ttlMs === undefined ? {} : { ttlMs }),
});
}
read(key: string): AgentRuntimeCacheValue | null {
return readSqliteAgentCacheEntry({
...this.#options,
key,
});
}
list(): AgentRuntimeCacheValue[] {
return listSqliteAgentCacheEntries(this.#options);
}
delete(key: string): boolean {
return deleteSqliteAgentCacheEntry({
...this.#options,
key,
});
}
clear(): number {
return clearSqliteAgentCacheEntries(this.#options);
}
clearExpired(now?: number): number {
return clearExpiredSqliteAgentCacheEntries({
...this.#options,
...(now === undefined ? {} : { currentTime: now }),
});
}
}
export function createSqliteAgentCacheStore(
options: SqliteAgentCacheStoreOptions,
): SqliteAgentCacheStore {
return new SqliteAgentCacheStore(options);
}

26
src/agents/cache/agent-cache-store.ts vendored Normal file
View File

@@ -0,0 +1,26 @@
export type AgentRuntimeCacheValue = {
agentId: string;
scope: string;
key: string;
value: unknown;
blob?: Buffer;
expiresAt: number | null;
updatedAt: number;
};
export type AgentRuntimeCacheWriteOptions = {
key: string;
value?: unknown;
blob?: Buffer | string;
expiresAt?: number | null;
ttlMs?: number;
};
export type AgentRuntimeCacheStore = {
write(options: AgentRuntimeCacheWriteOptions): AgentRuntimeCacheValue;
read(key: string): AgentRuntimeCacheValue | null;
list(): AgentRuntimeCacheValue[];
delete(key: string): boolean;
clear(): number;
clearExpired(now?: number): number;
};

View File

@@ -0,0 +1,35 @@
/**
* This file was generated by kysely-codegen.
* Please do not edit it manually.
*/
import type { ColumnType } from "kysely";
export type Generated<T> =
T extends ColumnType<infer S, infer I, infer U>
? ColumnType<S, I | undefined, U>
: ColumnType<T, T | undefined, T>;
export interface CacheEntries {
blob: Uint8Array | null;
expires_at: number | null;
key: string;
scope: string;
updated_at: number;
value_json: string | null;
}
export interface SchemaMeta {
agent_id: string | null;
app_version: string | null;
created_at: number;
meta_key: string;
role: string;
schema_version: number;
updated_at: number;
}
export interface DB {
cache_entries: CacheEntries;
schema_meta: SchemaMeta;
}

View File

@@ -0,0 +1,27 @@
import path from "node:path";
import { normalizeAgentId } from "../routing/session-key.js";
import { resolveOpenClawStateSqliteDir } from "./openclaw-state-db.paths.js";
export type OpenClawAgentSqlitePathOptions = {
agentId: string;
env?: NodeJS.ProcessEnv;
path?: string;
};
export function resolveOpenClawAgentSqlitePath(options: OpenClawAgentSqlitePathOptions): string {
const agentId = normalizeAgentId(options.agentId);
return (
options.path ??
path.join(
path.dirname(resolveOpenClawStateSqliteDir(options.env ?? process.env)),
"agents",
agentId,
"agent",
"openclaw-agent.sqlite",
)
);
}
export function resolveOpenClawAgentSqliteDir(options: OpenClawAgentSqlitePathOptions): string {
return path.dirname(resolveOpenClawAgentSqlitePath(options));
}

View File

@@ -0,0 +1,247 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { executeSqliteQueryTakeFirstSync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { readSqliteNumberPragma } from "../infra/sqlite-pragma.test-support.js";
import type { DB as OpenClawAgentKyselyDatabase } from "./openclaw-agent-db.generated.js";
import {
closeOpenClawAgentDatabasesForTest,
listOpenClawRegisteredAgentDatabases,
openOpenClawAgentDatabase,
resolveOpenClawAgentSqlitePath,
} from "./openclaw-agent-db.js";
import {
closeOpenClawStateDatabaseForTest,
openOpenClawStateDatabase,
} from "./openclaw-state-db.js";
import {
collectSqliteSchemaShape,
createSqliteSchemaShapeFromSql,
} from "./sqlite-schema-shape.test-support.js";
type AgentDbTestDatabase = Pick<OpenClawAgentKyselyDatabase, "schema_meta">;
function createTempStateDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-db-"));
}
afterEach(() => {
closeOpenClawAgentDatabasesForTest();
closeOpenClawStateDatabaseForTest();
});
describe("openclaw agent database", () => {
it("resolves under the per-agent state directory", () => {
const stateDir = createTempStateDir();
expect(
resolveOpenClawAgentSqlitePath({
agentId: "Worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
}),
).toBe(path.join(stateDir, "agents", "worker-1", "agent", "openclaw-agent.sqlite"));
});
it("keeps test default state under a worker-sharded temp directory", () => {
expect(
resolveOpenClawAgentSqlitePath({
agentId: "main",
env: {
VITEST: "true",
VITEST_WORKER_ID: "7",
} as NodeJS.ProcessEnv,
}),
).toBe(
path.join(
os.tmpdir(),
"openclaw-test-state",
`${process.pid}-7`,
"agents",
"main",
"agent",
"openclaw-agent.sqlite",
),
);
});
it("creates the per-agent schema and registers it globally", () => {
const stateDir = createTempStateDir();
const database = openOpenClawAgentDatabase({
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(collectSqliteSchemaShape(database.db)).toEqual(
createSqliteSchemaShapeFromSql(new URL("./openclaw-agent-schema.sql", import.meta.url)),
);
expect(database.agentId).toBe("worker-1");
expect(database.path).toBe(
path.join(stateDir, "agents", "worker-1", "agent", "openclaw-agent.sqlite"),
);
const registered = listOpenClawRegisteredAgentDatabases({
env: { OPENCLAW_STATE_DIR: stateDir },
}).find((entry) => entry.agentId === "worker-1");
expect(registered).toMatchObject({
agentId: "worker-1",
path: database.path,
schemaVersion: 1,
});
expect(registered?.sizeBytes).toBeGreaterThan(0);
});
it("keeps multiple registered paths for the same agent", () => {
const stateDir = createTempStateDir();
const env = { OPENCLAW_STATE_DIR: stateDir };
const relocatedPath = path.join(stateDir, "relocated", "worker-1.sqlite");
const relocated = openOpenClawAgentDatabase({
agentId: "worker-1",
env,
path: relocatedPath,
});
const defaultDatabase = openOpenClawAgentDatabase({
agentId: "worker-1",
env,
});
expect(
listOpenClawRegisteredAgentDatabases({ env })
.filter((entry) => entry.agentId === "worker-1")
.map((entry) => entry.path),
).toEqual([defaultDatabase.path, relocated.path].toSorted());
});
it("rejects sharing one explicit database path across agent ids", () => {
const stateDir = createTempStateDir();
const env = { OPENCLAW_STATE_DIR: stateDir };
const databasePath = path.join(stateDir, "relocated", "shared.sqlite");
openOpenClawAgentDatabase({
agentId: "worker-1",
env,
path: databasePath,
});
expect(() =>
openOpenClawAgentDatabase({
agentId: "worker-2",
env,
path: databasePath,
}),
).toThrow(/already open for agent worker-1/);
closeOpenClawAgentDatabasesForTest();
expect(() =>
openOpenClawAgentDatabase({
agentId: "worker-2",
env,
path: databasePath,
}),
).toThrow(/belongs to agent worker-1/);
});
it("rejects explicit paths that point at the global state database", () => {
const stateDir = createTempStateDir();
const env = { OPENCLAW_STATE_DIR: stateDir };
const databasePath = path.join(stateDir, "state", "openclaw.sqlite");
const stateDatabase = openOpenClawStateDatabase({
env,
path: databasePath,
});
closeOpenClawStateDatabaseForTest();
expect(() =>
openOpenClawAgentDatabase({
agentId: "worker-1",
env,
path: stateDatabase.path,
}),
).toThrow(/schema role global/);
const reopenedStateDatabase = openOpenClawStateDatabase({
env,
path: databasePath,
});
const row = reopenedStateDatabase.db
.prepare("SELECT role, agent_id FROM schema_meta WHERE meta_key = 'primary'")
.get() as { agent_id?: unknown; role?: unknown } | undefined;
expect(row).toEqual({ role: "global", agent_id: null });
});
it("does not chmod shared parent directories for explicit database paths", () => {
const parentDir = createTempStateDir();
fs.chmodSync(parentDir, 0o755);
const databasePath = path.join(parentDir, "worker-1.sqlite");
openOpenClawAgentDatabase({
agentId: "worker-1",
path: databasePath,
});
expect(fs.statSync(parentDir).mode & 0o777).toBe(0o755);
});
it("configures durable SQLite connection pragmas", () => {
const stateDir = createTempStateDir();
const database = openOpenClawAgentDatabase({
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(readSqliteNumberPragma(database.db, "busy_timeout")).toBe(30_000);
expect(readSqliteNumberPragma(database.db, "foreign_keys")).toBe(1);
expect(readSqliteNumberPragma(database.db, "synchronous")).toBe(1);
expect(readSqliteNumberPragma(database.db, "user_version")).toBe(1);
expect(readSqliteNumberPragma(database.db, "wal_autocheckpoint")).toBe(1000);
const journalMode = database.db.prepare("PRAGMA journal_mode").get() as
| { journal_mode?: string }
| undefined;
expect(journalMode?.journal_mode?.toLowerCase()).toBe("wal");
});
it("records durable per-agent schema metadata", () => {
const stateDir = createTempStateDir();
const database = openOpenClawAgentDatabase({
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
});
const agentDb = getNodeSqliteKysely<AgentDbTestDatabase>(database.db);
expect(
executeSqliteQueryTakeFirstSync(
database.db,
agentDb.selectFrom("schema_meta").select(["role", "schema_version", "agent_id"]),
),
).toEqual({
role: "agent",
schema_version: 1,
agent_id: "worker-1",
});
});
it("refuses to open newer per-agent schema versions", () => {
const stateDir = createTempStateDir();
const databasePath = path.join(
stateDir,
"agents",
"worker-1",
"agent",
"openclaw-agent.sqlite",
);
fs.mkdirSync(path.dirname(databasePath), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(databasePath);
db.exec("PRAGMA user_version = 2;");
db.close();
expect(() =>
openOpenClawAgentDatabase({
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
}),
).toThrow(/newer schema version 2/);
});
});

View File

@@ -0,0 +1,290 @@
import { chmodSync, existsSync, mkdirSync, statSync } from "node:fs";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import {
clearNodeSqliteKyselyCacheForDatabase,
executeSqliteQuerySync,
getNodeSqliteKysely,
} from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js";
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
import { normalizeAgentId } from "../routing/session-key.js";
import type { DB as OpenClawAgentKyselyDatabase } from "./openclaw-agent-db.generated.js";
import { resolveOpenClawAgentSqlitePath } from "./openclaw-agent-db.paths.js";
import { OPENCLAW_AGENT_SCHEMA_SQL } from "./openclaw-agent-schema.generated.js";
import type { DB as OpenClawStateKyselyDatabase } from "./openclaw-state-db.generated.js";
import {
OPENCLAW_SQLITE_BUSY_TIMEOUT_MS,
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
type OpenClawStateDatabaseOptions,
} from "./openclaw-state-db.js";
export { resolveOpenClawAgentSqlitePath } from "./openclaw-agent-db.paths.js";
const OPENCLAW_AGENT_SCHEMA_VERSION = 1;
const OPENCLAW_AGENT_DB_DIR_MODE = 0o700;
const OPENCLAW_AGENT_DB_FILE_MODE = 0o600;
const OPENCLAW_AGENT_DB_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
export type OpenClawAgentDatabase = {
agentId: string;
db: DatabaseSync;
path: string;
walMaintenance: SqliteWalMaintenance;
};
export type OpenClawAgentDatabaseOptions = OpenClawStateDatabaseOptions & {
agentId: string;
};
export type OpenClawRegisteredAgentDatabase = {
agentId: string;
path: string;
schemaVersion: number;
lastSeenAt: number;
sizeBytes: number | null;
};
type OpenClawAgentMetadataDatabase = Pick<OpenClawAgentKyselyDatabase, "schema_meta">;
type OpenClawAgentRegistryDatabase = Pick<OpenClawStateKyselyDatabase, "agent_databases">;
const cachedDatabases = new Map<string, OpenClawAgentDatabase>();
type ExistingSchemaMeta = {
agentId: string | null;
role: string | null;
};
function readSqliteUserVersion(db: DatabaseSync): number {
const row = db.prepare("PRAGMA user_version").get() as { user_version?: unknown } | undefined;
return Number(row?.user_version ?? 0);
}
function assertSupportedAgentSchemaVersion(db: DatabaseSync, pathname: string): void {
const userVersion = readSqliteUserVersion(db);
if (userVersion > OPENCLAW_AGENT_SCHEMA_VERSION) {
throw new Error(
`OpenClaw agent database ${pathname} uses newer schema version ${userVersion}; this OpenClaw build supports ${OPENCLAW_AGENT_SCHEMA_VERSION}.`,
);
}
}
function ensureOpenClawAgentDatabasePermissions(
pathname: string,
options: OpenClawAgentDatabaseOptions,
): void {
const dir = path.dirname(pathname);
const defaultPath = resolveOpenClawAgentSqlitePath({
agentId: options.agentId,
env: options.env,
});
const isDefaultAgentDatabase = path.resolve(pathname) === path.resolve(defaultPath);
const dirExisted = existsSync(dir);
mkdirSync(dir, { recursive: true, mode: OPENCLAW_AGENT_DB_DIR_MODE });
if (isDefaultAgentDatabase || !dirExisted) {
chmodSync(dir, OPENCLAW_AGENT_DB_DIR_MODE);
}
for (const suffix of OPENCLAW_AGENT_DB_SIDECAR_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (existsSync(candidate)) {
chmodSync(candidate, OPENCLAW_AGENT_DB_FILE_MODE);
}
}
}
function readExistingSchemaMeta(db: DatabaseSync): ExistingSchemaMeta | null {
const schemaMetaTable = db
.prepare("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'schema_meta'")
.get();
if (!schemaMetaTable) {
return null;
}
const row = db
.prepare("SELECT role, agent_id FROM schema_meta WHERE meta_key = 'primary'")
.get() as { agent_id?: unknown; role?: unknown } | undefined;
if (!row) {
return null;
}
return {
agentId: typeof row.agent_id === "string" ? row.agent_id : null,
role: typeof row.role === "string" ? row.role : null,
};
}
function assertExistingSchemaOwner(
existing: ExistingSchemaMeta | null,
agentId: string,
pathname: string,
): void {
if (!existing) {
return;
}
if (existing.role !== "agent") {
throw new Error(
`OpenClaw agent database ${pathname} has schema role ${existing.role ?? "unknown"}; expected agent.`,
);
}
if (!existing.agentId) {
throw new Error(`OpenClaw agent database ${pathname} has no agent owner.`);
}
if (normalizeAgentId(existing.agentId) !== agentId) {
throw new Error(
`OpenClaw agent database ${pathname} belongs to agent ${existing.agentId}; requested agent ${agentId}.`,
);
}
}
function ensureAgentSchema(db: DatabaseSync, agentId: string, pathname: string): void {
assertSupportedAgentSchemaVersion(db, pathname);
assertExistingSchemaOwner(readExistingSchemaMeta(db), agentId, pathname);
db.exec(OPENCLAW_AGENT_SCHEMA_SQL);
const kysely = getNodeSqliteKysely<OpenClawAgentMetadataDatabase>(db);
db.exec(`PRAGMA user_version = ${OPENCLAW_AGENT_SCHEMA_VERSION};`);
const now = Date.now();
executeSqliteQuerySync(
db,
kysely
.insertInto("schema_meta")
.values({
meta_key: "primary",
role: "agent",
schema_version: OPENCLAW_AGENT_SCHEMA_VERSION,
agent_id: agentId,
app_version: null,
created_at: now,
updated_at: now,
})
.onConflict((conflict) =>
conflict.column("meta_key").doUpdateSet({
role: "agent",
schema_version: OPENCLAW_AGENT_SCHEMA_VERSION,
agent_id: agentId,
app_version: null,
updated_at: now,
}),
),
);
}
function registerAgentDatabase(params: {
agentId: string;
path: string;
env?: NodeJS.ProcessEnv;
}): void {
let sizeBytes: number | null = null;
try {
sizeBytes = statSync(params.path).size;
} catch {
sizeBytes = null;
}
const lastSeenAt = Date.now();
runOpenClawStateWriteTransaction(
(database) => {
const db = getNodeSqliteKysely<OpenClawAgentRegistryDatabase>(database.db);
executeSqliteQuerySync(
database.db,
db
.insertInto("agent_databases")
.values({
agent_id: params.agentId,
path: params.path,
schema_version: OPENCLAW_AGENT_SCHEMA_VERSION,
last_seen_at: lastSeenAt,
size_bytes: sizeBytes,
})
.onConflict((conflict) =>
conflict.columns(["agent_id", "path"]).doUpdateSet({
schema_version: OPENCLAW_AGENT_SCHEMA_VERSION,
last_seen_at: lastSeenAt,
size_bytes: sizeBytes,
}),
),
);
},
{ env: params.env },
);
}
export function listOpenClawRegisteredAgentDatabases(
options: OpenClawStateDatabaseOptions = {},
): OpenClawRegisteredAgentDatabase[] {
const database = openOpenClawStateDatabase(options);
const db = getNodeSqliteKysely<OpenClawAgentRegistryDatabase>(database.db);
const rows = executeSqliteQuerySync(
database.db,
db.selectFrom("agent_databases").selectAll().orderBy("agent_id", "asc").orderBy("path", "asc"),
).rows;
return rows.map((row) => ({
agentId: normalizeAgentId(row.agent_id),
path: row.path,
schemaVersion: row.schema_version,
lastSeenAt: row.last_seen_at,
sizeBytes: row.size_bytes,
}));
}
export function openOpenClawAgentDatabase(
options: OpenClawAgentDatabaseOptions,
): OpenClawAgentDatabase {
const agentId = normalizeAgentId(options.agentId);
const databaseOptions = { ...options, agentId };
const pathname = resolveOpenClawAgentSqlitePath(databaseOptions);
const cached = cachedDatabases.get(pathname);
if (cached?.db.isOpen) {
if (cached.agentId !== agentId) {
throw new Error(
`OpenClaw agent database ${pathname} is already open for agent ${cached.agentId}; requested agent ${agentId}.`,
);
}
registerAgentDatabase({ agentId, path: pathname, env: options.env });
return cached;
}
if (cached) {
cached.walMaintenance.close();
clearNodeSqliteKyselyCacheForDatabase(cached.db);
cachedDatabases.delete(pathname);
}
ensureOpenClawAgentDatabasePermissions(pathname, databaseOptions);
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(pathname);
const walMaintenance = configureSqliteWalMaintenance(db, {
databaseLabel: `openclaw-agent:${agentId}`,
databasePath: pathname,
});
db.exec("PRAGMA synchronous = NORMAL;");
db.exec(`PRAGMA busy_timeout = ${OPENCLAW_SQLITE_BUSY_TIMEOUT_MS};`);
db.exec("PRAGMA foreign_keys = ON;");
try {
ensureAgentSchema(db, agentId, pathname);
} catch (err) {
walMaintenance.close();
db.close();
throw err;
}
ensureOpenClawAgentDatabasePermissions(pathname, databaseOptions);
const database = { agentId, db, path: pathname, walMaintenance };
cachedDatabases.set(pathname, database);
registerAgentDatabase({ agentId, path: pathname, env: options.env });
return database;
}
export function runOpenClawAgentWriteTransaction<T>(
operation: (database: OpenClawAgentDatabase) => T,
options: OpenClawAgentDatabaseOptions,
): T {
const database = openOpenClawAgentDatabase(options);
const result = runSqliteImmediateTransactionSync(database.db, () => operation(database));
ensureOpenClawAgentDatabasePermissions(database.path, options);
return result;
}
export function closeOpenClawAgentDatabasesForTest(): void {
for (const database of cachedDatabases.values()) {
database.walMaintenance.close();
clearNodeSqliteKyselyCacheForDatabase(database.db);
database.db.close();
}
cachedDatabases.clear();
}

View File

@@ -0,0 +1,31 @@
/**
* This file was generated from the SQLite schema source.
* Please do not edit it manually.
*/
export const OPENCLAW_AGENT_SCHEMA_SQL = `CREATE TABLE IF NOT EXISTS schema_meta (
meta_key TEXT NOT NULL PRIMARY KEY,
role TEXT NOT NULL,
schema_version INTEGER NOT NULL,
agent_id TEXT,
app_version TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS cache_entries (
scope TEXT NOT NULL,
key TEXT NOT NULL,
value_json TEXT,
blob BLOB,
expires_at INTEGER,
updated_at INTEGER NOT NULL,
PRIMARY KEY (scope, key)
);
CREATE INDEX IF NOT EXISTS idx_agent_cache_expiry
ON cache_entries(scope, expires_at, key)
WHERE expires_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_agent_cache_updated
ON cache_entries(scope, updated_at DESC, key);\n`;

View File

@@ -0,0 +1,26 @@
CREATE TABLE IF NOT EXISTS schema_meta (
meta_key TEXT NOT NULL PRIMARY KEY,
role TEXT NOT NULL,
schema_version INTEGER NOT NULL,
agent_id TEXT,
app_version TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS cache_entries (
scope TEXT NOT NULL,
key TEXT NOT NULL,
value_json TEXT,
blob BLOB,
expires_at INTEGER,
updated_at INTEGER NOT NULL,
PRIMARY KEY (scope, key)
);
CREATE INDEX IF NOT EXISTS idx_agent_cache_expiry
ON cache_entries(scope, expires_at, key)
WHERE expires_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_agent_cache_updated
ON cache_entries(scope, updated_at DESC, key);