Compare commits

8 Commits

| Author            | SHA1       | Message                                               | Date                       |
| ----------------- | ---------- | ----------------------------------------------------- | -------------------------- |
| Peter Steinberger | 883eed2e95 | fix: restore worker runtime state                     | 2026-05-06 09:22:40 +01:00 |
| Peter Steinberger | 84d793db29 | fix: preserve worker runtime control errors           | 2026-05-06 08:49:52 +01:00 |
| Peter Steinberger | 0a6e55fa0a | fix: harden agent worker runtime isolation            | 2026-05-06 07:50:44 +01:00 |
| Peter Steinberger | f74e4161eb | fix: serialize session store writes across workers    | 2026-05-06 06:22:47 +01:00 |
| Peter Steinberger | 99881ae378 | fix: initialize context engines before CLI compaction | 2026-05-06 01:56:19 +01:00 |
| Peter Steinberger | 721f8c070d | build: emit agent runtime worker entry                | 2026-05-06 01:56:18 +01:00 |
| Peter Steinberger | 745bd861ae | feat: isolate agent attempts in workers               | 2026-05-06 01:56:18 +01:00 |
| Peter Steinberger | 2e7246e70f | feat: experiment with agent worker runtime            | 2026-05-06 01:56:18 +01:00 |

24 changed files with 1552 additions and 15 deletions

@@ -21,11 +21,67 @@ Treat them differently from normal config:
## Currently documented flags
| Surface | Key | Use it when | More |
| ------------------------ | --------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------- |
| Local model runtime | `agents.defaults.experimental.localModelLean` | A smaller or stricter local backend chokes on OpenClaw's full default tool surface | [Local Models](/gateway/local-models) |
| Memory search | `agents.defaults.memorySearch.experimental.sessionMemory` | You want `memory_search` to index prior session transcripts and accept the extra storage/indexing cost | [Memory configuration reference](/reference/memory-config#session-memory-search-experimental) |
| Structured planning tool | `tools.experimental.planTool` | You want the structured `update_plan` tool exposed for multi-step work tracking in compatible runtimes and UIs | [Gateway configuration reference](/gateway/config-tools#toolsexperimental) |
| Surface | Key | Use it when | More |
| ------------------------------- | --------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------- |
| Local model runtime | `agents.defaults.experimental.localModelLean` | A smaller or stricter local backend chokes on OpenClaw's full default tool surface | [Local Models](/gateway/local-models) |
| Agent command runtime isolation | `agents.defaults.experimental.runtimeIsolation` | You want `/agent` command attempts to run in a Node worker compartment while testing parallel-agent isolation | [Agent command runtime isolation](#agent-command-runtime-isolation) |
| Memory search | `agents.defaults.memorySearch.experimental.sessionMemory` | You want `memory_search` to index prior session transcripts and accept the extra storage/indexing cost | [Memory configuration reference](/reference/memory-config#session-memory-search-experimental) |
| Structured planning tool | `tools.experimental.planTool` | You want the structured `update_plan` tool exposed for multi-step work tracking in compatible runtimes and UIs | [Gateway configuration reference](/gateway/config-tools#toolsexperimental) |
## Agent command runtime isolation
`agents.defaults.experimental.runtimeIsolation.mode: "worker"` runs `/agent`
command attempts in a Node worker thread. The parent process still owns command
routing, model fallback policy, final session-store updates, delivery, and
lifecycle reporting; the worker owns the in-repo command runtime attempt itself.
Normal inbound Gateway replies remain on the in-process embedded runner for now.
That path owns live streaming and delivery callbacks in the parent process and
needs a dedicated callback bridge before it can move into this worker
compartment.
This is a compartment boundary, not a general speed switch. It can help when
several in-repo command agents run at once and you want each run to have its own
event loop, worker lifetime, and future filesystem permission scope. It will not
make remote model calls faster, and CLI/ACP harnesses such as Codex may still
spawn their own child processes inside the worker.
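The parent/worker split described above can be pictured with a small `node:worker_threads` bridge. This is only a sketch under stated assumptions: the message shapes (`run`, `abort`, `result`, `error`) are simplified, `runAttemptInWorkerSketch` is a hypothetical name, and the worker body is an inline stand-in rather than the real runtime entry. It shows the parent posting a run request, forwarding an abort, and rebuilding an error on its side of the boundary.

```typescript
import { Worker } from "node:worker_threads";

// Hypothetical, simplified message shapes for this sketch only.
type ParentToWorker = { type: "run"; body: string } | { type: "abort"; reason: string };
type WorkerToParent =
  | { type: "result"; text: string }
  | { type: "error"; error: { name: string; message: string } };

// Inline CommonJS worker body so the sketch needs no second file.
// A body of "wait" never answers, which lets the abort path be exercised.
const workerSource = `
const { parentPort } = require("node:worker_threads");
parentPort.on("message", (message) => {
  if (message.type === "abort") {
    parentPort.postMessage({
      type: "error",
      error: { name: "AbortError", message: "aborted:" + message.reason },
    });
    return;
  }
  if (message.type === "run" && message.body !== "wait") {
    parentPort.postMessage({ type: "result", text: "worker:" + message.body });
  }
});
`;

export function runAttemptInWorkerSketch(body: string, signal?: AbortSignal): Promise<string> {
  return new Promise((resolve, reject) => {
    // Simplification: an already-aborted signal is rejected before spawning.
    if (signal?.aborted) {
      reject(new Error("aborted before start"));
      return;
    }
    const worker = new Worker(workerSource, { eval: true });
    const onAbort = () => {
      const abortMessage: ParentToWorker = { type: "abort", reason: String(signal?.reason ?? "") };
      worker.postMessage(abortMessage);
    };
    // Always detach the abort listener and stop the worker before settling.
    const finish = (settle: () => void) => {
      signal?.removeEventListener("abort", onAbort);
      void worker.terminate();
      settle();
    };
    worker.on("message", (message: WorkerToParent) => {
      if (message.type === "result") {
        finish(() => resolve(message.text));
        return;
      }
      // Errors cross the boundary as plain data and are rebuilt in the parent.
      const error = new Error(message.error.message);
      error.name = message.error.name;
      finish(() => reject(error));
    });
    worker.on("error", (error) => finish(() => reject(error)));
    signal?.addEventListener("abort", onAbort, { once: true });
    const runMessage: ParentToWorker = { type: "run", body };
    worker.postMessage(runMessage);
  });
}
```

The sketch leaves out event forwarding, timeouts, and handling of a worker that exits without replying; a real bridge would need all three.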
Session-store writes still go through the normal `updateSessionStore(...)` path.
That writer uses a `sessions.json.lock` file lock so worker-thread updates for
different agents do not overwrite each other when they share the same store.
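To illustrate why a lock file protects a shared store, here is a minimal sketch of a locked read-modify-write cycle. It is not the `updateSessionStore(...)` implementation: `withFileLockSketch`, `updateStoreSketch`, the retry count, and the 5 ms delay are all assumptions made for the example. It relies only on the fact that opening a file with the `"wx"` flag fails with `EEXIST` when the file already exists.

```typescript
import { open, readFile, rm, writeFile } from "node:fs/promises";

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: take the lock by creating the lock file exclusively,
// run the task, then release the lock even if the task throws.
async function withFileLockSketch<T>(
  lockPath: string,
  task: () => Promise<T>,
  retries = 200,
): Promise<T> {
  for (let attempt = 0; ; attempt += 1) {
    try {
      const handle = await open(lockPath, "wx"); // fails with EEXIST while another writer holds it
      await handle.close();
      break;
    } catch (error) {
      if ((error as NodeJS.ErrnoException).code !== "EEXIST" || attempt >= retries) {
        throw error;
      }
      await sleep(5); // assumed back-off; a real writer would also handle stale locks
    }
  }
  try {
    return await task();
  } finally {
    await rm(lockPath, { force: true });
  }
}

type StoreSketch = Record<string, { model: string }>;

// Read, mutate, and write the store while holding the lock, so a concurrent
// writer always sees the previous writer's result instead of a stale copy.
export async function updateStoreSketch(
  storePath: string,
  mutate: (store: StoreSketch) => void,
): Promise<void> {
  await withFileLockSketch(`${storePath}.lock`, async () => {
    // Simplification: any read failure is treated as an empty store.
    const raw = await readFile(storePath, "utf8").catch(() => "{}");
    const store = JSON.parse(raw) as StoreSketch;
    mutate(store);
    await writeFile(storePath, JSON.stringify(store, null, 2));
  });
}
```

Without the lock, two writers could both read the old file and the later write would silently drop the earlier update.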
### Enable
```json5
{
agents: {
defaults: {
experimental: {
runtimeIsolation: {
mode: "worker",
},
},
},
},
}
```
For developer-only overrides, `OPENCLAW_AGENT_RUNTIME_WORKER=1` forces the
worker path and `OPENCLAW_AGENT_RUNTIME_WORKER=0` forces the in-process path.
The older `OPENCLAW_AGENT_WORKER_EXPERIMENT` env var is also accepted while the
experiment is in flight.
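The precedence between the environment overrides and the config flag can be sketched as below. This mirrors the decision logic in the accompanying change (which also accepts `true`/`yes`/`on` and `false`/`no`/`off`), but `resolveWorkerModeSketch` and the trimmed-down config type are hypothetical names used only for this example.

```typescript
// Trimmed-down config shape, assumed for this sketch.
type RuntimeIsolationConfigSketch = {
  agents?: { defaults?: { experimental?: { runtimeIsolation?: { mode?: string } } } };
};
type EnvSketch = Record<string, string | undefined>;

const TRUE_VALUES = new Set(["1", "true", "yes", "on"]);
const FALSE_VALUES = new Set(["0", "false", "no", "off"]);

// Unset, empty, or unrecognized values mean "no override".
function readBooleanOverride(value: string | undefined): boolean | undefined {
  const normalized = value?.trim().toLowerCase();
  if (!normalized) return undefined;
  if (TRUE_VALUES.has(normalized)) return true;
  if (FALSE_VALUES.has(normalized)) return false;
  return undefined;
}

export function resolveWorkerModeSketch(
  config: RuntimeIsolationConfigSketch,
  env: EnvSketch,
): boolean {
  // The newer variable wins over the older one; either wins over config.
  const explicit =
    readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER) ??
    readBooleanOverride(env.OPENCLAW_AGENT_WORKER_EXPERIMENT);
  return explicit ?? (config.agents?.defaults?.experimental?.runtimeIsolation?.mode === "worker");
}

const workerConfig: RuntimeIsolationConfigSketch = {
  agents: { defaults: { experimental: { runtimeIsolation: { mode: "worker" } } } },
};

// Config enables the worker path, but an explicit "0" forces the in-process path.
console.log(resolveWorkerModeSketch(workerConfig, { OPENCLAW_AGENT_RUNTIME_WORKER: "0" })); // false
```

An unrecognized value such as `maybe` counts as no override, so the config setting decides in that case.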
### Worker permissions
`runtimeIsolation.permissions: true` also starts the worker with Node permission
flags scoped to the agent workspace, agent directory, session transcript,
session store and lock files, OpenClaw runtime bundle/development source,
bundled plugin source, and runtime dependencies.
Keep this off unless you are explicitly testing filesystem hardening. Node
permission behavior is stricter and more runtime-sensitive than worker
isolation itself, so package reads or child-process based harnesses may need
additional design before this becomes broadly usable.
## Local model lean mode

@@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
- `maxDiskBytes`: optional sessions-directory budget
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue; `--store <path>` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations and takes a `sessions.json.lock` file lock while reading and writing the store. The file lock keeps Node worker threads and other runtime isolates from losing updates when they mutate the same session store. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue; `--store <path>` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
Maintenance keeps durable external conversation pointers such as group sessions
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,

@@ -42,6 +42,7 @@ import {
} from "./agent-scope.js";
import { clearSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { ensureAuthProfileStore } from "./auth-profiles/store.js";
import type { RunAgentAttemptParams } from "./command/attempt-execution.js";
import {
persistSessionEntry as persistSessionEntryBase,
prependInternalEventContext,
@@ -72,6 +73,10 @@ import { resolveProviderIdForAuth } from "./provider-auth-aliases.js";
import { hydrateResolvedSkillsAsync } from "./skills/snapshot-hydration.js";
import { normalizeSpawnedRunMetadata } from "./spawned-context.js";
import { resolveAgentTimeoutMs } from "./timeout.js";
import {
runAgentAttemptInWorker,
shouldRunAgentCommandAttemptInWorker,
} from "./worker-runtime/agent-runtime.js";
import { ensureAgentWorkspace } from "./workspace.js";
const log = createSubsystemLogger("agents/agent-command");
@@ -989,7 +994,7 @@ async function agentCommandInternal(
run: async (providerOverride, modelOverride, runOptions) => {
const isFallbackRetry = fallbackAttemptIndex > 0;
fallbackAttemptIndex += 1;
return attemptExecutionRuntime.runAgentAttempt({
const attemptParams: RunAgentAttemptParams = {
providerOverride,
modelOverride,
modelFallbacksOverride: effectiveFallbacksOverride,
@@ -1039,7 +1044,10 @@ async function agentCommandInternal(
lifecycleEnded = true;
}
},
});
};
return shouldRunAgentCommandAttemptInWorker({ config: cfg })
? runAgentAttemptInWorker(attemptParams)
: attemptExecutionRuntime.runAgentAttempt(attemptParams);
},
});
result = fallbackResult.result;

@@ -23,6 +23,7 @@ import { FailoverError } from "../failover-error.js";
import { resolveAgentHarnessPolicy } from "../harness/selection.js";
import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js";
import { isCliProvider } from "../model-selection.js";
import type { RunEmbeddedPiAgentParams } from "../pi-embedded-runner/run/params.js";
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
import {
@@ -345,7 +346,7 @@ export async function persistCliTurnTranscript(params: {
});
}
export function runAgentAttempt(params: {
export type RunAgentAttemptParams = {
providerOverride: string;
modelOverride: string;
originalProvider: string;
@@ -382,7 +383,9 @@ export function runAgentAttempt(params: {
sessionHasHistory?: boolean;
suppressPromptPersistenceOnRetry?: boolean;
onUserMessagePersisted?: (message: Extract<AgentMessage, { role: "user" }>) => void;
}) {
};
export function runAgentAttempt(params: RunAgentAttemptParams) {
const isRawModelRun = params.opts.modelRun === true || params.opts.promptMode === "none";
const claudeCliFallbackPrelude =
!isRawModelRun &&
@@ -583,7 +586,7 @@ export function runAgentAttempt(params: {
});
}
return runEmbeddedPiAgent({
const embeddedRunParams: RunEmbeddedPiAgentParams = {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.sessionAgentId,
@@ -640,7 +643,9 @@ export function runAgentAttempt(params: {
onUserMessagePersisted: params.onUserMessagePersisted,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
};
return runEmbeddedPiAgent(embeddedRunParams);
}
function resolveSessionPinnedAgentHarnessId(params: {

@@ -5,6 +5,10 @@ import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
clearContextEnginesForOwner,
listContextEngineIds,
} from "../../context-engine/registry.js";
import type { ContextEngine } from "../../context-engine/types.js";
import {
resetCliCompactionTestDeps,
@@ -167,4 +171,56 @@ describe("runCliTurnCompactionLifecycle", () => {
expect(updatedEntry?.cliSessionIds?.["claude-cli"]).toBeUndefined();
expect(updatedEntry?.claudeCliSessionId).toBeUndefined();
});
it("registers the built-in context engines before parent-side CLI transcript persistence", async () => {
const sessionKey = "agent:main:explicit:worker";
const sessionId = "session-worker";
const sessionFile = path.join(tmpDir, "session.jsonl");
await writeSessionFile({ sessionFile, sessionId });
clearContextEnginesForOwner("core");
expect(listContextEngineIds()).not.toContain("legacy");
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 100,
totalTokensFresh: true,
};
setCliCompactionTestDeps({
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
applyPiAutoCompactionGuard: () => {},
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 100,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
});
const updatedEntry = await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "openai",
model: "gpt-5.4",
});
expect(updatedEntry).toBe(sessionEntry);
expect(listContextEngineIds()).toContain("legacy");
});
});

@@ -2,6 +2,7 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { ensureContextEnginesInitialized as ensureContextEnginesInitializedImpl } from "../../context-engine/init.js";
import { resolveContextEngine as resolveContextEngineImpl } from "../../context-engine/registry.js";
import type { ContextEngine } from "../../context-engine/types.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
@@ -27,6 +28,7 @@ type SettingsManagerLike = {
setCompactionEnabled?: (enabled: boolean) => void;
};
type CliCompactionDeps = {
ensureContextEnginesInitialized: () => void;
openSessionManager: (sessionFile: string) => SessionManagerLike;
resolveContextEngine: (cfg: OpenClawConfig) => Promise<ContextEngine>;
createPreparedEmbeddedPiSettingsManager: (params: {
@@ -48,6 +50,7 @@ type CliCompactionDeps = {
const log = createSubsystemLogger("agents/cli-compaction");
const cliCompactionDeps: CliCompactionDeps = {
ensureContextEnginesInitialized: ensureContextEnginesInitializedImpl,
openSessionManager: (sessionFile: string) => SessionManager.open(sessionFile),
resolveContextEngine: resolveContextEngineImpl,
createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerImpl,
@@ -64,6 +67,7 @@ export function setCliCompactionTestDeps(overrides: Partial<typeof cliCompaction
export function resetCliCompactionTestDeps(): void {
Object.assign(cliCompactionDeps, {
ensureContextEnginesInitialized: ensureContextEnginesInitializedImpl,
openSessionManager: (sessionFile: string) => SessionManager.open(sessionFile),
resolveContextEngine: resolveContextEngineImpl,
createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerImpl,
@@ -196,6 +200,7 @@ export async function runCliTurnCompactionLifecycle(params: {
return params.sessionEntry;
}
cliCompactionDeps.ensureContextEnginesInitialized();
const contextEngine = await cliCompactionDeps.resolveContextEngine(params.cfg);
const sessionManager = cliCompactionDeps.openSessionManager(sessionFile);
const settingsManager = await cliCompactionDeps.createPreparedEmbeddedPiSettingsManager({

@@ -0,0 +1,464 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
onAgentEvent as onParentAgentEvent,
resetAgentEventsForTest,
type AgentEventPayload,
} from "../../infra/agent-events.js";
import type { RunAgentAttemptParams } from "../command/attempt-execution.js";
import { LiveSessionModelSwitchError } from "../live-model-switch-error.js";
import {
AgentWorkerUnsupportedParamsError,
runAgentAttemptInWorker,
shouldRunAgentCommandAttemptInWorker,
} from "./agent-runtime.js";
import { serializeWorkerError } from "./errors.js";
function createFixtureWorkerUrl(): URL {
const source = `
import fs from "node:fs/promises";
import { parentPort } from "node:worker_threads";
let runStarted = false;
function post(message) {
parentPort.postMessage(message);
}
async function writeSessionStoreUpdate(message, tag) {
if (!message.params.storePath || !message.params.sessionKey) {
return;
}
await fs.writeFile(
message.params.storePath,
JSON.stringify({
[message.params.sessionKey]: {
sessionId: message.params.sessionId,
updatedAt: 456,
model: "worker-" + tag
}
}, null, 2)
);
}
parentPort.on("message", async (message) => {
if (message.type === "abort") {
if (runStarted) {
post({ type: "error", error: { name: "AbortError", message: "aborted:" + String(message.reason ?? "") } });
}
return;
}
if (message.type !== "run" || runStarted) {
return;
}
runStarted = true;
if (message.initialAbort) {
post({ type: "error", error: { name: "AbortError", message: "initial-aborted:" + String(message.initialAbort.reason ?? "") } });
return;
}
post({
type: "agentEvent",
origin: "runtime",
event: {
runId: message.params.runId,
seq: 7,
ts: 123,
stream: "tool",
data: { phase: "runtime", runId: message.params.runId }
}
});
post({
type: "agentEvent",
origin: "runtime",
event: {
runId: "child-run",
seq: 1,
ts: 124,
stream: "lifecycle",
data: { phase: "end", runId: "child-run" }
}
});
post({
type: "agentEvent",
origin: "callback",
event: {
stream: "lifecycle",
sessionKey: message.params.sessionKey,
data: { phase: "fixture", runId: message.params.runId }
}
});
post({
type: "userMessagePersisted",
message: { role: "user", content: [{ type: "text", text: message.params.body }] }
});
if (message.params.body === "throw") {
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
return;
}
if (message.params.body === "mutate-store-then-throw") {
await writeSessionStoreUpdate(message, "error");
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
return;
}
if (message.params.body === "switch") {
post({
type: "error",
error: {
name: "LiveSessionModelSwitchError",
message: "Live session model switch requested: anthropic/claude-sonnet-4.6",
control: {
type: "liveSessionModelSwitch",
provider: "anthropic",
model: "claude-sonnet-4.6",
authProfileId: "profile-1",
authProfileIdSource: "user"
}
}
});
return;
}
if (message.params.body === "wait") {
return;
}
if (message.params.body === "mutate-store") {
await writeSessionStoreUpdate(message, "result");
}
post({
type: "result",
result: {
payloads: [{ text: "worker:" + message.params.body }],
meta: {
durationMs: 1,
finalAssistantVisibleText: "worker:" + message.params.body,
agentMeta: {
sessionId: message.params.sessionId,
provider: message.params.providerOverride ?? "fixture",
model: message.params.modelOverride ?? "fixture-model"
},
executionTrace: { runner: "embedded" }
}
}
});
});
`;
return new URL(`data:text/javascript,${encodeURIComponent(source)}`);
}
async function makeWorkerParams(body: string): Promise<RunAgentAttemptParams> {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-worker-"));
tmpDirs.push(tmpDir);
return {
providerOverride: "openai",
originalProvider: "openai",
modelOverride: "gpt-5.5",
cfg: {} as OpenClawConfig,
sessionEntry: undefined,
sessionId: "session-worker-test",
sessionKey: "agent:main:worker-test",
sessionAgentId: "main",
sessionFile: path.join(tmpDir, "session.jsonl"),
workspaceDir: tmpDir,
body,
isFallbackRetry: false,
resolvedThinkLevel: "medium",
timeoutMs: 1_000,
runId: "run-worker-test",
opts: { message: body, senderIsOwner: false },
runContext: {} as RunAgentAttemptParams["runContext"],
spawnedBy: undefined,
messageChannel: undefined,
skillsSnapshot: undefined,
resolvedVerboseLevel: undefined,
agentDir: tmpDir,
onAgentEvent: vi.fn(),
authProfileProvider: "openai",
sessionHasHistory: false,
};
}
const tmpDirs: string[] = [];
describe("agent runtime worker bridge", () => {
afterEach(async () => {
resetAgentEventsForTest();
await Promise.all(tmpDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
});
it("recognizes config and explicit environment overrides", () => {
expect(
shouldRunAgentCommandAttemptInWorker({
config: {
agents: { defaults: { experimental: { runtimeIsolation: { mode: "worker" } } } },
} as OpenClawConfig,
env: {},
}),
).toBe(true);
expect(
shouldRunAgentCommandAttemptInWorker({
config: {
agents: { defaults: { experimental: { runtimeIsolation: { mode: "worker" } } } },
} as OpenClawConfig,
env: { OPENCLAW_AGENT_RUNTIME_WORKER: "0" },
}),
).toBe(false);
expect(
shouldRunAgentCommandAttemptInWorker({
config: {} as OpenClawConfig,
env: { OPENCLAW_AGENT_RUNTIME_WORKER: "yes" },
}),
).toBe(true);
expect(
shouldRunAgentCommandAttemptInWorker({
config: {} as OpenClawConfig,
env: { OPENCLAW_AGENT_WORKER_EXPERIMENT: "1" },
}),
).toBe(true);
});
it("runs an agent attempt through a real worker and proxies supported callbacks", async () => {
const onAgentEvent = vi.fn();
const onUserMessagePersisted = vi.fn();
const parentEvents: AgentEventPayload[] = [];
const stopParentEvents = onParentAgentEvent((event) => {
parentEvents.push(event);
});
const result = await runAgentAttemptInWorker(
{
...(await makeWorkerParams("hello")),
onAgentEvent,
onUserMessagePersisted,
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
);
stopParentEvents();
expect(result.payloads?.[0]?.text).toBe("worker:hello");
expect(result.meta.agentMeta).toMatchObject({
sessionId: "session-worker-test",
provider: "openai",
model: "gpt-5.5",
});
expect(parentEvents).toEqual([
expect.objectContaining({
runId: "run-worker-test",
stream: "tool",
sessionKey: "agent:main:worker-test",
data: { phase: "runtime", runId: "run-worker-test" },
seq: expect.any(Number),
ts: expect.any(Number),
}),
expect.objectContaining({
runId: "child-run",
stream: "lifecycle",
data: { phase: "end", runId: "child-run" },
seq: expect.any(Number),
ts: expect.any(Number),
}),
]);
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
sessionKey: "agent:main:worker-test",
data: { phase: "runtime", runId: "run-worker-test" },
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
sessionKey: "agent:main:worker-test",
data: { phase: "fixture", runId: "run-worker-test" },
});
expect(onAgentEvent).not.toHaveBeenCalledWith({
stream: "lifecycle",
data: { phase: "end", runId: "child-run" },
});
expect(onUserMessagePersisted).toHaveBeenCalledWith({
role: "user",
content: [{ type: "text", text: "hello" }],
});
});
it("propagates structured worker errors", async () => {
await expect(
runAgentAttemptInWorker(await makeWorkerParams("throw"), {
workerUrl: createFixtureWorkerUrl(),
execArgv: [],
usePermissions: false,
}),
).rejects.toMatchObject({
name: "FixtureError",
message: "fixture failed",
code: "FIXTURE",
});
});
it("refreshes parent session store state after a worker result", async () => {
const params = await makeWorkerParams("mutate-store");
const sessionStore: Record<string, SessionEntry> = {
[params.sessionKey!]: {
sessionId: params.sessionId,
updatedAt: 1,
model: "parent-stale",
cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
},
};
const storePath = path.join(path.dirname(params.sessionFile), "sessions.json");
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await runAgentAttemptInWorker(
{
...params,
sessionStore,
storePath,
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
);
expect(sessionStore[params.sessionKey!]).toMatchObject({
sessionId: params.sessionId,
updatedAt: 456,
model: "worker-result",
});
expect(sessionStore[params.sessionKey!]?.cliSessionBindings).toBeUndefined();
});
it("refreshes parent session store state after a worker error", async () => {
const params = await makeWorkerParams("mutate-store-then-throw");
const sessionStore: Record<string, SessionEntry> = {
[params.sessionKey!]: {
sessionId: params.sessionId,
updatedAt: 1,
model: "parent-stale",
cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
},
};
const storePath = path.join(path.dirname(params.sessionFile), "sessions.json");
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await expect(
runAgentAttemptInWorker(
{
...params,
sessionStore,
storePath,
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
),
).rejects.toMatchObject({
name: "FixtureError",
});
expect(sessionStore[params.sessionKey!]).toMatchObject({
sessionId: params.sessionId,
updatedAt: 456,
model: "worker-error",
});
expect(sessionStore[params.sessionKey!]?.cliSessionBindings).toBeUndefined();
});
it("preserves live model switch errors across the worker boundary", async () => {
const error = new LiveSessionModelSwitchError({
provider: "anthropic",
model: "claude-sonnet-4.6",
authProfileId: "profile-1",
authProfileIdSource: "user",
});
expect(serializeWorkerError(error)).toMatchObject({
type: "error",
error: {
name: "LiveSessionModelSwitchError",
control: {
type: "liveSessionModelSwitch",
provider: "anthropic",
model: "claude-sonnet-4.6",
authProfileId: "profile-1",
authProfileIdSource: "user",
},
},
});
await expect(
runAgentAttemptInWorker(await makeWorkerParams("switch"), {
workerUrl: createFixtureWorkerUrl(),
execArgv: [],
usePermissions: false,
}),
).rejects.toMatchObject({
name: "LiveSessionModelSwitchError",
provider: "anthropic",
model: "claude-sonnet-4.6",
authProfileId: "profile-1",
authProfileIdSource: "user",
});
await expect(
runAgentAttemptInWorker(await makeWorkerParams("switch"), {
workerUrl: createFixtureWorkerUrl(),
execArgv: [],
usePermissions: false,
}),
).rejects.toBeInstanceOf(LiveSessionModelSwitchError);
});
it("forwards aborts into the worker", async () => {
const controller = new AbortController();
const promise = runAgentAttemptInWorker(
{
...(await makeWorkerParams("wait")),
opts: { message: "wait", senderIsOwner: false, abortSignal: controller.signal },
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
);
controller.abort("stop");
await expect(promise).rejects.toMatchObject({
name: "AbortError",
message: "aborted:stop",
});
});
it("preserves an already-aborted signal when starting the worker run", async () => {
const controller = new AbortController();
controller.abort("already stopped");
await expect(
runAgentAttemptInWorker(
{
...(await makeWorkerParams("hello")),
opts: {
message: "hello",
senderIsOwner: false,
abortSignal: controller.signal,
},
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
),
).rejects.toMatchObject({
name: "AbortError",
message: "initial-aborted:already stopped",
});
});
it("rejects invalid abort signal params before spawning a worker", async () => {
await expect(
runAgentAttemptInWorker(
{
...(await makeWorkerParams("hello")),
opts: {
message: "hello",
senderIsOwner: false,
abortSignal: "bad" as unknown as AbortSignal,
},
},
{ workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
),
).rejects.toBeInstanceOf(AgentWorkerUnsupportedParamsError);
});
});

@@ -0,0 +1,292 @@
import fs from "node:fs";
import { Worker } from "node:worker_threads";
import { loadSessionStore } from "../../config/sessions/store.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import type { RunAgentAttemptParams } from "../command/attempt-execution.js";
import type {
AgentRuntimeWorkerRunParams,
AgentWorkerToParentMessage,
RunAgentAttemptResult,
} from "./agent-runtime.types.js";
import { deserializeWorkerError } from "./errors.js";
import { buildAgentWorkerPermissionExecArgv } from "./permissions.js";
const TRUE_VALUES = new Set(["1", "true", "yes", "on"]);
const FALSE_VALUES = new Set(["0", "false", "no", "off"]);
export class AgentWorkerUnsupportedParamsError extends Error {
constructor(readonly keys: string[]) {
super(`Agent runtime worker experiment does not support params: ${keys.join(", ")}`);
this.name = "AgentWorkerUnsupportedParamsError";
}
}
export type RunAgentAttemptInWorkerOptions = {
/** Test seam; production uses the compiled agent-runtime.worker entry. */
workerUrl?: URL;
/** Test seam; production inherits the parent process execArgv. */
execArgv?: string[];
/** Test seam; production follows config/env permission settings. */
usePermissions?: boolean;
};
type RuntimeIsolationDecisionParams = {
config?: OpenClawConfig;
env?: NodeJS.ProcessEnv;
};
function readBooleanOverride(value: string | undefined): boolean | undefined {
const normalized = value?.trim().toLowerCase();
if (!normalized) {
return undefined;
}
if (TRUE_VALUES.has(normalized)) {
return true;
}
if (FALSE_VALUES.has(normalized)) {
return false;
}
return undefined;
}
export function shouldRunAgentCommandAttemptInWorker(
params: RuntimeIsolationDecisionParams = {},
): boolean {
const env = params.env ?? process.env;
const explicit =
readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER) ??
readBooleanOverride(env.OPENCLAW_AGENT_WORKER_EXPERIMENT);
if (explicit !== undefined) {
return explicit;
}
return params.config?.agents?.defaults?.experimental?.runtimeIsolation?.mode === "worker";
}
function shouldUseWorkerPermissions(
config: OpenClawConfig | undefined,
env = process.env,
): boolean {
const explicit =
readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER_PERMISSIONS) ??
readBooleanOverride(env.OPENCLAW_AGENT_WORKER_PERMISSIONS);
if (explicit !== undefined) {
return explicit;
}
return config?.agents?.defaults?.experimental?.runtimeIsolation?.permissions === true;
}
function resolveWorkerUrl(): URL {
const current = import.meta.url;
return new URL(
current.endsWith(".ts") ? "./agent-runtime.worker.ts" : "./agent-runtime.worker.js",
current,
);
}
function serializeAbortReason(reason: unknown): unknown {
if (reason instanceof Error) {
return { name: reason.name, message: reason.message, stack: reason.stack };
}
return reason;
}
function serializeInitialAbort(signal: AbortSignal | undefined): { reason?: unknown } | undefined {
if (!signal?.aborted) {
return undefined;
}
return { reason: serializeAbortReason(signal.reason) };
}
type ParentVisibleAgentEvent = Parameters<typeof emitAgentEvent>[0];
type CallbackAgentEvent = Parameters<RunAgentAttemptParams["onAgentEvent"]>[0];
function readNonEmptyString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value : undefined;
}
function toParentVisibleAgentEvent(
params: RunAgentAttemptParams,
event: AgentWorkerToParentMessage & { type: "agentEvent"; origin: "runtime" },
): ParentVisibleAgentEvent {
const runId = readNonEmptyString(event.event.runId) ?? params.runId;
const sessionKey =
readNonEmptyString(event.event.sessionKey) ??
(runId === params.runId ? params.sessionKey : undefined);
return {
runId,
stream: event.event.stream,
data: event.event.data,
...(sessionKey ? { sessionKey } : {}),
};
}
function toCallbackAgentEvent(event: ParentVisibleAgentEvent): CallbackAgentEvent {
return {
stream: event.stream,
data: event.data,
...(event.sessionKey ? { sessionKey: event.sessionKey } : {}),
};
}
function isActiveAttemptEvent(
params: RunAgentAttemptParams,
event: ParentVisibleAgentEvent,
): boolean {
return event.runId === params.runId;
}
function stripWorkerCallbacks(params: RunAgentAttemptParams): AgentRuntimeWorkerRunParams {
const unsupported: string[] = [];
if (params.opts.abortSignal && typeof params.opts.abortSignal !== "object") {
unsupported.push("opts.abortSignal");
}
if (unsupported.length > 0) {
throw new AgentWorkerUnsupportedParamsError(unsupported);
}
const {
onAgentEvent: _onAgentEvent,
onUserMessagePersisted: _onUserMessagePersisted,
...rest
} = params;
const { abortSignal: _abortSignal, ...opts } = params.opts;
return { ...rest, opts };
}
function syncParentSessionStoreFromDisk(params: RunAgentAttemptParams): void {
if (!params.sessionStore || !params.storePath || !fs.existsSync(params.storePath)) {
return;
}
const latest = loadSessionStore(params.storePath, { skipCache: true, clone: false });
const mutableStore = params.sessionStore as Record<string, SessionEntry>;
for (const key of Object.keys(mutableStore)) {
delete mutableStore[key];
}
Object.assign(mutableStore, latest);
}
export async function runAgentAttemptInWorker(
params: RunAgentAttemptParams,
options: RunAgentAttemptInWorkerOptions = {},
): Promise<RunAgentAttemptResult> {
const workerParams = stripWorkerCallbacks(params);
const worker = new Worker(options.workerUrl ?? resolveWorkerUrl(), {
execArgv:
(options.usePermissions ?? shouldUseWorkerPermissions(params.cfg))
? [
...(options.execArgv ?? process.execArgv),
...buildAgentWorkerPermissionExecArgv({
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
sessionFile: params.sessionFile,
storePath: params.storePath,
}),
]
: (options.execArgv ?? process.execArgv),
name: `openclaw-agent-runtime:${params.sessionAgentId}:${params.sessionId}`,
});
let settled = false;
const cleanup = () => {
params.opts.abortSignal?.removeEventListener("abort", abort);
if (!settled) {
void worker.terminate();
}
};
const abort = () => {
// oxlint-disable unicorn/require-post-message-target-origin -- worker_threads Worker has no targetOrigin.
worker.postMessage({
type: "abort",
reason: serializeAbortReason(params.opts.abortSignal?.reason),
});
// oxlint-enable unicorn/require-post-message-target-origin
};
return await new Promise<RunAgentAttemptResult>((resolve, reject) => {
worker.once("error", (error) => {
settled = true;
cleanup();
try {
syncParentSessionStoreFromDisk(params);
reject(error);
} catch (syncError) {
reject(syncError);
}
});
worker.once("exit", (code) => {
if (settled) {
return;
}
settled = true;
cleanup();
try {
syncParentSessionStoreFromDisk(params);
reject(new Error(`Agent runtime worker exited before completing run (code ${code})`));
} catch (syncError) {
reject(syncError);
}
});
worker.on("message", (message: AgentWorkerToParentMessage) => {
if (message.type === "agentEvent") {
if (message.origin === "runtime") {
const event = toParentVisibleAgentEvent(params, message);
emitAgentEvent(event);
if (isActiveAttemptEvent(params, event)) {
params.onAgentEvent(toCallbackAgentEvent(event));
}
} else {
params.onAgentEvent(message.event);
}
return;
}
if (message.type === "userMessagePersisted") {
params.onUserMessagePersisted?.(message.message);
return;
}
if (message.type === "result") {
settled = true;
cleanup();
try {
syncParentSessionStoreFromDisk(params);
resolve(message.result);
} catch (error) {
reject(error);
} finally {
void worker.terminate();
}
return;
}
if (message.type === "error") {
settled = true;
cleanup();
try {
syncParentSessionStoreFromDisk(params);
reject(deserializeWorkerError(message));
} catch (error) {
reject(error);
} finally {
void worker.terminate();
}
}
});
params.opts.abortSignal?.addEventListener("abort", abort, { once: true });
try {
// oxlint-disable unicorn/require-post-message-target-origin -- worker_threads Worker has no targetOrigin.
worker.postMessage({
type: "run",
params: workerParams,
initialAbort: serializeInitialAbort(params.opts.abortSignal),
});
// oxlint-enable unicorn/require-post-message-target-origin
} catch (error) {
settled = true;
cleanup();
void worker.terminate();
reject(error);
}
});
}
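
The override order used by `shouldRunAgentCommandAttemptInWorker` above can be sketched in isolation: an environment variable that parses as a boolean wins, and the config `mode` is consulted only when neither variable parses. This is a minimal sketch, not the author's implementation. The accepted string values in `TRUE_VALUES` / `FALSE_VALUES`, the `IsolationConfig` type, and the `shouldUseWorker` name are assumptions, since the real sets sit outside this hunk.

```typescript
// Hypothetical value sets; the real TRUE_VALUES / FALSE_VALUES are not visible in the diff.
const TRUE_VALUES = new Set(["1", "true", "yes", "on"]);
const FALSE_VALUES = new Set(["0", "false", "no", "off"]);

// Returns true/false for recognised strings and undefined for anything else.
function readBooleanOverride(value: string | undefined): boolean | undefined {
  const normalized = value?.trim().toLowerCase();
  if (!normalized) {
    return undefined;
  }
  if (TRUE_VALUES.has(normalized)) {
    return true;
  }
  if (FALSE_VALUES.has(normalized)) {
    return false;
  }
  return undefined;
}

// Illustrative stand-in for the runtimeIsolation config slice.
type IsolationConfig = { mode?: "off" | "worker" };

function shouldUseWorker(
  env: Record<string, string | undefined>,
  config?: IsolationConfig,
): boolean {
  // The first variable that parses as a boolean decides; the second is a fallback name.
  const explicit =
    readBooleanOverride(env.OPENCLAW_AGENT_RUNTIME_WORKER) ??
    readBooleanOverride(env.OPENCLAW_AGENT_WORKER_EXPERIMENT);
  // Only when neither variable parses does the config mode apply.
  return explicit ?? config?.mode === "worker";
}
```

Under these assumptions an explicit `OPENCLAW_AGENT_RUNTIME_WORKER=0` turns the worker path off even when the config requests `mode: "worker"`, and an unrecognised value falls through to the config.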

@@ -0,0 +1,45 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AgentEventPayload } from "../../infra/agent-events.js";
import type { runAgentAttempt, RunAgentAttemptParams } from "../command/attempt-execution.js";
export type AgentRuntimeWorkerRunParams = Omit<
RunAgentAttemptParams,
"onAgentEvent" | "onUserMessagePersisted" | "opts"
> & {
opts: Omit<RunAgentAttemptParams["opts"], "abortSignal">;
};
export type RunAgentAttemptResult = Awaited<ReturnType<typeof runAgentAttempt>>;
export type SerializedWorkerError = {
name?: string;
message: string;
stack?: string;
code?: string;
control?: {
type: "liveSessionModelSwitch";
provider: string;
model: string;
authProfileId?: string;
authProfileIdSource?: "auto" | "user";
};
};
export type AgentWorkerToParentMessage =
| {
type: "agentEvent";
origin: "callback";
event: { stream: string; data?: Record<string, unknown>; sessionKey?: string };
}
| {
type: "agentEvent";
origin: "runtime";
event: AgentEventPayload;
}
| { type: "userMessagePersisted"; message: Extract<AgentMessage, { role: "user" }> }
| { type: "result"; result: RunAgentAttemptResult }
| { type: "error"; error: SerializedWorkerError };
export type ParentToAgentWorkerMessage =
| { type: "run"; params: AgentRuntimeWorkerRunParams; initialAbort?: { reason?: unknown } }
| { type: "abort"; reason?: unknown };

@@ -0,0 +1,66 @@
import { parentPort } from "node:worker_threads";
import { onAgentEvent } from "../../infra/agent-events.js";
import { runAgentAttempt } from "../command/attempt-execution.js";
import type {
AgentWorkerToParentMessage,
ParentToAgentWorkerMessage,
} from "./agent-runtime.types.js";
import { serializeWorkerError } from "./errors.js";
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
function post(message: AgentWorkerToParentMessage): void {
// oxlint-disable-next-line unicorn/require-post-message-target-origin -- worker_threads MessagePort has no targetOrigin.
parentPort?.postMessage(message);
}
let abortController: AbortController | undefined;
parentPort?.on("message", (message: ParentToAgentWorkerMessage) => {
if (message.type === "abort") {
abortController?.abort(message.reason);
return;
}
if (message.type !== "run") {
return;
}
abortController = new AbortController();
if (message.initialAbort) {
abortController.abort(message.initialAbort.reason);
}
const stopRuntimeEventBridge = onAgentEvent((event) => {
post({ type: "agentEvent", origin: "runtime", event });
});
try {
restoreAgentWorkerPluginRuntime(message.params);
} catch (error: unknown) {
post(serializeWorkerError(error));
stopRuntimeEventBridge();
abortController = undefined;
return;
}
void runAgentAttempt({
...message.params,
opts: {
...message.params.opts,
abortSignal: abortController.signal,
},
onAgentEvent: (event) => {
post({ type: "agentEvent", origin: "callback", event });
},
onUserMessagePersisted: (persisted) => {
post({ type: "userMessagePersisted", message: persisted });
},
})
.then((result) => {
post({ type: "result", result });
})
.catch((error: unknown) => {
post(serializeWorkerError(error));
})
.finally(() => {
stopRuntimeEventBridge();
abortController = undefined;
});
});
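
Because an `AbortSignal` cannot be posted to a worker, the code above strips it from the params, forwards an `abort` message with a serialized reason, and rebuilds a fresh `AbortController` on the worker side. The following is a rough sketch of that bridge with a plain callback in place of `postMessage`. The `forwardAbort` and `createReceiver` names are hypothetical, and the sketch collapses the separate `initialAbort` field of the `run` message into the same path.

```typescript
type AbortMessage = { type: "abort"; reason?: unknown };

// Errors are flattened to plain data so the reason survives a message boundary.
function serializeAbortReason(reason: unknown): unknown {
  if (reason instanceof Error) {
    return { name: reason.name, message: reason.message, stack: reason.stack };
  }
  return reason;
}

// Parent side: turn the abort event into a plain message.
function forwardAbort(signal: AbortSignal, send: (message: AbortMessage) => void): void {
  const notify = () => send({ type: "abort", reason: serializeAbortReason(signal.reason) });
  if (signal.aborted) {
    notify();
    return;
  }
  signal.addEventListener("abort", notify, { once: true });
}

// Worker side: a local controller that is aborted when the message arrives.
function createReceiver(): {
  controller: AbortController;
  receive: (message: AbortMessage) => void;
} {
  const controller = new AbortController();
  return {
    controller,
    receive: (message) => controller.abort(message.reason),
  };
}
```

The worker-side reason is the flattened object rather than the original `Error` instance, which is why the parent serializes it before sending.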

@@ -0,0 +1,67 @@
import { LiveSessionModelSwitchError } from "../live-model-switch-error.js";
import type { AgentWorkerToParentMessage, SerializedWorkerError } from "./agent-runtime.types.js";
function serializeControlError(error: Error): Pick<SerializedWorkerError, "control"> | undefined {
if (error instanceof LiveSessionModelSwitchError) {
return {
control: {
type: "liveSessionModelSwitch",
provider: error.provider,
model: error.model,
...(error.authProfileId ? { authProfileId: error.authProfileId } : {}),
...(error.authProfileIdSource ? { authProfileIdSource: error.authProfileIdSource } : {}),
},
};
}
return undefined;
}
function deserializeControlError(error: SerializedWorkerError): Error | undefined {
if (error.control?.type === "liveSessionModelSwitch") {
return new LiveSessionModelSwitchError({
provider: error.control.provider,
model: error.control.model,
...(error.control.authProfileId ? { authProfileId: error.control.authProfileId } : {}),
...(error.control.authProfileIdSource
? { authProfileIdSource: error.control.authProfileIdSource }
: {}),
});
}
return undefined;
}
export function serializeWorkerError(error: unknown): AgentWorkerToParentMessage {
if (error instanceof Error) {
const code =
typeof (error as Error & { code?: unknown }).code === "string"
? (error as Error & { code: string }).code
: undefined;
return {
type: "error",
error: {
name: error.name,
message: error.message,
stack: error.stack,
...(code ? { code } : {}),
...serializeControlError(error),
},
};
}
return { type: "error", error: { message: String(error) } };
}
export function deserializeWorkerError(
message: AgentWorkerToParentMessage & { type: "error" },
): Error {
const error = deserializeControlError(message.error) ?? new Error(message.error.message);
if (!message.error.control) {
error.name = message.error.name ?? "AgentWorkerError";
}
if (message.error.stack) {
error.stack = message.error.stack;
}
if (message.error.code) {
(error as Error & { code?: string }).code = message.error.code;
}
return error;
}
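
The serialize/deserialize pair above exists because a thrown error has to cross the worker boundary as plain data. A minimal round-trip sketch follows, leaving out the `control` payload for `LiveSessionModelSwitchError`. The `SerializedError`, `serialize`, `deserialize`, and `roundTrip` names are illustrative only, and the JSON round trip is a stand-in for what message passing does to the payload.

```typescript
type SerializedError = { name?: string; message: string; stack?: string; code?: string };

function serialize(error: unknown): SerializedError {
  if (error instanceof Error) {
    const code = (error as Error & { code?: unknown }).code;
    return {
      name: error.name,
      message: error.message,
      stack: error.stack,
      // Only string codes (for example "ENOSPC") are carried over.
      ...(typeof code === "string" ? { code } : {}),
    };
  }
  // Non-Error throwables are reduced to their string form.
  return { message: String(error) };
}

function deserialize(serialized: SerializedError): Error {
  const error = new Error(serialized.message);
  error.name = serialized.name ?? "AgentWorkerError";
  if (serialized.stack) {
    error.stack = serialized.stack;
  }
  if (serialized.code) {
    (error as Error & { code?: string }).code = serialized.code;
  }
  return error;
}

// JSON round trip used here as a stand-in for the message boundary.
function roundTrip(error: unknown): Error {
  return deserialize(JSON.parse(JSON.stringify(serialize(error))) as SerializedError);
}
```

A thrown string comes back as an `Error` named `AgentWorkerError`, while a real `Error` keeps its own name, stack, and string `code`.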

@@ -0,0 +1,46 @@
import path from "node:path";
import { describe, expect, it } from "vitest";
import { buildAgentWorkerPermissionExecArgv } from "./permissions.js";
describe("agent worker permissions", () => {
it("builds deterministic Node permission flags for an agent worker", () => {
const workspaceDir = path.resolve("/tmp/openclaw-worker/workspace");
const agentDir = path.resolve("/tmp/openclaw-worker/agent");
const sessionFile = path.resolve("/tmp/openclaw-worker/agent/session.jsonl");
const storePath = path.resolve("/tmp/openclaw-worker/state/sessions.json");
const args = buildAgentWorkerPermissionExecArgv({
workspaceDir,
agentDir,
sessionFile,
storePath,
readRoots: [workspaceDir],
writeRoots: [path.join(workspaceDir, "out/*")],
});
expect(args[0]).toBe("--permission");
expect(args).toContain(`--allow-fs-read=${workspaceDir}`);
expect(args).toContain(`--allow-fs-read=${workspaceDir}/*`);
expect(args).toContain(`--allow-fs-read=${agentDir}/*`);
expect(args).toContain(`--allow-fs-read=${sessionFile}`);
expect(args).toContain(`--allow-fs-read=${sessionFile}.lock`);
expect(args).toContain(`--allow-fs-read=${storePath}`);
expect(args).toContain(`--allow-fs-read=${storePath}.lock`);
expect(args).toContain(`--allow-fs-read=${path.resolve("src/*")}`);
expect(args).toContain(`--allow-fs-read=${path.resolve("extensions/*")}`);
expect(args).toContain(`--allow-fs-write=${workspaceDir}/*`);
expect(args).toContain(`--allow-fs-write=${path.join(workspaceDir, "out/*")}`);
expect(args).toContain(`--allow-fs-write=${agentDir}/*`);
expect(args).toContain(`--allow-fs-write=${sessionFile}`);
expect(args).toContain(`--allow-fs-write=${sessionFile}.lock`);
expect(args).toContain(`--allow-fs-write=${path.dirname(sessionFile)}/*`);
expect(args).toContain(`--allow-fs-write=${storePath}`);
expect(args).toContain(`--allow-fs-write=${storePath}.lock`);
expect(args).toContain(`--allow-fs-write=${path.dirname(storePath)}/*`);
expect(args.filter((arg) => arg === "--permission")).toHaveLength(1);
const firstWriteArg = args.findIndex((arg) => arg.startsWith("--allow-fs-write="));
const lastReadArg = args.findLastIndex((arg) => arg.startsWith("--allow-fs-read="));
expect(firstWriteArg).toBeGreaterThan(0);
expect(lastReadArg).toBeLessThan(firstWriteArg);
});
});

@@ -0,0 +1,102 @@
import { basename, dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
type AgentWorkerPermissionRoots = {
workspaceDir: string;
agentDir?: string;
sessionFile?: string;
storePath?: string;
readRoots?: string[];
writeRoots?: string[];
};
function normalizeRoot(path: string | undefined): string | undefined {
const trimmed = path?.trim();
if (!trimmed) {
return undefined;
}
return resolve(trimmed);
}
function addRoot(target: Set<string>, path: string | undefined): void {
const normalized = normalizeRoot(path);
if (normalized) {
target.add(normalized);
}
}
function addMutableFileRoots(params: {
readRoots: Set<string>;
writeRoots: Set<string>;
filePath: string | undefined;
}): void {
const filePath = normalizeRoot(params.filePath);
if (!filePath) {
return;
}
const lockPath = `${filePath}.lock`;
addRoot(params.readRoots, filePath);
addRoot(params.readRoots, lockPath);
addRoot(params.writeRoots, filePath);
addRoot(params.writeRoots, lockPath);
addRoot(params.writeRoots, `${dirname(filePath)}/*`);
}
function addNodeModuleReadRoots(target: Set<string>): void {
let current = dirname(fileURLToPath(import.meta.url));
let previous = "";
while (current !== previous) {
addRoot(target, `${current}/node_modules/*`);
previous = current;
current = dirname(current);
}
}
function addRuntimeReadRoots(target: Set<string>): void {
let current = dirname(fileURLToPath(import.meta.url));
let previous = "";
while (current !== previous) {
const name = basename(current);
if (name === "dist" || name === "src") {
addRoot(target, `${current}/*`);
}
if (name === "src") {
addRoot(target, `${dirname(current)}/extensions/*`);
}
previous = current;
current = dirname(current);
}
}
export function buildAgentWorkerPermissionExecArgv(roots: AgentWorkerPermissionRoots): string[] {
const readRoots = new Set<string>();
const writeRoots = new Set<string>();
addRoot(readRoots, `${roots.workspaceDir}/*`);
addRoot(writeRoots, `${roots.workspaceDir}/*`);
addRoot(readRoots, roots.agentDir ? `${roots.agentDir}/*` : undefined);
addRoot(writeRoots, roots.agentDir ? `${roots.agentDir}/*` : undefined);
addMutableFileRoots({ readRoots, writeRoots, filePath: roots.sessionFile });
addMutableFileRoots({ readRoots, writeRoots, filePath: roots.storePath });
for (const root of roots.readRoots ?? []) {
addRoot(readRoots, root);
}
for (const root of roots.writeRoots ?? []) {
addRoot(writeRoots, root);
}
addNodeModuleReadRoots(readRoots);
addRuntimeReadRoots(readRoots);
const args = ["--permission"];
for (const root of [...readRoots].toSorted()) {
args.push(`--allow-fs-read=${root}`);
}
for (const root of [...writeRoots].toSorted()) {
args.push(`--allow-fs-write=${root}`);
}
return args;
}
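
The core of `buildAgentWorkerPermissionExecArgv` above is small: resolve and de-duplicate roots, then emit `--permission` once, followed by sorted read flags and sorted write flags. A reduced sketch of just that ordering logic is shown here. `buildPermissionArgs` is a hypothetical name, it skips the lock-file, `node_modules`, and runtime root discovery, and it sorts a copied array with `sort()` where the original uses `toSorted()`.

```typescript
import { resolve } from "node:path";

// Reduced sketch: de-duplicate resolved roots and emit deterministic flag order.
function buildPermissionArgs(readRoots: string[], writeRoots: string[]): string[] {
  const reads = new Set(readRoots.map((root) => resolve(root)));
  const writes = new Set(writeRoots.map((root) => resolve(root)));
  const args = ["--permission"];
  // All read grants come first, sorted, so the argv is stable across runs.
  for (const root of [...reads].sort()) {
    args.push(`--allow-fs-read=${root}`);
  }
  // Write grants follow, also sorted.
  for (const root of [...writes].sort()) {
    args.push(`--allow-fs-write=${root}`);
  }
  return args;
}
```

A deterministic order makes the resulting argv easy to assert on in tests, which is what the accompanying test file checks with `findIndex` and `findLastIndex`.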

@@ -0,0 +1,58 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
vi.mock("../runtime-plugins.js", () => ({
ensureRuntimePluginsLoaded: vi.fn(),
}));
const mockedEnsureRuntimePluginsLoaded = vi.mocked(ensureRuntimePluginsLoaded);
function makeParams(): AgentRuntimeWorkerRunParams {
return {
providerOverride: "openai",
originalProvider: "openai",
modelOverride: "gpt-5.5",
cfg: { plugins: { entries: { demo: { enabled: true } } } } as OpenClawConfig,
sessionEntry: undefined,
sessionId: "session-worker-test",
sessionKey: "agent:main:worker-test",
sessionAgentId: "main",
sessionFile: "/tmp/openclaw-worker-session.jsonl",
workspaceDir: "/tmp/openclaw-worker-workspace",
body: "hello",
isFallbackRetry: false,
resolvedThinkLevel: "medium",
timeoutMs: 1_000,
runId: "run-worker-test",
opts: { message: "hello", senderIsOwner: false },
runContext: {} as AgentRuntimeWorkerRunParams["runContext"],
spawnedBy: undefined,
messageChannel: undefined,
skillsSnapshot: undefined,
resolvedVerboseLevel: undefined,
agentDir: "/tmp/openclaw-worker-agent",
authProfileProvider: "openai",
sessionHasHistory: false,
};
}
describe("agent worker plugin runtime", () => {
beforeEach(() => {
mockedEnsureRuntimePluginsLoaded.mockClear();
});
it("restores gateway-bindable runtime plugins before worker attempts", () => {
const params = makeParams();
restoreAgentWorkerPluginRuntime(params);
expect(mockedEnsureRuntimePluginsLoaded).toHaveBeenCalledWith({
config: params.cfg,
workspaceDir: params.workspaceDir,
allowGatewaySubagentBinding: true,
});
});
});

@@ -0,0 +1,10 @@
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
export function restoreAgentWorkerPluginRuntime(params: AgentRuntimeWorkerRunParams): void {
ensureRuntimePluginsLoaded({
config: params.cfg,
workspaceDir: params.workspaceDir,
allowGatewaySubagentBinding: true,
});
}

@@ -980,6 +980,12 @@ export const FIELD_HELP: Record<string, string> = {
"Experimental agent-default flags. Keep these off unless you are intentionally testing a preview surface.",
"agents.defaults.experimental.localModelLean":
"Experimental local-model prompt trim. When enabled, OpenClaw drops heavyweight default tools like browser, cron, and message for weaker or smaller local-model backends.",
"agents.defaults.experimental.runtimeIsolation":
'Experimental command-agent runtime isolation. Set mode="worker" to run /agent command attempts inside a Node worker thread; gateway reply runs remain in-process while their streaming callbacks stay parent-owned.',
"agents.defaults.experimental.runtimeIsolation.mode":
'Choose the experimental command-agent runtime isolation mode. "off" keeps the current in-process path; "worker" runs /agent command attempts in Node worker threads.',
"agents.defaults.experimental.runtimeIsolation.permissions":
"Also enable Node permission flags for the worker runtime. More restrictive and runtime-sensitive than worker isolation itself.",
"agents.defaults.bootstrapPromptTruncationWarning":
'Inject agent-visible warning text when bootstrap files are truncated: "off", "once" (default), or "always".',
"agents.defaults.startupContext":

@@ -393,6 +393,10 @@ export const FIELD_LABELS: Record<string, string> = {
"agents.defaults.bootstrapTotalMaxChars": "Bootstrap Total Max Chars",
"agents.defaults.experimental": "Experimental Agent Flags",
"agents.defaults.experimental.localModelLean": "Enable Lean Local Model Mode (Experimental)",
"agents.defaults.experimental.runtimeIsolation": "Command Runtime Isolation (Experimental)",
"agents.defaults.experimental.runtimeIsolation.mode": "Command Runtime Isolation Mode",
"agents.defaults.experimental.runtimeIsolation.permissions":
"Command Runtime Isolation Permissions",
"agents.defaults.bootstrapPromptTruncationWarning": "Bootstrap Prompt Truncation Warning",
"agents.defaults.startupContext": "Startup Context",
"agents.defaults.startupContext.enabled": "Enable Startup Context",

@@ -1,7 +1,14 @@
import { execFile } from "node:child_process";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { promisify } from "node:util";
import { afterEach, describe, expect, it } from "vitest";
import {
clearSessionStoreCacheForTest,
getSessionStoreWriterQueueSizeForTest,
loadSessionStore,
withSessionStoreWriterForTest,
} from "./store.js";
@@ -15,6 +22,9 @@ const createDeferred = <T>() => {
return { promise, resolve, reject };
};
const execFileAsync = promisify(execFile);
const testDir = path.dirname(fileURLToPath(import.meta.url));
describe("session store writer", () => {
afterEach(() => {
clearSessionStoreCacheForTest();
@@ -53,4 +63,45 @@ describe("session store writer", () => {
);
expect(getSessionStoreWriterQueueSizeForTest()).toBe(0);
});
it("serializes session store updates across worker threads", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-store-writer-"));
try {
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, "{}\n", "utf8");
const source = `
const { updateSessionStore } = await import("./src/config/sessions/store.ts");
await updateSessionStore(process.env.OPENCLAW_TEST_STORE_PATH, async (store) => {
store[process.env.OPENCLAW_TEST_SESSION_KEY] = {
sessionId: process.env.OPENCLAW_TEST_SESSION_KEY,
updatedAt: Date.now()
};
await new Promise((resolve) => setTimeout(resolve, 75));
});
`;
const workers = Array.from({ length: 4 }, (_, index) =>
execFileAsync(process.execPath, ["--import", "tsx", "--eval", source], {
cwd: path.resolve(testDir, "../../.."),
env: {
...process.env,
OPENCLAW_TEST_SESSION_KEY: `agent:worker:${index}`,
OPENCLAW_TEST_STORE_PATH: storePath,
},
}),
);
await Promise.all(workers);
const store = loadSessionStore(storePath, { skipCache: true });
expect(Object.keys(store).toSorted()).toEqual([
"agent:worker:0",
"agent:worker:1",
"agent:worker:2",
"agent:worker:3",
]);
} finally {
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
});

@@ -1,9 +1,27 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { threadId } from "node:worker_threads";
import {
WRITER_QUEUES,
type SessionStoreWriterQueue,
type SessionStoreWriterTask,
} from "./store-writer-state.js";
const DEFAULT_FILE_LOCK_TIMEOUT_MS = 60_000;
const DEFAULT_FILE_LOCK_STALE_MS = 30 * 60 * 1000;
const DEFAULT_FILE_LOCK_POLL_MS = 25;
const ORPHAN_LOCK_PAYLOAD_GRACE_MS = 5_000;
type SessionStoreLockPayload = {
pid?: number;
threadId?: number;
hostname?: string;
createdAt?: string;
token?: string;
};
export async function withSessionStoreWriterForTest<T>(
storePath: string,
fn: () => Promise<T>,
@@ -11,6 +29,138 @@ export async function withSessionStoreWriterForTest<T>(
return await runExclusiveSessionStoreWrite(storePath, fn);
}
function resolvePositiveMs(value: string | undefined, fallback: number): number {
const parsed = value ? Number(value) : Number.NaN;
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function resolveSessionStoreLockPath(storePath: string): string {
return `${path.resolve(storePath)}.lock`;
}
function getErrorCode(error: unknown): string | undefined {
return error && typeof error === "object" && "code" in error
? String((error as { code?: unknown }).code)
: undefined;
}
async function readSessionStoreLockPayload(
lockPath: string,
): Promise<SessionStoreLockPayload | null> {
try {
return JSON.parse(await fs.readFile(lockPath, "utf8")) as SessionStoreLockPayload;
} catch {
return null;
}
}
async function isSessionStoreLockStale(
lockPath: string,
payload: SessionStoreLockPayload | null,
staleMs: number,
): Promise<boolean> {
if (!payload?.createdAt) {
const stat = await fs.stat(lockPath).catch(() => null);
if (!stat) {
return true;
}
return Date.now() - stat.mtimeMs > ORPHAN_LOCK_PAYLOAD_GRACE_MS;
}
const createdAtMs = Date.parse(payload.createdAt);
if (!Number.isFinite(createdAtMs)) {
const stat = await fs.stat(lockPath).catch(() => null);
if (!stat) {
return true;
}
return Date.now() - stat.mtimeMs > ORPHAN_LOCK_PAYLOAD_GRACE_MS;
}
return Date.now() - createdAtMs > staleMs;
}
async function removeStaleSessionStoreLock(
lockPath: string,
expectedToken: string | undefined,
): Promise<void> {
if (!expectedToken) {
await fs.rm(lockPath, { force: true });
return;
}
const current = await readSessionStoreLockPayload(lockPath);
if (current?.token === expectedToken) {
await fs.rm(lockPath, { force: true });
}
}
async function acquireSessionStoreFileLock(storePath: string): Promise<() => Promise<void>> {
const lockPath = resolveSessionStoreLockPath(storePath);
const timeoutMs = resolvePositiveMs(
process.env.OPENCLAW_SESSION_STORE_LOCK_TIMEOUT_MS,
DEFAULT_FILE_LOCK_TIMEOUT_MS,
);
const staleMs = resolvePositiveMs(
process.env.OPENCLAW_SESSION_STORE_LOCK_STALE_MS,
DEFAULT_FILE_LOCK_STALE_MS,
);
const deadline = Date.now() + timeoutMs;
const payload: SessionStoreLockPayload = {
pid: process.pid,
threadId,
hostname: os.hostname(),
createdAt: new Date().toISOString(),
token: randomUUID(),
};
await fs.mkdir(path.dirname(lockPath), { recursive: true });
for (;;) {
let handle: fs.FileHandle | undefined;
try {
handle = await fs.open(lockPath, "wx", 0o600);
await handle.writeFile(`${JSON.stringify(payload)}\n`, "utf8");
await handle.close();
let released = false;
return async () => {
if (released) {
return;
}
released = true;
await removeStaleSessionStoreLock(lockPath, payload.token).catch(() => undefined);
};
} catch (error) {
await handle?.close().catch(() => undefined);
if (getErrorCode(error) !== "EEXIST") {
throw error;
}
const existing = await readSessionStoreLockPayload(lockPath);
if (await isSessionStoreLockStale(lockPath, existing, staleMs)) {
await removeStaleSessionStoreLock(lockPath, existing?.token).catch(() => undefined);
continue;
}
if (Date.now() >= deadline) {
throw new Error(
`Timed out waiting for session store writer lock: ${lockPath} ` +
`(owner pid=${existing?.pid ?? "unknown"} thread=${existing?.threadId ?? "unknown"})`,
{ cause: error },
);
}
await sleep(DEFAULT_FILE_LOCK_POLL_MS);
}
}
}
async function runWithSessionStoreFileLock<T>(storePath: string, fn: () => Promise<T>): Promise<T> {
const release = await acquireSessionStoreFileLock(storePath);
try {
return await fn();
} finally {
await release();
}
}
function getOrCreateWriterQueue(storePath: string): SessionStoreWriterQueue {
const existing = WRITER_QUEUES.get(storePath);
if (existing) {
@@ -43,7 +193,7 @@ async function drainSessionStoreWriterQueue(storePath: string): Promise<void> {
let failed: unknown;
let hasFailure = false;
try {
- result = await task.fn();
+ result = await runWithSessionStoreFileLock(storePath, task.fn);
} catch (err) {
hasFailure = true;
failed = err;
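
The cross-worker serialization in this file rests on one primitive: opening the lock file with the `wx` flag, which fails with `EEXIST` when another writer already holds it. A stripped-down sketch of that acquire/poll/release loop is given below. It omits the stale-lock detection and environment-based timeouts of the real code, and `acquireLock` and `countMaxConcurrentHolders` are hypothetical names.

```typescript
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Acquire an exclusive lock file; resolves to a release function.
async function acquireLock(
  lockPath: string,
  timeoutMs = 5_000,
  pollMs = 25,
): Promise<() => Promise<void>> {
  const token = randomUUID();
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    try {
      // "wx" creates the file and fails with EEXIST if it already exists.
      const handle = await fs.open(lockPath, "wx", 0o600);
      try {
        await handle.writeFile(JSON.stringify({ pid: process.pid, token }), "utf8");
      } finally {
        await handle.close();
      }
      return async () => {
        // Only remove the lock if it still carries our token.
        const raw = await fs.readFile(lockPath, "utf8").catch(() => "");
        if (raw.includes(token)) {
          await fs.rm(lockPath, { force: true });
        }
      };
    } catch (error) {
      if ((error as NodeJS.ErrnoException).code !== "EEXIST") {
        throw error;
      }
      if (Date.now() >= deadline) {
        throw new Error(`Timed out waiting for lock: ${lockPath}`);
      }
      await sleep(pollMs);
    }
  }
}

// Usage: run several tasks against one lock and report the peak number of holders.
async function countMaxConcurrentHolders(tasks: number): Promise<number> {
  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "lock-sketch-"));
  const lockPath = path.join(dir, "sessions.json.lock");
  let active = 0;
  let maxActive = 0;
  const run = async () => {
    const release = await acquireLock(lockPath);
    try {
      active += 1;
      maxActive = Math.max(maxActive, active);
      await sleep(30);
      active -= 1;
    } finally {
      await release();
    }
  };
  try {
    await Promise.all(Array.from({ length: tasks }, run));
  } finally {
    await fs.rm(dir, { recursive: true, force: true });
  }
  return maxActive;
}
```

The token check on release mirrors the idea in `removeStaleSessionStoreLock`: a writer should not delete a lock file that has since been taken over by someone else.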

@@ -92,6 +92,21 @@ export type AgentContextLimitsConfig = {
postCompactionMaxChars?: number;
};
export type AgentRuntimeIsolationExperimentalConfig = {
/**
* Run /agent command attempts inside an isolated Node worker thread.
* Experimental preview only; gateway reply runs stay in-process until their
* streaming callbacks have a dedicated worker bridge, and CLI harnesses may
* still spawn their own child processes.
*/
mode?: "off" | "worker";
/**
* Add Node permission flags to the worker thread's execArgv. This is stricter
* and more runtime-sensitive than worker isolation itself.
*/
permissions?: boolean;
};
export type CliBackendConfig = {
/** CLI command to execute (absolute path or on PATH). */
command: string;
@@ -258,6 +273,12 @@ export type AgentDefaultsConfig = {
* model backends. Experimental preview only.
*/
localModelLean?: boolean;
/**
* Run /agent command attempts through an isolated Node worker runtime.
* Gateway reply runs remain in-process until their streaming callbacks have
* a dedicated worker bridge. Experimental preview only.
*/
runtimeIsolation?: AgentRuntimeIsolationExperimentalConfig;
};
/**
* Agent-visible bootstrap truncation warning mode:

@@ -64,6 +64,21 @@ describe("agent defaults schema", () => {
expect(result.experimental?.localModelLean).toBe(true);
});
it("accepts experimental.runtimeIsolation", () => {
const result = AgentDefaultsSchema.parse({
experimental: {
runtimeIsolation: {
mode: "worker",
permissions: true,
},
},
})!;
expect(result.experimental?.runtimeIsolation).toEqual({
mode: "worker",
permissions: true,
});
});
it("accepts contextInjection: always", () => {
const result = AgentDefaultsSchema.parse({ contextInjection: "always" })!;
expect(result.contextInjection).toBe("always");

@@ -105,6 +105,13 @@ export const AgentDefaultsSchema = z
experimental: z
.object({
localModelLean: z.boolean().optional(),
runtimeIsolation: z
.object({
mode: z.union([z.literal("off"), z.literal("worker")]).optional(),
permissions: z.boolean().optional(),
})
.strict()
.optional(),
})
.strict()
.optional(),
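
For orientation, the shape accepted by the `runtimeIsolation` schema above is an optional `mode` of `"off"` or `"worker"` plus an optional boolean `permissions`, with unknown keys rejected by `.strict()`. The sketch below shows an example config value and a hand-written checker that approximates those rules without depending on zod. `isRuntimeIsolation` and `exampleConfig` are hypothetical and not part of the change.

```typescript
type RuntimeIsolation = { mode?: "off" | "worker"; permissions?: boolean };

// Hand-written approximation of the strict zod object in the diff.
function isRuntimeIsolation(value: unknown): value is RuntimeIsolation {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return false;
  }
  const record = value as Record<string, unknown>;
  // Strict: any key other than the two known ones is rejected.
  for (const key of Object.keys(record)) {
    if (key !== "mode" && key !== "permissions") {
      return false;
    }
  }
  const { mode, permissions } = record;
  const modeOk = mode === undefined || mode === "off" || mode === "worker";
  const permissionsOk = permissions === undefined || typeof permissions === "boolean";
  return modeOk && permissionsOk;
}

// Example config slice enabling worker isolation with permission flags.
const exampleConfig = {
  agents: {
    defaults: {
      experimental: {
        runtimeIsolation: { mode: "worker", permissions: true },
      },
    },
  },
};
```

Leaving `runtimeIsolation` out entirely, or setting `mode: "off"`, keeps the in-process path described in the field help.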

@@ -1,7 +1,8 @@
import { registerLegacyContextEngine } from "./legacy.registration.js";
import { getContextEngineFactory } from "./registry.js";
/**
- * Ensures all built-in context engines are registered exactly once.
+ * Ensures all built-in context engines are registered.
*
* The legacy engine is always registered as a safe fallback so that
* `resolveContextEngine()` can resolve the default "legacy" slot without
@@ -13,7 +14,7 @@ import { registerLegacyContextEngine } from "./legacy.registration.js";
let initialized = false;
export function ensureContextEnginesInitialized(): void {
- if (initialized) {
+ if (initialized && getContextEngineFactory("legacy")) {
return;
}
initialized = true;

@@ -232,6 +232,8 @@ function buildCoreDistEntries(): Record<string, string> {
"plugins/runtime/index": "src/plugins/runtime/index.ts",
"llm-slug-generator": "src/hooks/llm-slug-generator.ts",
"mcp/plugin-tools-serve": "src/mcp/plugin-tools-serve.ts",
// Stable worker entry loaded from bundled agent runtime chunks.
"agent-runtime.worker": "src/agents/worker-runtime/agent-runtime.worker.ts",
};
}