import { createChannelInboundDebouncer, shouldDebounceTextInbound, } from "openclaw/plugin-sdk/channel-inbound"; import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/runtime-group-policy"; import type { Client } from "../internal/discord.js"; import { buildDiscordInboundReplayKey, claimDiscordInboundReplay, commitDiscordInboundReplay, createDiscordInboundReplayGuard, DiscordRetryableInboundError, releaseDiscordInboundReplay, } from "./inbound-dedupe.js"; import { buildDiscordInboundJob, resolveDiscordInboundJobQueueKey } from "./inbound-job.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { applyImplicitReplyBatchGate } from "./message-handler.batch-gate.js"; import type { DiscordMessagePreflightContext, DiscordMessagePreflightParams, } from "./message-handler.preflight.types.js"; import { resolveDiscordAcceptedTypingPrestart } from "./message-handler.reply-typing-policy.js"; import { createDiscordMessageRunQueue, type DiscordMessageRunQueueTestingHooks, } from "./message-run-queue.js"; import { hasDiscordMessageStickers, resolveDiscordMessageChannelId, resolveDiscordMessageText, } from "./message-utils.js"; import { createDiscordReplyTypingFeedback, type DiscordReplyTypingFeedback, } from "./reply-typing-feedback.js"; import type { DiscordMonitorStatusSink } from "./status.js"; type PreflightDiscordMessage = typeof import("./message-handler.preflight.js").preflightDiscordMessage; type CreateDiscordReplyTypingFeedback = typeof createDiscordReplyTypingFeedback; type DiscordMessageHandlerParams = Omit< DiscordMessagePreflightParams, "ackReactionScope" | "groupPolicy" | "data" | "client" > & { setStatus?: DiscordMonitorStatusSink; abortSignal?: AbortSignal; testing?: DiscordMessageHandlerTestingHooks; }; type DiscordMessageHandlerTestingHooks = DiscordMessageRunQueueTestingHooks & { preflightDiscordMessage?: PreflightDiscordMessage; createReplyTypingFeedback?: CreateDiscordReplyTypingFeedback; }; type PrestartedTypingFeedbackEntry = { channelId: string; feedback: DiscordReplyTypingFeedback; }; let messagePreflightRuntimePromise: | Promise | undefined; async function loadMessagePreflightRuntime() { messagePreflightRuntimePromise ??= import("./message-handler.preflight.js"); return await messagePreflightRuntimePromise; } export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { deactivate: () => void; }; function isNonEmptyString(value: string | undefined): value is string { return typeof value === "string" && value.length > 0; } function startAcceptedTypingFeedback(params: { ctx: DiscordMessagePreflightContext; createFeedback?: CreateDiscordReplyTypingFeedback; dedupeKey: string; activeFeedback: Map; }): DiscordReplyTypingFeedback | undefined { const { ctx, createFeedback, dedupeKey, activeFeedback } = params; if (!resolveDiscordAcceptedTypingPrestart(ctx).shouldPrestart) { return undefined; } const channelId = ctx.messageChannelId.trim(); const existing = activeFeedback.get(dedupeKey); if (existing) { // One pre-dispatch keepalive owns each serialized Discord queue key. // Later queued jobs get fresh typing when their dispatch turn starts. return undefined; } const replyTypingFeedback = ctx.replyTypingFeedback ?? (createFeedback ?? createDiscordReplyTypingFeedback)({ cfg: ctx.cfg, token: ctx.token, accountId: ctx.accountId, channelId: ctx.messageChannelId, log: logVerbose, }); const cleanup = replyTypingFeedback.onCleanup; replyTypingFeedback.onCleanup = () => { cleanup?.(); // Cleanup is the lease release for both normal dispatch and skipped jobs. // Without this, a stale queue key would suppress future accepted typing. if (activeFeedback.get(dedupeKey)?.feedback === replyTypingFeedback) { activeFeedback.delete(dedupeKey); } }; activeFeedback.set(dedupeKey, { channelId, feedback: replyTypingFeedback }); ctx.replyTypingFeedback = replyTypingFeedback; void replyTypingFeedback.onReplyStart().catch((err: unknown) => { logVerbose(`discord accepted typing feedback failed: ${String(err)}`); }); return replyTypingFeedback; } export function createDiscordMessageHandler( params: DiscordMessageHandlerParams, ): DiscordMessageHandlerWithLifecycle { const { groupPolicy } = resolveOpenProviderRuntimeGroupPolicy({ providerConfigPresent: params.cfg.channels?.discord !== undefined, groupPolicy: params.discordConfig?.groupPolicy, defaultGroupPolicy: params.cfg.channels?.defaults?.groupPolicy, }); const ackReactionScope = params.discordConfig?.ackReactionScope ?? params.cfg.messages?.ackReactionScope ?? "group-mentions"; const preflightDiscordMessageImpl = params.testing?.preflightDiscordMessage; const replayGuard = createDiscordInboundReplayGuard(); // The map owns pre-dispatch typing leases, not queued work itself. // Each lease is released by the feedback cleanup hook installed below. const prestartedTypingFeedback = new Map(); const messageRunQueue = createDiscordMessageRunQueue({ runtime: params.runtime, setStatus: params.setStatus, abortSignal: params.abortSignal, replayGuard, testing: params.testing, }); const { debouncer } = createChannelInboundDebouncer<{ data: DiscordMessageEvent; client: Client; abortSignal?: AbortSignal; replayKey?: string; }>({ cfg: params.cfg, channel: "discord", buildKey: (entry) => { const message = entry.data.message; const authorId = entry.data.author?.id; if (!message || !authorId) { return null; } const channelId = resolveDiscordMessageChannelId({ message, eventChannelId: entry.data.channel_id, }); if (!channelId) { return null; } return `discord:${params.accountId}:${channelId}:${authorId}`; }, shouldDebounce: (entry) => { const message = entry.data.message; if (!message) { return false; } const baseText = resolveDiscordMessageText(message, { includeForwarded: false }); return shouldDebounceTextInbound({ text: baseText, cfg: params.cfg, hasMedia: (message.attachments && message.attachments.length > 0) || hasDiscordMessageStickers(message), }); }, onFlush: async (entries) => { const last = entries.at(-1); if (!last) { return; } const replayKeys = entries.map((entry) => entry.replayKey).filter(isNonEmptyString); const abortSignal = last.abortSignal; if (abortSignal?.aborted) { releaseDiscordInboundReplay({ replayKeys, error: abortSignal.reason, replayGuard, }); return; } try { if (entries.length === 1) { const preflight = preflightDiscordMessageImpl ?? (await loadMessagePreflightRuntime()).preflightDiscordMessage; const ctx = await preflight({ ...params, ackReactionScope, groupPolicy, abortSignal, data: last.data, client: last.client, }); if (!ctx) { await commitDiscordInboundReplay({ replayKeys, replayGuard }); return; } const queueKey = resolveDiscordInboundJobQueueKey(ctx); startAcceptedTypingFeedback({ ctx, createFeedback: params.testing?.createReplyTypingFeedback, dedupeKey: queueKey, activeFeedback: prestartedTypingFeedback, }); applyImplicitReplyBatchGate(ctx, params.replyToMode, false); messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); return; } const combinedBaseText = entries .map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false }), ) .filter(Boolean) .join("\n"); const syntheticMessage = Object.create(Object.getPrototypeOf(last.data.message), { ...Object.getOwnPropertyDescriptors(last.data.message), content: { value: combinedBaseText, enumerable: true, configurable: true }, attachments: { value: [], enumerable: true, configurable: true }, message_snapshots: { value: (last.data.message as { message_snapshots?: unknown }).message_snapshots, enumerable: true, configurable: true, }, messageSnapshots: { value: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots, enumerable: true, configurable: true, }, rawData: { value: { ...(last.data.message as { rawData?: Record }).rawData }, enumerable: true, configurable: true, }, }) as DiscordMessageEvent["message"]; const syntheticData: DiscordMessageEvent = { ...last.data, message: syntheticMessage, }; const preflight = preflightDiscordMessageImpl ?? (await loadMessagePreflightRuntime()).preflightDiscordMessage; const ctx = await preflight({ ...params, ackReactionScope, groupPolicy, abortSignal, data: syntheticData, client: last.client, }); if (!ctx) { await commitDiscordInboundReplay({ replayKeys, replayGuard }); return; } const queueKey = resolveDiscordInboundJobQueueKey(ctx); startAcceptedTypingFeedback({ ctx, createFeedback: params.testing?.createReplyTypingFeedback, dedupeKey: queueKey, activeFeedback: prestartedTypingFeedback, }); applyImplicitReplyBatchGate(ctx, params.replyToMode, true); if (entries.length > 1) { const ids = entries.map((entry) => entry.data.message?.id).filter(isNonEmptyString); if (ids.length > 0) { const ctxBatch = ctx as typeof ctx & { MessageSids?: string[]; MessageSidFirst?: string; MessageSidLast?: string; }; ctxBatch.MessageSids = ids; ctxBatch.MessageSidFirst = ids[0]; ctxBatch.MessageSidLast = ids[ids.length - 1]; } } messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); } catch (error) { if (error instanceof DiscordRetryableInboundError) { releaseDiscordInboundReplay({ replayKeys, error, replayGuard }); } else { await commitDiscordInboundReplay({ replayKeys, replayGuard }); } throw error; } }, onError: (err) => { params.runtime.error(danger(`discord debounce flush failed: ${String(err)}`)); }, }); const handler: DiscordMessageHandlerWithLifecycle = async (data, client, options) => { try { if (options?.abortSignal?.aborted) { return; } // Filter bot-own messages before they enter the debounce queue. // The same check exists in preflightDiscordMessage(), but by that point // the message has already consumed debounce capacity and blocked // legitimate user messages. On active servers this causes cumulative // slowdown (see #15874). const msgAuthorId = data.message?.author?.id ?? data.author?.id; if (params.botUserId && msgAuthorId === params.botUserId) { return; } const replayKey = buildDiscordInboundReplayKey({ accountId: params.accountId, data, }); if ( !(await claimDiscordInboundReplay({ replayKey, replayGuard, })) ) { return; } await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal, replayKey: replayKey ?? undefined, }); } catch (err) { params.runtime.error(danger(`handler failed: ${String(err)}`)); } }; handler.deactivate = messageRunQueue.deactivate; return handler; }