clawdbot-6f0: route agent runtime session writes through seam

This commit is contained in:
Josh Lehman
2026-05-31 18:41:34 -07:00
parent 1c31222d71
commit 276e3a1e0c
21 changed files with 340 additions and 223 deletions

View File

@@ -13,6 +13,7 @@ import {
updateSessionStore,
rewriteSessionFileForNewSessionId,
} from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { resolveMaintenanceConfigFromInput } from "../../config/sessions/store-maintenance.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
@@ -332,13 +333,11 @@ export async function clearCliSessionInStore(params: {
clearCliSession(next, provider);
next.updatedAt = Date.now();
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], next);
store[sessionKey] = merged;
return merged;
});
sessionStore[sessionKey] = persisted;
return persisted;
const persisted = await updateSessionEntry({ storePath, sessionKey }, () => next);
if (persisted) {
sessionStore[sessionKey] = persisted;
}
return persisted ?? undefined;
}
/** Records CLI compaction metadata on the persisted session entry. */
@@ -401,13 +400,11 @@ export async function recordCliCompactionInStore(params: {
next.cacheWrite = undefined;
}
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], next);
store[sessionKey] = merged;
return merged;
});
sessionStore[sessionKey] = persisted;
return persisted;
const persisted = await updateSessionEntry({ storePath, sessionKey }, () => next);
if (persisted) {
sessionStore[sessionKey] = persisted;
}
return persisted ?? undefined;
}
function resolveCompactionSessionFile(params: {

View File

@@ -8,7 +8,8 @@ import { sanitizeForLog } from "../../../packages/terminal-core/src/ansi.js";
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import { resolveStorePath, updateSessionStoreEntry } from "../../config/sessions.js";
import { resolveStorePath } from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
import {
resolveContextEngine,
@@ -239,12 +240,12 @@ async function resetNoRealConversationTokenSnapshot(params: {
}
const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId });
try {
await updateSessionStoreEntry({
storePath,
sessionKey: params.sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async () => ({
await updateSessionEntry(
{
storePath,
sessionKey: params.sessionKey,
},
async () => ({
totalTokens: 0,
totalTokensFresh: true,
inputTokens: undefined,
@@ -254,7 +255,11 @@ async function resetNoRealConversationTokenSnapshot(params: {
contextBudgetStatus: undefined,
updatedAt: Date.now(),
}),
});
{
skipMaintenance: true,
takeCacheOwnership: true,
},
);
} catch (err) {
log.warn(
`[context-overflow-precheck] failed to reset stale context snapshot for ` +

View File

@@ -12,11 +12,8 @@ import { filterHeartbeatTranscriptArtifacts } from "../../../auto-reply/heartbea
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../../auto-reply/tokens.js";
import { getRuntimeConfig } from "../../../config/config.js";
import { resolveStorePath } from "../../../config/sessions/paths.js";
import {
loadSessionStore,
runQuotaSuspensionMaintenance,
updateSessionStoreEntry,
} from "../../../config/sessions/store.js";
import { updateSessionEntry } from "../../../config/sessions/session-accessor.js";
import { loadSessionStore, runQuotaSuspensionMaintenance } from "../../../config/sessions/store.js";
import {
bindOwnedSessionTranscriptWrites,
withOwnedSessionTranscriptWrites,
@@ -2892,12 +2889,12 @@ export async function runEmbeddedAttempt(
activeSubagents: subagents,
});
validated.push(handoffMsg);
await updateSessionStoreEntry({
storePath,
sessionKey: params.sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async (entry) => {
await updateSessionEntry(
{
storePath,
sessionKey: params.sessionKey,
},
async (entry) => {
if (entry.quotaSuspension?.state !== "resuming") {
return null;
}
@@ -2905,7 +2902,11 @@ export async function runEmbeddedAttempt(
quotaSuspension: { ...entry.quotaSuspension, state: "active" },
};
},
});
{
skipMaintenance: true,
takeCacheOwnership: true,
},
);
}
}

View File

@@ -1,5 +1,5 @@
/** Runtime persistence helper for clearing abort-cutoff state from sessions. */
import { updateSessionStore } from "../../config/sessions/store.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import { applyAbortCutoffToSessionEntry, hasAbortCutoff } from "./abort-cutoff.js";
@@ -20,15 +20,11 @@ export async function clearAbortCutoffInSessionRuntime(params: {
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await updateSessionStore(storePath, (store) => {
const existing = store[sessionKey] ?? sessionEntry;
if (!existing) {
return;
}
applyAbortCutoffToSessionEntry(existing, undefined);
existing.updatedAt = Date.now();
store[sessionKey] = existing;
});
await updateSessionEntry({ storePath, sessionKey }, () => ({
abortCutoffMessageSid: undefined,
abortCutoffTimestamp: undefined,
updatedAt: Date.now(),
}));
}
return true;

View File

@@ -8,7 +8,8 @@ import { runCliAgent } from "../../agents/cli-runner.js";
import type { RunCliAgentParams } from "../../agents/cli-runner/types.js";
import { clearCliSession } from "../../agents/cli-session.js";
import type { EmbeddedAgentRunResult } from "../../agents/embedded-agent.js";
import { updateSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { SessionEntry } from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import type { AgentEventPayload } from "../../infra/agent-events.js";
import { emitAgentEvent, onAgentEvent } from "../../infra/agent-events.js";
@@ -141,9 +142,13 @@ export async function clearDroppedCliSessionBinding(params: {
if (!params.storePath || !params.sessionKey) {
return;
}
await updateSessionStore(params.storePath, (store) => {
clearEntry(store[params.sessionKey!]);
});
await updateSessionEntry(
{ storePath: params.storePath, sessionKey: params.sessionKey },
(entry) => {
clearEntry(entry);
return entry;
},
);
}
function createToolEventBridge(params: {

View File

@@ -55,11 +55,8 @@ import {
} from "../../agents/model-selection.js";
import { resolveOpenAIRuntimeProvider } from "../../agents/openai-routing.js";
import { buildAgentRuntimeOutcomePlan } from "../../agents/runtime-plan/build.js";
import {
resolveGroupSessionKey,
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
import { resolveGroupSessionKey, type SessionEntry } from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { resolveSilentReplyPolicy } from "../../config/silent-reply.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { logVerbose } from "../../globals.js";
@@ -398,6 +395,13 @@ function snapshotFallbackSelectionState(entry: SessionEntry): FallbackSelectionS
};
}
function buildFallbackSelectionStatePatch(entry: SessionEntry): Partial<SessionEntry> {
return {
...snapshotFallbackSelectionState(entry),
updatedAt: entry.updatedAt,
};
}
function buildFallbackSelectionState(params: {
provider: string;
model: string;
@@ -1786,14 +1790,13 @@ export async function runAgentTurnWithFallback(params: {
try {
if (params.storePath) {
await updateSessionStore(params.storePath, (store) => {
const persistedEntry = store[params.sessionKey!];
if (!persistedEntry) {
return;
}
applyFallbackSelectionState(persistedEntry, nextState);
store[params.sessionKey!] = persistedEntry;
});
await updateSessionEntry(
{ storePath: params.storePath, sessionKey: params.sessionKey },
(persistedEntry) => {
applyFallbackSelectionState(persistedEntry, nextState);
return buildFallbackSelectionStatePatch(persistedEntry);
},
);
}
} catch (error) {
rollbackFallbackSelectionStateIfUnchanged(activeSessionEntry, nextState, previousState);
@@ -1810,18 +1813,18 @@ export async function runAgentTurnWithFallback(params: {
if (rolledBackInMemory) {
params.activeSessionStore![params.sessionKey!] = activeSessionEntry;
}
if (!params.storePath) {
if (!params.storePath || !params.sessionKey) {
return;
}
await updateSessionStore(params.storePath, (store) => {
const persistedEntry = store[params.sessionKey!];
if (!persistedEntry) {
return;
}
if (rollbackFallbackSelectionStateIfUnchanged(persistedEntry, nextState, previousState)) {
store[params.sessionKey!] = persistedEntry;
}
});
await updateSessionEntry(
{ storePath: params.storePath, sessionKey: params.sessionKey },
(persistedEntry) => {
if (rollbackFallbackSelectionStateIfUnchanged(persistedEntry, nextState, previousState)) {
return buildFallbackSelectionStatePatch(persistedEntry);
}
return null;
},
);
};
};
const clearRecoveredAutoFallbackPrimaryProbe = async (paramsForClear: {
@@ -1854,17 +1857,37 @@ export async function runAgentTurnWithFallback(params: {
if (!params.storePath) {
return;
}
await updateSessionStore(params.storePath, (store) => {
const persistedEntry = store[params.sessionKey!];
if (!persistedEntry) {
return;
}
if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) {
return;
}
clearAutoFallbackPrimaryProbeSelection(persistedEntry);
store[params.sessionKey!] = persistedEntry;
});
await updateSessionEntry(
{ storePath: params.storePath, sessionKey: params.sessionKey },
(persistedEntry) => {
if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) {
return null;
}
const shouldClearAuthProfile =
persistedEntry.authProfileOverrideSource === "auto" ||
(persistedEntry.authProfileOverrideSource === undefined &&
persistedEntry.authProfileOverrideCompactionCount !== undefined);
clearAutoFallbackPrimaryProbeSelection(persistedEntry);
return {
providerOverride: undefined,
modelOverride: undefined,
modelOverrideSource: undefined,
modelOverrideFallbackOriginProvider: undefined,
modelOverrideFallbackOriginModel: undefined,
...(shouldClearAuthProfile
? {
authProfileOverride: undefined,
authProfileOverrideSource: undefined,
authProfileOverrideCompactionCount: undefined,
}
: {}),
fallbackNoticeSelectedModel: undefined,
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
updatedAt: persistedEntry.updatedAt,
};
},
);
};
while (true) {

View File

@@ -28,9 +28,8 @@ import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
applySessionStoreEntryPatch,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
import { logVerbose } from "../../globals.js";
@@ -62,6 +61,15 @@ import type { ReplyOperation } from "./reply-run-registry.js";
import { incrementCompactionCount } from "./session-updates.js";
type EmbeddedAgentRuntime = typeof import("../../agents/embedded-agent.js");
type UpdateSessionStoreEntryParams = {
storePath: string;
sessionKey: string;
skipMaintenance?: boolean;
takeCacheOwnership?: boolean;
update: (
entry: SessionEntry,
) => Promise<Partial<SessionEntry> | null> | Partial<SessionEntry> | null;
};
const MAX_VISIBLE_MEMORY_FLUSH_ERROR_CHARS = 600;
const MAX_FLUSH_FAILURES = 3;
@@ -91,6 +99,22 @@ async function runEmbeddedAgentDefault(
return await runEmbeddedAgent(...args);
}
async function updateSessionStoreEntryDefault(
params: UpdateSessionStoreEntryParams,
): Promise<SessionEntry | null> {
return await updateSessionEntry(
{
storePath: params.storePath,
sessionKey: params.sessionKey,
},
params.update,
{
skipMaintenance: params.skipMaintenance,
takeCacheOwnership: params.takeCacheOwnership,
},
);
}
async function ensureMemoryFlushTargetFile(params: {
workspaceDir: string;
relativePath: string;
@@ -124,7 +148,7 @@ const memoryDeps = {
registerAgentRunContext,
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
updateSessionStoreEntry: updateSessionStoreEntryDefault,
emitAgentEvent,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
@@ -141,7 +165,7 @@ export function setAgentRunnerMemoryTestDeps(overrides?: Partial<typeof memoryDe
registerAgentRunContext,
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
updateSessionStoreEntry: updateSessionStoreEntryDefault,
emitAgentEvent,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
@@ -1093,13 +1117,17 @@ export async function runMemoryFlushIfNeeded(params: {
}
if (params.storePath && params.sessionKey) {
try {
const updatedEntry = await applySessionStoreEntryPatch({
storePath: params.storePath,
sessionKey: params.sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
patch: { totalTokens: transcriptPromptTokens, totalTokensFresh: true },
});
const updatedEntry = await updateSessionEntry(
{
storePath: params.storePath,
sessionKey: params.sessionKey,
},
() => ({ totalTokens: transcriptPromptTokens, totalTokensFresh: true }),
{
skipMaintenance: true,
takeCacheOwnership: true,
},
);
if (updatedEntry) {
entry = updatedEntry;
if (params.sessionStore) {

View File

@@ -21,13 +21,12 @@ import { deriveContextPromptTokens, hasNonzeroUsage, normalizeUsage } from "../.
import { enqueueCommitmentExtraction } from "../../commitments/runtime.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
applySessionStoreEntryPatch,
loadSessionStore,
resolveSessionPluginStatusLines,
resolveSessionPluginTraceLines,
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js";
import type { TypingMode } from "../../config/types.js";
import { resolveSessionTranscriptCandidates } from "../../gateway/session-utils.fs.js";
@@ -1220,12 +1219,9 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await applySessionStoreEntryPatch({
storePath,
sessionKey,
await updateSessionEntry({ storePath, sessionKey }, () => ({ updatedAt }), {
skipMaintenance: true,
takeCacheOwnership: true,
patch: { updatedAt },
});
}
};
@@ -1433,14 +1429,16 @@ export async function runReplyAgent(params: {
restartRecoveryDeliveryRunId,
updatedAt,
};
const persisted = await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (current) =>
const persisted = await updateSessionEntry(
{
storePath,
sessionKey,
},
async (current) =>
current.sessionId === replyOperation.sessionId && current.abortedLastRun !== true
? patch
: null,
});
);
if (persisted) {
activeSessionEntry = persisted;
if (activeSessionStore) {
@@ -1459,16 +1457,18 @@ export async function runReplyAgent(params: {
restartRecoveryDeliveryRunId: undefined,
updatedAt: Date.now(),
};
const persisted = await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (current) =>
const persisted = await updateSessionEntry(
{
storePath,
sessionKey,
},
async (current) =>
current.sessionId === replyOperation.sessionId &&
current.abortedLastRun !== true &&
current.restartRecoveryDeliveryRunId === restartRecoveryDeliveryRunId
? patch
: null,
});
);
if (persisted) {
activeSessionEntry = persisted;
if (activeSessionStore) {
@@ -1679,16 +1679,17 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await applySessionStoreEntryPatch({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
patch: {
await updateSessionEntry(
{ storePath, sessionKey },
() => ({
groupActivationNeedsSystemIntro: false,
updatedAt,
}),
{
skipMaintenance: true,
takeCacheOwnership: true,
},
});
);
}
}
@@ -1743,17 +1744,18 @@ export async function runReplyAgent(params: {
activeSessionStore[sessionKey] = fallbackStateEntry;
}
if (sessionKey && storePath) {
await applySessionStoreEntryPatch({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
patch: {
await updateSessionEntry(
{ storePath, sessionKey },
() => ({
fallbackNoticeSelectedModel: fallbackTransition.nextState.selectedModel,
fallbackNoticeActiveModel: fallbackTransition.nextState.activeModel,
fallbackNoticeReason: fallbackTransition.nextState.reason,
}),
{
skipMaintenance: true,
takeCacheOwnership: true,
},
});
);
}
}
const usedCliProvider = isCliProvider(providerUsed, cfg);
@@ -2360,19 +2362,20 @@ export async function runReplyAgent(params: {
runtimePolicySessionKey,
opts,
});
await applySessionStoreEntryPatch({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
patch: {
await updateSessionEntry(
{ storePath, sessionKey },
() => ({
pendingFinalDelivery: true,
pendingFinalDeliveryText: resolvedPendingText,
pendingFinalDeliveryContext,
pendingFinalDeliveryCreatedAt: Date.now(),
updatedAt: Date.now(),
}),
{
skipMaintenance: true,
takeCacheOwnership: true,
},
});
);
}
}

View File

@@ -3,12 +3,12 @@ import type { SessionEntry } from "../../config/sessions/types.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { setAbortMemory } from "./abort-primitives.js";
const sessionStoreRuntimeLoader = createLazyImportLoader(
() => import("../../config/sessions/store.runtime.js"),
const sessionAccessorRuntimeLoader = createLazyImportLoader(
() => import("../../config/sessions/session-accessor.js"),
);
function loadSessionStoreRuntime() {
return sessionStoreRuntimeLoader.load();
function loadSessionAccessorRuntime() {
return sessionAccessorRuntimeLoader.load();
}
/** Applies one-shot session hints to the agent-visible body and clears consumed flags. */
@@ -34,18 +34,17 @@ export async function applySessionHints(params: {
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
const sessionKey = params.sessionKey;
const { updateSessionStore } = await loadSessionStoreRuntime();
await updateSessionStore(params.storePath, (store) => {
const entry = store[sessionKey] ?? params.sessionEntry;
if (!entry) {
return;
}
store[sessionKey] = {
...entry,
const { updateSessionEntry } = await loadSessionAccessorRuntime();
await updateSessionEntry(
{
storePath: params.storePath,
sessionKey,
},
() => ({
abortedLastRun: false,
updatedAt: Date.now(),
};
});
}),
);
}
} else if (params.abortKey) {
setAbortMemory(params.abortKey, false);

View File

@@ -36,7 +36,7 @@ import {
resolveThreadBindingPlacementForCurrentContext,
resolveThreadBindingSpawnPolicy,
} from "../../../channels/thread-bindings-policy.js";
import { updateSessionStore } from "../../../config/sessions.js";
import { updateSessionEntry } from "../../../config/sessions/session-accessor.js";
import type { SessionAcpMeta } from "../../../config/sessions/types.js";
import type { OpenClawConfig } from "../../../config/types.openclaw.js";
import { formatErrorMessage } from "../../../infra/errors.js";
@@ -474,17 +474,16 @@ async function persistSpawnedSessionLabel(params: {
if (!params.commandParams.storePath) {
return;
}
await updateSessionStore(params.commandParams.storePath, (store) => {
const existing = store[params.sessionKey];
if (!existing) {
return;
}
store[params.sessionKey] = {
...existing,
await updateSessionEntry(
{
storePath: params.commandParams.storePath,
sessionKey: params.sessionKey,
},
() => ({
label,
updatedAt: now,
};
});
}),
);
}
export async function handleAcpSpawnAction(

View File

@@ -2,7 +2,7 @@
import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js";
import { clearAllCliSessions } from "../../agents/cli-session.js";
import { resetConfiguredBindingTargetInPlace } from "../../channels/plugins/binding-targets.js";
import { updateSessionStoreEntry } from "../../config/sessions/store.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { logVerbose } from "../../globals.js";
import { isAcpSessionKey } from "../../routing/session-key.js";
import { resolveBoundAcpThreadSessionKey } from "./commands-acp/targets.js";
@@ -76,10 +76,12 @@ export async function maybeHandleResetCommand(
params.sessionStore[params.sessionKey] = targetSessionEntry;
}
if (params.storePath && params.sessionKey) {
await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async (entry) => {
await updateSessionEntry(
{
storePath: params.storePath,
sessionKey: params.sessionKey,
},
async (entry) => {
const next = { ...entry };
clearAllCliSessions(next);
return {
@@ -90,7 +92,7 @@ export async function maybeHandleResetCommand(
lastInteractionAt: now,
};
},
});
);
}
}

View File

@@ -5,7 +5,7 @@ import { renderExecTargetLabel } from "../../agents/bash-tools.exec-runtime.js";
import { resolveExecDefaults } from "../../agents/exec-defaults.js";
import { resolveFastModeState } from "../../agents/fast-mode.js";
import { resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
import { updateSessionStore } from "../../config/sessions.js";
import { replaceSessionEntry } from "../../config/sessions/session-accessor.js";
import { triggerSessionPatchHook } from "../../gateway/session-patch-hooks.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { applyTraceOverride, applyVerboseOverride } from "../../sessions/level-overrides.js";
@@ -476,9 +476,7 @@ export async function handleDirectiveOnly(
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
await replaceSessionEntry({ storePath, sessionKey }, sessionEntry);
}
if (modelSelection && modelSelectionUpdated && sessionKey) {
triggerSessionPatchHook({

View File

@@ -9,7 +9,7 @@ import { resolveAgentHarnessPolicy } from "../../agents/harness/policy.js";
import type { ModelCatalogEntry } from "../../agents/model-catalog.js";
import { normalizeProviderId, type ModelAliasIndex } from "../../agents/model-selection.js";
import { resolveContextConfigProviderForRuntime } from "../../agents/openai-routing.js";
import { updateSessionStore } from "../../config/sessions/store.js";
import { replaceSessionEntry } from "../../config/sessions/session-accessor.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { triggerSessionPatchHook } from "../../gateway/session-patch-hooks.js";
@@ -359,9 +359,7 @@ export async function persistInlineDirectives(params: {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
await replaceSessionEntry({ storePath, sessionKey }, sessionEntry);
}
if (modelDirective && modelUpdated) {
triggerSessionPatchHook({

View File

@@ -1,9 +1,33 @@
/** Runtime-only dispatch dependencies shared by config-driven reply delivery. */
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import type { SessionEntry } from "../../config/sessions/types.js";
export { resolveStorePath } from "../../config/sessions/paths.js";
export {
loadSessionStore,
readSessionEntry,
resolveSessionStoreEntry,
updateSessionStoreEntry,
} from "../../config/sessions/store.js";
export { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
export async function updateSessionStoreEntry(params: {
storePath: string;
sessionKey: string;
skipMaintenance?: boolean;
takeCacheOwnership?: boolean;
update: (
entry: SessionEntry,
) => Promise<Partial<SessionEntry> | null> | Partial<SessionEntry> | null;
}): Promise<SessionEntry | null> {
return await updateSessionEntry(
{
storePath: params.storePath,
sessionKey: params.sessionKey,
},
params.update,
{
skipMaintenance: params.skipMaintenance,
takeCacheOwnership: params.takeCacheOwnership,
},
);
}

View File

@@ -20,7 +20,8 @@ import {
buildAgentRuntimeDeliveryPlan,
buildAgentRuntimeOutcomePlan,
} from "../../agents/runtime-plan/build.js";
import { updateSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { SessionEntry } from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import { readSessionEntry } from "../../config/sessions/store-load.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
@@ -657,16 +658,33 @@ export function createFollowupRunner(params: {
if (!storePath) {
return;
}
await updateSessionStore(storePath, (store) => {
const persistedEntry = store[replySessionKey];
if (!persistedEntry) {
return;
}
await updateSessionEntry({ storePath, sessionKey: replySessionKey }, (persistedEntry) => {
if (!entryMatchesAutoFallbackPrimaryProbe(persistedEntry, probe)) {
return;
return null;
}
const shouldClearAuthProfile =
persistedEntry.authProfileOverrideSource === "auto" ||
(persistedEntry.authProfileOverrideSource === undefined &&
persistedEntry.authProfileOverrideCompactionCount !== undefined);
clearAutoFallbackPrimaryProbeSelection(persistedEntry);
store[replySessionKey] = persistedEntry;
return {
providerOverride: undefined,
modelOverride: undefined,
modelOverrideSource: undefined,
modelOverrideFallbackOriginProvider: undefined,
modelOverrideFallbackOriginModel: undefined,
...(shouldClearAuthProfile
? {
authProfileOverride: undefined,
authProfileOverrideSource: undefined,
authProfileOverrideCompactionCount: undefined,
}
: {}),
fallbackNoticeSelectedModel: undefined,
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
updatedAt: persistedEntry.updatedAt,
};
});
};
fallbackProvider = run.provider;

View File

@@ -339,6 +339,9 @@ const agentRunnerRuntimeLoader = createLazyImportLoader(() => import("./agent-ru
const sessionUpdatesRuntimeLoader = createLazyImportLoader(
() => import("./session-updates.runtime.js"),
);
const sessionAccessorRuntimeLoader = createLazyImportLoader(
() => import("../../config/sessions/session-accessor.js"),
);
function loadEmbeddedAgentRuntime() {
return embeddedAgentRuntimeLoader.load();
@@ -352,6 +355,10 @@ function loadSessionUpdatesRuntime() {
return sessionUpdatesRuntimeLoader.load();
}
function loadSessionAccessorRuntime() {
return sessionAccessorRuntimeLoader.load();
}
function stripPromptThinkingDirectives(body: string): string {
return body
.split("\n")
@@ -904,6 +911,20 @@ export async function runPreparedReply(
// Execution fallbacks are turn-local; directive/model persistence owns
// durable thinking remaps so explicit session overrides survive replies.
resolvedThinkLevel = fallbackThinkLevel;
if (
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry.thinkingLevel === previousThinkLevel
) {
sessionEntry.thinkingLevel = fallbackThinkLevel;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
const { upsertSessionEntry } = await loadSessionAccessorRuntime();
await upsertSessionEntry({ storePath, sessionKey }, sessionEntry);
}
}
}
}
const internalOpts = opts as InternalGetReplyOptions | undefined;

View File

@@ -519,13 +519,10 @@ export async function getReplyFromConfig(
sessionStore[sessionKey] = sessionEntry;
}
if (sessionKey && storePath) {
const { applySessionStoreEntryPatch } = await import("../../config/sessions.js");
await applySessionStoreEntryPatch({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
patch: {
const { updateSessionEntry } = await import("../../config/sessions/session-accessor.js");
await updateSessionEntry(
{ storePath, sessionKey },
() => ({
pendingFinalDelivery: undefined,
pendingFinalDeliveryText: undefined,
pendingFinalDeliveryCreatedAt: undefined,
@@ -533,8 +530,12 @@ export async function getReplyFromConfig(
pendingFinalDeliveryAttemptCount: undefined,
pendingFinalDeliveryLastError: undefined,
pendingFinalDeliveryContext: undefined,
}),
{
skipMaintenance: true,
takeCacheOwnership: true,
},
});
);
}
}
}

View File

@@ -86,8 +86,8 @@ function shouldLogModelSelectionTiming(): boolean {
const modelCatalogRuntimeLoader = createLazyImportLoader(
() => import("../../agents/model-catalog.runtime.js"),
);
const sessionStoreRuntimeLoader = createLazyImportLoader(
() => import("../../config/sessions/store.runtime.js"),
const sessionAccessorRuntimeLoader = createLazyImportLoader(
() => import("../../config/sessions/session-accessor.js"),
);
function normalizeRuntimeModelRef(provider: string, model: string) {
return normalizeModelRef(provider, model, RUNTIME_MODEL_VISIBILITY_NORMALIZATION);
@@ -97,8 +97,8 @@ function loadModelCatalogRuntime() {
return modelCatalogRuntimeLoader.load();
}
function loadSessionStoreRuntime() {
return sessionStoreRuntimeLoader.load();
function loadSessionAccessorRuntime() {
return sessionAccessorRuntimeLoader.load();
}
function findSelectedCatalogEntry(params: {
@@ -290,11 +290,8 @@ export async function createModelSelectionState(params: {
if (updated) {
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await (
await loadSessionStoreRuntime()
).updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
const { replaceSessionEntry } = await loadSessionAccessorRuntime();
await replaceSessionEntry({ storePath, sessionKey }, sessionEntry);
}
}
resetModelOverride = updated;

View File

@@ -126,11 +126,9 @@ function applySelectionToSession(params: {
}
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
void import("../../config/sessions.js")
.then(({ updateSessionStore }) =>
updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
}),
void import("../../config/sessions/session-accessor.js")
.then(({ replaceSessionEntry }) =>
replaceSessionEntry({ storePath, sessionKey }, sessionEntry),
)
.catch(() => {
// Ignore persistence errors; session still proceeds.

View File

@@ -12,6 +12,7 @@ import {
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
import { upsertSessionEntry } from "../../config/sessions/session-accessor.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
forgetActiveSessionForShutdown,
@@ -43,17 +44,12 @@ async function persistSessionEntryUpdate(params: {
if (!params.storePath) {
return;
}
await updateSessionStore(
params.storePath,
(store) => {
const next = { ...store[params.sessionKey!], ...params.nextEntry };
store[params.sessionKey!] = next;
return next;
},
await upsertSessionEntry(
{
resolveSingleEntryPersistence: (entry) =>
entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null,
storePath: params.storePath,
sessionKey: params.sessionKey,
},
params.nextEntry,
);
}

View File

@@ -14,8 +14,8 @@ import {
resolveSessionGoalDisplayState,
type SessionSystemPromptReport,
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { updateSessionEntry } from "../../config/sessions/session-accessor.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { logVerbose } from "../../globals.js";
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
@@ -140,12 +140,12 @@ export async function persistSessionUsageUpdate(params: {
if (hasUsage || hasFreshContextSnapshot || hasCompactionSnapshot) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async (entry) => {
await updateSessionEntry(
{
storePath,
sessionKey,
},
async (entry) => {
const updatedAt = Date.now();
const preserveSessionModelState =
params.isHeartbeat === true || params.preserveUserFacingSessionModelState === true;
@@ -238,7 +238,11 @@ export async function persistSessionUsageUpdate(params: {
? patch
: applyCliSessionIdToSessionPatch(params, entry, patch);
},
});
{
skipMaintenance: true,
takeCacheOwnership: true,
},
);
} catch (err) {
logVerbose(`failed to persist ${label}usage update: ${String(err)}`);
}
@@ -247,12 +251,12 @@ export async function persistSessionUsageUpdate(params: {
if (params.modelUsed || params.contextTokensUsed) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async (entry) => {
await updateSessionEntry(
{
storePath,
sessionKey,
},
async (entry) => {
const preserveSessionModelState =
params.isHeartbeat === true || params.preserveUserFacingSessionModelState === true;
const preserveUserFacingRunState = params.preserveUserFacingSessionModelState === true;
@@ -274,7 +278,11 @@ export async function persistSessionUsageUpdate(params: {
? patch
: applyCliSessionIdToSessionPatch(params, entry, patch);
},
});
{
skipMaintenance: true,
takeCacheOwnership: true,
},
);
} catch (err) {
logVerbose(`failed to persist ${label}model/context update: ${String(err)}`);
}