diff --git a/CHANGELOG.md b/CHANGELOG.md index 62148e9d40b8..baf2a9fd345d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,8 @@ Docs: https://docs.openclaw.ai - Tests: avoid rebuilding the Control UI twice during the installer Docker smoke now that `pnpm build` includes `ui:build`. - Tests: give QA config mutation RPCs enough native Windows budget to finish gateway config writes and restart settle after hot scenario runs. - Tests: keep the gateway restart-inflight QA scenario focused on restart recovery on native Windows by allowing expected embedded prompt handoff errors and using the Windows-safe timeout budget. +- QA-Lab: make the synthetic OpenAI provider honor generic `reply exactly:` directives after required kickoff reads so restart-recovery scenarios do not fall through to generic repo-summary prose. +- Gateway: abort active `agent` RPC runs during forced restart shutdown so stale in-process turns cannot keep writing a session after the Gateway lifecycle restarts. - Crabbox: sync clean sparse worktrees through a temporary full checkout even when reusing an existing lease so tracked build-time files are not omitted. - Build: route `scripts/ui.js` through the shared pnpm runner and keep Control UI chunking helpers in sparse-included source so native Windows Corepack builds can produce `dist/control-ui`. - Tests: give the memory fallback QA scenario enough turn budget to exercise native Windows gateway runs instead of failing on the client timeout while the mock agent is still dispatching. diff --git a/extensions/qa-lab/src/providers/mock-openai/server.test.ts b/extensions/qa-lab/src/providers/mock-openai/server.test.ts index d595ea00a7eb..d92f516212e1 100644 --- a/extensions/qa-lab/src/providers/mock-openai/server.test.ts +++ b/extensions/qa-lab/src/providers/mock-openai/server.test.ts @@ -397,6 +397,51 @@ describe("qa mock openai server", () => { expect(final.output[0]?.content?.[0]?.text).toBe("TOOL_PROGRESS_MARKER_OK"); }); + it("honors exact replies after QA kickoff reads without marker wording", async () => { + const server = await startMockServer(); + const prompt = + "Gateway restart in-flight QA check. Read QA_KICKOFF_TASK.md, then reply exactly: RESTART-INFLIGHT-MAYBE-OK"; + + const final = await expectResponsesJson<{ + output: Array<{ content?: Array<{ text?: string }> }>; + }>(server, { + stream: false, + input: [ + makeUserInput(prompt), + { + type: "function_call_output", + call_id: "call_mock_read_1", + output: JSON.stringify({ text: "QA mission: understand this OpenClaw repo." }), + }, + ], + }); + + expect(final.output[0]?.content?.[0]?.text).toBe("RESTART-INFLIGHT-MAYBE-OK"); + }); + + it("does not use stale exact replies from instructions after QA reads", async () => { + const server = await startMockServer(); + + const final = await expectResponsesJson<{ + output: Array<{ content?: Array<{ text?: string }> }>; + }>(server, { + stream: false, + instructions: "If this is a heartbeat check, reply exactly: HEARTBEAT_OK", + input: [ + makeUserInput("Read QA_KICKOFF_TASK.md, then summarize what you found."), + { + type: "function_call_output", + call_id: "call_mock_read_1", + output: JSON.stringify({ text: "QA mission: understand this OpenClaw repo." }), + }, + ], + }); + + const text = final.output[0]?.content?.[0]?.text ?? ""; + expect(text).toContain("Protocol note: I reviewed the requested material."); + expect(text).not.toContain("HEARTBEAT_OK"); + }); + it("requires deterministic tool-progress error prompts to observe a failed tool", async () => { const server = await startMockServer(); const prompt = diff --git a/extensions/qa-lab/src/providers/mock-openai/server.ts b/extensions/qa-lab/src/providers/mock-openai/server.ts index 27750b019afb..7ae7b0a86511 100644 --- a/extensions/qa-lab/src/providers/mock-openai/server.ts +++ b/extensions/qa-lab/src/providers/mock-openai/server.ts @@ -1003,8 +1003,8 @@ function buildAssistantText( : scenarioToolOutput; const orbitCode = extractOrbitCode(memorySnippet) ?? extractOrbitCode(allInputText); const mediaPath = /MEDIA:([^\n]+)/.exec(toolOutput)?.[1]?.trim(); - const exactReplyDirective = - extractExactReplyDirective(prompt) ?? extractExactReplyDirective(allInputText); + const promptExactReplyDirective = extractExactReplyDirective(prompt); + const exactReplyDirective = promptExactReplyDirective ?? extractExactReplyDirective(allInputText); const exactMarkerDirective = extractExactMarkerDirective(prompt) ?? extractExactMarkerDirective(allInputText); const finishExactlyDirective = @@ -1042,6 +1042,9 @@ function buildAssistantText( if (/\bmarker\b/i.test(allInputText) && exactMarkerDirective) { return exactMarkerDirective; } + if (promptExactReplyDirective) { + return promptExactReplyDirective; + } if (/visible skill marker/i.test(prompt)) { return "VISIBLE-SKILL-OK"; } diff --git a/extensions/qa-lab/src/scenario-catalog.test.ts b/extensions/qa-lab/src/scenario-catalog.test.ts index 7b9e1d3b905f..f61a8f391ae6 100644 --- a/extensions/qa-lab/src/scenario-catalog.test.ts +++ b/extensions/qa-lab/src/scenario-catalog.test.ts @@ -199,6 +199,10 @@ describe("qa scenario catalog", () => { ); expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow)) .toContain("EmbeddedAttemptSessionTakeoverError"); + expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow)) + .toContain("AbortError"); + expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow)) + .toContain("This operation was aborted"); expect(JSON.stringify(readQaScenarioById("gateway-restart-inflight-run").execution.flow)) .toContain("liveTurnTimeoutMs(env, 180000)"); expect(readQaScenarioExecutionConfig("long-context-progress-watchdog")).toMatchObject({ diff --git a/qa/scenarios/runtime/gateway-restart-inflight-run.md b/qa/scenarios/runtime/gateway-restart-inflight-run.md index 6bcc499885a2..f3edcc8a7927 100644 --- a/qa/scenarios/runtime/gateway-restart-inflight-run.md +++ b/qa/scenarios/runtime/gateway-restart-inflight-run.md @@ -98,7 +98,7 @@ steps: - expr: started.runId - expr: liveTurnTimeoutMs(env, 180000) - assert: - expr: "waited.status === 'ok' || waited.status === 'timeout' || (waited.status === 'error' && String(waited.error ?? '').includes('EmbeddedAttemptSessionTakeoverError'))" + expr: "waited.status === 'ok' || waited.status === 'timeout' || (waited.status === 'error' && (String(waited.error ?? '').includes('EmbeddedAttemptSessionTakeoverError') || String(waited.error ?? '').includes('AbortError') || String(waited.error ?? '').includes('This operation was aborted')))" message: expr: "`interrupted agent run ended with unexpected status: ${JSON.stringify(waited)}`" - set: interruptedMatches diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts index 16ad22206cbb..ba23e83c6c06 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts @@ -25,6 +25,7 @@ import { createDefaultEmbeddedSession, createContextEngineBootstrapAndAssemble, createContextEngineAttemptRunner, + createSubscriptionMock, expectCalledWithSessionKey, getHoisted, resetEmbeddedAttemptHarness, @@ -1514,6 +1515,102 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { expect(events.indexOf("bootstrap")).toBeLessThan(events.indexOf("lock")); }); + it("does not acquire the session write lock after aborting during prep", async () => { + const abortController = new AbortController(); + hoisted.resolveBootstrapContextForRunMock.mockImplementation(async () => { + abortController.abort(); + return { bootstrapFiles: [], contextFiles: [] }; + }); + + await expect( + createContextEngineAttemptRunner({ + contextEngine: createContextEngineBootstrapAndAssemble(), + sessionKey, + tempPaths, + attemptOverrides: { abortSignal: abortController.signal }, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + + expect(hoisted.acquireSessionWriteLockMock).not.toHaveBeenCalled(); + }); + + it("disposes prep runtimes after aborting before the session write lock", async () => { + const abortController = new AbortController(); + const lspDispose = vi.fn(async () => {}); + hoisted.createBundleLspToolRuntimeMock.mockImplementationOnce(async () => { + abortController.abort(); + return { + tools: [], + dispose: lspDispose, + }; + }); + + await expect( + createContextEngineAttemptRunner({ + contextEngine: createContextEngineBootstrapAndAssemble(), + sessionKey, + tempPaths, + attemptOverrides: { + abortSignal: abortController.signal, + disableTools: false, + toolsAllow: ["*"], + }, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + + expect(hoisted.acquireSessionWriteLockMock).not.toHaveBeenCalled(); + expect(hoisted.createBundleLspToolRuntimeMock).toHaveBeenCalledTimes(1); + expect(lspDispose).toHaveBeenCalledTimes(1); + }); + + it("stops session setup when aborting after the session write lock", async () => { + const abortController = new AbortController(); + const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); + hoisted.sessionManagerOpenMock.mockImplementationOnce(() => { + abortController.abort(); + return hoisted.sessionManager; + }); + + await expect( + createContextEngineAttemptRunner({ + contextEngine: createTestContextEngine({ bootstrap, assemble }), + sessionKey, + tempPaths, + attemptOverrides: { abortSignal: abortController.signal }, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + + expect(hoisted.acquireSessionWriteLockMock).toHaveBeenCalledTimes(1); + expect(bootstrap).not.toHaveBeenCalled(); + expect(assemble).not.toHaveBeenCalled(); + expect(hoisted.createAgentSessionMock).not.toHaveBeenCalled(); + }); + + it("does not submit a prompt after aborting a created session", async () => { + const abortController = new AbortController(); + const sessionPrompt = vi.fn(async () => {}); + hoisted.subscribeEmbeddedPiSessionMock.mockImplementationOnce(() => { + abortController.abort(); + return createSubscriptionMock(); + }); + + await expect( + createContextEngineAttemptRunner({ + contextEngine: createContextEngineBootstrapAndAssemble(), + sessionKey, + tempPaths, + createSession: () => + createDefaultEmbeddedSession({ + initialMessages: [seedMessage], + prompt: sessionPrompt, + }), + attemptOverrides: { abortSignal: abortController.signal }, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + + expect(sessionPrompt).not.toHaveBeenCalled(); + }); + it("forwards modelId to assemble", async () => { const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); const contextEngine = createTestContextEngine({ bootstrap, assemble }); diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index cc2038416ea1..ff60234e6cf2 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -68,6 +68,9 @@ type AttemptSpawnWorkspaceHoisted = { ensureGlobalUndiciStreamTimeoutsMock: UnknownMock; buildEmbeddedMessageActionDiscoveryInputMock: UnknownMock; createOpenClawCodingToolsMock: UnknownMock; + getOrCreateSessionMcpRuntimeMock: AsyncUnknownMock; + materializeBundleMcpToolsForRunMock: AsyncUnknownMock; + createBundleLspToolRuntimeMock: AsyncUnknownMock; subscribeEmbeddedPiSessionMock: Mock; acquireSessionWriteLockMock: Mock; installToolResultContextGuardMock: UnknownMock; @@ -138,6 +141,9 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { const ensureGlobalUndiciStreamTimeoutsMock = vi.fn(); const buildEmbeddedMessageActionDiscoveryInputMock = vi.fn((params: unknown) => params); const createOpenClawCodingToolsMock = vi.fn(() => []); + const getOrCreateSessionMcpRuntimeMock = vi.fn(async () => undefined); + const materializeBundleMcpToolsForRunMock = vi.fn(async () => undefined); + const createBundleLspToolRuntimeMock = vi.fn(async () => undefined); const installToolResultContextGuardMock = vi.fn(() => () => {}); const installContextEngineLoopHookMock = vi.fn(() => () => {}); const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {}); @@ -206,6 +212,9 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { ensureGlobalUndiciStreamTimeoutsMock, buildEmbeddedMessageActionDiscoveryInputMock, createOpenClawCodingToolsMock, + getOrCreateSessionMcpRuntimeMock, + materializeBundleMcpToolsForRunMock, + createBundleLspToolRuntimeMock, subscribeEmbeddedPiSessionMock, acquireSessionWriteLockMock, installToolResultContextGuardMock, @@ -566,13 +575,16 @@ vi.mock("../../pi-tools.js", () => ({ vi.mock("../../pi-bundle-mcp-tools.js", () => ({ createBundleMcpToolRuntime: async () => undefined, - getOrCreateSessionMcpRuntime: async () => undefined, - materializeBundleMcpToolsForRun: async () => undefined, + getOrCreateSessionMcpRuntime: (...args: unknown[]) => + hoisted.getOrCreateSessionMcpRuntimeMock(...args), + materializeBundleMcpToolsForRun: (...args: unknown[]) => + hoisted.materializeBundleMcpToolsForRunMock(...args), retireSessionMcpRuntime: async () => true, })); vi.mock("../../pi-bundle-lsp-runtime.js", () => ({ - createBundleLspToolRuntime: async () => undefined, + createBundleLspToolRuntime: (...args: unknown[]) => + hoisted.createBundleLspToolRuntimeMock(...args), })); vi.mock("../../../image-generation/runtime.js", () => ({ @@ -930,6 +942,9 @@ export function resetEmbeddedAttemptHarness( }, ]; }); + hoisted.getOrCreateSessionMcpRuntimeMock.mockReset().mockResolvedValue(undefined); + hoisted.materializeBundleMcpToolsForRunMock.mockReset().mockResolvedValue(undefined); + hoisted.createBundleLspToolRuntimeMock.mockReset().mockResolvedValue(undefined); hoisted.subscribeEmbeddedPiSessionMock .mockReset() .mockImplementation(() => createSubscriptionMock()); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index bd44890c6b9e..5b70a28df78f 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1207,6 +1207,21 @@ function collectAttemptExplicitToolAllowlistSources(params: { ]); } +function createAttemptAbortError(signal: AbortSignal): Error { + if (signal.reason instanceof Error) { + return signal.reason; + } + const error = new Error(typeof signal.reason === "string" ? signal.reason : "agent run aborted"); + error.name = "AbortError"; + return error; +} + +function throwIfAttemptAbortSignalFired(signal: AbortSignal | undefined): void { + if (signal?.aborted === true) { + throw createAttemptAbortError(signal); + } +} + export async function runEmbeddedAttempt( params: EmbeddedRunAttemptParams, ): Promise { @@ -1218,6 +1233,7 @@ export async function runEmbeddedAttempt( `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`, ); const prepStages = createEmbeddedRunStageTracker(); + throwIfAttemptAbortSignalFired(params.abortSignal); const emitPrepStageSummary = (phase: string) => { const summary = prepStages.snapshot(); const shouldWarn = shouldWarnEmbeddedRunStageSummary(summary); @@ -1304,6 +1320,108 @@ export async function runEmbeddedAttempt( let beforeAgentRunBlockedBy: string | undefined; // Releases the eager session lock if post-prompt code exits before cleanup. let releaseRetainedSessionLock: (() => Promise) | undefined; + let bundleMcpRuntime: Awaited> | undefined; + let bundleLspRuntime: Awaited> | undefined; + let toolSearchCatalogRef: ToolSearchCatalogRef | undefined; + let toolSearchCatalogApplied = false; + let sessionCleanupOwnsEmbeddedResources = false; + let abortActiveSessionForExternalSignal: (() => Promise) | undefined; + let abortRunForExternalSignal: ((isTimeout?: boolean, reason?: unknown) => void) | undefined; + let isCompactionPendingForExternalSignal: (() => boolean) | undefined; + let isCompactionInFlightForExternalSignal: (() => boolean) | undefined; + let removeExternalAbortSignalListener: (() => void) | undefined; + const getAbortReason = (signal: AbortSignal): unknown => + "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; + const makeTimeoutAbortReason = (): Error => { + const err = new Error("request timed out"); + err.name = "TimeoutError"; + return err; + }; + const cleanupEmbeddedPrepResourcesAfterEarlyExit = async () => { + if (toolSearchCatalogApplied) { + clearToolSearchCatalog({ + sessionId: params.sessionId, + sessionKey: sandboxSessionKey, + agentId: sessionAgentId, + runId: params.runId, + catalogRef: toolSearchCatalogRef, + }); + toolSearchCatalogApplied = false; + } + try { + await bundleMcpRuntime?.dispose(); + } catch { + /* best-effort */ + } finally { + bundleMcpRuntime = undefined; + } + try { + await bundleLspRuntime?.dispose(); + } catch { + /* best-effort */ + } finally { + bundleLspRuntime = undefined; + } + }; + const onExternalAbortSignal = () => { + const signal = params.abortSignal; + if (!signal) { + return; + } + externalAbort = true; + const reason = getAbortReason(signal); + const timeout = reason ? isTimeoutError(reason) : false; + if ( + shouldFlagCompactionTimeout({ + isTimeout: timeout, + isCompactionPendingOrRetrying: isCompactionPendingForExternalSignal?.() ?? false, + isCompactionInFlight: isCompactionInFlightForExternalSignal?.() ?? false, + }) + ) { + timedOutDuringCompaction = true; + } + if (abortRunForExternalSignal) { + abortRunForExternalSignal(timeout, reason); + return; + } + aborted = true; + if (timeout) { + timedOut = true; + if (!timedOutDuringCompaction && countActiveToolExecutions(params.runId) > 0) { + timedOutDuringToolExecution = true; + } + } + promptError = createAttemptAbortError(signal); + if (!runAbortController.signal.aborted) { + runAbortController.abort(timeout ? (reason ?? makeTimeoutAbortReason()) : reason); + } + void abortActiveSessionForExternalSignal?.(); + }; + const armExternalAbortSignal = () => { + const signal = params.abortSignal; + if (!signal || removeExternalAbortSignalListener) { + return; + } + if (signal.aborted) { + onExternalAbortSignal(); + return; + } + signal.addEventListener("abort", onExternalAbortSignal, { once: true }); + removeExternalAbortSignalListener = () => { + signal.removeEventListener("abort", onExternalAbortSignal); + removeExternalAbortSignalListener = undefined; + }; + }; + const throwIfAttemptAbortSignalFiredAfterPrepCleanup = async () => { + if (params.abortSignal?.aborted === true) { + const abortError = createAttemptAbortError(params.abortSignal); + aborted = true; + externalAbort = true; + promptError = abortError; + await cleanupEmbeddedPrepResourcesAfterEarlyExit(); + throw abortError; + } + }; try { const skillsSnapshotForRun = sandbox?.enabled && sandbox.workspaceAccess !== "rw" ? undefined : params.skillsSnapshot; @@ -1431,7 +1549,7 @@ export async function runEmbeddedAttempt( toolSearchControlsEnabledForRun || codeModeControlsEnabledForRun; let toolSearchCatalogExecutor: ToolSearchCatalogToolExecutor | undefined; - const toolSearchCatalogRef: ToolSearchCatalogRef | undefined = + toolSearchCatalogRef = toolSearchControlsEnabledForRun || codeModeControlsEnabledForRun ? createToolSearchCatalogRef() : undefined; @@ -1713,7 +1831,7 @@ export async function runEmbeddedAttempt( cfg: params.config, }) : undefined; - const bundleMcpRuntime = bundleMcpSessionRuntime + bundleMcpRuntime = bundleMcpSessionRuntime ? await materializeBundleMcpToolsForRun({ runtime: bundleMcpSessionRuntime, reservedToolNames: [ @@ -1727,7 +1845,7 @@ export async function runEmbeddedAttempt( disableTools: params.disableTools || isRawModelRun, toolsAllow: params.toolsAllow, }); - const bundleLspRuntime = bundleLspEnabled + bundleLspRuntime = bundleLspEnabled ? await createBundleLspToolRuntime({ workspaceDir: effectiveWorkspace, cfg: params.config, @@ -1835,6 +1953,7 @@ export async function runEmbeddedAttempt( catalogRef: toolSearchCatalogRef, toolHookContext: catalogToolHookContext, }); + toolSearchCatalogApplied = true; effectiveTools = filterLocalModelLeanTools({ tools: toolSearch.tools, config: params.config, @@ -2135,6 +2254,7 @@ export async function runEmbeddedAttempt( config: params.config, compactionTimeoutMs, }); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); const sessionLockController = await createEmbeddedAttemptSessionLockController({ acquireSessionWriteLock, lockOptions: { @@ -2143,6 +2263,8 @@ export async function runEmbeddedAttempt( }, }); releaseRetainedSessionLock = () => sessionLockController.dispose(); + armExternalAbortSignal(); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; @@ -2150,16 +2272,19 @@ export async function runEmbeddedAttempt( let trajectoryRecorder: ReturnType | null = null; let trajectoryEndRecorded = false; let buildAbortSettlePromise: () => Promise | null = () => null; + sessionCleanupOwnsEmbeddedResources = true; try { await repairSessionFileIfNeeded({ sessionFile: params.sessionFile, debug: (message) => log.debug(message), warn: (message) => log.warn(message), }); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); const hadSessionFile = await fs .stat(params.sessionFile) .then(() => true) .catch(() => false); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); const transcriptPolicy = resolveAttemptTranscriptPolicy({ runtimePlan: params.runtimePlan, @@ -2171,6 +2296,7 @@ export async function runEmbeddedAttempt( }); await prewarmSessionFile(params.sessionFile); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), { agentId: sessionAgentId, sessionKey: params.sessionKey, @@ -2200,6 +2326,7 @@ export async function runEmbeddedAttempt( }, }); trackSessionManagerAccess(params.sessionFile); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); await runAttemptContextEngineBootstrap({ hadSessionFile, @@ -2230,6 +2357,7 @@ export async function runEmbeddedAttempt( }), warn: (message) => log.warn(message), }); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); await prepareSessionManagerForRun({ sessionManager, @@ -2238,6 +2366,7 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, cwd: effectiveWorkspace, }); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); const settingsManager = createPreparedEmbeddedPiSettingsManager({ cwd: effectiveWorkspace, @@ -2275,6 +2404,7 @@ export async function runEmbeddedAttempt( extensionFactories, }); await resourceLoader.reload(); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); // DefaultResourceLoader.reload() rehydrates settings from disk and can drop OpenClaw // compaction overrides applied in createPreparedEmbeddedPiSettingsManager — same // rehydration also restores Pi's auto-compaction (openclaw#75799), so re-apply @@ -2448,10 +2578,11 @@ export async function runEmbeddedAttempt( }, }); session = createdSession.session; - applySystemPromptOverrideToSession(session, systemPromptText); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); if (!session) { throw new Error("Embedded agent session missing"); } + applySystemPromptOverrideToSession(session, systemPromptText); session.setActiveToolsByName(sessionToolAllowlist); const activeSession = session; installSessionEventWriteLock({ @@ -2509,6 +2640,10 @@ export async function runEmbeddedAttempt( trackSettlePromise(inFlightAbortSettlePromises, promise); const abortActiveSession = (): Promise => trackAbortSettlePromise(Promise.resolve(activeSession.abort())); + abortActiveSessionForExternalSignal = abortActiveSession; + if (runAbortController.signal.aborted) { + void abortActiveSession(); + } buildAbortSettlePromise = (): Promise | null => { const promises = [...inFlightPromptSettlePromises, ...inFlightAbortSettlePromises]; if (promises.length === 0) { @@ -3230,13 +3365,6 @@ export async function runEmbeddedAttempt( } let yieldAborted = false; - const getAbortReason = (signal: AbortSignal): unknown => - "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; - const makeTimeoutAbortReason = (): Error => { - const err = new Error("request timed out"); - err.name = "TimeoutError"; - return err; - }; const abortCompaction = () => { if (!activeSession.isCompacting) { return; @@ -3267,6 +3395,7 @@ export async function runEmbeddedAttempt( abortCompaction(); void abortActiveSession(); }; + abortRunForExternalSignal = abortRun; idleTimeoutTrigger = (error) => { idleTimedOut = true; abortRun(true, error); @@ -3285,9 +3414,12 @@ export async function runEmbeddedAttempt( prompt: string, options?: Parameters[1], ): Promise => - withOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, async () => - abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))), - ); + withOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, async () => { + if (runAbortController.signal.aborted) { + throw createAttemptAbortError(runAbortController.signal); + } + return await abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))); + }); const onBlockReply = params.onBlockReply ? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReply) : undefined; @@ -3373,6 +3505,8 @@ export async function runEmbeddedAttempt( getCompactionCount, getLastCompactionTokensAfter, } = subscription; + isCompactionPendingForExternalSignal = () => subscription.isCompacting(); + isCompactionInFlightForExternalSignal = () => activeSession.isCompacting; toolSearchCatalogExecutor = async (toolParams) => { try { const result = await runToolLifecycle({ @@ -3512,29 +3646,9 @@ export async function runEmbeddedAttempt( let messagesSnapshot: AgentMessage[] = []; let sessionIdUsed = activeSession.sessionId; let sessionFileUsed: string | undefined = params.sessionFile; - const onAbort = () => { - externalAbort = true; - const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; - const timeout = reason ? isTimeoutError(reason) : false; - if ( - shouldFlagCompactionTimeout({ - isTimeout: timeout, - isCompactionPendingOrRetrying: subscription.isCompacting(), - isCompactionInFlight: activeSession.isCompacting, - }) - ) { - timedOutDuringCompaction = true; - } - abortRun(timeout, reason); - }; - if (params.abortSignal) { - if (params.abortSignal.aborted) { - onAbort(); - } else { - params.abortSignal.addEventListener("abort", onAbort, { - once: true, - }); - } + if (params.abortSignal?.aborted === true) { + onExternalAbortSignal(); + await throwIfAttemptAbortSignalFiredAfterPrepCleanup(); } // Hook runner was already obtained earlier before tool creation @@ -4658,7 +4772,6 @@ export async function runEmbeddedAttempt( params.replyOperation.detachBackend(queueHandle); } clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); - params.abortSignal?.removeEventListener?.("abort", onAbort); } const toolMetasNormalized = toolMetas @@ -5073,6 +5186,16 @@ export async function runEmbeddedAttempt( } } } finally { + removeExternalAbortSignalListener?.(); + if (!sessionCleanupOwnsEmbeddedResources) { + try { + await cleanupEmbeddedPrepResourcesAfterEarlyExit(); + } catch (cleanupErr) { + log.warn( + `failed to clean up embedded prep resources after early attempt exit: runId=${params.runId} ${String(cleanupErr)}`, + ); + } + } try { await releaseRetainedSessionLock?.(); } catch (releaseErr) { diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index a04ff3bbedfe..6d8e433f2012 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -450,7 +450,7 @@ describe("createGatewayCloseHandler", () => { ).toBe(true); }); - it("aborts active chat runs when restart reply drain times out", async () => { + it("aborts active runs when restart reply drain times out", async () => { vi.useFakeTimers(); const controller = new AbortController(); const agentController = new AbortController(); @@ -509,9 +509,9 @@ describe("createGatewayCloseHandler", () => { expect(result.warnings).toContain("restart-reply-drain"); expect(controller.signal.aborted).toBe(true); - expect(agentController.signal.aborted).toBe(false); + expect(agentController.signal.aborted).toBe(true); expect(chatAbortControllers.has("run-1")).toBe(false); - expect(chatAbortControllers.has("agent-run-1")).toBe(true); + expect(chatAbortControllers.has("agent-run-1")).toBe(false); expect(chatRunState.buffers.has("run-1")).toBe(false); expect(chatRunState.deltaSentAt.has("run-1")).toBe(false); expect(chatRunState.deltaLastBroadcastLen.has("run-1")).toBe(false); @@ -521,13 +521,13 @@ describe("createGatewayCloseHandler", () => { expect( mocks.logWarn.mock.calls.some(([message]) => String(message).includes( - "restart reply drain timed out after 100ms with 1 active chat run(s) still active", + "restart reply drain timed out after 100ms with 2 active run(s) still active", ), ), ).toBe(true); expect( mocks.logWarn.mock.calls.some(([message]) => - String(message).includes("aborted 1 active chat run(s) during restart shutdown"), + String(message).includes("aborted 2 active run(s) during restart shutdown"), ), ).toBe(true); expect(broadcast).toHaveBeenCalledWith( @@ -539,9 +539,26 @@ describe("createGatewayCloseHandler", () => { "chat", expect.objectContaining({ runId: "run-1", state: "aborted", stopReason: "restart" }), ); + expect(broadcast).toHaveBeenCalledWith( + "chat", + expect.objectContaining({ + runId: "agent-run-1", + state: "aborted", + stopReason: "restart", + }), + ); + expect(nodeSendToSession).toHaveBeenCalledWith( + "session-1", + "chat", + expect.objectContaining({ + runId: "agent-run-1", + state: "aborted", + stopReason: "restart", + }), + ); }); - it("does not drain or abort active chat runs for normal shutdown", async () => { + it("does not drain or abort active runs for normal shutdown", async () => { const controller = new AbortController(); const chatAbortControllers = new Map([ [ @@ -568,7 +585,7 @@ describe("createGatewayCloseHandler", () => { expect(chatAbortControllers.size).toBe(1); }); - it("aborts active chat runs immediately when restart drain budget is exhausted", async () => { + it("aborts active runs immediately when restart drain budget is exhausted", async () => { const controller = new AbortController(); const chatAbortControllers = new Map([ [ diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 24c1ef8e2390..5f12186c63ce 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -91,32 +91,32 @@ function getRestartReplyDrainCounts(params: { chatAbortControllers: Map; }) { const pendingReplyCount = params.getPendingReplyCount(); - const activeChatRuns = listRestartDrainChatRuns(params.chatAbortControllers).length; + const activeRuns = listRestartDrainRuns(params.chatAbortControllers).length; return { pendingReplies: Number.isFinite(pendingReplyCount) && pendingReplyCount > 0 ? Math.floor(pendingReplyCount) : 0, - activeChatRuns, + activeRuns, }; } -function listRestartDrainChatRuns( +function listRestartDrainRuns( chatAbortControllers: Map, ): Array<[string, ChatAbortControllerEntry]> { - return Array.from(chatAbortControllers.entries()).filter(([, entry]) => entry.kind !== "agent"); + return Array.from(chatAbortControllers.entries()); } function formatRestartReplyDrainDetails(counts: { pendingReplies: number; - activeChatRuns: number; + activeRuns: number; }): string { const details: string[] = []; if (counts.pendingReplies > 0) { details.push(`${counts.pendingReplies} pending reply(ies)`); } - if (counts.activeChatRuns > 0) { - details.push(`${counts.activeChatRuns} active chat run(s)`); + if (counts.activeRuns > 0) { + details.push(`${counts.activeRuns} active run(s)`); } return details.length > 0 ? details.join(", ") : "no pending reply work"; } @@ -136,12 +136,12 @@ async function waitForRestartReplyDrain(params: { }): Promise<{ drained: boolean; elapsedMs: number; - counts: { pendingReplies: number; activeChatRuns: number }; + counts: { pendingReplies: number; activeRuns: number }; }> { const timeoutMs = Math.max(0, Math.floor(params.timeoutMs)); const pollMs = Math.max(25, Math.floor(params.pollMs ?? RESTART_REPLY_DRAIN_POLL_MS)); let counts = getRestartReplyDrainCounts(params); - if (counts.pendingReplies <= 0 && counts.activeChatRuns <= 0) { + if (counts.pendingReplies <= 0 && counts.activeRuns <= 0) { return { drained: true, elapsedMs: 0, counts }; } if (timeoutMs <= 0) { @@ -156,13 +156,13 @@ async function waitForRestartReplyDrain(params: { } await sleepForRestartReplyDrain(Math.min(pollMs, timeoutMs - elapsedMs)); counts = getRestartReplyDrainCounts(params); - if (counts.pendingReplies <= 0 && counts.activeChatRuns <= 0) { + if (counts.pendingReplies <= 0 && counts.activeRuns <= 0) { return { drained: true, elapsedMs: Date.now() - startedAt, counts }; } } } -function abortActiveChatRunsForRestart(params: { +function abortActiveRunsForRestart(params: { chatAbortControllers: Map; chatRunState: ChatRunState; removeChatRun: ( @@ -175,7 +175,7 @@ function abortActiveChatRunsForRestart(params: { nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void; }): number { let aborted = 0; - for (const [runId, entry] of listRestartDrainChatRuns(params.chatAbortControllers)) { + for (const [runId, entry] of listRestartDrainRuns(params.chatAbortControllers)) { const result = abortChatRunById( { chatAbortControllers: params.chatAbortControllers, @@ -220,7 +220,7 @@ async function drainRestartPendingRepliesForShutdown(params: { warnings: string[]; }): Promise { const initialCounts = getRestartReplyDrainCounts(params); - if (initialCounts.pendingReplies <= 0 && initialCounts.activeChatRuns <= 0) { + if (initialCounts.pendingReplies <= 0 && initialCounts.activeRuns <= 0) { return; } @@ -246,16 +246,16 @@ async function drainRestartPendingRepliesForShutdown(params: { ); recordShutdownWarning(params.warnings, "restart-reply-drain"); - if (drainResult.counts.activeChatRuns <= 0) { + if (drainResult.counts.activeRuns <= 0) { return; } - const abortedRuns = abortActiveChatRunsForRestart(params); + const abortedRuns = abortActiveRunsForRestart(params); if (abortedRuns <= 0) { return; } - shutdownLog.warn(`aborted ${abortedRuns} active chat run(s) during restart shutdown`); + shutdownLog.warn(`aborted ${abortedRuns} active run(s) during restart shutdown`); const postAbortDrain = await waitForRestartReplyDrain({ getPendingReplyCount: params.getPendingReplyCount, chatAbortControllers: params.chatAbortControllers,