fix(cron): auto-migrate legacy cron store (#90208)

Merged via squash.

Prepared head SHA: f5aa1b6759
Co-authored-by: MonkeyLeeT <6754057+MonkeyLeeT@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Ted Li
2026-06-05 10:22:02 -07:00
committed by GitHub
parent 4752e9a67d
commit 21aa297434
3 changed files with 237 additions and 118 deletions

View File

@@ -10,6 +10,9 @@ const autoMigrateLegacyState = vi.hoisted(() =>
const autoMigrateLegacyTaskStateSidecars = vi.hoisted(() =>
vi.fn(async () => ({ migrated: true, skipped: false, changes: ["task-imported"], warnings: [] })),
);
const repairLegacyCronStoreWithoutPrompt = vi.hoisted(() =>
vi.fn(async () => ({ changes: ["cron-imported"], warnings: [] })),
);
const readConfigFileSnapshot = vi.hoisted(() =>
vi.fn(async () => ({
exists: true,
@@ -29,6 +32,10 @@ vi.mock("./doctor-state-migrations.js", () => ({
autoMigrateLegacyTaskStateSidecars,
}));
vi.mock("./doctor/cron/index.js", () => ({
repairLegacyCronStoreWithoutPrompt,
}));
vi.mock("../config/io.js", () => ({
readConfigFileSnapshot,
recoverConfigFromJsonRootSuffix: vi.fn(),
@@ -48,11 +55,15 @@ describe("runDoctorConfigPreflight state migration", () => {
expect(autoMigrateLegacyStateDir).toHaveBeenCalledOnce();
expect(readConfigFileSnapshot).toHaveBeenCalledOnce();
expect(repairLegacyCronStoreWithoutPrompt).toHaveBeenCalledWith({
cfg: { gateway: { mode: "local", port: 19091 } },
});
expect(autoMigrateLegacyState).toHaveBeenCalledWith({
cfg: { gateway: { mode: "local", port: 19091 } },
env: process.env,
recoverCorruptTargetStore: undefined,
});
expect(note).toHaveBeenCalledWith("- cron-imported", "Doctor changes");
expect(note).toHaveBeenCalledWith("- imported", "Doctor changes");
});
@@ -90,6 +101,7 @@ describe("runDoctorConfigPreflight state migration", () => {
});
expect(autoMigrateLegacyState).not.toHaveBeenCalled();
expect(repairLegacyCronStoreWithoutPrompt).not.toHaveBeenCalled();
expect(autoMigrateLegacyTaskStateSidecars).toHaveBeenCalledWith({ env: process.env });
expect(note).toHaveBeenCalledWith("- task-imported", "Doctor changes");
});

View File

@@ -16,14 +16,21 @@ import { noteIncludeConfinementWarning } from "./doctor-config-analysis.js";
import { findDoctorLegacyConfigIssues } from "./doctor/shared/legacy-config-issues.js";
type DoctorStateMigrationsModule = typeof import("./doctor-state-migrations.js");
type DoctorCronModule = typeof import("./doctor/cron/index.js");
let doctorStateMigrationsPromise: Promise<DoctorStateMigrationsModule> | null = null;
let doctorCronPromise: Promise<DoctorCronModule> | null = null;
function loadDoctorStateMigrations(): Promise<DoctorStateMigrationsModule> {
doctorStateMigrationsPromise ??= import("./doctor-state-migrations.js");
return doctorStateMigrationsPromise;
}
function loadDoctorCron(): Promise<DoctorCronModule> {
doctorCronPromise ??= import("./doctor/cron/index.js");
return doctorCronPromise;
}
async function maybeMigrateLegacyConfig(): Promise<string[]> {
const changes: string[] = [];
const home = resolveHomeDir();
@@ -174,6 +181,11 @@ export async function runDoctorConfigPreflight(
const baseConfig = snapshot.sourceConfig ?? snapshot.config ?? {};
if (options.migrateState !== false) {
if (snapshot.valid) {
const { repairLegacyCronStoreWithoutPrompt } = await loadDoctorCron();
const cronResult = await repairLegacyCronStoreWithoutPrompt({ cfg: baseConfig });
noteStateMigrationResult(cronResult);
}
const { autoMigrateLegacyState, autoMigrateLegacyTaskStateSidecars } =
await loadDoctorStateMigrations();
const stateResult = snapshot.valid

View File

@@ -52,22 +52,37 @@ function formatRunLogMigrationNote(importedFiles: number): string {
: "";
}
/** Inspect cron storage and optionally repair legacy JSON/SQLite/payload shapes. */
export async function maybeRepairLegacyCronStore(params: {
function errorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
type LegacyCronRepairState = {
storePath: string;
quarantinePath: string;
legacyStoreDetected: boolean;
legacyRunLogDetected: boolean;
legacyImportCount: number;
sqliteProjectionBackfillCount: number;
rawJobs: Array<Record<string, unknown>>;
};
export type LegacyCronRepairResult = {
changes: string[];
warnings: string[];
};
async function loadLegacyCronRepairState(params: {
cfg: OpenClawConfig;
options: DoctorOptions;
prompter: Pick<DoctorPrompter, "confirm">;
}) {
onlyIfLegacyDetected?: boolean;
}): Promise<LegacyCronRepairState | null> {
const storePath = resolveCronJobsStorePath(params.cfg.cron?.store);
const quarantinePath = resolveCronQuarantinePath(storePath);
let store: Awaited<ReturnType<typeof loadCronJobsStoreWithConfigJobs>>["store"];
let legacyStoreDetected;
let legacyRunLogDetected;
let legacyImportCount = 0;
let sqliteProjectionBackfillCount;
try {
legacyStoreDetected = await legacyCronStoreFilesExist(storePath);
legacyRunLogDetected = await legacyCronRunLogFilesExist(storePath);
const legacyStoreDetected = await legacyCronStoreFilesExist(storePath);
const legacyRunLogDetected = await legacyCronRunLogFilesExist(storePath);
if (params.onlyIfLegacyDetected && !legacyStoreDetected && !legacyRunLogDetected) {
return null;
}
const loaded = await loadCronJobsStoreWithConfigJobs(storePath);
const currentJobs =
loaded.configJobs.length > 0
@@ -78,7 +93,7 @@ export async function maybeRepairLegacyCronStore(params: {
}),
)
: (loaded.store.jobs as unknown as Array<Record<string, unknown>>);
sqliteProjectionBackfillCount =
const sqliteProjectionBackfillCount =
loaded.configJobs.length > 0
? currentJobs.filter((job, index) =>
needsSqliteProjectionBackfill({
@@ -87,18 +102,162 @@ export async function maybeRepairLegacyCronStore(params: {
}),
).length
: 0;
store = { version: 1, jobs: currentJobs as unknown as CronJob[] };
let rawJobs = currentJobs;
let legacyImportCount = 0;
if (legacyStoreDetected) {
const legacyStore = (await loadLegacyCronStoreForMigration(storePath)).store;
const merged = mergeLegacyCronJobs({
currentJobs: store.jobs as unknown as Array<Record<string, unknown>>,
currentJobs: rawJobs,
legacyJobs: legacyStore.jobs as unknown as Array<Record<string, unknown>>,
});
rawJobs = merged.jobs;
legacyImportCount = merged.importedCount;
store = { version: 1, jobs: merged.jobs as unknown as CronJob[] };
}
return {
storePath,
quarantinePath,
legacyStoreDetected,
legacyRunLogDetected,
legacyImportCount,
sqliteProjectionBackfillCount,
rawJobs,
};
}
async function applyLegacyCronStoreRepair(params: {
cfg: OpenClawConfig;
state: LegacyCronRepairState;
normalized?: ReturnType<typeof normalizeStoredCronJobs>;
}): Promise<LegacyCronRepairResult> {
const { state } = params;
const changes: string[] = [];
const warnings: string[] = [];
const normalized = params.normalized ?? normalizeStoredCronJobs(state.rawJobs);
const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook);
const notifyMigration = migrateLegacyNotifyFallback({
jobs: state.rawJobs,
legacyWebhook,
});
const dreamingMigration = migrateLegacyDreamingPayloadShape(state.rawJobs);
warnings.push(...notifyMigration.warnings);
const changed =
state.legacyStoreDetected ||
state.legacyRunLogDetected ||
state.sqliteProjectionBackfillCount > 0 ||
normalized.mutated ||
notifyMigration.changed ||
dreamingMigration.changed;
if (!changed && warnings.length === 0) {
return { changes, warnings };
}
if (changed) {
try {
if (normalized.removedJobs.length > 0) {
await saveCronQuarantineFile({
storePath: state.storePath,
nowMs: Date.now(),
entries: normalized.removedJobs.map((entry) => ({
sourceIndex: entry.sourceIndex,
reason: entry.reason,
job: entry.job,
})),
});
}
await saveCronJobsStore(state.storePath, {
version: 1,
jobs: state.rawJobs as unknown as CronJob[],
});
} catch (err) {
return {
changes,
warnings: [
...warnings,
`Failed writing migrated cron store at ${shortenHomePath(state.storePath)}: ${errorMessage(err)}`,
],
};
}
}
let importedRunLogs = 0;
if (state.legacyRunLogDetected) {
try {
importedRunLogs = (await migrateLegacyCronRunLogsToSqlite(state.storePath)).importedFiles;
} catch (err) {
warnings.push(
`Failed importing legacy cron run logs at ${shortenHomePath(state.storePath)}: ${errorMessage(err)}`,
);
}
}
if (state.legacyStoreDetected) {
await archiveLegacyCronStoreForMigration(state.storePath);
changes.push(
`Cron store migrated to SQLite at ${shortenHomePath(state.storePath)}.${formatRunLogMigrationNote(importedRunLogs)}`,
);
} else if (state.legacyRunLogDetected && importedRunLogs > 0) {
changes.push(
`Cron run logs migrated to SQLite at ${shortenHomePath(state.storePath)}.${formatRunLogMigrationNote(importedRunLogs)}`,
);
} else if (changed) {
changes.push(`Cron store normalized at ${shortenHomePath(state.storePath)}.`);
}
if (dreamingMigration.rewrittenCount > 0) {
changes.push(
`Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`,
);
}
return { changes, warnings };
}
export async function repairLegacyCronStoreWithoutPrompt(params: {
cfg: OpenClawConfig;
}): Promise<LegacyCronRepairResult> {
const storePath = resolveCronJobsStorePath(params.cfg.cron?.store);
let state: LegacyCronRepairState | null;
try {
state = await loadLegacyCronRepairState({
cfg: params.cfg,
onlyIfLegacyDetected: true,
});
} catch (err) {
return {
changes: [],
warnings: [
`Failed reading legacy cron storage at ${shortenHomePath(storePath)}: ${errorMessage(err)}`,
],
};
}
if (!state) {
return { changes: [], warnings: [] };
}
return await applyLegacyCronStoreRepair({ cfg: params.cfg, state });
}
function noteLegacyCronRepairResult(result: LegacyCronRepairResult): void {
if (result.changes.length > 0) {
note(result.changes.join("\n"), "Doctor changes");
}
if (result.warnings.length > 0) {
note(result.warnings.join("\n"), "Doctor warnings");
}
}
/** Inspect cron storage and optionally repair legacy JSON/SQLite/payload shapes. */
export async function maybeRepairLegacyCronStore(params: {
cfg: OpenClawConfig;
options: DoctorOptions;
prompter: Pick<DoctorPrompter, "confirm">;
}) {
let state: LegacyCronRepairState | null;
try {
state = await loadLegacyCronRepairState({ cfg: params.cfg });
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const storePath = resolveCronJobsStorePath(params.cfg.cron?.store);
note(
[
`Unable to read cron job store at ${shortenHomePath(storePath)}.`,
@@ -109,6 +268,18 @@ export async function maybeRepairLegacyCronStore(params: {
);
return;
}
if (!state) {
return;
}
const {
storePath,
quarantinePath,
legacyStoreDetected,
legacyRunLogDetected,
legacyImportCount,
sqliteProjectionBackfillCount,
rawJobs,
} = state;
try {
const quarantine = await loadCronQuarantineFile(quarantinePath);
if (quarantine.jobs.length > 0) {
@@ -131,7 +302,6 @@ export async function maybeRepairLegacyCronStore(params: {
"Cron",
);
}
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
if (rawJobs.length === 0) {
if (!legacyStoreDetected && !legacyRunLogDetected) {
return;
@@ -158,30 +328,12 @@ export async function maybeRepairLegacyCronStore(params: {
if (!shouldRepair) {
return;
}
if (legacyStoreDetected) {
await saveCronJobsStore(storePath, { version: 1, jobs: [] });
await archiveLegacyCronStoreForMigration(storePath);
}
const runLogMigration = legacyRunLogDetected
? await migrateLegacyCronRunLogsToSqlite(storePath)
: { importedFiles: 0 };
if (legacyStoreDetected) {
note(
`Cron store migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`,
"Doctor changes",
);
} else {
note(
`Cron run logs migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`,
"Doctor changes",
);
}
noteLegacyCronRepairResult(await applyLegacyCronStoreRepair({ cfg: params.cfg, state }));
return;
}
noteCronModelOverrides({ cfg: params.cfg, jobs: rawJobs, storePath });
const normalized = normalizeStoredCronJobs(rawJobs);
const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook);
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
const dreamingStaleCount = countStaleDreamingJobs(rawJobs);
const previewLines = formatLegacyIssuePreview(normalized.issues);
@@ -231,64 +383,7 @@ export async function maybeRepairLegacyCronStore(params: {
return;
}
const notifyMigration = migrateLegacyNotifyFallback({
jobs: rawJobs,
legacyWebhook,
});
const dreamingMigration = migrateLegacyDreamingPayloadShape(rawJobs);
const changed =
legacyStoreDetected ||
legacyRunLogDetected ||
sqliteProjectionBackfillCount > 0 ||
normalized.mutated ||
notifyMigration.changed ||
dreamingMigration.changed;
if (!changed && notifyMigration.warnings.length === 0) {
return;
}
if (changed) {
if (normalized.removedJobs.length > 0) {
await saveCronQuarantineFile({
storePath,
nowMs: Date.now(),
entries: normalized.removedJobs.map((entry) => ({
sourceIndex: entry.sourceIndex,
reason: entry.reason,
job: entry.job,
})),
});
}
await saveCronJobsStore(storePath, {
version: 1,
jobs: rawJobs as unknown as CronJob[],
});
const runLogMigration = legacyRunLogDetected
? await migrateLegacyCronRunLogsToSqlite(storePath)
: { importedFiles: 0 };
if (legacyStoreDetected) {
await archiveLegacyCronStoreForMigration(storePath);
note(
`Cron store migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`,
"Doctor changes",
);
} else if (legacyRunLogDetected) {
note(
`Cron run logs migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`,
"Doctor changes",
);
} else {
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
}
if (dreamingMigration.rewrittenCount > 0) {
note(
`Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`,
"Doctor changes",
noteLegacyCronRepairResult(
await applyLegacyCronStoreRepair({ cfg: params.cfg, state, normalized }),
);
}
}
if (notifyMigration.warnings.length > 0) {
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
}
}