mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
clawdbot-d02.1.9.1.29: add sqlite run session target adapter
This commit is contained in:
@@ -14,7 +14,7 @@ import {
|
||||
import type {
|
||||
CliPreparedBackend,
|
||||
PreparedCliRunContext,
|
||||
RunCliAgentParams,
|
||||
PreparedRunCliAgentParams,
|
||||
} from "./cli-runner/types.js";
|
||||
|
||||
// This e2e spins a real stdio MCP server plus a spawned CLI process. Keep the
|
||||
@@ -151,7 +151,7 @@ async function prepareBundleMcpExecutionContext(params: {
|
||||
workspaceDir: params.workspaceDir,
|
||||
config: params.config,
|
||||
})) as CliPreparedBackend;
|
||||
const runParams: RunCliAgentParams = {
|
||||
const runParams: PreparedRunCliAgentParams = {
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
workspaceDir: params.workspaceDir,
|
||||
|
||||
@@ -30,10 +30,15 @@ import {
|
||||
runAgentHarnessLlmInputHook,
|
||||
runAgentHarnessLlmOutputHook,
|
||||
} from "./harness/lifecycle-hook-helpers.js";
|
||||
import {
|
||||
applyAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
} from "./run-session-target.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
import { SessionManager } from "./sessions/session-manager.js";
|
||||
|
||||
const log = createSubsystemLogger("agents/cli-runner");
|
||||
type RunCliAgentParamsWithSessionFile = RunCliAgentParams & { sessionFile: string };
|
||||
|
||||
const cliRunnerDeps = {
|
||||
claudeCliSessionTranscriptHasContent: claudeCliSessionTranscriptHasContentImpl,
|
||||
@@ -197,7 +202,9 @@ async function runCliAgentEndHook(
|
||||
runAgentEndSideEffects(hookParams);
|
||||
}
|
||||
|
||||
async function persistApprovedCliUserTurnTranscript(params: RunCliAgentParams): Promise<void> {
|
||||
async function persistApprovedCliUserTurnTranscript(
|
||||
params: RunCliAgentParamsWithSessionFile,
|
||||
): Promise<void> {
|
||||
if (params.suppressNextUserMessagePersistence === true || !params.userTurnTranscriptRecorder) {
|
||||
return;
|
||||
}
|
||||
@@ -281,7 +288,16 @@ async function finalizeCliContextEngineTurn(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function runCliAgent(params: RunCliAgentParams): Promise<EmbeddedAgentRunResult> {
|
||||
export async function runCliAgent(paramsInput: RunCliAgentParams): Promise<EmbeddedAgentRunResult> {
|
||||
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
|
||||
const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase);
|
||||
const params: RunCliAgentParamsWithSessionFile = {
|
||||
...paramsBase,
|
||||
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
|
||||
sessionId: runSessionTarget.sessionId,
|
||||
sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey,
|
||||
sessionFile: runSessionTarget.sessionFile,
|
||||
};
|
||||
// Cron gate must fire before prepareCliRunContext — that call allocates
|
||||
// backend resources released only by runPreparedCliAgent's try…finally.
|
||||
params.onExecutionStarted?.();
|
||||
|
||||
@@ -76,7 +76,11 @@ import {
|
||||
loadCliSessionReseedMessages,
|
||||
resolveAutoCliSessionReseedHistoryChars,
|
||||
} from "./session-history.js";
|
||||
import type { CliReusableSession, PreparedCliRunContext, RunCliAgentParams } from "./types.js";
|
||||
import type {
|
||||
CliReusableSession,
|
||||
PreparedCliRunContext,
|
||||
PreparedRunCliAgentParams,
|
||||
} from "./types.js";
|
||||
|
||||
const prepareDeps = {
|
||||
makeBootstrapWarn: makeBootstrapWarnImpl,
|
||||
@@ -132,7 +136,7 @@ export function shouldSkipLocalCliCredentialEpoch(params: {
|
||||
}
|
||||
|
||||
export async function prepareCliRunContext(
|
||||
params: RunCliAgentParams,
|
||||
params: PreparedRunCliAgentParams,
|
||||
): Promise<PreparedCliRunContext> {
|
||||
const started = Date.now();
|
||||
const workspaceResolution = resolveRunWorkspaceDir({
|
||||
@@ -638,7 +642,7 @@ export async function prepareCliRunContext(
|
||||
config: contextEngineConfig,
|
||||
});
|
||||
const contextEngineTurnPrompt = params.transcriptPrompt ?? params.prompt;
|
||||
const preparedParams: RunCliAgentParams = {
|
||||
const preparedParams: PreparedRunCliAgentParams = {
|
||||
...params,
|
||||
config: contextEngineConfig,
|
||||
prompt: preparedPrompt,
|
||||
|
||||
@@ -24,15 +24,19 @@ import type {
|
||||
CurrentInboundPromptContext,
|
||||
EmbeddedRunTrigger,
|
||||
} from "../embedded-agent-runner/run/params.js";
|
||||
import type { AgentRunSessionTarget } from "../run-session-target.js";
|
||||
import type { SilentReplyPromptMode } from "../system-prompt.types.js";
|
||||
|
||||
export type RunCliAgentParams = {
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
sessionEntry?: SessionEntry;
|
||||
agentId?: string;
|
||||
trigger?: EmbeddedRunTrigger;
|
||||
sessionFile: string;
|
||||
/** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */
|
||||
sessionFile?: string;
|
||||
workspaceDir: string;
|
||||
/** Task working directory for CLI execution. Defaults to workspaceDir. */
|
||||
cwd?: string;
|
||||
@@ -130,8 +134,10 @@ export type CliReusableSession = {
|
||||
| "orphaned-tool-use";
|
||||
};
|
||||
|
||||
export type PreparedRunCliAgentParams = RunCliAgentParams & { sessionFile: string };
|
||||
|
||||
export type PreparedCliRunContext = {
|
||||
params: RunCliAgentParams;
|
||||
params: PreparedRunCliAgentParams;
|
||||
effectiveAuthProfileId?: string;
|
||||
started: number;
|
||||
workspaceDir: string;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
|
||||
import type { CompactEmbeddedAgentSessionRuntimeParams } from "./compact.types.js";
|
||||
import type { EmbeddedAgentCompactResult } from "./types.js";
|
||||
|
||||
export type CompactEmbeddedAgentSessionDirect = (
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
params: CompactEmbeddedAgentSessionRuntimeParams,
|
||||
) => Promise<EmbeddedAgentCompactResult>;
|
||||
|
||||
@@ -95,6 +95,10 @@ import { ensureOpenClawModelsJson } from "../models-config.js";
|
||||
import { wrapStreamFnTextTransforms } from "../plugin-text-transforms.js";
|
||||
import { resolveAgentPromptSurfaceForSessionKey } from "../prompt-surface.js";
|
||||
import { registerProviderStreamForModel } from "../provider-stream.js";
|
||||
import {
|
||||
applyAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
} from "../run-session-target.js";
|
||||
import { collectRuntimeChannelCapabilities } from "../runtime-capabilities.js";
|
||||
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
|
||||
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
|
||||
@@ -123,6 +127,7 @@ import {
|
||||
} from "./compact-reasons.js";
|
||||
import type {
|
||||
CompactEmbeddedAgentSessionParams,
|
||||
CompactEmbeddedAgentSessionRuntimeParams,
|
||||
CompactionMessageMetrics,
|
||||
} from "./compact.types.js";
|
||||
import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js";
|
||||
@@ -175,6 +180,10 @@ import { mapThinkingLevel, normalizeContextTokenBudget } from "./utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
|
||||
export type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
|
||||
|
||||
type CompactEmbeddedAgentSessionParamsWithSessionFile = CompactEmbeddedAgentSessionRuntimeParams & {
|
||||
sessionFile: string;
|
||||
};
|
||||
|
||||
function hasRealConversationContent(
|
||||
msg: AgentMessage,
|
||||
messages: AgentMessage[],
|
||||
@@ -417,8 +426,17 @@ function fallbackFailureToCompactionResult(err: unknown): EmbeddedAgentCompactRe
|
||||
* Use this when already inside a session/global lane to avoid deadlocks.
|
||||
*/
|
||||
export async function compactEmbeddedAgentSessionDirect(
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
paramsInput: CompactEmbeddedAgentSessionRuntimeParams,
|
||||
): Promise<EmbeddedAgentCompactResult> {
|
||||
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
|
||||
const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase);
|
||||
const params: CompactEmbeddedAgentSessionParamsWithSessionFile = {
|
||||
...paramsBase,
|
||||
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
|
||||
sessionId: runSessionTarget.sessionId,
|
||||
sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey,
|
||||
sessionFile: runSessionTarget.sessionFile,
|
||||
};
|
||||
if (hasExplicitCompactionModel(params) || !hasCompactionModelFallbackCandidates(params)) {
|
||||
return await compactEmbeddedAgentSessionDirectOnce(params);
|
||||
}
|
||||
@@ -483,7 +501,7 @@ export async function compactEmbeddedAgentSessionDirect(
|
||||
}
|
||||
|
||||
async function compactEmbeddedAgentSessionDirectOnce(
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
params: CompactEmbeddedAgentSessionParamsWithSessionFile,
|
||||
): Promise<EmbeddedAgentCompactResult> {
|
||||
const startedAt = Date.now();
|
||||
const diagId = params.diagId?.trim() || createCompactionDiagId();
|
||||
|
||||
@@ -5,6 +5,7 @@ import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-e
|
||||
import type { CommandQueueEnqueueFn } from "../../process/command-queue.types.js";
|
||||
import type { SkillSnapshot } from "../../skills/types.js";
|
||||
import type { ExecElevatedDefaults, ExecToolDefaults } from "../bash-tools.exec-types.js";
|
||||
import type { AgentRunSessionTarget } from "../run-session-target.js";
|
||||
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
|
||||
|
||||
export type CompactEmbeddedAgentSessionParams = {
|
||||
@@ -35,6 +36,9 @@ export type CompactEmbeddedAgentSessionParams = {
|
||||
groupSpace?: string | null;
|
||||
/** Parent session key for subagent policy inheritance. */
|
||||
spawnedBy?: string | null;
|
||||
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
/** Active file-backed artifact for current compaction internals. */
|
||||
sessionFile: string;
|
||||
/** Optional caller-observed live prompt tokens used for compaction diagnostics. */
|
||||
currentTokenCount?: number;
|
||||
@@ -97,6 +101,14 @@ export type CompactEmbeddedAgentSessionParams = {
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
};
|
||||
|
||||
export type CompactEmbeddedAgentSessionRuntimeParams = Omit<
|
||||
CompactEmbeddedAgentSessionParams,
|
||||
"sessionFile"
|
||||
> & {
|
||||
/** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */
|
||||
sessionFile?: string;
|
||||
};
|
||||
|
||||
export type CompactionMessageMetrics = {
|
||||
messages: number;
|
||||
historyTextChars: number;
|
||||
|
||||
@@ -2,6 +2,11 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
loadSqliteSessionEntry,
|
||||
patchSqliteSessionEntry,
|
||||
} from "../../config/sessions/session-accessor.sqlite.js";
|
||||
import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js";
|
||||
import type { AgentHarness } from "../harness/types.js";
|
||||
import type { AgentInternalEvent } from "../internal-events.js";
|
||||
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
|
||||
@@ -1980,6 +1985,130 @@ describe("runEmbeddedAgent overflow compaction trigger routing", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("recovers empty-transcript preflight compaction through a forced SQLite target", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-empty-preflight-sqlite-"));
|
||||
const storePath = path.join(dir, "state", "agents", "helper", "sessions", "sessions.json");
|
||||
const sqlitePath = path.join(
|
||||
dir,
|
||||
"state",
|
||||
"agents",
|
||||
"helper",
|
||||
"agent",
|
||||
"openclaw-agent.sqlite",
|
||||
);
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"test-key": {
|
||||
sessionId: "test-session",
|
||||
updatedAt: 1,
|
||||
totalTokens: 1_500_000,
|
||||
totalTokensFresh: true,
|
||||
contextBudgetStatus: { stale: true },
|
||||
},
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await patchSqliteSessionEntry(
|
||||
{ agentId: "helper", sessionKey: "test-key", storePath: sqlitePath },
|
||||
() => ({
|
||||
contextBudgetStatus: {
|
||||
schemaVersion: 1,
|
||||
source: "pre-prompt-estimate",
|
||||
updatedAt: 1,
|
||||
provider: "claude-cli",
|
||||
model: "claude-opus-4-7",
|
||||
route: "compact_only",
|
||||
shouldCompact: true,
|
||||
estimatedPromptTokens: 1_794_391,
|
||||
contextTokenBudget: 1_048_576,
|
||||
promptBudgetBeforeReserve: 1_044_480,
|
||||
reserveTokens: 4_096,
|
||||
effectiveReserveTokens: 4_096,
|
||||
remainingPromptBudgetTokens: 0,
|
||||
overflowTokens: 749_911,
|
||||
toolResultReducibleChars: 0,
|
||||
messageCount: 0,
|
||||
unwindowedMessageCount: 0,
|
||||
},
|
||||
sessionId: "test-session",
|
||||
totalTokens: 1_500_000,
|
||||
totalTokensFresh: true,
|
||||
updatedAt: 1,
|
||||
}),
|
||||
{
|
||||
fallbackEntry: {
|
||||
sessionId: "test-session",
|
||||
updatedAt: 1,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
mockedRunEmbeddedAttempt
|
||||
.mockResolvedValueOnce(
|
||||
makeAttemptResult({
|
||||
promptError: makeOverflowError(),
|
||||
promptErrorSource: "precheck",
|
||||
preflightRecovery: { route: "compact_only" },
|
||||
contextBudgetStatus: {
|
||||
schemaVersion: 1,
|
||||
source: "pre-prompt-estimate",
|
||||
updatedAt: 1,
|
||||
provider: "claude-cli",
|
||||
model: "claude-opus-4-7",
|
||||
route: "compact_only",
|
||||
shouldCompact: true,
|
||||
estimatedPromptTokens: 1_794_391,
|
||||
contextTokenBudget: 1_048_576,
|
||||
promptBudgetBeforeReserve: 1_044_480,
|
||||
reserveTokens: 4_096,
|
||||
effectiveReserveTokens: 4_096,
|
||||
remainingPromptBudgetTokens: 0,
|
||||
overflowTokens: 749_911,
|
||||
toolResultReducibleChars: 0,
|
||||
messageCount: 0,
|
||||
unwindowedMessageCount: 0,
|
||||
},
|
||||
assistantTexts: [],
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
|
||||
mockedCompactDirect.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
compacted: false,
|
||||
reason: "no real conversation messages",
|
||||
});
|
||||
|
||||
try {
|
||||
await runEmbeddedAgent({
|
||||
...overflowBaseRunParams,
|
||||
agentId: "helper",
|
||||
config: {
|
||||
session: {
|
||||
store: storePath,
|
||||
},
|
||||
} as RunEmbeddedAgentParams["config"],
|
||||
sessionFile: undefined,
|
||||
sessionTarget: { storageKind: "sqlite" },
|
||||
});
|
||||
|
||||
const sqliteEntry = loadSqliteSessionEntry({
|
||||
agentId: "helper",
|
||||
sessionKey: "test-key",
|
||||
storePath: sqlitePath,
|
||||
});
|
||||
expect(sqliteEntry?.totalTokens).toBe(0);
|
||||
expect(sqliteEntry?.totalTokensFresh).toBe(true);
|
||||
expect(sqliteEntry?.contextBudgetStatus).toBeUndefined();
|
||||
const jsonEntry = JSON.parse(await fs.readFile(storePath, "utf8"))["test-key"];
|
||||
expect(jsonEntry.totalTokens).toBe(1_500_000);
|
||||
} finally {
|
||||
closeOpenClawAgentDatabasesForTest();
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("passes observed overflow token counts into compaction when providers report them", async () => {
|
||||
const overflowError = new Error(
|
||||
'400 {"type":"error","error":{"type":"invalid_request_error","message":"prompt is too long: 277403 tokens > 200000 maximum"}}',
|
||||
|
||||
@@ -6,6 +6,7 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
|
||||
import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
||||
import { resolveStorePath, updateSessionStoreEntry } from "../../config/sessions.js";
|
||||
import { updateSqliteSessionEntry } from "../../config/sessions/session-accessor.sqlite.js";
|
||||
import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
|
||||
import {
|
||||
resolveContextEngine,
|
||||
@@ -94,6 +95,12 @@ import {
|
||||
} from "../openai-routing.js";
|
||||
import { resolveProviderIdForAuth } from "../provider-auth-aliases.js";
|
||||
import { runAgentCleanupStep } from "../run-cleanup-timeout.js";
|
||||
import {
|
||||
applyAgentRunSessionTargetIdentity,
|
||||
persistAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
type ResolvedAgentRunSessionTarget,
|
||||
} from "../run-session-target.js";
|
||||
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
|
||||
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
|
||||
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
|
||||
@@ -201,6 +208,7 @@ import type {
|
||||
import { createUsageAccumulator, mergeUsageIntoAccumulator } from "./usage-accumulator.js";
|
||||
|
||||
type ApiKeyInfo = ResolvedProviderAuth;
|
||||
type RunEmbeddedAgentParamsWithSessionFile = RunEmbeddedAgentParams & { sessionFile: string };
|
||||
|
||||
const MAX_SAME_MODEL_IDLE_TIMEOUT_RETRIES = 1;
|
||||
const EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS = 30_000;
|
||||
@@ -227,27 +235,51 @@ async function resetNoRealConversationTokenSnapshot(params: {
|
||||
config?: RunEmbeddedAgentParams["config"];
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
runSessionTarget?: ResolvedAgentRunSessionTarget;
|
||||
}): Promise<void> {
|
||||
if (!params.sessionKey) {
|
||||
return;
|
||||
}
|
||||
const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId });
|
||||
const resetPatch = () => ({
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: true,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
contextBudgetStatus: undefined,
|
||||
updatedAt: Date.now(),
|
||||
});
|
||||
try {
|
||||
if (params.runSessionTarget?.storageKind === "sqlite") {
|
||||
await updateSqliteSessionEntry(
|
||||
{
|
||||
agentId: params.runSessionTarget.agentId,
|
||||
storePath: params.runSessionTarget.sqlitePath,
|
||||
sessionKey: params.sessionKey,
|
||||
},
|
||||
async () => resetPatch(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const storePath = resolveStorePath(params.config?.session?.store, { agentId: params.agentId });
|
||||
if (storePath.trim().toLowerCase().endsWith(".sqlite")) {
|
||||
await updateSqliteSessionEntry(
|
||||
{
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
},
|
||||
async () => resetPatch(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
skipMaintenance: true,
|
||||
takeCacheOwnership: true,
|
||||
update: async () => ({
|
||||
totalTokens: 0,
|
||||
totalTokensFresh: true,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
contextBudgetStatus: undefined,
|
||||
updatedAt: Date.now(),
|
||||
}),
|
||||
update: async () => resetPatch(),
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(
|
||||
@@ -458,18 +490,36 @@ function buildHandledReplyPayloads(reply?: ReplyPayload) {
|
||||
export async function runEmbeddedAgent(
|
||||
paramsInput: RunEmbeddedAgentParams,
|
||||
): Promise<EmbeddedAgentRunResult> {
|
||||
let params = paramsInput;
|
||||
let paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
|
||||
// Resolve sessionKey early so all downstream consumers (hooks, LCM, compaction)
|
||||
// receive a non-null key even when callers omit it. See #60552.
|
||||
const effectiveSessionKey = backfillSessionKey({
|
||||
config: params.config,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
config: paramsBase.config,
|
||||
sessionId: paramsBase.sessionId,
|
||||
sessionKey: paramsBase.sessionKey,
|
||||
agentId: paramsBase.agentId,
|
||||
});
|
||||
if (effectiveSessionKey !== params.sessionKey) {
|
||||
params = { ...params, sessionKey: effectiveSessionKey };
|
||||
if (effectiveSessionKey !== paramsBase.sessionKey) {
|
||||
paramsBase = { ...paramsBase, sessionKey: effectiveSessionKey };
|
||||
}
|
||||
const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase);
|
||||
const params: RunEmbeddedAgentParamsWithSessionFile = {
|
||||
...paramsBase,
|
||||
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
|
||||
sessionId: runSessionTarget.sessionId,
|
||||
sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey,
|
||||
sessionFile: runSessionTarget.sessionFile,
|
||||
};
|
||||
const persistActiveRunSessionTarget = async (identity: {
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
}) => {
|
||||
await persistAgentRunSessionTargetIdentity({
|
||||
target: runSessionTarget,
|
||||
sessionFile: identity.sessionFile,
|
||||
sessionId: identity.sessionId,
|
||||
});
|
||||
};
|
||||
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
|
||||
const globalLane = resolveGlobalLane(params.lane);
|
||||
const sessionQueuePriority = resolveEmbeddedRunSessionQueuePriority(params.trigger);
|
||||
@@ -1347,8 +1397,10 @@ export async function runEmbeddedAgent(
|
||||
) => {
|
||||
const nextSessionId = compactResult.result?.sessionId;
|
||||
const nextSessionFile = compactResult.result?.sessionFile;
|
||||
let changed = false;
|
||||
if (nextSessionId && nextSessionId !== activeSessionId) {
|
||||
activeSessionId = nextSessionId;
|
||||
changed = true;
|
||||
// Keep the run context's sessionId tracking the live session so
|
||||
// lifecycle persistence isn't treated as stale after a legitimate
|
||||
// mid-run compaction rotation (#88538).
|
||||
@@ -1356,7 +1408,9 @@ export async function runEmbeddedAgent(
|
||||
}
|
||||
if (nextSessionFile && nextSessionFile !== activeSessionFile) {
|
||||
activeSessionFile = nextSessionFile;
|
||||
changed = true;
|
||||
}
|
||||
return changed;
|
||||
};
|
||||
const onCompactionHookMessages = async (payload: {
|
||||
phase: "before" | "after";
|
||||
@@ -1709,13 +1763,22 @@ export async function runEmbeddedAgent(
|
||||
currentAttemptAssistant,
|
||||
} = attempt;
|
||||
const timedOutDuringToolExecution = attempt.timedOutDuringToolExecution ?? false;
|
||||
let activeTargetChanged = false;
|
||||
if (sessionIdUsed && sessionIdUsed !== activeSessionId) {
|
||||
activeSessionId = sessionIdUsed;
|
||||
activeTargetChanged = true;
|
||||
// Track the live session for lifecycle persistence identity (#88538).
|
||||
registerAgentRunContext(params.runId, { sessionId: activeSessionId });
|
||||
}
|
||||
if (sessionFileUsed && sessionFileUsed !== activeSessionFile) {
|
||||
activeSessionFile = sessionFileUsed;
|
||||
activeTargetChanged = true;
|
||||
}
|
||||
if (activeTargetChanged) {
|
||||
await persistActiveRunSessionTarget({
|
||||
sessionFile: activeSessionFile,
|
||||
sessionId: activeSessionId,
|
||||
});
|
||||
}
|
||||
bootstrapPromptWarningSignaturesSeen =
|
||||
attempt.bootstrapPromptWarningSignaturesSeen ??
|
||||
@@ -1978,7 +2041,12 @@ export async function runEmbeddedAgent(
|
||||
};
|
||||
}
|
||||
if (timeoutCompactResult.compacted) {
|
||||
adoptCompactionTranscript(timeoutCompactResult);
|
||||
if (adoptCompactionTranscript(timeoutCompactResult)) {
|
||||
await persistActiveRunSessionTarget({
|
||||
sessionFile: activeSessionFile,
|
||||
sessionId: activeSessionId,
|
||||
});
|
||||
}
|
||||
}
|
||||
await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult);
|
||||
if (timeoutCompactResult.compacted) {
|
||||
@@ -2165,7 +2233,12 @@ export async function runEmbeddedAgent(
|
||||
params.abortSignal,
|
||||
);
|
||||
if (compactResult.ok && compactResult.compacted) {
|
||||
adoptCompactionTranscript(compactResult);
|
||||
if (adoptCompactionTranscript(compactResult)) {
|
||||
await persistActiveRunSessionTarget({
|
||||
sessionFile: activeSessionFile,
|
||||
sessionId: activeSessionId,
|
||||
});
|
||||
}
|
||||
await runContextEngineMaintenance({
|
||||
contextEngine,
|
||||
sessionId: activeSessionId,
|
||||
@@ -2195,6 +2268,7 @@ export async function runEmbeddedAgent(
|
||||
config: params.config,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: sessionAgentId,
|
||||
runSessionTarget,
|
||||
});
|
||||
log.info(
|
||||
`[context-overflow-precheck] stale token state had no real conversation messages for ` +
|
||||
@@ -2206,7 +2280,12 @@ export async function runEmbeddedAgent(
|
||||
continue;
|
||||
}
|
||||
if (compactResult.compacted) {
|
||||
adoptCompactionTranscript(compactResult);
|
||||
if (adoptCompactionTranscript(compactResult)) {
|
||||
await persistActiveRunSessionTarget({
|
||||
sessionFile: activeSessionFile,
|
||||
sessionId: activeSessionId,
|
||||
});
|
||||
}
|
||||
if (
|
||||
typeof compactResult.result?.tokensAfter === "number" &&
|
||||
Number.isFinite(compactResult.result.tokensAfter) &&
|
||||
|
||||
@@ -22,6 +22,7 @@ import type {
|
||||
ToolResultFormat,
|
||||
} from "../../embedded-agent-subscribe.shared-types.js";
|
||||
import type { AgentInternalEvent } from "../../internal-events.js";
|
||||
import type { AgentRunSessionTarget } from "../../run-session-target.js";
|
||||
import type { AgentMessage } from "../../runtime/index.js";
|
||||
import type { SilentReplyPromptMode } from "../../system-prompt.types.js";
|
||||
import type { PromptMode } from "../../system-prompt.types.js";
|
||||
@@ -40,6 +41,8 @@ export type CurrentInboundPromptContext = {
|
||||
export type RunEmbeddedAgentParams = {
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
/** Provider prompt-cache affinity key; distinct from transcript/session identity. */
|
||||
promptCacheKey?: string;
|
||||
/** Session-like key for sandbox and tool-policy resolution. Defaults to sessionKey. */
|
||||
@@ -102,7 +105,8 @@ export type RunEmbeddedAgentParams = {
|
||||
forceHeartbeatTool?: boolean;
|
||||
/** Allow runtime plugins for this run to late-bind the gateway subagent. */
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
sessionFile: string;
|
||||
/** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */
|
||||
sessionFile?: string;
|
||||
workspaceDir: string;
|
||||
/** Task working directory for tool/runtime execution. Defaults to workspaceDir. */
|
||||
cwd?: string;
|
||||
|
||||
@@ -29,8 +29,15 @@ import type { PreemptiveCompactionRoute } from "./preemptive-compaction.types.js
|
||||
|
||||
type EmbeddedRunAttemptBase = Omit<
|
||||
RunEmbeddedAgentParams,
|
||||
"provider" | "model" | "authProfileId" | "authProfileIdSource" | "thinkLevel" | "lane" | "enqueue"
|
||||
>;
|
||||
| "provider"
|
||||
| "model"
|
||||
| "authProfileId"
|
||||
| "authProfileIdSource"
|
||||
| "thinkLevel"
|
||||
| "lane"
|
||||
| "enqueue"
|
||||
| "sessionFile"
|
||||
> & { sessionFile: string };
|
||||
|
||||
export type EmbeddedRunContextWindowInfo = {
|
||||
tokens: number;
|
||||
|
||||
235
src/agents/run-session-target.test.ts
Normal file
235
src/agents/run-session-target.test.ts
Normal file
@@ -0,0 +1,235 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
loadSqliteSessionEntry,
|
||||
patchSqliteSessionEntry,
|
||||
} from "../config/sessions/session-accessor.sqlite.js";
|
||||
import { loadSessionStore } from "../config/sessions/store.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import {
|
||||
persistAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
} from "./run-session-target.js";
|
||||
|
||||
describe("agent run session target", () => {
|
||||
let tempDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-run-session-target-"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
closeOpenClawAgentDatabasesForTest();
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("resolves runtime identity through the run config store", async () => {
|
||||
const storePath = path.join(tempDir, "custom-sessions", "sessions.json");
|
||||
const sessionKey = "agent:helper:commitments:test-run";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: storePath } } as OpenClawConfig,
|
||||
sessionId: "test-run",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(target).toMatchObject({
|
||||
agentId: "helper",
|
||||
sessionId: "test-run",
|
||||
sessionKey,
|
||||
targetKind: "runtime-session",
|
||||
});
|
||||
expect(path.dirname(target.sessionFile)).toBe(path.dirname(storePath));
|
||||
expect(loadSessionStore(storePath, { skipCache: true })[sessionKey]?.sessionFile).toBe(
|
||||
target.sessionFile,
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the agent from an agent-scoped session key when agentId is omitted", async () => {
|
||||
const storeRoot = path.join(tempDir, "agents", "{agentId}", "sessions.json");
|
||||
const sessionKey = "agent:helper:main";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
config: { session: { store: storeRoot } } as OpenClawConfig,
|
||||
sessionId: "helper-session",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
const helperStorePath = path.join(tempDir, "agents", "helper", "sessions.json");
|
||||
expect(target.agentId).toBe("helper");
|
||||
expect(path.dirname(target.sessionFile)).toBe(path.dirname(helperStorePath));
|
||||
expect(loadSessionStore(helperStorePath, { skipCache: true })[sessionKey]?.sessionFile).toBe(
|
||||
target.sessionFile,
|
||||
);
|
||||
});
|
||||
|
||||
it("resolves SQLite identity through the persisted active file artifact", async () => {
|
||||
const sqlitePath = path.join(tempDir, "helper", "openclaw-agent.sqlite");
|
||||
const sessionKey = "agent:helper:commitments:sqlite-run";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: sqlitePath } } as OpenClawConfig,
|
||||
sessionId: "sqlite-run",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(target).toMatchObject({
|
||||
activeArtifactKind: "embedded-run-session-file",
|
||||
agentId: "helper",
|
||||
sessionId: "sqlite-run",
|
||||
sessionKey,
|
||||
sqlitePath,
|
||||
storageKind: "sqlite",
|
||||
targetKind: "sqlite-runtime-session",
|
||||
});
|
||||
expect(path.dirname(target.sessionFile)).toBe(
|
||||
path.join(path.dirname(sqlitePath), "embedded-run-session-files"),
|
||||
);
|
||||
expect(
|
||||
loadSqliteSessionEntry({
|
||||
agentId: "helper",
|
||||
sessionKey,
|
||||
storePath: sqlitePath,
|
||||
}),
|
||||
).toMatchObject({
|
||||
sessionFile: target.sessionFile,
|
||||
sessionId: "sqlite-run",
|
||||
});
|
||||
|
||||
const rotatedSessionFile = path.join(
|
||||
path.dirname(target.sessionFile),
|
||||
"2026-06-04T12-00-00-000Z_sqlite-run-compact.jsonl",
|
||||
);
|
||||
await persistAgentRunSessionTargetIdentity({
|
||||
sessionFile: rotatedSessionFile,
|
||||
sessionId: "sqlite-run-compact",
|
||||
target,
|
||||
});
|
||||
|
||||
const rotatedTarget = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: sqlitePath } } as OpenClawConfig,
|
||||
sessionId: "sqlite-run-compact",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(rotatedTarget).toMatchObject({
|
||||
sessionFile: rotatedSessionFile,
|
||||
sessionId: "sqlite-run-compact",
|
||||
storageKind: "sqlite",
|
||||
});
|
||||
|
||||
const nextTarget = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: sqlitePath } } as OpenClawConfig,
|
||||
sessionId: "sqlite-run-next",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(nextTarget).toMatchObject({
|
||||
sessionId: "sqlite-run-next",
|
||||
storageKind: "sqlite",
|
||||
});
|
||||
expect(nextTarget.sessionFile).toContain("sqlite-run-next.jsonl");
|
||||
expect(
|
||||
loadSqliteSessionEntry({
|
||||
agentId: "helper",
|
||||
sessionKey,
|
||||
storePath: sqlitePath,
|
||||
})?.sessionId,
|
||||
).toBe("sqlite-run-next");
|
||||
expect(
|
||||
loadSqliteSessionEntry({
|
||||
agentId: "helper",
|
||||
sessionKey,
|
||||
storePath: sqlitePath,
|
||||
})?.sessionFile,
|
||||
).toBe(nextTarget.sessionFile);
|
||||
});
|
||||
|
||||
it("ignores stale SQLite session files outside the active artifact boundary", async () => {
|
||||
const sqlitePath = path.join(tempDir, "helper", "openclaw-agent.sqlite");
|
||||
const sessionKey = "agent:helper:commitments:sqlite-stale-file";
|
||||
const legacySessionFile = path.join(tempDir, "legacy", "session.jsonl");
|
||||
await patchSqliteSessionEntry(
|
||||
{
|
||||
agentId: "helper",
|
||||
sessionKey,
|
||||
storePath: sqlitePath,
|
||||
},
|
||||
() => ({
|
||||
sessionFile: legacySessionFile,
|
||||
sessionId: "sqlite-stale-file",
|
||||
updatedAt: Date.now(),
|
||||
}),
|
||||
{
|
||||
fallbackEntry: {
|
||||
sessionFile: legacySessionFile,
|
||||
sessionId: "sqlite-stale-file",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: sqlitePath } } as OpenClawConfig,
|
||||
sessionId: "sqlite-stale-file",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(target.sessionFile).not.toBe(legacySessionFile);
|
||||
expect(path.dirname(target.sessionFile)).toBe(
|
||||
path.join(path.dirname(sqlitePath), "embedded-run-session-files"),
|
||||
);
|
||||
});
|
||||
|
||||
it("can force SQLite resolution for canonical agent session stores", async () => {
|
||||
const storeRoot = path.join(
|
||||
tempDir,
|
||||
"state",
|
||||
"agents",
|
||||
"{agentId}",
|
||||
"sessions",
|
||||
"sessions.json",
|
||||
);
|
||||
const sqlitePath = path.join(
|
||||
tempDir,
|
||||
"state",
|
||||
"agents",
|
||||
"helper",
|
||||
"agent",
|
||||
"openclaw-agent.sqlite",
|
||||
);
|
||||
const sessionKey = "agent:helper:main";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
config: { session: { store: storeRoot } } as OpenClawConfig,
|
||||
sessionId: "helper-session",
|
||||
sessionKey,
|
||||
sessionTarget: { storageKind: "sqlite" },
|
||||
});
|
||||
|
||||
expect(target).toMatchObject({
|
||||
agentId: "helper",
|
||||
sessionId: "helper-session",
|
||||
sessionKey,
|
||||
sqlitePath,
|
||||
storageKind: "sqlite",
|
||||
});
|
||||
expect(
|
||||
loadSqliteSessionEntry({
|
||||
agentId: "helper",
|
||||
sessionKey,
|
||||
storePath: sqlitePath,
|
||||
})?.sessionId,
|
||||
).toBe("helper-session");
|
||||
});
|
||||
});
|
||||
139
src/agents/run-session-target.ts
Normal file
139
src/agents/run-session-target.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import {
|
||||
resolveSessionTranscriptRuntimeTarget,
|
||||
type SessionTranscriptRuntimeTarget,
|
||||
} from "../config/sessions/session-accessor.js";
|
||||
import {
|
||||
patchSqliteSessionEntry,
|
||||
resolveSqliteSessionTranscriptRuntimeTarget,
|
||||
type SqliteSessionTranscriptRuntimeTarget,
|
||||
} from "../config/sessions/session-accessor.sqlite.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
|
||||
/** Identifies a run transcript target without naming the current storage artifact. */
|
||||
export type AgentRunSessionTarget = {
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
storageKind?: "file" | "sqlite";
|
||||
storePath?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
|
||||
/** Target resolved from storage-neutral run identity for current run internals. */
|
||||
export type ResolvedAgentRunSessionTarget =
|
||||
| SessionTranscriptRuntimeTarget
|
||||
| SqliteSessionTranscriptRuntimeTarget;
|
||||
|
||||
/** Resolves the active file-backed target used by current run/session internals. */
|
||||
export async function resolveAgentRunSessionTarget(params: {
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionFile?: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
}): Promise<ResolvedAgentRunSessionTarget> {
|
||||
const sessionTarget = params.sessionTarget;
|
||||
const agentId = normalizeOptionalString(sessionTarget?.agentId) ?? params.agentId;
|
||||
const sessionId = normalizeOptionalString(sessionTarget?.sessionId) ?? params.sessionId;
|
||||
const sessionKey = normalizeOptionalString(sessionTarget?.sessionKey) ?? params.sessionKey;
|
||||
const effectiveAgentId = agentId ?? resolveAgentIdFromSessionKey(sessionKey);
|
||||
const sessionFile = normalizeOptionalString(params.sessionFile);
|
||||
if (sessionFile) {
|
||||
return {
|
||||
agentId: effectiveAgentId ?? "",
|
||||
sessionFile,
|
||||
sessionId,
|
||||
sessionKey: sessionKey ?? "",
|
||||
storageKind: "file",
|
||||
targetKind: "active-session-file",
|
||||
};
|
||||
}
|
||||
if (!sessionKey) {
|
||||
throw new Error(`Cannot resolve run session target without a session key: ${sessionId}`);
|
||||
}
|
||||
const storePath =
|
||||
normalizeOptionalString(sessionTarget?.storePath) ??
|
||||
resolveStorePath(params.config?.session?.store, { agentId: effectiveAgentId });
|
||||
const scope = {
|
||||
...(effectiveAgentId ? { agentId: effectiveAgentId } : {}),
|
||||
sessionId,
|
||||
sessionKey,
|
||||
storePath,
|
||||
...(sessionTarget?.threadId !== undefined ? { threadId: sessionTarget.threadId } : {}),
|
||||
};
|
||||
if (shouldUseSqliteSessionTarget({ sessionTarget, storePath })) {
|
||||
return await resolveSqliteSessionTranscriptRuntimeTarget(scope);
|
||||
}
|
||||
return await resolveSessionTranscriptRuntimeTarget({
|
||||
...scope,
|
||||
});
|
||||
}
|
||||
|
||||
/** Persists the current active artifact for storage-neutral run/session targets. */
|
||||
export async function persistAgentRunSessionTargetIdentity(params: {
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
target: ResolvedAgentRunSessionTarget;
|
||||
}): Promise<void> {
|
||||
if (params.target.storageKind !== "sqlite") {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
await patchSqliteSessionEntry(
|
||||
{
|
||||
agentId: params.target.agentId,
|
||||
sessionKey: params.target.sessionKey,
|
||||
storePath: params.target.sqlitePath,
|
||||
},
|
||||
() => ({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: now,
|
||||
}),
|
||||
{
|
||||
fallbackEntry: {
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: now,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function shouldUseSqliteSessionTarget(params: {
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
storePath: string;
|
||||
}): boolean {
|
||||
if (params.sessionTarget?.storageKind === "sqlite") {
|
||||
return true;
|
||||
}
|
||||
if (params.sessionTarget?.storageKind === "file") {
|
||||
return false;
|
||||
}
|
||||
return params.storePath.trim().toLowerCase().endsWith(".sqlite");
|
||||
}
|
||||
|
||||
/** Applies identity fields from the explicit target before legacy backfills run. */
|
||||
export function applyAgentRunSessionTargetIdentity<
|
||||
T extends {
|
||||
agentId?: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
},
|
||||
>(params: T): T {
|
||||
const target = params.sessionTarget;
|
||||
if (!target) {
|
||||
return params;
|
||||
}
|
||||
return {
|
||||
...params,
|
||||
agentId: normalizeOptionalString(target.agentId) ?? params.agentId,
|
||||
sessionId: normalizeOptionalString(target.sessionId) ?? params.sessionId,
|
||||
sessionKey: normalizeOptionalString(target.sessionKey) ?? params.sessionKey,
|
||||
};
|
||||
}
|
||||
@@ -1,4 +1,7 @@
|
||||
import path from "node:path";
|
||||
import { resolveStorePath } from "../../config/sessions/paths.js";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
|
||||
/**
|
||||
@@ -23,6 +26,20 @@ export type ParentForkDecision =
|
||||
message: string;
|
||||
};
|
||||
|
||||
type ParentForkDecisionParams = {
|
||||
parentEntry: SessionEntry;
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
storePath?: string;
|
||||
};
|
||||
|
||||
type ForkSessionFromParentParams = {
|
||||
parentEntry: SessionEntry;
|
||||
agentId: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionsDir?: string;
|
||||
};
|
||||
|
||||
function loadSessionForkRuntime(): Promise<typeof import("./session-fork.runtime.js")> {
|
||||
return sessionForkRuntimeLoader.load();
|
||||
}
|
||||
@@ -37,14 +54,31 @@ function formatParentForkTooLargeMessage(params: {
|
||||
);
|
||||
}
|
||||
|
||||
export async function resolveParentForkDecision(params: {
|
||||
parentEntry: SessionEntry;
|
||||
storePath: string;
|
||||
}): Promise<ParentForkDecision> {
|
||||
function resolveParentForkStorePath(params: {
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
storePath?: string;
|
||||
}): string {
|
||||
return (
|
||||
params.storePath ?? resolveStorePath(params.config?.session?.store, { agentId: params.agentId })
|
||||
);
|
||||
}
|
||||
|
||||
function resolveParentForkSessionsDir(params: {
|
||||
agentId: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionsDir?: string;
|
||||
}): string {
|
||||
return params.sessionsDir ?? path.dirname(resolveParentForkStorePath(params));
|
||||
}
|
||||
|
||||
export async function resolveParentForkDecision(
|
||||
params: ParentForkDecisionParams,
|
||||
): Promise<ParentForkDecision> {
|
||||
const maxTokens = DEFAULT_PARENT_FORK_MAX_TOKENS;
|
||||
const parentTokens = await resolveParentForkTokenCount({
|
||||
parentEntry: params.parentEntry,
|
||||
storePath: params.storePath,
|
||||
storePath: resolveParentForkStorePath(params),
|
||||
});
|
||||
if (typeof parentTokens === "number" && parentTokens > maxTokens) {
|
||||
return {
|
||||
@@ -62,13 +96,14 @@ export async function resolveParentForkDecision(params: {
|
||||
};
|
||||
}
|
||||
|
||||
export async function forkSessionFromParent(params: {
|
||||
parentEntry: SessionEntry;
|
||||
agentId: string;
|
||||
sessionsDir: string;
|
||||
}): Promise<{ sessionId: string; sessionFile: string } | null> {
|
||||
export async function forkSessionFromParent(
|
||||
params: ForkSessionFromParentParams,
|
||||
): Promise<{ sessionId: string; sessionFile: string } | null> {
|
||||
const runtime = await loadSessionForkRuntime();
|
||||
return runtime.forkSessionFromParentRuntime(params);
|
||||
return runtime.forkSessionFromParentRuntime({
|
||||
...params,
|
||||
sessionsDir: resolveParentForkSessionsDir(params),
|
||||
});
|
||||
}
|
||||
|
||||
async function resolveParentForkTokenCount(params: {
|
||||
|
||||
@@ -25,6 +25,7 @@ vi.mock("./model-selection.runtime.js", () => ({
|
||||
}));
|
||||
|
||||
function requireFirstEmbeddedAgentRequest(): {
|
||||
config?: OpenClawConfig;
|
||||
provider?: string;
|
||||
model?: string;
|
||||
disableTools?: boolean;
|
||||
@@ -224,6 +225,9 @@ describe("commitment extraction runtime", () => {
|
||||
expect(request.provider).toBe("openai");
|
||||
expect(request.model).toBe("gpt-5.5");
|
||||
expect(request.disableTools).toBe(true);
|
||||
expect(request.config?.session?.store).toMatch(
|
||||
/commitments\/extractor-sessions\/main\/sessions\.json$/,
|
||||
);
|
||||
});
|
||||
|
||||
it("backs off hidden extraction after terminal model or auth failures", async () => {
|
||||
|
||||
@@ -191,16 +191,6 @@ function openTerminalFailureCooldown(
|
||||
});
|
||||
}
|
||||
|
||||
function resolveExtractionSessionFile(agentId: string, runId: string): string {
|
||||
return path.join(
|
||||
resolveStateDir(),
|
||||
"commitments",
|
||||
"extractor-sessions",
|
||||
agentId,
|
||||
`${runId}.jsonl`,
|
||||
);
|
||||
}
|
||||
|
||||
function joinPayloadText(result: EmbeddedAgentPayloadResult): string {
|
||||
return (
|
||||
result.payloads
|
||||
@@ -211,6 +201,16 @@ function joinPayloadText(result: EmbeddedAgentPayloadResult): string {
|
||||
);
|
||||
}
|
||||
|
||||
function resolveExtractionSessionStore(agentId: string): string {
|
||||
return path.join(
|
||||
resolveStateDir(),
|
||||
"commitments",
|
||||
"extractor-sessions",
|
||||
agentId,
|
||||
"sessions.json",
|
||||
);
|
||||
}
|
||||
|
||||
async function resolveDefaultModel(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId?: string;
|
||||
@@ -234,15 +234,21 @@ async function defaultExtractBatch(params: {
|
||||
const resolved = resolveCommitmentsConfig(cfg);
|
||||
const runId = `commitments-${randomUUID()}`;
|
||||
const modelRef = await resolveDefaultModel({ cfg, agentId: first.agentId });
|
||||
const helperConfig = {
|
||||
...cfg,
|
||||
session: {
|
||||
...cfg.session,
|
||||
store: resolveExtractionSessionStore(first.agentId),
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
const { runEmbeddedAgent } = await import("../agents/embedded-agent.js");
|
||||
const result = await runEmbeddedAgent({
|
||||
sessionId: runId,
|
||||
sessionKey: `agent:${first.agentId}:commitments:${runId}`,
|
||||
agentId: first.agentId,
|
||||
trigger: "manual",
|
||||
sessionFile: resolveExtractionSessionFile(first.agentId, runId),
|
||||
workspaceDir: resolveAgentWorkspaceDir(cfg, first.agentId),
|
||||
config: cfg,
|
||||
config: helperConfig,
|
||||
provider: modelRef.provider,
|
||||
model: modelRef.model,
|
||||
prompt: buildCommitmentExtractionPrompt({ cfg, items: params.items }),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { resolveTimestampMsToIsoString } from "@openclaw/normalization-core/number-coercion";
|
||||
import type { Selectable } from "kysely";
|
||||
@@ -21,6 +22,7 @@ import {
|
||||
type OpenClawAgentDatabase,
|
||||
type OpenClawAgentDatabaseOptions,
|
||||
} from "../../state/openclaw-agent-db.js";
|
||||
import { validateSessionId } from "./paths.js";
|
||||
import type {
|
||||
ExactSessionEntry,
|
||||
SessionAccessScope,
|
||||
@@ -32,6 +34,7 @@ import type {
|
||||
SessionEntryUpdateOptions,
|
||||
SessionTranscriptAccessScope,
|
||||
SessionTranscriptReadScope,
|
||||
SessionTranscriptRuntimeTarget,
|
||||
SessionTranscriptWriteScope,
|
||||
TranscriptEvent,
|
||||
TranscriptMessageAppendOptions,
|
||||
@@ -82,6 +85,17 @@ type ResolvedSqliteStoreTarget = {
|
||||
path?: string;
|
||||
};
|
||||
|
||||
export type SqliteSessionTranscriptRuntimeTarget = Omit<
|
||||
SessionTranscriptRuntimeTarget,
|
||||
"storageKind" | "targetKind"
|
||||
> & {
|
||||
/** Current embedded-run internals still need one lock/fence/session-manager file. */
|
||||
activeArtifactKind: "embedded-run-session-file";
|
||||
sqlitePath: string;
|
||||
storageKind: "sqlite";
|
||||
targetKind: "sqlite-runtime-session";
|
||||
};
|
||||
|
||||
const SQLITE_SESSION_WRITER_QUEUES = new Map<string, StoreWriterQueue>();
|
||||
|
||||
/** Loads one session entry from the additive SQLite session store. */
|
||||
@@ -399,10 +413,87 @@ export async function publishSqliteTranscriptUpdate(
|
||||
});
|
||||
}
|
||||
|
||||
/** Resolves SQLite runtime identity plus the active artifact used by current runners. */
|
||||
export async function resolveSqliteSessionTranscriptRuntimeTarget(
|
||||
scope: SessionTranscriptAccessScope,
|
||||
): Promise<SqliteSessionTranscriptRuntimeTarget> {
|
||||
const resolved = resolveSqliteTranscriptScope(scope);
|
||||
const sqlitePath = resolveOpenClawAgentSqlitePath(toDatabaseOptions(resolved));
|
||||
const defaultSessionFile = resolveSqliteEmbeddedRunSessionFile(sqlitePath, resolved.sessionId);
|
||||
const now = Date.now();
|
||||
const entry = await patchSqliteSessionEntry(
|
||||
scope,
|
||||
(current) => {
|
||||
const currentSessionFile = resolveCurrentSqliteEmbeddedRunSessionFile(
|
||||
sqlitePath,
|
||||
current.sessionFile,
|
||||
);
|
||||
const sessionFile =
|
||||
current.sessionId?.trim() === resolved.sessionId && currentSessionFile
|
||||
? currentSessionFile
|
||||
: defaultSessionFile;
|
||||
|
||||
// Current embedded-run internals still rotate a file-backed active artifact
|
||||
// after compaction. Persist that pointer in SQLite so the next run reopens
|
||||
// the live branch instead of reconstructing the pre-rotation path.
|
||||
return { sessionFile, sessionId: resolved.sessionId, updatedAt: now };
|
||||
},
|
||||
{
|
||||
fallbackEntry: {
|
||||
sessionFile: defaultSessionFile,
|
||||
sessionId: resolved.sessionId,
|
||||
updatedAt: now,
|
||||
},
|
||||
},
|
||||
);
|
||||
const sessionId = entry?.sessionId?.trim() || resolved.sessionId;
|
||||
const sessionFile =
|
||||
resolveCurrentSqliteEmbeddedRunSessionFile(sqlitePath, entry?.sessionFile) ??
|
||||
resolveSqliteEmbeddedRunSessionFile(sqlitePath, sessionId);
|
||||
await fs.mkdir(path.dirname(sessionFile), { recursive: true, mode: 0o700 });
|
||||
return {
|
||||
activeArtifactKind: "embedded-run-session-file",
|
||||
agentId: resolved.agentId,
|
||||
sessionFile,
|
||||
sessionId,
|
||||
sessionKey: resolved.sessionKey,
|
||||
sqlitePath,
|
||||
storageKind: "sqlite",
|
||||
targetKind: "sqlite-runtime-session",
|
||||
};
|
||||
}
|
||||
|
||||
function getSessionKysely(database: import("node:sqlite").DatabaseSync) {
|
||||
return getNodeSqliteKysely<SessionSqliteDatabase>(database);
|
||||
}
|
||||
|
||||
function resolveSqliteEmbeddedRunSessionFile(sqlitePath: string, sessionId: string): string {
|
||||
return path.join(
|
||||
resolveSqliteEmbeddedRunSessionFileDir(sqlitePath),
|
||||
`${validateSessionId(sessionId)}.jsonl`,
|
||||
);
|
||||
}
|
||||
|
||||
function resolveSqliteEmbeddedRunSessionFileDir(sqlitePath: string): string {
|
||||
return path.join(path.dirname(sqlitePath), "embedded-run-session-files");
|
||||
}
|
||||
|
||||
function resolveCurrentSqliteEmbeddedRunSessionFile(
|
||||
sqlitePath: string,
|
||||
sessionFile: string | undefined,
|
||||
): string | undefined {
|
||||
const trimmed = sessionFile?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
const artifactDir = path.resolve(resolveSqliteEmbeddedRunSessionFileDir(sqlitePath));
|
||||
const candidate = path.resolve(trimmed);
|
||||
const relative = path.relative(artifactDir, candidate);
|
||||
return relative.length > 0 && !relative.startsWith("..") && !path.isAbsolute(relative)
|
||||
? trimmed
|
||||
: undefined;
|
||||
}
|
||||
|
||||
async function runExclusiveSqliteSessionWrite<T>(
|
||||
scope: Pick<ResolvedSqliteReadScope, "agentId" | "env" | "path">,
|
||||
fn: () => Promise<T>,
|
||||
|
||||
@@ -87,6 +87,16 @@ export type TranscriptMessageAppendResult<TMessage> = {
|
||||
|
||||
export type TranscriptUpdatePayload = Omit<SessionTranscriptUpdate, "sessionFile">;
|
||||
|
||||
/** Active transcript target resolved from storage-neutral runtime identity. */
|
||||
export type SessionTranscriptRuntimeTarget = {
|
||||
agentId: string;
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
storageKind: "file";
|
||||
targetKind: "active-session-file" | "runtime-session";
|
||||
};
|
||||
|
||||
export type SessionEntryUpdateOptions = {
|
||||
skipMaintenance?: boolean;
|
||||
takeCacheOwnership?: boolean;
|
||||
@@ -295,6 +305,21 @@ export async function publishTranscriptUpdate(
|
||||
});
|
||||
}
|
||||
|
||||
/** Resolves the current file-backed transcript artifact for a runtime session scope. */
|
||||
export async function resolveSessionTranscriptRuntimeTarget(
|
||||
scope: SessionTranscriptAccessScope,
|
||||
): Promise<SessionTranscriptRuntimeTarget> {
|
||||
const transcript = await resolveTranscriptAccess(scope);
|
||||
return {
|
||||
agentId: scope.agentId ?? resolveAgentIdFromSessionKey(scope.sessionKey) ?? "",
|
||||
sessionFile: transcript.sessionFile,
|
||||
sessionId: scope.sessionId,
|
||||
sessionKey: scope.sessionKey,
|
||||
storageKind: "file",
|
||||
targetKind: "runtime-session",
|
||||
};
|
||||
}
|
||||
|
||||
function createFallbackSessionEntry(patch: Partial<SessionEntry>): SessionEntry {
|
||||
const now = Date.now();
|
||||
return {
|
||||
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
|
||||
import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import { getRuntimeConfig } from "../io.js";
|
||||
import type { OpenClawConfig } from "../types.openclaw.js";
|
||||
import { formatSessionArchiveTimestamp } from "./artifacts.js";
|
||||
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
|
||||
import { deriveSessionMetaPatch } from "./metadata.js";
|
||||
@@ -186,6 +187,7 @@ type SingleEntryPersistencePatch = {
|
||||
|
||||
type SessionEntryWorkflowOptions = {
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
hydrateSkillPromptRefs?: boolean;
|
||||
storePath?: string;
|
||||
@@ -220,7 +222,8 @@ function resolveSessionWorkflowStorePath(
|
||||
return options.storePath;
|
||||
}
|
||||
const agentId = options.agentId ?? resolveAgentIdFromSessionKey(options.sessionKey);
|
||||
return resolveStorePath(getRuntimeConfig().session?.store, {
|
||||
const storeConfig = options.config?.session?.store ?? getRuntimeConfig().session?.store;
|
||||
return resolveStorePath(storeConfig, {
|
||||
agentId,
|
||||
env: options.env,
|
||||
});
|
||||
|
||||
@@ -447,6 +447,27 @@ describe("Engine contract tests", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("delegateCompactionToRuntime uses runtime session identity when available", async () => {
|
||||
const compactRuntimeSpy = installCompactRuntimeSpy();
|
||||
|
||||
await delegateCompactionToRuntime({
|
||||
sessionId: "s3",
|
||||
sessionFile: "/tmp/session.json",
|
||||
runtimeContext: {
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
sessionFile: "/tmp/runtime-context-session.json",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
},
|
||||
});
|
||||
|
||||
expect(compactRuntimeSpy).toHaveBeenCalledTimes(1);
|
||||
const compactRuntimeParams = requireCompactRuntimeParams(0);
|
||||
expect(compactRuntimeParams.agentId).toBe("main");
|
||||
expect(compactRuntimeParams.sessionKey).toBe("agent:main:main");
|
||||
expect(compactRuntimeParams.sessionFile).toBeUndefined();
|
||||
});
|
||||
|
||||
it("builds a normalized memory system prompt addition from the active memory prompt path", () => {
|
||||
registerMemoryPromptSection(({ citationsMode }) => [
|
||||
"## Memory Recall",
|
||||
|
||||
@@ -41,8 +41,9 @@ export async function delegateCompactionToRuntime(
|
||||
// runtimeContext carries the full CompactEmbeddedAgentSessionParams fields set
|
||||
// by runtime callers. We spread them and override the fields that come from
|
||||
// the public ContextEngine compact() signature directly.
|
||||
const runtimeContext = (params.runtimeContext ?? {}) as ContextEngineRuntimeContext &
|
||||
const runtimeContextWithFile = (params.runtimeContext ?? {}) as ContextEngineRuntimeContext &
|
||||
Partial<RuntimeCompactionParams>;
|
||||
const { sessionFile: _runtimeSessionFile, ...runtimeContext } = runtimeContextWithFile;
|
||||
const currentTokenCount =
|
||||
params.currentTokenCount ??
|
||||
(typeof runtimeContext.currentTokenCount === "number" &&
|
||||
@@ -50,11 +51,21 @@ export async function delegateCompactionToRuntime(
|
||||
runtimeContext.currentTokenCount > 0
|
||||
? Math.floor(runtimeContext.currentTokenCount)
|
||||
: undefined);
|
||||
const runtimeSessionKey =
|
||||
typeof runtimeContext.sessionKey === "string" && runtimeContext.sessionKey.trim()
|
||||
? runtimeContext.sessionKey.trim()
|
||||
: undefined;
|
||||
const sessionKey = params.sessionKey ?? runtimeSessionKey;
|
||||
const agentId =
|
||||
typeof runtimeContext.agentId === "string" && runtimeContext.agentId.trim()
|
||||
? runtimeContext.agentId.trim()
|
||||
: undefined;
|
||||
|
||||
const result = await compactEmbeddedAgentSessionDirect({
|
||||
...runtimeContext,
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
...(sessionKey ? { sessionKey } : { sessionFile: params.sessionFile }),
|
||||
...(agentId ? { agentId } : {}),
|
||||
tokenBudget: params.tokenBudget,
|
||||
...(currentTokenCount !== undefined ? { currentTokenCount } : {}),
|
||||
force: params.force,
|
||||
|
||||
@@ -149,8 +149,10 @@ describe("Crestodian assistant", () => {
|
||||
expect(firstCliCall.model).toBe("claude-opus-4-8");
|
||||
expect(firstCliCall.cleanupCliLiveSessionOnRunEnd).toBe(true);
|
||||
const firstCliConfig = requireRecord(firstCliCall.config);
|
||||
const firstCliSession = requireRecord(firstCliConfig.session);
|
||||
const firstCliAgents = requireRecord(firstCliConfig.agents);
|
||||
const firstCliDefaults = requireRecord(firstCliAgents.defaults);
|
||||
expect(firstCliSession.store).toBe("/tmp/crestodian-planner/sessions.json");
|
||||
expect(firstCliDefaults.cliBackends).toBeUndefined();
|
||||
expect(firstCliCall.extraSystemPrompt).toBeTypeOf("string");
|
||||
expect(firstCliCall.extraSystemPrompt).toContain("Do not use tools, shell commands");
|
||||
@@ -225,12 +227,14 @@ describe("Crestodian assistant", () => {
|
||||
expect(firstEmbeddedCall.disableTools).toBe(true);
|
||||
expect(firstEmbeddedCall.toolsAllow).toEqual([]);
|
||||
const embeddedConfig = requireRecord(firstEmbeddedCall.config);
|
||||
const embeddedSession = requireRecord(embeddedConfig.session);
|
||||
const embeddedAgents = requireRecord(embeddedConfig.agents);
|
||||
const embeddedDefaults = requireRecord(embeddedAgents.defaults);
|
||||
const embeddedModel = requireRecord(embeddedDefaults.model);
|
||||
const embeddedPlugins = requireRecord(embeddedConfig.plugins);
|
||||
const embeddedEntries = requireRecord(embeddedPlugins.entries);
|
||||
const embeddedCodexEntry = requireRecord(embeddedEntries.codex);
|
||||
expect(embeddedSession.store).toBe("/tmp/crestodian-planner/sessions.json");
|
||||
expect(embeddedModel.primary).toBe("openai/gpt-5.5");
|
||||
expect(embeddedCodexEntry.enabled).toBe(true);
|
||||
});
|
||||
|
||||
@@ -181,9 +181,16 @@ async function runLocalRuntimePlanner(
|
||||
const tempDir = await (params.deps?.createTempDir ?? createTempPlannerDir)();
|
||||
try {
|
||||
const runId = `crestodian-planner-${randomUUID()}`;
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const sessionId = `${runId}-session`;
|
||||
const sessionKey = `temp:crestodian-planner:${runId}`;
|
||||
const backendConfig = backend.buildConfig(tempDir);
|
||||
const helperConfig = {
|
||||
...backendConfig,
|
||||
session: {
|
||||
...backendConfig.session,
|
||||
store: path.join(tempDir, "sessions.json"),
|
||||
},
|
||||
};
|
||||
switch (backend.runner) {
|
||||
case "cli": {
|
||||
const runCli = params.deps?.runCliAgent ?? (await loadRunCliAgent());
|
||||
@@ -192,9 +199,8 @@ async function runLocalRuntimePlanner(
|
||||
sessionKey,
|
||||
agentId: "crestodian",
|
||||
trigger: "manual",
|
||||
sessionFile,
|
||||
workspaceDir: tempDir,
|
||||
config: backend.buildConfig(tempDir),
|
||||
config: helperConfig,
|
||||
prompt: params.prompt,
|
||||
provider: backend.provider,
|
||||
model: backend.model,
|
||||
@@ -215,9 +221,8 @@ async function runLocalRuntimePlanner(
|
||||
sessionKey,
|
||||
agentId: "crestodian",
|
||||
trigger: "manual",
|
||||
sessionFile,
|
||||
workspaceDir: tempDir,
|
||||
config: backend.buildConfig(tempDir),
|
||||
config: helperConfig,
|
||||
prompt: params.prompt,
|
||||
provider: backend.provider,
|
||||
model: backend.model,
|
||||
|
||||
@@ -52,6 +52,10 @@ describe("generateSlugViaLLM", () => {
|
||||
const options = requireFirstRunOptions();
|
||||
expect(options.timeoutMs).toBe(15_000);
|
||||
expect(options.cleanupBundleMcpOnRunEnd).toBe(true);
|
||||
expect(options.sessionKey).toBe("temp:slug-generator");
|
||||
expect(options.sessionId).toMatch(/^slug-generator-/);
|
||||
expect(options.sessionFile).toBeUndefined();
|
||||
expect((options.config as OpenClawConfig).session?.store).toContain("openclaw-slug-");
|
||||
});
|
||||
|
||||
it("marks the run lane-local so internal-helper failures do not poison shared profile health (#71709)", async () => {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
* LLM-based slug generator for session memory filenames
|
||||
*/
|
||||
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
@@ -35,16 +36,20 @@ export async function generateSlugViaLLM(params: {
|
||||
sessionContent: string;
|
||||
cfg: OpenClawConfig;
|
||||
}): Promise<string | null> {
|
||||
let tempSessionFile: string | null = null;
|
||||
|
||||
let tempDir: string | undefined;
|
||||
try {
|
||||
const agentId = resolveDefaultAgentId(params.cfg);
|
||||
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, agentId);
|
||||
const agentDir = resolveAgentDir(params.cfg, agentId);
|
||||
|
||||
// Create a temporary session file for this one-off LLM call
|
||||
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-slug-"));
|
||||
tempSessionFile = path.join(tempDir, "session.jsonl");
|
||||
const sessionId = `slug-generator-${randomUUID()}`;
|
||||
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-slug-"));
|
||||
const helperConfig = {
|
||||
...params.cfg,
|
||||
session: {
|
||||
...params.cfg.session,
|
||||
store: path.join(tempDir, "sessions.json"),
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
const prompt = `Based on this conversation, generate a short 1-2 word filename slug (lowercase, hyphen-separated, no file extension).
|
||||
|
||||
@@ -60,13 +65,12 @@ Reply with ONLY the slug, nothing else. Examples: "vendor-pitch", "api-design",
|
||||
const timeoutMs = resolveSlugGeneratorTimeoutMs(params.cfg);
|
||||
|
||||
const result = await runEmbeddedAgent({
|
||||
sessionId: `slug-generator-${Date.now()}`,
|
||||
sessionId,
|
||||
sessionKey: "temp:slug-generator",
|
||||
agentId,
|
||||
sessionFile: tempSessionFile,
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: params.cfg,
|
||||
config: helperConfig,
|
||||
prompt,
|
||||
provider,
|
||||
model,
|
||||
@@ -99,12 +103,11 @@ Reply with ONLY the slug, nothing else. Examples: "vendor-pitch", "api-design",
|
||||
log.error(`Failed to generate slug: ${message}`);
|
||||
return null;
|
||||
} finally {
|
||||
// Clean up temporary session file
|
||||
if (tempSessionFile) {
|
||||
if (tempDir) {
|
||||
try {
|
||||
await fs.rm(path.dirname(tempSessionFile), { recursive: true, force: true });
|
||||
await fs.rm(tempDir, { recursive: true, force: true });
|
||||
} catch {
|
||||
// Ignore cleanup errors
|
||||
// Ignore cleanup errors for one-off helper storage.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,13 +294,21 @@ describe("realtime voice agent consult runtime", () => {
|
||||
|
||||
expect(resolveParentForkDecision).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
storePath: "/tmp/sessions.json",
|
||||
agentId: "main",
|
||||
config: {},
|
||||
});
|
||||
expect(forkSessionFromParent).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
agentId: "main",
|
||||
sessionsDir: "/tmp",
|
||||
config: {},
|
||||
});
|
||||
expect(runtime.session.patchSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: "main",
|
||||
config: {},
|
||||
sessionKey: "agent:main:subagent:google-meet:meet-1",
|
||||
}),
|
||||
);
|
||||
const forkedEntry = sessionStore["agent:main:subagent:google-meet:meet-1"];
|
||||
if (!forkedEntry) {
|
||||
throw new Error("Expected forked consult session entry");
|
||||
@@ -315,10 +323,70 @@ describe("realtime voice agent consult runtime", () => {
|
||||
expectPositiveTimestamp(forkedEntry.updatedAt);
|
||||
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
|
||||
expect(call.sessionId).toBe("forked-session");
|
||||
expect(call.sessionFile).toBe("/tmp/forked.jsonl");
|
||||
expect(call.sessionFile).toBeUndefined();
|
||||
expect(call.spawnedBy).toBe("agent:main:main");
|
||||
});
|
||||
|
||||
it("forks cross-agent requester context from the requester agent store", async () => {
|
||||
const { runtime, runEmbeddedAgent, sessionStore } = createAgentRuntime();
|
||||
sessionStore["agent:main:main"] = {
|
||||
sessionId: "parent-session",
|
||||
sessionFile: "relative-parent.jsonl",
|
||||
totalTokens: 100,
|
||||
updatedAt: 1,
|
||||
};
|
||||
const resolveParentForkDecision = vi.fn(async () => ({
|
||||
status: "fork" as const,
|
||||
maxTokens: 100_000,
|
||||
parentTokens: 100,
|
||||
}));
|
||||
const forkSessionFromParent = vi.fn(async () => ({
|
||||
sessionId: "forked-session",
|
||||
sessionFile: "/tmp/forked.jsonl",
|
||||
}));
|
||||
setRealtimeVoiceAgentConsultDepsForTest({
|
||||
resolveParentForkDecision,
|
||||
forkSessionFromParent,
|
||||
});
|
||||
|
||||
await consultRealtimeVoiceAgent({
|
||||
cfg: {} as never,
|
||||
agentRuntime: runtime as never,
|
||||
logger: { warn: vi.fn() },
|
||||
agentId: "voice-agent",
|
||||
sessionKey: "agent:voice-agent:google-meet:meet-1",
|
||||
spawnedBy: "agent:main:main",
|
||||
contextMode: "fork",
|
||||
messageProvider: "google-meet",
|
||||
lane: "google-meet",
|
||||
runIdPrefix: "google-meet:meet-1",
|
||||
args: { question: "What should I say?" },
|
||||
transcript: [],
|
||||
surface: "a private Google Meet",
|
||||
userLabel: "Participant",
|
||||
});
|
||||
|
||||
expect(resolveParentForkDecision).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
agentId: "main",
|
||||
config: {},
|
||||
});
|
||||
expect(forkSessionFromParent).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
agentId: "main",
|
||||
config: {},
|
||||
});
|
||||
expect(runtime.session.patchSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: "voice-agent",
|
||||
config: {},
|
||||
sessionKey: "agent:voice-agent:google-meet:meet-1",
|
||||
}),
|
||||
);
|
||||
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
|
||||
expect(call.sessionFile).toBe("/tmp/forked.jsonl");
|
||||
});
|
||||
|
||||
it("inherits requester message routing for forked consult sessions", async () => {
|
||||
const { runtime, runEmbeddedAgent, sessionStore } = createAgentRuntime();
|
||||
sessionStore["agent:main:discord:channel:123"] = {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js";
|
||||
import {
|
||||
forkSessionFromParent,
|
||||
@@ -81,7 +80,8 @@ function resolveDeliverySessionFields(context?: DeliveryContext): Partial<Sessio
|
||||
|
||||
function resolveRealtimeVoiceAgentDeliveryContext(params: {
|
||||
agentRuntime: RealtimeVoiceAgentConsultRuntime;
|
||||
storePath: string;
|
||||
agentId: string;
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
spawnedBy?: string | null;
|
||||
}): DeliveryContext | undefined {
|
||||
@@ -96,8 +96,10 @@ function resolveRealtimeVoiceAgentDeliveryContext(params: {
|
||||
}
|
||||
candidates.push(params.sessionKey);
|
||||
for (const key of candidates) {
|
||||
const parsed = parseAgentSessionKey(key);
|
||||
const entry = params.agentRuntime.session.getSessionEntry({
|
||||
storePath: params.storePath,
|
||||
agentId: parsed?.agentId ?? params.agentId,
|
||||
config: params.cfg,
|
||||
sessionKey: key,
|
||||
});
|
||||
const context = deliveryContextFromSession(entry);
|
||||
@@ -113,11 +115,11 @@ function resolveRealtimeVoiceAgentDeliveryContext(params: {
|
||||
|
||||
async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
agentId: string;
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
spawnedBy?: string | null;
|
||||
contextMode?: RealtimeVoiceAgentConsultContextMode;
|
||||
deliveryContext?: DeliveryContext;
|
||||
storePath: string;
|
||||
agentRuntime: RealtimeVoiceAgentConsultRuntime;
|
||||
logger: Pick<RuntimeLogger, "warn">;
|
||||
}): Promise<SessionEntry> {
|
||||
@@ -125,14 +127,13 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
const deliveryFields = resolveDeliverySessionFields(params.deliveryContext);
|
||||
const requesterSessionKey = params.spawnedBy?.trim();
|
||||
const requesterAgentId = parseAgentSessionKey(requesterSessionKey)?.agentId;
|
||||
const shouldFork =
|
||||
params.contextMode === "fork" &&
|
||||
requesterSessionKey &&
|
||||
(!requesterAgentId || requesterAgentId === params.agentId);
|
||||
const parentAgentId = requesterAgentId ?? params.agentId;
|
||||
const shouldFork = params.contextMode === "fork" && requesterSessionKey;
|
||||
let forkDecisionWarning: string | undefined;
|
||||
|
||||
const patched = await params.agentRuntime.session.patchSessionEntry({
|
||||
storePath: params.storePath,
|
||||
agentId: params.agentId,
|
||||
config: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
fallbackEntry: {
|
||||
sessionId: "",
|
||||
@@ -144,24 +145,28 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
}
|
||||
if (shouldFork) {
|
||||
const parentEntry = params.agentRuntime.session.getSessionEntry({
|
||||
storePath: params.storePath,
|
||||
agentId: parentAgentId,
|
||||
config: params.cfg,
|
||||
sessionKey: requesterSessionKey,
|
||||
});
|
||||
if (parentEntry?.sessionId?.trim()) {
|
||||
const decision = await realtimeVoiceAgentConsultDeps.resolveParentForkDecision({
|
||||
parentEntry,
|
||||
storePath: params.storePath,
|
||||
agentId: parentAgentId,
|
||||
config: params.cfg,
|
||||
});
|
||||
if (decision.status === "fork") {
|
||||
const fork = await realtimeVoiceAgentConsultDeps.forkSessionFromParent({
|
||||
parentEntry,
|
||||
agentId: params.agentId,
|
||||
sessionsDir: path.dirname(params.storePath),
|
||||
agentId: parentAgentId,
|
||||
config: params.cfg,
|
||||
});
|
||||
if (fork) {
|
||||
return {
|
||||
...deliveryFields,
|
||||
sessionId: fork.sessionId,
|
||||
// Current fork storage is file-backed; persist the artifact on
|
||||
// the entry so the run target resolver reuses the forked branch.
|
||||
sessionFile: fork.sessionFile,
|
||||
spawnedBy: requesterSessionKey,
|
||||
forkedFromParent: true,
|
||||
@@ -221,35 +226,39 @@ export async function consultRealtimeVoiceAgent(params: {
|
||||
const workspaceDir = params.agentRuntime.resolveAgentWorkspaceDir(params.cfg, agentId);
|
||||
await params.agentRuntime.ensureAgentWorkspace({ dir: workspaceDir });
|
||||
|
||||
const storePath = params.agentRuntime.session.resolveStorePath(params.cfg.session?.store, {
|
||||
agentId,
|
||||
});
|
||||
const resolvedDeliveryContext = resolveRealtimeVoiceAgentDeliveryContext({
|
||||
agentRuntime: params.agentRuntime,
|
||||
storePath,
|
||||
agentId,
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
spawnedBy: params.spawnedBy,
|
||||
});
|
||||
const sessionEntry = await resolveRealtimeVoiceAgentConsultSessionEntry({
|
||||
agentId,
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
spawnedBy: params.spawnedBy,
|
||||
contextMode: params.contextMode,
|
||||
deliveryContext: resolvedDeliveryContext,
|
||||
storePath,
|
||||
agentRuntime: params.agentRuntime,
|
||||
logger: params.logger,
|
||||
});
|
||||
const consultDeliveryContext =
|
||||
resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry);
|
||||
const sessionId = sessionEntry.sessionId;
|
||||
const requesterAgentId = parseAgentSessionKey(params.spawnedBy?.trim())?.agentId;
|
||||
const crossAgentForkSessionFile =
|
||||
sessionEntry.forkedFromParent && requesterAgentId && requesterAgentId !== agentId
|
||||
? sessionEntry.sessionFile?.trim()
|
||||
: undefined;
|
||||
|
||||
const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
|
||||
agentId,
|
||||
});
|
||||
const result = await params.agentRuntime.runEmbeddedAgent({
|
||||
sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
// Cross-agent forks are file-backed active artifacts in the requester store.
|
||||
// Passing the artifact keeps the consult run on the forked branch until
|
||||
// consult forking moves to a storage-neutral parent/child target contract.
|
||||
...(crossAgentForkSessionFile ? { sessionFile: crossAgentForkSessionFile } : {}),
|
||||
sandboxSessionKey: resolveRealtimeVoiceAgentSandboxSessionKey(agentId, params.sessionKey),
|
||||
agentId,
|
||||
spawnedBy: params.spawnedBy,
|
||||
@@ -262,7 +271,6 @@ export async function consultRealtimeVoiceAgent(params: {
|
||||
consultDeliveryContext?.threadId != null
|
||||
? String(consultDeliveryContext.threadId)
|
||||
: undefined,
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
config: params.cfg,
|
||||
prompt: buildRealtimeVoiceAgentConsultPrompt({
|
||||
|
||||
Reference in New Issue
Block a user