refactor: add shared sqlite state database

Adds the shared SQLite state database base, moves plugin keyed state into it with doctor migration coverage, and keeps generated Kysely guardrails aligned. Proof: focused SQLite/plugin-state tests, db:kysely:check, lint:kysely, architecture/dependency guards, autoreview, and PR CI all clean.
This commit is contained in:
Peter Steinberger
2026-05-30 00:52:23 +02:00
committed by GitHub
parent a6a99b923e
commit bc848b367f
28 changed files with 8064 additions and 596 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1420,7 +1420,7 @@
"canvas:a2ui:bundle": "node scripts/bundle-a2ui.mjs",
"changed:lanes": "node scripts/changed-lanes.mjs",
"check": "node scripts/check.mjs",
"check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-api-usage && pnpm check:deprecated-jsdoc",
"check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-api-usage && pnpm check:deprecated-jsdoc && pnpm db:kysely:check && pnpm lint:kysely",
"check:base-config-schema": "node --import tsx scripts/generate-base-config-schema.ts --check",
"check:bundled-channel-config-metadata": "node --import tsx scripts/generate-bundled-channel-config-metadata.ts --check",
"check:changed": "node scripts/check-changed.mjs",
@@ -1486,6 +1486,8 @@
"deps:ownership-surface:report": "node scripts/dependency-ownership-surface-report.mjs",
"deps:transitive-risk:report": "node scripts/transitive-manifest-risk-report.mjs",
"deps:vuln:gate": "node scripts/dependency-vulnerability-gate.mjs",
"db:kysely:check": "node scripts/generate-kysely-types.mjs --verify",
"db:kysely:gen": "node scripts/generate-kysely-types.mjs",
"dev": "node scripts/run-node.mjs",
"dev:ui:mock": "node --import tsx scripts/control-ui-mock-dev.ts",
"docs:bin": "node scripts/build-docs-list.mjs",
@@ -1534,6 +1536,7 @@
"lint:auth:pairing-account-scope": "node scripts/check-pairing-account-scope.mjs",
"lint:core": "node scripts/run-oxlint-shards.mjs --only=core --split-core",
"lint:docker-e2e": "node scripts/check-docker-e2e-boundaries.mjs",
"lint:kysely": "node scripts/check-kysely-guardrails.mjs",
"lint:docs": "pnpm dlx --config.resolution-mode=highest markdownlint-cli2 --config config/markdownlint-cli2.jsonc",
"lint:docs:fix": "pnpm dlx --config.resolution-mode=highest markdownlint-cli2 --config config/markdownlint-cli2.jsonc --fix",
"lint:extensions:no-deprecated-channel-access": "node --import tsx scripts/check-no-deprecated-channel-access.ts",

View File

@@ -0,0 +1,364 @@
#!/usr/bin/env node
import { promises as fs } from "node:fs";
import { createRequire } from "node:module";
import path from "node:path";
import {
collectTypeScriptFilesFromRoots,
getPropertyNameText,
resolveRepoRoot,
runAsScript,
toLine,
unwrapExpression,
} from "./lib/ts-guard-utils.mjs";
const require = createRequire(import.meta.url);
const ts = require("typescript");
const repoRoot = resolveRepoRoot(import.meta.url);
const sourceRoots = [path.join(repoRoot, "src")];
const kyselyRawAllowPaths = new Set([
"src/infra/kysely-node-sqlite.test.ts",
"src/infra/kysely-sync.ts",
]);
const compiledRawAllowPaths = new Set([
"src/infra/kysely-node-sqlite.ts",
"src/infra/kysely-node-sqlite.test.ts",
]);
const rawSqliteAllowPathGroups = {
"native Kysely adapter and sync execution": [
"src/infra/kysely-node-sqlite.ts",
"src/infra/kysely-sync.ts",
],
"SQLite database lifecycle, schema, transactions, and pragmas": [
"src/infra/node-sqlite.ts",
"src/infra/sqlite-integrity.ts",
"src/infra/sqlite-pragma.test-support.ts",
"src/infra/sqlite-transaction.ts",
"src/infra/sqlite-wal.ts",
"src/state/openclaw-state-db.ts",
"src/state/sqlite-schema-shape.test-support.ts",
],
"backup snapshot maintenance": ["src/commands/backup-verify.ts", "src/infra/backup-create.ts"],
"doctor legacy state migration": ["src/infra/state-migrations.ts"],
"Kysely-backed stores that own a DatabaseSync boundary": [
"src/acp/event-ledger.ts",
"src/agents/subagent-registry.store.ts",
"src/cron/run-log.ts",
"src/cron/store.ts",
"src/infra/outbound/current-conversation-bindings.ts",
"src/media/store.ts",
"src/plugin-sdk/memory-core-host-engine-storage.ts",
"src/plugin-state/plugin-state-store.sqlite.ts",
"src/proxy-capture/store.sqlite.ts",
"src/tasks/task-flow-registry.store.sqlite.ts",
"src/tasks/task-registry.store.sqlite.ts",
"src/tui/tui-last-session.ts",
],
};
const rawSqliteAllowPathReasons = new Map();
for (const [reason, paths] of Object.entries(rawSqliteAllowPathGroups)) {
for (const allowedPath of paths) {
if (rawSqliteAllowPathReasons.has(allowedPath)) {
throw new Error(`Duplicate raw SQLite allowlist path: ${allowedPath}`);
}
rawSqliteAllowPathReasons.set(allowedPath, reason);
}
}
function lineText(sourceFile, node) {
const line = toLine(sourceFile, node);
return sourceFile.text.split("\n")[line - 1] ?? "";
}
function hasAllowComment(sourceFile, node, token) {
const line = lineText(sourceFile, node);
if (line.includes(token)) {
return true;
}
const leading = ts.getLeadingCommentRanges(sourceFile.text, node.pos) ?? [];
return leading.some((range) => sourceFile.text.slice(range.pos, range.end).includes(token));
}
function importSource(node) {
const moduleSpecifier = node.moduleSpecifier;
return ts.isStringLiteral(moduleSpecifier) ? moduleSpecifier.text : "";
}
function collectImports(sourceFile) {
const kyselySqlNames = new Set();
const compiledQueryNames = new Set();
const syncHelperNames = new Set();
let hasKyselyContext = false;
let hasSqliteContext = false;
for (const statement of sourceFile.statements) {
if (!ts.isImportDeclaration(statement)) {
continue;
}
const source = importSource(statement);
const clause = statement.importClause;
const namedBindings = clause?.namedBindings;
if (source === "kysely") {
hasKyselyContext = true;
if (namedBindings && ts.isNamedImports(namedBindings)) {
for (const element of namedBindings.elements) {
const importedName = element.propertyName?.text ?? element.name.text;
if (importedName === "sql") {
kyselySqlNames.add(element.name.text);
}
if (importedName === "CompiledQuery") {
compiledQueryNames.add(element.name.text);
}
}
}
}
if (source.endsWith("kysely-sync.js") || source.endsWith("kysely-node-sqlite.js")) {
hasKyselyContext = true;
if (namedBindings && ts.isNamedImports(namedBindings)) {
for (const element of namedBindings.elements) {
const importedName = element.propertyName?.text ?? element.name.text;
if (
importedName === "executeSqliteQuerySync" ||
importedName === "executeSqliteQueryTakeFirstSync" ||
importedName === "executeSqliteQueryTakeFirstOrThrowSync"
) {
syncHelperNames.add(element.name.text);
}
if (importedName === "getNodeSqliteKysely") {
hasKyselyContext = true;
hasSqliteContext = true;
}
}
}
}
if (
source === "node:sqlite" ||
source.endsWith("node-sqlite.js") ||
source.endsWith("sqlite-transaction.js") ||
source.endsWith("sqlite-wal.js") ||
source.endsWith("openclaw-state-db.js")
) {
hasSqliteContext = true;
}
}
return {
compiledQueryNames,
hasKyselyContext,
hasSqliteContext,
kyselySqlNames,
syncHelperNames,
};
}
function addViolation(violations, sourceFile, node, message) {
violations.push({
line: toLine(sourceFile, node),
message,
});
}
function isIdentifierNamed(node, names) {
const unwrapped = unwrapExpression(node);
return ts.isIdentifier(unwrapped) && names.has(unwrapped.text);
}
function isTestPath(relativePath) {
return (
/\.(?:test|spec|e2e)\.ts$/u.test(relativePath) ||
relativePath.includes(".test-helpers.") ||
relativePath.includes(".test-support.")
);
}
function isSqliteStorePath(relativePath) {
return relativePath.endsWith(".sqlite.ts") || relativePath.includes(".store.sqlite.ts");
}
function isLikelySqliteReceiver(expression) {
const unwrapped = unwrapExpression(expression);
if (ts.isIdentifier(unwrapped)) {
return /^(?:db|database|legacyDb|stateDb|agentDb)$/u.test(unwrapped.text);
}
return ts.isPropertyAccessExpression(unwrapped) && getPropertyNameText(unwrapped.name) === "db";
}
function isPersistedRowExpression(expression) {
const unwrapped = unwrapExpression(expression);
if (ts.isPropertyAccessExpression(unwrapped)) {
const owner = unwrapExpression(unwrapped.expression);
return ts.isIdentifier(owner) && /^(?:row|record|entry)$/u.test(owner.text);
}
if (ts.isElementAccessExpression(unwrapped)) {
const owner = unwrapExpression(unwrapped.expression);
return ts.isIdentifier(owner) && /^(?:row|record|entry)$/u.test(owner.text);
}
return false;
}
function isPersistedStringCastType(typeText) {
return [
/\bTaskRecord\["(?:runtime|scopeKind|status|deliveryStatus|notifyPolicy|terminalOutcome)"\]/u,
/\bTaskFlowRecord\["(?:status|notifyPolicy)"\]/u,
/\bTaskFlowSyncMode\b/u,
/\bVirtualAgentFsEntryKind\b/u,
/\b[A-Z][A-Za-z0-9]*(?:Status|Kind|Mode|Policy|Runtime|Outcome)\b/u,
].some((pattern) => pattern.test(typeText));
}
export function collectKyselyGuardrailViolations(content, relativePath) {
const sourceFile = ts.createSourceFile(relativePath, content, ts.ScriptTarget.Latest, true);
const imports = collectImports(sourceFile);
const violations = [];
function visit(node) {
if (
isSqliteStorePath(relativePath) &&
(ts.isAsExpression(node) || ts.isTypeAssertionExpression(node)) &&
isPersistedStringCastType(node.type.getText(sourceFile)) &&
isPersistedRowExpression(node.expression) &&
!hasAllowComment(sourceFile, node, "sqlite-allow-persisted-cast")
) {
addViolation(
violations,
sourceFile,
node,
"persisted SQLite enum-like values must be parsed through closed validators, not cast",
);
}
if (
ts.isCallExpression(node) &&
ts.isIdentifier(node.expression) &&
imports.syncHelperNames.has(node.expression.text) &&
node.typeArguments?.length &&
!hasAllowComment(sourceFile, node, "kysely-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"sync helper row generic at call site; let Kysely infer builder result rows",
);
}
if (
ts.isTaggedTemplateExpression(node) &&
node.typeArguments?.length &&
isIdentifierNamed(node.tag, imports.kyselySqlNames) &&
!kyselyRawAllowPaths.has(relativePath) &&
!hasAllowComment(sourceFile, node, "kysely-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"typed raw sql snippet needs a small helper or allowlisted boundary",
);
}
if (
ts.isCallExpression(node) &&
ts.isPropertyAccessExpression(node.expression) &&
isIdentifierNamed(node.expression.expression, imports.kyselySqlNames) &&
["ref", "table", "id", "raw"].includes(getPropertyNameText(node.expression.name) ?? "") &&
!hasAllowComment(sourceFile, node, "kysely-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"raw Kysely identifier helper requires a closed-set validator and local allow comment",
);
}
if (
imports.hasKyselyContext &&
ts.isPropertyAccessExpression(node) &&
getPropertyNameText(node.name) === "dynamic" &&
!hasAllowComment(sourceFile, node, "kysely-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"Kysely dynamic refs bypass literal reference checking; use only behind closed unions",
);
}
if (
ts.isCallExpression(node) &&
ts.isPropertyAccessExpression(node.expression) &&
isIdentifierNamed(node.expression.expression, imports.compiledQueryNames) &&
getPropertyNameText(node.expression.name) === "raw" &&
!compiledRawAllowPaths.has(relativePath) &&
!hasAllowComment(sourceFile, node, "kysely-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"CompiledQuery.raw is only allowed in the native SQLite dialect/test boundary",
);
}
if (
imports.hasSqliteContext &&
!isTestPath(relativePath) &&
ts.isCallExpression(node) &&
ts.isPropertyAccessExpression(node.expression) &&
["prepare", "exec"].includes(getPropertyNameText(node.expression.name) ?? "") &&
isLikelySqliteReceiver(node.expression.expression) &&
!rawSqliteAllowPathReasons.has(relativePath) &&
!hasAllowComment(sourceFile, node, "sqlite-allow-raw")
) {
addViolation(
violations,
sourceFile,
node,
"new raw node:sqlite access requires Kysely or an explicit raw SQLite allowlist entry",
);
}
ts.forEachChild(node, visit);
}
visit(sourceFile);
return violations;
}
export async function collectKyselyGuardrails() {
const files = await collectTypeScriptFilesFromRoots(sourceRoots, { includeTests: true });
const violations = [];
for (const filePath of files) {
const relativePath = path.relative(repoRoot, filePath).split(path.sep).join("/");
const content = await fs.readFile(filePath, "utf8");
for (const violation of collectKyselyGuardrailViolations(content, relativePath)) {
violations.push({ path: relativePath, ...violation });
}
}
return violations;
}
export async function main() {
const violations = await collectKyselyGuardrails();
if (violations.length === 0) {
console.log("Kysely guardrails OK");
return;
}
console.error("Kysely guardrail violations:");
for (const violation of violations) {
console.error(`- ${violation.path}:${violation.line}: ${violation.message}`);
}
process.exit(1);
}
runAsScript(import.meta.url, main);

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env node
import fs from "node:fs";
import process from "node:process";
import { DatabaseSync } from "node:sqlite";
const SCHEMAS = [
{
name: "openclaw-state",
schema: "src/state/openclaw-state-schema.sql",
outFile: "src/state/openclaw-state-db.generated.d.ts",
schemaOutFile: "src/state/openclaw-state-schema.generated.ts",
schemaExport: "OPENCLAW_STATE_SCHEMA_SQL",
},
];
const verify = process.argv.includes("--verify") || process.argv.includes("--check");
function toInterfaceName(tableName) {
return tableName
.split("_")
.map((part) => `${part.slice(0, 1).toUpperCase()}${part.slice(1)}`)
.join("");
}
function columnBaseType(columnType) {
const normalized = columnType.toUpperCase();
if (normalized.includes("BLOB")) {
return "Uint8Array";
}
if (
normalized.includes("INT") ||
normalized.includes("REAL") ||
normalized.includes("FLOA") ||
normalized.includes("DOUB") ||
normalized.includes("NUM") ||
normalized.includes("DEC")
) {
return "number";
}
return "string";
}
function columnType(column, primaryKeyColumnCount) {
const baseType = columnBaseType(String(column.type ?? ""));
const generated =
column.dflt_value != null ||
(primaryKeyColumnCount === 1 &&
Number(column.pk) > 0 &&
String(column.type ?? "")
.toUpperCase()
.includes("INT"));
const nullable = Number(column.notnull) !== 1 && !generated;
const valueType = nullable ? `${baseType} | null` : baseType;
return generated ? `Generated<${valueType}>` : valueType;
}
function quoteSqliteIdentifier(identifier) {
return `"${identifier.replaceAll('"', '""')}"`;
}
function generateTypes(db) {
const tables = db
.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name;",
)
.all()
.map((row) => String(row.name));
const lines = [
"/**",
" * This file was generated by kysely-codegen.",
" * Please do not edit it manually.",
" */",
"",
'import type { ColumnType } from "kysely";',
"",
"export type Generated<T> =",
" T extends ColumnType<infer S, infer I, infer U>",
" ? ColumnType<S, I | undefined, U>",
" : ColumnType<T, T | undefined, T>;",
"",
];
const interfaces = [];
for (const table of tables) {
const interfaceName = toInterfaceName(table);
interfaces.push({ interfaceName, table });
lines.push(`export interface ${interfaceName} {`);
const columns = db
.prepare(`PRAGMA table_xinfo(${quoteSqliteIdentifier(table)});`)
.all()
.filter((column) => Number(column.hidden) === 0)
.toSorted((left, right) => String(left.name).localeCompare(String(right.name)));
const primaryKeyColumnCount = columns.filter((column) => Number(column.pk) > 0).length;
for (const column of columns) {
lines.push(` ${column.name}: ${columnType(column, primaryKeyColumnCount)};`);
}
lines.push("}", "");
}
lines.push("export interface DB {");
for (const { interfaceName, table } of interfaces) {
lines.push(` ${table}: ${interfaceName};`);
}
lines.push("}", "");
return lines.join("\n");
}
function readUtf8(file) {
return fs.readFileSync(file, "utf8");
}
function generatedSchemaModule(schema) {
const source = readUtf8(schema.schema).trimEnd();
const literal = source.replaceAll("\\", "\\\\").replaceAll("`", "\\`").replaceAll("${", "\\${");
return [
"/**",
" * This file was generated from the SQLite schema source.",
" * Please do not edit it manually.",
" */",
"",
`export const ${schema.schemaExport} = \`${literal}\\n\`;`,
"",
].join("\n");
}
function generate(schema) {
const db = new DatabaseSync(":memory:");
try {
db.exec(readUtf8(schema.schema));
const typesSource = generateTypes(db);
const schemaSource = generatedSchemaModule(schema);
if (verify) {
if (typesSource !== readUtf8(schema.outFile)) {
console.error(`${schema.outFile} is out of date. Run pnpm db:kysely:gen.`);
process.exitCode = 1;
}
if (schemaSource !== readUtf8(schema.schemaOutFile)) {
console.error(`${schema.schemaOutFile} is out of date. Run pnpm db:kysely:gen.`);
process.exitCode = 1;
}
} else {
fs.writeFileSync(schema.outFile, typesSource);
fs.writeFileSync(schema.schemaOutFile, schemaSource);
}
} finally {
db.close();
}
}
for (const schema of SCHEMAS) {
generate(schema);
}

