mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-20 13:42:02 +08:00
Compare commits
1 Commits
codex/spli
...
codex/spli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0dd656bdb3 |
@@ -29,7 +29,6 @@ Docs: https://docs.openclaw.ai
|
||||
- CI: bound Docker/Bash E2E tarball npm installs with `OPENCLAW_E2E_NPM_INSTALL_TIMEOUT` so package, onboarding, plugin, and upgrade lanes fail instead of hanging on a stuck npm install.
|
||||
- CI: keep `OPENCLAW_TESTBOX=1 pnpm check:changed` delegating to Blacksmith Testbox through Crabbox without forwarding local Testbox or worker env into the remote command.
|
||||
- CI: send KILL after the TERM grace period for manual checkout fetch timeouts so stuck Testbox and workflow checkout retries cannot hang behind a wedged `git fetch`.
|
||||
- CI: send KILL after the TERM grace period for Bun global install smoke command timeouts so trapped `openclaw` child processes cannot wedge the scheduled install smoke.
|
||||
- iMessage: thread current channel/account inbound attachment roots into the image tool so iMessage-saved attachments under `~/Library/Messages/Attachments` (including the wildcard `/Users/*/Library/Messages/Attachments` root) are read through the existing inbound path policy instead of being rejected as `path-not-allowed`. Literal `localRoots` stays workspace-scoped. Fixes #30170. (#86569)
|
||||
- QQ Bot: respect `OPENCLAW_HOME` for outbound media path resolution so `<qqmedia>` sends no longer silently fail when `HOME` and `OPENCLAW_HOME` differ (Docker / multi-user hosts). Persisted QQ Bot data (sessions, known users, refs) stays anchored on the OS home for upgrade compatibility. Fixes #83562. Thanks @sliverp.
|
||||
- Update: report the primary malformed `openclaw.extensions` payload error without adding a duplicate missing-main diagnostic. (#86596) Thanks @ferminquant.
|
||||
|
||||
@@ -24,10 +24,12 @@ type BuildTelegramMessageContextForTestParams = {
|
||||
options?: BuildTelegramMessageContextParams["options"];
|
||||
cfg?: Record<string, unknown>;
|
||||
accountId?: string;
|
||||
dmPolicy?: BuildTelegramMessageContextParams["dmPolicy"];
|
||||
historyLimit?: number;
|
||||
groupHistories?: Map<string, import("openclaw/plugin-sdk/reply-history").HistoryEntry[]>;
|
||||
ackReactionScope?: BuildTelegramMessageContextParams["ackReactionScope"];
|
||||
botApi?: Record<string, unknown>;
|
||||
sendChatActionHandler?: BuildTelegramMessageContextParams["sendChatActionHandler"];
|
||||
runtime?: BuildTelegramMessageContextParams["runtime"];
|
||||
sessionRuntime?: BuildTelegramMessageContextParams["sessionRuntime"] | null;
|
||||
resolveGroupActivation?: BuildTelegramMessageContextParams["resolveGroupActivation"];
|
||||
@@ -127,7 +129,7 @@ export async function buildTelegramMessageContextForTest(
|
||||
account: { accountId: params.accountId ?? "default" } as never,
|
||||
historyLimit: params.historyLimit ?? 0,
|
||||
groupHistories: params.groupHistories ?? new Map(),
|
||||
dmPolicy: "open",
|
||||
dmPolicy: params.dmPolicy ?? "open",
|
||||
allowFrom: ["*"],
|
||||
groupAllowFrom: [],
|
||||
ackReactionScope: params.ackReactionScope ?? "off",
|
||||
@@ -140,7 +142,7 @@ export async function buildTelegramMessageContextForTest(
|
||||
groupConfig: { requireMention: false },
|
||||
topicConfig: undefined,
|
||||
})),
|
||||
sendChatActionHandler: { sendChatAction: vi.fn() } as never,
|
||||
sendChatActionHandler: params.sendChatActionHandler ?? ({ sendChatAction: vi.fn() } as never),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -109,6 +109,7 @@ export type TelegramMessageContext = {
|
||||
sendTyping: () => Promise<void>;
|
||||
sendRecordVoice: () => Promise<void>;
|
||||
sendChatActionHandler: BuildTelegramMessageContextParams["sendChatActionHandler"];
|
||||
initialTypingCueSent?: boolean;
|
||||
ackReactionPromise: Promise<boolean> | null;
|
||||
reactionApi: TelegramReactionApi | null;
|
||||
removeAckAfterReply: boolean;
|
||||
@@ -368,6 +369,7 @@ export const buildTelegramMessageContext = async ({
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
let initialTypingCueSent = false;
|
||||
const ensureConfiguredBindingReady = async (): Promise<boolean> => {
|
||||
if (!configuredBinding) {
|
||||
return true;
|
||||
@@ -480,6 +482,15 @@ export const buildTelegramMessageContext = async ({
|
||||
return null;
|
||||
}
|
||||
|
||||
// Direct chats are now reply-eligible; send the first typing cue before
|
||||
// expensive context/session construction without showing typing for dropped turns.
|
||||
if (!isGroup) {
|
||||
initialTypingCueSent = true;
|
||||
void sendTyping().catch((err) => {
|
||||
logVerbose(`telegram early direct typing cue failed for chat ${chatId}: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
|
||||
const { ctxPayload, skillFilter, turn } = await buildTelegramInboundContextPayload({
|
||||
cfg,
|
||||
primaryCtx,
|
||||
@@ -643,6 +654,7 @@ export const buildTelegramMessageContext = async ({
|
||||
sendTyping,
|
||||
sendRecordVoice,
|
||||
sendChatActionHandler,
|
||||
initialTypingCueSent,
|
||||
ackReactionPromise,
|
||||
reactionApi,
|
||||
removeAckAfterReply,
|
||||
|
||||
80
extensions/telegram/src/bot-message-context.typing.test.ts
Normal file
80
extensions/telegram/src/bot-message-context.typing.test.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
import { buildChannelInboundEventContext } from "openclaw/plugin-sdk/channel-inbound";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js";
|
||||
import type { TelegramSendChatActionHandler } from "./sendchataction-401-backoff.js";
|
||||
|
||||
function createSendChatActionHandler(
|
||||
sendChatAction = vi.fn(async () => undefined),
|
||||
): TelegramSendChatActionHandler & { sendChatAction: typeof sendChatAction } {
|
||||
return {
|
||||
sendChatAction,
|
||||
isSuspended: () => false,
|
||||
reset: () => undefined,
|
||||
};
|
||||
}
|
||||
|
||||
describe("buildTelegramMessageContext typing", () => {
|
||||
it("sends direct typing after body resolution and before session context construction", async () => {
|
||||
const buildInboundContext = vi.fn(buildChannelInboundEventContext);
|
||||
const sendChatActionHandler = createSendChatActionHandler();
|
||||
|
||||
await expect(
|
||||
buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 42, type: "private", first_name: "Pat" },
|
||||
from: { id: 42, first_name: "Pat" },
|
||||
text: "hello",
|
||||
},
|
||||
sendChatActionHandler,
|
||||
sessionRuntime: {
|
||||
buildChannelInboundEventContext: buildInboundContext,
|
||||
},
|
||||
}),
|
||||
).resolves.not.toBeNull();
|
||||
|
||||
expect(sendChatActionHandler.sendChatAction).toHaveBeenCalledWith(42, "typing", undefined);
|
||||
expect(sendChatActionHandler.sendChatAction.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
buildInboundContext.mock.invocationCallOrder[0],
|
||||
);
|
||||
});
|
||||
|
||||
it("does not send direct typing when there is no replyable body", async () => {
|
||||
const sendChatActionHandler = createSendChatActionHandler();
|
||||
|
||||
await expect(
|
||||
buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 42, type: "private", first_name: "Pat" },
|
||||
from: { id: 42, first_name: "Pat" },
|
||||
text: undefined,
|
||||
},
|
||||
sendChatActionHandler,
|
||||
}),
|
||||
).resolves.toBeNull();
|
||||
|
||||
expect(sendChatActionHandler.sendChatAction).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not send early direct typing before DM access passes", async () => {
|
||||
const sendChatActionHandler = createSendChatActionHandler();
|
||||
|
||||
await expect(
|
||||
buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 42, type: "private", first_name: "Pat" },
|
||||
from: { id: 42, first_name: "Pat" },
|
||||
text: "hello",
|
||||
},
|
||||
cfg: {
|
||||
agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/openclaw" } },
|
||||
channels: { telegram: { dmPolicy: "disabled", allowFrom: [] } },
|
||||
messages: { groupChat: { mentionPatterns: [] } },
|
||||
},
|
||||
dmPolicy: "disabled",
|
||||
sendChatActionHandler,
|
||||
}),
|
||||
).resolves.toBeNull();
|
||||
|
||||
expect(sendChatActionHandler.sendChatAction).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -1981,6 +1981,35 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect(editMessageTelegram).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not stream text-only tool results into progress drafts", async () => {
|
||||
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
|
||||
await dispatcherOptions.deliver(
|
||||
{ text: "stdout line one\nstdout line two" },
|
||||
{ kind: "tool" },
|
||||
);
|
||||
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
|
||||
return { queuedFinal: false };
|
||||
},
|
||||
);
|
||||
|
||||
await dispatchWithContext({
|
||||
context: createContext(),
|
||||
streamMode: "progress",
|
||||
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
|
||||
});
|
||||
|
||||
expect(answerDraftStream.update).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("stdout line one"),
|
||||
);
|
||||
expect(answerDraftStream.update).toHaveBeenLastCalledWith(
|
||||
"Shelling\n\n`🛠️ Exec`\n`🔎 Web Search: docs lookup`",
|
||||
);
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not restart progress drafts after final answer delivery", async () => {
|
||||
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
|
||||
@@ -1752,19 +1752,28 @@ export const dispatchTelegramMessage = async ({
|
||||
reasoningStepState.noteReasoningHint();
|
||||
}
|
||||
if (segment.lane === "answer" && info.kind === "tool") {
|
||||
if (
|
||||
nativeToolProgressDraft &&
|
||||
canUseNativeToolProgressDraft({
|
||||
payload: effectivePayload,
|
||||
reply,
|
||||
buttons: telegramButtons,
|
||||
})
|
||||
) {
|
||||
const canRepresentAsTransientProgress = canUseNativeToolProgressDraft({
|
||||
payload: effectivePayload,
|
||||
reply,
|
||||
buttons: telegramButtons,
|
||||
});
|
||||
if (nativeToolProgressDraft && canRepresentAsTransientProgress) {
|
||||
if (await pushStreamToolProgress(segment.update.text)) {
|
||||
blockDelivered = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (
|
||||
canRepresentAsTransientProgress &&
|
||||
streamMode === "progress" &&
|
||||
answerLane.stream
|
||||
) {
|
||||
// Progress-mode streams render tool status in the
|
||||
// live draft. Do not also emit text-only tool output
|
||||
// as answer text, or simple commands duplicate and
|
||||
// restart the progress draft.
|
||||
continue;
|
||||
}
|
||||
await prepareAnswerLaneForToolProgress();
|
||||
}
|
||||
const result =
|
||||
|
||||
@@ -160,7 +160,10 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
|
||||
(options?.ingressBuffer ? ` buffer=${options.ingressBuffer}` : ""),
|
||||
);
|
||||
}
|
||||
if (context.ctxPayload.InboundEventKind !== "room_event") {
|
||||
if (
|
||||
context.ctxPayload.InboundEventKind !== "room_event" &&
|
||||
context.initialTypingCueSent !== true
|
||||
) {
|
||||
void context.sendTyping().catch((err) => {
|
||||
logVerbose(`telegram early typing cue failed for chat ${context.chatId}: ${String(err)}`);
|
||||
});
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import { spawn } from "node:child_process";
|
||||
|
||||
const DEFAULT_TIMEOUT_KILL_GRACE_MS = 30_000;
|
||||
import { spawnSync } from "node:child_process";
|
||||
|
||||
const usage = () => {
|
||||
console.error("Usage: assertions.mjs <run-with-timeout|assert-image-providers> [...]");
|
||||
@@ -9,72 +7,16 @@ const usage = () => {
|
||||
|
||||
const [mode, ...args] = process.argv.slice(2);
|
||||
|
||||
const parsePositiveNumber = (value, label) => {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new Error(`${label} must be a positive number`);
|
||||
if (mode === "run-with-timeout") {
|
||||
const [timeoutMs, command, ...commandArgs] = args;
|
||||
const timeout = Number(timeoutMs);
|
||||
if (!Number.isFinite(timeout) || timeout <= 0 || !command) {
|
||||
usage();
|
||||
}
|
||||
return parsed;
|
||||
};
|
||||
|
||||
const signalChild = (child, signal) => {
|
||||
if (!child.pid) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (process.platform === "win32") {
|
||||
child.kill(signal);
|
||||
return;
|
||||
}
|
||||
process.kill(-child.pid, signal);
|
||||
} catch (error) {
|
||||
if (error?.code !== "ESRCH") {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const runWithTimeout = async (timeout, command, commandArgs) => {
|
||||
const killGrace = parsePositiveNumber(
|
||||
process.env.OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS ??
|
||||
String(DEFAULT_TIMEOUT_KILL_GRACE_MS),
|
||||
"OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS",
|
||||
);
|
||||
const child = spawn(command, commandArgs, {
|
||||
detached: process.platform !== "win32",
|
||||
env: process.env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
let timedOut = false;
|
||||
let killTimer;
|
||||
|
||||
child.stdout.setEncoding("utf8");
|
||||
child.stderr.setEncoding("utf8");
|
||||
child.stdout.on("data", (chunk) => process.stdout.write(chunk));
|
||||
child.stderr.on("data", (chunk) => process.stderr.write(chunk));
|
||||
|
||||
const timeoutTimer = setTimeout(() => {
|
||||
timedOut = true;
|
||||
signalChild(child, "SIGTERM");
|
||||
killTimer = setTimeout(() => signalChild(child, "SIGKILL"), killGrace);
|
||||
killTimer.unref();
|
||||
}, timeout);
|
||||
timeoutTimer.unref();
|
||||
|
||||
let spawnError;
|
||||
child.on("error", (error) => {
|
||||
spawnError = error;
|
||||
});
|
||||
const result = await new Promise((resolve) => {
|
||||
child.on("close", (status, signal) => resolve({ error: spawnError, signal, status }));
|
||||
});
|
||||
|
||||
clearTimeout(timeoutTimer);
|
||||
clearTimeout(killTimer);
|
||||
if (timedOut) {
|
||||
console.error(`command timed out after ${timeout}ms: ${command}`);
|
||||
process.exit(1);
|
||||
}
|
||||
const result = spawnSync(command, commandArgs, { encoding: "utf8", env: process.env, timeout });
|
||||
process.stdout.write(result.stdout ?? "");
|
||||
process.stderr.write(result.stderr ?? "");
|
||||
if (result.error) {
|
||||
console.error(`command failed: ${command}: ${result.error.message}`);
|
||||
process.exit(1);
|
||||
@@ -84,20 +26,6 @@ const runWithTimeout = async (timeout, command, commandArgs) => {
|
||||
process.exit(1);
|
||||
}
|
||||
process.exit(result.status ?? 0);
|
||||
};
|
||||
|
||||
if (mode === "run-with-timeout") {
|
||||
const [timeoutMs, command, ...commandArgs] = args;
|
||||
if (!command) {
|
||||
usage();
|
||||
}
|
||||
let timeout;
|
||||
try {
|
||||
timeout = parsePositiveNumber(timeoutMs, "timeoutMs");
|
||||
} catch {
|
||||
usage();
|
||||
}
|
||||
await runWithTimeout(timeout, command, commandArgs);
|
||||
}
|
||||
|
||||
if (mode === "assert-image-providers") {
|
||||
|
||||
@@ -20,7 +20,7 @@ import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
|
||||
|
||||
const LIVE = isLiveTestEnabled();
|
||||
const REQUIRE_PROFILE_KEYS = isLiveProfileKeyModeEnabled();
|
||||
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.4-mini";
|
||||
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.1-codex-mini";
|
||||
const TARGET_MODEL_REF =
|
||||
process.env.OPENCLAW_LIVE_OPENAI_REASONING_COMPAT_MODEL?.trim() || DEFAULT_TARGET_MODEL_REF;
|
||||
const describeLive = LIVE ? describe : describe.skip;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -240,7 +240,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
|
||||
expect(coordinator.getRoutedCounts().block).toBe(0);
|
||||
});
|
||||
|
||||
it("does not wait for direct block dispatcher delivery before resolving block delivery", async () => {
|
||||
it("waits for direct block dispatcher delivery before resolving block delivery", async () => {
|
||||
const delivered: unknown[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
@@ -276,68 +276,13 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
await Promise.resolve();
|
||||
|
||||
expect(delivered).toEqual([{ text: "hello" }]);
|
||||
expect(deliverySettled).toBe(true);
|
||||
expect(deliverySettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await expect(deliveryPromise).resolves.toBe(true);
|
||||
expect(deliverySettled).toBe(true);
|
||||
await dispatcher.waitForIdle();
|
||||
});
|
||||
|
||||
it("waits for pending direct block delivery before resolving tool delivery", async () => {
|
||||
const delivered: unknown[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const coordinator = createAcpDispatchDeliveryCoordinator({
|
||||
cfg: createAcpTestConfig(),
|
||||
ctx: buildTestCtx({
|
||||
Provider: "visiblechat",
|
||||
Surface: "visiblechat",
|
||||
SessionKey: "agent:codex-acp:session-1",
|
||||
}),
|
||||
dispatcher,
|
||||
inboundAudio: false,
|
||||
shouldRouteToOriginating: false,
|
||||
});
|
||||
|
||||
await expect(coordinator.deliver("block", { text: "hello" }, { skipTts: true })).resolves.toBe(
|
||||
true,
|
||||
);
|
||||
await deliveryStarted;
|
||||
|
||||
let toolDeliverySettled = false;
|
||||
const toolDeliveryPromise = coordinator
|
||||
.deliver("tool", { text: "tool result" }, { skipTts: true })
|
||||
.then((result) => {
|
||||
toolDeliverySettled = true;
|
||||
return result;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(delivered).toEqual([{ text: "hello" }]);
|
||||
expect(toolDeliverySettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await expect(toolDeliveryPromise).resolves.toBe(true);
|
||||
expect(toolDeliverySettled).toBe(true);
|
||||
expect(delivered).toEqual([{ text: "hello" }, { text: "tool result" }]);
|
||||
});
|
||||
|
||||
it("stops waiting for direct block delivery when the ACP dispatch aborts", async () => {
|
||||
|
||||
@@ -234,23 +234,11 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
},
|
||||
toolMessageByCallId: new Map(),
|
||||
};
|
||||
let hasPendingDirectBlockReplyDelivery = false;
|
||||
const waitForPendingDirectBlockReplyDelivery = async () => {
|
||||
if (!hasPendingDirectBlockReplyDelivery) {
|
||||
return;
|
||||
}
|
||||
// ACP direct block replies should not block the common visible-reply path.
|
||||
// Defer the idle wait until a later tool delivery would otherwise overtake
|
||||
// that block reply in user-visible ordering.
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
|
||||
};
|
||||
const settleDirectVisibleText = async () => {
|
||||
if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) {
|
||||
return;
|
||||
}
|
||||
state.settledDirectVisibleText = true;
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await params.dispatcher.waitForIdle();
|
||||
const failedCounts = params.dispatcher.getFailedCounts();
|
||||
const failedVisibleCount = failedCounts.block + failedCounts.final;
|
||||
@@ -452,10 +440,6 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (kind === "tool") {
|
||||
await waitForPendingDirectBlockReplyDelivery();
|
||||
}
|
||||
|
||||
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
|
||||
channel: directChannel,
|
||||
kind,
|
||||
@@ -478,7 +462,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
state.failedVisibleTextDelivery = true;
|
||||
}
|
||||
if (kind === "block" && delivered) {
|
||||
hasPendingDirectBlockReplyDelivery = true;
|
||||
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
|
||||
}
|
||||
return delivered;
|
||||
};
|
||||
|
||||
@@ -4738,78 +4738,6 @@ describe("dispatchReplyFromConfig", () => {
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not run plugin-owned binding delivery when the caller already aborted", async () => {
|
||||
setNoAbort();
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
((hookName?: string) =>
|
||||
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
|
||||
);
|
||||
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
|
||||
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
|
||||
status: "handled",
|
||||
result: { handled: true, reply: { text: "should not send" } },
|
||||
});
|
||||
sessionBindingMocks.resolveByConversation.mockReturnValue({
|
||||
bindingId: "binding-aborted-1",
|
||||
targetSessionKey: "plugin-binding:codex:abc123",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1481858418548412579",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: 1710000000000,
|
||||
metadata: {
|
||||
pluginBindingOwner: "plugin",
|
||||
pluginId: "openclaw-codex-app-server",
|
||||
pluginRoot: "/workspace/openclaw-app-server",
|
||||
data: {
|
||||
kind: "codex-app-server-session",
|
||||
version: 1,
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/workspace/openclaw",
|
||||
},
|
||||
},
|
||||
} satisfies SessionBindingRecord);
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
const cfg = emptyConfig;
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
OriginatingChannel: "discord",
|
||||
OriginatingTo: "discord:channel:1481858418548412579",
|
||||
To: "discord:channel:1481858418548412579",
|
||||
AccountId: "default",
|
||||
SenderId: "user-9",
|
||||
SenderUsername: "ada",
|
||||
CommandAuthorized: true,
|
||||
WasMentioned: false,
|
||||
CommandBody: "who are you",
|
||||
RawBody: "who are you",
|
||||
Body: "who are you",
|
||||
MessageSid: "msg-claim-plugin-aborted-1",
|
||||
SessionKey: "agent:main:discord:channel:1481858418548412579",
|
||||
});
|
||||
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
|
||||
|
||||
const result = await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: { abortSignal: abortController.signal },
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
|
||||
expect(sessionBindingMocks.touch).not.toHaveBeenCalled();
|
||||
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("lets authorized plugin-owned binding commands fall through to command processing", async () => {
|
||||
setNoAbort();
|
||||
expect(
|
||||
@@ -5778,7 +5706,7 @@ describe("dispatchReplyFromConfig", () => {
|
||||
expect(callOrder).toEqual(["queued:The answer is 42", "dispatch:The answer is 42"]);
|
||||
});
|
||||
|
||||
it("does not wait for same-channel block dispatcher delivery before resolving block replies", async () => {
|
||||
it("waits for same-channel block dispatcher delivery before resolving block replies", async () => {
|
||||
setNoAbort();
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
@@ -5811,10 +5739,10 @@ describe("dispatchReplyFromConfig", () => {
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before tool" }]);
|
||||
await blockReplyPromise;
|
||||
expect(blockReplySettled).toBe(true);
|
||||
expect(blockReplySettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await blockReplyPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
@@ -5826,177 +5754,6 @@ describe("dispatchReplyFromConfig", () => {
|
||||
});
|
||||
|
||||
expect(blockReplySettled).toBe(true);
|
||||
await dispatcher.waitForIdle();
|
||||
});
|
||||
|
||||
it("waits for pending same-channel block delivery before completing block-only dispatch", async () => {
|
||||
setNoAbort();
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "only block" });
|
||||
return undefined;
|
||||
};
|
||||
|
||||
let dispatchSettled = false;
|
||||
const dispatchPromise = dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
}).then((result) => {
|
||||
dispatchSettled = true;
|
||||
return result;
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "only block" }]);
|
||||
expect(dispatchSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await dispatchPromise;
|
||||
|
||||
expect(dispatchSettled).toBe(true);
|
||||
});
|
||||
|
||||
it("waits for pending same-channel block delivery before forwarding tool progress", async () => {
|
||||
setNoAbort();
|
||||
const cfg = {
|
||||
agents: { defaults: { verboseDefault: "on" } },
|
||||
} as const satisfies OpenClawConfig;
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const onToolStart = vi.fn();
|
||||
let toolProgressSettled = false;
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "before tool" });
|
||||
const toolProgressPromise = Promise.resolve(opts?.onToolStart?.({ name: "lookup" })).then(
|
||||
() => {
|
||||
toolProgressSettled = true;
|
||||
},
|
||||
);
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before tool" }]);
|
||||
expect(onToolStart).not.toHaveBeenCalled();
|
||||
expect(toolProgressSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await toolProgressPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
replyOptions: { onToolStart },
|
||||
});
|
||||
|
||||
expect(toolProgressSettled).toBe(true);
|
||||
expect(onToolStart).toHaveBeenCalledWith({ name: "lookup" });
|
||||
});
|
||||
|
||||
it("does not synthesize tool-start capability while ordering item progress", async () => {
|
||||
setNoAbort();
|
||||
const cfg = {
|
||||
agents: { defaults: { verboseDefault: "on" } },
|
||||
} as const satisfies OpenClawConfig;
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const onItemEvent = vi.fn();
|
||||
let itemProgressSettled = false;
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "before item" });
|
||||
expect(opts?.onToolStart).toBeUndefined();
|
||||
const itemProgressPromise = Promise.resolve(
|
||||
opts?.onItemEvent?.({ itemId: "1", kind: "tool", progressText: "running" }),
|
||||
).then(() => {
|
||||
itemProgressSettled = true;
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before item" }]);
|
||||
expect(onItemEvent).not.toHaveBeenCalled();
|
||||
expect(itemProgressSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await itemProgressPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
replyOptions: { onItemEvent },
|
||||
});
|
||||
|
||||
expect(itemProgressSettled).toBe(true);
|
||||
expect(onItemEvent).toHaveBeenCalledWith({
|
||||
itemId: "1",
|
||||
kind: "tool",
|
||||
progressText: "running",
|
||||
});
|
||||
});
|
||||
|
||||
it("forwards payload metadata into onBlockReplyQueued context", async () => {
|
||||
|
||||
@@ -61,7 +61,6 @@ import {
|
||||
markDiagnosticSessionProgress,
|
||||
} from "../../logging/diagnostic.js";
|
||||
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { matchPluginCommand } from "../../plugins/commands.js";
|
||||
import {
|
||||
buildPluginBindingDeclinedText,
|
||||
@@ -132,7 +131,6 @@ import { resolveOriginMessageProvider } from "./origin-routing.js";
|
||||
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
|
||||
import type { ReplyDispatcher } from "./reply-dispatcher.types.js";
|
||||
import { replyRunRegistry, type ReplyOperation } from "./reply-run-registry.js";
|
||||
import { isReplyProfilerEnabled } from "./reply-timing-tracker.js";
|
||||
import { admitReplyTurn, resolveReplyTurnKind } from "./reply-turn-admission.js";
|
||||
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
|
||||
import { resolveReplyRoutingDecision } from "./routing-policy.js";
|
||||
@@ -785,102 +783,6 @@ function createAbortAwareDispatcher(params: {
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
type ReplyHotPathTimingSpan = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type ReplyHotPathTimingSummary = {
|
||||
totalMs: number;
|
||||
spans: ReplyHotPathTimingSpan[];
|
||||
};
|
||||
|
||||
const replyHotPathTimingLog = createSubsystemLogger("auto-reply/reply-timing");
|
||||
const REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS = 1_000;
|
||||
const REPLY_HOT_PATH_TIMING_WARN_STAGE_MS = 500;
|
||||
|
||||
function createReplyHotPathTimingTracker(options: { profilerEnabled?: boolean } = {}): {
|
||||
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
|
||||
logIfSlow: (params: {
|
||||
channel: string;
|
||||
messageId?: number | string;
|
||||
sessionKey?: string;
|
||||
outcome: "completed" | "skipped" | "error";
|
||||
reason?: string;
|
||||
}) => void;
|
||||
} {
|
||||
if (!options.profilerEnabled) {
|
||||
// This slow-path splitter was added for latency investigation. Keep it
|
||||
// inert in normal production dispatches so only explicit profiler runs pay
|
||||
// the Date.now/span allocation cost.
|
||||
return {
|
||||
async measure(_name, run) {
|
||||
return await run();
|
||||
},
|
||||
logIfSlow() {},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
let didLog = false;
|
||||
const spans: ReplyHotPathTimingSpan[] = [];
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
const snapshot = (): ReplyHotPathTimingSummary => ({
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
spans: spans.slice(),
|
||||
});
|
||||
const shouldLog = (summary: ReplyHotPathTimingSummary) =>
|
||||
summary.totalMs >= REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS ||
|
||||
summary.spans.some((span) => span.durationMs >= REPLY_HOT_PATH_TIMING_WARN_STAGE_MS);
|
||||
const formatSpans = (summary: ReplyHotPathTimingSummary) =>
|
||||
summary.spans.length > 0
|
||||
? summary.spans
|
||||
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
return {
|
||||
async measure(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
spans.push({
|
||||
name,
|
||||
durationMs: toMs(Date.now() - spanStartedAt),
|
||||
elapsedMs: toMs(Date.now() - startedAt),
|
||||
});
|
||||
}
|
||||
},
|
||||
logIfSlow(params) {
|
||||
if (didLog) {
|
||||
return;
|
||||
}
|
||||
const summary = snapshot();
|
||||
if (!shouldLog(summary)) {
|
||||
return;
|
||||
}
|
||||
didLog = true;
|
||||
replyHotPathTimingLog.warn(
|
||||
`reply hot path timings channel=${params.channel} messageId=${
|
||||
params.messageId ?? "unknown"
|
||||
} sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} totalMs=${
|
||||
summary.totalMs
|
||||
} stages=${formatSpans(summary)}${params.reason ? ` reason=${params.reason}` : ""}`,
|
||||
{
|
||||
channel: params.channel,
|
||||
messageId: params.messageId,
|
||||
sessionKey: params.sessionKey,
|
||||
outcome: params.outcome,
|
||||
reason: params.reason,
|
||||
totalMs: summary.totalMs,
|
||||
spans: summary.spans,
|
||||
},
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type {
|
||||
DispatchFromConfigParams,
|
||||
DispatchFromConfigResult,
|
||||
@@ -920,17 +822,12 @@ export async function dispatchReplyFromConfig(
|
||||
hasSessionKey: Boolean(sessionKey),
|
||||
hasRunId: typeof params.replyOptions?.runId === "string",
|
||||
};
|
||||
const replyHotPathTiming = createReplyHotPathTimingTracker({
|
||||
profilerEnabled: isReplyProfilerEnabled({ config: cfg }),
|
||||
});
|
||||
const traceReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
|
||||
replyHotPathTiming.measure(name, () =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
}),
|
||||
);
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
});
|
||||
let agentDispatchStartedAt = 0;
|
||||
|
||||
const recordProcessed = (
|
||||
@@ -940,15 +837,6 @@ export async function dispatchReplyFromConfig(
|
||||
error?: string;
|
||||
},
|
||||
) => {
|
||||
if (diagnosticsEnabled) {
|
||||
replyHotPathTiming.logIfSlow({
|
||||
channel,
|
||||
messageId,
|
||||
sessionKey,
|
||||
outcome,
|
||||
reason: opts?.reason,
|
||||
});
|
||||
}
|
||||
messageLifecycle.markProcessed(outcome, opts);
|
||||
};
|
||||
|
||||
@@ -1544,16 +1432,6 @@ export async function dispatchReplyFromConfig(
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
};
|
||||
const finishReplyOperationAbortedDispatch = (): DispatchFromConfigResult => {
|
||||
commitInboundDedupeIfClaimed();
|
||||
recordProcessed("completed", { reason: "reply_operation_aborted" });
|
||||
markIdle("message_completed");
|
||||
completeDispatchReplyOperation();
|
||||
return attachSourceReplyDeliveryMode({
|
||||
queuedFinal: false,
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
};
|
||||
|
||||
let pluginFallbackReason:
|
||||
| "plugin-bound-fallback-missing-plugin"
|
||||
@@ -1561,9 +1439,6 @@ export async function dispatchReplyFromConfig(
|
||||
| undefined;
|
||||
|
||||
if (pluginOwnedBinding) {
|
||||
if (isPreDispatchOperationAborted()) {
|
||||
return finishReplyOperationAbortedDispatch();
|
||||
}
|
||||
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
|
||||
if (shouldBypassPluginOwnedBindingForCommand(ctx)) {
|
||||
logVerbose(
|
||||
@@ -2137,17 +2012,6 @@ export async function dispatchReplyFromConfig(
|
||||
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
|
||||
const allowSuppressedSourceProgressCallbacks =
|
||||
params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true;
|
||||
let hasPendingDirectBlockReplyDelivery = false;
|
||||
const waitForPendingDirectBlockReplyDelivery = async (abortSignal?: AbortSignal) => {
|
||||
if (!hasPendingDirectBlockReplyDelivery) {
|
||||
return;
|
||||
}
|
||||
// Direct block replies are queued asynchronously so lightweight replies do
|
||||
// not wait for dispatcher idle. Flush only before later tool/progress
|
||||
// callbacks and final completion where external ordering is visible.
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await waitForReplyDispatcherIdle(dispatcher, abortSignal);
|
||||
};
|
||||
const shouldForwardProgressCallback = (options?: {
|
||||
forwardWhenSourceDeliverySuppressed?: boolean;
|
||||
requiresToolSummaryVisibility?: boolean;
|
||||
@@ -2167,10 +2031,9 @@ export async function dispatchReplyFromConfig(
|
||||
forwardWhenSourceDeliverySuppressed?: boolean;
|
||||
requiresToolSummaryVisibility?: boolean;
|
||||
onForward?: (...args: Args) => void;
|
||||
waitForDirectBlockReplyDelivery?: boolean;
|
||||
},
|
||||
): ((...args: Args) => Promise<void>) | undefined => {
|
||||
if (!callback) {
|
||||
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
|
||||
return undefined;
|
||||
}
|
||||
return async (...args: Args) => {
|
||||
@@ -2178,12 +2041,6 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
if (options?.waitForDirectBlockReplyDelivery) {
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (shouldForwardProgressCallback(options)) {
|
||||
options?.onForward?.(...args);
|
||||
await callback?.(...args);
|
||||
@@ -2220,12 +2077,10 @@ export async function dispatchReplyFromConfig(
|
||||
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
onForward: (payload) => {
|
||||
if (hasFailedProgressStatus(payload)) {
|
||||
markVisibleToolErrorProgress();
|
||||
@@ -2235,7 +2090,6 @@ export async function dispatchReplyFromConfig(
|
||||
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
onForward: (payload) => {
|
||||
if (hasFailedProgressStatus(payload)) {
|
||||
markVisibleToolErrorProgress();
|
||||
@@ -2245,20 +2099,14 @@ export async function dispatchReplyFromConfig(
|
||||
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onToolResult: (payload: ReplyPayload) => {
|
||||
markProgress();
|
||||
const run = async () => {
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
@@ -2323,10 +2171,6 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2349,10 +2193,6 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2383,10 +2223,6 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2493,7 +2329,7 @@ export async function dispatchReplyFromConfig(
|
||||
markInboundDedupeReplayUnsafe();
|
||||
const delivered = dispatcher.sendBlockReply(normalizedPayload);
|
||||
if (delivered) {
|
||||
hasPendingDirectBlockReplyDelivery = true;
|
||||
await waitForReplyDispatcherIdle(dispatcher, context?.abortSignal);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -2625,7 +2461,6 @@ export async function dispatchReplyFromConfig(
|
||||
accumulatedBlockTtsText.trim()
|
||||
) {
|
||||
try {
|
||||
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
|
||||
throwIfDispatchOperationAborted();
|
||||
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
|
||||
payload: { text: accumulatedBlockTtsText },
|
||||
@@ -2685,7 +2520,6 @@ export async function dispatchReplyFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
|
||||
const counts = dispatcher.getQueuedCounts();
|
||||
counts.final += routedFinalCount;
|
||||
commitInboundDedupeIfClaimed();
|
||||
@@ -2703,7 +2537,14 @@ export async function dispatchReplyFromConfig(
|
||||
});
|
||||
} catch (err) {
|
||||
if (isDispatchReplyOperationAbortedError(err)) {
|
||||
return finishReplyOperationAbortedDispatch();
|
||||
commitInboundDedupeIfClaimed();
|
||||
recordProcessed("completed", { reason: "reply_operation_aborted" });
|
||||
markIdle("message_completed");
|
||||
completeDispatchReplyOperation();
|
||||
return attachSourceReplyDeliveryMode({
|
||||
queuedFinal: false,
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
}
|
||||
if (inboundDedupeClaim.status === "claimed") {
|
||||
if (inboundDedupeReplayUnsafe) {
|
||||
|
||||
@@ -15,7 +15,6 @@ import { type OpenClawConfig, getRuntimeConfig } from "../../config/config.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { measureDiagnosticsTimelineSpan } from "../../infra/diagnostics-timeline.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
@@ -48,7 +47,6 @@ import { hasInboundMedia } from "./inbound-media.js";
|
||||
import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js";
|
||||
import { createFastTestModelSelectionState, createModelSelectionState } from "./model-selection.js";
|
||||
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
|
||||
import { createReplyTimingTracker } from "./reply-timing-tracker.js";
|
||||
import { initSessionState } from "./session.js";
|
||||
import {
|
||||
isStaleHeartbeatAutoFallbackOverride,
|
||||
@@ -91,8 +89,6 @@ const mediaUnderstandingApplyRuntimeLoader = createLazyImportLoader(
|
||||
const linkUnderstandingApplyRuntimeLoader = createLazyImportLoader(
|
||||
() => import("../../link-understanding/apply.runtime.js"),
|
||||
);
|
||||
|
||||
const replyResolverTimingLog = createSubsystemLogger("auto-reply/reply-resolver-timing");
|
||||
const commandsCoreRuntimeLoader = createLazyImportLoader(
|
||||
() => import("./commands-core.runtime.js"),
|
||||
);
|
||||
@@ -218,85 +214,45 @@ export async function getReplyFromConfig(
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
});
|
||||
// Profiler spans stay inert unless diagnostics enable `profiler` or
|
||||
// `reply.profiler`, so normal replies do not pay per-stage Date.now/array
|
||||
// bookkeeping while we can still split resolver costs on demand.
|
||||
const resolverTiming = createReplyTimingTracker({ log: replyResolverTimingLog, config: cfg });
|
||||
const useFastTestBootstrap = resolverTiming.measureSync("reply.resolve_fast_test_bootstrap", () =>
|
||||
shouldUseReplyFastTestBootstrap({
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
}),
|
||||
);
|
||||
const useFastTestRuntime = resolverTiming.measureSync("reply.resolve_fast_test_runtime", () =>
|
||||
shouldUseReplyFastTestRuntime({
|
||||
cfg,
|
||||
isFastTestEnv,
|
||||
}),
|
||||
);
|
||||
const finalized = resolverTiming.measureSync("reply.finalize_context", () =>
|
||||
finalizeInboundContext(ctx),
|
||||
);
|
||||
const { agentSessionKey, agentId } = resolverTiming.measureSync(
|
||||
"reply.resolve_agent_scope",
|
||||
() => {
|
||||
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
|
||||
const resolvedAgentSessionKey = targetSessionKey || finalized.SessionKey;
|
||||
return {
|
||||
agentSessionKey: resolvedAgentSessionKey,
|
||||
agentId: resolveSessionAgentId({
|
||||
sessionKey: resolvedAgentSessionKey,
|
||||
config: cfg,
|
||||
}),
|
||||
};
|
||||
},
|
||||
);
|
||||
const traceAttributes = resolverTiming.measureSync("reply.resolve_trace_context", () => ({
|
||||
const useFastTestBootstrap = shouldUseReplyFastTestBootstrap({
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
});
|
||||
const useFastTestRuntime = shouldUseReplyFastTestRuntime({
|
||||
cfg,
|
||||
isFastTestEnv,
|
||||
});
|
||||
const finalized = finalizeInboundContext(ctx);
|
||||
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
|
||||
const agentSessionKey = targetSessionKey || finalized.SessionKey;
|
||||
const traceAttributes = {
|
||||
surface: normalizeOptionalString(finalized.Surface ?? finalized.Provider) ?? "unknown",
|
||||
hasSessionKey: Boolean(agentSessionKey),
|
||||
isHeartbeat: opts?.isHeartbeat === true,
|
||||
hasMedia: hasInboundMedia(finalized),
|
||||
}));
|
||||
const messageId = finalized.MessageSid ?? finalized.MessageSidFirst ?? finalized.MessageSidLast;
|
||||
let resolverTimingSessionKey = agentSessionKey;
|
||||
const logResolverTiming = (outcome: string, reason?: string, error?: string) =>
|
||||
resolverTiming.logIfSlow({
|
||||
message: `reply resolver timings surface=${traceAttributes.surface} messageId=${
|
||||
messageId ?? "unknown"
|
||||
} sessionKey=${resolverTimingSessionKey ?? "unknown"} agentId=${agentId}`,
|
||||
outcome,
|
||||
reason,
|
||||
error,
|
||||
details: {
|
||||
surface: traceAttributes.surface,
|
||||
messageId,
|
||||
sessionKey: resolverTimingSessionKey,
|
||||
agentId,
|
||||
},
|
||||
});
|
||||
};
|
||||
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
|
||||
resolverTiming.measure(name, () =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
}),
|
||||
);
|
||||
const mergedSkillFilter = resolverTiming.measureSync("reply.resolve_skill_filter", () =>
|
||||
mergeSkillFilters(opts?.skillFilter, resolveAgentSkillsFilter(cfg, agentId)),
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
});
|
||||
const agentId = resolveSessionAgentId({
|
||||
sessionKey: agentSessionKey,
|
||||
config: cfg,
|
||||
});
|
||||
const mergedSkillFilter = mergeSkillFilters(
|
||||
opts?.skillFilter,
|
||||
resolveAgentSkillsFilter(cfg, agentId),
|
||||
);
|
||||
const resolvedOpts =
|
||||
mergedSkillFilter !== undefined ? { ...opts, skillFilter: mergedSkillFilter } : opts;
|
||||
const agentCfg = cfg.agents?.defaults;
|
||||
const sessionCfg = cfg.session;
|
||||
const { defaultProvider, defaultModel, aliasIndex } = resolverTiming.measureSync(
|
||||
"reply.resolve_default_model",
|
||||
() =>
|
||||
resolveDefaultModel({
|
||||
cfg,
|
||||
agentId,
|
||||
}),
|
||||
);
|
||||
const { defaultProvider, defaultModel, aliasIndex } = resolveDefaultModel({
|
||||
cfg,
|
||||
agentId,
|
||||
});
|
||||
let provider = defaultProvider;
|
||||
let model = defaultModel;
|
||||
let hasResolvedHeartbeatModelOverride = false;
|
||||
@@ -321,34 +277,22 @@ export async function getReplyFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
const { workspaceDirRaw, workspaceDirForNativeCommand, agentDir, timeoutMs } =
|
||||
resolverTiming.measureSync("reply.resolve_workspace_agent_dir", () => {
|
||||
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
|
||||
return {
|
||||
workspaceDirRaw,
|
||||
workspaceDirForNativeCommand: workspaceDirRaw,
|
||||
agentDir: resolveAgentDir(cfg, agentId),
|
||||
timeoutMs: resolveAgentTimeoutMs({
|
||||
cfg,
|
||||
overrideSeconds: opts?.timeoutOverrideSeconds,
|
||||
}),
|
||||
};
|
||||
});
|
||||
const typing = resolverTiming.measureSync("reply.create_typing_controller", () => {
|
||||
const configuredTypingSeconds =
|
||||
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
|
||||
const typingIntervalSeconds =
|
||||
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
|
||||
const controller = createTypingController({
|
||||
onReplyStart: opts?.onReplyStart,
|
||||
onCleanup: opts?.onTypingCleanup,
|
||||
typingIntervalSeconds,
|
||||
silentToken: SILENT_REPLY_TOKEN,
|
||||
log: defaultRuntime.log,
|
||||
});
|
||||
opts?.onTypingController?.(controller);
|
||||
return controller;
|
||||
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
|
||||
const workspaceDirForNativeCommand = workspaceDirRaw;
|
||||
const agentDir = resolveAgentDir(cfg, agentId);
|
||||
const timeoutMs = resolveAgentTimeoutMs({ cfg, overrideSeconds: opts?.timeoutOverrideSeconds });
|
||||
const configuredTypingSeconds =
|
||||
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
|
||||
const typingIntervalSeconds =
|
||||
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
|
||||
const typing = createTypingController({
|
||||
onReplyStart: opts?.onReplyStart,
|
||||
onCleanup: opts?.onTypingCleanup,
|
||||
typingIntervalSeconds,
|
||||
silentToken: SILENT_REPLY_TOKEN,
|
||||
log: defaultRuntime.log,
|
||||
});
|
||||
opts?.onTypingController?.(typing);
|
||||
|
||||
const nativeSlashCommandFastReply = await traceGetReplyPhase(
|
||||
"reply.native_slash_command_fast_path",
|
||||
@@ -372,7 +316,6 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
);
|
||||
if (nativeSlashCommandFastReply.handled) {
|
||||
logResolverTiming("completed", "native_slash_command_fast_path");
|
||||
return nativeSlashCommandFastReply.reply;
|
||||
}
|
||||
|
||||
@@ -447,7 +390,6 @@ export async function getReplyFromConfig(
|
||||
triggerBodyNormalized,
|
||||
bodyStripped,
|
||||
} = sessionState;
|
||||
resolverTimingSessionKey = sessionKey ?? resolverTimingSessionKey;
|
||||
|
||||
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
|
||||
const text = sanitizePendingFinalDeliveryText(sessionEntry.pendingFinalDeliveryText);
|
||||
@@ -513,7 +455,6 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
});
|
||||
}
|
||||
logResolverTiming("completed", "pending_final_delivery_replay");
|
||||
return { text: replayText };
|
||||
}
|
||||
}
|
||||
@@ -638,8 +579,7 @@ export async function getReplyFromConfig(
|
||||
triggerBodyNormalized,
|
||||
commandAuthorized,
|
||||
});
|
||||
logResolverTiming("milestone", "before_fast_directive_prepared_reply");
|
||||
const fastReplyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
runPreparedReply({
|
||||
ctx,
|
||||
sessionCtx,
|
||||
@@ -696,8 +636,6 @@ export async function getReplyFromConfig(
|
||||
autoFallbackPrimaryProbe,
|
||||
}),
|
||||
);
|
||||
logResolverTiming("completed", "fast_directive_prepared_reply");
|
||||
return fastReplyResult;
|
||||
}
|
||||
|
||||
const directiveResult = await traceGetReplyPhase("reply.resolve_directives", () =>
|
||||
@@ -733,7 +671,6 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
);
|
||||
if (directiveResult.kind === "reply") {
|
||||
logResolverTiming("completed", "directive_reply");
|
||||
return directiveResult.reply;
|
||||
}
|
||||
|
||||
@@ -834,7 +771,6 @@ export async function getReplyFromConfig(
|
||||
);
|
||||
if (inlineActionResult.kind === "reply") {
|
||||
await maybeEmitMissingResetHooks();
|
||||
logResolverTiming("completed", "inline_action_reply");
|
||||
return inlineActionResult.reply;
|
||||
}
|
||||
await maybeEmitMissingResetHooks();
|
||||
@@ -926,7 +862,6 @@ export async function getReplyFromConfig(
|
||||
),
|
||||
);
|
||||
if (hookResult?.handled) {
|
||||
logResolverTiming("completed", "before_agent_reply_hook");
|
||||
return hookResult.reply ?? { text: SILENT_REPLY_TOKEN };
|
||||
}
|
||||
}
|
||||
@@ -949,8 +884,7 @@ export async function getReplyFromConfig(
|
||||
);
|
||||
}
|
||||
|
||||
logResolverTiming("milestone", "before_run_prepared_reply");
|
||||
const replyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
runPreparedReply({
|
||||
ctx,
|
||||
sessionCtx,
|
||||
@@ -997,6 +931,4 @@ export async function getReplyFromConfig(
|
||||
autoFallbackPrimaryProbe: runAutoFallbackPrimaryProbe,
|
||||
}),
|
||||
);
|
||||
logResolverTiming("completed", "prepared_reply");
|
||||
return replyResult;
|
||||
}
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { createReplyTimingTracker, isReplyProfilerEnabled } from "./reply-timing-tracker.js";
|
||||
|
||||
describe("isReplyProfilerEnabled", () => {
|
||||
it("matches global and reply profiler diagnostic flags", () => {
|
||||
const cfg = { diagnostics: { flags: ["reply.profiler"] } } as OpenClawConfig;
|
||||
expect(isReplyProfilerEnabled({ config: cfg, env: {} as NodeJS.ProcessEnv })).toBe(true);
|
||||
expect(
|
||||
isReplyProfilerEnabled({
|
||||
env: { OPENCLAW_DIAGNOSTICS: "profiler" } as NodeJS.ProcessEnv,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createReplyTimingTracker", () => {
|
||||
it("is a pass-through tracker unless the profiler flag is enabled", async () => {
|
||||
const warn = vi.fn();
|
||||
const tracker = createReplyTimingTracker({ log: { warn } });
|
||||
|
||||
expect(tracker.measureSync("sync", () => 42)).toBe(42);
|
||||
await expect(tracker.measure("async", async () => "ok")).resolves.toBe("ok");
|
||||
tracker.logIfSlow({ message: "reply timings" });
|
||||
|
||||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("records and logs spans when the profiler flag is enabled", () => {
|
||||
const warn = vi.fn();
|
||||
const tracker = createReplyTimingTracker({
|
||||
log: { warn },
|
||||
env: { OPENCLAW_DIAGNOSTICS: "reply.profiler" } as NodeJS.ProcessEnv,
|
||||
totalWarnMs: 0,
|
||||
stageWarnMs: 0,
|
||||
});
|
||||
|
||||
expect(tracker.measureSync("sync", () => 7)).toBe(7);
|
||||
tracker.logIfSlow({ message: "reply timings", outcome: "completed" });
|
||||
|
||||
expect(warn).toHaveBeenCalledOnce();
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("stages=sync:");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
outcome: "completed",
|
||||
spans: [expect.objectContaining({ name: "sync" })],
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,141 +0,0 @@
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { isDiagnosticFlagEnabled } from "../../infra/diagnostic-flags.js";
|
||||
|
||||
type ReplyTimingSpan = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type ReplyTimingSummary = {
|
||||
totalMs: number;
|
||||
spans: ReplyTimingSpan[];
|
||||
};
|
||||
|
||||
type ReplyTimingLogger = {
|
||||
warn: (message: string, details?: Record<string, unknown>) => void;
|
||||
};
|
||||
|
||||
type ReplyTimingTracker = {
|
||||
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
|
||||
measureSync: <T>(name: string, run: () => T) => T;
|
||||
logIfSlow: (params: {
|
||||
message: string;
|
||||
outcome?: string;
|
||||
reason?: string;
|
||||
error?: string;
|
||||
details?: Record<string, unknown>;
|
||||
}) => void;
|
||||
};
|
||||
|
||||
const DEFAULT_TIMING_WARN_TOTAL_MS = 1_000;
|
||||
const DEFAULT_TIMING_WARN_STAGE_MS = 500;
|
||||
|
||||
export function isReplyProfilerEnabled(params?: {
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): boolean {
|
||||
const cfg = params?.config;
|
||||
const env = params?.env ?? process.env;
|
||||
return (
|
||||
isDiagnosticFlagEnabled("profiler", cfg, env) ||
|
||||
isDiagnosticFlagEnabled("reply.profiler", cfg, env)
|
||||
);
|
||||
}
|
||||
|
||||
export function createReplyTimingTracker(params: {
|
||||
log: ReplyTimingLogger;
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
enabled?: boolean;
|
||||
totalWarnMs?: number;
|
||||
stageWarnMs?: number;
|
||||
}): ReplyTimingTracker {
|
||||
const enabled =
|
||||
params.enabled ?? isReplyProfilerEnabled({ config: params.config, env: params.env });
|
||||
if (!enabled) {
|
||||
// Normal production turns use pass-through wrappers so added profiling
|
||||
// calls do not allocate spans or call Date.now on the hot reply path.
|
||||
return {
|
||||
async measure(_name, run) {
|
||||
return await run();
|
||||
},
|
||||
measureSync(_name, run) {
|
||||
return run();
|
||||
},
|
||||
logIfSlow() {},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
const spans: ReplyTimingSpan[] = [];
|
||||
let didLog = false;
|
||||
const totalWarnMs = params.totalWarnMs ?? DEFAULT_TIMING_WARN_TOTAL_MS;
|
||||
const stageWarnMs = params.stageWarnMs ?? DEFAULT_TIMING_WARN_STAGE_MS;
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
const record = (name: string, spanStartedAt: number) => {
|
||||
spans.push({
|
||||
name,
|
||||
durationMs: toMs(Date.now() - spanStartedAt),
|
||||
elapsedMs: toMs(Date.now() - startedAt),
|
||||
});
|
||||
};
|
||||
const snapshot = (): ReplyTimingSummary => ({
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
spans: spans.slice(),
|
||||
});
|
||||
const shouldLog = (summary: ReplyTimingSummary) =>
|
||||
summary.totalMs >= totalWarnMs || summary.spans.some((span) => span.durationMs >= stageWarnMs);
|
||||
const formatSpans = (summary: ReplyTimingSummary) =>
|
||||
summary.spans.length > 0
|
||||
? summary.spans
|
||||
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
|
||||
return {
|
||||
async measure(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
measureSync(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
logIfSlow(logParams) {
|
||||
if (didLog) {
|
||||
return;
|
||||
}
|
||||
const summary = snapshot();
|
||||
if (!shouldLog(summary)) {
|
||||
return;
|
||||
}
|
||||
didLog = true;
|
||||
const suffix = [
|
||||
`totalMs=${summary.totalMs}`,
|
||||
`stages=${formatSpans(summary)}`,
|
||||
logParams.outcome ? `outcome=${logParams.outcome}` : undefined,
|
||||
logParams.reason ? `reason=${logParams.reason}` : undefined,
|
||||
logParams.error ? `error="${logParams.error}"` : undefined,
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join(" ");
|
||||
params.log.warn(`${logParams.message} ${suffix}`, {
|
||||
...logParams.details,
|
||||
outcome: logParams.outcome,
|
||||
reason: logParams.reason,
|
||||
error: logParams.error,
|
||||
totalMs: summary.totalMs,
|
||||
spans: summary.spans,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -133,24 +133,6 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("refreshes automatic health snapshots without live channel probes", async () => {
|
||||
vi.useFakeTimers();
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
deps.refreshGatewayHealthSnapshot = vi.fn(async () => ({ ok: true }) as HealthSummary);
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledWith({ probe: false });
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledTimes(2);
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenLastCalledWith({ probe: false });
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("skips overlapping media cleanup runs", async () => {
|
||||
vi.useFakeTimers();
|
||||
let resolveCleanup = () => {};
|
||||
|
||||
@@ -72,17 +72,16 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.nodeSendToAllSubscribed("tick", payload);
|
||||
}, TICK_INTERVAL_MS);
|
||||
|
||||
// Keep cached health warm without request-time live channel probes. Explicit
|
||||
// status/doctor probe paths still pass probe=true when the operator asks.
|
||||
// periodic health refresh to keep cached snapshot warm
|
||||
const healthInterval = setInterval(() => {
|
||||
void params
|
||||
.refreshGatewayHealthSnapshot({ probe: false })
|
||||
.refreshGatewayHealthSnapshot({ probe: true })
|
||||
.catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
|
||||
}, HEALTH_REFRESH_INTERVAL_MS);
|
||||
|
||||
// Prime cache so first client gets a snapshot without waiting.
|
||||
void params
|
||||
.refreshGatewayHealthSnapshot({ probe: false })
|
||||
.refreshGatewayHealthSnapshot({ probe: true })
|
||||
.catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));
|
||||
|
||||
// dedupe cache cleanup
|
||||
|
||||
@@ -199,13 +199,6 @@ function deliveryCall(index = 0): Record<string, any> | undefined {
|
||||
return calls[index]?.[0];
|
||||
}
|
||||
|
||||
function appendTranscriptCall(index = 0): Record<string, any> | undefined {
|
||||
const calls = mocks.appendAssistantMessageToSessionTranscript.mock.calls as unknown as Array<
|
||||
[Record<string, any>]
|
||||
>;
|
||||
return calls[index]?.[0];
|
||||
}
|
||||
|
||||
function firstRespondCall(respond: ReturnType<typeof vi.fn>) {
|
||||
const calls = respond.mock.calls as unknown as Array<
|
||||
[
|
||||
@@ -1462,178 +1455,6 @@ describe("gateway send mirroring", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("waits for source transcript mirroring before responding to message.action", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
meta: {
|
||||
id: "telegram",
|
||||
label: "Telegram",
|
||||
selectionLabel: "Telegram",
|
||||
docsPath: "/channels/telegram",
|
||||
blurb: "Telegram async source send transcript mirror test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"] },
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({ enabled: true }),
|
||||
isConfigured: () => true,
|
||||
},
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["send"] }),
|
||||
supportsAction: ({ action }) => action === "send",
|
||||
handleAction: async () => jsonResult({ ok: true, messageId: "tg-async-1" }),
|
||||
},
|
||||
};
|
||||
const mirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
|
||||
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
|
||||
"send-test-source-message-action-async-mirror",
|
||||
);
|
||||
mocks.dispatchChannelMessageAction.mockResolvedValueOnce(
|
||||
jsonResult({ ok: true, messageId: "tg-async-1" }),
|
||||
);
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockReturnValueOnce(mirrorDeferred.promise);
|
||||
|
||||
const respond = vi.fn();
|
||||
const request = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "visible media caption",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-async-source-message-action",
|
||||
} as never,
|
||||
respond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "1", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(respond).not.toHaveBeenCalled();
|
||||
|
||||
mirrorDeferred.resolve({ ok: true, sessionFile: "x" });
|
||||
await request;
|
||||
|
||||
expect(firstRespondCall(respond)[0]).toBe(true);
|
||||
});
|
||||
|
||||
it("preserves source transcript mirror order before message.action responses", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
meta: {
|
||||
id: "telegram",
|
||||
label: "Telegram",
|
||||
selectionLabel: "Telegram",
|
||||
docsPath: "/channels/telegram",
|
||||
blurb: "Telegram ordered async source send transcript mirror test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"] },
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({ enabled: true }),
|
||||
isConfigured: () => true,
|
||||
},
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["send"] }),
|
||||
supportsAction: ({ action }) => action === "send",
|
||||
handleAction: async () => jsonResult({ ok: true, messageId: "tg-ordered" }),
|
||||
},
|
||||
};
|
||||
const firstMirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
|
||||
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
|
||||
"send-test-source-message-action-ordered-async-mirror",
|
||||
);
|
||||
mocks.dispatchChannelMessageAction.mockResolvedValue(
|
||||
jsonResult({ ok: true, messageId: "tg-ordered" }),
|
||||
);
|
||||
mocks.appendAssistantMessageToSessionTranscript
|
||||
.mockReturnValueOnce(firstMirrorDeferred.promise)
|
||||
.mockResolvedValueOnce({ ok: true, sessionFile: "x" });
|
||||
|
||||
const firstRespond = vi.fn();
|
||||
const secondRespond = vi.fn();
|
||||
const first = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "first visible reply",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-ordered-source-message-action-1",
|
||||
} as never,
|
||||
respond: firstRespond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "1", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
const second = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "second visible reply",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-ordered-source-message-action-2",
|
||||
} as never,
|
||||
respond: secondRespond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "2", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
expect(firstRespond).not.toHaveBeenCalled();
|
||||
expect(secondRespond).not.toHaveBeenCalled();
|
||||
expect(appendTranscriptCall(0)).toEqual(
|
||||
expect.objectContaining({ text: "first visible reply" }),
|
||||
);
|
||||
|
||||
firstMirrorDeferred.resolve({ ok: true, sessionFile: "x" });
|
||||
await first;
|
||||
await second;
|
||||
|
||||
expect(firstRespondCall(firstRespond)[0]).toBe(true);
|
||||
expect(firstRespondCall(secondRespond)[0]).toBe(true);
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(2);
|
||||
expect(appendTranscriptCall(1)).toEqual(
|
||||
expect.objectContaining({ text: "second visible reply" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("mirrors presentation-only source-conversation message.action sends", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
|
||||
@@ -288,37 +288,6 @@ async function mirrorDeliveredSourceReplyToTranscriptBestEffort(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const sourceReplyTranscriptMirrorQueues = new Map<string, Promise<void>>();
|
||||
|
||||
function resolveSourceReplyTranscriptMirrorQueueKey(
|
||||
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0],
|
||||
): string {
|
||||
return mirror.sessionKey?.trim() || "__global__";
|
||||
}
|
||||
|
||||
function scheduleDeliveredSourceReplyTranscriptMirror(params: {
|
||||
context: GatewayRequestContext;
|
||||
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0];
|
||||
}): Promise<void> {
|
||||
const queueKey = resolveSourceReplyTranscriptMirrorQueueKey(params.mirror);
|
||||
const previous = sourceReplyTranscriptMirrorQueues.get(queueKey);
|
||||
// Queue per session so current-conversation source replies are visible before
|
||||
// a following turn can read the transcript.
|
||||
const queued = (async () => {
|
||||
await previous?.catch(() => undefined);
|
||||
await mirrorDeliveredSourceReplyToTranscriptBestEffort(params);
|
||||
})();
|
||||
sourceReplyTranscriptMirrorQueues.set(queueKey, queued);
|
||||
void queued
|
||||
.finally(() => {
|
||||
if (sourceReplyTranscriptMirrorQueues.get(queueKey) === queued) {
|
||||
sourceReplyTranscriptMirrorQueues.delete(queueKey);
|
||||
}
|
||||
})
|
||||
.catch(() => undefined);
|
||||
return queued;
|
||||
}
|
||||
|
||||
export const sendHandlers: GatewayRequestHandlers = {
|
||||
"message.action": async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
@@ -430,7 +399,7 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
const agentId =
|
||||
normalizeOptionalString(request.agentId) ??
|
||||
(sessionKey ? resolveSessionAgentId({ sessionKey, config: cfg }) : undefined);
|
||||
await scheduleDeliveredSourceReplyTranscriptMirror({
|
||||
await mirrorDeliveredSourceReplyToTranscriptBestEffort({
|
||||
context,
|
||||
mirror: {
|
||||
action: request.action,
|
||||
|
||||
@@ -224,7 +224,7 @@ describe("attachGatewayWsMessageHandler post-connect health refresh", () => {
|
||||
expect(hello.ok).toBe(true);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: false });
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true });
|
||||
});
|
||||
resolveRefresh?.();
|
||||
});
|
||||
|
||||
@@ -1776,10 +1776,7 @@ export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerPar
|
||||
presence: snapshot.presence.length,
|
||||
stateVersion: snapshot.stateVersion.presence,
|
||||
});
|
||||
// Post-connect refresh only needs a cached/config snapshot for UI state;
|
||||
// live channel probes here pulled slow Discord/Telegram HTTP checks into
|
||||
// reply-adjacent websocket handshakes.
|
||||
void refreshHealthSnapshot({ probe: false }).catch((err) =>
|
||||
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
||||
logHealth.error(`post-connect health refresh failed: ${formatError(err)}`),
|
||||
);
|
||||
return;
|
||||
|
||||
@@ -250,78 +250,6 @@ describe("heartbeat runner skips when target session lane is busy", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("returns requests-in-flight when another session for the same agent has an active reply run", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => [
|
||||
"agent:main:telegram:group:-1003966283270:topic:547",
|
||||
]);
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
|
||||
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores unscoped active reply runs when checking same-agent heartbeat work", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => ["legacy-session-key"]);
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("ran");
|
||||
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
|
||||
expect(replySpy).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not defer immediate heartbeat wakes for another active session", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => [
|
||||
"agent:main:telegram:group:-1003966283270:topic:547",
|
||||
]);
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
intent: "immediate",
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("ran");
|
||||
expect(replySpy).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not defer on a recent heartbeat ack pending final delivery", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
|
||||
@@ -36,10 +36,7 @@ import {
|
||||
} from "../auto-reply/heartbeat.js";
|
||||
import { replaceGenericExternalRunFailureText } from "../auto-reply/reply/agent-runner-failure-copy.js";
|
||||
import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defaults.js";
|
||||
import {
|
||||
listActiveReplyRunSessionKeys,
|
||||
replyRunRegistry,
|
||||
} from "../auto-reply/reply/reply-run-registry.js";
|
||||
import { replyRunRegistry } from "../auto-reply/reply/reply-run-registry.js";
|
||||
import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js";
|
||||
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
@@ -151,7 +148,6 @@ export type HeartbeatDeps = OutboundSendDeps &
|
||||
getQueueSize?: (lane?: string) => number;
|
||||
getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[];
|
||||
isReplyRunActive?: (sessionKey: string) => boolean;
|
||||
listActiveReplyRunSessionKeys?: () => readonly string[];
|
||||
nowMs?: () => number;
|
||||
};
|
||||
|
||||
@@ -225,17 +221,6 @@ function hasAgentOptInBusyLaneWork(
|
||||
return hasQueuedWorkInLaneSnapshots(getSnapshots(), (lane) => laneBelongsToAgent(lane, agentId));
|
||||
}
|
||||
|
||||
function hasActiveReplyRunForAgent(
|
||||
agentId: string,
|
||||
listSessionKeys: () => readonly string[],
|
||||
): boolean {
|
||||
const normalizedAgentId = normalizeAgentId(agentId);
|
||||
return listSessionKeys().some((sessionKey) => {
|
||||
const parsed = parseAgentSessionKey(sessionKey);
|
||||
return parsed ? normalizeAgentId(parsed.agentId) === normalizedAgentId : false;
|
||||
});
|
||||
}
|
||||
|
||||
function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined {
|
||||
const activePlugin = getActivePluginChannelRegistry()?.channels.find(
|
||||
(entry) => entry.plugin.id === channel,
|
||||
@@ -1360,21 +1345,6 @@ export async function runHeartbeatOnce(opts: {
|
||||
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
|
||||
}
|
||||
|
||||
const shouldHonorActiveReplyRuns = opts.intent !== "immediate" && opts.intent !== "manual";
|
||||
const listActiveReplyRuns =
|
||||
opts.deps?.listActiveReplyRunSessionKeys ?? listActiveReplyRunSessionKeys;
|
||||
// Scheduled heartbeats are background work, so defer them when any session on
|
||||
// the same agent is already replying; immediate/manual wakes keep their
|
||||
// existing semantics for explicit user/system actions.
|
||||
if (shouldHonorActiveReplyRuns && hasActiveReplyRunForAgent(agentId, listActiveReplyRuns)) {
|
||||
emitHeartbeatEvent({
|
||||
status: "skipped",
|
||||
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
|
||||
durationMs: Date.now() - startedAt,
|
||||
});
|
||||
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
|
||||
}
|
||||
|
||||
// Phase 2: Stronger heartbeat deferral while a final delivery replay is pending.
|
||||
// Plain `updatedAt` changes are normal for heartbeat sessions and should not
|
||||
// suppress heartbeat runs; only defer when final delivery recovery is active.
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
@@ -421,30 +420,4 @@ describe("bun global install smoke", () => {
|
||||
expect(releaseChecks).toContain("uses: ./.github/workflows/install-smoke.yml");
|
||||
expect(releaseChecks).toContain("run_bun_global_install_smoke: true");
|
||||
});
|
||||
|
||||
it("kills Bun global install smoke commands that ignore TERM after timeout", () => {
|
||||
const result = spawnSync(
|
||||
process.execPath,
|
||||
[
|
||||
BUN_GLOBAL_ASSERTIONS_PATH,
|
||||
"run-with-timeout",
|
||||
"50",
|
||||
process.execPath,
|
||||
"-e",
|
||||
"process.on('SIGTERM', () => {}); setInterval(() => {}, 1000);",
|
||||
],
|
||||
{
|
||||
encoding: "utf8",
|
||||
env: {
|
||||
...process.env,
|
||||
OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS: "50",
|
||||
},
|
||||
timeout: 5000,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.error).toBeUndefined();
|
||||
expect(result.status).toBe(1);
|
||||
expect(result.stderr).toContain(`command timed out after 50ms: ${process.execPath}`);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user