feat(matrix): handle voice preflight and threads (#90415)

* feat(matrix): handle voice preflight and threads

Co-authored-by: Frank Dierolf <frank_dierolf@web.de>
Co-authored-by: marc.wilson <marcwilson@gazasrv15i5.globaladvisors.biz>

* test(matrix): satisfy ci guards

* fix(matrix): preserve thread relations on edits

* chore: annotate deprecated compatibility aliases

* fix(matrix): include poll thread roots in reads

* test(matrix): enable audio preflight qa config

* test(matrix): make voice preflight QA mention deterministic

---------

Co-authored-by: Frank Dierolf <frank_dierolf@web.de>
Co-authored-by: marc.wilson <marcwilson@gazasrv15i5.globaladvisors.biz>
This commit is contained in:
Peter Steinberger
2026-06-05 08:49:35 -07:00
committed by GitHub
parent c85b0ee3db
commit 2514980118
32 changed files with 2712 additions and 128 deletions

View File

@@ -232,6 +232,20 @@ Notes:
- Tool-progress preview updates are enabled by default when Matrix preview streaming is active. Set `streaming.preview.toolProgress: false` to keep preview edits for answer text but leave tool progress on the normal delivery path.
- Preview edits cost extra Matrix API calls. Leave `streaming: "off"` if you want the most conservative rate-limit profile.
## Voice messages
Inbound Matrix voice notes are transcribed before the room mention gate. This lets a voice note that says the bot name trigger the agent in a `requireMention: true` room, and it gives the agent the transcript instead of only an audio attachment placeholder.
Matrix uses the shared audio media provider configured under `tools.media.audio`, such as OpenAI `gpt-4o-mini-transcribe`. See [Media tools overview](/tools/media-overview) for provider setup and limits.
Behavior details:
- `m.audio` events and `m.file` events with an `audio/*` MIME type are eligible.
- In encrypted rooms, OpenClaw decrypts the attachment through the existing Matrix media path before transcription.
- The transcript is marked as machine-generated and untrusted in the agent prompt.
- The attachment is marked as already transcribed so downstream media tools do not transcribe the same voice note again.
- Set `tools.media.audio.enabled: false` to disable audio transcription globally.
## Approval metadata
Matrix native approval prompts are normal `m.room.message` events with OpenClaw-specific custom event content under `com.openclaw.approval`. Matrix permits custom event-content keys, so stock clients still render the text body while OpenClaw-aware clients can read the structured approval id, kind, state, available decisions, and exec/plugin details.

View File

@@ -56,7 +56,11 @@ export function classifyCiaoProcessError(reason: unknown): CiaoProcessErrorClass
return null;
}
/** Alternate export name for unhandled-rejection classification. */
/**
* Backward-compatible alias for unhandled-rejection classification.
*
* @deprecated Use classifyCiaoProcessError.
*/
export const classifyCiaoUnhandledRejection = classifyCiaoProcessError;
/** Return whether a ciao unhandled rejection is known and ignorable. */

View File

@@ -240,6 +240,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
limit,
before: readStringParam(params, "before"),
after: readStringParam(params, "after"),
threadId: readStringParam(params, "threadId"),
});
}

View File

@@ -64,6 +64,7 @@ function createMessagesClient(params: {
hydratedChunk?: Array<Record<string, unknown>>;
pollRoot?: Record<string, unknown>;
pollRelations?: Array<Record<string, unknown>>;
threadRelations?: Array<Record<string, unknown>>;
}) {
const doRequest = vi.fn(async () => ({
chunk: params.chunk,
@@ -72,11 +73,19 @@ function createMessagesClient(params: {
}));
const hydrateEvents = vi.fn(
async (_roomId: string, _events: Array<Record<string, unknown>>) =>
(params.hydratedChunk ?? params.chunk) as unknown,
(params.hydratedChunk ?? _events) as unknown,
);
const getEvent = vi.fn(async () => params.pollRoot ?? null);
const getRelations = vi.fn(async () => ({
events: params.pollRelations ?? [],
const getEvent = vi.fn(async (_roomId: string, eventId: string) => {
if (params.pollRoot?.event_id === eventId) {
return params.pollRoot;
}
return null;
});
const getRelations = vi.fn(async (_roomId: string, _eventId: string, relType: string) => ({
events:
relType === "m.thread"
? (params.threadRelations ?? params.pollRelations ?? [])
: (params.pollRelations ?? []),
nextBatch: null,
prevBatch: null,
}));
@@ -274,7 +283,7 @@ describe("matrix message actions", () => {
expect(result.messages).toHaveLength(1);
expectRecordFields(result.messages[0], { eventId: "$poll" });
expect(result.messages[0]?.body).toContain("[Poll]");
expect(getEvent).toHaveBeenCalledTimes(1);
expect(getEvent).toHaveBeenCalledTimes(2);
});
it("uses hydrated history events so encrypted poll entries can be read", async () => {
@@ -304,4 +313,330 @@ describe("matrix message actions", () => {
expect(result.messages).toHaveLength(1);
expect(result.messages[0]?.eventId).toBe("$poll");
});
it("filters Matrix thread events out of main-room reads", async () => {
const { client } = createMessagesClient({
chunk: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
},
},
{
event_id: "$main",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 10,
content: {
msgtype: "m.text",
body: "main room",
},
},
],
});
const result = await readMatrixMessages("room:!room:example.org", { client });
expect(result.messages.map((message) => message.eventId)).toEqual(["$main"]);
});
it("filters threaded poll roots out of main-room reads", async () => {
const threadedPollRoot = createPollStartEvent();
const threadedPollContent = threadedPollRoot.content as Record<string, unknown>;
threadedPollRoot.content = {
...threadedPollContent,
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
};
const { client, getEvent } = createMessagesClient({
chunk: [createPollResponseEvent()],
pollRoot: threadedPollRoot,
pollRelations: [createPollResponseEvent()],
});
const result = await readMatrixMessages("room:!room:example.org", { client });
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$poll");
expect(result.messages).toEqual([]);
});
it("uses the thread relations endpoint and includes the thread root once", async () => {
const { client, doRequest, getEvent, getRelations } = createMessagesClient({
chunk: [],
pollRelations: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
},
},
],
pollRoot: {
event_id: "$thread-root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 10,
content: {
msgtype: "m.text",
body: "thread root",
},
},
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$thread-root",
limit: 5,
});
expect(doRequest).not.toHaveBeenCalled();
expect(getRelations).toHaveBeenCalledWith(
"!room:example.org",
"$thread-root",
"m.thread",
undefined,
{ dir: "b", from: undefined, limit: 4 },
);
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$thread-root");
expect(result.messages.map((message) => message.eventId)).toEqual([
"$thread-root",
"$thread-reply",
]);
});
it("includes poll snapshots from threaded reads", async () => {
const { client, getEvent, getRelations } = createMessagesClient({
chunk: [],
pollRoot: createPollStartEvent({
includeDisclosedKind: true,
maxSelections: 1,
answers: [
{ id: "a1", "m.text": "Apple" },
{ id: "a2", "m.text": "Strawberry" },
],
}),
pollRelations: [createPollResponseEvent()],
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$thread-root",
limit: 5,
});
expect(getRelations).toHaveBeenCalledWith(
"!room:example.org",
"$thread-root",
"m.thread",
undefined,
{ dir: "b", from: undefined, limit: 5 },
);
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$poll");
expect(result.messages[0]?.body).toContain("1. Apple (1 vote)");
});
it("includes poll roots when reading the thread they start", async () => {
const { client, getEvent, getRelations } = createMessagesClient({
chunk: [],
pollRoot: createPollStartEvent({
includeDisclosedKind: true,
maxSelections: 1,
answers: [
{ id: "a1", "m.text": "Apple" },
{ id: "a2", "m.text": "Strawberry" },
],
}),
pollRelations: [createPollResponseEvent()],
threadRelations: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$poll" },
},
},
],
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$poll",
limit: 5,
});
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$poll");
expect(getRelations).toHaveBeenCalledWith(
"!room:example.org",
"$poll",
"m.reference",
undefined,
{
from: undefined,
},
);
expect(getRelations).toHaveBeenCalledWith("!room:example.org", "$poll", "m.thread", undefined, {
dir: "b",
from: undefined,
limit: 4,
});
expect(result.messages.map((message) => message.eventId)).toEqual(["$poll", "$thread-reply"]);
expect(result.messages[0]?.body).toContain("1. Apple (1 vote)");
});
it("does not summarize non-start poll events as thread roots", async () => {
const { client, getRelations } = createMessagesClient({
chunk: [],
pollRoot: createPollResponseEvent(),
threadRelations: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$vote" },
},
},
],
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$vote",
limit: 5,
});
expect(getRelations).toHaveBeenCalledWith("!room:example.org", "$vote", "m.thread", undefined, {
dir: "b",
from: undefined,
limit: 5,
});
expect(result.messages.map((message) => message.eventId)).toEqual(["$thread-reply"]);
});
it("counts the thread root toward the requested first-page limit", async () => {
const { client, doRequest, getEvent, getRelations } = createMessagesClient({
chunk: [],
pollRelations: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
},
},
],
pollRoot: {
event_id: "$thread-root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 10,
content: {
msgtype: "m.text",
body: "thread root",
},
},
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$thread-root",
limit: 1,
});
expect(getRelations).toHaveBeenCalledWith(
"!room:example.org",
"$thread-root",
"m.thread",
undefined,
{ dir: "b", from: undefined, limit: 1 },
);
expect(doRequest).not.toHaveBeenCalled();
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$thread-root");
expect(result.messages.map((message) => message.eventId)).toEqual(["$thread-root"]);
expect(result.nextBatch).toEqual(
expect.stringContaining("openclaw.matrix.thread-relations-start:"),
);
const next = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$thread-root",
limit: 1,
before: result.nextBatch ?? undefined,
});
expect(getRelations).toHaveBeenLastCalledWith(
"!room:example.org",
"$thread-root",
"m.thread",
undefined,
{ dir: "b", from: undefined, limit: 1 },
);
expect(next.messages.map((message) => message.eventId)).toEqual(["$thread-reply"]);
});
it("does not reserve first-page thread capacity for a redacted root", async () => {
const { client, doRequest, getEvent, getRelations } = createMessagesClient({
chunk: [],
pollRelations: [
{
event_id: "$thread-reply",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 20,
content: {
msgtype: "m.text",
body: "thread reply",
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
},
},
],
pollRoot: {
event_id: "$thread-root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: 10,
unsigned: { redacted_because: {} },
content: {},
},
});
const result = await readMatrixMessages("room:!room:example.org", {
client,
threadId: "$thread-root",
limit: 1,
});
expect(getRelations).toHaveBeenCalledWith(
"!room:example.org",
"$thread-root",
"m.thread",
undefined,
{ dir: "b", from: undefined, limit: 1 },
);
expect(doRequest).not.toHaveBeenCalled();
expect(getEvent).toHaveBeenCalledWith("!room:example.org", "$thread-root");
expect(result.messages.map((message) => message.eventId)).toEqual(["$thread-reply"]);
expect(result.nextBatch).toBeNull();
});
});

View File

