mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-27 18:01:53 +08:00
Compare commits
3 Commits
codex/spli
...
codex/spli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4bf05cf1b | ||
|
|
b56ddcc6ff | ||
|
|
8c6537b8c1 |
@@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CI: bound Docker/Bash E2E tarball npm installs with `OPENCLAW_E2E_NPM_INSTALL_TIMEOUT` so package, onboarding, plugin, and upgrade lanes fail instead of hanging on a stuck npm install.
|
||||
- CI: keep `OPENCLAW_TESTBOX=1 pnpm check:changed` delegating to Blacksmith Testbox through Crabbox without forwarding local Testbox or worker env into the remote command.
|
||||
- CI: send KILL after the TERM grace period for manual checkout fetch timeouts so stuck Testbox and workflow checkout retries cannot hang behind a wedged `git fetch`.
|
||||
- CI: send KILL after the TERM grace period for Bun global install smoke command timeouts so trapped `openclaw` child processes cannot wedge the scheduled install smoke.
|
||||
- iMessage: thread current channel/account inbound attachment roots into the image tool so iMessage-saved attachments under `~/Library/Messages/Attachments` (including the wildcard `/Users/*/Library/Messages/Attachments` root) are read through the existing inbound path policy instead of being rejected as `path-not-allowed`. Literal `localRoots` stays workspace-scoped. Fixes #30170. (#86569)
|
||||
- QQ Bot: respect `OPENCLAW_HOME` for outbound media path resolution so `<qqmedia>` sends no longer silently fail when `HOME` and `OPENCLAW_HOME` differ (Docker / multi-user hosts). Persisted QQ Bot data (sessions, known users, refs) stays anchored on the OS home for upgrade compatibility. Fixes #83562. Thanks @sliverp.
|
||||
- Update: report the primary malformed `openclaw.extensions` payload error without adding a duplicate missing-main diagnostic. (#86596) Thanks @ferminquant.
|
||||
|
||||
@@ -50,50 +50,6 @@ Disable all flags:
|
||||
OPENCLAW_DIAGNOSTICS=0
|
||||
```
|
||||
|
||||
`OPENCLAW_DIAGNOSTICS=0` is a process-level disable override: it disables
|
||||
flags from both env and config for that process.
|
||||
|
||||
## Profiling flags
|
||||
|
||||
Profiler flags enable targeted timing spans without raising global logging
|
||||
levels. They are disabled by default.
|
||||
|
||||
Enable all profiler-gated spans for one gateway run:
|
||||
|
||||
```bash
|
||||
OPENCLAW_DIAGNOSTICS=profiler openclaw gateway run
|
||||
```
|
||||
|
||||
Enable only reply-dispatch profiler spans:
|
||||
|
||||
```bash
|
||||
OPENCLAW_DIAGNOSTICS=reply.profiler openclaw gateway run
|
||||
```
|
||||
|
||||
Enable only Codex app-server startup/tool/thread profiler spans:
|
||||
|
||||
```bash
|
||||
OPENCLAW_DIAGNOSTICS=codex.profiler openclaw gateway run
|
||||
```
|
||||
|
||||
Enable profiler flags from config:
|
||||
|
||||
```json
|
||||
{
|
||||
"diagnostics": {
|
||||
"flags": ["reply.profiler", "codex.profiler"]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Restart the gateway after changing config flags. To disable a profiler flag,
|
||||
remove it from `diagnostics.flags` and restart. To temporarily disable every
|
||||
diagnostics flag even when config enables profiler flags, start the process with:
|
||||
|
||||
```bash
|
||||
OPENCLAW_DIAGNOSTICS=0 openclaw gateway run
|
||||
```
|
||||
|
||||
## Timeline artifacts
|
||||
|
||||
The `timeline` flag writes structured startup and runtime timing events for
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
|
||||
|
||||
describe("isCodexAppServerProfilerEnabled", () => {
|
||||
it("is disabled by default", () => {
|
||||
expect(isCodexAppServerProfilerEnabled(undefined, {} as NodeJS.ProcessEnv)).toBe(false);
|
||||
});
|
||||
|
||||
it("matches global and Codex profiler flags", () => {
|
||||
expect(
|
||||
isCodexAppServerProfilerEnabled(
|
||||
{ diagnostics: { flags: ["codex.profiler"] } },
|
||||
{} as NodeJS.ProcessEnv,
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
isCodexAppServerProfilerEnabled(undefined, {
|
||||
OPENCLAW_DIAGNOSTICS: "profiler",
|
||||
} as NodeJS.ProcessEnv),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("uses the documented diagnostics env disable override", () => {
|
||||
expect(
|
||||
isCodexAppServerProfilerEnabled({ diagnostics: { flags: ["codex.profiler"] } }, {
|
||||
OPENCLAW_DIAGNOSTICS: "0",
|
||||
} as NodeJS.ProcessEnv),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -1,11 +0,0 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
|
||||
import { isDiagnosticFlagEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
|
||||
|
||||
const PROFILER_FLAGS = ["profiler", "codex.profiler"] as const;
|
||||
|
||||
export function isCodexAppServerProfilerEnabled(
|
||||
config?: OpenClawConfig,
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
): boolean {
|
||||
return PROFILER_FLAGS.some((flag) => isDiagnosticFlagEnabled(flag, config, env));
|
||||
}
|
||||
@@ -887,7 +887,6 @@ describe("runCodexAppServerAttempt", () => {
|
||||
await closeCodexSandboxExecServersForTests();
|
||||
resetCodexAppServerClientFactoryForTest();
|
||||
testing.resetOpenClawCodingToolsFactoryForTests();
|
||||
testing.resetEnsuredCodexWorkspaceDirsForTests();
|
||||
testing.clearPendingCodexNativeHookRelayUnregistersForTests();
|
||||
resetCodexRateLimitCacheForTests();
|
||||
nativeHookRelayTesting.clearNativeHookRelaysForTests();
|
||||
@@ -904,16 +903,6 @@ describe("runCodexAppServerAttempt", () => {
|
||||
await fs.rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("recreates cached Codex workspace directories after cleanup removes them", async () => {
|
||||
const workspaceDir = path.join(tempDir, "cached-workspace");
|
||||
|
||||
await testing.ensureCodexWorkspaceDirOnceForTests(workspaceDir);
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
await testing.ensureCodexWorkspaceDirOnceForTests(workspaceDir);
|
||||
|
||||
expect((await fs.stat(workspaceDir)).isDirectory()).toBe(true);
|
||||
});
|
||||
|
||||
it("filters Codex-native dynamic tools from app-server tool exposure", () => {
|
||||
const tools = [
|
||||
"read",
|
||||
|
||||
@@ -134,7 +134,6 @@ import {
|
||||
mergeCodexThreadConfigs,
|
||||
shouldBuildCodexPluginThreadConfig,
|
||||
} from "./plugin-thread-config.js";
|
||||
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
|
||||
import {
|
||||
assertCodexTurnStartResponse,
|
||||
readCodexDynamicToolCallParams,
|
||||
@@ -279,7 +278,6 @@ type CodexWorkspaceBootstrapContext = CodexBootstrapContext & {
|
||||
};
|
||||
|
||||
let openClawCodingToolsFactoryForTests: OpenClawCodingToolsFactory | undefined;
|
||||
const ensuredCodexWorkspaceDirs = new Set<string>();
|
||||
|
||||
type PendingCodexNativeHookRelayUnregister = {
|
||||
timeout: ReturnType<typeof setTimeout>;
|
||||
@@ -342,30 +340,6 @@ function clearPendingCodexNativeHookRelayUnregistersForTests(): void {
|
||||
pendingCodexNativeHookRelayUnregisters.clear();
|
||||
}
|
||||
|
||||
async function ensureCodexWorkspaceDirOnce(workspaceDir: string): Promise<void> {
|
||||
const normalized = path.resolve(workspaceDir);
|
||||
if (ensuredCodexWorkspaceDirs.has(normalized)) {
|
||||
try {
|
||||
const stat = await fs.stat(normalized);
|
||||
if (stat.isDirectory()) {
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
const code =
|
||||
typeof error === "object" && error ? (error as { code?: unknown }).code : undefined;
|
||||
if (code !== "ENOENT") {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
ensuredCodexWorkspaceDirs.delete(normalized);
|
||||
}
|
||||
// Codex attempts re-enter the same workspace repeatedly; caching successful
|
||||
// mkdirs avoids repeated fs work while still recovering if cleanup prunes
|
||||
// the directory between attempts.
|
||||
await fs.mkdir(normalized, { recursive: true });
|
||||
ensuredCodexWorkspaceDirs.add(normalized);
|
||||
}
|
||||
|
||||
function emitCodexAppServerEvent(
|
||||
params: EmbeddedRunAttemptParams,
|
||||
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
|
||||
@@ -1039,7 +1013,6 @@ export async function runCodexAppServerAttempt(
|
||||
} = {},
|
||||
): Promise<EmbeddedRunAttemptResult> {
|
||||
const attemptStartedAt = Date.now();
|
||||
const profilerEnabled = isCodexAppServerProfilerEnabled(params.config);
|
||||
const codexModelCallTrace = freezeDiagnosticTraceContext(
|
||||
createDiagnosticTraceContextFromActiveScope(),
|
||||
);
|
||||
@@ -1049,20 +1022,13 @@ export async function runCodexAppServerAttempt(
|
||||
let codexModelCallStarted = false;
|
||||
let codexModelCallTerminalEmitted = false;
|
||||
let codexModelCallRequestPayloadBytes: number | undefined;
|
||||
// Startup phase timings are profiler-gated because this function runs before
|
||||
// every Codex turn; normal production should not do timing bookkeeping here.
|
||||
const preDynamicStartupStages = createCodexDynamicToolBuildStageTracker({
|
||||
enabled: profilerEnabled,
|
||||
});
|
||||
const attemptClientFactory = options.clientFactory ?? defaultCodexAppServerClientFactory;
|
||||
const pluginConfig = readCodexPluginConfig(options.pluginConfig);
|
||||
const computerUseConfig = resolveCodexComputerUseConfig({ pluginConfig });
|
||||
const configuredAppServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
|
||||
const beforeToolCallPolicy = getBeforeToolCallPolicyDiagnosticState();
|
||||
preDynamicStartupStages.mark("config");
|
||||
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
|
||||
await ensureCodexWorkspaceDirOnce(resolvedWorkspace);
|
||||
preDynamicStartupStages.mark("workspace");
|
||||
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
||||
const sandboxSessionKey =
|
||||
params.sandboxSessionKey?.trim() || params.sessionKey?.trim() || params.sessionId;
|
||||
const contextSessionKey = params.sessionKey?.trim() || sandboxSessionKey;
|
||||
@@ -1071,14 +1037,12 @@ export async function runCodexAppServerAttempt(
|
||||
sessionKey: sandboxSessionKey,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
});
|
||||
preDynamicStartupStages.mark("sandbox");
|
||||
const effectiveWorkspace = sandbox?.enabled
|
||||
? sandbox.workspaceAccess === "rw"
|
||||
? resolvedWorkspace
|
||||
: sandbox.workspaceDir
|
||||
: resolvedWorkspace;
|
||||
await ensureCodexWorkspaceDirOnce(effectiveWorkspace);
|
||||
preDynamicStartupStages.mark("effective-workspace");
|
||||
await fs.mkdir(effectiveWorkspace, { recursive: true });
|
||||
const appServer = resolveCodexAppServerForOpenClawToolPolicy({
|
||||
appServer: configuredAppServer,
|
||||
pluginConfig,
|
||||
@@ -1098,13 +1062,11 @@ export async function runCodexAppServerAttempt(
|
||||
trustedToolPolicies: beforeToolCallPolicy.trustedToolPolicies,
|
||||
});
|
||||
}
|
||||
preDynamicStartupStages.mark("app-server-policy");
|
||||
let pluginAppServer: CodexAppServerRuntimeOptions = appServer;
|
||||
const nativeHookRelayEvents = resolveCodexNativeHookRelayEvents({
|
||||
configuredEvents: options.nativeHookRelay?.events,
|
||||
appServer,
|
||||
});
|
||||
preDynamicStartupStages.mark("native-hook-relay");
|
||||
|
||||
const runAbortController = new AbortController();
|
||||
const abortFromUpstream = () => {
|
||||
@@ -1122,9 +1084,7 @@ export async function runCodexAppServerAttempt(
|
||||
agentId: params.agentId,
|
||||
});
|
||||
const agentDir = params.agentDir ?? resolveAgentDir(params.config ?? {}, sessionAgentId);
|
||||
preDynamicStartupStages.mark("session-agent");
|
||||
let startupBinding = await readCodexAppServerBinding(params.sessionFile);
|
||||
preDynamicStartupStages.mark("read-binding");
|
||||
const startupBindingAuthProfileId = startupBinding?.authProfileId;
|
||||
startupBinding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: startupBinding,
|
||||
@@ -1134,7 +1094,6 @@ export async function runCodexAppServerAttempt(
|
||||
config: params.config,
|
||||
contextEngineActive: isActiveHarnessContextEngine(params.contextEngine),
|
||||
});
|
||||
preDynamicStartupStages.mark("rotate-binding");
|
||||
const startupAuthProfileCandidate =
|
||||
params.runtimePlan?.auth.forwardedAuthProfileId ??
|
||||
params.authProfileId ??
|
||||
@@ -1151,7 +1110,6 @@ export async function runCodexAppServerAttempt(
|
||||
agentDir,
|
||||
config: params.config,
|
||||
});
|
||||
preDynamicStartupStages.mark("auth-profile");
|
||||
const runtimeParams = {
|
||||
...params,
|
||||
sessionKey: contextSessionKey,
|
||||
@@ -1175,13 +1133,11 @@ export async function runCodexAppServerAttempt(
|
||||
: resolveCodexAppServerFallbackApiKeyCacheKey({
|
||||
startOptions: appServer.start,
|
||||
});
|
||||
preDynamicStartupStages.mark("auth-cache");
|
||||
const nodeExecBlocksNativeExecution = isCodexNativeExecutionBlockedByNodeExecHost(params, {
|
||||
agentId: sessionAgentId,
|
||||
runtimeSessionKey: sandboxSessionKey,
|
||||
sandbox,
|
||||
});
|
||||
preDynamicStartupStages.mark("native-exec-policy");
|
||||
const bundleMcpThreadConfig = await loadCodexBundleMcpThreadConfig({
|
||||
workspaceDir: effectiveWorkspace,
|
||||
cfg: params.config,
|
||||
@@ -1189,14 +1145,12 @@ export async function runCodexAppServerAttempt(
|
||||
disableTools: params.disableTools,
|
||||
toolsAllow: nodeExecBlocksNativeExecution ? [] : params.toolsAllow,
|
||||
});
|
||||
preDynamicStartupStages.mark("bundle-mcp");
|
||||
const sandboxExecServerEnabled = isCodexSandboxExecServerEnabled(pluginConfig);
|
||||
const nativeToolSurfaceEnabled = shouldEnableCodexAppServerNativeToolSurface(params, sandbox, {
|
||||
agentId: sessionAgentId,
|
||||
runtimeSessionKey: sandboxSessionKey,
|
||||
sandboxExecServerEnabled,
|
||||
});
|
||||
preDynamicStartupStages.mark("native-tool-surface");
|
||||
for (const diagnostic of bundleMcpThreadConfig.diagnostics) {
|
||||
embeddedAgentLog.warn(`bundle-mcp: ${diagnostic.pluginId}: ${diagnostic.message}`);
|
||||
}
|
||||
@@ -1211,23 +1165,6 @@ export async function runCodexAppServerAttempt(
|
||||
});
|
||||
}
|
||||
const hookChannelId = resolveCodexAppServerHookChannelId(params, sandboxSessionKey);
|
||||
preDynamicStartupStages.mark("context-engine-support");
|
||||
const preDynamicSummary = preDynamicStartupStages.snapshot();
|
||||
if (shouldWarnCodexDynamicToolBuildStageSummary(preDynamicSummary)) {
|
||||
embeddedAgentLog.warn(
|
||||
`codex app-server pre-dynamic startup timings runId=${params.runId} sessionId=${params.sessionId} totalMs=${preDynamicSummary.totalMs} stages=${formatCodexDynamicToolBuildStageSummary(preDynamicSummary)}`,
|
||||
{
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
totalMs: preDynamicSummary.totalMs,
|
||||
stages: preDynamicSummary.stages,
|
||||
hasStartupBinding: Boolean(startupBinding?.threadId),
|
||||
startupAuthProfileId: startupAuthProfileId ?? null,
|
||||
bundleMcpDiagnosticCount: bundleMcpThreadConfig.diagnostics.length,
|
||||
nativeToolSurfaceEnabled,
|
||||
},
|
||||
);
|
||||
}
|
||||
let yieldDetected = false;
|
||||
const tools = await buildDynamicTools({
|
||||
params,
|
||||
@@ -1239,7 +1176,6 @@ export async function runCodexAppServerAttempt(
|
||||
runAbortController,
|
||||
sessionAgentId,
|
||||
pluginConfig,
|
||||
profilerEnabled,
|
||||
onYieldDetected: () => {
|
||||
yieldDetected = true;
|
||||
},
|
||||
@@ -1254,7 +1190,6 @@ export async function runCodexAppServerAttempt(
|
||||
runAbortController,
|
||||
sessionAgentId,
|
||||
pluginConfig,
|
||||
profilerEnabled,
|
||||
forceHeartbeatTool: true,
|
||||
ignoreRuntimePlan: true,
|
||||
onYieldDetected: () => {
|
||||
@@ -3918,9 +3853,6 @@ function createCodexNativeHookRelay(params: {
|
||||
}),
|
||||
signal: params.signal,
|
||||
command: {
|
||||
// Hook relay subprocesses are observational for most tool events; keep
|
||||
// them lower priority so they do not compete with the active reply turn.
|
||||
nice: 10,
|
||||
timeoutMs: params.options?.gatewayTimeoutMs,
|
||||
},
|
||||
});
|
||||
@@ -4090,7 +4022,6 @@ type DynamicToolBuildParams = {
|
||||
runAbortController: AbortController;
|
||||
sessionAgentId: string;
|
||||
pluginConfig: CodexPluginConfig;
|
||||
profilerEnabled?: boolean;
|
||||
forceHeartbeatTool?: boolean;
|
||||
ignoreRuntimePlan?: boolean;
|
||||
onYieldDetected: () => void;
|
||||
@@ -4120,91 +4051,16 @@ function resolveCodexAppServerHookChannelId(
|
||||
}).channelId;
|
||||
}
|
||||
|
||||
type CodexDynamicToolBuildStageTiming = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type CodexDynamicToolBuildStageSummary = {
|
||||
totalMs: number;
|
||||
stages: CodexDynamicToolBuildStageTiming[];
|
||||
};
|
||||
|
||||
const CODEX_DYNAMIC_TOOL_BUILD_WARN_TOTAL_MS = 1_000;
|
||||
const CODEX_DYNAMIC_TOOL_BUILD_WARN_STAGE_MS = 500;
|
||||
|
||||
function createCodexDynamicToolBuildStageTracker(options: { enabled?: boolean } = {}): {
|
||||
mark: (name: string) => void;
|
||||
snapshot: () => CodexDynamicToolBuildStageSummary;
|
||||
} {
|
||||
if (!options.enabled) {
|
||||
return {
|
||||
mark() {},
|
||||
snapshot() {
|
||||
return { totalMs: 0, stages: [] };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
let previousAt = startedAt;
|
||||
const stages: CodexDynamicToolBuildStageTiming[] = [];
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
return {
|
||||
mark(name) {
|
||||
const currentAt = Date.now();
|
||||
stages.push({
|
||||
name,
|
||||
durationMs: toMs(currentAt - previousAt),
|
||||
elapsedMs: toMs(currentAt - startedAt),
|
||||
});
|
||||
previousAt = currentAt;
|
||||
},
|
||||
snapshot() {
|
||||
return {
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
stages: stages.slice(),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function shouldWarnCodexDynamicToolBuildStageSummary(
|
||||
summary: CodexDynamicToolBuildStageSummary,
|
||||
): boolean {
|
||||
return (
|
||||
summary.totalMs >= CODEX_DYNAMIC_TOOL_BUILD_WARN_TOTAL_MS ||
|
||||
summary.stages.some((stage) => stage.durationMs >= CODEX_DYNAMIC_TOOL_BUILD_WARN_STAGE_MS)
|
||||
);
|
||||
}
|
||||
|
||||
function formatCodexDynamicToolBuildStageSummary(
|
||||
summary: CodexDynamicToolBuildStageSummary,
|
||||
): string {
|
||||
return summary.stages.length > 0
|
||||
? summary.stages
|
||||
.map((stage) => `${stage.name}:${stage.durationMs}ms@${stage.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
}
|
||||
|
||||
async function buildDynamicTools(input: DynamicToolBuildParams) {
|
||||
const { params } = input;
|
||||
if (params.disableTools || !supportsModelTools(params.model)) {
|
||||
return [];
|
||||
}
|
||||
// Dynamic tool construction is on the reply hot path, so per-stage
|
||||
// Date.now/span bookkeeping runs only when the Codex profiler flag is set.
|
||||
const toolBuildStages = createCodexDynamicToolBuildStageTracker({
|
||||
enabled: input.profilerEnabled,
|
||||
});
|
||||
const modelHasVision = params.model.input?.includes("image") ?? false;
|
||||
const agentDir = params.agentDir ?? resolveAgentDir(params.config ?? {}, input.sessionAgentId);
|
||||
const createOpenClawCodingTools =
|
||||
openClawCodingToolsFactoryForTests ??
|
||||
(await import("openclaw/plugin-sdk/agent-harness")).createOpenClawCodingTools;
|
||||
toolBuildStages.mark("load-agent-harness-tools");
|
||||
const sessionKeys = resolveOpenClawCodingToolsSessionKeys(params, input.sandboxSessionKey);
|
||||
const allTools = createOpenClawCodingTools({
|
||||
agentId: input.sessionAgentId,
|
||||
@@ -4274,11 +4130,7 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
|
||||
data: { name: "sessions_yield", message },
|
||||
});
|
||||
},
|
||||
recordToolPrepStage: (name) => {
|
||||
toolBuildStages.mark(name);
|
||||
},
|
||||
});
|
||||
toolBuildStages.mark("create-openclaw-coding-tools");
|
||||
const codexFilteredTools = addNodeShellDynamicToolsIfNeeded(
|
||||
addSandboxShellDynamicToolsIfAvailable(
|
||||
isCodexMemoryFlushRun(params)
|
||||
@@ -4290,16 +4142,13 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
|
||||
allTools,
|
||||
input,
|
||||
);
|
||||
toolBuildStages.mark("codex-filtering");
|
||||
const visionFilteredTools = filterToolsForVisionInputs(codexFilteredTools, {
|
||||
modelHasVision,
|
||||
hasInboundImages: (params.images?.length ?? 0) > 0,
|
||||
});
|
||||
toolBuildStages.mark("vision-filtering");
|
||||
const toolsAllow = includeForcedCodexDynamicToolAllow(params.toolsAllow, params);
|
||||
const filteredTools = filterCodexDynamicToolsForAllowlist(visionFilteredTools, toolsAllow);
|
||||
toolBuildStages.mark("allowlist-filter");
|
||||
const normalizedTools = normalizeAgentRuntimeTools({
|
||||
return normalizeAgentRuntimeTools({
|
||||
runtimePlan: input.ignoreRuntimePlan ? undefined : params.runtimePlan,
|
||||
tools: filteredTools,
|
||||
provider: params.provider,
|
||||
@@ -4310,30 +4159,6 @@ async function buildDynamicTools(input: DynamicToolBuildParams) {
|
||||
modelApi: params.model.api,
|
||||
model: params.model,
|
||||
});
|
||||
toolBuildStages.mark("runtime-normalization");
|
||||
const summary = toolBuildStages.snapshot();
|
||||
if (shouldWarnCodexDynamicToolBuildStageSummary(summary)) {
|
||||
const phase = input.forceHeartbeatTool ? "registered-tools" : "runtime-tools";
|
||||
embeddedAgentLog.warn(
|
||||
`codex app-server dynamic tool build timings runId=${params.runId} sessionId=${params.sessionId} phase=${phase} totalMs=${summary.totalMs} stages=${formatCodexDynamicToolBuildStageSummary(summary)}`,
|
||||
{
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
phase,
|
||||
totalMs: summary.totalMs,
|
||||
stages: summary.stages,
|
||||
allToolCount: allTools.length,
|
||||
codexFilteredToolCount: codexFilteredTools.length,
|
||||
visionFilteredToolCount: visionFilteredTools.length,
|
||||
filteredToolCount: filteredTools.length,
|
||||
normalizedToolCount: normalizedTools.length,
|
||||
forceHeartbeatTool: input.forceHeartbeatTool === true,
|
||||
ignoreRuntimePlan: input.ignoreRuntimePlan === true,
|
||||
nativeToolSurfaceEnabled: input.nativeToolSurfaceEnabled === true,
|
||||
},
|
||||
);
|
||||
}
|
||||
return normalizedTools;
|
||||
}
|
||||
|
||||
function includeForcedCodexDynamicToolAllow(
|
||||
@@ -6169,12 +5994,6 @@ export const testing = {
|
||||
resetOpenClawCodingToolsFactoryForTests(): void {
|
||||
openClawCodingToolsFactoryForTests = undefined;
|
||||
},
|
||||
async ensureCodexWorkspaceDirOnceForTests(workspaceDir: string): Promise<void> {
|
||||
await ensureCodexWorkspaceDirOnce(workspaceDir);
|
||||
},
|
||||
resetEnsuredCodexWorkspaceDirsForTests(): void {
|
||||
ensuredCodexWorkspaceDirs.clear();
|
||||
},
|
||||
flushPendingCodexNativeHookRelayUnregistersForTests,
|
||||
clearPendingCodexNativeHookRelayUnregistersForTests,
|
||||
resolveCodexNativeHookRelayUnregisterGraceMs,
|
||||
|
||||
@@ -19,7 +19,6 @@ import {
|
||||
mergeCodexThreadConfigs,
|
||||
type CodexPluginThreadConfig,
|
||||
} from "./plugin-thread-config.js";
|
||||
import { isCodexAppServerProfilerEnabled } from "./profiler-flag.js";
|
||||
import {
|
||||
assertCodexThreadResumeResponse,
|
||||
assertCodexThreadStartResponse,
|
||||
@@ -85,113 +84,6 @@ const CODEX_LIGHTWEIGHT_CONTEXT_THREAD_CONFIG: JsonObject = {
|
||||
project_doc_max_bytes: 0,
|
||||
};
|
||||
|
||||
type CodexThreadLifecycleTimingSpan = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type CodexThreadLifecycleTimingSummary = {
|
||||
totalMs: number;
|
||||
spans: CodexThreadLifecycleTimingSpan[];
|
||||
};
|
||||
|
||||
const CODEX_THREAD_LIFECYCLE_TIMING_WARN_TOTAL_MS = 1_000;
|
||||
const CODEX_THREAD_LIFECYCLE_TIMING_WARN_STAGE_MS = 500;
|
||||
|
||||
function createCodexThreadLifecycleTimingTracker(options: { enabled?: boolean } = {}): {
|
||||
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
|
||||
measureSync: <T>(name: string, run: () => T) => T;
|
||||
logIfSlow: (params: {
|
||||
runId: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
action: "started" | "resumed" | "rotated";
|
||||
threadId?: string;
|
||||
}) => void;
|
||||
} {
|
||||
if (!options.enabled) {
|
||||
return {
|
||||
async measure(_name, run) {
|
||||
return await run();
|
||||
},
|
||||
measureSync(_name, run) {
|
||||
return run();
|
||||
},
|
||||
logIfSlow() {},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
let didLog = false;
|
||||
const spans: CodexThreadLifecycleTimingSpan[] = [];
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
const record = (name: string, spanStartedAt: number) => {
|
||||
spans.push({
|
||||
name,
|
||||
durationMs: toMs(Date.now() - spanStartedAt),
|
||||
elapsedMs: toMs(Date.now() - startedAt),
|
||||
});
|
||||
};
|
||||
const snapshot = (): CodexThreadLifecycleTimingSummary => ({
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
spans: spans.slice(),
|
||||
});
|
||||
const shouldLog = (summary: CodexThreadLifecycleTimingSummary) =>
|
||||
summary.totalMs >= CODEX_THREAD_LIFECYCLE_TIMING_WARN_TOTAL_MS ||
|
||||
summary.spans.some((span) => span.durationMs >= CODEX_THREAD_LIFECYCLE_TIMING_WARN_STAGE_MS);
|
||||
const formatSpans = (summary: CodexThreadLifecycleTimingSummary) =>
|
||||
summary.spans.length > 0
|
||||
? summary.spans
|
||||
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
return {
|
||||
async measure(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
measureSync(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
logIfSlow(params) {
|
||||
if (didLog) {
|
||||
return;
|
||||
}
|
||||
const summary = snapshot();
|
||||
if (!shouldLog(summary)) {
|
||||
return;
|
||||
}
|
||||
didLog = true;
|
||||
embeddedAgentLog.warn(
|
||||
`codex app-server thread lifecycle timings runId=${params.runId} sessionId=${
|
||||
params.sessionId
|
||||
} sessionKey=${params.sessionKey ?? "unknown"} action=${params.action} totalMs=${
|
||||
summary.totalMs
|
||||
} stages=${formatSpans(summary)}`,
|
||||
{
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
action: params.action,
|
||||
threadId: params.threadId,
|
||||
totalMs: summary.totalMs,
|
||||
spans: summary.spans,
|
||||
},
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function startOrResumeThread(params: {
|
||||
client: CodexAppServerClient;
|
||||
params: EmbeddedRunAttemptParams;
|
||||
@@ -211,16 +103,10 @@ export async function startOrResumeThread(params: {
|
||||
pluginThreadConfig?: CodexPluginThreadConfigProvider;
|
||||
contextEngineProjection?: CodexContextEngineThreadBootstrapProjection;
|
||||
}): Promise<CodexAppServerThreadLifecycleBinding> {
|
||||
// Thread lifecycle spans are useful when profiling startup churn, but normal
|
||||
// turns should not pay Date.now/span-array overhead while resuming threads.
|
||||
const lifecycleTiming = createCodexThreadLifecycleTimingTracker({
|
||||
enabled: isCodexAppServerProfilerEnabled(params.params.config),
|
||||
});
|
||||
const dynamicToolsFingerprint = lifecycleTiming.measureSync("fingerprint_dynamic_tools", () =>
|
||||
fingerprintDynamicTools(params.dynamicTools),
|
||||
);
|
||||
const contextEngineBinding = lifecycleTiming.measureSync("context_engine_binding", () =>
|
||||
buildContextEngineBinding(params.params, params.contextEngineProjection),
|
||||
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
|
||||
const contextEngineBinding = buildContextEngineBinding(
|
||||
params.params,
|
||||
params.contextEngineProjection,
|
||||
);
|
||||
const userMcpServersConfigPatch =
|
||||
params.userMcpServersEnabled === false
|
||||
@@ -232,13 +118,11 @@ export async function startOrResumeThread(params: {
|
||||
const environmentSelectionFingerprint = fingerprintEnvironmentSelection(
|
||||
params.environmentSelection,
|
||||
);
|
||||
let binding = await lifecycleTiming.measure("read_binding", () =>
|
||||
readCodexAppServerBinding(params.params.sessionFile, {
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
}),
|
||||
);
|
||||
let binding = await readCodexAppServerBinding(params.params.sessionFile, {
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
});
|
||||
let preserveExistingBinding = false;
|
||||
let rotatedContextEngineBinding = false;
|
||||
let prebuiltPluginThreadConfig: CodexPluginThreadConfig | undefined;
|
||||
@@ -323,9 +207,7 @@ export async function startOrResumeThread(params: {
|
||||
})
|
||||
) {
|
||||
try {
|
||||
prebuiltPluginThreadConfig = await lifecycleTiming.measure("plugin_config_recovery", () =>
|
||||
params.pluginThreadConfig?.build(),
|
||||
);
|
||||
prebuiltPluginThreadConfig = await params.pluginThreadConfig?.build();
|
||||
pluginBindingStale =
|
||||
prebuiltPluginThreadConfig?.fingerprint !== binding.pluginAppsFingerprint;
|
||||
} catch (error) {
|
||||
@@ -392,21 +274,19 @@ export async function startOrResumeThread(params: {
|
||||
userMcpServersConfigPatch,
|
||||
params.finalConfigPatch,
|
||||
);
|
||||
const resumeParams = lifecycleTiming.measureSync("thread_resume_params", () =>
|
||||
buildThreadResumeParams(params.params, {
|
||||
threadId: binding.threadId,
|
||||
authProfileId,
|
||||
appServer: params.appServer,
|
||||
dynamicTools: params.dynamicTools,
|
||||
developerInstructions: params.developerInstructions,
|
||||
config: resumeConfig,
|
||||
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
|
||||
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
|
||||
}),
|
||||
);
|
||||
const response = assertCodexThreadResumeResponse(
|
||||
await lifecycleTiming.measure("thread_resume_request", () =>
|
||||
params.client.request("thread/resume", resumeParams),
|
||||
await params.client.request(
|
||||
"thread/resume",
|
||||
buildThreadResumeParams(params.params, {
|
||||
threadId: binding.threadId,
|
||||
authProfileId,
|
||||
appServer: params.appServer,
|
||||
dynamicTools: params.dynamicTools,
|
||||
developerInstructions: params.developerInstructions,
|
||||
config: resumeConfig,
|
||||
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
|
||||
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
|
||||
}),
|
||||
),
|
||||
);
|
||||
const boundAuthProfileId = authProfileId;
|
||||
@@ -421,31 +301,29 @@ export async function startOrResumeThread(params: {
|
||||
params.mcpServersFingerprintEvaluated === true
|
||||
? params.mcpServersFingerprint
|
||||
: binding.mcpServersFingerprint;
|
||||
await lifecycleTiming.measure("thread_resume_write_binding", () =>
|
||||
writeCodexAppServerBinding(
|
||||
params.params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: params.cwd,
|
||||
authProfileId: boundAuthProfileId,
|
||||
model: params.params.modelId,
|
||||
modelProvider: response.modelProvider ?? fallbackModelProvider,
|
||||
dynamicToolsFingerprint,
|
||||
userMcpServersFingerprint,
|
||||
mcpServersFingerprint: nextMcpServersFingerprint,
|
||||
pluginAppsFingerprint: binding.pluginAppsFingerprint,
|
||||
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
|
||||
pluginAppPolicyContext: binding.pluginAppPolicyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
environmentSelectionFingerprint,
|
||||
createdAt: binding.createdAt,
|
||||
},
|
||||
{
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
},
|
||||
),
|
||||
await writeCodexAppServerBinding(
|
||||
params.params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: params.cwd,
|
||||
authProfileId: boundAuthProfileId,
|
||||
model: params.params.modelId,
|
||||
modelProvider: response.modelProvider ?? fallbackModelProvider,
|
||||
dynamicToolsFingerprint,
|
||||
userMcpServersFingerprint,
|
||||
mcpServersFingerprint: nextMcpServersFingerprint,
|
||||
pluginAppsFingerprint: binding.pluginAppsFingerprint,
|
||||
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
|
||||
pluginAppPolicyContext: binding.pluginAppPolicyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
environmentSelectionFingerprint,
|
||||
createdAt: binding.createdAt,
|
||||
},
|
||||
{
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
},
|
||||
);
|
||||
if (contextEngineBinding) {
|
||||
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
|
||||
@@ -458,13 +336,6 @@ export async function startOrResumeThread(params: {
|
||||
action: "resumed",
|
||||
});
|
||||
}
|
||||
lifecycleTiming.logIfSlow({
|
||||
runId: params.params.runId,
|
||||
sessionId: params.params.sessionId,
|
||||
sessionKey: params.params.sessionKey,
|
||||
threadId: response.thread.id,
|
||||
action: "resumed",
|
||||
});
|
||||
return {
|
||||
...binding,
|
||||
threadId: response.thread.id,
|
||||
@@ -495,34 +366,27 @@ export async function startOrResumeThread(params: {
|
||||
}
|
||||
|
||||
const pluginThreadConfig = params.pluginThreadConfig?.enabled
|
||||
? (prebuiltPluginThreadConfig ??
|
||||
(await lifecycleTiming.measure("plugin_config_build", () =>
|
||||
params.pluginThreadConfig?.build(),
|
||||
)))
|
||||
? (prebuiltPluginThreadConfig ?? (await params.pluginThreadConfig.build()))
|
||||
: undefined;
|
||||
const config = lifecycleTiming.measureSync("merge_thread_config", () =>
|
||||
mergeCodexThreadConfigs(
|
||||
params.config,
|
||||
userMcpServersConfigPatch,
|
||||
pluginThreadConfig?.configPatch,
|
||||
params.finalConfigPatch,
|
||||
),
|
||||
);
|
||||
const startParams = lifecycleTiming.measureSync("thread_start_params", () =>
|
||||
buildThreadStartParams(params.params, {
|
||||
cwd: params.cwd,
|
||||
dynamicTools: params.dynamicTools,
|
||||
appServer: params.appServer,
|
||||
developerInstructions: params.developerInstructions,
|
||||
config,
|
||||
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
|
||||
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
|
||||
environmentSelection: params.environmentSelection,
|
||||
}),
|
||||
const config = mergeCodexThreadConfigs(
|
||||
params.config,
|
||||
userMcpServersConfigPatch,
|
||||
pluginThreadConfig?.configPatch,
|
||||
params.finalConfigPatch,
|
||||
);
|
||||
const response = assertCodexThreadStartResponse(
|
||||
await lifecycleTiming.measure("thread_start_request", () =>
|
||||
params.client.request("thread/start", startParams),
|
||||
await params.client.request(
|
||||
"thread/start",
|
||||
buildThreadStartParams(params.params, {
|
||||
cwd: params.cwd,
|
||||
dynamicTools: params.dynamicTools,
|
||||
appServer: params.appServer,
|
||||
developerInstructions: params.developerInstructions,
|
||||
config,
|
||||
nativeCodeModeEnabled: params.nativeCodeModeEnabled,
|
||||
nativeCodeModeOnlyEnabled: params.nativeCodeModeOnlyEnabled,
|
||||
environmentSelection: params.environmentSelection,
|
||||
}),
|
||||
),
|
||||
);
|
||||
const modelProvider = resolveCodexAppServerModelProvider({
|
||||
@@ -536,31 +400,29 @@ export async function startOrResumeThread(params: {
|
||||
const nextMcpServersFingerprint =
|
||||
params.mcpServersFingerprintEvaluated === true ? params.mcpServersFingerprint : undefined;
|
||||
if (!preserveExistingBinding) {
|
||||
await lifecycleTiming.measure("thread_start_write_binding", () =>
|
||||
writeCodexAppServerBinding(
|
||||
params.params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: params.cwd,
|
||||
authProfileId: params.params.authProfileId,
|
||||
model: response.model ?? params.params.modelId,
|
||||
modelProvider: response.modelProvider ?? modelProvider,
|
||||
dynamicToolsFingerprint,
|
||||
userMcpServersFingerprint,
|
||||
mcpServersFingerprint: nextMcpServersFingerprint,
|
||||
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
|
||||
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
|
||||
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
environmentSelectionFingerprint,
|
||||
createdAt,
|
||||
},
|
||||
{
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
},
|
||||
),
|
||||
await writeCodexAppServerBinding(
|
||||
params.params.sessionFile,
|
||||
{
|
||||
threadId: response.thread.id,
|
||||
cwd: params.cwd,
|
||||
authProfileId: params.params.authProfileId,
|
||||
model: response.model ?? params.params.modelId,
|
||||
modelProvider: response.modelProvider ?? modelProvider,
|
||||
dynamicToolsFingerprint,
|
||||
userMcpServersFingerprint,
|
||||
mcpServersFingerprint: nextMcpServersFingerprint,
|
||||
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
|
||||
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
|
||||
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
environmentSelectionFingerprint,
|
||||
createdAt,
|
||||
},
|
||||
{
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
},
|
||||
);
|
||||
if (contextEngineBinding) {
|
||||
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
|
||||
@@ -574,13 +436,6 @@ export async function startOrResumeThread(params: {
|
||||
});
|
||||
}
|
||||
}
|
||||
lifecycleTiming.logIfSlow({
|
||||
runId: params.params.runId,
|
||||
sessionId: params.params.sessionId,
|
||||
sessionKey: params.params.sessionKey,
|
||||
threadId: response.thread.id,
|
||||
action: rotatedContextEngineBinding ? "rotated" : "started",
|
||||
});
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
threadId: response.thread.id,
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { spawn } from "node:child_process";
|
||||
|
||||
const DEFAULT_TIMEOUT_KILL_GRACE_MS = 30_000;
|
||||
|
||||
const usage = () => {
|
||||
console.error("Usage: assertions.mjs <run-with-timeout|assert-image-providers> [...]");
|
||||
@@ -7,16 +9,72 @@ const usage = () => {
|
||||
|
||||
const [mode, ...args] = process.argv.slice(2);
|
||||
|
||||
if (mode === "run-with-timeout") {
|
||||
const [timeoutMs, command, ...commandArgs] = args;
|
||||
const timeout = Number(timeoutMs);
|
||||
if (!Number.isFinite(timeout) || timeout <= 0 || !command) {
|
||||
usage();
|
||||
const parsePositiveNumber = (value, label) => {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new Error(`${label} must be a positive number`);
|
||||
}
|
||||
return parsed;
|
||||
};
|
||||
|
||||
const result = spawnSync(command, commandArgs, { encoding: "utf8", env: process.env, timeout });
|
||||
process.stdout.write(result.stdout ?? "");
|
||||
process.stderr.write(result.stderr ?? "");
|
||||
const signalChild = (child, signal) => {
|
||||
if (!child.pid) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (process.platform === "win32") {
|
||||
child.kill(signal);
|
||||
return;
|
||||
}
|
||||
process.kill(-child.pid, signal);
|
||||
} catch (error) {
|
||||
if (error?.code !== "ESRCH") {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const runWithTimeout = async (timeout, command, commandArgs) => {
|
||||
const killGrace = parsePositiveNumber(
|
||||
process.env.OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS ??
|
||||
String(DEFAULT_TIMEOUT_KILL_GRACE_MS),
|
||||
"OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS",
|
||||
);
|
||||
const child = spawn(command, commandArgs, {
|
||||
detached: process.platform !== "win32",
|
||||
env: process.env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
let timedOut = false;
|
||||
let killTimer;
|
||||
|
||||
child.stdout.setEncoding("utf8");
|
||||
child.stderr.setEncoding("utf8");
|
||||
child.stdout.on("data", (chunk) => process.stdout.write(chunk));
|
||||
child.stderr.on("data", (chunk) => process.stderr.write(chunk));
|
||||
|
||||
const timeoutTimer = setTimeout(() => {
|
||||
timedOut = true;
|
||||
signalChild(child, "SIGTERM");
|
||||
killTimer = setTimeout(() => signalChild(child, "SIGKILL"), killGrace);
|
||||
killTimer.unref();
|
||||
}, timeout);
|
||||
timeoutTimer.unref();
|
||||
|
||||
let spawnError;
|
||||
child.on("error", (error) => {
|
||||
spawnError = error;
|
||||
});
|
||||
const result = await new Promise((resolve) => {
|
||||
child.on("close", (status, signal) => resolve({ error: spawnError, signal, status }));
|
||||
});
|
||||
|
||||
clearTimeout(timeoutTimer);
|
||||
clearTimeout(killTimer);
|
||||
if (timedOut) {
|
||||
console.error(`command timed out after ${timeout}ms: ${command}`);
|
||||
process.exit(1);
|
||||
}
|
||||
if (result.error) {
|
||||
console.error(`command failed: ${command}: ${result.error.message}`);
|
||||
process.exit(1);
|
||||
@@ -26,6 +84,20 @@ if (mode === "run-with-timeout") {
|
||||
process.exit(1);
|
||||
}
|
||||
process.exit(result.status ?? 0);
|
||||
};
|
||||
|
||||
if (mode === "run-with-timeout") {
|
||||
const [timeoutMs, command, ...commandArgs] = args;
|
||||
if (!command) {
|
||||
usage();
|
||||
}
|
||||
let timeout;
|
||||
try {
|
||||
timeout = parsePositiveNumber(timeoutMs, "timeoutMs");
|
||||
} catch {
|
||||
usage();
|
||||
}
|
||||
await runWithTimeout(timeout, command, commandArgs);
|
||||
}
|
||||
|
||||
if (mode === "assert-image-providers") {
|
||||
|
||||
@@ -115,7 +115,7 @@ describe("native hook relay registry", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves permission relays while marking hook-only events without handlers inactive", () => {
|
||||
it("preserves safety relays while marking hook-only events without handlers inactive", () => {
|
||||
const relay = registerNativeHookRelay({
|
||||
provider: "codex",
|
||||
sessionId: "session-1",
|
||||
@@ -127,66 +127,12 @@ describe("native hook relay registry", () => {
|
||||
},
|
||||
});
|
||||
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
|
||||
expect(relay.shouldRelayEvent("post_tool_use")).toBe(false);
|
||||
expect(relay.shouldRelayEvent("before_agent_finalize")).toBe(false);
|
||||
expect(relay.shouldRelayEvent("permission_request")).toBe(true);
|
||||
});
|
||||
|
||||
it("builds pre-tool relay commands only when before-tool policy is active", () => {
|
||||
initializeGlobalHookRunner(
|
||||
createMockPluginRegistry([{ hookName: "before_tool_call", handler: vi.fn() }]),
|
||||
);
|
||||
const relay = registerNativeHookRelay({
|
||||
provider: "codex",
|
||||
sessionId: "session-1",
|
||||
runId: "run-1",
|
||||
command: {
|
||||
executable: "/opt/Open Claw/openclaw.mjs",
|
||||
nodeExecutable: "/usr/local/bin/node",
|
||||
timeoutMs: 1234,
|
||||
},
|
||||
});
|
||||
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
|
||||
expect(relay.commandForEvent("pre_tool_use")).toBe(
|
||||
"/usr/local/bin/node '/opt/Open Claw/openclaw.mjs' hooks relay --provider codex --relay-id " +
|
||||
`${relay.relayId} --event pre_tool_use --timeout 1234`,
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps pre-tool relays active when native loop detection is not disabled", () => {
|
||||
const relay = registerNativeHookRelay({
|
||||
provider: "codex",
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
command: {
|
||||
executable: "/opt/Open Claw/openclaw.mjs",
|
||||
nodeExecutable: "/usr/local/bin/node",
|
||||
timeoutMs: 1234,
|
||||
},
|
||||
});
|
||||
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
|
||||
expect(relay.commandForEvent("pre_tool_use")).toBe(
|
||||
"/usr/local/bin/node '/opt/Open Claw/openclaw.mjs' hooks relay --provider codex --relay-id " +
|
||||
`${relay.relayId} --event pre_tool_use --timeout 1234`,
|
||||
);
|
||||
});
|
||||
|
||||
it("omits pre-tool relays when native loop detection is explicitly disabled", () => {
|
||||
const relay = registerNativeHookRelay({
|
||||
provider: "codex",
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
config: { tools: { loopDetection: { enabled: false } } } as never,
|
||||
});
|
||||
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
|
||||
});
|
||||
|
||||
it("builds relay commands only for native events with matching local hooks", () => {
|
||||
initializeGlobalHookRunner(
|
||||
createMockPluginRegistry([{ hookName: "after_tool_call", handler: vi.fn() }]),
|
||||
@@ -202,7 +148,7 @@ describe("native hook relay registry", () => {
|
||||
},
|
||||
});
|
||||
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(false);
|
||||
expect(relay.shouldRelayEvent("pre_tool_use")).toBe(true);
|
||||
expect(relay.shouldRelayEvent("post_tool_use")).toBe(true);
|
||||
expect(relay.shouldRelayEvent("before_agent_finalize")).toBe(false);
|
||||
expect(relay.commandForEvent("post_tool_use")).toBe(
|
||||
@@ -2338,19 +2284,4 @@ describe("native hook relay command builder", () => {
|
||||
"openclaw hooks relay --provider codex --relay-id relay-1 --event permission_request --timeout 5000",
|
||||
);
|
||||
});
|
||||
|
||||
it("can lower native hook relay process priority", () => {
|
||||
const prefix = process.platform === "win32" ? "" : "nice -n 10 ";
|
||||
expect(
|
||||
buildNativeHookRelayCommand({
|
||||
provider: "codex",
|
||||
relayId: "relay-1",
|
||||
event: "pre_tool_use",
|
||||
executable: "openclaw",
|
||||
nice: 10,
|
||||
}),
|
||||
).toBe(
|
||||
`${prefix}openclaw hooks relay --provider codex --relay-id relay-1 --event pre_tool_use --timeout 5000`,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -17,9 +17,8 @@ import { hasGlobalHooks } from "../../plugins/hook-runner-global.js";
|
||||
import { PluginApprovalResolutions } from "../../plugins/types.js";
|
||||
import { uniqueValues } from "../../shared/string-normalization.js";
|
||||
import { asBoolean } from "../../utils/boolean.js";
|
||||
import { hasBeforeToolCallPolicy, runBeforeToolCallHook } from "../pi-tools.before-tool-call.js";
|
||||
import { runBeforeToolCallHook } from "../pi-tools.before-tool-call.js";
|
||||
import { stableStringify } from "../stable-stringify.js";
|
||||
import { resolveToolLoopDetectionConfig } from "../tool-loop-detection-config.js";
|
||||
import { normalizeToolName } from "../tool-policy.js";
|
||||
import { callGatewayTool } from "../tools/gateway.js";
|
||||
import { runAgentHarnessAfterToolCallHook } from "./hook-helpers.js";
|
||||
@@ -111,7 +110,6 @@ export type RegisterNativeHookRelayParams = {
|
||||
|
||||
export type NativeHookRelayCommandOptions = {
|
||||
executable?: string;
|
||||
nice?: number | false;
|
||||
nodeExecutable?: string;
|
||||
timeoutMs?: number;
|
||||
};
|
||||
@@ -326,13 +324,12 @@ export function registerNativeHookRelay(
|
||||
registerNativeHookRelayBridge(registration);
|
||||
const handle: NativeHookRelayRegistrationHandle = {
|
||||
...registration,
|
||||
shouldRelayEvent: (event) => nativeHookRelayEventHasLocalWork(registration, event),
|
||||
shouldRelayEvent: nativeHookRelayEventHasLocalWork,
|
||||
commandForEvent: (event) =>
|
||||
buildNativeHookRelayCommand({
|
||||
provider: params.provider,
|
||||
relayId,
|
||||
event,
|
||||
nice: params.command?.nice,
|
||||
timeoutMs: params.command?.timeoutMs,
|
||||
executable: params.command?.executable,
|
||||
nodeExecutable: params.command?.nodeExecutable,
|
||||
@@ -379,24 +376,12 @@ function normalizeRelayId(value: string | undefined): string | undefined {
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
function resolveNativeHookRelayNicePrefix(value: number | false | undefined): string[] {
|
||||
if (process.platform === "win32" || value === false || value === undefined) {
|
||||
return [];
|
||||
}
|
||||
const nice = normalizePositiveInteger(value, 0);
|
||||
if (nice <= 0) {
|
||||
return [];
|
||||
}
|
||||
return ["nice", "-n", String(nice)];
|
||||
}
|
||||
|
||||
export function buildNativeHookRelayCommand(params: {
|
||||
provider: NativeHookRelayProvider;
|
||||
relayId: string;
|
||||
event: NativeHookRelayEvent;
|
||||
timeoutMs?: number;
|
||||
executable?: string;
|
||||
nice?: number | false;
|
||||
nodeExecutable?: string;
|
||||
}): string {
|
||||
const timeoutMs = normalizePositiveInteger(params.timeoutMs, DEFAULT_RELAY_TIMEOUT_MS);
|
||||
@@ -405,9 +390,7 @@ export function buildNativeHookRelayCommand(params: {
|
||||
executable === "openclaw"
|
||||
? ["openclaw"]
|
||||
: [params.nodeExecutable ?? process.execPath, executable];
|
||||
const nicePrefix = resolveNativeHookRelayNicePrefix(params.nice);
|
||||
return shellQuoteArgs([
|
||||
...nicePrefix,
|
||||
...argv,
|
||||
"hooks",
|
||||
"relay",
|
||||
@@ -422,25 +405,9 @@ export function buildNativeHookRelayCommand(params: {
|
||||
]);
|
||||
}
|
||||
|
||||
function nativePreToolUseMayRunLoopDetection(registration: NativeHookRelayRegistration): boolean {
|
||||
if (!registration.sessionKey) {
|
||||
return false;
|
||||
}
|
||||
const loopDetection = resolveToolLoopDetectionConfig({
|
||||
cfg: registration.config,
|
||||
agentId: registration.agentId,
|
||||
});
|
||||
return loopDetection?.enabled !== false;
|
||||
}
|
||||
|
||||
function nativeHookRelayEventHasLocalWork(
|
||||
registration: NativeHookRelayRegistration,
|
||||
event: NativeHookRelayEvent,
|
||||
): boolean {
|
||||
function nativeHookRelayEventHasLocalWork(event: NativeHookRelayEvent): boolean {
|
||||
if (event === "pre_tool_use") {
|
||||
// Avoid spawning a native hook relay for every Codex tool call when there
|
||||
// is no before_tool_call hook, trusted-tool policy, or loop detector work.
|
||||
return hasBeforeToolCallPolicy() || nativePreToolUseMayRunLoopDetection(registration);
|
||||
return true;
|
||||
}
|
||||
if (event === "post_tool_use") {
|
||||
return hasGlobalHooks("after_tool_call");
|
||||
|
||||
@@ -20,7 +20,7 @@ import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
|
||||
|
||||
const LIVE = isLiveTestEnabled();
|
||||
const REQUIRE_PROFILE_KEYS = isLiveProfileKeyModeEnabled();
|
||||
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.1-codex-mini";
|
||||
const DEFAULT_TARGET_MODEL_REF = "openai-codex/gpt-5.4-mini";
|
||||
const TARGET_MODEL_REF =
|
||||
process.env.OPENCLAW_LIVE_OPENAI_REASONING_COMPAT_MODEL?.trim() || DEFAULT_TARGET_MODEL_REF;
|
||||
const describeLive = LIVE ? describe : describe.skip;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -240,7 +240,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
|
||||
expect(coordinator.getRoutedCounts().block).toBe(0);
|
||||
});
|
||||
|
||||
it("waits for direct block dispatcher delivery before resolving block delivery", async () => {
|
||||
it("does not wait for direct block dispatcher delivery before resolving block delivery", async () => {
|
||||
const delivered: unknown[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
@@ -276,13 +276,68 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
await Promise.resolve();
|
||||
|
||||
expect(delivered).toEqual([{ text: "hello" }]);
|
||||
expect(deliverySettled).toBe(false);
|
||||
expect(deliverySettled).toBe(true);
|
||||
|
||||
releaseDelivery?.();
|
||||
await expect(deliveryPromise).resolves.toBe(true);
|
||||
expect(deliverySettled).toBe(true);
|
||||
await dispatcher.waitForIdle();
|
||||
});
|
||||
|
||||
it("waits for pending direct block delivery before resolving tool delivery", async () => {
|
||||
const delivered: unknown[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const coordinator = createAcpDispatchDeliveryCoordinator({
|
||||
cfg: createAcpTestConfig(),
|
||||
ctx: buildTestCtx({
|
||||
Provider: "visiblechat",
|
||||
Surface: "visiblechat",
|
||||
SessionKey: "agent:codex-acp:session-1",
|
||||
}),
|
||||
dispatcher,
|
||||
inboundAudio: false,
|
||||
shouldRouteToOriginating: false,
|
||||
});
|
||||
|
||||
await expect(coordinator.deliver("block", { text: "hello" }, { skipTts: true })).resolves.toBe(
|
||||
true,
|
||||
);
|
||||
await deliveryStarted;
|
||||
|
||||
let toolDeliverySettled = false;
|
||||
const toolDeliveryPromise = coordinator
|
||||
.deliver("tool", { text: "tool result" }, { skipTts: true })
|
||||
.then((result) => {
|
||||
toolDeliverySettled = true;
|
||||
return result;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(delivered).toEqual([{ text: "hello" }]);
|
||||
expect(toolDeliverySettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await expect(toolDeliveryPromise).resolves.toBe(true);
|
||||
expect(toolDeliverySettled).toBe(true);
|
||||
expect(delivered).toEqual([{ text: "hello" }, { text: "tool result" }]);
|
||||
});
|
||||
|
||||
it("stops waiting for direct block delivery when the ACP dispatch aborts", async () => {
|
||||
|
||||
@@ -234,11 +234,23 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
},
|
||||
toolMessageByCallId: new Map(),
|
||||
};
|
||||
let hasPendingDirectBlockReplyDelivery = false;
|
||||
const waitForPendingDirectBlockReplyDelivery = async () => {
|
||||
if (!hasPendingDirectBlockReplyDelivery) {
|
||||
return;
|
||||
}
|
||||
// ACP direct block replies should not block the common visible-reply path.
|
||||
// Defer the idle wait until a later tool delivery would otherwise overtake
|
||||
// that block reply in user-visible ordering.
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
|
||||
};
|
||||
const settleDirectVisibleText = async () => {
|
||||
if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) {
|
||||
return;
|
||||
}
|
||||
state.settledDirectVisibleText = true;
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await params.dispatcher.waitForIdle();
|
||||
const failedCounts = params.dispatcher.getFailedCounts();
|
||||
const failedVisibleCount = failedCounts.block + failedCounts.final;
|
||||
@@ -440,6 +452,10 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (kind === "tool") {
|
||||
await waitForPendingDirectBlockReplyDelivery();
|
||||
}
|
||||
|
||||
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
|
||||
channel: directChannel,
|
||||
kind,
|
||||
@@ -462,7 +478,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
state.failedVisibleTextDelivery = true;
|
||||
}
|
||||
if (kind === "block" && delivered) {
|
||||
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
|
||||
hasPendingDirectBlockReplyDelivery = true;
|
||||
}
|
||||
return delivered;
|
||||
};
|
||||
|
||||
@@ -4738,6 +4738,78 @@ describe("dispatchReplyFromConfig", () => {
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not run plugin-owned binding delivery when the caller already aborted", async () => {
|
||||
setNoAbort();
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
((hookName?: string) =>
|
||||
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
|
||||
);
|
||||
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
|
||||
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
|
||||
status: "handled",
|
||||
result: { handled: true, reply: { text: "should not send" } },
|
||||
});
|
||||
sessionBindingMocks.resolveByConversation.mockReturnValue({
|
||||
bindingId: "binding-aborted-1",
|
||||
targetSessionKey: "plugin-binding:codex:abc123",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1481858418548412579",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: 1710000000000,
|
||||
metadata: {
|
||||
pluginBindingOwner: "plugin",
|
||||
pluginId: "openclaw-codex-app-server",
|
||||
pluginRoot: "/workspace/openclaw-app-server",
|
||||
data: {
|
||||
kind: "codex-app-server-session",
|
||||
version: 1,
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/workspace/openclaw",
|
||||
},
|
||||
},
|
||||
} satisfies SessionBindingRecord);
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
const cfg = emptyConfig;
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
OriginatingChannel: "discord",
|
||||
OriginatingTo: "discord:channel:1481858418548412579",
|
||||
To: "discord:channel:1481858418548412579",
|
||||
AccountId: "default",
|
||||
SenderId: "user-9",
|
||||
SenderUsername: "ada",
|
||||
CommandAuthorized: true,
|
||||
WasMentioned: false,
|
||||
CommandBody: "who are you",
|
||||
RawBody: "who are you",
|
||||
Body: "who are you",
|
||||
MessageSid: "msg-claim-plugin-aborted-1",
|
||||
SessionKey: "agent:main:discord:channel:1481858418548412579",
|
||||
});
|
||||
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
|
||||
|
||||
const result = await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: { abortSignal: abortController.signal },
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
|
||||
expect(sessionBindingMocks.touch).not.toHaveBeenCalled();
|
||||
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("lets authorized plugin-owned binding commands fall through to command processing", async () => {
|
||||
setNoAbort();
|
||||
expect(
|
||||
@@ -5706,7 +5778,7 @@ describe("dispatchReplyFromConfig", () => {
|
||||
expect(callOrder).toEqual(["queued:The answer is 42", "dispatch:The answer is 42"]);
|
||||
});
|
||||
|
||||
it("waits for same-channel block dispatcher delivery before resolving block replies", async () => {
|
||||
it("does not wait for same-channel block dispatcher delivery before resolving block replies", async () => {
|
||||
setNoAbort();
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
@@ -5739,10 +5811,10 @@ describe("dispatchReplyFromConfig", () => {
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before tool" }]);
|
||||
expect(blockReplySettled).toBe(false);
|
||||
await blockReplyPromise;
|
||||
expect(blockReplySettled).toBe(true);
|
||||
|
||||
releaseDelivery?.();
|
||||
await blockReplyPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
@@ -5754,6 +5826,177 @@ describe("dispatchReplyFromConfig", () => {
|
||||
});
|
||||
|
||||
expect(blockReplySettled).toBe(true);
|
||||
await dispatcher.waitForIdle();
|
||||
});
|
||||
|
||||
it("waits for pending same-channel block delivery before completing block-only dispatch", async () => {
|
||||
setNoAbort();
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "only block" });
|
||||
return undefined;
|
||||
};
|
||||
|
||||
let dispatchSettled = false;
|
||||
const dispatchPromise = dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
}).then((result) => {
|
||||
dispatchSettled = true;
|
||||
return result;
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "only block" }]);
|
||||
expect(dispatchSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await dispatchPromise;
|
||||
|
||||
expect(dispatchSettled).toBe(true);
|
||||
});
|
||||
|
||||
it("waits for pending same-channel block delivery before forwarding tool progress", async () => {
|
||||
setNoAbort();
|
||||
const cfg = {
|
||||
agents: { defaults: { verboseDefault: "on" } },
|
||||
} as const satisfies OpenClawConfig;
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const onToolStart = vi.fn();
|
||||
let toolProgressSettled = false;
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "before tool" });
|
||||
const toolProgressPromise = Promise.resolve(opts?.onToolStart?.({ name: "lookup" })).then(
|
||||
() => {
|
||||
toolProgressSettled = true;
|
||||
},
|
||||
);
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before tool" }]);
|
||||
expect(onToolStart).not.toHaveBeenCalled();
|
||||
expect(toolProgressSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await toolProgressPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
replyOptions: { onToolStart },
|
||||
});
|
||||
|
||||
expect(toolProgressSettled).toBe(true);
|
||||
expect(onToolStart).toHaveBeenCalledWith({ name: "lookup" });
|
||||
});
|
||||
|
||||
it("does not synthesize tool-start capability while ordering item progress", async () => {
|
||||
setNoAbort();
|
||||
const cfg = {
|
||||
agents: { defaults: { verboseDefault: "on" } },
|
||||
} as const satisfies OpenClawConfig;
|
||||
const ctx = buildTestCtx({ Provider: "whatsapp" });
|
||||
const delivered: ReplyPayload[] = [];
|
||||
let releaseDelivery: (() => void) | undefined;
|
||||
let markDeliveryStarted: (() => void) | undefined;
|
||||
const deliveryStarted = new Promise<void>((resolve) => {
|
||||
markDeliveryStarted = resolve;
|
||||
});
|
||||
const deliveryGate = new Promise<void>((resolve) => {
|
||||
releaseDelivery = resolve;
|
||||
});
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
delivered.push(payload);
|
||||
markDeliveryStarted?.();
|
||||
await deliveryGate;
|
||||
},
|
||||
});
|
||||
const onItemEvent = vi.fn();
|
||||
let itemProgressSettled = false;
|
||||
const replyResolver = async (
|
||||
_ctx: MsgContext,
|
||||
opts?: GetReplyOptions,
|
||||
): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "before item" });
|
||||
expect(opts?.onToolStart).toBeUndefined();
|
||||
const itemProgressPromise = Promise.resolve(
|
||||
opts?.onItemEvent?.({ itemId: "1", kind: "tool", progressText: "running" }),
|
||||
).then(() => {
|
||||
itemProgressSettled = true;
|
||||
});
|
||||
|
||||
await deliveryStarted;
|
||||
|
||||
expect(delivered).toEqual([{ text: "before item" }]);
|
||||
expect(onItemEvent).not.toHaveBeenCalled();
|
||||
expect(itemProgressSettled).toBe(false);
|
||||
|
||||
releaseDelivery?.();
|
||||
await itemProgressPromise;
|
||||
return undefined;
|
||||
};
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
replyOptions: { onItemEvent },
|
||||
});
|
||||
|
||||
expect(itemProgressSettled).toBe(true);
|
||||
expect(onItemEvent).toHaveBeenCalledWith({
|
||||
itemId: "1",
|
||||
kind: "tool",
|
||||
progressText: "running",
|
||||
});
|
||||
});
|
||||
|
||||
it("forwards payload metadata into onBlockReplyQueued context", async () => {
|
||||
|
||||
@@ -61,6 +61,7 @@ import {
|
||||
markDiagnosticSessionProgress,
|
||||
} from "../../logging/diagnostic.js";
|
||||
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { matchPluginCommand } from "../../plugins/commands.js";
|
||||
import {
|
||||
buildPluginBindingDeclinedText,
|
||||
@@ -131,6 +132,7 @@ import { resolveOriginMessageProvider } from "./origin-routing.js";
|
||||
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
|
||||
import type { ReplyDispatcher } from "./reply-dispatcher.types.js";
|
||||
import { replyRunRegistry, type ReplyOperation } from "./reply-run-registry.js";
|
||||
import { isReplyProfilerEnabled } from "./reply-timing-tracker.js";
|
||||
import { admitReplyTurn, resolveReplyTurnKind } from "./reply-turn-admission.js";
|
||||
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
|
||||
import { resolveReplyRoutingDecision } from "./routing-policy.js";
|
||||
@@ -783,6 +785,102 @@ function createAbortAwareDispatcher(params: {
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
type ReplyHotPathTimingSpan = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type ReplyHotPathTimingSummary = {
|
||||
totalMs: number;
|
||||
spans: ReplyHotPathTimingSpan[];
|
||||
};
|
||||
|
||||
const replyHotPathTimingLog = createSubsystemLogger("auto-reply/reply-timing");
|
||||
const REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS = 1_000;
|
||||
const REPLY_HOT_PATH_TIMING_WARN_STAGE_MS = 500;
|
||||
|
||||
function createReplyHotPathTimingTracker(options: { profilerEnabled?: boolean } = {}): {
|
||||
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
|
||||
logIfSlow: (params: {
|
||||
channel: string;
|
||||
messageId?: number | string;
|
||||
sessionKey?: string;
|
||||
outcome: "completed" | "skipped" | "error";
|
||||
reason?: string;
|
||||
}) => void;
|
||||
} {
|
||||
if (!options.profilerEnabled) {
|
||||
// This slow-path splitter was added for latency investigation. Keep it
|
||||
// inert in normal production dispatches so only explicit profiler runs pay
|
||||
// the Date.now/span allocation cost.
|
||||
return {
|
||||
async measure(_name, run) {
|
||||
return await run();
|
||||
},
|
||||
logIfSlow() {},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
let didLog = false;
|
||||
const spans: ReplyHotPathTimingSpan[] = [];
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
const snapshot = (): ReplyHotPathTimingSummary => ({
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
spans: spans.slice(),
|
||||
});
|
||||
const shouldLog = (summary: ReplyHotPathTimingSummary) =>
|
||||
summary.totalMs >= REPLY_HOT_PATH_TIMING_WARN_TOTAL_MS ||
|
||||
summary.spans.some((span) => span.durationMs >= REPLY_HOT_PATH_TIMING_WARN_STAGE_MS);
|
||||
const formatSpans = (summary: ReplyHotPathTimingSummary) =>
|
||||
summary.spans.length > 0
|
||||
? summary.spans
|
||||
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
return {
|
||||
async measure(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
spans.push({
|
||||
name,
|
||||
durationMs: toMs(Date.now() - spanStartedAt),
|
||||
elapsedMs: toMs(Date.now() - startedAt),
|
||||
});
|
||||
}
|
||||
},
|
||||
logIfSlow(params) {
|
||||
if (didLog) {
|
||||
return;
|
||||
}
|
||||
const summary = snapshot();
|
||||
if (!shouldLog(summary)) {
|
||||
return;
|
||||
}
|
||||
didLog = true;
|
||||
replyHotPathTimingLog.warn(
|
||||
`reply hot path timings channel=${params.channel} messageId=${
|
||||
params.messageId ?? "unknown"
|
||||
} sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} totalMs=${
|
||||
summary.totalMs
|
||||
} stages=${formatSpans(summary)}${params.reason ? ` reason=${params.reason}` : ""}`,
|
||||
{
|
||||
channel: params.channel,
|
||||
messageId: params.messageId,
|
||||
sessionKey: params.sessionKey,
|
||||
outcome: params.outcome,
|
||||
reason: params.reason,
|
||||
totalMs: summary.totalMs,
|
||||
spans: summary.spans,
|
||||
},
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type {
|
||||
DispatchFromConfigParams,
|
||||
DispatchFromConfigResult,
|
||||
@@ -822,12 +920,17 @@ export async function dispatchReplyFromConfig(
|
||||
hasSessionKey: Boolean(sessionKey),
|
||||
hasRunId: typeof params.replyOptions?.runId === "string",
|
||||
};
|
||||
const replyHotPathTiming = createReplyHotPathTimingTracker({
|
||||
profilerEnabled: isReplyProfilerEnabled({ config: cfg }),
|
||||
});
|
||||
const traceReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
});
|
||||
replyHotPathTiming.measure(name, () =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
}),
|
||||
);
|
||||
let agentDispatchStartedAt = 0;
|
||||
|
||||
const recordProcessed = (
|
||||
@@ -837,6 +940,15 @@ export async function dispatchReplyFromConfig(
|
||||
error?: string;
|
||||
},
|
||||
) => {
|
||||
if (diagnosticsEnabled) {
|
||||
replyHotPathTiming.logIfSlow({
|
||||
channel,
|
||||
messageId,
|
||||
sessionKey,
|
||||
outcome,
|
||||
reason: opts?.reason,
|
||||
});
|
||||
}
|
||||
messageLifecycle.markProcessed(outcome, opts);
|
||||
};
|
||||
|
||||
@@ -1432,6 +1544,16 @@ export async function dispatchReplyFromConfig(
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
};
|
||||
const finishReplyOperationAbortedDispatch = (): DispatchFromConfigResult => {
|
||||
commitInboundDedupeIfClaimed();
|
||||
recordProcessed("completed", { reason: "reply_operation_aborted" });
|
||||
markIdle("message_completed");
|
||||
completeDispatchReplyOperation();
|
||||
return attachSourceReplyDeliveryMode({
|
||||
queuedFinal: false,
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
};
|
||||
|
||||
let pluginFallbackReason:
|
||||
| "plugin-bound-fallback-missing-plugin"
|
||||
@@ -1439,6 +1561,9 @@ export async function dispatchReplyFromConfig(
|
||||
| undefined;
|
||||
|
||||
if (pluginOwnedBinding) {
|
||||
if (isPreDispatchOperationAborted()) {
|
||||
return finishReplyOperationAbortedDispatch();
|
||||
}
|
||||
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
|
||||
if (shouldBypassPluginOwnedBindingForCommand(ctx)) {
|
||||
logVerbose(
|
||||
@@ -2012,6 +2137,17 @@ export async function dispatchReplyFromConfig(
|
||||
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
|
||||
const allowSuppressedSourceProgressCallbacks =
|
||||
params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true;
|
||||
let hasPendingDirectBlockReplyDelivery = false;
|
||||
const waitForPendingDirectBlockReplyDelivery = async (abortSignal?: AbortSignal) => {
|
||||
if (!hasPendingDirectBlockReplyDelivery) {
|
||||
return;
|
||||
}
|
||||
// Direct block replies are queued asynchronously so lightweight replies do
|
||||
// not wait for dispatcher idle. Flush only before later tool/progress
|
||||
// callbacks and final completion where external ordering is visible.
|
||||
hasPendingDirectBlockReplyDelivery = false;
|
||||
await waitForReplyDispatcherIdle(dispatcher, abortSignal);
|
||||
};
|
||||
const shouldForwardProgressCallback = (options?: {
|
||||
forwardWhenSourceDeliverySuppressed?: boolean;
|
||||
requiresToolSummaryVisibility?: boolean;
|
||||
@@ -2031,9 +2167,10 @@ export async function dispatchReplyFromConfig(
|
||||
forwardWhenSourceDeliverySuppressed?: boolean;
|
||||
requiresToolSummaryVisibility?: boolean;
|
||||
onForward?: (...args: Args) => void;
|
||||
waitForDirectBlockReplyDelivery?: boolean;
|
||||
},
|
||||
): ((...args: Args) => Promise<void>) | undefined => {
|
||||
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
|
||||
if (!callback) {
|
||||
return undefined;
|
||||
}
|
||||
return async (...args: Args) => {
|
||||
@@ -2041,6 +2178,12 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
if (options?.waitForDirectBlockReplyDelivery) {
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (shouldForwardProgressCallback(options)) {
|
||||
options?.onForward?.(...args);
|
||||
await callback?.(...args);
|
||||
@@ -2077,10 +2220,12 @@ export async function dispatchReplyFromConfig(
|
||||
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
onForward: (payload) => {
|
||||
if (hasFailedProgressStatus(payload)) {
|
||||
markVisibleToolErrorProgress();
|
||||
@@ -2090,6 +2235,7 @@ export async function dispatchReplyFromConfig(
|
||||
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
onForward: (payload) => {
|
||||
if (hasFailedProgressStatus(payload)) {
|
||||
markVisibleToolErrorProgress();
|
||||
@@ -2099,14 +2245,20 @@ export async function dispatchReplyFromConfig(
|
||||
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, {
|
||||
forwardWhenSourceDeliverySuppressed: true,
|
||||
requiresToolSummaryVisibility: true,
|
||||
waitForDirectBlockReplyDelivery: true,
|
||||
}),
|
||||
onToolResult: (payload: ReplyPayload) => {
|
||||
markProgress();
|
||||
const run = async () => {
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
@@ -2171,6 +2323,10 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2193,6 +2349,10 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2223,6 +2383,10 @@ export async function dispatchReplyFromConfig(
|
||||
return;
|
||||
}
|
||||
markProgress();
|
||||
await waitForPendingDirectBlockReplyDelivery(dispatchAbortOperation?.abortSignal);
|
||||
if (isDispatchOperationAborted()) {
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
if (
|
||||
shouldForwardProgressCallback({
|
||||
@@ -2329,7 +2493,7 @@ export async function dispatchReplyFromConfig(
|
||||
markInboundDedupeReplayUnsafe();
|
||||
const delivered = dispatcher.sendBlockReply(normalizedPayload);
|
||||
if (delivered) {
|
||||
await waitForReplyDispatcherIdle(dispatcher, context?.abortSignal);
|
||||
hasPendingDirectBlockReplyDelivery = true;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -2461,6 +2625,7 @@ export async function dispatchReplyFromConfig(
|
||||
accumulatedBlockTtsText.trim()
|
||||
) {
|
||||
try {
|
||||
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
|
||||
throwIfDispatchOperationAborted();
|
||||
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
|
||||
payload: { text: accumulatedBlockTtsText },
|
||||
@@ -2520,6 +2685,7 @@ export async function dispatchReplyFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
await waitForPendingDirectBlockReplyDelivery(getDispatchAbortSignal());
|
||||
const counts = dispatcher.getQueuedCounts();
|
||||
counts.final += routedFinalCount;
|
||||
commitInboundDedupeIfClaimed();
|
||||
@@ -2537,14 +2703,7 @@ export async function dispatchReplyFromConfig(
|
||||
});
|
||||
} catch (err) {
|
||||
if (isDispatchReplyOperationAbortedError(err)) {
|
||||
commitInboundDedupeIfClaimed();
|
||||
recordProcessed("completed", { reason: "reply_operation_aborted" });
|
||||
markIdle("message_completed");
|
||||
completeDispatchReplyOperation();
|
||||
return attachSourceReplyDeliveryMode({
|
||||
queuedFinal: false,
|
||||
counts: dispatcher.getQueuedCounts(),
|
||||
});
|
||||
return finishReplyOperationAbortedDispatch();
|
||||
}
|
||||
if (inboundDedupeClaim.status === "claimed") {
|
||||
if (inboundDedupeReplayUnsafe) {
|
||||
|
||||
@@ -15,6 +15,7 @@ import { type OpenClawConfig, getRuntimeConfig } from "../../config/config.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { measureDiagnosticsTimelineSpan } from "../../infra/diagnostics-timeline.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-context.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
@@ -47,6 +48,7 @@ import { hasInboundMedia } from "./inbound-media.js";
|
||||
import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js";
|
||||
import { createFastTestModelSelectionState, createModelSelectionState } from "./model-selection.js";
|
||||
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
|
||||
import { createReplyTimingTracker } from "./reply-timing-tracker.js";
|
||||
import { initSessionState } from "./session.js";
|
||||
import {
|
||||
isStaleHeartbeatAutoFallbackOverride,
|
||||
@@ -89,6 +91,8 @@ const mediaUnderstandingApplyRuntimeLoader = createLazyImportLoader(
|
||||
const linkUnderstandingApplyRuntimeLoader = createLazyImportLoader(
|
||||
() => import("../../link-understanding/apply.runtime.js"),
|
||||
);
|
||||
|
||||
const replyResolverTimingLog = createSubsystemLogger("auto-reply/reply-resolver-timing");
|
||||
const commandsCoreRuntimeLoader = createLazyImportLoader(
|
||||
() => import("./commands-core.runtime.js"),
|
||||
);
|
||||
@@ -214,45 +218,85 @@ export async function getReplyFromConfig(
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
});
|
||||
const useFastTestBootstrap = shouldUseReplyFastTestBootstrap({
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
});
|
||||
const useFastTestRuntime = shouldUseReplyFastTestRuntime({
|
||||
cfg,
|
||||
isFastTestEnv,
|
||||
});
|
||||
const finalized = finalizeInboundContext(ctx);
|
||||
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
|
||||
const agentSessionKey = targetSessionKey || finalized.SessionKey;
|
||||
const traceAttributes = {
|
||||
// Profiler spans stay inert unless diagnostics enable `profiler` or
|
||||
// `reply.profiler`, so normal replies do not pay per-stage Date.now/array
|
||||
// bookkeeping while we can still split resolver costs on demand.
|
||||
const resolverTiming = createReplyTimingTracker({ log: replyResolverTimingLog, config: cfg });
|
||||
const useFastTestBootstrap = resolverTiming.measureSync("reply.resolve_fast_test_bootstrap", () =>
|
||||
shouldUseReplyFastTestBootstrap({
|
||||
isFastTestEnv,
|
||||
configOverride,
|
||||
}),
|
||||
);
|
||||
const useFastTestRuntime = resolverTiming.measureSync("reply.resolve_fast_test_runtime", () =>
|
||||
shouldUseReplyFastTestRuntime({
|
||||
cfg,
|
||||
isFastTestEnv,
|
||||
}),
|
||||
);
|
||||
const finalized = resolverTiming.measureSync("reply.finalize_context", () =>
|
||||
finalizeInboundContext(ctx),
|
||||
);
|
||||
const { agentSessionKey, agentId } = resolverTiming.measureSync(
|
||||
"reply.resolve_agent_scope",
|
||||
() => {
|
||||
const targetSessionKey = resolveCommandTurnTargetSessionKey(finalized);
|
||||
const resolvedAgentSessionKey = targetSessionKey || finalized.SessionKey;
|
||||
return {
|
||||
agentSessionKey: resolvedAgentSessionKey,
|
||||
agentId: resolveSessionAgentId({
|
||||
sessionKey: resolvedAgentSessionKey,
|
||||
config: cfg,
|
||||
}),
|
||||
};
|
||||
},
|
||||
);
|
||||
const traceAttributes = resolverTiming.measureSync("reply.resolve_trace_context", () => ({
|
||||
surface: normalizeOptionalString(finalized.Surface ?? finalized.Provider) ?? "unknown",
|
||||
hasSessionKey: Boolean(agentSessionKey),
|
||||
isHeartbeat: opts?.isHeartbeat === true,
|
||||
hasMedia: hasInboundMedia(finalized),
|
||||
};
|
||||
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
}));
|
||||
const messageId = finalized.MessageSid ?? finalized.MessageSidFirst ?? finalized.MessageSidLast;
|
||||
let resolverTimingSessionKey = agentSessionKey;
|
||||
const logResolverTiming = (outcome: string, reason?: string, error?: string) =>
|
||||
resolverTiming.logIfSlow({
|
||||
message: `reply resolver timings surface=${traceAttributes.surface} messageId=${
|
||||
messageId ?? "unknown"
|
||||
} sessionKey=${resolverTimingSessionKey ?? "unknown"} agentId=${agentId}`,
|
||||
outcome,
|
||||
reason,
|
||||
error,
|
||||
details: {
|
||||
surface: traceAttributes.surface,
|
||||
messageId,
|
||||
sessionKey: resolverTimingSessionKey,
|
||||
agentId,
|
||||
},
|
||||
});
|
||||
const agentId = resolveSessionAgentId({
|
||||
sessionKey: agentSessionKey,
|
||||
config: cfg,
|
||||
});
|
||||
const mergedSkillFilter = mergeSkillFilters(
|
||||
opts?.skillFilter,
|
||||
resolveAgentSkillsFilter(cfg, agentId),
|
||||
const traceGetReplyPhase = <T>(name: string, run: () => Promise<T> | T): Promise<T> =>
|
||||
resolverTiming.measure(name, () =>
|
||||
measureDiagnosticsTimelineSpan(name, run, {
|
||||
phase: "agent-turn",
|
||||
config: cfg,
|
||||
attributes: traceAttributes,
|
||||
}),
|
||||
);
|
||||
const mergedSkillFilter = resolverTiming.measureSync("reply.resolve_skill_filter", () =>
|
||||
mergeSkillFilters(opts?.skillFilter, resolveAgentSkillsFilter(cfg, agentId)),
|
||||
);
|
||||
const resolvedOpts =
|
||||
mergedSkillFilter !== undefined ? { ...opts, skillFilter: mergedSkillFilter } : opts;
|
||||
const agentCfg = cfg.agents?.defaults;
|
||||
const sessionCfg = cfg.session;
|
||||
const { defaultProvider, defaultModel, aliasIndex } = resolveDefaultModel({
|
||||
cfg,
|
||||
agentId,
|
||||
});
|
||||
const { defaultProvider, defaultModel, aliasIndex } = resolverTiming.measureSync(
|
||||
"reply.resolve_default_model",
|
||||
() =>
|
||||
resolveDefaultModel({
|
||||
cfg,
|
||||
agentId,
|
||||
}),
|
||||
);
|
||||
let provider = defaultProvider;
|
||||
let model = defaultModel;
|
||||
let hasResolvedHeartbeatModelOverride = false;
|
||||
@@ -277,22 +321,34 @@ export async function getReplyFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
|
||||
const workspaceDirForNativeCommand = workspaceDirRaw;
|
||||
const agentDir = resolveAgentDir(cfg, agentId);
|
||||
const timeoutMs = resolveAgentTimeoutMs({ cfg, overrideSeconds: opts?.timeoutOverrideSeconds });
|
||||
const configuredTypingSeconds =
|
||||
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
|
||||
const typingIntervalSeconds =
|
||||
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
|
||||
const typing = createTypingController({
|
||||
onReplyStart: opts?.onReplyStart,
|
||||
onCleanup: opts?.onTypingCleanup,
|
||||
typingIntervalSeconds,
|
||||
silentToken: SILENT_REPLY_TOKEN,
|
||||
log: defaultRuntime.log,
|
||||
const { workspaceDirRaw, workspaceDirForNativeCommand, agentDir, timeoutMs } =
|
||||
resolverTiming.measureSync("reply.resolve_workspace_agent_dir", () => {
|
||||
const workspaceDirRaw = resolveAgentWorkspaceDir(cfg, agentId) ?? DEFAULT_AGENT_WORKSPACE_DIR;
|
||||
return {
|
||||
workspaceDirRaw,
|
||||
workspaceDirForNativeCommand: workspaceDirRaw,
|
||||
agentDir: resolveAgentDir(cfg, agentId),
|
||||
timeoutMs: resolveAgentTimeoutMs({
|
||||
cfg,
|
||||
overrideSeconds: opts?.timeoutOverrideSeconds,
|
||||
}),
|
||||
};
|
||||
});
|
||||
const typing = resolverTiming.measureSync("reply.create_typing_controller", () => {
|
||||
const configuredTypingSeconds =
|
||||
agentCfg?.typingIntervalSeconds ?? sessionCfg?.typingIntervalSeconds;
|
||||
const typingIntervalSeconds =
|
||||
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
|
||||
const controller = createTypingController({
|
||||
onReplyStart: opts?.onReplyStart,
|
||||
onCleanup: opts?.onTypingCleanup,
|
||||
typingIntervalSeconds,
|
||||
silentToken: SILENT_REPLY_TOKEN,
|
||||
log: defaultRuntime.log,
|
||||
});
|
||||
opts?.onTypingController?.(controller);
|
||||
return controller;
|
||||
});
|
||||
opts?.onTypingController?.(typing);
|
||||
|
||||
const nativeSlashCommandFastReply = await traceGetReplyPhase(
|
||||
"reply.native_slash_command_fast_path",
|
||||
@@ -316,6 +372,7 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
);
|
||||
if (nativeSlashCommandFastReply.handled) {
|
||||
logResolverTiming("completed", "native_slash_command_fast_path");
|
||||
return nativeSlashCommandFastReply.reply;
|
||||
}
|
||||
|
||||
@@ -390,6 +447,7 @@ export async function getReplyFromConfig(
|
||||
triggerBodyNormalized,
|
||||
bodyStripped,
|
||||
} = sessionState;
|
||||
resolverTimingSessionKey = sessionKey ?? resolverTimingSessionKey;
|
||||
|
||||
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
|
||||
const text = sanitizePendingFinalDeliveryText(sessionEntry.pendingFinalDeliveryText);
|
||||
@@ -455,6 +513,7 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
});
|
||||
}
|
||||
logResolverTiming("completed", "pending_final_delivery_replay");
|
||||
return { text: replayText };
|
||||
}
|
||||
}
|
||||
@@ -579,7 +638,8 @@ export async function getReplyFromConfig(
|
||||
triggerBodyNormalized,
|
||||
commandAuthorized,
|
||||
});
|
||||
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
logResolverTiming("milestone", "before_fast_directive_prepared_reply");
|
||||
const fastReplyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
runPreparedReply({
|
||||
ctx,
|
||||
sessionCtx,
|
||||
@@ -636,6 +696,8 @@ export async function getReplyFromConfig(
|
||||
autoFallbackPrimaryProbe,
|
||||
}),
|
||||
);
|
||||
logResolverTiming("completed", "fast_directive_prepared_reply");
|
||||
return fastReplyResult;
|
||||
}
|
||||
|
||||
const directiveResult = await traceGetReplyPhase("reply.resolve_directives", () =>
|
||||
@@ -671,6 +733,7 @@ export async function getReplyFromConfig(
|
||||
}),
|
||||
);
|
||||
if (directiveResult.kind === "reply") {
|
||||
logResolverTiming("completed", "directive_reply");
|
||||
return directiveResult.reply;
|
||||
}
|
||||
|
||||
@@ -771,6 +834,7 @@ export async function getReplyFromConfig(
|
||||
);
|
||||
if (inlineActionResult.kind === "reply") {
|
||||
await maybeEmitMissingResetHooks();
|
||||
logResolverTiming("completed", "inline_action_reply");
|
||||
return inlineActionResult.reply;
|
||||
}
|
||||
await maybeEmitMissingResetHooks();
|
||||
@@ -862,6 +926,7 @@ export async function getReplyFromConfig(
|
||||
),
|
||||
);
|
||||
if (hookResult?.handled) {
|
||||
logResolverTiming("completed", "before_agent_reply_hook");
|
||||
return hookResult.reply ?? { text: SILENT_REPLY_TOKEN };
|
||||
}
|
||||
}
|
||||
@@ -884,7 +949,8 @@ export async function getReplyFromConfig(
|
||||
);
|
||||
}
|
||||
|
||||
return await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
logResolverTiming("milestone", "before_run_prepared_reply");
|
||||
const replyResult = await traceGetReplyPhase("reply.run_prepared_reply", () =>
|
||||
runPreparedReply({
|
||||
ctx,
|
||||
sessionCtx,
|
||||
@@ -931,4 +997,6 @@ export async function getReplyFromConfig(
|
||||
autoFallbackPrimaryProbe: runAutoFallbackPrimaryProbe,
|
||||
}),
|
||||
);
|
||||
logResolverTiming("completed", "prepared_reply");
|
||||
return replyResult;
|
||||
}
|
||||
|
||||
48
src/auto-reply/reply/reply-timing-tracker.test.ts
Normal file
48
src/auto-reply/reply/reply-timing-tracker.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { createReplyTimingTracker, isReplyProfilerEnabled } from "./reply-timing-tracker.js";
|
||||
|
||||
describe("isReplyProfilerEnabled", () => {
|
||||
it("matches global and reply profiler diagnostic flags", () => {
|
||||
const cfg = { diagnostics: { flags: ["reply.profiler"] } } as OpenClawConfig;
|
||||
expect(isReplyProfilerEnabled({ config: cfg, env: {} as NodeJS.ProcessEnv })).toBe(true);
|
||||
expect(
|
||||
isReplyProfilerEnabled({
|
||||
env: { OPENCLAW_DIAGNOSTICS: "profiler" } as NodeJS.ProcessEnv,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createReplyTimingTracker", () => {
|
||||
it("is a pass-through tracker unless the profiler flag is enabled", async () => {
|
||||
const warn = vi.fn();
|
||||
const tracker = createReplyTimingTracker({ log: { warn } });
|
||||
|
||||
expect(tracker.measureSync("sync", () => 42)).toBe(42);
|
||||
await expect(tracker.measure("async", async () => "ok")).resolves.toBe("ok");
|
||||
tracker.logIfSlow({ message: "reply timings" });
|
||||
|
||||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("records and logs spans when the profiler flag is enabled", () => {
|
||||
const warn = vi.fn();
|
||||
const tracker = createReplyTimingTracker({
|
||||
log: { warn },
|
||||
env: { OPENCLAW_DIAGNOSTICS: "reply.profiler" } as NodeJS.ProcessEnv,
|
||||
totalWarnMs: 0,
|
||||
stageWarnMs: 0,
|
||||
});
|
||||
|
||||
expect(tracker.measureSync("sync", () => 7)).toBe(7);
|
||||
tracker.logIfSlow({ message: "reply timings", outcome: "completed" });
|
||||
|
||||
expect(warn).toHaveBeenCalledOnce();
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("stages=sync:");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
outcome: "completed",
|
||||
spans: [expect.objectContaining({ name: "sync" })],
|
||||
});
|
||||
});
|
||||
});
|
||||
141
src/auto-reply/reply/reply-timing-tracker.ts
Normal file
141
src/auto-reply/reply/reply-timing-tracker.ts
Normal file
@@ -0,0 +1,141 @@
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { isDiagnosticFlagEnabled } from "../../infra/diagnostic-flags.js";
|
||||
|
||||
type ReplyTimingSpan = {
|
||||
name: string;
|
||||
durationMs: number;
|
||||
elapsedMs: number;
|
||||
};
|
||||
|
||||
type ReplyTimingSummary = {
|
||||
totalMs: number;
|
||||
spans: ReplyTimingSpan[];
|
||||
};
|
||||
|
||||
type ReplyTimingLogger = {
|
||||
warn: (message: string, details?: Record<string, unknown>) => void;
|
||||
};
|
||||
|
||||
type ReplyTimingTracker = {
|
||||
measure: <T>(name: string, run: () => Promise<T> | T) => Promise<T>;
|
||||
measureSync: <T>(name: string, run: () => T) => T;
|
||||
logIfSlow: (params: {
|
||||
message: string;
|
||||
outcome?: string;
|
||||
reason?: string;
|
||||
error?: string;
|
||||
details?: Record<string, unknown>;
|
||||
}) => void;
|
||||
};
|
||||
|
||||
const DEFAULT_TIMING_WARN_TOTAL_MS = 1_000;
|
||||
const DEFAULT_TIMING_WARN_STAGE_MS = 500;
|
||||
|
||||
export function isReplyProfilerEnabled(params?: {
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): boolean {
|
||||
const cfg = params?.config;
|
||||
const env = params?.env ?? process.env;
|
||||
return (
|
||||
isDiagnosticFlagEnabled("profiler", cfg, env) ||
|
||||
isDiagnosticFlagEnabled("reply.profiler", cfg, env)
|
||||
);
|
||||
}
|
||||
|
||||
export function createReplyTimingTracker(params: {
|
||||
log: ReplyTimingLogger;
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
enabled?: boolean;
|
||||
totalWarnMs?: number;
|
||||
stageWarnMs?: number;
|
||||
}): ReplyTimingTracker {
|
||||
const enabled =
|
||||
params.enabled ?? isReplyProfilerEnabled({ config: params.config, env: params.env });
|
||||
if (!enabled) {
|
||||
// Normal production turns use pass-through wrappers so added profiling
|
||||
// calls do not allocate spans or call Date.now on the hot reply path.
|
||||
return {
|
||||
async measure(_name, run) {
|
||||
return await run();
|
||||
},
|
||||
measureSync(_name, run) {
|
||||
return run();
|
||||
},
|
||||
logIfSlow() {},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
const spans: ReplyTimingSpan[] = [];
|
||||
let didLog = false;
|
||||
const totalWarnMs = params.totalWarnMs ?? DEFAULT_TIMING_WARN_TOTAL_MS;
|
||||
const stageWarnMs = params.stageWarnMs ?? DEFAULT_TIMING_WARN_STAGE_MS;
|
||||
const toMs = (value: number) => Math.max(0, Math.round(value));
|
||||
const record = (name: string, spanStartedAt: number) => {
|
||||
spans.push({
|
||||
name,
|
||||
durationMs: toMs(Date.now() - spanStartedAt),
|
||||
elapsedMs: toMs(Date.now() - startedAt),
|
||||
});
|
||||
};
|
||||
const snapshot = (): ReplyTimingSummary => ({
|
||||
totalMs: toMs(Date.now() - startedAt),
|
||||
spans: spans.slice(),
|
||||
});
|
||||
const shouldLog = (summary: ReplyTimingSummary) =>
|
||||
summary.totalMs >= totalWarnMs || summary.spans.some((span) => span.durationMs >= stageWarnMs);
|
||||
const formatSpans = (summary: ReplyTimingSummary) =>
|
||||
summary.spans.length > 0
|
||||
? summary.spans
|
||||
.map((span) => `${span.name}:${span.durationMs}ms@${span.elapsedMs}ms`)
|
||||
.join(",")
|
||||
: "none";
|
||||
|
||||
return {
|
||||
async measure(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
measureSync(name, run) {
|
||||
const spanStartedAt = Date.now();
|
||||
try {
|
||||
return run();
|
||||
} finally {
|
||||
record(name, spanStartedAt);
|
||||
}
|
||||
},
|
||||
logIfSlow(logParams) {
|
||||
if (didLog) {
|
||||
return;
|
||||
}
|
||||
const summary = snapshot();
|
||||
if (!shouldLog(summary)) {
|
||||
return;
|
||||
}
|
||||
didLog = true;
|
||||
const suffix = [
|
||||
`totalMs=${summary.totalMs}`,
|
||||
`stages=${formatSpans(summary)}`,
|
||||
logParams.outcome ? `outcome=${logParams.outcome}` : undefined,
|
||||
logParams.reason ? `reason=${logParams.reason}` : undefined,
|
||||
logParams.error ? `error="${logParams.error}"` : undefined,
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join(" ");
|
||||
params.log.warn(`${logParams.message} ${suffix}`, {
|
||||
...logParams.details,
|
||||
outcome: logParams.outcome,
|
||||
reason: logParams.reason,
|
||||
error: logParams.error,
|
||||
totalMs: summary.totalMs,
|
||||
spans: summary.spans,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -133,6 +133,24 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("refreshes automatic health snapshots without live channel probes", async () => {
|
||||
vi.useFakeTimers();
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
deps.refreshGatewayHealthSnapshot = vi.fn(async () => ({ ok: true }) as HealthSummary);
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledWith({ probe: false });
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenCalledTimes(2);
|
||||
expect(deps.refreshGatewayHealthSnapshot).toHaveBeenLastCalledWith({ probe: false });
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("skips overlapping media cleanup runs", async () => {
|
||||
vi.useFakeTimers();
|
||||
let resolveCleanup = () => {};
|
||||
|
||||
@@ -72,16 +72,17 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.nodeSendToAllSubscribed("tick", payload);
|
||||
}, TICK_INTERVAL_MS);
|
||||
|
||||
// periodic health refresh to keep cached snapshot warm
|
||||
// Keep cached health warm without request-time live channel probes. Explicit
|
||||
// status/doctor probe paths still pass probe=true when the operator asks.
|
||||
const healthInterval = setInterval(() => {
|
||||
void params
|
||||
.refreshGatewayHealthSnapshot({ probe: true })
|
||||
.refreshGatewayHealthSnapshot({ probe: false })
|
||||
.catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
|
||||
}, HEALTH_REFRESH_INTERVAL_MS);
|
||||
|
||||
// Prime cache so first client gets a snapshot without waiting.
|
||||
void params
|
||||
.refreshGatewayHealthSnapshot({ probe: true })
|
||||
.refreshGatewayHealthSnapshot({ probe: false })
|
||||
.catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));
|
||||
|
||||
// dedupe cache cleanup
|
||||
|
||||
@@ -199,6 +199,13 @@ function deliveryCall(index = 0): Record<string, any> | undefined {
|
||||
return calls[index]?.[0];
|
||||
}
|
||||
|
||||
function appendTranscriptCall(index = 0): Record<string, any> | undefined {
|
||||
const calls = mocks.appendAssistantMessageToSessionTranscript.mock.calls as unknown as Array<
|
||||
[Record<string, any>]
|
||||
>;
|
||||
return calls[index]?.[0];
|
||||
}
|
||||
|
||||
function firstRespondCall(respond: ReturnType<typeof vi.fn>) {
|
||||
const calls = respond.mock.calls as unknown as Array<
|
||||
[
|
||||
@@ -1455,6 +1462,178 @@ describe("gateway send mirroring", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("waits for source transcript mirroring before responding to message.action", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
meta: {
|
||||
id: "telegram",
|
||||
label: "Telegram",
|
||||
selectionLabel: "Telegram",
|
||||
docsPath: "/channels/telegram",
|
||||
blurb: "Telegram async source send transcript mirror test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"] },
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({ enabled: true }),
|
||||
isConfigured: () => true,
|
||||
},
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["send"] }),
|
||||
supportsAction: ({ action }) => action === "send",
|
||||
handleAction: async () => jsonResult({ ok: true, messageId: "tg-async-1" }),
|
||||
},
|
||||
};
|
||||
const mirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
|
||||
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
|
||||
"send-test-source-message-action-async-mirror",
|
||||
);
|
||||
mocks.dispatchChannelMessageAction.mockResolvedValueOnce(
|
||||
jsonResult({ ok: true, messageId: "tg-async-1" }),
|
||||
);
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockReturnValueOnce(mirrorDeferred.promise);
|
||||
|
||||
const respond = vi.fn();
|
||||
const request = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "visible media caption",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-async-source-message-action",
|
||||
} as never,
|
||||
respond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "1", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(respond).not.toHaveBeenCalled();
|
||||
|
||||
mirrorDeferred.resolve({ ok: true, sessionFile: "x" });
|
||||
await request;
|
||||
|
||||
expect(firstRespondCall(respond)[0]).toBe(true);
|
||||
});
|
||||
|
||||
it("preserves source transcript mirror order before message.action responses", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
meta: {
|
||||
id: "telegram",
|
||||
label: "Telegram",
|
||||
selectionLabel: "Telegram",
|
||||
docsPath: "/channels/telegram",
|
||||
blurb: "Telegram ordered async source send transcript mirror test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"] },
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({ enabled: true }),
|
||||
isConfigured: () => true,
|
||||
},
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["send"] }),
|
||||
supportsAction: ({ action }) => action === "send",
|
||||
handleAction: async () => jsonResult({ ok: true, messageId: "tg-ordered" }),
|
||||
},
|
||||
};
|
||||
const firstMirrorDeferred = createDeferred<{ ok: boolean; sessionFile: string }>();
|
||||
mocks.getChannelPlugin.mockReturnValue(telegramPlugin);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]),
|
||||
"send-test-source-message-action-ordered-async-mirror",
|
||||
);
|
||||
mocks.dispatchChannelMessageAction.mockResolvedValue(
|
||||
jsonResult({ ok: true, messageId: "tg-ordered" }),
|
||||
);
|
||||
mocks.appendAssistantMessageToSessionTranscript
|
||||
.mockReturnValueOnce(firstMirrorDeferred.promise)
|
||||
.mockResolvedValueOnce({ ok: true, sessionFile: "x" });
|
||||
|
||||
const firstRespond = vi.fn();
|
||||
const secondRespond = vi.fn();
|
||||
const first = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "first visible reply",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-ordered-source-message-action-1",
|
||||
} as never,
|
||||
respond: firstRespond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "1", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
const second = sendHandlers["message.action"]({
|
||||
params: {
|
||||
channel: "telegram",
|
||||
action: "send",
|
||||
params: {
|
||||
to: "chat-123",
|
||||
message: "second visible reply",
|
||||
},
|
||||
sessionKey: "agent:main:telegram:direct:chat-123",
|
||||
agentId: "main",
|
||||
toolContext: {
|
||||
currentChannelProvider: "telegram",
|
||||
currentChannelId: "chat-123",
|
||||
},
|
||||
idempotencyKey: "idem-ordered-source-message-action-2",
|
||||
} as never,
|
||||
respond: secondRespond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "2", method: "message.action" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(1);
|
||||
expect(firstRespond).not.toHaveBeenCalled();
|
||||
expect(secondRespond).not.toHaveBeenCalled();
|
||||
expect(appendTranscriptCall(0)).toEqual(
|
||||
expect.objectContaining({ text: "first visible reply" }),
|
||||
);
|
||||
|
||||
firstMirrorDeferred.resolve({ ok: true, sessionFile: "x" });
|
||||
await first;
|
||||
await second;
|
||||
|
||||
expect(firstRespondCall(firstRespond)[0]).toBe(true);
|
||||
expect(firstRespondCall(secondRespond)[0]).toBe(true);
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledTimes(2);
|
||||
expect(appendTranscriptCall(1)).toEqual(
|
||||
expect.objectContaining({ text: "second visible reply" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("mirrors presentation-only source-conversation message.action sends", async () => {
|
||||
const telegramPlugin: ChannelPlugin = {
|
||||
id: "telegram",
|
||||
|
||||
@@ -288,6 +288,37 @@ async function mirrorDeliveredSourceReplyToTranscriptBestEffort(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const sourceReplyTranscriptMirrorQueues = new Map<string, Promise<void>>();
|
||||
|
||||
function resolveSourceReplyTranscriptMirrorQueueKey(
|
||||
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0],
|
||||
): string {
|
||||
return mirror.sessionKey?.trim() || "__global__";
|
||||
}
|
||||
|
||||
function scheduleDeliveredSourceReplyTranscriptMirror(params: {
|
||||
context: GatewayRequestContext;
|
||||
mirror: Parameters<typeof mirrorDeliveredSourceReplyToTranscript>[0];
|
||||
}): Promise<void> {
|
||||
const queueKey = resolveSourceReplyTranscriptMirrorQueueKey(params.mirror);
|
||||
const previous = sourceReplyTranscriptMirrorQueues.get(queueKey);
|
||||
// Queue per session so current-conversation source replies are visible before
|
||||
// a following turn can read the transcript.
|
||||
const queued = (async () => {
|
||||
await previous?.catch(() => undefined);
|
||||
await mirrorDeliveredSourceReplyToTranscriptBestEffort(params);
|
||||
})();
|
||||
sourceReplyTranscriptMirrorQueues.set(queueKey, queued);
|
||||
void queued
|
||||
.finally(() => {
|
||||
if (sourceReplyTranscriptMirrorQueues.get(queueKey) === queued) {
|
||||
sourceReplyTranscriptMirrorQueues.delete(queueKey);
|
||||
}
|
||||
})
|
||||
.catch(() => undefined);
|
||||
return queued;
|
||||
}
|
||||
|
||||
export const sendHandlers: GatewayRequestHandlers = {
|
||||
"message.action": async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
@@ -399,7 +430,7 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
const agentId =
|
||||
normalizeOptionalString(request.agentId) ??
|
||||
(sessionKey ? resolveSessionAgentId({ sessionKey, config: cfg }) : undefined);
|
||||
await mirrorDeliveredSourceReplyToTranscriptBestEffort({
|
||||
await scheduleDeliveredSourceReplyTranscriptMirror({
|
||||
context,
|
||||
mirror: {
|
||||
action: request.action,
|
||||
|
||||
@@ -224,7 +224,7 @@ describe("attachGatewayWsMessageHandler post-connect health refresh", () => {
|
||||
expect(hello.ok).toBe(true);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: true });
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({ probe: false });
|
||||
});
|
||||
resolveRefresh?.();
|
||||
});
|
||||
|
||||
@@ -1776,7 +1776,10 @@ export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerPar
|
||||
presence: snapshot.presence.length,
|
||||
stateVersion: snapshot.stateVersion.presence,
|
||||
});
|
||||
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
||||
// Post-connect refresh only needs a cached/config snapshot for UI state;
|
||||
// live channel probes here pulled slow Discord/Telegram HTTP checks into
|
||||
// reply-adjacent websocket handshakes.
|
||||
void refreshHealthSnapshot({ probe: false }).catch((err) =>
|
||||
logHealth.error(`post-connect health refresh failed: ${formatError(err)}`),
|
||||
);
|
||||
return;
|
||||
|
||||
@@ -250,6 +250,78 @@ describe("heartbeat runner skips when target session lane is busy", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("returns requests-in-flight when another session for the same agent has an active reply run", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => [
|
||||
"agent:main:telegram:group:-1003966283270:topic:547",
|
||||
]);
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
|
||||
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores unscoped active reply runs when checking same-agent heartbeat work", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => ["legacy-session-key"]);
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("ran");
|
||||
expect(listActiveReplyRunSessionKeys).toHaveBeenCalledOnce();
|
||||
expect(replySpy).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not defer immediate heartbeat wakes for another active session", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
await seedHeartbeatTelegramSession(storePath, cfg);
|
||||
const listActiveReplyRunSessionKeys = vi.fn(() => [
|
||||
"agent:main:telegram:group:-1003966283270:topic:547",
|
||||
]);
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
intent: "immediate",
|
||||
deps: {
|
||||
getQueueSize: vi.fn((_lane?: string) => 0),
|
||||
listActiveReplyRunSessionKeys,
|
||||
nowMs: () => Date.now(),
|
||||
getReplyFromConfig: replySpy,
|
||||
} as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("ran");
|
||||
expect(replySpy).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not defer on a recent heartbeat ack pending final delivery", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg = createHeartbeatTelegramConfig();
|
||||
|
||||
@@ -36,7 +36,10 @@ import {
|
||||
} from "../auto-reply/heartbeat.js";
|
||||
import { replaceGenericExternalRunFailureText } from "../auto-reply/reply/agent-runner-failure-copy.js";
|
||||
import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defaults.js";
|
||||
import { replyRunRegistry } from "../auto-reply/reply/reply-run-registry.js";
|
||||
import {
|
||||
listActiveReplyRunSessionKeys,
|
||||
replyRunRegistry,
|
||||
} from "../auto-reply/reply/reply-run-registry.js";
|
||||
import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js";
|
||||
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
@@ -148,6 +151,7 @@ export type HeartbeatDeps = OutboundSendDeps &
|
||||
getQueueSize?: (lane?: string) => number;
|
||||
getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[];
|
||||
isReplyRunActive?: (sessionKey: string) => boolean;
|
||||
listActiveReplyRunSessionKeys?: () => readonly string[];
|
||||
nowMs?: () => number;
|
||||
};
|
||||
|
||||
@@ -221,6 +225,17 @@ function hasAgentOptInBusyLaneWork(
|
||||
return hasQueuedWorkInLaneSnapshots(getSnapshots(), (lane) => laneBelongsToAgent(lane, agentId));
|
||||
}
|
||||
|
||||
function hasActiveReplyRunForAgent(
|
||||
agentId: string,
|
||||
listSessionKeys: () => readonly string[],
|
||||
): boolean {
|
||||
const normalizedAgentId = normalizeAgentId(agentId);
|
||||
return listSessionKeys().some((sessionKey) => {
|
||||
const parsed = parseAgentSessionKey(sessionKey);
|
||||
return parsed ? normalizeAgentId(parsed.agentId) === normalizedAgentId : false;
|
||||
});
|
||||
}
|
||||
|
||||
function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined {
|
||||
const activePlugin = getActivePluginChannelRegistry()?.channels.find(
|
||||
(entry) => entry.plugin.id === channel,
|
||||
@@ -1345,6 +1360,21 @@ export async function runHeartbeatOnce(opts: {
|
||||
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
|
||||
}
|
||||
|
||||
const shouldHonorActiveReplyRuns = opts.intent !== "immediate" && opts.intent !== "manual";
|
||||
const listActiveReplyRuns =
|
||||
opts.deps?.listActiveReplyRunSessionKeys ?? listActiveReplyRunSessionKeys;
|
||||
// Scheduled heartbeats are background work, so defer them when any session on
|
||||
// the same agent is already replying; immediate/manual wakes keep their
|
||||
// existing semantics for explicit user/system actions.
|
||||
if (shouldHonorActiveReplyRuns && hasActiveReplyRunForAgent(agentId, listActiveReplyRuns)) {
|
||||
emitHeartbeatEvent({
|
||||
status: "skipped",
|
||||
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
|
||||
durationMs: Date.now() - startedAt,
|
||||
});
|
||||
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
|
||||
}
|
||||
|
||||
// Phase 2: Stronger heartbeat deferral while a final delivery replay is pending.
|
||||
// Plain `updatedAt` changes are normal for heartbeat sessions and should not
|
||||
// suppress heartbeat runs; only defer when final delivery recovery is active.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
@@ -420,4 +421,30 @@ describe("bun global install smoke", () => {
|
||||
expect(releaseChecks).toContain("uses: ./.github/workflows/install-smoke.yml");
|
||||
expect(releaseChecks).toContain("run_bun_global_install_smoke: true");
|
||||
});
|
||||
|
||||
it("kills Bun global install smoke commands that ignore TERM after timeout", () => {
|
||||
const result = spawnSync(
|
||||
process.execPath,
|
||||
[
|
||||
BUN_GLOBAL_ASSERTIONS_PATH,
|
||||
"run-with-timeout",
|
||||
"50",
|
||||
process.execPath,
|
||||
"-e",
|
||||
"process.on('SIGTERM', () => {}); setInterval(() => {}, 1000);",
|
||||
],
|
||||
{
|
||||
encoding: "utf8",
|
||||
env: {
|
||||
...process.env,
|
||||
OPENCLAW_BUN_GLOBAL_SMOKE_TIMEOUT_KILL_GRACE_MS: "50",
|
||||
},
|
||||
timeout: 5000,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.error).toBeUndefined();
|
||||
expect(result.status).toBe(1);
|
||||
expect(result.stderr).toContain(`command timed out after 50ms: ${process.execPath}`);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user