Compare commits

..

1 Commits

Author SHA1 Message Date
Keshav's Bot
0dd656bdb3 fix(telegram): refine typing and progress drafts 2026-05-26 19:02:05 +01:00
26 changed files with 853 additions and 2091 deletions

View File

@@ -29,7 +29,6 @@ Docs: https://docs.openclaw.ai
- CI: bound Docker/Bash E2E tarball npm installs with `OPENCLAW_E2E_NPM_INSTALL_TIMEOUT` so package, onboarding, plugin, and upgrade lanes fail instead of hanging on a stuck npm install.
- CI: keep `OPENCLAW_TESTBOX=1 pnpm check:changed` delegating to Blacksmith Testbox through Crabbox without forwarding local Testbox or worker env into the remote command.
- CI: send KILL after the TERM grace period for manual checkout fetch timeouts so stuck Testbox and workflow checkout retries cannot hang behind a wedged `git fetch`.
- CI: send KILL after the TERM grace period for Bun global install smoke command timeouts so trapped `openclaw` child processes cannot wedge the scheduled install smoke.
- iMessage: thread current channel/account inbound attachment roots into the image tool so iMessage-saved attachments under `~/Library/Messages/Attachments` (including the wildcard `/Users/*/Library/Messages/Attachments` root) are read through the existing inbound path policy instead of being rejected as `path-not-allowed`. Literal `localRoots` stays workspace-scoped. Fixes #30170. (#86569)
- QQ Bot: respect `OPENCLAW_HOME` for outbound media path resolution so `<qqmedia>` sends no longer silently fail when `HOME` and `OPENCLAW_HOME` differ (Docker / multi-user hosts). Persisted QQ Bot data (sessions, known users, refs) stays anchored on the OS home for upgrade compatibility. Fixes #83562. Thanks @sliverp.
- Update: report the primary malformed `openclaw.extensions` payload error without adding a duplicate missing-main diagnostic. (#86596) Thanks @ferminquant.

View File

@@ -24,10 +24,12 @@ type BuildTelegramMessageContextForTestParams = {
options?: BuildTelegramMessageContextParams["options"];
cfg?: Record<string, unknown>;
accountId?: string;
dmPolicy?: BuildTelegramMessageContextParams["dmPolicy"];
historyLimit?: number;
groupHistories?: Map<string, import("openclaw/plugin-sdk/reply-history").HistoryEntry[]>;
ackReactionScope?: BuildTelegramMessageContextParams["ackReactionScope"];
botApi?: Record<string, unknown>;
sendChatActionHandler?: BuildTelegramMessageContextParams["sendChatActionHandler"];
runtime?: BuildTelegramMessageContextParams["runtime"];
sessionRuntime?: BuildTelegramMessageContextParams["sessionRuntime"] | null;
resolveGroupActivation?: BuildTelegramMessageContextParams["resolveGroupActivation"];
@@ -127,7 +129,7 @@ export async function buildTelegramMessageContextForTest(
account: { accountId: params.accountId ?? "default" } as never,
historyLimit: params.historyLimit ?? 0,
groupHistories: params.groupHistories ?? new Map(),
dmPolicy: "open",
dmPolicy: params.dmPolicy ?? "open",
allowFrom: ["*"],
groupAllowFrom: [],
ackReactionScope: params.ackReactionScope ?? "off",
@@ -140,7 +142,7 @@ export async function buildTelegramMessageContextForTest(
groupConfig: { requireMention: false },
topicConfig: undefined,
})),
sendChatActionHandler: { sendChatAction: vi.fn() } as never,
sendChatActionHandler: params.sendChatActionHandler ?? ({ sendChatAction: vi.fn() } as never),
});
}

View File