@@ -1,7 +1,7 @@
// Matrix plugin module implements messages behavior.
import type { Direction } from "matrix-js-sdk/lib/models/event-timeline.js";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { fetchMatrixPollMessageSummary, resolveMatrixPollRootEventId } from "../poll-summary.js";
import { isPollEventType } from "../poll-types.js";
import { isPollEventType, isPollStartType } from "../poll-types.js";
import { editMessageMatrix, sendMessageMatrix } from "../send.js";
import { withResolvedRoomAction } from "./client.js";
import { resolveMatrixActionLimit } from "./limits.js";
@@ -13,6 +13,8 @@ import {
type MatrixRawEvent,
} from "./types.js";
const MATRIX_THREAD_RELATIONS_START_CURSOR_PREFIX = "openclaw.matrix.thread-relations-start:";
export async function sendMatrixMessage(
to: string,
content: string | undefined,
@@ -77,6 +79,7 @@ export async function readMatrixMessages(
limit?: number;
before?: string;
after?: string;
threadId?: string;
} = {},
): Promise<{
messages: MatrixMessageSummary[];
@@ -85,26 +88,67 @@ export async function readMatrixMessages(
}> {
return await withResolvedRoomAction(roomId, opts, async (client, resolvedRoom) => {
const limit = resolveMatrixActionLimit(opts.limit, 20);
const token = normalizeOptionalString(opts.before) ?? normalizeOptionalString(opts.after);
const rawBefore = normalizeOptionalString(opts.before);
const rawAfter = normalizeOptionalString(opts.after);
const dir = opts.after ? "f" : "b";
// Room history is queried via the low-level endpoint for compatibility.
const res = (await client.doRequest(
"GET",
`/_matrix/client/v3/rooms/${encodeURIComponent(resolvedRoom)}/messages`,
{
dir,
limit,
from: token,
},
)) as { chunk: MatrixRawEvent[]; start?: string; end?: string };
const hydratedChunk = await client.hydrateEvents(resolvedRoom, res.chunk);
const threadId = normalizeOptionalString(opts.threadId);
const isThreadRelationsStartCursor = threadId
? isMatrixThreadRelationsStartCursor(rawBefore, threadId)
: false;
const token = isThreadRelationsStartCursor ? undefined : (rawBefore ?? rawAfter);
const includeThreadRoot = threadId !== undefined && !token && !isThreadRelationsStartCursor;
const threadRootSummary =
includeThreadRoot && threadId
? await fetchDisplayableThreadRootSummary(client, resolvedRoom, threadId)
: undefined;
const rootCountsTowardLimit = threadRootSummary !== undefined;
const rootFillsThreadPage = rootCountsTowardLimit && limit === 1;
const relationLimit = rootCountsTowardLimit ? Math.max(limit - 1, 1) : limit;
const seenPollRoots = new Set<string>();
const threadRootEventId = normalizeOptionalString(threadRootSummary?.eventId);
if (threadRootEventId) {
seenPollRoots.add(threadRootEventId);
}
const relationPage =
threadId && relationLimit > 0
? await client.getRelations(resolvedRoom, threadId, "m.thread", undefined, {
dir: dir as Direction,
from: token,
limit: relationLimit,
})
: null;
// Flat room history uses the low-level endpoint for compatibility; threaded reads use
// the SDK relations helper so encrypted rooms get the SDK's event-type translation.
const flatPage = threadId
? null
: ((await client.doRequest(
"GET",
`/_matrix/client/v3/rooms/${encodeURIComponent(resolvedRoom)}/messages`,
{
dir,
limit,
from: token,
},
)) as { chunk: MatrixRawEvent[]; start?: string; end?: string });
const hydratedChunk = await client.hydrateEvents(
resolvedRoom,
relationPage ? (rootFillsThreadPage ? [] : relationPage.events) : (flatPage?.chunk ?? []),
);
const messages: MatrixMessageSummary[] = [];
if (threadRootSummary) {
messages.push(threadRootSummary);
}
for (const event of hydratedChunk) {
if (event.unsigned?.redacted_because) {
continue;
}
if (!threadId && isMatrixThreadEvent(event)) {
continue;
}
if (event.type === EventType.RoomMessage) {
if (threadId && event.event_id === threadId) {
continue;
}
messages.push(summarizeMatrixRawEvent(event));
continue;
}
@@ -115,16 +159,103 @@ export async function readMatrixMessages(
if (!pollRootId || seenPollRoots.has(pollRootId)) {
continue;
}
if (
!threadId &&
(await isMatrixPollRootThreaded({
client,
event,
pollRootId,
resolvedRoom,
}))
) {
continue;
}
seenPollRoots.add(pollRootId);
const pollSummary = await fetchMatrixPollMessageSummary(client, resolvedRoom, event);
if (pollSummary) {
messages.push(pollSummary);
}
}
const nextBatch =
rootFillsThreadPage && threadId && relationPage?.events.length
? encodeMatrixThreadRelationsStartCursor(threadId)
: (relationPage?.nextBatch ?? flatPage?.end ?? null);
return {
messages,
nextBatch: res.end ?? null,
prevBatch: res.start ?? null,
nextBatch,
prevBatch: relationPage?.prevBatch ?? flatPage?.start ?? null,
};
});
}
function encodeMatrixThreadRelationsStartCursor(threadId: string): string {
const payload = Buffer.from(JSON.stringify({ v: 1, threadId }), "utf8").toString("base64url");
return `${MATRIX_THREAD_RELATIONS_START_CURSOR_PREFIX}${payload}`;
}
function isMatrixThreadRelationsStartCursor(raw: string | undefined, threadId: string): boolean {
if (!raw?.startsWith(MATRIX_THREAD_RELATIONS_START_CURSOR_PREFIX)) {
return false;
}
const encoded = raw.slice(MATRIX_THREAD_RELATIONS_START_CURSOR_PREFIX.length);
try {
const decoded = JSON.parse(Buffer.from(encoded, "base64url").toString("utf8")) as {
v?: unknown;
threadId?: unknown;
};
return decoded.v === 1 && decoded.threadId === threadId;
} catch {
return false;
}
}
async function fetchDisplayableThreadRootSummary(
client: MatrixActionClientOpts["client"] & NonNullable<MatrixActionClientOpts["client"]>,
resolvedRoom: string,
threadId: string,
): Promise<MatrixMessageSummary | undefined> {
const rawRootEvent = (await client
.getEvent(resolvedRoom, threadId)
.catch(() => null)) as MatrixRawEvent | null;
if (!rawRootEvent) {
return undefined;
}
const rootEvent = (await client.hydrateEvents(resolvedRoom, [rawRootEvent]))[0];
if (!rootEvent || rootEvent.unsigned?.redacted_because) {
return undefined;
}
if (rootEvent.type === EventType.RoomMessage) {
return summarizeMatrixRawEvent(rootEvent);
}
if (isPollStartType(rootEvent.type)) {
return (await fetchMatrixPollMessageSummary(client, resolvedRoom, rootEvent)) ?? undefined;
}
return undefined;
}
function isMatrixThreadEvent(event: MatrixRawEvent): boolean {
const relates = event.content?.["m.relates_to"];
if (!relates || typeof relates !== "object") {
return false;
}
return (relates as { rel_type?: unknown }).rel_type === "m.thread";
}
async function isMatrixPollRootThreaded(params: {
client: MatrixActionClientOpts["client"] & NonNullable<MatrixActionClientOpts["client"]>;
event: MatrixRawEvent;
pollRootId: string;
resolvedRoom: string;
}): Promise<boolean> {
if (isMatrixThreadEvent(params.event)) {
return true;
}
const rootEvent = (await params.client
.getEvent(params.resolvedRoom, params.pollRootId)
.catch(() => null)) as MatrixRawEvent | null;
if (!rootEvent) {
return false;
}
const hydratedRoot = (await params.client.hydrateEvents(params.resolvedRoom, [rootEvent]))[0];
return hydratedRoot ? isMatrixThreadEvent(hydratedRoot) : false;
}

View File

@@ -36,7 +36,7 @@ function formatMatrixAttachmentMarker(params: {
return params.unavailable ? `[matrix ${label} unavailable]` : `[matrix ${label}]`;
}
function isLikelyBareFilename(text: string): boolean {
export function isLikelyBareFilename(text: string): boolean {
const trimmed = text.trim();
if (!trimmed || trimmed.includes("\n") || /\s/.test(trimmed)) {
return false;

View File

@@ -0,0 +1,497 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { installMatrixMonitorTestRuntime } from "../../test-runtime.js";
import { MatrixMediaSizeLimitError } from "../media-errors.js";
import {
createMatrixHandlerTestHarness,
createMatrixRoomMessageEvent,
} from "./handler.test-helpers.js";
const { downloadMatrixMediaMock, sendDurableMessageBatchMock, transcribeFirstAudioMock } =
vi.hoisted(() => ({
downloadMatrixMediaMock: vi.fn(),
sendDurableMessageBatchMock: vi.fn(),
transcribeFirstAudioMock: vi.fn(),
}));
vi.mock("./media.js", async () => {
const actual = await vi.importActual<typeof import("./media.js")>("./media.js");
return {
...actual,
downloadMatrixMedia: (...args: unknown[]) => downloadMatrixMediaMock(...args),
};
});
vi.mock("./preflight-audio.runtime.js", () => ({
sendDurableMessageBatch: sendDurableMessageBatchMock,
transcribeFirstAudio: transcribeFirstAudioMock,
}));
function createAudioPreflightHarness(
overrides: Parameters<typeof createMatrixHandlerTestHarness>[0] = {},
) {
return createMatrixHandlerTestHarness({
isDirectMessage: true,
shouldHandleTextCommands: () => true,
resolveMarkdownTableMode: () => "code",
resolveAgentRoute: () => ({
agentId: "main",
accountId: "ops",
sessionKey: "agent:main:matrix:channel:!room:example.org",
mainSessionKey: "agent:main:main",
channel: "matrix",
matchedBy: "binding.account",
}),
resolveStorePath: () => "/tmp/openclaw-test-session.json",
readSessionUpdatedAt: () => 123,
getRoomInfo: async () => ({
name: "Audio Room",
canonicalAlias: "#audio:example.org",
altAliases: [],
}),
getMemberDisplayName: async () => "Frank",
startupMs: Date.now() - 120_000,
startupGraceMs: 60_000,
textLimit: 4000,
mediaMaxBytes: 5 * 1024 * 1024,
replyToMode: "first",
...overrides,
});
}
function createAudioEvent(content: Record<string, unknown>) {
return createMatrixRoomMessageEvent({
eventId: "$audio1",
sender: "@frank:matrix.example.org",
content: content as never,
});
}
function expectLatestInboundContext(
recordInboundSession: ReturnType<typeof createMatrixHandlerTestHarness>["recordInboundSession"],
) {
const call = vi.mocked(recordInboundSession).mock.calls.at(-1)?.[0] as
| { ctx?: Record<string, unknown> }
| undefined;
if (!call?.ctx) {
throw new Error("expected inbound session context");
}
return call.ctx;
}
describe("createMatrixRoomMessageHandler audio preflight", () => {
beforeEach(() => {
downloadMatrixMediaMock.mockReset();
sendDurableMessageBatchMock.mockReset();
transcribeFirstAudioMock.mockReset();
installMatrixMonitorTestRuntime();
});
it("transcribes inbound voice notes in DMs and surfaces the transcript as the agent body", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("hello bot");
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(transcribeFirstAudioMock).toHaveBeenCalledWith(
expect.objectContaining({
ctx: expect.objectContaining({
MediaPaths: ["/tmp/inbound/voice.ogg"],
MediaTypes: ["audio/ogg"],
Provider: "matrix",
Surface: "matrix",
OriginatingChannel: "matrix",
OriginatingTo: "room:!room:example.org",
AccountId: "ops",
ChatType: "direct",
SessionKey: "agent:main:matrix:channel:!room:example.org",
}),
}),
);
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent: '[Audio transcript (machine-generated, untrusted)]: "hello bot"',
MediaTranscribedIndexes: [0],
MediaPath: "/tmp/inbound/voice.ogg",
MediaType: "audio/ogg",
});
});
it("lets transcript-mentioned voice notes pass the requireMention room gate", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("bot can you check this");
const { handler, recordInboundSession } = createAudioPreflightHarness({
isDirectMessage: false,
mentionRegexes: [/\bbot\b/i],
roomsConfig: {
"!room:example.org": { requireMention: true } as never,
},
});
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1);
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent: expect.stringContaining("bot can you check this"),
WasMentioned: true,
});
});
it("keeps non-filename audio fallback text while still surfacing the transcript", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("hello bot from fallback audio");
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "Voice message",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent:
'Voice message\n[Audio transcript (machine-generated, untrusted)]: "hello bot from fallback audio"',
MediaTranscribedIndexes: [0],
});
});
it("echoes accepted preflight transcripts after the mention gate", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
sendDurableMessageBatchMock.mockResolvedValue({ status: "sent", results: [] });
transcribeFirstAudioMock.mockResolvedValue("hello bot");
const { handler } = createAudioPreflightHarness({
cfg: {
channels: { matrix: { dm: { allowFrom: ["*"] } } },
tools: { media: { audio: { enabled: true, echoTranscript: true } } },
},
});
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(sendDurableMessageBatchMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
to: "room:!room:example.org",
accountId: "ops",
payloads: [{ text: '📝 "hello bot"' }],
bestEffort: true,
durability: "best_effort",
}),
);
});
it("drops transcript-unmentioned voice notes in requireMention rooms", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("hello world");
const { handler, recordInboundSession } = createAudioPreflightHarness({
isDirectMessage: false,
historyLimit: 5,
mentionRegexes: [/\bbot\b/i],
roomsConfig: {
"!room:example.org": { requireMention: true } as never,
},
});
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(transcribeFirstAudioMock).toHaveBeenCalledTimes(1);
expect(recordInboundSession).not.toHaveBeenCalled();
await handler(
"!room:example.org",
createMatrixRoomMessageEvent({
eventId: "$text-after-unmentioned-audio",
sender: "@frank:matrix.example.org",
content: { msgtype: "m.text", body: "bot what did I say before?" },
}),
);
const followUpContext = expectLatestInboundContext(recordInboundSession);
const history = followUpContext.InboundHistory as Array<{ body?: string }> | undefined;
expect(history?.map((entryValue) => entryValue.body)).toContain(
'[Audio transcript (machine-generated, untrusted)]: "hello world"',
);
});
it("does not preflight-download gated audio when audio transcription is disabled", async () => {
const { handler, recordInboundSession } = createAudioPreflightHarness({
cfg: {
channels: { matrix: { dm: { allowFrom: ["*"] } } },
tools: { media: { audio: { enabled: false } } },
},
isDirectMessage: false,
mentionRegexes: [/\bbot\b/i],
roomsConfig: {
"!room:example.org": { requireMention: true } as never,
},
});
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(downloadMatrixMediaMock).not.toHaveBeenCalled();
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
expect(recordInboundSession).not.toHaveBeenCalled();
});
it("does not hold the room history ingress queue during slow audio preflight", async () => {
let releaseDownload:
| ((media: { path: string; contentType: string; placeholder: string }) => void)
| undefined;
downloadMatrixMediaMock.mockReturnValue(
new Promise((resolve) => {
releaseDownload = resolve;
}),
);
transcribeFirstAudioMock.mockResolvedValue("bot voice request");
const { handler, recordInboundSession } = createAudioPreflightHarness({
isDirectMessage: false,
historyLimit: 5,
mentionRegexes: [/\bbot\b/i],
roomsConfig: {
"!room:example.org": { requireMention: true } as never,
},
});
const slowAudio = handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
await new Promise((resolve) => {
setTimeout(resolve, 0);
});
await handler(
"!room:example.org",
createMatrixRoomMessageEvent({
eventId: "$text-after-audio",
sender: "@frank:matrix.example.org",
content: { msgtype: "m.text", body: "bot text after audio" },
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
ctx: expect.objectContaining({ BodyForAgent: "bot text after audio" }),
}),
);
releaseDownload?.({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
await slowAudio;
const voiceCall = vi
.mocked(recordInboundSession)
.mock.calls.map((call) => call[0] as { ctx?: Record<string, unknown> })
.find((call) => {
const bodyForAgent = call.ctx?.BodyForAgent;
return typeof bodyForAgent === "string" && bodyForAgent.includes("bot voice request");
});
const voiceHistory = voiceCall?.ctx?.InboundHistory as Array<{ body?: string }> | undefined;
expect(voiceHistory?.map((entry) => entry.body) ?? []).not.toContain("bot text after audio");
});
it("keeps placeholder body when transcription fails", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockRejectedValue(new Error("STT down"));
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent: "[matrix audio attachment]",
MediaPath: "/tmp/inbound/voice.ogg",
});
expect(
expectLatestInboundContext(recordInboundSession).MediaTranscribedIndexes,
).toBeUndefined();
});
it("does not invoke audio preflight for non-audio media", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/photo.jpg",
contentType: "image/jpeg",
placeholder: "[matrix image attachment]",
});
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.image",
body: "photo.jpg",
url: "mxc://example/photo",
info: { mimetype: "image/jpeg", size: 12345 },
}),
);
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
expect(recordInboundSession).toHaveBeenCalled();
});
it("transcribes encrypted voice notes after existing Matrix media decryption", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/encrypted-voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("encrypted hello");
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
file: {
url: "mxc://example/encrypted-voice",
key: { kty: "oct", key_ops: ["encrypt"], alg: "A256CTR", k: "secret", ext: true },
iv: "iv",
hashes: { sha256: "hash" },
v: "v2",
},
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(downloadMatrixMediaMock).toHaveBeenCalledWith(
expect.objectContaining({
mxcUrl: "mxc://example/encrypted-voice",
file: expect.objectContaining({
url: "mxc://example/encrypted-voice",
key: expect.objectContaining({ alg: "A256CTR" }),
}),
}),
);
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent: '[Audio transcript (machine-generated, untrusted)]: "encrypted hello"',
MediaTranscribedIndexes: [0],
MediaPath: "/tmp/inbound/encrypted-voice.ogg",
});
});
it("preserves the too-large placeholder when audio download exceeds the size limit", async () => {
downloadMatrixMediaMock.mockRejectedValue(new MatrixMediaSizeLimitError());
const { handler, recordInboundSession } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "big-voice.ogg",
url: "mxc://example/big-voice",
info: { mimetype: "audio/ogg", size: 10 * 1024 * 1024 },
}),
);
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
expect(expectLatestInboundContext(recordInboundSession)).toMatchObject({
BodyForAgent: "[matrix audio attachment too large]",
});
expect(expectLatestInboundContext(recordInboundSession).MediaPath).toBeUndefined();
});
it("downloads audio only once across preflight and normal media handling", async () => {
downloadMatrixMediaMock.mockResolvedValue({
path: "/tmp/inbound/voice.ogg",
contentType: "audio/ogg",
placeholder: "[matrix audio attachment]",
});
transcribeFirstAudioMock.mockResolvedValue("hello bot");
const { handler } = createAudioPreflightHarness();
await handler(
"!room:example.org",
createAudioEvent({
msgtype: "m.audio",
body: "voice.ogg",
url: "mxc://example/voice",
info: { mimetype: "audio/ogg", size: 12345 },
}),
);
expect(downloadMatrixMediaMock).toHaveBeenCalledTimes(1);
});
});

