mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-28 10:21:45 +08:00
Compare commits
38 Commits
codex/slac
...
codex/tele
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
03578e7a7b | ||
|
|
426c137e36 | ||
|
|
1bdde66950 | ||
|
|
2720ac06b7 | ||
|
|
ce15f348bb | ||
|
|
e5c3c59c67 | ||
|
|
2e881ab1c6 | ||
|
|
90c20d15c2 | ||
|
|
cb8bc71ff8 | ||
|
|
b5c662f4f5 | ||
|
|
d693ed4af3 | ||
|
|
6c5a9fde9f | ||
|
|
b8e3de1160 | ||
|
|
b9c64142e2 | ||
|
|
84bcd500c9 | ||
|
|
f857e8d66e | ||
|
|
a048aeae16 | ||
|
|
4b9e01813e | ||
|
|
7830faa5fe | ||
|
|
ddedf13190 | ||
|
|
cb4244fe15 | ||
|
|
361869e434 | ||
|
|
4010b81a77 | ||
|
|
8fa24325b5 | ||
|
|
f4fa10c2c5 | ||
|
|
2100ee7cc8 | ||
|
|
6e8f30c0e2 | ||
|
|
9d800b71c0 | ||
|
|
5ccfc97b31 | ||
|
|
a7bfc06f45 | ||
|
|
c5d34c8376 | ||
|
|
fbfadbd806 | ||
|
|
6f1076351c | ||
|
|
898ca9741c | ||
|
|
67118d5ab9 | ||
|
|
bf2a8ecfdb | ||
|
|
cee2aca409 | ||
|
|
56259606d1 |
@@ -316,6 +316,11 @@ conversation bindings, or any non-Codex harness.
|
||||
plugin/app support for the Codex harness. Default: `false`.
|
||||
- `plugins.entries.codex.config.codexPlugins.allow_destructive_actions`:
|
||||
default destructive-action policy for migrated plugin app elicitations.
|
||||
Use `true` to accept safe Codex approval schemas without prompting, `false`
|
||||
to decline them, `"auto"` to route Codex-required approvals through OpenClaw
|
||||
plugin approvals, or `"always"` to ask for every plugin write/destructive
|
||||
action without durable approval. The `"always"` mode clears durable Codex
|
||||
per-tool approval overrides for the affected app before starting the thread.
|
||||
Default: `true`.
|
||||
- `plugins.entries.codex.config.codexPlugins.plugins.<key>.enabled`: enables a
|
||||
migrated plugin entry when global `codexPlugins.enabled` is also true.
|
||||
@@ -326,7 +331,8 @@ conversation bindings, or any non-Codex harness.
|
||||
Codex plugin identity from migration, for example `"google-calendar"`.
|
||||
- `plugins.entries.codex.config.codexPlugins.plugins.<key>.allow_destructive_actions`:
|
||||
per-plugin destructive-action override. When omitted, the global
|
||||
`allow_destructive_actions` value is used.
|
||||
`allow_destructive_actions` value is used. The per-plugin value accepts the
|
||||
same `true`, `false`, `"auto"`, or `"always"` policies.
|
||||
|
||||
`codexPlugins.enabled` is the global enablement directive. Explicit plugin
|
||||
entries written by migration are the durable install and repair eligibility set.
|
||||
|
||||
@@ -200,11 +200,11 @@ enabled.
|
||||
|
||||
OpenClaw sets app-level `destructive_enabled` from the effective global or
|
||||
per-plugin `allow_destructive_actions` policy and lets Codex enforce
|
||||
destructive tool metadata from its native app tool annotations. `true` and
|
||||
`"auto"` both set `destructive_enabled: true`; `false` sets it false. The
|
||||
`_default` app config is disabled with `open_world_enabled: false`. Enabled
|
||||
plugin apps are emitted with `open_world_enabled: true`; OpenClaw does not
|
||||
expose a separate plugin open-world policy knob and does not maintain
|
||||
destructive tool metadata from its native app tool annotations. `true`,
|
||||
`"auto"`, and `"always"` set `destructive_enabled: true`; `false` sets it
|
||||
false. The `_default` app config is disabled with `open_world_enabled: false`.
|
||||
Enabled plugin apps are emitted with `open_world_enabled: true`; OpenClaw does
|
||||
not expose a separate plugin open-world policy knob and does not maintain
|
||||
per-plugin destructive tool-name deny lists.
|
||||
|
||||
Tool approval mode is automatic by default for plugin apps so non-destructive
|
||||
@@ -225,6 +225,10 @@ plugins, while unsafe schemas and ambiguous ownership still fail closed:
|
||||
- When policy is `"auto"`, OpenClaw exposes destructive plugin actions to
|
||||
Codex but turns ownership-proven MCP approval elicitations into OpenClaw
|
||||
plugin approvals before returning the Codex approval response.
|
||||
- When policy is `"always"`, OpenClaw uses the same Codex write/destructive
|
||||
gating as `"auto"`, clears durable Codex per-tool approval overrides for the
|
||||
app before the thread starts, and only offers one-shot approval or denial so
|
||||
durable approvals cannot suppress later write-action prompts.
|
||||
- Missing plugin identity, ambiguous ownership, a missing turn id, a wrong turn
|
||||
id, or an unsafe elicitation schema declines instead of prompting.
|
||||
|
||||
@@ -272,8 +276,9 @@ Codex thread bindings keep the app config they started with until OpenClaw
|
||||
establishes a new harness session or replaces a stale binding.
|
||||
|
||||
**Destructive action is declined:** check the global and per-plugin
|
||||
`allow_destructive_actions` values. Even when policy is true or `"auto"`,
|
||||
unsafe elicitation schemas and ambiguous plugin identity still fail closed.
|
||||
`allow_destructive_actions` values. Even when policy is true, `"auto"`, or
|
||||
`"always"`, unsafe elicitation schemas and ambiguous plugin identity still fail
|
||||
closed.
|
||||
|
||||
## Related
|
||||
|
||||
|
||||
@@ -29,10 +29,11 @@ Use the path that matches your OpenClaw install state:
|
||||
openclaw onboard --install-daemon
|
||||
```
|
||||
|
||||
On a VPS or over SSH, use device-code during onboarding:
|
||||
On a VPS or over SSH, select xAI OAuth directly; OpenClaw uses device-code
|
||||
verification and does not require a localhost callback:
|
||||
|
||||
```bash
|
||||
openclaw onboard --install-daemon --auth-choice xai-device-code
|
||||
openclaw onboard --install-daemon --auth-choice xai-oauth
|
||||
```
|
||||
|
||||
OAuth does not require an xAI API key. OpenClaw does not require the Grok
|
||||
@@ -48,13 +49,6 @@ Use the path that matches your OpenClaw install state:
|
||||
openclaw models auth login --provider xai --method oauth
|
||||
```
|
||||
|
||||
Use the device-code flow instead when the Gateway runs over SSH, Docker, or
|
||||
a VPS and a localhost browser callback is awkward:
|
||||
|
||||
```bash
|
||||
openclaw models auth login --provider xai --device-code
|
||||
```
|
||||
|
||||
To make Grok the default model after signing in, apply it separately:
|
||||
|
||||
```bash
|
||||
@@ -86,8 +80,7 @@ Use the path that matches your OpenClaw install state:
|
||||
|
||||
<Note>
|
||||
OpenClaw uses the xAI Responses API as the bundled xAI transport. The same
|
||||
credential from `openclaw models auth login --provider xai --method oauth`,
|
||||
`openclaw models auth login --provider xai --device-code`, or
|
||||
credential from `openclaw models auth login --provider xai --method oauth` or
|
||||
`openclaw models auth login --provider xai --method api-key` can also power first-class
|
||||
`web_search`, `x_search`, remote `code_execution`, and xAI image/video generation.
|
||||
Speech and transcription currently require `XAI_API_KEY` or provider config.
|
||||
@@ -102,8 +95,9 @@ and, by default, `x_search` through an operator xAI Responses proxy.
|
||||
|
||||
## OAuth troubleshooting
|
||||
|
||||
- If browser OAuth cannot reach `127.0.0.1:56121`, use
|
||||
`openclaw models auth login --provider xai --device-code`.
|
||||
- For SSH, Docker, VPS, or other remote setups, use
|
||||
`openclaw models auth login --provider xai --method oauth`; xAI OAuth uses
|
||||
device-code verification instead of a localhost callback.
|
||||
- If sign-in succeeds but Grok is not the default model, run
|
||||
`openclaw models set xai/grok-4.3`.
|
||||
- To inspect saved xAI auth profiles, run:
|
||||
@@ -117,9 +111,9 @@ and, by default, `x_search` through an operator xAI Responses proxy.
|
||||
eligible, try the API-key path or check the subscription on xAI's side.
|
||||
|
||||
<Tip>
|
||||
Use `xai-device-code` when signing in from SSH, Docker, or a VPS. OpenClaw
|
||||
prints an xAI URL and short code; finish sign-in in any local browser while the
|
||||
remote process polls xAI for the completed token exchange.
|
||||
Use `xai-oauth` when signing in from SSH, Docker, or a VPS. OpenClaw prints an
|
||||
xAI URL and short code; finish sign-in in any local browser while the remote
|
||||
process polls xAI for the completed token exchange.
|
||||
</Tip>
|
||||
|
||||
## Built-in catalog
|
||||
@@ -498,12 +492,10 @@ Legacy aliases still normalize to the canonical bundled ids:
|
||||
|
||||
<Accordion title="Known limits">
|
||||
- xAI auth can use an API key, environment variable, plugin config fallback,
|
||||
browser OAuth, or device-code OAuth with an eligible xAI account. Browser
|
||||
OAuth uses a local callback on `127.0.0.1:56121`; for remote hosts, use
|
||||
`xai-device-code` unless you want to forward that port before opening the
|
||||
sign-in URL. xAI decides which accounts can receive OAuth API tokens, and
|
||||
the consent page may show Grok Build even though OpenClaw does not require
|
||||
the Grok Build app.
|
||||
or OAuth with an eligible xAI account. OAuth uses device-code verification
|
||||
without a localhost callback. xAI decides which accounts can receive OAuth
|
||||
API tokens, and the consent page may show Grok Build even though OpenClaw
|
||||
does not require the Grok Build app.
|
||||
- OpenClaw does not currently expose the xAI multi-agent model family. xAI
|
||||
serves these models through the Responses API, but they do not accept the
|
||||
client-side or custom tools used by OpenClaw's shared agent loop. See the
|
||||
|
||||
@@ -38,13 +38,13 @@ Do **not** use it when you need local files, your shell, your repo, or paired de
|
||||
<Steps>
|
||||
<Step title="Provide xAI credentials">
|
||||
Sign in with Grok OAuth using an eligible SuperGrok or X Premium subscription,
|
||||
use the remote-friendly device-code flow, or store an API key. OAuth works
|
||||
for `code_execution` and `x_search`; `XAI_API_KEY` or plugin web-search
|
||||
config can also power Grok `web_search`.
|
||||
or store an API key. xAI OAuth uses device-code verification, so it works
|
||||
from remote hosts without a localhost callback. OAuth works for
|
||||
`code_execution` and `x_search`; `XAI_API_KEY` or plugin web-search config
|
||||
can also power Grok `web_search`.
|
||||
|
||||
```bash
|
||||
openclaw models auth login --provider xai --method oauth
|
||||
openclaw models auth login --provider xai --device-code
|
||||
```
|
||||
|
||||
During a fresh install, the same auth choices are available inside
|
||||
@@ -52,7 +52,7 @@ Do **not** use it when you need local files, your shell, your repo, or paired de
|
||||
|
||||
```bash
|
||||
openclaw onboard --install-daemon
|
||||
openclaw onboard --install-daemon --auth-choice xai-device-code
|
||||
openclaw onboard --install-daemon --auth-choice xai-oauth
|
||||
```
|
||||
|
||||
Or use an API key:
|
||||
|
||||
@@ -192,6 +192,109 @@ describe("AcpxRuntime fresh reset wrapper", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("adds the OpenClaw session key to the managed OpenClaw tools MCP bridge", () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
const { runtime } = makeRuntime(baseStore, {
|
||||
openclawToolsMcpBridgeEnabled: true,
|
||||
mcpServers: [
|
||||
{
|
||||
name: "openclaw-tools",
|
||||
command: "node",
|
||||
args: ["dist/mcp/openclaw-tools-serve.js"],
|
||||
env: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const readScopedMcpEnv = (sessionKey: string) => {
|
||||
const delegate = (
|
||||
runtime as unknown as {
|
||||
resolveOpenClawToolsDelegateForSession(sessionKey: string): unknown;
|
||||
}
|
||||
).resolveOpenClawToolsDelegateForSession(sessionKey) as {
|
||||
options: {
|
||||
mcpServers?: Array<{
|
||||
env?: Array<{ name: string; value: string }>;
|
||||
name: string;
|
||||
}>;
|
||||
};
|
||||
};
|
||||
return delegate.options.mcpServers?.find((server) => server.name === "openclaw-tools")?.env;
|
||||
};
|
||||
|
||||
expect(readScopedMcpEnv("agent:worker:main")).toContainEqual({
|
||||
name: "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY",
|
||||
value: "agent:worker:main",
|
||||
});
|
||||
expect(readScopedMcpEnv("agent:research:main")).toContainEqual({
|
||||
name: "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY",
|
||||
value: "agent:research:main",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps managed OpenClaw tools MCP delegates reachable for fresh sessions", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
const { runtime } = makeRuntime(baseStore, {
|
||||
openclawToolsMcpBridgeEnabled: true,
|
||||
mcpServers: [
|
||||
{
|
||||
name: "openclaw-tools",
|
||||
command: "node",
|
||||
args: ["dist/mcp/openclaw-tools-serve.js"],
|
||||
env: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
const exposedRuntime = runtime as unknown as {
|
||||
openclawToolsSessionDelegates: Map<string, unknown>;
|
||||
resolveOpenClawToolsDelegateForSession(sessionKey: string): unknown;
|
||||
};
|
||||
|
||||
const firstDelegate =
|
||||
exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:worker:main");
|
||||
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:worker:main")).toBe(true);
|
||||
|
||||
await runtime.prepareFreshSession({ sessionKey: "agent:worker:main" });
|
||||
|
||||
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:worker:main")).toBe(true);
|
||||
expect(exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:worker:main")).toBe(
|
||||
firstDelegate,
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the no-MCP delegate for startup probes when the OpenClaw tools bridge is enabled", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
|
||||
openclawToolsMcpBridgeEnabled: true,
|
||||
mcpServers: [
|
||||
{
|
||||
name: "openclaw-tools",
|
||||
command: "node",
|
||||
args: ["dist/mcp/openclaw-tools-serve.js"],
|
||||
env: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
const defaultProbe = vi.spyOn(delegate, "probeAvailability").mockResolvedValue(undefined);
|
||||
const safeProbe = vi
|
||||
.spyOn(bridgeSafeDelegate, "probeAvailability")
|
||||
.mockResolvedValue(undefined);
|
||||
|
||||
await runtime.probeAvailability();
|
||||
|
||||
expect(safeProbe).toHaveBeenCalledTimes(1);
|
||||
expect(defaultProbe).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("normalizes OpenClaw Codex model ids for ACP startup", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
@@ -1163,6 +1266,46 @@ describe("AcpxRuntime fresh reset wrapper", () => {
|
||||
expect(baseStore["load"]).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("releases managed OpenClaw tools MCP delegates after close", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
|
||||
const { runtime } = makeRuntime(baseStore, {
|
||||
openclawToolsMcpBridgeEnabled: true,
|
||||
mcpServers: [
|
||||
{
|
||||
name: "openclaw-tools",
|
||||
command: "node",
|
||||
args: ["dist/mcp/openclaw-tools-serve.js"],
|
||||
env: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
const exposedRuntime = runtime as unknown as {
|
||||
openclawToolsSessionDelegates: Map<string, { close: AcpRuntime["close"] }>;
|
||||
resolveOpenClawToolsDelegateForSession(sessionKey: string): {
|
||||
close: AcpRuntime["close"];
|
||||
};
|
||||
};
|
||||
const scopedDelegate =
|
||||
exposedRuntime.resolveOpenClawToolsDelegateForSession("agent:codex:main");
|
||||
const close = vi.spyOn(scopedDelegate, "close").mockResolvedValue(undefined);
|
||||
|
||||
await runtime.close({
|
||||
handle: {
|
||||
sessionKey: "agent:codex:main",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "agent:codex:main",
|
||||
},
|
||||
reason: "closed",
|
||||
});
|
||||
|
||||
expect(close).toHaveBeenCalledOnce();
|
||||
expect(exposedRuntime.openclawToolsSessionDelegates.has("agent:codex:main")).toBe(false);
|
||||
});
|
||||
|
||||
it("cleans up OpenClaw-owned ACPX process trees after close", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => ({
|
||||
|
||||
@@ -50,6 +50,7 @@ type OpenClawAcpxRuntimeOptions = AcpRuntimeOptions & {
|
||||
openclawWrapperRoot?: string;
|
||||
openclawGatewayInstanceId?: string;
|
||||
openclawProcessLeaseStore?: AcpxProcessLeaseStore;
|
||||
openclawToolsMcpBridgeEnabled?: boolean;
|
||||
};
|
||||
type AcpxRuntimeTestOptions = Record<string, unknown> & {
|
||||
openclawProcessCleanup?: AcpxProcessCleanupDeps;
|
||||
@@ -57,6 +58,10 @@ type AcpxRuntimeTestOptions = Record<string, unknown> & {
|
||||
type OpenClawRuntimeTurnInput = Parameters<NonNullable<AcpRuntime["startTurn"]>>[0];
|
||||
type OpenClawRuntimeEnsureInput = Parameters<AcpRuntime["ensureSession"]>[0];
|
||||
type AcpxDelegateEnsureInput = Parameters<BaseAcpxRuntime["ensureSession"]>[0];
|
||||
type AcpxMcpServer = NonNullable<AcpRuntimeOptions["mcpServers"]>[number];
|
||||
|
||||
const ACPX_OPENCLAW_TOOLS_MCP_SERVER_NAME = "openclaw-tools";
|
||||
const OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV = "OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY";
|
||||
|
||||
type ResetAwareSessionStore = AcpSessionStore & {
|
||||
markFresh: (sessionKey: string) => void;
|
||||
@@ -682,6 +687,33 @@ function shouldUseDistinctBridgeDelegate(options: AcpRuntimeOptions): boolean {
|
||||
return Array.isArray(mcpServers) && mcpServers.length > 0;
|
||||
}
|
||||
|
||||
function withOpenClawToolsMcpSessionEnv(params: {
|
||||
enabled: boolean | undefined;
|
||||
mcpServers: AcpRuntimeOptions["mcpServers"];
|
||||
sessionKey: string;
|
||||
}): AcpRuntimeOptions["mcpServers"] {
|
||||
const sessionKey = params.sessionKey.trim();
|
||||
if (!params.enabled || !sessionKey || !params.mcpServers?.length) {
|
||||
return params.mcpServers;
|
||||
}
|
||||
let changed = false;
|
||||
const nextServers = params.mcpServers.map((server): AcpxMcpServer => {
|
||||
if (server.name !== ACPX_OPENCLAW_TOOLS_MCP_SERVER_NAME || !("command" in server)) {
|
||||
return server;
|
||||
}
|
||||
changed = true;
|
||||
const env = [
|
||||
...server.env.filter((entry) => entry.name !== OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV),
|
||||
{
|
||||
name: OPENCLAW_TOOLS_MCP_AGENT_SESSION_KEY_ENV,
|
||||
value: sessionKey,
|
||||
},
|
||||
];
|
||||
return { ...server, env };
|
||||
});
|
||||
return changed ? nextServers : params.mcpServers;
|
||||
}
|
||||
|
||||
/** OpenClaw-managed ACP runtime implementation backed by the upstream acpx runtime. */
|
||||
export class AcpxRuntime implements AcpRuntime {
|
||||
private readonly sessionStore: ResetAwareSessionStore;
|
||||
@@ -693,6 +725,10 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
private readonly delegate: BaseAcpxRuntime;
|
||||
private readonly bridgeSafeDelegate: BaseAcpxRuntime;
|
||||
private readonly probeDelegate: BaseAcpxRuntime;
|
||||
private readonly delegateOptions: AcpRuntimeOptions;
|
||||
private readonly delegateTestOptions: BaseAcpxRuntimeTestOptions;
|
||||
private readonly openclawToolsMcpBridgeEnabled: boolean;
|
||||
private readonly openclawToolsSessionDelegates = new Map<string, BaseAcpxRuntime>();
|
||||
private readonly processCleanupDeps: AcpxProcessCleanupDeps | undefined;
|
||||
private readonly wrapperRoot: string | undefined;
|
||||
private readonly gatewayInstanceId: string | undefined;
|
||||
@@ -706,6 +742,7 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
this.wrapperRoot = options.openclawWrapperRoot;
|
||||
this.gatewayInstanceId = options.openclawGatewayInstanceId;
|
||||
this.processLeaseStore = options.openclawProcessLeaseStore;
|
||||
this.openclawToolsMcpBridgeEnabled = options.openclawToolsMcpBridgeEnabled === true;
|
||||
this.cwd = options.cwd;
|
||||
this.sessionStore = createResetAwareSessionStore(options.sessionStore, {
|
||||
gatewayInstanceId: this.gatewayInstanceId,
|
||||
@@ -723,20 +760,21 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
sessionStore: this.sessionStore,
|
||||
agentRegistry: this.scopedAgentRegistry,
|
||||
};
|
||||
this.delegate = new BaseAcpxRuntime(
|
||||
sharedOptions,
|
||||
delegateTestOptions as BaseAcpxRuntimeTestOptions,
|
||||
);
|
||||
this.delegateOptions = sharedOptions;
|
||||
this.delegateTestOptions = delegateTestOptions as BaseAcpxRuntimeTestOptions;
|
||||
this.delegate = new BaseAcpxRuntime(sharedOptions, this.delegateTestOptions);
|
||||
this.bridgeSafeDelegate = shouldUseDistinctBridgeDelegate(options)
|
||||
? new BaseAcpxRuntime(
|
||||
{
|
||||
...sharedOptions,
|
||||
mcpServers: [],
|
||||
},
|
||||
delegateTestOptions as BaseAcpxRuntimeTestOptions,
|
||||
this.delegateTestOptions,
|
||||
)
|
||||
: this.delegate;
|
||||
this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options));
|
||||
this.probeDelegate = this.openclawToolsMcpBridgeEnabled
|
||||
? this.bridgeSafeDelegate
|
||||
: this.resolveDelegateForAgent(resolveProbeAgentName(options));
|
||||
}
|
||||
|
||||
private resolveDelegateForAgent(agentName: string | undefined): BaseAcpxRuntime {
|
||||
@@ -751,6 +789,57 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
return shouldUseBridgeSafeDelegateForCommand(command) ? this.bridgeSafeDelegate : this.delegate;
|
||||
}
|
||||
|
||||
private resolveDelegateForSession(params: {
|
||||
command: string | undefined;
|
||||
sessionKey: string;
|
||||
}): BaseAcpxRuntime {
|
||||
if (shouldUseBridgeSafeDelegateForCommand(params.command)) {
|
||||
return this.bridgeSafeDelegate;
|
||||
}
|
||||
return this.resolveOpenClawToolsDelegateForSession(params.sessionKey);
|
||||
}
|
||||
|
||||
private resolveOpenClawToolsDelegateForSession(sessionKey: string): BaseAcpxRuntime {
|
||||
if (!this.openclawToolsMcpBridgeEnabled) {
|
||||
return this.delegate;
|
||||
}
|
||||
const normalizedSessionKey = sessionKey.trim();
|
||||
if (!normalizedSessionKey) {
|
||||
return this.delegate;
|
||||
}
|
||||
const cached = this.openclawToolsSessionDelegates.get(normalizedSessionKey);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
// Upstream acpx captures mcpServers at runtime construction. The managed
|
||||
// OpenClaw tools bridge needs per-session identity, so cache one delegate
|
||||
// per session with the scoped MCP env already embedded.
|
||||
const delegate = new BaseAcpxRuntime(
|
||||
{
|
||||
...this.delegateOptions,
|
||||
mcpServers: withOpenClawToolsMcpSessionEnv({
|
||||
enabled: this.openclawToolsMcpBridgeEnabled,
|
||||
mcpServers: this.delegateOptions.mcpServers,
|
||||
sessionKey: normalizedSessionKey,
|
||||
}),
|
||||
},
|
||||
this.delegateTestOptions,
|
||||
);
|
||||
this.openclawToolsSessionDelegates.set(normalizedSessionKey, delegate);
|
||||
return delegate;
|
||||
}
|
||||
|
||||
private releaseOpenClawToolsDelegateForSession(sessionKey: string): void {
|
||||
if (!this.openclawToolsMcpBridgeEnabled) {
|
||||
return;
|
||||
}
|
||||
const normalizedSessionKey = sessionKey.trim();
|
||||
if (!normalizedSessionKey) {
|
||||
return;
|
||||
}
|
||||
this.openclawToolsSessionDelegates.delete(normalizedSessionKey);
|
||||
}
|
||||
|
||||
private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise<BaseAcpxRuntime> {
|
||||
const record = await this.sessionStore.load(handle.acpxRecordId ?? handle.sessionKey);
|
||||
return this.resolveDelegateForLoadedRecord(handle, record);
|
||||
@@ -762,9 +851,17 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
): BaseAcpxRuntime {
|
||||
const recordCommand = readAgentCommandFromRecord(record);
|
||||
if (recordCommand) {
|
||||
return this.resolveDelegateForCommand(recordCommand);
|
||||
return this.resolveDelegateForSession({
|
||||
command: recordCommand,
|
||||
sessionKey: handle.sessionKey,
|
||||
});
|
||||
}
|
||||
return this.resolveDelegateForAgent(readAgentFromHandle(handle));
|
||||
const agentName = readAgentFromHandle(handle);
|
||||
const command = resolveAgentCommandForName({
|
||||
agentName,
|
||||
agentRegistry: this.agentRegistry,
|
||||
});
|
||||
return this.resolveDelegateForSession({ command, sessionKey: handle.sessionKey });
|
||||
}
|
||||
|
||||
private async resolveCommandForHandle(handle: AcpRuntimeHandle): Promise<string | undefined> {
|
||||
@@ -980,7 +1077,7 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
agentName: input.agent,
|
||||
agentRegistry: this.agentRegistry,
|
||||
});
|
||||
const delegate = this.resolveDelegateForCommand(command);
|
||||
const delegate = this.resolveDelegateForSession({ command, sessionKey: input.sessionKey });
|
||||
const claudeModelOverride = isClaudeAcpCommand(command)
|
||||
? normalizeClaudeAcpModelOverride(input.model)
|
||||
: undefined;
|
||||
@@ -1264,6 +1361,9 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
}
|
||||
|
||||
async prepareFreshSession(input: { sessionKey: string }): Promise<void> {
|
||||
// Fresh reset has no ACP handle to close the delegate's upstream client.
|
||||
// Keep the scoped delegate reachable so the next ensure can replace it;
|
||||
// close() owns cache release when the session lifecycle ends.
|
||||
this.sessionStore.markFresh(input.sessionKey);
|
||||
}
|
||||
|
||||
@@ -1272,8 +1372,9 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
input.handle.acpxRecordId ?? input.handle.sessionKey,
|
||||
);
|
||||
let closeSucceeded;
|
||||
const delegate = this.resolveDelegateForLoadedRecord(input.handle, record);
|
||||
try {
|
||||
await this.resolveDelegateForLoadedRecord(input.handle, record).close({
|
||||
await delegate.close({
|
||||
handle: input.handle,
|
||||
reason: input.reason,
|
||||
discardPersistentState: input.discardPersistentState,
|
||||
@@ -1282,6 +1383,9 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
} finally {
|
||||
await this.cleanupProcessTreeForRecord(input.handle, record);
|
||||
}
|
||||
if (closeSucceeded) {
|
||||
this.releaseOpenClawToolsDelegateForSession(input.handle.sessionKey);
|
||||
}
|
||||
if (closeSucceeded && input.discardPersistentState) {
|
||||
this.sessionStore.markFresh(input.handle.sessionKey);
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ function createLazyDefaultRuntime(params: AcpxRuntimeFactoryParams): AcpxRuntime
|
||||
}),
|
||||
probeAgent: params.pluginConfig.probeAgent,
|
||||
mcpServers: toAcpMcpServers(params.pluginConfig.mcpServers),
|
||||
openclawToolsMcpBridgeEnabled: params.pluginConfig.openClawToolsMcpBridge,
|
||||
permissionMode: params.pluginConfig.permissionMode,
|
||||
nonInteractivePermissions: params.pluginConfig.nonInteractivePermissions,
|
||||
timeoutMs: resolveAcpxTimerTimeoutMs(params.pluginConfig.timeoutSeconds),
|
||||
|
||||
@@ -1,6 +1,81 @@
|
||||
import { createServer, type Server } from "node:http";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createClickClackClient } from "./http-client.js";
|
||||
|
||||
const LOOPBACK_RESPONSE_BYTES = 18 * 1024 * 1024;
|
||||
|
||||
async function listenLoopbackServer(server: Server): Promise<number> {
|
||||
return await new Promise((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
server.listen(0, "127.0.0.1", () => {
|
||||
server.off("error", reject);
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
reject(new Error("expected loopback TCP address"));
|
||||
return;
|
||||
}
|
||||
resolve(address.port);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function createOversizedJsonServer(): { server: Server; closed: Promise<number> } {
|
||||
let resolveClosed: (sentBytes: number) => void = () => {};
|
||||
const closed = new Promise<number>((resolve) => {
|
||||
resolveClosed = resolve;
|
||||
});
|
||||
const server = createServer((req, res) => {
|
||||
let sentBytes = 0;
|
||||
let stopped = false;
|
||||
let prefixSent = false;
|
||||
const prefixChunk = Buffer.from('{"user":{"id":"');
|
||||
const bodyChunk = Buffer.alloc(64 * 1024, 0x61);
|
||||
const suffixChunk = Buffer.from('"}}');
|
||||
const writeBuffer = (buffer: Buffer) => {
|
||||
sentBytes += buffer.length;
|
||||
if (!res.write(buffer)) {
|
||||
res.once("drain", writeChunks);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
const writeChunks = () => {
|
||||
if (!prefixSent) {
|
||||
prefixSent = true;
|
||||
if (!writeBuffer(prefixChunk)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
while (true) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
if (sentBytes + bodyChunk.length + suffixChunk.length >= LOOPBACK_RESPONSE_BYTES) {
|
||||
break;
|
||||
}
|
||||
if (!writeBuffer(bodyChunk)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!stopped) {
|
||||
sentBytes += suffixChunk.length;
|
||||
res.end(suffixChunk);
|
||||
}
|
||||
};
|
||||
res.writeHead(200, { connection: "close", "content-type": "application/json" });
|
||||
res.on("close", () => {
|
||||
stopped = true;
|
||||
resolveClosed(sentBytes);
|
||||
});
|
||||
req.on("aborted", () => {
|
||||
stopped = true;
|
||||
res.destroy();
|
||||
});
|
||||
writeChunks();
|
||||
});
|
||||
return { server, closed };
|
||||
}
|
||||
|
||||
function streamedErrorResponse(body: string, limit: number) {
|
||||
const encoded = new TextEncoder().encode(body);
|
||||
let readCount = 0;
|
||||
@@ -39,6 +114,25 @@ function streamedErrorResponse(body: string, limit: number) {
|
||||
}
|
||||
|
||||
describe("ClickClack HTTP client", () => {
|
||||
it("bounds oversized success JSON responses and closes the stream early", async () => {
|
||||
const { server, closed } = createOversizedJsonServer();
|
||||
const port = await listenLoopbackServer(server);
|
||||
const client = createClickClackClient({
|
||||
baseUrl: `http://127.0.0.1:${port}`,
|
||||
token: "test-token",
|
||||
});
|
||||
|
||||
try {
|
||||
await expect(client.me()).rejects.toThrow(
|
||||
"ClickClack response: JSON response exceeds 16777216 bytes",
|
||||
);
|
||||
const sentBytes = await closed;
|
||||
expect(sentBytes).toBeLessThan(LOOPBACK_RESPONSE_BYTES);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it("bounds error response bodies without using raw response.text()", async () => {
|
||||
const streamed = streamedErrorResponse("x".repeat(9000), 8 * 1024);
|
||||
const fetchMock = vi.fn(async () => streamed.response);
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
* Thin ClickClack REST/websocket client used by gateway, resolver, and outbound
|
||||
* delivery code.
|
||||
*/
|
||||
import { readResponseTextLimited } from "openclaw/plugin-sdk/provider-http";
|
||||
import {
|
||||
readProviderJsonResponse,
|
||||
readResponseTextLimited,
|
||||
} from "openclaw/plugin-sdk/provider-http";
|
||||
import { WebSocket } from "ws";
|
||||
import type {
|
||||
ClickClackChannel,
|
||||
@@ -44,7 +47,7 @@ export function createClickClackClient(options: ClientOptions) {
|
||||
const detail = await readResponseTextLimited(response, CLICKCLACK_ERROR_BODY_LIMIT_BYTES);
|
||||
throw new Error(`ClickClack ${response.status}: ${detail}`);
|
||||
}
|
||||
return (await response.json()) as T;
|
||||
return await readProviderJsonResponse<T>(response, "ClickClack response");
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -36,6 +36,14 @@ describe("codex doctor contract", () => {
|
||||
},
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
legacyConfigRules[1]?.match({
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": { allow_destructive_actions: "always" },
|
||||
},
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("removes the retired dynamic tools profile without dropping other Codex config", () => {
|
||||
|
||||
@@ -101,7 +101,7 @@
|
||||
"default": false
|
||||
},
|
||||
"allow_destructive_actions": {
|
||||
"oneOf": [{ "type": "boolean" }, { "const": "auto" }],
|
||||
"oneOf": [{ "type": "boolean" }, { "const": "auto" }, { "const": "always" }],
|
||||
"default": true
|
||||
},
|
||||
"plugins": {
|
||||
@@ -121,7 +121,7 @@
|
||||
"type": "string"
|
||||
},
|
||||
"allow_destructive_actions": {
|
||||
"oneOf": [{ "type": "boolean" }, { "const": "auto" }]
|
||||
"oneOf": [{ "type": "boolean" }, { "const": "auto" }, { "const": "always" }]
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -343,7 +343,7 @@
|
||||
},
|
||||
"codexPlugins.allow_destructive_actions": {
|
||||
"label": "Allow Destructive Plugin Actions",
|
||||
"help": "Default policy for plugin app write or destructive action elicitations. Use true to accept safe schemas without prompting, false to decline, or auto to ask through plugin approvals.",
|
||||
"help": "Default policy for plugin app write or destructive action elicitations. Use true to accept safe schemas without prompting, false to decline, auto to ask through plugin approvals when Codex requires approval, or always to ask for every write/destructive action without durable approval.",
|
||||
"advanced": true
|
||||
},
|
||||
"codexPlugins.plugins": {
|
||||
|
||||
@@ -346,6 +346,7 @@ export async function startCodexAttemptThread(params: {
|
||||
timeoutMs: params.appServer.requestTimeoutMs,
|
||||
signal,
|
||||
}),
|
||||
configCwd: startupExecutionCwd,
|
||||
appCache: defaultCodexAppInventoryCache,
|
||||
appCacheKey: pluginAppCacheKey,
|
||||
}),
|
||||
|
||||
@@ -1192,6 +1192,52 @@ allowed_sandbox_modes = ["read-only", "workspace-write"]
|
||||
});
|
||||
});
|
||||
|
||||
it("parses always native Codex plugin destructive policy", () => {
|
||||
const config = readCodexPluginConfig({
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
marketplaceName: "openai-curated",
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
slack: {
|
||||
marketplaceName: "openai-curated",
|
||||
pluginName: "slack",
|
||||
allow_destructive_actions: "auto",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(config.codexPlugins?.allow_destructive_actions).toBe("always");
|
||||
expect(resolveCodexPluginsPolicy(config)).toEqual({
|
||||
configured: true,
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
pluginPolicies: [
|
||||
{
|
||||
configKey: "google-calendar",
|
||||
marketplaceName: "openai-curated",
|
||||
pluginName: "google-calendar",
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
},
|
||||
{
|
||||
configKey: "slack",
|
||||
marketplaceName: "openai-curated",
|
||||
pluginName: "slack",
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "auto",
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects unsupported native Codex plugin destructive policy strings", () => {
|
||||
const config = readCodexPluginConfig({
|
||||
codexPlugins: {
|
||||
|
||||
@@ -74,8 +74,8 @@ export type CodexAppServerSandboxMode = "read-only" | "workspace-write" | "dange
|
||||
type CodexAppServerApprovalsReviewer = "user" | "auto_review" | "guardian_subagent";
|
||||
type CodexAppServerCommandSource = "managed" | "resolved-managed" | "config" | "env";
|
||||
export type CodexDynamicToolsLoading = "searchable" | "direct";
|
||||
export type CodexPluginDestructivePolicy = boolean | "auto";
|
||||
export type CodexPluginDestructiveApprovalMode = "allow" | "deny" | "auto";
|
||||
export type CodexPluginDestructivePolicy = boolean | "auto" | "always";
|
||||
export type CodexPluginDestructiveApprovalMode = "allow" | "deny" | "auto" | "always";
|
||||
|
||||
export const CODEX_PLUGINS_MARKETPLACE_NAME = "openai-curated";
|
||||
|
||||
@@ -311,7 +311,11 @@ const codexAppServerApprovalPolicySchema = z.enum([
|
||||
const codexAppServerSandboxSchema = z.enum(["read-only", "workspace-write", "danger-full-access"]);
|
||||
const codexAppServerApprovalsReviewerSchema = z.enum(["user", "auto_review", "guardian_subagent"]);
|
||||
const codexDynamicToolsLoadingSchema = z.enum(["searchable", "direct"]);
|
||||
const codexPluginDestructivePolicySchema = z.union([z.boolean(), z.literal("auto")]);
|
||||
const codexPluginDestructivePolicySchema = z.union([
|
||||
z.boolean(),
|
||||
z.literal("auto"),
|
||||
z.literal("always"),
|
||||
]);
|
||||
const codexAppServerServiceTierSchema = z
|
||||
.preprocess(
|
||||
(value) => (value === null ? null : normalizeCodexServiceTier(value)),
|
||||
@@ -495,8 +499,8 @@ function resolveCodexPluginDestructivePolicy(policy: CodexPluginDestructivePolic
|
||||
allowDestructiveActions: boolean;
|
||||
destructiveApprovalMode: CodexPluginDestructiveApprovalMode;
|
||||
} {
|
||||
if (policy === "auto") {
|
||||
return { allowDestructiveActions: true, destructiveApprovalMode: "auto" };
|
||||
if (policy === "auto" || policy === "always") {
|
||||
return { allowDestructiveActions: true, destructiveApprovalMode: policy };
|
||||
}
|
||||
return {
|
||||
allowDestructiveActions: policy,
|
||||
|
||||
@@ -157,7 +157,7 @@ function buildConnectorPluginApprovalElicitation(overrides: Record<string, unkno
|
||||
function createPluginAppPolicyContext(
|
||||
params: {
|
||||
allowDestructiveActions?: boolean;
|
||||
destructiveApprovalMode?: "allow" | "deny" | "auto";
|
||||
destructiveApprovalMode?: "allow" | "deny" | "auto" | "always";
|
||||
apps?: Array<{ appId: string; pluginName: string; mcpServerNames: string[] }>;
|
||||
} = {},
|
||||
) {
|
||||
@@ -1017,6 +1017,96 @@ describe("Codex app-server elicitation bridge", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not expose allow-always for always plugin policy", async () => {
|
||||
mockCallGatewayTool
|
||||
.mockResolvedValueOnce({ id: "plugin:approval-calendar-always-policy", status: "accepted" })
|
||||
.mockResolvedValueOnce({
|
||||
id: "plugin:approval-calendar-always-policy",
|
||||
decision: "allow-once",
|
||||
});
|
||||
|
||||
const result = await handleCodexAppServerElicitationRequest({
|
||||
requestParams: buildConnectorPluginApprovalElicitation({
|
||||
_meta: {
|
||||
codex_approval_kind: "mcp_tool_call",
|
||||
source: "connector",
|
||||
connector_id: "connector_google_calendar",
|
||||
connector_name: "Google Calendar",
|
||||
persist: ["session", "always"],
|
||||
tool_title: "create_event",
|
||||
},
|
||||
}),
|
||||
paramsForRun: createParams(),
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
pluginAppPolicyContext: createPluginAppPolicyContext({
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
apps: [
|
||||
{
|
||||
appId: "connector_google_calendar",
|
||||
pluginName: "google-calendar",
|
||||
mcpServerNames: [],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
action: "accept",
|
||||
content: null,
|
||||
_meta: null,
|
||||
});
|
||||
expect(gatewayToolArg(0, 2)).toMatchObject({
|
||||
allowedDecisions: ["allow-once", "deny"],
|
||||
});
|
||||
});
|
||||
|
||||
it("maps unexpected allow-always decisions to one-shot for always plugin policy", async () => {
|
||||
mockCallGatewayTool
|
||||
.mockResolvedValueOnce({
|
||||
id: "plugin:approval-calendar-unexpected-always",
|
||||
status: "accepted",
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
id: "plugin:approval-calendar-unexpected-always",
|
||||
decision: "allow-always",
|
||||
});
|
||||
|
||||
const result = await handleCodexAppServerElicitationRequest({
|
||||
requestParams: buildConnectorPluginApprovalElicitation({
|
||||
_meta: {
|
||||
codex_approval_kind: "mcp_tool_call",
|
||||
source: "connector",
|
||||
connector_id: "connector_google_calendar",
|
||||
connector_name: "Google Calendar",
|
||||
persist: ["session", "always"],
|
||||
tool_title: "create_event",
|
||||
},
|
||||
}),
|
||||
paramsForRun: createParams(),
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
pluginAppPolicyContext: createPluginAppPolicyContext({
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
apps: [
|
||||
{
|
||||
appId: "connector_google_calendar",
|
||||
pluginName: "google-calendar",
|
||||
mcpServerNames: [],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
action: "accept",
|
||||
content: null,
|
||||
_meta: null,
|
||||
});
|
||||
});
|
||||
|
||||
it("declines denied auto plugin app approvals", async () => {
|
||||
mockCallGatewayTool
|
||||
.mockResolvedValueOnce({ id: "plugin:approval-calendar-deny", status: "accepted" })
|
||||
|
||||
@@ -318,10 +318,13 @@ async function buildPluginPolicyElicitationResponse(params: {
|
||||
paramsForRun: params.paramsForRun,
|
||||
title: approvalPrompt.title,
|
||||
description: approvalPrompt.description,
|
||||
allowedDecisions: approvalPrompt.allowedDecisions,
|
||||
allowedDecisions: allowedPluginPolicyApprovalDecisions(mode, approvalPrompt),
|
||||
signal: params.signal,
|
||||
});
|
||||
return buildElicitationResponse(approvalPrompt, outcome);
|
||||
return buildElicitationResponse(
|
||||
approvalPrompt,
|
||||
oneShotPluginPolicyApprovalOutcome(mode, outcome),
|
||||
);
|
||||
}
|
||||
logPluginElicitationDecline("unmappable_schema", params.requestParams);
|
||||
return declineElicitationResponse();
|
||||
@@ -329,10 +332,28 @@ async function buildPluginPolicyElicitationResponse(params: {
|
||||
|
||||
function resolvePluginDestructiveApprovalMode(
|
||||
entry: PluginAppPolicyContextEntry,
|
||||
): "allow" | "deny" | "auto" {
|
||||
): "allow" | "deny" | "auto" | "always" {
|
||||
return entry.destructiveApprovalMode ?? (entry.allowDestructiveActions ? "allow" : "deny");
|
||||
}
|
||||
|
||||
function allowedPluginPolicyApprovalDecisions(
|
||||
mode: "allow" | "deny" | "auto" | "always",
|
||||
approvalPrompt: BridgeableApprovalElicitation,
|
||||
): ExecApprovalDecision[] {
|
||||
const allowedDecisions = approvalPrompt.allowedDecisions ?? ["allow-once", "deny"];
|
||||
if (mode !== "always") {
|
||||
return allowedDecisions;
|
||||
}
|
||||
return allowedDecisions.filter((decision) => decision !== "allow-always");
|
||||
}
|
||||
|
||||
function oneShotPluginPolicyApprovalOutcome(
|
||||
mode: "allow" | "deny" | "auto" | "always",
|
||||
outcome: AppServerApprovalOutcome,
|
||||
): AppServerApprovalOutcome {
|
||||
return mode === "always" && outcome === "approved-session" ? "approved-once" : outcome;
|
||||
}
|
||||
|
||||
function readPluginApprovalElicitation(
|
||||
entry: PluginAppPolicyContextEntry,
|
||||
requestParams: JsonObject,
|
||||
|
||||
@@ -170,6 +170,379 @@ describe("Codex plugin thread config", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("exposes destructive app access while clearing only durable approval overrides for always mode", async () => {
|
||||
const appCache = new CodexAppInventoryCache();
|
||||
await appCache.refreshNow({
|
||||
key: "runtime",
|
||||
nowMs: 0,
|
||||
request: async () => ({
|
||||
data: [appInfo("google-calendar-app", true)],
|
||||
nextCursor: null,
|
||||
}),
|
||||
});
|
||||
let configReadCount = 0;
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "plugin/list") {
|
||||
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
|
||||
}
|
||||
if (method === "plugin/read") {
|
||||
return pluginDetail(
|
||||
"google-calendar",
|
||||
[appSummary("google-calendar-app")],
|
||||
["google-calendar"],
|
||||
);
|
||||
}
|
||||
if (method === "config/read") {
|
||||
configReadCount += 1;
|
||||
if (configReadCount > 1) {
|
||||
return {
|
||||
config: {
|
||||
apps: {
|
||||
"google-calendar-app": {
|
||||
tools: {
|
||||
"calendar/read": {
|
||||
enabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
return {
|
||||
config: {
|
||||
apps: {
|
||||
"google-calendar-app": {
|
||||
tools: {
|
||||
"calendar/create": {
|
||||
approval_mode: "approve",
|
||||
enabled: false,
|
||||
},
|
||||
"calendar/read": {
|
||||
enabled: false,
|
||||
},
|
||||
"calendar/update": {
|
||||
approvalMode: "approve",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
if (method === "config/value/write") {
|
||||
return {};
|
||||
}
|
||||
throw new Error(`unexpected request ${method}`);
|
||||
});
|
||||
|
||||
const config = await buildCodexPluginThreadConfig({
|
||||
pluginConfig: {
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
appCache,
|
||||
appCacheKey: "runtime",
|
||||
nowMs: 1,
|
||||
request,
|
||||
});
|
||||
|
||||
const apps = config.configPatch?.apps as Record<string, unknown> | undefined;
|
||||
expect(apps?.["google-calendar-app"]).toEqual({
|
||||
enabled: true,
|
||||
destructive_enabled: true,
|
||||
open_world_enabled: true,
|
||||
default_tools_approval_mode: "auto",
|
||||
});
|
||||
expect(config.policyContext.apps["google-calendar-app"]).toMatchObject({
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
});
|
||||
expect(request).toHaveBeenCalledWith("config/read", { includeLayers: false });
|
||||
expect(request.mock.calls.filter(([method]) => method === "config/read")).toHaveLength(2);
|
||||
expect(request).toHaveBeenCalledWith("config/value/write", {
|
||||
keyPath: 'apps."google-calendar-app".tools."calendar/create".approval_mode',
|
||||
value: null,
|
||||
mergeStrategy: "replace",
|
||||
});
|
||||
expect(request).toHaveBeenCalledWith("config/value/write", {
|
||||
keyPath: 'apps."google-calendar-app".tools."calendar/update".approval_mode',
|
||||
value: null,
|
||||
mergeStrategy: "replace",
|
||||
});
|
||||
expect(request).not.toHaveBeenCalledWith("config/value/write", {
|
||||
keyPath: 'apps."google-calendar-app".tools',
|
||||
value: null,
|
||||
mergeStrategy: "replace",
|
||||
});
|
||||
});
|
||||
|
||||
it("omits always policy apps when cwd effective approval overrides remain after cleanup", async () => {
|
||||
const appCache = new CodexAppInventoryCache();
|
||||
await appCache.refreshNow({
|
||||
key: "runtime",
|
||||
nowMs: 0,
|
||||
request: async () => ({
|
||||
data: [appInfo("google-calendar-app", true)],
|
||||
nextCursor: null,
|
||||
}),
|
||||
});
|
||||
let configReadCount = 0;
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "plugin/list") {
|
||||
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
|
||||
}
|
||||
if (method === "plugin/read") {
|
||||
return pluginDetail(
|
||||
"google-calendar",
|
||||
[appSummary("google-calendar-app")],
|
||||
["google-calendar"],
|
||||
);
|
||||
}
|
||||
if (method === "config/read") {
|
||||
configReadCount += 1;
|
||||
return {
|
||||
config: {
|
||||
apps: {
|
||||
"google-calendar-app": {
|
||||
tools: {
|
||||
"calendar/create": {
|
||||
approval_mode: "approve",
|
||||
source: configReadCount === 1 ? "user" : "project",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
if (method === "config/value/write") {
|
||||
return { status: "ok" };
|
||||
}
|
||||
throw new Error(`unexpected request ${method}`);
|
||||
});
|
||||
|
||||
const config = await buildCodexPluginThreadConfig({
|
||||
pluginConfig: {
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
appCache,
|
||||
appCacheKey: "runtime",
|
||||
configCwd: "/repo/project",
|
||||
nowMs: 1,
|
||||
request,
|
||||
});
|
||||
|
||||
expect(config.configPatch).toEqual({
|
||||
apps: {
|
||||
_default: {
|
||||
enabled: false,
|
||||
destructive_enabled: false,
|
||||
open_world_enabled: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(config.policyContext.apps).toStrictEqual({});
|
||||
expect(request).toHaveBeenCalledWith("config/read", {
|
||||
includeLayers: false,
|
||||
cwd: "/repo/project",
|
||||
});
|
||||
expect(request.mock.calls.filter(([method]) => method === "config/read")).toHaveLength(2);
|
||||
expect(config.diagnostics).toStrictEqual([
|
||||
{
|
||||
code: "approval_overrides_clear_failed",
|
||||
plugin: {
|
||||
configKey: "google-calendar",
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
},
|
||||
message:
|
||||
"Could not clear durable Codex app approval overrides for google-calendar-app: effective approval overrides remain for calendar/create",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("omits always policy apps when approval override writes are overridden", async () => {
|
||||
const appCache = new CodexAppInventoryCache();
|
||||
await appCache.refreshNow({
|
||||
key: "runtime",
|
||||
nowMs: 0,
|
||||
request: async () => ({
|
||||
data: [appInfo("google-calendar-app", true)],
|
||||
nextCursor: null,
|
||||
}),
|
||||
});
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "plugin/list") {
|
||||
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
|
||||
}
|
||||
if (method === "plugin/read") {
|
||||
return pluginDetail(
|
||||
"google-calendar",
|
||||
[appSummary("google-calendar-app")],
|
||||
["google-calendar"],
|
||||
);
|
||||
}
|
||||
if (method === "config/read") {
|
||||
return {
|
||||
config: {
|
||||
apps: {
|
||||
"google-calendar-app": {
|
||||
tools: {
|
||||
"calendar/create": {
|
||||
approval_mode: "approve",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
if (method === "config/value/write") {
|
||||
return { status: "okOverridden" };
|
||||
}
|
||||
throw new Error(`unexpected request ${method}`);
|
||||
});
|
||||
|
||||
const config = await buildCodexPluginThreadConfig({
|
||||
pluginConfig: {
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
appCache,
|
||||
appCacheKey: "runtime",
|
||||
configCwd: "/repo/project",
|
||||
nowMs: 1,
|
||||
request,
|
||||
});
|
||||
|
||||
expect(config.configPatch).toEqual({
|
||||
apps: {
|
||||
_default: {
|
||||
enabled: false,
|
||||
destructive_enabled: false,
|
||||
open_world_enabled: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(config.policyContext.apps).toStrictEqual({});
|
||||
expect(config.diagnostics).toStrictEqual([
|
||||
{
|
||||
code: "approval_overrides_clear_failed",
|
||||
plugin: {
|
||||
configKey: "google-calendar",
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
},
|
||||
message:
|
||||
"Could not clear durable Codex app approval overrides for google-calendar-app: approval override for calendar/create is controlled by another config layer",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("omits always policy apps when durable approval override cleanup fails", async () => {
|
||||
const appCache = new CodexAppInventoryCache();
|
||||
await appCache.refreshNow({
|
||||
key: "runtime",
|
||||
nowMs: 0,
|
||||
request: async () => ({
|
||||
data: [appInfo("google-calendar-app", true)],
|
||||
nextCursor: null,
|
||||
}),
|
||||
});
|
||||
|
||||
const config = await buildCodexPluginThreadConfig({
|
||||
pluginConfig: {
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
appCache,
|
||||
appCacheKey: "runtime",
|
||||
nowMs: 1,
|
||||
request: async (method) => {
|
||||
if (method === "plugin/list") {
|
||||
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
|
||||
}
|
||||
if (method === "plugin/read") {
|
||||
return pluginDetail(
|
||||
"google-calendar",
|
||||
[appSummary("google-calendar-app")],
|
||||
["google-calendar"],
|
||||
);
|
||||
}
|
||||
if (method === "config/read") {
|
||||
throw new Error("readonly config");
|
||||
}
|
||||
throw new Error(`unexpected request ${method}`);
|
||||
},
|
||||
});
|
||||
|
||||
expect(config.configPatch).toEqual({
|
||||
apps: {
|
||||
_default: {
|
||||
enabled: false,
|
||||
destructive_enabled: false,
|
||||
open_world_enabled: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(config.policyContext.apps).toStrictEqual({});
|
||||
expect(config.diagnostics).toStrictEqual([
|
||||
{
|
||||
code: "approval_overrides_clear_failed",
|
||||
plugin: {
|
||||
configKey: "google-calendar",
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
enabled: true,
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always",
|
||||
},
|
||||
message:
|
||||
"Could not clear durable Codex app approval overrides for google-calendar-app: readonly config",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("builds a restrictive app config when native plugin support is disabled", async () => {
|
||||
expect(
|
||||
shouldBuildCodexPluginThreadConfig({
|
||||
|
||||
@@ -29,7 +29,7 @@ import {
|
||||
type CodexPluginOwnedApp,
|
||||
type CodexPluginRuntimeRequest,
|
||||
} from "./plugin-inventory.js";
|
||||
import type { JsonObject, JsonValue } from "./protocol.js";
|
||||
import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js";
|
||||
|
||||
/** Policy context for one app id exposed by a configured Codex plugin. */
|
||||
export type PluginAppPolicyContextEntry = {
|
||||
@@ -52,7 +52,7 @@ export type PluginAppPolicyContext = {
|
||||
export type CodexPluginThreadConfigDiagnostic =
|
||||
| CodexPluginInventoryDiagnostic
|
||||
| {
|
||||
code: "plugin_activation_failed" | "app_not_ready";
|
||||
code: "plugin_activation_failed" | "app_not_ready" | "approval_overrides_clear_failed";
|
||||
plugin?: ResolvedCodexPluginPolicy;
|
||||
message: string;
|
||||
};
|
||||
@@ -72,6 +72,7 @@ export type CodexPluginThreadConfig = {
|
||||
export type BuildCodexPluginThreadConfigParams = {
|
||||
pluginConfig?: unknown;
|
||||
request: CodexPluginRuntimeRequest;
|
||||
configCwd?: string;
|
||||
appCache?: CodexAppInventoryCache;
|
||||
appCacheKey: string;
|
||||
nowMs?: number;
|
||||
@@ -250,6 +251,18 @@ export async function buildCodexPluginThreadConfig(
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
record.policy.destructiveApprovalMode === "always" &&
|
||||
!(await clearPersistedAppToolApprovalOverrides({
|
||||
request: params.request,
|
||||
configCwd: params.configCwd,
|
||||
plugin: record.policy,
|
||||
app,
|
||||
diagnostics,
|
||||
}))
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
const appConfig: JsonObject = {
|
||||
enabled: true,
|
||||
destructive_enabled: record.policy.allowDestructiveActions,
|
||||
@@ -367,6 +380,86 @@ function buildPluginAppPolicyContext(
|
||||
};
|
||||
}
|
||||
|
||||
async function clearPersistedAppToolApprovalOverrides(params: {
|
||||
request: CodexPluginRuntimeRequest;
|
||||
configCwd?: string;
|
||||
plugin: ResolvedCodexPluginPolicy;
|
||||
app: CodexPluginOwnedApp;
|
||||
diagnostics: CodexPluginThreadConfigDiagnostic[];
|
||||
}): Promise<boolean> {
|
||||
try {
|
||||
const overrideNames = await readPersistedAppToolApprovalOverrideNames(params);
|
||||
for (const toolName of overrideNames) {
|
||||
const response = await params.request("config/value/write", {
|
||||
keyPath: `apps.${quoteConfigKeyPathSegment(params.app.id)}.tools.${quoteConfigKeyPathSegment(
|
||||
toolName,
|
||||
)}.approval_mode`,
|
||||
value: null,
|
||||
mergeStrategy: "replace",
|
||||
});
|
||||
if (isOverriddenConfigWriteResponse(response)) {
|
||||
throw new Error(`approval override for ${toolName} is controlled by another config layer`);
|
||||
}
|
||||
}
|
||||
const remainingOverrideNames = await readPersistedAppToolApprovalOverrideNames(params);
|
||||
if (remainingOverrideNames.length > 0) {
|
||||
throw new Error(
|
||||
`effective approval overrides remain for ${remainingOverrideNames.join(", ")}`,
|
||||
);
|
||||
}
|
||||
return true;
|
||||
} catch (error) {
|
||||
params.diagnostics.push({
|
||||
code: "approval_overrides_clear_failed",
|
||||
plugin: params.plugin,
|
||||
message: `Could not clear durable Codex app approval overrides for ${params.app.id}: ${
|
||||
error instanceof Error ? error.message : String(error)
|
||||
}`,
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function readPersistedAppToolApprovalOverrideNames(params: {
|
||||
request: CodexPluginRuntimeRequest;
|
||||
configCwd?: string;
|
||||
app: CodexPluginOwnedApp;
|
||||
}): Promise<string[]> {
|
||||
const response = await params.request("config/read", {
|
||||
includeLayers: false,
|
||||
...(params.configCwd ? { cwd: params.configCwd } : {}),
|
||||
});
|
||||
const config = isJsonObject(response) ? response.config : undefined;
|
||||
const appsRoot = isJsonObject(config) ? config.apps : undefined;
|
||||
const nestedApps = isJsonObject(appsRoot) ? appsRoot.apps : undefined;
|
||||
const appConfig = isJsonObject(appsRoot)
|
||||
? (appsRoot[params.app.id] ??
|
||||
(isJsonObject(nestedApps) ? nestedApps[params.app.id] : undefined))
|
||||
: undefined;
|
||||
const tools = isJsonObject(appConfig) ? appConfig.tools : undefined;
|
||||
if (!isJsonObject(tools)) {
|
||||
return [];
|
||||
}
|
||||
return Object.entries(tools)
|
||||
.filter(([, value]) => hasPersistedToolApprovalOverride(value))
|
||||
.map(([toolName]) => toolName)
|
||||
.toSorted();
|
||||
}
|
||||
|
||||
function hasPersistedToolApprovalOverride(value: JsonValue): boolean {
|
||||
return (
|
||||
isJsonObject(value) && (value.approval_mode !== undefined || value.approvalMode !== undefined)
|
||||
);
|
||||
}
|
||||
|
||||
function isOverriddenConfigWriteResponse(response: unknown): boolean {
|
||||
return isJsonObject(response) && response.status === "okOverridden";
|
||||
}
|
||||
|
||||
function quoteConfigKeyPathSegment(segment: string): string {
|
||||
return `"${segment.replace(/["\\]/g, (char) => `\\${char}`)}"`;
|
||||
}
|
||||
|
||||
function shouldWaitForInitialAppInventory(
|
||||
params: BuildCodexPluginThreadConfigParams,
|
||||
policy: ResolvedCodexPluginsPolicy,
|
||||
|
||||
@@ -575,6 +575,8 @@ type CodexAppServerRequestResultMap = {
|
||||
"account/read": CodexGetAccountResponse;
|
||||
"app/list": CodexAppsListResponse;
|
||||
"config/mcpServer/reload": JsonValue;
|
||||
"config/read": JsonValue;
|
||||
"config/value/write": JsonValue;
|
||||
"environment/add": JsonValue;
|
||||
"experimentalFeature/enablement/set": JsonValue;
|
||||
"feedback/upload": JsonValue;
|
||||
|
||||
@@ -112,6 +112,44 @@ describe("requestCodexAppServerJson sandbox guard", () => {
|
||||
expect(request).toHaveBeenCalledWith("thread/list", { limit: 10 }, { timeoutMs: 60_000 });
|
||||
});
|
||||
|
||||
it("allows config value writes in sandboxed sessions", async () => {
|
||||
const request = vi.fn(async () => ({ ok: true }));
|
||||
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });
|
||||
const params = {
|
||||
keyPath: 'apps."google-calendar-app".tools',
|
||||
value: null,
|
||||
mergeStrategy: "replace",
|
||||
};
|
||||
|
||||
await expect(
|
||||
requestCodexAppServerJson({
|
||||
method: "config/value/write",
|
||||
requestParams: params,
|
||||
config: { agents: { defaults: { sandbox: { mode: "all" } } } },
|
||||
sessionKey: "sandboxed-session",
|
||||
}),
|
||||
).resolves.toEqual({ ok: true });
|
||||
|
||||
expect(request).toHaveBeenCalledWith("config/value/write", params, { timeoutMs: 60_000 });
|
||||
});
|
||||
|
||||
it("allows config reads in sandboxed sessions", async () => {
|
||||
const request = vi.fn(async () => ({ config: { apps: { apps: {} } } }));
|
||||
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });
|
||||
const params = { includeLayers: false };
|
||||
|
||||
await expect(
|
||||
requestCodexAppServerJson({
|
||||
method: "config/read",
|
||||
requestParams: params,
|
||||
config: { agents: { defaults: { sandbox: { mode: "all" } } } },
|
||||
sessionKey: "sandboxed-session",
|
||||
}),
|
||||
).resolves.toEqual({ config: { apps: { apps: {} } } });
|
||||
|
||||
expect(request).toHaveBeenCalledWith("config/read", params, { timeoutMs: 60_000 });
|
||||
});
|
||||
|
||||
it("allows sandbox-pinned thread starts in sandboxed sessions", async () => {
|
||||
const request = vi.fn(async () => ({ thread: { id: "thread-1" }, model: "gpt-5.5" }));
|
||||
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({ request });
|
||||
|
||||
@@ -19,6 +19,8 @@ const DIRECT_METHOD_POLICIES = new Map<string, DirectMethodPolicy>([
|
||||
["account/read", "allowed-control-plane"],
|
||||
["app/list", "allowed-control-plane"],
|
||||
["config/mcpServer/reload", "allowed-control-plane"],
|
||||
["config/read", "allowed-control-plane"],
|
||||
["config/value/write", "allowed-control-plane"],
|
||||
["environment/add", "allowed-control-plane"],
|
||||
["experimentalFeature/enablement/set", "allowed-control-plane"],
|
||||
["feedback/upload", "allowed-control-plane"],
|
||||
|
||||
@@ -145,6 +145,35 @@ describe("codex app-server session binding", () => {
|
||||
expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext);
|
||||
});
|
||||
|
||||
it("round-trips always plugin app policy context destructive approval mode", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.json");
|
||||
const pluginAppPolicyContext = {
|
||||
fingerprint: "plugin-policy-always",
|
||||
apps: {
|
||||
"google-calendar-app": {
|
||||
configKey: "google-calendar",
|
||||
marketplaceName: "openai-curated" as const,
|
||||
pluginName: "google-calendar",
|
||||
allowDestructiveActions: true,
|
||||
destructiveApprovalMode: "always" as const,
|
||||
mcpServerNames: ["google-calendar"],
|
||||
},
|
||||
},
|
||||
pluginAppIds: {
|
||||
"google-calendar": ["google-calendar-app"],
|
||||
},
|
||||
};
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-123",
|
||||
cwd: tempDir,
|
||||
pluginAppPolicyContext,
|
||||
});
|
||||
|
||||
const binding = await readCodexAppServerBinding(sessionFile);
|
||||
|
||||
expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext);
|
||||
});
|
||||
|
||||
it("normalizes v1 plugin app policy context destructive approval modes", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.json");
|
||||
await fs.writeFile(
|
||||
|
||||
@@ -421,6 +421,9 @@ function readDestructiveApprovalMode(
|
||||
if (value === "auto") {
|
||||
return bindingSchemaVersion === 1 ? "allow" : "auto";
|
||||
}
|
||||
if (value === "always" && bindingSchemaVersion === 2) {
|
||||
return "always";
|
||||
}
|
||||
if (value === "on-request" && bindingSchemaVersion === 1) {
|
||||
return "auto";
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
createCodexTrajectoryRecorder,
|
||||
recordCodexTrajectoryCompletion,
|
||||
recordCodexTrajectoryContext,
|
||||
resolveCodexTrajectoryAppendFlags,
|
||||
resolveCodexTrajectoryPointerFlags,
|
||||
@@ -80,7 +81,9 @@ describe("Codex trajectory recorder", () => {
|
||||
expect(content).not.toContain("secret");
|
||||
expect(content).not.toContain("sk-test-secret-token");
|
||||
expect(content).not.toContain("sk-other-secret-token");
|
||||
expect(fs.statSync(filePath).mode & 0o777).toBe(0o600);
|
||||
if (process.platform !== "win32") {
|
||||
expect(fs.statSync(filePath).mode & 0o777).toBe(0o600);
|
||||
}
|
||||
expect(fs.existsSync(path.join(tmpDir, "session.trajectory-path.json"))).toBe(true);
|
||||
});
|
||||
|
||||
@@ -253,4 +256,235 @@ describe("Codex trajectory recorder", () => {
|
||||
expect(parsed.data?.truncated).toBe(true);
|
||||
expect(parsed.data?.reason).toBe("trajectory-event-size-limit");
|
||||
});
|
||||
|
||||
it("preserves usage when truncating oversized model completion events", async () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
const attempt = {
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
provider: "codex",
|
||||
modelId: "gpt-5.4",
|
||||
model: { api: "responses" },
|
||||
} as never;
|
||||
const usage = {
|
||||
input: 384_954,
|
||||
output: 5_624,
|
||||
cacheRead: 333_824,
|
||||
reasoningTokens: 2_038,
|
||||
total: 724_402,
|
||||
};
|
||||
const recorder = createCodexTrajectoryRecorder({
|
||||
cwd: tmpDir,
|
||||
attempt,
|
||||
env: {},
|
||||
});
|
||||
|
||||
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
|
||||
recordCodexTrajectoryCompletion(trajectoryRecorder, {
|
||||
attempt,
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
timedOut: false,
|
||||
result: {
|
||||
aborted: false,
|
||||
attemptUsage: usage,
|
||||
assistantTexts: ["done"],
|
||||
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
|
||||
role: index % 2 === 0 ? "user" : "assistant",
|
||||
content: `message-${index} ${"x".repeat(32_000)}`,
|
||||
})),
|
||||
} as never,
|
||||
});
|
||||
await trajectoryRecorder.flush();
|
||||
|
||||
const parsed = JSON.parse(
|
||||
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
|
||||
);
|
||||
expect(parsed.type).toBe("model.completed");
|
||||
expect(parsed.data).toMatchObject({
|
||||
truncated: true,
|
||||
reason: "trajectory-event-size-limit",
|
||||
usage,
|
||||
});
|
||||
expect(parsed.data.messagesSnapshot).toBeUndefined();
|
||||
expect(parsed.data.droppedFields).toContain("messagesSnapshot");
|
||||
expect(Buffer.byteLength(JSON.stringify(parsed), "utf8")).toBeLessThanOrEqual(256 * 1024);
|
||||
});
|
||||
|
||||
it("drops oversized preserved fields when needed to keep completion events bounded", async () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
const attempt = {
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
provider: "codex",
|
||||
modelId: "gpt-5.4",
|
||||
model: { api: "responses" },
|
||||
} as never;
|
||||
const oversizedUsage = Object.fromEntries(
|
||||
Array.from({ length: 100 }, (_value, index) => [`field-${index}`, "x".repeat(5_000)]),
|
||||
);
|
||||
const recorder = createCodexTrajectoryRecorder({
|
||||
cwd: tmpDir,
|
||||
attempt,
|
||||
env: {},
|
||||
});
|
||||
|
||||
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
|
||||
recordCodexTrajectoryCompletion(trajectoryRecorder, {
|
||||
attempt,
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
timedOut: false,
|
||||
result: {
|
||||
aborted: false,
|
||||
attemptUsage: oversizedUsage,
|
||||
assistantTexts: ["x".repeat(32_000)],
|
||||
messagesSnapshot: [{ role: "assistant", content: "x".repeat(32_000) }],
|
||||
} as never,
|
||||
});
|
||||
await trajectoryRecorder.flush();
|
||||
|
||||
const parsed = JSON.parse(
|
||||
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
|
||||
);
|
||||
expect(parsed.data).toMatchObject({
|
||||
truncated: true,
|
||||
reason: "trajectory-event-size-limit",
|
||||
});
|
||||
expect(parsed.data.usage).toBeUndefined();
|
||||
expect(parsed.data.droppedFields).toEqual(
|
||||
expect.arrayContaining(["usage", "assistantTexts", "messagesSnapshot"]),
|
||||
);
|
||||
expect(Buffer.byteLength(JSON.stringify(parsed), "utf8")).toBeLessThanOrEqual(256 * 1024);
|
||||
});
|
||||
|
||||
it("preserves usage on non-final oversized model completion events", async () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
const attempt = {
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
provider: "codex",
|
||||
modelId: "gpt-5.4",
|
||||
model: { api: "responses" },
|
||||
} as never;
|
||||
const firstUsage = {
|
||||
input: 384_954,
|
||||
output: 5_624,
|
||||
cacheRead: 333_824,
|
||||
reasoningTokens: 2_038,
|
||||
total: 724_402,
|
||||
};
|
||||
const secondUsage = { input: 12, output: 3, total: 15 };
|
||||
const recorder = createCodexTrajectoryRecorder({
|
||||
cwd: tmpDir,
|
||||
attempt,
|
||||
env: {},
|
||||
});
|
||||
|
||||
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
|
||||
recordCodexTrajectoryCompletion(trajectoryRecorder, {
|
||||
attempt,
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
timedOut: false,
|
||||
result: {
|
||||
aborted: false,
|
||||
attemptUsage: firstUsage,
|
||||
assistantTexts: ["first"],
|
||||
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
|
||||
role: index % 2 === 0 ? "user" : "assistant",
|
||||
content: `message-${index} ${"x".repeat(32_000)}`,
|
||||
})),
|
||||
} as never,
|
||||
});
|
||||
recordCodexTrajectoryCompletion(trajectoryRecorder, {
|
||||
attempt,
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-2",
|
||||
timedOut: false,
|
||||
result: {
|
||||
aborted: false,
|
||||
attemptUsage: secondUsage,
|
||||
assistantTexts: ["final answer"],
|
||||
messagesSnapshot: [{ role: "assistant", content: "final answer" }],
|
||||
} as never,
|
||||
});
|
||||
await trajectoryRecorder.flush();
|
||||
|
||||
const events = fs
|
||||
.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8")
|
||||
.trim()
|
||||
.split(/\r?\n/u)
|
||||
.map((line) => JSON.parse(line));
|
||||
expect(events).toHaveLength(2);
|
||||
expect(events[0].data).toMatchObject({
|
||||
truncated: true,
|
||||
usage: firstUsage,
|
||||
});
|
||||
expect(events[1].data).toMatchObject({
|
||||
turnId: "turn-2",
|
||||
usage: secondUsage,
|
||||
assistantTexts: ["final answer"],
|
||||
});
|
||||
expect(events[1].data.truncated).toBeUndefined();
|
||||
});
|
||||
|
||||
it("redacts secrets before preserving usage in truncated completion events", async () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
const attempt = {
|
||||
sessionFile,
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
runId: "run-1",
|
||||
provider: "codex",
|
||||
modelId: "gpt-5.4",
|
||||
model: { api: "responses" },
|
||||
} as never;
|
||||
const recorder = createCodexTrajectoryRecorder({
|
||||
cwd: tmpDir,
|
||||
attempt,
|
||||
env: {},
|
||||
});
|
||||
|
||||
const trajectoryRecorder = expectTrajectoryRecorder(recorder);
|
||||
recordCodexTrajectoryCompletion(trajectoryRecorder, {
|
||||
attempt,
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
timedOut: false,
|
||||
result: {
|
||||
aborted: false,
|
||||
attemptUsage: {
|
||||
total: 1,
|
||||
apiKey: "sk-test-secret-token",
|
||||
authorization: "Bearer sk-other-secret-token",
|
||||
},
|
||||
assistantTexts: ["done"],
|
||||
messagesSnapshot: Array.from({ length: 20 }, (_value, index) => ({
|
||||
role: index % 2 === 0 ? "user" : "assistant",
|
||||
content: `message-${index} ${"x".repeat(32_000)}`,
|
||||
})),
|
||||
} as never,
|
||||
});
|
||||
await trajectoryRecorder.flush();
|
||||
|
||||
const parsed = JSON.parse(
|
||||
fs.readFileSync(path.join(tmpDir, "session.trajectory.jsonl"), "utf8"),
|
||||
);
|
||||
const preservedUsage = JSON.stringify(parsed.data.usage);
|
||||
expect(parsed.data.truncated).toBe(true);
|
||||
expect(preservedUsage).toContain("redacted");
|
||||
expect(preservedUsage).not.toContain("sk-test-secret-token");
|
||||
expect(preservedUsage).not.toContain("sk-other-secret-token");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -40,6 +40,7 @@ const JWT_VALUE_RE = /\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]
|
||||
const COOKIE_PAIR_RE = /\b([A-Za-z][A-Za-z0-9_.-]{1,64})=([A-Za-z0-9+/._~%=-]{16,})(?=;|\s|$)/gu;
|
||||
const TRAJECTORY_RUNTIME_FILE_MAX_BYTES = 50 * 1024 * 1024;
|
||||
const TRAJECTORY_RUNTIME_EVENT_MAX_BYTES = 256 * 1024;
|
||||
const TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS = ["usage", "promptCache"] as const;
|
||||
|
||||
type CodexTrajectoryOpenFlagConstants = Pick<
|
||||
typeof nodeFs.constants,
|
||||
@@ -82,19 +83,57 @@ function boundedTrajectoryLine(event: Record<string, unknown>): string | undefin
|
||||
if (bytes <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
|
||||
return `${line}\n`;
|
||||
}
|
||||
const truncated = JSON.stringify({
|
||||
...event,
|
||||
data: {
|
||||
truncated: true,
|
||||
originalBytes: bytes,
|
||||
limitBytes: TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
|
||||
reason: "trajectory-event-size-limit",
|
||||
},
|
||||
});
|
||||
if (Buffer.byteLength(truncated, "utf8") <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
|
||||
return `${truncated}\n`;
|
||||
|
||||
const originalData =
|
||||
event.data && typeof event.data === "object" && !Array.isArray(event.data)
|
||||
? (event.data as Record<string, unknown>)
|
||||
: {};
|
||||
const originalDataKeys = Object.keys(originalData);
|
||||
const preservedDataKeys = new Set<string>();
|
||||
const baseData = {
|
||||
truncated: true,
|
||||
originalBytes: bytes,
|
||||
limitBytes: TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
|
||||
reason: "trajectory-event-size-limit",
|
||||
};
|
||||
const buildTruncatedLine = (includeDroppedFields: boolean): string | undefined => {
|
||||
const data: Record<string, unknown> = { ...baseData };
|
||||
for (const key of TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS) {
|
||||
if (preservedDataKeys.has(key)) {
|
||||
data[key] = originalData[key];
|
||||
}
|
||||
}
|
||||
if (includeDroppedFields) {
|
||||
const droppedFields = originalDataKeys.filter((key) => !preservedDataKeys.has(key));
|
||||
if (droppedFields.length > 0) {
|
||||
data.droppedFields = droppedFields;
|
||||
}
|
||||
}
|
||||
const truncated = JSON.stringify({ ...event, data });
|
||||
if (Buffer.byteLength(truncated, "utf8") <= TRAJECTORY_RUNTIME_EVENT_MAX_BYTES) {
|
||||
return `${truncated}\n`;
|
||||
}
|
||||
return undefined;
|
||||
};
|
||||
|
||||
let best = buildTruncatedLine(true) ?? buildTruncatedLine(false);
|
||||
if (!best) {
|
||||
return undefined;
|
||||
}
|
||||
return undefined;
|
||||
|
||||
for (const key of TRAJECTORY_RUNTIME_OVERSIZE_PRESERVED_DATA_KEYS) {
|
||||
if (!Object.hasOwn(originalData, key)) {
|
||||
continue;
|
||||
}
|
||||
preservedDataKeys.add(key);
|
||||
const next = buildTruncatedLine(true) ?? buildTruncatedLine(false);
|
||||
if (next) {
|
||||
best = next;
|
||||
continue;
|
||||
}
|
||||
preservedDataKeys.delete(key);
|
||||
}
|
||||
return best;
|
||||
}
|
||||
|
||||
function resolveTrajectoryPointerFilePath(sessionFile: string): string {
|
||||
|
||||
@@ -23,7 +23,7 @@ export type CodexPluginConfigEntry = {
|
||||
enabled?: boolean;
|
||||
marketplaceName?: string;
|
||||
pluginName?: string;
|
||||
allow_destructive_actions?: boolean | "auto";
|
||||
allow_destructive_actions?: boolean | "auto" | "always";
|
||||
};
|
||||
|
||||
export type CodexPluginsConfigBlock = {
|
||||
|
||||
@@ -43,7 +43,7 @@ export type CodexPluginMigrationConfigEntry = {
|
||||
configKey: string;
|
||||
pluginName: string;
|
||||
enabled: boolean;
|
||||
allowDestructiveActions?: "auto";
|
||||
allowDestructiveActions?: "auto" | "always";
|
||||
};
|
||||
|
||||
type CodexPluginMigrationBlockSkipDetails = {
|
||||
@@ -168,15 +168,18 @@ function isLegacyDestructivePolicyRepair(
|
||||
);
|
||||
}
|
||||
|
||||
function isLegacyDestructivePolicyConfigEntryRepair(
|
||||
function readExistingPluginAllowDestructiveActions(
|
||||
existing: unknown,
|
||||
pluginName: string,
|
||||
): boolean {
|
||||
): "auto" | "always" | undefined {
|
||||
const existingEntry = isRecord(existing) ? existing : undefined;
|
||||
return (
|
||||
existingEntry?.allow_destructive_actions === "on-request" &&
|
||||
existingEntry.pluginName === pluginName
|
||||
if (existingEntry?.pluginName !== pluginName) {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = normalizeExistingAllowDestructiveActions(
|
||||
existingEntry.allow_destructive_actions,
|
||||
);
|
||||
return normalized === "auto" || normalized === "always" ? normalized : undefined;
|
||||
}
|
||||
|
||||
function buildPluginItems(
|
||||
@@ -203,12 +206,15 @@ function buildPluginItems(
|
||||
enabled: true,
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: plugin.pluginName,
|
||||
...(isLegacyDestructivePolicyConfigEntryRepair(
|
||||
existingPluginEntries[configKey],
|
||||
plugin.pluginName,
|
||||
)
|
||||
? { allow_destructive_actions: "auto" }
|
||||
: {}),
|
||||
...(() => {
|
||||
const allowDestructiveActions = readExistingPluginAllowDestructiveActions(
|
||||
existingPluginEntries[configKey],
|
||||
plugin.pluginName,
|
||||
);
|
||||
return allowDestructiveActions
|
||||
? { allow_destructive_actions: allowDestructiveActions }
|
||||
: {};
|
||||
})(),
|
||||
};
|
||||
const conflict =
|
||||
!ctx.overwrite &&
|
||||
@@ -234,8 +240,9 @@ function buildPluginItems(
|
||||
pluginName: plugin.pluginName,
|
||||
sourceInstalled: plugin.installed === true,
|
||||
sourceEnabled: plugin.enabled === true,
|
||||
...(plannedEntry.allow_destructive_actions === "auto"
|
||||
? { allowDestructiveActions: "auto" }
|
||||
...(plannedEntry.allow_destructive_actions === "auto" ||
|
||||
plannedEntry.allow_destructive_actions === "always"
|
||||
? { allowDestructiveActions: plannedEntry.allow_destructive_actions }
|
||||
: {}),
|
||||
...(plugin.apps && plugin.apps.length > 0 && !shouldVerifyPluginApps(ctx)
|
||||
? { sourceAppVerification: CODEX_PLUGIN_SOURCE_APP_VERIFICATION_UNVERIFIED }
|
||||
@@ -310,13 +317,15 @@ export function readCodexPluginMigrationConfigEntry(
|
||||
configKey,
|
||||
pluginName,
|
||||
enabled,
|
||||
...(allowDestructiveActions === "auto" ? { allowDestructiveActions: "auto" } : {}),
|
||||
...(allowDestructiveActions === "auto" || allowDestructiveActions === "always"
|
||||
? { allowDestructiveActions }
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
|
||||
function readExistingAllowDestructiveActions(
|
||||
config: MigrationProviderContext["config"],
|
||||
): boolean | "auto" | undefined {
|
||||
): boolean | "auto" | "always" | undefined {
|
||||
const value = readMigrationConfigPath(config as Record<string, unknown>, [
|
||||
...CODEX_PLUGIN_NATIVE_CONFIG_PATH,
|
||||
"allow_destructive_actions",
|
||||
@@ -324,8 +333,16 @@ function readExistingAllowDestructiveActions(
|
||||
return normalizeExistingAllowDestructiveActions(value);
|
||||
}
|
||||
|
||||
function normalizeExistingAllowDestructiveActions(value: unknown): boolean | "auto" | undefined {
|
||||
return value === "auto" || value === "on-request" ? "auto" : asBoolean(value);
|
||||
function normalizeExistingAllowDestructiveActions(
|
||||
value: unknown,
|
||||
): boolean | "auto" | "always" | undefined {
|
||||
if (value === "auto" || value === "on-request") {
|
||||
return "auto";
|
||||
}
|
||||
if (value === "always") {
|
||||
return "always";
|
||||
}
|
||||
return asBoolean(value);
|
||||
}
|
||||
|
||||
function readExistingPluginPolicyRepairs(
|
||||
|
||||
@@ -2108,6 +2108,76 @@ describe("buildCodexMigrationProvider", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves global always destructive plugin policy during migration", async () => {
|
||||
const fixture = await createCodexFixture();
|
||||
const configState: MigrationProviderContext["config"] = {
|
||||
plugins: {
|
||||
entries: {
|
||||
codex: {
|
||||
enabled: true,
|
||||
config: {
|
||||
codexPlugins: {
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
agents: { defaults: { workspace: fixture.workspaceDir } },
|
||||
} as MigrationProviderContext["config"];
|
||||
appServerRequest.mockImplementation(async ({ method }: { method: string }) => {
|
||||
if (method === "plugin/list") {
|
||||
return pluginList([pluginSummary("google-calendar", { installed: true, enabled: true })]);
|
||||
}
|
||||
if (method === "plugin/read") {
|
||||
return pluginRead("google-calendar");
|
||||
}
|
||||
if (method === "plugin/install") {
|
||||
return { authPolicy: "ON_USE", appsNeedingAuth: [] } satisfies v2.PluginInstallResponse;
|
||||
}
|
||||
if (method === "skills/list") {
|
||||
return { data: [] } satisfies v2.SkillsListResponse;
|
||||
}
|
||||
if (method === "hooks/list") {
|
||||
return { data: [] } satisfies v2.HooksListResponse;
|
||||
}
|
||||
if (method === "config/mcpServer/reload") {
|
||||
return {};
|
||||
}
|
||||
if (method === "app/list") {
|
||||
return appsList([]);
|
||||
}
|
||||
throw new Error(`unexpected request ${method}`);
|
||||
});
|
||||
const provider = buildCodexMigrationProvider({
|
||||
runtime: createConfigRuntime(configState),
|
||||
});
|
||||
|
||||
const result = await provider.apply(
|
||||
makeContext({
|
||||
source: fixture.codexHome,
|
||||
stateDir: fixture.stateDir,
|
||||
workspaceDir: fixture.workspaceDir,
|
||||
config: configState,
|
||||
}),
|
||||
);
|
||||
|
||||
expectRecordFields(findItem(result.items, "config:codex-plugins"), { status: "migrated" });
|
||||
expect(configState.plugins?.entries?.codex?.config?.codexPlugins).toEqual({
|
||||
enabled: true,
|
||||
allow_destructive_actions: "always",
|
||||
plugins: {
|
||||
"google-calendar": {
|
||||
enabled: true,
|
||||
marketplaceName: CODEX_PLUGINS_MARKETPLACE_NAME,
|
||||
pluginName: "google-calendar",
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("records auth-required plugin installs as disabled explicit config entries", async () => {
|
||||
const fixture = await createCodexFixture();
|
||||
const configState: MigrationProviderContext["config"] = {
|
||||
|
||||
@@ -207,4 +207,65 @@ describe("codex cli node sessions", () => {
|
||||
}),
|
||||
).rejects.toThrow("Codex CLI node command returned malformed payloadJSON.");
|
||||
});
|
||||
|
||||
it("keeps Codex history session previews on UTF-16 code point boundaries", async () => {
|
||||
const sessionId = "019e2007-1f7e-7eb1-a42b-8c01f4b9b5ce";
|
||||
const text = `${"a".repeat(136)}🤖tail`;
|
||||
await fs.writeFile(
|
||||
path.join(tempDir, "history.jsonl"),
|
||||
JSON.stringify({ session_id: sessionId, ts: 1778678322, text }),
|
||||
);
|
||||
|
||||
const command = createCodexCliSessionNodeHostCommands().find(
|
||||
(entry) => entry.command === CODEX_CLI_SESSIONS_LIST_COMMAND,
|
||||
);
|
||||
const raw = await command?.handle(JSON.stringify({ filter: "", limit: 5 }));
|
||||
const parsed = JSON.parse(raw ?? "{}") as {
|
||||
sessions?: Array<{ lastMessage?: string }>;
|
||||
};
|
||||
|
||||
expect(parsed.sessions?.[0]?.lastMessage).toBe(`${"a".repeat(136)}...`);
|
||||
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\ud83e");
|
||||
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\udd16");
|
||||
});
|
||||
|
||||
it("keeps Codex session-file previews on UTF-16 code point boundaries", async () => {
|
||||
const sessionId = "019e23d1-f33d-78e3-959e-0f56f30a5248";
|
||||
const sessionDir = path.join(tempDir, "sessions", "2026", "05", "14");
|
||||
const sessionFile = path.join(sessionDir, `rollout-2026-05-14T00-10-22-${sessionId}.jsonl`);
|
||||
const text = `${"b".repeat(136)}🤖tail`;
|
||||
|
||||
await fs.mkdir(sessionDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
sessionFile,
|
||||
[
|
||||
JSON.stringify({
|
||||
timestamp: "2026-05-14T00:10:23.618Z",
|
||||
type: "session_meta",
|
||||
payload: { id: sessionId, cwd: "/tmp/codex-work" },
|
||||
}),
|
||||
JSON.stringify({
|
||||
timestamp: "2026-05-14T00:10:23.619Z",
|
||||
type: "response_item",
|
||||
payload: {
|
||||
type: "message",
|
||||
role: "user",
|
||||
content: [{ type: "input_text", text }],
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
);
|
||||
|
||||
const command = createCodexCliSessionNodeHostCommands().find(
|
||||
(entry) => entry.command === CODEX_CLI_SESSIONS_LIST_COMMAND,
|
||||
);
|
||||
const raw = await command?.handle(JSON.stringify({ filter: "", limit: 5 }));
|
||||
const parsed = JSON.parse(raw ?? "{}") as {
|
||||
sessions?: Array<{ lastMessage?: string }>;
|
||||
};
|
||||
|
||||
expect(parsed.sessions?.[0]?.lastMessage).toBe(`${"b".repeat(136)}...`);
|
||||
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\ud83e");
|
||||
expect(parsed.sessions?.[0]?.lastMessage).not.toContain("\udd16");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,6 +12,7 @@ import type {
|
||||
import type { PluginRuntime } from "openclaw/plugin-sdk/plugin-runtime";
|
||||
import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import {
|
||||
materializeWindowsSpawnProgram,
|
||||
resolveWindowsSpawnProgram,
|
||||
@@ -691,7 +692,10 @@ function normalizeTimeoutMs(value: unknown): number {
|
||||
}
|
||||
|
||||
function truncateText(value: string, max: number): string {
|
||||
return value.length > max ? `${value.slice(0, max - 3)}...` : value;
|
||||
if (value.length <= max) {
|
||||
return value;
|
||||
}
|
||||
return `${truncateUtf16Safe(value, Math.max(0, max - 3))}...`;
|
||||
}
|
||||
|
||||
function compareOptionalStringsDesc(a?: string, b?: string): number {
|
||||
|
||||
@@ -36,6 +36,10 @@ type DuckDuckGoResult = {
|
||||
snippet: string;
|
||||
};
|
||||
|
||||
function isDecodableCodePoint(cp: number): boolean {
|
||||
return Number.isInteger(cp) && cp >= 0 && cp <= 0x10ffff && (cp < 0xd800 || cp > 0xdfff);
|
||||
}
|
||||
|
||||
function decodeHtmlEntities(text: string): string {
|
||||
return text.replace(
|
||||
/&(?:lt|gt|quot|apos|#39|#x27|#x2F|nbsp|ndash|mdash|hellip|amp|#\d+|#x[0-9a-f]+);/gi,
|
||||
@@ -72,10 +76,12 @@ function decodeHtmlEntities(text: string): string {
|
||||
return "&";
|
||||
}
|
||||
if (normalized.startsWith("&#x")) {
|
||||
return String.fromCodePoint(Number.parseInt(normalized.slice(3, -1), 16));
|
||||
const codePoint = Number.parseInt(normalized.slice(3, -1), 16);
|
||||
return isDecodableCodePoint(codePoint) ? String.fromCodePoint(codePoint) : entity;
|
||||
}
|
||||
if (normalized.startsWith("&#")) {
|
||||
return String.fromCodePoint(Number.parseInt(normalized.slice(2, -1), 10));
|
||||
const codePoint = Number.parseInt(normalized.slice(2, -1), 10);
|
||||
return isDecodableCodePoint(codePoint) ? String.fromCodePoint(codePoint) : entity;
|
||||
}
|
||||
return entity;
|
||||
},
|
||||
|
||||
@@ -205,6 +205,20 @@ describe("duckduckgo web search provider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("leaves out-of-range numeric html entities intact instead of throwing", () => {
|
||||
expect(() => ddgClientTesting.decodeHtmlEntities("Result � end")).not.toThrow();
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Result � end")).toBe(
|
||||
"Result � end",
|
||||
);
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Hex � tail")).toBe("Hex � tail");
|
||||
// Surrogate-range entities would decode to lone UTF-16 surrogates; keep them intact.
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Bad � end")).toBe("Bad � end");
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Bad � end")).toBe("Bad � end");
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Bad � end")).toBe("Bad � end");
|
||||
// A valid supplementary-plane entity still decodes.
|
||||
expect(ddgClientTesting.decodeHtmlEntities("Smile 😀")).toBe("Smile 😀");
|
||||
});
|
||||
|
||||
it("does not double-decode escaped entities (decodes & last)", () => {
|
||||
// A result whose text literally shows "<" arrives double-encoded as
|
||||
// "&lt;". Decoding & first would re-decode it into "<", corrupting
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
import {
|
||||
assertOkOrThrowProviderError,
|
||||
postJsonRequest,
|
||||
readProviderJsonResponse,
|
||||
type ProviderRequestTransportOverrides,
|
||||
} from "openclaw/plugin-sdk/provider-http";
|
||||
import {
|
||||
@@ -97,11 +98,11 @@ async function generateGeminiInlineDataText(params: {
|
||||
try {
|
||||
await assertOkOrThrowProviderError(res, params.httpErrorLabel);
|
||||
|
||||
const payload = (await res.json()) as {
|
||||
const payload = await readProviderJsonResponse<{
|
||||
candidates?: Array<{
|
||||
content?: { parts?: Array<{ text?: string }> };
|
||||
}>;
|
||||
};
|
||||
}>(res, params.httpErrorLabel);
|
||||
const parts = payload.candidates?.[0]?.content?.parts ?? [];
|
||||
const text = parts
|
||||
.map((part) => part?.text?.trim())
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Google tests cover media understanding provider.video plugin behavior.
|
||||
import { createServer, type Server } from "node:http";
|
||||
import {
|
||||
createRequestCaptureJsonFetch,
|
||||
installPinnedHostnameTestHooks,
|
||||
@@ -10,6 +11,49 @@ import { resolveGoogleGenerativeAiHttpRequestConfig } from "./runtime-api.js";
|
||||
|
||||
installPinnedHostnameTestHooks();
|
||||
|
||||
const LOOPBACK_RESPONSE_BYTES = 18 * 1024 * 1024;
|
||||
|
||||
async function listenLoopbackServer(server: Server): Promise<number> {
|
||||
return await new Promise((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
server.listen(0, "127.0.0.1", () => {
|
||||
server.off("error", reject);
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
reject(new Error("expected loopback TCP address"));
|
||||
return;
|
||||
}
|
||||
resolve(address.port);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function createOversizedJsonServer(): { server: Server; closed: Promise<number> } {
|
||||
let resolveClosed: (sentBytes: number) => void = () => {};
|
||||
const closed = new Promise<number>((resolve) => {
|
||||
resolveClosed = resolve;
|
||||
});
|
||||
const server = createServer((_req, res) => {
|
||||
let sentBytes = 0;
|
||||
const chunk = Buffer.alloc(64 * 1024, 0x20);
|
||||
res.writeHead(200, { "content-type": "application/json" });
|
||||
const timer = setInterval(() => {
|
||||
if (sentBytes >= LOOPBACK_RESPONSE_BYTES) {
|
||||
clearInterval(timer);
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
sentBytes += chunk.length;
|
||||
res.write(chunk);
|
||||
}, 1);
|
||||
res.on("close", () => {
|
||||
clearInterval(timer);
|
||||
resolveClosed(sentBytes);
|
||||
});
|
||||
});
|
||||
return { server, closed };
|
||||
}
|
||||
|
||||
describe("describeGeminiVideo", () => {
|
||||
it("respects case-insensitive x-goog-api-key overrides", async () => {
|
||||
let seenKey: string | null = null;
|
||||
@@ -114,6 +158,29 @@ describe("describeGeminiVideo", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("bounds oversized video JSON responses and closes the stream early", async () => {
|
||||
const { server, closed } = createOversizedJsonServer();
|
||||
const port = await listenLoopbackServer(server);
|
||||
const fetchFn = withFetchPreconnect(async () =>
|
||||
fetch(`http://127.0.0.1:${port}/google-video-json`),
|
||||
);
|
||||
|
||||
try {
|
||||
await expect(
|
||||
describeGeminiVideo({
|
||||
buffer: Buffer.from("video-bytes"),
|
||||
fileName: "clip.mp4",
|
||||
apiKey: "test-key",
|
||||
timeoutMs: 1500,
|
||||
fetchFn,
|
||||
}),
|
||||
).rejects.toThrow(/JSON response exceeds 16777216 bytes/u);
|
||||
await expect(closed).resolves.toBeLessThan(LOOPBACK_RESPONSE_BYTES);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects non-Google video base URLs before sending authenticated requests", async () => {
|
||||
await expect(
|
||||
describeGeminiVideo({
|
||||
|
||||
@@ -20,6 +20,8 @@ const {
|
||||
let buildGoogleSpeechProvider: typeof import("./speech-provider.js").buildGoogleSpeechProvider;
|
||||
let testing: typeof import("./speech-provider.js").testing;
|
||||
|
||||
const GOOGLE_TTS_JSON_CAP_BYTES = 16 * 1024 * 1024;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ buildGoogleSpeechProvider, testing } = await import("./speech-provider.js"));
|
||||
});
|
||||
@@ -56,6 +58,26 @@ function installGoogleTtsRequestMock(pcm = Buffer.from([1, 0, 2, 0])) {
|
||||
return postJsonRequestMock;
|
||||
}
|
||||
|
||||
function oversizedGoogleTtsJsonResponse(onCancel: () => void): Response {
|
||||
const response = new Response(
|
||||
new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(new Uint8Array(GOOGLE_TTS_JSON_CAP_BYTES + 1));
|
||||
},
|
||||
cancel() {
|
||||
onCancel();
|
||||
},
|
||||
}),
|
||||
{ headers: { "content-type": "application/json" }, status: 200 },
|
||||
);
|
||||
Object.defineProperty(response, "json", {
|
||||
value: async () => {
|
||||
throw new Error("unbounded json reader was used");
|
||||
},
|
||||
});
|
||||
return response;
|
||||
}
|
||||
|
||||
function expectRecordFields(value: unknown, expected: Record<string, unknown>) {
|
||||
if (!value || typeof value !== "object") {
|
||||
throw new Error("Expected record");
|
||||
@@ -149,6 +171,39 @@ describe("Google speech provider", () => {
|
||||
expect(transcodeAudioBufferToOpusMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("bounds oversized Gemini TTS success JSON responses and cancels the stream", async () => {
|
||||
let cancelCount = 0;
|
||||
const release = vi.fn(async () => {});
|
||||
postJsonRequestMock
|
||||
.mockResolvedValueOnce({
|
||||
response: oversizedGoogleTtsJsonResponse(() => {
|
||||
cancelCount += 1;
|
||||
}),
|
||||
release,
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
response: oversizedGoogleTtsJsonResponse(() => {
|
||||
cancelCount += 1;
|
||||
}),
|
||||
release,
|
||||
});
|
||||
const provider = buildGoogleSpeechProvider();
|
||||
|
||||
await expect(
|
||||
provider.synthesize({
|
||||
text: "oversized tts response",
|
||||
cfg: {},
|
||||
providerConfig: {
|
||||
apiKey: "google-test-key",
|
||||
},
|
||||
target: "audio-file",
|
||||
timeoutMs: 12_000,
|
||||
}),
|
||||
).rejects.toThrow("Google TTS response: JSON response exceeds 16777216 bytes");
|
||||
expect(cancelCount).toBe(2);
|
||||
expect(release).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("transcodes Gemini PCM to Opus for voice-note targets", async () => {
|
||||
installGoogleTtsRequestMock(Buffer.from([5, 0, 6, 0]));
|
||||
transcodeAudioBufferToOpusMock.mockResolvedValueOnce(Buffer.from("google-opus"));
|
||||
|
||||
@@ -3,6 +3,7 @@ import { transcodeAudioBufferToOpus } from "openclaw/plugin-sdk/media-runtime";
|
||||
import {
|
||||
assertOkOrThrowProviderError,
|
||||
postJsonRequest,
|
||||
readProviderJsonResponse,
|
||||
sanitizeConfiguredModelProviderRequest,
|
||||
} from "openclaw/plugin-sdk/provider-http";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/provider-onboard";
|
||||
@@ -503,7 +504,11 @@ async function synthesizeGoogleTtsPcmOnce(params: {
|
||||
}
|
||||
}
|
||||
try {
|
||||
return extractGoogleSpeechPcm((await res.json()) as GoogleGenerateSpeechResponse);
|
||||
const payload = await readProviderJsonResponse<GoogleGenerateSpeechResponse>(
|
||||
res,
|
||||
"Google TTS response",
|
||||
);
|
||||
return extractGoogleSpeechPcm(payload);
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
throw new GoogleTtsRetryableError(message);
|
||||
|
||||
@@ -476,6 +476,45 @@ describe("google transport stream", () => {
|
||||
expect(result.content[2]).toHaveProperty("thoughtSignature", "Y2FsbF9zaWdfMQ==");
|
||||
});
|
||||
|
||||
it("preserves MAX_TOKENS when the partial response contains a function call", async () => {
|
||||
guardedFetchMock.mockResolvedValueOnce(
|
||||
buildSseResponse([
|
||||
{
|
||||
candidates: [
|
||||
{
|
||||
content: {
|
||||
parts: [{ functionCall: { name: "lookup", args: { q: "hello" } } }],
|
||||
},
|
||||
finishReason: "MAX_TOKENS",
|
||||
},
|
||||
],
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
const streamFn = createGoogleGenerativeAiTransportStreamFn();
|
||||
const stream = await Promise.resolve(
|
||||
streamFn(
|
||||
buildGeminiModel(),
|
||||
{
|
||||
messages: [{ role: "user", content: "hello", timestamp: 0 }],
|
||||
tools: [
|
||||
{
|
||||
name: "lookup",
|
||||
description: "Look up a value",
|
||||
parameters: { type: "object" },
|
||||
},
|
||||
],
|
||||
} as Parameters<typeof streamFn>[1],
|
||||
{ apiKey: "gemini-api-key" } as Parameters<typeof streamFn>[2],
|
||||
),
|
||||
);
|
||||
const result = await stream.result();
|
||||
|
||||
expect(result.stopReason).toBe("length");
|
||||
expect(result.content).toEqual([expect.objectContaining({ type: "toolCall", name: "lookup" })]);
|
||||
});
|
||||
|
||||
it("strips redundant google provider prefixes from Gemini API model paths", async () => {
|
||||
guardedFetchMock.mockResolvedValueOnce(buildSseResponse([]));
|
||||
|
||||
|
||||
@@ -1404,7 +1404,12 @@ function createGoogleTransportStreamFn(kind: CanonicalGoogleTransportApi): Strea
|
||||
}
|
||||
if (typeof candidate?.finishReason === "string") {
|
||||
output.stopReason = mapStopReasonString(candidate.finishReason);
|
||||
if (output.content.some((block) => block.type === "toolCall")) {
|
||||
// MAX_TOKENS can leave a complete-looking partial call. Only a normal
|
||||
// Google stop may promote parsed calls into an executable tool-use turn.
|
||||
if (
|
||||
output.stopReason === "stop" &&
|
||||
output.content.some((block) => block.type === "toolCall")
|
||||
) {
|
||||
output.stopReason = "toolUse";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,6 +110,45 @@ function createDeferred<T>(): {
|
||||
return { promise, reject, resolve };
|
||||
}
|
||||
|
||||
type CardPayloadWithTextWidgets = {
|
||||
cardsV2: Array<{
|
||||
card: {
|
||||
sections?: Array<{
|
||||
header?: string;
|
||||
widgets?: Array<{ textParagraph?: { text: string } }>;
|
||||
}>;
|
||||
};
|
||||
}>;
|
||||
};
|
||||
|
||||
function getTextParagraphText(payload: unknown, header: string): string {
|
||||
const text = (payload as CardPayloadWithTextWidgets).cardsV2[0]?.card.sections?.find(
|
||||
(section) => section.header === header,
|
||||
)?.widgets?.[0]?.textParagraph?.text;
|
||||
if (typeof text !== "string") {
|
||||
throw new Error(`Expected ${header} text paragraph`);
|
||||
}
|
||||
return text;
|
||||
}
|
||||
|
||||
function isUtf16WellFormed(value: string): boolean {
|
||||
for (let index = 0; index < value.length; index += 1) {
|
||||
const codeUnit = value.charCodeAt(index);
|
||||
if (codeUnit >= 0xd800 && codeUnit <= 0xdbff) {
|
||||
const nextCodeUnit = index + 1 < value.length ? value.charCodeAt(index + 1) : -1;
|
||||
if (nextCodeUnit < 0xdc00 || nextCodeUnit > 0xdfff) {
|
||||
return false;
|
||||
}
|
||||
index += 1;
|
||||
continue;
|
||||
}
|
||||
if (codeUnit >= 0xdc00 && codeUnit <= 0xdfff) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
describe("googleChatApprovalNativeRuntime", () => {
|
||||
async function preparePendingDelivery(view = createPendingView()) {
|
||||
const nowMs = Date.now();
|
||||
@@ -149,6 +188,31 @@ describe("googleChatApprovalNativeRuntime", () => {
|
||||
return { pendingPayload, plannedTarget, prepared, request, view };
|
||||
}
|
||||
|
||||
it("keeps truncated pending command card text UTF-16 well formed", async () => {
|
||||
const view = createPendingView();
|
||||
view.commandText = `${"a".repeat(1796)}😀${"b".repeat(100)}`;
|
||||
|
||||
const { pendingPayload } = await preparePendingDelivery(view);
|
||||
const commandText = getTextParagraphText(pendingPayload, "Command");
|
||||
|
||||
expect(commandText.length).toBeLessThanOrEqual(1800);
|
||||
expect(commandText.endsWith("...")).toBe(true);
|
||||
expect(isUtf16WellFormed(commandText)).toBe(true);
|
||||
expect(JSON.stringify(pendingPayload.cardsV2)).not.toContain("\\ud83d");
|
||||
});
|
||||
|
||||
it("preserves a complete astral character when it fits before the truncation suffix", async () => {
|
||||
const view = createPendingView();
|
||||
view.commandText = `${"a".repeat(1795)}😀${"b".repeat(100)}`;
|
||||
|
||||
const { pendingPayload } = await preparePendingDelivery(view);
|
||||
const commandText = getTextParagraphText(pendingPayload, "Command");
|
||||
|
||||
expect(commandText).toBe(`${"a".repeat(1795)}😀...`);
|
||||
expect(commandText.length).toBe(1800);
|
||||
expect(isUtf16WellFormed(commandText)).toBe(true);
|
||||
});
|
||||
|
||||
it("sends pending cards and updates the delivered message without buttons", async () => {
|
||||
sendGoogleChatMessage.mockResolvedValue({ messageName: "spaces/AAA/messages/msg-1" });
|
||||
updateGoogleChatMessage.mockResolvedValue({ messageName: "spaces/AAA/messages/msg-1" });
|
||||
|
||||
@@ -9,6 +9,7 @@ import { buildChannelApprovalNativeTargetKey } from "openclaw/plugin-sdk/approva
|
||||
import type { ExecApprovalDecision } from "openclaw/plugin-sdk/approval-runtime";
|
||||
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import { resolveGoogleChatAccount, type ResolvedGoogleChatAccount } from "./accounts.js";
|
||||
import { sendGoogleChatMessage, updateGoogleChatMessage } from "./api.js";
|
||||
import {
|
||||
@@ -87,7 +88,7 @@ function escapeGoogleChatText(text: string): string {
|
||||
}
|
||||
|
||||
function truncateText(text: string, maxChars = MAX_TEXT_PARAGRAPH_CHARS): string {
|
||||
return text.length <= maxChars ? text : `${text.slice(0, maxChars - 3)}...`;
|
||||
return text.length <= maxChars ? text : `${truncateUtf16Safe(text, maxChars - 3)}...`;
|
||||
}
|
||||
|
||||
function buildMetadataText(metadata: readonly { label: string; value: string }[]): string {
|
||||
|
||||
@@ -15,3 +15,21 @@ describe("irc outbound chunking", () => {
|
||||
expect(ircOutboundBaseAdapter.textChunkLimit).toBe(350);
|
||||
});
|
||||
});
|
||||
|
||||
describe("irc outbound sanitizeText", () => {
|
||||
afterEach(() => {
|
||||
clearIrcRuntime();
|
||||
});
|
||||
|
||||
it("strips internal tool-trace banners before outbound delivery", () => {
|
||||
const text = "Done.\n⚠️ 🛠️ `search repos (agent)` failed";
|
||||
|
||||
expect(ircOutboundBaseAdapter.sanitizeText({ text })).toBe("Done.");
|
||||
});
|
||||
|
||||
it("preserves ordinary assistant prose while sanitizing", () => {
|
||||
const text = "The pipeline has 3 open deals.";
|
||||
|
||||
expect(ircOutboundBaseAdapter.sanitizeText({ text })).toBe(text);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Irc plugin module implements outbound base behavior.
|
||||
import { sanitizeForPlainText } from "openclaw/plugin-sdk/channel-outbound";
|
||||
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-chunking";
|
||||
import { chunkTextForOutbound } from "./channel-api.js";
|
||||
|
||||
export const ircOutboundBaseAdapter = {
|
||||
@@ -7,5 +8,9 @@ export const ircOutboundBaseAdapter = {
|
||||
chunker: chunkTextForOutbound,
|
||||
chunkerMode: "markdown" as const,
|
||||
textChunkLimit: 350,
|
||||
sanitizeText: ({ text }: { text: string }) => sanitizeForPlainText(text),
|
||||
// IRC's plain-text pass does not remove assistant scaffolding. Run the
|
||||
// canonical delivery sanitizer first so internal tool traces are dropped
|
||||
// before channel formatting.
|
||||
sanitizeText: ({ text }: { text: string }) =>
|
||||
sanitizeForPlainText(sanitizeAssistantVisibleText(text)),
|
||||
};
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Msteams plugin module implements feedback reflection prompt behavior.
|
||||
import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
|
||||
/** Max chars of the thumbed-down response to include in the reflection prompt. */
|
||||
const MAX_RESPONSE_CHARS = 500;
|
||||
@@ -19,7 +20,7 @@ export function buildReflectionPrompt(params: {
|
||||
if (params.thumbedDownResponse) {
|
||||
const truncated =
|
||||
params.thumbedDownResponse.length > MAX_RESPONSE_CHARS
|
||||
? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...`
|
||||
? `${truncateUtf16Safe(params.thumbedDownResponse, MAX_RESPONSE_CHARS)}...`
|
||||
: params.thumbedDownResponse;
|
||||
parts.push(`\nYour response was:\n> ${truncated}`);
|
||||
}
|
||||
|
||||
@@ -19,6 +19,11 @@ import { msteamsRuntimeStub } from "./test-support/runtime.js";
|
||||
|
||||
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
// Matches an unpaired UTF-16 surrogate (lone high or lone low), without relying
|
||||
// on the ES2024 String.prototype.isWellFormed() runtime API.
|
||||
const UNPAIRED_SURROGATE_RE =
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
|
||||
|
||||
describe("buildFeedbackEvent", () => {
|
||||
it("builds a well-formed custom event", () => {
|
||||
const event = buildFeedbackEvent({
|
||||
@@ -73,6 +78,26 @@ describe("buildReflectionPrompt", () => {
|
||||
expect(prompt.length).toBeLessThan(longResponse.length + 500);
|
||||
});
|
||||
|
||||
it("does not split UTF-16 surrogate pairs when truncating a thumbed-down response", () => {
|
||||
const thumbedDownResponse = `${"a".repeat(499)}🦞${"b".repeat(20)}`;
|
||||
|
||||
const prompt = buildReflectionPrompt({ thumbedDownResponse });
|
||||
|
||||
expect(prompt).not.toMatch(UNPAIRED_SURROGATE_RE);
|
||||
expect(prompt).toContain(`${"a".repeat(499)}...`);
|
||||
expect(prompt).not.toContain("\ud83e");
|
||||
expect(prompt).not.toContain("\udd9e");
|
||||
});
|
||||
|
||||
it("keeps a boundary emoji when it fully fits before the truncation cap", () => {
|
||||
const thumbedDownResponse = `${"a".repeat(498)}🦞${"b".repeat(20)}`;
|
||||
|
||||
const prompt = buildReflectionPrompt({ thumbedDownResponse });
|
||||
|
||||
expect(prompt).not.toMatch(UNPAIRED_SURROGATE_RE);
|
||||
expect(prompt).toContain(`${"a".repeat(498)}🦞...`);
|
||||
});
|
||||
|
||||
it("includes user comment when provided", () => {
|
||||
const prompt = buildReflectionPrompt({
|
||||
thumbedDownResponse: "Some response",
|
||||
|
||||
@@ -10,6 +10,11 @@ import {
|
||||
summarizeParentMessage,
|
||||
} from "./thread-parent-context.js";
|
||||
|
||||
// Matches an unpaired UTF-16 surrogate (lone high or lone low), without relying
|
||||
// on the ES2024 String.prototype.isWellFormed() runtime API.
|
||||
const UNPAIRED_SURROGATE_RE =
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
|
||||
|
||||
describe("summarizeParentMessage", () => {
|
||||
it("returns undefined for missing message", () => {
|
||||
expect(summarizeParentMessage(undefined)).toBeUndefined();
|
||||
@@ -81,6 +86,20 @@ describe("summarizeParentMessage", () => {
|
||||
expect(summary?.text.length).toBeLessThanOrEqual(400);
|
||||
expect(summary?.text.endsWith("…")).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps truncated parent text well-formed when truncating surrogate pairs", () => {
|
||||
const msg: GraphThreadMessage = {
|
||||
id: "p1",
|
||||
from: { user: { displayName: "Dana" } },
|
||||
body: { content: `${"a".repeat(398)}🦞${"b".repeat(50)}`, contentType: "text" },
|
||||
};
|
||||
|
||||
const summary = summarizeParentMessage(msg);
|
||||
|
||||
expect(summary?.text).not.toMatch(UNPAIRED_SURROGATE_RE);
|
||||
expect(summary?.text).toBe(`${"a".repeat(398)}…`);
|
||||
expect(summary?.text.endsWith("\ud83e…")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatParentContextEvent", () => {
|
||||
|
||||
@@ -17,6 +17,7 @@ import {
|
||||
asDateTimestampMs,
|
||||
resolveExpiresAtMsFromDurationMs,
|
||||
} from "openclaw/plugin-sdk/number-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import { fetchChannelMessage, stripHtmlFromTeamsMessage } from "./graph-thread.js";
|
||||
import type { GraphThreadMessage } from "./graph-thread.js";
|
||||
|
||||
@@ -138,7 +139,9 @@ export function summarizeParentMessage(
|
||||
return {
|
||||
sender,
|
||||
text:
|
||||
text.length > PARENT_TEXT_MAX_CHARS ? `${text.slice(0, PARENT_TEXT_MAX_CHARS - 1)}…` : text,
|
||||
text.length > PARENT_TEXT_MAX_CHARS
|
||||
? `${truncateUtf16Safe(text, PARENT_TEXT_MAX_CHARS - 1)}…`
|
||||
: text,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -232,6 +232,55 @@ describe("SeenTracker", () => {
|
||||
tracker.stop();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it.each([-1, 0])("falls back to default TTL for non-positive ttlMs %s", (ttlMs) => {
|
||||
vi.useFakeTimers();
|
||||
const tracker = createTracker({ ttlMs, pruneIntervalMs: 10 * 60 * 1000 });
|
||||
|
||||
try {
|
||||
tracker.add("id1");
|
||||
vi.advanceTimersByTime(1);
|
||||
expect(tracker.peek("id1")).toBe(true);
|
||||
} finally {
|
||||
tracker.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("falls back to default TTL for infinite ttlMs", () => {
|
||||
vi.useFakeTimers();
|
||||
const tracker = createTracker({
|
||||
ttlMs: Number.POSITIVE_INFINITY,
|
||||
pruneIntervalMs: 10 * 60 * 1000,
|
||||
});
|
||||
|
||||
try {
|
||||
tracker.add("id1");
|
||||
vi.advanceTimersByTime(60 * 60 * 1000 + 1);
|
||||
expect(tracker.peek("id1")).toBe(false);
|
||||
} finally {
|
||||
tracker.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it.each([-1, 0, Number.POSITIVE_INFINITY])(
|
||||
"uses the default prune interval for unsafe pruneIntervalMs %s",
|
||||
(pruneIntervalMs) => {
|
||||
vi.useFakeTimers();
|
||||
const setIntervalSpy = vi.spyOn(globalThis, "setInterval");
|
||||
const tracker = createTracker({ pruneIntervalMs });
|
||||
|
||||
try {
|
||||
expect(setIntervalSpy).toHaveBeenCalledTimes(1);
|
||||
expect(setIntervalSpy.mock.calls[0]?.[1]).toBe(10 * 60 * 1000);
|
||||
} finally {
|
||||
tracker.stop();
|
||||
setIntervalSpy.mockRestore();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -3,7 +3,10 @@
|
||||
* Prevents unbounded memory growth under high load or abuse.
|
||||
*/
|
||||
|
||||
import { resolveIntegerOption } from "openclaw/plugin-sdk/number-runtime";
|
||||
import {
|
||||
resolveIntegerOption,
|
||||
resolvePositiveTimerTimeoutMs,
|
||||
} from "openclaw/plugin-sdk/number-runtime";
|
||||
|
||||
interface SeenTrackerOptions {
|
||||
/** Maximum number of entries to track (default: 100,000) */
|
||||
@@ -45,8 +48,8 @@ interface Entry {
|
||||
*/
|
||||
export function createSeenTracker(options?: SeenTrackerOptions): SeenTracker {
|
||||
const maxEntries = resolveIntegerOption(options?.maxEntries, 100_000, { min: 1 });
|
||||
const ttlMs = options?.ttlMs ?? 60 * 60 * 1000; // 1 hour
|
||||
const pruneIntervalMs = options?.pruneIntervalMs ?? 10 * 60 * 1000; // 10 minutes
|
||||
const ttlMs = resolvePositiveTimerTimeoutMs(options?.ttlMs, 60 * 60 * 1000);
|
||||
const pruneIntervalMs = resolvePositiveTimerTimeoutMs(options?.pruneIntervalMs, 10 * 60 * 1000);
|
||||
|
||||
// Main storage
|
||||
const entries = new Map<string, Entry>();
|
||||
|
||||
@@ -98,13 +98,13 @@ describe("buildAssistantMessage", () => {
|
||||
expect(msg.stopReason).toBe("length");
|
||||
});
|
||||
|
||||
it("keeps tool use authoritative over a length stop", () => {
|
||||
it("keeps a length stop authoritative over complete-looking tool calls", () => {
|
||||
const response = makeOllamaResponse({
|
||||
done_reason: "length",
|
||||
tool_calls: [{ function: { name: "read", arguments: { path: "README.md" } } }],
|
||||
});
|
||||
const msg = buildAssistantMessage(response, MODEL_INFO);
|
||||
expect(msg.stopReason).toBe("toolUse");
|
||||
expect(msg.stopReason).toBe("length");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -282,6 +282,32 @@ describe("createOllamaStreamFn thinking events", () => {
|
||||
expect(done.message?.stopReason).toBe("length");
|
||||
});
|
||||
|
||||
it("preserves a native length stop when the partial response contains tool calls", async () => {
|
||||
const events = await streamOllamaEvents(
|
||||
[
|
||||
makeOllamaResponse({
|
||||
done_reason: "length",
|
||||
tool_calls: [{ function: { name: "read", arguments: { path: "README.md" } } }],
|
||||
}),
|
||||
],
|
||||
{},
|
||||
{
|
||||
messages: [{ role: "user", content: "test" }],
|
||||
tools: [{ name: "read", description: "Read files", parameters: { type: "object" } }],
|
||||
} as never,
|
||||
);
|
||||
|
||||
const done = events.find((event) => event.type === "done") as {
|
||||
reason?: string;
|
||||
message?: { content?: Array<Record<string, unknown>>; stopReason?: string };
|
||||
};
|
||||
expect(done.reason).toBe("length");
|
||||
expect(done.message?.stopReason).toBe("length");
|
||||
expect(done.message?.content).toEqual([
|
||||
expect.objectContaining({ type: "toolCall", name: "read" }),
|
||||
]);
|
||||
});
|
||||
|
||||
it("uses generic stream timeout for Ollama request timeout", async () => {
|
||||
await streamOllamaEvents([makeOllamaResponse({ content: "ok" })], { timeoutMs: 2500 });
|
||||
|
||||
|
||||
@@ -656,10 +656,15 @@ function estimateTokensFromChars(chars: number): number {
|
||||
}
|
||||
|
||||
function resolveOllamaStopReason(response: OllamaChatResponse) {
|
||||
// Ollama's length terminal means generation hit its token limit, even when
|
||||
// the partial response already contains a complete-looking tool call.
|
||||
if (response.done_reason === "length") {
|
||||
return "length" as const;
|
||||
}
|
||||
if (response.message.tool_calls?.length) {
|
||||
return "toolUse" as const;
|
||||
}
|
||||
return response.done_reason === "length" ? ("length" as const) : ("stop" as const);
|
||||
return "stop" as const;
|
||||
}
|
||||
|
||||
function estimateOllamaPromptTokens(params: {
|
||||
|
||||
@@ -68,4 +68,30 @@ describe("qa live timeout policy", () => {
|
||||
),
|
||||
).toBe(240_000);
|
||||
});
|
||||
|
||||
it("uses the anthropic floor for claude-cli sonnet turns", () => {
|
||||
expect(
|
||||
resolveQaLiveTurnTimeoutMs(
|
||||
{
|
||||
providerMode: "live-frontier",
|
||||
primaryModel: "claude-cli/claude-sonnet-4-6",
|
||||
alternateModel: "claude-cli/claude-opus-4-8",
|
||||
},
|
||||
30_000,
|
||||
),
|
||||
).toBe(180_000);
|
||||
});
|
||||
|
||||
it("uses the opus floor for claude-cli opus turns", () => {
|
||||
expect(
|
||||
resolveQaLiveTurnTimeoutMs(
|
||||
{
|
||||
providerMode: "live-frontier",
|
||||
primaryModel: "claude-cli/claude-opus-4-8",
|
||||
alternateModel: "claude-cli/claude-opus-4-8",
|
||||
},
|
||||
30_000,
|
||||
),
|
||||
).toBe(240_000);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -9,6 +9,13 @@ function isAnthropicModel(modelRef: string) {
|
||||
return modelRef.startsWith("anthropic/");
|
||||
}
|
||||
|
||||
// claude-cli is an Anthropic-backed Claude runtime, so it shares the Anthropic
|
||||
// turn-timeout floors; mirror the claude-cli==anthropic precedent in the aimock
|
||||
// and mock-openai servers.
|
||||
function isAnthropicFamilyModel(modelRef: string) {
|
||||
return isAnthropicModel(modelRef) || modelRef.startsWith("claude-cli/");
|
||||
}
|
||||
|
||||
function isQaFastModeModelRef(modelRef: string) {
|
||||
return isOpenAiModel(modelRef);
|
||||
}
|
||||
@@ -18,7 +25,7 @@ function isGptFiveModel(modelRef: string) {
|
||||
}
|
||||
|
||||
function isClaudeOpusModel(modelRef: string) {
|
||||
return isAnthropicModel(modelRef) && modelRef.includes("claude-opus");
|
||||
return isAnthropicFamilyModel(modelRef) && modelRef.includes("claude-opus");
|
||||
}
|
||||
|
||||
export const liveFrontierProviderDefinition: QaProviderDefinition = {
|
||||
@@ -39,7 +46,7 @@ export const liveFrontierProviderDefinition: QaProviderDefinition = {
|
||||
if (isClaudeOpusModel(modelRef)) {
|
||||
return Math.max(fallbackMs, 240_000);
|
||||
}
|
||||
if (isAnthropicModel(modelRef)) {
|
||||
if (isAnthropicFamilyModel(modelRef)) {
|
||||
return Math.max(fallbackMs, 180_000);
|
||||
}
|
||||
if (isGptFiveModel(modelRef)) {
|
||||
|
||||
@@ -97,11 +97,45 @@ describe("engine/tools/remind-logic", () => {
|
||||
expect(generateJobName("drink water")).toBe("Reminder: drink water");
|
||||
});
|
||||
|
||||
it("truncates long content", () => {
|
||||
const long = "a very long reminder content that exceeds twenty characters";
|
||||
const name = generateJobName(long);
|
||||
expect(name.length).toBeLessThan(40);
|
||||
expect(name).toContain("…");
|
||||
it("truncates long content to a 20 UTF-16-unit budget with an ellipsis", () => {
|
||||
expect(generateJobName("a very long reminder content")).toBe(
|
||||
"Reminder: a very long reminder…",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps an exactly-fitting all-emoji content unchanged", () => {
|
||||
// 10 emoji = 20 UTF-16 units, exactly at the budget, so no truncation.
|
||||
expect(generateJobName("😀".repeat(10))).toBe(`Reminder: ${"😀".repeat(10)}`);
|
||||
});
|
||||
|
||||
it("does not split surrogate pairs when truncating", () => {
|
||||
const hasLoneSurrogate = (value: string): boolean => {
|
||||
for (let index = 0; index < value.length; index++) {
|
||||
const code = value.charCodeAt(index);
|
||||
if (code >= 0xd800 && code <= 0xdbff) {
|
||||
const next = value.charCodeAt(index + 1);
|
||||
if (!(next >= 0xdc00 && next <= 0xdfff)) {
|
||||
return true;
|
||||
}
|
||||
index++;
|
||||
} else if (code >= 0xdc00 && code <= 0xdfff) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
// 11 emoji = 22 UTF-16 units > 20; the 11th emoji straddles the cap and is
|
||||
// dropped whole rather than split into a lone surrogate.
|
||||
const allEmoji = generateJobName("😀".repeat(11));
|
||||
expect(allEmoji).toBe(`Reminder: ${"😀".repeat(10)}…`);
|
||||
expect(hasLoneSurrogate(allEmoji)).toBe(false);
|
||||
|
||||
// 19 ASCII + emoji: the emoji's high surrogate would land at unit 20, so the
|
||||
// whole pair is dropped to stay within the 20-unit budget.
|
||||
const name = generateJobName(`${"x".repeat(19)}😀tail`);
|
||||
expect(name).toBe(`Reminder: ${"x".repeat(19)}…`);
|
||||
expect(hasLoneSurrogate(name)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Qqbot plugin module implements remind logic behavior.
|
||||
import { resolveExpiresAtMsFromDurationMs } from "openclaw/plugin-sdk/number-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
|
||||
/**
|
||||
* QQBot reminder tool core logic.
|
||||
@@ -171,7 +172,7 @@ export function isCronExpression(timeStr: string): boolean {
|
||||
*/
|
||||
export function generateJobName(content: string): string {
|
||||
const trimmed = content.trim();
|
||||
const short = trimmed.length > 20 ? `${trimmed.slice(0, 20)}…` : trimmed;
|
||||
const short = trimmed.length > 20 ? `${truncateUtf16Safe(trimmed, 20)}…` : trimmed;
|
||||
return `Reminder: ${short}`;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
// Qqbot tests cover stt plugin behavior.
|
||||
import * as fs from "node:fs";
|
||||
import * as path from "node:path";
|
||||
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { withTempDir } from "openclaw/plugin-sdk/test-env";
|
||||
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const ssrfRuntimeMocks = vi.hoisted(() => ({
|
||||
fetchWithSsrFGuard: vi.fn(),
|
||||
@@ -41,6 +41,36 @@ function cancelTrackedResponse(
|
||||
};
|
||||
}
|
||||
|
||||
function largeTranscriptionJsonResponse(params: { chunkCount: number; chunkSize: number }): {
|
||||
response: Response;
|
||||
getReadCount: () => number;
|
||||
} {
|
||||
let chunkIndex = 0;
|
||||
const encoder = new TextEncoder();
|
||||
const chunks = [
|
||||
'{"text":"',
|
||||
...Array.from({ length: params.chunkCount }, () => "a".repeat(params.chunkSize)),
|
||||
'"}',
|
||||
];
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
pull(controller) {
|
||||
if (chunkIndex >= chunks.length) {
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
controller.enqueue(encoder.encode(chunks[chunkIndex]));
|
||||
chunkIndex += 1;
|
||||
},
|
||||
});
|
||||
return {
|
||||
response: new Response(stream, {
|
||||
status: 200,
|
||||
headers: { "content-type": "application/json" },
|
||||
}),
|
||||
getReadCount: () => chunkIndex,
|
||||
};
|
||||
}
|
||||
|
||||
function requireFirstSsrfRequest(): {
|
||||
url?: unknown;
|
||||
auditContext?: unknown;
|
||||
@@ -177,6 +207,44 @@ describe("engine/utils/stt", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("bounds successful STT JSON responses before parsing", async () => {
|
||||
await withTempDir("openclaw-qqbot-stt-success-limit-", async (tmpDir) => {
|
||||
const audioPath = path.join(tmpDir, "voice.wav");
|
||||
fs.writeFileSync(audioPath, Buffer.from([1, 2, 3, 4]));
|
||||
|
||||
const release = vi.fn(async () => {});
|
||||
const streamed = largeTranscriptionJsonResponse({
|
||||
chunkCount: 18,
|
||||
chunkSize: 1024 * 1024,
|
||||
});
|
||||
ssrfRuntimeMocks.fetchWithSsrFGuard.mockResolvedValueOnce({
|
||||
response: streamed.response,
|
||||
release,
|
||||
});
|
||||
|
||||
let error: unknown;
|
||||
try {
|
||||
await transcribeAudio(audioPath, {
|
||||
channels: {
|
||||
qqbot: {
|
||||
stt: {
|
||||
baseUrl: "https://api.example.test/v1/",
|
||||
apiKey: "secret",
|
||||
model: "whisper-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch (caught) {
|
||||
error = caught;
|
||||
}
|
||||
|
||||
expect(String(error)).toContain("qqbot.stt: JSON response exceeds 16777216 bytes");
|
||||
expect(streamed.getReadCount()).toBeLessThan(20);
|
||||
expect(release).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("bounds STT error bodies without using response.text()", async () => {
|
||||
await withTempDir("openclaw-qqbot-stt-error-", async (tmpDir) => {
|
||||
const audioPath = path.join(tmpDir, "voice.wav");
|
||||
|
||||
@@ -8,7 +8,10 @@
|
||||
import * as fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { mimeTypeFromFilePath } from "openclaw/plugin-sdk/media-mime";
|
||||
import { readResponseTextLimited } from "openclaw/plugin-sdk/provider-http";
|
||||
import {
|
||||
readProviderJsonResponse,
|
||||
readResponseTextLimited,
|
||||
} from "openclaw/plugin-sdk/provider-http";
|
||||
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
|
||||
import {
|
||||
normalizeOptionalString,
|
||||
@@ -100,7 +103,7 @@ export async function transcribeAudio(
|
||||
throw new Error(`STT failed (HTTP ${resp.status}): ${detail.slice(0, 300)}`);
|
||||
}
|
||||
|
||||
const result = (await resp.json()) as { text?: string };
|
||||
const result = await readProviderJsonResponse<{ text?: string }>(resp, "qqbot.stt");
|
||||
return normalizeOptionalString(result.text) ?? null;
|
||||
} finally {
|
||||
await release();
|
||||
|
||||
@@ -6,7 +6,6 @@ import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { z } from "zod";
|
||||
import { resolveSlackAccount } from "./accounts.js";
|
||||
import { validateSlackBlocksArray } from "./blocks-input.js";
|
||||
import { createSlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackWebClient, getSlackWriteClient } from "./client.js";
|
||||
import { buildSlackEditTextPayload } from "./edit-text.js";
|
||||
import { resolveSlackMedia } from "./monitor/media.js";
|
||||
@@ -72,22 +71,6 @@ function resolveToken(explicit?: string, accountId?: string, cfg?: OpenClawConfi
|
||||
return token;
|
||||
}
|
||||
|
||||
function resolveSlackActionClientOptions(opts: SlackActionClientOpts) {
|
||||
if (!opts.cfg) {
|
||||
return createSlackApiUrlClientOptions();
|
||||
}
|
||||
const cfg = requireRuntimeConfig(opts.cfg, "Slack actions");
|
||||
resolveSlackAccount({ cfg, accountId: opts.accountId });
|
||||
return createSlackApiUrlClientOptions();
|
||||
}
|
||||
|
||||
function slackActionClientOptionArgs(
|
||||
opts: SlackActionClientOpts,
|
||||
): [] | [ReturnType<typeof createSlackApiUrlClientOptions>] {
|
||||
const clientOptions = resolveSlackActionClientOptions(opts);
|
||||
return clientOptions.slackApiUrl ? [clientOptions] : [];
|
||||
}
|
||||
|
||||
function normalizeEmoji(raw: string) {
|
||||
const trimmed = raw.trim();
|
||||
if (!trimmed) {
|
||||
@@ -148,10 +131,7 @@ async function getClient(opts: SlackActionClientOpts = {}, mode: "read" | "write
|
||||
return opts.client;
|
||||
}
|
||||
const token = resolveToken(opts.token, opts.accountId, opts.cfg);
|
||||
const clientOptionArgs = slackActionClientOptionArgs(opts);
|
||||
return mode === "write"
|
||||
? getSlackWriteClient(token, ...clientOptionArgs)
|
||||
: createSlackWebClient(token, ...clientOptionArgs);
|
||||
return mode === "write" ? getSlackWriteClient(token) : createSlackWebClient(token);
|
||||
}
|
||||
|
||||
async function resolveBotUserId(client: WebClient) {
|
||||
|
||||
@@ -33,11 +33,190 @@ function readChatUpdatePayload(
|
||||
return payload as ChatUpdatePayload;
|
||||
}
|
||||
|
||||
const UNPAIRED_SURROGATE_RE =
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/;
|
||||
|
||||
function readMrkdwnTexts(blocks: unknown): string[] {
|
||||
if (!Array.isArray(blocks)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const texts: string[] = [];
|
||||
for (const block of blocks) {
|
||||
if (!block || typeof block !== "object") {
|
||||
continue;
|
||||
}
|
||||
|
||||
const text = (block as { text?: unknown }).text;
|
||||
if (
|
||||
text &&
|
||||
typeof text === "object" &&
|
||||
(text as { type?: unknown }).type === "mrkdwn" &&
|
||||
typeof (text as { text?: unknown }).text === "string"
|
||||
) {
|
||||
texts.push((text as { text: string }).text);
|
||||
}
|
||||
|
||||
const elements = (block as { elements?: unknown }).elements;
|
||||
if (!Array.isArray(elements)) {
|
||||
continue;
|
||||
}
|
||||
for (const element of elements) {
|
||||
if (
|
||||
element &&
|
||||
typeof element === "object" &&
|
||||
(element as { type?: unknown }).type === "mrkdwn" &&
|
||||
typeof (element as { text?: unknown }).text === "string"
|
||||
) {
|
||||
texts.push((element as { text: string }).text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return texts;
|
||||
}
|
||||
|
||||
function findApprovalMrkdwn(payload: SlackPayload, prefix: string): string {
|
||||
const text = readMrkdwnTexts(payload.blocks).find((entry) => entry.startsWith(prefix));
|
||||
if (!text) {
|
||||
throw new Error(`Expected Slack mrkdwn block starting with ${prefix}`);
|
||||
}
|
||||
return text;
|
||||
}
|
||||
|
||||
describe("slackApprovalNativeRuntime", () => {
|
||||
it("subscribes to plugin approval events", () => {
|
||||
expect(slackApprovalNativeRuntime.eventKinds).toEqual(["exec", "plugin"]);
|
||||
});
|
||||
|
||||
it("does not leave dangling surrogates when truncating exec approval command mrkdwn", async () => {
|
||||
const commandText = `${"a".repeat(2598)}😀tail`;
|
||||
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
accountId: "default",
|
||||
context: {
|
||||
app: {} as never,
|
||||
config: {} as never,
|
||||
},
|
||||
request: {
|
||||
id: "req-surrogate",
|
||||
request: {
|
||||
command: commandText,
|
||||
},
|
||||
createdAtMs: 0,
|
||||
expiresAtMs: 60_000,
|
||||
},
|
||||
approvalKind: "exec",
|
||||
nowMs: 0,
|
||||
view: {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-surrogate",
|
||||
commandText,
|
||||
metadata: [],
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
command: "/approve req-surrogate allow-once",
|
||||
style: "success",
|
||||
},
|
||||
],
|
||||
} as never,
|
||||
})) as SlackPayload;
|
||||
|
||||
const commandMrkdwn = findApprovalMrkdwn(payload, "*Command*");
|
||||
expect(commandMrkdwn).toMatch(/…\n```$/);
|
||||
expect(UNPAIRED_SURROGATE_RE.test(commandMrkdwn)).toBe(false);
|
||||
});
|
||||
|
||||
it("does not leave dangling surrogates when truncating plugin approval request mrkdwn", async () => {
|
||||
const title = `${"a".repeat(2598)}😀tail`;
|
||||
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
accountId: "default",
|
||||
context: {
|
||||
app: {} as never,
|
||||
config: {} as never,
|
||||
},
|
||||
request: {
|
||||
id: "plugin:req-surrogate",
|
||||
request: {
|
||||
title,
|
||||
description: "Needs approval.",
|
||||
},
|
||||
createdAtMs: 0,
|
||||
expiresAtMs: 60_000,
|
||||
},
|
||||
approvalKind: "plugin",
|
||||
nowMs: 0,
|
||||
view: {
|
||||
approvalKind: "plugin",
|
||||
phase: "pending",
|
||||
approvalId: "plugin:req-surrogate",
|
||||
title,
|
||||
description: "Needs approval.",
|
||||
severity: "warning",
|
||||
pluginId: "test-plugin",
|
||||
toolName: "test-tool",
|
||||
metadata: [],
|
||||
actions: [
|
||||
{
|
||||
decision: "deny",
|
||||
label: "Deny",
|
||||
command: "/approve plugin:req-surrogate deny",
|
||||
style: "danger",
|
||||
},
|
||||
],
|
||||
expiresAtMs: 60_000,
|
||||
} as never,
|
||||
})) as SlackPayload;
|
||||
|
||||
const requestMrkdwn = findApprovalMrkdwn(payload, "*Request*");
|
||||
expect(requestMrkdwn).toMatch(/…$/);
|
||||
expect(UNPAIRED_SURROGATE_RE.test(requestMrkdwn)).toBe(false);
|
||||
});
|
||||
|
||||
it("still truncates plain BMP approval mrkdwn at the Slack approval preview limit", async () => {
|
||||
const commandText = "b".repeat(2700);
|
||||
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
accountId: "default",
|
||||
context: {
|
||||
app: {} as never,
|
||||
config: {} as never,
|
||||
},
|
||||
request: {
|
||||
id: "req-bmp",
|
||||
request: {
|
||||
command: commandText,
|
||||
},
|
||||
createdAtMs: 0,
|
||||
expiresAtMs: 60_000,
|
||||
},
|
||||
approvalKind: "exec",
|
||||
nowMs: 0,
|
||||
view: {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-bmp",
|
||||
commandText,
|
||||
metadata: [],
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
command: "/approve req-bmp allow-once",
|
||||
style: "success",
|
||||
},
|
||||
],
|
||||
} as never,
|
||||
})) as SlackPayload;
|
||||
|
||||
const commandMrkdwn = findApprovalMrkdwn(payload, "*Command*");
|
||||
expect(commandMrkdwn).toMatch(/…\n```$/);
|
||||
expect(commandMrkdwn).toContain(`${"b".repeat(2599)}…`);
|
||||
expect(UNPAIRED_SURROGATE_RE.test(commandMrkdwn)).toBe(false);
|
||||
});
|
||||
|
||||
it("renders only the allowed pending actions", async () => {
|
||||
const payload = (await slackApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
|
||||
@@ -19,6 +19,7 @@ import { buildApprovalPresentationFromActionDescriptors } from "openclaw/plugin-
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
|
||||
import { logError } from "openclaw/plugin-sdk/logging-core";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import {
|
||||
isSlackAnyNativeApprovalClientEnabled,
|
||||
resolveSlackApprovalKind,
|
||||
@@ -73,7 +74,14 @@ function resolveHandlerContext(params: ChannelApprovalCapabilityHandlerContext):
|
||||
}
|
||||
|
||||
function truncateSlackMrkdwn(text: string, maxChars: number): string {
|
||||
return text.length <= maxChars ? text : `${text.slice(0, maxChars - 1)}…`;
|
||||
const limit = Math.max(0, Math.floor(maxChars));
|
||||
if (text.length <= limit) {
|
||||
return text;
|
||||
}
|
||||
if (limit <= 1) {
|
||||
return truncateUtf16Safe(text, limit);
|
||||
}
|
||||
return `${truncateUtf16Safe(text, limit - 1)}…`;
|
||||
}
|
||||
|
||||
function buildSlackCodeBlock(text: string): string {
|
||||
|
||||
@@ -4,7 +4,6 @@ import {
|
||||
normalizeOptionalString,
|
||||
} from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { resolveSlackAccount } from "./accounts.js";
|
||||
import { createSlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
import { normalizeAllowListLower } from "./monitor/allow-list.js";
|
||||
import type { OpenClawConfig } from "./runtime-api.js";
|
||||
@@ -77,7 +76,7 @@ export async function resolveSlackConversationInfo(params: {
|
||||
}
|
||||
|
||||
try {
|
||||
const client = createSlackWebClient(token, createSlackApiUrlClientOptions());
|
||||
const client = createSlackWebClient(token);
|
||||
if (isNativeImChannel) {
|
||||
const opened = await client.conversations.open({
|
||||
channel: channelId,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Slack plugin module implements channel behavior.
|
||||
import {
|
||||
buildLegacyDmAccountAllowlistAdapter,
|
||||
createAccountScopedAllowlistNameResolver,
|
||||
createFlatAllowlistOverrideResolver,
|
||||
} from "openclaw/plugin-sdk/allowlist-config-edit";
|
||||
import { adaptScopedAccountAccessor } from "openclaw/plugin-sdk/channel-config-helpers";
|
||||
@@ -51,7 +52,6 @@ import {
|
||||
type OpenClawConfig,
|
||||
} from "./channel-api.js";
|
||||
import { resolveSlackChannelType, resolveSlackConversationInfo } from "./channel-type.js";
|
||||
import { createSlackApiUrlClientOptions, type SlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { shouldSuppressLocalSlackExecApprovalPrompt } from "./exec-approvals.js";
|
||||
import { resolveSlackGroupRequireMention, resolveSlackGroupToolPolicy } from "./group-policy.js";
|
||||
import {
|
||||
@@ -405,40 +405,19 @@ function formatSlackScopeDiagnostic(params: {
|
||||
} as const;
|
||||
}
|
||||
|
||||
function slackApiUrlOptionArgs(): [] | [SlackApiUrlClientOptions] {
|
||||
const options = createSlackApiUrlClientOptions();
|
||||
return options.slackApiUrl ? [options] : [];
|
||||
}
|
||||
|
||||
const resolveSlackAllowlistGroupOverrides = createFlatAllowlistOverrideResolver({
|
||||
resolveRecord: (account: ResolvedSlackAccount) => account.channels,
|
||||
label: (key) => key,
|
||||
resolveEntries: (value) => value?.users,
|
||||
});
|
||||
|
||||
const resolveSlackAllowlistNames = async ({
|
||||
accountId,
|
||||
cfg,
|
||||
entries,
|
||||
}: {
|
||||
accountId?: string | null;
|
||||
cfg: OpenClawConfig;
|
||||
entries: string[];
|
||||
}) => {
|
||||
const account = resolveSlackAccount({ cfg, accountId });
|
||||
const token =
|
||||
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken);
|
||||
if (!token) {
|
||||
return [];
|
||||
}
|
||||
return await (
|
||||
await loadSlackResolveUsersModule()
|
||||
).resolveSlackUserAllowlist({
|
||||
token,
|
||||
entries,
|
||||
...createSlackApiUrlClientOptions(),
|
||||
});
|
||||
};
|
||||
const resolveSlackAllowlistNames = createAccountScopedAllowlistNameResolver({
|
||||
resolveAccount: resolveSlackAccount,
|
||||
resolveToken: (account: ResolvedSlackAccount) =>
|
||||
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken),
|
||||
resolveNames: async ({ token, entries }) =>
|
||||
(await loadSlackResolveUsersModule()).resolveSlackUserAllowlist({ token, entries }),
|
||||
});
|
||||
|
||||
const slackChannelOutbound: ChannelOutboundAdapter = {
|
||||
deliveryMode: "direct",
|
||||
@@ -675,7 +654,6 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
|
||||
(await loadSlackResolveChannelsModule()).resolveSlackChannelAllowlist({
|
||||
token,
|
||||
entries: inputsValue,
|
||||
...createSlackApiUrlClientOptions(),
|
||||
}),
|
||||
mapResolved: (entry) =>
|
||||
toResolvedTarget(entry, entry.archived ? "archived" : undefined),
|
||||
@@ -683,14 +661,14 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
|
||||
}
|
||||
return resolveTargetsWithOptionalToken({
|
||||
token:
|
||||
normalizeOptionalString(account.userToken) ?? normalizeOptionalString(account.botToken),
|
||||
normalizeOptionalString(account.userToken) ??
|
||||
normalizeOptionalString(account.botToken),
|
||||
inputs,
|
||||
missingTokenNote: "missing Slack token",
|
||||
resolveWithToken: async ({ token, inputs: inputsLocal }) =>
|
||||
(await loadSlackResolveUsersModule()).resolveSlackUserAllowlist({
|
||||
token,
|
||||
entries: inputsLocal,
|
||||
...createSlackApiUrlClientOptions(),
|
||||
}),
|
||||
mapResolved: (entry) => toResolvedTarget(entry, entry.note),
|
||||
});
|
||||
@@ -717,9 +695,7 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
|
||||
if (!token) {
|
||||
return { ok: false, error: "missing token" };
|
||||
}
|
||||
return await (
|
||||
await loadSlackProbeModule()
|
||||
).probeSlack(token, timeoutMs, ...slackApiUrlOptionArgs());
|
||||
return await (await loadSlackProbeModule()).probeSlack(token, timeoutMs);
|
||||
},
|
||||
formatCapabilitiesProbe: ({ probe }) => {
|
||||
const slackProbe = probe as SlackProbe | undefined;
|
||||
@@ -739,14 +715,13 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
|
||||
const botToken = account.botToken?.trim();
|
||||
const userToken = account.userToken?.trim();
|
||||
const { fetchSlackScopes } = await loadSlackScopesModule();
|
||||
const apiUrlOptionArgs = slackApiUrlOptionArgs();
|
||||
const botScopes: SlackScopesResultShape = botToken
|
||||
? await fetchSlackScopes(botToken, timeoutMs, ...apiUrlOptionArgs)
|
||||
? await fetchSlackScopes(botToken, timeoutMs)
|
||||
: { ok: false, error: "Slack bot token missing." };
|
||||
lines.push(formatSlackScopeDiagnostic({ tokenType: "bot", result: botScopes }));
|
||||
details.botScopes = botScopes;
|
||||
if (userToken) {
|
||||
const userScopes = await fetchSlackScopes(userToken, timeoutMs, ...apiUrlOptionArgs);
|
||||
const userScopes = await fetchSlackScopes(userToken, timeoutMs);
|
||||
lines.push(formatSlackScopeDiagnostic({ tokenType: "user", result: userScopes }));
|
||||
details.userScopes = userScopes;
|
||||
}
|
||||
|
||||
@@ -3,8 +3,6 @@ import type { Agent } from "node:http";
|
||||
import type { RetryOptions, WebClientOptions } from "@slack/web-api";
|
||||
import { createNodeProxyAgent } from "openclaw/plugin-sdk/fetch-runtime";
|
||||
|
||||
export type SlackApiUrlClientOptions = Pick<WebClientOptions, "slackApiUrl">;
|
||||
|
||||
export const SLACK_DEFAULT_RETRY_OPTIONS: RetryOptions = {
|
||||
retries: 2,
|
||||
factor: 2,
|
||||
@@ -32,11 +30,12 @@ export const SLACK_WRITE_RETRY_OPTIONS: RetryOptions = {
|
||||
* Returns `undefined` when no proxy env var is configured or when Slack hosts
|
||||
* are excluded by `NO_PROXY`.
|
||||
*/
|
||||
function resolveSlackProxyAgent(targetUrl: string): Agent | undefined {
|
||||
function resolveSlackProxyAgent(): Agent | undefined {
|
||||
try {
|
||||
return createNodeProxyAgent({
|
||||
mode: "env",
|
||||
targetUrl,
|
||||
targetUrl: "https://slack.com/",
|
||||
protocol: "https",
|
||||
});
|
||||
} catch {
|
||||
// Malformed proxy URL; degrade gracefully to direct connection.
|
||||
@@ -44,38 +43,19 @@ function resolveSlackProxyAgent(targetUrl: string): Agent | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveSlackApiUrlFromOptions(
|
||||
options: Pick<WebClientOptions, "slackApiUrl">,
|
||||
): string | undefined {
|
||||
const explicit = options.slackApiUrl?.trim();
|
||||
const envDefault = process.env.SLACK_API_URL?.trim();
|
||||
return explicit || envDefault || undefined;
|
||||
}
|
||||
|
||||
export function createSlackApiUrlClientOptions(): SlackApiUrlClientOptions {
|
||||
const slackApiUrl = process.env.SLACK_API_URL?.trim();
|
||||
return slackApiUrl ? { slackApiUrl } : {};
|
||||
}
|
||||
|
||||
export function resolveSlackWebClientOptions(options: WebClientOptions = {}): WebClientOptions {
|
||||
const slackApiUrl = resolveSlackApiUrlFromOptions(options);
|
||||
const proxyTargetUrl = slackApiUrl ?? "https://slack.com/";
|
||||
return {
|
||||
...options,
|
||||
agent: options.agent ?? resolveSlackProxyAgent(proxyTargetUrl),
|
||||
agent: options.agent ?? resolveSlackProxyAgent(),
|
||||
retryConfig: options.retryConfig ?? SLACK_DEFAULT_RETRY_OPTIONS,
|
||||
...(slackApiUrl ? { slackApiUrl } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveSlackWriteClientOptions(options: WebClientOptions = {}): WebClientOptions {
|
||||
const slackApiUrl = resolveSlackApiUrlFromOptions(options);
|
||||
const proxyTargetUrl = slackApiUrl ?? "https://slack.com/";
|
||||
return {
|
||||
...options,
|
||||
agent: options.agent ?? resolveSlackProxyAgent(proxyTargetUrl),
|
||||
agent: options.agent ?? resolveSlackProxyAgent(),
|
||||
retryConfig: options.retryConfig ?? SLACK_WRITE_RETRY_OPTIONS,
|
||||
maxRequestConcurrency: options.maxRequestConcurrency ?? 1,
|
||||
...(slackApiUrl ? { slackApiUrl } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ let SLACK_DEFAULT_RETRY_OPTIONS: typeof import("./client.js").SLACK_DEFAULT_RETR
|
||||
let SLACK_WRITE_RETRY_OPTIONS: typeof import("./client.js").SLACK_WRITE_RETRY_OPTIONS;
|
||||
let WebClient: ReturnType<typeof vi.fn>;
|
||||
|
||||
const SLACK_API_URL_KEYS = ["SLACK_API_URL", "OPENCLAW_SLACK_API_URL"] as const;
|
||||
const PROXY_KEYS = [
|
||||
"HTTPS_PROXY",
|
||||
"HTTP_PROXY",
|
||||
@@ -57,22 +56,6 @@ function restoreProxyEnvForTest() {
|
||||
}
|
||||
}
|
||||
|
||||
function clearSlackApiUrlEnvForTest() {
|
||||
for (const key of SLACK_API_URL_KEYS) {
|
||||
delete process.env[key];
|
||||
}
|
||||
}
|
||||
|
||||
function restoreSlackApiUrlEnvForTest() {
|
||||
for (const key of SLACK_API_URL_KEYS) {
|
||||
if (originalEnv[key] !== undefined) {
|
||||
process.env[key] = originalEnv[key];
|
||||
} else {
|
||||
delete process.env[key];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function requireAgent<T extends { agent?: unknown }>(options: T): NonNullable<T["agent"]> {
|
||||
if (!options.agent) {
|
||||
throw new Error("expected proxy agent");
|
||||
@@ -107,11 +90,6 @@ beforeAll(async () => {
|
||||
beforeEach(() => {
|
||||
WebClient.mockClear();
|
||||
clearSlackWriteClientCacheForTest();
|
||||
clearSlackApiUrlEnvForTest();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
restoreSlackApiUrlEnvForTest();
|
||||
});
|
||||
|
||||
describe("slack web client config", () => {
|
||||
@@ -128,40 +106,6 @@ describe("slack web client config", () => {
|
||||
expect(options.retryConfig).toBe(customRetry);
|
||||
});
|
||||
|
||||
it("uses explicit Slack API URL as the Slack Web API root", () => {
|
||||
expect(
|
||||
resolveSlackWebClientOptions({ slackApiUrl: "http://127.0.0.1:49152/api/" }).slackApiUrl,
|
||||
).toBe("http://127.0.0.1:49152/api/");
|
||||
expect(
|
||||
resolveSlackWriteClientOptions({ slackApiUrl: "http://127.0.0.1:49152/api/" }).slackApiUrl,
|
||||
).toBe("http://127.0.0.1:49152/api/");
|
||||
});
|
||||
|
||||
it("uses SLACK_API_URL as the default Slack Web API root", () => {
|
||||
process.env.SLACK_API_URL = " http://127.0.0.1:49152/api/ ";
|
||||
|
||||
expect(resolveSlackWebClientOptions().slackApiUrl).toBe("http://127.0.0.1:49152/api/");
|
||||
expect(resolveSlackWriteClientOptions().slackApiUrl).toBe("http://127.0.0.1:49152/api/");
|
||||
});
|
||||
|
||||
it("does not read OPENCLAW_SLACK_API_URL as a default Slack Web API root", () => {
|
||||
process.env.OPENCLAW_SLACK_API_URL = "http://127.0.0.1:49152/api/";
|
||||
|
||||
expect(resolveSlackWebClientOptions().slackApiUrl).toBeUndefined();
|
||||
expect(resolveSlackWriteClientOptions().slackApiUrl).toBeUndefined();
|
||||
});
|
||||
|
||||
it("prefers explicit Slack API URL over SLACK_API_URL", () => {
|
||||
process.env.SLACK_API_URL = "http://127.0.0.1:49152/api/";
|
||||
|
||||
expect(
|
||||
resolveSlackWebClientOptions({ slackApiUrl: "http://127.0.0.1:49153/api/" }).slackApiUrl,
|
||||
).toBe("http://127.0.0.1:49153/api/");
|
||||
expect(
|
||||
resolveSlackWriteClientOptions({ slackApiUrl: "http://127.0.0.1:49153/api/" }).slackApiUrl,
|
||||
).toBe("http://127.0.0.1:49153/api/");
|
||||
});
|
||||
|
||||
it("passes merged options into WebClient", () => {
|
||||
const customAgent = {} as never;
|
||||
|
||||
@@ -246,38 +190,6 @@ describe("slack web client config", () => {
|
||||
expect(WebClient).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("keeps write clients separated by Slack API URL", () => {
|
||||
clearProxyEnvForTest();
|
||||
try {
|
||||
const first = getSlackWriteClient("xoxb-test", {
|
||||
slackApiUrl: "http://127.0.0.1:49152/api/",
|
||||
});
|
||||
const second = getSlackWriteClient("xoxb-test", {
|
||||
slackApiUrl: "http://127.0.0.1:49153/api/",
|
||||
});
|
||||
|
||||
expect(second).not.toBe(first);
|
||||
expect(WebClient).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
restoreProxyEnvForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps write clients separated by SLACK_API_URL", () => {
|
||||
clearProxyEnvForTest();
|
||||
try {
|
||||
process.env.SLACK_API_URL = "http://127.0.0.1:49152/api/";
|
||||
const first = getSlackWriteClient("xoxb-test");
|
||||
process.env.SLACK_API_URL = "http://127.0.0.1:49153/api/";
|
||||
const second = getSlackWriteClient("xoxb-test");
|
||||
|
||||
expect(second).not.toBe(first);
|
||||
expect(WebClient).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
restoreProxyEnvForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("builds stable non-secret token cache keys", () => {
|
||||
const token = "xoxb-sensitive-token";
|
||||
const first = createSlackTokenCacheKey(token);
|
||||
|
||||
@@ -1,22 +1,14 @@
|
||||
// Slack plugin module implements client behavior.
|
||||
import { createHash } from "node:crypto";
|
||||
import { type WebClientOptions, WebClient } from "@slack/web-api";
|
||||
import {
|
||||
resolveSlackWebClientOptions,
|
||||
resolveSlackWriteClientOptions,
|
||||
type SlackApiUrlClientOptions,
|
||||
} from "./client-options.js";
|
||||
import { resolveSlackWebClientOptions, resolveSlackWriteClientOptions } from "./client-options.js";
|
||||
|
||||
const SLACK_WRITE_CLIENT_CACHE_MAX = 32;
|
||||
const slackWriteClientCache = new Map<string, WebClient>();
|
||||
|
||||
export type SlackWriteClientCacheOptions = SlackApiUrlClientOptions;
|
||||
|
||||
export {
|
||||
createSlackApiUrlClientOptions,
|
||||
resolveSlackWebClientOptions,
|
||||
resolveSlackWriteClientOptions,
|
||||
type SlackApiUrlClientOptions,
|
||||
SLACK_DEFAULT_RETRY_OPTIONS,
|
||||
SLACK_WRITE_RETRY_OPTIONS,
|
||||
} from "./client-options.js";
|
||||
@@ -33,27 +25,15 @@ export function createSlackTokenCacheKey(token: string): string {
|
||||
return `sha256:${createHash("sha256").update(token).digest("base64url")}`;
|
||||
}
|
||||
|
||||
function createSlackWriteClientCacheKey(
|
||||
token: string,
|
||||
options: SlackWriteClientCacheOptions,
|
||||
): string {
|
||||
export function getSlackWriteClient(token: string): WebClient {
|
||||
const tokenKey = createSlackTokenCacheKey(token);
|
||||
return options.slackApiUrl ? `${tokenKey}:api:${options.slackApiUrl}` : tokenKey;
|
||||
}
|
||||
|
||||
export function getSlackWriteClient(
|
||||
token: string,
|
||||
options: SlackWriteClientCacheOptions = {},
|
||||
): WebClient {
|
||||
const resolvedOptions = resolveSlackWriteClientOptions(options);
|
||||
const tokenKey = createSlackWriteClientCacheKey(token, resolvedOptions);
|
||||
const cached = slackWriteClientCache.get(tokenKey);
|
||||
if (cached) {
|
||||
slackWriteClientCache.delete(tokenKey);
|
||||
slackWriteClientCache.set(tokenKey, cached);
|
||||
return cached;
|
||||
}
|
||||
const client = new WebClient(token, resolvedOptions);
|
||||
const client = createSlackWriteClient(token);
|
||||
if (slackWriteClientCache.size >= SLACK_WRITE_CLIENT_CACHE_MAX) {
|
||||
const oldestTokenKey = slackWriteClientCache.keys().next().value;
|
||||
if (oldestTokenKey) {
|
||||
|
||||
@@ -1,114 +0,0 @@
|
||||
// Slack tests cover real Web API routing behavior.
|
||||
import { createServer, type Server } from "node:http";
|
||||
import type { AddressInfo } from "node:net";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
|
||||
const SLACK_API_URL_KEYS = ["SLACK_API_URL"] as const;
|
||||
const PROXY_KEYS = [
|
||||
"HTTPS_PROXY",
|
||||
"HTTP_PROXY",
|
||||
"https_proxy",
|
||||
"http_proxy",
|
||||
"NO_PROXY",
|
||||
"no_proxy",
|
||||
"OPENCLAW_PROXY_ACTIVE",
|
||||
"OPENCLAW_PROXY_CA_FILE",
|
||||
] as const;
|
||||
const TEST_ENV_KEYS = [...SLACK_API_URL_KEYS, ...PROXY_KEYS] as const;
|
||||
const originalEnv = { ...process.env };
|
||||
|
||||
type SlackApiRequest = {
|
||||
authorization?: string;
|
||||
method?: string;
|
||||
url?: string;
|
||||
};
|
||||
|
||||
function restoreTestEnv() {
|
||||
for (const key of TEST_ENV_KEYS) {
|
||||
if (originalEnv[key] !== undefined) {
|
||||
process.env[key] = originalEnv[key];
|
||||
} else {
|
||||
delete process.env[key];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function closeServer(server: Server): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function startSlackApiServer(requests: SlackApiRequest[]): Promise<{
|
||||
baseUrl: string;
|
||||
close(): Promise<void>;
|
||||
}> {
|
||||
const server = createServer((request, response) => {
|
||||
requests.push({
|
||||
authorization: request.headers.authorization,
|
||||
method: request.method,
|
||||
url: request.url,
|
||||
});
|
||||
request.resume();
|
||||
response.writeHead(200, { "content-type": "application/json" });
|
||||
response.end(
|
||||
`${JSON.stringify({
|
||||
ok: true,
|
||||
team: "Mock Slack",
|
||||
team_id: "TMOCK",
|
||||
url: "https://mock.slack.test/",
|
||||
user: "mock-bot",
|
||||
user_id: "UMOCK",
|
||||
})}\n`,
|
||||
);
|
||||
});
|
||||
await new Promise<void>((resolve) => {
|
||||
server.listen(0, "127.0.0.1", resolve);
|
||||
});
|
||||
const address = server.address() as AddressInfo;
|
||||
return {
|
||||
baseUrl: `http://127.0.0.1:${address.port}`,
|
||||
close: () => closeServer(server),
|
||||
};
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
restoreTestEnv();
|
||||
});
|
||||
|
||||
describe("Slack Web API routing", () => {
|
||||
it("routes real WebClient requests to the SLACK_API_URL root", async () => {
|
||||
for (const key of TEST_ENV_KEYS) {
|
||||
delete process.env[key];
|
||||
}
|
||||
const requests: SlackApiRequest[] = [];
|
||||
const server = await startSlackApiServer(requests);
|
||||
try {
|
||||
process.env.SLACK_API_URL = `${server.baseUrl}/api/`;
|
||||
|
||||
const client = createSlackWebClient("xoxb-route-proof", {
|
||||
retryConfig: { retries: 0 },
|
||||
timeout: 1000,
|
||||
});
|
||||
const result = await client.auth.test();
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(requests).toEqual([
|
||||
{
|
||||
authorization: "Bearer xoxb-route-proof",
|
||||
method: "POST",
|
||||
url: "/api/auth.test",
|
||||
},
|
||||
]);
|
||||
} finally {
|
||||
await server.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -38,22 +38,6 @@ describe("slack config schema", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects Slack Web API URL config overrides", () => {
|
||||
const res = SlackConfigSchema.safeParse({
|
||||
apiUrl: "http://127.0.0.1:49152/api/",
|
||||
accounts: { ops: { apiUrl: "http://127.0.0.1:49153/api/" } },
|
||||
});
|
||||
|
||||
expect(res.success).toBe(false);
|
||||
if (!res.success) {
|
||||
expect(
|
||||
res.error.issues.some(
|
||||
(issue) => issue.code === "unrecognized_keys" && issue.keys.includes("apiUrl"),
|
||||
),
|
||||
).toBe(true);
|
||||
}
|
||||
});
|
||||
|
||||
it("accepts unfurl controls at root and account level", () => {
|
||||
const res = SlackConfigSchema.safeParse({
|
||||
unfurlLinks: false,
|
||||
|
||||
@@ -9,7 +9,6 @@ import {
|
||||
normalizeOptionalLowercaseString,
|
||||
} from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { resolveSlackAccount } from "./accounts.js";
|
||||
import { createSlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
|
||||
type SlackUser = {
|
||||
@@ -51,10 +50,9 @@ type SlackAuthTestResponse = {
|
||||
team?: string;
|
||||
};
|
||||
|
||||
function createSlackDirectoryClient(params: DirectoryConfigParams) {
|
||||
function resolveReadToken(params: DirectoryConfigParams): string | undefined {
|
||||
const account = resolveSlackAccount({ cfg: params.cfg, accountId: params.accountId });
|
||||
const token = account.userToken ?? account.botToken?.trim();
|
||||
return token ? createSlackWebClient(token, createSlackApiUrlClientOptions()) : null;
|
||||
return account.userToken ?? account.botToken?.trim();
|
||||
}
|
||||
|
||||
function normalizeQuery(value?: string | null): string {
|
||||
@@ -103,10 +101,11 @@ function slackUserToDirectoryEntry(
|
||||
export async function getSlackDirectorySelfLive(
|
||||
params: DirectoryConfigParams,
|
||||
): Promise<ChannelDirectoryEntry | null> {
|
||||
const client = createSlackDirectoryClient(params);
|
||||
if (!client) {
|
||||
const token = resolveReadToken(params);
|
||||
if (!token) {
|
||||
return null;
|
||||
}
|
||||
const client = createSlackWebClient(token);
|
||||
const auth = (await client.auth.test()) as SlackAuthTestResponse;
|
||||
const userId = normalizeOptionalString(auth.user_id);
|
||||
if (!userId) {
|
||||
@@ -126,10 +125,11 @@ export async function getSlackDirectorySelfLive(
|
||||
export async function listSlackDirectoryPeersLive(
|
||||
params: DirectoryConfigParams,
|
||||
): Promise<ChannelDirectoryEntry[]> {
|
||||
const client = createSlackDirectoryClient(params);
|
||||
if (!client) {
|
||||
const token = resolveReadToken(params);
|
||||
if (!token) {
|
||||
return [];
|
||||
}
|
||||
const client = createSlackWebClient(token);
|
||||
const query = normalizeQuery(params.query);
|
||||
const members: SlackUser[] = [];
|
||||
let cursor: string | undefined;
|
||||
@@ -172,10 +172,11 @@ export async function listSlackDirectoryPeersLive(
|
||||
export async function listSlackDirectoryGroupsLive(
|
||||
params: DirectoryConfigParams,
|
||||
): Promise<ChannelDirectoryEntry[]> {
|
||||
const client = createSlackDirectoryClient(params);
|
||||
if (!client) {
|
||||
const token = resolveReadToken(params);
|
||||
if (!token) {
|
||||
return [];
|
||||
}
|
||||
const client = createSlackWebClient(token);
|
||||
const query = normalizeQuery(params.query);
|
||||
const channels: SlackChannel[] = [];
|
||||
let cursor: string | undefined;
|
||||
|
||||
@@ -32,7 +32,7 @@ import {
|
||||
resolveSlackAccountDmPolicy,
|
||||
} from "../accounts.js";
|
||||
import { isSlackAnyNativeApprovalClientEnabled } from "../approval-native-gates.js";
|
||||
import { createSlackApiUrlClientOptions, resolveSlackWebClientOptions } from "../client-options.js";
|
||||
import { resolveSlackWebClientOptions } from "../client-options.js";
|
||||
import { normalizeSlackWebhookPath, registerSlackHttpHandler } from "../http/index.js";
|
||||
import { SLACK_TEXT_LIMIT } from "../limits.js";
|
||||
import { resolveSlackChannelAllowlist } from "../resolve-channels.js";
|
||||
@@ -288,8 +288,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
const typingReaction = slackCfg.typingReaction?.trim() ?? "";
|
||||
const mediaMaxBytes = (opts.mediaMaxMb ?? slackCfg.mediaMaxMb ?? 20) * 1024 * 1024;
|
||||
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
|
||||
const slackApiUrlClientOptions = createSlackApiUrlClientOptions();
|
||||
const clientOptions = resolveSlackWebClientOptions(slackApiUrlClientOptions);
|
||||
const clientOptions = resolveSlackWebClientOptions();
|
||||
const { app, receiver, socketModeLogger } = createSlackBoltApp({
|
||||
interop: await getSlackBoltInterop(),
|
||||
slackMode,
|
||||
@@ -463,7 +462,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
const resolved = await resolveSlackChannelAllowlist({
|
||||
token: resolveToken,
|
||||
entries,
|
||||
...slackApiUrlClientOptions,
|
||||
});
|
||||
const nextChannels = { ...channelsConfig };
|
||||
const mapping: string[] = [];
|
||||
@@ -509,7 +507,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
const resolvedUsers = await resolveSlackUserAllowlist({
|
||||
token: resolveToken,
|
||||
entries: allowEntries,
|
||||
...slackApiUrlClientOptions,
|
||||
});
|
||||
const { mapping, unresolved, additions } = buildAllowlistResolutionSummary(
|
||||
resolvedUsers,
|
||||
@@ -556,7 +553,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
const resolvedUsers = await resolveSlackUserAllowlist({
|
||||
token: resolveToken,
|
||||
entries: Array.from(userEntries),
|
||||
...slackApiUrlClientOptions,
|
||||
});
|
||||
const { resolvedMap, mapping, unresolved } = buildAllowlistResolutionSummary(
|
||||
resolvedUsers,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Slack plugin module implements probe behavior.
|
||||
import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract";
|
||||
import { withTimeout } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import { createSlackWebClient, type SlackApiUrlClientOptions } from "./client.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
import { formatSlackError } from "./errors.js";
|
||||
|
||||
export type SlackProbe = BaseProbeResult & {
|
||||
@@ -11,14 +11,8 @@ export type SlackProbe = BaseProbeResult & {
|
||||
team?: { id?: string; name?: string };
|
||||
};
|
||||
|
||||
export async function probeSlack(
|
||||
token: string,
|
||||
timeoutMs = 2500,
|
||||
options: SlackApiUrlClientOptions = {},
|
||||
): Promise<SlackProbe> {
|
||||
const client = options.slackApiUrl
|
||||
? createSlackWebClient(token, options)
|
||||
: createSlackWebClient(token);
|
||||
export async function probeSlack(token: string, timeoutMs = 2500): Promise<SlackProbe> {
|
||||
const client = createSlackWebClient(token);
|
||||
const start = Date.now();
|
||||
try {
|
||||
const result = await withTimeout(client.auth.test(), timeoutMs);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
// Slack plugin module implements resolve channels behavior.
|
||||
import type { WebClient } from "@slack/web-api";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import type { SlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
import {
|
||||
collectSlackCursorItems,
|
||||
@@ -102,14 +101,8 @@ export async function resolveSlackChannelAllowlist(params: {
|
||||
token: string;
|
||||
entries: string[];
|
||||
client?: WebClient;
|
||||
slackApiUrl?: SlackApiUrlClientOptions["slackApiUrl"];
|
||||
}): Promise<SlackChannelResolution[]> {
|
||||
const client =
|
||||
params.client ??
|
||||
createSlackWebClient(
|
||||
params.token,
|
||||
params.slackApiUrl ? { slackApiUrl: params.slackApiUrl } : {},
|
||||
);
|
||||
const client = params.client ?? createSlackWebClient(params.token);
|
||||
const channels = await listSlackChannels(client);
|
||||
return resolveSlackAllowlistEntries<
|
||||
{ id?: string; name?: string },
|
||||
|
||||
@@ -4,7 +4,6 @@ import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
normalizeOptionalString,
|
||||
} from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import type { SlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
import {
|
||||
collectSlackCursorItems,
|
||||
@@ -154,14 +153,8 @@ export async function resolveSlackUserAllowlist(params: {
|
||||
token: string;
|
||||
entries: string[];
|
||||
client?: WebClient;
|
||||
slackApiUrl?: SlackApiUrlClientOptions["slackApiUrl"];
|
||||
}): Promise<SlackUserResolution[]> {
|
||||
const client =
|
||||
params.client ??
|
||||
createSlackWebClient(
|
||||
params.token,
|
||||
params.slackApiUrl ? { slackApiUrl: params.slackApiUrl } : {},
|
||||
);
|
||||
const client = params.client ?? createSlackWebClient(params.token);
|
||||
const users = await listSlackUsers(client);
|
||||
return resolveSlackAllowlistEntries<
|
||||
{ id?: string; name?: string; email?: string },
|
||||
|
||||
@@ -6,7 +6,7 @@ import {
|
||||
normalizeOptionalString,
|
||||
sortUniqueStrings,
|
||||
} from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { createSlackWebClient, type SlackApiUrlClientOptions } from "./client.js";
|
||||
import { createSlackWebClient } from "./client.js";
|
||||
import { formatSlackError } from "./errors.js";
|
||||
|
||||
export type SlackScopesResult = {
|
||||
@@ -95,9 +95,8 @@ async function callSlack(
|
||||
export async function fetchSlackScopes(
|
||||
token: string,
|
||||
timeoutMs: number,
|
||||
options: SlackApiUrlClientOptions = {},
|
||||
): Promise<SlackScopesResult> {
|
||||
const client = createSlackWebClient(token, { ...options, timeout: timeoutMs });
|
||||
const client = createSlackWebClient(token, { timeout: timeoutMs });
|
||||
const attempts: SlackScopesMethod[] = ["auth.test", "auth.scopes", "apps.permissions.info"];
|
||||
const errors: string[] = [];
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ import type { SlackTokenSource } from "./accounts.js";
|
||||
import { resolveSlackAccount } from "./accounts.js";
|
||||
import { buildSlackBlocksFallbackText } from "./blocks-fallback.js";
|
||||
import { validateSlackBlocksArray } from "./blocks-input.js";
|
||||
import { createSlackApiUrlClientOptions } from "./client-options.js";
|
||||
import { createSlackTokenCacheKey, getSlackWriteClient } from "./client.js";
|
||||
import { markdownToSlackMrkdwnChunks } from "./format.js";
|
||||
import { SLACK_TEXT_LIMIT } from "./limits.js";
|
||||
@@ -751,7 +750,7 @@ async function sendMessageSlackQueuedInner(params: {
|
||||
blocks?: (Block | KnownBlock)[];
|
||||
}): Promise<SlackSendResult> {
|
||||
const { opts, cfg, account, token, recipient, blocks, trimmedMessage } = params;
|
||||
const client = opts.client ?? getSlackWriteClient(token, createSlackApiUrlClientOptions());
|
||||
const client = opts.client ?? getSlackWriteClient(token);
|
||||
const identity = resolveSlackSendIdentity({
|
||||
accountId: account.accountId,
|
||||
explicit: opts.identity,
|
||||
|
||||
@@ -433,6 +433,31 @@ describe("synology-chat security helpers", () => {
|
||||
expect(result).toContain("[truncated]");
|
||||
});
|
||||
|
||||
it("truncates long inputs without splitting a surrogate pair", () => {
|
||||
const loneSurrogatePattern =
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?:^|[^\uD800-\uDBFF])[\uDC00-\uDFFF]/u;
|
||||
const input = "a".repeat(3999) + "\u{1F600}" + "b".repeat(2000);
|
||||
|
||||
const result = sanitizeInput(input);
|
||||
|
||||
expect(result).toContain("[truncated]");
|
||||
expect(result).not.toMatch(loneSurrogatePattern);
|
||||
expect(result).toBe(`${"a".repeat(3999)}... [truncated]`);
|
||||
});
|
||||
|
||||
it("keeps complete supplementary-plane characters that fit before truncation", () => {
|
||||
const loneSurrogatePattern =
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?:^|[^\uD800-\uDBFF])[\uDC00-\uDFFF]/u;
|
||||
const emoji = "\u{1F600}";
|
||||
const input = "a".repeat(3998) + emoji + "b".repeat(2000);
|
||||
|
||||
const result = sanitizeInput(input);
|
||||
|
||||
expect(result).toContain("[truncated]");
|
||||
expect(result.startsWith(`${"a".repeat(3998)}${emoji}`)).toBe(true);
|
||||
expect(result).not.toMatch(loneSurrogatePattern);
|
||||
});
|
||||
|
||||
it("rate limits per user and caps tracked state", () => {
|
||||
const limiter = new RateLimiter(3, 60);
|
||||
expect(limiter.check("user1")).toBe(true);
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
import { resolveStableChannelMessageIngress } from "openclaw/plugin-sdk/channel-ingress-runtime";
|
||||
import { finiteSecondsToTimerSafeMilliseconds } from "openclaw/plugin-sdk/number-runtime";
|
||||
import { safeEqualSecret } from "openclaw/plugin-sdk/security-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import {
|
||||
createFixedWindowRateLimiter,
|
||||
type FixedWindowRateLimiter,
|
||||
@@ -64,7 +65,7 @@ export function sanitizeInput(text: string): string {
|
||||
|
||||
const maxLength = 4000;
|
||||
if (sanitized.length > maxLength) {
|
||||
sanitized = sanitized.slice(0, maxLength) + "... [truncated]";
|
||||
sanitized = truncateUtf16Safe(sanitized, maxLength) + "... [truncated]";
|
||||
}
|
||||
|
||||
return sanitized;
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import * as querystring from "node:querystring";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
import {
|
||||
beginWebhookRequestPipelineOrReject,
|
||||
createWebhookInFlightLimiter,
|
||||
@@ -503,7 +504,7 @@ async function parseAndAuthorizeSynologyWebhook(params: {
|
||||
respondNoContent(params.res);
|
||||
return { ok: false };
|
||||
}
|
||||
const preview = cleanText.length > 100 ? `${cleanText.slice(0, 100)}...` : cleanText;
|
||||
const preview = cleanText.length > 100 ? `${truncateUtf16Safe(cleanText, 100)}...` : cleanText;
|
||||
return {
|
||||
ok: true,
|
||||
message: {
|
||||
@@ -574,7 +575,7 @@ async function processAuthorizedSynologyWebhook(params: {
|
||||
deliveryUserId,
|
||||
params.account.allowInsecureSsl,
|
||||
);
|
||||
const replyPreview = reply.length > 100 ? `${reply.slice(0, 100)}...` : reply;
|
||||
const replyPreview = reply.length > 100 ? `${truncateUtf16Safe(reply, 100)}...` : reply;
|
||||
params.log?.info?.(
|
||||
`Reply sent to ${params.message.payload.username} (${deliveryUserId}): ${replyPreview}`,
|
||||
);
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js";
|
||||
import type { TelegramPromptContextEntry } from "./bot-message-context.types.js";
|
||||
|
||||
const telegramChatWindowContext: TelegramPromptContextEntry = {
|
||||
label: "Conversation context",
|
||||
source: "telegram",
|
||||
type: "chat_window",
|
||||
payload: {
|
||||
order: "chronological",
|
||||
relation: "selected_for_current_message",
|
||||
messages: [
|
||||
{
|
||||
message_id: "10",
|
||||
sender: "Pat",
|
||||
timestamp_ms: 1_700_000_000_000,
|
||||
body: "Earlier DM turn already in the transcript",
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
describe("buildTelegramMessageContext prompt context", () => {
|
||||
it("omits Telegram chat-window context for existing unthreaded private DM sessions", async () => {
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 1234, type: "private", first_name: "Pat" },
|
||||
from: { id: 1234, first_name: "Pat" },
|
||||
text: "continue",
|
||||
},
|
||||
promptContext: [telegramChatWindowContext],
|
||||
sessionRuntime: {
|
||||
readSessionUpdatedAt: ({ sessionKey }) =>
|
||||
sessionKey === "agent:main:main" ? 1_700_000_000_000 : undefined,
|
||||
},
|
||||
});
|
||||
|
||||
expect(ctx?.ctxPayload.SessionKey).toBe("agent:main:main");
|
||||
expect(ctx?.ctxPayload.UntrustedStructuredContext).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps Telegram chat-window context for fresh private DM sessions", async () => {
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 1234, type: "private", first_name: "Pat" },
|
||||
from: { id: 1234, first_name: "Pat" },
|
||||
text: "start",
|
||||
},
|
||||
promptContext: [telegramChatWindowContext],
|
||||
});
|
||||
|
||||
expect(ctx?.ctxPayload.UntrustedStructuredContext).toEqual([telegramChatWindowContext]);
|
||||
});
|
||||
|
||||
it("keeps Telegram chat-window context for existing private DM replies", async () => {
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
chat: { id: 1234, type: "private", first_name: "Pat" },
|
||||
from: { id: 1234, first_name: "Pat" },
|
||||
text: "replying with context",
|
||||
reply_to_message: {
|
||||
chat: { id: 1234, type: "private", first_name: "Pat" },
|
||||
from: { id: 1234, first_name: "Pat" },
|
||||
text: "older referenced turn",
|
||||
date: 1_700_000_000,
|
||||
message_id: 10,
|
||||
},
|
||||
},
|
||||
promptContext: [telegramChatWindowContext],
|
||||
sessionRuntime: {
|
||||
readSessionUpdatedAt: ({ sessionKey }) =>
|
||||
sessionKey === "agent:main:main" ? 1_700_000_000_000 : undefined,
|
||||
},
|
||||
});
|
||||
|
||||
expect(ctx?.ctxPayload.UntrustedStructuredContext).toEqual([telegramChatWindowContext]);
|
||||
});
|
||||
});
|
||||
@@ -113,6 +113,10 @@ export async function resolveTelegramMessageContextStorePath(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function isTelegramChatWindowPromptContext(entry: TelegramPromptContextEntry): boolean {
|
||||
return entry.source === "telegram" && entry.type === "chat_window";
|
||||
}
|
||||
|
||||
function replyTargetToChainEntry(replyTarget: TelegramReplyTarget): TelegramReplyChainEntry {
|
||||
return {
|
||||
...(replyTarget.id ? { messageId: replyTarget.id } : {}),
|
||||
@@ -378,6 +382,17 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
storePath,
|
||||
sessionKey: route.sessionKey,
|
||||
});
|
||||
const shouldSuppressPersistedDmChatWindowContext =
|
||||
!isGroup &&
|
||||
previousTimestamp !== undefined &&
|
||||
dmThreadId == null &&
|
||||
visibleReplyChain.length === 0 &&
|
||||
!visibleReplyTarget;
|
||||
// Existing plain DMs already carry their history through the persistent
|
||||
// transcript. Keep chat windows for fresh DMs, topics, replies, and groups.
|
||||
const visiblePromptContext = shouldSuppressPersistedDmChatWindowContext
|
||||
? promptContext.filter((entry) => !isTelegramChatWindowPromptContext(entry))
|
||||
: promptContext;
|
||||
const body = formatInboundEnvelope({
|
||||
channel: "Telegram",
|
||||
from: conversationLabel,
|
||||
@@ -559,7 +574,7 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
}
|
||||
: undefined,
|
||||
groupSystemPrompt: isGroup || (!isGroup && groupConfig) ? groupSystemPrompt : undefined,
|
||||
untrustedContext: promptContext.length > 0 ? promptContext : undefined,
|
||||
untrustedContext: visiblePromptContext.length > 0 ? visiblePromptContext : undefined,
|
||||
},
|
||||
contextVisibility: contextVisibilityMode,
|
||||
extra: {
|
||||
|
||||
@@ -23,6 +23,7 @@ type BuildTelegramMessageContextForTestParams = {
|
||||
message: Record<string, unknown>;
|
||||
me?: Record<string, unknown>;
|
||||
allMedia?: TelegramMediaRef[];
|
||||
promptContext?: BuildTelegramMessageContextParams["promptContext"];
|
||||
options?: BuildTelegramMessageContextParams["options"];
|
||||
cfg?: Record<string, unknown>;
|
||||
accountId?: string;
|
||||
@@ -112,6 +113,7 @@ export async function buildTelegramMessageContextForTest(
|
||||
me: { id: 7, username: "bot", ...params.me },
|
||||
} as never,
|
||||
allMedia: params.allMedia ?? [],
|
||||
promptContext: params.promptContext ?? [],
|
||||
storeAllowFrom: [],
|
||||
options: params.options ?? {},
|
||||
bot: {
|
||||
|
||||
@@ -4445,12 +4445,12 @@ describe("createTelegramBot", () => {
|
||||
});
|
||||
|
||||
expect(sendMessageSpy.mock.calls.length).toBeGreaterThan(1);
|
||||
for (const [index, call] of sendMessageSpy.mock.calls.entries()) {
|
||||
for (const call of sendMessageSpy.mock.calls) {
|
||||
const params = call[2] as
|
||||
| { reply_to_message_id?: number; reply_parameters?: { message_id?: number } }
|
||||
| undefined;
|
||||
const actual = params?.reply_parameters?.message_id ?? params?.reply_to_message_id;
|
||||
if (mode === "all" || index === 0) {
|
||||
if (mode === "all") {
|
||||
expect(actual).toBe(messageId);
|
||||
} else {
|
||||
expect(actual).toBeUndefined();
|
||||
|
||||
@@ -326,36 +326,39 @@ async function sendTelegramVoiceFallbackText(opts: {
|
||||
silent?: boolean;
|
||||
replyMarkup?: ReturnType<typeof buildInlineKeyboard>;
|
||||
replyQuoteText?: string;
|
||||
replyToMode?: ReplyToMode;
|
||||
}): Promise<number | undefined> {
|
||||
let firstDeliveredMessageId: number | undefined;
|
||||
const chunks = filterEmptyTelegramTextChunks(opts.chunkText(opts.text));
|
||||
let appliedReplyTo = false;
|
||||
for (const chunk of chunks) {
|
||||
// Only apply reply reference, quote text, and buttons to the first chunk.
|
||||
const replyToForChunk = !appliedReplyTo ? opts.replyToId : undefined;
|
||||
const applyQuoteForChunk = !appliedReplyTo;
|
||||
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
|
||||
replyToMessageId: replyToForChunk,
|
||||
replyQuoteMessageId: applyQuoteForChunk ? opts.replyQuoteMessageId : undefined,
|
||||
replyQuoteText: applyQuoteForChunk ? opts.replyQuoteText : undefined,
|
||||
replyQuotePosition: applyQuoteForChunk ? opts.replyQuotePosition : undefined,
|
||||
replyQuoteEntities: applyQuoteForChunk ? opts.replyQuoteEntities : undefined,
|
||||
thread: opts.thread,
|
||||
textMode: chunk.textMode,
|
||||
plainText: chunk.plainText,
|
||||
richMessages: opts.richMessages,
|
||||
linkPreview: opts.linkPreview,
|
||||
tableMode: opts.tableMode,
|
||||
silent: opts.silent,
|
||||
replyMarkup: !appliedReplyTo ? opts.replyMarkup : undefined,
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = messageId;
|
||||
}
|
||||
if (replyToForChunk) {
|
||||
appliedReplyTo = true;
|
||||
}
|
||||
}
|
||||
await sendChunkedTelegramReplyText({
|
||||
chunks,
|
||||
progress: { hasReplied: false, hasDelivered: false },
|
||||
replyToId: opts.replyToId,
|
||||
replyToMode: opts.replyToMode ?? "first",
|
||||
replyMarkup: opts.replyMarkup,
|
||||
replyQuoteText: opts.replyQuoteText,
|
||||
quoteOnlyOnFirstChunk: true,
|
||||
sendChunk: async ({ chunk, replyToMessageId, replyMarkup, replyQuoteText }) => {
|
||||
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
|
||||
replyToMessageId,
|
||||
replyQuoteMessageId: replyToMessageId ? opts.replyQuoteMessageId : undefined,
|
||||
replyQuoteText,
|
||||
replyQuotePosition: replyToMessageId ? opts.replyQuotePosition : undefined,
|
||||
replyQuoteEntities: replyToMessageId ? opts.replyQuoteEntities : undefined,
|
||||
thread: opts.thread,
|
||||
textMode: chunk.textMode,
|
||||
plainText: chunk.plainText,
|
||||
richMessages: opts.richMessages,
|
||||
linkPreview: opts.linkPreview,
|
||||
tableMode: opts.tableMode,
|
||||
silent: opts.silent,
|
||||
replyMarkup,
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = messageId;
|
||||
}
|
||||
},
|
||||
});
|
||||
return firstDeliveredMessageId;
|
||||
}
|
||||
|
||||
@@ -535,6 +538,7 @@ async function deliverMediaReply(params: {
|
||||
silent: params.silent,
|
||||
replyMarkup: params.replyMarkup,
|
||||
replyQuoteText: params.replyQuoteText,
|
||||
replyToMode: params.replyToMode,
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = fallbackMessageId;
|
||||
|
||||
@@ -1484,7 +1484,7 @@ describe("deliverReplies", () => {
|
||||
expectRecordFields(mockCallArg(sendMessage, 0, 2), { disable_notification: true });
|
||||
});
|
||||
|
||||
it("voice fallback applies reply-to only on first chunk when replyToMode is first", async () => {
|
||||
it("voice fallback avoids native replies for chunked first-mode fallback text", async () => {
|
||||
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
|
||||
voiceError: createVoiceMessagesForbiddenError(),
|
||||
sendMessageResult: {
|
||||
@@ -1520,15 +1520,12 @@ describe("deliverReplies", () => {
|
||||
expect(sendVoice).toHaveBeenCalledTimes(1);
|
||||
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
|
||||
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
|
||||
reply_parameters: {
|
||||
message_id: 77,
|
||||
quote: "quoted context",
|
||||
allow_sending_without_reply: true,
|
||||
},
|
||||
reply_markup: {
|
||||
inline_keyboard: [[{ text: "Ack", callback_data: "ack" }]],
|
||||
},
|
||||
});
|
||||
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_to_message_id");
|
||||
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_parameters");
|
||||
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id", 77);
|
||||
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_parameters");
|
||||
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_markup");
|
||||
@@ -1555,7 +1552,32 @@ describe("deliverReplies", () => {
|
||||
expect(sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("replyToMode 'first' only applies reply-to to the first text chunk", async () => {
|
||||
it("replyToMode 'first' keeps native reply-to for a single text chunk", async () => {
|
||||
const runtime = createRuntime();
|
||||
const sendMessage = vi.fn().mockResolvedValue({
|
||||
message_id: 20,
|
||||
chat: { id: "123" },
|
||||
});
|
||||
const bot = createBot({ sendMessage });
|
||||
|
||||
await deliverReplies({
|
||||
replies: [{ text: "one chunk", replyToId: "700" }],
|
||||
chatId: "123",
|
||||
token: "tok",
|
||||
runtime,
|
||||
bot,
|
||||
replyToMode: "first",
|
||||
textLimit: 4000,
|
||||
});
|
||||
|
||||
expect(sendMessage).toHaveBeenCalledTimes(1);
|
||||
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
|
||||
reply_to_message_id: 700,
|
||||
allow_sending_without_reply: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("replyToMode 'first' avoids native reply-to for chunked text", async () => {
|
||||
const runtime = createRuntime();
|
||||
const sendMessage = vi.fn().mockResolvedValue({
|
||||
message_id: 20,
|
||||
@@ -1575,13 +1597,10 @@ describe("deliverReplies", () => {
|
||||
});
|
||||
|
||||
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
|
||||
// First chunk should have reply_to_message_id
|
||||
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
|
||||
reply_to_message_id: 700,
|
||||
allow_sending_without_reply: true,
|
||||
});
|
||||
// Second chunk should NOT have reply_to_message_id
|
||||
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id");
|
||||
for (const call of sendMessage.mock.calls) {
|
||||
expect(call[2]).not.toHaveProperty("reply_to_message_id");
|
||||
expect(call[2]).not.toHaveProperty("reply_parameters");
|
||||
}
|
||||
});
|
||||
|
||||
it("clamps reply chunks to Telegram rich message limit", async () => {
|
||||
|
||||
@@ -184,9 +184,9 @@ describe("buildTelegramThreadParams", () => {
|
||||
{ input: { id: 0, scope: "dm" as const }, expected: undefined },
|
||||
{ input: { id: -1, scope: "dm" as const }, expected: undefined },
|
||||
{ input: { id: 1.9, scope: "dm" as const }, expected: { message_thread_id: 1 } },
|
||||
// id=0 should be included for forum and none scopes (not falsy)
|
||||
// id=0 should be included for forum scope (not falsy).
|
||||
{ input: { id: 0, scope: "forum" as const }, expected: { message_thread_id: 0 } },
|
||||
{ input: { id: 0, scope: "none" as const }, expected: { message_thread_id: 0 } },
|
||||
{ input: { id: 42, scope: "none" as const }, expected: undefined },
|
||||
])("builds thread params", ({ input, expected }) => {
|
||||
expect(buildTelegramThreadParams(input)).toEqual(expected);
|
||||
});
|
||||
|
||||
@@ -427,6 +427,10 @@ export function buildTelegramThreadParams(thread?: TelegramThreadSpec | null) {
|
||||
return normalized > 0 ? { message_thread_id: normalized } : undefined;
|
||||
}
|
||||
|
||||
if (thread.scope === "none") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Telegram rejects message_thread_id=1 for General forum topic
|
||||
if (normalized === TELEGRAM_GENERAL_TOPIC_ID) {
|
||||
return undefined;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Telegram plugin module implements reply threading behavior.
|
||||
import type { ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
|
||||
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
|
||||
|
||||
export type DeliveryProgress = {
|
||||
hasReplied: boolean;
|
||||
@@ -48,17 +49,24 @@ export async function sendChunkedTelegramReplyText<
|
||||
}) => Promise<void>;
|
||||
}): Promise<void> {
|
||||
const applyDelivered = params.markDelivered ?? markDelivered;
|
||||
const suppressSingleUseReply =
|
||||
params.chunks.length > 1 && isSingleUseReplyToMode(params.replyToMode);
|
||||
for (let i = 0; i < params.chunks.length; i += 1) {
|
||||
const chunk = params.chunks[i];
|
||||
if (!chunk) {
|
||||
continue;
|
||||
}
|
||||
const isFirstChunk = i === 0;
|
||||
const replyToMessageId = resolveReplyToForSend({
|
||||
replyToId: params.replyToId,
|
||||
replyToMode: params.replyToMode,
|
||||
progress: params.progress,
|
||||
});
|
||||
// Telegram Desktop can render long formatted native-reply chunks as
|
||||
// unsupported messages. Multi-part `first` replies consume the reply target
|
||||
// without adding native reply params, preserving visible text.
|
||||
const replyToMessageId = suppressSingleUseReply
|
||||
? undefined
|
||||
: resolveReplyToForSend({
|
||||
replyToId: params.replyToId,
|
||||
replyToMode: params.replyToMode,
|
||||
progress: params.progress,
|
||||
});
|
||||
const shouldAttachQuote =
|
||||
Boolean(replyToMessageId) &&
|
||||
Boolean(params.replyQuoteText) &&
|
||||
@@ -70,7 +78,10 @@ export async function sendChunkedTelegramReplyText<
|
||||
replyMarkup: isFirstChunk ? params.replyMarkup : undefined,
|
||||
replyQuoteText: shouldAttachQuote ? params.replyQuoteText : undefined,
|
||||
});
|
||||
markReplyApplied(params.progress, replyToMessageId);
|
||||
markReplyApplied(
|
||||
params.progress,
|
||||
suppressSingleUseReply && isFirstChunk ? params.replyToId : replyToMessageId,
|
||||
);
|
||||
applyDelivered(params.progress);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,19 +82,37 @@ describe("telegram channel message adapter", () => {
|
||||
};
|
||||
|
||||
const provePayload = async () => {
|
||||
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-payload", chatId: "12345" });
|
||||
sendMessageTelegramMock.mockResolvedValueOnce({
|
||||
messageId: "tg-payload-2",
|
||||
chatId: "12345",
|
||||
receipt: {
|
||||
primaryPlatformMessageId: "tg-payload-1",
|
||||
platformMessageIds: ["tg-payload-1", "tg-payload-2"],
|
||||
parts: [
|
||||
{ platformMessageId: "tg-payload-1", kind: "text", index: 0 },
|
||||
{ platformMessageId: "tg-payload-2", kind: "text", index: 1 },
|
||||
],
|
||||
sentAt: 123,
|
||||
},
|
||||
});
|
||||
const result = await adapter.send!.payload!({
|
||||
cfg: {} as never,
|
||||
to: "12345",
|
||||
text: "payload",
|
||||
payload: { text: "payload" },
|
||||
replyToId: "900",
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
threadId: "12",
|
||||
deps: { sendTelegram: sendMessageTelegramMock },
|
||||
});
|
||||
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith("12345", "payload", {
|
||||
cfg: {},
|
||||
verbose: false,
|
||||
messageThreadId: undefined,
|
||||
replyToMessageId: undefined,
|
||||
messageThreadId: 12,
|
||||
replyToMessageId: 900,
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
accountId: undefined,
|
||||
silent: undefined,
|
||||
gatewayClientScopes: undefined,
|
||||
@@ -104,7 +122,8 @@ describe("telegram channel message adapter", () => {
|
||||
quoteText: undefined,
|
||||
buttons: undefined,
|
||||
});
|
||||
expect(result.receipt.platformMessageIds).toEqual(["tg-payload"]);
|
||||
expect(result.receipt.primaryPlatformMessageId).toBe("tg-payload-1");
|
||||
expect(result.receipt.platformMessageIds).toEqual(["tg-payload-1", "tg-payload-2"]);
|
||||
};
|
||||
|
||||
const proveReplyThreadSilent = async () => {
|
||||
@@ -114,6 +133,8 @@ describe("telegram channel message adapter", () => {
|
||||
to: "12345",
|
||||
text: "threaded",
|
||||
replyToId: "900",
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
threadId: "12",
|
||||
silent: true,
|
||||
deps: { sendTelegram: sendMessageTelegramMock },
|
||||
@@ -123,6 +144,8 @@ describe("telegram channel message adapter", () => {
|
||||
verbose: false,
|
||||
messageThreadId: 12,
|
||||
replyToMessageId: 900,
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
accountId: undefined,
|
||||
silent: true,
|
||||
gatewayClientScopes: undefined,
|
||||
@@ -138,6 +161,9 @@ describe("telegram channel message adapter", () => {
|
||||
cfg: {} as never,
|
||||
to: "12345",
|
||||
text: "batch",
|
||||
replyToId: "900",
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
payload: {
|
||||
text: "batch",
|
||||
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
|
||||
@@ -152,7 +178,9 @@ describe("telegram channel message adapter", () => {
|
||||
cfg: {},
|
||||
verbose: false,
|
||||
messageThreadId: undefined,
|
||||
replyToMessageId: undefined,
|
||||
replyToMessageId: 900,
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
accountId: undefined,
|
||||
silent: undefined,
|
||||
gatewayClientScopes: undefined,
|
||||
@@ -172,6 +200,8 @@ describe("telegram channel message adapter", () => {
|
||||
verbose: false,
|
||||
messageThreadId: undefined,
|
||||
replyToMessageId: undefined,
|
||||
replyToIdSource: undefined,
|
||||
replyToMode: undefined,
|
||||
accountId: undefined,
|
||||
silent: undefined,
|
||||
gatewayClientScopes: undefined,
|
||||
@@ -222,6 +252,36 @@ describe("telegram channel message adapter", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps implicit first replies on the first delivered payload media", async () => {
|
||||
const adapter = requireTelegramMessageAdapter();
|
||||
sendMessageTelegramMock
|
||||
.mockResolvedValueOnce({ messageId: "tg-media-1", chatId: "12345" })
|
||||
.mockResolvedValueOnce({ messageId: "tg-media-2", chatId: "12345" });
|
||||
|
||||
await adapter.send!.payload!({
|
||||
cfg: {} as never,
|
||||
to: "12345",
|
||||
text: "batch",
|
||||
replyToId: "900",
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
payload: {
|
||||
text: "batch",
|
||||
mediaUrls: ["", "https://example.com/a.png", "https://example.com/b.png"],
|
||||
},
|
||||
deps: { sendTelegram: sendMessageTelegramMock },
|
||||
});
|
||||
|
||||
const firstOpts = sendMessageTelegramMock.mock.calls[0]?.[2] as
|
||||
| { replyToMessageId?: number }
|
||||
| undefined;
|
||||
const secondOpts = sendMessageTelegramMock.mock.calls[1]?.[2] as
|
||||
| { replyToMessageId?: number }
|
||||
| undefined;
|
||||
expect(firstOpts?.replyToMessageId).toBe(900);
|
||||
expect(secondOpts?.replyToMessageId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("backs declared live preview finalizer capabilities with adapter proofs", async () => {
|
||||
const adapter = requireTelegramMessageAdapter();
|
||||
|
||||
|
||||
@@ -139,6 +139,16 @@ describe("isRecoverableTelegramNetworkError", () => {
|
||||
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "polling" })).toBe(true);
|
||||
});
|
||||
|
||||
it("treats delete/react (idempotent) contexts like polling, not send", () => {
|
||||
const undiciSnippetErr = new Error("Undici: socket failure");
|
||||
// delete and react are idempotent Telegram operations; a transient
|
||||
// snippet-only error must be retried (allowMessageMatch defaults true),
|
||||
// matching polling/webhook. send stays strict as the regression guard.
|
||||
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "delete" })).toBe(true);
|
||||
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "react" })).toBe(true);
|
||||
expect(isRecoverableTelegramNetworkError(undiciSnippetErr, { context: "send" })).toBe(false);
|
||||
});
|
||||
|
||||
it("treats grammY failed-after envelope errors as recoverable in send context", () => {
|
||||
expect(
|
||||
isRecoverableTelegramNetworkError(
|
||||
|
||||
@@ -141,7 +141,13 @@ export function isTelegramMisdirectedRequestError(err: unknown): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown";
|
||||
export type TelegramNetworkErrorContext =
|
||||
| "polling"
|
||||
| "send"
|
||||
| "webhook"
|
||||
| "delete"
|
||||
| "react"
|
||||
| "unknown";
|
||||
export type TelegramNetworkErrorOrigin = {
|
||||
method?: string | null;
|
||||
url?: string | null;
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
resolvePayloadMediaUrls,
|
||||
sendPayloadMediaSequenceOrFallback,
|
||||
} from "openclaw/plugin-sdk/reply-payload";
|
||||
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
|
||||
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-chunking";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
@@ -60,6 +61,8 @@ async function resolveTelegramSendContext(params: {
|
||||
deps?: OutboundSendDeps;
|
||||
accountId?: string | null;
|
||||
replyToId?: string | null;
|
||||
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
|
||||
replyToMode?: TelegramSendOpts["replyToMode"];
|
||||
threadId?: string | number | null;
|
||||
formatting?: OutboundDeliveryFormattingOptions;
|
||||
silent?: boolean;
|
||||
@@ -74,6 +77,8 @@ async function resolveTelegramSendContext(params: {
|
||||
tableMode?: OutboundDeliveryFormattingOptions["tableMode"];
|
||||
messageThreadId?: number;
|
||||
replyToMessageId?: number;
|
||||
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
|
||||
replyToMode?: TelegramSendOpts["replyToMode"];
|
||||
accountId?: string;
|
||||
silent?: boolean;
|
||||
gatewayClientScopes?: readonly string[];
|
||||
@@ -87,6 +92,8 @@ async function resolveTelegramSendContext(params: {
|
||||
cfg: params.cfg,
|
||||
messageThreadId: parseTelegramThreadId(params.threadId),
|
||||
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
|
||||
...(params.replyToIdSource !== undefined ? { replyToIdSource: params.replyToIdSource } : {}),
|
||||
...(params.replyToMode !== undefined ? { replyToMode: params.replyToMode } : {}),
|
||||
accountId: params.accountId ?? undefined,
|
||||
silent: params.silent,
|
||||
gatewayClientScopes: params.gatewayClientScopes,
|
||||
@@ -151,6 +158,19 @@ export async function sendTelegramPayloadMessages(params: {
|
||||
quoteText,
|
||||
...(params.payload.audioAsVoice === true ? { asVoice: true } : {}),
|
||||
};
|
||||
const shouldConsumeImplicitReplyTarget =
|
||||
payloadOpts.replyToIdSource === "implicit" &&
|
||||
payloadOpts.replyToMode !== undefined &&
|
||||
isSingleUseReplyToMode(payloadOpts.replyToMode);
|
||||
const consumedImplicitReplyPayloadOpts = shouldConsumeImplicitReplyTarget
|
||||
? {
|
||||
...payloadOpts,
|
||||
replyToMessageId: undefined,
|
||||
replyToIdSource: undefined,
|
||||
replyToMode: undefined,
|
||||
}
|
||||
: payloadOpts;
|
||||
let implicitReplyTargetAvailable = true;
|
||||
if (reactionEmoji) {
|
||||
if (typeof replyToMessageId !== "number") {
|
||||
throw new Error("Telegram reaction requires a reply target");
|
||||
@@ -179,12 +199,18 @@ export async function sendTelegramPayloadMessages(params: {
|
||||
...payloadOpts,
|
||||
buttons,
|
||||
}),
|
||||
send: async ({ text: textLocal, mediaUrl, isFirst }) =>
|
||||
await params.send(params.to, textLocal, {
|
||||
...payloadOpts,
|
||||
send: async ({ text: textLocal, mediaUrl, isFirst }) => {
|
||||
const mediaPayloadOpts =
|
||||
shouldConsumeImplicitReplyTarget && !implicitReplyTargetAvailable
|
||||
? consumedImplicitReplyPayloadOpts
|
||||
: payloadOpts;
|
||||
implicitReplyTargetAvailable = false;
|
||||
return await params.send(params.to, textLocal, {
|
||||
...mediaPayloadOpts,
|
||||
mediaUrl,
|
||||
...(isFirst ? { buttons } : {}),
|
||||
}),
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ let isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess: typeof import("./telegr
|
||||
let listTelegramSpooledUpdateClaims: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdateClaims;
|
||||
let listTelegramSpooledUpdates: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdates;
|
||||
let recoverStaleTelegramSpooledUpdateClaims: typeof import("./telegram-ingress-spool.js").recoverStaleTelegramSpooledUpdateClaims;
|
||||
let telegramSpooledUpdateClaimLeaseMs: typeof import("./telegram-ingress-spool.js").TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS;
|
||||
let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate;
|
||||
let createTelegramSpooledReplayDeferredParticipant: typeof import("./bot-processing-outcome.js").createTelegramSpooledReplayDeferredParticipant;
|
||||
let TelegramMessageDispatchReplayForgetError: typeof import("./message-dispatch-dedupe.js").TelegramMessageDispatchReplayForgetError;
|
||||
@@ -685,6 +686,7 @@ describe("TelegramPollingSession", () => {
|
||||
listTelegramSpooledUpdateClaims,
|
||||
listTelegramSpooledUpdates,
|
||||
recoverStaleTelegramSpooledUpdateClaims,
|
||||
TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS: telegramSpooledUpdateClaimLeaseMs,
|
||||
writeTelegramSpooledUpdate,
|
||||
} = await import("./telegram-ingress-spool.js"));
|
||||
({ createTelegramSpooledReplayDeferredParticipant } =
|
||||
@@ -1067,13 +1069,13 @@ describe("TelegramPollingSession", () => {
|
||||
const queue = createChannelIngressQueue({ ...options, channelId: "telegram" });
|
||||
return {
|
||||
...queue,
|
||||
claim: async (...args: Parameters<typeof queue.claim>) => {
|
||||
if (args[0] === "0000000000000001" && !blockedFirstClaim) {
|
||||
claimNext: async (...args: Parameters<typeof queue.claimNext>) => {
|
||||
if (!blockedFirstClaim) {
|
||||
blockedFirstClaim = true;
|
||||
firstClaimStarted?.();
|
||||
await firstClaimGate;
|
||||
}
|
||||
return queue.claim(...args);
|
||||
return queue.claimNext(...args);
|
||||
},
|
||||
};
|
||||
},
|
||||
@@ -1661,6 +1663,86 @@ describe("TelegramPollingSession", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("stops refreshing a claim when the drain loop is stalled", async () => {
|
||||
vi.useFakeTimers({ now: 1_000 });
|
||||
const refreshHarness = installSpooledClaimRefreshHarness();
|
||||
await withTempSpool(async (tempDir) => {
|
||||
let blockedSecondClaim = false;
|
||||
let releaseSecondClaim: (() => void) | undefined;
|
||||
const secondClaimStarted = new Promise<void>((resolve) => {
|
||||
const gate = new Promise<void>((release) => {
|
||||
releaseSecondClaim = release;
|
||||
});
|
||||
setTelegramRuntime({
|
||||
state: {
|
||||
resolveStateDir: () => tempDir,
|
||||
openChannelIngressQueue: (
|
||||
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
|
||||
) => {
|
||||
const queue = createChannelIngressQueue({ ...options, channelId: "telegram" });
|
||||
return {
|
||||
...queue,
|
||||
claimNext: async (...args: Parameters<typeof queue.claimNext>) => {
|
||||
const claimOptions = args[0];
|
||||
const blockedLaneKeys = claimOptions?.blockedLaneKeys
|
||||
? Array.from(claimOptions.blockedLaneKeys)
|
||||
: [];
|
||||
const candidateIds = claimOptions?.candidateIds
|
||||
? Array.from(claimOptions.candidateIds)
|
||||
: [];
|
||||
if (
|
||||
candidateIds.includes("0000000000000043") &&
|
||||
blockedLaneKeys.length > 0 &&
|
||||
!blockedSecondClaim
|
||||
) {
|
||||
blockedSecondClaim = true;
|
||||
resolve();
|
||||
await gate;
|
||||
}
|
||||
return queue.claimNext(...args);
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
} as TelegramRuntime);
|
||||
});
|
||||
const abort = new AbortController();
|
||||
let releaseHandler: (() => void) | undefined;
|
||||
const handlerDone = new Promise<void>((resolve) => {
|
||||
releaseHandler = resolve;
|
||||
});
|
||||
await writeSpooledTestUpdates(tempDir, [
|
||||
topicUpdate(42, 10, "first topic 10 turn"),
|
||||
topicUpdate(43, 11, "blocked topic 11 turn"),
|
||||
]);
|
||||
|
||||
const { runPromise, stopWorker } = startIsolatedIngressSession({
|
||||
abort,
|
||||
spoolDir: tempDir,
|
||||
handleUpdate: async () => {
|
||||
await handlerDone;
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
await secondClaimStarted;
|
||||
const before = await claimedAtForUpdate(tempDir, 42);
|
||||
vi.setSystemTime(1_000 + pollingSessionTesting.spooledClaimRefreshIntervalMs * 2 + 1);
|
||||
refreshHarness.triggerRefresh();
|
||||
await Promise.resolve();
|
||||
expect(await claimedAtForUpdate(tempDir, 42)).toBe(before);
|
||||
} finally {
|
||||
releaseSecondClaim?.();
|
||||
releaseHandler?.();
|
||||
abort.abort();
|
||||
stopWorker();
|
||||
refreshHarness.restore();
|
||||
vi.useRealTimers();
|
||||
await runPromise;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("holds buffered spooled claims until deferred processing settles without blocking same-lane buffering", async () => {
|
||||
await withTempSpool(async (tempDir) => {
|
||||
const abort = new AbortController();
|
||||
@@ -2180,10 +2262,11 @@ describe("TelegramPollingSession", () => {
|
||||
if (!claimed) {
|
||||
throw new Error("Expected claimed update");
|
||||
}
|
||||
const liveOwnerPid = process.ppid > 0 ? process.ppid : 1;
|
||||
await adoptClaimOwner({
|
||||
spoolDir: tempDir,
|
||||
updateId: 42,
|
||||
ownerId: `${process.pid}:other-process`,
|
||||
ownerId: `${liveOwnerPid}:other-process`,
|
||||
claimedAt: Date.now(),
|
||||
});
|
||||
|
||||
@@ -2203,10 +2286,9 @@ describe("TelegramPollingSession", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("fails timed-out current-process claims before draining later same-lane updates", async () => {
|
||||
it("releases pid-reused claims before draining later same-lane updates", async () => {
|
||||
await withTempSpool(async (tempDir) => {
|
||||
const abort = new AbortController();
|
||||
const log = vi.fn();
|
||||
const events: string[] = [];
|
||||
await writeSpooledTestUpdates(tempDir, [
|
||||
topicUpdate(42, 10, "wedged topic 10 turn"),
|
||||
@@ -2232,7 +2314,6 @@ describe("TelegramPollingSession", () => {
|
||||
const { runPromise, stopWorker } = startIsolatedIngressSession({
|
||||
abort,
|
||||
spoolDir: tempDir,
|
||||
log,
|
||||
spooledUpdateHandlerTimeoutMs: 100,
|
||||
handleUpdate: async (update) => {
|
||||
events.push(`handled:${update.update_id}`);
|
||||
@@ -2240,21 +2321,58 @@ describe("TelegramPollingSession", () => {
|
||||
},
|
||||
});
|
||||
|
||||
await vi.waitFor(() => expect(events).toEqual(["handled:43"]));
|
||||
await vi.waitFor(() => expect(events).toEqual(["handled:42"]));
|
||||
await runPromise;
|
||||
expect(await failedUpdateReasons(tempDir)).toEqual([
|
||||
{ id: 42, reason: "lane-released-on-stuck" },
|
||||
]);
|
||||
expect(await pendingUpdateIds(tempDir, "all")).toEqual([]);
|
||||
expect(await failedUpdateReasons(tempDir)).toEqual([]);
|
||||
expect(await pendingUpdateIds(tempDir, "all")).toEqual([43]);
|
||||
expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]);
|
||||
expectLogIncludes(
|
||||
log,
|
||||
"spooled update 42 Telegram spooled update claim owned by this process",
|
||||
);
|
||||
stopWorker();
|
||||
});
|
||||
});
|
||||
|
||||
it("reclaims an expired foreign claim so the lane can drain", async () => {
|
||||
await withTempSpool(async (tempDir) => {
|
||||
const abort = new AbortController();
|
||||
const events: number[] = [];
|
||||
await writeSpooledTestUpdates(tempDir, [
|
||||
topicUpdate(42, 10, "expired foreign claim"),
|
||||
topicUpdate(43, 10, "later topic 10 turn"),
|
||||
]);
|
||||
const interrupted = (await listTelegramSpooledUpdates({ spoolDir: tempDir })).find(
|
||||
(update) => update.updateId === 42,
|
||||
);
|
||||
if (!interrupted) {
|
||||
throw new Error("Expected interrupted update");
|
||||
}
|
||||
const claimed = await claimTelegramSpooledUpdate(interrupted);
|
||||
if (!claimed) {
|
||||
throw new Error("Expected claimed update");
|
||||
}
|
||||
await adoptClaimOwner({
|
||||
spoolDir: tempDir,
|
||||
updateId: 42,
|
||||
ownerId: "1:other-process",
|
||||
claimedAt: Date.now() - telegramSpooledUpdateClaimLeaseMs - 1,
|
||||
});
|
||||
|
||||
const { runPromise, stopWorker } = startIsolatedIngressSession({
|
||||
abort,
|
||||
spoolDir: tempDir,
|
||||
spooledUpdateHandlerTimeoutMs: 100,
|
||||
handleUpdate: async (update) => {
|
||||
events.push(update.update_id ?? -1);
|
||||
if (events.length === 2) {
|
||||
abort.abort();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await vi.waitFor(() => expect(events).toEqual([42, 43]));
|
||||
stopWorker();
|
||||
await runPromise;
|
||||
});
|
||||
});
|
||||
|
||||
it("scans past active-lane backlogs to start unrelated lanes", async () => {
|
||||
await withTempSpool(async (tempDir) => {
|
||||
const abort = new AbortController();
|
||||
|
||||
@@ -34,7 +34,7 @@ import { TelegramPollingTransportState } from "./polling-transport-state.js";
|
||||
import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js";
|
||||
import { getTelegramSequentialKey } from "./sequential-key.js";
|
||||
import {
|
||||
claimTelegramSpooledUpdate,
|
||||
claimNextTelegramSpooledUpdate,
|
||||
deleteTelegramSpooledUpdate,
|
||||
failTelegramSpooledUpdateClaim,
|
||||
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess,
|
||||
@@ -44,6 +44,7 @@ import {
|
||||
refreshTelegramSpooledUpdateClaim,
|
||||
releaseTelegramSpooledUpdateClaim,
|
||||
resolveTelegramIngressSpoolDir,
|
||||
TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS,
|
||||
writeTelegramSpooledUpdate,
|
||||
type ClaimedTelegramSpooledUpdate,
|
||||
type TelegramSpooledUpdate,
|
||||
@@ -133,6 +134,7 @@ const TELEGRAM_SPOOLED_HANDLER_TIMEOUT_ENV = "OPENCLAW_TELEGRAM_SPOOLED_HANDLER_
|
||||
const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100;
|
||||
const TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT = TELEGRAM_SPOOLED_DRAIN_START_LIMIT * 10;
|
||||
const TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS = 5 * 60 * 1000;
|
||||
const TELEGRAM_SPOOLED_CLAIM_HEALTH_GRACE_MS = 2 * TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS;
|
||||
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_BASE_MS = 5_000;
|
||||
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_MAX_MS = 60_000;
|
||||
const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil(
|
||||
@@ -324,6 +326,21 @@ type SpooledUpdateDrainResult = {
|
||||
// Account health restarts create a new session in the same process while an old
|
||||
// spooled handler may still be running after shutdown grace.
|
||||
const activeSpooledUpdateHandlersByLane = new Map<string, SpooledUpdateHandlerState>();
|
||||
type SpooledUpdateDrainHealth = {
|
||||
lastCompletedAt: number;
|
||||
};
|
||||
|
||||
const spooledUpdateDrainHealthBySpool = new Map<string, SpooledUpdateDrainHealth>();
|
||||
|
||||
function getSpooledUpdateDrainHealth(spoolDir: string): SpooledUpdateDrainHealth {
|
||||
const existing = spooledUpdateDrainHealthBySpool.get(spoolDir);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const created = { lastCompletedAt: Date.now() };
|
||||
spooledUpdateDrainHealthBySpool.set(spoolDir, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
function resolveSpooledUpdateHandlerTimeoutMs(params: {
|
||||
configured?: number;
|
||||
@@ -564,32 +581,51 @@ export class TelegramPollingSession {
|
||||
}
|
||||
}
|
||||
|
||||
async #claimSpooledUpdate(
|
||||
update: TelegramSpooledUpdate,
|
||||
): Promise<ClaimedTelegramSpooledUpdate | null> {
|
||||
async #claimNextSpooledUpdate(params: {
|
||||
blockedLaneKeys: Set<string>;
|
||||
candidateUpdateIds: readonly number[];
|
||||
spoolDir: string;
|
||||
}): Promise<ClaimedTelegramSpooledUpdate | null> {
|
||||
try {
|
||||
return await claimTelegramSpooledUpdate(update);
|
||||
return await claimNextTelegramSpooledUpdate({
|
||||
spoolDir: params.spoolDir,
|
||||
blockedLaneKeys: params.blockedLaneKeys,
|
||||
botInfo: this.opts.botInfo,
|
||||
candidateUpdateIds: params.candidateUpdateIds,
|
||||
scanLimit: TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT,
|
||||
});
|
||||
} catch (err) {
|
||||
this.opts.log(
|
||||
`[telegram][diag] spooled update ${update.updateId} claim failed; keeping for retry: ${formatErrorMessage(err)}`,
|
||||
`[telegram][diag] spooled update claim failed; keeping pending updates for retry: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
#startSpooledUpdateClaimRefresh(update: ClaimedTelegramSpooledUpdate): () => void {
|
||||
// Refresh only while this process still owns useful work for this claim token.
|
||||
// Stopping before release/fail/delete lets stale recovery take over if work stalls.
|
||||
#startSpooledUpdateClaimRefresh(
|
||||
update: ClaimedTelegramSpooledUpdate,
|
||||
isDrainHealthy: () => boolean,
|
||||
onDrainUnhealthy: () => void,
|
||||
): () => void {
|
||||
// Refresh only while this process owns useful work and its drain loop is making progress.
|
||||
// Stopping the lease on a stalled drain lets another process recover the lane.
|
||||
let stopped = false;
|
||||
let refreshing = false;
|
||||
const refresh = async (): Promise<void> => {
|
||||
if (stopped || refreshing) {
|
||||
return;
|
||||
}
|
||||
if (!isDrainHealthy()) {
|
||||
onDrainUnhealthy();
|
||||
stopped = true;
|
||||
clearInterval(timer);
|
||||
return;
|
||||
}
|
||||
refreshing = true;
|
||||
try {
|
||||
const refreshed = await refreshTelegramSpooledUpdateClaim(update);
|
||||
if (!refreshed && !stopped) {
|
||||
onDrainUnhealthy();
|
||||
stopped = true;
|
||||
clearInterval(timer);
|
||||
}
|
||||
@@ -597,6 +633,11 @@ export class TelegramPollingSession {
|
||||
this.opts.log(
|
||||
`[telegram][diag] spooled update ${update.updateId} claim refresh failed: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
if (!stopped) {
|
||||
onDrainUnhealthy();
|
||||
stopped = true;
|
||||
clearInterval(timer);
|
||||
}
|
||||
} finally {
|
||||
refreshing = false;
|
||||
}
|
||||
@@ -730,58 +771,6 @@ export class TelegramPollingSession {
|
||||
return deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(update));
|
||||
}
|
||||
|
||||
async #failTimedOutCurrentProcessSpooledUpdateClaims(params: {
|
||||
activeLaneKeys: Set<string>;
|
||||
spoolDir: string;
|
||||
}): Promise<void> {
|
||||
const claims = await listTelegramSpooledUpdateClaims({ spoolDir: params.spoolDir });
|
||||
const now = Date.now();
|
||||
for (const claim of claims) {
|
||||
const claimOwner = claim.claim;
|
||||
if (!claimOwner) {
|
||||
continue;
|
||||
}
|
||||
if (this.#isDeferredSpooledUpdateClaim(claim)) {
|
||||
continue;
|
||||
}
|
||||
if (params.activeLaneKeys.has(this.#spooledUpdateLaneKey(claim))) {
|
||||
continue;
|
||||
}
|
||||
if (now - claimOwner.claimedAt < this.#spooledUpdateHandlerTimeoutMs) {
|
||||
continue;
|
||||
}
|
||||
if (!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim)) {
|
||||
continue;
|
||||
}
|
||||
// Same PID with a stale owner id means this process orphaned a previous
|
||||
// local handler state; different live PIDs may still be processing.
|
||||
if (claimOwner.processPid !== process.pid) {
|
||||
continue;
|
||||
}
|
||||
const claimedForMs = now - claimOwner.claimedAt;
|
||||
const message = `Telegram spooled update claim owned by this process for ${formatDurationPrecise(claimedForMs)} without active handler state; marking failed so the lane can continue.`;
|
||||
try {
|
||||
const failed = await failTelegramSpooledUpdateClaim({
|
||||
update: claim,
|
||||
reason: "lane-released-on-stuck",
|
||||
message,
|
||||
});
|
||||
if (!failed) {
|
||||
this.opts.log(
|
||||
`[telegram][diag] spooled update ${claim.updateId} current-process claim no longer had a processing marker to fail.`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
} catch (err) {
|
||||
this.opts.log(
|
||||
`[telegram][diag] spooled update ${claim.updateId} current-process claim could not be marked failed: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
this.opts.log(`[telegram][diag] spooled update ${claim.updateId} ${message}`);
|
||||
}
|
||||
}
|
||||
|
||||
async #failTimedOutDeferredSpooledUpdate(state: DeferredSpooledUpdateClaimState): Promise<void> {
|
||||
const message =
|
||||
state.timedOutMessage ??
|
||||
@@ -875,8 +864,12 @@ export class TelegramPollingSession {
|
||||
}
|
||||
|
||||
#spooledUpdateLaneKey(update: TelegramSpooledUpdate): string {
|
||||
return this.#rawSpooledUpdateLaneKey(update.update);
|
||||
}
|
||||
|
||||
#rawSpooledUpdateLaneKey(update: unknown): string {
|
||||
return getTelegramSequentialKey({
|
||||
update: update.update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
|
||||
update: update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
|
||||
...(this.opts.botInfo ? { me: this.opts.botInfo } : {}),
|
||||
});
|
||||
}
|
||||
@@ -904,20 +897,19 @@ export class TelegramPollingSession {
|
||||
|
||||
async #drainSpooledUpdates(params: {
|
||||
bot: TelegramBot;
|
||||
isDrainHealthy: () => boolean;
|
||||
spoolDir: string;
|
||||
}): Promise<SpooledUpdateDrainResult> {
|
||||
const activeLaneKeys = this.#activeSpooledUpdateLaneKeysForSpool(params.spoolDir);
|
||||
await this.#failTimedOutCurrentProcessSpooledUpdateClaims({
|
||||
activeLaneKeys,
|
||||
spoolDir: params.spoolDir,
|
||||
});
|
||||
await recoverStaleTelegramSpooledUpdateClaims({
|
||||
spoolDir: params.spoolDir,
|
||||
staleMs: 0,
|
||||
shouldRecover: (claim) =>
|
||||
!this.#isDeferredSpooledUpdateClaim(claim) &&
|
||||
!activeLaneKeys.has(this.#spooledUpdateLaneKey(claim)) &&
|
||||
!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim),
|
||||
!isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(claim, {
|
||||
maxAgeMs: TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS,
|
||||
}),
|
||||
});
|
||||
const claimedLaneKeys = new Set(
|
||||
(
|
||||
@@ -932,31 +924,63 @@ export class TelegramPollingSession {
|
||||
spoolDir: params.spoolDir,
|
||||
limit: TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT,
|
||||
});
|
||||
const candidateUpdateIds = updates.map((update) => update.updateId);
|
||||
const blockedByLane = new Set<string>();
|
||||
let started = 0;
|
||||
const retryDelayedLaneKeys = new Set<string>();
|
||||
for (const update of updates) {
|
||||
const laneKey = this.#spooledUpdateLaneKey(update);
|
||||
if (this.opts.abortSignal?.aborted) {
|
||||
break;
|
||||
}
|
||||
if (resolveSpooledUpdateRetryDelayMs(update) > 0) {
|
||||
claimedLaneKeys.add(laneKey);
|
||||
continue;
|
||||
}
|
||||
const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey });
|
||||
if (activeSpooledUpdateHandlersByLane.has(handlerKey)) {
|
||||
blockedByLane.add(handlerKey);
|
||||
continue;
|
||||
}
|
||||
if (claimedLaneKeys.has(laneKey)) {
|
||||
continue;
|
||||
if (resolveSpooledUpdateRetryDelayMs(update) > 0) {
|
||||
retryDelayedLaneKeys.add(laneKey);
|
||||
}
|
||||
const claimedUpdate = await this.#claimSpooledUpdate(update);
|
||||
}
|
||||
const blockedLaneKeys = new Set([
|
||||
...activeLaneKeys,
|
||||
...claimedLaneKeys,
|
||||
...retryDelayedLaneKeys,
|
||||
]);
|
||||
let started = 0;
|
||||
while (started < TELEGRAM_SPOOLED_DRAIN_START_LIMIT) {
|
||||
if (this.opts.abortSignal?.aborted) {
|
||||
break;
|
||||
}
|
||||
const claimedUpdate = await this.#claimNextSpooledUpdate({
|
||||
blockedLaneKeys,
|
||||
candidateUpdateIds,
|
||||
spoolDir: params.spoolDir,
|
||||
});
|
||||
if (!claimedUpdate) {
|
||||
claimedLaneKeys.add(laneKey);
|
||||
break;
|
||||
}
|
||||
const laneKey = this.#spooledUpdateLaneKey(claimedUpdate);
|
||||
const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey });
|
||||
if (activeSpooledUpdateHandlersByLane.has(handlerKey)) {
|
||||
blockedByLane.add(handlerKey);
|
||||
await releaseTelegramSpooledUpdateClaim(claimedUpdate, {
|
||||
lastError: "active Telegram spool handler already owns this lane",
|
||||
});
|
||||
blockedLaneKeys.add(laneKey);
|
||||
continue;
|
||||
}
|
||||
const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(claimedUpdate);
|
||||
const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(
|
||||
claimedUpdate,
|
||||
params.isDrainHealthy,
|
||||
() => {
|
||||
const scopedReplyFenceLaneKey = buildTelegramReplyFenceLaneKey({
|
||||
accountId: this.opts.accountId,
|
||||
sequentialKey: laneKey,
|
||||
});
|
||||
const abortedReplyWork = supersedeTelegramReplyFenceLane(scopedReplyFenceLaneKey);
|
||||
if (!abortedReplyWork) {
|
||||
this.opts.log(
|
||||
`[telegram][diag] spooled update ${claimedUpdate.updateId} drain heartbeat expired without an active reply fence on lane ${laneKey}; stopping claim refresh.`,
|
||||
);
|
||||
}
|
||||
},
|
||||
);
|
||||
const handler = this.#handleClaimedSpooledUpdate({
|
||||
bot: params.bot,
|
||||
stopClaimRefresh,
|
||||
@@ -967,13 +991,13 @@ export class TelegramPollingSession {
|
||||
laneKey,
|
||||
task: handler,
|
||||
update: claimedUpdate,
|
||||
updateId: update.updateId,
|
||||
updateId: claimedUpdate.updateId,
|
||||
startedAt: Date.now(),
|
||||
stopClaimRefresh,
|
||||
};
|
||||
activeSpooledUpdateHandlersByLane.set(handlerKey, state);
|
||||
this.#spooledUpdateHandlerKeys.add(handlerKey);
|
||||
claimedLaneKeys.add(laneKey);
|
||||
blockedLaneKeys.add(laneKey);
|
||||
void handler.finally(() => {
|
||||
if (
|
||||
!deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(claimedUpdate))
|
||||
@@ -986,9 +1010,6 @@ export class TelegramPollingSession {
|
||||
this.#spooledUpdateHandlerKeys.delete(handlerKey);
|
||||
});
|
||||
started += 1;
|
||||
if (started >= TELEGRAM_SPOOLED_DRAIN_START_LIMIT) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return { blockedByLane, started };
|
||||
}
|
||||
@@ -1216,6 +1237,7 @@ export class TelegramPollingSession {
|
||||
void writeTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
update: message.update,
|
||||
laneKey: this.#rawSpooledUpdateLaneKey(message.update),
|
||||
}).then(
|
||||
(updateId) => {
|
||||
ackSpooledUpdate(message.requestId, { ok: true, updateId });
|
||||
@@ -1241,6 +1263,11 @@ export class TelegramPollingSession {
|
||||
this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500));
|
||||
let drainActive = false;
|
||||
const drainHealth = getSpooledUpdateDrainHealth(spoolDir);
|
||||
// Fail closed when the spool stops making progress: keeping any claim live would
|
||||
// prevent a healthy process from recovering a wedged drain.
|
||||
const isDrainHealthy = () =>
|
||||
Date.now() - drainHealth.lastCompletedAt <= TELEGRAM_SPOOLED_CLAIM_HEALTH_GRACE_MS;
|
||||
const stopBot = () => {
|
||||
return Promise.resolve(bot.stop())
|
||||
.then(() => undefined)
|
||||
@@ -1282,8 +1309,13 @@ export class TelegramPollingSession {
|
||||
}
|
||||
drainActive = true;
|
||||
drainRequested = false;
|
||||
let drainCompleted = false;
|
||||
try {
|
||||
const drain = await this.#drainSpooledUpdates({ bot, spoolDir });
|
||||
const drain = await this.#drainSpooledUpdates({
|
||||
bot,
|
||||
isDrainHealthy,
|
||||
spoolDir,
|
||||
});
|
||||
consecutiveDrainFailures = 0;
|
||||
for (const handlerKey of stalledBacklogKeys) {
|
||||
if (
|
||||
@@ -1320,12 +1352,16 @@ export class TelegramPollingSession {
|
||||
} else if (timedOutRecovery) {
|
||||
stalledBacklogKeys.add(timedOutRecovery.handlerKey);
|
||||
}
|
||||
drainCompleted = true;
|
||||
} catch (err) {
|
||||
consecutiveDrainFailures += 1;
|
||||
this.opts.log(
|
||||
`[telegram][diag] isolated polling spool drain failed (${consecutiveDrainFailures}): ${formatErrorMessage(err)}`,
|
||||
);
|
||||
} finally {
|
||||
if (drainCompleted) {
|
||||
drainHealth.lastCompletedAt = Date.now();
|
||||
}
|
||||
drainActive = false;
|
||||
if (drainRequested && !restartRequested && !this.opts.abortSignal?.aborted) {
|
||||
drainRequested = false;
|
||||
@@ -1640,6 +1676,7 @@ const isGetUpdatesConflict = (err: unknown) => {
|
||||
export const testing = {
|
||||
resetActiveSpooledUpdateHandlersForTests: (): void => {
|
||||
activeSpooledUpdateHandlersByLane.clear();
|
||||
spooledUpdateDrainHealthBySpool.clear();
|
||||
},
|
||||
createTelegramRestartBackoffState,
|
||||
resetTelegramRestartBackoffState,
|
||||
|
||||
@@ -1650,6 +1650,49 @@ describe("sendMessageTelegram", () => {
|
||||
expect(res.messageId).toBe("71");
|
||||
});
|
||||
|
||||
it("does not reuse first-mode reply-to on media caption follow-up text", async () => {
|
||||
const chatId = "123";
|
||||
const longText = "A".repeat(1100);
|
||||
|
||||
const sendPhoto = vi.fn().mockResolvedValue({
|
||||
message_id: 70,
|
||||
chat: { id: chatId },
|
||||
});
|
||||
const sendMessage = vi.fn().mockResolvedValue({
|
||||
message_id: 71,
|
||||
chat: { id: chatId },
|
||||
});
|
||||
const api = { sendPhoto, sendMessage } as unknown as {
|
||||
sendPhoto: typeof sendPhoto;
|
||||
sendMessage: typeof sendMessage;
|
||||
};
|
||||
|
||||
mockLoadedMedia({
|
||||
buffer: Buffer.from("fake-image"),
|
||||
contentType: "image/jpeg",
|
||||
fileName: "photo.jpg",
|
||||
});
|
||||
|
||||
await sendMessageTelegram(chatId, longText, {
|
||||
cfg: TELEGRAM_TEST_CFG,
|
||||
token: "tok",
|
||||
api,
|
||||
mediaUrl: "https://example.com/photo.jpg",
|
||||
replyToMessageId: 500,
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
});
|
||||
|
||||
expectMediaSendCall(firstMockCall(sendPhoto, "send photo call"), "send photo call", chatId, {
|
||||
caption: undefined,
|
||||
reply_to_message_id: 500,
|
||||
allow_sending_without_reply: true,
|
||||
});
|
||||
expect(sendMessage).toHaveBeenCalledWith(chatId, longText, {
|
||||
parse_mode: "HTML",
|
||||
});
|
||||
});
|
||||
|
||||
it("chunks long default markdown media follow-up text", async () => {
|
||||
const chatId = "123";
|
||||
const longText = `**${"A".repeat(5000)}**`;
|
||||
@@ -1658,7 +1701,10 @@ describe("sendMessageTelegram", () => {
|
||||
message_id: 72,
|
||||
chat: { id: chatId },
|
||||
});
|
||||
const sendMessage = vi.fn().mockResolvedValue({ message_id: 74, chat: { id: chatId } });
|
||||
const sendMessage = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ message_id: 73, chat: { id: chatId } })
|
||||
.mockResolvedValueOnce({ message_id: 74, chat: { id: chatId } });
|
||||
const api = { sendPhoto, sendMessage } as unknown as {
|
||||
sendPhoto: typeof sendPhoto;
|
||||
sendMessage: typeof sendMessage;
|
||||
@@ -1684,6 +1730,9 @@ describe("sendMessageTelegram", () => {
|
||||
expect(sendMessage.mock.calls.every((call) => call[2]?.parse_mode === "HTML")).toBe(true);
|
||||
expect(sendMessage.mock.calls.map((call) => String(call[1] ?? "")).join("")).toContain("A");
|
||||
expect(res.messageId).toBe("74");
|
||||
expect(res.receipt?.primaryPlatformMessageId).toBe("73");
|
||||
expect(res.receipt?.platformMessageIds).toEqual(["73", "74"]);
|
||||
expect(res.receipt?.parts.map((part) => part.kind)).toEqual(["text", "text"]);
|
||||
});
|
||||
|
||||
it("uses caption when text is within 1024 char limit", async () => {
|
||||
@@ -2499,6 +2548,93 @@ describe("sendMessageTelegram", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("returns a multipart receipt and avoids native replies for chunked first-mode text", async () => {
|
||||
const sendMessage = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
|
||||
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
|
||||
const api = { sendMessage } as unknown as {
|
||||
sendMessage: typeof sendMessage;
|
||||
};
|
||||
|
||||
const result = await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
|
||||
cfg: TELEGRAM_TEST_CFG,
|
||||
token: "tok",
|
||||
api,
|
||||
messageThreadId: 271,
|
||||
replyToMessageId: 500,
|
||||
replyToIdSource: "implicit",
|
||||
replyToMode: "first",
|
||||
});
|
||||
|
||||
expect(sendMessage).toHaveBeenCalledTimes(2);
|
||||
expect(sendMessage.mock.calls[0]?.[2]).toEqual({
|
||||
parse_mode: "HTML",
|
||||
message_thread_id: 271,
|
||||
});
|
||||
expect(sendMessage.mock.calls[1]?.[2]).toEqual({
|
||||
parse_mode: "HTML",
|
||||
message_thread_id: 271,
|
||||
});
|
||||
expect(result.messageId).toBe("102");
|
||||
expect(result.receipt?.primaryPlatformMessageId).toBe("101");
|
||||
expect(result.receipt?.platformMessageIds).toEqual(["101", "102"]);
|
||||
expect(result.receipt?.threadId).toBe("271");
|
||||
expect(result.receipt?.replyToId).toBeUndefined();
|
||||
expect(
|
||||
result.receipt?.parts.map(({ platformMessageId, kind, index, threadId, replyToId }) => ({
|
||||
platformMessageId,
|
||||
kind,
|
||||
index,
|
||||
threadId,
|
||||
replyToId,
|
||||
})),
|
||||
).toEqual([
|
||||
{
|
||||
platformMessageId: "101",
|
||||
kind: "text",
|
||||
index: 0,
|
||||
threadId: "271",
|
||||
replyToId: undefined,
|
||||
},
|
||||
{
|
||||
platformMessageId: "102",
|
||||
kind: "text",
|
||||
index: 1,
|
||||
threadId: "271",
|
||||
replyToId: undefined,
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps explicit native replies for chunked first-mode text", async () => {
|
||||
const sendMessage = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
|
||||
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
|
||||
const api = { sendMessage } as unknown as {
|
||||
sendMessage: typeof sendMessage;
|
||||
};
|
||||
|
||||
await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
|
||||
cfg: TELEGRAM_TEST_CFG,
|
||||
token: "tok",
|
||||
api,
|
||||
replyToMessageId: 500,
|
||||
replyToIdSource: "explicit",
|
||||
replyToMode: "first",
|
||||
});
|
||||
|
||||
expect(sendMessage.mock.calls[0]?.[2]).toMatchObject({
|
||||
reply_to_message_id: 500,
|
||||
allow_sending_without_reply: true,
|
||||
});
|
||||
expect(sendMessage.mock.calls[1]?.[2]).toMatchObject({
|
||||
reply_to_message_id: 500,
|
||||
allow_sending_without_reply: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("fails topic sends instead of retrying without message_thread_id", async () => {
|
||||
const cases = [{ name: "forum", chatId: "-100123", text: "hello forum" }] as const;
|
||||
const threadErr = new Error("400: Bad Request: message thread not found");
|
||||
|
||||
@@ -3,12 +3,17 @@ import * as grammy from "grammy";
|
||||
import { type ApiClientOptions, Bot, HttpError } from "grammy";
|
||||
import type { ReactionType, ReactionTypeEmoji } from "grammy/types";
|
||||
import { recordChannelActivity } from "openclaw/plugin-sdk/channel-activity-runtime";
|
||||
import type { MarkdownTableMode } from "openclaw/plugin-sdk/config-contracts";
|
||||
import {
|
||||
createMessageReceiptFromOutboundResults,
|
||||
type MessageReceipt,
|
||||
} from "openclaw/plugin-sdk/channel-outbound";
|
||||
import type { MarkdownTableMode, ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
|
||||
import { isDiagnosticFlagEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
|
||||
import { formatUncaughtError } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { redactSensitiveText } from "openclaw/plugin-sdk/logging-core";
|
||||
import { parseStrictInteger } from "openclaw/plugin-sdk/number-runtime";
|
||||
import { resolveChunkMode, resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-chunking";
|
||||
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
|
||||
import { createTelegramRetryRunner, type RetryConfig } from "openclaw/plugin-sdk/retry-runtime";
|
||||
import { createSubsystemLogger, logVerbose } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
|
||||
@@ -84,6 +89,8 @@ type TelegramEditMessageCaptionParams = Parameters<TelegramApi["editMessageCapti
|
||||
type TelegramCreateForumTopicParams = NonNullable<Parameters<TelegramApi["createForumTopic"]>[2]>;
|
||||
type TelegramThreadScopedParams = {
|
||||
message_thread_id?: number;
|
||||
reply_parameters?: { message_id?: number };
|
||||
reply_to_message_id?: number;
|
||||
};
|
||||
const InputFileCtor = grammy.InputFile;
|
||||
const MAX_TELEGRAM_PHOTO_DIMENSION_SUM = 10_000;
|
||||
@@ -111,6 +118,10 @@ type TelegramSendOpts = {
|
||||
silent?: boolean;
|
||||
/** Message ID to reply to (for threading) */
|
||||
replyToMessageId?: number;
|
||||
/** Whether replyToMessageId came from ambient context or explicit payload/action input. */
|
||||
replyToIdSource?: "explicit" | "implicit";
|
||||
/** Controls whether replyToMessageId is applied to every internal text chunk. */
|
||||
replyToMode?: ReplyToMode;
|
||||
/** Quote text for Telegram reply_parameters. */
|
||||
quoteText?: string;
|
||||
/** Forum topic thread ID (for forum supergroups) */
|
||||
@@ -124,6 +135,7 @@ type TelegramSendOpts = {
|
||||
type TelegramSendResult = {
|
||||
messageId: string;
|
||||
chatId: string;
|
||||
receipt?: MessageReceipt;
|
||||
};
|
||||
|
||||
type TelegramMessageLike = {
|
||||
@@ -274,6 +286,42 @@ function logTelegramOutboundSendOk(params: TelegramOutboundSuccessLogParams): vo
|
||||
sendLogger.info(parts.join(" "));
|
||||
}
|
||||
|
||||
function buildTelegramTextSendReceipt(params: {
|
||||
messageIds: readonly string[];
|
||||
chatId: string;
|
||||
messageThreadId?: number;
|
||||
replyToMessageId?: number;
|
||||
}): MessageReceipt | undefined {
|
||||
if (params.messageIds.length <= 1) {
|
||||
return undefined;
|
||||
}
|
||||
return createMessageReceiptFromOutboundResults({
|
||||
results: params.messageIds.map((messageId) => ({
|
||||
messageId,
|
||||
chatId: params.chatId,
|
||||
})),
|
||||
kind: "text",
|
||||
...(typeof params.messageThreadId === "number"
|
||||
? { threadId: String(params.messageThreadId) }
|
||||
: {}),
|
||||
...(typeof params.replyToMessageId === "number"
|
||||
? { replyToId: String(params.replyToMessageId) }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
function resolveAcceptedReplyToMessageId(
|
||||
params: TelegramThreadScopedParams | TelegramRichMessageContextParams | undefined,
|
||||
): number | undefined {
|
||||
if (!params) {
|
||||
return undefined;
|
||||
}
|
||||
if ("reply_to_message_id" in params) {
|
||||
return params.reply_to_message_id;
|
||||
}
|
||||
return params.reply_parameters?.message_id;
|
||||
}
|
||||
|
||||
const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i;
|
||||
const MESSAGE_NOT_MODIFIED_RE =
|
||||
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
|
||||
@@ -661,19 +709,26 @@ export async function sendMessageTelegram(
|
||||
(typeof account.config.mediaMaxMb === "number" ? account.config.mediaMaxMb : 100) * 1024 * 1024;
|
||||
const replyMarkup = buildInlineKeyboard(opts.buttons);
|
||||
|
||||
const threadParams = buildTelegramThreadReplyParams({
|
||||
thread: resolveTelegramSendThreadSpec({
|
||||
targetMessageThreadId: target.messageThreadId,
|
||||
messageThreadId: opts.messageThreadId,
|
||||
chatType: target.chatType,
|
||||
}),
|
||||
replyToMessageId: opts.replyToMessageId,
|
||||
replyQuoteText: opts.quoteText,
|
||||
useReplyIdAsQuoteSource: true,
|
||||
const threadSpec = resolveTelegramSendThreadSpec({
|
||||
targetMessageThreadId: target.messageThreadId,
|
||||
messageThreadId: opts.messageThreadId,
|
||||
chatType: target.chatType,
|
||||
});
|
||||
const richThreadParams = toTelegramRichMessageContextParams(threadParams);
|
||||
const hasThreadParams = Object.keys(threadParams).length > 0;
|
||||
const hasRichThreadParams = Object.keys(richThreadParams).length > 0;
|
||||
const singleUseReplyTo =
|
||||
opts.replyToIdSource === "implicit" &&
|
||||
opts.replyToMode !== undefined &&
|
||||
isSingleUseReplyToMode(opts.replyToMode);
|
||||
const buildThreadParams = (includeReplyTo: boolean) =>
|
||||
buildTelegramThreadReplyParams({
|
||||
thread: threadSpec,
|
||||
...(includeReplyTo
|
||||
? {
|
||||
replyToMessageId: opts.replyToMessageId,
|
||||
replyQuoteText: opts.quoteText,
|
||||
useReplyIdAsQuoteSource: true,
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
const requestWithDiag = createTelegramNonIdempotentRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
@@ -746,29 +801,59 @@ export async function sendMessageTelegram(
|
||||
return { result, acceptedParams: params };
|
||||
};
|
||||
|
||||
const buildTextParams = (isLastChunk: boolean) =>
|
||||
hasThreadParams || (isLastChunk && replyMarkup)
|
||||
? {
|
||||
...threadParams,
|
||||
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
|
||||
}
|
||||
: undefined;
|
||||
const shouldIncludeReplyForChunk = (
|
||||
index: number,
|
||||
chunkCount: number,
|
||||
replyToAlreadyUsed: boolean,
|
||||
) =>
|
||||
// Telegram Desktop can render long formatted reply chunks as unsupported messages.
|
||||
// Multi-part `first` replies keep chat/topic routing but avoid hiding chunk text.
|
||||
!replyToAlreadyUsed && (!singleUseReplyTo || (chunkCount === 1 && index === 0));
|
||||
|
||||
const buildRichTextParams = (isLastChunk: boolean) =>
|
||||
hasRichThreadParams || (isLastChunk && replyMarkup)
|
||||
const buildTextParams = (
|
||||
index: number,
|
||||
chunkCount: number,
|
||||
isLastChunk: boolean,
|
||||
replyToAlreadyUsed: boolean,
|
||||
) => {
|
||||
const params = buildThreadParams(
|
||||
shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed),
|
||||
);
|
||||
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
|
||||
? {
|
||||
...richThreadParams,
|
||||
...params,
|
||||
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
|
||||
}
|
||||
: undefined;
|
||||
};
|
||||
|
||||
const buildRichTextParams = (
|
||||
index: number,
|
||||
chunkCount: number,
|
||||
isLastChunk: boolean,
|
||||
replyToAlreadyUsed: boolean,
|
||||
) => {
|
||||
const params = toTelegramRichMessageContextParams(
|
||||
buildThreadParams(shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed)),
|
||||
);
|
||||
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
|
||||
? {
|
||||
...params,
|
||||
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
|
||||
}
|
||||
: undefined;
|
||||
};
|
||||
|
||||
const sendTelegramTextChunks = async (
|
||||
chunks: TelegramTextChunk[],
|
||||
context: string,
|
||||
): Promise<{ messageId: string; chatId: string }> => {
|
||||
options: { replyToAlreadyUsed?: boolean } = {},
|
||||
): Promise<TelegramSendResult> => {
|
||||
let lastMessageId = "";
|
||||
let lastChatId = chatId;
|
||||
let lastAcceptedParams: TelegramThreadScopedParams | undefined;
|
||||
let acceptedReplyToMessageId: number | undefined;
|
||||
const messageIds: string[] = [];
|
||||
let sentChunkCount = 0;
|
||||
for (let index = 0; index < chunks.length; index += 1) {
|
||||
const chunk = chunks[index];
|
||||
@@ -777,7 +862,12 @@ export async function sendMessageTelegram(
|
||||
}
|
||||
const { result: res, acceptedParams } = await sendTelegramTextChunk(
|
||||
chunk,
|
||||
buildTextParams(index === chunks.length - 1),
|
||||
buildTextParams(
|
||||
index,
|
||||
chunks.length,
|
||||
index === chunks.length - 1,
|
||||
options.replyToAlreadyUsed === true,
|
||||
),
|
||||
);
|
||||
const messageId = resolveTelegramMessageIdOrThrow(res, context);
|
||||
recordSentMessage(chatId, messageId, cfg);
|
||||
@@ -795,6 +885,8 @@ export async function sendMessageTelegram(
|
||||
lastMessageId = String(messageId);
|
||||
lastChatId = String(res?.chat?.id ?? chatId);
|
||||
lastAcceptedParams = acceptedParams;
|
||||
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
|
||||
messageIds.push(lastMessageId);
|
||||
sentChunkCount += 1;
|
||||
}
|
||||
if (lastMessageId) {
|
||||
@@ -810,7 +902,17 @@ export async function sendMessageTelegram(
|
||||
chunkCount: sentChunkCount,
|
||||
});
|
||||
}
|
||||
return { messageId: lastMessageId, chatId: lastChatId };
|
||||
const receipt = buildTelegramTextSendReceipt({
|
||||
messageIds,
|
||||
chatId: lastChatId,
|
||||
messageThreadId: lastAcceptedParams?.message_thread_id,
|
||||
replyToMessageId: acceptedReplyToMessageId,
|
||||
});
|
||||
return {
|
||||
messageId: lastMessageId,
|
||||
chatId: lastChatId,
|
||||
...(receipt ? { receipt } : {}),
|
||||
};
|
||||
};
|
||||
|
||||
const buildChunkedTextPlan = (rawText: string, context: string): TelegramTextChunk[] => {
|
||||
@@ -841,10 +943,14 @@ export async function sendMessageTelegram(
|
||||
}));
|
||||
};
|
||||
|
||||
const sendChunkedText = async (rawText: string, context: string) =>
|
||||
const sendChunkedText = async (
|
||||
rawText: string,
|
||||
context: string,
|
||||
options: { replyToAlreadyUsed?: boolean } = {},
|
||||
) =>
|
||||
useRichMessages
|
||||
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context)
|
||||
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context);
|
||||
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context, options)
|
||||
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context, options);
|
||||
|
||||
const buildRichTextPlan = (rawText: string): TelegramRichTextChunk[] => {
|
||||
const textLimit = Math.min(
|
||||
@@ -866,18 +972,26 @@ export async function sendMessageTelegram(
|
||||
const sendTelegramRichTextChunks = async (
|
||||
chunks: TelegramRichTextChunk[],
|
||||
context: string,
|
||||
): Promise<{ messageId: string; chatId: string }> => {
|
||||
options: { replyToAlreadyUsed?: boolean } = {},
|
||||
): Promise<TelegramSendResult> => {
|
||||
const richRawApi = getTelegramRichRawApi(api);
|
||||
let lastMessageId = "";
|
||||
let lastChatId = chatId;
|
||||
let lastAcceptedParams: TelegramRichMessageContextParams | undefined;
|
||||
let acceptedReplyToMessageId: number | undefined;
|
||||
const messageIds: string[] = [];
|
||||
let sentChunkCount = 0;
|
||||
for (let index = 0; index < chunks.length; index += 1) {
|
||||
const chunk = chunks[index];
|
||||
if (!chunk) {
|
||||
continue;
|
||||
}
|
||||
const acceptedParams = buildRichTextParams(index === chunks.length - 1);
|
||||
const acceptedParams = buildRichTextParams(
|
||||
index,
|
||||
chunks.length,
|
||||
index === chunks.length - 1,
|
||||
options.replyToAlreadyUsed === true,
|
||||
);
|
||||
const result = await requestWithChatNotFound(
|
||||
() =>
|
||||
richRawApi.sendRichMessage({
|
||||
@@ -907,6 +1021,8 @@ export async function sendMessageTelegram(
|
||||
lastMessageId = String(messageId);
|
||||
lastChatId = String(result?.chat?.id ?? chatId);
|
||||
lastAcceptedParams = acceptedParams;
|
||||
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
|
||||
messageIds.push(lastMessageId);
|
||||
sentChunkCount += 1;
|
||||
}
|
||||
if (lastMessageId) {
|
||||
@@ -922,7 +1038,17 @@ export async function sendMessageTelegram(
|
||||
chunkCount: sentChunkCount,
|
||||
});
|
||||
}
|
||||
return { messageId: lastMessageId, chatId: lastChatId };
|
||||
const receipt = buildTelegramTextSendReceipt({
|
||||
messageIds,
|
||||
chatId: lastChatId,
|
||||
messageThreadId: lastAcceptedParams?.message_thread_id,
|
||||
replyToMessageId: acceptedReplyToMessageId,
|
||||
});
|
||||
return {
|
||||
messageId: lastMessageId,
|
||||
chatId: lastChatId,
|
||||
...(receipt ? { receipt } : {}),
|
||||
};
|
||||
};
|
||||
|
||||
async function shouldSendTelegramImageAsPhoto(buffer: Buffer): Promise<boolean> {
|
||||
@@ -1001,8 +1127,10 @@ export async function sendMessageTelegram(
|
||||
const needsSeparateText = Boolean(followUpText);
|
||||
// When splitting, put reply_markup only on the follow-up text (the "main" content),
|
||||
// not on the media message.
|
||||
const mediaThreadParams = buildThreadParams(true);
|
||||
const mediaUsedReplyTo = resolveAcceptedReplyToMessageId(mediaThreadParams) !== undefined;
|
||||
const baseMediaParams = {
|
||||
...(hasThreadParams ? threadParams : {}),
|
||||
...mediaThreadParams,
|
||||
...(!needsSeparateText && replyMarkup ? { reply_markup: replyMarkup } : {}),
|
||||
};
|
||||
const videoDimensions =
|
||||
@@ -1145,8 +1273,13 @@ export async function sendMessageTelegram(
|
||||
// If text was too long for a caption, send it as a separate follow-up message.
|
||||
// Use HTML conversion so markdown renders like captions.
|
||||
if (needsSeparateText && followUpText) {
|
||||
const textResult = await sendChunkedText(followUpText, "text follow-up send");
|
||||
return { messageId: textResult.messageId, chatId: resolvedChatId };
|
||||
const textResult = await sendChunkedText(followUpText, "text follow-up send", {
|
||||
replyToAlreadyUsed: singleUseReplyTo && mediaUsedReplyTo,
|
||||
});
|
||||
return {
|
||||
...textResult,
|
||||
chatId: resolvedChatId,
|
||||
};
|
||||
}
|
||||
|
||||
return { messageId: String(mediaMessageId), chatId: resolvedChatId };
|
||||
@@ -1219,7 +1352,7 @@ export async function reactMessageTelegram(
|
||||
account,
|
||||
retry: opts.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "react" }),
|
||||
});
|
||||
const remove = opts.remove === true;
|
||||
const trimmedEmoji = emoji.trim();
|
||||
@@ -1276,7 +1409,7 @@ export async function deleteMessageTelegram(
|
||||
account,
|
||||
retry: opts.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "delete" }),
|
||||
});
|
||||
try {
|
||||
await requestWithDiag(() => api.deleteMessage(chatId, messageId), "deleteMessage", {
|
||||
|
||||
@@ -10,6 +10,7 @@ import { afterEach, describe, expect, it } from "vitest";
|
||||
import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js";
|
||||
import type { TelegramRuntime } from "./runtime.types.js";
|
||||
import {
|
||||
claimNextTelegramSpooledUpdate,
|
||||
claimTelegramSpooledUpdate,
|
||||
deleteTelegramSpooledUpdate,
|
||||
failTelegramSpooledUpdateClaim,
|
||||
@@ -118,6 +119,120 @@ describe("Telegram ingress spool", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("claims next update through the native ingress queue in update id order", async () => {
|
||||
await withTempSpool(async (spoolDir) => {
|
||||
await writeTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
update: { update_id: 101, message: { chat: { id: 1 }, message_id: 1, text: "second" } },
|
||||
now: 1,
|
||||
});
|
||||
await writeTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
update: { update_id: 100, message: { chat: { id: 1 }, message_id: 2, text: "first" } },
|
||||
now: 2,
|
||||
});
|
||||
|
||||
const claimed = await claimNextTelegramSpooledUpdate({ spoolDir });
|
||||
|
||||
expect(claimed?.updateId).toBe(100);
|
||||
expect(await listTelegramSpooledUpdates({ spoolDir })).toHaveLength(1);
|
||||
expect(
|
||||
(await listTelegramSpooledUpdateClaims({ spoolDir })).map((claim) => claim.updateId),
|
||||
).toEqual([100]);
|
||||
});
|
||||
});
|
||||
|
||||
it("derives lane keys while claiming legacy rows without stored lane keys", async () => {
|
||||
await withTempSpool(async (spoolDir) => {
|
||||
const stateDir = path.dirname(path.dirname(spoolDir));
|
||||
const queue = createChannelIngressQueue<{
|
||||
version: 1;
|
||||
updateId: number;
|
||||
receivedAt: number;
|
||||
update: unknown;
|
||||
}>({
|
||||
channelId: "telegram",
|
||||
accountId: "test",
|
||||
stateDir,
|
||||
});
|
||||
await queue.enqueue(
|
||||
"0000000000000042",
|
||||
{
|
||||
version: 1,
|
||||
updateId: 42,
|
||||
receivedAt: 1,
|
||||
update: {
|
||||
update_id: 42,
|
||||
message: {
|
||||
chat: { id: 100, type: "supergroup", is_forum: true },
|
||||
is_topic_message: true,
|
||||
message_id: 1,
|
||||
message_thread_id: 10,
|
||||
text: "blocked topic",
|
||||
},
|
||||
},
|
||||
},
|
||||
{ receivedAt: 1 },
|
||||
);
|
||||
await queue.enqueue(
|
||||
"0000000000000043",
|
||||
{
|
||||
version: 1,
|
||||
updateId: 43,
|
||||
receivedAt: 2,
|
||||
update: {
|
||||
update_id: 43,
|
||||
message: {
|
||||
chat: { id: 100, type: "supergroup", is_forum: true },
|
||||
is_topic_message: true,
|
||||
message_id: 2,
|
||||
message_thread_id: 11,
|
||||
text: "open topic",
|
||||
},
|
||||
},
|
||||
},
|
||||
{ receivedAt: 2 },
|
||||
);
|
||||
|
||||
const claimed = await claimNextTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
blockedLaneKeys: ["telegram:100:topic:10"],
|
||||
});
|
||||
|
||||
expect(claimed?.updateId).toBe(43);
|
||||
expect(claimed?.claim?.claimToken).toEqual(expect.any(String));
|
||||
expect(
|
||||
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
|
||||
).toEqual([42]);
|
||||
});
|
||||
});
|
||||
|
||||
it("does not claim outside the provided candidate update ids", async () => {
|
||||
await withTempSpool(async (spoolDir) => {
|
||||
await writeTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
update: { update_id: 200, message: { chat: { id: 1 }, message_id: 1, text: "first" } },
|
||||
now: 1,
|
||||
});
|
||||
await writeTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
update: { update_id: 201, message: { chat: { id: 2 }, message_id: 1, text: "later" } },
|
||||
now: 2,
|
||||
});
|
||||
|
||||
const claimed = await claimNextTelegramSpooledUpdate({
|
||||
spoolDir,
|
||||
blockedLaneKeys: ["telegram:1"],
|
||||
candidateUpdateIds: [200],
|
||||
});
|
||||
|
||||
expect(claimed).toBeNull();
|
||||
expect(
|
||||
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
|
||||
).toEqual([200, 201]);
|
||||
});
|
||||
});
|
||||
|
||||
it("releases failed claims back to the pending spool", async () => {
|
||||
await withTempSpool(async (spoolDir) => {
|
||||
await writeTelegramSpooledUpdate({
|
||||
@@ -303,7 +418,7 @@ describe("Telegram ingress spool", () => {
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("treats fresh claims with reused pids and different owner ids as live-owned", () => {
|
||||
it("does not treat fresh claims with the current pid and a different owner id as foreign", () => {
|
||||
const now = Date.now();
|
||||
expect(
|
||||
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess({
|
||||
@@ -318,6 +433,25 @@ describe("Telegram ingress spool", () => {
|
||||
claimedAt: now,
|
||||
},
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("treats fresh claims with other live pids as live-owned", () => {
|
||||
const now = Date.now();
|
||||
const liveOwnerPid = process.ppid > 0 ? process.ppid : 1;
|
||||
expect(
|
||||
isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess({
|
||||
updateId: 51,
|
||||
path: path.join(os.tmpdir(), "51.json.processing"),
|
||||
pendingPath: path.join(os.tmpdir(), "51.json"),
|
||||
update: { update_id: 51 },
|
||||
receivedAt: now,
|
||||
claim: {
|
||||
processId: "other-process",
|
||||
processPid: liveOwnerPid,
|
||||
claimedAt: now,
|
||||
},
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -9,12 +9,15 @@ import type {
|
||||
ChannelIngressQueueRecord,
|
||||
} from "openclaw/plugin-sdk/channel-outbound";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import type { TelegramBotInfo } from "./bot-info.js";
|
||||
import { getTelegramRuntime } from "./runtime.js";
|
||||
import { getTelegramSequentialKey } from "./sequential-key.js";
|
||||
import { normalizeTelegramStateAccountId } from "./state-account-id.js";
|
||||
|
||||
const SPOOL_VERSION = 1;
|
||||
const TELEGRAM_INGRESS_SPOOL_PREFIX = "ingress-spool-";
|
||||
export const TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS = 6 * 60 * 60 * 1000;
|
||||
export const TELEGRAM_SPOOLED_UPDATE_CLAIM_LEASE_MS = 30 * 60 * 1000;
|
||||
const TELEGRAM_SPOOLED_UPDATE_FAILED_TTL_MS = 30 * 24 * 60 * 60 * 1000;
|
||||
const TELEGRAM_SPOOLED_UPDATE_FAILED_MAX_ENTRIES = 1000;
|
||||
const TELEGRAM_SPOOLED_UPDATE_PROCESS_ID = `${process.pid}:${randomUUID()}`;
|
||||
@@ -152,8 +155,13 @@ function processExists(pid: number): boolean {
|
||||
}
|
||||
}
|
||||
|
||||
function isFreshClaimOwner(claim: TelegramSpooledUpdateClaimOwner): boolean {
|
||||
return Date.now() - claim.claimedAt < TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS;
|
||||
function isFreshClaimOwner(
|
||||
claim: TelegramSpooledUpdateClaimOwner,
|
||||
options?: { maxAgeMs?: number; now?: number },
|
||||
): boolean {
|
||||
const now = options?.now ?? Date.now();
|
||||
const maxAgeMs = options?.maxAgeMs ?? TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS;
|
||||
return now - claim.claimedAt < maxAgeMs;
|
||||
}
|
||||
|
||||
function parseQueueRecord(
|
||||
@@ -196,6 +204,13 @@ function parseQueueClaim(
|
||||
};
|
||||
}
|
||||
|
||||
function spooledUpdateLaneKey(update: unknown, botInfo?: TelegramBotInfo): string {
|
||||
return getTelegramSequentialKey({
|
||||
update: update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
|
||||
...(botInfo ? { me: botInfo } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function sortTelegramUpdates<T extends TelegramSpooledUpdate>(updates: T[]): T[] {
|
||||
return updates.toSorted((a, b) => a.updateId - b.updateId);
|
||||
}
|
||||
@@ -207,18 +222,27 @@ function queueMutationTarget(update: TelegramSpooledUpdate): string | ChannelIng
|
||||
|
||||
export function isTelegramSpooledUpdateClaimOwnedByOtherLiveProcess(
|
||||
claim: ClaimedTelegramSpooledUpdate,
|
||||
options?: { maxAgeMs?: number; now?: number },
|
||||
): boolean {
|
||||
return Boolean(
|
||||
claim.claim &&
|
||||
claim.claim.processId !== TELEGRAM_SPOOLED_UPDATE_PROCESS_ID &&
|
||||
isFreshClaimOwner(claim.claim) &&
|
||||
claim.claim.processPid !== process.pid &&
|
||||
isFreshClaimOwner(claim.claim, options) &&
|
||||
processExists(claim.claim.processPid),
|
||||
);
|
||||
}
|
||||
|
||||
export function isTelegramSpooledUpdateClaimOwnedByCurrentProcess(
|
||||
claim: ClaimedTelegramSpooledUpdate,
|
||||
): boolean {
|
||||
return claim.claim?.processId === TELEGRAM_SPOOLED_UPDATE_PROCESS_ID;
|
||||
}
|
||||
|
||||
export async function writeTelegramSpooledUpdate(params: {
|
||||
spoolDir: string;
|
||||
update: unknown;
|
||||
laneKey?: string;
|
||||
now?: number;
|
||||
}): Promise<number> {
|
||||
const updateId = resolveTelegramUpdateId(params.update);
|
||||
@@ -236,7 +260,10 @@ export async function writeTelegramSpooledUpdate(params: {
|
||||
receivedAt,
|
||||
update: params.update,
|
||||
},
|
||||
{ receivedAt },
|
||||
{
|
||||
receivedAt,
|
||||
laneKey: params.laneKey ?? spooledUpdateLaneKey(params.update),
|
||||
},
|
||||
);
|
||||
return updateId;
|
||||
}
|
||||
@@ -271,6 +298,38 @@ export async function claimTelegramSpooledUpdate(
|
||||
return claimed ? parseQueueClaim(spoolDir, claimed) : null;
|
||||
}
|
||||
|
||||
export async function claimNextTelegramSpooledUpdate(params: {
|
||||
spoolDir: string;
|
||||
blockedLaneKeys?: Iterable<string>;
|
||||
botInfo?: TelegramBotInfo;
|
||||
candidateUpdateIds?: Iterable<number>;
|
||||
scanLimit?: number;
|
||||
}): Promise<ClaimedTelegramSpooledUpdate | null> {
|
||||
const queue = createTelegramIngressQueue(params.spoolDir);
|
||||
const claimed = await queue.claimNext({
|
||||
ownerId: TELEGRAM_SPOOLED_UPDATE_PROCESS_ID,
|
||||
blockedLaneKeys: params.blockedLaneKeys,
|
||||
...(params.candidateUpdateIds === undefined
|
||||
? {}
|
||||
: { candidateIds: [...params.candidateUpdateIds].map(queueEventId) }),
|
||||
orderBy: "id",
|
||||
scanLimit: params.scanLimit,
|
||||
deriveLaneKey: (record) => spooledUpdateLaneKey(record.payload.update, params.botInfo),
|
||||
});
|
||||
if (!claimed) {
|
||||
return null;
|
||||
}
|
||||
const update = parseQueueClaim(params.spoolDir, claimed);
|
||||
if (update) {
|
||||
return update;
|
||||
}
|
||||
await queue.fail(claimed, {
|
||||
reason: "invalid-spooled-update",
|
||||
message: "Telegram spooled update payload was invalid.",
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
export async function releaseTelegramSpooledUpdateClaim(
|
||||
update: ClaimedTelegramSpooledUpdate,
|
||||
options?: { lastError?: string; releasedAt?: number },
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Whatsapp plugin module implements util behavior.
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-utility-runtime";
|
||||
|
||||
export function elide(text?: string, limit = 400) {
|
||||
if (!text) {
|
||||
@@ -8,7 +9,8 @@ export function elide(text?: string, limit = 400) {
|
||||
if (text.length <= limit) {
|
||||
return text;
|
||||
}
|
||||
return `${text.slice(0, limit)}… (truncated ${text.length - limit} chars)`;
|
||||
const truncated = truncateUtf16Safe(text, limit);
|
||||
return `${truncated}… (truncated ${text.length - truncated.length} chars)`;
|
||||
}
|
||||
|
||||
export function markWhatsAppVisibleDeliveryError(error: unknown): unknown {
|
||||
|
||||
@@ -365,6 +365,15 @@ describe("web auto-reply util", () => {
|
||||
});
|
||||
|
||||
describe("elide", () => {
|
||||
const hasLoneSurrogate = (value: string): boolean =>
|
||||
Array.from(value).some((char) => {
|
||||
if (char.length !== 1) {
|
||||
return false;
|
||||
}
|
||||
const codeUnit = char.charCodeAt(0);
|
||||
return codeUnit >= 0xd800 && codeUnit <= 0xdfff;
|
||||
});
|
||||
|
||||
it("returns undefined for undefined input", () => {
|
||||
expect(elide(undefined)).toBe(undefined);
|
||||
});
|
||||
@@ -376,6 +385,20 @@ describe("web auto-reply util", () => {
|
||||
it("truncates and annotates when over limit", () => {
|
||||
expect(elide("abcdef", 3)).toBe("abc… (truncated 3 chars)");
|
||||
});
|
||||
|
||||
it("does not split surrogate pairs when the limit lands inside an emoji", () => {
|
||||
const output = elide("😀😀😀", 5);
|
||||
|
||||
expect(output).toBe("😀😀… (truncated 2 chars)");
|
||||
expect(hasLoneSurrogate(output ?? "")).toBe(false);
|
||||
});
|
||||
|
||||
it("keeps a complete astral character when it fits before the limit", () => {
|
||||
const output = elide("ab😀cd", 4);
|
||||
|
||||
expect(output).toBe("ab😀… (truncated 2 chars)");
|
||||
expect(hasLoneSurrogate(output ?? "")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isLikelyWhatsAppCryptoError", () => {
|
||||
|
||||
@@ -288,6 +288,15 @@ export function markdownToWhatsApp(text: string): string {
|
||||
return `${WHATSAPP_INLINE_CODE_PLACEHOLDER}${inlineCodes.length - 1}${WHATSAPP_PLACEHOLDER_TERMINATOR}`;
|
||||
});
|
||||
|
||||
// Convert combined GFM strong+emphasis before plain strong so the plain
|
||||
// rules cannot leave literal `**` around the inner emphasis.
|
||||
result = result.replace(/\*\*\*(.+?)\*\*\*/g, "*_$1_*");
|
||||
result = result.replace(/___(.+?)___/g, "*_$1_*");
|
||||
result = result.replace(/\*\*_(.+?)_\*\*/g, "*_$1_*");
|
||||
result = result.replace(/__\*(.+?)\*__/g, "*_$1_*");
|
||||
result = result.replace(/_\*\*(.+?)\*\*_/g, "*_$1_*");
|
||||
result = result.replace(/\*__(.+?)__\*/g, "*_$1_*");
|
||||
|
||||
result = result.replace(/\*\*(.+?)\*\*/g, "*$1*");
|
||||
result = result.replace(/__(.+?)__/g, "*$1*");
|
||||
result = result.replace(/~~(.+?)~~/g, "~$1~");
|
||||
|
||||
@@ -42,6 +42,17 @@ describe("markdownToWhatsApp", () => {
|
||||
["returns empty string for empty input", "", ""],
|
||||
["returns plain text unchanged", "no formatting here", "no formatting here"],
|
||||
["handles bold inside a sentence", "This is **very** important", "This is *very* important"],
|
||||
["converts GFM ***bold italic*** to WhatsApp bold+italic", "***bi***", "*_bi_*"],
|
||||
["converts GFM __*bold italic*__ to WhatsApp bold+italic", "__*y*__", "*_y_*"],
|
||||
["converts GFM **_bold italic_** to WhatsApp bold+italic", "**_x_**", "*_x_*"],
|
||||
["converts GFM ___bold italic___ to WhatsApp bold+italic", "___z___", "*_z_*"],
|
||||
["converts GFM *__bold italic__* to WhatsApp bold+italic", "*__q__*", "*_q_*"],
|
||||
["converts GFM _**bold italic**_ to WhatsApp bold+italic", "_**r**_", "*_r_*"],
|
||||
[
|
||||
"preserves inline code containing bold-italic markers",
|
||||
"Use `***not bold italic***` here",
|
||||
"Use `***not bold italic***` here",
|
||||
],
|
||||
// Regression: a digit immediately after an inline-code span must not be
|
||||
// absorbed into the placeholder index (which previously dropped both).
|
||||
["preserves inline code immediately followed by a digit", "`a`5", "`a`5"],
|
||||
|
||||
@@ -18,6 +18,7 @@ const providerAuthRuntimeMocks = vi.hoisted(() => ({
|
||||
vi.mock("openclaw/plugin-sdk/provider-auth-runtime", () => providerAuthRuntimeMocks);
|
||||
|
||||
import plugin from "./index.js";
|
||||
import manifest from "./openclaw.plugin.json" with { type: "json" };
|
||||
import { buildLiveXaiProvider } from "./provider-catalog.js";
|
||||
import setupPlugin from "./setup-api.js";
|
||||
import {
|
||||
@@ -82,13 +83,24 @@ describe("xai provider plugin", () => {
|
||||
vi.unstubAllGlobals();
|
||||
});
|
||||
|
||||
it("exposes OAuth and device-code auth choices", async () => {
|
||||
it("exposes xAI OAuth and preserves the explicit device-code alias", async () => {
|
||||
const provider = await registerSingleProviderPlugin(plugin);
|
||||
|
||||
expect(provider.auth?.map((method) => method.id)).toEqual(["api-key", "oauth", "device-code"]);
|
||||
const oauth = provider.auth?.find((method) => method.id === "oauth");
|
||||
expect(oauth?.kind).toBe("oauth");
|
||||
expect(oauth?.wizard?.choiceId).toBe("xai-oauth");
|
||||
const deviceCode = provider.auth?.find((method) => method.id === "device-code");
|
||||
expect(deviceCode?.kind).toBe("device_code");
|
||||
expect(deviceCode?.wizard?.choiceId).toBe("xai-device-code");
|
||||
expect(deviceCode?.wizard?.assistantVisibility).toBe("manual-only");
|
||||
expect(manifest.providerAuthChoices).toContainEqual(
|
||||
expect.objectContaining({
|
||||
assistantVisibility: "manual-only",
|
||||
choiceId: "xai-device-code",
|
||||
method: "device-code",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("filters the xAI API-key catalog against live model ids", async () => {
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user