Compare commits

...

20 Commits

Author SHA1 Message Date
Onur Solmaz
44e4fc2b51 fix(ui): prune persisted live tool cards 2026-06-03 11:16:32 +08:00
Onur Solmaz
c1898ef7a4 fix(ui): keep live stream below prompt 2026-06-03 11:09:26 +08:00
Onur Solmaz
38fdc4c934 fix(ui): retain stream segment tool context 2026-06-03 02:02:46 +08:00
Onur Solmaz
ccbd43262b fix(ui): pair recovered stream segments with tools 2026-06-03 01:55:00 +08:00
Onur Solmaz
e924b4f958 test(ui): update stream retention timing expectation 2026-06-03 01:47:36 +08:00
Onur Solmaz
0cd1d1808b fix(ui): retain live tools during history catchup 2026-06-03 01:40:55 +08:00
Onur Solmaz
8628fa6345 fix(ui): pair recovered streams with tools 2026-06-03 01:36:04 +08:00
Onur Solmaz
e516dda48f fix(ui): preserve stream recovery ordering 2026-06-03 01:28:18 +08:00
Onur Solmaz
a3ac037c72 fix(ui): match persisted tool block ids 2026-06-03 01:18:50 +08:00
Onur Solmaz
7b44d414a8 fix(ui): scope tool stream catch-up 2026-06-03 01:12:18 +08:00
Onur Solmaz
c42652dbf5 fix(ui): avoid duplicated source reply streams 2026-06-03 01:06:49 +08:00
Onur Solmaz
5d1e3d127a fix(ui): materialize orphaned stream segments 2026-06-03 01:00:11 +08:00
Onur Solmaz
d52bb7393a fix(ui): dedupe roleless terminal stream text 2026-06-03 00:54:45 +08:00
Onur Solmaz
9400c1a5c6 fix(ui): preserve segmented stream reloads 2026-06-03 00:49:57 +08:00
Onur Solmaz
3db9a290c2 fix(ui): avoid duplicated stream tails 2026-06-03 00:43:58 +08:00
Onur Solmaz
4baddea5b4 fix(ui): retain segmented stream text 2026-06-03 00:39:57 +08:00
Onur Solmaz
0f611fe438 fix(ui): preserve error terminal payloads 2026-06-03 00:32:40 +08:00
Onur Solmaz
e05e0fa0f4 fix(ui): clear caught-up tool streams 2026-06-03 00:28:34 +08:00
Onur Solmaz
4d993997e5 fix(ui): scope stream dedupe to current turn 2026-06-03 00:17:43 +08:00
Onur Solmaz
f3cefb8b05 fix(ui): preserve visible chat stream text 2026-06-03 00:10:57 +08:00
8 changed files with 1667 additions and 58 deletions

View File

