From 6f20f29688d7255437aa4d929726105ba2bda0a5 Mon Sep 17 00:00:00 2001 From: Brian <95547369+zhuisDEV@users.noreply.github.com> Date: Sun, 31 May 2026 03:39:39 +1000 Subject: [PATCH] fix(discord): carry reply typing feedback through queue Carry Discord reply typing feedback through preflight, queued dispatch, and cleanup so delayed accepted replies keep typing alive at the actual dispatch target without duplicate keepalives. Adds focused Discord queue/process policy coverage and stronger lifecycle invariant comments. --- AGENTS.md | 2 +- extensions/discord/src/monitor/inbound-job.ts | 7 + .../message-handler.preflight.types.ts | 2 + .../monitor/message-handler.process.test.ts | 97 ++++++- .../src/monitor/message-handler.process.ts | 49 ++-- .../src/monitor/message-handler.queue.test.ts | 253 +++++++++++++----- ...essage-handler.reply-typing-policy.test.ts | 123 +++++++++ .../message-handler.reply-typing-policy.ts | 76 ++++++ .../discord/src/monitor/message-handler.ts | 104 ++++--- .../discord/src/monitor/message-run-queue.ts | 57 +++- .../src/monitor/reply-typing-feedback.ts | 71 +++++ 11 files changed, 720 insertions(+), 121 deletions(-) create mode 100644 extensions/discord/src/monitor/message-handler.reply-typing-policy.test.ts create mode 100644 extensions/discord/src/monitor/message-handler.reply-typing-policy.ts create mode 100644 extensions/discord/src/monitor/reply-typing-feedback.ts diff --git a/AGENTS.md b/AGENTS.md index 44bc0f2afd58..f74b8f21288f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -78,7 +78,7 @@ Skills own workflows; root owns hard policy and routing. - Gateway/plugin metadata is process-stable: installs, manifests, catalogs, generated paths, bundled metadata. Changes require restart or explicit owner reload/install/doctor flow. - Runtime hot paths: no freshness polling (`stat`/`realpath`/JSON reread/hash). Reuse current snapshots, install records, discovery, lookup tables, root scopes, resolved paths. - Process-local metadata caches ok when lifecycle-owned and bounded/single-slot. Freshness exceptions need named owner + tests. -- Inline comments: preserve reviewer context at the code site. Use for cross-path/state invariants, platform/dependency caps, deterministic ordering, compact encoded state, lifecycle ordering, ownership boundaries, session/id adoption, queue-depth symmetry, fallbacks, or intentional caller differences. +- Inline comments: preserve reviewer context at the code site. Required for non-obvious cross-path/state invariants, lifecycle ordering, ownership boundaries, queue/dedupe symmetry, TTL/cache expiry, cleanup/release coupling, session/id adoption, fallback behavior, platform/dependency caps, deterministic ordering, compact encoded state, or intentional caller differences. - Comment shape: 1-3 short lines; state why the branch/helper exists, what contract it protects, and the bad outcome if removed. Cite nearby constants/helpers when useful. No syntax narration, PR/user-specific lore, or obvious mechanics. - Gateway protocol changes: additive first; incompatible needs versioning/docs/client follow-through. - Protocol version bumps: explicit owner confirmation only; never automatic/generated. diff --git a/extensions/discord/src/monitor/inbound-job.ts b/extensions/discord/src/monitor/inbound-job.ts index 6ac7b0ad4f34..448ba4728014 100644 --- a/extensions/discord/src/monitor/inbound-job.ts +++ b/extensions/discord/src/monitor/inbound-job.ts @@ -12,6 +12,9 @@ type DiscordInboundJobRuntimeField = | "guildHistories" | "client" | "threadBindings" + // Function-backed feedback stays runtime-only; payload must remain + // materializable data so queued jobs cannot accidentally serialize it. + | "replyTypingFeedback" | "discordRestFetch"; type DiscordInboundJobRuntime = Pick; @@ -26,6 +29,8 @@ export type DiscordInboundJob = { }; export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightContext): string { + // This key is both the run-queue serialization key and the typing prestart + // dedupe key, so keep it aligned with the eventual session route. const sessionKey = ctx.route.sessionKey?.trim(); if (sessionKey) { return sessionKey; @@ -47,6 +52,7 @@ export function buildDiscordInboundJob( guildHistories, client, threadBindings, + replyTypingFeedback, discordRestFetch, message, data, @@ -72,6 +78,7 @@ export function buildDiscordInboundJob( guildHistories, client, threadBindings, + replyTypingFeedback, discordRestFetch, }, replayKeys: options?.replayKeys ? [...options.replayKeys] : undefined, diff --git a/extensions/discord/src/monitor/message-handler.preflight.types.ts b/extensions/discord/src/monitor/message-handler.preflight.types.ts index 9539f4b6732e..43a338499b39 100644 --- a/extensions/discord/src/monitor/message-handler.preflight.types.ts +++ b/extensions/discord/src/monitor/message-handler.preflight.types.ts @@ -8,6 +8,7 @@ import type { ChannelType, Client, User } from "../internal/discord.js"; import type { DiscordChannelConfigResolved, DiscordGuildEntryResolved } from "./allow-list.js"; import type { DiscordChannelInfo } from "./message-utils.js"; import type { DiscordThreadBindingLookup } from "./reply-delivery.js"; +import type { DiscordReplyTypingFeedback } from "./reply-typing-feedback.js"; import type { DiscordSenderIdentity } from "./sender-identity.js"; export type { DiscordSenderIdentity } from "./sender-identity.js"; @@ -95,6 +96,7 @@ export type DiscordMessagePreflightContext = DiscordMessagePreflightSharedFields historyEntry?: HistoryEntry; threadBindings: DiscordThreadBindingLookup; + replyTypingFeedback?: DiscordReplyTypingFeedback; discordRestFetch?: typeof fetch; botLoopProtection?: ChannelBotLoopProtectionFacts; }; diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 724f0d94bbae..98b2b36df8d4 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -86,6 +86,16 @@ vi.mock("../send.js", () => ({ }, })); +const typingMocks = vi.hoisted(() => ({ + sendTyping: vi.fn<(params: { rest: unknown; channelId: string }) => Promise>( + async () => {}, + ), +})); + +vi.mock("./typing.js", () => ({ + sendTyping: typingMocks.sendTyping, +})); + const discordTargetMocks = vi.hoisted(() => ({ resolveDiscordTargetChannelId: vi.fn(async (target: string, _opts?: unknown) => ({ channelId: target === "user:u1" ? "dm-u1" : target, @@ -169,6 +179,7 @@ type DispatchInboundParams = { onPartialReply?: (payload: { text?: string }) => Promise | void; onAssistantMessageStart?: () => Promise | void; allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean; + onTypingCleanup?: () => Promise | void; }; }; const dispatchInboundMessage = vi.hoisted(() => @@ -233,6 +244,7 @@ let createThreadBindingManager: typeof import("./thread-bindings.js").createThre let processDiscordMessage: typeof import("./message-handler.process.js").processDiscordMessage; let formatDiscordReplySkip: typeof import("./message-handler.process.js").formatDiscordReplySkip; let notifyDiscordInboundEventOutboundSuccess: typeof import("../inbound-event-delivery.js").notifyDiscordInboundEventOutboundSuccess; +let createDiscordReplyTypingFeedback: typeof import("./reply-typing-feedback.js").createDiscordReplyTypingFeedback; vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ dispatchReplyWithBufferedBlockDispatcher: async (params: { @@ -244,6 +256,14 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ deliver: (payload: unknown, info: { kind: "block" | "final" }) => Promise | void; onError?: (err: unknown, info: { kind: "block" | "final" }) => void; transformReplyPayload?: (payload: ReplyPayload) => ReplyPayload | null; + typingCallbacks?: { + onReplyStart?: () => Promise | void; + onIdle?: () => void; + onCleanup?: () => void; + }; + onReplyStart?: () => Promise | void; + onIdle?: () => void; + onCleanup?: () => void; onSettled?: () => unknown; onFreshSettledDelivery?: () => unknown; }; @@ -273,10 +293,16 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ pendingDeliveries.push(delivery); return true; }; + const typingCallbacks = params.dispatcherOptions.typingCallbacks; + const replyOptions = { + ...params.replyOptions, + onReplyStart: params.dispatcherOptions.onReplyStart ?? typingCallbacks?.onReplyStart, + onTypingCleanup: params.dispatcherOptions.onCleanup ?? typingCallbacks?.onCleanup, + }; try { return await dispatchInboundMessage({ ctx: params.ctx, - replyOptions: params.replyOptions, + replyOptions, dispatcher: { sendBlockReply: vi.fn((payload: ReplyPayload) => queueDelivery(payload, { kind: "block" }), @@ -292,6 +318,8 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ } finally { await params.dispatcherOptions.onSettled?.(); await params.dispatcherOptions.onFreshSettledDelivery?.(); + params.dispatcherOptions.onIdle?.(); + typingCallbacks?.onIdle?.(); } }, dispatchInboundMessage: (params: DispatchInboundParams) => dispatchInboundMessage(params), @@ -456,12 +484,15 @@ beforeAll(async () => { ({ processDiscordMessage, formatDiscordReplySkip } = await import("./message-handler.process.js")); ({ notifyDiscordInboundEventOutboundSuccess } = await import("../inbound-event-delivery.js")); + ({ createDiscordReplyTypingFeedback } = await import("./reply-typing-feedback.js")); }); beforeEach(() => { vi.useRealTimers(); sendMocks.reactMessageDiscord.mockClear(); sendMocks.removeReactionDiscord.mockClear(); + typingMocks.sendTyping.mockClear(); + typingMocks.sendTyping.mockResolvedValue(undefined); discordTargetMocks.resolveDiscordTargetChannelId.mockClear(); editMessageDiscord.mockClear(); deliverDiscordReply.mockClear(); @@ -873,6 +904,70 @@ describe("processDiscordMessage ack reactions", () => { expect(feedbackRest).not.toBe(deliveryRest); }); + it("reuses accepted typing feedback through reply dispatch", async () => { + const replyTypingFeedback = { + onReplyStart: vi.fn(async () => {}), + onIdle: vi.fn(), + onCleanup: vi.fn(), + updateChannelId: vi.fn(), + getChannelId: vi.fn(() => "c1"), + restartForDispatch: vi.fn(), + }; + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onReplyStart?.(); + return createNoQueuedDispatchResult(); + }); + const ctx = await createAutomaticSourceDeliveryContext({ + replyTypingFeedback, + }); + + await runProcessDiscordMessage(ctx); + + expect(replyTypingFeedback.updateChannelId).not.toHaveBeenCalled(); + expect(replyTypingFeedback.restartForDispatch).toHaveBeenCalledWith("c1"); + expect(replyTypingFeedback.onReplyStart).toHaveBeenCalledTimes(1); + expect(replyTypingFeedback.onIdle).toHaveBeenCalledTimes(1); + expect(replyTypingFeedback.onCleanup).toHaveBeenCalledTimes(1); + expect(typingMocks.sendTyping).not.toHaveBeenCalled(); + }); + + it("restarts stale carried typing feedback before dispatch", async () => { + vi.useFakeTimers(); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const rest = { kind: "feedback-rest" }; + try { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onReplyStart?.(); + await vi.advanceTimersByTimeAsync(3_500); + return createNoQueuedDispatchResult(); + }); + const ctx = await createAutomaticSourceDeliveryContext(); + ctx.replyTypingFeedback = createDiscordReplyTypingFeedback({ + cfg: ctx.cfg, + token: ctx.token, + accountId: ctx.accountId, + channelId: "c1", + rest: rest as never, + log: vi.fn(), + maxDurationMs: 5_000, + }); + await ctx.replyTypingFeedback.onReplyStart(); + await vi.advanceTimersByTimeAsync(5_100); + typingMocks.sendTyping.mockClear(); + + await runProcessDiscordMessage(ctx); + + expect(typingMocks.sendTyping.mock.calls.length).toBeGreaterThanOrEqual(2); + expect( + typingMocks.sendTyping.mock.calls.every( + ([params]) => params.channelId === "c1" && params.rest === rest, + ), + ).toBe(true); + } finally { + warnSpy.mockRestore(); + } + }); + it("debounces intermediate phase reactions and jumps to done for short runs", async () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.replyOptions?.onReasoningStream?.(); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 7f06240e5727..2182df08f033 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -9,7 +9,6 @@ import { createStatusReactionController, DEFAULT_TIMING, logAckFailure, - logTypingFailure, shouldAckReaction as shouldAckReactionGate, } from "openclaw/plugin-sdk/channel-feedback"; import { @@ -66,11 +65,11 @@ import { createDiscordDraftPreviewController } from "./message-handler.draft-pre import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; import { resolveForwardedMediaList, resolveMediaList } from "./message-utils.js"; import { deliverDiscordReply } from "./reply-delivery.js"; +import { createDiscordReplyTypingFeedback } from "./reply-typing-feedback.js"; import { DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, } from "./timeouts.js"; -import { sendTyping } from "./typing.js"; function sleep(ms: number): Promise { return new Promise((resolve) => { @@ -78,7 +77,6 @@ function sleep(ms: number): Promise { }); } -const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60_000; let replyRuntimePromise: Promise | undefined; async function loadReplyRuntime() { @@ -154,6 +152,17 @@ function readToolBooleanArg(args: Record, key: string): boolean export async function processDiscordMessage( ctx: DiscordMessagePreflightContext, observer?: DiscordMessageProcessObserver, +) { + try { + await processDiscordMessageInner(ctx, observer); + } finally { + ctx.replyTypingFeedback?.onCleanup?.(); + } +} + +async function processDiscordMessageInner( + ctx: DiscordMessagePreflightContext, + observer?: DiscordMessageProcessObserver, ) { const dispatchStartedAt = Date.now(); const { @@ -184,6 +193,7 @@ export async function processDiscordMessage( discordRestFetch, abortSignal, botLoopProtection, + replyTypingFeedback, } = ctx; if (isProcessAborted(abortSignal)) { return; @@ -432,25 +442,32 @@ export async function processDiscordMessage( const typingChannelId = deliverTarget.startsWith("channel:") ? deliverTarget.slice("channel:".length) : messageChannelId; + // Deliver target can move into a thread after preflight accepted the message. + // The typing owner follows the final target before reply dispatch starts. + const typingFeedback = + replyTypingFeedback ?? + createDiscordReplyTypingFeedback({ + cfg, + token, + accountId, + channelId: typingChannelId, + rest: feedbackRest, + log: logVerbose, + }); + if (replyTypingFeedback) { + // A carried prestart only covers queue wait time; dispatch needs a fresh + // controller after retargeting so an expired TTL cannot silence the run. + replyTypingFeedback.restartForDispatch(typingChannelId); + } else { + typingFeedback.updateChannelId(typingChannelId); + } const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({ cfg, agentId: route.agentId, channel: "discord", accountId: route.accountId, - typing: { - start: () => sendTyping({ rest: feedbackRest, channelId: typingChannelId }), - onStartError: (err) => { - logTypingFailure({ - log: logVerbose, - channel: "discord", - target: typingChannelId, - error: err, - }); - }, - // Long tool-heavy runs are expected on Discord; keep heartbeats alive. - maxDurationMs: DISCORD_TYPING_MAX_DURATION_MS, - }, + typingCallbacks: typingFeedback, }); const tableMode = resolveMarkdownTableMode({ cfg, diff --git a/extensions/discord/src/monitor/message-handler.queue.test.ts b/extensions/discord/src/monitor/message-handler.queue.test.ts index 684c6c73c6f1..dbbe16566e52 100644 --- a/extensions/discord/src/monitor/message-handler.queue.test.ts +++ b/extensions/discord/src/monitor/message-handler.queue.test.ts @@ -11,25 +11,16 @@ import { createDiscordPreflightContext, } from "./message-handler.test-helpers.js"; -const earlyTypingMocks = vi.hoisted(() => ({ - createDiscordRestClient: vi.fn(() => ({ - token: "test-token", - rest: { kind: "discord-rest" }, - account: { accountId: "default", config: {} }, - })), - sendTyping: vi.fn(async () => {}), -})); - -vi.mock("../client.js", () => ({ - createDiscordRestClient: earlyTypingMocks.createDiscordRestClient, -})); - -vi.mock("./typing.js", () => ({ - sendTyping: earlyTypingMocks.sendTyping, -})); - type SetStatusFn = (patch: Record) => void; type MockCallSource = { mock: { calls: Array> } }; +type ReplyTypingFeedbackMock = { + onReplyStart: ReturnType Promise>>; + onIdle: ReturnType void>>; + onCleanup: ReturnType void>>; + updateChannelId: ReturnType void>>; + getChannelId: ReturnType string>>; + restartForDispatch: ReturnType void>>; +}; function mockCall(source: MockCallSource, label: string, callIndex = 0): Array { const call = source.mock.calls[callIndex]; @@ -104,9 +95,22 @@ function createPreflightContext(channelId = "ch-1") { cfg, accountId: "default", token: "test-token", + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }, textLimit: 2_000, replyToMode: "off" as const, discordConfig, + messageText: "hello", + isDirectMessage: false, + isGuildMessage: true, + isGroupDm: false, + inboundEventKind: "message" as const, + effectiveWasMentioned: false, }; } @@ -121,6 +125,17 @@ function createAcceptedDmPreflightContext(overrides: Record = { }; } +function createReplyTypingFeedbackMock(channelId = "ch-1"): ReplyTypingFeedbackMock { + return { + onReplyStart: vi.fn(async () => {}), + onIdle: vi.fn(), + onCleanup: vi.fn(), + updateChannelId: vi.fn(), + getChannelId: vi.fn(() => channelId), + restartForDispatch: vi.fn(), + }; +} + function createHandlerWithDefaultPreflight(overrides?: { setStatus?: SetStatusFn }) { preflightDiscordMessageMock.mockImplementation(async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), @@ -172,106 +187,118 @@ async function createLifecycleStopScenario(params: { describe("createDiscordMessageHandler queue behavior", () => { beforeEach(() => { - earlyTypingMocks.createDiscordRestClient.mockReset().mockReturnValue({ - token: "test-token", - rest: { kind: "discord-rest" }, - account: { accountId: "default", config: {} }, - }); - earlyTypingMocks.sendTyping.mockReset().mockResolvedValue(undefined); + vi.useRealTimers(); }); - it("sends an accepted DM typing cue before queued processing starts", async () => { + it("starts accepted DM typing feedback before queued processing starts", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); - preflightDiscordMessageMock.mockResolvedValue(createAcceptedDmPreflightContext()); + preflightDiscordMessageMock.mockImplementation(async () => createAcceptedDmPreflightContext()); processDiscordMessageMock.mockResolvedValue(undefined); + const replyTypingFeedback = createReplyTypingFeedbackMock("dm-1"); + const createReplyTypingFeedback = vi.fn(() => replyTypingFeedback); - const handler = createDiscordMessageHandler(createDiscordHandlerParams()); + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); await expect( handler(createMessageData("m-typing", "dm-1") as never, {} as never), ).resolves.toBeUndefined(); await flushQueueWork(); - expect(earlyTypingMocks.createDiscordRestClient).toHaveBeenCalledTimes(1); - const [restClientParams] = mockCall( - earlyTypingMocks.createDiscordRestClient, - "createDiscordRestClient", + expect(createReplyTypingFeedback).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + token: "test-token", + channelId: "dm-1", + }), ); - expect((restClientParams as { accountId?: unknown } | undefined)?.accountId).toBe("default"); - expect((restClientParams as { token?: unknown } | undefined)?.token).toBe("test-token"); - expect(earlyTypingMocks.sendTyping).toHaveBeenCalledWith({ - rest: { kind: "discord-rest" }, - channelId: "dm-1", - }); - expect(earlyTypingMocks.sendTyping.mock.invocationCallOrder[0]).toBeLessThan( + expect(replyTypingFeedback.onReplyStart).toHaveBeenCalledTimes(1); + expect(replyTypingFeedback.onReplyStart.mock.invocationCallOrder[0]).toBeLessThan( processDiscordMessageMock.mock.invocationCallOrder[0], ); }); - it("keeps accepted DM dispatch running when the early typing cue fails", async () => { + it("keeps accepted DM dispatch running when accepted typing feedback fails", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); - earlyTypingMocks.sendTyping.mockRejectedValueOnce(new Error("typing failed")); - preflightDiscordMessageMock.mockResolvedValue(createAcceptedDmPreflightContext()); + preflightDiscordMessageMock.mockImplementation(async () => createAcceptedDmPreflightContext()); processDiscordMessageMock.mockResolvedValue(undefined); + const replyTypingFeedback = createReplyTypingFeedbackMock("dm-1"); + replyTypingFeedback.onReplyStart.mockRejectedValueOnce(new Error("typing failed")); - const handler = createDiscordMessageHandler(createDiscordHandlerParams()); + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback: vi.fn(() => replyTypingFeedback) }, + }); await expect( handler(createMessageData("m-typing-fails", "dm-1") as never, {} as never), ).resolves.toBeUndefined(); await flushQueueWork(); - expect(earlyTypingMocks.sendTyping).toHaveBeenCalledTimes(1); + expect(replyTypingFeedback.onReplyStart).toHaveBeenCalledTimes(1); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); - it("does not send early typing when preflight rejects the message", async () => { + it("does not start accepted typing feedback when preflight rejects the message", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); preflightDiscordMessageMock.mockResolvedValue(null); + const createReplyTypingFeedback = vi.fn(); - const handler = createDiscordMessageHandler(createDiscordHandlerParams()); + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); await expect( handler(createMessageData("m-rejected", "dm-1") as never, {} as never), ).resolves.toBeUndefined(); await flushQueueWork(); - expect(earlyTypingMocks.sendTyping).not.toHaveBeenCalled(); + expect(createReplyTypingFeedback).not.toHaveBeenCalled(); expect(processDiscordMessageMock).not.toHaveBeenCalled(); }); - it("does not send early typing when typing mode is not instant", async () => { - preflightDiscordMessageMock.mockReset(); - processDiscordMessageMock.mockReset(); - preflightDiscordMessageMock.mockResolvedValue( - createAcceptedDmPreflightContext({ - cfg: { - ...createPreflightContext().cfg, - agents: { - defaults: { - typingMode: "message", + it.each(["message", "thinking", "never"] as const)( + "does not start accepted typing feedback when typing mode is %s", + async (typingMode) => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + preflightDiscordMessageMock.mockResolvedValue( + createAcceptedDmPreflightContext({ + cfg: { + ...createPreflightContext().cfg, + agents: { + defaults: { + typingMode, + }, }, }, - }, - }), - ); - processDiscordMessageMock.mockResolvedValue(undefined); + }), + ); + processDiscordMessageMock.mockResolvedValue(undefined); + const createReplyTypingFeedback = vi.fn(); - const handler = createDiscordMessageHandler(createDiscordHandlerParams()); - await expect( - handler(createMessageData("m-message-mode", "dm-1") as never, {} as never), - ).resolves.toBeUndefined(); + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); + await expect( + handler(createMessageData(`m-${typingMode}-mode`, "dm-1") as never, {} as never), + ).resolves.toBeUndefined(); - await flushQueueWork(); + await flushQueueWork(); - expect(earlyTypingMocks.sendTyping).not.toHaveBeenCalled(); - expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); - }); + expect(createReplyTypingFeedback).not.toHaveBeenCalled(); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }, + ); - it("does not send early typing for guild messages", async () => { + it("does not start default accepted typing feedback for unmentioned guild replies", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); preflightDiscordMessageMock.mockResolvedValue( @@ -279,21 +306,107 @@ describe("createDiscordMessageHandler queue behavior", () => { isDirectMessage: false, isGuildMessage: true, messageChannelId: "guild-channel", + effectiveWasMentioned: false, }), ); processDiscordMessageMock.mockResolvedValue(undefined); + const createReplyTypingFeedback = vi.fn(); - const handler = createDiscordMessageHandler(createDiscordHandlerParams()); + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); await expect( handler(createMessageData("m-guild", "guild-channel") as never, {} as never), ).resolves.toBeUndefined(); await flushQueueWork(); - expect(earlyTypingMocks.sendTyping).not.toHaveBeenCalled(); + expect(createReplyTypingFeedback).not.toHaveBeenCalled(); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); }); + it("starts accepted typing feedback for message-tool-only guild replies", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + preflightDiscordMessageMock.mockResolvedValue( + createAcceptedDmPreflightContext({ + cfg: { + ...createPreflightContext().cfg, + messages: { + inbound: { debounceMs: 0 }, + groupChat: { visibleReplies: "message_tool" }, + }, + }, + isDirectMessage: false, + isGuildMessage: true, + messageChannelId: "guild-channel", + effectiveWasMentioned: false, + }), + ); + processDiscordMessageMock.mockResolvedValue(undefined); + const replyTypingFeedback = createReplyTypingFeedbackMock("guild-channel"); + const createReplyTypingFeedback = vi.fn(() => replyTypingFeedback); + + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); + await expect( + handler(createMessageData("m-guild-tool", "guild-channel") as never, {} as never), + ).resolves.toBeUndefined(); + + await flushQueueWork(); + + expect(replyTypingFeedback.onReplyStart).toHaveBeenCalledTimes(1); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + it("deduplicates accepted typing feedback while same-session runs are queued", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const firstRun = createDeferred(); + const processedContexts: Array> = []; + processDiscordMessageMock + .mockImplementationOnce(async (ctx: Record) => { + processedContexts.push(ctx); + await firstRun.promise; + }) + .mockImplementationOnce(async (ctx: Record) => { + processedContexts.push(ctx); + }); + preflightDiscordMessageMock.mockImplementation(async () => createAcceptedDmPreflightContext()); + const replyTypingFeedback = createReplyTypingFeedbackMock("dm-1"); + const createReplyTypingFeedback = vi.fn(() => replyTypingFeedback); + + const handler = createDiscordMessageHandler({ + ...createDiscordHandlerParams(), + testing: { createReplyTypingFeedback }, + }); + await expect( + handler(createMessageData("m-1", "dm-1") as never, {} as never), + ).resolves.toBeUndefined(); + await flushQueueWork(); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + + await expect( + handler(createMessageData("m-2", "dm-1") as never, {} as never), + ).resolves.toBeUndefined(); + await flushQueueWork(); + + expect(createReplyTypingFeedback).toHaveBeenCalledTimes(1); + expect(replyTypingFeedback.onReplyStart).toHaveBeenCalledTimes(1); + expect(processedContexts[0]?.replyTypingFeedback).toBe(replyTypingFeedback); + + firstRun.resolve(); + await firstRun.promise; + await flushQueueWork(); + + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + expect(processedContexts[1]?.replyTypingFeedback).toBeUndefined(); + }); + it("resets busy counters when the handler is created", () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); diff --git a/extensions/discord/src/monitor/message-handler.reply-typing-policy.test.ts b/extensions/discord/src/monitor/message-handler.reply-typing-policy.test.ts new file mode 100644 index 000000000000..5af6d2cf6a56 --- /dev/null +++ b/extensions/discord/src/monitor/message-handler.reply-typing-policy.test.ts @@ -0,0 +1,123 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import { describe, expect, it, vi } from "vitest"; +import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; +import { resolveDiscordAcceptedTypingPrestart } from "./message-handler.reply-typing-policy.js"; +import { createDiscordPreflightContext } from "./message-handler.test-helpers.js"; + +function createPolicyContext( + overrides: Partial = {}, +): DiscordMessagePreflightContext { + const cfg: OpenClawConfig = { + channels: { + discord: { + enabled: true, + token: "test-token", + groupPolicy: "allowlist", + }, + }, + messages: { + inbound: { + debounceMs: 0, + }, + }, + }; + return { + ...createDiscordPreflightContext("c1"), + cfg, + accountId: "default", + token: "test-token", + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }, + discordConfig: cfg.channels?.discord, + messageText: "hello", + isDirectMessage: true, + isGuildMessage: false, + isGroupDm: false, + inboundEventKind: "message", + effectiveWasMentioned: false, + ...overrides, + } as DiscordMessagePreflightContext; +} + +describe("resolveDiscordAcceptedTypingPrestart", () => { + it.each([ + ["default direct message", createPolicyContext(), true, "direct"], + [ + "default mentioned guild message", + createPolicyContext({ + isDirectMessage: false, + isGuildMessage: true, + effectiveWasMentioned: true, + }), + true, + "mentioned-group", + ], + [ + "default unmentioned guild message", + createPolicyContext({ + isDirectMessage: false, + isGuildMessage: true, + effectiveWasMentioned: false, + }), + false, + "defer-to-message", + ], + [ + "message-tool-only guild message", + createPolicyContext({ + cfg: { + ...createPolicyContext().cfg, + messages: { + inbound: { debounceMs: 0 }, + groupChat: { visibleReplies: "message_tool" }, + }, + }, + isDirectMessage: false, + isGuildMessage: true, + effectiveWasMentioned: false, + }), + true, + "tool-only", + ], + [ + "room event", + createPolicyContext({ + inboundEventKind: "room_event", + }), + false, + "room-event", + ], + [ + "configured instant", + createPolicyContext({ + cfg: { + ...createPolicyContext().cfg, + agents: { defaults: { typingMode: "instant" } }, + }, + }), + true, + "configured-instant", + ], + [ + "configured message", + createPolicyContext({ + cfg: { + ...createPolicyContext().cfg, + agents: { defaults: { typingMode: "message" } }, + }, + }), + false, + "configured-not-instant", + ], + ] as const)("%s", (_label, ctx, shouldPrestart, reason) => { + expect(resolveDiscordAcceptedTypingPrestart(ctx)).toMatchObject({ + shouldPrestart, + reason, + }); + }); +}); diff --git a/extensions/discord/src/monitor/message-handler.reply-typing-policy.ts b/extensions/discord/src/monitor/message-handler.reply-typing-policy.ts new file mode 100644 index 000000000000..4f209978968b --- /dev/null +++ b/extensions/discord/src/monitor/message-handler.reply-typing-policy.ts @@ -0,0 +1,76 @@ +import { resolveChannelMessageSourceReplyDeliveryMode } from "openclaw/plugin-sdk/channel-outbound"; +import type { DiscordMessagePreflightContext } from "./message-handler.preflight.types.js"; + +type SourceReplyDeliveryMode = ReturnType; + +export type DiscordAcceptedTypingPrestartDecision = { + sourceReplyDeliveryMode: SourceReplyDeliveryMode; + shouldPrestart: boolean; + reason: + | "aborted" + | "empty" + | "room-event" + | "configured-instant" + | "configured-not-instant" + | "tool-only" + | "direct" + | "mentioned-group" + | "defer-to-message"; +}; + +export function resolveDiscordSourceReplyDeliveryMode( + ctx: DiscordMessagePreflightContext, +): SourceReplyDeliveryMode { + // Keep prestart policy keyed to the same source-reply mode as dispatch. + // Otherwise message-tool-only group replies would wait behind "message" mode. + return resolveChannelMessageSourceReplyDeliveryMode({ + cfg: ctx.cfg, + ctx: { + ChatType: ctx.isDirectMessage + ? "direct" + : ctx.isGroupDm + ? "group" + : ctx.isGuildMessage + ? "channel" + : undefined, + InboundEventKind: ctx.inboundEventKind, + }, + }); +} + +export function resolveDiscordAcceptedTypingPrestart( + ctx: DiscordMessagePreflightContext, +): DiscordAcceptedTypingPrestartDecision { + const sourceReplyDeliveryMode = resolveDiscordSourceReplyDeliveryMode(ctx); + if (ctx.abortSignal?.aborted) { + return { sourceReplyDeliveryMode, shouldPrestart: false, reason: "aborted" }; + } + if (!ctx.messageText.trim()) { + return { sourceReplyDeliveryMode, shouldPrestart: false, reason: "empty" }; + } + if (ctx.inboundEventKind === "room_event") { + return { sourceReplyDeliveryMode, shouldPrestart: false, reason: "room-event" }; + } + const configuredTypingMode = ctx.cfg.session?.typingMode ?? ctx.cfg.agents?.defaults?.typingMode; + if (configuredTypingMode !== undefined) { + // Explicit operator config wins over Discord heuristics. + // Non-instant modes intentionally defer to the normal reply pipeline. + return { + sourceReplyDeliveryMode, + shouldPrestart: configuredTypingMode === "instant", + reason: configuredTypingMode === "instant" ? "configured-instant" : "configured-not-instant", + }; + } + if (sourceReplyDeliveryMode === "message_tool_only") { + // Message-tool-only replies have no visible default response path. + // Prestart preserves user feedback while the tool-delivered reply waits. + return { sourceReplyDeliveryMode, shouldPrestart: true, reason: "tool-only" }; + } + if (!ctx.isGuildMessage && !ctx.isGroupDm) { + return { sourceReplyDeliveryMode, shouldPrestart: true, reason: "direct" }; + } + if (ctx.effectiveWasMentioned) { + return { sourceReplyDeliveryMode, shouldPrestart: true, reason: "mentioned-group" }; + } + return { sourceReplyDeliveryMode, shouldPrestart: false, reason: "defer-to-message" }; +} diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index a40d109ddd3c..be0cce9e3d3b 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -4,7 +4,6 @@ import { } from "openclaw/plugin-sdk/channel-inbound"; import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/runtime-group-policy"; -import { createDiscordRestClient } from "../client.js"; import type { Client } from "../internal/discord.js"; import { buildDiscordInboundReplayKey, @@ -14,11 +13,14 @@ import { DiscordRetryableInboundError, releaseDiscordInboundReplay, } from "./inbound-dedupe.js"; -import { buildDiscordInboundJob } from "./inbound-job.js"; +import { buildDiscordInboundJob, resolveDiscordInboundJobQueueKey } from "./inbound-job.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { applyImplicitReplyBatchGate } from "./message-handler.batch-gate.js"; -import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; -import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js"; +import type { + DiscordMessagePreflightContext, + DiscordMessagePreflightParams, +} from "./message-handler.preflight.types.js"; +import { resolveDiscordAcceptedTypingPrestart } from "./message-handler.reply-typing-policy.js"; import { createDiscordMessageRunQueue, type DiscordMessageRunQueueTestingHooks, @@ -28,11 +30,15 @@ import { resolveDiscordMessageChannelId, resolveDiscordMessageText, } from "./message-utils.js"; +import { + createDiscordReplyTypingFeedback, + type DiscordReplyTypingFeedback, +} from "./reply-typing-feedback.js"; import type { DiscordMonitorStatusSink } from "./status.js"; -import { sendTyping } from "./typing.js"; type PreflightDiscordMessage = typeof import("./message-handler.preflight.js").preflightDiscordMessage; +type CreateDiscordReplyTypingFeedback = typeof createDiscordReplyTypingFeedback; type DiscordMessageHandlerParams = Omit< DiscordMessagePreflightParams, @@ -45,6 +51,12 @@ type DiscordMessageHandlerParams = Omit< type DiscordMessageHandlerTestingHooks = DiscordMessageRunQueueTestingHooks & { preflightDiscordMessage?: PreflightDiscordMessage; + createReplyTypingFeedback?: CreateDiscordReplyTypingFeedback; +}; + +type PrestartedTypingFeedbackEntry = { + channelId: string; + feedback: DiscordReplyTypingFeedback; }; let messagePreflightRuntimePromise: @@ -64,34 +76,47 @@ function isNonEmptyString(value: string | undefined): value is string { return typeof value === "string" && value.length > 0; } -function shouldSendAcceptedDiscordTypingCue(ctx: DiscordMessagePreflightContext): boolean { - if (ctx.abortSignal?.aborted) { - return false; +function startAcceptedTypingFeedback(params: { + ctx: DiscordMessagePreflightContext; + createFeedback?: CreateDiscordReplyTypingFeedback; + dedupeKey: string; + activeFeedback: Map; +}): DiscordReplyTypingFeedback | undefined { + const { ctx, createFeedback, dedupeKey, activeFeedback } = params; + if (!resolveDiscordAcceptedTypingPrestart(ctx).shouldPrestart) { + return undefined; } - if (!ctx.isDirectMessage || ctx.isGuildMessage || ctx.isGroupDm) { - return false; + const channelId = ctx.messageChannelId.trim(); + const existing = activeFeedback.get(dedupeKey); + if (existing) { + // One pre-dispatch keepalive owns each serialized Discord queue key. + // Later queued jobs get fresh typing when their dispatch turn starts. + return undefined; } - if (!ctx.messageText.trim()) { - return false; - } - const configuredTypingMode = ctx.cfg.session?.typingMode ?? ctx.cfg.agents?.defaults?.typingMode; - return configuredTypingMode === undefined || configuredTypingMode === "instant"; -} - -function queueAcceptedDiscordTypingCue(ctx: DiscordMessagePreflightContext): void { - if (!shouldSendAcceptedDiscordTypingCue(ctx)) { - return; - } - const { rest } = createDiscordRestClient({ - cfg: ctx.cfg, - token: ctx.token, - accountId: ctx.accountId, - }); - void sendTyping({ rest, channelId: ctx.messageChannelId }).catch((err) => { - logVerbose( - `discord early typing cue failed for channel ${ctx.messageChannelId}: ${String(err)}`, - ); + const replyTypingFeedback = + ctx.replyTypingFeedback ?? + (createFeedback ?? createDiscordReplyTypingFeedback)({ + cfg: ctx.cfg, + token: ctx.token, + accountId: ctx.accountId, + channelId: ctx.messageChannelId, + log: logVerbose, + }); + const cleanup = replyTypingFeedback.onCleanup; + replyTypingFeedback.onCleanup = () => { + cleanup?.(); + // Cleanup is the lease release for both normal dispatch and skipped jobs. + // Without this, a stale queue key would suppress future accepted typing. + if (activeFeedback.get(dedupeKey)?.feedback === replyTypingFeedback) { + activeFeedback.delete(dedupeKey); + } + }; + activeFeedback.set(dedupeKey, { channelId, feedback: replyTypingFeedback }); + ctx.replyTypingFeedback = replyTypingFeedback; + void replyTypingFeedback.onReplyStart().catch((err) => { + logVerbose(`discord accepted typing feedback failed: ${String(err)}`); }); + return replyTypingFeedback; } export function createDiscordMessageHandler( @@ -108,6 +133,9 @@ export function createDiscordMessageHandler( "group-mentions"; const preflightDiscordMessageImpl = params.testing?.preflightDiscordMessage; const replayGuard = createDiscordInboundReplayGuard(); + // The map owns pre-dispatch typing leases, not queued work itself. + // Each lease is released by the feedback cleanup hook installed below. + const prestartedTypingFeedback = new Map(); const messageRunQueue = createDiscordMessageRunQueue({ runtime: params.runtime, setStatus: params.setStatus, @@ -185,8 +213,14 @@ export function createDiscordMessageHandler( await commitDiscordInboundReplay({ replayKeys, replayGuard }); return; } + const queueKey = resolveDiscordInboundJobQueueKey(ctx); + startAcceptedTypingFeedback({ + ctx, + createFeedback: params.testing?.createReplyTypingFeedback, + dedupeKey: queueKey, + activeFeedback: prestartedTypingFeedback, + }); applyImplicitReplyBatchGate(ctx, params.replyToMode, false); - queueAcceptedDiscordTypingCue(ctx); messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); return; } @@ -235,6 +269,13 @@ export function createDiscordMessageHandler( await commitDiscordInboundReplay({ replayKeys, replayGuard }); return; } + const queueKey = resolveDiscordInboundJobQueueKey(ctx); + startAcceptedTypingFeedback({ + ctx, + createFeedback: params.testing?.createReplyTypingFeedback, + dedupeKey: queueKey, + activeFeedback: prestartedTypingFeedback, + }); applyImplicitReplyBatchGate(ctx, params.replyToMode, true); if (entries.length > 1) { const ids = entries.map((entry) => entry.data.message?.id).filter(isNonEmptyString); @@ -249,7 +290,6 @@ export function createDiscordMessageHandler( ctxBatch.MessageSidLast = ids[ids.length - 1]; } } - queueAcceptedDiscordTypingCue(ctx); messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); } catch (error) { if (error instanceof DiscordRetryableInboundError) { diff --git a/extensions/discord/src/monitor/message-run-queue.ts b/extensions/discord/src/monitor/message-run-queue.ts index bdd5fee29fbf..ac9e14d4e79d 100644 --- a/extensions/discord/src/monitor/message-run-queue.ts +++ b/extensions/discord/src/monitor/message-run-queue.ts @@ -31,6 +31,8 @@ export type DiscordMessageRunQueueTestingHooks = { processDiscordMessage?: ProcessDiscordMessage; }; +type SkippedQueuedMessageCleanup = () => void; + let messageProcessRuntimePromise: | Promise | undefined; @@ -73,10 +75,28 @@ async function processDiscordQueuedMessage(params: { } } +function cleanupSkippedDiscordQueuedMessage(params: { + job: DiscordInboundJob; + replayGuard: ClaimableDedupe; +}) { + try { + // Skipped jobs never reach processDiscordMessage's finally block. + // Clean carried typing here before reopening the replay key for retry. + params.job.runtime.replyTypingFeedback?.onCleanup?.(); + } finally { + releaseDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + error: new DiscordRetryableInboundError("discord queued run skipped before processing"), + replayGuard: params.replayGuard, + }); + } +} + export function createDiscordMessageRunQueue( params: DiscordMessageRunQueueParams, ): DiscordMessageRunQueue { const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); + const skippedCleanup = new Set(); const runQueue = createChannelRunQueue({ setStatus: params.setStatus, abortSignal: params.abortSignal, @@ -84,10 +104,42 @@ export function createDiscordMessageRunQueue( params.runtime.error(danger(`discord message run failed: ${String(error)}`)); }, }); + let lifecycleActive = !params.abortSignal?.aborted; + + const cleanupSkippedQueuedMessages = () => { + // These callbacks represent jobs accepted into the queue but not started. + // Running jobs remove their callback before processDiscordMessage owns cleanup. + if (!lifecycleActive && skippedCleanup.size === 0) { + return; + } + lifecycleActive = false; + const cleanups = [...skippedCleanup]; + skippedCleanup.clear(); + for (const cleanup of cleanups) { + cleanup(); + } + }; + + if (params.abortSignal?.aborted) { + cleanupSkippedQueuedMessages(); + } else { + params.abortSignal?.addEventListener("abort", cleanupSkippedQueuedMessages, { once: true }); + } return { enqueue(job) { + const cleanupSkipped = () => { + cleanupSkippedDiscordQueuedMessage({ job, replayGuard }); + }; + if (!lifecycleActive) { + cleanupSkipped(); + return; + } + skippedCleanup.add(cleanupSkipped); runQueue.enqueue(job.queueKey, async ({ lifecycleSignal }) => { + // Once the task starts, normal process/commit handling owns cleanup. + // Leaving it in skippedCleanup would double-release replay/typing state. + skippedCleanup.delete(cleanupSkipped); await processDiscordQueuedMessage({ job, lifecycleSignal, @@ -96,6 +148,9 @@ export function createDiscordMessageRunQueue( }); }); }, - deactivate: runQueue.deactivate, + deactivate() { + runQueue.deactivate(); + cleanupSkippedQueuedMessages(); + }, }; } diff --git a/extensions/discord/src/monitor/reply-typing-feedback.ts b/extensions/discord/src/monitor/reply-typing-feedback.ts new file mode 100644 index 000000000000..4c23a15d2389 --- /dev/null +++ b/extensions/discord/src/monitor/reply-typing-feedback.ts @@ -0,0 +1,71 @@ +import { logTypingFailure } from "openclaw/plugin-sdk/channel-feedback"; +import { createTypingCallbacks } from "openclaw/plugin-sdk/channel-outbound"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import { createDiscordRestClient } from "../client.js"; +import type { RequestClient } from "../internal/discord.js"; +import { sendTyping } from "./typing.js"; + +export const DISCORD_REPLY_TYPING_MAX_DURATION_MS = 20 * 60_000; + +// Discord can keep long tool-heavy replies alive, but not forever. +// The dispatch restart path refreshes this TTL after queue wait time. +export type DiscordReplyTypingFeedback = ReturnType & { + updateChannelId: (channelId: string) => void; + getChannelId: () => string; + restartForDispatch: (channelId: string) => void; +}; + +export function createDiscordReplyTypingFeedback(params: { + cfg: OpenClawConfig; + token: string; + accountId: string; + channelId: string; + rest?: RequestClient; + log: (message: string) => void; + maxDurationMs?: number; +}): DiscordReplyTypingFeedback { + let channelId = params.channelId; + const rest = + params.rest ?? + createDiscordRestClient({ + cfg: params.cfg, + token: params.token, + accountId: params.accountId, + }).rest; + const createCallbacks = () => + createTypingCallbacks({ + start: () => sendTyping({ rest, channelId }), + onStartError: (err) => { + logTypingFailure({ + log: params.log, + channel: "discord", + target: channelId, + error: err, + }); + }, + maxDurationMs: params.maxDurationMs ?? DISCORD_REPLY_TYPING_MAX_DURATION_MS, + }); + const updateChannelId = (nextChannelId: string) => { + const trimmed = nextChannelId.trim(); + if (trimmed) { + channelId = trimmed; + } + }; + let callbacks = createCallbacks(); + return { + // Expose one stable owner while allowing the inner typing controller to + // rotate between prequeue feedback and the actual dispatch lifecycle. + onReplyStart: () => callbacks.onReplyStart(), + onIdle: () => callbacks.onIdle?.(), + onCleanup: () => callbacks.onCleanup?.(), + updateChannelId, + restartForDispatch: (nextChannelId) => { + updateChannelId(nextChannelId); + // Prequeue typing may have hit its TTL before the job starts. + // Rotate the inner controller so dispatch always owns a live heartbeat. + callbacks.onCleanup?.(); + callbacks = createCallbacks(); + }, + getChannelId: () => channelId, + }; +}