mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 14:01:24 +08:00
Compare commits
8 Commits
v2026.5.29
...
node-worke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
883eed2e95 | ||
|
|
84d793db29 | ||
|
|
0a6e55fa0a | ||
|
|
f74e4161eb | ||
|
|
99881ae378 | ||
|
|
721f8c070d | ||
|
|
745bd861ae | ||
|
|
2e7246e70f |
@@ -21,11 +21,67 @@ Treat them differently from normal config:
|
||||
|
||||
## Currently documented flags
|
||||
|
||||
| Surface | Key | Use it when | More |
|
||||
| ------------------------ | --------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------- |
|
||||
| Local model runtime | `agents.defaults.experimental.localModelLean` | A smaller or stricter local backend chokes on OpenClaw's full default tool surface | [Local Models](/gateway/local-models) |
|
||||
| Memory search | `agents.defaults.memorySearch.experimental.sessionMemory` | You want `memory_search` to index prior session transcripts and accept the extra storage/indexing cost | [Memory configuration reference](/reference/memory-config#session-memory-search-experimental) |
|
||||
| Structured planning tool | `tools.experimental.planTool` | You want the structured `update_plan` tool exposed for multi-step work tracking in compatible runtimes and UIs | [Gateway configuration reference](/gateway/config-tools#toolsexperimental) |
|
||||
| Surface | Key | Use it when | More |
|
||||
| ------------------------------- | --------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------- |
|
||||
| Local model runtime | `agents.defaults.experimental.localModelLean` | A smaller or stricter local backend chokes on OpenClaw's full default tool surface | [Local Models](/gateway/local-models) |
|
||||
| Agent command runtime isolation | `agents.defaults.experimental.runtimeIsolation` | You want `/agent` command attempts to run in a Node worker compartment while testing parallel-agent isolation | [Agent command runtime isolation](#agent-command-runtime-isolation) |
|
||||
| Memory search | `agents.defaults.memorySearch.experimental.sessionMemory` | You want `memory_search` to index prior session transcripts and accept the extra storage/indexing cost | [Memory configuration reference](/reference/memory-config#session-memory-search-experimental) |
|
||||
| Structured planning tool | `tools.experimental.planTool` | You want the structured `update_plan` tool exposed for multi-step work tracking in compatible runtimes and UIs | [Gateway configuration reference](/gateway/config-tools#toolsexperimental) |
|
||||
|
||||
## Agent command runtime isolation
|
||||
|
||||
`agents.defaults.experimental.runtimeIsolation.mode: "worker"` runs `/agent`
|
||||
command attempts in a Node worker thread. The parent process still owns command
|
||||
routing, model fallback policy, final session-store updates, delivery, and
|
||||
lifecycle reporting; the worker owns the in-repo command runtime attempt itself.
|
||||
|
||||
Normal inbound Gateway replies remain on the in-process embedded runner for now.
|
||||
That path owns live streaming and delivery callbacks in the parent process and
|
||||
needs a dedicated callback bridge before it can move into this worker
|
||||
compartment.
|
||||
|
||||
This is a compartment boundary, not a general speed switch. It can help when
|
||||
several in-repo command agents run at once and you want each run to have its own
|
||||
event loop, worker lifetime, and future filesystem permission scope. It will not
|
||||
make remote model calls faster, and CLI/ACP harnesses such as Codex may still
|
||||
spawn their own child processes inside the worker.
|
||||
|
||||
Session-store writes still go through the normal `updateSessionStore(...)` path.
|
||||
That writer uses a `sessions.json.lock` file lock so worker-thread updates for
|
||||
different agents do not overwrite each other when they share the same store.
|
||||
|
||||
### Enable
|
||||
|
||||
```json5
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
experimental: {
|
||||
runtimeIsolation: {
|
||||
mode: "worker",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
For developer-only overrides, `OPENCLAW_AGENT_RUNTIME_WORKER=1` forces the
|
||||
worker path and `OPENCLAW_AGENT_RUNTIME_WORKER=0` forces the in-process path.
|
||||
The older `OPENCLAW_AGENT_WORKER_EXPERIMENT` env var is also accepted while the
|
||||
experiment is in flight.
|
||||
|
||||
### Worker permissions
|
||||
|
||||
`runtimeIsolation.permissions: true` also starts the worker with Node permission
|
||||
flags scoped to the agent workspace, agent directory, session transcript,
|
||||
session store and lock files, OpenClaw runtime bundle/development source,
|
||||
bundled plugin source, and runtime dependencies.
|
||||
|
||||
Keep this off unless you are explicitly testing filesystem hardening. Node
|
||||
permission behavior is stricter and more runtime-sensitive than worker
|
||||
isolation itself, so package reads or child-process based harnesses may need
|
||||
additional design before this becomes broadly usable.
|
||||
|
||||
## Local model lean mode
|
||||
|
||||
|
||||
@@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
|
||||
- `maxDiskBytes`: optional sessions-directory budget
|
||||
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
|
||||
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue; `--store <path>` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations and takes a `sessions.json.lock` file lock while reading and writing the store. The file lock keeps Node worker threads and other runtime isolates from losing updates when they mutate the same session store. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue; `--store <path>` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
|
||||
|
||||
Maintenance keeps durable external conversation pointers such as group sessions
|
||||
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,
|
||||
|
||||
@@ -42,6 +42,7 @@ import {
|
||||
} from "./agent-scope.js";
|
||||
import { clearSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
|
||||
import { ensureAuthProfileStore } from "./auth-profiles/store.js";
|
||||
import type { RunAgentAttemptParams } from "./command/attempt-execution.js";
|
||||
import {
|
||||
persistSessionEntry as persistSessionEntryBase,
|
||||
prependInternalEventContext,
|
||||
@@ -72,6 +73,10 @@ import { resolveProviderIdForAuth } from "./provider-auth-aliases.js";
|
||||
import { hydrateResolvedSkillsAsync } from "./skills/snapshot-hydration.js";
|
||||
import { normalizeSpawnedRunMetadata } from "./spawned-context.js";
|
||||
import { resolveAgentTimeoutMs } from "./timeout.js";
|
||||
import {
|
||||
runAgentAttemptInWorker,
|
||||
shouldRunAgentCommandAttemptInWorker,
|
||||
} from "./worker-runtime/agent-runtime.js";
|
||||
import { ensureAgentWorkspace } from "./workspace.js";
|
||||
|
||||
const log = createSubsystemLogger("agents/agent-command");
|
||||
@@ -989,7 +994,7 @@ async function agentCommandInternal(
|
||||
run: async (providerOverride, modelOverride, runOptions) => {
|
||||
const isFallbackRetry = fallbackAttemptIndex > 0;
|
||||
fallbackAttemptIndex += 1;
|
||||
return attemptExecutionRuntime.runAgentAttempt({
|
||||
const attemptParams: RunAgentAttemptParams = {
|
||||
providerOverride,
|
||||
modelOverride,
|
||||
modelFallbacksOverride: effectiveFallbacksOverride,
|
||||
@@ -1039,7 +1044,10 @@ async function agentCommandInternal(
|
||||
lifecycleEnded = true;
|
||||
}
|
||||
},
|
||||
});
|
||||
};
|
||||
return shouldRunAgentCommandAttemptInWorker({ config: cfg })
|
||||
? runAgentAttemptInWorker(attemptParams)
|
||||
: attemptExecutionRuntime.runAgentAttempt(attemptParams);
|
||||
},
|
||||
});
|
||||
result = fallbackResult.result;
|
||||
|
||||
@@ -23,6 +23,7 @@ import { FailoverError } from "../failover-error.js";
|
||||
import { resolveAgentHarnessPolicy } from "../harness/selection.js";
|
||||
import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js";
|
||||
import { isCliProvider } from "../model-selection.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "../pi-embedded-runner/run/params.js";
|
||||
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
|
||||
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
|
||||
import {
|
||||
@@ -345,7 +346,7 @@ export async function persistCliTurnTranscript(params: {
|
||||
});
|
||||
}
|
||||
|
||||
export function runAgentAttempt(params: {
|
||||
export type RunAgentAttemptParams = {
|
||||
providerOverride: string;
|
||||
modelOverride: string;
|
||||
originalProvider: string;
|
||||
@@ -382,7 +383,9 @@ export function runAgentAttempt(params: {
|
||||
sessionHasHistory?: boolean;
|
||||
suppressPromptPersistenceOnRetry?: boolean;
|
||||
onUserMessagePersisted?: (message: Extract<AgentMessage, { role: "user" }>) => void;
|
||||
}) {
|
||||
};
|
||||
|
||||
export function runAgentAttempt(params: RunAgentAttemptParams) {
|
||||
const isRawModelRun = params.opts.modelRun === true || params.opts.promptMode === "none";
|
||||
const claudeCliFallbackPrelude =
|
||||
!isRawModelRun &&
|
||||
@@ -583,7 +586,7 @@ export function runAgentAttempt(params: {
|
||||
});
|
||||
}
|
||||
|
||||
return runEmbeddedPiAgent({
|
||||
const embeddedRunParams: RunEmbeddedPiAgentParams = {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.sessionAgentId,
|
||||
@@ -640,7 +643,9 @@ export function runAgentAttempt(params: {
|
||||
onUserMessagePersisted: params.onUserMessagePersisted,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature,
|
||||
});
|
||||
};
|
||||
|
||||
return runEmbeddedPiAgent(embeddedRunParams);
|
||||
}
|
||||
|
||||
function resolveSessionPinnedAgentHarnessId(params: {
|
||||
|
||||
@@ -5,6 +5,10 @@ import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
clearContextEnginesForOwner,
|
||||
listContextEngineIds,
|
||||
} from "../../context-engine/registry.js";
|
||||
import type { ContextEngine } from "../../context-engine/types.js";
|
||||
import {
|
||||
resetCliCompactionTestDeps,
|
||||
@@ -167,4 +171,56 @@ describe("runCliTurnCompactionLifecycle", () => {
|
||||
expect(updatedEntry?.cliSessionIds?.["claude-cli"]).toBeUndefined();
|
||||
expect(updatedEntry?.claudeCliSessionId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("registers the built-in context engines before parent-side CLI transcript persistence", async () => {
|
||||
const sessionKey = "agent:main:explicit:worker";
|
||||
const sessionId = "session-worker";
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
await writeSessionFile({ sessionFile, sessionId });
|
||||
|
||||
clearContextEnginesForOwner("core");
|
||||
expect(listContextEngineIds()).not.toContain("legacy");
|
||||
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId,
|
||||
updatedAt: Date.now(),
|
||||
sessionFile,
|
||||
contextTokens: 1_000,
|
||||
totalTokens: 100,
|
||||
totalTokensFresh: true,
|
||||
};
|
||||
setCliCompactionTestDeps({
|
||||
createPreparedEmbeddedPiSettingsManager: async () => ({
|
||||
getCompactionReserveTokens: () => 200,
|
||||
getCompactionKeepRecentTokens: () => 0,
|
||||
applyOverrides: () => {},
|
||||
}),
|
||||
applyPiAutoCompactionGuard: () => {},
|
||||
shouldPreemptivelyCompactBeforePrompt: () => ({
|
||||
route: "fits",
|
||||
shouldCompact: false,
|
||||
estimatedPromptTokens: 100,
|
||||
promptBudgetBeforeReserve: 800,
|
||||
overflowTokens: 0,
|
||||
toolResultReducibleChars: 0,
|
||||
effectiveReserveTokens: 200,
|
||||
}),
|
||||
resolveLiveToolResultMaxChars: () => 20_000,
|
||||
});
|
||||
|
||||
const updatedEntry = await runCliTurnCompactionLifecycle({
|
||||
cfg: {} as OpenClawConfig,
|
||||
sessionId,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionAgentId: "main",
|
||||
workspaceDir: tmpDir,
|
||||
agentDir: tmpDir,
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
});
|
||||
|
||||
expect(updatedEntry).toBe(sessionEntry);
|
||||
expect(listContextEngineIds()).toContain("legacy");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { ensureContextEnginesInitialized as ensureContextEnginesInitializedImpl } from "../../context-engine/init.js";
|
||||
import { resolveContextEngine as resolveContextEngineImpl } from "../../context-engine/registry.js";
|
||||
import type { ContextEngine } from "../../context-engine/types.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
@@ -27,6 +28,7 @@ type SettingsManagerLike = {
|
||||
setCompactionEnabled?: (enabled: boolean) => void;
|
||||
};
|
||||
type CliCompactionDeps = {
|
||||
ensureContextEnginesInitialized: () => void;
|
||||
openSessionManager: (sessionFile: string) => SessionManagerLike;
|
||||
resolveContextEngine: (cfg: OpenClawConfig) => Promise<ContextEngine>;
|
||||
createPreparedEmbeddedPiSettingsManager: (params: {
|
||||
@@ -48,6 +50,7 @@ type CliCompactionDeps = {
|
||||
const log = createSubsystemLogger("agents/cli-compaction");
|
||||
|
||||
const cliCompactionDeps: CliCompactionDeps = {
|
||||
ensureContextEnginesInitialized: ensureContextEnginesInitializedImpl,
|
||||
openSessionManager: (sessionFile: string) => SessionManager.open(sessionFile),
|
||||
resolveContextEngine: resolveContextEngineImpl,
|
||||
createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerImpl,
|
||||
@@ -64,6 +67,7 @@ export function setCliCompactionTestDeps(overrides: Partial<typeof cliCompaction
|
||||
|
||||
export function resetCliCompactionTestDeps(): void {
|
||||
Object.assign(cliCompactionDeps, {
|
||||
ensureContextEnginesInitialized: ensureContextEnginesInitializedImpl,
|
||||
openSessionManager: (sessionFile: string) => SessionManager.open(sessionFile),
|
||||
resolveContextEngine: resolveContextEngineImpl,
|
||||
createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerImpl,
|
||||
@@ -196,6 +200,7 @@ export async function runCliTurnCompactionLifecycle(params: {
|
||||
return params.sessionEntry;
|
||||
}
|
||||
|
||||
cliCompactionDeps.ensureContextEnginesInitialized();
|
||||
const contextEngine = await cliCompactionDeps.resolveContextEngine(params.cfg);
|
||||
const sessionManager = cliCompactionDeps.openSessionManager(sessionFile);
|
||||
const settingsManager = await cliCompactionDeps.createPreparedEmbeddedPiSettingsManager({
|
||||
|
||||
464
src/agents/worker-runtime/agent-runtime.test.ts
Normal file
464
src/agents/worker-runtime/agent-runtime.test.ts
Normal file
@@ -0,0 +1,464 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
onAgentEvent as onParentAgentEvent,
|
||||
resetAgentEventsForTest,
|
||||
type AgentEventPayload,
|
||||
} from "../../infra/agent-events.js";
|
||||
import type { RunAgentAttemptParams } from "../command/attempt-execution.js";
|
||||
import { LiveSessionModelSwitchError } from "../live-model-switch-error.js";
|
||||
import {
|
||||
AgentWorkerUnsupportedParamsError,
|
||||
runAgentAttemptInWorker,
|
||||
shouldRunAgentCommandAttemptInWorker,
|
||||
} from "./agent-runtime.js";
|
||||
import { serializeWorkerError } from "./errors.js";
|
||||
|
||||
function createFixtureWorkerUrl(): URL {
|
||||
const source = `
|
||||
import fs from "node:fs/promises";
|
||||
import { parentPort } from "node:worker_threads";
|
||||
|
||||
let runStarted = false;
|
||||
|
||||
function post(message) {
|
||||
parentPort.postMessage(message);
|
||||
}
|
||||
|
||||
async function writeSessionStoreUpdate(message, tag) {
|
||||
if (!message.params.storePath || !message.params.sessionKey) {
|
||||
return;
|
||||
}
|
||||
await fs.writeFile(
|
||||
message.params.storePath,
|
||||
JSON.stringify({
|
||||
[message.params.sessionKey]: {
|
||||
sessionId: message.params.sessionId,
|
||||
updatedAt: 456,
|
||||
model: "worker-" + tag
|
||||
}
|
||||
}, null, 2)
|
||||
);
|
||||
}
|
||||
|
||||
parentPort.on("message", async (message) => {
|
||||
if (message.type === "abort") {
|
||||
if (runStarted) {
|
||||
post({ type: "error", error: { name: "AbortError", message: "aborted:" + String(message.reason ?? "") } });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (message.type !== "run" || runStarted) {
|
||||
return;
|
||||
}
|
||||
|
||||
runStarted = true;
|
||||
if (message.initialAbort) {
|
||||
post({ type: "error", error: { name: "AbortError", message: "initial-aborted:" + String(message.initialAbort.reason ?? "") } });
|
||||
return;
|
||||
}
|
||||
post({
|
||||
type: "agentEvent",
|
||||
origin: "runtime",
|
||||
event: {
|
||||
runId: message.params.runId,
|
||||
seq: 7,
|
||||
ts: 123,
|
||||
stream: "tool",
|
||||
data: { phase: "runtime", runId: message.params.runId }
|
||||
}
|
||||
});
|
||||
post({
|
||||
type: "agentEvent",
|
||||
origin: "runtime",
|
||||
event: {
|
||||
runId: "child-run",
|
||||
seq: 1,
|
||||
ts: 124,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", runId: "child-run" }
|
||||
}
|
||||
});
|
||||
post({
|
||||
type: "agentEvent",
|
||||
origin: "callback",
|
||||
event: {
|
||||
stream: "lifecycle",
|
||||
sessionKey: message.params.sessionKey,
|
||||
data: { phase: "fixture", runId: message.params.runId }
|
||||
}
|
||||
});
|
||||
post({
|
||||
type: "userMessagePersisted",
|
||||
message: { role: "user", content: [{ type: "text", text: message.params.body }] }
|
||||
});
|
||||
|
||||
if (message.params.body === "throw") {
|
||||
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
|
||||
return;
|
||||
}
|
||||
if (message.params.body === "mutate-store-then-throw") {
|
||||
await writeSessionStoreUpdate(message, "error");
|
||||
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
|
||||
return;
|
||||
}
|
||||
if (message.params.body === "switch") {
|
||||
post({
|
||||
type: "error",
|
||||
error: {
|
||||
name: "LiveSessionModelSwitchError",
|
||||
message: "Live session model switch requested: anthropic/claude-sonnet-4.6",
|
||||
control: {
|
||||
type: "liveSessionModelSwitch",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4.6",
|
||||
authProfileId: "profile-1",
|
||||
authProfileIdSource: "user"
|
||||
}
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (message.params.body === "wait") {
|
||||
return;
|
||||
}
|
||||
if (message.params.body === "mutate-store") {
|
||||
await writeSessionStoreUpdate(message, "result");
|
||||
}
|
||||
|
||||
post({
|
||||
type: "result",
|
||||
result: {
|
||||
payloads: [{ text: "worker:" + message.params.body }],
|
||||
meta: {
|
||||
durationMs: 1,
|
||||
finalAssistantVisibleText: "worker:" + message.params.body,
|
||||
agentMeta: {
|
||||
sessionId: message.params.sessionId,
|
||||
provider: message.params.providerOverride ?? "fixture",
|
||||
model: message.params.modelOverride ?? "fixture-model"
|
||||
},
|
||||
executionTrace: { runner: "embedded" }
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
`;
|
||||
return new URL(`data:text/javascript,${encodeURIComponent(source)}`);
|
||||
}
|
||||
|
||||
async function makeWorkerParams(body: string): Promise<RunAgentAttemptParams> {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-worker-"));
|
||||
tmpDirs.push(tmpDir);
|
||||
return {
|
||||
providerOverride: "openai",
|
||||
originalProvider: "openai",
|
||||
modelOverride: "gpt-5.5",
|
||||
cfg: {} as OpenClawConfig,
|
||||
sessionEntry: undefined,
|
||||
sessionId: "session-worker-test",
|
||||
sessionKey: "agent:main:worker-test",
|
||||
sessionAgentId: "main",
|
||||
sessionFile: path.join(tmpDir, "session.jsonl"),
|
||||
workspaceDir: tmpDir,
|
||||
body,
|
||||
isFallbackRetry: false,
|
||||
resolvedThinkLevel: "medium",
|
||||
timeoutMs: 1_000,
|
||||
runId: "run-worker-test",
|
||||
opts: { message: body, senderIsOwner: false },
|
||||
runContext: {} as RunAgentAttemptParams["runContext"],
|
||||
spawnedBy: undefined,
|
||||
messageChannel: undefined,
|
||||
skillsSnapshot: undefined,
|
||||
resolvedVerboseLevel: undefined,
|
||||
agentDir: tmpDir,
|
||||
onAgentEvent: vi.fn(),
|
||||
authProfileProvider: "openai",
|
||||
sessionHasHistory: false,
|
||||
};
|
||||
}
|
||||
|
||||
const tmpDirs: string[] = [];
|
||||
|
||||
describe("agent runtime worker bridge", () => {
|
||||
afterEach(async () => {
|
||||
resetAgentEventsForTest();
|
||||
await Promise.all(tmpDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
|
||||
});
|
||||
|
||||
it("recognizes config and explicit environment overrides", () => {
|
||||
expect(
|
||||
shouldRunAgentCommandAttemptInWorker({
|
||||
config: {
|
||||
agents: { defaults: { experimental: { runtimeIsolation: { mode: "worker" } } } },
|
||||
} as OpenClawConfig,
|
||||
env: {},
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldRunAgentCommandAttemptInWorker({
|
||||
config: {
|
||||
agents: { defaults: { experimental: { runtimeIsolation: { mode: "worker" } } } },
|
||||
} as OpenClawConfig,
|
||||
env: { OPENCLAW_AGENT_RUNTIME_WORKER: "0" },
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldRunAgentCommandAttemptInWorker({
|
||||
config: {} as OpenClawConfig,
|
||||
env: { OPENCLAW_AGENT_RUNTIME_WORKER: "yes" },
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldRunAgentCommandAttemptInWorker({
|
||||
config: {} as OpenClawConfig,
|
||||
env: { OPENCLAW_AGENT_WORKER_EXPERIMENT: "1" },
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("runs an agent attempt through a real worker and proxies supported callbacks", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const onUserMessagePersisted = vi.fn();
|
||||
const parentEvents: AgentEventPayload[] = [];
|
||||
const stopParentEvents = onParentAgentEvent((event) => {
|
||||
parentEvents.push(event);
|
||||
});
|
||||
|
||||
const result = await runAgentAttemptInWorker(
|
||||
{
|
||||
...(await makeWorkerParams("hello")),
|
||||
onAgentEvent,
|
||||
onUserMessagePersisted,
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
);
|
||||
stopParentEvents();
|
||||
|
||||
expect(result.payloads?.[0]?.text).toBe("worker:hello");
|
||||
expect(result.meta.agentMeta).toMatchObject({
|
||||
sessionId: "session-worker-test",
|
||||
provider: "openai",
|
||||
model: "gpt-5.5",
|
||||
});
|
||||
expect(parentEvents).toEqual([
|
||||
expect.objectContaining({
|
||||
runId: "run-worker-test",
|
||||
stream: "tool",
|
||||
sessionKey: "agent:main:worker-test",
|
||||
data: { phase: "runtime", runId: "run-worker-test" },
|
||||
seq: expect.any(Number),
|
||||
ts: expect.any(Number),
|
||||
}),
|
||||
expect.objectContaining({
|
||||
runId: "child-run",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", runId: "child-run" },
|
||||
seq: expect.any(Number),
|
||||
ts: expect.any(Number),
|
||||
}),
|
||||
]);
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "tool",
|
||||
sessionKey: "agent:main:worker-test",
|
||||
data: { phase: "runtime", runId: "run-worker-test" },
|
||||
});
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
sessionKey: "agent:main:worker-test",
|
||||
data: { phase: "fixture", runId: "run-worker-test" },
|
||||
});
|
||||
expect(onAgentEvent).not.toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", runId: "child-run" },
|
||||
});
|
||||
expect(onUserMessagePersisted).toHaveBeenCalledWith({
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
});
|
||||
});
|
||||
|
||||
it("propagates structured worker errors", async () => {
|
||||
await expect(
|
||||
runAgentAttemptInWorker(await makeWorkerParams("throw"), {
|
||||
workerUrl: createFixtureWorkerUrl(),
|
||||
execArgv: [],
|
||||
usePermissions: false,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "FixtureError",
|
||||
message: "fixture failed",
|
||||
code: "FIXTURE",
|
||||
});
|
||||
});
|
||||
|
||||
it("refreshes parent session store state after a worker result", async () => {
|
||||
const params = await makeWorkerParams("mutate-store");
|
||||
const sessionStore: Record<string, SessionEntry> = {
|
||||
[params.sessionKey!]: {
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: 1,
|
||||
model: "parent-stale",
|
||||
cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
|
||||
},
|
||||
};
|
||||
const storePath = path.join(path.dirname(params.sessionFile), "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
|
||||
|
||||
await runAgentAttemptInWorker(
|
||||
{
|
||||
...params,
|
||||
sessionStore,
|
||||
storePath,
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
);
|
||||
|
||||
expect(sessionStore[params.sessionKey!]).toMatchObject({
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: 456,
|
||||
model: "worker-result",
|
||||
});
|
||||
expect(sessionStore[params.sessionKey!]?.cliSessionBindings).toBeUndefined();
|
||||
});
|
||||
|
||||
it("refreshes parent session store state after a worker error", async () => {
|
||||
const params = await makeWorkerParams("mutate-store-then-throw");
|
||||
const sessionStore: Record<string, SessionEntry> = {
|
||||
[params.sessionKey!]: {
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: 1,
|
||||
model: "parent-stale",
|
||||
cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
|
||||
},
|
||||
};
|
||||
const storePath = path.join(path.dirname(params.sessionFile), "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
|
||||
|
||||
await expect(
|
||||
runAgentAttemptInWorker(
|
||||
{
|
||||
...params,
|
||||
sessionStore,
|
||||
storePath,
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
),
|
||||
).rejects.toMatchObject({
|
||||
name: "FixtureError",
|
||||
});
|
||||
|
||||
expect(sessionStore[params.sessionKey!]).toMatchObject({
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: 456,
|
||||
model: "worker-error",
|
||||
});
|
||||
expect(sessionStore[params.sessionKey!]?.cliSessionBindings).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves live model switch errors across the worker boundary", async () => {
|
||||
const error = new LiveSessionModelSwitchError({
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4.6",
|
||||
authProfileId: "profile-1",
|
||||
authProfileIdSource: "user",
|
||||
});
|
||||
|
||||
expect(serializeWorkerError(error)).toMatchObject({
|
||||
type: "error",
|
||||
error: {
|
||||
name: "LiveSessionModelSwitchError",
|
||||
control: {
|
||||
type: "liveSessionModelSwitch",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4.6",
|
||||
authProfileId: "profile-1",
|
||||
authProfileIdSource: "user",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await expect(
|
||||
runAgentAttemptInWorker(await makeWorkerParams("switch"), {
|
||||
workerUrl: createFixtureWorkerUrl(),
|
||||
execArgv: [],
|
||||
usePermissions: false,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "LiveSessionModelSwitchError",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4.6",
|
||||
authProfileId: "profile-1",
|
||||
authProfileIdSource: "user",
|
||||
});
|
||||
await expect(
|
||||
runAgentAttemptInWorker(await makeWorkerParams("switch"), {
|
||||
workerUrl: createFixtureWorkerUrl(),
|
||||
execArgv: [],
|
||||
usePermissions: false,
|
||||
}),
|
||||
).rejects.toBeInstanceOf(LiveSessionModelSwitchError);
|
||||
});
|
||||
|
||||
it("forwards aborts into the worker", async () => {
|
||||
const controller = new AbortController();
|
||||
const promise = runAgentAttemptInWorker(
|
||||
{
|
||||
...(await makeWorkerParams("wait")),
|
||||
opts: { message: "wait", senderIsOwner: false, abortSignal: controller.signal },
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
);
|
||||
|
||||
controller.abort("stop");
|
||||
|
||||
await expect(promise).rejects.toMatchObject({
|
||||
name: "AbortError",
|
||||
message: "aborted:stop",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves an already-aborted signal when starting the worker run", async () => {
|
||||
const controller = new AbortController();
|
||||
controller.abort("already stopped");
|
||||
|
||||
await expect(
|
||||
runAgentAttemptInWorker(
|
||||
{
|
||||
...(await makeWorkerParams("hello")),
|
||||
opts: {
|
||||
message: "hello",
|
||||
senderIsOwner: false,
|
||||
abortSignal: controller.signal,
|
||||
},
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
),
|
||||
).rejects.toMatchObject({
|
||||
name: "AbortError",
|
||||
message: "initial-aborted:already stopped",
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects invalid abort signal params before spawning a worker", async () => {
|
||||
await expect(
|
||||
runAgentAttemptInWorker(
|
||||
{
|
||||
...(await makeWorkerParams("hello")),
|
||||
opts: {
|
||||
message: "hello",
|
||||
senderIsOwner: false,
|
||||
abortSignal: "bad" as unknown as AbortSignal,
|
||||
},
|
||||
},
|
||||
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
|
||||
),
|
||||
).rejects.toBeInstanceOf(AgentWorkerUnsupportedParamsError);
|
||||
});
|
||||
});
|
||||
292
src/agents/worker-runtime/agent-runtime.ts
Normal file
292
src/agents/worker-runtime/agent-runtime.ts
Normal file
@@ -0,0 +1,292 @@
|
||||
import fs from "node:fs";
|
||||
import { Worker } from "node:worker_threads";
|
||||
import { loadSessionStore } from "../../config/sessions/store.js";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { emitAgentEvent } from "../../infra/agent-events.js";
|
||||
import type { RunAgentAttemptParams } from "../command/attempt-execution.js";
|
||||
import type {
|
||||
AgentRuntimeWorkerRunParams,
|
||||
AgentWorkerToParentMessage,
|
||||
RunAgentAttemptResult,
|
||||
} from "./agent-runtime.types.js";
|
||||
import { deserializeWorkerError } from "./errors.js";
|
||||
import { buildAgentWorkerPermissionExecArgv } from "./permissions.js";
|
||||
|
||||
const TRUE_VALUES = new Set(["1", "true", "yes", "on"]);
|
||||
const FALSE_VALUES = new Set(["0", "false", "no", "off"]);
|
||||
|
||||
export class AgentWorkerUnsupportedParamsError extends Error {
|
||||
constructor(readonly keys: string[]) {
|
||||
super(`Agent runtime worker experiment does not support params: ${keys.join(", ")}`);
|
||||
this.name = "AgentWorkerUnsupportedParamsError";
|
||||
}
|
||||
}
|
||||
|
||||
export type RunAgentAttemptInWorkerOptions = {
|
||||
/** Test seam; production uses the compiled agent-runtime.worker entry. */
|
||||
workerUrl?: URL;
|
||||
/** Test seam; production inherits the parent process execArgv. */
|
||||
execArgv?: string[];
|
||||
/** Test seam; production follows config/env permission settings. */
|
||||
usePermissions?: boolean;
|
||||
};
|
||||
|
||||
type RuntimeIsolationDecisionParams = {
|
||||
config?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
};
|
||||
|
||||
function readBooleanOverride(value: string | undefined): boolean | undefined {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (!normalized) {
|
||||
return undefined;
|
||||
}
|
||||
if (TRUE_VALUES.has(normalized)) {
|
||||
return true;
|
||||
}
|
||||
if (FALSE_VALUES.has(normalized)) {
|
||||
return false;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function shouldRunAgentCommandAttemptInWorker(
|
||||
params: RuntimeIsolationDecisionParams = {},
|
||||
): boolean {
|
||||
const env = params.env ?? process.env;
|
||||
const explicit =
|
||||
readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER) ??
|
||||
readBooleanOverride(env.OPENCLAW_AGENT_WORKER_EXPERIMENT);
|
||||
if (explicit !== undefined) {
|
||||
return explicit;
|
||||
}
|
||||
return params.config?.agents?.defaults?.experimental?.runtimeIsolation?.mode === "worker";
|
||||
}
|
||||
|
||||
function shouldUseWorkerPermissions(
|
||||
config: OpenClawConfig | undefined,
|
||||
env = process.env,
|
||||
): boolean {
|
||||
const explicit =
|
||||
readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER_PERMISSIONS) ??
|
||||
readBooleanOverride(env.OPENCLAW_AGENT_WORKER_PERMISSIONS);
|
||||
if (explicit !== undefined) {
|
||||
return explicit;
|
||||
}
|
||||
return config?.agents?.defaults?.experimental?.runtimeIsolation?.permissions === true;
|
||||
}
|
||||
|
||||
function resolveWorkerUrl(): URL {
|
||||
const current = import.meta.url;
|
||||
return new URL(
|
||||
current.endsWith(".ts") ? "./agent-runtime.worker.ts" : "./agent-runtime.worker.js",
|
||||
current,
|
||||
);
|
||||
}
|
||||
|
||||
function serializeAbortReason(reason: unknown): unknown {
|
||||
if (reason instanceof Error) {
|
||||
return { name: reason.name, message: reason.message, stack: reason.stack };
|
||||
}
|
||||
return reason;
|
||||
}
|
||||
|
||||
function serializeInitialAbort(signal: AbortSignal | undefined): { reason?: unknown } | undefined {
|
||||
if (!signal?.aborted) {
|
||||
return undefined;
|
||||
}
|
||||
return { reason: serializeAbortReason(signal.reason) };
|
||||
}
|
||||
|
||||
type ParentVisibleAgentEvent = Parameters<typeof emitAgentEvent>[0];
|
||||
type CallbackAgentEvent = Parameters<RunAgentAttemptParams["onAgentEvent"]>[0];
|
||||
|
||||
function readNonEmptyString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim() ? value : undefined;
|
||||
}
|
||||
|
||||
function toParentVisibleAgentEvent(
|
||||
params: RunAgentAttemptParams,
|
||||
event: AgentWorkerToParentMessage & { type: "agentEvent"; origin: "runtime" },
|
||||
): ParentVisibleAgentEvent {
|
||||
const runId = readNonEmptyString(event.event.runId) ?? params.runId;
|
||||
const sessionKey =
|
||||
readNonEmptyString(event.event.sessionKey) ??
|
||||
(runId === params.runId ? params.sessionKey : undefined);
|
||||
return {
|
||||
runId,
|
||||
stream: event.event.stream,
|
||||
data: event.event.data,
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function toCallbackAgentEvent(event: ParentVisibleAgentEvent): CallbackAgentEvent {
|
||||
return {
|
||||
stream: event.stream,
|
||||
data: event.data,
|
||||
...(event.sessionKey ? { sessionKey: event.sessionKey } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function isActiveAttemptEvent(
|
||||
params: RunAgentAttemptParams,
|
||||
event: ParentVisibleAgentEvent,
|
||||
): boolean {
|
||||
return event.runId === params.runId;
|
||||
}
|
||||
|
||||
function stripWorkerCallbacks(params: RunAgentAttemptParams): AgentRuntimeWorkerRunParams {
|
||||
const unsupported: string[] = [];
|
||||
if (params.opts.abortSignal && typeof params.opts.abortSignal !== "object") {
|
||||
unsupported.push("opts.abortSignal");
|
||||
}
|
||||
|
||||
if (unsupported.length > 0) {
|
||||
throw new AgentWorkerUnsupportedParamsError(unsupported);
|
||||
}
|
||||
|
||||
const {
|
||||
onAgentEvent: _onAgentEvent,
|
||||
onUserMessagePersisted: _onUserMessagePersisted,
|
||||
...rest
|
||||
} = params;
|
||||
const { abortSignal: _abortSignal, ...opts } = params.opts;
|
||||
return { ...rest, opts };
|
||||
}
|
||||
|
||||
function syncParentSessionStoreFromDisk(params: RunAgentAttemptParams): void {
|
||||
if (!params.sessionStore || !params.storePath || !fs.existsSync(params.storePath)) {
|
||||
return;
|
||||
}
|
||||
const latest = loadSessionStore(params.storePath, { skipCache: true, clone: false });
|
||||
const mutableStore = params.sessionStore as Record<string, SessionEntry>;
|
||||
for (const key of Object.keys(mutableStore)) {
|
||||
delete mutableStore[key];
|
||||
}
|
||||
Object.assign(mutableStore, latest);
|
||||
}
|
||||
|
||||
export async function runAgentAttemptInWorker(
|
||||
params: RunAgentAttemptParams,
|
||||
options: RunAgentAttemptInWorkerOptions = {},
|
||||
): Promise<RunAgentAttemptResult> {
|
||||
const workerParams = stripWorkerCallbacks(params);
|
||||
const worker = new Worker(options.workerUrl ?? resolveWorkerUrl(), {
|
||||
execArgv:
|
||||
(options.usePermissions ?? shouldUseWorkerPermissions(params.cfg))
|
||||
? [
|
||||
...(options.execArgv ?? process.execArgv),
|
||||
...buildAgentWorkerPermissionExecArgv({
|
||||
workspaceDir: params.workspaceDir,
|
||||
agentDir: params.agentDir,
|
||||
sessionFile: params.sessionFile,
|
||||
storePath: params.storePath,
|
||||
}),
|
||||
]
|
||||
: (options.execArgv ?? process.execArgv),
|
||||
name: `openclaw-agent-runtime:${params.sessionAgentId}:${params.sessionId}`,
|
||||
});
|
||||
|
||||
let settled = false;
|
||||
const cleanup = () => {
|
||||
params.opts.abortSignal?.removeEventListener("abort", abort);
|
||||
if (!settled) {
|
||||
void worker.terminate();
|
||||
}
|
||||
};
|
||||
const abort = () => {
|
||||
// oxlint-disable unicorn/require-post-message-target-origin -- worker_threads Worker has no targetOrigin.
|
||||
worker.postMessage({
|
||||
type: "abort",
|
||||
reason: serializeAbortReason(params.opts.abortSignal?.reason),
|
||||
});
|
||||
// oxlint-enable unicorn/require-post-message-target-origin
|
||||
};
|
||||
|
||||
return await new Promise<RunAgentAttemptResult>((resolve, reject) => {
|
||||
worker.once("error", (error) => {
|
||||
settled = true;
|
||||
cleanup();
|
||||
try {
|
||||
syncParentSessionStoreFromDisk(params);
|
||||
reject(error);
|
||||
} catch (syncError) {
|
||||
reject(syncError);
|
||||
}
|
||||
});
|
||||
worker.once("exit", (code) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
cleanup();
|
||||
try {
|
||||
syncParentSessionStoreFromDisk(params);
|
||||
reject(new Error(`Agent runtime worker exited before completing run (code ${code})`));
|
||||
} catch (syncError) {
|
||||
reject(syncError);
|
||||
}
|
||||
});
|
||||
worker.on("message", (message: AgentWorkerToParentMessage) => {
|
||||
if (message.type === "agentEvent") {
|
||||
if (message.origin === "runtime") {
|
||||
const event = toParentVisibleAgentEvent(params, message);
|
||||
emitAgentEvent(event);
|
||||
if (isActiveAttemptEvent(params, event)) {
|
||||
params.onAgentEvent(toCallbackAgentEvent(event));
|
||||
}
|
||||
} else {
|
||||
params.onAgentEvent(message.event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (message.type === "userMessagePersisted") {
|
||||
params.onUserMessagePersisted?.(message.message);
|
||||
return;
|
||||
}
|
||||
if (message.type === "result") {
|
||||
settled = true;
|
||||
cleanup();
|
||||
try {
|
||||
syncParentSessionStoreFromDisk(params);
|
||||
resolve(message.result);
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
} finally {
|
||||
void worker.terminate();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (message.type === "error") {
|
||||
settled = true;
|
||||
cleanup();
|
||||
try {
|
||||
syncParentSessionStoreFromDisk(params);
|
||||
reject(deserializeWorkerError(message));
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
} finally {
|
||||
void worker.terminate();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
params.opts.abortSignal?.addEventListener("abort", abort, { once: true });
|
||||
try {
|
||||
// oxlint-disable unicorn/require-post-message-target-origin -- worker_threads Worker has no targetOrigin.
|
||||
worker.postMessage({
|
||||
type: "run",
|
||||
params: workerParams,
|
||||
initialAbort: serializeInitialAbort(params.opts.abortSignal),
|
||||
});
|
||||
// oxlint-enable unicorn/require-post-message-target-origin
|
||||
} catch (error) {
|
||||
settled = true;
|
||||
cleanup();
|
||||
void worker.terminate();
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
}
|
||||
45
src/agents/worker-runtime/agent-runtime.types.ts
Normal file
45
src/agents/worker-runtime/agent-runtime.types.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import type { AgentEventPayload } from "../../infra/agent-events.js";
|
||||
import type { runAgentAttempt, RunAgentAttemptParams } from "../command/attempt-execution.js";
|
||||
|
||||
export type AgentRuntimeWorkerRunParams = Omit<
|
||||
RunAgentAttemptParams,
|
||||
"onAgentEvent" | "onUserMessagePersisted" | "opts"
|
||||
> & {
|
||||
opts: Omit<RunAgentAttemptParams["opts"], "abortSignal">;
|
||||
};
|
||||
|
||||
export type RunAgentAttemptResult = Awaited<ReturnType<typeof runAgentAttempt>>;
|
||||
|
||||
export type SerializedWorkerError = {
|
||||
name?: string;
|
||||
message: string;
|
||||
stack?: string;
|
||||
code?: string;
|
||||
control?: {
|
||||
type: "liveSessionModelSwitch";
|
||||
provider: string;
|
||||
model: string;
|
||||
authProfileId?: string;
|
||||
authProfileIdSource?: "auto" | "user";
|
||||
};
|
||||
};
|
||||
|
||||
export type AgentWorkerToParentMessage =
|
||||
| {
|
||||
type: "agentEvent";
|
||||
origin: "callback";
|
||||
event: { stream: string; data?: Record<string, unknown>; sessionKey?: string };
|
||||
}
|
||||
| {
|
||||
type: "agentEvent";
|
||||
origin: "runtime";
|
||||
event: AgentEventPayload;
|
||||
}
|
||||
| { type: "userMessagePersisted"; message: Extract<AgentMessage, { role: "user" }> }
|
||||
| { type: "result"; result: RunAgentAttemptResult }
|
||||
| { type: "error"; error: SerializedWorkerError };
|
||||
|
||||
export type ParentToAgentWorkerMessage =
|
||||
| { type: "run"; params: AgentRuntimeWorkerRunParams; initialAbort?: { reason?: unknown } }
|
||||
| { type: "abort"; reason?: unknown };
|
||||
66
src/agents/worker-runtime/agent-runtime.worker.ts
Normal file
66
src/agents/worker-runtime/agent-runtime.worker.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { parentPort } from "node:worker_threads";
|
||||
import { onAgentEvent } from "../../infra/agent-events.js";
|
||||
import { runAgentAttempt } from "../command/attempt-execution.js";
|
||||
import type {
|
||||
AgentWorkerToParentMessage,
|
||||
ParentToAgentWorkerMessage,
|
||||
} from "./agent-runtime.types.js";
|
||||
import { serializeWorkerError } from "./errors.js";
|
||||
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
|
||||
|
||||
function post(message: AgentWorkerToParentMessage): void {
|
||||
// oxlint-disable-next-line unicorn/require-post-message-target-origin -- worker_threads MessagePort has no targetOrigin.
|
||||
parentPort?.postMessage(message);
|
||||
}
|
||||
|
||||
let abortController: AbortController | undefined;
|
||||
|
||||
parentPort?.on("message", (message: ParentToAgentWorkerMessage) => {
|
||||
if (message.type === "abort") {
|
||||
abortController?.abort(message.reason);
|
||||
return;
|
||||
}
|
||||
|
||||
if (message.type !== "run") {
|
||||
return;
|
||||
}
|
||||
|
||||
abortController = new AbortController();
|
||||
if (message.initialAbort) {
|
||||
abortController.abort(message.initialAbort.reason);
|
||||
}
|
||||
const stopRuntimeEventBridge = onAgentEvent((event) => {
|
||||
post({ type: "agentEvent", origin: "runtime", event });
|
||||
});
|
||||
try {
|
||||
restoreAgentWorkerPluginRuntime(message.params);
|
||||
} catch (error: unknown) {
|
||||
post(serializeWorkerError(error));
|
||||
stopRuntimeEventBridge();
|
||||
abortController = undefined;
|
||||
return;
|
||||
}
|
||||
void runAgentAttempt({
|
||||
...message.params,
|
||||
opts: {
|
||||
...message.params.opts,
|
||||
abortSignal: abortController.signal,
|
||||
},
|
||||
onAgentEvent: (event) => {
|
||||
post({ type: "agentEvent", origin: "callback", event });
|
||||
},
|
||||
onUserMessagePersisted: (persisted) => {
|
||||
post({ type: "userMessagePersisted", message: persisted });
|
||||
},
|
||||
})
|
||||
.then((result) => {
|
||||
post({ type: "result", result });
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
post(serializeWorkerError(error));
|
||||
})
|
||||
.finally(() => {
|
||||
stopRuntimeEventBridge();
|
||||
abortController = undefined;
|
||||
});
|
||||
});
|
||||
67
src/agents/worker-runtime/errors.ts
Normal file
67
src/agents/worker-runtime/errors.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { LiveSessionModelSwitchError } from "../live-model-switch-error.js";
|
||||
import type { AgentWorkerToParentMessage, SerializedWorkerError } from "./agent-runtime.types.js";
|
||||
|
||||
function serializeControlError(error: Error): Pick<SerializedWorkerError, "control"> | undefined {
|
||||
if (error instanceof LiveSessionModelSwitchError) {
|
||||
return {
|
||||
control: {
|
||||
type: "liveSessionModelSwitch",
|
||||
provider: error.provider,
|
||||
model: error.model,
|
||||
...(error.authProfileId ? { authProfileId: error.authProfileId } : {}),
|
||||
...(error.authProfileIdSource ? { authProfileIdSource: error.authProfileIdSource } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function deserializeControlError(error: SerializedWorkerError): Error | undefined {
|
||||
if (error.control?.type === "liveSessionModelSwitch") {
|
||||
return new LiveSessionModelSwitchError({
|
||||
provider: error.control.provider,
|
||||
model: error.control.model,
|
||||
...(error.control.authProfileId ? { authProfileId: error.control.authProfileId } : {}),
|
||||
...(error.control.authProfileIdSource
|
||||
? { authProfileIdSource: error.control.authProfileIdSource }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function serializeWorkerError(error: unknown): AgentWorkerToParentMessage {
|
||||
if (error instanceof Error) {
|
||||
const code =
|
||||
typeof (error as Error & { code?: unknown }).code === "string"
|
||||
? (error as Error & { code: string }).code
|
||||
: undefined;
|
||||
return {
|
||||
type: "error",
|
||||
error: {
|
||||
name: error.name,
|
||||
message: error.message,
|
||||
stack: error.stack,
|
||||
...(code ? { code } : {}),
|
||||
...serializeControlError(error),
|
||||
},
|
||||
};
|
||||
}
|
||||
return { type: "error", error: { message: String(error) } };
|
||||
}
|
||||
|
||||
export function deserializeWorkerError(
|
||||
message: AgentWorkerToParentMessage & { type: "error" },
|
||||
): Error {
|
||||
const error = deserializeControlError(message.error) ?? new Error(message.error.message);
|
||||
if (!message.error.control) {
|
||||
error.name = message.error.name ?? "AgentWorkerError";
|
||||
}
|
||||
if (message.error.stack) {
|
||||
error.stack = message.error.stack;
|
||||
}
|
||||
if (message.error.code) {
|
||||
(error as Error & { code?: string }).code = message.error.code;
|
||||
}
|
||||
return error;
|
||||
}
|
||||
46
src/agents/worker-runtime/permissions.test.ts
Normal file
46
src/agents/worker-runtime/permissions.test.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { buildAgentWorkerPermissionExecArgv } from "./permissions.js";
|
||||
|
||||
describe("agent worker permissions", () => {
|
||||
it("builds deterministic Node permission flags for an agent worker", () => {
|
||||
const workspaceDir = path.resolve("/tmp/openclaw-worker/workspace");
|
||||
const agentDir = path.resolve("/tmp/openclaw-worker/agent");
|
||||
const sessionFile = path.resolve("/tmp/openclaw-worker/agent/session.jsonl");
|
||||
const storePath = path.resolve("/tmp/openclaw-worker/state/sessions.json");
|
||||
|
||||
const args = buildAgentWorkerPermissionExecArgv({
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
sessionFile,
|
||||
storePath,
|
||||
readRoots: [workspaceDir],
|
||||
writeRoots: [path.join(workspaceDir, "out/*")],
|
||||
});
|
||||
|
||||
expect(args[0]).toBe("--permission");
|
||||
expect(args).toContain(`--allow-fs-read=${workspaceDir}`);
|
||||
expect(args).toContain(`--allow-fs-read=${workspaceDir}/*`);
|
||||
expect(args).toContain(`--allow-fs-read=${agentDir}/*`);
|
||||
expect(args).toContain(`--allow-fs-read=${sessionFile}`);
|
||||
expect(args).toContain(`--allow-fs-read=${sessionFile}.lock`);
|
||||
expect(args).toContain(`--allow-fs-read=${storePath}`);
|
||||
expect(args).toContain(`--allow-fs-read=${storePath}.lock`);
|
||||
expect(args).toContain(`--allow-fs-read=${path.resolve("src/*")}`);
|
||||
expect(args).toContain(`--allow-fs-read=${path.resolve("extensions/*")}`);
|
||||
expect(args).toContain(`--allow-fs-write=${workspaceDir}/*`);
|
||||
expect(args).toContain(`--allow-fs-write=${path.join(workspaceDir, "out/*")}`);
|
||||
expect(args).toContain(`--allow-fs-write=${agentDir}/*`);
|
||||
expect(args).toContain(`--allow-fs-write=${sessionFile}`);
|
||||
expect(args).toContain(`--allow-fs-write=${sessionFile}.lock`);
|
||||
expect(args).toContain(`--allow-fs-write=${path.dirname(sessionFile)}/*`);
|
||||
expect(args).toContain(`--allow-fs-write=${storePath}`);
|
||||
expect(args).toContain(`--allow-fs-write=${storePath}.lock`);
|
||||
expect(args).toContain(`--allow-fs-write=${path.dirname(storePath)}/*`);
|
||||
expect(args.filter((arg) => arg === "--permission")).toHaveLength(1);
|
||||
const firstWriteArg = args.findIndex((arg) => arg.startsWith("--allow-fs-write="));
|
||||
const lastReadArg = args.findLastIndex((arg) => arg.startsWith("--allow-fs-read="));
|
||||
expect(firstWriteArg).toBeGreaterThan(0);
|
||||
expect(lastReadArg).toBeLessThan(firstWriteArg);
|
||||
});
|
||||
});
|
||||
102
src/agents/worker-runtime/permissions.ts
Normal file
102
src/agents/worker-runtime/permissions.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import { basename, dirname, resolve } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
type AgentWorkerPermissionRoots = {
|
||||
workspaceDir: string;
|
||||
agentDir?: string;
|
||||
sessionFile?: string;
|
||||
storePath?: string;
|
||||
readRoots?: string[];
|
||||
writeRoots?: string[];
|
||||
};
|
||||
|
||||
function normalizeRoot(path: string | undefined): string | undefined {
|
||||
const trimmed = path?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return resolve(trimmed);
|
||||
}
|
||||
|
||||
function addRoot(target: Set<string>, path: string | undefined): void {
|
||||
const normalized = normalizeRoot(path);
|
||||
if (normalized) {
|
||||
target.add(normalized);
|
||||
}
|
||||
}
|
||||
|
||||
function addMutableFileRoots(params: {
|
||||
readRoots: Set<string>;
|
||||
writeRoots: Set<string>;
|
||||
filePath: string | undefined;
|
||||
}): void {
|
||||
const filePath = normalizeRoot(params.filePath);
|
||||
if (!filePath) {
|
||||
return;
|
||||
}
|
||||
const lockPath = `${filePath}.lock`;
|
||||
addRoot(params.readRoots, filePath);
|
||||
addRoot(params.readRoots, lockPath);
|
||||
addRoot(params.writeRoots, filePath);
|
||||
addRoot(params.writeRoots, lockPath);
|
||||
addRoot(params.writeRoots, `${dirname(filePath)}/*`);
|
||||
}
|
||||
|
||||
function addNodeModuleReadRoots(target: Set<string>): void {
|
||||
let current = dirname(fileURLToPath(import.meta.url));
|
||||
let previous = "";
|
||||
while (current !== previous) {
|
||||
addRoot(target, `${current}/node_modules/*`);
|
||||
previous = current;
|
||||
current = dirname(current);
|
||||
}
|
||||
}
|
||||
|
||||
function addRuntimeReadRoots(target: Set<string>): void {
|
||||
let current = dirname(fileURLToPath(import.meta.url));
|
||||
let previous = "";
|
||||
while (current !== previous) {
|
||||
const name = basename(current);
|
||||
if (name === "dist" || name === "src") {
|
||||
addRoot(target, `${current}/*`);
|
||||
}
|
||||
if (name === "src") {
|
||||
addRoot(target, `${dirname(current)}/extensions/*`);
|
||||
}
|
||||
previous = current;
|
||||
current = dirname(current);
|
||||
}
|
||||
}
|
||||
|
||||
export function buildAgentWorkerPermissionExecArgv(roots: AgentWorkerPermissionRoots): string[] {
|
||||
const readRoots = new Set<string>();
|
||||
const writeRoots = new Set<string>();
|
||||
|
||||
addRoot(readRoots, `${roots.workspaceDir}/*`);
|
||||
addRoot(writeRoots, `${roots.workspaceDir}/*`);
|
||||
|
||||
addRoot(readRoots, roots.agentDir ? `${roots.agentDir}/*` : undefined);
|
||||
addRoot(writeRoots, roots.agentDir ? `${roots.agentDir}/*` : undefined);
|
||||
|
||||
addMutableFileRoots({ readRoots, writeRoots, filePath: roots.sessionFile });
|
||||
addMutableFileRoots({ readRoots, writeRoots, filePath: roots.storePath });
|
||||
|
||||
for (const root of roots.readRoots ?? []) {
|
||||
addRoot(readRoots, root);
|
||||
}
|
||||
for (const root of roots.writeRoots ?? []) {
|
||||
addRoot(writeRoots, root);
|
||||
}
|
||||
|
||||
addNodeModuleReadRoots(readRoots);
|
||||
addRuntimeReadRoots(readRoots);
|
||||
|
||||
const args = ["--permission"];
|
||||
for (const root of [...readRoots].toSorted()) {
|
||||
args.push(`--allow-fs-read=${root}`);
|
||||
}
|
||||
for (const root of [...writeRoots].toSorted()) {
|
||||
args.push(`--allow-fs-write=${root}`);
|
||||
}
|
||||
return args;
|
||||
}
|
||||
58
src/agents/worker-runtime/plugin-runtime.test.ts
Normal file
58
src/agents/worker-runtime/plugin-runtime.test.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
|
||||
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
|
||||
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
|
||||
|
||||
vi.mock("../runtime-plugins.js", () => ({
|
||||
ensureRuntimePluginsLoaded: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockedEnsureRuntimePluginsLoaded = vi.mocked(ensureRuntimePluginsLoaded);
|
||||
|
||||
function makeParams(): AgentRuntimeWorkerRunParams {
|
||||
return {
|
||||
providerOverride: "openai",
|
||||
originalProvider: "openai",
|
||||
modelOverride: "gpt-5.5",
|
||||
cfg: { plugins: { entries: { demo: { enabled: true } } } } as OpenClawConfig,
|
||||
sessionEntry: undefined,
|
||||
sessionId: "session-worker-test",
|
||||
sessionKey: "agent:main:worker-test",
|
||||
sessionAgentId: "main",
|
||||
sessionFile: "/tmp/openclaw-worker-session.jsonl",
|
||||
workspaceDir: "/tmp/openclaw-worker-workspace",
|
||||
body: "hello",
|
||||
isFallbackRetry: false,
|
||||
resolvedThinkLevel: "medium",
|
||||
timeoutMs: 1_000,
|
||||
runId: "run-worker-test",
|
||||
opts: { message: "hello", senderIsOwner: false },
|
||||
runContext: {} as AgentRuntimeWorkerRunParams["runContext"],
|
||||
spawnedBy: undefined,
|
||||
messageChannel: undefined,
|
||||
skillsSnapshot: undefined,
|
||||
resolvedVerboseLevel: undefined,
|
||||
agentDir: "/tmp/openclaw-worker-agent",
|
||||
authProfileProvider: "openai",
|
||||
sessionHasHistory: false,
|
||||
};
|
||||
}
|
||||
|
||||
describe("agent worker plugin runtime", () => {
|
||||
beforeEach(() => {
|
||||
mockedEnsureRuntimePluginsLoaded.mockClear();
|
||||
});
|
||||
|
||||
it("restores gateway-bindable runtime plugins before worker attempts", () => {
|
||||
const params = makeParams();
|
||||
|
||||
restoreAgentWorkerPluginRuntime(params);
|
||||
|
||||
expect(mockedEnsureRuntimePluginsLoaded).toHaveBeenCalledWith({
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
10
src/agents/worker-runtime/plugin-runtime.ts
Normal file
10
src/agents/worker-runtime/plugin-runtime.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
|
||||
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
|
||||
|
||||
export function restoreAgentWorkerPluginRuntime(params: AgentRuntimeWorkerRunParams): void {
|
||||
ensureRuntimePluginsLoaded({
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
allowGatewaySubagentBinding: true,
|
||||
});
|
||||
}
|
||||
@@ -980,6 +980,12 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Experimental agent-default flags. Keep these off unless you are intentionally testing a preview surface.",
|
||||
"agents.defaults.experimental.localModelLean":
|
||||
"Experimental local-model prompt trim. When enabled, OpenClaw drops heavyweight default tools like browser, cron, and message for weaker or smaller local-model backends.",
|
||||
"agents.defaults.experimental.runtimeIsolation":
|
||||
'Experimental command-agent runtime isolation. Set mode="worker" to run /agent command attempts inside a Node worker thread; gateway reply runs remain in-process while their streaming callbacks stay parent-owned.',
|
||||
"agents.defaults.experimental.runtimeIsolation.mode":
|
||||
'Choose the experimental command-agent runtime isolation mode. "off" keeps the current in-process path; "worker" runs /agent command attempts in Node worker threads.',
|
||||
"agents.defaults.experimental.runtimeIsolation.permissions":
|
||||
"Also enable Node permission flags for the worker runtime. More restrictive and runtime-sensitive than worker isolation itself.",
|
||||
"agents.defaults.bootstrapPromptTruncationWarning":
|
||||
'Inject agent-visible warning text when bootstrap files are truncated: "off", "once" (default), or "always".',
|
||||
"agents.defaults.startupContext":
|
||||
|
||||
@@ -393,6 +393,10 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"agents.defaults.bootstrapTotalMaxChars": "Bootstrap Total Max Chars",
|
||||
"agents.defaults.experimental": "Experimental Agent Flags",
|
||||
"agents.defaults.experimental.localModelLean": "Enable Lean Local Model Mode (Experimental)",
|
||||
"agents.defaults.experimental.runtimeIsolation": "Command Runtime Isolation (Experimental)",
|
||||
"agents.defaults.experimental.runtimeIsolation.mode": "Command Runtime Isolation Mode",
|
||||
"agents.defaults.experimental.runtimeIsolation.permissions":
|
||||
"Command Runtime Isolation Permissions",
|
||||
"agents.defaults.bootstrapPromptTruncationWarning": "Bootstrap Prompt Truncation Warning",
|
||||
"agents.defaults.startupContext": "Startup Context",
|
||||
"agents.defaults.startupContext.enabled": "Enable Startup Context",
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
import { execFile } from "node:child_process";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { promisify } from "node:util";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
getSessionStoreWriterQueueSizeForTest,
|
||||
loadSessionStore,
|
||||
withSessionStoreWriterForTest,
|
||||
} from "./store.js";
|
||||
|
||||
@@ -15,6 +22,9 @@ const createDeferred = <T>() => {
|
||||
return { promise, resolve, reject };
|
||||
};
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
const testDir = path.dirname(fileURLToPath(import.meta.url));
|
||||
|
||||
describe("session store writer", () => {
|
||||
afterEach(() => {
|
||||
clearSessionStoreCacheForTest();
|
||||
@@ -53,4 +63,45 @@ describe("session store writer", () => {
|
||||
);
|
||||
expect(getSessionStoreWriterQueueSizeForTest()).toBe(0);
|
||||
});
|
||||
|
||||
it("serializes session store updates across worker threads", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-store-writer-"));
|
||||
try {
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
await fs.writeFile(storePath, "{}\n", "utf8");
|
||||
const source = `
|
||||
const { updateSessionStore } = await import("./src/config/sessions/store.ts");
|
||||
|
||||
await updateSessionStore(process.env.OPENCLAW_TEST_STORE_PATH, async (store) => {
|
||||
store[process.env.OPENCLAW_TEST_SESSION_KEY] = {
|
||||
sessionId: process.env.OPENCLAW_TEST_SESSION_KEY,
|
||||
updatedAt: Date.now()
|
||||
};
|
||||
await new Promise((resolve) => setTimeout(resolve, 75));
|
||||
});
|
||||
`;
|
||||
const workers = Array.from({ length: 4 }, (_, index) =>
|
||||
execFileAsync(process.execPath, ["--import", "tsx", "--eval", source], {
|
||||
cwd: path.resolve(testDir, "../../.."),
|
||||
env: {
|
||||
...process.env,
|
||||
OPENCLAW_TEST_SESSION_KEY: `agent:worker:${index}`,
|
||||
OPENCLAW_TEST_STORE_PATH: storePath,
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await Promise.all(workers);
|
||||
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(Object.keys(store).toSorted()).toEqual([
|
||||
"agent:worker:0",
|
||||
"agent:worker:1",
|
||||
"agent:worker:2",
|
||||
"agent:worker:3",
|
||||
]);
|
||||
} finally {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,9 +1,27 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { threadId } from "node:worker_threads";
|
||||
import {
|
||||
WRITER_QUEUES,
|
||||
type SessionStoreWriterQueue,
|
||||
type SessionStoreWriterTask,
|
||||
} from "./store-writer-state.js";
|
||||
|
||||
const DEFAULT_FILE_LOCK_TIMEOUT_MS = 60_000;
|
||||
const DEFAULT_FILE_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
const DEFAULT_FILE_LOCK_POLL_MS = 25;
|
||||
const ORPHAN_LOCK_PAYLOAD_GRACE_MS = 5_000;
|
||||
|
||||
type SessionStoreLockPayload = {
|
||||
pid?: number;
|
||||
threadId?: number;
|
||||
hostname?: string;
|
||||
createdAt?: string;
|
||||
token?: string;
|
||||
};
|
||||
|
||||
export async function withSessionStoreWriterForTest<T>(
|
||||
storePath: string,
|
||||
fn: () => Promise<T>,
|
||||
@@ -11,6 +29,138 @@ export async function withSessionStoreWriterForTest<T>(
|
||||
return await runExclusiveSessionStoreWrite(storePath, fn);
|
||||
}
|
||||
|
||||
function resolvePositiveMs(value: string | undefined, fallback: number): number {
|
||||
const parsed = value ? Number(value) : Number.NaN;
|
||||
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function resolveSessionStoreLockPath(storePath: string): string {
|
||||
return `${path.resolve(storePath)}.lock`;
|
||||
}
|
||||
|
||||
function getErrorCode(error: unknown): string | undefined {
|
||||
return error && typeof error === "object" && "code" in error
|
||||
? String((error as { code?: unknown }).code)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
async function readSessionStoreLockPayload(
|
||||
lockPath: string,
|
||||
): Promise<SessionStoreLockPayload | null> {
|
||||
try {
|
||||
return JSON.parse(await fs.readFile(lockPath, "utf8")) as SessionStoreLockPayload;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function isSessionStoreLockStale(
|
||||
lockPath: string,
|
||||
payload: SessionStoreLockPayload | null,
|
||||
staleMs: number,
|
||||
): Promise<boolean> {
|
||||
if (!payload?.createdAt) {
|
||||
const stat = await fs.stat(lockPath).catch(() => null);
|
||||
if (!stat) {
|
||||
return true;
|
||||
}
|
||||
return Date.now() - stat.mtimeMs > ORPHAN_LOCK_PAYLOAD_GRACE_MS;
|
||||
}
|
||||
const createdAtMs = Date.parse(payload.createdAt);
|
||||
if (!Number.isFinite(createdAtMs)) {
|
||||
const stat = await fs.stat(lockPath).catch(() => null);
|
||||
if (!stat) {
|
||||
return true;
|
||||
}
|
||||
return Date.now() - stat.mtimeMs > ORPHAN_LOCK_PAYLOAD_GRACE_MS;
|
||||
}
|
||||
return Date.now() - createdAtMs > staleMs;
|
||||
}
|
||||
|
||||
async function removeStaleSessionStoreLock(
|
||||
lockPath: string,
|
||||
expectedToken: string | undefined,
|
||||
): Promise<void> {
|
||||
if (!expectedToken) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
return;
|
||||
}
|
||||
const current = await readSessionStoreLockPayload(lockPath);
|
||||
if (current?.token === expectedToken) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
}
|
||||
}
|
||||
|
||||
async function acquireSessionStoreFileLock(storePath: string): Promise<() => Promise<void>> {
|
||||
const lockPath = resolveSessionStoreLockPath(storePath);
|
||||
const timeoutMs = resolvePositiveMs(
|
||||
process.env.OPENCLAW_SESSION_STORE_LOCK_TIMEOUT_MS,
|
||||
DEFAULT_FILE_LOCK_TIMEOUT_MS,
|
||||
);
|
||||
const staleMs = resolvePositiveMs(
|
||||
process.env.OPENCLAW_SESSION_STORE_LOCK_STALE_MS,
|
||||
DEFAULT_FILE_LOCK_STALE_MS,
|
||||
);
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
const payload: SessionStoreLockPayload = {
|
||||
pid: process.pid,
|
||||
threadId,
|
||||
hostname: os.hostname(),
|
||||
createdAt: new Date().toISOString(),
|
||||
token: randomUUID(),
|
||||
};
|
||||
|
||||
await fs.mkdir(path.dirname(lockPath), { recursive: true });
|
||||
for (;;) {
|
||||
let handle: fs.FileHandle | undefined;
|
||||
try {
|
||||
handle = await fs.open(lockPath, "wx", 0o600);
|
||||
await handle.writeFile(`${JSON.stringify(payload)}\n`, "utf8");
|
||||
await handle.close();
|
||||
let released = false;
|
||||
return async () => {
|
||||
if (released) {
|
||||
return;
|
||||
}
|
||||
released = true;
|
||||
await removeStaleSessionStoreLock(lockPath, payload.token).catch(() => undefined);
|
||||
};
|
||||
} catch (error) {
|
||||
await handle?.close().catch(() => undefined);
|
||||
if (getErrorCode(error) !== "EEXIST") {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const existing = await readSessionStoreLockPayload(lockPath);
|
||||
if (await isSessionStoreLockStale(lockPath, existing, staleMs)) {
|
||||
await removeStaleSessionStoreLock(lockPath, existing?.token).catch(() => undefined);
|
||||
continue;
|
||||
}
|
||||
if (Date.now() >= deadline) {
|
||||
throw new Error(
|
||||
`Timed out waiting for session store writer lock: ${lockPath} ` +
|
||||
`(owner pid=${existing?.pid ?? "unknown"} thread=${existing?.threadId ?? "unknown"})`,
|
||||
{ cause: error },
|
||||
);
|
||||
}
|
||||
await sleep(DEFAULT_FILE_LOCK_POLL_MS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function runWithSessionStoreFileLock<T>(storePath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const release = await acquireSessionStoreFileLock(storePath);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
function getOrCreateWriterQueue(storePath: string): SessionStoreWriterQueue {
|
||||
const existing = WRITER_QUEUES.get(storePath);
|
||||
if (existing) {
|
||||
@@ -43,7 +193,7 @@ async function drainSessionStoreWriterQueue(storePath: string): Promise<void> {
|
||||
let failed: unknown;
|
||||
let hasFailure = false;
|
||||
try {
|
||||
result = await task.fn();
|
||||
result = await runWithSessionStoreFileLock(storePath, task.fn);
|
||||
} catch (err) {
|
||||
hasFailure = true;
|
||||
failed = err;
|
||||
|
||||
@@ -92,6 +92,21 @@ export type AgentContextLimitsConfig = {
|
||||
postCompactionMaxChars?: number;
|
||||
};
|
||||
|
||||
export type AgentRuntimeIsolationExperimentalConfig = {
|
||||
/**
|
||||
* Run /agent command attempts inside an isolated Node worker thread.
|
||||
* Experimental preview only; gateway reply runs stay in-process until their
|
||||
* streaming callbacks have a dedicated worker bridge, and CLI harnesses may
|
||||
* still spawn their own child processes.
|
||||
*/
|
||||
mode?: "off" | "worker";
|
||||
/**
|
||||
* Add Node permission flags to the worker process. This is stricter and more
|
||||
* runtime-sensitive than worker isolation itself.
|
||||
*/
|
||||
permissions?: boolean;
|
||||
};
|
||||
|
||||
export type CliBackendConfig = {
|
||||
/** CLI command to execute (absolute path or on PATH). */
|
||||
command: string;
|
||||
@@ -258,6 +273,12 @@ export type AgentDefaultsConfig = {
|
||||
* model backends. Experimental preview only.
|
||||
*/
|
||||
localModelLean?: boolean;
|
||||
/**
|
||||
* Run /agent command attempts through an isolated Node worker runtime.
|
||||
* Gateway reply runs remain in-process until their streaming callbacks have
|
||||
* a dedicated worker bridge. Experimental preview only.
|
||||
*/
|
||||
runtimeIsolation?: AgentRuntimeIsolationExperimentalConfig;
|
||||
};
|
||||
/**
|
||||
* Agent-visible bootstrap truncation warning mode:
|
||||
|
||||
@@ -64,6 +64,21 @@ describe("agent defaults schema", () => {
|
||||
expect(result.experimental?.localModelLean).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts experimental.runtimeIsolation", () => {
|
||||
const result = AgentDefaultsSchema.parse({
|
||||
experimental: {
|
||||
runtimeIsolation: {
|
||||
mode: "worker",
|
||||
permissions: true,
|
||||
},
|
||||
},
|
||||
})!;
|
||||
expect(result.experimental?.runtimeIsolation).toEqual({
|
||||
mode: "worker",
|
||||
permissions: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("accepts contextInjection: always", () => {
|
||||
const result = AgentDefaultsSchema.parse({ contextInjection: "always" })!;
|
||||
expect(result.contextInjection).toBe("always");
|
||||
|
||||
@@ -105,6 +105,13 @@ export const AgentDefaultsSchema = z
|
||||
experimental: z
|
||||
.object({
|
||||
localModelLean: z.boolean().optional(),
|
||||
runtimeIsolation: z
|
||||
.object({
|
||||
mode: z.union([z.literal("off"), z.literal("worker")]).optional(),
|
||||
permissions: z.boolean().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { registerLegacyContextEngine } from "./legacy.registration.js";
|
||||
import { getContextEngineFactory } from "./registry.js";
|
||||
|
||||
/**
|
||||
* Ensures all built-in context engines are registered exactly once.
|
||||
* Ensures all built-in context engines are registered.
|
||||
*
|
||||
* The legacy engine is always registered as a safe fallback so that
|
||||
* `resolveContextEngine()` can resolve the default "legacy" slot without
|
||||
@@ -13,7 +14,7 @@ import { registerLegacyContextEngine } from "./legacy.registration.js";
|
||||
let initialized = false;
|
||||
|
||||
export function ensureContextEnginesInitialized(): void {
|
||||
if (initialized) {
|
||||
if (initialized && getContextEngineFactory("legacy")) {
|
||||
return;
|
||||
}
|
||||
initialized = true;
|
||||
|
||||
@@ -232,6 +232,8 @@ function buildCoreDistEntries(): Record<string, string> {
|
||||
"plugins/runtime/index": "src/plugins/runtime/index.ts",
|
||||
"llm-slug-generator": "src/hooks/llm-slug-generator.ts",
|
||||
"mcp/plugin-tools-serve": "src/mcp/plugin-tools-serve.ts",
|
||||
// Stable worker entry loaded from bundled agent runtime chunks.
|
||||
"agent-runtime.worker": "src/agents/worker-runtime/agent-runtime.worker.ts",
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user