mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-21 06:22:28 +08:00
Compare commits
94 Commits
vincentkoc
...
gateway/no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2c544f057 | ||
|
|
c49af9ea7c | ||
|
|
54536e48f5 | ||
|
|
2b2e5e2038 | ||
|
|
0bcddb3d4f | ||
|
|
d86647d7db | ||
|
|
87d939be79 | ||
|
|
d4e59a3666 | ||
|
|
7b88249c9e | ||
|
|
12702e11a5 | ||
|
|
14bbcad169 | ||
|
|
eab39c721b | ||
|
|
4815dc0603 | ||
|
|
2cce45962f | ||
|
|
258b7902a4 | ||
|
|
425bd89b48 | ||
|
|
54be30ef89 | ||
|
|
fbf5d56366 | ||
|
|
98ea71aca5 | ||
|
|
51bae75120 | ||
|
|
f2f561fab1 | ||
|
|
f6d0712f50 | ||
|
|
6c579d7842 | ||
|
|
f9706fde6a | ||
|
|
7217b97658 | ||
|
|
ce9e91fdfc | ||
|
|
3caab9260c | ||
|
|
d0847ee322 | ||
|
|
1d3dde8d21 | ||
|
|
cc0f30f5fb | ||
|
|
250d3c949e | ||
|
|
5fca4c0de0 | ||
|
|
66c581c64c | ||
|
|
912aa8744a | ||
|
|
8d2d6db9ad | ||
|
|
2d55ad05f3 | ||
|
|
9631f4665c | ||
|
|
e2a1a4a3db | ||
|
|
f82931ba8b | ||
|
|
17599a8ea2 | ||
|
|
e86b38f09d | ||
|
|
1d301f74a6 | ||
|
|
2e79d82198 | ||
|
|
96d17f3cb1 | ||
|
|
79853aca9c | ||
|
|
2d5e70f3e7 | ||
|
|
6186f620d2 | ||
|
|
2767907abf | ||
|
|
9abf014f35 | ||
|
|
cf3a479bd1 | ||
|
|
fd902b0651 | ||
|
|
cf796e2a22 | ||
|
|
f84adcbe88 | ||
|
|
f184e7811c | ||
|
|
c79a0dbdb4 | ||
|
|
335223af32 | ||
|
|
6740cdf160 | ||
|
|
eea925b12b | ||
|
|
88aee9161e | ||
|
|
03a6e3b460 | ||
|
|
41e023a80b | ||
|
|
93775ef6a4 | ||
|
|
31402b8542 | ||
|
|
4bb8104810 | ||
|
|
1d6a2d0165 | ||
|
|
44beb7be1f | ||
|
|
69cd376e3b | ||
|
|
41eef15cdc | ||
|
|
41450187dd | ||
|
|
a40c29b11a | ||
|
|
d4a960fcca | ||
|
|
26e76f9a61 | ||
|
|
8befd88119 | ||
|
|
99cbda83a2 | ||
|
|
e8775cda93 | ||
|
|
ef36cb8cbc | ||
|
|
f114a5c638 | ||
|
|
a438ff4397 | ||
|
|
adec8b28bb | ||
|
|
e3df94365b | ||
|
|
4d501e4ccf | ||
|
|
f6243916b5 | ||
|
|
b34158086a | ||
|
|
eabda6e3a4 | ||
|
|
6d5e142b93 | ||
|
|
4f42c03a49 | ||
|
|
13bd3db307 | ||
|
|
ff4745fc3f | ||
|
|
c29b098744 | ||
|
|
24b53fcf47 | ||
|
|
dfc18b7a2b | ||
|
|
141738f717 | ||
|
|
4ff4ed7ec9 | ||
|
|
362248e559 |
@@ -8,6 +8,8 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Breaking
|
||||
|
||||
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.
|
||||
|
||||
### Fixes
|
||||
|
||||
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
|
||||
@@ -18,6 +20,9 @@ Docs: https://docs.openclaw.ai
|
||||
- ACP/sessions.patch: allow `spawnedBy` and `spawnDepth` lineage fields on ACP session keys so `sessions_spawn` with `runtime: "acp"` no longer fails during child-session setup. Fixes #40971. (#40995) thanks @xaeon2026.
|
||||
- ACP/stop reason mapping: resolve gateway chat `state: "error"` completions as ACP `end_turn` instead of `refusal` so transient backend failures are not surfaced as deliberate refusals. (#41187) thanks @pejmanjohn.
|
||||
- ACP/setSessionMode: propagate gateway `sessions.patch` failures back to ACP clients so rejected mode changes no longer return silent success. (#41185) thanks @pejmanjohn.
|
||||
- Agents/embedded logs: add structured, sanitized lifecycle and failover observation events so overload and provider failures are easier to tail and filter. (#41336) thanks @altaywtf.
|
||||
- iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky.
|
||||
- Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
|
||||
@@ -362,7 +362,14 @@ final class NodeAppModel {
|
||||
await MainActor.run {
|
||||
self.operatorConnected = false
|
||||
self.gatewayConnected = false
|
||||
// Foreground recovery must actively restart the saved gateway config.
|
||||
// Disconnecting stale sockets alone can leave us idle if the old
|
||||
// reconnect tasks were suppressed or otherwise got stuck in background.
|
||||
self.gatewayStatusText = "Reconnecting…"
|
||||
self.talkMode.updateGatewayConnected(false)
|
||||
if let cfg = self.activeGatewayConnectConfig {
|
||||
self.applyGatewayConnectConfig(cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting)
|
||||
- Wakeups are first-class: a job can request “wake now” vs “next heartbeat”.
|
||||
- Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = "<url>"`.
|
||||
- Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode.
|
||||
- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them.
|
||||
|
||||
## Quick start (actionable)
|
||||
|
||||
|
||||
@@ -30,6 +30,12 @@ Note: retention/pruning is controlled in config:
|
||||
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.
|
||||
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
|
||||
|
||||
Upgrade note: if you have older cron jobs from before the current delivery/store format, run
|
||||
`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`,
|
||||
top-level delivery fields, payload `provider` delivery aliases) and migrates simple
|
||||
`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is
|
||||
configured.
|
||||
|
||||
## Common edits
|
||||
|
||||
Update delivery settings without changing the message:
|
||||
|
||||
@@ -28,6 +28,7 @@ Notes:
|
||||
- Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts.
|
||||
- `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal.
|
||||
- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.<timestamp>` to reclaim space safely.
|
||||
- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime.
|
||||
- Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing.
|
||||
- If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`).
|
||||
|
||||
|
||||
@@ -65,6 +65,7 @@ cat ~/.openclaw/openclaw.json
|
||||
- Config normalization for legacy values.
|
||||
- OpenCode Zen provider override warnings (`models.providers.opencode`).
|
||||
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
|
||||
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
|
||||
- State integrity and permissions checks (sessions, transcripts, state dir).
|
||||
- Config file permission checks (chmod 600) when running locally.
|
||||
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
|
||||
@@ -158,6 +159,25 @@ the legacy sessions + agent dir on startup so history/auth/models land in the
|
||||
per-agent path without a manual doctor run. WhatsApp auth is intentionally only
|
||||
migrated via `openclaw doctor`.
|
||||
|
||||
### 3b) Legacy cron store migrations
|
||||
|
||||
Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default,
|
||||
or `cron.store` when overridden) for old job shapes that the scheduler still
|
||||
accepts for compatibility.
|
||||
|
||||
Current cron cleanups include:
|
||||
|
||||
- `jobId` → `id`
|
||||
- `schedule.cron` → `schedule.expr`
|
||||
- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload`
|
||||
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
|
||||
- payload `provider` delivery aliases → explicit `delivery.channel`
|
||||
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
|
||||
|
||||
Doctor only auto-migrates `notify: true` jobs when it can do so without
|
||||
changing behavior. If a job combines legacy notify fallback with an existing
|
||||
non-webhook delivery mode, doctor warns and leaves that job for manual review.
|
||||
|
||||
### 4) State integrity checks (session persistence, routing, and safety)
|
||||
|
||||
The state directory is the operational brainstem. If it vanishes, you lose
|
||||
|
||||
182
src/agents/pi-embedded-error-observation.test.ts
Normal file
182
src/agents/pi-embedded-error-observation.test.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import * as loggingConfigModule from "../logging/config.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
buildTextObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "./pi-embedded-error-observation.js";
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("buildApiErrorObservationFields", () => {
|
||||
it("redacts request ids and exposes stable hashes instead of raw payloads", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_overload"}',
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
rawErrorPreview: expect.stringContaining('"request_id":"sha256:'),
|
||||
rawErrorHash: expect.stringMatching(/^sha256:/),
|
||||
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
|
||||
providerErrorType: "overloaded_error",
|
||||
providerErrorMessagePreview: "Overloaded",
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.rawErrorPreview).not.toContain("req_overload");
|
||||
});
|
||||
|
||||
it("forces token redaction for observation previews", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"Authorization: Bearer sk-abcdefghijklmnopqrstuvwxyz123456",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("sk-abcdefghijklmnopqrstuvwxyz123456");
|
||||
expect(observed.rawErrorPreview).toContain("sk-abc");
|
||||
expect(observed.rawErrorHash).toMatch(/^sha256:/);
|
||||
});
|
||||
|
||||
it("redacts observation-only header and cookie formats", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456 Cookie: session=abcdefghijklmnopqrstuvwxyz123456",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("abcdefghijklmnopqrstuvwxyz123456");
|
||||
expect(observed.rawErrorPreview).toContain("x-api-key: ***");
|
||||
expect(observed.rawErrorPreview).toContain("Cookie: session=");
|
||||
});
|
||||
|
||||
it("does not let cookie redaction consume unrelated fields on the same line", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"Cookie: session=abcdefghijklmnopqrstuvwxyz123456 status=503 request_id=req_cookie",
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).toContain("Cookie: session=");
|
||||
expect(observed.rawErrorPreview).toContain("status=503");
|
||||
expect(observed.rawErrorPreview).toContain("request_id=sha256:");
|
||||
});
|
||||
|
||||
it("builds sanitized generic text observation fields", () => {
|
||||
const observed = buildTextObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_prev"}',
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
textPreview: expect.stringContaining('"request_id":"sha256:'),
|
||||
textHash: expect.stringMatching(/^sha256:/),
|
||||
textFingerprint: expect.stringMatching(/^sha256:/),
|
||||
providerErrorType: "overloaded_error",
|
||||
providerErrorMessagePreview: "Overloaded",
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.textPreview).not.toContain("req_prev");
|
||||
});
|
||||
|
||||
it("redacts request ids in formatted plain-text errors", () => {
|
||||
const observed = buildApiErrorObservationFields(
|
||||
"LLM error overloaded_error: Overloaded (request_id: req_plaintext_123)",
|
||||
);
|
||||
|
||||
expect(observed).toMatchObject({
|
||||
rawErrorPreview: expect.stringContaining("request_id: sha256:"),
|
||||
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
|
||||
requestIdHash: expect.stringMatching(/^sha256:/),
|
||||
});
|
||||
expect(observed.rawErrorPreview).not.toContain("req_plaintext_123");
|
||||
});
|
||||
|
||||
it("keeps fingerprints stable across request ids for equivalent errors", () => {
|
||||
const first = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_001"}',
|
||||
);
|
||||
const second = buildApiErrorObservationFields(
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_002"}',
|
||||
);
|
||||
|
||||
expect(first.rawErrorFingerprint).toBe(second.rawErrorFingerprint);
|
||||
expect(first.rawErrorHash).not.toBe(second.rawErrorHash);
|
||||
});
|
||||
|
||||
it("truncates oversized raw and provider previews", () => {
|
||||
const longMessage = "X".repeat(260);
|
||||
const observed = buildApiErrorObservationFields(
|
||||
`{"type":"error","error":{"type":"server_error","message":"${longMessage}"},"request_id":"req_long"}`,
|
||||
);
|
||||
|
||||
expect(observed.rawErrorPreview).toBeDefined();
|
||||
expect(observed.providerErrorMessagePreview).toBeDefined();
|
||||
expect(observed.rawErrorPreview?.length).toBeLessThanOrEqual(401);
|
||||
expect(observed.providerErrorMessagePreview?.length).toBeLessThanOrEqual(201);
|
||||
expect(observed.providerErrorMessagePreview?.endsWith("…")).toBe(true);
|
||||
});
|
||||
|
||||
it("caps oversized raw inputs before hashing and fingerprinting", () => {
|
||||
const oversized = "X".repeat(70_000);
|
||||
const bounded = "X".repeat(64_000);
|
||||
|
||||
expect(buildApiErrorObservationFields(oversized)).toMatchObject({
|
||||
rawErrorHash: buildApiErrorObservationFields(bounded).rawErrorHash,
|
||||
rawErrorFingerprint: buildApiErrorObservationFields(bounded).rawErrorFingerprint,
|
||||
});
|
||||
});
|
||||
|
||||
it("returns empty observation fields for empty input", () => {
|
||||
expect(buildApiErrorObservationFields(undefined)).toEqual({});
|
||||
expect(buildApiErrorObservationFields("")).toEqual({});
|
||||
expect(buildApiErrorObservationFields(" ")).toEqual({});
|
||||
});
|
||||
|
||||
it("re-reads configured redact patterns on each call", () => {
|
||||
const readLoggingConfig = vi.spyOn(loggingConfigModule, "readLoggingConfig");
|
||||
readLoggingConfig.mockReturnValueOnce(undefined);
|
||||
readLoggingConfig.mockReturnValueOnce({
|
||||
redactPatterns: [String.raw`\bcustom-secret-[A-Za-z0-9]+\b`],
|
||||
});
|
||||
|
||||
const first = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
const second = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
|
||||
expect(first.rawErrorPreview).toContain("custom-secret-abc123");
|
||||
expect(second.rawErrorPreview).not.toContain("custom-secret-abc123");
|
||||
expect(second.rawErrorPreview).toContain("custom");
|
||||
});
|
||||
|
||||
it("fails closed when observation sanitization throws", () => {
|
||||
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockImplementation(() => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
|
||||
expect(buildApiErrorObservationFields("request_id=req_123")).toEqual({});
|
||||
expect(buildTextObservationFields("request_id=req_123")).toEqual({
|
||||
textPreview: undefined,
|
||||
textHash: undefined,
|
||||
textFingerprint: undefined,
|
||||
httpCode: undefined,
|
||||
providerErrorType: undefined,
|
||||
providerErrorMessagePreview: undefined,
|
||||
requestIdHash: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores non-string configured redact patterns", () => {
|
||||
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockReturnValue({
|
||||
redactPatterns: [
|
||||
123 as never,
|
||||
{ bad: true } as never,
|
||||
String.raw`\bcustom-secret-[A-Za-z0-9]+\b`,
|
||||
],
|
||||
});
|
||||
|
||||
const observed = buildApiErrorObservationFields("custom-secret-abc123");
|
||||
|
||||
expect(observed.rawErrorPreview).not.toContain("custom-secret-abc123");
|
||||
expect(observed.rawErrorPreview).toContain("custom");
|
||||
});
|
||||
});
|
||||
|
||||
describe("sanitizeForConsole", () => {
|
||||
it("strips control characters from console-facing values", () => {
|
||||
expect(sanitizeForConsole("run-1\nprovider\tmodel\rtest")).toBe("run-1 provider model test");
|
||||
});
|
||||
});
|
||||
199
src/agents/pi-embedded-error-observation.ts
Normal file
199
src/agents/pi-embedded-error-observation.ts
Normal file
@@ -0,0 +1,199 @@
|
||||
import { readLoggingConfig } from "../logging/config.js";
|
||||
import { redactIdentifier } from "../logging/redact-identifier.js";
|
||||
import { getDefaultRedactPatterns, redactSensitiveText } from "../logging/redact.js";
|
||||
import { getApiErrorPayloadFingerprint, parseApiErrorInfo } from "./pi-embedded-helpers.js";
|
||||
import { stableStringify } from "./stable-stringify.js";
|
||||
|
||||
const MAX_OBSERVATION_INPUT_CHARS = 64_000;
|
||||
const MAX_FINGERPRINT_MESSAGE_CHARS = 8_000;
|
||||
const RAW_ERROR_PREVIEW_MAX_CHARS = 400;
|
||||
const PROVIDER_ERROR_PREVIEW_MAX_CHARS = 200;
|
||||
const REQUEST_ID_RE = /\brequest[_ ]?id\b\s*[:=]\s*["'()]*([A-Za-z0-9._:-]+)/i;
|
||||
const OBSERVATION_EXTRA_REDACT_PATTERNS = [
|
||||
String.raw`\b(?:x-)?api[-_]?key\b\s*[:=]\s*(["']?)([^\s"'\\;]+)\1`,
|
||||
String.raw`"(?:api[-_]?key|api_key)"\s*:\s*"([^"]+)"`,
|
||||
String.raw`(?:\bCookie\b\s*[:=]\s*[^;=\s]+=|;\s*[^;=\s]+=)([^;\s\r\n]+)`,
|
||||
];
|
||||
|
||||
function resolveConfiguredRedactPatterns(): string[] {
|
||||
const configured = readLoggingConfig()?.redactPatterns;
|
||||
if (!Array.isArray(configured)) {
|
||||
return [];
|
||||
}
|
||||
return configured.filter((pattern): pattern is string => typeof pattern === "string");
|
||||
}
|
||||
|
||||
function truncateForObservation(text: string | undefined, maxChars: number): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed.length > maxChars ? `${trimmed.slice(0, maxChars)}…` : trimmed;
|
||||
}
|
||||
|
||||
function boundObservationInput(text: string | undefined): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed.length > MAX_OBSERVATION_INPUT_CHARS
|
||||
? trimmed.slice(0, MAX_OBSERVATION_INPUT_CHARS)
|
||||
: trimmed;
|
||||
}
|
||||
|
||||
export function sanitizeForConsole(text: string | undefined, maxChars = 200): string | undefined {
|
||||
const trimmed = text?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
const withoutControlChars = Array.from(trimmed)
|
||||
.filter((char) => {
|
||||
const code = char.charCodeAt(0);
|
||||
return !(
|
||||
code <= 0x08 ||
|
||||
code === 0x0b ||
|
||||
code === 0x0c ||
|
||||
(code >= 0x0e && code <= 0x1f) ||
|
||||
code === 0x7f
|
||||
);
|
||||
})
|
||||
.join("");
|
||||
const sanitized = withoutControlChars
|
||||
.replace(/[\r\n\t]+/g, " ")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
return sanitized.length > maxChars ? `${sanitized.slice(0, maxChars)}…` : sanitized;
|
||||
}
|
||||
|
||||
function replaceRequestIdPreview(
|
||||
text: string | undefined,
|
||||
requestId: string | undefined,
|
||||
): string | undefined {
|
||||
if (!text || !requestId) {
|
||||
return text;
|
||||
}
|
||||
return text.split(requestId).join(redactIdentifier(requestId, { len: 12 }));
|
||||
}
|
||||
|
||||
function redactObservationText(text: string | undefined): string | undefined {
|
||||
if (!text) {
|
||||
return text;
|
||||
}
|
||||
// Observation logs must stay redacted even when operators disable general-purpose
|
||||
// log redaction, otherwise raw provider payloads leak back into always-on logs.
|
||||
const configuredPatterns = resolveConfiguredRedactPatterns();
|
||||
return redactSensitiveText(text, {
|
||||
mode: "tools",
|
||||
patterns: [
|
||||
...getDefaultRedactPatterns(),
|
||||
...configuredPatterns,
|
||||
...OBSERVATION_EXTRA_REDACT_PATTERNS,
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
function extractRequestId(text: string | undefined): string | undefined {
|
||||
if (!text) {
|
||||
return undefined;
|
||||
}
|
||||
const match = text.match(REQUEST_ID_RE);
|
||||
return match?.[1]?.trim() || undefined;
|
||||
}
|
||||
|
||||
function buildObservationFingerprint(params: {
|
||||
raw: string;
|
||||
requestId?: string;
|
||||
httpCode?: string;
|
||||
type?: string;
|
||||
message?: string;
|
||||
}): string | null {
|
||||
const boundedMessage =
|
||||
params.message && params.message.length > MAX_FINGERPRINT_MESSAGE_CHARS
|
||||
? params.message.slice(0, MAX_FINGERPRINT_MESSAGE_CHARS)
|
||||
: params.message;
|
||||
const structured =
|
||||
params.httpCode || params.type || boundedMessage
|
||||
? stableStringify({
|
||||
httpCode: params.httpCode,
|
||||
type: params.type,
|
||||
message: boundedMessage,
|
||||
})
|
||||
: null;
|
||||
if (structured) {
|
||||
return structured;
|
||||
}
|
||||
if (params.requestId) {
|
||||
return params.raw.split(params.requestId).join("<request_id>");
|
||||
}
|
||||
return getApiErrorPayloadFingerprint(params.raw);
|
||||
}
|
||||
|
||||
export function buildApiErrorObservationFields(rawError?: string): {
|
||||
rawErrorPreview?: string;
|
||||
rawErrorHash?: string;
|
||||
rawErrorFingerprint?: string;
|
||||
httpCode?: string;
|
||||
providerErrorType?: string;
|
||||
providerErrorMessagePreview?: string;
|
||||
requestIdHash?: string;
|
||||
} {
|
||||
const trimmed = boundObservationInput(rawError);
|
||||
if (!trimmed) {
|
||||
return {};
|
||||
}
|
||||
try {
|
||||
const parsed = parseApiErrorInfo(trimmed);
|
||||
const requestId = parsed?.requestId ?? extractRequestId(trimmed);
|
||||
const requestIdHash = requestId ? redactIdentifier(requestId, { len: 12 }) : undefined;
|
||||
const rawFingerprint = buildObservationFingerprint({
|
||||
raw: trimmed,
|
||||
requestId,
|
||||
httpCode: parsed?.httpCode,
|
||||
type: parsed?.type,
|
||||
message: parsed?.message,
|
||||
});
|
||||
const redactedRawPreview = replaceRequestIdPreview(redactObservationText(trimmed), requestId);
|
||||
const redactedProviderMessage = replaceRequestIdPreview(
|
||||
redactObservationText(parsed?.message),
|
||||
requestId,
|
||||
);
|
||||
|
||||
return {
|
||||
rawErrorPreview: truncateForObservation(redactedRawPreview, RAW_ERROR_PREVIEW_MAX_CHARS),
|
||||
rawErrorHash: redactIdentifier(trimmed, { len: 12 }),
|
||||
rawErrorFingerprint: rawFingerprint
|
||||
? redactIdentifier(rawFingerprint, { len: 12 })
|
||||
: undefined,
|
||||
httpCode: parsed?.httpCode,
|
||||
providerErrorType: parsed?.type,
|
||||
providerErrorMessagePreview: truncateForObservation(
|
||||
redactedProviderMessage,
|
||||
PROVIDER_ERROR_PREVIEW_MAX_CHARS,
|
||||
),
|
||||
requestIdHash,
|
||||
};
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
export function buildTextObservationFields(text?: string): {
|
||||
textPreview?: string;
|
||||
textHash?: string;
|
||||
textFingerprint?: string;
|
||||
httpCode?: string;
|
||||
providerErrorType?: string;
|
||||
providerErrorMessagePreview?: string;
|
||||
requestIdHash?: string;
|
||||
} {
|
||||
const observed = buildApiErrorObservationFields(text);
|
||||
return {
|
||||
textPreview: observed.rawErrorPreview,
|
||||
textHash: observed.rawErrorHash,
|
||||
textFingerprint: observed.rawErrorFingerprint,
|
||||
httpCode: observed.httpCode,
|
||||
providerErrorType: observed.providerErrorType,
|
||||
providerErrorMessagePreview: observed.providerErrorMessagePreview,
|
||||
requestIdHash: observed.requestIdHash,
|
||||
};
|
||||
}
|
||||
@@ -61,6 +61,7 @@ import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import { resolveModel } from "./model.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
||||
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
|
||||
import {
|
||||
@@ -1226,11 +1227,26 @@ export async function runEmbeddedPiAgent(
|
||||
reason: promptProfileFailureReason,
|
||||
});
|
||||
const promptFailoverFailure = isFailoverErrorMessage(errorText);
|
||||
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
|
||||
const failedPromptProfileId = lastProfileId;
|
||||
const logPromptFailoverDecision = createFailoverDecisionLogger({
|
||||
stage: "prompt",
|
||||
runId: params.runId,
|
||||
rawError: errorText,
|
||||
failoverReason: promptFailoverReason,
|
||||
profileFailureReason: promptProfileFailureReason,
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: failedPromptProfileId,
|
||||
fallbackConfigured,
|
||||
aborted,
|
||||
});
|
||||
if (
|
||||
promptFailoverFailure &&
|
||||
promptFailoverReason !== "timeout" &&
|
||||
(await advanceAuthProfile())
|
||||
) {
|
||||
logPromptFailoverDecision("rotate_profile");
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
continue;
|
||||
}
|
||||
@@ -1249,15 +1265,20 @@ export async function runEmbeddedPiAgent(
|
||||
// are configured so outer model fallback can continue on overload,
|
||||
// rate-limit, auth, or billing failures.
|
||||
if (fallbackConfigured && promptFailoverFailure) {
|
||||
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
|
||||
logPromptFailoverDecision("fallback_model", { status });
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
throw new FailoverError(errorText, {
|
||||
reason: promptFailoverReason ?? "unknown",
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: lastProfileId,
|
||||
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
|
||||
status,
|
||||
});
|
||||
}
|
||||
if (promptFailoverFailure || promptFailoverReason) {
|
||||
logPromptFailoverDecision("surface_error");
|
||||
}
|
||||
throw promptError;
|
||||
}
|
||||
|
||||
@@ -1282,6 +1303,21 @@ export async function runEmbeddedPiAgent(
|
||||
resolveAuthProfileFailureReason(assistantFailoverReason);
|
||||
const cloudCodeAssistFormatError = attempt.cloudCodeAssistFormatError;
|
||||
const imageDimensionError = parseImageDimensionError(lastAssistant?.errorMessage ?? "");
|
||||
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
|
||||
const failedAssistantProfileId = lastProfileId;
|
||||
const logAssistantFailoverDecision = createFailoverDecisionLogger({
|
||||
stage: "assistant",
|
||||
runId: params.runId,
|
||||
rawError: lastAssistant?.errorMessage?.trim(),
|
||||
failoverReason: assistantFailoverReason,
|
||||
profileFailureReason: assistantProfileFailureReason,
|
||||
provider: activeErrorContext.provider,
|
||||
model: activeErrorContext.model,
|
||||
profileId: failedAssistantProfileId,
|
||||
fallbackConfigured,
|
||||
timedOut,
|
||||
aborted,
|
||||
});
|
||||
|
||||
if (
|
||||
authFailure &&
|
||||
@@ -1339,6 +1375,7 @@ export async function runEmbeddedPiAgent(
|
||||
|
||||
const rotated = await advanceAuthProfile();
|
||||
if (rotated) {
|
||||
logAssistantFailoverDecision("rotate_profile");
|
||||
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
|
||||
continue;
|
||||
}
|
||||
@@ -1371,6 +1408,7 @@ export async function runEmbeddedPiAgent(
|
||||
const status =
|
||||
resolveFailoverStatus(assistantFailoverReason ?? "unknown") ??
|
||||
(isTimeoutErrorMessage(message) ? 408 : undefined);
|
||||
logAssistantFailoverDecision("fallback_model", { status });
|
||||
throw new FailoverError(message, {
|
||||
reason: assistantFailoverReason ?? "unknown",
|
||||
provider: activeErrorContext.provider,
|
||||
@@ -1379,6 +1417,7 @@ export async function runEmbeddedPiAgent(
|
||||
status,
|
||||
});
|
||||
}
|
||||
logAssistantFailoverDecision("surface_error");
|
||||
}
|
||||
|
||||
const usage = toNormalizedUsage(usageAccumulator);
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeFailoverDecisionObservationBase } from "./failover-observation.js";
|
||||
|
||||
describe("normalizeFailoverDecisionObservationBase", () => {
|
||||
it("fills timeout observation reasons for deadline timeouts without provider error text", () => {
|
||||
expect(
|
||||
normalizeFailoverDecisionObservationBase({
|
||||
stage: "assistant",
|
||||
runId: "run:timeout",
|
||||
rawError: "",
|
||||
failoverReason: null,
|
||||
profileFailureReason: null,
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
profileId: "openai:p1",
|
||||
fallbackConfigured: false,
|
||||
timedOut: true,
|
||||
aborted: false,
|
||||
}),
|
||||
).toMatchObject({
|
||||
failoverReason: "timeout",
|
||||
profileFailureReason: "timeout",
|
||||
timedOut: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves explicit failover reasons", () => {
|
||||
expect(
|
||||
normalizeFailoverDecisionObservationBase({
|
||||
stage: "assistant",
|
||||
runId: "run:overloaded",
|
||||
rawError: '{"error":{"type":"overloaded_error"}}',
|
||||
failoverReason: "overloaded",
|
||||
profileFailureReason: "overloaded",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
profileId: "openai:p1",
|
||||
fallbackConfigured: true,
|
||||
timedOut: true,
|
||||
aborted: false,
|
||||
}),
|
||||
).toMatchObject({
|
||||
failoverReason: "overloaded",
|
||||
profileFailureReason: "overloaded",
|
||||
timedOut: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
76
src/agents/pi-embedded-runner/run/failover-observation.ts
Normal file
76
src/agents/pi-embedded-runner/run/failover-observation.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { redactIdentifier } from "../../../logging/redact-identifier.js";
|
||||
import type { AuthProfileFailureReason } from "../../auth-profiles.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "../../pi-embedded-error-observation.js";
|
||||
import type { FailoverReason } from "../../pi-embedded-helpers.js";
|
||||
import { log } from "../logger.js";
|
||||
|
||||
export type FailoverDecisionLoggerInput = {
|
||||
stage: "prompt" | "assistant";
|
||||
decision: "rotate_profile" | "fallback_model" | "surface_error";
|
||||
runId?: string;
|
||||
rawError?: string;
|
||||
failoverReason: FailoverReason | null;
|
||||
profileFailureReason?: AuthProfileFailureReason | null;
|
||||
provider: string;
|
||||
model: string;
|
||||
profileId?: string;
|
||||
fallbackConfigured: boolean;
|
||||
timedOut?: boolean;
|
||||
aborted?: boolean;
|
||||
status?: number;
|
||||
};
|
||||
|
||||
export type FailoverDecisionLoggerBase = Omit<FailoverDecisionLoggerInput, "decision" | "status">;
|
||||
|
||||
export function normalizeFailoverDecisionObservationBase(
|
||||
base: FailoverDecisionLoggerBase,
|
||||
): FailoverDecisionLoggerBase {
|
||||
return {
|
||||
...base,
|
||||
failoverReason: base.failoverReason ?? (base.timedOut ? "timeout" : null),
|
||||
profileFailureReason: base.profileFailureReason ?? (base.timedOut ? "timeout" : null),
|
||||
};
|
||||
}
|
||||
|
||||
export function createFailoverDecisionLogger(
|
||||
base: FailoverDecisionLoggerBase,
|
||||
): (
|
||||
decision: FailoverDecisionLoggerInput["decision"],
|
||||
extra?: Pick<FailoverDecisionLoggerInput, "status">,
|
||||
) => void {
|
||||
const normalizedBase = normalizeFailoverDecisionObservationBase(base);
|
||||
const safeProfileId = normalizedBase.profileId
|
||||
? redactIdentifier(normalizedBase.profileId, { len: 12 })
|
||||
: undefined;
|
||||
const safeRunId = sanitizeForConsole(normalizedBase.runId) ?? "-";
|
||||
const safeProvider = sanitizeForConsole(normalizedBase.provider) ?? "-";
|
||||
const safeModel = sanitizeForConsole(normalizedBase.model) ?? "-";
|
||||
const profileText = safeProfileId ?? "-";
|
||||
const reasonText = normalizedBase.failoverReason ?? "none";
|
||||
return (decision, extra) => {
|
||||
const observedError = buildApiErrorObservationFields(normalizedBase.rawError);
|
||||
log.warn("embedded run failover decision", {
|
||||
event: "embedded_run_failover_decision",
|
||||
tags: ["error_handling", "failover", normalizedBase.stage, decision],
|
||||
runId: normalizedBase.runId,
|
||||
stage: normalizedBase.stage,
|
||||
decision,
|
||||
failoverReason: normalizedBase.failoverReason,
|
||||
profileFailureReason: normalizedBase.profileFailureReason,
|
||||
provider: normalizedBase.provider,
|
||||
model: normalizedBase.model,
|
||||
profileId: safeProfileId,
|
||||
fallbackConfigured: normalizedBase.fallbackConfigured,
|
||||
timedOut: normalizedBase.timedOut,
|
||||
aborted: normalizedBase.aborted,
|
||||
status: extra?.status,
|
||||
...observedError,
|
||||
consoleMessage:
|
||||
`embedded run failover decision: runId=${safeRunId} stage=${normalizedBase.stage} decision=${decision} ` +
|
||||
`reason=${reasonText} provider=${safeProvider}/${safeModel} profile=${profileText}`,
|
||||
});
|
||||
};
|
||||
}
|
||||
@@ -54,8 +54,13 @@ describe("handleAgentEnd", () => {
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("runId=run-1");
|
||||
expect(warn.mock.calls[0]?.[0]).toContain("error=connection refused");
|
||||
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
runId: "run-1",
|
||||
error: "connection refused",
|
||||
rawErrorPreview: "connection refused",
|
||||
});
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
@@ -65,6 +70,59 @@ describe("handleAgentEnd", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("attaches raw provider error metadata without changing the console message", () => {
|
||||
const ctx = createContext({
|
||||
role: "assistant",
|
||||
stopReason: "error",
|
||||
provider: "anthropic",
|
||||
model: "claude-test",
|
||||
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
|
||||
content: [{ type: "text", text: "" }],
|
||||
});
|
||||
|
||||
handleAgentEnd(ctx);
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
runId: "run-1",
|
||||
error: "The AI service is temporarily overloaded. Please try again in a moment.",
|
||||
failoverReason: "overloaded",
|
||||
providerErrorType: "overloaded_error",
|
||||
});
|
||||
});
|
||||
|
||||
it("redacts logged error text before emitting lifecycle events", () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const ctx = createContext(
|
||||
{
|
||||
role: "assistant",
|
||||
stopReason: "error",
|
||||
errorMessage: "x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456",
|
||||
content: [{ type: "text", text: "" }],
|
||||
},
|
||||
{ onAgentEvent },
|
||||
);
|
||||
|
||||
handleAgentEnd(ctx);
|
||||
|
||||
const warn = vi.mocked(ctx.log.warn);
|
||||
expect(warn.mock.calls[0]?.[1]).toMatchObject({
|
||||
event: "embedded_run_agent_end",
|
||||
error: "x-api-key: ***",
|
||||
rawErrorPreview: "x-api-key: ***",
|
||||
});
|
||||
expect(onAgentEvent).toHaveBeenCalledWith({
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: "x-api-key: ***",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps non-error run-end logging on debug only", () => {
|
||||
const ctx = createContext(undefined);
|
||||
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import { createInlineCodeState } from "../markdown/code-spans.js";
|
||||
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||
import {
|
||||
buildApiErrorObservationFields,
|
||||
buildTextObservationFields,
|
||||
sanitizeForConsole,
|
||||
} from "./pi-embedded-error-observation.js";
|
||||
import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
||||
import { isAssistantMessage } from "./pi-embedded-utils.js";
|
||||
|
||||
@@ -36,16 +41,31 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
||||
provider: lastAssistant.provider,
|
||||
model: lastAssistant.model,
|
||||
});
|
||||
const rawError = lastAssistant.errorMessage?.trim();
|
||||
const failoverReason = classifyFailoverReason(rawError ?? "");
|
||||
const errorText = (friendlyError || lastAssistant.errorMessage || "LLM request failed.").trim();
|
||||
ctx.log.warn(
|
||||
`embedded run agent end: runId=${ctx.params.runId} isError=true error=${errorText}`,
|
||||
);
|
||||
const observedError = buildApiErrorObservationFields(rawError);
|
||||
const safeErrorText =
|
||||
buildTextObservationFields(errorText).textPreview ?? "LLM request failed.";
|
||||
const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-";
|
||||
ctx.log.warn("embedded run agent end", {
|
||||
event: "embedded_run_agent_end",
|
||||
tags: ["error_handling", "lifecycle", "agent_end", "assistant_error"],
|
||||
runId: ctx.params.runId,
|
||||
isError: true,
|
||||
error: safeErrorText,
|
||||
failoverReason,
|
||||
provider: lastAssistant.provider,
|
||||
model: lastAssistant.model,
|
||||
...observedError,
|
||||
consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true error=${safeErrorText}`,
|
||||
});
|
||||
emitAgentEvent({
|
||||
runId: ctx.params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: errorText,
|
||||
error: safeErrorText,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
@@ -53,7 +73,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
error: errorText,
|
||||
error: safeErrorText,
|
||||
},
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -12,8 +12,8 @@ import type {
|
||||
import type { NormalizedUsage } from "./usage.js";
|
||||
|
||||
export type EmbeddedSubscribeLogger = {
|
||||
debug: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
debug: (message: string, meta?: Record<string, unknown>) => void;
|
||||
warn: (message: string, meta?: Record<string, unknown>) => void;
|
||||
};
|
||||
|
||||
export type ToolErrorSummary = {
|
||||
|
||||
@@ -16,7 +16,7 @@ export function registerCronCli(program: Command) {
|
||||
.addHelpText(
|
||||
"after",
|
||||
() =>
|
||||
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`,
|
||||
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`,
|
||||
);
|
||||
|
||||
registerCronStatusCommand(cron);
|
||||
|
||||
269
src/commands/doctor-cron.test.ts
Normal file
269
src/commands/doctor-cron.test.ts
Normal file
@@ -0,0 +1,269 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import * as noteModule from "../terminal/note.js";
|
||||
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
|
||||
|
||||
let tempRoot: string | null = null;
|
||||
|
||||
async function makeTempStorePath() {
|
||||
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
|
||||
return path.join(tempRoot, "cron", "jobs.json");
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
if (tempRoot) {
|
||||
await fs.rm(tempRoot, { recursive: true, force: true });
|
||||
tempRoot = null;
|
||||
}
|
||||
});
|
||||
|
||||
function makePrompter(confirmResult = true) {
|
||||
return {
|
||||
confirm: vi.fn().mockResolvedValue(confirmResult),
|
||||
};
|
||||
}
|
||||
|
||||
describe("maybeRepairLegacyCronStore", () => {
|
||||
it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
name: "Legacy job",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
},
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
const cfg: OpenClawConfig = {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
};
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg,
|
||||
options: {},
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
const [job] = persisted.jobs;
|
||||
expect(job?.jobId).toBeUndefined();
|
||||
expect(job?.id).toBe("legacy-job");
|
||||
expect(job?.notify).toBeUndefined();
|
||||
expect(job?.schedule).toMatchObject({
|
||||
kind: "cron",
|
||||
expr: "0 7 * * *",
|
||||
tz: "UTC",
|
||||
});
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-finished",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
});
|
||||
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Legacy cron job storage detected"),
|
||||
"Cron",
|
||||
);
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Cron store normalized"),
|
||||
"Doctor changes",
|
||||
);
|
||||
});
|
||||
|
||||
it("warns instead of replacing announce delivery for notify fallback jobs", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "notify-and-announce",
|
||||
name: "Notify and announce",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "Status" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "123" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: { nonInteractive: true },
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(persisted.jobs[0]?.notify).toBe(true);
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
|
||||
"Doctor warnings",
|
||||
);
|
||||
});
|
||||
|
||||
it("does not auto-repair in non-interactive mode without explicit repair approval", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
name: "Legacy job",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Morning brief",
|
||||
},
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
|
||||
const prompter = makePrompter(false);
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: { nonInteractive: true },
|
||||
prompter,
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(prompter.confirm).toHaveBeenCalledWith({
|
||||
message: "Repair legacy cron jobs now?",
|
||||
initialValue: true,
|
||||
});
|
||||
expect(persisted.jobs[0]?.jobId).toBe("legacy-job");
|
||||
expect(persisted.jobs[0]?.notify).toBe(true);
|
||||
expect(noteSpy).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("Cron store normalized"),
|
||||
"Doctor changes",
|
||||
);
|
||||
});
|
||||
|
||||
it("migrates notify fallback none delivery jobs to cron.webhook", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "notify-none",
|
||||
name: "Notify none",
|
||||
notify: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "Status",
|
||||
},
|
||||
delivery: { mode: "none", to: "123456789" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: {
|
||||
cron: {
|
||||
store: storePath,
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
},
|
||||
},
|
||||
options: {},
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(persisted.jobs[0]?.notify).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.delivery).toMatchObject({
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-finished",
|
||||
});
|
||||
});
|
||||
});
|
||||
183
src/commands/doctor-cron.ts
Normal file
183
src/commands/doctor-cron.ts
Normal file
@@ -0,0 +1,183 @@
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { normalizeStoredCronJobs } from "../cron/store-migration.js";
|
||||
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
|
||||
import type { CronJob } from "../cron/types.js";
|
||||
import { note } from "../terminal/note.js";
|
||||
import { shortenHomePath } from "../utils.js";
|
||||
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";
|
||||
|
||||
type CronDoctorOutcome = {
|
||||
changed: boolean;
|
||||
warnings: string[];
|
||||
};
|
||||
|
||||
function pluralize(count: number, noun: string) {
|
||||
return `${count} ${noun}${count === 1 ? "" : "s"}`;
|
||||
}
|
||||
|
||||
function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): string[] {
|
||||
const lines: string[] = [];
|
||||
if (issues.jobId) {
|
||||
lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``);
|
||||
}
|
||||
if (issues.legacyScheduleString) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyScheduleCron) {
|
||||
lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``);
|
||||
}
|
||||
if (issues.legacyPayloadKind) {
|
||||
lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`);
|
||||
}
|
||||
if (issues.legacyPayloadProvider) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyTopLevelPayloadFields) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyTopLevelDeliveryFields) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`,
|
||||
);
|
||||
}
|
||||
if (issues.legacyDeliveryMode) {
|
||||
lines.push(
|
||||
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
|
||||
);
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
|
||||
function trimString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim() ? value.trim() : undefined;
|
||||
}
|
||||
|
||||
function migrateLegacyNotifyFallback(params: {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
legacyWebhook?: string;
|
||||
}): CronDoctorOutcome {
|
||||
let changed = false;
|
||||
const warnings: string[] = [];
|
||||
|
||||
for (const raw of params.jobs) {
|
||||
if (!("notify" in raw)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const jobName = trimString(raw.name) ?? trimString(raw.id) ?? "<unnamed>";
|
||||
const notify = raw.notify === true;
|
||||
if (!notify) {
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const delivery =
|
||||
raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery)
|
||||
? (raw.delivery as Record<string, unknown>)
|
||||
: null;
|
||||
const mode = trimString(delivery?.mode)?.toLowerCase();
|
||||
const to = trimString(delivery?.to);
|
||||
|
||||
if (mode === "webhook" && to) {
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) {
|
||||
raw.delivery = {
|
||||
...delivery,
|
||||
mode: "webhook",
|
||||
to: mode === "none" ? params.legacyWebhook : (to ?? params.legacyWebhook),
|
||||
};
|
||||
delete raw.notify;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!params.legacyWebhook) {
|
||||
warnings.push(
|
||||
`Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
warnings.push(
|
||||
`Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`,
|
||||
);
|
||||
}
|
||||
|
||||
return { changed, warnings };
|
||||
}
|
||||
|
||||
export async function maybeRepairLegacyCronStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
options: DoctorOptions;
|
||||
prompter: Pick<DoctorPrompter, "confirm">;
|
||||
}) {
|
||||
const storePath = resolveCronStorePath(params.cfg.cron?.store);
|
||||
const store = await loadCronStore(storePath);
|
||||
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
if (rawJobs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const normalized = normalizeStoredCronJobs(rawJobs);
|
||||
const legacyWebhook = trimString(params.cfg.cron?.webhook);
|
||||
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
|
||||
const previewLines = formatLegacyIssuePreview(normalized.issues);
|
||||
if (notifyCount > 0) {
|
||||
previewLines.push(
|
||||
`- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`,
|
||||
);
|
||||
}
|
||||
if (previewLines.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
note(
|
||||
[
|
||||
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
|
||||
...previewLines,
|
||||
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
|
||||
].join("\n"),
|
||||
"Cron",
|
||||
);
|
||||
|
||||
const shouldRepair = await params.prompter.confirm({
|
||||
message: "Repair legacy cron jobs now?",
|
||||
initialValue: true,
|
||||
});
|
||||
if (!shouldRepair) {
|
||||
return;
|
||||
}
|
||||
|
||||
const notifyMigration = migrateLegacyNotifyFallback({
|
||||
jobs: rawJobs,
|
||||
legacyWebhook,
|
||||
});
|
||||
const changed = normalized.mutated || notifyMigration.changed;
|
||||
if (!changed && notifyMigration.warnings.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
await saveCronStore(storePath, {
|
||||
version: 1,
|
||||
jobs: rawJobs as unknown as CronJob[],
|
||||
});
|
||||
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
|
||||
}
|
||||
|
||||
if (notifyMigration.warnings.length > 0) {
|
||||
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ import {
|
||||
import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js";
|
||||
import { doctorShellCompletion } from "./doctor-completion.js";
|
||||
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
|
||||
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
|
||||
import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js";
|
||||
import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js";
|
||||
import {
|
||||
@@ -220,6 +221,11 @@ export async function doctorCommand(
|
||||
|
||||
await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH);
|
||||
await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair });
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg,
|
||||
options,
|
||||
prompter,
|
||||
});
|
||||
|
||||
cfg = await maybeRepairSandboxImages(cfg, runtime, prompter);
|
||||
noteSandboxScopeWarnings(cfg);
|
||||
|
||||
143
src/cron/delivery.failure-notify.test.ts
Normal file
143
src/cron/delivery.failure-notify.test.ts
Normal file
@@ -0,0 +1,143 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
resolveDeliveryTarget: vi.fn(),
|
||||
deliverOutboundPayloads: vi.fn(),
|
||||
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }),
|
||||
buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }),
|
||||
createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }),
|
||||
warn: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./isolated-agent/delivery-target.js", () => ({
|
||||
resolveDeliveryTarget: mocks.resolveDeliveryTarget,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/deliver.js", () => ({
|
||||
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/identity.js", () => ({
|
||||
resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/session-context.js", () => ({
|
||||
buildOutboundSessionContext: mocks.buildOutboundSessionContext,
|
||||
}));
|
||||
|
||||
vi.mock("../cli/outbound-send-deps.js", () => ({
|
||||
createOutboundSendDeps: mocks.createOutboundSendDeps,
|
||||
}));
|
||||
|
||||
vi.mock("../logging.js", () => ({
|
||||
getChildLogger: vi.fn(() => ({
|
||||
warn: mocks.warn,
|
||||
})),
|
||||
}));
|
||||
|
||||
const { sendFailureNotificationAnnounce } = await import("./delivery.js");
|
||||
|
||||
describe("sendFailureNotificationAnnounce", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mocks.resolveDeliveryTarget.mockResolvedValue({
|
||||
ok: true,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
threadId: 42,
|
||||
mode: "explicit",
|
||||
});
|
||||
mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("delivers failure alerts to the resolved explicit target with strict send settings", async () => {
|
||||
const deps = {} as never;
|
||||
const cfg = {} as never;
|
||||
|
||||
await sendFailureNotificationAnnounce(
|
||||
deps,
|
||||
cfg,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123", accountId: "bot-a" },
|
||||
"Cron failed",
|
||||
);
|
||||
|
||||
expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", {
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
});
|
||||
expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
sessionKey: "cron:job-1:failure",
|
||||
});
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "bot-a",
|
||||
threadId: 42,
|
||||
payloads: [{ text: "Cron failed" }],
|
||||
session: { kind: "session" },
|
||||
identity: { kind: "identity" },
|
||||
bestEffort: false,
|
||||
deps: { kind: "deps" },
|
||||
abortSignal: expect.any(AbortSignal),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not send when target resolution fails", async () => {
|
||||
mocks.resolveDeliveryTarget.mockResolvedValue({
|
||||
ok: false,
|
||||
error: new Error("target missing"),
|
||||
});
|
||||
|
||||
await sendFailureNotificationAnnounce(
|
||||
{} as never,
|
||||
{} as never,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123" },
|
||||
"Cron failed",
|
||||
);
|
||||
|
||||
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
|
||||
expect(mocks.warn).toHaveBeenCalledWith(
|
||||
{ error: "target missing" },
|
||||
"cron: failed to resolve failure destination target",
|
||||
);
|
||||
});
|
||||
|
||||
it("swallows outbound delivery errors after logging", async () => {
|
||||
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed"));
|
||||
|
||||
await expect(
|
||||
sendFailureNotificationAnnounce(
|
||||
{} as never,
|
||||
{} as never,
|
||||
"main",
|
||||
"job-1",
|
||||
{ channel: "telegram", to: "123" },
|
||||
"Cron failed",
|
||||
),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(mocks.warn).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
err: "send failed",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
}),
|
||||
"cron: failure destination announce failed",
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -148,6 +148,46 @@ describe("resolveFailureDestination", () => {
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when webhook failure destination matches the primary webhook target", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
sessionTarget: "main",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
delivery: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron",
|
||||
},
|
||||
},
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("does not reuse inherited announce recipient when switching failure destination to webhook", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "111",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
},
|
||||
},
|
||||
}),
|
||||
{
|
||||
channel: "signal",
|
||||
to: "group-abc",
|
||||
mode: "announce",
|
||||
},
|
||||
);
|
||||
expect(plan).toBeNull();
|
||||
});
|
||||
|
||||
it("allows job-level failure destination fields to clear inherited global values", () => {
|
||||
const plan = resolveFailureDestination(
|
||||
makeJob({
|
||||
|
||||
@@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: {
|
||||
to?: string;
|
||||
bestEffort?: boolean;
|
||||
};
|
||||
deliveryContract?: "cron-owned" | "shared";
|
||||
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
|
||||
return runCronIsolatedAgentTurn({
|
||||
cfg: makeCfg(params.home, params.storePath, {
|
||||
@@ -67,5 +68,6 @@ export async function runTelegramAnnounceTurn(params: {
|
||||
message: "do it",
|
||||
sessionKey: "cron:job-1",
|
||||
lane: "cron",
|
||||
deliveryContract: params.deliveryContract,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ async function runExplicitTelegramAnnounceTurn(params: {
|
||||
home: string;
|
||||
storePath: string;
|
||||
deps: CliDeps;
|
||||
deliveryContract?: "cron-owned" | "shared";
|
||||
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
|
||||
return runTelegramAnnounceTurn({
|
||||
...params,
|
||||
@@ -301,6 +302,7 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
home,
|
||||
storePath,
|
||||
deps,
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
|
||||
expectDeliveredOk(res);
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
* returning so the timer correctly skips the system-event fallback.
|
||||
*/
|
||||
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// --- Module mocks (must be hoisted before imports) ---
|
||||
|
||||
@@ -105,7 +105,6 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
|
||||
resolvedDelivery,
|
||||
deliveryRequested: overrides.deliveryRequested ?? true,
|
||||
skipHeartbeatDelivery: false,
|
||||
skipMessagingToolDelivery: false,
|
||||
deliveryBestEffort: false,
|
||||
deliveryPayloadHasStructuredContent: false,
|
||||
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
|
||||
@@ -134,6 +133,10 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
|
||||
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
|
||||
@@ -255,6 +258,42 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("retries transient direct announce failures before succeeding", async () => {
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads)
|
||||
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
|
||||
.mockResolvedValueOnce([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Retry me once." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(state.result).toBeUndefined();
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("does not retry permanent direct announce failures", async () => {
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found"));
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "This should fail once." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
expect(state.result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: "Error: chat not found",
|
||||
deliveryAttempted: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
|
||||
const params = makeBaseParams({
|
||||
synthesizedText: "Task done.",
|
||||
|
||||
@@ -96,4 +96,13 @@ describe("resolveCronDeliveryBestEffort", () => {
|
||||
} as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
|
||||
});
|
||||
|
||||
it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => {
|
||||
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
|
||||
const job = {
|
||||
delivery: { bestEffort: false },
|
||||
payload: { kind: "agentTurn", bestEffortDeliver: true },
|
||||
} as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = {
|
||||
resolvedDelivery: DeliveryTargetResolution;
|
||||
deliveryRequested: boolean;
|
||||
skipHeartbeatDelivery: boolean;
|
||||
skipMessagingToolDelivery: boolean;
|
||||
skipMessagingToolDelivery?: boolean;
|
||||
deliveryBestEffort: boolean;
|
||||
deliveryPayloadHasStructuredContent: boolean;
|
||||
deliveryPayloads: ReplyPayload[];
|
||||
@@ -192,15 +192,17 @@ async function retryTransientDirectCronDelivery<T>(params: {
|
||||
export async function dispatchCronDelivery(
|
||||
params: DispatchCronDeliveryParams,
|
||||
): Promise<DispatchCronDeliveryState> {
|
||||
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
|
||||
let summary = params.summary;
|
||||
let outputText = params.outputText;
|
||||
let synthesizedText = params.synthesizedText;
|
||||
let deliveryPayloads = params.deliveryPayloads;
|
||||
|
||||
// `true` means we confirmed at least one outbound send reached the target.
|
||||
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||
let delivered = params.skipMessagingToolDelivery;
|
||||
let deliveryAttempted = params.skipMessagingToolDelivery;
|
||||
// Shared callers can treat a matching message-tool send as the completed
|
||||
// delivery path. Cron-owned callers keep this false so direct cron delivery
|
||||
// remains the only source of delivered state.
|
||||
let delivered = skipMessagingToolDelivery;
|
||||
let deliveryAttempted = skipMessagingToolDelivery;
|
||||
const failDeliveryTarget = (error: string) =>
|
||||
params.withRunSession({
|
||||
status: "error",
|
||||
@@ -404,11 +406,7 @@ export async function dispatchCronDelivery(
|
||||
}
|
||||
};
|
||||
|
||||
if (
|
||||
params.deliveryRequested &&
|
||||
!params.skipHeartbeatDelivery &&
|
||||
!params.skipMessagingToolDelivery
|
||||
) {
|
||||
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
||||
if (!params.resolvedDelivery.ok) {
|
||||
if (!params.deliveryBestEffort) {
|
||||
return {
|
||||
|
||||
@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
restoreFastTestEnv(previousFastTestEnv);
|
||||
});
|
||||
|
||||
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
|
||||
it('disables the message tool when delivery.mode is "none"', async () => {
|
||||
mockFallbackPassthrough();
|
||||
resolveCronDeliveryPlanMock.mockReturnValue({
|
||||
requested: false,
|
||||
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
await runCronIsolatedAgentTurn(makeParams());
|
||||
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
|
||||
});
|
||||
|
||||
it("disables the message tool when cron delivery is active", async () => {
|
||||
@@ -82,4 +82,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
|
||||
mockFallbackPassthrough();
|
||||
resolveCronDeliveryPlanMock.mockReturnValue({
|
||||
requested: false,
|
||||
mode: "none",
|
||||
});
|
||||
|
||||
await runCronIsolatedAgentTurn({
|
||||
...makeParams(),
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -78,11 +78,10 @@ export type RunCronAgentTurnResult = {
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
/**
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (via outbound payloads, the subagent announce flow, or a matching
|
||||
* messaging-tool send). Callers should skip posting a summary to the main
|
||||
* session to avoid duplicate
|
||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||
* `true` when the isolated runner already handled the run's user-visible
|
||||
* delivery outcome. Cron-owned callers use this for cron delivery or
|
||||
* explicit suppression; shared callers may also use it for a matching
|
||||
* message-tool send that already reached the target.
|
||||
*/
|
||||
delivered?: boolean;
|
||||
/**
|
||||
@@ -144,16 +143,22 @@ function buildCronAgentDefaultsConfig(params: {
|
||||
|
||||
type ResolvedCronDeliveryTarget = Awaited<ReturnType<typeof resolveDeliveryTarget>>;
|
||||
|
||||
type IsolatedDeliveryContract = "cron-owned" | "shared";
|
||||
|
||||
function resolveCronToolPolicy(params: {
|
||||
deliveryRequested: boolean;
|
||||
resolvedDelivery: ResolvedCronDeliveryTarget;
|
||||
deliveryContract: IsolatedDeliveryContract;
|
||||
}) {
|
||||
return {
|
||||
// Only enforce an explicit message target when the cron delivery target
|
||||
// was successfully resolved. When resolution fails the agent should not
|
||||
// be blocked by a target it cannot satisfy (#27898).
|
||||
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
|
||||
disableMessageTool: params.deliveryRequested,
|
||||
// Cron-owned runs always route user-facing delivery through the runner
|
||||
// itself. Shared callers keep the previous behavior so non-cron paths do
|
||||
// not silently lose the message tool when no explicit delivery is active.
|
||||
disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -161,6 +166,7 @@ async function resolveCronDeliveryContext(params: {
|
||||
cfg: OpenClawConfig;
|
||||
job: CronJob;
|
||||
agentId: string;
|
||||
deliveryContract: IsolatedDeliveryContract;
|
||||
}) {
|
||||
const deliveryPlan = resolveCronDeliveryPlan(params.job);
|
||||
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
|
||||
@@ -176,6 +182,7 @@ async function resolveCronDeliveryContext(params: {
|
||||
toolPolicy: resolveCronToolPolicy({
|
||||
deliveryRequested: deliveryPlan.requested,
|
||||
resolvedDelivery,
|
||||
deliveryContract: params.deliveryContract,
|
||||
}),
|
||||
};
|
||||
}
|
||||
@@ -200,6 +207,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
lane?: string;
|
||||
deliveryContract?: IsolatedDeliveryContract;
|
||||
}): Promise<RunCronAgentTurnResult> {
|
||||
const abortSignal = params.abortSignal ?? params.signal;
|
||||
const isAborted = () => abortSignal?.aborted === true;
|
||||
@@ -210,6 +218,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
: "cron: job execution timed out";
|
||||
};
|
||||
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
|
||||
const deliveryContract = params.deliveryContract ?? "cron-owned";
|
||||
const defaultAgentId = resolveDefaultAgentId(params.cfg);
|
||||
const requestedAgentId =
|
||||
typeof params.agentId === "string" && params.agentId.trim()
|
||||
@@ -425,6 +434,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
cfg: cfgWithAgentDefaults,
|
||||
job: params.job,
|
||||
agentId,
|
||||
deliveryContract,
|
||||
});
|
||||
|
||||
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
|
||||
@@ -807,6 +817,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
|
||||
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
|
||||
const skipMessagingToolDelivery =
|
||||
deliveryContract === "shared" &&
|
||||
deliveryRequested &&
|
||||
finalRunResult.didSendViaMessagingTool === true &&
|
||||
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
|
||||
@@ -816,7 +827,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
accountId: resolvedDelivery.accountId,
|
||||
}),
|
||||
);
|
||||
|
||||
const deliveryResult = await dispatchCronDelivery({
|
||||
cfg: params.cfg,
|
||||
cfgWithAgentDefaults,
|
||||
|
||||
@@ -47,8 +47,12 @@ describe("isLikelyInterimCronMessage", () => {
|
||||
false,
|
||||
);
|
||||
});
|
||||
it("treats empty as interim", () => {
|
||||
expect(isLikelyInterimCronMessage("")).toBe(true);
|
||||
it("does not treat empty as interim (empty = NO_REPLY was stripped)", () => {
|
||||
expect(isLikelyInterimCronMessage("")).toBe(false);
|
||||
});
|
||||
|
||||
it("does not treat whitespace-only as interim", () => {
|
||||
expect(isLikelyInterimCronMessage(" ")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -42,7 +42,10 @@ function normalizeHintText(value: string): string {
|
||||
export function isLikelyInterimCronMessage(value: string): boolean {
|
||||
const normalized = normalizeHintText(value);
|
||||
if (!normalized) {
|
||||
return true;
|
||||
// Empty text after payload filtering means the agent either returned
|
||||
// NO_REPLY (deliberately silent) or produced no deliverable content.
|
||||
// Do not treat this as an interim acknowledgement that needs a rerun.
|
||||
return false;
|
||||
}
|
||||
const words = normalized.split(" ").filter(Boolean).length;
|
||||
return words <= 45 && INTERIM_CRON_HINTS.some((hint) => normalized.includes(hint));
|
||||
|
||||
@@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("treats delivery object without mode as announce", async () => {
|
||||
it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => {
|
||||
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
|
||||
const job = await addIsolatedAgentTurnJob(cron, {
|
||||
name: "partial-delivery",
|
||||
@@ -96,10 +96,8 @@ describe("CronService delivery plan consistency", () => {
|
||||
|
||||
const result = await cron.run(job.id, "force");
|
||||
expect(result).toEqual({ ok: true, ran: true });
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith(
|
||||
"Cron: done",
|
||||
expect.objectContaining({ agentId: undefined }),
|
||||
);
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("still enqueues real cron summaries as system events", async () => {
|
||||
it("does not revive legacy main-session relay for real cron summaries", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
@@ -109,10 +109,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
|
||||
|
||||
await runScheduledCron(cron);
|
||||
|
||||
// Real summaries SHOULD be enqueued.
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Weather update"),
|
||||
expect.objectContaining({ agentId: undefined }),
|
||||
);
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -620,14 +620,14 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("runs an isolated job and posts summary to main", async () => {
|
||||
it("runs an isolated job without posting a fallback summary to main", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
|
||||
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
|
||||
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
|
||||
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
@@ -685,7 +685,7 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("posts last output to main even when isolated job errors", async () => {
|
||||
it("does not post a fallback main summary when an isolated job errors", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "error" as const,
|
||||
summary: "last output",
|
||||
@@ -700,8 +700,8 @@ describe("CronService", () => {
|
||||
status: "error",
|
||||
});
|
||||
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,161 +1,10 @@
|
||||
import fs from "node:fs";
|
||||
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
||||
import { coerceFiniteScheduleNumber } from "../schedule.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
|
||||
import { normalizeStoredCronJobs } from "../store-migration.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
function normalizePayloadKind(payload: Record<string, unknown>) {
|
||||
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
|
||||
if (raw === "agentturn") {
|
||||
payload.kind = "agentTurn";
|
||||
return true;
|
||||
}
|
||||
if (raw === "systemevent") {
|
||||
payload.kind = "systemEvent";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function inferPayloadIfMissing(raw: Record<string, unknown>) {
|
||||
const message = typeof raw.message === "string" ? raw.message.trim() : "";
|
||||
const text = typeof raw.text === "string" ? raw.text.trim() : "";
|
||||
const command = typeof raw.command === "string" ? raw.command.trim() : "";
|
||||
if (message) {
|
||||
raw.payload = { kind: "agentTurn", message };
|
||||
return true;
|
||||
}
|
||||
if (text) {
|
||||
raw.payload = { kind: "systemEvent", text };
|
||||
return true;
|
||||
}
|
||||
if (command) {
|
||||
raw.payload = { kind: "systemEvent", text: command };
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function copyTopLevelAgentTurnFields(
|
||||
raw: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
let mutated = false;
|
||||
|
||||
const copyTrimmedString = (field: "model" | "thinking") => {
|
||||
const existing = payload[field];
|
||||
if (typeof existing === "string" && existing.trim()) {
|
||||
return;
|
||||
}
|
||||
const value = raw[field];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[field] = value.trim();
|
||||
mutated = true;
|
||||
}
|
||||
};
|
||||
copyTrimmedString("model");
|
||||
copyTrimmedString("thinking");
|
||||
|
||||
if (
|
||||
typeof payload.timeoutSeconds !== "number" &&
|
||||
typeof raw.timeoutSeconds === "number" &&
|
||||
Number.isFinite(raw.timeoutSeconds)
|
||||
) {
|
||||
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (
|
||||
typeof payload.allowUnsafeExternalContent !== "boolean" &&
|
||||
typeof raw.allowUnsafeExternalContent === "boolean"
|
||||
) {
|
||||
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
|
||||
payload.deliver = raw.deliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.channel !== "string" &&
|
||||
typeof raw.channel === "string" &&
|
||||
raw.channel.trim()
|
||||
) {
|
||||
payload.channel = raw.channel.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
|
||||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
) {
|
||||
payload.bestEffortDeliver = raw.bestEffortDeliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.provider !== "string" &&
|
||||
typeof raw.provider === "string" &&
|
||||
raw.provider.trim()
|
||||
) {
|
||||
payload.provider = raw.provider.trim();
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
||||
if ("model" in raw) {
|
||||
delete raw.model;
|
||||
}
|
||||
if ("thinking" in raw) {
|
||||
delete raw.thinking;
|
||||
}
|
||||
if ("timeoutSeconds" in raw) {
|
||||
delete raw.timeoutSeconds;
|
||||
}
|
||||
if ("allowUnsafeExternalContent" in raw) {
|
||||
delete raw.allowUnsafeExternalContent;
|
||||
}
|
||||
if ("message" in raw) {
|
||||
delete raw.message;
|
||||
}
|
||||
if ("text" in raw) {
|
||||
delete raw.text;
|
||||
}
|
||||
if ("deliver" in raw) {
|
||||
delete raw.deliver;
|
||||
}
|
||||
if ("channel" in raw) {
|
||||
delete raw.channel;
|
||||
}
|
||||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in raw) {
|
||||
delete raw.provider;
|
||||
}
|
||||
if ("command" in raw) {
|
||||
delete raw.command;
|
||||
}
|
||||
if ("timeout" in raw) {
|
||||
delete raw.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
try {
|
||||
const stats = await fs.promises.stat(path);
|
||||
@@ -185,287 +34,7 @@ export async function ensureLoaded(
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
const loaded = await loadCronStore(state.deps.storePath);
|
||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
let mutated = false;
|
||||
for (const raw of jobs) {
|
||||
const state = raw.state;
|
||||
if (!state || typeof state !== "object" || Array.isArray(state)) {
|
||||
raw.state = {};
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
|
||||
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
|
||||
if (!rawId && legacyJobId) {
|
||||
raw.id = legacyJobId;
|
||||
mutated = true;
|
||||
} else if (rawId && raw.id !== rawId) {
|
||||
raw.id = rawId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("jobId" in raw) {
|
||||
delete raw.jobId;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof raw.schedule === "string") {
|
||||
const expr = raw.schedule.trim();
|
||||
raw.schedule = { kind: "cron", expr };
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const nameRaw = raw.name;
|
||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||
raw.name = inferLegacyName({
|
||||
schedule: raw.schedule as never,
|
||||
payload: raw.payload as never,
|
||||
});
|
||||
mutated = true;
|
||||
} else {
|
||||
raw.name = nameRaw.trim();
|
||||
}
|
||||
|
||||
const desc = normalizeOptionalText(raw.description);
|
||||
if (raw.description !== desc) {
|
||||
raw.description = desc;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if ("sessionKey" in raw) {
|
||||
const sessionKey =
|
||||
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
|
||||
if (raw.sessionKey !== sessionKey) {
|
||||
raw.sessionKey = sessionKey;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof raw.enabled !== "boolean") {
|
||||
raw.enabled = true;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
|
||||
if (wakeModeRaw === "next-heartbeat") {
|
||||
if (raw.wakeMode !== "next-heartbeat") {
|
||||
raw.wakeMode = "next-heartbeat";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (wakeModeRaw === "now") {
|
||||
if (raw.wakeMode !== "now") {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
|
||||
inferPayloadIfMissing(raw)
|
||||
) {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadRecord =
|
||||
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
|
||||
? (raw.payload as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
if (payloadRecord) {
|
||||
if (normalizePayloadKind(payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
if (!payloadRecord.kind) {
|
||||
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
|
||||
payloadRecord.kind = "agentTurn";
|
||||
mutated = true;
|
||||
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
|
||||
payloadRecord.kind = "systemEvent";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
if (payloadRecord.kind === "agentTurn") {
|
||||
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const hadLegacyTopLevelFields =
|
||||
"model" in raw ||
|
||||
"thinking" in raw ||
|
||||
"timeoutSeconds" in raw ||
|
||||
"allowUnsafeExternalContent" in raw ||
|
||||
"message" in raw ||
|
||||
"text" in raw ||
|
||||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw ||
|
||||
"command" in raw ||
|
||||
"timeout" in raw;
|
||||
if (hadLegacyTopLevelFields) {
|
||||
stripLegacyTopLevelFields(raw);
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (payloadRecord) {
|
||||
if (migrateLegacyCronPayload(payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const schedule = raw.schedule;
|
||||
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
|
||||
const sched = schedule as Record<string, unknown>;
|
||||
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
|
||||
if (!kind && ("at" in sched || "atMs" in sched)) {
|
||||
sched.kind = "at";
|
||||
mutated = true;
|
||||
}
|
||||
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
|
||||
const atMsRaw = sched.atMs;
|
||||
const parsedAtMs =
|
||||
typeof atMsRaw === "number"
|
||||
? atMsRaw
|
||||
: typeof atMsRaw === "string"
|
||||
? parseAbsoluteTimeMs(atMsRaw)
|
||||
: atRaw
|
||||
? parseAbsoluteTimeMs(atRaw)
|
||||
: null;
|
||||
if (parsedAtMs !== null) {
|
||||
sched.at = new Date(parsedAtMs).toISOString();
|
||||
if ("atMs" in sched) {
|
||||
delete sched.atMs;
|
||||
}
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
? Math.max(0, Math.floor(raw.updatedAtMs))
|
||||
: null;
|
||||
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
|
||||
sched.anchorMs = normalizedAnchor;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
|
||||
let normalizedExpr = exprRaw;
|
||||
if (!normalizedExpr && legacyCronRaw) {
|
||||
normalizedExpr = legacyCronRaw;
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if ("cron" in sched) {
|
||||
delete sched.cron;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
|
||||
const modeRaw = (delivery as { mode?: unknown }).mode;
|
||||
if (typeof modeRaw === "string") {
|
||||
const lowered = modeRaw.trim().toLowerCase();
|
||||
if (lowered === "deliver") {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (modeRaw === undefined || modeRaw === null) {
|
||||
// Explicitly persist the default so existing jobs don't silently
|
||||
// change behaviour when the runtime default shifts.
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const isolation = raw.isolation;
|
||||
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
|
||||
delete raw.isolation;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadKind =
|
||||
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
|
||||
const normalizedSessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
|
||||
if (raw.sessionTarget !== normalizedSessionTarget) {
|
||||
raw.sessionTarget = normalizedSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
|
||||
if (raw.sessionTarget !== inferredSessionTarget) {
|
||||
raw.sessionTarget = inferredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const sessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
const isIsolatedAgentTurn =
|
||||
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
|
||||
payload: payloadRecord,
|
||||
});
|
||||
|
||||
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
if (!hasDelivery && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
} else if (!hasDelivery) {
|
||||
raw.delivery = { mode: "announce" };
|
||||
mutated = true;
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
const { mutated } = normalizeStoredCronJobs(jobs);
|
||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||
state.storeLoadedAtMs = state.deps.nowMs();
|
||||
state.storeFileMtimeMs = fileMtimeMs;
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
|
||||
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
|
||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||
import type {
|
||||
CronDeliveryStatus,
|
||||
@@ -1138,46 +1136,6 @@ export async function executeJobCore(
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
// Post a short summary back to the main session only when announce
|
||||
// delivery was requested and we are confident no outbound delivery path
|
||||
// ran. If delivery was attempted but final ack is uncertain, suppress the
|
||||
// main summary to avoid duplicate user-facing sends.
|
||||
// See: https://github.com/openclaw/openclaw/issues/15692
|
||||
//
|
||||
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
|
||||
// are internal ack tokens that should never leak into user conversations.
|
||||
// See: https://github.com/openclaw/openclaw/issues/32013
|
||||
const summaryText = res.summary?.trim();
|
||||
const deliveryPlan = resolveCronDeliveryPlan(job);
|
||||
const suppressMainSummary =
|
||||
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
|
||||
if (
|
||||
shouldEnqueueCronMainSummary({
|
||||
summaryText,
|
||||
deliveryRequested: deliveryPlan.requested,
|
||||
delivered: res.delivered,
|
||||
deliveryAttempted: res.deliveryAttempted,
|
||||
suppressMainSummary,
|
||||
isCronSystemEvent,
|
||||
})
|
||||
) {
|
||||
const prefix = "Cron";
|
||||
const label =
|
||||
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
||||
state.deps.enqueueSystemEvent(label, {
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
contextKey: `cron:${job.id}`,
|
||||
});
|
||||
if (job.wakeMode === "now") {
|
||||
state.deps.requestHeartbeatNow({
|
||||
reason: `cron:${job.id}`,
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
status: res.status,
|
||||
error: res.error,
|
||||
|
||||
78
src/cron/store-migration.test.ts
Normal file
78
src/cron/store-migration.test.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeStoredCronJobs } from "./store-migration.js";
|
||||
|
||||
describe("normalizeStoredCronJobs", () => {
|
||||
it("normalizes legacy cron fields and reports migration issues", () => {
|
||||
const jobs = [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
|
||||
message: "say hi",
|
||||
model: "openai/gpt-4.1",
|
||||
deliver: true,
|
||||
provider: " TeLeGrAm ",
|
||||
to: "12345",
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues).toMatchObject({
|
||||
jobId: 1,
|
||||
legacyScheduleCron: 1,
|
||||
legacyTopLevelPayloadFields: 1,
|
||||
legacyTopLevelDeliveryFields: 1,
|
||||
});
|
||||
|
||||
const [job] = jobs;
|
||||
expect(job?.jobId).toBeUndefined();
|
||||
expect(job?.id).toBe("legacy-job");
|
||||
expect(job?.schedule).toMatchObject({
|
||||
kind: "cron",
|
||||
expr: "*/5 * * * *",
|
||||
tz: "UTC",
|
||||
});
|
||||
expect(job?.message).toBeUndefined();
|
||||
expect(job?.provider).toBeUndefined();
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "say hi",
|
||||
model: "openai/gpt-4.1",
|
||||
});
|
||||
});
|
||||
|
||||
it("normalizes payload provider alias into channel", () => {
|
||||
const jobs = [
|
||||
{
|
||||
id: "legacy-provider",
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
provider: " Slack ",
|
||||
},
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues.legacyPayloadProvider).toBe(1);
|
||||
expect(jobs[0]?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
});
|
||||
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
|
||||
expect(payload?.provider).toBeUndefined();
|
||||
expect(jobs[0]?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "slack",
|
||||
});
|
||||
});
|
||||
});
|
||||
491
src/cron/store-migration.ts
Normal file
491
src/cron/store-migration.ts
Normal file
@@ -0,0 +1,491 @@
|
||||
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "./parse.js";
|
||||
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||
import { coerceFiniteScheduleNumber } from "./schedule.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
|
||||
|
||||
type CronStoreIssueKey =
|
||||
| "jobId"
|
||||
| "legacyScheduleString"
|
||||
| "legacyScheduleCron"
|
||||
| "legacyPayloadKind"
|
||||
| "legacyPayloadProvider"
|
||||
| "legacyTopLevelPayloadFields"
|
||||
| "legacyTopLevelDeliveryFields"
|
||||
| "legacyDeliveryMode";
|
||||
|
||||
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
|
||||
|
||||
type NormalizeCronStoreJobsResult = {
|
||||
issues: CronStoreIssues;
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
mutated: boolean;
|
||||
};
|
||||
|
||||
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
|
||||
issues[key] = (issues[key] ?? 0) + 1;
|
||||
}
|
||||
|
||||
function normalizePayloadKind(payload: Record<string, unknown>) {
|
||||
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
|
||||
if (raw === "agentturn") {
|
||||
payload.kind = "agentTurn";
|
||||
return true;
|
||||
}
|
||||
if (raw === "systemevent") {
|
||||
payload.kind = "systemEvent";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function inferPayloadIfMissing(raw: Record<string, unknown>) {
|
||||
const message = typeof raw.message === "string" ? raw.message.trim() : "";
|
||||
const text = typeof raw.text === "string" ? raw.text.trim() : "";
|
||||
const command = typeof raw.command === "string" ? raw.command.trim() : "";
|
||||
if (message) {
|
||||
raw.payload = { kind: "agentTurn", message };
|
||||
return true;
|
||||
}
|
||||
if (text) {
|
||||
raw.payload = { kind: "systemEvent", text };
|
||||
return true;
|
||||
}
|
||||
if (command) {
|
||||
raw.payload = { kind: "systemEvent", text: command };
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function copyTopLevelAgentTurnFields(
|
||||
raw: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
let mutated = false;
|
||||
|
||||
const copyTrimmedString = (field: "model" | "thinking") => {
|
||||
const existing = payload[field];
|
||||
if (typeof existing === "string" && existing.trim()) {
|
||||
return;
|
||||
}
|
||||
const value = raw[field];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[field] = value.trim();
|
||||
mutated = true;
|
||||
}
|
||||
};
|
||||
copyTrimmedString("model");
|
||||
copyTrimmedString("thinking");
|
||||
|
||||
if (
|
||||
typeof payload.timeoutSeconds !== "number" &&
|
||||
typeof raw.timeoutSeconds === "number" &&
|
||||
Number.isFinite(raw.timeoutSeconds)
|
||||
) {
|
||||
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (
|
||||
typeof payload.allowUnsafeExternalContent !== "boolean" &&
|
||||
typeof raw.allowUnsafeExternalContent === "boolean"
|
||||
) {
|
||||
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
|
||||
payload.deliver = raw.deliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.channel !== "string" &&
|
||||
typeof raw.channel === "string" &&
|
||||
raw.channel.trim()
|
||||
) {
|
||||
payload.channel = raw.channel.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
|
||||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
) {
|
||||
payload.bestEffortDeliver = raw.bestEffortDeliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.provider !== "string" &&
|
||||
typeof raw.provider === "string" &&
|
||||
raw.provider.trim()
|
||||
) {
|
||||
payload.provider = raw.provider.trim();
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
||||
if ("model" in raw) {
|
||||
delete raw.model;
|
||||
}
|
||||
if ("thinking" in raw) {
|
||||
delete raw.thinking;
|
||||
}
|
||||
if ("timeoutSeconds" in raw) {
|
||||
delete raw.timeoutSeconds;
|
||||
}
|
||||
if ("allowUnsafeExternalContent" in raw) {
|
||||
delete raw.allowUnsafeExternalContent;
|
||||
}
|
||||
if ("message" in raw) {
|
||||
delete raw.message;
|
||||
}
|
||||
if ("text" in raw) {
|
||||
delete raw.text;
|
||||
}
|
||||
if ("deliver" in raw) {
|
||||
delete raw.deliver;
|
||||
}
|
||||
if ("channel" in raw) {
|
||||
delete raw.channel;
|
||||
}
|
||||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in raw) {
|
||||
delete raw.provider;
|
||||
}
|
||||
if ("command" in raw) {
|
||||
delete raw.command;
|
||||
}
|
||||
if ("timeout" in raw) {
|
||||
delete raw.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
export function normalizeStoredCronJobs(
|
||||
jobs: Array<Record<string, unknown>>,
|
||||
): NormalizeCronStoreJobsResult {
|
||||
const issues: CronStoreIssues = {};
|
||||
let mutated = false;
|
||||
|
||||
for (const raw of jobs) {
|
||||
const jobIssues = new Set<CronStoreIssueKey>();
|
||||
const trackIssue = (key: CronStoreIssueKey) => {
|
||||
if (jobIssues.has(key)) {
|
||||
return;
|
||||
}
|
||||
jobIssues.add(key);
|
||||
incrementIssue(issues, key);
|
||||
};
|
||||
|
||||
const state = raw.state;
|
||||
if (!state || typeof state !== "object" || Array.isArray(state)) {
|
||||
raw.state = {};
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
|
||||
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
|
||||
if (!rawId && legacyJobId) {
|
||||
raw.id = legacyJobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
} else if (rawId && raw.id !== rawId) {
|
||||
raw.id = rawId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("jobId" in raw) {
|
||||
delete raw.jobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
}
|
||||
|
||||
if (typeof raw.schedule === "string") {
|
||||
const expr = raw.schedule.trim();
|
||||
raw.schedule = { kind: "cron", expr };
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleString");
|
||||
}
|
||||
|
||||
const nameRaw = raw.name;
|
||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||
raw.name = inferLegacyName({
|
||||
schedule: raw.schedule as never,
|
||||
payload: raw.payload as never,
|
||||
});
|
||||
mutated = true;
|
||||
} else {
|
||||
raw.name = nameRaw.trim();
|
||||
}
|
||||
|
||||
const desc = normalizeOptionalText(raw.description);
|
||||
if (raw.description !== desc) {
|
||||
raw.description = desc;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if ("sessionKey" in raw) {
|
||||
const sessionKey =
|
||||
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
|
||||
if (raw.sessionKey !== sessionKey) {
|
||||
raw.sessionKey = sessionKey;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof raw.enabled !== "boolean") {
|
||||
raw.enabled = true;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
|
||||
if (wakeModeRaw === "next-heartbeat") {
|
||||
if (raw.wakeMode !== "next-heartbeat") {
|
||||
raw.wakeMode = "next-heartbeat";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (wakeModeRaw === "now") {
|
||||
if (raw.wakeMode !== "now") {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
|
||||
inferPayloadIfMissing(raw)
|
||||
) {
|
||||
mutated = true;
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
|
||||
const payloadRecord =
|
||||
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
|
||||
? (raw.payload as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
if (payloadRecord) {
|
||||
if (normalizePayloadKind(payloadRecord)) {
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
if (!payloadRecord.kind) {
|
||||
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
|
||||
payloadRecord.kind = "agentTurn";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
|
||||
payloadRecord.kind = "systemEvent";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
}
|
||||
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const hadLegacyTopLevelPayloadFields =
|
||||
"model" in raw ||
|
||||
"thinking" in raw ||
|
||||
"timeoutSeconds" in raw ||
|
||||
"allowUnsafeExternalContent" in raw ||
|
||||
"message" in raw ||
|
||||
"text" in raw ||
|
||||
"command" in raw ||
|
||||
"timeout" in raw;
|
||||
const hadLegacyTopLevelDeliveryFields =
|
||||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw;
|
||||
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
|
||||
stripLegacyTopLevelFields(raw);
|
||||
mutated = true;
|
||||
if (hadLegacyTopLevelPayloadFields) {
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
if (hadLegacyTopLevelDeliveryFields) {
|
||||
trackIssue("legacyTopLevelDeliveryFields");
|
||||
}
|
||||
}
|
||||
|
||||
if (payloadRecord) {
|
||||
const hadLegacyPayloadProvider =
|
||||
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
|
||||
if (migrateLegacyCronPayload(payloadRecord)) {
|
||||
mutated = true;
|
||||
if (hadLegacyPayloadProvider) {
|
||||
trackIssue("legacyPayloadProvider");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const schedule = raw.schedule;
|
||||
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
|
||||
const sched = schedule as Record<string, unknown>;
|
||||
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
|
||||
if (!kind && ("at" in sched || "atMs" in sched)) {
|
||||
sched.kind = "at";
|
||||
mutated = true;
|
||||
}
|
||||
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
|
||||
const atMsRaw = sched.atMs;
|
||||
const parsedAtMs =
|
||||
typeof atMsRaw === "number"
|
||||
? atMsRaw
|
||||
: typeof atMsRaw === "string"
|
||||
? parseAbsoluteTimeMs(atMsRaw)
|
||||
: atRaw
|
||||
? parseAbsoluteTimeMs(atRaw)
|
||||
: null;
|
||||
if (parsedAtMs !== null) {
|
||||
sched.at = new Date(parsedAtMs).toISOString();
|
||||
if ("atMs" in sched) {
|
||||
delete sched.atMs;
|
||||
}
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
? Math.max(0, Math.floor(raw.updatedAtMs))
|
||||
: null;
|
||||
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
|
||||
sched.anchorMs = normalizedAnchor;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
|
||||
let normalizedExpr = exprRaw;
|
||||
if (!normalizedExpr && legacyCronRaw) {
|
||||
normalizedExpr = legacyCronRaw;
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if ("cron" in sched) {
|
||||
delete sched.cron;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
|
||||
const modeRaw = (delivery as { mode?: unknown }).mode;
|
||||
if (typeof modeRaw === "string") {
|
||||
const lowered = modeRaw.trim().toLowerCase();
|
||||
if (lowered === "deliver") {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
trackIssue("legacyDeliveryMode");
|
||||
}
|
||||
} else if (modeRaw === undefined || modeRaw === null) {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const isolation = raw.isolation;
|
||||
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
|
||||
delete raw.isolation;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadKind =
|
||||
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
|
||||
const normalizedSessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
|
||||
if (raw.sessionTarget !== normalizedSessionTarget) {
|
||||
raw.sessionTarget = normalizedSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
|
||||
if (raw.sessionTarget !== inferredSessionTarget) {
|
||||
raw.sessionTarget = inferredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const sessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
const isIsolatedAgentTurn =
|
||||
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
|
||||
payload: payloadRecord,
|
||||
});
|
||||
|
||||
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
if (!hasDelivery && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
} else if (!hasDelivery) {
|
||||
raw.delivery = { mode: "announce" };
|
||||
mutated = true;
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
return { issues, jobs, mutated };
|
||||
}
|
||||
@@ -18,6 +18,10 @@ describe("method scope resolution", () => {
|
||||
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
|
||||
});
|
||||
|
||||
it("leaves node-only pending drain outside operator scopes", () => {
|
||||
expect(resolveLeastPrivilegeOperatorScopesForMethod("node.pending.drain")).toEqual([]);
|
||||
});
|
||||
|
||||
it("returns empty scopes for unknown methods", () => {
|
||||
expect(resolveLeastPrivilegeOperatorScopesForMethod("totally.unknown.method")).toEqual([]);
|
||||
});
|
||||
|
||||
@@ -22,6 +22,7 @@ export const CLI_DEFAULT_OPERATOR_SCOPES: OperatorScope[] = [
|
||||
const NODE_ROLE_METHODS = new Set([
|
||||
"node.invoke.result",
|
||||
"node.event",
|
||||
"node.pending.drain",
|
||||
"node.canvas.capability.refresh",
|
||||
"node.pending.pull",
|
||||
"node.pending.ack",
|
||||
@@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"chat.abort",
|
||||
"browser.request",
|
||||
"push.test",
|
||||
"node.pending.enqueue",
|
||||
],
|
||||
[ADMIN_SCOPE]: [
|
||||
"channels.logout",
|
||||
|
||||
67
src/gateway/node-pending-work.test.ts
Normal file
67
src/gateway/node-pending-work.test.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { describe, expect, it, beforeEach } from "vitest";
|
||||
import {
|
||||
acknowledgeNodePendingWork,
|
||||
drainNodePendingWork,
|
||||
enqueueNodePendingWork,
|
||||
getNodePendingWorkStateCountForTests,
|
||||
resetNodePendingWorkForTests,
|
||||
} from "./node-pending-work.js";
|
||||
|
||||
describe("node pending work", () => {
|
||||
beforeEach(() => {
|
||||
resetNodePendingWorkForTests();
|
||||
});
|
||||
|
||||
it("returns a baseline status request even when no explicit work is queued", () => {
|
||||
const drained = drainNodePendingWork("node-1");
|
||||
expect(drained.items).toEqual([
|
||||
expect.objectContaining({
|
||||
id: "baseline-status",
|
||||
type: "status.request",
|
||||
priority: "default",
|
||||
}),
|
||||
]);
|
||||
expect(drained.hasMore).toBe(false);
|
||||
});
|
||||
|
||||
it("dedupes explicit work by type and removes acknowledged items", () => {
|
||||
const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||
const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||
|
||||
expect(first.deduped).toBe(false);
|
||||
expect(second.deduped).toBe(true);
|
||||
expect(second.item.id).toBe(first.item.id);
|
||||
|
||||
const drained = drainNodePendingWork("node-2");
|
||||
expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]);
|
||||
|
||||
const acked = acknowledgeNodePendingWork({
|
||||
nodeId: "node-2",
|
||||
itemIds: [first.item.id, "baseline-status"],
|
||||
});
|
||||
expect(acked.removedItemIds).toEqual([first.item.id]);
|
||||
|
||||
const afterAck = drainNodePendingWork("node-2");
|
||||
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
|
||||
});
|
||||
|
||||
it("keeps hasMore true when the baseline status item is deferred by maxItems", () => {
|
||||
enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" });
|
||||
|
||||
const drained = drainNodePendingWork("node-3", { maxItems: 1 });
|
||||
|
||||
expect(drained.items.map((item) => item.type)).toEqual(["location.request"]);
|
||||
expect(drained.hasMore).toBe(true);
|
||||
});
|
||||
|
||||
it("does not allocate state for drain-only nodes with no queued work", () => {
|
||||
expect(getNodePendingWorkStateCountForTests()).toBe(0);
|
||||
|
||||
const drained = drainNodePendingWork("node-4");
|
||||
const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] });
|
||||
|
||||
expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]);
|
||||
expect(acked).toEqual({ revision: 0, removedItemIds: [] });
|
||||
expect(getNodePendingWorkStateCountForTests()).toBe(0);
|
||||
});
|
||||
});
|
||||
193
src/gateway/node-pending-work.ts
Normal file
193
src/gateway/node-pending-work.ts
Normal file
@@ -0,0 +1,193 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const;
|
||||
export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number];
|
||||
|
||||
export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const;
|
||||
export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number];
|
||||
|
||||
export type NodePendingWorkItem = {
|
||||
id: string;
|
||||
type: NodePendingWorkType;
|
||||
priority: NodePendingWorkPriority;
|
||||
createdAtMs: number;
|
||||
expiresAtMs: number | null;
|
||||
payload?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
type NodePendingWorkState = {
|
||||
revision: number;
|
||||
itemsById: Map<string, NodePendingWorkItem>;
|
||||
};
|
||||
|
||||
type DrainOptions = {
|
||||
maxItems?: number;
|
||||
includeDefaultStatus?: boolean;
|
||||
nowMs?: number;
|
||||
};
|
||||
|
||||
type DrainResult = {
|
||||
revision: number;
|
||||
items: NodePendingWorkItem[];
|
||||
hasMore: boolean;
|
||||
};
|
||||
|
||||
const DEFAULT_STATUS_ITEM_ID = "baseline-status";
|
||||
const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default";
|
||||
const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal";
|
||||
const DEFAULT_MAX_ITEMS = 4;
|
||||
const MAX_ITEMS = 10;
|
||||
const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
|
||||
high: 3,
|
||||
normal: 2,
|
||||
default: 1,
|
||||
};
|
||||
|
||||
const stateByNodeId = new Map<string, NodePendingWorkState>();
|
||||
|
||||
function getOrCreateState(nodeId: string): NodePendingWorkState {
|
||||
let state = stateByNodeId.get(nodeId);
|
||||
if (!state) {
|
||||
state = {
|
||||
revision: 0,
|
||||
itemsById: new Map(),
|
||||
};
|
||||
stateByNodeId.set(nodeId, state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean {
|
||||
let changed = false;
|
||||
for (const [id, item] of state.itemsById) {
|
||||
if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) {
|
||||
state.itemsById.delete(id);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if (changed) {
|
||||
state.revision += 1;
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] {
|
||||
return [...state.itemsById.values()].toSorted((a, b) => {
|
||||
const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
|
||||
if (priorityDelta !== 0) {
|
||||
return priorityDelta;
|
||||
}
|
||||
if (a.createdAtMs !== b.createdAtMs) {
|
||||
return a.createdAtMs - b.createdAtMs;
|
||||
}
|
||||
return a.id.localeCompare(b.id);
|
||||
});
|
||||
}
|
||||
|
||||
function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem {
|
||||
return {
|
||||
id: DEFAULT_STATUS_ITEM_ID,
|
||||
type: "status.request",
|
||||
priority: DEFAULT_STATUS_PRIORITY,
|
||||
createdAtMs: nowMs,
|
||||
expiresAtMs: null,
|
||||
};
|
||||
}
|
||||
|
||||
export function enqueueNodePendingWork(params: {
|
||||
nodeId: string;
|
||||
type: NodePendingWorkType;
|
||||
priority?: NodePendingWorkPriority;
|
||||
expiresInMs?: number;
|
||||
payload?: Record<string, unknown>;
|
||||
}): { revision: number; item: NodePendingWorkItem; deduped: boolean } {
|
||||
const nodeId = params.nodeId.trim();
|
||||
if (!nodeId) {
|
||||
throw new Error("nodeId required");
|
||||
}
|
||||
const nowMs = Date.now();
|
||||
const state = getOrCreateState(nodeId);
|
||||
pruneExpired(state, nowMs);
|
||||
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
|
||||
if (existing) {
|
||||
return { revision: state.revision, item: existing, deduped: true };
|
||||
}
|
||||
const item: NodePendingWorkItem = {
|
||||
id: randomUUID(),
|
||||
type: params.type,
|
||||
priority: params.priority ?? DEFAULT_PRIORITY,
|
||||
createdAtMs: nowMs,
|
||||
expiresAtMs:
|
||||
typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs)
|
||||
? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs))
|
||||
: null,
|
||||
...(params.payload ? { payload: params.payload } : {}),
|
||||
};
|
||||
state.itemsById.set(item.id, item);
|
||||
state.revision += 1;
|
||||
return { revision: state.revision, item, deduped: false };
|
||||
}
|
||||
|
||||
export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult {
|
||||
const normalizedNodeId = nodeId.trim();
|
||||
if (!normalizedNodeId) {
|
||||
return { revision: 0, items: [], hasMore: false };
|
||||
}
|
||||
const nowMs = opts.nowMs ?? Date.now();
|
||||
const state = stateByNodeId.get(normalizedNodeId);
|
||||
const revision = state?.revision ?? 0;
|
||||
if (state) {
|
||||
pruneExpired(state, nowMs);
|
||||
}
|
||||
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
|
||||
const explicitItems = state ? sortedItems(state) : [];
|
||||
const items = explicitItems.slice(0, maxItems);
|
||||
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
|
||||
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
|
||||
if (includeBaseline && items.length < maxItems) {
|
||||
items.push(makeBaselineStatusItem(nowMs));
|
||||
}
|
||||
const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length;
|
||||
const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID);
|
||||
return {
|
||||
revision,
|
||||
items,
|
||||
hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded),
|
||||
};
|
||||
}
|
||||
|
||||
export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): {
|
||||
revision: number;
|
||||
removedItemIds: string[];
|
||||
} {
|
||||
const nodeId = params.nodeId.trim();
|
||||
if (!nodeId) {
|
||||
return { revision: 0, removedItemIds: [] };
|
||||
}
|
||||
const state = stateByNodeId.get(nodeId);
|
||||
if (!state) {
|
||||
return { revision: 0, removedItemIds: [] };
|
||||
}
|
||||
const removedItemIds: string[] = [];
|
||||
for (const itemId of params.itemIds) {
|
||||
const trimmedId = itemId.trim();
|
||||
if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) {
|
||||
continue;
|
||||
}
|
||||
if (state.itemsById.delete(trimmedId)) {
|
||||
removedItemIds.push(trimmedId);
|
||||
}
|
||||
}
|
||||
if (removedItemIds.length > 0) {
|
||||
state.revision += 1;
|
||||
}
|
||||
return { revision: state.revision, removedItemIds };
|
||||
}
|
||||
|
||||
export function resetNodePendingWorkForTests() {
|
||||
stateByNodeId.clear();
|
||||
}
|
||||
|
||||
export function getNodePendingWorkStateCountForTests(): number {
|
||||
return stateByNodeId.size;
|
||||
}
|
||||
@@ -140,6 +140,14 @@ import {
|
||||
NodeDescribeParamsSchema,
|
||||
type NodeEventParams,
|
||||
NodeEventParamsSchema,
|
||||
type NodePendingDrainParams,
|
||||
NodePendingDrainParamsSchema,
|
||||
type NodePendingDrainResult,
|
||||
NodePendingDrainResultSchema,
|
||||
type NodePendingEnqueueParams,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
type NodePendingEnqueueResult,
|
||||
NodePendingEnqueueResultSchema,
|
||||
type NodeInvokeParams,
|
||||
NodeInvokeParamsSchema,
|
||||
type NodeInvokeResultParams,
|
||||
@@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
|
||||
NodeInvokeResultParamsSchema,
|
||||
);
|
||||
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
|
||||
export const validateNodePendingDrainParams = ajv.compile<NodePendingDrainParams>(
|
||||
NodePendingDrainParamsSchema,
|
||||
);
|
||||
export const validateNodePendingEnqueueParams = ajv.compile<NodePendingEnqueueParams>(
|
||||
NodePendingEnqueueParamsSchema,
|
||||
);
|
||||
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
|
||||
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
|
||||
SecretsResolveParamsSchema,
|
||||
@@ -472,6 +486,10 @@ export {
|
||||
NodeListParamsSchema,
|
||||
NodePendingAckParamsSchema,
|
||||
NodeInvokeParamsSchema,
|
||||
NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResultSchema,
|
||||
SessionsListParamsSchema,
|
||||
SessionsPreviewParamsSchema,
|
||||
SessionsPatchParamsSchema,
|
||||
@@ -621,6 +639,10 @@ export type {
|
||||
NodeInvokeParams,
|
||||
NodeInvokeResultParams,
|
||||
NodeEventParams,
|
||||
NodePendingDrainParams,
|
||||
NodePendingDrainResult,
|
||||
NodePendingEnqueueParams,
|
||||
NodePendingEnqueueResult,
|
||||
SessionsListParams,
|
||||
SessionsPreviewParams,
|
||||
SessionsResolveParams,
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import { NonEmptyString } from "./primitives.js";
|
||||
|
||||
const NodePendingWorkTypeSchema = Type.String({
|
||||
enum: ["status.request", "location.request"],
|
||||
});
|
||||
|
||||
const NodePendingWorkPrioritySchema = Type.String({
|
||||
enum: ["normal", "high"],
|
||||
});
|
||||
|
||||
export const NodePairRequestParamsSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
@@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainParamsSchema = Type.Object(
|
||||
{
|
||||
maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainItemSchema = Type.Object(
|
||||
{
|
||||
id: NonEmptyString,
|
||||
type: NodePendingWorkTypeSchema,
|
||||
priority: Type.String({ enum: ["default", "normal", "high"] }),
|
||||
createdAtMs: Type.Integer({ minimum: 0 }),
|
||||
expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])),
|
||||
payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingDrainResultSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
revision: Type.Integer({ minimum: 0 }),
|
||||
items: Type.Array(NodePendingDrainItemSchema),
|
||||
hasMore: Type.Boolean(),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingEnqueueParamsSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
type: NodePendingWorkTypeSchema,
|
||||
priority: Type.Optional(NodePendingWorkPrioritySchema),
|
||||
expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })),
|
||||
wake: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodePendingEnqueueResultSchema = Type.Object(
|
||||
{
|
||||
nodeId: NonEmptyString,
|
||||
revision: Type.Integer({ minimum: 0 }),
|
||||
queued: NodePendingDrainItemSchema,
|
||||
wakeTriggered: Type.Boolean(),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const NodeInvokeRequestEventSchema = Type.Object(
|
||||
{
|
||||
id: NonEmptyString,
|
||||
|
||||
@@ -114,6 +114,10 @@ import {
|
||||
import {
|
||||
NodeDescribeParamsSchema,
|
||||
NodeEventParamsSchema,
|
||||
NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResultSchema,
|
||||
NodeInvokeParamsSchema,
|
||||
NodeInvokeResultParamsSchema,
|
||||
NodeInvokeRequestEventSchema,
|
||||
@@ -186,6 +190,10 @@ export const ProtocolSchemas = {
|
||||
NodeInvokeParams: NodeInvokeParamsSchema,
|
||||
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
|
||||
NodeEventParams: NodeEventParamsSchema,
|
||||
NodePendingDrainParams: NodePendingDrainParamsSchema,
|
||||
NodePendingDrainResult: NodePendingDrainResultSchema,
|
||||
NodePendingEnqueueParams: NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResult: NodePendingEnqueueResultSchema,
|
||||
NodeInvokeRequestEvent: NodeInvokeRequestEventSchema,
|
||||
PushTestParams: PushTestParamsSchema,
|
||||
PushTestResult: PushTestResultSchema,
|
||||
|
||||
@@ -32,6 +32,10 @@ export type NodeDescribeParams = SchemaType<"NodeDescribeParams">;
|
||||
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
|
||||
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
|
||||
export type NodeEventParams = SchemaType<"NodeEventParams">;
|
||||
export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">;
|
||||
export type NodePendingDrainResult = SchemaType<"NodePendingDrainResult">;
|
||||
export type NodePendingEnqueueParams = SchemaType<"NodePendingEnqueueParams">;
|
||||
export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">;
|
||||
export type PushTestParams = SchemaType<"PushTestParams">;
|
||||
export type PushTestResult = SchemaType<"PushTestResult">;
|
||||
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
||||
|
||||
@@ -21,8 +21,10 @@ describe("gateway role policy", () => {
|
||||
|
||||
test("authorizes roles against node vs operator methods", () => {
|
||||
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
|
||||
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
|
||||
expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(false);
|
||||
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -76,6 +76,8 @@ const BASE_METHODS = [
|
||||
"node.rename",
|
||||
"node.list",
|
||||
"node.describe",
|
||||
"node.pending.drain",
|
||||
"node.pending.enqueue",
|
||||
"node.invoke",
|
||||
"node.pending.pull",
|
||||
"node.pending.ack",
|
||||
|
||||
@@ -18,6 +18,7 @@ import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
|
||||
import { healthHandlers } from "./server-methods/health.js";
|
||||
import { logsHandlers } from "./server-methods/logs.js";
|
||||
import { modelsHandlers } from "./server-methods/models.js";
|
||||
import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
|
||||
import { nodeHandlers } from "./server-methods/nodes.js";
|
||||
import { pushHandlers } from "./server-methods/push.js";
|
||||
import { sendHandlers } from "./server-methods/send.js";
|
||||
@@ -87,6 +88,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
||||
...systemHandlers,
|
||||
...updateHandlers,
|
||||
...nodeHandlers,
|
||||
...nodePendingHandlers,
|
||||
...pushHandlers,
|
||||
...sendHandlers,
|
||||
...usageHandlers,
|
||||
|
||||
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { nodePendingHandlers } from "./nodes-pending.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
drainNodePendingWork: vi.fn(),
|
||||
enqueueNodePendingWork: vi.fn(),
|
||||
maybeWakeNodeWithApns: vi.fn(),
|
||||
maybeSendNodeWakeNudge: vi.fn(),
|
||||
waitForNodeReconnect: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../node-pending-work.js", () => ({
|
||||
drainNodePendingWork: mocks.drainNodePendingWork,
|
||||
enqueueNodePendingWork: mocks.enqueueNodePendingWork,
|
||||
}));
|
||||
|
||||
vi.mock("./nodes.js", () => ({
|
||||
NODE_WAKE_RECONNECT_WAIT_MS: 3_000,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000,
|
||||
maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns,
|
||||
maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge,
|
||||
waitForNodeReconnect: mocks.waitForNodeReconnect,
|
||||
}));
|
||||
|
||||
type RespondCall = [
|
||||
boolean,
|
||||
unknown?,
|
||||
{
|
||||
code?: number;
|
||||
message?: string;
|
||||
details?: unknown;
|
||||
}?,
|
||||
];
|
||||
|
||||
function makeContext(overrides?: Partial<Record<string, unknown>>) {
|
||||
return {
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => undefined),
|
||||
},
|
||||
logGateway: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("node.pending handlers", () => {
|
||||
beforeEach(() => {
|
||||
mocks.drainNodePendingWork.mockReset();
|
||||
mocks.enqueueNodePendingWork.mockReset();
|
||||
mocks.maybeWakeNodeWithApns.mockReset();
|
||||
mocks.maybeSendNodeWakeNudge.mockReset();
|
||||
mocks.waitForNodeReconnect.mockReset();
|
||||
});
|
||||
|
||||
it("drains pending work for the connected node identity", async () => {
|
||||
mocks.drainNodePendingWork.mockReturnValue({
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: { maxItems: 3 },
|
||||
respond: respond as never,
|
||||
client: { connect: { device: { id: "ios-node-1" } } } as never,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", {
|
||||
maxItems: 3,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
{
|
||||
nodeId: "ios-node-1",
|
||||
revision: 2,
|
||||
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||
hasMore: false,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects node.pending.drain without a connected device identity", async () => {
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.drain"]({
|
||||
params: {},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: makeContext() as never,
|
||||
req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||
expect(call?.[0]).toBe(false);
|
||||
expect(call?.[2]?.message).toContain("connected device identity");
|
||||
});
|
||||
|
||||
it("enqueues pending work and wakes a disconnected node once", async () => {
|
||||
mocks.enqueueNodePendingWork.mockReturnValue({
|
||||
revision: 4,
|
||||
deduped: false,
|
||||
item: {
|
||||
id: "pending-1",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
createdAtMs: 100,
|
||||
expiresAtMs: null,
|
||||
},
|
||||
});
|
||||
mocks.maybeWakeNodeWithApns.mockResolvedValue({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "apns",
|
||||
durationMs: 12,
|
||||
apnsStatus: 200,
|
||||
apnsReason: null,
|
||||
});
|
||||
let connected = false;
|
||||
mocks.waitForNodeReconnect.mockImplementation(async () => {
|
||||
connected = true;
|
||||
return true;
|
||||
});
|
||||
const context = makeContext({
|
||||
nodeRegistry: {
|
||||
get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)),
|
||||
},
|
||||
});
|
||||
const respond = vi.fn();
|
||||
|
||||
await nodePendingHandlers["node.pending.enqueue"]({
|
||||
params: {
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
},
|
||||
respond: respond as never,
|
||||
client: null,
|
||||
context: context as never,
|
||||
req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
type: "location.request",
|
||||
priority: "high",
|
||||
expiresInMs: undefined,
|
||||
});
|
||||
expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", {
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({
|
||||
nodeId: "ios-node-2",
|
||||
context,
|
||||
timeoutMs: 3_000,
|
||||
});
|
||||
expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
nodeId: "ios-node-2",
|
||||
revision: 4,
|
||||
wakeTriggered: true,
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
159
src/gateway/server-methods/nodes-pending.ts
Normal file
159
src/gateway/server-methods/nodes-pending.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import {
|
||||
drainNodePendingWork,
|
||||
enqueueNodePendingWork,
|
||||
type NodePendingWorkPriority,
|
||||
type NodePendingWorkType,
|
||||
} from "../node-pending-work.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
validateNodePendingDrainParams,
|
||||
validateNodePendingEnqueueParams,
|
||||
} from "../protocol/index.js";
|
||||
import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js";
|
||||
import {
|
||||
maybeSendNodeWakeNudge,
|
||||
maybeWakeNodeWithApns,
|
||||
NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
waitForNodeReconnect,
|
||||
} from "./nodes.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
function resolveClientNodeId(
|
||||
client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null,
|
||||
): string | null {
|
||||
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? "";
|
||||
const trimmed = nodeId.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
export const nodePendingHandlers: GatewayRequestHandlers = {
|
||||
"node.pending.drain": async ({ params, respond, client }) => {
|
||||
if (!validateNodePendingDrainParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.drain",
|
||||
validator: validateNodePendingDrainParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const nodeId = resolveClientNodeId(client);
|
||||
if (!nodeId) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
"node.pending.drain requires a connected device identity",
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const p = params as { maxItems?: number };
|
||||
const drained = drainNodePendingWork(nodeId, {
|
||||
maxItems: p.maxItems,
|
||||
includeDefaultStatus: true,
|
||||
});
|
||||
respond(true, { nodeId, ...drained }, undefined);
|
||||
},
|
||||
"node.pending.enqueue": async ({ params, respond, context }) => {
|
||||
if (!validateNodePendingEnqueueParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
method: "node.pending.enqueue",
|
||||
validator: validateNodePendingEnqueueParams,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const p = params as {
|
||||
nodeId: string;
|
||||
type: NodePendingWorkType;
|
||||
priority?: NodePendingWorkPriority;
|
||||
expiresInMs?: number;
|
||||
wake?: boolean;
|
||||
};
|
||||
await respondUnavailableOnThrow(respond, async () => {
|
||||
const queued = enqueueNodePendingWork({
|
||||
nodeId: p.nodeId,
|
||||
type: p.type,
|
||||
priority: p.priority,
|
||||
expiresInMs: p.expiresInMs,
|
||||
});
|
||||
let wakeTriggered = false;
|
||||
if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) {
|
||||
const wakeReqId = queued.item.id;
|
||||
context.logGateway.info(
|
||||
`node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`,
|
||||
);
|
||||
const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" });
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`available=${wake.available} throttled=${wake.throttled} ` +
|
||||
`path=${wake.path} durationMs=${wake.durationMs} ` +
|
||||
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
|
||||
);
|
||||
wakeTriggered = wake.available;
|
||||
if (wake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId) && wake.available) {
|
||||
const retryWake = await maybeWakeNodeWithApns(p.nodeId, {
|
||||
force: true,
|
||||
wakeReason: "node.pending",
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` +
|
||||
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
|
||||
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
|
||||
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
|
||||
);
|
||||
if (retryWake.available) {
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId: p.nodeId,
|
||||
context,
|
||||
timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||
});
|
||||
context.logGateway.info(
|
||||
`node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (!context.nodeRegistry.get(p.nodeId)) {
|
||||
const nudge = await maybeSendNodeWakeNudge(p.nodeId);
|
||||
context.logGateway.info(
|
||||
`node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
|
||||
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
|
||||
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
|
||||
);
|
||||
context.logGateway.warn(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`,
|
||||
);
|
||||
} else {
|
||||
context.logGateway.info(
|
||||
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`,
|
||||
);
|
||||
}
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
nodeId: p.nodeId,
|
||||
revision: queued.revision,
|
||||
queued: queued.item,
|
||||
wakeTriggered,
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
},
|
||||
};
|
||||
@@ -47,9 +47,9 @@ import {
|
||||
} from "./nodes.helpers.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
export const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
||||
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
|
||||
@@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeWakeNodeWithApns(
|
||||
export async function maybeWakeNodeWithApns(
|
||||
nodeId: string,
|
||||
opts?: { force?: boolean },
|
||||
opts?: { force?: boolean; wakeReason?: string },
|
||||
): Promise<NodeWakeAttempt> {
|
||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||
nodeWakeById.set(nodeId, state);
|
||||
@@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns(
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
wakeReason: "node.invoke",
|
||||
wakeReason: opts?.wakeReason ?? "node.invoke",
|
||||
});
|
||||
if (!wakeResult.ok) {
|
||||
return withDuration({
|
||||
@@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns(
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
export async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
const startedAtMs = Date.now();
|
||||
const withDuration = (
|
||||
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
||||
@@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAtte
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForNodeReconnect(params: {
|
||||
export async function waitForNodeReconnect(params: {
|
||||
nodeId: string;
|
||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||
timeoutMs?: number;
|
||||
|
||||
@@ -848,6 +848,32 @@ describe("gateway server cron", () => {
|
||||
'Cron job "failure destination webhook" failed: unknown error',
|
||||
);
|
||||
|
||||
fetchWithSsrFGuardMock.mockClear();
|
||||
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
|
||||
const bestEffortFailureDestJobId = await addWebhookCronJob({
|
||||
ws,
|
||||
name: "best effort failure destination webhook",
|
||||
sessionTarget: "isolated",
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "19098680",
|
||||
bestEffort: true,
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/failure-destination",
|
||||
},
|
||||
},
|
||||
});
|
||||
const bestEffortFailureDestFinished = waitForCronEvent(
|
||||
ws,
|
||||
(payload) =>
|
||||
payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
|
||||
);
|
||||
await runCronJobForce(ws, bestEffortFailureDestJobId);
|
||||
await bestEffortFailureDestFinished;
|
||||
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
|
||||
|
||||
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
|
||||
const noSummaryJobId = await addWebhookCronJob({
|
||||
ws,
|
||||
@@ -861,7 +887,7 @@ describe("gateway server cron", () => {
|
||||
);
|
||||
await runCronJobForce(ws, noSummaryJobId);
|
||||
await noSummaryFinished;
|
||||
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
|
||||
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
await cleanupCronTestRun({ ws, server, prevSkipCron });
|
||||
}
|
||||
|
||||
@@ -75,6 +75,10 @@ describe("gateway server hooks", () => {
|
||||
expect(resAgent.status).toBe(200);
|
||||
const agentEvents = await waitForSystemEvent();
|
||||
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
|
||||
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
|
||||
deliveryContract?: string;
|
||||
};
|
||||
expect(firstCall?.deliveryContract).toBe("shared");
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
mockIsolatedRunOkOnce();
|
||||
|
||||
@@ -76,6 +76,7 @@ export function createGatewayHooksRequestHandler(params: {
|
||||
message: value.message,
|
||||
sessionKey,
|
||||
lane: "cron",
|
||||
deliveryContract: "shared",
|
||||
});
|
||||
const summary = result.summary?.trim() || result.error?.trim() || result.status;
|
||||
const prefix =
|
||||
|
||||
49
src/plugins/hook-runner-global.test.ts
Normal file
49
src/plugins/hook-runner-global.test.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
|
||||
|
||||
async function importHookRunnerGlobalModule() {
|
||||
return import("./hook-runner-global.js");
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
const mod = await importHookRunnerGlobalModule();
|
||||
mod.resetGlobalHookRunner();
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
describe("hook-runner-global", () => {
|
||||
it("preserves the initialized runner across module reloads", async () => {
|
||||
const modA = await importHookRunnerGlobalModule();
|
||||
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
|
||||
|
||||
modA.initializeGlobalHookRunner(registry);
|
||||
expect(modA.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modB = await importHookRunnerGlobalModule();
|
||||
expect(modB.getGlobalHookRunner()).not.toBeNull();
|
||||
expect(modB.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
|
||||
expect(modB.getGlobalPluginRegistry()).toBe(registry);
|
||||
});
|
||||
|
||||
it("clears the shared state across module reloads", async () => {
|
||||
const modA = await importHookRunnerGlobalModule();
|
||||
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
|
||||
|
||||
modA.initializeGlobalHookRunner(registry);
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modB = await importHookRunnerGlobalModule();
|
||||
modB.resetGlobalHookRunner();
|
||||
expect(modB.getGlobalHookRunner()).toBeNull();
|
||||
expect(modB.getGlobalPluginRegistry()).toBeNull();
|
||||
|
||||
vi.resetModules();
|
||||
|
||||
const modC = await importHookRunnerGlobalModule();
|
||||
expect(modC.getGlobalHookRunner()).toBeNull();
|
||||
expect(modC.getGlobalPluginRegistry()).toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -12,16 +12,31 @@ import type { PluginHookGatewayContext, PluginHookGatewayStopEvent } from "./typ
|
||||
|
||||
const log = createSubsystemLogger("plugins");
|
||||
|
||||
let globalHookRunner: HookRunner | null = null;
|
||||
let globalRegistry: PluginRegistry | null = null;
|
||||
type HookRunnerGlobalState = {
|
||||
hookRunner: HookRunner | null;
|
||||
registry: PluginRegistry | null;
|
||||
};
|
||||
|
||||
const hookRunnerGlobalStateKey = Symbol.for("openclaw.plugins.hook-runner-global-state");
|
||||
|
||||
function getHookRunnerGlobalState(): HookRunnerGlobalState {
|
||||
const globalStore = globalThis as typeof globalThis & {
|
||||
[hookRunnerGlobalStateKey]?: HookRunnerGlobalState;
|
||||
};
|
||||
return (globalStore[hookRunnerGlobalStateKey] ??= {
|
||||
hookRunner: null,
|
||||
registry: null,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the global hook runner with a plugin registry.
|
||||
* Called once when plugins are loaded during gateway startup.
|
||||
*/
|
||||
export function initializeGlobalHookRunner(registry: PluginRegistry): void {
|
||||
globalRegistry = registry;
|
||||
globalHookRunner = createHookRunner(registry, {
|
||||
const state = getHookRunnerGlobalState();
|
||||
state.registry = registry;
|
||||
state.hookRunner = createHookRunner(registry, {
|
||||
logger: {
|
||||
debug: (msg) => log.debug(msg),
|
||||
warn: (msg) => log.warn(msg),
|
||||
@@ -41,7 +56,7 @@ export function initializeGlobalHookRunner(registry: PluginRegistry): void {
|
||||
* Returns null if plugins haven't been loaded yet.
|
||||
*/
|
||||
export function getGlobalHookRunner(): HookRunner | null {
|
||||
return globalHookRunner;
|
||||
return getHookRunnerGlobalState().hookRunner;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -49,14 +64,14 @@ export function getGlobalHookRunner(): HookRunner | null {
|
||||
* Returns null if plugins haven't been loaded yet.
|
||||
*/
|
||||
export function getGlobalPluginRegistry(): PluginRegistry | null {
|
||||
return globalRegistry;
|
||||
return getHookRunnerGlobalState().registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if any hooks are registered for a given hook name.
|
||||
*/
|
||||
export function hasGlobalHooks(hookName: Parameters<HookRunner["hasHooks"]>[0]): boolean {
|
||||
return globalHookRunner?.hasHooks(hookName) ?? false;
|
||||
return getHookRunnerGlobalState().hookRunner?.hasHooks(hookName) ?? false;
|
||||
}
|
||||
|
||||
export async function runGlobalGatewayStopSafely(params: {
|
||||
@@ -83,6 +98,7 @@ export async function runGlobalGatewayStopSafely(params: {
|
||||
* Reset the global hook runner (for testing).
|
||||
*/
|
||||
export function resetGlobalHookRunner(): void {
|
||||
globalHookRunner = null;
|
||||
globalRegistry = null;
|
||||
const state = getHookRunnerGlobalState();
|
||||
state.hookRunner = null;
|
||||
state.registry = null;
|
||||
}
|
||||
|
||||
@@ -270,58 +270,4 @@ describe("registerTelegramNativeCommands", () => {
|
||||
);
|
||||
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
|
||||
});
|
||||
|
||||
it("uses the DM thread session key for plugin command internal sent hooks", async () => {
|
||||
const commandHandlers = new Map<string, (ctx: unknown) => Promise<void>>();
|
||||
|
||||
pluginCommandMocks.getPluginCommandSpecs.mockReturnValue([
|
||||
{
|
||||
name: "plug",
|
||||
description: "Plugin command",
|
||||
},
|
||||
] as never);
|
||||
pluginCommandMocks.matchPluginCommand.mockReturnValue({
|
||||
command: { key: "plug", requireAuth: false },
|
||||
args: undefined,
|
||||
} as never);
|
||||
|
||||
registerTelegramNativeCommands({
|
||||
...buildParams({
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
},
|
||||
},
|
||||
}),
|
||||
bot: {
|
||||
api: {
|
||||
setMyCommands: vi.fn().mockResolvedValue(undefined),
|
||||
sendMessage: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
command: vi.fn((name: string, cb: (ctx: unknown) => Promise<void>) => {
|
||||
commandHandlers.set(name, cb);
|
||||
}),
|
||||
} as unknown as Parameters<typeof registerTelegramNativeCommands>[0]["bot"],
|
||||
});
|
||||
|
||||
const handler = commandHandlers.get("plug");
|
||||
expect(handler).toBeTruthy();
|
||||
|
||||
await handler?.({
|
||||
match: "",
|
||||
message: {
|
||||
message_id: 42,
|
||||
date: Math.floor(Date.now() / 1000),
|
||||
chat: { id: 12345, type: "private" },
|
||||
from: { id: 12345, username: "alice" },
|
||||
message_thread_id: 99,
|
||||
},
|
||||
});
|
||||
|
||||
expect(deliveryMocks.deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionKeyForInternalHooks: "agent:main:main:thread:12345:99",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -540,20 +540,6 @@ export const registerTelegramNativeCommands = ({
|
||||
chunkMode: params.chunkMode,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
});
|
||||
const resolveDmThreadSessionKey = (params: {
|
||||
baseSessionKey: string;
|
||||
chatId: string | number;
|
||||
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
|
||||
}): string => {
|
||||
const dmThreadId = params.threadSpec.scope === "dm" ? params.threadSpec.id : undefined;
|
||||
if (dmThreadId == null) {
|
||||
return params.baseSessionKey;
|
||||
}
|
||||
return resolveThreadSessionKeys({
|
||||
baseSessionKey: params.baseSessionKey,
|
||||
threadId: `${params.chatId}:${dmThreadId}`,
|
||||
}).sessionKey;
|
||||
};
|
||||
|
||||
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
|
||||
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
|
||||
@@ -661,11 +647,17 @@ export const registerTelegramNativeCommands = ({
|
||||
});
|
||||
return;
|
||||
}
|
||||
const sessionKey = resolveDmThreadSessionKey({
|
||||
baseSessionKey: route.sessionKey,
|
||||
chatId,
|
||||
threadSpec,
|
||||
});
|
||||
const baseSessionKey = route.sessionKey;
|
||||
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
|
||||
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
|
||||
const threadKeys =
|
||||
dmThreadId != null
|
||||
? resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: `${chatId}:${dmThreadId}`,
|
||||
})
|
||||
: null;
|
||||
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
|
||||
const { skillFilter, groupSystemPrompt } = resolveTelegramGroupPromptSettings({
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
@@ -841,15 +833,10 @@ export const registerTelegramNativeCommands = ({
|
||||
return;
|
||||
}
|
||||
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
|
||||
const sessionKeyForInternalHooks = resolveDmThreadSessionKey({
|
||||
baseSessionKey: route.sessionKey,
|
||||
chatId,
|
||||
threadSpec,
|
||||
});
|
||||
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
|
||||
chatId,
|
||||
accountId: route.accountId,
|
||||
sessionKeyForInternalHooks,
|
||||
sessionKeyForInternalHooks: route.sessionKey,
|
||||
mirrorIsGroup: isGroup,
|
||||
mirrorGroupId: isGroup ? String(chatId) : undefined,
|
||||
mediaLocalRoots,
|
||||
|
||||
Reference in New Issue
Block a user