Compare commits

..

19 Commits

Author SHA1 Message Date
Dallin Romney
3674ec478e docs: add maturity docs routes 2026-06-23 14:14:29 -07:00
Josh Lehman
0dfa22c6e0 refactor: add embedded run session target seam (#90439) 2026-06-23 10:08:29 -07:00
Vincent Koc
6f80552ee9 fix(qa): prove direct reply routing via qa channel 2026-06-24 00:41:28 +08:00
Josh Lehman
258b83c438 refactor: migrate plugin transcript mirrors (#89518) 2026-06-23 09:32:45 -07:00
Vincent Koc
d095d98a02 fix(qa): reject duplicate rpc rtt controls 2026-06-23 18:00:53 +02:00
Vincent Koc
9ff7abc898 test(ci): read sparse android guard files from git 2026-06-23 23:50:51 +08:00
Vincent Koc
dc9c11be91 fix(qa): reject duplicate plugin gauntlet controls 2026-06-23 17:37:53 +02:00
Vincent Koc
58552f6d7c ci: make release maturity scorecard opt-in 2026-06-23 23:32:45 +08:00
Vincent Koc
b8811b7dde fix(qa): reject duplicate gateway cpu controls 2026-06-23 17:30:37 +02:00
Vincent Koc
0850d83de1 fix(qa): reject duplicate model resolution perf controls 2026-06-23 17:24:41 +02:00
Jesse Merhi
92c10d4edc Fix WebChat dispatch failure session status (#84352)
Merged via squash.

Prepared head SHA: 562f2ac5a8
Co-authored-by: jesse-merhi <79823012+jesse-merhi@users.noreply.github.com>
Reviewed-by: @jesse-merhi
2026-06-24 01:19:21 +10:00
Vincent Koc
b22ae2a4da fix(qa): reject duplicate model bench controls 2026-06-23 17:18:47 +02:00
Vincent Koc
a822c9abaa fix(qa): reject duplicate gateway restart controls 2026-06-23 17:13:39 +02:00
Vincent Koc
c308295cd3 fix(qa): reject duplicate gateway startup controls 2026-06-23 17:08:08 +02:00
Vincent Koc
524e19726f fix(qa): reject duplicate cli bench controls 2026-06-23 17:01:42 +02:00
Vincent Koc
bc243568e7 chore(acpx): bump bundled client to 0.11.2 (#96124) 2026-06-23 22:54:51 +08:00
Vincent Koc
2cbb4e70cc fix(qa): reject duplicate telegram proof controls 2026-06-23 16:54:31 +02:00
Vincent Koc
e9b017d9dc fix(qa): reject duplicate abort leak controls 2026-06-23 16:46:39 +02:00
Vincent Koc
bde5be874a fix(qa): reject duplicate sqlite bench controls 2026-06-23 16:41:01 +02:00
51 changed files with 1976 additions and 658 deletions

1
.github/labeler.yml vendored
View File

@@ -118,6 +118,7 @@
- any-glob-to-any-file:
- "extensions/qa-lab/**"
- "qa/scenarios/**"
- "docs/maturity/**"
- "docs/concepts/qa-e2e-automation.md"
- "docs/concepts/personal-agent-benchmark-pack.md"
- "docs/channels/qa-channel.md"

View File

@@ -44,6 +44,11 @@ on:
required: false
default: false
type: boolean
run_maturity_scorecard:
description: Render advisory maturity scorecard release docs; default release checks rely on dedicated package, QA, live, and E2E gates
required: false
default: false
type: boolean
rerun_group:
description: Release check group to run
required: false
@@ -106,6 +111,7 @@ jobs:
mode: ${{ steps.inputs.outputs.mode }}
release_profile: ${{ steps.inputs.outputs.release_profile }}
run_release_soak: ${{ steps.inputs.outputs.run_release_soak }}
run_maturity_scorecard: ${{ steps.inputs.outputs.run_maturity_scorecard }}
rerun_group: ${{ steps.inputs.outputs.rerun_group }}
live_suite_filter: ${{ steps.inputs.outputs.live_suite_filter }}
cross_os_suite_filter: ${{ steps.inputs.outputs.cross_os_suite_filter }}
@@ -279,6 +285,7 @@ jobs:
RELEASE_MODE_INPUT: ${{ inputs.mode }}
RELEASE_PROFILE_INPUT: ${{ inputs.release_profile }}
RELEASE_RUN_RELEASE_SOAK_INPUT: ${{ inputs.run_release_soak }}
RELEASE_RUN_MATURITY_SCORECARD_INPUT: ${{ inputs.run_maturity_scorecard }}
RELEASE_RERUN_GROUP_INPUT: ${{ inputs.rerun_group }}
RELEASE_LIVE_SUITE_FILTER_INPUT: ${{ inputs.live_suite_filter }}
RELEASE_CROSS_OS_SUITE_FILTER_INPUT: ${{ inputs.cross_os_suite_filter }}
@@ -319,6 +326,12 @@ jobs:
else
run_release_soak=true
fi
run_maturity_scorecard="$(printf '%s' "$RELEASE_RUN_MATURITY_SCORECARD_INPUT" | tr '[:upper:]' '[:lower:]')"
if [[ "$run_maturity_scorecard" != "true" && "$run_maturity_scorecard" != "1" && "$run_maturity_scorecard" != "yes" ]]; then
run_maturity_scorecard=false
else
run_maturity_scorecard=true
fi
release_profile="$RELEASE_PROFILE_INPUT"
if [[ "$release_profile" == "minimum" ]]; then
release_profile=beta
@@ -422,6 +435,7 @@ jobs:
printf 'mode=%s\n' "$RELEASE_MODE_INPUT"
printf 'release_profile=%s\n' "$release_profile"
printf 'run_release_soak=%s\n' "$run_release_soak"
printf 'run_maturity_scorecard=%s\n' "$run_maturity_scorecard"
printf 'rerun_group=%s\n' "$RELEASE_RERUN_GROUP_INPUT"
printf 'live_suite_filter=%s\n' "$RELEASE_LIVE_SUITE_FILTER_INPUT"
printf 'cross_os_suite_filter=%s\n' "$RELEASE_CROSS_OS_SUITE_FILTER_INPUT"
@@ -444,6 +458,7 @@ jobs:
RELEASE_MODE: ${{ inputs.mode }}
RELEASE_PROFILE: ${{ steps.inputs.outputs.release_profile }}
RUN_RELEASE_SOAK: ${{ steps.inputs.outputs.run_release_soak }}
RUN_MATURITY_SCORECARD: ${{ steps.inputs.outputs.run_maturity_scorecard }}
RELEASE_RERUN_GROUP: ${{ inputs.rerun_group }}
RELEASE_LIVE_SUITE_FILTER: ${{ inputs.live_suite_filter }}
RELEASE_CROSS_OS_SUITE_FILTER: ${{ inputs.cross_os_suite_filter }}
@@ -461,6 +476,7 @@ jobs:
echo "- Cross-OS mode: \`${RELEASE_MODE}\`"
echo "- Release profile: \`${RELEASE_PROFILE}\`"
echo "- Release soak lanes: \`${RUN_RELEASE_SOAK}\`"
echo "- Maturity scorecard docs: \`${RUN_MATURITY_SCORECARD}\`"
echo "- Rerun group: \`${RELEASE_RERUN_GROUP}\`"
if [[ -n "${RELEASE_LIVE_SUITE_FILTER// }" ]]; then
echo "- Live suite filter: \`${RELEASE_LIVE_SUITE_FILTER}\`"
@@ -770,7 +786,7 @@ jobs:
maturity_scorecard_release_checks:
name: Render maturity scorecard release docs
needs: [resolve_target]
if: contains(fromJSON('["all","qa"]'), needs.resolve_target.outputs.rerun_group)
if: contains(fromJSON('["all","qa"]'), needs.resolve_target.outputs.rerun_group) && needs.resolve_target.outputs.run_maturity_scorecard == 'true'
permissions:
actions: read
contents: read

View File

@@ -24,6 +24,14 @@ This directory owns docs authoring, Mintlify link rules, and docs i18n policy.
- `scripts/docs-sync-publish.mjs` excludes and prunes `docs/internal/**` from the public `openclaw/docs` publish repo if a page is force-added later.
- Internal docs may mention repo paths, private app names, 1Password item names, and runbooks, but never include secret values.
## Maturity Scorecard Editing
`taxonomy.yaml` and `qa/maturity-scores.yaml` are the source inputs; generated maturity docs under `docs/maturity/` are projections and should not be hand-edited for score, LTS, taxonomy, QA profile, or evidence tables.
`scripts/qa/render-maturity-docs.ts` owns generation; use `pnpm maturity:render` to refresh committed docs and `pnpm maturity:check` to verify them.
`.github/workflows/maturity-scorecard.yml` renders artifact previews and can open generated-doc PRs; `.github/workflows/openclaw-release-checks.yml` dispatches it for release QA.
Keep deterministic `qa-evidence.json.scorecard` data in GitHub Actions artifacts unless a maintainer explicitly asks for a sanitized committed projection.
Human overrides must change source state in a PR and explain the reason plus public or redacted evidence.
## Docs i18n
- Foreign-language docs are not maintained in this repo. The generated publish output lives in the separate `openclaw/docs` repo (often cloned locally as `../openclaw-docs`).

View File

@@ -966,6 +966,7 @@ output and whose artifact paths are resolved relative to that producer
`qa run --qa-profile`, the same `qa-evidence.json` also includes the profile
scorecard summary for the selected taxonomy categories.
Treat it as a discovery aid, not a gate replacement; the selected scenario still needs the right provider mode, live transport, Multipass, Testbox, or release lane for the behavior under test.
For scorecard context, see [Maturity scorecard](/maturity/scorecard).
For character and style checks, run the same scenario across multiple live model
refs and write a judged Markdown report:
@@ -1023,6 +1024,7 @@ When no `--judge-model` is passed, the judges default to
## Related docs
- [Matrix QA](/concepts/qa-matrix)
- [Maturity scorecard](/maturity/scorecard)
- [Personal agent benchmark pack](/concepts/personal-agent-benchmark-pack)
- [QA Channel](/channels/qa-channel)
- [Testing](/help/testing)

View File

@@ -20,6 +20,7 @@ of Docker runners. This doc is a "how we test" guide:
- [QA overview](/concepts/qa-e2e-automation) - architecture, command surface, scenario authoring.
- [Matrix QA](/concepts/qa-matrix) - reference for `pnpm openclaw qa matrix`.
- [Maturity scorecard](/maturity/scorecard) - how release QA evidence supports stability and LTS decisions.
- [QA channel](/channels/qa-channel) - the synthetic transport plugin used by repo-backed scenarios.
This page covers running the regular test suites and Docker/Parallels runners. The QA-specific runners section below ([QA-specific runners](#qa-specific-runners)) lists the concrete `qa` invocations and points back at the references above.

View File

@@ -78,13 +78,16 @@ type CodexWorkspaceBootstrapContext = CodexBootstrapContext & {
};
/** Reads mirrored Codex session history for harness hooks. */
export async function readMirroredSessionHistoryMessages(
sessionFile: string,
): Promise<AgentMessage[] | undefined> {
const messages = await readCodexMirroredSessionHistoryMessages(sessionFile);
export async function readMirroredSessionHistoryMessages(params: {
agentId?: string;
sessionFile: string;
sessionId: string;
sessionKey?: string;
}): Promise<AgentMessage[] | undefined> {
const messages = await readCodexMirroredSessionHistoryMessages(params);
if (!messages) {
embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", {
sessionFile,
sessionFile: params.sessionFile,
});
}
return messages;

View File

@@ -1827,7 +1827,14 @@ export class CodexAppServerEventProjector {
}
private async readMirroredSessionMessages(): Promise<AgentMessage[]> {
return (await readCodexMirroredSessionHistoryMessages(this.params.sessionFile)) ?? [];
return (
(await readCodexMirroredSessionHistoryMessages({
agentId: this.params.agentId,
sessionFile: this.params.sessionFile,
sessionId: this.params.sessionId,
sessionKey: this.params.sessionKey,
})) ?? []
);
}
private createAssistantMessage(text: string): AssistantMessage {

View File

@@ -91,9 +91,6 @@ const DEFAULT_COMPLETION_DELIVERY_RETRY_DELAYS_MS = [
];
const DEFAULT_TASK_ROW_RECONCILE_INTERVAL_MS = 10_000;
const RECENT_TERMINAL_TASK_RECONCILE_GRACE_MS = 60_000;
// Codex's recorder uses this filename contract; non-canonical names keep the
// legacy substring fallback for older or test-created transcript files.
const CODEX_ROLLOUT_FILENAME_RE = /^rollout-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-(.+)\.jsonl$/u;
const defaultRuntime: NativeSubagentMonitorRuntime = {
createAgentHarnessTaskRuntime,
@@ -1191,9 +1188,8 @@ async function findTranscriptPaths(params: {
}): Promise<Map<string, string>> {
const sessionsDir = path.join(params.codexHome, "sessions");
const found = new Map<string, string>();
const remaining = new Set(params.childThreadIds);
const stack = [sessionsDir];
while (stack.length > 0 && remaining.size > 0) {
while (stack.length > 0 && found.size < params.childThreadIds.size) {
const dir = stack.pop()!;
let entries: Array<{ name: string; isDirectory(): boolean; isFile(): boolean }>;
try {
@@ -1210,20 +1206,10 @@ async function findTranscriptPaths(params: {
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
continue;
}
const rolloutMatch = entry.name.match(CODEX_ROLLOUT_FILENAME_RE);
if (rolloutMatch) {
const childThreadId = rolloutMatch[1];
if (remaining.delete(childThreadId)) {
for (const childThreadId of params.childThreadIds) {
if (!found.has(childThreadId) && entry.name.includes(childThreadId)) {
found.set(childThreadId, entryPath);
}
continue;
}
for (const childThreadId of remaining) {
if (entry.name.includes(childThreadId)) {
found.set(childThreadId, entryPath);
remaining.delete(childThreadId);
break;
}
}
}
}
@@ -1250,13 +1236,10 @@ async function findTranscriptPath(params: {
stack.push(entryPath);
continue;
}
const rolloutMatch = entry.name.match(CODEX_ROLLOUT_FILENAME_RE);
if (
entry.isFile() &&
entry.name.endsWith(".jsonl") &&
(rolloutMatch
? rolloutMatch[1] === params.childThreadId
: entry.name.includes(params.childThreadId))
entry.name.includes(params.childThreadId)
) {
return entryPath;
}

View File

@@ -849,7 +849,16 @@ export async function runCodexAppServerAttempt(
},
});
const hadSessionFile = await pathExists(activeSessionFile);
let historyMessages = (await readMirroredSessionHistoryMessages(activeSessionFile)) ?? [];
const activeTranscriptTarget = {
agentId: sessionAgentId,
sessionFile: activeSessionFile,
sessionId: activeSessionId,
sessionKey: contextSessionKey,
};
let historyMessages =
!activeContextEngine && initialStartupBindingHadInactiveThreadBootstrap
? []
: ((await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ?? []);
const hookContextWindowFields = {
...(params.contextWindowInfo?.tokens
? { contextTokenBudget: params.contextWindowInfo.tokens }
@@ -907,7 +916,7 @@ export async function runCodexAppServerAttempt(
warn: (message) => embeddedAgentLog.warn(message),
});
historyMessages =
(await readMirroredSessionHistoryMessages(activeSessionFile)) ?? historyMessages;
(await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ?? historyMessages;
}
const memoryToolNames = getCodexWorkspaceMemoryToolNames(toolBridge.availableSpecs);
const workspaceBootstrapContext = await buildCodexWorkspaceBootstrapContext({
@@ -3039,7 +3048,7 @@ export async function runCodexAppServerAttempt(
const activeContextEnginePluginIdLocal =
resolveContextEngineOwnerPluginId(activeContextEngine);
const finalMessages =
(await readMirroredSessionHistoryMessages(activeSessionFile)) ??
(await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ??
historyMessages.concat(result.messagesSnapshot);
await finalizeHarnessContextEngineTurn({
contextEngine: activeContextEngine,

View File

@@ -51,6 +51,14 @@ function messageEntry(params: {
};
}
function mirroredTarget(sessionFile: string) {
return {
sessionFile,
sessionId: "codex-session",
sessionKey: "codex-session",
};
}
describe("readCodexMirroredSessionHistoryMessages", () => {
it("replays only the branch selected by a leaf control", async () => {
const sessionFile = await writeSession([
@@ -75,7 +83,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
},
]);
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
await expect(
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
).resolves.toMatchObject([
{ role: "user", content: "root prompt" },
{ role: "assistant", content: "active answer" },
]);
@@ -93,7 +103,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
},
]);
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toEqual([]);
await expect(
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
).resolves.toEqual([]);
});
it("keeps visible history when continuation rows use a disjoint append cursor", async () => {
@@ -125,7 +137,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
}),
]);
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
await expect(
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
).resolves.toMatchObject([
{ role: "user", content: "visible prompt" },
{ role: "assistant", content: "continued answer" },
]);
@@ -154,7 +168,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
}),
]);
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
await expect(
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
).resolves.toMatchObject([
{ role: "user", content: "visible prompt" },
{ role: "assistant", content: "continued answer" },
]);

View File

@@ -10,40 +10,59 @@ import {
migrateSessionEntries,
parseSessionEntries,
} from "openclaw/plugin-sdk/agent-sessions";
import {
resolveSessionTranscriptTarget,
type SessionTranscriptTargetParams,
} from "openclaw/plugin-sdk/session-transcript-runtime";
import { sanitizeCodexHistoryImagePayloads } from "./image-payload-sanitizer.js";
function isMissingFileError(error: unknown): boolean {
return Boolean(
error &&
typeof error === "object" &&
"code" in error &&
(error as { code?: unknown }).code === "ENOENT",
);
}
export type CodexMirroredSessionHistoryTarget = {
agentId?: string;
sessionFile: string;
sessionId: string;
sessionKey?: string;
};
/** Returns sanitized session-context messages for a Codex mirrored session file. */
export async function readCodexMirroredSessionHistoryMessages(
sessionFile: string,
target: CodexMirroredSessionHistoryTarget,
): Promise<AgentMessage[] | undefined> {
try {
const raw = await fs.readFile(sessionFile, "utf-8");
await resolveSessionTranscriptTarget(resolveCodexHistoryTranscriptTarget(target));
const raw = await fs.readFile(target.sessionFile, "utf-8");
const entries = parseSessionEntries(raw);
if (entries.length === 0) {
return [];
}
const firstEntry = entries[0] as { type?: unknown; id?: unknown } | undefined;
if (firstEntry?.type !== "session" || typeof firstEntry.id !== "string") {
return undefined;
}
migrateSessionEntries(entries);
const sessionEntries = entries.filter(
(entry): entry is SessionEntry => entry.type !== "session",
);
migrateSessionEntries(entries as SessionEntry[]);
const sessionEntries = entries.filter((entry): entry is SessionEntry => {
return (
entry !== null &&
typeof entry === "object" &&
!Array.isArray(entry) &&
(entry as { type?: unknown }).type !== "session"
);
});
return sanitizeCodexHistoryImagePayloads(
buildSessionContext(sessionEntries).messages,
"codex mirrored history",
);
} catch (error) {
if (isMissingFileError(error)) {
return [];
}
} catch {
return undefined;
}
}
function resolveCodexHistoryTranscriptTarget(
target: CodexMirroredSessionHistoryTarget,
): SessionTranscriptTargetParams {
return {
...(target.agentId ? { agentId: target.agentId } : {}),
sessionFile: target.sessionFile,
sessionId: target.sessionId,
sessionKey: target.sessionKey ?? "",
};
}

View File

@@ -21,13 +21,14 @@ import {
mirrorCodexAppServerTranscript,
} from "./transcript-mirror.js";
const emitSessionTranscriptUpdateMock = vi.hoisted(() => vi.fn());
const publishSessionTranscriptUpdateByIdentityMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness-runtime")>();
vi.mock("openclaw/plugin-sdk/session-transcript-runtime", async (importOriginal) => {
const actual =
await importOriginal<typeof import("openclaw/plugin-sdk/session-transcript-runtime")>();
return {
...actual,
emitSessionTranscriptUpdate: emitSessionTranscriptUpdateMock,
publishSessionTranscriptUpdateByIdentity: publishSessionTranscriptUpdateByIdentityMock,
};
});
@@ -44,7 +45,7 @@ const tempDirs: string[] = [];
afterEach(async () => {
resetGlobalHookRunner();
emitSessionTranscriptUpdateMock.mockReset();
publishSessionTranscriptUpdateByIdentityMock.mockReset();
for (const dir of tempDirs.splice(0)) {
await fs.rm(dir, { recursive: true, force: true });
}
@@ -130,6 +131,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userMessage, assistantMessage, toolResultMessage],
idempotencyScope: "scope-1",
@@ -164,30 +166,32 @@ describe("mirrorCodexAppServerTranscript", () => {
const firstMirror = await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:main",
messages: [userMessage],
idempotencyScope: "codex-app-server:thread-1",
});
const secondMirror = await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:main",
messages: [userMessage],
idempotencyScope: "codex-app-server:thread-1",
});
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
([update]) => update as Record<string, unknown>,
const updates = publishSessionTranscriptUpdateByIdentityMock.mock.calls.map(
([update]) => update as Record<string, unknown> & { update?: Record<string, unknown> },
);
expect(updates).toHaveLength(1);
expect(updates[0]?.sessionFile).toBe(sessionFile);
expect(updates[0]?.sessionKey).toBe("agent:main:main");
expect(updates[0]?.messageId).toEqual(expect.any(String));
expect(updates[0]?.message).toMatchObject({
expect(updates[0]?.update?.messageId).toEqual(expect.any(String));
expect(updates[0]?.update?.message).toMatchObject({
role: "user",
content: [{ type: "text", text: "show me live" }],
idempotencyKey: "codex-app-server:thread-1:turn-1:prompt",
});
expect(updates[0]?.messageSeq).toBe(1);
expect(updates[0]?.update?.messageSeq).toBe(1);
expect(firstMirror.userMessagesPresent).toHaveLength(1);
expect(firstMirror.userMessagesPresent[0]).toMatchObject({
role: "user",
@@ -207,6 +211,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:main",
messages: [
attachCodexMirrorIdentity(
@@ -227,14 +232,16 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "codex-app-server:thread-1",
});
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
([update]) => update as Record<string, unknown>,
const updates = publishSessionTranscriptUpdateByIdentityMock.mock.calls.map(
([update]) => update as Record<string, unknown> & { update?: Record<string, unknown> },
);
expect(updates.map((update) => update.messageSeq)).toEqual([1, 2]);
expect(updates.map((update) => (update.message as { role?: string }).role)).toEqual([
"user",
"assistant",
]);
expect(updates.map((update) => update.update?.messageSeq)).toEqual([1, 2]);
expect(
updates.map((update) => {
const message = update.update?.message as { role?: string } | undefined;
return message?.role;
}),
).toEqual(["user", "assistant"]);
});
it("creates the transcript directory on first mirror", async () => {
@@ -243,6 +250,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -273,12 +281,14 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [...messages],
idempotencyScope: "scope-1",
});
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [...messages],
idempotencyScope: "scope-1",
@@ -312,6 +322,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [sourceMessage],
idempotencyScope: "scope-1",
@@ -348,12 +359,14 @@ describe("mirrorCodexAppServerTranscript", () => {
const first = await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [sourceMessage],
idempotencyScope: "scope-1",
});
const second = await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [sourceMessage],
idempotencyScope: "scope-1",
@@ -394,6 +407,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [sourceMessage],
idempotencyScope: "scope-1",
@@ -419,6 +433,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -456,6 +471,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -534,6 +550,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userMessage, assistantMessage],
idempotencyScope: "codex-app-server:thread-X",
@@ -547,6 +564,7 @@ describe("mirrorCodexAppServerTranscript", () => {
);
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userMessage, reasoningMessage, assistantMessage],
idempotencyScope: "codex-app-server:thread-X",
@@ -595,12 +613,14 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userTurn1, assistantTurn1],
idempotencyScope: "codex-app-server:thread-X",
});
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userTurn2, assistantTurn2],
idempotencyScope: "codex-app-server:thread-X",
@@ -638,6 +658,7 @@ describe("mirrorCodexAppServerTranscript", () => {
);
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userTurn1, assistantTurn1],
idempotencyScope: "codex-app-server:thread-X",
@@ -661,6 +682,7 @@ describe("mirrorCodexAppServerTranscript", () => {
// turn 1's entries (with their original identities preserved).
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userTurn1, assistantTurn1, userTurn2, assistantTurn2],
idempotencyScope: "codex-app-server:thread-X",
@@ -691,6 +713,7 @@ describe("mirrorCodexAppServerTranscript", () => {
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userMessage, assistantMessage],
idempotencyScope: "scope-1",

View File

@@ -1,19 +1,19 @@
// Codex plugin module implements transcript mirror behavior.
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import {
acquireSessionWriteLock,
appendSessionTranscriptMessage,
embeddedAgentLog,
emitSessionTranscriptUpdate,
formatErrorMessage,
resolveSessionWriteLockOptions,
runAgentHarnessBeforeMessageWriteHook,
type AgentMessage,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
type SessionWriteLockAcquireTimeoutConfig,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import {
publishSessionTranscriptUpdateByIdentity,
withSessionTranscriptWriteLock,
type SessionTranscriptTargetParams,
type SessionTranscriptWriteLockParams,
} from "openclaw/plugin-sdk/session-transcript-runtime";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
@@ -273,13 +273,13 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
export async function mirrorCodexAppServerTranscript(params: {
sessionFile: string;
sessionId?: string;
sessionId: string;
cwd?: string;
sessionKey?: string;
agentId?: string;
messages: AgentMessage[];
idempotencyScope?: string;
config?: SessionWriteLockAcquireTimeoutConfig;
config?: SessionTranscriptWriteLockParams["config"];
}): Promise<CodexAppServerTranscriptMirrorResult> {
const messages = params.messages.filter(
(message): message is MirroredAgentMessage =>
@@ -289,129 +289,133 @@ export async function mirrorCodexAppServerTranscript(params: {
return { userMessagesPresent: [] };
}
const lock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
...resolveSessionWriteLockOptions(params.config),
});
const appendedUpdates: Array<{ messageId: string; message: AgentMessage; messageSeq: number }> =
[];
const userMessagesPresent: MirroredUserMessage[] = [];
try {
const mirrorState = await readTranscriptMirrorState(params.sessionFile);
let nextMessageSeq = mirrorState.messageCount;
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
if (idempotencyKey && mirrorState.idempotencyKeys.has(idempotencyKey)) {
const persistedUserMessage = mirrorState.userMessagesByIdempotencyKey.get(idempotencyKey);
if (persistedUserMessage) {
userMessagesPresent.push(persistedUserMessage);
const transcriptTarget = resolveCodexMirrorTranscriptTarget(params);
const { appendedUpdates, userMessagesPresent } = await withSessionTranscriptWriteLock(
{ ...transcriptTarget, config: params.config },
async (transcript) => {
const nextAppendedUpdates: Array<{
messageId: string;
message: AgentMessage;
messageSeq: number;
}> = [];
const nextUserMessagesPresent: MirroredUserMessage[] = [];
const mirrorState = readTranscriptMirrorState(await transcript.readEvents());
let nextMessageSeq = mirrorState.messageCount;
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
if (idempotencyKey && mirrorState.idempotencyKeys.has(idempotencyKey)) {
const persistedUserMessage = mirrorState.userMessagesByIdempotencyKey.get(idempotencyKey);
if (persistedUserMessage) {
nextUserMessagesPresent.push(persistedUserMessage);
}
continue;
}
continue;
}
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({
transcriptPath: params.sessionFile,
message: messageToAppend,
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
sessionId: params.sessionId,
cwd: params.cwd,
config: params.config,
});
if (appendedMessage.role === "user") {
userMessagesPresent.push(appendedMessage);
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
const appended = await transcript.appendMessage({
message: messageToAppend,
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
cwd: params.cwd,
});
if (!appended) {
continue;
}
const { messageId, message: appendedMessage } = appended;
if (appendedMessage.role === "user") {
nextUserMessagesPresent.push(appendedMessage);
if (idempotencyKey) {
mirrorState.userMessagesByIdempotencyKey.set(idempotencyKey, appendedMessage);
}
}
nextMessageSeq += 1;
nextAppendedUpdates.push({
messageId,
message: appendedMessage,
messageSeq: nextMessageSeq,
});
if (idempotencyKey) {
mirrorState.userMessagesByIdempotencyKey.set(idempotencyKey, appendedMessage);
mirrorState.idempotencyKeys.add(idempotencyKey);
}
}
nextMessageSeq += 1;
appendedUpdates.push({ messageId, message: appendedMessage, messageSeq: nextMessageSeq });
if (idempotencyKey) {
mirrorState.idempotencyKeys.add(idempotencyKey);
}
}
} finally {
await lock.release();
}
return { appendedUpdates: nextAppendedUpdates, userMessagesPresent: nextUserMessagesPresent };
},
);
for (const update of appendedUpdates) {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
...(params.sessionId && params.sessionKey && params.agentId
? {
target: {
agentId: params.agentId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
},
}
: {}),
message: update.message,
messageId: update.messageId,
messageSeq: update.messageSeq,
await publishSessionTranscriptUpdateByIdentity({
...transcriptTarget,
update: {
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
message: update.message,
messageId: update.messageId,
messageSeq: update.messageSeq,
},
});
}
return { userMessagesPresent };
}
async function readTranscriptMirrorState(sessionFile: string): Promise<{
function resolveCodexMirrorTranscriptTarget(params: {
agentId?: string;
sessionFile: string;
sessionId: string;
sessionKey?: string;
}): SessionTranscriptTargetParams {
return {
...(params.agentId ? { agentId: params.agentId } : {}),
sessionFile: params.sessionFile,
sessionId: params.sessionId,
sessionKey: params.sessionKey ?? "",
};
}
function readTranscriptMirrorState(events: unknown[]): {
idempotencyKeys: Set<string>;
messageCount: number;
userMessagesByIdempotencyKey: Map<string, MirroredUserMessage>;
}> {
} {
const idempotencyKeys = new Set<string>();
const userMessagesByIdempotencyKey = new Map<string, MirroredUserMessage>();
let messageCount = 0;
let raw: string;
try {
raw = await fs.readFile(sessionFile, "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
return { idempotencyKeys, messageCount, userMessagesByIdempotencyKey };
}
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
for (const event of events) {
if (!event || typeof event !== "object" || Array.isArray(event)) {
continue;
}
try {
const parsed = JSON.parse(line) as { message?: AgentMessage & { idempotencyKey?: unknown } };
if ((parsed as { type?: unknown }).type === "message") {
messageCount += 1;
const parsed = event as {
message?: AgentMessage & { idempotencyKey?: unknown };
type?: unknown;
};
if (parsed.type === "message") {
messageCount += 1;
}
if (typeof parsed.message?.idempotencyKey === "string") {
idempotencyKeys.add(parsed.message.idempotencyKey);
if (parsed.message.role === "user") {
userMessagesByIdempotencyKey.set(parsed.message.idempotencyKey, parsed.message);
}
if (typeof parsed.message?.idempotencyKey === "string") {
idempotencyKeys.add(parsed.message.idempotencyKey);
if (parsed.message.role === "user") {
userMessagesByIdempotencyKey.set(parsed.message.idempotencyKey, parsed.message);
}
}
} catch {
continue;
}
}
return { idempotencyKeys, messageCount, userMessagesByIdempotencyKey };

View File

@@ -2461,11 +2461,13 @@ describe("runCopilotAttempt", () => {
expect(dualWriteMock.dualWriteCopilotTranscriptBestEffort).toHaveBeenCalledTimes(1);
const args = dualWriteMock.dualWriteCopilotTranscriptBestEffort.mock.calls[0]?.[0] as {
sessionFile: string;
sessionId: string;
messages: Array<{ role: string }>;
idempotencyScope?: string;
};
expect(args.sessionFile).toBe("session.json");
expect(args.idempotencyScope).toMatch(/^copilot:/u);
expect(args.sessionId).toBe("session-1");
expect(args.idempotencyScope).toBe("copilot:sess-1");
expect(args.messages.length).toBeGreaterThan(0);
const roles = args.messages.map((m) => m.role);
expect(roles).toContain("user");
@@ -2512,10 +2514,9 @@ describe("runCopilotAttempt", () => {
}
const identity = message["__openclaw"]?.mirrorIdentity ?? "";
// The terminal assistant carries the turn-stable
// `${runId}:assistant:final` identity attached by attempt.ts
// (rubber-duck-validated identity scheme — survives SDK session
// reuse across turns). Caller-passed history without an
// identity falls through to the positional `${scope}:role:idx`
// `${runId}:assistant:final` identity attached by attempt.ts.
// Caller-passed history without an identity falls through to
// the positional `${scope}:role:idx`.
// fingerprint that the existing tagging map applies.
if (message.role === "assistant" && index === args.messages.length - 1) {
expect(identity).toMatch(/:assistant:final$/u);

View File

@@ -1005,8 +1005,9 @@ export async function runCopilotAttempt(
// extension. Identity-tagged so re-emits dedupe. Errors are
// swallowed so a mirror failure cannot break the attempt.
const sessionFileForMirror = readString(input.sessionFile);
const sessionIdForScope = sessionIdUsed ?? readString(input.sessionId);
if (sessionFileForMirror && messagesSnapshot.length > 0) {
const openClawSessionIdForMirror = readString(input.sessionId);
const mirrorScopeSessionId = sessionIdUsed ?? openClawSessionIdForMirror;
if (sessionFileForMirror && openClawSessionIdForMirror && messagesSnapshot.length > 0) {
const taggedMessages = messagesSnapshot.map((message, index) => {
if (
message.role !== "user" &&
@@ -1027,16 +1028,16 @@ export async function runCopilotAttempt(
if (hasMirrorIdentity(message)) {
return message;
}
const identityScope = sdkSessionId ?? sessionIdForScope ?? "attempt";
const identityScope = sdkSessionId ?? mirrorScopeSessionId ?? "attempt";
return attachCopilotMirrorIdentity(message, `${identityScope}:${message.role}:${index}`);
});
await dualWriteCopilotTranscriptBestEffort({
sessionFile: sessionFileForMirror,
sessionId: openClawSessionIdForMirror,
sessionKey: readString((input as { sessionKey?: unknown }).sessionKey),
sessionId: readString(input.sessionId),
agentId: readString(input.agentId),
messages: taggedMessages,
idempotencyScope: sessionIdForScope ? `copilot:${sessionIdForScope}` : undefined,
idempotencyScope: mirrorScopeSessionId ? `copilot:${mirrorScopeSessionId}` : undefined,
config: (input as { config?: unknown }).config as never,
}).catch((mirrorError: unknown) => {
// Defense-in-depth: the best-effort wrapper already swallows

View File

@@ -86,6 +86,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [userMessage, assistantMessage, toolResultMessage],
idempotencyScope: "copilot:session-1",
@@ -113,6 +114,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -143,12 +145,14 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [...messages],
idempotencyScope: "copilot:session-1",
});
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [...messages],
idempotencyScope: "copilot:session-1",
@@ -185,6 +189,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [sourceMessage],
idempotencyScope: "copilot:session-1",
@@ -210,6 +215,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -228,6 +234,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
sessionKey: "session-1",
messages: [],
idempotencyScope: "copilot:session-1",
@@ -245,6 +252,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
messages: [message],
idempotencyScope: "scope-fp",
});
@@ -263,6 +271,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
messages: [tagged],
idempotencyScope: "copilot:openclaw-session-1",
});
@@ -279,6 +288,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "no scope" }],
@@ -306,6 +316,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
messages: [userMessage, systemLike],
idempotencyScope: "scope",
});
@@ -326,6 +337,7 @@ describe("mirrorCopilotTranscript", () => {
await mirrorCopilotTranscript({
sessionFile,
sessionId: "session-1",
messages: [second],
idempotencyScope: "scope",
});
@@ -342,6 +354,7 @@ describe("dualWriteCopilotTranscriptBestEffort", () => {
await expect(
dualWriteCopilotTranscriptBestEffort({
sessionFile,
sessionId: "session-1",
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "ok" }],
@@ -356,22 +369,34 @@ describe("dualWriteCopilotTranscriptBestEffort", () => {
});
it("swallows infrastructure failures and never rejects", async () => {
// Pointing sessionFile at a path under a non-existent root with an
// empty-string segment can fail differently on different platforms;
// instead force failure by passing an invalid type and asserting
// that the wrapper itself does not reject. Use any-cast for the
// bad input shape since we are testing the wrapper's catch.
await expect(
dualWriteCopilotTranscriptBestEffort({
sessionFile: "" as unknown as string,
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "should-not-throw" }],
timestamp: Date.now(),
}),
],
idempotencyScope: "scope",
}),
).resolves.toBeUndefined();
const root = await makeRoot("openclaw-copilot-mirror-invalid-");
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = root;
try {
await expect(
dualWriteCopilotTranscriptBestEffort({
agentId: "main",
sessionFile: "",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "should-not-throw" }],
timestamp: Date.now(),
}),
],
idempotencyScope: "scope",
}),
).resolves.toBeUndefined();
await expect(
fs.access(path.join(root, "agents", "main", "sessions", "session-1.jsonl")),
).rejects.toHaveProperty("code", "ENOENT");
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
}
});
});

View File

@@ -29,16 +29,16 @@
*/
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import {
acquireSessionWriteLock,
appendSessionTranscriptMessage,
emitSessionTranscriptUpdate,
resolveSessionWriteLockAcquireTimeoutMs,
runAgentHarnessBeforeMessageWriteHook,
type AgentMessage,
type SessionWriteLockAcquireTimeoutConfig,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import {
publishSessionTranscriptUpdateByIdentity,
withSessionTranscriptWriteLock,
type SessionTranscriptTargetParams,
type SessionTranscriptWriteLockParams,
} from "openclaw/plugin-sdk/session-transcript-runtime";
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
@@ -95,8 +95,8 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
export interface MirrorCopilotTranscriptParams {
sessionFile: string;
sessionId: string;
sessionKey?: string;
sessionId?: string;
agentId?: string;
messages: AgentMessage[];
/**
@@ -107,7 +107,7 @@ export interface MirrorCopilotTranscriptParams {
* entry collide with its existing on-disk key and be a true no-op.
*/
idempotencyScope?: string;
config?: SessionWriteLockAcquireTimeoutConfig;
config?: SessionTranscriptWriteLockParams["config"];
}
export async function mirrorCopilotTranscript(
@@ -121,95 +121,91 @@ export async function mirrorCopilotTranscript(
return;
}
const lock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
});
try {
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
continue;
const transcriptTarget = resolveCopilotMirrorTranscriptTarget(params);
const didAppend = await withSessionTranscriptWriteLock(
{ ...transcriptTarget, config: params.config },
async (transcript) => {
let didAppendMessage = false;
const existingIdempotencyKeys = readTranscriptIdempotencyKeys(await transcript.readEvents());
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
continue;
}
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
const appended = await transcript.appendMessage({
message: messageToAppend,
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
});
if (!appended) {
continue;
}
didAppendMessage = true;
if (idempotencyKey) {
existingIdempotencyKeys.add(idempotencyKey);
}
}
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
await appendSessionTranscriptMessage({
transcriptPath: params.sessionFile,
message: messageToAppend,
config: params.config,
});
if (idempotencyKey) {
existingIdempotencyKeys.add(idempotencyKey);
}
}
} finally {
await lock.release();
}
return didAppendMessage;
},
);
if (params.sessionKey) {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
...(params.agentId ? { agentId: params.agentId } : {}),
...(params.sessionId && params.agentId
? {
target: {
agentId: params.agentId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
},
}
: {}),
if (didAppend) {
await publishSessionTranscriptUpdateByIdentity({
...transcriptTarget,
update: params.sessionKey ? { sessionKey: params.sessionKey } : undefined,
});
} else {
emitSessionTranscriptUpdate(params.sessionFile);
}
}
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
const keys = new Set<string>();
let raw: string;
try {
raw = await fs.readFile(sessionFile, "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
return keys;
function resolveCopilotMirrorTranscriptTarget(params: {
agentId?: string;
sessionFile: string;
sessionId: string;
sessionKey?: string;
}): SessionTranscriptTargetParams {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
throw new Error("Copilot transcript mirror requires a sessionFile target");
}
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
return {
...(params.agentId ? { agentId: params.agentId } : {}),
sessionFile,
sessionId: params.sessionId,
sessionKey: params.sessionKey ?? "",
};
}
function readTranscriptIdempotencyKeys(events: unknown[]): Set<string> {
const keys = new Set<string>();
for (const event of events) {
if (!event || typeof event !== "object" || Array.isArray(event)) {
continue;
}
try {
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
if (typeof parsed.message?.idempotencyKey === "string") {
keys.add(parsed.message.idempotencyKey);
}
} catch {
continue;
const parsed = event as { message?: { idempotencyKey?: unknown } };
if (typeof parsed.message?.idempotencyKey === "string") {
keys.add(parsed.message.idempotencyKey);
}
}
return keys;

View File

@@ -23,6 +23,7 @@ describe("qa channel transport", () => {
},
},
messages: {
visibleReplies: "automatic",
groupChat: {
mentionPatterns: ["\\b@?openclaw\\b"],
visibleReplies: "automatic",

View File

@@ -91,6 +91,7 @@ export function createQaChannelGatewayConfig(params: {
},
},
messages: {
visibleReplies: "automatic",
groupChat: {
mentionPatterns: ["\\b@?openclaw\\b"],
visibleReplies: "automatic",

View File

@@ -22,6 +22,7 @@ function createQaChannelTransportParams(baseUrl = "http://127.0.0.1:43124") {
},
},
messages: {
visibleReplies: "automatic",
groupChat: {
mentionPatterns: ["\\b@?openclaw\\b"],
visibleReplies: "automatic",
@@ -102,6 +103,7 @@ describe("buildQaGatewayConfig", () => {
expect(cfg.channels?.["qa-channel"]?.enabled).toBe(true);
expect(cfg.channels?.["qa-channel"]?.baseUrl).toBe("http://127.0.0.1:43124");
expect(cfg.channels?.["qa-channel"]?.pollTimeoutMs).toBe(250);
expect(cfg.messages?.visibleReplies).toBe("automatic");
expect(cfg.messages?.groupChat?.mentionPatterns).toEqual(["\\b@?openclaw\\b"]);
expect(cfg.messages?.groupChat?.visibleReplies).toBe("automatic");
});

View File

@@ -11,6 +11,9 @@ scenario:
- tools.message
- channels.webchat
objective: Verify a current-chat reply is delivered as assistant text, not by calling `message(action=send)` and ending with `Sent.`.
gatewayConfigPatch:
session:
dmScope: per-channel-peer
successCriteria:
- The visible outbound reply contains the requested marker exactly once.
- The session transcript does not include a `message(action=send)` call followed by final assistant text `Sent.`.
@@ -24,7 +27,9 @@ scenario:
kind: flow
summary: Run a direct current-chat reply and inspect the actual transcript for self-message routing.
config:
conversationId: qa-operator
expectedMarker: WEBCHAT-DIRECT-REPLY-OK
promptSnippet: Reply exactly
flow:
steps:
@@ -39,31 +44,56 @@ flow:
- ref: env
- 60000
- call: reset
- set: conversationId
value:
expr: config.conversationId
- set: sessionKey
value:
expr: "`agent:qa:webchat-direct-reply:${randomUUID().slice(0, 8)}`"
expr: "buildAgentSessionKey({ agentId: 'qa', channel: 'qa-channel', accountId: 'default', peer: { kind: 'direct', id: `dm:${conversationId}` }, dmScope: env.cfg.session?.dmScope, identityLinks: env.cfg.session?.identityLinks })"
- set: startIndex
value:
expr: state.getSnapshot().messages.length
- call: runAgentPrompt
- set: requestCountBefore
value:
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).length : 0"
- call: state.addInboundMessage
args:
- ref: env
- sessionKey:
ref: sessionKey
message:
expr: "`Reply directly in this current chat with exactly ${config.expectedMarker}. Do not call the message tool.`"
timeoutMs:
expr: liveTurnTimeoutMs(env, 60000)
- call: waitForOutboundMessage
saveAs: outbound
args:
- ref: state
- lambda:
params: [candidate]
expr: "candidate.conversation.id === 'qa-operator' && normalizeLowercaseStringOrEmpty(candidate.text).includes(normalizeLowercaseStringOrEmpty(config.expectedMarker))"
- expr: liveTurnTimeoutMs(env, 30000)
- sinceIndex:
ref: startIndex
- conversation:
id:
ref: conversationId
kind: direct
senderId:
ref: conversationId
senderName: WebChat QA
text:
expr: "`Reply exactly \\`${config.expectedMarker}\\` in this current chat. Do not call the message tool.`"
- try:
actions:
- call: waitForCondition
saveAs: scenarioRequest
args:
- lambda:
async: true
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).slice(requestCountBefore).find((request) => String(request.allInputText ?? '').includes(config.promptSnippet)) : true"
- expr: liveTurnTimeoutMs(env, 60000)
- 500
- call: waitForOutboundMessage
saveAs: outbound
args:
- ref: state
- lambda:
params: [candidate]
expr: "candidate.conversation.id === conversationId && normalizeLowercaseStringOrEmpty(candidate.text).includes(normalizeLowercaseStringOrEmpty(config.expectedMarker))"
- expr: liveTurnTimeoutMs(env, 60000)
- sinceIndex:
ref: startIndex
catchAs: directReplyError
catch:
- set: directReplyDebugRequests
value:
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).slice(requestCountBefore).map((request) => ({ plannedToolName: request.plannedToolName ?? null, plannedToolArgs: request.plannedToolArgs ?? null, allInputText: String(request.allInputText ?? '').slice(0, 400), finalText: String(request.finalText ?? '').slice(0, 200), toolOutput: request.toolOutput ? String(request.toolOutput).slice(0, 200) : null })) : []"
- throw:
expr: "`direct reply marker missing: ${directReplyError?.message ?? directReplyError}; transcript=${formatTransportTranscript(state, { conversationId })}; requests=${JSON.stringify(directReplyDebugRequests)}`"
- set: transcriptSummary
value:
expr: "await readSessionTranscriptSummary(env, sessionKey)"

View File

@@ -24,6 +24,13 @@ const DEFAULT_QA_SCENARIOS = [
"memory-failure-fallback",
"gateway-restart-inflight-run",
];
const SINGLE_VALUE_FLAGS = new Set([
"--cpu-core-warn",
"--hot-wall-warn-ms",
"--output-dir",
"--runs",
"--warmup",
]);
const DEFAULT_CPU_CORE_WARN = 0.9;
const DEFAULT_HOT_WALL_WARN_MS = 30_000;
const PRIVATE_QA_REQUIRED_DIST_ENTRIES = [
@@ -49,8 +56,15 @@ function parseArgs(argv) {
cpuCoreWarn: DEFAULT_CPU_CORE_WARN,
hotWallWarnMs: DEFAULT_HOT_WALL_WARN_MS,
};
const seenSingleValueFlags = new Set();
for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (SINGLE_VALUE_FLAGS.has(arg)) {
if (seenSingleValueFlags.has(arg)) {
throw new Error(`${arg} was provided more than once`);
}
seenSingleValueFlags.add(arg);
}
const readValue = () => {
const value = args[index + 1];
if (!value || value.startsWith("-")) {

View File

@@ -35,6 +35,24 @@ const DEFAULT_CPU_CORE_WARN = 0.9;
const DEFAULT_HOT_WALL_WARN_MS = 30_000;
const DEFAULT_MAX_RSS_WARN_MB = 1536;
const DEFAULT_QA_PLUGIN_CHUNK_SIZE = 12;
const SINGLE_VALUE_FLAGS = new Set([
"--build-timeout-ms",
"--command-timeout-ms",
"--cpu-core-warn",
"--hot-wall-warn-ms",
"--limit",
"--max-rss-warn-mb",
"--output-dir",
"--qa-cpu-regression-multiplier",
"--qa-plugin-chunk-size",
"--qa-timeout-ms",
"--qa-wall-regression-multiplier",
"--repo-root",
"--rss-anomaly-multiplier",
"--shard-index",
"--shard-total",
"--wall-anomaly-multiplier",
]);
const COMMAND_OUTPUT_MAX_BUFFER_BYTES = 16 * 1024 * 1024;
const MAX_TIMER_TIMEOUT_MS = 2_147_000_000;
const ANSI_PATTERN = new RegExp(String.raw`\u001B\[[0-9;]*m`, "gu");
@@ -79,8 +97,15 @@ export function parseArgs(argv) {
};
const envIds = normalizeCsv(process.env.OPENCLAW_PLUGIN_GATEWAY_GAUNTLET_IDS);
options.pluginIds.push(...envIds);
const seenSingleValueFlags = new Set();
parseArgv: for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (SINGLE_VALUE_FLAGS.has(arg)) {
if (seenSingleValueFlags.has(arg)) {
throw new Error(`${arg} was provided more than once`);
}
seenSingleValueFlags.add(arg);
}
const readValue = () => {
const value = args[index + 1];
if (!value || value.startsWith("-")) {

View File

@@ -14,6 +14,7 @@ import { resolveWindowsTaskkillPath } from "./lib/windows-taskkill.mjs";
const DEFAULT_METHODS = ["health", "config.get"];
const DEFAULT_ITERATIONS = 10;
const SINGLE_VALUE_FLAGS = new Set(["--iterations", "--methods", "--output-dir", "--repo-root"]);
/** Maximum time to wait for a spawned gateway to become reachable. */
export const READY_TIMEOUT_MS = 120_000;
/** Per-probe timeout used while polling gateway readiness endpoints. */
@@ -74,12 +75,19 @@ export function parseArgs(argv) {
iterations: DEFAULT_ITERATIONS,
methods: DEFAULT_METHODS,
};
const seenSingleValueFlags = new Set();
for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
if (arg === "--help" || arg === "-h") {
args.help = true;
continue;
}
if (SINGLE_VALUE_FLAGS.has(arg)) {
if (seenSingleValueFlags.has(arg)) {
throw new Error(`${arg} was provided more than once.`);
}
seenSingleValueFlags.add(arg);
}
if (arg === "--output-dir") {
args.outputDir = readFlagValue(argv, index, arg);
index += 1;

View File

@@ -14,9 +14,7 @@ import type {
ContextEngineRuntimeSettings,
} from "../../context-engine/types.js";
import {
captureCompactionCheckpointSnapshotAsync,
cleanupCompactionCheckpointSnapshot,
persistSessionCompactionCheckpoint,
createFileBackedCompactionCheckpointStore,
readSessionLeafStateFromTranscriptAsync,
resolveCompactionCheckpointTranscriptPosition,
resolveSessionCompactionCheckpointReason,
@@ -63,6 +61,8 @@ import { resolveModelAsync } from "./model.js";
import type { EmbeddedAgentCompactResult } from "./types.js";
import { normalizeContextTokenBudget } from "./utils.js";
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
function shouldFallbackAfterHarnessCompaction(
result: EmbeddedAgentCompactResult | undefined,
): boolean {
@@ -352,7 +352,7 @@ export async function compactEmbeddedAgentSession(
// are notified regardless of which engine is active.
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
checkpointSnapshot = engineOwnsCompaction
? await captureCompactionCheckpointSnapshotAsync({
? await compactionCheckpointStore.captureSnapshot({
sessionFile: params.sessionFile,
})
: null;
@@ -478,7 +478,7 @@ export async function compactEmbeddedAgentSession(
preferredLeafId: postCompactionLeafId,
transcriptState,
});
const storedCheckpoint = await persistSessionCompactionCheckpoint({
const storedCheckpoint = await compactionCheckpointStore.persistCheckpoint({
cfg: params.config,
sessionKey: params.sessionKey,
sessionId: postCompactionSessionId,
@@ -620,7 +620,7 @@ export async function compactEmbeddedAgentSession(
};
} finally {
if (!checkpointSnapshotRetained) {
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
await compactionCheckpointStore.cleanupSnapshot(checkpointSnapshot);
}
await contextEngine.dispose?.();
}

View File

@@ -1,12 +1,12 @@
/**
* Types for the lazy embedded-agent compaction runtime boundary.
*/
import type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
import type { CompactEmbeddedAgentSessionRuntimeParams } from "./compact.types.js";
import type { EmbeddedAgentCompactResult } from "./types.js";
/**
* Lazy-runtime signature for direct embedded session compaction.
*/
export type CompactEmbeddedAgentSessionDirect = (
params: CompactEmbeddedAgentSessionParams,
params: CompactEmbeddedAgentSessionRuntimeParams,
) => Promise<EmbeddedAgentCompactResult>;

View File

@@ -8,9 +8,7 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { resolveAgentModelFallbackValues } from "../../config/model-input.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
captureCompactionCheckpointSnapshotAsync,
cleanupCompactionCheckpointSnapshot,
persistSessionCompactionCheckpoint,
createFileBackedCompactionCheckpointStore,
readSessionLeafStateFromTranscriptAsync,
resolveCompactionCheckpointTranscriptPosition,
resolveSessionCompactionCheckpointReason,
@@ -107,6 +105,10 @@ import { wrapStreamFnTextTransforms } from "../plugin-text-transforms.js";
import { resolveAgentPromptSurfaceForSessionKey } from "../prompt-surface.js";
import { applyPreparedRuntimeAuthToModel } from "../provider-request-config.js";
import { registerProviderStreamForModel } from "../provider-stream.js";
import {
applyAgentRunSessionTargetIdentity,
resolveAgentRunSessionTarget,
} from "../run-session-target.js";
import { collectRuntimeChannelCapabilities } from "../runtime-capabilities.js";
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
@@ -135,6 +137,7 @@ import {
} from "./compact-reasons.js";
import type {
CompactEmbeddedAgentSessionParams,
CompactEmbeddedAgentSessionRuntimeParams,
CompactionMessageMetrics,
} from "./compact.types.js";
import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js";
@@ -192,6 +195,11 @@ import { mapThinkingLevel, normalizeContextTokenBudget } from "./utils.js";
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
export type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
type CompactEmbeddedAgentSessionParamsWithSessionFile = CompactEmbeddedAgentSessionRuntimeParams & {
sessionFile: string;
};
function hasRealConversationContent(
msg: AgentMessage,
messages: AgentMessage[],
@@ -464,8 +472,17 @@ function fallbackFailureToCompactionResult(err: unknown): EmbeddedAgentCompactRe
* Use this when already inside a session/global lane to avoid deadlocks.
*/
export async function compactEmbeddedAgentSessionDirect(
params: CompactEmbeddedAgentSessionParams,
paramsInput: CompactEmbeddedAgentSessionRuntimeParams,
): Promise<EmbeddedAgentCompactResult> {
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase);
const params: CompactEmbeddedAgentSessionParamsWithSessionFile = {
...paramsBase,
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
sessionId: runSessionTarget.sessionId,
sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey,
sessionFile: runSessionTarget.sessionFile,
};
if (hasExplicitCompactionModel(params) || !hasCompactionModelFallbackCandidates(params)) {
return await compactEmbeddedAgentSessionDirectOnce(params);
}
@@ -530,7 +547,7 @@ export async function compactEmbeddedAgentSessionDirect(
}
async function compactEmbeddedAgentSessionDirectOnce(
params: CompactEmbeddedAgentSessionParams,
params: CompactEmbeddedAgentSessionParamsWithSessionFile,
): Promise<EmbeddedAgentCompactResult> {
const startedAt = Date.now();
const diagId = params.diagId?.trim() || createCompactionDiagId();
@@ -1190,7 +1207,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
: undefined,
allowedToolNames,
});
checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({
checkpointSnapshot = await compactionCheckpointStore.captureSnapshot({
sessionManager,
sessionFile: params.sessionFile,
});
@@ -1546,7 +1563,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
preferredLeafId: activePostLeafId,
transcriptState,
});
const storedCheckpoint = await persistSessionCompactionCheckpoint({
const storedCheckpoint = await compactionCheckpointStore.persistCheckpoint({
cfg: params.config,
sessionKey: params.sessionKey,
sessionId: activeSessionId,
@@ -1670,7 +1687,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
return fail(reason, err);
} finally {
if (!checkpointSnapshotRetained) {
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
await compactionCheckpointStore.cleanupSnapshot(checkpointSnapshot);
}
restoreSkillEnv?.();
}

View File

@@ -9,12 +9,15 @@ import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-e
import type { CommandQueueEnqueueFn } from "../../process/command-queue.types.js";
import type { SkillSnapshot } from "../../skills/types.js";
import type { ExecElevatedDefaults, ExecToolDefaults } from "../bash-tools.exec-types.js";
import type { AgentRunSessionTarget } from "../run-session-target.js";
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
export type CompactEmbeddedAgentSessionParams = {
sessionId: string;
runId?: string;
sessionKey?: string;
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
sessionTarget?: AgentRunSessionTarget;
/** Caller-resolved owner agent for global session aliases. */
agentId?: string;
/** Session key used only for runtime policy/sandbox resolution. Defaults to sessionKey. */
@@ -106,6 +109,14 @@ export type CompactEmbeddedAgentSessionParams = {
oneShotCliRun?: boolean;
};
export type CompactEmbeddedAgentSessionRuntimeParams = Omit<
CompactEmbeddedAgentSessionParams,
"sessionFile"
> & {
/** Deprecated file-backed artifact target. Prefer sessionTarget for new callers. */
sessionFile?: string;
};
export type CompactionMessageMetrics = {
messages: number;
historyTextChars: number;

View File

@@ -125,6 +125,10 @@ import {
import { resolveProviderIdForAuth } from "../provider-auth-aliases.js";
import { hasOnlyAssistantReasoningContent } from "../replay-turn-classification.js";
import { runAgentCleanupStep } from "../run-cleanup-timeout.js";
import {
applyAgentRunSessionTargetIdentity,
resolveAgentRunSessionTarget,
} from "../run-session-target.js";
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
@@ -255,6 +259,7 @@ const BEFORE_AGENT_FINALIZE_RETRY_PROMPT_PREFIX =
"Before accepting the previous final answer, apply this revision request and produce the revised final answer. Do not repeat completed work or rerun tools unless the request explicitly requires it.";
const MAX_BEFORE_AGENT_FINALIZE_REVISIONS = 3;
type EmbeddedRunAttemptForRunner = Awaited<ReturnType<typeof runEmbeddedAttemptWithBackend>>;
/** RunEmbeddedAgentParams narrowed so that `sessionFile` is a required string. */
type RunEmbeddedAgentParamsWithSessionFile = RunEmbeddedAgentParams & { sessionFile: string };
function isNoRealConversationCompactionNoop(params: {
ok?: boolean;
@@ -617,20 +622,28 @@ export function runEmbeddedAgent(
async function runEmbeddedAgentInternal(
paramsInput: RunEmbeddedAgentParams,
): Promise<EmbeddedAgentRunResult> {
let params = paramsInput;
let lifecycleGeneration = params.lifecycleGeneration!;
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
let lifecycleGeneration = paramsBase.lifecycleGeneration!;
const queuedLifecycleGeneration = getAgentEventLifecycleGeneration();
// Resolve sessionKey early so all downstream consumers (hooks, LCM, compaction)
// receive a non-null key even when callers omit it. See #60552.
const effectiveSessionKey = backfillSessionKey({
config: params.config,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.agentId,
config: paramsBase.config,
sessionId: paramsBase.sessionId,
sessionKey: paramsBase.sessionKey,
agentId: paramsBase.agentId,
});
if (effectiveSessionKey !== params.sessionKey) {
params = { ...params, sessionKey: effectiveSessionKey };
}
const runSessionTarget = await resolveAgentRunSessionTarget({
...paramsBase,
sessionKey: effectiveSessionKey,
});
let params: RunEmbeddedAgentParamsWithSessionFile = {
...paramsBase,
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
sessionId: runSessionTarget.sessionId,
sessionKey: effectiveSessionKey ?? runSessionTarget.sessionKey,
sessionFile: runSessionTarget.sessionFile,
};
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
// Outer fallback attempts defer session suspension only while another

View File

@@ -29,6 +29,7 @@ import type {
} from "../../embedded-agent-subscribe.shared-types.js";
import type { FastModeAutoProgressState } from "../../fast-mode.js";
import type { AgentInternalEvent } from "../../internal-events.js";
import type { AgentRunSessionTarget } from "../../run-session-target.js";
import type { AgentMessage } from "../../runtime/index.js";
import type { SilentReplyPromptMode } from "../../system-prompt.types.js";
import type { PromptMode } from "../../system-prompt.types.js";
@@ -47,6 +48,8 @@ export type CurrentInboundPromptContext = {
export type RunEmbeddedAgentParams = {
sessionId: string;
sessionKey?: string;
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
sessionTarget?: AgentRunSessionTarget;
/** Immutable gateway lifecycle ownership captured when this execution was admitted. */
lifecycleGeneration?: string;
/** Provider prompt-cache affinity key; distinct from transcript/session identity. */
@@ -122,7 +125,8 @@ export type RunEmbeddedAgentParams = {
forceHeartbeatTool?: boolean;
/** Allow runtime plugins for this run to late-bind the gateway subagent. */
allowGatewaySubagentBinding?: boolean;
sessionFile: string;
/** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */
sessionFile?: string;
workspaceDir: string;
/** Task working directory for tool/runtime execution. Defaults to workspaceDir. */
cwd?: string;

View File

@@ -40,6 +40,7 @@ type EmbeddedRunAttemptBase = Omit<
| "fastMode"
| "lane"
| "enqueue"
| "sessionFile"
>;
export type EmbeddedRunContextWindowInfo = {
@@ -51,6 +52,8 @@ export type EmbeddedRunContextWindowInfo = {
export type EmbeddedRunFastModeParam = boolean | (() => boolean | undefined);
export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
/** Active file-backed artifact target resolved by the run/session target seam. */
sessionFile: string;
initialReplayState?: EmbeddedRunReplayState;
/** Pluggable context engine for ingest/assemble/compact lifecycle. */
contextEngine?: ContextEngine;

View File

@@ -206,13 +206,26 @@ export async function getSessionsSpawnTool(opts: CreateOpenClawToolsOpts) {
compact: async () => ({ ok: true, compacted: false }),
ingest: async () => ({ ingested: false }),
}),
resolveParentForkDecision: async () => ({
status: "fork",
maxTokens: 100_000,
}),
forkSessionFromParent: async () => ({
sessionId: "forked-session-id",
sessionFile: "/tmp/forked-session.jsonl",
forkSessionEntryFromParent: async () => ({
status: "forked",
fork: {
sessionId: "forked-session-id",
sessionFile: "/tmp/forked-session.jsonl",
},
parentEntry: {
sessionId: "parent-session-id",
updatedAt: Date.now(),
},
sessionEntry: {
sessionId: "forked-session-id",
sessionFile: "/tmp/forked-session.jsonl",
forkedFromParent: true,
updatedAt: Date.now(),
},
decision: {
status: "fork",
maxTokens: 100_000,
},
}),
updateSessionStore: async (_storePath, mutator) => mutator({}),
});

View File

@@ -0,0 +1,59 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { loadSessionStore } from "../config/sessions/store.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveAgentRunSessionTarget } from "./run-session-target.js";
describe("agent run session target", () => {
  // Per-test scratch directory; created in beforeEach and removed in afterEach.
  let scratchDir: string;

  beforeEach(() => {
    scratchDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-run-session-target-"));
  });

  afterEach(() => {
    fs.rmSync(scratchDir, { recursive: true, force: true });
  });

  /** Reads the transcript path stored for `key`, bypassing the session store cache. */
  const readStoredSessionFile = (storeFile: string, key: string) =>
    loadSessionStore(storeFile, { skipCache: true })[key]?.sessionFile;

  it("resolves runtime identity through the run config store", async () => {
    const sessionKey = "agent:helper:commitments:test-run";
    const storePath = path.join(scratchDir, "custom-sessions", "sessions.json");
    const config = { session: { store: storePath } } as OpenClawConfig;

    const resolved = await resolveAgentRunSessionTarget({
      agentId: "helper",
      config,
      sessionId: "test-run",
      sessionKey,
    });

    expect(resolved).toMatchObject({
      agentId: "helper",
      sessionId: "test-run",
      sessionKey,
    });
    // The transcript lives next to the configured store and is recorded in it.
    expect(path.dirname(resolved.sessionFile)).toBe(path.dirname(storePath));
    expect(readStoredSessionFile(storePath, sessionKey)).toBe(resolved.sessionFile);
  });

  it("uses the agent from an agent-scoped session key when agentId is omitted", async () => {
    const sessionKey = "agent:helper:main";
    // Store path template; the expectations below check it resolves to the "helper" agent.
    const storeTemplate = path.join(scratchDir, "agents", "{agentId}", "sessions.json");

    const resolved = await resolveAgentRunSessionTarget({
      config: { session: { store: storeTemplate } } as OpenClawConfig,
      sessionId: "helper-session",
      sessionKey,
    });

    const helperStorePath = path.join(scratchDir, "agents", "helper", "sessions.json");
    expect(resolved.agentId).toBe("helper");
    expect(path.dirname(resolved.sessionFile)).toBe(path.dirname(helperStorePath));
    expect(readStoredSessionFile(helperStorePath, sessionKey)).toBe(resolved.sessionFile);
  });
});

View File

@@ -0,0 +1,79 @@
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
import { resolveStorePath } from "../config/sessions/paths.js";
import {
resolveSessionTranscriptRuntimeTarget,
type SessionTranscriptRuntimeTarget,
} from "../config/sessions/session-accessor.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveAgentIdFromSessionKey } from "../routing/session-key.js";
/** Identifies a run transcript target without naming the current storage artifact. */
export type AgentRunSessionTarget = {
/** When it normalizes to a value, takes precedence over the caller's top-level agentId. */
agentId?: string;
/** When it normalizes to a value, takes precedence over the caller's top-level sessionId. */
sessionId?: string;
/** When it normalizes to a value, takes precedence over the caller's top-level sessionKey. */
sessionKey?: string;
/** Explicit session store path; used instead of the config-derived store path when set. */
storePath?: string;
/** Forwarded to the transcript runtime target resolver when defined. */
threadId?: string | number;
};
/** File-backed target resolved from the storage-neutral run identity. */
export type ResolvedAgentRunSessionTarget = SessionTranscriptRuntimeTarget;
/**
 * Resolves the active file-backed target used by current run/session internals.
 *
 * Identity fields on `sessionTarget` win over the top-level `agentId`,
 * `sessionId` and `sessionKey`. When a non-empty `sessionFile` is supplied it is
 * returned as-is together with the merged identity (missing agent id / session
 * key become empty strings). Otherwise the target is resolved through the
 * session store, which requires a session key.
 *
 * @throws Error when no `sessionFile` is given and no session key can be determined.
 */
export async function resolveAgentRunSessionTarget(params: {
  agentId?: string;
  config?: OpenClawConfig;
  sessionFile?: string;
  sessionId: string;
  sessionKey?: string;
  sessionTarget?: AgentRunSessionTarget;
}): Promise<ResolvedAgentRunSessionTarget> {
  const { sessionTarget: explicitTarget } = params;

  // Merge identity: explicit target first, legacy top-level fields second.
  const mergedAgentId = normalizeOptionalString(explicitTarget?.agentId) ?? params.agentId;
  const sessionId = normalizeOptionalString(explicitTarget?.sessionId) ?? params.sessionId;
  const sessionKey = normalizeOptionalString(explicitTarget?.sessionKey) ?? params.sessionKey;
  const effectiveAgentId = mergedAgentId ?? resolveAgentIdFromSessionKey(sessionKey);

  // Legacy path: the caller already knows the transcript file.
  const explicitSessionFile = normalizeOptionalString(params.sessionFile);
  if (explicitSessionFile) {
    return {
      agentId: effectiveAgentId ?? "",
      sessionFile: explicitSessionFile,
      sessionId,
      sessionKey: sessionKey ?? "",
    };
  }

  if (!sessionKey) {
    throw new Error(`Cannot resolve run session target without a session key: ${sessionId}`);
  }

  // An explicit store path beats the config-derived one.
  const explicitStorePath = normalizeOptionalString(explicitTarget?.storePath);
  const storePath =
    explicitStorePath ??
    resolveStorePath(params.config?.session?.store, { agentId: effectiveAgentId });

  const threadId = explicitTarget?.threadId;
  return await resolveSessionTranscriptRuntimeTarget({
    ...(effectiveAgentId ? { agentId: effectiveAgentId } : {}),
    sessionId,
    sessionKey,
    storePath,
    ...(threadId !== undefined ? { threadId } : {}),
  });
}
/**
 * Applies identity fields from the explicit target before legacy backfills run.
 *
 * Returns `params` itself when no `sessionTarget` is present; otherwise returns
 * a shallow copy whose agentId/sessionId/sessionKey prefer the target's
 * normalized values and fall back to the existing ones.
 */
export function applyAgentRunSessionTargetIdentity<
  T extends {
    agentId?: string;
    sessionId: string;
    sessionKey?: string;
    sessionTarget?: AgentRunSessionTarget;
  },
>(params: T): T {
  const { sessionTarget: explicitTarget } = params;
  if (!explicitTarget) {
    return params;
  }

  const agentId = normalizeOptionalString(explicitTarget.agentId) ?? params.agentId;
  const sessionId = normalizeOptionalString(explicitTarget.sessionId) ?? params.sessionId;
  const sessionKey = normalizeOptionalString(explicitTarget.sessionKey) ?? params.sessionKey;
  return { ...params, agentId, sessionId, sessionKey };
}

View File

@@ -1,5 +1,3 @@
// Subagent spawn context tests cover isolated, forked, lightweight, and
// thread-bound bootstrap context preparation for child sessions.
import path from "node:path";
import { MAX_TIMER_TIMEOUT_MS } from "@openclaw/normalization-core/number-coercion";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
@@ -14,7 +12,9 @@ type GatewayRequest = { method?: string; params?: Record<string, unknown> };
describe("sessions_spawn context modes", () => {
const storePath = "/tmp/subagent-context-session-store.json";
const callGatewayMock = vi.fn();
const loadSessionStoreMock = vi.fn();
const updateSessionStoreMock = vi.fn();
const forkSessionEntryFromParentMock = vi.fn();
const forkSessionFromParentMock = vi.fn();
const ensureContextEnginesInitializedMock = vi.fn();
const resolveContextEngineMock = vi.fn();
@@ -25,7 +25,9 @@ describe("sessions_spawn context modes", () => {
beforeAll(async () => {
({ spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({
callGatewayMock,
loadSessionStoreMock,
updateSessionStoreMock,
forkSessionEntryFromParentMock,
forkSessionFromParentMock,
ensureContextEnginesInitializedMock,
resolveContextEngineMock,
@@ -35,7 +37,9 @@ describe("sessions_spawn context modes", () => {
beforeEach(() => {
callGatewayMock.mockReset();
loadSessionStoreMock.mockReset();
updateSessionStoreMock.mockReset();
forkSessionEntryFromParentMock.mockReset();
forkSessionFromParentMock.mockReset();
ensureContextEnginesInitializedMock.mockReset();
resolveContextEngineMock.mockReset();
@@ -44,14 +48,78 @@ describe("sessions_spawn context modes", () => {
});
/**
 * Wires the session-store mocks to one shared in-memory `store` object:
 * loads return it, updates run the mutator against it, and the
 * forkSessionEntryFromParent mock reads the parent entry from it and writes
 * the forked child entry back into it.
 */
function usePersistentStoreMock(store: SessionStore) {
// The spawn path mutates the session store in-place; this mock keeps that
// contract visible without touching disk.
loadSessionStoreMock.mockReturnValue(store);
updateSessionStoreMock.mockImplementation(async (_storePath: unknown, mutator: unknown) => {
if (typeof mutator !== "function") {
throw new Error("missing session store mutator");
}
return await mutator(store);
});
forkSessionEntryFromParentMock.mockImplementation(
async (params: {
agentId: string;
fallbackEntry?: Record<string, unknown>;
parentStoreKeys?: string[];
sessionKey: string;
sessionsDir?: string;
}) => {
// First parent candidate key that has an entry in the shared store.
const parentEntry = params.parentStoreKeys
?.map((key) => store[key])
.find((entry): entry is Record<string, unknown> => Boolean(entry));
const maxTokens = 100_000;
const parentTokens = parentEntry?.totalTokens;
// Oversized parent: report a "skip" decision and leave the store untouched.
if (
typeof parentTokens === "number" &&
Number.isFinite(parentTokens) &&
parentTokens > maxTokens
) {
const sessionEntry = {
...params.fallbackEntry,
...store[params.sessionKey],
};
return {
status: "skipped",
reason: "decision-skip",
parentEntry,
sessionEntry,
decision: {
status: "skip",
reason: "parent-too-large",
maxTokens,
parentTokens,
message: `Parent context is too large to fork (${parentTokens}/${maxTokens} tokens); starting with isolated context instead.`,
},
};
}
// Delegate the transcript fork to the per-test mock; a falsy result means failure.
const fork = await forkSessionFromParentMock({
parentEntry,
agentId: params.agentId,
sessionsDir: params.sessionsDir,
});
if (!fork) {
return { status: "failed" };
}
// Fork identity fields are spread last so they override fallback/existing values.
const sessionEntry = {
...params.fallbackEntry,
...store[params.sessionKey],
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
forkedFromParent: true,
};
store[params.sessionKey] = sessionEntry;
return {
status: "forked",
fork,
parentEntry,
sessionEntry,
decision: {
status: "fork",
maxTokens,
// parentTokens is only reported when the parent entry carried a numeric count.
...(typeof parentTokens === "number" ? { parentTokens } : {}),
},
};
},
);
}
function requireAcceptedResult(result: Awaited<ReturnType<typeof spawnSubagentDirect>>) {
@@ -202,8 +270,6 @@ describe("sessions_spawn context modes", () => {
});
it("falls back to isolated context when requested fork is too large", async () => {
// Forking very large transcripts would create expensive child context, so
// the accepted run records the downgrade in its note.
const store: SessionStore = {
main: {
sessionId: "parent-session-id",

View File

@@ -10,6 +10,7 @@ export {
export { getRuntimeConfig } from "../config/config.js";
export { loadSessionStore, mergeSessionEntry, updateSessionStore } from "../config/sessions.js";
export {
forkSessionEntryFromParent,
forkSessionFromParent,
resolveParentForkDecision,
type ParentForkDecision,

View File

@@ -134,6 +134,7 @@ export async function loadSubagentSpawnModuleForTest(params: {
loadSessionStoreMock?: MockFn;
ensureContextEnginesInitializedMock?: MockFn;
updateSessionStoreMock?: MockFn;
forkSessionEntryFromParentMock?: MockFn;
forkSessionFromParentMock?: MockFn;
resolveContextEngineMock?: MockFn;
resolveParentForkDecisionMock?: MockFn;
@@ -215,6 +216,36 @@ export async function loadSubagentSpawnModuleForTest(params: {
params.dispatchGatewayMethodInProcessMock?.(...args),
hasInProcessGatewayContext: () => Boolean(params.hasInProcessGatewayContextMock?.()),
buildSubagentSystemPrompt: () => "system-prompt",
forkSessionEntryFromParent:
params.forkSessionEntryFromParentMock ??
(async () => {
const fork = (
params.forkSessionFromParentMock
? await params.forkSessionFromParentMock()
: { sessionId: "forked-session-id", sessionFile: "/tmp/forked-session.jsonl" }
) as { sessionId: string; sessionFile: string } | null;
if (!fork) {
return { status: "failed" };
}
return {
status: "forked",
fork,
parentEntry: {
sessionId: "parent-session-id",
sessionFile: "/tmp/parent-session.jsonl",
updatedAt: Date.now(),
},
sessionEntry: {
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
forkedFromParent: true,
},
decision: {
status: "fork",
maxTokens: 100_000,
},
};
}),
forkSessionFromParent:
params.forkSessionFromParentMock ??
(async () => ({ sessionId: "forked-session-id", sessionFile: "/tmp/forked-session.jsonl" })),

View File

@@ -88,7 +88,7 @@ import {
callGateway,
dispatchGatewayMethodInProcess,
emitSessionLifecycleEvent,
forkSessionFromParent,
forkSessionEntryFromParent,
getGlobalHookRunner,
getSessionBindingService,
getRuntimeConfig,
@@ -99,7 +99,6 @@ import {
normalizeDeliveryContext,
pruneLegacyStoreKeys,
ensureContextEnginesInitialized,
resolveParentForkDecision,
resolveAgentConfig,
resolveContextEngine,
resolveGatewaySessionStoreTarget,
@@ -133,26 +132,24 @@ function resolveConfiguredAgentIds(cfg: OpenClawConfig): string[] {
type SubagentSpawnDeps = {
callGateway: typeof callGateway;
dispatchGatewayMethodInProcess: typeof dispatchGatewayMethodInProcess;
forkSessionFromParent: typeof forkSessionFromParent;
forkSessionEntryFromParent: typeof forkSessionEntryFromParent;
getGlobalHookRunner: () => SubagentLifecycleHookRunner | null;
getRuntimeConfig: typeof getRuntimeConfig;
hasInProcessGatewayContext: typeof hasInProcessGatewayContext;
ensureContextEnginesInitialized: typeof ensureContextEnginesInitialized;
resolveContextEngine: typeof resolveContextEngine;
resolveParentForkDecision: typeof resolveParentForkDecision;
updateSessionStore: typeof updateSessionStore;
};
const defaultSubagentSpawnDeps: SubagentSpawnDeps = {
callGateway,
dispatchGatewayMethodInProcess,
forkSessionFromParent,
forkSessionEntryFromParent,
getGlobalHookRunner,
getRuntimeConfig,
hasInProcessGatewayContext,
ensureContextEnginesInitialized,
resolveContextEngine,
resolveParentForkDecision,
updateSessionStore,
};
@@ -510,52 +507,45 @@ async function prepareSubagentSessionContext(params: {
const sessionsDir = path.dirname(parentTarget.storePath);
try {
const forked = (await updateSubagentSessionStore(childTarget.storePath, async (store) => {
parentEntry = resolveStoreEntryByKeys(store, parentTarget.storeKeys);
childEntry = resolveStoreEntryByKeys(store, childTarget.storeKeys);
if (params.targetAgentId !== params.requesterAgentId) {
throw new Error(
'context="fork" currently requires the same target agent as the requester; use context="isolated" for cross-agent spawns.',
);
}
if (params.targetAgentId !== params.requesterAgentId) {
throw new Error(
'context="fork" currently requires the same target agent as the requester; use context="isolated" for cross-agent spawns.',
);
}
if (!parentEntry?.sessionId) {
throw new Error(
'context="fork" requested but the requester session transcript is not available.',
);
}
const forkDecision = await subagentSpawnDeps.resolveParentForkDecision({
parentEntry,
storePath: parentTarget.storePath,
});
if (forkDecision.status === "skip") {
forkFallbackNote = forkDecision.message;
return null;
}
const fork = await subagentSpawnDeps.forkSessionFromParent({
parentEntry,
agentId: params.requesterAgentId,
sessionsDir,
});
if (!fork) {
throw new Error(
'context="fork" requested but OpenClaw could not fork the requester transcript.',
);
}
pruneLegacyStoreKeys({
store,
canonicalKey: childTarget.canonicalKey,
candidates: childTarget.storeKeys,
});
store[childTarget.canonicalKey] = mergeSessionEntry(store[childTarget.canonicalKey], {
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
forkedFromParent: true,
});
childEntry = store[childTarget.canonicalKey];
return fork;
})) as { sessionId: string; sessionFile: string } | null;
const forkedResult = await subagentSpawnDeps.forkSessionEntryFromParent({
storePath: childTarget.storePath,
parentSessionKey: parentTarget.canonicalKey,
parentStoreKeys: parentTarget.storeKeys,
sessionKey: childTarget.canonicalKey,
sessionStoreKeys: childTarget.storeKeys,
fallbackEntry: { sessionId: "", updatedAt: Date.now() },
agentId: params.requesterAgentId,
sessionsDir,
});
if (forkedResult.status === "missing-parent") {
throw new Error(
'context="fork" requested but the requester session transcript is not available.',
);
}
if (forkedResult.status === "failed" || forkedResult.status === "missing-entry") {
throw new Error(
'context="fork" requested but OpenClaw could not fork the requester transcript.',
);
}
parentEntry = forkedResult.parentEntry;
childEntry = forkedResult.sessionEntry;
if (forkedResult.status === "skipped") {
forkFallbackNote =
forkedResult.decision?.status === "skip" ? forkedResult.decision.message : undefined;
}
const forked =
forkedResult.status === "forked"
? {
sessionId: forkedResult.fork.sessionId,
sessionFile: forkedResult.fork.sessionFile,
}
: null;
if (params.contextMode === "fork") {
if (!parentEntry || !forked) {

View File

@@ -0,0 +1,87 @@
// Tests parent-session fork facade storage-boundary behavior.
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { forkSessionEntryFromParent } from "./session-fork.js";
const runtimeMocks = vi.hoisted(() => ({
forkSessionFromParentRuntime: vi.fn(),
resolveParentForkTokenCountRuntime: vi.fn(),
}));
vi.mock("./session-fork.runtime.js", () => runtimeMocks);
const roots: string[] = [];
/** Creates a temp directory under the OS tmpdir and records it in `roots` for later cleanup. */
async function makeRoot(prefix: string): Promise<string> {
  const template = path.join(os.tmpdir(), prefix);
  const createdDir = await fs.mkdtemp(template);
  roots.push(createdDir);
  return createdDir;
}
afterEach(async () => {
runtimeMocks.forkSessionFromParentRuntime.mockReset();
runtimeMocks.resolveParentForkTokenCountRuntime.mockReset();
await Promise.all(roots.splice(0).map((root) => fs.rm(root, { recursive: true, force: true })));
});
describe("forkSessionEntryFromParent", () => {
it("forks transcripts in the directory for the store being mutated", async () => {
// Two distinct store locations: the explicit `storePath` (active) and the one
// named in config. The fork must land beside the active store.
const root = await makeRoot("openclaw-session-fork-boundary-");
const activeStoreDir = path.join(root, "active-store");
const configStoreDir = path.join(root, "config-store");
await fs.mkdir(activeStoreDir, { recursive: true });
await fs.mkdir(configStoreDir, { recursive: true });
const storePath = path.join(activeStoreDir, "sessions.json");
const configStorePath = path.join(configStoreDir, "sessions.json");
const parentSessionKey = "agent:main:main";
const sessionKey = "agent:main:subagent:child";
// Seed the active store with a parent entry and an empty child entry.
await fs.writeFile(
storePath,
JSON.stringify(
{
[parentSessionKey]: {
sessionId: "parent-session",
sessionFile: path.join(activeStoreDir, "parent.jsonl"),
updatedAt: 1,
},
[sessionKey]: { sessionId: "", updatedAt: 2 },
},
null,
2,
),
"utf-8",
);
// A small parent token count keeps the fork decision on the "fork" path.
runtimeMocks.resolveParentForkTokenCountRuntime.mockResolvedValue(10);
// The runtime mock echoes the sessionsDir it was given into the forked file path.
runtimeMocks.forkSessionFromParentRuntime.mockImplementation(
async ({ sessionsDir }: { sessionsDir: string }) => ({
sessionId: "forked-session",
sessionFile: path.join(sessionsDir, "forked-session.jsonl"),
}),
);
const result = await forkSessionEntryFromParent({
agentId: "main",
config: { session: { store: configStorePath } } as OpenClawConfig,
fallbackEntry: { sessionId: "", updatedAt: 2 },
parentSessionKey,
sessionKey,
storePath,
});
expect(result.status).toBe("forked");
expect(runtimeMocks.forkSessionFromParentRuntime).toHaveBeenCalledWith(
expect.objectContaining({
sessionsDir: activeStoreDir,
}),
);
// Re-read the store from disk to confirm the child entry was persisted.
const stored = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ sessionFile?: string }
>;
expect(stored[sessionKey]?.sessionFile).toBe(path.join(activeStoreDir, "forked-session.jsonl"));
});
});

View File

@@ -1,5 +1,8 @@
/** Public session-fork facade with parent-size admission checks. */
import type { SessionEntry } from "../../config/sessions/types.js";
import path from "node:path";
import { resolveStorePath } from "../../config/sessions/paths.js";
import { updateSessionStore } from "../../config/sessions/store.js";
import { mergeSessionEntry, type SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
/**
@@ -10,7 +13,6 @@ import { createLazyImportLoader } from "../../shared/lazy-promise.js";
const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000;
const sessionForkRuntimeLoader = createLazyImportLoader(() => import("./session-fork.runtime.js"));
/** Decision for whether a child session should fork parent context or start isolated. */
export type ParentForkDecision =
| {
status: "fork";
@@ -25,6 +27,66 @@ export type ParentForkDecision =
message: string;
};
/** Inputs for the parent-fork admission decision. */
type ParentForkDecisionParams = {
/** Parent session entry whose token count is checked. */
parentEntry: SessionEntry;
/** Used with `config` to derive the store path when `storePath` is omitted. */
agentId?: string;
config?: OpenClawConfig;
/** Explicit session store path; takes precedence over the config-derived path. */
storePath?: string;
};
/** Inputs for forking a transcript from a parent session entry. */
type ForkSessionFromParentParams = {
parentEntry: SessionEntry;
agentId: string;
config?: OpenClawConfig;
/** Directory for the fork; when omitted, the directory of the resolved store path is used. */
sessionsDir?: string;
};
/** Identity of a forked transcript: the new session id and its transcript file path. */
export type ForkedParentSessionEntry = {
sessionId: string;
sessionFile: string;
};
/**
 * Outcome of forkSessionEntryFromParent.
 * - "forked": the transcript was forked and the child entry was updated.
 * - "skipped": no fork happened, either because the caller's skip predicate
 *   matched ("existing-entry") or the fork decision was "skip" ("decision-skip").
 * - "missing-entry": no child entry was found and no fallback entry was given.
 * - "missing-parent": no parent entry with a session id was found.
 * - "failed": the fork itself returned nothing.
 */
export type ForkSessionEntryFromParentResult =
| {
status: "forked";
fork: ForkedParentSessionEntry;
parentEntry: SessionEntry;
sessionEntry: SessionEntry;
decision: Extract<ParentForkDecision, { status: "fork" }>;
}
| {
status: "skipped";
reason: "existing-entry" | "decision-skip";
parentEntry?: SessionEntry;
sessionEntry: SessionEntry;
decision?: ParentForkDecision;
}
| { status: "missing-entry" }
| { status: "missing-parent" }
| { status: "failed" };
/** Inputs for forkSessionEntryFromParent; the parent entry is looked up by key instead of passed in. */
export type ForkSessionEntryFromParentParams = Omit<ForkSessionFromParentParams, "parentEntry"> & {
/** Store key used to find the parent entry when `parentStoreKeys` is not given. */
parentSessionKey: string;
/** Candidate parent keys, tried in order; the first key with an entry wins. */
parentStoreKeys?: readonly string[];
/** Store key under which the child entry is written. */
sessionKey: string;
/** Candidate child keys for lookup; on persist, every listed key other than `sessionKey` is deleted. */
sessionStoreKeys?: readonly string[];
/** Store to update; when omitted it is derived from `config` and `agentId`. */
storePath?: string;
/** Entry to use when the store has no child entry under the candidate keys. */
fallbackEntry?: SessionEntry;
/** Extra fields for the forked child entry; sessionId, sessionFile and forkedFromParent are applied after and win. */
patch?: (params: {
entry: SessionEntry;
parentEntry: SessionEntry;
fork: ForkedParentSessionEntry;
decision: Extract<ParentForkDecision, { status: "fork" }>;
}) => Partial<SessionEntry>;
/** When this returns truthy for the current child entry, no fork is attempted. */
skipForkWhen?: (entry: SessionEntry) => boolean;
/** Optional patch persisted when `skipForkWhen` matched; a falsy return persists nothing. */
skipPatch?: (entry: SessionEntry) => Partial<SessionEntry> | null;
/** Optional patch persisted when the fork decision is "skip"; a falsy return persists nothing. */
decisionSkipPatch?: (params: {
decision: Extract<ParentForkDecision, { status: "skip" }>;
entry: SessionEntry;
parentEntry: SessionEntry;
}) => Partial<SessionEntry> | null;
};
/** Loads the session-fork runtime module through the module-level lazy import loader. */
function loadSessionForkRuntime(): Promise<typeof import("./session-fork.runtime.js")> {
return sessionForkRuntimeLoader.load();
}
@@ -39,15 +101,31 @@ function formatParentForkTooLargeMessage(params: {
);
}
/** Decides whether parent context is small enough to fork into a child session. */
export async function resolveParentForkDecision(params: {
parentEntry: SessionEntry;
storePath: string;
}): Promise<ParentForkDecision> {
/**
 * Picks the session store path for fork operations: an explicit `storePath`
 * wins; otherwise the path is derived from the config's session store setting
 * and the agent id.
 */
function resolveParentForkStorePath(params: {
  agentId?: string;
  config?: OpenClawConfig;
  storePath?: string;
}): string {
  const { agentId, config, storePath: explicitStorePath } = params;
  return explicitStorePath ?? resolveStorePath(config?.session?.store, { agentId });
}
/**
 * Picks the directory that receives forked transcripts: an explicit
 * `sessionsDir` wins; otherwise the directory containing the resolved store path.
 */
function resolveParentForkSessionsDir(params: {
  agentId: string;
  config?: OpenClawConfig;
  sessionsDir?: string;
}): string {
  const { sessionsDir: explicitSessionsDir } = params;
  // The whole params object is forwarded so store-path resolution sees every field.
  return explicitSessionsDir ?? path.dirname(resolveParentForkStorePath(params));
}
export async function resolveParentForkDecision(
params: ParentForkDecisionParams,
): Promise<ParentForkDecision> {
const maxTokens = DEFAULT_PARENT_FORK_MAX_TOKENS;
const parentTokens = await resolveParentForkTokenCount({
parentEntry: params.parentEntry,
storePath: params.storePath,
storePath: resolveParentForkStorePath(params),
});
if (typeof parentTokens === "number" && parentTokens > maxTokens) {
return {
@@ -65,14 +143,151 @@ export async function resolveParentForkDecision(params: {
};
}
/** Forks a new session transcript from a parent session. */
export async function forkSessionFromParent(params: {
parentEntry: SessionEntry;
agentId: string;
sessionsDir: string;
}): Promise<{ sessionId: string; sessionFile: string } | null> {
export async function forkSessionFromParent(
params: ForkSessionFromParentParams,
): Promise<{ sessionId: string; sessionFile: string } | null> {
const runtime = await loadSessionForkRuntime();
return runtime.forkSessionFromParentRuntime(params);
return runtime.forkSessionFromParentRuntime({
...params,
sessionsDir: resolveParentForkSessionsDir(params),
});
}
/** Returns the entry for the first key in `keys` that has a truthy value in `store`, if any. */
function resolveEntryFromStoreKeys(params: {
  store: Record<string, SessionEntry>;
  keys: readonly string[];
}): SessionEntry | undefined {
  const matchedKey = params.keys.find((candidate) => Boolean(params.store[candidate]));
  return matchedKey === undefined ? undefined : params.store[matchedKey];
}
/**
 * Merges `patch` into `existing`, writes the result under `sessionKey`, and
 * removes every other key listed in `sessionStoreKeys` from the store.
 * Mutates `store` in place and returns the merged entry.
 */
function persistForkedSessionEntry(params: {
  store: Record<string, SessionEntry>;
  sessionKey: string;
  sessionStoreKeys?: readonly string[];
  existing: SessionEntry;
  patch: Partial<SessionEntry>;
}): SessionEntry {
  const { store, sessionKey } = params;
  const merged = mergeSessionEntry(params.existing, params.patch);
  store[sessionKey] = merged;

  // Drop alias keys so only the key written above keeps an entry.
  const aliasKeys = (params.sessionStoreKeys ?? []).filter((key) => key !== sessionKey);
  for (const aliasKey of aliasKeys) {
    delete store[aliasKey];
  }
  return merged;
}
/**
 * Forks the parent transcript and persists the child session entry through one
 * storage boundary operation.
 *
 * All lookups, the fork decision, the fork itself and the child-entry write
 * happen inside a single updateSessionStore callback for the resolved store
 * path. The returned status tells the caller which branch was taken
 * ("forked", "skipped", "missing-entry", "missing-parent" or "failed").
 */
export async function forkSessionEntryFromParent(
params: ForkSessionEntryFromParentParams,
): Promise<ForkSessionEntryFromParentResult> {
// Explicit storePath wins; otherwise derived from config + agentId.
const storePath = resolveParentForkStorePath(params);
return await updateSessionStore(
storePath,
async (store) => {
// Parent lookup: candidate keys if given, else the single parent session key.
const parentEntry = resolveEntryFromStoreKeys({
store,
keys: params.parentStoreKeys ?? [params.parentSessionKey],
});
if (!parentEntry?.sessionId) {
return { status: "missing-parent" };
}
// Child lookup: stored entry first, caller-supplied fallback second.
const entry =
resolveEntryFromStoreKeys({
store,
keys: params.sessionStoreKeys ?? [params.sessionKey],
}) ?? params.fallbackEntry;
if (!entry) {
return { status: "missing-entry" };
}
// Caller-requested skip: optionally persist a patch, never fork.
if (params.skipForkWhen?.(entry)) {
const patch = params.skipPatch?.(entry);
const sessionEntry = patch
? persistForkedSessionEntry({
store,
sessionKey: params.sessionKey,
sessionStoreKeys: params.sessionStoreKeys,
existing: entry,
patch,
})
: entry;
return { status: "skipped", reason: "existing-entry", parentEntry, sessionEntry };
}
// Size-based admission check against the same store path being updated.
const decision = await resolveParentForkDecision({
parentEntry,
agentId: params.agentId,
config: params.config,
storePath,
});
if (decision.status === "skip") {
// Decision-level skip: optionally persist a patch describing the downgrade.
const patch = params.decisionSkipPatch?.({ decision, entry, parentEntry });
const sessionEntry = patch
? persistForkedSessionEntry({
store,
sessionKey: params.sessionKey,
sessionStoreKeys: params.sessionStoreKeys,
existing: entry,
patch,
})
: entry;
return {
status: "skipped",
reason: "decision-skip",
parentEntry,
sessionEntry,
decision,
};
}
// Fork next to the store being mutated unless the caller names a directory.
const fork = await forkSessionFromParent({
parentEntry,
agentId: params.agentId,
config: params.config,
sessionsDir: params.sessionsDir ?? path.dirname(storePath),
});
if (!fork) {
return { status: "failed" };
}
const sessionEntry = persistForkedSessionEntry({
store,
sessionKey: params.sessionKey,
sessionStoreKeys: params.sessionStoreKeys,
existing: entry,
patch: {
// Caller patch is spread first so the fork identity fields below always win.
...params.patch?.({ entry, parentEntry, fork, decision }),
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
forkedFromParent: true,
},
});
return {
status: "forked",
fork,
parentEntry,
sessionEntry,
decision,
};
},
{
// NOTE(review): presumably tells updateSessionStore not to write the store for
// these results (nothing was changed, or only the fallback entry was returned) —
// confirm against updateSessionStore's option contract.
skipSaveWhenResult: (result) =>
result.status === "missing-entry" ||
result.status === "missing-parent" ||
result.status === "failed" ||
(result.status === "skipped" && result.sessionEntry === params.fallbackEntry),
},
);
}
async function resolveParentForkTokenCount(params: {

View File

@@ -50,6 +50,97 @@ type ForkSessionParamsForTest = {
};
vi.mock("./session-fork.js", () => ({
forkSessionEntryFromParent: async (params: {
  fallbackEntry?: SessionEntry;
  parentSessionKey: string;
  storePath: string;
  patch?: (patchParams: {
    entry: SessionEntry;
    parentEntry: SessionEntry;
    fork: { sessionId: string; sessionFile: string };
    decision: { status: "fork"; maxTokens: number; parentTokens?: number };
  }) => Partial<SessionEntry>;
  decisionSkipPatch?: (patchParams: {
    decision: {
      status: "skip";
      reason: "parent-too-large";
      maxTokens: number;
      parentTokens: number;
      message: string;
    };
    entry: SessionEntry;
    parentEntry: SessionEntry;
  }) => Partial<SessionEntry>;
  sessionsDir: string;
}) => {
  // Test double for the entry-level fork helper: it reads the parent from the
  // JSON store on disk, applies a fixed 100k token budget, and delegates the
  // token count and the transcript fork to the shared sessionForkMocks spies.
  const rawStore = await fs.readFile(params.storePath, "utf-8");
  const store = JSON.parse(rawStore) as Record<string, SessionEntry>;
  const parentEntry = store[params.parentSessionKey];
  if (!parentEntry?.sessionId) {
    return { status: "missing-parent" };
  }

  const maxTokens = 100_000;
  const parentTokens = await sessionForkMocks.resolveParentForkTokenCount({
    parentEntry,
    storePath: params.storePath,
  });

  // The base entry is resolved lazily so Date.now() is sampled at the same
  // point in each branch as before (after the decision / after the fork).
  const resolveBaseEntry = (): SessionEntry =>
    params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() };

  if (typeof parentTokens === "number" && parentTokens > maxTokens) {
    const entry = resolveBaseEntry();
    const decision = {
      status: "skip" as const,
      reason: "parent-too-large" as const,
      maxTokens,
      parentTokens,
      message: `Parent context is too large to fork (${parentTokens}/${maxTokens} tokens); starting with isolated context instead.`,
    };
    return {
      status: "skipped",
      reason: "decision-skip",
      parentEntry,
      sessionEntry: {
        ...entry,
        ...params.decisionSkipPatch?.({ decision, entry, parentEntry }),
      },
      decision,
    };
  }

  const fork = await sessionForkMocks.forkSessionFromParent({
    parentEntry,
    sessionsDir: params.sessionsDir,
  });
  if (!fork) {
    return { status: "failed" };
  }

  const entry = resolveBaseEntry();
  // Each consumer gets its own decision object, so a patch callback that
  // mutates its argument cannot leak into the returned decision.
  const buildForkDecision = () => ({
    status: "fork" as const,
    maxTokens,
    ...(typeof parentTokens === "number" ? { parentTokens } : {}),
  });
  return {
    status: "forked",
    fork,
    parentEntry,
    sessionEntry: {
      ...entry,
      ...params.patch?.({ entry, parentEntry, fork, decision: buildForkDecision() }),
      // Fork identity always wins over whatever the patch returned.
      sessionId: fork.sessionId,
      sessionFile: fork.sessionFile,
      forkedFromParent: true,
    },
    decision: buildForkDecision(),
  };
},
forkSessionFromParent: (...args: [ForkSessionParamsForTest]) =>
sessionForkMocks.forkSessionFromParent(...args),
resolveParentForkTokenCount: (...args: [{ parentEntry: SessionEntry; storePath: string }]) =>

View File

@@ -73,7 +73,7 @@ import {
resolveLastChannelRaw,
resolveLastToRaw,
} from "./session-delivery.js";
import { forkSessionFromParent, resolveParentForkDecision } from "./session-fork.js";
import { forkSessionEntryFromParent } from "./session-fork.js";
import { buildSessionEndHookPayload, buildSessionStartHookPayload } from "./session-hooks.js";
import { clearSessionResetRuntimeState } from "./session-reset-cleanup.js";
@@ -753,47 +753,39 @@ export async function initSessionState(params: {
const parentSessionKey = normalizeOptionalString(ctx.ParentSessionKey);
const alreadyForked = sessionEntry.forkedFromParent === true;
let inheritedParentContext = false;
if (
parentSessionKey &&
parentSessionKey !== sessionKey &&
sessionStore[parentSessionKey] &&
!alreadyForked
) {
const parentEntry = sessionStore[parentSessionKey];
const forkDecision = await resolveParentForkDecision({
parentEntry,
if (parentSessionKey && parentSessionKey !== sessionKey && !alreadyForked) {
const forked = await forkSessionEntryFromParent({
parentSessionKey,
sessionKey,
storePath,
fallbackEntry: sessionEntry,
agentId,
sessionsDir: path.dirname(storePath),
decisionSkipPatch: () => ({ ...sessionEntry, forkedFromParent: true }),
patch: () => ({
...sessionEntry,
totalTokens: undefined,
totalTokensFresh: false,
}),
});
if (forkDecision.status === "skip") {
if (forked.status === "skipped" && forked.decision?.status === "skip") {
// The parent branch is too large to inherit usefully. Start fresh and
// mark as handled so the thread does not retry this decision every turn.
log.warn(
`skipping parent fork (parent too large): parentKey=${parentSessionKey} → sessionKey=${sessionKey} ` +
`parentTokens=${forkDecision.parentTokens} maxTokens=${forkDecision.maxTokens}`,
`parentTokens=${forked.decision.parentTokens} maxTokens=${forked.decision.maxTokens}`,
);
sessionEntry.forkedFromParent = true;
} else {
sessionEntry = forked.sessionEntry;
} else if (forked.status === "forked") {
log.warn(
`forking from parent session: parentKey=${parentSessionKey} → sessionKey=${sessionKey} ` +
`parentTokens=${forkDecision.parentTokens ?? "unknown"}`,
`parentTokens=${forked.decision.parentTokens ?? "unknown"}`,
);
const forked = await forkSessionFromParent({
parentEntry,
agentId,
sessionsDir: path.dirname(storePath),
});
if (forked) {
sessionId = forked.sessionId;
sessionEntry.sessionId = forked.sessionId;
sessionEntry.sessionFile = forked.sessionFile;
sessionEntry.forkedFromParent = true;
// The fork replaces the target transcript with inherited parent
// history, so any prior target-session token snapshot is stale.
sessionEntry.totalTokens = undefined;
sessionEntry.totalTokensFresh = false;
inheritedParentContext = true;
log.warn(`forked session created: file=${forked.sessionFile}`);
}
sessionId = forked.fork.sessionId;
sessionEntry = forked.sessionEntry;
sessionEntry.forkedFromParent = true;
inheritedParentContext = true;
log.warn(`forked session created: file=${forked.fork.sessionFile}`);
}
}
const threadIdFromSessionKey = parseSessionThreadInfoFast(

View File

@@ -2,7 +2,6 @@
// restore/preview/send flows over session stores, transcripts, and active runs.
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
@@ -86,7 +85,7 @@ import {
import { ADMIN_SCOPE } from "../operator-scopes.js";
import { resolveSessionKeyForRun } from "../server-session-key.js";
import {
forkCompactionCheckpointTranscriptAsync,
createFileBackedCompactionCheckpointStore,
getSessionCompactionCheckpoint,
listSessionCompactionCheckpoints,
} from "../session-compaction-checkpoints.js";
@@ -136,6 +135,8 @@ import type {
} from "./types.js";
import { assertValidParams } from "./validation.js";
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
function filterSessionStoreToConfiguredAgents(
cfg: OpenClawConfig,
store: Record<string, SessionEntry>,
@@ -350,74 +351,6 @@ function buildDashboardSessionKey(agentId: string): string {
return `agent:${agentId}:dashboard:${randomUUID()}`;
}
/**
 * Builds the session entry for a transcript forked from a compaction
 * checkpoint. Starts from `currentEntry`, points it at the new session id and
 * file, stamps `updatedAt` with the current time, and clears per-run state
 * (timing, status, token/cost counters).
 *
 * `totalTokens` is kept only when it is a finite number, in which case
 * `totalTokensFresh` is set to `true`; otherwise both are `undefined`.
 * Checkpoints are carried over only when `preserveCompactionCheckpoints` is set.
 */
function cloneCheckpointSessionEntry(params: {
  currentEntry: SessionEntry;
  nextSessionId: string;
  nextSessionFile: string;
  label?: string;
  parentSessionKey?: string;
  totalTokens?: number;
  preserveCompactionCheckpoints?: boolean;
}): SessionEntry {
  const { currentEntry, totalTokens } = params;
  const hasFiniteTokenCount = typeof totalTokens === "number" && Number.isFinite(totalTokens);
  // Run-scoped fields that must not leak from the source session into the clone.
  const clearedRunState = {
    systemSent: false,
    abortedLastRun: false,
    startedAt: undefined,
    endedAt: undefined,
    runtimeMs: undefined,
    status: undefined,
    inputTokens: undefined,
    outputTokens: undefined,
    cacheRead: undefined,
    cacheWrite: undefined,
    estimatedCostUsd: undefined,
  };
  return {
    ...currentEntry,
    sessionId: params.nextSessionId,
    sessionFile: params.nextSessionFile,
    updatedAt: Date.now(),
    ...clearedRunState,
    totalTokens: hasFiniteTokenCount ? totalTokens : undefined,
    totalTokensFresh: hasFiniteTokenCount ? true : undefined,
    label: params.label ?? currentEntry.label,
    parentSessionKey: params.parentSessionKey ?? currentEntry.parentSessionKey,
    compactionCheckpoints: params.preserveCompactionCheckpoints
      ? currentEntry.compactionCheckpoints
      : undefined,
  };
}
/**
 * Picks the transcript file and leaf to fork from for a checkpoint.
 *
 * A non-blank pre-compaction file wins and is paired with `tokensBefore`; its
 * leaf id may be absent. Otherwise the post-compaction file is used together
 * with `tokensAfter`, but only when both the file and an entry/leaf id are
 * present. Returns `null` when neither side yields a usable source.
 */
function resolveCheckpointForkSource(
  checkpoint: NonNullable<ReturnType<typeof getSessionCompactionCheckpoint>>,
): { sourceFile: string; sourceLeafId?: string; totalTokens?: number } | null {
  const { preCompaction, postCompaction } = checkpoint;

  const preFile = preCompaction.sessionFile?.trim();
  if (preFile) {
    return {
      sourceFile: preFile,
      sourceLeafId: preCompaction.entryId ?? preCompaction.leafId,
      totalTokens: checkpoint.tokensBefore,
    };
  }

  const postFile = postCompaction.sessionFile?.trim();
  const postLeafId = postCompaction.entryId ?? postCompaction.leafId;
  if (!postFile || !postLeafId) {
    return null;
  }
  return {
    sourceFile: postFile,
    sourceLeafId: postLeafId,
    totalTokens: checkpoint.tokensAfter,
  };
}
function isAgentMainSessionKey(cfg: OpenClawConfig, sessionKey: string): boolean {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed) {
@@ -1663,7 +1596,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const loaded = loadSessionEntry(key, { agentId: requestedAgent.agentId });
const { cfg: loadedCfg, entry, canonicalKey } = loaded;
const { cfg: loadedCfg, entry, canonicalKey, legacyKey } = loaded;
const target = resolveGatewaySessionStoreTarget({
cfg: loadedCfg,
key: canonicalKey,
@@ -1678,8 +1611,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const checkpoint = getSessionCompactionCheckpoint({ entry, checkpointId });
const forkSource = checkpoint ? resolveCheckpointForkSource(checkpoint) : null;
if (!checkpoint || !forkSource) {
if (!checkpoint) {
respond(
false,
undefined,
@@ -1687,12 +1619,34 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
const branchedSession = await forkCompactionCheckpointTranscriptAsync({
sourceFile: forkSource.sourceFile,
sourceLeafId: forkSource.sourceLeafId,
sessionDir: path.dirname(forkSource.sourceFile),
const nextKey = buildDashboardSessionKey(target.agentId);
const branchedSession = await compactionCheckpointStore.branchCheckpointSession({
storePath: target.storePath,
sourceKey: canonicalKey,
sourceStoreKey: legacyKey,
nextKey,
checkpointId,
});
if (!branchedSession?.sessionFile) {
if (
branchedSession.status === "missing-checkpoint" ||
branchedSession.status === "missing-boundary"
) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `checkpoint not found: ${checkpointId}`),
);
return;
}
if (branchedSession.status === "missing-session") {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`),
);
return;
}
if (branchedSession.status === "failed") {
respond(
false,
undefined,
@@ -1700,30 +1654,16 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
const nextKey = buildDashboardSessionKey(target.agentId);
const label = entry.label?.trim() ? `${entry.label.trim()} (checkpoint)` : "Checkpoint branch";
const nextEntry = cloneCheckpointSessionEntry({
currentEntry: entry,
nextSessionId: branchedSession.sessionId,
nextSessionFile: branchedSession.sessionFile,
label,
parentSessionKey: canonicalKey,
totalTokens: forkSource.totalTokens,
});
await updateSessionStore(target.storePath, (store) => {
store[nextKey] = nextEntry;
});
respond(
true,
{
ok: true,
sourceKey: canonicalKey,
key: nextKey,
sessionId: nextEntry.sessionId,
checkpoint,
entry: nextEntry,
key: branchedSession.key,
sessionId: branchedSession.entry.sessionId,
checkpoint: branchedSession.checkpoint,
entry: branchedSession.entry,
},
undefined,
);
@@ -1735,7 +1675,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
reason: "checkpoint-branch",
});
emitSessionsChanged(context, {
sessionKey: nextKey,
sessionKey: branchedSession.key,
reason: "checkpoint-branch",
});
},
@@ -1778,7 +1718,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const loaded = loadSessionEntry(key, { agentId: requestedAgent.agentId });
const { entry, canonicalKey, storePath } = loaded;
const { entry, canonicalKey, legacyKey, storePath } = loaded;
if (!entry?.sessionId) {
respond(
false,
@@ -1788,8 +1728,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const checkpoint = getSessionCompactionCheckpoint({ entry, checkpointId });
const forkSource = checkpoint ? resolveCheckpointForkSource(checkpoint) : null;
if (!checkpoint || !forkSource) {
if (!checkpoint) {
respond(
false,
undefined,
@@ -1812,12 +1751,32 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const restoredSession = await forkCompactionCheckpointTranscriptAsync({
sourceFile: forkSource.sourceFile,
sourceLeafId: forkSource.sourceLeafId,
sessionDir: path.dirname(forkSource.sourceFile),
const restoredSession = await compactionCheckpointStore.restoreCheckpointSession({
storePath,
sessionKey: canonicalKey,
sessionStoreKey: legacyKey,
checkpointId,
});
if (!restoredSession?.sessionFile) {
if (
restoredSession.status === "missing-checkpoint" ||
restoredSession.status === "missing-boundary"
) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `checkpoint not found: ${checkpointId}`),
);
return;
}
if (restoredSession.status === "missing-session") {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`),
);
return;
}
if (restoredSession.status === "failed") {
respond(
false,
undefined,
@@ -1825,26 +1784,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
const nextEntry = cloneCheckpointSessionEntry({
currentEntry: entry,
nextSessionId: restoredSession.sessionId,
nextSessionFile: restoredSession.sessionFile,
totalTokens: forkSource.totalTokens,
preserveCompactionCheckpoints: true,
});
await updateSessionStore(storePath, (store) => {
store[canonicalKey] = nextEntry;
});
respond(
true,
{
ok: true,
key: canonicalKey,
sessionId: nextEntry.sessionId,
checkpoint,
entry: nextEntry,
key: restoredSession.key,
sessionId: restoredSession.entry.sessionId,
checkpoint: restoredSession.checkpoint,
entry: restoredSession.entry,
},
undefined,
);

View File

@@ -12,6 +12,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
captureCompactionCheckpointSnapshotAsync,
cleanupCompactionCheckpointSnapshot,
createFileBackedCompactionCheckpointStore,
forkCompactionCheckpointTranscriptAsync,
MAX_COMPACTION_CHECKPOINT_LEAF_SCAN_BYTES,
MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION,
@@ -91,7 +92,10 @@ function checkpointConfig(storePath: string): OpenClawConfig {
async function writeSessionStore(
storePath: string,
sessionKey: string,
entry: { sessionId: string; updatedAt: number; compactionCheckpoints?: unknown[] },
entry: { sessionId: string; updatedAt: number; compactionCheckpoints?: unknown[] } & Record<
string,
unknown
>,
): Promise<void> {
await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf-8");
}
@@ -629,6 +633,79 @@ describe("session-compaction-checkpoints", () => {
]);
});
// Exercises restoreCheckpointSession end to end against a real transcript file
// and a JSON session store written to a temp directory.
test("file-backed checkpoint store restores from the stored transcript boundary", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-store-"));
tempDirs.push(dir);
const session = SessionManager.create(dir, dir);
session.appendMessage({
role: "user",
content: "checkpoint source",
timestamp: Date.now(),
});
// Capture the leaf right after the first message: this is the boundary the
// checkpoint points at, so the later assistant turn must not be restored.
const checkpointLeafId = requireNonEmptyString(
session.getLeafId(),
"checkpoint leaf id missing",
);
session.appendMessage({
role: "assistant",
content: "future turn",
api: "responses",
provider: "openai",
model: "gpt-test",
timestamp: Date.now(),
} as unknown as AssistantMessage);
const sessionFile = requireNonEmptyString(session.getSessionFile(), "session file missing");
const storePath = path.join(dir, "sessions.json");
await writeSessionStore(storePath, MAIN_SESSION_KEY, {
sessionId: "current-session",
sessionFile,
updatedAt: Date.now() - 1,
totalTokens: 200,
compactionCheckpoints: [
{
checkpointId: "checkpoint-1",
sessionKey: MAIN_SESSION_KEY,
sessionId: "stored-session",
createdAt: Date.now(),
reason: "manual",
tokensAfter: 45,
// No pre-compaction sessionFile here, so the restore has to use the
// post-compaction file + leaf recorded below.
preCompaction: { sessionId: "pre-session", leafId: "pre-leaf" },
postCompaction: {
sessionId: "post-session",
sessionFile,
leafId: checkpointLeafId,
},
},
],
});
const store = createFileBackedCompactionCheckpointStore();
const restored = await store.restoreCheckpointSession({
storePath,
sessionKey: MAIN_SESSION_KEY,
checkpointId: "checkpoint-1",
});
// Throwing (rather than expect) also narrows the result union for the
// property reads that follow.
if (restored.status !== "created") {
throw new Error("expected restored checkpoint transcript");
}
// The restored entry reports the checkpoint's tokensAfter, not the 200 tokens
// of the entry it replaced.
expect(restored.entry.totalTokens).toBe(45);
const restoredSessionFile = requireNonEmptyString(
restored.entry.sessionFile,
"restored session file missing",
);
const messages = SessionManager.open(restoredSessionFile, dir).buildSessionContext().messages;
expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([
"checkpoint source",
]);
// The same entry must also have been written back to the store on disk.
const nextStore = await readSessionStore<{ sessionFile?: string; totalTokens?: number }>(
storePath,
);
expect(nextStore[MAIN_SESSION_KEY]?.sessionFile).toBe(restored.entry.sessionFile);
expect(nextStore[MAIN_SESSION_KEY]?.totalTokens).toBe(45);
});
test("async fork migrates legacy checkpoint snapshots before writing a current header", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-legacy-fork-"));
tempDirs.push(dir);

View File

@@ -57,6 +57,87 @@ type ForkedCompactionCheckpointTranscript = {
sessionFile: string;
};
/**
 * A forked checkpoint transcript plus, when the checkpoint recorded one, the
 * token count associated with the boundary it was forked from.
 */
export type CompactionCheckpointForkedTranscript = ForkedCompactionCheckpointTranscript & {
totalTokens?: number;
};
/**
 * Outcome of forking a transcript from a checkpoint's stored boundary:
 * `missing-boundary` when the checkpoint yields no usable source file/leaf,
 * `failed` when the transcript fork itself returned nothing.
 */
export type CompactionCheckpointTranscriptForkResult =
| { status: "created"; transcript: CompactionCheckpointForkedTranscript }
| { status: "missing-boundary" }
| { status: "failed" };
/**
 * Outcome of a branch/restore store mutation. Only `created` carries the
 * written key, the checkpoint that was used, and the new session entry.
 * `missing-session` means the store entry had no session id;
 * `missing-checkpoint` means the checkpoint id was not found on that entry.
 */
export type CompactionCheckpointSessionMutationResult =
| {
status: "created";
key: string;
checkpoint: SessionCompactionCheckpoint;
entry: SessionEntry;
}
| { status: "missing-session" }
| { status: "missing-checkpoint" }
| { status: "missing-boundary" }
| { status: "failed" };
/**
 * Inputs for branching a checkpoint into a new session entry.
 * `sourceStoreKey`, when given, is the key used to look the source entry up;
 * `sourceKey` is recorded as the branch's parent; `nextKey` receives the new entry.
 */
export type BranchCheckpointSessionParams = {
storePath: string;
sourceKey: string;
sourceStoreKey?: string;
nextKey: string;
checkpointId: string;
};
/**
 * Inputs for restoring a session to a checkpoint.
 * `sessionStoreKey`, when given, is the key used to look the current entry up;
 * the restored entry is written under `sessionKey`.
 */
export type RestoreCheckpointSessionParams = {
storePath: string;
sessionKey: string;
sessionStoreKey?: string;
checkpointId: string;
};
/** Parameter object accepted by `persistSessionCompactionCheckpoint`. */
export type PersistSessionCompactionCheckpointParams = {
cfg: OpenClawConfig;
sessionKey: string;
sessionId: string;
reason: SessionCompactionCheckpointReason;
snapshot: CapturedCompactionCheckpointSnapshot;
summary?: string;
firstKeptEntryId?: string;
tokensBefore?: number;
tokensAfter?: number;
postSessionFile?: string;
postLeafId?: string;
postEntryId?: string;
createdAt?: number;
};
/**
 * Storage boundary for compaction checkpoint capture, persistence, branch,
 * restore, and cleanup operations.
 *
 * Branch and restore report their outcome through a status-tagged result
 * (`CompactionCheckpointSessionMutationResult`).
 */
export type CompactionCheckpointStore = {
/** Captures the pre-compaction transcript identity without copying rows/files. */
captureSnapshot: typeof captureCompactionCheckpointSnapshotAsync;
/** Persists checkpoint metadata and prunes checkpoint artifacts owned by this store. */
persistCheckpoint: (
params: PersistSessionCompactionCheckpointParams,
) => Promise<SessionCompactionCheckpoint | null>;
/** Cleans unpersisted legacy snapshot artifacts after failed persistence. */
cleanupSnapshot: typeof cleanupCompactionCheckpointSnapshot;
/**
 * Creates a checkpoint branch and records its session entry in one logical
 * store mutation.
 */
branchCheckpointSession: (
params: BranchCheckpointSessionParams,
) => Promise<CompactionCheckpointSessionMutationResult>;
/**
 * Restores a checkpoint and replaces the current session entry in one logical
 * store mutation.
 */
restoreCheckpointSession: (
params: RestoreCheckpointSessionParams,
) => Promise<CompactionCheckpointSessionMutationResult>;
};
/**
 * Returns the checkpoint's trimmed pre-compaction session file path, or
 * `undefined` when it is missing or blank.
 */
function checkpointSnapshotPath(checkpoint: SessionCompactionCheckpoint): string | undefined {
  const trimmedPath = checkpoint.preCompaction.sessionFile?.trim();
  if (!trimmedPath) {
    return undefined;
  }
  return trimmedPath;
}
@@ -401,6 +482,209 @@ export async function forkCompactionCheckpointTranscriptAsync(params: {
}
}
/**
 * Picks the transcript file and leaf to fork from for a checkpoint.
 *
 * A non-blank pre-compaction file wins and is paired with `tokensBefore`; its
 * leaf id may be absent. Otherwise the post-compaction file is used together
 * with `tokensAfter`, but only when both the file and an entry/leaf id are
 * present. Returns `null` when neither side yields a usable source.
 */
function resolveCheckpointTranscriptForkSource(
  checkpoint: SessionCompactionCheckpoint,
): { sourceFile: string; sourceLeafId?: string; totalTokens?: number } | null {
  const { preCompaction, postCompaction } = checkpoint;

  const preFile = preCompaction.sessionFile?.trim();
  if (preFile) {
    return {
      sourceFile: preFile,
      sourceLeafId: preCompaction.entryId ?? preCompaction.leafId,
      totalTokens: checkpoint.tokensBefore,
    };
  }

  const postFile = postCompaction.sessionFile?.trim();
  const postLeafId = postCompaction.entryId ?? postCompaction.leafId;
  if (!postFile || !postLeafId) {
    return null;
  }
  return {
    sourceFile: postFile,
    sourceLeafId: postLeafId,
    totalTokens: checkpoint.tokensAfter,
  };
}
/**
 * Forks a new transcript from the boundary recorded on a checkpoint.
 *
 * Resolves to `missing-boundary` when the checkpoint has no usable source,
 * `failed` when the underlying transcript fork returns nothing, and `created`
 * with the forked transcript otherwise. The transcript carries `totalTokens`
 * only when the resolved source has a numeric token count.
 *
 * `sessionDir` defaults to the directory of the source file; `targetCwd` is
 * forwarded only when it is truthy.
 */
async function forkCheckpointTranscriptFromStoredBoundary(params: {
  checkpoint: SessionCompactionCheckpoint;
  sessionDir?: string;
  targetCwd?: string;
}): Promise<CompactionCheckpointTranscriptForkResult> {
  const source = resolveCheckpointTranscriptForkSource(params.checkpoint);
  if (!source) {
    return { status: "missing-boundary" };
  }

  const targetCwdOverride = params.targetCwd ? { targetCwd: params.targetCwd } : {};
  const forked = await forkCompactionCheckpointTranscriptAsync({
    sourceFile: source.sourceFile,
    sourceLeafId: source.sourceLeafId,
    sessionDir: params.sessionDir ?? path.dirname(source.sourceFile),
    ...targetCwdOverride,
  });
  if (!forked) {
    return { status: "failed" };
  }

  const transcript: CompactionCheckpointForkedTranscript = { ...forked };
  if (typeof source.totalTokens === "number") {
    transcript.totalTokens = source.totalTokens;
  }
  return { status: "created", transcript };
}
/**
 * Builds the session entry for a transcript forked from a compaction
 * checkpoint. Starts from `currentEntry`, points it at the new session id and
 * file, stamps `updatedAt` with the current time, and clears per-run state
 * (timing, status, token/cost counters).
 *
 * `totalTokens` is kept only when it is a finite number, in which case
 * `totalTokensFresh` is set to `true`; otherwise both are `undefined`.
 * Checkpoints are carried over only when `preserveCompactionCheckpoints` is set.
 */
function cloneCheckpointSessionEntry(params: {
  currentEntry: SessionEntry;
  nextSessionId: string;
  nextSessionFile: string;
  label?: string;
  parentSessionKey?: string;
  totalTokens?: number;
  preserveCompactionCheckpoints?: boolean;
}): SessionEntry {
  const { currentEntry, totalTokens } = params;
  const hasFiniteTokenCount = typeof totalTokens === "number" && Number.isFinite(totalTokens);
  // Run-scoped fields that must not leak from the source session into the clone.
  const clearedRunState = {
    systemSent: false,
    abortedLastRun: false,
    startedAt: undefined,
    endedAt: undefined,
    runtimeMs: undefined,
    status: undefined,
    inputTokens: undefined,
    outputTokens: undefined,
    cacheRead: undefined,
    cacheWrite: undefined,
    estimatedCostUsd: undefined,
  };
  return {
    ...currentEntry,
    sessionId: params.nextSessionId,
    sessionFile: params.nextSessionFile,
    updatedAt: Date.now(),
    ...clearedRunState,
    totalTokens: hasFiniteTokenCount ? totalTokens : undefined,
    totalTokensFresh: hasFiniteTokenCount ? true : undefined,
    label: params.label ?? currentEntry.label,
    parentSessionKey: params.parentSessionKey ?? currentEntry.parentSessionKey,
    compactionCheckpoints: params.preserveCompactionCheckpoints
      ? currentEntry.compactionCheckpoints
      : undefined,
  };
}
/**
 * Branches a checkpoint into a new session entry.
 *
 * Inside a single `updateSessionStore` callback this looks up the source entry
 * (under `sourceStoreKey` when provided, else `sourceKey`), resolves the
 * checkpoint, forks the transcript from its stored boundary, and writes the
 * cloned entry under `nextKey` with `sourceKey` as its parent. Any non-`created`
 * outcome is returned as-is and flagged via `skipSaveWhenResult`.
 */
async function branchCheckpointSessionFromStoredBoundary(
  params: BranchCheckpointSessionParams,
): Promise<CompactionCheckpointSessionMutationResult> {
  const { storePath, sourceKey, nextKey, checkpointId } = params;
  const lookupKey = params.sourceStoreKey ?? sourceKey;
  return await updateSessionStore(
    storePath,
    async (store) => {
      const sourceEntry = store[lookupKey];
      if (!sourceEntry?.sessionId) {
        return { status: "missing-session" };
      }

      const checkpoint = getSessionCompactionCheckpoint({ entry: sourceEntry, checkpointId });
      if (!checkpoint) {
        return { status: "missing-checkpoint" };
      }

      const forkResult = await forkCheckpointTranscriptFromStoredBoundary({ checkpoint });
      if (forkResult.status !== "created") {
        // Propagates "missing-boundary" / "failed" unchanged.
        return forkResult;
      }

      const { transcript } = forkResult;
      const trimmedLabel = sourceEntry.label?.trim();
      const branchEntry = cloneCheckpointSessionEntry({
        currentEntry: sourceEntry,
        nextSessionId: transcript.sessionId,
        nextSessionFile: transcript.sessionFile,
        label: trimmedLabel ? `${trimmedLabel} (checkpoint)` : "Checkpoint branch",
        parentSessionKey: sourceKey,
        totalTokens: transcript.totalTokens,
      });
      store[nextKey] = branchEntry;
      return {
        status: "created",
        key: nextKey,
        checkpoint,
        entry: branchEntry,
      };
    },
    { skipSaveWhenResult: (result) => result.status !== "created" },
  );
}
/**
 * Restores a session to a checkpoint.
 *
 * Inside a single `updateSessionStore` callback this looks up the current
 * entry (under `sessionStoreKey` when provided, else `sessionKey`), resolves
 * the checkpoint, forks the transcript from its stored boundary, and writes
 * the replacement entry — with existing checkpoints preserved — under
 * `sessionKey`. Any non-`created` outcome is returned as-is and flagged via
 * `skipSaveWhenResult`.
 */
async function restoreCheckpointSessionFromStoredBoundary(
  params: RestoreCheckpointSessionParams,
): Promise<CompactionCheckpointSessionMutationResult> {
  const { storePath, sessionKey, checkpointId } = params;
  const lookupKey = params.sessionStoreKey ?? sessionKey;
  return await updateSessionStore(
    storePath,
    async (store) => {
      const liveEntry = store[lookupKey];
      if (!liveEntry?.sessionId) {
        return { status: "missing-session" };
      }

      const checkpoint = getSessionCompactionCheckpoint({ entry: liveEntry, checkpointId });
      if (!checkpoint) {
        return { status: "missing-checkpoint" };
      }

      const forkResult = await forkCheckpointTranscriptFromStoredBoundary({ checkpoint });
      if (forkResult.status !== "created") {
        // Propagates "missing-boundary" / "failed" unchanged.
        return forkResult;
      }

      const { transcript } = forkResult;
      const restoredEntry = cloneCheckpointSessionEntry({
        currentEntry: liveEntry,
        nextSessionId: transcript.sessionId,
        nextSessionFile: transcript.sessionFile,
        totalTokens: transcript.totalTokens,
        preserveCompactionCheckpoints: true,
      });
      // NOTE(review): the entry is read under lookupKey but written under
      // sessionKey; when the two differ, the lookupKey entry is left in the
      // store untouched — confirm this is the intended migration behavior.
      store[sessionKey] = restoredEntry;
      return {
        status: "created",
        key: sessionKey,
        checkpoint,
        entry: restoredEntry,
      };
    },
    { skipSaveWhenResult: (result) => result.status !== "created" },
  );
}
/**
 * Builds the file-backed implementation of the compaction checkpoint store.
 *
 * Branch and restore each own both the transcript fork and the session entry
 * update, so that a SQLite implementation can copy transcript rows and update
 * `session_entries.entry_json` inside one write transaction.
 */
export function createFileBackedCompactionCheckpointStore(): CompactionCheckpointStore {
  const fileBackedStore: CompactionCheckpointStore = {
    captureSnapshot: captureCompactionCheckpointSnapshotAsync,
    persistCheckpoint: persistSessionCompactionCheckpoint,
    cleanupSnapshot: cleanupCompactionCheckpointSnapshot,
    branchCheckpointSession: branchCheckpointSessionFromStoredBoundary,
    restoreCheckpointSession: restoreCheckpointSessionFromStoredBoundary,
  };
  return fileBackedStore;
}
/**
* Capture the stable pre-compaction identity without duplicating the transcript.
* Branch/restore uses the compacted successor transcript, while legacy
@@ -488,21 +772,9 @@ async function cleanupTrimmedCompactionCheckpointFiles(params: {
}
}
export async function persistSessionCompactionCheckpoint(params: {
cfg: OpenClawConfig;
sessionKey: string;
sessionId: string;
reason: SessionCompactionCheckpointReason;
snapshot: CapturedCompactionCheckpointSnapshot;
summary?: string;
firstKeptEntryId?: string;
tokensBefore?: number;
tokensAfter?: number;
postSessionFile?: string;
postLeafId?: string;
postEntryId?: string;
createdAt?: number;
}): Promise<SessionCompactionCheckpoint | null> {
export async function persistSessionCompactionCheckpoint(
params: PersistSessionCompactionCheckpointParams,
): Promise<SessionCompactionCheckpoint | null> {
const snapshotSessionFile = params.snapshot.sessionFile?.trim();
const postSessionFile = params.postSessionFile?.trim();
const postSourceLeafId = params.postEntryId?.trim() || params.postLeafId?.trim();

View File

@@ -1,6 +1,10 @@
// Agent consult runtime tests cover consult session creation and runtime handoff.
import { afterEach, describe, expect, it, vi } from "vitest";
import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js";
import type {
ForkSessionEntryFromParentParams,
ForkSessionEntryFromParentResult,
} from "../auto-reply/reply/session-fork.js";
import type { SessionEntry } from "../config/sessions/types.js";
import {
setRealtimeVoiceAgentConsultDepsForTest,
consultRealtimeVoiceAgent,
@@ -267,13 +271,47 @@ describe("realtime voice agent consult runtime", () => {
maxTokens: 100_000,
parentTokens: 100,
}));
const forkSessionFromParent = vi.fn(async () => ({
sessionId: "forked-session",
sessionFile: "/tmp/forked.jsonl",
}));
const forkSessionEntryFromParent = vi.fn(
async (
params: ForkSessionEntryFromParentParams,
): Promise<ForkSessionEntryFromParentResult> => {
const fork = {
sessionId: "forked-session",
sessionFile: "/tmp/forked.jsonl",
};
const parentEntry = sessionStore["agent:main:main"];
if (!parentEntry?.sessionId) {
return { status: "missing-parent" };
}
const typedParentEntry: SessionEntry = {
...parentEntry,
sessionId: parentEntry.sessionId,
updatedAt: parentEntry.updatedAt ?? Date.now(),
};
const decision = {
status: "fork" as const,
maxTokens: 100_000,
};
const entry = params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() };
const sessionEntry: SessionEntry = {
...entry,
...params.patch?.({ entry, parentEntry: typedParentEntry, fork, decision }),
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
forkedFromParent: true,
};
sessionStore[params.sessionKey] = sessionEntry;
return {
status: "forked" as const,
fork,
parentEntry: typedParentEntry,
sessionEntry,
decision,
};
},
);
setRealtimeVoiceAgentConsultDepsForTest({
resolveParentForkDecision,
forkSessionFromParent,
forkSessionEntryFromParent,
});
await consultRealtimeVoiceAgent({
@@ -293,15 +331,16 @@ describe("realtime voice agent consult runtime", () => {
userLabel: "Participant",
});
expect(resolveParentForkDecision).toHaveBeenCalledWith({
parentEntry: sessionStore["agent:main:main"],
storePath: "/tmp/sessions.json",
});
expect(forkSessionFromParent).toHaveBeenCalledWith({
parentEntry: sessionStore["agent:main:main"],
agentId: "main",
sessionsDir: "/tmp",
});
expect(resolveParentForkDecision).not.toHaveBeenCalled();
expect(forkSessionEntryFromParent).toHaveBeenCalledWith(
expect.objectContaining({
parentSessionKey: "agent:main:main",
agentId: "main",
config: {},
sessionKey: "agent:main:subagent:google-meet:meet-1",
}),
);
expect(runtime.session.patchSessionEntry).not.toHaveBeenCalled();
const forkedEntry = sessionStore["agent:main:subagent:google-meet:meet-1"];
if (!forkedEntry) {
throw new Error("Expected forked consult session entry");
@@ -316,7 +355,75 @@ describe("realtime voice agent consult runtime", () => {
expectPositiveTimestamp(forkedEntry.updatedAt);
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
expect(call.sessionId).toBe("forked-session");
expect(call.sessionFile).toBe("/tmp/forked.jsonl");
expect(call.sessionFile).toBeUndefined();
expect(call.sessionTarget).toMatchObject({
agentId: "main",
sessionId: "forked-session",
sessionKey: "agent:main:subagent:google-meet:meet-1",
storePath: "/tmp/sessions.json",
});
expect(call.spawnedBy).toBe("agent:main:main");
});
// Covers the fork-skipped path: the fork helper declines because the parent is
// over budget, and the consult must still run in a brand-new session.
it("falls back to a fresh isolated consult session when requester context is too large", async () => {
const { runtime, runEmbeddedAgent } = createAgentRuntime();
const warn = vi.fn();
// Stub the fork helper to report a "parent-too-large" skip with an empty
// sessionId on the returned entry.
const forkSessionEntryFromParent = vi.fn(
async (
params: ForkSessionEntryFromParentParams,
): Promise<ForkSessionEntryFromParentResult> => ({
status: "skipped",
reason: "decision-skip",
sessionEntry: {
...(params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() }),
sessionId: "",
updatedAt: Date.now(),
},
decision: {
status: "skip",
reason: "parent-too-large",
maxTokens: 100_000,
parentTokens: 150_000,
message:
"Parent context is too large to fork (150000/100000 tokens); starting with isolated context instead.",
},
}),
);
// Pin randomUUID so the freshly created session id can be asserted exactly.
setRealtimeVoiceAgentConsultDepsForTest({
forkSessionEntryFromParent,
randomUUID: () => "00000000-0000-4000-8000-000000000000",
});
await consultRealtimeVoiceAgent({
cfg: {} as never,
agentRuntime: runtime as never,
logger: { warn },
agentId: "main",
sessionKey: "agent:main:subagent:google-meet:meet-1",
spawnedBy: "agent:main:main",
contextMode: "fork",
messageProvider: "google-meet",
lane: "google-meet",
runIdPrefix: "google-meet:meet-1",
args: { question: "What should I say?" },
transcript: [],
surface: "a private Google Meet",
userLabel: "Participant",
});
// The skip decision's message is surfaced to the logger with a "[talk] " prefix.
expect(warn).toHaveBeenCalledWith(
"[talk] Parent context is too large to fork (150000/100000 tokens); starting with isolated context instead.",
);
// With no usable forked entry, the runtime's own patchSessionEntry is used.
expect(runtime.session.patchSessionEntry).toHaveBeenCalled();
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
expect(call.sessionId).toBe("00000000-0000-4000-8000-000000000000");
// The run is addressed through sessionTarget; no sessionFile is passed.
expect(call.sessionFile).toBeUndefined();
expect(call.sessionTarget).toMatchObject({
agentId: "main",
sessionId: "00000000-0000-4000-8000-000000000000",
sessionKey: "agent:main:subagent:google-meet:meet-1",
storePath: "/tmp/sessions.json",
});
expect(call.spawnedBy).toBe("agent:main:main");
});

View File

@@ -1,11 +1,7 @@
// Agent consult runtime starts agent consultation flows from talk sessions.
import { randomUUID } from "node:crypto";
import path from "node:path";
import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js";
import {
forkSessionFromParent,
resolveParentForkDecision,
} from "../auto-reply/reply/session-fork.js";
import { forkSessionEntryFromParent } from "../auto-reply/reply/session-fork.js";
import { parseSessionThreadInfoFast } from "../config/sessions/thread-info.js";
import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -44,14 +40,12 @@ export {
type RealtimeVoiceAgentConsultDeps = {
randomUUID: typeof randomUUID;
resolveParentForkDecision: typeof resolveParentForkDecision;
forkSessionFromParent: typeof forkSessionFromParent;
forkSessionEntryFromParent: typeof forkSessionEntryFromParent;
};
const defaultRealtimeVoiceAgentConsultDeps: RealtimeVoiceAgentConsultDeps = {
randomUUID,
resolveParentForkDecision,
forkSessionFromParent,
forkSessionEntryFromParent,
};
let realtimeVoiceAgentConsultDeps = defaultRealtimeVoiceAgentConsultDeps;
@@ -133,6 +127,7 @@ function resolveRealtimeVoiceAgentDeliveryContext(params: {
async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
agentId: string;
cfg: OpenClawConfig;
sessionKey: string;
spawnedBy?: string | null;
contextMode?: RealtimeVoiceAgentConsultContextMode;
@@ -151,7 +146,37 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
(!requesterAgentId || requesterAgentId === params.agentId);
let forkDecisionWarning: string | undefined;
const patched = await params.agentRuntime.session.patchSessionEntry({
let patched: SessionEntry | null = null;
if (shouldFork) {
const forked = await realtimeVoiceAgentConsultDeps.forkSessionEntryFromParent({
storePath: params.storePath,
parentSessionKey: requesterSessionKey,
agentId: params.agentId,
config: params.cfg,
sessionKey: params.sessionKey,
fallbackEntry: {
sessionId: "",
updatedAt: now,
},
skipForkWhen: (entry) => Boolean(entry.sessionId?.trim()),
skipPatch: () => ({ ...deliveryFields, updatedAt: now }),
patch: () => ({
...deliveryFields,
spawnedBy: requesterSessionKey,
updatedAt: now,
}),
});
if (forked.status === "forked" || forked.status === "skipped") {
if (forked.status === "skipped" && forked.decision?.status === "skip") {
forkDecisionWarning = forked.decision.message;
}
if (forked.sessionEntry.sessionId?.trim()) {
patched = forked.sessionEntry;
}
}
}
patched ??= await params.agentRuntime.session.patchSessionEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
fallbackEntry: {
@@ -162,39 +187,6 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
if (entry.sessionId?.trim()) {
return { ...deliveryFields, updatedAt: now };
}
// Fork only from same-agent requester sessions. Cross-agent parent sessions may carry
// incompatible provider state, so they get a fresh consult session with spawnedBy linkage.
if (shouldFork) {
const parentEntry = params.agentRuntime.session.getSessionEntry({
storePath: params.storePath,
sessionKey: requesterSessionKey,
});
if (parentEntry?.sessionId?.trim()) {
const decision = await realtimeVoiceAgentConsultDeps.resolveParentForkDecision({
parentEntry,
storePath: params.storePath,
});
if (decision.status === "fork") {
const fork = await realtimeVoiceAgentConsultDeps.forkSessionFromParent({
parentEntry,
agentId: params.agentId,
sessionsDir: path.dirname(params.storePath),
});
if (fork) {
return {
...deliveryFields,
sessionId: fork.sessionId,
sessionFile: fork.sessionFile,
spawnedBy: requesterSessionKey,
forkedFromParent: true,
updatedAt: now,
};
}
} else {
forkDecisionWarning = decision.message;
}
}
}
return {
...deliveryFields,
sessionId: realtimeVoiceAgentConsultDeps.randomUUID(),
@@ -259,6 +251,7 @@ export async function consultRealtimeVoiceAgent(params: {
});
const sessionEntry = await resolveRealtimeVoiceAgentConsultSessionEntry({
agentId,
cfg: params.cfg,
sessionKey: params.sessionKey,
spawnedBy: params.spawnedBy,
contextMode: params.contextMode,
@@ -271,14 +264,17 @@ export async function consultRealtimeVoiceAgent(params: {
resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry);
const sessionId = sessionEntry.sessionId;
const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
agentId,
});
// Voice consults suppress verbose/reasoning output because the bridge needs a short,
// speakable answer, not agent-run diagnostics or hidden reasoning artifacts.
const result = await params.agentRuntime.runEmbeddedAgent({
sessionId,
sessionKey: params.sessionKey,
sessionTarget: {
agentId,
sessionId,
sessionKey: params.sessionKey,
storePath,
},
sandboxSessionKey: resolveRealtimeVoiceAgentSandboxSessionKey(agentId, params.sessionKey),
agentId,
spawnedBy: params.spawnedBy,
@@ -291,7 +287,6 @@ export async function consultRealtimeVoiceAgent(params: {
consultDeliveryContext?.threadId != null
? String(consultDeliveryContext.threadId)
: undefined,
sessionFile,
workspaceDir,
config: params.cfg,
prompt: buildRealtimeVoiceAgentConsultPrompt({

View File

@@ -111,6 +111,19 @@ describe("gateway CPU scenario guard", () => {
}
});
it("rejects duplicate single-value controls before running scenarios", () => {
  // Parser level: passing --output-dir twice must throw with the duplicate-flag message.
  const parseWithRepeatedOutputDir = () =>
    testing.parseArgs(["--output-dir", makeTempRoot(), "--output-dir", makeTempRoot()]);
  expect(parseWithRepeatedOutputDir).toThrow("--output-dir was provided more than once");

  // CLI level: a repeated --runs exits with status 1, prints nothing on stdout, and
  // reports only the one-line error on stderr (no Node stack trace).
  const cliOutcome = runCli("--runs", "1", "--runs", "2");
  expect(cliOutcome.status).toBe(1);
  expect(cliOutcome.stdout).toBe("");
  expect(cliOutcome.stderr.trim()).toBe("--runs was provided more than once");
  expectNoNodeStack(cliOutcome.stderr);
});
it("reports CLI argument errors without a Node stack trace", () => {
const result = runCli("--wat");

View File

@@ -1,5 +1,6 @@
// CI workflow guard tests cover the behavior of the CI workflow guards script.
import { readdirSync, readFileSync } from "node:fs";
import { execFileSync } from "node:child_process";
import { existsSync, readdirSync, readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
import { parse } from "yaml";
@@ -37,10 +38,17 @@ function readCriticalQualityWorkflow() {
return readFileSync(".github/workflows/codeql-critical-quality.yml", "utf8");
}
function readAndroidCompileSdk(path: string): number {
const match = readFileSync(path, "utf8").match(/^\s*compileSdk\s*=\s*(\d+)\s*$/mu);
/**
 * Reads a repository file as UTF-8 text.
 *
 * Prefers the copy on disk; when `relativePath` does not exist there, falls back to
 * `git show :<relativePath>` and returns that command's output instead.
 * NOTE(review): the `:<path>` form is presumably meant to read the git index entry so
 * that files missing from a sparse checkout still resolve; confirm against the CI setup.
 *
 * @param relativePath - Path passed as-is to the filesystem calls and to `git show`.
 * @returns The file contents decoded as UTF-8.
 */
function readTrackedText(relativePath: string): string {
  const presentOnDisk = existsSync(relativePath);
  return presentOnDisk
    ? readFileSync(relativePath, "utf8")
    : execFileSync("git", ["show", `:${relativePath}`], { encoding: "utf8" });
}
function readAndroidCompileSdk(relativePath: string): number {
const match = readTrackedText(relativePath).match(/^\s*compileSdk\s*=\s*(\d+)\s*$/mu);
if (!match) {
throw new Error(`Missing compileSdk in ${path}`);
throw new Error(`Missing compileSdk in ${relativePath}`);
}
return Number(match[1]);
}
@@ -727,18 +735,33 @@ describe("ci workflow guards", () => {
expect(openDocsPrStep.if).toBe("${{ github.event_name == 'workflow_dispatch' }}");
});
it("runs maturity scorecard from release checks", () => {
it("keeps maturity scorecard release docs opt-in from release checks", () => {
const releaseWorkflow = readReleaseChecksWorkflow();
const job = releaseWorkflow.jobs.maturity_scorecard_release_checks;
const summaryJob = releaseWorkflow.jobs.summary;
const verifyStep = summaryJob.steps.find(
(step) => step.name === "Verify release check results",
);
const inputs = releaseWorkflow.on.workflow_dispatch.inputs;
const resolveJob = releaseWorkflow.jobs.resolve_target;
const summarizeStep = resolveJob.steps.find((step) => step.name === "Summarize validated ref");
expect(releaseWorkflow.jobs).not.toHaveProperty("qa_profile_release_evidence_release_checks");
expect(inputs.run_maturity_scorecard).toMatchObject({
required: false,
default: false,
type: "boolean",
});
expect(resolveJob.outputs.run_maturity_scorecard).toBe(
"${{ steps.inputs.outputs.run_maturity_scorecard }}",
);
expect(summarizeStep.env.RUN_MATURITY_SCORECARD).toBe(
"${{ steps.inputs.outputs.run_maturity_scorecard }}",
);
expect(summarizeStep.run).toContain("- Maturity scorecard docs:");
expect(job.name).toBe("Render maturity scorecard release docs");
expect(job.if).toBe(
'contains(fromJSON(\'["all","qa"]\'), needs.resolve_target.outputs.rerun_group)',
"contains(fromJSON('[\"all\",\"qa\"]'), needs.resolve_target.outputs.rerun_group) && needs.resolve_target.outputs.run_maturity_scorecard == 'true'",
);
expect(job.permissions).toMatchObject({
actions: "read",

View File

@@ -182,6 +182,12 @@ describe("scripts/measure-rpc-rtt.mjs", () => {
expect(() =>
parseArgs(["--output-dir", "/tmp/rpc-rtt", "--methods", "health, config.get,health"]),
).toThrow("--methods contains duplicate gateway method: health");
expect(() => parseArgs(["--output-dir", "/tmp/one", "--output-dir", "/tmp/two"])).toThrow(
"--output-dir was provided more than once.",
);
expect(() =>
parseArgs(["--output-dir", "/tmp/rpc-rtt", "--methods", "health", "--methods", "config.get"]),
).toThrow("--methods was provided more than once.");
for (const value of ["--methods", "-h"]) {
for (const flag of ["--output-dir", "--repo-root", "--iterations", "--methods"]) {
expect(() => parseArgs([flag, value, "health"])).toThrow(`${flag} requires a value.`);

View File

@@ -150,6 +150,15 @@ describe("plugin gateway gauntlet helpers", () => {
);
});
it("rejects duplicate single-value controls", () => {
  // Each case repeats one single-value flag with two different values and names the
  // error message parseArgs is expected to throw for it.
  const repeatedFlagCases = [
    {
      argv: ["--output-dir", ".artifacts/one", "--output-dir", ".artifacts/two"],
      message: "--output-dir was provided more than once",
    },
    {
      argv: ["--shard-total", "2", "--shard-total", "3"],
      message: "--shard-total was provided more than once",
    },
  ];
  for (const { argv, message } of repeatedFlagCases) {
    expect(() => parseArgs(argv)).toThrow(message);
  }
});
it("rejects valued flags followed by another option", () => {
for (const flag of [
"--repo-root",