mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
Fix Telegram stop debounce bypass (#83248)
Summary: - The PR adds a generic inbound debounce `cancelKey`, uses Telegram stop-like controls to cancel same-chat pen ... buffers and bypass debounce, and adds focused Telegram regression coverage plus updated channel test mocks. - Reproducibility: yes. by source inspection: current main enqueues Telegram text through inbound debounce bef ... nly has flush semantics for pending keyed work. I did not run a live Telegram repro in this read-only pass. Automerge notes: - PR branch already contained follow-up commit before automerge: Fix Telegram stop debounce bypass Validation: - ClawSweeper review passed for head19245a341d. - Required merge gates passed before the squash merge. Prepared head SHA:19245a341dReview: https://github.com/openclaw/openclaw/pull/83248#issuecomment-4472300906 Co-authored-by: VACInc <3279061+VACInc@users.noreply.github.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -710,6 +710,7 @@ describe("Feishu inbound debounce regressions", () => {
|
||||
params.onError?.(new Error("dispatch failed"), [item]);
|
||||
},
|
||||
flushKey: async () => {},
|
||||
cancelKey: () => false,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -34,6 +34,7 @@ export function createFeishuRuntimeMockModule(): {
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: async () => {},
|
||||
flushKey: async () => {},
|
||||
cancelKey: () => false,
|
||||
}),
|
||||
},
|
||||
text: {
|
||||
|
||||
@@ -88,6 +88,7 @@ function createImmediateInboundDebounce() {
|
||||
}
|
||||
},
|
||||
flushKey: async () => {},
|
||||
cancelKey: () => false,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -47,6 +47,8 @@ function createRuntimeStub(readAllowFromStore: ReturnType<typeof vi.fn>): Plugin
|
||||
resolveInboundDebounceMs: () => 0,
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: async () => {},
|
||||
flushKey: async () => {},
|
||||
cancelKey: () => false,
|
||||
}),
|
||||
},
|
||||
pairing: {
|
||||
|
||||
@@ -39,6 +39,8 @@ function createRuntimeStub(stateDir?: string): PluginRuntime {
|
||||
resolveInboundDebounceMs: () => 0,
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: async () => {},
|
||||
flushKey: async () => {},
|
||||
cancelKey: () => false,
|
||||
}),
|
||||
},
|
||||
},
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
} from "openclaw/plugin-sdk/channel-inbound-debounce";
|
||||
import { resolveStoredModelOverride } from "openclaw/plugin-sdk/command-auth-native";
|
||||
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
|
||||
import { isAbortRequestText } from "openclaw/plugin-sdk/command-primitives-runtime";
|
||||
import { buildCommandsMessagePaginated } from "openclaw/plugin-sdk/command-status";
|
||||
import type { DmPolicy, OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
|
||||
import type {
|
||||
@@ -1506,6 +1507,7 @@ export const registerTelegramHandlers = ({
|
||||
isForum: boolean;
|
||||
resolvedThreadId?: number;
|
||||
dmThreadId?: number;
|
||||
dmPolicy: DmPolicy;
|
||||
storeAllowFrom: string[];
|
||||
senderId: string;
|
||||
effectiveGroupAllow: NormalizedAllowFrom;
|
||||
@@ -1524,6 +1526,7 @@ export const registerTelegramHandlers = ({
|
||||
isForum,
|
||||
resolvedThreadId,
|
||||
dmThreadId,
|
||||
dmPolicy,
|
||||
storeAllowFrom,
|
||||
senderId,
|
||||
effectiveGroupAllow,
|
||||
@@ -1535,11 +1538,39 @@ export const registerTelegramHandlers = ({
|
||||
promptContextMinTimestampMs,
|
||||
} = params;
|
||||
|
||||
const messageText = getTelegramTextParts(msg).text;
|
||||
const botUsername = ctx.me?.username;
|
||||
const isAbortControlMessage = isAbortRequestText(messageText, { botUsername });
|
||||
let abortControlAuthorized: Promise<boolean> | undefined;
|
||||
const isAuthorizedAbortControlMessage = () => {
|
||||
if (!isAbortControlMessage || !senderId) {
|
||||
return Promise.resolve(false);
|
||||
}
|
||||
abortControlAuthorized ??= resolveTelegramCommandIngressAuthorization({
|
||||
accountId,
|
||||
cfg,
|
||||
dmPolicy,
|
||||
isGroup,
|
||||
chatId,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
effectiveDmAllow,
|
||||
effectiveGroupAllow,
|
||||
ownerAccess: { ownerList: [], senderIsOwner: false },
|
||||
eventKind: "message",
|
||||
allowTextCommands: true,
|
||||
hasControlCommand: true,
|
||||
modeWhenAccessGroupsOff: "allow",
|
||||
includeDmAllowForGroupCommands: false,
|
||||
}).then((gate) => gate.authorized);
|
||||
return abortControlAuthorized;
|
||||
};
|
||||
|
||||
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
|
||||
// We buffer “near-limit” messages and append immediately-following parts.
|
||||
const text = typeof msg.text === "string" ? msg.text : undefined;
|
||||
const isCommandLike = (text ?? "").trim().startsWith("/");
|
||||
if (text && !isCommandLike) {
|
||||
if (text && !isCommandLike && !isAbortControlMessage) {
|
||||
const nowMs = Date.now();
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
|
||||
// Use resolvedThreadId for forum groups, dmThreadId for DM topics
|
||||
@@ -1602,6 +1633,15 @@ export const registerTelegramHandlers = ({
|
||||
scheduleTextFragmentFlush(entry);
|
||||
return;
|
||||
}
|
||||
} else if (text && isAbortControlMessage && (await isAuthorizedAbortControlMessage())) {
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
|
||||
const threadId = resolvedThreadId ?? dmThreadId;
|
||||
const key = `text:${chatId}:${threadId ?? "main"}:${senderId}`;
|
||||
const existing = textFragmentBuffer.get(key);
|
||||
if (existing) {
|
||||
clearTimeout(existing.timer);
|
||||
textFragmentBuffer.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Media group handling - buffer multi-image messages
|
||||
@@ -1743,15 +1783,27 @@ export const registerTelegramHandlers = ({
|
||||
debounceLane,
|
||||
})
|
||||
: null;
|
||||
if (senderId && (await isAuthorizedAbortControlMessage())) {
|
||||
for (const lane of ["default", "forward"] as const) {
|
||||
inboundDebouncer.cancelKey(
|
||||
buildTelegramInboundDebounceKey({
|
||||
accountId,
|
||||
conversationKey,
|
||||
senderId,
|
||||
debounceLane: lane,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
await inboundDebouncer.enqueue({
|
||||
ctx,
|
||||
msg,
|
||||
allMedia,
|
||||
storeAllowFrom,
|
||||
receivedAtMs: Date.now(),
|
||||
debounceKey,
|
||||
debounceKey: isAbortControlMessage ? null : debounceKey,
|
||||
debounceLane,
|
||||
botUsername: ctx.me?.username,
|
||||
botUsername,
|
||||
...promptContextBoundaryOptions(promptContextMinTimestampMs),
|
||||
});
|
||||
};
|
||||
@@ -2656,6 +2708,7 @@ export const registerTelegramHandlers = ({
|
||||
isForum: event.isForum,
|
||||
resolvedThreadId,
|
||||
dmThreadId,
|
||||
dmPolicy,
|
||||
storeAllowFrom,
|
||||
senderId: event.senderId,
|
||||
effectiveGroupAllow,
|
||||
|
||||
@@ -731,6 +731,289 @@ describe("createTelegramBot", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it.each(["stop", "/stop@openclaw_bot"] as const)(
|
||||
"lets %s bypass and cancel pending same-chat inbound debounce",
|
||||
async (stopText) => {
|
||||
const DEBOUNCE_MS = 4321;
|
||||
loadConfig.mockReturnValue({
|
||||
agents: {
|
||||
defaults: {
|
||||
envelopeTimezone: "utc",
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
inbound: {
|
||||
debounceMs: DEBOUNCE_MS,
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
},
|
||||
});
|
||||
|
||||
installPerKeySequentializer();
|
||||
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
const startedBodies: string[] = [];
|
||||
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
|
||||
await opts?.onReplyStart?.();
|
||||
const body = ctx.Body ?? "";
|
||||
startedBodies.push(body);
|
||||
return { text: `reply:${body}` };
|
||||
});
|
||||
|
||||
const extractLatestDebounceFlush = () => {
|
||||
const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
|
||||
(call) => call[1] === DEBOUNCE_MS,
|
||||
);
|
||||
expect(debounceCallIndex).toBeGreaterThanOrEqual(0);
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as
|
||||
| (() => Promise<void>)
|
||||
| undefined;
|
||||
};
|
||||
|
||||
try {
|
||||
createTelegramBot({ token: "tok" });
|
||||
const messageHandler = getOnHandler("message") as (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
) => Promise<void>;
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 101 },
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
text: "first",
|
||||
date: 1736380800,
|
||||
message_id: 101,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
const flushFirst = extractLatestDebounceFlush();
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 102 },
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
text: stopText,
|
||||
date: 1736380801,
|
||||
message_id: 102,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(startedBodies[0]).toContain("stop");
|
||||
|
||||
await flushFirst?.();
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(sendMessageSpy.mock.calls.map((call) => String(call[1])).join("\n")).not.toContain(
|
||||
"reply:first",
|
||||
);
|
||||
} finally {
|
||||
setTimeoutSpy.mockRestore();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
it("lets stop cancel pending same-chat forwarded debounce", async () => {
|
||||
const DEBOUNCE_MS = 4321;
|
||||
loadConfig.mockReturnValue({
|
||||
agents: {
|
||||
defaults: {
|
||||
envelopeTimezone: "utc",
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
inbound: {
|
||||
debounceMs: DEBOUNCE_MS,
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
},
|
||||
});
|
||||
|
||||
installPerKeySequentializer();
|
||||
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
const startedBodies: string[] = [];
|
||||
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
|
||||
await opts?.onReplyStart?.();
|
||||
const body = ctx.Body ?? "";
|
||||
startedBodies.push(body);
|
||||
return { text: `reply:${body}` };
|
||||
});
|
||||
|
||||
const extractLatestForwardDebounceFlush = () => {
|
||||
const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex((call) => call[1] === 80);
|
||||
expect(debounceCallIndex).toBeGreaterThanOrEqual(0);
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as (() => Promise<void>) | undefined;
|
||||
};
|
||||
|
||||
try {
|
||||
createTelegramBot({ token: "tok" });
|
||||
const messageHandler = getOnHandler("message") as (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
) => Promise<void>;
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 121 },
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
text: "forwarded first",
|
||||
date: 1736380800,
|
||||
message_id: 121,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
forward_date: 1736380700,
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
const flushForward = extractLatestForwardDebounceFlush();
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 122 },
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
text: "stop",
|
||||
date: 1736380801,
|
||||
message_id: 122,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(startedBodies[0]).toContain("stop");
|
||||
|
||||
await flushForward?.();
|
||||
expect(startedBodies).toHaveLength(1);
|
||||
expect(sendMessageSpy.mock.calls.map((call) => String(call[1])).join("\n")).not.toContain(
|
||||
"reply:forwarded first",
|
||||
);
|
||||
} finally {
|
||||
setTimeoutSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not let unauthorized group stop cancel pending same-sender inbound debounce", async () => {
|
||||
const DEBOUNCE_MS = 4321;
|
||||
loadConfig.mockReturnValue({
|
||||
agents: {
|
||||
defaults: {
|
||||
envelopeTimezone: "utc",
|
||||
},
|
||||
},
|
||||
messages: {
|
||||
inbound: {
|
||||
debounceMs: DEBOUNCE_MS,
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "pairing",
|
||||
groupPolicy: "open",
|
||||
groups: { "*": { requireMention: false } },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
installPerKeySequentializer();
|
||||
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
const startedBodies: string[] = [];
|
||||
replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => {
|
||||
await opts?.onReplyStart?.();
|
||||
const body = ctx.Body ?? "";
|
||||
startedBodies.push(body);
|
||||
return { text: `reply:${body}` };
|
||||
});
|
||||
|
||||
const extractLatestDebounceFlush = () => {
|
||||
const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
|
||||
(call) => call[1] === DEBOUNCE_MS,
|
||||
);
|
||||
expect(debounceCallIndex).toBeGreaterThanOrEqual(0);
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as (() => Promise<void>) | undefined;
|
||||
};
|
||||
|
||||
try {
|
||||
createTelegramBot({ token: "tok" });
|
||||
const messageHandler = getOnHandler("message") as (
|
||||
ctx: TelegramMiddlewareTestContext,
|
||||
) => Promise<void>;
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 104 },
|
||||
message: {
|
||||
chat: { id: -1007, type: "supergroup", title: "OpenClaw Ops" },
|
||||
text: "first",
|
||||
date: 1736380804,
|
||||
message_id: 104,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
const flushFirst = extractLatestDebounceFlush();
|
||||
|
||||
await runTelegramMiddlewareChain({
|
||||
ctx: {
|
||||
update: { update_id: 105 },
|
||||
message: {
|
||||
chat: { id: -1007, type: "supergroup", title: "OpenClaw Ops" },
|
||||
text: "stop",
|
||||
date: 1736380805,
|
||||
message_id: 105,
|
||||
from: { id: 42, first_name: "Ada" },
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
},
|
||||
finalHandler: messageHandler,
|
||||
});
|
||||
|
||||
await flushFirst?.();
|
||||
await vi.waitFor(() => {
|
||||
expect(startedBodies.some((body) => body.includes("first"))).toBe(true);
|
||||
});
|
||||
} finally {
|
||||
setTimeoutSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("routes callback_query payloads as messages and answers callbacks", async () => {
|
||||
createTelegramBot({ token: "tok" });
|
||||
const callbackHandler = requireValue(
|
||||
|
||||
@@ -170,6 +170,23 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
|
||||
await flushBuffer(key, buffer);
|
||||
};
|
||||
|
||||
const cancelKey = (key: string): boolean => {
|
||||
const buffer = buffers.get(key);
|
||||
if (!buffer) {
|
||||
return false;
|
||||
}
|
||||
if (buffers.get(key) === buffer) {
|
||||
buffers.delete(key);
|
||||
}
|
||||
if (buffer.timeout) {
|
||||
clearTimeout(buffer.timeout);
|
||||
buffer.timeout = null;
|
||||
}
|
||||
buffer.items = [];
|
||||
releaseBuffer(buffer);
|
||||
return true;
|
||||
};
|
||||
|
||||
const scheduleFlush = (key: string, buffer: DebounceBuffer<T>) => {
|
||||
if (buffer.timeout) {
|
||||
clearTimeout(buffer.timeout);
|
||||
@@ -262,5 +279,5 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
|
||||
scheduleFlush(key, buffer);
|
||||
};
|
||||
|
||||
return { enqueue, flushKey };
|
||||
return { enqueue, flushKey, cancelKey };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user