diff --git a/src/agents/cli-runner.bundle-mcp.e2e.test.ts b/src/agents/cli-runner.bundle-mcp.e2e.test.ts index 70b1ac06bb4c..008f2d047f33 100644 --- a/src/agents/cli-runner.bundle-mcp.e2e.test.ts +++ b/src/agents/cli-runner.bundle-mcp.e2e.test.ts @@ -14,7 +14,7 @@ import { import type { CliPreparedBackend, PreparedCliRunContext, - RunCliAgentParams, + PreparedRunCliAgentParams, } from "./cli-runner/types.js"; // This e2e spins a real stdio MCP server plus a spawned CLI process. Keep the @@ -151,7 +151,7 @@ async function prepareBundleMcpExecutionContext(params: { workspaceDir: params.workspaceDir, config: params.config, })) as CliPreparedBackend; - const runParams: RunCliAgentParams = { + const runParams: PreparedRunCliAgentParams = { sessionId: params.sessionId, sessionFile: params.sessionFile, workspaceDir: params.workspaceDir, diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index 704b419be177..edb48e1d9b5e 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -30,10 +30,15 @@ import { runAgentHarnessLlmInputHook, runAgentHarnessLlmOutputHook, } from "./harness/lifecycle-hook-helpers.js"; +import { + applyAgentRunSessionTargetIdentity, + resolveAgentRunSessionTarget, +} from "./run-session-target.js"; import type { AgentMessage } from "./runtime/index.js"; import { SessionManager } from "./sessions/session-manager.js"; const log = createSubsystemLogger("agents/cli-runner"); +type RunCliAgentParamsWithSessionFile = RunCliAgentParams & { sessionFile: string }; const cliRunnerDeps = { claudeCliSessionTranscriptHasContent: claudeCliSessionTranscriptHasContentImpl, @@ -197,7 +202,9 @@ async function runCliAgentEndHook( runAgentEndSideEffects(hookParams); } -async function persistApprovedCliUserTurnTranscript(params: RunCliAgentParams): Promise { +async function persistApprovedCliUserTurnTranscript( + params: RunCliAgentParamsWithSessionFile, +): Promise { if (params.suppressNextUserMessagePersistence === true || !params.userTurnTranscriptRecorder) { return; } @@ -281,7 +288,16 @@ async function finalizeCliContextEngineTurn(params: { } } -export async function runCliAgent(params: RunCliAgentParams): Promise { +export async function runCliAgent(paramsInput: RunCliAgentParams): Promise { + const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput); + const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase); + const params: RunCliAgentParamsWithSessionFile = { + ...paramsBase, + agentId: paramsBase.agentId ?? runSessionTarget.agentId, + sessionId: runSessionTarget.sessionId, + sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey, + sessionFile: runSessionTarget.sessionFile, + }; // Cron gate must fire before prepareCliRunContext — that call allocates // backend resources released only by runPreparedCliAgent's try…finally. params.onExecutionStarted?.(); diff --git a/src/agents/cli-runner/prepare.ts b/src/agents/cli-runner/prepare.ts index c65ac067e927..101016cfe6f6 100644 --- a/src/agents/cli-runner/prepare.ts +++ b/src/agents/cli-runner/prepare.ts @@ -76,7 +76,11 @@ import { loadCliSessionReseedMessages, resolveAutoCliSessionReseedHistoryChars, } from "./session-history.js"; -import type { CliReusableSession, PreparedCliRunContext, RunCliAgentParams } from "./types.js"; +import type { + CliReusableSession, + PreparedCliRunContext, + PreparedRunCliAgentParams, +} from "./types.js"; const prepareDeps = { makeBootstrapWarn: makeBootstrapWarnImpl, @@ -132,7 +136,7 @@ export function shouldSkipLocalCliCredentialEpoch(params: { } export async function prepareCliRunContext( - params: RunCliAgentParams, + params: PreparedRunCliAgentParams, ): Promise { const started = Date.now(); const workspaceResolution = resolveRunWorkspaceDir({ @@ -638,7 +642,7 @@ export async function prepareCliRunContext( config: contextEngineConfig, }); const contextEngineTurnPrompt = params.transcriptPrompt ?? params.prompt; - const preparedParams: RunCliAgentParams = { + const preparedParams: PreparedRunCliAgentParams = { ...params, config: contextEngineConfig, prompt: preparedPrompt, diff --git a/src/agents/cli-runner/types.ts b/src/agents/cli-runner/types.ts index 740d1506de46..982be4c9e0aa 100644 --- a/src/agents/cli-runner/types.ts +++ b/src/agents/cli-runner/types.ts @@ -24,15 +24,19 @@ import type { CurrentInboundPromptContext, EmbeddedRunTrigger, } from "../embedded-agent-runner/run/params.js"; +import type { AgentRunSessionTarget } from "../run-session-target.js"; import type { SilentReplyPromptMode } from "../system-prompt.types.js"; export type RunCliAgentParams = { sessionId: string; sessionKey?: string; + /** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */ + sessionTarget?: AgentRunSessionTarget; sessionEntry?: SessionEntry; agentId?: string; trigger?: EmbeddedRunTrigger; - sessionFile: string; + /** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */ + sessionFile?: string; workspaceDir: string; /** Task working directory for CLI execution. Defaults to workspaceDir. */ cwd?: string; @@ -130,8 +134,10 @@ export type CliReusableSession = { | "orphaned-tool-use"; }; +export type PreparedRunCliAgentParams = RunCliAgentParams & { sessionFile: string }; + export type PreparedCliRunContext = { - params: RunCliAgentParams; + params: PreparedRunCliAgentParams; effectiveAuthProfileId?: string; started: number; workspaceDir: string; diff --git a/src/agents/embedded-agent-runner/compact.runtime.types.ts b/src/agents/embedded-agent-runner/compact.runtime.types.ts index 07824d6863a5..96bedddd13d5 100644 --- a/src/agents/embedded-agent-runner/compact.runtime.types.ts +++ b/src/agents/embedded-agent-runner/compact.runtime.types.ts @@ -1,6 +1,6 @@ -import type { CompactEmbeddedAgentSessionParams } from "./compact.types.js"; +import type { CompactEmbeddedAgentSessionRuntimeParams } from "./compact.types.js"; import type { EmbeddedAgentCompactResult } from "./types.js"; export type CompactEmbeddedAgentSessionDirect = ( - params: CompactEmbeddedAgentSessionParams, + params: CompactEmbeddedAgentSessionRuntimeParams, ) => Promise; diff --git a/src/agents/embedded-agent-runner/compact.ts b/src/agents/embedded-agent-runner/compact.ts index 4e79f2aad9f3..a43d715f3eab 100644 --- a/src/agents/embedded-agent-runner/compact.ts +++ b/src/agents/embedded-agent-runner/compact.ts @@ -95,6 +95,10 @@ import { ensureOpenClawModelsJson } from "../models-config.js"; import { wrapStreamFnTextTransforms } from "../plugin-text-transforms.js"; import { resolveAgentPromptSurfaceForSessionKey } from "../prompt-surface.js"; import { registerProviderStreamForModel } from "../provider-stream.js"; +import { + applyAgentRunSessionTargetIdentity, + resolveAgentRunSessionTarget, +} from "../run-session-target.js"; import { collectRuntimeChannelCapabilities } from "../runtime-capabilities.js"; import { buildAgentRuntimePlan } from "../runtime-plan/build.js"; import type { AgentRuntimePlan } from "../runtime-plan/types.js"; @@ -123,6 +127,7 @@ import { } from "./compact-reasons.js"; import type { CompactEmbeddedAgentSessionParams, + CompactEmbeddedAgentSessionRuntimeParams, CompactionMessageMetrics, } from "./compact.types.js"; import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js"; @@ -175,6 +180,10 @@ import { mapThinkingLevel, normalizeContextTokenBudget } from "./utils.js"; import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js"; export type { CompactEmbeddedAgentSessionParams } from "./compact.types.js"; +type CompactEmbeddedAgentSessionParamsWithSessionFile = CompactEmbeddedAgentSessionRuntimeParams & { + sessionFile: string; +}; + function hasRealConversationContent( msg: AgentMessage, messages: AgentMessage[], @@ -417,8 +426,17 @@ function fallbackFailureToCompactionResult(err: unknown): EmbeddedAgentCompactRe * Use this when already inside a session/global lane to avoid deadlocks. */ export async function compactEmbeddedAgentSessionDirect( - params: CompactEmbeddedAgentSessionParams, + paramsInput: CompactEmbeddedAgentSessionRuntimeParams, ): Promise { + const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput); + const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase); + const params: CompactEmbeddedAgentSessionParamsWithSessionFile = { + ...paramsBase, + agentId: paramsBase.agentId ?? runSessionTarget.agentId, + sessionId: runSessionTarget.sessionId, + sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey, + sessionFile: runSessionTarget.sessionFile, + }; if (hasExplicitCompactionModel(params) || !hasCompactionModelFallbackCandidates(params)) { return await compactEmbeddedAgentSessionDirectOnce(params); } @@ -483,7 +501,7 @@ export async function compactEmbeddedAgentSessionDirect( } async function compactEmbeddedAgentSessionDirectOnce( - params: CompactEmbeddedAgentSessionParams, + params: CompactEmbeddedAgentSessionParamsWithSessionFile, ): Promise { const startedAt = Date.now(); const diagId = params.diagId?.trim() || createCompactionDiagId(); diff --git a/src/agents/embedded-agent-runner/compact.types.ts b/src/agents/embedded-agent-runner/compact.types.ts index f161cd8ad908..a3f8ef3ddd40 100644 --- a/src/agents/embedded-agent-runner/compact.types.ts +++ b/src/agents/embedded-agent-runner/compact.types.ts @@ -5,6 +5,7 @@ import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-e import type { CommandQueueEnqueueFn } from "../../process/command-queue.types.js"; import type { SkillSnapshot } from "../../skills/types.js"; import type { ExecElevatedDefaults, ExecToolDefaults } from "../bash-tools.exec-types.js"; +import type { AgentRunSessionTarget } from "../run-session-target.js"; import type { AgentRuntimePlan } from "../runtime-plan/types.js"; export type CompactEmbeddedAgentSessionParams = { @@ -35,6 +36,9 @@ export type CompactEmbeddedAgentSessionParams = { groupSpace?: string | null; /** Parent session key for subagent policy inheritance. */ spawnedBy?: string | null; + /** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */ + sessionTarget?: AgentRunSessionTarget; + /** Active file-backed artifact for current compaction internals. */ sessionFile: string; /** Optional caller-observed live prompt tokens used for compaction diagnostics. */ currentTokenCount?: number; @@ -97,6 +101,14 @@ export type CompactEmbeddedAgentSessionParams = { allowGatewaySubagentBinding?: boolean; }; +export type CompactEmbeddedAgentSessionRuntimeParams = Omit< + CompactEmbeddedAgentSessionParams, + "sessionFile" +> & { + /** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */ + sessionFile?: string; +}; + export type CompactionMessageMetrics = { messages: number; historyTextChars: number; diff --git a/src/agents/embedded-agent-runner/run.overflow-compaction.test.ts b/src/agents/embedded-agent-runner/run.overflow-compaction.test.ts index cdb95b67e679..6d0dc44013e8 100644 --- a/src/agents/embedded-agent-runner/run.overflow-compaction.test.ts +++ b/src/agents/embedded-agent-runner/run.overflow-compaction.test.ts @@ -2,6 +2,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { + loadSqliteSessionEntry, + patchSqliteSessionEntry, +} from "../../config/sessions/session-accessor.sqlite.js"; +import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; import type { AgentHarness } from "../harness/types.js"; import type { AgentInternalEvent } from "../internal-events.js"; import type { AgentRuntimePlan } from "../runtime-plan/types.js"; @@ -1980,6 +1985,130 @@ describe("runEmbeddedAgent overflow compaction trigger routing", () => { } }); + it("recovers empty-transcript preflight compaction through a forced SQLite target", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-empty-preflight-sqlite-")); + const storePath = path.join(dir, "state", "agents", "helper", "sessions", "sessions.json"); + const sqlitePath = path.join( + dir, + "state", + "agents", + "helper", + "agent", + "openclaw-agent.sqlite", + ); + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile( + storePath, + JSON.stringify({ + "test-key": { + sessionId: "test-session", + updatedAt: 1, + totalTokens: 1_500_000, + totalTokensFresh: true, + contextBudgetStatus: { stale: true }, + }, + }), + "utf8", + ); + await patchSqliteSessionEntry( + { agentId: "helper", sessionKey: "test-key", storePath: sqlitePath }, + () => ({ + contextBudgetStatus: { + schemaVersion: 1, + source: "pre-prompt-estimate", + updatedAt: 1, + provider: "claude-cli", + model: "claude-opus-4-7", + route: "compact_only", + shouldCompact: true, + estimatedPromptTokens: 1_794_391, + contextTokenBudget: 1_048_576, + promptBudgetBeforeReserve: 1_044_480, + reserveTokens: 4_096, + effectiveReserveTokens: 4_096, + remainingPromptBudgetTokens: 0, + overflowTokens: 749_911, + toolResultReducibleChars: 0, + messageCount: 0, + unwindowedMessageCount: 0, + }, + sessionId: "test-session", + totalTokens: 1_500_000, + totalTokensFresh: true, + updatedAt: 1, + }), + { + fallbackEntry: { + sessionId: "test-session", + updatedAt: 1, + }, + }, + ); + + mockedRunEmbeddedAttempt + .mockResolvedValueOnce( + makeAttemptResult({ + promptError: makeOverflowError(), + promptErrorSource: "precheck", + preflightRecovery: { route: "compact_only" }, + contextBudgetStatus: { + schemaVersion: 1, + source: "pre-prompt-estimate", + updatedAt: 1, + provider: "claude-cli", + model: "claude-opus-4-7", + route: "compact_only", + shouldCompact: true, + estimatedPromptTokens: 1_794_391, + contextTokenBudget: 1_048_576, + promptBudgetBeforeReserve: 1_044_480, + reserveTokens: 4_096, + effectiveReserveTokens: 4_096, + remainingPromptBudgetTokens: 0, + overflowTokens: 749_911, + toolResultReducibleChars: 0, + messageCount: 0, + unwindowedMessageCount: 0, + }, + assistantTexts: [], + }), + ) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: false, + reason: "no real conversation messages", + }); + + try { + await runEmbeddedAgent({ + ...overflowBaseRunParams, + agentId: "helper", + config: { + session: { + store: storePath, + }, + } as RunEmbeddedAgentParams["config"], + sessionFile: undefined, + sessionTarget: { storageKind: "sqlite" }, + }); + + const sqliteEntry = loadSqliteSessionEntry({ + agentId: "helper", + sessionKey: "test-key", + storePath: sqlitePath, + }); + expect(sqliteEntry?.totalTokens).toBe(0); + expect(sqliteEntry?.totalTokensFresh).toBe(true); + expect(sqliteEntry?.contextBudgetStatus).toBeUndefined(); + const jsonEntry = JSON.parse(await fs.readFile(storePath, "utf8"))["test-key"]; + expect(jsonEntry.totalTokens).toBe(1_500_000); + } finally { + closeOpenClawAgentDatabasesForTest(); + await fs.rm(dir, { recursive: true, force: true }); + } + }); + it("passes observed overflow token counts into compaction when providers report them", async () => { const overflowError = new Error( '400 {"type":"error","error":{"type":"invalid_request_error","message":"prompt is too long: 277403 tokens > 200000 maximum"}}', diff --git a/src/agents/embedded-agent-runner/run.ts b/src/agents/embedded-agent-runner/run.ts index 7572a61f940b..5a910fc57082 100644 --- a/src/agents/embedded-agent-runner/run.ts +++ b/src/agents/embedded-agent-runner/run.ts @@ -6,6 +6,7 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; import { resolveStorePath, updateSessionStoreEntry } from "../../config/sessions.js"; +import { updateSqliteSessionEntry } from "../../config/sessions/session-accessor.sqlite.js"; import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine, @@ -94,6 +95,12 @@ import { } from "../openai-routing.js"; import { resolveProviderIdForAuth } from "../provider-auth-aliases.js"; import { runAgentCleanupStep } from "../run-cleanup-timeout.js"; +import { + applyAgentRunSessionTargetIdentity, + persistAgentRunSessionTargetIdentity, + resolveAgentRunSessionTarget, + type ResolvedAgentRunSessionTarget, +} from "../run-session-target.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; import { buildAgentRuntimePlan } from "../runtime-plan/build.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; @@ -201,6 +208,7 @@ import type { import { createUsageAccumulator, mergeUsageIntoAccumulator } from "./usage-accumulator.js"; type ApiKeyInfo = ResolvedProviderAuth; +type RunEmbeddedAgentParamsWithSessionFile = RunEmbeddedAgentParams & { sessionFile: string }; const MAX_SAME_MODEL_IDLE_TIMEOUT_RETRIES = 1; const EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS = 30_000; @@ -227,27 +235,51 @@ async function resetNoRealConversationTokenSnapshot(params: { config?: RunEmbeddedAgentParams["config"]; sessionKey?: string; agentId?: string; + runSessionTarget?: ResolvedAgentRunSessionTarget; }): Promise { if (!params.sessionKey) { return; } - const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId }); + const resetPatch = () => ({ + totalTokens: 0, + totalTokensFresh: true, + inputTokens: undefined, + outputTokens: undefined, + cacheRead: undefined, + cacheWrite: undefined, + contextBudgetStatus: undefined, + updatedAt: Date.now(), + }); try { + if (params.runSessionTarget?.storageKind === "sqlite") { + await updateSqliteSessionEntry( + { + agentId: params.runSessionTarget.agentId, + storePath: params.runSessionTarget.sqlitePath, + sessionKey: params.sessionKey, + }, + async () => resetPatch(), + ); + return; + } + const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId }); + if (storePath.trim().toLowerCase().endsWith(".sqlite")) { + await updateSqliteSessionEntry( + { + ...(params.agentId ? { agentId: params.agentId } : {}), + storePath, + sessionKey: params.sessionKey, + }, + async () => resetPatch(), + ); + return; + } await updateSessionStoreEntry({ storePath, sessionKey: params.sessionKey, skipMaintenance: true, takeCacheOwnership: true, - update: async () => ({ - totalTokens: 0, - totalTokensFresh: true, - inputTokens: undefined, - outputTokens: undefined, - cacheRead: undefined, - cacheWrite: undefined, - contextBudgetStatus: undefined, - updatedAt: Date.now(), - }), + update: async () => resetPatch(), }); } catch (err) { log.warn( @@ -458,18 +490,36 @@ function buildHandledReplyPayloads(reply?: ReplyPayload) { export async function runEmbeddedAgent( paramsInput: RunEmbeddedAgentParams, ): Promise { - let params = paramsInput; + let paramsBase = applyAgentRunSessionTargetIdentity(paramsInput); // Resolve sessionKey early so all downstream consumers (hooks, LCM, compaction) // receive a non-null key even when callers omit it. See #60552. const effectiveSessionKey = backfillSessionKey({ - config: params.config, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - agentId: params.agentId, + config: paramsBase.config, + sessionId: paramsBase.sessionId, + sessionKey: paramsBase.sessionKey, + agentId: paramsBase.agentId, }); - if (effectiveSessionKey !== params.sessionKey) { - params = { ...params, sessionKey: effectiveSessionKey }; + if (effectiveSessionKey !== paramsBase.sessionKey) { + paramsBase = { ...paramsBase, sessionKey: effectiveSessionKey }; } + const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase); + const params: RunEmbeddedAgentParamsWithSessionFile = { + ...paramsBase, + agentId: paramsBase.agentId ?? runSessionTarget.agentId, + sessionId: runSessionTarget.sessionId, + sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey, + sessionFile: runSessionTarget.sessionFile, + }; + const persistActiveRunSessionTarget = async (identity: { + sessionFile: string; + sessionId: string; + }) => { + await persistAgentRunSessionTargetIdentity({ + target: runSessionTarget, + sessionFile: identity.sessionFile, + sessionId: identity.sessionId, + }); + }; const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); const globalLane = resolveGlobalLane(params.lane); const sessionQueuePriority = resolveEmbeddedRunSessionQueuePriority(params.trigger); @@ -1347,8 +1397,10 @@ export async function runEmbeddedAgent( ) => { const nextSessionId = compactResult.result?.sessionId; const nextSessionFile = compactResult.result?.sessionFile; + let changed = false; if (nextSessionId && nextSessionId !== activeSessionId) { activeSessionId = nextSessionId; + changed = true; // Keep the run context's sessionId tracking the live session so // lifecycle persistence isn't treated as stale after a legitimate // mid-run compaction rotation (#88538). @@ -1356,7 +1408,9 @@ export async function runEmbeddedAgent( } if (nextSessionFile && nextSessionFile !== activeSessionFile) { activeSessionFile = nextSessionFile; + changed = true; } + return changed; }; const onCompactionHookMessages = async (payload: { phase: "before" | "after"; @@ -1709,13 +1763,22 @@ export async function runEmbeddedAgent( currentAttemptAssistant, } = attempt; const timedOutDuringToolExecution = attempt.timedOutDuringToolExecution ?? false; + let activeTargetChanged = false; if (sessionIdUsed && sessionIdUsed !== activeSessionId) { activeSessionId = sessionIdUsed; + activeTargetChanged = true; // Track the live session for lifecycle persistence identity (#88538). registerAgentRunContext(params.runId, { sessionId: activeSessionId }); } if (sessionFileUsed && sessionFileUsed !== activeSessionFile) { activeSessionFile = sessionFileUsed; + activeTargetChanged = true; + } + if (activeTargetChanged) { + await persistActiveRunSessionTarget({ + sessionFile: activeSessionFile, + sessionId: activeSessionId, + }); } bootstrapPromptWarningSignaturesSeen = attempt.bootstrapPromptWarningSignaturesSeen ?? @@ -1978,7 +2041,12 @@ export async function runEmbeddedAgent( }; } if (timeoutCompactResult.compacted) { - adoptCompactionTranscript(timeoutCompactResult); + if (adoptCompactionTranscript(timeoutCompactResult)) { + await persistActiveRunSessionTarget({ + sessionFile: activeSessionFile, + sessionId: activeSessionId, + }); + } } await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult); if (timeoutCompactResult.compacted) { @@ -2165,7 +2233,12 @@ export async function runEmbeddedAgent( params.abortSignal, ); if (compactResult.ok && compactResult.compacted) { - adoptCompactionTranscript(compactResult); + if (adoptCompactionTranscript(compactResult)) { + await persistActiveRunSessionTarget({ + sessionFile: activeSessionFile, + sessionId: activeSessionId, + }); + } await runContextEngineMaintenance({ contextEngine, sessionId: activeSessionId, @@ -2195,6 +2268,7 @@ export async function runEmbeddedAgent( config: params.config, sessionKey: params.sessionKey, agentId: sessionAgentId, + runSessionTarget, }); log.info( `[context-overflow-precheck] stale token state had no real conversation messages for ` + @@ -2206,7 +2280,12 @@ export async function runEmbeddedAgent( continue; } if (compactResult.compacted) { - adoptCompactionTranscript(compactResult); + if (adoptCompactionTranscript(compactResult)) { + await persistActiveRunSessionTarget({ + sessionFile: activeSessionFile, + sessionId: activeSessionId, + }); + } if ( typeof compactResult.result?.tokensAfter === "number" && Number.isFinite(compactResult.result.tokensAfter) && diff --git a/src/agents/embedded-agent-runner/run/params.ts b/src/agents/embedded-agent-runner/run/params.ts index 54ba60b1faa6..c7ab69632e21 100644 --- a/src/agents/embedded-agent-runner/run/params.ts +++ b/src/agents/embedded-agent-runner/run/params.ts @@ -22,6 +22,7 @@ import type { ToolResultFormat, } from "../../embedded-agent-subscribe.shared-types.js"; import type { AgentInternalEvent } from "../../internal-events.js"; +import type { AgentRunSessionTarget } from "../../run-session-target.js"; import type { AgentMessage } from "../../runtime/index.js"; import type { SilentReplyPromptMode } from "../../system-prompt.types.js"; import type { PromptMode } from "../../system-prompt.types.js"; @@ -40,6 +41,8 @@ export type CurrentInboundPromptContext = { export type RunEmbeddedAgentParams = { sessionId: string; sessionKey?: string; + /** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */ + sessionTarget?: AgentRunSessionTarget; /** Provider prompt-cache affinity key; distinct from transcript/session identity. */ promptCacheKey?: string; /** Session-like key for sandbox and tool-policy resolution. Defaults to sessionKey. */ @@ -102,7 +105,8 @@ export type RunEmbeddedAgentParams = { forceHeartbeatTool?: boolean; /** Allow runtime plugins for this run to late-bind the gateway subagent. */ allowGatewaySubagentBinding?: boolean; - sessionFile: string; + /** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */ + sessionFile?: string; workspaceDir: string; /** Task working directory for tool/runtime execution. Defaults to workspaceDir. */ cwd?: string; diff --git a/src/agents/embedded-agent-runner/run/types.ts b/src/agents/embedded-agent-runner/run/types.ts index a928853931af..ffd0ae6cb498 100644 --- a/src/agents/embedded-agent-runner/run/types.ts +++ b/src/agents/embedded-agent-runner/run/types.ts @@ -29,8 +29,15 @@ import type { PreemptiveCompactionRoute } from "./preemptive-compaction.types.js type EmbeddedRunAttemptBase = Omit< RunEmbeddedAgentParams, - "provider" | "model" | "authProfileId" | "authProfileIdSource" | "thinkLevel" | "lane" | "enqueue" ->; + | "provider" + | "model" + | "authProfileId" + | "authProfileIdSource" + | "thinkLevel" + | "lane" + | "enqueue" + | "sessionFile" +> & { sessionFile: string }; export type EmbeddedRunContextWindowInfo = { tokens: number; diff --git a/src/agents/run-session-target.test.ts b/src/agents/run-session-target.test.ts new file mode 100644 index 000000000000..40ec518912eb --- /dev/null +++ b/src/agents/run-session-target.test.ts @@ -0,0 +1,235 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + loadSqliteSessionEntry, + patchSqliteSessionEntry, +} from "../config/sessions/session-accessor.sqlite.js"; +import { loadSessionStore } from "../config/sessions/store.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; +import { + persistAgentRunSessionTargetIdentity, + resolveAgentRunSessionTarget, +} from "./run-session-target.js"; + +describe("agent run session target", () => { + let tempDir: string; + + beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-run-session-target-")); + }); + + afterEach(() => { + closeOpenClawAgentDatabasesForTest(); + closeOpenClawStateDatabaseForTest(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it("resolves runtime identity through the run config store", async () => { + const storePath = path.join(tempDir, "custom-sessions", "sessions.json"); + const sessionKey = "agent:helper:commitments:test-run"; + + const target = await resolveAgentRunSessionTarget({ + agentId: "helper", + config: { session: { store: storePath } } as OpenClawConfig, + sessionId: "test-run", + sessionKey, + }); + + expect(target).toMatchObject({ + agentId: "helper", + sessionId: "test-run", + sessionKey, + targetKind: "runtime-session", + }); + expect(path.dirname(target.sessionFile)).toBe(path.dirname(storePath)); + expect(loadSessionStore(storePath, { skipCache: true })[sessionKey]?.sessionFile).toBe( + target.sessionFile, + ); + }); + + it("uses the agent from an agent-scoped session key when agentId is omitted", async () => { + const storeRoot = path.join(tempDir, "agents", "{agentId}", "sessions.json"); + const sessionKey = "agent:helper:main"; + + const target = await resolveAgentRunSessionTarget({ + config: { session: { store: storeRoot } } as OpenClawConfig, + sessionId: "helper-session", + sessionKey, + }); + + const helperStorePath = path.join(tempDir, "agents", "helper", "sessions.json"); + expect(target.agentId).toBe("helper"); + expect(path.dirname(target.sessionFile)).toBe(path.dirname(helperStorePath)); + expect(loadSessionStore(helperStorePath, { skipCache: true })[sessionKey]?.sessionFile).toBe( + target.sessionFile, + ); + }); + + it("resolves SQLite identity through the persisted active file artifact", async () => { + const sqlitePath = path.join(tempDir, "helper", "openclaw-agent.sqlite"); + const sessionKey = "agent:helper:commitments:sqlite-run"; + + const target = await resolveAgentRunSessionTarget({ + agentId: "helper", + config: { session: { store: sqlitePath } } as OpenClawConfig, + sessionId: "sqlite-run", + sessionKey, + }); + + expect(target).toMatchObject({ + activeArtifactKind: "embedded-run-session-file", + agentId: "helper", + sessionId: "sqlite-run", + sessionKey, + sqlitePath, + storageKind: "sqlite", + targetKind: "sqlite-runtime-session", + }); + expect(path.dirname(target.sessionFile)).toBe( + path.join(path.dirname(sqlitePath), "embedded-run-session-files"), + ); + expect( + loadSqliteSessionEntry({ + agentId: "helper", + sessionKey, + storePath: sqlitePath, + }), + ).toMatchObject({ + sessionFile: target.sessionFile, + sessionId: "sqlite-run", + }); + + const rotatedSessionFile = path.join( + path.dirname(target.sessionFile), + "2026-06-04T12-00-00-000Z_sqlite-run-compact.jsonl", + ); + await persistAgentRunSessionTargetIdentity({ + sessionFile: rotatedSessionFile, + sessionId: "sqlite-run-compact", + target, + }); + + const rotatedTarget = await resolveAgentRunSessionTarget({ + agentId: "helper", + config: { session: { store: sqlitePath } } as OpenClawConfig, + sessionId: "sqlite-run-compact", + sessionKey, + }); + + expect(rotatedTarget).toMatchObject({ + sessionFile: rotatedSessionFile, + sessionId: "sqlite-run-compact", + storageKind: "sqlite", + }); + + const nextTarget = await resolveAgentRunSessionTarget({ + agentId: "helper", + config: { session: { store: sqlitePath } } as OpenClawConfig, + sessionId: "sqlite-run-next", + sessionKey, + }); + + expect(nextTarget).toMatchObject({ + sessionId: "sqlite-run-next", + storageKind: "sqlite", + }); + expect(nextTarget.sessionFile).toContain("sqlite-run-next.jsonl"); + expect( + loadSqliteSessionEntry({ + agentId: "helper", + sessionKey, + storePath: sqlitePath, + })?.sessionId, + ).toBe("sqlite-run-next"); + expect( + loadSqliteSessionEntry({ + agentId: "helper", + sessionKey, + storePath: sqlitePath, + })?.sessionFile, + ).toBe(nextTarget.sessionFile); + }); + + it("ignores stale SQLite session files outside the active artifact boundary", async () => { + const sqlitePath = path.join(tempDir, "helper", "openclaw-agent.sqlite"); + const sessionKey = "agent:helper:commitments:sqlite-stale-file"; + const legacySessionFile = path.join(tempDir, "legacy", "session.jsonl"); + await patchSqliteSessionEntry( + { + agentId: "helper", + sessionKey, + storePath: sqlitePath, + }, + () => ({ + sessionFile: legacySessionFile, + sessionId: "sqlite-stale-file", + updatedAt: Date.now(), + }), + { + fallbackEntry: { + sessionFile: legacySessionFile, + sessionId: "sqlite-stale-file", + updatedAt: Date.now(), + }, + }, + ); + + const target = await resolveAgentRunSessionTarget({ + agentId: "helper", + config: { session: { store: sqlitePath } } as OpenClawConfig, + sessionId: "sqlite-stale-file", + sessionKey, + }); + + expect(target.sessionFile).not.toBe(legacySessionFile); + expect(path.dirname(target.sessionFile)).toBe( + path.join(path.dirname(sqlitePath), "embedded-run-session-files"), + ); + }); + + it("can force SQLite resolution for canonical agent session stores", async () => { + const storeRoot = path.join( + tempDir, + "state", + "agents", + "{agentId}", + "sessions", + "sessions.json", + ); + const sqlitePath = path.join( + tempDir, + "state", + "agents", + "helper", + "agent", + "openclaw-agent.sqlite", + ); + const sessionKey = "agent:helper:main"; + + const target = await resolveAgentRunSessionTarget({ + config: { session: { store: storeRoot } } as OpenClawConfig, + sessionId: "helper-session", + sessionKey, + sessionTarget: { storageKind: "sqlite" }, + }); + + expect(target).toMatchObject({ + agentId: "helper", + sessionId: "helper-session", + sessionKey, + sqlitePath, + storageKind: "sqlite", + }); + expect( + loadSqliteSessionEntry({ + agentId: "helper", + sessionKey, + storePath: sqlitePath, + })?.sessionId, + ).toBe("helper-session"); + }); +}); diff --git a/src/agents/run-session-target.ts b/src/agents/run-session-target.ts new file mode 100644 index 000000000000..06bc7220f03c --- /dev/null +++ b/src/agents/run-session-target.ts @@ -0,0 +1,139 @@ +import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce"; +import { resolveStorePath } from "../config/sessions/paths.js"; +import { + resolveSessionTranscriptRuntimeTarget, + type SessionTranscriptRuntimeTarget, +} from "../config/sessions/session-accessor.js"; +import { + patchSqliteSessionEntry, + resolveSqliteSessionTranscriptRuntimeTarget, + type SqliteSessionTranscriptRuntimeTarget, +} from "../config/sessions/session-accessor.sqlite.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; + +/** Identifies a run transcript target without naming the current storage artifact. */ +export type AgentRunSessionTarget = { + agentId?: string; + sessionId?: string; + sessionKey?: string; + storageKind?: "file" | "sqlite"; + storePath?: string; + threadId?: string | number; +}; + +/** Target resolved from storage-neutral run identity for current run internals. */ +export type ResolvedAgentRunSessionTarget = + | SessionTranscriptRuntimeTarget + | SqliteSessionTranscriptRuntimeTarget; + +/** Resolves the active file-backed target used by current run/session internals. */ +export async function resolveAgentRunSessionTarget(params: { + agentId?: string; + config?: OpenClawConfig; + sessionFile?: string; + sessionId: string; + sessionKey?: string; + sessionTarget?: AgentRunSessionTarget; +}): Promise { + const sessionTarget = params.sessionTarget; + const agentId = normalizeOptionalString(sessionTarget?.agentId) ?? params.agentId; + const sessionId = normalizeOptionalString(sessionTarget?.sessionId) ?? params.sessionId; + const sessionKey = normalizeOptionalString(sessionTarget?.sessionKey) ?? params.sessionKey; + const effectiveAgentId = agentId ?? resolveAgentIdFromSessionKey(sessionKey); + const sessionFile = normalizeOptionalString(params.sessionFile); + if (sessionFile) { + return { + agentId: effectiveAgentId ?? "", + sessionFile, + sessionId, + sessionKey: sessionKey ?? "", + storageKind: "file", + targetKind: "active-session-file", + }; + } + if (!sessionKey) { + throw new Error(`Cannot resolve run session target without a session key: ${sessionId}`); + } + const storePath = + normalizeOptionalString(sessionTarget?.storePath) ?? + resolveStorePath(params.config?.session?.store, { agentId: effectiveAgentId }); + const scope = { + ...(effectiveAgentId ? { agentId: effectiveAgentId } : {}), + sessionId, + sessionKey, + storePath, + ...(sessionTarget?.threadId !== undefined ? { threadId: sessionTarget.threadId } : {}), + }; + if (shouldUseSqliteSessionTarget({ sessionTarget, storePath })) { + return await resolveSqliteSessionTranscriptRuntimeTarget(scope); + } + return await resolveSessionTranscriptRuntimeTarget({ + ...scope, + }); +} + +/** Persists the current active artifact for storage-neutral run/session targets. */ +export async function persistAgentRunSessionTargetIdentity(params: { + sessionFile: string; + sessionId: string; + target: ResolvedAgentRunSessionTarget; +}): Promise { + if (params.target.storageKind !== "sqlite") { + return; + } + const now = Date.now(); + await patchSqliteSessionEntry( + { + agentId: params.target.agentId, + sessionKey: params.target.sessionKey, + storePath: params.target.sqlitePath, + }, + () => ({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + updatedAt: now, + }), + { + fallbackEntry: { + sessionFile: params.sessionFile, + sessionId: params.sessionId, + updatedAt: now, + }, + }, + ); +} + +function shouldUseSqliteSessionTarget(params: { + sessionTarget?: AgentRunSessionTarget; + storePath: string; +}): boolean { + if (params.sessionTarget?.storageKind === "sqlite") { + return true; + } + if (params.sessionTarget?.storageKind === "file") { + return false; + } + return params.storePath.trim().toLowerCase().endsWith(".sqlite"); +} + +/** Applies identity fields from the explicit target before legacy backfills run. */ +export function applyAgentRunSessionTargetIdentity< + T extends { + agentId?: string; + sessionId: string; + sessionKey?: string; + sessionTarget?: AgentRunSessionTarget; + }, +>(params: T): T { + const target = params.sessionTarget; + if (!target) { + return params; + } + return { + ...params, + agentId: normalizeOptionalString(target.agentId) ?? params.agentId, + sessionId: normalizeOptionalString(target.sessionId) ?? params.sessionId, + sessionKey: normalizeOptionalString(target.sessionKey) ?? params.sessionKey, + }; +} diff --git a/src/auto-reply/reply/session-fork.ts b/src/auto-reply/reply/session-fork.ts index 47bd07ec3acd..e8c1ade3a9c6 100644 --- a/src/auto-reply/reply/session-fork.ts +++ b/src/auto-reply/reply/session-fork.ts @@ -1,4 +1,7 @@ +import path from "node:path"; +import { resolveStorePath } from "../../config/sessions/paths.js"; import type { SessionEntry } from "../../config/sessions/types.js"; +import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; /** @@ -23,6 +26,20 @@ export type ParentForkDecision = message: string; }; +type ParentForkDecisionParams = { + parentEntry: SessionEntry; + agentId?: string; + config?: OpenClawConfig; + storePath?: string; +}; + +type ForkSessionFromParentParams = { + parentEntry: SessionEntry; + agentId: string; + config?: OpenClawConfig; + sessionsDir?: string; +}; + function loadSessionForkRuntime(): Promise { return sessionForkRuntimeLoader.load(); } @@ -37,14 +54,31 @@ function formatParentForkTooLargeMessage(params: { ); } -export async function resolveParentForkDecision(params: { - parentEntry: SessionEntry; - storePath: string; -}): Promise { +function resolveParentForkStorePath(params: { + agentId?: string; + config?: OpenClawConfig; + storePath?: string; +}): string { + return ( + params.storePath ?? resolveStorePath(params.config?.session?.store, { agentId: params.agentId }) + ); +} + +function resolveParentForkSessionsDir(params: { + agentId: string; + config?: OpenClawConfig; + sessionsDir?: string; +}): string { + return params.sessionsDir ?? path.dirname(resolveParentForkStorePath(params)); +} + +export async function resolveParentForkDecision( + params: ParentForkDecisionParams, +): Promise { const maxTokens = DEFAULT_PARENT_FORK_MAX_TOKENS; const parentTokens = await resolveParentForkTokenCount({ parentEntry: params.parentEntry, - storePath: params.storePath, + storePath: resolveParentForkStorePath(params), }); if (typeof parentTokens === "number" && parentTokens > maxTokens) { return { @@ -62,13 +96,14 @@ export async function resolveParentForkDecision(params: { }; } -export async function forkSessionFromParent(params: { - parentEntry: SessionEntry; - agentId: string; - sessionsDir: string; -}): Promise<{ sessionId: string; sessionFile: string } | null> { +export async function forkSessionFromParent( + params: ForkSessionFromParentParams, +): Promise<{ sessionId: string; sessionFile: string } | null> { const runtime = await loadSessionForkRuntime(); - return runtime.forkSessionFromParentRuntime(params); + return runtime.forkSessionFromParentRuntime({ + ...params, + sessionsDir: resolveParentForkSessionsDir(params), + }); } async function resolveParentForkTokenCount(params: { diff --git a/src/commitments/runtime.test.ts b/src/commitments/runtime.test.ts index 2e853779f14d..c20159ee323c 100644 --- a/src/commitments/runtime.test.ts +++ b/src/commitments/runtime.test.ts @@ -25,6 +25,7 @@ vi.mock("./model-selection.runtime.js", () => ({ })); function requireFirstEmbeddedAgentRequest(): { + config?: OpenClawConfig; provider?: string; model?: string; disableTools?: boolean; @@ -224,6 +225,9 @@ describe("commitment extraction runtime", () => { expect(request.provider).toBe("openai"); expect(request.model).toBe("gpt-5.5"); expect(request.disableTools).toBe(true); + expect(request.config?.session?.store).toMatch( + /commitments\/extractor-sessions\/main\/sessions\.json$/, + ); }); it("backs off hidden extraction after terminal model or auth failures", async () => { diff --git a/src/commitments/runtime.ts b/src/commitments/runtime.ts index 0ea8f429fa2a..46e7979859c7 100644 --- a/src/commitments/runtime.ts +++ b/src/commitments/runtime.ts @@ -191,16 +191,6 @@ function openTerminalFailureCooldown( }); } -function resolveExtractionSessionFile(agentId: string, runId: string): string { - return path.join( - resolveStateDir(), - "commitments", - "extractor-sessions", - agentId, - `${runId}.jsonl`, - ); -} - function joinPayloadText(result: EmbeddedAgentPayloadResult): string { return ( result.payloads @@ -211,6 +201,16 @@ function joinPayloadText(result: EmbeddedAgentPayloadResult): string { ); } +function resolveExtractionSessionStore(agentId: string): string { + return path.join( + resolveStateDir(), + "commitments", + "extractor-sessions", + agentId, + "sessions.json", + ); +} + async function resolveDefaultModel(params: { cfg: OpenClawConfig; agentId?: string; @@ -234,15 +234,21 @@ async function defaultExtractBatch(params: { const resolved = resolveCommitmentsConfig(cfg); const runId = `commitments-${randomUUID()}`; const modelRef = await resolveDefaultModel({ cfg, agentId: first.agentId }); + const helperConfig = { + ...cfg, + session: { + ...cfg.session, + store: resolveExtractionSessionStore(first.agentId), + }, + } as OpenClawConfig; const { runEmbeddedAgent } = await import("../agents/embedded-agent.js"); const result = await runEmbeddedAgent({ sessionId: runId, sessionKey: `agent:${first.agentId}:commitments:${runId}`, agentId: first.agentId, trigger: "manual", - sessionFile: resolveExtractionSessionFile(first.agentId, runId), workspaceDir: resolveAgentWorkspaceDir(cfg, first.agentId), - config: cfg, + config: helperConfig, provider: modelRef.provider, model: modelRef.model, prompt: buildCommitmentExtractionPrompt({ cfg, items: params.items }), diff --git a/src/config/sessions/session-accessor.sqlite.ts b/src/config/sessions/session-accessor.sqlite.ts index fa556e6f23f2..c3daffecb820 100644 --- a/src/config/sessions/session-accessor.sqlite.ts +++ b/src/config/sessions/session-accessor.sqlite.ts @@ -1,4 +1,5 @@ import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; import path from "node:path"; import { resolveTimestampMsToIsoString } from "@openclaw/normalization-core/number-coercion"; import type { Selectable } from "kysely"; @@ -21,6 +22,7 @@ import { type OpenClawAgentDatabase, type OpenClawAgentDatabaseOptions, } from "../../state/openclaw-agent-db.js"; +import { validateSessionId } from "./paths.js"; import type { ExactSessionEntry, SessionAccessScope, @@ -32,6 +34,7 @@ import type { SessionEntryUpdateOptions, SessionTranscriptAccessScope, SessionTranscriptReadScope, + SessionTranscriptRuntimeTarget, SessionTranscriptWriteScope, TranscriptEvent, TranscriptMessageAppendOptions, @@ -82,6 +85,17 @@ type ResolvedSqliteStoreTarget = { path?: string; }; +export type SqliteSessionTranscriptRuntimeTarget = Omit< + SessionTranscriptRuntimeTarget, + "storageKind" | "targetKind" +> & { + /** Current embedded-run internals still need one lock/fence/session-manager file. */ + activeArtifactKind: "embedded-run-session-file"; + sqlitePath: string; + storageKind: "sqlite"; + targetKind: "sqlite-runtime-session"; +}; + const SQLITE_SESSION_WRITER_QUEUES = new Map(); /** Loads one session entry from the additive SQLite session store. */ @@ -399,10 +413,87 @@ export async function publishSqliteTranscriptUpdate( }); } +/** Resolves SQLite runtime identity plus the active artifact used by current runners. */ +export async function resolveSqliteSessionTranscriptRuntimeTarget( + scope: SessionTranscriptAccessScope, +): Promise { + const resolved = resolveSqliteTranscriptScope(scope); + const sqlitePath = resolveOpenClawAgentSqlitePath(toDatabaseOptions(resolved)); + const defaultSessionFile = resolveSqliteEmbeddedRunSessionFile(sqlitePath, resolved.sessionId); + const now = Date.now(); + const entry = await patchSqliteSessionEntry( + scope, + (current) => { + const currentSessionFile = resolveCurrentSqliteEmbeddedRunSessionFile( + sqlitePath, + current.sessionFile, + ); + const sessionFile = + current.sessionId?.trim() === resolved.sessionId && currentSessionFile + ? currentSessionFile + : defaultSessionFile; + + // Current embedded-run internals still rotate a file-backed active artifact + // after compaction. Persist that pointer in SQLite so the next run reopens + // the live branch instead of reconstructing the pre-rotation path. + return { sessionFile, sessionId: resolved.sessionId, updatedAt: now }; + }, + { + fallbackEntry: { + sessionFile: defaultSessionFile, + sessionId: resolved.sessionId, + updatedAt: now, + }, + }, + ); + const sessionId = entry?.sessionId?.trim() || resolved.sessionId; + const sessionFile = + resolveCurrentSqliteEmbeddedRunSessionFile(sqlitePath, entry?.sessionFile) ?? + resolveSqliteEmbeddedRunSessionFile(sqlitePath, sessionId); + await fs.mkdir(path.dirname(sessionFile), { recursive: true, mode: 0o700 }); + return { + activeArtifactKind: "embedded-run-session-file", + agentId: resolved.agentId, + sessionFile, + sessionId, + sessionKey: resolved.sessionKey, + sqlitePath, + storageKind: "sqlite", + targetKind: "sqlite-runtime-session", + }; +} + function getSessionKysely(database: import("node:sqlite").DatabaseSync) { return getNodeSqliteKysely(database); } +function resolveSqliteEmbeddedRunSessionFile(sqlitePath: string, sessionId: string): string { + return path.join( + resolveSqliteEmbeddedRunSessionFileDir(sqlitePath), + `${validateSessionId(sessionId)}.jsonl`, + ); +} + +function resolveSqliteEmbeddedRunSessionFileDir(sqlitePath: string): string { + return path.join(path.dirname(sqlitePath), "embedded-run-session-files"); +} + +function resolveCurrentSqliteEmbeddedRunSessionFile( + sqlitePath: string, + sessionFile: string | undefined, +): string | undefined { + const trimmed = sessionFile?.trim(); + if (!trimmed) { + return undefined; + } + const artifactDir = path.resolve(resolveSqliteEmbeddedRunSessionFileDir(sqlitePath)); + const candidate = path.resolve(trimmed); + const relative = path.relative(artifactDir, candidate); + return relative.length > 0 && !relative.startsWith("..") && !path.isAbsolute(relative) + ? trimmed + : undefined; +} + async function runExclusiveSqliteSessionWrite( scope: Pick, fn: () => Promise, diff --git a/src/config/sessions/session-accessor.ts b/src/config/sessions/session-accessor.ts index d4a8649426b9..5edbc72c4e69 100644 --- a/src/config/sessions/session-accessor.ts +++ b/src/config/sessions/session-accessor.ts @@ -87,6 +87,16 @@ export type TranscriptMessageAppendResult = { export type TranscriptUpdatePayload = Omit; +/** Active transcript target resolved from storage-neutral runtime identity. */ +export type SessionTranscriptRuntimeTarget = { + agentId: string; + sessionFile: string; + sessionId: string; + sessionKey: string; + storageKind: "file"; + targetKind: "active-session-file" | "runtime-session"; +}; + export type SessionEntryUpdateOptions = { skipMaintenance?: boolean; takeCacheOwnership?: boolean; @@ -295,6 +305,21 @@ export async function publishTranscriptUpdate( }); } +/** Resolves the current file-backed transcript artifact for a runtime session scope. */ +export async function resolveSessionTranscriptRuntimeTarget( + scope: SessionTranscriptAccessScope, +): Promise { + const transcript = await resolveTranscriptAccess(scope); + return { + agentId: scope.agentId ?? resolveAgentIdFromSessionKey(scope.sessionKey) ?? "", + sessionFile: transcript.sessionFile, + sessionId: scope.sessionId, + sessionKey: scope.sessionKey, + storageKind: "file", + targetKind: "runtime-session", + }; +} + function createFallbackSessionEntry(patch: Partial): SessionEntry { const now = Date.now(); return { diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 44883b0a839e..18cd7072c692 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -15,6 +15,7 @@ import { import type { DeliveryContext } from "../../utils/delivery-context.types.js"; import { getFileStatSnapshot } from "../cache-utils.js"; import { getRuntimeConfig } from "../io.js"; +import type { OpenClawConfig } from "../types.openclaw.js"; import { formatSessionArchiveTimestamp } from "./artifacts.js"; import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js"; import { deriveSessionMetaPatch } from "./metadata.js"; @@ -186,6 +187,7 @@ type SingleEntryPersistencePatch = { type SessionEntryWorkflowOptions = { agentId?: string; + config?: OpenClawConfig; env?: NodeJS.ProcessEnv; hydrateSkillPromptRefs?: boolean; storePath?: string; @@ -220,7 +222,8 @@ function resolveSessionWorkflowStorePath( return options.storePath; } const agentId = options.agentId ?? resolveAgentIdFromSessionKey(options.sessionKey); - return resolveStorePath(getRuntimeConfig().session?.store, { + const storeConfig = options.config?.session?.store ?? getRuntimeConfig().session?.store; + return resolveStorePath(storeConfig, { agentId, env: options.env, }); diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts index 95c00bede0e6..de0a69a53bf4 100644 --- a/src/context-engine/context-engine.test.ts +++ b/src/context-engine/context-engine.test.ts @@ -447,6 +447,27 @@ describe("Engine contract tests", () => { }); }); + it("delegateCompactionToRuntime uses runtime session identity when available", async () => { + const compactRuntimeSpy = installCompactRuntimeSpy(); + + await delegateCompactionToRuntime({ + sessionId: "s3", + sessionFile: "/tmp/session.json", + runtimeContext: { + agentId: "main", + sessionKey: "agent:main:main", + sessionFile: "/tmp/runtime-context-session.json", + workspaceDir: "/tmp/workspace", + }, + }); + + expect(compactRuntimeSpy).toHaveBeenCalledTimes(1); + const compactRuntimeParams = requireCompactRuntimeParams(0); + expect(compactRuntimeParams.agentId).toBe("main"); + expect(compactRuntimeParams.sessionKey).toBe("agent:main:main"); + expect(compactRuntimeParams.sessionFile).toBeUndefined(); + }); + it("builds a normalized memory system prompt addition from the active memory prompt path", () => { registerMemoryPromptSection(({ citationsMode }) => [ "## Memory Recall", diff --git a/src/context-engine/delegate.ts b/src/context-engine/delegate.ts index 66514b78329f..3189fb4896bc 100644 --- a/src/context-engine/delegate.ts +++ b/src/context-engine/delegate.ts @@ -41,8 +41,9 @@ export async function delegateCompactionToRuntime( // runtimeContext carries the full CompactEmbeddedAgentSessionParams fields set // by runtime callers. We spread them and override the fields that come from // the public ContextEngine compact() signature directly. - const runtimeContext = (params.runtimeContext ?? {}) as ContextEngineRuntimeContext & + const runtimeContextWithFile = (params.runtimeContext ?? {}) as ContextEngineRuntimeContext & Partial; + const { sessionFile: _runtimeSessionFile, ...runtimeContext } = runtimeContextWithFile; const currentTokenCount = params.currentTokenCount ?? (typeof runtimeContext.currentTokenCount === "number" && @@ -50,11 +51,21 @@ export async function delegateCompactionToRuntime( runtimeContext.currentTokenCount > 0 ? Math.floor(runtimeContext.currentTokenCount) : undefined); + const runtimeSessionKey = + typeof runtimeContext.sessionKey === "string" && runtimeContext.sessionKey.trim() + ? runtimeContext.sessionKey.trim() + : undefined; + const sessionKey = params.sessionKey ?? runtimeSessionKey; + const agentId = + typeof runtimeContext.agentId === "string" && runtimeContext.agentId.trim() + ? runtimeContext.agentId.trim() + : undefined; const result = await compactEmbeddedAgentSessionDirect({ ...runtimeContext, sessionId: params.sessionId, - sessionFile: params.sessionFile, + ...(sessionKey ? { sessionKey } : { sessionFile: params.sessionFile }), + ...(agentId ? { agentId } : {}), tokenBudget: params.tokenBudget, ...(currentTokenCount !== undefined ? { currentTokenCount } : {}), force: params.force, diff --git a/src/crestodian/assistant.test.ts b/src/crestodian/assistant.test.ts index a78fc76f7e5c..c6e85b22f8a0 100644 --- a/src/crestodian/assistant.test.ts +++ b/src/crestodian/assistant.test.ts @@ -149,8 +149,10 @@ describe("Crestodian assistant", () => { expect(firstCliCall.model).toBe("claude-opus-4-8"); expect(firstCliCall.cleanupCliLiveSessionOnRunEnd).toBe(true); const firstCliConfig = requireRecord(firstCliCall.config); + const firstCliSession = requireRecord(firstCliConfig.session); const firstCliAgents = requireRecord(firstCliConfig.agents); const firstCliDefaults = requireRecord(firstCliAgents.defaults); + expect(firstCliSession.store).toBe("/tmp/crestodian-planner/sessions.json"); expect(firstCliDefaults.cliBackends).toBeUndefined(); expect(firstCliCall.extraSystemPrompt).toBeTypeOf("string"); expect(firstCliCall.extraSystemPrompt).toContain("Do not use tools, shell commands"); @@ -225,12 +227,14 @@ describe("Crestodian assistant", () => { expect(firstEmbeddedCall.disableTools).toBe(true); expect(firstEmbeddedCall.toolsAllow).toEqual([]); const embeddedConfig = requireRecord(firstEmbeddedCall.config); + const embeddedSession = requireRecord(embeddedConfig.session); const embeddedAgents = requireRecord(embeddedConfig.agents); const embeddedDefaults = requireRecord(embeddedAgents.defaults); const embeddedModel = requireRecord(embeddedDefaults.model); const embeddedPlugins = requireRecord(embeddedConfig.plugins); const embeddedEntries = requireRecord(embeddedPlugins.entries); const embeddedCodexEntry = requireRecord(embeddedEntries.codex); + expect(embeddedSession.store).toBe("/tmp/crestodian-planner/sessions.json"); expect(embeddedModel.primary).toBe("openai/gpt-5.5"); expect(embeddedCodexEntry.enabled).toBe(true); }); diff --git a/src/crestodian/assistant.ts b/src/crestodian/assistant.ts index 72eaaf389549..5a04329cd769 100644 --- a/src/crestodian/assistant.ts +++ b/src/crestodian/assistant.ts @@ -181,9 +181,16 @@ async function runLocalRuntimePlanner( const tempDir = await (params.deps?.createTempDir ?? createTempPlannerDir)(); try { const runId = `crestodian-planner-${randomUUID()}`; - const sessionFile = path.join(tempDir, "session.jsonl"); const sessionId = `${runId}-session`; const sessionKey = `temp:crestodian-planner:${runId}`; + const backendConfig = backend.buildConfig(tempDir); + const helperConfig = { + ...backendConfig, + session: { + ...backendConfig.session, + store: path.join(tempDir, "sessions.json"), + }, + }; switch (backend.runner) { case "cli": { const runCli = params.deps?.runCliAgent ?? (await loadRunCliAgent()); @@ -192,9 +199,8 @@ async function runLocalRuntimePlanner( sessionKey, agentId: "crestodian", trigger: "manual", - sessionFile, workspaceDir: tempDir, - config: backend.buildConfig(tempDir), + config: helperConfig, prompt: params.prompt, provider: backend.provider, model: backend.model, @@ -215,9 +221,8 @@ async function runLocalRuntimePlanner( sessionKey, agentId: "crestodian", trigger: "manual", - sessionFile, workspaceDir: tempDir, - config: backend.buildConfig(tempDir), + config: helperConfig, prompt: params.prompt, provider: backend.provider, model: backend.model, diff --git a/src/hooks/llm-slug-generator.test.ts b/src/hooks/llm-slug-generator.test.ts index b8cd931a5aa0..2ce3db15d4e1 100644 --- a/src/hooks/llm-slug-generator.test.ts +++ b/src/hooks/llm-slug-generator.test.ts @@ -52,6 +52,10 @@ describe("generateSlugViaLLM", () => { const options = requireFirstRunOptions(); expect(options.timeoutMs).toBe(15_000); expect(options.cleanupBundleMcpOnRunEnd).toBe(true); + expect(options.sessionKey).toBe("temp:slug-generator"); + expect(options.sessionId).toMatch(/^slug-generator-/); + expect(options.sessionFile).toBeUndefined(); + expect((options.config as OpenClawConfig).session?.store).toContain("openclaw-slug-"); }); it("marks the run lane-local so internal-helper failures do not poison shared profile health (#71709)", async () => { diff --git a/src/hooks/llm-slug-generator.ts b/src/hooks/llm-slug-generator.ts index 54845c6fb9c0..324ea005ccab 100644 --- a/src/hooks/llm-slug-generator.ts +++ b/src/hooks/llm-slug-generator.ts @@ -2,6 +2,7 @@ * LLM-based slug generator for session memory filenames */ +import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; @@ -35,16 +36,20 @@ export async function generateSlugViaLLM(params: { sessionContent: string; cfg: OpenClawConfig; }): Promise { - let tempSessionFile: string | null = null; - + let tempDir: string | undefined; try { const agentId = resolveDefaultAgentId(params.cfg); const workspaceDir = resolveAgentWorkspaceDir(params.cfg, agentId); const agentDir = resolveAgentDir(params.cfg, agentId); - - // Create a temporary session file for this one-off LLM call - const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-slug-")); - tempSessionFile = path.join(tempDir, "session.jsonl"); + const sessionId = `slug-generator-${randomUUID()}`; + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-slug-")); + const helperConfig = { + ...params.cfg, + session: { + ...params.cfg.session, + store: path.join(tempDir, "sessions.json"), + }, + } as OpenClawConfig; const prompt = `Based on this conversation, generate a short 1-2 word filename slug (lowercase, hyphen-separated, no file extension). @@ -60,13 +65,12 @@ Reply with ONLY the slug, nothing else. Examples: "vendor-pitch", "api-design", const timeoutMs = resolveSlugGeneratorTimeoutMs(params.cfg); const result = await runEmbeddedAgent({ - sessionId: `slug-generator-${Date.now()}`, + sessionId, sessionKey: "temp:slug-generator", agentId, - sessionFile: tempSessionFile, workspaceDir, agentDir, - config: params.cfg, + config: helperConfig, prompt, provider, model, @@ -99,12 +103,11 @@ Reply with ONLY the slug, nothing else. Examples: "vendor-pitch", "api-design", log.error(`Failed to generate slug: ${message}`); return null; } finally { - // Clean up temporary session file - if (tempSessionFile) { + if (tempDir) { try { - await fs.rm(path.dirname(tempSessionFile), { recursive: true, force: true }); + await fs.rm(tempDir, { recursive: true, force: true }); } catch { - // Ignore cleanup errors + // Ignore cleanup errors for one-off helper storage. } } } diff --git a/src/talk/agent-consult-runtime.test.ts b/src/talk/agent-consult-runtime.test.ts index 1b6bca4a63f3..a58f1ee065ed 100644 --- a/src/talk/agent-consult-runtime.test.ts +++ b/src/talk/agent-consult-runtime.test.ts @@ -294,13 +294,21 @@ describe("realtime voice agent consult runtime", () => { expect(resolveParentForkDecision).toHaveBeenCalledWith({ parentEntry: sessionStore["agent:main:main"], - storePath: "/tmp/sessions.json", + agentId: "main", + config: {}, }); expect(forkSessionFromParent).toHaveBeenCalledWith({ parentEntry: sessionStore["agent:main:main"], agentId: "main", - sessionsDir: "/tmp", + config: {}, }); + expect(runtime.session.patchSessionEntry).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "main", + config: {}, + sessionKey: "agent:main:subagent:google-meet:meet-1", + }), + ); const forkedEntry = sessionStore["agent:main:subagent:google-meet:meet-1"]; if (!forkedEntry) { throw new Error("Expected forked consult session entry"); @@ -315,10 +323,70 @@ describe("realtime voice agent consult runtime", () => { expectPositiveTimestamp(forkedEntry.updatedAt); const call = requireEmbeddedAgentCall(runEmbeddedAgent); expect(call.sessionId).toBe("forked-session"); - expect(call.sessionFile).toBe("/tmp/forked.jsonl"); + expect(call.sessionFile).toBeUndefined(); expect(call.spawnedBy).toBe("agent:main:main"); }); + it("forks cross-agent requester context from the requester agent store", async () => { + const { runtime, runEmbeddedAgent, sessionStore } = createAgentRuntime(); + sessionStore["agent:main:main"] = { + sessionId: "parent-session", + sessionFile: "relative-parent.jsonl", + totalTokens: 100, + updatedAt: 1, + }; + const resolveParentForkDecision = vi.fn(async () => ({ + status: "fork" as const, + maxTokens: 100_000, + parentTokens: 100, + })); + const forkSessionFromParent = vi.fn(async () => ({ + sessionId: "forked-session", + sessionFile: "/tmp/forked.jsonl", + })); + setRealtimeVoiceAgentConsultDepsForTest({ + resolveParentForkDecision, + forkSessionFromParent, + }); + + await consultRealtimeVoiceAgent({ + cfg: {} as never, + agentRuntime: runtime as never, + logger: { warn: vi.fn() }, + agentId: "voice-agent", + sessionKey: "agent:voice-agent:google-meet:meet-1", + spawnedBy: "agent:main:main", + contextMode: "fork", + messageProvider: "google-meet", + lane: "google-meet", + runIdPrefix: "google-meet:meet-1", + args: { question: "What should I say?" }, + transcript: [], + surface: "a private Google Meet", + userLabel: "Participant", + }); + + expect(resolveParentForkDecision).toHaveBeenCalledWith({ + parentEntry: sessionStore["agent:main:main"], + agentId: "main", + config: {}, + }); + expect(forkSessionFromParent).toHaveBeenCalledWith({ + parentEntry: sessionStore["agent:main:main"], + agentId: "main", + config: {}, + }); + expect(runtime.session.patchSessionEntry).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "voice-agent", + config: {}, + sessionKey: "agent:voice-agent:google-meet:meet-1", + }), + ); + const call = requireEmbeddedAgentCall(runEmbeddedAgent); + expect(call.sessionFile).toBe("/tmp/forked.jsonl"); + }); + it("inherits requester message routing for forked consult sessions", async () => { const { runtime, runEmbeddedAgent, sessionStore } = createAgentRuntime(); sessionStore["agent:main:discord:channel:123"] = { diff --git a/src/talk/agent-consult-runtime.ts b/src/talk/agent-consult-runtime.ts index 83d2d810cdb5..d5e4e4090fc7 100644 --- a/src/talk/agent-consult-runtime.ts +++ b/src/talk/agent-consult-runtime.ts @@ -1,5 +1,4 @@ import { randomUUID } from "node:crypto"; -import path from "node:path"; import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js"; import { forkSessionFromParent, @@ -81,7 +80,8 @@ function resolveDeliverySessionFields(context?: DeliveryContext): Partial; }): Promise { @@ -125,14 +127,13 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { const deliveryFields = resolveDeliverySessionFields(params.deliveryContext); const requesterSessionKey = params.spawnedBy?.trim(); const requesterAgentId = parseAgentSessionKey(requesterSessionKey)?.agentId; - const shouldFork = - params.contextMode === "fork" && - requesterSessionKey && - (!requesterAgentId || requesterAgentId === params.agentId); + const parentAgentId = requesterAgentId ?? params.agentId; + const shouldFork = params.contextMode === "fork" && requesterSessionKey; let forkDecisionWarning: string | undefined; const patched = await params.agentRuntime.session.patchSessionEntry({ - storePath: params.storePath, + agentId: params.agentId, + config: params.cfg, sessionKey: params.sessionKey, fallbackEntry: { sessionId: "", @@ -144,24 +145,28 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: { } if (shouldFork) { const parentEntry = params.agentRuntime.session.getSessionEntry({ - storePath: params.storePath, + agentId: parentAgentId, + config: params.cfg, sessionKey: requesterSessionKey, }); if (parentEntry?.sessionId?.trim()) { const decision = await realtimeVoiceAgentConsultDeps.resolveParentForkDecision({ parentEntry, - storePath: params.storePath, + agentId: parentAgentId, + config: params.cfg, }); if (decision.status === "fork") { const fork = await realtimeVoiceAgentConsultDeps.forkSessionFromParent({ parentEntry, - agentId: params.agentId, - sessionsDir: path.dirname(params.storePath), + agentId: parentAgentId, + config: params.cfg, }); if (fork) { return { ...deliveryFields, sessionId: fork.sessionId, + // Current fork storage is file-backed; persist the artifact on + // the entry so the run target resolver reuses the forked branch. sessionFile: fork.sessionFile, spawnedBy: requesterSessionKey, forkedFromParent: true, @@ -221,35 +226,39 @@ export async function consultRealtimeVoiceAgent(params: { const workspaceDir = params.agentRuntime.resolveAgentWorkspaceDir(params.cfg, agentId); await params.agentRuntime.ensureAgentWorkspace({ dir: workspaceDir }); - const storePath = params.agentRuntime.session.resolveStorePath(params.cfg.session?.store, { - agentId, - }); const resolvedDeliveryContext = resolveRealtimeVoiceAgentDeliveryContext({ agentRuntime: params.agentRuntime, - storePath, + agentId, + cfg: params.cfg, sessionKey: params.sessionKey, spawnedBy: params.spawnedBy, }); const sessionEntry = await resolveRealtimeVoiceAgentConsultSessionEntry({ agentId, + cfg: params.cfg, sessionKey: params.sessionKey, spawnedBy: params.spawnedBy, contextMode: params.contextMode, deliveryContext: resolvedDeliveryContext, - storePath, agentRuntime: params.agentRuntime, logger: params.logger, }); const consultDeliveryContext = resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry); const sessionId = sessionEntry.sessionId; + const requesterAgentId = parseAgentSessionKey(params.spawnedBy?.trim())?.agentId; + const crossAgentForkSessionFile = + sessionEntry.forkedFromParent && requesterAgentId && requesterAgentId !== agentId + ? sessionEntry.sessionFile?.trim() + : undefined; - const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, { - agentId, - }); const result = await params.agentRuntime.runEmbeddedAgent({ sessionId, sessionKey: params.sessionKey, + // Cross-agent forks are file-backed active artifacts in the requester store. + // Passing the artifact keeps the consult run on the forked branch until + // consult forking moves to a storage-neutral parent/child target contract. + ...(crossAgentForkSessionFile ? { sessionFile: crossAgentForkSessionFile } : {}), sandboxSessionKey: resolveRealtimeVoiceAgentSandboxSessionKey(agentId, params.sessionKey), agentId, spawnedBy: params.spawnedBy, @@ -262,7 +271,6 @@ export async function consultRealtimeVoiceAgent(params: { consultDeliveryContext?.threadId != null ? String(consultDeliveryContext.threadId) : undefined, - sessionFile, workspaceDir, config: params.cfg, prompt: buildRealtimeVoiceAgentConsultPrompt({