clawdbot-587: align sqlite session schema

This commit is contained in:
Josh Lehman
2026-06-03 07:46:26 -07:00
parent 5a98fb41d1
commit 978004252d
7 changed files with 931 additions and 22 deletions

View File

@@ -2,8 +2,13 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { executeSqliteQueryTakeFirstSync, getNodeSqliteKysely } from "../../infra/kysely-sync.js";
import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js";
import type { DB as OpenClawAgentKyselyDatabase } from "../../state/openclaw-agent-db.generated.js";
import {
closeOpenClawAgentDatabasesForTest,
openOpenClawAgentDatabase,
} from "../../state/openclaw-agent-db.js";
import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db.js";
import {
appendTranscriptEvent,
@@ -419,6 +424,28 @@ describe.each([fileBackedAdapter, sqliteAdapter])(
});
if (adapter.name === "sqlite") {
expect(fs.existsSync(cleanupStorePath)).toBe(false);
const database = openOpenClawAgentDatabase({
agentId: "main",
env: { ...process.env, OPENCLAW_STATE_DIR: paths.stateDir },
path: path.join(paths.stateDir, "agents", "main", "agent", "openclaw-agent.sqlite"),
});
const db = getNodeSqliteKysely<OpenClawAgentKyselyDatabase>(database.db);
const removedRoute = executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("session_routes")
.select("session_id")
.where("session_key", "=", "agent:main:lifecycle-cleanup-removed"),
);
expect(removedRoute).toBeUndefined();
const freshRoute = executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("session_routes")
.select("session_id")
.where("session_key", "=", "agent:main:lifecycle-cleanup-fresh"),
);
expect(freshRoute).toEqual({ session_id: "fresh-lifecycle" });
await expect(
adapter.loadTranscriptEvents(scopedTranscript("agent:main:regular", "referenced")),
).resolves.not.toEqual([]);
@@ -727,3 +754,119 @@ describe.each([fileBackedAdapter, sqliteAdapter])(
});
},
);
describe("sqlite session normalization", () => {
let paths: TestPaths;
beforeEach(() => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-sqlite-norm-"));
paths = {
sqlitePath: path.join(tempDir, "openclaw-agent.sqlite"),
stateDir: path.join(tempDir, "state"),
storePath: path.join(tempDir, "sessions.json"),
tempDir,
transcriptPath: path.join(tempDir, "session.jsonl"),
};
});
afterEach(() => {
closeOpenClawAgentDatabasesForTest();
closeOpenClawStateDatabaseForTest();
fs.rmSync(paths.tempDir, { recursive: true, force: true });
});
it("maintains normalized session root and route rows", async () => {
const env = { ...process.env, OPENCLAW_STATE_DIR: paths.stateDir };
await upsertSqliteSessionEntry(
{
agentId: "main",
env,
sessionKey: "agent:main:group:example",
storePath: paths.sqlitePath,
},
{
agentHarnessId: "codex",
chatType: "group",
channel: "discord",
deliveryContext: {
accountId: "acct-1",
channel: "discord",
threadId: "thread-1",
to: "group-1",
},
displayName: "Example group",
endedAt: 90,
model: "gpt-5.5",
modelProvider: "openai",
parentSessionKey: "agent:main:parent",
sessionId: "normalized-session",
sessionStartedAt: 50,
spawnedBy: "agent:main:spawner",
startedAt: 60,
status: "done",
updatedAt: 100,
},
);
const database = openOpenClawAgentDatabase({
agentId: "main",
env,
path: paths.sqlitePath,
});
const db = getNodeSqliteKysely<OpenClawAgentKyselyDatabase>(database.db);
const session = executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("sessions")
.select([
"account_id",
"agent_harness_id",
"channel",
"chat_type",
"created_at",
"display_name",
"ended_at",
"model",
"model_provider",
"parent_session_key",
"session_key",
"session_scope",
"spawned_by",
"started_at",
"status",
"updated_at",
])
.where("session_id", "=", "normalized-session"),
);
expect(session).toEqual({
account_id: "acct-1",
agent_harness_id: "codex",
channel: "discord",
chat_type: "group",
created_at: 50,
display_name: "Example group",
ended_at: 90,
model: "gpt-5.5",
model_provider: "openai",
parent_session_key: "agent:main:parent",
session_key: "agent:main:group:example",
session_scope: "group",
spawned_by: "agent:main:spawner",
started_at: 60,
status: "done",
updated_at: expect.any(Number),
});
const route = executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("session_routes")
.select(["session_id", "updated_at"])
.where("session_key", "=", "agent:main:group:example"),
);
expect(route).toEqual({
session_id: "normalized-session",
updated_at: expect.any(Number),
});
});
});

