fix(providers): stream ordinary tool-like prose promptly

This commit is contained in:
Vincent Koc
2026-05-25 10:44:08 +02:00
parent 82bbcf60b0
commit 912fdfbedd
3 changed files with 341 additions and 7 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/media: preserve async-started media tool metadata so background generation starts no longer surface generic incomplete-turn warnings while replay stays unsafe. (#85933) Thanks @fuller-stack-dev.
- xAI/LM Studio: avoid buffering ordinary bracketed or `final` prose until stream completion while watching for plain-text tool-call fallbacks.
- Discord: suppress a bot's previous reply body and referenced media from prompt context when a user replies to that bot message, while keeping reply metadata for routing. (#86238) Thanks @fuller-stack-dev.
- Tests: avoid rebuilding the Control UI twice during the installer Docker smoke now that `pnpm build` includes `ui:build`.
- Install/update: bypass npm `min-release-age` policies with `--min-release-age=0` instead of `--before` so hosted installers keep working on npm versions that reject the combined config. (#84749) Thanks @TeodoroRodrigo.

View File

@@ -0,0 +1,163 @@
import type { StreamFn } from "@earendil-works/pi-agent-core";
import { createAssistantMessageEventStream } from "@earendil-works/pi-ai";
import { describe, expect, it } from "vitest";
import { createPlainTextToolCallPromotionWrapper } from "./provider-stream-runtime-internal.js";
type StreamEvent = { type: string } & Record<string, unknown>;
function requireRecord(value: unknown, label: string): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) {
throw new Error(`expected ${label} to be a record`);
}
return value as Record<string, unknown>;
}
function createControlledWrappedStream() {
const source = createAssistantMessageEventStream();
const baseStream: StreamFn = () => source as ReturnType<StreamFn>;
const wrapped = createPlainTextToolCallPromotionWrapper(baseStream);
const stream = wrapped(
{ provider: "test", api: "openai-completions", id: "test-model" } as never,
{
messages: [],
tools: [{ name: "read", description: "Read", parameters: { type: "object" } }],
} as never,
{},
);
return { source, stream };
}
async function resolveStream(stream: ReturnType<StreamFn>) {
return stream instanceof Promise ? await stream : stream;
}
async function nextEvent(
iterator: AsyncIterator<unknown>,
label: string,
): Promise<StreamEvent> {
const result = await Promise.race([
iterator.next(),
new Promise<"timed out">((resolve) => setTimeout(() => resolve("timed out"), 50)),
]);
if (result === "timed out") {
throw new Error(`timed out waiting for ${label}`);
}
expect(result.done).toBe(false);
return result.value as StreamEvent;
}
describe("createPlainTextToolCallPromotionWrapper", () => {
it("promotes standalone plain-text tool calls for result consumers", async () => {
const { source, stream } = createControlledWrappedStream();
const resultPromise = (await resolveStream(stream)).result();
const rawToolText = '[tool:read] {"path":"src/index.ts"}';
source.push({ type: "start", partial: { content: [] } } as never);
source.push({
type: "text_delta",
contentIndex: 0,
delta: rawToolText,
} as never);
source.push({
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: rawToolText }],
stopReason: "stop",
},
} as never);
source.end();
const message = requireRecord(await resultPromise, "result message");
expect(message.stopReason).toBe("toolUse");
expect(requireRecord((message.content as unknown[])[0], "tool call")).toMatchObject({
type: "toolCall",
name: "read",
arguments: { path: "src/index.ts" },
});
});
it("does not buffer ordinary bracketed text until done", async () => {
const { source, stream } = createControlledWrappedStream();
const iterator = (await resolveStream(stream))[Symbol.asyncIterator]();
try {
source.push({ type: "start", partial: { content: [] } } as never);
expect((await nextEvent(iterator, "start")).type).toBe("start");
source.push({
type: "text_start",
contentIndex: 0,
partial: { content: [{ type: "text", text: "" }] },
} as never);
source.push({
type: "text_delta",
contentIndex: 0,
delta: "[note] keep streaming",
} as never);
expect((await nextEvent(iterator, "ordinary bracketed text")).type).toBe("text_start");
} finally {
source.push({ type: "done", reason: "stop", message: {} } as never);
source.end();
await iterator.return?.();
}
});
it("keeps CR-separated bracketed tool calls buffered for promotion", async () => {
const { source, stream } = createControlledWrappedStream();
const iterator = (await resolveStream(stream))[Symbol.asyncIterator]();
try {
source.push({ type: "start", partial: { content: [] } } as never);
expect((await nextEvent(iterator, "start")).type).toBe("start");
source.push({
type: "text_delta",
contentIndex: 0,
delta: '[read]\r{"path":"src/index.ts"}\r[END_TOOL_REQUEST]',
} as never);
source.push({
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [
{ type: "text", text: '[read]\r{"path":"src/index.ts"}\r[END_TOOL_REQUEST]' },
],
stopReason: "stop",
},
} as never);
const event = await nextEvent(iterator, "promoted CR tool call");
expect(event.type).toBe("toolcall_start");
} finally {
source.end();
await iterator.return?.();
}
});
it("does not buffer normal final prose until done", async () => {
const { source, stream } = createControlledWrappedStream();
const iterator = (await resolveStream(stream))[Symbol.asyncIterator]();
try {
source.push({ type: "start", partial: { content: [] } } as never);
expect((await nextEvent(iterator, "start")).type).toBe("start");
source.push({
type: "text_delta",
contentIndex: 0,
delta: "final answer starts here",
} as never);
const event = await nextEvent(iterator, "normal final prose");
expect(event).toMatchObject({ type: "text_delta", delta: "final answer starts here" });
} finally {
source.push({ type: "done", reason: "stop", message: {} } as never);
source.end();
await iterator.return?.();
}
});
});

