Refactor realtime voice turn context tracking (#86650)

* refactor: share realtime turn context tracking

* chore: track realtime voice sdk api baseline

* fix: preserve pruned realtime turn handle state
This commit is contained in:
Peter Steinberger
2026-05-25 23:13:27 +01:00
committed by GitHub
parent 48adcb162c
commit fda0141a01
9 changed files with 369 additions and 96 deletions

View File

@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Voice: expose shared realtime turn-context tracking through the realtime voice SDK and reuse it for Discord speaker attribution and wake-name context recovery.
- Voice: expose shared realtime consult question matching, speakable-result extraction, and forced-consult coordination through the realtime voice SDK, then reuse it in Gateway Talk, Voice Call, and Discord voice paths.
- Voice: share activation-name matching and consult-transcript screening through the realtime voice SDK so Discord, browser voice, and meeting surfaces can reuse one implementation.
- Cron: default `cron.maxConcurrentRuns` to 8 so scheduled automations and their isolated agent turns can make progress in parallel without explicit configuration.

View File

@@ -1,2 +1,2 @@
374f1fec7d6fa8c00865dcb58b68d89ec10e85e81ef536c5746167a83d10bcc7 plugin-sdk-api-baseline.json
ffc6a2faf381d1bb118845e010b2798397c3d41fff400f52ee57b6dc197c8af3 plugin-sdk-api-baseline.jsonl
88b179d6ae301fdfbb5104533b66cdac629a76c1afb33faa1548162cee22dc79 plugin-sdk-api-baseline.json
30c981ed0987cc72335f3f16aff264e9a0f1c903b1cc03f347fa97db9468366e plugin-sdk-api-baseline.jsonl

View File

@@ -626,7 +626,7 @@ releases.
| `plugin-sdk/speech` | Speech helpers | Speech provider types plus provider-facing directive, registry, validation helpers, and OpenAI-compatible TTS builder |
| `plugin-sdk/speech-core` | Shared speech core | Speech provider types, registry, directives, normalization |
| `plugin-sdk/realtime-transcription` | Realtime transcription helpers | Provider types, registry helpers, and shared WebSocket session helper |
| `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry/resolution helpers, bridge session helpers, shared agent talk-back queues, active-run voice control, transcript/event health, echo suppression, consult question matching, forced-consult coordination, and fast context consult helpers |
| `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry/resolution helpers, bridge session helpers, shared agent talk-back queues, active-run voice control, transcript/event health, echo suppression, consult question matching, forced-consult coordination, turn-context tracking, and fast context consult helpers |
| `plugin-sdk/image-generation` | Image-generation helpers | Image generation provider types plus image asset/data URL helpers and the OpenAI-compatible image provider builder |
| `plugin-sdk/image-generation-core` | Shared image-generation core | Image-generation types, failover, auth, and registry helpers |
| `plugin-sdk/music-generation` | Music-generation helpers | Music-generation provider/request/result types |

View File

@@ -331,7 +331,7 @@ focused channel/runtime subpaths, `config-contracts`, `string-coerce-runtime`,
| `plugin-sdk/speech-core` | Shared speech provider types, registry, directive, normalization, and speech helper exports |
| `plugin-sdk/realtime-transcription` | Realtime transcription provider types, registry helpers, and shared WebSocket session helper |
| `plugin-sdk/realtime-bootstrap-context` | Realtime profile bootstrap helper for bounded `IDENTITY.md`, `USER.md`, and `SOUL.md` context injection |
| `plugin-sdk/realtime-voice` | Realtime voice provider types and registry helpers |
| `plugin-sdk/realtime-voice` | Realtime voice provider types, registry helpers, and shared realtime voice behavior helpers |
| `plugin-sdk/image-generation` | Image generation provider types plus image asset/data URL helpers and the OpenAI-compatible image provider builder |
| `plugin-sdk/image-generation-core` | Shared image-generation types, failover, auth, and registry helpers |
| `plugin-sdk/music-generation` | Music generation provider/request/result types |

View File

@@ -7,6 +7,7 @@ import {
controlRealtimeVoiceAgentRun,
createRealtimeVoiceAgentTalkbackQueue,
createRealtimeVoiceBridgeSession,
createRealtimeVoiceTurnContextTracker,
matchRealtimeVoiceActivationName,
matchRealtimeVoiceConsultQuestions,
normalizeSupportedRealtimeVoiceActivationName,
@@ -26,6 +27,8 @@ import {
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderConfig,
type RealtimeVoiceToolCallEvent,
type RealtimeVoiceTurnContextHandle,
type RealtimeVoiceTurnContextTracker,
sortRealtimeVoiceActivationNames,
type RealtimeVoiceActivationNameTranscriptResult,
} from "openclaw/plugin-sdk/realtime-voice";
@@ -94,18 +97,18 @@ type DiscordRealtimeSpeakerContext = VoiceRealtimeSpeakerContext & { userId: str
type DiscordRealtimeVoiceConfig = NonNullable<DiscordAccountConfig["voice"]>["realtime"];
type PendingSpeakerTurn = {
context: DiscordRealtimeSpeakerContext;
hasAudio: boolean;
type PendingSpeakerTurnStats = {
inputDiscordBytes: number;
inputRealtimeBytes: number;
inputChunks: number;
interruptedPlayback: boolean;
closed: boolean;
startedAt: number;
lastAudioAt?: number;
};
type PendingSpeakerTurn = RealtimeVoiceTurnContextHandle<
DiscordRealtimeSpeakerContext,
PendingSpeakerTurnStats
>;
type PendingAgentProxyConsultContext = {
context: DiscordRealtimeSpeakerContext;
question: string;
@@ -126,11 +129,6 @@ type RecentAgentProxyConsultContext = {
result?: RecentAgentProxyConsultResult;
};
type RecentIgnoredWakeNameSpeakerContext = {
context: DiscordRealtimeSpeakerContext;
createdAt: number;
};
function formatRealtimeLogPreview(text: string): string {
const oneLine = text.replace(/\s+/g, " ").trim();
if (oneLine.length <= DISCORD_REALTIME_LOG_PREVIEW_CHARS) {
@@ -366,8 +364,15 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
private wakeNames: string[] = [];
private pendingAgentProxyConsultContexts: PendingAgentProxyConsultContext[] = [];
private recentAgentProxyConsultContexts: RecentAgentProxyConsultContext[] = [];
private recentIgnoredWakeNameSpeakerContext: RecentIgnoredWakeNameSpeakerContext | undefined;
private readonly pendingSpeakerTurns: PendingSpeakerTurn[] = [];
private readonly speakerTurns: RealtimeVoiceTurnContextTracker<
DiscordRealtimeSpeakerContext,
PendingSpeakerTurnStats
> = createRealtimeVoiceTurnContextTracker<DiscordRealtimeSpeakerContext, PendingSpeakerTurnStats>(
{
limit: DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT,
ignoredContextTtlMs: DISCORD_REALTIME_IGNORED_WAKE_NAME_CONTEXT_TTL_MS,
},
);
private outputAudioTimestampMs = 0;
private outputAudioDiscordBytes = 0;
private outputAudioRealtimeBytes = 0;
@@ -573,8 +578,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
this.clearForcedConsultTimers();
this.pendingAgentProxyConsultContexts = [];
this.recentAgentProxyConsultContexts = [];
this.recentIgnoredWakeNameSpeakerContext = undefined;
this.pendingSpeakerTurns.length = 0;
this.speakerTurns.clear();
this.queuedExactSpeechMessages = [];
this.exactSpeechResponseActive = false;
this.exactSpeechAudioStarted = false;
@@ -613,29 +617,25 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
beginSpeakerTurn(context: VoiceRealtimeSpeakerContext, userId: string): VoiceRealtimeSpeakerTurn {
this.resetPartialWakeNameTracking();
const turn: PendingSpeakerTurn = {
context: { ...context, userId },
hasAudio: false,
inputDiscordBytes: 0,
inputRealtimeBytes: 0,
inputChunks: 0,
interruptedPlayback: false,
closed: false,
startedAt: Date.now(),
};
this.pendingSpeakerTurns.push(turn);
logger.info(
`discord voice: realtime speaker turn opened guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${userId} speaker=${context.speakerLabel} owner=${context.senderIsOwner} pendingTurns=${this.pendingSpeakerTurns.length}`,
const turn = this.speakerTurns.open(
{ ...context, userId },
{
inputDiscordBytes: 0,
inputRealtimeBytes: 0,
inputChunks: 0,
interruptedPlayback: false,
},
);
logger.info(
`discord voice: realtime speaker turn opened guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${userId} speaker=${context.speakerLabel} owner=${context.senderIsOwner} pendingTurns=${this.speakerTurns.size()}`,
);
this.prunePendingSpeakerTurns();
return {
sendInputAudio: (discordPcm48kStereo) =>
this.sendInputAudioForTurn(turn, discordPcm48kStereo),
close: () => {
this.sendRealtimeTrailingSilenceForTurn(turn);
this.logSpeakerTurnClosed(turn);
turn.closed = true;
this.prunePendingSpeakerTurns();
this.speakerTurns.close(turn);
},
};
}
@@ -644,13 +644,12 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
if (!this.bridge || this.stopped) {
return;
}
turn.hasAudio = true;
this.speakerTurns.markAudio(turn);
const realtimePcm = convertDiscordPcm48kStereoToRealtimePcm24kMono(discordPcm48kStereo);
if (realtimePcm.length > 0) {
turn.inputDiscordBytes += discordPcm48kStereo.length;
turn.inputRealtimeBytes += realtimePcm.length;
turn.inputChunks += 1;
turn.lastAudioAt = Date.now();
if (turn.inputChunks === 1) {
logger.info(
`discord voice: realtime input audio started guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${turn.context.userId} speaker=${turn.context.speakerLabel} discordBytes=${discordPcm48kStereo.length} realtimeBytes=${realtimePcm.length} outputAudioMs=${Math.floor(this.outputAudioTimestampMs)} outputActive=${this.isOutputAudioActive()}`,
@@ -1456,79 +1455,25 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession {
}
private consumePendingSpeakerContext(): DiscordRealtimeSpeakerContext | undefined {
this.prunePendingSpeakerTurns();
this.expireClosedSpeakerTurnsBeforeLaterAudio();
const index = this.pendingSpeakerTurns.findIndex((turn) => turn.hasAudio);
if (index < 0) {
return undefined;
}
const [turn] = this.pendingSpeakerTurns.splice(index, 1);
this.prunePendingSpeakerTurns();
return turn?.context;
return this.speakerTurns.consumeAudioContext();
}
private rememberIgnoredWakeNameSpeakerContext(
context: DiscordRealtimeSpeakerContext | undefined,
): void {
if (!context) {
return;
}
this.recentIgnoredWakeNameSpeakerContext = {
context,
createdAt: Date.now(),
};
this.speakerTurns.rememberIgnoredContext(context);
}
private consumeRecentIgnoredWakeNameSpeakerContext(): DiscordRealtimeSpeakerContext | undefined {
const recent = this.recentIgnoredWakeNameSpeakerContext;
this.recentIgnoredWakeNameSpeakerContext = undefined;
if (
!recent ||
Date.now() - recent.createdAt > DISCORD_REALTIME_IGNORED_WAKE_NAME_CONTEXT_TTL_MS
) {
return undefined;
}
return recent.context;
return this.speakerTurns.consumeIgnoredContext();
}
private peekPendingSpeakerTurn(): PendingSpeakerTurn | undefined {
this.prunePendingSpeakerTurns();
this.expireClosedSpeakerTurnsBeforeLaterAudio();
return this.pendingSpeakerTurns.find((turn) => turn.hasAudio);
return this.speakerTurns.peekAudioTurn();
}
private hasPendingSpeakerAudioContext(): boolean {
this.prunePendingSpeakerTurns();
this.expireClosedSpeakerTurnsBeforeLaterAudio();
return this.pendingSpeakerTurns.some((turn) => turn.hasAudio);
}
private prunePendingSpeakerTurns(): void {
for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) {
const turn = this.pendingSpeakerTurns[index];
if (turn?.closed && !turn.hasAudio) {
this.pendingSpeakerTurns.splice(index, 1);
}
}
while (this.pendingSpeakerTurns.length > DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT) {
const completedIndex = this.pendingSpeakerTurns.findIndex((turn) => turn.closed);
this.pendingSpeakerTurns.splice(Math.max(completedIndex, 0), 1);
}
}
private expireClosedSpeakerTurnsBeforeLaterAudio(): void {
let hasLaterAudio = false;
for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) {
const turn = this.pendingSpeakerTurns[index];
if (!turn?.hasAudio) {
continue;
}
if (turn.closed && hasLaterAudio) {
this.pendingSpeakerTurns.splice(index, 1);
continue;
}
hasLaterAudio = true;
}
return this.speakerTurns.hasAudioContext();
}
private rememberRecentAgentProxyConsultContext(

View File

@@ -104,6 +104,9 @@ export const pluginSdkDocMetadata = {
"speech-core": {
category: "provider",
},
"realtime-voice": {
category: "provider",
},
"tts-runtime": {
category: "runtime",
},

View File

@@ -84,6 +84,12 @@ export {
type RealtimeVoiceForcedConsultNativeRecentOptions,
type RealtimeVoiceForcedConsultTimer,
} from "../talk/forced-consult-coordinator.js";
export {
createRealtimeVoiceTurnContextTracker,
type RealtimeVoiceTurnContextHandle,
type RealtimeVoiceTurnContextTracker,
type RealtimeVoiceTurnContextTrackerOptions,
} from "../talk/turn-context-tracker.js";
export {
buildRealtimeVoiceAgentConsultChatMessage,
buildRealtimeVoiceAgentConsultPolicyInstructions,

View File

@@ -0,0 +1,124 @@
import { describe, expect, it } from "vitest";
import { createRealtimeVoiceTurnContextTracker } from "./turn-context-tracker.js";
// Behavioral tests for the turn-context tracker: pruning, ownership checks,
// retention limits, and the TTL-bound "ignored context" slot.
describe("realtime voice turn context tracker", () => {
// A turn closed without ever receiving audio is pruned, leaving only the turn with audio;
// its context can then be consumed exactly once.
it("consumes audio contexts and prunes silent closed turns", () => {
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>();
const silent = tracker.open({ id: "silent" });
const spoken = tracker.open({ id: "spoken" });
tracker.close(silent);
tracker.markAudio(spoken);
expect(tracker.size()).toBe(1);
expect(tracker.consumeAudioContext()).toEqual({ id: "spoken" });
expect(tracker.consumeAudioContext()).toBeUndefined();
});
// Consuming removes the turn from the tracker, but a later close() must still update the handle.
it("marks consumed handles closed when callers close them later", () => {
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>();
const turn = tracker.open({ id: "speaker" });
tracker.markAudio(turn);
expect(tracker.consumeAudioContext()).toEqual({ id: "speaker" });
tracker.close(turn);
expect(turn.closed).toBe(true);
});
// Handles are tied to the tracker that opened them; a different tracker must not mutate them.
it("ignores handles from another tracker", () => {
const first = createRealtimeVoiceTurnContextTracker<{ id: string }>();
const second = createRealtimeVoiceTurnContextTracker<{ id: string }>();
const firstTurn = first.open({ id: "first" });
second.markAudio(firstTurn);
second.close(firstTurn);
expect(firstTurn.hasAudio).toBe(false);
expect(firstTurn.closed).toBe(false);
expect(first.consumeAudioContext()).toBeUndefined();
});
// A closed turn with audio is discarded once a later turn also has audio.
it("drops closed audio turns that are older than later audio", () => {
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>();
const older = tracker.open({ id: "older" });
tracker.markAudio(older);
tracker.close(older);
const later = tracker.open({ id: "later" });
tracker.markAudio(later);
expect(tracker.consumeAudioContext()).toEqual({ id: "later" });
expect(tracker.consumeAudioContext()).toBeUndefined();
});
// Extra fields passed to open() live on the handle itself, so caller mutations are visible via peek.
it("retains caller-owned turn stats on peeked audio turns", () => {
const tracker = createRealtimeVoiceTurnContextTracker<
{ id: string },
{ chunks: number; interruptedPlayback: boolean }
>();
const turn = tracker.open({ id: "speaker" }, { chunks: 0, interruptedPlayback: false });
tracker.markAudio(turn);
turn.chunks += 1;
expect(tracker.peekAudioTurn()).toMatchObject({
context: { id: "speaker" },
chunks: 1,
interruptedPlayback: false,
hasAudio: true,
});
});
// With limit 2, opening a third turn evicts the closed "first" turn before any open one.
it("bounds retained turn handles", () => {
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ limit: 2 });
const first = tracker.open({ id: "first" });
tracker.markAudio(first);
tracker.close(first);
const second = tracker.open({ id: "second" });
tracker.markAudio(second);
const third = tracker.open({ id: "third" });
tracker.markAudio(third);
expect(tracker.consumeAudioContext()).toEqual({ id: "second" });
expect(tracker.consumeAudioContext()).toEqual({ id: "third" });
expect(tracker.consumeAudioContext()).toBeUndefined();
});
// With limit 0 the handle is evicted as soon as it is opened, yet markAudio still updates the handle.
it("allows a zero turn limit", () => {
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ limit: 0 });
const turn = tracker.open({ id: "discarded" });
tracker.markAudio(turn);
expect(tracker.size()).toBe(0);
expect(turn.hasAudio).toBe(true);
expect(tracker.consumeAudioContext()).toBeUndefined();
});
// 400 ms elapsed is inside the 500 ms TTL; the slot is cleared by the first consume.
it("consumes recently ignored contexts once before the ttl expires", () => {
let now = 1_000;
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({
ignoredContextTtlMs: 500,
now: () => now,
});
tracker.rememberIgnoredContext({ id: "recent" });
now = 1_400;
expect(tracker.consumeIgnoredContext()).toEqual({ id: "recent" });
expect(tracker.consumeIgnoredContext()).toBeUndefined();
});
// 501 ms elapsed exceeds the 500 ms TTL, so the remembered context is discarded.
it("expires ignored contexts after the ttl", () => {
let now = 1_000;
const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({
ignoredContextTtlMs: 500,
now: () => now,
});
tracker.rememberIgnoredContext({ id: "old" });
now = 1_501;
expect(tracker.consumeIgnoredContext()).toBeUndefined();
});
});

View File

@@ -0,0 +1,194 @@
// Cap on retained turn handles used when `options.limit` is undefined or not finite.
const DEFAULT_REALTIME_VOICE_TURN_CONTEXT_LIMIT = 32;
// Lifetime in milliseconds of a remembered ignored context, used when
// `options.ignoredContextTtlMs` is undefined or not finite.
const DEFAULT_REALTIME_VOICE_IGNORED_CONTEXT_TTL_MS = 10_000;
/** Optional settings accepted by `createRealtimeVoiceTurnContextTracker`. */
export type RealtimeVoiceTurnContextTrackerOptions = {
/** Maximum number of turn handles retained at once; floored, clamped to >= 0, default 32. */
limit?: number;
/** How long (ms) a remembered ignored context stays consumable; floored, clamped to >= 0, default 10 000. */
ignoredContextTtlMs?: number;
/** Clock used for all timestamps; defaults to `Date.now`. */
now?: () => number;
};
/**
 * Mutable per-turn record returned by the tracker's `open`.
 * Caller-supplied `TExtra` fields are merged onto the same object as the tracker-managed fields.
 */
export type RealtimeVoiceTurnContextHandle<
TContext,
TExtra extends object = Record<never, never>,
> = TExtra & {
/** Identifier built by the tracker from the open timestamp and a per-tracker counter. */
id: string;
/** Caller-supplied context passed to `open`. */
context: TContext;
/** Set to true by the tracker's `markAudio`. */
hasAudio: boolean;
/** Set to true by the tracker's `close`. */
closed: boolean;
/** Clock reading taken when the turn was opened. */
startedAt: number;
/** Clock reading taken by the most recent `markAudio` call, if any. */
lastAudioAt?: number;
};
/**
 * Rest-argument tuple for `open`: the `extra` argument is optional when `TExtra`
 * declares no keys and required otherwise.
 */
type RealtimeVoiceTurnContextOpenArgs<TExtra extends object> = keyof TExtra extends never
? [extra?: TExtra]
: [extra: TExtra];
/** Public surface of the tracker returned by `createRealtimeVoiceTurnContextTracker`. */
export type RealtimeVoiceTurnContextTracker<
TContext,
TExtra extends object = Record<never, never>,
> = {
/** Creates a handle for `context` (merging in `extra`), appends it to the retained turns subject to the limit, and returns it. */
open(
context: TContext,
...extra: RealtimeVoiceTurnContextOpenArgs<TExtra>
): RealtimeVoiceTurnContextHandle<TContext, TExtra>;
/** Flags the handle as having audio and stamps `lastAudioAt`; handles from another tracker are ignored. */
markAudio(handle: RealtimeVoiceTurnContextHandle<TContext, TExtra>): void;
/** Flags the handle as closed; handles from another tracker are ignored. */
close(handle: RealtimeVoiceTurnContextHandle<TContext, TExtra>): void;
/** Removes the first retained turn that has audio and returns its context, or `undefined` when none has audio. */
consumeAudioContext(): TContext | undefined;
/** Returns the first retained turn that has audio without removing it. */
peekAudioTurn(): RealtimeVoiceTurnContextHandle<TContext, TExtra> | undefined;
/** Reports whether any retained turn has audio. */
hasAudioContext(): boolean;
/** Stores `context` as the single most recent ignored context; passing `undefined` is a no-op. */
rememberIgnoredContext(context: TContext | undefined): void;
/** Returns and clears the remembered ignored context, or `undefined` if none is stored or its TTL has elapsed. */
consumeIgnoredContext(): TContext | undefined;
/** Number of turns retained after pruning. */
size(): number;
/** Drops all retained turns and the remembered ignored context. */
clear(): void;
};
/** Single-slot record of the most recently remembered ignored context. */
type RecentIgnoredContext<TContext> = {
/** The remembered context value. */
context: TContext;
/** Clock reading taken when the context was remembered; compared against the TTL on consume. */
createdAt: number;
};
/**
 * Coerces an optional numeric option into a non-negative integer.
 *
 * @param value - Caller-supplied option; may be missing, NaN, or infinite.
 * @param fallback - Returned unchanged when `value` is not a finite number.
 * @returns `value` rounded down, with negative results clamped to 0, or `fallback`.
 */
function normalizeNonNegativeInteger(value: number | undefined, fallback: number): number {
  if (typeof value !== "number" || !Number.isFinite(value)) {
    return fallback;
  }
  const floored = Math.floor(value);
  // Anything at or below zero (including -0) collapses to a plain 0.
  return floored > 0 ? floored : 0;
}
/**
 * Creates a tracker that associates realtime voice turns with caller-defined context.
 *
 * The tracker keeps an ordered, bounded list of turn handles and a single-slot,
 * TTL-bound "recently ignored" context. Handles are plain mutable objects; any
 * `extra` fields supplied to `open` live on the handle and stay caller-owned.
 *
 * @param options - Optional retention limit, ignored-context TTL, and clock override.
 * @returns A tracker whose handles are only accepted by the tracker that opened them.
 */
export function createRealtimeVoiceTurnContextTracker<
  TContext,
  TExtra extends object = Record<never, never>,
>(
  options: RealtimeVoiceTurnContextTrackerOptions = {},
): RealtimeVoiceTurnContextTracker<TContext, TExtra> {
  const turns: RealtimeVoiceTurnContextHandle<TContext, TExtra>[] = [];
  let recentIgnoredContext: RecentIgnoredContext<TContext> | undefined;
  let nextId = 0;
  // Per-tracker brand stamped on every handle so handles from other trackers can be rejected.
  const owner = Symbol("realtimeVoiceTurnContextTracker");
  const now = options.now ?? Date.now;
  const limit = normalizeNonNegativeInteger(
    options.limit,
    DEFAULT_REALTIME_VOICE_TURN_CONTEXT_LIMIT,
  );
  const ignoredContextTtlMs = normalizeNonNegativeInteger(
    options.ignoredContextTtlMs,
    DEFAULT_REALTIME_VOICE_IGNORED_CONTEXT_TTL_MS,
  );

  // Drops turns that closed without audio, then enforces the retention limit,
  // evicting the oldest closed turn first and otherwise the oldest turn.
  const prune = () => {
    for (let index = turns.length - 1; index >= 0; index -= 1) {
      const turn = turns[index];
      if (turn?.closed && !turn.hasAudio) {
        turns.splice(index, 1);
      }
    }
    while (turns.length > limit) {
      const completedIndex = turns.findIndex((turn) => turn.closed);
      turns.splice(Math.max(completedIndex, 0), 1);
    }
  };

  // Walks newest-to-oldest and removes closed audio turns that precede a later audio turn.
  const expireClosedTurnsBeforeLaterAudio = () => {
    let hasLaterAudio = false;
    for (let index = turns.length - 1; index >= 0; index -= 1) {
      const turn = turns[index];
      if (!turn?.hasAudio) {
        continue;
      }
      if (turn.closed && hasLaterAudio) {
        turns.splice(index, 1);
        continue;
      }
      hasLaterAudio = true;
    }
  };

  // Shared housekeeping run before every read of audio-bearing turns.
  const prepareForAudioContextRead = () => {
    prune();
    expireClosedTurnsBeforeLaterAudio();
  };

  // True only for handles created by this tracker instance.
  const owns = (handle: RealtimeVoiceTurnContextHandle<TContext, TExtra>) =>
    (
      handle as RealtimeVoiceTurnContextHandle<TContext, TExtra> & {
        [owner]?: true;
      }
    )[owner] === true;

  return {
    open(context, ...extra) {
      const startedAt = now();
      // Tracker-managed fields are written after the spread so `extra` cannot override them.
      const handle: RealtimeVoiceTurnContextHandle<TContext, TExtra> = {
        ...(extra[0] ?? ({} as TExtra)),
        [owner]: true,
        id: `realtime-turn:${startedAt}:${++nextId}`,
        context,
        hasAudio: false,
        closed: false,
        startedAt,
      };
      turns.push(handle);
      prune();
      return handle;
    },
    markAudio(handle) {
      if (!owns(handle)) {
        return;
      }
      // Handle state is updated even if the turn was already pruned or consumed.
      handle.hasAudio = true;
      handle.lastAudioAt = now();
    },
    close(handle) {
      if (!owns(handle)) {
        return;
      }
      // Handle state is updated even if the turn was already pruned or consumed.
      handle.closed = true;
      if (turns.includes(handle)) {
        prune();
      }
    },
    consumeAudioContext() {
      prepareForAudioContextRead();
      const index = turns.findIndex((turn) => turn.hasAudio);
      if (index < 0) {
        return undefined;
      }
      const [turn] = turns.splice(index, 1);
      prune();
      return turn?.context;
    },
    peekAudioTurn() {
      prepareForAudioContextRead();
      return turns.find((turn) => turn.hasAudio);
    },
    hasAudioContext() {
      prepareForAudioContextRead();
      return turns.some((turn) => turn.hasAudio);
    },
    rememberIgnoredContext(context) {
      // Only an absent context is skipped. A truthiness check would silently drop
      // valid falsy contexts such as 0, "" or false for generic TContext.
      if (context === undefined || context === null) {
        return;
      }
      recentIgnoredContext = { context, createdAt: now() };
    },
    consumeIgnoredContext() {
      const recent = recentIgnoredContext;
      // The slot is single-use: it is cleared whether or not the entry is still fresh.
      recentIgnoredContext = undefined;
      if (!recent || now() - recent.createdAt > ignoredContextTtlMs) {
        return undefined;
      }
      return recent.context;
    },
    size() {
      prune();
      return turns.length;
    },
    clear() {
      turns.length = 0;
      recentIgnoredContext = undefined;
    },
  };
}