@@ -426,7 +426,7 @@ describe("refreshChat", () => {
expect(requestUpdate).toHaveBeenCalled();
});
it("records chat history timing when a reload resets active stream state", async () => {
it("records chat history timing when a reload keeps active stream state visible", async () => {
const request = vi.fn((method: string) => {
if (method === "chat.history") {
return Promise.resolve({
@@ -445,7 +445,7 @@ describe("refreshChat", () => {
await refreshChat(host, { awaitHistory: true, scheduleScroll: false });
expect(host.chatStream).toBeNull();
expect(host.chatStream).toBe("partial");
expect(eventPayloads(host, "control-ui.chat.history")).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -453,13 +453,6 @@ describe("refreshChat", () => {
sessionKey: "main",
previousRunId: "run-main",
}),
expect.objectContaining({
phase: "stream-reset",
sessionKey: "main",
previousRunId: "run-main",
activeRunId: "run-main",
visibleMessageCount: 1,
}),
expect.objectContaining({
phase: "applied",
sessionKey: "main",

View File

@@ -286,6 +286,34 @@ describe("app-tool-stream fallback lifecycle handling", () => {
expect(host.chatModelOverrides?.main).toBeNull();
});
it("tags stream segments with the tool they precede", () => {
useToolStreamFakeTimers();
const host = createHost({
chatRunId: "run-1",
chatStream: "visible text before tool",
chatStreamStartedAt: TOOL_STREAM_TEST_NOW - 10,
});
handleAgentEvent(host, {
runId: "run-1",
seq: 1,
stream: "tool",
ts: Date.now(),
sessionKey: "main",
data: {
phase: "start",
name: "exec",
toolCallId: "call_1",
},
});
expect(host.chatStreamSegments).toEqual([
{ text: "visible text before tool", ts: TOOL_STREAM_TEST_NOW, toolCallId: "call_1" },
]);
expect(host.chatStream).toBeNull();
vi.useRealTimers();
});
it("records tool activity summaries without storing raw argument values", () => {
useToolStreamFakeTimers();
const host = createHost();

View File

@@ -60,7 +60,7 @@ type ToolStreamHost = {
chatRunId: string | null;
chatStream: string | null;
chatStreamStartedAt: number | null;
chatStreamSegments: Array<{ text: string; ts: number }>;
chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>;
toolStreamById: Map<string, ToolStreamEntry>;
toolStreamOrder: string[];
chatToolMessages: Record<string, unknown>[];
@@ -791,7 +791,10 @@ export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPaylo
host.chatStream &&
host.chatStream.trim().length > 0
) {
host.chatStreamSegments = [...host.chatStreamSegments, { text: host.chatStream, ts: now }];
host.chatStreamSegments = [
...host.chatStreamSegments,
{ text: host.chatStream, ts: now, toolCallId },
];
host.chatStream = null;
host.chatStreamStartedAt = null;
}

View File

@@ -413,6 +413,31 @@ describe("buildChatItems", () => {
expect(messageRecord(requireGroup(items[1])).content).toBe("Missing timestamp.");
});
it("renders an active stream after the persisted user turn it answers", () => {
const items = buildChatItems(
createProps({
messages: [
{
role: "user",
content: [{ type: "text", text: "Persisted prompt." }],
timestamp: 2_000,
},
],
stream: "Visible partial answer.",
streamStartedAt: 1_000,
}),
);
expect(items).toHaveLength(2);
expect(requireGroup(items[0]).role).toBe("user");
expect(items[1]).toMatchObject({
kind: "stream",
text: "Visible partial answer.",
startedAt: 2_001,
isStreaming: true,
});
});
it("renders submitted queued sends as user turns before chat.send ACK", () => {
const groups = messageGroups({
messages: [{ role: "assistant", content: "Ready.", timestamp: 1 }],

View File

@@ -348,6 +348,19 @@ function chatItemTimestamp(item: ChatItem): number | null {
return null;
}
function timestampAfterVisibleItems(items: ChatItem[], desiredTimestamp: number): number {
const latestTimestamp = items.reduce<number | null>((latest, item) => {
const timestamp = chatItemTimestamp(item);
if (timestamp == null) {
return latest;
}
return latest == null || timestamp > latest ? timestamp : latest;
}, null);
return latestTimestamp != null && desiredTimestamp <= latestTimestamp
? latestTimestamp + 1
: desiredTimestamp;
}
function sortChatItemsByVisibleTime(items: ChatItem[]): ChatItem[] {
return items
.map((item, index) => ({ item, index, timestamp: chatItemTimestamp(item) }))
@@ -647,13 +660,14 @@ export function buildChatItems(props: BuildChatItemsProps): Array<ChatItem | Mes
const key = `stream:${props.sessionKey}:${props.streamStartedAt ?? "live"}`;
const text = sanitizeStreamText(props.stream);
const visibleText = trimAccumulatedStreamPrefix(text, previousAccumulatedStreamText);
const startedAt = timestampAfterVisibleItems(items, props.streamStartedAt ?? Date.now());
if (visibleText.length > 0) {
if (!stripHeartbeatTokenForDisplay(visibleText).shouldSkip) {
items.push({
kind: "stream",
key,
text: visibleText,
startedAt: props.streamStartedAt ?? Date.now(),
startedAt,
isStreaming: true,
});
}

File diff suppressed because it is too large Load Diff

View File

@@ -95,6 +95,106 @@ function isSyntheticTranscriptRepairToolResult(message: unknown): boolean {
return typeof text === "string" && text.trim() === SYNTHETIC_TRANSCRIPT_REPAIR_RESULT;
}
function isPersistedToolHistoryMessage(message: unknown): boolean {
if (!message || typeof message !== "object") {
return false;
}
const entry = message as Record<string, unknown>;
const role = normalizeLowercaseStringOrEmpty(entry.role);
if (role === "toolresult" || role === "tool_result" || role === "tool" || role === "function") {
return true;
}
if (
typeof entry.toolCallId === "string" ||
typeof entry.tool_call_id === "string" ||
typeof entry.toolName === "string" ||
typeof entry.tool_name === "string"
) {
return true;
}
return (
Array.isArray(entry.content) &&
entry.content.some((block) => {
if (!block || typeof block !== "object") {
return false;
}
const type = normalizeLowercaseStringOrEmpty((block as Record<string, unknown>).type);
return (
type === "toolresult" ||
type === "tool_result" ||
type === "toolcall" ||
type === "tool_call" ||
type === "tooluse" ||
type === "tool_use"
);
})
);
}
function collectToolCallIds(message: unknown): string[] {
if (!message || typeof message !== "object") {
return [];
}
const entry = message as Record<string, unknown>;
const ids = [entry.toolCallId, entry.tool_call_id, entry.toolUseId, entry.tool_use_id, entry.id]
.filter((value): value is string => typeof value === "string" && value.trim().length > 0)
.map((value) => value.trim());
if (!Array.isArray(entry.content)) {
return ids;
}
for (const block of entry.content) {
if (!block || typeof block !== "object") {
continue;
}
const contentEntry = block as Record<string, unknown>;
for (const value of [
contentEntry.toolCallId,
contentEntry.tool_call_id,
contentEntry.toolUseId,
contentEntry.tool_use_id,
contentEntry.id,
]) {
if (typeof value === "string" && value.trim()) {
ids.push(value.trim());
}
}
}
return ids;
}
function currentLiveToolCallIds(state: ChatState): string[] {
const toolHost = state as ChatState & { toolStreamOrder?: unknown };
return Array.isArray(toolHost.toolStreamOrder)
? toolHost.toolStreamOrder.filter(
(value): value is string => typeof value === "string" && value.trim().length > 0,
)
: [];
}
function persistedCurrentToolStreamIds(messages: unknown[], state: ChatState): Set<string> {
const liveToolIds = currentLiveToolCallIds(state);
const matchedToolIds = new Set<string>();
if (liveToolIds.length === 0) {
return matchedToolIds;
}
const liveToolIdSet = new Set(liveToolIds);
const persistedToolIds = new Set<string>();
for (const message of messages.slice(lastUserMessageIndex(messages) + 1)) {
if (!isPersistedToolHistoryMessage(message)) {
continue;
}
for (const id of collectToolCallIds(message)) {
persistedToolIds.add(id);
}
}
for (const id of persistedToolIds) {
if (liveToolIdSet.has(id)) {
matchedToolIds.add(id);
}
}
return matchedToolIds;
}
function isTextOnlyContent(content: unknown): boolean {
if (typeof content === "string") {
return true;
@@ -197,6 +297,290 @@ function messageDisplaySignature(message: unknown): string | null {
}
}
function visibleAssistantStreamText(stream: string | null): string | null {
if (!stream?.trim() || isSilentReplyStream(stream) || isHeartbeatAckStream(stream)) {
return null;
}
return stream;
}
function buildAssistantStreamMessage(
stream: string,
replacementText = stream,
timestamp = Date.now(),
): Record<string, unknown> {
return {
role: "assistant",
content: [{ type: "text", text: stream }],
timestamp,
openclawStreamFallback: {
replacementText,
},
};
}
function streamFallbackReplacementText(message: unknown): string | null {
if (!message || typeof message !== "object") {
return null;
}
const fallback = (message as { openclawStreamFallback?: unknown }).openclawStreamFallback;
if (!fallback || typeof fallback !== "object") {
return null;
}
const replacementText = (fallback as { replacementText?: unknown }).replacementText;
if (typeof replacementText === "string" && replacementText.trim()) {
return replacementText.trim();
}
return extractText(message)?.trim() ?? null;
}
function terminalMessageReplacesStreamFallback(message: unknown, fallback: unknown): boolean {
const fallbackText = streamFallbackReplacementText(fallback);
if (!fallbackText) {
return false;
}
const terminalText = extractText(message)?.trim();
return Boolean(
terminalText && (terminalText === fallbackText || terminalText.startsWith(fallbackText)),
);
}
function appendTerminalAssistantMessage(messages: unknown[], message: unknown): unknown[] {
const retainedMessages = messages.filter((existing, index) => {
if (index <= lastUserMessageIndex(messages)) {
return true;
}
return !terminalMessageReplacesStreamFallback(message, existing);
});
return [...retainedMessages, message];
}
function lastUserMessageIndex(messages: unknown[]): number {
for (let index = messages.length - 1; index >= 0; index--) {
const message = messages[index];
if (!message || typeof message !== "object") {
continue;
}
const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role);
if (role === "user") {
return index;
}
}
return -1;
}
function hasAssistantStreamReplacement(messages: unknown[], stream: string): boolean {
const expected = stream.trim();
if (!expected) {
return false;
}
const startIndex = lastUserMessageIndex(messages) + 1;
return messages.slice(startIndex).some((message) => {
if (!message || typeof message !== "object") {
return false;
}
const role = normalizeLowercaseStringOrEmpty((message as { role?: unknown }).role);
if (role && role !== "assistant") {
return false;
}
if (role === "assistant" && shouldHideAssistantChatMessage(message)) {
return false;
}
const text = extractText(message)?.trim();
return Boolean(text && (text === expected || text.startsWith(expected)));
});
}
function trimAccumulatedVisibleStreamText(text: string, previousText: string | null): string {
if (!previousText || !text.startsWith(previousText)) {
return text;
}
return text.slice(previousText.length).trimStart();
}
type VisibleAssistantStreamPart = {
text: string;
replacementText: string;
source: "segment" | "current";
timestamp: number;
toolCallId?: string;
};
function visibleAssistantStreamParts(
state: ChatState,
opts?: { includeCurrent?: boolean },
): VisibleAssistantStreamPart[] {
const streamHost = state as ChatState & {
chatStreamSegments?: Array<{ text?: unknown; ts?: unknown; toolCallId?: unknown }>;
};
const liveToolIds = currentLiveToolCallIds(state);
const parts: VisibleAssistantStreamPart[] = [];
let previousText: string | null = null;
const segments = Array.isArray(streamHost.chatStreamSegments)
? streamHost.chatStreamSegments
: [];
for (let segmentIndex = 0; segmentIndex < segments.length; segmentIndex++) {
const segment = segments[segmentIndex];
if (!segment) {
continue;
}
if (typeof segment.text !== "string") {
continue;
}
const visible = visibleAssistantStreamText(
trimAccumulatedVisibleStreamText(segment.text, previousText),
);
if (visible) {
parts.push({
text: visible,
replacementText: segment.text,
source: "segment",
timestamp:
typeof segment.ts === "number" && Number.isFinite(segment.ts) ? segment.ts : Date.now(),
toolCallId:
typeof segment.toolCallId === "string" && segment.toolCallId.trim()
? segment.toolCallId.trim()
: liveToolIds[segmentIndex],
});
}
if (segment.text.trim()) {
previousText = segment.text;
}
}
if (opts?.includeCurrent !== false && typeof state.chatStream === "string") {
const visible = visibleAssistantStreamText(
trimAccumulatedVisibleStreamText(state.chatStream, previousText),
);
if (visible) {
parts.push({
text: visible,
replacementText: state.chatStream,
source: "current",
timestamp: state.chatStreamStartedAt ?? Date.now(),
});
}
}
return parts;
}
function visibleCurrentAssistantStreamTail(state: ChatState): string | null {
if (typeof state.chatStream !== "string") {
return null;
}
const streamHost = state as ChatState & {
chatStreamSegments?: Array<{ text?: unknown }>;
};
const segments = Array.isArray(streamHost.chatStreamSegments)
? streamHost.chatStreamSegments
: [];
let previousText: string | null = null;
for (const segment of segments) {
if (typeof segment.text === "string" && segment.text.trim()) {
previousText = segment.text;
}
}
return visibleAssistantStreamText(
trimAccumulatedVisibleStreamText(state.chatStream, previousText),
);
}
function hasAssistantStreamPartReplacement(
messages: unknown[],
part: VisibleAssistantStreamPart,
): boolean {
return (
hasAssistantStreamReplacement(messages, part.replacementText) ||
hasAssistantStreamReplacement(messages, part.text)
);
}
function currentToolStreamMessageIndex(
messages: unknown[],
state: ChatState,
toolCallId?: string,
): number {
const liveToolIds = toolCallId ? new Set([toolCallId]) : new Set(currentLiveToolCallIds(state));
if (liveToolIds.size === 0) {
return -1;
}
const startIndex = lastUserMessageIndex(messages) + 1;
for (let index = startIndex; index < messages.length; index++) {
const message = messages[index];
if (!isPersistedToolHistoryMessage(message)) {
continue;
}
if (collectToolCallIds(message).some((id) => liveToolIds.has(id))) {
return index;
}
}
return -1;
}
function insertMessageAtIndex(messages: unknown[], message: unknown, index: number): unknown[] {
return [...messages.slice(0, index), message, ...messages.slice(index)];
}
function timestampForInsertedVisibleStream(
messages: unknown[],
index: number,
desiredTimestamp: number,
): number {
const previousTimestamp = messages
.slice(0, index)
.toReversed()
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
const nextTimestamp = messages
.slice(index)
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
if (previousTimestamp != null && desiredTimestamp <= previousTimestamp) {
const afterPrevious = previousTimestamp + 1;
return nextTimestamp != null && afterPrevious >= nextTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: afterPrevious;
}
if (nextTimestamp != null && desiredTimestamp >= nextTimestamp) {
const beforeNext = nextTimestamp - 1;
return previousTimestamp != null && beforeNext <= previousTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: beforeNext;
}
return desiredTimestamp;
}
function appendVisibleStreamStateMessages(
messages: unknown[],
state: ChatState,
replacementMessages: unknown[] = [],
opts?: { includeCurrent?: boolean; requirePersistedTool?: boolean },
): unknown[] {
let nextMessages = messages;
for (const part of visibleAssistantStreamParts(state, opts)) {
if (hasAssistantStreamPartReplacement([...nextMessages, ...replacementMessages], part)) {
continue;
}
const toolIndex =
part.source === "segment"
? currentToolStreamMessageIndex(nextMessages, state, part.toolCallId)
: -1;
if (opts?.requirePersistedTool && toolIndex < 0) {
continue;
}
const insertIndex = toolIndex >= 0 ? toolIndex : nextMessages.length;
const streamMessage = buildAssistantStreamMessage(
part.text,
part.replacementText,
timestampForInsertedVisibleStream(nextMessages, insertIndex, part.timestamp),
);
nextMessages =
toolIndex >= 0
? insertMessageAtIndex(nextMessages, streamMessage, toolIndex)
: [...nextMessages, streamMessage];
}
return nextMessages;
}
function messageTimestampMs(message: unknown): number | null {
if (!message || typeof message !== "object") {
return null;
@@ -484,7 +868,7 @@ function chatEventSessionMatches(state: ChatState, payload: ChatEventPayload): b
);
}
function maybeResetToolStream(state: ChatState) {
function maybeResetToolStream(state: ChatState, opts?: { preserveStreamSegments?: boolean }) {
const toolHost = state as ChatState & Partial<Parameters<typeof resetToolStream>[0]>;
if (
toolHost.toolStreamById instanceof Map &&
@@ -492,10 +876,74 @@ function maybeResetToolStream(state: ChatState) {
Array.isArray(toolHost.chatToolMessages) &&
Array.isArray(toolHost.chatStreamSegments)
) {
const preservedStreamSegments = opts?.preserveStreamSegments
? [...toolHost.chatStreamSegments]
: null;
resetToolStream(toolHost as Parameters<typeof resetToolStream>[0]);
if (preservedStreamSegments) {
toolHost.chatStreamSegments = preservedStreamSegments;
}
}
}
function clearToolStreamSegments(state: ChatState) {
const toolHost = state as ChatState & { chatStreamSegments?: unknown };
if (Array.isArray(toolHost.chatStreamSegments)) {
toolHost.chatStreamSegments = [];
}
}
function prunePersistedToolStreamMessages(state: ChatState, persistedToolIds: Set<string>) {
if (persistedToolIds.size === 0) {
return;
}
const toolHost = state as ChatState & {
chatStreamSegments?: Array<{ text?: unknown; ts?: unknown; toolCallId?: unknown }>;
chatToolMessages?: unknown[];
toolStreamById?: Map<string, unknown>;
toolStreamOrder?: unknown[];
};
const liveToolIds = currentLiveToolCallIds(state);
if (toolHost.toolStreamById instanceof Map) {
for (const id of persistedToolIds) {
toolHost.toolStreamById.delete(id);
}
}
if (Array.isArray(toolHost.toolStreamOrder)) {
toolHost.toolStreamOrder = toolHost.toolStreamOrder.filter(
(id): id is string => typeof id === "string" && !persistedToolIds.has(id),
);
}
if (Array.isArray(toolHost.chatToolMessages)) {
toolHost.chatToolMessages = toolHost.chatToolMessages.filter((message) => {
const ids = collectToolCallIds(message);
return ids.every((id) => !persistedToolIds.has(id));
});
}
if (!Array.isArray(toolHost.chatStreamSegments)) {
return;
}
let lastPrunedAccumulatedText: string | null = null;
toolHost.chatStreamSegments = toolHost.chatStreamSegments.flatMap((segment, index) => {
const explicitToolCallId =
typeof segment.toolCallId === "string" && segment.toolCallId.trim()
? segment.toolCallId.trim()
: null;
const toolCallId = explicitToolCallId ?? liveToolIds[index] ?? null;
const text = typeof segment.text === "string" ? segment.text : "";
if (toolCallId && persistedToolIds.has(toolCallId)) {
if (text.trim()) {
lastPrunedAccumulatedText = text;
}
return [];
}
const nextText = lastPrunedAccumulatedText
? trimAccumulatedVisibleStreamText(text, lastPrunedAccumulatedText)
: text;
return [{ ...segment, text: nextText }];
});
}
function resolveDeltaChatStreamText(
currentStream: string | null,
payload: ChatEventPayload,
@@ -709,18 +1157,62 @@ async function loadChatHistoryUncached(
state.chatThinkingLevel = res.sessionInfo?.thinkingLevel ?? res.thinkingLevel ?? null;
const resetStream = !state.chatRunId || state.chatRunId === previousRunId;
if (resetStream) {
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
state.chatStream = null;
state.chatStreamStartedAt = null;
recordChatHistoryTiming(state, "stream-reset", startedAtMs, {
requestSessionKey: sessionKey,
requestAgentId,
previousRunId,
messageCount: messages.length,
visibleMessageCount: visibleMessages.length,
});
const visibleStreamParts = visibleAssistantStreamParts(state);
const historyReplacedStream =
visibleStreamParts.length > 0 &&
visibleStreamParts.every((part) =>
hasAssistantStreamPartReplacement(state.chatMessages, part),
);
const liveToolIds = currentLiveToolCallIds(state);
const persistedToolStreamIds = persistedCurrentToolStreamIds(state.chatMessages, state);
const historyReplacedToolStream =
liveToolIds.length > 0 && liveToolIds.every((id) => persistedToolStreamIds.has(id));
const historyReplacedSomeToolStream = persistedToolStreamIds.size > 0;
const liveToolStreamReplaced = liveToolIds.length === 0 || historyReplacedToolStream;
if (visibleStreamParts.length === 0 || historyReplacedStream) {
if (liveToolStreamReplaced) {
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
} else {
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
clearToolStreamSegments(state);
}
state.chatStream = null;
state.chatStreamStartedAt = null;
recordChatHistoryTiming(state, "stream-reset", startedAtMs, {
requestSessionKey: sessionKey,
requestAgentId,
previousRunId,
messageCount: messages.length,
visibleMessageCount: visibleMessages.length,
});
} else if (!state.chatRunId) {
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state);
maybeResetToolStream(state);
state.chatStream = null;
state.chatStreamStartedAt = null;
} else if (historyReplacedToolStream) {
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state, [], {
includeCurrent: false,
});
state.chatStream = visibleCurrentAssistantStreamTail(state);
if (state.chatStream === null) {
state.chatStreamStartedAt = null;
}
maybeResetToolStream(state);
} else if (historyReplacedSomeToolStream) {
const visibleCurrentTail = visibleCurrentAssistantStreamTail(state);
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state, [], {
includeCurrent: false,
requirePersistedTool: true,
});
state.chatStream = visibleCurrentTail;
if (state.chatStream === null) {
state.chatStreamStartedAt = null;
}
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
}
}
recordChatHistoryTiming(state, "applied", startedAtMs, {
requestSessionKey: sessionKey,
@@ -1130,48 +1622,47 @@ export function handleChatEvent(state: ChatState, payload?: ChatEventPayload) {
} else if (payload.state === "final") {
const finalMessage = normalizeFinalAssistantMessage(payload.message);
if (finalMessage && !shouldHideAssistantChatMessage(finalMessage)) {
state.chatMessages = [...state.chatMessages, finalMessage];
} else if (
state.chatStream?.trim() &&
!isSilentReplyStream(state.chatStream) &&
!isHeartbeatAckStream(state.chatStream)
) {
state.chatMessages = [
...state.chatMessages,
{
role: "assistant",
content: [{ type: "text", text: state.chatStream }],
timestamp: Date.now(),
},
];
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, finalMessage);
} else {
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state);
}
reconcileTerminalRun("done", "done");
} else if (payload.state === "aborted") {
const normalizedMessage = normalizeAbortedAssistantMessage(payload.message);
if (normalizedMessage && !shouldHideAssistantChatMessage(normalizedMessage)) {
state.chatMessages = [...state.chatMessages, normalizedMessage];
state.chatMessages = appendVisibleStreamStateMessages(
state.chatMessages,
state,
[normalizedMessage],
{ includeCurrent: false },
);
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, normalizedMessage);
} else {
const streamedText = state.chatStream ?? "";
if (
streamedText.trim() &&
!isSilentReplyStream(streamedText) &&
!isHeartbeatAckStream(streamedText)
) {
state.chatMessages = [
...state.chatMessages,
{
role: "assistant",
content: [{ type: "text", text: streamedText }],
timestamp: Date.now(),
},
];
}
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state);
}
reconcileTerminalRun("interrupted", "killed");
} else if (payload.state === "error") {
const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null;
if (errorMessage) {
state.chatMessages = [...state.chatMessages, errorMessage];
const payloadMessage = hadActiveRunBeforeEvent
? normalizeFinalAssistantMessage(payload.message)
: null;
const visiblePayloadMessage =
payloadMessage && !shouldHideAssistantChatMessage(payloadMessage) ? payloadMessage : null;
if (visiblePayloadMessage) {
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state, [
visiblePayloadMessage,
]);
state.chatMessages = appendTerminalAssistantMessage(
state.chatMessages,
visiblePayloadMessage,
);
} else {
const errorMessage = hadActiveRunBeforeEvent ? buildErrorAssistantMessage(payload) : null;
if (hadActiveRunBeforeEvent) {
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state);
}
if (errorMessage) {
state.chatMessages = appendTerminalAssistantMessage(state.chatMessages, errorMessage);
}
}
reconcileTerminalRun("interrupted", "failed");
setChatError(state, payload.errorMessage ?? "chat error");

View File

@@ -356,6 +356,7 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
thinkingLevel: null,
});
await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 });
await page.getByText("First token visible.").waitFor({ timeout: 10_000 });
await gateway.emitChatFinal({ runId, text: "History race stayed visible." });
await page.getByText("History race stayed visible.").waitFor({ timeout: 10_000 });
expect(await gateway.getRequests("agents.list")).toHaveLength(0);
@@ -364,6 +365,53 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
}
});
it("keeps streamed text visible when a chat error terminates the turn", async () => {
const context = await browser.newContext({
locale: "en-US",
serviceWorkers: "block",
viewport: { height: 900, width: 1280 },
});
const page = await context.newPage();
const gateway = await installMockGateway(page);
try {
await page.goto(`${server.baseUrl}chat`);
const prompt = "stream before terminal error";
await page.locator(".agent-chat__composer-combobox textarea").fill(prompt);
await page.getByRole("button", { name: "Send message" }).click();
const sendRequest = await gateway.waitForRequest("chat.send");
const params = requireRecord(sendRequest.params);
const runId = requireString(params.idempotencyKey, "chat send idempotency key");
const partialText = "Partial answer before gateway error.";
await gateway.emitGatewayEvent("chat", {
deltaText: partialText,
message: {
content: [{ text: partialText, type: "text" }],
role: "assistant",
timestamp: Date.now(),
},
runId,
sessionKey: "main",
state: "delta",
});
await page.getByText(partialText).waitFor({ timeout: 10_000 });
await gateway.emitGatewayEvent("chat", {
errorMessage: "gateway disconnected",
runId,
sessionKey: "main",
state: "error",
});
await page.getByText(partialText).waitFor({ timeout: 10_000 });
await page.getByText("Error: gateway disconnected").waitFor({ timeout: 10_000 });
} finally {
await context.close();
}
});
it("keeps a delayed chat.send ACK visible as pending until the ACK resolves", async () => {
const context = await browser.newContext({
locale: "en-US",