Compare commits

..

89 Commits

Author SHA1 Message Date
Vincent Koc
99de26761d fix(telegram): use dm thread hook session for plugin commands 2026-03-09 11:03:05 -07:00
Vincent Koc
0225e4c110 docs: format contributing whitespace 2026-03-09 11:02:34 -07:00
Pejman Pour-Moezzi
162232ae2f fix(acp): propagate setSessionMode gateway errors to client (#41185)
* fix(acp): propagate setSessionMode gateway errors to client

* fix: add changelog entry for ACP setSessionMode propagation (#41185) (thanks @pejmanjohn)

---------

Co-authored-by: Pejman Pour-Moezzi <481729+pejmanjohn@users.noreply.github.com>
Co-authored-by: Onur <onur@textcortex.com>
2026-03-09 10:58:05 -07:00
Pejman Pour-Moezzi
ae824ab269 fix(acp): map error states to end_turn instead of unconditional refusal (#41187)
* fix(acp): map error states to end_turn instead of unconditional refusal

* fix: map ACP error stop reason to end_turn (#41187) (thanks @pejmanjohn)

---------

Co-authored-by: Pejman Pour-Moezzi <481729+pejmanjohn@users.noreply.github.com>
Co-authored-by: Onur <onur@textcortex.com>
2026-03-09 10:58:05 -07:00
Radek Sienkiewicz
4808bf526e Update CONTRIBUTING.md 2026-03-09 10:58:05 -07:00
Robin Waslander
75c71eb18e Add Robin Waslander to maintainers 2026-03-09 10:58:05 -07:00
Radek Sienkiewicz
e5af902dda Update CONTRIBUTING.md 2026-03-09 10:58:05 -07:00
xaeon2026
2209cc5832 Allow ACP sessions.patch lineage fields on ACP session keys (#40995)
Merged via squash.

Prepared head SHA: c1191edc08
Co-authored-by: xaeon2026 <264572156+xaeon2026@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 10:58:05 -07:00
Charles Dusek
fce77e2d45 fix(agents): bound compaction retry wait and drain embedded runs on restart (#40324)
Merged via squash.

Prepared head SHA: cfd99562d6
Co-authored-by: cgdusek <38732970+cgdusek@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-09 10:58:05 -07:00
Daniel Reis
d0df6f3a4c test(context-engine): add bundle chunk isolation tests for registry (#40460)
Merged via squash.

Prepared head SHA: 44622abfbc
Co-authored-by: dsantoreis <220753637+dsantoreis@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-09 10:58:05 -07:00
Joshua Lelon Mitchell
e5ef7cbdab fix(swiftformat): exclude HostEnvSecurityPolicy.generated.swift from formatters (#39969) 2026-03-09 10:58:04 -07:00
opriz
d59eb6db5b fix(kimi-coding): fix kimi tool format: use native Anthropic tool schema instead of OpenAI … (openclaw#40008)
Verified:
- pnpm install --frozen-lockfile
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: opriz <51957849+opriz@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 10:58:04 -07:00
Radek Sienkiewicz
6fac513119 fix(ui): preserve control-ui auth across refresh (#40892)
Merged via squash.

Prepared head SHA: f9b2375892
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 10:58:04 -07:00
Peter Steinberger
57b90adbf2 build: sync plugin versions for 2026.3.9 2026-03-09 10:58:04 -07:00
Peter Steinberger
0cf4c46004 fix: stabilize launchd paths and appcast secret scan 2026-03-09 10:58:04 -07:00
Peter Steinberger
1695b9203c build: bump unreleased version to 2026.3.9 2026-03-09 10:58:04 -07:00
Peter Steinberger
7cc3cc0687 fix(onboard): avoid persisting talk fallback on fresh setup 2026-03-09 10:58:04 -07:00
Peter Steinberger
0413f9576e fix(launchd): harden macOS launchagent install permissions 2026-03-09 10:58:04 -07:00
Peter Steinberger
559b38d507 test: narrow gateway loop signal harness 2026-03-09 10:58:04 -07:00
Peter Steinberger
2ca87d06f7 chore: prepare 2026.3.8 npm release 2026-03-09 10:58:04 -07:00
Peter Steinberger
09f678599d fix(update): re-enable launchd service before updater bootstrap 2026-03-09 10:58:04 -07:00
Peter Steinberger
2b8828ea46 test: fix windows runtime and restart loop harnesses 2026-03-09 10:58:04 -07:00
Peter Steinberger
dbabaa0fb2 chore: update appcast for 2026.3.8-beta.1 2026-03-09 10:58:04 -07:00
Peter Steinberger
daf0ade96b chore: prepare 2026.3.8-beta.1 release 2026-03-09 10:58:04 -07:00
Peter Steinberger
6c11b4378a fix: normalize windows runtime shim executables 2026-03-09 10:58:04 -07:00
Peter Steinberger
ddc3a3fc71 test: fix Windows fake runtime bin fixtures 2026-03-09 10:58:04 -07:00
Peter Steinberger
56218dcc21 test: fix Node 24+ test runner and subagent registry mocks 2026-03-09 10:58:04 -07:00
Peter Steinberger
38068de8e9 docs: move 2026.3.8 entries back to unreleased 2026-03-09 10:58:04 -07:00
Peter Steinberger
83b453f48a chore: refresh secrets baseline 2026-03-09 10:58:04 -07:00
Peter Steinberger
98d52062e7 build: sync pnpm lockfile 2026-03-09 10:58:04 -07:00
Peter Steinberger
64910240b9 docs: reorder 2026.3.8 changelog by impact 2026-03-09 10:58:04 -07:00
Peter Steinberger
bd4d7f6137 refactor: flatten supervisor marker hints 2026-03-09 10:58:04 -07:00
Peter Steinberger
1b8f800487 refactor: split cron startup catch-up flow 2026-03-09 10:58:04 -07:00
Peter Steinberger
8cb688c44d refactor: extract telegram polling session 2026-03-09 10:58:04 -07:00
Peter Steinberger
9bde7ef39f build: update app deps except carbon 2026-03-09 10:58:04 -07:00
Peter Steinberger
3c4377651e fix: stagger missed cron jobs on restart (#18925) (thanks @rexlunae) 2026-03-09 10:58:04 -07:00
rexlunae
41a39085d3 fix(cron): stagger missed jobs on restart to prevent gateway overload
When the gateway restarts with many overdue cron jobs, they are now
executed with staggered delays to prevent overwhelming the gateway.

- Add missedJobStaggerMs config (default 5s between jobs)
- Add maxMissedJobsPerRestart limit (default 5 jobs immediately)
- Prioritize most overdue jobs by sorting by nextRunAtMs
- Reschedule deferred jobs to fire gradually via normal timer

Fixes #18892
2026-03-09 10:58:04 -07:00
Peter Steinberger
2220a58ff7 fix: abort telegram getupdates on shutdown (#23950) (thanks @Gkinthecodeland) 2026-03-09 10:58:04 -07:00
George Kalogirou
e383257552 fix(telegram): use manual signal forwarding to avoid cross-realm AbortSignal
AbortSignal.any() fails in Node.js when signals come from different module
contexts (grammY's internal signal vs local AbortController), producing:
"The signals[0] argument must be an instance of AbortSignal. Received an
instance of AbortSignal".

Replace with manual event forwarding that works across all realms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:04 -07:00
George Kalogirou
fa6436eaf3 fix(telegram): abort in-flight getUpdates fetch on shutdown
When the gateway receives SIGTERM, runner.stop() stops the grammY polling
loop but does not abort the in-flight getUpdates HTTP request. That request
hangs for up to 30 seconds (the Telegram API timeout). If a new gateway
instance starts polling during that window, Telegram returns a 409 Conflict
error, causing message loss and requiring exponential backoff recovery.

This is especially problematic with service managers (launchd, systemd)
that restart the process immediately after SIGTERM.

Wire an AbortController into the fetch layer so every Telegram API request
(especially the long-polling getUpdates) aborts immediately on shutdown:

- bot.ts: Accept optional fetchAbortSignal in TelegramBotOptions; wrap
  the grammY fetch with AbortSignal.any() to merge the shutdown signal.
- monitor.ts: Create a per-iteration AbortController, pass its signal to
  createTelegramBot, and abort it from the SIGTERM handler, force-restart
  path, and finally block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:04 -07:00
Peter Steinberger
1809b92f15 fix(skills): pin validated download roots 2026-03-09 10:58:04 -07:00
Peter Steinberger
4006e388e7 fix(node-host): bind bun and deno approval scripts 2026-03-09 10:58:04 -07:00
Peter Steinberger
b22d596e59 fix: detect launchd supervision via xpc service name (#20555) (thanks @dimat) 2026-03-09 10:58:04 -07:00
dimatu
38f192a29c fix(gateway): detect launchd supervision via XPC_SERVICE_NAME
On macOS, launchd sets XPC_SERVICE_NAME on managed processes but does
not set LAUNCH_JOB_LABEL or LAUNCH_JOB_NAME. Without checking
XPC_SERVICE_NAME, isLikelySupervisedProcess() returns false for
launchd-managed gateways, causing restartGatewayProcessWithFreshPid()
to fork a detached child instead of returning "supervised". The
detached child holds the gateway lock while launchd simultaneously
respawns the original process (KeepAlive=true), leading to an infinite
lock-timeout / restart loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:04 -07:00
merlin
95ed5781f6 fix: release gateway lock on restart failure + reply to Codex reviews
- Release gateway lock when in-process restart fails, so daemon
  restart/stop can still manage the process (Codex P2)
- P1 (env mismatch) already addressed: best-effort by design, documented
  in JSDoc
2026-03-09 10:58:04 -07:00
merlin
d275df82a2 fix: move config pre-flight before onNotLoaded in runServiceRestart (Codex P2)
The config check was positioned after onNotLoaded, which could send
SIGUSR1 to an unmanaged process before config was validated.
2026-03-09 10:58:04 -07:00
merlin
32f6501dbd fix: address bot review feedback on #35862
- Remove dead 'return false' in runServiceStart (Greptile)
- Include stack trace in run-loop crash guard error log (Greptile)
- Only catch startup errors on subsequent restarts, not initial start (Codex P1)
- Add JSDoc note about env var false positive edge case (Codex P1)
2026-03-09 10:58:03 -07:00
merlin
ce44b366db test: add runServiceStart config pre-flight tests (#35862)
Address Greptile review: add test coverage for runServiceStart path.
The error message copy-paste issue was already fixed in the DRY refactor
(uses params.serviceNoun instead of hardcoded 'restart').
2026-03-09 10:58:03 -07:00
merlin
2cb063b72e fix(gateway): catch startup failure in run loop to prevent process exit (#35862)
When an in-process restart (SIGUSR1) triggers a config-triggered restart
and the new config is invalid, params.start() throws and the while loop
exits, killing the process. On macOS this loses TCC permissions.

Wrap params.start() in try/catch: on failure, set server=null, log the
error, and wait for the next SIGUSR1 instead of crashing.
2026-03-09 10:58:03 -07:00
merlin
5d82c2cc89 fix(gateway): validate config before restart to prevent crash + macOS permission loss (#35862)
When 'openclaw gateway restart' is run with an invalid config, the new
process crashes on startup due to config validation failure. On macOS,
this causes Full Disk Access (TCC) permissions to be lost because the
respawned process has a different PID.

Add getConfigValidationError() helper and pre-flight config validation
in both runServiceRestart() and runServiceStart(). If config is invalid,
abort with a clear error message instead of crashing.

The config watcher's hot-reload path already had this guard
(handleInvalidSnapshot), but the CLI restart/start commands did not.

AI-assisted (OpenClaw agent, fully tested)
2026-03-09 10:58:03 -07:00
Peter Steinberger
78a1644a8a fix(msteams): enforce sender allowlists with route allowlists 2026-03-09 10:58:03 -07:00
Peter Steinberger
d6b26e22d5 test(cron): cover owner-only tool availability 2026-03-09 10:58:03 -07:00
Peter Steinberger
0d607942d5 fix(cron): restore owner-only tools for isolated runs 2026-03-09 10:58:03 -07:00
Peter Steinberger
cc919d3856 fix(browser): enforce redirect-hop SSRF checks 2026-03-09 10:58:03 -07:00
Peter Steinberger
8948ed8e33 fix: add changelog for restart timeout recovery (#40380) (thanks @dsantoreis) 2026-03-09 10:58:03 -07:00
DevMac
648213653d test(secrets): skip ACL-dependent runtime snapshot tests on windows 2026-03-09 10:58:03 -07:00
Daniel dos Santos Reis
cec7006abb fix(gateway): exit non-zero on restart shutdown timeout
When a config-change restart hits the force-exit timeout, exit with
code 1 instead of 0 so launchd/systemd treats it as a failure and
triggers a clean process restart. Stop-timeout stays at exit(0)
since graceful stops should not cause supervisor recovery.

Closes #36822
2026-03-09 10:58:03 -07:00
scoootscooob
2d8c8e7f26 fix(daemon): also enable LaunchAgent in repairLaunchAgentBootstrap
The repair/recovery path had the same missing `enable` guard as
`restartLaunchAgent`.  If launchd persists a "disabled" state after a
previous `bootout`, the `bootstrap` call in `repairLaunchAgentBootstrap`
fails silently, leaving the gateway unloaded in the recovery flow.

Add the same `enable` guard before `bootstrap` that was already applied
to `installLaunchAgent` and (in this PR) `restartLaunchAgent`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:03 -07:00
scoootscooob
e2c8c27c8c fix(daemon): enable LaunchAgent before bootstrap on restart
restartLaunchAgent was missing the launchctl enable call that
installLaunchAgent already performs. launchd can persist a "disabled"
state after bootout, causing bootstrap to silently fail and leaving the
gateway unloaded until a manual reinstall.

Fixes #39211

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:58:03 -07:00
Peter Steinberger
dad107630e test: fix windows secrets runtime ci 2026-03-09 10:58:03 -07:00
GazeKingNuWu
0d597ab800 fix: clear plugin discovery cache after plugin installation (openclaw#39752)
Verified:
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: GazeKingNuWu <264914544+GazeKingNuWu@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 10:58:03 -07:00
Ayaan Zaidi
03bc43a503 Fix cron text announce delivery for Telegram targets (#40575)
Merged via squash.

Prepared head SHA: 54b1513c78
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 10:58:03 -07:00
Bronko
8e506634b1 fix(matrix): restore robust DM routing without the memberCount heuristic (#19736)
* fix(matrix): remove memberCount heuristic from DM detection

The memberCount === 2 check in isDirectMessage() misclassifies 2-person
group rooms (admin channels, monitoring rooms) as DMs, routing them to
the main session instead of their room-specific session.

Matrix already distinguishes DMs from groups at the protocol level via
m.direct account data and is_direct member state flags. Both are already
checked by client.dms.isDm() and hasDirectFlag(). The memberCount
heuristic only adds false positives for 2-person groups.

Move resolveMemberCount() below the protocol-level checks so it is only
reached for rooms not matched by m.direct or is_direct. This narrows its
role to diagnostic logging for confirmed group rooms.

Refs: #19739

* fix(matrix): add conservative fallback for broken DM flags

Some homeservers (notably Continuwuity) have broken m.direct account
data or never set is_direct on invite events. With the memberCount
heuristic removed, these DMs are no longer detected.

Add a conservative fallback that requires two signals before classifying
as DM: memberCount === 2 AND no explicit m.room.name. Group rooms almost
always have explicit names; DMs almost never do.

Error handling distinguishes M_NOT_FOUND (missing state event, expected
for unnamed rooms) from network/auth errors. Non-404 errors fall through
to group classification rather than guessing.

This is independently revertable — removing this commit restores pure
protocol-based detection without any heuristic fallback.

* fix(matrix): add parentPeer for DM room binding support

Add parentPeer to DM routes so conversations are bindable by room ID
while preserving DM trust semantics (secure 1:1, no group restrictions).

Suggested by @KirillShchetinin.

* fix(matrix): override DM detection for explicitly configured rooms

Builds on @robertcorreiro's config-driven approach from #9106.

Move resolveMatrixRoomConfig() before the DM check. If a room matches
a non-wildcard config entry (matchSource === "direct") and was
classified as DM, override the classification to group. This gives users
a deterministic escape hatch for misclassified rooms.

Wildcards are excluded from the override to avoid breaking DM routing
when a "*" catch-all exists. roomConfig is gated behind isRoom so DMs
never inherit group settings (skills, systemPrompt, autoReply).

This commit is independently droppable if the scope is too broad.

* test(matrix): add DM detection and config override tests

- 15 unit tests for direct.ts: all detection paths, priority order,
  M_NOT_FOUND vs network error handling, edge cases (whitespace names,
  API failures)
- 8 unit tests for rooms.ts: matchSource classification, wildcard
  safety for DM override, direct match priority over wildcard

* Changelog: note matrix DM routing follow-up

* fix(matrix): preserve DM fallback and room bindings

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 10:58:03 -07:00
Ayaan Zaidi
27d2ac8460 fix: dedupe inbound Telegram DM replies per agent (#40519)
Merged via squash.

Prepared head SHA: 6e235e7d1f
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 10:58:03 -07:00
Peter Steinberger
fbc8c19e52 build(protocol): sync generated swift models 2026-03-09 10:58:03 -07:00
Peter Steinberger
a7e5c7c18b fix(media): accept reader read result type 2026-03-09 10:58:03 -07:00
Peter Steinberger
def4b221d9 fix(agents): re-expose configured tools under restrictive profiles 2026-03-09 10:58:03 -07:00
Tak Hoffman
40542aba96 chore(acpx): move runtime test fixtures to test-utils (openclaw#40548)
Verified:
- pnpm install --frozen-lockfile
- pnpm build
- pnpm check
- pnpm test:macmini
2026-03-09 10:58:03 -07:00
Ayaan Zaidi
f34b0e6c42 test: fix android talk config contract fixture 2026-03-09 10:58:03 -07:00
Kyle
84064d08d3 fix(plugin-sdk): remove remaining bundled plugin src imports (openclaw#39638)
Verified:
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: Kyle <3477429+kyledh@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-03-09 10:58:03 -07:00
Kesku
022923be15 alphabetize web search providers (#40259)
Merged via squash.

Prepared head SHA: be6350e5ae
Co-authored-by: kesku <62210496+kesku@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 10:58:03 -07:00
Mariano
a95cce9f2f ACP: add optional ingress provenance receipts (#40473)
Merged via squash.

Prepared head SHA: b63e46dd94
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
2026-03-09 10:58:03 -07:00
Tyson Cung
81bd382e6c fix(telegram): add download timeout to prevent polling loop hang (#40098)
Merged via squash.

Prepared head SHA: abdfa1a35f
Co-authored-by: tysoncung <45380903+tysoncung@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
2026-03-09 10:58:03 -07:00
yuweuii
5b28093b01 fix(models): use 1M context for openai-codex gpt-5.4 (#37876)
Merged via squash.

Prepared head SHA: c41020779e
Co-authored-by: yuweuii <82372187+yuweuii@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-09 10:58:03 -07:00
Radek Sienkiewicz
f1449a5590 docs(changelog): correct Control UI contributor credit (#40420)
Merged via squash.

Prepared head SHA: e4295fe18b
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 10:58:03 -07:00
Vincent Koc
3fe81fdb96 fix(tests): correct security check failure 2026-03-09 10:58:03 -07:00
Vincent Koc
ccd6043240 Docker: improve build cache reuse (#40351)
* Docker: improve build cache reuse

* Tests: cover Docker build cache layout

* Docker: fix sandbox cache mount continuations

* Docker: document qr-import manifest scope

* Docker: narrow e2e install inputs

* CI: cache Docker builds in workflows

* CI: route sandbox smoke through setup script

* CI: keep sandbox smoke on script path
2026-03-09 10:58:03 -07:00
Radek Sienkiewicz
f3d6a3018a gateway: fix global Control UI 404s for symlinked wrappers and bundled package roots (#40385)
Merged via squash.

Prepared head SHA: 567b3ed684
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Co-authored-by: velvet-shark <126378+velvet-shark@users.noreply.github.com>
Reviewed-by: @velvet-shark
2026-03-09 10:58:02 -07:00
Peter Steinberger
df31d0e8b6 chore(docs): drop refactor cleanup tracker 2026-03-09 10:58:02 -07:00
Peter Steinberger
65623e1559 refactor(models): split provider discovery helpers 2026-03-09 10:58:02 -07:00
Peter Steinberger
b0973880b4 refactor(models): split models.json planning from writes 2026-03-09 10:58:02 -07:00
Peter Steinberger
98aa2a8cf0 refactor(agents): extract provider model normalization 2026-03-09 10:58:02 -07:00
Peter Steinberger
f7eccaee4a refactor(models): extract list row builders 2026-03-09 10:58:02 -07:00
Peter Steinberger
4b694d565d refactor: harden browser runtime profile handling 2026-03-09 10:58:02 -07:00
bbblending
7875fb6c27 fix(config): refresh runtime snapshot from disk after write. Fixes #37175 (#37313)
Merged via squash.

Prepared head SHA: 69e1861abf
Co-authored-by: bbblending <122739024+bbblending@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
2026-03-09 10:58:02 -07:00
Peter Steinberger
43451ebab7 refactor: harden browser relay CDP flows 2026-03-09 10:58:02 -07:00
Vincent Koc
017b389549 telegram: align sent hooks with command session 2026-03-09 09:36:00 -07:00
Vincent Koc
0504fb35fe Merge branch 'main' into vincentkoc-code/telegram-message-sent-parity 2026-03-08 16:40:02 -07:00
Vincent Koc
817fa5462b telegram: bridge direct delivery message hooks 2026-03-08 12:14:19 -07:00
44 changed files with 617 additions and 2176 deletions

View File

@@ -8,8 +8,6 @@ Docs: https://docs.openclaw.ai
### Breaking
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.
### Fixes
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.
@@ -20,9 +18,6 @@ Docs: https://docs.openclaw.ai
- ACP/sessions.patch: allow `spawnedBy` and `spawnDepth` lineage fields on ACP session keys so `sessions_spawn` with `runtime: "acp"` no longer fails during child-session setup. Fixes #40971. (#40995) thanks @xaeon2026.
- ACP/stop reason mapping: resolve gateway chat `state: "error"` completions as ACP `end_turn` instead of `refusal` so transient backend failures are not surfaced as deliberate refusals. (#41187) thanks @pejmanjohn.
- ACP/setSessionMode: propagate gateway `sessions.patch` failures back to ACP clients so rejected mode changes no longer return silent success. (#41185) thanks @pejmanjohn.
- Agents/embedded logs: add structured, sanitized lifecycle and failover observation events so overload and provider failures are easier to tail and filter. (#41336) thanks @altaywtf.
- iOS/gateway foreground recovery: reconnect immediately on foreground return after stale background sockets are torn down, so the app no longer stays disconnected until a later wake path happens. (#41384) Thanks @mbelinky.
- Cron/subagent followup: do not misclassify empty or `NO_REPLY` cron responses as interim acknowledgements that need a rerun, so deliberately silent cron jobs are no longer retried. (#41383) thanks @jackal092927.
## 2026.3.8

View File

@@ -362,14 +362,7 @@ final class NodeAppModel {
await MainActor.run {
self.operatorConnected = false
self.gatewayConnected = false
// Foreground recovery must actively restart the saved gateway config.
// Disconnecting stale sockets alone can leave us idle if the old
// reconnect tasks were suppressed or otherwise got stuck in background.
self.gatewayStatusText = "Reconnecting…"
self.talkMode.updateGatewayConnected(false)
if let cfg = self.activeGatewayConnectConfig {
self.applyGatewayConnectConfig(cfg)
}
}
}
}

View File

@@ -29,7 +29,6 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting)
- Wakeups are first-class: a job can request “wake now” vs “next heartbeat”.
- Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = "<url>"`.
- Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode.
- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them.
## Quick start (actionable)

View File

@@ -30,12 +30,6 @@ Note: retention/pruning is controlled in config:
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
Upgrade note: if you have older cron jobs from before the current delivery/store format, run
`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`,
top-level delivery fields, payload `provider` delivery aliases) and migrates simple
`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is
configured.
## Common edits
Update delivery settings without changing the message:

View File

@@ -28,7 +28,6 @@ Notes:
- Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts.
- `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal.
- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.<timestamp>` to reclaim space safely.
- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime.
- Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing.
- If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`).

View File

@@ -65,7 +65,6 @@ cat ~/.openclaw/openclaw.json
- Config normalization for legacy values.
- OpenCode Zen provider override warnings (`models.providers.opencode`).
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
- State integrity and permissions checks (sessions, transcripts, state dir).
- Config file permission checks (chmod 600) when running locally.
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
@@ -159,25 +158,6 @@ the legacy sessions + agent dir on startup so history/auth/models land in the
per-agent path without a manual doctor run. WhatsApp auth is intentionally only
migrated via `openclaw doctor`.
### 3b) Legacy cron store migrations
Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default,
or `cron.store` when overridden) for old job shapes that the scheduler still
accepts for compatibility.
Current cron cleanups include:
- `jobId``id`
- `schedule.cron``schedule.expr`
- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload`
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
- payload `provider` delivery aliases → explicit `delivery.channel`
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
Doctor only auto-migrates `notify: true` jobs when it can do so without
changing behavior. If a job combines legacy notify fallback with an existing
non-webhook delivery mode, doctor warns and leaves that job for manual review.
### 4) State integrity checks (session persistence, routing, and safety)
The state directory is the operational brainstem. If it vanishes, you lose

View File

@@ -148,37 +148,6 @@ replace logs; they exist to feed metrics, traces, and other exporters.
Diagnostics events are emitted in-process, but exporters only attach when
diagnostics + the exporter plugin are enabled.
### Telemetry surface ownership
OpenClaw has separate surfaces for automation, runtime control, and telemetry:
| If you want to... | Use... | Why |
| -------------------------------------------------------------------------------------- | --------------------------------------- | ------------------------------------------------------------------ |
| Export metrics, traces, or machine-readable health signals | Diagnostic events | Observability should be append-only telemetry, not a behavior hook |
| Rewrite prompts, block tools, cancel outbound messages, or add policy/middleware | Typed plugin hooks via `api.on(...)` | Runtime hooks can mutate or block behavior |
| Trigger coarse operator automation such as file writes, notifications, or side effects | HOOK.md hooks / `api.registerHook(...)` | Internal hooks are for operator automation, not telemetry schemas |
Future OTEL work should extend `src/infra/diagnostic-events.ts`, then map those
events in the `diagnostics-otel` plugin. Do not add telemetry-only proposals by
growing the hook APIs.
### What diagnostic events are for
Diagnostic events are the observability contract between the gateway runtime and
telemetry consumers such as the `diagnostics-otel` plugin.
Diagnostic events should be:
- append-only signals for exporters, dashboards, alerts, and troubleshooting
- safe to ignore without affecting runtime behavior
- stable enough that exporters can map them into metrics, traces, or logs
Diagnostic events should not be used for:
- blocking, vetoing, or rewriting runtime behavior
- policy enforcement or middleware ordering
- side-effect automation that must run for the system to behave correctly
### OpenTelemetry vs OTLP
- **OpenTelemetry (OTel)**: the data model + SDKs for traces, metrics, and logs.
@@ -215,28 +184,6 @@ Queue + session:
- `run.attempt`: run retry/attempt metadata.
- `diagnostic.heartbeat`: aggregate counters (webhooks/queue/session).
Tool safety:
- `tool.loop`: repeated-tool-loop warning/block telemetry emitted by the runtime.
### What is still missing
The current event catalog is useful, but still coarse in a few places. New
observability work should generally extend `src/infra/diagnostic-events.ts`
instead of asking hooks to carry telemetry-only meaning.
Priority gaps for future telemetry work:
- Run lifecycle: explicit run start, run end, and run error boundaries.
- Model lifecycle: request/response/error boundaries in addition to aggregate
`model.usage`.
- Tool lifecycle: tool call start/end/error boundaries, plus first-class exporter
mapping for existing `tool.loop` events.
- Outbound delivery lifecycle: delivery attempted/sent/failed boundaries across
channels, separate from message processing.
- Attribute hygiene: clearer redaction and cardinality guidance for exporter-safe
fields.
### Enable diagnostics (no exporter)
Use this if you want diagnostics events available to plugins or custom sinks:

View File

@@ -1,182 +0,0 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import * as loggingConfigModule from "../logging/config.js";
import {
buildApiErrorObservationFields,
buildTextObservationFields,
sanitizeForConsole,
} from "./pi-embedded-error-observation.js";
afterEach(() => {
vi.restoreAllMocks();
});
describe("buildApiErrorObservationFields", () => {
it("redacts request ids and exposes stable hashes instead of raw payloads", () => {
const observed = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_overload"}',
);
expect(observed).toMatchObject({
rawErrorPreview: expect.stringContaining('"request_id":"sha256:'),
rawErrorHash: expect.stringMatching(/^sha256:/),
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
providerErrorType: "overloaded_error",
providerErrorMessagePreview: "Overloaded",
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.rawErrorPreview).not.toContain("req_overload");
});
it("forces token redaction for observation previews", () => {
const observed = buildApiErrorObservationFields(
"Authorization: Bearer sk-abcdefghijklmnopqrstuvwxyz123456",
);
expect(observed.rawErrorPreview).not.toContain("sk-abcdefghijklmnopqrstuvwxyz123456");
expect(observed.rawErrorPreview).toContain("sk-abc");
expect(observed.rawErrorHash).toMatch(/^sha256:/);
});
it("redacts observation-only header and cookie formats", () => {
const observed = buildApiErrorObservationFields(
"x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456 Cookie: session=abcdefghijklmnopqrstuvwxyz123456",
);
expect(observed.rawErrorPreview).not.toContain("abcdefghijklmnopqrstuvwxyz123456");
expect(observed.rawErrorPreview).toContain("x-api-key: ***");
expect(observed.rawErrorPreview).toContain("Cookie: session=");
});
it("does not let cookie redaction consume unrelated fields on the same line", () => {
const observed = buildApiErrorObservationFields(
"Cookie: session=abcdefghijklmnopqrstuvwxyz123456 status=503 request_id=req_cookie",
);
expect(observed.rawErrorPreview).toContain("Cookie: session=");
expect(observed.rawErrorPreview).toContain("status=503");
expect(observed.rawErrorPreview).toContain("request_id=sha256:");
});
it("builds sanitized generic text observation fields", () => {
const observed = buildTextObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_prev"}',
);
expect(observed).toMatchObject({
textPreview: expect.stringContaining('"request_id":"sha256:'),
textHash: expect.stringMatching(/^sha256:/),
textFingerprint: expect.stringMatching(/^sha256:/),
providerErrorType: "overloaded_error",
providerErrorMessagePreview: "Overloaded",
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.textPreview).not.toContain("req_prev");
});
it("redacts request ids in formatted plain-text errors", () => {
const observed = buildApiErrorObservationFields(
"LLM error overloaded_error: Overloaded (request_id: req_plaintext_123)",
);
expect(observed).toMatchObject({
rawErrorPreview: expect.stringContaining("request_id: sha256:"),
rawErrorFingerprint: expect.stringMatching(/^sha256:/),
requestIdHash: expect.stringMatching(/^sha256:/),
});
expect(observed.rawErrorPreview).not.toContain("req_plaintext_123");
});
it("keeps fingerprints stable across request ids for equivalent errors", () => {
const first = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_001"}',
);
const second = buildApiErrorObservationFields(
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_002"}',
);
expect(first.rawErrorFingerprint).toBe(second.rawErrorFingerprint);
expect(first.rawErrorHash).not.toBe(second.rawErrorHash);
});
it("truncates oversized raw and provider previews", () => {
const longMessage = "X".repeat(260);
const observed = buildApiErrorObservationFields(
`{"type":"error","error":{"type":"server_error","message":"${longMessage}"},"request_id":"req_long"}`,
);
expect(observed.rawErrorPreview).toBeDefined();
expect(observed.providerErrorMessagePreview).toBeDefined();
expect(observed.rawErrorPreview?.length).toBeLessThanOrEqual(401);
expect(observed.providerErrorMessagePreview?.length).toBeLessThanOrEqual(201);
expect(observed.providerErrorMessagePreview?.endsWith("…")).toBe(true);
});
it("caps oversized raw inputs before hashing and fingerprinting", () => {
const oversized = "X".repeat(70_000);
const bounded = "X".repeat(64_000);
expect(buildApiErrorObservationFields(oversized)).toMatchObject({
rawErrorHash: buildApiErrorObservationFields(bounded).rawErrorHash,
rawErrorFingerprint: buildApiErrorObservationFields(bounded).rawErrorFingerprint,
});
});
it("returns empty observation fields for empty input", () => {
expect(buildApiErrorObservationFields(undefined)).toEqual({});
expect(buildApiErrorObservationFields("")).toEqual({});
expect(buildApiErrorObservationFields(" ")).toEqual({});
});
it("re-reads configured redact patterns on each call", () => {
const readLoggingConfig = vi.spyOn(loggingConfigModule, "readLoggingConfig");
readLoggingConfig.mockReturnValueOnce(undefined);
readLoggingConfig.mockReturnValueOnce({
redactPatterns: [String.raw`\bcustom-secret-[A-Za-z0-9]+\b`],
});
const first = buildApiErrorObservationFields("custom-secret-abc123");
const second = buildApiErrorObservationFields("custom-secret-abc123");
expect(first.rawErrorPreview).toContain("custom-secret-abc123");
expect(second.rawErrorPreview).not.toContain("custom-secret-abc123");
expect(second.rawErrorPreview).toContain("custom");
});
it("fails closed when observation sanitization throws", () => {
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockImplementation(() => {
throw new Error("boom");
});
expect(buildApiErrorObservationFields("request_id=req_123")).toEqual({});
expect(buildTextObservationFields("request_id=req_123")).toEqual({
textPreview: undefined,
textHash: undefined,
textFingerprint: undefined,
httpCode: undefined,
providerErrorType: undefined,
providerErrorMessagePreview: undefined,
requestIdHash: undefined,
});
});
it("ignores non-string configured redact patterns", () => {
vi.spyOn(loggingConfigModule, "readLoggingConfig").mockReturnValue({
redactPatterns: [
123 as never,
{ bad: true } as never,
String.raw`\bcustom-secret-[A-Za-z0-9]+\b`,
],
});
const observed = buildApiErrorObservationFields("custom-secret-abc123");
expect(observed.rawErrorPreview).not.toContain("custom-secret-abc123");
expect(observed.rawErrorPreview).toContain("custom");
});
});
describe("sanitizeForConsole", () => {
it("strips control characters from console-facing values", () => {
expect(sanitizeForConsole("run-1\nprovider\tmodel\rtest")).toBe("run-1 provider model test");
});
});

View File

@@ -1,199 +0,0 @@
import { readLoggingConfig } from "../logging/config.js";
import { redactIdentifier } from "../logging/redact-identifier.js";
import { getDefaultRedactPatterns, redactSensitiveText } from "../logging/redact.js";
import { getApiErrorPayloadFingerprint, parseApiErrorInfo } from "./pi-embedded-helpers.js";
import { stableStringify } from "./stable-stringify.js";
const MAX_OBSERVATION_INPUT_CHARS = 64_000;
const MAX_FINGERPRINT_MESSAGE_CHARS = 8_000;
const RAW_ERROR_PREVIEW_MAX_CHARS = 400;
const PROVIDER_ERROR_PREVIEW_MAX_CHARS = 200;
const REQUEST_ID_RE = /\brequest[_ ]?id\b\s*[:=]\s*["'()]*([A-Za-z0-9._:-]+)/i;
const OBSERVATION_EXTRA_REDACT_PATTERNS = [
String.raw`\b(?:x-)?api[-_]?key\b\s*[:=]\s*(["']?)([^\s"'\\;]+)\1`,
String.raw`"(?:api[-_]?key|api_key)"\s*:\s*"([^"]+)"`,
String.raw`(?:\bCookie\b\s*[:=]\s*[^;=\s]+=|;\s*[^;=\s]+=)([^;\s\r\n]+)`,
];
function resolveConfiguredRedactPatterns(): string[] {
const configured = readLoggingConfig()?.redactPatterns;
if (!Array.isArray(configured)) {
return [];
}
return configured.filter((pattern): pattern is string => typeof pattern === "string");
}
function truncateForObservation(text: string | undefined, maxChars: number): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
return trimmed.length > maxChars ? `${trimmed.slice(0, maxChars)}` : trimmed;
}
function boundObservationInput(text: string | undefined): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
return trimmed.length > MAX_OBSERVATION_INPUT_CHARS
? trimmed.slice(0, MAX_OBSERVATION_INPUT_CHARS)
: trimmed;
}
export function sanitizeForConsole(text: string | undefined, maxChars = 200): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
return undefined;
}
const withoutControlChars = Array.from(trimmed)
.filter((char) => {
const code = char.charCodeAt(0);
return !(
code <= 0x08 ||
code === 0x0b ||
code === 0x0c ||
(code >= 0x0e && code <= 0x1f) ||
code === 0x7f
);
})
.join("");
const sanitized = withoutControlChars
.replace(/[\r\n\t]+/g, " ")
.replace(/\s+/g, " ")
.trim();
return sanitized.length > maxChars ? `${sanitized.slice(0, maxChars)}` : sanitized;
}
function replaceRequestIdPreview(
text: string | undefined,
requestId: string | undefined,
): string | undefined {
if (!text || !requestId) {
return text;
}
return text.split(requestId).join(redactIdentifier(requestId, { len: 12 }));
}
function redactObservationText(text: string | undefined): string | undefined {
if (!text) {
return text;
}
// Observation logs must stay redacted even when operators disable general-purpose
// log redaction, otherwise raw provider payloads leak back into always-on logs.
const configuredPatterns = resolveConfiguredRedactPatterns();
return redactSensitiveText(text, {
mode: "tools",
patterns: [
...getDefaultRedactPatterns(),
...configuredPatterns,
...OBSERVATION_EXTRA_REDACT_PATTERNS,
],
});
}
function extractRequestId(text: string | undefined): string | undefined {
if (!text) {
return undefined;
}
const match = text.match(REQUEST_ID_RE);
return match?.[1]?.trim() || undefined;
}
function buildObservationFingerprint(params: {
raw: string;
requestId?: string;
httpCode?: string;
type?: string;
message?: string;
}): string | null {
const boundedMessage =
params.message && params.message.length > MAX_FINGERPRINT_MESSAGE_CHARS
? params.message.slice(0, MAX_FINGERPRINT_MESSAGE_CHARS)
: params.message;
const structured =
params.httpCode || params.type || boundedMessage
? stableStringify({
httpCode: params.httpCode,
type: params.type,
message: boundedMessage,
})
: null;
if (structured) {
return structured;
}
if (params.requestId) {
return params.raw.split(params.requestId).join("<request_id>");
}
return getApiErrorPayloadFingerprint(params.raw);
}
export function buildApiErrorObservationFields(rawError?: string): {
rawErrorPreview?: string;
rawErrorHash?: string;
rawErrorFingerprint?: string;
httpCode?: string;
providerErrorType?: string;
providerErrorMessagePreview?: string;
requestIdHash?: string;
} {
const trimmed = boundObservationInput(rawError);
if (!trimmed) {
return {};
}
try {
const parsed = parseApiErrorInfo(trimmed);
const requestId = parsed?.requestId ?? extractRequestId(trimmed);
const requestIdHash = requestId ? redactIdentifier(requestId, { len: 12 }) : undefined;
const rawFingerprint = buildObservationFingerprint({
raw: trimmed,
requestId,
httpCode: parsed?.httpCode,
type: parsed?.type,
message: parsed?.message,
});
const redactedRawPreview = replaceRequestIdPreview(redactObservationText(trimmed), requestId);
const redactedProviderMessage = replaceRequestIdPreview(
redactObservationText(parsed?.message),
requestId,
);
return {
rawErrorPreview: truncateForObservation(redactedRawPreview, RAW_ERROR_PREVIEW_MAX_CHARS),
rawErrorHash: redactIdentifier(trimmed, { len: 12 }),
rawErrorFingerprint: rawFingerprint
? redactIdentifier(rawFingerprint, { len: 12 })
: undefined,
httpCode: parsed?.httpCode,
providerErrorType: parsed?.type,
providerErrorMessagePreview: truncateForObservation(
redactedProviderMessage,
PROVIDER_ERROR_PREVIEW_MAX_CHARS,
),
requestIdHash,
};
} catch {
return {};
}
}
export function buildTextObservationFields(text?: string): {
textPreview?: string;
textHash?: string;
textFingerprint?: string;
httpCode?: string;
providerErrorType?: string;
providerErrorMessagePreview?: string;
requestIdHash?: string;
} {
const observed = buildApiErrorObservationFields(text);
return {
textPreview: observed.rawErrorPreview,
textHash: observed.rawErrorHash,
textFingerprint: observed.rawErrorFingerprint,
httpCode: observed.httpCode,
providerErrorType: observed.providerErrorType,
providerErrorMessagePreview: observed.providerErrorMessagePreview,
requestIdHash: observed.requestIdHash,
};
}

View File

@@ -61,7 +61,6 @@ import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { resolveModel } from "./model.js";
import { runEmbeddedAttempt } from "./run/attempt.js";
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
import {
@@ -1227,26 +1226,11 @@ export async function runEmbeddedPiAgent(
reason: promptProfileFailureReason,
});
const promptFailoverFailure = isFailoverErrorMessage(errorText);
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
const failedPromptProfileId = lastProfileId;
const logPromptFailoverDecision = createFailoverDecisionLogger({
stage: "prompt",
runId: params.runId,
rawError: errorText,
failoverReason: promptFailoverReason,
profileFailureReason: promptProfileFailureReason,
provider,
model: modelId,
profileId: failedPromptProfileId,
fallbackConfigured,
aborted,
});
if (
promptFailoverFailure &&
promptFailoverReason !== "timeout" &&
(await advanceAuthProfile())
) {
logPromptFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
continue;
}
@@ -1265,20 +1249,15 @@ export async function runEmbeddedPiAgent(
// are configured so outer model fallback can continue on overload,
// rate-limit, auth, or billing failures.
if (fallbackConfigured && promptFailoverFailure) {
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
logPromptFailoverDecision("fallback_model", { status });
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
throw new FailoverError(errorText, {
reason: promptFailoverReason ?? "unknown",
provider,
model: modelId,
profileId: lastProfileId,
status,
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
});
}
if (promptFailoverFailure || promptFailoverReason) {
logPromptFailoverDecision("surface_error");
}
throw promptError;
}
@@ -1303,21 +1282,6 @@ export async function runEmbeddedPiAgent(
resolveAuthProfileFailureReason(assistantFailoverReason);
const cloudCodeAssistFormatError = attempt.cloudCodeAssistFormatError;
const imageDimensionError = parseImageDimensionError(lastAssistant?.errorMessage ?? "");
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
const failedAssistantProfileId = lastProfileId;
const logAssistantFailoverDecision = createFailoverDecisionLogger({
stage: "assistant",
runId: params.runId,
rawError: lastAssistant?.errorMessage?.trim(),
failoverReason: assistantFailoverReason,
profileFailureReason: assistantProfileFailureReason,
provider: activeErrorContext.provider,
model: activeErrorContext.model,
profileId: failedAssistantProfileId,
fallbackConfigured,
timedOut,
aborted,
});
if (
authFailure &&
@@ -1375,7 +1339,6 @@ export async function runEmbeddedPiAgent(
const rotated = await advanceAuthProfile();
if (rotated) {
logAssistantFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
continue;
}
@@ -1408,7 +1371,6 @@ export async function runEmbeddedPiAgent(
const status =
resolveFailoverStatus(assistantFailoverReason ?? "unknown") ??
(isTimeoutErrorMessage(message) ? 408 : undefined);
logAssistantFailoverDecision("fallback_model", { status });
throw new FailoverError(message, {
reason: assistantFailoverReason ?? "unknown",
provider: activeErrorContext.provider,
@@ -1417,7 +1379,6 @@ export async function runEmbeddedPiAgent(
status,
});
}
logAssistantFailoverDecision("surface_error");
}
const usage = toNormalizedUsage(usageAccumulator);

View File

@@ -1,48 +0,0 @@
import { describe, expect, it } from "vitest";
import { normalizeFailoverDecisionObservationBase } from "./failover-observation.js";
describe("normalizeFailoverDecisionObservationBase", () => {
it("fills timeout observation reasons for deadline timeouts without provider error text", () => {
expect(
normalizeFailoverDecisionObservationBase({
stage: "assistant",
runId: "run:timeout",
rawError: "",
failoverReason: null,
profileFailureReason: null,
provider: "openai",
model: "mock-1",
profileId: "openai:p1",
fallbackConfigured: false,
timedOut: true,
aborted: false,
}),
).toMatchObject({
failoverReason: "timeout",
profileFailureReason: "timeout",
timedOut: true,
});
});
it("preserves explicit failover reasons", () => {
expect(
normalizeFailoverDecisionObservationBase({
stage: "assistant",
runId: "run:overloaded",
rawError: '{"error":{"type":"overloaded_error"}}',
failoverReason: "overloaded",
profileFailureReason: "overloaded",
provider: "openai",
model: "mock-1",
profileId: "openai:p1",
fallbackConfigured: true,
timedOut: true,
aborted: false,
}),
).toMatchObject({
failoverReason: "overloaded",
profileFailureReason: "overloaded",
timedOut: true,
});
});
});

View File

@@ -1,76 +0,0 @@
import { redactIdentifier } from "../../../logging/redact-identifier.js";
import type { AuthProfileFailureReason } from "../../auth-profiles.js";
import {
buildApiErrorObservationFields,
sanitizeForConsole,
} from "../../pi-embedded-error-observation.js";
import type { FailoverReason } from "../../pi-embedded-helpers.js";
import { log } from "../logger.js";
export type FailoverDecisionLoggerInput = {
stage: "prompt" | "assistant";
decision: "rotate_profile" | "fallback_model" | "surface_error";
runId?: string;
rawError?: string;
failoverReason: FailoverReason | null;
profileFailureReason?: AuthProfileFailureReason | null;
provider: string;
model: string;
profileId?: string;
fallbackConfigured: boolean;
timedOut?: boolean;
aborted?: boolean;
status?: number;
};
export type FailoverDecisionLoggerBase = Omit<FailoverDecisionLoggerInput, "decision" | "status">;
export function normalizeFailoverDecisionObservationBase(
base: FailoverDecisionLoggerBase,
): FailoverDecisionLoggerBase {
return {
...base,
failoverReason: base.failoverReason ?? (base.timedOut ? "timeout" : null),
profileFailureReason: base.profileFailureReason ?? (base.timedOut ? "timeout" : null),
};
}
export function createFailoverDecisionLogger(
base: FailoverDecisionLoggerBase,
): (
decision: FailoverDecisionLoggerInput["decision"],
extra?: Pick<FailoverDecisionLoggerInput, "status">,
) => void {
const normalizedBase = normalizeFailoverDecisionObservationBase(base);
const safeProfileId = normalizedBase.profileId
? redactIdentifier(normalizedBase.profileId, { len: 12 })
: undefined;
const safeRunId = sanitizeForConsole(normalizedBase.runId) ?? "-";
const safeProvider = sanitizeForConsole(normalizedBase.provider) ?? "-";
const safeModel = sanitizeForConsole(normalizedBase.model) ?? "-";
const profileText = safeProfileId ?? "-";
const reasonText = normalizedBase.failoverReason ?? "none";
return (decision, extra) => {
const observedError = buildApiErrorObservationFields(normalizedBase.rawError);
log.warn("embedded run failover decision", {
event: "embedded_run_failover_decision",
tags: ["error_handling", "failover", normalizedBase.stage, decision],
runId: normalizedBase.runId,
stage: normalizedBase.stage,
decision,
failoverReason: normalizedBase.failoverReason,
profileFailureReason: normalizedBase.profileFailureReason,
provider: normalizedBase.provider,
model: normalizedBase.model,
profileId: safeProfileId,
fallbackConfigured: normalizedBase.fallbackConfigured,
timedOut: normalizedBase.timedOut,
aborted: normalizedBase.aborted,
status: extra?.status,
...observedError,
consoleMessage:
`embedded run failover decision: runId=${safeRunId} stage=${normalizedBase.stage} decision=${decision} ` +
`reason=${reasonText} provider=${safeProvider}/${safeModel} profile=${profileText}`,
});
};
}

View File

@@ -54,13 +54,8 @@ describe("handleAgentEnd", () => {
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "connection refused",
rawErrorPreview: "connection refused",
});
expect(warn.mock.calls[0]?.[0]).toContain("runId=run-1");
expect(warn.mock.calls[0]?.[0]).toContain("error=connection refused");
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
@@ -70,59 +65,6 @@ describe("handleAgentEnd", () => {
});
});
it("attaches raw provider error metadata without changing the console message", () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
provider: "anthropic",
model: "claude-test",
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
content: [{ type: "text", text: "" }],
});
handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "The AI service is temporarily overloaded. Please try again in a moment.",
failoverReason: "overloaded",
providerErrorType: "overloaded_error",
});
});
it("redacts logged error text before emitting lifecycle events", () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
role: "assistant",
stopReason: "error",
errorMessage: "x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456",
content: [{ type: "text", text: "" }],
},
{ onAgentEvent },
);
handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
error: "x-api-key: ***",
rawErrorPreview: "x-api-key: ***",
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "error",
error: "x-api-key: ***",
},
});
});
it("keeps non-error run-end logging on debug only", () => {
const ctx = createContext(undefined);

View File

@@ -1,11 +1,6 @@
import { emitAgentEvent } from "../infra/agent-events.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
import {
buildApiErrorObservationFields,
buildTextObservationFields,
sanitizeForConsole,
} from "./pi-embedded-error-observation.js";
import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js";
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import { isAssistantMessage } from "./pi-embedded-utils.js";
@@ -41,31 +36,16 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
provider: lastAssistant.provider,
model: lastAssistant.model,
});
const rawError = lastAssistant.errorMessage?.trim();
const failoverReason = classifyFailoverReason(rawError ?? "");
const errorText = (friendlyError || lastAssistant.errorMessage || "LLM request failed.").trim();
const observedError = buildApiErrorObservationFields(rawError);
const safeErrorText =
buildTextObservationFields(errorText).textPreview ?? "LLM request failed.";
const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-";
ctx.log.warn("embedded run agent end", {
event: "embedded_run_agent_end",
tags: ["error_handling", "lifecycle", "agent_end", "assistant_error"],
runId: ctx.params.runId,
isError: true,
error: safeErrorText,
failoverReason,
provider: lastAssistant.provider,
model: lastAssistant.model,
...observedError,
consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true error=${safeErrorText}`,
});
ctx.log.warn(
`embedded run agent end: runId=${ctx.params.runId} isError=true error=${errorText}`,
);
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: safeErrorText,
error: errorText,
endedAt: Date.now(),
},
});
@@ -73,7 +53,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
stream: "lifecycle",
data: {
phase: "error",
error: safeErrorText,
error: errorText,
},
});
} else {

View File

@@ -12,8 +12,8 @@ import type {
import type { NormalizedUsage } from "./usage.js";
export type EmbeddedSubscribeLogger = {
debug: (message: string, meta?: Record<string, unknown>) => void;
warn: (message: string, meta?: Record<string, unknown>) => void;
debug: (message: string) => void;
warn: (message: string) => void;
};
export type ToolErrorSummary = {

View File

@@ -16,7 +16,7 @@ export function registerCronCli(program: Command) {
.addHelpText(
"after",
() =>
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`,
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`,
);
registerCronStatusCommand(cron);

View File

@@ -1,269 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import * as noteModule from "../terminal/note.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
let tempRoot: string | null = null;
async function makeTempStorePath() {
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
return path.join(tempRoot, "cron", "jobs.json");
}
afterEach(async () => {
vi.restoreAllMocks();
if (tempRoot) {
await fs.rm(tempRoot, { recursive: true, force: true });
tempRoot = null;
}
});
function makePrompter(confirmResult = true) {
return {
confirm: vi.fn().mockResolvedValue(confirmResult),
};
}
describe("maybeRepairLegacyCronStore", () => {
it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
jobId: "legacy-job",
name: "Legacy job",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
payload: {
kind: "systemEvent",
text: "Morning brief",
},
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const cfg: OpenClawConfig = {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
};
await maybeRepairLegacyCronStore({
cfg,
options: {},
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
const [job] = persisted.jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.notify).toBeUndefined();
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "0 7 * * *",
tz: "UTC",
});
expect(job?.delivery).toMatchObject({
mode: "webhook",
to: "https://example.invalid/cron-finished",
});
expect(job?.payload).toMatchObject({
kind: "systemEvent",
text: "Morning brief",
});
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Legacy cron job storage detected"),
"Cron",
);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Cron store normalized"),
"Doctor changes",
);
});
it("warns instead of replacing announce delivery for notify fallback jobs", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "notify-and-announce",
name: "Notify and announce",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "Status" },
delivery: { mode: "announce", channel: "telegram", to: "123" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: { nonInteractive: true },
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(persisted.jobs[0]?.notify).toBe(true);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
"Doctor warnings",
);
});
it("does not auto-repair in non-interactive mode without explicit repair approval", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
jobId: "legacy-job",
name: "Legacy job",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
payload: {
kind: "systemEvent",
text: "Morning brief",
},
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const prompter = makePrompter(false);
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: { nonInteractive: true },
prompter,
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(prompter.confirm).toHaveBeenCalledWith({
message: "Repair legacy cron jobs now?",
initialValue: true,
});
expect(persisted.jobs[0]?.jobId).toBe("legacy-job");
expect(persisted.jobs[0]?.notify).toBe(true);
expect(noteSpy).not.toHaveBeenCalledWith(
expect.stringContaining("Cron store normalized"),
"Doctor changes",
);
});
it("migrates notify fallback none delivery jobs to cron.webhook", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "notify-none",
name: "Notify none",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
payload: {
kind: "systemEvent",
text: "Status",
},
delivery: { mode: "none", to: "123456789" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: {},
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(persisted.jobs[0]?.notify).toBeUndefined();
expect(persisted.jobs[0]?.delivery).toMatchObject({
mode: "webhook",
to: "https://example.invalid/cron-finished",
});
});
});

View File

@@ -1,183 +0,0 @@
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/config.js";
import { normalizeStoredCronJobs } from "../cron/store-migration.js";
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import { note } from "../terminal/note.js";
import { shortenHomePath } from "../utils.js";
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";
type CronDoctorOutcome = {
changed: boolean;
warnings: string[];
};
function pluralize(count: number, noun: string) {
return `${count} ${noun}${count === 1 ? "" : "s"}`;
}
function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): string[] {
const lines: string[] = [];
if (issues.jobId) {
lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``);
}
if (issues.legacyScheduleString) {
lines.push(
`- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`,
);
}
if (issues.legacyScheduleCron) {
lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``);
}
if (issues.legacyPayloadKind) {
lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`);
}
if (issues.legacyPayloadProvider) {
lines.push(
`- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`,
);
}
if (issues.legacyTopLevelPayloadFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`,
);
}
if (issues.legacyTopLevelDeliveryFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`,
);
}
if (issues.legacyDeliveryMode) {
lines.push(
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
);
}
return lines;
}
function trimString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function migrateLegacyNotifyFallback(params: {
jobs: Array<Record<string, unknown>>;
legacyWebhook?: string;
}): CronDoctorOutcome {
let changed = false;
const warnings: string[] = [];
for (const raw of params.jobs) {
if (!("notify" in raw)) {
continue;
}
const jobName = trimString(raw.name) ?? trimString(raw.id) ?? "<unnamed>";
const notify = raw.notify === true;
if (!notify) {
delete raw.notify;
changed = true;
continue;
}
const delivery =
raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery)
? (raw.delivery as Record<string, unknown>)
: null;
const mode = trimString(delivery?.mode)?.toLowerCase();
const to = trimString(delivery?.to);
if (mode === "webhook" && to) {
delete raw.notify;
changed = true;
continue;
}
if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) {
raw.delivery = {
...delivery,
mode: "webhook",
to: mode === "none" ? params.legacyWebhook : (to ?? params.legacyWebhook),
};
delete raw.notify;
changed = true;
continue;
}
if (!params.legacyWebhook) {
warnings.push(
`Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`,
);
continue;
}
warnings.push(
`Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`,
);
}
return { changed, warnings };
}
export async function maybeRepairLegacyCronStore(params: {
cfg: OpenClawConfig;
options: DoctorOptions;
prompter: Pick<DoctorPrompter, "confirm">;
}) {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const store = await loadCronStore(storePath);
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
if (rawJobs.length === 0) {
return;
}
const normalized = normalizeStoredCronJobs(rawJobs);
const legacyWebhook = trimString(params.cfg.cron?.webhook);
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
const previewLines = formatLegacyIssuePreview(normalized.issues);
if (notifyCount > 0) {
previewLines.push(
`- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`,
);
}
if (previewLines.length === 0) {
return;
}
note(
[
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
...previewLines,
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
].join("\n"),
"Cron",
);
const shouldRepair = await params.prompter.confirm({
message: "Repair legacy cron jobs now?",
initialValue: true,
});
if (!shouldRepair) {
return;
}
const notifyMigration = migrateLegacyNotifyFallback({
jobs: rawJobs,
legacyWebhook,
});
const changed = normalized.mutated || notifyMigration.changed;
if (!changed && notifyMigration.warnings.length === 0) {
return;
}
if (changed) {
await saveCronStore(storePath, {
version: 1,
jobs: rawJobs as unknown as CronJob[],
});
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
}
if (notifyMigration.warnings.length > 0) {
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
}
}

View File

@@ -31,7 +31,6 @@ import {
import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js";
import { doctorShellCompletion } from "./doctor-completion.js";
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js";
import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js";
import {
@@ -221,11 +220,6 @@ export async function doctorCommand(
await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH);
await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair });
await maybeRepairLegacyCronStore({
cfg,
options,
prompter,
});
cfg = await maybeRepairSandboxImages(cfg, runtime, prompter);
noteSandboxScopeWarnings(cfg);

View File

@@ -1,143 +0,0 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
resolveDeliveryTarget: vi.fn(),
deliverOutboundPayloads: vi.fn(),
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }),
buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }),
createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }),
warn: vi.fn(),
}));
vi.mock("./isolated-agent/delivery-target.js", () => ({
resolveDeliveryTarget: mocks.resolveDeliveryTarget,
}));
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/identity.js", () => ({
resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity,
}));
vi.mock("../infra/outbound/session-context.js", () => ({
buildOutboundSessionContext: mocks.buildOutboundSessionContext,
}));
vi.mock("../cli/outbound-send-deps.js", () => ({
createOutboundSendDeps: mocks.createOutboundSendDeps,
}));
vi.mock("../logging.js", () => ({
getChildLogger: vi.fn(() => ({
warn: mocks.warn,
})),
}));
const { sendFailureNotificationAnnounce } = await import("./delivery.js");
describe("sendFailureNotificationAnnounce", () => {
beforeEach(() => {
vi.clearAllMocks();
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: true,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
mode: "explicit",
});
mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]);
});
afterEach(() => {
vi.useRealTimers();
});
it("delivers failure alerts to the resolved explicit target with strict send settings", async () => {
const deps = {} as never;
const cfg = {} as never;
await sendFailureNotificationAnnounce(
deps,
cfg,
"main",
"job-1",
{ channel: "telegram", to: "123", accountId: "bot-a" },
"Cron failed",
);
expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", {
channel: "telegram",
to: "123",
accountId: "bot-a",
});
expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({
cfg,
agentId: "main",
sessionKey: "cron:job-1:failure",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
cfg,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
payloads: [{ text: "Cron failed" }],
session: { kind: "session" },
identity: { kind: "identity" },
bestEffort: false,
deps: { kind: "deps" },
abortSignal: expect.any(AbortSignal),
}),
);
});
it("does not send when target resolution fails", async () => {
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: false,
error: new Error("target missing"),
});
await sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
);
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
expect(mocks.warn).toHaveBeenCalledWith(
{ error: "target missing" },
"cron: failed to resolve failure destination target",
);
});
it("swallows outbound delivery errors after logging", async () => {
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed"));
await expect(
sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
),
).resolves.toBeUndefined();
expect(mocks.warn).toHaveBeenCalledWith(
expect.objectContaining({
err: "send failed",
channel: "telegram",
to: "123",
}),
"cron: failure destination announce failed",
);
});
});

