Compare commits

..

94 Commits

Author SHA1 Message Date
Mariano Belinky
f2c544f057 Gateway: tighten node pending drain semantics 2026-03-09 21:44:33 +01:00
Mariano Belinky
c49af9ea7c Gateway: scope node pending drain to nodes 2026-03-09 21:25:00 +01:00
Mariano Belinky
54536e48f5 Gateway: add pending node work primitives 2026-03-09 21:21:12 +01:00
Robin Waslander
2b2e5e2038 fix(cron): do not misclassify empty/NO_REPLY as interim acknowledgement (#41401)
* fix(cron): do not misclassify empty/NO_REPLY as interim acknowledgement

When a cron task's agent returns NO_REPLY, the payload filter strips the
silent token, leaving an empty text string. isLikelyInterimCronMessage()
previously returned true for empty input, causing the cron runner to
inject a forced rerun prompt ('Your previous response was only an
acknowledgement...').

Change the empty-string branch to return false: empty text after payload
filtering means the agent deliberately chose silent completion, not that
it sent an interim 'on it' message.

Fixes #41246

* fix(cron): do not misclassify empty/NO_REPLY as interim acknowledgement

Fixes #41246. (#41383) thanks @jackal092927.

---------

Co-authored-by: xaeon2026 <xaeon2026@gmail.com>
2026-03-09 21:16:28 +01:00
Mariano
0bcddb3d4f iOS: reconnect gateway on foreground return (#41384)
Merged via squash.

Prepared head SHA: 0e2e0dcc36
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 21:12:23 +01:00
Vincent Koc
d86647d7db Doctor: fix non-interactive cron repair gating (#41386) 2026-03-09 12:35:31 -07:00
Altay
87d939be79 Agents: add embedded error observations (#41336)
Merged via squash.

Prepared head SHA: 4900042298
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Reviewed-by: @altaywtf
2026-03-09 22:27:05 +03:00
Mariano
d4e59a3666 Cron: enforce cron-owned delivery contract (#40998)
Merged via squash.

Prepared head SHA: 5877389e33
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 20:12:37 +01:00
Vincent Koc
7b88249c9e fix(telegram): bridge direct delivery to internal message:sent hooks (#40185)
* telegram: bridge direct delivery message hooks

* telegram: align sent hooks with command session
2026-03-09 11:21:19 -07:00
Vincent Koc
12702e11a5 plugins: harden global hook runner state (#40184) 2026-03-09 11:20:33 -07:00
Pejman Pour-Moezzi
14bbcad169 fix(acp): propagate setSessionMode gateway errors to client (#41185)
* fix(acp): propagate setSessionMode gateway errors to client

* fix: add changelog entry for ACP setSessionMode propagation (#41185) (thanks @pejmanjohn)

---------

Co-authored-by: Pejman Pour-Moezzi <481729+pejmanjohn@users.noreply.github.com>
Co-authored-by: Onur <onur@textcortex.com>
2026-03-09 17:50:38 +01:00
Pejman Pour-Moezzi
eab39c721b fix(acp): map error states to end_turn instead of unconditional refusal (#41187)
* fix(acp): map error states to end_turn instead of unconditional refusal

* fix: map ACP error stop reason to end_turn (#41187) (thanks @pejmanjohn)

---------

Co-authored-by: Pejman Pour-Moezzi <481729+pejmanjohn@users.noreply.github.com>
Co-authored-by: Onur <onur@textcortex.com>
2026-03-09 17:37:33 +01:00
Radek Sienkiewicz
4815dc0603 Update CONTRIBUTING.md 2026-03-09 17:27:29 +01:00
Robin Waslander
2cce45962f Add Robin Waslander to maintainers 2026-03-09 17:23:56 +01:00
Radek Sienkiewicz
258b7902a4 Update CONTRIBUTING.md 2026-03-09 17:13:16 +01:00
xaeon2026
425bd89b48 Allow ACP sessions.patch lineage fields on ACP session keys (#40995)
Merged via squash.

Prepared head SHA: c1191edc08
Co-authored-by: xaeon2026 <264572156+xaeon2026@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 17:08:11 +01:00
Charles Dusek
54be30ef89 fix(agents): bound compaction retry wait and drain embedded runs on restart (#40324)
Merged via squash.

Prepared head SHA: cfd99562d6
Co-authored-by: cgdusek <38732970+cgdusek@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-09 08:27:29 -07:00
Daniel Reis
fbf5d56366 test(context-engine): add bundle chunk isolation tests for registry (#40460)
Merged via squash.

Prepared head SHA: 44622abfbc
Co-authored-by: dsantoreis <220753637+dsantoreis@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-09 08:15:35 -07:00
Joshua Lelon Mitchell
98ea71aca5 fix(swiftformat): exclude HostEnvSecurityPolicy.generated.swift from formatters (#39969) 2026-03-09 07:30:43 -07:00
opriz
51bae75120 fix(kimi-coding): fix kimi tool format: use native Anthropic tool schema instead of OpenAI … (openclaw#40008)
Verified:
- pnpm install --frozen-lockfile
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: opriz <51957849+opriz@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 08:28:47 -05:00
Radek Sienkiewicz
f2f561fab1 fix(ui): preserve control-ui auth across refresh (#40892)
Merged via squash.

Prepared head SHA: f9b2375892
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 12:50:47 +01:00
Peter Steinberger
f6d0712f50 build: sync plugin versions for 2026.3.9 2026-03-09 08:39:52 +00:00
Peter Steinberger
6c579d7842 fix: stabilize launchd paths and appcast secret scan 2026-03-09 08:37:37 +00:00
Peter Steinberger
f9706fde6a build: bump unreleased version to 2026.3.9 2026-03-09 08:33:58 +00:00
Peter Steinberger
7217b97658 fix(onboard): avoid persisting talk fallback on fresh setup 2026-03-09 08:33:58 +00:00
Peter Steinberger
ce9e91fdfc fix(launchd): harden macOS launchagent install permissions 2026-03-09 08:14:46 +00:00
Peter Steinberger
3caab9260c test: narrow gateway loop signal harness 2026-03-09 07:42:15 +00:00
Peter Steinberger
d0847ee322 chore: prepare 2026.3.8 npm release 2026-03-09 07:37:50 +00:00
Peter Steinberger
1d3dde8d21 fix(update): re-enable launchd service before updater bootstrap 2026-03-09 07:27:11 +00:00
Peter Steinberger
cc0f30f5fb test: fix windows runtime and restart loop harnesses 2026-03-09 07:22:23 +00:00
Peter Steinberger
250d3c949e chore: update appcast for 2026.3.8-beta.1 2026-03-09 07:20:08 +00:00
Peter Steinberger
5fca4c0de0 chore: prepare 2026.3.8-beta.1 release 2026-03-09 07:09:37 +00:00
Peter Steinberger
66c581c64c fix: normalize windows runtime shim executables 2026-03-09 07:01:42 +00:00
Peter Steinberger
912aa8744a test: fix Windows fake runtime bin fixtures 2026-03-09 06:50:52 +00:00
Peter Steinberger
8d2d6db9ad test: fix Node 24+ test runner and subagent registry mocks 2026-03-09 06:45:13 +00:00
Peter Steinberger
2d55ad05f3 docs: move 2026.3.8 entries back to unreleased 2026-03-09 06:34:53 +00:00
Peter Steinberger
9631f4665c chore: refresh secrets baseline 2026-03-09 06:31:35 +00:00
Peter Steinberger
e2a1a4a3db build: sync pnpm lockfile 2026-03-09 06:25:01 +00:00
Peter Steinberger
f82931ba8b docs: reorder 2026.3.8 changelog by impact 2026-03-09 06:24:29 +00:00
Peter Steinberger
17599a8ea2 refactor: flatten supervisor marker hints 2026-03-09 06:19:30 +00:00
Peter Steinberger
e86b38f09d refactor: split cron startup catch-up flow 2026-03-09 06:19:10 +00:00
Peter Steinberger
1d301f74a6 refactor: extract telegram polling session 2026-03-09 06:18:07 +00:00
Peter Steinberger
2e79d82198 build: update app deps except carbon 2026-03-09 06:09:33 +00:00
Peter Steinberger
96d17f3cb1 fix: stagger missed cron jobs on restart (#18925) (thanks @rexlunae) 2026-03-09 06:07:43 +00:00
rexlunae
79853aca9c fix(cron): stagger missed jobs on restart to prevent gateway overload
When the gateway restarts with many overdue cron jobs, they are now
executed with staggered delays to prevent overwhelming the gateway.

- Add missedJobStaggerMs config (default 5s between jobs)
- Add maxMissedJobsPerRestart limit (default 5 jobs immediately)
- Prioritize most overdue jobs by sorting by nextRunAtMs
- Reschedule deferred jobs to fire gradually via normal timer

Fixes #18892
2026-03-09 06:07:43 +00:00
Peter Steinberger
2d5e70f3e7 fix: abort telegram getupdates on shutdown (#23950) (thanks @Gkinthecodeland) 2026-03-09 06:03:46 +00:00
George Kalogirou
6186f620d2 fix(telegram): use manual signal forwarding to avoid cross-realm AbortSignal
AbortSignal.any() fails in Node.js when signals come from different module
contexts (grammY's internal signal vs local AbortController), producing:
"The signals[0] argument must be an instance of AbortSignal. Received an
instance of AbortSignal".

Replace with manual event forwarding that works across all realms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 06:03:46 +00:00
George Kalogirou
2767907abf fix(telegram): abort in-flight getUpdates fetch on shutdown
When the gateway receives SIGTERM, runner.stop() stops the grammY polling
loop but does not abort the in-flight getUpdates HTTP request. That request
hangs for up to 30 seconds (the Telegram API timeout). If a new gateway
instance starts polling during that window, Telegram returns a 409 Conflict
error, causing message loss and requiring exponential backoff recovery.

This is especially problematic with service managers (launchd, systemd)
that restart the process immediately after SIGTERM.

Wire an AbortController into the fetch layer so every Telegram API request
(especially the long-polling getUpdates) aborts immediately on shutdown:

- bot.ts: Accept optional fetchAbortSignal in TelegramBotOptions; wrap
  the grammY fetch with AbortSignal.any() to merge the shutdown signal.
- monitor.ts: Create a per-iteration AbortController, pass its signal to
  createTelegramBot, and abort it from the SIGTERM handler, force-restart
  path, and finally block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 06:03:46 +00:00
Peter Steinberger
9abf014f35 fix(skills): pin validated download roots 2026-03-09 06:00:50 +00:00
Peter Steinberger
cf3a479bd1 fix(node-host): bind bun and deno approval scripts 2026-03-09 05:59:32 +00:00
Peter Steinberger
fd902b0651 fix: detect launchd supervision via xpc service name (#20555) (thanks @dimat) 2026-03-09 05:57:35 +00:00
dimatu
cf796e2a22 fix(gateway): detect launchd supervision via XPC_SERVICE_NAME
On macOS, launchd sets XPC_SERVICE_NAME on managed processes but does
not set LAUNCH_JOB_LABEL or LAUNCH_JOB_NAME. Without checking
XPC_SERVICE_NAME, isLikelySupervisedProcess() returns false for
launchd-managed gateways, causing restartGatewayProcessWithFreshPid()
to fork a detached child instead of returning "supervised". The
detached child holds the gateway lock while launchd simultaneously
respawns the original process (KeepAlive=true), leading to an infinite
lock-timeout / restart loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 05:57:35 +00:00
merlin
f84adcbe88 fix: release gateway lock on restart failure + reply to Codex reviews
- Release gateway lock when in-process restart fails, so daemon
  restart/stop can still manage the process (Codex P2)
- P1 (env mismatch) already addressed: best-effort by design, documented
  in JSDoc
2026-03-09 05:53:52 +00:00
merlin
f184e7811c fix: move config pre-flight before onNotLoaded in runServiceRestart (Codex P2)
The config check was positioned after onNotLoaded, which could send
SIGUSR1 to an unmanaged process before config was validated.
2026-03-09 05:53:52 +00:00
merlin
c79a0dbdb4 fix: address bot review feedback on #35862
- Remove dead 'return false' in runServiceStart (Greptile)
- Include stack trace in run-loop crash guard error log (Greptile)
- Only catch startup errors on subsequent restarts, not initial start (Codex P1)
- Add JSDoc note about env var false positive edge case (Codex P1)
2026-03-09 05:53:52 +00:00
merlin
335223af32 test: add runServiceStart config pre-flight tests (#35862)
Address Greptile review: add test coverage for runServiceStart path.
The error message copy-paste issue was already fixed in the DRY refactor
(uses params.serviceNoun instead of hardcoded 'restart').
2026-03-09 05:53:52 +00:00
merlin
6740cdf160 fix(gateway): catch startup failure in run loop to prevent process exit (#35862)
When an in-process restart (SIGUSR1) triggers a config-triggered restart
and the new config is invalid, params.start() throws and the while loop
exits, killing the process. On macOS this loses TCC permissions.

Wrap params.start() in try/catch: on failure, set server=null, log the
error, and wait for the next SIGUSR1 instead of crashing.
2026-03-09 05:53:52 +00:00
merlin
eea925b12b fix(gateway): validate config before restart to prevent crash + macOS permission loss (#35862)
When 'openclaw gateway restart' is run with an invalid config, the new
process crashes on startup due to config validation failure. On macOS,
this causes Full Disk Access (TCC) permissions to be lost because the
respawned process has a different PID.

Add getConfigValidationError() helper and pre-flight config validation
in both runServiceRestart() and runServiceStart(). If config is invalid,
abort with a clear error message instead of crashing.

The config watcher's hot-reload path already had this guard
(handleInvalidSnapshot), but the CLI restart/start commands did not.

AI-assisted (OpenClaw agent, fully tested)
2026-03-09 05:53:52 +00:00
Peter Steinberger
88aee9161e fix(msteams): enforce sender allowlists with route allowlists 2026-03-09 05:52:19 +00:00
Peter Steinberger
03a6e3b460 test(cron): cover owner-only tool availability 2026-03-09 05:52:04 +00:00
Peter Steinberger
41e023a80b fix(cron): restore owner-only tools for isolated runs 2026-03-09 05:49:20 +00:00
Peter Steinberger
93775ef6a4 fix(browser): enforce redirect-hop SSRF checks 2026-03-09 05:41:36 +00:00
Peter Steinberger
31402b8542 fix: add changelog for restart timeout recovery (#40380) (thanks @dsantoreis) 2026-03-09 05:38:54 +00:00
DevMac
4bb8104810 test(secrets): skip ACL-dependent runtime snapshot tests on windows 2026-03-09 05:38:54 +00:00
Daniel dos Santos Reis
1d6a2d0165 fix(gateway): exit non-zero on restart shutdown timeout
When a config-change restart hits the force-exit timeout, exit with
code 1 instead of 0 so launchd/systemd treats it as a failure and
triggers a clean process restart. Stop-timeout stays at exit(0)
since graceful stops should not cause supervisor recovery.

Closes #36822
2026-03-09 05:38:54 +00:00
scoootscooob
44beb7be1f fix(daemon): also enable LaunchAgent in repairLaunchAgentBootstrap
The repair/recovery path had the same missing `enable` guard as
`restartLaunchAgent`.  If launchd persists a "disabled" state after a
previous `bootout`, the `bootstrap` call in `repairLaunchAgentBootstrap`
fails silently, leaving the gateway unloaded in the recovery flow.

Add the same `enable` guard before `bootstrap` that was already applied
to `installLaunchAgent` and (in this PR) `restartLaunchAgent`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 05:36:27 +00:00
scoootscooob
69cd376e3b fix(daemon): enable LaunchAgent before bootstrap on restart
restartLaunchAgent was missing the launchctl enable call that
installLaunchAgent already performs. launchd can persist a "disabled"
state after bootout, causing bootstrap to silently fail and leaving the
gateway unloaded until a manual reinstall.

Fixes #39211

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 05:36:27 +00:00
Peter Steinberger
41eef15cdc test: fix windows secrets runtime ci 2026-03-09 05:24:09 +00:00
GazeKingNuWu
41450187dd fix: clear plugin discovery cache after plugin installation (openclaw#39752)
Verified:
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: GazeKingNuWu <264914544+GazeKingNuWu@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 00:16:25 -05:00
Ayaan Zaidi
a40c29b11a Fix cron text announce delivery for Telegram targets (#40575)
Merged via squash.

Prepared head SHA: 54b1513c78
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 10:26:17 +05:30
Bronko
d4a960fcca fix(matrix): restore robust DM routing without the memberCount heuristic (#19736)
* fix(matrix): remove memberCount heuristic from DM detection

The memberCount === 2 check in isDirectMessage() misclassifies 2-person
group rooms (admin channels, monitoring rooms) as DMs, routing them to
the main session instead of their room-specific session.

Matrix already distinguishes DMs from groups at the protocol level via
m.direct account data and is_direct member state flags. Both are already
checked by client.dms.isDm() and hasDirectFlag(). The memberCount
heuristic only adds false positives for 2-person groups.

Move resolveMemberCount() below the protocol-level checks so it is only
reached for rooms not matched by m.direct or is_direct. This narrows its
role to diagnostic logging for confirmed group rooms.

Refs: #19739

* fix(matrix): add conservative fallback for broken DM flags

Some homeservers (notably Continuwuity) have broken m.direct account
data or never set is_direct on invite events. With the memberCount
heuristic removed, these DMs are no longer detected.

Add a conservative fallback that requires two signals before classifying
as DM: memberCount === 2 AND no explicit m.room.name. Group rooms almost
always have explicit names; DMs almost never do.

Error handling distinguishes M_NOT_FOUND (missing state event, expected
for unnamed rooms) from network/auth errors. Non-404 errors fall through
to group classification rather than guessing.

This is independently revertable — removing this commit restores pure
protocol-based detection without any heuristic fallback.

* fix(matrix): add parentPeer for DM room binding support

Add parentPeer to DM routes so conversations are bindable by room ID
while preserving DM trust semantics (secure 1:1, no group restrictions).

Suggested by @KirillShchetinin.

* fix(matrix): override DM detection for explicitly configured rooms

Builds on @robertcorreiro's config-driven approach from #9106.

Move resolveMatrixRoomConfig() before the DM check. If a room matches
a non-wildcard config entry (matchSource === "direct") and was
classified as DM, override the classification to group. This gives users
a deterministic escape hatch for misclassified rooms.

Wildcards are excluded from the override to avoid breaking DM routing
when a "*" catch-all exists. roomConfig is gated behind isRoom so DMs
never inherit group settings (skills, systemPrompt, autoReply).

This commit is independently droppable if the scope is too broad.

* test(matrix): add DM detection and config override tests

- 15 unit tests for direct.ts: all detection paths, priority order,
  M_NOT_FOUND vs network error handling, edge cases (whitespace names,
  API failures)
- 8 unit tests for rooms.ts: matchSource classification, wildcard
  safety for DM override, direct match priority over wildcard

* Changelog: note matrix DM routing follow-up

* fix(matrix): preserve DM fallback and room bindings

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-08 23:26:48 -05:00
Ayaan Zaidi
26e76f9a61 fix: dedupe inbound Telegram DM replies per agent (#40519)
Merged via squash.

Prepared head SHA: 6e235e7d1f
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 09:31:05 +05:30
Peter Steinberger
8befd88119 build(protocol): sync generated swift models 2026-03-09 03:49:50 +00:00
Peter Steinberger
99cbda83a2 fix(media): accept reader read result type 2026-03-09 03:49:50 +00:00
Peter Steinberger
e8775cda93 fix(agents): re-expose configured tools under restrictive profiles 2026-03-09 03:49:50 +00:00
Tak Hoffman
ef36cb8cbc chore(acpx): move runtime test fixtures to test-utils (openclaw#40548)
Verified:
- pnpm install --frozen-lockfile
- pnpm build
- pnpm check
- pnpm test:macmini
2026-03-08 22:47:04 -05:00
Ayaan Zaidi
f114a5c638 test: fix android talk config contract fixture 2026-03-09 09:15:49 +05:30
Kyle
a438ff4397 fix(plugin-sdk): remove remaining bundled plugin src imports (openclaw#39638)
Verified:
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: Kyle <3477429+kyledh@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-08 22:32:45 -05:00
Kesku
adec8b28bb alphabetize web search providers (#40259)
Merged via squash.

Prepared head SHA: be6350e5ae
Co-authored-by: kesku <62210496+kesku@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 08:54:54 +05:30
Mariano
e3df94365b ACP: add optional ingress provenance receipts (#40473)
Merged via squash.

Prepared head SHA: b63e46dd94
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 04:19:03 +01:00
Tyson Cung
4d501e4ccf fix(telegram): add download timeout to prevent polling loop hang (#40098)
Merged via squash.

Prepared head SHA: abdfa1a35f
Co-authored-by: tysoncung <45380903+tysoncung@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 08:29:21 +05:30
yuweuii
f6243916b5 fix(models): use 1M context for openai-codex gpt-5.4 (#37876)
Merged via squash.

Prepared head SHA: c41020779e
Co-authored-by: yuweuii <82372187+yuweuii@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-08 18:23:49 -07:00
Radek Sienkiewicz
b34158086a docs(changelog): correct Control UI contributor credit (#40420)
Merged via squash.

Prepared head SHA: e4295fe18b
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 02:18:30 +01:00
Vincent Koc
eabda6e3a4 fix(tests): correct security check failure 2026-03-08 18:13:35 -07:00
Vincent Koc
6d5e142b93 Docker: improve build cache reuse (#40351)
* Docker: improve build cache reuse

* Tests: cover Docker build cache layout

* Docker: fix sandbox cache mount continuations

* Docker: document qr-import manifest scope

* Docker: narrow e2e install inputs

* CI: cache Docker builds in workflows

* CI: route sandbox smoke through setup script

* CI: keep sandbox smoke on script path
2026-03-08 17:57:46 -07:00
Radek Sienkiewicz
4f42c03a49 gateway: fix global Control UI 404s for symlinked wrappers and bundled package roots (#40385)
Merged via squash.

Prepared head SHA: 567b3ed684
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 01:50:42 +01:00
Peter Steinberger
13bd3db307 chore(docs): drop refactor cleanup tracker 2026-03-09 00:26:20 +00:00
Peter Steinberger
ff4745fc3f refactor(models): split provider discovery helpers 2026-03-09 00:26:20 +00:00
Peter Steinberger
c29b098744 refactor(models): split models.json planning from writes 2026-03-09 00:26:20 +00:00
Peter Steinberger
24b53fcf47 refactor(agents): extract provider model normalization 2026-03-09 00:26:20 +00:00
Peter Steinberger
dfc18b7a2b refactor(models): extract list row builders 2026-03-09 00:26:20 +00:00
Peter Steinberger
141738f717 refactor: harden browser runtime profile handling 2026-03-09 00:25:43 +00:00
bbblending
4ff4ed7ec9 fix(config): refresh runtime snapshot from disk after write. Fixes #37175 (#37313)
Merged via squash.

Prepared head SHA: 69e1861abf
Co-authored-by: bbblending <122739024+bbblending@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
2026-03-08 19:49:15 -04:00
Peter Steinberger
362248e559 refactor: harden browser relay CDP flows 2026-03-08 23:46:10 +00:00
57 changed files with 2831 additions and 625 deletions

View File

@@ -8,6 +8,8 @@ Docs: https://docs.openclaw.ai
### Breaking
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.
### Fixes
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
@@ -18,6 +20,9 @@ Docs: https://docs.openclaw.ai
- ACP/sessions.patch: allow `spawnedBy` and `spawnDepth` lineage fields on ACP session keys so `sessions_spawn` with `runtime: "acp"` no longer fails during child-session setup. Fixes #40971. (#40995) thanks @xaeon2026.
- ACP/stop reason mapping: resolve gateway chat `state: "error"` completions as ACP `end_turn` instead of `refusal` so transient backend failures are not surfaced as deliberate refusals. (#41187) thanks @pejmanjohn.
- ACP/setSessionMode: propagate gateway `sessions.patch` failures back to ACP clients so rejected mode changes no longer return silent success. (#41185) thanks @pejmanjohn.
- Agents/embedded logs: add structured, sanitized lifecycle and failover observation events so overload and provider failures are easier to tail and filter. (#41336) thanks @altaywtf.
- iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky.
- Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927.
## 2026.3.8

View File

@@ -362,7 +362,14 @@ final class NodeAppModel {
await MainActor.run {
self.operatorConnected = false
self.gatewayConnected = false
// Foreground recovery must actively restart the saved gateway config.
// Disconnecting stale sockets alone can leave us idle if the old
// reconnect tasks were suppressed or otherwise got stuck in background.
self.gatewayStatusText = "Reconnecting…"
self.talkMode.updateGatewayConnected(false)
if let cfg = self.activeGatewayConnectConfig {
self.applyGatewayConnectConfig(cfg)
}
}
}
}

View File

@@ -29,6 +29,7 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting)
- Wakeups are first-class: a job can request “wake now” vs “next heartbeat”.
- Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = "<url>"`.
- Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode.
- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them.
## Quick start (actionable)

View File

@@ -30,6 +30,12 @@ Note: retention/pruning is controlled in config:
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
Upgrade note: if you have older cron jobs from before the current delivery/store format, run
`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`,
top-level delivery fields, payload `provider` delivery aliases) and migrates simple
`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is
configured.
## Common edits
Update delivery settings without changing the message:

View File

@@ -28,6 +28,7 @@ Notes:
- Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts.
- `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal.
- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.<timestamp>` to reclaim space safely.
- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime.
- Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing.
- If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`).

View File

@@ -65,6 +65,7 @@ cat ~/.openclaw/openclaw.json
- Config normalization for legacy values.
- OpenCode Zen provider override warnings (`models.providers.opencode`).
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
- State integrity and permissions checks (sessions, transcripts, state dir).
- Config file permission checks (chmod 600) when running locally.
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
@@ -158,6 +159,25 @@ the legacy sessions + agent dir on startup so history/auth/models land in the
per-agent path without a manual doctor run. WhatsApp auth is intentionally only
migrated via `openclaw doctor`.
### 3b) Legacy cron store migrations
Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default,
or `cron.store` when overridden) for old job shapes that the scheduler still
accepts for compatibility.
Current cron cleanups include:
- `jobId``id`
- `schedule.cron``schedule.expr`
- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload`
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
- payload `provider` delivery aliases → explicit `delivery.channel`
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
Doctor only auto-migrates `notify: true` jobs when it can do so without
changing behavior. If a job combines legacy notify fallback with an existing
non-webhook delivery mode, doctor warns and leaves that job for manual review.
### 4) State integrity checks (session persistence, routing, and safety)
The state directory is the operational brainstem. If it vanishes, you lose

View File

@@ -0,0 +1,182 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import * as loggingConfigModule from "../logging/config.js";
import {
buildApiErrorObservationFields,
buildTextObservationFields,
sanitizeForConsole,
} from "./pi-embedded-error-observation.js";
afterEach(() => {
vi.restoreAllMocks();
});
describe("buildApiErrorObservationFields", () => {
it("redacts request ids and exposes stable hashes instead of raw payloads", () => {
const observed = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_overload"}',
);
expect(observed).toMatchObject({
rawErrorPreview: expect.stringContaining('"request_id":"sha256:'),
rawErrorHash: expect.stringMatching(/^sha256:/),
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
providerErrorType: "overloaded_error",
providerErrorMessagePreview: "Overloaded",
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.rawErrorPreview).not.toContain("req_overload");
});
it("forces token redaction for observation previews", () => {
const observed = buildApiErrorObservationFields(
"Authorization: Bearer sk-abcdefghijklmnopqrstuvwxyz123456",
);
expect(observed.rawErrorPreview).not.toContain("sk-abcdefghijklmnopqrstuvwxyz123456");
expect(observed.rawErrorPreview).toContain("sk-abc");
expect(observed.rawErrorHash).toMatch(/^sha256:/);
});
it("redacts observation-only header and cookie formats", () => {
const observed = buildApiErrorObservationFields(
"x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456 Cookie: session=abcdefghijklmnopqrstuvwxyz123456",
);
expect(observed.rawErrorPreview).not.toContain("abcdefghijklmnopqrstuvwxyz123456");
expect(observed.rawErrorPreview).toContain("x-api-key: ***");
expect(observed.rawErrorPreview).toContain("Cookie: session=");
});
it("does not let cookie redaction consume unrelated fields on the same line", () => {
const observed = buildApiErrorObservationFields(
"Cookie: session=abcdefghijklmnopqrstuvwxyz123456 status=503 request_id=req_cookie",
);
expect(observed.rawErrorPreview).toContain("Cookie: session=");
expect(observed.rawErrorPreview).toContain("status=503");
expect(observed.rawErrorPreview).toContain("request_id=sha256:");
});
it("builds sanitized generic text observation fields", () => {
const observed = buildTextObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_prev"}',
);
expect(observed).toMatchObject({
textPreview: expect.stringContaining('"request_id":"sha256:'),
textHash: expect.stringMatching(/^sha256:/),
textFingerprint: expect.stringMatching(/^sha256:/),
providerErrorType: "overloaded_error",
providerErrorMessagePreview: "Overloaded",
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.textPreview).not.toContain("req_prev");
});
it("redacts request ids in formatted plain-text errors", () => {
const observed = buildApiErrorObservationFields(
"LLM error overloaded_error: Overloaded (request_id: req_plaintext_123)",
);
expect(observed).toMatchObject({
rawErrorPreview: expect.stringContaining("request_id: sha256:"),
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.rawErrorPreview).not.toContain("req_plaintext_123");
});
it("keeps fingerprints stable across request ids for equivalent errors", () => {
const first = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_001"}',
);
const second = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_002"}',
);
expect(first.rawErrorFingerprint).toBe(second.rawErrorFingerprint);
expect(first.rawErrorHash).not.toBe(second.rawErrorHash);
});
it("truncates oversized raw and provider previews", () => {
const longMessage = "X".repeat(260);
const observed = buildApiErrorObservationFields(
`{"type":"error","error":{"type":"server_error","message":"${longMessage}"},"request_id":"req_long"}`,
);
expect(observed.rawErrorPreview).toBeDefined();
expect(observed.providerErrorMessagePreview).toBeDefined();
expect(observed.rawErrorPreview?.length).toBeLessThanOrEqual(401);
expect(observed.providerErrorMessagePreview?.length).toBeLessThanOrEqual(201);
expect(observed.providerErrorMessagePreview?.endsWith("…")).toBe(true);
});
it("caps oversized raw inputs before hashing and fingerprinting", () => {
const oversized = "X".repeat(70_000);
const bounded = "X".repeat(64_000);
expect(buildApiErrorObservationFields(oversized)).toMatchObject({
rawErrorHash: buildApiErrorObservationFields(bounded).rawErrorHash,
rawErrorFingerprint: buildApiErrorObservationFields(bounded).rawErrorFingerprint,
});
});
it("returns empty observation fields for empty input", () => {
expect(buildApiErrorObservationFields(undefined)).toEqual({});
expect(buildApiErrorObservationFields("")).toEqual({});
expect(buildApiErrorObservationFields(" ")).toEqual({});
});
it("re-reads configured redact patterns on each call", () => {
const readLoggingConfig = vi.spyOn(loggingConfigModule, "readLoggingConfig");
readLoggingConfig.mockReturnValueOnce(undefined);
readLoggingConfig.mockReturnValueOnce({
redactPatterns: [String.raw`\bcustom-secret-[A-Za-z0-9]+\b`],
});
const first = buildApiErrorObservationFields("custom-secret-abc123");
const second = buildApiErrorObservationFields("custom-secret-abc123");
expect(first.rawErrorPreview).toContain("custom-secret-abc123");
expect(second.rawErrorPreview).not.toContain("custom-secret-abc123");
expect(second.rawErrorPreview).toContain("custom");
});
it("fails closed when observation sanitization throws", () => {
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockImplementation(() => {
throw new Error("boom");
});
expect(buildApiErrorObservationFields("request_id=req_123")).toEqual({});
expect(buildTextObservationFields("request_id=req_123")).toEqual({
textPreview: undefined,
textHash: undefined,
textFingerprint: undefined,
httpCode: undefined,
providerErrorType: undefined,
providerErrorMessagePreview: undefined,
requestIdHash: undefined,
});
});
it("ignores non-string configured redact patterns", () => {
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockReturnValue({
redactPatterns: [
123 as never,
{ bad: true } as never,
String.raw`\bcustom-secret-[A-Za-z0-9]+\b`,
],
});
const observed = buildApiErrorObservationFields("custom-secret-abc123");
expect(observed.rawErrorPreview).not.toContain("custom-secret-abc123");
expect(observed.rawErrorPreview).toContain("custom");
});
});
describe("sanitizeForConsole", () => {
it("strips control characters from console-facing values", () => {
expect(sanitizeForConsole("run-1\nprovider\tmodel\rtest")).toBe("run-1 provider model test");
});
});

View File

@@ -0,0 +1,199 @@
import { readLoggingConfig } from "../logging/config.js";
import { redactIdentifier } from "../logging/redact-identifier.js";
import { getDefaultRedactPatterns, redactSensitiveText } from "../logging/redact.js";
import { getApiErrorPayloadFingerprint, parseApiErrorInfo } from "./pi-embedded-helpers.js";
import { stableStringify } from "./stable-stringify.js";
const MAX_OBSERVATION_INPUT_CHARS = 64_000;
const MAX_FINGERPRINT_MESSAGE_CHARS = 8_000;
const RAW_ERROR_PREVIEW_MAX_CHARS = 400;
const PROVIDER_ERROR_PREVIEW_MAX_CHARS = 200;
const REQUEST_ID_RE = /\brequest[_ ]?id\b\s*[:=]\s*["'()]*([A-Za-z0-9._:-]+)/i;
const OBSERVATION_EXTRA_REDACT_PATTERNS = [
String.raw`\b(?:x-)?api[-_]?key\b\s*[:=]\s*(["']?)([^\s"'\\;]+)\1`,
String.raw`"(?:api[-_]?key|api_key)"\s*:\s*"([^"]+)"`,
String.raw`(?:\bCookie\b\s*[:=]\s*[^;=\s]+=|;\s*[^;=\s]+=)([^;\s\r\n]+)`,
];
function resolveConfiguredRedactPatterns(): string[] {
const configured = readLoggingConfig()?.redactPatterns;
if (!Array.isArray(configured)) {
return [];
}
return configured.filter((pattern): pattern is string => typeof pattern === "string");
}
function truncateForObservation(text: string | undefined, maxChars: number): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
return trimmed.length > maxChars ? `${trimmed.slice(0, maxChars)}` : trimmed;
}
function boundObservationInput(text: string | undefined): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
return trimmed.length > MAX_OBSERVATION_INPUT_CHARS
? trimmed.slice(0, MAX_OBSERVATION_INPUT_CHARS)
: trimmed;
}
export function sanitizeForConsole(text: string | undefined, maxChars = 200): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
const withoutControlChars = Array.from(trimmed)
.filter((char) => {
const code = char.charCodeAt(0);
return !(
code <= 0x08 ||
code === 0x0b ||
code === 0x0c ||
(code >= 0x0e && code <= 0x1f) ||
code === 0x7f
);
})
.join("");
const sanitized = withoutControlChars
.replace(/[\r\n\t]+/g, " ")
.replace(/\s+/g, " ")
.trim();
return sanitized.length > maxChars ? `${sanitized.slice(0, maxChars)}` : sanitized;
}
function replaceRequestIdPreview(
text: string | undefined,
requestId: string | undefined,
): string | undefined {
if (!text || !requestId) {
return text;
}
return text.split(requestId).join(redactIdentifier(requestId, { len: 12 }));
}
function redactObservationText(text: string | undefined): string | undefined {
if (!text) {
return text;
}
// Observation logs must stay redacted even when operators disable general-purpose
// log redaction, otherwise raw provider payloads leak back into always-on logs.
const configuredPatterns = resolveConfiguredRedactPatterns();
return redactSensitiveText(text, {
mode: "tools",
patterns: [
...getDefaultRedactPatterns(),
...configuredPatterns,
...OBSERVATION_EXTRA_REDACT_PATTERNS,
],
});
}
function extractRequestId(text: string | undefined): string | undefined {
if (!text) {
return undefined;
}
const match = text.match(REQUEST_ID_RE);
return match?.[1]?.trim() || undefined;
}
function buildObservationFingerprint(params: {
raw: string;
requestId?: string;
httpCode?: string;
type?: string;
message?: string;
}): string | null {
const boundedMessage =
params.message && params.message.length > MAX_FINGERPRINT_MESSAGE_CHARS
? params.message.slice(0, MAX_FINGERPRINT_MESSAGE_CHARS)
: params.message;
const structured =
params.httpCode || params.type || boundedMessage
? stableStringify({
httpCode: params.httpCode,
type: params.type,
message: boundedMessage,
})
: null;
if (structured) {
return structured;
}
if (params.requestId) {
return params.raw.split(params.requestId).join("<request_id>");
}
return getApiErrorPayloadFingerprint(params.raw);
}
export function buildApiErrorObservationFields(rawError?: string): {
rawErrorPreview?: string;
rawErrorHash?: string;
rawErrorFingerprint?: string;
httpCode?: string;
providerErrorType?: string;
providerErrorMessagePreview?: string;
requestIdHash?: string;
} {
const trimmed = boundObservationInput(rawError);
if (!trimmed) {
return {};
}
try {
const parsed = parseApiErrorInfo(trimmed);
const requestId = parsed?.requestId ?? extractRequestId(trimmed);
const requestIdHash = requestId ? redactIdentifier(requestId, { len: 12 }) : undefined;
const rawFingerprint = buildObservationFingerprint({
raw: trimmed,
requestId,
httpCode: parsed?.httpCode,
type: parsed?.type,
message: parsed?.message,
});
const redactedRawPreview = replaceRequestIdPreview(redactObservationText(trimmed), requestId);
const redactedProviderMessage = replaceRequestIdPreview(
redactObservationText(parsed?.message),
requestId,
);
return {
rawErrorPreview: truncateForObservation(redactedRawPreview, RAW_ERROR_PREVIEW_MAX_CHARS),
rawErrorHash: redactIdentifier(trimmed, { len: 12 }),
rawErrorFingerprint: rawFingerprint
? redactIdentifier(rawFingerprint, { len: 12 })
: undefined,
httpCode: parsed?.httpCode,
providerErrorType: parsed?.type,
providerErrorMessagePreview: truncateForObservation(
redactedProviderMessage,
PROVIDER_ERROR_PREVIEW_MAX_CHARS,
),
requestIdHash,
};
} catch {
return {};
}
}
export function buildTextObservationFields(text?: string): {
textPreview?: string;
textHash?: string;
textFingerprint?: string;
httpCode?: string;
providerErrorType?: string;
providerErrorMessagePreview?: string;
requestIdHash?: string;
} {
const observed = buildApiErrorObservationFields(text);
return {
textPreview: observed.rawErrorPreview,
textHash: observed.rawErrorHash,
textFingerprint: observed.rawErrorFingerprint,
httpCode: observed.httpCode,
providerErrorType: observed.providerErrorType,
providerErrorMessagePreview: observed.providerErrorMessagePreview,
requestIdHash: observed.requestIdHash,
};
}

View File

@@ -61,6 +61,7 @@ import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { resolveModel } from "./model.js";
import { runEmbeddedAttempt } from "./run/attempt.js";
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
import {
@@ -1226,11 +1227,26 @@ export async function runEmbeddedPiAgent(
reason: promptProfileFailureReason,
});
const promptFailoverFailure = isFailoverErrorMessage(errorText);
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
const failedPromptProfileId = lastProfileId;
const logPromptFailoverDecision = createFailoverDecisionLogger({
stage: "prompt",
runId: params.runId,
rawError: errorText,
failoverReason: promptFailoverReason,
profileFailureReason: promptProfileFailureReason,
provider,
model: modelId,
profileId: failedPromptProfileId,
fallbackConfigured,
aborted,
});
if (
promptFailoverFailure &&
promptFailoverReason !== "timeout" &&
(await advanceAuthProfile())
) {
logPromptFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
continue;
}
@@ -1249,15 +1265,20 @@ export async function runEmbeddedPiAgent(
// are configured so outer model fallback can continue on overload,
// rate-limit, auth, or billing failures.
if (fallbackConfigured && promptFailoverFailure) {
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
logPromptFailoverDecision("fallback_model", { status });
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
throw new FailoverError(errorText, {
reason: promptFailoverReason ?? "unknown",
provider,
model: modelId,
profileId: lastProfileId,
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
status,
});
}
if (promptFailoverFailure || promptFailoverReason) {
logPromptFailoverDecision("surface_error");
}
throw promptError;
}
@@ -1282,6 +1303,21 @@ export async function runEmbeddedPiAgent(
resolveAuthProfileFailureReason(assistantFailoverReason);
const cloudCodeAssistFormatError = attempt.cloudCodeAssistFormatError;
const imageDimensionError = parseImageDimensionError(lastAssistant?.errorMessage ?? "");
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
const failedAssistantProfileId = lastProfileId;
const logAssistantFailoverDecision = createFailoverDecisionLogger({
stage: "assistant",
runId: params.runId,
rawError: lastAssistant?.errorMessage?.trim(),
failoverReason: assistantFailoverReason,
profileFailureReason: assistantProfileFailureReason,
provider: activeErrorContext.provider,
model: activeErrorContext.model,
profileId: failedAssistantProfileId,
fallbackConfigured,
timedOut,
aborted,
});
if (
authFailure &&
@@ -1339,6 +1375,7 @@ export async function runEmbeddedPiAgent(
const rotated = await advanceAuthProfile();
if (rotated) {
logAssistantFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
continue;
}
@@ -1371,6 +1408,7 @@ export async function runEmbeddedPiAgent(
const status =
resolveFailoverStatus(assistantFailoverReason ?? "unknown") ??
(isTimeoutErrorMessage(message) ? 408 : undefined);
logAssistantFailoverDecision("fallback_model", { status });
throw new FailoverError(message, {
reason: assistantFailoverReason ?? "unknown",
provider: activeErrorContext.provider,
@@ -1379,6 +1417,7 @@ export async function runEmbeddedPiAgent(
status,
});
}
logAssistantFailoverDecision("surface_error");
}
const usage = toNormalizedUsage(usageAccumulator);

View File

@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { normalizeFailoverDecisionObservationBase } from "./failover-observation.js";
describe("normalizeFailoverDecisionObservationBase", () => {
it("fills timeout observation reasons for deadline timeouts without provider error text", () => {
expect(
normalizeFailoverDecisionObservationBase({
stage: "assistant",
runId: "run:timeout",
rawError: "",
failoverReason: null,
profileFailureReason: null,
provider: "openai",
model: "mock-1",
profileId: "openai:p1",
fallbackConfigured: false,
timedOut: true,
aborted: false,
}),
).toMatchObject({
failoverReason: "timeout",
profileFailureReason: "timeout",
timedOut: true,
});
});
it("preserves explicit failover reasons", () => {
expect(
normalizeFailoverDecisionObservationBase({
stage: "assistant",
runId: "run:overloaded",
rawError: '{"error":{"type":"overloaded_error"}}',
failoverReason: "overloaded",
profileFailureReason: "overloaded",
provider: "openai",
model: "mock-1",
profileId: "openai:p1",
fallbackConfigured: true,
timedOut: true,
aborted: false,
}),
).toMatchObject({
failoverReason: "overloaded",
profileFailureReason: "overloaded",
timedOut: true,
});
});
});

View File

@@ -0,0 +1,76 @@
import { redactIdentifier } from "../../../logging/redact-identifier.js";
import type { AuthProfileFailureReason } from "../../auth-profiles.js";
import {
buildApiErrorObservationFields,
sanitizeForConsole,
} from "../../pi-embedded-error-observation.js";
import type { FailoverReason } from "../../pi-embedded-helpers.js";
import { log } from "../logger.js";
export type FailoverDecisionLoggerInput = {
stage: "prompt" | "assistant";
decision: "rotate_profile" | "fallback_model" | "surface_error";
runId?: string;
rawError?: string;
failoverReason: FailoverReason | null;
profileFailureReason?: AuthProfileFailureReason | null;
provider: string;
model: string;
profileId?: string;
fallbackConfigured: boolean;
timedOut?: boolean;
aborted?: boolean;
status?: number;
};
export type FailoverDecisionLoggerBase = Omit<FailoverDecisionLoggerInput, "decision" | "status">;
export function normalizeFailoverDecisionObservationBase(
base: FailoverDecisionLoggerBase,
): FailoverDecisionLoggerBase {
return {
...base,
failoverReason: base.failoverReason ?? (base.timedOut ? "timeout" : null),
profileFailureReason: base.profileFailureReason ?? (base.timedOut ? "timeout" : null),
};
}
export function createFailoverDecisionLogger(
base: FailoverDecisionLoggerBase,
): (
decision: FailoverDecisionLoggerInput["decision"],
extra?: Pick<FailoverDecisionLoggerInput, "status">,
) => void {
const normalizedBase = normalizeFailoverDecisionObservationBase(base);
const safeProfileId = normalizedBase.profileId
? redactIdentifier(normalizedBase.profileId, { len: 12 })
: undefined;
const safeRunId = sanitizeForConsole(normalizedBase.runId) ?? "-";
const safeProvider = sanitizeForConsole(normalizedBase.provider) ?? "-";
const safeModel = sanitizeForConsole(normalizedBase.model) ?? "-";
const profileText = safeProfileId ?? "-";
const reasonText = normalizedBase.failoverReason ?? "none";
return (decision, extra) => {
const observedError = buildApiErrorObservationFields(normalizedBase.rawError);
log.warn("embedded run failover decision", {
event: "embedded_run_failover_decision",
tags: ["error_handling", "failover", normalizedBase.stage, decision],
runId: normalizedBase.runId,
stage: normalizedBase.stage,
decision,
failoverReason: normalizedBase.failoverReason,
profileFailureReason: normalizedBase.profileFailureReason,
provider: normalizedBase.provider,
model: normalizedBase.model,
profileId: safeProfileId,
fallbackConfigured: normalizedBase.fallbackConfigured,
timedOut: normalizedBase.timedOut,
aborted: normalizedBase.aborted,
status: extra?.status,
...observedError,
consoleMessage:
`embedded run failover decision: runId=${safeRunId} stage=${normalizedBase.stage} decision=${decision} ` +
`reason=${reasonText} provider=${safeProvider}/${safeModel} profile=${profileText}`,
});
};
}

View File

@@ -54,8 +54,13 @@ describe("handleAgentEnd", () => {
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toContain("runId=run-1");
expect(warn.mock.calls[0]?.[0]).toContain("error=connection refused");
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "connection refused",
rawErrorPreview: "connection refused",
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
@@ -65,6 +70,59 @@ describe("handleAgentEnd", () => {
});
});
it("attaches raw provider error metadata without changing the console message", () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
provider: "anthropic",
model: "claude-test",
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
content: [{ type: "text", text: "" }],
});
handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "The AI service is temporarily overloaded. Please try again in a moment.",
failoverReason: "overloaded",
providerErrorType: "overloaded_error",
});
});
it("redacts logged error text before emitting lifecycle events", () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
role: "assistant",
stopReason: "error",
errorMessage: "x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456",
content: [{ type: "text", text: "" }],
},
{ onAgentEvent },
);
handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
error: "x-api-key: ***",
rawErrorPreview: "x-api-key: ***",
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "error",
error: "x-api-key: ***",
},
});
});
it("keeps non-error run-end logging on debug only", () => {
const ctx = createContext(undefined);

View File

@@ -1,6 +1,11 @@
import { emitAgentEvent } from "../infra/agent-events.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
import {
buildApiErrorObservationFields,
buildTextObservationFields,
sanitizeForConsole,
} from "./pi-embedded-error-observation.js";
import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import { isAssistantMessage } from "./pi-embedded-utils.js";
@@ -36,16 +41,31 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
provider: lastAssistant.provider,
model: lastAssistant.model,
});
const rawError = lastAssistant.errorMessage?.trim();
const failoverReason = classifyFailoverReason(rawError ?? "");
const errorText = (friendlyError || lastAssistant.errorMessage || "LLM request failed.").trim();
ctx.log.warn(
`embedded run agent end: runId=${ctx.params.runId} isError=true error=${errorText}`,
);
const observedError = buildApiErrorObservationFields(rawError);
const safeErrorText =
buildTextObservationFields(errorText).textPreview ?? "LLM request failed.";
const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-";
ctx.log.warn("embedded run agent end", {
event: "embedded_run_agent_end",
tags: ["error_handling", "lifecycle", "agent_end", "assistant_error"],
runId: ctx.params.runId,
isError: true,
error: safeErrorText,
failoverReason,
provider: lastAssistant.provider,
model: lastAssistant.model,
...observedError,
consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true error=${safeErrorText}`,
});
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: errorText,
error: safeErrorText,
endedAt: Date.now(),
},
});
@@ -53,7 +73,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
stream: "lifecycle",
data: {
phase: "error",
error: errorText,
error: safeErrorText,
},
});
} else {

View File

@@ -12,8 +12,8 @@ import type {
import type { NormalizedUsage } from "./usage.js";
export type EmbeddedSubscribeLogger = {
debug: (message: string) => void;
warn: (message: string) => void;
debug: (message: string, meta?: Record<string, unknown>) => void;
warn: (message: string, meta?: Record<string, unknown>) => void;
};
export type ToolErrorSummary = {

View File

@@ -16,7 +16,7 @@ export function registerCronCli(program: Command) {
.addHelpText(
"after",
() =>
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`,
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`,
);
registerCronStatusCommand(cron);

View File

@@ -0,0 +1,269 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import * as noteModule from "../terminal/note.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
let tempRoot: string | null = null;
async function makeTempStorePath() {
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
return path.join(tempRoot, "cron", "jobs.json");
}
afterEach(async () => {
vi.restoreAllMocks();
if (tempRoot) {
await fs.rm(tempRoot, { recursive: true, force: true });
tempRoot = null;
}
});
function makePrompter(confirmResult = true) {
return {
confirm: vi.fn().mockResolvedValue(confirmResult),
};
}
describe("maybeRepairLegacyCronStore", () => {
it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
jobId: "legacy-job",
name: "Legacy job",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
payload: {
kind: "systemEvent",
text: "Morning brief",
},
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const cfg: OpenClawConfig = {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
};
await maybeRepairLegacyCronStore({
cfg,
options: {},
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
const [job] = persisted.jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.notify).toBeUndefined();
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "0 7 * * *",
tz: "UTC",
});
expect(job?.delivery).toMatchObject({
mode: "webhook",
to: "https://example.invalid/cron-finished",
});
expect(job?.payload).toMatchObject({
kind: "systemEvent",
text: "Morning brief",
});
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Legacy cron job storage detected"),
"Cron",
);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Cron store normalized"),
"Doctor changes",
);
});
it("warns instead of replacing announce delivery for notify fallback jobs", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "notify-and-announce",
name: "Notify and announce",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "Status" },
delivery: { mode: "announce", channel: "telegram", to: "123" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: { nonInteractive: true },
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(persisted.jobs[0]?.notify).toBe(true);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
"Doctor warnings",
);
});
it("does not auto-repair in non-interactive mode without explicit repair approval", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
jobId: "legacy-job",
name: "Legacy job",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
payload: {
kind: "systemEvent",
text: "Morning brief",
},
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const prompter = makePrompter(false);
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: { nonInteractive: true },
prompter,
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(prompter.confirm).toHaveBeenCalledWith({
message: "Repair legacy cron jobs now?",
initialValue: true,
});
expect(persisted.jobs[0]?.jobId).toBe("legacy-job");
expect(persisted.jobs[0]?.notify).toBe(true);
expect(noteSpy).not.toHaveBeenCalledWith(
expect.stringContaining("Cron store normalized"),
"Doctor changes",
);
});
it("migrates notify fallback none delivery jobs to cron.webhook", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "notify-none",
name: "Notify none",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
payload: {
kind: "systemEvent",
text: "Status",
},
delivery: { mode: "none", to: "123456789" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: {},
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(persisted.jobs[0]?.notify).toBeUndefined();
expect(persisted.jobs[0]?.delivery).toMatchObject({
mode: "webhook",
to: "https://example.invalid/cron-finished",
});
});
});

183
src/commands/doctor-cron.ts Normal file
View File

@@ -0,0 +1,183 @@
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/config.js";
import { normalizeStoredCronJobs } from "../cron/store-migration.js";
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import { note } from "../terminal/note.js";
import { shortenHomePath } from "../utils.js";
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";
type CronDoctorOutcome = {
changed: boolean;
warnings: string[];
};
function pluralize(count: number, noun: string) {
return `${count} ${noun}${count === 1 ? "" : "s"}`;
}
function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): string[] {
const lines: string[] = [];
if (issues.jobId) {
lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``);
}
if (issues.legacyScheduleString) {
lines.push(
`- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`,
);
}
if (issues.legacyScheduleCron) {
lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``);
}
if (issues.legacyPayloadKind) {
lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`);
}
if (issues.legacyPayloadProvider) {
lines.push(
`- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`,
);
}
if (issues.legacyTopLevelPayloadFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`,
);
}
if (issues.legacyTopLevelDeliveryFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`,
);
}
if (issues.legacyDeliveryMode) {
lines.push(
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
);
}
return lines;
}
function trimString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function migrateLegacyNotifyFallback(params: {
jobs: Array<Record<string, unknown>>;
legacyWebhook?: string;
}): CronDoctorOutcome {
let changed = false;
const warnings: string[] = [];
for (const raw of params.jobs) {
if (!("notify" in raw)) {
continue;
}
const jobName = trimString(raw.name) ?? trimString(raw.id) ?? "<unnamed>";
const notify = raw.notify === true;
if (!notify) {
delete raw.notify;
changed = true;
continue;
}
const delivery =
raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery)
? (raw.delivery as Record<string, unknown>)
: null;
const mode = trimString(delivery?.mode)?.toLowerCase();
const to = trimString(delivery?.to);
if (mode === "webhook" && to) {
delete raw.notify;
changed = true;
continue;
}
if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) {
raw.delivery = {
...delivery,
mode: "webhook",
to: mode === "none" ? params.legacyWebhook : (to ?? params.legacyWebhook),
};
delete raw.notify;
changed = true;
continue;
}
if (!params.legacyWebhook) {
warnings.push(
`Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`,
);
continue;
}
warnings.push(
`Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`,
);
}
return { changed, warnings };
}
export async function maybeRepairLegacyCronStore(params: {
cfg: OpenClawConfig;
options: DoctorOptions;
prompter: Pick<DoctorPrompter, "confirm">;
}) {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const store = await loadCronStore(storePath);
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
if (rawJobs.length === 0) {
return;
}
const normalized = normalizeStoredCronJobs(rawJobs);
const legacyWebhook = trimString(params.cfg.cron?.webhook);
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
const previewLines = formatLegacyIssuePreview(normalized.issues);
if (notifyCount > 0) {
previewLines.push(
`- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`,
);
}
if (previewLines.length === 0) {
return;
}
note(
[
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
...previewLines,
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
].join("\n"),
"Cron",
);
const shouldRepair = await params.prompter.confirm({
message: "Repair legacy cron jobs now?",
initialValue: true,
});
if (!shouldRepair) {
return;
}
const notifyMigration = migrateLegacyNotifyFallback({
jobs: rawJobs,
legacyWebhook,
});
const changed = normalized.mutated || notifyMigration.changed;
if (!changed && notifyMigration.warnings.length === 0) {
return;
}
if (changed) {
await saveCronStore(storePath, {
version: 1,
jobs: rawJobs as unknown as CronJob[],
});
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
}
if (notifyMigration.warnings.length > 0) {
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
}
}

View File

@@ -31,6 +31,7 @@ import {
import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js";
import { doctorShellCompletion } from "./doctor-completion.js";
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js";
import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js";
import {
@@ -220,6 +221,11 @@ export async function doctorCommand(
await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH);
await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair });
await maybeRepairLegacyCronStore({
cfg,
options,
prompter,
});
cfg = await maybeRepairSandboxImages(cfg, runtime, prompter);
noteSandboxScopeWarnings(cfg);

View File

@@ -0,0 +1,143 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
resolveDeliveryTarget: vi.fn(),
deliverOutboundPayloads: vi.fn(),
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }),
buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }),
createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }),
warn: vi.fn(),
}));
vi.mock("./isolated-agent/delivery-target.js", () => ({
resolveDeliveryTarget: mocks.resolveDeliveryTarget,
}));
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/identity.js", () => ({
resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity,
}));
vi.mock("../infra/outbound/session-context.js", () => ({
buildOutboundSessionContext: mocks.buildOutboundSessionContext,
}));
vi.mock("../cli/outbound-send-deps.js", () => ({
createOutboundSendDeps: mocks.createOutboundSendDeps,
}));
vi.mock("../logging.js", () => ({
getChildLogger: vi.fn(() => ({
warn: mocks.warn,
})),
}));
const { sendFailureNotificationAnnounce } = await import("./delivery.js");
describe("sendFailureNotificationAnnounce", () => {
beforeEach(() => {
vi.clearAllMocks();
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: true,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
mode: "explicit",
});
mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]);
});
afterEach(() => {
vi.useRealTimers();
});
it("delivers failure alerts to the resolved explicit target with strict send settings", async () => {
const deps = {} as never;
const cfg = {} as never;
await sendFailureNotificationAnnounce(
deps,
cfg,
"main",
"job-1",
{ channel: "telegram", to: "123", accountId: "bot-a" },
"Cron failed",
);
expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", {
channel: "telegram",
to: "123",
accountId: "bot-a",
});
expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({
cfg,
agentId: "main",
sessionKey: "cron:job-1:failure",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
cfg,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
payloads: [{ text: "Cron failed" }],
session: { kind: "session" },
identity: { kind: "identity" },
bestEffort: false,
deps: { kind: "deps" },
abortSignal: expect.any(AbortSignal),
}),
);
});
it("does not send when target resolution fails", async () => {
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: false,
error: new Error("target missing"),
});
await sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
);
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
expect(mocks.warn).toHaveBeenCalledWith(
{ error: "target missing" },
"cron: failed to resolve failure destination target",
);
});
it("swallows outbound delivery errors after logging", async () => {
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed"));
await expect(
sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
),
).resolves.toBeUndefined();
expect(mocks.warn).toHaveBeenCalledWith(
expect.objectContaining({
err: "send failed",
channel: "telegram",
to: "123",
}),
"cron: failure destination announce failed",
);
});
});

View File

@@ -148,6 +148,46 @@ describe("resolveFailureDestination", () => {
expect(plan).toBeNull();
});
it("returns null when webhook failure destination matches the primary webhook target", () => {
const plan = resolveFailureDestination(
makeJob({
sessionTarget: "main",
payload: { kind: "systemEvent", text: "tick" },
delivery: {
mode: "webhook",
to: "https://example.invalid/cron",
failureDestination: {
mode: "webhook",
to: "https://example.invalid/cron",
},
},
}),
undefined,
);
expect(plan).toBeNull();
});
it("does not reuse inherited announce recipient when switching failure destination to webhook", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: {
mode: "webhook",
},
},
}),
{
channel: "signal",
to: "group-abc",
mode: "announce",
},
);
expect(plan).toBeNull();
});
it("allows job-level failure destination fields to clear inherited global values", () => {
const plan = resolveFailureDestination(
makeJob({

View File

@@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: {
to?: string;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, {
@@ -67,5 +68,6 @@ export async function runTelegramAnnounceTurn(params: {
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
deliveryContract: params.deliveryContract,
});
}

View File

@@ -23,6 +23,7 @@ async function runExplicitTelegramAnnounceTurn(params: {
home: string;
storePath: string;
deps: CliDeps;
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runTelegramAnnounceTurn({
...params,
@@ -301,6 +302,7 @@ describe("runCronIsolatedAgentTurn", () => {
home,
storePath,
deps,
deliveryContract: "shared",
});
expectDeliveredOk(res);

View File

@@ -10,7 +10,7 @@
* returning so the timer correctly skips the system-event fallback.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
@@ -105,7 +105,6 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
skipMessagingToolDelivery: false,
deliveryBestEffort: false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
@@ -134,6 +133,10 @@ describe("dispatchCronDelivery — double-announce guard", () => {
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
});
afterEach(() => {
vi.unstubAllEnvs();
});
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
@@ -255,6 +258,42 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("retries transient direct announce failures before succeeding", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads)
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
.mockResolvedValueOnce([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Retry me once." });
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
});
it("does not retry permanent direct announce failures", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found"));
const params = makeBaseParams({ synthesizedText: "This should fail once." });
const state = await dispatchCronDelivery(params);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(state.result).toEqual(
expect.objectContaining({
status: "error",
error: "Error: chat not found",
deliveryAttempted: true,
}),
);
});
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
const params = makeBaseParams({
synthesizedText: "Task done.",

View File

@@ -96,4 +96,13 @@ describe("resolveCronDeliveryBestEffort", () => {
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
});
it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => {
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
const job = {
delivery: { bestEffort: false },
payload: { kind: "agentTurn", bestEffortDeliver: true },
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
});
});

View File

@@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = {
resolvedDelivery: DeliveryTargetResolution;
deliveryRequested: boolean;
skipHeartbeatDelivery: boolean;
skipMessagingToolDelivery: boolean;
skipMessagingToolDelivery?: boolean;
deliveryBestEffort: boolean;
deliveryPayloadHasStructuredContent: boolean;
deliveryPayloads: ReplyPayload[];
@@ -192,15 +192,17 @@ async function retryTransientDirectCronDelivery<T>(params: {
export async function dispatchCronDelivery(
params: DispatchCronDeliveryParams,
): Promise<DispatchCronDeliveryState> {
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
let summary = params.summary;
let outputText = params.outputText;
let synthesizedText = params.synthesizedText;
let deliveryPayloads = params.deliveryPayloads;
// `true` means we confirmed at least one outbound send reached the target.
// Keep this strict so timer fallback can safely decide whether to wake main.
let delivered = params.skipMessagingToolDelivery;
let deliveryAttempted = params.skipMessagingToolDelivery;
// Shared callers can treat a matching message-tool send as the completed
// delivery path. Cron-owned callers keep this false so direct cron delivery
// remains the only source of delivered state.
let delivered = skipMessagingToolDelivery;
let deliveryAttempted = skipMessagingToolDelivery;
const failDeliveryTarget = (error: string) =>
params.withRunSession({
status: "error",
@@ -404,11 +406,7 @@ export async function dispatchCronDelivery(
}
};
if (
params.deliveryRequested &&
!params.skipHeartbeatDelivery &&
!params.skipMessagingToolDelivery
) {
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (!params.resolvedDelivery.ok) {
if (!params.deliveryBestEffort) {
return {

View File

@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
restoreFastTestEnv(previousFastTestEnv);
});
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
it('disables the message tool when delivery.mode is "none"', async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
});
it("disables the message tool when cron delivery is active", async () => {
@@ -82,4 +82,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
});
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
});
await runCronIsolatedAgentTurn({
...makeParams(),
deliveryContract: "shared",
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
});
});

View File

@@ -78,11 +78,10 @@ export type RunCronAgentTurnResult = {
/** Last non-empty agent text output (not truncated). */
outputText?: string;
/**
* `true` when the isolated run already delivered its output to the target
* channel (via outbound payloads, the subagent announce flow, or a matching
* messaging-tool send). Callers should skip posting a summary to the main
* session to avoid duplicate
* messages. See: https://github.com/openclaw/openclaw/issues/15692
* `true` when the isolated runner already handled the run's user-visible
* delivery outcome. Cron-owned callers use this for cron delivery or
* explicit suppression; shared callers may also use it for a matching
* message-tool send that already reached the target.
*/
delivered?: boolean;
/**
@@ -144,16 +143,22 @@ function buildCronAgentDefaultsConfig(params: {
type ResolvedCronDeliveryTarget = Awaited<ReturnType<typeof resolveDeliveryTarget>>;
type IsolatedDeliveryContract = "cron-owned" | "shared";
function resolveCronToolPolicy(params: {
deliveryRequested: boolean;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryContract: IsolatedDeliveryContract;
}) {
return {
// Only enforce an explicit message target when the cron delivery target
// was successfully resolved. When resolution fails the agent should not
// be blocked by a target it cannot satisfy (#27898).
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
disableMessageTool: params.deliveryRequested,
// Cron-owned runs always route user-facing delivery through the runner
// itself. Shared callers keep the previous behavior so non-cron paths do
// not silently lose the message tool when no explicit delivery is active.
disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested,
};
}
@@ -161,6 +166,7 @@ async function resolveCronDeliveryContext(params: {
cfg: OpenClawConfig;
job: CronJob;
agentId: string;
deliveryContract: IsolatedDeliveryContract;
}) {
const deliveryPlan = resolveCronDeliveryPlan(params.job);
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
@@ -176,6 +182,7 @@ async function resolveCronDeliveryContext(params: {
toolPolicy: resolveCronToolPolicy({
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
deliveryContract: params.deliveryContract,
}),
};
}
@@ -200,6 +207,7 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: string;
agentId?: string;
lane?: string;
deliveryContract?: IsolatedDeliveryContract;
}): Promise<RunCronAgentTurnResult> {
const abortSignal = params.abortSignal ?? params.signal;
const isAborted = () => abortSignal?.aborted === true;
@@ -210,6 +218,7 @@ export async function runCronIsolatedAgentTurn(params: {
: "cron: job execution timed out";
};
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
const deliveryContract = params.deliveryContract ?? "cron-owned";
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const requestedAgentId =
typeof params.agentId === "string" && params.agentId.trim()
@@ -425,6 +434,7 @@ export async function runCronIsolatedAgentTurn(params: {
cfg: cfgWithAgentDefaults,
job: params.job,
agentId,
deliveryContract,
});
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
@@ -807,6 +817,7 @@ export async function runCronIsolatedAgentTurn(params: {
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
const skipMessagingToolDelivery =
deliveryContract === "shared" &&
deliveryRequested &&
finalRunResult.didSendViaMessagingTool === true &&
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
@@ -816,7 +827,6 @@ export async function runCronIsolatedAgentTurn(params: {
accountId: resolvedDelivery.accountId,
}),
);
const deliveryResult = await dispatchCronDelivery({
cfg: params.cfg,
cfgWithAgentDefaults,

View File

@@ -47,8 +47,12 @@ describe("isLikelyInterimCronMessage", () => {
false,
);
});
it("treats empty as interim", () => {
expect(isLikelyInterimCronMessage("")).toBe(true);
it("does not treat empty as interim (empty = NO_REPLY was stripped)", () => {
expect(isLikelyInterimCronMessage("")).toBe(false);
});
it("does not treat whitespace-only as interim", () => {
expect(isLikelyInterimCronMessage(" ")).toBe(false);
});
});

View File

@@ -42,7 +42,10 @@ function normalizeHintText(value: string): string {
export function isLikelyInterimCronMessage(value: string): boolean {
const normalized = normalizeHintText(value);
if (!normalized) {
return true;
// Empty text after payload filtering means the agent either returned
// NO_REPLY (deliberately silent) or produced no deliverable content.
// Do not treat this as an interim acknowledgement that needs a rerun.
return false;
}
const words = normalized.split(" ").filter(Boolean).length;
return words <= 45 && INTERIM_CRON_HINTS.some((hint) => normalized.includes(hint));

View File

@@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => {
});
});
it("treats delivery object without mode as announce", async () => {
it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => {
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "partial-delivery",
@@ -96,10 +96,8 @@ describe("CronService delivery plan consistency", () => {
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done",
expect.objectContaining({ agentId: undefined }),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown");
});
});

View File

@@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
expect(requestHeartbeatNow).not.toHaveBeenCalled();
});
it("still enqueues real cron summaries as system events", async () => {
it("does not revive legacy main-session relay for real cron summaries", async () => {
const { storePath } = await makeStorePath();
const now = Date.now();
@@ -109,10 +109,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
await runScheduledCron(cron);
// Real summaries SHOULD be enqueued.
expect(enqueueSystemEvent).toHaveBeenCalledWith(
expect.stringContaining("Weather update"),
expect.objectContaining({ agentId: undefined }),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
});
});

View File

@@ -620,14 +620,14 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("runs an isolated job and posts summary to main", async () => {
it("runs an isolated job without posting a fallback summary to main", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});
@@ -685,7 +685,7 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("posts last output to main even when isolated job errors", async () => {
it("does not post a fallback main summary when an isolated job errors", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
summary: "last output",
@@ -700,8 +700,8 @@ describe("CronService", () => {
status: "error",
});
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});

View File

@@ -1,161 +1,10 @@
import fs from "node:fs";
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import { migrateLegacyCronPayload } from "../payload-migration.js";
import { coerceFiniteScheduleNumber } from "../schedule.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
import { normalizeStoredCronJobs } from "../store-migration.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { recomputeNextRuns } from "./jobs.js";
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
import type { CronServiceState } from "./state.js";
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
async function getFileMtimeMs(path: string): Promise<number | null> {
try {
const stats = await fs.promises.stat(path);
@@ -185,287 +34,7 @@ export async function ensureLoaded(
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
const loaded = await loadCronStore(state.deps.storePath);
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
let mutated = false;
for (const raw of jobs) {
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
}
}
if (payloadRecord.kind === "agentTurn") {
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
}
const hadLegacyTopLevelFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw ||
"command" in raw ||
"timeout" in raw;
if (hadLegacyTopLevelFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
}
if (payloadRecord) {
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
} else if (modeRaw === undefined || modeRaw === null) {
// Explicitly persist the default so existing jobs don't silently
// change behaviour when the runtime default shifts.
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
const { mutated } = normalizeStoredCronJobs(jobs);
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
state.storeLoadedAtMs = state.deps.nowMs();
state.storeFileMtimeMs = fileMtimeMs;

View File

@@ -1,9 +1,7 @@
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,
@@ -1138,46 +1136,6 @@ export async function executeJobCore(
return { status: "error", error: timeoutErrorMessage() };
}
// Post a short summary back to the main session only when announce
// delivery was requested and we are confident no outbound delivery path
// ran. If delivery was attempted but final ack is uncertain, suppress the
// main summary to avoid duplicate user-facing sends.
// See: https://github.com/openclaw/openclaw/issues/15692
//
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
// are internal ack tokens that should never leak into user conversations.
// See: https://github.com/openclaw/openclaw/issues/32013
const summaryText = res.summary?.trim();
const deliveryPlan = resolveCronDeliveryPlan(job);
const suppressMainSummary =
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
if (
shouldEnqueueCronMainSummary({
summaryText,
deliveryRequested: deliveryPlan.requested,
delivered: res.delivered,
deliveryAttempted: res.deliveryAttempted,
suppressMainSummary,
isCronSystemEvent,
})
) {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
}
}
return {
status: res.status,
error: res.error,

View File

@@ -0,0 +1,78 @@
import { describe, expect, it } from "vitest";
import { normalizeStoredCronJobs } from "./store-migration.js";
describe("normalizeStoredCronJobs", () => {
it("normalizes legacy cron fields and reports migration issues", () => {
const jobs = [
{
jobId: "legacy-job",
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
message: "say hi",
model: "openai/gpt-4.1",
deliver: true,
provider: " TeLeGrAm ",
to: "12345",
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues).toMatchObject({
jobId: 1,
legacyScheduleCron: 1,
legacyTopLevelPayloadFields: 1,
legacyTopLevelDeliveryFields: 1,
});
const [job] = jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "*/5 * * * *",
tz: "UTC",
});
expect(job?.message).toBeUndefined();
expect(job?.provider).toBeUndefined();
expect(job?.delivery).toMatchObject({
mode: "announce",
channel: "telegram",
to: "12345",
});
expect(job?.payload).toMatchObject({
kind: "agentTurn",
message: "say hi",
model: "openai/gpt-4.1",
});
});
it("normalizes payload provider alias into channel", () => {
const jobs = [
{
id: "legacy-provider",
schedule: { kind: "every", everyMs: 60_000 },
payload: {
kind: "agentTurn",
message: "ping",
provider: " Slack ",
},
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues.legacyPayloadProvider).toBe(1);
expect(jobs[0]?.payload).toMatchObject({
kind: "agentTurn",
message: "ping",
});
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
expect(payload?.provider).toBeUndefined();
expect(jobs[0]?.delivery).toMatchObject({
mode: "announce",
channel: "slack",
});
});
});

491
src/cron/store-migration.ts Normal file
View File

@@ -0,0 +1,491 @@
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { coerceFiniteScheduleNumber } from "./schedule.js";
import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
type CronStoreIssueKey =
| "jobId"
| "legacyScheduleString"
| "legacyScheduleCron"
| "legacyPayloadKind"
| "legacyPayloadProvider"
| "legacyTopLevelPayloadFields"
| "legacyTopLevelDeliveryFields"
| "legacyDeliveryMode";
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
type NormalizeCronStoreJobsResult = {
issues: CronStoreIssues;
jobs: Array<Record<string, unknown>>;
mutated: boolean;
};
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
issues[key] = (issues[key] ?? 0) + 1;
}
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
export function normalizeStoredCronJobs(
jobs: Array<Record<string, unknown>>,
): NormalizeCronStoreJobsResult {
const issues: CronStoreIssues = {};
let mutated = false;
for (const raw of jobs) {
const jobIssues = new Set<CronStoreIssueKey>();
const trackIssue = (key: CronStoreIssueKey) => {
if (jobIssues.has(key)) {
return;
}
jobIssues.add(key);
incrementIssue(issues, key);
};
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
trackIssue("jobId");
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
trackIssue("jobId");
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
trackIssue("legacyScheduleString");
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
trackIssue("legacyTopLevelPayloadFields");
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
trackIssue("legacyPayloadKind");
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
trackIssue("legacyPayloadKind");
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
trackIssue("legacyPayloadKind");
}
}
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
const hadLegacyTopLevelPayloadFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"command" in raw ||
"timeout" in raw;
const hadLegacyTopLevelDeliveryFields =
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw;
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
if (hadLegacyTopLevelPayloadFields) {
trackIssue("legacyTopLevelPayloadFields");
}
if (hadLegacyTopLevelDeliveryFields) {
trackIssue("legacyTopLevelDeliveryFields");
}
}
if (payloadRecord) {
const hadLegacyPayloadProvider =
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
if (hadLegacyPayloadProvider) {
trackIssue("legacyPayloadProvider");
}
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
trackIssue("legacyScheduleCron");
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
trackIssue("legacyScheduleCron");
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
trackIssue("legacyDeliveryMode");
}
} else if (modeRaw === undefined || modeRaw === null) {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
return { issues, jobs, mutated };
}

View File

@@ -18,6 +18,10 @@ describe("method scope resolution", () => {
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
});
it("leaves node-only pending drain outside operator scopes", () => {
expect(resolveLeastPrivilegeOperatorScopesForMethod("node.pending.drain")).toEqual([]);
});
it("returns empty scopes for unknown methods", () => {
expect(resolveLeastPrivilegeOperatorScopesForMethod("totally.unknown.method")).toEqual([]);
});

View File

@@ -22,6 +22,7 @@ export const CLI_DEFAULT_OPERATOR_SCOPES: OperatorScope[] = [
const NODE_ROLE_METHODS = new Set([
"node.invoke.result",
"node.event",
"node.pending.drain",
"node.canvas.capability.refresh",
"node.pending.pull",
"node.pending.ack",
@@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
"chat.abort",
"browser.request",
"push.test",
"node.pending.enqueue",
],
[ADMIN_SCOPE]: [
"channels.logout",

View File

@@ -0,0 +1,67 @@
import { describe, expect, it, beforeEach } from "vitest";
import {
acknowledgeNodePendingWork,
drainNodePendingWork,
enqueueNodePendingWork,
getNodePendingWorkStateCountForTests,
resetNodePendingWorkForTests,
} from "./node-pending-work.js";
describe("node pending work", () => {
beforeEach(() => {
resetNodePendingWorkForTests();
});
it("returns a baseline status request even when no explicit work is queued", () => {
const drained = drainNodePendingWork("node-1");
expect(drained.items).toEqual([
expect.objectContaining({
id: "baseline-status",
type: "status.request",
priority: "default",
}),
]);
expect(drained.hasMore).toBe(false);
});
it("dedupes explicit work by type and removes acknowledged items", () => {
const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
expect(first.deduped).toBe(false);
expect(second.deduped).toBe(true);
expect(second.item.id).toBe(first.item.id);
const drained = drainNodePendingWork("node-2");
expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]);
const acked = acknowledgeNodePendingWork({
nodeId: "node-2",
itemIds: [first.item.id, "baseline-status"],
});
expect(acked.removedItemIds).toEqual([first.item.id]);
const afterAck = drainNodePendingWork("node-2");
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
});
it("keeps hasMore true when the baseline status item is deferred by maxItems", () => {
enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" });
const drained = drainNodePendingWork("node-3", { maxItems: 1 });
expect(drained.items.map((item) => item.type)).toEqual(["location.request"]);
expect(drained.hasMore).toBe(true);
});
it("does not allocate state for drain-only nodes with no queued work", () => {
expect(getNodePendingWorkStateCountForTests()).toBe(0);
const drained = drainNodePendingWork("node-4");
const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] });
expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]);
expect(acked).toEqual({ revision: 0, removedItemIds: [] });
expect(getNodePendingWorkStateCountForTests()).toBe(0);
});
});

