Compare commits

..

1 Commits

Author SHA1 Message Date
Peter Steinberger
06bf20db4a fix(slack): stop logging inbound message previews 2026-06-24 07:17:35 -07:00
10 changed files with 102 additions and 1018 deletions

View File

@@ -528,25 +528,13 @@ candidate contains redacted secret placeholders such as `***`.
and re-checked, so a path that lexically lives in a config dir but whose
real target escapes every allowed root is still rejected.
- **Error handling**: clear errors for missing files, parse errors, circular includes, invalid path format, and excessive length
- **Hot reload**: edits to regular include files successfully resolved by the
last valid config are watched, including nested includes. Changing an
authored `$include` target inside a watched file re-resolves the graph.
Paths that were missing or invalid during the last successful resolution,
and filesystem or symlink retargets that do not modify a watched file, are
not discovered automatically; edit `openclaw.json` or restart the Gateway
to resolve the graph again.
</Accordion>
</AccordionGroup>
## Config hot reload
The Gateway watches `~/.openclaw/openclaw.json` plus the canonical include files
successfully resolved by the last valid config, and applies changes
automatically - no manual restart needed for most settings. Invalid candidates
keep the last valid watch set. Missing or invalid paths outside that set, plus
filesystem or symlink retargets that do not modify a watched file, require an
`openclaw.json` edit or a Gateway restart before they can be discovered.
The Gateway watches `~/.openclaw/openclaw.json` and applies changes automatically - no manual restart needed for most settings.
Direct file edits are treated as untrusted until they validate. The watcher waits
for editor temp-write/rename churn to settle, reads the final file, and rejects

View File

