diff --git a/src/chat/tool-content.ts b/src/chat/tool-content.ts index 44775a3dcefe..f0b289ae0411 100644 --- a/src/chat/tool-content.ts +++ b/src/chat/tool-content.ts @@ -1,5 +1,16 @@ +import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce"; + +const TOOL_USE_ID_FIELDS = [ + "id", + "tool_call_id", + "toolCallId", + "tool_use_id", + "toolUseId", +] as const; +type ToolUseIdField = (typeof TOOL_USE_ID_FIELDS)[number]; + /** Provider-agnostic chat content block shape used before SDK-specific narrowing. */ -export type ToolContentBlock = Record; +export type ToolContentBlock = Record & Partial>; function normalizeToolContentType(value: unknown): string { return typeof value === "string" ? value.toLowerCase() : ""; @@ -34,9 +45,11 @@ export function resolveToolBlockArgs(block: ToolContentBlock): unknown { /** Reads the stable tool-use id across snake_case and camelCase provider field names. */ export function resolveToolUseId(block: ToolContentBlock): string | undefined { - const id = - (typeof block.id === "string" && block.id.trim()) || - (typeof block.tool_use_id === "string" && block.tool_use_id.trim()) || - (typeof block.toolUseId === "string" && block.toolUseId.trim()); - return id || undefined; + for (const field of TOOL_USE_ID_FIELDS) { + const id = normalizeOptionalString(block[field]); + if (id) { + return id; + } + } + return undefined; } diff --git a/ui/src/ui/app-chat.test.ts b/ui/src/ui/app-chat.test.ts index 7748981f6f26..7fcb4125d0ce 100644 --- a/ui/src/ui/app-chat.test.ts +++ b/ui/src/ui/app-chat.test.ts @@ -421,7 +421,7 @@ describe("refreshChat", () => { expect(requestUpdate).toHaveBeenCalled(); }); - it("records chat history timing when a reload resets active stream state", async () => { + it("records chat history timing when a reload keeps active stream state visible", async () => { const request = vi.fn((method: string) => { if (method === "chat.history") { return Promise.resolve({ @@ -440,7 +440,7 @@ describe("refreshChat", () => { await refreshChat(host, { awaitHistory: true, scheduleScroll: false }); - expect(host.chatStream).toBeNull(); + expect(host.chatStream).toBe("partial"); expect(eventPayloads(host, "control-ui.chat.history")).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -448,13 +448,6 @@ describe("refreshChat", () => { sessionKey: "main", previousRunId: "run-main", }), - expect.objectContaining({ - phase: "stream-reset", - sessionKey: "main", - previousRunId: "run-main", - activeRunId: "run-main", - visibleMessageCount: 1, - }), expect.objectContaining({ phase: "applied", sessionKey: "main", diff --git a/ui/src/ui/app-tool-stream.node.test.ts b/ui/src/ui/app-tool-stream.node.test.ts index 3327dd4efa6e..1e4694afcaa7 100644 --- a/ui/src/ui/app-tool-stream.node.test.ts +++ b/ui/src/ui/app-tool-stream.node.test.ts @@ -286,6 +286,34 @@ describe("app-tool-stream fallback lifecycle handling", () => { expect(host.chatModelOverrides?.main).toBeNull(); }); + it("tags stream segments with the tool they precede", () => { + useToolStreamFakeTimers(); + const host = createHost({ + chatRunId: "run-1", + chatStream: "visible text before tool", + chatStreamStartedAt: TOOL_STREAM_TEST_NOW - 10, + }); + + handleAgentEvent(host, { + runId: "run-1", + seq: 1, + stream: "tool", + ts: Date.now(), + sessionKey: "main", + data: { + phase: "start", + name: "exec", + toolCallId: "call_1", + }, + }); + + expect(host.chatStreamSegments).toEqual([ + { text: "visible text before tool", ts: TOOL_STREAM_TEST_NOW, toolCallId: "call_1" }, + ]); + expect(host.chatStream).toBeNull(); + vi.useRealTimers(); + }); + it("records tool activity summaries without storing raw argument values", () => { useToolStreamFakeTimers(); const host = createHost(); diff --git a/ui/src/ui/app-tool-stream.ts b/ui/src/ui/app-tool-stream.ts index ec888650c570..b6359fce5bb4 100644 --- a/ui/src/ui/app-tool-stream.ts +++ b/ui/src/ui/app-tool-stream.ts @@ -60,7 +60,7 @@ type ToolStreamHost = { chatRunId: string | null; chatStream: string | null; chatStreamStartedAt: number | null; - chatStreamSegments: Array<{ text: string; ts: number }>; + chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>; toolStreamById: Map; toolStreamOrder: string[]; chatToolMessages: Record[]; @@ -791,7 +791,10 @@ export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPaylo host.chatStream && host.chatStream.trim().length > 0 ) { - host.chatStreamSegments = [...host.chatStreamSegments, { text: host.chatStream, ts: now }]; + host.chatStreamSegments = [ + ...host.chatStreamSegments, + { text: host.chatStream, ts: now, toolCallId }, + ]; host.chatStream = null; host.chatStreamStartedAt = null; } diff --git a/ui/src/ui/chat/build-chat-items.test.ts b/ui/src/ui/chat/build-chat-items.test.ts index ad55a353d787..fdc48ab3c2fa 100644 --- a/ui/src/ui/chat/build-chat-items.test.ts +++ b/ui/src/ui/chat/build-chat-items.test.ts @@ -494,6 +494,31 @@ describe("buildChatItems", () => { expect(messageRecord(requireGroup(items[1])).content).toBe("Missing timestamp."); }); + it("renders an active stream after the persisted user turn it answers", () => { + const items = buildChatItems( + createProps({ + messages: [ + { + role: "user", + content: [{ type: "text", text: "Persisted prompt." }], + timestamp: 2_000, + }, + ], + stream: "Visible partial answer.", + streamStartedAt: 1_000, + }), + ); + + expect(items).toHaveLength(2); + expect(requireGroup(items[0]).role).toBe("user"); + expect(items[1]).toMatchObject({ + kind: "stream", + text: "Visible partial answer.", + startedAt: 2_001, + isStreaming: true, + }); + }); + it("renders submitted queued sends as user turns before chat.send ACK", () => { const groups = messageGroups({ messages: [{ role: "assistant", content: "Ready.", timestamp: 1 }], diff --git a/ui/src/ui/chat/build-chat-items.ts b/ui/src/ui/chat/build-chat-items.ts index 32832c1a0113..7000a5f74a4c 100644 --- a/ui/src/ui/chat/build-chat-items.ts +++ b/ui/src/ui/chat/build-chat-items.ts @@ -9,6 +9,7 @@ import { extractTextCached } from "./message-extract.ts"; import { normalizeMessage, stripMessageDisplayMetadataText } from "./message-normalizer.ts"; import { normalizeRoleForGrouping } from "./role-normalizer.ts"; import { messageMatchesSearchQuery } from "./search-match.ts"; +import { trimAccumulatedStreamPrefix } from "./stream-text.ts"; import { extractToolCardsCached, extractToolPreview } from "./tool-cards.ts"; import { buildUserChatMessageContentBlocks } from "./user-message-content.ts"; @@ -300,13 +301,6 @@ function sanitizeStreamText(text: string): string { return stripped.trim().length > 0 ? stripped : ""; } -function trimAccumulatedStreamPrefix(text: string, previousText: string | null): string { - if (!previousText || !text.startsWith(previousText)) { - return text; - } - return text.slice(previousText.length).trimStart(); -} - function shouldRenderQueuedSendInThread(item: ChatQueueItem): boolean { if (typeof item.sendSubmittedAtMs !== "number" || item.sendState === "failed") { return false; @@ -356,6 +350,19 @@ function chatItemTimestamp(item: ChatItem): number | null { return null; } +function timestampAfterVisibleItems(items: ChatItem[], desiredTimestamp: number): number { + const latestTimestamp = items.reduce((latest, item) => { + const timestamp = chatItemTimestamp(item); + if (timestamp == null) { + return latest; + } + return latest == null || timestamp > latest ? timestamp : latest; + }, null); + return latestTimestamp != null && desiredTimestamp <= latestTimestamp + ? latestTimestamp + 1 + : desiredTimestamp; +} + function sortChatItemsByVisibleTime(items: ChatItem[]): ChatItem[] { return items .map((item, index) => ({ item, index, timestamp: chatItemTimestamp(item) })) @@ -667,13 +674,14 @@ export function buildChatItems(props: BuildChatItemsProps): Array 0) { if (!stripHeartbeatTokenForDisplay(visibleText).shouldSkip) { items.push({ kind: "stream", key, text: visibleText, - startedAt: props.streamStartedAt ?? Date.now(), + startedAt, isStreaming: true, }); } diff --git a/ui/src/ui/chat/stream-reconciliation.ts b/ui/src/ui/chat/stream-reconciliation.ts new file mode 100644 index 000000000000..b784c19c93bd --- /dev/null +++ b/ui/src/ui/chat/stream-reconciliation.ts @@ -0,0 +1,460 @@ +import { resetToolStream } from "../app-tool-stream.ts"; +import { normalizeLowercaseStringOrEmpty } from "../string-coerce.ts"; +import { extractText } from "./message-extract.ts"; +import { trimAccumulatedStreamPrefix } from "./stream-text.ts"; +import { extractToolMessageRefs } from "./tool-message-refs.ts"; + +export type StreamReconciliationState = { + chatStream: string | null; + chatStreamStartedAt: number | null; +}; + +type ToolStreamHost = StreamReconciliationState & { + chatStreamSegments?: Array<{ text?: unknown; ts?: unknown; toolCallId?: unknown }>; + chatToolMessages?: unknown[]; + toolStreamById?: Map; + toolStreamOrder?: unknown[]; +}; + +type VisibleAssistantStreamPart = { + text: string; + replacementText: string; + source: "segment" | "current"; + timestamp: number; + toolCallId?: string; +}; + +export type AssistantMessageVisibility = (message: unknown) => boolean; +export type StreamVisibility = (stream: string) => boolean; + +export type MaterializeVisibleStreamOptions = { + includeCurrent?: boolean; + requirePersistedTool?: boolean; + replacementMessages?: unknown[]; + isHiddenAssistantMessage: AssistantMessageVisibility; + isHiddenStreamText: StreamVisibility; +}; + +export function currentLiveToolCallIds(state: StreamReconciliationState): string[] { + const toolHost = state as ToolStreamHost; + return Array.isArray(toolHost.toolStreamOrder) + ? toolHost.toolStreamOrder.filter( + (value): value is string => typeof value === "string" && value.trim().length > 0, + ) + : []; +} + +export function lastUserMessageIndex(messages: unknown[]): number { + for (let index = messages.length - 1; index >= 0; index--) { + const message = messages[index]; + if (!message || typeof message !== "object") { + continue; + } + const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role); + if (role === "user") { + return index; + } + } + return -1; +} + +export function maybeResetToolStream( + state: StreamReconciliationState, + opts?: { preserveStreamSegments?: boolean }, +) { + const toolHost = state as ToolStreamHost & Partial[0]>; + if ( + toolHost.toolStreamById instanceof Map && + Array.isArray(toolHost.toolStreamOrder) && + Array.isArray(toolHost.chatToolMessages) && + Array.isArray(toolHost.chatStreamSegments) + ) { + const preservedStreamSegments = opts?.preserveStreamSegments + ? [...toolHost.chatStreamSegments] + : null; + resetToolStream(toolHost as Parameters[0]); + if (preservedStreamSegments) { + toolHost.chatStreamSegments = preservedStreamSegments; + } + } +} + +export function clearToolStreamSegments(state: StreamReconciliationState) { + const toolHost = state as ToolStreamHost; + if (Array.isArray(toolHost.chatStreamSegments)) { + toolHost.chatStreamSegments = []; + } +} + +export function persistedCurrentToolStreamIds( + messages: unknown[], + state: StreamReconciliationState, +): Set { + const liveToolIds = currentLiveToolCallIds(state); + const matchedToolIds = new Set(); + if (liveToolIds.length === 0) { + return matchedToolIds; + } + const liveToolIdSet = new Set(liveToolIds); + const persistedToolIds = new Set(); + for (const message of messages.slice(lastUserMessageIndex(messages) + 1)) { + for (const ref of extractToolMessageRefs(message)) { + persistedToolIds.add(ref.id); + } + } + for (const id of persistedToolIds) { + if (liveToolIdSet.has(id)) { + matchedToolIds.add(id); + } + } + return matchedToolIds; +} + +function buildAssistantStreamMessage( + stream: string, + replacementText = stream, + timestamp = Date.now(), +): Record { + return { + role: "assistant", + content: [{ type: "text", text: stream }], + timestamp, + openclawStreamFallback: { + replacementText, + }, + }; +} + +function streamFallbackReplacementText(message: unknown): string | null { + if (!message || typeof message !== "object") { + return null; + } + const fallback = (message as { openclawStreamFallback?: unknown }).openclawStreamFallback; + if (!fallback || typeof fallback !== "object") { + return null; + } + const replacementText = (fallback as { replacementText?: unknown }).replacementText; + if (typeof replacementText === "string" && replacementText.trim()) { + return replacementText.trim(); + } + return extractText(message)?.trim() ?? null; +} + +function terminalMessageReplacesStreamFallback(message: unknown, fallback: unknown): boolean { + const fallbackText = streamFallbackReplacementText(fallback); + if (!fallbackText) { + return false; + } + const terminalText = extractText(message)?.trim(); + return Boolean( + terminalText && (terminalText === fallbackText || terminalText.startsWith(fallbackText)), + ); +} + +export function appendTerminalAssistantMessage(messages: unknown[], message: unknown): unknown[] { + const retainedMessages = messages.filter((existing, index) => { + if (index <= lastUserMessageIndex(messages)) { + return true; + } + return !terminalMessageReplacesStreamFallback(message, existing); + }); + return [...retainedMessages, message]; +} + +function visibleAssistantStreamText( + stream: string | null, + isHiddenStreamText: StreamVisibility, +): string | null { + if (!stream?.trim() || isHiddenStreamText(stream)) { + return null; + } + return stream; +} + +function hasAssistantStreamReplacement( + messages: unknown[], + stream: string, + isHiddenAssistantMessage: AssistantMessageVisibility, +): boolean { + const expected = stream.trim(); + if (!expected) { + return false; + } + const startIndex = lastUserMessageIndex(messages) + 1; + return messages.slice(startIndex).some((message) => { + if (!message || typeof message !== "object") { + return false; + } + const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role); + if (role && role !== "assistant") { + return false; + } + if (role === "assistant" && isHiddenAssistantMessage(message)) { + return false; + } + const text = extractText(message)?.trim(); + return Boolean(text && (text === expected || text.startsWith(expected))); + }); +} + +function visibleAssistantStreamParts( + state: StreamReconciliationState, + opts: Pick, +): VisibleAssistantStreamPart[] { + const streamHost = state as ToolStreamHost; + const liveToolIds = currentLiveToolCallIds(state); + const parts: VisibleAssistantStreamPart[] = []; + let previousText: string | null = null; + const segments = Array.isArray(streamHost.chatStreamSegments) + ? streamHost.chatStreamSegments + : []; + for (let segmentIndex = 0; segmentIndex < segments.length; segmentIndex++) { + const segment = segments[segmentIndex]; + if (!segment || typeof segment.text !== "string") { + continue; + } + const visible = visibleAssistantStreamText( + trimAccumulatedStreamPrefix(segment.text, previousText), + opts.isHiddenStreamText, + ); + if (visible) { + parts.push({ + text: visible, + replacementText: segment.text, + source: "segment", + timestamp: + typeof segment.ts === "number" && Number.isFinite(segment.ts) ? segment.ts : Date.now(), + toolCallId: + typeof segment.toolCallId === "string" && segment.toolCallId.trim() + ? segment.toolCallId.trim() + : liveToolIds[segmentIndex], + }); + } + if (segment.text.trim()) { + previousText = segment.text; + } + } + if (opts.includeCurrent !== false && typeof state.chatStream === "string") { + const visible = visibleAssistantStreamText( + trimAccumulatedStreamPrefix(state.chatStream, previousText), + opts.isHiddenStreamText, + ); + if (visible) { + parts.push({ + text: visible, + replacementText: state.chatStream, + source: "current", + timestamp: state.chatStreamStartedAt ?? Date.now(), + }); + } + } + return parts; +} + +export function visibleCurrentAssistantStreamTail( + state: StreamReconciliationState, + isHiddenStreamText: StreamVisibility, +): string | null { + if (typeof state.chatStream !== "string") { + return null; + } + const streamHost = state as ToolStreamHost; + const segments = Array.isArray(streamHost.chatStreamSegments) + ? streamHost.chatStreamSegments + : []; + let previousText: string | null = null; + for (const segment of segments) { + if (typeof segment.text === "string" && segment.text.trim()) { + previousText = segment.text; + } + } + return visibleAssistantStreamText( + trimAccumulatedStreamPrefix(state.chatStream, previousText), + isHiddenStreamText, + ); +} + +function hasAssistantStreamPartReplacement( + messages: unknown[], + part: VisibleAssistantStreamPart, + isHiddenAssistantMessage: AssistantMessageVisibility, +): boolean { + return ( + hasAssistantStreamReplacement(messages, part.replacementText, isHiddenAssistantMessage) || + hasAssistantStreamReplacement(messages, part.text, isHiddenAssistantMessage) + ); +} + +export function historyReplacedVisibleStream( + messages: unknown[], + state: StreamReconciliationState, + opts: Pick< + MaterializeVisibleStreamOptions, + "includeCurrent" | "isHiddenAssistantMessage" | "isHiddenStreamText" + >, +): boolean { + const parts = visibleAssistantStreamParts(state, opts); + return ( + parts.length > 0 && + parts.every((part) => + hasAssistantStreamPartReplacement(messages, part, opts.isHiddenAssistantMessage), + ) + ); +} + +export function hasVisibleStreamParts( + state: StreamReconciliationState, + opts: Pick, +): boolean { + return visibleAssistantStreamParts(state, opts).length > 0; +} + +function currentToolStreamMessageIndex( + messages: unknown[], + state: StreamReconciliationState, + toolCallId?: string, +): number { + const liveToolIds = toolCallId ? new Set([toolCallId]) : new Set(currentLiveToolCallIds(state)); + if (liveToolIds.size === 0) { + return -1; + } + const startIndex = lastUserMessageIndex(messages) + 1; + for (let index = startIndex; index < messages.length; index++) { + if (extractToolMessageRefs(messages[index]).some((ref) => liveToolIds.has(ref.id))) { + return index; + } + } + return -1; +} + +function insertMessageAtIndex(messages: unknown[], message: unknown, index: number): unknown[] { + return [...messages.slice(0, index), message, ...messages.slice(index)]; +} + +function messageTimestampMs(message: unknown): number | null { + if (!message || typeof message !== "object") { + return null; + } + const timestamp = (message as { timestamp?: unknown; ts?: unknown }).timestamp; + if (typeof timestamp === "number" && Number.isFinite(timestamp)) { + return timestamp; + } + const ts = (message as { timestamp?: unknown; ts?: unknown }).ts; + return typeof ts === "number" && Number.isFinite(ts) ? ts : null; +} + +function timestampForInsertedVisibleStream( + messages: unknown[], + index: number, + desiredTimestamp: number, +): number { + const previousTimestamp = messages + .slice(0, index) + .toReversed() + .map(messageTimestampMs) + .find((timestamp): timestamp is number => timestamp != null); + const nextTimestamp = messages + .slice(index) + .map(messageTimestampMs) + .find((timestamp): timestamp is number => timestamp != null); + if (previousTimestamp != null && desiredTimestamp <= previousTimestamp) { + const afterPrevious = previousTimestamp + 1; + return nextTimestamp != null && afterPrevious >= nextTimestamp + ? previousTimestamp + (nextTimestamp - previousTimestamp) / 2 + : afterPrevious; + } + if (nextTimestamp != null && desiredTimestamp >= nextTimestamp) { + const beforeNext = nextTimestamp - 1; + return previousTimestamp != null && beforeNext <= previousTimestamp + ? previousTimestamp + (nextTimestamp - previousTimestamp) / 2 + : beforeNext; + } + return desiredTimestamp; +} + +export function materializeVisibleStreamState( + messages: unknown[], + state: StreamReconciliationState, + opts: MaterializeVisibleStreamOptions, +): unknown[] { + let nextMessages = messages; + for (const part of visibleAssistantStreamParts(state, opts)) { + const replacementMessages = opts.replacementMessages ?? []; + if ( + hasAssistantStreamPartReplacement( + [...nextMessages, ...replacementMessages], + part, + opts.isHiddenAssistantMessage, + ) + ) { + continue; + } + const toolIndex = + part.source === "segment" + ? currentToolStreamMessageIndex(nextMessages, state, part.toolCallId) + : -1; + if (opts.requirePersistedTool && toolIndex < 0) { + continue; + } + const insertIndex = toolIndex >= 0 ? toolIndex : nextMessages.length; + const streamMessage = buildAssistantStreamMessage( + part.text, + part.replacementText, + timestampForInsertedVisibleStream(nextMessages, insertIndex, part.timestamp), + ); + nextMessages = + toolIndex >= 0 + ? insertMessageAtIndex(nextMessages, streamMessage, toolIndex) + : [...nextMessages, streamMessage]; + } + return nextMessages; +} + +export function prunePersistedToolStreamMessages( + state: StreamReconciliationState, + persistedToolIds: Set, +) { + if (persistedToolIds.size === 0) { + return; + } + const toolHost = state as ToolStreamHost; + const liveToolIds = currentLiveToolCallIds(state); + if (toolHost.toolStreamById instanceof Map) { + for (const id of persistedToolIds) { + toolHost.toolStreamById.delete(id); + } + } + if (Array.isArray(toolHost.toolStreamOrder)) { + toolHost.toolStreamOrder = toolHost.toolStreamOrder.filter( + (id): id is string => typeof id === "string" && !persistedToolIds.has(id), + ); + } + if (Array.isArray(toolHost.chatToolMessages)) { + toolHost.chatToolMessages = toolHost.chatToolMessages.filter((message) => { + const refs = extractToolMessageRefs(message); + return refs.every((ref) => !persistedToolIds.has(ref.id)); + }); + } + if (!Array.isArray(toolHost.chatStreamSegments)) { + return; + } + let lastPrunedAccumulatedText: string | null = null; + toolHost.chatStreamSegments = toolHost.chatStreamSegments.flatMap((segment, index) => { + const explicitToolCallId = + typeof segment.toolCallId === "string" && segment.toolCallId.trim() + ? segment.toolCallId.trim() + : null; + const toolCallId = explicitToolCallId ?? liveToolIds[index] ?? null; + const text = typeof segment.text === "string" ? segment.text : ""; + if (toolCallId && persistedToolIds.has(toolCallId)) { + if (text.trim()) { + lastPrunedAccumulatedText = text; + } + return []; + } + const nextText = lastPrunedAccumulatedText + ? trimAccumulatedStreamPrefix(text, lastPrunedAccumulatedText) + : text; + return [{ ...segment, text: nextText }]; + }); +} diff --git a/ui/src/ui/chat/stream-text.ts b/ui/src/ui/chat/stream-text.ts new file mode 100644 index 000000000000..8eb9b0ee9b7f --- /dev/null +++ b/ui/src/ui/chat/stream-text.ts @@ -0,0 +1,6 @@ +export function trimAccumulatedStreamPrefix(text: string, previousText: string | null): string { + if (!previousText || !text.startsWith(previousText)) { + return text; + } + return text.slice(previousText.length).trimStart(); +} diff --git a/ui/src/ui/chat/tool-message-refs.test.ts b/ui/src/ui/chat/tool-message-refs.test.ts new file mode 100644 index 000000000000..d390f5a91ac8 --- /dev/null +++ b/ui/src/ui/chat/tool-message-refs.test.ts @@ -0,0 +1,47 @@ +import { describe, expect, it } from "vitest"; +import { extractToolMessageRefs } from "./tool-message-refs.ts"; + +describe("extractToolMessageRefs", () => { + it("extracts canonical toolResult ids", () => { + expect( + extractToolMessageRefs({ + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + }), + ).toEqual([{ id: "call_1" }]); + }); + + it("extracts snake-case tool ids from standalone tool messages", () => { + expect( + extractToolMessageRefs({ + role: "tool", + tool_call_id: "call_2", + tool_name: "shell", + }), + ).toEqual([{ id: "call_2" }]); + }); + + it("extracts assistant tool-call block ids", () => { + expect( + extractToolMessageRefs({ + role: "assistant", + content: [{ type: "toolcall", id: "call_3", name: "shell", arguments: {} }], + }), + ).toEqual([{ id: "call_3" }]); + }); + + it("extracts assistant tool-result block ids", () => { + expect( + extractToolMessageRefs({ + role: "assistant", + content: [{ type: "tool_result", tool_use_id: "call_4", name: "shell", content: "ok" }], + }), + ).toEqual([{ id: "call_4" }]); + }); + + it("ignores plain assistant and user messages", () => { + expect(extractToolMessageRefs({ role: "assistant", content: "hello" })).toEqual([]); + expect(extractToolMessageRefs({ role: "user", content: "hello" })).toEqual([]); + }); +}); diff --git a/ui/src/ui/chat/tool-message-refs.ts b/ui/src/ui/chat/tool-message-refs.ts new file mode 100644 index 000000000000..a37a0e50eb62 --- /dev/null +++ b/ui/src/ui/chat/tool-message-refs.ts @@ -0,0 +1,79 @@ +import { + isToolCallContentType, + isToolResultContentType, + resolveToolUseId, +} from "../../../../src/chat/tool-content.js"; +import { normalizeOptionalString } from "../string-coerce.ts"; +import { normalizeRoleForGrouping } from "./role-normalizer.ts"; + +const TOOL_NAME_FIELDS = ["toolName", "tool_name"] as const; +type ToolNameField = (typeof TOOL_NAME_FIELDS)[number]; +type ToolHistoryRecord = Record & Partial>; + +export type ToolMessageRef = { + id: string; +}; + +function asRecord(value: unknown): ToolHistoryRecord | null { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as ToolHistoryRecord) + : null; +} + +function addToolRef(refs: ToolMessageRef[], seen: Set, id: string | undefined) { + if (!id || seen.has(id)) { + return; + } + seen.add(id); + refs.push({ id }); +} + +function isToolLikeRole(role: unknown): boolean { + return typeof role === "string" && normalizeRoleForGrouping(role).toLowerCase() === "tool"; +} + +function hasToolName(message: ToolHistoryRecord): boolean { + return TOOL_NAME_FIELDS.some((field) => Boolean(normalizeOptionalString(message[field]))); +} + +function toolContentBlocks(message: Record): Record[] { + return Array.isArray(message.content) + ? message.content.filter( + (block): block is Record => Boolean(block) && typeof block === "object", + ) + : []; +} + +function isToolContentBlock(block: Record): boolean { + return isToolCallContentType(block.type) || isToolResultContentType(block.type); +} + +export function extractToolMessageRefs(message: unknown): ToolMessageRef[] { + const record = asRecord(message); + if (!record) { + return []; + } + + const refs: ToolMessageRef[] = []; + const seen = new Set(); + const blocks = toolContentBlocks(record); + const hasToolBlock = blocks.some(isToolContentBlock); + const topLevelToolId = resolveToolUseId(record); + const messageHasToolShape = isToolLikeRole(record.role) || hasToolName(record) || hasToolBlock; + + // Long term, chat.history should expose canonical toolRefs on UI messages so + // WebChat never infers provider/transcript spellings here. Until then, keep + // raw compatibility isolated at this tool-message boundary. + if (messageHasToolShape) { + addToolRef(refs, seen, topLevelToolId); + } + + for (const block of blocks) { + if (!isToolContentBlock(block)) { + continue; + } + addToolRef(refs, seen, resolveToolUseId(block) ?? topLevelToolId); + } + + return refs; +} diff --git a/ui/src/ui/controllers/chat.test.ts b/ui/src/ui/controllers/chat.test.ts index d0bd14ca1112..46abc50bbaf7 100644 --- a/ui/src/ui/controllers/chat.test.ts +++ b/ui/src/ui/controllers/chat.test.ts @@ -439,6 +439,29 @@ describe("handleChatEvent", () => { expect(state.chatStreamStartedAt).toBe(null); }); + it("does not duplicate streamed text when final payload has no role", () => { + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Live reply", + chatStreamStartedAt: 100, + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "final", + message: { + text: "Live reply", + }, + }; + + expect(handleChatEvent(state, payload)).toBe("final"); + expect(state.chatMessages).toEqual([payload.message]); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamStartedAt).toBe(null); + }); + it("reconciles cached run and indicator state on terminal events", () => { vi.useFakeTimers(); try { @@ -758,7 +781,7 @@ describe("handleChatEvent", () => { const assignments = trackChatMessagesAssignments(state); expect(handleChatEvent(state, payload)).toBe("final"); - expect(assignments).toMatchObject([{ chatRunId: null, chatStream: null }]); + expect(assignments).toMatchObject([{ chatRunId: "run-1", chatStream: "Here is my reply" }]); expect(state.chatRunId).toBe(null); expect(state.chatStream).toBe(null); expect(state.chatStreamStartedAt).toBe(null); @@ -824,7 +847,79 @@ describe("handleChatEvent", () => { expect(state.chatStream).toBe(null); }); - it("appends final payload message from own run after clearing stream state", () => { + it("keeps repeated assistant final text from a later turn", () => { + const firstUser = { + role: "user", + content: [{ type: "text", text: "first" }], + timestamp: 1, + }; + const firstAssistant = { + role: "assistant", + content: [{ type: "text", text: "OK" }], + timestamp: 2, + }; + const secondUser = { + role: "user", + content: [{ type: "text", text: "second" }], + timestamp: 3, + }; + const secondAssistant = { + role: "assistant", + content: [{ type: "text", text: "OK" }], + timestamp: 4, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-2", + chatMessages: [firstUser, firstAssistant, secondUser], + }); + const payload: ChatEventPayload = { + runId: "run-2", + sessionKey: "main", + state: "final", + message: secondAssistant, + }; + + expect(handleChatEvent(state, payload)).toBe("final"); + expect(state.chatMessages).toEqual([firstUser, firstAssistant, secondUser, secondAssistant]); + }); + + it("keeps repeated assistant final text within the same turn", () => { + const user = { + role: "user", + content: [{ type: "text", text: "repeat" }], + timestamp: 1, + }; + const firstAssistant = { + role: "assistant", + content: [{ type: "text", text: "OK" }], + timestamp: 2, + }; + const secondAssistant = { + role: "assistant", + content: [ + { type: "text", text: "OK" }, + { type: "canvas", url: "/__openclaw__/canvas/documents/repeat/index.html" }, + ], + timestamp: 3, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatMessages: [user, firstAssistant], + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "final", + message: secondAssistant, + }; + + expect(handleChatEvent(state, payload)).toBe("final"); + expect(state.chatMessages).toEqual([user, firstAssistant, secondAssistant]); + }); + + it("appends final payload message from own run before clearing stream state", () => { const state = createState({ sessionKey: "main", chatRunId: "run-1", @@ -844,13 +939,39 @@ describe("handleChatEvent", () => { const assignments = trackChatMessagesAssignments(state); expect(handleChatEvent(state, payload)).toBe("final"); - expect(assignments).toMatchObject([{ chatRunId: null, chatStream: null }]); + expect(assignments).toMatchObject([{ chatRunId: "run-1", chatStream: "Reply" }]); expect(state.chatMessages).toEqual([payload.message]); expect(state.chatRunId).toBe(null); expect(state.chatStream).toBe(null); expect(state.chatStreamStartedAt).toBe(null); }); + it("does not materialize stream segments when final payload is renderable", () => { + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: null, + chatStreamStartedAt: null, + }) as ChatState & { chatStreamSegments: Array<{ text: string; ts: number }> }; + state.chatStreamSegments = [{ text: "before tool", ts: 1 }]; + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "source reply final" }], + timestamp: 101, + }, + }; + + expect(handleChatEvent(state, payload)).toBe("final"); + expect(state.chatMessages).toEqual([payload.message]); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamSegments).toEqual([{ text: "before tool", ts: 1 }]); + }); + it("processes aborted from own run and keeps partial assistant message", () => { const existingMessage = { role: "user", @@ -878,7 +999,10 @@ describe("handleChatEvent", () => { const assignments = trackChatMessagesAssignments(state); expect(handleChatEvent(state, payload)).toBe("aborted"); - expect(assignments).toMatchObject([{ chatRunId: null, chatStream: null }]); + expect(assignments.at(-1)).toMatchObject({ + chatRunId: "run-1", + chatStream: "Partial reply", + }); expect(state.chatRunId).toBe(null); expect(state.chatStream).toBe(null); expect(state.chatStreamStartedAt).toBe(null); @@ -997,6 +1121,169 @@ describe("handleChatEvent", () => { expect(state.lastError).toBe('No API key found for provider "openai".'); }); + it("keeps streamed assistant text visible when an error ends the run", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Ping" }], + timestamp: 1, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatMessages: [existingMessage], + chatStream: "Partial answer before gateway error.", + chatStreamStartedAt: 100, + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "gateway disconnected", + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamStartedAt).toBe(null); + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(existingMessage); + expectTextChatMessage( + state.chatMessages[1], + "assistant", + "Partial answer before gateway error.", + ); + expectTextChatMessage(state.chatMessages[2], "assistant", "Error: gateway disconnected"); + expect(state.lastError).toBe("gateway disconnected"); + }); + + it("does not duplicate streamed text when the error payload already carries it", () => { + const message = { + role: "assistant", + content: [{ type: "text", text: "Partial answer before gateway error." }], + timestamp: 101, + metadata: { source: "gateway" }, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Partial answer before gateway error.", + chatStreamStartedAt: 100, + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "gateway disconnected", + message, + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatMessages).toEqual([message]); + }); + + it("does not keep partial stream when the error payload contains the fuller text", () => { + const message = { + role: "assistant", + content: [{ type: "text", text: "Partial answer before gateway error. Final detail." }], + timestamp: 101, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Partial answer before gateway error.", + chatStreamStartedAt: 100, + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "gateway disconnected", + message, + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatMessages).toEqual([message]); + }); + + it("keeps stream segments visible when an error ends after a tool event", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Ping" }], + timestamp: 1, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatMessages: [existingMessage], + chatStream: null, + chatStreamStartedAt: null, + }) as ChatState & { chatStreamSegments: Array<{ text: string; ts: number }> }; + state.chatStreamSegments = [{ text: "Visible text before tool.", ts: 100 }]; + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "gateway disconnected", + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(existingMessage); + expectTextChatMessage(state.chatMessages[1], "assistant", "Visible text before tool."); + expectTextChatMessage(state.chatMessages[2], "assistant", "Error: gateway disconnected"); + }); + + it("does not treat substring matches as stream replacement", () => { + const message = { + role: "assistant", + content: [{ type: "text", text: "Error: provider said NOT OK yet." }], + timestamp: 101, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "OK", + chatStreamStartedAt: 100, + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "provider said NOT OK yet", + message, + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatMessages).toHaveLength(2); + expectTextChatMessage(state.chatMessages[0], "assistant", "OK"); + expect(state.chatMessages[1]).toEqual(message); + }); + + it("does not duplicate post-tool stream tail when error payload has full text", () => { + const message = { + role: "assistant", + content: [{ type: "text", text: "First thought. After tool. Final detail." }], + timestamp: 101, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "First thought. After tool.", + chatStreamStartedAt: 100, + }) as ChatState & { chatStreamSegments: Array<{ text: string; ts: number }> }; + state.chatStreamSegments = [{ text: "First thought.", ts: 90 }]; + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "error", + errorMessage: "gateway disconnected", + message, + }; + + expect(handleChatEvent(state, payload)).toBe("error"); + expect(state.chatMessages).toEqual([message]); + }); + it("prefers server-provided assistant error messages", () => { const state = createState({ sessionKey: "main", @@ -1982,6 +2269,729 @@ describe("loadChatHistory retry handling", () => { expect(state.chatStream).toBeNull(); }); + it("keeps active streamed assistant text when history reload returns a stale snapshot", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "first" }], + __openclaw: { seq: 1 }, + }; + const optimisticUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + timestamp: 10, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser, optimisticUser], + chatRunId: "run-1", + chatStream: "First visible stream text.", + chatStreamStartedAt: 100, + }); + + await loadChatHistory(state); + + expect(state.chatMessages).toEqual([persistedUser, optimisticUser]); + expect(state.chatRunId).toBe("run-1"); + expect(state.chatStream).toBe("First visible stream text."); + expect(state.chatStreamStartedAt).toBe(100); + }); + + it("clears live tool cards when history catches up before assistant text", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const persistedToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "tool output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, persistedToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "Still answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before tool", ts: 1 }]; + state.chatToolMessages = [persistedToolResult]; + state.toolStreamById = new Map([["call_1", { message: persistedToolResult }]]); + state.toolStreamOrder = ["call_1"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before tool"); + expect(requireRecord(state.chatMessages[1]).timestamp).toBe(1); + expect(state.chatMessages[2]).toEqual(persistedToolResult); + expect(state.chatRunId).toBe("run-1"); + expect(state.chatStream).toBe("Still answering."); + expect(state.chatStreamStartedAt).toBe(100); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("inserts multiple recovered stream segments before their matching persisted tools", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const firstToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "first output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const secondToolResult = { + role: "toolResult", + toolCallId: "call_2", + toolName: "shell", + content: [{ type: "text", text: "second output" }], + timestamp: 4, + __openclaw: { seq: 3 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, firstToolResult, secondToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "Still answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [ + { text: "before first tool", ts: 1 }, + { text: "before first tool\nbefore second tool", ts: 3 }, + ]; + state.chatToolMessages = [firstToolResult, secondToolResult]; + state.toolStreamById = new Map([ + ["call_1", { message: firstToolResult }], + ["call_2", { message: secondToolResult }], + ]); + state.toolStreamOrder = ["call_1", "call_2"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(5); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before first tool"); + expect(state.chatMessages[2]).toEqual(firstToolResult); + expectTextChatMessage(state.chatMessages[3], "assistant", "before second tool"); + expect(state.chatMessages[4]).toEqual(secondToolResult); + expect(requireRecord(state.chatMessages[1]).timestamp).toBe(1); + expect(requireRecord(state.chatMessages[3]).timestamp).toBe(3); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("prunes only the live tool cards that history has caught up with", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const firstToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "first output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const secondLiveToolResult = { + role: "assistant", + toolCallId: "call_2", + runId: "run-1", + content: [ + { type: "toolcall", name: "shell", arguments: {} }, + { type: "toolresult", name: "shell", text: "second output" }, + ], + timestamp: 4, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, firstToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "before first tool\nbefore second tool\nStill answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [ + { text: "before first tool", ts: 1, toolCallId: "call_1" }, + { + text: "before first tool\nbefore second tool", + ts: 3, + toolCallId: "call_2", + }, + ]; + state.chatToolMessages = [firstToolResult, secondLiveToolResult]; + state.toolStreamById = new Map([ + ["call_1", { message: firstToolResult }], + ["call_2", { message: secondLiveToolResult }], + ]); + state.toolStreamOrder = ["call_1", "call_2"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before first tool"); + expect(state.chatMessages[2]).toEqual(firstToolResult); + expect(state.chatToolMessages).toEqual([secondLiveToolResult]); + expect(state.chatStreamSegments).toEqual([ + { text: "before second tool", ts: 3, toolCallId: "call_2" }, + ]); + expect(state.chatStream).toBe("Still answering."); + expect(state.toolStreamById.size).toBe(1); + expect(state.toolStreamById.has("call_2")).toBe(true); + expect(state.toolStreamOrder).toEqual(["call_2"]); + }); + + it("uses segment tool ids when a tool starts before any stream text", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const firstToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "first output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const secondToolResult = { + role: "toolResult", + toolCallId: "call_2", + toolName: "shell", + content: [{ type: "text", text: "second output" }], + timestamp: 4, + __openclaw: { seq: 3 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, firstToolResult, secondToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "Still answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before second tool", ts: 3, toolCallId: "call_2" }]; + state.chatToolMessages = [firstToolResult, secondToolResult]; + state.toolStreamById = new Map([ + ["call_1", { message: firstToolResult }], + ["call_2", { message: secondToolResult }], + ]); + state.toolStreamOrder = ["call_1", "call_2"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(4); + expect(state.chatMessages[0]).toEqual(persistedUser); + expect(state.chatMessages[1]).toEqual(firstToolResult); + expectTextChatMessage(state.chatMessages[2], "assistant", "before second tool"); + expect(state.chatMessages[3]).toEqual(secondToolResult); + expect(requireRecord(state.chatMessages[2]).timestamp).toBe(3); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("trims accumulated current stream after materializing caught-up tool segments", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const persistedToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "tool output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, persistedToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "before tool\nafter tool", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before tool", ts: 1, toolCallId: "call_1" }]; + state.chatToolMessages = [persistedToolResult]; + state.toolStreamById = new Map([["call_1", { message: persistedToolResult }]]); + state.toolStreamOrder = ["call_1"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before tool"); + expect(state.chatMessages[2]).toEqual(persistedToolResult); + expect(state.chatStream).toBe("after tool"); + expect(state.chatStreamStartedAt).toBe(100); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("keeps live tool cards when only older history has a persisted tool result", async () => { + const olderUser = { + role: "user", + content: [{ type: "text", text: "older ask" }], + __openclaw: { seq: 1 }, + }; + const olderToolResult = { + role: "toolResult", + toolCallId: "call_old", + toolName: "shell", + content: [{ type: "text", text: "old tool output" }], + __openclaw: { seq: 2 }, + }; + const latestUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 3 }, + }; + const liveToolMessage = { + role: "assistant", + toolCallId: "call_current", + runId: "run-1", + content: [{ type: "toolcall", name: "shell", arguments: {} }], + }; + const request = vi.fn().mockResolvedValue({ + messages: [olderUser, olderToolResult, latestUser], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [olderUser, olderToolResult, latestUser], + chatRunId: "run-1", + chatStream: "Still answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before current tool", ts: 1 }]; + state.chatToolMessages = [liveToolMessage]; + state.toolStreamById = new Map([["call_current", { message: liveToolMessage }]]); + state.toolStreamOrder = ["call_current"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toEqual([olderUser, olderToolResult, latestUser]); + expect(state.chatRunId).toBe("run-1"); + expect(state.chatStream).toBe("Still answering."); + expect(state.chatStreamStartedAt).toBe(100); + expect(state.chatToolMessages).toEqual([liveToolMessage]); + expect(state.chatStreamSegments).toEqual([{ text: "before current tool", ts: 1 }]); + expect(state.toolStreamById.size).toBe(1); + expect(state.toolStreamOrder).toEqual(["call_current"]); + }); + + it("clears live tool cards when history catches up with content-block tool ids", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const persistedToolCall = { + role: "assistant", + content: [ + { + type: "toolCall", + id: "call_1", + name: "shell", + arguments: {}, + }, + ], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, persistedToolCall], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "Still answering.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before tool", ts: 1 }]; + state.chatToolMessages = [ + { + role: "assistant", + toolCallId: "call_1", + runId: "run-1", + content: [{ type: "toolcall", name: "shell", arguments: {} }], + }, + ]; + state.toolStreamById = new Map([["call_1", { message: state.chatToolMessages[0] }]]); + state.toolStreamOrder = ["call_1"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before tool"); + expect(requireRecord(state.chatMessages[1]).timestamp).toBe(1); + expect(state.chatMessages[2]).toEqual(persistedToolCall); + expect(state.chatRunId).toBe("run-1"); + expect(state.chatStream).toBe("Still answering."); + expect(state.chatStreamStartedAt).toBe(100); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("keeps segment-only streamed text when history catches up with tools", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const persistedToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "tool output" }], + timestamp: 2, + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, persistedToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: null, + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before tool", ts: 1 }]; + state.chatToolMessages = [persistedToolResult]; + state.toolStreamById = new Map([["call_1", { message: persistedToolResult }]]); + state.toolStreamOrder = ["call_1"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before tool"); + expect(requireRecord(state.chatMessages[1]).timestamp).toBe(1); + expect(state.chatMessages[2]).toEqual(persistedToolResult); + expect(state.chatRunId).toBe("run-1"); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("materializes orphaned streamed assistant text when history reload is stale", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "first" }], + __openclaw: { seq: 1 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: null, + chatStream: "Partial answer before history catch-up.", + chatStreamStartedAt: 100, + }); + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(2); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage( + state.chatMessages[1], + "assistant", + "Partial answer before history catch-up.", + ); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + }); + + it("timestamps materialized streamed text after the persisted user prompt", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "first" }], + timestamp: 200, + __openclaw: { seq: 1 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: null, + chatStream: "Partial answer before history catch-up.", + chatStreamStartedAt: 100, + }); + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(2); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage( + state.chatMessages[1], + "assistant", + "Partial answer before history catch-up.", + ); + expect(requireRecord(state.chatMessages[1]).timestamp).toBe(201); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + }); + + it("materializes orphaned segment-only assistant text before clearing caught-up tools", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const persistedToolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "shell", + content: [{ type: "text", text: "tool output" }], + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, persistedToolResult], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: null, + chatStream: null, + chatStreamStartedAt: null, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "before tool", ts: 1 }]; + state.chatToolMessages = [persistedToolResult]; + state.toolStreamById = new Map([["call_1", { message: persistedToolResult }]]); + state.toolStreamOrder = ["call_1"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toHaveLength(3); + expect(state.chatMessages[0]).toEqual(persistedUser); + expectTextChatMessage(state.chatMessages[1], "assistant", "before tool"); + expect(state.chatMessages[2]).toEqual(persistedToolResult); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + expect(state.chatToolMessages).toEqual([]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(0); + expect(state.toolStreamOrder).toEqual([]); + }); + + it("clears streamed assistant text when history already contains the replacement", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const historyAssistant = { + role: "assistant", + content: [{ type: "text", text: "First visible stream text. More final text." }], + __openclaw: { seq: 2 }, + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, historyAssistant], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "First visible stream text.", + chatStreamStartedAt: 100, + }); + + await loadChatHistory(state); + + expect(state.chatMessages).toEqual([persistedUser, historyAssistant]); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + }); + + it("keeps live tool cards when history only replaces streamed text", async () => { + const persistedUser = { + role: "user", + content: [{ type: "text", text: "latest ask" }], + __openclaw: { seq: 1 }, + }; + const historyAssistant = { + role: "assistant", + content: [{ type: "text", text: "First visible stream text. More final text." }], + __openclaw: { seq: 2 }, + }; + const liveToolMessage = { + role: "assistant", + toolCallId: "call_current", + runId: "run-1", + content: [{ type: "toolcall", name: "shell", arguments: {} }], + }; + const request = vi.fn().mockResolvedValue({ + messages: [persistedUser, historyAssistant], + thinkingLevel: "low", + }); + const state = createState({ + connected: true, + client: { request } as unknown as ChatState["client"], + chatMessages: [persistedUser], + chatRunId: "run-1", + chatStream: "First visible stream text.", + chatStreamStartedAt: 100, + }) as ChatState & { + chatStreamSegments: Array<{ text: string; ts: number }>; + chatToolMessages: Record[]; + toolStreamById: Map; + toolStreamOrder: string[]; + toolStreamSyncTimer: number | null; + }; + state.chatStreamSegments = [{ text: "First visible stream text.", ts: 90 }]; + state.chatToolMessages = [liveToolMessage]; + state.toolStreamById = new Map([["call_current", { message: liveToolMessage }]]); + state.toolStreamOrder = ["call_current"]; + state.toolStreamSyncTimer = null; + + await loadChatHistory(state); + + expect(state.chatMessages).toEqual([persistedUser, historyAssistant]); + expect(state.chatStream).toBeNull(); + expect(state.chatStreamStartedAt).toBeNull(); + expect(state.chatToolMessages).toEqual([liveToolMessage]); + expect(state.chatStreamSegments).toEqual([]); + expect(state.toolStreamById.size).toBe(1); + expect(state.toolStreamOrder).toEqual(["call_current"]); + }); + it("keeps local optimistic messages when history reload returns empty", async () => { const optimisticUser = { role: "user", diff --git a/ui/src/ui/controllers/chat.ts b/ui/src/ui/controllers/chat.ts index d6ca48935124..dc33d1ef8b0e 100644 --- a/ui/src/ui/controllers/chat.ts +++ b/ui/src/ui/controllers/chat.ts @@ -1,4 +1,3 @@ -import { resetToolStream } from "../app-tool-stream.ts"; import { getChatAttachmentDataUrl } from "../chat/attachment-payload-store.ts"; import { isAssistantHeartbeatAckForDisplay, @@ -6,6 +5,18 @@ import { } from "../chat/heartbeat-display.ts"; import { extractText } from "../chat/message-extract.ts"; import { reconcileChatRunLifecycle } from "../chat/run-lifecycle.ts"; +import { + appendTerminalAssistantMessage, + clearToolStreamSegments, + currentLiveToolCallIds, + hasVisibleStreamParts, + historyReplacedVisibleStream, + materializeVisibleStreamState, + maybeResetToolStream, + persistedCurrentToolStreamIds, + prunePersistedToolStreamMessages, + visibleCurrentAssistantStreamTail, +} from "../chat/stream-reconciliation.ts"; import { buildUserChatMessageContentBlocks } from "../chat/user-message-content.ts"; import { formatConnectError } from "../connect-error.ts"; import { @@ -148,6 +159,10 @@ function isHeartbeatAckStream(text: string): boolean { return stripHeartbeatTokenForDisplay(text).shouldSkip; } +function isHiddenAssistantStreamText(text: string): boolean { + return isSilentReplyStream(text) || isHeartbeatAckStream(text); +} + function shouldHideAssistantChatMessage(message: unknown): boolean { return isAssistantSilentReply(message) || isAssistantHeartbeatAckForDisplay(message); } @@ -160,6 +175,22 @@ function shouldHideHistoryMessage(message: unknown): boolean { ); } +function materializeVisibleAssistantStreamMessages( + messages: unknown[], + state: ChatState, + opts: { + includeCurrent?: boolean; + requirePersistedTool?: boolean; + replacementMessages?: unknown[]; + } = {}, +): unknown[] { + return materializeVisibleStreamState(messages, state, { + ...opts, + isHiddenAssistantMessage: shouldHideAssistantChatMessage, + isHiddenStreamText: isHiddenAssistantStreamText, + }); +} + function hasTranscriptMeta(message: unknown): boolean { return Boolean( message && @@ -484,18 +515,6 @@ function chatEventSessionMatches(state: ChatState, payload: ChatEventPayload): b ); } -function maybeResetToolStream(state: ChatState) { - const toolHost = state as ChatState & Partial[0]>; - if ( - toolHost.toolStreamById instanceof Map && - Array.isArray(toolHost.toolStreamOrder) && - Array.isArray(toolHost.chatToolMessages) && - Array.isArray(toolHost.chatStreamSegments) - ) { - resetToolStream(toolHost as Parameters[0]); - } -} - function resolveDeltaChatStreamText( currentStream: string | null, payload: ChatEventPayload, @@ -709,18 +728,72 @@ async function loadChatHistoryUncached( state.chatThinkingLevel = res.sessionInfo?.thinkingLevel ?? res.thinkingLevel ?? null; const resetStream = !state.chatRunId || state.chatRunId === previousRunId; if (resetStream) { - // Clear all streaming state — history includes tool results and text - // inline, so keeping streaming artifacts would cause duplicates. - maybeResetToolStream(state); - state.chatStream = null; - state.chatStreamStartedAt = null; - recordChatHistoryTiming(state, "stream-reset", startedAtMs, { - requestSessionKey: sessionKey, - requestAgentId, - previousRunId, - messageCount: messages.length, - visibleMessageCount: visibleMessages.length, - }); + const streamReconciliation = { + isHiddenAssistantMessage: shouldHideAssistantChatMessage, + isHiddenStreamText: isHiddenAssistantStreamText, + }; + const hasVisibleStream = hasVisibleStreamParts(state, streamReconciliation); + const historyReplacedStream = historyReplacedVisibleStream( + state.chatMessages, + state, + streamReconciliation, + ); + const liveToolIds = currentLiveToolCallIds(state); + const persistedToolStreamIds = persistedCurrentToolStreamIds(state.chatMessages, state); + const historyReplacedToolStream = + liveToolIds.length > 0 && liveToolIds.every((id) => persistedToolStreamIds.has(id)); + const historyReplacedSomeToolStream = persistedToolStreamIds.size > 0; + const liveToolStreamReplaced = liveToolIds.length === 0 || historyReplacedToolStream; + if (!hasVisibleStream || historyReplacedStream) { + if (liveToolStreamReplaced) { + // Clear all streaming state — history includes tool results and text + // inline, so keeping streaming artifacts would cause duplicates. + maybeResetToolStream(state); + } else { + prunePersistedToolStreamMessages(state, persistedToolStreamIds); + clearToolStreamSegments(state); + } + state.chatStream = null; + state.chatStreamStartedAt = null; + recordChatHistoryTiming(state, "stream-reset", startedAtMs, { + requestSessionKey: sessionKey, + requestAgentId, + previousRunId, + messageCount: messages.length, + visibleMessageCount: visibleMessages.length, + }); + } else if (!state.chatRunId) { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state); + maybeResetToolStream(state); + state.chatStream = null; + state.chatStreamStartedAt = null; + } else if (historyReplacedToolStream) { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, { + includeCurrent: false, + }); + state.chatStream = visibleCurrentAssistantStreamTail( + state, + streamReconciliation.isHiddenStreamText, + ); + if (state.chatStream === null) { + state.chatStreamStartedAt = null; + } + maybeResetToolStream(state); + } else if (historyReplacedSomeToolStream) { + const visibleCurrentTail = visibleCurrentAssistantStreamTail( + state, + streamReconciliation.isHiddenStreamText, + ); + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, { + includeCurrent: false, + requirePersistedTool: true, + }); + state.chatStream = visibleCurrentTail; + if (state.chatStream === null) { + state.chatStreamStartedAt = null; + } + prunePersistedToolStreamMessages(state, persistedToolStreamIds); + } } recordChatHistoryTiming(state, "applied", startedAtMs, { requestSessionKey: sessionKey, @@ -1138,54 +1211,46 @@ export function handleChatEvent(state: ChatState, payload?: ChatEventPayload) { } } else if (payload.state === "final") { const finalMessage = normalizeFinalAssistantMessage(payload.message); - const visibleFinalMessage = - finalMessage && !shouldHideAssistantChatMessage(finalMessage) ? finalMessage : null; - const streamedText = state.chatStream ?? ""; - const fallbackMessage = - !visibleFinalMessage && - streamedText.trim() && - !isSilentReplyStream(streamedText) && - !isHeartbeatAckStream(streamedText) - ? { - role: "assistant", - content: [{ type: "text", text: streamedText }], - timestamp: Date.now(), - } - : null; - reconcileTerminalRun("done", "done"); - if (visibleFinalMessage) { - state.chatMessages = [...state.chatMessages, visibleFinalMessage]; - } else if (fallbackMessage) { - state.chatMessages = [...state.chatMessages, fallbackMessage]; + if (finalMessage && !shouldHideAssistantChatMessage(finalMessage)) { + state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, finalMessage); + } else { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state); } + reconcileTerminalRun("done", "done"); } else if (payload.state === "aborted") { const normalizedMessage = normalizeAbortedAssistantMessage(payload.message); - const visibleAbortedMessage = - normalizedMessage && !shouldHideAssistantChatMessage(normalizedMessage) - ? normalizedMessage - : null; - const streamedText = state.chatStream ?? ""; - const fallbackMessage = - !visibleAbortedMessage && - streamedText.trim() && - !isSilentReplyStream(streamedText) && - !isHeartbeatAckStream(streamedText) - ? { - role: "assistant", - content: [{ type: "text", text: streamedText }], - timestamp: Date.now(), - } - : null; - reconcileTerminalRun("interrupted", "killed"); - if (visibleAbortedMessage) { - state.chatMessages = [...state.chatMessages, visibleAbortedMessage]; - } else if (fallbackMessage) { - state.chatMessages = [...state.chatMessages, fallbackMessage]; + if (normalizedMessage && !shouldHideAssistantChatMessage(normalizedMessage)) { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, { + replacementMessages: [normalizedMessage], + includeCurrent: false, + }); + state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, normalizedMessage); + } else { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state); } + reconcileTerminalRun("interrupted", "killed"); } else if (payload.state === "error") { - const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null; - if (errorMessage) { - state.chatMessages = [...state.chatMessages, errorMessage]; + const payloadMessage = hadActiveRunBeforeEvent + ? normalizeFinalAssistantMessage(payload.message) + : null; + const visiblePayloadMessage = + payloadMessage && !shouldHideAssistantChatMessage(payloadMessage) ? payloadMessage : null; + if (visiblePayloadMessage) { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state, { + replacementMessages: [visiblePayloadMessage], + }); + state.chatMessages = appendTerminalAssistantMessage( + state.chatMessages, + visiblePayloadMessage, + ); + } else { + const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null; + if (hadActiveRunBeforeEvent) { + state.chatMessages = materializeVisibleAssistantStreamMessages(state.chatMessages, state); + } + if (errorMessage) { + state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, errorMessage); + } } reconcileTerminalRun("interrupted", "failed"); setChatError(state, payload.errorMessage ?? "chat error"); diff --git a/ui/src/ui/e2e/chat-flow.e2e.test.ts b/ui/src/ui/e2e/chat-flow.e2e.test.ts index 80c8cd1f7e61..ad40f035a359 100644 --- a/ui/src/ui/e2e/chat-flow.e2e.test.ts +++ b/ui/src/ui/e2e/chat-flow.e2e.test.ts @@ -363,6 +363,7 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { models: [], }); await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 }); + await page.getByText("First token visible.").waitFor({ timeout: 10_000 }); await gateway.emitChatFinal({ runId, text: "History race stayed visible." }); await page.getByText("History race stayed visible.").waitFor({ timeout: 10_000 }); expect(await gateway.getRequests("agents.list")).toHaveLength(0); @@ -371,6 +372,53 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { } }); + it("keeps streamed text visible when a chat error terminates the turn", async () => { + const context = await browser.newContext({ + locale: "en-US", + serviceWorkers: "block", + viewport: { height: 900, width: 1280 }, + }); + const page = await context.newPage(); + const gateway = await installMockGateway(page); + + try { + await page.goto(`${server.baseUrl}chat`); + + const prompt = "stream before terminal error"; + await page.locator(".agent-chat__composer-combobox textarea").fill(prompt); + await page.getByRole("button", { name: "Send message" }).click(); + + const sendRequest = await gateway.waitForRequest("chat.send"); + const params = requireRecord(sendRequest.params); + const runId = requireString(params.idempotencyKey, "chat send idempotency key"); + const partialText = "Partial answer before gateway error."; + await gateway.emitGatewayEvent("chat", { + deltaText: partialText, + message: { + content: [{ text: partialText, type: "text" }], + role: "assistant", + timestamp: Date.now(), + }, + runId, + sessionKey: "main", + state: "delta", + }); + await page.getByText(partialText).waitFor({ timeout: 10_000 }); + + await gateway.emitGatewayEvent("chat", { + errorMessage: "gateway disconnected", + runId, + sessionKey: "main", + state: "error", + }); + + await page.getByText(partialText).waitFor({ timeout: 10_000 }); + await page.getByText("Error: gateway disconnected").waitFor({ timeout: 10_000 }); + } finally { + await context.close(); + } + }); + it("keeps a delayed chat.send ACK visible as pending until the ACK resolves", async () => { const context = await browser.newContext({ locale: "en-US",