View File

@@ -45,7 +45,13 @@ import { mergeSessionEntry, mergeSessionEntryPreserveActivity } from "./types.js
type SessionSqliteDatabase = Pick<
OpenClawAgentKyselyDatabase,
"session_entries" | "sessions" | "transcript_event_identities" | "transcript_events"
| "conversations"
| "session_conversations"
| "session_entries"
| "session_routes"
| "sessions"
| "transcript_event_identities"
| "transcript_events"
>;
type SessionEntryRow = Selectable<OpenClawAgentKyselyDatabase["session_entries"]>;
@@ -525,6 +531,17 @@ function cloneSessionEntry(entry: SessionEntry): SessionEntry {
return structuredClone(entry);
}
function normalizeSqliteText(value: unknown): string | null {
return typeof value === "string" && value.trim() ? value.trim() : null;
}
function normalizeSqliteChatType(value: unknown): "direct" | "group" | "channel" | null {
if (value === "direct" || value === "group" || value === "channel") {
return value;
}
return null;
}
function normalizeSqliteNumber(value: number | bigint): number {
return typeof value === "bigint" ? Number(value) : value;
}
@@ -725,6 +742,10 @@ function cleanupSqliteSessionLifecycleArtifactsInTransaction(
) {
continue;
}
executeSqliteQuerySync(
database.db,
db.deleteFrom("session_routes").where("session_key", "=", row.session_key),
);
executeSqliteQuerySync(
database.db,
db.deleteFrom("session_entries").where("session_key", "=", row.session_key),
@@ -759,23 +780,37 @@ function writeSessionEntry(
): void {
const db = getSessionKysely(database.db);
const updatedAt = entry.updatedAt;
const sessionRow = bindSqliteSessionRoot({ entry, sessionKey, updatedAt });
executeSqliteQuerySync(
database.db,
db
.insertInto("sessions")
.values({
session_id: entry.sessionId,
session_key: sessionKey,
created_at: entry.sessionStartedAt ?? updatedAt,
updated_at: updatedAt,
})
.values(sessionRow)
.onConflict((conflict) =>
conflict.column("session_id").doUpdateSet({
session_key: sessionKey,
session_scope: sessionRow.session_scope,
updated_at: updatedAt,
started_at: sessionRow.started_at,
ended_at: sessionRow.ended_at,
status: sessionRow.status,
chat_type: sessionRow.chat_type,
channel: sessionRow.channel,
account_id: sessionRow.account_id,
model_provider: sessionRow.model_provider,
model: sessionRow.model,
agent_harness_id: sessionRow.agent_harness_id,
parent_session_key: sessionRow.parent_session_key,
spawned_by: sessionRow.spawned_by,
display_name: sessionRow.display_name,
}),
),
);
writeSessionRoute(database, {
sessionId: sessionRow.session_id,
sessionKey,
updatedAt,
});
executeSqliteQuerySync(
database.db,
db
@@ -809,6 +844,7 @@ function ensureTranscriptSessionRoot(
.values({
session_id: scope.sessionId,
session_key: scope.sessionKey,
session_scope: "conversation",
created_at: updatedAt,
updated_at: updatedAt,
})
@@ -819,6 +855,118 @@ function ensureTranscriptSessionRoot(
}),
),
);
writeSessionRoute(database, {
sessionId: scope.sessionId,
sessionKey: scope.sessionKey,
updatedAt,
});
}
function bindSqliteSessionRoot(params: {
entry: SessionEntry;
sessionKey: string;
updatedAt: number;
}) {
const updatedAt = Number.isFinite(params.entry.updatedAt)
? params.entry.updatedAt
: params.updatedAt;
return {
session_id: params.entry.sessionId,
session_key: params.sessionKey,
session_scope: resolveSqliteSessionScope(params.entry, params.sessionKey),
created_at: resolveSqliteSessionCreatedAt(params.entry, updatedAt),
updated_at: updatedAt,
started_at: finiteSqliteNumber(params.entry.startedAt),
ended_at: finiteSqliteNumber(params.entry.endedAt),
status: normalizeSqliteText(params.entry.status),
chat_type: normalizeSqliteChatType(params.entry.chatType),
channel: resolveSqliteSessionChannel(params.entry),
account_id: resolveSqliteSessionAccountId(params.entry),
primary_conversation_id: null,
model_provider: normalizeSqliteText(params.entry.modelProvider),
model: normalizeSqliteText(params.entry.model),
agent_harness_id: normalizeSqliteText(params.entry.agentHarnessId),
parent_session_key: normalizeSqliteText(params.entry.parentSessionKey),
spawned_by: normalizeSqliteText(params.entry.spawnedBy),
display_name: resolveSqliteSessionDisplayName(params.entry),
};
}
function writeSessionRoute(
database: OpenClawAgentDatabase,
params: { sessionId: string; sessionKey: string; updatedAt: number },
): void {
const db = getSessionKysely(database.db);
executeSqliteQuerySync(
database.db,
db
.insertInto("session_routes")
.values({
session_key: params.sessionKey,
session_id: params.sessionId,
updated_at: params.updatedAt,
})
.onConflict((conflict) =>
conflict.column("session_key").doUpdateSet({
session_id: params.sessionId,
updated_at: params.updatedAt,
}),
),
);
}
function resolveSqliteSessionScope(
entry: Pick<SessionEntry, "chatType">,
sessionKey: string,
): "conversation" | "shared-main" | "group" | "channel" {
const chatType = normalizeSqliteChatType(entry.chatType);
const normalizedKey = sessionKey.trim().toLowerCase();
if (chatType === "direct" && (normalizedKey === "main" || normalizedKey.endsWith(":main"))) {
return "shared-main";
}
if (chatType === "group" || chatType === "channel") {
return chatType;
}
return "conversation";
}
function resolveSqliteSessionCreatedAt(entry: SessionEntry, updatedAt: number): number {
for (const candidate of [entry.sessionStartedAt, entry.startedAt, entry.updatedAt, updatedAt]) {
if (typeof candidate === "number" && Number.isFinite(candidate) && candidate >= 0) {
return candidate;
}
}
return updatedAt;
}
function finiteSqliteNumber(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function resolveSqliteSessionChannel(entry: SessionEntry): string | null {
return (
normalizeSqliteText(entry.channel) ??
normalizeSqliteText(entry.deliveryContext?.channel) ??
normalizeSqliteText(entry.lastChannel) ??
normalizeSqliteText(entry.origin?.provider)
);
}
function resolveSqliteSessionAccountId(entry: SessionEntry): string | null {
return (
normalizeSqliteText(entry.deliveryContext?.accountId) ??
normalizeSqliteText(entry.lastAccountId) ??
normalizeSqliteText(entry.origin?.accountId)
);
}
function resolveSqliteSessionDisplayName(entry: SessionEntry): string | null {
return (
normalizeSqliteText(entry.displayName) ??
normalizeSqliteText(entry.label) ??
normalizeSqliteText(entry.subject) ??
normalizeSqliteText(entry.groupId)
);
}
function readNextTranscriptSeq(database: OpenClawAgentDatabase, sessionId: string): number {

View File

@@ -31,6 +31,22 @@ export interface CacheEntries {
value_json: string | null;
}
export interface Conversations {
account_id: string;
channel: string;
conversation_id: string;
created_at: number;
kind: string;
label: string | null;
metadata_json: string | null;
native_channel_id: string | null;
native_direct_user_id: string | null;
parent_conversation_id: string | null;
peer_id: string;
thread_id: string | null;
updated_at: number;
}
export interface SchemaMeta {
agent_id: string | null;
app_version: string | null;
@@ -41,6 +57,14 @@ export interface SchemaMeta {
updated_at: number;
}
export interface SessionConversations {
conversation_id: string;
first_seen_at: number;
last_seen_at: number;
role: Generated<string>;
session_id: string;
}
export interface SessionEntries {
entry_json: string;
session_id: string;
@@ -48,13 +72,33 @@ export interface SessionEntries {
updated_at: number;
}
export interface Sessions {
created_at: number;
export interface SessionRoutes {
session_id: string;
session_key: string;
updated_at: number;
}
export interface Sessions {
account_id: string | null;
agent_harness_id: string | null;
channel: string | null;
chat_type: string | null;
created_at: number;
display_name: string | null;
ended_at: number | null;
model: string | null;
model_provider: string | null;
parent_session_key: string | null;
primary_conversation_id: string | null;
session_id: string;
session_key: string;
session_scope: Generated<string>;
spawned_by: string | null;
started_at: number | null;
status: string | null;
updated_at: number;
}
export interface TranscriptEventIdentities {
created_at: number;
event_id: string;
@@ -76,8 +120,11 @@ export interface DB {
auth_profile_state: AuthProfileState;
auth_profile_store: AuthProfileStore;
cache_entries: CacheEntries;
conversations: Conversations;
schema_meta: SchemaMeta;
session_conversations: SessionConversations;
session_entries: SessionEntries;
session_routes: SessionRoutes;
sessions: Sessions;
transcript_event_identities: TranscriptEventIdentities;
transcript_events: TranscriptEvents;

View File

@@ -88,7 +88,7 @@ describe("openclaw agent database", () => {
expect(registered).toMatchObject({
agentId: "worker-1",
path: database.path,
schemaVersion: 1,
schemaVersion: 2,
});
expect(registered?.sizeBytes).toBeGreaterThan(0);
});
@@ -194,7 +194,7 @@ describe("openclaw agent database", () => {
expect(readSqliteNumberPragma(database.db, "busy_timeout")).toBe(30_000);
expect(readSqliteNumberPragma(database.db, "foreign_keys")).toBe(1);
expect(readSqliteNumberPragma(database.db, "synchronous")).toBe(1);
expect(readSqliteNumberPragma(database.db, "user_version")).toBe(1);
expect(readSqliteNumberPragma(database.db, "user_version")).toBe(2);
expect(readSqliteNumberPragma(database.db, "wal_autocheckpoint")).toBe(1000);
const journalMode = database.db.prepare("PRAGMA journal_mode").get() as
| { journal_mode?: string }
@@ -217,11 +217,125 @@ describe("openclaw agent database", () => {
),
).toEqual({
role: "agent",
schema_version: 1,
schema_version: 2,
agent_id: "worker-1",
});
});
it("migrates compact v1 session tables before applying normalized indexes", () => {
const stateDir = createTempStateDir();
const databasePath = path.join(
stateDir,
"agents",
"worker-1",
"agent",
"openclaw-agent.sqlite",
);
fs.mkdirSync(path.dirname(databasePath), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(databasePath);
db.exec(`
CREATE TABLE schema_meta (
meta_key TEXT NOT NULL PRIMARY KEY,
role TEXT NOT NULL,
schema_version INTEGER NOT NULL,
agent_id TEXT,
app_version TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
INSERT INTO schema_meta
(meta_key, role, schema_version, agent_id, app_version, created_at, updated_at)
VALUES ('primary', 'agent', 1, 'worker-1', NULL, 1, 1);
CREATE TABLE sessions (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
INSERT INTO sessions (session_id, session_key, created_at, updated_at)
VALUES ('session-1', 'agent:worker-1:main', 10, 20);
CREATE TABLE session_entries (
session_key TEXT NOT NULL PRIMARY KEY,
session_id TEXT NOT NULL,
entry_json TEXT NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
);
INSERT INTO session_entries (session_key, session_id, entry_json, updated_at)
VALUES (
'agent:worker-1:group:example',
'session-1',
'{"sessionId":"session-1","updatedAt":20,"startedAt":11,"endedAt":19,"status":"done","chatType":"group","channel":"discord","deliveryContext":{"accountId":"acct-1"},"modelProvider":"openai","model":"gpt-5.5","agentHarnessId":"codex","parentSessionKey":"agent:worker-1:parent","spawnedBy":"agent:worker-1:spawner","displayName":"Example group"}',
20
);
PRAGMA user_version = 1;
`);
db.close();
const database = openOpenClawAgentDatabase({
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(readSqliteNumberPragma(database.db, "user_version")).toBe(2);
const session = database.db
.prepare(
`
SELECT
account_id,
agent_harness_id,
channel,
chat_type,
display_name,
ended_at,
model,
model_provider,
parent_session_key,
session_scope,
spawned_by,
started_at,
status
FROM sessions
WHERE session_id = ?
`,
)
.get("session-1");
expect(session).toEqual({
account_id: "acct-1",
agent_harness_id: "codex",
channel: "discord",
chat_type: "group",
display_name: "Example group",
ended_at: 19,
model: "gpt-5.5",
model_provider: "openai",
parent_session_key: "agent:worker-1:parent",
session_scope: "group",
spawned_by: "agent:worker-1:spawner",
started_at: 11,
status: "done",
});
const route = database.db
.prepare("SELECT session_id, updated_at FROM session_routes WHERE session_key = ?")
.get("agent:worker-1:group:example");
expect(route).toEqual({
session_id: "session-1",
updated_at: 20,
});
const sessionForeignKeys = database.db.prepare("PRAGMA foreign_key_list(sessions)").all() as
| Array<{ from?: unknown; on_delete?: unknown; table?: unknown; to?: unknown }>
| undefined;
expect(sessionForeignKeys).toContainEqual(
expect.objectContaining({
from: "primary_conversation_id",
on_delete: "SET NULL",
table: "conversations",
to: "conversation_id",
}),
);
});
it("refuses to open newer per-agent schema versions", () => {
const stateDir = createTempStateDir();
const databasePath = path.join(
@@ -234,7 +348,7 @@ describe("openclaw agent database", () => {
fs.mkdirSync(path.dirname(databasePath), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(databasePath);
db.exec("PRAGMA user_version = 2;");
db.exec("PRAGMA user_version = 3;");
db.close();
expect(() =>
@@ -242,6 +356,6 @@ describe("openclaw agent database", () => {
agentId: "worker-1",
env: { OPENCLAW_STATE_DIR: stateDir },
}),
).toThrow(/newer schema version 2/);
).toThrow(/newer schema version 3/);
});
});

View File

@@ -29,7 +29,7 @@ export { resolveOpenClawAgentSqlitePath } from "./openclaw-agent-db.paths.js";
* per pathname, protected with private file modes, and registered in the shared
* OpenClaw state database for discovery and maintenance.
*/
const OPENCLAW_AGENT_SCHEMA_VERSION = 1;
const OPENCLAW_AGENT_SCHEMA_VERSION = 2;
const OPENCLAW_AGENT_DB_DIR_MODE = 0o700;
const OPENCLAW_AGENT_DB_FILE_MODE = 0o600;
const OPENCLAW_AGENT_DB_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
@@ -66,6 +66,8 @@ type ExistingSchemaMeta = {
role: string | null;
};
type MigratedSessionEntry = Record<string, unknown>;
function readSqliteUserVersion(db: DatabaseSync): number {
const row = db.prepare("PRAGMA user_version").get() as { user_version?: unknown } | undefined;
return Number(row?.user_version ?? 0);
@@ -80,6 +82,294 @@ function assertSupportedAgentSchemaVersion(db: DatabaseSync, pathname: string):
}
}
function readSqliteSessionColumns(db: DatabaseSync): Set<string> | null {
const table = db
.prepare("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?")
.get("sessions");
if (!table) {
return null;
}
const rows = db.prepare("PRAGMA table_info(sessions)").all() as Array<{
name?: unknown;
}>;
return new Set(rows.flatMap((row) => (typeof row.name === "string" ? [row.name] : [])));
}
function migratedSessionColumn(
columns: ReadonlySet<string>,
columnName: string,
fallback: string,
): string {
return columns.has(columnName) ? columnName : fallback;
}
function migrateOpenClawAgentSchema(db: DatabaseSync): void {
const userVersion = readSqliteUserVersion(db);
if (userVersion >= OPENCLAW_AGENT_SCHEMA_VERSION) {
return;
}
const columns = readSqliteSessionColumns(db);
if (userVersion > 1 || !columns) {
return;
}
const copyColumns = [
"session_id",
"session_key",
"session_scope",
"created_at",
"updated_at",
"started_at",
"ended_at",
"status",
"chat_type",
"channel",
"account_id",
"primary_conversation_id",
"model_provider",
"model",
"agent_harness_id",
"parent_session_key",
"spawned_by",
"display_name",
];
const selectColumns = [
"session_id",
"session_key",
migratedSessionColumn(columns, "session_scope", "'conversation'"),
"created_at",
"updated_at",
migratedSessionColumn(columns, "started_at", "NULL"),
migratedSessionColumn(columns, "ended_at", "NULL"),
migratedSessionColumn(columns, "status", "NULL"),
migratedSessionColumn(columns, "chat_type", "NULL"),
migratedSessionColumn(columns, "channel", "NULL"),
migratedSessionColumn(columns, "account_id", "NULL"),
migratedSessionColumn(columns, "primary_conversation_id", "NULL"),
migratedSessionColumn(columns, "model_provider", "NULL"),
migratedSessionColumn(columns, "model", "NULL"),
migratedSessionColumn(columns, "agent_harness_id", "NULL"),
migratedSessionColumn(columns, "parent_session_key", "NULL"),
migratedSessionColumn(columns, "spawned_by", "NULL"),
migratedSessionColumn(columns, "display_name", "NULL"),
];
db.exec(`
CREATE TABLE IF NOT EXISTS conversations (
conversation_id TEXT NOT NULL PRIMARY KEY,
channel TEXT NOT NULL,
account_id TEXT NOT NULL,
kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')),
peer_id TEXT NOT NULL,
parent_conversation_id TEXT,
thread_id TEXT,
native_channel_id TEXT,
native_direct_user_id TEXT,
label TEXT,
metadata_json TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
`);
db.exec("PRAGMA foreign_keys = OFF;");
try {
db.exec(`
DROP TABLE IF EXISTS sessions_new;
CREATE TABLE sessions_new (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')),
chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')),
channel TEXT,
account_id TEXT,
primary_conversation_id TEXT,
model_provider TEXT,
model TEXT,
agent_harness_id TEXT,
parent_session_key TEXT,
spawned_by TEXT,
display_name TEXT,
FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL
);
INSERT INTO sessions_new (${copyColumns.join(", ")})
SELECT ${selectColumns.join(", ")} FROM sessions;
DROP TABLE sessions;
ALTER TABLE sessions_new RENAME TO sessions;
`);
} finally {
db.exec("PRAGMA foreign_keys = ON;");
}
}
function parseMigratedSessionEntry(value: unknown): MigratedSessionEntry | null {
if (typeof value !== "string") {
return null;
}
try {
const parsed = JSON.parse(value) as unknown;
return parsed && typeof parsed === "object" && !Array.isArray(parsed)
? (parsed as MigratedSessionEntry)
: null;
} catch {
return null;
}
}
function migratedObjectField(
entry: MigratedSessionEntry,
key: string,
): MigratedSessionEntry | null {
const value = entry[key];
return value && typeof value === "object" && !Array.isArray(value)
? (value as MigratedSessionEntry)
: null;
}
function migratedText(value: unknown): string | null {
return typeof value === "string" && value.trim() ? value.trim() : null;
}
function migratedNumber(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function migratedChatType(value: unknown): "direct" | "group" | "channel" | null {
if (value === "direct" || value === "group" || value === "channel") {
return value;
}
return null;
}
function migratedStatus(
value: unknown,
): "running" | "done" | "failed" | "killed" | "timeout" | null {
if (
value === "running" ||
value === "done" ||
value === "failed" ||
value === "killed" ||
value === "timeout"
) {
return value;
}
return null;
}
function migratedSessionScope(
entry: MigratedSessionEntry,
sessionKey: string,
): "conversation" | "shared-main" | "group" | "channel" {
const chatType = migratedChatType(entry.chatType);
const normalizedKey = sessionKey.trim().toLowerCase();
if (chatType === "direct" && (normalizedKey === "main" || normalizedKey.endsWith(":main"))) {
return "shared-main";
}
if (chatType === "group" || chatType === "channel") {
return chatType;
}
return "conversation";
}
function migratedEntryChannel(entry: MigratedSessionEntry): string | null {
const deliveryContext = migratedObjectField(entry, "deliveryContext");
const origin = migratedObjectField(entry, "origin");
return (
migratedText(entry.channel) ??
migratedText(deliveryContext?.channel) ??
migratedText(entry.lastChannel) ??
migratedText(origin?.provider)
);
}
function migratedEntryAccountId(entry: MigratedSessionEntry): string | null {
const deliveryContext = migratedObjectField(entry, "deliveryContext");
const origin = migratedObjectField(entry, "origin");
return (
migratedText(deliveryContext?.accountId) ??
migratedText(entry.lastAccountId) ??
migratedText(origin?.accountId)
);
}
function migratedEntryDisplayName(entry: MigratedSessionEntry): string | null {
return (
migratedText(entry.displayName) ??
migratedText(entry.label) ??
migratedText(entry.subject) ??
migratedText(entry.groupId)
);
}
function backfillOpenClawAgentSchema(db: DatabaseSync, previousVersion: number): void {
if (previousVersion >= 2) {
return;
}
db.exec(`
INSERT OR REPLACE INTO session_routes (session_key, session_id, updated_at)
SELECT se.session_key, se.session_id, se.updated_at
FROM session_entries AS se
INNER JOIN sessions AS s ON s.session_id = se.session_id;
`);
const rows = db
.prepare(
`
SELECT se.session_key, se.session_id, se.entry_json
FROM session_entries AS se
INNER JOIN sessions AS s ON s.session_id = se.session_id;
`,
)
.all() as Array<{
entry_json?: unknown;
session_id?: unknown;
session_key?: unknown;
}>;
const update = db.prepare(`
UPDATE sessions
SET
session_scope = ?,
started_at = ?,
ended_at = ?,
status = ?,
chat_type = ?,
channel = ?,
account_id = ?,
model_provider = ?,
model = ?,
agent_harness_id = ?,
parent_session_key = ?,
spawned_by = ?,
display_name = ?
WHERE session_id = ?;
`);
for (const row of rows) {
const sessionKey = migratedText(row.session_key);
const sessionId = migratedText(row.session_id);
const entry = parseMigratedSessionEntry(row.entry_json);
if (!sessionKey || !sessionId || !entry) {
continue;
}
update.run(
migratedSessionScope(entry, sessionKey),
migratedNumber(entry.startedAt),
migratedNumber(entry.endedAt),
migratedStatus(entry.status),
migratedChatType(entry.chatType),
migratedEntryChannel(entry),
migratedEntryAccountId(entry),
migratedText(entry.modelProvider),
migratedText(entry.model),
migratedText(entry.agentHarnessId),
migratedText(entry.parentSessionKey),
migratedText(entry.spawnedBy),
migratedEntryDisplayName(entry),
sessionId,
);
}
}
function ensureOpenClawAgentDatabasePermissions(
pathname: string,
options: OpenClawAgentDatabaseOptions,
@@ -150,7 +440,10 @@ function assertExistingSchemaOwner(
function ensureAgentSchema(db: DatabaseSync, agentId: string, pathname: string): void {
assertSupportedAgentSchemaVersion(db, pathname);
assertExistingSchemaOwner(readExistingSchemaMeta(db), agentId, pathname);
const previousVersion = readSqliteUserVersion(db);
migrateOpenClawAgentSchema(db);
db.exec(OPENCLAW_AGENT_SCHEMA_SQL);
backfillOpenClawAgentSchema(db, previousVersion);
const kysely = getNodeSqliteKysely<OpenClawAgentMetadataDatabase>(db);
db.exec(`PRAGMA user_version = ${OPENCLAW_AGENT_SCHEMA_VERSION};`);
const now = Date.now();

View File

@@ -16,12 +16,94 @@ export const OPENCLAW_AGENT_SCHEMA_SQL = `CREATE TABLE IF NOT EXISTS schema_meta
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')),
chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')),
channel TEXT,
account_id TEXT,
primary_conversation_id TEXT,
model_provider TEXT,
model TEXT,
agent_harness_id TEXT,
parent_session_key TEXT,
spawned_by TEXT,
display_name TEXT,
FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated_at
ON sessions(updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_created_at
ON sessions(created_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_conversation
ON sessions(primary_conversation_id, updated_at DESC, session_id)
WHERE primary_conversation_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS session_routes (
session_key TEXT NOT NULL PRIMARY KEY,
session_id TEXT NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_routes_session_id
ON session_routes(session_id);
CREATE TABLE IF NOT EXISTS conversations (
conversation_id TEXT NOT NULL PRIMARY KEY,
channel TEXT NOT NULL,
account_id TEXT NOT NULL,
kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')),
peer_id TEXT NOT NULL,
parent_conversation_id TEXT,
thread_id TEXT,
native_channel_id TEXT,
native_direct_user_id TEXT,
label TEXT,
metadata_json TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated
ON sessions(updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_conversations_lookup
ON conversations(channel, account_id, kind, peer_id, thread_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_conversations_identity
ON conversations(
channel,
account_id,
kind,
peer_id,
IFNULL(parent_conversation_id, ''),
IFNULL(thread_id, '')
);
CREATE INDEX IF NOT EXISTS idx_agent_conversations_updated
ON conversations(updated_at DESC, conversation_id);
CREATE TABLE IF NOT EXISTS session_conversations (
session_id TEXT NOT NULL,
conversation_id TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'primary' CHECK (role IN ('primary', 'participant', 'related')),
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
PRIMARY KEY (session_id, conversation_id, role),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_conversations_conversation
ON session_conversations(conversation_id, last_seen_at DESC, session_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_session_conversations_primary
ON session_conversations(session_id)
WHERE role = 'primary';
CREATE TABLE IF NOT EXISTS session_entries (
session_key TEXT NOT NULL PRIMARY KEY,
@@ -31,7 +113,7 @@ CREATE TABLE IF NOT EXISTS session_entries (
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated_at
ON session_entries(updated_at DESC, session_key);
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_session_id

View File

@@ -11,12 +11,94 @@ CREATE TABLE IF NOT EXISTS schema_meta (
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')),
chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')),
channel TEXT,
account_id TEXT,
primary_conversation_id TEXT,
model_provider TEXT,
model TEXT,
agent_harness_id TEXT,
parent_session_key TEXT,
spawned_by TEXT,
display_name TEXT,
FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated_at
ON sessions(updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_created_at
ON sessions(created_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_conversation
ON sessions(primary_conversation_id, updated_at DESC, session_id)
WHERE primary_conversation_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS session_routes (
session_key TEXT NOT NULL PRIMARY KEY,
session_id TEXT NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_routes_session_id
ON session_routes(session_id);
CREATE TABLE IF NOT EXISTS conversations (
conversation_id TEXT NOT NULL PRIMARY KEY,
channel TEXT NOT NULL,
account_id TEXT NOT NULL,
kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')),
peer_id TEXT NOT NULL,
parent_conversation_id TEXT,
thread_id TEXT,
native_channel_id TEXT,
native_direct_user_id TEXT,
label TEXT,
metadata_json TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated
ON sessions(updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_agent_conversations_lookup
ON conversations(channel, account_id, kind, peer_id, thread_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_conversations_identity
ON conversations(
channel,
account_id,
kind,
peer_id,
IFNULL(parent_conversation_id, ''),
IFNULL(thread_id, '')
);
CREATE INDEX IF NOT EXISTS idx_agent_conversations_updated
ON conversations(updated_at DESC, conversation_id);
CREATE TABLE IF NOT EXISTS session_conversations (
session_id TEXT NOT NULL,
conversation_id TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'primary' CHECK (role IN ('primary', 'participant', 'related')),
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
PRIMARY KEY (session_id, conversation_id, role),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_conversations_conversation
ON session_conversations(conversation_id, last_seen_at DESC, session_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_session_conversations_primary
ON session_conversations(session_id)
WHERE role = 'primary';
CREATE TABLE IF NOT EXISTS session_entries (
session_key TEXT NOT NULL PRIMARY KEY,
@@ -26,7 +108,7 @@ CREATE TABLE IF NOT EXISTS session_entries (
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated_at
ON session_entries(updated_at DESC, session_key);
CREATE INDEX IF NOT EXISTS idx_agent_session_entries_session_id