diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index edbab03789ea..0a29ff653126 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2943,6 +2943,7 @@ export const chatHandlers: GatewayRequestHandlers = { } const rawSessionKey = p.sessionKey; const agentIdOverride = normalizeOptionalText(p.agentId); + const clientRunId = p.idempotencyKey; const requestedAgentId = resolveRequestedChatAgentId({ cfg: (context as { getRuntimeConfig?: () => OpenClawConfig }).getRuntimeConfig?.(), requestedSessionKey: rawSessionKey, @@ -2959,6 +2960,7 @@ export const chatHandlers: GatewayRequestHandlers = { { phase: "agent-turn", attributes: { + runId: clientRunId, hasAttachments: normalizedAttachments.length > 0, hasExplicitOrigin: explicitOriginResult.value !== undefined, }, @@ -3013,7 +3015,6 @@ export const chatHandlers: GatewayRequestHandlers = { overrideMs: p.timeoutMs, }); const now = Date.now(); - const clientRunId = p.idempotencyKey; const sendPolicy = resolveSendPolicy({ cfg, @@ -3347,6 +3348,11 @@ export const chatHandlers: GatewayRequestHandlers = { channel: INTERNAL_MESSAGE_CHANNEL, }); const chatSendTraceAttributes = { + runId: clientRunId, + sessionKey, + agentId: selectedAgent.agentId ?? agentId, + provider: resolvedSessionModel.provider, + model: resolvedSessionModel.model, hasAttachments: normalizedAttachments.length > 0, hasExplicitOrigin: explicitOriginResult.value !== undefined, hasConnectedClient: client?.connect !== undefined, diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 1adebd209b6a..fe4678ad5140 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -116,6 +116,15 @@ async function writeMainSessionTranscript(sessionDir: string, lines: string[]) { await fs.writeFile(path.join(sessionDir, "sess-main.jsonl"), `${lines.join("\n")}\n`, "utf-8"); } +async function readTimelineEvents(filePath: string): Promise>> { + const raw = await fs.readFile(filePath, "utf-8"); + return raw + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line) as Record); +} + async function fetchHistoryMessages( ws: GatewaySocket, params?: { @@ -1178,6 +1187,60 @@ describe("gateway server chat", () => { }); }); + test("chat.send diagnostics timeline carries run correlation attributes", async () => { + const timelineDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-timeline-")); + const timelinePath = path.join(timelineDir, "timeline.jsonl"); + const previousDiagnostics = process.env.OPENCLAW_DIAGNOSTICS; + const previousTimelinePath = process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH; + process.env.OPENCLAW_DIAGNOSTICS = "timeline"; + process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH = timelinePath; + try { + await withGatewayChatHarness(async ({ ws, createSessionDir }) => { + const spy = getReplyFromConfig; + await connectOk(ws); + + await createSessionDir(); + await writeMainSessionStore(); + mockGetReplyFromConfigOnce(async () => undefined); + + const sendRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-timeline", + }); + expect(sendRes.ok).toBe(true); + + await vi.waitFor(() => { + expect(spy.mock.calls.length).toBeGreaterThan(0); + }, FAST_WAIT_OPTS); + await vi.waitFor(async () => { + const events = await readTimelineEvents(timelinePath); + expect( + events.some( + (event) => + event.type === "span.end" && + event.name === "gateway.chat_send.dispatch_inbound" && + (event.attributes as Record | undefined)?.runId === + "idem-timeline", + ), + ).toBe(true); + }, FAST_WAIT_OPTS); + }); + } finally { + if (previousDiagnostics === undefined) { + delete process.env.OPENCLAW_DIAGNOSTICS; + } else { + process.env.OPENCLAW_DIAGNOSTICS = previousDiagnostics; + } + if (previousTimelinePath === undefined) { + delete process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH; + } else { + process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH = previousTimelinePath; + } + await fs.rm(timelineDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }); + } + }); + test("chat.history hard-caps single oversized nested payloads", async () => { await withGatewayChatHarness(async ({ ws, createSessionDir }) => { const historyMaxBytes = 64 * 1024; diff --git a/ui/src/ui/app-chat.ts b/ui/src/ui/app-chat.ts index 7ed70a5858e4..2224435f4be0 100644 --- a/ui/src/ui/app-chat.ts +++ b/ui/src/ui/app-chat.ts @@ -27,13 +27,13 @@ import type { ChatSideResult } from "./chat/side-result.ts"; import { executeSlashCommand } from "./chat/slash-command-executor.ts"; import { parseSlashCommand, refreshSlashCommands } from "./chat/slash-commands.ts"; import { formatConnectError } from "./connect-error.ts"; +import { resolveControlUiAuthHeader } from "./control-ui-auth.ts"; import { controlUiNowMs, recordControlUiPerformanceEvent, roundedControlUiDurationMs, scheduleControlUiAfterPaint, } from "./control-ui-performance.ts"; -import { resolveControlUiAuthHeader } from "./control-ui-auth.ts"; import { abortChatRun, appendUserChatMessage, @@ -41,7 +41,9 @@ import { requestChatSend, sendDetachedChatMessage, sendSteerChatMessage, + type ChatEventPayload, type ChatHistoryResult, + type ChatSendAck, type ChatState, } from "./controllers/chat.ts"; import { loadModels } from "./controllers/models.ts"; @@ -105,6 +107,7 @@ export type ChatHost = ChatInputHistoryState & { refreshSessionsAfterChat: Map; pendingAbort?: { runId?: string | null; sessionKey: string; agentId?: string } | null; chatSubmitGuards?: Map>; + chatSendTimingsByRun?: Map; assistantAgentId?: string | null; agentsList?: { defaultId?: string | null; mainKey?: string | null } | null; eventLogBuffer?: unknown[]; @@ -395,8 +398,9 @@ function enqueuePendingSendMessage( attachments?: ChatAttachment[], refreshSessions?: boolean, submittedAtMs = controlUiNowMs(), - sendState: ChatQueueItem["sendState"] = - host.connected && host.client ? "sending" : "waiting-reconnect", + sendState: ChatQueueItem["sendState"] = host.connected && host.client + ? "sending" + : "waiting-reconnect", ): ChatQueueItem | null { const trimmed = text.trim(); const hasAttachments = Boolean(attachments && attachments.length > 0); @@ -541,9 +545,9 @@ function cancelPendingSendBeforeRequest( restoreComposer && opts.previousDraft != null && !host.chatMessage.trim(); const willRestoreAttachments = Boolean( restoreComposer && - opts.previousAttachments?.length && - host.chatAttachments.length === 0 && - (willRestoreDraft || !host.chatMessage.trim()), + opts.previousAttachments?.length && + host.chatAttachments.length === 0 && + (willRestoreDraft || !host.chatMessage.trim()), ); if (restoreComposer) { if (willRestoreDraft) { @@ -568,11 +572,26 @@ type ChatSendTimingPhase = | "pending-painted" | "request-start" | "ack" + | "first-assistant-visible" + | "terminal-before-delta" | "queued-busy" | "waiting-model" | "waiting-reconnect" | "failed"; +type ChatSendTimingEntry = { + runId: string; + sessionKey?: string; + agentId?: string; + sendAttempts: number; + sendState?: ChatQueueItem["sendState"]; + submittedAtMs: number; + requestStartedAtMs?: number; + ackAtMs?: number; + ackStatus?: ChatSendAck["status"]; + firstAssistantVisibleRecorded?: boolean; +}; + function recordChatSendTiming( host: ChatHost, item: Pick< @@ -603,6 +622,156 @@ function recordChatSendTiming( ); } +function ensureChatSendTimingEntries(host: ChatHost): Map { + if (host.chatSendTimingsByRun) { + return host.chatSendTimingsByRun; + } + const entries = new Map(); + host.chatSendTimingsByRun = entries; + return entries; +} + +function registerChatSendTiming( + host: ChatHost, + item: Pick< + ChatQueueItem, + "sendRunId" | "sessionKey" | "agentId" | "sendAttempts" | "sendState" | "sendSubmittedAtMs" + >, + runId: string, + requestStartedAtMs: number, +) { + ensureChatSendTimingEntries(host).set(runId, { + runId, + sessionKey: item.sessionKey, + agentId: item.agentId, + sendAttempts: item.sendAttempts ?? 0, + sendState: item.sendState, + submittedAtMs: item.sendSubmittedAtMs ?? requestStartedAtMs, + requestStartedAtMs, + }); +} + +function updateChatSendAckTiming( + host: ChatHost, + requestedRunId: string, + ack: ChatSendAck, + item: Pick< + ChatQueueItem, + "sessionKey" | "agentId" | "sendAttempts" | "sendState" | "sendSubmittedAtMs" + >, + requestStartedAtMs: number, +) { + const entries = ensureChatSendTimingEntries(host); + const existing = entries.get(requestedRunId); + const submittedAtMs = existing?.submittedAtMs ?? item.sendSubmittedAtMs ?? requestStartedAtMs; + const next: ChatSendTimingEntry = { + ...(existing ?? { + runId: ack.runId, + sessionKey: item.sessionKey, + agentId: item.agentId, + sendAttempts: item.sendAttempts ?? 0, + sendState: item.sendState, + submittedAtMs, + requestStartedAtMs, + }), + runId: ack.runId, + sessionKey: existing?.sessionKey ?? item.sessionKey, + agentId: existing?.agentId ?? item.agentId, + ackAtMs: controlUiNowMs(), + ackStatus: ack.status, + }; + if (ack.runId !== requestedRunId) { + entries.delete(requestedRunId); + } + entries.set(ack.runId, next); +} + +function chatEventHasVisibleTerminalPayload(payload: ChatEventPayload): boolean { + if (payload.state === "error" && payload.errorMessage?.trim()) { + return true; + } + return Boolean(payload.message && typeof payload.message === "object"); +} + +function resolveFirstAssistantTimingPhase( + host: ChatHost, + payload: ChatEventPayload, + entry: ChatSendTimingEntry, +): Extract | null { + if (entry.firstAssistantVisibleRecorded) { + return null; + } + if (payload.state === "delta") { + return typeof host.chatStream === "string" && host.chatStream.trim() + ? "first-assistant-visible" + : null; + } + if (payload.state === "final" || payload.state === "aborted" || payload.state === "error") { + return chatEventHasVisibleTerminalPayload(payload) ? "terminal-before-delta" : null; + } + return null; +} + +export function recordFirstAssistantChatTiming( + host: ChatHost, + payload: ChatEventPayload | undefined, + handledState: ChatEventPayload["state"] | null, +) { + if (!payload || !handledState || typeof payload.runId !== "string") { + return; + } + const runId = payload.runId.trim(); + const entry = runId ? host.chatSendTimingsByRun?.get(runId) : undefined; + if (!entry) { + return; + } + const phase = resolveFirstAssistantTimingPhase(host, payload, entry); + if (!phase) { + if (payload.state === "final" || payload.state === "aborted" || payload.state === "error") { + host.chatSendTimingsByRun?.delete(runId); + } + return; + } + + const eventAtMs = controlUiNowMs(); + entry.firstAssistantVisibleRecorded = true; + scheduleControlUiAfterPaint(host, () => { + const paintedAtMs = controlUiNowMs(); + recordControlUiPerformanceEvent( + host as Parameters[0], + "control-ui.chat.send", + { + phase, + durationMs: roundedControlUiDurationMs(paintedAtMs - entry.submittedAtMs), + runId, + sessionKey: entry.sessionKey ?? payload.sessionKey, + agentId: entry.agentId ?? payload.agentId, + sendAttempts: entry.sendAttempts, + sendState: entry.sendState, + ackStatus: entry.ackStatus, + eventState: payload.state, + firstAssistantPaintMs: roundedControlUiDurationMs(paintedAtMs - eventAtMs), + ...(entry.requestStartedAtMs != null + ? { + requestToFirstAssistantEventMs: roundedControlUiDurationMs( + eventAtMs - entry.requestStartedAtMs, + ), + } + : {}), + ...(entry.ackAtMs != null + ? { + ackToFirstAssistantEventMs: roundedControlUiDurationMs(eventAtMs - entry.ackAtMs), + } + : {}), + }, + { console: false, maxBufferedEventsForType: 40 }, + ); + if (phase === "terminal-before-delta") { + host.chatSendTimingsByRun?.delete(runId); + } + }); +} + function shouldRecordPendingSendPaint(item: ChatQueueItem): boolean { return ( typeof item.sendSubmittedAtMs === "number" && @@ -622,21 +791,18 @@ function schedulePendingSendPaintTiming( if (!sendRunId || startedAtMs == null) { return; } - scheduleControlUiAfterPaint( - host as Parameters[0], - () => { - if (!visibleSessionMatches(host, sessionKey, item.agentId)) { - return; - } - const queued = readChatQueueForSession(host, sessionKey).find( - (entry) => entry.id === item.id && entry.sendRunId === sendRunId, - ); - if (!queued || !shouldRecordPendingSendPaint(queued)) { - return; - } - recordChatSendTiming(host, queued, "pending-painted", startedAtMs); - }, - ); + scheduleControlUiAfterPaint(host as Parameters[0], () => { + if (!visibleSessionMatches(host, sessionKey, item.agentId)) { + return; + } + const queued = readChatQueueForSession(host, sessionKey).find( + (entry) => entry.id === item.id && entry.sendRunId === sendRunId, + ); + if (!queued || !shouldRecordPendingSendPaint(queued)) { + return; + } + recordChatSendTiming(host, queued, "pending-painted", startedAtMs); + }); } function ensureQueuedSendState( @@ -695,17 +861,19 @@ async function sendQueuedChatMessage( const runId = prepared.sendRunId ?? generateUUID(); const startedAt = Date.now(); const requestStartedAtMs = controlUiNowMs(); - updateQueuedMessageForSession(host, sessionKey, id, (item) => ({ - ...item, - sendAttempts: (item.sendAttempts ?? 0) + 1, - sendError: undefined, - sendRunId: runId, - sendState: "sending", - sendRequestStartedAtMs: requestStartedAtMs, - sessionKey, - agentId: prepared.agentId, - })); - recordChatSendTiming(host, prepared, "request-start", prepared.sendSubmittedAtMs); + const sendingItem = + updateQueuedMessageForSession(host, sessionKey, id, (item) => ({ + ...item, + sendAttempts: (item.sendAttempts ?? 0) + 1, + sendError: undefined, + sendRunId: runId, + sendState: "sending", + sendRequestStartedAtMs: requestStartedAtMs, + sessionKey, + agentId: prepared.agentId, + })) ?? prepared; + registerChatSendTiming(host, sendingItem, runId, requestStartedAtMs); + recordChatSendTiming(host, sendingItem, "request-start", sendingItem.sendSubmittedAtMs); host.chatSending = true; const isVisibleSession = () => visibleSessionMatches(host, sessionKey, prepared.agentId); if (isVisibleSession()) { @@ -723,7 +891,8 @@ async function sendQueuedChatMessage( sessionKey, agentId: prepared.agentId, }); - recordChatSendTiming(host, prepared, "ack", prepared.sendSubmittedAtMs, { + updateChatSendAckTiming(host, runId, ack, sendingItem, requestStartedAtMs); + recordChatSendTiming(host, sendingItem, "ack", sendingItem.sendSubmittedAtMs, { ackStatus: ack.status, requestDurationMs: roundedControlUiDurationMs(controlUiNowMs() - requestStartedAtMs), }); diff --git a/ui/src/ui/app-gateway.node.test.ts b/ui/src/ui/app-gateway.node.test.ts index 616be91c38dd..2e58251858a9 100644 --- a/ui/src/ui/app-gateway.node.test.ts +++ b/ui/src/ui/app-gateway.node.test.ts @@ -125,6 +125,7 @@ type TestGatewayHost = Parameters[0] & { chatSideResult: unknown; chatSideResultTerminalRuns: Set; chatStream: string | null; + updateComplete?: Promise; chatToolMessages: Record[]; activityEntries: ActivityEntry[]; toolStreamById: Map; @@ -209,6 +210,18 @@ function connectHostGateway() { return { host, client }; } +function eventPayloads(host: TestGatewayHost, event: string): Array> { + const payloads: Array> = []; + for (const entry of host.eventLogBuffer) { + const candidate = entry as { event?: unknown; payload?: unknown }; + if (candidate.event !== event || !candidate.payload || typeof candidate.payload !== "object") { + continue; + } + payloads.push(candidate.payload as Record); + } + return payloads; +} + function emitToolResultEvent(client: GatewayClientMock) { client.emitEvent({ event: "agent", @@ -1104,6 +1117,67 @@ describe("connectGateway", () => { expect(loadChatHistoryMock).not.toHaveBeenCalled(); }); + it("records first assistant paint timing for tracked chat sends", async () => { + const { host, client } = connectHostGateway(); + host.updateComplete = Promise.resolve(); + host.chatRunId = "run-first-visible"; + host.chatStream = ""; + ( + host as TestGatewayHost & { + chatSendTimingsByRun: Map>; + } + ).chatSendTimingsByRun = new Map([ + [ + "run-first-visible", + { + runId: "run-first-visible", + sessionKey: "main", + sendAttempts: 1, + sendState: "sending", + submittedAtMs: 100, + requestStartedAtMs: 125, + ackAtMs: 150, + ackStatus: "started", + }, + ], + ]); + + client.emitEvent({ + event: "chat", + payload: { + runId: "run-first-visible", + sessionKey: "main", + state: "delta", + deltaText: "Hello", + message: { + role: "assistant", + content: [{ type: "text", text: "Hello" }], + timestamp: Date.now(), + }, + }, + }); + + await vi.waitFor(() => + expect( + eventPayloads(host, "control-ui.chat.send").some( + (payload) => payload.phase === "first-assistant-visible", + ), + ).toBe(true), + ); + const firstVisible = eventPayloads(host, "control-ui.chat.send").find( + (payload) => payload.phase === "first-assistant-visible", + ); + expect(firstVisible).toMatchObject({ + runId: "run-first-visible", + sessionKey: "main", + ackStatus: "started", + eventState: "delta", + sendState: "sending", + }); + expect(firstVisible?.ackToFirstAssistantEventMs).toEqual(expect.any(Number)); + expect(host.chatStream).toBe("Hello"); + }); + it("renders session-scoped tool events for externally started runs", () => { const { host, client } = connectHostGateway(); diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts index baefb00112cf..44bbc683a563 100644 --- a/ui/src/ui/app-gateway.ts +++ b/ui/src/ui/app-gateway.ts @@ -9,6 +9,7 @@ import { flushChatQueueForEvent, hasReconnectableQueuedChatSends, markQueuedChatSendsWaitingForReconnect, + recordFirstAssistantChatTiming, refreshChatAvatar, scopedAgentListParamsForRefreshTarget, retryReconnectableQueuedChatSends, @@ -895,6 +896,11 @@ function handleChatGatewayEvent(host: GatewayHost, payload: ChatEventPayload | u } const activeRunIdBeforeEvent = host.chatRunId; const state = handleChatEvent(host as unknown as ChatState, payload); + recordFirstAssistantChatTiming( + host as unknown as Parameters[0], + payload, + state, + ); const terminalEventIsForDifferentActiveRun = isEventForDifferentActiveRun( payload, activeRunIdBeforeEvent, diff --git a/ui/src/ui/e2e/chat-flow.e2e.test.ts b/ui/src/ui/e2e/chat-flow.e2e.test.ts index 769cc4a2f5b4..7459569117ab 100644 --- a/ui/src/ui/e2e/chat-flow.e2e.test.ts +++ b/ui/src/ui/e2e/chat-flow.e2e.test.ts @@ -56,6 +56,26 @@ async function chatThreadDistanceFromBottom(page: Page): Promise { }); } +async function controlUiEventPayloads( + page: Page, + event: string, +): Promise>> { + return page.evaluate((eventName) => { + const app = document.querySelector("openclaw-app") as + | (Element & { eventLogBuffer?: unknown[] }) + | null; + return (app?.eventLogBuffer ?? []) + .filter((entry): entry is { event: string; payload: Record } => { + const candidate = entry as { event?: unknown; payload?: unknown }; + return ( + candidate.event === eventName && + Boolean(candidate.payload && typeof candidate.payload === "object") + ); + }) + .map((entry) => entry.payload); + }, event); +} + function chatSessionListResponse() { return { count: 2, @@ -280,6 +300,40 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { const runId = requireString(params.idempotencyKey, "chat send idempotency key"); await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 }); + await gateway.emitGatewayEvent("chat", { + deltaText: "First token visible.", + message: { + content: [{ text: "First token visible.", type: "text" }], + role: "assistant", + timestamp: Date.now(), + }, + runId, + sessionKey: "main", + state: "delta", + }); + await page.getByText("First token visible.").waitFor({ timeout: 10_000 }); + await page.waitForFunction((expectedRunId) => { + const app = document.querySelector("openclaw-app") as + | (Element & { eventLogBuffer?: unknown[] }) + | null; + return (app?.eventLogBuffer ?? []).some((entry) => { + const candidate = entry as { + event?: unknown; + payload?: { phase?: unknown; runId?: unknown }; + }; + return ( + candidate.event === "control-ui.chat.send" && + candidate.payload?.phase === "first-assistant-visible" && + candidate.payload.runId === expectedRunId + ); + }); + }, runId); + const firstOutputEvents = await controlUiEventPayloads(page, "control-ui.chat.send"); + expect( + firstOutputEvents.some( + (payload) => payload.phase === "first-assistant-visible" && payload.runId === runId, + ), + ).toBe(true); await gateway.resolveDeferred("chat.history", { messages: [], sessionId: "control-ui-e2e-session",