mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-21 14:32:03 +08:00
Compare commits
89 Commits
vincentkoc
...
vincentkoc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99de26761d | ||
|
|
0225e4c110 | ||
|
|
162232ae2f | ||
|
|
ae824ab269 | ||
|
|
4808bf526e | ||
|
|
75c71eb18e | ||
|
|
e5af902dda | ||
|
|
2209cc5832 | ||
|
|
fce77e2d45 | ||
|
|
d0df6f3a4c | ||
|
|
e5ef7cbdab | ||
|
|
d59eb6db5b | ||
|
|
6fac513119 | ||
|
|
57b90adbf2 | ||
|
|
0cf4c46004 | ||
|
|
1695b9203c | ||
|
|
7cc3cc0687 | ||
|
|
0413f9576e | ||
|
|
559b38d507 | ||
|
|
2ca87d06f7 | ||
|
|
09f678599d | ||
|
|
2b8828ea46 | ||
|
|
dbabaa0fb2 | ||
|
|
daf0ade96b | ||
|
|
6c11b4378a | ||
|
|
ddc3a3fc71 | ||
|
|
56218dcc21 | ||
|
|
38068de8e9 | ||
|
|
83b453f48a | ||
|
|
98d52062e7 | ||
|
|
64910240b9 | ||
|
|
bd4d7f6137 | ||
|
|
1b8f800487 | ||
|
|
8cb688c44d | ||
|
|
9bde7ef39f | ||
|
|
3c4377651e | ||
|
|
41a39085d3 | ||
|
|
2220a58ff7 | ||
|
|
e383257552 | ||
|
|
fa6436eaf3 | ||
|
|
1809b92f15 | ||
|
|
4006e388e7 | ||
|
|
b22d596e59 | ||
|
|
38f192a29c | ||
|
|
95ed5781f6 | ||
|
|
d275df82a2 | ||
|
|
32f6501dbd | ||
|
|
ce44b366db | ||
|
|
2cb063b72e | ||
|
|
5d82c2cc89 | ||
|
|
78a1644a8a | ||
|
|
d6b26e22d5 | ||
|
|
0d607942d5 | ||
|
|
cc919d3856 | ||
|
|
8948ed8e33 | ||
|
|
648213653d | ||
|
|
cec7006abb | ||
|
|
2d8c8e7f26 | ||
|
|
e2c8c27c8c | ||
|
|
dad107630e | ||
|
|
0d597ab800 | ||
|
|
03bc43a503 | ||
|
|
8e506634b1 | ||
|
|
27d2ac8460 | ||
|
|
fbc8c19e52 | ||
|
|
a7e5c7c18b | ||
|
|
def4b221d9 | ||
|
|
40542aba96 | ||
|
|
f34b0e6c42 | ||
|
|
84064d08d3 | ||
|
|
022923be15 | ||
|
|
a95cce9f2f | ||
|
|
81bd382e6c | ||
|
|
5b28093b01 | ||
|
|
f1449a5590 | ||
|
|
3fe81fdb96 | ||
|
|
ccd6043240 | ||
|
|
f3d6a3018a | ||
|
|
df31d0e8b6 | ||
|
|
65623e1559 | ||
|
|
b0973880b4 | ||
|
|
98aa2a8cf0 | ||
|
|
f7eccaee4a | ||
|
|
4b694d565d | ||
|
|
7875fb6c27 | ||
|
|
43451ebab7 | ||
|
|
017b389549 | ||
|
|
0504fb35fe | ||
|
|
817fa5462b |
@@ -8,8 +8,6 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Breaking
|
||||
|
||||
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.
|
||||
|
||||
### Fixes
|
||||
|
||||
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
|
||||
@@ -20,9 +18,6 @@ Docs: https://docs.openclaw.ai
|
||||
- ACP/sessions.patch: allow `spawnedBy` and `spawnDepth` lineage fields on ACP session keys so `sessions_spawn` with `runtime: "acp"` no longer fails during child-session setup. Fixes #40971. (#40995) thanks @xaeon2026.
|
||||
- ACP/stop reason mapping: resolve gateway chat `state: "error"` completions as ACP `end_turn` instead of `refusal` so transient backend failures are not surfaced as deliberate refusals. (#41187) thanks @pejmanjohn.
|
||||
- ACP/setSessionMode: propagate gateway `sessions.patch` failures back to ACP clients so rejected mode changes no longer return silent success. (#41185) thanks @pejmanjohn.
|
||||
- Agents/embedded logs: add structured, sanitized lifecycle and failover observation events so overload and provider failures are easier to tail and filter. (#41336) thanks @altaywtf.
|
||||
- iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky.
|
||||
- Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
|
||||
@@ -362,14 +362,7 @@ final class NodeAppModel {
|
||||
await MainActor.run {
|
||||
self.operatorConnected = false
|
||||
self.gatewayConnected = false
|
||||
// Foreground recovery must actively restart the saved gateway config.
|
||||
// Disconnecting stale sockets alone can leave us idle if the old
|
||||
// reconnect tasks were suppressed or otherwise got stuck in background.
|
||||
self.gatewayStatusText = "Reconnecting…"
|
||||
self.talkMode.updateGatewayConnected(false)
|
||||
if let cfg = self.activeGatewayConnectConfig {
|
||||
self.applyGatewayConnectConfig(cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting)
|
||||
- Wakeups are first-class: a job can request “wake now” vs “next heartbeat”.
|
||||
- Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = "<url>"`.
|
||||
- Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode.
|
||||
- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them.
|
||||
|
||||
## Quick start (actionable)
|
||||
|
||||
|
||||
@@ -30,12 +30,6 @@ Note: retention/pruning is controlled in config:
|
||||
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.
|
||||
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
|
||||
|
||||
Upgrade note: if you have older cron jobs from before the current delivery/store format, run
|
||||
`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`,
|
||||
top-level delivery fields, payload `provider` delivery aliases) and migrates simple
|
||||
`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is
|
||||
configured.
|
||||
|
||||
## Common edits
|
||||
|
||||
Update delivery settings without changing the message:
|
||||
|
||||
@@ -28,7 +28,6 @@ Notes:
|
||||
- Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts.
|
||||
- `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal.
|
||||
- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.<timestamp>` to reclaim space safely.
|
||||
- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime.
|
||||
- Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing.
|
||||
- If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`).
|
||||
|
||||
|
||||
@@ -65,7 +65,6 @@ cat ~/.openclaw/openclaw.json
|
||||
- Config normalization for legacy values.
|
||||
- OpenCode Zen provider override warnings (`models.providers.opencode`).
|
||||
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
|
||||
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
|
||||
- State integrity and permissions checks (sessions, transcripts, state dir).
|
||||
- Config file permission checks (chmod 600) when running locally.
|
||||
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
|
||||
@@ -159,25 +158,6 @@ the legacy sessions + agent dir on startup so history/auth/models land in the
|
||||
per-agent path without a manual doctor run. WhatsApp auth is intentionally only
|
||||
migrated via `openclaw doctor`.
|
||||
|
||||
### 3b) Legacy cron store migrations
|
||||
|
||||
Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default,
|
||||
or `cron.store` when overridden) for old job shapes that the scheduler still
|
||||
accepts for compatibility.
|
||||
|
||||
Current cron cleanups include:
|
||||
|
||||
- `jobId` → `id`
|
||||
- `schedule.cron` → `schedule.expr`
|
||||
- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload`
|
||||
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
|
||||
- payload `provider` delivery aliases → explicit `delivery.channel`
|
||||
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
|
||||
|
||||
Doctor only auto-migrates `notify: true` jobs when it can do so without
|
||||
changing behavior. If a job combines legacy notify fallback with an existing
|
||||
non-webhook delivery mode, doctor warns and leaves that job for manual review.
|
||||
|
||||
### 4) State integrity checks (session persistence, routing, and safety)
|
||||
|
||||
The state directory is the operational brainstem. If it vanishes, you lose
|
||||
|
||||
@@ -148,37 +148,6 @@ replace logs; they exist to feed metrics, traces, and other exporters.
|
||||
Diagnostics events are emitted in-process, but exporters only attach when
|
||||
diagnostics + the exporter plugin are enabled.
|
||||
|
||||
### Telemetry surface ownership
|
||||
|
||||
OpenClaw has separate surfaces for automation, runtime control, and telemetry:
|
||||
|
||||
| If you want to... | Use... | Why |
|
||||
| -------------------------------------------------------------------------------------- | --------------------------------------- | ------------------------------------------------------------------ |
|
||||
| Export metrics, traces, or machine-readable health signals | Diagnostic events | Observability should be append-only telemetry, not a behavior hook |
|
||||
| Rewrite prompts, block tools, cancel outbound messages, or add policy/middleware | Typed plugin hooks via `api.on(...)` | Runtime hooks can mutate or block behavior |
|
||||
| Trigger coarse operator automation such as file writes, notifications, or side effects | HOOK.md hooks / `api.registerHook(...)` | Internal hooks are for operator automation, not telemetry schemas |
|
||||
|
||||
Future OTEL work should extend `src/infra/diagnostic-events.ts`, then map those
|
||||
events in the `diagnostics-otel` plugin. Do not add telemetry-only proposals by
|
||||
growing the hook APIs.
|
||||
|
||||
### What diagnostic events are for
|
||||
|
||||
Diagnostic events are the observability contract between the gateway runtime and
|
||||
telemetry consumers such as the `diagnostics-otel` plugin.
|
||||
|
||||
Diagnostic events should be:
|
||||
|
||||
- append-only signals for exporters, dashboards, alerts, and troubleshooting
|
||||
- safe to ignore without affecting runtime behavior
|
||||
- stable enough that exporters can map them into metrics, traces, or logs
|
||||
|
||||
Diagnostic events should not be used for:
|
||||
|
||||
- blocking, vetoing, or rewriting runtime behavior
|
||||
- policy enforcement or middleware ordering
|
||||
- side-effect automation that must run for the system to behave correctly
|
||||
|
||||
### OpenTelemetry vs OTLP
|
||||
|
||||
- **OpenTelemetry (OTel)**: the data model + SDKs for traces, metrics, and logs.
|
||||
@@ -215,28 +184,6 @@ Queue + session:
|
||||
- `run.attempt`: run retry/attempt metadata.
|
||||
- `diagnostic.heartbeat`: aggregate counters (webhooks/queue/session).
|
||||
|
||||
Tool safety:
|
||||
|
||||
- `tool.loop`: repeated-tool-loop warning/block telemetry emitted by the runtime.
|
||||
|
||||
### What is still missing
|
||||
|
||||
The current event catalog is useful, but still coarse in a few places. New
|
||||
observability work should generally extend `src/infra/diagnostic-events.ts`
|
||||
instead of asking hooks to carry telemetry-only meaning.
|
||||
|
||||
Priority gaps for future telemetry work:
|
||||
|
||||
- Run lifecycle: explicit run start, run end, and run error boundaries.
|
||||
- Model lifecycle: request/response/error boundaries in addition to aggregate
|
||||
`model.usage`.
|
||||
- Tool lifecycle: tool call start/end/error boundaries, plus first-class exporter
|
||||
mapping for existing `tool.loop` events.
|
||||
- Outbound delivery lifecycle: delivery attempted/sent/failed boundaries across
|
||||
channels, separate from message processing.
|
||||
- Attribute hygiene: clearer redaction and cardinality guidance for exporter-safe
|
||||
fields.
|
||||
|
||||
### Enable diagnostics (no exporter)
|
||||
|
||||
Use this if you want diagnostics events available to plugins or custom sinks:
|
||||
|
||||
@@ -1,182 +0,0 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import * as loggingConfigModule from "../logging/config.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
buildTextObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "./pi-embedded-error-observation.js";
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("buildApiErrorObservationFields", () => {
|
||||
it("redacts request ids and exposes stable hashes instead of raw payloads", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_overload"}',
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
rawErrorPreview: expect.stringContaining('"request_id":"sha256:'),
|
||||
rawErrorHash: expect.stringMatching(/^sha256:/),
|
||||
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
|
||||
providerErrorType: "overloaded_error",
|
||||
providerErrorMessagePreview: "Overloaded",
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.rawErrorPreview).not.toContain("req_overload");
|
||||
});
|
||||
|
||||
it("forces token redaction for observation previews", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"Authorization: Bearer sk-abcdefghijklmnopqrstuvwxyz123456",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("sk-abcdefghijklmnopqrstuvwxyz123456");
|
||||
expect(observed.rawErrorPreview).toContain("sk-abc");
|
||||
expect(observed.rawErrorHash).toMatch(/^sha256:/);
|
||||
});
|
||||
|
||||
it("redacts observation-only header and cookie formats", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456 Cookie: session=abcdefghijklmnopqrstuvwxyz123456",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("abcdefghijklmnopqrstuvwxyz123456");
|
||||
expect(observed.rawErrorPreview).toContain("x-api-key: ***");
|
||||
expect(observed.rawErrorPreview).toContain("Cookie: session=");
|
||||
});
|
||||
|
||||
it("does not let cookie redaction consume unrelated fields on the same line", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"Cookie: session=abcdefghijklmnopqrstuvwxyz123456 status=503 request_id=req_cookie",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).toContain("Cookie: session=");
|
||||
expect(observed.rawErrorPreview).toContain("status=503");
|
||||
expect(observed.rawErrorPreview).toContain("request_id=sha256:");
|
||||
});
|
||||
|
||||
it("builds sanitized generic text observation fields", () => {
|
||||
const observed = buildTextObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_prev"}',
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
textPreview: expect.stringContaining('"request_id":"sha256:'),
|
||||
textHash: expect.stringMatching(/^sha256:/),
|
||||
textFingerprint: expect.stringMatching(/^sha256:/),
|
||||
providerErrorType: "overloaded_error",
|
||||
providerErrorMessagePreview: "Overloaded",
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.textPreview).not.toContain("req_prev");
|
||||
});
|
||||
|
||||
it("redacts request ids in formatted plain-text errors", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"LLM error overloaded_error: Overloaded (request_id: req_plaintext_123)",
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
rawErrorPreview: expect.stringContaining("request_id: sha256:"),
|
||||
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.rawErrorPreview).not.toContain("req_plaintext_123");
|
||||
});
|
||||
|
||||
it("keeps fingerprints stable across request ids for equivalent errors", () => {
|
||||
const first = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_001"}',
|
||||
);
|
||||
const second = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_002"}',
|
||||
);
|
||||
|
||||
expect(first.rawErrorFingerprint).toBe(second.rawErrorFingerprint);
|
||||
expect(first.rawErrorHash).not.toBe(second.rawErrorHash);
|
||||
});
|
||||
|
||||
it("truncates oversized raw and provider previews", () => {
|
||||
const longMessage = "X".repeat(260);
|
||||
const observed = buildApiErrorObservationFields(
|
||||
`{"type":"error","error":{"type":"server_error","message":"${longMessage}"},"request_id":"req_long"}`,
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).toBeDefined();
|
||||
expect(observed.providerErrorMessagePreview).toBeDefined();
|
||||
expect(observed.rawErrorPreview?.length).toBeLessThanOrEqual(401);
|
||||
expect(observed.providerErrorMessagePreview?.length).toBeLessThanOrEqual(201);
|
||||
expect(observed.providerErrorMessagePreview?.endsWith("…")).toBe(true);
|
||||
});
|
||||
|
||||
it("caps oversized raw inputs before hashing and fingerprinting", () => {
|
||||
const oversized = "X".repeat(70_000);
|
||||
const bounded = "X".repeat(64_000);
|
||||
|
||||
expect(buildApiErrorObservationFields(oversized)).toMatchObject({
|
||||
rawErrorHash: buildApiErrorObservationFields(bounded).rawErrorHash,
|
||||
rawErrorFingerprint: buildApiErrorObservationFields(bounded).rawErrorFingerprint,
|
||||
});
|
||||
});
|
||||
|
||||
it("returns empty observation fields for empty input", () => {
|
||||
expect(buildApiErrorObservationFields(undefined)).toEqual({});
|
||||
expect(buildApiErrorObservationFields("")).toEqual({});
|
||||
expect(buildApiErrorObservationFields(" ")).toEqual({});
|
||||
});
|
||||
|
||||
it("re-reads configured redact patterns on each call", () => {
|
||||
const readLoggingConfig = vi.spyOn(loggingConfigModule, "readLoggingConfig");
|
||||
readLoggingConfig.mockReturnValueOnce(undefined);
|
||||
readLoggingConfig.mockReturnValueOnce({
|
||||
redactPatterns: [String.raw`\bcustom-secret-[A-Za-z0-9]+\b`],
|
||||
});
|
||||
|
||||
const first = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
const second = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
|
||||
expect(first.rawErrorPreview).toContain("custom-secret-abc123");
|
||||
expect(second.rawErrorPreview).not.toContain("custom-secret-abc123");
|
||||
expect(second.rawErrorPreview).toContain("custom");
|
||||
});
|
||||
|
||||
it("fails closed when observation sanitization throws", () => {
|
||||
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockImplementation(() => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
|
||||
expect(buildApiErrorObservationFields("request_id=req_123")).toEqual({});
|
||||
expect(buildTextObservationFields("request_id=req_123")).toEqual({
|
||||
textPreview: undefined,
|
||||
textHash: undefined,
|
||||
textFingerprint: undefined,
|
||||
httpCode: undefined,
|
||||
providerErrorType: undefined,
|
||||
providerErrorMessagePreview: undefined,
|
||||
requestIdHash: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores non-string configured redact patterns", () => {
|
||||
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockReturnValue({
|
||||
redactPatterns: [
|
||||
123 as never,
|
||||
{ bad: true } as never,
|
||||
String.raw`\bcustom-secret-[A-Za-z0-9]+\b`,
|
||||
],
|
||||
});
|
||||
|
||||
const observed = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("custom-secret-abc123");
|
||||
expect(observed.rawErrorPreview).toContain("custom");
|
||||
});
|
||||
});
|
||||
|
||||
describe("sanitizeForConsole", () => {
|
||||
it("strips control characters from console-facing values", () => {
|
||||
expect(sanitizeForConsole("run-1\nprovider\tmodel\rtest")).toBe("run-1 provider model test");
|
||||
});
|
||||
});
|
||||
@@ -1,199 +0,0 @@
|
||||
import { readLoggingConfig } from "../logging/config.js";
|
||||
import { redactIdentifier } from "../logging/redact-identifier.js";
|
||||
import { getDefaultRedactPatterns, redactSensitiveText } from "../logging/redact.js";
|
||||
import { getApiErrorPayloadFingerprint, parseApiErrorInfo } from "./pi-embedded-helpers.js";
|
||||
import { stableStringify } from "./stable-stringify.js";
|
||||
|
||||
const MAX_OBSERVATION_INPUT_CHARS = 64_000;
|
||||
const MAX_FINGERPRINT_MESSAGE_CHARS = 8_000;
|
||||
const RAW_ERROR_PREVIEW_MAX_CHARS = 400;
|
||||
const PROVIDER_ERROR_PREVIEW_MAX_CHARS = 200;
|
||||
const REQUEST_ID_RE = /\brequest[_ ]?id\b\s*[:=]\s*["'()]*([A-Za-z0-9._:-]+)/i;
|
||||
const OBSERVATION_EXTRA_REDACT_PATTERNS = [
|
||||
String.raw`\b(?:x-)?api[-_]?key\b\s*[:=]\s*(["']?)([^\s"'\\;]+)\1`,
|
||||
String.raw`"(?:api[-_]?key|api_key)"\s*:\s*"([^"]+)"`,
|
||||
String.raw`(?:\bCookie\b\s*[:=]\s*[^;=\s]+=|;\s*[^;=\s]+=)([^;\s\r\n]+)`,
|
||||
];
|
||||
|
||||
function resolveConfiguredRedactPatterns(): string[] {
|
||||
const configured = readLoggingConfig()?.redactPatterns;
|
||||
if (!Array.isArray(configured)) {
|
||||
return [];
|
||||
}
|
||||
return configured.filter((pattern): pattern is string => typeof pattern === "string");
|
||||
}
|
||||
|
||||
function truncateForObservation(text: string | undefined, maxChars: number): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed.length > maxChars ? `${trimmed.slice(0, maxChars)}…` : trimmed;
|
||||
}
|
||||
|
||||
function boundObservationInput(text: string | undefined): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed.length > MAX_OBSERVATION_INPUT_CHARS
|
||||
? trimmed.slice(0, MAX_OBSERVATION_INPUT_CHARS)
|
||||
: trimmed;
|
||||
}
|
||||
|
||||
export function sanitizeForConsole(text: string | undefined, maxChars = 200): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
const withoutControlChars = Array.from(trimmed)
|
||||
.filter((char) => {
|
||||
const code = char.charCodeAt(0);
|
||||
return !(
|
||||
code <= 0x08 ||
|
||||
code === 0x0b ||
|
||||
code === 0x0c ||
|
||||
(code >= 0x0e && code <= 0x1f) ||
|
||||
code === 0x7f
|
||||
);
|
||||
})
|
||||
.join("");
|
||||
const sanitized = withoutControlChars
|
||||
.replace(/[\r\n\t]+/g, " ")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
return sanitized.length > maxChars ? `${sanitized.slice(0, maxChars)}…` : sanitized;
|
||||
}
|
||||
|
||||
function replaceRequestIdPreview(
|
||||
text: string | undefined,
|
||||
requestId: string | undefined,
|
||||
): string | undefined {
|
||||
if (!text || !requestId) {
|
||||
return text;
|
||||
}
|
||||
return text.split(requestId).join(redactIdentifier(requestId, { len: 12 }));
|
||||
}
|
||||
|
||||
function redactObservationText(text: string | undefined): string | undefined {
|
||||
if (!text) {
|
||||
return text;
|
||||
}
|
||||
// Observation logs must stay redacted even when operators disable general-purpose
|
||||
// log redaction, otherwise raw provider payloads leak back into always-on logs.
|
||||
const configuredPatterns = resolveConfiguredRedactPatterns();
|
||||
return redactSensitiveText(text, {
|
||||
mode: "tools",
|
||||
patterns: [
|
||||
...getDefaultRedactPatterns(),
|
||||
...configuredPatterns,
|
||||
...OBSERVATION_EXTRA_REDACT_PATTERNS,
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
function extractRequestId(text: string | undefined): string | undefined {
|
||||
if (!text) {
|
||||
return undefined;
|
||||
}
|
||||
const match = text.match(REQUEST_ID_RE);
|
||||
return match?.[1]?.trim() || undefined;
|
||||
}
|
||||
|
||||
function buildObservationFingerprint(params: {
|
||||
raw: string;
|
||||
requestId?: string;
|
||||
httpCode?: string;
|
||||
type?: string;
|
||||
message?: string;
|
||||
}): string | null {
|
||||
const boundedMessage =
|
||||
params.message && params.message.length > MAX_FINGERPRINT_MESSAGE_CHARS
|
||||
? params.message.slice(0, MAX_FINGERPRINT_MESSAGE_CHARS)
|
||||
: params.message;
|
||||
const structured =
|
||||
params.httpCode || params.type || boundedMessage
|
||||
? stableStringify({
|
||||
httpCode: params.httpCode,
|
||||
type: params.type,
|
||||
message: boundedMessage,
|
||||
})
|
||||
: null;
|
||||
if (structured) {
|
||||
return structured;
|
||||
}
|
||||
if (params.requestId) {
|
||||
return params.raw.split(params.requestId).join("<request_id>");
|
||||
}
|
||||
return getApiErrorPayloadFingerprint(params.raw);
|
||||
}
|
||||
|
||||
export function buildApiErrorObservationFields(rawError?: string): {
|
||||
rawErrorPreview?: string;
|
||||
rawErrorHash?: string;
|
||||
rawErrorFingerprint?: string;
|
||||
httpCode?: string;
|
||||
providerErrorType?: string;
|
||||
providerErrorMessagePreview?: string;
|
||||
requestIdHash?: string;
|
||||
} {
|
||||
const trimmed = boundObservationInput(rawError);
|
||||
if (!trimmed) {
|
||||
return {};
|
||||
}
|
||||
try {
|
||||
const parsed = parseApiErrorInfo(trimmed);
|
||||
const requestId = parsed?.requestId ?? extractRequestId(trimmed);
|
||||
const requestIdHash = requestId ? redactIdentifier(requestId, { len: 12 }) : undefined;
|
||||
const rawFingerprint = buildObservationFingerprint({
|
||||
raw: trimmed,
|
||||
requestId,
|
||||
httpCode: parsed?.httpCode,
|
||||
type: parsed?.type,
|
||||
message: parsed?.message,
|
||||
});
|
||||
const redactedRawPreview = replaceRequestIdPreview(redactObservationText(trimmed), requestId);
|
||||
const redactedProviderMessage = replaceRequestIdPreview(
|
||||
redactObservationText(parsed?.message),
|
||||
requestId,
|
||||
);
|
||||
|
||||
return {
|
||||
rawErrorPreview: truncateForObservation(redactedRawPreview, RAW_ERROR_PREVIEW_MAX_CHARS),
|
||||
rawErrorHash: redactIdentifier(trimmed, { len: 12 }),
|
||||
rawErrorFingerprint: rawFingerprint
|
||||
? redactIdentifier(rawFingerprint, { len: 12 })
|
||||
: undefined,
|
||||
httpCode: parsed?.httpCode,
|
||||
providerErrorType: parsed?.type,
|
||||
providerErrorMessagePreview: truncateForObservation(
|
||||
redactedProviderMessage,
|
||||
PROVIDER_ERROR_PREVIEW_MAX_CHARS,
|
||||
),
|
||||
requestIdHash,
|
||||
};
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
export function buildTextObservationFields(text?: string): {
|
||||
textPreview?: string;
|
||||
textHash?: string;
|
||||
textFingerprint?: string;
|
||||
httpCode?: string;
|
||||
providerErrorType?: string;
|
||||
providerErrorMessagePreview?: string;
|
||||
requestIdHash?: string;
|
||||
} {
|
||||
const observed = buildApiErrorObservationFields(text);
|
||||
return {
|
||||
textPreview: observed.rawErrorPreview,
|
||||
textHash: observed.rawErrorHash,
|
||||
textFingerprint: observed.rawErrorFingerprint,
|
||||
httpCode: observed.httpCode,
|
||||
providerErrorType: observed.providerErrorType,
|
||||
providerErrorMessagePreview: observed.providerErrorMessagePreview,
|
||||
requestIdHash: observed.requestIdHash,
|
||||
};
|
||||
}
|
||||
@@ -61,7 +61,6 @@ import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import { resolveModel } from "./model.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
||||
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
|
||||
import {
|
||||
@@ -1227,26 +1226,11 @@ export async function runEmbeddedPiAgent(
|
||||
reason: promptProfileFailureReason,
|
||||
});
|
||||
const promptFailoverFailure = isFailoverErrorMessage(errorText);
|
||||
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
|
||||
const failedPromptProfileId = lastProfileId;
|
||||
const logPromptFailoverDecision = createFailoverDecisionLogger({
|
||||
stage: "prompt",
|
||||
runId: params.runId,
|
||||
rawError: errorText,
|
||||
failoverReason: promptFailoverReason,
|
||||
profileFailureReason: promptProfileFailureReason,
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: failedPromptProfileId,
|
||||
fallbackConfigured,
|
||||
aborted,
|
||||
});
|
||||
if (
|
||||
promptFailoverFailure &&
|
||||
promptFailoverReason !== "timeout" &&
|
||||
(await advanceAuthProfile())
|
||||
) {
|
||||
logPromptFailoverDecision("rotate_profile");
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
continue;
|
||||
}
|
||||
@@ -1265,20 +1249,15 @@ export async function runEmbeddedPiAgent(
|
||||
// are configured so outer model fallback can continue on overload,
|
||||
// rate-limit, auth, or billing failures.
|
||||
if (fallbackConfigured && promptFailoverFailure) {
|
||||
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
|
||||
logPromptFailoverDecision("fallback_model", { status });
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
throw new FailoverError(errorText, {
|
||||
reason: promptFailoverReason ?? "unknown",
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: lastProfileId,
|
||||
status,
|
||||
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
|
||||
});
|
||||
}
|
||||
if (promptFailoverFailure || promptFailoverReason) {
|
||||
logPromptFailoverDecision("surface_error");
|
||||
}
|
||||
throw promptError;
|
||||
}
|
||||
|
||||
@@ -1303,21 +1282,6 @@ export async function runEmbeddedPiAgent(
|
||||
resolveAuthProfileFailureReason(assistantFailoverReason);
|
||||
const cloudCodeAssistFormatError = attempt.cloudCodeAssistFormatError;
|
||||
const imageDimensionError = parseImageDimensionError(lastAssistant?.errorMessage ?? "");
|
||||
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
|
||||
const failedAssistantProfileId = lastProfileId;
|
||||
const logAssistantFailoverDecision = createFailoverDecisionLogger({
|
||||
stage: "assistant",
|
||||
runId: params.runId,
|
||||
rawError: lastAssistant?.errorMessage?.trim(),
|
||||
failoverReason: assistantFailoverReason,
|
||||
profileFailureReason: assistantProfileFailureReason,
|
||||
provider: activeErrorContext.provider,
|
||||
model: activeErrorContext.model,
|
||||
profileId: failedAssistantProfileId,
|
||||
fallbackConfigured,
|
||||
timedOut,
|
||||
aborted,
|
||||
});
|
||||
|
||||
if (
|
||||
authFailure &&
|
||||
@@ -1375,7 +1339,6 @@ export async function runEmbeddedPiAgent(
|
||||
|
||||
const rotated = await advanceAuthProfile();
|
||||
if (rotated) {
|
||||
logAssistantFailoverDecision("rotate_profile");
|
||||
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
|
||||
continue;
|
||||
}
|
||||
@@ -1408,7 +1371,6 @@ export async function runEmbeddedPiAgent(
|
||||
const status =
|
||||
resolveFailoverStatus(assistantFailoverReason ?? "unknown") ??
|
||||
(isTimeoutErrorMessage(message) ? 408 : undefined);
|
||||
logAssistantFailoverDecision("fallback_model", { status });
|
||||
throw new FailoverError(message, {
|
||||
reason: assistantFailoverReason ?? "unknown",
|
||||
provider: activeErrorContext.provider,
|
||||
@@ -1417,7 +1379,6 @@ export async function runEmbeddedPiAgent(
|
||||
status,
|
||||
});
|
||||
}
|
||||
logAssistantFailoverDecision("surface_error");
|
||||
}
|
||||
|
||||
const usage = toNormalizedUsage(usageAccumulator);
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeFailoverDecisionObservationBase } from "./failover-observation.js";
|
||||
|
||||
describe("normalizeFailoverDecisionObservationBase", () => {
|
||||
it("fills timeout observation reasons for deadline timeouts without provider error text", () => {
|
||||
expect(
|
||||
normalizeFailoverDecisionObservationBase({
|
||||
stage: "assistant",
|
||||
runId: "run:timeout",
|
||||
rawError: "",
|
||||
failoverReason: null,
|
||||
profileFailureReason: null,
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
profileId: "openai:p1",
|
||||
fallbackConfigured: false,
|
||||
timedOut: true,
|
||||
aborted: false,
|
||||
}),
|
||||
).toMatchObject({
|
||||
failoverReason: "timeout",
|
||||
profileFailureReason: "timeout",
|
||||
timedOut: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves explicit failover reasons", () => {
|
||||
expect(
|
||||
normalizeFailoverDecisionObservationBase({
|
||||
stage: "assistant",
|
||||
runId: "run:overloaded",
|
||||
rawError: '{"error":{"type":"overloaded_error"}}',
|
||||
failoverReason: "overloaded",
|
||||
profileFailureReason: "overloaded",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
profileId: "openai:p1",
|
||||
fallbackConfigured: true,
|
||||
timedOut: true,
|
||||
aborted: false,
|
||||
}),
|
||||
).toMatchObject({
|
||||
failoverReason: "overloaded",
|
||||
profileFailureReason: "overloaded",
|
||||
timedOut: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,76 +0,0 @@
|
||||
import { redactIdentifier } from "../../../logging/redact-identifier.js";
|
||||
import type { AuthProfileFailureReason } from "../../auth-profiles.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "../../pi-embedded-error-observation.js";
|
||||
import type { FailoverReason } from "../../pi-embedded-helpers.js";
|
||||
import { log } from "../logger.js";
|
||||
|
||||
export type FailoverDecisionLoggerInput = {
|
||||
stage: "prompt" | "assistant";
|
||||
decision: "rotate_profile" | "fallback_model" | "surface_error";
|
||||
runId?: string;
|
||||
rawError?: string;
|
||||
failoverReason: FailoverReason | null;
|
||||
profileFailureReason?: AuthProfileFailureReason | null;
|
||||
provider: string;
|
||||
model: string;
|
||||
profileId?: string;
|
||||
fallbackConfigured: boolean;
|
||||
timedOut?: boolean;
|
||||
aborted?: boolean;
|
||||
status?: number;
|
||||
};
|
||||
|
||||
export type FailoverDecisionLoggerBase = Omit<FailoverDecisionLoggerInput, "decision" | "status">;
|
||||
|
||||
export function normalizeFailoverDecisionObservationBase(
|
||||
base: FailoverDecisionLoggerBase,
|
||||
): FailoverDecisionLoggerBase {
|
||||
return {
|
||||
...base,
|
||||
failoverReason: base.failoverReason ?? (base.timedOut ? "timeout" : null),
|
||||
profileFailureReason: base.profileFailureReason ?? (base.timedOut ? "timeout" : null),
|
||||
};
|
||||
}
|
||||
|
||||
export function createFailoverDecisionLogger(
|
||||
base: FailoverDecisionLoggerBase,
|
||||
): (
|
||||
decision: FailoverDecisionLoggerInput["decision"],
|
||||
extra?: Pick<FailoverDecisionLoggerInput, "status">,
|
||||
) => void {
|
||||
const normalizedBase = normalizeFailoverDecisionObservationBase(base);
|
||||
const safeProfileId = normalizedBase.profileId
|
||||
? redactIdentifier(normalizedBase.profileId, { len: 12 })
|
||||
: undefined;
|
||||
const safeRunId = sanitizeForConsole(normalizedBase.runId) ?? "-";
|
||||
const safeProvider = sanitizeForConsole(normalizedBase.provider) ?? "-";
|
||||
const safeModel = sanitizeForConsole(normalizedBase.model) ?? "-";
|
||||
const profileText = safeProfileId ?? "-";
|
||||
const reasonText = normalizedBase.failoverReason ?? "none";
|
||||
return (decision, extra) => {
|
||||
const observedError = buildApiErrorObservationFields(normalizedBase.rawError);
|
||||
log.warn("embedded run failover decision", {
|
||||
event: "embedded_run_failover_decision",
|
||||
tags: ["error_handling", "failover", normalizedBase.stage, decision],
|
||||
runId: normalizedBase.runId,
|
||||
stage: normalizedBase.stage,
|
||||
decision,
|
||||
failoverReason: normalizedBase.failoverReason,
|
||||
profileFailureReason: normalizedBase.profileFailureReason,
|
||||
provider: normalizedBase.provider,
|
||||
model: normalizedBase.model,
|
||||
profileId: safeProfileId,
|
||||
fallbackConfigured: normalizedBase.fallbackConfigured,
|
||||
timedOut: normalizedBase.timedOut,
|
||||
aborted: normalizedBase.aborted,
|
||||
status: extra?.status,
|
||||
...observedError,
|
||||
consoleMessage:
|
||||
`embedded run failover decision: runId=${safeRunId} stage=${normalizedBase.stage} decision=${decision} ` +
|
||||
`reason=${reasonText} provider=${safeProvider}/${safeModel} profile=${profileText}`,
|
||||
});
|
||||
};
|
||||
}
|
||||
@@ -54,13 +54,8 @@ describe("handleAgentEnd", () => {
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
runId: "run-1",
|
||||
error: "connection refused",
|
||||
rawErrorPreview: "connection refused",
|
||||
});
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("runId=run-1");
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("error=connection refused");
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
@@ -70,59 +65,6 @@ describe("handleAgentEnd", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("attaches raw provider error metadata without changing the console message", () => {
|
||||
const ctx = createContext({
|
||||
role: "assistant",
|
||||
stopReason: "error",
|
||||
provider: "anthropic",
|
||||
model: "claude-test",
|
||||
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
|
||||
content: [{ type: "text", text: "" }],
|
||||
});
|
||||
|
||||
handleAgentEnd(ctx);
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
runId: "run-1",
|
||||
error: "The AI service is temporarily overloaded. Please try again in a moment.",
|
||||
failoverReason: "overloaded",
|
||||
providerErrorType: "overloaded_error",
|
||||
});
|
||||
});
|
||||
|
||||
it("redacts logged error text before emitting lifecycle events", () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const ctx = createContext(
|
||||
{
|
||||
role: "assistant",
|
||||
stopReason: "error",
|
||||
errorMessage: "x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456",
|
||||
content: [{ type: "text", text: "" }],
|
||||
},
|
||||
{ onAgentEvent },
|
||||
);
|
||||
|
||||
handleAgentEnd(ctx);
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
error: "x-api-key: ***",
|
||||
rawErrorPreview: "x-api-key: ***",
|
||||
});
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: "x-api-key: ***",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps non-error run-end logging on debug only", () => {
|
||||
const ctx = createContext(undefined);
|
||||
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import { createInlineCodeState } from "../markdown/code-spans.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
buildTextObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "./pi-embedded-error-observation.js";
|
||||
import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
||||
import { isAssistantMessage } from "./pi-embedded-utils.js";
|
||||
|
||||
@@ -41,31 +36,16 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
||||
provider: lastAssistant.provider,
|
||||
model: lastAssistant.model,
|
||||
});
|
||||
const rawError = lastAssistant.errorMessage?.trim();
|
||||
const failoverReason = classifyFailoverReason(rawError ?? "");
|
||||
const errorText = (friendlyError || lastAssistant.errorMessage || "LLM request failed.").trim();
|
||||
const observedError = buildApiErrorObservationFields(rawError);
|
||||
const safeErrorText =
|
||||
buildTextObservationFields(errorText).textPreview ?? "LLM request failed.";
|
||||
const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-";
|
||||
ctx.log.warn("embedded run agent end", {
|
||||
event: "embedded_run_agent_end",
|
||||
tags: ["error_handling", "lifecycle", "agent_end", "assistant_error"],
|
||||
runId: ctx.params.runId,
|
||||
isError: true,
|
||||
error: safeErrorText,
|
||||
failoverReason,
|
||||
provider: lastAssistant.provider,
|
||||
model: lastAssistant.model,
|
||||
...observedError,
|
||||
consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true error=${safeErrorText}`,
|
||||
});
|
||||
ctx.log.warn(
|
||||
`embedded run agent end: runId=${ctx.params.runId} isError=true error=${errorText}`,
|
||||
);
|
||||
emitAgentEvent({
|
||||
runId: ctx.params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: safeErrorText,
|
||||
error: errorText,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
@@ -73,7 +53,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: safeErrorText,
|
||||
error: errorText,
|
||||
},
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -12,8 +12,8 @@ import type {
|
||||
import type { NormalizedUsage } from "./usage.js";
|
||||
|
||||
export type EmbeddedSubscribeLogger = {
|
||||
debug: (message: string, meta?: Record<string, unknown>) => void;
|
||||
warn: (message: string, meta?: Record<string, unknown>) => void;
|
||||
debug: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
};
|
||||
|
||||
export type ToolErrorSummary = {
|
||||
|
||||
@@ -16,7 +16,7 @@ export function registerCronCli(program: Command) {
|
||||
.addHelpText(
|
||||
"after",
|
||||
() =>
|
||||
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`,
|
||||
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`,
|
||||
);
|
||||
|
||||
registerCronStatusCommand(cron);
|
||||
|
||||
@@ -1,269 +0,0 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import * as noteModule from "../terminal/note.js";
|
||||
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
|
||||
|
||||
let tempRoot: string | null = null;
|
||||
|
||||
async function makeTempStorePath() {
|
||||
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
|
||||
return path.join(tempRoot, "cron", "jobs.json");
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
if (tempRoot) {
|
||||
await fs.rm(tempRoot, { recursive: true, force: true });
|
||||
tempRoot = null;
|
||||
}
|
||||
});
|
||||
|
||||
function makePrompter(confirmResult = true) {
|
||||
return {
|
||||
confirm: vi.fn().mockResolvedValue(confirmResult),
|
||||
};
|
||||
}
|
||||
|
||||
describe("maybeRepairLegacyCronStore", () => {
|
||||
it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
name: "Legacy job",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
},
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
const cfg: OpenClawConfig = {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
};
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg,
|
||||
options: {},
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
const [job] = persisted.jobs;
|
||||
expect(job?.jobId).toBeUndefined();
|
||||
expect(job?.id).toBe("legacy-job");
|
||||
expect(job?.notify).toBeUndefined();
|
||||
expect(job?.schedule).toMatchObject({
|
||||
kind: "cron",
|
||||
expr: "0 7 * * *",
|
||||
tz: "UTC",
|
||||
});
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-finished",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
});
|
||||
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Legacy cron job storage detected"),
|
||||
"Cron",
|
||||
);
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Cron store normalized"),
|
||||
"Doctor changes",
|
||||
);
|
||||
});
|
||||
|
||||
it("warns instead of replacing announce delivery for notify fallback jobs", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "notify-and-announce",
|
||||
name: "Notify and announce",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "Status" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "123" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: { nonInteractive: true },
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(persisted.jobs[0]?.notify).toBe(true);
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
|
||||
"Doctor warnings",
|
||||
);
|
||||
});
|
||||
|
||||
it("does not auto-repair in non-interactive mode without explicit repair approval", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
name: "Legacy job",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
},
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
const prompter = makePrompter(false);
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: { nonInteractive: true },
|
||||
prompter,
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(prompter.confirm).toHaveBeenCalledWith({
|
||||
message: "Repair legacy cron jobs now?",
|
||||
initialValue: true,
|
||||
});
|
||||
expect(persisted.jobs[0]?.jobId).toBe("legacy-job");
|
||||
expect(persisted.jobs[0]?.notify).toBe(true);
|
||||
expect(noteSpy).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("Cron store normalized"),
|
||||
"Doctor changes",
|
||||
);
|
||||
});
|
||||
|
||||
it("migrates notify fallback none delivery jobs to cron.webhook", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "notify-none",
|
||||
name: "Notify none",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Status",
|
||||
},
|
||||
delivery: { mode: "none", to: "123456789" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: {},
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(persisted.jobs[0]?.notify).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.delivery).toMatchObject({
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-finished",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,183 +0,0 @@
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { normalizeStoredCronJobs } from "../cron/store-migration.js";
|
||||
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
|
||||
import type { CronJob } from "../cron/types.js";
|
||||
import { note } from "../terminal/note.js";
|
||||
import { shortenHomePath } from "../utils.js";
|
||||
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";
|
||||
|
||||
type CronDoctorOutcome = {
|
||||
changed: boolean;
|
||||
warnings: string[];
|
||||
};
|
||||
|
||||
function pluralize(count: number, noun: string) {
|
||||
return `${count} ${noun}${count === 1 ? "" : "s"}`;
|
||||
}
|
||||
|
||||
function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): string[] {
|
||||
const lines: string[] = [];
|
||||
if (issues.jobId) {
|
||||
lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``);
|
||||
}
|
||||
if (issues.legacyScheduleString) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyScheduleCron) {
|
||||
lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``);
|
||||
}
|
||||
if (issues.legacyPayloadKind) {
|
||||
lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`);
|
||||
}
|
||||
if (issues.legacyPayloadProvider) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyTopLevelPayloadFields) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyTopLevelDeliveryFields) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyDeliveryMode) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
|
||||
);
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
|
||||
function trimString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim() ? value.trim() : undefined;
|
||||
}
|
||||
|
||||
function migrateLegacyNotifyFallback(params: {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
legacyWebhook?: string;
|
||||
}): CronDoctorOutcome {
|
||||
let changed = false;
|
||||
const warnings: string[] = [];
|
||||
|
||||
for (const raw of params.jobs) {
|
||||
if (!("notify" in raw)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const jobName = trimString(raw.name) ?? trimString(raw.id) ?? "<unnamed>";
|
||||
const notify = raw.notify === true;
|
||||
if (!notify) {
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const delivery =
|
||||
raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery)
|
||||
? (raw.delivery as Record<string, unknown>)
|
||||
: null;
|
||||
const mode = trimString(delivery?.mode)?.toLowerCase();
|
||||
const to = trimString(delivery?.to);
|
||||
|
||||
if (mode === "webhook" && to) {
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) {
|
||||
raw.delivery = {
|
||||
...delivery,
|
||||
mode: "webhook",
|
||||
to: mode === "none" ? params.legacyWebhook : (to ?? params.legacyWebhook),
|
||||
};
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!params.legacyWebhook) {
|
||||
warnings.push(
|
||||
`Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
warnings.push(
|
||||
`Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`,
|
||||
);
|
||||
}
|
||||
|
||||
return { changed, warnings };
|
||||
}
|
||||
|
||||
export async function maybeRepairLegacyCronStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
options: DoctorOptions;
|
||||
prompter: Pick<DoctorPrompter, "confirm">;
|
||||
}) {
|
||||
const storePath = resolveCronStorePath(params.cfg.cron?.store);
|
||||
const store = await loadCronStore(storePath);
|
||||
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
if (rawJobs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const normalized = normalizeStoredCronJobs(rawJobs);
|
||||
const legacyWebhook = trimString(params.cfg.cron?.webhook);
|
||||
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
|
||||
const previewLines = formatLegacyIssuePreview(normalized.issues);
|
||||
if (notifyCount > 0) {
|
||||
previewLines.push(
|
||||
`- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`,
|
||||
);
|
||||
}
|
||||
if (previewLines.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
note(
|
||||
[
|
||||
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
|
||||
...previewLines,
|
||||
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
|
||||
].join("\n"),
|
||||
"Cron",
|
||||
);
|
||||
|
||||
const shouldRepair = await params.prompter.confirm({
|
||||
message: "Repair legacy cron jobs now?",
|
||||
initialValue: true,
|
||||
});
|
||||
if (!shouldRepair) {
|
||||
return;
|
||||
}
|
||||
|
||||
const notifyMigration = migrateLegacyNotifyFallback({
|
||||
jobs: rawJobs,
|
||||
legacyWebhook,
|
||||
});
|
||||
const changed = normalized.mutated || notifyMigration.changed;
|
||||
if (!changed && notifyMigration.warnings.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
await saveCronStore(storePath, {
|
||||
version: 1,
|
||||
jobs: rawJobs as unknown as CronJob[],
|
||||
});
|
||||
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
|
||||
}
|
||||
|
||||
if (notifyMigration.warnings.length > 0) {
|
||||
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
|
||||
}
|
||||
}
|
||||
@@ -31,7 +31,6 @@ import {
|
||||
import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js";
|
||||
import { doctorShellCompletion } from "./doctor-completion.js";
|
||||
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
|
||||
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
|
||||
import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js";
|
||||
import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js";
|
||||
import {
|
||||
@@ -221,11 +220,6 @@ export async function doctorCommand(
|
||||
|
||||
await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH);
|
||||
await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair });
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg,
|
||||
options,
|
||||
prompter,
|
||||
});
|
||||
|
||||
cfg = await maybeRepairSandboxImages(cfg, runtime, prompter);
|
||||
noteSandboxScopeWarnings(cfg);
|
||||
|
||||
@@ -1,143 +0,0 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
resolveDeliveryTarget: vi.fn(),
|
||||
deliverOutboundPayloads: vi.fn(),
|
||||
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }),
|
||||
buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }),
|
||||
createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }),
|
||||
warn: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./isolated-agent/delivery-target.js", () => ({
|
||||
resolveDeliveryTarget: mocks.resolveDeliveryTarget,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/deliver.js", () => ({
|
||||
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/identity.js", () => ({
|
||||
resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/session-context.js", () => ({
|
||||
buildOutboundSessionContext: mocks.buildOutboundSessionContext,
|
||||
}));
|
||||
|
||||
vi.mock("../cli/outbound-send-deps.js", () => ({
|
||||
createOutboundSendDeps: mocks.createOutboundSendDeps,
|
||||
}));
|
||||
|
||||
vi.mock("../logging.js", () => ({
|
||||
getChildLogger: vi.fn(() => ({
|
||||
warn: mocks.warn,
|
||||
})),
|
||||
}));
|
||||
|
||||
const { sendFailureNotificationAnnounce } = await import("./delivery.js");
|
||||
|
||||
describe("sendFailureNotificationAnnounce", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mocks.resolveDeliveryTarget.mockResolvedValue({
|
||||
ok: true,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
threadId: 42,
|
||||
mode: "explicit",
|
||||
});
|
||||
mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("delivers failure alerts to the resolved explicit target with strict send settings", async () => {
|
||||
const deps = {} as never;
|
||||
const cfg = {} as never;
|
||||
|
||||
await sendFailureNotificationAnnounce(
|
||||
deps,
|
||||
cfg,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123", accountId: "bot-a" },
|
||||
"Cron failed",
|
||||
);
|
||||
|
||||
expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", {
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
});
|
||||
expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
sessionKey: "cron:job-1:failure",
|
||||
});
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
threadId: 42,
|
||||
payloads: [{ text: "Cron failed" }],
|
||||
session: { kind: "session" },
|
||||
identity: { kind: "identity" },
|
||||
bestEffort: false,
|
||||
deps: { kind: "deps" },
|
||||
abortSignal: expect.any(AbortSignal),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not send when target resolution fails", async () => {
|
||||
mocks.resolveDeliveryTarget.mockResolvedValue({
|
||||
ok: false,
|
||||
error: new Error("target missing"),
|
||||
});
|
||||
|
||||
await sendFailureNotificationAnnounce(
|
||||
{} as never,
|
||||
{} as never,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123" },
|
||||
"Cron failed",
|
||||
);
|
||||
|
||||
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
|
||||
expect(mocks.warn).toHaveBeenCalledWith(
|
||||
{ error: "target missing" },
|
||||
"cron: failed to resolve failure destination target",
|
||||
);
|
||||
});
|
||||
|
||||
it("swallows outbound delivery errors after logging", async () => {
|
||||
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed"));
|
||||
|
||||
await expect(
|
||||
sendFailureNotificationAnnounce(
|
||||
{} as never,
|
||||
{} as never,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123" },
|
||||
"Cron failed",
|
||||
),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(mocks.warn).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
err: "send failed",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
}),
|
||||
"cron: failure destination announce failed",
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -148,46 +148,6 @@ describe("resolveFailureDestination", () => {
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when webhook failure destination matches the primary webhook target", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
sessionTarget: "main",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
delivery: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron",
|
||||
},
|
||||
},
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("does not reuse inherited announce recipient when switching failure destination to webhook", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "111",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
},
|
||||
},
|
||||
}),
|
||||
{
|
||||
channel: "signal",
|
||||
to: "group-abc",
|
||||
mode: "announce",
|
||||
},
|
||||
);
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("allows job-level failure destination fields to clear inherited global values", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
|
||||
@@ -54,7 +54,6 @@ export async function runTelegramAnnounceTurn(params: {
|
||||
to?: string;
|
||||
bestEffort?: boolean;
|
||||
};
|
||||
deliveryContract?: "cron-owned" | "shared";
|
||||
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
|
||||
return runCronIsolatedAgentTurn({
|
||||
cfg: makeCfg(params.home, params.storePath, {
|
||||
@@ -68,6 +67,5 @@ export async function runTelegramAnnounceTurn(params: {
|
||||
message: "do it",
|
||||
sessionKey: "cron:job-1",
|
||||
lane: "cron",
|
||||
deliveryContract: params.deliveryContract,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ async function runExplicitTelegramAnnounceTurn(params: {
|
||||
home: string;
|
||||
storePath: string;
|
||||
deps: CliDeps;
|
||||
deliveryContract?: "cron-owned" | "shared";
|
||||
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
|
||||
return runTelegramAnnounceTurn({
|
||||
...params,
|
||||
@@ -302,7 +301,6 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
home,
|
||||
storePath,
|
||||
deps,
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
|
||||
expectDeliveredOk(res);
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
* returning so the timer correctly skips the system-event fallback.
|
||||
*/
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// --- Module mocks (must be hoisted before imports) ---
|
||||
|
||||
@@ -105,6 +105,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
|
||||
resolvedDelivery,
|
||||
deliveryRequested: overrides.deliveryRequested ?? true,
|
||||
skipHeartbeatDelivery: false,
|
||||
skipMessagingToolDelivery: false,
|
||||
deliveryBestEffort: false,
|
||||
deliveryPayloadHasStructuredContent: false,
|
||||
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
|
||||
@@ -133,10 +134,6 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
|
||||
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
|
||||
@@ -258,42 +255,6 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("retries transient direct announce failures before succeeding", async () => {
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads)
|
||||
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
|
||||
.mockResolvedValueOnce([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Retry me once." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(state.result).toBeUndefined();
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("does not retry permanent direct announce failures", async () => {
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found"));
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "This should fail once." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
expect(state.result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: "Error: chat not found",
|
||||
deliveryAttempted: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
|
||||
const params = makeBaseParams({
|
||||
synthesizedText: "Task done.",
|
||||
|
||||
@@ -96,13 +96,4 @@ describe("resolveCronDeliveryBestEffort", () => {
|
||||
} as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
|
||||
});
|
||||
|
||||
it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => {
|
||||
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
|
||||
const job = {
|
||||
delivery: { bestEffort: false },
|
||||
payload: { kind: "agentTurn", bestEffortDeliver: true },
|
||||
} as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = {
|
||||
resolvedDelivery: DeliveryTargetResolution;
|
||||
deliveryRequested: boolean;
|
||||
skipHeartbeatDelivery: boolean;
|
||||
skipMessagingToolDelivery?: boolean;
|
||||
skipMessagingToolDelivery: boolean;
|
||||
deliveryBestEffort: boolean;
|
||||
deliveryPayloadHasStructuredContent: boolean;
|
||||
deliveryPayloads: ReplyPayload[];
|
||||
@@ -192,17 +192,15 @@ async function retryTransientDirectCronDelivery<T>(params: {
|
||||
export async function dispatchCronDelivery(
|
||||
params: DispatchCronDeliveryParams,
|
||||
): Promise<DispatchCronDeliveryState> {
|
||||
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
|
||||
let summary = params.summary;
|
||||
let outputText = params.outputText;
|
||||
let synthesizedText = params.synthesizedText;
|
||||
let deliveryPayloads = params.deliveryPayloads;
|
||||
|
||||
// Shared callers can treat a matching message-tool send as the completed
|
||||
// delivery path. Cron-owned callers keep this false so direct cron delivery
|
||||
// remains the only source of delivered state.
|
||||
let delivered = skipMessagingToolDelivery;
|
||||
let deliveryAttempted = skipMessagingToolDelivery;
|
||||
// `true` means we confirmed at least one outbound send reached the target.
|
||||
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||
let delivered = params.skipMessagingToolDelivery;
|
||||
let deliveryAttempted = params.skipMessagingToolDelivery;
|
||||
const failDeliveryTarget = (error: string) =>
|
||||
params.withRunSession({
|
||||
status: "error",
|
||||
@@ -406,7 +404,11 @@ export async function dispatchCronDelivery(
|
||||
}
|
||||
};
|
||||
|
||||
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
||||
if (
|
||||
params.deliveryRequested &&
|
||||
!params.skipHeartbeatDelivery &&
|
||||
!params.skipMessagingToolDelivery
|
||||
) {
|
||||
if (!params.resolvedDelivery.ok) {
|
||||
if (!params.deliveryBestEffort) {
|
||||
return {
|
||||
|
||||
@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
restoreFastTestEnv(previousFastTestEnv);
|
||||
});
|
||||
|
||||
it('disables the message tool when delivery.mode is "none"', async () => {
|
||||
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
|
||||
mockFallbackPassthrough();
|
||||
resolveCronDeliveryPlanMock.mockReturnValue({
|
||||
requested: false,
|
||||
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
await runCronIsolatedAgentTurn(makeParams());
|
||||
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
|
||||
});
|
||||
|
||||
it("disables the message tool when cron delivery is active", async () => {
|
||||
@@ -82,20 +82,4 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
|
||||
mockFallbackPassthrough();
|
||||
resolveCronDeliveryPlanMock.mockReturnValue({
|
||||
requested: false,
|
||||
mode: "none",
|
||||
});
|
||||
|
||||
await runCronIsolatedAgentTurn({
|
||||
...makeParams(),
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -78,10 +78,11 @@ export type RunCronAgentTurnResult = {
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
/**
|
||||
* `true` when the isolated runner already handled the run's user-visible
|
||||
* delivery outcome. Cron-owned callers use this for cron delivery or
|
||||
* explicit suppression; shared callers may also use it for a matching
|
||||
* message-tool send that already reached the target.
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (via outbound payloads, the subagent announce flow, or a matching
|
||||
* messaging-tool send). Callers should skip posting a summary to the main
|
||||
* session to avoid duplicate
|
||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
/**
|
||||
@@ -143,22 +144,16 @@ function buildCronAgentDefaultsConfig(params: {
|
||||
|
||||
type ResolvedCronDeliveryTarget = Awaited<ReturnType<typeof resolveDeliveryTarget>>;
|
||||
|
||||
type IsolatedDeliveryContract = "cron-owned" | "shared";
|
||||
|
||||
function resolveCronToolPolicy(params: {
|
||||
deliveryRequested: boolean;
|
||||
resolvedDelivery: ResolvedCronDeliveryTarget;
|
||||
deliveryContract: IsolatedDeliveryContract;
|
||||
}) {
|
||||
return {
|
||||
// Only enforce an explicit message target when the cron delivery target
|
||||
// was successfully resolved. When resolution fails the agent should not
|
||||
// be blocked by a target it cannot satisfy (#27898).
|
||||
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
|
||||
// Cron-owned runs always route user-facing delivery through the runner
|
||||
// itself. Shared callers keep the previous behavior so non-cron paths do
|
||||
// not silently lose the message tool when no explicit delivery is active.
|
||||
disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested,
|
||||
disableMessageTool: params.deliveryRequested,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -166,7 +161,6 @@ async function resolveCronDeliveryContext(params: {
|
||||
cfg: OpenClawConfig;
|
||||
job: CronJob;
|
||||
agentId: string;
|
||||
deliveryContract: IsolatedDeliveryContract;
|
||||
}) {
|
||||
const deliveryPlan = resolveCronDeliveryPlan(params.job);
|
||||
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
|
||||
@@ -182,7 +176,6 @@ async function resolveCronDeliveryContext(params: {
|
||||
toolPolicy: resolveCronToolPolicy({
|
||||
deliveryRequested: deliveryPlan.requested,
|
||||
resolvedDelivery,
|
||||
deliveryContract: params.deliveryContract,
|
||||
}),
|
||||
};
|
||||
}
|
||||
@@ -207,7 +200,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
lane?: string;
|
||||
deliveryContract?: IsolatedDeliveryContract;
|
||||
}): Promise<RunCronAgentTurnResult> {
|
||||
const abortSignal = params.abortSignal ?? params.signal;
|
||||
const isAborted = () => abortSignal?.aborted === true;
|
||||
@@ -218,7 +210,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
: "cron: job execution timed out";
|
||||
};
|
||||
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
|
||||
const deliveryContract = params.deliveryContract ?? "cron-owned";
|
||||
const defaultAgentId = resolveDefaultAgentId(params.cfg);
|
||||
const requestedAgentId =
|
||||
typeof params.agentId === "string" && params.agentId.trim()
|
||||
@@ -434,7 +425,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
cfg: cfgWithAgentDefaults,
|
||||
job: params.job,
|
||||
agentId,
|
||||
deliveryContract,
|
||||
});
|
||||
|
||||
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
|
||||
@@ -817,7 +807,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
|
||||
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
|
||||
const skipMessagingToolDelivery =
|
||||
deliveryContract === "shared" &&
|
||||
deliveryRequested &&
|
||||
finalRunResult.didSendViaMessagingTool === true &&
|
||||
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
|
||||
@@ -827,6 +816,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
accountId: resolvedDelivery.accountId,
|
||||
}),
|
||||
);
|
||||
|
||||
const deliveryResult = await dispatchCronDelivery({
|
||||
cfg: params.cfg,
|
||||
cfgWithAgentDefaults,
|
||||
|
||||
@@ -47,12 +47,8 @@ describe("isLikelyInterimCronMessage", () => {
|
||||
false,
|
||||
);
|
||||
});
|
||||
it("does not treat empty as interim (empty = NO_REPLY was stripped)", () => {
|
||||
expect(isLikelyInterimCronMessage("")).toBe(false);
|
||||
});
|
||||
|
||||
it("does not treat whitespace-only as interim", () => {
|
||||
expect(isLikelyInterimCronMessage(" ")).toBe(false);
|
||||
it("treats empty as interim", () => {
|
||||
expect(isLikelyInterimCronMessage("")).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -42,10 +42,7 @@ function normalizeHintText(value: string): string {
|
||||
export function isLikelyInterimCronMessage(value: string): boolean {
|
||||
const normalized = normalizeHintText(value);
|
||||
if (!normalized) {
|
||||
// Empty text after payload filtering means the agent either returned
|
||||
// NO_REPLY (deliberately silent) or produced no deliverable content.
|
||||
// Do not treat this as an interim acknowledgement that needs a rerun.
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
const words = normalized.split(" ").filter(Boolean).length;
|
||||
return words <= 45 && INTERIM_CRON_HINTS.some((hint) => normalized.includes(hint));
|
||||
|
||||
@@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => {
|
||||
it("treats delivery object without mode as announce", async () => {
|
||||
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
|
||||
const job = await addIsolatedAgentTurnJob(cron, {
|
||||
name: "partial-delivery",
|
||||
@@ -96,8 +96,10 @@ describe("CronService delivery plan consistency", () => {
|
||||
|
||||
const result = await cron.run(job.id, "force");
|
||||
expect(result).toEqual({ ok: true, ran: true });
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown");
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith(
|
||||
"Cron: done",
|
||||
expect.objectContaining({ agentId: undefined }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not revive legacy main-session relay for real cron summaries", async () => {
|
||||
it("still enqueues real cron summaries as system events", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
@@ -109,7 +109,10 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
|
||||
|
||||
await runScheduledCron(cron);
|
||||
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
// Real summaries SHOULD be enqueued.
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Weather update"),
|
||||
expect.objectContaining({ agentId: undefined }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -620,14 +620,14 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("runs an isolated job without posting a fallback summary to main", async () => {
|
||||
it("runs an isolated job and posts summary to main", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
|
||||
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
|
||||
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
|
||||
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
@@ -685,7 +685,7 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("does not post a fallback main summary when an isolated job errors", async () => {
|
||||
it("posts last output to main even when isolated job errors", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "error" as const,
|
||||
summary: "last output",
|
||||
@@ -700,8 +700,8 @@ describe("CronService", () => {
|
||||
status: "error",
|
||||
});
|
||||
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,10 +1,161 @@
|
||||
import fs from "node:fs";
|
||||
import { normalizeStoredCronJobs } from "../store-migration.js";
|
||||
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
||||
import { coerceFiniteScheduleNumber } from "../schedule.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
function normalizePayloadKind(payload: Record<string, unknown>) {
|
||||
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
|
||||
if (raw === "agentturn") {
|
||||
payload.kind = "agentTurn";
|
||||
return true;
|
||||
}
|
||||
if (raw === "systemevent") {
|
||||
payload.kind = "systemEvent";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function inferPayloadIfMissing(raw: Record<string, unknown>) {
|
||||
const message = typeof raw.message === "string" ? raw.message.trim() : "";
|
||||
const text = typeof raw.text === "string" ? raw.text.trim() : "";
|
||||
const command = typeof raw.command === "string" ? raw.command.trim() : "";
|
||||
if (message) {
|
||||
raw.payload = { kind: "agentTurn", message };
|
||||
return true;
|
||||
}
|
||||
if (text) {
|
||||
raw.payload = { kind: "systemEvent", text };
|
||||
return true;
|
||||
}
|
||||
if (command) {
|
||||
raw.payload = { kind: "systemEvent", text: command };
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function copyTopLevelAgentTurnFields(
|
||||
raw: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
let mutated = false;
|
||||
|
||||
const copyTrimmedString = (field: "model" | "thinking") => {
|
||||
const existing = payload[field];
|
||||
if (typeof existing === "string" && existing.trim()) {
|
||||
return;
|
||||
}
|
||||
const value = raw[field];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[field] = value.trim();
|
||||
mutated = true;
|
||||
}
|
||||
};
|
||||
copyTrimmedString("model");
|
||||
copyTrimmedString("thinking");
|
||||
|
||||
if (
|
||||
typeof payload.timeoutSeconds !== "number" &&
|
||||
typeof raw.timeoutSeconds === "number" &&
|
||||
Number.isFinite(raw.timeoutSeconds)
|
||||
) {
|
||||
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (
|
||||
typeof payload.allowUnsafeExternalContent !== "boolean" &&
|
||||
typeof raw.allowUnsafeExternalContent === "boolean"
|
||||
) {
|
||||
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
|
||||
payload.deliver = raw.deliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.channel !== "string" &&
|
||||
typeof raw.channel === "string" &&
|
||||
raw.channel.trim()
|
||||
) {
|
||||
payload.channel = raw.channel.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
|
||||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
) {
|
||||
payload.bestEffortDeliver = raw.bestEffortDeliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.provider !== "string" &&
|
||||
typeof raw.provider === "string" &&
|
||||
raw.provider.trim()
|
||||
) {
|
||||
payload.provider = raw.provider.trim();
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
||||
if ("model" in raw) {
|
||||
delete raw.model;
|
||||
}
|
||||
if ("thinking" in raw) {
|
||||
delete raw.thinking;
|
||||
}
|
||||
if ("timeoutSeconds" in raw) {
|
||||
delete raw.timeoutSeconds;
|
||||
}
|
||||
if ("allowUnsafeExternalContent" in raw) {
|
||||
delete raw.allowUnsafeExternalContent;
|
||||
}
|
||||
if ("message" in raw) {
|
||||
delete raw.message;
|
||||
}
|
||||
if ("text" in raw) {
|
||||
delete raw.text;
|
||||
}
|
||||
if ("deliver" in raw) {
|
||||
delete raw.deliver;
|
||||
}
|
||||
if ("channel" in raw) {
|
||||
delete raw.channel;
|
||||
}
|
||||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in raw) {
|
||||
delete raw.provider;
|
||||
}
|
||||
if ("command" in raw) {
|
||||
delete raw.command;
|
||||
}
|
||||
if ("timeout" in raw) {
|
||||
delete raw.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
try {
|
||||
const stats = await fs.promises.stat(path);
|
||||
@@ -34,7 +185,287 @@ export async function ensureLoaded(
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
const loaded = await loadCronStore(state.deps.storePath);
|
||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
const { mutated } = normalizeStoredCronJobs(jobs);
|
||||
let mutated = false;
|
||||
for (const raw of jobs) {
|
||||
const state = raw.state;
|
||||
if (!state || typeof state !== "object" || Array.isArray(state)) {
|
||||
raw.state = {};
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
|
||||
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
|
||||
if (!rawId && legacyJobId) {
|
||||
raw.id = legacyJobId;
|
||||
mutated = true;
|
||||
} else if (rawId && raw.id !== rawId) {
|
||||
raw.id = rawId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("jobId" in raw) {
|
||||
delete raw.jobId;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof raw.schedule === "string") {
|
||||
const expr = raw.schedule.trim();
|
||||
raw.schedule = { kind: "cron", expr };
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const nameRaw = raw.name;
|
||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||
raw.name = inferLegacyName({
|
||||
schedule: raw.schedule as never,
|
||||
payload: raw.payload as never,
|
||||
});
|
||||
mutated = true;
|
||||
} else {
|
||||
raw.name = nameRaw.trim();
|
||||
}
|
||||
|
||||
const desc = normalizeOptionalText(raw.description);
|
||||
if (raw.description !== desc) {
|
||||
raw.description = desc;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if ("sessionKey" in raw) {
|
||||
const sessionKey =
|
||||
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
|
||||
if (raw.sessionKey !== sessionKey) {
|
||||
raw.sessionKey = sessionKey;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof raw.enabled !== "boolean") {
|
||||
raw.enabled = true;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
|
||||
if (wakeModeRaw === "next-heartbeat") {
|
||||
if (raw.wakeMode !== "next-heartbeat") {
|
||||
raw.wakeMode = "next-heartbeat";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (wakeModeRaw === "now") {
|
||||
if (raw.wakeMode !== "now") {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
|
||||
inferPayloadIfMissing(raw)
|
||||
) {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadRecord =
|
||||
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
|
||||
? (raw.payload as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
if (payloadRecord) {
|
||||
if (normalizePayloadKind(payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
if (!payloadRecord.kind) {
|
||||
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
|
||||
payloadRecord.kind = "agentTurn";
|
||||
mutated = true;
|
||||
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
|
||||
payloadRecord.kind = "systemEvent";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
if (payloadRecord.kind === "agentTurn") {
|
||||
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const hadLegacyTopLevelFields =
|
||||
"model" in raw ||
|
||||
"thinking" in raw ||
|
||||
"timeoutSeconds" in raw ||
|
||||
"allowUnsafeExternalContent" in raw ||
|
||||
"message" in raw ||
|
||||
"text" in raw ||
|
||||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw ||
|
||||
"command" in raw ||
|
||||
"timeout" in raw;
|
||||
if (hadLegacyTopLevelFields) {
|
||||
stripLegacyTopLevelFields(raw);
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (payloadRecord) {
|
||||
if (migrateLegacyCronPayload(payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const schedule = raw.schedule;
|
||||
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
|
||||
const sched = schedule as Record<string, unknown>;
|
||||
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
|
||||
if (!kind && ("at" in sched || "atMs" in sched)) {
|
||||
sched.kind = "at";
|
||||
mutated = true;
|
||||
}
|
||||
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
|
||||
const atMsRaw = sched.atMs;
|
||||
const parsedAtMs =
|
||||
typeof atMsRaw === "number"
|
||||
? atMsRaw
|
||||
: typeof atMsRaw === "string"
|
||||
? parseAbsoluteTimeMs(atMsRaw)
|
||||
: atRaw
|
||||
? parseAbsoluteTimeMs(atRaw)
|
||||
: null;
|
||||
if (parsedAtMs !== null) {
|
||||
sched.at = new Date(parsedAtMs).toISOString();
|
||||
if ("atMs" in sched) {
|
||||
delete sched.atMs;
|
||||
}
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
? Math.max(0, Math.floor(raw.updatedAtMs))
|
||||
: null;
|
||||
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
|
||||
sched.anchorMs = normalizedAnchor;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
|
||||
let normalizedExpr = exprRaw;
|
||||
if (!normalizedExpr && legacyCronRaw) {
|
||||
normalizedExpr = legacyCronRaw;
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if ("cron" in sched) {
|
||||
delete sched.cron;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
|
||||
const modeRaw = (delivery as { mode?: unknown }).mode;
|
||||
if (typeof modeRaw === "string") {
|
||||
const lowered = modeRaw.trim().toLowerCase();
|
||||
if (lowered === "deliver") {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (modeRaw === undefined || modeRaw === null) {
|
||||
// Explicitly persist the default so existing jobs don't silently
|
||||
// change behaviour when the runtime default shifts.
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const isolation = raw.isolation;
|
||||
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
|
||||
delete raw.isolation;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadKind =
|
||||
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
|
||||
const normalizedSessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
|
||||
if (raw.sessionTarget !== normalizedSessionTarget) {
|
||||
raw.sessionTarget = normalizedSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
|
||||
if (raw.sessionTarget !== inferredSessionTarget) {
|
||||
raw.sessionTarget = inferredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const sessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
const isIsolatedAgentTurn =
|
||||
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
|
||||
payload: payloadRecord,
|
||||
});
|
||||
|
||||
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
if (!hasDelivery && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
} else if (!hasDelivery) {
|
||||
raw.delivery = { mode: "announce" };
|
||||
mutated = true;
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||
state.storeLoadedAtMs = state.deps.nowMs();
|
||||
state.storeFileMtimeMs = fileMtimeMs;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
|
||||
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
|
||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||
import type {
|
||||
CronDeliveryStatus,
|
||||
@@ -1136,6 +1138,46 @@ export async function executeJobCore(
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
// Post a short summary back to the main session only when announce
|
||||
// delivery was requested and we are confident no outbound delivery path
|
||||
// ran. If delivery was attempted but final ack is uncertain, suppress the
|
||||
// main summary to avoid duplicate user-facing sends.
|
||||
// See: https://github.com/openclaw/openclaw/issues/15692
|
||||
//
|
||||
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
|
||||
// are internal ack tokens that should never leak into user conversations.
|
||||
// See: https://github.com/openclaw/openclaw/issues/32013
|
||||
const summaryText = res.summary?.trim();
|
||||
const deliveryPlan = resolveCronDeliveryPlan(job);
|
||||
const suppressMainSummary =
|
||||
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
|
||||
if (
|
||||
shouldEnqueueCronMainSummary({
|
||||
summaryText,
|
||||
deliveryRequested: deliveryPlan.requested,
|
||||
delivered: res.delivered,
|
||||
deliveryAttempted: res.deliveryAttempted,
|
||||
suppressMainSummary,
|
||||
isCronSystemEvent,
|
||||
})
|
||||
) {
|
||||
const prefix = "Cron";
|
||||
const label =
|
||||
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
||||
state.deps.enqueueSystemEvent(label, {
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
contextKey: `cron:${job.id}`,
|
||||
});
|
||||
if (job.wakeMode === "now") {
|
||||
state.deps.requestHeartbeatNow({
|
||||
reason: `cron:${job.id}`,
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
status: res.status,
|
||||
error: res.error,
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeStoredCronJobs } from "./store-migration.js";
|
||||
|
||||
describe("normalizeStoredCronJobs", () => {
|
||||
it("normalizes legacy cron fields and reports migration issues", () => {
|
||||
const jobs = [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
|
||||
message: "say hi",
|
||||
model: "openai/gpt-4.1",
|
||||
deliver: true,
|
||||
provider: " TeLeGrAm ",
|
||||
to: "12345",
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues).toMatchObject({
|
||||
jobId: 1,
|
||||
legacyScheduleCron: 1,
|
||||
legacyTopLevelPayloadFields: 1,
|
||||
legacyTopLevelDeliveryFields: 1,
|
||||
});
|
||||
|
||||
const [job] = jobs;
|
||||
expect(job?.jobId).toBeUndefined();
|
||||
expect(job?.id).toBe("legacy-job");
|
||||
expect(job?.schedule).toMatchObject({
|
||||
kind: "cron",
|
||||
expr: "*/5 * * * *",
|
||||
tz: "UTC",
|
||||
});
|
||||
expect(job?.message).toBeUndefined();
|
||||
expect(job?.provider).toBeUndefined();
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "say hi",
|
||||
model: "openai/gpt-4.1",
|
||||
});
|
||||
});
|
||||
|
||||
it("normalizes payload provider alias into channel", () => {
|
||||
const jobs = [
|
||||
{
|
||||
id: "legacy-provider",
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
provider: " Slack ",
|
||||
},
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues.legacyPayloadProvider).toBe(1);
|
||||
expect(jobs[0]?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
});
|
||||
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
|
||||
expect(payload?.provider).toBeUndefined();
|
||||
expect(jobs[0]?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "slack",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,491 +0,0 @@
|
||||
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "./parse.js";
|
||||
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||
import { coerceFiniteScheduleNumber } from "./schedule.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
|
||||
|
||||
type CronStoreIssueKey =
|
||||
| "jobId"
|
||||
| "legacyScheduleString"
|
||||
| "legacyScheduleCron"
|
||||
| "legacyPayloadKind"
|
||||
| "legacyPayloadProvider"
|
||||
| "legacyTopLevelPayloadFields"
|
||||
| "legacyTopLevelDeliveryFields"
|
||||
| "legacyDeliveryMode";
|
||||
|
||||
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
|
||||
|
||||
type NormalizeCronStoreJobsResult = {
|
||||
issues: CronStoreIssues;
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
mutated: boolean;
|
||||
};
|
||||
|
||||
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
|
||||
issues[key] = (issues[key] ?? 0) + 1;
|
||||
}
|
||||
|
||||
function normalizePayloadKind(payload: Record<string, unknown>) {
|
||||
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
|
||||
if (raw === "agentturn") {
|
||||
payload.kind = "agentTurn";
|
||||
return true;
|
||||
}
|
||||
if (raw === "systemevent") {
|
||||
payload.kind = "systemEvent";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function inferPayloadIfMissing(raw: Record<string, unknown>) {
|
||||
const message = typeof raw.message === "string" ? raw.message.trim() : "";
|
||||
const text = typeof raw.text === "string" ? raw.text.trim() : "";
|
||||
const command = typeof raw.command === "string" ? raw.command.trim() : "";
|
||||
if (message) {
|
||||
raw.payload = { kind: "agentTurn", message };
|
||||
return true;
|
||||
}
|
||||
if (text) {
|
||||
raw.payload = { kind: "systemEvent", text };
|
||||
return true;
|
||||
}
|
||||
if (command) {
|
||||
raw.payload = { kind: "systemEvent", text: command };
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function copyTopLevelAgentTurnFields(
|
||||
raw: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
let mutated = false;
|
||||
|
||||
const copyTrimmedString = (field: "model" | "thinking") => {
|
||||
const existing = payload[field];
|
||||
if (typeof existing === "string" && existing.trim()) {
|
||||
return;
|
||||
}
|
||||
const value = raw[field];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[field] = value.trim();
|
||||
mutated = true;
|
||||
}
|
||||
};
|
||||
copyTrimmedString("model");
|
||||
copyTrimmedString("thinking");
|
||||
|
||||
if (
|
||||
typeof payload.timeoutSeconds !== "number" &&
|
||||
typeof raw.timeoutSeconds === "number" &&
|
||||
Number.isFinite(raw.timeoutSeconds)
|
||||
) {
|
||||
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (
|
||||
typeof payload.allowUnsafeExternalContent !== "boolean" &&
|
||||
typeof raw.allowUnsafeExternalContent === "boolean"
|
||||
) {
|
||||
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
|
||||
payload.deliver = raw.deliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.channel !== "string" &&
|
||||
typeof raw.channel === "string" &&
|
||||
raw.channel.trim()
|
||||
) {
|
||||
payload.channel = raw.channel.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
|
||||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
) {
|
||||
payload.bestEffortDeliver = raw.bestEffortDeliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.provider !== "string" &&
|
||||
typeof raw.provider === "string" &&
|
||||
raw.provider.trim()
|
||||
) {
|
||||
payload.provider = raw.provider.trim();
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
||||
if ("model" in raw) {
|
||||
delete raw.model;
|
||||
}
|
||||
if ("thinking" in raw) {
|
||||
delete raw.thinking;
|
||||
}
|
||||
if ("timeoutSeconds" in raw) {
|
||||
delete raw.timeoutSeconds;
|
||||
}
|
||||
if ("allowUnsafeExternalContent" in raw) {
|
||||
delete raw.allowUnsafeExternalContent;
|
||||
}
|
||||
if ("message" in raw) {
|
||||
delete raw.message;
|
||||
}
|
||||
if ("text" in raw) {
|
||||
delete raw.text;
|
||||
}
|
||||
if ("deliver" in raw) {
|
||||
delete raw.deliver;
|
||||
}
|
||||
if ("channel" in raw) {
|
||||
delete raw.channel;
|
||||
}
|
||||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in raw) {
|
||||
delete raw.provider;
|
||||
}
|
||||
if ("command" in raw) {
|
||||
delete raw.command;
|
||||
}
|
||||
if ("timeout" in raw) {
|
||||
delete raw.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
export function normalizeStoredCronJobs(
|
||||
jobs: Array<Record<string, unknown>>,
|
||||
): NormalizeCronStoreJobsResult {
|
||||
const issues: CronStoreIssues = {};
|
||||
let mutated = false;
|
||||
|
||||
for (const raw of jobs) {
|
||||
const jobIssues = new Set<CronStoreIssueKey>();
|
||||
const trackIssue = (key: CronStoreIssueKey) => {
|
||||
if (jobIssues.has(key)) {
|
||||
return;
|
||||
}
|
||||
jobIssues.add(key);
|
||||
incrementIssue(issues, key);
|
||||
};
|
||||
|
||||
const state = raw.state;
|
||||
if (!state || typeof state !== "object" || Array.isArray(state)) {
|
||||
raw.state = {};
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
|
||||
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
|
||||
if (!rawId && legacyJobId) {
|
||||
raw.id = legacyJobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
} else if (rawId && raw.id !== rawId) {
|
||||
raw.id = rawId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("jobId" in raw) {
|
||||
delete raw.jobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
}
|
||||
|
||||
if (typeof raw.schedule === "string") {
|
||||
const expr = raw.schedule.trim();
|
||||
raw.schedule = { kind: "cron", expr };
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleString");
|
||||
}
|
||||
|
||||
const nameRaw = raw.name;
|
||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||
raw.name = inferLegacyName({
|
||||
schedule: raw.schedule as never,
|
||||
payload: raw.payload as never,
|
||||
});
|
||||
mutated = true;
|
||||
} else {
|
||||
raw.name = nameRaw.trim();
|
||||
}
|
||||
|
||||
const desc = normalizeOptionalText(raw.description);
|
||||
if (raw.description !== desc) {
|
||||
raw.description = desc;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if ("sessionKey" in raw) {
|
||||
const sessionKey =
|
||||
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
|
||||
if (raw.sessionKey !== sessionKey) {
|
||||
raw.sessionKey = sessionKey;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof raw.enabled !== "boolean") {
|
||||
raw.enabled = true;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
|
||||
if (wakeModeRaw === "next-heartbeat") {
|
||||
if (raw.wakeMode !== "next-heartbeat") {
|
||||
raw.wakeMode = "next-heartbeat";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (wakeModeRaw === "now") {
|
||||
if (raw.wakeMode !== "now") {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
|
||||
inferPayloadIfMissing(raw)
|
||||
) {
|
||||
mutated = true;
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
|
||||
const payloadRecord =
|
||||
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
|
||||
? (raw.payload as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
if (payloadRecord) {
|
||||
if (normalizePayloadKind(payloadRecord)) {
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
if (!payloadRecord.kind) {
|
||||
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
|
||||
payloadRecord.kind = "agentTurn";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
|
||||
payloadRecord.kind = "systemEvent";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
}
|
||||
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const hadLegacyTopLevelPayloadFields =
|
||||
"model" in raw ||
|
||||
"thinking" in raw ||
|
||||
"timeoutSeconds" in raw ||
|
||||
"allowUnsafeExternalContent" in raw ||
|
||||
"message" in raw ||
|
||||
"text" in raw ||
|
||||
"command" in raw ||
|
||||
"timeout" in raw;
|
||||
const hadLegacyTopLevelDeliveryFields =
|
||||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw;
|
||||
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
|
||||
stripLegacyTopLevelFields(raw);
|
||||
mutated = true;
|
||||
if (hadLegacyTopLevelPayloadFields) {
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
if (hadLegacyTopLevelDeliveryFields) {
|
||||
trackIssue("legacyTopLevelDeliveryFields");
|
||||
}
|
||||
}
|
||||
|
||||
if (payloadRecord) {
|
||||
const hadLegacyPayloadProvider =
|
||||
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
|
||||
if (migrateLegacyCronPayload(payloadRecord)) {
|
||||
mutated = true;
|
||||
if (hadLegacyPayloadProvider) {
|
||||
trackIssue("legacyPayloadProvider");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const schedule = raw.schedule;
|
||||
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
|
||||
const sched = schedule as Record<string, unknown>;
|
||||
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
|
||||
if (!kind && ("at" in sched || "atMs" in sched)) {
|
||||
sched.kind = "at";
|
||||
mutated = true;
|
||||
}
|
||||
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
|
||||
const atMsRaw = sched.atMs;
|
||||
const parsedAtMs =
|
||||
typeof atMsRaw === "number"
|
||||
? atMsRaw
|
||||
: typeof atMsRaw === "string"
|
||||
? parseAbsoluteTimeMs(atMsRaw)
|
||||
: atRaw
|
||||
? parseAbsoluteTimeMs(atRaw)
|
||||
: null;
|
||||
if (parsedAtMs !== null) {
|
||||
sched.at = new Date(parsedAtMs).toISOString();
|
||||
if ("atMs" in sched) {
|
||||
delete sched.atMs;
|
||||
}
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
? Math.max(0, Math.floor(raw.updatedAtMs))
|
||||
: null;
|
||||
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
|
||||
sched.anchorMs = normalizedAnchor;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
|
||||
let normalizedExpr = exprRaw;
|
||||
if (!normalizedExpr && legacyCronRaw) {
|
||||
normalizedExpr = legacyCronRaw;
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if ("cron" in sched) {
|
||||
delete sched.cron;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
|
||||
const modeRaw = (delivery as { mode?: unknown }).mode;
|
||||
if (typeof modeRaw === "string") {
|
||||
const lowered = modeRaw.trim().toLowerCase();
|
||||
if (lowered === "deliver") {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
trackIssue("legacyDeliveryMode");
|
||||
}
|
||||
} else if (modeRaw === undefined || modeRaw === null) {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const isolation = raw.isolation;
|
||||
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
|
||||
delete raw.isolation;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadKind =
|
||||
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
|
||||
const normalizedSessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
|
||||
if (raw.sessionTarget !== normalizedSessionTarget) {
|
||||
raw.sessionTarget = normalizedSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
|
||||
if (raw.sessionTarget !== inferredSessionTarget) {
|
||||
raw.sessionTarget = inferredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const sessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
const isIsolatedAgentTurn =
|
||||
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
|
||||
payload: payloadRecord,
|
||||
});
|
||||
|
||||
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
if (!hasDelivery && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
} else if (!hasDelivery) {
|
||||
raw.delivery = { mode: "announce" };
|
||||
mutated = true;
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
return { issues, jobs, mutated };
|
||||
}
|
||||
@@ -848,32 +848,6 @@ describe("gateway server cron", () => {
|
||||
'Cron job "failure destination webhook" failed: unknown error',
|
||||
);
|
||||
|
||||
fetchWithSsrFGuardMock.mockClear();
|
||||
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
|
||||
const bestEffortFailureDestJobId = await addWebhookCronJob({
|
||||
ws,
|
||||
name: "best effort failure destination webhook",
|
||||
sessionTarget: "isolated",
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "19098680",
|
||||
bestEffort: true,
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/failure-destination",
|
||||
},
|
||||
},
|
||||
});
|
||||
const bestEffortFailureDestFinished = waitForCronEvent(
|
||||
ws,
|
||||
(payload) =>
|
||||
payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
|
||||
);
|
||||
await runCronJobForce(ws, bestEffortFailureDestJobId);
|
||||
await bestEffortFailureDestFinished;
|
||||
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
|
||||
|
||||
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
|
||||
const noSummaryJobId = await addWebhookCronJob({
|
||||
ws,
|
||||
@@ -887,7 +861,7 @@ describe("gateway server cron", () => {
|
||||
);
|
||||
await runCronJobForce(ws, noSummaryJobId);
|
||||
await noSummaryFinished;
|
||||
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
|
||||
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
await cleanupCronTestRun({ ws, server, prevSkipCron });
|
||||
}
|
||||
|
||||
@@ -75,10 +75,6 @@ describe("gateway server hooks", () => {
|
||||
expect(resAgent.status).toBe(200);
|
||||
const agentEvents = await waitForSystemEvent();
|
||||
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
|
||||
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
|
||||
deliveryContract?: string;
|
||||
};
|
||||
expect(firstCall?.deliveryContract).toBe("shared");
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
mockIsolatedRunOkOnce();
|
||||
|
||||
@@ -76,7 +76,6 @@ export function createGatewayHooksRequestHandler(params: {
|
||||
message: value.message,
|
||||
sessionKey,
|
||||
lane: "cron",
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
const summary = result.summary?.trim() || result.error?.trim() || result.status;
|
||||
const prefix =
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
|
||||
|
||||
async function importHookRunnerGlobalModule() {
|
||||
return import("./hook-runner-global.js");
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
const mod = await importHookRunnerGlobalModule();
|
||||
mod.resetGlobalHookRunner();
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
describe("hook-runner-global", () => {
|
||||
it("preserves the initialized runner across module reloads", async () => {
|
||||
const modA = await importHookRunnerGlobalModule();
|
||||
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
|
||||
|
||||
modA.initializeGlobalHookRunner(registry);
|
||||
expect(modA.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modB = await importHookRunnerGlobalModule();
|
||||
expect(modB.getGlobalHookRunner()).not.toBeNull();
|
||||
expect(modB.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
|
||||
expect(modB.getGlobalPluginRegistry()).toBe(registry);
|
||||
});
|
||||
|
||||
it("clears the shared state across module reloads", async () => {
|
||||
const modA = await importHookRunnerGlobalModule();
|
||||
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
|
||||
|
||||
modA.initializeGlobalHookRunner(registry);
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modB = await importHookRunnerGlobalModule();
|
||||
modB.resetGlobalHookRunner();
|
||||
expect(modB.getGlobalHookRunner()).toBeNull();
|
||||
expect(modB.getGlobalPluginRegistry()).toBeNull();
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modC = await importHookRunnerGlobalModule();
|
||||
expect(modC.getGlobalHookRunner()).toBeNull();
|
||||
expect(modC.getGlobalPluginRegistry()).toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -12,31 +12,16 @@ import type { PluginHookGatewayContext, PluginHookGatewayStopEvent } from "./typ
|
||||
|
||||
const log = createSubsystemLogger("plugins");
|
||||
|
||||
type HookRunnerGlobalState = {
|
||||
hookRunner: HookRunner | null;
|
||||
registry: PluginRegistry | null;
|
||||
};
|
||||
|
||||
const hookRunnerGlobalStateKey = Symbol.for("openclaw.plugins.hook-runner-global-state");
|
||||
|
||||
function getHookRunnerGlobalState(): HookRunnerGlobalState {
|
||||
const globalStore = globalThis as typeof globalThis & {
|
||||
[hookRunnerGlobalStateKey]?: HookRunnerGlobalState;
|
||||
};
|
||||
return (globalStore[hookRunnerGlobalStateKey] ??= {
|
||||
hookRunner: null,
|
||||
registry: null,
|
||||
});
|
||||
}
|
||||
let globalHookRunner: HookRunner | null = null;
|
||||
let globalRegistry: PluginRegistry | null = null;
|
||||
|
||||
/**
|
||||
* Initialize the global hook runner with a plugin registry.
|
||||
* Called once when plugins are loaded during gateway startup.
|
||||
*/
|
||||
export function initializeGlobalHookRunner(registry: PluginRegistry): void {
|
||||
const state = getHookRunnerGlobalState();
|
||||
state.registry = registry;
|
||||
state.hookRunner = createHookRunner(registry, {
|
||||
globalRegistry = registry;
|
||||
globalHookRunner = createHookRunner(registry, {
|
||||
logger: {
|
||||
debug: (msg) => log.debug(msg),
|
||||
warn: (msg) => log.warn(msg),
|
||||
@@ -56,7 +41,7 @@ export function initializeGlobalHookRunner(registry: PluginRegistry): void {
|
||||
* Returns null if plugins haven't been loaded yet.
|
||||
*/
|
||||
export function getGlobalHookRunner(): HookRunner | null {
|
||||
return getHookRunnerGlobalState().hookRunner;
|
||||
return globalHookRunner;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -64,14 +49,14 @@ export function getGlobalHookRunner(): HookRunner | null {
|
||||
* Returns null if plugins haven't been loaded yet.
|
||||
*/
|
||||
export function getGlobalPluginRegistry(): PluginRegistry | null {
|
||||
return getHookRunnerGlobalState().registry;
|
||||
return globalRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if any hooks are registered for a given hook name.
|
||||
*/
|
||||
export function hasGlobalHooks(hookName: Parameters<HookRunner["hasHooks"]>[0]): boolean {
|
||||
return getHookRunnerGlobalState().hookRunner?.hasHooks(hookName) ?? false;
|
||||
return globalHookRunner?.hasHooks(hookName) ?? false;
|
||||
}
|
||||
|
||||
export async function runGlobalGatewayStopSafely(params: {
|
||||
@@ -98,7 +83,6 @@ export async function runGlobalGatewayStopSafely(params: {
|
||||
* Reset the global hook runner (for testing).
|
||||
*/
|
||||
export function resetGlobalHookRunner(): void {
|
||||
const state = getHookRunnerGlobalState();
|
||||
state.hookRunner = null;
|
||||
state.registry = null;
|
||||
globalHookRunner = null;
|
||||
globalRegistry = null;
|
||||
}
|
||||
|
||||
@@ -270,4 +270,58 @@ describe("registerTelegramNativeCommands", () => {
|
||||
);
|
||||
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
|
||||
});
|
||||
|
||||
it("uses the DM thread session key for plugin command internal sent hooks", async () => {
|
||||
const commandHandlers = new Map<string, (ctx: unknown) => Promise<void>>();
|
||||
|
||||
pluginCommandMocks.getPluginCommandSpecs.mockReturnValue([
|
||||
{
|
||||
name: "plug",
|
||||
description: "Plugin command",
|
||||
},
|
||||
] as never);
|
||||
pluginCommandMocks.matchPluginCommand.mockReturnValue({
|
||||
command: { key: "plug", requireAuth: false },
|
||||
args: undefined,
|
||||
} as never);
|
||||
|
||||
registerTelegramNativeCommands({
|
||||
...buildParams({
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
},
|
||||
},
|
||||
}),
|
||||
bot: {
|
||||
api: {
|
||||
setMyCommands: vi.fn().mockResolvedValue(undefined),
|
||||
sendMessage: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
command: vi.fn((name: string, cb: (ctx: unknown) => Promise<void>) => {
|
||||
commandHandlers.set(name, cb);
|
||||
}),
|
||||
} as unknown as Parameters<typeof registerTelegramNativeCommands>[0]["bot"],
|
||||
});
|
||||
|
||||
const handler = commandHandlers.get("plug");
|
||||
expect(handler).toBeTruthy();
|
||||
|
||||
await handler?.({
|
||||
match: "",
|
||||
message: {
|
||||
message_id: 42,
|
||||
date: Math.floor(Date.now() / 1000),
|
||||
chat: { id: 12345, type: "private" },
|
||||
from: { id: 12345, username: "alice" },
|
||||
message_thread_id: 99,
|
||||
},
|
||||
});
|
||||
|
||||
expect(deliveryMocks.deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionKeyForInternalHooks: "agent:main:main:thread:12345:99",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -540,6 +540,20 @@ export const registerTelegramNativeCommands = ({
|
||||
chunkMode: params.chunkMode,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
});
|
||||
const resolveDmThreadSessionKey = (params: {
|
||||
baseSessionKey: string;
|
||||
chatId: string | number;
|
||||
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
|
||||
}): string => {
|
||||
const dmThreadId = params.threadSpec.scope === "dm" ? params.threadSpec.id : undefined;
|
||||
if (dmThreadId == null) {
|
||||
return params.baseSessionKey;
|
||||
}
|
||||
return resolveThreadSessionKeys({
|
||||
baseSessionKey: params.baseSessionKey,
|
||||
threadId: `${params.chatId}:${dmThreadId}`,
|
||||
}).sessionKey;
|
||||
};
|
||||
|
||||
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
|
||||
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
|
||||
@@ -647,17 +661,11 @@ export const registerTelegramNativeCommands = ({
|
||||
});
|
||||
return;
|
||||
}
|
||||
const baseSessionKey = route.sessionKey;
|
||||
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
|
||||
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
|
||||
const threadKeys =
|
||||
dmThreadId != null
|
||||
? resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: `${chatId}:${dmThreadId}`,
|
||||
})
|
||||
: null;
|
||||
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
|
||||
const sessionKey = resolveDmThreadSessionKey({
|
||||
baseSessionKey: route.sessionKey,
|
||||
chatId,
|
||||
threadSpec,
|
||||
});
|
||||
const { skillFilter, groupSystemPrompt } = resolveTelegramGroupPromptSettings({
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
@@ -833,10 +841,15 @@ export const registerTelegramNativeCommands = ({
|
||||
return;
|
||||
}
|
||||
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
|
||||
const sessionKeyForInternalHooks = resolveDmThreadSessionKey({
|
||||
baseSessionKey: route.sessionKey,
|
||||
chatId,
|
||||
threadSpec,
|
||||
});
|
||||
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
|
||||
chatId,
|
||||
accountId: route.accountId,
|
||||
sessionKeyForInternalHooks: route.sessionKey,
|
||||
sessionKeyForInternalHooks,
|
||||
mirrorIsGroup: isGroup,
|
||||
mirrorGroupId: isGroup ? String(chatId) : undefined,
|
||||
mediaLocalRoots,
|
||||
|
||||
Reference in New Issue
Block a user