fix(tui): stabilize optimistic user messages across history reloads, runId reassignment, and abort (#86205)

* fix(tui): preserve optimistic user messages

* refactor(tui): drop unused pending-user chat-log helpers

* fix(tui): reconcile optimistic user row across runId reassignment and abort

* refactor(tui): reuse asDateTimestampMs for history timestamp coercion

* test(tui): fix event-handler chatLog render mock arity
This commit is contained in:
Dallin Romney
2026-06-05 12:09:24 -07:00
committed by GitHub
parent db7d70ae4d
commit ac9a219692
10 changed files with 322 additions and 40 deletions

View File

@@ -174,15 +174,19 @@ describe("ChatLog", () => {
expect(chatLog.render(120).join("\n")).toContain("queued hello");
});
it("stops counting a pending user message once the run is committed", () => {
it("re-keys a pending user in place without moving its position", () => {
const chatLog = new ChatLog(40);
chatLog.addPendingUser("run-1", "hello");
expect(chatLog.countPendingUsers()).toBe(1);
chatLog.addPendingUser("local", "queued hello", 1_000);
chatLog.startAssistant("hi there", "r-accepted");
expect(chatLog.commitPendingUser("run-1")).toBe(true);
expect(chatLog.rekeyPendingUser("local", "r-accepted")).toBe(true);
const rendered = chatLog.render(120).join("\n");
expect(rendered.indexOf("queued hello")).toBeLessThan(rendered.indexOf("hi there"));
// The row is now addressable by the gateway-assigned runId.
expect(chatLog.dropPendingUser("r-accepted")).toBe(true);
expect(chatLog.countPendingUsers()).toBe(0);
expect(chatLog.render(120).join("\n")).toContain("hello");
});
it("reconciles pending users against rebuilt history using timestamps", () => {

View File

@@ -198,10 +198,6 @@ export class ChatLog extends Container {
return component;
}
commitPendingUser(runId: string) {
return this.pendingUsers.delete(runId);
}
dropPendingUser(runId: string) {
const existing = this.pendingUsers.get(runId);
if (!existing) {
@@ -212,8 +208,20 @@ export class ChatLog extends Container {
return true;
}
hasPendingUser(runId: string) {
return this.pendingUsers.has(runId);
// Re-key in place: the gateway can assign its own runId after the optimistic
// row is rendered. Swap the map key without re-mounting the component so the
// row keeps its transcript position even if a reply already rendered below it.
rekeyPendingUser(fromRunId: string, toRunId: string) {
if (fromRunId === toRunId) {
return false;
}
const existing = this.pendingUsers.get(fromRunId);
if (!existing) {
return false;
}
this.pendingUsers.delete(fromRunId);
this.pendingUsers.set(toRunId, existing);
return true;
}
reconcilePendingUsers(

View File

@@ -90,6 +90,7 @@ function createHarness(params?: {
currentSessionKey?: string;
abortActive?: AbortActiveMock;
consumeCompletedRunForPendingSend?: ConsumeCompletedRunMock;
isRunObserved?: (runId: string) => boolean;
flushPendingHistoryRefreshIfIdle?: FlushPendingHistoryRefreshMock;
}) {
const sendChat = params?.sendChat ?? vi.fn().mockResolvedValue({ runId: "r1" });
@@ -103,6 +104,9 @@ function createHarness(params?: {
const setEmptySession =
params?.setEmptySession ?? (vi.fn().mockResolvedValue(undefined) as SetEmptySessionMock);
const addUser = vi.fn();
const addPendingUser = vi.fn();
const dropPendingUser = vi.fn();
const rekeyPendingUser = vi.fn();
const addSystem = vi.fn();
const clearTools = vi.fn();
const reserveAssistantSlot = vi.fn();
@@ -133,6 +137,7 @@ function createHarness(params?: {
activeChatRunId: params?.activeChatRunId ?? null,
pendingOptimisticUserMessage: params?.pendingOptimisticUserMessage ?? false,
pendingChatRunId: params?.pendingChatRunId ?? null,
pendingSubmitDraft: null as { runId: string; text: string } | null,
activityStatus: params?.activityStatus ?? "idle",
isConnected: params?.isConnected ?? true,
sessionInfo: {},
@@ -148,7 +153,15 @@ function createHarness(params?: {
resetSession,
runGoalCommand,
} as never,
chatLog: { addUser, addSystem, clearTools, reserveAssistantSlot } as never,
chatLog: {
addUser,
addPendingUser,
dropPendingUser,
rekeyPendingUser,
addSystem,
clearTools,
reserveAssistantSlot,
} as never,
tui: { requestRender } as never,
opts: params?.opts ?? {},
state: state as never,
@@ -170,6 +183,7 @@ function createHarness(params?: {
forgetLocalRunId,
forgetLocalBtwRunId: vi.fn(),
consumeCompletedRunForPendingSend: params?.consumeCompletedRunForPendingSend,
isRunObserved: params?.isRunObserved,
flushPendingHistoryRefreshIfIdle: params?.flushPendingHistoryRefreshIfIdle,
runAuthFlow,
requestExit,
@@ -190,6 +204,9 @@ function createHarness(params?: {
setSession,
setEmptySession,
addUser,
addPendingUser,
dropPendingUser,
rekeyPendingUser,
addSystem,
clearTools,
reserveAssistantSlot,
@@ -264,12 +281,12 @@ describe("tui command handlers", () => {
});
it("forwards unknown slash commands to the gateway", async () => {
const { handleCommand, sendChat, addUser, addSystem, requestRender } = createHarness();
const { handleCommand, sendChat, addPendingUser, addSystem, requestRender } = createHarness();
await handleCommand("/unregistered-command");
expect(addSystem).not.toHaveBeenCalled();
expect(addUser).toHaveBeenCalledWith("/unregistered-command");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/unregistered-command");
expectSendChatFields(sendChat, {
sessionKey: "agent:main:main",
message: "/unregistered-command",
@@ -277,6 +294,49 @@ describe("tui command handlers", () => {
expect(requestRender).toHaveBeenCalled();
});
it("re-keys the optimistic pending row to the gateway-accepted runId in place", async () => {
const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" });
const harness = createHarness({ sendChat });
await harness.handleCommand("hello");
const localRunId = harness.addPendingUser.mock.calls[0]?.[0];
expect(localRunId).toEqual(expect.any(String));
expect(localRunId).not.toBe("r-accepted");
// Re-key happens in place (no drop/re-add) so the row keeps its position.
expect(harness.rekeyPendingUser).toHaveBeenCalledWith(localRunId, "r-accepted");
expect(harness.addPendingUser).toHaveBeenCalledTimes(1);
expect(harness.dropPendingUser).not.toHaveBeenCalled();
expect(harness.state.pendingSubmitDraft).toEqual({ runId: "r-accepted", text: "hello" });
});
it("does not re-arm the submit draft when the accepted run already emitted events", async () => {
const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" });
const isRunObserved = vi.fn((runId: string) => runId === "r-accepted");
const harness = createHarness({ sendChat, isRunObserved });
await harness.handleCommand("hello");
// The accepted run already registered, so the draft must not be re-armed —
// otherwise a later abort would drop a row whose reply already rendered.
expect(harness.rekeyPendingUser).toHaveBeenCalledWith(expect.any(String), "r-accepted");
expect(harness.state.pendingSubmitDraft).toBeNull();
});
it("clears the submit draft when the accepted run already completed", async () => {
const sendChat = vi.fn().mockResolvedValue({ runId: "r-accepted" });
const consumeCompletedRunForPendingSend = vi
.fn()
.mockReturnValue(true) as ConsumeCompletedRunMock;
const harness = createHarness({ sendChat, consumeCompletedRunForPendingSend });
await harness.handleCommand("hello");
expect(harness.addPendingUser).toHaveBeenCalledTimes(1);
expect(harness.dropPendingUser).not.toHaveBeenCalled();
expect(harness.state.pendingSubmitDraft).toBeNull();
});
it("passes the current backing session id when sending to the gateway", async () => {
const { handleCommand, sendChat } = createHarness({
currentSessionId: "session-before-relaunch",
@@ -293,10 +353,11 @@ describe("tui command handlers", () => {
it("starts local goals and sends the objective to the model", async () => {
const runGoalCommand = vi.fn().mockResolvedValue({ text: "Goal started: ship" });
const { handleCommand, sendChat, addSystem, refreshSessionInfo, addUser } = createHarness({
opts: { local: true },
runGoalCommand,
});
const { handleCommand, sendChat, addSystem, refreshSessionInfo, addPendingUser } =
createHarness({
opts: { local: true },
runGoalCommand,
});
await handleCommand("/goal start ship");
@@ -309,7 +370,7 @@ describe("tui command handlers", () => {
sessionKey: "agent:main:main",
message: "ship",
});
expect(addUser).toHaveBeenCalledWith("ship");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "ship");
expect(addSystem).toHaveBeenCalledWith("Goal started: ship");
expect(refreshSessionInfo).toHaveBeenCalled();
});
@@ -327,7 +388,7 @@ describe("tui command handlers", () => {
sessionKey: "agent:main:main",
message: slashPrompt,
});
expect(slashHarness.addUser).toHaveBeenCalledWith(slashPrompt);
expect(slashHarness.addPendingUser).toHaveBeenCalledWith(expect.any(String), slashPrompt);
const bangRunGoalCommand = vi.fn().mockResolvedValue({ text: "Goal started" });
const bangHarness = createHarness({
@@ -341,7 +402,7 @@ describe("tui command handlers", () => {
sessionKey: "agent:main:main",
message: bangPrompt,
});
expect(bangHarness.addUser).toHaveBeenCalledWith(bangPrompt);
expect(bangHarness.addPendingUser).toHaveBeenCalledWith(expect.any(String), bangPrompt);
});
it("keeps local goal status as a control command", async () => {
@@ -359,7 +420,7 @@ describe("tui command handlers", () => {
it("wraps command-prefixed local goal resume notes before sending", async () => {
const runGoalCommand = vi.fn().mockResolvedValue({ text: "Goal resumed: ship" });
const { handleCommand, sendChat, addUser } = createHarness({
const { handleCommand, sendChat, addPendingUser } = createHarness({
opts: { local: true },
runGoalCommand,
});
@@ -371,7 +432,7 @@ describe("tui command handlers", () => {
sessionKey: "agent:main:main",
message: prompt,
});
expect(addUser).toHaveBeenCalledWith(prompt);
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), prompt);
});
it("passes the selected agent for local global goal commands", async () => {
@@ -468,12 +529,12 @@ describe("tui command handlers", () => {
});
it("forwards /status to the shared gateway command path", async () => {
const { handleCommand, sendChat, addUser, addSystem } = createHarness();
const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness();
await handleCommand("/status");
expect(addSystem).not.toHaveBeenCalled();
expect(addUser).toHaveBeenCalledWith("/status");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/status");
expectSendChatFields(sendChat, {
sessionKey: "agent:main:main",
message: "/status",
@@ -608,10 +669,19 @@ describe("tui command handlers", () => {
it("clears the pending runId if sendChat fails", async () => {
const sendChat = vi.fn().mockRejectedValue(new Error("boom"));
const { handleCommand, state } = createHarness({ sendChat });
const {
handleCommand,
sendChat: sendChatMock,
dropPendingUser,
state,
} = createHarness({
sendChat,
});
await handleCommand("hello");
const sentRunId = (firstMockArg(sendChatMock, "sendChat") as { runId: string }).runId;
expect(dropPendingUser).toHaveBeenCalledWith(sentRunId);
expect(state.pendingChatRunId).toBeNull();
expect(state.pendingOptimisticUserMessage).toBe(false);
});
@@ -837,7 +907,7 @@ describe("tui command handlers", () => {
const {
handleCommand,
sendChat,
addUser,
addPendingUser,
addSystem,
reserveAssistantSlot,
requestRender,
@@ -857,9 +927,9 @@ describe("tui command handlers", () => {
});
expect(reserveAssistantSlot).toHaveBeenCalledWith("run-active");
const reserveCallOrder = reserveAssistantSlot.mock.invocationCallOrder[0];
const addUserCallOrder = addUser.mock.invocationCallOrder[0];
expect(reserveCallOrder).toBeLessThan(addUserCallOrder);
expect(addUser).toHaveBeenCalledWith("/context detail");
const addPendingUserCallOrder = addPendingUser.mock.invocationCallOrder[0];
expect(reserveCallOrder).toBeLessThan(addPendingUserCallOrder);
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/context detail");
expect(addSystem).not.toHaveBeenCalledWith(
"agent is busy — press Esc to abort before sending a new message",
);
@@ -869,7 +939,7 @@ describe("tui command handlers", () => {
});
it("blocks gateway slash prompts while a run is active", async () => {
const { handleCommand, sendChat, addUser, addSystem } = createHarness({
const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness({
activeChatRunId: "run-active",
activityStatus: "streaming",
});
@@ -877,7 +947,7 @@ describe("tui command handlers", () => {
await handleCommand("/context detail");
expect(sendChat).not.toHaveBeenCalled();
expect(addUser).not.toHaveBeenCalled();
expect(addPendingUser).not.toHaveBeenCalled();
expect(addSystem).toHaveBeenCalledWith(
"agent is busy — press Esc to abort before sending a new message",
);
@@ -900,7 +970,7 @@ describe("tui command handlers", () => {
it("sends slash stop to the backend when there is no tracked run", async () => {
const abortActive = vi.fn().mockResolvedValue(undefined);
const { handleCommand, sendChat, addUser } = createHarness({ abortActive });
const { handleCommand, sendChat, addPendingUser } = createHarness({ abortActive });
await handleCommand("/stop");
@@ -910,18 +980,18 @@ describe("tui command handlers", () => {
message: "/stop",
sessionKey: "agent:main:main",
});
expect(addUser).toHaveBeenCalledWith("/stop");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/stop");
});
it("sends broad stop-like text as a normal prompt when idle", async () => {
const abortActive = vi.fn().mockResolvedValue(undefined);
const { handleCommand, sendChat, addUser } = createHarness({ abortActive });
const { handleCommand, sendChat, addPendingUser } = createHarness({ abortActive });
await handleCommand("do not do that");
expect(abortActive).not.toHaveBeenCalled();
expect(sendChat).toHaveBeenCalledTimes(1);
expect(addUser).toHaveBeenCalledWith("do not do that");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "do not do that");
});
it("rejects normal sends while a queued submit is pending registration", async () => {
@@ -941,7 +1011,7 @@ describe("tui command handlers", () => {
});
it("allows local sends to queue while the current run is finishing", async () => {
const { handleCommand, sendChat, addUser, addSystem } = createHarness({
const { handleCommand, sendChat, addPendingUser, addSystem } = createHarness({
opts: { local: true },
activeChatRunId: "run-active",
activityStatus: "finishing context",
@@ -950,7 +1020,7 @@ describe("tui command handlers", () => {
await handleCommand("/context detail");
expect(sendChat).toHaveBeenCalledTimes(1);
expect(addUser).toHaveBeenCalledWith("/context detail");
expect(addPendingUser).toHaveBeenCalledWith(expect.any(String), "/context detail");
expect(addSystem).not.toHaveBeenCalledWith(
"agent is busy — press Esc to abort before sending a new message",
);

View File

@@ -63,6 +63,7 @@ type CommandHandlerContext = {
forgetLocalRunId?: (runId: string) => void;
forgetLocalBtwRunId?: (runId: string) => void;
consumeCompletedRunForPendingSend?: (runId: string) => boolean;
isRunObserved?: (runId: string) => boolean;
flushPendingHistoryRefreshIfIdle?: () => void;
runAuthFlow?: (params: {
provider?: string;
@@ -119,6 +120,7 @@ export function createCommandHandlers(context: CommandHandlerContext) {
forgetLocalRunId,
forgetLocalBtwRunId,
consumeCompletedRunForPendingSend,
isRunObserved,
flushPendingHistoryRefreshIfIdle,
runAuthFlow,
requestExit,
@@ -748,7 +750,8 @@ export function createCommandHandlers(context: CommandHandlerContext) {
) {
chatLog.reserveAssistantSlot(state.activeChatRunId);
}
chatLog.addUser(text);
chatLog.addPendingUser(runId, text);
state.pendingSubmitDraft = { runId, text };
noteLocalRunId?.(runId);
state.pendingOptimisticUserMessage = true;
setActivityStatus("sending");
@@ -775,9 +778,21 @@ export function createCommandHandlers(context: CommandHandlerContext) {
if (!acceptedRunAlreadyCompleted) {
noteLocalRunId?.(acceptedRunId);
}
if (state.pendingSubmitDraft?.runId === runId) {
// If the accepted run already emitted events, it is registered;
// re-arming the draft would let a later abort drop a row whose
// reply already rendered.
state.pendingSubmitDraft = isRunObserved?.(acceptedRunId)
? null
: { runId: acceptedRunId, text };
}
chatLog.rekeyPendingUser(runId, acceptedRunId);
}
if (state.pendingOptimisticUserMessage) {
if (acceptedRunAlreadyCompleted) {
if (state.pendingSubmitDraft?.runId === acceptedRunId) {
state.pendingSubmitDraft = null;
}
state.pendingOptimisticUserMessage = false;
state.pendingChatRunId = null;
setActivityStatus("idle");
@@ -803,6 +818,10 @@ export function createCommandHandlers(context: CommandHandlerContext) {
state.pendingOptimisticUserMessage = false;
state.pendingChatRunId = null;
state.activeChatRunId = null;
if (state.pendingSubmitDraft?.runId === runId) {
state.pendingSubmitDraft = null;
}
chatLog.dropPendingUser(runId);
}
chatLog.addSystem(`${isBtw ? "btw failed" : "send failed"}: ${String(err)}`);
if (!isBtw) {

View File

@@ -971,6 +971,31 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(loadHistory).not.toHaveBeenCalled();
});
it("keeps pending user text after run binding until history catches up", () => {
const pendingUsers = new Map([["run-gateway", "queued hello"]]);
const chatLog = {
...createMockChatLog(),
countPendingUsers: () => pendingUsers.size,
render: (_width: number) => Array.from(pendingUsers.values()),
};
const { state, noteLocalRunId, handleChatEvent } = createHandlersHarness({
chatLog: chatLog as unknown as HandlerChatLog,
state: { activeChatRunId: null, pendingOptimisticUserMessage: true },
});
noteLocalRunId("run-gateway");
handleChatEvent({
runId: "run-gateway",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "working" },
});
expect(state.pendingOptimisticUserMessage).toBe(false);
expect(chatLog.countPendingUsers()).toBe(1);
expect(chatLog.render(120).join("\n")).toContain("queued hello");
});
it("does not bind unknown gateway run ids while an optimistic message is pending", () => {
const { state, loadHistory, isLocalRunId, handleChatEvent } = createHandlersHarness({
state: { activeChatRunId: null, pendingOptimisticUserMessage: true },

View File

@@ -271,6 +271,12 @@ export function createEventHandlers(context: EventHandlerContext) {
pruneRunMap(sessionRuns);
};
const markSubmittedRunRegistered = (runId: string) => {
if (state.pendingSubmitDraft?.runId === runId) {
state.pendingSubmitDraft = null;
}
};
const noteFinalizedRun = (runId: string, opts?: { displayedFinal?: boolean }) => {
finalizedRuns.set(runId, Date.now());
completedRuns.set(runId, Date.now());
@@ -573,6 +579,7 @@ export function createEventHandlers(context: EventHandlerContext) {
clearPendingTerminalLifecycleError(evt.runId);
chatLog.dismissPendingSystem(evt.runId);
noteSessionRun(evt.runId);
markSubmittedRunRegistered(evt.runId);
const isPendingChatRun = state.pendingChatRunId === evt.runId;
const isLocalChatRun = isLocalRunId?.(evt.runId) ?? false;
const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false;
@@ -750,6 +757,7 @@ export function createEventHandlers(context: EventHandlerContext) {
if (evt.stream === "lifecycle") {
if (isPendingRun) {
noteSessionRun(evt.runId);
markSubmittedRunRegistered(evt.runId);
state.activeChatRunId = evt.runId;
state.pendingChatRunId = null;
if (state.pendingOptimisticUserMessage) {
@@ -856,6 +864,11 @@ export function createEventHandlers(context: EventHandlerContext) {
return true;
};
// True once any event for this runId has been seen, even before sendChat
// resolves. Lets the optimistic-submit path know an accepted run already
// registered so it does not re-arm a draft the abort path would then drop.
const isRunObserved = (runId: string) => sessionRuns.has(runId);
return {
handleChatEvent,
handleAgentEvent,
@@ -863,6 +876,7 @@ export function createEventHandlers(context: EventHandlerContext) {
pauseStreamingWatchdog,
reconnectStreamingWatchdog,
consumeCompletedRunForPendingSend,
isRunObserved,
flushPendingHistoryRefreshIfIdle,
dispose,
};

View File

@@ -41,7 +41,12 @@ describe("tui session actions", () => {
client: { listSessions: vi.fn() } as unknown as TuiBackend,
chatLog: {
addSystem: vi.fn(),
addUser: vi.fn(),
finalizeAssistant: vi.fn(),
clearPendingUsers: vi.fn(),
clearAll: vi.fn(),
reconcilePendingUsers: vi.fn().mockReturnValue([]),
restorePendingUsers: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog,
btw: createBtwPresenter(),
tui: { requestRender: vi.fn() } as unknown as import("@earendil-works/pi-tui").TUI,
@@ -501,8 +506,11 @@ describe("tui session actions", () => {
const chatLog = {
addSystem: vi.fn(),
clearAll: vi.fn(),
clearPendingUsers: vi.fn(),
addUser: vi.fn(),
finalizeAssistant: vi.fn(),
reconcilePendingUsers: vi.fn().mockReturnValue([]),
restorePendingUsers: vi.fn(),
updateAssistant,
startTool: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog;
@@ -533,8 +541,11 @@ describe("tui session actions", () => {
const chatLog = {
addSystem: vi.fn(),
clearAll: vi.fn(),
clearPendingUsers: vi.fn(),
addUser: vi.fn(),
finalizeAssistant: vi.fn(),
reconcilePendingUsers: vi.fn().mockReturnValue([]),
restorePendingUsers: vi.fn(),
updateAssistant,
startTool: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog;
@@ -562,8 +573,11 @@ describe("tui session actions", () => {
const chatLog = {
addSystem: vi.fn(),
clearAll: vi.fn(),
clearPendingUsers: vi.fn(),
addUser: vi.fn(),
finalizeAssistant: vi.fn(),
reconcilePendingUsers: vi.fn().mockReturnValue([]),
restorePendingUsers: vi.fn(),
updateAssistant,
startTool: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog;
@@ -887,6 +901,58 @@ describe("tui session actions", () => {
expect(setActivityStatus).toHaveBeenCalledWith("aborted");
});
it("drops the optimistic pending row when aborting a not-yet-registered submit", async () => {
const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true });
const dropPendingUser = vi.fn();
const state = createBaseState({
activeChatRunId: null,
pendingChatRunId: "run-1",
pendingOptimisticUserMessage: true,
pendingSubmitDraft: { runId: "run-1", text: "hello" },
});
const { abortActive } = createTestSessionActions({
client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend,
chatLog: {
addSystem: vi.fn(),
clearAll: vi.fn(),
dropPendingUser,
} as unknown as import("./components/chat-log.js").ChatLog,
state,
});
await abortActive();
expect(dropPendingUser).toHaveBeenCalledWith("run-1");
expect(state.pendingSubmitDraft).toBeNull();
expect(state.pendingOptimisticUserMessage).toBe(false);
});
it("keeps the optimistic row when aborting a run that already registered", async () => {
const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true });
const dropPendingUser = vi.fn();
const state = createBaseState({
activeChatRunId: null,
pendingChatRunId: "run-1",
pendingOptimisticUserMessage: true,
pendingSubmitDraft: null,
});
const { abortActive } = createTestSessionActions({
client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend,
chatLog: {
addSystem: vi.fn(),
clearAll: vi.fn(),
dropPendingUser,
} as unknown as import("./components/chat-log.js").ChatLog,
state,
});
await abortActive();
expect(dropPendingUser).not.toHaveBeenCalled();
});
it("passes the selected agent when aborting selected global runs", async () => {
const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true });
const state = createBaseState({
@@ -1099,6 +1165,48 @@ describe("tui session actions", () => {
expect(rememberSessionKey).toHaveBeenCalledWith("agent:main:main");
});
it("preserves optimistic user messages across stale history rebuilds", async () => {
const listSessions = vi.fn().mockResolvedValue({
ts: Date.now(),
path: "/tmp/sessions.json",
count: 1,
defaults: {},
sessions: [{ key: "agent:main:main", sessionId: "session-main" }],
});
const loadHistory = vi.fn().mockResolvedValue({
sessionId: "session-main",
messages: [
{ role: "user", content: "persisted", timestamp: 2_000 },
{ role: "assistant", content: [{ type: "text", text: "reply" }] },
],
});
const chatLog = {
addSystem: vi.fn(),
addUser: vi.fn(),
finalizeAssistant: vi.fn(),
clearAll: vi.fn(),
clearPendingUsers: vi.fn(),
reconcilePendingUsers: vi.fn().mockReturnValue([]),
restorePendingUsers: vi.fn(),
};
const { loadHistory: runLoadHistory } = createTestSessionActions({
client: {
listSessions,
loadHistory,
} as unknown as TuiBackend,
chatLog: chatLog as unknown as import("./components/chat-log.js").ChatLog,
});
await runLoadHistory();
expect(chatLog.clearAll).toHaveBeenCalledWith({ preservePendingUsers: true });
expect(chatLog.reconcilePendingUsers).toHaveBeenCalledWith([
{ text: "persisted", timestamp: 2_000 },
]);
expect(chatLog.restorePendingUsers).toHaveBeenCalledTimes(1);
});
it("hydrates session info from chat history without listing sessions", async () => {
const listSessions = vi.fn();
const loadHistory = vi.fn().mockResolvedValue({

View File

@@ -1,5 +1,6 @@
// Implements TUI session actions such as switching, forking, and resuming.
import type { TUI } from "@earendil-works/pi-tui";
import { asDateTimestampMs } from "@openclaw/normalization-core/number-coercion";
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
import type { SessionsPatchResult } from "../../packages/gateway-protocol/src/index.js";
import { resolveSessionInfoModelSelection } from "../agents/model-selection-display.js";
@@ -93,6 +94,11 @@ function sessionInfoUiEquals(left: SessionInfo, right: SessionInfo): boolean {
);
}
function extractMessageTimestamp(message: Record<string, unknown>): number | null {
const raw = message.timestamp;
return asDateTimestampMs(typeof raw === "string" ? Date.parse(raw) : raw) ?? null;
}
export function createSessionActions(context: SessionActionContext) {
const {
client,
@@ -449,7 +455,8 @@ export function createSessionActions(context: SessionActionContext) {
await refreshSessionInfo();
}
const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off";
chatLog.clearAll();
const historyUsers: Array<{ text: string; timestamp?: number | null }> = [];
chatLog.clearAll({ preservePendingUsers: true });
btw.clear();
chatLog.addSystem(`session ${state.currentSessionKey}`);
for (const entry of record.messages ?? []) {
@@ -467,6 +474,10 @@ export function createSessionActions(context: SessionActionContext) {
if (message.role === "user") {
const text = extractTextFromMessage(message);
if (text) {
historyUsers.push({
text,
timestamp: extractMessageTimestamp(message),
});
chatLog.addUser(text);
}
continue;
@@ -501,6 +512,11 @@ export function createSessionActions(context: SessionActionContext) {
);
}
}
const reconciledRunIds = chatLog.reconcilePendingUsers(historyUsers);
if (state.pendingSubmitDraft && reconciledRunIds.includes(state.pendingSubmitDraft.runId)) {
state.pendingSubmitDraft = null;
}
chatLog.restorePendingUsers();
// Restore a run still streaming for this session+agent that the gateway
// reports as in-flight. Its live deltas were delivered to a per-agent key
// we stopped watching after switching away, so the persisted history above
@@ -534,12 +550,14 @@ export function createSessionActions(context: SessionActionContext) {
state.activeChatRunId = null;
state.pendingChatRunId = null;
state.pendingOptimisticUserMessage = false;
state.pendingSubmitDraft = null;
setActivityStatus("idle");
state.currentSessionId = null;
// Session keys can move backwards in updatedAt ordering; drop previous session freshness
// so refresh data for the newly selected session isn't rejected as stale.
state.sessionInfo.updatedAt = null;
state.historyLoaded = false;
chatLog.clearPendingUsers();
clearLocalRunIds?.();
btw.clear();
updateHeader();
@@ -554,6 +572,7 @@ export function createSessionActions(context: SessionActionContext) {
state.activeChatRunId = null;
state.pendingChatRunId = null;
state.pendingOptimisticUserMessage = false;
state.pendingSubmitDraft = null;
setActivityStatus("idle");
state.currentSessionId = null;
const defaults = lastSessionDefaults;
@@ -604,6 +623,7 @@ export function createSessionActions(context: SessionActionContext) {
const abortsPendingRun = Boolean(
state.pendingChatRunId && runIds.includes(state.pendingChatRunId),
);
const pendingRunId = state.pendingChatRunId;
try {
for (const runId of runIds) {
await client.abortChat({
@@ -615,6 +635,10 @@ export function createSessionActions(context: SessionActionContext) {
state.pendingChatRunId = null;
if (abortsPendingRun) {
state.pendingOptimisticUserMessage = false;
if (pendingRunId && state.pendingSubmitDraft?.runId === pendingRunId) {
chatLog.dropPendingUser(pendingRunId);
state.pendingSubmitDraft = null;
}
}
setActivityStatus("aborted");
} catch (err) {

View File

@@ -139,6 +139,7 @@ export type TuiStateAccess = {
activeChatRunId: string | null;
pendingOptimisticUserMessage?: boolean;
pendingChatRunId?: string | null;
pendingSubmitDraft?: { runId: string; text: string } | null;
queuedMessages?: QueuedMessage[];
historyLoaded: boolean;
sessionInfo: SessionInfo;

View File

@@ -505,6 +505,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
let activeChatRunId: string | null = null;
let pendingOptimisticUserMessage = false;
let pendingChatRunId: string | null = null;
let pendingSubmitDraft: { runId: string; text: string } | null = null;
let historyLoaded = false;
let isConnected = false;
let wasDisconnected = false;
@@ -595,6 +596,12 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
set pendingChatRunId(value) {
pendingChatRunId = value ?? null;
},
get pendingSubmitDraft() {
return pendingSubmitDraft;
},
set pendingSubmitDraft(value) {
pendingSubmitDraft = value ?? null;
},
get historyLoaded() {
return historyLoaded;
},
@@ -1261,6 +1268,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
pauseStreamingWatchdog,
reconnectStreamingWatchdog,
consumeCompletedRunForPendingSend,
isRunObserved,
flushPendingHistoryRefreshIfIdle,
} = createEventHandlers({
chatLog,
@@ -1354,6 +1362,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
forgetLocalRunId,
forgetLocalBtwRunId,
consumeCompletedRunForPendingSend,
isRunObserved,
flushPendingHistoryRefreshIfIdle,
runAuthFlow,
requestExit,