@@ -109,6 +109,7 @@ export type TelegramMessageContext = {
sendTyping: () => Promise<void>;
sendRecordVoice: () => Promise<void>;
sendChatActionHandler: BuildTelegramMessageContextParams["sendChatActionHandler"];
initialTypingCueSent?: boolean;
ackReactionPromise: Promise<boolean> | null;
reactionApi: TelegramReactionApi | null;
removeAckAfterReply: boolean;
@@ -368,6 +369,7 @@ export const buildTelegramMessageContext = async ({
) {
return null;
}
let initialTypingCueSent = false;
const ensureConfiguredBindingReady = async (): Promise<boolean> => {
if (!configuredBinding) {
return true;
@@ -480,6 +482,15 @@ export const buildTelegramMessageContext = async ({
return null;
}
// Direct chats are now reply-eligible; send the first typing cue before
// expensive context/session construction without showing typing for dropped turns.
if (!isGroup) {
initialTypingCueSent = true;
void sendTyping().catch((err) => {
logVerbose(`telegram early direct typing cue failed for chat ${chatId}: ${String(err)}`);
});
}
const { ctxPayload, skillFilter, turn } = await buildTelegramInboundContextPayload({
cfg,
primaryCtx,
@@ -643,6 +654,7 @@ export const buildTelegramMessageContext = async ({
sendTyping,
sendRecordVoice,
sendChatActionHandler,
initialTypingCueSent,
ackReactionPromise,
reactionApi,
removeAckAfterReply,

View File

@@ -0,0 +1,80 @@
import { buildChannelInboundEventContext } from "openclaw/plugin-sdk/channel-inbound";
import { describe, expect, it, vi } from "vitest";
import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js";
import type { TelegramSendChatActionHandler } from "./sendchataction-401-backoff.js";
function createSendChatActionHandler(
sendChatAction = vi.fn(async () => undefined),
): TelegramSendChatActionHandler & { sendChatAction: typeof sendChatAction } {
return {
sendChatAction,
isSuspended: () => false,
reset: () => undefined,
};
}
describe("buildTelegramMessageContext typing", () => {
it("sends direct typing after body resolution and before session context construction", async () => {
const buildInboundContext = vi.fn(buildChannelInboundEventContext);
const sendChatActionHandler = createSendChatActionHandler();
await expect(
buildTelegramMessageContextForTest({
message: {
chat: { id: 42, type: "private", first_name: "Pat" },
from: { id: 42, first_name: "Pat" },
text: "hello",
},
sendChatActionHandler,
sessionRuntime: {
buildChannelInboundEventContext: buildInboundContext,
},
}),
).resolves.not.toBeNull();
expect(sendChatActionHandler.sendChatAction).toHaveBeenCalledWith(42, "typing", undefined);
expect(sendChatActionHandler.sendChatAction.mock.invocationCallOrder[0]).toBeLessThan(
buildInboundContext.mock.invocationCallOrder[0],
);
});
it("does not send direct typing when there is no replyable body", async () => {
const sendChatActionHandler = createSendChatActionHandler();
await expect(
buildTelegramMessageContextForTest({
message: {
chat: { id: 42, type: "private", first_name: "Pat" },
from: { id: 42, first_name: "Pat" },
text: undefined,
},
sendChatActionHandler,
}),
).resolves.toBeNull();
expect(sendChatActionHandler.sendChatAction).not.toHaveBeenCalled();
});
it("does not send early direct typing before DM access passes", async () => {
const sendChatActionHandler = createSendChatActionHandler();
await expect(
buildTelegramMessageContextForTest({
message: {
chat: { id: 42, type: "private", first_name: "Pat" },
from: { id: 42, first_name: "Pat" },
text: "hello",
},
cfg: {
agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/openclaw" } },
channels: { telegram: { dmPolicy: "disabled", allowFrom: [] } },
messages: { groupChat: { mentionPatterns: [] } },
},
dmPolicy: "disabled",
sendChatActionHandler,
}),
).resolves.toBeNull();
expect(sendChatActionHandler.sendChatAction).not.toHaveBeenCalled();
});
});

View File

@@ -1981,6 +1981,35 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("does not stream text-only tool results into progress drafts", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await dispatcherOptions.deliver(
{ text: "stdout line one\nstdout line two" },
{ kind: "tool" },
);
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
return { queuedFinal: false };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(answerDraftStream.update).not.toHaveBeenCalledWith(
expect.stringContaining("stdout line one"),
);
expect(answerDraftStream.update).toHaveBeenLastCalledWith(
"Shelling\n\n`🛠️ Exec`\n`🔎 Web Search: docs lookup`",
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("does not restart progress drafts after final answer delivery", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(

View File

@@ -1752,19 +1752,28 @@ export const dispatchTelegramMessage = async ({
reasoningStepState.noteReasoningHint();
}
if (segment.lane === "answer" && info.kind === "tool") {
if (
nativeToolProgressDraft &&
canUseNativeToolProgressDraft({
payload: effectivePayload,
reply,
buttons: telegramButtons,
})
) {
const canRepresentAsTransientProgress = canUseNativeToolProgressDraft({
payload: effectivePayload,
reply,
buttons: telegramButtons,
});
if (nativeToolProgressDraft && canRepresentAsTransientProgress) {
if (await pushStreamToolProgress(segment.update.text)) {
blockDelivered = true;
continue;
}
}
if (
canRepresentAsTransientProgress &&
streamMode === "progress" &&
answerLane.stream
) {
// Progress-mode streams render tool status in the
// live draft. Do not also emit text-only tool output
// as answer text, or simple commands duplicate and
// restart the progress draft.
continue;
}
await prepareAnswerLaneForToolProgress();
}
const result =

View File

@@ -160,7 +160,10 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
(options?.ingressBuffer ? ` buffer=${options.ingressBuffer}` : ""),
);
}
if (context.ctxPayload.InboundEventKind !== "room_event") {
if (
context.ctxPayload.InboundEventKind !== "room_event" &&
context.initialTypingCueSent !== true
) {
void context.sendTyping().catch((err) => {
logVerbose(`telegram early typing cue failed for chat ${context.chatId}: ${String(err)}`);
});

View File

@@ -1,6 +1,4 @@
import { spawn } from "node:child_process";
const DEFAULT_TIMEOUT_KILL_GRACE_MS = 30_000;
import { spawnSync } from "node:child_process";
const usage = () => {
console.error("Usage: assertions.mjs <run-with-timeout|assert-image-providers> [...]");
@@ -9,72 +7,16 @@ const usage = () => {
const [mode, ...args] = process.argv.slice(2);
const parsePositiveNumber = (value, label) => {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0) {
throw new Error(`${label} must be a positive number`);
if (mode === "run-with-timeout") {
const [timeoutMs, command, ...commandArgs] = args;
const timeout = Number(timeoutMs);
if (!Number.isFinite(timeout) || timeout <= 0 || !command) {
usage();
}
return parsed;
};
const signalChild = (child, signal) => {
if (!child.pid) {
return;
}
try {
if (process.platform === "win32") {
child.kill(signal);
return;
}
process.kill(-child.pid, signal);
} catch (error) {
if (error?.code !== "ESRCH") {
throw error;
}
}
};
const runWithTimeout = async (timeout, command, commandArgs) => {
const killGrace = parsePositiveNumber(
process.env.OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS ??
String(DEFAULT_TIMEOUT_KILL_GRACE_MS),
"OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS",
);
const child = spawn(command, commandArgs, {
detached: process.platform !== "win32",
env: process.env,
stdio: ["ignore", "pipe", "pipe"],
});
let timedOut = false;
let killTimer;
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (chunk) => process.stdout.write(chunk));
child.stderr.on("data", (chunk) => process.stderr.write(chunk));
const timeoutTimer = setTimeout(() => {
timedOut = true;
signalChild(child, "SIGTERM");
killTimer = setTimeout(() => signalChild(child, "SIGKILL"), killGrace);
killTimer.unref();
}, timeout);
timeoutTimer.unref();
let spawnError;
child.on("error", (error) => {
spawnError = error;
});
const result = await new Promise((resolve) => {
child.on("close", (status, signal) => resolve({ error: spawnError, signal, status }));
});
clearTimeout(timeoutTimer);
clearTimeout(killTimer);
if (timedOut) {
console.error(`command timed out after ${timeout}ms: ${command}`);
process.exit(1);
}
const result = spawnSync(command, commandArgs, { encoding: "utf8", env: process.env, timeout });
process.stdout.write(result.stdout ?? "");
process.stderr.write(result.stderr ?? "");
if (result.error) {
console.error(`command failed: ${command}: ${result.error.message}`);
process.exit(1);
@@ -84,20 +26,6 @@ const runWithTimeout = async (timeout, command, commandArgs) => {
process.exit(1);
}
process.exit(result.status ?? 0);
};
if (mode === "run-with-timeout") {
const [timeoutMs, command, ...commandArgs] = args;
if (!command) {
usage();
}
let timeout;
try {
timeout = parsePositiveNumber(timeoutMs, "timeoutMs");
} catch {
usage();
}
await runWithTimeout(timeout, command, commandArgs);
}
if (mode === "assert-image-providers") {

View File

@@ -20,7 +20,7 @@ import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
const LIVE = isLiveTestEnabled();
const REQUIRE_PROFILE_KEYS = isLiveProfileKeyModeEnabled();
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.4-mini";
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.1-codex-mini";
const TARGET_MODEL_REF =
process.env.OPENCLAW_LIVE_OPENAI_REASONING_COMPAT_MODEL?.trim() || DEFAULT_TARGET_MODEL_REF;
const describeLive = LIVE ? describe : describe.skip;

File diff suppressed because it is too large Load Diff

View File

@@ -240,7 +240,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("does not wait for direct block dispatcher delivery before resolving block delivery", async () => {
it("waits for direct block dispatcher delivery before resolving block delivery", async () => {
const delivered: unknown[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
@@ -276,68 +276,13 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
});
await deliveryStarted;
await Promise.resolve();
expect(delivered).toEqual([{ text: "hello" }]);
expect(deliverySettled).toBe(true);
expect(deliverySettled).toBe(false);
releaseDelivery?.();
await expect(deliveryPromise).resolves.toBe(true);
expect(deliverySettled).toBe(true);
await dispatcher.waitForIdle();
});
it("waits for pending direct block delivery before resolving tool delivery", async () => {
const delivered: unknown[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "visiblechat",
Surface: "visiblechat",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
shouldRouteToOriginating: false,
});
await expect(coordinator.deliver("block", { text: "hello" }, { skipTts: true })).resolves.toBe(
true,
);
await deliveryStarted;
let toolDeliverySettled = false;
const toolDeliveryPromise = coordinator
.deliver("tool", { text: "tool result" }, { skipTts: true })
.then((result) => {
toolDeliverySettled = true;
return result;
});
await Promise.resolve();
expect(delivered).toEqual([{ text: "hello" }]);
expect(toolDeliverySettled).toBe(false);
releaseDelivery?.();
await expect(toolDeliveryPromise).resolves.toBe(true);
expect(toolDeliverySettled).toBe(true);
expect(delivered).toEqual([{ text: "hello" }, { text: "tool result" }]);
});
it("stops waiting for direct block delivery when the ACP dispatch aborts", async () => {

View File

@@ -234,23 +234,11 @@ export function createAcpDispatchDeliveryCoordinator(params: {
},
toolMessageByCallId: new Map(),
};
let hasPendingDirectBlockReplyDelivery = false;
const waitForPendingDirectBlockReplyDelivery = async () => {
if (!hasPendingDirectBlockReplyDelivery) {
return;
}
// ACP direct block replies should not block the common visible-reply path.
// Defer the idle wait until a later tool delivery would otherwise overtake
// that block reply in user-visible ordering.
hasPendingDirectBlockReplyDelivery = false;
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
};
const settleDirectVisibleText = async () => {
if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) {
return;
}
state.settledDirectVisibleText = true;
hasPendingDirectBlockReplyDelivery = false;
await params.dispatcher.waitForIdle();
const failedCounts = params.dispatcher.getFailedCounts();
const failedVisibleCount = failedCounts.block + failedCounts.final;
@@ -452,10 +440,6 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return true;
}
if (kind === "tool") {
await waitForPendingDirectBlockReplyDelivery();
}
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
channel: directChannel,
kind,
@@ -478,7 +462,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
state.failedVisibleTextDelivery = true;
}
if (kind === "block" && delivered) {
hasPendingDirectBlockReplyDelivery = true;
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
}
return delivered;
};

View File

@@ -4738,78 +4738,6 @@ describe("dispatchReplyFromConfig", () => {
expect(replyResolver).not.toHaveBeenCalled();
});
it("does not run plugin-owned binding delivery when the caller already aborted", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "handled",
result: { handled: true, reply: { text: "should not send" } },
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-aborted-1",
targetSessionKey: "plugin-binding:codex:abc123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:1481858418548412579",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginRoot: "/workspace/openclaw-app-server",
data: {
kind: "codex-app-server-session",
version: 1,
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/workspace/openclaw",
},
},
} satisfies SessionBindingRecord);
const abortController = new AbortController();
abortController.abort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:1481858418548412579",
To: "discord:channel:1481858418548412579",
AccountId: "default",
SenderId: "user-9",
SenderUsername: "ada",
CommandAuthorized: true,
WasMentioned: false,
CommandBody: "who are you",
RawBody: "who are you",
Body: "who are you",
MessageSid: "msg-claim-plugin-aborted-1",
SessionKey: "agent:main:discord:channel:1481858418548412579",
});
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
const result = await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: { abortSignal: abortController.signal },
replyResolver,
});
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(sessionBindingMocks.touch).not.toHaveBeenCalled();
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
expect(replyResolver).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("lets authorized plugin-owned binding commands fall through to command processing", async () => {
setNoAbort();
expect(
@@ -5778,7 +5706,7 @@ describe("dispatchReplyFromConfig", () => {
expect(callOrder).toEqual(["queued:The answer is 42", "dispatch:The answer is 42"]);
});
it("does not wait for same-channel block dispatcher delivery before resolving block replies", async () => {
it("waits for same-channel block dispatcher delivery before resolving block replies", async () => {
setNoAbort();
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
@@ -5811,10 +5739,10 @@ describe("dispatchReplyFromConfig", () => {
await deliveryStarted;
expect(delivered).toEqual([{ text: "before tool" }]);
await blockReplyPromise;
expect(blockReplySettled).toBe(true);
expect(blockReplySettled).toBe(false);
releaseDelivery?.();
await blockReplyPromise;
return undefined;
};
@@ -5826,177 +5754,6 @@ describe("dispatchReplyFromConfig", () => {
});
expect(blockReplySettled).toBe(true);
await dispatcher.waitForIdle();
});
it("waits for pending same-channel block delivery before completing block-only dispatch", async () => {
setNoAbort();
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "only block" });
return undefined;
};
let dispatchSettled = false;
const dispatchPromise = dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
}).then((result) => {
dispatchSettled = true;
return result;
});
await deliveryStarted;
expect(delivered).toEqual([{ text: "only block" }]);
expect(dispatchSettled).toBe(false);
releaseDelivery?.();
await dispatchPromise;
expect(dispatchSettled).toBe(true);
});
it("waits for pending same-channel block delivery before forwarding tool progress", async () => {
setNoAbort();
const cfg = {
agents: { defaults: { verboseDefault: "on" } },
} as const satisfies OpenClawConfig;
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const onToolStart = vi.fn();
let toolProgressSettled = false;
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "before tool" });
const toolProgressPromise = Promise.resolve(opts?.onToolStart?.({ name: "lookup" })).then(
() => {
toolProgressSettled = true;
},
);
await deliveryStarted;
expect(delivered).toEqual([{ text: "before tool" }]);
expect(onToolStart).not.toHaveBeenCalled();
expect(toolProgressSettled).toBe(false);
releaseDelivery?.();
await toolProgressPromise;
return undefined;
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyResolver,
replyOptions: { onToolStart },
});
expect(toolProgressSettled).toBe(true);
expect(onToolStart).toHaveBeenCalledWith({ name: "lookup" });
});
it("does not synthesize tool-start capability while ordering item progress", async () => {
setNoAbort();
const cfg = {
agents: { defaults: { verboseDefault: "on" } },
} as const satisfies OpenClawConfig;
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const onItemEvent = vi.fn();
let itemProgressSettled = false;
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "before item" });
expect(opts?.onToolStart).toBeUndefined();
const itemProgressPromise = Promise.resolve(
opts?.onItemEvent?.({ itemId: "1", kind: "tool", progressText: "running" }),
).then(() => {
itemProgressSettled = true;
});
await deliveryStarted;
expect(delivered).toEqual([{ text: "before item" }]);
expect(onItemEvent).not.toHaveBeenCalled();
expect(itemProgressSettled).toBe(false);
releaseDelivery?.();
await itemProgressPromise;
return undefined;
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyResolver,
replyOptions: { onItemEvent },
});
expect(itemProgressSettled).toBe(true);
expect(onItemEvent).toHaveBeenCalledWith({
itemId: "1",
kind: "tool",
progressText: "running",
});
});
it("forwards payload metadata into onBlockReplyQueued context", async () => {

View File

@@ -61,7 +61,6 @@ import {
markDiagnosticSessionProgress,
} from "../../logging/diagnostic.js";
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { matchPluginCommand } from "../../plugins/commands.js";
import {
buildPluginBindingDeclinedText,
@@ -132,7 +131,6 @@ import { resolveOriginMessageProvider } from "./origin-routing.js";
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
import type { ReplyDispatcher } from "./reply-dispatcher.types.js";
import { replyRunRegistry, type ReplyOperation } from "./reply-run-registry.js";
import { isReplyProfilerEnabled } from "./reply-timing-tracker.js";
import { admitReplyTurn, resolveReplyTurnKind } from "./reply-turn-admission.js";
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
import { resolveReplyRoutingDecision } from "./routing-policy.js";
@@ -785,102 +783,6 @@ function createAbortAwareDispatcher(params: {
return dispatcher;
}
type ReplyHotPathTimingSpan = {
name: string;
durationMs: number;
elapsedMs: number;
};
type ReplyHotPathTimingSummary = {
totalMs: number;
spans: ReplyHotPathTimingSpan[];
};
const replyHotPathTimingLog = createSubsystemLogger("auto-reply/reply-timing");
const REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS = 1_000;
const REPLY_HOT_PATH_TIMING_WARN_STAGE_MS = 500;
function createReplyHotPathTimingTracker(options: { profilerEnabled?: boolean } = {}): {
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
logIfSlow: (params: {
channel: string;
messageId?: number | string;
sessionKey?: string;
outcome: "completed" | "skipped" | "error";
reason?: string;
}) => void;
} {
if (!options.profilerEnabled) {
// This slow-path splitter was added for latency investigation. Keep it
// inert in normal production dispatches so only explicit profiler runs pay
// the Date.now/span allocation cost.
return {
async measure(_name, run) {
return await run();
},
logIfSlow() {},
};
}
const startedAt = Date.now();
let didLog = false;
const spans: ReplyHotPathTimingSpan[] = [];
const toMs = (value: number) => Math.max(0, Math.round(value));
const snapshot = (): ReplyHotPathTimingSummary => ({
totalMs: toMs(Date.now() - startedAt),
spans: spans.slice(),
});
const shouldLog = (summary: ReplyHotPathTimingSummary) =>
summary.totalMs >= REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS ||
summary.spans.some((span) => span.durationMs >= REPLY_HOT_PATH_TIMING_WARN_STAGE_MS);
const formatSpans = (summary: ReplyHotPathTimingSummary) =>
summary.spans.length > 0
? summary.spans
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
.join(",")
: "none";
return {
async measure(name, run) {
const spanStartedAt = Date.now();
try {
return await run();
} finally {
spans.push({
name,
durationMs: toMs(Date.now() - spanStartedAt),
elapsedMs: toMs(Date.now() - startedAt),
});
}
},
logIfSlow(params) {
if (didLog) {
return;
}
const summary = snapshot();
if (!shouldLog(summary)) {
return;
}
didLog = true;
replyHotPathTimingLog.warn(
`reply hot path timings channel=${params.channel} messageId=${
params.messageId ?? "unknown"
} sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} totalMs=${
summary.totalMs
} stages=${formatSpans(summary)}${params.reason ? ` reason=${params.reason}` : ""}`,
{
channel: params.channel,
messageId: params.messageId,
sessionKey: params.sessionKey,
outcome: params.outcome,
reason: params.reason,
totalMs: summary.totalMs,
spans: summary.spans,
},
);
},
};
}
export type {
DispatchFromConfigParams,
DispatchFromConfigResult,
@@ -920,17 +822,12 @@ export async function dispatchReplyFromConfig(
hasSessionKey: Boolean(sessionKey),
hasRunId: typeof params.replyOptions?.runId === "string",
};
const replyHotPathTiming = createReplyHotPathTimingTracker({
profilerEnabled: isReplyProfilerEnabled({ config: cfg }),
});
const traceReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
replyHotPathTiming.measure(name, () =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
}),
);
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
});
let agentDispatchStartedAt = 0;
const recordProcessed = (
@@ -940,15 +837,6 @@ export async function dispatchReplyFromConfig(
error?: string;
},
) => {
if (diagnosticsEnabled) {
replyHotPathTiming.logIfSlow({
channel,
messageId,
sessionKey,
outcome,
reason: opts?.reason,
});
}
messageLifecycle.markProcessed(outcome, opts);
};
@@ -1544,16 +1432,6 @@ export async function dispatchReplyFromConfig(
counts: dispatcher.getQueuedCounts(),
});
};
const finishReplyOperationAbortedDispatch = (): DispatchFromConfigResult => {
commitInboundDedupeIfClaimed();
recordProcessed("completed", { reason: "reply_operation_aborted" });
markIdle("message_completed");
completeDispatchReplyOperation();
return attachSourceReplyDeliveryMode({
queuedFinal: false,
counts: dispatcher.getQueuedCounts(),
});
};
let pluginFallbackReason:
| "plugin-bound-fallback-missing-plugin"
@@ -1561,9 +1439,6 @@ export async function dispatchReplyFromConfig(
| undefined;
if (pluginOwnedBinding) {
if (isPreDispatchOperationAborted()) {
return finishReplyOperationAbortedDispatch();
}
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
if (shouldBypassPluginOwnedBindingForCommand(ctx)) {
logVerbose(
@@ -2137,17 +2012,6 @@ export async function dispatchReplyFromConfig(
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
const allowSuppressedSourceProgressCallbacks =
params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true;
let hasPendingDirectBlockReplyDelivery = false;
const waitForPendingDirectBlockReplyDelivery = async (abortSignal?: AbortSignal) => {
if (!hasPendingDirectBlockReplyDelivery) {
return;
}
// Direct block replies are queued asynchronously so lightweight replies do
// not wait for dispatcher idle. Flush only before later tool/progress
// callbacks and final completion where external ordering is visible.
hasPendingDirectBlockReplyDelivery = false;
await waitForReplyDispatcherIdle(dispatcher, abortSignal);
};
const shouldForwardProgressCallback = (options?: {
forwardWhenSourceDeliverySuppressed?: boolean;
requiresToolSummaryVisibility?: boolean;
@@ -2167,10 +2031,9 @@ export async function dispatchReplyFromConfig(
forwardWhenSourceDeliverySuppressed?: boolean;
requiresToolSummaryVisibility?: boolean;
onForward?: (...args: Args) => void;
waitForDirectBlockReplyDelivery?: boolean;
},
): ((...args: Args) => Promise<void>) | undefined => {
if (!callback) {
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
return undefined;
}
return async (...args: Args) => {
@@ -2178,12 +2041,6 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
if (options?.waitForDirectBlockReplyDelivery) {
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
}
if (shouldForwardProgressCallback(options)) {
options?.onForward?.(...args);
await callback?.(...args);
@@ -2220,12 +2077,10 @@ export async function dispatchReplyFromConfig(
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
onForward: (payload) => {
if (hasFailedProgressStatus(payload)) {
markVisibleToolErrorProgress();
@@ -2235,7 +2090,6 @@ export async function dispatchReplyFromConfig(
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
onForward: (payload) => {
if (hasFailedProgressStatus(payload)) {
markVisibleToolErrorProgress();
@@ -2245,20 +2099,14 @@ export async function dispatchReplyFromConfig(
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onToolResult: (payload: ReplyPayload) => {
markProgress();
const run = async () => {
if (isDispatchOperationAborted()) {
return;
}
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
@@ -2323,10 +2171,6 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2349,10 +2193,6 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2383,10 +2223,6 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2493,7 +2329,7 @@ export async function dispatchReplyFromConfig(
markInboundDedupeReplayUnsafe();
const delivered = dispatcher.sendBlockReply(normalizedPayload);
if (delivered) {
hasPendingDirectBlockReplyDelivery = true;
await waitForReplyDispatcherIdle(dispatcher, context?.abortSignal);
}
}
};
@@ -2625,7 +2461,6 @@ export async function dispatchReplyFromConfig(
accumulatedBlockTtsText.trim()
) {
try {
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
throwIfDispatchOperationAborted();
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockTtsText },
@@ -2685,7 +2520,6 @@ export async function dispatchReplyFromConfig(
}
}
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
commitInboundDedupeIfClaimed();
@@ -2703,7 +2537,14 @@ export async function dispatchReplyFromConfig(
});
} catch (err) {
if (isDispatchReplyOperationAbortedError(err)) {
return finishReplyOperationAbortedDispatch();
commitInboundDedupeIfClaimed();
recordProcessed("completed", { reason: "reply_operation_aborted" });
markIdle("message_completed");
completeDispatchReplyOperation();
return attachSourceReplyDeliveryMode({
queuedFinal: false,
counts: dispatcher.getQueuedCounts(),
});
}
if (inboundDedupeClaim.status === "claimed") {
if (inboundDedupeReplayUnsafe) {

View File

@@ -15,7 +15,6 @@ import { type OpenClawConfig, getRuntimeConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { measureDiagnosticsTimelineSpan } from "../../infra/diagnostics-timeline.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
import { defaultRuntime } from "../../runtime.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
@@ -48,7 +47,6 @@ import { hasInboundMedia } from "./inbound-media.js";
import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js";
import { createFastTestModelSelectionState, createModelSelectionState } from "./model-selection.js";
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
import { createReplyTimingTracker } from "./reply-timing-tracker.js";
import { initSessionState } from "./session.js";
import {
isStaleHeartbeatAutoFallbackOverride,
@@ -91,8 +89,6 @@ const mediaUnderstandingApplyRuntimeLoader = createLazyImportLoader(
const linkUnderstandingApplyRuntimeLoader = createLazyImportLoader(
() => import("../../link-understanding/apply.runtime.js"),
);
const replyResolverTimingLog = createSubsystemLogger("auto-reply/reply-resolver-timing");
const commandsCoreRuntimeLoader = createLazyImportLoader(
() => import("./commands-core.runtime.js"),
);
@@ -218,85 +214,45 @@ export async function getReplyFromConfig(
isFastTestEnv,
configOverride,
});
// Profiler spans stay inert unless diagnostics enable `profiler` or
// `reply.profiler`, so normal replies do not pay per-stage Date.now/array
// bookkeeping while we can still split resolver costs on demand.
const resolverTiming = createReplyTimingTracker({ log: replyResolverTimingLog, config: cfg });
const useFastTestBootstrap = resolverTiming.measureSync("reply.resolve_fast_test_bootstrap", () =>
shouldUseReplyFastTestBootstrap({
isFastTestEnv,
configOverride,
}),
);
const useFastTestRuntime = resolverTiming.measureSync("reply.resolve_fast_test_runtime", () =>
shouldUseReplyFastTestRuntime({
cfg,
isFastTestEnv,
}),
);
const finalized = resolverTiming.measureSync("reply.finalize_context", () =>
finalizeInboundContext(ctx),
);
const { agentSessionKey, agentId } = resolverTiming.measureSync(
"reply.resolve_agent_scope",
() => {
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
const resolvedAgentSessionKey = targetSessionKey || finalized.SessionKey;
return {
agentSessionKey: resolvedAgentSessionKey,
agentId: resolveSessionAgentId({
sessionKey: resolvedAgentSessionKey,
config: cfg,
}),
};
},
);
const traceAttributes = resolverTiming.measureSync("reply.resolve_trace_context", () => ({
const useFastTestBootstrap = shouldUseReplyFastTestBootstrap({
isFastTestEnv,
configOverride,
});
const useFastTestRuntime = shouldUseReplyFastTestRuntime({
cfg,
isFastTestEnv,
});
const finalized = finalizeInboundContext(ctx);
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
const agentSessionKey = targetSessionKey || finalized.SessionKey;
const traceAttributes = {
surface: normalizeOptionalString(finalized.Surface ?? finalized.Provider) ?? "unknown",
hasSessionKey: Boolean(agentSessionKey),
isHeartbeat: opts?.isHeartbeat === true,
hasMedia: hasInboundMedia(finalized),
}));
const messageId = finalized.MessageSid ?? finalized.MessageSidFirst ?? finalized.MessageSidLast;
let resolverTimingSessionKey = agentSessionKey;
const logResolverTiming = (outcome: string, reason?: string, error?: string) =>
resolverTiming.logIfSlow({
message: `reply resolver timings surface=${traceAttributes.surface} messageId=${
messageId ?? "unknown"
} sessionKey=${resolverTimingSessionKey ?? "unknown"} agentId=${agentId}`,
outcome,
reason,
error,
details: {
surface: traceAttributes.surface,
messageId,
sessionKey: resolverTimingSessionKey,
agentId,
},
});
};
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
resolverTiming.measure(name, () =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
}),
);
const mergedSkillFilter = resolverTiming.measureSync("reply.resolve_skill_filter", () =>
mergeSkillFilters(opts?.skillFilter, resolveAgentSkillsFilter(cfg, agentId)),
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
});
const agentId = resolveSessionAgentId({
sessionKey: agentSessionKey,
config: cfg,
});
const mergedSkillFilter = mergeSkillFilters(
opts?.skillFilter,
resolveAgentSkillsFilter(cfg, agentId),
);
const resolvedOpts =
mergedSkillFilter !== undefined ? { ...opts, skillFilter: mergedSkillFilter } : opts;
const agentCfg = cfg.agents?.defaults;
const sessionCfg = cfg.session;
const { defaultProvider, defaultModel, aliasIndex } = resolverTiming.measureSync(
"reply.resolve_default_model",
() =>
resolveDefaultModel({
cfg,
agentId,
}),
);
const { defaultProvider, defaultModel, aliasIndex } = resolveDefaultModel({
cfg,
agentId,
});
let provider = defaultProvider;
let model = defaultModel;
let hasResolvedHeartbeatModelOverride = false;
@@ -321,34 +277,22 @@ export async function getReplyFromConfig(
}
}
const { workspaceDirRaw, workspaceDirForNativeCommand, agentDir, timeoutMs } =
resolverTiming.measureSync("reply.resolve_workspace_agent_dir", () => {
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
return {
workspaceDirRaw,
workspaceDirForNativeCommand: workspaceDirRaw,
agentDir: resolveAgentDir(cfg, agentId),
timeoutMs: resolveAgentTimeoutMs({
cfg,
overrideSeconds: opts?.timeoutOverrideSeconds,
}),
};
});
const typing = resolverTiming.measureSync("reply.create_typing_controller", () => {
const configuredTypingSeconds =
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
const typingIntervalSeconds =
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
const controller = createTypingController({
onReplyStart: opts?.onReplyStart,
onCleanup: opts?.onTypingCleanup,
typingIntervalSeconds,
silentToken: SILENT_REPLY_TOKEN,
log: defaultRuntime.log,
});
opts?.onTypingController?.(controller);
return controller;
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
const workspaceDirForNativeCommand = workspaceDirRaw;
const agentDir = resolveAgentDir(cfg, agentId);
const timeoutMs = resolveAgentTimeoutMs({ cfg, overrideSeconds: opts?.timeoutOverrideSeconds });
const configuredTypingSeconds =
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
const typingIntervalSeconds =
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
const typing = createTypingController({
onReplyStart: opts?.onReplyStart,
onCleanup: opts?.onTypingCleanup,
typingIntervalSeconds,
silentToken: SILENT_REPLY_TOKEN,
log: defaultRuntime.log,
});
opts?.onTypingController?.(typing);
const nativeSlashCommandFastReply = await traceGetReplyPhase(
"reply.native_slash_command_fast_path",
@@ -372,7 +316,6 @@ export async function getReplyFromConfig(
}),
);
if (nativeSlashCommandFastReply.handled) {
logResolverTiming("completed", "native_slash_command_fast_path");
return nativeSlashCommandFastReply.reply;
}
@@ -447,7 +390,6 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
bodyStripped,
} = sessionState;
resolverTimingSessionKey = sessionKey ?? resolverTimingSessionKey;
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
const text = sanitizePendingFinalDeliveryText(sessionEntry.pendingFinalDeliveryText);
@@ -513,7 +455,6 @@ export async function getReplyFromConfig(
}),
});
}
logResolverTiming("completed", "pending_final_delivery_replay");
return { text: replayText };
}
}
@@ -638,8 +579,7 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
commandAuthorized,
});
logResolverTiming("milestone", "before_fast_directive_prepared_reply");
const fastReplyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
runPreparedReply({
ctx,
sessionCtx,
@@ -696,8 +636,6 @@ export async function getReplyFromConfig(
autoFallbackPrimaryProbe,
}),
);
logResolverTiming("completed", "fast_directive_prepared_reply");
return fastReplyResult;
}
const directiveResult = await traceGetReplyPhase("reply.resolve_directives", () =>
@@ -733,7 +671,6 @@ export async function getReplyFromConfig(
}),
);
if (directiveResult.kind === "reply") {
logResolverTiming("completed", "directive_reply");
return directiveResult.reply;
}
@@ -834,7 +771,6 @@ export async function getReplyFromConfig(
);
if (inlineActionResult.kind === "reply") {
await maybeEmitMissingResetHooks();
logResolverTiming("completed", "inline_action_reply");
return inlineActionResult.reply;
}
await maybeEmitMissingResetHooks();
@@ -926,7 +862,6 @@ export async function getReplyFromConfig(
),
);
if (hookResult?.handled) {
logResolverTiming("completed", "before_agent_reply_hook");
return hookResult.reply ?? { text: SILENT_REPLY_TOKEN };
}
}
@@ -949,8 +884,7 @@ export async function getReplyFromConfig(
);
}
logResolverTiming("milestone", "before_run_prepared_reply");
const replyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
runPreparedReply({
ctx,
sessionCtx,
@@ -997,6 +931,4 @@ export async function getReplyFromConfig(
autoFallbackPrimaryProbe: runAutoFallbackPrimaryProbe,
}),
);
logResolverTiming("completed", "prepared_reply");
return replyResult;
}

View File

@@ -1,48 +0,0 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { createReplyTimingTracker, isReplyProfilerEnabled } from "./reply-timing-tracker.js";
describe("isReplyProfilerEnabled", () => {
it("matches global and reply profiler diagnostic flags", () => {
const cfg = { diagnostics: { flags: ["reply.profiler"] } } as OpenClawConfig;
expect(isReplyProfilerEnabled({ config: cfg, env: {} as NodeJS.ProcessEnv })).toBe(true);
expect(
isReplyProfilerEnabled({
env: { OPENCLAW_DIAGNOSTICS: "profiler" } as NodeJS.ProcessEnv,
}),
).toBe(true);
});
});
describe("createReplyTimingTracker", () => {
it("is a pass-through tracker unless the profiler flag is enabled", async () => {
const warn = vi.fn();
const tracker = createReplyTimingTracker({ log: { warn } });
expect(tracker.measureSync("sync", () => 42)).toBe(42);
await expect(tracker.measure("async", async () => "ok")).resolves.toBe("ok");
tracker.logIfSlow({ message: "reply timings" });
expect(warn).not.toHaveBeenCalled();
});
it("records and logs spans when the profiler flag is enabled", () => {
const warn = vi.fn();
const tracker = createReplyTimingTracker({
log: { warn },
env: { OPENCLAW_DIAGNOSTICS: "reply.profiler" } as NodeJS.ProcessEnv,
totalWarnMs: 0,
stageWarnMs: 0,
});
expect(tracker.measureSync("sync", () => 7)).toBe(7);
tracker.logIfSlow({ message: "reply timings", outcome: "completed" });
expect(warn).toHaveBeenCalledOnce();
expect(warn.mock.calls[0]?.[0]).toContain("stages=sync:");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
outcome: "completed",
spans: [expect.objectContaining({ name: "sync" })],
});
});
});

View File

@@ -1,141 +0,0 @@
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { isDiagnosticFlagEnabled } from "../../infra/diagnostic-flags.js";
type ReplyTimingSpan = {
name: string;
durationMs: number;
elapsedMs: number;
};
type ReplyTimingSummary = {
totalMs: number;
spans: ReplyTimingSpan[];
};
type ReplyTimingLogger = {
warn: (message: string, details?: Record<string, unknown>) => void;
};
type ReplyTimingTracker = {
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
measureSync: <T>(name: string, run: () => T) => T;
logIfSlow: (params: {
message: string;
outcome?: string;
reason?: string;
error?: string;
details?: Record<string, unknown>;
}) => void;
};
const DEFAULT_TIMING_WARN_TOTAL_MS = 1_000;
const DEFAULT_TIMING_WARN_STAGE_MS = 500;
export function isReplyProfilerEnabled(params?: {
config?: OpenClawConfig;
env?: NodeJS.ProcessEnv;
}): boolean {
const cfg = params?.config;
const env = params?.env ?? process.env;
return (
isDiagnosticFlagEnabled("profiler", cfg, env) ||
isDiagnosticFlagEnabled("reply.profiler", cfg, env)
);
}
export function createReplyTimingTracker(params: {
log: ReplyTimingLogger;
config?: OpenClawConfig;
env?: NodeJS.ProcessEnv;
enabled?: boolean;
totalWarnMs?: number;
stageWarnMs?: number;
}): ReplyTimingTracker {
const enabled =
params.enabled ?? isReplyProfilerEnabled({ config: params.config, env: params.env });
if (!enabled) {
// Normal production turns use pass-through wrappers so added profiling
// calls do not allocate spans or call Date.now on the hot reply path.
return {
async measure(_name, run) {
return await run();
},
measureSync(_name, run) {
return run();
},
logIfSlow() {},
};
}
const startedAt = Date.now();
const spans: ReplyTimingSpan[] = [];
let didLog = false;
const totalWarnMs = params.totalWarnMs ?? DEFAULT_TIMING_WARN_TOTAL_MS;
const stageWarnMs = params.stageWarnMs ?? DEFAULT_TIMING_WARN_STAGE_MS;
const toMs = (value: number) => Math.max(0, Math.round(value));
const record = (name: string, spanStartedAt: number) => {
spans.push({
name,
durationMs: toMs(Date.now() - spanStartedAt),
elapsedMs: toMs(Date.now() - startedAt),
});
};
const snapshot = (): ReplyTimingSummary => ({
totalMs: toMs(Date.now() - startedAt),
spans: spans.slice(),
});
const shouldLog = (summary: ReplyTimingSummary) =>
summary.totalMs >= totalWarnMs || summary.spans.some((span) => span.durationMs >= stageWarnMs);
const formatSpans = (summary: ReplyTimingSummary) =>
summary.spans.length > 0
? summary.spans
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
.join(",")
: "none";
return {
async measure(name, run) {
const spanStartedAt = Date.now();
try {
return await run();
} finally {
record(name, spanStartedAt);
}
},
measureSync(name, run) {
const spanStartedAt = Date.now();
try {
return run();
} finally {
record(name, spanStartedAt);
}
},
logIfSlow(logParams) {
if (didLog) {
return;
}
const summary = snapshot();
if (!shouldLog(summary)) {
return;
}
didLog = true;
const suffix = [
`totalMs=${summary.totalMs}`,
`stages=${formatSpans(summary)}`,
logParams.outcome ? `outcome=${logParams.outcome}` : undefined,
logParams.reason ? `reason=${logParams.reason}` : undefined,
logParams.error ? `error="${logParams.error}"` : undefined,
]
.filter(Boolean)
.join(" ");
params.log.warn(`${logParams.message} ${suffix}`, {
...logParams.details,
outcome: logParams.outcome,
reason: logParams.reason,
error: logParams.error,
totalMs: summary.totalMs,
spans: summary.spans,
});
},
};
}

View File

@@ -133,24 +133,6 @@ describe("startGatewayMaintenanceTimers", () => {
stopMaintenanceTimers(timers);
});
it("refreshes automatic health snapshots without live channel probes", async () => {
vi.useFakeTimers();
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
deps.refreshGatewayHealthSnapshot = vi.fn(async () => ({ ok: true }) as HealthSummary);
const timers = startGatewayMaintenanceTimers(deps);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledWith({ probe: false });
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledTimes(2);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenLastCalledWith({ probe: false });
stopMaintenanceTimers(timers);
});
it("skips overlapping media cleanup runs", async () => {
vi.useFakeTimers();
let resolveCleanup = () => {};

View File

@@ -72,17 +72,16 @@ export function startGatewayMaintenanceTimers(params: {
params.nodeSendToAllSubscribed("tick", payload);
}, TICK_INTERVAL_MS);
// Keep cached health warm without request-time live channel probes. Explicit
// status/doctor probe paths still pass probe=true when the operator asks.
// periodic health refresh to keep cached snapshot warm
const healthInterval = setInterval(() => {
void params
.refreshGatewayHealthSnapshot({ probe: false })
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
}, HEALTH_REFRESH_INTERVAL_MS);
// Prime cache so first client gets a snapshot without waiting.
void params
.refreshGatewayHealthSnapshot({ probe: false })
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));
// dedupe cache cleanup

View File

@@ -199,13 +199,6 @@ function deliveryCall(index = 0): Record<string, any> | undefined {
return calls[index]?.[0];
}
function appendTranscriptCall(index = 0): Record<string, any> | undefined {
const calls = mocks.appendAssistantMessageToSessionTranscript.mock.calls as unknown as Array<
[Record<string, any>]
>;
return calls[index]?.[0];
}
function firstRespondCall(respond: ReturnType<typeof vi.fn>) {
const calls = respond.mock.calls as unknown as Array<
[
@@ -1462,178 +1455,6 @@ describe("gateway send mirroring", () => {
});
});
it("waits for source transcript mirroring before responding to message.action", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",
meta: {
id: "telegram",
label: "Telegram",
selectionLabel: "Telegram",
docsPath: "/channels/telegram",
blurb: "Telegram async source send transcript mirror test plugin.",
},
capabilities: { chatTypes: ["direct"] },
config: {
listAccountIds: () => ["default"],
resolveAccount: () => ({ enabled: true }),
isConfigured: () => true,
},
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
supportsAction: ({ action }) => action === "send",
handleAction: async () => jsonResult({ ok: true, messageId: "tg-async-1" }),
},
};
const mirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
setActivePluginRegistry(
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
"send-test-source-message-action-async-mirror",
);
mocks.dispatchChannelMessageAction.mockResolvedValueOnce(
jsonResult({ ok: true, messageId: "tg-async-1" }),
);
mocks.appendAssistantMessageToSessionTranscript.mockReturnValueOnce(mirrorDeferred.promise);
const respond = vi.fn();
const request = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "visible media caption",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-async-source-message-action",
} as never,
respond,
context: makeContext(),
req: { type: "req", id: "1", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
await vi.waitFor(() => {
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
});
expect(respond).not.toHaveBeenCalled();
mirrorDeferred.resolve({ ok: true, sessionFile: "x" });
await request;
expect(firstRespondCall(respond)[0]).toBe(true);
});
it("preserves source transcript mirror order before message.action responses", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",
meta: {
id: "telegram",
label: "Telegram",
selectionLabel: "Telegram",
docsPath: "/channels/telegram",
blurb: "Telegram ordered async source send transcript mirror test plugin.",
},
capabilities: { chatTypes: ["direct"] },
config: {
listAccountIds: () => ["default"],
resolveAccount: () => ({ enabled: true }),
isConfigured: () => true,
},
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
supportsAction: ({ action }) => action === "send",
handleAction: async () => jsonResult({ ok: true, messageId: "tg-ordered" }),
},
};
const firstMirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
setActivePluginRegistry(
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
"send-test-source-message-action-ordered-async-mirror",
);
mocks.dispatchChannelMessageAction.mockResolvedValue(
jsonResult({ ok: true, messageId: "tg-ordered" }),
);
mocks.appendAssistantMessageToSessionTranscript
.mockReturnValueOnce(firstMirrorDeferred.promise)
.mockResolvedValueOnce({ ok: true, sessionFile: "x" });
const firstRespond = vi.fn();
const secondRespond = vi.fn();
const first = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "first visible reply",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-ordered-source-message-action-1",
} as never,
respond: firstRespond,
context: makeContext(),
req: { type: "req", id: "1", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
await vi.waitFor(() => {
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
});
const second = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "second visible reply",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-ordered-source-message-action-2",
} as never,
respond: secondRespond,
context: makeContext(),
req: { type: "req", id: "2", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
expect(firstRespond).not.toHaveBeenCalled();
expect(secondRespond).not.toHaveBeenCalled();
expect(appendTranscriptCall(0)).toEqual(
expect.objectContaining({ text: "first visible reply" }),
);
firstMirrorDeferred.resolve({ ok: true, sessionFile: "x" });
await first;
await second;
expect(firstRespondCall(firstRespond)[0]).toBe(true);
expect(firstRespondCall(secondRespond)[0]).toBe(true);
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(2);
expect(appendTranscriptCall(1)).toEqual(
expect.objectContaining({ text: "second visible reply" }),
);
});
it("mirrors presentation-only source-conversation message.action sends", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",

View File

@@ -288,37 +288,6 @@ async function mirrorDeliveredSourceReplyToTranscriptBestEffort(params: {
}
}
const sourceReplyTranscriptMirrorQueues = new Map<string, Promise<void>>();
function resolveSourceReplyTranscriptMirrorQueueKey(
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0],
): string {
return mirror.sessionKey?.trim() || "__global__";
}
function scheduleDeliveredSourceReplyTranscriptMirror(params: {
context: GatewayRequestContext;
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0];
}): Promise<void> {
const queueKey = resolveSourceReplyTranscriptMirrorQueueKey(params.mirror);
const previous = sourceReplyTranscriptMirrorQueues.get(queueKey);
// Queue per session so current-conversation source replies are visible before
// a following turn can read the transcript.
const queued = (async () => {
await previous?.catch(() => undefined);
await mirrorDeliveredSourceReplyToTranscriptBestEffort(params);
})();
sourceReplyTranscriptMirrorQueues.set(queueKey, queued);
void queued
.finally(() => {
if (sourceReplyTranscriptMirrorQueues.get(queueKey) === queued) {
sourceReplyTranscriptMirrorQueues.delete(queueKey);
}
})
.catch(() => undefined);
return queued;
}
export const sendHandlers: GatewayRequestHandlers = {
"message.action": async ({ params, respond, context, client }) => {
const p = params;
@@ -430,7 +399,7 @@ export const sendHandlers: GatewayRequestHandlers = {
const agentId =
normalizeOptionalString(request.agentId) ??
(sessionKey ? resolveSessionAgentId({ sessionKey, config: cfg }) : undefined);
await scheduleDeliveredSourceReplyTranscriptMirror({
await mirrorDeliveredSourceReplyToTranscriptBestEffort({
context,
mirror: {
action: request.action,

View File

@@ -224,7 +224,7 @@ describe("attachGatewayWsMessageHandler post-connect health refresh", () => {
expect(hello.ok).toBe(true);
await vi.waitFor(() => {
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: false });
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true });
});
resolveRefresh?.();
});

View File

@@ -1776,10 +1776,7 @@ export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerPar
presence: snapshot.presence.length,
stateVersion: snapshot.stateVersion.presence,
});
// Post-connect refresh only needs a cached/config snapshot for UI state;
// live channel probes here pulled slow Discord/Telegram HTTP checks into
// reply-adjacent websocket handshakes.
void refreshHealthSnapshot({ probe: false }).catch((err) =>
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logHealth.error(`post-connect health refresh failed: ${formatError(err)}`),
);
return;

View File

@@ -250,78 +250,6 @@ describe("heartbeat runner skips when target session lane is busy", () => {
});
});
it("returns requests-in-flight when another session for the same agent has an active reply run", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => [
"agent:main:telegram:group:-1003966283270:topic:547",
]);
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
expect(replySpy).not.toHaveBeenCalled();
});
});
it("ignores unscoped active reply runs when checking same-agent heartbeat work", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => ["legacy-session-key"]);
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result.status).toBe("ran");
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
expect(replySpy).toHaveBeenCalledOnce();
});
});
it("does not defer immediate heartbeat wakes for another active session", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => [
"agent:main:telegram:group:-1003966283270:topic:547",
]);
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({
cfg,
intent: "immediate",
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result.status).toBe("ran");
expect(replySpy).toHaveBeenCalledOnce();
});
});
it("does not defer on a recent heartbeat ack pending final delivery", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();

View File

@@ -36,10 +36,7 @@ import {
} from "../auto-reply/heartbeat.js";
import { replaceGenericExternalRunFailureText } from "../auto-reply/reply/agent-runner-failure-copy.js";
import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defaults.js";
import {
listActiveReplyRunSessionKeys,
replyRunRegistry,
} from "../auto-reply/reply/reply-run-registry.js";
import { replyRunRegistry } from "../auto-reply/reply/reply-run-registry.js";
import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js";
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
@@ -151,7 +148,6 @@ export type HeartbeatDeps = OutboundSendDeps &
getQueueSize?: (lane?: string) => number;
getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[];
isReplyRunActive?: (sessionKey: string) => boolean;
listActiveReplyRunSessionKeys?: () => readonly string[];
nowMs?: () => number;
};
@@ -225,17 +221,6 @@ function hasAgentOptInBusyLaneWork(
return hasQueuedWorkInLaneSnapshots(getSnapshots(), (lane) => laneBelongsToAgent(lane, agentId));
}
function hasActiveReplyRunForAgent(
agentId: string,
listSessionKeys: () => readonly string[],
): boolean {
const normalizedAgentId = normalizeAgentId(agentId);
return listSessionKeys().some((sessionKey) => {
const parsed = parseAgentSessionKey(sessionKey);
return parsed ? normalizeAgentId(parsed.agentId) === normalizedAgentId : false;
});
}
function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined {
const activePlugin = getActivePluginChannelRegistry()?.channels.find(
(entry) => entry.plugin.id === channel,
@@ -1360,21 +1345,6 @@ export async function runHeartbeatOnce(opts: {
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
}
const shouldHonorActiveReplyRuns = opts.intent !== "immediate" && opts.intent !== "manual";
const listActiveReplyRuns =
opts.deps?.listActiveReplyRunSessionKeys ?? listActiveReplyRunSessionKeys;
// Scheduled heartbeats are background work, so defer them when any session on
// the same agent is already replying; immediate/manual wakes keep their
// existing semantics for explicit user/system actions.
if (shouldHonorActiveReplyRuns && hasActiveReplyRunForAgent(agentId, listActiveReplyRuns)) {
emitHeartbeatEvent({
status: "skipped",
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
}
// Phase 2: Stronger heartbeat deferral while a final delivery replay is pending.
// Plain `updatedAt` changes are normal for heartbeat sessions and should not
// suppress heartbeat runs; only defer when final delivery recovery is active.

View File

@@ -1,4 +1,3 @@
import { spawnSync } from "node:child_process";
import { readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
@@ -421,30 +420,4 @@ describe("bun global install smoke", () => {
expect(releaseChecks).toContain("uses: ./.github/workflows/install-smoke.yml");
expect(releaseChecks).toContain("run_bun_global_install_smoke: true");
});
it("kills Bun global install smoke commands that ignore TERM after timeout", () => {
const result = spawnSync(
process.execPath,
[
BUN_GLOBAL_ASSERTIONS_PATH,
"run-with-timeout",
"50",
process.execPath,
"-e",
"process.on('SIGTERM', () => {}); setInterval(() => {}, 1000);",
],
{
encoding: "utf8",
env: {
...process.env,
OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS: "50",
},
timeout: 5000,
},
);
expect(result.error).toBeUndefined();
expect(result.status).toBe(1);
expect(result.stderr).toContain(`command timed out after 50ms: ${process.execPath}`);
});
});