fix(plugins): snapshot session scheduler jobs

This commit is contained in:
Vincent Koc
2026-06-05 14:09:23 +02:00
parent d13be933fd
commit 59dfd8aa68
3 changed files with 240 additions and 12 deletions

View File

@@ -0,0 +1,121 @@
// Session scheduler registration tests cover plugin-owned job snapshotting.
import {
createPluginRegistryFixture,
registerTestPlugin,
} from "openclaw/plugin-sdk/plugin-test-contracts";
import { afterEach, describe, expect, it } from "vitest";
import { runPluginHostCleanup } from "../host-hook-cleanup.js";
import {
cleanupPluginSessionSchedulerJobs,
clearPluginHostRuntimeState,
listPluginSessionSchedulerJobs,
registerPluginSessionSchedulerJob,
} from "../host-hook-runtime.js";
import type { PluginSessionSchedulerJobRegistration } from "../host-hooks.js";
import { createEmptyPluginRegistry } from "../registry-empty.js";
import { setActivePluginRegistry } from "../runtime.js";
import { createPluginRecord } from "../status.test-helpers.js";
describe("plugin session scheduler registration", () => {
afterEach(() => {
setActivePluginRegistry(createEmptyPluginRegistry());
clearPluginHostRuntimeState();
});
it("snapshots scheduler job callbacks before host cleanup", async () => {
let idReads = 0;
let cleanupReads = 0;
const cleanupEvents: string[] = [];
const { config, registry } = createPluginRegistryFixture();
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "volatile-scheduler",
name: "Volatile Scheduler",
}),
register(api) {
api.registerSessionSchedulerJob({
get id() {
idReads += 1;
if (idReads > 1) {
throw new Error("job id getter re-read");
}
return "job-cleanup";
},
sessionKey: "agent:main:main",
kind: "session-turn",
description: "Cleanup job",
get cleanup() {
cleanupReads += 1;
if (cleanupReads > 1) {
throw new Error("cleanup getter re-read");
}
return ({ reason }) => {
cleanupEvents.push(reason);
};
},
} as PluginSessionSchedulerJobRegistration);
},
});
setActivePluginRegistry(registry.registry);
expect(registry.registry.sessionSchedulerJobs?.[0]?.job.description).toBe("Cleanup job");
expect(idReads).toBe(1);
expect(cleanupReads).toBe(1);
await expect(
runPluginHostCleanup({
registry: registry.registry,
pluginId: "volatile-scheduler",
reason: "disable",
sessionStorePaths: [],
}),
).resolves.toEqual({ cleanupCount: 0, failures: [] });
expect(cleanupEvents).toEqual(["disable"]);
expect(idReads).toBe(1);
expect(cleanupReads).toBe(1);
});
it("snapshots runtime scheduler jobs before storing cleanup state", async () => {
let idReads = 0;
const cleanupEvents: string[] = [];
expect(
registerPluginSessionSchedulerJob({
pluginId: "runtime-scheduler",
pluginName: "Runtime Scheduler",
job: {
get id() {
idReads += 1;
if (idReads > 1) {
throw new Error("runtime job id getter re-read");
}
return "runtime-job";
},
sessionKey: "agent:main:main",
kind: "session-turn",
cleanup({ reason }) {
cleanupEvents.push(reason);
},
} as PluginSessionSchedulerJobRegistration,
}),
).toEqual({
id: "runtime-job",
pluginId: "runtime-scheduler",
sessionKey: "agent:main:main",
kind: "session-turn",
});
expect(idReads).toBe(1);
await expect(
cleanupPluginSessionSchedulerJobs({
pluginId: "runtime-scheduler",
reason: "disable",
}),
).resolves.toEqual([]);
expect(cleanupEvents).toEqual(["disable"]);
expect(listPluginSessionSchedulerJobs("runtime-scheduler")).toEqual([]);
expect(idReads).toBe(1);
});
});

View File