View File

@@ -0,0 +1,193 @@
import { randomUUID } from "node:crypto";
export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const;
export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number];
export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const;
export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number];
export type NodePendingWorkItem = {
id: string;
type: NodePendingWorkType;
priority: NodePendingWorkPriority;
createdAtMs: number;
expiresAtMs: number | null;
payload?: Record<string, unknown>;
};
type NodePendingWorkState = {
revision: number;
itemsById: Map<string, NodePendingWorkItem>;
};
type DrainOptions = {
maxItems?: number;
includeDefaultStatus?: boolean;
nowMs?: number;
};
type DrainResult = {
revision: number;
items: NodePendingWorkItem[];
hasMore: boolean;
};
const DEFAULT_STATUS_ITEM_ID = "baseline-status";
const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default";
const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal";
const DEFAULT_MAX_ITEMS = 4;
const MAX_ITEMS = 10;
const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
high: 3,
normal: 2,
default: 1,
};
const stateByNodeId = new Map<string, NodePendingWorkState>();
function getOrCreateState(nodeId: string): NodePendingWorkState {
let state = stateByNodeId.get(nodeId);
if (!state) {
state = {
revision: 0,
itemsById: new Map(),
};
stateByNodeId.set(nodeId, state);
}
return state;
}
function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean {
let changed = false;
for (const [id, item] of state.itemsById) {
if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) {
state.itemsById.delete(id);
changed = true;
}
}
if (changed) {
state.revision += 1;
}
return changed;
}
function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] {
return [...state.itemsById.values()].toSorted((a, b) => {
const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
if (priorityDelta !== 0) {
return priorityDelta;
}
if (a.createdAtMs !== b.createdAtMs) {
return a.createdAtMs - b.createdAtMs;
}
return a.id.localeCompare(b.id);
});
}
function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem {
return {
id: DEFAULT_STATUS_ITEM_ID,
type: "status.request",
priority: DEFAULT_STATUS_PRIORITY,
createdAtMs: nowMs,
expiresAtMs: null,
};
}
export function enqueueNodePendingWork(params: {
nodeId: string;
type: NodePendingWorkType;
priority?: NodePendingWorkPriority;
expiresInMs?: number;
payload?: Record<string, unknown>;
}): { revision: number; item: NodePendingWorkItem; deduped: boolean } {
const nodeId = params.nodeId.trim();
if (!nodeId) {
throw new Error("nodeId required");
}
const nowMs = Date.now();
const state = getOrCreateState(nodeId);
pruneExpired(state, nowMs);
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
if (existing) {
return { revision: state.revision, item: existing, deduped: true };
}
const item: NodePendingWorkItem = {
id: randomUUID(),
type: params.type,
priority: params.priority ?? DEFAULT_PRIORITY,
createdAtMs: nowMs,
expiresAtMs:
typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs)
? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs))
: null,
...(params.payload ? { payload: params.payload } : {}),
};
state.itemsById.set(item.id, item);
state.revision += 1;
return { revision: state.revision, item, deduped: false };
}
export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult {
const normalizedNodeId = nodeId.trim();
if (!normalizedNodeId) {
return { revision: 0, items: [], hasMore: false };
}
const nowMs = opts.nowMs ?? Date.now();
const state = stateByNodeId.get(normalizedNodeId);
const revision = state?.revision ?? 0;
if (state) {
pruneExpired(state, nowMs);
}
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
const explicitItems = state ? sortedItems(state) : [];
const items = explicitItems.slice(0, maxItems);
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
if (includeBaseline && items.length < maxItems) {
items.push(makeBaselineStatusItem(nowMs));
}
const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length;
const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID);
return {
revision,
items,
hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded),
};
}
export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): {
revision: number;
removedItemIds: string[];
} {
const nodeId = params.nodeId.trim();
if (!nodeId) {
return { revision: 0, removedItemIds: [] };
}
const state = stateByNodeId.get(nodeId);
if (!state) {
return { revision: 0, removedItemIds: [] };
}
const removedItemIds: string[] = [];
for (const itemId of params.itemIds) {
const trimmedId = itemId.trim();
if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) {
continue;
}
if (state.itemsById.delete(trimmedId)) {
removedItemIds.push(trimmedId);
}
}
if (removedItemIds.length > 0) {
state.revision += 1;
}
return { revision: state.revision, removedItemIds };
}
export function resetNodePendingWorkForTests() {
stateByNodeId.clear();
}
export function getNodePendingWorkStateCountForTests(): number {
return stateByNodeId.size;
}

