Compare commits

..

3 Commits

Author SHA1 Message Date
Keshav's Bot
a4bf05cf1b fix(reply): reduce visible reply delivery latency 2026-05-26 19:13:00 +01:00
Peter Steinberger
b56ddcc6ff ci: use supported codex mini live target 2026-05-26 19:04:43 +01:00
Vincent Koc
8c6537b8c1 fix(ci): kill wedged bun smoke commands 2026-05-26 20:04:12 +02:00
28 changed files with 2177 additions and 1328 deletions

View File

@@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
- CI: bound Docker/Bash E2E tarball npm installs with `OPENCLAW_E2E_NPM_INSTALL_TIMEOUT` so package, onboarding, plugin, and upgrade lanes fail instead of hanging on a stuck npm install.
- CI: keep `OPENCLAW_TESTBOX=1 pnpm check:changed` delegating to Blacksmith Testbox through Crabbox without forwarding local Testbox or worker env into the remote command.
- CI: send KILL after the TERM grace period for manual checkout fetch timeouts so stuck Testbox and workflow checkout retries cannot hang behind a wedged `git fetch`.
- CI: send KILL after the TERM grace period for Bun global install smoke command timeouts so trapped `openclaw` child processes cannot wedge the scheduled install smoke.
- iMessage: thread current channel/account inbound attachment roots into the image tool so iMessage-saved attachments under `~/Library/Messages/Attachments` (including the wildcard `/Users/*/Library/Messages/Attachments` root) are read through the existing inbound path policy instead of being rejected as `path-not-allowed`. Literal `localRoots` stays workspace-scoped. Fixes #30170. (#86569)
- QQ Bot: respect `OPENCLAW_HOME` for outbound media path resolution so `<qqmedia>` sends no longer silently fail when `HOME` and `OPENCLAW_HOME` differ (Docker / multi-user hosts). Persisted QQ Bot data (sessions, known users, refs) stays anchored on the OS home for upgrade compatibility. Fixes #83562. Thanks @sliverp.
- Update: report the primary malformed `openclaw.extensions` payload error without adding a duplicate missing-main diagnostic. (#86596) Thanks @ferminquant.

View File

@@ -50,50 +50,6 @@ Disable all flags:
OPENCLAW_DIAGNOSTICS=0
```
`OPENCLAW_DIAGNOSTICS=0` is a process-level disable override: it disables
flags from both env and config for that process.
## Profiling flags
Profiler flags enable targeted timing spans without raising global logging
levels. They are disabled by default.
Enable all profiler-gated spans for one gateway run:
```bash
OPENCLAW_DIAGNOSTICS=profiler openclaw gateway run
```
Enable only reply-dispatch profiler spans:
```bash
OPENCLAW_DIAGNOSTICS=reply.profiler openclaw gateway run
```
Enable only Codex app-server startup/tool/thread profiler spans:
```bash
OPENCLAW_DIAGNOSTICS=codex.profiler openclaw gateway run
```
Enable profiler flags from config:
```json
{
"diagnostics": {
"flags": ["reply.profiler", "codex.profiler"]
}
}
```
Restart the gateway after changing config flags. To disable a profiler flag,
remove it from `diagnostics.flags` and restart. To temporarily disable every
diagnostics flag even when config enables profiler flags, start the process with:
```bash
OPENCLAW_DIAGNOSTICS=0 openclaw gateway run
```
## Timeline artifacts
The `timeline` flag writes structured startup and runtime timing events for

View File

@@ -1,30 +0,0 @@
import { describe, expect, it } from "vitest";
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
describe("isCodexAppServerProfilerEnabled", () => {
it("is disabled by default", () => {
expect(isCodexAppServerProfilerEnabled(undefined, {} as NodeJS.ProcessEnv)).toBe(false);
});
it("matches global and Codex profiler flags", () => {
expect(
isCodexAppServerProfilerEnabled(
{ diagnostics: { flags: ["codex.profiler"] } },
{} as NodeJS.ProcessEnv,
),
).toBe(true);
expect(
isCodexAppServerProfilerEnabled(undefined, {
OPENCLAW_DIAGNOSTICS: "profiler",
} as NodeJS.ProcessEnv),
).toBe(true);
});
it("uses the documented diagnostics env disable override", () => {
expect(
isCodexAppServerProfilerEnabled({ diagnostics: { flags: ["codex.profiler"] } }, {
OPENCLAW_DIAGNOSTICS: "0",
} as NodeJS.ProcessEnv),
).toBe(false);
});
});

View File

@@ -1,11 +0,0 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { isDiagnosticFlagEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
const PROFILER_FLAGS = ["profiler", "codex.profiler"] as const;
export function isCodexAppServerProfilerEnabled(
config?: OpenClawConfig,
env: NodeJS.ProcessEnv = process.env,
): boolean {
return PROFILER_FLAGS.some((flag) => isDiagnosticFlagEnabled(flag, config, env));
}

View File

@@ -887,7 +887,6 @@ describe("runCodexAppServerAttempt", () => {
await closeCodexSandboxExecServersForTests();
resetCodexAppServerClientFactoryForTest();
testing.resetOpenClawCodingToolsFactoryForTests();
testing.resetEnsuredCodexWorkspaceDirsForTests();
testing.clearPendingCodexNativeHookRelayUnregistersForTests();
resetCodexRateLimitCacheForTests();
nativeHookRelayTesting.clearNativeHookRelaysForTests();
@@ -904,16 +903,6 @@ describe("runCodexAppServerAttempt", () => {
await fs.rm(tempDir, { recursive: true, force: true });
});
it("recreates cached Codex workspace directories after cleanup removes them", async () => {
const workspaceDir = path.join(tempDir, "cached-workspace");
await testing.ensureCodexWorkspaceDirOnceForTests(workspaceDir);
await fs.rm(workspaceDir, { recursive: true, force: true });
await testing.ensureCodexWorkspaceDirOnceForTests(workspaceDir);
expect((await fs.stat(workspaceDir)).isDirectory()).toBe(true);
});
it("filters Codex-native dynamic tools from app-server tool exposure", () => {
const tools = [
"read",

View File

@@ -134,7 +134,6 @@ import {
mergeCodexThreadConfigs,
shouldBuildCodexPluginThreadConfig,
} from "./plugin-thread-config.js";
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
import {
assertCodexTurnStartResponse,
readCodexDynamicToolCallParams,
@@ -279,7 +278,6 @@ type CodexWorkspaceBootstrapContext = CodexBootstrapContext & {
};
let openClawCodingToolsFactoryForTests: OpenClawCodingToolsFactory | undefined;
const ensuredCodexWorkspaceDirs = new Set<string>();
type PendingCodexNativeHookRelayUnregister = {
timeout: ReturnType<typeof setTimeout>;
@@ -342,30 +340,6 @@ function clearPendingCodexNativeHookRelayUnregistersForTests(): void {
pendingCodexNativeHookRelayUnregisters.clear();
}
async function ensureCodexWorkspaceDirOnce(workspaceDir: string): Promise<void> {
const normalized = path.resolve(workspaceDir);
if (ensuredCodexWorkspaceDirs.has(normalized)) {
try {
const stat = await fs.stat(normalized);
if (stat.isDirectory()) {
return;
}
} catch (error) {
const code =
typeof error === "object" && error ? (error as { code?: unknown }).code : undefined;
if (code !== "ENOENT") {
throw error;
}
}
ensuredCodexWorkspaceDirs.delete(normalized);
}
// Codex attempts re-enter the same workspace repeatedly; caching successful
// mkdirs avoids repeated fs work while still recovering if cleanup prunes
// the directory between attempts.
await fs.mkdir(normalized, { recursive: true });
ensuredCodexWorkspaceDirs.add(normalized);
}
function emitCodexAppServerEvent(
params: EmbeddedRunAttemptParams,
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
@@ -1039,7 +1013,6 @@ export async function runCodexAppServerAttempt(
} = {},
): Promise<EmbeddedRunAttemptResult> {
const attemptStartedAt = Date.now();
const profilerEnabled = isCodexAppServerProfilerEnabled(params.config);
const codexModelCallTrace = freezeDiagnosticTraceContext(
createDiagnosticTraceContextFromActiveScope(),
);
@@ -1049,20 +1022,13 @@ export async function runCodexAppServerAttempt(
let codexModelCallStarted = false;
let codexModelCallTerminalEmitted = false;
let codexModelCallRequestPayloadBytes: number | undefined;
// Startup phase timings are profiler-gated because this function runs before
// every Codex turn; normal production should not do timing bookkeeping here.
const preDynamicStartupStages = createCodexDynamicToolBuildStageTracker({
enabled: profilerEnabled,
});
const attemptClientFactory = options.clientFactory ?? defaultCodexAppServerClientFactory;
const pluginConfig = readCodexPluginConfig(options.pluginConfig);
const computerUseConfig = resolveCodexComputerUseConfig({ pluginConfig });
const configuredAppServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
const beforeToolCallPolicy = getBeforeToolCallPolicyDiagnosticState();
preDynamicStartupStages.mark("config");
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
await ensureCodexWorkspaceDirOnce(resolvedWorkspace);
preDynamicStartupStages.mark("workspace");
await fs.mkdir(resolvedWorkspace, { recursive: true });
const sandboxSessionKey =
params.sandboxSessionKey?.trim() || params.sessionKey?.trim() || params.sessionId;
const contextSessionKey = params.sessionKey?.trim() || sandboxSessionKey;
@@ -1071,14 +1037,12 @@ export async function runCodexAppServerAttempt(
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
preDynamicStartupStages.mark("sandbox");
const effectiveWorkspace = sandbox?.enabled
? sandbox.workspaceAccess === "rw"
? resolvedWorkspace
: sandbox.workspaceDir
: resolvedWorkspace;
await ensureCodexWorkspaceDirOnce(effectiveWorkspace);
preDynamicStartupStages.mark("effective-workspace");
await fs.mkdir(effectiveWorkspace, { recursive: true });
const appServer = resolveCodexAppServerForOpenClawToolPolicy({
appServer: configuredAppServer,
pluginConfig,
@@ -1098,13 +1062,11 @@ export async function runCodexAppServerAttempt(
trustedToolPolicies: beforeToolCallPolicy.trustedToolPolicies,
});
}
preDynamicStartupStages.mark("app-server-policy");
let pluginAppServer: CodexAppServerRuntimeOptions = appServer;
const nativeHookRelayEvents = resolveCodexNativeHookRelayEvents({
configuredEvents: options.nativeHookRelay?.events,
appServer,
});
preDynamicStartupStages.mark("native-hook-relay");
const runAbortController = new AbortController();
const abortFromUpstream = () => {
@@ -1122,9 +1084,7 @@ export async function runCodexAppServerAttempt(
agentId: params.agentId,
});
const agentDir = params.agentDir ?? resolveAgentDir(params.config ?? {}, sessionAgentId);
preDynamicStartupStages.mark("session-agent");
let startupBinding = await readCodexAppServerBinding(params.sessionFile);
preDynamicStartupStages.mark("read-binding");
const startupBindingAuthProfileId = startupBinding?.authProfileId;
startupBinding = await rotateOversizedCodexAppServerStartupBinding({
binding: startupBinding,
@@ -1134,7 +1094,6 @@ export async function runCodexAppServerAttempt(
config: params.config,
contextEngineActive: isActiveHarnessContextEngine(params.contextEngine),
});
preDynamicStartupStages.mark("rotate-binding");
const startupAuthProfileCandidate =
params.runtimePlan?.auth.forwardedAuthProfileId ??
params.authProfileId ??
@@ -1151,7 +1110,6 @@ export async function runCodexAppServerAttempt(
agentDir,
config: params.config,
});
preDynamicStartupStages.mark("auth-profile");
const runtimeParams = {
...params,
sessionKey: contextSessionKey,
@@ -1175,13 +1133,11 @@ export async function runCodexAppServerAttempt(
: resolveCodexAppServerFallbackApiKeyCacheKey({
startOptions: appServer.start,
});
preDynamicStartupStages.mark("auth-cache");
const nodeExecBlocksNativeExecution = isCodexNativeExecutionBlockedByNodeExecHost(params, {
agentId: sessionAgentId,
runtimeSessionKey: sandboxSessionKey,
sandbox,
});
preDynamicStartupStages.mark("native-exec-policy");
const bundleMcpThreadConfig = await loadCodexBundleMcpThreadConfig({
workspaceDir: effectiveWorkspace,
cfg: params.config,
@@ -1189,14 +1145,12 @@ export async function runCodexAppServerAttempt(
disableTools: params.disableTools,
toolsAllow: nodeExecBlocksNativeExecution ? [] : params.toolsAllow,
});
preDynamicStartupStages.mark("bundle-mcp");
const sandboxExecServerEnabled = isCodexSandboxExecServerEnabled(pluginConfig);
const nativeToolSurfaceEnabled = shouldEnableCodexAppServerNativeToolSurface(params, sandbox, {
agentId: sessionAgentId,
runtimeSessionKey: sandboxSessionKey,
sandboxExecServerEnabled,
});
preDynamicStartupStages.mark("native-tool-surface");
for (const diagnostic of bundleMcpThreadConfig.diagnostics) {
embeddedAgentLog.warn(`bundle-mcp: ${diagnostic.pluginId}: ${diagnostic.message}`);
}
@@ -1211,23 +1165,6 @@ export async function runCodexAppServerAttempt(
});
}
const hookChannelId = resolveCodexAppServerHookChannelId(params, sandboxSessionKey);
preDynamicStartupStages.mark("context-engine-support");
const preDynamicSummary = preDynamicStartupStages.snapshot();
if (shouldWarnCodexDynamicToolBuildStageSummary(preDynamicSummary)) {
embeddedAgentLog.warn(
`codex app-server pre-dynamic startup timings runId=${params.runId} sessionId=${params.sessionId} totalMs=${preDynamicSummary.totalMs} stages=${formatCodexDynamicToolBuildStageSummary(preDynamicSummary)}`,
{
runId: params.runId,
sessionId: params.sessionId,
totalMs: preDynamicSummary.totalMs,
stages: preDynamicSummary.stages,
hasStartupBinding: Boolean(startupBinding?.threadId),
startupAuthProfileId: startupAuthProfileId ?? null,
bundleMcpDiagnosticCount: bundleMcpThreadConfig.diagnostics.length,
nativeToolSurfaceEnabled,
},
);
}
let yieldDetected = false;
const tools = await buildDynamicTools({
params,
@@ -1239,7 +1176,6 @@ export async function runCodexAppServerAttempt(
runAbortController,
sessionAgentId,
pluginConfig,
profilerEnabled,
onYieldDetected: () => {
yieldDetected = true;
},
@@ -1254,7 +1190,6 @@ export async function runCodexAppServerAttempt(
runAbortController,
sessionAgentId,
pluginConfig,
profilerEnabled,
forceHeartbeatTool: true,
ignoreRuntimePlan: true,
onYieldDetected: () => {
@@ -3918,9 +3853,6 @@ function createCodexNativeHookRelay(params: {
}),
signal: params.signal,
command: {
// Hook relay subprocesses are observational for most tool events; keep
// them lower priority so they do not compete with the active reply turn.
nice: 10,
timeoutMs: params.options?.gatewayTimeoutMs,
},
});
@@ -4090,7 +4022,6 @@ type DynamicToolBuildParams = {
runAbortController: AbortController;
sessionAgentId: string;
pluginConfig: CodexPluginConfig;
profilerEnabled?: boolean;
forceHeartbeatTool?: boolean;
ignoreRuntimePlan?: boolean;
onYieldDetected: () => void;
@@ -4120,91 +4051,16 @@ function resolveCodexAppServerHookChannelId(
}).channelId;
}
type CodexDynamicToolBuildStageTiming = {
name: string;
durationMs: number;
elapsedMs: number;
};
type CodexDynamicToolBuildStageSummary = {
totalMs: number;
stages: CodexDynamicToolBuildStageTiming[];
};
const CODEX_DYNAMIC_TOOL_BUILD_WARN_TOTAL_MS = 1_000;
const CODEX_DYNAMIC_TOOL_BUILD_WARN_STAGE_MS = 500;
function createCodexDynamicToolBuildStageTracker(options: { enabled?: boolean } = {}): {
mark: (name: string) => void;
snapshot: () => CodexDynamicToolBuildStageSummary;
} {
if (!options.enabled) {
return {
mark() {},
snapshot() {
return { totalMs: 0, stages: [] };
},
};
}
const startedAt = Date.now();
let previousAt = startedAt;
const stages: CodexDynamicToolBuildStageTiming[] = [];
const toMs = (value: number) => Math.max(0, Math.round(value));
return {
mark(name) {
const currentAt = Date.now();
stages.push({
name,
durationMs: toMs(currentAt - previousAt),
elapsedMs: toMs(currentAt - startedAt),
});
previousAt = currentAt;
},
snapshot() {
return {
totalMs: toMs(Date.now() - startedAt),
stages: stages.slice(),
};
},
};
}
function shouldWarnCodexDynamicToolBuildStageSummary(
summary: CodexDynamicToolBuildStageSummary,
): boolean {
return (
summary.totalMs >= CODEX_DYNAMIC_TOOL_BUILD_WARN_TOTAL_MS ||
summary.stages.some((stage) => stage.durationMs >= CODEX_DYNAMIC_TOOL_BUILD_WARN_STAGE_MS)
);
}
function formatCodexDynamicToolBuildStageSummary(
summary: CodexDynamicToolBuildStageSummary,
): string {
return summary.stages.length > 0
? summary.stages
.map((stage) => `${stage.name}:${stage.durationMs}ms@${stage.elapsedMs}ms`)
.join(",")
: "none";
}
async function buildDynamicTools(input: DynamicToolBuildParams) {
const { params } = input;
if (params.disableTools || !supportsModelTools(params.model)) {
return [];
}
// Dynamic tool construction is on the reply hot path, so per-stage
// Date.now/span bookkeeping runs only when the Codex profiler flag is set.
const toolBuildStages = createCodexDynamicToolBuildStageTracker({
enabled: input.profilerEnabled,
});
const modelHasVision = params.model.input?.includes("image") ?? false;
const agentDir = params.agentDir ?? resolveAgentDir(params.config ?? {}, input.sessionAgentId);
const createOpenClawCodingTools =
openClawCodingToolsFactoryForTests ??
(await import("openclaw/plugin-sdk/agent-harness")).createOpenClawCodingTools;
toolBuildStages.mark("load-agent-harness-tools");
const sessionKeys = resolveOpenClawCodingToolsSessionKeys(params, input.sandboxSessionKey);
const allTools = createOpenClawCodingTools({
agentId: input.sessionAgentId,
@@ -4274,11 +4130,7 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
data: { name: "sessions_yield", message },
});
},
recordToolPrepStage: (name) => {
toolBuildStages.mark(name);
},
});
toolBuildStages.mark("create-openclaw-coding-tools");
const codexFilteredTools = addNodeShellDynamicToolsIfNeeded(
addSandboxShellDynamicToolsIfAvailable(
isCodexMemoryFlushRun(params)
@@ -4290,16 +4142,13 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
allTools,
input,
);
toolBuildStages.mark("codex-filtering");
const visionFilteredTools = filterToolsForVisionInputs(codexFilteredTools, {
modelHasVision,
hasInboundImages: (params.images?.length ?? 0) > 0,
});
toolBuildStages.mark("vision-filtering");
const toolsAllow = includeForcedCodexDynamicToolAllow(params.toolsAllow, params);
const filteredTools = filterCodexDynamicToolsForAllowlist(visionFilteredTools, toolsAllow);
toolBuildStages.mark("allowlist-filter");
const normalizedTools = normalizeAgentRuntimeTools({
return normalizeAgentRuntimeTools({
runtimePlan: input.ignoreRuntimePlan ? undefined : params.runtimePlan,
tools: filteredTools,
provider: params.provider,
@@ -4310,30 +4159,6 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
modelApi: params.model.api,
model: params.model,
});
toolBuildStages.mark("runtime-normalization");
const summary = toolBuildStages.snapshot();
if (shouldWarnCodexDynamicToolBuildStageSummary(summary)) {
const phase = input.forceHeartbeatTool ? "registered-tools" : "runtime-tools";
embeddedAgentLog.warn(
`codex app-server dynamic tool build timings runId=${params.runId} sessionId=${params.sessionId} phase=${phase} totalMs=${summary.totalMs} stages=${formatCodexDynamicToolBuildStageSummary(summary)}`,
{
runId: params.runId,
sessionId: params.sessionId,
phase,
totalMs: summary.totalMs,
stages: summary.stages,
allToolCount: allTools.length,
codexFilteredToolCount: codexFilteredTools.length,
visionFilteredToolCount: visionFilteredTools.length,
filteredToolCount: filteredTools.length,
normalizedToolCount: normalizedTools.length,
forceHeartbeatTool: input.forceHeartbeatTool === true,
ignoreRuntimePlan: input.ignoreRuntimePlan === true,
nativeToolSurfaceEnabled: input.nativeToolSurfaceEnabled === true,
},
);
}
return normalizedTools;
}
function includeForcedCodexDynamicToolAllow(
@@ -6169,12 +5994,6 @@ export const testing = {
resetOpenClawCodingToolsFactoryForTests(): void {
openClawCodingToolsFactoryForTests = undefined;
},
async ensureCodexWorkspaceDirOnceForTests(workspaceDir: string): Promise<void> {
await ensureCodexWorkspaceDirOnce(workspaceDir);
},
resetEnsuredCodexWorkspaceDirsForTests(): void {
ensuredCodexWorkspaceDirs.clear();
},
flushPendingCodexNativeHookRelayUnregistersForTests,
clearPendingCodexNativeHookRelayUnregistersForTests,
resolveCodexNativeHookRelayUnregisterGraceMs,

View File

@@ -19,7 +19,6 @@ import {
mergeCodexThreadConfigs,
type CodexPluginThreadConfig,
} from "./plugin-thread-config.js";
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
import {
assertCodexThreadResumeResponse,
assertCodexThreadStartResponse,
@@ -85,113 +84,6 @@ const CODEX_LIGHTWEIGHT_CONTEXT_THREAD_CONFIG: JsonObject = {
project_doc_max_bytes: 0,
};
type CodexThreadLifecycleTimingSpan = {
name: string;
durationMs: number;
elapsedMs: number;
};
type CodexThreadLifecycleTimingSummary = {
totalMs: number;
spans: CodexThreadLifecycleTimingSpan[];
};
const CODEX_THREAD_LIFECYCLE_TIMING_WARN_TOTAL_MS = 1_000;
const CODEX_THREAD_LIFECYCLE_TIMING_WARN_STAGE_MS = 500;
function createCodexThreadLifecycleTimingTracker(options: { enabled?: boolean } = {}): {
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
measureSync: <T>(name: string, run: () => T) => T;
logIfSlow: (params: {
runId: string;
sessionId: string;
sessionKey?: string;
action: "started" | "resumed" | "rotated";
threadId?: string;
}) => void;
} {
if (!options.enabled) {
return {
async measure(_name, run) {
return await run();
},
measureSync(_name, run) {
return run();
},
logIfSlow() {},
};
}
const startedAt = Date.now();
let didLog = false;
const spans: CodexThreadLifecycleTimingSpan[] = [];
const toMs = (value: number) => Math.max(0, Math.round(value));
const record = (name: string, spanStartedAt: number) => {
spans.push({
name,
durationMs: toMs(Date.now() - spanStartedAt),
elapsedMs: toMs(Date.now() - startedAt),
});
};
const snapshot = (): CodexThreadLifecycleTimingSummary => ({
totalMs: toMs(Date.now() - startedAt),
spans: spans.slice(),
});
const shouldLog = (summary: CodexThreadLifecycleTimingSummary) =>
summary.totalMs >= CODEX_THREAD_LIFECYCLE_TIMING_WARN_TOTAL_MS ||
summary.spans.some((span) => span.durationMs >= CODEX_THREAD_LIFECYCLE_TIMING_WARN_STAGE_MS);
const formatSpans = (summary: CodexThreadLifecycleTimingSummary) =>
summary.spans.length > 0
? summary.spans
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
.join(",")
: "none";
return {
async measure(name, run) {
const spanStartedAt = Date.now();
try {
return await run();
} finally {
record(name, spanStartedAt);
}
},
measureSync(name, run) {
const spanStartedAt = Date.now();
try {
return run();
} finally {
record(name, spanStartedAt);
}
},
logIfSlow(params) {
if (didLog) {
return;
}
const summary = snapshot();
if (!shouldLog(summary)) {
return;
}
didLog = true;
embeddedAgentLog.warn(
`codex app-server thread lifecycle timings runId=${params.runId} sessionId=${
params.sessionId
} sessionKey=${params.sessionKey ?? "unknown"} action=${params.action} totalMs=${
summary.totalMs
} stages=${formatSpans(summary)}`,
{
runId: params.runId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
action: params.action,
threadId: params.threadId,
totalMs: summary.totalMs,
spans: summary.spans,
},
);
},
};
}
export async function startOrResumeThread(params: {
client: CodexAppServerClient;
params: EmbeddedRunAttemptParams;
@@ -211,16 +103,10 @@ export async function startOrResumeThread(params: {
pluginThreadConfig?: CodexPluginThreadConfigProvider;
contextEngineProjection?: CodexContextEngineThreadBootstrapProjection;
}): Promise<CodexAppServerThreadLifecycleBinding> {
// Thread lifecycle spans are useful when profiling startup churn, but normal
// turns should not pay Date.now/span-array overhead while resuming threads.
const lifecycleTiming = createCodexThreadLifecycleTimingTracker({
enabled: isCodexAppServerProfilerEnabled(params.params.config),
});
const dynamicToolsFingerprint = lifecycleTiming.measureSync("fingerprint_dynamic_tools", () =>
fingerprintDynamicTools(params.dynamicTools),
);
const contextEngineBinding = lifecycleTiming.measureSync("context_engine_binding", () =>
buildContextEngineBinding(params.params, params.contextEngineProjection),
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
const contextEngineBinding = buildContextEngineBinding(
params.params,
params.contextEngineProjection,
);
const userMcpServersConfigPatch =
params.userMcpServersEnabled === false
@@ -232,13 +118,11 @@ export async function startOrResumeThread(params: {
const environmentSelectionFingerprint = fingerprintEnvironmentSelection(
params.environmentSelection,
);
let binding = await lifecycleTiming.measure("read_binding", () =>
readCodexAppServerBinding(params.params.sessionFile, {
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
}),
);
let binding = await readCodexAppServerBinding(params.params.sessionFile, {
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
});
let preserveExistingBinding = false;
let rotatedContextEngineBinding = false;
let prebuiltPluginThreadConfig: CodexPluginThreadConfig | undefined;
@@ -323,9 +207,7 @@ export async function startOrResumeThread(params: {
})
) {
try {
prebuiltPluginThreadConfig = await lifecycleTiming.measure("plugin_config_recovery", () =>
params.pluginThreadConfig?.build(),
);
prebuiltPluginThreadConfig = await params.pluginThreadConfig?.build();
pluginBindingStale =
prebuiltPluginThreadConfig?.fingerprint !== binding.pluginAppsFingerprint;
} catch (error) {
@@ -392,21 +274,19 @@ export async function startOrResumeThread(params: {
userMcpServersConfigPatch,
params.finalConfigPatch,
);
const resumeParams = lifecycleTiming.measureSync("thread_resume_params", () =>
buildThreadResumeParams(params.params, {
threadId: binding.threadId,
authProfileId,
appServer: params.appServer,
dynamicTools: params.dynamicTools,
developerInstructions: params.developerInstructions,
config: resumeConfig,
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
}),
);
const response = assertCodexThreadResumeResponse(
await lifecycleTiming.measure("thread_resume_request", () =>
params.client.request("thread/resume", resumeParams),
await params.client.request(
"thread/resume",
buildThreadResumeParams(params.params, {
threadId: binding.threadId,
authProfileId,
appServer: params.appServer,
dynamicTools: params.dynamicTools,
developerInstructions: params.developerInstructions,
config: resumeConfig,
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
}),
),
);
const boundAuthProfileId = authProfileId;
@@ -421,31 +301,29 @@ export async function startOrResumeThread(params: {
params.mcpServersFingerprintEvaluated === true
? params.mcpServersFingerprint
: binding.mcpServersFingerprint;
await lifecycleTiming.measure("thread_resume_write_binding", () =>
writeCodexAppServerBinding(
params.params.sessionFile,
{
threadId: response.thread.id,
cwd: params.cwd,
authProfileId: boundAuthProfileId,
model: params.params.modelId,
modelProvider: response.modelProvider ?? fallbackModelProvider,
dynamicToolsFingerprint,
userMcpServersFingerprint,
mcpServersFingerprint: nextMcpServersFingerprint,
pluginAppsFingerprint: binding.pluginAppsFingerprint,
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
pluginAppPolicyContext: binding.pluginAppPolicyContext,
contextEngine: contextEngineBinding,
environmentSelectionFingerprint,
createdAt: binding.createdAt,
},
{
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
},
),
await writeCodexAppServerBinding(
params.params.sessionFile,
{
threadId: response.thread.id,
cwd: params.cwd,
authProfileId: boundAuthProfileId,
model: params.params.modelId,
modelProvider: response.modelProvider ?? fallbackModelProvider,
dynamicToolsFingerprint,
userMcpServersFingerprint,
mcpServersFingerprint: nextMcpServersFingerprint,
pluginAppsFingerprint: binding.pluginAppsFingerprint,
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
pluginAppPolicyContext: binding.pluginAppPolicyContext,
contextEngine: contextEngineBinding,
environmentSelectionFingerprint,
createdAt: binding.createdAt,
},
{
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
},
);
if (contextEngineBinding) {
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
@@ -458,13 +336,6 @@ export async function startOrResumeThread(params: {
action: "resumed",
});
}
lifecycleTiming.logIfSlow({
runId: params.params.runId,
sessionId: params.params.sessionId,
sessionKey: params.params.sessionKey,
threadId: response.thread.id,
action: "resumed",
});
return {
...binding,
threadId: response.thread.id,
@@ -495,34 +366,27 @@ export async function startOrResumeThread(params: {
}
const pluginThreadConfig = params.pluginThreadConfig?.enabled
? (prebuiltPluginThreadConfig ??
(await lifecycleTiming.measure("plugin_config_build", () =>
params.pluginThreadConfig?.build(),
)))
? (prebuiltPluginThreadConfig ?? (await params.pluginThreadConfig.build()))
: undefined;
const config = lifecycleTiming.measureSync("merge_thread_config", () =>
mergeCodexThreadConfigs(
params.config,
userMcpServersConfigPatch,
pluginThreadConfig?.configPatch,
params.finalConfigPatch,
),
);
const startParams = lifecycleTiming.measureSync("thread_start_params", () =>
buildThreadStartParams(params.params, {
cwd: params.cwd,
dynamicTools: params.dynamicTools,
appServer: params.appServer,
developerInstructions: params.developerInstructions,
config,
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
environmentSelection: params.environmentSelection,
}),
const config = mergeCodexThreadConfigs(
params.config,
userMcpServersConfigPatch,
pluginThreadConfig?.configPatch,
params.finalConfigPatch,
);
const response = assertCodexThreadStartResponse(
await lifecycleTiming.measure("thread_start_request", () =>
params.client.request("thread/start", startParams),
await params.client.request(
"thread/start",
buildThreadStartParams(params.params, {
cwd: params.cwd,
dynamicTools: params.dynamicTools,
appServer: params.appServer,
developerInstructions: params.developerInstructions,
config,
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
environmentSelection: params.environmentSelection,
}),
),
);
const modelProvider = resolveCodexAppServerModelProvider({
@@ -536,31 +400,29 @@ export async function startOrResumeThread(params: {
const nextMcpServersFingerprint =
params.mcpServersFingerprintEvaluated === true ? params.mcpServersFingerprint : undefined;
if (!preserveExistingBinding) {
await lifecycleTiming.measure("thread_start_write_binding", () =>
writeCodexAppServerBinding(
params.params.sessionFile,
{
threadId: response.thread.id,
cwd: params.cwd,
authProfileId: params.params.authProfileId,
model: response.model ?? params.params.modelId,
modelProvider: response.modelProvider ?? modelProvider,
dynamicToolsFingerprint,
userMcpServersFingerprint,
mcpServersFingerprint: nextMcpServersFingerprint,
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
contextEngine: contextEngineBinding,
environmentSelectionFingerprint,
createdAt,
},
{
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
},
),
await writeCodexAppServerBinding(
params.params.sessionFile,
{
threadId: response.thread.id,
cwd: params.cwd,
authProfileId: params.params.authProfileId,
model: response.model ?? params.params.modelId,
modelProvider: response.modelProvider ?? modelProvider,
dynamicToolsFingerprint,
userMcpServersFingerprint,
mcpServersFingerprint: nextMcpServersFingerprint,
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
contextEngine: contextEngineBinding,
environmentSelectionFingerprint,
createdAt,
},
{
authProfileStore: params.params.authProfileStore,
agentDir: params.params.agentDir,
config: params.params.config,
},
);
if (contextEngineBinding) {
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
@@ -574,13 +436,6 @@ export async function startOrResumeThread(params: {
});
}
}
lifecycleTiming.logIfSlow({
runId: params.params.runId,
sessionId: params.params.sessionId,
sessionKey: params.params.sessionKey,
threadId: response.thread.id,
action: rotatedContextEngineBinding ? "rotated" : "started",
});
return {
schemaVersion: 1,
threadId: response.thread.id,

View File

@@ -1,4 +1,6 @@
import { spawnSync } from "node:child_process";
import { spawn } from "node:child_process";
const DEFAULT_TIMEOUT_KILL_GRACE_MS = 30_000;
const usage = () => {
console.error("Usage: assertions.mjs <run-with-timeout|assert-image-providers> [...]");
@@ -7,16 +9,72 @@ const usage = () => {
const [mode, ...args] = process.argv.slice(2);
if (mode === "run-with-timeout") {
const [timeoutMs, command, ...commandArgs] = args;
const timeout = Number(timeoutMs);
if (!Number.isFinite(timeout) || timeout <= 0 || !command) {
usage();
const parsePositiveNumber = (value, label) => {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0) {
throw new Error(`${label} must be a positive number`);
}
return parsed;
};
const result = spawnSync(command, commandArgs, { encoding: "utf8", env: process.env, timeout });
process.stdout.write(result.stdout ?? "");
process.stderr.write(result.stderr ?? "");
const signalChild = (child, signal) => {
if (!child.pid) {
return;
}
try {
if (process.platform === "win32") {
child.kill(signal);
return;
}
process.kill(-child.pid, signal);
} catch (error) {
if (error?.code !== "ESRCH") {
throw error;
}
}
};
const runWithTimeout = async (timeout, command, commandArgs) => {
const killGrace = parsePositiveNumber(
process.env.OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS ??
String(DEFAULT_TIMEOUT_KILL_GRACE_MS),
"OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS",
);
const child = spawn(command, commandArgs, {
detached: process.platform !== "win32",
env: process.env,
stdio: ["ignore", "pipe", "pipe"],
});
let timedOut = false;
let killTimer;
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (chunk) => process.stdout.write(chunk));
child.stderr.on("data", (chunk) => process.stderr.write(chunk));
const timeoutTimer = setTimeout(() => {
timedOut = true;
signalChild(child, "SIGTERM");
killTimer = setTimeout(() => signalChild(child, "SIGKILL"), killGrace);
killTimer.unref();
}, timeout);
timeoutTimer.unref();
let spawnError;
child.on("error", (error) => {
spawnError = error;
});
const result = await new Promise((resolve) => {
child.on("close", (status, signal) => resolve({ error: spawnError, signal, status }));
});
clearTimeout(timeoutTimer);
clearTimeout(killTimer);
if (timedOut) {
console.error(`command timed out after ${timeout}ms: ${command}`);
process.exit(1);
}
if (result.error) {
console.error(`command failed: ${command}: ${result.error.message}`);
process.exit(1);
@@ -26,6 +84,20 @@ if (mode === "run-with-timeout") {
process.exit(1);
}
process.exit(result.status ?? 0);
};
if (mode === "run-with-timeout") {
const [timeoutMs, command, ...commandArgs] = args;
if (!command) {
usage();
}
let timeout;
try {
timeout = parsePositiveNumber(timeoutMs, "timeoutMs");
} catch {
usage();
}
await runWithTimeout(timeout, command, commandArgs);
}
if (mode === "assert-image-providers") {

View File

@@ -115,7 +115,7 @@ describe("native hook relay registry", () => {
);
});
it("preserves permission relays while marking hook-only events without handlers inactive", () => {
it("preserves safety relays while marking hook-only events without handlers inactive", () => {
const relay = registerNativeHookRelay({
provider: "codex",
sessionId: "session-1",
@@ -127,66 +127,12 @@ describe("native hook relay registry", () => {
},
});
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
expect(relay.shouldRelayEvent("post_tool_use")).toBe(false);
expect(relay.shouldRelayEvent("before_agent_finalize")).toBe(false);
expect(relay.shouldRelayEvent("permission_request")).toBe(true);
});
it("builds pre-tool relay commands only when before-tool policy is active", () => {
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "before_tool_call", handler: vi.fn() }]),
);
const relay = registerNativeHookRelay({
provider: "codex",
sessionId: "session-1",
runId: "run-1",
command: {
executable: "/opt/Open Claw/openclaw.mjs",
nodeExecutable: "/usr/local/bin/node",
timeoutMs: 1234,
},
});
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
expect(relay.commandForEvent("pre_tool_use")).toBe(
"/usr/local/bin/node '/opt/Open Claw/openclaw.mjs' hooks relay --provider codex --relay-id " +
`${relay.relayId} --event pre_tool_use --timeout 1234`,
);
});
it("keeps pre-tool relays active when native loop detection is not disabled", () => {
const relay = registerNativeHookRelay({
provider: "codex",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
command: {
executable: "/opt/Open Claw/openclaw.mjs",
nodeExecutable: "/usr/local/bin/node",
timeoutMs: 1234,
},
});
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
expect(relay.commandForEvent("pre_tool_use")).toBe(
"/usr/local/bin/node '/opt/Open Claw/openclaw.mjs' hooks relay --provider codex --relay-id " +
`${relay.relayId} --event pre_tool_use --timeout 1234`,
);
});
it("omits pre-tool relays when native loop detection is explicitly disabled", () => {
const relay = registerNativeHookRelay({
provider: "codex",
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
config: { tools: { loopDetection: { enabled: false } } } as never,
});
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
});
it("builds relay commands only for native events with matching local hooks", () => {
initializeGlobalHookRunner(
createMockPluginRegistry([{ hookName: "after_tool_call", handler: vi.fn() }]),
@@ -202,7 +148,7 @@ describe("native hook relay registry", () => {
},
});
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
expect(relay.shouldRelayEvent("post_tool_use")).toBe(true);
expect(relay.shouldRelayEvent("before_agent_finalize")).toBe(false);
expect(relay.commandForEvent("post_tool_use")).toBe(
@@ -2338,19 +2284,4 @@ describe("native hook relay command builder", () => {
"openclaw hooks relay --provider codex --relay-id relay-1 --event permission_request --timeout 5000",
);
});
it("can lower native hook relay process priority", () => {
const prefix = process.platform === "win32" ? "" : "nice -n 10 ";
expect(
buildNativeHookRelayCommand({
provider: "codex",
relayId: "relay-1",
event: "pre_tool_use",
executable: "openclaw",
nice: 10,
}),
).toBe(
`${prefix}openclaw hooks relay --provider codex --relay-id relay-1 --event pre_tool_use --timeout 5000`,
);
});
});

View File

@@ -17,9 +17,8 @@ import { hasGlobalHooks } from "../../plugins/hook-runner-global.js";
import { PluginApprovalResolutions } from "../../plugins/types.js";
import { uniqueValues } from "../../shared/string-normalization.js";
import { asBoolean } from "../../utils/boolean.js";
import { hasBeforeToolCallPolicy, runBeforeToolCallHook } from "../pi-tools.before-tool-call.js";
import { runBeforeToolCallHook } from "../pi-tools.before-tool-call.js";
import { stableStringify } from "../stable-stringify.js";
import { resolveToolLoopDetectionConfig } from "../tool-loop-detection-config.js";
import { normalizeToolName } from "../tool-policy.js";
import { callGatewayTool } from "../tools/gateway.js";
import { runAgentHarnessAfterToolCallHook } from "./hook-helpers.js";
@@ -111,7 +110,6 @@ export type RegisterNativeHookRelayParams = {
export type NativeHookRelayCommandOptions = {
executable?: string;
nice?: number | false;
nodeExecutable?: string;
timeoutMs?: number;
};
@@ -326,13 +324,12 @@ export function registerNativeHookRelay(
registerNativeHookRelayBridge(registration);
const handle: NativeHookRelayRegistrationHandle = {
...registration,
shouldRelayEvent: (event) => nativeHookRelayEventHasLocalWork(registration, event),
shouldRelayEvent: nativeHookRelayEventHasLocalWork,
commandForEvent: (event) =>
buildNativeHookRelayCommand({
provider: params.provider,
relayId,
event,
nice: params.command?.nice,
timeoutMs: params.command?.timeoutMs,
executable: params.command?.executable,
nodeExecutable: params.command?.nodeExecutable,
@@ -379,24 +376,12 @@ function normalizeRelayId(value: string | undefined): string | undefined {
return trimmed;
}
function resolveNativeHookRelayNicePrefix(value: number | false | undefined): string[] {
if (process.platform === "win32" || value === false || value === undefined) {
return [];
}
const nice = normalizePositiveInteger(value, 0);
if (nice <= 0) {
return [];
}
return ["nice", "-n", String(nice)];
}
export function buildNativeHookRelayCommand(params: {
provider: NativeHookRelayProvider;
relayId: string;
event: NativeHookRelayEvent;
timeoutMs?: number;
executable?: string;
nice?: number | false;
nodeExecutable?: string;
}): string {
const timeoutMs = normalizePositiveInteger(params.timeoutMs, DEFAULT_RELAY_TIMEOUT_MS);
@@ -405,9 +390,7 @@ export function buildNativeHookRelayCommand(params: {
executable === "openclaw"
? ["openclaw"]
: [params.nodeExecutable ?? process.execPath, executable];
const nicePrefix = resolveNativeHookRelayNicePrefix(params.nice);
return shellQuoteArgs([
...nicePrefix,
...argv,
"hooks",
"relay",
@@ -422,25 +405,9 @@ export function buildNativeHookRelayCommand(params: {
]);
}
function nativePreToolUseMayRunLoopDetection(registration: NativeHookRelayRegistration): boolean {
if (!registration.sessionKey) {
return false;
}
const loopDetection = resolveToolLoopDetectionConfig({
cfg: registration.config,
agentId: registration.agentId,
});
return loopDetection?.enabled !== false;
}
function nativeHookRelayEventHasLocalWork(
registration: NativeHookRelayRegistration,
event: NativeHookRelayEvent,
): boolean {
function nativeHookRelayEventHasLocalWork(event: NativeHookRelayEvent): boolean {
if (event === "pre_tool_use") {
// Avoid spawning a native hook relay for every Codex tool call when there
// is no before_tool_call hook, trusted-tool policy, or loop detector work.
return hasBeforeToolCallPolicy() || nativePreToolUseMayRunLoopDetection(registration);
return true;
}
if (event === "post_tool_use") {
return hasGlobalHooks("after_tool_call");

View File

@@ -20,7 +20,7 @@ import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
const LIVE = isLiveTestEnabled();
const REQUIRE_PROFILE_KEYS = isLiveProfileKeyModeEnabled();
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.1-codex-mini";
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.4-mini";
const TARGET_MODEL_REF =
process.env.OPENCLAW_LIVE_OPENAI_REASONING_COMPAT_MODEL?.trim() || DEFAULT_TARGET_MODEL_REF;
const describeLive = LIVE ? describe : describe.skip;

File diff suppressed because it is too large Load Diff

View File

@@ -240,7 +240,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("waits for direct block dispatcher delivery before resolving block delivery", async () => {
it("does not wait for direct block dispatcher delivery before resolving block delivery", async () => {
const delivered: unknown[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
@@ -276,13 +276,68 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
});
await deliveryStarted;
await Promise.resolve();
expect(delivered).toEqual([{ text: "hello" }]);
expect(deliverySettled).toBe(false);
expect(deliverySettled).toBe(true);
releaseDelivery?.();
await expect(deliveryPromise).resolves.toBe(true);
expect(deliverySettled).toBe(true);
await dispatcher.waitForIdle();
});
it("waits for pending direct block delivery before resolving tool delivery", async () => {
const delivered: unknown[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "visiblechat",
Surface: "visiblechat",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
shouldRouteToOriginating: false,
});
await expect(coordinator.deliver("block", { text: "hello" }, { skipTts: true })).resolves.toBe(
true,
);
await deliveryStarted;
let toolDeliverySettled = false;
const toolDeliveryPromise = coordinator
.deliver("tool", { text: "tool result" }, { skipTts: true })
.then((result) => {
toolDeliverySettled = true;
return result;
});
await Promise.resolve();
expect(delivered).toEqual([{ text: "hello" }]);
expect(toolDeliverySettled).toBe(false);
releaseDelivery?.();
await expect(toolDeliveryPromise).resolves.toBe(true);
expect(toolDeliverySettled).toBe(true);
expect(delivered).toEqual([{ text: "hello" }, { text: "tool result" }]);
});
it("stops waiting for direct block delivery when the ACP dispatch aborts", async () => {

View File

@@ -234,11 +234,23 @@ export function createAcpDispatchDeliveryCoordinator(params: {
},
toolMessageByCallId: new Map(),
};
let hasPendingDirectBlockReplyDelivery = false;
const waitForPendingDirectBlockReplyDelivery = async () => {
if (!hasPendingDirectBlockReplyDelivery) {
return;
}
// ACP direct block replies should not block the common visible-reply path.
// Defer the idle wait until a later tool delivery would otherwise overtake
// that block reply in user-visible ordering.
hasPendingDirectBlockReplyDelivery = false;
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
};
const settleDirectVisibleText = async () => {
if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) {
return;
}
state.settledDirectVisibleText = true;
hasPendingDirectBlockReplyDelivery = false;
await params.dispatcher.waitForIdle();
const failedCounts = params.dispatcher.getFailedCounts();
const failedVisibleCount = failedCounts.block + failedCounts.final;
@@ -440,6 +452,10 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return true;
}
if (kind === "tool") {
await waitForPendingDirectBlockReplyDelivery();
}
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
channel: directChannel,
kind,
@@ -462,7 +478,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
state.failedVisibleTextDelivery = true;
}
if (kind === "block" && delivered) {
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
hasPendingDirectBlockReplyDelivery = true;
}
return delivered;
};

View File

@@ -4738,6 +4738,78 @@ describe("dispatchReplyFromConfig", () => {
expect(replyResolver).not.toHaveBeenCalled();
});
it("does not run plugin-owned binding delivery when the caller already aborted", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "handled",
result: { handled: true, reply: { text: "should not send" } },
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-aborted-1",
targetSessionKey: "plugin-binding:codex:abc123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:1481858418548412579",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginRoot: "/workspace/openclaw-app-server",
data: {
kind: "codex-app-server-session",
version: 1,
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/workspace/openclaw",
},
},
} satisfies SessionBindingRecord);
const abortController = new AbortController();
abortController.abort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:1481858418548412579",
To: "discord:channel:1481858418548412579",
AccountId: "default",
SenderId: "user-9",
SenderUsername: "ada",
CommandAuthorized: true,
WasMentioned: false,
CommandBody: "who are you",
RawBody: "who are you",
Body: "who are you",
MessageSid: "msg-claim-plugin-aborted-1",
SessionKey: "agent:main:discord:channel:1481858418548412579",
});
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
const result = await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: { abortSignal: abortController.signal },
replyResolver,
});
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(sessionBindingMocks.touch).not.toHaveBeenCalled();
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
expect(replyResolver).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("lets authorized plugin-owned binding commands fall through to command processing", async () => {
setNoAbort();
expect(
@@ -5706,7 +5778,7 @@ describe("dispatchReplyFromConfig", () => {
expect(callOrder).toEqual(["queued:The answer is 42", "dispatch:The answer is 42"]);
});
it("waits for same-channel block dispatcher delivery before resolving block replies", async () => {
it("does not wait for same-channel block dispatcher delivery before resolving block replies", async () => {
setNoAbort();
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
@@ -5739,10 +5811,10 @@ describe("dispatchReplyFromConfig", () => {
await deliveryStarted;
expect(delivered).toEqual([{ text: "before tool" }]);
expect(blockReplySettled).toBe(false);
await blockReplyPromise;
expect(blockReplySettled).toBe(true);
releaseDelivery?.();
await blockReplyPromise;
return undefined;
};
@@ -5754,6 +5826,177 @@ describe("dispatchReplyFromConfig", () => {
});
expect(blockReplySettled).toBe(true);
await dispatcher.waitForIdle();
});
it("waits for pending same-channel block delivery before completing block-only dispatch", async () => {
setNoAbort();
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "only block" });
return undefined;
};
let dispatchSettled = false;
const dispatchPromise = dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
}).then((result) => {
dispatchSettled = true;
return result;
});
await deliveryStarted;
expect(delivered).toEqual([{ text: "only block" }]);
expect(dispatchSettled).toBe(false);
releaseDelivery?.();
await dispatchPromise;
expect(dispatchSettled).toBe(true);
});
it("waits for pending same-channel block delivery before forwarding tool progress", async () => {
setNoAbort();
const cfg = {
agents: { defaults: { verboseDefault: "on" } },
} as const satisfies OpenClawConfig;
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const onToolStart = vi.fn();
let toolProgressSettled = false;
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "before tool" });
const toolProgressPromise = Promise.resolve(opts?.onToolStart?.({ name: "lookup" })).then(
() => {
toolProgressSettled = true;
},
);
await deliveryStarted;
expect(delivered).toEqual([{ text: "before tool" }]);
expect(onToolStart).not.toHaveBeenCalled();
expect(toolProgressSettled).toBe(false);
releaseDelivery?.();
await toolProgressPromise;
return undefined;
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyResolver,
replyOptions: { onToolStart },
});
expect(toolProgressSettled).toBe(true);
expect(onToolStart).toHaveBeenCalledWith({ name: "lookup" });
});
it("does not synthesize tool-start capability while ordering item progress", async () => {
setNoAbort();
const cfg = {
agents: { defaults: { verboseDefault: "on" } },
} as const satisfies OpenClawConfig;
const ctx = buildTestCtx({ Provider: "whatsapp" });
const delivered: ReplyPayload[] = [];
let releaseDelivery: (() => void) | undefined;
let markDeliveryStarted: (() => void) | undefined;
const deliveryStarted = new Promise<void>((resolve) => {
markDeliveryStarted = resolve;
});
const deliveryGate = new Promise<void>((resolve) => {
releaseDelivery = resolve;
});
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
delivered.push(payload);
markDeliveryStarted?.();
await deliveryGate;
},
});
const onItemEvent = vi.fn();
let itemProgressSettled = false;
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload | undefined> => {
await opts?.onBlockReply?.({ text: "before item" });
expect(opts?.onToolStart).toBeUndefined();
const itemProgressPromise = Promise.resolve(
opts?.onItemEvent?.({ itemId: "1", kind: "tool", progressText: "running" }),
).then(() => {
itemProgressSettled = true;
});
await deliveryStarted;
expect(delivered).toEqual([{ text: "before item" }]);
expect(onItemEvent).not.toHaveBeenCalled();
expect(itemProgressSettled).toBe(false);
releaseDelivery?.();
await itemProgressPromise;
return undefined;
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyResolver,
replyOptions: { onItemEvent },
});
expect(itemProgressSettled).toBe(true);
expect(onItemEvent).toHaveBeenCalledWith({
itemId: "1",
kind: "tool",
progressText: "running",
});
});
it("forwards payload metadata into onBlockReplyQueued context", async () => {

View File

@@ -61,6 +61,7 @@ import {
markDiagnosticSessionProgress,
} from "../../logging/diagnostic.js";
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { matchPluginCommand } from "../../plugins/commands.js";
import {
buildPluginBindingDeclinedText,
@@ -131,6 +132,7 @@ import { resolveOriginMessageProvider } from "./origin-routing.js";
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
import type { ReplyDispatcher } from "./reply-dispatcher.types.js";
import { replyRunRegistry, type ReplyOperation } from "./reply-run-registry.js";
import { isReplyProfilerEnabled } from "./reply-timing-tracker.js";
import { admitReplyTurn, resolveReplyTurnKind } from "./reply-turn-admission.js";
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
import { resolveReplyRoutingDecision } from "./routing-policy.js";
@@ -783,6 +785,102 @@ function createAbortAwareDispatcher(params: {
return dispatcher;
}
type ReplyHotPathTimingSpan = {
name: string;
durationMs: number;
elapsedMs: number;
};
type ReplyHotPathTimingSummary = {
totalMs: number;
spans: ReplyHotPathTimingSpan[];
};
const replyHotPathTimingLog = createSubsystemLogger("auto-reply/reply-timing");
const REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS = 1_000;
const REPLY_HOT_PATH_TIMING_WARN_STAGE_MS = 500;
function createReplyHotPathTimingTracker(options: { profilerEnabled?: boolean } = {}): {
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
logIfSlow: (params: {
channel: string;
messageId?: number | string;
sessionKey?: string;
outcome: "completed" | "skipped" | "error";
reason?: string;
}) => void;
} {
if (!options.profilerEnabled) {
// This slow-path splitter was added for latency investigation. Keep it
// inert in normal production dispatches so only explicit profiler runs pay
// the Date.now/span allocation cost.
return {
async measure(_name, run) {
return await run();
},
logIfSlow() {},
};
}
const startedAt = Date.now();
let didLog = false;
const spans: ReplyHotPathTimingSpan[] = [];
const toMs = (value: number) => Math.max(0, Math.round(value));
const snapshot = (): ReplyHotPathTimingSummary => ({
totalMs: toMs(Date.now() - startedAt),
spans: spans.slice(),
});
const shouldLog = (summary: ReplyHotPathTimingSummary) =>
summary.totalMs >= REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS ||
summary.spans.some((span) => span.durationMs >= REPLY_HOT_PATH_TIMING_WARN_STAGE_MS);
const formatSpans = (summary: ReplyHotPathTimingSummary) =>
summary.spans.length > 0
? summary.spans
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
.join(",")
: "none";
return {
async measure(name, run) {
const spanStartedAt = Date.now();
try {
return await run();
} finally {
spans.push({
name,
durationMs: toMs(Date.now() - spanStartedAt),
elapsedMs: toMs(Date.now() - startedAt),
});
}
},
logIfSlow(params) {
if (didLog) {
return;
}
const summary = snapshot();
if (!shouldLog(summary)) {
return;
}
didLog = true;
replyHotPathTimingLog.warn(
`reply hot path timings channel=${params.channel} messageId=${
params.messageId ?? "unknown"
} sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} totalMs=${
summary.totalMs
} stages=${formatSpans(summary)}${params.reason ? ` reason=${params.reason}` : ""}`,
{
channel: params.channel,
messageId: params.messageId,
sessionKey: params.sessionKey,
outcome: params.outcome,
reason: params.reason,
totalMs: summary.totalMs,
spans: summary.spans,
},
);
},
};
}
export type {
DispatchFromConfigParams,
DispatchFromConfigResult,
@@ -822,12 +920,17 @@ export async function dispatchReplyFromConfig(
hasSessionKey: Boolean(sessionKey),
hasRunId: typeof params.replyOptions?.runId === "string",
};
const replyHotPathTiming = createReplyHotPathTimingTracker({
profilerEnabled: isReplyProfilerEnabled({ config: cfg }),
});
const traceReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
});
replyHotPathTiming.measure(name, () =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
}),
);
let agentDispatchStartedAt = 0;
const recordProcessed = (
@@ -837,6 +940,15 @@ export async function dispatchReplyFromConfig(
error?: string;
},
) => {
if (diagnosticsEnabled) {
replyHotPathTiming.logIfSlow({
channel,
messageId,
sessionKey,
outcome,
reason: opts?.reason,
});
}
messageLifecycle.markProcessed(outcome, opts);
};
@@ -1432,6 +1544,16 @@ export async function dispatchReplyFromConfig(
counts: dispatcher.getQueuedCounts(),
});
};
const finishReplyOperationAbortedDispatch = (): DispatchFromConfigResult => {
commitInboundDedupeIfClaimed();
recordProcessed("completed", { reason: "reply_operation_aborted" });
markIdle("message_completed");
completeDispatchReplyOperation();
return attachSourceReplyDeliveryMode({
queuedFinal: false,
counts: dispatcher.getQueuedCounts(),
});
};
let pluginFallbackReason:
| "plugin-bound-fallback-missing-plugin"
@@ -1439,6 +1561,9 @@ export async function dispatchReplyFromConfig(
| undefined;
if (pluginOwnedBinding) {
if (isPreDispatchOperationAborted()) {
return finishReplyOperationAbortedDispatch();
}
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
if (shouldBypassPluginOwnedBindingForCommand(ctx)) {
logVerbose(
@@ -2012,6 +2137,17 @@ export async function dispatchReplyFromConfig(
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
const allowSuppressedSourceProgressCallbacks =
params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true;
let hasPendingDirectBlockReplyDelivery = false;
const waitForPendingDirectBlockReplyDelivery = async (abortSignal?: AbortSignal) => {
if (!hasPendingDirectBlockReplyDelivery) {
return;
}
// Direct block replies are queued asynchronously so lightweight replies do
// not wait for dispatcher idle. Flush only before later tool/progress
// callbacks and final completion where external ordering is visible.
hasPendingDirectBlockReplyDelivery = false;
await waitForReplyDispatcherIdle(dispatcher, abortSignal);
};
const shouldForwardProgressCallback = (options?: {
forwardWhenSourceDeliverySuppressed?: boolean;
requiresToolSummaryVisibility?: boolean;
@@ -2031,9 +2167,10 @@ export async function dispatchReplyFromConfig(
forwardWhenSourceDeliverySuppressed?: boolean;
requiresToolSummaryVisibility?: boolean;
onForward?: (...args: Args) => void;
waitForDirectBlockReplyDelivery?: boolean;
},
): ((...args: Args) => Promise<void>) | undefined => {
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
if (!callback) {
return undefined;
}
return async (...args: Args) => {
@@ -2041,6 +2178,12 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
if (options?.waitForDirectBlockReplyDelivery) {
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
}
if (shouldForwardProgressCallback(options)) {
options?.onForward?.(...args);
await callback?.(...args);
@@ -2077,10 +2220,12 @@ export async function dispatchReplyFromConfig(
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
onForward: (payload) => {
if (hasFailedProgressStatus(payload)) {
markVisibleToolErrorProgress();
@@ -2090,6 +2235,7 @@ export async function dispatchReplyFromConfig(
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
onForward: (payload) => {
if (hasFailedProgressStatus(payload)) {
markVisibleToolErrorProgress();
@@ -2099,14 +2245,20 @@ export async function dispatchReplyFromConfig(
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, {
forwardWhenSourceDeliverySuppressed: true,
requiresToolSummaryVisibility: true,
waitForDirectBlockReplyDelivery: true,
}),
onToolResult: (payload: ReplyPayload) => {
markProgress();
const run = async () => {
if (isDispatchOperationAborted()) {
return;
}
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
@@ -2171,6 +2323,10 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2193,6 +2349,10 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2223,6 +2383,10 @@ export async function dispatchReplyFromConfig(
return;
}
markProgress();
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
if (isDispatchOperationAborted()) {
return;
}
markInboundDedupeReplayUnsafe();
if (
shouldForwardProgressCallback({
@@ -2329,7 +2493,7 @@ export async function dispatchReplyFromConfig(
markInboundDedupeReplayUnsafe();
const delivered = dispatcher.sendBlockReply(normalizedPayload);
if (delivered) {
await waitForReplyDispatcherIdle(dispatcher, context?.abortSignal);
hasPendingDirectBlockReplyDelivery = true;
}
}
};
@@ -2461,6 +2625,7 @@ export async function dispatchReplyFromConfig(
accumulatedBlockTtsText.trim()
) {
try {
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
throwIfDispatchOperationAborted();
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockTtsText },
@@ -2520,6 +2685,7 @@ export async function dispatchReplyFromConfig(
}
}
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
commitInboundDedupeIfClaimed();
@@ -2537,14 +2703,7 @@ export async function dispatchReplyFromConfig(
});
} catch (err) {
if (isDispatchReplyOperationAbortedError(err)) {
commitInboundDedupeIfClaimed();
recordProcessed("completed", { reason: "reply_operation_aborted" });
markIdle("message_completed");
completeDispatchReplyOperation();
return attachSourceReplyDeliveryMode({
queuedFinal: false,
counts: dispatcher.getQueuedCounts(),
});
return finishReplyOperationAbortedDispatch();
}
if (inboundDedupeClaim.status === "claimed") {
if (inboundDedupeReplayUnsafe) {

View File

@@ -15,6 +15,7 @@ import { type OpenClawConfig, getRuntimeConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { measureDiagnosticsTimelineSpan } from "../../infra/diagnostics-timeline.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
import { defaultRuntime } from "../../runtime.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
@@ -47,6 +48,7 @@ import { hasInboundMedia } from "./inbound-media.js";
import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js";
import { createFastTestModelSelectionState, createModelSelectionState } from "./model-selection.js";
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
import { createReplyTimingTracker } from "./reply-timing-tracker.js";
import { initSessionState } from "./session.js";
import {
isStaleHeartbeatAutoFallbackOverride,
@@ -89,6 +91,8 @@ const mediaUnderstandingApplyRuntimeLoader = createLazyImportLoader(
const linkUnderstandingApplyRuntimeLoader = createLazyImportLoader(
() => import("../../link-understanding/apply.runtime.js"),
);
const replyResolverTimingLog = createSubsystemLogger("auto-reply/reply-resolver-timing");
const commandsCoreRuntimeLoader = createLazyImportLoader(
() => import("./commands-core.runtime.js"),
);
@@ -214,45 +218,85 @@ export async function getReplyFromConfig(
isFastTestEnv,
configOverride,
});
const useFastTestBootstrap = shouldUseReplyFastTestBootstrap({
isFastTestEnv,
configOverride,
});
const useFastTestRuntime = shouldUseReplyFastTestRuntime({
cfg,
isFastTestEnv,
});
const finalized = finalizeInboundContext(ctx);
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
const agentSessionKey = targetSessionKey || finalized.SessionKey;
const traceAttributes = {
// Profiler spans stay inert unless diagnostics enable `profiler` or
// `reply.profiler`, so normal replies do not pay per-stage Date.now/array
// bookkeeping while we can still split resolver costs on demand.
const resolverTiming = createReplyTimingTracker({ log: replyResolverTimingLog, config: cfg });
const useFastTestBootstrap = resolverTiming.measureSync("reply.resolve_fast_test_bootstrap", () =>
shouldUseReplyFastTestBootstrap({
isFastTestEnv,
configOverride,
}),
);
const useFastTestRuntime = resolverTiming.measureSync("reply.resolve_fast_test_runtime", () =>
shouldUseReplyFastTestRuntime({
cfg,
isFastTestEnv,
}),
);
const finalized = resolverTiming.measureSync("reply.finalize_context", () =>
finalizeInboundContext(ctx),
);
const { agentSessionKey, agentId } = resolverTiming.measureSync(
"reply.resolve_agent_scope",
() => {
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
const resolvedAgentSessionKey = targetSessionKey || finalized.SessionKey;
return {
agentSessionKey: resolvedAgentSessionKey,
agentId: resolveSessionAgentId({
sessionKey: resolvedAgentSessionKey,
config: cfg,
}),
};
},
);
const traceAttributes = resolverTiming.measureSync("reply.resolve_trace_context", () => ({
surface: normalizeOptionalString(finalized.Surface ?? finalized.Provider) ?? "unknown",
hasSessionKey: Boolean(agentSessionKey),
isHeartbeat: opts?.isHeartbeat === true,
hasMedia: hasInboundMedia(finalized),
};
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
}));
const messageId = finalized.MessageSid ?? finalized.MessageSidFirst ?? finalized.MessageSidLast;
let resolverTimingSessionKey = agentSessionKey;
const logResolverTiming = (outcome: string, reason?: string, error?: string) =>
resolverTiming.logIfSlow({
message: `reply resolver timings surface=${traceAttributes.surface} messageId=${
messageId ?? "unknown"
} sessionKey=${resolverTimingSessionKey ?? "unknown"} agentId=${agentId}`,
outcome,
reason,
error,
details: {
surface: traceAttributes.surface,
messageId,
sessionKey: resolverTimingSessionKey,
agentId,
},
});
const agentId = resolveSessionAgentId({
sessionKey: agentSessionKey,
config: cfg,
});
const mergedSkillFilter = mergeSkillFilters(
opts?.skillFilter,
resolveAgentSkillsFilter(cfg, agentId),
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
resolverTiming.measure(name, () =>
measureDiagnosticsTimelineSpan(name, run, {
phase: "agent-turn",
config: cfg,
attributes: traceAttributes,
}),
);
const mergedSkillFilter = resolverTiming.measureSync("reply.resolve_skill_filter", () =>
mergeSkillFilters(opts?.skillFilter, resolveAgentSkillsFilter(cfg, agentId)),
);
const resolvedOpts =
mergedSkillFilter !== undefined ? { ...opts, skillFilter: mergedSkillFilter } : opts;
const agentCfg = cfg.agents?.defaults;
const sessionCfg = cfg.session;
const { defaultProvider, defaultModel, aliasIndex } = resolveDefaultModel({
cfg,
agentId,
});
const { defaultProvider, defaultModel, aliasIndex } = resolverTiming.measureSync(
"reply.resolve_default_model",
() =>
resolveDefaultModel({
cfg,
agentId,
}),
);
let provider = defaultProvider;
let model = defaultModel;
let hasResolvedHeartbeatModelOverride = false;
@@ -277,22 +321,34 @@ export async function getReplyFromConfig(
}
}
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
const workspaceDirForNativeCommand = workspaceDirRaw;
const agentDir = resolveAgentDir(cfg, agentId);
const timeoutMs = resolveAgentTimeoutMs({ cfg, overrideSeconds: opts?.timeoutOverrideSeconds });
const configuredTypingSeconds =
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
const typingIntervalSeconds =
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
const typing = createTypingController({
onReplyStart: opts?.onReplyStart,
onCleanup: opts?.onTypingCleanup,
typingIntervalSeconds,
silentToken: SILENT_REPLY_TOKEN,
log: defaultRuntime.log,
const { workspaceDirRaw, workspaceDirForNativeCommand, agentDir, timeoutMs } =
resolverTiming.measureSync("reply.resolve_workspace_agent_dir", () => {
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
return {
workspaceDirRaw,
workspaceDirForNativeCommand: workspaceDirRaw,
agentDir: resolveAgentDir(cfg, agentId),
timeoutMs: resolveAgentTimeoutMs({
cfg,
overrideSeconds: opts?.timeoutOverrideSeconds,
}),
};
});
const typing = resolverTiming.measureSync("reply.create_typing_controller", () => {
const configuredTypingSeconds =
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
const typingIntervalSeconds =
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
const controller = createTypingController({
onReplyStart: opts?.onReplyStart,
onCleanup: opts?.onTypingCleanup,
typingIntervalSeconds,
silentToken: SILENT_REPLY_TOKEN,
log: defaultRuntime.log,
});
opts?.onTypingController?.(controller);
return controller;
});
opts?.onTypingController?.(typing);
const nativeSlashCommandFastReply = await traceGetReplyPhase(
"reply.native_slash_command_fast_path",
@@ -316,6 +372,7 @@ export async function getReplyFromConfig(
}),
);
if (nativeSlashCommandFastReply.handled) {
logResolverTiming("completed", "native_slash_command_fast_path");
return nativeSlashCommandFastReply.reply;
}
@@ -390,6 +447,7 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
bodyStripped,
} = sessionState;
resolverTimingSessionKey = sessionKey ?? resolverTimingSessionKey;
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
const text = sanitizePendingFinalDeliveryText(sessionEntry.pendingFinalDeliveryText);
@@ -455,6 +513,7 @@ export async function getReplyFromConfig(
}),
});
}
logResolverTiming("completed", "pending_final_delivery_replay");
return { text: replayText };
}
}
@@ -579,7 +638,8 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
commandAuthorized,
});
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
logResolverTiming("milestone", "before_fast_directive_prepared_reply");
const fastReplyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
runPreparedReply({
ctx,
sessionCtx,
@@ -636,6 +696,8 @@ export async function getReplyFromConfig(
autoFallbackPrimaryProbe,
}),
);
logResolverTiming("completed", "fast_directive_prepared_reply");
return fastReplyResult;
}
const directiveResult = await traceGetReplyPhase("reply.resolve_directives", () =>
@@ -671,6 +733,7 @@ export async function getReplyFromConfig(
}),
);
if (directiveResult.kind === "reply") {
logResolverTiming("completed", "directive_reply");
return directiveResult.reply;
}
@@ -771,6 +834,7 @@ export async function getReplyFromConfig(
);
if (inlineActionResult.kind === "reply") {
await maybeEmitMissingResetHooks();
logResolverTiming("completed", "inline_action_reply");
return inlineActionResult.reply;
}
await maybeEmitMissingResetHooks();
@@ -862,6 +926,7 @@ export async function getReplyFromConfig(
),
);
if (hookResult?.handled) {
logResolverTiming("completed", "before_agent_reply_hook");
return hookResult.reply ?? { text: SILENT_REPLY_TOKEN };
}
}
@@ -884,7 +949,8 @@ export async function getReplyFromConfig(
);
}
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
logResolverTiming("milestone", "before_run_prepared_reply");
const replyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
runPreparedReply({
ctx,
sessionCtx,
@@ -931,4 +997,6 @@ export async function getReplyFromConfig(
autoFallbackPrimaryProbe: runAutoFallbackPrimaryProbe,
}),
);
logResolverTiming("completed", "prepared_reply");
return replyResult;
}

View File

@@ -0,0 +1,48 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { createReplyTimingTracker, isReplyProfilerEnabled } from "./reply-timing-tracker.js";
describe("isReplyProfilerEnabled", () => {
it("matches global and reply profiler diagnostic flags", () => {
const cfg = { diagnostics: { flags: ["reply.profiler"] } } as OpenClawConfig;
expect(isReplyProfilerEnabled({ config: cfg, env: {} as NodeJS.ProcessEnv })).toBe(true);
expect(
isReplyProfilerEnabled({
env: { OPENCLAW_DIAGNOSTICS: "profiler" } as NodeJS.ProcessEnv,
}),
).toBe(true);
});
});
describe("createReplyTimingTracker", () => {
it("is a pass-through tracker unless the profiler flag is enabled", async () => {
const warn = vi.fn();
const tracker = createReplyTimingTracker({ log: { warn } });
expect(tracker.measureSync("sync", () => 42)).toBe(42);
await expect(tracker.measure("async", async () => "ok")).resolves.toBe("ok");
tracker.logIfSlow({ message: "reply timings" });
expect(warn).not.toHaveBeenCalled();
});
it("records and logs spans when the profiler flag is enabled", () => {
const warn = vi.fn();
const tracker = createReplyTimingTracker({
log: { warn },
env: { OPENCLAW_DIAGNOSTICS: "reply.profiler" } as NodeJS.ProcessEnv,
totalWarnMs: 0,
stageWarnMs: 0,
});
expect(tracker.measureSync("sync", () => 7)).toBe(7);
tracker.logIfSlow({ message: "reply timings", outcome: "completed" });
expect(warn).toHaveBeenCalledOnce();
expect(warn.mock.calls[0]?.[0]).toContain("stages=sync:");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
outcome: "completed",
spans: [expect.objectContaining({ name: "sync" })],
});
});
});

View File

@@ -0,0 +1,141 @@
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { isDiagnosticFlagEnabled } from "../../infra/diagnostic-flags.js";
type ReplyTimingSpan = {
name: string;
durationMs: number;
elapsedMs: number;
};
type ReplyTimingSummary = {
totalMs: number;
spans: ReplyTimingSpan[];
};
type ReplyTimingLogger = {
warn: (message: string, details?: Record<string, unknown>) => void;
};
type ReplyTimingTracker = {
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
measureSync: <T>(name: string, run: () => T) => T;
logIfSlow: (params: {
message: string;
outcome?: string;
reason?: string;
error?: string;
details?: Record<string, unknown>;
}) => void;
};
const DEFAULT_TIMING_WARN_TOTAL_MS = 1_000;
const DEFAULT_TIMING_WARN_STAGE_MS = 500;
export function isReplyProfilerEnabled(params?: {
config?: OpenClawConfig;
env?: NodeJS.ProcessEnv;
}): boolean {
const cfg = params?.config;
const env = params?.env ?? process.env;
return (
isDiagnosticFlagEnabled("profiler", cfg, env) ||
isDiagnosticFlagEnabled("reply.profiler", cfg, env)
);
}
export function createReplyTimingTracker(params: {
log: ReplyTimingLogger;
config?: OpenClawConfig;
env?: NodeJS.ProcessEnv;
enabled?: boolean;
totalWarnMs?: number;
stageWarnMs?: number;
}): ReplyTimingTracker {
const enabled =
params.enabled ?? isReplyProfilerEnabled({ config: params.config, env: params.env });
if (!enabled) {
// Normal production turns use pass-through wrappers so added profiling
// calls do not allocate spans or call Date.now on the hot reply path.
return {
async measure(_name, run) {
return await run();
},
measureSync(_name, run) {
return run();
},
logIfSlow() {},
};
}
const startedAt = Date.now();
const spans: ReplyTimingSpan[] = [];
let didLog = false;
const totalWarnMs = params.totalWarnMs ?? DEFAULT_TIMING_WARN_TOTAL_MS;
const stageWarnMs = params.stageWarnMs ?? DEFAULT_TIMING_WARN_STAGE_MS;
const toMs = (value: number) => Math.max(0, Math.round(value));
const record = (name: string, spanStartedAt: number) => {
spans.push({
name,
durationMs: toMs(Date.now() - spanStartedAt),
elapsedMs: toMs(Date.now() - startedAt),
});
};
const snapshot = (): ReplyTimingSummary => ({
totalMs: toMs(Date.now() - startedAt),
spans: spans.slice(),
});
const shouldLog = (summary: ReplyTimingSummary) =>
summary.totalMs >= totalWarnMs || summary.spans.some((span) => span.durationMs >= stageWarnMs);
const formatSpans = (summary: ReplyTimingSummary) =>
summary.spans.length > 0
? summary.spans
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
.join(",")
: "none";
return {
async measure(name, run) {
const spanStartedAt = Date.now();
try {
return await run();
} finally {
record(name, spanStartedAt);
}
},
measureSync(name, run) {
const spanStartedAt = Date.now();
try {
return run();
} finally {
record(name, spanStartedAt);
}
},
logIfSlow(logParams) {
if (didLog) {
return;
}
const summary = snapshot();
if (!shouldLog(summary)) {
return;
}
didLog = true;
const suffix = [
`totalMs=${summary.totalMs}`,
`stages=${formatSpans(summary)}`,
logParams.outcome ? `outcome=${logParams.outcome}` : undefined,
logParams.reason ? `reason=${logParams.reason}` : undefined,
logParams.error ? `error="${logParams.error}"` : undefined,
]
.filter(Boolean)
.join(" ");
params.log.warn(`${logParams.message} ${suffix}`, {
...logParams.details,
outcome: logParams.outcome,
reason: logParams.reason,
error: logParams.error,
totalMs: summary.totalMs,
spans: summary.spans,
});
},
};
}

View File

@@ -133,6 +133,24 @@ describe("startGatewayMaintenanceTimers", () => {
stopMaintenanceTimers(timers);
});
it("refreshes automatic health snapshots without live channel probes", async () => {
vi.useFakeTimers();
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
const deps = createMaintenanceTimerDeps();
deps.refreshGatewayHealthSnapshot = vi.fn(async () => ({ ok: true }) as HealthSummary);
const timers = startGatewayMaintenanceTimers(deps);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledWith({ probe: false });
await vi.advanceTimersByTimeAsync(60_000);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledTimes(2);
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenLastCalledWith({ probe: false });
stopMaintenanceTimers(timers);
});
it("skips overlapping media cleanup runs", async () => {
vi.useFakeTimers();
let resolveCleanup = () => {};

View File

@@ -72,16 +72,17 @@ export function startGatewayMaintenanceTimers(params: {
params.nodeSendToAllSubscribed("tick", payload);
}, TICK_INTERVAL_MS);
// periodic health refresh to keep cached snapshot warm
// Keep cached health warm without request-time live channel probes. Explicit
// status/doctor probe paths still pass probe=true when the operator asks.
const healthInterval = setInterval(() => {
void params
.refreshGatewayHealthSnapshot({ probe: true })
.refreshGatewayHealthSnapshot({ probe: false })
.catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
}, HEALTH_REFRESH_INTERVAL_MS);
// Prime cache so first client gets a snapshot without waiting.
void params
.refreshGatewayHealthSnapshot({ probe: true })
.refreshGatewayHealthSnapshot({ probe: false })
.catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));
// dedupe cache cleanup

View File

@@ -199,6 +199,13 @@ function deliveryCall(index = 0): Record<string, any> | undefined {
return calls[index]?.[0];
}
function appendTranscriptCall(index = 0): Record<string, any> | undefined {
const calls = mocks.appendAssistantMessageToSessionTranscript.mock.calls as unknown as Array<
[Record<string, any>]
>;
return calls[index]?.[0];
}
function firstRespondCall(respond: ReturnType<typeof vi.fn>) {
const calls = respond.mock.calls as unknown as Array<
[
@@ -1455,6 +1462,178 @@ describe("gateway send mirroring", () => {
});
});
it("waits for source transcript mirroring before responding to message.action", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",
meta: {
id: "telegram",
label: "Telegram",
selectionLabel: "Telegram",
docsPath: "/channels/telegram",
blurb: "Telegram async source send transcript mirror test plugin.",
},
capabilities: { chatTypes: ["direct"] },
config: {
listAccountIds: () => ["default"],
resolveAccount: () => ({ enabled: true }),
isConfigured: () => true,
},
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
supportsAction: ({ action }) => action === "send",
handleAction: async () => jsonResult({ ok: true, messageId: "tg-async-1" }),
},
};
const mirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
setActivePluginRegistry(
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
"send-test-source-message-action-async-mirror",
);
mocks.dispatchChannelMessageAction.mockResolvedValueOnce(
jsonResult({ ok: true, messageId: "tg-async-1" }),
);
mocks.appendAssistantMessageToSessionTranscript.mockReturnValueOnce(mirrorDeferred.promise);
const respond = vi.fn();
const request = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "visible media caption",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-async-source-message-action",
} as never,
respond,
context: makeContext(),
req: { type: "req", id: "1", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
await vi.waitFor(() => {
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
});
expect(respond).not.toHaveBeenCalled();
mirrorDeferred.resolve({ ok: true, sessionFile: "x" });
await request;
expect(firstRespondCall(respond)[0]).toBe(true);
});
it("preserves source transcript mirror order before message.action responses", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",
meta: {
id: "telegram",
label: "Telegram",
selectionLabel: "Telegram",
docsPath: "/channels/telegram",
blurb: "Telegram ordered async source send transcript mirror test plugin.",
},
capabilities: { chatTypes: ["direct"] },
config: {
listAccountIds: () => ["default"],
resolveAccount: () => ({ enabled: true }),
isConfigured: () => true,
},
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
supportsAction: ({ action }) => action === "send",
handleAction: async () => jsonResult({ ok: true, messageId: "tg-ordered" }),
},
};
const firstMirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
setActivePluginRegistry(
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
"send-test-source-message-action-ordered-async-mirror",
);
mocks.dispatchChannelMessageAction.mockResolvedValue(
jsonResult({ ok: true, messageId: "tg-ordered" }),
);
mocks.appendAssistantMessageToSessionTranscript
.mockReturnValueOnce(firstMirrorDeferred.promise)
.mockResolvedValueOnce({ ok: true, sessionFile: "x" });
const firstRespond = vi.fn();
const secondRespond = vi.fn();
const first = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "first visible reply",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-ordered-source-message-action-1",
} as never,
respond: firstRespond,
context: makeContext(),
req: { type: "req", id: "1", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
await vi.waitFor(() => {
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
});
const second = sendHandlers["message.action"]({
params: {
channel: "telegram",
action: "send",
params: {
to: "chat-123",
message: "second visible reply",
},
sessionKey: "agent:main:telegram:direct:chat-123",
agentId: "main",
toolContext: {
currentChannelProvider: "telegram",
currentChannelId: "chat-123",
},
idempotencyKey: "idem-ordered-source-message-action-2",
} as never,
respond: secondRespond,
context: makeContext(),
req: { type: "req", id: "2", method: "message.action" },
client: null,
isWebchatConnect: () => false,
});
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
expect(firstRespond).not.toHaveBeenCalled();
expect(secondRespond).not.toHaveBeenCalled();
expect(appendTranscriptCall(0)).toEqual(
expect.objectContaining({ text: "first visible reply" }),
);
firstMirrorDeferred.resolve({ ok: true, sessionFile: "x" });
await first;
await second;
expect(firstRespondCall(firstRespond)[0]).toBe(true);
expect(firstRespondCall(secondRespond)[0]).toBe(true);
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(2);
expect(appendTranscriptCall(1)).toEqual(
expect.objectContaining({ text: "second visible reply" }),
);
});
it("mirrors presentation-only source-conversation message.action sends", async () => {
const telegramPlugin: ChannelPlugin = {
id: "telegram",

View File

@@ -288,6 +288,37 @@ async function mirrorDeliveredSourceReplyToTranscriptBestEffort(params: {
}
}
const sourceReplyTranscriptMirrorQueues = new Map<string, Promise<void>>();
function resolveSourceReplyTranscriptMirrorQueueKey(
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0],
): string {
return mirror.sessionKey?.trim() || "__global__";
}
function scheduleDeliveredSourceReplyTranscriptMirror(params: {
context: GatewayRequestContext;
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0];
}): Promise<void> {
const queueKey = resolveSourceReplyTranscriptMirrorQueueKey(params.mirror);
const previous = sourceReplyTranscriptMirrorQueues.get(queueKey);
// Queue per session so current-conversation source replies are visible before
// a following turn can read the transcript.
const queued = (async () => {
await previous?.catch(() => undefined);
await mirrorDeliveredSourceReplyToTranscriptBestEffort(params);
})();
sourceReplyTranscriptMirrorQueues.set(queueKey, queued);
void queued
.finally(() => {
if (sourceReplyTranscriptMirrorQueues.get(queueKey) === queued) {
sourceReplyTranscriptMirrorQueues.delete(queueKey);
}
})
.catch(() => undefined);
return queued;
}
export const sendHandlers: GatewayRequestHandlers = {
"message.action": async ({ params, respond, context, client }) => {
const p = params;
@@ -399,7 +430,7 @@ export const sendHandlers: GatewayRequestHandlers = {
const agentId =
normalizeOptionalString(request.agentId) ??
(sessionKey ? resolveSessionAgentId({ sessionKey, config: cfg }) : undefined);
await mirrorDeliveredSourceReplyToTranscriptBestEffort({
await scheduleDeliveredSourceReplyTranscriptMirror({
context,
mirror: {
action: request.action,

View File

@@ -224,7 +224,7 @@ describe("attachGatewayWsMessageHandler post-connect health refresh", () => {
expect(hello.ok).toBe(true);
await vi.waitFor(() => {
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true });
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: false });
});
resolveRefresh?.();
});

View File

@@ -1776,7 +1776,10 @@ export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerPar
presence: snapshot.presence.length,
stateVersion: snapshot.stateVersion.presence,
});
void refreshHealthSnapshot({ probe: true }).catch((err) =>
// Post-connect refresh only needs a cached/config snapshot for UI state;
// live channel probes here pulled slow Discord/Telegram HTTP checks into
// reply-adjacent websocket handshakes.
void refreshHealthSnapshot({ probe: false }).catch((err) =>
logHealth.error(`post-connect health refresh failed: ${formatError(err)}`),
);
return;

View File

@@ -250,6 +250,78 @@ describe("heartbeat runner skips when target session lane is busy", () => {
});
});
it("returns requests-in-flight when another session for the same agent has an active reply run", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => [
"agent:main:telegram:group:-1003966283270:topic:547",
]);
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
expect(replySpy).not.toHaveBeenCalled();
});
});
it("ignores unscoped active reply runs when checking same-agent heartbeat work", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => ["legacy-session-key"]);
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result.status).toBe("ran");
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
expect(replySpy).toHaveBeenCalledOnce();
});
});
it("does not defer immediate heartbeat wakes for another active session", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const listActiveReplyRunSessionKeys = vi.fn(() => [
"agent:main:telegram:group:-1003966283270:topic:547",
]);
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({
cfg,
intent: "immediate",
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
listActiveReplyRunSessionKeys,
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result.status).toBe("ran");
expect(replySpy).toHaveBeenCalledOnce();
});
});
it("does not defer on a recent heartbeat ack pending final delivery", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();

View File

@@ -36,7 +36,10 @@ import {
} from "../auto-reply/heartbeat.js";
import { replaceGenericExternalRunFailureText } from "../auto-reply/reply/agent-runner-failure-copy.js";
import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defaults.js";
import { replyRunRegistry } from "../auto-reply/reply/reply-run-registry.js";
import {
listActiveReplyRunSessionKeys,
replyRunRegistry,
} from "../auto-reply/reply/reply-run-registry.js";
import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js";
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
@@ -148,6 +151,7 @@ export type HeartbeatDeps = OutboundSendDeps &
getQueueSize?: (lane?: string) => number;
getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[];
isReplyRunActive?: (sessionKey: string) => boolean;
listActiveReplyRunSessionKeys?: () => readonly string[];
nowMs?: () => number;
};
@@ -221,6 +225,17 @@ function hasAgentOptInBusyLaneWork(
return hasQueuedWorkInLaneSnapshots(getSnapshots(), (lane) => laneBelongsToAgent(lane, agentId));
}
function hasActiveReplyRunForAgent(
agentId: string,
listSessionKeys: () => readonly string[],
): boolean {
const normalizedAgentId = normalizeAgentId(agentId);
return listSessionKeys().some((sessionKey) => {
const parsed = parseAgentSessionKey(sessionKey);
return parsed ? normalizeAgentId(parsed.agentId) === normalizedAgentId : false;
});
}
function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined {
const activePlugin = getActivePluginChannelRegistry()?.channels.find(
(entry) => entry.plugin.id === channel,
@@ -1345,6 +1360,21 @@ export async function runHeartbeatOnce(opts: {
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
}
const shouldHonorActiveReplyRuns = opts.intent !== "immediate" && opts.intent !== "manual";
const listActiveReplyRuns =
opts.deps?.listActiveReplyRunSessionKeys ?? listActiveReplyRunSessionKeys;
// Scheduled heartbeats are background work, so defer them when any session on
// the same agent is already replying; immediate/manual wakes keep their
// existing semantics for explicit user/system actions.
if (shouldHonorActiveReplyRuns && hasActiveReplyRunForAgent(agentId, listActiveReplyRuns)) {
emitHeartbeatEvent({
status: "skipped",
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
}
// Phase 2: Stronger heartbeat deferral while a final delivery replay is pending.
// Plain `updatedAt` changes are normal for heartbeat sessions and should not
// suppress heartbeat runs; only defer when final delivery recovery is active.

View File

@@ -1,3 +1,4 @@
import { spawnSync } from "node:child_process";
import { readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
@@ -420,4 +421,30 @@ describe("bun global install smoke", () => {
expect(releaseChecks).toContain("uses: ./.github/workflows/install-smoke.yml");
expect(releaseChecks).toContain("run_bun_global_install_smoke: true");
});
it("kills Bun global install smoke commands that ignore TERM after timeout", () => {
const result = spawnSync(
process.execPath,
[
BUN_GLOBAL_ASSERTIONS_PATH,
"run-with-timeout",
"50",
process.execPath,
"-e",
"process.on('SIGTERM', () => {}); setInterval(() => {}, 1000);",
],
{
encoding: "utf8",
env: {
...process.env,
OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS: "50",
},
timeout: 5000,
},
);
expect(result.error).toBeUndefined();
expect(result.status).toBe(1);
expect(result.stderr).toContain(`command timed out after 50ms: ${process.execPath}`);
});
});