fix(ui): preserve visible chat stream text

Fix WebChat stream/history reconciliation so visible assistant text survives stale history reloads, tool-history catch-up, and terminal final/error/abort events.\n\nRefactors the UI path into stream reconciliation, stream text, and typed tool-message helpers so persisted history and live stream state use the same matching rules.\n\nCloses #67035.
This commit is contained in:
Onur Solmaz
2026-06-03 16:56:33 +08:00
committed by GitHub
parent c2d7b4a486
commit 17a285f298
13 changed files with 1882 additions and 97 deletions

View File

@@ -1,5 +1,16 @@
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
const TOOL_USE_ID_FIELDS = [
"id",
"tool_call_id",
"toolCallId",
"tool_use_id",
"toolUseId",
] as const;
type ToolUseIdField = (typeof TOOL_USE_ID_FIELDS)[number];
/** Provider-agnostic chat content block shape used before SDK-specific narrowing. */
export type ToolContentBlock = Record<string, unknown>;
export type ToolContentBlock = Record<string, unknown> & Partial<Record<ToolUseIdField, unknown>>;
function normalizeToolContentType(value: unknown): string {
return typeof value === "string" ? value.toLowerCase() : "";
@@ -34,9 +45,11 @@ export function resolveToolBlockArgs(block: ToolContentBlock): unknown {
/** Reads the stable tool-use id across snake_case and camelCase provider field names. */
export function resolveToolUseId(block: ToolContentBlock): string | undefined {
const id =
(typeof block.id === "string" && block.id.trim()) ||
(typeof block.tool_use_id === "string" && block.tool_use_id.trim()) ||
(typeof block.toolUseId === "string" && block.toolUseId.trim());
return id || undefined;
for (const field of TOOL_USE_ID_FIELDS) {
const id = normalizeOptionalString(block[field]);
if (id) {
return id;
}
}
return undefined;
}

View File

@@ -421,7 +421,7 @@ describe("refreshChat", () => {
expect(requestUpdate).toHaveBeenCalled();
});
it("records chat history timing when a reload resets active stream state", async () => {
it("records chat history timing when a reload keeps active stream state visible", async () => {
const request = vi.fn((method: string) => {
if (method === "chat.history") {
return Promise.resolve({
@@ -440,7 +440,7 @@ describe("refreshChat", () => {
await refreshChat(host, { awaitHistory: true, scheduleScroll: false });
expect(host.chatStream).toBeNull();
expect(host.chatStream).toBe("partial");
expect(eventPayloads(host, "control-ui.chat.history")).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -448,13 +448,6 @@ describe("refreshChat", () => {
sessionKey: "main",
previousRunId: "run-main",
}),
expect.objectContaining({
phase: "stream-reset",
sessionKey: "main",
previousRunId: "run-main",
activeRunId: "run-main",
visibleMessageCount: 1,
}),
expect.objectContaining({
phase: "applied",
sessionKey: "main",

View File

@@ -286,6 +286,34 @@ describe("app-tool-stream fallback lifecycle handling", () => {
expect(host.chatModelOverrides?.main).toBeNull();
});
it("tags stream segments with the tool they precede", () => {
useToolStreamFakeTimers();
const host = createHost({
chatRunId: "run-1",
chatStream: "visible text before tool",
chatStreamStartedAt: TOOL_STREAM_TEST_NOW - 10,
});
handleAgentEvent(host, {
runId: "run-1",
seq: 1,
stream: "tool",
ts: Date.now(),
sessionKey: "main",
data: {
phase: "start",
name: "exec",
toolCallId: "call_1",
},
});
expect(host.chatStreamSegments).toEqual([
{ text: "visible text before tool", ts: TOOL_STREAM_TEST_NOW, toolCallId: "call_1" },
]);
expect(host.chatStream).toBeNull();
vi.useRealTimers();
});
it("records tool activity summaries without storing raw argument values", () => {
useToolStreamFakeTimers();
const host = createHost();

View File

@@ -60,7 +60,7 @@ type ToolStreamHost = {
chatRunId: string | null;
chatStream: string | null;
chatStreamStartedAt: number | null;
chatStreamSegments: Array<{ text: string; ts: number }>;
chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>;
toolStreamById: Map<string, ToolStreamEntry>;
toolStreamOrder: string[];
chatToolMessages: Record<string, unknown>[];
@@ -791,7 +791,10 @@ export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPaylo
host.chatStream &&
host.chatStream.trim().length > 0
) {
host.chatStreamSegments = [...host.chatStreamSegments, { text: host.chatStream, ts: now }];
host.chatStreamSegments = [
...host.chatStreamSegments,
{ text: host.chatStream, ts: now, toolCallId },
];
host.chatStream = null;
host.chatStreamStartedAt = null;
}

View File

@@ -494,6 +494,31 @@ describe("buildChatItems", () => {
expect(messageRecord(requireGroup(items[1])).content).toBe("Missing timestamp.");
});
it("renders an active stream after the persisted user turn it answers", () => {
const items = buildChatItems(
createProps({
messages: [
{
role: "user",
content: [{ type: "text", text: "Persisted prompt." }],
timestamp: 2_000,
},
],
stream: "Visible partial answer.",
streamStartedAt: 1_000,
}),
);
expect(items).toHaveLength(2);
expect(requireGroup(items[0]).role).toBe("user");
expect(items[1]).toMatchObject({
kind: "stream",
text: "Visible partial answer.",
startedAt: 2_001,
isStreaming: true,
});
});
it("renders submitted queued sends as user turns before chat.send ACK", () => {
const groups = messageGroups({
messages: [{ role: "assistant", content: "Ready.", timestamp: 1 }],

View File

@@ -9,6 +9,7 @@ import { extractTextCached } from "./message-extract.ts";
import { normalizeMessage, stripMessageDisplayMetadataText } from "./message-normalizer.ts";
import { normalizeRoleForGrouping } from "./role-normalizer.ts";
import { messageMatchesSearchQuery } from "./search-match.ts";
import { trimAccumulatedStreamPrefix } from "./stream-text.ts";
import { extractToolCardsCached, extractToolPreview } from "./tool-cards.ts";
import { buildUserChatMessageContentBlocks } from "./user-message-content.ts";
@@ -300,13 +301,6 @@ function sanitizeStreamText(text: string): string {
return stripped.trim().length > 0 ? stripped : "";
}
function trimAccumulatedStreamPrefix(text: string, previousText: string | null): string {
if (!previousText || !text.startsWith(previousText)) {
return text;
}
return text.slice(previousText.length).trimStart();
}
function shouldRenderQueuedSendInThread(item: ChatQueueItem): boolean {
if (typeof item.sendSubmittedAtMs !== "number" || item.sendState === "failed") {
return false;
@@ -356,6 +350,19 @@ function chatItemTimestamp(item: ChatItem): number | null {
return null;
}
function timestampAfterVisibleItems(items: ChatItem[], desiredTimestamp: number): number {
const latestTimestamp = items.reduce<number | null>((latest, item) => {
const timestamp = chatItemTimestamp(item);
if (timestamp == null) {
return latest;
}
return latest == null || timestamp > latest ? timestamp : latest;
}, null);
return latestTimestamp != null && desiredTimestamp <= latestTimestamp
? latestTimestamp + 1
: desiredTimestamp;
}
function sortChatItemsByVisibleTime(items: ChatItem[]): ChatItem[] {
return items
.map((item, index) => ({ item, index, timestamp: chatItemTimestamp(item) }))
@@ -667,13 +674,14 @@ export function buildChatItems(props: BuildChatItemsProps): Array<ChatItem | Mes
const key = `stream:${props.sessionKey}:${props.streamStartedAt ?? "live"}`;
const text = sanitizeStreamText(props.stream);
const visibleText = trimAccumulatedStreamPrefix(text, previousAccumulatedStreamText);
const startedAt = timestampAfterVisibleItems(items, props.streamStartedAt ?? Date.now());
if (visibleText.length > 0) {
if (!stripHeartbeatTokenForDisplay(visibleText).shouldSkip) {
items.push({
kind: "stream",
key,
text: visibleText,
startedAt: props.streamStartedAt ?? Date.now(),
startedAt,
isStreaming: true,
});
}

View File

@@ -0,0 +1,460 @@
import { resetToolStream } from "../app-tool-stream.ts";
import { normalizeLowercaseStringOrEmpty } from "../string-coerce.ts";
import { extractText } from "./message-extract.ts";
import { trimAccumulatedStreamPrefix } from "./stream-text.ts";
import { extractToolMessageRefs } from "./tool-message-refs.ts";
export type StreamReconciliationState = {
chatStream: string | null;
chatStreamStartedAt: number | null;
};
type ToolStreamHost = StreamReconciliationState & {
chatStreamSegments?: Array<{ text?: unknown; ts?: unknown; toolCallId?: unknown }>;
chatToolMessages?: unknown[];
toolStreamById?: Map<string, unknown>;
toolStreamOrder?: unknown[];
};
type VisibleAssistantStreamPart = {
text: string;
replacementText: string;
source: "segment" | "current";
timestamp: number;
toolCallId?: string;
};
export type AssistantMessageVisibility = (message: unknown) => boolean;
export type StreamVisibility = (stream: string) => boolean;
export type MaterializeVisibleStreamOptions = {
includeCurrent?: boolean;
requirePersistedTool?: boolean;
replacementMessages?: unknown[];
isHiddenAssistantMessage: AssistantMessageVisibility;
isHiddenStreamText: StreamVisibility;
};
export function currentLiveToolCallIds(state: StreamReconciliationState): string[] {
const toolHost = state as ToolStreamHost;
return Array.isArray(toolHost.toolStreamOrder)
? toolHost.toolStreamOrder.filter(
(value): value is string => typeof value === "string" && value.trim().length > 0,
)
: [];
}
export function lastUserMessageIndex(messages: unknown[]): number {
for (let index = messages.length - 1; index >= 0; index--) {
const message = messages[index];
if (!message || typeof message !== "object") {
continue;
}
const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role);
if (role === "user") {
return index;
}
}
return -1;
}
export function maybeResetToolStream(
state: StreamReconciliationState,
opts?: { preserveStreamSegments?: boolean },
) {
const toolHost = state as ToolStreamHost & Partial<Parameters<typeof resetToolStream>[0]>;
if (
toolHost.toolStreamById instanceof Map &&
Array.isArray(toolHost.toolStreamOrder) &&
Array.isArray(toolHost.chatToolMessages) &&
Array.isArray(toolHost.chatStreamSegments)
) {
const preservedStreamSegments = opts?.preserveStreamSegments
? [...toolHost.chatStreamSegments]
: null;
resetToolStream(toolHost as Parameters<typeof resetToolStream>[0]);
if (preservedStreamSegments) {
toolHost.chatStreamSegments = preservedStreamSegments;
}
}
}
export function clearToolStreamSegments(state: StreamReconciliationState) {
const toolHost = state as ToolStreamHost;
if (Array.isArray(toolHost.chatStreamSegments)) {
toolHost.chatStreamSegments = [];
}
}
export function persistedCurrentToolStreamIds(
messages: unknown[],
state: StreamReconciliationState,
): Set<string> {
const liveToolIds = currentLiveToolCallIds(state);
const matchedToolIds = new Set<string>();
if (liveToolIds.length === 0) {
return matchedToolIds;
}
const liveToolIdSet = new Set(liveToolIds);
const persistedToolIds = new Set<string>();
for (const message of messages.slice(lastUserMessageIndex(messages) + 1)) {
for (const ref of extractToolMessageRefs(message)) {
persistedToolIds.add(ref.id);
}
}
for (const id of persistedToolIds) {
if (liveToolIdSet.has(id)) {
matchedToolIds.add(id);
}
}
return matchedToolIds;
}
function buildAssistantStreamMessage(
stream: string,
replacementText = stream,
timestamp = Date.now(),
): Record<string, unknown> {
return {
role: "assistant",
content: [{ type: "text", text: stream }],
timestamp,
openclawStreamFallback: {
replacementText,
},
};
}
function streamFallbackReplacementText(message: unknown): string | null {
if (!message || typeof message !== "object") {
return null;
}
const fallback = (message as { openclawStreamFallback?: unknown }).openclawStreamFallback;
if (!fallback || typeof fallback !== "object") {
return null;
}
const replacementText = (fallback as { replacementText?: unknown }).replacementText;
if (typeof replacementText === "string" && replacementText.trim()) {
return replacementText.trim();
}
return extractText(message)?.trim() ?? null;
}
function terminalMessageReplacesStreamFallback(message: unknown, fallback: unknown): boolean {
const fallbackText = streamFallbackReplacementText(fallback);
if (!fallbackText) {
return false;
}
const terminalText = extractText(message)?.trim();
return Boolean(
terminalText && (terminalText === fallbackText || terminalText.startsWith(fallbackText)),
);
}
export function appendTerminalAssistantMessage(messages: unknown[], message: unknown): unknown[] {
const retainedMessages = messages.filter((existing, index) => {
if (index <= lastUserMessageIndex(messages)) {
return true;
}
return !terminalMessageReplacesStreamFallback(message, existing);
});
return [...retainedMessages, message];
}
function visibleAssistantStreamText(
stream: string | null,
isHiddenStreamText: StreamVisibility,
): string | null {
if (!stream?.trim() || isHiddenStreamText(stream)) {
return null;
}
return stream;
}
function hasAssistantStreamReplacement(
messages: unknown[],
stream: string,
isHiddenAssistantMessage: AssistantMessageVisibility,
): boolean {
const expected = stream.trim();
if (!expected) {
return false;
}
const startIndex = lastUserMessageIndex(messages) + 1;
return messages.slice(startIndex).some((message) => {
if (!message || typeof message !== "object") {
return false;
}
const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role);
if (role && role !== "assistant") {
return false;
}
if (role === "assistant" && isHiddenAssistantMessage(message)) {
return false;
}
const text = extractText(message)?.trim();
return Boolean(text && (text === expected || text.startsWith(expected)));
});
}
function visibleAssistantStreamParts(
state: StreamReconciliationState,
opts: Pick<MaterializeVisibleStreamOptions, "includeCurrent" | "isHiddenStreamText">,
): VisibleAssistantStreamPart[] {
const streamHost = state as ToolStreamHost;
const liveToolIds = currentLiveToolCallIds(state);
const parts: VisibleAssistantStreamPart[] = [];
let previousText: string | null = null;
const segments = Array.isArray(streamHost.chatStreamSegments)
? streamHost.chatStreamSegments
: [];
for (let segmentIndex = 0; segmentIndex < segments.length; segmentIndex++) {
const segment = segments[segmentIndex];
if (!segment || typeof segment.text !== "string") {
continue;
}
const visible = visibleAssistantStreamText(
trimAccumulatedStreamPrefix(segment.text, previousText),
opts.isHiddenStreamText,
);
if (visible) {
parts.push({
text: visible,
replacementText: segment.text,
source: "segment",
timestamp:
typeof segment.ts === "number" && Number.isFinite(segment.ts) ? segment.ts : Date.now(),
toolCallId:
typeof segment.toolCallId === "string" && segment.toolCallId.trim()
? segment.toolCallId.trim()
: liveToolIds[segmentIndex],
});
}
if (segment.text.trim()) {
previousText = segment.text;
}
}
if (opts.includeCurrent !== false && typeof state.chatStream === "string") {
const visible = visibleAssistantStreamText(
trimAccumulatedStreamPrefix(state.chatStream, previousText),
opts.isHiddenStreamText,
);
if (visible) {
parts.push({
text: visible,
replacementText: state.chatStream,
source: "current",
timestamp: state.chatStreamStartedAt ?? Date.now(),
});
}
}
return parts;
}
export function visibleCurrentAssistantStreamTail(
state: StreamReconciliationState,
isHiddenStreamText: StreamVisibility,
): string | null {
if (typeof state.chatStream !== "string") {
return null;
}
const streamHost = state as ToolStreamHost;
const segments = Array.isArray(streamHost.chatStreamSegments)
? streamHost.chatStreamSegments
: [];
let previousText: string | null = null;
for (const segment of segments) {
if (typeof segment.text === "string" && segment.text.trim()) {
previousText = segment.text;
}
}
return visibleAssistantStreamText(
trimAccumulatedStreamPrefix(state.chatStream, previousText),
isHiddenStreamText,
);
}
function hasAssistantStreamPartReplacement(
messages: unknown[],
part: VisibleAssistantStreamPart,
isHiddenAssistantMessage: AssistantMessageVisibility,
): boolean {
return (
hasAssistantStreamReplacement(messages, part.replacementText, isHiddenAssistantMessage) ||
hasAssistantStreamReplacement(messages, part.text, isHiddenAssistantMessage)
);
}
export function historyReplacedVisibleStream(
messages: unknown[],
state: StreamReconciliationState,
opts: Pick<
MaterializeVisibleStreamOptions,
"includeCurrent" | "isHiddenAssistantMessage" | "isHiddenStreamText"
>,
): boolean {
const parts = visibleAssistantStreamParts(state, opts);
return (
parts.length > 0 &&
parts.every((part) =>
hasAssistantStreamPartReplacement(messages, part, opts.isHiddenAssistantMessage),
)
);
}
export function hasVisibleStreamParts(
state: StreamReconciliationState,
opts: Pick<MaterializeVisibleStreamOptions, "includeCurrent" | "isHiddenStreamText">,
): boolean {
return visibleAssistantStreamParts(state, opts).length > 0;
}
function currentToolStreamMessageIndex(
messages: unknown[],
state: StreamReconciliationState,
toolCallId?: string,
): number {
const liveToolIds = toolCallId ? new Set([toolCallId]) : new Set(currentLiveToolCallIds(state));
if (liveToolIds.size === 0) {
return -1;
}
const startIndex = lastUserMessageIndex(messages) + 1;
for (let index = startIndex; index < messages.length; index++) {
if (extractToolMessageRefs(messages[index]).some((ref) => liveToolIds.has(ref.id))) {
return index;
}
}
return -1;
}
function insertMessageAtIndex(messages: unknown[], message: unknown, index: number): unknown[] {
return [...messages.slice(0, index), message, ...messages.slice(index)];
}
function messageTimestampMs(message: unknown): number | null {
if (!message || typeof message !== "object") {
return null;
}
const timestamp = (message as { timestamp?: unknown; ts?: unknown }).timestamp;
if (typeof timestamp === "number" && Number.isFinite(timestamp)) {
return timestamp;
}
const ts = (message as { timestamp?: unknown; ts?: unknown }).ts;
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
}
function timestampForInsertedVisibleStream(
messages: unknown[],
index: number,
desiredTimestamp: number,
): number {
const previousTimestamp = messages
.slice(0, index)
.toReversed()
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
const nextTimestamp = messages
.slice(index)
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
if (previousTimestamp != null && desiredTimestamp <= previousTimestamp) {
const afterPrevious = previousTimestamp + 1;
return nextTimestamp != null && afterPrevious >= nextTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: afterPrevious;
}
if (nextTimestamp != null && desiredTimestamp >= nextTimestamp) {
const beforeNext = nextTimestamp - 1;
return previousTimestamp != null && beforeNext <= previousTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: beforeNext;
}
return desiredTimestamp;
}
export function materializeVisibleStreamState(
messages: unknown[],
state: StreamReconciliationState,
opts: MaterializeVisibleStreamOptions,
): unknown[] {
let nextMessages = messages;
for (const part of visibleAssistantStreamParts(state, opts)) {
const replacementMessages = opts.replacementMessages ?? [];
if (
hasAssistantStreamPartReplacement(
[...nextMessages, ...replacementMessages],
part,
opts.isHiddenAssistantMessage,
)
) {
continue;
}
const toolIndex =
part.source === "segment"
? currentToolStreamMessageIndex(nextMessages, state, part.toolCallId)
: -1;
if (opts.requirePersistedTool && toolIndex < 0) {
continue;
}
const insertIndex = toolIndex >= 0 ? toolIndex : nextMessages.length;
const streamMessage = buildAssistantStreamMessage(
part.text,
part.replacementText,
timestampForInsertedVisibleStream(nextMessages, insertIndex, part.timestamp),
);
nextMessages =
toolIndex >= 0
? insertMessageAtIndex(nextMessages, streamMessage, toolIndex)
: [...nextMessages, streamMessage];
}
return nextMessages;
}
export function prunePersistedToolStreamMessages(
state: StreamReconciliationState,
persistedToolIds: Set<string>,
) {
if (persistedToolIds.size === 0) {
return;
}
const toolHost = state as ToolStreamHost;
const liveToolIds = currentLiveToolCallIds(state);
if (toolHost.toolStreamById instanceof Map) {
for (const id of persistedToolIds) {
toolHost.toolStreamById.delete(id);
}
}
if (Array.isArray(toolHost.toolStreamOrder)) {
toolHost.toolStreamOrder = toolHost.toolStreamOrder.filter(
(id): id is string => typeof id === "string" && !persistedToolIds.has(id),
);
}
if (Array.isArray(toolHost.chatToolMessages)) {
toolHost.chatToolMessages = toolHost.chatToolMessages.filter((message) => {
const refs = extractToolMessageRefs(message);
return refs.every((ref) => !persistedToolIds.has(ref.id));
});
}
if (!Array.isArray(toolHost.chatStreamSegments)) {
return;
}
let lastPrunedAccumulatedText: string | null = null;
toolHost.chatStreamSegments = toolHost.chatStreamSegments.flatMap((segment, index) => {
const explicitToolCallId =
typeof segment.toolCallId === "string" && segment.toolCallId.trim()
? segment.toolCallId.trim()
: null;
const toolCallId = explicitToolCallId ?? liveToolIds[index] ?? null;
const text = typeof segment.text === "string" ? segment.text : "";
if (toolCallId && persistedToolIds.has(toolCallId)) {
if (text.trim()) {
lastPrunedAccumulatedText = text;
}
return [];
}
const nextText = lastPrunedAccumulatedText
? trimAccumulatedStreamPrefix(text, lastPrunedAccumulatedText)
: text;
return [{ ...segment, text: nextText }];
});
}

View File

@@ -0,0 +1,6 @@
export function trimAccumulatedStreamPrefix(text: string, previousText: string | null): string {
if (!previousText || !text.startsWith(previousText)) {
return text;
}
return text.slice(previousText.length).trimStart();
}

View File

@@ -0,0 +1,47 @@
import { describe, expect, it } from "vitest";
import { extractToolMessageRefs } from "./tool-message-refs.ts";
describe("extractToolMessageRefs", () => {
it("extracts canonical toolResult ids", () => {
expect(
extractToolMessageRefs({
role: "toolResult",
toolCallId: "call_1",
toolName: "shell",
}),
).toEqual([{ id: "call_1" }]);
});
it("extracts snake-case tool ids from standalone tool messages", () => {
expect(
extractToolMessageRefs({
role: "tool",
tool_call_id: "call_2",
tool_name: "shell",
}),
).toEqual([{ id: "call_2" }]);
});
it("extracts assistant tool-call block ids", () => {
expect(
extractToolMessageRefs({
role: "assistant",
content: [{ type: "toolcall", id: "call_3", name: "shell", arguments: {} }],
}),
).toEqual([{ id: "call_3" }]);
});
it("extracts assistant tool-result block ids", () => {
expect(
extractToolMessageRefs({
role: "assistant",
content: [{ type: "tool_result", tool_use_id: "call_4", name: "shell", content: "ok" }],
}),
).toEqual([{ id: "call_4" }]);
});
it("ignores plain assistant and user messages", () => {
expect(extractToolMessageRefs({ role: "assistant", content: "hello" })).toEqual([]);
expect(extractToolMessageRefs({ role: "user", content: "hello" })).toEqual([]);
});
});

View File

@@ -0,0 +1,79 @@
import {
isToolCallContentType,
isToolResultContentType,
resolveToolUseId,
} from "../../../../src/chat/tool-content.js";
import { normalizeOptionalString } from "../string-coerce.ts";
import { normalizeRoleForGrouping } from "./role-normalizer.ts";
const TOOL_NAME_FIELDS = ["toolName", "tool_name"] as const;
type ToolNameField = (typeof TOOL_NAME_FIELDS)[number];
type ToolHistoryRecord = Record<string, unknown> & Partial<Record<ToolNameField, unknown>>;
export type ToolMessageRef = {
id: string;
};
function asRecord(value: unknown): ToolHistoryRecord | null {
return value && typeof value === "object" && !Array.isArray(value)
? (value as ToolHistoryRecord)
: null;
}
function addToolRef(refs: ToolMessageRef[], seen: Set<string>, id: string | undefined) {
if (!id || seen.has(id)) {
return;
}
seen.add(id);
refs.push({ id });
}
function isToolLikeRole(role: unknown): boolean {
return typeof role === "string" && normalizeRoleForGrouping(role).toLowerCase() === "tool";
}
function hasToolName(message: ToolHistoryRecord): boolean {
return TOOL_NAME_FIELDS.some((field) => Boolean(normalizeOptionalString(message[field])));
}
function toolContentBlocks(message: Record<string, unknown>): Record<string, unknown>[] {
return Array.isArray(message.content)
? message.content.filter(
(block): block is Record<string, unknown> => Boolean(block) && typeof block === "object",
)
: [];
}
function isToolContentBlock(block: Record<string, unknown>): boolean {
return isToolCallContentType(block.type) || isToolResultContentType(block.type);
}
export function extractToolMessageRefs(message: unknown): ToolMessageRef[] {
const record = asRecord(message);
if (!record) {
return [];
}
const refs: ToolMessageRef[] = [];
const seen = new Set<string>();
const blocks = toolContentBlocks(record);
const hasToolBlock = blocks.some(isToolContentBlock);
const topLevelToolId = resolveToolUseId(record);
const messageHasToolShape = isToolLikeRole(record.role) || hasToolName(record) || hasToolBlock;
// Long term, chat.history should expose canonical toolRefs on UI messages so
// WebChat never infers provider/transcript spellings here. Until then, keep
// raw compatibility isolated at this tool-message boundary.
if (messageHasToolShape) {
addToolRef(refs, seen, topLevelToolId);
}
for (const block of blocks) {
if (!isToolContentBlock(block)) {
continue;
}
addToolRef(refs, seen, resolveToolUseId(block) ?? topLevelToolId);
}
return refs;
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,3 @@
import { resetToolStream } from "../app-tool-stream.ts";
import { getChatAttachmentDataUrl } from "../chat/attachment-payload-store.ts";
import {
isAssistantHeartbeatAckForDisplay,
@@ -6,6 +5,18 @@ import {
} from "../chat/heartbeat-display.ts";
import { extractText } from "../chat/message-extract.ts";
import { reconcileChatRunLifecycle } from "../chat/run-lifecycle.ts";
import {
appendTerminalAssistantMessage,
clearToolStreamSegments,
currentLiveToolCallIds,
hasVisibleStreamParts,
historyReplacedVisibleStream,
materializeVisibleStreamState,
maybeResetToolStream,
persistedCurrentToolStreamIds,
prunePersistedToolStreamMessages,
visibleCurrentAssistantStreamTail,
} from "../chat/stream-reconciliation.ts";
import { buildUserChatMessageContentBlocks } from "../chat/user-message-content.ts";
import { formatConnectError } from "../connect-error.ts";
import {
@@ -148,6 +159,10 @@ function isHeartbeatAckStream(text: string): boolean {
return stripHeartbeatTokenForDisplay(text).shouldSkip;
}
function isHiddenAssistantStreamText(text: string): boolean {
return isSilentReplyStream(text) || isHeartbeatAckStream(text);
}
function shouldHideAssistantChatMessage(message: unknown): boolean {
return isAssistantSilentReply(message) || isAssistantHeartbeatAckForDisplay(message);
}
@@ -160,6 +175,22 @@ function shouldHideHistoryMessage(message: unknown): boolean {
);
}
function materializeVisibleAssistantStreamMessages(
messages: unknown[],
state: ChatState,
opts: {
includeCurrent?: boolean;
requirePersistedTool?: boolean;
replacementMessages?: unknown[];
} = {},
): unknown[] {
return materializeVisibleStreamState(messages, state, {
...opts,
isHiddenAssistantMessage: shouldHideAssistantChatMessage,
isHiddenStreamText: isHiddenAssistantStreamText,
});
}
function hasTranscriptMeta(message: unknown): boolean {
return Boolean(
message &&
@@ -484,18 +515,6 @@ function chatEventSessionMatches(state: ChatState, payload: ChatEventPayload): b
);
}
function maybeResetToolStream(state: ChatState) {
const toolHost = state as ChatState & Partial<Parameters<typeof resetToolStream>[0]>;
if (
toolHost.toolStreamById instanceof Map &&
Array.isArray(toolHost.toolStreamOrder) &&
Array.isArray(toolHost.chatToolMessages) &&
Array.isArray(toolHost.chatStreamSegments)
) {
resetToolStream(toolHost as Parameters<typeof resetToolStream>[0]);
}
}
function resolveDeltaChatStreamText(
currentStream: string | null,
payload: ChatEventPayload,
@@ -709,18 +728,72 @@ async function loadChatHistoryUncached(
state.chatThinkingLevel = res.sessionInfo?.thinkingLevel ?? res.thinkingLevel ?? null;
const resetStream = !state.chatRunId || state.chatRunId === previousRunId;
if (resetStream) {
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
state.chatStream = null;
state.chatStreamStartedAt = null;
recordChatHistoryTiming(state, "stream-reset", startedAtMs, {
requestSessionKey: sessionKey,
requestAgentId,
previousRunId,
messageCount: messages.length,
visibleMessageCount: visibleMessages.length,
});
const streamReconciliation = {
isHiddenAssistantMessage: shouldHideAssistantChatMessage,
isHiddenStreamText: isHiddenAssistantStreamText,
};
const hasVisibleStream = hasVisibleStreamParts(state, streamReconciliation);
const historyReplacedStream = historyReplacedVisibleStream(
state.chatMessages,
state,
streamReconciliation,
);
const liveToolIds = currentLiveToolCallIds(state);
const persistedToolStreamIds = persistedCurrentToolStreamIds(state.chatMessages, state);
const historyReplacedToolStream =
liveToolIds.length > 0 && liveToolIds.every((id) => persistedToolStreamIds.has(id));
const historyReplacedSomeToolStream = persistedToolStreamIds.size > 0;
const liveToolStreamReplaced = liveToolIds.length === 0 || historyReplacedToolStream;
if (!hasVisibleStream || historyReplacedStream) {
if (liveToolStreamReplaced) {
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
} else {
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
clearToolStreamSegments(state);
}
state.chatStream = null;
state.chatStreamStartedAt = null;
recordChatHistoryTiming(state, "stream-reset", startedAtMs, {
requestSessionKey: sessionKey,
requestAgentId,
previousRunId,
messageCount: messages.length,
visibleMessageCount: visibleMessages.length,
});
} else if (!state.chatRunId) {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state);
maybeResetToolStream(state);
state.chatStream = null;
state.chatStreamStartedAt = null;
} else if (historyReplacedToolStream) {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, {
includeCurrent: false,
});
state.chatStream = visibleCurrentAssistantStreamTail(
state,
streamReconciliation.isHiddenStreamText,
);
if (state.chatStream === null) {
state.chatStreamStartedAt = null;
}
maybeResetToolStream(state);
} else if (historyReplacedSomeToolStream) {
const visibleCurrentTail = visibleCurrentAssistantStreamTail(
state,
streamReconciliation.isHiddenStreamText,
);
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, {
includeCurrent: false,
requirePersistedTool: true,
});
state.chatStream = visibleCurrentTail;
if (state.chatStream === null) {
state.chatStreamStartedAt = null;
}
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
}
}
recordChatHistoryTiming(state, "applied", startedAtMs, {
requestSessionKey: sessionKey,
@@ -1138,54 +1211,46 @@ export function handleChatEvent(state: ChatState, payload?: ChatEventPayload) {
}
} else if (payload.state === "final") {
const finalMessage = normalizeFinalAssistantMessage(payload.message);
const visibleFinalMessage =
finalMessage && !shouldHideAssistantChatMessage(finalMessage) ? finalMessage : null;
const streamedText = state.chatStream ?? "";
const fallbackMessage =
!visibleFinalMessage &&
streamedText.trim() &&
!isSilentReplyStream(streamedText) &&
!isHeartbeatAckStream(streamedText)
? {
role: "assistant",
content: [{ type: "text", text: streamedText }],
timestamp: Date.now(),
}
: null;
reconcileTerminalRun("done", "done");
if (visibleFinalMessage) {
state.chatMessages = [...state.chatMessages, visibleFinalMessage];
} else if (fallbackMessage) {
state.chatMessages = [...state.chatMessages, fallbackMessage];
if (finalMessage && !shouldHideAssistantChatMessage(finalMessage)) {
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, finalMessage);
} else {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state);
}
reconcileTerminalRun("done", "done");
} else if (payload.state === "aborted") {
const normalizedMessage = normalizeAbortedAssistantMessage(payload.message);
const visibleAbortedMessage =
normalizedMessage && !shouldHideAssistantChatMessage(normalizedMessage)
? normalizedMessage
: null;
const streamedText = state.chatStream ?? "";
const fallbackMessage =
!visibleAbortedMessage &&
streamedText.trim() &&
!isSilentReplyStream(streamedText) &&
!isHeartbeatAckStream(streamedText)
? {
role: "assistant",
content: [{ type: "text", text: streamedText }],
timestamp: Date.now(),
}
: null;
reconcileTerminalRun("interrupted", "killed");
if (visibleAbortedMessage) {
state.chatMessages = [...state.chatMessages, visibleAbortedMessage];
} else if (fallbackMessage) {
state.chatMessages = [...state.chatMessages, fallbackMessage];
if (normalizedMessage && !shouldHideAssistantChatMessage(normalizedMessage)) {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, {
replacementMessages: [normalizedMessage],
includeCurrent: false,
});
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, normalizedMessage);
} else {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state);
}
reconcileTerminalRun("interrupted", "killed");
} else if (payload.state === "error") {
const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null;
if (errorMessage) {
state.chatMessages = [...state.chatMessages, errorMessage];
const payloadMessage = hadActiveRunBeforeEvent
? normalizeFinalAssistantMessage(payload.message)
: null;
const visiblePayloadMessage =
payloadMessage && !shouldHideAssistantChatMessage(payloadMessage) ? payloadMessage : null;
if (visiblePayloadMessage) {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, {
replacementMessages: [visiblePayloadMessage],
});
state.chatMessages = appendTerminalAssistantMessage(
state.chatMessages,
visiblePayloadMessage,
);
} else {
const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null;
if (hadActiveRunBeforeEvent) {
state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state);
}
if (errorMessage) {
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, errorMessage);
}
}
reconcileTerminalRun("interrupted", "failed");
setChatError(state, payload.errorMessage ?? "chat error");

View File

@@ -363,6 +363,7 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
models: [],
});
await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 });
await page.getByText("First token visible.").waitFor({ timeout: 10_000 });
await gateway.emitChatFinal({ runId, text: "History race stayed visible." });
await page.getByText("History race stayed visible.").waitFor({ timeout: 10_000 });
expect(await gateway.getRequests("agents.list")).toHaveLength(0);
@@ -371,6 +372,53 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
}
});
it("keeps streamed text visible when a chat error terminates the turn", async () => {
const context = await browser.newContext({
locale: "en-US",
serviceWorkers: "block",
viewport: { height: 900, width: 1280 },
});
const page = await context.newPage();
const gateway = await installMockGateway(page);
try {
await page.goto(`${server.baseUrl}chat`);
const prompt = "stream before terminal error";
await page.locator(".agent-chat__composer-combobox textarea").fill(prompt);
await page.getByRole("button", { name: "Send message" }).click();
const sendRequest = await gateway.waitForRequest("chat.send");
const params = requireRecord(sendRequest.params);
const runId = requireString(params.idempotencyKey, "chat send idempotency key");
const partialText = "Partial answer before gateway error.";
await gateway.emitGatewayEvent("chat", {
deltaText: partialText,
message: {
content: [{ text: partialText, type: "text" }],
role: "assistant",
timestamp: Date.now(),
},
runId,
sessionKey: "main",
state: "delta",
});
await page.getByText(partialText).waitFor({ timeout: 10_000 });
await gateway.emitGatewayEvent("chat", {
errorMessage: "gateway disconnected",
runId,
sessionKey: "main",
state: "error",
});
await page.getByText(partialText).waitFor({ timeout: 10_000 });
await page.getByText("Error: gateway disconnected").waitFor({ timeout: 10_000 });
} finally {
await context.close();
}
});
it("keeps a delayed chat.send ACK visible as pending until the ACK resolves", async () => {
const context = await browser.newContext({
locale: "en-US",