diff --git a/src/agents/command/session-store.ts b/src/agents/command/session-store.ts index 72e8488b321f..004d9df6fe5e 100644 --- a/src/agents/command/session-store.ts +++ b/src/agents/command/session-store.ts @@ -13,6 +13,7 @@ import { updateSessionStore, rewriteSessionFileForNewSessionId, } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { resolveMaintenanceConfigFromInput } from "../../config/sessions/store-maintenance.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; @@ -332,13 +333,11 @@ export async function clearCliSessionInStore(params: { clearCliSession(next, provider); next.updatedAt = Date.now(); - const persisted = await updateSessionStore(storePath, (store) => { - const merged = mergeSessionEntry(store[sessionKey], next); - store[sessionKey] = merged; - return merged; - }); - sessionStore[sessionKey] = persisted; - return persisted; + const persisted = await updateSessionEntry({ storePath, sessionKey }, () => next); + if (persisted) { + sessionStore[sessionKey] = persisted; + } + return persisted ?? undefined; } /** Records CLI compaction metadata on the persisted session entry. */ @@ -401,13 +400,11 @@ export async function recordCliCompactionInStore(params: { next.cacheWrite = undefined; } - const persisted = await updateSessionStore(storePath, (store) => { - const merged = mergeSessionEntry(store[sessionKey], next); - store[sessionKey] = merged; - return merged; - }); - sessionStore[sessionKey] = persisted; - return persisted; + const persisted = await updateSessionEntry({ storePath, sessionKey }, () => next); + if (persisted) { + sessionStore[sessionKey] = persisted; + } + return persisted ?? undefined; } function resolveCompactionSessionFile(params: { diff --git a/src/agents/embedded-agent-runner/run.ts b/src/agents/embedded-agent-runner/run.ts index 529155ed5688..1ac928bfefda 100644 --- a/src/agents/embedded-agent-runner/run.ts +++ b/src/agents/embedded-agent-runner/run.ts @@ -8,7 +8,8 @@ import { sanitizeForLog } from "../../../packages/terminal-core/src/ansi.js"; import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; -import { resolveStorePath, updateSessionStoreEntry } from "../../config/sessions.js"; +import { resolveStorePath } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine, @@ -239,12 +240,12 @@ async function resetNoRealConversationTokenSnapshot(params: { } const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId }); try { - await updateSessionStoreEntry({ - storePath, - sessionKey: params.sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - update: async () => ({ + await updateSessionEntry( + { + storePath, + sessionKey: params.sessionKey, + }, + async () => ({ totalTokens: 0, totalTokensFresh: true, inputTokens: undefined, @@ -254,7 +255,11 @@ async function resetNoRealConversationTokenSnapshot(params: { contextBudgetStatus: undefined, updatedAt: Date.now(), }), - }); + { + skipMaintenance: true, + takeCacheOwnership: true, + }, + ); } catch (err) { log.warn( `[context-overflow-precheck] failed to reset stale context snapshot for ` + diff --git a/src/agents/embedded-agent-runner/run/attempt.ts b/src/agents/embedded-agent-runner/run/attempt.ts index aa07837851b4..6e8baf680e1b 100644 --- a/src/agents/embedded-agent-runner/run/attempt.ts +++ b/src/agents/embedded-agent-runner/run/attempt.ts @@ -12,11 +12,8 @@ import { filterHeartbeatTranscriptArtifacts } from "../../../auto-reply/heartbea import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../../auto-reply/tokens.js"; import { getRuntimeConfig } from "../../../config/config.js"; import { resolveStorePath } from "../../../config/sessions/paths.js"; -import { - loadSessionStore, - runQuotaSuspensionMaintenance, - updateSessionStoreEntry, -} from "../../../config/sessions/store.js"; +import { updateSessionEntry } from "../../../config/sessions/session-accessor.js"; +import { loadSessionStore, runQuotaSuspensionMaintenance } from "../../../config/sessions/store.js"; import { bindOwnedSessionTranscriptWrites, withOwnedSessionTranscriptWrites, @@ -2892,12 +2889,12 @@ export async function runEmbeddedAttempt( activeSubagents: subagents, }); validated.push(handoffMsg); - await updateSessionStoreEntry({ - storePath, - sessionKey: params.sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - update: async (entry) => { + await updateSessionEntry( + { + storePath, + sessionKey: params.sessionKey, + }, + async (entry) => { if (entry.quotaSuspension?.state !== "resuming") { return null; } @@ -2905,7 +2902,11 @@ export async function runEmbeddedAttempt( quotaSuspension: { ...entry.quotaSuspension, state: "active" }, }; }, - }); + { + skipMaintenance: true, + takeCacheOwnership: true, + }, + ); } } diff --git a/src/auto-reply/reply/abort-cutoff.runtime.ts b/src/auto-reply/reply/abort-cutoff.runtime.ts index 52d4ca8f78f7..ba0cf3603a66 100644 --- a/src/auto-reply/reply/abort-cutoff.runtime.ts +++ b/src/auto-reply/reply/abort-cutoff.runtime.ts @@ -1,5 +1,5 @@ /** Runtime persistence helper for clearing abort-cutoff state from sessions. */ -import { updateSessionStore } from "../../config/sessions/store.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import type { SessionEntry } from "../../config/sessions/types.js"; import { applyAbortCutoffToSessionEntry, hasAbortCutoff } from "./abort-cutoff.js"; @@ -20,15 +20,11 @@ export async function clearAbortCutoffInSessionRuntime(params: { sessionStore[sessionKey] = sessionEntry; if (storePath) { - await updateSessionStore(storePath, (store) => { - const existing = store[sessionKey] ?? sessionEntry; - if (!existing) { - return; - } - applyAbortCutoffToSessionEntry(existing, undefined); - existing.updatedAt = Date.now(); - store[sessionKey] = existing; - }); + await updateSessionEntry({ storePath, sessionKey }, () => ({ + abortCutoffMessageSid: undefined, + abortCutoffTimestamp: undefined, + updatedAt: Date.now(), + })); } return true; diff --git a/src/auto-reply/reply/agent-runner-cli-dispatch.ts b/src/auto-reply/reply/agent-runner-cli-dispatch.ts index bbfbbba14f29..ea6de62a1ac8 100644 --- a/src/auto-reply/reply/agent-runner-cli-dispatch.ts +++ b/src/auto-reply/reply/agent-runner-cli-dispatch.ts @@ -8,7 +8,8 @@ import { runCliAgent } from "../../agents/cli-runner.js"; import type { RunCliAgentParams } from "../../agents/cli-runner/types.js"; import { clearCliSession } from "../../agents/cli-session.js"; import type { EmbeddedAgentRunResult } from "../../agents/embedded-agent.js"; -import { updateSessionStore, type SessionEntry } from "../../config/sessions.js"; +import type { SessionEntry } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import type { AgentEventPayload } from "../../infra/agent-events.js"; import { emitAgentEvent, onAgentEvent } from "../../infra/agent-events.js"; @@ -141,9 +142,13 @@ export async function clearDroppedCliSessionBinding(params: { if (!params.storePath || !params.sessionKey) { return; } - await updateSessionStore(params.storePath, (store) => { - clearEntry(store[params.sessionKey!]); - }); + await updateSessionEntry( + { storePath: params.storePath, sessionKey: params.sessionKey }, + (entry) => { + clearEntry(entry); + return entry; + }, + ); } function createToolEventBridge(params: { diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index f9e7a8321564..d73114ee06a2 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -55,11 +55,8 @@ import { } from "../../agents/model-selection.js"; import { resolveOpenAIRuntimeProvider } from "../../agents/openai-routing.js"; import { buildAgentRuntimeOutcomePlan } from "../../agents/runtime-plan/build.js"; -import { - resolveGroupSessionKey, - type SessionEntry, - updateSessionStore, -} from "../../config/sessions.js"; +import { resolveGroupSessionKey, type SessionEntry } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { resolveSilentReplyPolicy } from "../../config/silent-reply.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { logVerbose } from "../../globals.js"; @@ -398,6 +395,13 @@ function snapshotFallbackSelectionState(entry: SessionEntry): FallbackSelectionS }; } +function buildFallbackSelectionStatePatch(entry: SessionEntry): Partial { + return { + ...snapshotFallbackSelectionState(entry), + updatedAt: entry.updatedAt, + }; +} + function buildFallbackSelectionState(params: { provider: string; model: string; @@ -1786,14 +1790,13 @@ export async function runAgentTurnWithFallback(params: { try { if (params.storePath) { - await updateSessionStore(params.storePath, (store) => { - const persistedEntry = store[params.sessionKey!]; - if (!persistedEntry) { - return; - } - applyFallbackSelectionState(persistedEntry, nextState); - store[params.sessionKey!] = persistedEntry; - }); + await updateSessionEntry( + { storePath: params.storePath, sessionKey: params.sessionKey }, + (persistedEntry) => { + applyFallbackSelectionState(persistedEntry, nextState); + return buildFallbackSelectionStatePatch(persistedEntry); + }, + ); } } catch (error) { rollbackFallbackSelectionStateIfUnchanged(activeSessionEntry, nextState, previousState); @@ -1810,18 +1813,18 @@ export async function runAgentTurnWithFallback(params: { if (rolledBackInMemory) { params.activeSessionStore![params.sessionKey!] = activeSessionEntry; } - if (!params.storePath) { + if (!params.storePath || !params.sessionKey) { return; } - await updateSessionStore(params.storePath, (store) => { - const persistedEntry = store[params.sessionKey!]; - if (!persistedEntry) { - return; - } - if (rollbackFallbackSelectionStateIfUnchanged(persistedEntry, nextState, previousState)) { - store[params.sessionKey!] = persistedEntry; - } - }); + await updateSessionEntry( + { storePath: params.storePath, sessionKey: params.sessionKey }, + (persistedEntry) => { + if (rollbackFallbackSelectionStateIfUnchanged(persistedEntry, nextState, previousState)) { + return buildFallbackSelectionStatePatch(persistedEntry); + } + return null; + }, + ); }; }; const clearRecoveredAutoFallbackPrimaryProbe = async (paramsForClear: { @@ -1854,17 +1857,37 @@ export async function runAgentTurnWithFallback(params: { if (!params.storePath) { return; } - await updateSessionStore(params.storePath, (store) => { - const persistedEntry = store[params.sessionKey!]; - if (!persistedEntry) { - return; - } - if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) { - return; - } - clearAutoFallbackPrimaryProbeSelection(persistedEntry); - store[params.sessionKey!] = persistedEntry; - }); + await updateSessionEntry( + { storePath: params.storePath, sessionKey: params.sessionKey }, + (persistedEntry) => { + if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) { + return null; + } + const shouldClearAuthProfile = + persistedEntry.authProfileOverrideSource === "auto" || + (persistedEntry.authProfileOverrideSource === undefined && + persistedEntry.authProfileOverrideCompactionCount !== undefined); + clearAutoFallbackPrimaryProbeSelection(persistedEntry); + return { + providerOverride: undefined, + modelOverride: undefined, + modelOverrideSource: undefined, + modelOverrideFallbackOriginProvider: undefined, + modelOverrideFallbackOriginModel: undefined, + ...(shouldClearAuthProfile + ? { + authProfileOverride: undefined, + authProfileOverrideSource: undefined, + authProfileOverrideCompactionCount: undefined, + } + : {}), + fallbackNoticeSelectedModel: undefined, + fallbackNoticeActiveModel: undefined, + fallbackNoticeReason: undefined, + updatedAt: persistedEntry.updatedAt, + }; + }, + ); }; while (true) { diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 65c9327f5104..22414ebd7d3e 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -28,9 +28,8 @@ import { resolveSessionFilePath, resolveSessionFilePathOptions, type SessionEntry, - applySessionStoreEntryPatch, - updateSessionStoreEntry, } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; import { logVerbose } from "../../globals.js"; @@ -62,6 +61,15 @@ import type { ReplyOperation } from "./reply-run-registry.js"; import { incrementCompactionCount } from "./session-updates.js"; type EmbeddedAgentRuntime = typeof import("../../agents/embedded-agent.js"); +type UpdateSessionStoreEntryParams = { + storePath: string; + sessionKey: string; + skipMaintenance?: boolean; + takeCacheOwnership?: boolean; + update: ( + entry: SessionEntry, + ) => Promise | null> | Partial | null; +}; const MAX_VISIBLE_MEMORY_FLUSH_ERROR_CHARS = 600; const MAX_FLUSH_FAILURES = 3; @@ -91,6 +99,22 @@ async function runEmbeddedAgentDefault( return await runEmbeddedAgent(...args); } +async function updateSessionStoreEntryDefault( + params: UpdateSessionStoreEntryParams, +): Promise { + return await updateSessionEntry( + { + storePath: params.storePath, + sessionKey: params.sessionKey, + }, + params.update, + { + skipMaintenance: params.skipMaintenance, + takeCacheOwnership: params.takeCacheOwnership, + }, + ); +} + async function ensureMemoryFlushTargetFile(params: { workspaceDir: string; relativePath: string; @@ -124,7 +148,7 @@ const memoryDeps = { registerAgentRunContext, refreshQueuedFollowupSession, incrementCompactionCount, - updateSessionStoreEntry, + updateSessionStoreEntry: updateSessionStoreEntryDefault, emitAgentEvent, randomUUID: () => crypto.randomUUID(), now: () => Date.now(), @@ -141,7 +165,7 @@ export function setAgentRunnerMemoryTestDeps(overrides?: Partial crypto.randomUUID(), now: () => Date.now(), @@ -1093,13 +1117,17 @@ export async function runMemoryFlushIfNeeded(params: { } if (params.storePath && params.sessionKey) { try { - const updatedEntry = await applySessionStoreEntryPatch({ - storePath: params.storePath, - sessionKey: params.sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - patch: { totalTokens: transcriptPromptTokens, totalTokensFresh: true }, - }); + const updatedEntry = await updateSessionEntry( + { + storePath: params.storePath, + sessionKey: params.sessionKey, + }, + () => ({ totalTokens: transcriptPromptTokens, totalTokensFresh: true }), + { + skipMaintenance: true, + takeCacheOwnership: true, + }, + ); if (updatedEntry) { entry = updatedEntry; if (params.sessionStore) { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 7f64a4713f1f..a16aebb583fb 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -21,13 +21,12 @@ import { deriveContextPromptTokens, hasNonzeroUsage, normalizeUsage } from "../. import { enqueueCommitmentExtraction } from "../../commitments/runtime.js"; import type { OpenClawConfig } from "../../config/config.js"; import { - applySessionStoreEntryPatch, loadSessionStore, resolveSessionPluginStatusLines, resolveSessionPluginTraceLines, type SessionEntry, - updateSessionStoreEntry, } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js"; import type { TypingMode } from "../../config/types.js"; import { resolveSessionTranscriptCandidates } from "../../gateway/session-utils.fs.js"; @@ -1220,12 +1219,9 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = updatedAt; activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await applySessionStoreEntryPatch({ - storePath, - sessionKey, + await updateSessionEntry({ storePath, sessionKey }, () => ({ updatedAt }), { skipMaintenance: true, takeCacheOwnership: true, - patch: { updatedAt }, }); } }; @@ -1433,14 +1429,16 @@ export async function runReplyAgent(params: { restartRecoveryDeliveryRunId, updatedAt, }; - const persisted = await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (current) => + const persisted = await updateSessionEntry( + { + storePath, + sessionKey, + }, + async (current) => current.sessionId === replyOperation.sessionId && current.abortedLastRun !== true ? patch : null, - }); + ); if (persisted) { activeSessionEntry = persisted; if (activeSessionStore) { @@ -1459,16 +1457,18 @@ export async function runReplyAgent(params: { restartRecoveryDeliveryRunId: undefined, updatedAt: Date.now(), }; - const persisted = await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (current) => + const persisted = await updateSessionEntry( + { + storePath, + sessionKey, + }, + async (current) => current.sessionId === replyOperation.sessionId && current.abortedLastRun !== true && current.restartRecoveryDeliveryRunId === restartRecoveryDeliveryRunId ? patch : null, - }); + ); if (persisted) { activeSessionEntry = persisted; if (activeSessionStore) { @@ -1679,16 +1679,17 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = updatedAt; activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await applySessionStoreEntryPatch({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - patch: { + await updateSessionEntry( + { storePath, sessionKey }, + () => ({ groupActivationNeedsSystemIntro: false, updatedAt, + }), + { + skipMaintenance: true, + takeCacheOwnership: true, }, - }); + ); } } @@ -1743,17 +1744,18 @@ export async function runReplyAgent(params: { activeSessionStore[sessionKey] = fallbackStateEntry; } if (sessionKey && storePath) { - await applySessionStoreEntryPatch({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - patch: { + await updateSessionEntry( + { storePath, sessionKey }, + () => ({ fallbackNoticeSelectedModel: fallbackTransition.nextState.selectedModel, fallbackNoticeActiveModel: fallbackTransition.nextState.activeModel, fallbackNoticeReason: fallbackTransition.nextState.reason, + }), + { + skipMaintenance: true, + takeCacheOwnership: true, }, - }); + ); } } const usedCliProvider = isCliProvider(providerUsed, cfg); @@ -2360,19 +2362,20 @@ export async function runReplyAgent(params: { runtimePolicySessionKey, opts, }); - await applySessionStoreEntryPatch({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - patch: { + await updateSessionEntry( + { storePath, sessionKey }, + () => ({ pendingFinalDelivery: true, pendingFinalDeliveryText: resolvedPendingText, pendingFinalDeliveryContext, pendingFinalDeliveryCreatedAt: Date.now(), updatedAt: Date.now(), + }), + { + skipMaintenance: true, + takeCacheOwnership: true, }, - }); + ); } } diff --git a/src/auto-reply/reply/body.ts b/src/auto-reply/reply/body.ts index dfe53ba08ecc..2d95e9adc812 100644 --- a/src/auto-reply/reply/body.ts +++ b/src/auto-reply/reply/body.ts @@ -3,12 +3,12 @@ import type { SessionEntry } from "../../config/sessions/types.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import { setAbortMemory } from "./abort-primitives.js"; -const sessionStoreRuntimeLoader = createLazyImportLoader( - () => import("../../config/sessions/store.runtime.js"), +const sessionAccessorRuntimeLoader = createLazyImportLoader( + () => import("../../config/sessions/session-accessor.js"), ); -function loadSessionStoreRuntime() { - return sessionStoreRuntimeLoader.load(); +function loadSessionAccessorRuntime() { + return sessionAccessorRuntimeLoader.load(); } /** Applies one-shot session hints to the agent-visible body and clears consumed flags. */ @@ -34,18 +34,17 @@ export async function applySessionHints(params: { params.sessionStore[params.sessionKey] = params.sessionEntry; if (params.storePath) { const sessionKey = params.sessionKey; - const { updateSessionStore } = await loadSessionStoreRuntime(); - await updateSessionStore(params.storePath, (store) => { - const entry = store[sessionKey] ?? params.sessionEntry; - if (!entry) { - return; - } - store[sessionKey] = { - ...entry, + const { updateSessionEntry } = await loadSessionAccessorRuntime(); + await updateSessionEntry( + { + storePath: params.storePath, + sessionKey, + }, + () => ({ abortedLastRun: false, updatedAt: Date.now(), - }; - }); + }), + ); } } else if (params.abortKey) { setAbortMemory(params.abortKey, false); diff --git a/src/auto-reply/reply/commands-acp/lifecycle.ts b/src/auto-reply/reply/commands-acp/lifecycle.ts index 5abc6a45a93a..979ad78f791d 100644 --- a/src/auto-reply/reply/commands-acp/lifecycle.ts +++ b/src/auto-reply/reply/commands-acp/lifecycle.ts @@ -36,7 +36,7 @@ import { resolveThreadBindingPlacementForCurrentContext, resolveThreadBindingSpawnPolicy, } from "../../../channels/thread-bindings-policy.js"; -import { updateSessionStore } from "../../../config/sessions.js"; +import { updateSessionEntry } from "../../../config/sessions/session-accessor.js"; import type { SessionAcpMeta } from "../../../config/sessions/types.js"; import type { OpenClawConfig } from "../../../config/types.openclaw.js"; import { formatErrorMessage } from "../../../infra/errors.js"; @@ -474,17 +474,16 @@ async function persistSpawnedSessionLabel(params: { if (!params.commandParams.storePath) { return; } - await updateSessionStore(params.commandParams.storePath, (store) => { - const existing = store[params.sessionKey]; - if (!existing) { - return; - } - store[params.sessionKey] = { - ...existing, + await updateSessionEntry( + { + storePath: params.commandParams.storePath, + sessionKey: params.sessionKey, + }, + () => ({ label, updatedAt: now, - }; - }); + }), + ); } export async function handleAcpSpawnAction( diff --git a/src/auto-reply/reply/commands-reset.ts b/src/auto-reply/reply/commands-reset.ts index 773a047470f1..1fec6d0d7bde 100644 --- a/src/auto-reply/reply/commands-reset.ts +++ b/src/auto-reply/reply/commands-reset.ts @@ -2,7 +2,7 @@ import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js"; import { clearAllCliSessions } from "../../agents/cli-session.js"; import { resetConfiguredBindingTargetInPlace } from "../../channels/plugins/binding-targets.js"; -import { updateSessionStoreEntry } from "../../config/sessions/store.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { logVerbose } from "../../globals.js"; import { isAcpSessionKey } from "../../routing/session-key.js"; import { resolveBoundAcpThreadSessionKey } from "./commands-acp/targets.js"; @@ -76,10 +76,12 @@ export async function maybeHandleResetCommand( params.sessionStore[params.sessionKey] = targetSessionEntry; } if (params.storePath && params.sessionKey) { - await updateSessionStoreEntry({ - storePath: params.storePath, - sessionKey: params.sessionKey, - update: async (entry) => { + await updateSessionEntry( + { + storePath: params.storePath, + sessionKey: params.sessionKey, + }, + async (entry) => { const next = { ...entry }; clearAllCliSessions(next); return { @@ -90,7 +92,7 @@ export async function maybeHandleResetCommand( lastInteractionAt: now, }; }, - }); + ); } } diff --git a/src/auto-reply/reply/directive-handling.impl.ts b/src/auto-reply/reply/directive-handling.impl.ts index 61ecb55b4171..da03447eec86 100644 --- a/src/auto-reply/reply/directive-handling.impl.ts +++ b/src/auto-reply/reply/directive-handling.impl.ts @@ -5,7 +5,7 @@ import { renderExecTargetLabel } from "../../agents/bash-tools.exec-runtime.js"; import { resolveExecDefaults } from "../../agents/exec-defaults.js"; import { resolveFastModeState } from "../../agents/fast-mode.js"; import { resolveSandboxRuntimeStatus } from "../../agents/sandbox.js"; -import { updateSessionStore } from "../../config/sessions.js"; +import { replaceSessionEntry } from "../../config/sessions/session-accessor.js"; import { triggerSessionPatchHook } from "../../gateway/session-patch-hooks.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { applyTraceOverride, applyVerboseOverride } from "../../sessions/level-overrides.js"; @@ -476,9 +476,7 @@ export async function handleDirectiveOnly( sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await updateSessionStore(storePath, (store) => { - store[sessionKey] = sessionEntry; - }); + await replaceSessionEntry({ storePath, sessionKey }, sessionEntry); } if (modelSelection && modelSelectionUpdated && sessionKey) { triggerSessionPatchHook({ diff --git a/src/auto-reply/reply/directive-handling.persist.ts b/src/auto-reply/reply/directive-handling.persist.ts index 5184f7632a9f..138e6187cee8 100644 --- a/src/auto-reply/reply/directive-handling.persist.ts +++ b/src/auto-reply/reply/directive-handling.persist.ts @@ -9,7 +9,7 @@ import { resolveAgentHarnessPolicy } from "../../agents/harness/policy.js"; import type { ModelCatalogEntry } from "../../agents/model-catalog.js"; import { normalizeProviderId, type ModelAliasIndex } from "../../agents/model-selection.js"; import { resolveContextConfigProviderForRuntime } from "../../agents/openai-routing.js"; -import { updateSessionStore } from "../../config/sessions/store.js"; +import { replaceSessionEntry } from "../../config/sessions/session-accessor.js"; import type { SessionEntry } from "../../config/sessions/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { triggerSessionPatchHook } from "../../gateway/session-patch-hooks.js"; @@ -359,9 +359,7 @@ export async function persistInlineDirectives(params: { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await updateSessionStore(storePath, (store) => { - store[sessionKey] = sessionEntry; - }); + await replaceSessionEntry({ storePath, sessionKey }, sessionEntry); } if (modelDirective && modelUpdated) { triggerSessionPatchHook({ diff --git a/src/auto-reply/reply/dispatch-from-config.runtime.ts b/src/auto-reply/reply/dispatch-from-config.runtime.ts index 4b54507828cf..00bcb61b8c1e 100644 --- a/src/auto-reply/reply/dispatch-from-config.runtime.ts +++ b/src/auto-reply/reply/dispatch-from-config.runtime.ts @@ -1,9 +1,33 @@ /** Runtime-only dispatch dependencies shared by config-driven reply delivery. */ +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; +import type { SessionEntry } from "../../config/sessions/types.js"; + export { resolveStorePath } from "../../config/sessions/paths.js"; export { loadSessionStore, readSessionEntry, resolveSessionStoreEntry, - updateSessionStoreEntry, } from "../../config/sessions/store.js"; export { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; + +export async function updateSessionStoreEntry(params: { + storePath: string; + sessionKey: string; + skipMaintenance?: boolean; + takeCacheOwnership?: boolean; + update: ( + entry: SessionEntry, + ) => Promise | null> | Partial | null; +}): Promise { + return await updateSessionEntry( + { + storePath: params.storePath, + sessionKey: params.sessionKey, + }, + params.update, + { + skipMaintenance: params.skipMaintenance, + takeCacheOwnership: params.takeCacheOwnership, + }, + ); +} diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index dbcd5b4fea8d..a4c9b5e31194 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -20,7 +20,8 @@ import { buildAgentRuntimeDeliveryPlan, buildAgentRuntimeOutcomePlan, } from "../../agents/runtime-plan/build.js"; -import { updateSessionStore, type SessionEntry } from "../../config/sessions.js"; +import type { SessionEntry } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import { readSessionEntry } from "../../config/sessions/store-load.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; @@ -657,16 +658,33 @@ export function createFollowupRunner(params: { if (!storePath) { return; } - await updateSessionStore(storePath, (store) => { - const persistedEntry = store[replySessionKey]; - if (!persistedEntry) { - return; - } + await updateSessionEntry({ storePath, sessionKey: replySessionKey }, (persistedEntry) => { if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) { - return; + return null; } + const shouldClearAuthProfile = + persistedEntry.authProfileOverrideSource === "auto" || + (persistedEntry.authProfileOverrideSource === undefined && + persistedEntry.authProfileOverrideCompactionCount !== undefined); clearAutoFallbackPrimaryProbeSelection(persistedEntry); - store[replySessionKey] = persistedEntry; + return { + providerOverride: undefined, + modelOverride: undefined, + modelOverrideSource: undefined, + modelOverrideFallbackOriginProvider: undefined, + modelOverrideFallbackOriginModel: undefined, + ...(shouldClearAuthProfile + ? { + authProfileOverride: undefined, + authProfileOverrideSource: undefined, + authProfileOverrideCompactionCount: undefined, + } + : {}), + fallbackNoticeSelectedModel: undefined, + fallbackNoticeActiveModel: undefined, + fallbackNoticeReason: undefined, + updatedAt: persistedEntry.updatedAt, + }; }); }; fallbackProvider = run.provider; diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 3b94abbbb217..b6808e2aa2a8 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -339,6 +339,9 @@ const agentRunnerRuntimeLoader = createLazyImportLoader(() => import("./agent-ru const sessionUpdatesRuntimeLoader = createLazyImportLoader( () => import("./session-updates.runtime.js"), ); +const sessionAccessorRuntimeLoader = createLazyImportLoader( + () => import("../../config/sessions/session-accessor.js"), +); function loadEmbeddedAgentRuntime() { return embeddedAgentRuntimeLoader.load(); @@ -352,6 +355,10 @@ function loadSessionUpdatesRuntime() { return sessionUpdatesRuntimeLoader.load(); } +function loadSessionAccessorRuntime() { + return sessionAccessorRuntimeLoader.load(); +} + function stripPromptThinkingDirectives(body: string): string { return body .split("\n") @@ -904,6 +911,20 @@ export async function runPreparedReply( // Execution fallbacks are turn-local; directive/model persistence owns // durable thinking remaps so explicit session overrides survive replies. resolvedThinkLevel = fallbackThinkLevel; + if ( + sessionEntry && + sessionStore && + sessionKey && + sessionEntry.thinkingLevel === previousThinkLevel + ) { + sessionEntry.thinkingLevel = fallbackThinkLevel; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + if (storePath) { + const { upsertSessionEntry } = await loadSessionAccessorRuntime(); + await upsertSessionEntry({ storePath, sessionKey }, sessionEntry); + } + } } } const internalOpts = opts as InternalGetReplyOptions | undefined; diff --git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index 1803b91f633e..5a32336d4ec4 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -519,13 +519,10 @@ export async function getReplyFromConfig( sessionStore[sessionKey] = sessionEntry; } if (sessionKey && storePath) { - const { applySessionStoreEntryPatch } = await import("../../config/sessions.js"); - await applySessionStoreEntryPatch({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - patch: { + const { updateSessionEntry } = await import("../../config/sessions/session-accessor.js"); + await updateSessionEntry( + { storePath, sessionKey }, + () => ({ pendingFinalDelivery: undefined, pendingFinalDeliveryText: undefined, pendingFinalDeliveryCreatedAt: undefined, @@ -533,8 +530,12 @@ export async function getReplyFromConfig( pendingFinalDeliveryAttemptCount: undefined, pendingFinalDeliveryLastError: undefined, pendingFinalDeliveryContext: undefined, + }), + { + skipMaintenance: true, + takeCacheOwnership: true, }, - }); + ); } } } diff --git a/src/auto-reply/reply/model-selection.ts b/src/auto-reply/reply/model-selection.ts index ba90ea49552d..a9f22db97c62 100644 --- a/src/auto-reply/reply/model-selection.ts +++ b/src/auto-reply/reply/model-selection.ts @@ -86,8 +86,8 @@ function shouldLogModelSelectionTiming(): boolean { const modelCatalogRuntimeLoader = createLazyImportLoader( () => import("../../agents/model-catalog.runtime.js"), ); -const sessionStoreRuntimeLoader = createLazyImportLoader( - () => import("../../config/sessions/store.runtime.js"), +const sessionAccessorRuntimeLoader = createLazyImportLoader( + () => import("../../config/sessions/session-accessor.js"), ); function normalizeRuntimeModelRef(provider: string, model: string) { return normalizeModelRef(provider, model, RUNTIME_MODEL_VISIBILITY_NORMALIZATION); @@ -97,8 +97,8 @@ function loadModelCatalogRuntime() { return modelCatalogRuntimeLoader.load(); } -function loadSessionStoreRuntime() { - return sessionStoreRuntimeLoader.load(); +function loadSessionAccessorRuntime() { + return sessionAccessorRuntimeLoader.load(); } function findSelectedCatalogEntry(params: { @@ -290,11 +290,8 @@ export async function createModelSelectionState(params: { if (updated) { sessionStore[sessionKey] = sessionEntry; if (storePath) { - await ( - await loadSessionStoreRuntime() - ).updateSessionStore(storePath, (store) => { - store[sessionKey] = sessionEntry; - }); + const { replaceSessionEntry } = await loadSessionAccessorRuntime(); + await replaceSessionEntry({ storePath, sessionKey }, sessionEntry); } } resetModelOverride = updated; diff --git a/src/auto-reply/reply/session-reset-model.ts b/src/auto-reply/reply/session-reset-model.ts index b7fdd1dbea86..075056d8c4f0 100644 --- a/src/auto-reply/reply/session-reset-model.ts +++ b/src/auto-reply/reply/session-reset-model.ts @@ -126,11 +126,9 @@ function applySelectionToSession(params: { } sessionStore[sessionKey] = sessionEntry; if (storePath) { - void import("../../config/sessions.js") - .then(({ updateSessionStore }) => - updateSessionStore(storePath, (store) => { - store[sessionKey] = sessionEntry; - }), + void import("../../config/sessions/session-accessor.js") + .then(({ replaceSessionEntry }) => + replaceSessionEntry({ storePath, sessionKey }, sessionEntry), ) .catch(() => { // Ignore persistence errors; session still proceeds. diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 4e077f270301..4a1274a7fc23 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -12,6 +12,7 @@ import { type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; +import { upsertSessionEntry } from "../../config/sessions/session-accessor.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { forgetActiveSessionForShutdown, @@ -43,17 +44,12 @@ async function persistSessionEntryUpdate(params: { if (!params.storePath) { return; } - await updateSessionStore( - params.storePath, - (store) => { - const next = { ...store[params.sessionKey!], ...params.nextEntry }; - store[params.sessionKey!] = next; - return next; - }, + await upsertSessionEntry( { - resolveSingleEntryPersistence: (entry) => - entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null, + storePath: params.storePath, + sessionKey: params.sessionKey, }, + params.nextEntry, ); } diff --git a/src/auto-reply/reply/session-usage.ts b/src/auto-reply/reply/session-usage.ts index 36aa790e5b62..07bc3e4bd9e2 100644 --- a/src/auto-reply/reply/session-usage.ts +++ b/src/auto-reply/reply/session-usage.ts @@ -14,8 +14,8 @@ import { resolveSessionGoalDisplayState, type SessionSystemPromptReport, type SessionEntry, - updateSessionStoreEntry, } from "../../config/sessions.js"; +import { updateSessionEntry } from "../../config/sessions/session-accessor.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { logVerbose } from "../../globals.js"; import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js"; @@ -140,12 +140,12 @@ export async function persistSessionUsageUpdate(params: { if (hasUsage || hasFreshContextSnapshot || hasCompactionSnapshot) { try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - update: async (entry) => { + await updateSessionEntry( + { + storePath, + sessionKey, + }, + async (entry) => { const updatedAt = Date.now(); const preserveSessionModelState = params.isHeartbeat === true || params.preserveUserFacingSessionModelState === true; @@ -238,7 +238,11 @@ export async function persistSessionUsageUpdate(params: { ? patch : applyCliSessionIdToSessionPatch(params, entry, patch); }, - }); + { + skipMaintenance: true, + takeCacheOwnership: true, + }, + ); } catch (err) { logVerbose(`failed to persist ${label}usage update: ${String(err)}`); } @@ -247,12 +251,12 @@ export async function persistSessionUsageUpdate(params: { if (params.modelUsed || params.contextTokensUsed) { try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - skipMaintenance: true, - takeCacheOwnership: true, - update: async (entry) => { + await updateSessionEntry( + { + storePath, + sessionKey, + }, + async (entry) => { const preserveSessionModelState = params.isHeartbeat === true || params.preserveUserFacingSessionModelState === true; const preserveUserFacingRunState = params.preserveUserFacingSessionModelState === true; @@ -274,7 +278,11 @@ export async function persistSessionUsageUpdate(params: { ? patch : applyCliSessionIdToSessionPatch(params, entry, patch); }, - }); + { + skipMaintenance: true, + takeCacheOwnership: true, + }, + ); } catch (err) { logVerbose(`failed to persist ${label}model/context update: ${String(err)}`); }