fix(gateway): abort stale agent runs on restart

This commit is contained in:
Vincent Koc
2026-05-25 21:28:57 +02:00
parent 4424dafe64
commit a122d804dd
10 changed files with 373 additions and 67 deletions

View File

@@ -87,6 +87,8 @@ Docs: https://docs.openclaw.ai
- Tests: avoid rebuilding the Control UI twice during the installer Docker smoke now that `pnpm build` includes `ui:build`.
- Tests: give QA config mutation RPCs enough native Windows budget to finish gateway config writes and restart settle after hot scenario runs.
- Tests: keep the gateway restart-inflight QA scenario focused on restart recovery on native Windows by allowing expected embedded prompt handoff errors and using the Windows-safe timeout budget.
- QA-Lab: make the synthetic OpenAI provider honor generic `reply exactly:` directives after required kickoff reads so restart-recovery scenarios do not fall through to generic repo-summary prose.
- Gateway: abort active `agent` RPC runs during forced restart shutdown so stale in-process turns cannot keep writing a session after the Gateway lifecycle restarts.
- Crabbox: sync clean sparse worktrees through a temporary full checkout even when reusing an existing lease so tracked build-time files are not omitted.
- Build: route `scripts/ui.js` through the shared pnpm runner and keep Control UI chunking helpers in sparse-included source so native Windows Corepack builds can produce `dist/control-ui`.
- Tests: give the memory fallback QA scenario enough turn budget to exercise native Windows gateway runs instead of failing on the client timeout while the mock agent is still dispatching.

View File

