Compare commits

..

38 Commits

Author SHA1 Message Date
Ayaan Zaidi
03578e7a7b fix(telegram): track chunked outbound sends 2026-06-27 12:44:40 -07:00
Ayaan Zaidi
426c137e36 fix(telegram): preserve chunked bot replies 2026-06-27 12:00:06 -07:00
Gio Della-Libera
1bdde66950 Doctor: expose plugin registry findings (#96169)
Merged via squash.

Prepared head SHA: cf4399955e
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Reviewed-by: @giodl73-repo
2026-06-27 10:00:50 -07:00
llagy009
2720ac06b7 fix(duckduckgo): guard out-of-range numeric HTML entities (#96583)
decodeHtmlEntities decoded numeric entities with String.fromCodePoint(parseInt(...)) without a range check, so an out-of-range entity such as &#99999999; or &#x110000; threw RangeError and made the whole results page fail to parse. Validate the code point is within 0..0x10FFFF and keep the original entity text otherwise. Add a regression covering decimal and hex out-of-range entities plus a valid astral entity.
2026-06-27 09:37:58 -07:00
miorbnli
ce15f348bb fix(telegram): use idempotent retry context for delete/reaction (#96612)
reactMessageTelegram and deleteMessageTelegram passed context: "send" to
isRecoverableTelegramNetworkError, which disables message-snippet matching
(allowMessageMatch defaults to false only for "send"). Both operations are
idempotent (setMessageReaction / deleteMessage are safe to repeat), yet a
transient snippet-only network error (e.g. "socket hang up", "undici network
error" with no error code) was not retried — stricter than polling/webhook/
unknown, which all default allowMessageMatch to true. Users saw spurious
reaction/delete failures on transient network errors.

Add delete | react to TelegramNetworkErrorContext (additive) and use them at
the two callers. The helper default (context !== "send") is unchanged, so
delete/react now match polling/webhook/unknown. sendMessage keeps "send".

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-27 09:31:52 -07:00
llagy009
e5c3c59c67 fix(synology-chat): truncate sanitized input on UTF-16 boundary (#96574)
sanitizeInput truncated long messages with String.slice(0, 4000), which
can cut through an astral character's surrogate pair (e.g. an emoji at
the 4000-char boundary), leaving a lone surrogate in the sanitized text
passed downstream.

Use truncateUtf16Safe so truncation never splits a surrogate pair,
keeping the existing 4000-char budget and '... [truncated]' suffix.

Adds tests asserting the truncated output stays UTF-16 well formed and
that a supplementary-plane character is preserved when it fits.
2026-06-27 09:31:33 -07:00
llagy009
2e881ab1c6 fix(googlechat): truncate approval card text on UTF-16 boundary (#96573)
truncateText sliced the approval card text paragraph with String.slice,
which can cut through an astral character's surrogate pair (e.g. an emoji
straddling the 1797-char limit), leaving a lone surrogate in the card
text sent to Google Chat.

Use truncateUtf16Safe from the plugin SDK so truncation never splits a
surrogate pair, keeping the '...' suffix and the existing length budget.

Adds tests asserting the truncated Command card text stays UTF-16 well
formed and that an astral character is preserved when it fits.
2026-06-27 09:31:26 -07:00
llagy009
90c20d15c2 fix(slack): truncate approval mrkdwn on UTF-16 boundary (#96576)
truncateSlackMrkdwn cut approval Block Kit mrkdwn (Command/Request/
plugin description) with String.slice(0, maxChars - 1), which can split
an astral character's surrogate pair at the 2600-char preview limit,
leaving a lone surrogate in the chat.postMessage/chat.update payload.

Slice with sliceUtf16Safe so truncation never splits a surrogate pair,
keeping the existing ellipsis suffix and length budget.

Adds tests asserting exec command and plugin request mrkdwn stay free of
lone surrogates, plus a BMP regression keeping the existing limit.
2026-06-27 09:31:20 -07:00
llagy009
cb8bc71ff8 fix(whatsapp): elide auto-reply text on UTF-16 boundary (#96580)
elide truncated text with String.slice(0, limit) on a UTF-16 code-unit
index, so an astral character straddling the limit was cut into a lone
surrogate; the truncated-char count was also computed from the fixed
limit rather than the actual kept length.

Truncate with truncateUtf16Safe so a surrogate pair is never split, and
derive the truncated-char count from the kept length so the annotation
stays accurate.

Adds tests asserting no lone surrogate when the limit lands inside an
emoji and that a complete astral character is kept when it fits.
2026-06-27 09:31:16 -07:00
llagy009
b5c662f4f5 fix(codex): keep CLI session preview text on code-point boundaries (#96582)
truncateText shortened the cached lastMessage preview with value.slice(0, max - 3), which can cut a surrogate pair in half and emit a lone surrogate into the codex CLI session list JSON. Use the shared truncateUtf16Safe helper so truncation falls back to a whole code-point boundary. Add regressions for both the history.jsonl and sessions/**/*.jsonl preview paths.
2026-06-27 09:31:00 -07:00
llagy009
d693ed4af3 fix(qqbot): truncate reminder job name on code-point boundary (#96575)
generateJobName truncated reminder content with String.slice(0, 20) on
a UTF-16 code-unit index, so an astral character (e.g. an emoji) landing
on the boundary was cut into a lone surrogate, producing a malformed
cron job name.

Truncate with the shared truncateUtf16Safe helper so a surrogate pair is
never split, keeping the existing 20-unit budget and ellipsis suffix.

Adds a test asserting the truncated job name contains no lone surrogate.
2026-06-27 09:30:52 -07:00
llagy009
6c5a9fde9f fix(msteams): truncate reflection prompt on UTF-16 boundary (#96578)
buildReflectionPrompt truncated the thumbed-down response with
String.slice(0, 500) on a UTF-16 code-unit index, so an astral
character straddling the 500-char cap was cut into a lone surrogate in
the reflection prompt built for the LLM.

Use truncateUtf16Safe so truncation never splits a surrogate pair,
keeping the existing 500-char budget and '...' suffix.

Adds tests asserting the prompt stays UTF-16 well formed when truncating
and that a boundary emoji is preserved when it fits.
2026-06-27 09:30:26 -07:00
Vincent Koc
b8e3de1160 fix(telegram): recover stalled ingress spool claims (#97118)
* fix(telegram): drain ingress with native queue claims

* fix(telegram): bound native claim drain snapshots

* fix(telegram): recover pid-reused ingress claims

* fix(channels): block claimed candidate lanes

* fix(telegram): recover stalled ingress spool claims

* test(telegram): cover native claimNext drain stalls

---------

Co-authored-by: Dallin Romney <dallinromney@gmail.com>
2026-06-27 09:14:14 -07:00
mikasa
b9c64142e2 fix(agents): keep missing tool results on current model (#95543) 2026-06-27 19:06:56 +03:00
Milosz Jankiewicz
84bcd500c9 feat(xai): route OAuth login through device-code flow (#97249)
Route xAI OAuth through device-code sign-in so remote and headless hosts do not need a localhost callback. Preserve the legacy manual `xai-device-code` auth choice/method as a compatibility alias to the same device-code flow.

Also migrate stale xAI token endpoints on refresh and fail fast on structured refresh errors while keeping retries scoped to detected HTML/Cloudflare challenge responses.

Verification:
- `node scripts/run-vitest.mjs extensions/xai/index.test.ts extensions/xai/xai-oauth.test.ts`
- `node scripts/run-vitest.mjs src/cli/models-cli.test.ts -t 'maps --device-code'`
- `node scripts/run-vitest.mjs src/commands/auth-choice.test.ts -t 'removed provider auth choice'`
- Crabbox local-container live smoke on exact head `fef3cb24afb01cd1f69cf04ef67ed11d71dfadb3`: xAI discovery and device authorization returned 200.
- `$autoreview` after the live smoke: clean.

Co-authored-by: Jaaneek <Jaaneek@users.noreply.github.com>
Co-authored-by: fuller-stack-dev <263060202+fuller-stack-dev@users.noreply.github.com>
2026-06-27 10:02:57 -06:00
ly-wang19
f857e8d66e fix(terminal): wrap long wide-char words by visible width, not code-point count (#96746)
wrapNoteMessage measures every fit decision in visible columns, but its
splitLongWord fallback (for a single word longer than the line budget) sliced
the word into groups of maxLen code points. maxLen is a visible-column budget,
so a run of wide characters (CJK / fullwidth / emoji, 2 columns each) produced
lines up to twice the budget — e.g. a 24-char CJK word at maxWidth 20 emitted a
40-column line.

Accumulate grapheme visible width (the same unit visibleWidth uses) and start a
new segment when the next grapheme would exceed maxLen, so every wrapped segment
fits the budget; a single grapheme wider than the budget still emits, preserving
progress. ASCII wrapping is unchanged.

Co-authored-by: ly-wang19 <ly-wang19@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 08:50:45 -07:00
clawSean
a048aeae16 fix(qa-lab): treat claude-cli as Anthropic-family for live turn timeouts (#96567)
The live-frontier turn-timeout resolution only matched `anthropic/` models,
so `claude-cli/*` routes fell into the 120s fallback bucket instead of the
180s Anthropic floor, and claude-cli Opus missed the 240s Opus floor — even
though claude-cli serves the same Anthropic Claude models.

Recognize claude-cli as Anthropic-family via a small `isAnthropicFamilyModel`
helper, mirroring the existing `provider === "anthropic" || provider === "claude-cli"`
precedent in the aimock and mock-openai servers.

Co-authored-by: clawSean <260045960+clawSean@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 08:47:33 -07:00
llagy009
4b9e01813e fix(msteams): keep truncated parent context text well-formed (#96569)
summarizeParentMessage truncated the parent body with text.slice(0, PARENT_TEXT_MAX_CHARS - 1), which can cut a surrogate pair in half and emit a lone surrogate into the injected "Replying to @sender: …" system event. Use the shared truncateUtf16Safe helper so truncation falls back to a whole code-point boundary. Add a regression asserting the summary stays isWellFormed() when the limit lands inside an emoji.
2026-06-27 08:47:27 -07:00
llagy009
7830faa5fe fix(whatsapp): convert GFM bold-italic without leaving literal asterisks (#96570)
markdownToWhatsApp only handled **bold** and __bold__, so combined GFM
strong+emphasis such as ***bi***, __*y*__ or **_x_** was reduced by the
plain bold rule first and left a literal ** around the inner emphasis
(e.g. ***bi*** -> **bi**), which WhatsApp renders as plain characters.

Handle the combined strong+emphasis variants before the plain strong
rules and emit WhatsApp bold+italic (*_text_*). Plain bold, italic,
strikethrough and code-span handling are unchanged.

Adds it.each cases for the GFM bold-italic variants and a regression
case ensuring bold-italic markers inside inline code are preserved.
2026-06-27 08:46:45 -07:00
Masato Hoshino
ddedf13190 fix(irc): sanitize internal tool-trace lines from outbound text (#97214)
* fix(irc): sanitize internal tool-trace lines from outbound text

* fix(irc): sanitize internal tool-trace lines from outbound text

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-06-27 08:34:38 -07:00
Gio Della-Libera
cb4244fe15 Doctor: expose state integrity findings (#95979)
Merged via squash.

Prepared head SHA: eb3bd1adad
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Reviewed-by: @giodl73-repo
2026-06-27 08:31:56 -07:00
zw-xysk
361869e434 fix(validation): preserve null in anyOf unions instead of coercing to empty string (fixes #96716) (#97212)
* fix(validation): preserve null in anyOf unions instead of coercing to empty string

Fixes #96716

* fix(validation): preserve null in anyOf unions instead of coercing to empty string

* fix(validation): preserve null in anyOf unions instead of coercing to empty string

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-06-27 08:25:42 -07:00
Gio Della-Libera
4010b81a77 Refactor external plugin catalog toward feeds (#95846)
Merged via squash.

Prepared head SHA: 82d05bdc46
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Reviewed-by: @giodl73-repo
2026-06-27 07:41:40 -07:00
xingzhou
8fa24325b5 fix(nostr): bound seen tracker timer options (#97133) 2026-06-27 07:40:36 -07:00
mushuiyu886
f4fa10c2c5 fix(clickclack): bound REST success JSON response reads (#96970)
* fix(clickclack): bound REST success JSON response reads

* test(clickclack): harden response cap proof

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-06-27 07:30:32 -07:00
cornna
2100ee7cc8 fix(telegram): avoid duplicate dm chat window context (#89855)
Co-authored-by: Cornna <96944678+ymylive@users.noreply.github.com>
2026-06-27 07:29:57 -07:00
mushuiyu886
6e8f30c0e2 fix(qqbot): bound STT transcription JSON response (#96968) 2026-06-27 07:25:29 -07:00
ooiuuii
9d800b71c0 fix(scripts): route i18n formatter through pnpm runner (#95534) 2026-06-27 07:13:04 -07:00
mushuiyu886
5ccfc97b31 fix(google): bound TTS success JSON response reads (#96984) 2026-06-27 07:00:36 -07:00
mushuiyu886
a7bfc06f45 fix(google-media): bound JSON response reads (#96920)
* fix(google-media): bound JSON response reads

* test(google): relax media response cap assertion

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-06-27 06:46:58 -07:00
Kevin Lin
c5d34c8376 feat(codex): add always plugin approval mode (#97123)
* feat(codex): add always plugin approval mode

* fix(codex): normalize plugin approval decisions

* fix(codex): fail closed on layered approval overrides
2026-06-27 01:19:00 -07:00
joshavant
fbfadbd806 test: relax local TUI PTY startup wait 2026-06-26 23:08:14 -05:00
Josh Avant
6f1076351c fix: defer active implicit session rollover (#97164) 2026-06-26 22:42:45 -05:00
joshavant
898ca9741c test(trajectory): cover truncated usage preservation 2026-06-26 22:34:55 -05:00
lin-hongkuan
67118d5ab9 fix(trajectory): preserve codex completion usage 2026-06-26 22:34:55 -05:00
lin-hongkuan
bf2a8ecfdb fix(trajectory): preserve usage in truncated events 2026-06-26 22:34:55 -05:00
Josh Avant
cee2aca409 Scope agent cron operations to the calling agent (#96883)
* Scope agent cron operations to caller

* Scope OpenClaw tools MCP cron by session

* Address cron scope review feedback

* Preserve unscoped cron update retargeting

* Move cron caller identity into gateway context

* Clarify Gateway restart guidance

* Add cron caller identity regression proof
2026-06-26 21:41:14 -05:00
Peter Steinberger
56259606d1 fix(agent-core): ignore truncated tool calls (#97140)
* fix(agent-core): ignore truncated tool calls

Co-authored-by: Galin Iliev <5711535+galiniliev@users.noreply.github.com>

* fix(agent-core): require explicit tool-call terminals

---------

Co-authored-by: Galin Iliev <5711535+galiniliev@users.noreply.github.com>
2026-06-27 03:31:42 +01:00
197 changed files with 9095 additions and 1669 deletions

View File

@@ -316,6 +316,11 @@ conversation bindings, or any non-Codex harness.
plugin/app support for the Codex harness. Default: `false`.
- `plugins.entries.codex.config.codexPlugins.allow_destructive_actions`:
default destructive-action policy for migrated plugin app elicitations.
Use `true` to accept safe Codex approval schemas without prompting, `false`
to decline them, `"auto"` to route Codex-required approvals through OpenClaw
plugin approvals, or `"always"` to ask for every plugin write/destructive
action without durable approval. The `"always"` mode clears durable Codex
per-tool approval overrides for the affected app before starting the thread.
Default: `true`.
- `plugins.entries.codex.config.codexPlugins.plugins.<key>.enabled`: enables a
migrated plugin entry when global `codexPlugins.enabled` is also true.
@@ -326,7 +331,8 @@ conversation bindings, or any non-Codex harness.
Codex plugin identity from migration, for example `"google-calendar"`.
- `plugins.entries.codex.config.codexPlugins.plugins.<key>.allow_destructive_actions`:
per-plugin destructive-action override. When omitted, the global
`allow_destructive_actions` value is used.
`allow_destructive_actions` value is used. The per-plugin value accepts the
same `true`, `false`, `"auto"`, or `"always"` policies.
`codexPlugins.enabled` is the global enablement directive. Explicit plugin
entries written by migration are the durable install and repair eligibility set.

View File

@@ -200,11 +200,11 @@ enabled.
OpenClaw sets app-level `destructive_enabled` from the effective global or
per-plugin `allow_destructive_actions` policy and lets Codex enforce
destructive tool metadata from its native app tool annotations. `true` and
`"auto"` both set `destructive_enabled: true`; `false` sets it false. The
`_default` app config is disabled with `open_world_enabled: false`. Enabled
plugin apps are emitted with `open_world_enabled: true`; OpenClaw does not
expose a separate plugin open-world policy knob and does not maintain
destructive tool metadata from its native app tool annotations. `true`,
`"auto"`, and `"always"` set `destructive_enabled: true`; `false` sets it
false. The `_default` app config is disabled with `open_world_enabled: false`.
Enabled plugin apps are emitted with `open_world_enabled: true`; OpenClaw does
not expose a separate plugin open-world policy knob and does not maintain
per-plugin destructive tool-name deny lists.
Tool approval mode is automatic by default for plugin apps so non-destructive
@@ -225,6 +225,10 @@ plugins, while unsafe schemas and ambiguous ownership still fail closed:
- When policy is `"auto"`, OpenClaw exposes destructive plugin actions to
Codex but turns ownership-proven MCP approval elicitations into OpenClaw
plugin approvals before returning the Codex approval response.
- When policy is `"always"`, OpenClaw uses the same Codex write/destructive
gating as `"auto"`, clears durable Codex per-tool approval overrides for the
app before the thread starts, and only offers one-shot approval or denial so
durable approvals cannot suppress later write-action prompts.
- Missing plugin identity, ambiguous ownership, a missing turn id, a wrong turn
id, or an unsafe elicitation schema declines instead of prompting.
@@ -272,8 +276,9 @@ Codex thread bindings keep the app config they started with until OpenClaw
establishes a new harness session or replaces a stale binding.
**Destructive action is declined:** check the global and per-plugin
`allow_destructive_actions` values. Even when policy is true or `"auto"`,
unsafe elicitation schemas and ambiguous plugin identity still fail closed.
`allow_destructive_actions` values. Even when policy is true, `"auto"`, or
`"always"`, unsafe elicitation schemas and ambiguous plugin identity still fail
closed.
## Related

View File

@@ -29,10 +29,11 @@ Use the path that matches your OpenClaw install state:
openclaw onboard --install-daemon
```
On a VPS or over SSH, use device-code during onboarding:
On a VPS or over SSH, select xAI OAuth directly; OpenClaw uses device-code
verification and does not require a localhost callback:
```bash
openclaw onboard --install-daemon --auth-choice xai-device-code
openclaw onboard --install-daemon --auth-choice xai-oauth
```
OAuth does not require an xAI API key. OpenClaw does not require the Grok
@@ -48,13 +49,6 @@ Use the path that matches your OpenClaw install state:
openclaw models auth login --provider xai --method oauth
```
Use the device-code flow instead when the Gateway runs over SSH, Docker, or
a VPS and a localhost browser callback is awkward:
```bash
openclaw models auth login --provider xai --device-code
```
To make Grok the default model after signing in, apply it separately:
```bash
@@ -86,8 +80,7 @@ Use the path that matches your OpenClaw install state:
<Note>
OpenClaw uses the xAI Responses API as the bundled xAI transport. The same
credential from `openclaw models auth login --provider xai --method oauth`,
`openclaw models auth login --provider xai --device-code`, or
credential from `openclaw models auth login --provider xai --method oauth` or
`openclaw models auth login --provider xai --method api-key` can also power first-class
`web_search`, `x_search`, remote `code_execution`, and xAI image/video generation.
Speech and transcription currently require `XAI_API_KEY` or provider config.
@@ -102,8 +95,9 @@ and, by default, `x_search` through an operator xAI Responses proxy.
## OAuth troubleshooting
- If browser OAuth cannot reach `127.0.0.1:56121`, use
`openclaw models auth login --provider xai --device-code`.
- For SSH, Docker, VPS, or other remote setups, use
`openclaw models auth login --provider xai --method oauth`; xAI OAuth uses
device-code verification instead of a localhost callback.
- If sign-in succeeds but Grok is not the default model, run
`openclaw models set xai/grok-4.3`.
- To inspect saved xAI auth profiles, run:
@@ -117,9 +111,9 @@ and, by default, `x_search` through an operator xAI Responses proxy.
eligible, try the API-key path or check the subscription on xAI's side.
<Tip>
Use `xai-device-code` when signing in from SSH, Docker, or a VPS. OpenClaw
prints an xAI URL and short code; finish sign-in in any local browser while the
remote process polls xAI for the completed token exchange.
Use `xai-oauth` when signing in from SSH, Docker, or a VPS. OpenClaw prints an
xAI URL and short code; finish sign-in in any local browser while the remote
process polls xAI for the completed token exchange.
</Tip>
## Built-in catalog
@@ -498,12 +492,10 @@ Legacy aliases still normalize to the canonical bundled ids:
<Accordion title="Known limits">
- xAI auth can use an API key, environment variable, plugin config fallback,
browser OAuth, or device-code OAuth with an eligible xAI account. Browser
OAuth uses a local callback on `127.0.0.1:56121`; for remote hosts, use
`xai-device-code` unless you want to forward that port before opening the
sign-in URL. xAI decides which accounts can receive OAuth API tokens, and
the consent page may show Grok Build even though OpenClaw does not require
the Grok Build app.
or OAuth with an eligible xAI account. OAuth uses device-code verification
without a localhost callback. xAI decides which accounts can receive OAuth
API tokens, and the consent page may show Grok Build even though OpenClaw
does not require the Grok Build app.
- OpenClaw does not currently expose the xAI multi-agent model family. xAI
serves these models through the Responses API, but they do not accept the
client-side or custom tools used by OpenClaw's shared agent loop. See the

View File

@@ -38,13 +38,13 @@ Do **not** use it when you need local files, your shell, your repo, or paired de
<Steps>
<Step title="Provide xAI credentials">
Sign in with Grok OAuth using an eligible SuperGrok or X Premium subscription,
use the remote-friendly device-code flow, or store an API key. OAuth works
for `code_execution` and `x_search`; `XAI_API_KEY` or plugin web-search
config can also power Grok `web_search`.
or store an API key. xAI OAuth uses device-code verification, so it works
from remote hosts without a localhost callback. OAuth works for
`code_execution` and `x_search`; `XAI_API_KEY` or plugin web-search config
can also power Grok `web_search`.
```bash
openclaw models auth login --provider xai --method oauth
openclaw models auth login --provider xai --device-code
```
During a fresh install, the same auth choices are available inside
@@ -52,7 +52,7 @@ Do **not** use it when you need local files, your shell, your repo, or paired de
```bash
openclaw onboard --install-daemon
openclaw onboard --install-daemon --auth-choice xai-device-code
openclaw onboard --install-daemon --auth-choice xai-oauth
```
Or use an API key:

View File

@@ -192,6 +192,109 @@ describe("AcpxRuntime fresh reset wrapper", () => {
);
});
it("adds the OpenClaw session key to the managed OpenClaw tools MCP bridge", () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
save: vi.fn(async () => {}),
};
const { runtime } = makeRuntime(baseStore, {
openclawToolsMcpBridgeEnabled: true,
mcpServers: [
{
name: "openclaw-tools",
command: "node",
args: ["dist/mcp/openclaw-tools-serve.js"],
env: [],
},
],
});
const readScopedMcpEnv = (sessionKey: string) => {
const delegate = (
runtime as unknown as {
resolveOpenClawToolsDelegateForSession(sessionKey: string): unknown;
}
).resolveOpenClawToolsDelegateForSession(sessionKey) as {
options: {
mcpServers?: Array<{
env?: Array<{ name: string; value: string }>;
name: string;
}>;
};
};
return delegate.options.mcpServers?.find((server) => server.name === "openclaw-tools")?.env;
};
expect(readScopedMcpEnv("agent:worker:main")).toContainEqual({
name: "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY",
value: "agent:worker:main",
});
expect(readScopedMcpEnv("agent:research:main")).toContainEqual({
name: "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY",
value: "agent:research:main",
});
});
it("keeps managed OpenClaw tools MCP delegates reachable for fresh sessions", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
save: vi.fn(async () => {}),
};
const { runtime } = makeRuntime(baseStore, {
openclawToolsMcpBridgeEnabled: true,
mcpServers: [
{
name: "openclaw-tools",
command: "node",
args: ["dist/mcp/openclaw-tools-serve.js"],
env: [],
},
],
});
const exposedRuntime = runtime as unknown as {
openclawToolsSessionDelegates: Map<string, unknown>;
resolveOpenClawToolsDelegateForSession(sessionKey: string): unknown;
};
const firstDelegate =
exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:worker:main");
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:worker:main")).toBe(true);
await runtime.prepareFreshSession({ sessionKey: "agent:worker:main" });
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:worker:main")).toBe(true);
expect(exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:worker:main")).toBe(
firstDelegate,
);
});
it("uses the no-MCP delegate for startup probes when the OpenClaw tools bridge is enabled", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
save: vi.fn(async () => {}),
};
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
openclawToolsMcpBridgeEnabled: true,
mcpServers: [
{
name: "openclaw-tools",
command: "node",
args: ["dist/mcp/openclaw-tools-serve.js"],
env: [],
},
],
});
const defaultProbe = vi.spyOn(delegate, "probeAvailability").mockResolvedValue(undefined);
const safeProbe = vi
.spyOn(bridgeSafeDelegate, "probeAvailability")
.mockResolvedValue(undefined);
await runtime.probeAvailability();
expect(safeProbe).toHaveBeenCalledTimes(1);
expect(defaultProbe).not.toHaveBeenCalled();
});
it("normalizes OpenClaw Codex model ids for ACP startup", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
@@ -1163,6 +1266,46 @@ describe("AcpxRuntime fresh reset wrapper", () => {
expect(baseStore["load"]).toHaveBeenCalledOnce();
});
it("releases managed OpenClaw tools MCP delegates after close", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
save: vi.fn(async () => {}),
};
const { runtime } = makeRuntime(baseStore, {
openclawToolsMcpBridgeEnabled: true,
mcpServers: [
{
name: "openclaw-tools",
command: "node",
args: ["dist/mcp/openclaw-tools-serve.js"],
env: [],
},
],
});
const exposedRuntime = runtime as unknown as {
openclawToolsSessionDelegates: Map<string, { close: AcpRuntime["close"] }>;
resolveOpenClawToolsDelegateForSession(sessionKey: string): {
close: AcpRuntime["close"];
};
};
const scopedDelegate =
exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:codex:main");
const close = vi.spyOn(scopedDelegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
sessionKey: "agent:codex:main",
backend: "acpx",
runtimeSessionName: "agent:codex:main",
},
reason: "closed",
});
expect(close).toHaveBeenCalledOnce();
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:codex:main")).toBe(false);
});
it("cleans up OpenClaw-owned ACPX process trees after close", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({

View File

@@ -50,6 +50,7 @@ type OpenClawAcpxRuntimeOptions = AcpRuntimeOptions & {
openclawWrapperRoot?: string;
openclawGatewayInstanceId?: string;
openclawProcessLeaseStore?: AcpxProcessLeaseStore;
openclawToolsMcpBridgeEnabled?: boolean;
};
type AcpxRuntimeTestOptions = Record<string, unknown> & {
openclawProcessCleanup?: AcpxProcessCleanupDeps;
@@ -57,6 +58,10 @@ type AcpxRuntimeTestOptions = Record<string, unknown> & {
type OpenClawRuntimeTurnInput = Parameters<NonNullable<AcpRuntime["startTurn"]>>[0];
type OpenClawRuntimeEnsureInput = Parameters<AcpRuntime["ensureSession"]>[0];
type AcpxDelegateEnsureInput = Parameters<BaseAcpxRuntime["ensureSession"]>[0];
type AcpxMcpServer = NonNullable<AcpRuntimeOptions["mcpServers"]>[number];
const ACPX_OPENCLAW_TOOLS_MCP_SERVER_NAME = "openclaw-tools";
const OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV = "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY";
type ResetAwareSessionStore = AcpSessionStore & {
markFresh: (sessionKey: string) => void;
@@ -682,6 +687,33 @@ function shouldUseDistinctBridgeDelegate(options: AcpRuntimeOptions): boolean {
return Array.isArray(mcpServers) && mcpServers.length > 0;
}
function withOpenClawToolsMcpSessionEnv(params: {
enabled: boolean | undefined;
mcpServers: AcpRuntimeOptions["mcpServers"];
sessionKey: string;
}): AcpRuntimeOptions["mcpServers"] {
const sessionKey = params.sessionKey.trim();
if (!params.enabled || !sessionKey || !params.mcpServers?.length) {
return params.mcpServers;
}
let changed = false;
const nextServers = params.mcpServers.map((server): AcpxMcpServer => {
if (server.name !== ACPX_OPENCLAW_TOOLS_MCP_SERVER_NAME || !("command" in server)) {
return server;
}
changed = true;
const env = [
...server.env.filter((entry) => entry.name !== OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV),
{
name: OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV,
value: sessionKey,
},
];
return { ...server, env };
});
return changed ? nextServers : params.mcpServers;
}
/** OpenClaw-managed ACP runtime implementation backed by the upstream acpx runtime. */
export class AcpxRuntime implements AcpRuntime {
private readonly sessionStore: ResetAwareSessionStore;
@@ -693,6 +725,10 @@ export class AcpxRuntime implements AcpRuntime {
private readonly delegate: BaseAcpxRuntime;
private readonly bridgeSafeDelegate: BaseAcpxRuntime;
private readonly probeDelegate: BaseAcpxRuntime;
private readonly delegateOptions: AcpRuntimeOptions;
private readonly delegateTestOptions: BaseAcpxRuntimeTestOptions;
private readonly openclawToolsMcpBridgeEnabled: boolean;
private readonly openclawToolsSessionDelegates = new Map<string, BaseAcpxRuntime>();
private readonly processCleanupDeps: AcpxProcessCleanupDeps | undefined;
private readonly wrapperRoot: string | undefined;
private readonly gatewayInstanceId: string | undefined;
@@ -706,6 +742,7 @@ export class AcpxRuntime implements AcpRuntime {
this.wrapperRoot = options.openclawWrapperRoot;
this.gatewayInstanceId = options.openclawGatewayInstanceId;
this.processLeaseStore = options.openclawProcessLeaseStore;
this.openclawToolsMcpBridgeEnabled = options.openclawToolsMcpBridgeEnabled === true;
this.cwd = options.cwd;
this.sessionStore = createResetAwareSessionStore(options.sessionStore, {
gatewayInstanceId: this.gatewayInstanceId,
@@ -723,20 +760,21 @@ export class AcpxRuntime implements AcpRuntime {
sessionStore: this.sessionStore,
agentRegistry: this.scopedAgentRegistry,
};
this.delegate = new BaseAcpxRuntime(
sharedOptions,
delegateTestOptions as BaseAcpxRuntimeTestOptions,
);
this.delegateOptions = sharedOptions;
this.delegateTestOptions = delegateTestOptions as BaseAcpxRuntimeTestOptions;
this.delegate = new BaseAcpxRuntime(sharedOptions, this.delegateTestOptions);
this.bridgeSafeDelegate = shouldUseDistinctBridgeDelegate(options)
? new BaseAcpxRuntime(
{
...sharedOptions,
mcpServers: [],
},
delegateTestOptions as BaseAcpxRuntimeTestOptions,
this.delegateTestOptions,
)
: this.delegate;
this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options));
this.probeDelegate = this.openclawToolsMcpBridgeEnabled
? this.bridgeSafeDelegate
: this.resolveDelegateForAgent(resolveProbeAgentName(options));
}
private resolveDelegateForAgent(agentName: string | undefined): BaseAcpxRuntime {
@@ -751,6 +789,57 @@ export class AcpxRuntime implements AcpRuntime {
return shouldUseBridgeSafeDelegateForCommand(command) ? this.bridgeSafeDelegate : this.delegate;
}
private resolveDelegateForSession(params: {
command: string | undefined;
sessionKey: string;
}): BaseAcpxRuntime {
if (shouldUseBridgeSafeDelegateForCommand(params.command)) {
return this.bridgeSafeDelegate;
}
return this.resolveOpenClawToolsDelegateForSession(params.sessionKey);
}
private resolveOpenClawToolsDelegateForSession(sessionKey: string): BaseAcpxRuntime {
if (!this.openclawToolsMcpBridgeEnabled) {
return this.delegate;
}
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) {
return this.delegate;
}
const cached = this.openclawToolsSessionDelegates.get(normalizedSessionKey);
if (cached) {
return cached;
}
// Upstream acpx captures mcpServers at runtime construction. The managed
// OpenClaw tools bridge needs per-session identity, so cache one delegate
// per session with the scoped MCP env already embedded.
const delegate = new BaseAcpxRuntime(
{
...this.delegateOptions,
mcpServers: withOpenClawToolsMcpSessionEnv({
enabled: this.openclawToolsMcpBridgeEnabled,
mcpServers: this.delegateOptions.mcpServers,
sessionKey: normalizedSessionKey,
}),
},
this.delegateTestOptions,
);
this.openclawToolsSessionDelegates.set(normalizedSessionKey, delegate);
return delegate;
}
private releaseOpenClawToolsDelegateForSession(sessionKey: string): void {
if (!this.openclawToolsMcpBridgeEnabled) {
return;
}
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) {
return;
}
this.openclawToolsSessionDelegates.delete(normalizedSessionKey);
}
private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise<BaseAcpxRuntime> {
const record = await this.sessionStore.load(handle.acpxRecordId ?? handle.sessionKey);
return this.resolveDelegateForLoadedRecord(handle, record);
@@ -762,9 +851,17 @@ export class AcpxRuntime implements AcpRuntime {
): BaseAcpxRuntime {
const recordCommand = readAgentCommandFromRecord(record);
if (recordCommand) {
return this.resolveDelegateForCommand(recordCommand);
return this.resolveDelegateForSession({
command: recordCommand,
sessionKey: handle.sessionKey,
});
}
return this.resolveDelegateForAgent(readAgentFromHandle(handle));
const agentName = readAgentFromHandle(handle);
const command = resolveAgentCommandForName({
agentName,
agentRegistry: this.agentRegistry,
});
return this.resolveDelegateForSession({ command, sessionKey: handle.sessionKey });
}
private async resolveCommandForHandle(handle: AcpRuntimeHandle): Promise<string | undefined> {
@@ -980,7 +1077,7 @@ export class AcpxRuntime implements AcpRuntime {
agentName: input.agent,
agentRegistry: this.agentRegistry,
});
const delegate = this.resolveDelegateForCommand(command);
const delegate = this.resolveDelegateForSession({ command, sessionKey: input.sessionKey });
const claudeModelOverride = isClaudeAcpCommand(command)
? normalizeClaudeAcpModelOverride(input.model)
: undefined;
@@ -1264,6 +1361,9 @@ export class AcpxRuntime implements AcpRuntime {
}
async prepareFreshSession(input: { sessionKey: string }): Promise<void> {
// Fresh reset has no ACP handle to close the delegate's upstream client.
// Keep the scoped delegate reachable so the next ensure can replace it;
// close() owns cache release when the session lifecycle ends.
this.sessionStore.markFresh(input.sessionKey);
}
@@ -1272,8 +1372,9 @@ export class AcpxRuntime implements AcpRuntime {
input.handle.acpxRecordId ?? input.handle.sessionKey,
);
let closeSucceeded;
const delegate = this.resolveDelegateForLoadedRecord(input.handle, record);
try {
await this.resolveDelegateForLoadedRecord(input.handle, record).close({
await delegate.close({
handle: input.handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
@@ -1282,6 +1383,9 @@ export class AcpxRuntime implements AcpRuntime {
} finally {
await this.cleanupProcessTreeForRecord(input.handle, record);
}
if (closeSucceeded) {
this.releaseOpenClawToolsDelegateForSession(input.handle.sessionKey);
}
if (closeSucceeded && input.discardPersistentState) {
this.sessionStore.markFresh(input.handle.sessionKey);
}

View File

@@ -111,6 +111,7 @@ function createLazyDefaultRuntime(params: AcpxRuntimeFactoryParams): AcpxRuntime
}),
probeAgent: params.pluginConfig.probeAgent,
mcpServers: toAcpMcpServers(params.pluginConfig.mcpServers),
openclawToolsMcpBridgeEnabled: params.pluginConfig.openClawToolsMcpBridge,
permissionMode: params.pluginConfig.permissionMode,
nonInteractivePermissions: params.pluginConfig.nonInteractivePermissions,
timeoutMs: resolveAcpxTimerTimeoutMs(params.pluginConfig.timeoutSeconds),

View File

@@ -1,6 +1,81 @@
import { createServer, type Server } from "node:http";
import { describe, expect, it, vi } from "vitest";
import { createClickClackClient } from "./http-client.js";
const LOOPBACK_RESPONSE_BYTES = 18 * 1024 * 1024;
async function listenLoopbackServer(server: Server): Promise<number> {
return await new Promise((resolve, reject) => {
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
server.off("error", reject);
const address = server.address();
if (!address || typeof address === "string") {
reject(new Error("expected loopback TCP address"));
return;
}
resolve(address.port);
});
});
}
function createOversizedJsonServer(): { server: Server; closed: Promise<number> } {
let resolveClosed: (sentBytes: number) => void = () => {};
const closed = new Promise<number>((resolve) => {
resolveClosed = resolve;
});
const server = createServer((req, res) => {
let sentBytes = 0;
let stopped = false;
let prefixSent = false;
const prefixChunk = Buffer.from('{"user":{"id":"');
const bodyChunk = Buffer.alloc(64 * 1024, 0x61);
const suffixChunk = Buffer.from('"}}');
const writeBuffer = (buffer: Buffer) => {
sentBytes += buffer.length;
if (!res.write(buffer)) {
res.once("drain", writeChunks);
return false;
}
return true;
};
const writeChunks = () => {
if (!prefixSent) {
prefixSent = true;
if (!writeBuffer(prefixChunk)) {
return;
}
}
while (true) {
if (stopped) {
return;
}
if (sentBytes + bodyChunk.length + suffixChunk.length >= LOOPBACK_RESPONSE_BYTES) {
break;
}
if (!writeBuffer(bodyChunk)) {
return;
}
}
if (!stopped) {
sentBytes += suffixChunk.length;
res.end(suffixChunk);
}
};
res.writeHead(200, { connection: "close", "content-type": "application/json" });
res.on("close", () => {
stopped = true;
resolveClosed(sentBytes);
});
req.on("aborted", () => {
stopped = true;
res.destroy();
});
writeChunks();
});
return { server, closed };
}
function streamedErrorResponse(body: string, limit: number) {
const encoded = new TextEncoder().encode(body);
let readCount = 0;
@@ -39,6 +114,25 @@ function streamedErrorResponse(body: string, limit: number) {
}
describe("ClickClack HTTP client", () => {
it("bounds oversized success JSON responses and closes the stream early", async () => {
const { server, closed } = createOversizedJsonServer();
const port = await listenLoopbackServer(server);
const client = createClickClackClient({
baseUrl: `http://127.0.0.1:${port}`,
token: "test-token",
});
try {
await expect(client.me()).rejects.toThrow(
"ClickClack response: JSON response exceeds 16777216 bytes",
);
const sentBytes = await closed;
expect(sentBytes).toBeLessThan(LOOPBACK_RESPONSE_BYTES);
} finally {
server.close();
}
});
it("bounds error response bodies without using raw response.text()", async () => {
const streamed = streamedErrorResponse("x".repeat(9000), 8 * 1024);
const fetchMock = vi.fn(async () => streamed.response);

View File

@@ -2,7 +2,10 @@
* Thin ClickClack REST/websocket client used by gateway, resolver, and outbound
* delivery code.
*/
import { readResponseTextLimited } from "openclaw/plugin-sdk/provider-http";
import {
readProviderJsonResponse,
readResponseTextLimited,
} from "openclaw/plugin-sdk/provider-http";
import { WebSocket } from "ws";
import type {
ClickClackChannel,
@@ -44,7 +47,7 @@ export function createClickClackClient(options: ClientOptions) {
const detail = await readResponseTextLimited(response, CLICKCLACK_ERROR_BODY_LIMIT_BYTES);
throw new Error(`ClickClack ${response.status}: ${detail}`);
}
return (await response.json()) as T;
return await readProviderJsonResponse<T>(response, "ClickClack response");
}
return {

View File

@@ -36,6 +36,14 @@ describe("codex doctor contract", () => {
},
}),
).toBe(false);
expect(
legacyConfigRules[1]?.match({
allow_destructive_actions: "always",
plugins: {
"google-calendar": { allow_destructive_actions: "always" },
},
}),
).toBe(false);
});
it("removes the retired dynamic tools profile without dropping other Codex config", () => {

View File

@@ -101,7 +101,7 @@
"default": false
},
"allow_destructive_actions": {
"oneOf": [{ "type": "boolean" }, { "const": "auto" }],
"oneOf": [{ "type": "boolean" }, { "const": "auto" }, { "const": "always" }],
"default": true
},
"plugins": {
@@ -121,7 +121,7 @@
"type": "string"
},
"allow_destructive_actions": {
"oneOf": [{ "type": "boolean" }, { "const": "auto" }]
"oneOf": [{ "type": "boolean" }, { "const": "auto" }, { "const": "always" }]
}
}
}
@@ -343,7 +343,7 @@
},
"codexPlugins.allow_destructive_actions": {
"label": "Allow Destructive Plugin Actions",
"help": "Default policy for plugin app write or destructive action elicitations. Use true to accept safe schemas without prompting, false to decline, or auto to ask through plugin approvals.",
"help": "Default policy for plugin app write or destructive action elicitations. Use true to accept safe schemas without prompting, false to decline, auto to ask through plugin approvals when Codex requires approval, or always to ask for every write/destructive action without durable approval.",
"advanced": true
},
"codexPlugins.plugins": {

View File

@@ -346,6 +346,7 @@ export async function startCodexAttemptThread(params: {
timeoutMs: params.appServer.requestTimeoutMs,
signal,
}),
configCwd: startupExecutionCwd,
appCache: defaultCodexAppInventoryCache,
appCacheKey: pluginAppCacheKey,
}),

View File

@@ -1192,6 +1192,52 @@ allowed_sandbox_modes = ["read-only", "workspace-write"]
});
});
it("parses always native Codex plugin destructive policy", () => {
const config = readCodexPluginConfig({
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
marketplaceName: "openai-curated",
pluginName: "google-calendar",
},
slack: {
marketplaceName: "openai-curated",
pluginName: "slack",
allow_destructive_actions: "auto",
},
},
},
});
expect(config.codexPlugins?.allow_destructive_actions).toBe("always");
expect(resolveCodexPluginsPolicy(config)).toEqual({
configured: true,
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "always",
pluginPolicies: [
{
configKey: "google-calendar",
marketplaceName: "openai-curated",
pluginName: "google-calendar",
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "always",
},
{
configKey: "slack",
marketplaceName: "openai-curated",
pluginName: "slack",
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "auto",
},
],
});
});
it("rejects unsupported native Codex plugin destructive policy strings", () => {
const config = readCodexPluginConfig({
codexPlugins: {

View File

@@ -74,8 +74,8 @@ export type CodexAppServerSandboxMode = "read-only" | "workspace-write" | "dange
type CodexAppServerApprovalsReviewer = "user" | "auto_review" | "guardian_subagent";
type CodexAppServerCommandSource = "managed" | "resolved-managed" | "config" | "env";
export type CodexDynamicToolsLoading = "searchable" | "direct";
export type CodexPluginDestructivePolicy = boolean | "auto";
export type CodexPluginDestructiveApprovalMode = "allow" | "deny" | "auto";
export type CodexPluginDestructivePolicy = boolean | "auto" | "always";
export type CodexPluginDestructiveApprovalMode = "allow" | "deny" | "auto" | "always";
export const CODEX_PLUGINS_MARKETPLACE_NAME = "openai-curated";
@@ -311,7 +311,11 @@ const codexAppServerApprovalPolicySchema = z.enum([
const codexAppServerSandboxSchema = z.enum(["read-only", "workspace-write", "danger-full-access"]);
const codexAppServerApprovalsReviewerSchema = z.enum(["user", "auto_review", "guardian_subagent"]);
const codexDynamicToolsLoadingSchema = z.enum(["searchable", "direct"]);
const codexPluginDestructivePolicySchema = z.union([z.boolean(), z.literal("auto")]);
const codexPluginDestructivePolicySchema = z.union([
z.boolean(),
z.literal("auto"),
z.literal("always"),
]);
const codexAppServerServiceTierSchema = z
.preprocess(
(value) => (value === null ? null : normalizeCodexServiceTier(value)),
@@ -495,8 +499,8 @@ function resolveCodexPluginDestructivePolicy(policy: CodexPluginDestructivePolic
allowDestructiveActions: boolean;
destructiveApprovalMode: CodexPluginDestructiveApprovalMode;
} {
if (policy === "auto") {
return { allowDestructiveActions: true, destructiveApprovalMode: "auto" };
if (policy === "auto" || policy === "always") {
return { allowDestructiveActions: true, destructiveApprovalMode: policy };
}
return {
allowDestructiveActions: policy,

View File

@@ -157,7 +157,7 @@ function buildConnectorPluginApprovalElicitation(overrides: Record<string, unkno
function createPluginAppPolicyContext(
params: {
allowDestructiveActions?: boolean;
destructiveApprovalMode?: "allow" | "deny" | "auto";
destructiveApprovalMode?: "allow" | "deny" | "auto" | "always";
apps?: Array<{ appId: string; pluginName: string; mcpServerNames: string[] }>;
} = {},
) {
@@ -1017,6 +1017,96 @@ describe("Codex app-server elicitation bridge", () => {
});
});
it("does not expose allow-always for always plugin policy", async () => {
mockCallGatewayTool
.mockResolvedValueOnce({ id: "plugin:approval-calendar-always-policy", status: "accepted" })
.mockResolvedValueOnce({
id: "plugin:approval-calendar-always-policy",
decision: "allow-once",
});
const result = await handleCodexAppServerElicitationRequest({
requestParams: buildConnectorPluginApprovalElicitation({
_meta: {
codex_approval_kind: "mcp_tool_call",
source: "connector",
connector_id: "connector_google_calendar",
connector_name: "Google Calendar",
persist: ["session", "always"],
tool_title: "create_event",
},
}),
paramsForRun: createParams(),
threadId: "thread-1",
turnId: "turn-1",
pluginAppPolicyContext: createPluginAppPolicyContext({
allowDestructiveActions: true,
destructiveApprovalMode: "always",
apps: [
{
appId: "connector_google_calendar",
pluginName: "google-calendar",
mcpServerNames: [],
},
],
}),
});
expect(result).toEqual({
action: "accept",
content: null,
_meta: null,
});
expect(gatewayToolArg(0, 2)).toMatchObject({
allowedDecisions: ["allow-once", "deny"],
});
});
it("maps unexpected allow-always decisions to one-shot for always plugin policy", async () => {
mockCallGatewayTool
.mockResolvedValueOnce({
id: "plugin:approval-calendar-unexpected-always",
status: "accepted",
})
.mockResolvedValueOnce({
id: "plugin:approval-calendar-unexpected-always",
decision: "allow-always",
});
const result = await handleCodexAppServerElicitationRequest({
requestParams: buildConnectorPluginApprovalElicitation({
_meta: {
codex_approval_kind: "mcp_tool_call",
source: "connector",
connector_id: "connector_google_calendar",
connector_name: "Google Calendar",
persist: ["session", "always"],
tool_title: "create_event",
},
}),
paramsForRun: createParams(),
threadId: "thread-1",
turnId: "turn-1",
pluginAppPolicyContext: createPluginAppPolicyContext({
allowDestructiveActions: true,
destructiveApprovalMode: "always",
apps: [
{
appId: "connector_google_calendar",
pluginName: "google-calendar",
mcpServerNames: [],
},
],
}),
});
expect(result).toEqual({
action: "accept",
content: null,
_meta: null,
});
});
it("declines denied auto plugin app approvals", async () => {
mockCallGatewayTool
.mockResolvedValueOnce({ id: "plugin:approval-calendar-deny", status: "accepted" })

View File

@@ -318,10 +318,13 @@ async function buildPluginPolicyElicitationResponse(params: {
paramsForRun: params.paramsForRun,
title: approvalPrompt.title,
description: approvalPrompt.description,
allowedDecisions: approvalPrompt.allowedDecisions,
allowedDecisions: allowedPluginPolicyApprovalDecisions(mode, approvalPrompt),
signal: params.signal,
});
return buildElicitationResponse(approvalPrompt, outcome);
return buildElicitationResponse(
approvalPrompt,
oneShotPluginPolicyApprovalOutcome(mode, outcome),
);
}
logPluginElicitationDecline("unmappable_schema", params.requestParams);
return declineElicitationResponse();
@@ -329,10 +332,28 @@ async function buildPluginPolicyElicitationResponse(params: {
function resolvePluginDestructiveApprovalMode(
entry: PluginAppPolicyContextEntry,
): "allow" | "deny" | "auto" {
): "allow" | "deny" | "auto" | "always" {
return entry.destructiveApprovalMode ?? (entry.allowDestructiveActions ? "allow" : "deny");
}
function allowedPluginPolicyApprovalDecisions(
mode: "allow" | "deny" | "auto" | "always",
approvalPrompt: BridgeableApprovalElicitation,
): ExecApprovalDecision[] {
const allowedDecisions = approvalPrompt.allowedDecisions ?? ["allow-once", "deny"];
if (mode !== "always") {
return allowedDecisions;
}
return allowedDecisions.filter((decision) => decision !== "allow-always");
}
function oneShotPluginPolicyApprovalOutcome(
mode: "allow" | "deny" | "auto" | "always",
outcome: AppServerApprovalOutcome,
): AppServerApprovalOutcome {
return mode === "always" && outcome === "approved-session" ? "approved-once" : outcome;
}
function readPluginApprovalElicitation(
entry: PluginAppPolicyContextEntry,
requestParams: JsonObject,

View File

@@ -170,6 +170,379 @@ describe("Codex plugin thread config", () => {
});
});
it("exposes destructive app access while clearing only durable approval overrides for always mode", async () => {
const appCache = new CodexAppInventoryCache();
await appCache.refreshNow({
key: "runtime",
nowMs: 0,
request: async () => ({
data: [appInfo("google-calendar-app", true)],
nextCursor: null,
}),
});
let configReadCount = 0;
const request = vi.fn(async (method: string) => {
if (method === "plugin/list") {
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
}
if (method === "plugin/read") {
return pluginDetail(
"google-calendar",
[appSummary("google-calendar-app")],
["google-calendar"],
);
}
if (method === "config/read") {
configReadCount += 1;
if (configReadCount > 1) {
return {
config: {
apps: {
"google-calendar-app": {
tools: {
"calendar/read": {
enabled: false,
},
},
},
},
},
};
}
return {
config: {
apps: {
"google-calendar-app": {
tools: {
"calendar/create": {
approval_mode: "approve",
enabled: false,
},
"calendar/read": {
enabled: false,
},
"calendar/update": {
approvalMode: "approve",
},
},
},
},
},
};
}
if (method === "config/value/write") {
return {};
}
throw new Error(`unexpected request ${method}`);
});
const config = await buildCodexPluginThreadConfig({
pluginConfig: {
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
},
},
},
},
appCache,
appCacheKey: "runtime",
nowMs: 1,
request,
});
const apps = config.configPatch?.apps as Record<string, unknown> | undefined;
expect(apps?.["google-calendar-app"]).toEqual({
enabled: true,
destructive_enabled: true,
open_world_enabled: true,
default_tools_approval_mode: "auto",
});
expect(config.policyContext.apps["google-calendar-app"]).toMatchObject({
allowDestructiveActions: true,
destructiveApprovalMode: "always",
});
expect(request).toHaveBeenCalledWith("config/read", { includeLayers: false });
expect(request.mock.calls.filter(([method]) => method === "config/read")).toHaveLength(2);
expect(request).toHaveBeenCalledWith("config/value/write", {
keyPath: 'apps."google-calendar-app".tools."calendar/create".approval_mode',
value: null,
mergeStrategy: "replace",
});
expect(request).toHaveBeenCalledWith("config/value/write", {
keyPath: 'apps."google-calendar-app".tools."calendar/update".approval_mode',
value: null,
mergeStrategy: "replace",
});
expect(request).not.toHaveBeenCalledWith("config/value/write", {
keyPath: 'apps."google-calendar-app".tools',
value: null,
mergeStrategy: "replace",
});
});
it("omits always policy apps when cwd effective approval overrides remain after cleanup", async () => {
const appCache = new CodexAppInventoryCache();
await appCache.refreshNow({
key: "runtime",
nowMs: 0,
request: async () => ({
data: [appInfo("google-calendar-app", true)],
nextCursor: null,
}),
});
let configReadCount = 0;
const request = vi.fn(async (method: string) => {
if (method === "plugin/list") {
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
}
if (method === "plugin/read") {
return pluginDetail(
"google-calendar",
[appSummary("google-calendar-app")],
["google-calendar"],
);
}
if (method === "config/read") {
configReadCount += 1;
return {
config: {
apps: {
"google-calendar-app": {
tools: {
"calendar/create": {
approval_mode: "approve",
source: configReadCount === 1 ? "user" : "project",
},
},
},
},
},
};
}
if (method === "config/value/write") {
return { status: "ok" };
}
throw new Error(`unexpected request ${method}`);
});
const config = await buildCodexPluginThreadConfig({
pluginConfig: {
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
},
},
},
},
appCache,
appCacheKey: "runtime",
configCwd: "/repo/project",
nowMs: 1,
request,
});
expect(config.configPatch).toEqual({
apps: {
_default: {
enabled: false,
destructive_enabled: false,
open_world_enabled: false,
},
},
});
expect(config.policyContext.apps).toStrictEqual({});
expect(request).toHaveBeenCalledWith("config/read", {
includeLayers: false,
cwd: "/repo/project",
});
expect(request.mock.calls.filter(([method]) => method === "config/read")).toHaveLength(2);
expect(config.diagnostics).toStrictEqual([
{
code: "approval_overrides_clear_failed",
plugin: {
configKey: "google-calendar",
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "always",
},
message:
"Could not clear durable Codex app approval overrides for google-calendar-app: effective approval overrides remain for calendar/create",
},
]);
});
it("omits always policy apps when approval override writes are overridden", async () => {
const appCache = new CodexAppInventoryCache();
await appCache.refreshNow({
key: "runtime",
nowMs: 0,
request: async () => ({
data: [appInfo("google-calendar-app", true)],
nextCursor: null,
}),
});
const request = vi.fn(async (method: string) => {
if (method === "plugin/list") {
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
}
if (method === "plugin/read") {
return pluginDetail(
"google-calendar",
[appSummary("google-calendar-app")],
["google-calendar"],
);
}
if (method === "config/read") {
return {
config: {
apps: {
"google-calendar-app": {
tools: {
"calendar/create": {
approval_mode: "approve",
},
},
},
},
},
};
}
if (method === "config/value/write") {
return { status: "okOverridden" };
}
throw new Error(`unexpected request ${method}`);
});
const config = await buildCodexPluginThreadConfig({
pluginConfig: {
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
},
},
},
},
appCache,
appCacheKey: "runtime",
configCwd: "/repo/project",
nowMs: 1,
request,
});
expect(config.configPatch).toEqual({
apps: {
_default: {
enabled: false,
destructive_enabled: false,
open_world_enabled: false,
},
},
});
expect(config.policyContext.apps).toStrictEqual({});
expect(config.diagnostics).toStrictEqual([
{
code: "approval_overrides_clear_failed",
plugin: {
configKey: "google-calendar",
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "always",
},
message:
"Could not clear durable Codex app approval overrides for google-calendar-app: approval override for calendar/create is controlled by another config layer",
},
]);
});
it("omits always policy apps when durable approval override cleanup fails", async () => {
const appCache = new CodexAppInventoryCache();
await appCache.refreshNow({
key: "runtime",
nowMs: 0,
request: async () => ({
data: [appInfo("google-calendar-app", true)],
nextCursor: null,
}),
});
const config = await buildCodexPluginThreadConfig({
pluginConfig: {
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
},
},
},
},
appCache,
appCacheKey: "runtime",
nowMs: 1,
request: async (method) => {
if (method === "plugin/list") {
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
}
if (method === "plugin/read") {
return pluginDetail(
"google-calendar",
[appSummary("google-calendar-app")],
["google-calendar"],
);
}
if (method === "config/read") {
throw new Error("readonly config");
}
throw new Error(`unexpected request ${method}`);
},
});
expect(config.configPatch).toEqual({
apps: {
_default: {
enabled: false,
destructive_enabled: false,
open_world_enabled: false,
},
},
});
expect(config.policyContext.apps).toStrictEqual({});
expect(config.diagnostics).toStrictEqual([
{
code: "approval_overrides_clear_failed",
plugin: {
configKey: "google-calendar",
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
enabled: true,
allowDestructiveActions: true,
destructiveApprovalMode: "always",
},
message:
"Could not clear durable Codex app approval overrides for google-calendar-app: readonly config",
},
]);
});
it("builds a restrictive app config when native plugin support is disabled", async () => {
expect(
shouldBuildCodexPluginThreadConfig({

View File

@@ -29,7 +29,7 @@ import {
type CodexPluginOwnedApp,
type CodexPluginRuntimeRequest,
} from "./plugin-inventory.js";
import type { JsonObject, JsonValue } from "./protocol.js";
import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js";
/** Policy context for one app id exposed by a configured Codex plugin. */
export type PluginAppPolicyContextEntry = {
@@ -52,7 +52,7 @@ export type PluginAppPolicyContext = {
export type CodexPluginThreadConfigDiagnostic =
| CodexPluginInventoryDiagnostic
| {
code: "plugin_activation_failed" | "app_not_ready";
code: "plugin_activation_failed" | "app_not_ready" | "approval_overrides_clear_failed";
plugin?: ResolvedCodexPluginPolicy;
message: string;
};
@@ -72,6 +72,7 @@ export type CodexPluginThreadConfig = {
export type BuildCodexPluginThreadConfigParams = {
pluginConfig?: unknown;
request: CodexPluginRuntimeRequest;
configCwd?: string;
appCache?: CodexAppInventoryCache;
appCacheKey: string;
nowMs?: number;
@@ -250,6 +251,18 @@ export async function buildCodexPluginThreadConfig(
});
continue;
}
if (
record.policy.destructiveApprovalMode === "always" &&
!(await clearPersistedAppToolApprovalOverrides({
request: params.request,
configCwd: params.configCwd,
plugin: record.policy,
app,
diagnostics,
}))
) {
continue;
}
const appConfig: JsonObject = {
enabled: true,
destructive_enabled: record.policy.allowDestructiveActions,
@@ -367,6 +380,86 @@ function buildPluginAppPolicyContext(
};
}
async function clearPersistedAppToolApprovalOverrides(params: {
request: CodexPluginRuntimeRequest;
configCwd?: string;
plugin: ResolvedCodexPluginPolicy;
app: CodexPluginOwnedApp;
diagnostics: CodexPluginThreadConfigDiagnostic[];
}): Promise<boolean> {
try {
const overrideNames = await readPersistedAppToolApprovalOverrideNames(params);
for (const toolName of overrideNames) {
const response = await params.request("config/value/write", {
keyPath: `apps.${quoteConfigKeyPathSegment(params.app.id)}.tools.${quoteConfigKeyPathSegment(
toolName,
)}.approval_mode`,
value: null,
mergeStrategy: "replace",
});
if (isOverriddenConfigWriteResponse(response)) {
throw new Error(`approval override for ${toolName} is controlled by another config layer`);
}
}
const remainingOverrideNames = await readPersistedAppToolApprovalOverrideNames(params);
if (remainingOverrideNames.length > 0) {
throw new Error(
`effective approval overrides remain for ${remainingOverrideNames.join(", ")}`,
);
}
return true;
} catch (error) {
params.diagnostics.push({
code: "approval_overrides_clear_failed",
plugin: params.plugin,
message: `Could not clear durable Codex app approval overrides for ${params.app.id}: ${
error instanceof Error ? error.message : String(error)
}`,
});
return false;
}
}
async function readPersistedAppToolApprovalOverrideNames(params: {
request: CodexPluginRuntimeRequest;
configCwd?: string;
app: CodexPluginOwnedApp;
}): Promise<string[]> {
const response = await params.request("config/read", {
includeLayers: false,
...(params.configCwd ? { cwd: params.configCwd } : {}),
});
const config = isJsonObject(response) ? response.config : undefined;
const appsRoot = isJsonObject(config) ? config.apps : undefined;
const nestedApps = isJsonObject(appsRoot) ? appsRoot.apps : undefined;
const appConfig = isJsonObject(appsRoot)
? (appsRoot[params.app.id] ??
(isJsonObject(nestedApps) ? nestedApps[params.app.id] : undefined))
: undefined;
const tools = isJsonObject(appConfig) ? appConfig.tools : undefined;
if (!isJsonObject(tools)) {
return [];
}
return Object.entries(tools)
.filter(([, value]) => hasPersistedToolApprovalOverride(value))
.map(([toolName]) => toolName)
.toSorted();
}
function hasPersistedToolApprovalOverride(value: JsonValue): boolean {
return (
isJsonObject(value) && (value.approval_mode !== undefined || value.approvalMode !== undefined)
);
}
function isOverriddenConfigWriteResponse(response: unknown): boolean {
return isJsonObject(response) && response.status === "okOverridden";
}
function quoteConfigKeyPathSegment(segment: string): string {
return `"${segment.replace(/["\\]/g, (char) => `\\${char}`)}"`;
}
function shouldWaitForInitialAppInventory(
params: BuildCodexPluginThreadConfigParams,
policy: ResolvedCodexPluginsPolicy,

View File

@@ -575,6 +575,8 @@ type CodexAppServerRequestResultMap = {
"account/read": CodexGetAccountResponse;
"app/list": CodexAppsListResponse;
"config/mcpServer/reload": JsonValue;
"config/read": JsonValue;
"config/value/write": JsonValue;
"environment/add": JsonValue;
"experimentalFeature/enablement/set": JsonValue;
"feedback/upload": JsonValue;

View File

@@ -112,6 +112,44 @@ describe("requestCodexAppServerJson sandbox guard", () => {
expect(request).toHaveBeenCalledWith("thread/list", { limit: 10 }, { timeoutMs: 60_000 });
});
it("allows config value writes in sandboxed sessions", async () => {
const request = vi.fn(async () => ({ ok: true }));
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });
const params = {
keyPath: 'apps."google-calendar-app".tools',
value: null,
mergeStrategy: "replace",
};
await expect(
requestCodexAppServerJson({
method: "config/value/write",
requestParams: params,
config: { agents: { defaults: { sandbox: { mode: "all" } } } },
sessionKey: "sandboxed-session",
}),
).resolves.toEqual({ ok: true });
expect(request).toHaveBeenCalledWith("config/value/write", params, { timeoutMs: 60_000 });
});
it("allows config reads in sandboxed sessions", async () => {
const request = vi.fn(async () => ({ config: { apps: { apps: {} } } }));
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });
const params = { includeLayers: false };
await expect(
requestCodexAppServerJson({
method: "config/read",
requestParams: params,
config: { agents: { defaults: { sandbox: { mode: "all" } } } },
sessionKey: "sandboxed-session",
}),
).resolves.toEqual({ config: { apps: { apps: {} } } });
expect(request).toHaveBeenCalledWith("config/read", params, { timeoutMs: 60_000 });
});
it("allows sandbox-pinned thread starts in sandboxed sessions", async () => {
const request = vi.fn(async () => ({ thread: { id: "thread-1" }, model: "gpt-5.5" }));
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });

View File

@@ -19,6 +19,8 @@ const DIRECT_METHOD_POLICIES = new Map<string, DirectMethodPolicy>([
["account/read", "allowed-control-plane"],
["app/list", "allowed-control-plane"],
["config/mcpServer/reload", "allowed-control-plane"],
["config/read", "allowed-control-plane"],
["config/value/write", "allowed-control-plane"],
["environment/add", "allowed-control-plane"],
["experimentalFeature/enablement/set", "allowed-control-plane"],
["feedback/upload", "allowed-control-plane"],

View File

@@ -145,6 +145,35 @@ describe("codex app-server session binding", () => {
expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext);
});
it("round-trips always plugin app policy context destructive approval mode", async () => {
const sessionFile = path.join(tempDir, "session.json");
const pluginAppPolicyContext = {
fingerprint: "plugin-policy-always",
apps: {
"google-calendar-app": {
configKey: "google-calendar",
marketplaceName: "openai-curated" as const,
pluginName: "google-calendar",
allowDestructiveActions: true,
destructiveApprovalMode: "always" as const,
mcpServerNames: ["google-calendar"],
},
},
pluginAppIds: {
"google-calendar": ["google-calendar-app"],
},
};
await writeCodexAppServerBinding(sessionFile, {
threadId: "thread-123",
cwd: tempDir,
pluginAppPolicyContext,
});
const binding = await readCodexAppServerBinding(sessionFile);
expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext);
});
it("normalizes v1 plugin app policy context destructive approval modes", async () => {
const sessionFile = path.join(tempDir, "session.json");
await fs.writeFile(

View File

@@ -421,6 +421,9 @@ function readDestructiveApprovalMode(
if (value === "auto") {
return bindingSchemaVersion === 1 ? "allow" : "auto";
}
if (value === "always" && bindingSchemaVersion === 2) {
return "always";
}
if (value === "on-request" && bindingSchemaVersion === 1) {
return "auto";
}

View File

@@ -5,6 +5,7 @@ import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
createCodexTrajectoryRecorder,
recordCodexTrajectoryCompletion,
recordCodexTrajectoryContext,
resolveCodexTrajectoryAppendFlags,
resolveCodexTrajectoryPointerFlags,
@@ -80,7 +81,9 @@ describe("Codex trajectory recorder", () => {
expect(content).not.toContain("secret");
expect(content).not.toContain("sk-test-secret-token");
expect(content).not.toContain("sk-other-secret-token");
expect(fs.statSync(filePath).mode & 0o777).toBe(0o600);
if (process.platform !== "win32") {
expect(fs.statSync(filePath).mode & 0o777).toBe(0o600);
}
expect(fs.existsSync(path.join(tmpDir, "session.trajectory-path.json"))).toBe(true);
});
@@ -253,4 +256,235 @@ describe("Codex trajectory recorder", () => {
expect(parsed.data?.truncated).toBe(true);
expect(parsed.data?.reason).toBe("trajectory-event-size-limit");
});
it("preserves usage when truncating oversized model completion events", async () => {
const tmpDir = makeTempDir();
const sessionFile = path.join(tmpDir, "session.jsonl");
const attempt = {
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
provider: "codex",
modelId: "gpt-5.4",
model: { api: "responses" },
} as never;
const usage = {
input: 384_954,
output: 5_624,
cacheRead: 333_824,
reasoningTokens: 2_038,
total: 724_402,
};
const recorder = createCodexTrajectoryRecorder({
cwd: tmpDir,
attempt,
env: {},
});
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt,
threadId: "thread-1",
turnId: "turn-1",
timedOut: false,
result: {
aborted: false,
attemptUsage: usage,
assistantTexts: ["done"],
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
role: index % 2 === 0 ? "user" : "assistant",
content: `message-${index} ${"x".repeat(32_000)}`,
})),
} as never,
});
await trajectoryRecorder.flush();
const parsed = JSON.parse(
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
);
expect(parsed.type).toBe("model.completed");
expect(parsed.data).toMatchObject({
truncated: true,
reason: "trajectory-event-size-limit",
usage,
});
expect(parsed.data.messagesSnapshot).toBeUndefined();
expect(parsed.data.droppedFields).toContain("messagesSnapshot");
expect(Buffer.byteLength(JSON.stringify(parsed), "utf8")).toBeLessThanOrEqual(256 * 1024);
});
it("drops oversized preserved fields when needed to keep completion events bounded", async () => {
const tmpDir = makeTempDir();
const sessionFile = path.join(tmpDir, "session.jsonl");
const attempt = {
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
provider: "codex",
modelId: "gpt-5.4",
model: { api: "responses" },
} as never;
const oversizedUsage = Object.fromEntries(
Array.from({ length: 100 }, (_value, index) => [`field-${index}`, "x".repeat(5_000)]),
);
const recorder = createCodexTrajectoryRecorder({
cwd: tmpDir,
attempt,
env: {},
});
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt,
threadId: "thread-1",
turnId: "turn-1",
timedOut: false,
result: {
aborted: false,
attemptUsage: oversizedUsage,
assistantTexts: ["x".repeat(32_000)],
messagesSnapshot: [{ role: "assistant", content: "x".repeat(32_000) }],
} as never,
});
await trajectoryRecorder.flush();
const parsed = JSON.parse(
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
);
expect(parsed.data).toMatchObject({
truncated: true,
reason: "trajectory-event-size-limit",
});
expect(parsed.data.usage).toBeUndefined();
expect(parsed.data.droppedFields).toEqual(
expect.arrayContaining(["usage", "assistantTexts", "messagesSnapshot"]),
);
expect(Buffer.byteLength(JSON.stringify(parsed), "utf8")).toBeLessThanOrEqual(256 * 1024);
});
it("preserves usage on non-final oversized model completion events", async () => {
const tmpDir = makeTempDir();
const sessionFile = path.join(tmpDir, "session.jsonl");
const attempt = {
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
provider: "codex",
modelId: "gpt-5.4",
model: { api: "responses" },
} as never;
const firstUsage = {
input: 384_954,
output: 5_624,
cacheRead: 333_824,
reasoningTokens: 2_038,
total: 724_402,
};
const secondUsage = { input: 12, output: 3, total: 15 };
const recorder = createCodexTrajectoryRecorder({
cwd: tmpDir,
attempt,
env: {},
});
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt,
threadId: "thread-1",
turnId: "turn-1",
timedOut: false,
result: {
aborted: false,
attemptUsage: firstUsage,
assistantTexts: ["first"],
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
role: index % 2 === 0 ? "user" : "assistant",
content: `message-${index} ${"x".repeat(32_000)}`,
})),
} as never,
});
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt,
threadId: "thread-1",
turnId: "turn-2",
timedOut: false,
result: {
aborted: false,
attemptUsage: secondUsage,
assistantTexts: ["final answer"],
messagesSnapshot: [{ role: "assistant", content: "final answer" }],
} as never,
});
await trajectoryRecorder.flush();
const events = fs
.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8")
.trim()
.split(/\r?\n/u)
.map((line) => JSON.parse(line));
expect(events).toHaveLength(2);
expect(events[0].data).toMatchObject({
truncated: true,
usage: firstUsage,
});
expect(events[1].data).toMatchObject({
turnId: "turn-2",
usage: secondUsage,
assistantTexts: ["final answer"],
});
expect(events[1].data.truncated).toBeUndefined();
});
it("redacts secrets before preserving usage in truncated completion events", async () => {
const tmpDir = makeTempDir();
const sessionFile = path.join(tmpDir, "session.jsonl");
const attempt = {
sessionFile,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
runId: "run-1",
provider: "codex",
modelId: "gpt-5.4",
model: { api: "responses" },
} as never;
const recorder = createCodexTrajectoryRecorder({
cwd: tmpDir,
attempt,
env: {},
});
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt,
threadId: "thread-1",
turnId: "turn-1",
timedOut: false,
result: {
aborted: false,
attemptUsage: {
total: 1,
apiKey: "sk-test-secret-token",
authorization: "Bearer sk-other-secret-token",
},
assistantTexts: ["done"],
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
role: index % 2 === 0 ? "user" : "assistant",
content: `message-${index} ${"x".repeat(32_000)}`,
})),
} as never,
});
await trajectoryRecorder.flush();
const parsed = JSON.parse(
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
);
const preservedUsage = JSON.stringify(parsed.data.usage);
expect(parsed.data.truncated).toBe(true);
expect(preservedUsage).toContain("redacted");
expect(preservedUsage).not.toContain("sk-test-secret-token");
expect(preservedUsage).not.toContain("sk-other-secret-token");
});
});

View File

@@ -40,6 +40,7 @@ const JWT_VALUE_RE = /\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]
const COOKIE_PAIR_RE = /\b([A-Za-z][A-Za-z0-9_.-]{1,64})=([A-Za-z0-9+/._~%=-]{16,})(?=;|\s|$)/gu;
const TRAJECTORY_RUNTIME_FILE_MAX_BYTES = 50 * 1024 * 1024;
const TRAJECTORY_RUNTIME_EVENT_MAX_BYTES = 256 * 1024;
const TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS = ["usage", "promptCache"] as const;
type CodexTrajectoryOpenFlagConstants = Pick<
typeof nodeFs.constants,
@@ -82,19 +83,57 @@ function boundedTrajectoryLine(event: Record<string, unknown>): string | undefin
if (bytes <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
return `${line}\n`;
}
const truncated = JSON.stringify({
...event,
data: {
truncated: true,
originalBytes: bytes,
limitBytes: TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
reason: "trajectory-event-size-limit",
},
});
if (Buffer.byteLength(truncated, "utf8") <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
return `${truncated}\n`;
const originalData =
event.data && typeof event.data === "object" && !Array.isArray(event.data)
? (event.data as Record<string, unknown>)
: {};
const originalDataKeys = Object.keys(originalData);
const preservedDataKeys = new Set<string>();
const baseData = {
truncated: true,
originalBytes: bytes,
limitBytes: TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
reason: "trajectory-event-size-limit",
};
const buildTruncatedLine = (includeDroppedFields: boolean): string | undefined => {
const data: Record<string, unknown> = { ...baseData };
for (const key of TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS) {
if (preservedDataKeys.has(key)) {
data[key] = originalData[key];
}
}
if (includeDroppedFields) {
const droppedFields = originalDataKeys.filter((key) => !preservedDataKeys.has(key));
if (droppedFields.length > 0) {
data.droppedFields = droppedFields;
}
}
const truncated = JSON.stringify({ ...event, data });
if (Buffer.byteLength(truncated, "utf8") <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
return `${truncated}\n`;
}
return undefined;
};
let best = buildTruncatedLine(true) ?? buildTruncatedLine(false);
if (!best) {
return undefined;
}
return undefined;
for (const key of TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS) {
if (!Object.hasOwn(originalData, key)) {
continue;
}
preservedDataKeys.add(key);
const next = buildTruncatedLine(true) ?? buildTruncatedLine(false);
if (next) {
best = next;
continue;
}
preservedDataKeys.delete(key);
}
return best;
}
function resolveTrajectoryPointerFilePath(sessionFile: string): string {

View File

@@ -23,7 +23,7 @@ export type CodexPluginConfigEntry = {
enabled?: boolean;
marketplaceName?: string;
pluginName?: string;
allow_destructive_actions?: boolean | "auto";
allow_destructive_actions?: boolean | "auto" | "always";
};
export type CodexPluginsConfigBlock = {

View File

@@ -43,7 +43,7 @@ export type CodexPluginMigrationConfigEntry = {
configKey: string;
pluginName: string;
enabled: boolean;
allowDestructiveActions?: "auto";
allowDestructiveActions?: "auto" | "always";
};
type CodexPluginMigrationBlockSkipDetails = {
@@ -168,15 +168,18 @@ function isLegacyDestructivePolicyRepair(
);
}
function isLegacyDestructivePolicyConfigEntryRepair(
function readExistingPluginAllowDestructiveActions(
existing: unknown,
pluginName: string,
): boolean {
): "auto" | "always" | undefined {
const existingEntry = isRecord(existing) ? existing : undefined;
return (
existingEntry?.allow_destructive_actions === "on-request" &&
existingEntry.pluginName === pluginName
if (existingEntry?.pluginName !== pluginName) {
return undefined;
}
const normalized = normalizeExistingAllowDestructiveActions(
existingEntry.allow_destructive_actions,
);
return normalized === "auto" || normalized === "always" ? normalized : undefined;
}
function buildPluginItems(
@@ -203,12 +206,15 @@ function buildPluginItems(
enabled: true,
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: plugin.pluginName,
...(isLegacyDestructivePolicyConfigEntryRepair(
existingPluginEntries[configKey],
plugin.pluginName,
)
? { allow_destructive_actions: "auto" }
: {}),
...(() => {
const allowDestructiveActions = readExistingPluginAllowDestructiveActions(
existingPluginEntries[configKey],
plugin.pluginName,
);
return allowDestructiveActions
? { allow_destructive_actions: allowDestructiveActions }
: {};
})(),
};
const conflict =
!ctx.overwrite &&
@@ -234,8 +240,9 @@ function buildPluginItems(
pluginName: plugin.pluginName,
sourceInstalled: plugin.installed === true,
sourceEnabled: plugin.enabled === true,
...(plannedEntry.allow_destructive_actions === "auto"
? { allowDestructiveActions: "auto" }
...(plannedEntry.allow_destructive_actions === "auto" ||
plannedEntry.allow_destructive_actions === "always"
? { allowDestructiveActions: plannedEntry.allow_destructive_actions }
: {}),
...(plugin.apps && plugin.apps.length > 0 && !shouldVerifyPluginApps(ctx)
? { sourceAppVerification: CODEX_PLUGIN_SOURCE_APP_VERIFICATION_UNVERIFIED }
@@ -310,13 +317,15 @@ export function readCodexPluginMigrationConfigEntry(
configKey,
pluginName,
enabled,
...(allowDestructiveActions === "auto" ? { allowDestructiveActions: "auto" } : {}),
...(allowDestructiveActions === "auto" || allowDestructiveActions === "always"
? { allowDestructiveActions }
: {}),
};
}
function readExistingAllowDestructiveActions(
config: MigrationProviderContext["config"],
): boolean | "auto" | undefined {
): boolean | "auto" | "always" | undefined {
const value = readMigrationConfigPath(config as Record<string, unknown>, [
...CODEX_PLUGIN_NATIVE_CONFIG_PATH,
"allow_destructive_actions",
@@ -324,8 +333,16 @@ function readExistingAllowDestructiveActions(
return normalizeExistingAllowDestructiveActions(value);
}
function normalizeExistingAllowDestructiveActions(value: unknown): boolean | "auto" | undefined {
return value === "auto" || value === "on-request" ? "auto" : asBoolean(value);
function normalizeExistingAllowDestructiveActions(
value: unknown,
): boolean | "auto" | "always" | undefined {
if (value === "auto" || value === "on-request") {
return "auto";
}
if (value === "always") {
return "always";
}
return asBoolean(value);
}
function readExistingPluginPolicyRepairs(

View File

@@ -2108,6 +2108,76 @@ describe("buildCodexMigrationProvider", () => {
});
});
it("preserves global always destructive plugin policy during migration", async () => {
const fixture = await createCodexFixture();
const configState: MigrationProviderContext["config"] = {
plugins: {
entries: {
codex: {
enabled: true,
config: {
codexPlugins: {
enabled: true,
allow_destructive_actions: "always",
plugins: {},
},
},
},
},
},
agents: { defaults: { workspace: fixture.workspaceDir } },
} as MigrationProviderContext["config"];
appServerRequest.mockImplementation(async ({ method }: { method: string }) => {
if (method === "plugin/list") {
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
}
if (method === "plugin/read") {
return pluginRead("google-calendar");
}
if (method === "plugin/install") {
return { authPolicy: "ON_USE", appsNeedingAuth: [] } satisfies v2.PluginInstallResponse;
}
if (method === "skills/list") {
return { data: [] } satisfies v2.SkillsListResponse;
}
if (method === "hooks/list") {
return { data: [] } satisfies v2.HooksListResponse;
}
if (method === "config/mcpServer/reload") {
return {};
}
if (method === "app/list") {
return appsList([]);
}
throw new Error(`unexpected request ${method}`);
});
const provider = buildCodexMigrationProvider({
runtime: createConfigRuntime(configState),
});
const result = await provider.apply(
makeContext({
source: fixture.codexHome,
stateDir: fixture.stateDir,
workspaceDir: fixture.workspaceDir,
config: configState,
}),
);
expectRecordFields(findItem(result.items, "config:codex-plugins"), { status: "migrated" });
expect(configState.plugins?.entries?.codex?.config?.codexPlugins).toEqual({
enabled: true,
allow_destructive_actions: "always",
plugins: {
"google-calendar": {
enabled: true,
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
pluginName: "google-calendar",
},
},
});
});
it("records auth-required plugin installs as disabled explicit config entries", async () => {
const fixture = await createCodexFixture();
const configState: MigrationProviderContext["config"] = {

View File

@@ -207,4 +207,65 @@ describe("codex cli node sessions", () => {
}),
).rejects.toThrow("Codex CLI node command returned malformed payloadJSON.");
});
it("keeps Codex history session previews on UTF-16 code point boundaries", async () => {
const sessionId = "019e2007-1f7e-7eb1-a42b-8c01f4b9b5ce";
const text = `${"a".repeat(136)}🤖tail`;
await fs.writeFile(
path.join(tempDir, "history.jsonl"),
JSON.stringify({ session_id: sessionId, ts: 1778678322, text }),
);
const command = createCodexCliSessionNodeHostCommands().find(
(entry) => entry.command === CODEX_CLI_SESSIONS_LIST_COMMAND,
);
const raw = await command?.handle(JSON.stringify({ filter: "", limit: 5 }));
const parsed = JSON.parse(raw ?? "{}") as {
sessions?: Array<{ lastMessage?: string }>;
};
expect(parsed.sessions?.[0]?.lastMessage).toBe(`${"a".repeat(136)}...`);
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\ud83e");
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\udd16");
});
it("keeps Codex session-file previews on UTF-16 code point boundaries", async () => {
const sessionId = "019e23d1-f33d-78e3-959e-0f56f30a5248";
const sessionDir = path.join(tempDir, "sessions", "2026", "05", "14");
const sessionFile = path.join(sessionDir, `rollout-2026-05-14T00-10-22-${sessionId}.jsonl`);
const text = `${"b".repeat(136)}🤖tail`;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
sessionFile,
[
JSON.stringify({
timestamp: "2026-05-14T00:10:23.618Z",
type: "session_meta",
payload: { id: sessionId, cwd: "/tmp/codex-work" },
}),
JSON.stringify({
timestamp: "2026-05-14T00:10:23.619Z",
type: "response_item",
payload: {
type: "message",
role: "user",
content: [{ type: "input_text", text }],
},
}),
].join("\n"),
);
const command = createCodexCliSessionNodeHostCommands().find(
(entry) => entry.command === CODEX_CLI_SESSIONS_LIST_COMMAND,
);
const raw = await command?.handle(JSON.stringify({ filter: "", limit: 5 }));
const parsed = JSON.parse(raw ?? "{}") as {
sessions?: Array<{ lastMessage?: string }>;
};
expect(parsed.sessions?.[0]?.lastMessage).toBe(`${"b".repeat(136)}...`);
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\ud83e");
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\udd16");
});
});

View File

@@ -12,6 +12,7 @@ import type {
import type { PluginRuntime } from "openclaw/plugin-sdk/plugin-runtime";
import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import {
materializeWindowsSpawnProgram,
resolveWindowsSpawnProgram,
@@ -691,7 +692,10 @@ function normalizeTimeoutMs(value: unknown): number {
}
function truncateText(value: string, max: number): string {
return value.length > max ? `${value.slice(0, max - 3)}...` : value;
if (value.length <= max) {
return value;
}
return `${truncateUtf16Safe(value, Math.max(0, max - 3))}...`;
}
function compareOptionalStringsDesc(a?: string, b?: string): number {

View File

@@ -36,6 +36,10 @@ type DuckDuckGoResult = {
snippet: string;
};
function isDecodableCodePoint(cp: number): boolean {
return Number.isInteger(cp) && cp >= 0 && cp <= 0x10ffff && (cp < 0xd800 || cp > 0xdfff);
}
function decodeHtmlEntities(text: string): string {
return text.replace(
/&(?:lt|gt|quot|apos|#39|#x27|#x2F|nbsp|ndash|mdash|hellip|amp|#\d+|#x[0-9a-f]+);/gi,
@@ -72,10 +76,12 @@ function decodeHtmlEntities(text: string): string {
return "&";
}
if (normalized.startsWith("&#x")) {
return String.fromCodePoint(Number.parseInt(normalized.slice(3, -1), 16));
const codePoint = Number.parseInt(normalized.slice(3, -1), 16);
return isDecodableCodePoint(codePoint) ? String.fromCodePoint(codePoint) : entity;
}
if (normalized.startsWith("&#")) {
return String.fromCodePoint(Number.parseInt(normalized.slice(2, -1), 10));
const codePoint = Number.parseInt(normalized.slice(2, -1), 10);
return isDecodableCodePoint(codePoint) ? String.fromCodePoint(codePoint) : entity;
}
return entity;
},

View File

@@ -205,6 +205,20 @@ describe("duckduckgo web search provider", () => {
);
});
it("leaves out-of-range numeric html entities intact instead of throwing", () => {
expect(() => ddgClientTesting.decodeHtmlEntities("Result &#99999999; end")).not.toThrow();
expect(ddgClientTesting.decodeHtmlEntities("Result &#99999999; end")).toBe(
"Result &#99999999; end",
);
expect(ddgClientTesting.decodeHtmlEntities("Hex &#x110000; tail")).toBe("Hex &#x110000; tail");
// Surrogate-range entities would decode to lone UTF-16 surrogates; keep them intact.
expect(ddgClientTesting.decodeHtmlEntities("Bad &#55296; end")).toBe("Bad &#55296; end");
expect(ddgClientTesting.decodeHtmlEntities("Bad &#xD800; end")).toBe("Bad &#xD800; end");
expect(ddgClientTesting.decodeHtmlEntities("Bad &#xDFFF; end")).toBe("Bad &#xDFFF; end");
// A valid supplementary-plane entity still decodes.
expect(ddgClientTesting.decodeHtmlEntities("Smile &#128512;")).toBe("Smile 😀");
});
it("does not double-decode escaped entities (decodes &amp; last)", () => {
// A result whose text literally shows "&lt;" arrives double-encoded as
// "&amp;lt;". Decoding &amp; first would re-decode it into "<", corrupting

View File

@@ -11,6 +11,7 @@ import {
import {
assertOkOrThrowProviderError,
postJsonRequest,
readProviderJsonResponse,
type ProviderRequestTransportOverrides,
} from "openclaw/plugin-sdk/provider-http";
import {
@@ -97,11 +98,11 @@ async function generateGeminiInlineDataText(params: {
try {
await assertOkOrThrowProviderError(res, params.httpErrorLabel);
const payload = (await res.json()) as {
const payload = await readProviderJsonResponse<{
candidates?: Array<{
content?: { parts?: Array<{ text?: string }> };
}>;
};
}>(res, params.httpErrorLabel);
const parts = payload.candidates?.[0]?.content?.parts ?? [];
const text = parts
.map((part) => part?.text?.trim())

View File

@@ -1,4 +1,5 @@
// Google tests cover media understanding provider.video plugin behavior.
import { createServer, type Server } from "node:http";
import {
createRequestCaptureJsonFetch,
installPinnedHostnameTestHooks,
@@ -10,6 +11,49 @@ import { resolveGoogleGenerativeAiHttpRequestConfig } from "./runtime-api.js";
installPinnedHostnameTestHooks();
const LOOPBACK_RESPONSE_BYTES = 18 * 1024 * 1024;
async function listenLoopbackServer(server: Server): Promise<number> {
return await new Promise((resolve, reject) => {
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
server.off("error", reject);
const address = server.address();
if (!address || typeof address === "string") {
reject(new Error("expected loopback TCP address"));
return;
}
resolve(address.port);
});
});
}
function createOversizedJsonServer(): { server: Server; closed: Promise<number> } {
let resolveClosed: (sentBytes: number) => void = () => {};
const closed = new Promise<number>((resolve) => {
resolveClosed = resolve;
});
const server = createServer((_req, res) => {
let sentBytes = 0;
const chunk = Buffer.alloc(64 * 1024, 0x20);
res.writeHead(200, { "content-type": "application/json" });
const timer = setInterval(() => {
if (sentBytes >= LOOPBACK_RESPONSE_BYTES) {
clearInterval(timer);
res.end();
return;
}
sentBytes += chunk.length;
res.write(chunk);
}, 1);
res.on("close", () => {
clearInterval(timer);
resolveClosed(sentBytes);
});
});
return { server, closed };
}
describe("describeGeminiVideo", () => {
it("respects case-insensitive x-goog-api-key overrides", async () => {
let seenKey: string | null = null;
@@ -114,6 +158,29 @@ describe("describeGeminiVideo", () => {
);
});
it("bounds oversized video JSON responses and closes the stream early", async () => {
const { server, closed } = createOversizedJsonServer();
const port = await listenLoopbackServer(server);
const fetchFn = withFetchPreconnect(async () =>
fetch(`http://127.0.0.1:${port}/google-video-json`),
);
try {
await expect(
describeGeminiVideo({
buffer: Buffer.from("video-bytes"),
fileName: "clip.mp4",
apiKey: "test-key",
timeoutMs: 1500,
fetchFn,
}),
).rejects.toThrow(/JSON response exceeds 16777216 bytes/u);
await expect(closed).resolves.toBeLessThan(LOOPBACK_RESPONSE_BYTES);
} finally {
server.close();
}
});
it("rejects non-Google video base URLs before sending authenticated requests", async () => {
await expect(
describeGeminiVideo({

View File

@@ -20,6 +20,8 @@ const {
let buildGoogleSpeechProvider: typeof import("./speech-provider.js").buildGoogleSpeechProvider;
let testing: typeof import("./speech-provider.js").testing;
const GOOGLE_TTS_JSON_CAP_BYTES = 16 * 1024 * 1024;
beforeAll(async () => {
({ buildGoogleSpeechProvider, testing } = await import("./speech-provider.js"));
});
@@ -56,6 +58,26 @@ function installGoogleTtsRequestMock(pcm = Buffer.from([1, 0, 2, 0])) {
return postJsonRequestMock;
}
function oversizedGoogleTtsJsonResponse(onCancel: () => void): Response {
const response = new Response(
new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array(GOOGLE_TTS_JSON_CAP_BYTES + 1));
},
cancel() {
onCancel();
},
}),
{ headers: { "content-type": "application/json" }, status: 200 },
);
Object.defineProperty(response, "json", {
value: async () => {
throw new Error("unbounded json reader was used");
},
});
return response;
}
function expectRecordFields(value: unknown, expected: Record<string, unknown>) {
if (!value || typeof value !== "object") {
throw new Error("Expected record");
@@ -149,6 +171,39 @@ describe("Google speech provider", () => {
expect(transcodeAudioBufferToOpusMock).not.toHaveBeenCalled();
});
it("bounds oversized Gemini TTS success JSON responses and cancels the stream", async () => {
let cancelCount = 0;
const release = vi.fn(async () => {});
postJsonRequestMock
.mockResolvedValueOnce({
response: oversizedGoogleTtsJsonResponse(() => {
cancelCount += 1;
}),
release,
})
.mockResolvedValueOnce({
response: oversizedGoogleTtsJsonResponse(() => {
cancelCount += 1;
}),
release,
});
const provider = buildGoogleSpeechProvider();
await expect(
provider.synthesize({
text: "oversized tts response",
cfg: {},
providerConfig: {
apiKey: "google-test-key",
},
target: "audio-file",
timeoutMs: 12_000,
}),
).rejects.toThrow("Google TTS response: JSON response exceeds 16777216 bytes");
expect(cancelCount).toBe(2);
expect(release).toHaveBeenCalledTimes(2);
});
it("transcodes Gemini PCM to Opus for voice-note targets", async () => {
installGoogleTtsRequestMock(Buffer.from([5, 0, 6, 0]));
transcodeAudioBufferToOpusMock.mockResolvedValueOnce(Buffer.from("google-opus"));

View File

@@ -3,6 +3,7 @@ import { transcodeAudioBufferToOpus } from "openclaw/plugin-sdk/media-runtime";
import {
assertOkOrThrowProviderError,
postJsonRequest,
readProviderJsonResponse,
sanitizeConfiguredModelProviderRequest,
} from "openclaw/plugin-sdk/provider-http";
import type { OpenClawConfig } from "openclaw/plugin-sdk/provider-onboard";
@@ -503,7 +504,11 @@ async function synthesizeGoogleTtsPcmOnce(params: {
}
}
try {
return extractGoogleSpeechPcm((await res.json()) as GoogleGenerateSpeechResponse);
const payload = await readProviderJsonResponse<GoogleGenerateSpeechResponse>(
res,
"Google TTS response",
);
return extractGoogleSpeechPcm(payload);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
throw new GoogleTtsRetryableError(message);

View File

@@ -476,6 +476,45 @@ describe("google transport stream", () => {
expect(result.content[2]).toHaveProperty("thoughtSignature", "Y2FsbF9zaWdfMQ==");
});
it("preserves MAX_TOKENS when the partial response contains a function call", async () => {
guardedFetchMock.mockResolvedValueOnce(
buildSseResponse([
{
candidates: [
{
content: {
parts: [{ functionCall: { name: "lookup", args: { q: "hello" } } }],
},
finishReason: "MAX_TOKENS",
},
],
},
]),
);
const streamFn = createGoogleGenerativeAiTransportStreamFn();
const stream = await Promise.resolve(
streamFn(
buildGeminiModel(),
{
messages: [{ role: "user", content: "hello", timestamp: 0 }],
tools: [
{
name: "lookup",
description: "Look up a value",
parameters: { type: "object" },
},
],
} as Parameters<typeof streamFn>[1],
{ apiKey: "gemini-api-key" } as Parameters<typeof streamFn>[2],
),
);
const result = await stream.result();
expect(result.stopReason).toBe("length");
expect(result.content).toEqual([expect.objectContaining({ type: "toolCall", name: "lookup" })]);
});
it("strips redundant google provider prefixes from Gemini API model paths", async () => {
guardedFetchMock.mockResolvedValueOnce(buildSseResponse([]));

View File

@@ -1404,7 +1404,12 @@ function createGoogleTransportStreamFn(kind: CanonicalGoogleTransportApi): Strea
}
if (typeof candidate?.finishReason === "string") {
output.stopReason = mapStopReasonString(candidate.finishReason);
if (output.content.some((block) => block.type === "toolCall")) {
// MAX_TOKENS can leave a complete-looking partial call. Only a normal
// Google stop may promote parsed calls into an executable tool-use turn.
if (
output.stopReason === "stop" &&
output.content.some((block) => block.type === "toolCall")
) {
output.stopReason = "toolUse";
}
}

View File

@@ -110,6 +110,45 @@ function createDeferred<T>(): {
return { promise, reject, resolve };
}
type CardPayloadWithTextWidgets = {
cardsV2: Array<{
card: {
sections?: Array<{
header?: string;
widgets?: Array<{ textParagraph?: { text: string } }>;
}>;
};
}>;
};
function getTextParagraphText(payload: unknown, header: string): string {
const text = (payload as CardPayloadWithTextWidgets).cardsV2[0]?.card.sections?.find(
(section) => section.header === header,
)?.widgets?.[0]?.textParagraph?.text;
if (typeof text !== "string") {
throw new Error(`Expected ${header} text paragraph`);
}
return text;
}
function isUtf16WellFormed(value: string): boolean {
for (let index = 0; index < value.length; index += 1) {
const codeUnit = value.charCodeAt(index);
if (codeUnit >= 0xd800 && codeUnit <= 0xdbff) {
const nextCodeUnit = index + 1 < value.length ? value.charCodeAt(index + 1) : -1;
if (nextCodeUnit < 0xdc00 || nextCodeUnit > 0xdfff) {
return false;
}
index += 1;
continue;
}
if (codeUnit >= 0xdc00 && codeUnit <= 0xdfff) {
return false;
}
}
return true;
}
describe("googleChatApprovalNativeRuntime", () => {
async function preparePendingDelivery(view = createPendingView()) {
const nowMs = Date.now();
@@ -149,6 +188,31 @@ describe("googleChatApprovalNativeRuntime", () => {
return { pendingPayload, plannedTarget, prepared, request, view };
}
it("keeps truncated pending command card text UTF-16 well formed", async () => {
const view = createPendingView();
view.commandText = `${"a".repeat(1796)}😀${"b".repeat(100)}`;
const { pendingPayload } = await preparePendingDelivery(view);
const commandText = getTextParagraphText(pendingPayload, "Command");
expect(commandText.length).toBeLessThanOrEqual(1800);
expect(commandText.endsWith("...")).toBe(true);
expect(isUtf16WellFormed(commandText)).toBe(true);
expect(JSON.stringify(pendingPayload.cardsV2)).not.toContain("\\ud83d");
});
it("preserves a complete astral character when it fits before the truncation suffix", async () => {
const view = createPendingView();
view.commandText = `${"a".repeat(1795)}😀${"b".repeat(100)}`;
const { pendingPayload } = await preparePendingDelivery(view);
const commandText = getTextParagraphText(pendingPayload, "Command");
expect(commandText).toBe(`${"a".repeat(1795)}😀...`);
expect(commandText.length).toBe(1800);
expect(isUtf16WellFormed(commandText)).toBe(true);
});
it("sends pending cards and updates the delivered message without buttons", async () => {
sendGoogleChatMessage.mockResolvedValue({ messageName: "spaces/AAA/messages/msg-1" });
updateGoogleChatMessage.mockResolvedValue({ messageName: "spaces/AAA/messages/msg-1" });

View File

@@ -9,6 +9,7 @@ import { buildChannelApprovalNativeTargetKey } from "openclaw/plugin-sdk/approva
import type { ExecApprovalDecision } from "openclaw/plugin-sdk/approval-runtime";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import { resolveGoogleChatAccount, type ResolvedGoogleChatAccount } from "./accounts.js";
import { sendGoogleChatMessage, updateGoogleChatMessage } from "./api.js";
import {
@@ -87,7 +88,7 @@ function escapeGoogleChatText(text: string): string {
}
function truncateText(text: string, maxChars = MAX_TEXT_PARAGRAPH_CHARS): string {
return text.length <= maxChars ? text : `${text.slice(0, maxChars - 3)}...`;
return text.length <= maxChars ? text : `${truncateUtf16Safe(text, maxChars - 3)}...`;
}
function buildMetadataText(metadata: readonly { label: string; value: string }[]): string {

View File

@@ -15,3 +15,21 @@ describe("irc outbound chunking", () => {
expect(ircOutboundBaseAdapter.textChunkLimit).toBe(350);
});
});
describe("irc outbound sanitizeText", () => {
afterEach(() => {
clearIrcRuntime();
});
it("strips internal tool-trace banners before outbound delivery", () => {
const text = "Done.\n⚠ 🛠️ `search repos (agent)` failed";
expect(ircOutboundBaseAdapter.sanitizeText({ text })).toBe("Done.");
});
it("preserves ordinary assistant prose while sanitizing", () => {
const text = "The pipeline has 3 open deals.";
expect(ircOutboundBaseAdapter.sanitizeText({ text })).toBe(text);
});
});

View File

@@ -1,5 +1,6 @@
// Irc plugin module implements outbound base behavior.
import { sanitizeForPlainText } from "openclaw/plugin-sdk/channel-outbound";
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-chunking";
import { chunkTextForOutbound } from "./channel-api.js";
export const ircOutboundBaseAdapter = {
@@ -7,5 +8,9 @@ export const ircOutboundBaseAdapter = {
chunker: chunkTextForOutbound,
chunkerMode: "markdown" as const,
textChunkLimit: 350,
sanitizeText: ({ text }: { text: string }) => sanitizeForPlainText(text),
// IRC's plain-text pass does not remove assistant scaffolding. Run the
// canonical delivery sanitizer first so internal tool traces are dropped
// before channel formatting.
sanitizeText: ({ text }: { text: string }) =>
sanitizeForPlainText(sanitizeAssistantVisibleText(text)),
};

View File

@@ -1,5 +1,6 @@
// Msteams plugin module implements feedback reflection prompt behavior.
import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
/** Max chars of the thumbed-down response to include in the reflection prompt. */
const MAX_RESPONSE_CHARS = 500;
@@ -19,7 +20,7 @@ export function buildReflectionPrompt(params: {
if (params.thumbedDownResponse) {
const truncated =
params.thumbedDownResponse.length > MAX_RESPONSE_CHARS
? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...`
? `${truncateUtf16Safe(params.thumbedDownResponse, MAX_RESPONSE_CHARS)}...`
: params.thumbedDownResponse;
parts.push(`\nYour response was:\n> ${truncated}`);
}

View File

@@ -19,6 +19,11 @@ import { msteamsRuntimeStub } from "./test-support/runtime.js";
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
// Matches an unpaired UTF-16 surrogate (lone high or lone low), without relying
// on the ES2024 String.prototype.isWellFormed() runtime API.
const UNPAIRED_SURROGATE_RE =
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
describe("buildFeedbackEvent", () => {
it("builds a well-formed custom event", () => {
const event = buildFeedbackEvent({
@@ -73,6 +78,26 @@ describe("buildReflectionPrompt", () => {
expect(prompt.length).toBeLessThan(longResponse.length + 500);
});
it("does not split UTF-16 surrogate pairs when truncating a thumbed-down response", () => {
const thumbedDownResponse = `${"a".repeat(499)}🦞${"b".repeat(20)}`;
const prompt = buildReflectionPrompt({ thumbedDownResponse });
expect(prompt).not.toMatch(UNPAIRED_SURROGATE_RE);
expect(prompt).toContain(`${"a".repeat(499)}...`);
expect(prompt).not.toContain("\ud83e");
expect(prompt).not.toContain("\udd9e");
});
it("keeps a boundary emoji when it fully fits before the truncation cap", () => {
const thumbedDownResponse = `${"a".repeat(498)}🦞${"b".repeat(20)}`;
const prompt = buildReflectionPrompt({ thumbedDownResponse });
expect(prompt).not.toMatch(UNPAIRED_SURROGATE_RE);
expect(prompt).toContain(`${"a".repeat(498)}🦞...`);
});
it("includes user comment when provided", () => {
const prompt = buildReflectionPrompt({
thumbedDownResponse: "Some response",

View File

@@ -10,6 +10,11 @@ import {
summarizeParentMessage,
} from "./thread-parent-context.js";
// Matches an unpaired UTF-16 surrogate (lone high or lone low), without relying
// on the ES2024 String.prototype.isWellFormed() runtime API.
const UNPAIRED_SURROGATE_RE =
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
describe("summarizeParentMessage", () => {
it("returns undefined for missing message", () => {
expect(summarizeParentMessage(undefined)).toBeUndefined();
@@ -81,6 +86,20 @@ describe("summarizeParentMessage", () => {
expect(summary?.text.length).toBeLessThanOrEqual(400);
expect(summary?.text.endsWith("…")).toBe(true);
});
it("keeps truncated parent text well-formed when truncating surrogate pairs", () => {
const msg: GraphThreadMessage = {
id: "p1",
from: { user: { displayName: "Dana" } },
body: { content: `${"a".repeat(398)}🦞${"b".repeat(50)}`, contentType: "text" },
};
const summary = summarizeParentMessage(msg);
expect(summary?.text).not.toMatch(UNPAIRED_SURROGATE_RE);
expect(summary?.text).toBe(`${"a".repeat(398)}`);
expect(summary?.text.endsWith("\ud83e…")).toBe(false);
});
});
describe("formatParentContextEvent", () => {

View File

@@ -17,6 +17,7 @@ import {
asDateTimestampMs,
resolveExpiresAtMsFromDurationMs,
} from "openclaw/plugin-sdk/number-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import { fetchChannelMessage, stripHtmlFromTeamsMessage } from "./graph-thread.js";
import type { GraphThreadMessage } from "./graph-thread.js";
@@ -138,7 +139,9 @@ export function summarizeParentMessage(
return {
sender,
text:
text.length > PARENT_TEXT_MAX_CHARS ? `${text.slice(0, PARENT_TEXT_MAX_CHARS - 1)}` : text,
text.length > PARENT_TEXT_MAX_CHARS
? `${truncateUtf16Safe(text, PARENT_TEXT_MAX_CHARS - 1)}`
: text,
};
}

View File

@@ -232,6 +232,55 @@ describe("SeenTracker", () => {
tracker.stop();
vi.useRealTimers();
});
it.each([-1, 0])("falls back to default TTL for non-positive ttlMs %s", (ttlMs) => {
vi.useFakeTimers();
const tracker = createTracker({ ttlMs, pruneIntervalMs: 10 * 60 * 1000 });
try {
tracker.add("id1");
vi.advanceTimersByTime(1);
expect(tracker.peek("id1")).toBe(true);
} finally {
tracker.stop();
vi.useRealTimers();
}
});
it("falls back to default TTL for infinite ttlMs", () => {
vi.useFakeTimers();
const tracker = createTracker({
ttlMs: Number.POSITIVE_INFINITY,
pruneIntervalMs: 10 * 60 * 1000,
});
try {
tracker.add("id1");
vi.advanceTimersByTime(60 * 60 * 1000 + 1);
expect(tracker.peek("id1")).toBe(false);
} finally {
tracker.stop();
vi.useRealTimers();
}
});
it.each([-1, 0, Number.POSITIVE_INFINITY])(
"uses the default prune interval for unsafe pruneIntervalMs %s",
(pruneIntervalMs) => {
vi.useFakeTimers();
const setIntervalSpy = vi.spyOn(globalThis, "setInterval");
const tracker = createTracker({ pruneIntervalMs });
try {
expect(setIntervalSpy).toHaveBeenCalledTimes(1);
expect(setIntervalSpy.mock.calls[0]?.[1]).toBe(10 * 60 * 1000);
} finally {
tracker.stop();
setIntervalSpy.mockRestore();
vi.useRealTimers();
}
},
);
});
});

View File

@@ -3,7 +3,10 @@
* Prevents unbounded memory growth under high load or abuse.
*/
import { resolveIntegerOption } from "openclaw/plugin-sdk/number-runtime";
import {
resolveIntegerOption,
resolvePositiveTimerTimeoutMs,
} from "openclaw/plugin-sdk/number-runtime";
interface SeenTrackerOptions {
/** Maximum number of entries to track (default: 100,000) */
@@ -45,8 +48,8 @@ interface Entry {
*/
export function createSeenTracker(options?: SeenTrackerOptions): SeenTracker {
const maxEntries = resolveIntegerOption(options?.maxEntries, 100_000, { min: 1 });
const ttlMs = options?.ttlMs ?? 60 * 60 * 1000; // 1 hour
const pruneIntervalMs = options?.pruneIntervalMs ?? 10 * 60 * 1000; // 10 minutes
const ttlMs = resolvePositiveTimerTimeoutMs(options?.ttlMs, 60 * 60 * 1000);
const pruneIntervalMs = resolvePositiveTimerTimeoutMs(options?.pruneIntervalMs, 10 * 60 * 1000);
// Main storage
const entries = new Map<string, Entry>();

View File

@@ -98,13 +98,13 @@ describe("buildAssistantMessage", () => {
expect(msg.stopReason).toBe("length");
});
it("keeps tool use authoritative over a length stop", () => {
it("keeps a length stop authoritative over complete-looking tool calls", () => {
const response = makeOllamaResponse({
done_reason: "length",
tool_calls: [{ function: { name: "read", arguments: { path: "README.md" } } }],
});
const msg = buildAssistantMessage(response, MODEL_INFO);
expect(msg.stopReason).toBe("toolUse");
expect(msg.stopReason).toBe("length");
});
});
@@ -282,6 +282,32 @@ describe("createOllamaStreamFn thinking events", () => {
expect(done.message?.stopReason).toBe("length");
});
it("preserves a native length stop when the partial response contains tool calls", async () => {
const events = await streamOllamaEvents(
[
makeOllamaResponse({
done_reason: "length",
tool_calls: [{ function: { name: "read", arguments: { path: "README.md" } } }],
}),
],
{},
{
messages: [{ role: "user", content: "test" }],
tools: [{ name: "read", description: "Read files", parameters: { type: "object" } }],
} as never,
);
const done = events.find((event) => event.type === "done") as {
reason?: string;
message?: { content?: Array<Record<string, unknown>>; stopReason?: string };
};
expect(done.reason).toBe("length");
expect(done.message?.stopReason).toBe("length");
expect(done.message?.content).toEqual([
expect.objectContaining({ type: "toolCall", name: "read" }),
]);
});
it("uses generic stream timeout for Ollama request timeout", async () => {
await streamOllamaEvents([makeOllamaResponse({ content: "ok" })], { timeoutMs: 2500 });

View File

@@ -656,10 +656,15 @@ function estimateTokensFromChars(chars: number): number {
}
function resolveOllamaStopReason(response: OllamaChatResponse) {
// Ollama's length terminal means generation hit its token limit, even when
// the partial response already contains a complete-looking tool call.
if (response.done_reason === "length") {
return "length" as const;
}
if (response.message.tool_calls?.length) {
return "toolUse" as const;
}
return response.done_reason === "length" ? ("length" as const) : ("stop" as const);
return "stop" as const;
}
function estimateOllamaPromptTokens(params: {

View File

@@ -68,4 +68,30 @@ describe("qa live timeout policy", () => {
),
).toBe(240_000);
});
it("uses the anthropic floor for claude-cli sonnet turns", () => {
expect(
resolveQaLiveTurnTimeoutMs(
{
providerMode: "live-frontier",
primaryModel: "claude-cli/claude-sonnet-4-6",
alternateModel: "claude-cli/claude-opus-4-8",
},
30_000,
),
).toBe(180_000);
});
it("uses the opus floor for claude-cli opus turns", () => {
expect(
resolveQaLiveTurnTimeoutMs(
{
providerMode: "live-frontier",
primaryModel: "claude-cli/claude-opus-4-8",
alternateModel: "claude-cli/claude-opus-4-8",
},
30_000,
),
).toBe(240_000);
});
});

View File

@@ -9,6 +9,13 @@ function isAnthropicModel(modelRef: string) {
return modelRef.startsWith("anthropic/");
}
// claude-cli is an Anthropic-backed Claude runtime, so it shares the Anthropic
// turn-timeout floors; mirror the claude-cli==anthropic precedent in the aimock
// and mock-openai servers.
function isAnthropicFamilyModel(modelRef: string) {
return isAnthropicModel(modelRef) || modelRef.startsWith("claude-cli/");
}
function isQaFastModeModelRef(modelRef: string) {
return isOpenAiModel(modelRef);
}
@@ -18,7 +25,7 @@ function isGptFiveModel(modelRef: string) {
}
function isClaudeOpusModel(modelRef: string) {
return isAnthropicModel(modelRef) && modelRef.includes("claude-opus");
return isAnthropicFamilyModel(modelRef) && modelRef.includes("claude-opus");
}
export const liveFrontierProviderDefinition: QaProviderDefinition = {
@@ -39,7 +46,7 @@ export const liveFrontierProviderDefinition: QaProviderDefinition = {
if (isClaudeOpusModel(modelRef)) {
return Math.max(fallbackMs, 240_000);
}
if (isAnthropicModel(modelRef)) {
if (isAnthropicFamilyModel(modelRef)) {
return Math.max(fallbackMs, 180_000);
}
if (isGptFiveModel(modelRef)) {

View File

@@ -97,11 +97,45 @@ describe("engine/tools/remind-logic", () => {
expect(generateJobName("drink water")).toBe("Reminder: drink water");
});
it("truncates long content", () => {
const long = "a very long reminder content that exceeds twenty characters";
const name = generateJobName(long);
expect(name.length).toBeLessThan(40);
expect(name).toContain("…");
it("truncates long content to a 20 UTF-16-unit budget with an ellipsis", () => {
expect(generateJobName("a very long reminder content")).toBe(
"Reminder: a very long reminder…",
);
});
it("keeps an exactly-fitting all-emoji content unchanged", () => {
// 10 emoji = 20 UTF-16 units, exactly at the budget, so no truncation.
expect(generateJobName("😀".repeat(10))).toBe(`Reminder: ${"😀".repeat(10)}`);
});
it("does not split surrogate pairs when truncating", () => {
const hasLoneSurrogate = (value: string): boolean => {
for (let index = 0; index < value.length; index++) {
const code = value.charCodeAt(index);
if (code >= 0xd800 && code <= 0xdbff) {
const next = value.charCodeAt(index + 1);
if (!(next >= 0xdc00 && next <= 0xdfff)) {
return true;
}
index++;
} else if (code >= 0xdc00 && code <= 0xdfff) {
return true;
}
}
return false;
};
// 11 emoji = 22 UTF-16 units > 20; the 11th emoji straddles the cap and is
// dropped whole rather than split into a lone surrogate.
const allEmoji = generateJobName("😀".repeat(11));
expect(allEmoji).toBe(`Reminder: ${"😀".repeat(10)}`);
expect(hasLoneSurrogate(allEmoji)).toBe(false);
// 19 ASCII + emoji: the emoji's high surrogate would land at unit 20, so the
// whole pair is dropped to stay within the 20-unit budget.
const name = generateJobName(`${"x".repeat(19)}😀tail`);
expect(name).toBe(`Reminder: ${"x".repeat(19)}`);
expect(hasLoneSurrogate(name)).toBe(false);
});
});

View File

@@ -1,5 +1,6 @@
// Qqbot plugin module implements remind logic behavior.
import { resolveExpiresAtMsFromDurationMs } from "openclaw/plugin-sdk/number-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
/**
* QQBot reminder tool core logic.
@@ -171,7 +172,7 @@ export function isCronExpression(timeStr: string): boolean {
*/
export function generateJobName(content: string): string {
const trimmed = content.trim();
const short = trimmed.length > 20 ? `${trimmed.slice(0, 20)}` : trimmed;
const short = trimmed.length > 20 ? `${truncateUtf16Safe(trimmed, 20)}` : trimmed;
return `Reminder: ${short}`;
}

View File

@@ -1,8 +1,8 @@
// Qqbot tests cover stt plugin behavior.
import * as fs from "node:fs";
import * as path from "node:path";
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "openclaw/plugin-sdk/test-env";
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const ssrfRuntimeMocks = vi.hoisted(() => ({
fetchWithSsrFGuard: vi.fn(),
@@ -41,6 +41,36 @@ function cancelTrackedResponse(
};
}
function largeTranscriptionJsonResponse(params: { chunkCount: number; chunkSize: number }): {
response: Response;
getReadCount: () => number;
} {
let chunkIndex = 0;
const encoder = new TextEncoder();
const chunks = [
'{"text":"',
...Array.from({ length: params.chunkCount }, () => "a".repeat(params.chunkSize)),
'"}',
];
const stream = new ReadableStream<Uint8Array>({
pull(controller) {
if (chunkIndex >= chunks.length) {
controller.close();
return;
}
controller.enqueue(encoder.encode(chunks[chunkIndex]));
chunkIndex += 1;
},
});
return {
response: new Response(stream, {
status: 200,
headers: { "content-type": "application/json" },
}),
getReadCount: () => chunkIndex,
};
}
function requireFirstSsrfRequest(): {
url?: unknown;
auditContext?: unknown;
@@ -177,6 +207,44 @@ describe("engine/utils/stt", () => {
});
});
it("bounds successful STT JSON responses before parsing", async () => {
await withTempDir("openclaw-qqbot-stt-success-limit-", async (tmpDir) => {
const audioPath = path.join(tmpDir, "voice.wav");
fs.writeFileSync(audioPath, Buffer.from([1, 2, 3, 4]));
const release = vi.fn(async () => {});
const streamed = largeTranscriptionJsonResponse({
chunkCount: 18,
chunkSize: 1024 * 1024,
});
ssrfRuntimeMocks.fetchWithSsrFGuard.mockResolvedValueOnce({
response: streamed.response,
release,
});
let error: unknown;
try {
await transcribeAudio(audioPath, {
channels: {
qqbot: {
stt: {
baseUrl: "https://api.example.test/v1/",
apiKey: "secret",
model: "whisper-1",
},
},
},
});
} catch (caught) {
error = caught;
}
expect(String(error)).toContain("qqbot.stt: JSON response exceeds 16777216 bytes");
expect(streamed.getReadCount()).toBeLessThan(20);
expect(release).toHaveBeenCalledTimes(1);
});
});
it("bounds STT error bodies without using response.text()", async () => {
await withTempDir("openclaw-qqbot-stt-error-", async (tmpDir) => {
const audioPath = path.join(tmpDir, "voice.wav");

View File

@@ -8,7 +8,10 @@
import * as fs from "node:fs";
import path from "node:path";
import { mimeTypeFromFilePath } from "openclaw/plugin-sdk/media-mime";
import { readResponseTextLimited } from "openclaw/plugin-sdk/provider-http";
import {
readProviderJsonResponse,
readResponseTextLimited,
} from "openclaw/plugin-sdk/provider-http";
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
import {
normalizeOptionalString,
@@ -100,7 +103,7 @@ export async function transcribeAudio(
throw new Error(`STT failed (HTTP ${resp.status}): ${detail.slice(0, 300)}`);
}
const result = (await resp.json()) as { text?: string };
const result = await readProviderJsonResponse<{ text?: string }>(resp, "qqbot.stt");
return normalizeOptionalString(result.text) ?? null;
} finally {
await release();

View File

@@ -6,7 +6,6 @@ import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { z } from "zod";
import { resolveSlackAccount } from "./accounts.js";
import { validateSlackBlocksArray } from "./blocks-input.js";
import { createSlackApiUrlClientOptions } from "./client-options.js";
import { createSlackWebClient, getSlackWriteClient } from "./client.js";
import { buildSlackEditTextPayload } from "./edit-text.js";
import { resolveSlackMedia } from "./monitor/media.js";
@@ -72,22 +71,6 @@ function resolveToken(explicit?: string, accountId?: string, cfg?: OpenClawConfi
return token;
}
function resolveSlackActionClientOptions(opts: SlackActionClientOpts) {
if (!opts.cfg) {
return createSlackApiUrlClientOptions();
}
const cfg = requireRuntimeConfig(opts.cfg, "Slack actions");
resolveSlackAccount({ cfg, accountId: opts.accountId });
return createSlackApiUrlClientOptions();
}
function slackActionClientOptionArgs(
opts: SlackActionClientOpts,
): [] | [ReturnType<typeof createSlackApiUrlClientOptions>] {
const clientOptions = resolveSlackActionClientOptions(opts);
return clientOptions.slackApiUrl ? [clientOptions] : [];
}
function normalizeEmoji(raw: string) {
const trimmed = raw.trim();
if (!trimmed) {
@@ -148,10 +131,7 @@ async function getClient(opts: SlackActionClientOpts = {}, mode: "read" | "write
return opts.client;
}
const token = resolveToken(opts.token, opts.accountId, opts.cfg);
const clientOptionArgs = slackActionClientOptionArgs(opts);
return mode === "write"
? getSlackWriteClient(token, ...clientOptionArgs)
: createSlackWebClient(token, ...clientOptionArgs);
return mode === "write" ? getSlackWriteClient(token) : createSlackWebClient(token);
}
async function resolveBotUserId(client: WebClient) {

View File

@@ -33,11 +33,190 @@ function readChatUpdatePayload(
return payload as ChatUpdatePayload;
}
const UNPAIRED_SURROGATE_RE =
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
function readMrkdwnTexts(blocks: unknown): string[] {
if (!Array.isArray(blocks)) {
return [];
}
const texts: string[] = [];
for (const block of blocks) {
if (!block || typeof block !== "object") {
continue;
}
const text = (block as { text?: unknown }).text;
if (
text &&
typeof text === "object" &&
(text as { type?: unknown }).type === "mrkdwn" &&
typeof (text as { text?: unknown }).text === "string"
) {
texts.push((text as { text: string }).text);
}
const elements = (block as { elements?: unknown }).elements;
if (!Array.isArray(elements)) {
continue;
}
for (const element of elements) {
if (
element &&
typeof element === "object" &&
(element as { type?: unknown }).type === "mrkdwn" &&
typeof (element as { text?: unknown }).text === "string"
) {
texts.push((element as { text: string }).text);
}
}
}
return texts;
}
function findApprovalMrkdwn(payload: SlackPayload, prefix: string): string {
const text = readMrkdwnTexts(payload.blocks).find((entry) => entry.startsWith(prefix));
if (!text) {
throw new Error(`Expected Slack mrkdwn block starting with ${prefix}`);
}
return text;
}
describe("slackApprovalNativeRuntime", () => {
it("subscribes to plugin approval events", () => {
expect(slackApprovalNativeRuntime.eventKinds).toEqual(["exec", "plugin"]);
});
it("does not leave dangling surrogates when truncating exec approval command mrkdwn", async () => {
const commandText = `${"a".repeat(2598)}😀tail`;
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
cfg: {} as never,
accountId: "default",
context: {
app: {} as never,
config: {} as never,
},
request: {
id: "req-surrogate",
request: {
command: commandText,
},
createdAtMs: 0,
expiresAtMs: 60_000,
},
approvalKind: "exec",
nowMs: 0,
view: {
approvalKind: "exec",
approvalId: "req-surrogate",
commandText,
metadata: [],
actions: [
{
decision: "allow-once",
label: "Allow Once",
command: "/approve req-surrogate allow-once",
style: "success",
},
],
} as never,
})) as SlackPayload;
const commandMrkdwn = findApprovalMrkdwn(payload, "*Command*");
expect(commandMrkdwn).toMatch(/…\n```$/);
expect(UNPAIRED_SURROGATE_RE.test(commandMrkdwn)).toBe(false);
});
it("does not leave dangling surrogates when truncating plugin approval request mrkdwn", async () => {
const title = `${"a".repeat(2598)}😀tail`;
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
cfg: {} as never,
accountId: "default",
context: {
app: {} as never,
config: {} as never,
},
request: {
id: "plugin:req-surrogate",
request: {
title,
description: "Needs approval.",
},
createdAtMs: 0,
expiresAtMs: 60_000,
},
approvalKind: "plugin",
nowMs: 0,
view: {
approvalKind: "plugin",
phase: "pending",
approvalId: "plugin:req-surrogate",
title,
description: "Needs approval.",
severity: "warning",
pluginId: "test-plugin",
toolName: "test-tool",
metadata: [],
actions: [
{
decision: "deny",
label: "Deny",
command: "/approve plugin:req-surrogate deny",
style: "danger",
},
],
expiresAtMs: 60_000,
} as never,
})) as SlackPayload;
const requestMrkdwn = findApprovalMrkdwn(payload, "*Request*");
expect(requestMrkdwn).toMatch(/…$/);
expect(UNPAIRED_SURROGATE_RE.test(requestMrkdwn)).toBe(false);
});
it("still truncates plain BMP approval mrkdwn at the Slack approval preview limit", async () => {
const commandText = "b".repeat(2700);
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
cfg: {} as never,
accountId: "default",
context: {
app: {} as never,
config: {} as never,
},
request: {
id: "req-bmp",
request: {
command: commandText,
},
createdAtMs: 0,
expiresAtMs: 60_000,
},
approvalKind: "exec",
nowMs: 0,
view: {
approvalKind: "exec",
approvalId: "req-bmp",
commandText,
metadata: [],
actions: [
{
decision: "allow-once",
label: "Allow Once",
command: "/approve req-bmp allow-once",
style: "success",
},
],
} as never,
})) as SlackPayload;
const commandMrkdwn = findApprovalMrkdwn(payload, "*Command*");
expect(commandMrkdwn).toMatch(/…\n```$/);
expect(commandMrkdwn).toContain(`${"b".repeat(2599)}`);
expect(UNPAIRED_SURROGATE_RE.test(commandMrkdwn)).toBe(false);
});
it("renders only the allowed pending actions", async () => {
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
cfg: {} as never,

View File

@@ -19,6 +19,7 @@ import { buildApprovalPresentationFromActionDescriptors } from "openclaw/plugin-
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { logError } from "openclaw/plugin-sdk/logging-core";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import {
isSlackAnyNativeApprovalClientEnabled,
resolveSlackApprovalKind,
@@ -73,7 +74,14 @@ function resolveHandlerContext(params: ChannelApprovalCapabilityHandlerContext):
}
function truncateSlackMrkdwn(text: string, maxChars: number): string {
return text.length <= maxChars ? text : `${text.slice(0, maxChars - 1)}`;
const limit = Math.max(0, Math.floor(maxChars));
if (text.length <= limit) {
return text;
}
if (limit <= 1) {
return truncateUtf16Safe(text, limit);
}
return `${truncateUtf16Safe(text, limit - 1)}`;
}
function buildSlackCodeBlock(text: string): string {

View File

@@ -4,7 +4,6 @@ import {
normalizeOptionalString,
} from "openclaw/plugin-sdk/string-coerce-runtime";
import { resolveSlackAccount } from "./accounts.js";
import { createSlackApiUrlClientOptions } from "./client-options.js";
import { createSlackWebClient } from "./client.js";
import { normalizeAllowListLower } from "./monitor/allow-list.js";
import type { OpenClawConfig } from "./runtime-api.js";
@@ -77,7 +76,7 @@ export async function resolveSlackConversationInfo(params: {
}
try {
const client = createSlackWebClient(token, createSlackApiUrlClientOptions());
const client = createSlackWebClient(token);
if (isNativeImChannel) {
const opened = await client.conversations.open({
channel: channelId,

View File

@@ -1,6 +1,7 @@
// Slack plugin module implements channel behavior.
import {
buildLegacyDmAccountAllowlistAdapter,
createAccountScopedAllowlistNameResolver,
createFlatAllowlistOverrideResolver,
} from "openclaw/plugin-sdk/allowlist-config-edit";
import { adaptScopedAccountAccessor } from "openclaw/plugin-sdk/channel-config-helpers";
@@ -51,7 +52,6 @@ import {
type OpenClawConfig,
} from "./channel-api.js";
import { resolveSlackChannelType, resolveSlackConversationInfo } from "./channel-type.js";
import { createSlackApiUrlClientOptions, type SlackApiUrlClientOptions } from "./client-options.js";
import { shouldSuppressLocalSlackExecApprovalPrompt } from "./exec-approvals.js";
import { resolveSlackGroupRequireMention, resolveSlackGroupToolPolicy } from "./group-policy.js";
import {
@@ -405,40 +405,19 @@ function formatSlackScopeDiagnostic(params: {
} as const;
}
function slackApiUrlOptionArgs(): [] | [SlackApiUrlClientOptions] {
const options = createSlackApiUrlClientOptions();
return options.slackApiUrl ? [options] : [];
}
const resolveSlackAllowlistGroupOverrides = createFlatAllowlistOverrideResolver({
resolveRecord: (account: ResolvedSlackAccount) => account.channels,
label: (key) => key,
resolveEntries: (value) => value?.users,
});
const resolveSlackAllowlistNames = async ({
accountId,
cfg,
entries,
}: {
accountId?: string | null;
cfg: OpenClawConfig;
entries: string[];
}) => {
const account = resolveSlackAccount({ cfg, accountId });
const token =
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken);
if (!token) {
return [];
}
return await (
await loadSlackResolveUsersModule()
).resolveSlackUserAllowlist({
token,
entries,
...createSlackApiUrlClientOptions(),
});
};
const resolveSlackAllowlistNames = createAccountScopedAllowlistNameResolver({
resolveAccount: resolveSlackAccount,
resolveToken: (account: ResolvedSlackAccount) =>
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken),
resolveNames: async ({ token, entries }) =>
(await loadSlackResolveUsersModule()).resolveSlackUserAllowlist({ token, entries }),
});
const slackChannelOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
@@ -675,7 +654,6 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
(await loadSlackResolveChannelsModule()).resolveSlackChannelAllowlist({
token,
entries: inputsValue,
...createSlackApiUrlClientOptions(),
}),
mapResolved: (entry) =>
toResolvedTarget(entry, entry.archived ? "archived" : undefined),
@@ -683,14 +661,14 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
}
return resolveTargetsWithOptionalToken({
token:
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken),
normalizeOptionalString(account.userToken) ??
normalizeOptionalString(account.botToken),
inputs,
missingTokenNote: "missing Slack token",
resolveWithToken: async ({ token, inputs: inputsLocal }) =>
(await loadSlackResolveUsersModule()).resolveSlackUserAllowlist({
token,
entries: inputsLocal,
...createSlackApiUrlClientOptions(),
}),
mapResolved: (entry) => toResolvedTarget(entry, entry.note),
});
@@ -717,9 +695,7 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
if (!token) {
return { ok: false, error: "missing token" };
}
return await (
await loadSlackProbeModule()
).probeSlack(token, timeoutMs, ...slackApiUrlOptionArgs());
return await (await loadSlackProbeModule()).probeSlack(token, timeoutMs);
},
formatCapabilitiesProbe: ({ probe }) => {
const slackProbe = probe as SlackProbe | undefined;
@@ -739,14 +715,13 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
const botToken = account.botToken?.trim();
const userToken = account.userToken?.trim();
const { fetchSlackScopes } = await loadSlackScopesModule();
const apiUrlOptionArgs = slackApiUrlOptionArgs();
const botScopes: SlackScopesResultShape = botToken
? await fetchSlackScopes(botToken, timeoutMs, ...apiUrlOptionArgs)
? await fetchSlackScopes(botToken, timeoutMs)
: { ok: false, error: "Slack bot token missing." };
lines.push(formatSlackScopeDiagnostic({ tokenType: "bot", result: botScopes }));
details.botScopes = botScopes;
if (userToken) {
const userScopes = await fetchSlackScopes(userToken, timeoutMs, ...apiUrlOptionArgs);
const userScopes = await fetchSlackScopes(userToken, timeoutMs);
lines.push(formatSlackScopeDiagnostic({ tokenType: "user", result: userScopes }));
details.userScopes = userScopes;
}

View File

@@ -3,8 +3,6 @@ import type { Agent } from "node:http";
import type { RetryOptions, WebClientOptions } from "@slack/web-api";
import { createNodeProxyAgent } from "openclaw/plugin-sdk/fetch-runtime";
export type SlackApiUrlClientOptions = Pick<WebClientOptions, "slackApiUrl">;
export const SLACK_DEFAULT_RETRY_OPTIONS: RetryOptions = {
retries: 2,
factor: 2,
@@ -32,11 +30,12 @@ export const SLACK_WRITE_RETRY_OPTIONS: RetryOptions = {
* Returns `undefined` when no proxy env var is configured or when Slack hosts
* are excluded by `NO_PROXY`.
*/
function resolveSlackProxyAgent(targetUrl: string): Agent | undefined {
function resolveSlackProxyAgent(): Agent | undefined {
try {
return createNodeProxyAgent({
mode: "env",
targetUrl,
targetUrl: "https://slack.com/",
protocol: "https",
});
} catch {
// Malformed proxy URL; degrade gracefully to direct connection.
@@ -44,38 +43,19 @@ function resolveSlackProxyAgent(targetUrl: string): Agent | undefined {
}
}
function resolveSlackApiUrlFromOptions(
options: Pick<WebClientOptions, "slackApiUrl">,
): string | undefined {
const explicit = options.slackApiUrl?.trim();
const envDefault = process.env.SLACK_API_URL?.trim();
return explicit || envDefault || undefined;
}
export function createSlackApiUrlClientOptions(): SlackApiUrlClientOptions {
const slackApiUrl = process.env.SLACK_API_URL?.trim();
return slackApiUrl ? { slackApiUrl } : {};
}
export function resolveSlackWebClientOptions(options: WebClientOptions = {}): WebClientOptions {
const slackApiUrl = resolveSlackApiUrlFromOptions(options);
const proxyTargetUrl = slackApiUrl ?? "https://slack.com/";
return {
...options,
agent: options.agent ?? resolveSlackProxyAgent(proxyTargetUrl),
agent: options.agent ?? resolveSlackProxyAgent(),
retryConfig: options.retryConfig ?? SLACK_DEFAULT_RETRY_OPTIONS,
...(slackApiUrl ? { slackApiUrl } : {}),
};
}
export function resolveSlackWriteClientOptions(options: WebClientOptions = {}): WebClientOptions {
const slackApiUrl = resolveSlackApiUrlFromOptions(options);
const proxyTargetUrl = slackApiUrl ?? "https://slack.com/";
return {
...options,
agent: options.agent ?? resolveSlackProxyAgent(proxyTargetUrl),
agent: options.agent ?? resolveSlackProxyAgent(),
retryConfig: options.retryConfig ?? SLACK_WRITE_RETRY_OPTIONS,
maxRequestConcurrency: options.maxRequestConcurrency ?? 1,
...(slackApiUrl ? { slackApiUrl } : {}),
};
}

View File

@@ -27,7 +27,6 @@ let SLACK_DEFAULT_RETRY_OPTIONS: typeof import("./client.js").SLACK_DEFAULT_RETR
let SLACK_WRITE_RETRY_OPTIONS: typeof import("./client.js").SLACK_WRITE_RETRY_OPTIONS;
let WebClient: ReturnType<typeof vi.fn>;
const SLACK_API_URL_KEYS = ["SLACK_API_URL", "OPENCLAW_SLACK_API_URL"] as const;
const PROXY_KEYS = [
"HTTPS_PROXY",
"HTTP_PROXY",
@@ -57,22 +56,6 @@ function restoreProxyEnvForTest() {
}
}
function clearSlackApiUrlEnvForTest() {
for (const key of SLACK_API_URL_KEYS) {
delete process.env[key];
}
}
function restoreSlackApiUrlEnvForTest() {
for (const key of SLACK_API_URL_KEYS) {
if (originalEnv[key] !== undefined) {
process.env[key] = originalEnv[key];
} else {
delete process.env[key];
}
}
}
function requireAgent<T extends { agent?: unknown }>(options: T): NonNullable<T["agent"]> {
if (!options.agent) {
throw new Error("expected proxy agent");
@@ -107,11 +90,6 @@ beforeAll(async () => {
beforeEach(() => {
WebClient.mockClear();
clearSlackWriteClientCacheForTest();
clearSlackApiUrlEnvForTest();
});
afterEach(() => {
restoreSlackApiUrlEnvForTest();
});
describe("slack web client config", () => {
@@ -128,40 +106,6 @@ describe("slack web client config", () => {
expect(options.retryConfig).toBe(customRetry);
});
it("uses explicit Slack API URL as the Slack Web API root", () => {
expect(
resolveSlackWebClientOptions({ slackApiUrl: "http://127.0.0.1:49152/api/" }).slackApiUrl,
).toBe("http://127.0.0.1:49152/api/");
expect(
resolveSlackWriteClientOptions({ slackApiUrl: "http://127.0.0.1:49152/api/" }).slackApiUrl,
).toBe("http://127.0.0.1:49152/api/");
});
it("uses SLACK_API_URL as the default Slack Web API root", () => {
process.env.SLACK_API_URL = " http://127.0.0.1:49152/api/ ";
expect(resolveSlackWebClientOptions().slackApiUrl).toBe("http://127.0.0.1:49152/api/");
expect(resolveSlackWriteClientOptions().slackApiUrl).toBe("http://127.0.0.1:49152/api/");
});
it("does not read OPENCLAW_SLACK_API_URL as a default Slack Web API root", () => {
process.env.OPENCLAW_SLACK_API_URL = "http://127.0.0.1:49152/api/";
expect(resolveSlackWebClientOptions().slackApiUrl).toBeUndefined();
expect(resolveSlackWriteClientOptions().slackApiUrl).toBeUndefined();
});
it("prefers explicit Slack API URL over SLACK_API_URL", () => {
process.env.SLACK_API_URL = "http://127.0.0.1:49152/api/";
expect(
resolveSlackWebClientOptions({ slackApiUrl: "http://127.0.0.1:49153/api/" }).slackApiUrl,
).toBe("http://127.0.0.1:49153/api/");
expect(
resolveSlackWriteClientOptions({ slackApiUrl: "http://127.0.0.1:49153/api/" }).slackApiUrl,
).toBe("http://127.0.0.1:49153/api/");
});
it("passes merged options into WebClient", () => {
const customAgent = {} as never;
@@ -246,38 +190,6 @@ describe("slack web client config", () => {
expect(WebClient).toHaveBeenCalledTimes(2);
});
it("keeps write clients separated by Slack API URL", () => {
clearProxyEnvForTest();
try {
const first = getSlackWriteClient("xoxb-test", {
slackApiUrl: "http://127.0.0.1:49152/api/",
});
const second = getSlackWriteClient("xoxb-test", {
slackApiUrl: "http://127.0.0.1:49153/api/",
});
expect(second).not.toBe(first);
expect(WebClient).toHaveBeenCalledTimes(2);
} finally {
restoreProxyEnvForTest();
}
});
it("keeps write clients separated by SLACK_API_URL", () => {
clearProxyEnvForTest();
try {
process.env.SLACK_API_URL = "http://127.0.0.1:49152/api/";
const first = getSlackWriteClient("xoxb-test");
process.env.SLACK_API_URL = "http://127.0.0.1:49153/api/";
const second = getSlackWriteClient("xoxb-test");
expect(second).not.toBe(first);
expect(WebClient).toHaveBeenCalledTimes(2);
} finally {
restoreProxyEnvForTest();
}
});
it("builds stable non-secret token cache keys", () => {
const token = "xoxb-sensitive-token";
const first = createSlackTokenCacheKey(token);

View File

@@ -1,22 +1,14 @@
// Slack plugin module implements client behavior.
import { createHash } from "node:crypto";
import { type WebClientOptions, WebClient } from "@slack/web-api";
import {
resolveSlackWebClientOptions,
resolveSlackWriteClientOptions,
type SlackApiUrlClientOptions,
} from "./client-options.js";
import { resolveSlackWebClientOptions, resolveSlackWriteClientOptions } from "./client-options.js";
const SLACK_WRITE_CLIENT_CACHE_MAX = 32;
const slackWriteClientCache = new Map<string, WebClient>();
export type SlackWriteClientCacheOptions = SlackApiUrlClientOptions;
export {
createSlackApiUrlClientOptions,
resolveSlackWebClientOptions,
resolveSlackWriteClientOptions,
type SlackApiUrlClientOptions,
SLACK_DEFAULT_RETRY_OPTIONS,
SLACK_WRITE_RETRY_OPTIONS,
} from "./client-options.js";
@@ -33,27 +25,15 @@ export function createSlackTokenCacheKey(token: string): string {
return `sha256:${createHash("sha256").update(token).digest("base64url")}`;
}
function createSlackWriteClientCacheKey(
token: string,
options: SlackWriteClientCacheOptions,
): string {
export function getSlackWriteClient(token: string): WebClient {
const tokenKey = createSlackTokenCacheKey(token);
return options.slackApiUrl ? `${tokenKey}:api:${options.slackApiUrl}` : tokenKey;
}
export function getSlackWriteClient(
token: string,
options: SlackWriteClientCacheOptions = {},
): WebClient {
const resolvedOptions = resolveSlackWriteClientOptions(options);
const tokenKey = createSlackWriteClientCacheKey(token, resolvedOptions);
const cached = slackWriteClientCache.get(tokenKey);
if (cached) {
slackWriteClientCache.delete(tokenKey);
slackWriteClientCache.set(tokenKey, cached);
return cached;
}
const client = new WebClient(token, resolvedOptions);
const client = createSlackWriteClient(token);
if (slackWriteClientCache.size >= SLACK_WRITE_CLIENT_CACHE_MAX) {
const oldestTokenKey = slackWriteClientCache.keys().next().value;
if (oldestTokenKey) {

View File

@@ -1,114 +0,0 @@
// Slack tests cover real Web API routing behavior.
import { createServer, type Server } from "node:http";
import type { AddressInfo } from "node:net";
import { afterEach, describe, expect, it } from "vitest";
import { createSlackWebClient } from "./client.js";
const SLACK_API_URL_KEYS = ["SLACK_API_URL"] as const;
const PROXY_KEYS = [
"HTTPS_PROXY",
"HTTP_PROXY",
"https_proxy",
"http_proxy",
"NO_PROXY",
"no_proxy",
"OPENCLAW_PROXY_ACTIVE",
"OPENCLAW_PROXY_CA_FILE",
] as const;
const TEST_ENV_KEYS = [...SLACK_API_URL_KEYS, ...PROXY_KEYS] as const;
const originalEnv = { ...process.env };
type SlackApiRequest = {
authorization?: string;
method?: string;
url?: string;
};
function restoreTestEnv() {
for (const key of TEST_ENV_KEYS) {
if (originalEnv[key] !== undefined) {
process.env[key] = originalEnv[key];
} else {
delete process.env[key];
}
}
}
async function closeServer(server: Server): Promise<void> {
await new Promise<void>((resolve, reject) => {
server.close((error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}
async function startSlackApiServer(requests: SlackApiRequest[]): Promise<{
baseUrl: string;
close(): Promise<void>;
}> {
const server = createServer((request, response) => {
requests.push({
authorization: request.headers.authorization,
method: request.method,
url: request.url,
});
request.resume();
response.writeHead(200, { "content-type": "application/json" });
response.end(
`${JSON.stringify({
ok: true,
team: "Mock Slack",
team_id: "TMOCK",
url: "https://mock.slack.test/",
user: "mock-bot",
user_id: "UMOCK",
})}\n`,
);
});
await new Promise<void>((resolve) => {
server.listen(0, "127.0.0.1", resolve);
});
const address = server.address() as AddressInfo;
return {
baseUrl: `http://127.0.0.1:${address.port}`,
close: () => closeServer(server),
};
}
afterEach(() => {
restoreTestEnv();
});
describe("Slack Web API routing", () => {
it("routes real WebClient requests to the SLACK_API_URL root", async () => {
for (const key of TEST_ENV_KEYS) {
delete process.env[key];
}
const requests: SlackApiRequest[] = [];
const server = await startSlackApiServer(requests);
try {
process.env.SLACK_API_URL = `${server.baseUrl}/api/`;
const client = createSlackWebClient("xoxb-route-proof", {
retryConfig: { retries: 0 },
timeout: 1000,
});
const result = await client.auth.test();
expect(result.ok).toBe(true);
expect(requests).toEqual([
{
authorization: "Bearer xoxb-route-proof",
method: "POST",
url: "/api/auth.test",
},
]);
} finally {
await server.close();
}
});
});

View File

@@ -38,22 +38,6 @@ describe("slack config schema", () => {
}
});
it("rejects Slack Web API URL config overrides", () => {
const res = SlackConfigSchema.safeParse({
apiUrl: "http://127.0.0.1:49152/api/",
accounts: { ops: { apiUrl: "http://127.0.0.1:49153/api/" } },
});
expect(res.success).toBe(false);
if (!res.success) {
expect(
res.error.issues.some(
(issue) => issue.code === "unrecognized_keys" && issue.keys.includes("apiUrl"),
),
).toBe(true);
}
});
it("accepts unfurl controls at root and account level", () => {
const res = SlackConfigSchema.safeParse({
unfurlLinks: false,

View File

@@ -9,7 +9,6 @@ import {
normalizeOptionalLowercaseString,
} from "openclaw/plugin-sdk/string-coerce-runtime";
import { resolveSlackAccount } from "./accounts.js";
import { createSlackApiUrlClientOptions } from "./client-options.js";
import { createSlackWebClient } from "./client.js";
type SlackUser = {
@@ -51,10 +50,9 @@ type SlackAuthTestResponse = {
team?: string;
};
function createSlackDirectoryClient(params: DirectoryConfigParams) {
function resolveReadToken(params: DirectoryConfigParams): string | undefined {
const account = resolveSlackAccount({ cfg: params.cfg, accountId: params.accountId });
const token = account.userToken ?? account.botToken?.trim();
return token ? createSlackWebClient(token, createSlackApiUrlClientOptions()) : null;
return account.userToken ?? account.botToken?.trim();
}
function normalizeQuery(value?: string | null): string {
@@ -103,10 +101,11 @@ function slackUserToDirectoryEntry(
export async function getSlackDirectorySelfLive(
params: DirectoryConfigParams,
): Promise<ChannelDirectoryEntry | null> {
const client = createSlackDirectoryClient(params);
if (!client) {
const token = resolveReadToken(params);
if (!token) {
return null;
}
const client = createSlackWebClient(token);
const auth = (await client.auth.test()) as SlackAuthTestResponse;
const userId = normalizeOptionalString(auth.user_id);
if (!userId) {
@@ -126,10 +125,11 @@ export async function getSlackDirectorySelfLive(
export async function listSlackDirectoryPeersLive(
params: DirectoryConfigParams,
): Promise<ChannelDirectoryEntry[]> {
const client = createSlackDirectoryClient(params);
if (!client) {
const token = resolveReadToken(params);
if (!token) {
return [];
}
const client = createSlackWebClient(token);
const query = normalizeQuery(params.query);
const members: SlackUser[] = [];
let cursor: string | undefined;
@@ -172,10 +172,11 @@ export async function listSlackDirectoryPeersLive(
export async function listSlackDirectoryGroupsLive(
params: DirectoryConfigParams,
): Promise<ChannelDirectoryEntry[]> {
const client = createSlackDirectoryClient(params);
if (!client) {
const token = resolveReadToken(params);
if (!token) {
return [];
}
const client = createSlackWebClient(token);
const query = normalizeQuery(params.query);
const channels: SlackChannel[] = [];
let cursor: string | undefined;

View File

@@ -32,7 +32,7 @@ import {
resolveSlackAccountDmPolicy,
} from "../accounts.js";
import { isSlackAnyNativeApprovalClientEnabled } from "../approval-native-gates.js";
import { createSlackApiUrlClientOptions, resolveSlackWebClientOptions } from "../client-options.js";
import { resolveSlackWebClientOptions } from "../client-options.js";
import { normalizeSlackWebhookPath, registerSlackHttpHandler } from "../http/index.js";
import { SLACK_TEXT_LIMIT } from "../limits.js";
import { resolveSlackChannelAllowlist } from "../resolve-channels.js";
@@ -288,8 +288,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const typingReaction = slackCfg.typingReaction?.trim() ?? "";
const mediaMaxBytes = (opts.mediaMaxMb ?? slackCfg.mediaMaxMb ?? 20) * 1024 * 1024;
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
const slackApiUrlClientOptions = createSlackApiUrlClientOptions();
const clientOptions = resolveSlackWebClientOptions(slackApiUrlClientOptions);
const clientOptions = resolveSlackWebClientOptions();
const { app, receiver, socketModeLogger } = createSlackBoltApp({
interop: await getSlackBoltInterop(),
slackMode,
@@ -463,7 +462,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const resolved = await resolveSlackChannelAllowlist({
token: resolveToken,
entries,
...slackApiUrlClientOptions,
});
const nextChannels = { ...channelsConfig };
const mapping: string[] = [];
@@ -509,7 +507,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const resolvedUsers = await resolveSlackUserAllowlist({
token: resolveToken,
entries: allowEntries,
...slackApiUrlClientOptions,
});
const { mapping, unresolved, additions } = buildAllowlistResolutionSummary(
resolvedUsers,
@@ -556,7 +553,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const resolvedUsers = await resolveSlackUserAllowlist({
token: resolveToken,
entries: Array.from(userEntries),
...slackApiUrlClientOptions,
});
const { resolvedMap, mapping, unresolved } = buildAllowlistResolutionSummary(
resolvedUsers,

View File

@@ -1,7 +1,7 @@
// Slack plugin module implements probe behavior.
import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract";
import { withTimeout } from "openclaw/plugin-sdk/text-utility-runtime";
import { createSlackWebClient, type SlackApiUrlClientOptions } from "./client.js";
import { createSlackWebClient } from "./client.js";
import { formatSlackError } from "./errors.js";
export type SlackProbe = BaseProbeResult & {
@@ -11,14 +11,8 @@ export type SlackProbe = BaseProbeResult & {
team?: { id?: string; name?: string };
};
export async function probeSlack(
token: string,
timeoutMs = 2500,
options: SlackApiUrlClientOptions = {},
): Promise<SlackProbe> {
const client = options.slackApiUrl
? createSlackWebClient(token, options)
: createSlackWebClient(token);
export async function probeSlack(token: string, timeoutMs = 2500): Promise<SlackProbe> {
const client = createSlackWebClient(token);
const start = Date.now();
try {
const result = await withTimeout(client.auth.test(), timeoutMs);

View File

@@ -1,7 +1,6 @@
// Slack plugin module implements resolve channels behavior.
import type { WebClient } from "@slack/web-api";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
import type { SlackApiUrlClientOptions } from "./client-options.js";
import { createSlackWebClient } from "./client.js";
import {
collectSlackCursorItems,
@@ -102,14 +101,8 @@ export async function resolveSlackChannelAllowlist(params: {
token: string;
entries: string[];
client?: WebClient;
slackApiUrl?: SlackApiUrlClientOptions["slackApiUrl"];
}): Promise<SlackChannelResolution[]> {
const client =
params.client ??
createSlackWebClient(
params.token,
params.slackApiUrl ? { slackApiUrl: params.slackApiUrl } : {},
);
const client = params.client ?? createSlackWebClient(params.token);
const channels = await listSlackChannels(client);
return resolveSlackAllowlistEntries<
{ id?: string; name?: string },

View File

@@ -4,7 +4,6 @@ import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "openclaw/plugin-sdk/string-coerce-runtime";
import type { SlackApiUrlClientOptions } from "./client-options.js";
import { createSlackWebClient } from "./client.js";
import {
collectSlackCursorItems,
@@ -154,14 +153,8 @@ export async function resolveSlackUserAllowlist(params: {
token: string;
entries: string[];
client?: WebClient;
slackApiUrl?: SlackApiUrlClientOptions["slackApiUrl"];
}): Promise<SlackUserResolution[]> {
const client =
params.client ??
createSlackWebClient(
params.token,
params.slackApiUrl ? { slackApiUrl: params.slackApiUrl } : {},
);
const client = params.client ?? createSlackWebClient(params.token);
const users = await listSlackUsers(client);
return resolveSlackAllowlistEntries<
{ id?: string; name?: string; email?: string },

View File

@@ -6,7 +6,7 @@ import {
normalizeOptionalString,
sortUniqueStrings,
} from "openclaw/plugin-sdk/string-coerce-runtime";
import { createSlackWebClient, type SlackApiUrlClientOptions } from "./client.js";
import { createSlackWebClient } from "./client.js";
import { formatSlackError } from "./errors.js";
export type SlackScopesResult = {
@@ -95,9 +95,8 @@ async function callSlack(
export async function fetchSlackScopes(
token: string,
timeoutMs: number,
options: SlackApiUrlClientOptions = {},
): Promise<SlackScopesResult> {
const client = createSlackWebClient(token, { ...options, timeout: timeoutMs });
const client = createSlackWebClient(token, { timeout: timeoutMs });
const attempts: SlackScopesMethod[] = ["auth.test", "auth.scopes", "apps.permissions.info"];
const errors: string[] = [];

View File

@@ -29,7 +29,6 @@ import type { SlackTokenSource } from "./accounts.js";
import { resolveSlackAccount } from "./accounts.js";
import { buildSlackBlocksFallbackText } from "./blocks-fallback.js";
import { validateSlackBlocksArray } from "./blocks-input.js";
import { createSlackApiUrlClientOptions } from "./client-options.js";
import { createSlackTokenCacheKey, getSlackWriteClient } from "./client.js";
import { markdownToSlackMrkdwnChunks } from "./format.js";
import { SLACK_TEXT_LIMIT } from "./limits.js";
@@ -751,7 +750,7 @@ async function sendMessageSlackQueuedInner(params: {
blocks?: (Block | KnownBlock)[];
}): Promise<SlackSendResult> {
const { opts, cfg, account, token, recipient, blocks, trimmedMessage } = params;
const client = opts.client ?? getSlackWriteClient(token, createSlackApiUrlClientOptions());
const client = opts.client ?? getSlackWriteClient(token);
const identity = resolveSlackSendIdentity({
accountId: account.accountId,
explicit: opts.identity,

View File

@@ -433,6 +433,31 @@ describe("synology-chat security helpers", () => {
expect(result).toContain("[truncated]");
});
it("truncates long inputs without splitting a surrogate pair", () => {
const loneSurrogatePattern =
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?:^|[^\uD800-\uDBFF])[\uDC00-\uDFFF]/u;
const input = "a".repeat(3999) + "\u{1F600}" + "b".repeat(2000);
const result = sanitizeInput(input);
expect(result).toContain("[truncated]");
expect(result).not.toMatch(loneSurrogatePattern);
expect(result).toBe(`${"a".repeat(3999)}... [truncated]`);
});
it("keeps complete supplementary-plane characters that fit before truncation", () => {
const loneSurrogatePattern =
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?:^|[^\uD800-\uDBFF])[\uDC00-\uDFFF]/u;
const emoji = "\u{1F600}";
const input = "a".repeat(3998) + emoji + "b".repeat(2000);
const result = sanitizeInput(input);
expect(result).toContain("[truncated]");
expect(result.startsWith(`${"a".repeat(3998)}${emoji}`)).toBe(true);
expect(result).not.toMatch(loneSurrogatePattern);
});
it("rate limits per user and caps tracked state", () => {
const limiter = new RateLimiter(3, 60);
expect(limiter.check("user1")).toBe(true);

View File

@@ -5,6 +5,7 @@
import { resolveStableChannelMessageIngress } from "openclaw/plugin-sdk/channel-ingress-runtime";
import { finiteSecondsToTimerSafeMilliseconds } from "openclaw/plugin-sdk/number-runtime";
import { safeEqualSecret } from "openclaw/plugin-sdk/security-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import {
createFixedWindowRateLimiter,
type FixedWindowRateLimiter,
@@ -64,7 +65,7 @@ export function sanitizeInput(text: string): string {
const maxLength = 4000;
if (sanitized.length > maxLength) {
sanitized = sanitized.slice(0, maxLength) + "... [truncated]";
sanitized = truncateUtf16Safe(sanitized, maxLength) + "... [truncated]";
}
return sanitized;

View File

@@ -6,6 +6,7 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import * as querystring from "node:querystring";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
import {
beginWebhookRequestPipelineOrReject,
createWebhookInFlightLimiter,
@@ -503,7 +504,7 @@ async function parseAndAuthorizeSynologyWebhook(params: {
respondNoContent(params.res);
return { ok: false };
}
const preview = cleanText.length > 100 ? `${cleanText.slice(0, 100)}...` : cleanText;
const preview = cleanText.length > 100 ? `${truncateUtf16Safe(cleanText, 100)}...` : cleanText;
return {
ok: true,
message: {
@@ -574,7 +575,7 @@ async function processAuthorizedSynologyWebhook(params: {
deliveryUserId,
params.account.allowInsecureSsl,
);
const replyPreview = reply.length > 100 ? `${reply.slice(0, 100)}...` : reply;
const replyPreview = reply.length > 100 ? `${truncateUtf16Safe(reply, 100)}...` : reply;
params.log?.info?.(
`Reply sent to ${params.message.payload.username} (${deliveryUserId}): ${replyPreview}`,
);

View File

@@ -0,0 +1,78 @@
import { describe, expect, it } from "vitest";
import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js";
import type { TelegramPromptContextEntry } from "./bot-message-context.types.js";
const telegramChatWindowContext: TelegramPromptContextEntry = {
label: "Conversation context",
source: "telegram",
type: "chat_window",
payload: {
order: "chronological",
relation: "selected_for_current_message",
messages: [
{
message_id: "10",
sender: "Pat",
timestamp_ms: 1_700_000_000_000,
body: "Earlier DM turn already in the transcript",
},
],
},
};
describe("buildTelegramMessageContext prompt context", () => {
it("omits Telegram chat-window context for existing unthreaded private DM sessions", async () => {
const ctx = await buildTelegramMessageContextForTest({
message: {
chat: { id: 1234, type: "private", first_name: "Pat" },
from: { id: 1234, first_name: "Pat" },
text: "continue",
},
promptContext: [telegramChatWindowContext],
sessionRuntime: {
readSessionUpdatedAt: ({ sessionKey }) =>
sessionKey === "agent:main:main" ? 1_700_000_000_000 : undefined,
},
});
expect(ctx?.ctxPayload.SessionKey).toBe("agent:main:main");
expect(ctx?.ctxPayload.UntrustedStructuredContext).toBeUndefined();
});
it("keeps Telegram chat-window context for fresh private DM sessions", async () => {
const ctx = await buildTelegramMessageContextForTest({
message: {
chat: { id: 1234, type: "private", first_name: "Pat" },
from: { id: 1234, first_name: "Pat" },
text: "start",
},
promptContext: [telegramChatWindowContext],
});
expect(ctx?.ctxPayload.UntrustedStructuredContext).toEqual([telegramChatWindowContext]);
});
it("keeps Telegram chat-window context for existing private DM replies", async () => {
const ctx = await buildTelegramMessageContextForTest({
message: {
chat: { id: 1234, type: "private", first_name: "Pat" },
from: { id: 1234, first_name: "Pat" },
text: "replying with context",
reply_to_message: {
chat: { id: 1234, type: "private", first_name: "Pat" },
from: { id: 1234, first_name: "Pat" },
text: "older referenced turn",
date: 1_700_000_000,
message_id: 10,
},
},
promptContext: [telegramChatWindowContext],
sessionRuntime: {
readSessionUpdatedAt: ({ sessionKey }) =>
sessionKey === "agent:main:main" ? 1_700_000_000_000 : undefined,
},
});
expect(ctx?.ctxPayload.UntrustedStructuredContext).toEqual([telegramChatWindowContext]);
});
});

View File

@@ -113,6 +113,10 @@ export async function resolveTelegramMessageContextStorePath(params: {
});
}
function isTelegramChatWindowPromptContext(entry: TelegramPromptContextEntry): boolean {
return entry.source === "telegram" && entry.type === "chat_window";
}
function replyTargetToChainEntry(replyTarget: TelegramReplyTarget): TelegramReplyChainEntry {
return {
...(replyTarget.id ? { messageId: replyTarget.id } : {}),
@@ -378,6 +382,17 @@ export async function buildTelegramInboundContextPayload(params: {
storePath,
sessionKey: route.sessionKey,
});
const shouldSuppressPersistedDmChatWindowContext =
!isGroup &&
previousTimestamp !== undefined &&
dmThreadId == null &&
visibleReplyChain.length === 0 &&
!visibleReplyTarget;
// Existing plain DMs already carry their history through the persistent
// transcript. Keep chat windows for fresh DMs, topics, replies, and groups.
const visiblePromptContext = shouldSuppressPersistedDmChatWindowContext
? promptContext.filter((entry) => !isTelegramChatWindowPromptContext(entry))
: promptContext;
const body = formatInboundEnvelope({
channel: "Telegram",
from: conversationLabel,
@@ -559,7 +574,7 @@ export async function buildTelegramInboundContextPayload(params: {
}
: undefined,
groupSystemPrompt: isGroup || (!isGroup && groupConfig) ? groupSystemPrompt : undefined,
untrustedContext: promptContext.length > 0 ? promptContext : undefined,
untrustedContext: visiblePromptContext.length > 0 ? visiblePromptContext : undefined,
},
contextVisibility: contextVisibilityMode,
extra: {

View File

@@ -23,6 +23,7 @@ type BuildTelegramMessageContextForTestParams = {
message: Record<string, unknown>;
me?: Record<string, unknown>;
allMedia?: TelegramMediaRef[];
promptContext?: BuildTelegramMessageContextParams["promptContext"];
options?: BuildTelegramMessageContextParams["options"];
cfg?: Record<string, unknown>;
accountId?: string;
@@ -112,6 +113,7 @@ export async function buildTelegramMessageContextForTest(
me: { id: 7, username: "bot", ...params.me },
} as never,
allMedia: params.allMedia ?? [],
promptContext: params.promptContext ?? [],
storeAllowFrom: [],
options: params.options ?? {},
bot: {

View File

@@ -4445,12 +4445,12 @@ describe("createTelegramBot", () => {
});
expect(sendMessageSpy.mock.calls.length).toBeGreaterThan(1);
for (const [index, call] of sendMessageSpy.mock.calls.entries()) {
for (const call of sendMessageSpy.mock.calls) {
const params = call[2] as
| { reply_to_message_id?: number; reply_parameters?: { message_id?: number } }
| undefined;
const actual = params?.reply_parameters?.message_id ?? params?.reply_to_message_id;
if (mode === "all" || index === 0) {
if (mode === "all") {
expect(actual).toBe(messageId);
} else {
expect(actual).toBeUndefined();

View File

@@ -326,36 +326,39 @@ async function sendTelegramVoiceFallbackText(opts: {
silent?: boolean;
replyMarkup?: ReturnType<typeof buildInlineKeyboard>;
replyQuoteText?: string;
replyToMode?: ReplyToMode;
}): Promise<number | undefined> {
let firstDeliveredMessageId: number | undefined;
const chunks = filterEmptyTelegramTextChunks(opts.chunkText(opts.text));
let appliedReplyTo = false;
for (const chunk of chunks) {
// Only apply reply reference, quote text, and buttons to the first chunk.
const replyToForChunk = !appliedReplyTo ? opts.replyToId : undefined;
const applyQuoteForChunk = !appliedReplyTo;
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
replyToMessageId: replyToForChunk,
replyQuoteMessageId: applyQuoteForChunk ? opts.replyQuoteMessageId : undefined,
replyQuoteText: applyQuoteForChunk ? opts.replyQuoteText : undefined,
replyQuotePosition: applyQuoteForChunk ? opts.replyQuotePosition : undefined,
replyQuoteEntities: applyQuoteForChunk ? opts.replyQuoteEntities : undefined,
thread: opts.thread,
textMode: chunk.textMode,
plainText: chunk.plainText,
richMessages: opts.richMessages,
linkPreview: opts.linkPreview,
tableMode: opts.tableMode,
silent: opts.silent,
replyMarkup: !appliedReplyTo ? opts.replyMarkup : undefined,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = messageId;
}
if (replyToForChunk) {
appliedReplyTo = true;
}
}
await sendChunkedTelegramReplyText({
chunks,
progress: { hasReplied: false, hasDelivered: false },
replyToId: opts.replyToId,
replyToMode: opts.replyToMode ?? "first",
replyMarkup: opts.replyMarkup,
replyQuoteText: opts.replyQuoteText,
quoteOnlyOnFirstChunk: true,
sendChunk: async ({ chunk, replyToMessageId, replyMarkup, replyQuoteText }) => {
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
replyToMessageId,
replyQuoteMessageId: replyToMessageId ? opts.replyQuoteMessageId : undefined,
replyQuoteText,
replyQuotePosition: replyToMessageId ? opts.replyQuotePosition : undefined,
replyQuoteEntities: replyToMessageId ? opts.replyQuoteEntities : undefined,
thread: opts.thread,
textMode: chunk.textMode,
plainText: chunk.plainText,
richMessages: opts.richMessages,
linkPreview: opts.linkPreview,
tableMode: opts.tableMode,
silent: opts.silent,
replyMarkup,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = messageId;
}
},
});
return firstDeliveredMessageId;
}
@@ -535,6 +538,7 @@ async function deliverMediaReply(params: {
silent: params.silent,
replyMarkup: params.replyMarkup,
replyQuoteText: params.replyQuoteText,
replyToMode: params.replyToMode,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = fallbackMessageId;

View File

@@ -1484,7 +1484,7 @@ describe("deliverReplies", () => {
expectRecordFields(mockCallArg(sendMessage, 0, 2), { disable_notification: true });
});
it("voice fallback applies reply-to only on first chunk when replyToMode is first", async () => {
it("voice fallback avoids native replies for chunked first-mode fallback text", async () => {
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
voiceError: createVoiceMessagesForbiddenError(),
sendMessageResult: {
@@ -1520,15 +1520,12 @@ describe("deliverReplies", () => {
expect(sendVoice).toHaveBeenCalledTimes(1);
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_parameters: {
message_id: 77,
quote: "quoted context",
allow_sending_without_reply: true,
},
reply_markup: {
inline_keyboard: [[{ text: "Ack", callback_data: "ack" }]],
},
});
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_to_message_id");
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_parameters");
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id", 77);
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_parameters");
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_markup");
@@ -1555,7 +1552,32 @@ describe("deliverReplies", () => {
expect(sendMessage).not.toHaveBeenCalled();
});
it("replyToMode 'first' only applies reply-to to the first text chunk", async () => {
it("replyToMode 'first' keeps native reply-to for a single text chunk", async () => {
const runtime = createRuntime();
const sendMessage = vi.fn().mockResolvedValue({
message_id: 20,
chat: { id: "123" },
});
const bot = createBot({ sendMessage });
await deliverReplies({
replies: [{ text: "one chunk", replyToId: "700" }],
chatId: "123",
token: "tok",
runtime,
bot,
replyToMode: "first",
textLimit: 4000,
});
expect(sendMessage).toHaveBeenCalledTimes(1);
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_to_message_id: 700,
allow_sending_without_reply: true,
});
});
it("replyToMode 'first' avoids native reply-to for chunked text", async () => {
const runtime = createRuntime();
const sendMessage = vi.fn().mockResolvedValue({
message_id: 20,
@@ -1575,13 +1597,10 @@ describe("deliverReplies", () => {
});
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
// First chunk should have reply_to_message_id
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_to_message_id: 700,
allow_sending_without_reply: true,
});
// Second chunk should NOT have reply_to_message_id
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id");
for (const call of sendMessage.mock.calls) {
expect(call[2]).not.toHaveProperty("reply_to_message_id");
expect(call[2]).not.toHaveProperty("reply_parameters");
}
});
it("clamps reply chunks to Telegram rich message limit", async () => {

View File

@@ -184,9 +184,9 @@ describe("buildTelegramThreadParams", () => {
{ input: { id: 0, scope: "dm" as const }, expected: undefined },
{ input: { id: -1, scope: "dm" as const }, expected: undefined },
{ input: { id: 1.9, scope: "dm" as const }, expected: { message_thread_id: 1 } },
// id=0 should be included for forum and none scopes (not falsy)
// id=0 should be included for forum scope (not falsy).
{ input: { id: 0, scope: "forum" as const }, expected: { message_thread_id: 0 } },
{ input: { id: 0, scope: "none" as const }, expected: { message_thread_id: 0 } },
{ input: { id: 42, scope: "none" as const }, expected: undefined },
])("builds thread params", ({ input, expected }) => {
expect(buildTelegramThreadParams(input)).toEqual(expected);
});

View File

@@ -427,6 +427,10 @@ export function buildTelegramThreadParams(thread?: TelegramThreadSpec | null) {
return normalized > 0 ? { message_thread_id: normalized } : undefined;
}
if (thread.scope === "none") {
return undefined;
}
// Telegram rejects message_thread_id=1 for General forum topic
if (normalized === TELEGRAM_GENERAL_TOPIC_ID) {
return undefined;

View File

@@ -1,5 +1,6 @@
// Telegram plugin module implements reply threading behavior.
import type { ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
export type DeliveryProgress = {
hasReplied: boolean;
@@ -48,17 +49,24 @@ export async function sendChunkedTelegramReplyText<
}) => Promise<void>;
}): Promise<void> {
const applyDelivered = params.markDelivered ?? markDelivered;
const suppressSingleUseReply =
params.chunks.length > 1 && isSingleUseReplyToMode(params.replyToMode);
for (let i = 0; i < params.chunks.length; i += 1) {
const chunk = params.chunks[i];
if (!chunk) {
continue;
}
const isFirstChunk = i === 0;
const replyToMessageId = resolveReplyToForSend({
replyToId: params.replyToId,
replyToMode: params.replyToMode,
progress: params.progress,
});
// Telegram Desktop can render long formatted native-reply chunks as
// unsupported messages. Multi-part `first` replies consume the reply target
// without adding native reply params, preserving visible text.
const replyToMessageId = suppressSingleUseReply
? undefined
: resolveReplyToForSend({
replyToId: params.replyToId,
replyToMode: params.replyToMode,
progress: params.progress,
});
const shouldAttachQuote =
Boolean(replyToMessageId) &&
Boolean(params.replyQuoteText) &&
@@ -70,7 +78,10 @@ export async function sendChunkedTelegramReplyText<
replyMarkup: isFirstChunk ? params.replyMarkup : undefined,
replyQuoteText: shouldAttachQuote ? params.replyQuoteText : undefined,
});
markReplyApplied(params.progress, replyToMessageId);
markReplyApplied(
params.progress,
suppressSingleUseReply && isFirstChunk ? params.replyToId : replyToMessageId,
);
applyDelivered(params.progress);
}
}

View File

@@ -82,19 +82,37 @@ describe("telegram channel message adapter", () => {
};
const provePayload = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-payload", chatId: "12345" });
sendMessageTelegramMock.mockResolvedValueOnce({
messageId: "tg-payload-2",
chatId: "12345",
receipt: {
primaryPlatformMessageId: "tg-payload-1",
platformMessageIds: ["tg-payload-1", "tg-payload-2"],
parts: [
{ platformMessageId: "tg-payload-1", kind: "text", index: 0 },
{ platformMessageId: "tg-payload-2", kind: "text", index: 1 },
],
sentAt: 123,
},
});
const result = await adapter.send!.payload!({
cfg: {} as never,
to: "12345",
text: "payload",
payload: { text: "payload" },
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
threadId: "12",
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith("12345", "payload", {
cfg: {},
verbose: false,
messageThreadId: undefined,
replyToMessageId: undefined,
messageThreadId: 12,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -104,7 +122,8 @@ describe("telegram channel message adapter", () => {
quoteText: undefined,
buttons: undefined,
});
expect(result.receipt.platformMessageIds).toEqual(["tg-payload"]);
expect(result.receipt.primaryPlatformMessageId).toBe("tg-payload-1");
expect(result.receipt.platformMessageIds).toEqual(["tg-payload-1", "tg-payload-2"]);
};
const proveReplyThreadSilent = async () => {
@@ -114,6 +133,8 @@ describe("telegram channel message adapter", () => {
to: "12345",
text: "threaded",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
threadId: "12",
silent: true,
deps: { sendTelegram: sendMessageTelegramMock },
@@ -123,6 +144,8 @@ describe("telegram channel message adapter", () => {
verbose: false,
messageThreadId: 12,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
accountId: undefined,
silent: true,
gatewayClientScopes: undefined,
@@ -138,6 +161,9 @@ describe("telegram channel message adapter", () => {
cfg: {} as never,
to: "12345",
text: "batch",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
payload: {
text: "batch",
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
@@ -152,7 +178,9 @@ describe("telegram channel message adapter", () => {
cfg: {},
verbose: false,
messageThreadId: undefined,
replyToMessageId: undefined,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -172,6 +200,8 @@ describe("telegram channel message adapter", () => {
verbose: false,
messageThreadId: undefined,
replyToMessageId: undefined,
replyToIdSource: undefined,
replyToMode: undefined,
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -222,6 +252,36 @@ describe("telegram channel message adapter", () => {
});
});
it("keeps implicit first replies on the first delivered payload media", async () => {
const adapter = requireTelegramMessageAdapter();
sendMessageTelegramMock
.mockResolvedValueOnce({ messageId: "tg-media-1", chatId: "12345" })
.mockResolvedValueOnce({ messageId: "tg-media-2", chatId: "12345" });
await adapter.send!.payload!({
cfg: {} as never,
to: "12345",
text: "batch",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
payload: {
text: "batch",
mediaUrls: ["", "https://example.com/a.png", "https://example.com/b.png"],
},
deps: { sendTelegram: sendMessageTelegramMock },
});
const firstOpts = sendMessageTelegramMock.mock.calls[0]?.[2] as
| { replyToMessageId?: number }
| undefined;
const secondOpts = sendMessageTelegramMock.mock.calls[1]?.[2] as
| { replyToMessageId?: number }
| undefined;
expect(firstOpts?.replyToMessageId).toBe(900);
expect(secondOpts?.replyToMessageId).toBeUndefined();
});
it("backs declared live preview finalizer capabilities with adapter proofs", async () => {
const adapter = requireTelegramMessageAdapter();

View File

@@ -139,6 +139,16 @@ describe("isRecoverableTelegramNetworkError", () => {
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "polling" })).toBe(true);
});
it("treats delete/react (idempotent) contexts like polling, not send", () => {
const undiciSnippetErr = new Error("Undici: socket failure");
// delete and react are idempotent Telegram operations; a transient
// snippet-only error must be retried (allowMessageMatch defaults true),
// matching polling/webhook. send stays strict as the regression guard.
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "delete" })).toBe(true);
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "react" })).toBe(true);
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "send" })).toBe(false);
});
it("treats grammY failed-after envelope errors as recoverable in send context", () => {
expect(
isRecoverableTelegramNetworkError(

View File

@@ -141,7 +141,13 @@ export function isTelegramMisdirectedRequestError(err: unknown): boolean {
return false;
}
export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown";
export type TelegramNetworkErrorContext =
| "polling"
| "send"
| "webhook"
| "delete"
| "react"
| "unknown";
export type TelegramNetworkErrorOrigin = {
method?: string | null;
url?: string | null;

View File

@@ -19,6 +19,7 @@ import {
resolvePayloadMediaUrls,
sendPayloadMediaSequenceOrFallback,
} from "openclaw/plugin-sdk/reply-payload";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-chunking";
import type { TelegramInlineButtons } from "./button-types.js";
@@ -60,6 +61,8 @@ async function resolveTelegramSendContext(params: {
deps?: OutboundSendDeps;
accountId?: string | null;
replyToId?: string | null;
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
replyToMode?: TelegramSendOpts["replyToMode"];
threadId?: string | number | null;
formatting?: OutboundDeliveryFormattingOptions;
silent?: boolean;
@@ -74,6 +77,8 @@ async function resolveTelegramSendContext(params: {
tableMode?: OutboundDeliveryFormattingOptions["tableMode"];
messageThreadId?: number;
replyToMessageId?: number;
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
replyToMode?: TelegramSendOpts["replyToMode"];
accountId?: string;
silent?: boolean;
gatewayClientScopes?: readonly string[];
@@ -87,6 +92,8 @@ async function resolveTelegramSendContext(params: {
cfg: params.cfg,
messageThreadId: parseTelegramThreadId(params.threadId),
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
...(params.replyToIdSource !== undefined ? { replyToIdSource: params.replyToIdSource } : {}),
...(params.replyToMode !== undefined ? { replyToMode: params.replyToMode } : {}),
accountId: params.accountId ?? undefined,
silent: params.silent,
gatewayClientScopes: params.gatewayClientScopes,
@@ -151,6 +158,19 @@ export async function sendTelegramPayloadMessages(params: {
quoteText,
...(params.payload.audioAsVoice === true ? { asVoice: true } : {}),
};
const shouldConsumeImplicitReplyTarget =
payloadOpts.replyToIdSource === "implicit" &&
payloadOpts.replyToMode !== undefined &&
isSingleUseReplyToMode(payloadOpts.replyToMode);
const consumedImplicitReplyPayloadOpts = shouldConsumeImplicitReplyTarget
? {
...payloadOpts,
replyToMessageId: undefined,
replyToIdSource: undefined,
replyToMode: undefined,
}
: payloadOpts;
let implicitReplyTargetAvailable = true;
if (reactionEmoji) {
if (typeof replyToMessageId !== "number") {
throw new Error("Telegram reaction requires a reply target");
@@ -179,12 +199,18 @@ export async function sendTelegramPayloadMessages(params: {
...payloadOpts,
buttons,
}),
send: async ({ text: textLocal, mediaUrl, isFirst }) =>
await params.send(params.to, textLocal, {
...payloadOpts,
send: async ({ text: textLocal, mediaUrl, isFirst }) => {
const mediaPayloadOpts =
shouldConsumeImplicitReplyTarget && !implicitReplyTargetAvailable
? consumedImplicitReplyPayloadOpts
: payloadOpts;
implicitReplyTargetAvailable = false;
return await params.send(params.to, textLocal, {
...mediaPayloadOpts,
mediaUrl,
...(isFirst ? { buttons } : {}),
}),
});
},
});
}

View File

@@ -72,6 +72,7 @@ let isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess: typeof import("./telegr
let listTelegramSpooledUpdateClaims: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdateClaims;
let listTelegramSpooledUpdates: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdates;
let recoverStaleTelegramSpooledUpdateClaims: typeof import("./telegram-ingress-spool.js").recoverStaleTelegramSpooledUpdateClaims;
let telegramSpooledUpdateClaimLeaseMs: typeof import("./telegram-ingress-spool.js").TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS;
let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate;
let createTelegramSpooledReplayDeferredParticipant: typeof import("./bot-processing-outcome.js").createTelegramSpooledReplayDeferredParticipant;
let TelegramMessageDispatchReplayForgetError: typeof import("./message-dispatch-dedupe.js").TelegramMessageDispatchReplayForgetError;
@@ -685,6 +686,7 @@ describe("TelegramPollingSession", () => {
listTelegramSpooledUpdateClaims,
listTelegramSpooledUpdates,
recoverStaleTelegramSpooledUpdateClaims,
TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS: telegramSpooledUpdateClaimLeaseMs,
writeTelegramSpooledUpdate,
} = await import("./telegram-ingress-spool.js"));
({ createTelegramSpooledReplayDeferredParticipant } =
@@ -1067,13 +1069,13 @@ describe("TelegramPollingSession", () => {
const queue = createChannelIngressQueue({ ...options, channelId: "telegram" });
return {
...queue,
claim: async (...args: Parameters<typeof queue.claim>) => {
if (args[0] === "0000000000000001" && !blockedFirstClaim) {
claimNext: async (...args: Parameters<typeof queue.claimNext>) => {
if (!blockedFirstClaim) {
blockedFirstClaim = true;
firstClaimStarted?.();
await firstClaimGate;
}
return queue.claim(...args);
return queue.claimNext(...args);
},
};
},
@@ -1661,6 +1663,86 @@ describe("TelegramPollingSession", () => {
});
});
it("stops refreshing a claim when the drain loop is stalled", async () => {
vi.useFakeTimers({ now: 1_000 });
const refreshHarness = installSpooledClaimRefreshHarness();
await withTempSpool(async (tempDir) => {
let blockedSecondClaim = false;
let releaseSecondClaim: (() => void) | undefined;
const secondClaimStarted = new Promise<void>((resolve) => {
const gate = new Promise<void>((release) => {
releaseSecondClaim = release;
});
setTelegramRuntime({
state: {
resolveStateDir: () => tempDir,
openChannelIngressQueue: (
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
) => {
const queue = createChannelIngressQueue({ ...options, channelId: "telegram" });
return {
...queue,
claimNext: async (...args: Parameters<typeof queue.claimNext>) => {
const claimOptions = args[0];
const blockedLaneKeys = claimOptions?.blockedLaneKeys
? Array.from(claimOptions.blockedLaneKeys)
: [];
const candidateIds = claimOptions?.candidateIds
? Array.from(claimOptions.candidateIds)
: [];
if (
candidateIds.includes("0000000000000043") &&
blockedLaneKeys.length > 0 &&
!blockedSecondClaim
) {
blockedSecondClaim = true;
resolve();
await gate;
}
return queue.claimNext(...args);
},
};
},
},
} as TelegramRuntime);
});
const abort = new AbortController();
let releaseHandler: (() => void) | undefined;
const handlerDone = new Promise<void>((resolve) => {
releaseHandler = resolve;
});
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "first topic 10 turn"),
topicUpdate(43, 11, "blocked topic 11 turn"),
]);
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
handleUpdate: async () => {
await handlerDone;
},
});
try {
await secondClaimStarted;
const before = await claimedAtForUpdate(tempDir, 42);
vi.setSystemTime(1_000 + pollingSessionTesting.spooledClaimRefreshIntervalMs * 2 + 1);
refreshHarness.triggerRefresh();
await Promise.resolve();
expect(await claimedAtForUpdate(tempDir, 42)).toBe(before);
} finally {
releaseSecondClaim?.();
releaseHandler?.();
abort.abort();
stopWorker();
refreshHarness.restore();
vi.useRealTimers();
await runPromise;
}
});
});
it("holds buffered spooled claims until deferred processing settles without blocking same-lane buffering", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
@@ -2180,10 +2262,11 @@ describe("TelegramPollingSession", () => {
if (!claimed) {
throw new Error("Expected claimed update");
}
const liveOwnerPid = process.ppid > 0 ? process.ppid : 1;
await adoptClaimOwner({
spoolDir: tempDir,
updateId: 42,
ownerId: `${process.pid}:other-process`,
ownerId: `${liveOwnerPid}:other-process`,
claimedAt: Date.now(),
});
@@ -2203,10 +2286,9 @@ describe("TelegramPollingSession", () => {
});
});
it("fails timed-out current-process claims before draining later same-lane updates", async () => {
it("releases pid-reused claims before draining later same-lane updates", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const log = vi.fn();
const events: string[] = [];
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "wedged topic 10 turn"),
@@ -2232,7 +2314,6 @@ describe("TelegramPollingSession", () => {
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
log,
spooledUpdateHandlerTimeoutMs: 100,
handleUpdate: async (update) => {
events.push(`handled:${update.update_id}`);
@@ -2240,21 +2321,58 @@ describe("TelegramPollingSession", () => {
},
});
await vi.waitFor(() => expect(events).toEqual(["handled:43"]));
await vi.waitFor(() => expect(events).toEqual(["handled:42"]));
await runPromise;
expect(await failedUpdateReasons(tempDir)).toEqual([
{ id: 42, reason: "lane-released-on-stuck" },
]);
expect(await pendingUpdateIds(tempDir, "all")).toEqual([]);
expect(await failedUpdateReasons(tempDir)).toEqual([]);
expect(await pendingUpdateIds(tempDir, "all")).toEqual([43]);
expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]);
expectLogIncludes(
log,
"spooled update 42 Telegram spooled update claim owned by this process",
);
stopWorker();
});
});
it("reclaims an expired foreign claim so the lane can drain", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const events: number[] = [];
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "expired foreign claim"),
topicUpdate(43, 10, "later topic 10 turn"),
]);
const interrupted = (await listTelegramSpooledUpdates({ spoolDir: tempDir })).find(
(update) => update.updateId === 42,
);
if (!interrupted) {
throw new Error("Expected interrupted update");
}
const claimed = await claimTelegramSpooledUpdate(interrupted);
if (!claimed) {
throw new Error("Expected claimed update");
}
await adoptClaimOwner({
spoolDir: tempDir,
updateId: 42,
ownerId: "1:other-process",
claimedAt: Date.now() - telegramSpooledUpdateClaimLeaseMs - 1,
});
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
spooledUpdateHandlerTimeoutMs: 100,
handleUpdate: async (update) => {
events.push(update.update_id ?? -1);
if (events.length === 2) {
abort.abort();
}
},
});
await vi.waitFor(() => expect(events).toEqual([42, 43]));
stopWorker();
await runPromise;
});
});
it("scans past active-lane backlogs to start unrelated lanes", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();

View File

@@ -34,7 +34,7 @@ import { TelegramPollingTransportState } from "./polling-transport-state.js";
import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js";
import { getTelegramSequentialKey } from "./sequential-key.js";
import {
claimTelegramSpooledUpdate,
claimNextTelegramSpooledUpdate,
deleteTelegramSpooledUpdate,
failTelegramSpooledUpdateClaim,
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess,
@@ -44,6 +44,7 @@ import {
refreshTelegramSpooledUpdateClaim,
releaseTelegramSpooledUpdateClaim,
resolveTelegramIngressSpoolDir,
TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS,
writeTelegramSpooledUpdate,
type ClaimedTelegramSpooledUpdate,
type TelegramSpooledUpdate,
@@ -133,6 +134,7 @@ const TELEGRAM_SPOOLED_HANDLER_TIMEOUT_ENV = "OPENCLAW_TELEGRAM_SPOOLED_HANDLER_
const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100;
const TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT = TELEGRAM_SPOOLED_DRAIN_START_LIMIT * 10;
const TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS = 5 * 60 * 1000;
const TELEGRAM_SPOOLED_CLAIM_HEALTH_GRACE_MS = 2 * TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS;
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_BASE_MS = 5_000;
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_MAX_MS = 60_000;
const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil(
@@ -324,6 +326,21 @@ type SpooledUpdateDrainResult = {
// Account health restarts create a new session in the same process while an old
// spooled handler may still be running after shutdown grace.
const activeSpooledUpdateHandlersByLane = new Map<string, SpooledUpdateHandlerState>();
type SpooledUpdateDrainHealth = {
lastCompletedAt: number;
};
const spooledUpdateDrainHealthBySpool = new Map<string, SpooledUpdateDrainHealth>();
function getSpooledUpdateDrainHealth(spoolDir: string): SpooledUpdateDrainHealth {
const existing = spooledUpdateDrainHealthBySpool.get(spoolDir);
if (existing) {
return existing;
}
const created = { lastCompletedAt: Date.now() };
spooledUpdateDrainHealthBySpool.set(spoolDir, created);
return created;
}
function resolveSpooledUpdateHandlerTimeoutMs(params: {
configured?: number;
@@ -564,32 +581,51 @@ export class TelegramPollingSession {
}
}
async #claimSpooledUpdate(
update: TelegramSpooledUpdate,
): Promise<ClaimedTelegramSpooledUpdate | null> {
async #claimNextSpooledUpdate(params: {
blockedLaneKeys: Set<string>;
candidateUpdateIds: readonly number[];
spoolDir: string;
}): Promise<ClaimedTelegramSpooledUpdate | null> {
try {
return await claimTelegramSpooledUpdate(update);
return await claimNextTelegramSpooledUpdate({
spoolDir: params.spoolDir,
blockedLaneKeys: params.blockedLaneKeys,
botInfo: this.opts.botInfo,
candidateUpdateIds: params.candidateUpdateIds,
scanLimit: TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT,
});
} catch (err) {
this.opts.log(
`[telegram][diag] spooled update ${update.updateId} claim failed; keeping for retry: ${formatErrorMessage(err)}`,
`[telegram][diag] spooled update claim failed; keeping pending updates for retry: ${formatErrorMessage(err)}`,
);
return null;
}
}
#startSpooledUpdateClaimRefresh(update: ClaimedTelegramSpooledUpdate): () => void {
// Refresh only while this process still owns useful work for this claim token.
// Stopping before release/fail/delete lets stale recovery take over if work stalls.
#startSpooledUpdateClaimRefresh(
update: ClaimedTelegramSpooledUpdate,
isDrainHealthy: () => boolean,
onDrainUnhealthy: () => void,
): () => void {
// Refresh only while this process owns useful work and its drain loop is making progress.
// Stopping the lease on a stalled drain lets another process recover the lane.
let stopped = false;
let refreshing = false;
const refresh = async (): Promise<void> => {
if (stopped || refreshing) {
return;
}
if (!isDrainHealthy()) {
onDrainUnhealthy();
stopped = true;
clearInterval(timer);
return;
}
refreshing = true;
try {
const refreshed = await refreshTelegramSpooledUpdateClaim(update);
if (!refreshed && !stopped) {
onDrainUnhealthy();
stopped = true;
clearInterval(timer);
}
@@ -597,6 +633,11 @@ export class TelegramPollingSession {
this.opts.log(
`[telegram][diag] spooled update ${update.updateId} claim refresh failed: ${formatErrorMessage(err)}`,
);
if (!stopped) {
onDrainUnhealthy();
stopped = true;
clearInterval(timer);
}
} finally {
refreshing = false;
}
@@ -730,58 +771,6 @@ export class TelegramPollingSession {
return deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(update));
}
async #failTimedOutCurrentProcessSpooledUpdateClaims(params: {
activeLaneKeys: Set<string>;
spoolDir: string;
}): Promise<void> {
const claims = await listTelegramSpooledUpdateClaims({ spoolDir: params.spoolDir });
const now = Date.now();
for (const claim of claims) {
const claimOwner = claim.claim;
if (!claimOwner) {
continue;
}
if (this.#isDeferredSpooledUpdateClaim(claim)) {
continue;
}
if (params.activeLaneKeys.has(this.#spooledUpdateLaneKey(claim))) {
continue;
}
if (now - claimOwner.claimedAt < this.#spooledUpdateHandlerTimeoutMs) {
continue;
}
if (!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim)) {
continue;
}
// Same PID with a stale owner id means this process orphaned a previous
// local handler state; different live PIDs may still be processing.
if (claimOwner.processPid !== process.pid) {
continue;
}
const claimedForMs = now - claimOwner.claimedAt;
const message = `Telegram spooled update claim owned by this process for ${formatDurationPrecise(claimedForMs)} without active handler state; marking failed so the lane can continue.`;
try {
const failed = await failTelegramSpooledUpdateClaim({
update: claim,
reason: "lane-released-on-stuck",
message,
});
if (!failed) {
this.opts.log(
`[telegram][diag] spooled update ${claim.updateId} current-process claim no longer had a processing marker to fail.`,
);
continue;
}
} catch (err) {
this.opts.log(
`[telegram][diag] spooled update ${claim.updateId} current-process claim could not be marked failed: ${formatErrorMessage(err)}`,
);
continue;
}
this.opts.log(`[telegram][diag] spooled update ${claim.updateId} ${message}`);
}
}
async #failTimedOutDeferredSpooledUpdate(state: DeferredSpooledUpdateClaimState): Promise<void> {
const message =
state.timedOutMessage ??
@@ -875,8 +864,12 @@ export class TelegramPollingSession {
}
#spooledUpdateLaneKey(update: TelegramSpooledUpdate): string {
return this.#rawSpooledUpdateLaneKey(update.update);
}
#rawSpooledUpdateLaneKey(update: unknown): string {
return getTelegramSequentialKey({
update: update.update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
update: update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
...(this.opts.botInfo ? { me: this.opts.botInfo } : {}),
});
}
@@ -904,20 +897,19 @@ export class TelegramPollingSession {
async #drainSpooledUpdates(params: {
bot: TelegramBot;
isDrainHealthy: () => boolean;
spoolDir: string;
}): Promise<SpooledUpdateDrainResult> {
const activeLaneKeys = this.#activeSpooledUpdateLaneKeysForSpool(params.spoolDir);
await this.#failTimedOutCurrentProcessSpooledUpdateClaims({
activeLaneKeys,
spoolDir: params.spoolDir,
});
await recoverStaleTelegramSpooledUpdateClaims({
spoolDir: params.spoolDir,
staleMs: 0,
shouldRecover: (claim) =>
!this.#isDeferredSpooledUpdateClaim(claim) &&
!activeLaneKeys.has(this.#spooledUpdateLaneKey(claim)) &&
!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim),
!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim, {
maxAgeMs: TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS,
}),
});
const claimedLaneKeys = new Set(
(
@@ -932,31 +924,63 @@ export class TelegramPollingSession {
spoolDir: params.spoolDir,
limit: TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT,
});
const candidateUpdateIds = updates.map((update) => update.updateId);
const blockedByLane = new Set<string>();
let started = 0;
const retryDelayedLaneKeys = new Set<string>();
for (const update of updates) {
const laneKey = this.#spooledUpdateLaneKey(update);
if (this.opts.abortSignal?.aborted) {
break;
}
if (resolveSpooledUpdateRetryDelayMs(update) > 0) {
claimedLaneKeys.add(laneKey);
continue;
}
const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey });
if (activeSpooledUpdateHandlersByLane.has(handlerKey)) {
blockedByLane.add(handlerKey);
continue;
}
if (claimedLaneKeys.has(laneKey)) {
continue;
if (resolveSpooledUpdateRetryDelayMs(update) > 0) {
retryDelayedLaneKeys.add(laneKey);
}
const claimedUpdate = await this.#claimSpooledUpdate(update);
}
const blockedLaneKeys = new Set([
...activeLaneKeys,
...claimedLaneKeys,
...retryDelayedLaneKeys,
]);
let started = 0;
while (started < TELEGRAM_SPOOLED_DRAIN_START_LIMIT) {
if (this.opts.abortSignal?.aborted) {
break;
}
const claimedUpdate = await this.#claimNextSpooledUpdate({
blockedLaneKeys,
candidateUpdateIds,
spoolDir: params.spoolDir,
});
if (!claimedUpdate) {
claimedLaneKeys.add(laneKey);
break;
}
const laneKey = this.#spooledUpdateLaneKey(claimedUpdate);
const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey });
if (activeSpooledUpdateHandlersByLane.has(handlerKey)) {
blockedByLane.add(handlerKey);
await releaseTelegramSpooledUpdateClaim(claimedUpdate, {
lastError: "active Telegram spool handler already owns this lane",
});
blockedLaneKeys.add(laneKey);
continue;
}
const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(claimedUpdate);
const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(
claimedUpdate,
params.isDrainHealthy,
() => {
const scopedReplyFenceLaneKey = buildTelegramReplyFenceLaneKey({
accountId: this.opts.accountId,
sequentialKey: laneKey,
});
const abortedReplyWork = supersedeTelegramReplyFenceLane(scopedReplyFenceLaneKey);
if (!abortedReplyWork) {
this.opts.log(
`[telegram][diag] spooled update ${claimedUpdate.updateId} drain heartbeat expired without an active reply fence on lane ${laneKey}; stopping claim refresh.`,
);
}
},
);
const handler = this.#handleClaimedSpooledUpdate({
bot: params.bot,
stopClaimRefresh,
@@ -967,13 +991,13 @@ export class TelegramPollingSession {
laneKey,
task: handler,
update: claimedUpdate,
updateId: update.updateId,
updateId: claimedUpdate.updateId,
startedAt: Date.now(),
stopClaimRefresh,
};
activeSpooledUpdateHandlersByLane.set(handlerKey, state);
this.#spooledUpdateHandlerKeys.add(handlerKey);
claimedLaneKeys.add(laneKey);
blockedLaneKeys.add(laneKey);
void handler.finally(() => {
if (
!deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(claimedUpdate))
@@ -986,9 +1010,6 @@ export class TelegramPollingSession {
this.#spooledUpdateHandlerKeys.delete(handlerKey);
});
started += 1;
if (started >= TELEGRAM_SPOOLED_DRAIN_START_LIMIT) {
break;
}
}
return { blockedByLane, started };
}
@@ -1216,6 +1237,7 @@ export class TelegramPollingSession {
void writeTelegramSpooledUpdate({
spoolDir,
update: message.update,
laneKey: this.#rawSpooledUpdateLaneKey(message.update),
}).then(
(updateId) => {
ackSpooledUpdate(message.requestId, { ok: true, updateId });
@@ -1241,6 +1263,11 @@ export class TelegramPollingSession {
this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500));
let drainActive = false;
const drainHealth = getSpooledUpdateDrainHealth(spoolDir);
// Fail closed when the spool stops making progress: keeping any claim live would
// prevent a healthy process from recovering a wedged drain.
const isDrainHealthy = () =>
Date.now() - drainHealth.lastCompletedAt <= TELEGRAM_SPOOLED_CLAIM_HEALTH_GRACE_MS;
const stopBot = () => {
return Promise.resolve(bot.stop())
.then(() => undefined)
@@ -1282,8 +1309,13 @@ export class TelegramPollingSession {
}
drainActive = true;
drainRequested = false;
let drainCompleted = false;
try {
const drain = await this.#drainSpooledUpdates({ bot, spoolDir });
const drain = await this.#drainSpooledUpdates({
bot,
isDrainHealthy,
spoolDir,
});
consecutiveDrainFailures = 0;
for (const handlerKey of stalledBacklogKeys) {
if (
@@ -1320,12 +1352,16 @@ export class TelegramPollingSession {
} else if (timedOutRecovery) {
stalledBacklogKeys.add(timedOutRecovery.handlerKey);
}
drainCompleted = true;
} catch (err) {
consecutiveDrainFailures += 1;
this.opts.log(
`[telegram][diag] isolated polling spool drain failed (${consecutiveDrainFailures}): ${formatErrorMessage(err)}`,
);
} finally {
if (drainCompleted) {
drainHealth.lastCompletedAt = Date.now();
}
drainActive = false;
if (drainRequested && !restartRequested && !this.opts.abortSignal?.aborted) {
drainRequested = false;
@@ -1640,6 +1676,7 @@ const isGetUpdatesConflict = (err: unknown) => {
export const testing = {
resetActiveSpooledUpdateHandlersForTests: (): void => {
activeSpooledUpdateHandlersByLane.clear();
spooledUpdateDrainHealthBySpool.clear();
},
createTelegramRestartBackoffState,
resetTelegramRestartBackoffState,

View File

@@ -1650,6 +1650,49 @@ describe("sendMessageTelegram", () => {
expect(res.messageId).toBe("71");
});
it("does not reuse first-mode reply-to on media caption follow-up text", async () => {
const chatId = "123";
const longText = "A".repeat(1100);
const sendPhoto = vi.fn().mockResolvedValue({
message_id: 70,
chat: { id: chatId },
});
const sendMessage = vi.fn().mockResolvedValue({
message_id: 71,
chat: { id: chatId },
});
const api = { sendPhoto, sendMessage } as unknown as {
sendPhoto: typeof sendPhoto;
sendMessage: typeof sendMessage;
};
mockLoadedMedia({
buffer: Buffer.from("fake-image"),
contentType: "image/jpeg",
fileName: "photo.jpg",
});
await sendMessageTelegram(chatId, longText, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
mediaUrl: "https://example.com/photo.jpg",
replyToMessageId: 500,
replyToIdSource: "implicit",
replyToMode: "first",
});
expectMediaSendCall(firstMockCall(sendPhoto, "send photo call"), "send photo call", chatId, {
caption: undefined,
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
expect(sendMessage).toHaveBeenCalledWith(chatId, longText, {
parse_mode: "HTML",
});
});
it("chunks long default markdown media follow-up text", async () => {
const chatId = "123";
const longText = `**${"A".repeat(5000)}**`;
@@ -1658,7 +1701,10 @@ describe("sendMessageTelegram", () => {
message_id: 72,
chat: { id: chatId },
});
const sendMessage = vi.fn().mockResolvedValue({ message_id: 74, chat: { id: chatId } });
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 73, chat: { id: chatId } })
.mockResolvedValueOnce({ message_id: 74, chat: { id: chatId } });
const api = { sendPhoto, sendMessage } as unknown as {
sendPhoto: typeof sendPhoto;
sendMessage: typeof sendMessage;
@@ -1684,6 +1730,9 @@ describe("sendMessageTelegram", () => {
expect(sendMessage.mock.calls.every((call) => call[2]?.parse_mode === "HTML")).toBe(true);
expect(sendMessage.mock.calls.map((call) => String(call[1] ?? "")).join("")).toContain("A");
expect(res.messageId).toBe("74");
expect(res.receipt?.primaryPlatformMessageId).toBe("73");
expect(res.receipt?.platformMessageIds).toEqual(["73", "74"]);
expect(res.receipt?.parts.map((part) => part.kind)).toEqual(["text", "text"]);
});
it("uses caption when text is within 1024 char limit", async () => {
@@ -2499,6 +2548,93 @@ describe("sendMessageTelegram", () => {
}
});
it("returns a multipart receipt and avoids native replies for chunked first-mode text", async () => {
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
const result = await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
messageThreadId: 271,
replyToMessageId: 500,
replyToIdSource: "implicit",
replyToMode: "first",
});
expect(sendMessage).toHaveBeenCalledTimes(2);
expect(sendMessage.mock.calls[0]?.[2]).toEqual({
parse_mode: "HTML",
message_thread_id: 271,
});
expect(sendMessage.mock.calls[1]?.[2]).toEqual({
parse_mode: "HTML",
message_thread_id: 271,
});
expect(result.messageId).toBe("102");
expect(result.receipt?.primaryPlatformMessageId).toBe("101");
expect(result.receipt?.platformMessageIds).toEqual(["101", "102"]);
expect(result.receipt?.threadId).toBe("271");
expect(result.receipt?.replyToId).toBeUndefined();
expect(
result.receipt?.parts.map(({ platformMessageId, kind, index, threadId, replyToId }) => ({
platformMessageId,
kind,
index,
threadId,
replyToId,
})),
).toEqual([
{
platformMessageId: "101",
kind: "text",
index: 0,
threadId: "271",
replyToId: undefined,
},
{
platformMessageId: "102",
kind: "text",
index: 1,
threadId: "271",
replyToId: undefined,
},
]);
});
it("keeps explicit native replies for chunked first-mode text", async () => {
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
replyToMessageId: 500,
replyToIdSource: "explicit",
replyToMode: "first",
});
expect(sendMessage.mock.calls[0]?.[2]).toMatchObject({
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
expect(sendMessage.mock.calls[1]?.[2]).toMatchObject({
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
});
it("fails topic sends instead of retrying without message_thread_id", async () => {
const cases = [{ name: "forum", chatId: "-100123", text: "hello forum" }] as const;
const threadErr = new Error("400: Bad Request: message thread not found");

View File

@@ -3,12 +3,17 @@ import * as grammy from "grammy";
import { type ApiClientOptions, Bot, HttpError } from "grammy";
import type { ReactionType, ReactionTypeEmoji } from "grammy/types";
import { recordChannelActivity } from "openclaw/plugin-sdk/channel-activity-runtime";
import type { MarkdownTableMode } from "openclaw/plugin-sdk/config-contracts";
import {
createMessageReceiptFromOutboundResults,
type MessageReceipt,
} from "openclaw/plugin-sdk/channel-outbound";
import type { MarkdownTableMode, ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
import { isDiagnosticFlagEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
import { formatUncaughtError } from "openclaw/plugin-sdk/error-runtime";
import { redactSensitiveText } from "openclaw/plugin-sdk/logging-core";
import { parseStrictInteger } from "openclaw/plugin-sdk/number-runtime";
import { resolveChunkMode, resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-chunking";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import { createTelegramRetryRunner, type RetryConfig } from "openclaw/plugin-sdk/retry-runtime";
import { createSubsystemLogger, logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
@@ -84,6 +89,8 @@ type TelegramEditMessageCaptionParams = Parameters<TelegramApi["editMessageCapti
type TelegramCreateForumTopicParams = NonNullable<Parameters<TelegramApi["createForumTopic"]>[2]>;
type TelegramThreadScopedParams = {
message_thread_id?: number;
reply_parameters?: { message_id?: number };
reply_to_message_id?: number;
};
const InputFileCtor = grammy.InputFile;
const MAX_TELEGRAM_PHOTO_DIMENSION_SUM = 10_000;
@@ -111,6 +118,10 @@ type TelegramSendOpts = {
silent?: boolean;
/** Message ID to reply to (for threading) */
replyToMessageId?: number;
/** Whether replyToMessageId came from ambient context or explicit payload/action input. */
replyToIdSource?: "explicit" | "implicit";
/** Controls whether replyToMessageId is applied to every internal text chunk. */
replyToMode?: ReplyToMode;
/** Quote text for Telegram reply_parameters. */
quoteText?: string;
/** Forum topic thread ID (for forum supergroups) */
@@ -124,6 +135,7 @@ type TelegramSendOpts = {
type TelegramSendResult = {
messageId: string;
chatId: string;
receipt?: MessageReceipt;
};
type TelegramMessageLike = {
@@ -274,6 +286,42 @@ function logTelegramOutboundSendOk(params: TelegramOutboundSuccessLogParams): vo
sendLogger.info(parts.join(" "));
}
function buildTelegramTextSendReceipt(params: {
messageIds: readonly string[];
chatId: string;
messageThreadId?: number;
replyToMessageId?: number;
}): MessageReceipt | undefined {
if (params.messageIds.length <= 1) {
return undefined;
}
return createMessageReceiptFromOutboundResults({
results: params.messageIds.map((messageId) => ({
messageId,
chatId: params.chatId,
})),
kind: "text",
...(typeof params.messageThreadId === "number"
? { threadId: String(params.messageThreadId) }
: {}),
...(typeof params.replyToMessageId === "number"
? { replyToId: String(params.replyToMessageId) }
: {}),
});
}
function resolveAcceptedReplyToMessageId(
params: TelegramThreadScopedParams | TelegramRichMessageContextParams | undefined,
): number | undefined {
if (!params) {
return undefined;
}
if ("reply_to_message_id" in params) {
return params.reply_to_message_id;
}
return params.reply_parameters?.message_id;
}
const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i;
const MESSAGE_NOT_MODIFIED_RE =
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
@@ -661,19 +709,26 @@ export async function sendMessageTelegram(
(typeof account.config.mediaMaxMb === "number" ? account.config.mediaMaxMb : 100) * 1024 * 1024;
const replyMarkup = buildInlineKeyboard(opts.buttons);
const threadParams = buildTelegramThreadReplyParams({
thread: resolveTelegramSendThreadSpec({
targetMessageThreadId: target.messageThreadId,
messageThreadId: opts.messageThreadId,
chatType: target.chatType,
}),
replyToMessageId: opts.replyToMessageId,
replyQuoteText: opts.quoteText,
useReplyIdAsQuoteSource: true,
const threadSpec = resolveTelegramSendThreadSpec({
targetMessageThreadId: target.messageThreadId,
messageThreadId: opts.messageThreadId,
chatType: target.chatType,
});
const richThreadParams = toTelegramRichMessageContextParams(threadParams);
const hasThreadParams = Object.keys(threadParams).length > 0;
const hasRichThreadParams = Object.keys(richThreadParams).length > 0;
const singleUseReplyTo =
opts.replyToIdSource === "implicit" &&
opts.replyToMode !== undefined &&
isSingleUseReplyToMode(opts.replyToMode);
const buildThreadParams = (includeReplyTo: boolean) =>
buildTelegramThreadReplyParams({
thread: threadSpec,
...(includeReplyTo
? {
replyToMessageId: opts.replyToMessageId,
replyQuoteText: opts.quoteText,
useReplyIdAsQuoteSource: true,
}
: {}),
});
const requestWithDiag = createTelegramNonIdempotentRequestWithDiag({
cfg,
account,
@@ -746,29 +801,59 @@ export async function sendMessageTelegram(
return { result, acceptedParams: params };
};
const buildTextParams = (isLastChunk: boolean) =>
hasThreadParams || (isLastChunk && replyMarkup)
? {
...threadParams,
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
}
: undefined;
const shouldIncludeReplyForChunk = (
index: number,
chunkCount: number,
replyToAlreadyUsed: boolean,
) =>
// Telegram Desktop can render long formatted reply chunks as unsupported messages.
// Multi-part `first` replies keep chat/topic routing but avoid hiding chunk text.
!replyToAlreadyUsed && (!singleUseReplyTo || (chunkCount === 1 && index === 0));
const buildRichTextParams = (isLastChunk: boolean) =>
hasRichThreadParams || (isLastChunk && replyMarkup)
const buildTextParams = (
index: number,
chunkCount: number,
isLastChunk: boolean,
replyToAlreadyUsed: boolean,
) => {
const params = buildThreadParams(
shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed),
);
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
? {
...richThreadParams,
...params,
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
}
: undefined;
};
const buildRichTextParams = (
index: number,
chunkCount: number,
isLastChunk: boolean,
replyToAlreadyUsed: boolean,
) => {
const params = toTelegramRichMessageContextParams(
buildThreadParams(shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed)),
);
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
? {
...params,
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
}
: undefined;
};
const sendTelegramTextChunks = async (
chunks: TelegramTextChunk[],
context: string,
): Promise<{ messageId: string; chatId: string }> => {
options: { replyToAlreadyUsed?: boolean } = {},
): Promise<TelegramSendResult> => {
let lastMessageId = "";
let lastChatId = chatId;
let lastAcceptedParams: TelegramThreadScopedParams | undefined;
let acceptedReplyToMessageId: number | undefined;
const messageIds: string[] = [];
let sentChunkCount = 0;
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
@@ -777,7 +862,12 @@ export async function sendMessageTelegram(
}
const { result: res, acceptedParams } = await sendTelegramTextChunk(
chunk,
buildTextParams(index === chunks.length - 1),
buildTextParams(
index,
chunks.length,
index === chunks.length - 1,
options.replyToAlreadyUsed === true,
),
);
const messageId = resolveTelegramMessageIdOrThrow(res, context);
recordSentMessage(chatId, messageId, cfg);
@@ -795,6 +885,8 @@ export async function sendMessageTelegram(
lastMessageId = String(messageId);
lastChatId = String(res?.chat?.id ?? chatId);
lastAcceptedParams = acceptedParams;
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
messageIds.push(lastMessageId);
sentChunkCount += 1;
}
if (lastMessageId) {
@@ -810,7 +902,17 @@ export async function sendMessageTelegram(
chunkCount: sentChunkCount,
});
}
return { messageId: lastMessageId, chatId: lastChatId };
const receipt = buildTelegramTextSendReceipt({
messageIds,
chatId: lastChatId,
messageThreadId: lastAcceptedParams?.message_thread_id,
replyToMessageId: acceptedReplyToMessageId,
});
return {
messageId: lastMessageId,
chatId: lastChatId,
...(receipt ? { receipt } : {}),
};
};
const buildChunkedTextPlan = (rawText: string, context: string): TelegramTextChunk[] => {
@@ -841,10 +943,14 @@ export async function sendMessageTelegram(
}));
};
const sendChunkedText = async (rawText: string, context: string) =>
const sendChunkedText = async (
rawText: string,
context: string,
options: { replyToAlreadyUsed?: boolean } = {},
) =>
useRichMessages
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context)
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context);
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context, options)
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context, options);
const buildRichTextPlan = (rawText: string): TelegramRichTextChunk[] => {
const textLimit = Math.min(
@@ -866,18 +972,26 @@ export async function sendMessageTelegram(
const sendTelegramRichTextChunks = async (
chunks: TelegramRichTextChunk[],
context: string,
): Promise<{ messageId: string; chatId: string }> => {
options: { replyToAlreadyUsed?: boolean } = {},
): Promise<TelegramSendResult> => {
const richRawApi = getTelegramRichRawApi(api);
let lastMessageId = "";
let lastChatId = chatId;
let lastAcceptedParams: TelegramRichMessageContextParams | undefined;
let acceptedReplyToMessageId: number | undefined;
const messageIds: string[] = [];
let sentChunkCount = 0;
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
if (!chunk) {
continue;
}
const acceptedParams = buildRichTextParams(index === chunks.length - 1);
const acceptedParams = buildRichTextParams(
index,
chunks.length,
index === chunks.length - 1,
options.replyToAlreadyUsed === true,
);
const result = await requestWithChatNotFound(
() =>
richRawApi.sendRichMessage({
@@ -907,6 +1021,8 @@ export async function sendMessageTelegram(
lastMessageId = String(messageId);
lastChatId = String(result?.chat?.id ?? chatId);
lastAcceptedParams = acceptedParams;
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
messageIds.push(lastMessageId);
sentChunkCount += 1;
}
if (lastMessageId) {
@@ -922,7 +1038,17 @@ export async function sendMessageTelegram(
chunkCount: sentChunkCount,
});
}
return { messageId: lastMessageId, chatId: lastChatId };
const receipt = buildTelegramTextSendReceipt({
messageIds,
chatId: lastChatId,
messageThreadId: lastAcceptedParams?.message_thread_id,
replyToMessageId: acceptedReplyToMessageId,
});
return {
messageId: lastMessageId,
chatId: lastChatId,
...(receipt ? { receipt } : {}),
};
};
async function shouldSendTelegramImageAsPhoto(buffer: Buffer): Promise<boolean> {
@@ -1001,8 +1127,10 @@ export async function sendMessageTelegram(
const needsSeparateText = Boolean(followUpText);
// When splitting, put reply_markup only on the follow-up text (the "main" content),
// not on the media message.
const mediaThreadParams = buildThreadParams(true);
const mediaUsedReplyTo = resolveAcceptedReplyToMessageId(mediaThreadParams) !== undefined;
const baseMediaParams = {
...(hasThreadParams ? threadParams : {}),
...mediaThreadParams,
...(!needsSeparateText && replyMarkup ? { reply_markup: replyMarkup } : {}),
};
const videoDimensions =
@@ -1145,8 +1273,13 @@ export async function sendMessageTelegram(
// If text was too long for a caption, send it as a separate follow-up message.
// Use HTML conversion so markdown renders like captions.
if (needsSeparateText && followUpText) {
const textResult = await sendChunkedText(followUpText, "text follow-up send");
return { messageId: textResult.messageId, chatId: resolvedChatId };
const textResult = await sendChunkedText(followUpText, "text follow-up send", {
replyToAlreadyUsed: singleUseReplyTo && mediaUsedReplyTo,
});
return {
...textResult,
chatId: resolvedChatId,
};
}
return { messageId: String(mediaMessageId), chatId: resolvedChatId };
@@ -1219,7 +1352,7 @@ export async function reactMessageTelegram(
account,
retry: opts.retry,
verbose: opts.verbose,
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "react" }),
});
const remove = opts.remove === true;
const trimmedEmoji = emoji.trim();
@@ -1276,7 +1409,7 @@ export async function deleteMessageTelegram(
account,
retry: opts.retry,
verbose: opts.verbose,
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "delete" }),
});
try {
await requestWithDiag(() => api.deleteMessage(chatId, messageId), "deleteMessage", {

View File

@@ -10,6 +10,7 @@ import { afterEach, describe, expect, it } from "vitest";
import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js";
import type { TelegramRuntime } from "./runtime.types.js";
import {
claimNextTelegramSpooledUpdate,
claimTelegramSpooledUpdate,
deleteTelegramSpooledUpdate,
failTelegramSpooledUpdateClaim,
@@ -118,6 +119,120 @@ describe("Telegram ingress spool", () => {
});
});
it("claims next update through the native ingress queue in update id order", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 101, message: { chat: { id: 1 }, message_id: 1, text: "second" } },
now: 1,
});
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 100, message: { chat: { id: 1 }, message_id: 2, text: "first" } },
now: 2,
});
const claimed = await claimNextTelegramSpooledUpdate({ spoolDir });
expect(claimed?.updateId).toBe(100);
expect(await listTelegramSpooledUpdates({ spoolDir })).toHaveLength(1);
expect(
(await listTelegramSpooledUpdateClaims({ spoolDir })).map((claim) => claim.updateId),
).toEqual([100]);
});
});
it("derives lane keys while claiming legacy rows without stored lane keys", async () => {
await withTempSpool(async (spoolDir) => {
const stateDir = path.dirname(path.dirname(spoolDir));
const queue = createChannelIngressQueue<{
version: 1;
updateId: number;
receivedAt: number;
update: unknown;
}>({
channelId: "telegram",
accountId: "test",
stateDir,
});
await queue.enqueue(
"0000000000000042",
{
version: 1,
updateId: 42,
receivedAt: 1,
update: {
update_id: 42,
message: {
chat: { id: 100, type: "supergroup", is_forum: true },
is_topic_message: true,
message_id: 1,
message_thread_id: 10,
text: "blocked topic",
},
},
},
{ receivedAt: 1 },
);
await queue.enqueue(
"0000000000000043",
{
version: 1,
updateId: 43,
receivedAt: 2,
update: {
update_id: 43,
message: {
chat: { id: 100, type: "supergroup", is_forum: true },
is_topic_message: true,
message_id: 2,
message_thread_id: 11,
text: "open topic",
},
},
},
{ receivedAt: 2 },
);
const claimed = await claimNextTelegramSpooledUpdate({
spoolDir,
blockedLaneKeys: ["telegram:100:topic:10"],
});
expect(claimed?.updateId).toBe(43);
expect(claimed?.claim?.claimToken).toEqual(expect.any(String));
expect(
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
).toEqual([42]);
});
});
it("does not claim outside the provided candidate update ids", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 200, message: { chat: { id: 1 }, message_id: 1, text: "first" } },
now: 1,
});
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 201, message: { chat: { id: 2 }, message_id: 1, text: "later" } },
now: 2,
});
const claimed = await claimNextTelegramSpooledUpdate({
spoolDir,
blockedLaneKeys: ["telegram:1"],
candidateUpdateIds: [200],
});
expect(claimed).toBeNull();
expect(
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
).toEqual([200, 201]);
});
});
it("releases failed claims back to the pending spool", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
@@ -303,7 +418,7 @@ describe("Telegram ingress spool", () => {
).toBe(false);
});
it("treats fresh claims with reused pids and different owner ids as live-owned", () => {
it("does not treat fresh claims with the current pid and a different owner id as foreign", () => {
const now = Date.now();
expect(
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess({
@@ -318,6 +433,25 @@ describe("Telegram ingress spool", () => {
claimedAt: now,
},
}),
).toBe(false);
});
it("treats fresh claims with other live pids as live-owned", () => {
const now = Date.now();
const liveOwnerPid = process.ppid > 0 ? process.ppid : 1;
expect(
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess({
updateId: 51,
path: path.join(os.tmpdir(), "51.json.processing"),
pendingPath: path.join(os.tmpdir(), "51.json"),
update: { update_id: 51 },
receivedAt: now,
claim: {
processId: "other-process",
processPid: liveOwnerPid,
claimedAt: now,
},
}),
).toBe(true);
});
});

View File

@@ -9,12 +9,15 @@ import type {
ChannelIngressQueueRecord,
} from "openclaw/plugin-sdk/channel-outbound";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import type { TelegramBotInfo } from "./bot-info.js";
import { getTelegramRuntime } from "./runtime.js";
import { getTelegramSequentialKey } from "./sequential-key.js";
import { normalizeTelegramStateAccountId } from "./state-account-id.js";
const SPOOL_VERSION = 1;
const TELEGRAM_INGRESS_SPOOL_PREFIX = "ingress-spool-";
export const TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS = 6 * 60 * 60 * 1000;
export const TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS = 30 * 60 * 1000;
const TELEGRAM_SPOOLED_UPDATE_FAILED_TTL_MS = 30 * 24 * 60 * 60 * 1000;
const TELEGRAM_SPOOLED_UPDATE_FAILED_MAX_ENTRIES = 1000;
const TELEGRAM_SPOOLED_UPDATE_PROCESS_ID = `${process.pid}:${randomUUID()}`;
@@ -152,8 +155,13 @@ function processExists(pid: number): boolean {
}
}
function isFreshClaimOwner(claim: TelegramSpooledUpdateClaimOwner): boolean {
return Date.now() - claim.claimedAt < TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS;
function isFreshClaimOwner(
claim: TelegramSpooledUpdateClaimOwner,
options?: { maxAgeMs?: number; now?: number },
): boolean {
const now = options?.now ?? Date.now();
const maxAgeMs = options?.maxAgeMs ?? TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS;
return now - claim.claimedAt < maxAgeMs;
}
function parseQueueRecord(
@@ -196,6 +204,13 @@ function parseQueueClaim(
};
}
function spooledUpdateLaneKey(update: unknown, botInfo?: TelegramBotInfo): string {
return getTelegramSequentialKey({
update: update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
...(botInfo ? { me: botInfo } : {}),
});
}
function sortTelegramUpdates<T extends TelegramSpooledUpdate>(updates: T[]): T[] {
return updates.toSorted((a, b) => a.updateId - b.updateId);
}
@@ -207,18 +222,27 @@ function queueMutationTarget(update: TelegramSpooledUpdate): string | ChannelIng
export function isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(
claim: ClaimedTelegramSpooledUpdate,
options?: { maxAgeMs?: number; now?: number },
): boolean {
return Boolean(
claim.claim &&
claim.claim.processId !== TELEGRAM_SPOOLED_UPDATE_PROCESS_ID &&
isFreshClaimOwner(claim.claim) &&
claim.claim.processPid !== process.pid &&
isFreshClaimOwner(claim.claim, options) &&
processExists(claim.claim.processPid),
);
}
export function isTelegramSpooledUpdateClaimOwnedByCurrentProcess(
claim: ClaimedTelegramSpooledUpdate,
): boolean {
return claim.claim?.processId === TELEGRAM_SPOOLED_UPDATE_PROCESS_ID;
}
export async function writeTelegramSpooledUpdate(params: {
spoolDir: string;
update: unknown;
laneKey?: string;
now?: number;
}): Promise<number> {
const updateId = resolveTelegramUpdateId(params.update);
@@ -236,7 +260,10 @@ export async function writeTelegramSpooledUpdate(params: {
receivedAt,
update: params.update,
},
{ receivedAt },
{
receivedAt,
laneKey: params.laneKey ?? spooledUpdateLaneKey(params.update),
},
);
return updateId;
}
@@ -271,6 +298,38 @@ export async function claimTelegramSpooledUpdate(
return claimed ? parseQueueClaim(spoolDir, claimed) : null;
}
export async function claimNextTelegramSpooledUpdate(params: {
spoolDir: string;
blockedLaneKeys?: Iterable<string>;
botInfo?: TelegramBotInfo;
candidateUpdateIds?: Iterable<number>;
scanLimit?: number;
}): Promise<ClaimedTelegramSpooledUpdate | null> {
const queue = createTelegramIngressQueue(params.spoolDir);
const claimed = await queue.claimNext({
ownerId: TELEGRAM_SPOOLED_UPDATE_PROCESS_ID,
blockedLaneKeys: params.blockedLaneKeys,
...(params.candidateUpdateIds === undefined
? {}
: { candidateIds: [...params.candidateUpdateIds].map(queueEventId) }),
orderBy: "id",
scanLimit: params.scanLimit,
deriveLaneKey: (record) => spooledUpdateLaneKey(record.payload.update, params.botInfo),
});
if (!claimed) {
return null;
}
const update = parseQueueClaim(params.spoolDir, claimed);
if (update) {
return update;
}
await queue.fail(claimed, {
reason: "invalid-spooled-update",
message: "Telegram spooled update payload was invalid.",
});
return null;
}
export async function releaseTelegramSpooledUpdateClaim(
update: ClaimedTelegramSpooledUpdate,
options?: { lastError?: string; releasedAt?: number },

View File

@@ -1,5 +1,6 @@
// Whatsapp plugin module implements util behavior.
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
export function elide(text?: string, limit = 400) {
if (!text) {
@@ -8,7 +9,8 @@ export function elide(text?: string, limit = 400) {
if (text.length <= limit) {
return text;
}
return `${text.slice(0, limit)}… (truncated ${text.length - limit} chars)`;
const truncated = truncateUtf16Safe(text, limit);
return `${truncated}… (truncated ${text.length - truncated.length} chars)`;
}
export function markWhatsAppVisibleDeliveryError(error: unknown): unknown {

View File

@@ -365,6 +365,15 @@ describe("web auto-reply util", () => {
});
describe("elide", () => {
const hasLoneSurrogate = (value: string): boolean =>
Array.from(value).some((char) => {
if (char.length !== 1) {
return false;
}
const codeUnit = char.charCodeAt(0);
return codeUnit >= 0xd800 && codeUnit <= 0xdfff;
});
it("returns undefined for undefined input", () => {
expect(elide(undefined)).toBe(undefined);
});
@@ -376,6 +385,20 @@ describe("web auto-reply util", () => {
it("truncates and annotates when over limit", () => {
expect(elide("abcdef", 3)).toBe("abc… (truncated 3 chars)");
});
it("does not split surrogate pairs when the limit lands inside an emoji", () => {
const output = elide("😀😀😀", 5);
expect(output).toBe("😀😀… (truncated 2 chars)");
expect(hasLoneSurrogate(output ?? "")).toBe(false);
});
it("keeps a complete astral character when it fits before the limit", () => {
const output = elide("ab😀cd", 4);
expect(output).toBe("ab😀… (truncated 2 chars)");
expect(hasLoneSurrogate(output ?? "")).toBe(false);
});
});
describe("isLikelyWhatsAppCryptoError", () => {

View File

@@ -288,6 +288,15 @@ export function markdownToWhatsApp(text: string): string {
return `${WHATSAPP_INLINE_CODE_PLACEHOLDER}${inlineCodes.length - 1}${WHATSAPP_PLACEHOLDER_TERMINATOR}`;
});
// Convert combined GFM strong+emphasis before plain strong so the plain
// rules cannot leave literal `**` around the inner emphasis.
result = result.replace(/\*\*\*(.+?)\*\*\*/g, "*_$1_*");
result = result.replace(/___(.+?)___/g, "*_$1_*");
result = result.replace(/\*\*_(.+?)_\*\*/g, "*_$1_*");
result = result.replace(/__\*(.+?)\*__/g, "*_$1_*");
result = result.replace(/_\*\*(.+?)\*\*_/g, "*_$1_*");
result = result.replace(/\*__(.+?)__\*/g, "*_$1_*");
result = result.replace(/\*\*(.+?)\*\*/g, "*$1*");
result = result.replace(/__(.+?)__/g, "*$1*");
result = result.replace(/~~(.+?)~~/g, "~$1~");

View File

@@ -42,6 +42,17 @@ describe("markdownToWhatsApp", () => {
["returns empty string for empty input", "", ""],
["returns plain text unchanged", "no formatting here", "no formatting here"],
["handles bold inside a sentence", "This is **very** important", "This is *very* important"],
["converts GFM ***bold italic*** to WhatsApp bold+italic", "***bi***", "*_bi_*"],
["converts GFM __*bold italic*__ to WhatsApp bold+italic", "__*y*__", "*_y_*"],
["converts GFM **_bold italic_** to WhatsApp bold+italic", "**_x_**", "*_x_*"],
["converts GFM ___bold italic___ to WhatsApp bold+italic", "___z___", "*_z_*"],
["converts GFM *__bold italic__* to WhatsApp bold+italic", "*__q__*", "*_q_*"],
["converts GFM _**bold italic**_ to WhatsApp bold+italic", "_**r**_", "*_r_*"],
[
"preserves inline code containing bold-italic markers",
"Use `***not bold italic***` here",
"Use `***not bold italic***` here",
],
// Regression: a digit immediately after an inline-code span must not be
// absorbed into the placeholder index (which previously dropped both).
["preserves inline code immediately followed by a digit", "`a`5", "`a`5"],

View File

@@ -18,6 +18,7 @@ const providerAuthRuntimeMocks = vi.hoisted(() => ({
vi.mock("openclaw/plugin-sdk/provider-auth-runtime", () => providerAuthRuntimeMocks);
import plugin from "./index.js";
import manifest from "./openclaw.plugin.json" with { type: "json" };
import { buildLiveXaiProvider } from "./provider-catalog.js";
import setupPlugin from "./setup-api.js";
import {
@@ -82,13 +83,24 @@ describe("xai provider plugin", () => {
vi.unstubAllGlobals();
});
it("exposes OAuth and device-code auth choices", async () => {
it("exposes xAI OAuth and preserves the explicit device-code alias", async () => {
const provider = await registerSingleProviderPlugin(plugin);
expect(provider.auth?.map((method) => method.id)).toEqual(["api-key", "oauth", "device-code"]);
const oauth = provider.auth?.find((method) => method.id === "oauth");
expect(oauth?.kind).toBe("oauth");
expect(oauth?.wizard?.choiceId).toBe("xai-oauth");
const deviceCode = provider.auth?.find((method) => method.id === "device-code");
expect(deviceCode?.kind).toBe("device_code");
expect(deviceCode?.wizard?.choiceId).toBe("xai-device-code");
expect(deviceCode?.wizard?.assistantVisibility).toBe("manual-only");
expect(manifest.providerAuthChoices).toContainEqual(
expect.objectContaining({
assistantVisibility: "manual-only",
choiceId: "xai-device-code",
method: "device-code",
}),
);
});
it("filters the xAI API-key catalog against live model ids", async () => {

Some files were not shown because too many files have changed in this diff Show More