View File

@@ -3,6 +3,7 @@ import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import {
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
createPluginStateKeyedStore,
@@ -259,6 +260,34 @@ function writeLegacySessionsFixture(params: {
return legacySessionsDir;
}
function writeLegacyPluginStateSidecar(root: string): string {
const sourcePath = path.join(root, "plugin-state", "state.sqlite");
fs.mkdirSync(path.dirname(sourcePath), { recursive: true });
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(sourcePath);
try {
db.exec(`
CREATE TABLE plugin_state_entries (
plugin_id TEXT NOT NULL,
namespace TEXT NOT NULL,
entry_key TEXT NOT NULL,
value_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER,
PRIMARY KEY (plugin_id, namespace, entry_key)
);
`);
db.prepare(`
INSERT INTO plugin_state_entries (
plugin_id, namespace, entry_key, value_json, created_at, expires_at
) VALUES (?, ?, ?, ?, ?, ?)
`).run("discord", "components", "interaction:1", '{"ok":true}', 1000, null);
} finally {
db.close();
}
return sourcePath;
}
async function detectAndRunMigrations(params: {
root: string;
cfg: OpenClawConfig;
@@ -778,6 +807,185 @@ describe("doctor legacy state migrations", () => {
});
});
it("imports the shipped plugin-state SQLite sidecar into shared state", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
expect(detected.pluginStateSidecar).toEqual({ sourcePath, hasLegacy: true });
expect(detected.preview).toContain(
`- Plugin state sidecar: ${sourcePath} → shared SQLite state`,
);
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([]);
expect(result.changes).toContain("Migrated 1 plugin-state sidecar entry → shared SQLite state");
expect(result.changes).toContain(
`Archived plugin-state sidecar legacy source → ${sourcePath}.migrated`,
);
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await expect(store.lookup("interaction:1")).resolves.toEqual({ ok: true });
});
});
it("auto-migrates the shipped plugin-state SQLite sidecar by itself", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
const result = await autoMigrateLegacyState({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
log: { info: vi.fn(), warn: vi.fn() },
});
expect(result.skipped).toBe(false);
expect(result.changes).toContain("Migrated 1 plugin-state sidecar entry → shared SQLite state");
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await expect(store.lookup("interaction:1")).resolves.toEqual({ ok: true });
});
});
it("auto-migrates the plugin-state sidecar when custom agent dirs skip session migration", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
const result = await autoMigrateLegacyState({
cfg: {},
env: {
OPENCLAW_STATE_DIR: root,
OPENCLAW_AGENT_DIR: path.join(root, "custom-agent"),
} as NodeJS.ProcessEnv,
log: { info: vi.fn(), warn: vi.fn() },
});
expect(result.skipped).toBe(true);
expect(result.changes).toContain("Migrated 1 plugin-state sidecar entry → shared SQLite state");
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await expect(store.lookup("interaction:1")).resolves.toEqual({ ok: true });
});
});
it("keeps the plugin-state sidecar when shared state already has a conflicting row", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await store.register("interaction:1", { ok: false });
});
resetPluginStateStoreForTests();
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([
"Left plugin-state sidecar in place because 1 row already existed in shared state: discord/components/interaction:1",
]);
expect(fs.existsSync(sourcePath)).toBe(true);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(false);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await expect(store.lookup("interaction:1")).resolves.toEqual({ ok: false });
});
});
it("archives the plugin-state sidecar when conflicting rows already match", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
await withStateDir(root, async () => {
seedPluginStateEntriesForTests([
{
pluginId: "discord",
namespace: "components",
key: "interaction:1",
value: { ok: true },
createdAt: 1000,
expiresAt: null,
},
]);
});
resetPluginStateStoreForTests();
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([]);
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
});
it("lets live sidecar rows replace expired shared plugin state during migration", async () => {
const root = await makeTempRoot();
const sourcePath = writeLegacyPluginStateSidecar(root);
await withStateDir(root, async () => {
seedPluginStateEntriesForTests([
{
pluginId: "discord",
namespace: "components",
key: "interaction:1",
value: { ok: false },
expiresAt: 1,
},
]);
});
resetPluginStateStoreForTests();
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([]);
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {
namespace: "components",
maxEntries: 10,
});
await expect(store.lookup("interaction:1")).resolves.toEqual({ ok: true });
});
});
it("routes legacy state to the default agent entry", async () => {
const root = await makeTempRoot();
const cfg: OpenClawConfig = {

View File

@@ -206,6 +206,10 @@ function createLegacyStateMigrationDetectionResult(params?: {
targetDir: "/tmp/state/agents/main/agent",
hasLegacy: false,
},
pluginStateSidecar: {
sourcePath: "/tmp/state/plugin-state/state.sqlite",
hasLegacy: false,
},
channelPlans: {
hasLegacy: false,
plans: [],

76
src/infra/kysely-sync.ts Normal file
View File

@@ -0,0 +1,76 @@
import type { DatabaseSync, SQLInputValue } from "node:sqlite";
import type { CompiledQuery, Kysely, QueryResult } from "kysely";
import { InsertQueryNode, Kysely as KyselyInstance } from "kysely";
import { NodeSqliteKyselyDialect } from "./kysely-node-sqlite.js";
type CompilableQuery<Row = unknown> = {
compile(): CompiledQuery<Row>;
};
const kyselyByDatabase = new WeakMap<DatabaseSync, Kysely<unknown>>();
export function getNodeSqliteKysely<Database>(db: DatabaseSync): Kysely<Database> {
const existing = kyselyByDatabase.get(db);
if (existing) {
return existing as Kysely<Database>;
}
const kysely = new KyselyInstance<Database>({
dialect: new NodeSqliteKyselyDialect({ database: db }),
});
kyselyByDatabase.set(db, kysely as Kysely<unknown>);
return kysely;
}
export function executeCompiledSqliteQuerySync<Row>(
db: DatabaseSync,
compiledQuery: CompiledQuery<Row>,
): QueryResult<Row> {
const statement = db.prepare(compiledQuery.sql);
const parameters = compiledQuery.parameters as SQLInputValue[];
if (statement.columns().length > 0) {
return { rows: statement.all(...parameters) as Row[] };
}
const { changes, lastInsertRowid } = statement.run(...parameters);
const result: QueryResult<Row> = {
numAffectedRows: BigInt(changes),
rows: [],
};
if (InsertQueryNode.is(compiledQuery.query) && changes > 0) {
return {
...result,
insertId: BigInt(lastInsertRowid),
};
}
return result;
}
export function executeSqliteQuerySync<Row>(
db: DatabaseSync,
query: CompilableQuery<Row>,
): QueryResult<Row> {
return executeCompiledSqliteQuerySync<Row>(db, query.compile());
}
export function executeSqliteQueryTakeFirstSync<Row>(
db: DatabaseSync,
query: CompilableQuery<Row>,
): Row | undefined {
return executeSqliteQuerySync<Row>(db, query).rows[0];
}
export function executeSqliteQueryTakeFirstOrThrowSync<Row>(
db: DatabaseSync,
query: CompilableQuery<Row>,
): Row {
const row = executeSqliteQueryTakeFirstSync<Row>(db, query);
if (!row) {
throw new Error("Kysely query returned no rows");
}
return row;
}
export function clearNodeSqliteKyselyCacheForDatabase(db: DatabaseSync): void {
kyselyByDatabase.delete(db);
}

View File

@@ -0,0 +1,67 @@
import type { DatabaseSync } from "node:sqlite";
import { describe, expect, expectTypeOf, it } from "vitest";
import {
executeSqliteQuerySync,
executeSqliteQueryTakeFirstSync,
getNodeSqliteKysely,
} from "./kysely-sync.js";
type TypeTestDatabase = {
type_test_items: {
id: number;
name: string | null;
data: Uint8Array;
};
};
describe("kysely sync helper types", () => {
it("preserves Kysely builder result rows through sync helpers", () => {
const nativeDb = {} as DatabaseSync;
const db = getNodeSqliteKysely<TypeTestDatabase>(nativeDb);
const query = db
.selectFrom("type_test_items")
.select((eb) => ["id as itemId", "name", "data", eb.fn.countAll<number>().as("total")])
.groupBy(["id", "name", "data"]);
const assertTypes = () => {
const result = executeSqliteQuerySync(nativeDb, query);
expectTypeOf(result.rows).toEqualTypeOf<
Array<{
itemId: number;
name: string | null;
data: Uint8Array;
total: number;
}>
>();
const row = executeSqliteQueryTakeFirstSync(nativeDb, query);
expectTypeOf(row).toEqualTypeOf<
| {
itemId: number;
name: string | null;
data: Uint8Array;
total: number;
}
| undefined
>();
// @ts-expect-error Kysely checks selected column string literals.
db.selectFrom("type_test_items").select("missing_column");
// @ts-expect-error Kysely checks table string literals.
db.selectFrom("missing_table").selectAll();
// @ts-expect-error Kysely checks where-reference string literals.
db.selectFrom("type_test_items").select("id").where("missing_column", "=", 1);
// @ts-expect-error Kysely checks grouped column string literals.
query.groupBy("missing_column");
// @ts-expect-error Kysely checks order references and selected aliases.
query.orderBy("missingAlias");
};
void assertTypes;
expect(query.compile().sql).toContain("select");
});
});

View File

@@ -0,0 +1,14 @@
import type { DatabaseSync } from "node:sqlite";
export type SqliteNumberPragma =
| "busy_timeout"
| "foreign_keys"
| "synchronous"
| "user_version"
| "wal_autocheckpoint";
export function readSqliteNumberPragma(db: DatabaseSync, pragma: SqliteNumberPragma): number {
const row = db.prepare(`PRAGMA ${pragma}`).get() as Record<string, unknown> | undefined;
const value = row?.[pragma] ?? row?.timeout;
return typeof value === "bigint" ? Number(value) : Number(value);
}

View File

@@ -0,0 +1,123 @@
import { afterEach, describe, expect, it } from "vitest";
import { requireNodeSqlite } from "./node-sqlite.js";
import { runSqliteImmediateTransactionSync } from "./sqlite-transaction.js";
const openDatabases: Array<import("node:sqlite").DatabaseSync> = [];
function createDatabase(): import("node:sqlite").DatabaseSync {
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(":memory:");
db.exec("CREATE TABLE entries (id TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL);");
openDatabases.push(db);
return db;
}
function readEntries(db: import("node:sqlite").DatabaseSync): string[] {
return db
.prepare("SELECT id FROM entries ORDER BY id")
.all()
.map((row) => (row as { id: string }).id);
}
afterEach(() => {
for (const db of openDatabases.splice(0)) {
db.close();
}
});
describe("runSqliteImmediateTransactionSync", () => {
it("keeps outer writes when a nested savepoint rolls back", () => {
const db = createDatabase();
runSqliteImmediateTransactionSync(db, () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("outer", "kept");
expect(() =>
runSqliteImmediateTransactionSync(db, () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("inner", "rolled back");
throw new Error("nested failure");
}),
).toThrow("nested failure");
});
expect(readEntries(db)).toEqual(["outer"]);
});
it("commits nested savepoint writes with the outer transaction", () => {
const db = createDatabase();
runSqliteImmediateTransactionSync(db, () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("outer", "kept");
runSqliteImmediateTransactionSync(db, () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("inner", "kept");
});
});
expect(readEntries(db)).toEqual(["inner", "outer"]);
});
it("rejects Promise-returning operations and rolls back their synchronous writes", () => {
const db = createDatabase();
expect(() =>
runSqliteImmediateTransactionSync(db, async () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("async", "rolled back");
return "done";
}),
).toThrow("must be synchronous");
expect(readEntries(db)).toEqual([]);
runSqliteImmediateTransactionSync(db, () => {
db.prepare("INSERT INTO entries(id, value) VALUES (?, ?)").run("after", "works");
});
expect(readEntries(db)).toEqual(["after"]);
});
it("retries retryable commit failures without rolling back successful writes", () => {
const execCalls: string[] = [];
let commitAttempts = 0;
const db = {
exec(sql: string) {
execCalls.push(sql);
if (sql === "COMMIT") {
commitAttempts += 1;
if (commitAttempts === 1) {
throw Object.assign(new Error("database is busy"), { code: "SQLITE_BUSY" });
}
}
},
} as import("node:sqlite").DatabaseSync;
const result = runSqliteImmediateTransactionSync(db, () => "committed");
expect(result).toBe("committed");
expect(execCalls).toEqual(["BEGIN IMMEDIATE", "COMMIT", "COMMIT"]);
});
it("rolls back and clears depth after exhausted retryable commit failures", () => {
const execCalls: string[] = [];
let failCommits = true;
const db = {
exec(sql: string) {
execCalls.push(sql);
if (failCommits && sql === "COMMIT") {
throw Object.assign(new Error("database is busy"), { code: "SQLITE_BUSY" });
}
},
close() {},
} as import("node:sqlite").DatabaseSync;
expect(() => runSqliteImmediateTransactionSync(db, () => "not committed")).toThrow(
"database is busy",
);
expect(execCalls.filter((sql) => sql === "COMMIT")).toHaveLength(8);
expect(execCalls.at(-1)).toBe("ROLLBACK");
execCalls.length = 0;
failCommits = false;
const result = runSqliteImmediateTransactionSync(db, () => "committed later");
expect(result).toBe("committed later");
expect(execCalls).toEqual(["BEGIN IMMEDIATE", "COMMIT"]);
});
});

