mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
refactor(channels): share progress draft compositor
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
f3e0379cbe0e584a8c9658253d4a808356fe80fb5ec775bbee9e968e8d815380 plugin-sdk-api-baseline.json
|
||||
601b55acafbd1e00b850c9b0c15d587029050906960071d448d37538b223e226 plugin-sdk-api-baseline.jsonl
|
||||
a9501e226bb26befb02072cf5e60c3dc124cbd5dc0b16eb281789d0843f72f71 plugin-sdk-api-baseline.json
|
||||
b106090dc12bf7e46beac4ed160f0cff0ef8039291f24172b693e8d8b752d571 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
106
src/channels/progress-draft-compositor.test.ts
Normal file
106
src/channels/progress-draft-compositor.test.ts
Normal file
@@ -0,0 +1,106 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createChannelProgressDraftCompositor } from "./progress-draft-compositor.js";
|
||||
|
||||
describe("createChannelProgressDraftCompositor", () => {
|
||||
it("keeps the progress label visible when tool lines are hidden", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: {
|
||||
streaming: { mode: "progress", progress: { label: "Shelling", toolProgress: false } },
|
||||
},
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
await progress.pushToolProgress("🛠️ Exec", { startImmediately: true });
|
||||
|
||||
expect(update).toHaveBeenCalledWith("Shelling", { flush: true });
|
||||
});
|
||||
|
||||
it("keeps reasoning details hidden when tool progress lines are hidden", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: {
|
||||
streaming: { mode: "progress", progress: { label: "Shelling", toolProgress: false } },
|
||||
},
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
await progress.pushToolProgress("🛠️ Exec", { startImmediately: true });
|
||||
await progress.pushReasoningProgress("Reading files");
|
||||
|
||||
expect(update).toHaveBeenCalledWith("Shelling", { flush: true });
|
||||
expect(update).not.toHaveBeenCalledWith(expect.stringContaining("Reading"), undefined);
|
||||
});
|
||||
|
||||
it("does not resurrect progress after suppression", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
progress.suppress();
|
||||
await progress.pushReasoningProgress("Reading files");
|
||||
|
||||
expect(update).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("composes reasoning deltas with tool progress", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
await progress.pushToolProgress("🛠️ Exec", { startImmediately: true });
|
||||
await progress.pushReasoningProgress("Reading");
|
||||
await progress.pushReasoningProgress(" files");
|
||||
|
||||
expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Reading files_", undefined);
|
||||
});
|
||||
|
||||
it("preserves tagged reasoning content without leaking tags", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
await progress.pushToolProgress("🛠️ Exec", { startImmediately: true });
|
||||
await progress.pushReasoningProgress("<think>Checking files</think>");
|
||||
|
||||
expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Checking files_", undefined);
|
||||
});
|
||||
|
||||
it("replaces repeated formatted reasoning snapshots", async () => {
|
||||
const update = vi.fn();
|
||||
const progress = createChannelProgressDraftCompositor({
|
||||
entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
|
||||
mode: "progress",
|
||||
active: true,
|
||||
seed: "test",
|
||||
update,
|
||||
});
|
||||
|
||||
await progress.pushToolProgress("🛠️ Exec", { startImmediately: true });
|
||||
await progress.pushReasoningProgress("Thinking\n\n_Reading_");
|
||||
await progress.pushReasoningProgress("Thinking\n\n_Reading files_");
|
||||
|
||||
expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Reading files_", undefined);
|
||||
});
|
||||
});
|
||||
387
src/channels/progress-draft-compositor.ts
Normal file
387
src/channels/progress-draft-compositor.ts
Normal file
@@ -0,0 +1,387 @@
|
||||
import { formatReasoningMessage } from "../agents/embedded-agent-utils.js";
|
||||
import { stripInlineDirectiveTagsForDelivery } from "../utils/directive-tags.js";
|
||||
import {
|
||||
createChannelProgressDraftGate,
|
||||
type ChannelProgressDraftLine,
|
||||
formatChannelProgressDraftText,
|
||||
isChannelProgressDraftWorkToolName,
|
||||
mergeChannelProgressDraftLine,
|
||||
normalizeChannelProgressDraftLineIdentity,
|
||||
resolveChannelProgressDraftMaxLineChars,
|
||||
resolveChannelProgressDraftMaxLines,
|
||||
resolveChannelStreamingProgressCommentary,
|
||||
resolveChannelStreamingPreviewToolProgress,
|
||||
resolveChannelStreamingSuppressDefaultToolProgressMessages,
|
||||
type StreamingCompatEntry,
|
||||
type StreamingMode,
|
||||
} from "./streaming.js";
|
||||
|
||||
export type ChannelProgressDraftMode = StreamingMode;
|
||||
|
||||
export type ChannelProgressDraftCompositor = ReturnType<
|
||||
typeof createChannelProgressDraftCompositor
|
||||
>;
|
||||
type ProgressDraftLine = string | ChannelProgressDraftLine;
|
||||
|
||||
export function createChannelProgressDraftCompositor(params: {
|
||||
entry: StreamingCompatEntry | null | undefined;
|
||||
mode: ChannelProgressDraftMode;
|
||||
active: boolean;
|
||||
seed: string;
|
||||
update: (text: string, options?: { flush?: boolean }) => Promise<void> | void;
|
||||
deleteCurrent?: () => Promise<void> | void;
|
||||
tryNativeUpdate?: (text: string) => Promise<boolean> | boolean;
|
||||
formatLine?: (line: string) => string;
|
||||
isEmptyLine?: (line: ProgressDraftLine | undefined) => boolean;
|
||||
shouldStartNow?: (line: ProgressDraftLine | undefined) => boolean;
|
||||
}) {
|
||||
const previewToolProgressEnabled =
|
||||
params.active && resolveChannelStreamingPreviewToolProgress(params.entry);
|
||||
const commentaryProgressEnabled =
|
||||
params.active && resolveChannelStreamingProgressCommentary(params.entry);
|
||||
const suppressDefaultToolProgressMessages =
|
||||
params.active &&
|
||||
resolveChannelStreamingSuppressDefaultToolProgressMessages(params.entry, {
|
||||
draftStreamActive: true,
|
||||
previewToolProgressEnabled,
|
||||
});
|
||||
let progressSuppressed = false;
|
||||
let lines: ProgressDraftLine[] = [];
|
||||
let lastRenderedText = "";
|
||||
let reasoningRawText = "";
|
||||
let lastReasoningLine: string | undefined;
|
||||
let finalReplyStarted = false;
|
||||
let finalReplyDelivered = false;
|
||||
|
||||
const formatDraftText = (draftLines = lines, options?: { formatted?: boolean }) =>
|
||||
formatChannelProgressDraftText({
|
||||
entry: params.entry,
|
||||
lines: draftLines,
|
||||
seed: params.seed,
|
||||
formatLine: options?.formatted === false ? undefined : params.formatLine,
|
||||
});
|
||||
|
||||
const clearProgressState = (suppressed: boolean) => {
|
||||
progressSuppressed = suppressed;
|
||||
lines = [];
|
||||
lastRenderedText = "";
|
||||
reasoningRawText = "";
|
||||
lastReasoningLine = undefined;
|
||||
};
|
||||
|
||||
const render = async (options?: { flush?: boolean }): Promise<boolean> => {
|
||||
if (!params.active || params.mode !== "progress") {
|
||||
return false;
|
||||
}
|
||||
const text = formatDraftText();
|
||||
if (!text || text === lastRenderedText) {
|
||||
return false;
|
||||
}
|
||||
lastRenderedText = text;
|
||||
await params.update(text, options);
|
||||
return true;
|
||||
};
|
||||
|
||||
const gate = createChannelProgressDraftGate({
|
||||
onStart: async () => {
|
||||
await render({ flush: true });
|
||||
},
|
||||
});
|
||||
|
||||
const clearLine = async (lineId: string) => {
|
||||
const nextLines = lines.filter(
|
||||
(line) => typeof line !== "object" || line.id?.trim() !== lineId,
|
||||
);
|
||||
if (nextLines.length === lines.length) {
|
||||
return;
|
||||
}
|
||||
lines = nextLines;
|
||||
if (!gate.hasStarted) {
|
||||
return;
|
||||
}
|
||||
const text = formatDraftText();
|
||||
if (text) {
|
||||
await render();
|
||||
return;
|
||||
}
|
||||
lastRenderedText = "";
|
||||
await params.deleteCurrent?.();
|
||||
};
|
||||
|
||||
const noteProgress = async (
|
||||
line?: ProgressDraftLine,
|
||||
options?: { toolName?: string; startImmediately?: boolean },
|
||||
) => {
|
||||
if (!params.active || finalReplyStarted || finalReplyDelivered) {
|
||||
return false;
|
||||
}
|
||||
if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) {
|
||||
return false;
|
||||
}
|
||||
if (params.isEmptyLine?.(line)) {
|
||||
return false;
|
||||
}
|
||||
const normalized = normalizeChannelProgressDraftLineIdentity(line);
|
||||
if (!normalized || progressSuppressed) {
|
||||
return false;
|
||||
}
|
||||
if (params.mode !== "progress" && !previewToolProgressEnabled) {
|
||||
return false;
|
||||
}
|
||||
const progressLine = typeof line === "object" && line !== undefined ? line : normalized;
|
||||
const shouldStoreLine = previewToolProgressEnabled;
|
||||
const nextLines = shouldStoreLine
|
||||
? mergeChannelProgressDraftLine(lines, progressLine, {
|
||||
maxLines: resolveChannelProgressDraftMaxLines(params.entry),
|
||||
})
|
||||
: lines;
|
||||
if (shouldStoreLine && nextLines === lines) {
|
||||
return false;
|
||||
}
|
||||
if (shouldStoreLine && params.tryNativeUpdate) {
|
||||
const text = formatDraftText(nextLines, { formatted: false });
|
||||
if (text && (await params.tryNativeUpdate(text))) {
|
||||
lines = nextLines;
|
||||
lastRenderedText = text;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
lines = nextLines;
|
||||
if (params.mode !== "progress") {
|
||||
if (!shouldStoreLine) {
|
||||
return false;
|
||||
}
|
||||
const text = formatDraftText();
|
||||
if (!text || text === lastRenderedText) {
|
||||
return false;
|
||||
}
|
||||
lastRenderedText = text;
|
||||
await params.update(text);
|
||||
return true;
|
||||
}
|
||||
if (options?.startImmediately || params.shouldStartNow?.(line)) {
|
||||
await gate.startNow();
|
||||
return gate.hasStarted ? await render() : false;
|
||||
}
|
||||
const alreadyStarted = gate.hasStarted;
|
||||
const progressActive = await gate.noteWork();
|
||||
if ((alreadyStarted || progressActive) && gate.hasStarted) {
|
||||
return await render();
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
return {
|
||||
get previewToolProgressEnabled() {
|
||||
return previewToolProgressEnabled;
|
||||
},
|
||||
get commentaryProgressEnabled() {
|
||||
return commentaryProgressEnabled;
|
||||
},
|
||||
get suppressDefaultToolProgressMessages() {
|
||||
return suppressDefaultToolProgressMessages;
|
||||
},
|
||||
get hasStarted() {
|
||||
return gate.hasStarted;
|
||||
},
|
||||
markFinalReplyStarted() {
|
||||
finalReplyStarted = true;
|
||||
},
|
||||
markFinalReplyDelivered() {
|
||||
finalReplyDelivered = true;
|
||||
},
|
||||
reset() {
|
||||
clearProgressState(false);
|
||||
},
|
||||
suppress() {
|
||||
clearProgressState(true);
|
||||
},
|
||||
cancel() {
|
||||
gate.cancel();
|
||||
},
|
||||
start() {
|
||||
return gate.startNow();
|
||||
},
|
||||
pushToolProgress: noteProgress,
|
||||
async pushReasoningProgress(text?: string, options?: { snapshot?: boolean }) {
|
||||
if (
|
||||
!params.active ||
|
||||
params.mode !== "progress" ||
|
||||
!text ||
|
||||
progressSuppressed ||
|
||||
finalReplyDelivered
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
reasoningRawText = mergeReasoningProgressText(reasoningRawText, text, {
|
||||
snapshot: options?.snapshot === true,
|
||||
});
|
||||
const normalized = normalizeReasoningProgressLine(reasoningRawText);
|
||||
if (!normalized) {
|
||||
return false;
|
||||
}
|
||||
const displayLine = formatReasoningProgressDisplayLine(
|
||||
normalized,
|
||||
resolveChannelProgressDraftMaxLineChars(params.entry),
|
||||
);
|
||||
if (!displayLine) {
|
||||
return false;
|
||||
}
|
||||
if (previewToolProgressEnabled) {
|
||||
const priorIndex =
|
||||
lastReasoningLine === undefined ? -1 : lines.lastIndexOf(lastReasoningLine);
|
||||
if (priorIndex >= 0) {
|
||||
lines = [...lines];
|
||||
lines[priorIndex] = displayLine;
|
||||
} else {
|
||||
lines = [...lines, displayLine].slice(-resolveChannelProgressDraftMaxLines(params.entry));
|
||||
}
|
||||
lastReasoningLine = displayLine;
|
||||
}
|
||||
const progressActive = await gate.noteWork();
|
||||
if (progressActive && gate.hasStarted) {
|
||||
return await render();
|
||||
}
|
||||
return false;
|
||||
},
|
||||
async pushCommentaryProgress(text?: string, options?: { itemId?: string }) {
|
||||
if (!params.active || params.mode !== "progress" || !commentaryProgressEnabled) {
|
||||
return false;
|
||||
}
|
||||
if (finalReplyStarted || finalReplyDelivered) {
|
||||
return false;
|
||||
}
|
||||
const itemId = options?.itemId?.trim();
|
||||
if (!text && !itemId) {
|
||||
return false;
|
||||
}
|
||||
const normalized = normalizeCommentaryProgressText(text ?? "");
|
||||
const lineId = itemId ? `commentary:${itemId}` : normalized ? `commentary:${normalized}` : "";
|
||||
if (!normalized) {
|
||||
if (lineId) {
|
||||
await clearLine(lineId);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
const line: ChannelProgressDraftLine = {
|
||||
id: lineId,
|
||||
kind: "item",
|
||||
text: normalized,
|
||||
label: "Commentary",
|
||||
prefix: false,
|
||||
};
|
||||
lines = mergeChannelProgressDraftLine(lines, line, {
|
||||
maxLines: resolveChannelProgressDraftMaxLines(params.entry),
|
||||
});
|
||||
await gate.startNow();
|
||||
return await render();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeReasoningProgressLine(text: string): string {
|
||||
return stripReasoningProgressTags(text)
|
||||
.replace(
|
||||
/^\s*(?:>\s*)?(?:Reasoning:\s*(?:\r?\n|\r)\s*|Thinking\.{0,3}\s*(?:\r?\n|\r)\s*(?:\r?\n|\r)\s*)/i,
|
||||
"",
|
||||
)
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
}
|
||||
|
||||
function stripReasoningProgressTags(text: string): string {
|
||||
return text.replace(
|
||||
/<\s*\/?\s*(?:(?:antml:)?(?:think(?:ing)?|thought)|antthinking)\b[^<>]*>/giu,
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeReasoningProgressInput(text: string): string {
|
||||
const normalized = normalizeReasoningProgressLine(text);
|
||||
const italic = normalized.match(/^_(.*)_$/u);
|
||||
return (italic?.[1] ?? normalized).trim();
|
||||
}
|
||||
|
||||
function formatReasoningProgressDisplayLine(text: string, maxChars: number): string {
|
||||
const normalizedText = normalizeReasoningProgressInput(text);
|
||||
const formatted = normalizeReasoningProgressLine(formatReasoningMessage(normalizedText));
|
||||
if (!formatted) {
|
||||
return "";
|
||||
}
|
||||
if (Array.from(formatted).length <= maxChars) {
|
||||
return formatted;
|
||||
}
|
||||
const italic = formatted.match(/^_(.*)_$/u);
|
||||
if (!italic) {
|
||||
return compactReasoningProgressDisplayLine(formatted, maxChars);
|
||||
}
|
||||
const body = compactReasoningProgressDisplayLine(italic[1] ?? "", Math.max(1, maxChars - 2));
|
||||
return body ? `_${body}_` : "";
|
||||
}
|
||||
|
||||
function compactReasoningProgressDisplayLine(text: string, maxChars: number): string {
|
||||
const normalized = text.replace(/\s+/g, " ").trim();
|
||||
const chars = Array.from(normalized);
|
||||
if (chars.length <= maxChars) {
|
||||
return normalized;
|
||||
}
|
||||
if (maxChars <= 1) {
|
||||
return "…";
|
||||
}
|
||||
const head = chars
|
||||
.slice(0, maxChars - 1)
|
||||
.join("")
|
||||
.trimEnd();
|
||||
const boundary = head.search(/\s+\S*$/u);
|
||||
if (boundary > Math.floor(maxChars * 0.6)) {
|
||||
return `${head.slice(0, boundary).trimEnd()}…`;
|
||||
}
|
||||
return `${head}…`;
|
||||
}
|
||||
|
||||
function normalizeCommentaryProgressText(text: string): string {
|
||||
const cleaned = stripInlineDirectiveTagsForDelivery(text).text.trim();
|
||||
if (!cleaned || isSilentCommentaryProgressText(cleaned)) {
|
||||
return "";
|
||||
}
|
||||
return cleaned
|
||||
.split(/\r?\n/u)
|
||||
.map((line) => line.replace(/\s+/g, " ").trim())
|
||||
.filter(Boolean)
|
||||
.map((line) => `_${line}_`)
|
||||
.join("\n");
|
||||
}
|
||||
|
||||
function isSilentCommentaryProgressText(text: string): boolean {
|
||||
const normalized = text.replace(/^[\s*_`~]+|[\s*_`~]+$/gu, "").trim();
|
||||
return /^NO_REPLY$/iu.test(normalized);
|
||||
}
|
||||
|
||||
function mergeReasoningProgressText(
|
||||
current: string,
|
||||
incoming: string,
|
||||
options?: { snapshot?: boolean },
|
||||
): string {
|
||||
if (!current) {
|
||||
return incoming;
|
||||
}
|
||||
const normalizedCurrent = normalizeReasoningProgressInput(current);
|
||||
const normalizedIncoming = normalizeReasoningProgressInput(incoming);
|
||||
if (!normalizedIncoming || normalizedIncoming === normalizedCurrent) {
|
||||
return current;
|
||||
}
|
||||
if (
|
||||
options?.snapshot === true ||
|
||||
isReasoningSnapshotText(incoming) ||
|
||||
normalizedIncoming.startsWith(normalizedCurrent)
|
||||
) {
|
||||
return incoming;
|
||||
}
|
||||
return `${current}${incoming}`;
|
||||
}
|
||||
|
||||
function isReasoningSnapshotText(text: string): boolean {
|
||||
return /^\s*(?:>\s*)?(?:Reasoning:\s*(?:\r?\n|\r)\s*|Thinking\.{0,3}\s*(?:\r?\n|\r)\s*(?:\r?\n|\r)\s*)/i.test(
|
||||
text,
|
||||
);
|
||||
}
|
||||
@@ -26,7 +26,7 @@ export type {
|
||||
} from "../config/types.base.js";
|
||||
export type { SlackChannelStreamingConfig } from "../config/types.slack.js";
|
||||
|
||||
type StreamingCompatEntry = {
|
||||
export type StreamingCompatEntry = {
|
||||
streaming?: unknown;
|
||||
streamMode?: unknown;
|
||||
chunkMode?: unknown;
|
||||
|
||||
@@ -75,6 +75,7 @@ export type { OutboundSendDeps } from "../infra/outbound/send-deps.js";
|
||||
export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js";
|
||||
export { logAckFailure, logTypingFailure } from "../channels/logging.js";
|
||||
export * from "../channels/streaming.js";
|
||||
export * from "../channels/progress-draft-compositor.js";
|
||||
export {
|
||||
classifyDurableSendRecoveryState,
|
||||
createChannelMessageAdapterFromOutbound,
|
||||
|
||||
Reference in New Issue
Block a user