@@ -397,6 +397,51 @@ describe("qa mock openai server", () => {
expect(final.output[0]?.content?.[0]?.text).toBe("TOOL_PROGRESS_MARKER_OK");
});
it("honors exact replies after QA kickoff reads without marker wording", async () => {
const server = await startMockServer();
const prompt =
"Gateway restart in-flight QA check. Read QA_KICKOFF_TASK.md, then reply exactly: RESTART-INFLIGHT-MAYBE-OK";
const final = await expectResponsesJson<{
output: Array<{ content?: Array<{ text?: string }> }>;
}>(server, {
stream: false,
input: [
makeUserInput(prompt),
{
type: "function_call_output",
call_id: "call_mock_read_1",
output: JSON.stringify({ text: "QA mission: understand this OpenClaw repo." }),
},
],
});
expect(final.output[0]?.content?.[0]?.text).toBe("RESTART-INFLIGHT-MAYBE-OK");
});
it("does not use stale exact replies from instructions after QA reads", async () => {
const server = await startMockServer();
const final = await expectResponsesJson<{
output: Array<{ content?: Array<{ text?: string }> }>;
}>(server, {
stream: false,
instructions: "If this is a heartbeat check, reply exactly: HEARTBEAT_OK",
input: [
makeUserInput("Read QA_KICKOFF_TASK.md, then summarize what you found."),
{
type: "function_call_output",
call_id: "call_mock_read_1",
output: JSON.stringify({ text: "QA mission: understand this OpenClaw repo." }),
},
],
});
const text = final.output[0]?.content?.[0]?.text ?? "";
expect(text).toContain("Protocol note: I reviewed the requested material.");
expect(text).not.toContain("HEARTBEAT_OK");
});
it("requires deterministic tool-progress error prompts to observe a failed tool", async () => {
const server = await startMockServer();
const prompt =

View File

@@ -1003,8 +1003,8 @@ function buildAssistantText(
: scenarioToolOutput;
const orbitCode = extractOrbitCode(memorySnippet) ?? extractOrbitCode(allInputText);
const mediaPath = /MEDIA:([^\n]+)/.exec(toolOutput)?.[1]?.trim();
const exactReplyDirective =
extractExactReplyDirective(prompt) ?? extractExactReplyDirective(allInputText);
const promptExactReplyDirective = extractExactReplyDirective(prompt);
const exactReplyDirective = promptExactReplyDirective ?? extractExactReplyDirective(allInputText);
const exactMarkerDirective =
extractExactMarkerDirective(prompt) ?? extractExactMarkerDirective(allInputText);
const finishExactlyDirective =
@@ -1042,6 +1042,9 @@ function buildAssistantText(
if (/\bmarker\b/i.test(allInputText) && exactMarkerDirective) {
return exactMarkerDirective;
}
if (promptExactReplyDirective) {
return promptExactReplyDirective;
}
if (/visible skill marker/i.test(prompt)) {
return "VISIBLE-SKILL-OK";
}

View File

@@ -199,6 +199,10 @@ describe("qa scenario catalog", () => {
);
expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow))
.toContain("EmbeddedAttemptSessionTakeoverError");
expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow))
.toContain("AbortError");
expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow))
.toContain("This operation was aborted");
expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow))
.toContain("liveTurnTimeoutMs(env, 180000)");
expect(readQaScenarioExecutionConfig("long-context-progress-watchdog")).toMatchObject({

View File

@@ -98,7 +98,7 @@ steps:
- expr: started.runId
- expr: liveTurnTimeoutMs(env, 180000)
- assert:
expr: "waited.status === 'ok' || waited.status === 'timeout' || (waited.status === 'error' && String(waited.error ?? '').includes('EmbeddedAttemptSessionTakeoverError'))"
expr: "waited.status === 'ok' || waited.status === 'timeout' || (waited.status === 'error' && (String(waited.error ?? '').includes('EmbeddedAttemptSessionTakeoverError') || String(waited.error ?? '').includes('AbortError') || String(waited.error ?? '').includes('This operation was aborted')))"
message:
expr: "`interrupted agent run ended with unexpected status: ${JSON.stringify(waited)}`"
- set: interruptedMatches

View File

@@ -25,6 +25,7 @@ import {
createDefaultEmbeddedSession,
createContextEngineBootstrapAndAssemble,
createContextEngineAttemptRunner,
createSubscriptionMock,
expectCalledWithSessionKey,
getHoisted,
resetEmbeddedAttemptHarness,
@@ -1514,6 +1515,102 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
expect(events.indexOf("bootstrap")).toBeLessThan(events.indexOf("lock"));
});
it("does not acquire the session write lock after aborting during prep", async () => {
const abortController = new AbortController();
hoisted.resolveBootstrapContextForRunMock.mockImplementation(async () => {
abortController.abort();
return { bootstrapFiles: [], contextFiles: [] };
});
await expect(
createContextEngineAttemptRunner({
contextEngine: createContextEngineBootstrapAndAssemble(),
sessionKey,
tempPaths,
attemptOverrides: { abortSignal: abortController.signal },
}),
).rejects.toMatchObject({ name: "AbortError" });
expect(hoisted.acquireSessionWriteLockMock).not.toHaveBeenCalled();
});
it("disposes prep runtimes after aborting before the session write lock", async () => {
const abortController = new AbortController();
const lspDispose = vi.fn(async () => {});
hoisted.createBundleLspToolRuntimeMock.mockImplementationOnce(async () => {
abortController.abort();
return {
tools: [],
dispose: lspDispose,
};
});
await expect(
createContextEngineAttemptRunner({
contextEngine: createContextEngineBootstrapAndAssemble(),
sessionKey,
tempPaths,
attemptOverrides: {
abortSignal: abortController.signal,
disableTools: false,
toolsAllow: ["*"],
},
}),
).rejects.toMatchObject({ name: "AbortError" });
expect(hoisted.acquireSessionWriteLockMock).not.toHaveBeenCalled();
expect(hoisted.createBundleLspToolRuntimeMock).toHaveBeenCalledTimes(1);
expect(lspDispose).toHaveBeenCalledTimes(1);
});
it("stops session setup when aborting after the session write lock", async () => {
const abortController = new AbortController();
const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble();
hoisted.sessionManagerOpenMock.mockImplementationOnce(() => {
abortController.abort();
return hoisted.sessionManager;
});
await expect(
createContextEngineAttemptRunner({
contextEngine: createTestContextEngine({ bootstrap, assemble }),
sessionKey,
tempPaths,
attemptOverrides: { abortSignal: abortController.signal },
}),
).rejects.toMatchObject({ name: "AbortError" });
expect(hoisted.acquireSessionWriteLockMock).toHaveBeenCalledTimes(1);
expect(bootstrap).not.toHaveBeenCalled();
expect(assemble).not.toHaveBeenCalled();
expect(hoisted.createAgentSessionMock).not.toHaveBeenCalled();
});
it("does not submit a prompt after aborting a created session", async () => {
const abortController = new AbortController();
const sessionPrompt = vi.fn(async () => {});
hoisted.subscribeEmbeddedPiSessionMock.mockImplementationOnce(() => {
abortController.abort();
return createSubscriptionMock();
});
await expect(
createContextEngineAttemptRunner({
contextEngine: createContextEngineBootstrapAndAssemble(),
sessionKey,
tempPaths,
createSession: () =>
createDefaultEmbeddedSession({
initialMessages: [seedMessage],
prompt: sessionPrompt,
}),
attemptOverrides: { abortSignal: abortController.signal },
}),
).rejects.toMatchObject({ name: "AbortError" });
expect(sessionPrompt).not.toHaveBeenCalled();
});
it("forwards modelId to assemble", async () => {
const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble();
const contextEngine = createTestContextEngine({ bootstrap, assemble });

View File

@@ -68,6 +68,9 @@ type AttemptSpawnWorkspaceHoisted = {
ensureGlobalUndiciStreamTimeoutsMock: UnknownMock;
buildEmbeddedMessageActionDiscoveryInputMock: UnknownMock;
createOpenClawCodingToolsMock: UnknownMock;
getOrCreateSessionMcpRuntimeMock: AsyncUnknownMock;
materializeBundleMcpToolsForRunMock: AsyncUnknownMock;
createBundleLspToolRuntimeMock: AsyncUnknownMock;
subscribeEmbeddedPiSessionMock: Mock<SubscribeEmbeddedPiSessionFn>;
acquireSessionWriteLockMock: Mock<AcquireSessionWriteLockFn>;
installToolResultContextGuardMock: UnknownMock;
@@ -138,6 +141,9 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
const ensureGlobalUndiciStreamTimeoutsMock = vi.fn();
const buildEmbeddedMessageActionDiscoveryInputMock = vi.fn((params: unknown) => params);
const createOpenClawCodingToolsMock = vi.fn(() => []);
const getOrCreateSessionMcpRuntimeMock = vi.fn(async () => undefined);
const materializeBundleMcpToolsForRunMock = vi.fn(async () => undefined);
const createBundleLspToolRuntimeMock = vi.fn(async () => undefined);
const installToolResultContextGuardMock = vi.fn(() => () => {});
const installContextEngineLoopHookMock = vi.fn(() => () => {});
const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {});
@@ -206,6 +212,9 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
ensureGlobalUndiciStreamTimeoutsMock,
buildEmbeddedMessageActionDiscoveryInputMock,
createOpenClawCodingToolsMock,
getOrCreateSessionMcpRuntimeMock,
materializeBundleMcpToolsForRunMock,
createBundleLspToolRuntimeMock,
subscribeEmbeddedPiSessionMock,
acquireSessionWriteLockMock,
installToolResultContextGuardMock,
@@ -566,13 +575,16 @@ vi.mock("../../pi-tools.js", () => ({
vi.mock("../../pi-bundle-mcp-tools.js", () => ({
createBundleMcpToolRuntime: async () => undefined,
getOrCreateSessionMcpRuntime: async () => undefined,
materializeBundleMcpToolsForRun: async () => undefined,
getOrCreateSessionMcpRuntime: (...args: unknown[]) =>
hoisted.getOrCreateSessionMcpRuntimeMock(...args),
materializeBundleMcpToolsForRun: (...args: unknown[]) =>
hoisted.materializeBundleMcpToolsForRunMock(...args),
retireSessionMcpRuntime: async () => true,
}));
vi.mock("../../pi-bundle-lsp-runtime.js", () => ({
createBundleLspToolRuntime: async () => undefined,
createBundleLspToolRuntime: (...args: unknown[]) =>
hoisted.createBundleLspToolRuntimeMock(...args),
}));
vi.mock("../../../image-generation/runtime.js", () => ({
@@ -930,6 +942,9 @@ export function resetEmbeddedAttemptHarness(
},
];
});
hoisted.getOrCreateSessionMcpRuntimeMock.mockReset().mockResolvedValue(undefined);
hoisted.materializeBundleMcpToolsForRunMock.mockReset().mockResolvedValue(undefined);
hoisted.createBundleLspToolRuntimeMock.mockReset().mockResolvedValue(undefined);
hoisted.subscribeEmbeddedPiSessionMock
.mockReset()
.mockImplementation(() => createSubscriptionMock());

View File

@@ -1207,6 +1207,21 @@ function collectAttemptExplicitToolAllowlistSources(params: {
]);
}
function createAttemptAbortError(signal: AbortSignal): Error {
if (signal.reason instanceof Error) {
return signal.reason;
}
const error = new Error(typeof signal.reason === "string" ? signal.reason : "agent run aborted");
error.name = "AbortError";
return error;
}
function throwIfAttemptAbortSignalFired(signal: AbortSignal | undefined): void {
if (signal?.aborted === true) {
throw createAttemptAbortError(signal);
}
}
export async function runEmbeddedAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
@@ -1218,6 +1233,7 @@ export async function runEmbeddedAttempt(
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
);
const prepStages = createEmbeddedRunStageTracker();
throwIfAttemptAbortSignalFired(params.abortSignal);
const emitPrepStageSummary = (phase: string) => {
const summary = prepStages.snapshot();
const shouldWarn = shouldWarnEmbeddedRunStageSummary(summary);
@@ -1304,6 +1320,108 @@ export async function runEmbeddedAttempt(
let beforeAgentRunBlockedBy: string | undefined;
// Releases the eager session lock if post-prompt code exits before cleanup.
let releaseRetainedSessionLock: (() => Promise<void>) | undefined;
let bundleMcpRuntime: Awaited<ReturnType<typeof materializeBundleMcpToolsForRun>> | undefined;
let bundleLspRuntime: Awaited<ReturnType<typeof createBundleLspToolRuntime>> | undefined;
let toolSearchCatalogRef: ToolSearchCatalogRef | undefined;
let toolSearchCatalogApplied = false;
let sessionCleanupOwnsEmbeddedResources = false;
let abortActiveSessionForExternalSignal: (() => Promise<void>) | undefined;
let abortRunForExternalSignal: ((isTimeout?: boolean, reason?: unknown) => void) | undefined;
let isCompactionPendingForExternalSignal: (() => boolean) | undefined;
let isCompactionInFlightForExternalSignal: (() => boolean) | undefined;
let removeExternalAbortSignalListener: (() => void) | undefined;
const getAbortReason = (signal: AbortSignal): unknown =>
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
const makeTimeoutAbortReason = (): Error => {
const err = new Error("request timed out");
err.name = "TimeoutError";
return err;
};
const cleanupEmbeddedPrepResourcesAfterEarlyExit = async () => {
if (toolSearchCatalogApplied) {
clearToolSearchCatalog({
sessionId: params.sessionId,
sessionKey: sandboxSessionKey,
agentId: sessionAgentId,
runId: params.runId,
catalogRef: toolSearchCatalogRef,
});
toolSearchCatalogApplied = false;
}
try {
await bundleMcpRuntime?.dispose();
} catch {
/* best-effort */
} finally {
bundleMcpRuntime = undefined;
}
try {
await bundleLspRuntime?.dispose();
} catch {
/* best-effort */
} finally {
bundleLspRuntime = undefined;
}
};
const onExternalAbortSignal = () => {
const signal = params.abortSignal;
if (!signal) {
return;
}
externalAbort = true;
const reason = getAbortReason(signal);
const timeout = reason ? isTimeoutError(reason) : false;
if (
shouldFlagCompactionTimeout({
isTimeout: timeout,
isCompactionPendingOrRetrying: isCompactionPendingForExternalSignal?.() ?? false,
isCompactionInFlight: isCompactionInFlightForExternalSignal?.() ?? false,
})
) {
timedOutDuringCompaction = true;
}
if (abortRunForExternalSignal) {
abortRunForExternalSignal(timeout, reason);
return;
}
aborted = true;
if (timeout) {
timedOut = true;
if (!timedOutDuringCompaction && countActiveToolExecutions(params.runId) > 0) {
timedOutDuringToolExecution = true;
}
}
promptError = createAttemptAbortError(signal);
if (!runAbortController.signal.aborted) {
runAbortController.abort(timeout ? (reason ?? makeTimeoutAbortReason()) : reason);
}
void abortActiveSessionForExternalSignal?.();
};
const armExternalAbortSignal = () => {
const signal = params.abortSignal;
if (!signal || removeExternalAbortSignalListener) {
return;
}
if (signal.aborted) {
onExternalAbortSignal();
return;
}
signal.addEventListener("abort", onExternalAbortSignal, { once: true });
removeExternalAbortSignalListener = () => {
signal.removeEventListener("abort", onExternalAbortSignal);
removeExternalAbortSignalListener = undefined;
};
};
const throwIfAttemptAbortSignalFiredAfterPrepCleanup = async () => {
if (params.abortSignal?.aborted === true) {
const abortError = createAttemptAbortError(params.abortSignal);
aborted = true;
externalAbort = true;
promptError = abortError;
await cleanupEmbeddedPrepResourcesAfterEarlyExit();
throw abortError;
}
};
try {
const skillsSnapshotForRun =
sandbox?.enabled && sandbox.workspaceAccess !== "rw" ? undefined : params.skillsSnapshot;
@@ -1431,7 +1549,7 @@ export async function runEmbeddedAttempt(
toolSearchControlsEnabledForRun ||
codeModeControlsEnabledForRun;
let toolSearchCatalogExecutor: ToolSearchCatalogToolExecutor | undefined;
const toolSearchCatalogRef: ToolSearchCatalogRef | undefined =
toolSearchCatalogRef =
toolSearchControlsEnabledForRun || codeModeControlsEnabledForRun
? createToolSearchCatalogRef()
: undefined;
@@ -1713,7 +1831,7 @@ export async function runEmbeddedAttempt(
cfg: params.config,
})
: undefined;
const bundleMcpRuntime = bundleMcpSessionRuntime
bundleMcpRuntime = bundleMcpSessionRuntime
? await materializeBundleMcpToolsForRun({
runtime: bundleMcpSessionRuntime,
reservedToolNames: [
@@ -1727,7 +1845,7 @@ export async function runEmbeddedAttempt(
disableTools: params.disableTools || isRawModelRun,
toolsAllow: params.toolsAllow,
});
const bundleLspRuntime = bundleLspEnabled
bundleLspRuntime = bundleLspEnabled
? await createBundleLspToolRuntime({
workspaceDir: effectiveWorkspace,
cfg: params.config,
@@ -1835,6 +1953,7 @@ export async function runEmbeddedAttempt(
catalogRef: toolSearchCatalogRef,
toolHookContext: catalogToolHookContext,
});
toolSearchCatalogApplied = true;
effectiveTools = filterLocalModelLeanTools({
tools: toolSearch.tools,
config: params.config,
@@ -2135,6 +2254,7 @@ export async function runEmbeddedAttempt(
config: params.config,
compactionTimeoutMs,
});
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
const sessionLockController = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: {
@@ -2143,6 +2263,8 @@ export async function runEmbeddedAttempt(
},
});
releaseRetainedSessionLock = () => sessionLockController.dispose();
armExternalAbortSignal();
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
@@ -2150,16 +2272,19 @@ export async function runEmbeddedAttempt(
let trajectoryRecorder: ReturnType<typeof createTrajectoryRuntimeRecorder> | null = null;
let trajectoryEndRecorded = false;
let buildAbortSettlePromise: () => Promise<void> | null = () => null;
sessionCleanupOwnsEmbeddedResources = true;
try {
await repairSessionFileIfNeeded({
sessionFile: params.sessionFile,
debug: (message) => log.debug(message),
warn: (message) => log.warn(message),
});
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
const hadSessionFile = await fs
.stat(params.sessionFile)
.then(() => true)
.catch(() => false);
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
const transcriptPolicy = resolveAttemptTranscriptPolicy({
runtimePlan: params.runtimePlan,
@@ -2171,6 +2296,7 @@ export async function runEmbeddedAttempt(
});
await prewarmSessionFile(params.sessionFile);
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
agentId: sessionAgentId,
sessionKey: params.sessionKey,
@@ -2200,6 +2326,7 @@ export async function runEmbeddedAttempt(
},
});
trackSessionManagerAccess(params.sessionFile);
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
await runAttemptContextEngineBootstrap({
hadSessionFile,
@@ -2230,6 +2357,7 @@ export async function runEmbeddedAttempt(
}),
warn: (message) => log.warn(message),
});
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
await prepareSessionManagerForRun({
sessionManager,
@@ -2238,6 +2366,7 @@ export async function runEmbeddedAttempt(
sessionId: params.sessionId,
cwd: effectiveWorkspace,
});
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
const settingsManager = createPreparedEmbeddedPiSettingsManager({
cwd: effectiveWorkspace,
@@ -2275,6 +2404,7 @@ export async function runEmbeddedAttempt(
extensionFactories,
});
await resourceLoader.reload();
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
// DefaultResourceLoader.reload() rehydrates settings from disk and can drop OpenClaw
// compaction overrides applied in createPreparedEmbeddedPiSettingsManager — same
// rehydration also restores Pi's auto-compaction (openclaw#75799), so re-apply
@@ -2448,10 +2578,11 @@ export async function runEmbeddedAttempt(
},
});
session = createdSession.session;
applySystemPromptOverrideToSession(session, systemPromptText);
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
if (!session) {
throw new Error("Embedded agent session missing");
}
applySystemPromptOverrideToSession(session, systemPromptText);
session.setActiveToolsByName(sessionToolAllowlist);
const activeSession = session;
installSessionEventWriteLock({
@@ -2509,6 +2640,10 @@ export async function runEmbeddedAttempt(
trackSettlePromise(inFlightAbortSettlePromises, promise);
const abortActiveSession = (): Promise<void> =>
trackAbortSettlePromise(Promise.resolve(activeSession.abort()));
abortActiveSessionForExternalSignal = abortActiveSession;
if (runAbortController.signal.aborted) {
void abortActiveSession();
}
buildAbortSettlePromise = (): Promise<void> | null => {
const promises = [...inFlightPromptSettlePromises, ...inFlightAbortSettlePromises];
if (promises.length === 0) {
@@ -3230,13 +3365,6 @@ export async function runEmbeddedAttempt(
}
let yieldAborted = false;
const getAbortReason = (signal: AbortSignal): unknown =>
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
const makeTimeoutAbortReason = (): Error => {
const err = new Error("request timed out");
err.name = "TimeoutError";
return err;
};
const abortCompaction = () => {
if (!activeSession.isCompacting) {
return;
@@ -3267,6 +3395,7 @@ export async function runEmbeddedAttempt(
abortCompaction();
void abortActiveSession();
};
abortRunForExternalSignal = abortRun;
idleTimeoutTrigger = (error) => {
idleTimedOut = true;
abortRun(true, error);
@@ -3285,9 +3414,12 @@ export async function runEmbeddedAttempt(
prompt: string,
options?: Parameters<typeof activeSession.prompt>[1],
): Promise<void> =>
withOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, async () =>
abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))),
);
withOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, async () => {
if (runAbortController.signal.aborted) {
throw createAttemptAbortError(runAbortController.signal);
}
return await abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options)));
});
const onBlockReply = params.onBlockReply
? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReply)
: undefined;
@@ -3373,6 +3505,8 @@ export async function runEmbeddedAttempt(
getCompactionCount,
getLastCompactionTokensAfter,
} = subscription;
isCompactionPendingForExternalSignal = () => subscription.isCompacting();
isCompactionInFlightForExternalSignal = () => activeSession.isCompacting;
toolSearchCatalogExecutor = async (toolParams) => {
try {
const result = await runToolLifecycle({
@@ -3512,29 +3646,9 @@ export async function runEmbeddedAttempt(
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = activeSession.sessionId;
let sessionFileUsed: string | undefined = params.sessionFile;
const onAbort = () => {
externalAbort = true;
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
const timeout = reason ? isTimeoutError(reason) : false;
if (
shouldFlagCompactionTimeout({
isTimeout: timeout,
isCompactionPendingOrRetrying: subscription.isCompacting(),
isCompactionInFlight: activeSession.isCompacting,
})
) {
timedOutDuringCompaction = true;
}
abortRun(timeout, reason);
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
if (params.abortSignal?.aborted === true) {
onExternalAbortSignal();
await throwIfAttemptAbortSignalFiredAfterPrepCleanup();
}
// Hook runner was already obtained earlier before tool creation
@@ -4658,7 +4772,6 @@ export async function runEmbeddedAttempt(
params.replyOperation.detachBackend(queueHandle);
}
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
const toolMetasNormalized = toolMetas
@@ -5073,6 +5186,16 @@ export async function runEmbeddedAttempt(
}
}
} finally {
removeExternalAbortSignalListener?.();
if (!sessionCleanupOwnsEmbeddedResources) {
try {
await cleanupEmbeddedPrepResourcesAfterEarlyExit();
} catch (cleanupErr) {
log.warn(
`failed to clean up embedded prep resources after early attempt exit: runId=${params.runId} ${String(cleanupErr)}`,
);
}
}
try {
await releaseRetainedSessionLock?.();
} catch (releaseErr) {

View File

@@ -450,7 +450,7 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("aborts active chat runs when restart reply drain times out", async () => {
it("aborts active runs when restart reply drain times out", async () => {
vi.useFakeTimers();
const controller = new AbortController();
const agentController = new AbortController();
@@ -509,9 +509,9 @@ describe("createGatewayCloseHandler", () => {
expect(result.warnings).toContain("restart-reply-drain");
expect(controller.signal.aborted).toBe(true);
expect(agentController.signal.aborted).toBe(false);
expect(agentController.signal.aborted).toBe(true);
expect(chatAbortControllers.has("run-1")).toBe(false);
expect(chatAbortControllers.has("agent-run-1")).toBe(true);
expect(chatAbortControllers.has("agent-run-1")).toBe(false);
expect(chatRunState.buffers.has("run-1")).toBe(false);
expect(chatRunState.deltaSentAt.has("run-1")).toBe(false);
expect(chatRunState.deltaLastBroadcastLen.has("run-1")).toBe(false);
@@ -521,13 +521,13 @@ describe("createGatewayCloseHandler", () => {
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes(
"restart reply drain timed out after 100ms with 1 active chat run(s) still active",
"restart reply drain timed out after 100ms with 2 active run(s) still active",
),
),
).toBe(true);
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("aborted 1 active chat run(s) during restart shutdown"),
String(message).includes("aborted 2 active run(s) during restart shutdown"),
),
).toBe(true);
expect(broadcast).toHaveBeenCalledWith(
@@ -539,9 +539,26 @@ describe("createGatewayCloseHandler", () => {
"chat",
expect.objectContaining({ runId: "run-1", state: "aborted", stopReason: "restart" }),
);
expect(broadcast).toHaveBeenCalledWith(
"chat",
expect.objectContaining({
runId: "agent-run-1",
state: "aborted",
stopReason: "restart",
}),
);
expect(nodeSendToSession).toHaveBeenCalledWith(
"session-1",
"chat",
expect.objectContaining({
runId: "agent-run-1",
state: "aborted",
stopReason: "restart",
}),
);
});
it("does not drain or abort active chat runs for normal shutdown", async () => {
it("does not drain or abort active runs for normal shutdown", async () => {
const controller = new AbortController();
const chatAbortControllers = new Map([
[
@@ -568,7 +585,7 @@ describe("createGatewayCloseHandler", () => {
expect(chatAbortControllers.size).toBe(1);
});
it("aborts active chat runs immediately when restart drain budget is exhausted", async () => {
it("aborts active runs immediately when restart drain budget is exhausted", async () => {
const controller = new AbortController();
const chatAbortControllers = new Map([
[

View File

@@ -91,32 +91,32 @@ function getRestartReplyDrainCounts(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}) {
const pendingReplyCount = params.getPendingReplyCount();
const activeChatRuns = listRestartDrainChatRuns(params.chatAbortControllers).length;
const activeRuns = listRestartDrainRuns(params.chatAbortControllers).length;
return {
pendingReplies:
Number.isFinite(pendingReplyCount) && pendingReplyCount > 0
? Math.floor(pendingReplyCount)
: 0,
activeChatRuns,
activeRuns,
};
}
function listRestartDrainChatRuns(
function listRestartDrainRuns(
chatAbortControllers: Map<string, ChatAbortControllerEntry>,
): Array<[string, ChatAbortControllerEntry]> {
return Array.from(chatAbortControllers.entries()).filter(([, entry]) => entry.kind !== "agent");
return Array.from(chatAbortControllers.entries());
}
function formatRestartReplyDrainDetails(counts: {
pendingReplies: number;
activeChatRuns: number;
activeRuns: number;
}): string {
const details: string[] = [];
if (counts.pendingReplies > 0) {
details.push(`${counts.pendingReplies} pending reply(ies)`);
}
if (counts.activeChatRuns > 0) {
details.push(`${counts.activeChatRuns} active chat run(s)`);
if (counts.activeRuns > 0) {
details.push(`${counts.activeRuns} active run(s)`);
}
return details.length > 0 ? details.join(", ") : "no pending reply work";
}
@@ -136,12 +136,12 @@ async function waitForRestartReplyDrain(params: {
}): Promise<{
drained: boolean;
elapsedMs: number;
counts: { pendingReplies: number; activeChatRuns: number };
counts: { pendingReplies: number; activeRuns: number };
}> {
const timeoutMs = Math.max(0, Math.floor(params.timeoutMs));
const pollMs = Math.max(25, Math.floor(params.pollMs ?? RESTART_REPLY_DRAIN_POLL_MS));
let counts = getRestartReplyDrainCounts(params);
if (counts.pendingReplies <= 0 && counts.activeChatRuns <= 0) {
if (counts.pendingReplies <= 0 && counts.activeRuns <= 0) {
return { drained: true, elapsedMs: 0, counts };
}
if (timeoutMs <= 0) {
@@ -156,13 +156,13 @@ async function waitForRestartReplyDrain(params: {
}
await sleepForRestartReplyDrain(Math.min(pollMs, timeoutMs - elapsedMs));
counts = getRestartReplyDrainCounts(params);
if (counts.pendingReplies <= 0 && counts.activeChatRuns <= 0) {
if (counts.pendingReplies <= 0 && counts.activeRuns <= 0) {
return { drained: true, elapsedMs: Date.now() - startedAt, counts };
}
}
}
function abortActiveChatRunsForRestart(params: {
function abortActiveRunsForRestart(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunState: ChatRunState;
removeChatRun: (
@@ -175,7 +175,7 @@ function abortActiveChatRunsForRestart(params: {
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
}): number {
let aborted = 0;
for (const [runId, entry] of listRestartDrainChatRuns(params.chatAbortControllers)) {
for (const [runId, entry] of listRestartDrainRuns(params.chatAbortControllers)) {
const result = abortChatRunById(
{
chatAbortControllers: params.chatAbortControllers,
@@ -220,7 +220,7 @@ async function drainRestartPendingRepliesForShutdown(params: {
warnings: string[];
}): Promise<void> {
const initialCounts = getRestartReplyDrainCounts(params);
if (initialCounts.pendingReplies <= 0 && initialCounts.activeChatRuns <= 0) {
if (initialCounts.pendingReplies <= 0 && initialCounts.activeRuns <= 0) {
return;
}
@@ -246,16 +246,16 @@ async function drainRestartPendingRepliesForShutdown(params: {
);
recordShutdownWarning(params.warnings, "restart-reply-drain");
if (drainResult.counts.activeChatRuns <= 0) {
if (drainResult.counts.activeRuns <= 0) {
return;
}
const abortedRuns = abortActiveChatRunsForRestart(params);
const abortedRuns = abortActiveRunsForRestart(params);
if (abortedRuns <= 0) {
return;
}
shutdownLog.warn(`aborted ${abortedRuns} active chat run(s) during restart shutdown`);
shutdownLog.warn(`aborted ${abortedRuns} active run(s) during restart shutdown`);
const postAbortDrain = await waitForRestartReplyDrain({
getPendingReplyCount: params.getPendingReplyCount,
chatAbortControllers: params.chatAbortControllers,