mirror of https://github.com/openclaw/openclaw.git, synced 2026-06-06 05:51:15 +08:00
fix(agents): add typed tool progress updates
Add a general typed tool-progress contract so long-running non-exec tools can emit public channel progress without overloading model-facing tool content.

`web_fetch` now uses the generic delayed progress helper: it shows `Fetching page content...` only when the fetch is still pending after five seconds, clears the timer on completion/abort, passes the abort signal into guarded fetch, and avoids provider fallback or cached success after cancellation.

The subscriber path accepts only explicit `visibility: "channel"` and `privacy: "public"` progress metadata, while untyped tool partials and exec output keep their existing behavior.

Docs now explain typed progress, delayed producer examples, and the `web_fetch` timing behavior.

Proof: `pnpm test src/agents/tools/web-tools.fetch.test.ts src/agents/embedded-agent-subscribe.handlers.tools.test.ts -- --run`; `pnpm docs:check-mdx`; changed-file `pnpm exec oxlint ...`; `git diff --check`; autoreview clean.
committed by GitHub
parent bba28df9f7
commit aff6d079d3
@@ -178,6 +178,50 @@ Progress lines are enabled by default in progress mode. They come from real run
events: tool starts, item updates, task plans, approvals, command output, patch
summaries, and similar agent activity.

Tools can also emit typed progress while a single tool call is still running.
That is how a slow fetch or search can update the visible draft before the tool
returns its final result. The progress update is a partial tool result with
empty model content and explicit public channel metadata:

```json
{
  "content": [],
  "progress": {
    "text": "Fetching page content...",
    "visibility": "channel",
    "privacy": "public",
    "id": "web_fetch:fetching"
  }
}
```

OpenClaw renders only the `progress.text` in the channel progress UI. The
normal tool result still arrives later as `content` and `details`, and is the
only part returned to the model.

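
As a rough illustration of the contract above, a subscriber could treat a partial result as channel progress only when both metadata fields are set explicitly. The sketch below is a simplified stand-in, not OpenClaw's actual subscriber code; `PublicProgress`, `isRecord`, and `readPublicProgress` are hypothetical names used only for this example.

```typescript
// Hypothetical sketch: accept only explicitly typed, public channel progress.
type PublicProgress = { text: string; id?: string };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function readPublicProgress(partialResult: unknown): PublicProgress | undefined {
  if (!isRecord(partialResult)) {
    return undefined;
  }
  const progress = partialResult.progress;
  if (!isRecord(progress)) {
    return undefined;
  }
  // Never infer progress from `content` or `details`; both metadata fields must match.
  if (progress.visibility !== "channel" || progress.privacy !== "public") {
    return undefined;
  }
  const text = typeof progress.text === "string" ? progress.text.trim() : "";
  if (!text) {
    return undefined;
  }
  return typeof progress.id === "string" ? { text, id: progress.id } : { text };
}

// A typed update is accepted; the same words sent as plain content are not.
const typed = readPublicProgress({
  content: [],
  progress: {
    text: "Fetching page content...",
    visibility: "channel",
    privacy: "public",
    id: "web_fetch:fetching",
  },
});
const untyped = readPublicProgress({
  content: [{ type: "text", text: "Fetching page content..." }],
});
console.log(typed, untyped);
```

Requiring both fields, rather than guessing from the shape of `content`, keeps fetched text and private arguments out of the channel UI by default.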

When adding progress to a tool, use a short, generic message and delay it until
the operation has been pending long enough to be useful:

```typescript
const clearProgressTimer = scheduleToolProgress(
  onUpdate,
  { text: "Fetching page content...", id: "web_fetch:fetching" },
  5_000,
  { signal },
);

try {
  return await runToolWork();
} finally {
  clearProgressTimer();
}
```

This pattern means fast calls do not show a progress line, long calls show one
while they are still pending, and canceled calls clear the timer before stale
progress can appear. Progress text is a public UI side channel, so it must not
include secrets, raw arguments, fetched content, command output, or page text.

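
To make the three outcomes concrete, the following sketch models the timing rule as a pure function. It is an illustration only: `CallTimeline` and `progressShown` are hypothetical names, and the real helper uses a timer plus an abort signal rather than precomputed timestamps.

```typescript
// Hypothetical model of the delayed-progress rule.
// All times are milliseconds measured from the start of the tool call.
interface CallTimeline {
  /** When the tool call finished, if it finished. */
  finishedAtMs?: number;
  /** When the tool call was canceled, if it was canceled. */
  canceledAtMs?: number;
}

function progressShown(timeline: CallTimeline, delayMs: number): boolean {
  const endedAtMs = Math.min(
    timeline.finishedAtMs ?? Number.POSITIVE_INFINITY,
    timeline.canceledAtMs ?? Number.POSITIVE_INFINITY,
  );
  // The line appears only if the call is still pending when the delay elapses.
  // Treating an exact tie as "not shown" is an arbitrary choice for this model.
  return endedAtMs > delayMs;
}

const delayMs = 5_000;
console.log(progressShown({ finishedAtMs: 800 }, delayMs)); // fast call → false
console.log(progressShown({ finishedAtMs: 6_000 }, delayMs)); // slow call → true
console.log(progressShown({ canceledAtMs: 2_000 }, delayMs)); // canceled early → false
```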

OpenClaw uses the same formatter for progress drafts and `/verbose`:

```json5
@@ -197,6 +197,12 @@ Matrix:

Preview streaming can also include **tool-progress** updates - short status lines like "searching the web", "reading file", or "calling tool" - that appear in the same preview message while tools are running, ahead of the final reply. In Codex app-server mode, Codex preamble/commentary messages use this same preview path, so short "I am checking..." progress notes can stream into the editable draft without becoming part of the final answer. This keeps multi-step tool turns visually alive rather than silent between the first thinking preview and the final answer.

Long-running tools may emit typed progress before they return. For example,
`web_fetch` arms a five-second timer when it starts: if the fetch is still
pending, the preview can show `Fetching page content...`; if the fetch finishes
or is canceled before then, no progress line is emitted. The later final tool
result is still delivered normally to the model.

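
One way a preview surface could keep a single line per progress `id` is sketched below. This is an assumption-laden illustration, not OpenClaw's renderer: `ProgressUpdate`, `PreviewProgressLines`, `upsert`, `clear`, and `render` are hypothetical names, the second message text is invented for the demo, and keying id-less updates by tool call is a choice made only for this example.

```typescript
// Hypothetical sketch: keep at most one visible progress line per stable id.
interface ProgressUpdate {
  toolCallId: string;
  text: string;
  id?: string;
}

class PreviewProgressLines {
  // Map preserves insertion order, so lines render in the order they first appeared.
  private readonly lines = new Map<string, string>();

  private keyFor(update: Pick<ProgressUpdate, "toolCallId" | "id">): string {
    // Assumption: updates without an id are keyed by their tool call.
    return update.id ?? `tool:${update.toolCallId}`;
  }

  // A repeated id replaces the existing line instead of appending a new one.
  upsert(update: ProgressUpdate): void {
    this.lines.set(this.keyFor(update), update.text);
  }

  // Drop a line, for example once the tool has returned its final result.
  clear(update: Pick<ProgressUpdate, "toolCallId" | "id">): void {
    this.lines.delete(this.keyFor(update));
  }

  render(): string {
    return Array.from(this.lines.values()).join("\n");
  }
}

const preview = new PreviewProgressLines();
preview.upsert({ toolCallId: "call-1", id: "web_fetch:fetching", text: "Fetching page content..." });
preview.upsert({ toolCallId: "call-1", id: "web_fetch:fetching", text: "Still fetching page content..." });
console.log(preview.render());
```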

Supported surfaces:

- **Discord**, **Slack**, **Telegram**, and **Matrix** stream tool-progress and Codex preamble updates into the live preview edit by default when preview streaming is active. Microsoft Teams uses its native progress stream in personal chats.

@@ -57,6 +57,21 @@ Truncate output to this many characters.
</Step>
</Steps>

## Progress updates

`web_fetch` emits a public progress line only when the fetch is still pending
after five seconds:

```text
Fetching page content...
```

Fast cache hits and quick network responses finish before the timer fires, so
they do not show a progress line. If the call is canceled, the timer is cleared.
When the fetch eventually completes, the agent receives the normal tool result;
the progress line is only channel UI state and never contains fetched page
content.

## Config

```json5
@@ -345,12 +345,26 @@ export interface AgentState {
|
||||
readonly errorMessage?: string;
|
||||
}
|
||||
|
||||
/** Channel-safe progress text emitted by a running tool. */
|
||||
export interface AgentToolProgress {
|
||||
/** Public text suitable for user-facing progress surfaces. */
|
||||
text: string;
|
||||
/** Tool progress is rendered by channel progress UIs. */
|
||||
visibility: "channel";
|
||||
/** Progress text must not contain secrets, private args, or fetched content. */
|
||||
privacy: "public";
|
||||
/** Optional stable id for progress line replacement. */
|
||||
id?: string;
|
||||
}
|
||||
|
||||
/** Final or partial result produced by a tool. */
|
||||
export interface AgentToolResult<T> {
|
||||
/** Text or image content returned to the model. */
|
||||
content: (TextContent | ImageContent)[];
|
||||
/** Arbitrary structured details for logs or UI rendering. */
|
||||
details: T;
|
||||
/** Optional public progress hint for partial tool updates; never model content. */
|
||||
progress?: AgentToolProgress;
|
||||
/**
|
||||
* Hint that the agent should stop after the current tool batch.
|
||||
* Early termination only happens when every finalized tool result in the batch sets this to true.
|
||||
|
||||
@@ -1087,6 +1087,138 @@ describe("handleToolExecutionEnd exec approval prompts", () => {
|
||||
});
|
||||
|
||||
describe("handleToolExecutionEnd derived tool events", () => {
|
||||
it("surfaces typed public tool progress for any non-exec tool", () => {
|
||||
resetAgentEventsForTest();
|
||||
const events: Array<{ stream?: string; data?: Record<string, unknown> }> = [];
|
||||
registerAgentEventListener((evt) => {
|
||||
events.push(evt as never);
|
||||
});
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
|
||||
handleToolExecutionUpdate(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_update",
|
||||
toolName: "custom_fetcher",
|
||||
toolCallId: "tool-custom-progress",
|
||||
partialResult: {
|
||||
content: [],
|
||||
details: undefined,
|
||||
progress: {
|
||||
text: "Loading remote resource...",
|
||||
visibility: "channel",
|
||||
privacy: "public",
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
);
|
||||
|
||||
expect(
|
||||
events.filter(
|
||||
(event) =>
|
||||
event.stream === "tool" &&
|
||||
(event.data as { phase?: string } | undefined)?.phase === "update",
|
||||
),
|
||||
).toHaveLength(0);
|
||||
const itemEvent = requireRecord(
|
||||
onAgentEvent.mock.calls
|
||||
.map((call) => call[0])
|
||||
.find((event) => (event as { stream?: string })?.stream === "item"),
|
||||
"progress item event",
|
||||
);
|
||||
expectRecordFields(itemEvent.data, "progress item event data", {
|
||||
itemId: "tool:tool-custom-progress",
|
||||
phase: "update",
|
||||
kind: "tool",
|
||||
name: "custom_fetcher",
|
||||
progressText: "Loading remote resource...",
|
||||
status: "running",
|
||||
});
|
||||
expect(requireRecord(itemEvent.data, "progress item event data").meta).toBeUndefined();
|
||||
|
||||
resetAgentEventsForTest();
|
||||
});
|
||||
|
||||
it("does not promote untyped non-exec content into channel progress", () => {
|
||||
resetAgentEventsForTest();
|
||||
const events: Array<{ stream?: string; data?: Record<string, unknown> }> = [];
|
||||
registerAgentEventListener((evt) => {
|
||||
events.push(evt as never);
|
||||
});
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
|
||||
handleToolExecutionUpdate(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_update",
|
||||
toolName: "web_fetch",
|
||||
toolCallId: "tool-web-fetch-untyped",
|
||||
partialResult: {
|
||||
content: [{ type: "text", text: "Fetching page content..." }],
|
||||
details: undefined,
|
||||
},
|
||||
} as never,
|
||||
);
|
||||
|
||||
expect(
|
||||
events.filter(
|
||||
(event) =>
|
||||
event.stream === "tool" &&
|
||||
(event.data as { phase?: string } | undefined)?.phase === "update",
|
||||
),
|
||||
).toHaveLength(1);
|
||||
const itemEvent = requireRecord(
|
||||
onAgentEvent.mock.calls
|
||||
.map((call) => call[0])
|
||||
.find((event) => (event as { stream?: string })?.stream === "item"),
|
||||
"tool item event",
|
||||
);
|
||||
expect(requireRecord(itemEvent.data, "tool item event data").progressText).toBeUndefined();
|
||||
expect(
|
||||
onAgentEvent.mock.calls
|
||||
.map((call) => call[0])
|
||||
.filter((event) => (event as { stream?: string })?.stream === "tool"),
|
||||
).toHaveLength(1);
|
||||
|
||||
resetAgentEventsForTest();
|
||||
});
|
||||
|
||||
it("caps typed public tool progress before channel item events", () => {
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
const largeProgress = "x".repeat(9000);
|
||||
|
||||
handleToolExecutionUpdate(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_update",
|
||||
toolName: "custom_fetcher",
|
||||
toolCallId: "tool-large-progress",
|
||||
partialResult: {
|
||||
content: [],
|
||||
details: undefined,
|
||||
progress: {
|
||||
text: largeProgress,
|
||||
visibility: "channel",
|
||||
privacy: "public",
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
);
|
||||
|
||||
const itemEvent = requireRecord(
|
||||
onAgentEvent.mock.calls
|
||||
.map((call) => call[0])
|
||||
.find((event) => (event as { stream?: string })?.stream === "item"),
|
||||
"large progress item event",
|
||||
);
|
||||
const progressText = requireString(
|
||||
requireRecord(itemEvent.data, "large progress item event data").progressText,
|
||||
"progress text",
|
||||
);
|
||||
expect(progressText).toContain("...(live output truncated)...");
|
||||
expect(progressText.length).toBeLessThan(largeProgress.length);
|
||||
});
|
||||
|
||||
it("emits command output deltas for exec update results", async () => {
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ import {
 import { normalizeOptionalLowercaseString, readStringValue } from "../shared/string-coerce.js";
 import { truncateUtf16Safe } from "../utils.js";
 import { normalizeAcceptedSessionSpawnResult } from "./accepted-session-spawn.js";
+import { REQUIRED_PARAM_GROUPS, type RequiredParamGroup } from "./agent-tools.params.js";
 import type { ApplyPatchSummary } from "./apply-patch.js";
 import type { ExecToolDetails } from "./bash-tools.exec-types.js";
 import { sanitizeForConsole } from "./console-sanitize.js";
@@ -53,7 +54,6 @@ import {
 import { inferToolMetaFromArgs } from "./embedded-agent-utils.js";
 import { parseExecApprovalResultText } from "./exec-approval-result.js";
 import type { AgentEvent } from "./runtime/index.js";
-import { REQUIRED_PARAM_GROUPS, type RequiredParamGroup } from "./agent-tools.params.js";
 import { buildToolMutationState, isSameToolMutationAction } from "./tool-mutation.js";
 import { normalizeToolName } from "./tool-policy.js";

@@ -61,6 +61,9 @@ type ExecApprovalReplyModule = typeof import("../infra/exec-approval-reply.js");
|
||||
type HookRunnerGlobalModule = typeof import("../plugins/hook-runner-global.js");
|
||||
type MediaParseModule = typeof import("../media/parse.js");
|
||||
type BeforeToolCallModule = typeof import("./agent-tools.before-tool-call.js");
|
||||
type ChannelToolProgress = {
|
||||
text: string;
|
||||
};
|
||||
|
||||
const execApprovalReplyModuleLoader = createLazyImportLoader<ExecApprovalReplyModule>(
|
||||
() => import("../infra/exec-approval-reply.js"),
|
||||
@@ -346,6 +349,20 @@ function extractLiveExecOutput(result: unknown): string | undefined {
|
||||
return typeof output === "string" ? truncateLiveExecOutput(output) : undefined;
|
||||
}
|
||||
|
||||
function readChannelToolProgress(result: unknown): ChannelToolProgress | undefined {
|
||||
const progress = readRecordField(asOptionalObjectRecord(result)?.progress);
|
||||
// Only an explicit typed progress field crosses into channel UI. Tool output
|
||||
// and details may contain fetched content or private args, so never infer.
|
||||
if (progress?.visibility !== "channel" || progress.privacy !== "public") {
|
||||
return undefined;
|
||||
}
|
||||
const text = readStringValue(progress.text)?.trim();
|
||||
if (!text) {
|
||||
return undefined;
|
||||
}
|
||||
return { text: truncateLiveExecOutput(text) };
|
||||
}
|
||||
|
||||
function shouldEmitLiveExecUpdate(ctx: ToolHandlerContext, toolCallId: string): boolean {
|
||||
const now = Date.now();
|
||||
const state = ctx.state.execLiveUpdateStateById ?? new Map<string, { lastEmittedAtMs: number }>();
|
||||
@@ -1062,7 +1079,11 @@ export function handleToolExecutionUpdate(
   const sanitized = sanitizeToolResult(partial);
   const isExecTool = isExecToolName(toolName);
   const liveResult = isExecTool ? capLiveExecResult(sanitized) : sanitized;
-  const emitDetailedLiveUpdate = !isExecTool || shouldEmitLiveExecUpdate(ctx, toolCallId);
+  const toolProgress = isExecTool ? undefined : readChannelToolProgress(liveResult);
+  // Typed progress already has a sanitized item update path. Suppress the raw
+  // partial-result event for those updates to avoid duplicate preview lines.
+  const emitDetailedLiveUpdate =
+    !toolProgress && (!isExecTool || shouldEmitLiveExecUpdate(ctx, toolCallId));
   if (emitDetailedLiveUpdate) {
     emitAgentEvent({
       runId: ctx.params.runId,
@@ -1082,18 +1103,22 @@ export function handleToolExecutionUpdate(
     title: buildToolItemTitle(toolName, ctx.state.toolMetaById.get(toolCallId)?.meta),
     status: "running",
     name: toolName,
-    meta: ctx.state.toolMetaById.get(toolCallId)?.meta,
     toolCallId,
+    ...(toolProgress
+      ? { progressText: toolProgress.text }
+      : { meta: ctx.state.toolMetaById.get(toolCallId)?.meta }),
   };
   emitTrackedItemEvent(ctx, itemData);
-  void ctx.params.onAgentEvent?.({
-    stream: "tool",
-    data: {
-      phase: "update",
-      name: toolName,
-      toolCallId,
-    },
-  });
+  if (!toolProgress) {
+    void ctx.params.onAgentEvent?.({
+      stream: "tool",
+      data: {
+        phase: "update",
+        name: toolName,
+        toolCallId,
+      },
+    });
+  }
   if (isExecTool) {
     const output = extractLiveExecOutput(liveResult);
     const commandData: AgentItemEventData = {
@@ -9,7 +9,12 @@ import {
 } from "../../shared/number-coercion.js";
 import { normalizeStringEntries } from "../../shared/string-normalization.js";
 import type { ImageSanitizationLimits } from "../image-sanitization.js";
-import type { AgentTool, AgentToolResult, AgentToolUpdateCallback } from "../runtime/index.js";
+import type {
+  AgentTool,
+  AgentToolProgress,
+  AgentToolResult,
+  AgentToolUpdateCallback,
+} from "../runtime/index.js";
 import { sanitizeToolResultImages } from "../tool-images.js";

 export type AgentToolWithMeta<TParameters extends TSchema, TResult> = AgentTool<
@@ -395,6 +400,67 @@ export function jsonResult(payload: unknown): AgentToolResult<unknown> {
|
||||
return textResult(JSON.stringify(payload, null, 2), payload);
|
||||
}
|
||||
|
||||
export type PublicToolProgress = Pick<AgentToolProgress, "text" | "id">;
|
||||
|
||||
export function toolProgressResult(progress: PublicToolProgress): AgentToolResult<undefined> {
|
||||
return {
|
||||
content: [],
|
||||
details: undefined,
|
||||
progress: {
|
||||
text: progress.text,
|
||||
visibility: "channel",
|
||||
privacy: "public",
|
||||
...(progress.id ? { id: progress.id } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Tool progress is a UI side channel. The model-facing tool result remains in
|
||||
// `content`; progress text must already be safe to show in channel previews.
|
||||
export function emitToolProgress(
|
||||
onUpdate: AgentToolUpdateCallback | undefined,
|
||||
progress: PublicToolProgress,
|
||||
): void {
|
||||
const text = progress.text.trim();
|
||||
if (!onUpdate || !text) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
onUpdate(toolProgressResult({ ...progress, text }));
|
||||
} catch {
|
||||
// Progress is best-effort UI state; tool execution must not depend on subscribers.
|
||||
}
|
||||
}
|
||||
|
||||
// Long-running tools can arm delayed progress and cancel it on completion or
|
||||
// abort. This avoids stale "still working" lines after a fast or canceled call.
|
||||
export function scheduleToolProgress(
|
||||
onUpdate: AgentToolUpdateCallback | undefined,
|
||||
progress: PublicToolProgress,
|
||||
delayMs: number,
|
||||
options: { signal?: AbortSignal } = {},
|
||||
): () => void {
|
||||
if (!onUpdate || options.signal?.aborted) {
|
||||
return () => {};
|
||||
}
|
||||
let cleared = false;
|
||||
let timer: ReturnType<typeof setTimeout>;
|
||||
const clear = () => {
|
||||
if (cleared) {
|
||||
return;
|
||||
}
|
||||
cleared = true;
|
||||
clearTimeout(timer);
|
||||
options.signal?.removeEventListener("abort", clear);
|
||||
};
|
||||
timer = setTimeout(() => {
|
||||
clear();
|
||||
emitToolProgress(onUpdate, progress);
|
||||
}, delayMs);
|
||||
options.signal?.addEventListener("abort", clear, { once: true });
|
||||
return clear;
|
||||
}
|
||||
|
||||
export async function imageResult(params: {
|
||||
label: string;
|
||||
path: string;
|
||||
|
||||
@@ -15,7 +15,12 @@ import { extractReadableContent } from "../../web-fetch/content-extractors.runti
 import { resolveWebProviderConfig } from "../../web/provider-runtime-shared.js";
 import { stringEnum } from "../schema/string-enum.js";
 import type { AnyAgentTool } from "./common.js";
-import { jsonResult, readPositiveIntegerParam, readStringParam } from "./common.js";
+import {
+  jsonResult,
+  readPositiveIntegerParam,
+  readStringParam,
+  scheduleToolProgress,
+} from "./common.js";
 import {
   extractBasicHtmlContent,
   htmlToMarkdown,
@@ -43,6 +48,8 @@ const DEFAULT_FETCH_MAX_RESPONSE_BYTES = 750_000;
|
||||
const FETCH_MAX_RESPONSE_BYTES_MIN = 32_000;
|
||||
const FETCH_MAX_RESPONSE_BYTES_MAX = 10_000_000;
|
||||
const DEFAULT_FETCH_MAX_REDIRECTS = 3;
|
||||
const WEB_FETCH_PROGRESS_THRESHOLD_MS = 5_000;
|
||||
const WEB_FETCH_PROGRESS_TEXT = "Fetching page content...";
|
||||
const DEFAULT_ERROR_MAX_CHARS = 4_000;
|
||||
const DEFAULT_ERROR_MAX_BYTES = 64_000;
|
||||
const DEFAULT_FETCH_USER_AGENT =
|
||||
@@ -285,6 +292,7 @@ type WebFetchRuntimeParams = {
|
||||
};
|
||||
providerCacheKey?: string;
|
||||
lookupFn?: LookupFn;
|
||||
signal?: AbortSignal;
|
||||
resolveProviderFallback: () => Promise<WebFetchProviderFallback>;
|
||||
};
|
||||
|
||||
@@ -310,6 +318,15 @@ function normalizeProviderFinalUrl(value: unknown): string | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
function throwIfFetchAborted(signal: AbortSignal | undefined): void {
|
||||
if (!signal?.aborted) {
|
||||
return;
|
||||
}
|
||||
// readResponseText may finish after an abort races with body reading. Recheck
|
||||
// before wrapping, caching, or returning content from a canceled tool call.
|
||||
throw signal.reason instanceof Error ? signal.reason : new Error("aborted");
|
||||
}
|
||||
|
||||
function normalizeProviderWebFetchPayload(params: {
|
||||
providerId: string;
|
||||
payload: unknown;
|
||||
@@ -435,6 +452,7 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise<Record<string
|
||||
url: params.url,
|
||||
maxRedirects: params.maxRedirects,
|
||||
timeoutSeconds: params.timeoutSeconds,
|
||||
signal: params.signal,
|
||||
lookupFn: params.lookupFn,
|
||||
useEnvProxy: useTrustedEnvProxy,
|
||||
policy: ssrfPolicy,
|
||||
@@ -461,6 +479,9 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise<Record<string
|
||||
if (error instanceof SsrFBlockedError) {
|
||||
throw error;
|
||||
}
|
||||
if (params.signal?.aborted) {
|
||||
throw error;
|
||||
}
|
||||
const payload = await maybeFetchProviderWebFetchPayload({
|
||||
...params,
|
||||
urlToFetch: finalUrl,
|
||||
@@ -475,6 +496,9 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise<Record<string
|
||||
|
||||
try {
|
||||
if (!res.ok) {
|
||||
if (params.signal?.aborted) {
|
||||
throw params.signal.reason instanceof Error ? params.signal.reason : new Error("aborted");
|
||||
}
|
||||
const payload = await maybeFetchProviderWebFetchPayload({
|
||||
...params,
|
||||
urlToFetch: params.url,
|
||||
@@ -485,6 +509,7 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise<Record<string
|
||||
return payload;
|
||||
}
|
||||
const rawDetailResult = await readResponseText(res, { maxBytes: DEFAULT_ERROR_MAX_BYTES });
|
||||
throwIfFetchAborted(params.signal);
|
||||
const rawDetail = rawDetailResult.text;
|
||||
const detail = formatWebFetchErrorDetail({
|
||||
detail: rawDetail,
|
||||
@@ -498,6 +523,7 @@ async function runWebFetch(params: WebFetchRuntimeParams): Promise<Record<string
|
||||
const contentType = res.headers.get("content-type") ?? "application/octet-stream";
|
||||
const normalizedContentType = normalizeContentType(contentType) ?? "application/octet-stream";
|
||||
const bodyResult = await readResponseText(res, { maxBytes: params.maxResponseBytes });
|
||||
throwIfFetchAborted(params.signal);
|
||||
const body = bodyResult.text;
|
||||
const responseTruncatedWarning = bodyResult.truncated
|
||||
? `Response body truncated after ${params.maxResponseBytes} bytes.`
|
||||
@@ -630,7 +656,7 @@ export function createWebFetchTool(options?: {
     description:
       "Fetch URL and extract readable markdown/text. Lightweight page access; no browser automation.",
     parameters: WebFetchSchema,
-    execute: async (_toolCallId, args) => {
+    execute: async (_toolCallId, args, signal, onUpdate) => {
       const { config, preferRuntimeProviders, runtimeWebFetch } = resolveWebFetchToolRuntimeContext(
         {
           config: options?.config,
@@ -676,34 +702,47 @@ export function createWebFetchTool(options?: {
       const extractMode = readStringParam(params, "extractMode") === "text" ? "text" : "markdown";
       const maxChars = readPositiveIntegerParam(params, "maxChars");
       const maxCharsCap = resolveFetchMaxCharsCap(executionFetch);
-      const result = await runWebFetch({
-        url,
-        extractMode,
-        maxChars: resolveMaxChars(
-          maxChars ?? executionFetch?.maxChars,
-          DEFAULT_FETCH_MAX_CHARS,
-          maxCharsCap,
-        ),
-        maxResponseBytes,
-        maxRedirects: resolveMaxRedirects(
-          executionFetch?.maxRedirects,
-          DEFAULT_FETCH_MAX_REDIRECTS,
-        ),
-        timeoutSeconds: resolveTimeoutSeconds(
-          executionFetch?.timeoutSeconds,
-          DEFAULT_TIMEOUT_SECONDS,
-        ),
-        cacheTtlMs: resolveCacheTtlMs(executionFetch?.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES),
-        userAgent,
-        readabilityEnabled,
-        config,
-        useTrustedEnvProxy: resolveFetchUseTrustedEnvProxy(executionFetch),
-        ssrfPolicy: executionFetch?.ssrfPolicy,
-        ...(providerCacheKey ? { providerCacheKey } : {}),
-        lookupFn: options?.lookupFn,
-        resolveProviderFallback,
-      });
-      return jsonResult(result);
+      // The progress line is emitted only if the fetch is still pending after
+      // the threshold; fast cache/network hits clear the timer before it fires.
+      const clearProgressTimer = scheduleToolProgress(
+        onUpdate,
+        { text: WEB_FETCH_PROGRESS_TEXT, id: "web_fetch:fetching" },
+        WEB_FETCH_PROGRESS_THRESHOLD_MS,
+        { signal },
+      );
+      try {
+        const result = await runWebFetch({
+          url,
+          extractMode,
+          maxChars: resolveMaxChars(
+            maxChars ?? executionFetch?.maxChars,
+            DEFAULT_FETCH_MAX_CHARS,
+            maxCharsCap,
+          ),
+          maxResponseBytes,
+          maxRedirects: resolveMaxRedirects(
+            executionFetch?.maxRedirects,
+            DEFAULT_FETCH_MAX_REDIRECTS,
+          ),
+          timeoutSeconds: resolveTimeoutSeconds(
+            executionFetch?.timeoutSeconds,
+            DEFAULT_TIMEOUT_SECONDS,
+          ),
+          cacheTtlMs: resolveCacheTtlMs(executionFetch?.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES),
+          userAgent,
+          readabilityEnabled,
+          config,
+          useTrustedEnvProxy: resolveFetchUseTrustedEnvProxy(executionFetch),
+          ssrfPolicy: executionFetch?.ssrfPolicy,
+          ...(providerCacheKey ? { providerCacheKey } : {}),
+          lookupFn: options?.lookupFn,
+          signal,
+          resolveProviderFallback,
+        });
+        return jsonResult(result);
+      } finally {
+        clearProgressTimer();
+      }
     },
   };
 }
@@ -196,6 +196,207 @@ describe("web_fetch extraction fallbacks", () => {
|
||||
expect(details.wrappedLength).toBe(details.text?.length);
|
||||
});
|
||||
|
||||
it("emits typed public progress for slow fetches", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
installMockFetch(async (input: RequestInfo | URL) => {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 6000));
|
||||
return textResponse("Loaded page", resolveRequestUrl(input)) as Response;
|
||||
});
|
||||
const updates: unknown[] = [];
|
||||
const tool = createFetchTool({ firecrawl: { enabled: false } });
|
||||
const resultPromise = tool?.execute?.(
|
||||
"call",
|
||||
{ url: "https://example.com/" },
|
||||
undefined,
|
||||
(partialResult) => {
|
||||
updates.push(partialResult);
|
||||
},
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
|
||||
expect(updates).toEqual([
|
||||
{
|
||||
content: [],
|
||||
details: undefined,
|
||||
progress: {
|
||||
text: "Fetching page content...",
|
||||
visibility: "channel",
|
||||
privacy: "public",
|
||||
id: "web_fetch:fetching",
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
await resultPromise;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("cancels typed progress when fetches finish before the progress threshold", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
installPlainTextFetch("Loaded quickly");
|
||||
const updates: unknown[] = [];
|
||||
const tool = createFetchTool({ firecrawl: { enabled: false } });
|
||||
|
||||
await tool?.execute?.("call", { url: "https://example.com/" }, undefined, (partial) => {
|
||||
updates.push(partial);
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
|
||||
expect(updates).toHaveLength(0);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("cancels typed progress when fetches are aborted", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const providerExecute = vi.fn(async () => ({ text: "provider fallback" }));
|
||||
resolveWebFetchDefinitionMock.mockReturnValue({
|
||||
provider: { id: "firecrawl" },
|
||||
definition: {
|
||||
description: "firecrawl",
|
||||
parameters: {},
|
||||
execute: providerExecute,
|
||||
},
|
||||
});
|
||||
installMockFetch(async (_input: RequestInfo | URL, init?: RequestInit) => {
|
||||
return await new Promise<Response>((resolve, reject) => {
|
||||
init?.signal?.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
reject(new Error("aborted"));
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
setTimeout(() => {
|
||||
resolve(textResponse("Loaded page") as Response);
|
||||
}, 6000);
|
||||
});
|
||||
});
|
||||
const updates: unknown[] = [];
|
||||
const controller = new AbortController();
|
||||
const tool = createFetchTool({ firecrawl: { enabled: false } });
|
||||
const resultPromise = tool?.execute?.(
|
||||
"call",
|
||||
{ url: "https://example.com/" },
|
||||
controller.signal,
|
||||
(partial) => {
|
||||
updates.push(partial);
|
||||
},
|
||||
);
|
||||
const observedResultPromise = resultPromise?.catch((error: unknown) => error);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
controller.abort();
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
|
||||
const error = await observedResultPromise;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toBe("aborted");
|
||||
expect(updates).toHaveLength(0);
|
||||
expect(resolveWebFetchDefinitionMock).not.toHaveBeenCalled();
|
||||
expect(providerExecute).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("cancels typed progress when fetch body reads are aborted", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const providerExecute = vi.fn(async () => ({ text: "provider fallback" }));
|
||||
resolveWebFetchDefinitionMock.mockReturnValue({
|
||||
provider: { id: "firecrawl" },
|
||||
definition: {
|
||||
description: "firecrawl",
|
||||
parameters: {},
|
||||
execute: providerExecute,
|
||||
},
|
||||
});
|
||||
installMockFetch(async (_input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const body = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode("partial body"));
|
||||
const lateTimer = setTimeout(() => {
|
||||
controller.enqueue(new TextEncoder().encode("late body"));
|
||||
controller.close();
|
||||
}, 6000);
|
||||
init?.signal?.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
clearTimeout(lateTimer);
|
||||
controller.error(new Error("body aborted"));
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
},
|
||||
});
|
||||
return new Response(body, {
|
||||
status: 200,
|
||||
headers: { "content-type": "text/plain" },
|
||||
});
|
||||
});
|
||||
const updates: unknown[] = [];
|
||||
const controller = new AbortController();
|
||||
const tool = createFetchTool({ firecrawl: { enabled: false } });
|
||||
const resultPromise = tool?.execute?.(
|
||||
"call",
|
||||
{ url: "https://example.com/" },
|
||||
controller.signal,
|
||||
(partial) => {
|
||||
updates.push(partial);
|
||||
},
|
||||
);
|
||||
const observedResultPromise = resultPromise?.catch((error: unknown) => error);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
controller.abort(new Error("cancelled"));
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
|
||||
const error = await observedResultPromise;
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toBe("cancelled");
|
||||
expect(updates).toHaveLength(0);
|
||||
expect(providerExecute).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps fetch execution alive when progress subscribers throw", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
installMockFetch(async (input: RequestInfo | URL) => {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 6000));
|
||||
return textResponse("Loaded page", resolveRequestUrl(input)) as Response;
|
||||
});
|
||||
const tool = createFetchTool({ firecrawl: { enabled: false } });
|
||||
const onUpdate = vi.fn(() => {
|
||||
throw new Error("subscriber failed");
|
||||
});
|
||||
const resultPromise = tool?.execute?.(
|
||||
"call",
|
||||
{ url: "https://example.com/" },
|
||||
undefined,
|
||||
onUpdate,
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
await expect(resultPromise).resolves.toBeTruthy();
|
||||
expect(onUpdate).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("enforces maxChars after wrapping", async () => {
|
||||
const longText = "x".repeat(5_000);
|
||||
installMockFetch((input: RequestInfo | URL) =>
|
||||
|
||||