From aa71f7fe15685a00b76fd088e33cab8d5a9066ab Mon Sep 17 00:00:00 2001 From: VACInc Date: Sun, 17 May 2026 17:10:53 -0400 Subject: [PATCH] Fix Telegram stop debounce bypass (#83248) Summary: - The PR adds a generic inbound debounce `cancelKey`, uses Telegram stop-like controls to cancel same-chat pen ... buffers and bypass debounce, and adds focused Telegram regression coverage plus updated channel test mocks. - Reproducibility: yes. by source inspection: current main enqueues Telegram text through inbound debounce bef ... nly has flush semantics for pending keyed work. I did not run a live Telegram repro in this read-only pass. Automerge notes: - PR branch already contained follow-up commit before automerge: Fix Telegram stop debounce bypass Validation: - ClawSweeper review passed for head 19245a341de7ff7d8664a7e738179dfef6574822. - Required merge gates passed before the squash merge. Prepared head SHA: 19245a341de7ff7d8664a7e738179dfef6574822 Review: https://github.com/openclaw/openclaw/pull/83248#issuecomment-4472300906 Co-authored-by: VACInc <3279061+VACInc@users.noreply.github.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com> --- .../feishu/src/monitor.reaction.test.ts | 1 + extensions/feishu/src/monitor.test-mocks.ts | 1 + .../test-support/lifecycle-test-support.ts | 1 + .../monitor-handler.feedback-authz.test.ts | 2 + .../src/monitor-handler.file-consent.test.ts | 2 + .../telegram/src/bot-handlers.runtime.ts | 59 +++- .../src/bot.create-telegram-bot.test.ts | 283 ++++++++++++++++++ src/auto-reply/inbound-debounce.ts | 19 +- 8 files changed, 364 insertions(+), 4 deletions(-) diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index ba2e7cfc4af9..00ff5bdac781 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -710,6 +710,7 @@ describe("Feishu inbound debounce regressions", () => { params.onError?.(new Error("dispatch failed"), [item]); }, flushKey: async () => {}, + cancelKey: () => false, }), }), ); diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index 691337f1f927..f15dabea0877 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -34,6 +34,7 @@ export function createFeishuRuntimeMockModule(): { createInboundDebouncer: () => ({ enqueue: async () => {}, flushKey: async () => {}, + cancelKey: () => false, }), }, text: { diff --git a/extensions/feishu/src/test-support/lifecycle-test-support.ts b/extensions/feishu/src/test-support/lifecycle-test-support.ts index 5b71ccfd88d7..38b544f17de3 100644 --- a/extensions/feishu/src/test-support/lifecycle-test-support.ts +++ b/extensions/feishu/src/test-support/lifecycle-test-support.ts @@ -88,6 +88,7 @@ function createImmediateInboundDebounce() { } }, flushKey: async () => {}, + cancelKey: () => false, }), }; } diff --git a/extensions/msteams/src/monitor-handler.feedback-authz.test.ts b/extensions/msteams/src/monitor-handler.feedback-authz.test.ts index 0e66d4c8486f..7d3f3e7c5250 100644 --- a/extensions/msteams/src/monitor-handler.feedback-authz.test.ts +++ b/extensions/msteams/src/monitor-handler.feedback-authz.test.ts @@ -47,6 +47,8 @@ function createRuntimeStub(readAllowFromStore: ReturnType): Plugin resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, + flushKey: async () => {}, + cancelKey: () => false, }), }, pairing: { diff --git a/extensions/msteams/src/monitor-handler.file-consent.test.ts b/extensions/msteams/src/monitor-handler.file-consent.test.ts index dd5d535275e5..1a135ee34881 100644 --- a/extensions/msteams/src/monitor-handler.file-consent.test.ts +++ b/extensions/msteams/src/monitor-handler.file-consent.test.ts @@ -39,6 +39,8 @@ function createRuntimeStub(stateDir?: string): PluginRuntime { resolveInboundDebounceMs: () => 0, createInboundDebouncer: () => ({ enqueue: async () => {}, + flushKey: async () => {}, + cancelKey: () => false, }), }, }, diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 6ad36b0a6e88..462cb70e3299 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -14,6 +14,7 @@ import { } from "openclaw/plugin-sdk/channel-inbound-debounce"; import { resolveStoredModelOverride } from "openclaw/plugin-sdk/command-auth-native"; import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; +import { isAbortRequestText } from "openclaw/plugin-sdk/command-primitives-runtime"; import { buildCommandsMessagePaginated } from "openclaw/plugin-sdk/command-status"; import type { DmPolicy, OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; import type { @@ -1506,6 +1507,7 @@ export const registerTelegramHandlers = ({ isForum: boolean; resolvedThreadId?: number; dmThreadId?: number; + dmPolicy: DmPolicy; storeAllowFrom: string[]; senderId: string; effectiveGroupAllow: NormalizedAllowFrom; @@ -1524,6 +1526,7 @@ export const registerTelegramHandlers = ({ isForum, resolvedThreadId, dmThreadId, + dmPolicy, storeAllowFrom, senderId, effectiveGroupAllow, @@ -1535,11 +1538,39 @@ export const registerTelegramHandlers = ({ promptContextMinTimestampMs, } = params; + const messageText = getTelegramTextParts(msg).text; + const botUsername = ctx.me?.username; + const isAbortControlMessage = isAbortRequestText(messageText, { botUsername }); + let abortControlAuthorized: Promise | undefined; + const isAuthorizedAbortControlMessage = () => { + if (!isAbortControlMessage || !senderId) { + return Promise.resolve(false); + } + abortControlAuthorized ??= resolveTelegramCommandIngressAuthorization({ + accountId, + cfg, + dmPolicy, + isGroup, + chatId, + resolvedThreadId, + senderId, + effectiveDmAllow, + effectiveGroupAllow, + ownerAccess: { ownerList: [], senderIsOwner: false }, + eventKind: "message", + allowTextCommands: true, + hasControlCommand: true, + modeWhenAccessGroupsOff: "allow", + includeDmAllowForGroupCommands: false, + }).then((gate) => gate.authorized); + return abortControlAuthorized; + }; + // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). // We buffer “near-limit” messages and append immediately-following parts. const text = typeof msg.text === "string" ? msg.text : undefined; const isCommandLike = (text ?? "").trim().startsWith("/"); - if (text && !isCommandLike) { + if (text && !isCommandLike && !isAbortControlMessage) { const nowMs = Date.now(); const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; // Use resolvedThreadId for forum groups, dmThreadId for DM topics @@ -1602,6 +1633,15 @@ export const registerTelegramHandlers = ({ scheduleTextFragmentFlush(entry); return; } + } else if (text && isAbortControlMessage && (await isAuthorizedAbortControlMessage())) { + const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; + const threadId = resolvedThreadId ?? dmThreadId; + const key = `text:${chatId}:${threadId ?? "main"}:${senderId}`; + const existing = textFragmentBuffer.get(key); + if (existing) { + clearTimeout(existing.timer); + textFragmentBuffer.delete(key); + } } // Media group handling - buffer multi-image messages @@ -1743,15 +1783,27 @@ export const registerTelegramHandlers = ({ debounceLane, }) : null; + if (senderId && (await isAuthorizedAbortControlMessage())) { + for (const lane of ["default", "forward"] as const) { + inboundDebouncer.cancelKey( + buildTelegramInboundDebounceKey({ + accountId, + conversationKey, + senderId, + debounceLane: lane, + }), + ); + } + } await inboundDebouncer.enqueue({ ctx, msg, allMedia, storeAllowFrom, receivedAtMs: Date.now(), - debounceKey, + debounceKey: isAbortControlMessage ? null : debounceKey, debounceLane, - botUsername: ctx.me?.username, + botUsername, ...promptContextBoundaryOptions(promptContextMinTimestampMs), }); }; @@ -2656,6 +2708,7 @@ export const registerTelegramHandlers = ({ isForum: event.isForum, resolvedThreadId, dmThreadId, + dmPolicy, storeAllowFrom, senderId: event.senderId, effectiveGroupAllow, diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index 6179ac627677..76226a63bbdf 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -731,6 +731,289 @@ describe("createTelegramBot", () => { } }); + it.each(["stop", "/stop@openclaw_bot"] as const)( + "lets %s bypass and cancel pending same-chat inbound debounce", + async (stopText) => { + const DEBOUNCE_MS = 4321; + loadConfig.mockReturnValue({ + agents: { + defaults: { + envelopeTimezone: "utc", + }, + }, + messages: { + inbound: { + debounceMs: DEBOUNCE_MS, + }, + }, + channels: { + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }, + }); + + installPerKeySequentializer(); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const startedBodies: string[] = []; + replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => { + await opts?.onReplyStart?.(); + const body = ctx.Body ?? ""; + startedBodies.push(body); + return { text: `reply:${body}` }; + }); + + const extractLatestDebounceFlush = () => { + const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex( + (call) => call[1] === DEBOUNCE_MS, + ); + expect(debounceCallIndex).toBeGreaterThanOrEqual(0); + clearTimeout( + setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType, + ); + return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as + | (() => Promise) + | undefined; + }; + + try { + createTelegramBot({ token: "tok" }); + const messageHandler = getOnHandler("message") as ( + ctx: TelegramMiddlewareTestContext, + ) => Promise; + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 101 }, + message: { + chat: { id: 7, type: "private" }, + text: "first", + date: 1736380800, + message_id: 101, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + const flushFirst = extractLatestDebounceFlush(); + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 102 }, + message: { + chat: { id: 7, type: "private" }, + text: stopText, + date: 1736380801, + message_id: 102, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + expect(startedBodies).toHaveLength(1); + expect(startedBodies[0]).toContain("stop"); + + await flushFirst?.(); + expect(startedBodies).toHaveLength(1); + expect(sendMessageSpy.mock.calls.map((call) => String(call[1])).join("\n")).not.toContain( + "reply:first", + ); + } finally { + setTimeoutSpy.mockRestore(); + } + }, + ); + + it("lets stop cancel pending same-chat forwarded debounce", async () => { + const DEBOUNCE_MS = 4321; + loadConfig.mockReturnValue({ + agents: { + defaults: { + envelopeTimezone: "utc", + }, + }, + messages: { + inbound: { + debounceMs: DEBOUNCE_MS, + }, + }, + channels: { + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }, + }); + + installPerKeySequentializer(); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const startedBodies: string[] = []; + replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => { + await opts?.onReplyStart?.(); + const body = ctx.Body ?? ""; + startedBodies.push(body); + return { text: `reply:${body}` }; + }); + + const extractLatestForwardDebounceFlush = () => { + const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex((call) => call[1] === 80); + expect(debounceCallIndex).toBeGreaterThanOrEqual(0); + clearTimeout( + setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType, + ); + return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as (() => Promise) | undefined; + }; + + try { + createTelegramBot({ token: "tok" }); + const messageHandler = getOnHandler("message") as ( + ctx: TelegramMiddlewareTestContext, + ) => Promise; + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 121 }, + message: { + chat: { id: 7, type: "private" }, + text: "forwarded first", + date: 1736380800, + message_id: 121, + from: { id: 42, first_name: "Ada" }, + forward_date: 1736380700, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + const flushForward = extractLatestForwardDebounceFlush(); + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 122 }, + message: { + chat: { id: 7, type: "private" }, + text: "stop", + date: 1736380801, + message_id: 122, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + expect(startedBodies).toHaveLength(1); + expect(startedBodies[0]).toContain("stop"); + + await flushForward?.(); + expect(startedBodies).toHaveLength(1); + expect(sendMessageSpy.mock.calls.map((call) => String(call[1])).join("\n")).not.toContain( + "reply:forwarded first", + ); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + + it("does not let unauthorized group stop cancel pending same-sender inbound debounce", async () => { + const DEBOUNCE_MS = 4321; + loadConfig.mockReturnValue({ + agents: { + defaults: { + envelopeTimezone: "utc", + }, + }, + messages: { + inbound: { + debounceMs: DEBOUNCE_MS, + }, + }, + channels: { + telegram: { + dmPolicy: "pairing", + groupPolicy: "open", + groups: { "*": { requireMention: false } }, + }, + }, + }); + + installPerKeySequentializer(); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const startedBodies: string[] = []; + replySpy.mockImplementation(async (ctx: MsgContext, opts?: GetReplyOptions) => { + await opts?.onReplyStart?.(); + const body = ctx.Body ?? ""; + startedBodies.push(body); + return { text: `reply:${body}` }; + }); + + const extractLatestDebounceFlush = () => { + const debounceCallIndex = setTimeoutSpy.mock.calls.findLastIndex( + (call) => call[1] === DEBOUNCE_MS, + ); + expect(debounceCallIndex).toBeGreaterThanOrEqual(0); + clearTimeout( + setTimeoutSpy.mock.results[debounceCallIndex]?.value as ReturnType, + ); + return setTimeoutSpy.mock.calls[debounceCallIndex]?.[0] as (() => Promise) | undefined; + }; + + try { + createTelegramBot({ token: "tok" }); + const messageHandler = getOnHandler("message") as ( + ctx: TelegramMiddlewareTestContext, + ) => Promise; + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 104 }, + message: { + chat: { id: -1007, type: "supergroup", title: "OpenClaw Ops" }, + text: "first", + date: 1736380804, + message_id: 104, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + const flushFirst = extractLatestDebounceFlush(); + + await runTelegramMiddlewareChain({ + ctx: { + update: { update_id: 105 }, + message: { + chat: { id: -1007, type: "supergroup", title: "OpenClaw Ops" }, + text: "stop", + date: 1736380805, + message_id: 105, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }, + finalHandler: messageHandler, + }); + + await flushFirst?.(); + await vi.waitFor(() => { + expect(startedBodies.some((body) => body.includes("first"))).toBe(true); + }); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + it("routes callback_query payloads as messages and answers callbacks", async () => { createTelegramBot({ token: "tok" }); const callbackHandler = requireValue( diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 33ebe63b9e59..8c840978d808 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -170,6 +170,23 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams await flushBuffer(key, buffer); }; + const cancelKey = (key: string): boolean => { + const buffer = buffers.get(key); + if (!buffer) { + return false; + } + if (buffers.get(key) === buffer) { + buffers.delete(key); + } + if (buffer.timeout) { + clearTimeout(buffer.timeout); + buffer.timeout = null; + } + buffer.items = []; + releaseBuffer(buffer); + return true; + }; + const scheduleFlush = (key: string, buffer: DebounceBuffer) => { if (buffer.timeout) { clearTimeout(buffer.timeout); @@ -262,5 +279,5 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams scheduleFlush(key, buffer); }; - return { enqueue, flushKey }; + return { enqueue, flushKey, cancelKey }; }