diff --git a/docs/gateway/opentelemetry.md b/docs/gateway/opentelemetry.md index c5b6d97c10a3..887f72f21780 100644 --- a/docs/gateway/opentelemetry.md +++ b/docs/gateway/opentelemetry.md @@ -190,7 +190,7 @@ message bodies are also approved for export. - `gen_ai.client.operation.duration` (histogram, seconds, GenAI semantic-conventions metric, attrs: `gen_ai.provider.name`, `gen_ai.operation.name`, `gen_ai.request.model`, optional `error.type`) - `openclaw.model_call.duration_ms` (histogram, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.api`, `openclaw.transport`, plus `openclaw.errorCategory` and `openclaw.failureKind` on classified errors) - `openclaw.model_call.request_bytes` (histogram, UTF-8 byte size of the final model request payload; no raw payload content) -- `openclaw.model_call.response_bytes` (histogram, UTF-8 byte size of streamed model response events; no raw response content) +- `openclaw.model_call.response_bytes` (histogram, UTF-8 byte size of streamed model response events excluding accumulated `partial` snapshots on delta events; no raw response content) - `openclaw.model_call.time_to_first_byte_ms` (histogram, elapsed time before the first streamed response event) - `openclaw.model.failover` (counter, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.failover.to_provider`, `openclaw.failover.to_model`, `openclaw.failover.reason`, `openclaw.failover.suspended`, `openclaw.lane`) - `openclaw.skill.used` (counter, attrs: `openclaw.skill.name`, `openclaw.skill.source`, `openclaw.skill.activation`, optional `openclaw.agent`, optional `openclaw.toolName`) diff --git a/docs/logging.md b/docs/logging.md index 6309589829a1..e7a88fcc57cf 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -233,7 +233,7 @@ Model-call diagnostics record bounded request/response measurements without capturing raw prompt or response content: - `requestPayloadBytes`: UTF-8 byte size of the final model request payload -- `responseStreamBytes`: UTF-8 byte size of streamed model response events +- `responseStreamBytes`: UTF-8 byte size of streamed model response events, excluding accumulated `partial` snapshots on delta events - `timeToFirstByteMs`: elapsed time before the first streamed response event - `durationMs`: total model-call duration diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index a5c2b6b00f76..966f8c347b14 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -1450,7 +1450,8 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { "openclaw.model_call.response_bytes", { unit: "By", - description: "UTF-8 byte size of streamed model response events", + description: + "UTF-8 byte size of streamed model response events excluding accumulated partial snapshots", }, ); const modelCallTimeToFirstByteHistogram = meter.createHistogram( diff --git a/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.test.ts b/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.test.ts index 0fc79dcebd15..a6e980b58afb 100644 --- a/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.test.ts +++ b/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.test.ts @@ -346,6 +346,110 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { expect(JSON.stringify(events)).not.toContain("sk-original-secret"); }); + it("counts text deltas without serializing full partial snapshots", async () => { + const serializedPartial = vi.fn(() => { + throw new Error("partial snapshot should not be serialized for text deltas"); + }); + async function* stream() { + yield { + type: "text_delta", + contentIndex: 0, + delta: "a", + partial: { + toJSON: serializedPartial, + role: "assistant", + content: [{ type: "text", text: "a".repeat(200_000) }], + }, + }; + yield { + type: "text_delta", + contentIndex: 0, + delta: "bc", + partial: { + toJSON: serializedPartial, + role: "assistant", + content: [{ type: "text", text: "abc".repeat(200_000) }], + }, + }; + } + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => stream()) as unknown as StreamFn, + { + runId: "run-1", + provider: "openai", + model: "gpt-5.4", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-delta-bytes", + }, + ); + + const events = await collectModelCallEvents(async () => { + await drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable); + }); + + const completedEvent = getEvent(events, 1); + expect(completedEvent.type).toBe("model.call.completed"); + expect(completedEvent.responseStreamBytes).toBe( + Buffer.byteLength( + JSON.stringify({ type: "text_delta", contentIndex: 0, delta: "a" }), + "utf8", + ) + + Buffer.byteLength( + JSON.stringify({ type: "text_delta", contentIndex: 0, delta: "bc" }), + "utf8", + ), + ); + expect(serializedPartial).not.toHaveBeenCalled(); + }); + + it("keeps streams alive when diagnostic byte inspection cannot read a chunk", async () => { + const opaqueChunk = new Proxy( + {}, + { + get(_target, property) { + if (property === "then") { + return undefined; + } + throw new Error("chunk should not be inspected"); + }, + }, + ); + async function* stream() { + yield opaqueChunk; + yield { type: "text_delta", delta: "ok" }; + } + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => stream()) as unknown as StreamFn, + { + runId: "run-1", + provider: "openai", + model: "gpt-5.4", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-opaque-chunk", + }, + ); + + const chunks: unknown[] = []; + const events = await collectModelCallEvents(async () => { + for await (const chunk of wrapped( + {} as never, + {} as never, + {} as never, + ) as AsyncIterable) { + chunks.push(chunk); + } + }); + + expect(chunks).toHaveLength(2); + expect(chunks[0]).toBe(opaqueChunk); + expect(chunks[1]).toEqual({ type: "text_delta", delta: "ok" }); + const completedEvent = getEvent(events, 1); + expect(completedEvent.type).toBe("model.call.completed"); + expect(completedEvent.responseStreamBytes).toBe( + Buffer.byteLength(JSON.stringify({ type: "text_delta", delta: "ok" }), "utf8"), + ); + }); + it("captures model input, tools, and output only when content capture is enabled", async () => { const assistant = { role: "assistant", diff --git a/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.ts b/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.ts index 1331fb6624e6..0b08a2a7b1cb 100644 --- a/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.ts +++ b/src/agents/embedded-agent-runner/run/attempt.model-diagnostic-events.ts @@ -106,6 +106,27 @@ function isRecord(value: unknown): value is Record { return Boolean(value) && typeof value === "object" && !Array.isArray(value); } +function responseStreamChunkByteLengthUnchecked(chunk: unknown): number | undefined { + if (!isRecord(chunk)) { + return utf8JsonByteLength(chunk); + } + if (!("partial" in chunk)) { + return utf8JsonByteLength(chunk); + } + // Plain stream deltas can carry an accumulated partial snapshot. Byte metrics + // count the new stream event shape, not the answer-so-far replay. + const { partial: _partial, ...snapshotlessChunk } = chunk; + return utf8JsonByteLength(snapshotlessChunk); +} + +function responseStreamChunkByteLength(chunk: unknown): number | undefined { + try { + return responseStreamChunkByteLengthUnchecked(chunk); + } catch { + return undefined; + } +} + function cloneDiagnosticContentValue(value: unknown): unknown { try { return structuredClone(value); @@ -158,7 +179,7 @@ function observeResponseChunk( ): void { state.timeToFirstByteMs ??= Math.max(0, Date.now() - startedAt); observeOutputMessageContent(state, chunk); - const bytes = utf8JsonByteLength(chunk); + const bytes = responseStreamChunkByteLength(chunk); if (bytes !== undefined) { state.responseStreamBytes += bytes; }