fix(codex): restore bounded recovery continuity

Restore bounded Codex native recovery continuity without replaying covered mirrored transcript history. Closes #88352. Closes #88354.
This commit is contained in:
Peter Steinberger
2026-05-31 15:55:32 +01:00
committed by GitHub
parent 7b78941ea5
commit 827ceb55d0
2 changed files with 388 additions and 12 deletions

View File

@@ -1722,7 +1722,7 @@ describe("runCodexAppServerAttempt", () => {
]);
});
it("does not inject mirrored history when starting Codex without a native thread binding", async () => {
it("projects bounded continuity when starting Codex without a native thread binding", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const sessionManager = SessionManager.open(sessionFile);
@@ -1743,14 +1743,52 @@ describe("runCodexAppServerAttempt", () => {
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
expect(inputText).not.toContain("OpenClaw assembled context for this turn:");
expect(inputText).not.toContain("we are fixing the Opik default project");
expect(inputText).not.toContain("Opik default project context");
expect(inputText).not.toContain("Current user request:");
expect(inputText).toContain("OpenClaw assembled context for this turn:");
expect(inputText).toContain("we are fixing the Opik default project");
expect(inputText).toContain("Opik default project context");
expect(inputText).toContain("Current user request:");
expect(inputText).toContain("make the default webpage openclaw");
});
it("does not inject newer mirrored history when resuming an existing Codex thread binding", async () => {
// Verifies that adding continuity context to a freshly started thread does not
// change the developer instructions that were sent with thread/start.
it("keeps thread-start developer instructions stable when adding fresh-thread continuity", async () => {
// The hook numbers its output by call count so the assertions below can tell
// which invocation produced the text that reached each request.
let hookCalls = 0;
const beforePromptBuild = vi.fn(async () => {
hookCalls += 1;
return {
systemPrompt: `custom codex system ${hookCalls}`,
prependContext: `queued context ${hookCalls}`,
};
});
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "before_prompt_build", handler: beforePromptBuild }]),
);
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
// Seed visible transcript history; no binding is written, and the harness below
// is the started-thread variant.
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(userMessage("prior visible context", Date.now()));
sessionManager.appendMessage(assistantMessage("prior assistant context", Date.now() + 1));
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
await harness.waitForMethod("turn/start");
// Yield one macrotask before completing the turn.
await new Promise<void>((resolve) => setImmediate(resolve));
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
expect(beforePromptBuild).toHaveBeenCalled();
// thread/start must carry the first hook result only; a later hook call must not
// leak a second system prompt into the thread-level instructions.
const threadStart = harness.requests.find((request) => request.method === "thread/start");
const threadStartParams = threadStart?.params as { developerInstructions?: string } | undefined;
expect(threadStartParams?.developerInstructions).toContain("custom codex system 1");
expect(threadStartParams?.developerInstructions).not.toContain("custom codex system 2");
// The turn input still carries the hook's prepended context and the seeded history.
const turnStart = harness.requests.find((request) => request.method === "turn/start");
const inputText =
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
expect(inputText).toContain("queued context");
expect(inputText).toContain("prior visible context");
});
it("does not replay mirrored history already covered by an existing Codex binding", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
@@ -1761,10 +1799,10 @@ describe("runCodexAppServerAttempt", () => {
}
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(
userMessage("we were discussing the Sonnet leak screenshots", bindingUpdatedAt + 1_000),
userMessage("we were discussing the Sonnet leak screenshots", bindingUpdatedAt - 2_000),
);
sessionManager.appendMessage(
assistantMessage("David Ondrej was mentioned in that prior thread", bindingUpdatedAt + 2_000),
assistantMessage("David Ondrej was mentioned in that prior thread", bindingUpdatedAt - 1_000),
);
const harness = createResumeHarness();
const params = createParams(sessionFile, workspaceDir);
@@ -1789,6 +1827,133 @@ describe("runCodexAppServerAttempt", () => {
expect(inputText).toContain("is the previous message trustworthy?");
});
// Verifies that resuming an existing binding projects only transcript messages
// timestamped after the binding's updatedAt into the turn input.
it("projects only newer visible history when a resumed Codex binding is stale", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
// Message timestamps below are expressed relative to the binding's updatedAt.
const binding = await readCodexAppServerBinding(sessionFile);
const bindingUpdatedAt = Date.parse(binding?.updatedAt ?? "");
if (!Number.isFinite(bindingUpdatedAt)) {
throw new Error("expected valid Codex binding timestamp");
}
const sessionManager = SessionManager.open(sessionFile);
// One message older than the binding (expected to be left out) ...
sessionManager.appendMessage(userMessage("old native-owned context", bindingUpdatedAt - 2_000));
// ... and two messages newer than the binding (expected to be projected).
sessionManager.appendMessage(
userMessage("we were discussing the Sonnet leak screenshots", bindingUpdatedAt + 1_000),
);
sessionManager.appendMessage(
assistantMessage("David Ondrej was mentioned in that prior thread", bindingUpdatedAt + 2_000),
);
// A mirror identity with a non-Codex prefix ("copilot:") must still be projected.
const copilotMirrorMessage = {
...assistantMessage("copilot mirror context also matters", bindingUpdatedAt + 3_000),
__openclaw: { mirrorIdentity: "copilot:assistant-1" },
} as ReturnType<typeof assistantMessage> & { __openclaw: { mirrorIdentity: string } };
sessionManager.appendMessage(copilotMirrorMessage);
const harness = createResumeHarness();
const params = createParams(sessionFile, workspaceDir);
params.prompt = "is the previous message trustworthy?";
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
// Yield one macrotask before completing the turn.
await new Promise<void>((resolve) => setImmediate(resolve));
await harness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await run;
expect(harness.requests.map((request) => request.method)).toContain("thread/resume");
const turnStart = harness.requests.find((request) => request.method === "turn/start");
const inputText =
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
// The assembled-context wrapper is present and contains only the newer messages
// followed by the current request.
expect(inputText).toContain("OpenClaw assembled context for this turn:");
expect(inputText).not.toContain("old native-owned context");
expect(inputText).toContain("we were discussing the Sonnet leak screenshots");
expect(inputText).toContain("David Ondrej was mentioned in that prior thread");
expect(inputText).toContain("copilot mirror context also matters");
expect(inputText).toContain("Current user request:");
expect(inputText).toContain("is the previous message trustworthy?");
});
// Verifies that messages newer than the binding but tagged with a
// "codex-app-server:" identity are not projected into the turn input.
it("does not project Codex mirrored transcript echoes as stale binding continuity", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
const binding = await readCodexAppServerBinding(sessionFile);
const bindingUpdatedAt = Date.parse(binding?.updatedAt ?? "");
if (!Number.isFinite(bindingUpdatedAt)) {
throw new Error("expected valid Codex binding timestamp");
}
const sessionManager = SessionManager.open(sessionFile);
// Tag carried on the top-level idempotencyKey field.
const codexMirrorUserMessage = {
...userMessage("codex mirrored user echo", bindingUpdatedAt + 1_000),
idempotencyKey: "codex-app-server:user-1",
} as ReturnType<typeof userMessage> & { idempotencyKey: string };
sessionManager.appendMessage(codexMirrorUserMessage);
// Tag carried on the nested __openclaw.mirrorIdentity field.
const codexMirrorAssistantMessage = {
...assistantMessage("codex mirrored assistant echo", bindingUpdatedAt + 2_000),
__openclaw: { mirrorIdentity: "codex-app-server:assistant-1" },
} as ReturnType<typeof assistantMessage> & { __openclaw: { mirrorIdentity: string } };
sessionManager.appendMessage(codexMirrorAssistantMessage);
const harness = createResumeHarness();
const params = createParams(sessionFile, workspaceDir);
params.prompt = "continue from the real user message";
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
// Yield one macrotask before completing the turn.
await new Promise<void>((resolve) => setImmediate(resolve));
await harness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await run;
const turnStart = harness.requests.find((request) => request.method === "turn/start");
const inputText =
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
// With nothing eligible to project, the assembled-context wrapper must be absent
// and only the plain prompt remains.
expect(inputText).not.toContain("OpenClaw assembled context for this turn:");
expect(inputText).not.toContain("codex mirrored user echo");
expect(inputText).not.toContain("codex mirrored assistant echo");
expect(inputText).toContain("continue from the real user message");
});
// Verifies that a message appended to the transcript while a native turn is in
// flight is not replayed as continuity context on the next resume.
it("does not replay messages persisted during an active native Codex turn", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
// Backdate the binding by one minute by rewriting its JSON sidecar file, so the
// message appended below is newer than the binding when the first turn starts.
const originalBindingUpdatedAt = Date.now() - 60_000;
const bindingPath = `${sessionFile}.codex-app-server.json`;
const bindingPayload = JSON.parse(await fs.readFile(bindingPath, "utf8")) as Record<
string,
unknown
>;
bindingPayload.updatedAt = new Date(originalBindingUpdatedAt).toISOString();
await fs.writeFile(bindingPath, `${JSON.stringify(bindingPayload, null, 2)}\n`);
const sessionManager = SessionManager.open(sessionFile);
// First run: the message is appended after turn/start is observed and before the
// turn is completed, i.e. while the native turn is active.
const firstHarness = createResumeHarness();
const firstRun = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir));
await firstHarness.waitForMethod("turn/start");
sessionManager.appendMessage(userMessage("steered into active native turn", Date.now()));
await firstHarness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await firstRun;
// Completing the turn must have advanced the binding's updatedAt.
const completedBinding = await readCodexAppServerBinding(sessionFile);
expect(Date.parse(completedBinding?.updatedAt ?? "")).toBeGreaterThan(originalBindingUpdatedAt);
// Second run: resume again and inspect what is sent as turn input.
const secondHarness = createResumeHarness();
const secondParams = createParams(sessionFile, workspaceDir);
secondParams.prompt = "continue after steering";
const secondRun = runCodexAppServerAttempt(secondParams);
await secondHarness.waitForMethod("turn/start");
await secondHarness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await secondRun;
const turnStart = secondHarness.requests.find((request) => request.method === "turn/start");
const inputText =
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
expect(inputText).not.toContain("OpenClaw assembled context for this turn:");
expect(inputText).not.toContain("steered into active native turn");
expect(inputText).toContain("continue after steering");
});
it("does not project mirrored messages on consecutive resumes", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
@@ -1824,8 +1989,8 @@ describe("runCodexAppServerAttempt", () => {
const firstInputText =
(firstTurnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]
?.text ?? "";
expect(firstInputText).not.toContain("OpenClaw assembled context for this turn:");
expect(firstInputText).not.toContain("we were discussing the Sonnet leak screenshots");
expect(firstInputText).toContain("OpenClaw assembled context for this turn:");
expect(firstInputText).toContain("we were discussing the Sonnet leak screenshots");
expect(firstInputText).toContain("is the previous message trustworthy?");
const secondHarness = createResumeHarness();
@@ -3869,6 +4034,72 @@ describe("runCodexAppServerAttempt", () => {
expect(savedBinding?.threadId).toBe("thread-1");
});
// Verifies that when the run starts a fresh thread instead of resuming the bound
// one, the messages newer than the old binding still reach the turn input.
it("preserves stale-binding continuity when token pressure forces a fresh Codex thread", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const agentDir = path.join(tempDir, "agent");
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
const binding = await readCodexAppServerBinding(sessionFile);
const bindingUpdatedAt = Date.parse(binding?.updatedAt ?? "");
if (!Number.isFinite(bindingUpdatedAt)) {
throw new Error("expected valid Codex binding timestamp");
}
// Two transcript messages timestamped after the binding.
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(
userMessage("post-binding user context", bindingUpdatedAt + 1_000),
);
sessionManager.appendMessage(
assistantMessage("post-binding assistant context", bindingUpdatedAt + 2_000),
);
// Session index entry placed next to the session file.
await fs.writeFile(
path.join(path.dirname(sessionFile), "sessions.json"),
JSON.stringify({
"agent:main:session-1": {
sessionFile,
totalTokens: 12_000,
},
}),
);
// Rollout record for the bound thread reporting 220_000 tokens used against a
// 258_400-token context window.
// NOTE(review): together with the large prompt below this is presumably what
// triggers the rotation to a fresh thread — confirm against startup-binding.
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
await fs.mkdir(rolloutDir, { recursive: true });
await fs.writeFile(
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
`${JSON.stringify({
payload: {
type: "token_count",
info: {
last_token_usage: {
total_tokens: 220_000,
},
model_context_window: 258_400,
},
},
})}\n`,
);
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const params = createParams(sessionFile, workspaceDir);
params.agentDir = agentDir;
params.prompt = "large prompt ".repeat(12_000);
const run = runCodexAppServerAttempt(params, {
pluginConfig: { appServer: { mode: "yolo" } },
});
await waitForMethod("turn/start");
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
// A new thread was started and the old one was never resumed.
expect(requests.map((entry) => entry.method)).toContain("thread/start");
expect(requests.map((entry) => entry.method)).not.toContain("thread/resume");
const turnStart = requests.find((request) => request.method === "turn/start");
const inputText =
(turnStart?.params as { input?: Array<{ text?: string }> } | undefined)?.input?.[0]?.text ??
"";
expect(inputText).toContain("post-binding user context");
expect(inputText).toContain("post-binding assistant context");
// The persisted binding now points at the freshly started thread.
const savedBinding = await readCodexAppServerBinding(sessionFile);
expect(savedBinding?.threadId).toBe("thread-1");
});
it("preserves bound auth when rotating a fallback-fuse native rollout", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");

View File

@@ -208,6 +208,7 @@ import {
clearCodexAppServerBinding,
clearCodexAppServerBindingForThread,
readCodexAppServerBinding,
writeCodexAppServerBinding,
type CodexAppServerThreadBinding,
} from "./session-binding.js";
import { rotateOversizedCodexAppServerStartupBinding } from "./startup-binding.js";
@@ -681,6 +682,17 @@ export async function runCodexAppServerAttempt(
let developerInstructions = baseDeveloperInstructions;
let prePromptMessageCount = historyMessages.length;
let contextEngineProjection: CodexContextEngineThreadBootstrapProjection | undefined;
let precomputedStaleBindingContinuityProjectionApplied = false;
let staleBindingContinuityForcedFreshStart = false;
// Projects the complete mirrored history together with the current prompt and
// stores the resulting prompt text and pre-prompt message count in the
// enclosing run state. The same history array is passed as both the assembled
// and the original messages.
const applyFreshThreadContinuityProjection = () => {
  const fullHistory = historyMessages;
  ({ promptText, prePromptMessageCount } = projectContextEngineAssemblyForCodex({
    assembledMessages: fullHistory,
    originalHistoryMessages: fullHistory,
    prompt: params.prompt,
  }));
};
const applyActiveContextEngineProjection = async (
decisionStartupBinding: CodexAppServerThreadBinding | undefined,
) => {
@@ -796,10 +808,102 @@ export async function runCodexAppServerAttempt(
promptBuild.developerInstructions,
buildCodexTurnCollaborationDeveloperInstructions(),
);
const rebuildCodexTurnPromptFromCurrentProjection = async () => {
const rebuildCodexPromptBuildFromCurrentProjection = async () => {
promptBuild = await buildPromptFromCurrentInputs();
codexTurnPromptText = decorateCodexTurnPromptText(promptBuild.prompt);
};
// Rebuilds only the turn prompt text from the current inputs. Every other
// field of the existing prompt build is kept as-is.
const rebuildCodexTurnPromptTextFromCurrentProjection = async () => {
  // Thread-level instructions are locked in once thread/start or thread/resume
  // has completed, so recovery continuity from here on travels as turn input.
  const { prompt: refreshedPrompt } = await buildPromptFromCurrentInputs();
  promptBuild = { ...promptBuild, prompt: refreshedPrompt };
  codexTurnPromptText = decorateCodexTurnPromptText(refreshedPrompt);
};
// Returns the user/assistant entries of the mirrored history whose timestamp is
// strictly later than the binding's `updatedAt`. Entries tagged with a
// "codex-app-server:" prefix (in `idempotencyKey` or `__openclaw.mirrorIdentity`)
// are left out. Yields an empty list when `updatedAt` does not parse.
const selectNewerVisibleHistoryAfterBinding = (binding: CodexAppServerThreadBinding) => {
  const cutoff = Date.parse(binding.updatedAt);
  if (!Number.isFinite(cutoff)) {
    return [];
  }
  const carriesCodexTag = (tag: unknown): boolean =>
    typeof tag === "string" && tag.startsWith("codex-app-server:");
  // Numbers are taken as-is, strings are parsed as dates, anything else is NaN.
  const toEpochMillis = (raw: unknown): number => {
    switch (typeof raw) {
      case "number":
        return raw;
      case "string":
        return Date.parse(raw);
      default:
        return Number.NaN;
    }
  };
  return historyMessages.filter((message) => {
    if (!(message.role === "user" || message.role === "assistant")) {
      return false;
    }
    const fields = message as unknown as Record<string, unknown>;
    if (carriesCodexTag(fields.idempotencyKey)) {
      return false;
    }
    const openclawMeta = fields["__openclaw"];
    // Only a plain (non-null, non-array) object can carry a mirror identity.
    const hasMetaObject =
      typeof openclawMeta === "object" && openclawMeta !== null && !Array.isArray(openclawMeta);
    if (hasMetaObject && carriesCodexTag((openclawMeta as Record<string, unknown>).mirrorIdentity)) {
      return false;
    }
    const sentAt = toEpochMillis(message.timestamp);
    return Number.isFinite(sentAt) && sentAt > cutoff;
  });
};
// Projects the visible messages that are newer than `binding` into the turn
// prompt state. Returns true when a projection was applied, and false (leaving
// the prompt state untouched) when there is nothing newer to project.
const applyResumeStaleBindingContinuityProjection = (binding: CodexAppServerThreadBinding) => {
  const uncoveredMessages = selectNewerVisibleHistoryAfterBinding(binding);
  if (uncoveredMessages.length > 0) {
    ({ promptText, prePromptMessageCount } = projectContextEngineAssemblyForCodex({
      assembledMessages: uncoveredMessages,
      originalHistoryMessages: historyMessages,
      prompt: params.prompt,
    }));
    return true;
  }
  return false;
};
// Resets both stale-binding continuity flags, then — only when no context
// engine is active and the binding has a thread id — applies the resume
// projection up front and records whether it took effect. Returns that result.
const precomputeNoContextEngineStaleBindingProjection = (
  binding: CodexAppServerThreadBinding | undefined,
) => {
  staleBindingContinuityForcedFreshStart = false;
  // Cleared before projecting so a throwing projection cannot leave a stale `true`.
  precomputedStaleBindingContinuityProjectionApplied = false;
  if (!activeContextEngine && binding?.threadId) {
    precomputedStaleBindingContinuityProjectionApplied =
      applyResumeStaleBindingContinuityProjection(binding);
  }
  return precomputedStaleBindingContinuityProjectionApplied;
};
// Decides, once the native thread lifecycle action is known, whether the turn
// prompt carries continuity context (returns true) or not (returns false).
// Never applies when a context engine is active or the history has no user
// message. A projection that was already precomputed (resume) or carried over
// into a forced fresh start is reported as applied without projecting again.
const applyNoContextEngineContinuityProjection = (
  action: "started" | "resumed",
  binding?: CodexAppServerThreadBinding,
) => {
  if (activeContextEngine) {
    return false;
  }
  if (!historyMessages.some((entry) => entry.role === "user")) {
    return false;
  }
  switch (action) {
    case "resumed":
      if (precomputedStaleBindingContinuityProjectionApplied) {
        return true;
      }
      // Without a binding there is no cutoff to project against.
      return binding ? applyResumeStaleBindingContinuityProjection(binding) : false;
    case "started":
      if (!staleBindingContinuityForcedFreshStart) {
        applyFreshThreadContinuityProjection();
      }
      return true;
    default:
      return false;
  }
};
if (precomputeNoContextEngineStaleBindingProjection(startupBinding)) {
await rebuildCodexPromptBuildFromCurrentProjection();
}
const rotateStartupBindingForProjectedTurn = async () => {
if (!startupBinding?.threadId) {
return;
@@ -821,6 +925,7 @@ export async function runCodexAppServerAttempt(
if (startupBinding?.threadId) {
return;
}
staleBindingContinuityForcedFreshStart = precomputedStaleBindingContinuityProjectionApplied;
if (activeContextEngine) {
contextEngineProjection = undefined;
try {
@@ -831,7 +936,7 @@ export async function runCodexAppServerAttempt(
});
}
}
await rebuildCodexTurnPromptFromCurrentProjection();
await rebuildCodexPromptBuildFromCurrentProjection();
embeddedAgentLog.info("codex app-server rebuilt turn prompt after native thread rotation", {
sessionId: params.sessionId,
sessionKey: contextSessionKey,
@@ -969,6 +1074,9 @@ export async function runCodexAppServerAttempt(
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
throw error;
}
if (applyNoContextEngineContinuityProjection(thread.lifecycle.action, thread)) {
await rebuildCodexTurnPromptTextFromCurrentProjection();
}
trajectoryRecorder?.recordEvent("session.started", {
sessionFile: params.sessionFile,
threadId: thread.threadId,
@@ -2239,6 +2347,15 @@ export async function runCodexAppServerAttempt(
!runAbortController.signal.aborted &&
!finalAborted &&
!finalPromptError;
if (shouldDelayNativeHookRelayUnregister) {
await markCodexAppServerBindingCoveredThroughTurn({
sessionFile: params.sessionFile,
threadId: thread.threadId,
authProfileStore: params.authProfileStore,
agentDir: params.agentDir,
config: params.config,
});
}
return {
...result,
timedOut,
@@ -2356,6 +2473,34 @@ async function clearCodexBindingAfterInvalidImagePayload(
await clearCodexAppServerBinding(sessionFile);
}
/**
 * Re-saves the Codex app-server binding for a session, but only when the
 * stored binding still points at `threadId`.
 *
 * The binding is read, stripped of `schemaVersion`, `sessionFile` and
 * `updatedAt`, and written back with the same lookup options.
 * NOTE(review): presumably `writeCodexAppServerBinding` re-stamps those three
 * fields (in particular a fresh `updatedAt`) — confirm in session-binding.
 *
 * @param params.sessionFile Session transcript path the binding is keyed by.
 * @param params.threadId Thread the just-finished turn ran on.
 * @param params.authProfileStore Forwarded unchanged to the binding read and write.
 * @param params.agentDir Forwarded unchanged to the binding read and write.
 * @param params.config Forwarded unchanged to the binding read and write.
 */
async function markCodexAppServerBindingCoveredThroughTurn(params: {
  sessionFile: string;
  threadId: string;
  authProfileStore: EmbeddedRunAttemptParams["authProfileStore"];
  agentDir?: string;
  config?: EmbeddedRunAttemptParams["config"];
}): Promise<void> {
  const { sessionFile, threadId, authProfileStore, agentDir, config } = params;
  const persisted = await readCodexAppServerBinding(sessionFile, {
    authProfileStore,
    agentDir,
    config,
  });
  if (!persisted) {
    return;
  }
  // The session has been rebound to another thread; leave that binding alone.
  if (persisted.threadId !== threadId) {
    return;
  }
  const {
    schemaVersion: _droppedSchemaVersion,
    sessionFile: _droppedSessionFile,
    updatedAt: _droppedUpdatedAt,
    ...refreshableBinding
  } = persisted;
  await writeCodexAppServerBinding(sessionFile, refreshableBinding, {
    authProfileStore,
    agentDir,
    config,
  });
}
/** Type guard: true only for string values containing at least one character. */
function isNonEmptyString(value: unknown): value is string {
  if (typeof value !== "string") {
    return false;
  }
  return value !== "";
}