View File

@@ -192,6 +192,72 @@ describe("matrix group chat history — scenario 1: basic accumulation", () => {
expect(history[0]?.body).toContain("msg A");
});
it('keeps threaded messages in parent history when threadReplies is "off"', async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
threadReplies: "off",
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
await handler(
DEFAULT_ROOM,
createMatrixRoomMessageEvent({
eventId: "$thread-plain",
content: {
msgtype: "m.text",
body: "thread plain",
"m.relates_to": { rel_type: "m.thread", event_id: "$thread-root" },
},
}),
);
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$main-trigger", body: "main trigger", ts: 2000 }),
);
expectSomeBodyContaining(inboundHistoryBodies(finalizeInboundContext, 0), "thread plain");
});
it('keeps top-level room history flat when threadReplies is "always"', async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
threadReplies: "always",
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
await handler(
DEFAULT_ROOM,
makeRoomPlainEvent({ eventId: "$top-level-plain", body: "top-level plain", ts: 1000 }),
);
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$top-level-trigger", body: "main trigger", ts: 2000 }),
);
expectSomeBodyContaining(inboundHistoryBodies(finalizeInboundContext, 0), "top-level plain");
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$top-level-trigger-2", body: "main trigger 2", ts: 3000 }),
);
expectNoBodyContaining(inboundHistoryBodies(finalizeInboundContext, 1), "top-level plain");
});
it("multi-agent: each agent has an independent watermark", async () => {
let currentAgentId = "agent_a";
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);

View File

@@ -63,6 +63,7 @@ import {
formatMatrixMediaTooLargeText,
formatMatrixMediaUnavailableText,
formatMatrixMessageText,
isLikelyBareFilename,
resolveMatrixMessageAttachment,
resolveMatrixMessageBody,
} from "../media-text.js";
@@ -90,10 +91,16 @@ import type { MatrixInboundEventDeduper } from "./inbound-dedupe.js";
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
import { downloadMatrixMedia } from "./media.js";
import { resolveMentions, stripMatrixMentionPrefix } from "./mentions.js";
import {
formatMatrixAudioTranscript,
isMatrixAudioContent,
resolveMatrixPreflightAudioTranscript,
sendMatrixPreflightAudioTranscriptEcho,
} from "./preflight-audio.js";
import { deliverMatrixReplies } from "./replies.js";
import { createMatrixReplyContextResolver } from "./reply-context.js";
import { createRoomHistoryTracker } from "./room-history.js";
import type { HistoryEntry } from "./room-history.js";
import type { HistoryEntry, ReservedHistorySlot } from "./room-history.js";
import { resolveMatrixRoomConfig } from "./rooms.js";
import { resolveMatrixInboundRoute } from "./route.js";
import {
@@ -400,6 +407,46 @@ function resolveMatrixPendingHistoryText(params: {
);
}
function isMatrixAudioMediaEnabled(cfg: CoreConfig): boolean {
const tools = cfg.tools as
| {
media?: {
audio?: {
enabled?: boolean;
};
};
}
| undefined;
return tools?.media?.audio?.enabled !== false;
}
function shouldDeferMatrixAudioPreflightForRoomIngress(params: {
content: RoomMessageEventContent;
cfg: CoreConfig;
}): boolean {
if (!isMatrixAudioMediaEnabled(params.cfg)) {
return false;
}
const content = params.content;
const contentUrl = "url" in content && typeof content.url === "string" ? content.url : undefined;
const contentFile =
"file" in content && content.file && typeof content.file === "object"
? content.file
: undefined;
const mediaUrl = contentUrl ?? contentFile?.url;
const contentInfo =
"info" in content && content.info && typeof content.info === "object"
? (content.info as { mimetype?: string })
: undefined;
return (
mediaUrl?.startsWith("mxc://") === true &&
isMatrixAudioContent({
msgtype: typeof content.msgtype === "string" ? content.msgtype : undefined,
mimetype: contentInfo?.mimetype,
})
);
}
function resolveMatrixAllowBotsMode(value?: boolean | "mentions"): MatrixAllowBotsMode {
if (value === true) {
return "all";
@@ -679,17 +726,41 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
return { content, isDirectMessage, locationPayload, selfUserId };
};
const continueIngress = async (paramsLocal: {
audioPreflightMode?: "defer" | "run";
content: RoomMessageEventContent;
isDirectMessage: boolean;
locationPayload: MatrixLocationPayload | null;
reservedHistorySlot?: ReservedHistorySlot;
selfUserId: string;
}) => {
let content = paramsLocal.content;
const isDirectMessage = paramsLocal.isDirectMessage;
const isRoom = !isDirectMessage;
const { locationPayload, selfUserId } = paramsLocal;
if (isRoom && groupPolicy === "disabled") {
const { audioPreflightMode, locationPayload, reservedHistorySlot, selfUserId } =
paramsLocal;
const messageId = event.event_id ?? "";
const threadRootId = resolveMatrixThreadRootId({ event, content });
const thread = resolveMatrixThreadRouting({
isDirectMessage,
threadReplies,
dmThreadReplies,
messageId,
threadRootId,
});
const historyThreadId = threadRootId ? thread.threadId : undefined;
let reservedHistorySlotConsumed = false;
const discardReservedHistorySlot = () => {
if (reservedHistorySlot && !reservedHistorySlotConsumed) {
roomHistoryTracker.discardPending(roomId, reservedHistorySlot, historyThreadId);
reservedHistorySlotConsumed = true;
}
};
const commitInboundEventIfClaimedAndDiscardReserved = async () => {
discardReservedHistorySlot();
await commitInboundEventIfClaimed();
};
if (isRoom && groupPolicy === "disabled") {
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
@@ -722,7 +793,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
logVerboseMessage(
`matrix: drop configured bot sender=${senderId} (allowBots=false${isDirectMessage ? "" : `, ${roomMatchMeta}`})`,
);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const botLoopProtection: ChannelBotLoopProtectionFacts | undefined =
@@ -744,18 +815,18 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (isRoom && roomConfig && !roomConfigInfo?.allowed) {
logVerboseMessage(`matrix: room disabled room=${roomId} (${roomMatchMeta})`);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
if (isRoom && groupPolicy === "allowlist") {
if (!roomConfigInfo?.allowlistConfigured) {
logVerboseMessage(`matrix: drop room message (no allowlist, ${roomMatchMeta})`);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
if (!roomConfig) {
logVerboseMessage(`matrix: drop room message (not in allowlist, ${roomMatchMeta})`);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
}
@@ -811,7 +882,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (isDirectMessage) {
if (!dmEnabled || dmPolicy === "disabled") {
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const senderReason = messageIngress.senderAccess.reasonCode;
@@ -851,20 +922,21 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
await commitInboundEventIfClaimed();
} catch (err) {
logVerboseMessage(`matrix pairing reply failed for ${senderId}: ${String(err)}`);
discardReservedHistorySlot();
return undefined;
}
} else {
logVerboseMessage(
`matrix pairing reminder suppressed sender=${senderId} (cooldown)`,
);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
}
}
if (isReactionEvent || dmPolicy !== "pairing") {
logVerboseMessage(
`matrix: blocked ${isReactionEvent ? "reaction" : "dm"} sender ${senderId} (dmPolicy=${dmPolicy}, reason=${senderReason})`,
);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
}
return undefined;
}
@@ -874,7 +946,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
logVerboseMessage(
`matrix: blocked sender ${senderId} (ingress=${ingressDecision.reasonCode}, ${roomMatchMeta})`,
);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
if (isRoom) {
@@ -897,7 +969,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
isDirectMessage,
logVerboseMessage,
});
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
@@ -929,6 +1001,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? content.file
: undefined;
const mediaUrl = contentUrl ?? contentFile?.url;
const earlyContentInfo =
"info" in content && content.info && typeof content.info === "object"
? (content.info as { mimetype?: string; size?: number })
: undefined;
const earlyContentType = earlyContentInfo?.mimetype;
const earlyContentSize =
typeof earlyContentInfo?.size === "number" ? earlyContentInfo.size : undefined;
const earlyContentBody = typeof content.body === "string" ? content.body.trim() : "";
const earlyContentFilename =
typeof content.filename === "string" ? content.filename.trim() : "";
const earlyOriginalFilename = earlyContentFilename || earlyContentBody || undefined;
const pendingHistoryText = resolveMatrixPendingHistoryText({
mentionPrecheckText,
content,
@@ -939,19 +1022,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? (await getPollSnapshot())?.text
: "";
if (!mentionPrecheckText && !mediaUrl && !isPollEvent) {
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const messageId = event.event_id ?? "";
const threadRootId = resolveMatrixThreadRootId({ event, content });
const thread = resolveMatrixThreadRouting({
isDirectMessage,
threadReplies,
dmThreadReplies,
messageId,
threadRootId,
});
let preflightMedia: {
path: string;
contentType?: string;
placeholder: string;
} | null = null;
let preflightMediaDownloadFailed = false;
let preflightMediaSizeLimitExceeded = false;
let preflightAudioTranscript: string | undefined;
const {
route: _route,
configuredBinding: _configuredBinding,
@@ -968,6 +1051,81 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
resolveAgentRoute: core.channel.routing.resolveAgentRoute,
});
const hasExplicitSessionBinding = _configuredBinding !== null || _runtimeBindingId !== null;
const preflightAudioMediaUrl = mediaUrl?.startsWith("mxc://") ? mediaUrl : undefined;
const shouldRunMatrixAudioPreflight =
isMatrixAudioContent({
msgtype: typeof content.msgtype === "string" ? content.msgtype : undefined,
mimetype: earlyContentType,
}) &&
isMatrixAudioMediaEnabled(cfg) &&
preflightAudioMediaUrl !== undefined;
if (
shouldRunMatrixAudioPreflight &&
audioPreflightMode === "defer" &&
isRoom &&
historyLimit > 0 &&
!reservedHistorySlot
) {
const reserved = roomHistoryTracker.reservePending(
_route.agentId,
roomId,
{
sender: senderId,
body: pendingHistoryText,
timestamp: eventTs ?? undefined,
messageId,
},
historyThreadId,
);
return {
deferredPrefix: {
...paramsLocal,
audioPreflightMode: "run" as const,
reservedHistorySlot: reserved,
},
} as const;
}
if (shouldRunMatrixAudioPreflight) {
try {
preflightMedia = await downloadMatrixMedia({
client,
mxcUrl: preflightAudioMediaUrl,
contentType: earlyContentType,
sizeBytes: earlyContentSize,
maxBytes: mediaMaxBytes,
file: contentFile,
originalFilename: earlyOriginalFilename,
});
} catch (err) {
preflightMediaDownloadFailed = true;
if (isMatrixMediaSizeLimitError(err)) {
preflightMediaSizeLimitExceeded = true;
}
const errorText = formatMatrixErrorMessage(err);
logVerboseMessage(
`matrix: media download failed room=${roomId} id=${event.event_id ?? "unknown"} type=${content.msgtype} error=${errorText}`,
);
logger.warn("matrix media download failed", {
roomId,
eventId: event.event_id,
msgtype: content.msgtype,
encrypted: Boolean(contentFile),
error: errorText,
});
}
if (preflightMedia) {
preflightAudioTranscript = await resolveMatrixPreflightAudioTranscript({
mediaPath: preflightMedia.path,
mediaContentType: preflightMedia.contentType,
cfg,
accountId,
chatType: isDirectMessage ? "direct" : "channel",
originatingTo: `room:${roomId}`,
messageThreadId: thread.threadId,
sessionKey: _route.sessionKey,
});
}
}
const agentMentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, _route.agentId, {
provider: "matrix",
conversationId: roomId,
@@ -976,11 +1134,14 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const selfDisplayName = content.formatted_body
? await getMemberDisplayName(roomId, selfUserId).catch(() => undefined)
: undefined;
const mentionPrecheckTextWithTranscript = preflightAudioTranscript
? [mentionPrecheckText, preflightAudioTranscript].filter(Boolean).join("\n").trim()
: mentionPrecheckText;
const { wasMentioned, hasExplicitMention } = resolveMentions({
content,
userId: selfUserId,
displayName: selfDisplayName,
text: mentionPrecheckText,
text: mentionPrecheckTextWithTranscript,
mentionRegexes: agentMentionRegexes,
});
if (
@@ -992,7 +1153,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
logVerboseMessage(
`matrix: drop configured bot sender=${senderId} (allowBots=mentions, missing mention, ${roomMatchMeta})`,
);
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const allowTextCommands = core.channel.commands.shouldHandleTextCommands({
@@ -1025,7 +1186,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
reason: "control command (unauthorized)",
target: senderId,
});
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const shouldRequireMention = isRoom
@@ -1047,7 +1208,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
hasControlCommandInMessage;
const canDetectMention = agentMentionRegexes.length > 0 || hasExplicitMention;
if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) {
const pendingHistoryBody = pendingHistoryText || pendingHistoryPollText;
const pendingHistoryBody = preflightAudioTranscript
? formatMatrixAudioTranscript(preflightAudioTranscript)
: pendingHistoryText || pendingHistoryPollText;
if (historyLimit > 0 && pendingHistoryBody) {
const pendingEntry: HistoryEntry = {
sender: senderId,
@@ -1055,16 +1218,36 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
timestamp: eventTs ?? undefined,
messageId,
};
roomHistoryTracker.recordPending(roomId, pendingEntry);
if (reservedHistorySlot) {
roomHistoryTracker.finalizePending(
roomId,
reservedHistorySlot,
pendingEntry,
historyThreadId,
);
reservedHistorySlotConsumed = true;
} else {
roomHistoryTracker.recordPending(roomId, pendingEntry, historyThreadId);
}
}
logger.info("skipping room message", { roomId, reason: "no-mention" });
await commitInboundEventIfClaimed();
return undefined;
}
if (preflightAudioTranscript) {
await sendMatrixPreflightAudioTranscriptEcho({
transcript: preflightAudioTranscript,
cfg,
accountId,
originatingTo: `room:${roomId}`,
messageThreadId: thread.threadId,
});
}
if (isPollEvent) {
const pollSnapshot = await getPollSnapshot();
if (!pollSnapshot) {
discardReservedHistorySlot();
return undefined;
}
content = {
@@ -1077,9 +1260,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
path: string;
contentType?: string;
placeholder: string;
} | null = null;
let mediaDownloadFailed = false;
let mediaSizeLimitExceeded = false;
} | null = preflightMedia;
let mediaDownloadFailed = preflightMediaDownloadFailed;
let mediaSizeLimitExceeded = preflightMediaSizeLimitExceeded;
const finalContentUrl =
"url" in content && typeof content.url === "string" ? content.url : undefined;
const finalContentFile =
@@ -1096,7 +1279,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
: undefined;
const contentType = contentInfo?.mimetype;
const contentSize = typeof contentInfo?.size === "number" ? contentInfo.size : undefined;
if (finalMediaUrl?.startsWith("mxc://")) {
if (!media && !mediaDownloadFailed && finalMediaUrl?.startsWith("mxc://")) {
try {
media = await downloadMatrixMedia({
client,
@@ -1127,7 +1310,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
const rawBody = locationPayload?.text ?? contentBody;
const bodyText = resolveMatrixInboundBodyText({
let bodyText = resolveMatrixInboundBodyText({
rawBody,
filename: typeof content.filename === "string" ? content.filename : undefined,
mediaPlaceholder: media?.placeholder,
@@ -1136,8 +1319,24 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
mediaDownloadFailed,
mediaSizeLimitExceeded,
});
if (
preflightMedia &&
bodyText &&
bodyText !== preflightMedia.placeholder &&
isLikelyBareFilename(bodyText)
) {
// Matrix voice clients commonly set body to the attachment filename.
bodyText = preflightMedia.placeholder;
}
if (preflightAudioTranscript) {
const transcriptBody = formatMatrixAudioTranscript(preflightAudioTranscript);
bodyText =
!bodyText || bodyText === media?.placeholder
? transcriptBody
: `${bodyText}\n${transcriptBody}`;
}
if (!bodyText) {
await commitInboundEventIfClaimed();
await commitInboundEventIfClaimedAndDiscardReserved();
return undefined;
}
const commandBodyText = hasControlCommandInMessage ? commandCheckText : bodyText;
@@ -1155,6 +1354,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
reason: "configured ACP binding unavailable",
target: _configuredBinding.spec.conversationId,
});
discardReservedHistorySlot();
return undefined;
}
}
@@ -1164,13 +1364,36 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
const preparedTrigger =
isRoom && historyLimit > 0
? roomHistoryTracker.prepareTrigger(_route.agentId, roomId, historyLimit, {
sender: senderName,
body: bodyText,
timestamp: eventTs ?? undefined,
messageId,
})
? reservedHistorySlot
? roomHistoryTracker.prepareReservedTrigger(
_route.agentId,
roomId,
historyLimit,
reservedHistorySlot,
{
sender: senderName,
body: bodyText,
timestamp: eventTs ?? undefined,
messageId,
},
historyThreadId,
)
: roomHistoryTracker.prepareTrigger(
_route.agentId,
roomId,
historyLimit,
{
sender: senderName,
body: bodyText,
timestamp: eventTs ?? undefined,
messageId,
},
historyThreadId,
)
: undefined;
if (reservedHistorySlot && preparedTrigger) {
reservedHistorySlotConsumed = true;
}
const inboundHistory = preparedTrigger
? buildInboundHistoryFromEntries({
entries: preparedTrigger.history,
@@ -1195,6 +1418,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
bodyText,
commandBodyText,
media,
preflightAudioTranscript,
locationPayload,
messageId,
triggerSnapshot,
@@ -1215,7 +1439,18 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (prefix.isDirectMessage) {
return { deferredPrefix: prefix } as const;
}
return { ingressResult: await continueIngress(prefix) } as const;
const result = await continueIngress({
...prefix,
audioPreflightMode: shouldDeferMatrixAudioPreflightForRoomIngress({
content: prefix.content,
cfg,
})
? "defer"
: "run",
});
return result && "deferredPrefix" in result
? { deferredPrefix: result.deferredPrefix }
: { ingressResult: result };
})
: undefined;
const resolvedIngressResult =
@@ -1233,6 +1468,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (!resolvedIngressResult) {
return;
}
if ("deferredPrefix" in resolvedIngressResult) {
return;
}
const {
route: _route,
@@ -1250,6 +1488,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
bodyText,
commandBodyText,
media,
preflightAudioTranscript,
locationPayload,
messageId,
triggerSnapshot,
@@ -1389,7 +1628,14 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
},
media: toInboundMediaFacts(
media
? [{ path: media.path, url: media.path, contentType: media.contentType }]
? [
{
path: media.path,
url: media.path,
contentType: media.contentType,
transcribed: preflightAudioTranscript !== undefined,
},
]
: undefined,
),
messageId,
@@ -1865,6 +2111,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
textLimit,
replyToMode,
threadId: threadTarget,
replyToId: threadTarget ?? replyToEventId ?? undefined,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
@@ -1974,6 +2221,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
textLimit,
replyToMode,
threadId: threadTarget,
replyToId: threadTarget ?? replyToEventId ?? undefined,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
@@ -2041,6 +2289,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
textLimit,
replyToMode,
threadId: threadTarget,
replyToId: threadTarget ?? replyToEventId ?? undefined,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
@@ -2065,6 +2314,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
textLimit,
replyToMode,
threadId: threadTarget,
replyToId: threadTarget ?? replyToEventId ?? undefined,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
@@ -2096,6 +2346,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
textLimit,
replyToMode,
threadId: threadTarget,
replyToId: threadTarget ?? replyToEventId ?? undefined,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
@@ -2331,7 +2582,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
// Only advance to the snapshot position — messages added during async processing remain
// visible for the next trigger.
if (isRoom && triggerSnapshot) {
roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshot, messageId);
roomHistoryTracker.consumeHistory(
_route.agentId,
roomId,
triggerSnapshot,
messageId,
threadRootId ? thread.threadId : undefined,
);
}
if (!hasFinalInboundReplyDispatch({ queuedFinal, counts })) {
await commitInboundEventIfClaimed();

View File

@@ -0,0 +1,18 @@
import { sendDurableMessageBatch as sendDurableMessageBatchImpl } from "openclaw/plugin-sdk/channel-outbound";
import { transcribeFirstAudio as transcribeFirstAudioImpl } from "openclaw/plugin-sdk/media-runtime";
type TranscribeFirstAudio = typeof import("openclaw/plugin-sdk/media-runtime").transcribeFirstAudio;
type SendDurableMessageBatch =
typeof import("openclaw/plugin-sdk/channel-outbound").sendDurableMessageBatch;
export async function transcribeFirstAudio(
...args: Parameters<TranscribeFirstAudio>
): ReturnType<TranscribeFirstAudio> {
return await transcribeFirstAudioImpl(...args);
}
export async function sendDurableMessageBatch(
...args: Parameters<SendDurableMessageBatch>
): ReturnType<SendDurableMessageBatch> {
return await sendDurableMessageBatchImpl(...args);
}

View File

@@ -0,0 +1,177 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const { sendDurableMessageBatchMock, transcribeFirstAudioMock } = vi.hoisted(() => ({
sendDurableMessageBatchMock: vi.fn(),
transcribeFirstAudioMock: vi.fn(),
}));
vi.mock("./preflight-audio.runtime.js", () => ({
sendDurableMessageBatch: sendDurableMessageBatchMock,
transcribeFirstAudio: transcribeFirstAudioMock,
}));
import {
formatMatrixAudioTranscript,
isMatrixAudioContent,
resolveMatrixPreflightAudioTranscript,
sendMatrixPreflightAudioTranscriptEcho,
} from "./preflight-audio.js";
const cfg = {} as import("openclaw/plugin-sdk/config-contracts").OpenClawConfig;
describe("isMatrixAudioContent", () => {
it("accepts Matrix audio messages and audio files", () => {
expect(isMatrixAudioContent({ msgtype: "m.audio" })).toBe(true);
expect(isMatrixAudioContent({ msgtype: "m.file", mimetype: "audio/ogg" })).toBe(true);
expect(isMatrixAudioContent({ msgtype: "m.file", mimetype: "AUDIO/MP4" })).toBe(true);
});
it("rejects non-audio Matrix content", () => {
expect(isMatrixAudioContent({ msgtype: "m.image", mimetype: "image/png" })).toBe(false);
expect(isMatrixAudioContent({ msgtype: "m.file", mimetype: "application/pdf" })).toBe(false);
expect(isMatrixAudioContent({ mimetype: "audio/ogg" })).toBe(false);
});
});
describe("formatMatrixAudioTranscript", () => {
it("wraps transcripts with untrusted machine-generated framing", () => {
expect(formatMatrixAudioTranscript('say "hi"\nthen go')).toBe(
`[Audio transcript (machine-generated, untrusted)]: ${JSON.stringify('say "hi"\nthen go')}`,
);
});
});
describe("resolveMatrixPreflightAudioTranscript", () => {
beforeEach(() => {
sendDurableMessageBatchMock.mockReset();
transcribeFirstAudioMock.mockReset();
});
it("passes the Matrix-local media path to shared audio preflight", async () => {
transcribeFirstAudioMock.mockResolvedValue("hello from voice");
const transcript = await resolveMatrixPreflightAudioTranscript({
mediaPath: "/tmp/inbound/voice.ogg",
mediaContentType: "audio/ogg",
cfg,
accountId: "ops",
chatType: "channel",
originatingTo: "room:!room:example.org",
messageThreadId: "$thread",
sessionKey: "agent:main:matrix:channel:!room:example.org",
});
expect(transcribeFirstAudioMock).toHaveBeenCalledWith(
expect.objectContaining({
ctx: expect.objectContaining({
MediaPaths: ["/tmp/inbound/voice.ogg"],
MediaTypes: ["audio/ogg"],
Provider: "matrix",
Surface: "matrix",
OriginatingChannel: "matrix",
OriginatingTo: "room:!room:example.org",
AccountId: "ops",
MessageThreadId: "$thread",
ChatType: "channel",
SessionKey: "agent:main:matrix:channel:!room:example.org",
}),
cfg,
}),
);
expect(transcript).toBe("hello from voice");
});
it("suppresses shared echo during pre-mention transcription", async () => {
const echoCfg = {
tools: { media: { audio: { echoTranscript: true, echoFormat: "echo: {transcript}" } } },
} as import("openclaw/plugin-sdk/config-contracts").OpenClawConfig;
transcribeFirstAudioMock.mockResolvedValue("hello from voice");
await resolveMatrixPreflightAudioTranscript({
mediaPath: "/tmp/inbound/voice.ogg",
mediaContentType: "audio/ogg",
cfg: echoCfg,
accountId: "ops",
chatType: "channel",
originatingTo: "room:!room:example.org",
sessionKey: "agent:main:matrix:channel:!room:example.org",
});
const callCfg = transcribeFirstAudioMock.mock.calls[0]?.[0]?.cfg as
| { tools?: { media?: { audio?: { echoTranscript?: unknown } } } }
| undefined;
expect(callCfg?.tools?.media?.audio?.echoTranscript).toBe(false);
});
it("swallows provider failures and aborts", async () => {
transcribeFirstAudioMock.mockRejectedValue(new Error("STT down"));
await expect(
resolveMatrixPreflightAudioTranscript({
mediaPath: "/tmp/inbound/voice.ogg",
cfg,
accountId: "ops",
chatType: "direct",
originatingTo: "room:!dm:example.org",
sessionKey: "agent:main:matrix:direct:@frank:example.org",
}),
).resolves.toBeUndefined();
const controller = new AbortController();
controller.abort();
transcribeFirstAudioMock.mockClear();
await expect(
resolveMatrixPreflightAudioTranscript({
mediaPath: "/tmp/inbound/voice.ogg",
cfg,
accountId: "ops",
chatType: "direct",
originatingTo: "room:!dm:example.org",
sessionKey: "agent:main:matrix:direct:@frank:example.org",
abortSignal: controller.signal,
}),
).resolves.toBeUndefined();
expect(transcribeFirstAudioMock).not.toHaveBeenCalled();
});
});
describe("sendMatrixPreflightAudioTranscriptEcho", () => {
beforeEach(() => {
sendDurableMessageBatchMock.mockReset();
transcribeFirstAudioMock.mockReset();
});
it("sends accepted Matrix preflight transcript echoes through durable delivery", async () => {
sendDurableMessageBatchMock.mockResolvedValue({ status: "sent", results: [] });
await sendMatrixPreflightAudioTranscriptEcho({
transcript: "hello bot",
cfg: {
tools: { media: { audio: { echoTranscript: true, echoFormat: "heard: {transcript}" } } },
} as import("openclaw/plugin-sdk/config-contracts").OpenClawConfig,
accountId: "ops",
originatingTo: "room:!room:example.org",
messageThreadId: "$thread",
});
expect(sendDurableMessageBatchMock).toHaveBeenCalledWith({
cfg: expect.any(Object),
channel: "matrix",
to: "room:!room:example.org",
accountId: "ops",
threadId: "$thread",
payloads: [{ text: "heard: hello bot" }],
bestEffort: true,
durability: "best_effort",
});
});
it("does not echo when transcript echo is disabled", async () => {
await sendMatrixPreflightAudioTranscriptEcho({
transcript: "hello bot",
cfg,
accountId: "ops",
originatingTo: "room:!room:example.org",
});
expect(sendDurableMessageBatchMock).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,126 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
type MatrixPreflightAudioRuntime = typeof import("./preflight-audio.runtime.js");
const MATRIX_DEFAULT_ECHO_TRANSCRIPT_FORMAT = '📝 "{transcript}"';
let matrixPreflightAudioRuntimePromise: Promise<MatrixPreflightAudioRuntime> | undefined;
function loadMatrixPreflightAudioRuntime(): Promise<MatrixPreflightAudioRuntime> {
matrixPreflightAudioRuntimePromise ??= import("./preflight-audio.runtime.js");
return matrixPreflightAudioRuntimePromise;
}
export function formatMatrixAudioTranscript(transcript: string): string {
return `[Audio transcript (machine-generated, untrusted)]: ${JSON.stringify(transcript)}`;
}
function formatMatrixAudioTranscriptEcho(transcript: string, format: string): string {
return format.replace("{transcript}", transcript);
}
function suppressMatrixPreflightAudioEcho(cfg: OpenClawConfig): OpenClawConfig {
const audio = cfg.tools?.media?.audio;
if (!audio?.echoTranscript) {
return cfg;
}
return {
...cfg,
tools: {
...cfg.tools,
media: {
...cfg.tools?.media,
audio: {
...audio,
echoTranscript: false,
},
},
},
};
}
export function isMatrixAudioContent(params: { msgtype?: string; mimetype?: string }): boolean {
if (params.msgtype === "m.audio") {
return true;
}
if (params.msgtype === "m.file" && typeof params.mimetype === "string") {
return params.mimetype.toLowerCase().startsWith("audio/");
}
return false;
}
export async function resolveMatrixPreflightAudioTranscript(params: {
mediaPath: string;
mediaContentType?: string;
cfg: OpenClawConfig;
accountId: string;
chatType: "channel" | "direct";
originatingTo: string;
messageThreadId?: string;
sessionKey: string;
abortSignal?: AbortSignal;
}): Promise<string | undefined> {
if (params.abortSignal?.aborted) {
return undefined;
}
try {
const { transcribeFirstAudio } = await loadMatrixPreflightAudioRuntime();
if (params.abortSignal?.aborted) {
return undefined;
}
const transcript = await transcribeFirstAudio({
ctx: {
MediaPaths: [params.mediaPath],
MediaTypes: params.mediaContentType ? [params.mediaContentType] : undefined,
Provider: "matrix",
Surface: "matrix",
OriginatingChannel: "matrix",
OriginatingTo: params.originatingTo,
AccountId: params.accountId,
MessageThreadId: params.messageThreadId,
ChatType: params.chatType,
SessionKey: params.sessionKey,
},
cfg: suppressMatrixPreflightAudioEcho(params.cfg),
});
return params.abortSignal?.aborted ? undefined : transcript;
} catch (err) {
logVerbose(`matrix: audio preflight transcription failed: ${String(err)}`);
return undefined;
}
}
export async function sendMatrixPreflightAudioTranscriptEcho(params: {
transcript: string;
cfg: OpenClawConfig;
accountId: string;
originatingTo: string;
messageThreadId?: string;
}): Promise<void> {
const audio = params.cfg.tools?.media?.audio;
if (!audio?.echoTranscript) {
return;
}
const text = formatMatrixAudioTranscriptEcho(
params.transcript,
audio.echoFormat ?? MATRIX_DEFAULT_ECHO_TRANSCRIPT_FORMAT,
);
try {
const { sendDurableMessageBatch } = await loadMatrixPreflightAudioRuntime();
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel: "matrix",
to: params.originatingTo,
accountId: params.accountId,
threadId: params.messageThreadId,
payloads: [{ text }],
bestEffort: true,
durability: "best_effort",
});
if (send.status === "failed") {
throw send.error;
}
} catch (err) {
logVerbose(`matrix: audio transcript echo failed: ${String(err)}`);
}
}

View File

@@ -149,7 +149,7 @@ describe("deliverMatrixReplies", () => {
expect(sendOptions(2).replyToId).toBe("reply-text");
});
it("suppresses replyToId when threadId is set", async () => {
it("keeps replyToId when threadId is set so Matrix can send fallback metadata", async () => {
chunkMatrixTextMock.mockImplementation((text: string) => ({
trimmedText: text.trim(),
convertedText: text,
@@ -160,19 +160,20 @@ describe("deliverMatrixReplies", () => {
await deliverMatrixReplies({
cfg,
replies: [{ text: "hello|thread", replyToId: "reply-thread" }],
replies: [{ text: "hello|thread" }],
roomId: "room:3",
client: {} as MatrixClient,
runtime: runtimeEnv,
textLimit: 4000,
replyToMode: "all",
replyToMode: "off",
threadId: "thread-77",
replyToId: "reply-thread",
});
expect(sendMessageMatrixMock).toHaveBeenCalledTimes(2);
expect(sendOptions(0).replyToId).toBeUndefined();
expect(sendOptions(0).replyToId).toBe("reply-thread");
expect(sendOptions(0).threadId).toBe("thread-77");
expect(sendOptions(1).replyToId).toBeUndefined();
expect(sendOptions(1).replyToId).toBe("reply-thread");
expect(sendOptions(1).threadId).toBe("thread-77");
});

View File

@@ -39,6 +39,7 @@ export async function deliverMatrixReplies(params: {
textLimit: number;
replyToMode: "off" | "first" | "all" | "batched";
threadId?: string;
replyToId?: string;
accountId?: string;
mediaLocalRoots?: readonly string[];
tableMode?: MarkdownTableMode;
@@ -72,8 +73,12 @@ export async function deliverMatrixReplies(params: {
params.runtime.error?.("matrix reply missing text/media");
continue;
}
const replyToIdRaw = reply.replyToId?.trim();
const replyToId = params.threadId || params.replyToMode === "off" ? undefined : replyToIdRaw;
const replyToIdRaw = (reply.replyToId ?? params.replyToId)?.trim();
const replyToId = params.threadId
? replyToIdRaw
: params.replyToMode === "off"
? undefined
: replyToIdRaw;
const rawText = reply.text ?? "";
const mediaList = reply.mediaUrls?.length
? reply.mediaUrls
@@ -82,7 +87,7 @@ export async function deliverMatrixReplies(params: {
: [];
const shouldIncludeReply = (id?: string) =>
Boolean(id) && (params.replyToMode === "all" || !hasReplied);
Boolean(id) && (params.threadId || params.replyToMode === "all" || !hasReplied);
const replyToIdForReply = shouldIncludeReply(replyToId) ? replyToId : undefined;
if (mediaList.length === 0) {

View File

@@ -65,6 +65,223 @@ describe("createRoomHistoryTracker — watermark monotonicity", () => {
expect(retried.snapshotIdx).toBe(first.snapshotIdx);
});
it("reserved triggers keep their arrival-order history window", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, { sender: "user", body: "before", messageId: "$before" });
const reserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder",
messageId: "$audio",
});
tracker.recordPending(ROOM, { sender: "user", body: "after", messageId: "$after" });
const prepared = tracker.prepareReservedTrigger(AGENT, ROOM, 100, reserved, {
sender: "user",
body: "audio trigger",
messageId: "$audio",
});
expect(prepared.history.map((entryValue) => entryValue.body)).toEqual(["before"]);
tracker.consumeHistory(AGENT, ROOM, prepared, "$audio");
expect(
tracker.getPendingHistory(AGENT, ROOM, 100).map((entryValue) => entryValue.body),
).toEqual(["after"]);
});
it("reserved pending slots are finalized in arrival order", () => {
const tracker = createRoomHistoryTrackerForTests();
const reserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder",
messageId: "$audio",
});
tracker.recordPending(ROOM, { sender: "user", body: "after", messageId: "$after" });
tracker.finalizePending(ROOM, reserved, {
sender: "user",
body: "audio final",
messageId: "$audio",
});
expect(
tracker.getPendingHistory(AGENT, ROOM, 100).map((entryValue) => entryValue.body),
).toEqual(["audio final", "after"]);
});
it("discarded reserved slots do not leak into later history", () => {
const tracker = createRoomHistoryTrackerForTests();
const reserved = tracker.reservePending(AGENT, ROOM, {
sender: "blocked",
body: "blocked audio",
messageId: "$blocked",
});
tracker.discardPending(ROOM, reserved);
tracker.recordPending(ROOM, { sender: "user", body: "after", messageId: "$after" });
const prepared = tracker.prepareTrigger(AGENT, ROOM, 100, {
sender: "user",
body: "trigger",
messageId: "$trigger",
});
expect(prepared.history.map((entryValue) => entryValue.body)).toEqual(["after"]);
});
it("reserved triggers use the arrival-time watermark even if a later trigger consumes history", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, { sender: "user", body: "before", messageId: "$before" });
const reserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder",
messageId: "$audio",
});
const later = tracker.prepareTrigger(AGENT, ROOM, 100, {
sender: "user",
body: "later trigger",
messageId: "$later",
});
tracker.consumeHistory(AGENT, ROOM, later, "$later");
const prepared = tracker.prepareReservedTrigger(AGENT, ROOM, 100, reserved, {
sender: "user",
body: "audio trigger",
messageId: "$audio",
});
expect(prepared.history.map((entryValue) => entryValue.body)).toEqual(["before"]);
});
it("does not let later triggers consume unfinalized reserved slots", () => {
const tracker = createRoomHistoryTrackerForTests();
const reserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder",
messageId: "$audio",
});
const later = tracker.prepareTrigger(AGENT, ROOM, 100, {
sender: "user",
body: "later trigger",
messageId: "$later",
});
tracker.consumeHistory(AGENT, ROOM, later, "$later");
tracker.finalizePending(ROOM, reserved, {
sender: "user",
body: "audio transcript",
messageId: "$audio",
});
const followUp = tracker.prepareTrigger(AGENT, ROOM, 100, {
sender: "user",
body: "follow up",
messageId: "$follow-up",
});
expect(followUp.history.map((entryValue) => entryValue.body)).toEqual(["audio transcript"]);
});
it("reserved trigger retries discard the extra placeholder slot", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, { sender: "user", body: "before", messageId: "$before" });
const firstReserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder",
messageId: "$audio",
});
const firstPrepared = tracker.prepareReservedTrigger(AGENT, ROOM, 100, firstReserved, {
sender: "user",
body: "audio trigger",
messageId: "$audio",
});
const retryReserved = tracker.reservePending(AGENT, ROOM, {
sender: "user",
body: "audio placeholder retry",
messageId: "$audio",
});
const retried = tracker.prepareReservedTrigger(AGENT, ROOM, 100, retryReserved, {
sender: "user",
body: "audio trigger",
messageId: "$audio",
});
tracker.consumeHistory(AGENT, ROOM, retried, "$audio");
expect(retried.snapshotIdx).toBe(firstPrepared.snapshotIdx);
expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0);
});
it("keeps main-room and thread histories isolated", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, entry("main-1"));
tracker.recordPending(ROOM, entry("thread-1"), "$thread");
tracker.recordPending(ROOM, entry("main-2"));
const mainPrepared = tracker.prepareTrigger(AGENT, ROOM, 100, entry("main-trigger"));
const threadPrepared = tracker.prepareTrigger(
AGENT,
ROOM,
100,
entry("thread-trigger"),
"$thread",
);
expect(mainPrepared.history.map((entryValue) => entryValue.body)).toEqual(["main-1", "main-2"]);
expect(threadPrepared.history.map((entryValue) => entryValue.body)).toEqual(["thread-1"]);
});
it("advances watermarks independently per thread", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, entry("thread-a-1"), "$thread-a");
tracker.recordPending(ROOM, entry("thread-b-1"), "$thread-b");
const snapA = tracker.prepareTrigger(AGENT, ROOM, 100, entry("trigger-a"), "$thread-a");
tracker.consumeHistory(AGENT, ROOM, snapA, undefined, "$thread-a");
expect(tracker.getPendingHistory(AGENT, ROOM, 100, "$thread-a")).toHaveLength(0);
expect(
tracker.getPendingHistory(AGENT, ROOM, 100, "$thread-b").map((entryValue) => entryValue.body),
).toEqual(["thread-b-1"]);
});
it("reserved thread triggers keep the thread arrival-order history window", () => {
const tracker = createRoomHistoryTrackerForTests();
tracker.recordPending(ROOM, entry("main-before"));
tracker.recordPending(ROOM, entry("thread-before"), "$thread");
const reserved = tracker.reservePending(
AGENT,
ROOM,
{
sender: "user",
body: "audio placeholder",
messageId: "$audio",
},
"$thread",
);
tracker.recordPending(ROOM, entry("thread-after"), "$thread");
tracker.recordPending(ROOM, entry("main-after"));
const prepared = tracker.prepareReservedTrigger(
AGENT,
ROOM,
100,
reserved,
{
sender: "user",
body: "audio trigger",
messageId: "$audio",
},
"$thread",
);
expect(prepared.history.map((entryValue) => entryValue.body)).toEqual(["thread-before"]);
});
it("refreshes watermark recency before capped-map eviction", () => {
const tracker = createRoomHistoryTrackerForTests(200, 10, 2);
const room1 = "!room1:test";

View File

@@ -11,6 +11,9 @@
* Race-condition safety: the watermark only advances to the snapshot index taken at
* dispatch time, NOT to the queue's end at reply time. Messages that land in the queue
* while the agent is processing stay visible to the next trigger for that agent.
*
* Thread-scoped history uses a separate sub-queue per Matrix thread root. Main-room
* history and thread history must not share watermarks or pending context.
*/
import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
@@ -23,6 +26,8 @@ const DEFAULT_MAX_ROOM_QUEUES = 1000;
const MAX_WATERMARK_ENTRIES = 5000;
/** Maximum prepared trigger snapshots retained per room for retry reuse. */
const MAX_PREPARED_TRIGGER_ENTRIES = 500;
/** Maximum thread queues retained per room (FIFO eviction beyond this). */
const MAX_THREAD_QUEUES_PER_ROOM = 200;
export type { HistoryEntry };
@@ -31,6 +36,17 @@ type HistorySnapshotToken = {
queueGeneration: number;
};
export type ReservedHistorySlot = HistorySnapshotToken & {
slotIdx: number;
watermarkIdx?: number;
};
type QueuedHistoryEntry = HistoryEntry & {
discarded?: true;
reserved?: true;
consumedBy?: Set<string>;
};
type PreparedTriggerResult = {
history: HistoryEntry[];
} & HistorySnapshotToken;
@@ -40,7 +56,26 @@ type RoomHistoryTracker = {
* Record a non-trigger message for future context.
* Call this when a room message arrives but does not mention the bot.
*/
recordPending: (roomId: string, entry: HistoryEntry) => void;
recordPending: (roomId: string, entry: HistoryEntry, threadRootId?: string) => void;
/** Reserve an arrival-order slot for slow preflight work that finishes later. */
reservePending: (
agentId: string,
roomId: string,
entry: HistoryEntry,
threadRootId?: string,
) => ReservedHistorySlot;
/** Replace a reserved slot with its final non-trigger history entry. */
finalizePending: (
roomId: string,
slot: ReservedHistorySlot,
entry: HistoryEntry,
threadRootId?: string,
) => void;
/** Remove a reserved slot without changing later absolute indexes. */
discardPending: (roomId: string, slot: ReservedHistorySlot, threadRootId?: string) => void;
/**
* Capture pending history and append the trigger as one idempotent operation.
@@ -51,6 +86,17 @@ type RoomHistoryTracker = {
roomId: string,
limit: number,
entry: HistoryEntry,
threadRootId?: string,
) => PreparedTriggerResult;
/** Prepare a trigger using a previously reserved arrival-order slot. */
prepareReservedTrigger: (
agentId: string,
roomId: string,
limit: number,
slot: ReservedHistorySlot,
entry: HistoryEntry,
threadRootId?: string,
) => PreparedTriggerResult;
/**
@@ -63,6 +109,7 @@ type RoomHistoryTracker = {
roomId: string,
snapshot: HistorySnapshotToken,
messageId?: string,
threadRootId?: string,
) => void;
};
@@ -70,22 +117,35 @@ type RoomHistoryTrackerTestApi = RoomHistoryTracker & {
/**
* Test-only helper for inspecting pending room history directly.
*/
getPendingHistory: (agentId: string, roomId: string, limit: number) => HistoryEntry[];
getPendingHistory: (
agentId: string,
roomId: string,
limit: number,
threadRootId?: string,
) => HistoryEntry[];
/**
* Test-only helper for manually appending a trigger entry and snapshot index.
*/
recordTrigger: (roomId: string, entry: HistoryEntry) => HistorySnapshotToken;
recordTrigger: (
roomId: string,
entry: HistoryEntry,
threadRootId?: string,
) => HistorySnapshotToken;
};
type RoomQueue = {
entries: HistoryEntry[];
type HistoryQueue = {
entries: QueuedHistoryEntry[];
/** Absolute index of entries[0] — increases as old entries are trimmed. */
baseIndex: number;
generation: number;
preparedTriggers: Map<string, PreparedTriggerResult>;
};
type RoomQueue = HistoryQueue & {
threadQueues: Map<string, HistoryQueue>;
};
function createRoomHistoryTrackerInternal(
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES,
@@ -93,27 +153,43 @@ function createRoomHistoryTrackerInternal(
maxPreparedTriggerEntries = MAX_PREPARED_TRIGGER_ENTRIES,
): RoomHistoryTrackerTestApi {
const roomQueues = new Map<string, RoomQueue>();
/** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */
/** Maps `{agentId, roomId, scope}` → absolute consumed-up-to index */
const agentWatermarks = new Map<string, number>();
let nextQueueGeneration = 1;
function clearRoomWatermarks(roomId: string): void {
const roomSuffix = `:${roomId}`;
for (const key of agentWatermarks.keys()) {
if (key.endsWith(roomSuffix)) {
const parsed = JSON.parse(key) as { roomId?: string } | null;
if (parsed?.roomId === roomId) {
agentWatermarks.delete(key);
}
}
}
function clearThreadWatermarks(roomId: string, threadRootId: string): void {
for (const key of agentWatermarks.keys()) {
const parsed = JSON.parse(key) as { roomId?: string; scope?: string } | null;
if (parsed?.roomId === roomId && parsed.scope === threadRootId) {
agentWatermarks.delete(key);
}
}
}
function createHistoryQueue(): HistoryQueue {
return {
entries: [],
baseIndex: 0,
generation: nextQueueGeneration++,
preparedTriggers: new Map(),
};
}
function getOrCreateQueue(roomId: string): RoomQueue {
let queue = roomQueues.get(roomId);
if (!queue) {
queue = {
entries: [],
baseIndex: 0,
generation: nextQueueGeneration++,
preparedTriggers: new Map(),
...createHistoryQueue(),
threadQueues: new Map(),
};
roomQueues.set(roomId, queue);
// FIFO eviction to prevent unbounded growth across many rooms
@@ -128,7 +204,40 @@ function createRoomHistoryTrackerInternal(
return queue;
}
function appendToQueue(queue: RoomQueue, entry: HistoryEntry): HistorySnapshotToken {
function getOrCreateThreadQueue(
roomId: string,
roomQueue: RoomQueue,
threadRootId: string,
): HistoryQueue {
let queue = roomQueue.threadQueues.get(threadRootId);
if (!queue) {
queue = createHistoryQueue();
roomQueue.threadQueues.set(threadRootId, queue);
if (roomQueue.threadQueues.size > MAX_THREAD_QUEUES_PER_ROOM) {
const oldest = roomQueue.threadQueues.keys().next().value;
if (oldest !== undefined) {
roomQueue.threadQueues.delete(oldest);
clearThreadWatermarks(roomId, oldest);
}
}
}
return queue;
}
function getScopedQueue(roomId: string, threadRootId?: string): HistoryQueue {
const roomQueue = getOrCreateQueue(roomId);
return threadRootId ? getOrCreateThreadQueue(roomId, roomQueue, threadRootId) : roomQueue;
}
function findScopedQueue(roomId: string, threadRootId?: string): HistoryQueue | undefined {
const roomQueue = roomQueues.get(roomId);
if (!roomQueue) {
return undefined;
}
return threadRootId ? roomQueue.threadQueues.get(threadRootId) : roomQueue;
}
function appendToQueue(queue: HistoryQueue, entry: QueuedHistoryEntry): HistorySnapshotToken {
queue.entries.push(entry);
if (queue.entries.length > maxQueueSize) {
const overflow = queue.entries.length - maxQueueSize;
@@ -141,8 +250,12 @@ function createRoomHistoryTrackerInternal(
};
}
function wmKey(agentId: string, roomId: string): string {
return `${agentId}:${roomId}`;
function wmKey(agentId: string, roomId: string, threadRootId?: string): string {
return JSON.stringify({
agentId,
roomId,
scope: threadRootId ?? "main",
});
}
function preparedTriggerKey(agentId: string, messageId?: string): string | null {
@@ -167,8 +280,25 @@ function createRoomHistoryTrackerInternal(
}
}
function markConsumedAfterReservedGap(
queue: HistoryQueue,
key: string,
firstReservedRel: number,
snapshotIdx: number,
): void {
const endRel = Math.min(snapshotIdx - queue.baseIndex, queue.entries.length);
for (let rel = firstReservedRel + 1; rel < endRel; rel += 1) {
const entry = queue.entries[rel];
if (!entry || entry.reserved || entry.discarded) {
continue;
}
entry.consumedBy ??= new Set<string>();
entry.consumedBy.add(key);
}
}
function rememberPreparedTrigger(
queue: RoomQueue,
queue: HistoryQueue,
retryKey: string,
prepared: PreparedTriggerResult,
): PreparedTriggerResult {
@@ -187,53 +317,174 @@ function createRoomHistoryTrackerInternal(
}
function computePendingHistory(
queue: RoomQueue,
queue: HistoryQueue,
agentId: string,
roomId: string,
limit: number,
endAbsExclusive = queue.baseIndex + queue.entries.length,
startAbsOverride?: number,
threadRootId?: string,
): HistoryEntry[] {
if (limit <= 0 || queue.entries.length === 0) {
return [];
}
const wm = agentWatermarks.get(wmKey(agentId, roomId)) ?? 0;
const wm = startAbsOverride ?? agentWatermarks.get(wmKey(agentId, roomId, threadRootId)) ?? 0;
// startAbs: the first absolute index the agent hasn't seen yet
const startAbs = Math.max(wm, queue.baseIndex);
const startRel = startAbs - queue.baseIndex;
const available = queue.entries.slice(startRel);
const endRel = Math.max(
startRel,
Math.min(endAbsExclusive - queue.baseIndex, queue.entries.length),
);
const available = queue.entries
.slice(startRel, endRel)
.filter(
(entry) =>
!entry.discarded &&
!entry.reserved &&
!entry.consumedBy?.has(wmKey(agentId, roomId, threadRootId)),
);
return available.length > limit ? available.slice(-limit) : available;
}
function prepareTriggerInternal(
agentId: string,
roomId: string,
limit: number,
entry: HistoryEntry,
threadRootId?: string,
): PreparedTriggerResult {
const queue = getScopedQueue(roomId, threadRootId);
const retryKey = preparedTriggerKey(agentId, entry.messageId);
if (retryKey) {
const prepared = queue.preparedTriggers.get(retryKey);
if (prepared) {
return rememberPreparedTrigger(queue, retryKey, prepared);
}
}
const prepared = {
history: computePendingHistory(
queue,
agentId,
roomId,
limit,
undefined,
undefined,
threadRootId,
),
...appendToQueue(queue, entry),
};
if (retryKey) {
return rememberPreparedTrigger(queue, retryKey, prepared);
}
return prepared;
}
return {
recordPending(roomId, entry) {
const queue = getOrCreateQueue(roomId);
recordPending(roomId, entry, threadRootId) {
const queue = getScopedQueue(roomId, threadRootId);
appendToQueue(queue, entry);
},
getPendingHistory(agentId, roomId, limit) {
const queue = roomQueues.get(roomId);
reservePending(agentId, roomId, entry, threadRootId) {
const queue = getScopedQueue(roomId, threadRootId);
const snapshot = appendToQueue(queue, { ...entry, reserved: true });
return {
...snapshot,
slotIdx: snapshot.snapshotIdx - 1,
watermarkIdx: agentWatermarks.get(wmKey(agentId, roomId, threadRootId)) ?? 0,
};
},
finalizePending(roomId, slot, entry, threadRootId) {
const queue = findScopedQueue(roomId, threadRootId);
if (!queue || queue.generation !== slot.queueGeneration) {
return;
}
const rel = slot.slotIdx - queue.baseIndex;
if (rel < 0 || rel >= queue.entries.length) {
return;
}
queue.entries[rel] = entry;
},
discardPending(roomId, slot, threadRootId) {
const queue = findScopedQueue(roomId, threadRootId);
if (!queue || queue.generation !== slot.queueGeneration) {
return;
}
const rel = slot.slotIdx - queue.baseIndex;
if (rel < 0 || rel >= queue.entries.length) {
return;
}
queue.entries[rel] = {
sender: "",
body: "",
messageId: undefined,
discarded: true,
};
},
getPendingHistory(agentId, roomId, limit, threadRootId) {
const queue = findScopedQueue(roomId, threadRootId);
if (!queue) {
return [];
}
return computePendingHistory(queue, agentId, roomId, limit);
return computePendingHistory(
queue,
agentId,
roomId,
limit,
undefined,
undefined,
threadRootId,
);
},
recordTrigger(roomId, entry) {
const queue = getOrCreateQueue(roomId);
recordTrigger(roomId, entry, threadRootId) {
const queue = getScopedQueue(roomId, threadRootId);
return appendToQueue(queue, entry);
},
prepareTrigger(agentId, roomId, limit, entry) {
const queue = getOrCreateQueue(roomId);
prepareTrigger(agentId, roomId, limit, entry, threadRootId) {
return prepareTriggerInternal(agentId, roomId, limit, entry, threadRootId);
},
prepareReservedTrigger(agentId, roomId, limit, slot, entry, threadRootId) {
const queue = findScopedQueue(roomId, threadRootId);
if (!queue || queue.generation !== slot.queueGeneration) {
return prepareTriggerInternal(agentId, roomId, limit, entry, threadRootId);
}
const rel = slot.slotIdx - queue.baseIndex;
if (rel < 0 || rel >= queue.entries.length) {
return prepareTriggerInternal(agentId, roomId, limit, entry, threadRootId);
}
const retryKey = preparedTriggerKey(agentId, entry.messageId);
if (retryKey) {
const prepared = queue.preparedTriggers.get(retryKey);
if (prepared) {
queue.entries[rel] = {
sender: "",
body: "",
messageId: undefined,
discarded: true,
};
return rememberPreparedTrigger(queue, retryKey, prepared);
}
}
queue.entries[rel] = entry;
const prepared = {
history: computePendingHistory(queue, agentId, roomId, limit),
...appendToQueue(queue, entry),
history: computePendingHistory(
queue,
agentId,
roomId,
limit,
slot.slotIdx,
slot.watermarkIdx,
threadRootId,
),
snapshotIdx: slot.slotIdx + 1,
queueGeneration: queue.generation,
};
if (retryKey) {
return rememberPreparedTrigger(queue, retryKey, prepared);
@@ -241,12 +492,12 @@ function createRoomHistoryTrackerInternal(
return prepared;
},
consumeHistory(agentId, roomId, snapshot, messageId) {
const key = wmKey(agentId, roomId);
const queue = roomQueues.get(roomId);
consumeHistory(agentId, roomId, snapshot, messageId, threadRootId) {
const key = wmKey(agentId, roomId, threadRootId);
const queue = findScopedQueue(roomId, threadRootId);
if (!queue) {
// The room was evicted while this trigger was in flight. Keep eviction authoritative
// so a late completion cannot recreate a stale watermark against a fresh queue.
// The room or thread was evicted while this trigger was in flight. Keep eviction
// authoritative so a late completion cannot recreate a stale watermark.
agentWatermarks.delete(key);
return;
}
@@ -255,12 +506,20 @@ function createRoomHistoryTrackerInternal(
// snapshot so it cannot advance or erase state for the new queue generation.
return;
}
const firstReservedRel = queue.entries.findIndex(
(entry, index) => entry.reserved === true && queue.baseIndex + index < snapshot.snapshotIdx,
);
if (firstReservedRel >= 0) {
markConsumedAfterReservedGap(queue, key, firstReservedRel, snapshot.snapshotIdx);
}
const consumableSnapshotIdx =
firstReservedRel >= 0 ? queue.baseIndex + firstReservedRel : snapshot.snapshotIdx;
// Monotone write: never regress an already-advanced watermark.
// Guards against out-of-order completion when two triggers for the same
// (agentId, roomId) are in-flight concurrently.
rememberWatermark(key, snapshot.snapshotIdx);
rememberWatermark(key, consumableSnapshotIdx);
const retryKey = preparedTriggerKey(agentId, messageId);
if (queue && retryKey) {
if (retryKey) {
queue.preparedTriggers.delete(retryKey);
}
},
@@ -281,7 +540,11 @@ export function createRoomHistoryTracker(
);
return {
recordPending: tracker.recordPending,
reservePending: tracker.reservePending,
finalizePending: tracker.finalizePending,
discardPending: tracker.discardPending,
prepareTrigger: tracker.prepareTrigger,
prepareReservedTrigger: tracker.prepareReservedTrigger,
consumeHistory: tracker.consumeHistory,
};
}

View File

@@ -10,6 +10,7 @@ import {
type MatrixClient as MatrixJsClient,
type MatrixEvent,
} from "matrix-js-sdk/lib/matrix.js";
import type { Direction } from "matrix-js-sdk/lib/models/event-timeline.js";
import { VerificationMethod } from "matrix-js-sdk/lib/types.js";
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
import type { PinnedDispatcherPolicy } from "openclaw/plugin-sdk/ssrf-dispatcher";
@@ -1078,7 +1079,9 @@ export class MatrixClient {
relationType: string | null,
eventType?: string | null,
opts: {
dir?: Direction;
from?: string;
limit?: number;
} = {},
): Promise<MatrixRelationsPage> {
const result = await this.client.relations(roomId, eventId, relationType, eventType, opts);

View File

@@ -97,6 +97,59 @@ describe("event-helpers", () => {
});
});
it("preserves original thread relation when serializing edited current content", () => {
const event = {
getId: () => "$root",
getSender: () => "@alice:example.org",
getType: () => "m.room.message",
getTs: () => 1000,
getOriginalContent: () => ({
body: "original",
msgtype: "m.text",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$thread",
},
}),
getContent: () => ({
body: "@bot edited",
"m.mentions": { user_ids: ["@bot:example.org"] },
msgtype: "m.text",
}),
getUnsigned: () => ({}),
} as unknown as MatrixEvent;
expect(matrixEventToRaw(event).content["m.relates_to"]).toEqual({
rel_type: "m.thread",
event_id: "$thread",
});
});
it("preserves wire thread relation for decrypted encrypted events", () => {
const event = {
getId: () => "$encrypted",
getSender: () => "@alice:example.org",
getType: () => "m.room.message",
getTs: () => 1000,
getContent: () => ({
body: "decrypted edit",
msgtype: "m.text",
}),
getUnsigned: () => ({}),
getWireContent: () => ({
"m.relates_to": {
rel_type: "m.thread",
event_id: "$thread",
},
}),
} as unknown as MatrixEvent;
expect(matrixEventToRaw(event).content["m.relates_to"]).toEqual({
rel_type: "m.thread",
event_id: "$thread",
});
});
it("can serialize original content for inbound trigger filtering", () => {
expect(matrixEventToRaw(makeEditedMessageEvent(), { contentMode: "original" })).toEqual({
event_id: "$root",

View File

@@ -19,12 +19,13 @@ export function matrixEventToRaw(
opts.contentMode === "original"
? (eventWithOriginalContent.getOriginalContent?.() ?? event.getContent?.() ?? {})
: (event.getContent?.() ?? eventWithOriginalContent.getOriginalContent?.() ?? {});
const normalizedContent = preserveMatrixRelation(event, content || {});
const raw: MatrixRawEvent = {
event_id: event.getId() ?? "",
sender: event.getSender() ?? "",
type: event.getType() ?? "",
origin_server_ts: event.getTs() ?? 0,
content: content || {},
content: normalizedContent,
unsigned,
};
const stateKey = resolveMatrixStateKey(event);
@@ -34,6 +35,39 @@ export function matrixEventToRaw(
return raw;
}
function preserveMatrixRelation(
event: MatrixEvent,
content: Record<string, unknown>,
): Record<string, unknown> {
if (Object.hasOwn(content, "m.relates_to")) {
return content;
}
const relation = resolveMatrixRelation(event);
return relation ? { ...content, "m.relates_to": relation } : content;
}
function resolveMatrixRelation(event: MatrixEvent): unknown {
const originalContent = (
event as { getOriginalContent?: () => Record<string, unknown> | undefined }
).getOriginalContent?.();
const originalRelation = originalContent?.["m.relates_to"];
if (originalRelation) {
return originalRelation;
}
const wireContent = (
event as { getWireContent?: () => Record<string, unknown> | undefined }
).getWireContent?.();
const wireRelation = wireContent?.["m.relates_to"];
if (wireRelation) {
return wireRelation;
}
const rawContent = (event as { event?: { content?: unknown } }).event?.content;
if (rawContent && typeof rawContent === "object") {
return (rawContent as Record<string, unknown>)["m.relates_to"];
}
return undefined;
}
export function parseMxc(url: string): { server: string; mediaId: string } | null {
const match = /^mxc:\/\/([^/]+)\/(.+)$/.exec(url.trim());
if (!match) {

View File

@@ -607,6 +607,34 @@ describe("sendMessageMatrix threads", () => {
"m.relates_to"?: {
rel_type?: string;
event_id?: string;
is_falling_back?: boolean;
"m.in_reply_to"?: { event_id?: string };
};
};
expect(content["m.relates_to"]).toEqual({
rel_type: "m.thread",
event_id: "$thread",
});
expect(content["m.relates_to"]).not.toHaveProperty("is_falling_back");
expect(content["m.relates_to"]).not.toHaveProperty("m.in_reply_to");
});
it("includes thread fallback metadata only with an explicit reply target", async () => {
const { client, sendMessage } = makeClient();
await sendMessageMatrix("room:!room:example", "hello thread", {
client,
cfg: {} as never,
threadId: "$thread",
replyToId: "$reply",
});
const content = sentContent(sendMessage) as {
"m.relates_to"?: {
rel_type?: string;
event_id?: string;
is_falling_back?: boolean;
"m.in_reply_to"?: { event_id?: string };
};
};
@@ -615,7 +643,7 @@ describe("sendMessageMatrix threads", () => {
rel_type: "m.thread",
event_id: "$thread",
is_falling_back: true,
"m.in_reply_to": { event_id: "$thread" },
"m.in_reply_to": { event_id: "$reply" },
});
});
@@ -913,6 +941,52 @@ describe("editMessageMatrix mentions", () => {
expect(content[MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]).toBe(true);
expect(newContent(content)[MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]).toBe(true);
});
it("edits threaded originals with a pure replace relation", async () => {
const { client, getEvent, sendMessage } = makeClient();
getEvent.mockResolvedValue({
content: {
body: "before",
msgtype: "m.text",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$thread",
},
},
});
await editMessageMatrix("room:!room:example", "$original", "done", {
client,
cfg: {} as never,
threadId: "$thread",
});
const content = sentContent(sendMessage);
expect(content["m.relates_to"]).toEqual({
rel_type: "m.replace",
event_id: "$original",
});
expect(newContent(content)).not.toHaveProperty("m.relates_to");
});
it("rejects thread edits when the original event is not already in that thread", async () => {
const { client, getEvent, sendMessage } = makeClient();
getEvent.mockResolvedValue({
content: {
body: "before",
msgtype: "m.text",
},
});
await expect(
editMessageMatrix("room:!room:example", "$original", "done", {
client,
cfg: {} as never,
threadId: "$thread",
}),
).rejects.toThrow("cannot add or change the original event thread relation");
expect(sendMessage).not.toHaveBeenCalled();
});
});
describe("sendPollMatrix mentions", () => {

View File

@@ -127,6 +127,28 @@ function resolvePreviousEditContent(previousEvent: unknown): Record<string, unkn
: content;
}
function resolvePreviousThreadId(previousEvent: unknown): string | undefined {
if (!previousEvent || typeof previousEvent !== "object") {
return undefined;
}
const content = (previousEvent as { content?: unknown }).content;
if (!content || typeof content !== "object") {
return undefined;
}
const relation = (content as Record<string, unknown>)["m.relates_to"];
if (!relation || typeof relation !== "object") {
return undefined;
}
const relationRecord = relation as { event_id?: unknown; rel_type?: unknown };
if (
relationRecord.rel_type !== RelationType.Thread ||
typeof relationRecord.event_id !== "string"
) {
return undefined;
}
return normalizeThreadId(relationRecord.event_id) ?? undefined;
}
function hasMatrixMentionsMetadata(content: Record<string, unknown> | undefined): boolean {
return Boolean(content && Object.hasOwn(content, "m.mentions"));
}
@@ -585,6 +607,7 @@ export async function editMessageMatrix(
markdown: convertedText,
includeMentions: opts.includeMentions,
});
const previousEvent = await getPreviousMatrixEvent(client, resolvedRoom, originalEventId);
const replaceMentions =
opts.includeMentions === false
? undefined
@@ -592,9 +615,7 @@ export async function editMessageMatrix(
extractMatrixMentions(newContent),
await resolvePreviousEditMentions({
client,
content: resolvePreviousEditContent(
await getPreviousMatrixEvent(client, resolvedRoom, originalEventId),
),
content: resolvePreviousEditContent(previousEvent),
}),
);
@@ -604,9 +625,11 @@ export async function editMessageMatrix(
};
const threadId = normalizeThreadId(opts.threadId);
if (threadId) {
// Thread-aware replace: Synapse needs the thread context to keep the
// edited event visible in the thread timeline.
replaceRelation["m.in_reply_to"] = { event_id: threadId };
// Matrix applies m.new_content while preserving the original relation.
// Edits can update threaded events, but cannot add or move thread membership.
if (resolvePreviousThreadId(previousEvent) !== threadId) {
throw new Error("Matrix edit cannot add or change the original event thread relation.");
}
}
// Spread newContent into the outer event so clients that don't support

View File

@@ -144,12 +144,16 @@ export function buildReplyRelation(replyToId?: string): MatrixReplyRelation | un
export function buildThreadRelation(threadId: string, replyToId?: string): MatrixThreadRelation {
const trimmed = threadId.trim();
return {
const relation: MatrixThreadRelation = {
rel_type: RelationType.Thread,
event_id: trimmed,
is_falling_back: true,
"m.in_reply_to": { event_id: replyToId?.trim() || trimmed },
};
const fallbackReplyToId = replyToId?.trim();
if (fallbackReplyToId) {
relation.is_falling_back = true;
relation["m.in_reply_to"] = { event_id: fallbackReplyToId };
}
return relation;
}
export function resolveMatrixMsgType(contentType?: string, _fileName?: string): MatrixMediaMsgType {

View File

@@ -272,10 +272,12 @@ export async function handleMatrixAction(
});
const before = readStringParam(params, "before");
const after = readStringParam(params, "after");
const threadId = readStringParam(params, "threadId");
const result = await readMatrixMessages(roomId, {
limit: limit ?? undefined,
before: before ?? undefined,
after: after ?? undefined,
threadId: threadId ?? undefined,
...clientOpts,
});
return jsonResult({ ok: true, ...result });

View File

@@ -32,6 +32,7 @@ type MatrixQaScenarioId =
| "matrix-room-image-understanding-attachment"
| "matrix-room-generated-image-delivery"
| "matrix-media-type-coverage"
| "matrix-voice-preflight-mention"
| "matrix-attachment-only-ignored"
| "matrix-unsupported-media-safe"
| "matrix-dm-reply-shape"
@@ -497,6 +498,19 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [
title: "Matrix media attachments cover image, audio, video, PDF, and EPUB transport",
topology: MATRIX_QA_MEDIA_ROOM_TOPOLOGY,
},
{
id: "matrix-voice-preflight-mention",
configOverrides: {
audio: {
enabled: true,
},
groupMentionPatterns: ["\\S"],
},
providerMode: "live-frontier",
timeoutMs: 180_000,
title: "Matrix voice notes can trigger mention gating through transcription",
topology: MATRIX_QA_MEDIA_ROOM_TOPOLOGY,
},
{
id: "matrix-attachment-only-ignored",
timeoutMs: 8_000,
@@ -1225,6 +1239,7 @@ const MATRIX_QA_MEDIA_PROFILE_SCENARIO_IDS = [
"matrix-room-image-understanding-attachment",
"matrix-room-generated-image-delivery",
"matrix-media-type-coverage",
"matrix-voice-preflight-mention",
"matrix-attachment-only-ignored",
"matrix-unsupported-media-safe",
"matrix-e2ee-media-image",

File diff suppressed because one or more lines are too long

View File

@@ -4,10 +4,13 @@ import { MATRIX_QA_MEDIA_ROOM_KEY, resolveMatrixQaScenarioRoomId } from "./scena
import {
buildMatrixQaImageGenerationPrompt,
buildMatrixQaImageUnderstandingPrompt,
createMatrixQaVoicePreflightWav,
createMatrixQaSplitColorImagePng,
hasMatrixQaExpectedColorReply,
MATRIX_QA_IMAGE_ATTACHMENT_FILENAME,
MATRIX_QA_MEDIA_TYPE_COVERAGE_CASES,
MATRIX_QA_VOICE_PREFLIGHT_FILENAME,
MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER,
} from "./scenario-media-fixtures.js";
import {
advanceMatrixQaActorCursor,
@@ -63,6 +66,17 @@ function buildMatrixQaMediaTypeCoveragePrompt(params: {
return `${params.sutUserId} Matrix media type coverage (${params.label}): ignore the attachment content and reply with only this exact marker: ${params.token}`;
}
function normalizeMatrixQaVoiceReply(value: string | undefined) {
return (value ?? "")
.toUpperCase()
.replace(/[^A-Z0-9]+/g, " ")
.trim();
}
function hasMatrixQaVoicePreflightReply(body: string | undefined) {
return normalizeMatrixQaVoiceReply(body).includes(MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER);
}
export async function runImageUnderstandingAttachmentScenario(context: MatrixQaScenarioContext) {
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_MEDIA_ROOM_KEY);
const { client, startSince } = await primeMatrixQaDriverMediaClient(context);
@@ -217,6 +231,67 @@ export async function runMediaTypeCoverageScenario(context: MatrixQaScenarioCont
} satisfies MatrixQaScenarioExecution;
}
export async function runVoicePreflightMentionScenario(context: MatrixQaScenarioContext) {
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_MEDIA_ROOM_KEY);
const { client, startSince } = await primeMatrixQaDriverMediaClient(context);
const driverEventId = await client.sendMediaMessage({
buffer: createMatrixQaVoicePreflightWav(),
contentType: "audio/wav",
fileName: MATRIX_QA_VOICE_PREFLIGHT_FILENAME,
kind: "audio",
roomId,
});
const attachmentEvent = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.roomId === roomId &&
event.eventId === driverEventId &&
event.sender === context.driverUserId &&
event.msgtype === "m.audio" &&
event.attachment?.kind === "audio" &&
event.attachment.filename === MATRIX_QA_VOICE_PREFLIGHT_FILENAME &&
event.attachment.caption === undefined,
roomId,
since: startSince,
timeoutMs: context.timeoutMs,
});
const matched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.roomId === roomId &&
event.sender === context.sutUserId &&
event.type === "m.room.message" &&
event.relatesTo === undefined &&
isMatrixQaMessageLikeKind(event.kind) &&
hasMatrixQaVoicePreflightReply(event.body),
roomId,
since: attachmentEvent.since,
timeoutMs: context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: matched.since,
startSince,
});
const reply = buildMatrixReplyArtifact(matched.event, MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER);
return {
artifacts: {
attachmentFilename: MATRIX_QA_VOICE_PREFLIGHT_FILENAME,
driverEventId,
reply,
roomId,
expectedMarker: MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER,
},
details: [
`room id: ${roomId}`,
`driver voice event: ${driverEventId}`,
`voice filename: ${MATRIX_QA_VOICE_PREFLIGHT_FILENAME}`,
...buildMatrixReplyDetails("reply", reply),
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runAttachmentOnlyIgnoredScenario(context: MatrixQaScenarioContext) {
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_MEDIA_ROOM_KEY);
const { client, startSince } = await primeMatrixQaDriverMediaClient(context);

View File

@@ -77,6 +77,7 @@ import {
runImageUnderstandingAttachmentScenario,
runMediaTypeCoverageScenario,
runUnsupportedMediaSafeScenario,
runVoicePreflightMentionScenario,
} from "./scenario-runtime-media.js";
import {
runReactionNotAReplyScenario,
@@ -248,6 +249,8 @@ export async function runMatrixQaScenario(
return await runGeneratedImageDeliveryScenario(context);
case "matrix-media-type-coverage":
return await runMediaTypeCoverageScenario(context);
case "matrix-voice-preflight-mention":
return await runVoicePreflightMentionScenario(context);
case "matrix-attachment-only-ignored":
return await runAttachmentOnlyIgnoredScenario(context);
case "matrix-unsupported-media-safe":

View File

@@ -55,6 +55,7 @@ export type MatrixQaScenarioArtifacts = {
editEventId?: string;
editedToken?: string;
expectedNoReplyWindowMs?: number;
expectedMarker?: string;
firstDriverEventId?: string;
firstReply?: MatrixQaReplyArtifact;
firstToken?: string;

View File

@@ -50,7 +50,11 @@ import {
findMissingLiveTransportStandardScenarios,
} from "../../shared/live-transport-scenarios.js";
import type { MatrixQaObservedEvent } from "../../substrate/events.js";
import { MATRIX_QA_MEDIA_TYPE_COVERAGE_CASES } from "./scenario-media-fixtures.js";
import {
MATRIX_QA_MEDIA_TYPE_COVERAGE_CASES,
MATRIX_QA_VOICE_PREFLIGHT_FILENAME,
MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER,
} from "./scenario-media-fixtures.js";
import {
testing as scenarioTesting,
MATRIX_QA_SCENARIOS,
@@ -383,6 +387,7 @@ describe("matrix live qa scenarios", () => {
"matrix-room-image-understanding-attachment",
"matrix-room-generated-image-delivery",
"matrix-media-type-coverage",
"matrix-voice-preflight-mention",
"matrix-attachment-only-ignored",
"matrix-unsupported-media-safe",
"matrix-dm-reply-shape",
@@ -4568,6 +4573,129 @@ describe("matrix live qa scenarios", () => {
).toBe(true);
});
it("sends voice preflight audio without a text mention and waits for the transcribed reply", async () => {
const primeRoom = vi.fn().mockResolvedValue("driver-sync-start");
const sendMediaMessage = vi.fn().mockResolvedValue("$voice-preflight");
const waitForRoomEvent = vi.fn().mockImplementation(async () => {
const callIndex = waitForRoomEvent.mock.calls.length - 1;
if (callIndex === 0) {
return {
event: {
kind: "message",
roomId: "!media:matrix-qa.test",
eventId: "$voice-preflight",
sender: "@driver:matrix-qa.test",
type: "m.room.message",
msgtype: "m.audio",
attachment: {
kind: "audio",
filename: MATRIX_QA_VOICE_PREFLIGHT_FILENAME,
},
},
since: "driver-sync-attachment",
};
}
return {
event: {
kind: "message",
roomId: "!media:matrix-qa.test",
eventId: "$voice-reply",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
body: `Sure: ${MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER}.`,
},
since: "driver-sync-reply",
};
});
createMatrixQaClient.mockReturnValue({
primeRoom,
sendMediaMessage,
waitForRoomEvent,
});
const scenario = requireMatrixQaScenario("matrix-voice-preflight-mention");
expect(scenario.configOverrides?.audio?.enabled).toBe(true);
expect(scenario.configOverrides?.groupMentionPatterns).toEqual(["\\S"]);
const result = await runMatrixQaScenario(scenario, {
baseUrl: "http://127.0.0.1:28008/",
canary: undefined,
driverAccessToken: "driver-token",
driverUserId: "@driver:matrix-qa.test",
observedEvents: [],
observerAccessToken: "observer-token",
observerUserId: "@observer:matrix-qa.test",
roomId: "!main:matrix-qa.test",
restartGateway: undefined,
syncState: {},
sutAccessToken: "sut-token",
sutUserId: "@sut:matrix-qa.test",
timeoutMs: 8_000,
topology: {
defaultRoomId: "!main:matrix-qa.test",
defaultRoomKey: "main",
rooms: [
{
key: scenarioTesting.MATRIX_QA_MEDIA_ROOM_KEY,
kind: "group",
memberRoles: ["driver", "observer", "sut"],
memberUserIds: [
"@driver:matrix-qa.test",
"@observer:matrix-qa.test",
"@sut:matrix-qa.test",
],
name: "Media",
requireMention: true,
roomId: "!media:matrix-qa.test",
},
],
},
});
const mediaMessage = mockObjectArg(sendMediaMessage, "sendMediaMessage") as {
body?: unknown;
buffer?: Buffer;
contentType?: unknown;
fileName?: unknown;
kind?: unknown;
mentionUserIds?: unknown;
roomId?: unknown;
};
expect(mediaMessage.body).toBeUndefined();
expect(mediaMessage.buffer?.byteLength).toBeGreaterThan(1_000);
expect(mediaMessage.contentType).toBe("audio/wav");
expect(mediaMessage.fileName).toBe(MATRIX_QA_VOICE_PREFLIGHT_FILENAME);
expect(mediaMessage.kind).toBe("audio");
expect(mediaMessage.mentionUserIds).toBeUndefined();
expect(mediaMessage.roomId).toBe("!media:matrix-qa.test");
const replyWait = mockObjectArg(waitForRoomEvent, "waitForRoomEvent", 1) as {
predicate: (event: MatrixQaObservedEvent) => boolean;
};
expect(
replyWait.predicate({
kind: "message",
roomId: "!media:matrix-qa.test",
eventId: "$voice-reply",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
body: ` ${MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER.toLowerCase()}!\n`,
}),
).toBe(true);
const artifacts = result.artifacts as {
attachmentFilename?: unknown;
driverEventId?: unknown;
expectedMarker?: unknown;
reply?: { eventId?: unknown };
};
expect(artifacts.attachmentFilename).toBe(MATRIX_QA_VOICE_PREFLIGHT_FILENAME);
expect(artifacts.driverEventId).toBe("$voice-preflight");
expect(artifacts.expectedMarker).toBe(MATRIX_QA_VOICE_PREFLIGHT_REPLY_MARKER);
expect(artifacts.reply?.eventId).toBe("$voice-reply");
});
it("uses DM thread override scenarios against the provisioned DM room", async () => {
const primeRoom = vi.fn().mockResolvedValue("driver-sync-start");
const sendTextMessage = vi.fn().mockResolvedValue("$dm-thread-trigger");

View File

@@ -110,6 +110,7 @@ describe("matrix qa config", () => {
allowBots: "mentions",
configuredBotRoles: ["observer"],
groupAllowFrom: ["@driver:matrix-qa.test", "@observer:matrix-qa.test"],
groupMentionPatterns: ["\\S"],
groupsByKey: {
secondary: {
allowBots: false,
@@ -127,6 +128,10 @@ describe("matrix qa config", () => {
spawnSessions: true,
},
threadReplies: "always",
audio: {
echoTranscript: false,
enabled: true,
},
toolProfile: "coding",
},
observerAccessToken: "observer-token",
@@ -147,6 +152,11 @@ describe("matrix qa config", () => {
minChars: 1,
});
expect(next.tools?.profile).toBe("coding");
expect(next.tools?.media?.audio).toEqual({
echoTranscript: false,
enabled: true,
});
expect(next.messages?.groupChat?.mentionPatterns).toEqual(["\\S"]);
const observer = next.channels?.matrix?.accounts?.["qa-observer-bot-source"];
expect(observer?.accessToken).toBe("observer-token");
expect(observer?.enabled).toBe(false);
@@ -227,6 +237,7 @@ describe("matrix qa config", () => {
dm: {
sessionScope: "per-room",
},
groupMentionPatterns: ["\\S"],
groupPolicy: "open",
streaming: true,
},
@@ -255,6 +266,7 @@ describe("matrix qa config", () => {
execApprovals: undefined,
configuredBotRoles: [],
groupAllowFrom: ["@driver:matrix-qa.test"],
groupMentionPatterns: ["\\S"],
groupPolicy: "open",
groupsByKey: {
main: {
@@ -277,6 +289,7 @@ describe("matrix qa config", () => {
});
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain("allowBots=<default>");
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain("configuredBotRoles=<none>");
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain("groupMentionPatterns=\\S");
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain("autoJoin=allowlist");
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain("streaming=partial");
expect(summarizeMatrixQaConfigSnapshot(snapshot)).toContain(

View File

@@ -40,6 +40,10 @@ type MatrixQaToolConfigOverrides = {
deny?: string[];
};
type MatrixQaAudioConfigOverrides = NonNullable<
NonNullable<NonNullable<OpenClawConfig["tools"]>["media"]>["audio"]
>;
type MatrixQaGroupConfigOverrides = {
allowBots?: MatrixQaAllowBotsMode;
enabled?: boolean;
@@ -91,6 +95,7 @@ export type MatrixQaConfigOverrides = {
execApprovals?: MatrixQaExecApprovalsConfigOverrides;
groupAllowFrom?: string[];
groupAllowRoles?: MatrixQaActorRole[];
groupMentionPatterns?: string[];
groupPolicy?: MatrixQaGroupPolicy;
configuredBotRoles?: MatrixQaActorRole[];
groupsByKey?: Record<string, MatrixQaGroupConfigOverrides>;
@@ -100,6 +105,7 @@ export type MatrixQaConfigOverrides = {
textChunkLimit?: number;
threadBindings?: MatrixQaThreadBindingsConfigOverrides;
threadReplies?: MatrixQaThreadRepliesMode;
audio?: MatrixQaAudioConfigOverrides;
toolProfile?: "coding" | "messaging" | "minimal";
};
@@ -124,6 +130,7 @@ export type MatrixQaConfigSnapshot = {
execApprovals?: MatrixQaExecApprovalsConfigOverrides;
configuredBotRoles: MatrixQaActorRole[];
groupAllowFrom: string[];
groupMentionPatterns: string[];
groupPolicy: MatrixQaGroupPolicy;
groupsByKey: Record<string, MatrixQaGroupSnapshot>;
replyToMode: MatrixQaReplyToMode;
@@ -507,6 +514,7 @@ export function buildMatrixQaConfigSnapshot(params: {
execApprovals: params.overrides?.execApprovals,
configuredBotRoles: [...(params.overrides?.configuredBotRoles ?? [])],
groupAllowFrom: resolveMatrixQaGroupAllowFrom(params),
groupMentionPatterns: normalizeMatrixQaAllowlist(params.overrides?.groupMentionPatterns),
groupPolicy: params.overrides?.groupPolicy ?? "allowlist",
groupsByKey: resolveMatrixQaGroupSnapshots({
overrides: params.overrides,
@@ -539,6 +547,7 @@ export function summarizeMatrixQaConfigSnapshot(snapshot: MatrixQaConfigSnapshot
`dm.policy=${snapshot.dm.policy}`,
`dm.sessionScope=${snapshot.dm.sessionScope}`,
`dm.threadReplies=${snapshot.dm.threadReplies}`,
`groupMentionPatterns=${snapshot.groupMentionPatterns.length > 0 ? snapshot.groupMentionPatterns.join("|") : "<default>"}`,
`streaming=${snapshot.streaming}`,
`streaming.preview.toolProgress=${formatMatrixQaBoolean(snapshot.streamingPreviewToolProgress)}`,
`textChunkLimit=${snapshot.textChunkLimit ?? "<default>"}`,
@@ -616,15 +625,35 @@ export function buildMatrixQaConfig(
}
: {};
const toolsConfig =
params.overrides?.toolProfile || params.overrides?.audio
? {
...baseCfg.tools,
...(params.overrides?.toolProfile
? {
profile: params.overrides.toolProfile,
}
: {}),
...(params.overrides?.audio
? {
media: {
...baseCfg.tools?.media,
audio: {
...baseCfg.tools?.media?.audio,
...params.overrides.audio,
},
},
}
: {}),
}
: undefined;
return {
...baseCfg,
...approvalForwardingConfig,
...(params.overrides?.toolProfile
...(toolsConfig
? {
tools: {
...baseCfg.tools,
profile: params.overrides.toolProfile,
},
tools: toolsConfig,
}
: {}),
...(params.overrides?.agentDefaults
@@ -650,6 +679,9 @@ export function buildMatrixQaConfig(
...baseCfg.messages,
groupChat: {
...baseCfg.messages?.groupChat,
...(snapshot.groupMentionPatterns.length > 0
? { mentionPatterns: snapshot.groupMentionPatterns }
: {}),
visibleReplies: "automatic",
},
},

View File

@@ -256,5 +256,9 @@ export function promptMigrationSkillSelectionValues(
return prompt.prompt();
}
/** Compatibility alias for plugin selection prompts that share the same picker. */
/**
* Compatibility alias for plugin selection prompts that share the same picker.
*
* @deprecated Use promptMigrationSkillSelectionValues.
*/
export const promptMigrationSelectionValues = promptMigrationSkillSelectionValues;