View File

@@ -0,0 +1,132 @@
import type { DatabaseSync } from "node:sqlite";
const transactionDepthByDatabase = new WeakMap<DatabaseSync, number>();
const RETRYABLE_COMMIT_ERROR_CODES = new Set(["SQLITE_BUSY", "SQLITE_LOCKED"]);
const MAX_COMMIT_ATTEMPTS = 8;
let nextSavepointId = 0;
function nextSavepointName(): string {
nextSavepointId += 1;
return `openclaw_tx_${nextSavepointId}`;
}
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
return Boolean(value && typeof (value as { then?: unknown }).then === "function");
}
function assertSyncTransactionResult(value: unknown): void {
if (isPromiseLike(value)) {
throw new Error(
"SQLite write transactions must be synchronous; Promise returns are not supported.",
);
}
}
function isRetryableCommitError(error: unknown): boolean {
const code = error && typeof error === "object" ? (error as { code?: unknown }).code : undefined;
return typeof code === "string" && RETRYABLE_COMMIT_ERROR_CODES.has(code);
}
function commitImmediateTransaction(db: DatabaseSync): void {
for (let attempt = 1; ; attempt += 1) {
try {
db.exec("COMMIT");
return;
} catch (error) {
if (!isRetryableCommitError(error) || attempt >= MAX_COMMIT_ATTEMPTS) {
throw error;
}
}
}
}
function abortImmediateTransaction(db: DatabaseSync): void {
try {
db.exec("ROLLBACK");
} catch {
// If rollback itself fails, close the handle so callers cannot keep using a
// connection that may still hold an abandoned write transaction.
try {
db.close();
} catch {
// Preserve the original transaction error; close failure is secondary.
}
}
}
function getTransactionDepth(db: DatabaseSync): number {
return transactionDepthByDatabase.get(db) ?? 0;
}
function setTransactionDepth(db: DatabaseSync, depth: number): void {
if (depth <= 0) {
transactionDepthByDatabase.delete(db);
return;
}
transactionDepthByDatabase.set(db, depth);
}
export function runSqliteImmediateTransactionSync<T>(db: DatabaseSync, operation: () => T): T {
const depth = getTransactionDepth(db);
if (depth > 0) {
const savepointName = nextSavepointName();
db.exec(`SAVEPOINT ${savepointName}`);
setTransactionDepth(db, depth + 1);
try {
const result = operation();
assertSyncTransactionResult(result);
db.exec(`RELEASE SAVEPOINT ${savepointName}`);
return result;
} catch (error) {
try {
db.exec(`ROLLBACK TO SAVEPOINT ${savepointName}`);
} finally {
db.exec(`RELEASE SAVEPOINT ${savepointName}`);
}
throw error;
} finally {
setTransactionDepth(db, depth);
}
}
db.exec("BEGIN IMMEDIATE");
setTransactionDepth(db, 1);
let transactionStillActive = true;
let result: T;
try {
result = operation();
assertSyncTransactionResult(result);
} catch (error) {
try {
abortImmediateTransaction(db);
transactionStillActive = false;
} catch {
// Preserve the original error; rollback failure is secondary.
}
throw error;
} finally {
if (!transactionStillActive) {
setTransactionDepth(db, 0);
}
}
try {
commitImmediateTransaction(db);
transactionStillActive = false;
return result;
} catch (error) {
try {
abortImmediateTransaction(db);
transactionStillActive = false;
} catch {
// Preserve the original error; rollback failure is secondary.
}
throw error;
} finally {
if (!transactionStillActive) {
setTransactionDepth(db, 0);
}
}
}

View File

@@ -18,6 +18,8 @@ export type SqliteWalMaintenanceOptions = {
autoCheckpointPages?: number;
checkpointIntervalMs?: number;
checkpointMode?: SqliteWalCheckpointMode;
databaseLabel?: string;
databasePath?: string;
onCheckpointError?: (error: unknown) => void;
};
@@ -41,7 +43,6 @@ export function configureSqliteWalMaintenance(
"checkpointIntervalMs",
);
const checkpointMode = options.checkpointMode ?? "TRUNCATE";
db.exec("PRAGMA journal_mode = WAL;");
db.exec(`PRAGMA wal_autocheckpoint = ${autoCheckpointPages};`);

View File