View File

@@ -140,6 +140,14 @@ import {
NodeDescribeParamsSchema,
type NodeEventParams,
NodeEventParamsSchema,
type NodePendingDrainParams,
NodePendingDrainParamsSchema,
type NodePendingDrainResult,
NodePendingDrainResultSchema,
type NodePendingEnqueueParams,
NodePendingEnqueueParamsSchema,
type NodePendingEnqueueResult,
NodePendingEnqueueResultSchema,
type NodeInvokeParams,
NodeInvokeParamsSchema,
type NodeInvokeResultParams,
@@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
NodeInvokeResultParamsSchema,
);
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
export const validateNodePendingDrainParams = ajv.compile<NodePendingDrainParams>(
NodePendingDrainParamsSchema,
);
export const validateNodePendingEnqueueParams = ajv.compile<NodePendingEnqueueParams>(
NodePendingEnqueueParamsSchema,
);
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
SecretsResolveParamsSchema,
@@ -472,6 +486,10 @@ export {
NodeListParamsSchema,
NodePendingAckParamsSchema,
NodeInvokeParamsSchema,
NodePendingDrainParamsSchema,
NodePendingDrainResultSchema,
NodePendingEnqueueParamsSchema,
NodePendingEnqueueResultSchema,
SessionsListParamsSchema,
SessionsPreviewParamsSchema,
SessionsPatchParamsSchema,
@@ -621,6 +639,10 @@ export type {
NodeInvokeParams,
NodeInvokeResultParams,
NodeEventParams,
NodePendingDrainParams,
NodePendingDrainResult,
NodePendingEnqueueParams,
NodePendingEnqueueResult,
SessionsListParams,
SessionsPreviewParams,
SessionsResolveParams,

View File

@@ -1,6 +1,14 @@
import { Type } from "@sinclair/typebox";
import { NonEmptyString } from "./primitives.js";
const NodePendingWorkTypeSchema = Type.String({
enum: ["status.request", "location.request"],
});
const NodePendingWorkPrioritySchema = Type.String({
enum: ["normal", "high"],
});
export const NodePairRequestParamsSchema = Type.Object(
{
nodeId: NonEmptyString,
@@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const NodePendingDrainParamsSchema = Type.Object(
{
maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })),
},
{ additionalProperties: false },
);
export const NodePendingDrainItemSchema = Type.Object(
{
id: NonEmptyString,
type: NodePendingWorkTypeSchema,
priority: Type.String({ enum: ["default", "normal", "high"] }),
createdAtMs: Type.Integer({ minimum: 0 }),
expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])),
payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
},
{ additionalProperties: false },
);
export const NodePendingDrainResultSchema = Type.Object(
{
nodeId: NonEmptyString,
revision: Type.Integer({ minimum: 0 }),
items: Type.Array(NodePendingDrainItemSchema),
hasMore: Type.Boolean(),
},
{ additionalProperties: false },
);
export const NodePendingEnqueueParamsSchema = Type.Object(
{
nodeId: NonEmptyString,
type: NodePendingWorkTypeSchema,
priority: Type.Optional(NodePendingWorkPrioritySchema),
expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })),
wake: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
);
export const NodePendingEnqueueResultSchema = Type.Object(
{
nodeId: NonEmptyString,
revision: Type.Integer({ minimum: 0 }),
queued: NodePendingDrainItemSchema,
wakeTriggered: Type.Boolean(),
},
{ additionalProperties: false },
);
export const NodeInvokeRequestEventSchema = Type.Object(
{
id: NonEmptyString,

View File

@@ -114,6 +114,10 @@ import {
import {
NodeDescribeParamsSchema,
NodeEventParamsSchema,
NodePendingDrainParamsSchema,
NodePendingDrainResultSchema,
NodePendingEnqueueParamsSchema,
NodePendingEnqueueResultSchema,
NodeInvokeParamsSchema,
NodeInvokeResultParamsSchema,
NodeInvokeRequestEventSchema,
@@ -186,6 +190,10 @@ export const ProtocolSchemas = {
NodeInvokeParams: NodeInvokeParamsSchema,
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
NodeEventParams: NodeEventParamsSchema,
NodePendingDrainParams: NodePendingDrainParamsSchema,
NodePendingDrainResult: NodePendingDrainResultSchema,
NodePendingEnqueueParams: NodePendingEnqueueParamsSchema,
NodePendingEnqueueResult: NodePendingEnqueueResultSchema,
NodeInvokeRequestEvent: NodeInvokeRequestEventSchema,
PushTestParams: PushTestParamsSchema,
PushTestResult: PushTestResultSchema,

View File

@@ -32,6 +32,10 @@ export type NodeDescribeParams = SchemaType<"NodeDescribeParams">;
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
export type NodeEventParams = SchemaType<"NodeEventParams">;
export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">;
export type NodePendingDrainResult = SchemaType<"NodePendingDrainResult">;
export type NodePendingEnqueueParams = SchemaType<"NodePendingEnqueueParams">;
export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">;
export type PushTestParams = SchemaType<"PushTestParams">;
export type PushTestResult = SchemaType<"PushTestResult">;
export type SessionsListParams = SchemaType<"SessionsListParams">;

View File

@@ -21,8 +21,10 @@ describe("gateway role policy", () => {
test("authorizes roles against node vs operator methods", () => {
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true);
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(false);
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
});
});

View File

@@ -76,6 +76,8 @@ const BASE_METHODS = [
"node.rename",
"node.list",
"node.describe",
"node.pending.drain",
"node.pending.enqueue",
"node.invoke",
"node.pending.pull",
"node.pending.ack",

View File

@@ -18,6 +18,7 @@ import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
import { healthHandlers } from "./server-methods/health.js";
import { logsHandlers } from "./server-methods/logs.js";
import { modelsHandlers } from "./server-methods/models.js";
import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
import { nodeHandlers } from "./server-methods/nodes.js";
import { pushHandlers } from "./server-methods/push.js";
import { sendHandlers } from "./server-methods/send.js";
@@ -87,6 +88,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
...systemHandlers,
...updateHandlers,
...nodeHandlers,
...nodePendingHandlers,
...pushHandlers,
...sendHandlers,
...usageHandlers,

View File

@@ -0,0 +1,177 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { nodePendingHandlers } from "./nodes-pending.js";
const mocks = vi.hoisted(() => ({
drainNodePendingWork: vi.fn(),
enqueueNodePendingWork: vi.fn(),
maybeWakeNodeWithApns: vi.fn(),
maybeSendNodeWakeNudge: vi.fn(),
waitForNodeReconnect: vi.fn(),
}));
vi.mock("../node-pending-work.js", () => ({
drainNodePendingWork: mocks.drainNodePendingWork,
enqueueNodePendingWork: mocks.enqueueNodePendingWork,
}));
vi.mock("./nodes.js", () => ({
NODE_WAKE_RECONNECT_WAIT_MS: 3_000,
NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000,
maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns,
maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge,
waitForNodeReconnect: mocks.waitForNodeReconnect,
}));
type RespondCall = [
boolean,
unknown?,
{
code?: number;
message?: string;
details?: unknown;
}?,
];
function makeContext(overrides?: Partial<Record<string, unknown>>) {
return {
nodeRegistry: {
get: vi.fn(() => undefined),
},
logGateway: {
info: vi.fn(),
warn: vi.fn(),
},
...overrides,
};
}
describe("node.pending handlers", () => {
beforeEach(() => {
mocks.drainNodePendingWork.mockReset();
mocks.enqueueNodePendingWork.mockReset();
mocks.maybeWakeNodeWithApns.mockReset();
mocks.maybeSendNodeWakeNudge.mockReset();
mocks.waitForNodeReconnect.mockReset();
});
it("drains pending work for the connected node identity", async () => {
mocks.drainNodePendingWork.mockReturnValue({
revision: 2,
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
hasMore: false,
});
const respond = vi.fn();
await nodePendingHandlers["node.pending.drain"]({
params: { maxItems: 3 },
respond: respond as never,
client: { connect: { device: { id: "ios-node-1" } } } as never,
context: makeContext() as never,
req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" },
isWebchatConnect: () => false,
});
expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", {
maxItems: 3,
includeDefaultStatus: true,
});
expect(respond).toHaveBeenCalledWith(
true,
{
nodeId: "ios-node-1",
revision: 2,
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
hasMore: false,
},
undefined,
);
});
it("rejects node.pending.drain without a connected device identity", async () => {
const respond = vi.fn();
await nodePendingHandlers["node.pending.drain"]({
params: {},
respond: respond as never,
client: null,
context: makeContext() as never,
req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" },
isWebchatConnect: () => false,
});
const call = respond.mock.calls[0] as RespondCall | undefined;
expect(call?.[0]).toBe(false);
expect(call?.[2]?.message).toContain("connected device identity");
});
it("enqueues pending work and wakes a disconnected node once", async () => {
mocks.enqueueNodePendingWork.mockReturnValue({
revision: 4,
deduped: false,
item: {
id: "pending-1",
type: "location.request",
priority: "high",
createdAtMs: 100,
expiresAtMs: null,
},
});
mocks.maybeWakeNodeWithApns.mockResolvedValue({
available: true,
throttled: false,
path: "apns",
durationMs: 12,
apnsStatus: 200,
apnsReason: null,
});
let connected = false;
mocks.waitForNodeReconnect.mockImplementation(async () => {
connected = true;
return true;
});
const context = makeContext({
nodeRegistry: {
get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)),
},
});
const respond = vi.fn();
await nodePendingHandlers["node.pending.enqueue"]({
params: {
nodeId: "ios-node-2",
type: "location.request",
priority: "high",
},
respond: respond as never,
client: null,
context: context as never,
req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" },
isWebchatConnect: () => false,
});
expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({
nodeId: "ios-node-2",
type: "location.request",
priority: "high",
expiresInMs: undefined,
});
expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", {
wakeReason: "node.pending",
});
expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({
nodeId: "ios-node-2",
context,
timeoutMs: 3_000,
});
expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled();
expect(respond).toHaveBeenCalledWith(
true,
expect.objectContaining({
nodeId: "ios-node-2",
revision: 4,
wakeTriggered: true,
}),
undefined,
);
});
});