@@ -31,7 +31,20 @@ import {
} from "./prepare.test-helpers.js";
import { clearSlackSubteamMentionCacheForTest } from "./subteam-mentions.js";
const enqueueSystemEventMock = vi.hoisted(() => vi.fn());
const { enqueueSystemEventMock, logVerboseMock, shouldLogVerboseMock } = vi.hoisted(() => ({
enqueueSystemEventMock: vi.fn(),
logVerboseMock: vi.fn(),
shouldLogVerboseMock: vi.fn(() => false),
}));
vi.mock("openclaw/plugin-sdk/runtime-env", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/runtime-env")>();
return {
...actual,
logVerbose: (...args: unknown[]) => logVerboseMock(...args),
shouldLogVerbose: () => shouldLogVerboseMock(),
};
});
vi.mock("openclaw/plugin-sdk/system-event-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/system-event-runtime")>();
@@ -54,6 +67,9 @@ describe("slack prepareSlackMessage inbound contract", () => {
clearSlackAllowFromCacheForTest();
clearSlackSubteamMentionCacheForTest();
enqueueSystemEventMock.mockClear();
logVerboseMock.mockClear();
shouldLogVerboseMock.mockReset();
shouldLogVerboseMock.mockReturnValue(false);
});
afterAll(() => {
@@ -171,6 +187,28 @@ describe("slack prepareSlackMessage inbound contract", () => {
expect(prepared.ctxPayload.BodyForAgent).toContain(body);
});
it("logs inbound metadata without logging message content", async () => {
const body = "confidential acquisition target: northstar; do not include this text in logs";
shouldLogVerboseMock.mockReturnValue(true);
const prepared = await prepareWithDefaultCtx(createSlackMessage({ text: body }));
assertPrepared(prepared);
const inboundLog = logVerboseMock.mock.calls
.map(([entry]) => entry)
.find((entry) => typeof entry === "string" && entry.startsWith("slack inbound:"));
const verboseOutput = logVerboseMock.mock.calls
.flat()
.filter((entry): entry is string => typeof entry === "string")
.join("\n");
expect(inboundLog).toBe(
`slack inbound: account=${prepared.route.accountId} agent=${prepared.route.agentId} channel=D123 message_ts=1.000 thread_ts=none from=slack:U1 chat=direct chars=${body.length}`,
);
expect(verboseOutput).not.toContain(body);
expect(verboseOutput).not.toContain("confidential acquisition target");
expect(verboseOutput).not.toContain("preview=");
});
it("prepares wildcard open-policy account DMs", async () => {
const ctx = createInboundSlackCtx({
cfg: {

View File

@@ -1386,7 +1386,9 @@ export async function prepareSlackMessage(params: {
}
if (shouldLogVerbose()) {
logVerbose(`slack inbound: channel=${message.channel} from=${slackFrom} preview="${preview}"`);
logVerbose(
`slack inbound: account=${route.accountId} agent=${route.agentId} channel=${message.channel} message_ts=${message.ts ?? "unknown"} thread_ts=${effectiveMessageThreadId ?? "none"} from=${slackFrom} chat=${chatType} chars=${rawBody.length}`,
);
}
const updateLastRouteSessionKey = resolveInboundLastRouteSessionKey({ route, sessionKey });

View File

@@ -1199,7 +1199,6 @@ function resolveConfigIncludesForRead(
deps: Required<ConfigIoDeps>,
includeFileHashesForWrite?: Record<string, string>,
includeFileTargetsForWrite?: Record<string, string>,
includeFilePaths?: Set<string>,
): unknown {
const allowedRoots = resolveIncludeRoots(deps.env, deps.homedir);
const recordIncludeTarget = (resolvedPath: string, canonicalPath?: string) => {
@@ -1232,10 +1231,7 @@ function resolveConfigIncludesForRead(
resolvedPath,
rootRealDir,
ioFs: deps.fs,
onResolvedPath: (canonicalPath) => {
recordIncludeTarget(resolvedPath, canonicalPath);
includeFilePaths?.add(path.normalize(canonicalPath));
},
onResolvedPath: (canonicalPath) => recordIncludeTarget(resolvedPath, canonicalPath),
});
if (includeFileHashesForWrite) {
includeFileHashesForWrite[path.normalize(resolvedPath)] = hashConfigIncludeRaw(raw);
@@ -1311,13 +1307,11 @@ type ReadConfigFileSnapshotInternalResult = {
envSnapshotForRestore?: Record<string, string | undefined>;
includeFileHashesForWrite?: Record<string, string>;
includeFileTargetsForWrite?: Record<string, string>;
includeFilePaths?: readonly string[];
pluginMetadataSnapshot?: PluginMetadataSnapshot;
};
export type ReadConfigFileSnapshotWithPluginMetadataResult = {
snapshot: ConfigFileSnapshot;
includeFilePaths?: readonly string[];
pluginMetadataSnapshot?: PluginMetadataSnapshot;
};
@@ -1874,7 +1868,6 @@ export function createConfigIO(
let fallbackEnvSnapshotForRestore: Record<string, string | undefined> | undefined;
const includeFileHashesForWrite: Record<string, string> = {};
const includeFileTargetsForWrite: Record<string, string> = {};
const includeFilePaths = new Set<string>();
try {
const raw = await deps.measure("config.snapshot.read.file", () =>
@@ -1923,7 +1916,6 @@ export function createConfigIO(
deps,
includeFileHashesForWrite,
includeFileTargetsForWrite,
includeFilePaths,
),
);
} catch (err) {
@@ -2094,7 +2086,6 @@ export function createConfigIO(
envSnapshotForRestore: readResolution.envSnapshotForRestore,
includeFileHashesForWrite,
includeFileTargetsForWrite,
includeFilePaths: [...includeFilePaths].toSorted(),
pluginMetadataSnapshot: validationPluginMetadata.getSnapshot(),
},
{ observe: !callerRejectedSuspiciousRecovery },
@@ -2160,7 +2151,6 @@ export function createConfigIO(
});
return {
snapshot: result.snapshot,
...(result.snapshot.valid ? { includeFilePaths: result.includeFilePaths ?? [] } : {}),
...(result.pluginMetadataSnapshot
? { pluginMetadataSnapshot: result.pluginMetadataSnapshot }
: {}),

View File

@@ -1865,57 +1865,6 @@ describe("config io write", () => {
});
});
it.runIf(process.platform !== "win32")(
"exposes only canonical valid include paths through the metadata wrapper",
async () => {
await withSuiteHome(async (home) => {
const configDir = path.join(home, ".openclaw");
const configPath = path.join(configDir, "openclaw.json");
const fragmentsDir = path.join(configDir, "fragments");
const aliasDir = path.join(configDir, "alias");
const defaultsPath = path.join(fragmentsDir, "defaults.json5");
const nestedPath = path.join(fragmentsDir, "nested.json5");
await fs.mkdir(fragmentsDir, { recursive: true });
await fs.symlink(fragmentsDir, aliasDir, "dir");
await fs.writeFile(nestedPath, '{ workspace: "~/.openclaw/workspace" }\n', "utf-8");
await fs.writeFile(
defaultsPath,
'{ $include: "./nested.json5", maxConcurrent: 1 }\n',
"utf-8",
);
await fs.writeFile(
configPath,
'{ agents: { defaults: { $include: "./alias/defaults.json5" } } }\n',
"utf-8",
);
const io = createConfigIO({
env: { OPENCLAW_TEST_FAST: "1" } as NodeJS.ProcessEnv,
homedir: () => home,
logger: silentLogger,
});
const valid = await io.readConfigFileSnapshotWithPluginMetadata();
expect(valid.snapshot.valid).toBe(true);
expect(valid.includeFilePaths).toEqual(
[await fs.realpath(defaultsPath), await fs.realpath(nestedPath)].toSorted(),
);
expect(valid.includeFilePaths).not.toContain(path.join(aliasDir, "defaults.json5"));
expect(valid.snapshot).not.toHaveProperty("includeFilePaths");
await fs.writeFile(nestedPath, "{ malformed", "utf-8");
const invalid = await io.readConfigFileSnapshotWithPluginMetadata();
expect(invalid.snapshot.valid).toBe(false);
expect(invalid).not.toHaveProperty("includeFilePaths");
await fs.rm(nestedPath);
const missing = await io.readConfigFileSnapshotWithPluginMetadata();
expect(missing.snapshot.valid).toBe(false);
expect(missing).not.toHaveProperty("includeFilePaths");
});
},
);
it("repairs invalid root-authored siblings without flattening included config", async () => {
await withSuiteHome(async (home) => {
const configPath = path.join(home, ".openclaw", "openclaw.json");

View File

@@ -1,6 +1,5 @@
// Gateway config reload tests cover changed-path detection, reload planning,
// plugin registry refresh, skill snapshot invalidation, and watcher behavior.
import nodePath from "node:path";
import chokidar from "chokidar";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { listChannelPlugins } from "../channels/plugins/index.js";
@@ -579,8 +578,8 @@ describe("resolveGatewayReloadSettings", () => {
});
});
type WatcherHandler = (value?: string | Error) => void;
type WatcherEvent = "add" | "change" | "unlink" | "error" | "ready";
type WatcherHandler = () => void;
type WatcherEvent = "add" | "change" | "unlink" | "error";
function createWatcherMock(effectiveUsePolling?: boolean) {
const handlers = new Map<WatcherEvent, WatcherHandler[]>();
@@ -593,9 +592,9 @@ function createWatcherMock(effectiveUsePolling?: boolean) {
handlers.set(event, existing);
return this;
},
emit(event: WatcherEvent, value?: string | Error) {
emit(event: WatcherEvent) {
for (const handler of handlers.get(event) ?? []) {
handler(value);
handler();
}
},
close: vi.fn(async () => {}),
@@ -660,30 +659,17 @@ function makeZeroDebounceHookWrite(persistedHash: string): ConfigWriteNotificati
}
function createReloaderHarness(
readSnapshot: () => Promise<
ConfigFileSnapshot | { snapshot: ConfigFileSnapshot; includeFilePaths?: readonly string[] }
>,
readSnapshot: () => Promise<ConfigFileSnapshot>,
options: {
initialCompareConfig?: OpenClawConfig;
initialInternalWriteHash?: string | null;
initialIncludeFilePaths?: readonly string[];
promoteSnapshot?: (snapshot: ConfigFileSnapshot, reason: string) => Promise<boolean>;
initialPluginInstallRecords?: Record<string, PluginInstallRecord>;
readPluginInstallRecords?: () => Promise<Record<string, PluginInstallRecord>>;
watchers?: ReturnType<typeof createWatcherMock>[];
} = {},
) {
const watchers = options.watchers ?? [createWatcherMock()];
const watcher = watchers[0] ?? createWatcherMock();
let watcherIndex = 0;
const watchSpy = vi.spyOn(chokidar, "watch").mockImplementation((_path, watchOptions) => {
const next = watchers[watcherIndex++];
if (!next) {
throw new Error("missing watcher mock");
}
next.options.usePolling = next.effectiveUsePolling ?? Boolean(watchOptions?.usePolling);
return next as unknown as never;
});
const watcher = createWatcherMock();
vi.spyOn(chokidar, "watch").mockReturnValue(watcher as unknown as never);
const onHotReload = vi.fn(async (_plan: GatewayReloadPlan, _nextConfig: OpenClawConfig) => {});
const onRestart = vi.fn((_plan: GatewayReloadPlan, _nextConfig: OpenClawConfig) => {});
let writeListener: ((event: ConfigWriteNotification) => void) | null = null;
@@ -704,7 +690,6 @@ function createReloaderHarness(
initialConfig: { gateway: { reload: { debounceMs: 0 } } },
initialCompareConfig: options.initialCompareConfig,
initialInternalWriteHash: options.initialInternalWriteHash,
initialIncludeFilePaths: options.initialIncludeFilePaths,
readSnapshot,
promoteSnapshot: options.promoteSnapshot,
initialPluginInstallRecords: options.initialPluginInstallRecords ?? {},
@@ -717,8 +702,6 @@ function createReloaderHarness(
});
return {
watcher,
watchers,
watchSpy,
onHotReload,
onRestart,
log,
@@ -1016,9 +999,7 @@ describe("startGatewayConfigReloader", () => {
await harness.reloader.stop();
});
it("does not replay a rejected graph and accepts a later content change", async () => {
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
it("does not promote external config edits when hot reload rejects them", async () => {
const acceptedSnapshot = makeSnapshot({
config: {
gateway: { reload: { debounceMs: 0 } },
@@ -1026,50 +1007,22 @@ describe("startGatewayConfigReloader", () => {
},
hash: "external-rejected-1",
});
const revisedSnapshot = makeSnapshot({
config: {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
},
hash: "external-revised-2",
});
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({ snapshot: acceptedSnapshot, includeFilePaths: [nextInclude] })
.mockResolvedValueOnce({ snapshot: acceptedSnapshot, includeFilePaths: [nextInclude] })
.mockResolvedValueOnce({ snapshot: revisedSnapshot, includeFilePaths: [nextInclude] });
.fn<() => Promise<ConfigFileSnapshot>>()
.mockResolvedValueOnce(acceptedSnapshot);
const promoteSnapshot = vi.fn(async (_snapshot: ConfigFileSnapshot, _reason: string) => true);
const watchers = [createWatcherMock(), createWatcherMock(), createWatcherMock()];
const { watcher, onHotReload, log, reloader } = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [oldInclude],
promoteSnapshot,
watchers,
});
onHotReload.mockRejectedValueOnce(new Error("reload refused"));
watcher.emit("change");
await vi.runOnlyPendingTimersAsync();
await vi.runAllTimersAsync();
expect(onHotReload).toHaveBeenCalledTimes(1);
expect(promoteSnapshot).not.toHaveBeenCalled();
expect(log.error).toHaveBeenCalledWith("config reload failed: Error: reload refused");
watcher.emit("change");
await vi.runOnlyPendingTimersAsync();
expect(onHotReload).toHaveBeenCalledTimes(1);
expect(promoteSnapshot).not.toHaveBeenCalled();
expect(log.warn).toHaveBeenCalledWith(
"config reload skipped (previous apply failed; waiting for config change)",
);
watcher.emit("change");
await vi.runOnlyPendingTimersAsync();
expect(onHotReload).toHaveBeenCalledTimes(2);
expect(promoteSnapshot).toHaveBeenCalledTimes(1);
expect(promoteSnapshot).toHaveBeenCalledWith(revisedSnapshot, "valid-config");
await reloader.stop();
});
@@ -1151,461 +1104,6 @@ describe("startGatewayConfigReloader", () => {
await harness.reloader.stop();
});
it("retains a queued include reconciliation when an in-process hot reload throws", async () => {
const includePath = nodePath.normalize("/tmp/includes/active.json5");
const acceptedSnapshot = makeZeroDebounceHookSnapshot("internal-reconcile-1");
const readSnapshot = vi.fn().mockResolvedValueOnce({
snapshot: acceptedSnapshot,
includeFilePaths: [includePath],
});
const watchers = [createWatcherMock(), createWatcherMock()];
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [includePath],
watchers,
});
harness.onHotReload.mockRejectedValueOnce(new Error("reload refused"));
harness.emitWrite(makeZeroDebounceHookWrite("internal-reconcile-1"));
watchers[1]?.emit("ready");
await vi.runOnlyPendingTimersAsync();
await vi.runOnlyPendingTimersAsync();
expect(harness.log.error).toHaveBeenCalledWith("config reload failed: Error: reload refused");
expect(readSnapshot).toHaveBeenCalledTimes(1);
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
expect(harness.log.warn).toHaveBeenCalledWith(
"config reload skipped (previous apply failed; waiting for config change)",
);
await harness.reloader.stop();
});
it("watches nested startup includes and does not apply root hash dedupe to include edits", async () => {
const includePaths = [
nodePath.normalize("/tmp/includes/outer.json5"),
nodePath.normalize("/tmp/includes/nested.json5"),
];
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({
snapshot: makeZeroDebounceHookSnapshot("internal-include-1"),
includeFilePaths: includePaths,
})
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
},
runtimeConfig: {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
},
config: {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
},
hash: "internal-include-1",
}),
includeFilePaths: includePaths,
});
const watchers = [createWatcherMock(), createWatcherMock(), createWatcherMock()];
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: includePaths,
promoteSnapshot: vi.fn(async () => true),
watchers,
});
expect(harness.watchSpy.mock.calls.map((call) => call[0])).toEqual([
"/tmp/openclaw.json",
nodePath.normalize("/tmp/includes/nested.json5"),
nodePath.normalize("/tmp/includes/outer.json5"),
]);
harness.emitWrite(makeZeroDebounceHookWrite("internal-include-1"));
await vi.runOnlyPendingTimersAsync();
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
watchers[2]?.emit("change", nodePath.normalize("/tmp/includes/outer.json5"));
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(2);
expect(harness.onHotReload).toHaveBeenCalledTimes(2);
await harness.reloader.stop();
});
it("clears a stale root write hash when an include-triggered read sees different root bytes", async () => {
const includePath = nodePath.normalize("/tmp/includes/active.json5");
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({
snapshot: makeZeroDebounceHookSnapshot("external-root-2"),
includeFilePaths: [includePath],
})
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: { gateway: { reload: { debounceMs: 0 } }, hooks: { enabled: false } },
runtimeConfig: { gateway: { reload: { debounceMs: 0 } }, hooks: { enabled: false } },
config: { gateway: { reload: { debounceMs: 0 } }, hooks: { enabled: false } },
hash: "internal-root-1",
}),
includeFilePaths: [includePath],
});
const watchers = [createWatcherMock(), createWatcherMock()];
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [includePath],
initialInternalWriteHash: "internal-root-1",
watchers,
});
watchers[1]?.emit("change", includePath);
await vi.runOnlyPendingTimersAsync();
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
watchers[0]?.emit("change", nodePath.normalize("/tmp/openclaw.json"));
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(2);
expect(harness.onHotReload).toHaveBeenCalledTimes(2);
await harness.reloader.stop();
});
it("retries a failed include watcher handoff while the prior set stays active", async () => {
const rootPath = nodePath.normalize("/tmp/openclaw.json");
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
const nextSnapshot = {
snapshot: makeZeroDebounceHookSnapshot("graph-retry-1"),
includeFilePaths: [nextInclude],
};
const readSnapshot = vi
.fn()
.mockResolvedValueOnce(nextSnapshot)
.mockResolvedValueOnce(nextSnapshot);
const watchers = [
createWatcherMock(),
createWatcherMock(),
createWatcherMock(),
createWatcherMock(),
];
const [rootWatcher, oldWatcher, failedCandidate, retryCandidate] = watchers;
if (!rootWatcher || !oldWatcher || !failedCandidate || !retryCandidate) {
throw new Error("expected watcher mocks");
}
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [oldInclude],
watchers,
});
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
failedCandidate.emit("error", new Error("ENOSPC"));
failedCandidate.emit("ready");
expect(oldWatcher.close).not.toHaveBeenCalled();
expect(rootWatcher.close).not.toHaveBeenCalled();
expect(harness.watchSpy).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(500);
expect(harness.watchSpy).toHaveBeenCalledTimes(4);
retryCandidate.emit("ready");
await vi.runOnlyPendingTimersAsync();
expect(oldWatcher.close).toHaveBeenCalledTimes(1);
expect(rootWatcher.close).not.toHaveBeenCalled();
expect(readSnapshot).toHaveBeenCalledTimes(2);
expect(harness.log.warn).toHaveBeenCalledWith(
expect.stringContaining("retrying replacement (attempt 1/3 in 500ms)"),
);
await harness.reloader.stop();
});
it("uses the include watcher's effective polling mode when retries are exhausted", async () => {
const originalVitest = process.env.VITEST;
const originalChokidarPolling = process.env.CHOKIDAR_USEPOLLING;
delete process.env.VITEST;
delete process.env.CHOKIDAR_USEPOLLING;
let harness: ReloaderHarness | undefined;
try {
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
const watchers = [
createWatcherMock(false),
createWatcherMock(false),
createWatcherMock(true),
createWatcherMock(true),
createWatcherMock(true),
createWatcherMock(true),
];
harness = createReloaderHarness(
vi.fn().mockResolvedValueOnce({
snapshot: makeZeroDebounceHookSnapshot("effective-polling"),
includeFilePaths: [nextInclude],
}),
{ initialIncludeFilePaths: [oldInclude], watchers },
);
watchers[0]?.emit("change", nodePath.normalize("/tmp/openclaw.json"));
await vi.runOnlyPendingTimersAsync();
for (const [index, delay] of [
[2, 500],
[3, 2000],
[4, 5000],
] as const) {
watchers[index]?.emit("error", new Error("polling failed"));
await vi.advanceTimersByTimeAsync(delay);
}
watchers[5]?.emit("error", new Error("polling failed"));
expect(harness.reloader.hotReloadStatus()).toBe("disabled");
expect(harness.log.error).toHaveBeenCalledWith(expect.stringContaining("in polling mode"));
expect(harness.log.warn).not.toHaveBeenCalledWith(
expect.stringContaining("degrading to polling mode"),
);
} finally {
if (originalVitest === undefined) {
delete process.env.VITEST;
} else {
process.env.VITEST = originalVitest;
}
if (originalChokidarPolling === undefined) {
delete process.env.CHOKIDAR_USEPOLLING;
} else {
process.env.CHOKIDAR_USEPOLLING = originalChokidarPolling;
}
await harness?.reloader.stop();
}
});
it("reconciles once the initial include watcher set is ready", async () => {
const includePath = nodePath.normalize("/tmp/includes/startup.json5");
const readSnapshot = vi.fn().mockResolvedValueOnce({
snapshot: makeZeroDebounceHookSnapshot("startup-include-ready"),
includeFilePaths: [includePath],
});
const watchers = [createWatcherMock(), createWatcherMock()];
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [includePath],
watchers,
});
watchers[1]?.emit("ready");
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(1);
await harness.reloader.stop();
});
it("reconciles a retained initial watcher after a graph change reverts before ready", async () => {
const rootPath = nodePath.normalize("/tmp/openclaw.json");
const initialInclude = nodePath.normalize("/tmp/includes/initial.json5");
const transientInclude = nodePath.normalize("/tmp/includes/transient.json5");
const initialSnapshot = {
snapshot: makeZeroDebounceHookSnapshot("initial-graph"),
includeFilePaths: [initialInclude],
};
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({
snapshot: makeZeroDebounceHookSnapshot("transient-graph"),
includeFilePaths: [transientInclude],
})
.mockResolvedValueOnce(initialSnapshot)
.mockResolvedValueOnce(initialSnapshot);
const watchers = [createWatcherMock(), createWatcherMock(), createWatcherMock()];
const [rootWatcher, initialWatcher, transientCandidate] = watchers;
if (!rootWatcher || !initialWatcher || !transientCandidate) {
throw new Error("expected watcher mocks");
}
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [initialInclude],
watchers,
});
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
expect(transientCandidate.close).toHaveBeenCalledTimes(1);
initialWatcher.emit("ready");
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(3);
await harness.reloader.stop();
});
it("invalidates an active include watcher that errors during a newer graph handoff", async () => {
const rootPath = nodePath.normalize("/tmp/openclaw.json");
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
const nextSnapshot = {
snapshot: makeZeroDebounceHookSnapshot("graph-active-error"),
includeFilePaths: [nextInclude],
};
const readSnapshot = vi
.fn()
.mockResolvedValueOnce(nextSnapshot)
.mockResolvedValueOnce(nextSnapshot);
const watchers = [createWatcherMock(), createWatcherMock(), createWatcherMock()];
const [rootWatcher, oldWatcher, candidateWatcher] = watchers;
if (!rootWatcher || !oldWatcher || !candidateWatcher) {
throw new Error("expected watcher mocks");
}
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [oldInclude],
watchers,
});
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
oldWatcher.emit("error", new Error("active failed"));
expect(oldWatcher.close).toHaveBeenCalledTimes(1);
expect(rootWatcher.close).not.toHaveBeenCalled();
candidateWatcher.emit("ready");
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(2);
await harness.reloader.stop();
});
it("atomically swaps changed include graphs after ready and reconciles without watcher leaks", async () => {
const rootPath = nodePath.normalize("/tmp/openclaw.json");
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
const finalInclude = nodePath.normalize("/tmp/includes/final.json5");
const firstConfig: OpenClawConfig = {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: true },
};
const finalConfig: OpenClawConfig = {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
};
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: firstConfig,
runtimeConfig: firstConfig,
config: firstConfig,
hash: "graph-1",
}),
includeFilePaths: [nextInclude],
})
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: firstConfig,
runtimeConfig: firstConfig,
config: firstConfig,
hash: "graph-1",
}),
includeFilePaths: [nextInclude],
})
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: finalConfig,
runtimeConfig: finalConfig,
config: finalConfig,
hash: "graph-2",
}),
includeFilePaths: [finalInclude],
});
const watchers = [
createWatcherMock(),
createWatcherMock(),
createWatcherMock(),
createWatcherMock(),
];
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [oldInclude],
watchers,
});
const [rootWatcher, initialIncludeWatcher, replacementWatcher, pendingFinalWatcher] = watchers;
if (!rootWatcher || !initialIncludeWatcher || !replacementWatcher || !pendingFinalWatcher) {
throw new Error("expected watcher mocks");
}
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
expect(harness.watchSpy.mock.calls[2]?.[0]).toBe(nextInclude);
expect(rootWatcher.close).not.toHaveBeenCalled();
expect(initialIncludeWatcher.close).not.toHaveBeenCalled();
replacementWatcher.emit("change", nextInclude);
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(1);
replacementWatcher.emit("ready");
expect(initialIncludeWatcher.close).toHaveBeenCalledTimes(1);
expect(rootWatcher.close).not.toHaveBeenCalled();
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(2);
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
initialIncludeWatcher.emit("change", oldInclude);
await vi.runOnlyPendingTimersAsync();
expect(readSnapshot).toHaveBeenCalledTimes(2);
rootWatcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
expect(harness.watchSpy.mock.calls[3]?.[0]).toBe(finalInclude);
expect(harness.onHotReload).toHaveBeenCalledTimes(2);
await harness.reloader.stop();
expect(rootWatcher.close).toHaveBeenCalledTimes(1);
expect(initialIncludeWatcher.close).toHaveBeenCalledTimes(1);
expect(replacementWatcher.close).toHaveBeenCalledTimes(1);
expect(pendingFinalWatcher.close).toHaveBeenCalledTimes(1);
});
it("keeps the last valid include watch set when a candidate snapshot is invalid", async () => {
const rootPath = nodePath.normalize("/tmp/openclaw.json");
const acceptedInclude = nodePath.normalize("/tmp/includes/accepted.json5");
const rejectedInclude = nodePath.normalize("/tmp/includes/rejected.json5");
const nextConfig: OpenClawConfig = {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: true },
};
const readSnapshot = vi
.fn()
.mockResolvedValueOnce({
snapshot: makeSnapshot({
valid: false,
issues: [{ path: "hooks.enabled", message: "Expected boolean" }],
hash: "invalid-graph",
}),
includeFilePaths: [rejectedInclude],
})
.mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: nextConfig,
runtimeConfig: nextConfig,
config: nextConfig,
hash: "accepted-graph",
}),
includeFilePaths: [acceptedInclude],
});
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [acceptedInclude],
watchers: [createWatcherMock(), createWatcherMock()],
});
harness.watcher.emit("change", rootPath);
await vi.runOnlyPendingTimersAsync();
expect(harness.watchSpy).toHaveBeenCalledTimes(2);
harness.watchers[1]?.emit("change", acceptedInclude);
await vi.runOnlyPendingTimersAsync();
expect(harness.watchSpy).toHaveBeenCalledTimes(2);
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
await harness.reloader.stop();
});
it("honors in-process write intent to skip reload", async () => {
const readSnapshot = vi
.fn<() => Promise<ConfigFileSnapshot>>()
@@ -2035,40 +1533,6 @@ describe("startGatewayConfigReloader", () => {
await harness.reloader.stop();
});
it("skips in-process promotion when includes change under the same root hash", async () => {
const oldInclude = nodePath.normalize("/tmp/includes/old.json5");
const nextInclude = nodePath.normalize("/tmp/includes/next.json5");
const changedByInclude: OpenClawConfig = {
gateway: { reload: { debounceMs: 0 } },
hooks: { enabled: false },
};
const readSnapshot = vi.fn().mockResolvedValueOnce({
snapshot: makeSnapshot({
sourceConfig: changedByInclude,
runtimeConfig: changedByInclude,
config: changedByInclude,
hash: "internal-1",
}),
includeFilePaths: [nextInclude],
});
const promoteSnapshot = vi.fn(async () => true);
const harness = createReloaderHarness(readSnapshot, {
initialIncludeFilePaths: [oldInclude],
promoteSnapshot,
watchers: [createWatcherMock(), createWatcherMock()],
});
harness.emitWrite(makeZeroDebounceHookWrite("internal-1"));
await vi.runOnlyPendingTimersAsync();
expect(harness.onHotReload).toHaveBeenCalledTimes(1);
expect(readSnapshot).toHaveBeenCalledTimes(1);
expect(promoteSnapshot).not.toHaveBeenCalled();
expect(harness.watchSpy).toHaveBeenCalledTimes(2);
await harness.reloader.stop();
});
it("dedupes the first watcher reread for startup internal writes", async () => {
const readSnapshot = vi
.fn<() => Promise<ConfigFileSnapshot>>()

View File

@@ -1,6 +1,5 @@
// Gateway config hot-reload watcher.
// Diffs config/plugin install snapshots and dispatches hot reload or restart plans.
import nodePath from "node:path";
import chokidar from "chokidar";
import type { ConfigWriteNotification } from "../config/io.js";
import { formatConfigIssueLines } from "../config/issue-format.js";
@@ -103,36 +102,6 @@ type GatewayConfigReloader = {
type PluginInstallRecords = Record<string, PluginInstallRecord>;
type ConfigReloadSnapshotReadResult =
| ConfigFileSnapshot
| {
snapshot: ConfigFileSnapshot;
includeFilePaths?: readonly string[];
};
function unpackConfigReloadSnapshot(result: ConfigReloadSnapshotReadResult): {
snapshot: ConfigFileSnapshot;
includeFilePaths?: readonly string[];
} {
return "snapshot" in result ? result : { snapshot: result };
}
function normalizeIncludeWatcherPaths(
rootPath: string,
includeFilePaths: readonly string[] = [],
): string[] {
const normalizedRoot = nodePath.normalize(rootPath);
const includes = new Set(
includeFilePaths.map((includePath) => nodePath.normalize(includePath)).filter(Boolean),
);
includes.delete(normalizedRoot);
return [...includes].toSorted((left, right) => left.localeCompare(right));
}
function watcherPathsEqual(left: readonly string[], right: readonly string[]): boolean {
return left.length === right.length && left.every((entry, index) => entry === right[index]);
}
function asPluginInstallConfig(records: PluginInstallRecords): OpenClawConfig {
return {
plugins: {
@@ -145,8 +114,7 @@ export function startGatewayConfigReloader(opts: {
initialConfig: OpenClawConfig;
initialCompareConfig?: OpenClawConfig;
initialInternalWriteHash?: string | null;
initialIncludeFilePaths?: readonly string[];
readSnapshot: () => Promise<ConfigReloadSnapshotReadResult>;
readSnapshot: () => Promise<ConfigFileSnapshot>;
onHotReload: (plan: GatewayReloadPlan, nextConfig: OpenClawConfig) => Promise<void>;
onRestart: (plan: GatewayReloadPlan, nextConfig: OpenClawConfig) => void | Promise<void>;
promoteSnapshot?: (snapshot: ConfigFileSnapshot, reason: string) => Promise<boolean>;
@@ -167,7 +135,6 @@ export function startGatewayConfigReloader(opts: {
let pending = false;
let running = false;
let stopped = false;
let pendingIncludeReload = false;
let restartQueued = false;
let missingConfigRetries = 0;
let pendingInProcessConfig: {
@@ -177,7 +144,6 @@ export function startGatewayConfigReloader(opts: {
afterWrite?: ConfigWriteNotification["afterWrite"];
} | null = null;
let lastAppliedWriteHash = opts.initialInternalWriteHash ?? null;
let currentApplyRejected = false;
let currentPluginInstallRecords =
opts.initialPluginInstallRecords ?? loadInstalledPluginIndexInstallRecordsSync();
const readPluginInstallRecords =
@@ -290,11 +256,7 @@ export function startGatewayConfigReloader(opts: {
currentPluginInstallRecords = nextPluginInstallRecords;
settings = resolveGatewayReloadSettings(nextConfig);
if (changedPaths.length === 0) {
if (currentApplyRejected) {
opts.log.warn("config reload skipped (previous apply failed; waiting for config change)");
return false;
}
return true;
return;
}
// Invalidate cached skills snapshots (persisted in sessions.json) whenever
@@ -311,21 +273,18 @@ export function startGatewayConfigReloader(opts: {
opts.log.info(`config change detected; evaluating reload (${changedPaths.join(", ")})`);
if (followUp.mode === "none") {
opts.log.info(`config reload skipped by writer intent (${followUp.reason})`);
currentApplyRejected = false;
return true;
return;
}
const plan = buildGatewayReloadPlan(changedPaths, {
noopPaths: pluginInstallTimestampNoopPaths,
forceChangedPaths: pluginInstallWholeRecordPaths,
});
if (isNoopReloadPlan(plan) && !followUp.requiresRestart) {
currentApplyRejected = false;
return true;
return;
}
if (settings.mode === "off") {
opts.log.info("config reload disabled (gateway.reload.mode=off)");
currentApplyRejected = false;
return true;
return;
}
if (followUp.requiresRestart) {
queueRestart(
@@ -336,13 +295,11 @@ export function startGatewayConfigReloader(opts: {
},
nextConfig,
);
currentApplyRejected = false;
return true;
return;
}
if (settings.mode === "restart") {
queueRestart(plan, nextConfig);
currentApplyRejected = false;
return true;
return;
}
if (plan.restartGateway) {
if (settings.mode === "hot") {
@@ -351,23 +308,13 @@ export function startGatewayConfigReloader(opts: {
", ",
)})`,
);
currentApplyRejected = false;
return true;
return;
}
queueRestart(plan, nextConfig);
currentApplyRejected = false;
return true;
return;
}
try {
await opts.onHotReload(plan, nextConfig);
currentApplyRejected = false;
return true;
} catch (err) {
currentApplyRejected = true;
opts.log.error(`config reload failed: ${String(err)}`);
return false;
}
await opts.onHotReload(plan, nextConfig);
};
const promoteAcceptedSnapshot = async (snapshot: ConfigFileSnapshot, reason: string) => {
@@ -381,26 +328,15 @@ export function startGatewayConfigReloader(opts: {
}
};
const promoteAcceptedInProcessWrite = async (
persistedHash: string,
acceptedCompareConfig: OpenClawConfig,
) => {
const promoteAcceptedInProcessWrite = async (persistedHash: string) => {
if (!opts.promoteSnapshot) {
return;
}
try {
const snapshotRead = unpackConfigReloadSnapshot(await opts.readSnapshot());
const snapshot = snapshotRead.snapshot;
if (
snapshot.hash !== persistedHash ||
!snapshot.valid ||
diffConfigPaths(acceptedCompareConfig, snapshot.sourceConfig).length > 0
) {
const snapshot = await opts.readSnapshot();
if (snapshot.hash !== persistedHash || !snapshot.valid) {
return;
}
if (snapshotRead.includeFilePaths) {
replaceWatchedPaths(snapshotRead.includeFilePaths);
}
await promoteAcceptedSnapshot(snapshot, "in-process-write");
} catch (err) {
opts.log.warn(`config reload in-process last-known-good promotion failed: ${String(err)}`);
@@ -425,31 +361,20 @@ export function startGatewayConfigReloader(opts: {
const pendingWrite = pendingInProcessConfig;
pendingInProcessConfig = null;
missingConfigRetries = 0;
const applied = await applySnapshot(
await applySnapshot(
pendingWrite.config,
pendingWrite.compareConfig,
pendingWrite.afterWrite,
);
if (!applied) {
if (lastAppliedWriteHash === pendingWrite.persistedHash) {
lastAppliedWriteHash = null;
}
return;
}
await promoteAcceptedInProcessWrite(pendingWrite.persistedHash, pendingWrite.compareConfig);
await promoteAcceptedInProcessWrite(pendingWrite.persistedHash);
return;
}
const bypassRootWriteHashDedupe = pendingIncludeReload;
pendingIncludeReload = false;
const snapshotRead = unpackConfigReloadSnapshot(await opts.readSnapshot());
const snapshot = snapshotRead.snapshot;
const snapshot = await opts.readSnapshot();
if (lastAppliedWriteHash && typeof snapshot.hash === "string") {
if (!bypassRootWriteHashDedupe && snapshot.hash === lastAppliedWriteHash) {
if (snapshot.hash === lastAppliedWriteHash) {
return;
}
if (snapshot.hash !== lastAppliedWriteHash) {
lastAppliedWriteHash = null;
}
lastAppliedWriteHash = null;
}
if (handleMissingSnapshot(snapshot)) {
return;
@@ -458,13 +383,7 @@ export function startGatewayConfigReloader(opts: {
handleInvalidSnapshot(snapshot);
return;
}
const applied = await applySnapshot(snapshot.config, snapshot.sourceConfig);
if (!applied) {
return;
}
if (snapshotRead.includeFilePaths) {
replaceWatchedPaths(snapshotRead.includeFilePaths);
}
await applySnapshot(snapshot.config, snapshot.sourceConfig);
await promoteAcceptedSnapshot(snapshot, "valid-config");
} catch (err) {
opts.log.error(`config reload failed: ${String(err)}`);
@@ -473,20 +392,11 @@ export function startGatewayConfigReloader(opts: {
if (pending) {
pending = false;
schedule();
} else if (pendingIncludeReload) {
scheduleAfter(0);
}
}
};
const normalizedRootWatchPath = nodePath.normalize(opts.watchPath);
const scheduleFromWatcher = (changedPath?: string) => {
if (
typeof changedPath === "string" &&
nodePath.normalize(changedPath) !== normalizedRootWatchPath
) {
pendingIncludeReload = true;
}
const scheduleFromWatcher = () => {
schedule();
};
@@ -505,254 +415,35 @@ export function startGatewayConfigReloader(opts: {
scheduleAfter(0);
}) ?? (() => {});
type ConfigWatcher = ReturnType<typeof chokidar.watch>;
type IncludeWatcherGroup = {
paths: string[];
watchers: ConfigWatcher[];
ready: Set<ConfigWatcher>;
usePolling: boolean;
};
const emptyIncludeGroup = (paths: string[] = []): IncludeWatcherGroup => ({
paths,
watchers: [],
ready: new Set(),
usePolling: false,
});
let watcher: ConfigWatcher | null = null;
let watcher: ReturnType<typeof chokidar.watch> | null = null;
let watcherRecreateRetries = 0;
let watcherRecreateTimer: ReturnType<typeof setTimeout> | null = null;
let rootHotReloadDisabled = false;
let hotReloadStatus: GatewayHotReloadStatus = "active";
let degradedToPolling = false;
let watcherUsesPolling = false;
const initialIncludePaths = normalizeIncludeWatcherPaths(
opts.watchPath,
opts.initialIncludeFilePaths,
);
let activeIncludeGroup = emptyIncludeGroup(initialIncludePaths);
let pendingIncludeGroup: IncludeWatcherGroup | null = null;
let desiredIncludePaths = initialIncludePaths;
let includeGeneration = 0;
let includeReplacementRetries = 0;
let includeReplacementTimer: ReturnType<typeof setTimeout> | null = null;
let includeHotReloadDisabled = false;
let includeDegradedToPolling = false;
const closeWatcher = (target: ConfigWatcher | null) => {
void target?.close().catch(() => {});
};
const closeIncludeGroup = (group: IncludeWatcherGroup | null) => {
for (const target of group?.watchers ?? []) {
closeWatcher(target);
}
};
const createWatcherInstance = (watchPath: string, usePolling: boolean): ConfigWatcher =>
chokidar.watch(watchPath, {
ignoreInitial: true,
awaitWriteFinish: { stabilityThreshold: 200, pollInterval: 50 },
usePolling,
});
const activateIncludeGroup = (group: IncludeWatcherGroup) => {
if (stopped || group !== pendingIncludeGroup) {
return;
}
const previous = activeIncludeGroup;
activeIncludeGroup = group;
pendingIncludeGroup = null;
includeReplacementRetries = 0;
includeHotReloadDisabled = false;
closeIncludeGroup(previous);
// Re-read once after the handoff so edits during candidate startup are
// reconciled without opening a gap between the old and new exact sets.
pendingIncludeReload = true;
schedule();
};
const scheduleIncludeReplacementRetry = (
generation: number,
failedWithPolling: boolean,
err: unknown,
) => {
if (stopped || generation !== includeGeneration) {
return;
}
if (includeReplacementRetries >= WATCHER_RECREATE_MAX_RETRIES) {
if (!failedWithPolling && resolveChokidarUsePolling(true)) {
includeDegradedToPolling = true;
includeReplacementRetries = 0;
opts.log.warn(
`config include watcher native retries exhausted; degrading to polling mode: ${String(err)}`,
);
includeReplacementTimer = setTimeout(() => {
includeReplacementTimer = null;
stageIncludeReplacement(generation);
}, WATCHER_RECREATE_BACKOFF_MS[0] ?? 500);
return;
}
const mode = failedWithPolling ? "polling mode" : "native mode";
includeHotReloadDisabled = true;
opts.log.error(
`config include hot-reload disabled: watcher failed after ${WATCHER_RECREATE_MAX_RETRIES} re-create attempts in ${mode}; keeping prior paths: ${String(err)}`,
);
return;
}
const backoff =
WATCHER_RECREATE_BACKOFF_MS[includeReplacementRetries] ??
WATCHER_RECREATE_BACKOFF_MS[WATCHER_RECREATE_BACKOFF_MS.length - 1] ??
0;
includeReplacementRetries += 1;
opts.log.warn(
`config include watcher error; retrying replacement (attempt ${includeReplacementRetries}/${WATCHER_RECREATE_MAX_RETRIES} in ${backoff}ms): ${String(err)}`,
);
includeReplacementTimer = setTimeout(() => {
includeReplacementTimer = null;
stageIncludeReplacement(generation);
}, backoff);
};
const createIncludeGroup = (paths: string[], generation: number): IncludeWatcherGroup => {
const usePolling = resolveChokidarUsePolling(includeDegradedToPolling);
const group: IncludeWatcherGroup = {
paths,
watchers: [],
ready: new Set(),
usePolling: false,
};
try {
for (const includePath of paths) {
const next = createWatcherInstance(includePath, usePolling);
group.watchers.push(next);
group.usePolling ||= Boolean(next.options.usePolling);
const scheduleIfActive = (changedPath: string) => {
if (group === activeIncludeGroup) {
scheduleFromWatcher(changedPath);
}
};
next.on("add", scheduleIfActive);
next.on("change", scheduleIfActive);
next.on("unlink", scheduleIfActive);
next.on("ready", () => {
if (stopped) {
return;
}
group.ready.add(next);
if (group.ready.size !== group.watchers.length) {
return;
}
if (group === pendingIncludeGroup) {
if (generation !== includeGeneration) {
return;
}
activateIncludeGroup(group);
} else if (group === activeIncludeGroup) {
pendingIncludeReload = true;
schedule();
}
});
next.on("error", (err) => {
if (stopped) {
return;
}
if (group === pendingIncludeGroup) {
if (generation !== includeGeneration) {
return;
}
pendingIncludeGroup = null;
closeIncludeGroup(group);
scheduleIncludeReplacementRetry(generation, group.usePolling, err);
return;
}
if (group === activeIncludeGroup) {
activeIncludeGroup = emptyIncludeGroup();
closeIncludeGroup(group);
if (!pendingIncludeGroup && !includeReplacementTimer) {
scheduleIncludeReplacementRetry(includeGeneration, group.usePolling, err);
}
}
});
}
return group;
} catch (err) {
closeIncludeGroup(group);
throw err;
}
};
function stageIncludeReplacement(generation: number) {
if (
stopped ||
generation !== includeGeneration ||
pendingIncludeGroup ||
watcherPathsEqual(desiredIncludePaths, activeIncludeGroup.paths)
) {
return;
}
if (desiredIncludePaths.length === 0) {
pendingIncludeGroup = emptyIncludeGroup();
activateIncludeGroup(pendingIncludeGroup);
return;
}
try {
pendingIncludeGroup = createIncludeGroup([...desiredIncludePaths], generation);
} catch (err) {
scheduleIncludeReplacementRetry(
generation,
resolveChokidarUsePolling(includeDegradedToPolling),
err,
);
}
}
const replaceWatchedPaths = (includeFilePaths: readonly string[]) => {
const nextPaths = normalizeIncludeWatcherPaths(opts.watchPath, includeFilePaths);
if (watcherPathsEqual(nextPaths, desiredIncludePaths)) {
return;
}
includeGeneration += 1;
desiredIncludePaths = nextPaths;
includeReplacementRetries = 0;
if (includeReplacementTimer) {
clearTimeout(includeReplacementTimer);
includeReplacementTimer = null;
}
const stagedGroup = pendingIncludeGroup;
pendingIncludeGroup = null;
closeIncludeGroup(stagedGroup);
if (watcherPathsEqual(nextPaths, activeIncludeGroup.paths)) {
includeHotReloadDisabled = false;
return;
}
stageIncludeReplacement(includeGeneration);
};
const createWatcher = () => {
if (stopped) {
return;
}
const next = createWatcherInstance(
opts.watchPath,
resolveChokidarUsePolling(degradedToPolling),
);
const usePolling = resolveChokidarUsePolling(degradedToPolling);
const next = chokidar.watch(opts.watchPath, {
ignoreInitial: true,
awaitWriteFinish: { stabilityThreshold: 200, pollInterval: 50 },
usePolling,
});
next.on("add", scheduleFromWatcher);
next.on("change", scheduleFromWatcher);
next.on("unlink", scheduleFromWatcher);
next.on("error", (err) => {
handleWatcherError(next, err);
});
watcher = next;
watcherUsesPolling = Boolean(next.options.usePolling);
rootHotReloadDisabled = false;
const scheduleIfActive = (changedPath: string) => {
if (next === watcher) {
scheduleFromWatcher(changedPath);
}
};
next.on("add", scheduleIfActive);
next.on("change", scheduleIfActive);
next.on("unlink", scheduleIfActive);
next.on("error", (err) => handleWatcherError(next, err));
watcherUsesPolling = next.options.usePolling;
hotReloadStatus = "active";
};
const handleWatcherError = (source: ConfigWatcher, err: unknown) => {
const handleWatcherError = (source: typeof watcher, err: unknown) => {
// Ignore stale errors from a watcher we already replaced or stopped.
if (stopped || source !== watcher) {
return;
@@ -760,7 +451,7 @@ export function startGatewayConfigReloader(opts: {
const failedWatcherUsedPolling = watcherUsesPolling;
watcher = null;
watcherUsesPolling = false;
closeWatcher(source);
void source?.close().catch(() => {});
if (watcherRecreateRetries >= WATCHER_RECREATE_MAX_RETRIES) {
// All native (inotify/kqueue) retries exhausted — fall back to polling
// mode so config hot-reload survives on hosts where inotify resources
@@ -778,7 +469,7 @@ export function startGatewayConfigReloader(opts: {
return;
}
const mode = failedWatcherUsedPolling ? "polling mode" : "native mode";
rootHotReloadDisabled = true;
hotReloadStatus = "disabled";
opts.log.error(
`config hot-reload disabled: watcher failed after ${WATCHER_RECREATE_MAX_RETRIES} re-create attempts in ${mode}: ${String(err)}`,
);
@@ -799,18 +490,6 @@ export function startGatewayConfigReloader(opts: {
};
createWatcher();
if (initialIncludePaths.length > 0) {
try {
activeIncludeGroup = createIncludeGroup(initialIncludePaths, includeGeneration);
} catch (err) {
activeIncludeGroup = emptyIncludeGroup();
scheduleIncludeReplacementRetry(
includeGeneration,
resolveChokidarUsePolling(includeDegradedToPolling),
err,
);
}
}
return {
stop: async () => {
@@ -823,26 +502,11 @@ export function startGatewayConfigReloader(opts: {
clearTimeout(watcherRecreateTimer);
watcherRecreateTimer = null;
}
if (includeReplacementTimer) {
clearTimeout(includeReplacementTimer);
includeReplacementTimer = null;
}
unsubscribeFromWrites();
const rootWatcher = watcher;
const activeIncludes = activeIncludeGroup;
const stagedIncludes = pendingIncludeGroup;
const active = watcher;
watcher = null;
activeIncludeGroup = emptyIncludeGroup();
pendingIncludeGroup = null;
await Promise.all(
[
...(rootWatcher ? [rootWatcher] : []),
...activeIncludes.watchers,
...(stagedIncludes?.watchers ?? []),
].map(async (target) => await target.close().catch(() => {})),
);
await active?.close().catch(() => {});
},
hotReloadStatus: () =>
rootHotReloadDisabled || includeHotReloadDisabled ? "disabled" : "active",
hotReloadStatus: () => hotReloadStatus,
};
}

View File

@@ -193,9 +193,8 @@ type ManagedGatewayConfigReloaderParams = Omit<
initialConfig: OpenClawConfig;
initialCompareConfig?: OpenClawConfig;
initialInternalWriteHash: string | null;
initialIncludeFilePaths?: readonly string[];
watchPath: string;
readSnapshot: typeof import("../config/config.js").readConfigFileSnapshotWithPluginMetadata;
readSnapshot: typeof import("../config/config.js").readConfigFileSnapshot;
promoteSnapshot: typeof import("../config/config.js").promoteConfigSnapshotToLastKnownGood;
subscribeToWrites: typeof import("../config/config.js").registerConfigWriteListener;
logReload: GatewayReloadLog & {
@@ -682,7 +681,6 @@ export function startManagedGatewayConfigReloader(params: ManagedGatewayConfigRe
initialConfig: params.initialConfig,
initialCompareConfig: params.initialCompareConfig,
initialInternalWriteHash: params.initialInternalWriteHash,
initialIncludeFilePaths: params.initialIncludeFilePaths,
readSnapshot: params.readSnapshot,
promoteSnapshot: async (snapshot, _reason) => await params.promoteSnapshot(snapshot),
subscribeToWrites: params.subscribeToWrites,

View File

@@ -99,7 +99,6 @@ function secretsPrepareTimelineAttributes(
export type GatewayStartupConfigSnapshotLoadResult = {
snapshot: ConfigFileSnapshot;
wroteConfig: boolean;
includeFilePaths?: readonly string[];
pluginMetadataSnapshot?: PluginMetadataSnapshot;
};
@@ -144,7 +143,6 @@ export async function loadGatewayStartupConfigSnapshot(params: {
return {
snapshot: configSnapshot,
wroteConfig,
...(snapshotRead.includeFilePaths ? { includeFilePaths: snapshotRead.includeFilePaths } : {}),
...(pluginMetadataSnapshot ? { pluginMetadataSnapshot } : {}),
};
}
@@ -155,7 +153,6 @@ export async function loadGatewayStartupConfigSnapshot(params: {
return {
snapshot: withRuntimeConfig(configSnapshot, autoEnable.config),
wroteConfig,
...(snapshotRead.includeFilePaths ? { includeFilePaths: snapshotRead.includeFilePaths } : {}),
...(pluginMetadataSnapshot ? { pluginMetadataSnapshot } : {}),
};
}

View File

@@ -17,7 +17,7 @@ import { isRestartEnabled } from "../config/commands.flags.js";
import {
getRuntimeConfig,
promoteConfigSnapshotToLastKnownGood,
readConfigFileSnapshotWithPluginMetadata,
readConfigFileSnapshot,
registerConfigWriteListener,
setRuntimeConfigSnapshot,
type ReadConfigFileSnapshotWithPluginMetadataResult,
@@ -638,7 +638,6 @@ export async function startGatewayServer(
let cfgAtStart: OpenClawConfig;
let startupInternalWriteHash: string | null = null;
let startupLastGoodSnapshot = configSnapshot;
let startupIncludeFilePaths = startupConfigLoad.includeFilePaths;
const startupActivationSourceConfig = configSnapshot.sourceConfig;
const startupRuntimeConfig = applyConfigOverrides(configSnapshot.config);
startupTrace.setConfig(startupRuntimeConfig);
@@ -695,15 +694,11 @@ export async function startGatewayServer(
// Keep the old startup-write suppression path intact for compatibility with
// callers that may still report a write, but startup itself no longer mutates config.
if (startupConfigLoad.wroteConfig || authBootstrap.persistedGeneratedToken) {
const startupSnapshotRead = await startupTrace.measure("config.final-snapshot", () =>
readConfigFileSnapshotWithPluginMetadata(),
const startupSnapshot = await startupTrace.measure("config.final-snapshot", () =>
readConfigFileSnapshot(),
);
const startupSnapshot = startupSnapshotRead.snapshot;
startupInternalWriteHash = startupSnapshot.hash ?? null;
startupLastGoodSnapshot = startupSnapshot;
if (startupSnapshotRead.includeFilePaths) {
startupIncludeFilePaths = startupSnapshotRead.includeFilePaths;
}
}
setRuntimeConfigSnapshot(cfgAtStart, startupLastGoodSnapshot.sourceConfig);
const { prepareGatewayPluginBootstrap } = await loadStartupPluginsModule();
@@ -1732,9 +1727,8 @@ export async function startGatewayServer(
initialConfig: cfgAtStart,
initialCompareConfig: startupLastGoodSnapshot.sourceConfig,
initialInternalWriteHash: startupInternalWriteHash,
initialIncludeFilePaths: startupIncludeFilePaths,
watchPath: configSnapshot.path,
readSnapshot: readConfigFileSnapshotWithPluginMetadata,
readSnapshot: readConfigFileSnapshot,
promoteSnapshot: promoteConfigSnapshotToLastKnownGood,
subscribeToWrites: registerConfigWriteListener,
deps,