From ac9a219692aa1d89f7ef7c132666a3dc82c4078e Mon Sep 17 00:00:00 2001 From: Dallin Romney Date: Fri, 5 Jun 2026 12:09:24 -0700 Subject: [PATCH] fix(tui): stabilize optimistic user messages across history reloads, runId reassignment, and abort (#86205) * fix(tui): preserve optimistic user messages * refactor(tui): drop unused pending-user chat-log helpers * fix(tui): reconcile optimistic user row across runId reassignment and abort * refactor(tui): reuse asDateTimestampMs for history timestamp coercion * test(tui): fix event-handler chatLog render mock arity --- src/tui/components/chat-log.test.ts | 14 +-- src/tui/components/chat-log.ts | 20 +++-- src/tui/tui-command-handlers.test.ts | 124 +++++++++++++++++++++------ src/tui/tui-command-handlers.ts | 21 ++++- src/tui/tui-event-handlers.test.ts | 25 ++++++ src/tui/tui-event-handlers.ts | 14 +++ src/tui/tui-session-actions.test.ts | 108 +++++++++++++++++++++++ src/tui/tui-session-actions.ts | 26 +++++- src/tui/tui-types.ts | 1 + src/tui/tui.ts | 9 ++ 10 files changed, 322 insertions(+), 40 deletions(-) diff --git a/src/tui/components/chat-log.test.ts b/src/tui/components/chat-log.test.ts index 9dccff77e63d..c16a850eedab 100644 --- a/src/tui/components/chat-log.test.ts +++ b/src/tui/components/chat-log.test.ts @@ -174,15 +174,19 @@ describe("ChatLog", () => { expect(chatLog.render(120).join("\n")).toContain("queued hello"); }); - it("stops counting a pending user message once the run is committed", () => { + it("re-keys a pending user in place without moving its position", () => { const chatLog = new ChatLog(40); - chatLog.addPendingUser("run-1", "hello"); - expect(chatLog.countPendingUsers()).toBe(1); + chatLog.addPendingUser("local", "queued hello", 1_000); + chatLog.startAssistant("hi there", "r-accepted"); - expect(chatLog.commitPendingUser("run-1")).toBe(true); + expect(chatLog.rekeyPendingUser("local", "r-accepted")).toBe(true); + + const rendered = chatLog.render(120).join("\n"); + expect(rendered.indexOf("queued hello")).toBeLessThan(rendered.indexOf("hi there")); + // The row is now addressable by the gateway-assigned runId. + expect(chatLog.dropPendingUser("r-accepted")).toBe(true); expect(chatLog.countPendingUsers()).toBe(0); - expect(chatLog.render(120).join("\n")).toContain("hello"); }); it("reconciles pending users against rebuilt history using timestamps", () => { diff --git a/src/tui/components/chat-log.ts b/src/tui/components/chat-log.ts index 0fa753dba9d4..3ddbafce21a3 100644 --- a/src/tui/components/chat-log.ts +++ b/src/tui/components/chat-log.ts @@ -198,10 +198,6 @@ export class ChatLog extends Container { return component; } - commitPendingUser(runId: string) { - return this.pendingUsers.delete(runId); - } - dropPendingUser(runId: string) { const existing = this.pendingUsers.get(runId); if (!existing) { @@ -212,8 +208,20 @@ export class ChatLog extends Container { return true; } - hasPendingUser(runId: string) { - return this.pendingUsers.has(runId); + // Re-key in place: the gateway can assign its own runId after the optimistic + // row is rendered. Swap the map key without re-mounting the component so the + // row keeps its transcript position even if a reply already rendered below it. + rekeyPendingUser(fromRunId: string, toRunId: string) { + if (fromRunId === toRunId) { + return false; + } + const existing = this.pendingUsers.get(fromRunId); + if (!existing) { + return false; + } + this.pendingUsers.delete(fromRunId); + this.pendingUsers.set(toRunId, existing); + return true; } reconcilePendingUsers( diff --git a/src/tui/tui-command-handlers.test.ts b/src/tui/tui-command-handlers.test.ts index 580cba9123aa..186e9a6fe5a2 100644 --- a/src/tui/tui-command-handlers.test.ts +++ b/src/tui/tui-command-handlers.test.ts @@ -90,6 +90,7 @@ function createHarness(params?: { currentSessionKey?: string; abortActive?: AbortActiveMock; consumeCompletedRunForPendingSend?: ConsumeCompletedRunMock; + isRunObserved?: (runId: string) => boolean; flushPendingHistoryRefreshIfIdle?: FlushPendingHistoryRefreshMock; }) { const sendChat = params?.sendChat ?? vi.fn().mockResolvedValue({ runId: "r1" }); @@ -103,6 +104,9 @@ function createHarness(params?: { const setEmptySession = params?.setEmptySession ?? (vi.fn().mockResolvedValue(undefined) as SetEmptySessionMock); const addUser = vi.fn(); + const addPendingUser = vi.fn(); + const dropPendingUser = vi.fn(); + const rekeyPendingUser = vi.fn(); const addSystem = vi.fn(); const clearTools = vi.fn(); const reserveAssistantSlot = vi.fn(); @@ -133,6 +137,7 @@ function createHarness(params?: { activeChatRunId: params?.activeChatRunId ?? null, pendingOptimisticUserMessage: params?.pendingOptimisticUserMessage ?? false, pendingChatRunId: params?.pendingChatRunId ?? null, + pendingSubmitDraft: null as { runId: string; text: string } | null, activityStatus: params?.activityStatus ?? "idle", isConnected: params?.isConnected ?? true, sessionInfo: {}, @@ -148,7 +153,15 @@ function createHarness(params?: { resetSession, runGoalCommand, } as never, - chatLog: { addUser, addSystem, clearTools, reserveAssistantSlot } as never, + chatLog: { + addUser, + addPendingUser, + dropPendingUser, + rekeyPendingUser, + addSystem, + clearTools, + reserveAssistantSlot, + } as never, tui: { requestRender } as never, opts: params?.opts ?? {}, state: state as never, @@ -170,6 +183,7 @@ function createHarness(params?: { forgetLocalRunId, forgetLocalBtwRunId: vi.fn(), consumeCompletedRunForPendingSend: params?.consumeCompletedRunForPendingSend, + isRunObserved: params?.isRunObserved, flushPendingHistoryRefreshIfIdle: params?.flushPendingHistoryRefreshIfIdle, runAuthFlow, requestExit, @@ -190,6 +204,9 @@ function createHarness(params?: { setSession, setEmptySession, addUser, + addPendingUser, + dropPendingUser, + rekeyPendingUser, addSystem, clearTools, reserveAssistantSlot, @@ -264,12 +281,12 @@ describe("tui command handlers", () => { }); it("forwards unknown slash commands to the gateway", async () => { - const { handleCommand, sendChat, addUser, addSystem, requestRender } = createHarness(); + const { handleCommand, sendChat, addPendingUser, addSystem, requestRender } = createHarness(); await handleCommand("/unregistered-command"); expect(addSystem).not.toHaveBeenCalled(); - expect(addUser).toHaveBeenCalledWith("/unregistered-command"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/unregistered-command"); expectSendChatFields(sendChat, { sessionKey: "agent:main:main", message: "/unregistered-command", @@ -277,6 +294,49 @@ describe("tui command handlers", () => { expect(requestRender).toHaveBeenCalled(); }); + it("re-keys the optimistic pending row to the gateway-accepted runId in place", async () => { + const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" }); + const harness = createHarness({ sendChat }); + + await harness.handleCommand("hello"); + + const localRunId = harness.addPendingUser.mock.calls[0]?.[0]; + expect(localRunId).toEqual(expect.any(String)); + expect(localRunId).not.toBe("r-accepted"); + // Re-key happens in place (no drop/re-add) so the row keeps its position. + expect(harness.rekeyPendingUser).toHaveBeenCalledWith(localRunId, "r-accepted"); + expect(harness.addPendingUser).toHaveBeenCalledTimes(1); + expect(harness.dropPendingUser).not.toHaveBeenCalled(); + expect(harness.state.pendingSubmitDraft).toEqual({ runId: "r-accepted", text: "hello" }); + }); + + it("does not re-arm the submit draft when the accepted run already emitted events", async () => { + const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" }); + const isRunObserved = vi.fn((runId: string) => runId === "r-accepted"); + const harness = createHarness({ sendChat, isRunObserved }); + + await harness.handleCommand("hello"); + + // The accepted run already registered, so the draft must not be re-armed — + // otherwise a later abort would drop a row whose reply already rendered. + expect(harness.rekeyPendingUser).toHaveBeenCalledWith(expect.any(String), "r-accepted"); + expect(harness.state.pendingSubmitDraft).toBeNull(); + }); + + it("clears the submit draft when the accepted run already completed", async () => { + const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" }); + const consumeCompletedRunForPendingSend = vi + .fn() + .mockReturnValue(true) as ConsumeCompletedRunMock; + const harness = createHarness({ sendChat, consumeCompletedRunForPendingSend }); + + await harness.handleCommand("hello"); + + expect(harness.addPendingUser).toHaveBeenCalledTimes(1); + expect(harness.dropPendingUser).not.toHaveBeenCalled(); + expect(harness.state.pendingSubmitDraft).toBeNull(); + }); + it("passes the current backing session id when sending to the gateway", async () => { const { handleCommand, sendChat } = createHarness({ currentSessionId: "session-before-relaunch", @@ -293,10 +353,11 @@ describe("tui command handlers", () => { it("starts local goals and sends the objective to the model", async () => { const runGoalCommand = vi.fn().mockResolvedValue({ text: "Goal started: ship" }); - const { handleCommand, sendChat, addSystem, refreshSessionInfo, addUser } = createHarness({ - opts: { local: true }, - runGoalCommand, - }); + const { handleCommand, sendChat, addSystem, refreshSessionInfo, addPendingUser } = + createHarness({ + opts: { local: true }, + runGoalCommand, + }); await handleCommand("/goal start ship"); @@ -309,7 +370,7 @@ describe("tui command handlers", () => { sessionKey: "agent:main:main", message: "ship", }); - expect(addUser).toHaveBeenCalledWith("ship"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "ship"); expect(addSystem).toHaveBeenCalledWith("Goal started: ship"); expect(refreshSessionInfo).toHaveBeenCalled(); }); @@ -327,7 +388,7 @@ describe("tui command handlers", () => { sessionKey: "agent:main:main", message: slashPrompt, }); - expect(slashHarness.addUser).toHaveBeenCalledWith(slashPrompt); + expect(slashHarness.addPendingUser).toHaveBeenCalledWith(expect.any(String), slashPrompt); const bangRunGoalCommand = vi.fn().mockResolvedValue({ text: "Goal started" }); const bangHarness = createHarness({ @@ -341,7 +402,7 @@ describe("tui command handlers", () => { sessionKey: "agent:main:main", message: bangPrompt, }); - expect(bangHarness.addUser).toHaveBeenCalledWith(bangPrompt); + expect(bangHarness.addPendingUser).toHaveBeenCalledWith(expect.any(String), bangPrompt); }); it("keeps local goal status as a control command", async () => { @@ -359,7 +420,7 @@ describe("tui command handlers", () => { it("wraps command-prefixed local goal resume notes before sending", async () => { const runGoalCommand = vi.fn().mockResolvedValue({ text: "Goal resumed: ship" }); - const { handleCommand, sendChat, addUser } = createHarness({ + const { handleCommand, sendChat, addPendingUser } = createHarness({ opts: { local: true }, runGoalCommand, }); @@ -371,7 +432,7 @@ describe("tui command handlers", () => { sessionKey: "agent:main:main", message: prompt, }); - expect(addUser).toHaveBeenCalledWith(prompt); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), prompt); }); it("passes the selected agent for local global goal commands", async () => { @@ -468,12 +529,12 @@ describe("tui command handlers", () => { }); it("forwards /status to the shared gateway command path", async () => { - const { handleCommand, sendChat, addUser, addSystem } = createHarness(); + const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness(); await handleCommand("/status"); expect(addSystem).not.toHaveBeenCalled(); - expect(addUser).toHaveBeenCalledWith("/status"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/status"); expectSendChatFields(sendChat, { sessionKey: "agent:main:main", message: "/status", @@ -608,10 +669,19 @@ describe("tui command handlers", () => { it("clears the pending runId if sendChat fails", async () => { const sendChat = vi.fn().mockRejectedValue(new Error("boom")); - const { handleCommand, state } = createHarness({ sendChat }); + const { + handleCommand, + sendChat: sendChatMock, + dropPendingUser, + state, + } = createHarness({ + sendChat, + }); await handleCommand("hello"); + const sentRunId = (firstMockArg(sendChatMock, "sendChat") as { runId: string }).runId; + expect(dropPendingUser).toHaveBeenCalledWith(sentRunId); expect(state.pendingChatRunId).toBeNull(); expect(state.pendingOptimisticUserMessage).toBe(false); }); @@ -837,7 +907,7 @@ describe("tui command handlers", () => { const { handleCommand, sendChat, - addUser, + addPendingUser, addSystem, reserveAssistantSlot, requestRender, @@ -857,9 +927,9 @@ describe("tui command handlers", () => { }); expect(reserveAssistantSlot).toHaveBeenCalledWith("run-active"); const reserveCallOrder = reserveAssistantSlot.mock.invocationCallOrder[0]; - const addUserCallOrder = addUser.mock.invocationCallOrder[0]; - expect(reserveCallOrder).toBeLessThan(addUserCallOrder); - expect(addUser).toHaveBeenCalledWith("/context detail"); + const addPendingUserCallOrder = addPendingUser.mock.invocationCallOrder[0]; + expect(reserveCallOrder).toBeLessThan(addPendingUserCallOrder); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/context detail"); expect(addSystem).not.toHaveBeenCalledWith( "agent is busy — press Esc to abort before sending a new message", ); @@ -869,7 +939,7 @@ describe("tui command handlers", () => { }); it("blocks gateway slash prompts while a run is active", async () => { - const { handleCommand, sendChat, addUser, addSystem } = createHarness({ + const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness({ activeChatRunId: "run-active", activityStatus: "streaming", }); @@ -877,7 +947,7 @@ describe("tui command handlers", () => { await handleCommand("/context detail"); expect(sendChat).not.toHaveBeenCalled(); - expect(addUser).not.toHaveBeenCalled(); + expect(addPendingUser).not.toHaveBeenCalled(); expect(addSystem).toHaveBeenCalledWith( "agent is busy — press Esc to abort before sending a new message", ); @@ -900,7 +970,7 @@ describe("tui command handlers", () => { it("sends slash stop to the backend when there is no tracked run", async () => { const abortActive = vi.fn().mockResolvedValue(undefined); - const { handleCommand, sendChat, addUser } = createHarness({ abortActive }); + const { handleCommand, sendChat, addPendingUser } = createHarness({ abortActive }); await handleCommand("/stop"); @@ -910,18 +980,18 @@ describe("tui command handlers", () => { message: "/stop", sessionKey: "agent:main:main", }); - expect(addUser).toHaveBeenCalledWith("/stop"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/stop"); }); it("sends broad stop-like text as a normal prompt when idle", async () => { const abortActive = vi.fn().mockResolvedValue(undefined); - const { handleCommand, sendChat, addUser } = createHarness({ abortActive }); + const { handleCommand, sendChat, addPendingUser } = createHarness({ abortActive }); await handleCommand("do not do that"); expect(abortActive).not.toHaveBeenCalled(); expect(sendChat).toHaveBeenCalledTimes(1); - expect(addUser).toHaveBeenCalledWith("do not do that"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "do not do that"); }); it("rejects normal sends while a queued submit is pending registration", async () => { @@ -941,7 +1011,7 @@ describe("tui command handlers", () => { }); it("allows local sends to queue while the current run is finishing", async () => { - const { handleCommand, sendChat, addUser, addSystem } = createHarness({ + const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness({ opts: { local: true }, activeChatRunId: "run-active", activityStatus: "finishing context", @@ -950,7 +1020,7 @@ describe("tui command handlers", () => { await handleCommand("/context detail"); expect(sendChat).toHaveBeenCalledTimes(1); - expect(addUser).toHaveBeenCalledWith("/context detail"); + expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/context detail"); expect(addSystem).not.toHaveBeenCalledWith( "agent is busy — press Esc to abort before sending a new message", ); diff --git a/src/tui/tui-command-handlers.ts b/src/tui/tui-command-handlers.ts index a69bb46ba803..aff7786e86fa 100644 --- a/src/tui/tui-command-handlers.ts +++ b/src/tui/tui-command-handlers.ts @@ -63,6 +63,7 @@ type CommandHandlerContext = { forgetLocalRunId?: (runId: string) => void; forgetLocalBtwRunId?: (runId: string) => void; consumeCompletedRunForPendingSend?: (runId: string) => boolean; + isRunObserved?: (runId: string) => boolean; flushPendingHistoryRefreshIfIdle?: () => void; runAuthFlow?: (params: { provider?: string; @@ -119,6 +120,7 @@ export function createCommandHandlers(context: CommandHandlerContext) { forgetLocalRunId, forgetLocalBtwRunId, consumeCompletedRunForPendingSend, + isRunObserved, flushPendingHistoryRefreshIfIdle, runAuthFlow, requestExit, @@ -748,7 +750,8 @@ export function createCommandHandlers(context: CommandHandlerContext) { ) { chatLog.reserveAssistantSlot(state.activeChatRunId); } - chatLog.addUser(text); + chatLog.addPendingUser(runId, text); + state.pendingSubmitDraft = { runId, text }; noteLocalRunId?.(runId); state.pendingOptimisticUserMessage = true; setActivityStatus("sending"); @@ -775,9 +778,21 @@ export function createCommandHandlers(context: CommandHandlerContext) { if (!acceptedRunAlreadyCompleted) { noteLocalRunId?.(acceptedRunId); } + if (state.pendingSubmitDraft?.runId === runId) { + // If the accepted run already emitted events, it is registered; + // re-arming the draft would let a later abort drop a row whose + // reply already rendered. + state.pendingSubmitDraft = isRunObserved?.(acceptedRunId) + ? null + : { runId: acceptedRunId, text }; + } + chatLog.rekeyPendingUser(runId, acceptedRunId); } if (state.pendingOptimisticUserMessage) { if (acceptedRunAlreadyCompleted) { + if (state.pendingSubmitDraft?.runId === acceptedRunId) { + state.pendingSubmitDraft = null; + } state.pendingOptimisticUserMessage = false; state.pendingChatRunId = null; setActivityStatus("idle"); @@ -803,6 +818,10 @@ export function createCommandHandlers(context: CommandHandlerContext) { state.pendingOptimisticUserMessage = false; state.pendingChatRunId = null; state.activeChatRunId = null; + if (state.pendingSubmitDraft?.runId === runId) { + state.pendingSubmitDraft = null; + } + chatLog.dropPendingUser(runId); } chatLog.addSystem(`${isBtw ? "btw failed" : "send failed"}: ${String(err)}`); if (!isBtw) { diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index 9ee6219c0f4f..6383c2262078 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -971,6 +971,31 @@ describe("tui-event-handlers: handleAgentEvent", () => { expect(loadHistory).not.toHaveBeenCalled(); }); + it("keeps pending user text after run binding until history catches up", () => { + const pendingUsers = new Map([["run-gateway", "queued hello"]]); + const chatLog = { + ...createMockChatLog(), + countPendingUsers: () => pendingUsers.size, + render: (_width: number) => Array.from(pendingUsers.values()), + }; + const { state, noteLocalRunId, handleChatEvent } = createHandlersHarness({ + chatLog: chatLog as unknown as HandlerChatLog, + state: { activeChatRunId: null, pendingOptimisticUserMessage: true }, + }); + noteLocalRunId("run-gateway"); + + handleChatEvent({ + runId: "run-gateway", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "working" }, + }); + + expect(state.pendingOptimisticUserMessage).toBe(false); + expect(chatLog.countPendingUsers()).toBe(1); + expect(chatLog.render(120).join("\n")).toContain("queued hello"); + }); + it("does not bind unknown gateway run ids while an optimistic message is pending", () => { const { state, loadHistory, isLocalRunId, handleChatEvent } = createHandlersHarness({ state: { activeChatRunId: null, pendingOptimisticUserMessage: true }, diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index 93a77dee71d1..0016d79be7d5 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -271,6 +271,12 @@ export function createEventHandlers(context: EventHandlerContext) { pruneRunMap(sessionRuns); }; + const markSubmittedRunRegistered = (runId: string) => { + if (state.pendingSubmitDraft?.runId === runId) { + state.pendingSubmitDraft = null; + } + }; + const noteFinalizedRun = (runId: string, opts?: { displayedFinal?: boolean }) => { finalizedRuns.set(runId, Date.now()); completedRuns.set(runId, Date.now()); @@ -573,6 +579,7 @@ export function createEventHandlers(context: EventHandlerContext) { clearPendingTerminalLifecycleError(evt.runId); chatLog.dismissPendingSystem(evt.runId); noteSessionRun(evt.runId); + markSubmittedRunRegistered(evt.runId); const isPendingChatRun = state.pendingChatRunId === evt.runId; const isLocalChatRun = isLocalRunId?.(evt.runId) ?? false; const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false; @@ -750,6 +757,7 @@ export function createEventHandlers(context: EventHandlerContext) { if (evt.stream === "lifecycle") { if (isPendingRun) { noteSessionRun(evt.runId); + markSubmittedRunRegistered(evt.runId); state.activeChatRunId = evt.runId; state.pendingChatRunId = null; if (state.pendingOptimisticUserMessage) { @@ -856,6 +864,11 @@ export function createEventHandlers(context: EventHandlerContext) { return true; }; + // True once any event for this runId has been seen, even before sendChat + // resolves. Lets the optimistic-submit path know an accepted run already + // registered so it does not re-arm a draft the abort path would then drop. + const isRunObserved = (runId: string) => sessionRuns.has(runId); + return { handleChatEvent, handleAgentEvent, @@ -863,6 +876,7 @@ export function createEventHandlers(context: EventHandlerContext) { pauseStreamingWatchdog, reconnectStreamingWatchdog, consumeCompletedRunForPendingSend, + isRunObserved, flushPendingHistoryRefreshIfIdle, dispose, }; diff --git a/src/tui/tui-session-actions.test.ts b/src/tui/tui-session-actions.test.ts index 898e43f6630d..fb8a0de0e813 100644 --- a/src/tui/tui-session-actions.test.ts +++ b/src/tui/tui-session-actions.test.ts @@ -41,7 +41,12 @@ describe("tui session actions", () => { client: { listSessions: vi.fn() } as unknown as TuiBackend, chatLog: { addSystem: vi.fn(), + addUser: vi.fn(), + finalizeAssistant: vi.fn(), + clearPendingUsers: vi.fn(), clearAll: vi.fn(), + reconcilePendingUsers: vi.fn().mockReturnValue([]), + restorePendingUsers: vi.fn(), } as unknown as import("./components/chat-log.js").ChatLog, btw: createBtwPresenter(), tui: { requestRender: vi.fn() } as unknown as import("@earendil-works/pi-tui").TUI, @@ -501,8 +506,11 @@ describe("tui session actions", () => { const chatLog = { addSystem: vi.fn(), clearAll: vi.fn(), + clearPendingUsers: vi.fn(), addUser: vi.fn(), finalizeAssistant: vi.fn(), + reconcilePendingUsers: vi.fn().mockReturnValue([]), + restorePendingUsers: vi.fn(), updateAssistant, startTool: vi.fn(), } as unknown as import("./components/chat-log.js").ChatLog; @@ -533,8 +541,11 @@ describe("tui session actions", () => { const chatLog = { addSystem: vi.fn(), clearAll: vi.fn(), + clearPendingUsers: vi.fn(), addUser: vi.fn(), finalizeAssistant: vi.fn(), + reconcilePendingUsers: vi.fn().mockReturnValue([]), + restorePendingUsers: vi.fn(), updateAssistant, startTool: vi.fn(), } as unknown as import("./components/chat-log.js").ChatLog; @@ -562,8 +573,11 @@ describe("tui session actions", () => { const chatLog = { addSystem: vi.fn(), clearAll: vi.fn(), + clearPendingUsers: vi.fn(), addUser: vi.fn(), finalizeAssistant: vi.fn(), + reconcilePendingUsers: vi.fn().mockReturnValue([]), + restorePendingUsers: vi.fn(), updateAssistant, startTool: vi.fn(), } as unknown as import("./components/chat-log.js").ChatLog; @@ -887,6 +901,58 @@ describe("tui session actions", () => { expect(setActivityStatus).toHaveBeenCalledWith("aborted"); }); + it("drops the optimistic pending row when aborting a not-yet-registered submit", async () => { + const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true }); + const dropPendingUser = vi.fn(); + const state = createBaseState({ + activeChatRunId: null, + pendingChatRunId: "run-1", + pendingOptimisticUserMessage: true, + pendingSubmitDraft: { runId: "run-1", text: "hello" }, + }); + + const { abortActive } = createTestSessionActions({ + client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend, + chatLog: { + addSystem: vi.fn(), + clearAll: vi.fn(), + dropPendingUser, + } as unknown as import("./components/chat-log.js").ChatLog, + state, + }); + + await abortActive(); + + expect(dropPendingUser).toHaveBeenCalledWith("run-1"); + expect(state.pendingSubmitDraft).toBeNull(); + expect(state.pendingOptimisticUserMessage).toBe(false); + }); + + it("keeps the optimistic row when aborting a run that already registered", async () => { + const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true }); + const dropPendingUser = vi.fn(); + const state = createBaseState({ + activeChatRunId: null, + pendingChatRunId: "run-1", + pendingOptimisticUserMessage: true, + pendingSubmitDraft: null, + }); + + const { abortActive } = createTestSessionActions({ + client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend, + chatLog: { + addSystem: vi.fn(), + clearAll: vi.fn(), + dropPendingUser, + } as unknown as import("./components/chat-log.js").ChatLog, + state, + }); + + await abortActive(); + + expect(dropPendingUser).not.toHaveBeenCalled(); + }); + it("passes the selected agent when aborting selected global runs", async () => { const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true }); const state = createBaseState({ @@ -1099,6 +1165,48 @@ describe("tui session actions", () => { expect(rememberSessionKey).toHaveBeenCalledWith("agent:main:main"); }); + it("preserves optimistic user messages across stale history rebuilds", async () => { + const listSessions = vi.fn().mockResolvedValue({ + ts: Date.now(), + path: "/tmp/sessions.json", + count: 1, + defaults: {}, + sessions: [{ key: "agent:main:main", sessionId: "session-main" }], + }); + const loadHistory = vi.fn().mockResolvedValue({ + sessionId: "session-main", + messages: [ + { role: "user", content: "persisted", timestamp: 2_000 }, + { role: "assistant", content: [{ type: "text", text: "reply" }] }, + ], + }); + const chatLog = { + addSystem: vi.fn(), + addUser: vi.fn(), + finalizeAssistant: vi.fn(), + clearAll: vi.fn(), + clearPendingUsers: vi.fn(), + reconcilePendingUsers: vi.fn().mockReturnValue([]), + restorePendingUsers: vi.fn(), + }; + + const { loadHistory: runLoadHistory } = createTestSessionActions({ + client: { + listSessions, + loadHistory, + } as unknown as TuiBackend, + chatLog: chatLog as unknown as import("./components/chat-log.js").ChatLog, + }); + + await runLoadHistory(); + + expect(chatLog.clearAll).toHaveBeenCalledWith({ preservePendingUsers: true }); + expect(chatLog.reconcilePendingUsers).toHaveBeenCalledWith([ + { text: "persisted", timestamp: 2_000 }, + ]); + expect(chatLog.restorePendingUsers).toHaveBeenCalledTimes(1); + }); + it("hydrates session info from chat history without listing sessions", async () => { const listSessions = vi.fn(); const loadHistory = vi.fn().mockResolvedValue({ diff --git a/src/tui/tui-session-actions.ts b/src/tui/tui-session-actions.ts index 13816e811671..6765b6fe263f 100644 --- a/src/tui/tui-session-actions.ts +++ b/src/tui/tui-session-actions.ts @@ -1,5 +1,6 @@ // Implements TUI session actions such as switching, forking, and resuming. import type { TUI } from "@earendil-works/pi-tui"; +import { asDateTimestampMs } from "@openclaw/normalization-core/number-coercion"; import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce"; import type { SessionsPatchResult } from "../../packages/gateway-protocol/src/index.js"; import { resolveSessionInfoModelSelection } from "../agents/model-selection-display.js"; @@ -93,6 +94,11 @@ function sessionInfoUiEquals(left: SessionInfo, right: SessionInfo): boolean { ); } +function extractMessageTimestamp(message: Record): number | null { + const raw = message.timestamp; + return asDateTimestampMs(typeof raw === "string" ? Date.parse(raw) : raw) ?? null; +} + export function createSessionActions(context: SessionActionContext) { const { client, @@ -449,7 +455,8 @@ export function createSessionActions(context: SessionActionContext) { await refreshSessionInfo(); } const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off"; - chatLog.clearAll(); + const historyUsers: Array<{ text: string; timestamp?: number | null }> = []; + chatLog.clearAll({ preservePendingUsers: true }); btw.clear(); chatLog.addSystem(`session ${state.currentSessionKey}`); for (const entry of record.messages ?? []) { @@ -467,6 +474,10 @@ export function createSessionActions(context: SessionActionContext) { if (message.role === "user") { const text = extractTextFromMessage(message); if (text) { + historyUsers.push({ + text, + timestamp: extractMessageTimestamp(message), + }); chatLog.addUser(text); } continue; @@ -501,6 +512,11 @@ export function createSessionActions(context: SessionActionContext) { ); } } + const reconciledRunIds = chatLog.reconcilePendingUsers(historyUsers); + if (state.pendingSubmitDraft && reconciledRunIds.includes(state.pendingSubmitDraft.runId)) { + state.pendingSubmitDraft = null; + } + chatLog.restorePendingUsers(); // Restore a run still streaming for this session+agent that the gateway // reports as in-flight. Its live deltas were delivered to a per-agent key // we stopped watching after switching away, so the persisted history above @@ -534,12 +550,14 @@ export function createSessionActions(context: SessionActionContext) { state.activeChatRunId = null; state.pendingChatRunId = null; state.pendingOptimisticUserMessage = false; + state.pendingSubmitDraft = null; setActivityStatus("idle"); state.currentSessionId = null; // Session keys can move backwards in updatedAt ordering; drop previous session freshness // so refresh data for the newly selected session isn't rejected as stale. state.sessionInfo.updatedAt = null; state.historyLoaded = false; + chatLog.clearPendingUsers(); clearLocalRunIds?.(); btw.clear(); updateHeader(); @@ -554,6 +572,7 @@ export function createSessionActions(context: SessionActionContext) { state.activeChatRunId = null; state.pendingChatRunId = null; state.pendingOptimisticUserMessage = false; + state.pendingSubmitDraft = null; setActivityStatus("idle"); state.currentSessionId = null; const defaults = lastSessionDefaults; @@ -604,6 +623,7 @@ export function createSessionActions(context: SessionActionContext) { const abortsPendingRun = Boolean( state.pendingChatRunId && runIds.includes(state.pendingChatRunId), ); + const pendingRunId = state.pendingChatRunId; try { for (const runId of runIds) { await client.abortChat({ @@ -615,6 +635,10 @@ export function createSessionActions(context: SessionActionContext) { state.pendingChatRunId = null; if (abortsPendingRun) { state.pendingOptimisticUserMessage = false; + if (pendingRunId && state.pendingSubmitDraft?.runId === pendingRunId) { + chatLog.dropPendingUser(pendingRunId); + state.pendingSubmitDraft = null; + } } setActivityStatus("aborted"); } catch (err) { diff --git a/src/tui/tui-types.ts b/src/tui/tui-types.ts index 3cea32b531c3..9ec02054eb8d 100644 --- a/src/tui/tui-types.ts +++ b/src/tui/tui-types.ts @@ -139,6 +139,7 @@ export type TuiStateAccess = { activeChatRunId: string | null; pendingOptimisticUserMessage?: boolean; pendingChatRunId?: string | null; + pendingSubmitDraft?: { runId: string; text: string } | null; queuedMessages?: QueuedMessage[]; historyLoaded: boolean; sessionInfo: SessionInfo; diff --git a/src/tui/tui.ts b/src/tui/tui.ts index 89c266bd2a43..ee1b39b6ca44 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -505,6 +505,7 @@ export async function runTui(opts: RunTuiOptions): Promise { let activeChatRunId: string | null = null; let pendingOptimisticUserMessage = false; let pendingChatRunId: string | null = null; + let pendingSubmitDraft: { runId: string; text: string } | null = null; let historyLoaded = false; let isConnected = false; let wasDisconnected = false; @@ -595,6 +596,12 @@ export async function runTui(opts: RunTuiOptions): Promise { set pendingChatRunId(value) { pendingChatRunId = value ?? null; }, + get pendingSubmitDraft() { + return pendingSubmitDraft; + }, + set pendingSubmitDraft(value) { + pendingSubmitDraft = value ?? null; + }, get historyLoaded() { return historyLoaded; }, @@ -1261,6 +1268,7 @@ export async function runTui(opts: RunTuiOptions): Promise { pauseStreamingWatchdog, reconnectStreamingWatchdog, consumeCompletedRunForPendingSend, + isRunObserved, flushPendingHistoryRefreshIfIdle, } = createEventHandlers({ chatLog, @@ -1354,6 +1362,7 @@ export async function runTui(opts: RunTuiOptions): Promise { forgetLocalRunId, forgetLocalBtwRunId, consumeCompletedRunForPendingSend, + isRunObserved, flushPendingHistoryRefreshIfIdle, runAuthFlow, requestExit,