diff --git a/src/commands/doctor-config-preflight.state-migration.test.ts b/src/commands/doctor-config-preflight.state-migration.test.ts index 41572d77b089..c662a7bf0926 100644 --- a/src/commands/doctor-config-preflight.state-migration.test.ts +++ b/src/commands/doctor-config-preflight.state-migration.test.ts @@ -10,6 +10,9 @@ const autoMigrateLegacyState = vi.hoisted(() => const autoMigrateLegacyTaskStateSidecars = vi.hoisted(() => vi.fn(async () => ({ migrated: true, skipped: false, changes: ["task-imported"], warnings: [] })), ); +const repairLegacyCronStoreWithoutPrompt = vi.hoisted(() => + vi.fn(async () => ({ changes: ["cron-imported"], warnings: [] })), +); const readConfigFileSnapshot = vi.hoisted(() => vi.fn(async () => ({ exists: true, @@ -29,6 +32,10 @@ vi.mock("./doctor-state-migrations.js", () => ({ autoMigrateLegacyTaskStateSidecars, })); +vi.mock("./doctor/cron/index.js", () => ({ + repairLegacyCronStoreWithoutPrompt, +})); + vi.mock("../config/io.js", () => ({ readConfigFileSnapshot, recoverConfigFromJsonRootSuffix: vi.fn(), @@ -48,11 +55,15 @@ describe("runDoctorConfigPreflight state migration", () => { expect(autoMigrateLegacyStateDir).toHaveBeenCalledOnce(); expect(readConfigFileSnapshot).toHaveBeenCalledOnce(); + expect(repairLegacyCronStoreWithoutPrompt).toHaveBeenCalledWith({ + cfg: { gateway: { mode: "local", port: 19091 } }, + }); expect(autoMigrateLegacyState).toHaveBeenCalledWith({ cfg: { gateway: { mode: "local", port: 19091 } }, env: process.env, recoverCorruptTargetStore: undefined, }); + expect(note).toHaveBeenCalledWith("- cron-imported", "Doctor changes"); expect(note).toHaveBeenCalledWith("- imported", "Doctor changes"); }); @@ -90,6 +101,7 @@ describe("runDoctorConfigPreflight state migration", () => { }); expect(autoMigrateLegacyState).not.toHaveBeenCalled(); + expect(repairLegacyCronStoreWithoutPrompt).not.toHaveBeenCalled(); expect(autoMigrateLegacyTaskStateSidecars).toHaveBeenCalledWith({ env: process.env }); expect(note).toHaveBeenCalledWith("- task-imported", "Doctor changes"); }); diff --git a/src/commands/doctor-config-preflight.ts b/src/commands/doctor-config-preflight.ts index b3861c4f7339..6ef0b85522c9 100644 --- a/src/commands/doctor-config-preflight.ts +++ b/src/commands/doctor-config-preflight.ts @@ -16,14 +16,21 @@ import { noteIncludeConfinementWarning } from "./doctor-config-analysis.js"; import { findDoctorLegacyConfigIssues } from "./doctor/shared/legacy-config-issues.js"; type DoctorStateMigrationsModule = typeof import("./doctor-state-migrations.js"); +type DoctorCronModule = typeof import("./doctor/cron/index.js"); let doctorStateMigrationsPromise: Promise | null = null; +let doctorCronPromise: Promise | null = null; function loadDoctorStateMigrations(): Promise { doctorStateMigrationsPromise ??= import("./doctor-state-migrations.js"); return doctorStateMigrationsPromise; } +function loadDoctorCron(): Promise { + doctorCronPromise ??= import("./doctor/cron/index.js"); + return doctorCronPromise; +} + async function maybeMigrateLegacyConfig(): Promise { const changes: string[] = []; const home = resolveHomeDir(); @@ -174,6 +181,11 @@ export async function runDoctorConfigPreflight( const baseConfig = snapshot.sourceConfig ?? snapshot.config ?? {}; if (options.migrateState !== false) { + if (snapshot.valid) { + const { repairLegacyCronStoreWithoutPrompt } = await loadDoctorCron(); + const cronResult = await repairLegacyCronStoreWithoutPrompt({ cfg: baseConfig }); + noteStateMigrationResult(cronResult); + } const { autoMigrateLegacyState, autoMigrateLegacyTaskStateSidecars } = await loadDoctorStateMigrations(); const stateResult = snapshot.valid diff --git a/src/commands/doctor/cron/index.ts b/src/commands/doctor/cron/index.ts index b50cd817ecdd..c4a1825837e7 100644 --- a/src/commands/doctor/cron/index.ts +++ b/src/commands/doctor/cron/index.ts @@ -52,53 +52,212 @@ function formatRunLogMigrationNote(importedFiles: number): string { : ""; } +function errorMessage(err: unknown): string { + return err instanceof Error ? err.message : String(err); +} + +type LegacyCronRepairState = { + storePath: string; + quarantinePath: string; + legacyStoreDetected: boolean; + legacyRunLogDetected: boolean; + legacyImportCount: number; + sqliteProjectionBackfillCount: number; + rawJobs: Array>; +}; + +export type LegacyCronRepairResult = { + changes: string[]; + warnings: string[]; +}; + +async function loadLegacyCronRepairState(params: { + cfg: OpenClawConfig; + onlyIfLegacyDetected?: boolean; +}): Promise { + const storePath = resolveCronJobsStorePath(params.cfg.cron?.store); + const quarantinePath = resolveCronQuarantinePath(storePath); + const legacyStoreDetected = await legacyCronStoreFilesExist(storePath); + const legacyRunLogDetected = await legacyCronRunLogFilesExist(storePath); + if (params.onlyIfLegacyDetected && !legacyStoreDetected && !legacyRunLogDetected) { + return null; + } + + const loaded = await loadCronJobsStoreWithConfigJobs(storePath); + const currentJobs = + loaded.configJobs.length > 0 + ? loaded.configJobs.map((job, index) => + mergeRuntimeEntryIntoConfigJob({ + job, + runtimeEntry: loaded.configJobRuntimeEntries[index], + }), + ) + : (loaded.store.jobs as unknown as Array>); + const sqliteProjectionBackfillCount = + loaded.configJobs.length > 0 + ? currentJobs.filter((job, index) => + needsSqliteProjectionBackfill({ + configJob: job, + projectedJob: loaded.store.jobs[index], + }), + ).length + : 0; + let rawJobs = currentJobs; + let legacyImportCount = 0; + if (legacyStoreDetected) { + const legacyStore = (await loadLegacyCronStoreForMigration(storePath)).store; + const merged = mergeLegacyCronJobs({ + currentJobs: rawJobs, + legacyJobs: legacyStore.jobs as unknown as Array>, + }); + rawJobs = merged.jobs; + legacyImportCount = merged.importedCount; + } + + return { + storePath, + quarantinePath, + legacyStoreDetected, + legacyRunLogDetected, + legacyImportCount, + sqliteProjectionBackfillCount, + rawJobs, + }; +} + +async function applyLegacyCronStoreRepair(params: { + cfg: OpenClawConfig; + state: LegacyCronRepairState; + normalized?: ReturnType; +}): Promise { + const { state } = params; + const changes: string[] = []; + const warnings: string[] = []; + const normalized = params.normalized ?? normalizeStoredCronJobs(state.rawJobs); + const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook); + const notifyMigration = migrateLegacyNotifyFallback({ + jobs: state.rawJobs, + legacyWebhook, + }); + const dreamingMigration = migrateLegacyDreamingPayloadShape(state.rawJobs); + warnings.push(...notifyMigration.warnings); + + const changed = + state.legacyStoreDetected || + state.legacyRunLogDetected || + state.sqliteProjectionBackfillCount > 0 || + normalized.mutated || + notifyMigration.changed || + dreamingMigration.changed; + if (!changed && warnings.length === 0) { + return { changes, warnings }; + } + + if (changed) { + try { + if (normalized.removedJobs.length > 0) { + await saveCronQuarantineFile({ + storePath: state.storePath, + nowMs: Date.now(), + entries: normalized.removedJobs.map((entry) => ({ + sourceIndex: entry.sourceIndex, + reason: entry.reason, + job: entry.job, + })), + }); + } + await saveCronJobsStore(state.storePath, { + version: 1, + jobs: state.rawJobs as unknown as CronJob[], + }); + } catch (err) { + return { + changes, + warnings: [ + ...warnings, + `Failed writing migrated cron store at ${shortenHomePath(state.storePath)}: ${errorMessage(err)}`, + ], + }; + } + } + + let importedRunLogs = 0; + if (state.legacyRunLogDetected) { + try { + importedRunLogs = (await migrateLegacyCronRunLogsToSqlite(state.storePath)).importedFiles; + } catch (err) { + warnings.push( + `Failed importing legacy cron run logs at ${shortenHomePath(state.storePath)}: ${errorMessage(err)}`, + ); + } + } + + if (state.legacyStoreDetected) { + await archiveLegacyCronStoreForMigration(state.storePath); + changes.push( + `Cron store migrated to SQLite at ${shortenHomePath(state.storePath)}.${formatRunLogMigrationNote(importedRunLogs)}`, + ); + } else if (state.legacyRunLogDetected && importedRunLogs > 0) { + changes.push( + `Cron run logs migrated to SQLite at ${shortenHomePath(state.storePath)}.${formatRunLogMigrationNote(importedRunLogs)}`, + ); + } else if (changed) { + changes.push(`Cron store normalized at ${shortenHomePath(state.storePath)}.`); + } + if (dreamingMigration.rewrittenCount > 0) { + changes.push( + `Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`, + ); + } + + return { changes, warnings }; +} + +export async function repairLegacyCronStoreWithoutPrompt(params: { + cfg: OpenClawConfig; +}): Promise { + const storePath = resolveCronJobsStorePath(params.cfg.cron?.store); + let state: LegacyCronRepairState | null; + try { + state = await loadLegacyCronRepairState({ + cfg: params.cfg, + onlyIfLegacyDetected: true, + }); + } catch (err) { + return { + changes: [], + warnings: [ + `Failed reading legacy cron storage at ${shortenHomePath(storePath)}: ${errorMessage(err)}`, + ], + }; + } + if (!state) { + return { changes: [], warnings: [] }; + } + return await applyLegacyCronStoreRepair({ cfg: params.cfg, state }); +} + +function noteLegacyCronRepairResult(result: LegacyCronRepairResult): void { + if (result.changes.length > 0) { + note(result.changes.join("\n"), "Doctor changes"); + } + if (result.warnings.length > 0) { + note(result.warnings.join("\n"), "Doctor warnings"); + } +} + /** Inspect cron storage and optionally repair legacy JSON/SQLite/payload shapes. */ export async function maybeRepairLegacyCronStore(params: { cfg: OpenClawConfig; options: DoctorOptions; prompter: Pick; }) { - const storePath = resolveCronJobsStorePath(params.cfg.cron?.store); - const quarantinePath = resolveCronQuarantinePath(storePath); - let store: Awaited>["store"]; - let legacyStoreDetected; - let legacyRunLogDetected; - let legacyImportCount = 0; - let sqliteProjectionBackfillCount; + let state: LegacyCronRepairState | null; try { - legacyStoreDetected = await legacyCronStoreFilesExist(storePath); - legacyRunLogDetected = await legacyCronRunLogFilesExist(storePath); - const loaded = await loadCronJobsStoreWithConfigJobs(storePath); - const currentJobs = - loaded.configJobs.length > 0 - ? loaded.configJobs.map((job, index) => - mergeRuntimeEntryIntoConfigJob({ - job, - runtimeEntry: loaded.configJobRuntimeEntries[index], - }), - ) - : (loaded.store.jobs as unknown as Array>); - sqliteProjectionBackfillCount = - loaded.configJobs.length > 0 - ? currentJobs.filter((job, index) => - needsSqliteProjectionBackfill({ - configJob: job, - projectedJob: loaded.store.jobs[index], - }), - ).length - : 0; - store = { version: 1, jobs: currentJobs as unknown as CronJob[] }; - if (legacyStoreDetected) { - const legacyStore = (await loadLegacyCronStoreForMigration(storePath)).store; - const merged = mergeLegacyCronJobs({ - currentJobs: store.jobs as unknown as Array>, - legacyJobs: legacyStore.jobs as unknown as Array>, - }); - legacyImportCount = merged.importedCount; - store = { version: 1, jobs: merged.jobs as unknown as CronJob[] }; - } + state = await loadLegacyCronRepairState({ cfg: params.cfg }); } catch (err) { const reason = err instanceof Error ? err.message : String(err); + const storePath = resolveCronJobsStorePath(params.cfg.cron?.store); note( [ `Unable to read cron job store at ${shortenHomePath(storePath)}.`, @@ -109,6 +268,18 @@ export async function maybeRepairLegacyCronStore(params: { ); return; } + if (!state) { + return; + } + const { + storePath, + quarantinePath, + legacyStoreDetected, + legacyRunLogDetected, + legacyImportCount, + sqliteProjectionBackfillCount, + rawJobs, + } = state; try { const quarantine = await loadCronQuarantineFile(quarantinePath); if (quarantine.jobs.length > 0) { @@ -131,7 +302,6 @@ export async function maybeRepairLegacyCronStore(params: { "Cron", ); } - const rawJobs = (store.jobs ?? []) as unknown as Array>; if (rawJobs.length === 0) { if (!legacyStoreDetected && !legacyRunLogDetected) { return; @@ -158,30 +328,12 @@ export async function maybeRepairLegacyCronStore(params: { if (!shouldRepair) { return; } - if (legacyStoreDetected) { - await saveCronJobsStore(storePath, { version: 1, jobs: [] }); - await archiveLegacyCronStoreForMigration(storePath); - } - const runLogMigration = legacyRunLogDetected - ? await migrateLegacyCronRunLogsToSqlite(storePath) - : { importedFiles: 0 }; - if (legacyStoreDetected) { - note( - `Cron store migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`, - "Doctor changes", - ); - } else { - note( - `Cron run logs migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`, - "Doctor changes", - ); - } + noteLegacyCronRepairResult(await applyLegacyCronStoreRepair({ cfg: params.cfg, state })); return; } noteCronModelOverrides({ cfg: params.cfg, jobs: rawJobs, storePath }); const normalized = normalizeStoredCronJobs(rawJobs); - const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook); const notifyCount = rawJobs.filter((job) => job.notify === true).length; const dreamingStaleCount = countStaleDreamingJobs(rawJobs); const previewLines = formatLegacyIssuePreview(normalized.issues); @@ -231,64 +383,7 @@ export async function maybeRepairLegacyCronStore(params: { return; } - const notifyMigration = migrateLegacyNotifyFallback({ - jobs: rawJobs, - legacyWebhook, - }); - const dreamingMigration = migrateLegacyDreamingPayloadShape(rawJobs); - const changed = - legacyStoreDetected || - legacyRunLogDetected || - sqliteProjectionBackfillCount > 0 || - normalized.mutated || - notifyMigration.changed || - dreamingMigration.changed; - if (!changed && notifyMigration.warnings.length === 0) { - return; - } - - if (changed) { - if (normalized.removedJobs.length > 0) { - await saveCronQuarantineFile({ - storePath, - nowMs: Date.now(), - entries: normalized.removedJobs.map((entry) => ({ - sourceIndex: entry.sourceIndex, - reason: entry.reason, - job: entry.job, - })), - }); - } - await saveCronJobsStore(storePath, { - version: 1, - jobs: rawJobs as unknown as CronJob[], - }); - const runLogMigration = legacyRunLogDetected - ? await migrateLegacyCronRunLogsToSqlite(storePath) - : { importedFiles: 0 }; - if (legacyStoreDetected) { - await archiveLegacyCronStoreForMigration(storePath); - note( - `Cron store migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`, - "Doctor changes", - ); - } else if (legacyRunLogDetected) { - note( - `Cron run logs migrated to SQLite at ${shortenHomePath(storePath)}.${formatRunLogMigrationNote(runLogMigration.importedFiles)}`, - "Doctor changes", - ); - } else { - note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes"); - } - if (dreamingMigration.rewrittenCount > 0) { - note( - `Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`, - "Doctor changes", - ); - } - } - - if (notifyMigration.warnings.length > 0) { - note(notifyMigration.warnings.join("\n"), "Doctor warnings"); - } + noteLegacyCronRepairResult( + await applyLegacyCronStoreRepair({ cfg: params.cfg, state, normalized }), + ); }