mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
perf(ui): speed up first global chat sends
Speed up Control UI first global chat sends by letting safe literal-global startup refresh use the fresh hello default before agents.list finishes, while keeping stale carried/cached agent ids out of that fast path. Adds chat history/send and gateway chat.send timing markers for the next latency pass.
This commit is contained in:
@@ -50,6 +50,7 @@ import { resolveMirroredTranscriptText } from "../../config/sessions/transcript-
|
||||
import { CURRENT_SESSION_VERSION } from "../../config/sessions/version.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
emitDiagnosticsTimelineEvent,
|
||||
measureDiagnosticsTimelineSpan,
|
||||
measureDiagnosticsTimelineSpanSync,
|
||||
} from "../../infra/diagnostics-timeline.js";
|
||||
@@ -3074,6 +3075,16 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const clientInfo = client?.connect?.client;
|
||||
const chatSendTraceAttributes = {
|
||||
runId: clientRunId,
|
||||
sessionKey,
|
||||
agentId: selectedAgent.agentId ?? agentId,
|
||||
provider: resolvedSessionModel.provider,
|
||||
model: resolvedSessionModel.model,
|
||||
hasAttachments: normalizedAttachments.length > 0,
|
||||
hasExplicitOrigin: explicitOriginResult.value !== undefined,
|
||||
hasConnectedClient: client?.connect !== undefined,
|
||||
};
|
||||
const originatingRoute = resolveChatSendOriginatingRoute({
|
||||
client: clientInfo,
|
||||
deliver: p.deliver,
|
||||
@@ -3162,8 +3173,8 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: {
|
||||
...chatSendTraceAttributes,
|
||||
attachmentCount: normalizedAttachments.length,
|
||||
hasExplicitOrigin: explicitOriginResult.value !== undefined,
|
||||
},
|
||||
},
|
||||
);
|
||||
@@ -3219,6 +3230,18 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
runId: clientRunId,
|
||||
status: "started" as const,
|
||||
};
|
||||
emitDiagnosticsTimelineEvent(
|
||||
{
|
||||
type: "mark",
|
||||
name: "gateway.chat_send.ack_ready",
|
||||
phase: "agent-turn",
|
||||
attributes: {
|
||||
...chatSendTraceAttributes,
|
||||
ackStatus: ackPayload.status,
|
||||
},
|
||||
},
|
||||
{ config: cfg },
|
||||
);
|
||||
respond(true, ackPayload, undefined, { runId: clientRunId });
|
||||
const persistedImagesPromise = persistChatSendImages({
|
||||
images: parsedImages,
|
||||
@@ -3347,16 +3370,6 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
agentId,
|
||||
channel: INTERNAL_MESSAGE_CHANNEL,
|
||||
});
|
||||
const chatSendTraceAttributes = {
|
||||
runId: clientRunId,
|
||||
sessionKey,
|
||||
agentId: selectedAgent.agentId ?? agentId,
|
||||
provider: resolvedSessionModel.provider,
|
||||
model: resolvedSessionModel.model,
|
||||
hasAttachments: normalizedAttachments.length > 0,
|
||||
hasExplicitOrigin: explicitOriginResult.value !== undefined,
|
||||
hasConnectedClient: client?.connect !== undefined,
|
||||
};
|
||||
const deliveredReplies: Array<{ payload: ReplyPayload; kind: "block" | "final" }> = [];
|
||||
let appendedWebchatAgentMedia = false;
|
||||
let agentRunStarted = false;
|
||||
|
||||
@@ -1215,6 +1215,16 @@ describe("gateway server chat", () => {
|
||||
}, FAST_WAIT_OPTS);
|
||||
await vi.waitFor(async () => {
|
||||
const events = await readTimelineEvents(timelinePath);
|
||||
const ackReady = events.find(
|
||||
(event) =>
|
||||
event.type === "mark" &&
|
||||
event.name === "gateway.chat_send.ack_ready" &&
|
||||
(event.attributes as Record<string, unknown> | undefined)?.runId === "idem-timeline",
|
||||
);
|
||||
expect(ackReady?.attributes).toMatchObject({
|
||||
runId: "idem-timeline",
|
||||
ackStatus: "started",
|
||||
});
|
||||
expect(
|
||||
events.some(
|
||||
(event) =>
|
||||
|
||||
@@ -426,6 +426,50 @@ describe("refreshChat", () => {
|
||||
expect(requestUpdate).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("records chat history timing when a reload resets active stream state", async () => {
|
||||
const request = vi.fn((method: string) => {
|
||||
if (method === "chat.history") {
|
||||
return Promise.resolve({
|
||||
messages: [{ role: "assistant", content: [{ type: "text", text: "ready" }] }],
|
||||
});
|
||||
}
|
||||
return pendingPromise();
|
||||
});
|
||||
const host = makeHost({
|
||||
client: { request } as unknown as ChatHost["client"],
|
||||
sessionKey: "main",
|
||||
chatRunId: "run-main",
|
||||
chatStream: "partial",
|
||||
eventLogBuffer: [],
|
||||
});
|
||||
|
||||
await refreshChat(host, { awaitHistory: true, scheduleScroll: false });
|
||||
|
||||
expect(host.chatStream).toBeNull();
|
||||
expect(eventPayloads(host, "control-ui.chat.history")).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
phase: "start",
|
||||
sessionKey: "main",
|
||||
previousRunId: "run-main",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
phase: "stream-reset",
|
||||
sessionKey: "main",
|
||||
previousRunId: "run-main",
|
||||
activeRunId: "run-main",
|
||||
visibleMessageCount: 1,
|
||||
}),
|
||||
expect.objectContaining({
|
||||
phase: "applied",
|
||||
sessionKey: "main",
|
||||
previousRunId: "run-main",
|
||||
resetStream: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("drains a restored queue after refresh proves the selected session is idle", async () => {
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "chat.history") {
|
||||
|
||||
@@ -252,6 +252,35 @@ describe("connectGateway chat load startup work", () => {
|
||||
expect(refreshActiveTabMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("starts literal global chat refresh before agents.list when hello names the default agent", async () => {
|
||||
const agentsList = createDeferred();
|
||||
loadAgentsMock.mockReturnValueOnce(agentsList.promise);
|
||||
const { host, client } = connectHost("chat");
|
||||
host.sessionKey = "global";
|
||||
|
||||
client.emitHello({
|
||||
type: "hello-ok",
|
||||
protocol: 4,
|
||||
snapshot: {
|
||||
sessionDefaults: {
|
||||
defaultAgentId: "ops",
|
||||
mainKey: "main",
|
||||
mainSessionKey: "agent:ops:main",
|
||||
},
|
||||
},
|
||||
auth: { role: "operator", scopes: [] },
|
||||
});
|
||||
|
||||
await vi.waitFor(() => expect(refreshActiveTabMock).toHaveBeenCalledWith(host));
|
||||
expect(loadAgentsMock).toHaveBeenCalledWith(host);
|
||||
expect(refreshActiveTabMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
agentsList.resolve();
|
||||
await agentsList.promise;
|
||||
await Promise.resolve();
|
||||
expect(refreshActiveTabMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("waits for agents.list when a stale agent session may need fallback", async () => {
|
||||
const agentsList = createDeferred();
|
||||
const { host, client } = connectHost("chat");
|
||||
@@ -307,6 +336,7 @@ describe("connectGateway chat load startup work", () => {
|
||||
};
|
||||
});
|
||||
host.sessionKey = "global";
|
||||
host.assistantAgentId = "old-default";
|
||||
host.agentsList = {
|
||||
defaultId: "old-default",
|
||||
mainKey: "main",
|
||||
|
||||
@@ -574,7 +574,20 @@ function canRefreshActiveTabBeforeAgents(host: GatewayHost): boolean {
|
||||
return false;
|
||||
}
|
||||
if (isUiGlobalSessionKey(host.sessionKey)) {
|
||||
return false;
|
||||
const freshDefaultAgentId = resolveFreshDefaultAgentId(host);
|
||||
if (!freshDefaultAgentId) {
|
||||
return false;
|
||||
}
|
||||
const carriedAgentId = host.assistantAgentId
|
||||
? normalizeAgentId(host.assistantAgentId)
|
||||
: undefined;
|
||||
if (carriedAgentId && carriedAgentId !== freshDefaultAgentId) {
|
||||
return false;
|
||||
}
|
||||
const cachedDefaultAgentId = host.agentsList?.defaultId
|
||||
? normalizeAgentId(host.agentsList.defaultId)
|
||||
: undefined;
|
||||
return !cachedDefaultAgentId || cachedDefaultAgentId === freshDefaultAgentId;
|
||||
}
|
||||
const parsed = parseAgentSessionKey(host.sessionKey);
|
||||
if (!parsed) {
|
||||
|
||||
@@ -8,6 +8,11 @@ import { extractText } from "../chat/message-extract.ts";
|
||||
import { reconcileChatRunLifecycle } from "../chat/run-lifecycle.ts";
|
||||
import { buildUserChatMessageContentBlocks } from "../chat/user-message-content.ts";
|
||||
import { formatConnectError } from "../connect-error.ts";
|
||||
import {
|
||||
controlUiNowMs,
|
||||
recordControlUiPerformanceEvent,
|
||||
roundedControlUiDurationMs,
|
||||
} from "../control-ui-performance.ts";
|
||||
import { GatewayRequestError, type GatewayBrowserClient, type GatewayHelloOk } from "../gateway.ts";
|
||||
import {
|
||||
areUiSessionKeysEquivalent,
|
||||
@@ -503,6 +508,26 @@ type InFlightChatHistoryRequest = {
|
||||
|
||||
const inFlightChatHistoryRequests = new WeakMap<ChatState, InFlightChatHistoryRequest>();
|
||||
|
||||
function recordChatHistoryTiming(
|
||||
state: ChatState,
|
||||
phase: "start" | "applied" | "stream-reset" | "stale" | "error",
|
||||
startedAtMs: number,
|
||||
extra: Record<string, unknown> = {},
|
||||
) {
|
||||
recordControlUiPerformanceEvent(
|
||||
state as ChatState & Parameters<typeof recordControlUiPerformanceEvent>[0],
|
||||
"control-ui.chat.history",
|
||||
{
|
||||
phase,
|
||||
durationMs: roundedControlUiDurationMs(controlUiNowMs() - startedAtMs),
|
||||
sessionKey: state.sessionKey,
|
||||
activeRunId: state.chatRunId,
|
||||
...extra,
|
||||
},
|
||||
{ console: false, maxBufferedEventsForType: 30 },
|
||||
);
|
||||
}
|
||||
|
||||
export async function loadChatHistory(state: ChatState): Promise<ChatHistoryResult | undefined> {
|
||||
if (!state.client || !state.connected) {
|
||||
return undefined;
|
||||
@@ -544,8 +569,14 @@ async function loadChatHistoryUncached(
|
||||
): Promise<ChatHistoryResult | undefined> {
|
||||
const requestVersion = beginChatHistoryRequest(state);
|
||||
const startedAt = Date.now();
|
||||
const startedAtMs = controlUiNowMs();
|
||||
const previousMessages = state.chatMessages;
|
||||
const previousRunId = state.chatRunId;
|
||||
recordChatHistoryTiming(state, "start", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
});
|
||||
// Any pending input-history snapshot becomes invalid once we start reloading transcript state.
|
||||
state.resetChatInputHistoryNavigation?.();
|
||||
state.chatLoading = true;
|
||||
@@ -562,6 +593,12 @@ async function loadChatHistoryUncached(
|
||||
break;
|
||||
} catch (err) {
|
||||
if (!shouldApplyChatHistoryResult(state, requestVersion, sessionKey, requestAgentId)) {
|
||||
recordChatHistoryTiming(state, "stale", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
reason: "request-version",
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
const withinStartupRetryWindow =
|
||||
@@ -577,6 +614,12 @@ async function loadChatHistoryUncached(
|
||||
}
|
||||
}
|
||||
if (!shouldApplyChatHistoryResult(state, requestVersion, sessionKey, requestAgentId)) {
|
||||
recordChatHistoryTiming(state, "stale", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
reason: "apply-version",
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
const messages = Array.isArray(res.messages) ? res.messages : [];
|
||||
@@ -597,18 +640,45 @@ async function loadChatHistoryUncached(
|
||||
? res.sessionId
|
||||
: null;
|
||||
state.chatThinkingLevel = res.sessionInfo?.thinkingLevel ?? res.thinkingLevel ?? null;
|
||||
if (!state.chatRunId || state.chatRunId === previousRunId) {
|
||||
const resetStream = !state.chatRunId || state.chatRunId === previousRunId;
|
||||
if (resetStream) {
|
||||
// Clear all streaming state — history includes tool results and text
|
||||
// inline, so keeping streaming artifacts would cause duplicates.
|
||||
maybeResetToolStream(state);
|
||||
state.chatStream = null;
|
||||
state.chatStreamStartedAt = null;
|
||||
recordChatHistoryTiming(state, "stream-reset", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
messageCount: messages.length,
|
||||
visibleMessageCount: visibleMessages.length,
|
||||
});
|
||||
}
|
||||
recordChatHistoryTiming(state, "applied", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
messageCount: messages.length,
|
||||
visibleMessageCount: visibleMessages.length,
|
||||
resetStream,
|
||||
});
|
||||
return res;
|
||||
} catch (err) {
|
||||
if (!shouldApplyChatHistoryResult(state, requestVersion, sessionKey, requestAgentId)) {
|
||||
recordChatHistoryTiming(state, "stale", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
reason: "error-version",
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
recordChatHistoryTiming(state, "error", startedAtMs, {
|
||||
requestSessionKey: sessionKey,
|
||||
requestAgentId,
|
||||
previousRunId,
|
||||
});
|
||||
if (isMissingOperatorReadScopeError(err)) {
|
||||
state.chatMessages = [];
|
||||
state.chatThinkingLevel = null;
|
||||
|
||||
@@ -278,8 +278,10 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
|
||||
});
|
||||
const page = await context.newPage();
|
||||
const gateway = await installMockGateway(page, {
|
||||
defaultAgentId: "ops",
|
||||
deferredMethods: ["agents.list", "chat.history"],
|
||||
historyMessages: [],
|
||||
sessionKey: "global",
|
||||
});
|
||||
|
||||
try {
|
||||
@@ -296,7 +298,8 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
|
||||
const sendRequest = await gateway.waitForRequest("chat.send");
|
||||
const params = requireRecord(sendRequest.params);
|
||||
expect(params.message).toBe(prompt);
|
||||
expect(params.sessionKey).toBe("main");
|
||||
expect(params.sessionKey).toBe("global");
|
||||
expect(params.agentId).toBe("ops");
|
||||
|
||||
const runId = requireString(params.idempotencyKey, "chat send idempotency key");
|
||||
await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 });
|
||||
@@ -308,7 +311,8 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
runId,
|
||||
sessionKey: "main",
|
||||
agentId: "ops",
|
||||
sessionKey: "global",
|
||||
state: "delta",
|
||||
});
|
||||
await page.getByText("First token visible.").waitFor({ timeout: 10_000 });
|
||||
|
||||
Reference in New Issue
Block a user