Compare commits

...

1 Commits

Author SHA1 Message Date
Josh Lehman
78a7ac9826 fix: make sqlite session entry writes atomic 2026-06-08 12:09:37 -07:00
9 changed files with 210 additions and 18 deletions

View File

@@ -61,7 +61,9 @@ export async function persistSessionEntry(
},
{
resolveSingleEntryPersistence: (entry) =>
entry ? { sessionKey: params.sessionKey, entry } : null,
entry && !params.shouldPersist && (params.clearedFields?.length ?? 0) === 0
? { sessionKey: params.sessionKey, entry, patch: params.entry }
: null,
takeCacheOwnership: true,
},
);

View File

@@ -227,12 +227,16 @@ describe("updateSessionStoreAfterAgentRun", () => {
sessionId,
updatedAt: 2,
} as SessionEntry),
).toEqual({
).toMatchObject({
sessionKey,
entry: {
sessionId,
updatedAt: 2,
},
patch: {
model: "gpt-5.5",
modelProvider: "openai",
},
});
});
});

View File

@@ -46,13 +46,24 @@ function resolvePositiveInteger(value: number | undefined): number | undefined {
return Math.floor(value);
}
function removeLifecycleStateFromMetadataPatch(entry: SessionEntry): SessionEntry {
const next = { ...entry };
delete next.status;
delete next.startedAt;
delete next.endedAt;
delete next.runtimeMs;
return next;
const TRANSIENT_LIFECYCLE_SESSION_FIELDS = new Set(["status", "startedAt", "endedAt", "runtimeMs"]);
function sameJsonValue(left: unknown, right: unknown): boolean {
return JSON.stringify(left) === JSON.stringify(right);
}
function deriveRunMetadataPatch(previous: SessionEntry, next: SessionEntry): Partial<SessionEntry> {
const patch: Record<string, unknown> = {};
const keys = new Set([...Object.keys(previous), ...Object.keys(next)]);
for (const key of keys) {
if (TRANSIENT_LIFECYCLE_SESSION_FIELDS.has(key)) {
continue;
}
if (!sameJsonValue(previous[key as keyof SessionEntry], next[key as keyof SessionEntry])) {
patch[key] = next[key as keyof SessionEntry];
}
}
return patch as Partial<SessionEntry>;
}
/** Applies run result metadata, usage, and CLI bindings to a session entry. */
@@ -291,7 +302,7 @@ export async function updateSessionStoreAfterAgentRun(params: {
updatedAt: next.updatedAt,
...(touchInteraction ? { lastInteractionAt: next.lastInteractionAt } : {}),
}
: removeLifecycleStateFromMetadataPatch(next);
: deriveRunMetadataPatch(entry, next);
const maintenanceConfig = resolveMaintenanceConfigFromInput(cfg.session?.maintenance);
const persisted = await updateSessionStore(
storePath,
@@ -307,7 +318,7 @@ export async function updateSessionStoreAfterAgentRun(params: {
takeCacheOwnership: true,
maintenanceConfig,
resolveSingleEntryPersistence: (entryLocal) =>
entryLocal ? { sessionKey, entry: entryLocal } : undefined,
entryLocal ? { sessionKey, entry: entryLocal, patch: metadataPatch } : undefined,
},
);
if (persisted) {

View File

@@ -24,7 +24,7 @@ export async function persistSessionEntry(params: CommandParams): Promise<boolea
},
{
resolveSingleEntryPersistence: (entry) =>
entry ? { sessionKey: params.sessionKey, entry } : null,
entry ? { sessionKey: params.sessionKey, entry, patch: params.sessionEntry } : null,
skipMaintenance: true,
},
);
@@ -50,6 +50,12 @@ export async function persistAbortTargetEntry(params: {
sessionStore[key] = entry;
if (storePath) {
const patch: Partial<SessionEntry> = {
abortedLastRun: true,
abortCutoffMessageSid: abortCutoff?.messageSid,
abortCutoffTimestamp: abortCutoff?.timestamp,
updatedAt: entry.updatedAt,
};
await updateSessionStore(
storePath,
(store) => {
@@ -65,7 +71,7 @@ export async function persistAbortTargetEntry(params: {
},
{
resolveSingleEntryPersistence: (updated) =>
updated ? { sessionKey: key, entry: updated } : null,
updated ? { sessionKey: key, entry: updated, patch } : null,
},
);
}

View File

@@ -52,7 +52,9 @@ async function persistSessionEntryUpdate(params: {
},
{
resolveSingleEntryPersistence: (entry) =>
entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null,
entry && params.sessionKey
? { sessionKey: params.sessionKey, entry, patch: params.nextEntry }
: null,
},
);
}

View File

@@ -1048,6 +1048,41 @@ describe("sessions", () => {
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
});
it("updateSessionStoreEntry preserves external SQLite writes made while callback runs", async () => {
const mainSessionKey = "agent:main:main";
const { storePath } = await createSessionStoreFixture({
prefix: "updateSessionStoreEntry-atomic-reread",
entries: {
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 123,
thinkingLevel: "low",
},
},
});
await updateSessionStoreEntry({
storePath,
sessionKey: mainSessionKey,
update: async (entry) => {
expect(entry.providerOverride).toBeUndefined();
replaceSqliteSessionStoreBehindCache(storePath, {
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 124,
thinkingLevel: "low",
providerOverride: "anthropic",
},
});
return { thinkingLevel: "high" };
},
});
const store = loadSessionStore(storePath, { skipCache: true });
expect(store[mainSessionKey]?.providerOverride).toBe("anthropic");
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
});
it("updateSessionStoreEntry can skip maintenance for existing-entry metadata writes", async () => {
const mainSessionKey = "agent:main:main";
const staleSessionKey = "agent:main:stale";

View File

@@ -22,7 +22,11 @@ import {
resolveAgentIdFromSessionStorePath,
resolveStateDirFromSessionStorePath,
} from "./paths.js";
import type { SessionEntry } from "./types.js";
import {
mergeSessionEntry,
mergeSessionEntryPreserveActivity,
type SessionEntry,
} from "./types.js";
const SESSION_STORE_SCOPE = "session_entries";
@@ -176,6 +180,69 @@ export function replaceSqliteSessionStore(
database.walMaintenance.checkpoint();
}
export type PatchSqliteSessionEntryMode = "merge" | "preserve-activity" | "replace";
export function patchSqliteSessionEntry(params: {
storePath: string;
sessionKey: string;
fallbackEntry: SessionEntry;
patch: Partial<SessionEntry>;
mode?: PatchSqliteSessionEntryMode;
}): SessionEntry {
const databaseOptions = resolveSessionStoreDatabaseOptions(params.storePath);
let persisted = params.fallbackEntry;
runOpenClawAgentWriteTransaction((database) => {
const db = getNodeSqliteKysely<SessionStoreDatabase>(database.db);
const row =
executeSqliteQueryTakeFirstSync(
database.db,
db
.selectFrom("cache_entries")
.select(["value_json"])
.where("scope", "=", SESSION_STORE_SCOPE)
.where("key", "=", params.sessionKey),
) ?? null;
const current = row ? parseSessionEntryValue(row.value_json) : undefined;
persisted =
params.mode === "replace"
? params.fallbackEntry
: current
? params.mode === "preserve-activity"
? mergeSessionEntryPreserveActivity(current, params.patch)
: mergeSessionEntry(current, params.patch)
: params.fallbackEntry;
const valueJson = JSON.stringify(persisted);
persisted = parseSessionEntryValue(valueJson) ?? persisted;
const updatedAt =
typeof persisted.updatedAt === "number" && Number.isFinite(persisted.updatedAt)
? persisted.updatedAt
: Date.now();
executeSqliteQuerySync(
database.db,
db
.insertInto("cache_entries")
.values({
scope: SESSION_STORE_SCOPE,
key: params.sessionKey,
value_json: valueJson,
blob: null,
expires_at: null,
updated_at: updatedAt,
})
.onConflict((conflict) =>
conflict.columns(["scope", "key"]).doUpdateSet({
value_json: valueJson,
blob: null,
expires_at: null,
updated_at: updatedAt,
}),
),
);
}, databaseOptions);
openOpenClawAgentDatabase(databaseOptions).walMaintenance.checkpoint();
return persisted;
}
export function clearExistingSqliteSessionStore(
storePath: string,
opts?: { compact?: boolean },

View File

@@ -48,7 +48,7 @@ import {
type ResolvedSessionMaintenanceConfig,
type SessionMaintenanceWarning,
} from "./store-maintenance.js";
import { replaceSqliteSessionStore } from "./store-sqlite.js";
import { patchSqliteSessionEntry, replaceSqliteSessionStore } from "./store-sqlite.js";
import { runExclusiveSessionStoreWrite } from "./store-writer.js";
import {
mergeSessionEntry,
@@ -174,6 +174,8 @@ type UpdateSessionStoreOptions<T> = SaveSessionStoreOptions & {
type SingleEntryPersistencePatch = {
sessionKey: string;
entry: SessionEntry;
patch: Partial<SessionEntry>;
mode?: "merge" | "preserve-activity" | "replace";
};
type SessionEntryWorkflowOptions = {
@@ -307,6 +309,21 @@ function sessionEntriesHaveSameSerializedForm(
return previous !== undefined && JSON.stringify(previous) === JSON.stringify(next);
}
function reconcileFreshSessionStoreForCache(params: {
writerStore: Record<string, SessionEntry>;
freshStore: Record<string, SessionEntry>;
}): Record<string, SessionEntry> {
const reconciled: Record<string, SessionEntry> = {};
for (const [key, freshEntry] of Object.entries(params.freshStore)) {
const writerEntry = params.writerStore[key];
reconciled[key] =
writerEntry && sessionEntriesHaveSameSerializedForm(writerEntry, freshEntry)
? writerEntry
: freshEntry;
}
return reconciled;
}
async function saveSessionStoreUnlocked(
storePath: string,
store: Record<string, SessionEntry>,
@@ -460,6 +477,30 @@ async function saveSessionStoreUnlocked(
return;
}
if (opts?.singleEntryPersistence && !maintenanceChangedStore) {
const persisted = patchSqliteSessionEntry({
storePath,
sessionKey: opts.singleEntryPersistence.sessionKey,
fallbackEntry: opts.singleEntryPersistence.entry,
patch: opts.singleEntryPersistence.patch,
mode: opts.singleEntryPersistence.mode,
});
store[opts.singleEntryPersistence.sessionKey] = persisted;
const freshStore = loadSessionStore(storePath, { skipCache: true, clone: false });
const cacheStore = reconcileFreshSessionStoreForCache({
writerStore: store,
freshStore,
});
store[opts.singleEntryPersistence.sessionKey] =
cacheStore[opts.singleEntryPersistence.sessionKey] ?? persisted;
updateSessionStoreWriteCache({
storePath,
store: cacheStore,
takeOwnership: true,
});
return;
}
replaceSqliteSessionStore(storePath, store, { compact: maintenanceChangedStore });
updateSessionStoreWriteCache({
storePath,
@@ -558,6 +599,8 @@ async function persistResolvedSessionEntry(params: {
store: Record<string, SessionEntry>;
resolved: ReturnType<typeof resolveSessionStoreEntry>;
next: SessionEntry;
patch?: Partial<SessionEntry>;
persistenceMode?: "merge" | "preserve-activity" | "replace";
skipMaintenance?: boolean;
takeCacheOwnership?: boolean;
returnDetached?: boolean;
@@ -576,11 +619,17 @@ async function persistResolvedSessionEntry(params: {
skipSerializeForUnchangedStore: entryUnchanged,
singleEntryPersistence:
params.resolved.legacyKeys.length === 0 && params.resolved.existing
? { sessionKey: params.resolved.normalizedKey, entry: next }
? {
sessionKey: params.resolved.normalizedKey,
entry: next,
patch: params.patch ?? next,
mode: params.persistenceMode,
}
: undefined,
takeCacheOwnership: params.takeCacheOwnership,
});
return entryUnchanged || params.returnDetached ? cloneSessionEntry(next) : next;
const persisted = params.store[params.resolved.normalizedKey] ?? next;
return entryUnchanged || params.returnDetached ? cloneSessionEntry(persisted) : persisted;
}
export async function updateSessionStoreEntry(params: {
@@ -610,6 +659,7 @@ export async function updateSessionStoreEntry(params: {
store,
resolved,
next,
patch,
skipMaintenance: params.skipMaintenance,
takeCacheOwnership: params.takeCacheOwnership ?? true,
returnDetached: params.takeCacheOwnership !== true,
@@ -638,6 +688,7 @@ export async function applySessionStoreEntryPatch(params: {
store,
resolved,
next,
patch,
skipMaintenance: params.skipMaintenance,
takeCacheOwnership: params.takeCacheOwnership ?? true,
returnDetached: params.takeCacheOwnership !== true,
@@ -678,6 +729,12 @@ export async function patchSessionEntry(
store,
resolved,
next,
patch,
persistenceMode: params.replaceEntry
? "replace"
: params.preserveActivity
? "preserve-activity"
: "merge",
takeCacheOwnership: true,
returnDetached: true,
});
@@ -700,6 +757,8 @@ export async function upsertSessionEntry(
store,
resolved,
next,
patch: next,
persistenceMode: "replace",
takeCacheOwnership: true,
});
});
@@ -758,6 +817,8 @@ export async function recordSessionMetaFromInbound(params: {
store,
resolved,
next,
patch,
persistenceMode: existing ? "preserve-activity" : "merge",
takeCacheOwnership: true,
returnDetached: true,
});
@@ -855,6 +916,8 @@ export async function updateLastRoute(params: {
store,
resolved,
next,
patch: metaPatch ? { ...basePatch, ...metaPatch } : basePatch,
persistenceMode: "preserve-activity",
takeCacheOwnership: true,
returnDetached: true,
});

View File

@@ -2000,6 +2000,7 @@ export const agentHandlers: GatewayRequestHandlers = {
| {
sessionKey: string;
entry: SessionEntry;
patch: Partial<SessionEntry>;
}
| undefined;
const persisted = await updateSessionStore(
@@ -2054,6 +2055,7 @@ export const agentHandlers: GatewayRequestHandlers = {
? {
sessionKey: primaryKey,
entry: merged,
patch: effectivePatch,
}
: undefined;
return merged;