View File

@@ -148,46 +148,6 @@ describe("resolveFailureDestination", () => {
expect(plan).toBeNull();
});
it("returns null when webhook failure destination matches the primary webhook target", () => {
const plan = resolveFailureDestination(
makeJob({
sessionTarget: "main",
payload: { kind: "systemEvent", text: "tick" },
delivery: {
mode: "webhook",
to: "https://example.invalid/cron",
failureDestination: {
mode: "webhook",
to: "https://example.invalid/cron",
},
},
}),
undefined,
);
expect(plan).toBeNull();
});
it("does not reuse inherited announce recipient when switching failure destination to webhook", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: {
mode: "webhook",
},
},
}),
{
channel: "signal",
to: "group-abc",
mode: "announce",
},
);
expect(plan).toBeNull();
});
it("allows job-level failure destination fields to clear inherited global values", () => {
const plan = resolveFailureDestination(
makeJob({

View File

@@ -54,7 +54,6 @@ export async function runTelegramAnnounceTurn(params: {
to?: string;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, {
@@ -68,6 +67,5 @@ export async function runTelegramAnnounceTurn(params: {
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
deliveryContract: params.deliveryContract,
});
}

View File

@@ -23,7 +23,6 @@ async function runExplicitTelegramAnnounceTurn(params: {
home: string;
storePath: string;
deps: CliDeps;
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runTelegramAnnounceTurn({
...params,
@@ -302,7 +301,6 @@ describe("runCronIsolatedAgentTurn", () => {
home,
storePath,
deps,
deliveryContract: "shared",
});
expectDeliveredOk(res);

View File

@@ -10,7 +10,7 @@
* returning so the timer correctly skips the system-event fallback.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
@@ -105,6 +105,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
skipMessagingToolDelivery: false,
deliveryBestEffort: false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
@@ -133,10 +134,6 @@ describe("dispatchCronDelivery — double-announce guard", () => {
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
});
afterEach(() => {
vi.unstubAllEnvs();
});
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
@@ -258,42 +255,6 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("retries transient direct announce failures before succeeding", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads)
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
.mockResolvedValueOnce([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Retry me once." });
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
});
it("does not retry permanent direct announce failures", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found"));
const params = makeBaseParams({ synthesizedText: "This should fail once." });
const state = await dispatchCronDelivery(params);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(state.result).toEqual(
expect.objectContaining({
status: "error",
error: "Error: chat not found",
deliveryAttempted: true,
}),
);
});
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
const params = makeBaseParams({
synthesizedText: "Task done.",

View File

@@ -96,13 +96,4 @@ describe("resolveCronDeliveryBestEffort", () => {
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
});
it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => {
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
const job = {
delivery: { bestEffort: false },
payload: { kind: "agentTurn", bestEffortDeliver: true },
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
});
});

View File

@@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = {
resolvedDelivery: DeliveryTargetResolution;
deliveryRequested: boolean;
skipHeartbeatDelivery: boolean;
skipMessagingToolDelivery?: boolean;
skipMessagingToolDelivery: boolean;
deliveryBestEffort: boolean;
deliveryPayloadHasStructuredContent: boolean;
deliveryPayloads: ReplyPayload[];
@@ -192,17 +192,15 @@ async function retryTransientDirectCronDelivery<T>(params: {
export async function dispatchCronDelivery(
params: DispatchCronDeliveryParams,
): Promise<DispatchCronDeliveryState> {
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
let summary = params.summary;
let outputText = params.outputText;
let synthesizedText = params.synthesizedText;
let deliveryPayloads = params.deliveryPayloads;
// Shared callers can treat a matching message-tool send as the completed
// delivery path. Cron-owned callers keep this false so direct cron delivery
// remains the only source of delivered state.
let delivered = skipMessagingToolDelivery;
let deliveryAttempted = skipMessagingToolDelivery;
// `true` means we confirmed at least one outbound send reached the target.
// Keep this strict so timer fallback can safely decide whether to wake main.
let delivered = params.skipMessagingToolDelivery;
let deliveryAttempted = params.skipMessagingToolDelivery;
const failDeliveryTarget = (error: string) =>
params.withRunSession({
status: "error",
@@ -406,7 +404,11 @@ export async function dispatchCronDelivery(
}
};
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (
params.deliveryRequested &&
!params.skipHeartbeatDelivery &&
!params.skipMessagingToolDelivery
) {
if (!params.resolvedDelivery.ok) {
if (!params.deliveryBestEffort) {
return {

View File

@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
restoreFastTestEnv(previousFastTestEnv);
});
it('disables the message tool when delivery.mode is "none"', async () => {
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
});
it("disables the message tool when cron delivery is active", async () => {
@@ -82,20 +82,4 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
});
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
});
await runCronIsolatedAgentTurn({
...makeParams(),
deliveryContract: "shared",
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
});
});

View File

@@ -78,10 +78,11 @@ export type RunCronAgentTurnResult = {
/** Last non-empty agent text output (not truncated). */
outputText?: string;
/**
* `true` when the isolated runner already handled the run's user-visible
* delivery outcome. Cron-owned callers use this for cron delivery or
* explicit suppression; shared callers may also use it for a matching
* message-tool send that already reached the target.
* `true` when the isolated run already delivered its output to the target
* channel (via outbound payloads, the subagent announce flow, or a matching
* messaging-tool send). Callers should skip posting a summary to the main
* session to avoid duplicate
* messages. See: https://github.com/openclaw/openclaw/issues/15692
*/
delivered?: boolean;
/**
@@ -143,22 +144,16 @@ function buildCronAgentDefaultsConfig(params: {
type ResolvedCronDeliveryTarget = Awaited<ReturnType<typeof resolveDeliveryTarget>>;
type IsolatedDeliveryContract = "cron-owned" | "shared";
function resolveCronToolPolicy(params: {
deliveryRequested: boolean;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryContract: IsolatedDeliveryContract;
}) {
return {
// Only enforce an explicit message target when the cron delivery target
// was successfully resolved. When resolution fails the agent should not
// be blocked by a target it cannot satisfy (#27898).
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
// Cron-owned runs always route user-facing delivery through the runner
// itself. Shared callers keep the previous behavior so non-cron paths do
// not silently lose the message tool when no explicit delivery is active.
disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested,
disableMessageTool: params.deliveryRequested,
};
}
@@ -166,7 +161,6 @@ async function resolveCronDeliveryContext(params: {
cfg: OpenClawConfig;
job: CronJob;
agentId: string;
deliveryContract: IsolatedDeliveryContract;
}) {
const deliveryPlan = resolveCronDeliveryPlan(params.job);
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
@@ -182,7 +176,6 @@ async function resolveCronDeliveryContext(params: {
toolPolicy: resolveCronToolPolicy({
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
deliveryContract: params.deliveryContract,
}),
};
}
@@ -207,7 +200,6 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: string;
agentId?: string;
lane?: string;
deliveryContract?: IsolatedDeliveryContract;
}): Promise<RunCronAgentTurnResult> {
const abortSignal = params.abortSignal ?? params.signal;
const isAborted = () => abortSignal?.aborted === true;
@@ -218,7 +210,6 @@ export async function runCronIsolatedAgentTurn(params: {
: "cron: job execution timed out";
};
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
const deliveryContract = params.deliveryContract ?? "cron-owned";
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const requestedAgentId =
typeof params.agentId === "string" && params.agentId.trim()
@@ -434,7 +425,6 @@ export async function runCronIsolatedAgentTurn(params: {
cfg: cfgWithAgentDefaults,
job: params.job,
agentId,
deliveryContract,
});
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
@@ -817,7 +807,6 @@ export async function runCronIsolatedAgentTurn(params: {
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
const skipMessagingToolDelivery =
deliveryContract === "shared" &&
deliveryRequested &&
finalRunResult.didSendViaMessagingTool === true &&
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
@@ -827,6 +816,7 @@ export async function runCronIsolatedAgentTurn(params: {
accountId: resolvedDelivery.accountId,
}),
);
const deliveryResult = await dispatchCronDelivery({
cfg: params.cfg,
cfgWithAgentDefaults,

View File

@@ -47,12 +47,8 @@ describe("isLikelyInterimCronMessage", () => {
false,
);
});
it("does not treat empty as interim (empty = NO_REPLY was stripped)", () => {
expect(isLikelyInterimCronMessage("")).toBe(false);
});
it("does not treat whitespace-only as interim", () => {
expect(isLikelyInterimCronMessage(" ")).toBe(false);
it("treats empty as interim", () => {
expect(isLikelyInterimCronMessage("")).toBe(true);
});
});

View File

@@ -42,10 +42,7 @@ function normalizeHintText(value: string): string {
export function isLikelyInterimCronMessage(value: string): boolean {
const normalized = normalizeHintText(value);
if (!normalized) {
// Empty text after payload filtering means the agent either returned
// NO_REPLY (deliberately silent) or produced no deliverable content.
// Do not treat this as an interim acknowledgement that needs a rerun.
return false;
return true;
}
const words = normalized.split(" ").filter(Boolean).length;
return words <= 45 && INTERIM_CRON_HINTS.some((hint) => normalized.includes(hint));

View File

@@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => {
});
});
it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => {
it("treats delivery object without mode as announce", async () => {
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "partial-delivery",
@@ -96,8 +96,10 @@ describe("CronService delivery plan consistency", () => {
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown");
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done",
expect.objectContaining({ agentId: undefined }),
);
});
});

View File

@@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
expect(requestHeartbeatNow).not.toHaveBeenCalled();
});
it("does not revive legacy main-session relay for real cron summaries", async () => {
it("still enqueues real cron summaries as system events", async () => {
const { storePath } = await makeStorePath();
const now = Date.now();
@@ -109,7 +109,10 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
await runScheduledCron(cron);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
// Real summaries SHOULD be enqueued.
expect(enqueueSystemEvent).toHaveBeenCalledWith(
expect.stringContaining("Weather update"),
expect.objectContaining({ agentId: undefined }),
);
});
});

View File

@@ -620,14 +620,14 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("runs an isolated job without posting a fallback summary to main", async () => {
it("runs an isolated job and posts summary to main", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
expect(requestHeartbeatNow).toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});
@@ -685,7 +685,7 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("does not post a fallback main summary when an isolated job errors", async () => {
it("posts last output to main even when isolated job errors", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
summary: "last output",
@@ -700,8 +700,8 @@ describe("CronService", () => {
status: "error",
});
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
expect(requestHeartbeatNow).toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});

View File

@@ -1,10 +1,161 @@
import fs from "node:fs";
import { normalizeStoredCronJobs } from "../store-migration.js";
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import { migrateLegacyCronPayload } from "../payload-migration.js";
import { coerceFiniteScheduleNumber } from "../schedule.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { recomputeNextRuns } from "./jobs.js";
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
import type { CronServiceState } from "./state.js";
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
async function getFileMtimeMs(path: string): Promise<number | null> {
try {
const stats = await fs.promises.stat(path);
@@ -34,7 +185,287 @@ export async function ensureLoaded(
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
const loaded = await loadCronStore(state.deps.storePath);
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
const { mutated } = normalizeStoredCronJobs(jobs);
let mutated = false;
for (const raw of jobs) {
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
}
}
if (payloadRecord.kind === "agentTurn") {
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
}
const hadLegacyTopLevelFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw ||
"command" in raw ||
"timeout" in raw;
if (hadLegacyTopLevelFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
}
if (payloadRecord) {
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
} else if (modeRaw === undefined || modeRaw === null) {
// Explicitly persist the default so existing jobs don't silently
// change behaviour when the runtime default shifts.
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
state.storeLoadedAtMs = state.deps.nowMs();
state.storeFileMtimeMs = fileMtimeMs;

View File

@@ -1,7 +1,9 @@
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,
@@ -1136,6 +1138,46 @@ export async function executeJobCore(
return { status: "error", error: timeoutErrorMessage() };
}
// Post a short summary back to the main session only when announce
// delivery was requested and we are confident no outbound delivery path
// ran. If delivery was attempted but final ack is uncertain, suppress the
// main summary to avoid duplicate user-facing sends.
// See: https://github.com/openclaw/openclaw/issues/15692
//
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
// are internal ack tokens that should never leak into user conversations.
// See: https://github.com/openclaw/openclaw/issues/32013
const summaryText = res.summary?.trim();
const deliveryPlan = resolveCronDeliveryPlan(job);
const suppressMainSummary =
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
if (
shouldEnqueueCronMainSummary({
summaryText,
deliveryRequested: deliveryPlan.requested,
delivered: res.delivered,
deliveryAttempted: res.deliveryAttempted,
suppressMainSummary,
isCronSystemEvent,
})
) {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
}
}
return {
status: res.status,
error: res.error,

View File

@@ -1,78 +0,0 @@
import { describe, expect, it } from "vitest";
import { normalizeStoredCronJobs } from "./store-migration.js";
describe("normalizeStoredCronJobs", () => {
it("normalizes legacy cron fields and reports migration issues", () => {
const jobs = [
{
jobId: "legacy-job",
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
message: "say hi",
model: "openai/gpt-4.1",
deliver: true,
provider: " TeLeGrAm ",
to: "12345",
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues).toMatchObject({
jobId: 1,
legacyScheduleCron: 1,
legacyTopLevelPayloadFields: 1,
legacyTopLevelDeliveryFields: 1,
});
const [job] = jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "*/5 * * * *",
tz: "UTC",
});
expect(job?.message).toBeUndefined();
expect(job?.provider).toBeUndefined();
expect(job?.delivery).toMatchObject({
mode: "announce",
channel: "telegram",
to: "12345",
});
expect(job?.payload).toMatchObject({
kind: "agentTurn",
message: "say hi",
model: "openai/gpt-4.1",
});
});
it("normalizes payload provider alias into channel", () => {
const jobs = [
{
id: "legacy-provider",
schedule: { kind: "every", everyMs: 60_000 },
payload: {
kind: "agentTurn",
message: "ping",
provider: " Slack ",
},
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues.legacyPayloadProvider).toBe(1);
expect(jobs[0]?.payload).toMatchObject({
kind: "agentTurn",
message: "ping",
});
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
expect(payload?.provider).toBeUndefined();
expect(jobs[0]?.delivery).toMatchObject({
mode: "announce",
channel: "slack",
});
});
});

View File

@@ -1,491 +0,0 @@
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { coerceFiniteScheduleNumber } from "./schedule.js";
import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
type CronStoreIssueKey =
| "jobId"
| "legacyScheduleString"
| "legacyScheduleCron"
| "legacyPayloadKind"
| "legacyPayloadProvider"
| "legacyTopLevelPayloadFields"
| "legacyTopLevelDeliveryFields"
| "legacyDeliveryMode";
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
type NormalizeCronStoreJobsResult = {
issues: CronStoreIssues;
jobs: Array<Record<string, unknown>>;
mutated: boolean;
};
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
issues[key] = (issues[key] ?? 0) + 1;
}
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
export function normalizeStoredCronJobs(
jobs: Array<Record<string, unknown>>,
): NormalizeCronStoreJobsResult {
const issues: CronStoreIssues = {};
let mutated = false;
for (const raw of jobs) {
const jobIssues = new Set<CronStoreIssueKey>();
const trackIssue = (key: CronStoreIssueKey) => {
if (jobIssues.has(key)) {
return;
}
jobIssues.add(key);
incrementIssue(issues, key);
};
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
trackIssue("jobId");
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
trackIssue("jobId");
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
trackIssue("legacyScheduleString");
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
trackIssue("legacyTopLevelPayloadFields");
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
trackIssue("legacyPayloadKind");
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
trackIssue("legacyPayloadKind");
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
trackIssue("legacyPayloadKind");
}
}
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
const hadLegacyTopLevelPayloadFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"command" in raw ||
"timeout" in raw;
const hadLegacyTopLevelDeliveryFields =
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw;
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
if (hadLegacyTopLevelPayloadFields) {
trackIssue("legacyTopLevelPayloadFields");
}
if (hadLegacyTopLevelDeliveryFields) {
trackIssue("legacyTopLevelDeliveryFields");
}
}
if (payloadRecord) {
const hadLegacyPayloadProvider =
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
if (hadLegacyPayloadProvider) {
trackIssue("legacyPayloadProvider");
}
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
trackIssue("legacyScheduleCron");
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
trackIssue("legacyScheduleCron");
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
trackIssue("legacyDeliveryMode");
}
} else if (modeRaw === undefined || modeRaw === null) {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
return { issues, jobs, mutated };
}

View File

@@ -848,32 +848,6 @@ describe("gateway server cron", () => {
'Cron job "failure destination webhook" failed: unknown error',
);
fetchWithSsrFGuardMock.mockClear();
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
const bestEffortFailureDestJobId = await addWebhookCronJob({
ws,
name: "best effort failure destination webhook",
sessionTarget: "isolated",
delivery: {
mode: "announce",
channel: "telegram",
to: "19098680",
bestEffort: true,
failureDestination: {
mode: "webhook",
to: "https://example.invalid/failure-destination",
},
},
});
const bestEffortFailureDestFinished = waitForCronEvent(
ws,
(payload) =>
payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
);
await runCronJobForce(ws, bestEffortFailureDestJobId);
await bestEffortFailureDestFinished;
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
const noSummaryJobId = await addWebhookCronJob({
ws,
@@ -887,7 +861,7 @@ describe("gateway server cron", () => {
);
await runCronJobForce(ws, noSummaryJobId);
await noSummaryFinished;
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}

View File

@@ -75,10 +75,6 @@ describe("gateway server hooks", () => {
expect(resAgent.status).toBe(200);
const agentEvents = await waitForSystemEvent();
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
deliveryContract?: string;
};
expect(firstCall?.deliveryContract).toBe("shared");
drainSystemEvents(resolveMainKey());
mockIsolatedRunOkOnce();

View File

@@ -76,7 +76,6 @@ export function createGatewayHooksRequestHandler(params: {
message: value.message,
sessionKey,
lane: "cron",
deliveryContract: "shared",
});
const summary = result.summary?.trim() || result.error?.trim() || result.status;
const prefix =

View File

@@ -1,49 +0,0 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
async function importHookRunnerGlobalModule() {
return import("./hook-runner-global.js");
}
afterEach(async () => {
const mod = await importHookRunnerGlobalModule();
mod.resetGlobalHookRunner();
vi.resetModules();
});
describe("hook-runner-global", () => {
it("preserves the initialized runner across module reloads", async () => {
const modA = await importHookRunnerGlobalModule();
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
modA.initializeGlobalHookRunner(registry);
expect(modA.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
vi.resetModules();
const modB = await importHookRunnerGlobalModule();
expect(modB.getGlobalHookRunner()).not.toBeNull();
expect(modB.getGlobalHookRunner()?.hasHooks("message_received")).toBe(true);
expect(modB.getGlobalPluginRegistry()).toBe(registry);
});
it("clears the shared state across module reloads", async () => {
const modA = await importHookRunnerGlobalModule();
const registry = createMockPluginRegistry([{ hookName: "message_received", handler: vi.fn() }]);
modA.initializeGlobalHookRunner(registry);
vi.resetModules();
const modB = await importHookRunnerGlobalModule();
modB.resetGlobalHookRunner();
expect(modB.getGlobalHookRunner()).toBeNull();
expect(modB.getGlobalPluginRegistry()).toBeNull();
vi.resetModules();
const modC = await importHookRunnerGlobalModule();
expect(modC.getGlobalHookRunner()).toBeNull();
expect(modC.getGlobalPluginRegistry()).toBeNull();
});
});

View File

@@ -12,31 +12,16 @@ import type { PluginHookGatewayContext, PluginHookGatewayStopEvent } from "./typ
const log = createSubsystemLogger("plugins");
type HookRunnerGlobalState = {
hookRunner: HookRunner | null;
registry: PluginRegistry | null;
};
const hookRunnerGlobalStateKey = Symbol.for("openclaw.plugins.hook-runner-global-state");
function getHookRunnerGlobalState(): HookRunnerGlobalState {
const globalStore = globalThis as typeof globalThis & {
[hookRunnerGlobalStateKey]?: HookRunnerGlobalState;
};
return (globalStore[hookRunnerGlobalStateKey] ??= {
hookRunner: null,
registry: null,
});
}
let globalHookRunner: HookRunner | null = null;
let globalRegistry: PluginRegistry | null = null;
/**
* Initialize the global hook runner with a plugin registry.
* Called once when plugins are loaded during gateway startup.
*/
export function initializeGlobalHookRunner(registry: PluginRegistry): void {
const state = getHookRunnerGlobalState();
state.registry = registry;
state.hookRunner = createHookRunner(registry, {
globalRegistry = registry;
globalHookRunner = createHookRunner(registry, {
logger: {
debug: (msg) => log.debug(msg),
warn: (msg) => log.warn(msg),
@@ -56,7 +41,7 @@ export function initializeGlobalHookRunner(registry: PluginRegistry): void {
* Returns null if plugins haven't been loaded yet.
*/
export function getGlobalHookRunner(): HookRunner | null {
return getHookRunnerGlobalState().hookRunner;
return globalHookRunner;
}
/**
@@ -64,14 +49,14 @@ export function getGlobalHookRunner(): HookRunner | null {
* Returns null if plugins haven't been loaded yet.
*/
export function getGlobalPluginRegistry(): PluginRegistry | null {
return getHookRunnerGlobalState().registry;
return globalRegistry;
}
/**
* Check if any hooks are registered for a given hook name.
*/
export function hasGlobalHooks(hookName: Parameters<HookRunner["hasHooks"]>[0]): boolean {
return getHookRunnerGlobalState().hookRunner?.hasHooks(hookName) ?? false;
return globalHookRunner?.hasHooks(hookName) ?? false;
}
export async function runGlobalGatewayStopSafely(params: {
@@ -98,7 +83,6 @@ export async function runGlobalGatewayStopSafely(params: {
* Reset the global hook runner (for testing).
*/
export function resetGlobalHookRunner(): void {
const state = getHookRunnerGlobalState();
state.hookRunner = null;
state.registry = null;
globalHookRunner = null;
globalRegistry = null;
}

View File

@@ -270,4 +270,58 @@ describe("registerTelegramNativeCommands", () => {
);
expect(sendMessage).not.toHaveBeenCalledWith(123, "Command not found.");
});
it("uses the DM thread session key for plugin command internal sent hooks", async () => {
const commandHandlers = new Map<string, (ctx: unknown) => Promise<void>>();
pluginCommandMocks.getPluginCommandSpecs.mockReturnValue([
{
name: "plug",
description: "Plugin command",
},
] as never);
pluginCommandMocks.matchPluginCommand.mockReturnValue({
command: { key: "plug", requireAuth: false },
args: undefined,
} as never);
registerTelegramNativeCommands({
...buildParams({
channels: {
telegram: {
dmPolicy: "open",
},
},
}),
bot: {
api: {
setMyCommands: vi.fn().mockResolvedValue(undefined),
sendMessage: vi.fn().mockResolvedValue(undefined),
},
command: vi.fn((name: string, cb: (ctx: unknown) => Promise<void>) => {
commandHandlers.set(name, cb);
}),
} as unknown as Parameters<typeof registerTelegramNativeCommands>[0]["bot"],
});
const handler = commandHandlers.get("plug");
expect(handler).toBeTruthy();
await handler?.({
match: "",
message: {
message_id: 42,
date: Math.floor(Date.now() / 1000),
chat: { id: 12345, type: "private" },
from: { id: 12345, username: "alice" },
message_thread_id: 99,
},
});
expect(deliveryMocks.deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
sessionKeyForInternalHooks: "agent:main:main:thread:12345:99",
}),
);
});
});

View File

@@ -540,6 +540,20 @@ export const registerTelegramNativeCommands = ({
chunkMode: params.chunkMode,
linkPreview: telegramCfg.linkPreview,
});
const resolveDmThreadSessionKey = (params: {
baseSessionKey: string;
chatId: string | number;
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
}): string => {
const dmThreadId = params.threadSpec.scope === "dm" ? params.threadSpec.id : undefined;
if (dmThreadId == null) {
return params.baseSessionKey;
}
return resolveThreadSessionKeys({
baseSessionKey: params.baseSessionKey,
threadId: `${params.chatId}:${dmThreadId}`,
}).sessionKey;
};
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
@@ -647,17 +661,11 @@ export const registerTelegramNativeCommands = ({
});
return;
}
const baseSessionKey = route.sessionKey;
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({
baseSessionKey,
threadId: `${chatId}:${dmThreadId}`,
})
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const sessionKey = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const { skillFilter, groupSystemPrompt } = resolveTelegramGroupPromptSettings({
groupConfig,
topicConfig,
@@ -833,10 +841,15 @@ export const registerTelegramNativeCommands = ({
return;
}
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
const sessionKeyForInternalHooks = resolveDmThreadSessionKey({
baseSessionKey: route.sessionKey,
chatId,
threadSpec,
});
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
sessionKeyForInternalHooks: route.sessionKey,
sessionKeyForInternalHooks,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
mediaLocalRoots,