fix(auto-reply): count message tool sends as delivery (#90123)

This commit is contained in:
Marcus Castro
2026-06-04 00:26:45 -03:00
committed by GitHub
parent 06fee678e1
commit 88dc177afc
15 changed files with 130 additions and 6 deletions

View File

@@ -1139,6 +1139,31 @@ describe("whatsapp inbound dispatch", () => {
expect(rememberSentText).toHaveBeenCalledTimes(1);
});
it("returns success when shared dispatch observes message-tool delivery", async () => {
const deliverReply = vi.fn(async () => acceptedDeliveryResult());
const rememberSentText = vi.fn();
dispatchReplyWithBufferedBlockDispatcherMock.mockImplementationOnce(
async (params: CapturedDispatchParams) => {
capturedDispatchParams = params;
return {
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
observedReplyDelivery: true,
};
},
);
await expect(
dispatchBufferedReply({
deliverReply,
rememberSentText,
}),
).resolves.toBe(true);
expect(deliverReply).not.toHaveBeenCalled();
expect(rememberSentText).not.toHaveBeenCalled();
});
it("does not treat generated WhatsApp text as sent when the provider did not accept it", async () => {
const deliverReply = vi.fn(async () => unacceptedDeliveryResult());
const rememberSentText = vi.fn();

View File

@@ -654,7 +654,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
void statusReactionController.setThinking();
}
const { queuedFinal, counts } = await dispatchReplyWithBufferedBlockDispatcher({
const dispatchResult = await dispatchReplyWithBufferedBlockDispatcher({
ctx: params.context,
cfg: params.cfg,
replyResolver: params.replyResolver,
@@ -798,7 +798,8 @@ export async function dispatchWhatsAppBufferedReply(params: {
: {}),
},
});
const didQueueVisibleReply = hasVisibleInboundReplyDispatch({ queuedFinal, counts });
const didQueueVisibleReply = hasVisibleInboundReplyDispatch(dispatchResult);
const didDeliverVisibleReply = didSendReply || dispatchResult.observedReplyDelivery === true;
if (!didQueueVisibleReply) {
if (statusReactionController) {
void finalizeWhatsAppStatusReaction({
@@ -819,8 +820,8 @@ export async function dispatchWhatsAppBufferedReply(params: {
if (statusReactionController) {
void finalizeWhatsAppStatusReaction({
controller: statusReactionController,
outcome: didSendReply ? "done" : "error",
hasFinalResponse: didSendReply,
outcome: didDeliverVisibleReply ? "done" : "error",
hasFinalResponse: didDeliverVisibleReply,
removeAckAfterReply,
timing: statusReactionTiming,
});
@@ -830,7 +831,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
params.groupHistories.set(params.groupHistoryKey, []);
}
return didSendReply;
return didDeliverVisibleReply;
}
async function finalizeWhatsAppStatusReaction(params: {

View File

@@ -322,6 +322,7 @@ function normalizeEmbeddedRunAttemptResult(
messagingToolSourceReplyPayloads?:
| EmbeddedRunAttemptForRunner["messagingToolSourceReplyPayloads"]
| null;
didDeliverSourceReplyViaMessageTool?: boolean | null;
itemLifecycle?: EmbeddedRunAttemptForRunner["itemLifecycle"] | null;
};
return {
@@ -334,6 +335,7 @@ function normalizeEmbeddedRunAttemptResult(
messagingToolSentMediaUrls: raw.messagingToolSentMediaUrls ?? [],
messagingToolSentTargets: raw.messagingToolSentTargets ?? [],
messagingToolSourceReplyPayloads: raw.messagingToolSourceReplyPayloads ?? [],
didDeliverSourceReplyViaMessageTool: raw.didDeliverSourceReplyViaMessageTool === true,
itemLifecycle: raw.itemLifecycle ?? {
startedCount: 0,
completedCount: 0,
@@ -3061,6 +3063,8 @@ export async function runEmbeddedAgent(
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didDeliverSourceReplyViaMessageTool:
attempt.didDeliverSourceReplyViaMessageTool === true,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
@@ -3306,6 +3310,8 @@ export async function runEmbeddedAgent(
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didDeliverSourceReplyViaMessageTool:
attempt.didDeliverSourceReplyViaMessageTool === true,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
@@ -3360,6 +3366,8 @@ export async function runEmbeddedAgent(
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didDeliverSourceReplyViaMessageTool:
attempt.didDeliverSourceReplyViaMessageTool === true,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
@@ -3481,6 +3489,8 @@ export async function runEmbeddedAgent(
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didDeliverSourceReplyViaMessageTool:
attempt.didDeliverSourceReplyViaMessageTool === true,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
@@ -3624,6 +3634,8 @@ export async function runEmbeddedAgent(
autoCompactionCount > 0 ? { lastTurnCompactions: autoCompactionCount } : undefined,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didDeliverSourceReplyViaMessageTool:
attempt.didDeliverSourceReplyViaMessageTool === true,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,

View File

@@ -2268,9 +2268,13 @@ export async function runEmbeddedAttempt(
applySystemPromptToSession(activeSession, nextSystemPrompt);
};
setActiveSessionSystemPrompt(systemPromptText);
let didDeliverSourceReplyViaMessageTool = false;
installMessageToolOnlyTerminalHook({
agent: activeSession.agent,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
onDeliveredSourceReply: () => {
didDeliverSourceReplyViaMessageTool = true;
},
});
prepStages.mark("agent-session");
if (isRawModelRun) {
@@ -5130,6 +5134,7 @@ export async function runEmbeddedAttempt(
currentAttemptAssistant,
lastToolError,
didSendViaMessagingTool: didSendViaMessagingTool(),
didDeliverSourceReplyViaMessageTool,
didSendDeterministicApprovalPrompt: didSendDeterministicApprovalPromptNow,
messagingToolSentTexts: getMessagingToolSentTexts(),
messagingToolSentMediaUrls: getMessagingToolSentMediaUrls(),

View File

@@ -160,9 +160,11 @@ describe("message-tool-only terminal sends", () => {
details: { rewritten: true },
}));
const agent = { afterToolCall: previousAfterToolCall } as unknown as Agent;
const onDeliveredSourceReply = vi.fn();
installMessageToolOnlyTerminalHook({
agent,
sourceReplyDeliveryMode: "message_tool_only",
onDeliveredSourceReply,
});
await expect(
@@ -178,6 +180,7 @@ describe("message-tool-only terminal sends", () => {
terminate: true,
});
expect(previousAfterToolCall).toHaveBeenCalledTimes(1);
expect(onDeliveredSourceReply).toHaveBeenCalledTimes(1);
});
it("leaves existing after-tool-call output alone when the send failed", async () => {
@@ -187,9 +190,11 @@ describe("message-tool-only terminal sends", () => {
isError: true,
}));
const agent = { afterToolCall: previousAfterToolCall } as unknown as Agent;
const onDeliveredSourceReply = vi.fn();
installMessageToolOnlyTerminalHook({
agent,
sourceReplyDeliveryMode: "message_tool_only",
onDeliveredSourceReply,
});
await expect(
@@ -205,6 +210,7 @@ describe("message-tool-only terminal sends", () => {
isError: true,
});
expect(previousAfterToolCall).toHaveBeenCalledTimes(1);
expect(onDeliveredSourceReply).not.toHaveBeenCalled();
});
it("does not install a wrapper for non-message-tool-only delivery", async () => {

View File

@@ -192,6 +192,7 @@ export function shouldTerminateAfterMessageToolOnlySend(params: {
export function installMessageToolOnlyTerminalHook(params: {
agent: Agent;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
onDeliveredSourceReply?: () => void;
}): void {
if (params.sourceReplyDeliveryMode !== "message_tool_only") {
return;
@@ -206,6 +207,7 @@ export function installMessageToolOnlyTerminalHook(params: {
hookResult,
})
) {
params.onDeliveredSourceReply?.();
return { ...hookResult, terminate: true };
}
return hookResult;

View File

@@ -157,6 +157,7 @@ export type EmbeddedRunAttemptResult = {
currentAttemptAssistant?: AssistantMessage | undefined;
lastToolError?: ToolErrorSummary;
didSendViaMessagingTool: boolean;
didDeliverSourceReplyViaMessageTool?: boolean;
didSendDeterministicApprovalPrompt?: boolean;
messagingToolSentTexts: string[];
messagingToolSentMediaUrls: string[];

View File

@@ -193,6 +193,8 @@ export type EmbeddedAgentRunResult = {
// True if a messaging tool successfully sent a message.
// Used to suppress agent's confirmation text.
didSendViaMessagingTool?: boolean;
// True if message_tool_only delivered a visible reply to the current source conversation.
didDeliverSourceReplyViaMessageTool?: boolean;
// True if a deterministic approval prompt was sent through the tool-result channel.
didSendDeterministicApprovalPrompt?: boolean;
// Texts successfully sent via messaging tools during the run.

View File

@@ -197,6 +197,8 @@ export type GetReplyOptions = {
queuedFollowupLifecycle?: QueuedReplyLifecycle;
/** Allow channel-owned progress UI while final/source reply delivery remains message-tool-only. */
allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean;
/** Called when a suppressed source reply mode observes visible delivery through another path. */
onObservedReplyDelivery?: () => Promise<void> | void;
disableBlockStreaming?: boolean;
/** Timeout for block reply delivery (ms). */
blockReplyTimeoutMs?: number;

View File

@@ -9,6 +9,7 @@ import {
} from "../../agents/agent-scope.js";
import { resolveContextTokensForModel } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { hasVisibleAgentPayload } from "../../agents/embedded-agent-runner/delivery-evidence.js";
import {
formatEmbeddedAgentQueueFailureSummary,
queueEmbeddedAgentMessageWithOutcomeAsync,
@@ -1818,6 +1819,15 @@ export async function runReplyAgent(params: {
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,
messagingToolSentTargets: runResult.messagingToolSentTargets,
});
const committedMessagingToolSourceReplyDelivery =
runResult.didDeliverSourceReplyViaMessageTool === true ||
hasVisibleAgentPayload({ payloads: runResult.messagingToolSourceReplyPayloads });
if (
opts?.sourceReplyDeliveryMode === "message_tool_only" &&
committedMessagingToolSourceReplyDelivery
) {
await opts.onObservedReplyDelivery?.();
}
const returnSilentFallbackFailureIfNeeded = async (): Promise<ReplyPayload | undefined> => {
const silentFallbackFailurePayload = buildSilentFallbackFailurePayload({
fallbackTransition,

View File

@@ -8029,6 +8029,44 @@ describe("sendPolicy deny — suppress delivery, not processing (#53328)", () =>
}
});
it("treats message-tool-only observed delivery as visible for fallback eligibility", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "allow",
};
const dispatcher = createDispatcher();
const observedReplyDelivery = vi.fn();
const replyResolver = vi.fn(async (_ctx: MsgContext, opts?: GetReplyOptions) => {
expect(opts?.sourceReplyDeliveryMode).toBe("message_tool_only");
await opts?.onObservedReplyDelivery?.();
return { text: "private final reply" } satisfies ReplyPayload;
});
const result = await dispatchReplyFromConfig({
ctx: buildTestCtx({
ChatType: "channel",
SessionKey: "test:session",
}),
cfg: emptyConfig,
dispatcher,
replyResolver,
replyOptions: {
sourceReplyDeliveryMode: "message_tool_only",
onObservedReplyDelivery: observedReplyDelivery,
},
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(observedReplyDelivery).toHaveBeenCalledTimes(1);
expect(result.queuedFinal).toBe(false);
expect(result.observedReplyDelivery).toBe(true);
expect(result.noVisibleReplyFallbackEligible).toBeUndefined();
expect(result.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("preserves hook-blocked metadata when source delivery is message-tool-only", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {

View File

@@ -1299,6 +1299,14 @@ export async function dispatchReplyFromConfig(
};
const getQueuedFollowupAbortSignal = () =>
dispatchReplyOperation?.abortSignal ?? params.replyOptions?.abortSignal;
let observedReplyDelivery = false;
const markObservedReplyDelivery = async () => {
if (observedReplyDelivery) {
return;
}
observedReplyDelivery = true;
await params.replyOptions?.onObservedReplyDelivery?.();
};
const getReplyOptions = () => {
const abortSignal = getDispatchAbortSignal();
if (!abortSignal) {
@@ -2451,6 +2459,7 @@ export async function dispatchReplyFromConfig(
{
...getReplyOptions(),
sourceReplyDeliveryMode,
onObservedReplyDelivery: markObservedReplyDelivery,
suppressToolErrorWarnings,
shouldSuppressToolErrorWarnings,
typingPolicy: typing.typingPolicy,
@@ -2953,7 +2962,8 @@ export async function dispatchReplyFromConfig(
return attachSourceReplyDeliveryMode({
queuedFinal,
counts,
...(!queuedFinal && !emptyFinalAllowedAsSilent
...(observedReplyDelivery ? { observedReplyDelivery } : {}),
...(!queuedFinal && !observedReplyDelivery && !emptyFinalAllowedAsSilent
? { noVisibleReplyFallbackEligible: true }
: {}),
...(beforeAgentRunBlocked ? { beforeAgentRunBlocked } : {}),

View File

@@ -11,6 +11,7 @@ export type DispatchFromConfigResult = {
failedCounts?: Partial<Record<ReplyDispatchKind, number>>;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
sendPolicyDenied?: boolean;
observedReplyDelivery?: boolean;
noVisibleReplyFallbackEligible?: boolean;
beforeAgentRunBlocked?: boolean;
};

View File

@@ -5,6 +5,7 @@ export type ChannelTurnDispatchResultLike =
| {
queuedFinal?: boolean;
counts?: Partial<Record<ReplyDispatchKind, number>>;
observedReplyDelivery?: boolean;
}
| null
| undefined;
@@ -41,6 +42,7 @@ export function hasVisibleChannelTurnDispatch(
const counts = resolveChannelTurnDispatchCounts(result);
// Non-count signals cover delivery paths that bypass the buffered reply dispatcher.
return (
result?.observedReplyDelivery === true ||
signals.observedReplyDelivery === true ||
signals.fallbackDelivered === true ||
signals.deliverySummaryDelivered === true ||

View File

@@ -283,6 +283,13 @@ describe("recordInboundSessionAndDispatchReply", () => {
counts: { tool: 0, block: 1, final: 0 },
}),
).toBe(true);
expect(
hasVisibleInboundReplyDispatch({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
observedReplyDelivery: true,
}),
).toBe(true);
expect(
hasFinalInboundReplyDispatch({
queuedFinal: false,