@@ -34,7 +34,15 @@ import {
} from "../routing/session-key.js";
import { normalizeSessionKeyPreservingOpaquePeerIds } from "../sessions/session-key-utils.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js";
import { expandHomePrefix } from "./home-dir.js";
import {
executeSqliteQuerySync,
executeSqliteQueryTakeFirstSync,
getNodeSqliteKysely,
} from "./kysely-sync.js";
import { requireNodeSqlite } from "./node-sqlite.js";
import { isWithinDir } from "./path-safety.js";
import {
ensureDir,
@@ -69,6 +77,10 @@ export type LegacyStateDetection = {
hasLegacy: boolean;
plans: ChannelLegacyStateMigrationPlan[];
};
pluginStateSidecar: {
sourcePath: string;
hasLegacy: boolean;
};
preview: string[];
};
@@ -89,6 +101,25 @@ type LegacySessionSurface = {
}) => string | null | undefined;
};
type LegacyPluginStateSidecarRow = {
plugin_id: string;
namespace: string;
entry_key: string;
value_json: string;
created_at: number | bigint;
expires_at: number | bigint | null;
};
type LegacyPluginStateImportDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
const PLUGIN_STATE_SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
class LegacyPluginStateSidecarConflictError extends Error {
constructor(readonly conflictedKeys: string[]) {
super("legacy plugin-state sidecar conflicts with shared state");
}
}
function getLegacySessionSurfaces(): LegacySessionSurface[] {
// Legacy migrations run on cold doctor/startup paths. Prefer the narrower
// setup plugin surface here so session-key cleanup does not materialize full
@@ -125,6 +156,185 @@ function buildLegacyMigrationPreview(plan: ChannelLegacyStateMigrationPlan): str
return `- ${plan.label}: ${plan.sourcePath}${plan.targetPath}`;
}
function resolveLegacyPluginStateSidecarPath(stateDir: string): string {
return path.join(stateDir, "plugin-state", "state.sqlite");
}
function readLegacyPluginStateSidecarRows(sourcePath: string): LegacyPluginStateSidecarRow[] {
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(sourcePath, { readOnly: true });
try {
return db
.prepare(
`
SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at
FROM plugin_state_entries
ORDER BY plugin_id ASC, namespace ASC, entry_key ASC
`,
)
.all() as LegacyPluginStateSidecarRow[];
} finally {
db.close();
}
}
function normalizeLegacySqliteInteger(value: number | bigint | null): number | null {
if (typeof value === "bigint") {
return Number(value);
}
return value;
}
function legacyPluginStateRowsMatch(
existing: { value_json: string; created_at: number | bigint; expires_at: number | bigint | null },
legacy: LegacyPluginStateSidecarRow,
): boolean {
return (
existing.value_json === legacy.value_json &&
normalizeLegacySqliteInteger(existing.created_at) ===
normalizeLegacySqliteInteger(legacy.created_at) &&
normalizeLegacySqliteInteger(existing.expires_at) ===
normalizeLegacySqliteInteger(legacy.expires_at)
);
}
function archiveLegacyPluginStateSidecar(params: {
sourcePath: string;
changes: string[];
warnings: string[];
}): void {
const existingSources = PLUGIN_STATE_SQLITE_SIDECAR_SUFFIXES.map(
(suffix) => `${params.sourcePath}${suffix}`,
).filter(fileExists);
const existingArchives = existingSources
.map((sourcePath) => `${sourcePath}.migrated`)
.filter(fileExists);
if (existingArchives.length > 0) {
params.warnings.push(
`Left migrated plugin-state sidecar in place because archive already exists: ${existingArchives[0]}`,
);
return;
}
for (const sourcePath of existingSources) {
const archivedPath = `${sourcePath}.migrated`;
try {
fs.renameSync(sourcePath, archivedPath);
} catch (err) {
params.warnings.push(`Failed archiving plugin-state sidecar ${sourcePath}: ${String(err)}`);
return;
}
}
params.changes.push(
`Archived plugin-state sidecar legacy source → ${params.sourcePath}.migrated`,
);
}
async function migrateLegacyPluginStateSidecar(params: {
stateDir: string;
}): Promise<{ changes: string[]; warnings: string[] }> {
const sourcePath = resolveLegacyPluginStateSidecarPath(params.stateDir);
if (!fileExists(sourcePath)) {
return { changes: [], warnings: [] };
}
const changes: string[] = [];
const warnings: string[] = [];
let rows: LegacyPluginStateSidecarRow[];
try {
rows = readLegacyPluginStateSidecarRows(sourcePath);
} catch (err) {
return {
changes,
warnings: [`Failed reading plugin-state sidecar ${sourcePath}: ${String(err)}`],
};
}
try {
const conflictedKeys: string[] = [];
const rowsToInsert: LegacyPluginStateSidecarRow[] = [];
let imported = 0;
const now = Date.now();
runOpenClawStateWriteTransaction(
({ db }) => {
const stateDb = getNodeSqliteKysely<LegacyPluginStateImportDatabase>(db);
for (const row of rows) {
executeSqliteQuerySync(
db,
stateDb
.deleteFrom("plugin_state_entries")
.where("plugin_id", "=", row.plugin_id)
.where("namespace", "=", row.namespace)
.where("entry_key", "=", row.entry_key)
.where("expires_at", "is not", null)
.where("expires_at", "<=", now),
);
const existing = executeSqliteQueryTakeFirstSync(
db,
stateDb
.selectFrom("plugin_state_entries")
.select(["value_json", "created_at", "expires_at"])
.where("plugin_id", "=", row.plugin_id)
.where("namespace", "=", row.namespace)
.where("entry_key", "=", row.entry_key),
);
if (existing) {
if (!legacyPluginStateRowsMatch(existing, row)) {
conflictedKeys.push(`${row.plugin_id}/${row.namespace}/${row.entry_key}`);
}
continue;
}
rowsToInsert.push(row);
}
if (conflictedKeys.length > 0) {
throw new LegacyPluginStateSidecarConflictError(conflictedKeys);
}
for (const row of rowsToInsert) {
executeSqliteQuerySync(
db,
stateDb
.insertInto("plugin_state_entries")
.values({
plugin_id: row.plugin_id,
namespace: row.namespace,
entry_key: row.entry_key,
value_json: row.value_json,
created_at: normalizeLegacySqliteInteger(row.created_at) ?? 0,
expires_at: normalizeLegacySqliteInteger(row.expires_at),
})
.onConflict((conflict) =>
conflict.columns(["plugin_id", "namespace", "entry_key"]).doNothing(),
),
);
imported += 1;
}
},
{ env: { ...process.env, OPENCLAW_STATE_DIR: params.stateDir } },
);
if (imported > 0) {
changes.push(
`Migrated ${imported} plugin-state sidecar ${imported === 1 ? "entry" : "entries"} → shared SQLite state`,
);
}
} catch (err) {
if (err instanceof LegacyPluginStateSidecarConflictError) {
return {
changes,
warnings: [
`Left plugin-state sidecar in place because ${err.conflictedKeys.length} ${err.conflictedKeys.length === 1 ? "row" : "rows"} already existed in shared state: ${err.conflictedKeys[0]}`,
],
};
}
return {
changes,
warnings: [`Failed migrating plugin-state sidecar ${sourcePath}: ${String(err)}`],
};
}
archiveLegacyPluginStateSidecar({ sourcePath, changes, warnings });
return { changes, warnings };
}
function resolvePluginStateImportTargetKey(scopeKey: string, key: string): string {
return scopeKey ? `${scopeKey}:${key}` : key;
}
@@ -1089,6 +1299,8 @@ export async function detectLegacyStateMigrations(params: {
const legacyAgentDir = path.join(stateDir, "agent");
const targetAgentDir = path.join(stateDir, "agents", targetAgentId, "agent");
const hasLegacyAgentDir = existsDir(legacyAgentDir);
const pluginStateSidecarPath = resolveLegacyPluginStateSidecarPath(stateDir);
const hasPluginStateSidecar = fileExists(pluginStateSidecarPath);
const channelPlans = await collectChannelLegacyStateMigrationPlans({
cfg: params.cfg,
env,
@@ -1106,6 +1318,9 @@ export async function detectLegacyStateMigrations(params: {
if (hasLegacyAgentDir) {
preview.push(`- Agent dir: ${legacyAgentDir}${targetAgentDir}`);
}
if (hasPluginStateSidecar) {
preview.push(`- Plugin state sidecar: ${pluginStateSidecarPath} → shared SQLite state`);
}
if (channelPlans.length > 0) {
preview.push(...channelPlans.map(buildLegacyMigrationPreview));
}
@@ -1133,6 +1348,10 @@ export async function detectLegacyStateMigrations(params: {
hasLegacy: channelPlans.length > 0,
plans: channelPlans,
},
pluginStateSidecar: {
sourcePath: pluginStateSidecarPath,
hasLegacy: hasPluginStateSidecar,
},
preview,
};
}
@@ -1317,6 +1536,9 @@ export async function runLegacyStateMigrations(params: {
}): Promise<{ changes: string[]; warnings: string[] }> {
const now = params.now ?? (() => Date.now());
const detected = params.detected;
const pluginStateSidecar = await migrateLegacyPluginStateSidecar({
stateDir: detected.stateDir,
});
const preSessionChannelPlans = await runLegacyMigrationPlans(
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
);
@@ -1327,12 +1549,14 @@ export async function runLegacyStateMigrations(params: {
);
return {
changes: [
...pluginStateSidecar.changes,
...preSessionChannelPlans.changes,
...sessions.changes,
...agentDir.changes,
...channelPlans.changes,
],
warnings: [
...pluginStateSidecar.warnings,
...preSessionChannelPlans.warnings,
...sessions.warnings,
...agentDir.warnings,
@@ -1559,24 +1783,49 @@ export async function autoMigrateLegacyState(params: {
}
};
if (env.OPENCLAW_AGENT_DIR?.trim() || env.PI_CODING_AGENT_DIR?.trim()) {
const changes = [...stateDirResult.changes, ...orphanKeys.changes];
const warnings = [...stateDirResult.warnings, ...orphanKeys.warnings];
logMigrationResults(changes, warnings);
return {
migrated: stateDirResult.migrated || orphanKeys.changes.length > 0,
skipped: true,
changes,
warnings,
};
}
const detected = await detectLegacyStateMigrations({
cfg: params.cfg,
env,
homedir: params.homedir,
});
if (!detected.sessions.hasLegacy && !detected.agentDir.hasLegacy) {
const hasCustomAgentDir = env.OPENCLAW_AGENT_DIR?.trim() || env.PI_CODING_AGENT_DIR?.trim();
if (hasCustomAgentDir) {
const pluginStateSidecar = await migrateLegacyPluginStateSidecar({
stateDir: detected.stateDir,
});
const preSessionChannelPlans = await runLegacyMigrationPlans(
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
);
const changes = [
...stateDirResult.changes,
...orphanKeys.changes,
...pluginStateSidecar.changes,
...preSessionChannelPlans.changes,
];
const warnings = [
...stateDirResult.warnings,
...orphanKeys.warnings,
...pluginStateSidecar.warnings,
...preSessionChannelPlans.warnings,
];
logMigrationResults(changes, warnings);
return {
migrated:
stateDirResult.migrated ||
orphanKeys.changes.length > 0 ||
pluginStateSidecar.changes.length > 0 ||
preSessionChannelPlans.changes.length > 0,
skipped: true,
changes,
warnings,
};
}
if (
!detected.sessions.hasLegacy &&
!detected.agentDir.hasLegacy &&
!detected.channelPlans.hasLegacy &&
!detected.pluginStateSidecar.hasLegacy
) {
const changes = [...stateDirResult.changes, ...orphanKeys.changes];
const warnings = [...stateDirResult.warnings, ...orphanKeys.warnings];
logMigrationResults(changes, warnings);
@@ -1589,19 +1838,34 @@ export async function autoMigrateLegacyState(params: {
}
const now = params.now ?? (() => Date.now());
const pluginStateSidecar = await migrateLegacyPluginStateSidecar({
stateDir: detected.stateDir,
});
const preSessionChannelPlans = await runLegacyMigrationPlans(
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
);
const sessions = await migrateLegacySessions(detected, now);
const agentDir = await migrateLegacyAgentDir(detected, now);
const channelPlans = await runLegacyMigrationPlans(
detected.channelPlans.plans.filter((plan) => plan.kind !== "plugin-state-import"),
);
const changes = [
...stateDirResult.changes,
...orphanKeys.changes,
...pluginStateSidecar.changes,
...preSessionChannelPlans.changes,
...sessions.changes,
...agentDir.changes,
...channelPlans.changes,
];
const warnings = [
...stateDirResult.warnings,
...orphanKeys.warnings,
...pluginStateSidecar.warnings,
...preSessionChannelPlans.warnings,
...sessions.warnings,
...agentDir.warnings,
...channelPlans.warnings,
];
logMigrationResults(changes, warnings);

View File

@@ -1,38 +1,19 @@
import { mkdirSync } from "node:fs";
import { afterEach, describe, expect, it, vi } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import {
closePluginStateSqliteStore,
closePluginStateDatabase,
createPluginStateKeyedStore,
PluginStateStoreError,
probePluginStateStore,
resetPluginStateStoreForTests,
sweepExpiredPluginStateEntries,
} from "./plugin-state-store.js";
import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
import { MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js";
import { seedPluginStateEntriesForTests } from "./plugin-state-store.test-helpers.js";
afterEach(() => {
vi.useRealTimers();
resetPluginStateStoreForTests();
});
async function expectPluginStateStoreError(
promise: Promise<unknown>,
expected: { code: string },
): Promise<void> {
let storeError: unknown;
try {
await promise;
} catch (error) {
storeError = error;
}
expect(storeError).toBeInstanceOf(PluginStateStoreError);
expect((storeError as PluginStateStoreError | undefined)?.code).toBe(expected.code);
}
// ---------------------------------------------------------------------------
// Runtime smoke
// ---------------------------------------------------------------------------
@@ -188,36 +169,7 @@ describe("limits", () => {
});
// 65 535 chars → 65 537 bytes of JSON → over limit.
const oversize = "x".repeat(65_535);
await expectPluginStateStoreError(store.register("big", oversize), {
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
});
});
});
it("enforces the per-plugin live-row cap", async () => {
await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => {
// Fill the plugin budget outside the namespace that attempts the write.
const nsCount = 10;
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount;
seedPluginStateEntriesForTests(
Array.from({ length: MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => {
const ns = Math.floor(index / perNs);
const k = index % perNs;
return {
pluginId: "fixture-plugin",
namespace: `ns-${ns}`,
key: `k-${k}`,
value: { ns, k },
};
}),
);
const store = createPluginStateKeyedStore("fixture-plugin", {
namespace: "overflow-ns",
maxEntries: 10,
});
// One more row tips over the plugin-wide limit.
await expectPluginStateStoreError(store.register("overflow", { boom: true }), {
await expect(store.register("big", oversize)).rejects.toMatchObject({
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
});
});
@@ -252,33 +204,14 @@ describe("limits", () => {
// Failure safety
// ---------------------------------------------------------------------------
describe("failure safety", () => {
it("gives a typed error for unsupported schema versions", async () => {
await withOpenClawTestState({ label: "e2e-fail-schema" }, async () => {
// Pre-seed the DB with a future schema version.
mkdirSync(resolvePluginStateDir(), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(resolvePluginStateSqlitePath());
db.exec("PRAGMA user_version = 99;");
db.close();
const store = createPluginStateKeyedStore("fixture-plugin", {
namespace: "schema",
maxEntries: 10,
});
const error = await store.register("k", { ok: true }).catch((e: unknown) => e);
expect(error).toBeInstanceOf(PluginStateStoreError);
expect((error as PluginStateStoreError).code).toBe("PLUGIN_STATE_SCHEMA_UNSUPPORTED");
});
});
it("probe returns redacted diagnostics without leaking stored values", async () => {
await withOpenClawTestState({ label: "e2e-fail-probe" }, async () => {
const result = probePluginStateStore();
expect(result.ok).toBe(true);
expect(result.dbPath).toContain("state.sqlite");
expect(result.databasePath).toContain("openclaw.sqlite");
expect(result.steps.length).toBeGreaterThanOrEqual(4);
const failedSteps = result.steps.filter((step) => !step.ok);
expect(failedSteps).toStrictEqual([]);
expect(failedSteps).toEqual([]);
// The probe's temporary stored value must not leak into the result.
const serialised = JSON.stringify(result);
@@ -295,11 +228,11 @@ describe("failure safety", () => {
await store.register("k", { v: 1 });
// First close.
closePluginStateSqliteStore();
closePluginStateDatabase();
await expect(store.lookup("k")).resolves.toEqual({ v: 1 });
// Second close (idempotent).
closePluginStateSqliteStore();
closePluginStateDatabase();
await expect(store.lookup("k")).resolves.toEqual({ v: 1 });
// Write after reopen.

View File

@@ -1,10 +0,0 @@
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
export function resolvePluginStateDir(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), "plugin-state");
}
export function resolvePluginStateSqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolvePluginStateDir(env), "state.sqlite");
}

View File

@@ -1,56 +0,0 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
afterEach(() => {
vi.doUnmock("node:fs");
vi.resetModules();
});
describe("plugin state permission hardening", () => {
it("does not reject a committed write when post-commit chmod fails", async () => {
let chmodCalls = 0;
let throwAfter = Number.POSITIVE_INFINITY;
vi.doMock("node:fs", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:fs")>();
return {
...actual,
chmodSync: (target: Parameters<typeof actual.chmodSync>[0], mode: number) => {
chmodCalls += 1;
if (chmodCalls > throwAfter) {
throw Object.assign(new Error("chmod denied"), { code: "EACCES" });
}
return actual.chmodSync(target, mode);
},
existsSync: (target: Parameters<typeof actual.existsSync>[0]) => {
const pathname = String(target);
if (pathname.endsWith("-shm") || pathname.endsWith("-wal")) {
return false;
}
return actual.existsSync(target);
},
};
});
const { createPluginStateKeyedStore, resetPluginStateStoreForTests } =
await import("./plugin-state-store.js");
try {
await withOpenClawTestState({ label: "plugin-state-post-commit-chmod" }, async () => {
const store = createPluginStateKeyedStore<{ value: number }>("fixture-plugin", {
namespace: "post-commit",
maxEntries: 10,
});
await store.register("first", { value: 1 });
chmodCalls = 0;
throwAfter = 2;
await expect(store.register("second", { value: 2 })).resolves.toBeUndefined();
await expect(store.lookup("second")).resolves.toEqual({ value: 2 });
});
} finally {
resetPluginStateStoreForTests();
}
});
});

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
import { seedPluginStateSqliteEntriesForTests } from "./plugin-state-store.sqlite.js";
import { seedPluginStateDatabaseEntriesForTests } from "./plugin-state-store.sqlite.js";
export type PluginStateSeedEntry = {
pluginId: string;
@@ -14,7 +14,7 @@ export function seedPluginStateEntriesForTests(entries: PluginStateSeedEntry[]):
return;
}
seedPluginStateSqliteEntriesForTests(
seedPluginStateDatabaseEntriesForTests(
entries.map((entry) => {
const valueJson = JSON.stringify(entry.value);
if (valueJson == null) {

View File

@@ -1,14 +1,17 @@
import { mkdirSync, rmSync, statSync } from "node:fs";
import { rmSync, statSync } from "node:fs";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
import { resolveOpenClawStateSqlitePath } from "../state/openclaw-state-db.paths.js";
import {
createOpenClawTestState,
withOpenClawTestState,
type OpenClawTestState,
} from "../test-utils/openclaw-test-state.js";
import {
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
clearPluginStateStoreForTests,
closePluginStateSqliteStore,
closePluginStateDatabase,
createCorePluginStateKeyedStore,
createPluginStateKeyedStore,
createPluginStateSyncKeyedStore,
@@ -17,14 +20,13 @@ import {
resetPluginStateStoreForTests,
sweepExpiredPluginStateEntries,
} from "./plugin-state-store.js";
import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
import { seedPluginStateEntriesForTests } from "./plugin-state-store.test-helpers.js";
let testState: OpenClawTestState | undefined;
beforeAll(async () => {
testState = await createOpenClawTestState({ label: "plugin-state-store" });
rmSync(resolvePluginStateDir(), { recursive: true, force: true });
rmSync(path.dirname(resolveOpenClawStateSqlitePath()), { recursive: true, force: true });
});
beforeEach(() => {
@@ -96,6 +98,38 @@ describe("plugin state keyed store", () => {
});
});
it("honors explicit store env without mutating process state", async () => {
await withOpenClawTestState(
{ label: "plugin-state-explicit-env-a", applyEnv: false },
async (stateA) => {
await withOpenClawTestState(
{ label: "plugin-state-explicit-env-b", applyEnv: false },
async (stateB) => {
const storeA = createPluginStateKeyedStore<{ owner: string }>("discord", {
namespace: "explicit-env",
maxEntries: 10,
env: stateA.env,
});
const storeB = createPluginStateKeyedStore<{ owner: string }>("discord", {
namespace: "explicit-env",
maxEntries: 10,
env: stateB.env,
});
await storeA.register("shared", { owner: "a" });
await storeB.register("shared", { owner: "b" });
await expect(storeA.lookup("shared")).resolves.toEqual({ owner: "a" });
await expect(storeB.lookup("shared")).resolves.toEqual({ owner: "b" });
expect(resolveOpenClawStateSqlitePath(stateA.env)).not.toBe(
resolveOpenClawStateSqlitePath(stateB.env),
);
},
);
},
);
});
it("upserts values and refreshes deterministic entry ordering", async () => {
await withPluginStateTestState(async () => {
vi.useFakeTimers();
@@ -590,18 +624,53 @@ describe("plugin state keyed store", () => {
await withPluginStateTestState(async () => {
const store = createPluginStateKeyedStore("discord", { namespace: "close", maxEntries: 10 });
await store.register("k", { ok: true });
closePluginStateSqliteStore();
const database = openOpenClawStateDatabase();
closePluginStateDatabase();
expect(() => database.db.exec("SELECT 1")).toThrow();
await expect(store.lookup("k")).resolves.toEqual({ ok: true });
});
});
it("does not close a shared state database opened before the plugin-state probe", async () => {
await withPluginStateTestState(async () => {
const database = openOpenClawStateDatabase();
const result = probePluginStateStore();
expect(result.ok).toBe(true);
expect(database.db.isOpen).toBe(true);
});
});
it("reopens after the shared state DB cache closes its handle", async () => {
await withPluginStateTestState(async () => {
const store = createPluginStateKeyedStore("discord", {
namespace: "cache-switch",
maxEntries: 10,
});
await store.register("k", { ok: true });
const secondary = await createOpenClawTestState({
label: "plugin-state-cache-secondary",
applyEnv: false,
});
try {
openOpenClawStateDatabase({ env: secondary.env });
testState?.applyEnv();
await expect(store.lookup("k")).resolves.toEqual({ ok: true });
} finally {
await secondary.cleanup();
}
});
});
it.runIf(process.platform !== "win32")("hardens DB directory and file permissions", async () => {
await withPluginStateTestState(async () => {
const store = createPluginStateKeyedStore("discord", { namespace: "perms", maxEntries: 10 });
await store.register("k", { ok: true });
expect(statSync(resolvePluginStateDir()).mode & 0o777).toBe(0o700);
expect(statSync(resolvePluginStateSqlitePath()).mode & 0o777).toBe(0o600);
const databasePath = resolveOpenClawStateSqlitePath();
expect(statSync(path.dirname(databasePath)).mode & 0o777).toBe(0o700);
expect(statSync(databasePath).mode & 0o777).toBe(0o600);
});
});
@@ -614,20 +683,4 @@ describe("plugin state keyed store", () => {
expect(JSON.stringify(result)).not.toContain("probe-value");
});
});
it("throws on unsupported future schema versions", async () => {
await withPluginStateTestState(async () => {
closePluginStateSqliteStore();
mkdirSync(resolvePluginStateDir(), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(resolvePluginStateSqlitePath());
db.exec("PRAGMA user_version = 2;");
db.close();
const store = createPluginStateKeyedStore("discord", { namespace: "schema", maxEntries: 10 });
await expectPluginStateStoreError(store.register("k", { ok: true }), {
code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED",
});
});
});
});

View File

@@ -1,6 +1,7 @@
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import {
clearPluginStateSqliteStoreForTests,
closePluginStateSqliteStore,
clearPluginStateDatabaseForTests,
closePluginStateDatabase,
MAX_PLUGIN_STATE_VALUE_BYTES,
pluginStateClear,
pluginStateConsume,
@@ -31,10 +32,11 @@ export type {
} from "./plugin-state-store.types.js";
export { PluginStateStoreError } from "./plugin-state-store.types.js";
export {
closePluginStateDatabase,
closePluginStateSqliteStore,
countPluginStateLiveEntries,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
isPluginStateDatabaseOpen,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
probePluginStateStore,
sweepExpiredPluginStateEntries,
} from "./plugin-state-store.sqlite.js";
@@ -424,13 +426,14 @@ export function createCorePluginStateSyncKeyedStore<T>(
}
export function clearPluginStateStoreForTests(): void {
clearPluginStateSqliteStoreForTests();
clearPluginStateDatabaseForTests();
namespaceOptionSignatures.clear();
}
export function resetPluginStateStoreForTests(options: { closeDatabase?: boolean } = {}): void {
if (options.closeDatabase !== false) {
closePluginStateSqliteStore();
closePluginStateDatabase();
closeOpenClawStateDatabaseForTest();
}
namespaceOptionSignatures.clear();
}

View File

@@ -35,7 +35,6 @@ export type OpenKeyedStoreOptions = {
export type PluginStateStoreErrorCode =
| "PLUGIN_STATE_SQLITE_UNAVAILABLE"
| "PLUGIN_STATE_OPEN_FAILED"
| "PLUGIN_STATE_SCHEMA_UNSUPPORTED"
| "PLUGIN_STATE_WRITE_FAILED"
| "PLUGIN_STATE_READ_FAILED"
| "PLUGIN_STATE_CORRUPT"
@@ -88,6 +87,6 @@ export type PluginStateStoreProbeStep = {
export type PluginStateStoreProbeResult = {
ok: boolean;
dbPath: string;
databasePath: string;
steps: PluginStateStoreProbeStep[];
};

View File

@@ -0,0 +1,965 @@
/**
* This file was generated by kysely-codegen.
* Please do not edit it manually.
*/
import type { ColumnType } from "kysely";
export type Generated<T> =
T extends ColumnType<infer S, infer I, infer U>
? ColumnType<S, I | undefined, U>
: ColumnType<T, T | undefined, T>;
export interface AcpReplayEvents {
at: number;
run_id: string | null;
seq: number;
session_id: string;
session_key: string;
update_json: string;
}
export interface AcpReplaySessions {
complete: number;
created_at: number;
cwd: string;
next_seq: number;
session_id: string;
session_key: string;
updated_at: number;
}
export interface AgentDatabases {
agent_id: string;
last_seen_at: number;
path: string;
schema_version: number;
size_bytes: number | null;
}
export interface AgentModelCatalogs {
agent_dir: string;
catalog_key: string;
raw_json: string;
updated_at: number;
}
export interface AndroidNotificationRecentPackages {
package_name: string;
sort_order: number;
updated_at_ms: number;
}
export interface ApnsRegistrations {
distribution: string | null;
environment: string;
installation_id: string | null;
node_id: string;
relay_handle: string | null;
send_grant: string | null;
token: string | null;
token_debug_suffix: string | null;
topic: string;
transport: string;
updated_at_ms: number;
}
export interface AuthProfileState {
state_json: string;
store_key: string;
updated_at: number;
}
export interface AuthProfileStores {
store_json: string;
store_key: string;
updated_at: number;
}
export interface BackupRuns {
archive_path: string;
created_at: number;
id: string;
manifest_json: string;
status: string;
}
export interface CaptureBlobs {
blob_id: string;
content_type: string | null;
created_at: number;
data: Uint8Array;
encoding: string;
sha256: string;
size_bytes: number;
}
export interface CaptureEvents {
close_code: number | null;
content_type: string | null;
data_blob_id: string | null;
data_sha256: string | null;
data_text: string | null;
direction: string;
error_text: string | null;
flow_id: string;
headers_json: string | null;
host: string | null;
id: Generated<number>;
kind: string;
meta_json: string | null;
method: string | null;
path: string | null;
protocol: string;
session_id: string;
source_process: string;
source_scope: string;
status: number | null;
ts: number;
}
export interface CaptureSessions {
ended_at: number | null;
id: string;
mode: string;
proxy_url: string | null;
source_process: string;
source_scope: string;
started_at: number;
}
export interface ChannelPairingAllowEntries {
account_id: string;
channel_key: string;
entry: string;
sort_order: number;
updated_at: number;
}
export interface ChannelPairingRequests {
account_id: string;
channel_key: string;
code: string;
created_at: string;
last_seen_at: string;
meta_json: string | null;
request_id: string;
}
export interface CommandLogEntries {
action: string;
entry_json: string;
id: string;
sender_id: string;
session_key: string;
source: string;
timestamp_ms: number;
}
export interface Commitments {
account_id: string | null;
agent_id: string;
attempts: number;
channel: string;
confidence: number;
created_at_ms: number;
dedupe_key: string;
dismissed_at_ms: number | null;
due_earliest_ms: number;
due_latest_ms: number;
due_timezone: string;
expired_at_ms: number | null;
id: string;
kind: string;
last_attempt_at_ms: number | null;
reason: string;
recipient_id: string | null;
record_json: string;
sender_id: string | null;
sensitivity: string;
sent_at_ms: number | null;
session_key: string;
snoozed_until_ms: number | null;
source: string;
source_message_id: string | null;
source_run_id: string | null;
status: string;
suggested_text: string;
thread_id: string | null;
updated_at_ms: number;
}
export interface ConfigHealthEntries {
config_path: string;
last_known_good_json: string | null;
last_observed_suspicious_signature: string | null;
last_promoted_good_json: string | null;
updated_at_ms: number;
}
export interface CronJobs {
agent_id: string | null;
anchor_ms: number | null;
at: string | null;
consecutive_errors: number | null;
consecutive_skipped: number | null;
created_at_ms: number;
delete_after_run: number | null;
delivery_account_id: string | null;
delivery_best_effort: number | null;
delivery_channel: string | null;
delivery_mode: string | null;
delivery_thread_id: string | null;
delivery_to: string | null;
description: string | null;
enabled: number;
every_ms: number | null;
failure_alert_account_id: string | null;
failure_alert_after: number | null;
failure_alert_channel: string | null;
failure_alert_cooldown_ms: number | null;
failure_alert_disabled: number | null;
failure_alert_include_skipped: number | null;
failure_alert_mode: string | null;
failure_alert_to: string | null;
failure_delivery_account_id: string | null;
failure_delivery_channel: string | null;
failure_delivery_mode: string | null;
failure_delivery_to: string | null;
job_id: string;
job_json: string;
last_delivered: number | null;
last_delivery_error: string | null;
last_delivery_status: string | null;
last_duration_ms: number | null;
last_error: string | null;
last_failure_alert_at_ms: number | null;
last_run_at_ms: number | null;
last_run_status: string | null;
name: string;
next_run_at_ms: number | null;
payload_allow_unsafe_external_content: number | null;
payload_external_content_source_json: string | null;
payload_fallbacks_json: string | null;
payload_kind: string;
payload_light_context: number | null;
payload_message: string | null;
payload_model: string | null;
payload_thinking: string | null;
payload_timeout_seconds: number | null;
payload_tools_allow_json: string | null;
running_at_ms: number | null;
runtime_updated_at_ms: number | null;
schedule_error_count: number | null;
schedule_expr: string | null;
schedule_identity: string | null;
schedule_kind: string;
schedule_tz: string | null;
session_key: string | null;
session_target: string;
sort_order: Generated<number>;
stagger_ms: number | null;
state_json: Generated<string>;
store_key: string;
updated_at: number;
wake_mode: string;
}
export interface CronRunLogs {
created_at: number;
delivered: number | null;
delivery_error: string | null;
delivery_status: string | null;
diagnostics_summary: string | null;
duration_ms: number | null;
entry_json: string;
error: string | null;
job_id: string;
model: string | null;
next_run_at_ms: number | null;
provider: string | null;
run_at_ms: number | null;
run_id: string | null;
seq: number;
session_id: string | null;
session_key: string | null;
status: string | null;
store_key: string;
summary: string | null;
total_tokens: number | null;
ts: number;
}
export interface CurrentConversationBindings {
account_id: string;
binding_id: string;
binding_key: string;
bound_at: number;
channel: string;
conversation_id: string;
conversation_kind: string;
expires_at: number | null;
metadata_json: string | null;
parent_conversation_id: string | null;
record_json: string;
status: string;
target_agent_id: string;
target_kind: string;
target_session_id: string | null;
target_session_key: string;
updated_at: number;
}
export interface DeliveryQueueEntries {
account_id: string | null;
channel: string | null;
enqueued_at: number;
entry_json: string;
entry_kind: string | null;
failed_at: number | null;
id: string;
last_attempt_at: number | null;
last_error: string | null;
platform_send_started_at: number | null;
queue_name: string;
recovery_state: string | null;
retry_count: Generated<number>;
session_key: string | null;
status: string;
target: string | null;
updated_at: number;
}
export interface DeviceAuthTokens {
device_id: string;
role: string;
scopes_json: string;
token: string;
updated_at_ms: number;
}
export interface DeviceBootstrapTokens {
device_id: string | null;
issued_at_ms: number;
last_used_at_ms: number | null;
pending_profile_json: string | null;
profile_json: string | null;
public_key: string | null;
redeemed_profile_json: string | null;
token: string;
token_key: string;
ts: number;
}
export interface DeviceIdentities {
created_at_ms: number;
device_id: string;
identity_key: string;
private_key_pem: string;
public_key_pem: string;
updated_at_ms: number;
}
export interface DevicePairingPaired {
approved_at_ms: number;
approved_scopes_json: string | null;
client_id: string | null;
client_mode: string | null;
created_at_ms: number;
device_family: string | null;
device_id: string;
display_name: string | null;
last_seen_at_ms: number | null;
last_seen_reason: string | null;
platform: string | null;
public_key: string;
remote_ip: string | null;
role: string | null;
roles_json: string | null;
scopes_json: string | null;
tokens_json: string | null;
}
export interface DevicePairingPending {
client_id: string | null;
client_mode: string | null;
device_family: string | null;
device_id: string;
display_name: string | null;
is_repair: number | null;
platform: string | null;
public_key: string;
remote_ip: string | null;
request_id: string;
role: string | null;
roles_json: string | null;
scopes_json: string | null;
silent: number | null;
ts: number;
}
export interface DiagnosticEvents {
created_at: number;
event_key: string;
payload_json: string;
scope: string;
}
export interface DiagnosticStabilityBundles {
bundle_json: string;
bundle_key: string;
created_at: number;
generated_at: string;
reason: string;
}
export interface ExecApprovalsConfig {
agent_count: number;
allowlist_count: number;
auto_allow_skills: number | null;
config_key: string;
default_ask: string | null;
default_ask_fallback: string | null;
default_security: string | null;
has_socket_token: number;
raw_json: string;
socket_path: string | null;
updated_at_ms: number;
}
export interface FlowRuns {
blocked_summary: string | null;
blocked_task_id: string | null;
cancel_requested_at: number | null;
controller_id: string | null;
created_at: number;
current_step: string | null;
ended_at: number | null;
flow_id: string;
goal: string;
notify_policy: string;
owner_key: string;
requester_origin_json: string | null;
revision: Generated<number>;
shape: string | null;
state_json: string | null;
status: string;
sync_mode: Generated<string>;
updated_at: number;
wait_json: string | null;
}
export interface GatewayRestartHandoff {
created_at: number;
expires_at: number;
handoff_key: string;
intent_id: string;
kind: string;
pid: number;
process_instance_id: string | null;
reason: string | null;
restart_kind: string;
restart_trace_last_at: number | null;
restart_trace_started_at: number | null;
source: string;
supervisor_mode: string;
updated_at_ms: number;
version: number;
}
export interface GatewayRestartIntent {
created_at: number;
force: number | null;
intent_key: string;
kind: string;
pid: number;
reason: string | null;
updated_at_ms: number;
wait_ms: number | null;
}
export interface GatewayRestartSentinel {
continuation_json: string | null;
delivery_account_id: string | null;
delivery_channel: string | null;
delivery_to: string | null;
doctor_hint: string | null;
kind: string;
message: string | null;
payload_json: string;
sentinel_key: string;
session_key: string | null;
stats_json: string | null;
status: string;
thread_id: string | null;
ts: number;
updated_at_ms: number;
version: number;
}
export interface InstalledPluginIndex {
compat_registry_version: string;
diagnostics_json: string;
generated_at_ms: number;
host_contract_version: string;
index_key: string;
install_records_json: string;
migration_version: number;
plugins_json: string;
policy_hash: string;
refresh_reason: string | null;
updated_at_ms: number;
version: number;
warning: string | null;
}
export interface MacosPortGuardianRecords {
command: string;
mode: string;
pid: Generated<number>;
port: number;
timestamp: number;
}
export interface ManagedOutgoingImageRecords {
alt: string;
attachment_id: string;
created_at: string;
message_id: string | null;
original_content_type: string;
original_filename: string | null;
original_height: number | null;
original_media_id: string;
original_media_subdir: string;
original_size_bytes: number | null;
original_width: number | null;
record_json: string;
retention_class: string | null;
session_key: string;
updated_at: string | null;
}
export interface MediaBlobs {
blob: Uint8Array;
content_type: string | null;
created_at: number;
id: string;
size_bytes: number;
subdir: string;
updated_at: number;
}
export interface MigrationRuns {
finished_at: number | null;
id: string;
report_json: string;
started_at: number;
status: string;
}
export interface MigrationSources {
imported_at: number;
last_run_id: string;
migration_kind: string;
removed_source: Generated<number>;
report_json: string;
source_key: string;
source_path: string;
source_record_count: number | null;
source_sha256: string | null;
source_size_bytes: number | null;
status: string;
target_table: string;
}
export interface ModelCapabilityCache {
context_window: number;
cost_cache_read: number;
cost_cache_write: number;
cost_input: number;
cost_output: number;
input_image: number;
input_text: number;
max_tokens: number;
model_id: string;
name: string;
provider_id: string;
reasoning: number;
supports_tools: number | null;
updated_at_ms: number;
}
export interface NativeHookRelayBridges {
expires_at_ms: number;
hostname: string;
pid: number;
port: number;
relay_id: string;
token: string;
updated_at_ms: number;
}
export interface NodeHostConfig {
config_key: string;
display_name: string | null;
gateway_host: string | null;
gateway_port: number | null;
gateway_tls: number | null;
gateway_tls_fingerprint: string | null;
node_id: string;
token: string | null;
updated_at_ms: number;
version: number;
}
export interface NodePairingPaired {
approved_at_ms: number;
bins_json: string | null;
caps_json: string | null;
client_id: string | null;
client_mode: string | null;
commands_json: string | null;
core_version: string | null;
created_at_ms: number;
device_family: string | null;
display_name: string | null;
last_connected_at_ms: number | null;
last_seen_at_ms: number | null;
last_seen_reason: string | null;
model_identifier: string | null;
node_id: string;
permissions_json: string | null;
platform: string | null;
remote_ip: string | null;
token: string;
ui_version: string | null;
version: string | null;
}
export interface NodePairingPending {
caps_json: string | null;
client_id: string | null;
client_mode: string | null;
commands_json: string | null;
core_version: string | null;
device_family: string | null;
display_name: string | null;
model_identifier: string | null;
node_id: string;
permissions_json: string | null;
platform: string | null;
remote_ip: string | null;
request_id: string;
silent: number | null;
ts: number;
ui_version: string | null;
version: string | null;
}
export interface PluginBindingApprovals {
account_id: string;
approved_at: number;
channel: string;
plugin_id: string;
plugin_name: string | null;
plugin_root: string;
}
export interface PluginBlobEntries {
blob: Uint8Array;
created_at: number;
entry_key: string;
expires_at: number | null;
metadata_json: string;
namespace: string;
plugin_id: string;
}
export interface PluginStateEntries {
created_at: number;
entry_key: string;
expires_at: number | null;
namespace: string;
plugin_id: string;
value_json: string;
}
export interface SandboxRegistryEntries {
backend_id: string | null;
cdp_port: number | null;
config_hash: string | null;
config_label_kind: string | null;
container_name: string;
created_at_ms: number | null;
entry_json: string;
image: string | null;
last_used_at_ms: number | null;
no_vnc_port: number | null;
registry_kind: string;
runtime_label: string | null;
session_key: string | null;
updated_at: number;
}
export interface SchemaMeta {
agent_id: string | null;
app_version: string | null;
created_at: number;
meta_key: string;
role: string;
schema_version: number;
updated_at: number;
}
export interface SkillUploads {
actual_sha256: string | null;
archive_blob: Uint8Array;
committed: number;
committed_at: number | null;
created_at: number;
expires_at: number;
force: number;
idempotency_key_hash: string | null;
kind: string;
received_bytes: number;
sha256: string | null;
size_bytes: number;
slug: string;
upload_id: string;
}
export interface StateLeases {
created_at: number;
expires_at: number | null;
heartbeat_at: number | null;
lease_key: string;
owner: string;
payload_json: string | null;
scope: string;
updated_at: number;
}
export interface SubagentRuns {
accumulated_runtime_ms: number | null;
agent_dir: string | null;
announce_retry_count: number | null;
archive_at_ms: number | null;
child_session_key: string;
cleanup: string;
cleanup_completed_at: number | null;
cleanup_handled: number | null;
completion_announced_at: number | null;
controller_session_key: string | null;
created_at: number;
ended_at: number | null;
ended_hook_emitted_at: number | null;
ended_reason: string | null;
expects_completion_message: number | null;
fallback_frozen_result_captured_at: number | null;
fallback_frozen_result_text: string | null;
frozen_result_captured_at: number | null;
frozen_result_text: string | null;
label: string | null;
last_announce_delivery_error: string | null;
last_announce_retry_at: number | null;
model: string | null;
outcome_json: string | null;
pause_reason: string | null;
payload_json: Generated<string>;
pending_final_delivery: number | null;
pending_final_delivery_attempt_count: number | null;
pending_final_delivery_created_at: number | null;
pending_final_delivery_last_attempt_at: number | null;
pending_final_delivery_last_error: string | null;
pending_final_delivery_payload_json: string | null;
requester_display_key: string;
requester_origin_json: string | null;
requester_session_key: string;
run_id: string;
run_timeout_seconds: number | null;
session_started_at: number | null;
spawn_mode: string | null;
started_at: number | null;
suppress_announce_reason: string | null;
task: string;
task_name: string | null;
wake_on_descendant_settle: number | null;
workspace_dir: string | null;
}
export interface TaskDeliveryState {
last_notified_event_at: number | null;
requester_origin_json: string | null;
task_id: string;
}
export interface TaskRuns {
agent_id: string | null;
child_session_key: string | null;
cleanup_after: number | null;
created_at: number;
delivery_status: string;
ended_at: number | null;
error: string | null;
label: string | null;
last_event_at: number | null;
notify_policy: string;
owner_key: string;
parent_flow_id: string | null;
parent_task_id: string | null;
progress_summary: string | null;
requester_session_key: string | null;
run_id: string | null;
runtime: string;
scope_kind: string;
source_id: string | null;
started_at: number | null;
status: string;
task: string;
task_id: string;
task_kind: string | null;
terminal_outcome: string | null;
terminal_summary: string | null;
}
export interface TuiLastSessions {
scope_key: string;
session_key: string;
updated_at: number;
}
export interface UpdateCheckState {
auto_first_seen_at: string | null;
auto_first_seen_tag: string | null;
auto_first_seen_version: string | null;
auto_install_id: string | null;
auto_last_attempt_at: string | null;
auto_last_attempt_version: string | null;
auto_last_success_at: string | null;
auto_last_success_version: string | null;
last_available_tag: string | null;
last_available_version: string | null;
last_checked_at: string | null;
last_notified_tag: string | null;
last_notified_version: string | null;
state_key: string;
updated_at_ms: number;
}
export interface VoicewakeRoutingConfig {
config_key: string;
default_target_agent_id: string | null;
default_target_mode: string;
default_target_session_key: string | null;
updated_at_ms: number;
version: number;
}
export interface VoicewakeRoutingRoutes {
config_key: string;
position: number;
target_agent_id: string | null;
target_mode: string;
target_session_key: string | null;
trigger: string;
updated_at_ms: number;
}
export interface VoicewakeTriggers {
config_key: string;
position: number;
trigger: string;
updated_at_ms: number;
}
export interface WebPushSubscriptions {
auth: string;
created_at_ms: number;
endpoint: string;
endpoint_hash: string;
p256dh: string;
subscription_id: string;
updated_at_ms: number;
}
export interface WebPushVapidKeys {
key_id: string;
private_key: string;
public_key: string;
subject: string;
updated_at_ms: number;
}
export interface WorkspaceSetupState {
bootstrap_seeded_at: string | null;
setup_completed_at: string | null;
updated_at: number;
version: number;
workspace_key: string;
workspace_path: string;
}
export interface DB {
acp_replay_events: AcpReplayEvents;
acp_replay_sessions: AcpReplaySessions;
agent_databases: AgentDatabases;
agent_model_catalogs: AgentModelCatalogs;
android_notification_recent_packages: AndroidNotificationRecentPackages;
apns_registrations: ApnsRegistrations;
auth_profile_state: AuthProfileState;
auth_profile_stores: AuthProfileStores;
backup_runs: BackupRuns;
capture_blobs: CaptureBlobs;
capture_events: CaptureEvents;
capture_sessions: CaptureSessions;
channel_pairing_allow_entries: ChannelPairingAllowEntries;
channel_pairing_requests: ChannelPairingRequests;
command_log_entries: CommandLogEntries;
commitments: Commitments;
config_health_entries: ConfigHealthEntries;
cron_jobs: CronJobs;
cron_run_logs: CronRunLogs;
current_conversation_bindings: CurrentConversationBindings;
delivery_queue_entries: DeliveryQueueEntries;
device_auth_tokens: DeviceAuthTokens;
device_bootstrap_tokens: DeviceBootstrapTokens;
device_identities: DeviceIdentities;
device_pairing_paired: DevicePairingPaired;
device_pairing_pending: DevicePairingPending;
diagnostic_events: DiagnosticEvents;
diagnostic_stability_bundles: DiagnosticStabilityBundles;
exec_approvals_config: ExecApprovalsConfig;
flow_runs: FlowRuns;
gateway_restart_handoff: GatewayRestartHandoff;
gateway_restart_intent: GatewayRestartIntent;
gateway_restart_sentinel: GatewayRestartSentinel;
installed_plugin_index: InstalledPluginIndex;
macos_port_guardian_records: MacosPortGuardianRecords;
managed_outgoing_image_records: ManagedOutgoingImageRecords;
media_blobs: MediaBlobs;
migration_runs: MigrationRuns;
migration_sources: MigrationSources;
model_capability_cache: ModelCapabilityCache;
native_hook_relay_bridges: NativeHookRelayBridges;
node_host_config: NodeHostConfig;
node_pairing_paired: NodePairingPaired;
node_pairing_pending: NodePairingPending;
plugin_binding_approvals: PluginBindingApprovals;
plugin_blob_entries: PluginBlobEntries;
plugin_state_entries: PluginStateEntries;
sandbox_registry_entries: SandboxRegistryEntries;
schema_meta: SchemaMeta;
skill_uploads: SkillUploads;
state_leases: StateLeases;
subagent_runs: SubagentRuns;
task_delivery_state: TaskDeliveryState;
task_runs: TaskRuns;
tui_last_sessions: TuiLastSessions;
update_check_state: UpdateCheckState;
voicewake_routing_config: VoicewakeRoutingConfig;
voicewake_routing_routes: VoicewakeRoutingRoutes;
voicewake_triggers: VoicewakeTriggers;
web_push_subscriptions: WebPushSubscriptions;
web_push_vapid_keys: WebPushVapidKeys;
workspace_setup_state: WorkspaceSetupState;
}

View File

@@ -0,0 +1,10 @@
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
export function resolveOpenClawStateSqliteDir(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), "state");
}
export function resolveOpenClawStateSqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveOpenClawStateSqliteDir(env), "openclaw.sqlite");
}

View File

@@ -0,0 +1,205 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
executeSqliteQuerySync,
executeSqliteQueryTakeFirstSync,
getNodeSqliteKysely,
} from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { readSqliteNumberPragma } from "../infra/sqlite-pragma.test-support.js";
import type { DB as OpenClawStateKyselyDatabase } from "./openclaw-state-db.generated.js";
import {
closeOpenClawStateDatabaseForTest,
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "./openclaw-state-db.js";
import { resolveOpenClawStateSqlitePath } from "./openclaw-state-db.paths.js";
import {
collectSqliteSchemaShape,
createSqliteSchemaShapeFromSql,
} from "./sqlite-schema-shape.test-support.js";
type StateDbTestDatabase = Pick<OpenClawStateKyselyDatabase, "diagnostic_events" | "schema_meta">;
function createTempStateDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-state-db-"));
}
afterEach(() => {
closeOpenClawStateDatabaseForTest();
});
describe("openclaw state database", () => {
it("resolves under the shared state database directory", () => {
const stateDir = createTempStateDir();
expect(resolveOpenClawStateSqlitePath({ OPENCLAW_STATE_DIR: stateDir })).toBe(
path.join(stateDir, "state", "openclaw.sqlite"),
);
});
it("creates the shared state schema from the committed SQL shape", () => {
const stateDir = createTempStateDir();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(collectSqliteSchemaShape(database.db)).toEqual(
createSqliteSchemaShapeFromSql(new URL("./openclaw-state-schema.sql", import.meta.url)),
);
expect(database.path).toBe(path.join(stateDir, "state", "openclaw.sqlite"));
});
it("configures durable SQLite connection pragmas", () => {
const stateDir = createTempStateDir();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(readSqliteNumberPragma(database.db, "busy_timeout")).toBe(30_000);
expect(readSqliteNumberPragma(database.db, "foreign_keys")).toBe(1);
expect(readSqliteNumberPragma(database.db, "synchronous")).toBe(1);
expect(readSqliteNumberPragma(database.db, "user_version")).toBe(1);
expect(readSqliteNumberPragma(database.db, "wal_autocheckpoint")).toBe(1000);
const journalMode = database.db.prepare("PRAGMA journal_mode").get() as
| { journal_mode?: string }
| undefined;
expect(journalMode?.journal_mode?.toLowerCase()).toBe("wal");
});
it("records durable schema metadata", () => {
const stateDir = createTempStateDir();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
const stateDb = getNodeSqliteKysely<StateDbTestDatabase>(database.db);
expect(
executeSqliteQueryTakeFirstSync(
database.db,
stateDb.selectFrom("schema_meta").select(["role", "schema_version"]),
),
).toEqual({ role: "global", schema_version: 1 });
});
it("refuses to open newer global schema versions", () => {
const stateDir = createTempStateDir();
const databasePath = path.join(stateDir, "state", "openclaw.sqlite");
fs.mkdirSync(path.dirname(databasePath), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(databasePath);
db.exec("PRAGMA user_version = 2;");
db.close();
expect(() =>
openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
}),
).toThrow(/newer schema version 2/);
});
it("does not chmod shared parent directories for explicit database paths", () => {
const databasePath = path.join(
os.tmpdir(),
`openclaw-explicit-state-${process.pid}-${Date.now()}.sqlite`,
);
expect(() => openOpenClawStateDatabase({ path: databasePath })).not.toThrow();
expect(fs.existsSync(databasePath)).toBe(true);
});
it("keeps cached handles open when another state path is opened", () => {
const firstPath = path.join(
createTempStateDir(),
"state",
`first-${process.pid}-${Date.now()}.sqlite`,
);
const secondPath = path.join(
createTempStateDir(),
"state",
`second-${process.pid}-${Date.now()}.sqlite`,
);
const first = openOpenClawStateDatabase({ path: firstPath });
const second = openOpenClawStateDatabase({ path: secondPath });
expect(first.db.isOpen).toBe(true);
expect(second.db.isOpen).toBe(true);
expect(openOpenClawStateDatabase({ path: firstPath })).toBe(first);
expect(readSqliteNumberPragma(first.db, "user_version")).toBe(1);
});
it("uses savepoints for nested write transaction rollback", () => {
const stateDir = createTempStateDir();
const options = { env: { OPENCLAW_STATE_DIR: stateDir } };
runOpenClawStateWriteTransaction((database) => {
const stateDb = getNodeSqliteKysely<StateDbTestDatabase>(database.db);
executeSqliteQuerySync(
database.db,
stateDb.insertInto("diagnostic_events").values({
scope: "transaction-test",
event_key: "outer",
payload_json: "{}",
created_at: 1,
}),
);
expect(() =>
runOpenClawStateWriteTransaction((inner) => {
const innerDb = getNodeSqliteKysely<StateDbTestDatabase>(inner.db);
executeSqliteQuerySync(
inner.db,
innerDb.insertInto("diagnostic_events").values({
scope: "transaction-test",
event_key: "inner",
payload_json: "{}",
created_at: 2,
}),
);
throw new Error("rollback nested");
}, options),
).toThrow("rollback nested");
}, options);
const database = openOpenClawStateDatabase(options);
const stateDb = getNodeSqliteKysely<StateDbTestDatabase>(database.db);
expect(
executeSqliteQuerySync(
database.db,
stateDb
.selectFrom("diagnostic_events")
.select("event_key")
.where("scope", "=", "transaction-test")
.orderBy("event_key"),
).rows.map((row) => row.event_key),
).toEqual(["outer"]);
});
it("rejects Promise-returning write transactions", () => {
const stateDir = createTempStateDir();
const options = { env: { OPENCLAW_STATE_DIR: stateDir } };
expect(() =>
runOpenClawStateWriteTransaction(async () => {
return "not sync";
}, options),
).toThrow("must be synchronous");
expect(() =>
runOpenClawStateWriteTransaction((database) => {
const stateDb = getNodeSqliteKysely<StateDbTestDatabase>(database.db);
executeSqliteQuerySync(
database.db,
stateDb.insertInto("diagnostic_events").values({
scope: "transaction-test",
event_key: "after",
payload_json: "{}",
created_at: 3,
}),
);
}, options),
).not.toThrow();
});
});

View File

@@ -0,0 +1,317 @@
import { randomUUID } from "node:crypto";
import { chmodSync, existsSync, mkdirSync } from "node:fs";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import {
clearNodeSqliteKyselyCacheForDatabase,
executeSqliteQuerySync,
getNodeSqliteKysely,
} from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { runSqliteImmediateTransactionSync } from "../infra/sqlite-transaction.js";
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
import type { DB as OpenClawStateKyselyDatabase } from "./openclaw-state-db.generated.js";
import {
resolveOpenClawStateSqliteDir,
resolveOpenClawStateSqlitePath,
} from "./openclaw-state-db.paths.js";
import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js";
const OPENCLAW_STATE_SCHEMA_VERSION = 1;
export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000;
const OPENCLAW_STATE_DIR_MODE = 0o700;
const OPENCLAW_STATE_FILE_MODE = 0o600;
const OPENCLAW_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
export type OpenClawStateDatabase = {
db: DatabaseSync;
path: string;
walMaintenance: SqliteWalMaintenance;
};
export type OpenClawStateDatabaseOptions = {
env?: NodeJS.ProcessEnv;
path?: string;
};
export type OpenClawMigrationRunStatus = "completed" | "warning" | "failed";
export type OpenClawBackupRunStatus = "completed" | "failed";
export type RecordOpenClawStateMigrationRunOptions = OpenClawStateDatabaseOptions & {
id?: string;
startedAt: number;
finishedAt?: number;
status: OpenClawMigrationRunStatus;
report: Record<string, unknown>;
};
export type RecordOpenClawStateMigrationSourceOptions = OpenClawStateDatabaseOptions & {
runId: string;
migrationKind: string;
sourceKey: string;
sourcePath: string;
targetTable: string;
status: OpenClawMigrationRunStatus;
importedAt: number;
removedSource: boolean;
sourceSha256?: string;
sourceSizeBytes?: number;
sourceRecordCount?: number;
report: Record<string, unknown>;
};
export type RecordOpenClawStateBackupRunOptions = OpenClawStateDatabaseOptions & {
id?: string;
createdAt: number;
archivePath: string;
status: OpenClawBackupRunStatus;
manifest: Record<string, unknown>;
};
const cachedDatabases = new Map<string, OpenClawStateDatabase>();
type OpenClawStateMetadataDatabase = Pick<
OpenClawStateKyselyDatabase,
"backup_runs" | "migration_runs" | "migration_sources" | "schema_meta"
>;
function readSqliteUserVersion(db: DatabaseSync): number {
const row = db.prepare("PRAGMA user_version").get() as { user_version?: unknown } | undefined;
return Number(row?.user_version ?? 0);
}
function assertSupportedSchemaVersion(db: DatabaseSync, pathname: string): void {
const userVersion = readSqliteUserVersion(db);
if (userVersion > OPENCLAW_STATE_SCHEMA_VERSION) {
throw new Error(
`OpenClaw state database ${pathname} uses newer schema version ${userVersion}; this OpenClaw build supports ${OPENCLAW_STATE_SCHEMA_VERSION}.`,
);
}
}
function ensureOpenClawStatePermissions(pathname: string, env: NodeJS.ProcessEnv): void {
const dir = path.dirname(pathname);
const defaultDir = resolveOpenClawStateSqliteDir(env);
const isDefaultStateDatabase =
path.resolve(pathname) === path.resolve(resolveOpenClawStateSqlitePath(env));
if (isDefaultStateDatabase && dir !== defaultDir) {
throw new Error(`OpenClaw state database path resolved outside its state dir: ${pathname}`);
}
const dirExisted = existsSync(dir);
mkdirSync(dir, { recursive: true, mode: OPENCLAW_STATE_DIR_MODE });
if (isDefaultStateDatabase || !dirExisted) {
chmodSync(dir, OPENCLAW_STATE_DIR_MODE);
}
for (const suffix of OPENCLAW_STATE_SIDECAR_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (existsSync(candidate)) {
chmodSync(candidate, OPENCLAW_STATE_FILE_MODE);
}
}
}
function tableHasColumn(db: DatabaseSync, tableName: string, columnName: string): boolean {
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: unknown }>;
return rows.some((row) => row.name === columnName);
}
function ensureColumn(db: DatabaseSync, tableName: string, columnSql: string): void {
const columnName = columnSql.trim().split(/\s+/, 1)[0];
if (!columnName || tableHasColumn(db, tableName, columnName)) {
return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnSql};`);
}
function ensureAdditiveStateColumns(db: DatabaseSync): void {
ensureColumn(db, "node_pairing_pending", "client_id TEXT");
ensureColumn(db, "node_pairing_pending", "client_mode TEXT");
ensureColumn(db, "node_pairing_paired", "client_id TEXT");
ensureColumn(db, "node_pairing_paired", "client_mode TEXT");
}
function ensureSchema(db: DatabaseSync, pathname: string): void {
assertSupportedSchemaVersion(db, pathname);
db.exec(OPENCLAW_STATE_SCHEMA_SQL);
ensureAdditiveStateColumns(db);
db.exec(`PRAGMA user_version = ${OPENCLAW_STATE_SCHEMA_VERSION};`);
const now = Date.now();
const kysely = getNodeSqliteKysely<OpenClawStateMetadataDatabase>(db);
executeSqliteQuerySync(
db,
kysely
.insertInto("schema_meta")
.values({
meta_key: "primary",
role: "global",
schema_version: OPENCLAW_STATE_SCHEMA_VERSION,
agent_id: null,
app_version: null,
created_at: now,
updated_at: now,
})
.onConflict((conflict) =>
conflict.column("meta_key").doUpdateSet({
role: "global",
schema_version: OPENCLAW_STATE_SCHEMA_VERSION,
agent_id: null,
app_version: null,
updated_at: now,
}),
),
);
}
function resolveDatabasePath(options: OpenClawStateDatabaseOptions = {}): string {
return options.path ?? resolveOpenClawStateSqlitePath(options.env ?? process.env);
}
export function openOpenClawStateDatabase(
options: OpenClawStateDatabaseOptions = {},
): OpenClawStateDatabase {
const env = options.env ?? process.env;
const pathname = resolveDatabasePath(options);
const cached = cachedDatabases.get(pathname);
if (cached?.db.isOpen) {
return cached;
}
if (cached) {
cached.walMaintenance.close();
clearNodeSqliteKyselyCacheForDatabase(cached.db);
cachedDatabases.delete(pathname);
}
ensureOpenClawStatePermissions(pathname, env);
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(pathname);
const walMaintenance = configureSqliteWalMaintenance(db, {
databaseLabel: "openclaw-state",
databasePath: pathname,
});
db.exec("PRAGMA synchronous = NORMAL;");
db.exec(`PRAGMA busy_timeout = ${OPENCLAW_SQLITE_BUSY_TIMEOUT_MS};`);
db.exec("PRAGMA foreign_keys = ON;");
try {
ensureSchema(db, pathname);
} catch (err) {
walMaintenance.close();
db.close();
throw err;
}
ensureOpenClawStatePermissions(pathname, env);
const database = { db, path: pathname, walMaintenance };
cachedDatabases.set(pathname, database);
return database;
}
export function runOpenClawStateWriteTransaction<T>(
operation: (database: OpenClawStateDatabase) => T,
options: OpenClawStateDatabaseOptions = {},
): T {
const database = openOpenClawStateDatabase(options);
const result = runSqliteImmediateTransactionSync(database.db, () => operation(database));
try {
ensureOpenClawStatePermissions(database.path, options.env ?? process.env);
} catch {
// The write already committed; permission hardening is best-effort here so
// callers never retry an operation that is durable in SQLite.
}
return result;
}
export function recordOpenClawStateMigrationRun(
options: RecordOpenClawStateMigrationRunOptions,
): string {
const id = options.id ?? randomUUID();
runOpenClawStateWriteTransaction((database) => {
const db = getNodeSqliteKysely<OpenClawStateMetadataDatabase>(database.db);
executeSqliteQuerySync(
database.db,
db.insertInto("migration_runs").values({
id,
started_at: options.startedAt,
finished_at: options.finishedAt ?? null,
status: options.status,
report_json: JSON.stringify(options.report),
}),
);
}, options);
return id;
}
export function recordOpenClawStateMigrationSource(
options: RecordOpenClawStateMigrationSourceOptions,
): void {
runOpenClawStateWriteTransaction((database) => {
const db = getNodeSqliteKysely<OpenClawStateMetadataDatabase>(database.db);
executeSqliteQuerySync(
database.db,
db
.insertInto("migration_sources")
.values({
source_key: options.sourceKey,
migration_kind: options.migrationKind,
source_path: options.sourcePath,
target_table: options.targetTable,
source_sha256: options.sourceSha256 ?? null,
source_size_bytes: options.sourceSizeBytes ?? null,
source_record_count: options.sourceRecordCount ?? null,
last_run_id: options.runId,
status: options.status,
imported_at: options.importedAt,
removed_source: options.removedSource ? 1 : 0,
report_json: JSON.stringify(options.report),
})
.onConflict((conflict) =>
conflict.column("source_key").doUpdateSet({
migration_kind: (eb) => eb.ref("excluded.migration_kind"),
source_path: (eb) => eb.ref("excluded.source_path"),
target_table: (eb) => eb.ref("excluded.target_table"),
source_sha256: (eb) => eb.ref("excluded.source_sha256"),
source_size_bytes: (eb) => eb.ref("excluded.source_size_bytes"),
source_record_count: (eb) => eb.ref("excluded.source_record_count"),
last_run_id: (eb) => eb.ref("excluded.last_run_id"),
status: (eb) => eb.ref("excluded.status"),
imported_at: (eb) => eb.ref("excluded.imported_at"),
removed_source: (eb) => eb.ref("excluded.removed_source"),
report_json: (eb) => eb.ref("excluded.report_json"),
}),
),
);
}, options);
}
export function recordOpenClawStateBackupRun(options: RecordOpenClawStateBackupRunOptions): string {
const id = options.id ?? randomUUID();
runOpenClawStateWriteTransaction((database) => {
const db = getNodeSqliteKysely<OpenClawStateMetadataDatabase>(database.db);
executeSqliteQuerySync(
database.db,
db.insertInto("backup_runs").values({
id,
created_at: options.createdAt,
archive_path: options.archivePath,
status: options.status,
manifest_json: JSON.stringify(options.manifest),
}),
);
}, options);
return id;
}
export function closeOpenClawStateDatabase(): void {
for (const database of cachedDatabases.values()) {
database.walMaintenance.close();
clearNodeSqliteKyselyCacheForDatabase(database.db);
if (database.db.isOpen) {
database.db.close();
}
}
cachedDatabases.clear();
}
export function isOpenClawStateDatabaseOpen(): boolean {
return Array.from(cachedDatabases.values()).some((database) => database.db.isOpen);
}
export const closeOpenClawStateDatabaseForTest = closeOpenClawStateDatabase;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,106 @@
import { readFileSync } from "node:fs";
import { DatabaseSync } from "node:sqlite";
type ColumnShape = {
name: string;
type: string;
notnull: number;
dflt_value: unknown;
pk: number;
};
type IndexShape = {
name: string;
unique: number;
origin: string;
partial: number;
};
export type SqliteSchemaShape = Record<
string,
{
columns: ColumnShape[];
indexes: IndexShape[];
}
>;
type TableInfoRow = ColumnShape & {
cid: number;
};
type IndexListRow = IndexShape & {
seq: number;
};
type SqliteMasterRow = {
name: string;
};
export function createSqliteSchemaShapeFromSql(schemaUrl: URL): SqliteSchemaShape {
const db = new DatabaseSync(":memory:");
try {
db.exec(readFileSync(schemaUrl, "utf8"));
return collectSqliteSchemaShape(db);
} finally {
db.close();
}
}
export function collectSqliteSchemaShape(db: DatabaseSync): SqliteSchemaShape {
const tableRows = db
.prepare(
`
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
ORDER BY name ASC
`,
)
.all() as SqliteMasterRow[];
return Object.fromEntries(
tableRows.map((table) => [
table.name,
{
columns: collectColumns(db, table.name),
indexes: collectIndexes(db, table.name),
},
]),
);
}
function collectColumns(db: DatabaseSync, tableName: string): ColumnShape[] {
return (
db.prepare(`PRAGMA table_info(${quoteSqliteIdentifier(tableName)})`).all() as TableInfoRow[]
)
.map(({ name, type, notnull, dflt_value, pk }) => ({
name,
type,
notnull,
dflt_value,
pk,
}))
.toSorted((left, right) => left.name.localeCompare(right.name));
}
function collectIndexes(db: DatabaseSync, tableName: string): IndexShape[] {
return (
db.prepare(`PRAGMA index_list(${quoteSqliteIdentifier(tableName)})`).all() as IndexListRow[]
)
.map(({ name, unique, origin, partial }) => ({
name: normalizeAutoIndexName(name),
unique,
origin,
partial,
}))
.toSorted((left, right) => left.name.localeCompare(right.name));
}
function normalizeAutoIndexName(name: string): string {
return name.startsWith("sqlite_autoindex_") ? "sqlite_autoindex" : name;
}
function quoteSqliteIdentifier(identifier: string): string {
return `"${identifier.replaceAll('"', '""')}"`;
}