View File

@@ -21,21 +21,191 @@ function resolveContextToolNames(context: Parameters<StreamFn>[1]): Set<string>
return new Set(names);
}
function couldStillBePlainTextToolCall(text: string): boolean {
function couldStillBePlainTextToolCall(text: string, toolNames: Set<string>): boolean {
if (text.length > 256_000) {
return false;
}
const trimmed = text.trimStart();
return (
trimmed.length === 0 ||
trimmed.startsWith("[") ||
trimmed.startsWith("<|channel|>") ||
trimmed.startsWith("commentary") ||
trimmed.startsWith("analysis") ||
trimmed.startsWith("final")
couldStillBeBracketedToolCall(trimmed, toolNames) ||
couldStillBeHarmonyToolCall(trimmed, toolNames)
);
}
function matchesLiteralPrefix(text: string, literal: string): boolean {
return literal.startsWith(text) || text.startsWith(literal);
}
function skipHorizontalWhitespace(text: string, start: number): number {
let cursor = start;
while (text[cursor] === " " || text[cursor] === "\t") {
cursor += 1;
}
return cursor;
}
function isToolNameChar(char: string | undefined): boolean {
return Boolean(char && /[A-Za-z0-9_-]/.test(char));
}
function hasToolNamePrefix(toolNames: Set<string>, prefix: string): boolean {
for (const toolName of toolNames) {
if (toolName.startsWith(prefix)) {
return true;
}
}
return false;
}
function couldStillBeJsonPayload(text: string, start: number): boolean {
let cursor = start;
while (cursor < text.length && /\s/.test(text[cursor] ?? "")) {
cursor += 1;
}
return cursor >= text.length || text[cursor] === "{";
}
function couldStillBeBracketedToolCall(text: string, toolNames: Set<string>): boolean {
if (!text.startsWith("[")) {
return false;
}
const toolPrefix = "[tool:";
if (matchesLiteralPrefix(text, toolPrefix)) {
if (text.length <= toolPrefix.length) {
return true;
}
let cursor = toolPrefix.length;
while (isToolNameChar(text[cursor])) {
cursor += 1;
}
const name = text.slice(toolPrefix.length, cursor);
if (!name || !hasToolNamePrefix(toolNames, name)) {
return false;
}
if (cursor >= text.length) {
return true;
}
if (text[cursor] !== "]") {
return false;
}
return couldStillBeJsonPayload(text, cursor + 1);
}
let cursor = 1;
while (isToolNameChar(text[cursor])) {
cursor += 1;
}
const name = text.slice(1, cursor);
if (!name || !hasToolNamePrefix(toolNames, name)) {
return false;
}
if (cursor >= text.length) {
return true;
}
if (text[cursor] !== "]") {
return false;
}
cursor = skipHorizontalWhitespace(text, cursor + 1);
if (cursor >= text.length) {
return true;
}
if (text[cursor] === "\r") {
if (cursor + 1 >= text.length) {
return true;
}
return couldStillBeJsonPayload(text, text[cursor + 1] === "\n" ? cursor + 2 : cursor + 1);
}
if (text[cursor] !== "\n") {
return false;
}
return couldStillBeJsonPayload(text, cursor + 1);
}
function couldStillBeHarmonyToolCall(text: string, toolNames: Set<string>): boolean {
const channelMarker = "<|channel|>";
let cursor = 0;
if (matchesLiteralPrefix(text, channelMarker)) {
if (text.length <= channelMarker.length) {
return true;
}
cursor = channelMarker.length;
}
const rest = text.slice(cursor);
const channel = ["commentary", "analysis", "final"].find((candidate) =>
matchesLiteralPrefix(rest, candidate),
);
if (!channel) {
return false;
}
if (rest.length <= channel.length) {
return true;
}
cursor += channel.length;
cursor = skipHorizontalWhitespace(text, cursor);
if (cursor >= text.length) {
return true;
}
const toMarker = "to=";
const toRest = text.slice(cursor);
if (!matchesLiteralPrefix(toRest, toMarker)) {
return false;
}
if (toRest.length <= toMarker.length) {
return true;
}
cursor += toMarker.length;
const nameStart = cursor;
while (isToolNameChar(text[cursor])) {
cursor += 1;
}
const name = text.slice(nameStart, cursor);
if (!name || !hasToolNamePrefix(toolNames, name)) {
return false;
}
if (cursor >= text.length) {
return true;
}
cursor = skipHorizontalWhitespace(text, cursor);
if (cursor >= text.length) {
return true;
}
if (!toolNames.has(name)) {
return false;
}
const codeMarker = "code";
const codeRest = text.slice(cursor);
if (!matchesLiteralPrefix(codeRest, codeMarker)) {
return false;
}
if (codeRest.length <= codeMarker.length) {
return true;
}
cursor += codeMarker.length;
while (cursor < text.length && /\s/.test(text[cursor] ?? "")) {
cursor += 1;
}
if (cursor >= text.length) {
return true;
}
const messageMarker = "<|message|>";
const messageRest = text.slice(cursor);
if (matchesLiteralPrefix(messageRest, messageMarker)) {
return true;
}
return text[cursor] === "{";
}
function createSyntheticToolCallId(): string {
return `call_${randomUUID().replace(/-/g, "").slice(0, 24)}`;
}
@@ -179,7 +349,7 @@ function wrapPlainTextToolCallStream(
} else if (typeof record?.content === "string" && !bufferedText) {
bufferedText = record.content;
}
if (!couldStillBePlainTextToolCall(bufferedText)) {
if (!couldStillBePlainTextToolCall(bufferedText, toolNames)) {
flushBufferedTextEvents();
}
continue;