diff --git a/docs/concepts/progress-drafts.md b/docs/concepts/progress-drafts.md index cdcf4f42c642..24b755b7b620 100644 --- a/docs/concepts/progress-drafts.md +++ b/docs/concepts/progress-drafts.md @@ -178,6 +178,50 @@ Progress lines are enabled by default in progress mode. They come from real run events: tool starts, item updates, task plans, approvals, command output, patch summaries, and similar agent activity. +Tools can also emit typed progress while a single tool call is still running. +That is how a slow fetch or search can update the visible draft before the tool +returns its final result. The progress update is a partial tool result with +empty model content and explicit public channel metadata: + +```json +{ + "content": [], + "progress": { + "text": "Fetching page content...", + "visibility": "channel", + "privacy": "public", + "id": "web_fetch:fetching" + } +} +``` + +OpenClaw renders only the `progress.text` in the channel progress UI. The +normal tool result still arrives later as `content` and `details`, and is the +only part returned to the model. + +When adding progress to a tool, use a short, generic message and delay it until +the operation has been pending long enough to be useful: + +```typescript +const clearProgressTimer = scheduleToolProgress( + onUpdate, + { text: "Fetching page content...", id: "web_fetch:fetching" }, + 5_000, + { signal }, +); + +try { + return await runToolWork(); +} finally { + clearProgressTimer(); +} +``` + +This pattern means fast calls do not show a progress line, long calls show one +while they are still pending, and canceled calls clear the timer before stale +progress can appear. Progress text is a public UI side channel, so it must not +include secrets, raw arguments, fetched content, command output, or page text. + OpenClaw uses the same formatter for progress drafts and `/verbose`: ```json5 diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index 9775179dd68f..189e126b337b 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -197,6 +197,12 @@ Matrix: Preview streaming can also include **tool-progress** updates - short status lines like "searching the web", "reading file", or "calling tool" - that appear in the same preview message while tools are running, ahead of the final reply. In Codex app-server mode, Codex preamble/commentary messages use this same preview path, so short "I am checking..." progress notes can stream into the editable draft without becoming part of the final answer. This keeps multi-step tool turns visually alive rather than silent between the first thinking preview and the final answer. +Long-running tools may emit typed progress before they return. For example, +`web_fetch` arms a five-second timer when it starts: if the fetch is still +pending, the preview can show `Fetching page content...`; if the fetch finishes +or is canceled before then, no progress line is emitted. The later final tool +result is still delivered normally to the model. + Supported surfaces: - **Discord**, **Slack**, **Telegram**, and **Matrix** stream tool-progress and Codex preamble updates into the live preview edit by default when preview streaming is active. Microsoft Teams uses its native progress stream in personal chats. diff --git a/docs/tools/web-fetch.md b/docs/tools/web-fetch.md index ee15c4067f69..39fcc032e2ab 100644 --- a/docs/tools/web-fetch.md +++ b/docs/tools/web-fetch.md @@ -57,6 +57,21 @@ Truncate output to this many characters. +## Progress updates + +`web_fetch` emits a public progress line only when the fetch is still pending +after five seconds: + +```text +Fetching page content... +``` + +Fast cache hits and quick network responses finish before the timer fires, so +they do not show a progress line. If the call is canceled, the timer is cleared. +When the fetch eventually completes, the agent receives the normal tool result; +the progress line is only channel UI state and never contains fetched page +content. + ## Config ```json5 diff --git a/packages/agent-core/src/types.ts b/packages/agent-core/src/types.ts index 23d6d0aa5474..4cc7391c521a 100644 --- a/packages/agent-core/src/types.ts +++ b/packages/agent-core/src/types.ts @@ -345,12 +345,26 @@ export interface AgentState { readonly errorMessage?: string; } +/** Channel-safe progress text emitted by a running tool. */ +export interface AgentToolProgress { + /** Public text suitable for user-facing progress surfaces. */ + text: string; + /** Tool progress is rendered by channel progress UIs. */ + visibility: "channel"; + /** Progress text must not contain secrets, private args, or fetched content. */ + privacy: "public"; + /** Optional stable id for progress line replacement. */ + id?: string; +} + /** Final or partial result produced by a tool. */ export interface AgentToolResult { /** Text or image content returned to the model. */ content: (TextContent | ImageContent)[]; /** Arbitrary structured details for logs or UI rendering. */ details: T; + /** Optional public progress hint for partial tool updates; never model content. */ + progress?: AgentToolProgress; /** * Hint that the agent should stop after the current tool batch. * Early termination only happens when every finalized tool result in the batch sets this to true. diff --git a/src/agents/embedded-agent-subscribe.handlers.tools.test.ts b/src/agents/embedded-agent-subscribe.handlers.tools.test.ts index 0540331c2f00..92e6419453c6 100644 --- a/src/agents/embedded-agent-subscribe.handlers.tools.test.ts +++ b/src/agents/embedded-agent-subscribe.handlers.tools.test.ts @@ -1087,6 +1087,138 @@ describe("handleToolExecutionEnd exec approval prompts", () => { }); describe("handleToolExecutionEnd derived tool events", () => { + it("surfaces typed public tool progress for any non-exec tool", () => { + resetAgentEventsForTest(); + const events: Array<{ stream?: string; data?: Record }> = []; + registerAgentEventListener((evt) => { + events.push(evt as never); + }); + const { ctx, onAgentEvent } = createTestContext(); + + handleToolExecutionUpdate( + ctx as never, + { + type: "tool_execution_update", + toolName: "custom_fetcher", + toolCallId: "tool-custom-progress", + partialResult: { + content: [], + details: undefined, + progress: { + text: "Loading remote resource...", + visibility: "channel", + privacy: "public", + }, + }, + } as never, + ); + + expect( + events.filter( + (event) => + event.stream === "tool" && + (event.data as { phase?: string } | undefined)?.phase === "update", + ), + ).toHaveLength(0); + const itemEvent = requireRecord( + onAgentEvent.mock.calls + .map((call) => call[0]) + .find((event) => (event as { stream?: string })?.stream === "item"), + "progress item event", + ); + expectRecordFields(itemEvent.data, "progress item event data", { + itemId: "tool:tool-custom-progress", + phase: "update", + kind: "tool", + name: "custom_fetcher", + progressText: "Loading remote resource...", + status: "running", + }); + expect(requireRecord(itemEvent.data, "progress item event data").meta).toBeUndefined(); + + resetAgentEventsForTest(); + }); + + it("does not promote untyped non-exec content into channel progress", () => { + resetAgentEventsForTest(); + const events: Array<{ stream?: string; data?: Record }> = []; + registerAgentEventListener((evt) => { + events.push(evt as never); + }); + const { ctx, onAgentEvent } = createTestContext(); + + handleToolExecutionUpdate( + ctx as never, + { + type: "tool_execution_update", + toolName: "web_fetch", + toolCallId: "tool-web-fetch-untyped", + partialResult: { + content: [{ type: "text", text: "Fetching page content..." }], + details: undefined, + }, + } as never, + ); + + expect( + events.filter( + (event) => + event.stream === "tool" && + (event.data as { phase?: string } | undefined)?.phase === "update", + ), + ).toHaveLength(1); + const itemEvent = requireRecord( + onAgentEvent.mock.calls + .map((call) => call[0]) + .find((event) => (event as { stream?: string })?.stream === "item"), + "tool item event", + ); + expect(requireRecord(itemEvent.data, "tool item event data").progressText).toBeUndefined(); + expect( + onAgentEvent.mock.calls + .map((call) => call[0]) + .filter((event) => (event as { stream?: string })?.stream === "tool"), + ).toHaveLength(1); + + resetAgentEventsForTest(); + }); + + it("caps typed public tool progress before channel item events", () => { + const { ctx, onAgentEvent } = createTestContext(); + const largeProgress = "x".repeat(9000); + + handleToolExecutionUpdate( + ctx as never, + { + type: "tool_execution_update", + toolName: "custom_fetcher", + toolCallId: "tool-large-progress", + partialResult: { + content: [], + details: undefined, + progress: { + text: largeProgress, + visibility: "channel", + privacy: "public", + }, + }, + } as never, + ); + + const itemEvent = requireRecord( + onAgentEvent.mock.calls + .map((call) => call[0]) + .find((event) => (event as { stream?: string })?.stream === "item"), + "large progress item event", + ); + const progressText = requireString( + requireRecord(itemEvent.data, "large progress item event data").progressText, + "progress text", + ); + expect(progressText).toContain("...(live output truncated)..."); + expect(progressText.length).toBeLessThan(largeProgress.length); + }); + it("emits command output deltas for exec update results", async () => { const { ctx, onAgentEvent } = createTestContext(); diff --git a/src/agents/embedded-agent-subscribe.handlers.tools.ts b/src/agents/embedded-agent-subscribe.handlers.tools.ts index dce939c47e08..7b5116a04c15 100644 --- a/src/agents/embedded-agent-subscribe.handlers.tools.ts +++ b/src/agents/embedded-agent-subscribe.handlers.tools.ts @@ -26,6 +26,7 @@ import { import { normalizeOptionalLowercaseString, readStringValue } from "../shared/string-coerce.js"; import { truncateUtf16Safe } from "../utils.js"; import { normalizeAcceptedSessionSpawnResult } from "./accepted-session-spawn.js"; +import { REQUIRED_PARAM_GROUPS, type RequiredParamGroup } from "./agent-tools.params.js"; import type { ApplyPatchSummary } from "./apply-patch.js"; import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import { sanitizeForConsole } from "./console-sanitize.js"; @@ -53,7 +54,6 @@ import { import { inferToolMetaFromArgs } from "./embedded-agent-utils.js"; import { parseExecApprovalResultText } from "./exec-approval-result.js"; import type { AgentEvent } from "./runtime/index.js"; -import { REQUIRED_PARAM_GROUPS, type RequiredParamGroup } from "./agent-tools.params.js"; import { buildToolMutationState, isSameToolMutationAction } from "./tool-mutation.js"; import { normalizeToolName } from "./tool-policy.js"; @@ -61,6 +61,9 @@ type ExecApprovalReplyModule = typeof import("../infra/exec-approval-reply.js"); type HookRunnerGlobalModule = typeof import("../plugins/hook-runner-global.js"); type MediaParseModule = typeof import("../media/parse.js"); type BeforeToolCallModule = typeof import("./agent-tools.before-tool-call.js"); +type ChannelToolProgress = { + text: string; +}; const execApprovalReplyModuleLoader = createLazyImportLoader( () => import("../infra/exec-approval-reply.js"), @@ -346,6 +349,20 @@ function extractLiveExecOutput(result: unknown): string | undefined { return typeof output === "string" ? truncateLiveExecOutput(output) : undefined; } +function readChannelToolProgress(result: unknown): ChannelToolProgress | undefined { + const progress = readRecordField(asOptionalObjectRecord(result)?.progress); + // Only an explicit typed progress field crosses into channel UI. Tool output + // and details may contain fetched content or private args, so never infer. + if (progress?.visibility !== "channel" || progress.privacy !== "public") { + return undefined; + } + const text = readStringValue(progress.text)?.trim(); + if (!text) { + return undefined; + } + return { text: truncateLiveExecOutput(text) }; +} + function shouldEmitLiveExecUpdate(ctx: ToolHandlerContext, toolCallId: string): boolean { const now = Date.now(); const state = ctx.state.execLiveUpdateStateById ?? new Map(); @@ -1062,7 +1079,11 @@ export function handleToolExecutionUpdate( const sanitized = sanitizeToolResult(partial); const isExecTool = isExecToolName(toolName); const liveResult = isExecTool ? capLiveExecResult(sanitized) : sanitized; - const emitDetailedLiveUpdate = !isExecTool || shouldEmitLiveExecUpdate(ctx, toolCallId); + const toolProgress = isExecTool ? undefined : readChannelToolProgress(liveResult); + // Typed progress already has a sanitized item update path. Suppress the raw + // partial-result event for those updates to avoid duplicate preview lines. + const emitDetailedLiveUpdate = + !toolProgress && (!isExecTool || shouldEmitLiveExecUpdate(ctx, toolCallId)); if (emitDetailedLiveUpdate) { emitAgentEvent({ runId: ctx.params.runId, @@ -1082,18 +1103,22 @@ export function handleToolExecutionUpdate( title: buildToolItemTitle(toolName, ctx.state.toolMetaById.get(toolCallId)?.meta), status: "running", name: toolName, - meta: ctx.state.toolMetaById.get(toolCallId)?.meta, toolCallId, + ...(toolProgress + ? { progressText: toolProgress.text } + : { meta: ctx.state.toolMetaById.get(toolCallId)?.meta }), }; emitTrackedItemEvent(ctx, itemData); - void ctx.params.onAgentEvent?.({ - stream: "tool", - data: { - phase: "update", - name: toolName, - toolCallId, - }, - }); + if (!toolProgress) { + void ctx.params.onAgentEvent?.({ + stream: "tool", + data: { + phase: "update", + name: toolName, + toolCallId, + }, + }); + } if (isExecTool) { const output = extractLiveExecOutput(liveResult); const commandData: AgentItemEventData = { diff --git a/src/agents/tools/common.ts b/src/agents/tools/common.ts index 3b8d46e8d6ac..db5980b631ec 100644 --- a/src/agents/tools/common.ts +++ b/src/agents/tools/common.ts @@ -9,7 +9,12 @@ import { } from "../../shared/number-coercion.js"; import { normalizeStringEntries } from "../../shared/string-normalization.js"; import type { ImageSanitizationLimits } from "../image-sanitization.js"; -import type { AgentTool, AgentToolResult, AgentToolUpdateCallback } from "../runtime/index.js"; +import type { + AgentTool, + AgentToolProgress, + AgentToolResult, + AgentToolUpdateCallback, +} from "../runtime/index.js"; import { sanitizeToolResultImages } from "../tool-images.js"; export type AgentToolWithMeta = AgentTool< @@ -395,6 +400,67 @@ export function jsonResult(payload: unknown): AgentToolResult { return textResult(JSON.stringify(payload, null, 2), payload); } +export type PublicToolProgress = Pick; + +export function toolProgressResult(progress: PublicToolProgress): AgentToolResult { + return { + content: [], + details: undefined, + progress: { + text: progress.text, + visibility: "channel", + privacy: "public", + ...(progress.id ? { id: progress.id } : {}), + }, + }; +} + +// Tool progress is a UI side channel. The model-facing tool result remains in +// `content`; progress text must already be safe to show in channel previews. +export function emitToolProgress( + onUpdate: AgentToolUpdateCallback | undefined, + progress: PublicToolProgress, +): void { + const text = progress.text.trim(); + if (!onUpdate || !text) { + return; + } + try { + onUpdate(toolProgressResult({ ...progress, text })); + } catch { + // Progress is best-effort UI state; tool execution must not depend on subscribers. + } +} + +// Long-running tools can arm delayed progress and cancel it on completion or +// abort. This avoids stale "still working" lines after a fast or canceled call. +export function scheduleToolProgress( + onUpdate: AgentToolUpdateCallback | undefined, + progress: PublicToolProgress, + delayMs: number, + options: { signal?: AbortSignal } = {}, +): () => void { + if (!onUpdate || options.signal?.aborted) { + return () => {}; + } + let cleared = false; + let timer: ReturnType; + const clear = () => { + if (cleared) { + return; + } + cleared = true; + clearTimeout(timer); + options.signal?.removeEventListener("abort", clear); + }; + timer = setTimeout(() => { + clear(); + emitToolProgress(onUpdate, progress); + }, delayMs); + options.signal?.addEventListener("abort", clear, { once: true }); + return clear; +} + export async function imageResult(params: { label: string; path: string; diff --git a/src/agents/tools/web-fetch.ts b/src/agents/tools/web-fetch.ts index bc7f8011c36f..55b1c48c1fbd 100644 --- a/src/agents/tools/web-fetch.ts +++ b/src/agents/tools/web-fetch.ts @@ -15,7 +15,12 @@ import { extractReadableContent } from "../../web-fetch/content-extractors.runti import { resolveWebProviderConfig } from "../../web/provider-runtime-shared.js"; import { stringEnum } from "../schema/string-enum.js"; import type { AnyAgentTool } from "./common.js"; -import { jsonResult, readPositiveIntegerParam, readStringParam } from "./common.js"; +import { + jsonResult, + readPositiveIntegerParam, + readStringParam, + scheduleToolProgress, +} from "./common.js"; import { extractBasicHtmlContent, htmlToMarkdown, @@ -43,6 +48,8 @@ const DEFAULT_FETCH_MAX_RESPONSE_BYTES = 750_000; const FETCH_MAX_RESPONSE_BYTES_MIN = 32_000; const FETCH_MAX_RESPONSE_BYTES_MAX = 10_000_000; const DEFAULT_FETCH_MAX_REDIRECTS = 3; +const WEB_FETCH_PROGRESS_THRESHOLD_MS = 5_000; +const WEB_FETCH_PROGRESS_TEXT = "Fetching page content..."; const DEFAULT_ERROR_MAX_CHARS = 4_000; const DEFAULT_ERROR_MAX_BYTES = 64_000; const DEFAULT_FETCH_USER_AGENT = @@ -285,6 +292,7 @@ type WebFetchRuntimeParams = { }; providerCacheKey?: string; lookupFn?: LookupFn; + signal?: AbortSignal; resolveProviderFallback: () => Promise; }; @@ -310,6 +318,15 @@ function normalizeProviderFinalUrl(value: unknown): string | undefined { } } +function throwIfFetchAborted(signal: AbortSignal | undefined): void { + if (!signal?.aborted) { + return; + } + // readResponseText may finish after an abort races with body reading. Recheck + // before wrapping, caching, or returning content from a canceled tool call. + throw signal.reason instanceof Error ? signal.reason : new Error("aborted"); +} + function normalizeProviderWebFetchPayload(params: { providerId: string; payload: unknown; @@ -435,6 +452,7 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise { + execute: async (_toolCallId, args, signal, onUpdate) => { const { config, preferRuntimeProviders, runtimeWebFetch } = resolveWebFetchToolRuntimeContext( { config: options?.config, @@ -676,34 +702,47 @@ export function createWebFetchTool(options?: { const extractMode = readStringParam(params, "extractMode") === "text" ? "text" : "markdown"; const maxChars = readPositiveIntegerParam(params, "maxChars"); const maxCharsCap = resolveFetchMaxCharsCap(executionFetch); - const result = await runWebFetch({ - url, - extractMode, - maxChars: resolveMaxChars( - maxChars ?? executionFetch?.maxChars, - DEFAULT_FETCH_MAX_CHARS, - maxCharsCap, - ), - maxResponseBytes, - maxRedirects: resolveMaxRedirects( - executionFetch?.maxRedirects, - DEFAULT_FETCH_MAX_REDIRECTS, - ), - timeoutSeconds: resolveTimeoutSeconds( - executionFetch?.timeoutSeconds, - DEFAULT_TIMEOUT_SECONDS, - ), - cacheTtlMs: resolveCacheTtlMs(executionFetch?.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES), - userAgent, - readabilityEnabled, - config, - useTrustedEnvProxy: resolveFetchUseTrustedEnvProxy(executionFetch), - ssrfPolicy: executionFetch?.ssrfPolicy, - ...(providerCacheKey ? { providerCacheKey } : {}), - lookupFn: options?.lookupFn, - resolveProviderFallback, - }); - return jsonResult(result); + // The progress line is emitted only if the fetch is still pending after + // the threshold; fast cache/network hits clear the timer before it fires. + const clearProgressTimer = scheduleToolProgress( + onUpdate, + { text: WEB_FETCH_PROGRESS_TEXT, id: "web_fetch:fetching" }, + WEB_FETCH_PROGRESS_THRESHOLD_MS, + { signal }, + ); + try { + const result = await runWebFetch({ + url, + extractMode, + maxChars: resolveMaxChars( + maxChars ?? executionFetch?.maxChars, + DEFAULT_FETCH_MAX_CHARS, + maxCharsCap, + ), + maxResponseBytes, + maxRedirects: resolveMaxRedirects( + executionFetch?.maxRedirects, + DEFAULT_FETCH_MAX_REDIRECTS, + ), + timeoutSeconds: resolveTimeoutSeconds( + executionFetch?.timeoutSeconds, + DEFAULT_TIMEOUT_SECONDS, + ), + cacheTtlMs: resolveCacheTtlMs(executionFetch?.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES), + userAgent, + readabilityEnabled, + config, + useTrustedEnvProxy: resolveFetchUseTrustedEnvProxy(executionFetch), + ssrfPolicy: executionFetch?.ssrfPolicy, + ...(providerCacheKey ? { providerCacheKey } : {}), + lookupFn: options?.lookupFn, + signal, + resolveProviderFallback, + }); + return jsonResult(result); + } finally { + clearProgressTimer(); + } }, }; } diff --git a/src/agents/tools/web-tools.fetch.test.ts b/src/agents/tools/web-tools.fetch.test.ts index 09f926a24857..ee22934342e8 100644 --- a/src/agents/tools/web-tools.fetch.test.ts +++ b/src/agents/tools/web-tools.fetch.test.ts @@ -196,6 +196,207 @@ describe("web_fetch extraction fallbacks", () => { expect(details.wrappedLength).toBe(details.text?.length); }); + it("emits typed public progress for slow fetches", async () => { + vi.useFakeTimers(); + try { + installMockFetch(async (input: RequestInfo | URL) => { + await new Promise((resolve) => setTimeout(resolve, 6000)); + return textResponse("Loaded page", resolveRequestUrl(input)) as Response; + }); + const updates: unknown[] = []; + const tool = createFetchTool({ firecrawl: { enabled: false } }); + const resultPromise = tool?.execute?.( + "call", + { url: "https://example.com/" }, + undefined, + (partialResult) => { + updates.push(partialResult); + }, + ); + + await vi.advanceTimersByTimeAsync(5000); + + expect(updates).toEqual([ + { + content: [], + details: undefined, + progress: { + text: "Fetching page content...", + visibility: "channel", + privacy: "public", + id: "web_fetch:fetching", + }, + }, + ]); + + await vi.advanceTimersByTimeAsync(1000); + await resultPromise; + } finally { + vi.useRealTimers(); + } + }); + + it("cancels typed progress when fetches finish before the progress threshold", async () => { + vi.useFakeTimers(); + try { + installPlainTextFetch("Loaded quickly"); + const updates: unknown[] = []; + const tool = createFetchTool({ firecrawl: { enabled: false } }); + + await tool?.execute?.("call", { url: "https://example.com/" }, undefined, (partial) => { + updates.push(partial); + }); + await vi.advanceTimersByTimeAsync(5000); + + expect(updates).toHaveLength(0); + } finally { + vi.useRealTimers(); + } + }); + + it("cancels typed progress when fetches are aborted", async () => { + vi.useFakeTimers(); + try { + const providerExecute = vi.fn(async () => ({ text: "provider fallback" })); + resolveWebFetchDefinitionMock.mockReturnValue({ + provider: { id: "firecrawl" }, + definition: { + description: "firecrawl", + parameters: {}, + execute: providerExecute, + }, + }); + installMockFetch(async (_input: RequestInfo | URL, init?: RequestInit) => { + return await new Promise((resolve, reject) => { + init?.signal?.addEventListener( + "abort", + () => { + reject(new Error("aborted")); + }, + { once: true }, + ); + setTimeout(() => { + resolve(textResponse("Loaded page") as Response); + }, 6000); + }); + }); + const updates: unknown[] = []; + const controller = new AbortController(); + const tool = createFetchTool({ firecrawl: { enabled: false } }); + const resultPromise = tool?.execute?.( + "call", + { url: "https://example.com/" }, + controller.signal, + (partial) => { + updates.push(partial); + }, + ); + const observedResultPromise = resultPromise?.catch((error: unknown) => error); + + await vi.advanceTimersByTimeAsync(0); + controller.abort(); + await vi.advanceTimersByTimeAsync(5000); + + const error = await observedResultPromise; + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("aborted"); + expect(updates).toHaveLength(0); + expect(resolveWebFetchDefinitionMock).not.toHaveBeenCalled(); + expect(providerExecute).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("cancels typed progress when fetch body reads are aborted", async () => { + vi.useFakeTimers(); + try { + const providerExecute = vi.fn(async () => ({ text: "provider fallback" })); + resolveWebFetchDefinitionMock.mockReturnValue({ + provider: { id: "firecrawl" }, + definition: { + description: "firecrawl", + parameters: {}, + execute: providerExecute, + }, + }); + installMockFetch(async (_input: RequestInfo | URL, init?: RequestInit) => { + const body = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode("partial body")); + const lateTimer = setTimeout(() => { + controller.enqueue(new TextEncoder().encode("late body")); + controller.close(); + }, 6000); + init?.signal?.addEventListener( + "abort", + () => { + clearTimeout(lateTimer); + controller.error(new Error("body aborted")); + }, + { once: true }, + ); + }, + }); + return new Response(body, { + status: 200, + headers: { "content-type": "text/plain" }, + }); + }); + const updates: unknown[] = []; + const controller = new AbortController(); + const tool = createFetchTool({ firecrawl: { enabled: false } }); + const resultPromise = tool?.execute?.( + "call", + { url: "https://example.com/" }, + controller.signal, + (partial) => { + updates.push(partial); + }, + ); + const observedResultPromise = resultPromise?.catch((error: unknown) => error); + + await vi.advanceTimersByTimeAsync(0); + controller.abort(new Error("cancelled")); + await vi.advanceTimersByTimeAsync(5000); + + const error = await observedResultPromise; + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("cancelled"); + expect(updates).toHaveLength(0); + expect(providerExecute).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps fetch execution alive when progress subscribers throw", async () => { + vi.useFakeTimers(); + try { + installMockFetch(async (input: RequestInfo | URL) => { + await new Promise((resolve) => setTimeout(resolve, 6000)); + return textResponse("Loaded page", resolveRequestUrl(input)) as Response; + }); + const tool = createFetchTool({ firecrawl: { enabled: false } }); + const onUpdate = vi.fn(() => { + throw new Error("subscriber failed"); + }); + const resultPromise = tool?.execute?.( + "call", + { url: "https://example.com/" }, + undefined, + onUpdate, + ); + + await vi.advanceTimersByTimeAsync(5000); + await vi.advanceTimersByTimeAsync(1000); + await expect(resultPromise).resolves.toBeTruthy(); + expect(onUpdate).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + it("enforces maxChars after wrapping", async () => { const longText = "x".repeat(5_000); installMockFetch((input: RequestInfo | URL) =>