From a84819a639534d7a953022aa4fa0ab145378e0c2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 31 May 2026 15:39:47 +0100 Subject: [PATCH] refactor(cron): keep runtime on canonical sqlite rows --- docs/automation/cron-jobs.md | 6 +- docs/cli/doctor.md | 2 +- src/agents/tools/cron-tool.ts | 216 +++++++++-- src/commands/doctor/cron/index.test.ts | 140 ++++++- src/commands/doctor/cron/index.ts | 93 ++++- .../doctor/cron/legacy-store-migration.ts | 75 +++- .../doctor/cron/store-migration.test.ts | 16 + src/commands/doctor/cron/store-migration.ts | 29 +- src/cron/normalize.test.ts | 365 ++---------------- src/cron/normalize.ts | 239 +----------- src/cron/schedule-identity.ts | 13 +- src/cron/schedule.test.ts | 15 +- src/cron/schedule.ts | 28 +- src/cron/service.every-jobs-fire.test.ts | 12 +- .../service.issue-19676-at-reschedule.test.ts | 11 - src/cron/service.jobs.test.ts | 20 +- src/cron/service/jobs.ts | 13 +- src/cron/service/normalize.ts | 9 +- src/cron/service/ops.test.ts | 56 +-- src/cron/service/store.test.ts | 22 -- src/cron/store.test.ts | 126 +----- src/cron/store/delivery-codec.ts | 137 +------ src/cron/store/payload-codec.ts | 45 +-- src/cron/store/row-codec.ts | 16 +- src/cron/store/scalar-codec.ts | 42 -- .../server-methods/cron.validation.test.ts | 4 +- src/gateway/server.cron.test.ts | 21 +- src/state/openclaw-state-db.generated.d.ts | 2 + src/state/openclaw-state-db.ts | 2 + src/state/openclaw-state-schema.generated.ts | 2 + src/state/openclaw-state-schema.sql | 2 + 31 files changed, 680 insertions(+), 1099 deletions(-) diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 8a11cf968972..1d76cbfbcee8 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -41,8 +41,8 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t - Cron runs **inside the Gateway** process (not inside the model). - Job definitions, runtime state, and run history persist in OpenClaw's shared SQLite state database so restarts do not lose schedules. -- On upgrade, legacy `~/.openclaw/cron/jobs.json`, `jobs-state.json`, and `runs/*.jsonl` files are imported once and renamed with a `.migrated` suffix. Malformed job rows are skipped from runtime and copied to `jobs-quarantine.json` for later repair or review. -- `cron.store` still names the logical cron store key and legacy import path. After import, editing that JSON file no longer changes active cron jobs; use `openclaw cron add|edit|remove` or the Gateway cron RPC methods instead. +- On upgrade, run `openclaw doctor --fix` to import legacy `~/.openclaw/cron/jobs.json`, `jobs-state.json`, and `runs/*.jsonl` files into SQLite and rename them with a `.migrated` suffix. Malformed job rows are skipped from runtime and copied to `jobs-quarantine.json` for later repair or review. +- `cron.store` still names the logical cron store key and doctor import path. After import, editing that JSON file no longer changes active cron jobs; use `openclaw cron add|edit|remove` or the Gateway cron RPC methods instead. - All cron executions create [background task](/automation/tasks) records. - On Gateway startup, overdue isolated agent-turn jobs are rescheduled out of the channel-connect window instead of replaying immediately, so Discord/Telegram startup and native-command setup stay responsive after restarts. - One-shot jobs (`--at`) auto-delete after success by default. @@ -460,7 +460,7 @@ Model override note: `maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution, and defaults to 8. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting. -`cron.store` is a logical store key and legacy import path. Existing stores are imported into SQLite on first load and archived; future cron changes should go through the CLI or Gateway API. +`cron.store` is a logical store key and legacy doctor import path. Run `openclaw doctor --fix` to import existing JSON stores into SQLite and archive them; future cron changes should go through the CLI or Gateway API. Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`. diff --git a/docs/cli/doctor.md b/docs/cli/doctor.md index 1cf71ec13300..c762ae8160c4 100644 --- a/docs/cli/doctor.md +++ b/docs/cli/doctor.md @@ -199,7 +199,7 @@ Notes: - Modernized health checks can expose a `repair()` path for `doctor --fix`; checks that do not expose one continue through the existing doctor repair flow. - `doctor --fix --non-interactive` reports missing or stale gateway service definitions but does not install or rewrite them outside update repair mode. Run `openclaw gateway install` for a missing service, or `openclaw gateway install --force` when you intentionally want to replace the launcher. - State integrity checks now detect orphan transcript files in the sessions directory. Archiving them as `.deleted.` requires an interactive confirmation; `--fix`, `--yes`, and headless runs leave them in place. -- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime. +- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and rewrites them before importing canonical rows into SQLite. - Doctor reports cron jobs with explicit `payload.model` overrides, including provider namespace counts and mismatches against `agents.defaults.model`, so scheduled jobs that do not inherit the default model are visible during auth or billing investigations. - On Linux, doctor warns when the user's crontab still runs legacy `~/.openclaw/bin/ensure-whatsapp.sh`; that script is no longer maintained and can log false WhatsApp gateway outages when cron lacks the systemd user-bus environment. - When WhatsApp is enabled, doctor checks for a degraded Gateway event loop with local `openclaw-tui` clients still running. `doctor --fix` stops only verified local TUI clients so WhatsApp replies are not queued behind stale TUI refresh loops. diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 70b1262eb027..6d4bbe85809c 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -94,10 +94,195 @@ const REMINDER_CONTEXT_PER_MESSAGE_MAX = 220; const REMINDER_CONTEXT_TOTAL_MAX = 700; const REMINDER_CONTEXT_MARKER = "\n\nRecent context:\n"; +function isCronScheduleKind(value: unknown): value is (typeof CRON_SCHEDULE_KINDS)[number] { + return value === "at" || value === "every" || value === "cron"; +} + +function isCronPayloadKind(value: unknown): value is (typeof CRON_PAYLOAD_KINDS)[number] { + return value === "systemEvent" || value === "agentTurn"; +} + function isMissingOrEmptyObject(value: unknown): boolean { return !value || (isRecord(value) && Object.keys(value).length === 0); } +function isNonEmptyString(value: unknown): value is string { + return typeof value === "string" && value.trim().length > 0; +} + +function isStringArrayOrNull(value: unknown): boolean { + return ( + value === null || (Array.isArray(value) && value.every((entry) => typeof entry === "string")) + ); +} + +function moveDefinedField(params: { + source: Record; + target: Record; + from: string; + to?: string; +}): boolean { + if (params.source[params.from] === undefined) { + return false; + } + params.target[params.to ?? params.from] = params.source[params.from]; + delete params.source[params.from]; + return true; +} + +function setScheduleAtMs(schedule: Record, value: unknown): void { + const atMs = typeof value === "number" ? value : Number(value); + schedule.at = Number.isFinite(atMs) ? new Date(Math.floor(atMs)).toISOString() : value; +} + +function canonicalizeCronToolSchedule(value: Record): void { + const schedule = isRecord(value.schedule) ? { ...value.schedule } : {}; + let hasSchedule = isRecord(value.schedule); + + if (schedule.atMs !== undefined) { + setScheduleAtMs(schedule, schedule.atMs); + delete schedule.atMs; + if (!isCronScheduleKind(schedule.kind)) { + schedule.kind = "at"; + } + } + if (schedule.everyMs === undefined && schedule.every !== undefined) { + schedule.everyMs = schedule.every; + delete schedule.every; + } + if (schedule.expr === undefined && schedule.cron !== undefined) { + schedule.expr = schedule.cron; + delete schedule.cron; + } + if (schedule.staggerMs === undefined && schedule.stagger !== undefined) { + schedule.staggerMs = schedule.stagger; + delete schedule.stagger; + } + if (schedule.exact === true && schedule.staggerMs === undefined) { + schedule.staggerMs = 0; + } + delete schedule.exact; + + if (isCronScheduleKind(value.kind) && !isCronScheduleKind(schedule.kind)) { + schedule.kind = value.kind; + delete value.kind; + hasSchedule = true; + } + + const movedAt = moveDefinedField({ source: value, target: schedule, from: "at" }); + if (movedAt && !isCronScheduleKind(schedule.kind)) { + schedule.kind = "at"; + } + + if (value.atMs !== undefined) { + setScheduleAtMs(schedule, value.atMs); + delete value.atMs; + if (!isCronScheduleKind(schedule.kind)) { + schedule.kind = "at"; + } + hasSchedule = true; + } + + const movedEveryMs = + moveDefinedField({ source: value, target: schedule, from: "everyMs" }) || + moveDefinedField({ source: value, target: schedule, from: "every", to: "everyMs" }); + if (movedEveryMs && !isCronScheduleKind(schedule.kind)) { + schedule.kind = "every"; + } + + const movedCron = + moveDefinedField({ source: value, target: schedule, from: "cron", to: "expr" }) || + moveDefinedField({ source: value, target: schedule, from: "expr" }); + if (movedCron && !isCronScheduleKind(schedule.kind)) { + schedule.kind = "cron"; + } + + for (const key of ["anchorMs", "tz", "staggerMs"] as const) { + hasSchedule = moveDefinedField({ source: value, target: schedule, from: key }) || hasSchedule; + } + hasSchedule = + moveDefinedField({ source: value, target: schedule, from: "stagger", to: "staggerMs" }) || + hasSchedule; + + if (value.exact === true && schedule.staggerMs === undefined) { + schedule.staggerMs = 0; + hasSchedule = true; + } + delete value.exact; + + if (!isCronScheduleKind(schedule.kind)) { + if (schedule.at !== undefined) { + schedule.kind = "at"; + } else if (schedule.everyMs !== undefined) { + schedule.kind = "every"; + } else if (schedule.expr !== undefined) { + schedule.kind = "cron"; + } + } + + if (hasSchedule || Object.keys(schedule).length > 0) { + value.schedule = schedule; + } +} + +function canonicalizeCronToolPayload(value: Record): void { + const payload = isRecord(value.payload) ? { ...value.payload } : {}; + let hasPayload = isRecord(value.payload); + + for (const key of CRON_FLAT_PAYLOAD_KEYS) { + hasPayload = moveDefinedField({ source: value, target: payload, from: key }) || hasPayload; + } + + if (isCronPayloadKind(value.kind) && !isCronPayloadKind(payload.kind)) { + payload.kind = value.kind; + delete value.kind; + hasPayload = true; + } + + if (!isCronPayloadKind(payload.kind)) { + const hasAgentTurnSignal = + isNonEmptyString(payload.message) || + isNonEmptyString(payload.model) || + isNonEmptyString(payload.thinking) || + typeof payload.timeoutSeconds === "number" || + typeof payload.lightContext === "boolean" || + typeof payload.allowUnsafeExternalContent === "boolean" || + (payload.fallbacks !== undefined && isStringArrayOrNull(payload.fallbacks)) || + (payload.toolsAllow !== undefined && isStringArrayOrNull(payload.toolsAllow)); + if (hasAgentTurnSignal) { + payload.kind = "agentTurn"; + } else if (isNonEmptyString(payload.text)) { + payload.kind = "systemEvent"; + } + } + + if (hasPayload || Object.keys(payload).length > 0) { + value.payload = payload; + } +} + +function canonicalizeCronToolObject(value: Record): Record { + const unwrapped = isRecord(value.data) ? value.data : isRecord(value.job) ? value.job : value; + const next = { ...unwrapped }; + canonicalizeCronToolSchedule(next); + canonicalizeCronToolPayload(next); + return next; +} + +function isEmptyRecoveredCronPatch(value: unknown): boolean { + if (!isRecord(value)) { + return true; + } + const keys = Object.keys(value); + return ( + keys.length === 0 || + (keys.length === 1 && + keys[0] === "payload" && + isRecord(value.payload) && + Object.keys(value.payload).length === 0) + ); +} + function recoverCronObjectFromFlatParams(params: Record): { found: boolean; value: Record; @@ -110,19 +295,7 @@ function recoverCronObjectFromFlatParams(params: Record): { found = true; } } - if (value.everyMs === undefined && value.every !== undefined) { - value.everyMs = value.every; - } - if (value.staggerMs === undefined && value.stagger !== undefined) { - value.staggerMs = value.stagger; - } - if (value.exact === true && value.staggerMs === undefined) { - value.staggerMs = 0; - } - delete value.every; - delete value.stagger; - delete value.exact; - return { found, value }; + return { found, value: canonicalizeCronToolObject(value) }; } function hasCronCreateSignal(value: Record): boolean { @@ -662,10 +835,11 @@ Use jobId canonical; id accepted compat. contextMessages (0-10) adds previous me if (!params.job || typeof params.job !== "object") { throw new Error("job required"); } + const canonicalJob = canonicalizeCronToolObject(params.job as Record); const job = - normalizeCronJobCreate(params.job, { + normalizeCronJobCreate(canonicalJob, { sessionContext: { sessionKey: opts?.agentSessionKey }, - }) ?? params.job; + }) ?? canonicalJob; const cfg = getRuntimeConfig(); if (job && typeof job === "object") { const { mainKey, alias } = resolveMainSessionAlias(cfg); @@ -775,13 +949,11 @@ Use jobId canonical; id accepted compat. contextMessages (0-10) adds previous me if (!params.patch || typeof params.patch !== "object") { throw new Error("patch required"); } - const patch = normalizeCronJobPatch(params.patch) ?? params.patch; - if ( - recoveredFlatPatch && - typeof patch === "object" && - patch !== null && - Object.keys(patch as Record).length === 0 - ) { + const canonicalPatch = canonicalizeCronToolObject( + params.patch as Record, + ); + const patch = normalizeCronJobPatch(canonicalPatch) ?? canonicalPatch; + if (recoveredFlatPatch && isEmptyRecoveredCronPatch(patch)) { throw new Error("patch required"); } return jsonResult( diff --git a/src/commands/doctor/cron/index.test.ts b/src/commands/doctor/cron/index.test.ts index ae657c5d2f10..7087e7d0b961 100644 --- a/src/commands/doctor/cron/index.test.ts +++ b/src/commands/doctor/cron/index.test.ts @@ -4,7 +4,13 @@ import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../config/config.js"; import { readCronRunLogEntriesSync } from "../../../cron/run-log.js"; -import { loadCronStore, resolveCronQuarantinePath, saveCronStore } from "../../../cron/store.js"; +import { + loadCronQuarantineFile, + loadCronStore, + resolveCronQuarantinePath, + saveCronStore, +} from "../../../cron/store.js"; +import { runOpenClawStateWriteTransaction } from "../../../state/openclaw-state-db.js"; import { collectLegacyWhatsAppCrontabHealthWarning, maybeRepairLegacyCronStore, @@ -108,6 +114,43 @@ async function writeCurrentCronStore(storePath: string, jobs: Array, + options: { payloadMessage?: string | null } = {}, +) { + const schedule = requireRecord(job.schedule, "cron schedule"); + const payload = requireRecord(job.payload, "cron payload"); + runOpenClawStateWriteTransaction(({ db }) => { + db.prepare( + `INSERT INTO cron_jobs ( + store_key, job_id, name, enabled, created_at_ms, updated_at, + schedule_kind, every_ms, session_target, wake_mode, payload_kind, payload_message, + job_json, state_json + ) VALUES ( + $storeKey, $jobId, $name, $enabled, $createdAtMs, $updatedAt, + $scheduleKind, $everyMs, $sessionTarget, $wakeMode, $payloadKind, $payloadMessage, + $jobJson, $stateJson + )`, + ).run({ + $storeKey: path.resolve(storePath), + $jobId: String(job.id), + $name: String(job.name), + $enabled: job.enabled === false ? 0 : 1, + $createdAtMs: Number(job.createdAtMs), + $updatedAt: Number(job.updatedAtMs), + $scheduleKind: String(schedule.kind), + $everyMs: Number(schedule.everyMs), + $sessionTarget: String(job.sessionTarget), + $wakeMode: String(job.wakeMode), + $payloadKind: String(payload.kind), + $payloadMessage: options.payloadMessage ?? null, + $jobJson: JSON.stringify(job), + $stateJson: JSON.stringify(job.state ?? {}), + }); + }); +} + async function writeLegacyCronArrayStore(storePath: string, jobs: Array>) { await fs.mkdir(path.dirname(storePath), { recursive: true }); await fs.writeFile(storePath, JSON.stringify(jobs, null, 2), "utf-8"); @@ -415,6 +458,75 @@ describe("maybeRepairLegacyCronStore", () => { expectNoteContaining("Cron store migrated to SQLite", "Doctor changes"); }); + it("backfills early SQLite rows from job_json before runtime relies on split columns", async () => { + const storePath = await makeTempStorePath(); + insertEarlySQLiteCronRow(storePath, { + id: "early-sqlite-agent-turn", + name: "Early SQLite agent turn", + enabled: true, + createdAtMs: Date.parse("2026-02-03T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-03T00:00:00.000Z"), + schedule: { kind: "every", everyMs: 3_600_000, anchorMs: 0 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "use config json" }, + state: {}, + }); + + expect(await readPersistedJobs(storePath)).toEqual([]); + + await maybeRepairLegacyCronStore({ + cfg: createCronConfig(storePath), + options: {}, + prompter: makePrompter(true), + }); + + const jobs = await readPersistedJobs(storePath); + const job = requirePersistedJob(jobs, 0); + expect(job.id).toBe("early-sqlite-agent-turn"); + expect(job.payload).toEqual({ kind: "agentTurn", message: "use config json" }); + expectNoteContaining("1 SQLite cron row will be backfilled", "Cron"); + }); + + it("backfills parseable SQLite rows when optional config fields only exist in job_json", async () => { + const storePath = await makeTempStorePath(); + insertEarlySQLiteCronRow( + storePath, + { + id: "early-sqlite-model", + name: "Early SQLite model", + enabled: true, + createdAtMs: Date.parse("2026-02-03T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-03T00:00:00.000Z"), + schedule: { kind: "every", everyMs: 3_600_000, anchorMs: 0 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "use split text", model: "openai/gpt-5.5" }, + state: {}, + }, + { payloadMessage: "use split text" }, + ); + + expect(requirePersistedJob(await readPersistedJobs(storePath), 0).payload).toEqual({ + kind: "agentTurn", + message: "use split text", + }); + + await maybeRepairLegacyCronStore({ + cfg: createCronConfig(storePath), + options: {}, + prompter: makePrompter(true), + }); + + const job = requirePersistedJob(await readPersistedJobs(storePath), 0); + expect(job.payload).toEqual({ + kind: "agentTurn", + message: "use split text", + model: "openai/gpt-5.5", + }); + expectNoteContaining("1 SQLite cron row will be backfilled", "Cron"); + }); + it("migrates legacy run logs even when the legacy job store was already archived", async () => { const storePath = await makeTempStorePath(); await writeCurrentCronStore(storePath, [createCurrentCronJob()]); @@ -665,7 +777,7 @@ describe("maybeRepairLegacyCronStore", () => { expect(delivery.to).toBe("https://example.invalid/cron-finished"); }); - it("keeps notify fallback when cron.webhook is invalid", async () => { + it("warns when cron.webhook is invalid for a legacy notify fallback", async () => { const storePath = await makeTempStorePath(); await writeCronStore(storePath, [ createLegacyCronJob({ @@ -688,7 +800,7 @@ describe("maybeRepairLegacyCronStore", () => { const jobs = await readPersistedJobs(storePath); const job = requirePersistedJob(jobs, 0); - expect(job.notify).toBe(true); + expect(job.notify).toBeUndefined(); expect(job.delivery).toBeUndefined(); expectNoteContaining( "cron.webhook is not a valid HTTP(S) URL so doctor cannot migrate it automatically", @@ -696,6 +808,28 @@ describe("maybeRepairLegacyCronStore", () => { ); }); + it("quarantines invalid legacy rows before saving the repaired store", async () => { + const storePath = await makeTempStorePath(); + await writeCronStore(storePath, [ + createLegacyCronJob({ + id: "invalid-legacy-cron", + jobId: undefined, + schedule: { kind: "cron" }, + }), + ]); + + await maybeRepairLegacyCronStore({ + cfg: createCronConfig(storePath), + options: {}, + prompter: makePrompter(true), + }); + + expect(await readPersistedJobs(storePath)).toEqual([]); + const quarantine = await loadCronQuarantineFile(resolveCronQuarantinePath(storePath)); + expect(quarantine.jobs[0]?.reason).toBe("invalid-schedule"); + expect(quarantine.jobs[0]?.job?.id).toBe("invalid-legacy-cron"); + }); + it("repairs legacy root delivery threadId hints into delivery", async () => { const storePath = await makeTempStorePath(); await writeCronStore(storePath, [ diff --git a/src/commands/doctor/cron/index.ts b/src/commands/doctor/cron/index.ts index e7fea45d9d3e..698174c3163f 100644 --- a/src/commands/doctor/cron/index.ts +++ b/src/commands/doctor/cron/index.ts @@ -1,16 +1,18 @@ import { execFile } from "node:child_process"; -import { promisify } from "node:util"; +import { isDeepStrictEqual, promisify } from "node:util"; import { normalizeOptionalString } from "../../../../packages/normalization-core/src/string-coerce.js"; import { note } from "../../../../packages/terminal-core/src/note.js"; import { formatCliCommand } from "../../../cli/command-format.js"; import { resolveAgentModelPrimaryValue } from "../../../config/model-input.js"; import type { OpenClawConfig } from "../../../config/types.openclaw.js"; import { migrateLegacyNotifyFallback } from "../../../cron/migrations/legacy-notify.js"; +import { normalizeCronJobInput } from "../../../cron/normalize.js"; import { loadCronQuarantineFile, - loadCronStore, + loadCronStoreWithConfigJobs, resolveCronQuarantinePath, resolveCronStorePath, + saveCronQuarantineFile, saveCronStore, } from "../../../cron/store.js"; import type { CronJob } from "../../../cron/types.js"; @@ -173,6 +175,52 @@ function mergeLegacyCronJobs(params: { return { jobs: merged, importedCount }; } +function mergeRuntimeEntryIntoConfigJob(params: { + job: Record; + runtimeEntry?: { updatedAtMs?: number; state?: Record }; +}): Record { + return { + ...params.job, + ...(params.runtimeEntry?.updatedAtMs !== undefined + ? { updatedAtMs: params.runtimeEntry.updatedAtMs } + : {}), + ...(params.runtimeEntry?.state ? { state: structuredClone(params.runtimeEntry.state) } : {}), + }; +} + +function needsSqliteProjectionBackfill(params: { + configJob: Record; + projectedJob?: CronJob; +}): boolean { + if (!params.projectedJob) { + return true; + } + const normalizedConfig = normalizeCronJobInput(params.configJob, { applyDefaults: true }); + if (!normalizedConfig) { + return true; + } + const projected = params.projectedJob as unknown as Record; + for (const field of [ + "agentId", + "deleteAfterRun", + "delivery", + "description", + "enabled", + "failureAlert", + "name", + "payload", + "schedule", + "sessionKey", + "sessionTarget", + "wakeMode", + ] as const) { + if (!isDeepStrictEqual(normalizedConfig[field], projected[field])) { + return true; + } + } + return false; +} + function formatProviderCounts(counts: Map): string { return [...counts.entries()] .toSorted(([left], [right]) => left.localeCompare(right)) @@ -319,14 +367,34 @@ export async function maybeRepairLegacyCronStore(params: { }) { const storePath = resolveCronStorePath(params.cfg.cron?.store); const quarantinePath = resolveCronQuarantinePath(storePath); - let store: Awaited>; + let store: Awaited>["store"]; let legacyStoreDetected = false; let legacyRunLogDetected = false; let legacyImportCount = 0; + let sqliteProjectionBackfillCount = 0; try { legacyStoreDetected = await legacyCronStoreFilesExist(storePath); legacyRunLogDetected = await legacyCronRunLogFilesExist(storePath); - store = await loadCronStore(storePath); + const loaded = await loadCronStoreWithConfigJobs(storePath); + const currentJobs = + loaded.configJobs.length > 0 + ? loaded.configJobs.map((job, index) => + mergeRuntimeEntryIntoConfigJob({ + job, + runtimeEntry: loaded.configJobRuntimeEntries[index], + }), + ) + : (loaded.store.jobs as unknown as Array>); + sqliteProjectionBackfillCount = + loaded.configJobs.length > 0 + ? currentJobs.filter((job, index) => + needsSqliteProjectionBackfill({ + configJob: job, + projectedJob: loaded.store.jobs[index], + }), + ).length + : 0; + store = { version: 1, jobs: currentJobs as unknown as CronJob[] }; if (legacyStoreDetected) { const legacyStore = (await loadLegacyCronStoreForMigration(storePath)).store; const merged = mergeLegacyCronJobs({ @@ -434,6 +502,11 @@ export async function maybeRepairLegacyCronStore(params: { if (legacyRunLogDetected) { previewLines.push("- legacy JSON cron run logs will be imported into SQLite"); } + if (sqliteProjectionBackfillCount > 0) { + previewLines.push( + `- ${pluralize(sqliteProjectionBackfillCount, "SQLite cron row")} will be backfilled from stored config JSON into split columns`, + ); + } if (notifyCount > 0) { previewLines.push( `- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`, @@ -473,6 +546,7 @@ export async function maybeRepairLegacyCronStore(params: { const changed = legacyStoreDetected || legacyRunLogDetected || + sqliteProjectionBackfillCount > 0 || normalized.mutated || notifyMigration.changed || dreamingMigration.changed; @@ -481,6 +555,17 @@ export async function maybeRepairLegacyCronStore(params: { } if (changed) { + if (normalized.removedJobs.length > 0) { + await saveCronQuarantineFile({ + storePath, + nowMs: Date.now(), + entries: normalized.removedJobs.map((entry) => ({ + sourceIndex: entry.sourceIndex, + reason: entry.reason, + job: entry.job, + })), + }); + } await saveCronStore(storePath, { version: 1, jobs: rawJobs as unknown as CronJob[], diff --git a/src/commands/doctor/cron/legacy-store-migration.ts b/src/commands/doctor/cron/legacy-store-migration.ts index 49eb76637397..7f2f23505b54 100644 --- a/src/commands/doctor/cron/legacy-store-migration.ts +++ b/src/commands/doctor/cron/legacy-store-migration.ts @@ -1,14 +1,15 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { tryCronScheduleIdentity } from "../../../cron/schedule-identity.js"; +import { isRecord } from "../../../../packages/normalization-core/src/record-coerce.js"; +import { normalizeOptionalString } from "../../../../packages/normalization-core/src/string-coerce.js"; +import { coerceFiniteScheduleNumber } from "../../../cron/schedule-number.js"; +import { normalizeCronStaggerMs } from "../../../cron/stagger.js"; import type { CronConfigJobRuntimeEntry, LoadedCronStore, QuarantinedCronConfigJob, } from "../../../cron/store.js"; import type { CronStoreFile } from "../../../cron/types.js"; -import { isRecord } from "../../../../packages/normalization-core/src/record-coerce.js"; -import { normalizeOptionalString } from "../../../../packages/normalization-core/src/string-coerce.js"; import { parseJsonWithJson5Fallback } from "../../../utils/parse-json-compat.js"; const LEGACY_CRON_ARCHIVE_SUFFIX = ".migrated"; @@ -65,6 +66,71 @@ function parseCronStateFile(raw: string): { } } +function readString(record: Record, key: string): string | undefined { + return normalizeOptionalString(record[key]); +} + +function readNumber(record: Record, key: string): number | undefined { + return coerceFiniteScheduleNumber(record[key]); +} + +function legacySchedulePayloadFromRecord( + schedule: Record, +): + | { kind: "at"; at: string } + | { kind: "every"; everyMs: number; anchorMs?: number } + | { kind: "cron"; expr: string; tz?: string; staggerMs?: number } + | undefined { + const rawKind = readString(schedule, "kind")?.toLowerCase(); + const expr = readString(schedule, "expr") ?? readString(schedule, "cron"); + const at = readString(schedule, "at"); + const atMs = readNumber(schedule, "atMs"); + const everyMs = readNumber(schedule, "everyMs"); + const anchorMs = readNumber(schedule, "anchorMs"); + const tz = readString(schedule, "tz"); + const staggerMs = normalizeCronStaggerMs(schedule.staggerMs); + const kind = + rawKind === "at" || rawKind === "every" || rawKind === "cron" + ? rawKind + : at || atMs !== undefined + ? "at" + : everyMs !== undefined + ? "every" + : expr + ? "cron" + : undefined; + + if (kind === "at") { + return at + ? { kind: "at", at } + : atMs !== undefined + ? { kind: "at", at: String(atMs) } + : undefined; + } + if (kind === "every" && everyMs !== undefined) { + return { kind: "every", everyMs, anchorMs }; + } + if (kind === "cron" && expr) { + return { kind: "cron", expr, tz, staggerMs }; + } + return undefined; +} + +function tryLegacyCronScheduleIdentity(job: Record): string | undefined { + const schedule = + job.schedule && typeof job.schedule === "object" && !Array.isArray(job.schedule) + ? legacySchedulePayloadFromRecord(job.schedule as Record) + : legacySchedulePayloadFromRecord(job); + if (!schedule) { + return undefined; + } + return JSON.stringify({ + version: 1, + enabled: typeof job.enabled === "boolean" ? job.enabled : true, + schedule, + }); +} + function getRawCronJobs(parsed: unknown): unknown[] { return Array.isArray(parsed) ? parsed @@ -135,7 +201,8 @@ function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown) job.state = isRecord(entry.state) ? (entry.state as never) : ({} as never); if ( typeof entry.scheduleIdentity === "string" && - entry.scheduleIdentity !== tryCronScheduleIdentity(job as unknown as Record) + entry.scheduleIdentity !== + tryLegacyCronScheduleIdentity(job as unknown as Record) ) { ensureJobStateObject(job); job.state.nextRunAtMs = undefined; diff --git a/src/commands/doctor/cron/store-migration.test.ts b/src/commands/doctor/cron/store-migration.test.ts index feb0df384e2e..97502a4d1167 100644 --- a/src/commands/doctor/cron/store-migration.test.ts +++ b/src/commands/doctor/cron/store-migration.test.ts @@ -142,6 +142,22 @@ describe("normalizeStoredCronJobs", () => { expect(result.issues.legacyPayloadKind).toBeUndefined(); }); + it("rewrites legacy systemEvent message payloads to text", () => { + const jobs = [ + makeLegacyJob({ + id: "legacy-system-event-message", + schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 }, + payload: { kind: "systemEvent", message: "tick" }, + }), + ]; + + const result = normalizeStoredCronJobs(jobs); + + expect(result.mutated).toBe(true); + expect(result.jobs[0]?.payload).toEqual({ kind: "systemEvent", text: "tick" }); + expect(result.removedJobs).toEqual([]); + }); + it("removes unrepairable persisted schedule and payload shapes", () => { const jobs = [ makeLegacyJob({ diff --git a/src/commands/doctor/cron/store-migration.ts b/src/commands/doctor/cron/store-migration.ts index acd7b086b542..f91d4a60fccb 100644 --- a/src/commands/doctor/cron/store-migration.ts +++ b/src/commands/doctor/cron/store-migration.ts @@ -1,9 +1,4 @@ import { randomUUID } from "node:crypto"; -import { parseAbsoluteTimeMs } from "../../../cron/parse.js"; -import { getInvalidPersistedCronJobReason } from "../../../cron/persisted-shape.js"; -import { coerceFiniteScheduleNumber } from "../../../cron/schedule.js"; -import { inferLegacyName } from "../../../cron/service/normalize.js"; -import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../../../cron/stagger.js"; import { timestampMsToIsoString } from "../../../../packages/normalization-core/src/number-coercion.js"; import { normalizeLowercaseStringOrEmpty, @@ -11,6 +6,11 @@ import { normalizeOptionalString, normalizeOptionalStringifiedId, } from "../../../../packages/normalization-core/src/string-coerce.js"; +import { parseAbsoluteTimeMs } from "../../../cron/parse.js"; +import { getInvalidPersistedCronJobReason } from "../../../cron/persisted-shape.js"; +import { coerceFiniteScheduleNumber } from "../../../cron/schedule.js"; +import { inferCronJobName } from "../../../cron/service/normalize.js"; +import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../../../cron/stagger.js"; import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js"; import { hasLegacyOpenAICodexCronModelRef, migrateLegacyCronPayload } from "./payload-migration.js"; @@ -35,6 +35,7 @@ type NormalizeCronStoreJobsResult = { issues: CronStoreIssues; jobs: Array>; mutated: boolean; + removedJobs: Array<{ job: Record; reason: string; sourceIndex: number }>; }; function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) { @@ -237,8 +238,9 @@ export function normalizeStoredCronJobs( const issues: CronStoreIssues = {}; let mutated = false; const keptJobs: Array> = []; + const removedJobs: NormalizeCronStoreJobsResult["removedJobs"] = []; - for (const raw of jobs) { + for (const [sourceIndex, raw] of jobs.entries()) { const jobIssues = new Set(); const trackIssue = (key: CronStoreIssueKey) => { if (jobIssues.has(key)) { @@ -277,7 +279,7 @@ export function normalizeStoredCronJobs( const nameRaw = raw.name; if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { - raw.name = inferLegacyName({ + raw.name = inferCronJobName({ schedule: raw.schedule as never, payload: raw.payload as never, }); @@ -355,6 +357,15 @@ export function normalizeStoredCronJobs( if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) { mutated = true; } + if (payloadRecord.kind === "systemEvent" && !normalizeOptionalString(payloadRecord.text)) { + const message = normalizeOptionalString(payloadRecord.message); + if (message) { + payloadRecord.text = message; + delete payloadRecord.message; + mutated = true; + trackIssue("legacyPayloadKind"); + } + } } const hadLegacyTopLevelPayloadFields = @@ -573,6 +584,7 @@ export function normalizeStoredCronJobs( invalidPersistedReason === "invalid-schedule" ) { trackIssue("invalidSchedule"); + removedJobs.push({ job: structuredClone(raw), reason: invalidPersistedReason, sourceIndex }); mutated = true; continue; } @@ -581,6 +593,7 @@ export function normalizeStoredCronJobs( invalidPersistedReason === "invalid-payload" ) { trackIssue("invalidPayload"); + removedJobs.push({ job: structuredClone(raw), reason: invalidPersistedReason, sourceIndex }); mutated = true; continue; } @@ -591,5 +604,5 @@ export function normalizeStoredCronJobs( jobs.splice(0, jobs.length, ...keptJobs); } - return { issues, jobs, mutated }; + return { issues, jobs, mutated, removedJobs }; } diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 45f55b05ca32..ed09ff353493 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -33,15 +33,6 @@ function expectAnnounceDeliveryTarget( expect(delivery.to).toBe(params.to); } -function expectPayloadDeliveryHintsCleared(payload: Record): void { - expect(payload.channel).toBeUndefined(); - expect(payload.deliver).toBeUndefined(); - expect(payload.to).toBeUndefined(); - expect(payload.threadId).toBeUndefined(); - expect(payload.bestEffortDeliver).toBeUndefined(); - expect(payload.provider).toBeUndefined(); -} - function normalizeIsolatedAgentTurnCreateJob(params: { name: string; payload?: Record; @@ -80,23 +71,6 @@ function normalizeMainSystemEventCreateJob(params: { } describe("normalizeCronJobCreate", () => { - it("strips payload-level legacy delivery hints from live input", () => { - const normalized = normalizeIsolatedAgentTurnCreateJob({ - name: "legacy", - payload: { - deliver: true, - provider: " TeLeGrAm ", - to: "7200373102", - }, - }); - - const payload = normalized.payload as Record; - expectPayloadDeliveryHintsCleared(payload); - - const delivery = normalized.delivery as Record; - expect(delivery).toEqual({ mode: "announce" }); - }); - it("trims agentId and drops null", () => { const normalized = normalizeCronJobCreate({ name: "agent-set", @@ -153,42 +127,6 @@ describe("normalizeCronJobCreate", () => { expect("sessionKey" in cleared).toBe(false); }); - it("strips top-level legacy delivery hints from live input", () => { - const normalized = normalizeIsolatedAgentTurnCreateJob({ - name: "legacy top-level delivery", - payload: { - kind: "agentTurn", - message: "hi", - }, - delivery: undefined, - }); - - const withLegacyTopLevel = normalizeCronJobCreate({ - name: "legacy top-level delivery", - enabled: true, - schedule: { kind: "cron", expr: "* * * * *" }, - sessionTarget: "isolated", - wakeMode: "now", - payload: { - kind: "agentTurn", - message: "hi", - }, - deliver: false, - channel: "Telegram", - to: "-1001234567890", - threadId: " 99 ", - }) as unknown as Record; - - expect(normalized.delivery).toEqual({ mode: "announce" }); - expect(withLegacyTopLevel.deliver).toBeUndefined(); - expect(withLegacyTopLevel.channel).toBeUndefined(); - expect(withLegacyTopLevel.to).toBeUndefined(); - expect(withLegacyTopLevel.threadId).toBeUndefined(); - - const delivery = withLegacyTopLevel.delivery as Record; - expect(delivery).toEqual({ mode: "announce" }); - }); - it("canonicalizes delivery.channel casing", () => { const normalized = normalizeIsolatedAgentTurnCreateJob({ name: "delivery channel casing", @@ -204,34 +142,7 @@ describe("normalizeCronJobCreate", () => { }); it("coerces ISO schedule.at to normalized ISO (UTC)", () => { - expectNormalizedAtSchedule({ at: "2026-01-12T18:00:00" }); - }); - - it("coerces schedule.atMs string to schedule.at (UTC)", () => { - expectNormalizedAtSchedule({ kind: "at", atMs: "2026-01-12T18:00:00" }); - }); - - it("keeps out-of-range numeric schedule.atMs invalid instead of throwing for create jobs", () => { - const normalized = normalizeMainSystemEventCreateJob({ - name: "out-of-range-at-ms", - schedule: { kind: "at", atMs: 8_640_000_000_000_001 }, - }); - - const schedule = normalized.schedule as Record; - expect(schedule).toEqual({ kind: "at" }); - expect(validateCronAddParams(normalized)).toBe(false); - }); - - it("migrates legacy schedule.cron into schedule.expr", () => { - const normalized = normalizeMainSystemEventCreateJob({ - name: "legacy-cron-field", - schedule: { kind: "cron", cron: "*/10 * * * *", tz: "UTC" }, - }); - - const schedule = normalized.schedule as Record; - expect(schedule.kind).toBe("cron"); - expect(schedule.expr).toBe("*/10 * * * *"); - expect(schedule.cron).toBeUndefined(); + expectNormalizedAtSchedule({ kind: "at", at: "2026-01-12T18:00:00" }); }); it("defaults cron stagger for recurring top-of-hour schedules", () => { @@ -258,7 +169,7 @@ describe("normalizeCronJobCreate", () => { const normalized = normalizeCronJobCreate({ name: "default delete", enabled: true, - schedule: { at: "2026-01-12T18:00:00Z" }, + schedule: { kind: "at", at: "2026-01-12T18:00:00Z" }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { @@ -413,141 +324,11 @@ describe("normalizeCronJobCreate", () => { expect(delivery.mode).toBe("announce"); }); - it("migrates legacy isolation settings to announce delivery", () => { - const normalized = normalizeCronJobCreate({ - name: "legacy isolation", - enabled: true, - schedule: { kind: "cron", expr: "* * * * *" }, - payload: { - kind: "agentTurn", - message: "hi", - }, - isolation: { postToMainPrefix: "Cron" }, - }) as unknown as Record; - - const delivery = normalized.delivery as Record; - expect(delivery.mode).toBe("announce"); - expect((normalized as { isolation?: unknown }).isolation).toBeUndefined(); - }); - - it("infers payload kind/session target and name for message-only jobs", () => { - const normalized = normalizeCronJobCreate({ - schedule: { kind: "every", everyMs: 60_000 }, - payload: { message: "Nightly backup" }, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBe("agentTurn"); - expect(payload.message).toBe("Nightly backup"); - expect(normalized.sessionTarget).toBe("isolated"); - expect(normalized.wakeMode).toBe("now"); - expect(typeof normalized.name).toBe("string"); - }); - - it("normalizes flat legacy cron job rows", () => { - const normalized = normalizeCronJobCreate({ - id: "dbus-watchdog-001", - name: "dbus-watchdog", - kind: "cron", - cron: "*/10 * * * *", - tz: "UTC", - session: "isolated", - message: "watch dbus", - tools: [" exec "], - enabled: true, - created_at: "2026-04-17T20:09:00Z", - }) as unknown as Record; - - expect(normalized.schedule).toEqual({ - kind: "cron", - expr: "*/10 * * * *", - tz: "UTC", - }); - expect(normalized.sessionTarget).toBe("isolated"); - expect(normalized.payload).toEqual({ - kind: "agentTurn", - message: "watch dbus", - toolsAllow: ["exec"], - }); - expect(normalized.kind).toBeUndefined(); - expect(normalized.cron).toBeUndefined(); - expect(normalized.tz).toBeUndefined(); - expect(normalized.session).toBeUndefined(); - expect(normalized.tools).toBeUndefined(); - }); - - it("maps top-level model/thinking/timeout into payload for legacy add params", () => { - const normalized = normalizeCronJobCreate({ - name: "legacy root fields", - schedule: { kind: "every", everyMs: 60_000 }, - payload: { kind: "agentTurn", message: "hello" }, - model: " openrouter/deepseek/deepseek-r1 ", - thinking: " high ", - timeoutSeconds: 45, - toolsAllow: [" exec ", " read "], - allowUnsafeExternalContent: true, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.model).toBe("openrouter/deepseek/deepseek-r1"); - expect(payload.thinking).toBe("high"); - expect(payload.timeoutSeconds).toBe(45); - expect(payload.toolsAllow).toEqual(["exec", "read"]); - expect(payload.allowUnsafeExternalContent).toBe(true); - expect(validateCronAddParams(normalized)).toBe(true); - }); - - it("promotes implicit text payloads with agentTurn hints for create jobs", () => { - const normalized = normalizeCronJobCreate({ - name: "nested text model", - schedule: { kind: "every", everyMs: 60_000 }, - payload: { - text: " summarize issue status ", - model: " anthropic/claude-sonnet-4-6 ", - thinking: " high ", - }, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload).toEqual({ - kind: "agentTurn", - message: "summarize issue status", - model: "anthropic/claude-sonnet-4-6", - thinking: "high", - }); - expect(normalized.sessionTarget).toBe("isolated"); - expect(validateCronAddParams(normalized)).toBe(true); - }); - - it("promotes legacy top-level text with agentTurn hints for create jobs", () => { - const normalized = normalizeCronJobCreate({ - name: "legacy text model", - schedule: { kind: "every", everyMs: 60_000 }, - text: " summarize issue status ", - model: " openrouter/deepseek/deepseek-r1 ", - fallbacks: [], - toolsAllow: [" read "], - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload).toEqual({ - kind: "agentTurn", - message: "summarize issue status", - model: "openrouter/deepseek/deepseek-r1", - fallbacks: [], - toolsAllow: ["read"], - }); - expect(normalized.text).toBeUndefined(); - expect(normalized.model).toBeUndefined(); - expect(validateCronAddParams(normalized)).toBe(true); - }); - it("preserves timeoutSeconds=0 for no-timeout agentTurn payloads", () => { const normalized = normalizeCronJobCreate({ - name: "legacy no-timeout", + name: "no-timeout", schedule: { kind: "every", everyMs: 60_000 }, - payload: { kind: "agentTurn", message: "hello" }, - timeoutSeconds: 0, + payload: { kind: "agentTurn", message: "hello", timeoutSeconds: 0 }, }) as unknown as Record; const payload = normalized.payload as Record; @@ -680,6 +461,7 @@ describe("normalizeCronJobCreate", () => { const normalized = normalizeCronJobCreate({ name: "every-string", schedule: { + kind: "every", everyMs: "60000", anchorMs: "123.9", }, @@ -837,19 +619,10 @@ describe("normalizeCronJobCreate", () => { }); describe("normalizeCronJobPatch", () => { - it("infers agentTurn payloads from top-level model-only patch hints", () => { - const normalized = normalizeCronJobPatch({ - model: "openrouter/deepseek/deepseek-r1", - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBe("agentTurn"); - expect(payload.model).toBe("openrouter/deepseek/deepseek-r1"); - }); - - it("infers agentTurn kind for model-only payload patches", () => { + it("normalizes agentTurn model-only payload patches", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", model: "anthropic/claude-sonnet-4-6", }, }) as unknown as Record; @@ -859,76 +632,10 @@ describe("normalizeCronJobPatch", () => { expect(payload.model).toBe("anthropic/claude-sonnet-4-6"); }); - it("promotes implicit text payloads with agentTurn hints for patches", () => { - const normalized = normalizeCronJobPatch({ - payload: { - text: " summarize issue status ", - model: "anthropic/claude-sonnet-4-6", - }, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload).toEqual({ - kind: "agentTurn", - message: "summarize issue status", - model: "anthropic/claude-sonnet-4-6", - }); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); - }); - - it("promotes legacy top-level text with agentTurn hints for patches", () => { - const normalized = normalizeCronJobPatch({ - text: " summarize issue status ", - model: "openrouter/deepseek/deepseek-r1", - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload).toEqual({ - kind: "agentTurn", - message: "summarize issue status", - model: "openrouter/deepseek/deepseek-r1", - }); - expect(normalized.text).toBeUndefined(); - expect(normalized.model).toBeUndefined(); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); - }); - - it("infers agentTurn kind for lightContext-only payload patches", () => { - const normalized = normalizeCronJobPatch({ - payload: { - lightContext: true, - }, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBe("agentTurn"); - expect(payload.lightContext).toBe(true); - }); - - it("maps top-level fallback lists into agentTurn payload patches", () => { - const normalized = normalizeCronJobPatch({ - fallbacks: [" openrouter/gpt-4.1-mini ", "anthropic/claude-haiku-3-5"], - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBe("agentTurn"); - expect(payload.fallbacks).toEqual(["openrouter/gpt-4.1-mini", "anthropic/claude-haiku-3-5"]); - }); - - it("maps top-level toolsAllow lists into agentTurn payload patches", () => { - const normalized = normalizeCronJobPatch({ - toolsAllow: [" exec ", " read "], - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBe("agentTurn"); - expect(payload.toolsAllow).toEqual(["exec", "read"]); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); - }); - it("preserves empty fallback lists so patches can disable fallbacks", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", fallbacks: [], }, }) as unknown as Record; @@ -941,6 +648,7 @@ describe("normalizeCronJobPatch", () => { it("preserves empty toolsAllow lists so patches can disable all tools", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", toolsAllow: [], }, }) as unknown as Record; @@ -951,9 +659,10 @@ describe("normalizeCronJobPatch", () => { expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); - it("infers agentTurn kind for fallback-only payload patches", () => { + it("normalizes agentTurn fallback-only payload patches", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", fallbacks: [" openrouter/gpt-4.1-mini ", "anthropic/claude-haiku-3-5"], }, }) as unknown as Record; @@ -963,22 +672,24 @@ describe("normalizeCronJobPatch", () => { expect(payload.fallbacks).toEqual(["openrouter/gpt-4.1-mini", "anthropic/claude-haiku-3-5"]); }); - it("does not infer agentTurn kind for malformed fallback-only payload patches", () => { + it("drops malformed agentTurn fallback-only payload patches", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", fallbacks: [123], }, }) as unknown as Record; const payload = normalized.payload as Record; - expect(payload.kind).toBeUndefined(); + expect(payload.kind).toBe("agentTurn"); expect(payload.fallbacks).toBeUndefined(); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(false); + expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); - it("infers agentTurn kind for toolsAllow-only payload patches", () => { + it("normalizes agentTurn toolsAllow-only payload patches", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", toolsAllow: [" exec ", " read "], }, }) as unknown as Record; @@ -989,22 +700,24 @@ describe("normalizeCronJobPatch", () => { expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); - it("does not infer agentTurn kind for malformed toolsAllow-only payload patches", () => { + it("drops malformed agentTurn toolsAllow-only payload patches", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", toolsAllow: [123], }, }) as unknown as Record; const payload = normalized.payload as Record; - expect(payload.kind).toBeUndefined(); + expect(payload.kind).toBe("agentTurn"); expect(payload.toolsAllow).toBeUndefined(); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(false); + expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); it("preserves null toolsAllow so patches can clear the allow-list", () => { const normalized = normalizeCronJobPatch({ payload: { + kind: "agentTurn", toolsAllow: null, }, }) as unknown as Record; @@ -1014,18 +727,6 @@ describe("normalizeCronJobPatch", () => { expect(payload.toolsAllow).toBeNull(); expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); - it("does not infer agentTurn kind for delivery-only legacy hints", () => { - const normalized = normalizeCronJobPatch({ - payload: { - channel: "telegram", - to: "+15550001111", - }, - }) as unknown as Record; - - const payload = normalized.payload as Record; - expect(payload.kind).toBeUndefined(); - expectPayloadDeliveryHintsCleared(payload); - }); it("preserves null sessionKey patches and trims string values", () => { const trimmed = normalizeCronJobPatch({ @@ -1067,18 +768,6 @@ describe("normalizeCronJobPatch", () => { expect(schedule.staggerMs).toBe(30_000); }); - it("strips legacy patch threadId hints from live input", () => { - const normalized = normalizeCronJobPatch({ - payload: { - kind: "agentTurn", - threadId: 77, - }, - }) as unknown as Record; - - expect(normalized.delivery).toBeUndefined(); - expect((normalized.payload as Record).threadId).toBeUndefined(); - }); - it("prunes agentTurn-only payload fields from systemEvent patch payloads", () => { const normalized = normalizeCronJobPatch({ payload: { @@ -1120,16 +809,6 @@ describe("normalizeCronJobPatch", () => { expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(true); }); - it("keeps out-of-range numeric schedule.atMs invalid instead of throwing for patches", () => { - const normalized = normalizeCronJobPatch({ - schedule: { kind: "at", atMs: 8_640_000_000_000_001 }, - }) as unknown as Record; - - const schedule = normalized.schedule as Record; - expect(schedule).toEqual({ kind: "at" }); - expect(validateCronUpdateParams({ id: "job-1", patch: normalized })).toBe(false); - }); - it("prunes staggerMs from every schedules for patches", () => { const normalized = normalizeCronJobPatch({ schedule: { diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index bd43e9dc393c..a2a4e9569233 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -15,7 +15,7 @@ import { } from "./delivery-field-schemas.js"; import { parseAbsoluteTimeMs } from "./parse.js"; import { coerceFiniteScheduleNumber } from "./schedule-number.js"; -import { inferLegacyName } from "./service/normalize.js"; +import { inferCronJobName } from "./service/normalize.js"; import { assertSafeCronSessionTargetId, resolveCronCurrentSessionTarget, @@ -35,22 +35,6 @@ const DEFAULT_OPTIONS: NormalizeOptions = { applyDefaults: false, }; -function hasTrimmedStringValue(value: unknown) { - return parseOptionalField(TrimmedNonEmptyStringFieldSchema, value) !== undefined; -} - -function hasAgentTurnPayloadHint(payload: UnknownRecord) { - return ( - hasTrimmedStringValue(payload.model) || - normalizeTrimmedStringArray(payload.fallbacks) !== undefined || - normalizeTrimmedStringArray(payload.toolsAllow, { allowNull: true }) !== undefined || - hasTrimmedStringValue(payload.thinking) || - typeof payload.timeoutSeconds === "number" || - typeof payload.lightContext === "boolean" || - typeof payload.allowUnsafeExternalContent === "boolean" - ); -} - function normalizeTrimmedStringArray( value: unknown, options?: { allowNull?: boolean }, @@ -73,34 +57,13 @@ function coerceSchedule(schedule: UnknownRecord) { const rawKind = normalizeLowercaseStringOrEmpty(schedule.kind); const kind = rawKind === "at" || rawKind === "every" || rawKind === "cron" ? rawKind : undefined; const exprRaw = normalizeOptionalString(schedule.expr) ?? ""; - const legacyCronRaw = normalizeOptionalString(schedule.cron) ?? ""; - const normalizedExpr = exprRaw || legacyCronRaw; const everyMs = coerceFiniteScheduleNumber(schedule.everyMs); const anchorMs = coerceFiniteScheduleNumber(schedule.anchorMs); - const atMsRaw = schedule.atMs; - const atRaw = schedule.at; - const atString = normalizeOptionalString(atRaw) ?? ""; - const parsedAtMs = - typeof atMsRaw === "number" - ? atMsRaw - : typeof atMsRaw === "string" - ? parseAbsoluteTimeMs(atMsRaw) - : atString - ? parseAbsoluteTimeMs(atString) - : null; + const atString = normalizeOptionalString(schedule.at) ?? ""; + const parsedAtMs = atString ? parseAbsoluteTimeMs(atString) : null; if (kind) { next.kind = kind; - } else if ( - typeof schedule.atMs === "number" || - typeof schedule.at === "string" || - typeof schedule.atMs === "string" - ) { - next.kind = "at"; - } else if (everyMs !== undefined) { - next.kind = "every"; - } else if (normalizedExpr) { - next.kind = "cron"; } const parsedAtIso = parsedAtMs !== null ? timestampMsToIsoString(parsedAtMs) : undefined; @@ -109,12 +72,9 @@ function coerceSchedule(schedule: UnknownRecord) { } else if (parsedAtIso !== undefined) { next.at = parsedAtIso; } - if ("atMs" in next) { - delete next.atMs; - } - if (normalizedExpr) { - next.expr = normalizedExpr; + if (exprRaw) { + next.expr = exprRaw; } else if ("expr" in next) { delete next.expr; } @@ -125,10 +85,6 @@ function coerceSchedule(schedule: UnknownRecord) { if (anchorMs !== undefined && anchorMs >= 0) { next.anchorMs = Math.floor(anchorMs); } - if ("cron" in next) { - delete next.cron; - } - const staggerMs = normalizeCronStaggerMs(schedule.staggerMs); if (staggerMs !== undefined) { next.staggerMs = staggerMs; @@ -156,21 +112,6 @@ function coerceSchedule(schedule: UnknownRecord) { return next; } -function inferTopLevelSchedule(next: UnknownRecord): UnknownRecord | null { - const kindRaw = normalizeLowercaseStringOrEmpty(next.kind); - const kind = kindRaw === "at" || kindRaw === "every" || kindRaw === "cron" ? kindRaw : undefined; - const schedule: UnknownRecord = {}; - if (kind) { - schedule.kind = kind; - } - for (const field of ["at", "atMs", "everyMs", "anchorMs", "expr", "cron", "tz", "staggerMs"]) { - if (field in next) { - schedule[field] = next[field]; - } - } - return Object.keys(schedule).length > 0 ? coerceSchedule(schedule) : null; -} - function coercePayload(payload: UnknownRecord) { const next: UnknownRecord = { ...payload }; const kindRaw = normalizeLowercaseStringOrEmpty(next.kind); @@ -181,22 +122,6 @@ function coercePayload(payload: UnknownRecord) { } else if (kindRaw) { next.kind = kindRaw; } - if (!next.kind) { - const message = normalizeOptionalString(next.message); - const text = normalizeOptionalString(next.text); - const hasAgentTurnHint = hasAgentTurnPayloadHint(next); - if (message) { - next.kind = "agentTurn"; - } else if (text && hasAgentTurnHint) { - next.kind = "agentTurn"; - next.message = text; - } else if (text) { - next.kind = "systemEvent"; - } else if (hasAgentTurnHint) { - // Accept partial agentTurn payload patches that only tweak agent-turn-only fields. - next.kind = "agentTurn"; - } - } if (typeof next.message === "string") { const trimmed = normalizeOptionalString(next.message) ?? ""; if (trimmed) { @@ -267,24 +192,6 @@ function coercePayload(payload: UnknownRecord) { } else if (next.kind === "agentTurn") { delete next.text; } - if ("deliver" in next) { - delete next.deliver; - } - if ("channel" in next) { - delete next.channel; - } - if ("to" in next) { - delete next.to; - } - if ("threadId" in next) { - delete next.threadId; - } - if ("bestEffortDeliver" in next) { - delete next.bestEffortDeliver; - } - if ("provider" in next) { - delete next.provider; - } return next; } @@ -345,37 +252,6 @@ function coerceCompletionDestination(value: UnknownRecord) { } satisfies UnknownRecord; } -function inferTopLevelPayload(next: UnknownRecord) { - const message = normalizeOptionalString(next.message) ?? ""; - if (message) { - return { kind: "agentTurn", message } satisfies UnknownRecord; - } - - const text = normalizeOptionalString(next.text) ?? ""; - if (text) { - if (hasAgentTurnPayloadHint(next)) { - return { kind: "agentTurn", message: text } satisfies UnknownRecord; - } - return { kind: "systemEvent", text } satisfies UnknownRecord; - } - - if (hasAgentTurnPayloadHint(next)) { - return { kind: "agentTurn" } satisfies UnknownRecord; - } - - return null; -} - -function unwrapJob(raw: UnknownRecord) { - if (isRecord(raw.data)) { - return raw.data; - } - if (isRecord(raw.job)) { - return raw.job; - } - return raw; -} - function normalizeSessionTarget(raw: unknown) { if (typeof raw !== "string") { return undefined; @@ -403,80 +279,6 @@ function normalizeWakeMode(raw: unknown) { return undefined; } -function copyTopLevelAgentTurnFields(next: UnknownRecord, payload: UnknownRecord) { - const copyString = (field: "model" | "thinking") => { - if (normalizeOptionalString(payload[field])) { - return; - } - const value = next[field]; - const normalized = normalizeOptionalString(value); - if (normalized) { - payload[field] = normalized; - } - }; - copyString("model"); - copyString("thinking"); - - if (typeof payload.timeoutSeconds !== "number" && "timeoutSeconds" in next) { - const timeoutSeconds = parseOptionalField(TimeoutSecondsFieldSchema, next.timeoutSeconds); - if (timeoutSeconds !== undefined) { - payload.timeoutSeconds = timeoutSeconds; - } - } - if (!Array.isArray(payload.fallbacks) && Array.isArray(next.fallbacks)) { - const fallbacks = normalizeTrimmedStringArray(next.fallbacks); - if (fallbacks !== undefined) { - payload.fallbacks = fallbacks; - } - } - if (!("toolsAllow" in payload) || payload.toolsAllow === undefined) { - const toolsAllow = - normalizeTrimmedStringArray(next.toolsAllow, { allowNull: true }) ?? - normalizeTrimmedStringArray(next.tools); - if (toolsAllow !== undefined) { - payload.toolsAllow = toolsAllow; - } - } - if (typeof payload.lightContext !== "boolean" && typeof next.lightContext === "boolean") { - payload.lightContext = next.lightContext; - } - if ( - typeof payload.allowUnsafeExternalContent !== "boolean" && - typeof next.allowUnsafeExternalContent === "boolean" - ) { - payload.allowUnsafeExternalContent = next.allowUnsafeExternalContent; - } -} - -function stripLegacyTopLevelFields(next: UnknownRecord) { - delete next.model; - delete next.thinking; - delete next.timeoutSeconds; - delete next.fallbacks; - delete next.lightContext; - delete next.toolsAllow; - delete next.allowUnsafeExternalContent; - delete next.message; - delete next.text; - delete next.kind; - delete next.cron; - delete next.tz; - delete next.at; - delete next.atMs; - delete next.everyMs; - delete next.anchorMs; - delete next.staggerMs; - delete next.session; - delete next.tools; - delete next.deliver; - delete next.channel; - delete next.to; - delete next.toolsAllow; - delete next.threadId; - delete next.bestEffortDeliver; - delete next.provider; -} - export function normalizeCronJobInput( raw: unknown, options: NormalizeOptions = DEFAULT_OPTIONS, @@ -484,7 +286,7 @@ export function normalizeCronJobInput( if (!isRecord(raw)) { return null; } - const base = unwrapJob(raw); + const base = raw; const next: UnknownRecord = { ...base }; if ("agentId" in base) { @@ -537,11 +339,6 @@ export function normalizeCronJobInput( } else { delete next.sessionTarget; } - } else if ("session" in base) { - const normalized = normalizeSessionTarget(base.session); - if (normalized) { - next.sessionTarget = normalized; - } } if ("wakeMode" in base) { @@ -555,18 +352,6 @@ export function normalizeCronJobInput( if (isRecord(base.schedule)) { next.schedule = coerceSchedule(base.schedule); - } else if (!isRecord(next.schedule)) { - const inferredSchedule = inferTopLevelSchedule(next); - if (inferredSchedule) { - next.schedule = inferredSchedule; - } - } - - if (!("payload" in next) || !isRecord(next.payload)) { - const inferredPayload = inferTopLevelPayload(next); - if (inferredPayload) { - next.payload = inferredPayload; - } } if (isRecord(base.payload)) { @@ -577,16 +362,6 @@ export function normalizeCronJobInput( next.delivery = coerceDelivery(base.delivery); } - if ("isolation" in next) { - delete next.isolation; - } - - const payload = isRecord(next.payload) ? next.payload : null; - if (payload && payload.kind === "agentTurn") { - copyTopLevelAgentTurnFields(next, payload); - } - stripLegacyTopLevelFields(next); - if (options.applyDefaults) { if (!next.wakeMode) { next.wakeMode = "now"; @@ -599,7 +374,7 @@ export function normalizeCronJobInput( isRecord(next.schedule) && isRecord(next.payload) ) { - next.name = inferLegacyName({ + next.name = inferCronJobName({ schedule: next.schedule as { kind?: unknown; everyMs?: unknown; expr?: unknown }, payload: next.payload as { kind?: unknown; text?: unknown; message?: unknown }, }); diff --git a/src/cron/schedule-identity.ts b/src/cron/schedule-identity.ts index 8fcd8ae39e0c..bfa5ad52d8d4 100644 --- a/src/cron/schedule-identity.ts +++ b/src/cron/schedule-identity.ts @@ -27,9 +27,8 @@ function schedulePayloadFromRecord( | { kind: "cron"; expr: string; tz?: string; staggerMs?: number } | undefined { const rawKind = readString(schedule, "kind")?.toLowerCase(); - const expr = readString(schedule, "expr") ?? readString(schedule, "cron"); + const expr = readString(schedule, "expr"); const at = readString(schedule, "at"); - const atMs = readNumber(schedule, "atMs"); const everyMs = readNumber(schedule, "everyMs"); const anchorMs = readNumber(schedule, "anchorMs"); const tz = readString(schedule, "tz"); @@ -37,7 +36,7 @@ function schedulePayloadFromRecord( const kind = rawKind === "at" || rawKind === "every" || rawKind === "cron" ? rawKind - : at || atMs !== undefined + : at ? "at" : everyMs !== undefined ? "every" @@ -46,11 +45,7 @@ function schedulePayloadFromRecord( : undefined; if (kind === "at") { - return at - ? { kind: "at", at } - : atMs !== undefined - ? { kind: "at", at: String(atMs) } - : undefined; + return at ? { kind: "at", at } : undefined; } if (kind === "every" && everyMs !== undefined) { return { kind: "every", everyMs, anchorMs }; @@ -67,7 +62,7 @@ function resolveSchedulePayload( if (job.schedule && typeof job.schedule === "object" && !Array.isArray(job.schedule)) { return schedulePayloadFromRecord(job.schedule as Record); } - return schedulePayloadFromRecord(job); + return undefined; } export function tryCronScheduleIdentity(job: CronScheduleIdentityInput): string | undefined { diff --git a/src/cron/schedule.test.ts b/src/cron/schedule.test.ts index 6823c7797638..1d1e7a84ce89 100644 --- a/src/cron/schedule.test.ts +++ b/src/cron/schedule.test.ts @@ -58,19 +58,6 @@ describe("cron schedule", () => { ).toThrow("invalid cron schedule: expr is required"); }); - it("supports legacy cron field when expr is missing", () => { - const nowMs = Date.parse("2025-12-13T00:00:00.000Z"); - const next = computeNextRunAtMs( - { - kind: "cron", - cron: "0 9 * * 3", - tz: "America/Los_Angeles", - } as unknown as { kind: "cron"; expr: string; tz?: string }, - nowMs, - ); - expect(next).toBe(Date.parse("2025-12-17T17:00:00.000Z")); - }); - it("computes next run for every schedule", () => { const anchor = Date.parse("2025-12-13T00:00:00.000Z"); const now = anchor + 10_000; @@ -86,7 +73,7 @@ describe("cron schedule", () => { expect(next).toBe(now + 30_000); }); - it("handles string-typed everyMs and anchorMs from legacy persisted data", () => { + it("handles string-typed everyMs and anchorMs", () => { const anchor = Date.parse("2025-12-13T00:00:00.000Z"); const now = anchor + 10_000; const next = computeNextRunAtMs( diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts index b6811e30f423..651f2d382cb8 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -37,16 +37,11 @@ function resolveCachedCron(expr: string, timezone: string): Cron { return next; } -function resolveCronFromSchedule(schedule: { - tz?: string; - expr?: unknown; - cron?: unknown; -}): Cron | undefined { - const exprSource = typeof schedule.expr === "string" ? schedule.expr : schedule.cron; - if (typeof exprSource !== "string") { +function resolveCronFromSchedule(schedule: { tz?: string; expr?: unknown }): Cron | undefined { + if (typeof schedule.expr !== "string") { throw new Error("invalid cron schedule: expr is required"); } - const expr = exprSource.trim(); + const expr = schedule.expr.trim(); if (!expr) { return undefined; } @@ -55,18 +50,7 @@ function resolveCronFromSchedule(schedule: { export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined { if (schedule.kind === "at") { - // Handle both canonical `at` (string) and legacy `atMs` (number) fields. - // The store migration should convert atMs→at, but be defensive in case - // the migration hasn't run yet or was bypassed. - const sched = schedule as { at?: string; atMs?: number | string }; - const atMs = - typeof sched.atMs === "number" && Number.isFinite(sched.atMs) && sched.atMs > 0 - ? sched.atMs - : typeof sched.atMs === "string" - ? parseAbsoluteTimeMs(sched.atMs) - : typeof sched.at === "string" - ? parseAbsoluteTimeMs(sched.at) - : null; + const atMs = parseAbsoluteTimeMs(schedule.at); if (atMs === null) { return undefined; } @@ -89,7 +73,7 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe return anchor + steps * everyMs; } - const cron = resolveCronFromSchedule(schedule as { tz?: string; expr?: unknown; cron?: unknown }); + const cron = resolveCronFromSchedule(schedule); if (!cron) { return undefined; } @@ -134,7 +118,7 @@ export function computePreviousRunAtMs(schedule: CronSchedule, nowMs: number): n if (schedule.kind !== "cron") { return undefined; } - const cron = resolveCronFromSchedule(schedule as { tz?: string; expr?: unknown; cron?: unknown }); + const cron = resolveCronFromSchedule(schedule); if (!cron) { return undefined; } diff --git a/src/cron/service.every-jobs-fire.test.ts b/src/cron/service.every-jobs-fire.test.ts index c7f3c0d452ce..61165a73bfc4 100644 --- a/src/cron/service.every-jobs-fire.test.ts +++ b/src/cron/service.every-jobs-fire.test.ts @@ -137,7 +137,7 @@ describe("CronService interval/cron jobs fire on time", () => { await store.cleanup(); }); - it("keeps legacy every jobs due while minute cron jobs recompute schedules", async () => { + it("keeps every jobs due while minute cron jobs recompute schedules", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeat = vi.fn(); @@ -147,8 +147,8 @@ describe("CronService interval/cron jobs fire on time", () => { storePath: store.storePath, jobs: [ { - id: "legacy-every", - name: "legacy every", + id: "loaded-every", + name: "loaded every", enabled: true, createdAtMs: nowMs, updatedAtMs: nowMs, @@ -183,7 +183,7 @@ describe("CronService interval/cron jobs fire on time", () => { }); await cron.start(); - // Perf: a few recomputation cycles are enough to catch legacy "every" drift. + // Perf: a few recomputation cycles are enough to catch "every" drift. for (let minute = 1; minute <= 3; minute++) { vi.setSystemTime(new Date(nowMs + minute * 60_000)); const minuteRun = await cron.run("minute-cron", "force"); @@ -192,7 +192,7 @@ describe("CronService interval/cron jobs fire on time", () => { // "every" cadence is 2m; verify it stays due at the 6-minute boundary. vi.setSystemTime(new Date(nowMs + 6 * 60_000)); - const sfRun = await cron.run("legacy-every", "due"); + const sfRun = await cron.run("loaded-every", "due"); expect(sfRun).toEqual({ ok: true, ran: true }); const sfRuns = countMainSystemEvents(enqueueSystemEvent, "sf-tick"); @@ -201,7 +201,7 @@ describe("CronService interval/cron jobs fire on time", () => { expect(sfRuns).toBeGreaterThan(0); const jobs = await cron.list({ includeDisabled: true }); - const sfJob = jobs.find((job) => job.id === "legacy-every"); + const sfJob = jobs.find((job) => job.id === "loaded-every"); expect(sfJob?.state.lastStatus).toBe("ok"); expect(sfJob?.schedule.kind).toBe("every"); expect(sfJob?.state.nextRunAtMs).toBe(nowMs + 8 * 60_000); diff --git a/src/cron/service.issue-19676-at-reschedule.test.ts b/src/cron/service.issue-19676-at-reschedule.test.ts index 32139dd7ddf9..6007b05829d1 100644 --- a/src/cron/service.issue-19676-at-reschedule.test.ts +++ b/src/cron/service.issue-19676-at-reschedule.test.ts @@ -42,17 +42,6 @@ describe("Cron issue #19676 at-job reschedule", () => { expect(computeJobNextRunAtMs(job, nowMs)).toBe(RESCHEDULED_AT_MS); }); - it("returns the new atMs when rescheduled via legacy numeric atMs field", () => { - const job = createAtJob({ - state: { lastStatus: "ok", lastRunAtMs: LAST_RUN_AT_MS }, - }); - // Simulate legacy numeric atMs field on the schedule object. - const schedule = job.schedule as { kind: "at"; atMs?: number }; - schedule.atMs = RESCHEDULED_AT_MS; - const nowMs = LAST_RUN_AT_MS + 1_000; - expect(computeJobNextRunAtMs(job, nowMs)).toBe(RESCHEDULED_AT_MS); - }); - it("returns undefined when rescheduled to a time before the last run", () => { const beforeLastRun = LAST_RUN_AT_MS - 60_000; const job = createAtJob({ diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 30b614f3b68a..689430255c1b 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -836,29 +836,15 @@ describe("createJob delivery defaults", () => { }); expect(job.delivery).toBeUndefined(); }); - - it("uses legacy systemEvent message text without throwing", () => { - const state = createMockState(now, { defaultAgentId: "main" }); - const job = createJob(state, { - name: "legacy system event", - enabled: true, - schedule: { kind: "every", everyMs: 60_000 }, - sessionTarget: "main", - wakeMode: "now", - payload: { kind: "systemEvent", message: "legacy text" } as never, - }); - - expect(resolveJobPayloadTextForMain(job)).toBe("legacy text"); - }); }); describe("recomputeNextRuns", () => { - it("backfills missing every anchorMs for legacy loaded jobs", () => { + it("backfills missing every anchorMs for loaded jobs", () => { const now = Date.parse("2026-03-01T12:00:00.000Z"); const createdAtMs = now - 120_000; const job: CronJob = { - id: "legacy-every", - name: "legacy-every", + id: "loaded-every", + name: "loaded-every", enabled: true, createdAtMs, updatedAtMs: createdAtMs, diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 7619487a201a..6b19807b45dc 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -400,18 +400,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return isFiniteTimestamp(next) ? next : undefined; } if (job.schedule.kind === "at") { - // Handle both canonical `at` (string) and legacy `atMs` (number) fields. - // The store migration should convert atMs→at, but be defensive in case - // the migration hasn't run yet or was bypassed. - const schedule = job.schedule as { at?: string; atMs?: number | string }; - const atMs = - typeof schedule.atMs === "number" && Number.isFinite(schedule.atMs) && schedule.atMs > 0 - ? schedule.atMs - : typeof schedule.atMs === "string" - ? parseAbsoluteTimeMs(schedule.atMs) - : typeof schedule.at === "string" - ? parseAbsoluteTimeMs(schedule.at) - : null; + const atMs = parseAbsoluteTimeMs(job.schedule.at); // One-shot jobs stay due until they successfully finish, but if the // schedule was updated to a time after the last run, re-arm the job. if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) { diff --git a/src/cron/service/normalize.ts b/src/cron/service/normalize.ts index 1dfb1da20a4e..9a0c1855cbac 100644 --- a/src/cron/service/normalize.ts +++ b/src/cron/service/normalize.ts @@ -29,7 +29,7 @@ export function normalizeOptionalAgentId(raw: unknown) { return normalizeAgentId(trimmed); } -export function inferLegacyName(job: { +export function inferCronJobName(job: { schedule?: { kind?: unknown; everyMs?: unknown; expr?: unknown }; payload?: { kind?: unknown; text?: unknown; message?: unknown }; }) { @@ -63,12 +63,7 @@ export function inferLegacyName(job: { export function normalizePayloadToSystemText(payload: CronPayload) { if (payload.kind === "systemEvent") { - const text = (payload as { text?: unknown }).text; - if (typeof text === "string") { - return text.trim(); - } - const legacyMessage = (payload as { message?: unknown }).message; - return typeof legacyMessage === "string" ? legacyMessage.trim() : ""; + return typeof payload.text === "string" ? payload.text.trim() : ""; } return typeof payload.message === "string" ? payload.message.trim() : ""; } diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 41d493608898..3a23a73854d6 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -6,7 +6,7 @@ import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { formatTaskStatusDetail } from "../../tasks/task-status.js"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js"; -import { loadCronStore } from "../store.js"; +import { loadCronStore, loadCronStoreWithConfigJobs } from "../store.js"; import type { CronJob } from "../types.js"; import { add, run, start, stop, update } from "./ops.js"; import { createCronServiceState } from "./state.js"; @@ -113,23 +113,34 @@ function insertCronJobRow(storePath: string, job: CronJob) { db.prepare( `INSERT INTO cron_jobs ( store_key, job_id, name, enabled, created_at_ms, schedule_kind, - session_target, wake_mode, payload_kind, payload_message, job_json, state_json, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ).run( - path.resolve(storePath), - job.id, - job.name, - job.enabled ? 1 : 0, - job.createdAtMs, - job.schedule.kind, - job.sessionTarget, - job.wakeMode, - job.payload.kind, - "message" in job.payload ? job.payload.message : null, - JSON.stringify(job), - JSON.stringify(job.state), - job.updatedAtMs, - ); + at, every_ms, anchor_ms, schedule_expr, session_target, wake_mode, payload_kind, + payload_message, delivery_mode, delivery_to, job_json, state_json, updated_at + ) VALUES ( + $storeKey, $jobId, $name, $enabled, $createdAtMs, $scheduleKind, + $at, $everyMs, $anchorMs, $scheduleExpr, $sessionTarget, $wakeMode, $payloadKind, + $payloadMessage, $deliveryMode, $deliveryTo, $jobJson, $stateJson, $updatedAt + )`, + ).run({ + $storeKey: path.resolve(storePath), + $jobId: job.id, + $name: job.name, + $enabled: job.enabled ? 1 : 0, + $createdAtMs: job.createdAtMs, + $scheduleKind: job.schedule.kind, + $at: job.schedule.kind === "at" ? job.schedule.at : null, + $everyMs: job.schedule.kind === "every" ? job.schedule.everyMs : null, + $anchorMs: job.schedule.kind === "every" ? (job.schedule.anchorMs ?? null) : null, + $scheduleExpr: job.schedule.kind === "cron" ? job.schedule.expr : null, + $sessionTarget: job.sessionTarget, + $wakeMode: job.wakeMode, + $payloadKind: job.payload.kind, + $payloadMessage: "message" in job.payload ? job.payload.message : null, + $deliveryMode: job.delivery ? (job.delivery.mode ?? "announce") : null, + $deliveryTo: job.delivery?.to ?? null, + $jobJson: JSON.stringify(job), + $stateJson: JSON.stringify(job.state), + $updatedAt: job.updatedAtMs, + }); }); } @@ -249,7 +260,7 @@ describe("cron service ops seam coverage", () => { enabled: true, createdAtMs: now - 60_000, updatedAtMs: now - 60_000, - schedule: { kind: "every", everyMs: 3_600_000 }, + schedule: { kind: "every", everyMs: 3_600_000, anchorMs: now }, sessionTarget: "isolated", wakeMode: "next-heartbeat", payload: { kind: "agentTurn", message: "do work" }, @@ -274,13 +285,14 @@ describe("cron service ops seam coverage", () => { clearTimeout(state.timer); } - const loaded = await loadCronStore(storePath); - const persisted = loaded.jobs[0] as CronJob & { notify?: unknown }; - expect(persisted.notify).toBe(true); + const loaded = await loadCronStoreWithConfigJobs(storePath); + const persisted = loaded.store.jobs[0] as CronJob & { notify?: unknown }; + expect(persisted.notify).toBeUndefined(); expect(persisted.delivery).toEqual({ mode: "announce", to: "telegram:chat-1", }); + expect(loaded.configJobs[0]?.notify).toBe(true); expect(logger.info).not.toHaveBeenCalledWith( { storePath }, "cron: migrated legacy notify fallback jobs before scheduler startup", diff --git a/src/cron/service/store.test.ts b/src/cron/service/store.test.ts index b16938f012e4..68a31f258cb7 100644 --- a/src/cron/service/store.test.ts +++ b/src/cron/service/store.test.ts @@ -274,28 +274,6 @@ describe("cron service store seam coverage", () => { expect(findJobOrThrow(state, "reload-cron-expr-job").state.nextRunAtMs).toBe(dueNextRunAtMs); }); - it("keeps a force-reloaded legacy string schedule for runtime repair handling", async () => { - const { storePath } = await makeStorePath(); - const staleNextRunAtMs = STORE_TEST_NOW + 3_600_000; - - await writeSingleJobStore(storePath, { - ...createReloadCronJob({ - updatedAtMs: STORE_TEST_NOW, - state: { nextRunAtMs: staleNextRunAtMs }, - }), - schedule: "0 17 * * *", - }); - - const state = createStoreTestState(storePath); - await expect(ensureLoaded(state, { forceReload: true, skipRecompute: true })).resolves.toBe( - undefined, - ); - - const job = findJobOrThrow(state, "reload-cron-expr-job"); - expect(job.schedule).toBe("0 17 * * *"); - expect(job.state.nextRunAtMs).toBe(staleNextRunAtMs); - }); - it("preserves nextRunAtMs after force reload when scheduling inputs are unchanged", async () => { const { storePath } = await makeStorePath(); const originalNextRunAtMs = STORE_TEST_NOW + 3_600_000; diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index 2d884d0d7835..ce27defc1ed0 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -515,95 +515,8 @@ describe("cron store", () => { }); }); - it("falls back to job_json payloads for early SQLite cron rows", async () => { + it("round-trips completion destinations through SQLite delivery columns", async () => { const { storePath } = await makeStorePath(); - const storeKey = path.resolve(storePath); - const job = makeStore("early-sqlite-job", true).jobs[0]; - job.sessionTarget = "isolated"; - job.payload = { - kind: "agentTurn", - message: "Keep this prompt", - externalContentSource: "gmail", - }; - - runOpenClawStateWriteTransaction(({ db }) => { - db.prepare( - `INSERT INTO cron_jobs ( - store_key, job_id, name, enabled, created_at_ms, schedule_kind, - session_target, wake_mode, payload_kind, payload_message, job_json, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ).run( - storeKey, - job.id, - job.name, - 1, - job.createdAtMs, - "every", - "isolated", - "next-heartbeat", - "agentTurn", - null, - JSON.stringify(job), - job.updatedAtMs, - ); - }); - - expect((await loadCronStore(storePath)).jobs[0]?.payload).toMatchObject({ - kind: "agentTurn", - message: "Keep this prompt", - externalContentSource: "gmail", - }); - }); - - it("falls back to modeless job_json delivery for early SQLite cron rows", async () => { - const { storePath } = await makeStorePath(); - const storeKey = path.resolve(storePath); - const job = makeStore("early-sqlite-delivery-job", true).jobs[0]; - job.delivery = { - to: "telegram:chat-1", - threadId: "topic-9", - completionDestination: { - mode: "webhook", - to: "https://example.invalid/cron", - }, - } as CronStoreFile["jobs"][number]["delivery"]; - - runOpenClawStateWriteTransaction(({ db }) => { - db.prepare( - `INSERT INTO cron_jobs ( - store_key, job_id, name, enabled, created_at_ms, schedule_kind, - session_target, wake_mode, payload_kind, payload_message, job_json, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ).run( - storeKey, - job.id, - job.name, - 1, - job.createdAtMs, - "every", - job.sessionTarget, - job.wakeMode, - "systemEvent", - null, - JSON.stringify(job), - job.updatedAtMs, - ); - }); - - expect((await loadCronStore(storePath)).jobs[0]?.delivery).toEqual({ - mode: "announce", - to: "telegram:chat-1", - threadId: "topic-9", - completionDestination: { - mode: "webhook", - to: "https://example.invalid/cron", - }, - }); - }); - - it("drops fallback completion destinations when SQLite stores non-announce delivery mode", async () => { - const { storePath } = await makeStorePath(); - const storeKey = path.resolve(storePath); const job = makeStore("sqlite-webhook-delivery-job", true).jobs[0]; job.delivery = { mode: "announce", @@ -618,34 +531,19 @@ describe("cron store", () => { }, }; - runOpenClawStateWriteTransaction(({ db }) => { - db.prepare( - `INSERT INTO cron_jobs ( - store_key, job_id, name, enabled, created_at_ms, schedule_kind, - session_target, wake_mode, payload_kind, payload_message, - delivery_mode, delivery_to, job_json, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - ).run( - storeKey, - job.id, - job.name, - 1, - job.createdAtMs, - "every", - job.sessionTarget, - job.wakeMode, - "systemEvent", - null, - "webhook", - "https://example.invalid/direct-webhook", - JSON.stringify(job), - job.updatedAtMs, - ); - }); + await saveCronStore(storePath, { version: 1, jobs: [job] }); expect((await loadCronStore(storePath)).jobs[0]?.delivery).toEqual({ - mode: "webhook", - to: "https://example.invalid/direct-webhook", + mode: "announce", + channel: "telegram", + to: "telegram:chat-1", + threadId: "topic-9", + accountId: "bot-1", + bestEffort: true, + completionDestination: { + mode: "webhook", + to: "https://example.invalid/legacy-completion", + }, }); }); diff --git a/src/cron/store/delivery-codec.ts b/src/cron/store/delivery-codec.ts index 868fc2359aed..d3d2bbb2f73e 100644 --- a/src/cron/store/delivery-codec.ts +++ b/src/cron/store/delivery-codec.ts @@ -1,12 +1,5 @@ -import { isRecord } from "@openclaw/normalization-core/record-coerce"; -import type { CronCompletionDestination, CronDelivery, CronMessageChannel } from "../types.js"; -import { - booleanToInteger, - integerToBoolean, - optionalBooleanFromRecord, - optionalStringFromRecord, - optionalThreadIdFromRecord, -} from "./scalar-codec.js"; +import type { CronDelivery } from "../types.js"; +import { booleanToInteger, integerToBoolean } from "./scalar-codec.js"; import type { CronJobInsert, CronJobRow } from "./schema.js"; export function bindDeliveryColumns( @@ -16,6 +9,8 @@ export function bindDeliveryColumns( | "delivery_account_id" | "delivery_best_effort" | "delivery_channel" + | "delivery_completion_mode" + | "delivery_completion_to" | "delivery_mode" | "delivery_thread_id" | "delivery_to" @@ -34,6 +29,8 @@ export function bindDeliveryColumns( : String(delivery.threadId), delivery_account_id: delivery?.accountId ?? null, delivery_best_effort: booleanToInteger(delivery?.bestEffort), + delivery_completion_mode: delivery?.completionDestination?.mode ?? null, + delivery_completion_to: delivery?.completionDestination?.to ?? null, failure_delivery_mode: delivery?.failureDestination?.mode ?? null, failure_delivery_channel: delivery?.failureDestination?.channel ?? null, failure_delivery_to: delivery?.failureDestination?.to ?? null, @@ -45,113 +42,28 @@ function cronDeliveryModeFromValue(value: unknown): CronDelivery["mode"] | undef return value === "none" || value === "announce" || value === "webhook" ? value : undefined; } -function cronFailureDeliveryModeFromValue(value: unknown): "announce" | "webhook" | undefined { - return value === "announce" || value === "webhook" ? value : undefined; -} - -function completionDestinationFromFallback(params: { - fallback: unknown; - mode: CronDelivery["mode"] | undefined; -}): CronCompletionDestination | undefined { - if (params.mode !== "announce") { - return undefined; - } - const { fallback } = params; - if (!isRecord(fallback)) { - return undefined; - } - const raw = fallback.completionDestination; - if (!isRecord(raw) || raw.mode !== "webhook") { - return undefined; - } - const to = optionalStringFromRecord(raw, "to"); - return { - mode: "webhook", - ...(to ? { to } : {}), - }; -} - -function failureDestinationFromFallback( - fallback: unknown, -): CronDelivery["failureDestination"] | undefined { - if (!isRecord(fallback)) { - return undefined; - } - const raw = fallback.failureDestination; - if (!isRecord(raw)) { - return undefined; - } - const mode = cronFailureDeliveryModeFromValue(raw.mode); - const channel = optionalStringFromRecord(raw, "channel") as CronMessageChannel | undefined; - const to = optionalStringFromRecord(raw, "to"); - const accountId = optionalStringFromRecord(raw, "accountId"); - if (!mode && !channel && !to && !accountId) { - return undefined; - } - return { - ...(mode ? { mode } : {}), - ...(channel ? { channel } : {}), - ...(to ? { to } : {}), - ...(accountId ? { accountId } : {}), - }; -} - -function fallbackDeliveryFromRecord(fallback: unknown): CronDelivery | undefined { - if (!isRecord(fallback)) { - return undefined; - } - const mode = cronDeliveryModeFromValue(fallback.mode); - const channel = optionalStringFromRecord(fallback, "channel") as CronMessageChannel | undefined; - const to = optionalStringFromRecord(fallback, "to"); - const threadId = optionalThreadIdFromRecord(fallback, "threadId"); - const accountId = optionalStringFromRecord(fallback, "accountId"); - const bestEffort = optionalBooleanFromRecord(fallback, "bestEffort"); - const completionDestination = completionDestinationFromFallback({ - fallback, - mode: mode ?? "announce", - }); - const failureDestination = failureDestinationFromFallback(fallback); - if ( - !mode && - !channel && - !to && - threadId == null && - !accountId && - bestEffort == null && - !completionDestination && - !failureDestination - ) { - return undefined; - } - return { - mode: mode ?? "announce", - ...(channel ? { channel } : {}), - ...(to ? { to } : {}), - ...(threadId != null ? { threadId } : {}), - ...(accountId ? { accountId } : {}), - ...(bestEffort != null ? { bestEffort } : {}), - ...(completionDestination ? { completionDestination } : {}), - ...(failureDestination ? { failureDestination } : {}), - }; -} - -export function deliveryFromRow(row: CronJobRow, fallback?: unknown): CronDelivery | undefined { - const fallbackDelivery = fallbackDeliveryFromRecord(fallback); +export function deliveryFromRow(row: CronJobRow): CronDelivery | undefined { const rowMode = cronDeliveryModeFromValue(row.delivery_mode); - const mode = rowMode ?? fallbackDelivery?.mode; const hasDeliveryColumns = Boolean( row.delivery_channel || row.delivery_to || row.delivery_thread_id || row.delivery_account_id || + row.delivery_completion_mode || + row.delivery_completion_to || row.failure_delivery_channel || row.failure_delivery_to || row.failure_delivery_mode || row.failure_delivery_account_id, ) || row.delivery_best_effort != null; const completionDestination = - mode === "announce" ? fallbackDelivery?.completionDestination : undefined; + rowMode === "announce" && row.delivery_completion_mode === "webhook" + ? { + mode: "webhook" as const, + ...(row.delivery_completion_to ? { to: row.delivery_completion_to } : {}), + } + : undefined; const failureDestination = row.failure_delivery_channel || row.failure_delivery_to || @@ -169,25 +81,12 @@ export function deliveryFromRow(row: CronJobRow, fallback?: unknown): CronDelive ? { accountId: row.failure_delivery_account_id } : {}), } - : fallbackDelivery?.failureDestination; - if (!mode && !hasDeliveryColumns && !fallbackDelivery) { + : undefined; + if (!rowMode && !hasDeliveryColumns) { return undefined; } - const fallbackDeliveryFields = - rowMode === "none" || rowMode === "webhook" - ? {} - : { - ...(fallbackDelivery?.channel ? { channel: fallbackDelivery.channel } : {}), - ...(fallbackDelivery?.to ? { to: fallbackDelivery.to } : {}), - ...(fallbackDelivery?.threadId != null ? { threadId: fallbackDelivery.threadId } : {}), - ...(fallbackDelivery?.accountId ? { accountId: fallbackDelivery.accountId } : {}), - ...(fallbackDelivery?.bestEffort != null - ? { bestEffort: fallbackDelivery.bestEffort } - : {}), - }; return { - ...fallbackDeliveryFields, - mode: mode ?? "announce", + mode: rowMode ?? "announce", ...(row.delivery_channel ? { channel: row.delivery_channel as CronDelivery["channel"] } : {}), ...(row.delivery_to ? { to: row.delivery_to } : {}), ...(row.delivery_thread_id ? { threadId: row.delivery_thread_id } : {}), diff --git a/src/cron/store/payload-codec.ts b/src/cron/store/payload-codec.ts index d65ea6fceab3..b1365fd34b73 100644 --- a/src/cron/store/payload-codec.ts +++ b/src/cron/store/payload-codec.ts @@ -1,24 +1,16 @@ -import { isRecord } from "@openclaw/normalization-core/record-coerce"; import type { CronPayload } from "../types.js"; import { booleanToInteger, integerToBoolean, normalizeNumber, - optionalBooleanFromRecord, - optionalNumberFromRecord, - optionalStringArrayFromRecord, - optionalStringFromRecord, parseJsonArray, parseJsonValue, serializeJson, } from "./scalar-codec.js"; import type { CronJobInsert, CronJobRow } from "./schema.js"; -function parseExternalContentSource( - raw: string | null, - fallback: unknown, -): "gmail" | "webhook" | undefined { - const parsed = raw ? parseJsonValue(raw, undefined) : fallback; +function parseExternalContentSource(raw: string | null): "gmail" | "webhook" | undefined { + const parsed = raw ? parseJsonValue(raw, undefined) : undefined; return parsed === "gmail" || parsed === "webhook" ? parsed : undefined; } @@ -65,47 +57,36 @@ export function bindPayloadColumns( }; } -export function payloadFromRow(row: CronJobRow, fallback: unknown): CronPayload | null { - const fallbackRecord = isRecord(fallback) ? fallback : {}; +export function payloadFromRow(row: CronJobRow): CronPayload | null { if (row.payload_kind === "systemEvent") { - const text = row.payload_message ?? optionalStringFromRecord(fallbackRecord, "text"); - return text == null ? null : { kind: "systemEvent", text }; + return row.payload_message == null ? null : { kind: "systemEvent", text: row.payload_message }; } if (row.payload_kind === "agentTurn") { - const message = row.payload_message ?? optionalStringFromRecord(fallbackRecord, "message"); - if (message == null) { + if (row.payload_message == null) { return null; } - const model = row.payload_model ?? optionalStringFromRecord(fallbackRecord, "model"); const fallbacks = row.payload_fallbacks_json ? parseJsonArray(row.payload_fallbacks_json) - : optionalStringArrayFromRecord(fallbackRecord, "fallbacks"); - const thinking = row.payload_thinking ?? optionalStringFromRecord(fallbackRecord, "thinking"); - const timeoutSeconds = - row.payload_timeout_seconds != null - ? normalizeNumber(row.payload_timeout_seconds) - : optionalNumberFromRecord(fallbackRecord, "timeoutSeconds"); + : undefined; + const timeoutSeconds = normalizeNumber(row.payload_timeout_seconds); const allowUnsafeExternalContent = row.payload_allow_unsafe_external_content != null ? integerToBoolean(row.payload_allow_unsafe_external_content) - : optionalBooleanFromRecord(fallbackRecord, "allowUnsafeExternalContent"); + : undefined; const externalContentSource = parseExternalContentSource( row.payload_external_content_source_json, - fallbackRecord.externalContentSource, ); const lightContext = - row.payload_light_context != null - ? integerToBoolean(row.payload_light_context) - : optionalBooleanFromRecord(fallbackRecord, "lightContext"); + row.payload_light_context != null ? integerToBoolean(row.payload_light_context) : undefined; const toolsAllow = row.payload_tools_allow_json ? parseJsonArray(row.payload_tools_allow_json) - : optionalStringArrayFromRecord(fallbackRecord, "toolsAllow"); + : undefined; return { kind: "agentTurn", - message, - ...(model ? { model } : {}), + message: row.payload_message, + ...(row.payload_model ? { model: row.payload_model } : {}), ...(fallbacks ? { fallbacks } : {}), - ...(thinking ? { thinking } : {}), + ...(row.payload_thinking ? { thinking: row.payload_thinking } : {}), ...(timeoutSeconds != null ? { timeoutSeconds } : {}), ...(allowUnsafeExternalContent != null ? { allowUnsafeExternalContent } : {}), ...(externalContentSource ? { externalContentSource } : {}), diff --git a/src/cron/store/row-codec.ts b/src/cron/store/row-codec.ts index 060971187078..999b8f4d8f13 100644 --- a/src/cron/store/row-codec.ts +++ b/src/cron/store/row-codec.ts @@ -157,16 +157,15 @@ function scheduleFromRow(row: CronJobRow): CronSchedule | null { } function rowToCronJob(row: CronJobRow): CronJob | null { - const base = parseJsonObject>(row.job_json, {}); - const schedule = scheduleFromRow(row) ?? base.schedule; - const payload = payloadFromRow(row, base.payload) ?? base.payload; - const delivery = deliveryFromRow(row, base.delivery); + const schedule = scheduleFromRow(row); + const payload = payloadFromRow(row); + const delivery = deliveryFromRow(row); const failureAlert = failureAlertFromRow(row); if (!schedule || !payload) { return null; } + const createdAtMs = normalizeNumber(row.created_at_ms) ?? Date.now(); return { - ...base, id: row.job_id, name: row.name, ...(row.description ? { description: row.description } : {}), @@ -174,12 +173,9 @@ function rowToCronJob(row: CronJobRow): CronJob | null { ...(row.delete_after_run != null ? { deleteAfterRun: integerToBoolean(row.delete_after_run) } : {}), - createdAtMs: normalizeNumber(row.created_at_ms) ?? base.createdAtMs ?? Date.now(), + createdAtMs, updatedAtMs: - normalizeNumber(row.runtime_updated_at_ms) ?? - normalizeNumber(row.updated_at) ?? - base.updatedAtMs ?? - Date.now(), + normalizeNumber(row.runtime_updated_at_ms) ?? normalizeNumber(row.updated_at) ?? createdAtMs, ...(row.agent_id ? { agentId: row.agent_id } : {}), ...(row.session_key ? { sessionKey: row.session_key } : {}), schedule, diff --git a/src/cron/store/scalar-codec.ts b/src/cron/store/scalar-codec.ts index 3d6635dea407..8a877fb7c4b1 100644 --- a/src/cron/store/scalar-codec.ts +++ b/src/cron/store/scalar-codec.ts @@ -44,45 +44,3 @@ export function parseJsonArray(raw: string | null): string[] | undefined { ? parsed.filter((item): item is string => typeof item === "string") : undefined; } - -export function optionalStringFromRecord( - record: Record, - key: string, -): string | undefined { - const value = record[key]; - return typeof value === "string" ? value : undefined; -} - -export function optionalBooleanFromRecord( - record: Record, - key: string, -): boolean | undefined { - const value = record[key]; - return typeof value === "boolean" ? value : undefined; -} - -export function optionalNumberFromRecord( - record: Record, - key: string, -): number | undefined { - const value = record[key]; - return typeof value === "number" && Number.isFinite(value) ? value : undefined; -} - -export function optionalStringArrayFromRecord( - record: Record, - key: string, -): string[] | undefined { - const value = record[key]; - return Array.isArray(value) && value.every((item) => typeof item === "string") - ? value - : undefined; -} - -export function optionalThreadIdFromRecord( - record: Record, - key: string, -): string | number | undefined { - const value = record[key]; - return typeof value === "string" || typeof value === "number" ? value : undefined; -} diff --git a/src/gateway/server-methods/cron.validation.test.ts b/src/gateway/server-methods/cron.validation.test.ts index 6aa16e104995..8e4e03dafa8e 100644 --- a/src/gateway/server-methods/cron.validation.test.ts +++ b/src/gateway/server-methods/cron.validation.test.ts @@ -699,7 +699,7 @@ describe("cron method validation", () => { params: { name: "bad-cron", enabled: true, - schedule: { kind: "cron", cron: "not-a-cron-expr" }, + schedule: { kind: "cron", expr: "not-a-cron-expr" }, sessionTarget: "isolated", wakeMode: "next-heartbeat", payload: { kind: "agentTurn", message: "ping" }, @@ -725,7 +725,7 @@ describe("cron method validation", () => { params: { id: existingJob.id, patch: { - schedule: { kind: "cron", cron: "99 * * * *" }, + schedule: { kind: "cron", expr: "99 * * * *" }, }, } as never, respond: respond as never, diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index d9257087cc29..9b3b134758c1 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -491,22 +491,6 @@ describe("gateway server cron", () => { expect(missingGetRes.ok).toBe(false); expect(missingGetRes.error?.code).toBe("INVALID_REQUEST"); expect(missingGetRes.error?.message).toContain("cron job not found: missing-job-id"); - - const wrappedAtMs = Date.now() + 1000; - const wrappedRes = await directCronReq(cronState, "cron.add", { - data: { - name: "wrapped", - schedule: { at: new Date(wrappedAtMs).toISOString() }, - payload: { kind: "systemEvent", text: "hello" }, - }, - }); - expect(wrappedRes.ok).toBe(true); - const wrappedPayload = wrappedRes.payload as - | { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown } - | undefined; - expect(wrappedPayload?.sessionTarget).toBe("main"); - expect(wrappedPayload?.wakeMode).toBe("now"); - expect((wrappedPayload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at"); } finally { await cleanupCronTestRun({ cronState, @@ -648,7 +632,7 @@ describe("gateway server cron", () => { const updateRes = await directCronReq(cronState, "cron.update", { id: patchJobId, patch: { - schedule: { at: new Date(atMs).toISOString() }, + schedule: { kind: "at", at: new Date(atMs).toISOString() }, payload: { kind: "systemEvent", text: "updated" }, }, }); @@ -716,6 +700,7 @@ describe("gateway server cron", () => { id: mergeJobId, patch: { payload: { + kind: "agentTurn", model: "anthropic/claude-sonnet-4-6", }, }, @@ -780,7 +765,7 @@ describe("gateway server cron", () => { const jobIdUpdateRes = await directCronReq(cronState, "cron.update", { jobId, patch: { - schedule: { at: new Date(Date.now() + 2_000).toISOString() }, + schedule: { kind: "at", at: new Date(Date.now() + 2_000).toISOString() }, payload: { kind: "systemEvent", text: "updated" }, }, }); diff --git a/src/state/openclaw-state-db.generated.d.ts b/src/state/openclaw-state-db.generated.d.ts index ce57a6c95e1b..b744f07e7cdc 100644 --- a/src/state/openclaw-state-db.generated.d.ts +++ b/src/state/openclaw-state-db.generated.d.ts @@ -208,6 +208,8 @@ export interface CronJobs { delivery_account_id: string | null; delivery_best_effort: number | null; delivery_channel: string | null; + delivery_completion_mode: string | null; + delivery_completion_to: string | null; delivery_mode: string | null; delivery_thread_id: string | null; delivery_to: string | null; diff --git a/src/state/openclaw-state-db.ts b/src/state/openclaw-state-db.ts index 3a20ade57f8f..48df909ae463 100644 --- a/src/state/openclaw-state-db.ts +++ b/src/state/openclaw-state-db.ts @@ -213,6 +213,8 @@ function ensureAdditiveStateColumns(db: DatabaseSync): void { ensureColumn(db, "cron_jobs", "delivery_thread_id TEXT"); ensureColumn(db, "cron_jobs", "delivery_account_id TEXT"); ensureColumn(db, "cron_jobs", "delivery_best_effort INTEGER"); + ensureColumn(db, "cron_jobs", "delivery_completion_mode TEXT"); + ensureColumn(db, "cron_jobs", "delivery_completion_to TEXT"); ensureColumn(db, "cron_jobs", "failure_delivery_mode TEXT"); ensureColumn(db, "cron_jobs", "failure_delivery_channel TEXT"); ensureColumn(db, "cron_jobs", "failure_delivery_to TEXT"); diff --git a/src/state/openclaw-state-schema.generated.ts b/src/state/openclaw-state-schema.generated.ts index 5a5f0e277789..fd88d6f8dfff 100644 --- a/src/state/openclaw-state-schema.generated.ts +++ b/src/state/openclaw-state-schema.generated.ts @@ -835,6 +835,8 @@ CREATE TABLE IF NOT EXISTS cron_jobs ( delivery_thread_id TEXT, delivery_account_id TEXT, delivery_best_effort INTEGER, + delivery_completion_mode TEXT, + delivery_completion_to TEXT, failure_delivery_mode TEXT, failure_delivery_channel TEXT, failure_delivery_to TEXT, diff --git a/src/state/openclaw-state-schema.sql b/src/state/openclaw-state-schema.sql index 55101c69cf94..6dc723131161 100644 --- a/src/state/openclaw-state-schema.sql +++ b/src/state/openclaw-state-schema.sql @@ -830,6 +830,8 @@ CREATE TABLE IF NOT EXISTS cron_jobs ( delivery_thread_id TEXT, delivery_account_id TEXT, delivery_best_effort INTEGER, + delivery_completion_mode TEXT, + delivery_completion_to TEXT, failure_delivery_mode TEXT, failure_delivery_channel TEXT, failure_delivery_to TEXT,