mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-24 08:12:29 +08:00
Compare commits
19 Commits
perf/codex
...
dallin/mat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3674ec478e | ||
|
|
0dfa22c6e0 | ||
|
|
6f80552ee9 | ||
|
|
258b83c438 | ||
|
|
d095d98a02 | ||
|
|
9ff7abc898 | ||
|
|
dc9c11be91 | ||
|
|
58552f6d7c | ||
|
|
b8811b7dde | ||
|
|
0850d83de1 | ||
|
|
92c10d4edc | ||
|
|
b22ae2a4da | ||
|
|
a822c9abaa | ||
|
|
c308295cd3 | ||
|
|
524e19726f | ||
|
|
bc243568e7 | ||
|
|
2cbb4e70cc | ||
|
|
e9b017d9dc | ||
|
|
bde5be874a |
1
.github/labeler.yml
vendored
1
.github/labeler.yml
vendored
@@ -118,6 +118,7 @@
|
||||
- any-glob-to-any-file:
|
||||
- "extensions/qa-lab/**"
|
||||
- "qa/scenarios/**"
|
||||
- "docs/maturity/**"
|
||||
- "docs/concepts/qa-e2e-automation.md"
|
||||
- "docs/concepts/personal-agent-benchmark-pack.md"
|
||||
- "docs/channels/qa-channel.md"
|
||||
|
||||
18
.github/workflows/openclaw-release-checks.yml
vendored
18
.github/workflows/openclaw-release-checks.yml
vendored
@@ -44,6 +44,11 @@ on:
|
||||
required: false
|
||||
default: false
|
||||
type: boolean
|
||||
run_maturity_scorecard:
|
||||
description: Render advisory maturity scorecard release docs; default release checks rely on dedicated package, QA, live, and E2E gates
|
||||
required: false
|
||||
default: false
|
||||
type: boolean
|
||||
rerun_group:
|
||||
description: Release check group to run
|
||||
required: false
|
||||
@@ -106,6 +111,7 @@ jobs:
|
||||
mode: ${{ steps.inputs.outputs.mode }}
|
||||
release_profile: ${{ steps.inputs.outputs.release_profile }}
|
||||
run_release_soak: ${{ steps.inputs.outputs.run_release_soak }}
|
||||
run_maturity_scorecard: ${{ steps.inputs.outputs.run_maturity_scorecard }}
|
||||
rerun_group: ${{ steps.inputs.outputs.rerun_group }}
|
||||
live_suite_filter: ${{ steps.inputs.outputs.live_suite_filter }}
|
||||
cross_os_suite_filter: ${{ steps.inputs.outputs.cross_os_suite_filter }}
|
||||
@@ -279,6 +285,7 @@ jobs:
|
||||
RELEASE_MODE_INPUT: ${{ inputs.mode }}
|
||||
RELEASE_PROFILE_INPUT: ${{ inputs.release_profile }}
|
||||
RELEASE_RUN_RELEASE_SOAK_INPUT: ${{ inputs.run_release_soak }}
|
||||
RELEASE_RUN_MATURITY_SCORECARD_INPUT: ${{ inputs.run_maturity_scorecard }}
|
||||
RELEASE_RERUN_GROUP_INPUT: ${{ inputs.rerun_group }}
|
||||
RELEASE_LIVE_SUITE_FILTER_INPUT: ${{ inputs.live_suite_filter }}
|
||||
RELEASE_CROSS_OS_SUITE_FILTER_INPUT: ${{ inputs.cross_os_suite_filter }}
|
||||
@@ -319,6 +326,12 @@ jobs:
|
||||
else
|
||||
run_release_soak=true
|
||||
fi
|
||||
run_maturity_scorecard="$(printf '%s' "$RELEASE_RUN_MATURITY_SCORECARD_INPUT" | tr '[:upper:]' '[:lower:]')"
|
||||
if [[ "$run_maturity_scorecard" != "true" && "$run_maturity_scorecard" != "1" && "$run_maturity_scorecard" != "yes" ]]; then
|
||||
run_maturity_scorecard=false
|
||||
else
|
||||
run_maturity_scorecard=true
|
||||
fi
|
||||
release_profile="$RELEASE_PROFILE_INPUT"
|
||||
if [[ "$release_profile" == "minimum" ]]; then
|
||||
release_profile=beta
|
||||
@@ -422,6 +435,7 @@ jobs:
|
||||
printf 'mode=%s\n' "$RELEASE_MODE_INPUT"
|
||||
printf 'release_profile=%s\n' "$release_profile"
|
||||
printf 'run_release_soak=%s\n' "$run_release_soak"
|
||||
printf 'run_maturity_scorecard=%s\n' "$run_maturity_scorecard"
|
||||
printf 'rerun_group=%s\n' "$RELEASE_RERUN_GROUP_INPUT"
|
||||
printf 'live_suite_filter=%s\n' "$RELEASE_LIVE_SUITE_FILTER_INPUT"
|
||||
printf 'cross_os_suite_filter=%s\n' "$RELEASE_CROSS_OS_SUITE_FILTER_INPUT"
|
||||
@@ -444,6 +458,7 @@ jobs:
|
||||
RELEASE_MODE: ${{ inputs.mode }}
|
||||
RELEASE_PROFILE: ${{ steps.inputs.outputs.release_profile }}
|
||||
RUN_RELEASE_SOAK: ${{ steps.inputs.outputs.run_release_soak }}
|
||||
RUN_MATURITY_SCORECARD: ${{ steps.inputs.outputs.run_maturity_scorecard }}
|
||||
RELEASE_RERUN_GROUP: ${{ inputs.rerun_group }}
|
||||
RELEASE_LIVE_SUITE_FILTER: ${{ inputs.live_suite_filter }}
|
||||
RELEASE_CROSS_OS_SUITE_FILTER: ${{ inputs.cross_os_suite_filter }}
|
||||
@@ -461,6 +476,7 @@ jobs:
|
||||
echo "- Cross-OS mode: \`${RELEASE_MODE}\`"
|
||||
echo "- Release profile: \`${RELEASE_PROFILE}\`"
|
||||
echo "- Release soak lanes: \`${RUN_RELEASE_SOAK}\`"
|
||||
echo "- Maturity scorecard docs: \`${RUN_MATURITY_SCORECARD}\`"
|
||||
echo "- Rerun group: \`${RELEASE_RERUN_GROUP}\`"
|
||||
if [[ -n "${RELEASE_LIVE_SUITE_FILTER// }" ]]; then
|
||||
echo "- Live suite filter: \`${RELEASE_LIVE_SUITE_FILTER}\`"
|
||||
@@ -770,7 +786,7 @@ jobs:
|
||||
maturity_scorecard_release_checks:
|
||||
name: Render maturity scorecard release docs
|
||||
needs: [resolve_target]
|
||||
if: contains(fromJSON('["all","qa"]'), needs.resolve_target.outputs.rerun_group)
|
||||
if: contains(fromJSON('["all","qa"]'), needs.resolve_target.outputs.rerun_group) && needs.resolve_target.outputs.run_maturity_scorecard == 'true'
|
||||
permissions:
|
||||
actions: read
|
||||
contents: read
|
||||
|
||||
@@ -24,6 +24,14 @@ This directory owns docs authoring, Mintlify link rules, and docs i18n policy.
|
||||
- `scripts/docs-sync-publish.mjs` excludes and prunes `docs/internal/**` from the public `openclaw/docs` publish repo if a page is force-added later.
|
||||
- Internal docs may mention repo paths, private app names, 1Password item names, and runbooks, but never include secret values.
|
||||
|
||||
## Maturity Scorecard Editing
|
||||
|
||||
`taxonomy.yaml` and `qa/maturity-scores.yaml` are the source inputs; generated maturity docs under `docs/maturity/` are projections and should not be hand-edited for score, LTS, taxonomy, QA profile, or evidence tables.
|
||||
`scripts/qa/render-maturity-docs.ts` owns generation; use `pnpm maturity:render` to refresh committed docs and `pnpm maturity:check` to verify them.
|
||||
`.github/workflows/maturity-scorecard.yml` renders artifact previews and can open generated-doc PRs; `.github/workflows/openclaw-release-checks.yml` dispatches it for release QA.
|
||||
Keep deterministic `qa-evidence.json.scorecard` data in GitHub Actions artifacts unless a maintainer explicitly asks for a sanitized committed projection.
|
||||
Human overrides must change source state in a PR and explain the reason plus public or redacted evidence.
|
||||
|
||||
## Docs i18n
|
||||
|
||||
- Foreign-language docs are not maintained in this repo. The generated publish output lives in the separate `openclaw/docs` repo (often cloned locally as `../openclaw-docs`).
|
||||
|
||||
@@ -966,6 +966,7 @@ output and whose artifact paths are resolved relative to that producer
|
||||
`qa run --qa-profile`, the same `qa-evidence.json` also includes the profile
|
||||
scorecard summary for the selected taxonomy categories.
|
||||
Treat it as a discovery aid, not a gate replacement; the selected scenario still needs the right provider mode, live transport, Multipass, Testbox, or release lane for the behavior under test.
|
||||
For scorecard context, see [Maturity scorecard](/maturity/scorecard).
|
||||
|
||||
For character and style checks, run the same scenario across multiple live model
|
||||
refs and write a judged Markdown report:
|
||||
@@ -1023,6 +1024,7 @@ When no `--judge-model` is passed, the judges default to
|
||||
## Related docs
|
||||
|
||||
- [Matrix QA](/concepts/qa-matrix)
|
||||
- [Maturity scorecard](/maturity/scorecard)
|
||||
- [Personal agent benchmark pack](/concepts/personal-agent-benchmark-pack)
|
||||
- [QA Channel](/channels/qa-channel)
|
||||
- [Testing](/help/testing)
|
||||
|
||||
@@ -20,6 +20,7 @@ of Docker runners. This doc is a "how we test" guide:
|
||||
|
||||
- [QA overview](/concepts/qa-e2e-automation) - architecture, command surface, scenario authoring.
|
||||
- [Matrix QA](/concepts/qa-matrix) - reference for `pnpm openclaw qa matrix`.
|
||||
- [Maturity scorecard](/maturity/scorecard) - how release QA evidence supports stability and LTS decisions.
|
||||
- [QA channel](/channels/qa-channel) - the synthetic transport plugin used by repo-backed scenarios.
|
||||
|
||||
This page covers running the regular test suites and Docker/Parallels runners. The QA-specific runners section below ([QA-specific runners](#qa-specific-runners)) lists the concrete `qa` invocations and points back at the references above.
|
||||
|
||||
@@ -78,13 +78,16 @@ type CodexWorkspaceBootstrapContext = CodexBootstrapContext & {
|
||||
};
|
||||
|
||||
/** Reads mirrored Codex session history for harness hooks. */
|
||||
export async function readMirroredSessionHistoryMessages(
|
||||
sessionFile: string,
|
||||
): Promise<AgentMessage[] | undefined> {
|
||||
const messages = await readCodexMirroredSessionHistoryMessages(sessionFile);
|
||||
export async function readMirroredSessionHistoryMessages(params: {
|
||||
agentId?: string;
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
}): Promise<AgentMessage[] | undefined> {
|
||||
const messages = await readCodexMirroredSessionHistoryMessages(params);
|
||||
if (!messages) {
|
||||
embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", {
|
||||
sessionFile,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
}
|
||||
return messages;
|
||||
|
||||
@@ -1827,7 +1827,14 @@ export class CodexAppServerEventProjector {
|
||||
}
|
||||
|
||||
private async readMirroredSessionMessages(): Promise<AgentMessage[]> {
|
||||
return (await readCodexMirroredSessionHistoryMessages(this.params.sessionFile)) ?? [];
|
||||
return (
|
||||
(await readCodexMirroredSessionHistoryMessages({
|
||||
agentId: this.params.agentId,
|
||||
sessionFile: this.params.sessionFile,
|
||||
sessionId: this.params.sessionId,
|
||||
sessionKey: this.params.sessionKey,
|
||||
})) ?? []
|
||||
);
|
||||
}
|
||||
|
||||
private createAssistantMessage(text: string): AssistantMessage {
|
||||
|
||||
@@ -91,9 +91,6 @@ const DEFAULT_COMPLETION_DELIVERY_RETRY_DELAYS_MS = [
|
||||
];
|
||||
const DEFAULT_TASK_ROW_RECONCILE_INTERVAL_MS = 10_000;
|
||||
const RECENT_TERMINAL_TASK_RECONCILE_GRACE_MS = 60_000;
|
||||
// Codex's recorder uses this filename contract; non-canonical names keep the
|
||||
// legacy substring fallback for older or test-created transcript files.
|
||||
const CODEX_ROLLOUT_FILENAME_RE = /^rollout-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-(.+)\.jsonl$/u;
|
||||
|
||||
const defaultRuntime: NativeSubagentMonitorRuntime = {
|
||||
createAgentHarnessTaskRuntime,
|
||||
@@ -1191,9 +1188,8 @@ async function findTranscriptPaths(params: {
|
||||
}): Promise<Map<string, string>> {
|
||||
const sessionsDir = path.join(params.codexHome, "sessions");
|
||||
const found = new Map<string, string>();
|
||||
const remaining = new Set(params.childThreadIds);
|
||||
const stack = [sessionsDir];
|
||||
while (stack.length > 0 && remaining.size > 0) {
|
||||
while (stack.length > 0 && found.size < params.childThreadIds.size) {
|
||||
const dir = stack.pop()!;
|
||||
let entries: Array<{ name: string; isDirectory(): boolean; isFile(): boolean }>;
|
||||
try {
|
||||
@@ -1210,20 +1206,10 @@ async function findTranscriptPaths(params: {
|
||||
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const rolloutMatch = entry.name.match(CODEX_ROLLOUT_FILENAME_RE);
|
||||
if (rolloutMatch) {
|
||||
const childThreadId = rolloutMatch[1];
|
||||
if (remaining.delete(childThreadId)) {
|
||||
for (const childThreadId of params.childThreadIds) {
|
||||
if (!found.has(childThreadId) && entry.name.includes(childThreadId)) {
|
||||
found.set(childThreadId, entryPath);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
for (const childThreadId of remaining) {
|
||||
if (entry.name.includes(childThreadId)) {
|
||||
found.set(childThreadId, entryPath);
|
||||
remaining.delete(childThreadId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1250,13 +1236,10 @@ async function findTranscriptPath(params: {
|
||||
stack.push(entryPath);
|
||||
continue;
|
||||
}
|
||||
const rolloutMatch = entry.name.match(CODEX_ROLLOUT_FILENAME_RE);
|
||||
if (
|
||||
entry.isFile() &&
|
||||
entry.name.endsWith(".jsonl") &&
|
||||
(rolloutMatch
|
||||
? rolloutMatch[1] === params.childThreadId
|
||||
: entry.name.includes(params.childThreadId))
|
||||
entry.name.includes(params.childThreadId)
|
||||
) {
|
||||
return entryPath;
|
||||
}
|
||||
|
||||
@@ -849,7 +849,16 @@ export async function runCodexAppServerAttempt(
|
||||
},
|
||||
});
|
||||
const hadSessionFile = await pathExists(activeSessionFile);
|
||||
let historyMessages = (await readMirroredSessionHistoryMessages(activeSessionFile)) ?? [];
|
||||
const activeTranscriptTarget = {
|
||||
agentId: sessionAgentId,
|
||||
sessionFile: activeSessionFile,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: contextSessionKey,
|
||||
};
|
||||
let historyMessages =
|
||||
!activeContextEngine && initialStartupBindingHadInactiveThreadBootstrap
|
||||
? []
|
||||
: ((await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ?? []);
|
||||
const hookContextWindowFields = {
|
||||
...(params.contextWindowInfo?.tokens
|
||||
? { contextTokenBudget: params.contextWindowInfo.tokens }
|
||||
@@ -907,7 +916,7 @@ export async function runCodexAppServerAttempt(
|
||||
warn: (message) => embeddedAgentLog.warn(message),
|
||||
});
|
||||
historyMessages =
|
||||
(await readMirroredSessionHistoryMessages(activeSessionFile)) ?? historyMessages;
|
||||
(await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ?? historyMessages;
|
||||
}
|
||||
const memoryToolNames = getCodexWorkspaceMemoryToolNames(toolBridge.availableSpecs);
|
||||
const workspaceBootstrapContext = await buildCodexWorkspaceBootstrapContext({
|
||||
@@ -3039,7 +3048,7 @@ export async function runCodexAppServerAttempt(
|
||||
const activeContextEnginePluginIdLocal =
|
||||
resolveContextEngineOwnerPluginId(activeContextEngine);
|
||||
const finalMessages =
|
||||
(await readMirroredSessionHistoryMessages(activeSessionFile)) ??
|
||||
(await readMirroredSessionHistoryMessages(activeTranscriptTarget)) ??
|
||||
historyMessages.concat(result.messagesSnapshot);
|
||||
await finalizeHarnessContextEngineTurn({
|
||||
contextEngine: activeContextEngine,
|
||||
|
||||
@@ -51,6 +51,14 @@ function messageEntry(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function mirroredTarget(sessionFile: string) {
|
||||
return {
|
||||
sessionFile,
|
||||
sessionId: "codex-session",
|
||||
sessionKey: "codex-session",
|
||||
};
|
||||
}
|
||||
|
||||
describe("readCodexMirroredSessionHistoryMessages", () => {
|
||||
it("replays only the branch selected by a leaf control", async () => {
|
||||
const sessionFile = await writeSession([
|
||||
@@ -75,7 +83,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
|
||||
await expect(
|
||||
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
|
||||
).resolves.toMatchObject([
|
||||
{ role: "user", content: "root prompt" },
|
||||
{ role: "assistant", content: "active answer" },
|
||||
]);
|
||||
@@ -93,7 +103,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toEqual([]);
|
||||
await expect(
|
||||
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
|
||||
).resolves.toEqual([]);
|
||||
});
|
||||
|
||||
it("keeps visible history when continuation rows use a disjoint append cursor", async () => {
|
||||
@@ -125,7 +137,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
|
||||
await expect(
|
||||
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
|
||||
).resolves.toMatchObject([
|
||||
{ role: "user", content: "visible prompt" },
|
||||
{ role: "assistant", content: "continued answer" },
|
||||
]);
|
||||
@@ -154,7 +168,9 @@ describe("readCodexMirroredSessionHistoryMessages", () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
await expect(readCodexMirroredSessionHistoryMessages(sessionFile)).resolves.toMatchObject([
|
||||
await expect(
|
||||
readCodexMirroredSessionHistoryMessages(mirroredTarget(sessionFile)),
|
||||
).resolves.toMatchObject([
|
||||
{ role: "user", content: "visible prompt" },
|
||||
{ role: "assistant", content: "continued answer" },
|
||||
]);
|
||||
|
||||
@@ -10,40 +10,59 @@ import {
|
||||
migrateSessionEntries,
|
||||
parseSessionEntries,
|
||||
} from "openclaw/plugin-sdk/agent-sessions";
|
||||
import {
|
||||
resolveSessionTranscriptTarget,
|
||||
type SessionTranscriptTargetParams,
|
||||
} from "openclaw/plugin-sdk/session-transcript-runtime";
|
||||
import { sanitizeCodexHistoryImagePayloads } from "./image-payload-sanitizer.js";
|
||||
|
||||
function isMissingFileError(error: unknown): boolean {
|
||||
return Boolean(
|
||||
error &&
|
||||
typeof error === "object" &&
|
||||
"code" in error &&
|
||||
(error as { code?: unknown }).code === "ENOENT",
|
||||
);
|
||||
}
|
||||
export type CodexMirroredSessionHistoryTarget = {
|
||||
agentId?: string;
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
};
|
||||
|
||||
/** Returns sanitized session-context messages for a Codex mirrored session file. */
|
||||
export async function readCodexMirroredSessionHistoryMessages(
|
||||
sessionFile: string,
|
||||
target: CodexMirroredSessionHistoryTarget,
|
||||
): Promise<AgentMessage[] | undefined> {
|
||||
try {
|
||||
const raw = await fs.readFile(sessionFile, "utf-8");
|
||||
await resolveSessionTranscriptTarget(resolveCodexHistoryTranscriptTarget(target));
|
||||
const raw = await fs.readFile(target.sessionFile, "utf-8");
|
||||
const entries = parseSessionEntries(raw);
|
||||
if (entries.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const firstEntry = entries[0] as { type?: unknown; id?: unknown } | undefined;
|
||||
if (firstEntry?.type !== "session" || typeof firstEntry.id !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
migrateSessionEntries(entries);
|
||||
const sessionEntries = entries.filter(
|
||||
(entry): entry is SessionEntry => entry.type !== "session",
|
||||
);
|
||||
migrateSessionEntries(entries as SessionEntry[]);
|
||||
const sessionEntries = entries.filter((entry): entry is SessionEntry => {
|
||||
return (
|
||||
entry !== null &&
|
||||
typeof entry === "object" &&
|
||||
!Array.isArray(entry) &&
|
||||
(entry as { type?: unknown }).type !== "session"
|
||||
);
|
||||
});
|
||||
return sanitizeCodexHistoryImagePayloads(
|
||||
buildSessionContext(sessionEntries).messages,
|
||||
"codex mirrored history",
|
||||
);
|
||||
} catch (error) {
|
||||
if (isMissingFileError(error)) {
|
||||
return [];
|
||||
}
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCodexHistoryTranscriptTarget(
|
||||
target: CodexMirroredSessionHistoryTarget,
|
||||
): SessionTranscriptTargetParams {
|
||||
return {
|
||||
...(target.agentId ? { agentId: target.agentId } : {}),
|
||||
sessionFile: target.sessionFile,
|
||||
sessionId: target.sessionId,
|
||||
sessionKey: target.sessionKey ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -21,13 +21,14 @@ import {
|
||||
mirrorCodexAppServerTranscript,
|
||||
} from "./transcript-mirror.js";
|
||||
|
||||
const emitSessionTranscriptUpdateMock = vi.hoisted(() => vi.fn());
|
||||
const publishSessionTranscriptUpdateByIdentityMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness-runtime")>();
|
||||
vi.mock("openclaw/plugin-sdk/session-transcript-runtime", async (importOriginal) => {
|
||||
const actual =
|
||||
await importOriginal<typeof import("openclaw/plugin-sdk/session-transcript-runtime")>();
|
||||
return {
|
||||
...actual,
|
||||
emitSessionTranscriptUpdate: emitSessionTranscriptUpdateMock,
|
||||
publishSessionTranscriptUpdateByIdentity: publishSessionTranscriptUpdateByIdentityMock,
|
||||
};
|
||||
});
|
||||
|
||||
@@ -44,7 +45,7 @@ const tempDirs: string[] = [];
|
||||
|
||||
afterEach(async () => {
|
||||
resetGlobalHookRunner();
|
||||
emitSessionTranscriptUpdateMock.mockReset();
|
||||
publishSessionTranscriptUpdateByIdentityMock.mockReset();
|
||||
for (const dir of tempDirs.splice(0)) {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}
|
||||
@@ -130,6 +131,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage, toolResultMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
@@ -164,30 +166,32 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
const firstMirror = await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:main",
|
||||
messages: [userMessage],
|
||||
idempotencyScope: "codex-app-server:thread-1",
|
||||
});
|
||||
const secondMirror = await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:main",
|
||||
messages: [userMessage],
|
||||
idempotencyScope: "codex-app-server:thread-1",
|
||||
});
|
||||
|
||||
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
|
||||
([update]) => update as Record<string, unknown>,
|
||||
const updates = publishSessionTranscriptUpdateByIdentityMock.mock.calls.map(
|
||||
([update]) => update as Record<string, unknown> & { update?: Record<string, unknown> },
|
||||
);
|
||||
expect(updates).toHaveLength(1);
|
||||
expect(updates[0]?.sessionFile).toBe(sessionFile);
|
||||
expect(updates[0]?.sessionKey).toBe("agent:main:main");
|
||||
expect(updates[0]?.messageId).toEqual(expect.any(String));
|
||||
expect(updates[0]?.message).toMatchObject({
|
||||
expect(updates[0]?.update?.messageId).toEqual(expect.any(String));
|
||||
expect(updates[0]?.update?.message).toMatchObject({
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "show me live" }],
|
||||
idempotencyKey: "codex-app-server:thread-1:turn-1:prompt",
|
||||
});
|
||||
expect(updates[0]?.messageSeq).toBe(1);
|
||||
expect(updates[0]?.update?.messageSeq).toBe(1);
|
||||
expect(firstMirror.userMessagesPresent).toHaveLength(1);
|
||||
expect(firstMirror.userMessagesPresent[0]).toMatchObject({
|
||||
role: "user",
|
||||
@@ -207,6 +211,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:main",
|
||||
messages: [
|
||||
attachCodexMirrorIdentity(
|
||||
@@ -227,14 +232,16 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
idempotencyScope: "codex-app-server:thread-1",
|
||||
});
|
||||
|
||||
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
|
||||
([update]) => update as Record<string, unknown>,
|
||||
const updates = publishSessionTranscriptUpdateByIdentityMock.mock.calls.map(
|
||||
([update]) => update as Record<string, unknown> & { update?: Record<string, unknown> },
|
||||
);
|
||||
expect(updates.map((update) => update.messageSeq)).toEqual([1, 2]);
|
||||
expect(updates.map((update) => (update.message as { role?: string }).role)).toEqual([
|
||||
"user",
|
||||
"assistant",
|
||||
]);
|
||||
expect(updates.map((update) => update.update?.messageSeq)).toEqual([1, 2]);
|
||||
expect(
|
||||
updates.map((update) => {
|
||||
const message = update.update?.message as { role?: string } | undefined;
|
||||
return message?.role;
|
||||
}),
|
||||
).toEqual(["user", "assistant"]);
|
||||
});
|
||||
|
||||
it("creates the transcript directory on first mirror", async () => {
|
||||
@@ -243,6 +250,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
@@ -273,12 +281,14 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "scope-1",
|
||||
@@ -312,6 +322,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
@@ -348,12 +359,14 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
const first = await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
const second = await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
@@ -394,6 +407,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
@@ -419,6 +433,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
@@ -456,6 +471,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
@@ -534,6 +550,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
@@ -547,6 +564,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
);
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, reasoningMessage, assistantMessage],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
@@ -595,12 +613,14 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn2, assistantTurn2],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
@@ -638,6 +658,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
);
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
@@ -661,6 +682,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
// turn 1's entries (with their original identities preserved).
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1, userTurn2, assistantTurn2],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
@@ -691,6 +713,7 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
// Codex plugin module implements transcript mirror behavior.
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
appendSessionTranscriptMessage,
|
||||
embeddedAgentLog,
|
||||
emitSessionTranscriptUpdate,
|
||||
formatErrorMessage,
|
||||
resolveSessionWriteLockOptions,
|
||||
runAgentHarnessBeforeMessageWriteHook,
|
||||
type AgentMessage,
|
||||
type EmbeddedRunAttemptParams,
|
||||
type EmbeddedRunAttemptResult,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import {
|
||||
publishSessionTranscriptUpdateByIdentity,
|
||||
withSessionTranscriptWriteLock,
|
||||
type SessionTranscriptTargetParams,
|
||||
type SessionTranscriptWriteLockParams,
|
||||
} from "openclaw/plugin-sdk/session-transcript-runtime";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
|
||||
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
|
||||
@@ -273,13 +273,13 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
|
||||
|
||||
export async function mirrorCodexAppServerTranscript(params: {
|
||||
sessionFile: string;
|
||||
sessionId?: string;
|
||||
sessionId: string;
|
||||
cwd?: string;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
messages: AgentMessage[];
|
||||
idempotencyScope?: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
config?: SessionTranscriptWriteLockParams["config"];
|
||||
}): Promise<CodexAppServerTranscriptMirrorResult> {
|
||||
const messages = params.messages.filter(
|
||||
(message): message is MirroredAgentMessage =>
|
||||
@@ -289,129 +289,133 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
return { userMessagesPresent: [] };
|
||||
}
|
||||
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
});
|
||||
const appendedUpdates: Array<{ messageId: string; message: AgentMessage; messageSeq: number }> =
|
||||
[];
|
||||
const userMessagesPresent: MirroredUserMessage[] = [];
|
||||
try {
|
||||
const mirrorState = await readTranscriptMirrorState(params.sessionFile);
|
||||
let nextMessageSeq = mirrorState.messageCount;
|
||||
for (const message of messages) {
|
||||
const dedupeIdentity = buildMirrorDedupeIdentity(message);
|
||||
const idempotencyKey = params.idempotencyScope
|
||||
? `${params.idempotencyScope}:${dedupeIdentity}`
|
||||
: undefined;
|
||||
const transcriptMessage = {
|
||||
...message,
|
||||
...(idempotencyKey ? { idempotencyKey } : {}),
|
||||
} as AgentMessage;
|
||||
if (idempotencyKey && mirrorState.idempotencyKeys.has(idempotencyKey)) {
|
||||
const persistedUserMessage = mirrorState.userMessagesByIdempotencyKey.get(idempotencyKey);
|
||||
if (persistedUserMessage) {
|
||||
userMessagesPresent.push(persistedUserMessage);
|
||||
const transcriptTarget = resolveCodexMirrorTranscriptTarget(params);
|
||||
const { appendedUpdates, userMessagesPresent } = await withSessionTranscriptWriteLock(
|
||||
{ ...transcriptTarget, config: params.config },
|
||||
async (transcript) => {
|
||||
const nextAppendedUpdates: Array<{
|
||||
messageId: string;
|
||||
message: AgentMessage;
|
||||
messageSeq: number;
|
||||
}> = [];
|
||||
const nextUserMessagesPresent: MirroredUserMessage[] = [];
|
||||
const mirrorState = readTranscriptMirrorState(await transcript.readEvents());
|
||||
let nextMessageSeq = mirrorState.messageCount;
|
||||
for (const message of messages) {
|
||||
const dedupeIdentity = buildMirrorDedupeIdentity(message);
|
||||
const idempotencyKey = params.idempotencyScope
|
||||
? `${params.idempotencyScope}:${dedupeIdentity}`
|
||||
: undefined;
|
||||
const transcriptMessage = {
|
||||
...message,
|
||||
...(idempotencyKey ? { idempotencyKey } : {}),
|
||||
} as AgentMessage;
|
||||
if (idempotencyKey && mirrorState.idempotencyKeys.has(idempotencyKey)) {
|
||||
const persistedUserMessage = mirrorState.userMessagesByIdempotencyKey.get(idempotencyKey);
|
||||
if (persistedUserMessage) {
|
||||
nextUserMessagesPresent.push(persistedUserMessage);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
|
||||
message: transcriptMessage,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (!nextMessage) {
|
||||
continue;
|
||||
}
|
||||
const messageToAppend = (
|
||||
idempotencyKey
|
||||
? {
|
||||
...(nextMessage as unknown as Record<string, unknown>),
|
||||
idempotencyKey,
|
||||
}
|
||||
: nextMessage
|
||||
) as AgentMessage;
|
||||
const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({
|
||||
transcriptPath: params.sessionFile,
|
||||
message: messageToAppend,
|
||||
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
|
||||
sessionId: params.sessionId,
|
||||
cwd: params.cwd,
|
||||
config: params.config,
|
||||
});
|
||||
if (appendedMessage.role === "user") {
|
||||
userMessagesPresent.push(appendedMessage);
|
||||
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
|
||||
message: transcriptMessage,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (!nextMessage) {
|
||||
continue;
|
||||
}
|
||||
const messageToAppend = (
|
||||
idempotencyKey
|
||||
? {
|
||||
...(nextMessage as unknown as Record<string, unknown>),
|
||||
idempotencyKey,
|
||||
}
|
||||
: nextMessage
|
||||
) as AgentMessage;
|
||||
const appended = await transcript.appendMessage({
|
||||
message: messageToAppend,
|
||||
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
|
||||
cwd: params.cwd,
|
||||
});
|
||||
if (!appended) {
|
||||
continue;
|
||||
}
|
||||
const { messageId, message: appendedMessage } = appended;
|
||||
if (appendedMessage.role === "user") {
|
||||
nextUserMessagesPresent.push(appendedMessage);
|
||||
if (idempotencyKey) {
|
||||
mirrorState.userMessagesByIdempotencyKey.set(idempotencyKey, appendedMessage);
|
||||
}
|
||||
}
|
||||
nextMessageSeq += 1;
|
||||
nextAppendedUpdates.push({
|
||||
messageId,
|
||||
message: appendedMessage,
|
||||
messageSeq: nextMessageSeq,
|
||||
});
|
||||
if (idempotencyKey) {
|
||||
mirrorState.userMessagesByIdempotencyKey.set(idempotencyKey, appendedMessage);
|
||||
mirrorState.idempotencyKeys.add(idempotencyKey);
|
||||
}
|
||||
}
|
||||
nextMessageSeq += 1;
|
||||
appendedUpdates.push({ messageId, message: appendedMessage, messageSeq: nextMessageSeq });
|
||||
if (idempotencyKey) {
|
||||
mirrorState.idempotencyKeys.add(idempotencyKey);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
return { appendedUpdates: nextAppendedUpdates, userMessagesPresent: nextUserMessagesPresent };
|
||||
},
|
||||
);
|
||||
|
||||
for (const update of appendedUpdates) {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
messageSeq: update.messageSeq,
|
||||
await publishSessionTranscriptUpdateByIdentity({
|
||||
...transcriptTarget,
|
||||
update: {
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
messageSeq: update.messageSeq,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return { userMessagesPresent };
|
||||
}
|
||||
|
||||
async function readTranscriptMirrorState(sessionFile: string): Promise<{
|
||||
function resolveCodexMirrorTranscriptTarget(params: {
|
||||
agentId?: string;
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
}): SessionTranscriptTargetParams {
|
||||
return {
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
function readTranscriptMirrorState(events: unknown[]): {
|
||||
idempotencyKeys: Set<string>;
|
||||
messageCount: number;
|
||||
userMessagesByIdempotencyKey: Map<string, MirroredUserMessage>;
|
||||
}> {
|
||||
} {
|
||||
const idempotencyKeys = new Set<string>();
|
||||
const userMessagesByIdempotencyKey = new Map<string, MirroredUserMessage>();
|
||||
let messageCount = 0;
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.readFile(sessionFile, "utf8");
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
|
||||
throw error;
|
||||
}
|
||||
return { idempotencyKeys, messageCount, userMessagesByIdempotencyKey };
|
||||
}
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
if (!line.trim()) {
|
||||
for (const event of events) {
|
||||
if (!event || typeof event !== "object" || Array.isArray(event)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as { message?: AgentMessage & { idempotencyKey?: unknown } };
|
||||
if ((parsed as { type?: unknown }).type === "message") {
|
||||
messageCount += 1;
|
||||
const parsed = event as {
|
||||
message?: AgentMessage & { idempotencyKey?: unknown };
|
||||
type?: unknown;
|
||||
};
|
||||
if (parsed.type === "message") {
|
||||
messageCount += 1;
|
||||
}
|
||||
if (typeof parsed.message?.idempotencyKey === "string") {
|
||||
idempotencyKeys.add(parsed.message.idempotencyKey);
|
||||
if (parsed.message.role === "user") {
|
||||
userMessagesByIdempotencyKey.set(parsed.message.idempotencyKey, parsed.message);
|
||||
}
|
||||
if (typeof parsed.message?.idempotencyKey === "string") {
|
||||
idempotencyKeys.add(parsed.message.idempotencyKey);
|
||||
if (parsed.message.role === "user") {
|
||||
userMessagesByIdempotencyKey.set(parsed.message.idempotencyKey, parsed.message);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return { idempotencyKeys, messageCount, userMessagesByIdempotencyKey };
|
||||
|
||||
@@ -2461,11 +2461,13 @@ describe("runCopilotAttempt", () => {
|
||||
expect(dualWriteMock.dualWriteCopilotTranscriptBestEffort).toHaveBeenCalledTimes(1);
|
||||
const args = dualWriteMock.dualWriteCopilotTranscriptBestEffort.mock.calls[0]?.[0] as {
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
messages: Array<{ role: string }>;
|
||||
idempotencyScope?: string;
|
||||
};
|
||||
expect(args.sessionFile).toBe("session.json");
|
||||
expect(args.idempotencyScope).toMatch(/^copilot:/u);
|
||||
expect(args.sessionId).toBe("session-1");
|
||||
expect(args.idempotencyScope).toBe("copilot:sess-1");
|
||||
expect(args.messages.length).toBeGreaterThan(0);
|
||||
const roles = args.messages.map((m) => m.role);
|
||||
expect(roles).toContain("user");
|
||||
@@ -2512,10 +2514,9 @@ describe("runCopilotAttempt", () => {
|
||||
}
|
||||
const identity = message["__openclaw"]?.mirrorIdentity ?? "";
|
||||
// The terminal assistant carries the turn-stable
|
||||
// `${runId}:assistant:final` identity attached by attempt.ts
|
||||
// (rubber-duck-validated identity scheme — survives SDK session
|
||||
// reuse across turns). Caller-passed history without an
|
||||
// identity falls through to the positional `${scope}:role:idx`
|
||||
// `${runId}:assistant:final` identity attached by attempt.ts.
|
||||
// Caller-passed history without an identity falls through to
|
||||
// the positional `${scope}:role:idx`.
|
||||
// fingerprint that the existing tagging map applies.
|
||||
if (message.role === "assistant" && index === args.messages.length - 1) {
|
||||
expect(identity).toMatch(/:assistant:final$/u);
|
||||
|
||||
@@ -1005,8 +1005,9 @@ export async function runCopilotAttempt(
|
||||
// extension. Identity-tagged so re-emits dedupe. Errors are
|
||||
// swallowed so a mirror failure cannot break the attempt.
|
||||
const sessionFileForMirror = readString(input.sessionFile);
|
||||
const sessionIdForScope = sessionIdUsed ?? readString(input.sessionId);
|
||||
if (sessionFileForMirror && messagesSnapshot.length > 0) {
|
||||
const openClawSessionIdForMirror = readString(input.sessionId);
|
||||
const mirrorScopeSessionId = sessionIdUsed ?? openClawSessionIdForMirror;
|
||||
if (sessionFileForMirror && openClawSessionIdForMirror && messagesSnapshot.length > 0) {
|
||||
const taggedMessages = messagesSnapshot.map((message, index) => {
|
||||
if (
|
||||
message.role !== "user" &&
|
||||
@@ -1027,16 +1028,16 @@ export async function runCopilotAttempt(
|
||||
if (hasMirrorIdentity(message)) {
|
||||
return message;
|
||||
}
|
||||
const identityScope = sdkSessionId ?? sessionIdForScope ?? "attempt";
|
||||
const identityScope = sdkSessionId ?? mirrorScopeSessionId ?? "attempt";
|
||||
return attachCopilotMirrorIdentity(message, `${identityScope}:${message.role}:${index}`);
|
||||
});
|
||||
await dualWriteCopilotTranscriptBestEffort({
|
||||
sessionFile: sessionFileForMirror,
|
||||
sessionId: openClawSessionIdForMirror,
|
||||
sessionKey: readString((input as { sessionKey?: unknown }).sessionKey),
|
||||
sessionId: readString(input.sessionId),
|
||||
agentId: readString(input.agentId),
|
||||
messages: taggedMessages,
|
||||
idempotencyScope: sessionIdForScope ? `copilot:${sessionIdForScope}` : undefined,
|
||||
idempotencyScope: mirrorScopeSessionId ? `copilot:${mirrorScopeSessionId}` : undefined,
|
||||
config: (input as { config?: unknown }).config as never,
|
||||
}).catch((mirrorError: unknown) => {
|
||||
// Defense-in-depth: the best-effort wrapper already swallows
|
||||
|
||||
@@ -86,6 +86,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage, toolResultMessage],
|
||||
idempotencyScope: "copilot:session-1",
|
||||
@@ -113,6 +114,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
@@ -143,12 +145,14 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "copilot:session-1",
|
||||
});
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "copilot:session-1",
|
||||
@@ -185,6 +189,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "copilot:session-1",
|
||||
@@ -210,6 +215,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
@@ -228,6 +234,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [],
|
||||
idempotencyScope: "copilot:session-1",
|
||||
@@ -245,6 +252,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [message],
|
||||
idempotencyScope: "scope-fp",
|
||||
});
|
||||
@@ -263,6 +271,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [tagged],
|
||||
idempotencyScope: "copilot:openclaw-session-1",
|
||||
});
|
||||
@@ -279,6 +288,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "no scope" }],
|
||||
@@ -306,6 +316,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [userMessage, systemLike],
|
||||
idempotencyScope: "scope",
|
||||
});
|
||||
@@ -326,6 +337,7 @@ describe("mirrorCopilotTranscript", () => {
|
||||
|
||||
await mirrorCopilotTranscript({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [second],
|
||||
idempotencyScope: "scope",
|
||||
});
|
||||
@@ -342,6 +354,7 @@ describe("dualWriteCopilotTranscriptBestEffort", () => {
|
||||
await expect(
|
||||
dualWriteCopilotTranscriptBestEffort({
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
@@ -356,22 +369,34 @@ describe("dualWriteCopilotTranscriptBestEffort", () => {
|
||||
});
|
||||
|
||||
it("swallows infrastructure failures and never rejects", async () => {
|
||||
// Pointing sessionFile at a path under a non-existent root with an
|
||||
// empty-string segment can fail differently on different platforms;
|
||||
// instead force failure by passing an invalid type and asserting
|
||||
// that the wrapper itself does not reject. Use any-cast for the
|
||||
// bad input shape since we are testing the wrapper's catch.
|
||||
await expect(
|
||||
dualWriteCopilotTranscriptBestEffort({
|
||||
sessionFile: "" as unknown as string,
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "should-not-throw" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
],
|
||||
idempotencyScope: "scope",
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
const root = await makeRoot("openclaw-copilot-mirror-invalid-");
|
||||
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
try {
|
||||
await expect(
|
||||
dualWriteCopilotTranscriptBestEffort({
|
||||
agentId: "main",
|
||||
sessionFile: "",
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "should-not-throw" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
],
|
||||
idempotencyScope: "scope",
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
await expect(
|
||||
fs.access(path.join(root, "agents", "main", "sessions", "session-1.jsonl")),
|
||||
).rejects.toHaveProperty("code", "ENOENT");
|
||||
} finally {
|
||||
if (previousStateDir === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = previousStateDir;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -29,16 +29,16 @@
|
||||
*/
|
||||
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
appendSessionTranscriptMessage,
|
||||
emitSessionTranscriptUpdate,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
runAgentHarnessBeforeMessageWriteHook,
|
||||
type AgentMessage,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import {
|
||||
publishSessionTranscriptUpdateByIdentity,
|
||||
withSessionTranscriptWriteLock,
|
||||
type SessionTranscriptTargetParams,
|
||||
type SessionTranscriptWriteLockParams,
|
||||
} from "openclaw/plugin-sdk/session-transcript-runtime";
|
||||
|
||||
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
|
||||
|
||||
@@ -95,8 +95,8 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
|
||||
|
||||
export interface MirrorCopilotTranscriptParams {
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionId?: string;
|
||||
agentId?: string;
|
||||
messages: AgentMessage[];
|
||||
/**
|
||||
@@ -107,7 +107,7 @@ export interface MirrorCopilotTranscriptParams {
|
||||
* entry collide with its existing on-disk key and be a true no-op.
|
||||
*/
|
||||
idempotencyScope?: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
config?: SessionTranscriptWriteLockParams["config"];
|
||||
}
|
||||
|
||||
export async function mirrorCopilotTranscript(
|
||||
@@ -121,95 +121,91 @@ export async function mirrorCopilotTranscript(
|
||||
return;
|
||||
}
|
||||
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
});
|
||||
try {
|
||||
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
|
||||
for (const message of messages) {
|
||||
const dedupeIdentity = buildMirrorDedupeIdentity(message);
|
||||
const idempotencyKey = params.idempotencyScope
|
||||
? `${params.idempotencyScope}:${dedupeIdentity}`
|
||||
: undefined;
|
||||
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
|
||||
continue;
|
||||
const transcriptTarget = resolveCopilotMirrorTranscriptTarget(params);
|
||||
const didAppend = await withSessionTranscriptWriteLock(
|
||||
{ ...transcriptTarget, config: params.config },
|
||||
async (transcript) => {
|
||||
let didAppendMessage = false;
|
||||
const existingIdempotencyKeys = readTranscriptIdempotencyKeys(await transcript.readEvents());
|
||||
for (const message of messages) {
|
||||
const dedupeIdentity = buildMirrorDedupeIdentity(message);
|
||||
const idempotencyKey = params.idempotencyScope
|
||||
? `${params.idempotencyScope}:${dedupeIdentity}`
|
||||
: undefined;
|
||||
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
|
||||
continue;
|
||||
}
|
||||
const transcriptMessage = {
|
||||
...message,
|
||||
...(idempotencyKey ? { idempotencyKey } : {}),
|
||||
} as AgentMessage;
|
||||
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
|
||||
message: transcriptMessage,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (!nextMessage) {
|
||||
continue;
|
||||
}
|
||||
const messageToAppend = (
|
||||
idempotencyKey
|
||||
? {
|
||||
...(nextMessage as unknown as Record<string, unknown>),
|
||||
idempotencyKey,
|
||||
}
|
||||
: nextMessage
|
||||
) as AgentMessage;
|
||||
const appended = await transcript.appendMessage({
|
||||
message: messageToAppend,
|
||||
idempotencyLookup: idempotencyKey ? "caller-checked" : "scan",
|
||||
});
|
||||
if (!appended) {
|
||||
continue;
|
||||
}
|
||||
didAppendMessage = true;
|
||||
if (idempotencyKey) {
|
||||
existingIdempotencyKeys.add(idempotencyKey);
|
||||
}
|
||||
}
|
||||
const transcriptMessage = {
|
||||
...message,
|
||||
...(idempotencyKey ? { idempotencyKey } : {}),
|
||||
} as AgentMessage;
|
||||
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
|
||||
message: transcriptMessage,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (!nextMessage) {
|
||||
continue;
|
||||
}
|
||||
const messageToAppend = (
|
||||
idempotencyKey
|
||||
? {
|
||||
...(nextMessage as unknown as Record<string, unknown>),
|
||||
idempotencyKey,
|
||||
}
|
||||
: nextMessage
|
||||
) as AgentMessage;
|
||||
await appendSessionTranscriptMessage({
|
||||
transcriptPath: params.sessionFile,
|
||||
message: messageToAppend,
|
||||
config: params.config,
|
||||
});
|
||||
if (idempotencyKey) {
|
||||
existingIdempotencyKeys.add(idempotencyKey);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
return didAppendMessage;
|
||||
},
|
||||
);
|
||||
|
||||
if (params.sessionKey) {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
if (didAppend) {
|
||||
await publishSessionTranscriptUpdateByIdentity({
|
||||
...transcriptTarget,
|
||||
update: params.sessionKey ? { sessionKey: params.sessionKey } : undefined,
|
||||
});
|
||||
} else {
|
||||
emitSessionTranscriptUpdate(params.sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
|
||||
const keys = new Set<string>();
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.readFile(sessionFile, "utf8");
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
|
||||
throw error;
|
||||
}
|
||||
return keys;
|
||||
function resolveCopilotMirrorTranscriptTarget(params: {
|
||||
agentId?: string;
|
||||
sessionFile: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
}): SessionTranscriptTargetParams {
|
||||
const sessionFile = params.sessionFile.trim();
|
||||
if (!sessionFile) {
|
||||
throw new Error("Copilot transcript mirror requires a sessionFile target");
|
||||
}
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
if (!line.trim()) {
|
||||
return {
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
function readTranscriptIdempotencyKeys(events: unknown[]): Set<string> {
|
||||
const keys = new Set<string>();
|
||||
for (const event of events) {
|
||||
if (!event || typeof event !== "object" || Array.isArray(event)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
|
||||
if (typeof parsed.message?.idempotencyKey === "string") {
|
||||
keys.add(parsed.message.idempotencyKey);
|
||||
}
|
||||
} catch {
|
||||
continue;
|
||||
const parsed = event as { message?: { idempotencyKey?: unknown } };
|
||||
if (typeof parsed.message?.idempotencyKey === "string") {
|
||||
keys.add(parsed.message.idempotencyKey);
|
||||
}
|
||||
}
|
||||
return keys;
|
||||
|
||||
@@ -23,6 +23,7 @@ describe("qa channel transport", () => {
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
visibleReplies: "automatic",
|
||||
groupChat: {
|
||||
mentionPatterns: ["\\b@?openclaw\\b"],
|
||||
visibleReplies: "automatic",
|
||||
|
||||
@@ -91,6 +91,7 @@ export function createQaChannelGatewayConfig(params: {
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
visibleReplies: "automatic",
|
||||
groupChat: {
|
||||
mentionPatterns: ["\\b@?openclaw\\b"],
|
||||
visibleReplies: "automatic",
|
||||
|
||||
@@ -22,6 +22,7 @@ function createQaChannelTransportParams(baseUrl = "http://127.0.0.1:43124") {
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
visibleReplies: "automatic",
|
||||
groupChat: {
|
||||
mentionPatterns: ["\\b@?openclaw\\b"],
|
||||
visibleReplies: "automatic",
|
||||
@@ -102,6 +103,7 @@ describe("buildQaGatewayConfig", () => {
|
||||
expect(cfg.channels?.["qa-channel"]?.enabled).toBe(true);
|
||||
expect(cfg.channels?.["qa-channel"]?.baseUrl).toBe("http://127.0.0.1:43124");
|
||||
expect(cfg.channels?.["qa-channel"]?.pollTimeoutMs).toBe(250);
|
||||
expect(cfg.messages?.visibleReplies).toBe("automatic");
|
||||
expect(cfg.messages?.groupChat?.mentionPatterns).toEqual(["\\b@?openclaw\\b"]);
|
||||
expect(cfg.messages?.groupChat?.visibleReplies).toBe("automatic");
|
||||
});
|
||||
|
||||
@@ -11,6 +11,9 @@ scenario:
|
||||
- tools.message
|
||||
- channels.webchat
|
||||
objective: Verify a current-chat reply is delivered as assistant text, not by calling `message(action=send)` and ending with `Sent.`.
|
||||
gatewayConfigPatch:
|
||||
session:
|
||||
dmScope: per-channel-peer
|
||||
successCriteria:
|
||||
- The visible outbound reply contains the requested marker exactly once.
|
||||
- The session transcript does not include a `message(action=send)` call followed by final assistant text `Sent.`.
|
||||
@@ -24,7 +27,9 @@ scenario:
|
||||
kind: flow
|
||||
summary: Run a direct current-chat reply and inspect the actual transcript for self-message routing.
|
||||
config:
|
||||
conversationId: qa-operator
|
||||
expectedMarker: WEBCHAT-DIRECT-REPLY-OK
|
||||
promptSnippet: Reply exactly
|
||||
|
||||
flow:
|
||||
steps:
|
||||
@@ -39,31 +44,56 @@ flow:
|
||||
- ref: env
|
||||
- 60000
|
||||
- call: reset
|
||||
- set: conversationId
|
||||
value:
|
||||
expr: config.conversationId
|
||||
- set: sessionKey
|
||||
value:
|
||||
expr: "`agent:qa:webchat-direct-reply:${randomUUID().slice(0, 8)}`"
|
||||
expr: "buildAgentSessionKey({ agentId: 'qa', channel: 'qa-channel', accountId: 'default', peer: { kind: 'direct', id: `dm:${conversationId}` }, dmScope: env.cfg.session?.dmScope, identityLinks: env.cfg.session?.identityLinks })"
|
||||
- set: startIndex
|
||||
value:
|
||||
expr: state.getSnapshot().messages.length
|
||||
- call: runAgentPrompt
|
||||
- set: requestCountBefore
|
||||
value:
|
||||
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).length : 0"
|
||||
- call: state.addInboundMessage
|
||||
args:
|
||||
- ref: env
|
||||
- sessionKey:
|
||||
ref: sessionKey
|
||||
message:
|
||||
expr: "`Reply directly in this current chat with exactly ${config.expectedMarker}. Do not call the message tool.`"
|
||||
timeoutMs:
|
||||
expr: liveTurnTimeoutMs(env, 60000)
|
||||
- call: waitForOutboundMessage
|
||||
saveAs: outbound
|
||||
args:
|
||||
- ref: state
|
||||
- lambda:
|
||||
params: [candidate]
|
||||
expr: "candidate.conversation.id === 'qa-operator' && normalizeLowercaseStringOrEmpty(candidate.text).includes(normalizeLowercaseStringOrEmpty(config.expectedMarker))"
|
||||
- expr: liveTurnTimeoutMs(env, 30000)
|
||||
- sinceIndex:
|
||||
ref: startIndex
|
||||
- conversation:
|
||||
id:
|
||||
ref: conversationId
|
||||
kind: direct
|
||||
senderId:
|
||||
ref: conversationId
|
||||
senderName: WebChat QA
|
||||
text:
|
||||
expr: "`Reply exactly \\`${config.expectedMarker}\\` in this current chat. Do not call the message tool.`"
|
||||
- try:
|
||||
actions:
|
||||
- call: waitForCondition
|
||||
saveAs: scenarioRequest
|
||||
args:
|
||||
- lambda:
|
||||
async: true
|
||||
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).slice(requestCountBefore).find((request) => String(request.allInputText ?? '').includes(config.promptSnippet)) : true"
|
||||
- expr: liveTurnTimeoutMs(env, 60000)
|
||||
- 500
|
||||
- call: waitForOutboundMessage
|
||||
saveAs: outbound
|
||||
args:
|
||||
- ref: state
|
||||
- lambda:
|
||||
params: [candidate]
|
||||
expr: "candidate.conversation.id === conversationId && normalizeLowercaseStringOrEmpty(candidate.text).includes(normalizeLowercaseStringOrEmpty(config.expectedMarker))"
|
||||
- expr: liveTurnTimeoutMs(env, 60000)
|
||||
- sinceIndex:
|
||||
ref: startIndex
|
||||
catchAs: directReplyError
|
||||
catch:
|
||||
- set: directReplyDebugRequests
|
||||
value:
|
||||
expr: "env.mock ? (await fetchJson(`${env.mock.baseUrl}/debug/requests`)).slice(requestCountBefore).map((request) => ({ plannedToolName: request.plannedToolName ?? null, plannedToolArgs: request.plannedToolArgs ?? null, allInputText: String(request.allInputText ?? '').slice(0, 400), finalText: String(request.finalText ?? '').slice(0, 200), toolOutput: request.toolOutput ? String(request.toolOutput).slice(0, 200) : null })) : []"
|
||||
- throw:
|
||||
expr: "`direct reply marker missing: ${directReplyError?.message ?? directReplyError}; transcript=${formatTransportTranscript(state, { conversationId })}; requests=${JSON.stringify(directReplyDebugRequests)}`"
|
||||
- set: transcriptSummary
|
||||
value:
|
||||
expr: "await readSessionTranscriptSummary(env, sessionKey)"
|
||||
|
||||
@@ -24,6 +24,13 @@ const DEFAULT_QA_SCENARIOS = [
|
||||
"memory-failure-fallback",
|
||||
"gateway-restart-inflight-run",
|
||||
];
|
||||
const SINGLE_VALUE_FLAGS = new Set([
|
||||
"--cpu-core-warn",
|
||||
"--hot-wall-warn-ms",
|
||||
"--output-dir",
|
||||
"--runs",
|
||||
"--warmup",
|
||||
]);
|
||||
const DEFAULT_CPU_CORE_WARN = 0.9;
|
||||
const DEFAULT_HOT_WALL_WARN_MS = 30_000;
|
||||
const PRIVATE_QA_REQUIRED_DIST_ENTRIES = [
|
||||
@@ -49,8 +56,15 @@ function parseArgs(argv) {
|
||||
cpuCoreWarn: DEFAULT_CPU_CORE_WARN,
|
||||
hotWallWarnMs: DEFAULT_HOT_WALL_WARN_MS,
|
||||
};
|
||||
const seenSingleValueFlags = new Set();
|
||||
for (let index = 0; index < args.length; index += 1) {
|
||||
const arg = args[index];
|
||||
if (SINGLE_VALUE_FLAGS.has(arg)) {
|
||||
if (seenSingleValueFlags.has(arg)) {
|
||||
throw new Error(`${arg} was provided more than once`);
|
||||
}
|
||||
seenSingleValueFlags.add(arg);
|
||||
}
|
||||
const readValue = () => {
|
||||
const value = args[index + 1];
|
||||
if (!value || value.startsWith("-")) {
|
||||
|
||||
@@ -35,6 +35,24 @@ const DEFAULT_CPU_CORE_WARN = 0.9;
|
||||
const DEFAULT_HOT_WALL_WARN_MS = 30_000;
|
||||
const DEFAULT_MAX_RSS_WARN_MB = 1536;
|
||||
const DEFAULT_QA_PLUGIN_CHUNK_SIZE = 12;
|
||||
const SINGLE_VALUE_FLAGS = new Set([
|
||||
"--build-timeout-ms",
|
||||
"--command-timeout-ms",
|
||||
"--cpu-core-warn",
|
||||
"--hot-wall-warn-ms",
|
||||
"--limit",
|
||||
"--max-rss-warn-mb",
|
||||
"--output-dir",
|
||||
"--qa-cpu-regression-multiplier",
|
||||
"--qa-plugin-chunk-size",
|
||||
"--qa-timeout-ms",
|
||||
"--qa-wall-regression-multiplier",
|
||||
"--repo-root",
|
||||
"--rss-anomaly-multiplier",
|
||||
"--shard-index",
|
||||
"--shard-total",
|
||||
"--wall-anomaly-multiplier",
|
||||
]);
|
||||
const COMMAND_OUTPUT_MAX_BUFFER_BYTES = 16 * 1024 * 1024;
|
||||
const MAX_TIMER_TIMEOUT_MS = 2_147_000_000;
|
||||
const ANSI_PATTERN = new RegExp(String.raw`\u001B\[[0-9;]*m`, "gu");
|
||||
@@ -79,8 +97,15 @@ export function parseArgs(argv) {
|
||||
};
|
||||
const envIds = normalizeCsv(process.env.OPENCLAW_PLUGIN_GATEWAY_GAUNTLET_IDS);
|
||||
options.pluginIds.push(...envIds);
|
||||
const seenSingleValueFlags = new Set();
|
||||
parseArgv: for (let index = 0; index < args.length; index += 1) {
|
||||
const arg = args[index];
|
||||
if (SINGLE_VALUE_FLAGS.has(arg)) {
|
||||
if (seenSingleValueFlags.has(arg)) {
|
||||
throw new Error(`${arg} was provided more than once`);
|
||||
}
|
||||
seenSingleValueFlags.add(arg);
|
||||
}
|
||||
const readValue = () => {
|
||||
const value = args[index + 1];
|
||||
if (!value || value.startsWith("-")) {
|
||||
|
||||
@@ -14,6 +14,7 @@ import { resolveWindowsTaskkillPath } from "./lib/windows-taskkill.mjs";
|
||||
|
||||
const DEFAULT_METHODS = ["health", "config.get"];
|
||||
const DEFAULT_ITERATIONS = 10;
|
||||
const SINGLE_VALUE_FLAGS = new Set(["--iterations", "--methods", "--output-dir", "--repo-root"]);
|
||||
/** Maximum time to wait for a spawned gateway to become reachable. */
|
||||
export const READY_TIMEOUT_MS = 120_000;
|
||||
/** Per-probe timeout used while polling gateway readiness endpoints. */
|
||||
@@ -74,12 +75,19 @@ export function parseArgs(argv) {
|
||||
iterations: DEFAULT_ITERATIONS,
|
||||
methods: DEFAULT_METHODS,
|
||||
};
|
||||
const seenSingleValueFlags = new Set();
|
||||
for (let index = 0; index < argv.length; index += 1) {
|
||||
const arg = argv[index];
|
||||
if (arg === "--help" || arg === "-h") {
|
||||
args.help = true;
|
||||
continue;
|
||||
}
|
||||
if (SINGLE_VALUE_FLAGS.has(arg)) {
|
||||
if (seenSingleValueFlags.has(arg)) {
|
||||
throw new Error(`${arg} was provided more than once.`);
|
||||
}
|
||||
seenSingleValueFlags.add(arg);
|
||||
}
|
||||
if (arg === "--output-dir") {
|
||||
args.outputDir = readFlagValue(argv, index, arg);
|
||||
index += 1;
|
||||
|
||||
@@ -14,9 +14,7 @@ import type {
|
||||
ContextEngineRuntimeSettings,
|
||||
} from "../../context-engine/types.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
persistSessionCompactionCheckpoint,
|
||||
createFileBackedCompactionCheckpointStore,
|
||||
readSessionLeafStateFromTranscriptAsync,
|
||||
resolveCompactionCheckpointTranscriptPosition,
|
||||
resolveSessionCompactionCheckpointReason,
|
||||
@@ -63,6 +61,8 @@ import { resolveModelAsync } from "./model.js";
|
||||
import type { EmbeddedAgentCompactResult } from "./types.js";
|
||||
import { normalizeContextTokenBudget } from "./utils.js";
|
||||
|
||||
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
|
||||
|
||||
function shouldFallbackAfterHarnessCompaction(
|
||||
result: EmbeddedAgentCompactResult | undefined,
|
||||
): boolean {
|
||||
@@ -352,7 +352,7 @@ export async function compactEmbeddedAgentSession(
|
||||
// are notified regardless of which engine is active.
|
||||
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
|
||||
checkpointSnapshot = engineOwnsCompaction
|
||||
? await captureCompactionCheckpointSnapshotAsync({
|
||||
? await compactionCheckpointStore.captureSnapshot({
|
||||
sessionFile: params.sessionFile,
|
||||
})
|
||||
: null;
|
||||
@@ -478,7 +478,7 @@ export async function compactEmbeddedAgentSession(
|
||||
preferredLeafId: postCompactionLeafId,
|
||||
transcriptState,
|
||||
});
|
||||
const storedCheckpoint = await persistSessionCompactionCheckpoint({
|
||||
const storedCheckpoint = await compactionCheckpointStore.persistCheckpoint({
|
||||
cfg: params.config,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: postCompactionSessionId,
|
||||
@@ -620,7 +620,7 @@ export async function compactEmbeddedAgentSession(
|
||||
};
|
||||
} finally {
|
||||
if (!checkpointSnapshotRetained) {
|
||||
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
|
||||
await compactionCheckpointStore.cleanupSnapshot(checkpointSnapshot);
|
||||
}
|
||||
await contextEngine.dispose?.();
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
/**
|
||||
* Types for the lazy embedded-agent compaction runtime boundary.
|
||||
*/
|
||||
import type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
|
||||
import type { CompactEmbeddedAgentSessionRuntimeParams } from "./compact.types.js";
|
||||
import type { EmbeddedAgentCompactResult } from "./types.js";
|
||||
|
||||
/**
|
||||
* Lazy-runtime signature for direct embedded session compaction.
|
||||
*/
|
||||
export type CompactEmbeddedAgentSessionDirect = (
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
params: CompactEmbeddedAgentSessionRuntimeParams,
|
||||
) => Promise<EmbeddedAgentCompactResult>;
|
||||
|
||||
@@ -8,9 +8,7 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import { resolveAgentModelFallbackValues } from "../../config/model-input.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
persistSessionCompactionCheckpoint,
|
||||
createFileBackedCompactionCheckpointStore,
|
||||
readSessionLeafStateFromTranscriptAsync,
|
||||
resolveCompactionCheckpointTranscriptPosition,
|
||||
resolveSessionCompactionCheckpointReason,
|
||||
@@ -107,6 +105,10 @@ import { wrapStreamFnTextTransforms } from "../plugin-text-transforms.js";
|
||||
import { resolveAgentPromptSurfaceForSessionKey } from "../prompt-surface.js";
|
||||
import { applyPreparedRuntimeAuthToModel } from "../provider-request-config.js";
|
||||
import { registerProviderStreamForModel } from "../provider-stream.js";
|
||||
import {
|
||||
applyAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
} from "../run-session-target.js";
|
||||
import { collectRuntimeChannelCapabilities } from "../runtime-capabilities.js";
|
||||
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
|
||||
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
|
||||
@@ -135,6 +137,7 @@ import {
|
||||
} from "./compact-reasons.js";
|
||||
import type {
|
||||
CompactEmbeddedAgentSessionParams,
|
||||
CompactEmbeddedAgentSessionRuntimeParams,
|
||||
CompactionMessageMetrics,
|
||||
} from "./compact.types.js";
|
||||
import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js";
|
||||
@@ -192,6 +195,11 @@ import { mapThinkingLevel, normalizeContextTokenBudget } from "./utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
|
||||
export type { CompactEmbeddedAgentSessionParams } from "./compact.types.js";
|
||||
|
||||
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
|
||||
type CompactEmbeddedAgentSessionParamsWithSessionFile = CompactEmbeddedAgentSessionRuntimeParams & {
|
||||
sessionFile: string;
|
||||
};
|
||||
|
||||
function hasRealConversationContent(
|
||||
msg: AgentMessage,
|
||||
messages: AgentMessage[],
|
||||
@@ -464,8 +472,17 @@ function fallbackFailureToCompactionResult(err: unknown): EmbeddedAgentCompactRe
|
||||
* Use this when already inside a session/global lane to avoid deadlocks.
|
||||
*/
|
||||
export async function compactEmbeddedAgentSessionDirect(
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
paramsInput: CompactEmbeddedAgentSessionRuntimeParams,
|
||||
): Promise<EmbeddedAgentCompactResult> {
|
||||
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
|
||||
const runSessionTarget = await resolveAgentRunSessionTarget(paramsBase);
|
||||
const params: CompactEmbeddedAgentSessionParamsWithSessionFile = {
|
||||
...paramsBase,
|
||||
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
|
||||
sessionId: runSessionTarget.sessionId,
|
||||
sessionKey: paramsBase.sessionKey ?? runSessionTarget.sessionKey,
|
||||
sessionFile: runSessionTarget.sessionFile,
|
||||
};
|
||||
if (hasExplicitCompactionModel(params) || !hasCompactionModelFallbackCandidates(params)) {
|
||||
return await compactEmbeddedAgentSessionDirectOnce(params);
|
||||
}
|
||||
@@ -530,7 +547,7 @@ export async function compactEmbeddedAgentSessionDirect(
|
||||
}
|
||||
|
||||
async function compactEmbeddedAgentSessionDirectOnce(
|
||||
params: CompactEmbeddedAgentSessionParams,
|
||||
params: CompactEmbeddedAgentSessionParamsWithSessionFile,
|
||||
): Promise<EmbeddedAgentCompactResult> {
|
||||
const startedAt = Date.now();
|
||||
const diagId = params.diagId?.trim() || createCompactionDiagId();
|
||||
@@ -1190,7 +1207,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
|
||||
: undefined,
|
||||
allowedToolNames,
|
||||
});
|
||||
checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({
|
||||
checkpointSnapshot = await compactionCheckpointStore.captureSnapshot({
|
||||
sessionManager,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
@@ -1546,7 +1563,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
|
||||
preferredLeafId: activePostLeafId,
|
||||
transcriptState,
|
||||
});
|
||||
const storedCheckpoint = await persistSessionCompactionCheckpoint({
|
||||
const storedCheckpoint = await compactionCheckpointStore.persistCheckpoint({
|
||||
cfg: params.config,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: activeSessionId,
|
||||
@@ -1670,7 +1687,7 @@ async function compactEmbeddedAgentSessionDirectOnce(
|
||||
return fail(reason, err);
|
||||
} finally {
|
||||
if (!checkpointSnapshotRetained) {
|
||||
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
|
||||
await compactionCheckpointStore.cleanupSnapshot(checkpointSnapshot);
|
||||
}
|
||||
restoreSkillEnv?.();
|
||||
}
|
||||
|
||||
@@ -9,12 +9,15 @@ import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-e
|
||||
import type { CommandQueueEnqueueFn } from "../../process/command-queue.types.js";
|
||||
import type { SkillSnapshot } from "../../skills/types.js";
|
||||
import type { ExecElevatedDefaults, ExecToolDefaults } from "../bash-tools.exec-types.js";
|
||||
import type { AgentRunSessionTarget } from "../run-session-target.js";
|
||||
import type { AgentRuntimePlan } from "../runtime-plan/types.js";
|
||||
|
||||
export type CompactEmbeddedAgentSessionParams = {
|
||||
sessionId: string;
|
||||
runId?: string;
|
||||
sessionKey?: string;
|
||||
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
/** Caller-resolved owner agent for global session aliases. */
|
||||
agentId?: string;
|
||||
/** Session key used only for runtime policy/sandbox resolution. Defaults to sessionKey. */
|
||||
@@ -106,6 +109,14 @@ export type CompactEmbeddedAgentSessionParams = {
|
||||
oneShotCliRun?: boolean;
|
||||
};
|
||||
|
||||
export type CompactEmbeddedAgentSessionRuntimeParams = Omit<
|
||||
CompactEmbeddedAgentSessionParams,
|
||||
"sessionFile"
|
||||
> & {
|
||||
/** Deprecated file-backed artifact target. Prefer sessionTarget for new callers. */
|
||||
sessionFile?: string;
|
||||
};
|
||||
|
||||
export type CompactionMessageMetrics = {
|
||||
messages: number;
|
||||
historyTextChars: number;
|
||||
|
||||
@@ -125,6 +125,10 @@ import {
|
||||
import { resolveProviderIdForAuth } from "../provider-auth-aliases.js";
|
||||
import { hasOnlyAssistantReasoningContent } from "../replay-turn-classification.js";
|
||||
import { runAgentCleanupStep } from "../run-cleanup-timeout.js";
|
||||
import {
|
||||
applyAgentRunSessionTargetIdentity,
|
||||
resolveAgentRunSessionTarget,
|
||||
} from "../run-session-target.js";
|
||||
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
|
||||
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
|
||||
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
|
||||
@@ -255,6 +259,7 @@ const BEFORE_AGENT_FINALIZE_RETRY_PROMPT_PREFIX =
|
||||
"Before accepting the previous final answer, apply this revision request and produce the revised final answer. Do not repeat completed work or rerun tools unless the request explicitly requires it.";
|
||||
const MAX_BEFORE_AGENT_FINALIZE_REVISIONS = 3;
|
||||
type EmbeddedRunAttemptForRunner = Awaited<ReturnType<typeof runEmbeddedAttemptWithBackend>>;
|
||||
type RunEmbeddedAgentParamsWithSessionFile = RunEmbeddedAgentParams & { sessionFile: string };
|
||||
|
||||
function isNoRealConversationCompactionNoop(params: {
|
||||
ok?: boolean;
|
||||
@@ -617,20 +622,28 @@ export function runEmbeddedAgent(
|
||||
async function runEmbeddedAgentInternal(
|
||||
paramsInput: RunEmbeddedAgentParams,
|
||||
): Promise<EmbeddedAgentRunResult> {
|
||||
let params = paramsInput;
|
||||
let lifecycleGeneration = params.lifecycleGeneration!;
|
||||
const paramsBase = applyAgentRunSessionTargetIdentity(paramsInput);
|
||||
let lifecycleGeneration = paramsBase.lifecycleGeneration!;
|
||||
const queuedLifecycleGeneration = getAgentEventLifecycleGeneration();
|
||||
// Resolve sessionKey early so all downstream consumers (hooks, LCM, compaction)
|
||||
// receive a non-null key even when callers omit it. See #60552.
|
||||
const effectiveSessionKey = backfillSessionKey({
|
||||
config: params.config,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
config: paramsBase.config,
|
||||
sessionId: paramsBase.sessionId,
|
||||
sessionKey: paramsBase.sessionKey,
|
||||
agentId: paramsBase.agentId,
|
||||
});
|
||||
if (effectiveSessionKey !== params.sessionKey) {
|
||||
params = { ...params, sessionKey: effectiveSessionKey };
|
||||
}
|
||||
const runSessionTarget = await resolveAgentRunSessionTarget({
|
||||
...paramsBase,
|
||||
sessionKey: effectiveSessionKey,
|
||||
});
|
||||
let params: RunEmbeddedAgentParamsWithSessionFile = {
|
||||
...paramsBase,
|
||||
agentId: paramsBase.agentId ?? runSessionTarget.agentId,
|
||||
sessionId: runSessionTarget.sessionId,
|
||||
sessionKey: effectiveSessionKey ?? runSessionTarget.sessionKey,
|
||||
sessionFile: runSessionTarget.sessionFile,
|
||||
};
|
||||
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
|
||||
const globalLane = resolveGlobalLane(params.lane);
|
||||
// Outer fallback attempts defer session suspension only while another
|
||||
|
||||
@@ -29,6 +29,7 @@ import type {
|
||||
} from "../../embedded-agent-subscribe.shared-types.js";
|
||||
import type { FastModeAutoProgressState } from "../../fast-mode.js";
|
||||
import type { AgentInternalEvent } from "../../internal-events.js";
|
||||
import type { AgentRunSessionTarget } from "../../run-session-target.js";
|
||||
import type { AgentMessage } from "../../runtime/index.js";
|
||||
import type { SilentReplyPromptMode } from "../../system-prompt.types.js";
|
||||
import type { PromptMode } from "../../system-prompt.types.js";
|
||||
@@ -47,6 +48,8 @@ export type CurrentInboundPromptContext = {
|
||||
export type RunEmbeddedAgentParams = {
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
/** Storage-neutral transcript/session target. Defaults to sessionId/sessionKey/agentId. */
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
/** Immutable gateway lifecycle ownership captured when this execution was admitted. */
|
||||
lifecycleGeneration?: string;
|
||||
/** Provider prompt-cache affinity key; distinct from transcript/session identity. */
|
||||
@@ -122,7 +125,8 @@ export type RunEmbeddedAgentParams = {
|
||||
forceHeartbeatTool?: boolean;
|
||||
/** Allow runtime plugins for this run to late-bind the gateway subagent. */
|
||||
allowGatewaySubagentBinding?: boolean;
|
||||
sessionFile: string;
|
||||
/** @deprecated Use sessionTarget plus sessionId/sessionKey/agentId for runtime identity. */
|
||||
sessionFile?: string;
|
||||
workspaceDir: string;
|
||||
/** Task working directory for tool/runtime execution. Defaults to workspaceDir. */
|
||||
cwd?: string;
|
||||
|
||||
@@ -40,6 +40,7 @@ type EmbeddedRunAttemptBase = Omit<
|
||||
| "fastMode"
|
||||
| "lane"
|
||||
| "enqueue"
|
||||
| "sessionFile"
|
||||
>;
|
||||
|
||||
export type EmbeddedRunContextWindowInfo = {
|
||||
@@ -51,6 +52,8 @@ export type EmbeddedRunContextWindowInfo = {
|
||||
export type EmbeddedRunFastModeParam = boolean | (() => boolean | undefined);
|
||||
|
||||
export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
|
||||
/** Active file-backed artifact target resolved by the run/session target seam. */
|
||||
sessionFile: string;
|
||||
initialReplayState?: EmbeddedRunReplayState;
|
||||
/** Pluggable context engine for ingest/assemble/compact lifecycle. */
|
||||
contextEngine?: ContextEngine;
|
||||
|
||||
@@ -206,13 +206,26 @@ export async function getSessionsSpawnTool(opts: CreateOpenClawToolsOpts) {
|
||||
compact: async () => ({ ok: true, compacted: false }),
|
||||
ingest: async () => ({ ingested: false }),
|
||||
}),
|
||||
resolveParentForkDecision: async () => ({
|
||||
status: "fork",
|
||||
maxTokens: 100_000,
|
||||
}),
|
||||
forkSessionFromParent: async () => ({
|
||||
sessionId: "forked-session-id",
|
||||
sessionFile: "/tmp/forked-session.jsonl",
|
||||
forkSessionEntryFromParent: async () => ({
|
||||
status: "forked",
|
||||
fork: {
|
||||
sessionId: "forked-session-id",
|
||||
sessionFile: "/tmp/forked-session.jsonl",
|
||||
},
|
||||
parentEntry: {
|
||||
sessionId: "parent-session-id",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
sessionEntry: {
|
||||
sessionId: "forked-session-id",
|
||||
sessionFile: "/tmp/forked-session.jsonl",
|
||||
forkedFromParent: true,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
decision: {
|
||||
status: "fork",
|
||||
maxTokens: 100_000,
|
||||
},
|
||||
}),
|
||||
updateSessionStore: async (_storePath, mutator) => mutator({}),
|
||||
});
|
||||
|
||||
59
src/agents/run-session-target.test.ts
Normal file
59
src/agents/run-session-target.test.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { loadSessionStore } from "../config/sessions/store.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveAgentRunSessionTarget } from "./run-session-target.js";
|
||||
|
||||
describe("agent run session target", () => {
|
||||
let tempDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-run-session-target-"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("resolves runtime identity through the run config store", async () => {
|
||||
const storePath = path.join(tempDir, "custom-sessions", "sessions.json");
|
||||
const sessionKey = "agent:helper:commitments:test-run";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
agentId: "helper",
|
||||
config: { session: { store: storePath } } as OpenClawConfig,
|
||||
sessionId: "test-run",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
expect(target).toMatchObject({
|
||||
agentId: "helper",
|
||||
sessionId: "test-run",
|
||||
sessionKey,
|
||||
});
|
||||
expect(path.dirname(target.sessionFile)).toBe(path.dirname(storePath));
|
||||
expect(loadSessionStore(storePath, { skipCache: true })[sessionKey]?.sessionFile).toBe(
|
||||
target.sessionFile,
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the agent from an agent-scoped session key when agentId is omitted", async () => {
|
||||
const storeRoot = path.join(tempDir, "agents", "{agentId}", "sessions.json");
|
||||
const sessionKey = "agent:helper:main";
|
||||
|
||||
const target = await resolveAgentRunSessionTarget({
|
||||
config: { session: { store: storeRoot } } as OpenClawConfig,
|
||||
sessionId: "helper-session",
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
const helperStorePath = path.join(tempDir, "agents", "helper", "sessions.json");
|
||||
expect(target.agentId).toBe("helper");
|
||||
expect(path.dirname(target.sessionFile)).toBe(path.dirname(helperStorePath));
|
||||
expect(loadSessionStore(helperStorePath, { skipCache: true })[sessionKey]?.sessionFile).toBe(
|
||||
target.sessionFile,
|
||||
);
|
||||
});
|
||||
});
|
||||
79
src/agents/run-session-target.ts
Normal file
79
src/agents/run-session-target.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import {
|
||||
resolveSessionTranscriptRuntimeTarget,
|
||||
type SessionTranscriptRuntimeTarget,
|
||||
} from "../config/sessions/session-accessor.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
|
||||
/** Identifies a run transcript target without naming the current storage artifact. */
|
||||
export type AgentRunSessionTarget = {
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
storePath?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
|
||||
/** File-backed target resolved from the storage-neutral run identity. */
|
||||
export type ResolvedAgentRunSessionTarget = SessionTranscriptRuntimeTarget;
|
||||
|
||||
/** Resolves the active file-backed target used by current run/session internals. */
|
||||
export async function resolveAgentRunSessionTarget(params: {
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionFile?: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
}): Promise<ResolvedAgentRunSessionTarget> {
|
||||
const sessionTarget = params.sessionTarget;
|
||||
const agentId = normalizeOptionalString(sessionTarget?.agentId) ?? params.agentId;
|
||||
const sessionId = normalizeOptionalString(sessionTarget?.sessionId) ?? params.sessionId;
|
||||
const sessionKey = normalizeOptionalString(sessionTarget?.sessionKey) ?? params.sessionKey;
|
||||
const effectiveAgentId = agentId ?? resolveAgentIdFromSessionKey(sessionKey);
|
||||
const sessionFile = normalizeOptionalString(params.sessionFile);
|
||||
if (sessionFile) {
|
||||
return {
|
||||
agentId: effectiveAgentId ?? "",
|
||||
sessionFile,
|
||||
sessionId,
|
||||
sessionKey: sessionKey ?? "",
|
||||
};
|
||||
}
|
||||
if (!sessionKey) {
|
||||
throw new Error(`Cannot resolve run session target without a session key: ${sessionId}`);
|
||||
}
|
||||
const storePath =
|
||||
normalizeOptionalString(sessionTarget?.storePath) ??
|
||||
resolveStorePath(params.config?.session?.store, { agentId: effectiveAgentId });
|
||||
return await resolveSessionTranscriptRuntimeTarget({
|
||||
...(effectiveAgentId ? { agentId: effectiveAgentId } : {}),
|
||||
sessionId,
|
||||
sessionKey,
|
||||
storePath,
|
||||
...(sessionTarget?.threadId !== undefined ? { threadId: sessionTarget.threadId } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
/** Applies identity fields from the explicit target before legacy backfills run. */
|
||||
export function applyAgentRunSessionTargetIdentity<
|
||||
T extends {
|
||||
agentId?: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionTarget?: AgentRunSessionTarget;
|
||||
},
|
||||
>(params: T): T {
|
||||
const target = params.sessionTarget;
|
||||
if (!target) {
|
||||
return params;
|
||||
}
|
||||
return {
|
||||
...params,
|
||||
agentId: normalizeOptionalString(target.agentId) ?? params.agentId,
|
||||
sessionId: normalizeOptionalString(target.sessionId) ?? params.sessionId,
|
||||
sessionKey: normalizeOptionalString(target.sessionKey) ?? params.sessionKey,
|
||||
};
|
||||
}
|
||||
@@ -1,5 +1,3 @@
|
||||
// Subagent spawn context tests cover isolated, forked, lightweight, and
|
||||
// thread-bound bootstrap context preparation for child sessions.
|
||||
import path from "node:path";
|
||||
import { MAX_TIMER_TIMEOUT_MS } from "@openclaw/normalization-core/number-coercion";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
@@ -14,7 +12,9 @@ type GatewayRequest = { method?: string; params?: Record<string, unknown> };
|
||||
describe("sessions_spawn context modes", () => {
|
||||
const storePath = "/tmp/subagent-context-session-store.json";
|
||||
const callGatewayMock = vi.fn();
|
||||
const loadSessionStoreMock = vi.fn();
|
||||
const updateSessionStoreMock = vi.fn();
|
||||
const forkSessionEntryFromParentMock = vi.fn();
|
||||
const forkSessionFromParentMock = vi.fn();
|
||||
const ensureContextEnginesInitializedMock = vi.fn();
|
||||
const resolveContextEngineMock = vi.fn();
|
||||
@@ -25,7 +25,9 @@ describe("sessions_spawn context modes", () => {
|
||||
beforeAll(async () => {
|
||||
({ spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({
|
||||
callGatewayMock,
|
||||
loadSessionStoreMock,
|
||||
updateSessionStoreMock,
|
||||
forkSessionEntryFromParentMock,
|
||||
forkSessionFromParentMock,
|
||||
ensureContextEnginesInitializedMock,
|
||||
resolveContextEngineMock,
|
||||
@@ -35,7 +37,9 @@ describe("sessions_spawn context modes", () => {
|
||||
|
||||
beforeEach(() => {
|
||||
callGatewayMock.mockReset();
|
||||
loadSessionStoreMock.mockReset();
|
||||
updateSessionStoreMock.mockReset();
|
||||
forkSessionEntryFromParentMock.mockReset();
|
||||
forkSessionFromParentMock.mockReset();
|
||||
ensureContextEnginesInitializedMock.mockReset();
|
||||
resolveContextEngineMock.mockReset();
|
||||
@@ -44,14 +48,78 @@ describe("sessions_spawn context modes", () => {
|
||||
});
|
||||
|
||||
function usePersistentStoreMock(store: SessionStore) {
|
||||
// The spawn path mutates the session store in-place; this mock keeps that
|
||||
// contract visible without touching disk.
|
||||
loadSessionStoreMock.mockReturnValue(store);
|
||||
updateSessionStoreMock.mockImplementation(async (_storePath: unknown, mutator: unknown) => {
|
||||
if (typeof mutator !== "function") {
|
||||
throw new Error("missing session store mutator");
|
||||
}
|
||||
return await mutator(store);
|
||||
});
|
||||
forkSessionEntryFromParentMock.mockImplementation(
|
||||
async (params: {
|
||||
agentId: string;
|
||||
fallbackEntry?: Record<string, unknown>;
|
||||
parentStoreKeys?: string[];
|
||||
sessionKey: string;
|
||||
sessionsDir?: string;
|
||||
}) => {
|
||||
const parentEntry = params.parentStoreKeys
|
||||
?.map((key) => store[key])
|
||||
.find((entry): entry is Record<string, unknown> => Boolean(entry));
|
||||
const maxTokens = 100_000;
|
||||
const parentTokens = parentEntry?.totalTokens;
|
||||
if (
|
||||
typeof parentTokens === "number" &&
|
||||
Number.isFinite(parentTokens) &&
|
||||
parentTokens > maxTokens
|
||||
) {
|
||||
const sessionEntry = {
|
||||
...params.fallbackEntry,
|
||||
...store[params.sessionKey],
|
||||
};
|
||||
return {
|
||||
status: "skipped",
|
||||
reason: "decision-skip",
|
||||
parentEntry,
|
||||
sessionEntry,
|
||||
decision: {
|
||||
status: "skip",
|
||||
reason: "parent-too-large",
|
||||
maxTokens,
|
||||
parentTokens,
|
||||
message: `Parent context is too large to fork (${parentTokens}/${maxTokens} tokens); starting with isolated context instead.`,
|
||||
},
|
||||
};
|
||||
}
|
||||
const fork = await forkSessionFromParentMock({
|
||||
parentEntry,
|
||||
agentId: params.agentId,
|
||||
sessionsDir: params.sessionsDir,
|
||||
});
|
||||
if (!fork) {
|
||||
return { status: "failed" };
|
||||
}
|
||||
const sessionEntry = {
|
||||
...params.fallbackEntry,
|
||||
...store[params.sessionKey],
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
};
|
||||
store[params.sessionKey] = sessionEntry;
|
||||
return {
|
||||
status: "forked",
|
||||
fork,
|
||||
parentEntry,
|
||||
sessionEntry,
|
||||
decision: {
|
||||
status: "fork",
|
||||
maxTokens,
|
||||
...(typeof parentTokens === "number" ? { parentTokens } : {}),
|
||||
},
|
||||
};
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function requireAcceptedResult(result: Awaited<ReturnType<typeof spawnSubagentDirect>>) {
|
||||
@@ -202,8 +270,6 @@ describe("sessions_spawn context modes", () => {
|
||||
});
|
||||
|
||||
it("falls back to isolated context when requested fork is too large", async () => {
|
||||
// Forking very large transcripts would create expensive child context, so
|
||||
// the accepted run records the downgrade in its note.
|
||||
const store: SessionStore = {
|
||||
main: {
|
||||
sessionId: "parent-session-id",
|
||||
|
||||
@@ -10,6 +10,7 @@ export {
|
||||
export { getRuntimeConfig } from "../config/config.js";
|
||||
export { loadSessionStore, mergeSessionEntry, updateSessionStore } from "../config/sessions.js";
|
||||
export {
|
||||
forkSessionEntryFromParent,
|
||||
forkSessionFromParent,
|
||||
resolveParentForkDecision,
|
||||
type ParentForkDecision,
|
||||
|
||||
@@ -134,6 +134,7 @@ export async function loadSubagentSpawnModuleForTest(params: {
|
||||
loadSessionStoreMock?: MockFn;
|
||||
ensureContextEnginesInitializedMock?: MockFn;
|
||||
updateSessionStoreMock?: MockFn;
|
||||
forkSessionEntryFromParentMock?: MockFn;
|
||||
forkSessionFromParentMock?: MockFn;
|
||||
resolveContextEngineMock?: MockFn;
|
||||
resolveParentForkDecisionMock?: MockFn;
|
||||
@@ -215,6 +216,36 @@ export async function loadSubagentSpawnModuleForTest(params: {
|
||||
params.dispatchGatewayMethodInProcessMock?.(...args),
|
||||
hasInProcessGatewayContext: () => Boolean(params.hasInProcessGatewayContextMock?.()),
|
||||
buildSubagentSystemPrompt: () => "system-prompt",
|
||||
forkSessionEntryFromParent:
|
||||
params.forkSessionEntryFromParentMock ??
|
||||
(async () => {
|
||||
const fork = (
|
||||
params.forkSessionFromParentMock
|
||||
? await params.forkSessionFromParentMock()
|
||||
: { sessionId: "forked-session-id", sessionFile: "/tmp/forked-session.jsonl" }
|
||||
) as { sessionId: string; sessionFile: string } | null;
|
||||
if (!fork) {
|
||||
return { status: "failed" };
|
||||
}
|
||||
return {
|
||||
status: "forked",
|
||||
fork,
|
||||
parentEntry: {
|
||||
sessionId: "parent-session-id",
|
||||
sessionFile: "/tmp/parent-session.jsonl",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
sessionEntry: {
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
},
|
||||
decision: {
|
||||
status: "fork",
|
||||
maxTokens: 100_000,
|
||||
},
|
||||
};
|
||||
}),
|
||||
forkSessionFromParent:
|
||||
params.forkSessionFromParentMock ??
|
||||
(async () => ({ sessionId: "forked-session-id", sessionFile: "/tmp/forked-session.jsonl" })),
|
||||
|
||||
@@ -88,7 +88,7 @@ import {
|
||||
callGateway,
|
||||
dispatchGatewayMethodInProcess,
|
||||
emitSessionLifecycleEvent,
|
||||
forkSessionFromParent,
|
||||
forkSessionEntryFromParent,
|
||||
getGlobalHookRunner,
|
||||
getSessionBindingService,
|
||||
getRuntimeConfig,
|
||||
@@ -99,7 +99,6 @@ import {
|
||||
normalizeDeliveryContext,
|
||||
pruneLegacyStoreKeys,
|
||||
ensureContextEnginesInitialized,
|
||||
resolveParentForkDecision,
|
||||
resolveAgentConfig,
|
||||
resolveContextEngine,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
@@ -133,26 +132,24 @@ function resolveConfiguredAgentIds(cfg: OpenClawConfig): string[] {
|
||||
type SubagentSpawnDeps = {
|
||||
callGateway: typeof callGateway;
|
||||
dispatchGatewayMethodInProcess: typeof dispatchGatewayMethodInProcess;
|
||||
forkSessionFromParent: typeof forkSessionFromParent;
|
||||
forkSessionEntryFromParent: typeof forkSessionEntryFromParent;
|
||||
getGlobalHookRunner: () => SubagentLifecycleHookRunner | null;
|
||||
getRuntimeConfig: typeof getRuntimeConfig;
|
||||
hasInProcessGatewayContext: typeof hasInProcessGatewayContext;
|
||||
ensureContextEnginesInitialized: typeof ensureContextEnginesInitialized;
|
||||
resolveContextEngine: typeof resolveContextEngine;
|
||||
resolveParentForkDecision: typeof resolveParentForkDecision;
|
||||
updateSessionStore: typeof updateSessionStore;
|
||||
};
|
||||
|
||||
const defaultSubagentSpawnDeps: SubagentSpawnDeps = {
|
||||
callGateway,
|
||||
dispatchGatewayMethodInProcess,
|
||||
forkSessionFromParent,
|
||||
forkSessionEntryFromParent,
|
||||
getGlobalHookRunner,
|
||||
getRuntimeConfig,
|
||||
hasInProcessGatewayContext,
|
||||
ensureContextEnginesInitialized,
|
||||
resolveContextEngine,
|
||||
resolveParentForkDecision,
|
||||
updateSessionStore,
|
||||
};
|
||||
|
||||
@@ -510,52 +507,45 @@ async function prepareSubagentSessionContext(params: {
|
||||
const sessionsDir = path.dirname(parentTarget.storePath);
|
||||
|
||||
try {
|
||||
const forked = (await updateSubagentSessionStore(childTarget.storePath, async (store) => {
|
||||
parentEntry = resolveStoreEntryByKeys(store, parentTarget.storeKeys);
|
||||
childEntry = resolveStoreEntryByKeys(store, childTarget.storeKeys);
|
||||
if (params.targetAgentId !== params.requesterAgentId) {
|
||||
throw new Error(
|
||||
'context="fork" currently requires the same target agent as the requester; use context="isolated" for cross-agent spawns.',
|
||||
);
|
||||
}
|
||||
|
||||
if (params.targetAgentId !== params.requesterAgentId) {
|
||||
throw new Error(
|
||||
'context="fork" currently requires the same target agent as the requester; use context="isolated" for cross-agent spawns.',
|
||||
);
|
||||
}
|
||||
if (!parentEntry?.sessionId) {
|
||||
throw new Error(
|
||||
'context="fork" requested but the requester session transcript is not available.',
|
||||
);
|
||||
}
|
||||
const forkDecision = await subagentSpawnDeps.resolveParentForkDecision({
|
||||
parentEntry,
|
||||
storePath: parentTarget.storePath,
|
||||
});
|
||||
if (forkDecision.status === "skip") {
|
||||
forkFallbackNote = forkDecision.message;
|
||||
return null;
|
||||
}
|
||||
|
||||
const fork = await subagentSpawnDeps.forkSessionFromParent({
|
||||
parentEntry,
|
||||
agentId: params.requesterAgentId,
|
||||
sessionsDir,
|
||||
});
|
||||
if (!fork) {
|
||||
throw new Error(
|
||||
'context="fork" requested but OpenClaw could not fork the requester transcript.',
|
||||
);
|
||||
}
|
||||
pruneLegacyStoreKeys({
|
||||
store,
|
||||
canonicalKey: childTarget.canonicalKey,
|
||||
candidates: childTarget.storeKeys,
|
||||
});
|
||||
store[childTarget.canonicalKey] = mergeSessionEntry(store[childTarget.canonicalKey], {
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
});
|
||||
childEntry = store[childTarget.canonicalKey];
|
||||
return fork;
|
||||
})) as { sessionId: string; sessionFile: string } | null;
|
||||
const forkedResult = await subagentSpawnDeps.forkSessionEntryFromParent({
|
||||
storePath: childTarget.storePath,
|
||||
parentSessionKey: parentTarget.canonicalKey,
|
||||
parentStoreKeys: parentTarget.storeKeys,
|
||||
sessionKey: childTarget.canonicalKey,
|
||||
sessionStoreKeys: childTarget.storeKeys,
|
||||
fallbackEntry: { sessionId: "", updatedAt: Date.now() },
|
||||
agentId: params.requesterAgentId,
|
||||
sessionsDir,
|
||||
});
|
||||
if (forkedResult.status === "missing-parent") {
|
||||
throw new Error(
|
||||
'context="fork" requested but the requester session transcript is not available.',
|
||||
);
|
||||
}
|
||||
if (forkedResult.status === "failed" || forkedResult.status === "missing-entry") {
|
||||
throw new Error(
|
||||
'context="fork" requested but OpenClaw could not fork the requester transcript.',
|
||||
);
|
||||
}
|
||||
parentEntry = forkedResult.parentEntry;
|
||||
childEntry = forkedResult.sessionEntry;
|
||||
if (forkedResult.status === "skipped") {
|
||||
forkFallbackNote =
|
||||
forkedResult.decision?.status === "skip" ? forkedResult.decision.message : undefined;
|
||||
}
|
||||
const forked =
|
||||
forkedResult.status === "forked"
|
||||
? {
|
||||
sessionId: forkedResult.fork.sessionId,
|
||||
sessionFile: forkedResult.fork.sessionFile,
|
||||
}
|
||||
: null;
|
||||
|
||||
if (params.contextMode === "fork") {
|
||||
if (!parentEntry || !forked) {
|
||||
|
||||
87
src/auto-reply/reply/session-fork.test.ts
Normal file
87
src/auto-reply/reply/session-fork.test.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
// Tests parent-session fork facade storage-boundary behavior.
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { forkSessionEntryFromParent } from "./session-fork.js";
|
||||
|
||||
const runtimeMocks = vi.hoisted(() => ({
|
||||
forkSessionFromParentRuntime: vi.fn(),
|
||||
resolveParentForkTokenCountRuntime: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./session-fork.runtime.js", () => runtimeMocks);
|
||||
|
||||
const roots: string[] = [];
|
||||
|
||||
async function makeRoot(prefix: string): Promise<string> {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
|
||||
roots.push(root);
|
||||
return root;
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
runtimeMocks.forkSessionFromParentRuntime.mockReset();
|
||||
runtimeMocks.resolveParentForkTokenCountRuntime.mockReset();
|
||||
await Promise.all(roots.splice(0).map((root) => fs.rm(root, { recursive: true, force: true })));
|
||||
});
|
||||
|
||||
describe("forkSessionEntryFromParent", () => {
|
||||
it("forks transcripts in the directory for the store being mutated", async () => {
|
||||
const root = await makeRoot("openclaw-session-fork-boundary-");
|
||||
const activeStoreDir = path.join(root, "active-store");
|
||||
const configStoreDir = path.join(root, "config-store");
|
||||
await fs.mkdir(activeStoreDir, { recursive: true });
|
||||
await fs.mkdir(configStoreDir, { recursive: true });
|
||||
const storePath = path.join(activeStoreDir, "sessions.json");
|
||||
const configStorePath = path.join(configStoreDir, "sessions.json");
|
||||
const parentSessionKey = "agent:main:main";
|
||||
const sessionKey = "agent:main:subagent:child";
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[parentSessionKey]: {
|
||||
sessionId: "parent-session",
|
||||
sessionFile: path.join(activeStoreDir, "parent.jsonl"),
|
||||
updatedAt: 1,
|
||||
},
|
||||
[sessionKey]: { sessionId: "", updatedAt: 2 },
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
runtimeMocks.resolveParentForkTokenCountRuntime.mockResolvedValue(10);
|
||||
runtimeMocks.forkSessionFromParentRuntime.mockImplementation(
|
||||
async ({ sessionsDir }: { sessionsDir: string }) => ({
|
||||
sessionId: "forked-session",
|
||||
sessionFile: path.join(sessionsDir, "forked-session.jsonl"),
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await forkSessionEntryFromParent({
|
||||
agentId: "main",
|
||||
config: { session: { store: configStorePath } } as OpenClawConfig,
|
||||
fallbackEntry: { sessionId: "", updatedAt: 2 },
|
||||
parentSessionKey,
|
||||
sessionKey,
|
||||
storePath,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("forked");
|
||||
expect(runtimeMocks.forkSessionFromParentRuntime).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionsDir: activeStoreDir,
|
||||
}),
|
||||
);
|
||||
const stored = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
{ sessionFile?: string }
|
||||
>;
|
||||
expect(stored[sessionKey]?.sessionFile).toBe(path.join(activeStoreDir, "forked-session.jsonl"));
|
||||
});
|
||||
});
|
||||
@@ -1,5 +1,8 @@
|
||||
/** Public session-fork facade with parent-size admission checks. */
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import path from "node:path";
|
||||
import { resolveStorePath } from "../../config/sessions/paths.js";
|
||||
import { updateSessionStore } from "../../config/sessions/store.js";
|
||||
import { mergeSessionEntry, type SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
|
||||
/**
|
||||
@@ -10,7 +13,6 @@ import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000;
|
||||
const sessionForkRuntimeLoader = createLazyImportLoader(() => import("./session-fork.runtime.js"));
|
||||
|
||||
/** Decision for whether a child session should fork parent context or start isolated. */
|
||||
export type ParentForkDecision =
|
||||
| {
|
||||
status: "fork";
|
||||
@@ -25,6 +27,66 @@ export type ParentForkDecision =
|
||||
message: string;
|
||||
};
|
||||
|
||||
type ParentForkDecisionParams = {
|
||||
parentEntry: SessionEntry;
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
storePath?: string;
|
||||
};
|
||||
|
||||
type ForkSessionFromParentParams = {
|
||||
parentEntry: SessionEntry;
|
||||
agentId: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionsDir?: string;
|
||||
};
|
||||
|
||||
export type ForkedParentSessionEntry = {
|
||||
sessionId: string;
|
||||
sessionFile: string;
|
||||
};
|
||||
|
||||
export type ForkSessionEntryFromParentResult =
|
||||
| {
|
||||
status: "forked";
|
||||
fork: ForkedParentSessionEntry;
|
||||
parentEntry: SessionEntry;
|
||||
sessionEntry: SessionEntry;
|
||||
decision: Extract<ParentForkDecision, { status: "fork" }>;
|
||||
}
|
||||
| {
|
||||
status: "skipped";
|
||||
reason: "existing-entry" | "decision-skip";
|
||||
parentEntry?: SessionEntry;
|
||||
sessionEntry: SessionEntry;
|
||||
decision?: ParentForkDecision;
|
||||
}
|
||||
| { status: "missing-entry" }
|
||||
| { status: "missing-parent" }
|
||||
| { status: "failed" };
|
||||
|
||||
export type ForkSessionEntryFromParentParams = Omit<ForkSessionFromParentParams, "parentEntry"> & {
|
||||
parentSessionKey: string;
|
||||
parentStoreKeys?: readonly string[];
|
||||
sessionKey: string;
|
||||
sessionStoreKeys?: readonly string[];
|
||||
storePath?: string;
|
||||
fallbackEntry?: SessionEntry;
|
||||
patch?: (params: {
|
||||
entry: SessionEntry;
|
||||
parentEntry: SessionEntry;
|
||||
fork: ForkedParentSessionEntry;
|
||||
decision: Extract<ParentForkDecision, { status: "fork" }>;
|
||||
}) => Partial<SessionEntry>;
|
||||
skipForkWhen?: (entry: SessionEntry) => boolean;
|
||||
skipPatch?: (entry: SessionEntry) => Partial<SessionEntry> | null;
|
||||
decisionSkipPatch?: (params: {
|
||||
decision: Extract<ParentForkDecision, { status: "skip" }>;
|
||||
entry: SessionEntry;
|
||||
parentEntry: SessionEntry;
|
||||
}) => Partial<SessionEntry> | null;
|
||||
};
|
||||
|
||||
function loadSessionForkRuntime(): Promise<typeof import("./session-fork.runtime.js")> {
|
||||
return sessionForkRuntimeLoader.load();
|
||||
}
|
||||
@@ -39,15 +101,31 @@ function formatParentForkTooLargeMessage(params: {
|
||||
);
|
||||
}
|
||||
|
||||
/** Decides whether parent context is small enough to fork into a child session. */
|
||||
export async function resolveParentForkDecision(params: {
|
||||
parentEntry: SessionEntry;
|
||||
storePath: string;
|
||||
}): Promise<ParentForkDecision> {
|
||||
function resolveParentForkStorePath(params: {
|
||||
agentId?: string;
|
||||
config?: OpenClawConfig;
|
||||
storePath?: string;
|
||||
}): string {
|
||||
return (
|
||||
params.storePath ?? resolveStorePath(params.config?.session?.store, { agentId: params.agentId })
|
||||
);
|
||||
}
|
||||
|
||||
function resolveParentForkSessionsDir(params: {
|
||||
agentId: string;
|
||||
config?: OpenClawConfig;
|
||||
sessionsDir?: string;
|
||||
}): string {
|
||||
return params.sessionsDir ?? path.dirname(resolveParentForkStorePath(params));
|
||||
}
|
||||
|
||||
export async function resolveParentForkDecision(
|
||||
params: ParentForkDecisionParams,
|
||||
): Promise<ParentForkDecision> {
|
||||
const maxTokens = DEFAULT_PARENT_FORK_MAX_TOKENS;
|
||||
const parentTokens = await resolveParentForkTokenCount({
|
||||
parentEntry: params.parentEntry,
|
||||
storePath: params.storePath,
|
||||
storePath: resolveParentForkStorePath(params),
|
||||
});
|
||||
if (typeof parentTokens === "number" && parentTokens > maxTokens) {
|
||||
return {
|
||||
@@ -65,14 +143,151 @@ export async function resolveParentForkDecision(params: {
|
||||
};
|
||||
}
|
||||
|
||||
/** Forks a new session transcript from a parent session. */
|
||||
export async function forkSessionFromParent(params: {
|
||||
parentEntry: SessionEntry;
|
||||
agentId: string;
|
||||
sessionsDir: string;
|
||||
}): Promise<{ sessionId: string; sessionFile: string } | null> {
|
||||
export async function forkSessionFromParent(
|
||||
params: ForkSessionFromParentParams,
|
||||
): Promise<{ sessionId: string; sessionFile: string } | null> {
|
||||
const runtime = await loadSessionForkRuntime();
|
||||
return runtime.forkSessionFromParentRuntime(params);
|
||||
return runtime.forkSessionFromParentRuntime({
|
||||
...params,
|
||||
sessionsDir: resolveParentForkSessionsDir(params),
|
||||
});
|
||||
}
|
||||
|
||||
function resolveEntryFromStoreKeys(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
keys: readonly string[];
|
||||
}): SessionEntry | undefined {
|
||||
for (const key of params.keys) {
|
||||
const entry = params.store[key];
|
||||
if (entry) {
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function persistForkedSessionEntry(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
sessionKey: string;
|
||||
sessionStoreKeys?: readonly string[];
|
||||
existing: SessionEntry;
|
||||
patch: Partial<SessionEntry>;
|
||||
}): SessionEntry {
|
||||
const next = mergeSessionEntry(params.existing, params.patch);
|
||||
params.store[params.sessionKey] = next;
|
||||
for (const key of params.sessionStoreKeys ?? []) {
|
||||
if (key !== params.sessionKey) {
|
||||
delete params.store[key];
|
||||
}
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
|
||||
* Forks the parent transcript and persists the child session entry through one
|
||||
* storage boundary operation.
|
||||
*/
|
||||
export async function forkSessionEntryFromParent(
|
||||
params: ForkSessionEntryFromParentParams,
|
||||
): Promise<ForkSessionEntryFromParentResult> {
|
||||
const storePath = resolveParentForkStorePath(params);
|
||||
return await updateSessionStore(
|
||||
storePath,
|
||||
async (store) => {
|
||||
const parentEntry = resolveEntryFromStoreKeys({
|
||||
store,
|
||||
keys: params.parentStoreKeys ?? [params.parentSessionKey],
|
||||
});
|
||||
if (!parentEntry?.sessionId) {
|
||||
return { status: "missing-parent" };
|
||||
}
|
||||
|
||||
const entry =
|
||||
resolveEntryFromStoreKeys({
|
||||
store,
|
||||
keys: params.sessionStoreKeys ?? [params.sessionKey],
|
||||
}) ?? params.fallbackEntry;
|
||||
if (!entry) {
|
||||
return { status: "missing-entry" };
|
||||
}
|
||||
|
||||
if (params.skipForkWhen?.(entry)) {
|
||||
const patch = params.skipPatch?.(entry);
|
||||
const sessionEntry = patch
|
||||
? persistForkedSessionEntry({
|
||||
store,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionStoreKeys: params.sessionStoreKeys,
|
||||
existing: entry,
|
||||
patch,
|
||||
})
|
||||
: entry;
|
||||
return { status: "skipped", reason: "existing-entry", parentEntry, sessionEntry };
|
||||
}
|
||||
|
||||
const decision = await resolveParentForkDecision({
|
||||
parentEntry,
|
||||
agentId: params.agentId,
|
||||
config: params.config,
|
||||
storePath,
|
||||
});
|
||||
if (decision.status === "skip") {
|
||||
const patch = params.decisionSkipPatch?.({ decision, entry, parentEntry });
|
||||
const sessionEntry = patch
|
||||
? persistForkedSessionEntry({
|
||||
store,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionStoreKeys: params.sessionStoreKeys,
|
||||
existing: entry,
|
||||
patch,
|
||||
})
|
||||
: entry;
|
||||
return {
|
||||
status: "skipped",
|
||||
reason: "decision-skip",
|
||||
parentEntry,
|
||||
sessionEntry,
|
||||
decision,
|
||||
};
|
||||
}
|
||||
|
||||
const fork = await forkSessionFromParent({
|
||||
parentEntry,
|
||||
agentId: params.agentId,
|
||||
config: params.config,
|
||||
sessionsDir: params.sessionsDir ?? path.dirname(storePath),
|
||||
});
|
||||
if (!fork) {
|
||||
return { status: "failed" };
|
||||
}
|
||||
const sessionEntry = persistForkedSessionEntry({
|
||||
store,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionStoreKeys: params.sessionStoreKeys,
|
||||
existing: entry,
|
||||
patch: {
|
||||
...params.patch?.({ entry, parentEntry, fork, decision }),
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
},
|
||||
});
|
||||
return {
|
||||
status: "forked",
|
||||
fork,
|
||||
parentEntry,
|
||||
sessionEntry,
|
||||
decision,
|
||||
};
|
||||
},
|
||||
{
|
||||
skipSaveWhenResult: (result) =>
|
||||
result.status === "missing-entry" ||
|
||||
result.status === "missing-parent" ||
|
||||
result.status === "failed" ||
|
||||
(result.status === "skipped" && result.sessionEntry === params.fallbackEntry),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async function resolveParentForkTokenCount(params: {
|
||||
|
||||
@@ -50,6 +50,97 @@ type ForkSessionParamsForTest = {
|
||||
};
|
||||
|
||||
vi.mock("./session-fork.js", () => ({
|
||||
forkSessionEntryFromParent: async (params: {
|
||||
fallbackEntry?: SessionEntry;
|
||||
parentSessionKey: string;
|
||||
storePath: string;
|
||||
patch?: (patchParams: {
|
||||
entry: SessionEntry;
|
||||
parentEntry: SessionEntry;
|
||||
fork: { sessionId: string; sessionFile: string };
|
||||
decision: { status: "fork"; maxTokens: number; parentTokens?: number };
|
||||
}) => Partial<SessionEntry>;
|
||||
decisionSkipPatch?: (patchParams: {
|
||||
decision: {
|
||||
status: "skip";
|
||||
reason: "parent-too-large";
|
||||
maxTokens: number;
|
||||
parentTokens: number;
|
||||
message: string;
|
||||
};
|
||||
entry: SessionEntry;
|
||||
parentEntry: SessionEntry;
|
||||
}) => Partial<SessionEntry>;
|
||||
sessionsDir: string;
|
||||
}) => {
|
||||
const store = JSON.parse(await fs.readFile(params.storePath, "utf-8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
const parentEntry = store[params.parentSessionKey];
|
||||
if (!parentEntry?.sessionId) {
|
||||
return { status: "missing-parent" };
|
||||
}
|
||||
const maxTokens = 100_000;
|
||||
const parentTokens = await sessionForkMocks.resolveParentForkTokenCount({
|
||||
parentEntry,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
if (typeof parentTokens === "number" && parentTokens > maxTokens) {
|
||||
const entry = params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() };
|
||||
const decision = {
|
||||
status: "skip" as const,
|
||||
reason: "parent-too-large" as const,
|
||||
maxTokens,
|
||||
parentTokens,
|
||||
message: `Parent context is too large to fork (${parentTokens}/${maxTokens} tokens); starting with isolated context instead.`,
|
||||
};
|
||||
return {
|
||||
status: "skipped",
|
||||
reason: "decision-skip",
|
||||
parentEntry,
|
||||
sessionEntry: {
|
||||
...entry,
|
||||
...params.decisionSkipPatch?.({ decision, entry, parentEntry }),
|
||||
},
|
||||
decision,
|
||||
};
|
||||
}
|
||||
const fork = await sessionForkMocks.forkSessionFromParent({
|
||||
parentEntry,
|
||||
sessionsDir: params.sessionsDir,
|
||||
});
|
||||
if (!fork) {
|
||||
return { status: "failed" };
|
||||
}
|
||||
const entry = params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() };
|
||||
return {
|
||||
status: "forked",
|
||||
fork,
|
||||
parentEntry,
|
||||
sessionEntry: {
|
||||
...entry,
|
||||
...params.patch?.({
|
||||
entry,
|
||||
parentEntry,
|
||||
fork,
|
||||
decision: {
|
||||
status: "fork",
|
||||
maxTokens,
|
||||
...(typeof parentTokens === "number" ? { parentTokens } : {}),
|
||||
},
|
||||
}),
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
},
|
||||
decision: {
|
||||
status: "fork",
|
||||
maxTokens,
|
||||
...(typeof parentTokens === "number" ? { parentTokens } : {}),
|
||||
},
|
||||
};
|
||||
},
|
||||
forkSessionFromParent: (...args: [ForkSessionParamsForTest]) =>
|
||||
sessionForkMocks.forkSessionFromParent(...args),
|
||||
resolveParentForkTokenCount: (...args: [{ parentEntry: SessionEntry; storePath: string }]) =>
|
||||
|
||||
@@ -73,7 +73,7 @@ import {
|
||||
resolveLastChannelRaw,
|
||||
resolveLastToRaw,
|
||||
} from "./session-delivery.js";
|
||||
import { forkSessionFromParent, resolveParentForkDecision } from "./session-fork.js";
|
||||
import { forkSessionEntryFromParent } from "./session-fork.js";
|
||||
import { buildSessionEndHookPayload, buildSessionStartHookPayload } from "./session-hooks.js";
|
||||
import { clearSessionResetRuntimeState } from "./session-reset-cleanup.js";
|
||||
|
||||
@@ -753,47 +753,39 @@ export async function initSessionState(params: {
|
||||
const parentSessionKey = normalizeOptionalString(ctx.ParentSessionKey);
|
||||
const alreadyForked = sessionEntry.forkedFromParent === true;
|
||||
let inheritedParentContext = false;
|
||||
if (
|
||||
parentSessionKey &&
|
||||
parentSessionKey !== sessionKey &&
|
||||
sessionStore[parentSessionKey] &&
|
||||
!alreadyForked
|
||||
) {
|
||||
const parentEntry = sessionStore[parentSessionKey];
|
||||
const forkDecision = await resolveParentForkDecision({
|
||||
parentEntry,
|
||||
if (parentSessionKey && parentSessionKey !== sessionKey && !alreadyForked) {
|
||||
const forked = await forkSessionEntryFromParent({
|
||||
parentSessionKey,
|
||||
sessionKey,
|
||||
storePath,
|
||||
fallbackEntry: sessionEntry,
|
||||
agentId,
|
||||
sessionsDir: path.dirname(storePath),
|
||||
decisionSkipPatch: () => ({ ...sessionEntry, forkedFromParent: true }),
|
||||
patch: () => ({
|
||||
...sessionEntry,
|
||||
totalTokens: undefined,
|
||||
totalTokensFresh: false,
|
||||
}),
|
||||
});
|
||||
if (forkDecision.status === "skip") {
|
||||
if (forked.status === "skipped" && forked.decision?.status === "skip") {
|
||||
// The parent branch is too large to inherit usefully. Start fresh and
|
||||
// mark as handled so the thread does not retry this decision every turn.
|
||||
log.warn(
|
||||
`skipping parent fork (parent too large): parentKey=${parentSessionKey} → sessionKey=${sessionKey} ` +
|
||||
`parentTokens=${forkDecision.parentTokens} maxTokens=${forkDecision.maxTokens}`,
|
||||
`parentTokens=${forked.decision.parentTokens} maxTokens=${forked.decision.maxTokens}`,
|
||||
);
|
||||
sessionEntry.forkedFromParent = true;
|
||||
} else {
|
||||
sessionEntry = forked.sessionEntry;
|
||||
} else if (forked.status === "forked") {
|
||||
log.warn(
|
||||
`forking from parent session: parentKey=${parentSessionKey} → sessionKey=${sessionKey} ` +
|
||||
`parentTokens=${forkDecision.parentTokens ?? "unknown"}`,
|
||||
`parentTokens=${forked.decision.parentTokens ?? "unknown"}`,
|
||||
);
|
||||
const forked = await forkSessionFromParent({
|
||||
parentEntry,
|
||||
agentId,
|
||||
sessionsDir: path.dirname(storePath),
|
||||
});
|
||||
if (forked) {
|
||||
sessionId = forked.sessionId;
|
||||
sessionEntry.sessionId = forked.sessionId;
|
||||
sessionEntry.sessionFile = forked.sessionFile;
|
||||
sessionEntry.forkedFromParent = true;
|
||||
// The fork replaces the target transcript with inherited parent
|
||||
// history, so any prior target-session token snapshot is stale.
|
||||
sessionEntry.totalTokens = undefined;
|
||||
sessionEntry.totalTokensFresh = false;
|
||||
inheritedParentContext = true;
|
||||
log.warn(`forked session created: file=${forked.sessionFile}`);
|
||||
}
|
||||
sessionId = forked.fork.sessionId;
|
||||
sessionEntry = forked.sessionEntry;
|
||||
sessionEntry.forkedFromParent = true;
|
||||
inheritedParentContext = true;
|
||||
log.warn(`forked session created: file=${forked.fork.sessionFile}`);
|
||||
}
|
||||
}
|
||||
const threadIdFromSessionKey = parseSessionThreadInfoFast(
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
// restore/preview/send flows over session stores, transcripts, and active runs.
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
@@ -86,7 +85,7 @@ import {
|
||||
import { ADMIN_SCOPE } from "../operator-scopes.js";
|
||||
import { resolveSessionKeyForRun } from "../server-session-key.js";
|
||||
import {
|
||||
forkCompactionCheckpointTranscriptAsync,
|
||||
createFileBackedCompactionCheckpointStore,
|
||||
getSessionCompactionCheckpoint,
|
||||
listSessionCompactionCheckpoints,
|
||||
} from "../session-compaction-checkpoints.js";
|
||||
@@ -136,6 +135,8 @@ import type {
|
||||
} from "./types.js";
|
||||
import { assertValidParams } from "./validation.js";
|
||||
|
||||
const compactionCheckpointStore = createFileBackedCompactionCheckpointStore();
|
||||
|
||||
function filterSessionStoreToConfiguredAgents(
|
||||
cfg: OpenClawConfig,
|
||||
store: Record<string, SessionEntry>,
|
||||
@@ -350,74 +351,6 @@ function buildDashboardSessionKey(agentId: string): string {
|
||||
return `agent:${agentId}:dashboard:${randomUUID()}`;
|
||||
}
|
||||
|
||||
function cloneCheckpointSessionEntry(params: {
|
||||
currentEntry: SessionEntry;
|
||||
nextSessionId: string;
|
||||
nextSessionFile: string;
|
||||
label?: string;
|
||||
parentSessionKey?: string;
|
||||
totalTokens?: number;
|
||||
preserveCompactionCheckpoints?: boolean;
|
||||
}): SessionEntry {
|
||||
return {
|
||||
...params.currentEntry,
|
||||
sessionId: params.nextSessionId,
|
||||
sessionFile: params.nextSessionFile,
|
||||
updatedAt: Date.now(),
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
startedAt: undefined,
|
||||
endedAt: undefined,
|
||||
runtimeMs: undefined,
|
||||
status: undefined,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
estimatedCostUsd: undefined,
|
||||
totalTokens:
|
||||
typeof params.totalTokens === "number" && Number.isFinite(params.totalTokens)
|
||||
? params.totalTokens
|
||||
: undefined,
|
||||
totalTokensFresh:
|
||||
typeof params.totalTokens === "number" && Number.isFinite(params.totalTokens)
|
||||
? true
|
||||
: undefined,
|
||||
label: params.label ?? params.currentEntry.label,
|
||||
parentSessionKey: params.parentSessionKey ?? params.currentEntry.parentSessionKey,
|
||||
compactionCheckpoints: params.preserveCompactionCheckpoints
|
||||
? params.currentEntry.compactionCheckpoints
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveCheckpointForkSource(
|
||||
checkpoint: NonNullable<ReturnType<typeof getSessionCompactionCheckpoint>>,
|
||||
): { sourceFile: string; sourceLeafId?: string; totalTokens?: number } | null {
|
||||
const preCompactionFile = checkpoint.preCompaction.sessionFile?.trim();
|
||||
if (preCompactionFile) {
|
||||
return {
|
||||
sourceFile: preCompactionFile,
|
||||
sourceLeafId: checkpoint.preCompaction.entryId ?? checkpoint.preCompaction.leafId,
|
||||
totalTokens: checkpoint.tokensBefore,
|
||||
};
|
||||
}
|
||||
const postCompactionFile = checkpoint.postCompaction.sessionFile?.trim();
|
||||
if (!postCompactionFile) {
|
||||
return null;
|
||||
}
|
||||
const postCompactionLeafId =
|
||||
checkpoint.postCompaction.entryId ?? checkpoint.postCompaction.leafId;
|
||||
if (!postCompactionLeafId) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
sourceFile: postCompactionFile,
|
||||
sourceLeafId: postCompactionLeafId,
|
||||
totalTokens: checkpoint.tokensAfter,
|
||||
};
|
||||
}
|
||||
|
||||
function isAgentMainSessionKey(cfg: OpenClawConfig, sessionKey: string): boolean {
|
||||
const parsed = parseAgentSessionKey(sessionKey);
|
||||
if (!parsed) {
|
||||
@@ -1663,7 +1596,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const loaded = loadSessionEntry(key, { agentId: requestedAgent.agentId });
|
||||
const { cfg: loadedCfg, entry, canonicalKey } = loaded;
|
||||
const { cfg: loadedCfg, entry, canonicalKey, legacyKey } = loaded;
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
cfg: loadedCfg,
|
||||
key: canonicalKey,
|
||||
@@ -1678,8 +1611,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const checkpoint = getSessionCompactionCheckpoint({ entry, checkpointId });
|
||||
const forkSource = checkpoint ? resolveCheckpointForkSource(checkpoint) : null;
|
||||
if (!checkpoint || !forkSource) {
|
||||
if (!checkpoint) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -1687,12 +1619,34 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const branchedSession = await forkCompactionCheckpointTranscriptAsync({
|
||||
sourceFile: forkSource.sourceFile,
|
||||
sourceLeafId: forkSource.sourceLeafId,
|
||||
sessionDir: path.dirname(forkSource.sourceFile),
|
||||
const nextKey = buildDashboardSessionKey(target.agentId);
|
||||
const branchedSession = await compactionCheckpointStore.branchCheckpointSession({
|
||||
storePath: target.storePath,
|
||||
sourceKey: canonicalKey,
|
||||
sourceStoreKey: legacyKey,
|
||||
nextKey,
|
||||
checkpointId,
|
||||
});
|
||||
if (!branchedSession?.sessionFile) {
|
||||
if (
|
||||
branchedSession.status === "missing-checkpoint" ||
|
||||
branchedSession.status === "missing-boundary"
|
||||
) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, `checkpoint not found: ${checkpointId}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (branchedSession.status === "missing-session") {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (branchedSession.status === "failed") {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -1700,30 +1654,16 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const nextKey = buildDashboardSessionKey(target.agentId);
|
||||
const label = entry.label?.trim() ? `${entry.label.trim()} (checkpoint)` : "Checkpoint branch";
|
||||
const nextEntry = cloneCheckpointSessionEntry({
|
||||
currentEntry: entry,
|
||||
nextSessionId: branchedSession.sessionId,
|
||||
nextSessionFile: branchedSession.sessionFile,
|
||||
label,
|
||||
parentSessionKey: canonicalKey,
|
||||
totalTokens: forkSource.totalTokens,
|
||||
});
|
||||
|
||||
await updateSessionStore(target.storePath, (store) => {
|
||||
store[nextKey] = nextEntry;
|
||||
});
|
||||
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
ok: true,
|
||||
sourceKey: canonicalKey,
|
||||
key: nextKey,
|
||||
sessionId: nextEntry.sessionId,
|
||||
checkpoint,
|
||||
entry: nextEntry,
|
||||
key: branchedSession.key,
|
||||
sessionId: branchedSession.entry.sessionId,
|
||||
checkpoint: branchedSession.checkpoint,
|
||||
entry: branchedSession.entry,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
@@ -1735,7 +1675,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
reason: "checkpoint-branch",
|
||||
});
|
||||
emitSessionsChanged(context, {
|
||||
sessionKey: nextKey,
|
||||
sessionKey: branchedSession.key,
|
||||
reason: "checkpoint-branch",
|
||||
});
|
||||
},
|
||||
@@ -1778,7 +1718,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const loaded = loadSessionEntry(key, { agentId: requestedAgent.agentId });
|
||||
const { entry, canonicalKey, storePath } = loaded;
|
||||
const { entry, canonicalKey, legacyKey, storePath } = loaded;
|
||||
if (!entry?.sessionId) {
|
||||
respond(
|
||||
false,
|
||||
@@ -1788,8 +1728,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const checkpoint = getSessionCompactionCheckpoint({ entry, checkpointId });
|
||||
const forkSource = checkpoint ? resolveCheckpointForkSource(checkpoint) : null;
|
||||
if (!checkpoint || !forkSource) {
|
||||
if (!checkpoint) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -1812,12 +1751,32 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
|
||||
const restoredSession = await forkCompactionCheckpointTranscriptAsync({
|
||||
sourceFile: forkSource.sourceFile,
|
||||
sourceLeafId: forkSource.sourceLeafId,
|
||||
sessionDir: path.dirname(forkSource.sourceFile),
|
||||
const restoredSession = await compactionCheckpointStore.restoreCheckpointSession({
|
||||
storePath,
|
||||
sessionKey: canonicalKey,
|
||||
sessionStoreKey: legacyKey,
|
||||
checkpointId,
|
||||
});
|
||||
if (!restoredSession?.sessionFile) {
|
||||
if (
|
||||
restoredSession.status === "missing-checkpoint" ||
|
||||
restoredSession.status === "missing-boundary"
|
||||
) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, `checkpoint not found: ${checkpointId}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (restoredSession.status === "missing-session") {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (restoredSession.status === "failed") {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -1825,26 +1784,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const nextEntry = cloneCheckpointSessionEntry({
|
||||
currentEntry: entry,
|
||||
nextSessionId: restoredSession.sessionId,
|
||||
nextSessionFile: restoredSession.sessionFile,
|
||||
totalTokens: forkSource.totalTokens,
|
||||
preserveCompactionCheckpoints: true,
|
||||
});
|
||||
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store[canonicalKey] = nextEntry;
|
||||
});
|
||||
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
ok: true,
|
||||
key: canonicalKey,
|
||||
sessionId: nextEntry.sessionId,
|
||||
checkpoint,
|
||||
entry: nextEntry,
|
||||
key: restoredSession.key,
|
||||
sessionId: restoredSession.entry.sessionId,
|
||||
checkpoint: restoredSession.checkpoint,
|
||||
entry: restoredSession.entry,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
|
||||
@@ -12,6 +12,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
createFileBackedCompactionCheckpointStore,
|
||||
forkCompactionCheckpointTranscriptAsync,
|
||||
MAX_COMPACTION_CHECKPOINT_LEAF_SCAN_BYTES,
|
||||
MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION,
|
||||
@@ -91,7 +92,10 @@ function checkpointConfig(storePath: string): OpenClawConfig {
|
||||
async function writeSessionStore(
|
||||
storePath: string,
|
||||
sessionKey: string,
|
||||
entry: { sessionId: string; updatedAt: number; compactionCheckpoints?: unknown[] },
|
||||
entry: { sessionId: string; updatedAt: number; compactionCheckpoints?: unknown[] } & Record<
|
||||
string,
|
||||
unknown
|
||||
>,
|
||||
): Promise<void> {
|
||||
await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf-8");
|
||||
}
|
||||
@@ -629,6 +633,79 @@ describe("session-compaction-checkpoints", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
test("file-backed checkpoint store restores from the stored transcript boundary", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-store-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const session = SessionManager.create(dir, dir);
|
||||
session.appendMessage({
|
||||
role: "user",
|
||||
content: "checkpoint source",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
const checkpointLeafId = requireNonEmptyString(
|
||||
session.getLeafId(),
|
||||
"checkpoint leaf id missing",
|
||||
);
|
||||
session.appendMessage({
|
||||
role: "assistant",
|
||||
content: "future turn",
|
||||
api: "responses",
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timestamp: Date.now(),
|
||||
} as unknown as AssistantMessage);
|
||||
|
||||
const sessionFile = requireNonEmptyString(session.getSessionFile(), "session file missing");
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
await writeSessionStore(storePath, MAIN_SESSION_KEY, {
|
||||
sessionId: "current-session",
|
||||
sessionFile,
|
||||
updatedAt: Date.now() - 1,
|
||||
totalTokens: 200,
|
||||
compactionCheckpoints: [
|
||||
{
|
||||
checkpointId: "checkpoint-1",
|
||||
sessionKey: MAIN_SESSION_KEY,
|
||||
sessionId: "stored-session",
|
||||
createdAt: Date.now(),
|
||||
reason: "manual",
|
||||
tokensAfter: 45,
|
||||
preCompaction: { sessionId: "pre-session", leafId: "pre-leaf" },
|
||||
postCompaction: {
|
||||
sessionId: "post-session",
|
||||
sessionFile,
|
||||
leafId: checkpointLeafId,
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
const store = createFileBackedCompactionCheckpointStore();
|
||||
const restored = await store.restoreCheckpointSession({
|
||||
storePath,
|
||||
sessionKey: MAIN_SESSION_KEY,
|
||||
checkpointId: "checkpoint-1",
|
||||
});
|
||||
|
||||
if (restored.status !== "created") {
|
||||
throw new Error("expected restored checkpoint transcript");
|
||||
}
|
||||
expect(restored.entry.totalTokens).toBe(45);
|
||||
const restoredSessionFile = requireNonEmptyString(
|
||||
restored.entry.sessionFile,
|
||||
"restored session file missing",
|
||||
);
|
||||
const messages = SessionManager.open(restoredSessionFile, dir).buildSessionContext().messages;
|
||||
expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([
|
||||
"checkpoint source",
|
||||
]);
|
||||
const nextStore = await readSessionStore<{ sessionFile?: string; totalTokens?: number }>(
|
||||
storePath,
|
||||
);
|
||||
expect(nextStore[MAIN_SESSION_KEY]?.sessionFile).toBe(restored.entry.sessionFile);
|
||||
expect(nextStore[MAIN_SESSION_KEY]?.totalTokens).toBe(45);
|
||||
});
|
||||
|
||||
test("async fork migrates legacy checkpoint snapshots before writing a current header", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-legacy-fork-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
@@ -57,6 +57,87 @@ type ForkedCompactionCheckpointTranscript = {
|
||||
sessionFile: string;
|
||||
};
|
||||
|
||||
export type CompactionCheckpointForkedTranscript = ForkedCompactionCheckpointTranscript & {
|
||||
totalTokens?: number;
|
||||
};
|
||||
|
||||
export type CompactionCheckpointTranscriptForkResult =
|
||||
| { status: "created"; transcript: CompactionCheckpointForkedTranscript }
|
||||
| { status: "missing-boundary" }
|
||||
| { status: "failed" };
|
||||
|
||||
export type CompactionCheckpointSessionMutationResult =
|
||||
| {
|
||||
status: "created";
|
||||
key: string;
|
||||
checkpoint: SessionCompactionCheckpoint;
|
||||
entry: SessionEntry;
|
||||
}
|
||||
| { status: "missing-session" }
|
||||
| { status: "missing-checkpoint" }
|
||||
| { status: "missing-boundary" }
|
||||
| { status: "failed" };
|
||||
|
||||
export type BranchCheckpointSessionParams = {
|
||||
storePath: string;
|
||||
sourceKey: string;
|
||||
sourceStoreKey?: string;
|
||||
nextKey: string;
|
||||
checkpointId: string;
|
||||
};
|
||||
|
||||
export type RestoreCheckpointSessionParams = {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
sessionStoreKey?: string;
|
||||
checkpointId: string;
|
||||
};
|
||||
|
||||
export type PersistSessionCompactionCheckpointParams = {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
sessionId: string;
|
||||
reason: SessionCompactionCheckpointReason;
|
||||
snapshot: CapturedCompactionCheckpointSnapshot;
|
||||
summary?: string;
|
||||
firstKeptEntryId?: string;
|
||||
tokensBefore?: number;
|
||||
tokensAfter?: number;
|
||||
postSessionFile?: string;
|
||||
postLeafId?: string;
|
||||
postEntryId?: string;
|
||||
createdAt?: number;
|
||||
};
|
||||
|
||||
/**
|
||||
* Storage boundary for compaction checkpoint capture, persistence, branch,
|
||||
* restore, and cleanup operations.
|
||||
*/
|
||||
export type CompactionCheckpointStore = {
|
||||
/** Captures the pre-compaction transcript identity without copying rows/files. */
|
||||
captureSnapshot: typeof captureCompactionCheckpointSnapshotAsync;
|
||||
/** Persists checkpoint metadata and prunes checkpoint artifacts owned by this store. */
|
||||
persistCheckpoint: (
|
||||
params: PersistSessionCompactionCheckpointParams,
|
||||
) => Promise<SessionCompactionCheckpoint | null>;
|
||||
/** Cleans unpersisted legacy snapshot artifacts after failed persistence. */
|
||||
cleanupSnapshot: typeof cleanupCompactionCheckpointSnapshot;
|
||||
/**
|
||||
* Creates a checkpoint branch and records its session entry in one logical
|
||||
* store mutation.
|
||||
*/
|
||||
branchCheckpointSession: (
|
||||
params: BranchCheckpointSessionParams,
|
||||
) => Promise<CompactionCheckpointSessionMutationResult>;
|
||||
/**
|
||||
* Restores a checkpoint and replaces the current session entry in one logical
|
||||
* store mutation.
|
||||
*/
|
||||
restoreCheckpointSession: (
|
||||
params: RestoreCheckpointSessionParams,
|
||||
) => Promise<CompactionCheckpointSessionMutationResult>;
|
||||
};
|
||||
|
||||
function checkpointSnapshotPath(checkpoint: SessionCompactionCheckpoint): string | undefined {
|
||||
return checkpoint.preCompaction.sessionFile?.trim() || undefined;
|
||||
}
|
||||
@@ -401,6 +482,209 @@ export async function forkCompactionCheckpointTranscriptAsync(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCheckpointTranscriptForkSource(
|
||||
checkpoint: SessionCompactionCheckpoint,
|
||||
): { sourceFile: string; sourceLeafId?: string; totalTokens?: number } | null {
|
||||
const preCompactionFile = checkpoint.preCompaction.sessionFile?.trim();
|
||||
if (preCompactionFile) {
|
||||
return {
|
||||
sourceFile: preCompactionFile,
|
||||
sourceLeafId: checkpoint.preCompaction.entryId ?? checkpoint.preCompaction.leafId,
|
||||
totalTokens: checkpoint.tokensBefore,
|
||||
};
|
||||
}
|
||||
|
||||
const postCompactionFile = checkpoint.postCompaction.sessionFile?.trim();
|
||||
if (!postCompactionFile) {
|
||||
return null;
|
||||
}
|
||||
const postCompactionLeafId =
|
||||
checkpoint.postCompaction.entryId ?? checkpoint.postCompaction.leafId;
|
||||
if (!postCompactionLeafId) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
sourceFile: postCompactionFile,
|
||||
sourceLeafId: postCompactionLeafId,
|
||||
totalTokens: checkpoint.tokensAfter,
|
||||
};
|
||||
}
|
||||
|
||||
async function forkCheckpointTranscriptFromStoredBoundary(params: {
|
||||
checkpoint: SessionCompactionCheckpoint;
|
||||
sessionDir?: string;
|
||||
targetCwd?: string;
|
||||
}): Promise<CompactionCheckpointTranscriptForkResult> {
|
||||
const forkSource = resolveCheckpointTranscriptForkSource(params.checkpoint);
|
||||
if (!forkSource) {
|
||||
return { status: "missing-boundary" };
|
||||
}
|
||||
const forked = await forkCompactionCheckpointTranscriptAsync({
|
||||
sourceFile: forkSource.sourceFile,
|
||||
sourceLeafId: forkSource.sourceLeafId,
|
||||
sessionDir: params.sessionDir ?? path.dirname(forkSource.sourceFile),
|
||||
...(params.targetCwd ? { targetCwd: params.targetCwd } : {}),
|
||||
});
|
||||
if (!forked) {
|
||||
return { status: "failed" };
|
||||
}
|
||||
return {
|
||||
status: "created",
|
||||
transcript: {
|
||||
...forked,
|
||||
...(typeof forkSource.totalTokens === "number"
|
||||
? { totalTokens: forkSource.totalTokens }
|
||||
: {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function cloneCheckpointSessionEntry(params: {
|
||||
currentEntry: SessionEntry;
|
||||
nextSessionId: string;
|
||||
nextSessionFile: string;
|
||||
label?: string;
|
||||
parentSessionKey?: string;
|
||||
totalTokens?: number;
|
||||
preserveCompactionCheckpoints?: boolean;
|
||||
}): SessionEntry {
|
||||
return {
|
||||
...params.currentEntry,
|
||||
sessionId: params.nextSessionId,
|
||||
sessionFile: params.nextSessionFile,
|
||||
updatedAt: Date.now(),
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
startedAt: undefined,
|
||||
endedAt: undefined,
|
||||
runtimeMs: undefined,
|
||||
status: undefined,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
estimatedCostUsd: undefined,
|
||||
totalTokens:
|
||||
typeof params.totalTokens === "number" && Number.isFinite(params.totalTokens)
|
||||
? params.totalTokens
|
||||
: undefined,
|
||||
totalTokensFresh:
|
||||
typeof params.totalTokens === "number" && Number.isFinite(params.totalTokens)
|
||||
? true
|
||||
: undefined,
|
||||
label: params.label ?? params.currentEntry.label,
|
||||
parentSessionKey: params.parentSessionKey ?? params.currentEntry.parentSessionKey,
|
||||
compactionCheckpoints: params.preserveCompactionCheckpoints
|
||||
? params.currentEntry.compactionCheckpoints
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
async function branchCheckpointSessionFromStoredBoundary(
|
||||
params: BranchCheckpointSessionParams,
|
||||
): Promise<CompactionCheckpointSessionMutationResult> {
|
||||
return await updateSessionStore(
|
||||
params.storePath,
|
||||
async (store) => {
|
||||
const currentEntry = store[params.sourceStoreKey ?? params.sourceKey];
|
||||
if (!currentEntry?.sessionId) {
|
||||
return { status: "missing-session" };
|
||||
}
|
||||
const checkpoint = getSessionCompactionCheckpoint({
|
||||
entry: currentEntry,
|
||||
checkpointId: params.checkpointId,
|
||||
});
|
||||
if (!checkpoint) {
|
||||
return { status: "missing-checkpoint" };
|
||||
}
|
||||
const forkedSession = await forkCheckpointTranscriptFromStoredBoundary({ checkpoint });
|
||||
if (forkedSession.status !== "created") {
|
||||
return forkedSession;
|
||||
}
|
||||
|
||||
const forkedTranscript = forkedSession.transcript;
|
||||
const label = currentEntry.label?.trim()
|
||||
? `${currentEntry.label.trim()} (checkpoint)`
|
||||
: "Checkpoint branch";
|
||||
const nextEntry = cloneCheckpointSessionEntry({
|
||||
currentEntry,
|
||||
nextSessionId: forkedTranscript.sessionId,
|
||||
nextSessionFile: forkedTranscript.sessionFile,
|
||||
label,
|
||||
parentSessionKey: params.sourceKey,
|
||||
totalTokens: forkedTranscript.totalTokens,
|
||||
});
|
||||
store[params.nextKey] = nextEntry;
|
||||
return {
|
||||
status: "created",
|
||||
key: params.nextKey,
|
||||
checkpoint,
|
||||
entry: nextEntry,
|
||||
};
|
||||
},
|
||||
{ skipSaveWhenResult: (result) => result.status !== "created" },
|
||||
);
|
||||
}
|
||||
|
||||
async function restoreCheckpointSessionFromStoredBoundary(
|
||||
params: RestoreCheckpointSessionParams,
|
||||
): Promise<CompactionCheckpointSessionMutationResult> {
|
||||
return await updateSessionStore(
|
||||
params.storePath,
|
||||
async (store) => {
|
||||
const currentEntry = store[params.sessionStoreKey ?? params.sessionKey];
|
||||
if (!currentEntry?.sessionId) {
|
||||
return { status: "missing-session" };
|
||||
}
|
||||
const checkpoint = getSessionCompactionCheckpoint({
|
||||
entry: currentEntry,
|
||||
checkpointId: params.checkpointId,
|
||||
});
|
||||
if (!checkpoint) {
|
||||
return { status: "missing-checkpoint" };
|
||||
}
|
||||
const restoredSession = await forkCheckpointTranscriptFromStoredBoundary({ checkpoint });
|
||||
if (restoredSession.status !== "created") {
|
||||
return restoredSession;
|
||||
}
|
||||
|
||||
const restoredTranscript = restoredSession.transcript;
|
||||
const nextEntry = cloneCheckpointSessionEntry({
|
||||
currentEntry,
|
||||
nextSessionId: restoredTranscript.sessionId,
|
||||
nextSessionFile: restoredTranscript.sessionFile,
|
||||
totalTokens: restoredTranscript.totalTokens,
|
||||
preserveCompactionCheckpoints: true,
|
||||
});
|
||||
store[params.sessionKey] = nextEntry;
|
||||
return {
|
||||
status: "created",
|
||||
key: params.sessionKey,
|
||||
checkpoint,
|
||||
entry: nextEntry,
|
||||
};
|
||||
},
|
||||
{ skipSaveWhenResult: (result) => result.status !== "created" },
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the current file-backed compaction checkpoint domain store.
|
||||
*
|
||||
* The branch/restore operations own the transcript fork plus session entry
|
||||
* update so a SQLite implementation can copy transcript rows and update
|
||||
* `session_entries.entry_json` inside one write transaction.
|
||||
*/
|
||||
export function createFileBackedCompactionCheckpointStore(): CompactionCheckpointStore {
|
||||
return {
|
||||
captureSnapshot: captureCompactionCheckpointSnapshotAsync,
|
||||
persistCheckpoint: persistSessionCompactionCheckpoint,
|
||||
cleanupSnapshot: cleanupCompactionCheckpointSnapshot,
|
||||
branchCheckpointSession: branchCheckpointSessionFromStoredBoundary,
|
||||
restoreCheckpointSession: restoreCheckpointSessionFromStoredBoundary,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Capture the stable pre-compaction identity without duplicating the transcript.
|
||||
* Branch/restore uses the compacted successor transcript, while legacy
|
||||
@@ -488,21 +772,9 @@ async function cleanupTrimmedCompactionCheckpointFiles(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function persistSessionCompactionCheckpoint(params: {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
sessionId: string;
|
||||
reason: SessionCompactionCheckpointReason;
|
||||
snapshot: CapturedCompactionCheckpointSnapshot;
|
||||
summary?: string;
|
||||
firstKeptEntryId?: string;
|
||||
tokensBefore?: number;
|
||||
tokensAfter?: number;
|
||||
postSessionFile?: string;
|
||||
postLeafId?: string;
|
||||
postEntryId?: string;
|
||||
createdAt?: number;
|
||||
}): Promise<SessionCompactionCheckpoint | null> {
|
||||
export async function persistSessionCompactionCheckpoint(
|
||||
params: PersistSessionCompactionCheckpointParams,
|
||||
): Promise<SessionCompactionCheckpoint | null> {
|
||||
const snapshotSessionFile = params.snapshot.sessionFile?.trim();
|
||||
const postSessionFile = params.postSessionFile?.trim();
|
||||
const postSourceLeafId = params.postEntryId?.trim() || params.postLeafId?.trim();
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
// Agent consult runtime tests cover consult session creation and runtime handoff.
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js";
|
||||
import type {
|
||||
ForkSessionEntryFromParentParams,
|
||||
ForkSessionEntryFromParentResult,
|
||||
} from "../auto-reply/reply/session-fork.js";
|
||||
import type { SessionEntry } from "../config/sessions/types.js";
|
||||
import {
|
||||
setRealtimeVoiceAgentConsultDepsForTest,
|
||||
consultRealtimeVoiceAgent,
|
||||
@@ -267,13 +271,47 @@ describe("realtime voice agent consult runtime", () => {
|
||||
maxTokens: 100_000,
|
||||
parentTokens: 100,
|
||||
}));
|
||||
const forkSessionFromParent = vi.fn(async () => ({
|
||||
sessionId: "forked-session",
|
||||
sessionFile: "/tmp/forked.jsonl",
|
||||
}));
|
||||
const forkSessionEntryFromParent = vi.fn(
|
||||
async (
|
||||
params: ForkSessionEntryFromParentParams,
|
||||
): Promise<ForkSessionEntryFromParentResult> => {
|
||||
const fork = {
|
||||
sessionId: "forked-session",
|
||||
sessionFile: "/tmp/forked.jsonl",
|
||||
};
|
||||
const parentEntry = sessionStore["agent:main:main"];
|
||||
if (!parentEntry?.sessionId) {
|
||||
return { status: "missing-parent" };
|
||||
}
|
||||
const typedParentEntry: SessionEntry = {
|
||||
...parentEntry,
|
||||
sessionId: parentEntry.sessionId,
|
||||
updatedAt: parentEntry.updatedAt ?? Date.now(),
|
||||
};
|
||||
const decision = {
|
||||
status: "fork" as const,
|
||||
maxTokens: 100_000,
|
||||
};
|
||||
const entry = params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() };
|
||||
const sessionEntry: SessionEntry = {
|
||||
...entry,
|
||||
...params.patch?.({ entry, parentEntry: typedParentEntry, fork, decision }),
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
forkedFromParent: true,
|
||||
};
|
||||
sessionStore[params.sessionKey] = sessionEntry;
|
||||
return {
|
||||
status: "forked" as const,
|
||||
fork,
|
||||
parentEntry: typedParentEntry,
|
||||
sessionEntry,
|
||||
decision,
|
||||
};
|
||||
},
|
||||
);
|
||||
setRealtimeVoiceAgentConsultDepsForTest({
|
||||
resolveParentForkDecision,
|
||||
forkSessionFromParent,
|
||||
forkSessionEntryFromParent,
|
||||
});
|
||||
|
||||
await consultRealtimeVoiceAgent({
|
||||
@@ -293,15 +331,16 @@ describe("realtime voice agent consult runtime", () => {
|
||||
userLabel: "Participant",
|
||||
});
|
||||
|
||||
expect(resolveParentForkDecision).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
storePath: "/tmp/sessions.json",
|
||||
});
|
||||
expect(forkSessionFromParent).toHaveBeenCalledWith({
|
||||
parentEntry: sessionStore["agent:main:main"],
|
||||
agentId: "main",
|
||||
sessionsDir: "/tmp",
|
||||
});
|
||||
expect(resolveParentForkDecision).not.toHaveBeenCalled();
|
||||
expect(forkSessionEntryFromParent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
parentSessionKey: "agent:main:main",
|
||||
agentId: "main",
|
||||
config: {},
|
||||
sessionKey: "agent:main:subagent:google-meet:meet-1",
|
||||
}),
|
||||
);
|
||||
expect(runtime.session.patchSessionEntry).not.toHaveBeenCalled();
|
||||
const forkedEntry = sessionStore["agent:main:subagent:google-meet:meet-1"];
|
||||
if (!forkedEntry) {
|
||||
throw new Error("Expected forked consult session entry");
|
||||
@@ -316,7 +355,75 @@ describe("realtime voice agent consult runtime", () => {
|
||||
expectPositiveTimestamp(forkedEntry.updatedAt);
|
||||
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
|
||||
expect(call.sessionId).toBe("forked-session");
|
||||
expect(call.sessionFile).toBe("/tmp/forked.jsonl");
|
||||
expect(call.sessionFile).toBeUndefined();
|
||||
expect(call.sessionTarget).toMatchObject({
|
||||
agentId: "main",
|
||||
sessionId: "forked-session",
|
||||
sessionKey: "agent:main:subagent:google-meet:meet-1",
|
||||
storePath: "/tmp/sessions.json",
|
||||
});
|
||||
expect(call.spawnedBy).toBe("agent:main:main");
|
||||
});
|
||||
|
||||
it("falls back to a fresh isolated consult session when requester context is too large", async () => {
|
||||
const { runtime, runEmbeddedAgent } = createAgentRuntime();
|
||||
const warn = vi.fn();
|
||||
const forkSessionEntryFromParent = vi.fn(
|
||||
async (
|
||||
params: ForkSessionEntryFromParentParams,
|
||||
): Promise<ForkSessionEntryFromParentResult> => ({
|
||||
status: "skipped",
|
||||
reason: "decision-skip",
|
||||
sessionEntry: {
|
||||
...(params.fallbackEntry ?? { sessionId: "", updatedAt: Date.now() }),
|
||||
sessionId: "",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
decision: {
|
||||
status: "skip",
|
||||
reason: "parent-too-large",
|
||||
maxTokens: 100_000,
|
||||
parentTokens: 150_000,
|
||||
message:
|
||||
"Parent context is too large to fork (150000/100000 tokens); starting with isolated context instead.",
|
||||
},
|
||||
}),
|
||||
);
|
||||
setRealtimeVoiceAgentConsultDepsForTest({
|
||||
forkSessionEntryFromParent,
|
||||
randomUUID: () => "00000000-0000-4000-8000-000000000000",
|
||||
});
|
||||
|
||||
await consultRealtimeVoiceAgent({
|
||||
cfg: {} as never,
|
||||
agentRuntime: runtime as never,
|
||||
logger: { warn },
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:subagent:google-meet:meet-1",
|
||||
spawnedBy: "agent:main:main",
|
||||
contextMode: "fork",
|
||||
messageProvider: "google-meet",
|
||||
lane: "google-meet",
|
||||
runIdPrefix: "google-meet:meet-1",
|
||||
args: { question: "What should I say?" },
|
||||
transcript: [],
|
||||
surface: "a private Google Meet",
|
||||
userLabel: "Participant",
|
||||
});
|
||||
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"[talk] Parent context is too large to fork (150000/100000 tokens); starting with isolated context instead.",
|
||||
);
|
||||
expect(runtime.session.patchSessionEntry).toHaveBeenCalled();
|
||||
const call = requireEmbeddedAgentCall(runEmbeddedAgent);
|
||||
expect(call.sessionId).toBe("00000000-0000-4000-8000-000000000000");
|
||||
expect(call.sessionFile).toBeUndefined();
|
||||
expect(call.sessionTarget).toMatchObject({
|
||||
agentId: "main",
|
||||
sessionId: "00000000-0000-4000-8000-000000000000",
|
||||
sessionKey: "agent:main:subagent:google-meet:meet-1",
|
||||
storePath: "/tmp/sessions.json",
|
||||
});
|
||||
expect(call.spawnedBy).toBe("agent:main:main");
|
||||
});
|
||||
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
// Agent consult runtime starts agent consultation flows from talk sessions.
|
||||
import { randomUUID } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import type { RunEmbeddedAgentParams } from "../agents/embedded-agent-runner/run/params.js";
|
||||
import {
|
||||
forkSessionFromParent,
|
||||
resolveParentForkDecision,
|
||||
} from "../auto-reply/reply/session-fork.js";
|
||||
import { forkSessionEntryFromParent } from "../auto-reply/reply/session-fork.js";
|
||||
import { parseSessionThreadInfoFast } from "../config/sessions/thread-info.js";
|
||||
import type { SessionEntry } from "../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
@@ -44,14 +40,12 @@ export {
|
||||
|
||||
type RealtimeVoiceAgentConsultDeps = {
|
||||
randomUUID: typeof randomUUID;
|
||||
resolveParentForkDecision: typeof resolveParentForkDecision;
|
||||
forkSessionFromParent: typeof forkSessionFromParent;
|
||||
forkSessionEntryFromParent: typeof forkSessionEntryFromParent;
|
||||
};
|
||||
|
||||
const defaultRealtimeVoiceAgentConsultDeps: RealtimeVoiceAgentConsultDeps = {
|
||||
randomUUID,
|
||||
resolveParentForkDecision,
|
||||
forkSessionFromParent,
|
||||
forkSessionEntryFromParent,
|
||||
};
|
||||
|
||||
let realtimeVoiceAgentConsultDeps = defaultRealtimeVoiceAgentConsultDeps;
|
||||
@@ -133,6 +127,7 @@ function resolveRealtimeVoiceAgentDeliveryContext(params: {
|
||||
|
||||
async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
agentId: string;
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
spawnedBy?: string | null;
|
||||
contextMode?: RealtimeVoiceAgentConsultContextMode;
|
||||
@@ -151,7 +146,37 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
(!requesterAgentId || requesterAgentId === params.agentId);
|
||||
let forkDecisionWarning: string | undefined;
|
||||
|
||||
const patched = await params.agentRuntime.session.patchSessionEntry({
|
||||
let patched: SessionEntry | null = null;
|
||||
if (shouldFork) {
|
||||
const forked = await realtimeVoiceAgentConsultDeps.forkSessionEntryFromParent({
|
||||
storePath: params.storePath,
|
||||
parentSessionKey: requesterSessionKey,
|
||||
agentId: params.agentId,
|
||||
config: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
fallbackEntry: {
|
||||
sessionId: "",
|
||||
updatedAt: now,
|
||||
},
|
||||
skipForkWhen: (entry) => Boolean(entry.sessionId?.trim()),
|
||||
skipPatch: () => ({ ...deliveryFields, updatedAt: now }),
|
||||
patch: () => ({
|
||||
...deliveryFields,
|
||||
spawnedBy: requesterSessionKey,
|
||||
updatedAt: now,
|
||||
}),
|
||||
});
|
||||
if (forked.status === "forked" || forked.status === "skipped") {
|
||||
if (forked.status === "skipped" && forked.decision?.status === "skip") {
|
||||
forkDecisionWarning = forked.decision.message;
|
||||
}
|
||||
if (forked.sessionEntry.sessionId?.trim()) {
|
||||
patched = forked.sessionEntry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
patched ??= await params.agentRuntime.session.patchSessionEntry({
|
||||
storePath: params.storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
fallbackEntry: {
|
||||
@@ -162,39 +187,6 @@ async function resolveRealtimeVoiceAgentConsultSessionEntry(params: {
|
||||
if (entry.sessionId?.trim()) {
|
||||
return { ...deliveryFields, updatedAt: now };
|
||||
}
|
||||
// Fork only from same-agent requester sessions. Cross-agent parent sessions may carry
|
||||
// incompatible provider state, so they get a fresh consult session with spawnedBy linkage.
|
||||
if (shouldFork) {
|
||||
const parentEntry = params.agentRuntime.session.getSessionEntry({
|
||||
storePath: params.storePath,
|
||||
sessionKey: requesterSessionKey,
|
||||
});
|
||||
if (parentEntry?.sessionId?.trim()) {
|
||||
const decision = await realtimeVoiceAgentConsultDeps.resolveParentForkDecision({
|
||||
parentEntry,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
if (decision.status === "fork") {
|
||||
const fork = await realtimeVoiceAgentConsultDeps.forkSessionFromParent({
|
||||
parentEntry,
|
||||
agentId: params.agentId,
|
||||
sessionsDir: path.dirname(params.storePath),
|
||||
});
|
||||
if (fork) {
|
||||
return {
|
||||
...deliveryFields,
|
||||
sessionId: fork.sessionId,
|
||||
sessionFile: fork.sessionFile,
|
||||
spawnedBy: requesterSessionKey,
|
||||
forkedFromParent: true,
|
||||
updatedAt: now,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
forkDecisionWarning = decision.message;
|
||||
}
|
||||
}
|
||||
}
|
||||
return {
|
||||
...deliveryFields,
|
||||
sessionId: realtimeVoiceAgentConsultDeps.randomUUID(),
|
||||
@@ -259,6 +251,7 @@ export async function consultRealtimeVoiceAgent(params: {
|
||||
});
|
||||
const sessionEntry = await resolveRealtimeVoiceAgentConsultSessionEntry({
|
||||
agentId,
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
spawnedBy: params.spawnedBy,
|
||||
contextMode: params.contextMode,
|
||||
@@ -271,14 +264,17 @@ export async function consultRealtimeVoiceAgent(params: {
|
||||
resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry);
|
||||
const sessionId = sessionEntry.sessionId;
|
||||
|
||||
const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
|
||||
agentId,
|
||||
});
|
||||
// Voice consults suppress verbose/reasoning output because the bridge needs a short,
|
||||
// speakable answer, not agent-run diagnostics or hidden reasoning artifacts.
|
||||
const result = await params.agentRuntime.runEmbeddedAgent({
|
||||
sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionTarget: {
|
||||
agentId,
|
||||
sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
storePath,
|
||||
},
|
||||
sandboxSessionKey: resolveRealtimeVoiceAgentSandboxSessionKey(agentId, params.sessionKey),
|
||||
agentId,
|
||||
spawnedBy: params.spawnedBy,
|
||||
@@ -291,7 +287,6 @@ export async function consultRealtimeVoiceAgent(params: {
|
||||
consultDeliveryContext?.threadId != null
|
||||
? String(consultDeliveryContext.threadId)
|
||||
: undefined,
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
config: params.cfg,
|
||||
prompt: buildRealtimeVoiceAgentConsultPrompt({
|
||||
|
||||
@@ -111,6 +111,19 @@ describe("gateway CPU scenario guard", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects duplicate single-value controls before running scenarios", () => {
|
||||
expect(() =>
|
||||
testing.parseArgs(["--output-dir", makeTempRoot(), "--output-dir", makeTempRoot()]),
|
||||
).toThrow("--output-dir was provided more than once");
|
||||
|
||||
const result = runCli("--runs", "1", "--runs", "2");
|
||||
|
||||
expect(result.status).toBe(1);
|
||||
expect(result.stdout).toBe("");
|
||||
expect(result.stderr.trim()).toBe("--runs was provided more than once");
|
||||
expectNoNodeStack(result.stderr);
|
||||
});
|
||||
|
||||
it("reports CLI argument errors without a Node stack trace", () => {
|
||||
const result = runCli("--wat");
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Ci Workflow Guards tests cover ci workflow guards script behavior.
|
||||
import { readdirSync, readFileSync } from "node:fs";
|
||||
import { execFileSync } from "node:child_process";
|
||||
import { existsSync, readdirSync, readFileSync } from "node:fs";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { parse } from "yaml";
|
||||
|
||||
@@ -37,10 +38,17 @@ function readCriticalQualityWorkflow() {
|
||||
return readFileSync(".github/workflows/codeql-critical-quality.yml", "utf8");
|
||||
}
|
||||
|
||||
function readAndroidCompileSdk(path: string): number {
|
||||
const match = readFileSync(path, "utf8").match(/^\s*compileSdk\s*=\s*(\d+)\s*$/mu);
|
||||
function readTrackedText(relativePath: string): string {
|
||||
if (existsSync(relativePath)) {
|
||||
return readFileSync(relativePath, "utf8");
|
||||
}
|
||||
return execFileSync("git", ["show", `:${relativePath}`], { encoding: "utf8" });
|
||||
}
|
||||
|
||||
function readAndroidCompileSdk(relativePath: string): number {
|
||||
const match = readTrackedText(relativePath).match(/^\s*compileSdk\s*=\s*(\d+)\s*$/mu);
|
||||
if (!match) {
|
||||
throw new Error(`Missing compileSdk in ${path}`);
|
||||
throw new Error(`Missing compileSdk in ${relativePath}`);
|
||||
}
|
||||
return Number(match[1]);
|
||||
}
|
||||
@@ -727,18 +735,33 @@ describe("ci workflow guards", () => {
|
||||
expect(openDocsPrStep.if).toBe("${{ github.event_name == 'workflow_dispatch' }}");
|
||||
});
|
||||
|
||||
it("runs maturity scorecard from release checks", () => {
|
||||
it("keeps maturity scorecard release docs opt-in from release checks", () => {
|
||||
const releaseWorkflow = readReleaseChecksWorkflow();
|
||||
const job = releaseWorkflow.jobs.maturity_scorecard_release_checks;
|
||||
const summaryJob = releaseWorkflow.jobs.summary;
|
||||
const verifyStep = summaryJob.steps.find(
|
||||
(step) => step.name === "Verify release check results",
|
||||
);
|
||||
const inputs = releaseWorkflow.on.workflow_dispatch.inputs;
|
||||
const resolveJob = releaseWorkflow.jobs.resolve_target;
|
||||
const summarizeStep = resolveJob.steps.find((step) => step.name === "Summarize validated ref");
|
||||
|
||||
expect(releaseWorkflow.jobs).not.toHaveProperty("qa_profile_release_evidence_release_checks");
|
||||
expect(inputs.run_maturity_scorecard).toMatchObject({
|
||||
required: false,
|
||||
default: false,
|
||||
type: "boolean",
|
||||
});
|
||||
expect(resolveJob.outputs.run_maturity_scorecard).toBe(
|
||||
"${{ steps.inputs.outputs.run_maturity_scorecard }}",
|
||||
);
|
||||
expect(summarizeStep.env.RUN_MATURITY_SCORECARD).toBe(
|
||||
"${{ steps.inputs.outputs.run_maturity_scorecard }}",
|
||||
);
|
||||
expect(summarizeStep.run).toContain("- Maturity scorecard docs:");
|
||||
expect(job.name).toBe("Render maturity scorecard release docs");
|
||||
expect(job.if).toBe(
|
||||
'contains(fromJSON(\'["all","qa"]\'), needs.resolve_target.outputs.rerun_group)',
|
||||
"contains(fromJSON('[\"all\",\"qa\"]'), needs.resolve_target.outputs.rerun_group) && needs.resolve_target.outputs.run_maturity_scorecard == 'true'",
|
||||
);
|
||||
expect(job.permissions).toMatchObject({
|
||||
actions: "read",
|
||||
|
||||
@@ -182,6 +182,12 @@ describe("scripts/measure-rpc-rtt.mjs", () => {
|
||||
expect(() =>
|
||||
parseArgs(["--output-dir", "/tmp/rpc-rtt", "--methods", "health, config.get,health"]),
|
||||
).toThrow("--methods contains duplicate gateway method: health");
|
||||
expect(() => parseArgs(["--output-dir", "/tmp/one", "--output-dir", "/tmp/two"])).toThrow(
|
||||
"--output-dir was provided more than once.",
|
||||
);
|
||||
expect(() =>
|
||||
parseArgs(["--output-dir", "/tmp/rpc-rtt", "--methods", "health", "--methods", "config.get"]),
|
||||
).toThrow("--methods was provided more than once.");
|
||||
for (const value of ["--methods", "-h"]) {
|
||||
for (const flag of ["--output-dir", "--repo-root", "--iterations", "--methods"]) {
|
||||
expect(() => parseArgs([flag, value, "health"])).toThrow(`${flag} requires a value.`);
|
||||
|
||||
@@ -150,6 +150,15 @@ describe("plugin gateway gauntlet helpers", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects duplicate single-value controls", () => {
|
||||
expect(() =>
|
||||
parseArgs(["--output-dir", ".artifacts/one", "--output-dir", ".artifacts/two"]),
|
||||
).toThrow("--output-dir was provided more than once");
|
||||
expect(() => parseArgs(["--shard-total", "2", "--shard-total", "3"])).toThrow(
|
||||
"--shard-total was provided more than once",
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects valued flags followed by another option", () => {
|
||||
for (const flag of [
|
||||
"--repo-root",
|
||||
|
||||
Reference in New Issue
Block a user