View File

@@ -0,0 +1,159 @@
import {
drainNodePendingWork,
enqueueNodePendingWork,
type NodePendingWorkPriority,
type NodePendingWorkType,
} from "../node-pending-work.js";
import {
ErrorCodes,
errorShape,
validateNodePendingDrainParams,
validateNodePendingEnqueueParams,
} from "../protocol/index.js";
import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js";
import {
maybeSendNodeWakeNudge,
maybeWakeNodeWithApns,
NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
NODE_WAKE_RECONNECT_WAIT_MS,
waitForNodeReconnect,
} from "./nodes.js";
import type { GatewayRequestHandlers } from "./types.js";
function resolveClientNodeId(
client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null,
): string | null {
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? "";
const trimmed = nodeId.trim();
return trimmed.length > 0 ? trimmed : null;
}
export const nodePendingHandlers: GatewayRequestHandlers = {
"node.pending.drain": async ({ params, respond, client }) => {
if (!validateNodePendingDrainParams(params)) {
respondInvalidParams({
respond,
method: "node.pending.drain",
validator: validateNodePendingDrainParams,
});
return;
}
const nodeId = resolveClientNodeId(client);
if (!nodeId) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"node.pending.drain requires a connected device identity",
),
);
return;
}
const p = params as { maxItems?: number };
const drained = drainNodePendingWork(nodeId, {
maxItems: p.maxItems,
includeDefaultStatus: true,
});
respond(true, { nodeId, ...drained }, undefined);
},
"node.pending.enqueue": async ({ params, respond, context }) => {
if (!validateNodePendingEnqueueParams(params)) {
respondInvalidParams({
respond,
method: "node.pending.enqueue",
validator: validateNodePendingEnqueueParams,
});
return;
}
const p = params as {
nodeId: string;
type: NodePendingWorkType;
priority?: NodePendingWorkPriority;
expiresInMs?: number;
wake?: boolean;
};
await respondUnavailableOnThrow(respond, async () => {
const queued = enqueueNodePendingWork({
nodeId: p.nodeId,
type: p.type,
priority: p.priority,
expiresInMs: p.expiresInMs,
});
let wakeTriggered = false;
if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) {
const wakeReqId = queued.item.id;
context.logGateway.info(
`node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`,
);
const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" });
context.logGateway.info(
`node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` +
`available=${wake.available} throttled=${wake.throttled} ` +
`path=${wake.path} durationMs=${wake.durationMs} ` +
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
);
wakeTriggered = wake.available;
if (wake.available) {
const reconnected = await waitForNodeReconnect({
nodeId: p.nodeId,
context,
timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS,
});
context.logGateway.info(
`node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` +
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`,
);
}
if (!context.nodeRegistry.get(p.nodeId) && wake.available) {
const retryWake = await maybeWakeNodeWithApns(p.nodeId, {
force: true,
wakeReason: "node.pending",
});
context.logGateway.info(
`node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` +
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
);
if (retryWake.available) {
const reconnected = await waitForNodeReconnect({
nodeId: p.nodeId,
context,
timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
});
context.logGateway.info(
`node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` +
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`,
);
}
}
if (!context.nodeRegistry.get(p.nodeId)) {
const nudge = await maybeSendNodeWakeNudge(p.nodeId);
context.logGateway.info(
`node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
);
context.logGateway.warn(
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`,
);
} else {
context.logGateway.info(
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`,
);
}
}
respond(
true,
{
nodeId: p.nodeId,
revision: queued.revision,
queued: queued.item,
wakeTriggered,
},
undefined,
);
});
},
};

View File

@@ -47,9 +47,9 @@ import {
} from "./nodes.helpers.js";
import type { GatewayRequestHandlers } from "./types.js";
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
const NODE_WAKE_RECONNECT_POLL_MS = 150;
export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
export const NODE_WAKE_RECONNECT_POLL_MS = 150;
const NODE_WAKE_THROTTLE_MS = 15_000;
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
@@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined {
}
}
async function maybeWakeNodeWithApns(
export async function maybeWakeNodeWithApns(
nodeId: string,
opts?: { force?: boolean },
opts?: { force?: boolean; wakeReason?: string },
): Promise<NodeWakeAttempt> {
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
nodeWakeById.set(nodeId, state);
@@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns(
auth: auth.value,
registration,
nodeId,
wakeReason: "node.invoke",
wakeReason: opts?.wakeReason ?? "node.invoke",
});
if (!wakeResult.ok) {
return withDuration({
@@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns(
}
}
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
export async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
const startedAtMs = Date.now();
const withDuration = (
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
@@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAtte
}
}
async function waitForNodeReconnect(params: {
export async function waitForNodeReconnect(params: {
nodeId: string;
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
timeoutMs?: number;

View File

@@ -848,6 +848,32 @@ describe("gateway server cron", () => {
'Cron job "failure destination webhook" failed: unknown error',
);
fetchWithSsrFGuardMock.mockClear();
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
const bestEffortFailureDestJobId = await addWebhookCronJob({
ws,
name: "best effort failure destination webhook",
sessionTarget: "isolated",
delivery: {
mode: "announce",
channel: "telegram",
to: "19098680",
bestEffort: true,
failureDestination: {
mode: "webhook",
to: "https://example.invalid/failure-destination",
},
},
});
const bestEffortFailureDestFinished = waitForCronEvent(
ws,
(payload) =>
payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
);
await runCronJobForce(ws, bestEffortFailureDestJobId);
await bestEffortFailureDestFinished;
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
const noSummaryJobId = await addWebhookCronJob({
ws,
@@ -861,7 +887,7 @@ describe("gateway server cron", () => {
);
await runCronJobForce(ws, noSummaryJobId);
await noSummaryFinished;
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}

View File

@@ -75,6 +75,10 @@ describe("gateway server hooks", () => {
expect(resAgent.status).toBe(200);
const agentEvents = await waitForSystemEvent();
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
deliveryContract?: string;
};
expect(firstCall?.deliveryContract).toBe("shared");
drainSystemEvents(resolveMainKey());
mockIsolatedRunOkOnce();

View File

@@ -76,6 +76,7 @@ export function createGatewayHooksRequestHandler(params: {
message: value.message,
sessionKey,
lane: "cron",
deliveryContract: "shared",
});
const summary = result.summary?.trim() || result.error?.trim() || result.status;
const prefix =

View File

@@ -0,0 +1,49 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
async function importHookRunnerGlobalModule() {
return import("./hook-runner-global.js");
}
afterEach(async () => {
const mod = await importHookRunnerGlobalModule();
mod.resetGlobalHookRunner();
vi.resetModules();
});
describe("hook-runner-global", () => {
it("preserves the initialized runner across module reloads", async () => {
const modA = await importHookRunnerGlobalModule();
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
modA.initializeGlobalHookRunner(registry);
expect(modA.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
vi.resetModules();
const modB = await importHookRunnerGlobalModule();
expect(modB.getGlobalHookRunner()).not.toBeNull();
expect(modB.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
expect(modB.getGlobalPluginRegistry()).toBe(registry);
});
it("clears the shared state across module reloads", async () => {
const modA = await importHookRunnerGlobalModule();
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
modA.initializeGlobalHookRunner(registry);
vi.resetModules();
const modB = await importHookRunnerGlobalModule();
modB.resetGlobalHookRunner();
expect(modB.getGlobalHookRunner()).toBeNull();
expect(modB.getGlobalPluginRegistry()).toBeNull();
vi.resetModules();
const modC = await importHookRunnerGlobalModule();
expect(modC.getGlobalHookRunner()).toBeNull();
expect(modC.getGlobalPluginRegistry()).toBeNull();
});
});

View File

@@ -12,16 +12,31 @@ import type { PluginHookGatewayContext, PluginHookGatewayStopEvent } from "./typ
const log = createSubsystemLogger("plugins");
let globalHookRunner: HookRunner | null = null;
let globalRegistry: PluginRegistry | null = null;
type HookRunnerGlobalState = {
hookRunner: HookRunner | null;
registry: PluginRegistry | null;
};
const hookRunnerGlobalStateKey = Symbol.for("openclaw.plugins.hook-runner-global-state");
function getHookRunnerGlobalState(): HookRunnerGlobalState {
const globalStore = globalThis as typeof globalThis & {
[hookRunnerGlobalStateKey]?: HookRunnerGlobalState;
};
return (globalStore[hookRunnerGlobalStateKey] ??= {
hookRunner: null,
registry: null,
});
}
/**
* Initialize the global hook runner with a plugin registry.
* Called once when plugins are loaded during gateway startup.
*/
export function initializeGlobalHookRunner(registry: PluginRegistry): void {
globalRegistry = registry;
globalHookRunner = createHookRunner(registry, {
const state = getHookRunnerGlobalState();
state.registry = registry;
state.hookRunner = createHookRunner(registry, {
logger: {
debug: (msg) => log.debug(msg),
warn: (msg) => log.warn(msg),
@@ -41,7 +56,7 @@ export function initializeGlobalHookRunner(registry: PluginRegistry): void {
* Returns null if plugins haven't been loaded yet.
*/
export function getGlobalHookRunner(): HookRunner | null {
return globalHookRunner;
return getHookRunnerGlobalState().hookRunner;
}
/**
@@ -49,14 +64,14 @@ export function getGlobalHookRunner(): HookRunner | null {
* Returns null if plugins haven't been loaded yet.
*/
export function getGlobalPluginRegistry(): PluginRegistry | null {
return globalRegistry;
return getHookRunnerGlobalState().registry;
}
/**
* Check if any hooks are registered for a given hook name.
*/
export function hasGlobalHooks(hookName: Parameters<HookRunner["hasHooks"]>[0]): boolean {
return globalHookRunner?.hasHooks(hookName) ?? false;
return getHookRunnerGlobalState().hookRunner?.hasHooks(hookName) ?? false;
}
export async function runGlobalGatewayStopSafely(params: {
@@ -83,6 +98,7 @@ export async function runGlobalGatewayStopSafely(params: {
* Reset the global hook runner (for testing).
*/
export function resetGlobalHookRunner(): void {
globalHookRunner = null;
globalRegistry = null;
const state = getHookRunnerGlobalState();
state.hookRunner = null;
state.registry = null;
}

View File

@@ -270,58 +270,4 @@ describe("registerTelegramNativeCommands", () => {
);
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
});
it("uses the DM thread session key for plugin command internal sent hooks", async () => {
const commandHandlers = new Map<string, (ctx: unknown) => Promise<void>>();
pluginCommandMocks.getPluginCommandSpecs.mockReturnValue([
{
name: "plug",
description: "Plugin command",
},
] as never);
pluginCommandMocks.matchPluginCommand.mockReturnValue({
command: { key: "plug", requireAuth: false },
args: undefined,
} as never);
registerTelegramNativeCommands({
...buildParams({
channels: {
telegram: {
dmPolicy: "open",
},
},
}),
bot: {
api: {
setMyCommands: vi.fn().mockResolvedValue(undefined),
sendMessage: vi.fn().mockResolvedValue(undefined),
},
command: vi.fn((name: string, cb: (ctx: unknown) => Promise<void>) => {
commandHandlers.set(name, cb);
}),
} as unknown as Parameters<typeof registerTelegramNativeCommands>[0]["bot"],
});
const handler = commandHandlers.get("plug");
expect(handler).toBeTruthy();
await handler?.({
match: "",
message: {
message_id: 42,
date: Math.floor(Date.now() / 1000),
chat: { id: 12345, type: "private" },
from: { id: 12345, username: "alice" },
message_thread_id: 99,
},
});
expect(deliveryMocks.deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
sessionKeyForInternalHooks: "agent:main:main:thread:12345:99",
}),
);
});
});

View File

@@ -540,20 +540,6 @@ export const registerTelegramNativeCommands = ({
chunkMode: params.chunkMode,
linkPreview: telegramCfg.linkPreview,
});
const resolveDmThreadSessionKey = (params: {
baseSessionKey: string;
chatId: string | number;
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
}): string => {
const dmThreadId = params.threadSpec.scope === "dm" ? params.threadSpec.id : undefined;
if (dmThreadId == null) {
return params.baseSessionKey;
}
return resolveThreadSessionKeys({
baseSessionKey: params.baseSessionKey,
threadId: `${params.chatId}:${dmThreadId}`,
}).sessionKey;
};
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
@@ -661,11 +647,17 @@ export const registerTelegramNativeCommands = ({
});
return;
}
const sessionKey = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const baseSessionKey = route.sessionKey;
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({
baseSessionKey,
threadId: `${chatId}:${dmThreadId}`,
})
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const { skillFilter, groupSystemPrompt } = resolveTelegramGroupPromptSettings({
groupConfig,
topicConfig,
@@ -841,15 +833,10 @@ export const registerTelegramNativeCommands = ({
return;
}
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
const sessionKeyForInternalHooks = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
sessionKeyForInternalHooks,
sessionKeyForInternalHooks: route.sessionKey,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
mediaLocalRoots,