Compare commits

...

2 Commits

Author SHA1 Message Date
Onur Solmaz
44e4fc2b51 fix(ui): prune persisted live tool cards 2026-06-03 11:16:32 +08:00
Onur Solmaz
c1898ef7a4 fix(ui): keep live stream below prompt 2026-06-03 11:09:26 +08:00
4 changed files with 266 additions and 16 deletions

View File

@@ -413,6 +413,31 @@ describe("buildChatItems", () => {
expect(messageRecord(requireGroup(items[1])).content).toBe("Missing timestamp.");
});
it("renders an active stream after the persisted user turn it answers", () => {
const items = buildChatItems(
createProps({
messages: [
{
role: "user",
content: [{ type: "text", text: "Persisted prompt." }],
timestamp: 2_000,
},
],
stream: "Visible partial answer.",
streamStartedAt: 1_000,
}),
);
expect(items).toHaveLength(2);
expect(requireGroup(items[0]).role).toBe("user");
expect(items[1]).toMatchObject({
kind: "stream",
text: "Visible partial answer.",
startedAt: 2_001,
isStreaming: true,
});
});
it("renders submitted queued sends as user turns before chat.send ACK", () => {
const groups = messageGroups({
messages: [{ role: "assistant", content: "Ready.", timestamp: 1 }],

View File

@@ -348,6 +348,19 @@ function chatItemTimestamp(item: ChatItem): number | null {
return null;
}
function timestampAfterVisibleItems(items: ChatItem[], desiredTimestamp: number): number {
const latestTimestamp = items.reduce<number | null>((latest, item) => {
const timestamp = chatItemTimestamp(item);
if (timestamp == null) {
return latest;
}
return latest == null || timestamp > latest ? timestamp : latest;
}, null);
return latestTimestamp != null && desiredTimestamp <= latestTimestamp
? latestTimestamp + 1
: desiredTimestamp;
}
function sortChatItemsByVisibleTime(items: ChatItem[]): ChatItem[] {
return items
.map((item, index) => ({ item, index, timestamp: chatItemTimestamp(item) }))
@@ -647,13 +660,14 @@ export function buildChatItems(props: BuildChatItemsProps): Array<ChatItem | Mes
const key = `stream:${props.sessionKey}:${props.streamStartedAt ?? "live"}`;
const text = sanitizeStreamText(props.stream);
const visibleText = trimAccumulatedStreamPrefix(text, previousAccumulatedStreamText);
const startedAt = timestampAfterVisibleItems(items, props.streamStartedAt ?? Date.now());
if (visibleText.length > 0) {
if (!stripHeartbeatTokenForDisplay(visibleText).shouldSkip) {
items.push({
kind: "stream",
key,
text: visibleText,
startedAt: props.streamStartedAt ?? Date.now(),
startedAt,
isStreaming: true,
});
}

View File

@@ -2376,6 +2376,80 @@ describe("loadChatHistory retry handling", () => {
expect(state.toolStreamOrder).toEqual([]);
});
it("prunes only the live tool cards that history has caught up with", async () => {
const persistedUser = {
role: "user",
content: [{ type: "text", text: "latest ask" }],
__openclaw: { seq: 1 },
};
const firstToolResult = {
role: "toolResult",
toolCallId: "call_1",
toolName: "shell",
content: [{ type: "text", text: "first output" }],
timestamp: 2,
__openclaw: { seq: 2 },
};
const secondLiveToolResult = {
role: "assistant",
toolCallId: "call_2",
runId: "run-1",
content: [
{ type: "toolcall", name: "shell", arguments: {} },
{ type: "toolresult", name: "shell", text: "second output" },
],
timestamp: 4,
};
const request = vi.fn().mockResolvedValue({
messages: [persistedUser, firstToolResult],
thinkingLevel: "low",
});
const state = createState({
connected: true,
client: { request } as unknown as ChatState["client"],
chatMessages: [persistedUser],
chatRunId: "run-1",
chatStream: "before first tool\nbefore second tool\nStill answering.",
chatStreamStartedAt: 100,
}) as ChatState & {
chatStreamSegments: Array<{ text: string; ts: number; toolCallId?: string }>;
chatToolMessages: Record<string, unknown>[];
toolStreamById: Map<string, unknown>;
toolStreamOrder: string[];
toolStreamSyncTimer: number | null;
};
state.chatStreamSegments = [
{ text: "before first tool", ts: 1, toolCallId: "call_1" },
{
text: "before first tool\nbefore second tool",
ts: 3,
toolCallId: "call_2",
},
];
state.chatToolMessages = [firstToolResult, secondLiveToolResult];
state.toolStreamById = new Map([
["call_1", { message: firstToolResult }],
["call_2", { message: secondLiveToolResult }],
]);
state.toolStreamOrder = ["call_1", "call_2"];
state.toolStreamSyncTimer = null;
await loadChatHistory(state);
expect(state.chatMessages).toHaveLength(3);
expect(state.chatMessages[0]).toEqual(persistedUser);
expectTextChatMessage(state.chatMessages[1], "assistant", "before first tool");
expect(state.chatMessages[2]).toEqual(firstToolResult);
expect(state.chatToolMessages).toEqual([secondLiveToolResult]);
expect(state.chatStreamSegments).toEqual([
{ text: "before second tool", ts: 3, toolCallId: "call_2" },
]);
expect(state.chatStream).toBe("Still answering.");
expect(state.toolStreamById.size).toBe(1);
expect(state.toolStreamById.has("call_2")).toBe(true);
expect(state.toolStreamOrder).toEqual(["call_2"]);
});
it("uses segment tool ids when a tool starts before any stream text", async () => {
const persistedUser = {
role: "user",
@@ -2703,6 +2777,40 @@ describe("loadChatHistory retry handling", () => {
expect(state.chatStreamStartedAt).toBeNull();
});
it("timestamps materialized streamed text after the persisted user prompt", async () => {
const persistedUser = {
role: "user",
content: [{ type: "text", text: "first" }],
timestamp: 200,
__openclaw: { seq: 1 },
};
const request = vi.fn().mockResolvedValue({
messages: [persistedUser],
thinkingLevel: "low",
});
const state = createState({
connected: true,
client: { request } as unknown as ChatState["client"],
chatMessages: [persistedUser],
chatRunId: null,
chatStream: "Partial answer before history catch-up.",
chatStreamStartedAt: 100,
});
await loadChatHistory(state);
expect(state.chatMessages).toHaveLength(2);
expect(state.chatMessages[0]).toEqual(persistedUser);
expectTextChatMessage(
state.chatMessages[1],
"assistant",
"Partial answer before history catch-up.",
);
expect(requireRecord(state.chatMessages[1]).timestamp).toBe(201);
expect(state.chatStream).toBeNull();
expect(state.chatStreamStartedAt).toBeNull();
});
it("materializes orphaned segment-only assistant text before clearing caught-up tools", async () => {
const persistedUser = {
role: "user",

View File

@@ -171,11 +171,13 @@ function currentLiveToolCallIds(state: ChatState): string[] {
: [];
}
function hasPersistedCurrentToolStreamMessages(messages: unknown[], state: ChatState): boolean {
function persistedCurrentToolStreamIds(messages: unknown[], state: ChatState): Set<string> {
const liveToolIds = currentLiveToolCallIds(state);
const matchedToolIds = new Set<string>();
if (liveToolIds.length === 0) {
return false;
return matchedToolIds;
}
const liveToolIdSet = new Set(liveToolIds);
const persistedToolIds = new Set<string>();
for (const message of messages.slice(lastUserMessageIndex(messages) + 1)) {
if (!isPersistedToolHistoryMessage(message)) {
@@ -185,7 +187,12 @@ function hasPersistedCurrentToolStreamMessages(messages: unknown[], state: ChatS
persistedToolIds.add(id);
}
}
return liveToolIds.every((id) => persistedToolIds.has(id));
for (const id of persistedToolIds) {
if (liveToolIdSet.has(id)) {
matchedToolIds.add(id);
}
}
return matchedToolIds;
}
function isTextOnlyContent(content: unknown): boolean {
@@ -513,26 +520,59 @@ function insertMessageAtIndex(messages: unknown[], message: unknown, index: numb
return [...messages.slice(0, index), message, ...messages.slice(index)];
}
function timestampForInsertedVisibleStream(
messages: unknown[],
index: number,
desiredTimestamp: number,
): number {
const previousTimestamp = messages
.slice(0, index)
.toReversed()
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
const nextTimestamp = messages
.slice(index)
.map(messageTimestampMs)
.find((timestamp): timestamp is number => timestamp != null);
if (previousTimestamp != null && desiredTimestamp <= previousTimestamp) {
const afterPrevious = previousTimestamp + 1;
return nextTimestamp != null && afterPrevious >= nextTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: afterPrevious;
}
if (nextTimestamp != null && desiredTimestamp >= nextTimestamp) {
const beforeNext = nextTimestamp - 1;
return previousTimestamp != null && beforeNext <= previousTimestamp
? previousTimestamp + (nextTimestamp - previousTimestamp) / 2
: beforeNext;
}
return desiredTimestamp;
}
function appendVisibleStreamStateMessages(
messages: unknown[],
state: ChatState,
replacementMessages: unknown[] = [],
opts?: { includeCurrent?: boolean },
opts?: { includeCurrent?: boolean; requirePersistedTool?: boolean },
): unknown[] {
let nextMessages = messages;
for (const part of visibleAssistantStreamParts(state, opts)) {
if (hasAssistantStreamPartReplacement([...nextMessages, ...replacementMessages], part)) {
continue;
}
const streamMessage = buildAssistantStreamMessage(
part.text,
part.replacementText,
part.timestamp,
);
const toolIndex =
part.source === "segment"
? currentToolStreamMessageIndex(nextMessages, state, part.toolCallId)
: -1;
if (opts?.requirePersistedTool && toolIndex < 0) {
continue;
}
const insertIndex = toolIndex >= 0 ? toolIndex : nextMessages.length;
const streamMessage = buildAssistantStreamMessage(
part.text,
part.replacementText,
timestampForInsertedVisibleStream(nextMessages, insertIndex, part.timestamp),
);
nextMessages =
toolIndex >= 0
? insertMessageAtIndex(nextMessages, streamMessage, toolIndex)
@@ -853,6 +893,57 @@ function clearToolStreamSegments(state: ChatState) {
}
}
function prunePersistedToolStreamMessages(state: ChatState, persistedToolIds: Set<string>) {
if (persistedToolIds.size === 0) {
return;
}
const toolHost = state as ChatState & {
chatStreamSegments?: Array<{ text?: unknown; ts?: unknown; toolCallId?: unknown }>;
chatToolMessages?: unknown[];
toolStreamById?: Map<string, unknown>;
toolStreamOrder?: unknown[];
};
const liveToolIds = currentLiveToolCallIds(state);
if (toolHost.toolStreamById instanceof Map) {
for (const id of persistedToolIds) {
toolHost.toolStreamById.delete(id);
}
}
if (Array.isArray(toolHost.toolStreamOrder)) {
toolHost.toolStreamOrder = toolHost.toolStreamOrder.filter(
(id): id is string => typeof id === "string" && !persistedToolIds.has(id),
);
}
if (Array.isArray(toolHost.chatToolMessages)) {
toolHost.chatToolMessages = toolHost.chatToolMessages.filter((message) => {
const ids = collectToolCallIds(message);
return ids.every((id) => !persistedToolIds.has(id));
});
}
if (!Array.isArray(toolHost.chatStreamSegments)) {
return;
}
let lastPrunedAccumulatedText: string | null = null;
toolHost.chatStreamSegments = toolHost.chatStreamSegments.flatMap((segment, index) => {
const explicitToolCallId =
typeof segment.toolCallId === "string" && segment.toolCallId.trim()
? segment.toolCallId.trim()
: null;
const toolCallId = explicitToolCallId ?? liveToolIds[index] ?? null;
const text = typeof segment.text === "string" ? segment.text : "";
if (toolCallId && persistedToolIds.has(toolCallId)) {
if (text.trim()) {
lastPrunedAccumulatedText = text;
}
return [];
}
const nextText = lastPrunedAccumulatedText
? trimAccumulatedVisibleStreamText(text, lastPrunedAccumulatedText)
: text;
return [{ ...segment, text: nextText }];
});
}
function resolveDeltaChatStreamText(
currentStream: string | null,
payload: ChatEventPayload,
@@ -1072,18 +1163,19 @@ async function loadChatHistoryUncached(
visibleStreamParts.every((part) =>
hasAssistantStreamPartReplacement(state.chatMessages, part),
);
const historyReplacedToolStream = hasPersistedCurrentToolStreamMessages(
state.chatMessages,
state,
);
const liveToolStreamReplaced =
currentLiveToolCallIds(state).length === 0 || historyReplacedToolStream;
const liveToolIds = currentLiveToolCallIds(state);
const persistedToolStreamIds = persistedCurrentToolStreamIds(state.chatMessages, state);
const historyReplacedToolStream =
liveToolIds.length > 0 && liveToolIds.every((id) => persistedToolStreamIds.has(id));
const historyReplacedSomeToolStream = persistedToolStreamIds.size > 0;
const liveToolStreamReplaced = liveToolIds.length === 0 || historyReplacedToolStream;
if (visibleStreamParts.length === 0 || historyReplacedStream) {
if (liveToolStreamReplaced) {
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
} else {
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
clearToolStreamSegments(state);
}
state.chatStream = null;
@@ -1109,6 +1201,17 @@ async function loadChatHistoryUncached(
state.chatStreamStartedAt = null;
}
maybeResetToolStream(state);
} else if (historyReplacedSomeToolStream) {
const visibleCurrentTail = visibleCurrentAssistantStreamTail(state);
state.chatMessages = appendVisibleStreamStateMessages(state.chatMessages, state, [], {
includeCurrent: false,
requirePersistedTool: true,
});
state.chatStream = visibleCurrentTail;
if (state.chatStream === null) {
state.chatStreamStartedAt = null;
}
prunePersistedToolStreamMessages(state, persistedToolStreamIds);
}
}
recordChatHistoryTiming(state, "applied", startedAtMs, {