refactor: share Google Meet audio input loop

This commit is contained in:
Vincent Koc
2026-05-30 06:25:36 +02:00
parent 7b3104fe4c
commit d13c8b03c9

View File

@@ -81,6 +81,85 @@ function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | un
return trimmed;
}
function startGoogleMeetNodeAudioInputLoop(params: {
runtime: PluginRuntime;
nodeId: string;
bridgeId: string;
logger: RuntimeLogger;
logPrefix: string;
isStopped: () => boolean;
stop: () => Promise<void>;
isInputSuppressed: () => boolean;
onAudio: (audio: Buffer) => void;
}) {
let lastInputAt: string | undefined;
let lastInputBytes = 0;
let suppressedInputBytes = 0;
let lastSuppressedInputAt: string | undefined;
let consecutiveInputErrors = 0;
let lastInputError: string | undefined;
void (async () => {
for (;;) {
if (params.isStopped()) {
break;
}
try {
const raw = await params.runtime.nodes.invoke({
nodeId: params.nodeId,
command: "googlemeet.chrome",
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
timeoutMs: 2_000,
});
const result = asRecord(asRecord(raw).payload ?? raw);
consecutiveInputErrors = 0;
lastInputError = undefined;
const base64 = readString(result.base64);
if (base64) {
const audio = Buffer.from(base64, "base64");
if (params.isInputSuppressed()) {
lastSuppressedInputAt = new Date().toISOString();
suppressedInputBytes += audio.byteLength;
continue;
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
params.onAudio(audio);
}
if (result.closed === true) {
await params.stop();
}
} catch (error) {
if (!params.isStopped()) {
const message = formatErrorMessage(error);
consecutiveInputErrors += 1;
lastInputError = message;
params.logger.warn(
`[google-meet] ${params.logPrefix} audio input failed (${consecutiveInputErrors}/5): ${message}`,
);
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
await params.stop();
} else {
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
}
}
})();
return {
getHealth: () => ({
audioInputActive: lastInputBytes > 0,
lastInputAt,
lastSuppressedInputAt,
lastInputBytes,
suppressedInputBytes,
consecutiveInputErrors,
lastInputError,
}),
};
}
export async function startNodeAgentAudioBridge(params: {
config: GoogleMeetConfig;
fullConfig: OpenClawConfig;
@@ -95,16 +174,10 @@ export async function startNodeAgentAudioBridge(params: {
let stopped = false;
let sttSession: RealtimeTranscriptionSession | null = null;
let realtimeReady = false;
let lastInputAt: string | undefined;
let lastOutputAt: string | undefined;
let lastInputBytes = 0;
const outputActivity = createRealtimeVoiceOutputActivityTracker();
let suppressedInputBytes = 0;
let lastSuppressedInputAt: string | undefined;
let suppressInputUntil = 0;
let lastOutputPlayableUntilMs = 0;
let consecutiveInputErrors = 0;
let lastInputError: string | undefined;
const resolved = resolveGoogleMeetRealtimeTranscriptionProvider({
config: params.config,
fullConfig: params.fullConfig,
@@ -260,53 +333,19 @@ export async function startNodeAgentAudioBridge(params: {
await sttSession.connect();
realtimeReady = true;
void (async () => {
for (;;) {
if (stopped) {
break;
}
try {
const raw = await params.runtime.nodes.invoke({
nodeId: params.nodeId,
command: "googlemeet.chrome",
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
timeoutMs: 2_000,
});
const result = asRecord(asRecord(raw).payload ?? raw);
consecutiveInputErrors = 0;
lastInputError = undefined;
const base64 = readString(result.base64);
if (base64) {
const audio = Buffer.from(base64, "base64");
if (Date.now() < suppressInputUntil) {
lastSuppressedInputAt = new Date().toISOString();
suppressedInputBytes += audio.byteLength;
continue;
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config));
}
if (result.closed === true) {
await stop();
}
} catch (error) {
if (!stopped) {
const message = formatErrorMessage(error);
consecutiveInputErrors += 1;
lastInputError = message;
params.logger.warn(
`[google-meet] node agent audio input failed (${consecutiveInputErrors}/5): ${message}`,
);
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
await stop();
} else {
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
}
}
})();
const audioInputLoop = startGoogleMeetNodeAudioInputLoop({
runtime: params.runtime,
nodeId: params.nodeId,
bridgeId: params.bridgeId,
logger: params.logger,
logPrefix: "node agent",
isStopped: () => stopped,
stop,
isInputSuppressed: () => Date.now() < suppressInputUntil,
onAudio: (audio) => {
sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config));
},
});
return {
type: "node-command-pair",
@@ -317,17 +356,11 @@ export async function startNodeAgentAudioBridge(params: {
getHealth: () => ({
providerConnected: sttSession?.isConnected() ?? false,
realtimeReady,
audioInputActive: lastInputBytes > 0,
...audioInputLoop.getHealth(),
audioOutputActive: outputActivity.isActive(),
lastInputAt,
lastOutputAt,
lastSuppressedInputAt,
lastInputBytes,
lastOutputBytes: outputActivity.snapshot().sinkAudioBytes,
suppressedInputBytes,
...getGoogleMeetRealtimeTranscriptHealth(transcript),
consecutiveInputErrors,
lastInputError,
bridgeClosed: stopped,
}),
stop,
@@ -348,17 +381,11 @@ export async function startNodeRealtimeAudioBridge(params: {
let stopped = false;
let bridge: RealtimeVoiceBridgeSession | null = null;
let realtimeReady = false;
let lastInputAt: string | undefined;
let lastOutputAt: string | undefined;
let lastClearAt: string | undefined;
let lastInputBytes = 0;
const outputActivity = createRealtimeVoiceOutputActivityTracker();
let suppressedInputBytes = 0;
let lastSuppressedInputAt: string | undefined;
let suppressInputUntil = 0;
let lastOutputPlayableUntilMs = 0;
let consecutiveInputErrors = 0;
let lastInputError: string | undefined;
let clearCount = 0;
const resolved = resolveGoogleMeetRealtimeProvider({
config: params.config,
@@ -695,58 +722,24 @@ export async function startNodeRealtimeAudioBridge(params: {
await bridge.connect();
void (async () => {
for (;;) {
if (stopped) {
break;
}
try {
const raw = await params.runtime.nodes.invoke({
nodeId: params.nodeId,
command: "googlemeet.chrome",
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
timeoutMs: 2_000,
});
const result = asRecord(asRecord(raw).payload ?? raw);
consecutiveInputErrors = 0;
lastInputError = undefined;
const base64 = readString(result.base64);
if (base64) {
const audio = Buffer.from(base64, "base64");
if (Date.now() < suppressInputUntil) {
lastSuppressedInputAt = new Date().toISOString();
suppressedInputBytes += audio.byteLength;
continue;
}
lastInputAt = new Date().toISOString();
lastInputBytes += audio.byteLength;
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { byteLength: audio.byteLength },
});
bridge?.sendAudio(audio);
}
if (result.closed === true) {
await stop();
}
} catch (error) {
if (!stopped) {
const message = formatErrorMessage(error);
consecutiveInputErrors += 1;
lastInputError = message;
params.logger.warn(
`[google-meet] node audio input failed (${consecutiveInputErrors}/5): ${message}`,
);
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
await stop();
} else {
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
}
}
})();
const audioInputLoop = startGoogleMeetNodeAudioInputLoop({
runtime: params.runtime,
nodeId: params.nodeId,
bridgeId: params.bridgeId,
logger: params.logger,
logPrefix: "node",
isStopped: () => stopped,
stop,
isInputSuppressed: () => Date.now() < suppressInputUntil,
onAudio: (audio) => {
emitTalkEvent({
type: "input.audio.delta",
turnId: ensureTalkTurn(),
payload: { byteLength: audio.byteLength },
});
bridge?.sendAudio(audio);
},
});
return {
type: "node-command-pair",
@@ -759,20 +752,14 @@ export async function startNodeRealtimeAudioBridge(params: {
getHealth: () => ({
providerConnected: bridge?.bridge.isConnected() ?? false,
realtimeReady,
audioInputActive: lastInputBytes > 0,
...audioInputLoop.getHealth(),
audioOutputActive: outputActivity.isActive(),
lastInputAt,
lastOutputAt,
lastSuppressedInputAt,
lastClearAt,
lastInputBytes,
lastOutputBytes: outputActivity.snapshot().sinkAudioBytes,
suppressedInputBytes,
...getGoogleMeetRealtimeTranscriptHealth(transcript),
...getGoogleMeetRealtimeEventHealth(realtimeEvents),
recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents),
consecutiveInputErrors,
lastInputError,
clearCount,
bridgeClosed: stopped,
}),