docs: document voice call runtime surfaces

This commit is contained in:
Peter Steinberger
2026-06-04 01:45:11 -04:00
parent 5dd026f3f7
commit d8b5e22e8b
23 changed files with 153 additions and 0 deletions

View File

@@ -1,3 +1,5 @@
// Public voice-call API barrel exposed to plugin-local modules and tests.
export {
definePluginEntry,
fetchWithSsrFGuard,

View File

@@ -1,5 +1,7 @@
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
// Lightweight CLI metadata entry for exposing the voicecall command.
export default definePluginEntry({
id: "voice-call",
name: "Voice Call",

View File

@@ -20,6 +20,9 @@ import {
} from "./src/manager/store.js";
import type { CallRecord } from "./src/types.js";
// Doctor state migration for Voice Call legacy JSONL call logs.
/** Plugin state metadata row for one migrated call record event. */
type CallRecordEventMeta = {
chunkCount: number;
byteLength: number;
@@ -27,11 +30,13 @@ type CallRecordEventMeta = {
sequence?: number;
};
/**
 * Plugin state chunk row for one migrated call record event.
 * A single event can be represented by several of these rows (see `chunkCount` on the
 * event metadata type and the `chunks` array returned by `prepareChunks`).
 */
type CallRecordEventChunk = {
// Position of this chunk within its event.
// NOTE(review): presumably the index that buildChunkKey() pads into the row key — confirm.
index: number;
// Chunk payload as a base64 string (per the field name; the encoder is not visible here).
dataBase64: string;
};
/** Prepared legacy JSONL call record ready for plugin state import. */
type PreparedLegacyCallRecord = {
eventKey: string;
lineNumber: number;
@@ -39,10 +44,12 @@ type PreparedLegacyCallRecord = {
meta: CallRecordEventMeta;
};
/**
 * Pick the home directory for doctor runs.
 *
 * Uses the trimmed `HOME` value from the supplied env when it is non-empty, and
 * otherwise falls back to the home directory reported by the OS.
 */
function resolveHome(env: NodeJS.ProcessEnv): string {
  const configuredHome = env.HOME?.trim();
  if (configuredHome) {
    return configuredHome;
  }
  // HOME is unset, empty, or whitespace-only.
  return os.homedir();
}
/** Resolve config paths, including "~", against the doctor env home. */
function resolveUserPath(input: string, env: NodeJS.ProcessEnv): string {
const trimmed = input.trim();
if (!trimmed) {
@@ -54,6 +61,7 @@ function resolveUserPath(input: string, env: NodeJS.ProcessEnv): string {
return path.resolve(trimmed);
}
/** Read the configured voice-call store path from either package id. */
function getVoiceCallConfigStore(config: PluginDoctorStateMigrationParams["config"]): string {
for (const pluginId of ["voice-call", "@openclaw/voice-call"]) {
const rawConfig = config.plugins?.entries?.[pluginId]?.config;
@@ -72,6 +80,7 @@ type PluginDoctorStateMigrationParams = Parameters<
PluginDoctorStateMigration["detectLegacyState"]
>[0];
/** Resolve the voice-call store path used by legacy and plugin-state call records. */
function resolveVoiceCallStorePath(params: {
config: PluginDoctorStateMigrationParams["config"];
env: NodeJS.ProcessEnv;
@@ -83,6 +92,7 @@ function resolveVoiceCallStorePath(params: {
return path.join(resolveHome(params.env), ".openclaw", "voice-calls");
}
/** Return true when a path exists and is a file. */
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
@@ -92,10 +102,12 @@ async function fileExists(filePath: string): Promise<boolean> {
}
}
/**
 * Compose the plugin state key for one chunk of a migrated event.
 *
 * The key has the form `<eventKey>:chunk:<index>`, with the index left-padded with
 * zeros to at least four characters.
 */
function buildChunkKey(eventKey: string, index: number): string {
  const paddedIndex = index.toString().padStart(4, "0");
  return [eventKey, "chunk", paddedIndex].join(":");
}
/** Chunk a prepared call record into bounded plugin state rows. */
function prepareChunks(call: CallRecord): {
chunks: CallRecordEventChunk[];
meta: CallRecordEventMeta;
@@ -125,6 +137,7 @@ function prepareChunks(call: CallRecord): {
};
}
/** Read and prepare legacy JSONL call records, collecting line-level warnings. */
async function readLegacyCallRecords(filePath: string): Promise<{
entries: PreparedLegacyCallRecord[];
warnings: string[];
@@ -167,6 +180,7 @@ async function readLegacyCallRecords(filePath: string): Promise<{
return { entries, warnings };
}
/** Archive the legacy JSONL source after a complete migration. */
async function archiveLegacySource(params: {
filePath: string;
changes: string[];
@@ -187,6 +201,7 @@ async function archiveLegacySource(params: {
}
}
/** Select newest missing records that fit remaining plugin state capacity. */
async function selectEntriesForImport(params: {
entries: PreparedLegacyCallRecord[];
eventStore: PluginStateKeyedStore<CallRecordEventMeta>;
@@ -217,6 +232,7 @@ async function selectEntriesForImport(params: {
return { existingEventKeys, entries: selected.toReversed() };
}
/** Import prepared legacy call records into plugin state. */
async function importLegacyCallRecords(params: {
entries: PreparedLegacyCallRecord[];
eventStore: PluginStateKeyedStore<CallRecordEventMeta>;
@@ -245,6 +261,7 @@ async function importLegacyCallRecords(params: {
return imported;
}
/** Doctor migrations owned by the voice-call plugin. */
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "voice-call-calls-jsonl-to-plugin-state",

View File

@@ -1 +1,3 @@
// Runtime entrypoint for the voice-call plugin package.
export { createVoiceCallRuntime, type VoiceCallRuntime } from "./src/runtime.js";

View File

@@ -3,6 +3,9 @@ import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime";
import { migrateVoiceCallLegacyConfigInput } from "./config-api.js";
// Setup-time entrypoint for voice-call config migrations.
/** Migrate voice-call plugin config inside the full OpenClaw config object. */
function migrateVoiceCallPluginConfig(config: OpenClawConfig): {
config: OpenClawConfig;
changes: string[];
@@ -37,6 +40,7 @@ function migrateVoiceCallPluginConfig(config: OpenClawConfig): {
};
}
/** Setup plugin entry that registers voice-call config migrations. */
export default definePluginEntry({
id: "voice-call",
name: "Voice Call Setup",

View File

@@ -1,3 +1,6 @@
// Caller allowlist helpers for provider-normalized phone numbers.
/** Normalize a phone number to digits only. */
export function normalizePhoneNumber(input?: string): string {
if (!input) {
return "";
@@ -5,6 +8,7 @@ export function normalizePhoneNumber(input?: string): string {
return input.replace(/\D/g, "");
}
/** Return true when the normalized caller exactly matches an allowlist entry. */
export function isAllowlistedCaller(
normalizedFrom: string,
allowFrom: string[] | undefined,

View File

@@ -1,14 +1,19 @@
// Bounded child-process output buffer for voice-call tunnel/process diagnostics.
// Default character cap (16 384) for the bounded output buffer.
// NOTE(review): presumably the default for appendBoundedChildOutput's maxChars — confirm.
const DEFAULT_MAX_OUTPUT_CHARS = 16_384;
/** Captured child output plus truncation flag. */
export type BoundedChildOutput = {
// Output text currently retained in the buffer.
text: string;
// When true, formatBoundedChildOutput() prefixes the text with "[output truncated]".
truncated: boolean;
};
/**
 * Build a fresh bounded output buffer: no text, and not marked as truncated.
 * A new object is returned on every call.
 */
export function emptyBoundedChildOutput(): BoundedChildOutput {
  const fresh: BoundedChildOutput = { text: "", truncated: false };
  return fresh;
}
/** Append output while retaining the newest maxChars and recording truncation. */
export function appendBoundedChildOutput(
current: BoundedChildOutput,
chunk: string,
@@ -24,6 +29,7 @@ export function appendBoundedChildOutput(
};
}
/**
 * Render captured output for display.
 *
 * Untruncated output is returned as-is; truncated output is prefixed with an
 * "[output truncated]" line so readers know earlier text is missing.
 */
export function formatBoundedChildOutput(output: BoundedChildOutput): string {
  if (!output.truncated) {
    return output.text;
  }
  return `[output truncated]\n${output.text}`;
}

View File

@@ -2,8 +2,12 @@ import { asOptionalRecord, readStringField } from "openclaw/plugin-sdk/string-co
import type { VoiceCallConfig } from "./config.js";
import { VoiceCallConfigSchema } from "./config.js";
// Legacy voice-call config warnings and doctor-fix migration helpers.
/** Version where legacy voice-call config shape support is removed. */
export const VOICE_CALL_LEGACY_CONFIG_REMOVAL_VERSION = "2026.6.0";
/** One legacy config issue with the replacement path and message. */
type VoiceCallLegacyConfigIssue = {
path: string;
replacement: string;
@@ -13,11 +17,13 @@ type VoiceCallLegacyConfigIssue = {
// Local short aliases for the plugin-sdk coercion helpers imported above.
const asObject = asOptionalRecord;
const getString = readStringField;
/**
 * Read `key` from an optional record as a finite number.
 *
 * Returns undefined when the record is missing, the value is not of type number,
 * or the value is NaN or infinite.
 */
function getNumber(obj: Record<string, unknown> | undefined, key: string): number | undefined {
  const candidate = obj?.[key];
  if (typeof candidate !== "number" || !Number.isFinite(candidate)) {
    return undefined;
  }
  return candidate;
}
/** Merge legacy provider-specific values into the canonical providers map. */
function mergeProviderConfig(
providersValue: unknown,
providerId: string,
@@ -38,6 +44,7 @@ function mergeProviderConfig(
};
}
/** Collect legacy voice-call config keys that should be migrated. */
export function collectVoiceCallLegacyConfigIssues(value: unknown): VoiceCallLegacyConfigIssue[] {
const raw = asObject(value) ?? {};
const realtime = asObject(raw.realtime);
@@ -107,6 +114,7 @@ export function collectVoiceCallLegacyConfigIssues(value: unknown): VoiceCallLeg
return issues;
}
/** Format runtime warnings for legacy voice-call config keys. */
export function formatVoiceCallLegacyConfigWarnings(params: {
value: unknown;
configPathPrefix: string;
@@ -125,6 +133,7 @@ export function formatVoiceCallLegacyConfigWarnings(params: {
];
}
/** Migrate legacy voice-call config input to the current canonical shape. */
export function migrateVoiceCallLegacyConfigInput(params: {
value: unknown;
configPathPrefix?: string;
@@ -254,10 +263,12 @@ export function migrateVoiceCallLegacyConfigInput(params: {
return { config, changes, issues };
}
/**
 * Run the legacy config migration on `value` and return only the migrated config,
 * discarding the `changes` and `issues` the migration also reports.
 */
export function normalizeVoiceCallLegacyConfigInput(value: unknown): Record<string, unknown> {
  const { config } = migrateVoiceCallLegacyConfigInput({ value });
  return config;
}
/**
 * Normalize legacy config shapes in `value`, then parse the result with
 * VoiceCallConfigSchema. Validation failures surface however the schema's `parse`
 * reports them (not visible from this file).
 */
export function parseVoiceCallPluginConfig(value: unknown): VoiceCallConfig {
  const normalizedInput = normalizeVoiceCallLegacyConfigInput(value);
  return VoiceCallConfigSchema.parse(normalizedInput);
}

View File

@@ -6,9 +6,12 @@ import type { CoreConfig } from "./core-bridge.js";
import type { VoiceCallRuntime } from "./runtime.js";
import { TELEPHONY_DEFAULT_TTS_TIMEOUT_MS } from "./telephony-tts.js";
// Async operation store for gateway continue-call requests that outlive one HTTP response.
// 30 seconds, in milliseconds.
// NOTE(review): presumably slack added on top of the expected continue-call duration — confirm at the use site.
const VOICE_CALL_CONTINUE_OPERATION_BUFFER_MS = 30000;
// 5 minutes, in milliseconds.
// NOTE(review): presumably how long a finished operation stays pollable before cleanup — confirm at the use site.
const VOICE_CALL_CONTINUE_OPERATION_CLEANUP_MS = 5 * 60 * 1000;
/** Internal lifecycle state for one continue-call operation. */
type VoiceCallContinueOperation =
| {
operationId: string;
@@ -36,12 +39,14 @@ type VoiceCallContinueOperation =
error: string;
};
/** Payload returned immediately when a continue operation starts. */
type VoiceCallContinueOperationStartPayload = {
// Identifier of the started operation.
operationId: string;
// A start payload always reports the operation as pending.
status: "pending";
// Timeout in milliseconds. NOTE(review): presumably how long the caller should keep polling — confirm.
pollTimeoutMs: number;
};
/** Payload returned while polling a continue operation. */
type VoiceCallContinueOperationResultPayload =
| {
operationId: string;
@@ -59,12 +64,14 @@ type VoiceCallContinueOperationResultPayload =
error: string;
};
/** Request needed to start a continue-call operation. */
type VoiceCallContinueOperationRequest = {
// Voice-call runtime the operation runs against.
rt: VoiceCallRuntime;
// Identifier of the call to continue.
callId: string;
// Message for the continue request. NOTE(review): presumably the text spoken to the caller — confirm.
message: string;
};
/** Create a process-local operation store for gateway continue-call polling. */
export function createVoiceCallContinueOperationStore(params: {
config: VoiceCallConfig;
coreConfig: CoreConfig;
@@ -92,6 +99,7 @@ export function createVoiceCallContinueOperationStore(params: {
timer.unref?.();
};
// continueCall can wait for speech/TTS/transcript work; callers poll this in the meantime.
const start = (
request: VoiceCallContinueOperationRequest,
): VoiceCallContinueOperationStartPayload => {

View File

@@ -1,6 +1,8 @@
import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/string-coerce-runtime";
import type { EndReason } from "../../types.js";
// Shared provider status normalization and terminal-state mapping.
const TERMINAL_PROVIDER_STATUS_TO_END_REASON: Record<string, EndReason> = {
completed: "completed",
failed: "failed",
@@ -9,16 +11,19 @@ const TERMINAL_PROVIDER_STATUS_TO_END_REASON: Record<string, EndReason> = {
canceled: "hangup-bot",
};
/**
 * Canonicalize a provider status string via normalizeOptionalLowercaseString.
 * When that helper yields nothing (or an empty string), the literal "unknown" is returned.
 */
export function normalizeProviderStatus(status: string | null | undefined): string {
  const normalized = normalizeOptionalLowercaseString(status);
  if (!normalized || normalized.length === 0) {
    return "unknown";
  }
  return normalized;
}
/**
 * Map terminal provider status strings to OpenClaw end reasons.
 *
 * The status is normalized first. Returns the mapped end reason when the normalized
 * status is a key of the terminal-status table, and null otherwise.
 */
export function mapProviderStatusToEndReason(status: string | null | undefined): EndReason | null {
  const normalized = normalizeProviderStatus(status);
  // Consult only the table's own keys. A plain-object lookup would otherwise resolve
  // status strings such as "constructor" or "__proto__" to inherited Object.prototype
  // members, and the non-null result would be reported as a terminal end reason.
  if (!Object.prototype.hasOwnProperty.call(TERMINAL_PROVIDER_STATUS_TO_END_REASON, normalized)) {
    return null;
  }
  return TERMINAL_PROVIDER_STATUS_TO_END_REASON[normalized] ?? null;
}
/**
 * Report whether a provider status is terminal, i.e. whether
 * mapProviderStatusToEndReason resolves it to an end reason rather than null.
 */
export function isProviderStatusTerminal(status: string | null | undefined): boolean {
  const endReason = mapProviderStatusToEndReason(status);
  return endReason !== null;
}

View File

@@ -1,5 +1,8 @@
import { fetchWithSsrFGuard } from "../../../api.js";
// Shared guarded JSON API client for voice-call providers.
/** Parameters for an SSRF-guarded provider JSON request. */
type GuardedJsonApiRequestParams = {
url: string;
method: "GET" | "POST" | "DELETE" | "PUT" | "PATCH";
@@ -11,6 +14,7 @@ type GuardedJsonApiRequestParams = {
errorPrefix: string;
};
/** Send a provider JSON request through the SSRF guard and parse bounded JSON responses. */
export async function guardedJsonApiRequest<T = unknown>(
params: GuardedJsonApiRequestParams,
): Promise<T> {

View File

@@ -1,5 +1,8 @@
import { fetchWithSsrFGuard } from "../../../api.js";
// Guarded Twilio REST API client helpers.
/** Minimal Twilio REST API error payload. */
type ParsedTwilioApiError = {
code?: number;
message?: string;
@@ -7,6 +10,7 @@ type ParsedTwilioApiError = {
// 30 seconds, in milliseconds.
// NOTE(review): presumably the per-request timeout for twilioApiRequest — confirm at the use site.
const TWILIO_API_TIMEOUT_MS = 30_000;
/** Parse Twilio JSON error responses without trusting response shape. */
function parseTwilioApiError(text: string): ParsedTwilioApiError {
try {
const parsed: unknown = JSON.parse(text);
@@ -23,6 +27,7 @@ function parseTwilioApiError(text: string): ParsedTwilioApiError {
}
}
/** Error thrown for non-2xx Twilio REST API responses. */
export class TwilioApiError extends Error {
readonly httpStatus: number;
readonly responseText: string;
@@ -39,6 +44,7 @@ export class TwilioApiError extends Error {
}
}
/** POST a form-encoded Twilio REST API request through the SSRF guard. */
export async function twilioApiRequest<T = unknown>(params: {
baseUrl: string;
accountSid: string;

View File

@@ -1,6 +1,9 @@
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import type { WebhookContext } from "../../types.js";
// Twilio webhook policy for deciding whether to stream, pause, queue, or serve stored TwiML.
/** Normalized Twilio webhook request fields used by TwiML policy. */
type TwimlRequestView = {
callStatus: string | null;
direction: string | null;
@@ -9,6 +12,7 @@ type TwimlRequestView = {
callIdFromQuery?: string;
};
/** Full TwiML policy input including manager/runtime state. */
type TwimlPolicyInput = TwimlRequestView & {
hasStoredTwiml: boolean;
isNotifyCall: boolean;
@@ -16,6 +20,7 @@ type TwimlPolicyInput = TwimlRequestView & {
canStream: boolean;
};
/** TwiML response decision plus side effects the caller should apply. */
type TwimlDecision =
| {
kind: "empty" | "pause" | "queue";
@@ -33,10 +38,12 @@ type TwimlDecision =
activateStreamCallSid?: string;
};
/**
 * Report whether a Twilio call direction is outbound: true when the direction string
 * starts with "outbound", false for any other value or when no direction is present.
 */
function isOutboundDirection(direction: string | null): boolean {
  if (direction == null) {
    return false;
  }
  return direction.startsWith("outbound");
}
/** Read the Twilio request fields needed by TwiML decision logic. */
export function readTwimlRequestView(ctx: WebhookContext): TwimlRequestView {
const params = new URLSearchParams(ctx.rawBody);
const type = normalizeOptionalString(ctx.query?.type);
@@ -51,6 +58,7 @@ export function readTwimlRequestView(ctx: WebhookContext): TwimlRequestView {
};
}
/** Decide the TwiML response kind for a Twilio webhook request. */
export function decideTwimlResponse(input: TwimlPolicyInput): TwimlDecision {
if (input.callIdFromQuery && !input.isStatusCallback) {
if (input.hasStoredTwiml) {

View File

@@ -2,6 +2,9 @@ import type { WebhookContext, WebhookVerificationResult } from "../../types.js";
import { verifyTwilioWebhook } from "../../webhook-security.js";
import type { TwilioProviderOptions } from "../twilio.types.js";
// Twilio-specific webhook verification adapter.
/** Verify a Twilio webhook and map SDK verification details to provider result fields. */
export function verifyTwilioProviderWebhook(params: {
ctx: WebhookContext;
authToken: string;

View File

@@ -1,3 +1,5 @@
// Realtime transcription provider facade for the voice-call plugin runtime.
export {
getRealtimeTranscriptionProvider,
listRealtimeTranscriptionProviders,

View File

@@ -1,3 +1,5 @@
// Realtime voice provider facade for the voice-call plugin runtime.
export {
getRealtimeVoiceProvider,
listRealtimeVoiceProviders,

View File

@@ -1,5 +1,8 @@
import { createPluginRuntimeStore, type PluginRuntime } from "openclaw/plugin-sdk/runtime-store";
// Process-local runtime store used by voice-call persistence helpers.
/**
 * Runtime subset needed by voice-call state persistence.
 * Only the `state` member of PluginRuntime is picked, so any object exposing a
 * compatible `state` satisfies this type.
 */
export type VoiceCallStateRuntime = Pick<PluginRuntime, "state">;
const {

View File

@@ -1,6 +1,9 @@
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import type { VoiceCallTtsConfig } from "./config.js";
// Resolves preferred voice settings from configured TTS provider blocks.
/** Read voice setting aliases from one provider-specific config block. */
function resolveProviderVoiceSetting(providerConfig: unknown): string | undefined {
if (!providerConfig || typeof providerConfig !== "object") {
return undefined;
@@ -19,6 +22,7 @@ function resolveProviderVoiceSetting(providerConfig: unknown): string | undefine
);
}
/** Resolve the active provider's preferred voice id/name from voice-call TTS config. */
export function resolvePreferredTtsVoice(config: { tts?: VoiceCallTtsConfig }): string | undefined {
const providerId = config.tts?.provider;
if (!providerId) {

View File

@@ -1,6 +1,9 @@
import os from "node:os";
import path from "node:path";
// Small path helpers shared by voice-call setup and runtime flows.
/** Resolve user input paths, including "~" against the current OS home. */
export function resolveUserPath(input: string): string {
const trimmed = input.trim();
if (!trimmed) {

View File

@@ -1,5 +1,8 @@
import { isBlockedHostnameOrIp } from "../api.js";
// Webhook exposure checks for providers that must reach local voice-call webhooks.
/** Minimal config needed to evaluate webhook exposure. */
type VoiceCallWebhookExposureConfig = {
provider?: string;
publicUrl?: string;
@@ -11,20 +14,24 @@ type VoiceCallWebhookExposureConfig = {
};
};
/** Result of checking whether webhooks are reachable for the selected provider. */
type VoiceCallWebhookExposureStatus = {
// Overall verdict of the exposure check.
ok: boolean;
// NOTE(review): presumably whether a public URL or tunnel is configured at all — confirm in resolveWebhookExposureStatus.
configured: boolean;
// Human-readable explanation for doctor/setup output.
message: string;
};
/**
 * Report whether the named provider needs a publicly reachable webhook.
 * Only the exact names "twilio", "telnyx", and "plivo" qualify; any other name,
 * or no name at all, yields false.
 */
export function providerRequiresPublicWebhook(providerName: string | undefined): boolean {
  switch (providerName) {
    case "twilio":
    case "telnyx":
    case "plivo":
      return true;
    default:
      return false;
  }
}
/**
 * Return true for localhost, private, or otherwise provider-unreachable hosts.
 * The decision is delegated entirely to isBlockedHostnameOrIp from the plugin API
 * barrel; the exact set of blocked hosts is defined there, not in this file.
 */
export function isLocalOnlyWebhookHost(hostname: string): boolean {
return isBlockedHostnameOrIp(hostname);
}
/** Return true when a webhook URL parses to a local/private host. */
export function isProviderUnreachableWebhookUrl(webhookUrl: string): boolean {
try {
const parsed = new URL(webhookUrl);
@@ -34,6 +41,7 @@ export function isProviderUnreachableWebhookUrl(webhookUrl: string): boolean {
}
}
/** Resolve a human-readable webhook exposure status for doctor/setup surfaces. */
export function resolveWebhookExposureStatus(
config: VoiceCallWebhookExposureConfig,
): VoiceCallWebhookExposureStatus {

View File

@@ -1,3 +1,5 @@
// Realtime telephony audio pacing and speech-start detection for mulaw streams.
// Telephony audio sample rate in Hz.
const TELEPHONY_SAMPLE_RATE = 8_000;
// Bytes per outgoing audio chunk. At 8 000 samples/s this equals 20 ms of audio,
// assuming one byte per mulaw sample — TODO confirm.
const TELEPHONY_CHUNK_BYTES = 160;
// Duration of one chunk in milliseconds.
const TELEPHONY_CHUNK_MS = 20;
@@ -12,6 +14,7 @@ for (let i = 0; i < MULAW_LINEAR_SAMPLES.length; i += 1) {
MULAW_LINEAR_SAMPLES[i] = decodeMulawSample(i);
}
/** Queue item sent over the realtime provider media stream. */
type RealtimeAudioQueueItem =
| {
chunk: Buffer;
@@ -23,14 +26,17 @@ type RealtimeAudioQueueItem =
type: "mark";
};
/**
 * WebSocket send callback for realtime audio frames.
 * Receives one serialized frame as a string.
 * NOTE(review): the boolean result presumably reports whether the frame was accepted
 * (false triggering backpressure handling) — confirm in RealtimeAudioPacer.pump.
 */
export type RealtimeAudioSend = (message: string) => boolean;
/** Provider-specific serializer for media, clear, and mark frames. */
export interface RealtimeAudioSerializer {
// Serialize one audio payload, already base64-encoded, into a wire message.
media(payloadBase64: string): string;
// Serialize a clear frame.
clear(): string;
// Serialize a mark frame carrying the given name.
mark(name: string): string;
}
/** Paces outgoing mulaw audio frames at telephony cadence. */
export class RealtimeAudioPacer {
private queue: RealtimeAudioQueueItem[] = [];
private timer: ReturnType<typeof setTimeout> | null = null;
@@ -46,6 +52,7 @@ export class RealtimeAudioPacer {
},
) {}
/** Queue mulaw audio and split it into 20ms-ish telephony chunks. */
sendAudio(muLaw: Buffer): void {
if (this.closed || muLaw.length === 0) {
return;
@@ -67,6 +74,7 @@ export class RealtimeAudioPacer {
this.ensurePump();
}
/** Queue a provider mark frame after prior audio frames. */
sendMark(name: string): void {
if (this.closed || !name) {
return;
@@ -75,6 +83,7 @@ export class RealtimeAudioPacer {
this.ensurePump();
}
/** Clear queued audio and notify the provider stream. */
clearAudio(): number {
if (this.closed) {
return 0;
@@ -87,6 +96,7 @@ export class RealtimeAudioPacer {
return clearedAudioBytes;
}
/** Stop sending and discard queued frames. */
close(): void {
this.closed = true;
this.clearTimer();
@@ -94,6 +104,7 @@ export class RealtimeAudioPacer {
this.queuedAudioBytes = 0;
}
/** Clear the scheduled pump timer. */
private clearTimer(): void {
if (!this.timer) {
return;
@@ -102,17 +113,20 @@ export class RealtimeAudioPacer {
this.timer = null;
}
/** Kick off the pump unless a timer is already scheduled. */
private ensurePump(): void {
  if (this.timer) {
    return;
  }
  this.pump();
}
/**
 * Close the pacer and notify the caller about queued-audio backpressure.
 * The pacer is closed first, so it is already marked closed when the optional
 * onBackpressure callback runs.
 */
private failBackpressure(): void {
this.close();
this.params.onBackpressure?.();
}
/** Send one queued item and schedule the next send based on audio duration. */
private pump(): void {
this.timer = null;
if (this.closed) {
@@ -144,6 +158,7 @@ export class RealtimeAudioPacer {
}
}
/** Calculate normalized RMS from mulaw bytes. */
export function calculateMulawRms(muLaw: Buffer): number {
if (muLaw.length === 0) {
return 0;
@@ -156,6 +171,7 @@ export function calculateMulawRms(muLaw: Buffer): number {
return Math.sqrt(sum / muLaw.length);
}
/** Detect likely speech start from consecutive loud mulaw chunks. */
export class RealtimeMulawSpeechStartDetector {
private loudChunks = 0;
private quietChunks = DEFAULT_REQUIRED_QUIET_CHUNKS;
@@ -169,6 +185,7 @@ export class RealtimeMulawSpeechStartDetector {
} = {},
) {}
/** Accept one mulaw chunk and return true only on transition into speaking. */
accept(muLaw: Buffer): boolean {
const rms = calculateMulawRms(muLaw);
const threshold = this.params.rmsThreshold ?? DEFAULT_SPEECH_RMS_THRESHOLD;
@@ -193,6 +210,7 @@ export class RealtimeMulawSpeechStartDetector {
}
}
/** Decode one G.711 mulaw byte to a linear PCM sample. */
function decodeMulawSample(value: number): number {
const muLaw = ~value & 0xff;
const sign = muLaw & 0x80;

View File

@@ -1,8 +1,11 @@
import type { CallManager } from "../manager.js";
import { TerminalStates } from "../types.js";
// Background cleanup loop for calls that never reached answered/terminal state.
// 30 seconds, in milliseconds.
// NOTE(review): presumably the interval between stale-call scans — confirm in startStaleCallReaper.
const CHECK_INTERVAL_MS = 30_000;
/** Start a stale-call reaper and return its cleanup callback. */
export function startStaleCallReaper(params: {
manager: CallManager;
staleCallReaperSeconds?: number;
@@ -20,6 +23,7 @@ export function startStaleCallReaper(params: {
continue;
}
// Unanswered provider calls can be stranded when callbacks are missed; end them explicitly.
const age = now - call.startedAt;
if (age > maxAgeMs) {
console.log(

View File

@@ -1,3 +1,6 @@
// Provider-specific media stream frame parsing and serialization.
/** Normalized inbound media stream frame. */
export type StreamFrame =
| { kind: "start"; streamId: string; providerCallId: string }
| {
@@ -11,6 +14,7 @@ export type StreamFrame =
| { kind: "error"; code?: string; title?: string; detail?: string }
| { kind: "ignored" };
/** Adapter contract for provider media stream wire formats. */
export interface StreamFrameAdapter {
readonly providerName: "twilio" | "telnyx";
parseInbound(rawMessage: string): StreamFrame;
@@ -19,6 +23,7 @@ export interface StreamFrameAdapter {
serializeMark(name: string): string;
}
/** Parse numeric timestamps sent as numbers or integer strings. */
function parseTimestampMs(value: unknown): number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
@@ -30,6 +35,7 @@ function parseTimestampMs(value: unknown): number | undefined {
return undefined;
}
/** Parse a JSON object frame, returning null for invalid or non-object payloads. */
function tryParseJson(rawMessage: string): Record<string, unknown> | null {
try {
const parsed = JSON.parse(rawMessage) as unknown;
@@ -42,6 +48,7 @@ function tryParseJson(rawMessage: string): Record<string, unknown> | null {
return null;
}
/** Read an object-valued field from a parsed frame. */
function readRecordField(
record: Record<string, unknown>,
field: string,
@@ -52,15 +59,18 @@ function readRecordField(
: undefined;
}
/**
 * Canonicalize a base64 or base64url string so two encodings can be compared:
 * trailing "=" padding is removed, and the base64url characters "-" and "_" are
 * mapped to their standard base64 counterparts "+" and "/".
 */
function normalizeBase64ForCompare(value: string): string {
  const withoutPadding = value.replace(/=+$/u, "");
  const plusRestored = withoutPadding.replace(/-/gu, "+");
  return plusRestored.replace(/_/gu, "/");
}
/**
 * Check that a payload survives a base64 decode/encode round trip.
 * Both the re-encoded bytes and the input are passed through
 * normalizeBase64ForCompare before the comparison.
 */
function isValidBase64Payload(value: string): boolean {
  const reencoded = Buffer.from(value, "base64").toString("base64");
  const expected = normalizeBase64ForCompare(value);
  return normalizeBase64ForCompare(reencoded) === expected;
}
/** Parse a common provider media frame. */
function parseMediaFrame(msg: Record<string, unknown>): StreamFrame {
const mediaData = readRecordField(msg, "media");
const payload = typeof mediaData?.payload === "string" ? mediaData.payload : undefined;
@@ -75,6 +85,7 @@ function parseMediaFrame(msg: Record<string, unknown>): StreamFrame {
};
}
/** Parse a common provider mark frame. */
function parseMarkFrame(msg: Record<string, unknown>): StreamFrame {
const markData = readRecordField(msg, "mark");
const name = typeof markData?.name === "string" ? markData.name : undefined;
@@ -87,6 +98,7 @@ type ProviderExtraFrameParser = (
msg: Record<string, unknown>,
) => StreamFrame | undefined;
/** Parse common media, mark, and stop frames shared by supported providers. */
function parseCommonInboundFrame(
event: unknown,
msg: Record<string, unknown>,
@@ -103,6 +115,7 @@ function parseCommonInboundFrame(
return undefined;
}
/** Parse one provider frame with provider-specific start/error hooks. */
function parseProviderInboundFrame(
rawMessage: string,
parseStartFrame: ProviderStartFrameParser,
@@ -121,10 +134,12 @@ function parseProviderInboundFrame(
);
}
/**
 * Build the optional `streamSid` fragment that is spread into outbound frames.
 * Returns an empty object only when `streamSid` is undefined; any defined string,
 * including the empty string, is included as-is.
 */
function withOptionalStreamSid(streamSid: string | undefined): Partial<{ streamSid: string }> {
  if (streamSid !== undefined) {
    return { streamSid };
  }
  return {};
}
/** Serialize a provider media frame. */
function serializeMediaFrame(payloadBase64: string, streamSid?: string): string {
return JSON.stringify({
event: "media",
@@ -133,10 +148,12 @@ function serializeMediaFrame(payloadBase64: string, streamSid?: string): string
});
}
/**
 * Produce the JSON text of a "clear" event frame.
 * A `streamSid` property is added only when one is supplied.
 */
function serializeClearFrame(streamSid?: string): string {
  const frame = { event: "clear", ...withOptionalStreamSid(streamSid) };
  return JSON.stringify(frame);
}
/** Serialize a provider mark frame. */
function serializeMarkFrame(name: string, streamSid?: string): string {
return JSON.stringify({
event: "mark",
@@ -145,10 +162,12 @@ function serializeMarkFrame(name: string, streamSid?: string): string {
});
}
/** Twilio media stream adapter, retaining streamSid for outbound frames. */
export class TwilioStreamFrameAdapter implements StreamFrameAdapter {
readonly providerName = "twilio" as const;
private streamSid = "";
/** Parse one Twilio websocket message into a normalized frame. */
parseInbound(rawMessage: string): StreamFrame {
return parseProviderInboundFrame(rawMessage, (msg) => {
const startData = readRecordField(msg, "start");
@@ -162,22 +181,27 @@ export class TwilioStreamFrameAdapter implements StreamFrameAdapter {
});
}
/**
 * Serialize Twilio media with the active streamSid.
 * Delegates to serializeMediaFrame, passing this adapter's stored streamSid.
 */
serializeMedia(payloadBase64: string): string {
return serializeMediaFrame(payloadBase64, this.streamSid);
}
/**
 * Serialize Twilio clear with the active streamSid.
 * Delegates to serializeClearFrame, passing this adapter's stored streamSid.
 */
serializeClear(): string {
return serializeClearFrame(this.streamSid);
}
/**
 * Serialize Twilio mark with the active streamSid.
 * Delegates to serializeMarkFrame, passing the mark name and this adapter's stored streamSid.
 */
serializeMark(name: string): string {
return serializeMarkFrame(name, this.streamSid);
}
}
/** Telnyx media stream adapter. */
export class TelnyxStreamFrameAdapter implements StreamFrameAdapter {
readonly providerName = "telnyx" as const;
/** Parse one Telnyx websocket message into a normalized frame. */
parseInbound(rawMessage: string): StreamFrame {
return parseProviderInboundFrame(
rawMessage,
@@ -216,14 +240,17 @@ export class TelnyxStreamFrameAdapter implements StreamFrameAdapter {
);
}
/**
 * Serialize Telnyx media.
 * Delegates to serializeMediaFrame without a streamSid argument.
 */
serializeMedia(payloadBase64: string): string {
return serializeMediaFrame(payloadBase64);
}
/**
 * Serialize Telnyx clear.
 * Delegates to serializeClearFrame without a streamSid argument.
 */
serializeClear(): string {
return serializeClearFrame();
}
/**
 * Serialize Telnyx mark.
 * Delegates to serializeMarkFrame with the mark name and no streamSid argument.
 */
serializeMark(name: string): string {
return serializeMarkFrame(name);
}