fix(agents): count stream deltas incrementally

Count model stream diagnostic response bytes from snapshotless stream chunks, excluding accumulated partial snapshots on delta events. This avoids repeatedly serializing answer-so-far snapshots during streamed model calls and updates OTEL/docs wording for the new metric baseline.

Refs #86599.

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
Vincent Koc
2026-05-31 18:13:58 +01:00
committed by GitHub
parent a053ae5d65
commit 938841cff3
5 changed files with 130 additions and 4 deletions

View File

@@ -190,7 +190,7 @@ message bodies are also approved for export.
- `gen_ai.client.operation.duration` (histogram, seconds, GenAI semantic-conventions metric, attrs: `gen_ai.provider.name`, `gen_ai.operation.name`, `gen_ai.request.model`, optional `error.type`) - `gen_ai.client.operation.duration` (histogram, seconds, GenAI semantic-conventions metric, attrs: `gen_ai.provider.name`, `gen_ai.operation.name`, `gen_ai.request.model`, optional `error.type`)
- `openclaw.model_call.duration_ms` (histogram, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.api`, `openclaw.transport`, plus `openclaw.errorCategory` and `openclaw.failureKind` on classified errors) - `openclaw.model_call.duration_ms` (histogram, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.api`, `openclaw.transport`, plus `openclaw.errorCategory` and `openclaw.failureKind` on classified errors)
- `openclaw.model_call.request_bytes` (histogram, UTF-8 byte size of the final model request payload; no raw payload content) - `openclaw.model_call.request_bytes` (histogram, UTF-8 byte size of the final model request payload; no raw payload content)
- `openclaw.model_call.response_bytes` (histogram, UTF-8 byte size of streamed model response events; no raw response content) - `openclaw.model_call.response_bytes` (histogram, UTF-8 byte size of streamed model response events excluding accumulated `partial` snapshots on delta events; no raw response content)
- `openclaw.model_call.time_to_first_byte_ms` (histogram, elapsed time before the first streamed response event) - `openclaw.model_call.time_to_first_byte_ms` (histogram, elapsed time before the first streamed response event)
- `openclaw.model.failover` (counter, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.failover.to_provider`, `openclaw.failover.to_model`, `openclaw.failover.reason`, `openclaw.failover.suspended`, `openclaw.lane`) - `openclaw.model.failover` (counter, attrs: `openclaw.provider`, `openclaw.model`, `openclaw.failover.to_provider`, `openclaw.failover.to_model`, `openclaw.failover.reason`, `openclaw.failover.suspended`, `openclaw.lane`)
- `openclaw.skill.used` (counter, attrs: `openclaw.skill.name`, `openclaw.skill.source`, `openclaw.skill.activation`, optional `openclaw.agent`, optional `openclaw.toolName`) - `openclaw.skill.used` (counter, attrs: `openclaw.skill.name`, `openclaw.skill.source`, `openclaw.skill.activation`, optional `openclaw.agent`, optional `openclaw.toolName`)

View File

@@ -233,7 +233,7 @@ Model-call diagnostics record bounded request/response measurements without
capturing raw prompt or response content: capturing raw prompt or response content:
- `requestPayloadBytes`: UTF-8 byte size of the final model request payload - `requestPayloadBytes`: UTF-8 byte size of the final model request payload
- `responseStreamBytes`: UTF-8 byte size of streamed model response events - `responseStreamBytes`: UTF-8 byte size of streamed model response events, excluding accumulated `partial` snapshots on delta events
- `timeToFirstByteMs`: elapsed time before the first streamed response event - `timeToFirstByteMs`: elapsed time before the first streamed response event
- `durationMs`: total model-call duration - `durationMs`: total model-call duration

View File

@@ -1450,7 +1450,8 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
"openclaw.model_call.response_bytes", "openclaw.model_call.response_bytes",
{ {
unit: "By", unit: "By",
description: "UTF-8 byte size of streamed model response events", description:
"UTF-8 byte size of streamed model response events excluding accumulated partial snapshots",
}, },
); );
const modelCallTimeToFirstByteHistogram = meter.createHistogram( const modelCallTimeToFirstByteHistogram = meter.createHistogram(

View File

@@ -346,6 +346,110 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
expect(JSON.stringify(events)).not.toContain("sk-original-secret"); expect(JSON.stringify(events)).not.toContain("sk-original-secret");
}); });
// Verifies that response byte accounting for delta chunks is based on the
// chunk without its `partial` snapshot, and that the snapshot itself is never
// JSON-serialized while measuring.
it("counts text deltas without serializing full partial snapshots", async () => {
// Spy installed as `toJSON` on every partial snapshot below. JSON.stringify
// invokes `toJSON` when it serializes an object, so any attempt to serialize
// a snapshot would call (and be recorded by) this spy.
const serializedPartial = vi.fn(() => {
throw new Error("partial snapshot should not be serialized for text deltas");
});
// Two delta chunks, each carrying a small `delta` plus a deliberately large
// `partial` snapshot so that counting the snapshot would dominate the total.
async function* stream() {
yield {
type: "text_delta",
contentIndex: 0,
delta: "a",
partial: {
toJSON: serializedPartial,
role: "assistant",
content: [{ type: "text", text: "a".repeat(200_000) }],
},
};
yield {
type: "text_delta",
contentIndex: 0,
delta: "bc",
partial: {
toJSON: serializedPartial,
role: "assistant",
content: [{ type: "text", text: "abc".repeat(200_000) }],
},
};
}
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => stream()) as unknown as StreamFn,
{
runId: "run-1",
provider: "openai",
model: "gpt-5.4",
trace: createDiagnosticTraceContext(),
nextCallId: () => "call-delta-bytes",
},
);
const events = await collectModelCallEvents(async () => {
await drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>);
});
// NOTE(review): index 1 is expected to be the completion event; presumably
// index 0 is the matching start event - confirm against collectModelCallEvents.
const completedEvent = getEvent(events, 1);
expect(completedEvent.type).toBe("model.call.completed");
// Expected total is the sum of the UTF-8 JSON sizes of both chunks with the
// `partial` key left out.
expect(completedEvent.responseStreamBytes).toBe(
Buffer.byteLength(
JSON.stringify({ type: "text_delta", contentIndex: 0, delta: "a" }),
"utf8",
) +
Buffer.byteLength(
JSON.stringify({ type: "text_delta", contentIndex: 0, delta: "bc" }),
"utf8",
),
);
// The snapshot's `toJSON` must never have run.
expect(serializedPartial).not.toHaveBeenCalled();
});
// Verifies that a chunk which throws when its properties are read is still
// delivered to the consumer, does not abort the stream, and does not
// contribute to the recorded response byte total.
it("keeps streams alive when diagnostic byte inspection cannot read a chunk", async () => {
// Proxy whose `get` trap throws for every property except `then`.
// NOTE(review): `then` presumably returns undefined so that yielding the
// chunk from the async generator (which checks for thenables) does not trip
// the trap - confirm.
const opaqueChunk = new Proxy(
{},
{
get(_target, property) {
if (property === "then") {
return undefined;
}
throw new Error("chunk should not be inspected");
},
},
);
// One unreadable chunk followed by one ordinary delta chunk.
async function* stream() {
yield opaqueChunk;
yield { type: "text_delta", delta: "ok" };
}
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => stream()) as unknown as StreamFn,
{
runId: "run-1",
provider: "openai",
model: "gpt-5.4",
trace: createDiagnosticTraceContext(),
nextCallId: () => "call-opaque-chunk",
},
);
const chunks: unknown[] = [];
const events = await collectModelCallEvents(async () => {
for await (const chunk of wrapped(
{} as never,
{} as never,
{} as never,
) as AsyncIterable<unknown>) {
chunks.push(chunk);
}
});
// Both chunks reach the consumer; the opaque one is passed through by
// identity rather than copied.
expect(chunks).toHaveLength(2);
expect(chunks[0]).toBe(opaqueChunk);
expect(chunks[1]).toEqual({ type: "text_delta", delta: "ok" });
// NOTE(review): index 1 is expected to be the completion event; presumably
// index 0 is the matching start event - confirm against collectModelCallEvents.
const completedEvent = getEvent(events, 1);
expect(completedEvent.type).toBe("model.call.completed");
// Only the readable chunk is counted; the opaque chunk adds zero bytes.
expect(completedEvent.responseStreamBytes).toBe(
Buffer.byteLength(JSON.stringify({ type: "text_delta", delta: "ok" }), "utf8"),
);
});
it("captures model input, tools, and output only when content capture is enabled", async () => { it("captures model input, tools, and output only when content capture is enabled", async () => {
const assistant = { const assistant = {
role: "assistant", role: "assistant",

View File

@@ -106,6 +106,27 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value); return Boolean(value) && typeof value === "object" && !Array.isArray(value);
} }
/**
 * Computes the diagnostic byte size of a single streamed response chunk.
 *
 * Values that are not plain records, and records that have no `partial` key,
 * are measured unchanged. A record that does have a `partial` key is measured
 * from a shallow copy with that key omitted, so the accumulated snapshot is
 * never handed to the serializer.
 *
 * This function has no error handling of its own: anything thrown while
 * inspecting or measuring the chunk propagates to the caller.
 *
 * @param chunk - Stream chunk of unknown shape.
 * @returns The result of `utf8JsonByteLength` for the measured value.
 */
function responseStreamChunkByteLengthUnchecked(chunk: unknown): number | undefined {
  if (!isRecord(chunk) || !("partial" in chunk)) {
    return utf8JsonByteLength(chunk);
  }
  // A delta event may replay the whole answer-so-far in `partial`. The byte
  // metric should reflect only the new event, so drop the snapshot up front.
  const { partial: _accumulatedSnapshot, ...eventWithoutSnapshot } = chunk;
  return utf8JsonByteLength(eventWithoutSnapshot);
}
/**
 * Best-effort wrapper around `responseStreamChunkByteLengthUnchecked`.
 *
 * Byte measurement is diagnostic only, so any error thrown while measuring a
 * chunk is swallowed here and reported as "no measurement" instead of being
 * allowed to surface to the caller.
 *
 * @param chunk - Stream chunk of unknown shape.
 * @returns The measured byte length, or `undefined` when measuring threw or
 *   produced no value.
 */
function responseStreamChunkByteLength(chunk: unknown): number | undefined {
  let measuredBytes: number | undefined;
  try {
    measuredBytes = responseStreamChunkByteLengthUnchecked(chunk);
  } catch {
    // Deliberately ignored: an unreadable chunk simply goes unmeasured.
    measuredBytes = undefined;
  }
  return measuredBytes;
}
function cloneDiagnosticContentValue(value: unknown): unknown { function cloneDiagnosticContentValue(value: unknown): unknown {
try { try {
return structuredClone(value); return structuredClone(value);
@@ -158,7 +179,7 @@ function observeResponseChunk(
): void { ): void {
state.timeToFirstByteMs ??= Math.max(0, Date.now() - startedAt); state.timeToFirstByteMs ??= Math.max(0, Date.now() - startedAt);
observeOutputMessageContent(state, chunk); observeOutputMessageContent(state, chunk);
const bytes = utf8JsonByteLength(chunk); const bytes = responseStreamChunkByteLength(chunk);
if (bytes !== undefined) { if (bytes !== undefined) {
state.responseStreamBytes += bytes; state.responseStreamBytes += bytes;
} }