mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix: repair anchorless iMessage watch payloads
Repair explicit anchorless iMessage watch payloads by GUID before debounce/routing, and drop unrecoverable payloads fail-closed instead of routing them as sender DMs. Closes #84470. Refs #84503. Thanks @zhangguiping-xydt and @zqchris.
This commit is contained in:
@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Installer: avoid the incompatible generated `--before` install filter when raw npm `min-release-age` config is present. (#85491) Thanks @TurboTheTurtle.
|
||||
- Agents/MCP: bound bundled MCP `tools/list` catalog discovery so hung MCP servers do not block session tool materialization. (#85063) Thanks @nxmxbbd.
|
||||
- Scripts: run generated-module formatting through the shared pnpm launcher and Windows CI coverage so native Windows generator checks avoid shell-mode package-manager shims.
|
||||
- Channels/iMessage: recover malformed anchorless group watch payloads by GUID before debounce/routing, and drop unrecoverable payloads instead of replying to the sender DM. Fixes #84470. Refs #84503. Thanks @zhangguiping-xydt and @zqchris.
|
||||
- Channels/iMessage: advance the startup catchup cursor from live-handled rows after a completed catchup pass, including rows received while catchup is still running, so restarts do not replay them. (#85475) Thanks @TurboTheTurtle.
|
||||
- Tests: mount the shared Windows command helper into bare Docker E2E harness containers so published upgrade-survivor config walks can start on Linux.
|
||||
- Tests: keep the plugin binding command escape Docker smoke focused on its intended Vitest cases and skip source-only install lifecycle scripts.
|
||||
|
||||
@@ -324,6 +324,101 @@ describe("iMessage monitor last-route updates", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("repairs anchorless group watch payloads before routing or cursor updates", async () => {
|
||||
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-anchor-repair-"));
|
||||
tempDirs.push(stateDir);
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
|
||||
|
||||
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
|
||||
const client = {
|
||||
request: vi.fn(async (method: string, params?: Record<string, unknown>) => {
|
||||
if (method === "watch.subscribe") {
|
||||
return { subscription: 1 };
|
||||
}
|
||||
if (method === "chats.list") {
|
||||
return { chats: [{ id: 349 }] };
|
||||
}
|
||||
if (method === "messages.history") {
|
||||
expect(params?.chat_id).toBe(349);
|
||||
return {
|
||||
messages: [
|
||||
{
|
||||
id: 9500,
|
||||
guid: "ANCHORLESS-GROUP-GUID",
|
||||
chat_id: 349,
|
||||
chat_guid: "iMessage;+;chat349",
|
||||
chat_identifier: "chat349",
|
||||
chat_name: "Project group",
|
||||
participants: ["+15550001111", "+15550002222"],
|
||||
is_group: true,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
throw new Error(`unexpected imsg method ${method}`);
|
||||
}),
|
||||
waitForClose: vi.fn(async () => {
|
||||
onNotification?.({
|
||||
method: "message",
|
||||
params: {
|
||||
message: {
|
||||
id: 9500,
|
||||
guid: "ANCHORLESS-GROUP-GUID",
|
||||
chat_id: 0,
|
||||
sender: "+15550001111",
|
||||
is_from_me: false,
|
||||
text: "@openclaw check this https://example.com",
|
||||
is_group: false,
|
||||
chat_guid: "",
|
||||
chat_identifier: "",
|
||||
chat_name: "",
|
||||
participants: null,
|
||||
created_at: "2026-05-22T15:30:00.000Z",
|
||||
},
|
||||
},
|
||||
});
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
}),
|
||||
stop: vi.fn(async () => {}),
|
||||
};
|
||||
createIMessageRpcClientMock.mockImplementation(async (params) => {
|
||||
if (!params?.onNotification) {
|
||||
throw new Error("expected iMessage notification handler");
|
||||
}
|
||||
onNotification = params.onNotification;
|
||||
return client as never;
|
||||
});
|
||||
|
||||
await monitorIMessageProvider({
|
||||
config: {
|
||||
channels: {
|
||||
imessage: {
|
||||
catchup: { enabled: true },
|
||||
groupPolicy: "open",
|
||||
groups: { "*": { requireMention: true } },
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
groupChat: { mentionPatterns: ["@openclaw"] },
|
||||
inbound: { debounceMs: 0 },
|
||||
},
|
||||
session: { mainKey: "main" },
|
||||
} as never,
|
||||
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
const dispatchParams = dispatchInboundMessageMock.mock.calls.at(0)?.[0];
|
||||
expect(dispatchParams?.ctx.To).toBe("chat_id:349");
|
||||
expect(dispatchParams?.ctx.From).toBe("imessage:group:349");
|
||||
expect(dispatchParams?.ctx.ChatType).toBe("group");
|
||||
expect(dispatchParams?.ctx.SessionKey).toBe("agent:main:imessage:group:349");
|
||||
expect(dispatchParams?.ctx.To).not.toBe("imessage:+15550001111");
|
||||
});
|
||||
|
||||
it("does not advance the live cursor after partial startup catchup", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-05-22T15:31:00.000Z"));
|
||||
|
||||
155
extensions/imessage/src/monitor/conversation-repair.test.ts
Normal file
155
extensions/imessage/src/monitor/conversation-repair.test.ts
Normal file
@@ -0,0 +1,155 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { isIMessageAnchorless, repairIMessageConversationAnchor } from "./conversation-repair.js";
|
||||
import type { IMessagePayload } from "./types.js";
|
||||
|
||||
function anchorlessMessage(overrides: Partial<IMessagePayload> = {}): IMessagePayload {
|
||||
return {
|
||||
id: 9500,
|
||||
guid: "ANCHORLESS-GUID-1",
|
||||
chat_id: 0,
|
||||
sender: "+15550001111",
|
||||
is_from_me: false,
|
||||
text: "https://example.com",
|
||||
chat_guid: "",
|
||||
chat_identifier: "",
|
||||
chat_name: "",
|
||||
participants: null,
|
||||
is_group: false,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function mockClient(chats: Array<{ id: number; messages: Record<string, unknown>[] }>) {
|
||||
const request = vi.fn(async (method: string, params?: Record<string, unknown>) => {
|
||||
if (method === "chats.list") {
|
||||
return { chats: chats.map((chat) => ({ id: chat.id })) };
|
||||
}
|
||||
if (method === "messages.history") {
|
||||
return {
|
||||
messages: chats.find((chat) => chat.id === params?.chat_id)?.messages ?? [],
|
||||
};
|
||||
}
|
||||
throw new Error(`unexpected method ${method}`);
|
||||
});
|
||||
return { request };
|
||||
}
|
||||
|
||||
describe("isIMessageAnchorless", () => {
|
||||
it("detects explicit broken conversation anchors", () => {
|
||||
expect(isIMessageAnchorless(anchorlessMessage())).toBe(true);
|
||||
expect(isIMessageAnchorless(anchorlessMessage({ chat_guid: undefined }))).toBe(true);
|
||||
expect(isIMessageAnchorless(anchorlessMessage({ chat_identifier: undefined }))).toBe(true);
|
||||
expect(
|
||||
isIMessageAnchorless(
|
||||
anchorlessMessage({ chat_id: undefined, chat_guid: "", chat_identifier: "" }),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("does not classify sender-only direct messages as anchorless", () => {
|
||||
expect(
|
||||
isIMessageAnchorless({
|
||||
guid: "DM-GUID",
|
||||
sender: "+15550001111",
|
||||
is_from_me: false,
|
||||
text: "hello",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("does not classify messages with any usable conversation anchor", () => {
|
||||
expect(isIMessageAnchorless(anchorlessMessage({ chat_id: 349 }))).toBe(false);
|
||||
expect(isIMessageAnchorless(anchorlessMessage({ chat_guid: "iMessage;+;chat349" }))).toBe(
|
||||
false,
|
||||
);
|
||||
expect(isIMessageAnchorless(anchorlessMessage({ chat_identifier: "chat349" }))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("repairIMessageConversationAnchor", () => {
|
||||
it("passes through non-anchorless messages without recovery RPCs", async () => {
|
||||
const message = anchorlessMessage({ chat_id: 349, is_group: true });
|
||||
const client = mockClient([]);
|
||||
|
||||
await expect(
|
||||
repairIMessageConversationAnchor({ client: client as never, message }),
|
||||
).resolves.toBe(message);
|
||||
expect(client.request).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("recovers the conversation from recent history by GUID", async () => {
|
||||
const message = anchorlessMessage();
|
||||
const client = mockClient([
|
||||
{ id: 100, messages: [{ guid: "OTHER-GUID", chat_id: 100, is_group: true }] },
|
||||
{
|
||||
id: 349,
|
||||
messages: [
|
||||
{
|
||||
guid: "ANCHORLESS-GUID-1",
|
||||
chat_id: 349,
|
||||
chat_guid: "iMessage;+;chat349",
|
||||
chat_identifier: "chat349",
|
||||
chat_name: "Project group",
|
||||
participants: ["+15550001111", "+15550002222"],
|
||||
is_group: true,
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
|
||||
const repaired = await repairIMessageConversationAnchor({
|
||||
client: client as never,
|
||||
message,
|
||||
});
|
||||
|
||||
expect(repaired).toMatchObject({
|
||||
chat_id: 349,
|
||||
chat_guid: "iMessage;+;chat349",
|
||||
chat_identifier: "chat349",
|
||||
chat_name: "Project group",
|
||||
participants: ["+15550001111", "+15550002222"],
|
||||
is_group: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("drops fail-closed when the GUID cannot be matched", async () => {
|
||||
const runtime = { error: vi.fn() };
|
||||
const client = mockClient([{ id: 349, messages: [{ guid: "OTHER-GUID", chat_id: 349 }] }]);
|
||||
|
||||
await expect(
|
||||
repairIMessageConversationAnchor({
|
||||
client: client as never,
|
||||
message: anchorlessMessage(),
|
||||
runtime,
|
||||
}),
|
||||
).resolves.toBeNull();
|
||||
expect(runtime.error.mock.calls.at(-1)?.[0]).toContain("no recent chat matched");
|
||||
});
|
||||
|
||||
it("drops fail-closed when history finds the GUID but no usable anchor", async () => {
|
||||
const runtime = { error: vi.fn() };
|
||||
const client = mockClient([
|
||||
{
|
||||
id: 349,
|
||||
messages: [
|
||||
{
|
||||
guid: "ANCHORLESS-GUID-1",
|
||||
chat_id: 0,
|
||||
chat_guid: "",
|
||||
chat_identifier: "",
|
||||
is_group: false,
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(
|
||||
repairIMessageConversationAnchor({
|
||||
client: client as never,
|
||||
message: anchorlessMessage(),
|
||||
runtime,
|
||||
}),
|
||||
).resolves.toBeNull();
|
||||
expect(runtime.error.mock.calls.at(-1)?.[0]).toContain("no usable conversation anchor");
|
||||
});
|
||||
});
|
||||
167
extensions/imessage/src/monitor/conversation-repair.ts
Normal file
167
extensions/imessage/src/monitor/conversation-repair.ts
Normal file
@@ -0,0 +1,167 @@
|
||||
import type { IMessageRpcClient } from "../client.js";
|
||||
import type { IMessagePayload } from "./types.js";
|
||||
|
||||
const DEFAULT_CHATS_LIMIT = 20;
|
||||
const DEFAULT_PER_CHAT_HISTORY_LIMIT = 50;
|
||||
const DEFAULT_RPC_TIMEOUT_MS = 5_000;
|
||||
|
||||
type RuntimeLogger = {
|
||||
error?: (message: string) => void;
|
||||
log?: (message: string) => void;
|
||||
};
|
||||
|
||||
type ChatsListEntry = {
|
||||
id?: number | null;
|
||||
};
|
||||
|
||||
type MessagesHistoryResult = {
|
||||
messages?: unknown[];
|
||||
};
|
||||
|
||||
export type RepairIMessageConversationAnchorParams = {
|
||||
client: IMessageRpcClient;
|
||||
message: IMessagePayload;
|
||||
runtime?: RuntimeLogger;
|
||||
chatsLimit?: number;
|
||||
perChatHistoryLimit?: number;
|
||||
rpcTimeoutMs?: number;
|
||||
};
|
||||
|
||||
function isNonEmptyString(value: unknown): value is string {
|
||||
return typeof value === "string" && value.trim() !== "";
|
||||
}
|
||||
|
||||
function hasPositiveChatId(value: unknown): value is number {
|
||||
return typeof value === "number" && Number.isFinite(value) && value > 0;
|
||||
}
|
||||
|
||||
function isExplicitEmptyString(value: unknown): boolean {
|
||||
return typeof value === "string" && value.trim() === "";
|
||||
}
|
||||
|
||||
export function isIMessageAnchorless(message: IMessagePayload): boolean {
|
||||
const hasUsableAnchor =
|
||||
hasPositiveChatId(message.chat_id) ||
|
||||
isNonEmptyString(message.chat_guid) ||
|
||||
isNonEmptyString(message.chat_identifier);
|
||||
if (hasUsableAnchor) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const hasExplicitBrokenAnchor =
|
||||
message.chat_id === null ||
|
||||
(typeof message.chat_id === "number" &&
|
||||
(!Number.isFinite(message.chat_id) || message.chat_id <= 0)) ||
|
||||
isExplicitEmptyString(message.chat_guid) ||
|
||||
isExplicitEmptyString(message.chat_identifier);
|
||||
|
||||
return hasExplicitBrokenAnchor;
|
||||
}
|
||||
|
||||
function overlayRecoveredConversation(
|
||||
message: IMessagePayload,
|
||||
entry: Record<string, unknown>,
|
||||
): IMessagePayload {
|
||||
const repaired = { ...message };
|
||||
|
||||
if (hasPositiveChatId(entry.chat_id)) {
|
||||
repaired.chat_id = entry.chat_id;
|
||||
}
|
||||
if (isNonEmptyString(entry.chat_guid)) {
|
||||
repaired.chat_guid = entry.chat_guid;
|
||||
}
|
||||
if (isNonEmptyString(entry.chat_identifier)) {
|
||||
repaired.chat_identifier = entry.chat_identifier;
|
||||
}
|
||||
if (typeof entry.is_group === "boolean") {
|
||||
repaired.is_group = entry.is_group;
|
||||
}
|
||||
if (typeof entry.chat_name === "string") {
|
||||
repaired.chat_name = entry.chat_name;
|
||||
}
|
||||
if (
|
||||
Array.isArray(entry.participants) &&
|
||||
entry.participants.every((participant) => typeof participant === "string")
|
||||
) {
|
||||
repaired.participants = entry.participants;
|
||||
}
|
||||
|
||||
return repaired;
|
||||
}
|
||||
|
||||
export async function repairIMessageConversationAnchor(
|
||||
params: RepairIMessageConversationAnchorParams,
|
||||
): Promise<IMessagePayload | null> {
|
||||
const { client, message, runtime } = params;
|
||||
|
||||
if (!isIMessageAnchorless(message)) {
|
||||
return message;
|
||||
}
|
||||
|
||||
const guid = message.guid?.trim();
|
||||
if (!guid) {
|
||||
runtime?.error?.("imessage: dropping anchorless message without GUID");
|
||||
return null;
|
||||
}
|
||||
|
||||
let chatsResult: { chats?: ChatsListEntry[] } | undefined;
|
||||
try {
|
||||
chatsResult = await client.request<{ chats?: ChatsListEntry[] }>(
|
||||
"chats.list",
|
||||
{ limit: params.chatsLimit ?? DEFAULT_CHATS_LIMIT },
|
||||
{ timeoutMs: params.rpcTimeoutMs ?? DEFAULT_RPC_TIMEOUT_MS },
|
||||
);
|
||||
} catch (err) {
|
||||
runtime?.error?.(`imessage: anchorless message recovery failed listing chats: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const chats = chatsResult?.chats ?? [];
|
||||
for (const chat of chats) {
|
||||
const chatId = hasPositiveChatId(chat.id) ? chat.id : null;
|
||||
if (chatId === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let historyResult: MessagesHistoryResult | undefined;
|
||||
try {
|
||||
historyResult = await client.request<MessagesHistoryResult>(
|
||||
"messages.history",
|
||||
{
|
||||
attachments: false,
|
||||
chat_id: chatId,
|
||||
limit: params.perChatHistoryLimit ?? DEFAULT_PER_CHAT_HISTORY_LIMIT,
|
||||
},
|
||||
{ timeoutMs: params.rpcTimeoutMs ?? DEFAULT_RPC_TIMEOUT_MS },
|
||||
);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
const messages = Array.isArray(historyResult?.messages) ? historyResult.messages : [];
|
||||
for (const raw of messages) {
|
||||
if (!raw || typeof raw !== "object" || Array.isArray(raw)) {
|
||||
continue;
|
||||
}
|
||||
const entry = raw as Record<string, unknown>;
|
||||
if (entry.guid !== guid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const repaired = overlayRecoveredConversation(message, entry);
|
||||
if (isIMessageAnchorless(repaired)) {
|
||||
runtime?.error?.(
|
||||
`imessage: dropping anchorless message GUID=${guid} after recovery found no usable conversation anchor`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
runtime?.log?.(
|
||||
`imessage: recovered anchorless message GUID=${guid} chat_id=${repaired.chat_id ?? "unknown"} is_group=${repaired.is_group === true}`,
|
||||
);
|
||||
return repaired;
|
||||
}
|
||||
}
|
||||
|
||||
runtime?.error?.(`imessage: dropping anchorless message GUID=${guid}; no recent chat matched`);
|
||||
return null;
|
||||
}
|
||||
@@ -57,6 +57,7 @@ import { attachIMessageMonitorAbortHandler } from "./abort-handler.js";
|
||||
import { runIMessageCatchup } from "./catchup-bridge.js";
|
||||
import { advanceIMessageCatchupCursor, resolveCatchupConfig } from "./catchup.js";
|
||||
import { combineIMessagePayloads } from "./coalesce.js";
|
||||
import { repairIMessageConversationAnchor } from "./conversation-repair.js";
|
||||
import { createIMessageEchoCachingSend, deliverReplies } from "./deliver.js";
|
||||
import { createSentMessageCache } from "./echo-cache.js";
|
||||
import {
|
||||
@@ -357,6 +358,16 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
return client;
|
||||
};
|
||||
|
||||
async function repairMessageConversationAnchor(
|
||||
message: IMessagePayload,
|
||||
): Promise<IMessagePayload | null> {
|
||||
return await repairIMessageConversationAnchor({
|
||||
client: getActiveClient(),
|
||||
message,
|
||||
runtime,
|
||||
});
|
||||
}
|
||||
|
||||
function resolveLiveCatchupCursor(
|
||||
message: IMessagePayload,
|
||||
): { lastSeenMs: number; lastSeenRowid: number } | null {
|
||||
@@ -425,7 +436,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
}
|
||||
}
|
||||
|
||||
async function handleMessageNowInner(message: IMessagePayload) {
|
||||
async function handleMessageNowInner(rawMessage: IMessagePayload) {
|
||||
const message = await repairMessageConversationAnchor(rawMessage);
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
const messageText = (message.text ?? "").trim();
|
||||
|
||||
const attachments = includeAttachments ? (message.attachments ?? []) : [];
|
||||
@@ -872,7 +888,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
runtime.error?.(`imessage: dropping malformed RPC message payload (keys=${shape})`);
|
||||
return;
|
||||
}
|
||||
await inboundDebouncer.enqueue({ message });
|
||||
const repairedMessage = await repairMessageConversationAnchor(message);
|
||||
if (!repairedMessage) {
|
||||
return;
|
||||
}
|
||||
await inboundDebouncer.enqueue({ message: repairedMessage });
|
||||
};
|
||||
|
||||
await waitForTransportReady({
|
||||
|
||||
Reference in New Issue
Block a user