@@ -59,6 +59,28 @@ function normalizeNamespace(value: string | undefined): string {
return (value ?? "").trim();
}
function readSessionSchedulerJobFields(job: PluginSessionSchedulerJobRegistration):
| {
id: unknown;
sessionKey: unknown;
kind: unknown;
description: unknown;
cleanup: unknown;
}
| undefined {
try {
return {
id: job.id,
sessionKey: job.sessionKey,
kind: job.kind,
description: job.description,
cleanup: job.cleanup,
};
} catch {
return undefined;
}
}
function copyJsonValue(value: PluginJsonValue): PluginJsonValue {
return structuredClone(value);
}
@@ -367,10 +389,17 @@ export function registerPluginSessionSchedulerJob(params: {
job: PluginSessionSchedulerJobRegistration;
ownerRegistry?: PluginRegistry;
}): PluginSessionSchedulerJobHandle | undefined {
const id = normalizeOptionalString(params.job.id);
const sessionKey = normalizeOptionalString(params.job.sessionKey);
const kind = normalizeOptionalString(params.job.kind);
if (!id || !sessionKey || !kind) {
const fields = readSessionSchedulerJobFields(params.job);
const id = normalizeOptionalString(typeof fields?.id === "string" ? fields.id : undefined);
const sessionKey = normalizeOptionalString(
typeof fields?.sessionKey === "string" ? fields.sessionKey : undefined,
);
const kind = normalizeOptionalString(typeof fields?.kind === "string" ? fields.kind : undefined);
const cleanup = fields?.cleanup;
if (!fields || !id || !sessionKey || !kind) {
return undefined;
}
if (cleanup !== undefined && typeof cleanup !== "function") {
return undefined;
}
const state = getPluginHostRuntimeState();
@@ -379,7 +408,15 @@ export function registerPluginSessionSchedulerJob(params: {
jobs.set(id, {
pluginId: params.pluginId,
pluginName: params.pluginName,
job: { ...params.job, id, sessionKey, kind },
job: {
id,
sessionKey,
kind,
...(fields.description !== undefined ? { description: fields.description as string } : {}),
...(cleanup !== undefined
? { cleanup: cleanup as PluginSessionSchedulerJobRegistration["cleanup"] }
: {}),
},
generation,
...(params.ownerRegistry ? { ownerRegistry: params.ownerRegistry } : {}),
});

View File

@@ -2562,13 +2562,70 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
});
};
const readSessionSchedulerJobFields = (
record: PluginRecord,
job: PluginSessionSchedulerJobRegistration,
):
| {
id: unknown;
sessionKey: unknown;
kind: unknown;
description: unknown;
cleanup: unknown;
}
| undefined => {
let id: unknown;
try {
id = job.id;
return {
id,
sessionKey: job.sessionKey,
kind: job.kind,
description: job.description,
cleanup: job.cleanup,
};
} catch (error) {
const normalizedId = normalizeOptionalHostHookString(id);
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message:
`session scheduler job registration has unreadable fields` +
`${normalizedId ? `: ${normalizedId}` : ""}: ${formatErrorMessage(error)}`,
});
return undefined;
}
};
const createSessionSchedulerJobSnapshot = (params: {
id: string;
sessionKey: string;
kind: string;
description: unknown;
cleanup: unknown;
}): PluginSessionSchedulerJobRegistration => ({
id: params.id,
sessionKey: params.sessionKey,
kind: params.kind,
...(params.description !== undefined ? { description: params.description as string } : {}),
...(params.cleanup !== undefined
? { cleanup: params.cleanup as PluginSessionSchedulerJobRegistration["cleanup"] }
: {}),
});
const registerSessionSchedulerJob = (
record: PluginRecord,
job: PluginSessionSchedulerJobRegistration,
) => {
const jobId = normalizeHostHookString(job.id);
const sessionKey = normalizeHostHookString(job.sessionKey);
const kind = normalizeHostHookString(job.kind);
const fields = readSessionSchedulerJobFields(record, job);
if (!fields) {
return undefined;
}
const jobId = normalizeHostHookString(fields.id);
const sessionKey = normalizeHostHookString(fields.sessionKey);
const kind = normalizeHostHookString(fields.kind);
const cleanup = fields.cleanup;
if (
jobId &&
(registry.sessionSchedulerJobs ?? []).some(
@@ -2592,7 +2649,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
});
return undefined;
}
if (job.cleanup !== undefined && typeof job.cleanup !== "function") {
if (cleanup !== undefined && typeof cleanup !== "function") {
pushDiagnostic({
level: "error",
pluginId: record.id,
@@ -2601,11 +2658,18 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
});
return undefined;
}
const normalizedJob = createSessionSchedulerJobSnapshot({
id: jobId,
sessionKey,
kind,
description: fields.description,
cleanup,
});
if (registryParams.activateGlobalSideEffects === false) {
(registry.sessionSchedulerJobs ??= []).push({
pluginId: record.id,
pluginName: record.name,
job: { ...job, id: jobId, sessionKey, kind },
job: normalizedJob,
source: record.source,
rootDir: record.rootDir,
});
@@ -2615,7 +2679,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
pluginId: record.id,
pluginName: record.name,
ownerRegistry: registry,
job: { ...job, id: jobId, sessionKey, kind },
job: normalizedJob,
});
if (!handle) {
pushDiagnostic({
@@ -2629,7 +2693,13 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
(registry.sessionSchedulerJobs ??= []).push({
pluginId: record.id,
pluginName: record.name,
job: { ...job, id: handle.id, sessionKey: handle.sessionKey, kind: handle.kind },
job: createSessionSchedulerJobSnapshot({
id: handle.id,
sessionKey: handle.sessionKey,
kind: handle.kind,
description: fields.description,
cleanup,
}),
generation: getPluginSessionSchedulerJobGeneration({
pluginId: record.id,
jobId: handle.id,