diff --git a/src/config/sessions/session-accessor.conformance.test.ts b/src/config/sessions/session-accessor.conformance.test.ts index 21609ff39e26..466895d438b1 100644 --- a/src/config/sessions/session-accessor.conformance.test.ts +++ b/src/config/sessions/session-accessor.conformance.test.ts @@ -2,8 +2,13 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { executeSqliteQueryTakeFirstSync, getNodeSqliteKysely } from "../../infra/kysely-sync.js"; import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; -import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; +import type { DB as OpenClawAgentKyselyDatabase } from "../../state/openclaw-agent-db.generated.js"; +import { + closeOpenClawAgentDatabasesForTest, + openOpenClawAgentDatabase, +} from "../../state/openclaw-agent-db.js"; import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db.js"; import { appendTranscriptEvent, @@ -419,6 +424,28 @@ describe.each([fileBackedAdapter, sqliteAdapter])( }); if (adapter.name === "sqlite") { expect(fs.existsSync(cleanupStorePath)).toBe(false); + const database = openOpenClawAgentDatabase({ + agentId: "main", + env: { ...process.env, OPENCLAW_STATE_DIR: paths.stateDir }, + path: path.join(paths.stateDir, "agents", "main", "agent", "openclaw-agent.sqlite"), + }); + const db = getNodeSqliteKysely(database.db); + const removedRoute = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("session_routes") + .select("session_id") + .where("session_key", "=", "agent:main:lifecycle-cleanup-removed"), + ); + expect(removedRoute).toBeUndefined(); + const freshRoute = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("session_routes") + .select("session_id") + .where("session_key", "=", "agent:main:lifecycle-cleanup-fresh"), + ); + expect(freshRoute).toEqual({ session_id: "fresh-lifecycle" }); await expect( adapter.loadTranscriptEvents(scopedTranscript("agent:main:regular", "referenced")), ).resolves.not.toEqual([]); @@ -727,3 +754,119 @@ describe.each([fileBackedAdapter, sqliteAdapter])( }); }, ); + +describe("sqlite session normalization", () => { + let paths: TestPaths; + + beforeEach(() => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-sqlite-norm-")); + paths = { + sqlitePath: path.join(tempDir, "openclaw-agent.sqlite"), + stateDir: path.join(tempDir, "state"), + storePath: path.join(tempDir, "sessions.json"), + tempDir, + transcriptPath: path.join(tempDir, "session.jsonl"), + }; + }); + + afterEach(() => { + closeOpenClawAgentDatabasesForTest(); + closeOpenClawStateDatabaseForTest(); + fs.rmSync(paths.tempDir, { recursive: true, force: true }); + }); + + it("maintains normalized session root and route rows", async () => { + const env = { ...process.env, OPENCLAW_STATE_DIR: paths.stateDir }; + await upsertSqliteSessionEntry( + { + agentId: "main", + env, + sessionKey: "agent:main:group:example", + storePath: paths.sqlitePath, + }, + { + agentHarnessId: "codex", + chatType: "group", + channel: "discord", + deliveryContext: { + accountId: "acct-1", + channel: "discord", + threadId: "thread-1", + to: "group-1", + }, + displayName: "Example group", + endedAt: 90, + model: "gpt-5.5", + modelProvider: "openai", + parentSessionKey: "agent:main:parent", + sessionId: "normalized-session", + sessionStartedAt: 50, + spawnedBy: "agent:main:spawner", + startedAt: 60, + status: "done", + updatedAt: 100, + }, + ); + + const database = openOpenClawAgentDatabase({ + agentId: "main", + env, + path: paths.sqlitePath, + }); + const db = getNodeSqliteKysely(database.db); + const session = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("sessions") + .select([ + "account_id", + "agent_harness_id", + "channel", + "chat_type", + "created_at", + "display_name", + "ended_at", + "model", + "model_provider", + "parent_session_key", + "session_key", + "session_scope", + "spawned_by", + "started_at", + "status", + "updated_at", + ]) + .where("session_id", "=", "normalized-session"), + ); + expect(session).toEqual({ + account_id: "acct-1", + agent_harness_id: "codex", + channel: "discord", + chat_type: "group", + created_at: 50, + display_name: "Example group", + ended_at: 90, + model: "gpt-5.5", + model_provider: "openai", + parent_session_key: "agent:main:parent", + session_key: "agent:main:group:example", + session_scope: "group", + spawned_by: "agent:main:spawner", + started_at: 60, + status: "done", + updated_at: expect.any(Number), + }); + + const route = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("session_routes") + .select(["session_id", "updated_at"]) + .where("session_key", "=", "agent:main:group:example"), + ); + expect(route).toEqual({ + session_id: "normalized-session", + updated_at: expect.any(Number), + }); + }); +}); diff --git a/src/config/sessions/session-accessor.sqlite.ts b/src/config/sessions/session-accessor.sqlite.ts index 5ccf27425894..cf088fa4f1f5 100644 --- a/src/config/sessions/session-accessor.sqlite.ts +++ b/src/config/sessions/session-accessor.sqlite.ts @@ -45,7 +45,13 @@ import { mergeSessionEntry, mergeSessionEntryPreserveActivity } from "./types.js type SessionSqliteDatabase = Pick< OpenClawAgentKyselyDatabase, - "session_entries" | "sessions" | "transcript_event_identities" | "transcript_events" + | "conversations" + | "session_conversations" + | "session_entries" + | "session_routes" + | "sessions" + | "transcript_event_identities" + | "transcript_events" >; type SessionEntryRow = Selectable; @@ -525,6 +531,17 @@ function cloneSessionEntry(entry: SessionEntry): SessionEntry { return structuredClone(entry); } +function normalizeSqliteText(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function normalizeSqliteChatType(value: unknown): "direct" | "group" | "channel" | null { + if (value === "direct" || value === "group" || value === "channel") { + return value; + } + return null; +} + function normalizeSqliteNumber(value: number | bigint): number { return typeof value === "bigint" ? Number(value) : value; } @@ -725,6 +742,10 @@ function cleanupSqliteSessionLifecycleArtifactsInTransaction( ) { continue; } + executeSqliteQuerySync( + database.db, + db.deleteFrom("session_routes").where("session_key", "=", row.session_key), + ); executeSqliteQuerySync( database.db, db.deleteFrom("session_entries").where("session_key", "=", row.session_key), @@ -759,23 +780,37 @@ function writeSessionEntry( ): void { const db = getSessionKysely(database.db); const updatedAt = entry.updatedAt; + const sessionRow = bindSqliteSessionRoot({ entry, sessionKey, updatedAt }); executeSqliteQuerySync( database.db, db .insertInto("sessions") - .values({ - session_id: entry.sessionId, - session_key: sessionKey, - created_at: entry.sessionStartedAt ?? updatedAt, - updated_at: updatedAt, - }) + .values(sessionRow) .onConflict((conflict) => conflict.column("session_id").doUpdateSet({ session_key: sessionKey, + session_scope: sessionRow.session_scope, updated_at: updatedAt, + started_at: sessionRow.started_at, + ended_at: sessionRow.ended_at, + status: sessionRow.status, + chat_type: sessionRow.chat_type, + channel: sessionRow.channel, + account_id: sessionRow.account_id, + model_provider: sessionRow.model_provider, + model: sessionRow.model, + agent_harness_id: sessionRow.agent_harness_id, + parent_session_key: sessionRow.parent_session_key, + spawned_by: sessionRow.spawned_by, + display_name: sessionRow.display_name, }), ), ); + writeSessionRoute(database, { + sessionId: sessionRow.session_id, + sessionKey, + updatedAt, + }); executeSqliteQuerySync( database.db, db @@ -809,6 +844,7 @@ function ensureTranscriptSessionRoot( .values({ session_id: scope.sessionId, session_key: scope.sessionKey, + session_scope: "conversation", created_at: updatedAt, updated_at: updatedAt, }) @@ -819,6 +855,118 @@ function ensureTranscriptSessionRoot( }), ), ); + writeSessionRoute(database, { + sessionId: scope.sessionId, + sessionKey: scope.sessionKey, + updatedAt, + }); +} + +function bindSqliteSessionRoot(params: { + entry: SessionEntry; + sessionKey: string; + updatedAt: number; +}) { + const updatedAt = Number.isFinite(params.entry.updatedAt) + ? params.entry.updatedAt + : params.updatedAt; + return { + session_id: params.entry.sessionId, + session_key: params.sessionKey, + session_scope: resolveSqliteSessionScope(params.entry, params.sessionKey), + created_at: resolveSqliteSessionCreatedAt(params.entry, updatedAt), + updated_at: updatedAt, + started_at: finiteSqliteNumber(params.entry.startedAt), + ended_at: finiteSqliteNumber(params.entry.endedAt), + status: normalizeSqliteText(params.entry.status), + chat_type: normalizeSqliteChatType(params.entry.chatType), + channel: resolveSqliteSessionChannel(params.entry), + account_id: resolveSqliteSessionAccountId(params.entry), + primary_conversation_id: null, + model_provider: normalizeSqliteText(params.entry.modelProvider), + model: normalizeSqliteText(params.entry.model), + agent_harness_id: normalizeSqliteText(params.entry.agentHarnessId), + parent_session_key: normalizeSqliteText(params.entry.parentSessionKey), + spawned_by: normalizeSqliteText(params.entry.spawnedBy), + display_name: resolveSqliteSessionDisplayName(params.entry), + }; +} + +function writeSessionRoute( + database: OpenClawAgentDatabase, + params: { sessionId: string; sessionKey: string; updatedAt: number }, +): void { + const db = getSessionKysely(database.db); + executeSqliteQuerySync( + database.db, + db + .insertInto("session_routes") + .values({ + session_key: params.sessionKey, + session_id: params.sessionId, + updated_at: params.updatedAt, + }) + .onConflict((conflict) => + conflict.column("session_key").doUpdateSet({ + session_id: params.sessionId, + updated_at: params.updatedAt, + }), + ), + ); +} + +function resolveSqliteSessionScope( + entry: Pick, + sessionKey: string, +): "conversation" | "shared-main" | "group" | "channel" { + const chatType = normalizeSqliteChatType(entry.chatType); + const normalizedKey = sessionKey.trim().toLowerCase(); + if (chatType === "direct" && (normalizedKey === "main" || normalizedKey.endsWith(":main"))) { + return "shared-main"; + } + if (chatType === "group" || chatType === "channel") { + return chatType; + } + return "conversation"; +} + +function resolveSqliteSessionCreatedAt(entry: SessionEntry, updatedAt: number): number { + for (const candidate of [entry.sessionStartedAt, entry.startedAt, entry.updatedAt, updatedAt]) { + if (typeof candidate === "number" && Number.isFinite(candidate) && candidate >= 0) { + return candidate; + } + } + return updatedAt; +} + +function finiteSqliteNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function resolveSqliteSessionChannel(entry: SessionEntry): string | null { + return ( + normalizeSqliteText(entry.channel) ?? + normalizeSqliteText(entry.deliveryContext?.channel) ?? + normalizeSqliteText(entry.lastChannel) ?? + normalizeSqliteText(entry.origin?.provider) + ); +} + +function resolveSqliteSessionAccountId(entry: SessionEntry): string | null { + return ( + normalizeSqliteText(entry.deliveryContext?.accountId) ?? + normalizeSqliteText(entry.lastAccountId) ?? + normalizeSqliteText(entry.origin?.accountId) + ); +} + +function resolveSqliteSessionDisplayName(entry: SessionEntry): string | null { + return ( + normalizeSqliteText(entry.displayName) ?? + normalizeSqliteText(entry.label) ?? + normalizeSqliteText(entry.subject) ?? + normalizeSqliteText(entry.groupId) + ); } function readNextTranscriptSeq(database: OpenClawAgentDatabase, sessionId: string): number { diff --git a/src/state/openclaw-agent-db.generated.d.ts b/src/state/openclaw-agent-db.generated.d.ts index 8d0d2ad40c42..e96b06725c04 100644 --- a/src/state/openclaw-agent-db.generated.d.ts +++ b/src/state/openclaw-agent-db.generated.d.ts @@ -31,6 +31,22 @@ export interface CacheEntries { value_json: string | null; } +export interface Conversations { + account_id: string; + channel: string; + conversation_id: string; + created_at: number; + kind: string; + label: string | null; + metadata_json: string | null; + native_channel_id: string | null; + native_direct_user_id: string | null; + parent_conversation_id: string | null; + peer_id: string; + thread_id: string | null; + updated_at: number; +} + export interface SchemaMeta { agent_id: string | null; app_version: string | null; @@ -41,6 +57,14 @@ export interface SchemaMeta { updated_at: number; } +export interface SessionConversations { + conversation_id: string; + first_seen_at: number; + last_seen_at: number; + role: Generated; + session_id: string; +} + export interface SessionEntries { entry_json: string; session_id: string; @@ -48,13 +72,33 @@ export interface SessionEntries { updated_at: number; } -export interface Sessions { - created_at: number; +export interface SessionRoutes { session_id: string; session_key: string; updated_at: number; } +export interface Sessions { + account_id: string | null; + agent_harness_id: string | null; + channel: string | null; + chat_type: string | null; + created_at: number; + display_name: string | null; + ended_at: number | null; + model: string | null; + model_provider: string | null; + parent_session_key: string | null; + primary_conversation_id: string | null; + session_id: string; + session_key: string; + session_scope: Generated; + spawned_by: string | null; + started_at: number | null; + status: string | null; + updated_at: number; +} + export interface TranscriptEventIdentities { created_at: number; event_id: string; @@ -76,8 +120,11 @@ export interface DB { auth_profile_state: AuthProfileState; auth_profile_store: AuthProfileStore; cache_entries: CacheEntries; + conversations: Conversations; schema_meta: SchemaMeta; + session_conversations: SessionConversations; session_entries: SessionEntries; + session_routes: SessionRoutes; sessions: Sessions; transcript_event_identities: TranscriptEventIdentities; transcript_events: TranscriptEvents; diff --git a/src/state/openclaw-agent-db.test.ts b/src/state/openclaw-agent-db.test.ts index b5e8d3a87f1b..f4c122eaaefc 100644 --- a/src/state/openclaw-agent-db.test.ts +++ b/src/state/openclaw-agent-db.test.ts @@ -88,7 +88,7 @@ describe("openclaw agent database", () => { expect(registered).toMatchObject({ agentId: "worker-1", path: database.path, - schemaVersion: 1, + schemaVersion: 2, }); expect(registered?.sizeBytes).toBeGreaterThan(0); }); @@ -194,7 +194,7 @@ describe("openclaw agent database", () => { expect(readSqliteNumberPragma(database.db, "busy_timeout")).toBe(30_000); expect(readSqliteNumberPragma(database.db, "foreign_keys")).toBe(1); expect(readSqliteNumberPragma(database.db, "synchronous")).toBe(1); - expect(readSqliteNumberPragma(database.db, "user_version")).toBe(1); + expect(readSqliteNumberPragma(database.db, "user_version")).toBe(2); expect(readSqliteNumberPragma(database.db, "wal_autocheckpoint")).toBe(1000); const journalMode = database.db.prepare("PRAGMA journal_mode").get() as | { journal_mode?: string } @@ -217,11 +217,125 @@ describe("openclaw agent database", () => { ), ).toEqual({ role: "agent", - schema_version: 1, + schema_version: 2, agent_id: "worker-1", }); }); + it("migrates compact v1 session tables before applying normalized indexes", () => { + const stateDir = createTempStateDir(); + const databasePath = path.join( + stateDir, + "agents", + "worker-1", + "agent", + "openclaw-agent.sqlite", + ); + fs.mkdirSync(path.dirname(databasePath), { recursive: true }); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(databasePath); + db.exec(` + CREATE TABLE schema_meta ( + meta_key TEXT NOT NULL PRIMARY KEY, + role TEXT NOT NULL, + schema_version INTEGER NOT NULL, + agent_id TEXT, + app_version TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + INSERT INTO schema_meta + (meta_key, role, schema_version, agent_id, app_version, created_at, updated_at) + VALUES ('primary', 'agent', 1, 'worker-1', NULL, 1, 1); + CREATE TABLE sessions ( + session_id TEXT NOT NULL PRIMARY KEY, + session_key TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + INSERT INTO sessions (session_id, session_key, created_at, updated_at) + VALUES ('session-1', 'agent:worker-1:main', 10, 20); + CREATE TABLE session_entries ( + session_key TEXT NOT NULL PRIMARY KEY, + session_id TEXT NOT NULL, + entry_json TEXT NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE + ); + INSERT INTO session_entries (session_key, session_id, entry_json, updated_at) + VALUES ( + 'agent:worker-1:group:example', + 'session-1', + '{"sessionId":"session-1","updatedAt":20,"startedAt":11,"endedAt":19,"status":"done","chatType":"group","channel":"discord","deliveryContext":{"accountId":"acct-1"},"modelProvider":"openai","model":"gpt-5.5","agentHarnessId":"codex","parentSessionKey":"agent:worker-1:parent","spawnedBy":"agent:worker-1:spawner","displayName":"Example group"}', + 20 + ); + PRAGMA user_version = 1; + `); + db.close(); + + const database = openOpenClawAgentDatabase({ + agentId: "worker-1", + env: { OPENCLAW_STATE_DIR: stateDir }, + }); + + expect(readSqliteNumberPragma(database.db, "user_version")).toBe(2); + const session = database.db + .prepare( + ` + SELECT + account_id, + agent_harness_id, + channel, + chat_type, + display_name, + ended_at, + model, + model_provider, + parent_session_key, + session_scope, + spawned_by, + started_at, + status + FROM sessions + WHERE session_id = ? + `, + ) + .get("session-1"); + expect(session).toEqual({ + account_id: "acct-1", + agent_harness_id: "codex", + channel: "discord", + chat_type: "group", + display_name: "Example group", + ended_at: 19, + model: "gpt-5.5", + model_provider: "openai", + parent_session_key: "agent:worker-1:parent", + session_scope: "group", + spawned_by: "agent:worker-1:spawner", + started_at: 11, + status: "done", + }); + const route = database.db + .prepare("SELECT session_id, updated_at FROM session_routes WHERE session_key = ?") + .get("agent:worker-1:group:example"); + expect(route).toEqual({ + session_id: "session-1", + updated_at: 20, + }); + const sessionForeignKeys = database.db.prepare("PRAGMA foreign_key_list(sessions)").all() as + | Array<{ from?: unknown; on_delete?: unknown; table?: unknown; to?: unknown }> + | undefined; + expect(sessionForeignKeys).toContainEqual( + expect.objectContaining({ + from: "primary_conversation_id", + on_delete: "SET NULL", + table: "conversations", + to: "conversation_id", + }), + ); + }); + it("refuses to open newer per-agent schema versions", () => { const stateDir = createTempStateDir(); const databasePath = path.join( @@ -234,7 +348,7 @@ describe("openclaw agent database", () => { fs.mkdirSync(path.dirname(databasePath), { recursive: true }); const { DatabaseSync } = requireNodeSqlite(); const db = new DatabaseSync(databasePath); - db.exec("PRAGMA user_version = 2;"); + db.exec("PRAGMA user_version = 3;"); db.close(); expect(() => @@ -242,6 +356,6 @@ describe("openclaw agent database", () => { agentId: "worker-1", env: { OPENCLAW_STATE_DIR: stateDir }, }), - ).toThrow(/newer schema version 2/); + ).toThrow(/newer schema version 3/); }); }); diff --git a/src/state/openclaw-agent-db.ts b/src/state/openclaw-agent-db.ts index d944e53556ea..5aba304ef946 100644 --- a/src/state/openclaw-agent-db.ts +++ b/src/state/openclaw-agent-db.ts @@ -29,7 +29,7 @@ export { resolveOpenClawAgentSqlitePath } from "./openclaw-agent-db.paths.js"; * per pathname, protected with private file modes, and registered in the shared * OpenClaw state database for discovery and maintenance. */ -const OPENCLAW_AGENT_SCHEMA_VERSION = 1; +const OPENCLAW_AGENT_SCHEMA_VERSION = 2; const OPENCLAW_AGENT_DB_DIR_MODE = 0o700; const OPENCLAW_AGENT_DB_FILE_MODE = 0o600; const OPENCLAW_AGENT_DB_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; @@ -66,6 +66,8 @@ type ExistingSchemaMeta = { role: string | null; }; +type MigratedSessionEntry = Record; + function readSqliteUserVersion(db: DatabaseSync): number { const row = db.prepare("PRAGMA user_version").get() as { user_version?: unknown } | undefined; return Number(row?.user_version ?? 0); @@ -80,6 +82,294 @@ function assertSupportedAgentSchemaVersion(db: DatabaseSync, pathname: string): } } +function readSqliteSessionColumns(db: DatabaseSync): Set | null { + const table = db + .prepare("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?") + .get("sessions"); + if (!table) { + return null; + } + const rows = db.prepare("PRAGMA table_info(sessions)").all() as Array<{ + name?: unknown; + }>; + return new Set(rows.flatMap((row) => (typeof row.name === "string" ? [row.name] : []))); +} + +function migratedSessionColumn( + columns: ReadonlySet, + columnName: string, + fallback: string, +): string { + return columns.has(columnName) ? columnName : fallback; +} + +function migrateOpenClawAgentSchema(db: DatabaseSync): void { + const userVersion = readSqliteUserVersion(db); + if (userVersion >= OPENCLAW_AGENT_SCHEMA_VERSION) { + return; + } + const columns = readSqliteSessionColumns(db); + if (userVersion > 1 || !columns) { + return; + } + const copyColumns = [ + "session_id", + "session_key", + "session_scope", + "created_at", + "updated_at", + "started_at", + "ended_at", + "status", + "chat_type", + "channel", + "account_id", + "primary_conversation_id", + "model_provider", + "model", + "agent_harness_id", + "parent_session_key", + "spawned_by", + "display_name", + ]; + const selectColumns = [ + "session_id", + "session_key", + migratedSessionColumn(columns, "session_scope", "'conversation'"), + "created_at", + "updated_at", + migratedSessionColumn(columns, "started_at", "NULL"), + migratedSessionColumn(columns, "ended_at", "NULL"), + migratedSessionColumn(columns, "status", "NULL"), + migratedSessionColumn(columns, "chat_type", "NULL"), + migratedSessionColumn(columns, "channel", "NULL"), + migratedSessionColumn(columns, "account_id", "NULL"), + migratedSessionColumn(columns, "primary_conversation_id", "NULL"), + migratedSessionColumn(columns, "model_provider", "NULL"), + migratedSessionColumn(columns, "model", "NULL"), + migratedSessionColumn(columns, "agent_harness_id", "NULL"), + migratedSessionColumn(columns, "parent_session_key", "NULL"), + migratedSessionColumn(columns, "spawned_by", "NULL"), + migratedSessionColumn(columns, "display_name", "NULL"), + ]; + db.exec(` + CREATE TABLE IF NOT EXISTS conversations ( + conversation_id TEXT NOT NULL PRIMARY KEY, + channel TEXT NOT NULL, + account_id TEXT NOT NULL, + kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')), + peer_id TEXT NOT NULL, + parent_conversation_id TEXT, + thread_id TEXT, + native_channel_id TEXT, + native_direct_user_id TEXT, + label TEXT, + metadata_json TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + `); + db.exec("PRAGMA foreign_keys = OFF;"); + try { + db.exec(` + DROP TABLE IF EXISTS sessions_new; + CREATE TABLE sessions_new ( + session_id TEXT NOT NULL PRIMARY KEY, + session_key TEXT NOT NULL, + session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')), + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')), + chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')), + channel TEXT, + account_id TEXT, + primary_conversation_id TEXT, + model_provider TEXT, + model TEXT, + agent_harness_id TEXT, + parent_session_key TEXT, + spawned_by TEXT, + display_name TEXT, + FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL + ); + INSERT INTO sessions_new (${copyColumns.join(", ")}) + SELECT ${selectColumns.join(", ")} FROM sessions; + DROP TABLE sessions; + ALTER TABLE sessions_new RENAME TO sessions; + `); + } finally { + db.exec("PRAGMA foreign_keys = ON;"); + } +} + +function parseMigratedSessionEntry(value: unknown): MigratedSessionEntry | null { + if (typeof value !== "string") { + return null; + } + try { + const parsed = JSON.parse(value) as unknown; + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as MigratedSessionEntry) + : null; + } catch { + return null; + } +} + +function migratedObjectField( + entry: MigratedSessionEntry, + key: string, +): MigratedSessionEntry | null { + const value = entry[key]; + return value && typeof value === "object" && !Array.isArray(value) + ? (value as MigratedSessionEntry) + : null; +} + +function migratedText(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function migratedNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function migratedChatType(value: unknown): "direct" | "group" | "channel" | null { + if (value === "direct" || value === "group" || value === "channel") { + return value; + } + return null; +} + +function migratedStatus( + value: unknown, +): "running" | "done" | "failed" | "killed" | "timeout" | null { + if ( + value === "running" || + value === "done" || + value === "failed" || + value === "killed" || + value === "timeout" + ) { + return value; + } + return null; +} + +function migratedSessionScope( + entry: MigratedSessionEntry, + sessionKey: string, +): "conversation" | "shared-main" | "group" | "channel" { + const chatType = migratedChatType(entry.chatType); + const normalizedKey = sessionKey.trim().toLowerCase(); + if (chatType === "direct" && (normalizedKey === "main" || normalizedKey.endsWith(":main"))) { + return "shared-main"; + } + if (chatType === "group" || chatType === "channel") { + return chatType; + } + return "conversation"; +} + +function migratedEntryChannel(entry: MigratedSessionEntry): string | null { + const deliveryContext = migratedObjectField(entry, "deliveryContext"); + const origin = migratedObjectField(entry, "origin"); + return ( + migratedText(entry.channel) ?? + migratedText(deliveryContext?.channel) ?? + migratedText(entry.lastChannel) ?? + migratedText(origin?.provider) + ); +} + +function migratedEntryAccountId(entry: MigratedSessionEntry): string | null { + const deliveryContext = migratedObjectField(entry, "deliveryContext"); + const origin = migratedObjectField(entry, "origin"); + return ( + migratedText(deliveryContext?.accountId) ?? + migratedText(entry.lastAccountId) ?? + migratedText(origin?.accountId) + ); +} + +function migratedEntryDisplayName(entry: MigratedSessionEntry): string | null { + return ( + migratedText(entry.displayName) ?? + migratedText(entry.label) ?? + migratedText(entry.subject) ?? + migratedText(entry.groupId) + ); +} + +function backfillOpenClawAgentSchema(db: DatabaseSync, previousVersion: number): void { + if (previousVersion >= 2) { + return; + } + db.exec(` + INSERT OR REPLACE INTO session_routes (session_key, session_id, updated_at) + SELECT se.session_key, se.session_id, se.updated_at + FROM session_entries AS se + INNER JOIN sessions AS s ON s.session_id = se.session_id; + `); + const rows = db + .prepare( + ` + SELECT se.session_key, se.session_id, se.entry_json + FROM session_entries AS se + INNER JOIN sessions AS s ON s.session_id = se.session_id; + `, + ) + .all() as Array<{ + entry_json?: unknown; + session_id?: unknown; + session_key?: unknown; + }>; + const update = db.prepare(` + UPDATE sessions + SET + session_scope = ?, + started_at = ?, + ended_at = ?, + status = ?, + chat_type = ?, + channel = ?, + account_id = ?, + model_provider = ?, + model = ?, + agent_harness_id = ?, + parent_session_key = ?, + spawned_by = ?, + display_name = ? + WHERE session_id = ?; + `); + for (const row of rows) { + const sessionKey = migratedText(row.session_key); + const sessionId = migratedText(row.session_id); + const entry = parseMigratedSessionEntry(row.entry_json); + if (!sessionKey || !sessionId || !entry) { + continue; + } + update.run( + migratedSessionScope(entry, sessionKey), + migratedNumber(entry.startedAt), + migratedNumber(entry.endedAt), + migratedStatus(entry.status), + migratedChatType(entry.chatType), + migratedEntryChannel(entry), + migratedEntryAccountId(entry), + migratedText(entry.modelProvider), + migratedText(entry.model), + migratedText(entry.agentHarnessId), + migratedText(entry.parentSessionKey), + migratedText(entry.spawnedBy), + migratedEntryDisplayName(entry), + sessionId, + ); + } +} + function ensureOpenClawAgentDatabasePermissions( pathname: string, options: OpenClawAgentDatabaseOptions, @@ -150,7 +440,10 @@ function assertExistingSchemaOwner( function ensureAgentSchema(db: DatabaseSync, agentId: string, pathname: string): void { assertSupportedAgentSchemaVersion(db, pathname); assertExistingSchemaOwner(readExistingSchemaMeta(db), agentId, pathname); + const previousVersion = readSqliteUserVersion(db); + migrateOpenClawAgentSchema(db); db.exec(OPENCLAW_AGENT_SCHEMA_SQL); + backfillOpenClawAgentSchema(db, previousVersion); const kysely = getNodeSqliteKysely(db); db.exec(`PRAGMA user_version = ${OPENCLAW_AGENT_SCHEMA_VERSION};`); const now = Date.now(); diff --git a/src/state/openclaw-agent-schema.generated.ts b/src/state/openclaw-agent-schema.generated.ts index 39df7877f955..0e1988458f6c 100644 --- a/src/state/openclaw-agent-schema.generated.ts +++ b/src/state/openclaw-agent-schema.generated.ts @@ -16,12 +16,94 @@ export const OPENCLAW_AGENT_SCHEMA_SQL = `CREATE TABLE IF NOT EXISTS schema_meta CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT NOT NULL PRIMARY KEY, session_key TEXT NOT NULL, + session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')), + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')), + chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')), + channel TEXT, + account_id TEXT, + primary_conversation_id TEXT, + model_provider TEXT, + model TEXT, + agent_harness_id TEXT, + parent_session_key TEXT, + spawned_by TEXT, + display_name TEXT, + FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated_at + ON sessions(updated_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_created_at + ON sessions(created_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_conversation + ON sessions(primary_conversation_id, updated_at DESC, session_id) + WHERE primary_conversation_id IS NOT NULL; + +CREATE TABLE IF NOT EXISTS session_routes ( + session_key TEXT NOT NULL PRIMARY KEY, + session_id TEXT NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_agent_session_routes_session_id + ON session_routes(session_id); + +CREATE TABLE IF NOT EXISTS conversations ( + conversation_id TEXT NOT NULL PRIMARY KEY, + channel TEXT NOT NULL, + account_id TEXT NOT NULL, + kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')), + peer_id TEXT NOT NULL, + parent_conversation_id TEXT, + thread_id TEXT, + native_channel_id TEXT, + native_direct_user_id TEXT, + label TEXT, + metadata_json TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated - ON sessions(updated_at DESC, session_id); +CREATE INDEX IF NOT EXISTS idx_agent_conversations_lookup + ON conversations(channel, account_id, kind, peer_id, thread_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_conversations_identity + ON conversations( + channel, + account_id, + kind, + peer_id, + IFNULL(parent_conversation_id, ''), + IFNULL(thread_id, '') + ); + +CREATE INDEX IF NOT EXISTS idx_agent_conversations_updated + ON conversations(updated_at DESC, conversation_id); + +CREATE TABLE IF NOT EXISTS session_conversations ( + session_id TEXT NOT NULL, + conversation_id TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'primary' CHECK (role IN ('primary', 'participant', 'related')), + first_seen_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + PRIMARY KEY (session_id, conversation_id, role), + FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE, + FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_agent_session_conversations_conversation + ON session_conversations(conversation_id, last_seen_at DESC, session_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_session_conversations_primary + ON session_conversations(session_id) + WHERE role = 'primary'; CREATE TABLE IF NOT EXISTS session_entries ( session_key TEXT NOT NULL PRIMARY KEY, @@ -31,7 +113,7 @@ CREATE TABLE IF NOT EXISTS session_entries ( FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE ); -CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated +CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated_at ON session_entries(updated_at DESC, session_key); CREATE INDEX IF NOT EXISTS idx_agent_session_entries_session_id diff --git a/src/state/openclaw-agent-schema.sql b/src/state/openclaw-agent-schema.sql index a759e832f5f2..610ee459d45a 100644 --- a/src/state/openclaw-agent-schema.sql +++ b/src/state/openclaw-agent-schema.sql @@ -11,12 +11,94 @@ CREATE TABLE IF NOT EXISTS schema_meta ( CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT NOT NULL PRIMARY KEY, session_key TEXT NOT NULL, + session_scope TEXT NOT NULL DEFAULT 'conversation' CHECK (session_scope IN ('conversation', 'shared-main', 'group', 'channel')), + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + status TEXT CHECK (status IS NULL OR status IN ('running', 'done', 'failed', 'killed', 'timeout')), + chat_type TEXT CHECK (chat_type IS NULL OR chat_type IN ('direct', 'group', 'channel')), + channel TEXT, + account_id TEXT, + primary_conversation_id TEXT, + model_provider TEXT, + model TEXT, + agent_harness_id TEXT, + parent_session_key TEXT, + spawned_by TEXT, + display_name TEXT, + FOREIGN KEY (primary_conversation_id) REFERENCES conversations(conversation_id) ON DELETE SET NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated_at + ON sessions(updated_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_created_at + ON sessions(created_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_agent_sessions_conversation + ON sessions(primary_conversation_id, updated_at DESC, session_id) + WHERE primary_conversation_id IS NOT NULL; + +CREATE TABLE IF NOT EXISTS session_routes ( + session_key TEXT NOT NULL PRIMARY KEY, + session_id TEXT NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_agent_session_routes_session_id + ON session_routes(session_id); + +CREATE TABLE IF NOT EXISTS conversations ( + conversation_id TEXT NOT NULL PRIMARY KEY, + channel TEXT NOT NULL, + account_id TEXT NOT NULL, + kind TEXT NOT NULL CHECK (kind IN ('direct', 'group', 'channel')), + peer_id TEXT NOT NULL, + parent_conversation_id TEXT, + thread_id TEXT, + native_channel_id TEXT, + native_direct_user_id TEXT, + label TEXT, + metadata_json TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_agent_sessions_updated - ON sessions(updated_at DESC, session_id); +CREATE INDEX IF NOT EXISTS idx_agent_conversations_lookup + ON conversations(channel, account_id, kind, peer_id, thread_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_conversations_identity + ON conversations( + channel, + account_id, + kind, + peer_id, + IFNULL(parent_conversation_id, ''), + IFNULL(thread_id, '') + ); + +CREATE INDEX IF NOT EXISTS idx_agent_conversations_updated + ON conversations(updated_at DESC, conversation_id); + +CREATE TABLE IF NOT EXISTS session_conversations ( + session_id TEXT NOT NULL, + conversation_id TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'primary' CHECK (role IN ('primary', 'participant', 'related')), + first_seen_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + PRIMARY KEY (session_id, conversation_id, role), + FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE, + FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_agent_session_conversations_conversation + ON session_conversations(conversation_id, last_seen_at DESC, session_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_session_conversations_primary + ON session_conversations(session_id) + WHERE role = 'primary'; CREATE TABLE IF NOT EXISTS session_entries ( session_key TEXT NOT NULL PRIMARY KEY, @@ -26,7 +108,7 @@ CREATE TABLE IF NOT EXISTS session_entries ( FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE ); -CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated +CREATE INDEX IF NOT EXISTS idx_agent_session_entries_updated_at ON session_entries(updated_at DESC, session_key); CREATE INDEX IF NOT EXISTS idx_agent_session_entries_session_id