From de0ae37acb7d1b297734cceede81426a49837ec2 Mon Sep 17 00:00:00 2001 From: Marcus Castro Date: Thu, 4 Jun 2026 19:28:25 -0300 Subject: [PATCH] test: cover visible delivery typing handoff --- ...o-reply.connection-and-logging.e2e.test.ts | 1 + .../monitor/inbound-dispatch.test.ts | 203 +++++++++++++----- src/auto-reply/dispatch.test.ts | 1 + src/auto-reply/reply/before-deliver.test.ts | 174 ++++++++++++++- .../reply/dispatch-acp-delivery.test.ts | 34 +++ ...ispatch-from-config.reply-dispatch.test.ts | 54 +++++ .../reply/dispatch-from-config.test.ts | 22 +- ...et-reply-directives.target-session.test.ts | 2 + ...ine-actions.skip-when-config-empty.test.ts | 1 + .../reply/get-reply-run.media-only.test.ts | 37 +++- src/auto-reply/reply/reply-utils.test.ts | 27 +++ src/auto-reply/reply/reply.test-helpers.ts | 1 + src/auto-reply/reply/route-reply.test.ts | 9 + src/auto-reply/reply/test-helpers.ts | 1 + .../reply/typing-persistence.test.ts | 18 ++ src/channels/message/send.test.ts | 5 + src/channels/turn/durable-delivery.test.ts | 6 + src/infra/outbound/deliver.test.ts | 6 + 18 files changed, 541 insertions(+), 61 deletions(-) diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index 8c66df9b5df6..eaf9f4782603 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -1095,6 +1095,7 @@ describe("web auto-reply connection", () => { const typingMock = { onReplyStart: vi.fn(async () => {}), startTypingLoop: vi.fn(async () => {}), + startTypingForVisibleDelivery: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), isActive: vi.fn(() => false), diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts index 743979a3dc3a..ac0e496aab7a 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts @@ -17,7 +17,10 @@ type CapturedDispatchParams = { dispatcherOptions?: { deliver?: ( payload: CapturedReplyPayload, - info: { kind: "tool" | "block" | "final" }, + info: { + kind: "tool" | "block" | "final"; + startVisibleDeliveryTyping?: () => Promise; + }, ) => Promise; onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void; onSettled?: () => Promise; @@ -26,6 +29,7 @@ type CapturedDispatchParams = { disableBlockStreaming?: boolean; sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; suppressTyping?: boolean; + typingStartPolicy?: "visible_delivery"; }; }; @@ -181,6 +185,12 @@ function makeMsg(overrides: Partial = {}): TestMsg { }; } +const whatsappGroupJid = "120363000000000000@g.us"; + +function makeWhatsAppGroupMsg(overrides: Partial = {}): TestMsg { + return makeMsg({ from: whatsappGroupJid, chatType: "group", ...overrides }); +} + function getCapturedDeliver() { return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.deliver; } @@ -697,6 +707,50 @@ describe("whatsapp inbound dispatch", () => { }); }); + it("passes visible-delivery typing into durable final WhatsApp sends", async () => { + const startVisibleDeliveryTyping = vi.fn(); + deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ + status: "handled_visible", + delivery: { + messageIds: ["wa-1"], + visibleReplySent: true, + }, + }); + + await dispatchBufferedReply({ + cfg: { + channels: { whatsapp: { blockStreaming: true } }, + messages: { groupChat: { visibleReplies: "automatic" } }, + } as never, + context: { Body: "incoming", ChatType: "group" }, + msg: makeWhatsAppGroupMsg(), + }); + + const deliver = getCapturedDeliver(); + await deliver?.( + { text: "final payload" }, + { + kind: "final", + startVisibleDeliveryTyping, + }, + ); + + const durableParams = requireMockArg( + deliverInboundReplyWithMessageSendContextMock, + 0, + 0, + "durable delivery params", + ); + const start = durableParams.onVisibleDeliveryStart; + if (typeof start !== "function") { + throw new Error("expected durable visible delivery start hook"); + } + expect(startVisibleDeliveryTyping).not.toHaveBeenCalled(); + await start(); + + expect(startVisibleDeliveryTyping).toHaveBeenCalledTimes(1); + }); + it("does not fall back when durable WhatsApp delivery suppresses a send", async () => { deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({ status: "handled_no_send", @@ -1038,7 +1092,7 @@ describe("whatsapp inbound dispatch", () => { it("defaults WhatsApp group replies to message-tool-only and disables source streaming", async () => { await dispatchBufferedReply({ context: { Body: "hi", ChatType: "group" }, - msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group" }), + msg: makeWhatsAppGroupMsg(), }); expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), { @@ -1047,8 +1101,70 @@ describe("whatsapp inbound dispatch", () => { }); }); - it("delivers authorized WhatsApp group text slash command replies visibly", async () => { - await dispatchBufferedReply({ + it.each([ + { + name: "automatic unmentioned group", + cfg: { + channels: { whatsapp: { blockStreaming: true } }, + messages: { groupChat: { visibleReplies: "automatic" } }, + } as never, + context: { Body: "hi", ChatType: "group" }, + msg: makeWhatsAppGroupMsg(), + expected: { + sourceReplyDeliveryMode: "automatic", + disableBlockStreaming: false, + suppressTyping: false, + typingStartPolicy: "visible_delivery", + }, + }, + { + name: "passive message-tool-only group", + context: { Body: "hi", ChatType: "group" }, + msg: makeWhatsAppGroupMsg({ wasMentioned: false }), + expected: { + suppressTyping: true, + typingStartPolicy: undefined, + }, + }, + { + name: "mentioned group", + context: { Body: "@bot hi", ChatType: "group" }, + msg: makeWhatsAppGroupMsg({ wasMentioned: true }), + expected: { + suppressTyping: false, + typingStartPolicy: undefined, + }, + }, + { + name: "direct chat", + context: { Body: "hi", ChatType: "direct" }, + msg: makeMsg({ from: "+15550001000", chatType: "direct" }), + expected: { + suppressTyping: false, + typingStartPolicy: undefined, + }, + }, + { + name: "native", + context: { + Body: "/status", + ChatType: "group", + CommandSource: "native" as const, + }, + msg: makeWhatsAppGroupMsg({ body: "/status", wasMentioned: false }), + expected: { + suppressTyping: false, + typingStartPolicy: undefined, + }, + commandTurn: { + kind: "native" as const, + source: "native" as const, + authorized: true, + body: "/status", + }, + }, + { + name: "authorized text slash", cfg: { channels: { whatsapp: { blockStreaming: true } }, messages: { groupChat: { visibleReplies: "message_tool" } }, @@ -1057,64 +1173,37 @@ describe("whatsapp inbound dispatch", () => { Body: "/status", ChatType: "group", CommandAuthorized: true, - CommandSource: "text", + CommandSource: "text" as const, }, - msg: makeMsg({ + msg: makeWhatsAppGroupMsg({ body: "/status", wasMentioned: false }), + expected: { + sourceReplyDeliveryMode: "automatic", + disableBlockStreaming: false, + suppressTyping: false, + typingStartPolicy: undefined, + }, + commandTurn: { + kind: "text-slash" as const, + source: "text" as const, + authorized: true, body: "/status", - from: "120363000000000000@g.us", - chatType: "group", - }), - }); - - expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), { - sourceReplyDeliveryMode: "automatic", - disableBlockStreaming: false, - suppressTyping: false, - }); - }); - - it("honors automatic visible replies for WhatsApp groups", async () => { + }, + }, + ])("resolves visible reply and typing policy for $name", async (testCase) => { await dispatchBufferedReply({ - cfg: { - channels: { whatsapp: { blockStreaming: true } }, - messages: { groupChat: { visibleReplies: "automatic" } }, - } as never, - context: { Body: "hi", ChatType: "group" }, - msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group" }), + ...("cfg" in testCase ? { cfg: testCase.cfg } : {}), + context: { + ...testCase.context, + ...("commandTurn" in testCase ? { CommandTurn: testCase.commandTurn } : {}), + }, + msg: testCase.msg, }); - expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), { - sourceReplyDeliveryMode: "automatic", - disableBlockStreaming: false, - suppressTyping: false, - }); - }); - - it("suppresses typing for message-tool-only group chat without mention", async () => { - await dispatchBufferedReply({ - context: { Body: "hi", ChatType: "group" }, - msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group", wasMentioned: false }), - }); - - expect(getCapturedReplyOptions()?.suppressTyping).toBe(true); - }); - - it("does not suppress typing for group chat when mentioned", async () => { - await dispatchBufferedReply({ - context: { Body: "@bot hi", ChatType: "group" }, - msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group", wasMentioned: true }), - }); - - expect(getCapturedReplyOptions()?.suppressTyping).toBe(false); - }); - - it("does not suppress typing for direct chat", async () => { - await dispatchBufferedReply({ - context: { Body: "hi", ChatType: "direct" }, - msg: makeMsg({ from: "+15550001000", chatType: "direct" }), - }); - - expect(getCapturedReplyOptions()?.suppressTyping).toBe(false); + const replyOptions = requireRecord(getCapturedReplyOptions(), "reply options"); + expectRecordFields(replyOptions, testCase.expected); + if (testCase.expected.typingStartPolicy === undefined) { + expect(getCapturedReplyOptions()?.typingStartPolicy).toBeUndefined(); + } }); it("treats block-only turns as visible replies instead of silent turns", async () => { diff --git a/src/auto-reply/dispatch.test.ts b/src/auto-reply/dispatch.test.ts index afec4e6f9165..e60260a00eac 100644 --- a/src/auto-reply/dispatch.test.ts +++ b/src/auto-reply/dispatch.test.ts @@ -239,6 +239,7 @@ describe("withReplyDispatcher", () => { const typing = { onReplyStart: vi.fn(async () => {}), startTypingLoop: vi.fn(async () => {}), + startTypingForVisibleDelivery: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), isActive: vi.fn(() => true), diff --git a/src/auto-reply/reply/before-deliver.test.ts b/src/auto-reply/reply/before-deliver.test.ts index 0225adc02136..854a09f090fe 100644 --- a/src/auto-reply/reply/before-deliver.test.ts +++ b/src/auto-reply/reply/before-deliver.test.ts @@ -1,7 +1,9 @@ // Tests before-deliver hook ordering and payload mutation behavior. -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { ReplyPayload } from "../types.js"; -import { createReplyDispatcher } from "./reply-dispatcher.js"; +import { createReplyDispatcher, createReplyDispatcherWithTyping } from "./reply-dispatcher.js"; +import { createMockTypingController } from "./test-helpers.js"; +import { createTypingController, VISIBLE_DELIVERY_TYPING_START_TIMEOUT_MS } from "./typing.js"; describe("beforeDeliver in reply dispatcher", () => { it("cancels delivery before queueing when transformReplyPayload returns null", async () => { @@ -91,4 +93,172 @@ describe("beforeDeliver in reply dispatcher", () => { expect(delivered).toEqual(["plain reply"]); }); + + it("starts real visible-delivery typing once after accepted delivery", async () => { + const order: string[] = []; + const onReplyStart = vi.fn(async () => { + order.push("typing"); + }); + + const { dispatcher, markRunComplete, replyOptions } = createReplyDispatcherWithTyping({ + typingStartPolicy: "visible_delivery", + onReplyStart, + deliver: async (payload, info) => { + await info.startVisibleDeliveryTyping?.(); + order.push(`deliver:${payload.text ?? ""}`); + }, + transformReplyPayload: () => ({ text: "normalized" }), + beforeDeliver: async (payload: ReplyPayload) => { + order.push(`before:${payload.text ?? ""}`); + return { ...payload, text: "approved" }; + }, + }); + const typing = createTypingController({ + onReplyStart: replyOptions.onReplyStart, + }); + replyOptions.onTypingController?.(typing); + markRunComplete(); + + dispatcher.sendFinalReply({ text: "raw" }); + dispatcher.sendFinalReply({ text: "raw again" }); + dispatcher.markComplete(); + await dispatcher.waitForIdle(); + + expect(order).toEqual([ + "before:normalized", + "typing", + "deliver:approved", + "before:normalized", + "deliver:approved", + ]); + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + + it("awaits fallback visible-delivery typing before delivery", async () => { + const delivered: string[] = []; + const order: string[] = []; + let resolveTyping!: () => void; + const typingStarted = new Promise((resolve) => { + resolveTyping = resolve; + }); + const onReplyStart = vi.fn(async () => { + order.push("typing:start"); + await typingStarted; + order.push("typing:done"); + }); + + const { dispatcher } = createReplyDispatcherWithTyping({ + typingStartPolicy: "visible_delivery", + onReplyStart, + deliver: async (payload, info) => { + await info.startVisibleDeliveryTyping?.(); + order.push("deliver"); + delivered.push(payload.text ?? ""); + }, + }); + + dispatcher.sendFinalReply({ text: "visible" }); + dispatcher.markComplete(); + const idle = dispatcher.waitForIdle(); + await Promise.resolve(); + await Promise.resolve(); + + expect(order).toEqual(["typing:start"]); + expect(delivered).toEqual([]); + + resolveTyping(); + await idle; + + expect(order).toEqual(["typing:start", "typing:done", "deliver"]); + expect(onReplyStart).toHaveBeenCalledTimes(1); + expect(delivered).toEqual(["visible"]); + }); + + it.each([ + { name: "typing controller", installController: true }, + { name: "fallback lifecycle callback", installController: false }, + ] as const)( + "continues delivery when $name visible-delivery typing stalls past the bounded wait", + async ({ installController }) => { + vi.useFakeTimers(); + try { + const delivered: string[] = []; + const startTyping = vi.fn(() => new Promise(() => {})); + + const { dispatcher, replyOptions } = createReplyDispatcherWithTyping({ + typingStartPolicy: "visible_delivery", + ...(installController ? {} : { onReplyStart: startTyping }), + deliver: async (payload, info) => { + await info.startVisibleDeliveryTyping?.(); + delivered.push(payload.text ?? ""); + }, + }); + if (installController) { + replyOptions.onTypingController?.( + createMockTypingController({ + startTypingForVisibleDelivery: startTyping, + }), + ); + } + + dispatcher.sendFinalReply({ text: "visible" }); + dispatcher.markComplete(); + const idle = dispatcher.waitForIdle(); + await Promise.resolve(); + await Promise.resolve(); + + expect(startTyping).toHaveBeenCalledTimes(1); + expect(delivered).toEqual([]); + + await vi.advanceTimersByTimeAsync(VISIBLE_DELIVERY_TYPING_START_TIMEOUT_MS); + await idle; + + expect(delivered).toEqual(["visible"]); + } finally { + vi.useRealTimers(); + } + }, + ); + + it.each([ + { + name: "beforeDeliver cancels", + options: { + beforeDeliver: async () => null, + }, + payload: { text: "blocked" }, + }, + { + name: "suppressTyping is hard never", + options: { + suppressTyping: true, + }, + payload: { text: "visible" }, + }, + { + name: "delivery owner leaves the hook silent", + options: {}, + payload: { text: "dropped" }, + skipHook: true, + }, + ] as const)("does not start visible-delivery typing when $name", async (testCase) => { + const typing = createMockTypingController(); + + const { dispatcher, replyOptions } = createReplyDispatcherWithTyping({ + ...testCase.options, + typingStartPolicy: "visible_delivery", + deliver: async (_payload, info) => { + if (!("skipHook" in testCase) || testCase.skipHook !== true) { + await info.startVisibleDeliveryTyping?.(); + } + }, + }); + replyOptions.onTypingController?.(typing); + + dispatcher.sendFinalReply(testCase.payload); + dispatcher.markComplete(); + await dispatcher.waitForIdle(); + + expect(typing.startTypingForVisibleDelivery).not.toHaveBeenCalled(); + }); }); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts index 7b3079e4d30d..976f772d7763 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.test.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -658,6 +658,40 @@ describe("createAcpDispatchDeliveryCoordinator", () => { expect(onReplyStart).not.toHaveBeenCalled(); }); + it("threads visible delivery lifecycle through routed sends when generic lifecycle is suppressed", async () => { + const onReplyStart = vi.fn(async () => {}); + const onVisibleDeliveryStart = vi.fn(async () => {}); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "visiblechat", + Surface: "visiblechat", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + suppressReplyLifecycle: true, + shouldRouteToOriginating: true, + originatingChannel: "visiblechat", + originatingTo: "channel:thread-1", + onReplyStart, + onVisibleDeliveryStart, + }); + + const delivered = await coordinator.deliver("block", { text: "hello" }); + + expect(delivered).toBe(true); + expect(deliveryMocks.routeReply).toHaveBeenCalledTimes(1); + const [[routeParams]] = deliveryMocks.routeReply.mock.calls as unknown as Array< + [{ onVisibleDeliveryStart?: () => Promise }] + >; + expect(routeParams.onVisibleDeliveryStart).toEqual(expect.any(Function)); + expect(onVisibleDeliveryStart).not.toHaveBeenCalled(); + await routeParams.onVisibleDeliveryStart?.(); + expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1); + expect(onReplyStart).not.toHaveBeenCalled(); + }); + it("can start reply lifecycle while user delivery is suppressed", async () => { const onReplyStart = vi.fn(async () => {}); const dispatcher = createDispatcher(); diff --git a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts index 46e20d9e01b6..b7262890ea37 100644 --- a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts @@ -37,9 +37,13 @@ function firstReplyDispatchCall() { sessionKey?: string; sendPolicy?: string; inboundAudio?: boolean; + suppressReplyLifecycle?: boolean; + shouldRouteToOriginating?: boolean; }, { cfg?: unknown; + onReplyStart?: unknown; + onVisibleDeliveryStart?: unknown; }, ] | undefined; @@ -148,6 +152,56 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => { counts: { tool: 1, block: 2, final: 3 }, }); }); + + it.each([ + { + name: "local", + ctx: createHookCtx(), + shouldRouteToOriginating: false, + }, + { + name: "routed", + ctx: { + ...createHookCtx(), + Provider: "slack", + OriginatingChannel: "discord", + OriginatingTo: "discord:123", + }, + shouldRouteToOriginating: true, + }, + ])( + "suppresses $name reply_dispatch lifecycle when typing starts at visible delivery", + async ({ ctx, shouldRouteToOriginating }) => { + hookMocks.runner.runReplyDispatch.mockResolvedValue({ + handled: true, + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + const onReplyStart = vi.fn(); + const onVisibleDeliveryStart = vi.fn(); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher: createDispatcher(), + replyOptions: { + typingStartPolicy: "visible_delivery", + onReplyStart, + onVisibleDeliveryStart, + }, + replyResolver: async () => ({ text: "model reply" }), + }); + + const [replyDispatchEvent, replyDispatchRuntime] = firstReplyDispatchCall() ?? []; + if (shouldRouteToOriginating) { + expect(replyDispatchEvent?.shouldRouteToOriginating).toBe(true); + } + expect(replyDispatchEvent?.suppressReplyLifecycle).toBe(true); + expect(replyDispatchRuntime?.onReplyStart).toBe(onReplyStart); + expect(replyDispatchRuntime?.onVisibleDeliveryStart).toBe(onVisibleDeliveryStart); + }, + ); + it("still applies send-policy deny after an unhandled plugin dispatch", async () => { hookMocks.runner.runReplyDispatch.mockResolvedValue({ handled: false, diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 4d8ffdfa412e..a8efc3ed797c 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -1278,6 +1278,8 @@ describe("dispatchReplyFromConfig", () => { it("routes when OriginatingChannel differs from Provider", async () => { setNoAbort(); mocks.routeReply.mockClear(); + const onReplyStart = vi.fn(); + const onVisibleDeliveryStart = vi.fn(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ @@ -1294,7 +1296,17 @@ describe("dispatchReplyFromConfig", () => { _opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => ({ text: "hi" }) satisfies ReplyPayload; - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher, + replyOptions: { + typingStartPolicy: "visible_delivery", + onReplyStart, + onVisibleDeliveryStart, + }, + replyResolver, + }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); const routeCall = firstRouteReplyCall() as @@ -1303,6 +1315,7 @@ describe("dispatchReplyFromConfig", () => { channel?: unknown; groupId?: unknown; isGroup?: unknown; + onVisibleDeliveryStart?: () => Promise; threadId?: unknown; to?: unknown; } @@ -1313,6 +1326,13 @@ describe("dispatchReplyFromConfig", () => { expect(routeCall?.threadId).toBe(123); expect(routeCall?.isGroup).toBe(true); expect(routeCall?.groupId).toBe("telegram:999"); + expect(routeCall?.onVisibleDeliveryStart).toEqual(expect.any(Function)); + expect(onReplyStart).not.toHaveBeenCalled(); + + await routeCall?.onVisibleDeliveryStart?.(); + + expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1); + expect(onReplyStart).not.toHaveBeenCalled(); }); it("routes exec-event replies using persisted session delivery context when current turn has no originating route", async () => { diff --git a/src/auto-reply/reply/get-reply-directives.target-session.test.ts b/src/auto-reply/reply/get-reply-directives.target-session.test.ts index 6d1c522c7546..7185558f90eb 100644 --- a/src/auto-reply/reply/get-reply-directives.target-session.test.ts +++ b/src/auto-reply/reply/get-reply-directives.target-session.test.ts @@ -24,6 +24,7 @@ function makeTypingController() { return { onReplyStart: async () => {}, startTypingLoop: async () => {}, + startTypingForVisibleDelivery: async () => {}, startTypingOnText: async () => {}, refreshTypingTtl: () => {}, isActive: () => false, @@ -374,6 +375,7 @@ describe("resolveReplyDirectives", () => { typing: { onReplyStart: async () => {}, startTypingLoop: async () => {}, + startTypingForVisibleDelivery: async () => {}, startTypingOnText: async () => {}, refreshTypingTtl: () => {}, isActive: () => false, diff --git a/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts b/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts index 15b62e143f15..5fa9be1610e7 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts @@ -54,6 +54,7 @@ vi.mock("../../channels/plugins/index.js", () => ({ const createTypingController = (): TypingController => ({ onReplyStart: async () => {}, startTypingLoop: async () => {}, + startTypingForVisibleDelivery: async () => {}, startTypingOnText: async () => {}, refreshTypingTtl: () => {}, isActive: () => false, diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index 0df021256cfe..ec02227a1a0c 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -217,6 +217,7 @@ function baseParams( model: "claude-opus-4-1", typing: { onReplyStart: vi.fn().mockResolvedValue(undefined), + markRunComplete: vi.fn(), cleanup: vi.fn(), } as never, defaultModel: "claude-opus-4-1", @@ -774,7 +775,7 @@ describe("runPreparedReply media-only handling", () => { expect(call.followupRun.prompt).not.toContain("[Thread starter - for context]"); }); - it("returns the empty-body reply when there is no text and no media", async () => { + it("returns the empty-body reply without starting visible-delivery typing", async () => { const result = await runPreparedReply( baseParams({ ctx: { @@ -794,6 +795,40 @@ describe("runPreparedReply media-only handling", () => { text: "I didn't receive any text in your message. Please resend or add a caption.", }); expect(vi.mocked(runReplyAgent)).not.toHaveBeenCalled(); + + const onReplyStart = vi.fn().mockResolvedValue(undefined); + const markRunComplete = vi.fn(); + const cleanup = vi.fn(); + const visiblePolicyResult = await runPreparedReply( + baseParams({ + ctx: { + Body: "", + RawBody: "", + CommandBody: "", + }, + sessionCtx: { + Body: "", + BodyStripped: "", + Provider: "slack", + }, + opts: { + typingStartPolicy: "visible_delivery", + }, + typing: { + onReplyStart, + markRunComplete, + cleanup, + } as never, + }), + ); + + expect(visiblePolicyResult).toEqual({ + text: "I didn't receive any text in your message. Please resend or add a caption.", + }); + expect(onReplyStart).not.toHaveBeenCalled(); + expect(markRunComplete).toHaveBeenCalledTimes(1); + expect(cleanup).not.toHaveBeenCalled(); + expect(vi.mocked(runReplyAgent)).not.toHaveBeenCalled(); }); it("still skips metadata-only turns when inbound context adds chat_id", async () => { diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index 4be66b303b2b..d40794b70937 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -431,6 +431,20 @@ describe("typing controller", () => { expect(onReplyStart).not.toHaveBeenCalled(); }); + it("allows visible-delivery typing during settled delivery before run completion", async () => { + vi.useFakeTimers(); + const { typing, onReplyStart } = createTestTypingController(); + + typing.markDispatchIdle(); + await typing.startTypingForVisibleDelivery(); + expect(onReplyStart).toHaveBeenCalledTimes(1); + + typing.markRunComplete(); + await typing.startTypingForVisibleDelivery(); + await vi.advanceTimersByTimeAsync(2_000); + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + it("does not restart typing after it has stopped", async () => { vi.useFakeTimers(); const { typing, onReplyStart } = createTestTypingController(); @@ -558,6 +572,17 @@ describe("resolveTypingMode", () => { }, expected: "never", }, + { + name: "visible-delivery typing policy disables stream starts", + input: { + configured: "instant" as const, + isGroupChat: false, + wasMentioned: false, + isHeartbeat: false, + typingStartPolicy: "visible_delivery" as const, + }, + expected: "never", + }, ] as const; for (const testCase of cases) { @@ -819,9 +844,11 @@ describe("createTypingSignaler", () => { await signaler.signalRunStart(); await signaler.signalTextDelta("hi"); await signaler.signalReasoningDelta(); + await signaler.signalToolStart(); expect(typing.startTypingLoop, `mode=${params.mode}`).not.toHaveBeenCalled(); expect(typing.startTypingOnText, `mode=${params.mode}`).not.toHaveBeenCalled(); + expect(typing.refreshTypingTtl, `mode=${params.mode}`).not.toHaveBeenCalled(); } }); }); diff --git a/src/auto-reply/reply/reply.test-helpers.ts b/src/auto-reply/reply/reply.test-helpers.ts index 45c1751f0f18..720b95b5b637 100644 --- a/src/auto-reply/reply/reply.test-helpers.ts +++ b/src/auto-reply/reply/reply.test-helpers.ts @@ -3,6 +3,7 @@ export function createMockTypingController() { return { onReplyStart: async () => undefined, startTypingLoop: async () => undefined, + startTypingForVisibleDelivery: async () => undefined, startTypingOnText: async () => undefined, refreshTypingTtl: () => undefined, isActive: () => false, diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index 3499e6af603a..472a30062db3 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -333,6 +333,7 @@ describe("routeReply", () => { }); it("returns routed reply hook suppression reasons from durable delivery", async () => { + const onVisibleDeliveryStart = vi.fn(async () => {}); mocks.deliverOutboundPayloads.mockImplementationOnce( async ({ onPayloadDeliveryOutcome, @@ -353,6 +354,7 @@ describe("routeReply", () => { channel: "telegram", to: "chat-1", cfg: {} as never, + onVisibleDeliveryStart, }); expect(res).toEqual({ @@ -369,6 +371,13 @@ describe("routeReply", () => { conversationId: "chat-1", }, }); + const onPlatformSendStart = lastDelivery().onPlatformSendStart as + | (() => Promise | void) + | undefined; + expect(onPlatformSendStart).toEqual(expect.any(Function)); + expect(onVisibleDeliveryStart).not.toHaveBeenCalled(); + await onPlatformSendStart?.(); + expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1); }); it("suppresses routed delivery when reply payload hooks cancel", async () => { diff --git a/src/auto-reply/reply/test-helpers.ts b/src/auto-reply/reply/test-helpers.ts index 9ba7b9b93101..94b8aebdc466 100644 --- a/src/auto-reply/reply/test-helpers.ts +++ b/src/auto-reply/reply/test-helpers.ts @@ -10,6 +10,7 @@ export function createMockTypingController( return { onReplyStart: vi.fn(async () => {}), startTypingLoop: vi.fn(async () => {}), + startTypingForVisibleDelivery: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), isActive: vi.fn(() => false), diff --git a/src/auto-reply/reply/typing-persistence.test.ts b/src/auto-reply/reply/typing-persistence.test.ts index 81a4875d4fc2..2dbcf2e90325 100644 --- a/src/auto-reply/reply/typing-persistence.test.ts +++ b/src/auto-reply/reply/typing-persistence.test.ts @@ -42,6 +42,24 @@ describe("typing persistence bug fix", () => { expect(onReplyStartSpy).not.toHaveBeenCalledTimes(2); }); + it("allows visible-delivery typing after run and dispatcher idle", async () => { + const lateController = createTypingController({ + onReplyStart: onReplyStartSpy, + onCleanup: onCleanupSpy, + typingIntervalSeconds: 6, + log: vi.fn(), + }); + + lateController.markRunComplete(); + lateController.markDispatchIdle(); + + await lateController.startTypingForVisibleDelivery(); + await lateController.startTypingForVisibleDelivery(); + + expect(onReplyStartSpy).toHaveBeenCalledTimes(1); + expect(onCleanupSpy).not.toHaveBeenCalled(); + }); + it("keeps typing alive while keepalive ticks continue during long runs", async () => { const longRunCleanupSpy = vi.fn(); const longRunController = createTypingController({ diff --git a/src/channels/message/send.test.ts b/src/channels/message/send.test.ts index d7e502bbdf81..98cac521157d 100644 --- a/src/channels/message/send.test.ts +++ b/src/channels/message/send.test.ts @@ -20,6 +20,7 @@ import type { DurableMessageSendIntent } from "./types.js"; type DeliveryIntentCallbackParams = { onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void; + onPlatformSendStart?: () => Promise | void; onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void; }; @@ -66,6 +67,7 @@ function expectBatchStatus { it("renders and sends through a durable send context", async () => { deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => { + await params.onPlatformSendStart?.(); params.onDeliveryIntent?.({ id: "intent-1", channel: "telegram", @@ -74,6 +76,7 @@ describe("withDurableMessageSendContext", () => { }); return [{ channel: "telegram", messageId: "msg-1" }]; }); + const onPlatformSendStart = vi.fn(); const result = await withDurableMessageSendContext( { @@ -83,6 +86,7 @@ describe("withDurableMessageSendContext", () => { payloads: [{ text: "hello" }], threadId: 42, replyToId: "reply-1", + onPlatformSendStart, }, async (ctx) => { expect(ctx.id).toBe("telegram:chat-1"); @@ -124,6 +128,7 @@ describe("withDurableMessageSendContext", () => { expect(request.payloads).toEqual([{ text: "hello" }]); expect(request.threadId).toBe(42); expect(request.replyToId).toBe("reply-1"); + expect(onPlatformSendStart).toHaveBeenCalledTimes(1); }); it("records a replayable rendered batch plan on the durable intent", async () => { diff --git a/src/channels/turn/durable-delivery.test.ts b/src/channels/turn/durable-delivery.test.ts index 3f77bcf2970c..bb6f641280ab 100644 --- a/src/channels/turn/durable-delivery.test.ts +++ b/src/channels/turn/durable-delivery.test.ts @@ -34,6 +34,7 @@ type SendDurableMessageBatchRequest = { threadId?: string | number | null; durability?: string; gatewayClientScopes?: readonly string[]; + onPlatformSendStart?: () => Promise | void; }; type DeliverySupportRequest = { @@ -146,12 +147,15 @@ describe("durable inbound reply delivery", () => { }); it("does not require unknown-send reconciliation for the default best-effort final path", async () => { + const onVisibleDeliveryStart = vi.fn(); + await deliverInboundReplyWithMessageSendContext({ cfg: {}, channel: "telegram", agentId: "main", info: { kind: "final" }, payload: { text: "final" }, + onVisibleDeliveryStart, ctxPayload: ctxPayload({ OriginatingTo: "chat-1", }), @@ -164,6 +168,8 @@ describe("durable inbound reply delivery", () => { }); expect(mocks.sendDurableMessageBatch).toHaveBeenCalledTimes(1); expect(latestSendDurableMessageBatchRequest().durability).toBe("best_effort"); + await latestSendDurableMessageBatchRequest().onPlatformSendStart?.(); + expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1); }); it("uses required durability when a caller explicitly requires unknown-send reconciliation", async () => { diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index ac34e0427da1..839b78d35029 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -666,6 +666,9 @@ describe("deliverOutboundPayloads", () => { const afterCommit = vi.fn((ctx: { attemptToken?: unknown; result: { messageId?: string } }) => { order.push(`commit:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`); }); + const onPlatformSendStart = vi.fn(async () => { + order.push("visible-start"); + }); setActivePluginRegistry( createTestRegistry([ { @@ -702,12 +705,14 @@ describe("deliverOutboundPayloads", () => { to: "!room:example", payloads: [{ text: "hello" }], queuePolicy: "required", + onPlatformSendStart, }); expect(order).toEqual([ "queue", "before", "mark-started", + "visible-start", "send", "after:pending-1:message-adapter-1", "mark-unknown", @@ -732,6 +737,7 @@ describe("deliverOutboundPayloads", () => { expect(commitParams?.kind).toBe("text"); expect(commitParams?.attemptToken).toBe("pending-1"); expect(commitParams?.result?.messageId).toBe("message-adapter-1"); + expect(onPlatformSendStart).toHaveBeenCalledTimes(1); expect(results[0]?.channel).toBe("matrix"); expect(results[0]?.messageId).toBe("message-adapter-1"); });