test: cover visible delivery typing handoff

This commit is contained in:
Marcus Castro
2026-06-04 19:28:25 -03:00
parent 2379b00afe
commit de0ae37acb
18 changed files with 541 additions and 61 deletions

View File

@@ -1095,6 +1095,7 @@ describe("web auto-reply connection", () => {
const typingMock = {
onReplyStart: vi.fn(async () => {}),
startTypingLoop: vi.fn(async () => {}),
startTypingForVisibleDelivery: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
isActive: vi.fn(() => false),

View File

@@ -17,7 +17,10 @@ type CapturedDispatchParams = {
dispatcherOptions?: {
deliver?: (
payload: CapturedReplyPayload,
info: { kind: "tool" | "block" | "final" },
info: {
kind: "tool" | "block" | "final";
startVisibleDeliveryTyping?: () => Promise<void>;
},
) => Promise<unknown>;
onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void;
onSettled?: () => Promise<unknown>;
@@ -26,6 +29,7 @@ type CapturedDispatchParams = {
disableBlockStreaming?: boolean;
sourceReplyDeliveryMode?: "automatic" | "message_tool_only";
suppressTyping?: boolean;
typingStartPolicy?: "visible_delivery";
};
};
@@ -181,6 +185,12 @@ function makeMsg(overrides: Partial<TestMsg> = {}): TestMsg {
};
}
const whatsappGroupJid = "120363000000000000@g.us";
function makeWhatsAppGroupMsg(overrides: Partial<TestMsg> = {}): TestMsg {
return makeMsg({ from: whatsappGroupJid, chatType: "group", ...overrides });
}
function getCapturedDeliver() {
return (capturedDispatchParams as CapturedDispatchParams)?.dispatcherOptions?.deliver;
}
@@ -697,6 +707,50 @@ describe("whatsapp inbound dispatch", () => {
});
});
it("passes visible-delivery typing into durable final WhatsApp sends", async () => {
const startVisibleDeliveryTyping = vi.fn();
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "handled_visible",
delivery: {
messageIds: ["wa-1"],
visibleReplySent: true,
},
});
await dispatchBufferedReply({
cfg: {
channels: { whatsapp: { blockStreaming: true } },
messages: { groupChat: { visibleReplies: "automatic" } },
} as never,
context: { Body: "incoming", ChatType: "group" },
msg: makeWhatsAppGroupMsg(),
});
const deliver = getCapturedDeliver();
await deliver?.(
{ text: "final payload" },
{
kind: "final",
startVisibleDeliveryTyping,
},
);
const durableParams = requireMockArg(
deliverInboundReplyWithMessageSendContextMock,
0,
0,
"durable delivery params",
);
const start = durableParams.onVisibleDeliveryStart;
if (typeof start !== "function") {
throw new Error("expected durable visible delivery start hook");
}
expect(startVisibleDeliveryTyping).not.toHaveBeenCalled();
await start();
expect(startVisibleDeliveryTyping).toHaveBeenCalledTimes(1);
});
it("does not fall back when durable WhatsApp delivery suppresses a send", async () => {
deliverInboundReplyWithMessageSendContextMock.mockResolvedValueOnce({
status: "handled_no_send",
@@ -1038,7 +1092,7 @@ describe("whatsapp inbound dispatch", () => {
it("defaults WhatsApp group replies to message-tool-only and disables source streaming", async () => {
await dispatchBufferedReply({
context: { Body: "hi", ChatType: "group" },
msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group" }),
msg: makeWhatsAppGroupMsg(),
});
expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), {
@@ -1047,8 +1101,70 @@ describe("whatsapp inbound dispatch", () => {
});
});
it("delivers authorized WhatsApp group text slash command replies visibly", async () => {
await dispatchBufferedReply({
it.each([
{
name: "automatic unmentioned group",
cfg: {
channels: { whatsapp: { blockStreaming: true } },
messages: { groupChat: { visibleReplies: "automatic" } },
} as never,
context: { Body: "hi", ChatType: "group" },
msg: makeWhatsAppGroupMsg(),
expected: {
sourceReplyDeliveryMode: "automatic",
disableBlockStreaming: false,
suppressTyping: false,
typingStartPolicy: "visible_delivery",
},
},
{
name: "passive message-tool-only group",
context: { Body: "hi", ChatType: "group" },
msg: makeWhatsAppGroupMsg({ wasMentioned: false }),
expected: {
suppressTyping: true,
typingStartPolicy: undefined,
},
},
{
name: "mentioned group",
context: { Body: "@bot hi", ChatType: "group" },
msg: makeWhatsAppGroupMsg({ wasMentioned: true }),
expected: {
suppressTyping: false,
typingStartPolicy: undefined,
},
},
{
name: "direct chat",
context: { Body: "hi", ChatType: "direct" },
msg: makeMsg({ from: "+15550001000", chatType: "direct" }),
expected: {
suppressTyping: false,
typingStartPolicy: undefined,
},
},
{
name: "native",
context: {
Body: "/status",
ChatType: "group",
CommandSource: "native" as const,
},
msg: makeWhatsAppGroupMsg({ body: "/status", wasMentioned: false }),
expected: {
suppressTyping: false,
typingStartPolicy: undefined,
},
commandTurn: {
kind: "native" as const,
source: "native" as const,
authorized: true,
body: "/status",
},
},
{
name: "authorized text slash",
cfg: {
channels: { whatsapp: { blockStreaming: true } },
messages: { groupChat: { visibleReplies: "message_tool" } },
@@ -1057,64 +1173,37 @@ describe("whatsapp inbound dispatch", () => {
Body: "/status",
ChatType: "group",
CommandAuthorized: true,
CommandSource: "text",
CommandSource: "text" as const,
},
msg: makeMsg({
msg: makeWhatsAppGroupMsg({ body: "/status", wasMentioned: false }),
expected: {
sourceReplyDeliveryMode: "automatic",
disableBlockStreaming: false,
suppressTyping: false,
typingStartPolicy: undefined,
},
commandTurn: {
kind: "text-slash" as const,
source: "text" as const,
authorized: true,
body: "/status",
from: "120363000000000000@g.us",
chatType: "group",
}),
});
expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), {
sourceReplyDeliveryMode: "automatic",
disableBlockStreaming: false,
suppressTyping: false,
});
});
it("honors automatic visible replies for WhatsApp groups", async () => {
},
},
])("resolves visible reply and typing policy for $name", async (testCase) => {
await dispatchBufferedReply({
cfg: {
channels: { whatsapp: { blockStreaming: true } },
messages: { groupChat: { visibleReplies: "automatic" } },
} as never,
context: { Body: "hi", ChatType: "group" },
msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group" }),
...("cfg" in testCase ? { cfg: testCase.cfg } : {}),
context: {
...testCase.context,
...("commandTurn" in testCase ? { CommandTurn: testCase.commandTurn } : {}),
},
msg: testCase.msg,
});
expectRecordFields(requireRecord(getCapturedReplyOptions(), "reply options"), {
sourceReplyDeliveryMode: "automatic",
disableBlockStreaming: false,
suppressTyping: false,
});
});
it("suppresses typing for message-tool-only group chat without mention", async () => {
await dispatchBufferedReply({
context: { Body: "hi", ChatType: "group" },
msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group", wasMentioned: false }),
});
expect(getCapturedReplyOptions()?.suppressTyping).toBe(true);
});
it("does not suppress typing for group chat when mentioned", async () => {
await dispatchBufferedReply({
context: { Body: "@bot hi", ChatType: "group" },
msg: makeMsg({ from: "120363000000000000@g.us", chatType: "group", wasMentioned: true }),
});
expect(getCapturedReplyOptions()?.suppressTyping).toBe(false);
});
it("does not suppress typing for direct chat", async () => {
await dispatchBufferedReply({
context: { Body: "hi", ChatType: "direct" },
msg: makeMsg({ from: "+15550001000", chatType: "direct" }),
});
expect(getCapturedReplyOptions()?.suppressTyping).toBe(false);
const replyOptions = requireRecord(getCapturedReplyOptions(), "reply options");
expectRecordFields(replyOptions, testCase.expected);
if (testCase.expected.typingStartPolicy === undefined) {
expect(getCapturedReplyOptions()?.typingStartPolicy).toBeUndefined();
}
});
it("treats block-only turns as visible replies instead of silent turns", async () => {

View File

@@ -239,6 +239,7 @@ describe("withReplyDispatcher", () => {
const typing = {
onReplyStart: vi.fn(async () => {}),
startTypingLoop: vi.fn(async () => {}),
startTypingForVisibleDelivery: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
isActive: vi.fn(() => true),

View File

@@ -1,7 +1,9 @@
// Tests before-deliver hook ordering and payload mutation behavior.
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import type { ReplyPayload } from "../types.js";
import { createReplyDispatcher } from "./reply-dispatcher.js";
import { createReplyDispatcher, createReplyDispatcherWithTyping } from "./reply-dispatcher.js";
import { createMockTypingController } from "./test-helpers.js";
import { createTypingController, VISIBLE_DELIVERY_TYPING_START_TIMEOUT_MS } from "./typing.js";
describe("beforeDeliver in reply dispatcher", () => {
it("cancels delivery before queueing when transformReplyPayload returns null", async () => {
@@ -91,4 +93,172 @@ describe("beforeDeliver in reply dispatcher", () => {
expect(delivered).toEqual(["plain reply"]);
});
it("starts real visible-delivery typing once after accepted delivery", async () => {
const order: string[] = [];
const onReplyStart = vi.fn(async () => {
order.push("typing");
});
const { dispatcher, markRunComplete, replyOptions } = createReplyDispatcherWithTyping({
typingStartPolicy: "visible_delivery",
onReplyStart,
deliver: async (payload, info) => {
await info.startVisibleDeliveryTyping?.();
order.push(`deliver:${payload.text ?? ""}`);
},
transformReplyPayload: () => ({ text: "normalized" }),
beforeDeliver: async (payload: ReplyPayload) => {
order.push(`before:${payload.text ?? ""}`);
return { ...payload, text: "approved" };
},
});
const typing = createTypingController({
onReplyStart: replyOptions.onReplyStart,
});
replyOptions.onTypingController?.(typing);
markRunComplete();
dispatcher.sendFinalReply({ text: "raw" });
dispatcher.sendFinalReply({ text: "raw again" });
dispatcher.markComplete();
await dispatcher.waitForIdle();
expect(order).toEqual([
"before:normalized",
"typing",
"deliver:approved",
"before:normalized",
"deliver:approved",
]);
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("awaits fallback visible-delivery typing before delivery", async () => {
const delivered: string[] = [];
const order: string[] = [];
let resolveTyping!: () => void;
const typingStarted = new Promise<void>((resolve) => {
resolveTyping = resolve;
});
const onReplyStart = vi.fn(async () => {
order.push("typing:start");
await typingStarted;
order.push("typing:done");
});
const { dispatcher } = createReplyDispatcherWithTyping({
typingStartPolicy: "visible_delivery",
onReplyStart,
deliver: async (payload, info) => {
await info.startVisibleDeliveryTyping?.();
order.push("deliver");
delivered.push(payload.text ?? "");
},
});
dispatcher.sendFinalReply({ text: "visible" });
dispatcher.markComplete();
const idle = dispatcher.waitForIdle();
await Promise.resolve();
await Promise.resolve();
expect(order).toEqual(["typing:start"]);
expect(delivered).toEqual([]);
resolveTyping();
await idle;
expect(order).toEqual(["typing:start", "typing:done", "deliver"]);
expect(onReplyStart).toHaveBeenCalledTimes(1);
expect(delivered).toEqual(["visible"]);
});
it.each([
{ name: "typing controller", installController: true },
{ name: "fallback lifecycle callback", installController: false },
] as const)(
"continues delivery when $name visible-delivery typing stalls past the bounded wait",
async ({ installController }) => {
vi.useFakeTimers();
try {
const delivered: string[] = [];
const startTyping = vi.fn(() => new Promise<void>(() => {}));
const { dispatcher, replyOptions } = createReplyDispatcherWithTyping({
typingStartPolicy: "visible_delivery",
...(installController ? {} : { onReplyStart: startTyping }),
deliver: async (payload, info) => {
await info.startVisibleDeliveryTyping?.();
delivered.push(payload.text ?? "");
},
});
if (installController) {
replyOptions.onTypingController?.(
createMockTypingController({
startTypingForVisibleDelivery: startTyping,
}),
);
}
dispatcher.sendFinalReply({ text: "visible" });
dispatcher.markComplete();
const idle = dispatcher.waitForIdle();
await Promise.resolve();
await Promise.resolve();
expect(startTyping).toHaveBeenCalledTimes(1);
expect(delivered).toEqual([]);
await vi.advanceTimersByTimeAsync(VISIBLE_DELIVERY_TYPING_START_TIMEOUT_MS);
await idle;
expect(delivered).toEqual(["visible"]);
} finally {
vi.useRealTimers();
}
},
);
it.each([
{
name: "beforeDeliver cancels",
options: {
beforeDeliver: async () => null,
},
payload: { text: "blocked" },
},
{
name: "suppressTyping is hard never",
options: {
suppressTyping: true,
},
payload: { text: "visible" },
},
{
name: "delivery owner leaves the hook silent",
options: {},
payload: { text: "dropped" },
skipHook: true,
},
] as const)("does not start visible-delivery typing when $name", async (testCase) => {
const typing = createMockTypingController();
const { dispatcher, replyOptions } = createReplyDispatcherWithTyping({
...testCase.options,
typingStartPolicy: "visible_delivery",
deliver: async (_payload, info) => {
if (!("skipHook" in testCase) || testCase.skipHook !== true) {
await info.startVisibleDeliveryTyping?.();
}
},
});
replyOptions.onTypingController?.(typing);
dispatcher.sendFinalReply(testCase.payload);
dispatcher.markComplete();
await dispatcher.waitForIdle();
expect(typing.startTypingForVisibleDelivery).not.toHaveBeenCalled();
});
});

View File

@@ -658,6 +658,40 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(onReplyStart).not.toHaveBeenCalled();
});
it("threads visible delivery lifecycle through routed sends when generic lifecycle is suppressed", async () => {
const onReplyStart = vi.fn(async () => {});
const onVisibleDeliveryStart = vi.fn(async () => {});
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "visiblechat",
Surface: "visiblechat",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher: createDispatcher(),
inboundAudio: false,
suppressReplyLifecycle: true,
shouldRouteToOriginating: true,
originatingChannel: "visiblechat",
originatingTo: "channel:thread-1",
onReplyStart,
onVisibleDeliveryStart,
});
const delivered = await coordinator.deliver("block", { text: "hello" });
expect(delivered).toBe(true);
expect(deliveryMocks.routeReply).toHaveBeenCalledTimes(1);
const [[routeParams]] = deliveryMocks.routeReply.mock.calls as unknown as Array<
[{ onVisibleDeliveryStart?: () => Promise<void> }]
>;
expect(routeParams.onVisibleDeliveryStart).toEqual(expect.any(Function));
expect(onVisibleDeliveryStart).not.toHaveBeenCalled();
await routeParams.onVisibleDeliveryStart?.();
expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1);
expect(onReplyStart).not.toHaveBeenCalled();
});
it("can start reply lifecycle while user delivery is suppressed", async () => {
const onReplyStart = vi.fn(async () => {});
const dispatcher = createDispatcher();

View File

@@ -37,9 +37,13 @@ function firstReplyDispatchCall() {
sessionKey?: string;
sendPolicy?: string;
inboundAudio?: boolean;
suppressReplyLifecycle?: boolean;
shouldRouteToOriginating?: boolean;
},
{
cfg?: unknown;
onReplyStart?: unknown;
onVisibleDeliveryStart?: unknown;
},
]
| undefined;
@@ -148,6 +152,56 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => {
counts: { tool: 1, block: 2, final: 3 },
});
});
it.each([
{
name: "local",
ctx: createHookCtx(),
shouldRouteToOriginating: false,
},
{
name: "routed",
ctx: {
...createHookCtx(),
Provider: "slack",
OriginatingChannel: "discord",
OriginatingTo: "discord:123",
},
shouldRouteToOriginating: true,
},
])(
"suppresses $name reply_dispatch lifecycle when typing starts at visible delivery",
async ({ ctx, shouldRouteToOriginating }) => {
hookMocks.runner.runReplyDispatch.mockResolvedValue({
handled: true,
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
const onReplyStart = vi.fn();
const onVisibleDeliveryStart = vi.fn();
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher: createDispatcher(),
replyOptions: {
typingStartPolicy: "visible_delivery",
onReplyStart,
onVisibleDeliveryStart,
},
replyResolver: async () => ({ text: "model reply" }),
});
const [replyDispatchEvent, replyDispatchRuntime] = firstReplyDispatchCall() ?? [];
if (shouldRouteToOriginating) {
expect(replyDispatchEvent?.shouldRouteToOriginating).toBe(true);
}
expect(replyDispatchEvent?.suppressReplyLifecycle).toBe(true);
expect(replyDispatchRuntime?.onReplyStart).toBe(onReplyStart);
expect(replyDispatchRuntime?.onVisibleDeliveryStart).toBe(onVisibleDeliveryStart);
},
);
it("still applies send-policy deny after an unhandled plugin dispatch", async () => {
hookMocks.runner.runReplyDispatch.mockResolvedValue({
handled: false,

View File

@@ -1278,6 +1278,8 @@ describe("dispatchReplyFromConfig", () => {
it("routes when OriginatingChannel differs from Provider", async () => {
setNoAbort();
mocks.routeReply.mockClear();
const onReplyStart = vi.fn();
const onVisibleDeliveryStart = vi.fn();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
@@ -1294,7 +1296,17 @@ describe("dispatchReplyFromConfig", () => {
_opts?: GetReplyOptions,
_cfg?: OpenClawConfig,
) => ({ text: "hi" }) satisfies ReplyPayload;
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: {
typingStartPolicy: "visible_delivery",
onReplyStart,
onVisibleDeliveryStart,
},
replyResolver,
});
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
const routeCall = firstRouteReplyCall() as
@@ -1303,6 +1315,7 @@ describe("dispatchReplyFromConfig", () => {
channel?: unknown;
groupId?: unknown;
isGroup?: unknown;
onVisibleDeliveryStart?: () => Promise<void>;
threadId?: unknown;
to?: unknown;
}
@@ -1313,6 +1326,13 @@ describe("dispatchReplyFromConfig", () => {
expect(routeCall?.threadId).toBe(123);
expect(routeCall?.isGroup).toBe(true);
expect(routeCall?.groupId).toBe("telegram:999");
expect(routeCall?.onVisibleDeliveryStart).toEqual(expect.any(Function));
expect(onReplyStart).not.toHaveBeenCalled();
await routeCall?.onVisibleDeliveryStart?.();
expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1);
expect(onReplyStart).not.toHaveBeenCalled();
});
it("routes exec-event replies using persisted session delivery context when current turn has no originating route", async () => {

View File

@@ -24,6 +24,7 @@ function makeTypingController() {
return {
onReplyStart: async () => {},
startTypingLoop: async () => {},
startTypingForVisibleDelivery: async () => {},
startTypingOnText: async () => {},
refreshTypingTtl: () => {},
isActive: () => false,
@@ -374,6 +375,7 @@ describe("resolveReplyDirectives", () => {
typing: {
onReplyStart: async () => {},
startTypingLoop: async () => {},
startTypingForVisibleDelivery: async () => {},
startTypingOnText: async () => {},
refreshTypingTtl: () => {},
isActive: () => false,

View File

@@ -54,6 +54,7 @@ vi.mock("../../channels/plugins/index.js", () => ({
const createTypingController = (): TypingController => ({
onReplyStart: async () => {},
startTypingLoop: async () => {},
startTypingForVisibleDelivery: async () => {},
startTypingOnText: async () => {},
refreshTypingTtl: () => {},
isActive: () => false,

View File

@@ -217,6 +217,7 @@ function baseParams(
model: "claude-opus-4-1",
typing: {
onReplyStart: vi.fn().mockResolvedValue(undefined),
markRunComplete: vi.fn(),
cleanup: vi.fn(),
} as never,
defaultModel: "claude-opus-4-1",
@@ -774,7 +775,7 @@ describe("runPreparedReply media-only handling", () => {
expect(call.followupRun.prompt).not.toContain("[Thread starter - for context]");
});
it("returns the empty-body reply when there is no text and no media", async () => {
it("returns the empty-body reply without starting visible-delivery typing", async () => {
const result = await runPreparedReply(
baseParams({
ctx: {
@@ -794,6 +795,40 @@ describe("runPreparedReply media-only handling", () => {
text: "I didn't receive any text in your message. Please resend or add a caption.",
});
expect(vi.mocked(runReplyAgent)).not.toHaveBeenCalled();
const onReplyStart = vi.fn().mockResolvedValue(undefined);
const markRunComplete = vi.fn();
const cleanup = vi.fn();
const visiblePolicyResult = await runPreparedReply(
baseParams({
ctx: {
Body: "",
RawBody: "",
CommandBody: "",
},
sessionCtx: {
Body: "",
BodyStripped: "",
Provider: "slack",
},
opts: {
typingStartPolicy: "visible_delivery",
},
typing: {
onReplyStart,
markRunComplete,
cleanup,
} as never,
}),
);
expect(visiblePolicyResult).toEqual({
text: "I didn't receive any text in your message. Please resend or add a caption.",
});
expect(onReplyStart).not.toHaveBeenCalled();
expect(markRunComplete).toHaveBeenCalledTimes(1);
expect(cleanup).not.toHaveBeenCalled();
expect(vi.mocked(runReplyAgent)).not.toHaveBeenCalled();
});
it("still skips metadata-only turns when inbound context adds chat_id", async () => {

View File

@@ -431,6 +431,20 @@ describe("typing controller", () => {
expect(onReplyStart).not.toHaveBeenCalled();
});
it("allows visible-delivery typing during settled delivery before run completion", async () => {
vi.useFakeTimers();
const { typing, onReplyStart } = createTestTypingController();
typing.markDispatchIdle();
await typing.startTypingForVisibleDelivery();
expect(onReplyStart).toHaveBeenCalledTimes(1);
typing.markRunComplete();
await typing.startTypingForVisibleDelivery();
await vi.advanceTimersByTimeAsync(2_000);
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("does not restart typing after it has stopped", async () => {
vi.useFakeTimers();
const { typing, onReplyStart } = createTestTypingController();
@@ -558,6 +572,17 @@ describe("resolveTypingMode", () => {
},
expected: "never",
},
{
name: "visible-delivery typing policy disables stream starts",
input: {
configured: "instant" as const,
isGroupChat: false,
wasMentioned: false,
isHeartbeat: false,
typingStartPolicy: "visible_delivery" as const,
},
expected: "never",
},
] as const;
for (const testCase of cases) {
@@ -819,9 +844,11 @@ describe("createTypingSignaler", () => {
await signaler.signalRunStart();
await signaler.signalTextDelta("hi");
await signaler.signalReasoningDelta();
await signaler.signalToolStart();
expect(typing.startTypingLoop, `mode=${params.mode}`).not.toHaveBeenCalled();
expect(typing.startTypingOnText, `mode=${params.mode}`).not.toHaveBeenCalled();
expect(typing.refreshTypingTtl, `mode=${params.mode}`).not.toHaveBeenCalled();
}
});
});

View File

@@ -3,6 +3,7 @@ export function createMockTypingController() {
return {
onReplyStart: async () => undefined,
startTypingLoop: async () => undefined,
startTypingForVisibleDelivery: async () => undefined,
startTypingOnText: async () => undefined,
refreshTypingTtl: () => undefined,
isActive: () => false,

View File

@@ -333,6 +333,7 @@ describe("routeReply", () => {
});
it("returns routed reply hook suppression reasons from durable delivery", async () => {
const onVisibleDeliveryStart = vi.fn(async () => {});
mocks.deliverOutboundPayloads.mockImplementationOnce(
async ({
onPayloadDeliveryOutcome,
@@ -353,6 +354,7 @@ describe("routeReply", () => {
channel: "telegram",
to: "chat-1",
cfg: {} as never,
onVisibleDeliveryStart,
});
expect(res).toEqual({
@@ -369,6 +371,13 @@ describe("routeReply", () => {
conversationId: "chat-1",
},
});
const onPlatformSendStart = lastDelivery().onPlatformSendStart as
| (() => Promise<void> | void)
| undefined;
expect(onPlatformSendStart).toEqual(expect.any(Function));
expect(onVisibleDeliveryStart).not.toHaveBeenCalled();
await onPlatformSendStart?.();
expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1);
});
it("suppresses routed delivery when reply payload hooks cancel", async () => {

View File

@@ -10,6 +10,7 @@ export function createMockTypingController(
return {
onReplyStart: vi.fn(async () => {}),
startTypingLoop: vi.fn(async () => {}),
startTypingForVisibleDelivery: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
isActive: vi.fn(() => false),

View File

@@ -42,6 +42,24 @@ describe("typing persistence bug fix", () => {
expect(onReplyStartSpy).not.toHaveBeenCalledTimes(2);
});
it("allows visible-delivery typing after run and dispatcher idle", async () => {
const lateController = createTypingController({
onReplyStart: onReplyStartSpy,
onCleanup: onCleanupSpy,
typingIntervalSeconds: 6,
log: vi.fn(),
});
lateController.markRunComplete();
lateController.markDispatchIdle();
await lateController.startTypingForVisibleDelivery();
await lateController.startTypingForVisibleDelivery();
expect(onReplyStartSpy).toHaveBeenCalledTimes(1);
expect(onCleanupSpy).not.toHaveBeenCalled();
});
it("keeps typing alive while keepalive ticks continue during long runs", async () => {
const longRunCleanupSpy = vi.fn();
const longRunController = createTypingController({

View File

@@ -20,6 +20,7 @@ import type { DurableMessageSendIntent } from "./types.js";
type DeliveryIntentCallbackParams = {
onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void;
onPlatformSendStart?: () => Promise<void> | void;
onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void;
};
@@ -66,6 +67,7 @@ function expectBatchStatus<TStatus extends DurableMessageBatchSendResult["status
describe("withDurableMessageSendContext", () => {
it("renders and sends through a durable send context", async () => {
deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => {
await params.onPlatformSendStart?.();
params.onDeliveryIntent?.({
id: "intent-1",
channel: "telegram",
@@ -74,6 +76,7 @@ describe("withDurableMessageSendContext", () => {
});
return [{ channel: "telegram", messageId: "msg-1" }];
});
const onPlatformSendStart = vi.fn();
const result = await withDurableMessageSendContext(
{
@@ -83,6 +86,7 @@ describe("withDurableMessageSendContext", () => {
payloads: [{ text: "hello" }],
threadId: 42,
replyToId: "reply-1",
onPlatformSendStart,
},
async (ctx) => {
expect(ctx.id).toBe("telegram:chat-1");
@@ -124,6 +128,7 @@ describe("withDurableMessageSendContext", () => {
expect(request.payloads).toEqual([{ text: "hello" }]);
expect(request.threadId).toBe(42);
expect(request.replyToId).toBe("reply-1");
expect(onPlatformSendStart).toHaveBeenCalledTimes(1);
});
it("records a replayable rendered batch plan on the durable intent", async () => {

View File

@@ -34,6 +34,7 @@ type SendDurableMessageBatchRequest = {
threadId?: string | number | null;
durability?: string;
gatewayClientScopes?: readonly string[];
onPlatformSendStart?: () => Promise<void> | void;
};
type DeliverySupportRequest = {
@@ -146,12 +147,15 @@ describe("durable inbound reply delivery", () => {
});
it("does not require unknown-send reconciliation for the default best-effort final path", async () => {
const onVisibleDeliveryStart = vi.fn();
await deliverInboundReplyWithMessageSendContext({
cfg: {},
channel: "telegram",
agentId: "main",
info: { kind: "final" },
payload: { text: "final" },
onVisibleDeliveryStart,
ctxPayload: ctxPayload({
OriginatingTo: "chat-1",
}),
@@ -164,6 +168,8 @@ describe("durable inbound reply delivery", () => {
});
expect(mocks.sendDurableMessageBatch).toHaveBeenCalledTimes(1);
expect(latestSendDurableMessageBatchRequest().durability).toBe("best_effort");
await latestSendDurableMessageBatchRequest().onPlatformSendStart?.();
expect(onVisibleDeliveryStart).toHaveBeenCalledTimes(1);
});
it("uses required durability when a caller explicitly requires unknown-send reconciliation", async () => {

View File

@@ -666,6 +666,9 @@ describe("deliverOutboundPayloads", () => {
const afterCommit = vi.fn((ctx: { attemptToken?: unknown; result: { messageId?: string } }) => {
order.push(`commit:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`);
});
const onPlatformSendStart = vi.fn(async () => {
order.push("visible-start");
});
setActivePluginRegistry(
createTestRegistry([
{
@@ -702,12 +705,14 @@ describe("deliverOutboundPayloads", () => {
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "required",
onPlatformSendStart,
});
expect(order).toEqual([
"queue",
"before",
"mark-started",
"visible-start",
"send",
"after:pending-1:message-adapter-1",
"mark-unknown",
@@ -732,6 +737,7 @@ describe("deliverOutboundPayloads", () => {
expect(commitParams?.kind).toBe("text");
expect(commitParams?.attemptToken).toBe("pending-1");
expect(commitParams?.result?.messageId).toBe("message-adapter-1");
expect(onPlatformSendStart).toHaveBeenCalledTimes(1);
expect(results[0]?.channel).toBe("matrix");
expect(results[0]?.messageId).toBe("message-adapter-1");
});