From d86b6da0120a7b3c065470acd6b6dc07ca5f4814 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 31 May 2026 22:04:19 -0400 Subject: [PATCH] fix: allow cron delivery clears --- .../src/cron-validators.test.ts | 33 ++++++ packages/gateway-protocol/src/schema/cron.ts | 24 ++++- src/agents/tools/cron-tool.test.ts | 43 +++++++- src/agents/tools/cron-tool.ts | 101 +++++++++++++----- src/cron/normalize.test.ts | 42 ++++++++ src/cron/normalize.ts | 78 +++++++++++++- src/cron/service/jobs.apply-patch.test.ts | 61 +++++++++++ src/cron/service/jobs.ts | 9 +- src/cron/types.ts | 14 ++- .../server-methods/cron.validation.test.ts | 86 +++++++++++++++ 10 files changed, 450 insertions(+), 41 deletions(-) diff --git a/packages/gateway-protocol/src/cron-validators.test.ts b/packages/gateway-protocol/src/cron-validators.test.ts index 24c74b7ee4d5..db08ea1f3f0d 100644 --- a/packages/gateway-protocol/src/cron-validators.test.ts +++ b/packages/gateway-protocol/src/cron-validators.test.ts @@ -99,6 +99,39 @@ describe("cron protocol validators", () => { ).toBe(true); }); + it("accepts nullable delivery clears on update params", () => { + expect( + validateCronUpdateParams({ + id: "job-1", + patch: { + delivery: { + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }, + }, + }), + ).toBe(true); + + expect( + validateCronUpdateParams({ + id: "job-1", + patch: { + delivery: { + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }, + }, + }), + ).toBe(true); + }); + it("accepts remove params for id and jobId selectors", () => { expect(validateCronRemoveParams({ id: "job-1" })).toBe(true); expect(validateCronRemoveParams({ jobId: "job-2" })).toBe(true); diff --git a/packages/gateway-protocol/src/schema/cron.ts b/packages/gateway-protocol/src/schema/cron.ts index 097a75ce785f..aa8c6da02294 100644 --- a/packages/gateway-protocol/src/schema/cron.ts +++ b/packages/gateway-protocol/src/schema/cron.ts @@ -235,6 +235,18 @@ export const CronFailureDestinationSchema = Type.Object( { additionalProperties: false }, ); +const CronFailureDestinationPatchSchema = Type.Object( + { + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString, Type.Null()])), + to: Type.Optional(Type.Union([Type.String(), Type.Null()])), + accountId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + mode: Type.Optional( + Type.Union([Type.Literal("announce"), Type.Literal("webhook"), Type.Null()]), + ), + }, + { additionalProperties: false }, +); + export const CronCompletionDestinationSchema = Type.Object( { mode: Type.Literal("webhook"), @@ -251,6 +263,14 @@ const CronDeliverySharedProperties = { failureDestination: Type.Optional(CronFailureDestinationSchema), }; +const CronDeliveryPatchSharedProperties = { + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString, Type.Null()])), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number(), Type.Null()])), + accountId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + bestEffort: Type.Optional(Type.Boolean()), + failureDestination: Type.Optional(Type.Union([CronFailureDestinationPatchSchema, Type.Null()])), +}; + const CronDeliveryNoopSchema = Type.Object( { mode: Type.Literal("none"), @@ -290,11 +310,11 @@ export const CronDeliveryPatchSchema = Type.Object( mode: Type.Optional( Type.Union([Type.Literal("none"), Type.Literal("announce"), Type.Literal("webhook")]), ), - ...CronDeliverySharedProperties, + ...CronDeliveryPatchSharedProperties, completionDestination: Type.Optional( Type.Union([CronCompletionDestinationSchema, Type.Null()]), ), - to: Type.Optional(Type.String()), + to: Type.Optional(Type.Union([Type.String(), Type.Null()])), }, { additionalProperties: false }, ); diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index e9d87fe75548..4b725ac15650 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -25,6 +25,7 @@ describe("cron tool", () => { anyOf?: Array<{ type?: string }>; description?: string; properties?: Record; + type?: string; }; type TestDelivery = { @@ -458,10 +459,44 @@ describe("cron tool", () => { const jobThreadId = parameters.properties?.job?.properties?.delivery?.properties?.threadId; const patchThreadId = parameters.properties?.patch?.properties?.delivery?.properties?.threadId; - for (const threadId of [jobThreadId, patchThreadId]) { - expect(threadId?.description).toContain("Thread/topic id"); - expect(threadId?.anyOf?.map((entry) => entry.type)).toEqual(["string", "number"]); - } + expect(jobThreadId?.description).toContain("Thread/topic id"); + expect(jobThreadId?.anyOf?.map((entry) => entry.type)).toEqual(["string", "number"]); + expect(patchThreadId?.description).toContain("Thread/topic id"); + expect(patchThreadId?.anyOf?.map((entry) => entry.type)).toEqual(["string", "number", "null"]); + }); + + it("advertises nullable cron update clears in the tool schema", () => { + const tool = createTestCronTool(); + const parameters = tool.parameters as SchemaLike; + const jobDelivery = parameters.properties?.job?.properties?.delivery; + const patch = parameters.properties?.patch; + const payload = patch?.properties?.payload; + const delivery = patch?.properties?.delivery; + + expect(jobDelivery?.properties?.channel?.anyOf).toBeUndefined(); + expect(jobDelivery?.properties?.channel?.type).toBe("string"); + expect(jobDelivery?.properties?.failureDestination?.anyOf).toBeUndefined(); + expect(jobDelivery?.properties?.failureDestination?.type).toBe("object"); + expect(patch?.properties?.agentId?.anyOf?.map((entry) => entry.type)).toEqual([ + "string", + "null", + ]); + expect(patch?.properties?.sessionKey?.anyOf?.map((entry) => entry.type)).toEqual([ + "string", + "null", + ]); + expect(payload?.properties?.toolsAllow?.anyOf?.map((entry) => entry.type)).toEqual([ + "array", + "null", + ]); + expect(delivery?.properties?.channel?.anyOf?.map((entry) => entry.type)).toEqual([ + "string", + "null", + ]); + expect(delivery?.properties?.failureDestination?.anyOf?.map((entry) => entry.type)).toEqual([ + "object", + "null", + ]); }); it.each([ diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index f736f8adc0dc..aa67c2eeb055 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -64,11 +64,31 @@ function isMissingOrEmptyObject(value: unknown): boolean { } function nullableStringSchema(description: string) { - return Type.Optional(Type.String({ description })); + return Type.Optional(Type.Union([Type.String(), Type.Null()], { description })); } function nullableStringArraySchema(description: string) { - return Type.Optional(Type.Array(Type.String(), { description })); + return Type.Optional(Type.Union([Type.Array(Type.String()), Type.Null()], { description })); +} + +function deliveryStringSchema(params: { description: string; nullableClears: boolean }) { + return params.nullableClears + ? nullableStringSchema(params.description) + : Type.Optional(Type.String({ description: params.description })); +} + +function deliveryThreadIdSchema(params: { nullableClears: boolean }) { + const variants = params.nullableClears + ? [Type.String(), Type.Number(), Type.Null()] + : [Type.String(), Type.Number()]; + return Type.Optional(Type.Union(variants, { description: "Thread/topic id" })); +} + +function failureDestinationModeSchema(params: { nullableClears: boolean }) { + const variants = params.nullableClears + ? [Type.Literal("announce"), Type.Literal("webhook"), Type.Null()] + : [Type.Literal("announce"), Type.Literal("webhook")]; + return Type.Optional(Type.Union(variants)); } function cronPayloadObjectSchema(params: { toolsAllow: TSchema }) { @@ -122,34 +142,59 @@ const CronPayloadSchema = Type.Optional( }), ); -const CronDeliverySchema = Type.Optional( - Type.Object( +function cronDeliverySchema(params: { nullableClears: boolean }) { + const failureDestinationObject = Type.Object( { - mode: optionalStringEnum(CRON_DELIVERY_MODES, { description: "Delivery mode" }), - channel: Type.Optional(Type.String({ description: "Delivery channel" })), - to: Type.Optional(Type.String({ description: "Delivery target" })), - threadId: Type.Optional( - Type.Union([Type.String(), Type.Number()], { - description: "Thread/topic id", - }), - ), - bestEffort: Type.Optional(Type.Boolean()), - accountId: Type.Optional(Type.String({ description: "Delivery account" })), - failureDestination: Type.Optional( - Type.Object( - { - channel: Type.Optional(Type.String()), - to: Type.Optional(Type.String()), - accountId: Type.Optional(Type.String()), - mode: optionalStringEnum(["announce", "webhook"] as const), - }, - { additionalProperties: true }, - ), - ), + channel: deliveryStringSchema({ + description: "Failure delivery channel", + nullableClears: params.nullableClears, + }), + to: deliveryStringSchema({ + description: "Failure delivery target", + nullableClears: params.nullableClears, + }), + accountId: deliveryStringSchema({ + description: "Failure delivery account", + nullableClears: params.nullableClears, + }), + mode: failureDestinationModeSchema({ nullableClears: params.nullableClears }), }, { additionalProperties: true }, - ), -); + ); + + return Type.Optional( + Type.Object( + { + mode: optionalStringEnum(CRON_DELIVERY_MODES, { description: "Delivery mode" }), + channel: deliveryStringSchema({ + description: "Delivery channel", + nullableClears: params.nullableClears, + }), + to: deliveryStringSchema({ + description: "Delivery target", + nullableClears: params.nullableClears, + }), + threadId: deliveryThreadIdSchema({ nullableClears: params.nullableClears }), + bestEffort: Type.Optional(Type.Boolean()), + accountId: deliveryStringSchema({ + description: "Delivery account", + nullableClears: params.nullableClears, + }), + failureDestination: params.nullableClears + ? Type.Optional( + Type.Union([failureDestinationObject, Type.Null()], { + description: "Failure destination, or null to clear", + }), + ) + : Type.Optional(failureDestinationObject), + }, + { additionalProperties: true }, + ), + ); +} + +const CronDeliverySchema = cronDeliverySchema({ nullableClears: false }); +const CronDeliveryPatchSchema = cronDeliverySchema({ nullableClears: true }); // Omitting `failureAlert` means "leave defaults/unchanged"; `false` explicitly disables alerts. // Runtime handles `failureAlert === false` in cron/service/timer.ts. @@ -211,7 +256,7 @@ const CronPatchObjectSchema = Type.Optional( toolsAllow: nullableStringArraySchema("Allowed tool ids, or null to clear"), }), ), - delivery: CronDeliverySchema, + delivery: CronDeliveryPatchSchema, description: Type.Optional(Type.String()), enabled: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()), diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index ed09ff353493..8b7ffe4a3e62 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -498,6 +498,27 @@ describe("normalizeCronJobCreate", () => { anchorMs: 123, }); expect(validateCronUpdateParams({ id: "job", patch: normalized })).toBe(true); + + const nested = normalizeCronJobPatch({ + delivery: { + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }, + }) as unknown as Record; + + expect(nested.delivery).toEqual({ + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }); + expect(validateCronUpdateParams({ id: "job", patch: nested })).toBe(true); }); it("keeps invalid every schedule numbers invalid for validation", () => { @@ -759,6 +780,27 @@ describe("normalizeCronJobPatch", () => { expect(validateCronUpdateParams({ id: "job", patch: normalized })).toBe(true); }); + it("preserves nullable delivery field clears in patches", () => { + const normalized = normalizeCronJobPatch({ + delivery: { + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }, + }) as unknown as Record; + + expect(normalized.delivery).toEqual({ + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }); + expect(validateCronUpdateParams({ id: "job", patch: normalized })).toBe(true); + }); + it("normalizes cron stagger values in patch schedules", () => { const normalized = normalizeCronJobPatch({ schedule: { kind: "cron", expr: "0 * * * *", staggerMs: "30000" }, diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index a2a4e9569233..1bcbcec2689d 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -203,26 +203,43 @@ function coerceDelivery(delivery: UnknownRecord) { } else if ("mode" in next) { delete next.mode; } - if (parsed.channel !== undefined) { + if ("channel" in delivery && delivery.channel === null) { + next.channel = null; + } else if (parsed.channel !== undefined) { next.channel = parsed.channel; } else if ("channel" in next) { delete next.channel; } - if (parsed.to !== undefined) { + if ("to" in delivery && delivery.to === null) { + next.to = null; + } else if (parsed.to !== undefined) { next.to = parsed.to; } else if ("to" in next) { delete next.to; } - if (parsed.threadId !== undefined) { + if ("threadId" in delivery && delivery.threadId === null) { + next.threadId = null; + } else if (parsed.threadId !== undefined) { next.threadId = parsed.threadId; } else if ("threadId" in next) { delete next.threadId; } - if (parsed.accountId !== undefined) { + if ("accountId" in delivery && delivery.accountId === null) { + next.accountId = null; + } else if (parsed.accountId !== undefined) { next.accountId = parsed.accountId; } else if ("accountId" in next) { delete next.accountId; } + if ("failureDestination" in next) { + if (next.failureDestination === null) { + next.failureDestination = null; + } else if (isRecord(next.failureDestination)) { + next.failureDestination = coerceFailureDestination(next.failureDestination); + } else { + delete next.failureDestination; + } + } if ("completionDestination" in next) { if (next.completionDestination === null) { next.completionDestination = null; @@ -252,6 +269,59 @@ function coerceCompletionDestination(value: UnknownRecord) { } satisfies UnknownRecord; } +function coerceFailureDestination(value: UnknownRecord) { + const next: UnknownRecord = { ...value }; + if ("channel" in next) { + if (next.channel === null) { + next.channel = null; + } else { + const channel = normalizeOptionalLowercaseString(next.channel); + if (channel) { + next.channel = channel; + } else { + delete next.channel; + } + } + } + if ("to" in next) { + if (next.to === null) { + next.to = null; + } else { + const to = normalizeOptionalString(next.to); + if (to) { + next.to = to; + } else { + delete next.to; + } + } + } + if ("accountId" in next) { + if (next.accountId === null) { + next.accountId = null; + } else { + const accountId = normalizeOptionalString(next.accountId); + if (accountId) { + next.accountId = accountId; + } else { + delete next.accountId; + } + } + } + if ("mode" in next) { + if (next.mode === null) { + next.mode = null; + } else { + const mode = normalizeOptionalLowercaseString(next.mode); + if (mode === "announce" || mode === "webhook") { + next.mode = mode; + } else { + delete next.mode; + } + } + } + return next; +} + function normalizeSessionTarget(raw: unknown) { if (typeof raw !== "string") { return undefined; diff --git a/src/cron/service/jobs.apply-patch.test.ts b/src/cron/service/jobs.apply-patch.test.ts index 3f6cae85c108..2eadb0510a13 100644 --- a/src/cron/service/jobs.apply-patch.test.ts +++ b/src/cron/service/jobs.apply-patch.test.ts @@ -34,4 +34,65 @@ describe("applyJobPatch delivery merge", () => { threadId: "99", }); }); + + it("clears nullable delivery fields", () => { + const job = makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1001234567890", + threadId: "99", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "slack", + to: "C123", + accountId: "bot-b", + }, + }, + }); + const patch = { + delivery: { + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }, + } as Parameters[1]; + + applyJobPatch(job, patch); + + expect(job.delivery).toEqual({ mode: "announce" }); + }); + + it("clears nullable failure destination fields", () => { + const job = makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1001234567890", + failureDestination: { + mode: "announce", + channel: "slack", + to: "C123", + accountId: "bot-b", + }, + }, + }); + const patch = { + delivery: { + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }, + } as Parameters[1]; + + applyJobPatch(job, patch); + + expect(job.delivery?.failureDestination).toBeUndefined(); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 2f3ff06aabcb..2f7b16e3a059 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -954,7 +954,7 @@ function mergeCronDelivery( } } if ("failureDestination" in patch) { - if (patch.failureDestination === undefined) { + if (patch.failureDestination == null) { next.failureDestination = undefined; } else { const existingFd = next.failureDestination; @@ -983,7 +983,12 @@ function mergeCronDelivery( nextFd.mode = mode === "announce" || mode === "webhook" ? mode : undefined; } } - next.failureDestination = nextFd; + const hasFailureDestination = + nextFd.channel !== undefined || + nextFd.to !== undefined || + nextFd.accountId !== undefined || + nextFd.mode !== undefined; + next.failureDestination = hasFailureDestination ? nextFd : undefined; } } diff --git a/src/cron/types.ts b/src/cron/types.ts index 9b5ed1c81984..561c6247be12 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -49,8 +49,20 @@ export type CronFailureDestination = { mode?: "announce" | "webhook"; }; -export type CronDeliveryPatch = Partial> & { +export type CronFailureDestinationPatch = { + channel?: CronMessageChannel | null; + to?: string | null; + accountId?: string | null; + mode?: "announce" | "webhook" | null; +}; + +export type CronDeliveryPatch = Partial> & { + channel?: CronMessageChannel | null; + to?: string | null; + threadId?: string | number | null; + accountId?: string | null; completionDestination?: CronCompletionDestination | null; + failureDestination?: CronFailureDestinationPatch | null; }; export type CronRunStatus = "ok" | "error" | "skipped"; diff --git a/src/gateway/server-methods/cron.validation.test.ts b/src/gateway/server-methods/cron.validation.test.ts index 634c9d5c2898..cb70e7de6b59 100644 --- a/src/gateway/server-methods/cron.validation.test.ts +++ b/src/gateway/server-methods/cron.validation.test.ts @@ -532,6 +532,92 @@ describe("cron method validation", () => { expect(clearDelivery.completionDestination).toBeNull(); }); + it("accepts nullable delivery target clears on update", async () => { + const { context, respond } = await invokeCronUpdate( + { + id: "cron-1", + patch: { + delivery: { + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }, + }, + }, + createCronJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "telegram:123", + threadId: "99", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "slack", + to: "C123", + accountId: "bot-b", + }, + }, + }), + ); + + expect(context.cron.update).toHaveBeenCalled(); + expect(requireCronUpdatePatch(context).delivery).toEqual({ + channel: null, + to: null, + threadId: null, + accountId: null, + failureDestination: null, + }); + expectCronSuccess(respond); + }); + + it("accepts nullable failure destination field clears on update", async () => { + setRuntimeConfig(telegramSlackConfig()); + + const { context, respond } = await invokeCronUpdate( + { + id: "cron-1", + patch: { + delivery: { + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }, + }, + }, + createCronJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "telegram:123", + failureDestination: { + mode: "announce", + channel: "slack", + to: "C123", + accountId: "bot-b", + }, + }, + }), + ); + + expect(context.cron.update).toHaveBeenCalled(); + expect(requireCronUpdatePatch(context).delivery).toEqual({ + failureDestination: { + channel: null, + to: null, + accountId: null, + mode: null, + }, + }); + expectCronSuccess(respond); + }); + it("rejects underscored provider prefixes for a different explicit delivery channel", async () => { setRuntimeConfig(slackSynologyConfig());