fix(agents): avoid full stream replay on text deltas (#88252)

Prevent streaming assistant text updates from reparsing the full accumulated reply for plain deltas, avoiding repeated work for small-model streams while preserving full cleanup for directives, media, and final events.

Also load the normal Control UI Vite config in the mock browser server so browser E2E uses the same workspace aliases as dev.

Thanks @vincentkoc.
This commit is contained in:
Vincent Koc
2026-05-31 17:59:45 +01:00
committed by GitHub
parent 723d09ff85
commit 0f6be951e0
3 changed files with 153 additions and 4 deletions

View File

@@ -291,7 +291,7 @@ const server = await createServer({
base: "/",
cacheDir: path.join(repoRoot, ".artifacts", "control-ui-mock-vite"),
clearScreen: false,
configFile: false,
configFile: path.join(uiRoot, "vite.config.ts"),
define: {
OPENCLAW_CONTROL_UI_BUILD_ID: JSON.stringify("mock"),
},

View File

@@ -33,6 +33,7 @@ function createMessageUpdateContext(
state?: Record<string, unknown>;
} = {},
) {
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
return {
params: {
runId: "run-1",
@@ -66,7 +67,11 @@ function createMessageUpdateContext(
log: { debug: params.debug ?? vi.fn() },
noteLastAssistant: vi.fn(),
stripBlockTags: params.stripBlockTags ?? vi.fn((text: string) => text),
consumePartialReplyDirectives: params.consumePartialReplyDirectives ?? vi.fn(() => null),
consumePartialReplyDirectives:
params.consumePartialReplyDirectives ??
vi.fn((text: string, options?: { final?: boolean }) =>
partialReplyDirectiveAccumulator.consume(text, options),
),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
@@ -332,6 +337,85 @@ describe("handleMessageUpdate text signatures", () => {
]);
});
it("holds incomplete streaming directive tails without emitting them as text", () => {
  // Streams "Hello\n" followed by a lone "M" and checks that only "Hello" is
  // emitted. NOTE(review): the "M" is presumably withheld because it could be
  // the start of a `MEDIA:` directive — confirm against the accumulator.
  const directiveAccumulator = createStreamingDirectiveAccumulator();
  const consumeDirectives = vi.fn((text: string, options?: { final?: boolean }) =>
    directiveAccumulator.consume(text, options),
  );
  const onAgentEvent = vi.fn();
  const context = createMessageUpdateContext({
    onAgentEvent,
    consumePartialReplyDirectives: consumeDirectives,
  });

  // Builds a text_delta update whose message carries no content and no phase.
  const textDeltaUpdate = (delta: string) =>
    ({
      type: "message_update",
      message: { role: "assistant", content: [] },
      assistantMessageEvent: {
        type: "text_delta",
        delta,
      },
    }) as never;

  for (const chunk of ["Hello\n", "M"]) {
    handleMessageUpdate(context, textDeltaUpdate(chunk));
  }

  // Exactly one assistant event: the second chunk must not produce output.
  expect(onAgentEvent).toHaveBeenCalledTimes(1);
  const firstEvent = firstMockArg(onAgentEvent, "agent event");
  expect(firstEvent).toMatchObject({
    stream: "assistant",
    data: { text: "Hello", delta: "Hello" },
  });
  expect(context.state.lastStreamedAssistantCleaned).toBe("Hello");
});
it("keeps stripped reply directives out of later plain deltas", () => {
  // The first delta starts with a `[[reply_to_current]]` directive; the second
  // is plain text. Neither emitted event may contain the directive.
  const onAgentEvent = vi.fn();
  const context = createMessageUpdateContext({ onAgentEvent });

  // Builds a text_delta update whose message carries no content and no phase.
  const textDeltaUpdate = (delta: string) =>
    ({
      type: "message_update",
      message: { role: "assistant", content: [] },
      assistantMessageEvent: {
        type: "text_delta",
        delta,
      },
    }) as never;

  const deltas = ["[[reply_to_current]]\nHello", " world"];
  for (const delta of deltas) {
    handleMessageUpdate(context, textDeltaUpdate(delta));
  }

  // Compare the first argument of every recorded call, in call order.
  const emittedEvents = onAgentEvent.mock.calls.map((callArgs) => callArgs[0]);
  expect(emittedEvents).toMatchObject([
    {
      stream: "assistant",
      data: { text: "Hello", delta: "Hello" },
    },
    {
      stream: "assistant",
      data: { text: "Hello world", delta: " world" },
    },
  ]);
});
it("does not expose complete legacy media directives on plain deltas", () => {
  // A single delta holds visible text followed by a full `MEDIA:` line; only
  // the visible text may appear in the emitted assistant event.
  const onAgentEvent = vi.fn();
  const context = createMessageUpdateContext({ onAgentEvent });

  const deltaWithMediaLine = {
    type: "message_update",
    message: { role: "assistant", content: [] },
    assistantMessageEvent: {
      type: "text_delta",
      delta: "Here it is.\nMEDIA:/tmp/final.png\n",
    },
  } as never;
  handleMessageUpdate(context, deltaWithMediaLine);

  const firstEvent = firstMockArg(onAgentEvent, "agent event");
  expect(firstEvent).toMatchObject({
    stream: "assistant",
    data: { text: "Here it is.", delta: "Here it is." },
  });
});
it("uses full partial text for suffix deltas after a suppressed commentary item", () => {
const onAgentEvent = vi.fn();
const context = createMessageUpdateContext({ onAgentEvent });

View File

@@ -384,6 +384,63 @@ function mergeReplyDirectiveResults(
};
}
/**
 * Cleans the complete accumulated reply text in one pass.
 *
 * Takes the `text` part returned by `splitTrailingDirective`, runs it through
 * `parseReplyDirectives`, and returns the `text` of that parse result.
 *
 * @param text Full reply text to clean.
 * @returns The `text` field produced by `parseReplyDirectives`.
 */
function parseFullStreamingReplyText(text: string): string {
  const { text: withoutTrailingDirective } = splitTrailingDirective(text);
  const parsed = parseReplyDirectives(withoutTrailingDirective);
  return parsed.text;
}
/**
 * Reports whether `text` contains a `MEDIA:` directive with a value.
 *
 * A match must begin at the start of the text or right after a newline, allow
 * optional whitespace, then `MEDIA:` (case-insensitive), optional whitespace,
 * at least one non-whitespace character, and finally end at a newline or at
 * the end of the text. The whitespace runs use `\s`, so they may span line
 * breaks.
 *
 * @param text Text to inspect.
 * @returns `true` when such a directive is present, otherwise `false`.
 */
function containsCompleteMediaDirectiveLine(text: string): boolean {
  const completeMediaLine = /(?:^|\n)\s*MEDIA:\s*\S[^\n]*(?:\n|$)/i;
  return text.search(completeMediaLine) !== -1;
}
/**
 * Tries to produce the cleaned streaming text without reparsing the whole
 * accumulated reply.
 *
 * Returns `undefined` (the caller must fall back to another strategy) when any
 * of the following holds:
 * - the event is `text_end`;
 * - there are no parsed stream directives, or they are marked silent;
 * - `hasReplyDirectiveMetadata` reports metadata on the parsed directives;
 * - the visible delta contains a complete `MEDIA:` directive line;
 * - the parsed directive text differs from the visible delta;
 * - the text rebuilt from the previous cleaned text plus the parsed delta
 *   does not equal `next`.
 *
 * @param params.evtType Kind of assistant text event being handled.
 * @param params.next Candidate text for the current update.
 * @param params.previousRawText Raw text recorded for the previous update.
 * @param params.previousCleaned Cleaned text recorded for the previous update.
 * @param params.visibleDelta Visible text delta of the current update.
 * @param params.parsedStreamDirectives Parse result for the current delta, if any.
 * @param params.shouldUsePhaseAwareBlockReply When true, the `next` shortcut is skipped.
 * @returns The cleaned text, or `undefined` when the incremental path does not apply.
 */
function resolveIncrementalStreamingReplyText(params: {
  evtType: "text_delta" | "text_start" | "text_end";
  next: string;
  previousRawText: string;
  previousCleaned: string;
  visibleDelta: string;
  parsedStreamDirectives: ReplyDirectiveParseResult | null;
  shouldUsePhaseAwareBlockReply: boolean;
}): string | undefined {
  const parsed = params.parsedStreamDirectives;

  // Guards are checked in the same order as a single short-circuit chain.
  if (params.evtType === "text_end" || !parsed) {
    return undefined;
  }
  if (parsed.isSilent || hasReplyDirectiveMetadata(parsed)) {
    return undefined;
  }
  if (containsCompleteMediaDirectiveLine(params.visibleDelta)) {
    return undefined;
  }
  if (parsed.text !== params.visibleDelta) {
    return undefined;
  }

  // Shortcut: the previous cleaned text equals the trimmed previous raw text,
  // so `next` is returned as-is.
  if (!params.shouldUsePhaseAwareBlockReply) {
    if (params.previousCleaned === params.previousRawText.trim()) {
      return params.next;
    }
  }

  // Otherwise append the delta to the previous cleaned text and accept the
  // result only if it reproduces `next` exactly.
  const rebuilt = `${params.previousCleaned}${parsed.text}`.trim();
  if (rebuilt === params.next) {
    return rebuilt;
  }
  return undefined;
}
/**
 * Chooses the cleaned text for a streaming assistant text event.
 *
 * - With parsed stream directives: uses the incremental result from
 *   `resolveIncrementalStreamingReplyText`, and falls back to a full parse of
 *   `next` when that result is `null` or `undefined`.
 * - Without parsed stream directives: a `text_delta` keeps the previous
 *   cleaned text unchanged; any other event type triggers a full parse of
 *   `next`.
 *
 * @param params Same fields as accepted by `resolveIncrementalStreamingReplyText`.
 * @returns The cleaned text for this update.
 */
function resolveStreamingReplyText(params: {
  evtType: "text_delta" | "text_start" | "text_end";
  next: string;
  previousRawText: string;
  previousCleaned: string;
  visibleDelta: string;
  parsedStreamDirectives: ReplyDirectiveParseResult | null;
  shouldUsePhaseAwareBlockReply: boolean;
}): string {
  if (params.parsedStreamDirectives) {
    const incremental = resolveIncrementalStreamingReplyText(params);
    return incremental ?? parseFullStreamingReplyText(params.next);
  }
  if (params.evtType === "text_delta") {
    return params.previousCleaned;
  }
  return parseFullStreamingReplyText(params.next);
}
export function recordPendingAssistantReplyDirectives(
state: Pick<EmbeddedAgentSubscribeState, "pendingAssistantReplyDirectives">,
parsed: ReplyDirectiveParseResult | null | undefined,
@@ -671,10 +728,18 @@ export function handleMessageUpdate(
if (shouldUsePhaseAwareBlockReply) {
recordPendingAssistantReplyDirectives(ctx.state, parsedStreamDirectives);
}
const cleanedText = parseReplyDirectives(splitTrailingDirective(next).text).text;
const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
const cleanedText = resolveStreamingReplyText({
evtType,
next,
previousRawText: ctx.state.lastStreamedAssistant ?? "",
previousCleaned,
visibleDelta,
parsedStreamDirectives,
shouldUsePhaseAwareBlockReply,
});
const { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedStreamDirectives ?? {});
const hasAudio = Boolean(parsedStreamDirectives?.audioAsVoice);
const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
let shouldEmit = false;
let deltaText = "";