fix(feishu): fallback when accepted turns send no visible reply (#87896)

* fix(feishu): fallback when accepted turns send no visible reply

* fix(feishu): cover no-visible-reply fallback gaps

* fix(feishu): mark media replies visible

* fix(feishu): honor suppressed delivery fallback

* test(auto-reply): trim fallback test churn

* fix(feishu): gate empty fallback eligibility

* test(auto-reply): expect fallback metadata after denied dispatch

* fix(feishu): fallback after failed visible final sends

* test(feishu): keep reply dispatcher mock shape aligned

* fix(auto-reply): respect silent policy for no-visible fallback

* fix(feishu): wait for streaming close before fallback

* fix(feishu): clear silent skip before later finals

* fix(feishu): preserve visible state across keepalives

* test(feishu): align lifecycle dispatcher mocks

* fix(feishu): require accepted streaming content for fallback

---------

Co-authored-by: ArthurNie <264332276+ArthurNie@users.noreply.github.com>
Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
ArthurNie
2026-06-01 04:33:13 +08:00
committed by GitHub
parent e681569536
commit 7c15c2765e
13 changed files with 732 additions and 48 deletions

View File

@@ -19,6 +19,7 @@ const { mockCreateFeishuReplyDispatcher, mockCreateFeishuClient, mockResolveAgen
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback: vi.fn(),
})),
mockCreateFeishuClient: vi.fn(),
mockResolveAgentRoute: vi.fn(),
@@ -227,6 +228,20 @@ describe("broadcast dispatch", () => {
lastRoutePolicy: "session",
matchedBy: "default",
});
mockCreateFeishuReplyDispatcher.mockReturnValue({
dispatcher: {
sendToolResult: vi.fn(),
sendBlockReply: vi.fn(),
sendFinalReply: vi.fn(),
waitForIdle: vi.fn(),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback: vi.fn(),
});
mockCreateFeishuClient.mockReturnValue({
contact: {
user: {
@@ -329,6 +344,130 @@ describe("broadcast dispatch", () => {
expect(dispatcherParams?.agentId).toBe("main");
});
it("sends no-visible-reply fallback for active broadcast zero-final dispatch", async () => {
mockDispatchReplyFromConfig
.mockResolvedValueOnce({ queuedFinal: false, counts: { final: 1 } })
.mockResolvedValueOnce({
queuedFinal: false,
counts: { final: 0 },
noVisibleReplyFallbackEligible: true,
});
const ensureNoVisibleReplyFallback = vi.fn();
mockCreateFeishuReplyDispatcher.mockReturnValueOnce({
dispatcher: {
sendToolResult: vi.fn(),
sendBlockReply: vi.fn(),
sendFinalReply: vi.fn(),
waitForIdle: vi.fn(),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback,
});
const cfg = createBroadcastConfig();
const event = createBroadcastEvent({
messageId: "msg-broadcast-zero-final",
text: "hello @bot",
botMentioned: true,
});
await handleFeishuMessage({
cfg,
event,
botOpenId: "bot-open-id",
runtime: createRuntimeEnv(),
});
expect(ensureNoVisibleReplyFallback).toHaveBeenCalledWith(
"broadcast-dispatch-complete-no-visible-reply",
);
});
it("sends no-visible-reply fallback for active broadcast failed final delivery", async () => {
mockDispatchReplyFromConfig
.mockResolvedValueOnce({ queuedFinal: false, counts: { final: 1 } })
.mockResolvedValueOnce({
queuedFinal: true,
counts: { final: 1 },
});
const ensureNoVisibleReplyFallback = vi.fn();
mockCreateFeishuReplyDispatcher.mockReturnValueOnce({
dispatcher: {
sendToolResult: vi.fn(),
sendBlockReply: vi.fn(),
sendFinalReply: vi.fn(),
waitForIdle: vi.fn(),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 1 })),
markComplete: vi.fn(),
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback,
});
const cfg = createBroadcastConfig();
const event = createBroadcastEvent({
messageId: "msg-broadcast-final-failed",
text: "hello @bot",
botMentioned: true,
});
await handleFeishuMessage({
cfg,
event,
botOpenId: "bot-open-id",
runtime: createRuntimeEnv(),
});
expect(ensureNoVisibleReplyFallback).toHaveBeenCalledWith(
"broadcast-dispatch-complete-no-visible-reply",
);
});
it("skips no-visible-reply fallback for source-suppressed active broadcast dispatch", async () => {
mockDispatchReplyFromConfig
.mockResolvedValueOnce({ queuedFinal: false, counts: { final: 1 } })
.mockResolvedValueOnce({
queuedFinal: false,
counts: { final: 0 },
sourceReplyDeliveryMode: "message_tool_only",
noVisibleReplyFallbackEligible: true,
});
const ensureNoVisibleReplyFallback = vi.fn();
mockCreateFeishuReplyDispatcher.mockReturnValueOnce({
dispatcher: {
sendToolResult: vi.fn(),
sendBlockReply: vi.fn(),
sendFinalReply: vi.fn(),
waitForIdle: vi.fn(),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback,
});
const cfg = createBroadcastConfig();
const event = createBroadcastEvent({
messageId: "msg-broadcast-source-suppressed",
text: "hello @bot",
botMentioned: true,
});
await handleFeishuMessage({
cfg,
event,
botOpenId: "bot-open-id",
runtime: createRuntimeEnv(),
});
expect(ensureNoVisibleReplyFallback).not.toHaveBeenCalled();
});
it("skips broadcast dispatch when bot is NOT mentioned (requireMention=true)", async () => {
const cfg = createBroadcastConfig();
const event = createBroadcastEvent({

View File

@@ -210,6 +210,7 @@ function createFeishuBotRuntime(overrides: DeepPartial<PluginRuntime> = {}): Plu
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
return {
dispatched: true,
dispatchResult: await turn.runDispatch(),
};
}),
@@ -294,6 +295,7 @@ const {
dispatcher: createReplyDispatcher(),
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback: vi.fn(),
})),
mockSendMessageFeishu: vi.fn().mockResolvedValue({ messageId: "pairing-msg", chatId: "oc-dm" }),
mockGetMessageFeishu: vi.fn().mockResolvedValue(null),
@@ -464,6 +466,7 @@ describe("handleFeishuMessage ACP routing", () => {
dispatcher: createReplyDispatcher(),
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback: vi.fn(),
});
setFeishuRuntime(createFeishuBotRuntime());
@@ -1046,6 +1049,90 @@ describe("handleFeishuMessage command authorization", () => {
expect(mockEnqueueSystemEvent).not.toHaveBeenCalled();
});
it("does not send no-visible fallback when send policy denied delivery", async () => {
mockDispatchReplyFromConfig.mockResolvedValueOnce({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
sendPolicyDenied: true,
noVisibleReplyFallbackEligible: true,
});
const ensureNoVisibleReplyFallback = vi.fn();
mockCreateFeishuReplyDispatcher.mockReturnValueOnce({
dispatcher: createReplyDispatcher(),
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback,
});
await dispatchMessage({
cfg: {
channels: {
feishu: {
dmPolicy: "open",
},
},
} as ClawdbotConfig,
event: {
sender: {
sender_id: {
open_id: "ou-sender",
},
},
message: {
message_id: "msg-send-policy-deny",
chat_id: "oc-dm",
chat_type: "p2p",
message_type: "text",
content: JSON.stringify({ text: "hello" }),
},
},
});
expect(ensureNoVisibleReplyFallback).not.toHaveBeenCalled();
});
it("sends no-visible fallback when queued final delivery fails", async () => {
mockDispatchReplyFromConfig.mockResolvedValueOnce({
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
});
const ensureNoVisibleReplyFallback = vi.fn();
const dispatcher = createReplyDispatcher();
vi.mocked(dispatcher.getFailedCounts).mockReturnValue({ tool: 0, block: 0, final: 1 });
mockCreateFeishuReplyDispatcher.mockReturnValueOnce({
dispatcher,
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback,
});
await dispatchMessage({
cfg: {
channels: {
feishu: {
dmPolicy: "open",
},
},
} as ClawdbotConfig,
event: {
sender: {
sender_id: {
open_id: "ou-sender",
},
},
message: {
message_id: "msg-final-delivery-failed",
chat_id: "oc-dm",
chat_type: "p2p",
message_type: "text",
content: JSON.stringify({ text: "hello" }),
},
},
});
expect(ensureNoVisibleReplyFallback).toHaveBeenCalledWith("dispatch-complete-no-visible-reply");
});
it("passes disabled config-write policy to dynamic agent creation", async () => {
mockShouldComputeCommandAuthorized.mockReturnValue(false);

View File

@@ -88,6 +88,28 @@ const GROUP_NAME_CACHE_MAX_SIZE = 500; // hard cap
type FeishuGroupSessionScope = "group" | "group_sender" | "group_topic" | "group_topic_sender";
function shouldSendNoVisibleReplyFallback(dispatchResult: {
counts: { final?: number };
failedCounts?: { final?: number };
noVisibleReplyFallbackEligible?: boolean;
queuedFinal?: boolean;
sendPolicyDenied?: boolean;
sourceReplyDeliveryMode?: string;
}): boolean {
const finalCount = dispatchResult.counts.final ?? 0;
const failedFinalCount = dispatchResult.failedCounts?.final ?? 0;
const emptyEligibleDispatch =
dispatchResult.noVisibleReplyFallbackEligible === true &&
dispatchResult.queuedFinal !== true &&
finalCount === 0;
const queuedFinalFailed = dispatchResult.queuedFinal === true && failedFinalCount > 0;
return (
dispatchResult.sendPolicyDenied !== true &&
dispatchResult.sourceReplyDeliveryMode !== "message_tool_only" &&
(emptyEligibleDispatch || queuedFinalFailed)
);
}
function resolveConfiguredFeishuGroupSessionScope(params: {
groupConfig?: {
groupSessionScope?: FeishuGroupSessionScope;
@@ -1487,26 +1509,28 @@ export async function handleFeishuMessage(params: {
if (agentId === activeAgentId) {
// Active agent: real Feishu dispatcher (responds on Feishu)
const identity = resolveAgentOutboundIdentity(cfg, agentId);
const { dispatcher, replyOptions, markDispatchIdle } = createFeishuReplyDispatcher({
cfg,
agentId,
runtime: runtime as RuntimeEnv,
chatId: ctx.chatId,
allowReasoningPreview,
replyToMessageId: replyTargetMessageId,
skipReplyToInMessages: !isGroup,
replyInThread,
rootId: ctx.rootId,
threadReply,
accountId: account.accountId,
identity,
messageCreateTimeMs,
});
const { dispatcher, replyOptions, markDispatchIdle, ensureNoVisibleReplyFallback } =
createFeishuReplyDispatcher({
cfg,
agentId,
runtime: runtime as RuntimeEnv,
chatId: ctx.chatId,
allowReasoningPreview,
replyToMessageId: replyTargetMessageId,
skipReplyToInMessages: !isGroup,
replyInThread,
rootId: ctx.rootId,
threadReply,
accountId: account.accountId,
identity,
messageCreateTimeMs,
sessionKey: agentSessionKey,
});
log(
`feishu[${account.accountId}]: broadcast active dispatch agent=${agentId} (session=${agentSessionKey})`,
);
await core.channel.inbound.run({
const turnResult = await core.channel.inbound.run({
channel: "feishu",
accountId: route.accountId,
raw: ctx,
@@ -1547,6 +1571,15 @@ export async function handleFeishuMessage(params: {
}),
},
});
if (
turnResult.dispatched &&
shouldSendNoVisibleReplyFallback({
...turnResult.dispatchResult,
failedCounts: dispatcher.getFailedCounts(),
})
) {
await ensureNoVisibleReplyFallback("broadcast-dispatch-complete-no-visible-reply");
}
} else {
// Observer agent: no-op dispatcher (session entry + inference, no Feishu reply).
// Strip CommandAuthorized so slash commands (e.g. /reset) don't silently
@@ -1652,21 +1685,23 @@ export async function handleFeishuMessage(params: {
storePath,
sessionKey: route.sessionKey,
});
const { dispatcher, replyOptions, markDispatchIdle } = createFeishuReplyDispatcher({
cfg,
agentId: route.agentId,
runtime: runtime as RuntimeEnv,
chatId: ctx.chatId,
allowReasoningPreview,
replyToMessageId: replyTargetMessageId,
skipReplyToInMessages: !isGroup,
replyInThread,
rootId: ctx.rootId,
threadReply,
accountId: account.accountId,
identity,
messageCreateTimeMs,
});
const { dispatcher, replyOptions, markDispatchIdle, ensureNoVisibleReplyFallback } =
createFeishuReplyDispatcher({
cfg,
agentId: route.agentId,
runtime: runtime as RuntimeEnv,
chatId: ctx.chatId,
allowReasoningPreview,
replyToMessageId: replyTargetMessageId,
skipReplyToInMessages: !isGroup,
replyInThread,
rootId: ctx.rootId,
threadReply,
accountId: account.accountId,
identity,
messageCreateTimeMs,
sessionKey: route.sessionKey,
});
log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`);
const turnResult = await core.channel.inbound.run({
@@ -1733,6 +1768,14 @@ export async function handleFeishuMessage(params: {
}
const { dispatchResult } = turnResult;
const { queuedFinal, counts } = dispatchResult;
if (
shouldSendNoVisibleReplyFallback({
...dispatchResult,
failedCounts: dispatcher.getFailedCounts(),
})
) {
await ensureNoVisibleReplyFallback("dispatch-complete-no-visible-reply");
}
log(
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`,

View File

@@ -19,11 +19,13 @@ type DispatchReplyContext = Record<string, unknown> & {
};
type DispatchReplyDispatcher = {
sendFinalReply: (payload: { text: string }) => unknown;
getFailedCounts?: UnknownMock;
};
type FeishuReplyDispatcherMockValue = {
dispatcher: DispatchReplyDispatcher;
replyOptions: Record<string, never>;
markDispatchIdle: () => unknown;
ensureNoVisibleReplyFallback?: AsyncUnknownMock;
};
type CreateFeishuReplyDispatcherMock = Mock<(params?: unknown) => FeishuReplyDispatcherMockValue>;
type DispatchReplyFromConfigMock = Mock<

View File

@@ -82,8 +82,9 @@ vi.mock("./streaming-card.js", () => {
this.active = true;
});
update = vi.fn(async () => {});
close = vi.fn(async () => {
close = vi.fn(async (text?: string) => {
this.active = false;
return Boolean(text?.trim());
});
discard = vi.fn(async () => {
this.active = false;
@@ -119,7 +120,10 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
type TypingDispatcherOptions = {
onReplyStart?: () => Promise<void> | void;
onIdle?: () => Promise<void> | void;
deliver: (payload: { text: string }, meta: { kind: string }) => Promise<void> | void;
deliver: (
payload: { text?: string; mediaUrl?: string; mediaUrls?: string[]; audioAsVoice?: boolean },
meta: { kind: string },
) => Promise<void> | void;
};
beforeEach(() => {
@@ -1574,6 +1578,221 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("sends a no-visible-reply fallback when no visible output was delivered", async () => {
const runtime = createRuntimeLogger();
const { result } = createDispatcherHarness({ runtime });
await expect(result.ensureNoVisibleReplyFallback("empty-complete")).resolves.toBe(true);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expect(String(firstMockArg(sendMessageFeishuMock, "send message params").text)).toContain(
"without visible content",
);
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("does not send no-visible-reply fallback after an intentional silent final", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime, sessionKey: "main" });
options.onSkip?.({ text: "NO_REPLY" }, { kind: "final", reason: "silent" });
await expect(result.ensureNoVisibleReplyFallback("empty-complete")).resolves.toBe(false);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: false,
skippedFinalReason: "silent",
});
});
it("sends no-visible-reply fallback when a final fails after an earlier silent skip", async () => {
useNonStreamingAutoAccount();
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime, sessionKey: "main" });
options.onSkip?.({ text: "NO_REPLY" }, { kind: "final", reason: "silent" });
sendMessageFeishuMock.mockRejectedValueOnce(new Error("send failed"));
await expect(
options.deliver({ text: "Later visible final" }, { kind: "final" }),
).rejects.toThrow("send failed");
await expect(result.ensureNoVisibleReplyFallback("failed-final")).resolves.toBe(true);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(2);
expect(String(firstMockArg(sendMessageFeishuMock, "send message params").text)).toBe(
"Later visible final",
);
expect(String(sendMessageFeishuMock.mock.calls[1]?.[0]?.text)).toContain(
"without visible content",
);
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("does not send no-visible-reply fallback after visible streaming close", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.deliver({ text: "```md\nvisible answer\n```" }, { kind: "final" });
await options.onIdle?.();
await expect(result.ensureNoVisibleReplyFallback("zero-final-count")).resolves.toBe(false);
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("sends no-visible-reply fallback when streaming close accepts no content", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.deliver({ text: "```md\nvisible answer\n```" }, { kind: "final" });
streamingInstances[0].close = vi.fn(async () => {
streamingInstances[0].active = false;
return false;
});
await options.onIdle?.();
await expect(result.ensureNoVisibleReplyFallback("zero-final-count")).resolves.toBe(true);
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\nvisible answer\n```", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expect(String(firstMockArg(sendMessageFeishuMock, "send message params").text)).toContain(
"without visible content",
);
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("waits for pending streaming close before no-visible-reply fallback", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.deliver({ text: "```md\nvisible answer\n```" }, { kind: "final" });
const streamingSession = streamingInstances[0];
let releaseClose: () => void = () => {};
const closeMock = vi.fn(async () => {
await new Promise<void>((resolve) => {
releaseClose = resolve;
});
streamingSession.active = false;
return true;
});
streamingSession.close = closeMock;
const idlePromise = options.onIdle?.();
const fallbackPromise = result.ensureNoVisibleReplyFallback("zero-final-count");
for (let attempt = 0; attempt < 20 && closeMock.mock.calls.length === 0; attempt += 1) {
await new Promise((resolve) => setTimeout(resolve, 0));
}
expect(closeMock).toHaveBeenCalledTimes(1);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
releaseClose();
await idlePromise;
await expect(fallbackPromise).resolves.toBe(false);
expect(closeMock).toHaveBeenCalledWith("```md\nvisible answer\n```", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("does not send no-visible-reply fallback after media-only output", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "block" });
await expect(result.ensureNoVisibleReplyFallback("zero-final-count")).resolves.toBe(false);
expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("sends no-visible-reply fallback after an empty card streaming close", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.onReplyStart?.();
await options.onIdle?.();
await expect(result.ensureNoVisibleReplyFallback("zero-final-count")).resolves.toBe(true);
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("", { note: "Agent: agent" });
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("resets no-visible-reply state on the first reply start", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
options.onSkip?.({ text: "NO_REPLY" }, { kind: "final", reason: "silent" });
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: false,
skippedFinalReason: "silent",
});
await options.onReplyStart?.();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: false,
skippedFinalReason: null,
});
});
it("keeps visible reply state across repeated reply-start keepalives", async () => {
const runtime = createRuntimeLogger();
const { result, options } = createDispatcherHarness({ runtime });
await options.onReplyStart?.();
await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "block" });
await options.onReplyStart?.();
await expect(result.ensureNoVisibleReplyFallback("zero-final-count")).resolves.toBe(false);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(result.getVisibleReplyState()).toEqual({
visibleReplySent: true,
skippedFinalReason: null,
});
});
it("cleans streaming state even when close throws", async () => {
const origPush = streamingInstances.push.bind(streamingInstances);
streamingInstances.push = (...args: StreamingSessionStub[]) => {

View File

@@ -37,6 +37,8 @@ function shouldUseCard(text: string): boolean {
const TYPING_INDICATOR_MAX_AGE_MS = 2 * 60_000;
const MS_EPOCH_MIN = 1_000_000_000_000;
const STREAMING_START_FAILURE_BACKOFF_MS = 60_000;
const NO_VISIBLE_REPLY_FALLBACK_TEXT =
"⚠️ This reply completed without visible content. The turn may have been interrupted; please retry or ask me to recover from recent context.";
const streamingStartBackoffUntilByAccount = new Map<string, number>();
function isStreamingStartBackedOff(accountId: string, now = Date.now()): boolean {
@@ -128,6 +130,7 @@ type CreateFeishuReplyDispatcherParams = {
/** Epoch ms when the inbound message was created. Used to suppress typing
* indicators on old/replayed messages after context compaction (#30418). */
messageCreateTimeMs?: number;
sessionKey?: string;
};
export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherParams) {
@@ -244,8 +247,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
let streamingStartPromise: Promise<void> | null = null;
let streamingClosedForReply = false;
let streamingCloseErroredForReply = false;
let visibleReplySent = false;
let skippedFinalReason: string | null = null;
let idleSideEffectsPromise: Promise<void> = Promise.resolve();
let replyLifecycleStateInitialized = false;
type StreamTextUpdateMode = "snapshot" | "delta";
const markVisibleReplySent = () => {
visibleReplySent = true;
};
const formatReasoningPrefix = (thinking: string): string => {
if (!thinking) {
return "";
@@ -397,11 +408,14 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
statusLine = "";
const text = buildCombinedStreamText(reasoningText, streamText);
const finalNote = resolveCardNote(agentId, identity, prefixContext.prefixContext);
await streaming.close(text, { note: finalNote });
const contentVisible = await streaming.close(text, { note: finalNote });
// Track the raw streamed text so the duplicate-final check in deliver()
// can skip the redundant text delivery that arrives after onIdle closes
// the streaming card.
if (streamText) {
if (contentVisible) {
markVisibleReplySent();
}
if (contentVisible && streamText) {
deliveredFinalTexts.add(streamText);
if (options?.markClosedForReply !== false && !streamingCloseErroredForReply) {
streamingClosedForReply = true;
@@ -458,6 +472,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
chunk,
isFirst: index === 0,
});
markVisibleReplySent();
}
if (paramsLocal.infoKind === "final") {
deliveredFinalTexts.add(paramsLocal.text);
@@ -480,6 +495,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
accountId,
...(payload.audioAsVoice === true ? { audioAsVoice: true } : {}),
});
markVisibleReplySent();
if (result?.voiceIntentDegradedToFile && options?.fallbackText && !sentFallbackText) {
sentFallbackText = true;
await sendChunkedTextReply({
@@ -529,21 +545,76 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
});
};
const ensureNoVisibleReplyFallback = async (reason: string): Promise<boolean> => {
await idleSideEffectsPromise;
if (visibleReplySent) {
return false;
}
if (skippedFinalReason === "silent") {
params.runtime.log?.(
`feishu[${account.accountId}]: no-visible-reply fallback skipped for intentional silence (${reason})`,
);
return false;
}
await sendMessageFeishu({
cfg,
to: chatId,
text: NO_VISIBLE_REPLY_FALLBACK_TEXT,
replyToMessageId: sendReplyToMessageId,
replyInThread: effectiveReplyInThread,
allowTopLevelReplyFallback,
accountId,
});
markVisibleReplySent();
params.runtime.error?.(
`feishu[${account.accountId}]: sent no-visible-reply fallback (${reason})`,
);
return true;
};
const queueIdleSideEffects = (options?: { markClosedForReply?: boolean }): Promise<void> => {
const nextIdleSideEffects = idleSideEffectsPromise.then(async () => {
await closeStreaming(options);
await typingCallbacks?.onIdle?.();
});
idleSideEffectsPromise = nextIdleSideEffects.catch(() => {});
return nextIdleSideEffects;
};
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: prefixContext.responsePrefix,
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
silentReplyContext: {
cfg,
sessionKey: params.sessionKey,
surface: "feishu",
conversationType: chatId.startsWith("oc_") ? "group" : "direct",
},
onSkip: (_payload, info) => {
if (info.kind === "final") {
skippedFinalReason = info.reason;
}
},
onReplyStart: async () => {
deliveredFinalTexts.clear();
streamingClosedForReply = false;
streamingCloseErroredForReply = false;
if (!replyLifecycleStateInitialized) {
replyLifecycleStateInitialized = true;
deliveredFinalTexts.clear();
streamingClosedForReply = false;
streamingCloseErroredForReply = false;
visibleReplySent = false;
skippedFinalReason = null;
}
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
await typingCallbacks?.onReplyStart?.();
},
deliver: async (payload: ReplyPayload, info) => {
if (info?.kind === "final") {
skippedFinalReason = null;
}
const payloadText =
payload.isReasoning && payload.text ? formatReasoningMessage(payload.text) : payload.text;
const reply = resolveSendableOutboundReplyParts({ ...payload, text: payloadText });
@@ -686,13 +757,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
params.runtime.error?.(
`feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`,
);
await closeStreaming({ markClosedForReply: false });
typingCallbacks?.onIdle?.();
},
onIdle: async () => {
await closeStreaming();
typingCallbacks?.onIdle?.();
await queueIdleSideEffects({ markClosedForReply: false });
},
onIdle: () => queueIdleSideEffects(),
onCleanup: () => {
typingCallbacks?.onCleanup?.();
},
@@ -778,5 +845,10 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
: undefined,
},
markDispatchIdle,
ensureNoVisibleReplyFallback,
getVisibleReplyState: () => ({
visibleReplySent,
skippedFinalReason,
}),
};
}

View File

@@ -383,6 +383,43 @@ describe("FeishuStreamingSession", () => {
);
});
it("reports no visible content when final close update fails before any accepted text", async () => {
vi.useFakeTimers();
vi.setSystemTime(4_800);
const updateBodies: string[] = [];
const replaceBodies: string[] = [];
mockFetches(updateBodies, new Set<number>(), replaceBodies, new Map([[0, 500]]));
const log = vi.fn();
const session = new FeishuStreamingSession(
{} as never,
{
appId: "app_final_update_non_ok",
appSecret: "secret",
},
log,
);
setStreamingSessionInternals(session, {
state: {
cardId: "card_7",
messageId: "om_7",
sequence: 1,
currentText: "",
sentText: "",
hasNote: false,
},
lastUpdateTime: 3_000,
});
await expect(session.close("final answer")).resolves.toBe(false);
expect(updateBodies).toHaveLength(1);
expect(replaceBodies).toHaveLength(0);
expect(log).toHaveBeenCalledWith(
"Final update failed: Error: Update card content failed with HTTP 500",
);
});
it("bounds streaming token cache lifetime when token expiry overflows", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-29T12:00:00.000Z"));

View File

@@ -523,9 +523,9 @@ export class FeishuStreamingSession {
.catch((e) => this.log?.(`Note update failed: ${String(e)}`));
}
async close(finalText?: string, options?: { note?: string }): Promise<void> {
async close(finalText?: string, options?: { note?: string }): Promise<boolean> {
if (!this.state || this.closed) {
return;
return false;
}
this.closed = true;
this.clearFlushTimer();
@@ -534,6 +534,7 @@ export class FeishuStreamingSession {
const pendingMerged = mergeStreamingText(this.state.currentText, this.pendingText ?? undefined);
const text = finalText ?? pendingMerged;
const apiBase = resolveApiBase(this.creds.domain);
let visibleContentSent = Boolean(this.state.sentText.trim());
// Only send final update if content differs from what's already displayed.
// An explicit empty final text clears a transient preview before closeout.
@@ -549,6 +550,7 @@ export class FeishuStreamingSession {
this.state.currentText = text;
if (sent) {
this.state.sentText = text;
visibleContentSent = Boolean(text.trim());
}
}
@@ -588,6 +590,7 @@ export class FeishuStreamingSession {
this.pendingText = null;
this.log?.(`Closed streaming: cardId=${finalState.cardId}`);
return visibleContentSent;
}
async discard(): Promise<void> {

View File

@@ -37,10 +37,12 @@ type FeishuLifecycleReplyDispatcher = {
sendFinalReply: AsyncUnknownMock;
waitForIdle: AsyncUnknownMock;
getQueuedCounts: UnknownMock;
getFailedCounts: UnknownMock;
markComplete: UnknownMock;
};
replyOptions: Record<string, never>;
markDispatchIdle: UnknownMock;
ensureNoVisibleReplyFallback: AsyncUnknownMock;
};
export function setFeishuLifecycleStateDir(prefix: string) {
@@ -69,10 +71,12 @@ export function createFeishuLifecycleReplyDispatcher(): FeishuLifecycleReplyDisp
sendFinalReply: vi.fn(async () => true),
waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
},
replyOptions: {},
markDispatchIdle: vi.fn(),
ensureNoVisibleReplyFallback: vi.fn(async () => false),
};
}

View File

@@ -170,6 +170,8 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => {
expect(result).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
sendPolicyDenied: true,
noVisibleReplyFallbackEligible: true,
});
});

View File

@@ -6912,6 +6912,61 @@ describe("sendPolicy deny — suppress delivery, not processing (#53328)", () =>
// Delivery MUST be suppressed
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(result.queuedFinal).toBe(false);
expect(result.sendPolicyDenied).toBe(true);
});
it("does not mark allowed group silence eligible for no-visible fallback", async () => {
setNoAbort();
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => undefined);
const ctx = buildTestCtx({
ChatType: "group",
Surface: "feishu",
Provider: "feishu",
SessionKey: "agent:main:feishu:group:oc_group",
});
const result = await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
});
it("marks disallowed group silence eligible for no-visible fallback", async () => {
setNoAbort();
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => undefined);
const ctx = buildTestCtx({
ChatType: "group",
Surface: "feishu",
Provider: "feishu",
SessionKey: "agent:main:feishu:group:oc_group",
});
const result = await dispatchReplyFromConfig({
ctx,
cfg: {
agents: {
defaults: {
silentReply: {
group: "disallow",
},
},
},
} as OpenClawConfig,
dispatcher,
replyResolver,
});
expect(result).toEqual({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
noVisibleReplyFallbackEligible: true,
});
});
it("suppresses tool result delivery when sendPolicy is deny", async () => {

View File

@@ -83,6 +83,7 @@ import type { PluginHookReplyDispatchEvent } from "../../plugins/hook-types.js";
import { isAcpSessionKey } from "../../routing/session-key.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { resolveSilentReplyPolicyFromPolicies } from "../../shared/silent-reply-policy.js";
import { createTtsDirectiveTextStreamCleaner } from "../../tts/directives.js";
import {
normalizeTtsAutoMode,
@@ -1600,6 +1601,17 @@ export async function dispatchReplyFromConfig(
agentId: sessionAgentId,
});
const chatType = normalizeChatType(ctx.ChatType);
const silentReplyConversationType = resolveRoutedPolicyConversationType(ctx);
const silentReplySurface = normalizeLowercaseStringOrEmpty(ctx.Surface ?? ctx.Provider);
const emptyFinalAllowedAsSilent =
silentReplyConversationType !== undefined &&
resolveSilentReplyPolicyFromPolicies({
conversationType: silentReplyConversationType,
defaultPolicy: cfg.agents?.defaults?.silentReply,
surfacePolicy: silentReplySurface
? cfg.surfaces?.[silentReplySurface]?.silentReply
: undefined,
}) === "allow";
const configuredVisibleReplies =
chatType === "group" || chatType === "channel"
? (cfg.messages?.groupChat?.visibleReplies ?? cfg.messages?.visibleReplies)
@@ -1701,8 +1713,12 @@ export async function dispatchReplyFromConfig(
const attachSourceReplyDeliveryMode = (
result: DispatchFromConfigResult,
): DispatchFromConfigResult =>
sourceReplyDeliveryMode === "message_tool_only"
? { ...result, sourceReplyDeliveryMode }
sourceReplyDeliveryMode === "message_tool_only" || sendPolicyDenied
? {
...result,
...(sourceReplyDeliveryMode === "message_tool_only" ? { sourceReplyDeliveryMode } : {}),
...(sendPolicyDenied ? { sendPolicyDenied: true } : {}),
}
: result;
const explicitCommandTurnCtx = isExplicitSourceReplyCommand(ctx, cfg);
const unauthorizedTextSlashSourceReplyCtx =
@@ -2939,6 +2955,9 @@ export async function dispatchReplyFromConfig(
return attachSourceReplyDeliveryMode({
queuedFinal,
counts,
...(!queuedFinal && !emptyFinalAllowedAsSilent
? { noVisibleReplyFallbackEligible: true }
: {}),
...(beforeAgentRunBlocked ? { beforeAgentRunBlocked } : {}),
});
} catch (err) {

View File

@@ -10,6 +10,8 @@ export type DispatchFromConfigResult = {
counts: Record<ReplyDispatchKind, number>;
failedCounts?: Partial<Record<ReplyDispatchKind, number>>;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
sendPolicyDenied?: boolean;
noVisibleReplyFallbackEligible?: boolean;
beforeAgentRunBlocked?: boolean;
};