mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 14:01:24 +08:00
Compare commits
16 Commits
v2026.5.20
...
feat/effec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91e48c8329 | ||
|
|
0f882e6fcf | ||
|
|
16a702a254 | ||
|
|
9fd5939edf | ||
|
|
00c4d06bc7 | ||
|
|
63c9b412bc | ||
|
|
03a5ab9907 | ||
|
|
07ee8f14bf | ||
|
|
414a042cfd | ||
|
|
47d34eeb9b | ||
|
|
740c68eda4 | ||
|
|
8d822214fa | ||
|
|
66a7a56414 | ||
|
|
ffb0f23c6e | ||
|
|
162fe50f1b | ||
|
|
489c0d7a28 |
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Changes
|
||||
|
||||
- Plugin SDK: add `openclaw/plugin-sdk/effect-runtime` for typed Effect programs, services, retries, and stream bridges, and prove it through the DuckDuckGo plugin runtime.
|
||||
- Providers/xAI: add xAI Grok OAuth login for SuperGrok subscribers, letting `xai/*` models and xAI media/tool providers authenticate without `XAI_API_KEY`.
|
||||
- CLI/cron: add `openclaw cron run --wait` with timeout and poll interval controls, plus exact `cron.runs --run-id` filtering so automation can block on one queued manual run. (#81929) Thanks @ificator.
|
||||
- Maintainer tooling: route Crabbox skill defaults through the repo brokered AWS config, leaving Blacksmith Testbox as an explicit opt-in instead of the broad-proof default.
|
||||
|
||||
@@ -56,6 +56,41 @@ Provider and channel execution paths must use the active runtime config snapshot
|
||||
|
||||
## Reusable runtime utilities
|
||||
|
||||
Use `openclaw/plugin-sdk/effect-runtime` when plugin code needs OpenClaw's
|
||||
pinned Effect integration for typed async programs, injectable services, retry
|
||||
policies, or stream bridges. This subpath keeps plugin code on a supported SDK
|
||||
surface instead of importing host internals or relying on a different Effect
|
||||
version.
|
||||
|
||||
```typescript
|
||||
import {
|
||||
Context,
|
||||
Effect,
|
||||
Layer,
|
||||
runOpenClawEffect,
|
||||
} from "openclaw/plugin-sdk/effect-runtime";
|
||||
|
||||
type SearchRuntime = {
|
||||
now: () => number;
|
||||
};
|
||||
|
||||
const SearchRuntime = Context.GenericTag<SearchRuntime>("my-plugin/SearchRuntime");
|
||||
|
||||
const program = SearchRuntime.pipe(
|
||||
Effect.map((runtime) => ({ startedAt: runtime.now() })),
|
||||
);
|
||||
|
||||
const result = await runOpenClawEffect(
|
||||
program.pipe(
|
||||
Effect.provide(
|
||||
Layer.succeed(SearchRuntime, {
|
||||
now: Date.now,
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
```
|
||||
|
||||
Use the channel-turn `botLoopProtection` facts for bot-authored inbound messages. Core applies the shared in-memory sliding-window guard before session record and dispatch, without tying the policy to one channel. The guard tracks `(scopeId, conversationId, participant pair)` keys, counts both directions of a pair together, applies a cooldown once the window budget is exceeded, and prunes inactive entries opportunistically.
|
||||
|
||||
Channel plugins that expose this behavior to operators should prefer the shared `channels.defaults.botLoopProtection` shape for baseline budgets, then layer channel/provider-specific overrides on top. The shared config uses seconds because it is user-facing:
|
||||
|
||||
100
extensions/duckduckgo/src/ddg-client.effect.test.ts
Normal file
100
extensions/duckduckgo/src/ddg-client.effect.test.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import { Effect } from "openclaw/plugin-sdk/effect-runtime";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { __testing } from "./ddg-client.js";
|
||||
|
||||
type DuckDuckGoSearchRuntimeOverrides = NonNullable<
|
||||
Parameters<typeof __testing.duckDuckGoSearchRuntimeLayer>[0]
|
||||
>;
|
||||
type DuckDuckGoRunEndpoint = NonNullable<DuckDuckGoSearchRuntimeOverrides["runEndpoint"]>;
|
||||
|
||||
function resultHtml(params: { title: string; url: string; snippet: string }) {
|
||||
return `
|
||||
<a class="result__a" href="${params.url}">${params.title}</a>
|
||||
<a class="result__snippet">${params.snippet}</a>
|
||||
`;
|
||||
}
|
||||
|
||||
describe("duckduckgo effect runtime", () => {
|
||||
beforeEach(() => {
|
||||
__testing.DDG_SEARCH_CACHE.clear();
|
||||
});
|
||||
|
||||
it("runs the search through an injectable Effect runtime and caches payloads", async () => {
|
||||
const cache: NonNullable<DuckDuckGoSearchRuntimeOverrides["cache"]> = new Map();
|
||||
const now = vi.fn().mockReturnValueOnce(1_000).mockReturnValueOnce(1_037);
|
||||
const runEndpointMock = vi.fn();
|
||||
const runEndpoint: DuckDuckGoRunEndpoint = async (request, run) => {
|
||||
runEndpointMock(request);
|
||||
expect(request.timeoutSeconds).toBe(12);
|
||||
expect(new URL(request.url).searchParams.get("q")).toBe("openclaw effect");
|
||||
expect(new URL(request.url).searchParams.get("kl")).toBe("us-en");
|
||||
expect(new URL(request.url).searchParams.get("kp")).toBe("-2");
|
||||
return await run(
|
||||
new Response(
|
||||
resultHtml({
|
||||
title: "OpenClaw & Effect",
|
||||
url: "https://duckduckgo.com/l/?uddg=https%3A%2F%2Fdocs.openclaw.ai%2F",
|
||||
snippet: "Typed effects & plugin runtime",
|
||||
}),
|
||||
{ status: 200 },
|
||||
),
|
||||
);
|
||||
};
|
||||
const runtime = __testing.duckDuckGoSearchRuntimeLayer({
|
||||
cache,
|
||||
now,
|
||||
runEndpoint,
|
||||
});
|
||||
const search = {
|
||||
query: "openclaw effect",
|
||||
count: 3,
|
||||
region: "us-en",
|
||||
safeSearch: "off" as const,
|
||||
timeoutSeconds: 12,
|
||||
};
|
||||
|
||||
const first = await Effect.runPromise(
|
||||
__testing.runDuckDuckGoSearchEffect(search).pipe(Effect.provide(runtime)),
|
||||
);
|
||||
const second = await Effect.runPromise(
|
||||
__testing.runDuckDuckGoSearchEffect(search).pipe(Effect.provide(runtime)),
|
||||
);
|
||||
|
||||
expect(runEndpointMock).toHaveBeenCalledOnce();
|
||||
expect(first).toMatchObject({
|
||||
count: 1,
|
||||
provider: "duckduckgo",
|
||||
query: "openclaw effect",
|
||||
tookMs: 37,
|
||||
});
|
||||
expect(first.results).toEqual([
|
||||
{
|
||||
title: expect.stringContaining("OpenClaw & Effect"),
|
||||
url: "https://docs.openclaw.ai/",
|
||||
snippet: expect.stringContaining("Typed effects & plugin runtime"),
|
||||
siteName: "docs.openclaw.ai",
|
||||
},
|
||||
]);
|
||||
expect(second).toMatchObject({
|
||||
cached: true,
|
||||
count: 1,
|
||||
provider: "duckduckgo",
|
||||
});
|
||||
});
|
||||
|
||||
it("fails through the Effect error channel for bot challenges", async () => {
|
||||
const runtime = __testing.duckDuckGoSearchRuntimeLayer({
|
||||
cache: new Map(),
|
||||
runEndpoint: async (_request, run) =>
|
||||
await run(new Response('<form id="challenge-form">Are you a human?</form>')),
|
||||
});
|
||||
|
||||
await expect(
|
||||
Effect.runPromise(
|
||||
__testing
|
||||
.runDuckDuckGoSearchEffect({ query: "openclaw" })
|
||||
.pipe(Effect.provide(runtime)),
|
||||
),
|
||||
).rejects.toThrow("DuckDuckGo returned a bot-detection challenge.");
|
||||
});
|
||||
});
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
|
||||
import { Context, Effect, Layer } from "openclaw/plugin-sdk/effect-runtime";
|
||||
import {
|
||||
DEFAULT_CACHE_TTL_MINUTES,
|
||||
DEFAULT_SEARCH_COUNT,
|
||||
@@ -28,6 +29,43 @@ const DDG_SEARCH_CACHE = new Map<
|
||||
{ value: Record<string, unknown>; insertedAt: number; expiresAt: number }
|
||||
>();
|
||||
|
||||
type DuckDuckGoEndpointRequest = {
|
||||
url: string;
|
||||
timeoutSeconds: number;
|
||||
init: RequestInit;
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
type DuckDuckGoSearchRuntime = {
|
||||
cache: typeof DDG_SEARCH_CACHE;
|
||||
now: () => number;
|
||||
runEndpoint: <T>(
|
||||
request: DuckDuckGoEndpointRequest,
|
||||
run: (response: Response) => Promise<T>,
|
||||
) => Promise<T>;
|
||||
};
|
||||
|
||||
const DuckDuckGoSearchRuntimeTag = Context.GenericTag<DuckDuckGoSearchRuntime>(
|
||||
"openclaw/duckduckgo/SearchRuntime",
|
||||
);
|
||||
|
||||
function createDefaultDuckDuckGoSearchRuntime(): DuckDuckGoSearchRuntime {
|
||||
return {
|
||||
cache: DDG_SEARCH_CACHE,
|
||||
now: Date.now,
|
||||
runEndpoint: withTrustedWebSearchEndpoint,
|
||||
};
|
||||
}
|
||||
|
||||
function duckDuckGoSearchRuntimeLayer(
|
||||
runtime?: Partial<DuckDuckGoSearchRuntime>,
|
||||
): Layer.Layer<DuckDuckGoSearchRuntime> {
|
||||
return Layer.succeed(DuckDuckGoSearchRuntimeTag, {
|
||||
...createDefaultDuckDuckGoSearchRuntime(),
|
||||
...runtime,
|
||||
});
|
||||
}
|
||||
|
||||
type DuckDuckGoResult = {
|
||||
title: string;
|
||||
url: string;
|
||||
@@ -121,92 +159,116 @@ export async function runDuckDuckGoSearch(params: {
|
||||
timeoutSeconds?: number;
|
||||
cacheTtlMinutes?: number;
|
||||
}): Promise<Record<string, unknown>> {
|
||||
const count = resolveSearchCount(params.count, DEFAULT_SEARCH_COUNT);
|
||||
const region = params.region ?? resolveDdgRegion(params.config);
|
||||
const safeSearch =
|
||||
params.safeSearch === "strict" ||
|
||||
params.safeSearch === "moderate" ||
|
||||
params.safeSearch === "off"
|
||||
? params.safeSearch
|
||||
: resolveDdgSafeSearch(params.config);
|
||||
const timeoutSeconds = resolveTimeoutSeconds(params.timeoutSeconds, DEFAULT_TIMEOUT_SECONDS);
|
||||
const cacheTtlMs = resolveCacheTtlMs(params.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES);
|
||||
const cacheKey = normalizeCacheKey(
|
||||
JSON.stringify({
|
||||
provider: "duckduckgo",
|
||||
query: params.query,
|
||||
count,
|
||||
region: region ?? "",
|
||||
safeSearch,
|
||||
return await Effect.runPromise(
|
||||
runDuckDuckGoSearchEffect(params).pipe(Effect.provide(duckDuckGoSearchRuntimeLayer())),
|
||||
);
|
||||
}
|
||||
|
||||
function runDuckDuckGoSearchEffect(params: {
|
||||
config?: OpenClawConfig;
|
||||
query: string;
|
||||
count?: number;
|
||||
region?: string;
|
||||
safeSearch?: DdgSafeSearch;
|
||||
timeoutSeconds?: number;
|
||||
cacheTtlMinutes?: number;
|
||||
}): Effect.Effect<Record<string, unknown>, unknown, DuckDuckGoSearchRuntime> {
|
||||
return Effect.flatMap(DuckDuckGoSearchRuntimeTag, (runtime) =>
|
||||
Effect.tryPromise({
|
||||
try: async () => {
|
||||
const count = resolveSearchCount(params.count, DEFAULT_SEARCH_COUNT);
|
||||
const region = params.region ?? resolveDdgRegion(params.config);
|
||||
const safeSearch =
|
||||
params.safeSearch === "strict" ||
|
||||
params.safeSearch === "moderate" ||
|
||||
params.safeSearch === "off"
|
||||
? params.safeSearch
|
||||
: resolveDdgSafeSearch(params.config);
|
||||
const timeoutSeconds = resolveTimeoutSeconds(params.timeoutSeconds, DEFAULT_TIMEOUT_SECONDS);
|
||||
const cacheTtlMs = resolveCacheTtlMs(params.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES);
|
||||
const cacheKey = normalizeCacheKey(
|
||||
JSON.stringify({
|
||||
provider: "duckduckgo",
|
||||
query: params.query,
|
||||
count,
|
||||
region: region ?? "",
|
||||
safeSearch,
|
||||
}),
|
||||
);
|
||||
const cached = readCache(runtime.cache, cacheKey);
|
||||
if (cached) {
|
||||
return { ...cached.value, cached: true };
|
||||
}
|
||||
|
||||
const url = new URL(DDG_HTML_ENDPOINT);
|
||||
url.searchParams.set("q", params.query);
|
||||
if (region) {
|
||||
url.searchParams.set("kl", region);
|
||||
}
|
||||
url.searchParams.set("kp", DDG_SAFE_SEARCH_PARAM[safeSearch]);
|
||||
|
||||
const startedAt = runtime.now();
|
||||
const results = await runtime.runEndpoint(
|
||||
{
|
||||
url: url.toString(),
|
||||
timeoutSeconds,
|
||||
init: {
|
||||
method: "GET",
|
||||
headers: {
|
||||
"User-Agent":
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
||||
},
|
||||
},
|
||||
},
|
||||
async (response) => {
|
||||
if (!response.ok) {
|
||||
const detail = (await readResponseText(response, { maxBytes: 64_000 })).text;
|
||||
throw new Error(
|
||||
`DuckDuckGo search error (${response.status}): ${detail || response.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
const html = await response.text();
|
||||
if (isBotChallenge(html)) {
|
||||
throw new Error("DuckDuckGo returned a bot-detection challenge.");
|
||||
}
|
||||
return parseDuckDuckGoHtml(html).slice(0, count);
|
||||
},
|
||||
);
|
||||
|
||||
const payload = {
|
||||
query: params.query,
|
||||
provider: "duckduckgo",
|
||||
count: results.length,
|
||||
tookMs: runtime.now() - startedAt,
|
||||
externalContent: {
|
||||
untrusted: true,
|
||||
source: "web_search",
|
||||
provider: "duckduckgo",
|
||||
wrapped: true,
|
||||
},
|
||||
results: results.map((result) => ({
|
||||
title: wrapWebContent(result.title, "web_search"),
|
||||
url: result.url,
|
||||
snippet: result.snippet ? wrapWebContent(result.snippet, "web_search") : "",
|
||||
siteName: resolveSiteName(result.url) || undefined,
|
||||
})),
|
||||
} satisfies Record<string, unknown>;
|
||||
|
||||
writeCache(runtime.cache, cacheKey, payload, cacheTtlMs);
|
||||
return payload;
|
||||
},
|
||||
catch: (error) => error,
|
||||
}),
|
||||
);
|
||||
const cached = readCache(DDG_SEARCH_CACHE, cacheKey);
|
||||
if (cached) {
|
||||
return { ...cached.value, cached: true };
|
||||
}
|
||||
|
||||
const url = new URL(DDG_HTML_ENDPOINT);
|
||||
url.searchParams.set("q", params.query);
|
||||
if (region) {
|
||||
url.searchParams.set("kl", region);
|
||||
}
|
||||
url.searchParams.set("kp", DDG_SAFE_SEARCH_PARAM[safeSearch]);
|
||||
|
||||
const startedAt = Date.now();
|
||||
const results = await withTrustedWebSearchEndpoint(
|
||||
{
|
||||
url: url.toString(),
|
||||
timeoutSeconds,
|
||||
init: {
|
||||
method: "GET",
|
||||
headers: {
|
||||
"User-Agent":
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
||||
},
|
||||
},
|
||||
},
|
||||
async (response) => {
|
||||
if (!response.ok) {
|
||||
const detail = (await readResponseText(response, { maxBytes: 64_000 })).text;
|
||||
throw new Error(
|
||||
`DuckDuckGo search error (${response.status}): ${detail || response.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
const html = await response.text();
|
||||
if (isBotChallenge(html)) {
|
||||
throw new Error("DuckDuckGo returned a bot-detection challenge.");
|
||||
}
|
||||
return parseDuckDuckGoHtml(html).slice(0, count);
|
||||
},
|
||||
);
|
||||
|
||||
const payload = {
|
||||
query: params.query,
|
||||
provider: "duckduckgo",
|
||||
count: results.length,
|
||||
tookMs: Date.now() - startedAt,
|
||||
externalContent: {
|
||||
untrusted: true,
|
||||
source: "web_search",
|
||||
provider: "duckduckgo",
|
||||
wrapped: true,
|
||||
},
|
||||
results: results.map((result) => ({
|
||||
title: wrapWebContent(result.title, "web_search"),
|
||||
url: result.url,
|
||||
snippet: result.snippet ? wrapWebContent(result.snippet, "web_search") : "",
|
||||
siteName: resolveSiteName(result.url) || undefined,
|
||||
})),
|
||||
} satisfies Record<string, unknown>;
|
||||
|
||||
writeCache(DDG_SEARCH_CACHE, cacheKey, payload, cacheTtlMs);
|
||||
return payload;
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
DDG_SEARCH_CACHE,
|
||||
decodeDuckDuckGoUrl,
|
||||
decodeHtmlEntities,
|
||||
duckDuckGoSearchRuntimeLayer,
|
||||
isBotChallenge,
|
||||
parseDuckDuckGoHtml,
|
||||
runDuckDuckGoSearchEffect,
|
||||
};
|
||||
|
||||
@@ -132,6 +132,10 @@
|
||||
"types": "./dist/plugin-sdk/runtime.d.ts",
|
||||
"default": "./dist/plugin-sdk/runtime.js"
|
||||
},
|
||||
"./plugin-sdk/effect-runtime": {
|
||||
"types": "./dist/plugin-sdk/effect-runtime.d.ts",
|
||||
"default": "./dist/plugin-sdk/effect-runtime.js"
|
||||
},
|
||||
"./plugin-sdk/runtime-doctor": {
|
||||
"types": "./dist/plugin-sdk/runtime-doctor.d.ts",
|
||||
"default": "./dist/plugin-sdk/runtime-doctor.js"
|
||||
@@ -1773,6 +1777,7 @@
|
||||
"commander": "14.0.3",
|
||||
"croner": "10.0.1",
|
||||
"dotenv": "17.4.2",
|
||||
"effect": "3.21.2",
|
||||
"express": "5.2.1",
|
||||
"file-type": "22.0.1",
|
||||
"grammy": "1.42.0",
|
||||
@@ -1793,8 +1798,8 @@
|
||||
"tokenjuice": "0.7.0",
|
||||
"tree-sitter-bash": "0.25.1",
|
||||
"tslog": "4.10.2",
|
||||
"typescript": "6.0.3",
|
||||
"typebox": "1.1.38",
|
||||
"typescript": "6.0.3",
|
||||
"undici": "8.3.0",
|
||||
"web-push": "3.6.7",
|
||||
"web-tree-sitter": "0.26.8",
|
||||
|
||||
1
packages/plugin-sdk/src/effect-runtime.ts
Normal file
1
packages/plugin-sdk/src/effect-runtime.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from "../../../src/plugin-sdk/effect-runtime.js";
|
||||
24
pnpm-lock.yaml
generated
24
pnpm-lock.yaml
generated
@@ -107,6 +107,9 @@ importers:
|
||||
dotenv:
|
||||
specifier: 17.4.2
|
||||
version: 17.4.2
|
||||
effect:
|
||||
specifier: 3.21.2
|
||||
version: 3.21.2
|
||||
express:
|
||||
specifier: 5.2.1
|
||||
version: 5.2.1
|
||||
@@ -5217,6 +5220,9 @@ packages:
|
||||
ee-first@1.1.1:
|
||||
resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==}
|
||||
|
||||
effect@3.21.2:
|
||||
resolution: {integrity: sha512-rXd2FGDM8KdjSIrc+mqEELo7ScW7xTVxEf1iInmPSpIde9/nyGuFM710cjTo7/EreGXiUX2MOonPpprbz2XHCg==}
|
||||
|
||||
emoji-regex@8.0.0:
|
||||
resolution: {integrity: sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==}
|
||||
|
||||
@@ -5381,6 +5387,10 @@ packages:
|
||||
resolution: {integrity: sha512-CGnyrvbhPlWYMngksqrSSUT1BAVP49dZocrHuK0SvtR0D5TMs5wP0o3j7jexDJW01KSadjBp1M/71o/KR3nD1w==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
fast-check@3.23.2:
|
||||
resolution: {integrity: sha512-h5+1OzzfCC3Ef7VbtKdcv7zsstUQwUDlYpUTvjeUsJAssPgLn7QzbboPtL5ro04Mq0rPOsMzl7q5hIbRs2wD1A==}
|
||||
engines: {node: '>=8.0.0'}
|
||||
|
||||
fast-deep-equal@3.1.3:
|
||||
resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==}
|
||||
|
||||
@@ -6809,6 +6819,9 @@ packages:
|
||||
resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==}
|
||||
engines: {node: '>=6'}
|
||||
|
||||
pure-rand@6.1.0:
|
||||
resolution: {integrity: sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA==}
|
||||
|
||||
qified@0.10.1:
|
||||
resolution: {integrity: sha512-+Owyggi9IxT1ePKGafcI87ubSmxol6smwJ+RAHDQlx9+9cPwFWDiKFFCPuWhr9ignlGpZ9vDQLw67N4dcTVFEA==}
|
||||
engines: {node: '>=20'}
|
||||
@@ -11589,6 +11602,11 @@ snapshots:
|
||||
|
||||
ee-first@1.1.1: {}
|
||||
|
||||
effect@3.21.2:
|
||||
dependencies:
|
||||
'@standard-schema/spec': 1.1.0
|
||||
fast-check: 3.23.2
|
||||
|
||||
emoji-regex@8.0.0: {}
|
||||
|
||||
empathic@2.0.1: {}
|
||||
@@ -11806,6 +11824,10 @@ snapshots:
|
||||
|
||||
fake-indexeddb@6.2.5: {}
|
||||
|
||||
fast-check@3.23.2:
|
||||
dependencies:
|
||||
pure-rand: 6.1.0
|
||||
|
||||
fast-deep-equal@3.1.3: {}
|
||||
|
||||
fast-fifo@1.3.2: {}
|
||||
@@ -13598,6 +13620,8 @@ snapshots:
|
||||
|
||||
punycode@2.3.1: {}
|
||||
|
||||
pure-rand@6.1.0: {}
|
||||
|
||||
qified@0.10.1:
|
||||
dependencies:
|
||||
hookified: 2.2.0
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
"self-hosted-provider-setup",
|
||||
"routing",
|
||||
"runtime",
|
||||
"effect-runtime",
|
||||
"runtime-doctor",
|
||||
"runtime-env",
|
||||
"runtime-logger",
|
||||
|
||||
@@ -259,6 +259,26 @@ describe("executeWithApiKeyRotation", () => {
|
||||
expect(sleep).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("throws the original rate-limit error after exhausting rotated keys", async () => {
|
||||
const firstError = new Error("HTTP 429 too many requests on key 1");
|
||||
const secondError = new Error("HTTP 429 too many requests on key 2");
|
||||
const execute = vi
|
||||
.fn<(apiKey: string) => Promise<string>>()
|
||||
.mockRejectedValueOnce(firstError)
|
||||
.mockRejectedValueOnce(secondError);
|
||||
|
||||
await expect(
|
||||
executeWithApiKeyRotation({
|
||||
provider: "openai",
|
||||
apiKeys: ["key-1", "key-2"],
|
||||
transientRetry: { attempts: 2, baseDelayMs: 0, maxDelayMs: 0 },
|
||||
execute,
|
||||
}),
|
||||
).rejects.toBe(secondError);
|
||||
|
||||
expect(execute).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("does not rotate keys for transient 500 after same-key retry exhaustion", async () => {
|
||||
const sleep = vi.fn(async () => undefined);
|
||||
const execute = vi.fn(async () => {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { runRetryingPromise } from "../effect-runtime/retry.js";
|
||||
import { sleepWithAbort } from "../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import {
|
||||
@@ -9,6 +10,16 @@ import {
|
||||
} from "../provider-runtime/operation-retry.js";
|
||||
import { collectProviderApiKeys, isApiKeyRateLimitError } from "./live-auth-keys.js";
|
||||
|
||||
class RotateApiKeyError extends Error {
|
||||
constructor(
|
||||
readonly error: unknown,
|
||||
readonly messageForRetry: string,
|
||||
) {
|
||||
super(messageForRetry);
|
||||
this.name = "RotateApiKeyError";
|
||||
}
|
||||
}
|
||||
|
||||
type ApiKeyRetryParams = {
|
||||
apiKey: string;
|
||||
error: unknown;
|
||||
@@ -56,46 +67,66 @@ export async function executeWithApiKeyRotation<T>(
|
||||
|
||||
let lastError: unknown;
|
||||
const transientRetry = resolveTransientProviderRetryOptions(params.transientRetry);
|
||||
keyLoop: for (let apiKeyIndex = 0; apiKeyIndex < keys.length; apiKeyIndex += 1) {
|
||||
for (let apiKeyIndex = 0; apiKeyIndex < keys.length; apiKeyIndex += 1) {
|
||||
const apiKey = keys[apiKeyIndex];
|
||||
const maxOperationAttempts = resolveTransientProviderAttempts(transientRetry);
|
||||
for (let attemptNumber = 1; attemptNumber <= maxOperationAttempts; attemptNumber += 1) {
|
||||
try {
|
||||
return await params.execute(apiKey);
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
const message = formatErrorMessage(error);
|
||||
const rotateKey = params.shouldRetry
|
||||
? params.shouldRetry({ apiKey, error, attempt: apiKeyIndex, message })
|
||||
: isApiKeyRateLimitError(message);
|
||||
try {
|
||||
return await runRetryingPromise({
|
||||
operation: async () => {
|
||||
try {
|
||||
return await params.execute(apiKey);
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
const message = formatErrorMessage(error);
|
||||
const rotateKey = params.shouldRetry
|
||||
? params.shouldRetry({ apiKey, error, attempt: apiKeyIndex, message })
|
||||
: isApiKeyRateLimitError(message);
|
||||
|
||||
if (rotateKey) {
|
||||
if (apiKeyIndex + 1 >= keys.length) {
|
||||
break;
|
||||
if (rotateKey) {
|
||||
throw new RotateApiKeyError(error, message);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
params.onRetry?.({ apiKey, error, attempt: apiKeyIndex, message });
|
||||
break;
|
||||
}
|
||||
|
||||
if (
|
||||
!transientRetry ||
|
||||
!shouldRetrySameKeyProviderOperation({
|
||||
},
|
||||
maxAttempts: maxOperationAttempts,
|
||||
shouldRetry: (error, attemptNumber) => {
|
||||
if (!transientRetry || error instanceof RotateApiKeyError) {
|
||||
return false;
|
||||
}
|
||||
return shouldRetrySameKeyProviderOperation({
|
||||
options: transientRetry,
|
||||
error,
|
||||
message,
|
||||
message: formatErrorMessage(error),
|
||||
provider: params.provider,
|
||||
apiKeyIndex,
|
||||
attemptNumber,
|
||||
maxAttempts: maxOperationAttempts,
|
||||
})
|
||||
) {
|
||||
break keyLoop;
|
||||
});
|
||||
},
|
||||
resolveDelayMs: (attemptNumber) =>
|
||||
transientRetry ? resolveTransientProviderDelayMs(transientRetry, attemptNumber) : 0,
|
||||
sleep: async (delayMs) => {
|
||||
const sleep = transientRetry?.sleep ?? sleepWithAbort;
|
||||
await sleep(delayMs, transientRetry?.signal);
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (error instanceof RotateApiKeyError) {
|
||||
lastError = error.error;
|
||||
if (apiKeyIndex + 1 >= keys.length) {
|
||||
break;
|
||||
}
|
||||
|
||||
const delayMs = resolveTransientProviderDelayMs(transientRetry, attemptNumber);
|
||||
const sleep = transientRetry.sleep ?? sleepWithAbort;
|
||||
await sleep(delayMs, transientRetry.signal);
|
||||
params.onRetry?.({
|
||||
apiKey,
|
||||
error: error.error,
|
||||
attempt: apiKeyIndex,
|
||||
message: error.messageForRetry,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
lastError = error;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
expectBareNewOrResetAcknowledged,
|
||||
withTempHome,
|
||||
} from "../../test/helpers/auto-reply/trigger-handling-test-harness.js";
|
||||
import { loadSessionStore, resolveSessionKey } from "../config/sessions.js";
|
||||
import { loadSessionStore } from "../config/sessions.js";
|
||||
import { registerGroupIntroPromptCases } from "./reply.triggers.group-intro-prompts.cases.js";
|
||||
import { registerTriggerHandlingUsageSummaryCases } from "./reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.cases.js";
|
||||
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./reply/queue.js";
|
||||
@@ -537,6 +537,7 @@ describe("trigger handling", () => {
|
||||
const storePath = join(home, "compact-main.sessions.json");
|
||||
const cfg = makeCfg(home);
|
||||
cfg.session = { ...cfg.session, store: storePath };
|
||||
await seedTargetSession(storePath, MAIN_SESSION_KEY);
|
||||
mockSuccessfulCompaction();
|
||||
|
||||
const request = {
|
||||
@@ -557,7 +558,7 @@ describe("trigger handling", () => {
|
||||
expect(text?.startsWith("⚙️ Compacted")).toBe(true);
|
||||
expect(getCompactEmbeddedPiSessionMock()).toHaveBeenCalledOnce();
|
||||
const store = loadSessionStore(storePath);
|
||||
const sessionKey = resolveSessionKey("per-sender", request);
|
||||
const sessionKey = MAIN_SESSION_KEY;
|
||||
expect(store[sessionKey]?.compactionCount).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import fs from "node:fs";
|
||||
import readline from "node:readline";
|
||||
|
||||
import {
|
||||
asyncIterableStream,
|
||||
openClawStreamToAsyncIterable,
|
||||
} from "../../effect-runtime/stream.js";
|
||||
|
||||
// Shared streaming helpers for JSONL session transcripts.
|
||||
//
|
||||
// Callers historically read the entire transcript with `fs.readFile` before
|
||||
@@ -50,7 +55,7 @@ export async function* streamSessionTranscriptLines(
|
||||
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
|
||||
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
|
||||
try {
|
||||
for await (const line of rl) {
|
||||
for await (const line of openClawStreamToAsyncIterable(asyncIterableStream(rl))) {
|
||||
if (options.signal?.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
41
src/effect-runtime/index.ts
Normal file
41
src/effect-runtime/index.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { Effect, Either } from "effect";
|
||||
|
||||
export type OpenClawEffect<A, E = never, R = never> = Effect.Effect<A, E, R>;
|
||||
|
||||
export function promiseEffect<A, E = unknown>(params: {
|
||||
try: (signal: AbortSignal) => PromiseLike<A>;
|
||||
catch?: (error: unknown) => E;
|
||||
}): OpenClawEffect<A, E> {
|
||||
return Effect.tryPromise({
|
||||
try: params.try,
|
||||
catch: params.catch ?? ((error) => error as E),
|
||||
});
|
||||
}
|
||||
|
||||
export function syncEffect<A, E = unknown>(params: {
|
||||
try: () => A;
|
||||
catch?: (error: unknown) => E;
|
||||
}): OpenClawEffect<A, E> {
|
||||
return Effect.try({
|
||||
try: params.try,
|
||||
catch: params.catch ?? ((error) => error as E),
|
||||
});
|
||||
}
|
||||
|
||||
export async function runOpenClawEffect<A, E>(
|
||||
effect: OpenClawEffect<A, E>,
|
||||
): Promise<A> {
|
||||
const result = await Effect.runPromise(Effect.either(effect));
|
||||
if (Either.isLeft(result)) {
|
||||
throw result.left;
|
||||
}
|
||||
return result.right;
|
||||
}
|
||||
|
||||
export function runOpenClawEffectSync<A, E>(effect: OpenClawEffect<A, E>): A {
|
||||
const result = Effect.runSync(Effect.either(effect));
|
||||
if (Either.isLeft(result)) {
|
||||
throw result.left;
|
||||
}
|
||||
return result.right;
|
||||
}
|
||||
87
src/effect-runtime/retry.test.ts
Normal file
87
src/effect-runtime/retry.test.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import { Effect } from "effect";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { runOpenClawEffect, syncEffect } from "./index.js";
|
||||
import { runRetryingPromise } from "./retry.js";
|
||||
|
||||
describe("effect runtime kernel", () => {
|
||||
it("rethrows typed failures without exposing Effect fiber wrappers", async () => {
|
||||
const error = new Error("typed failure");
|
||||
|
||||
await expect(runOpenClawEffect(Effect.fail(error))).rejects.toBe(error);
|
||||
});
|
||||
|
||||
it("maps thrown sync exceptions into typed failures", async () => {
|
||||
const error = new Error("sync failure");
|
||||
|
||||
await expect(
|
||||
runOpenClawEffect(
|
||||
syncEffect({
|
||||
try: () => {
|
||||
throw error;
|
||||
},
|
||||
catch: (err) => err,
|
||||
}),
|
||||
),
|
||||
).rejects.toBe(error);
|
||||
});
|
||||
|
||||
it("retries failed promise operations through the internal Effect runtime", async () => {
|
||||
const sleep = vi.fn(async () => undefined);
|
||||
const operation = vi
|
||||
.fn<() => Promise<string>>()
|
||||
.mockRejectedValueOnce(new Error("HTTP 503"))
|
||||
.mockResolvedValueOnce("ok");
|
||||
|
||||
await expect(
|
||||
runRetryingPromise({
|
||||
operation,
|
||||
maxAttempts: 2,
|
||||
shouldRetry: () => true,
|
||||
resolveDelayMs: () => 25,
|
||||
sleep,
|
||||
}),
|
||||
).resolves.toBe("ok");
|
||||
|
||||
expect(operation).toHaveBeenCalledTimes(2);
|
||||
expect(sleep).toHaveBeenCalledWith(25);
|
||||
});
|
||||
|
||||
it("does not sleep after retry exhaustion", async () => {
|
||||
const sleep = vi.fn(async () => undefined);
|
||||
const error = new Error("HTTP 503");
|
||||
const operation = vi.fn<() => Promise<string>>().mockRejectedValue(error);
|
||||
|
||||
await expect(
|
||||
runRetryingPromise({
|
||||
operation,
|
||||
maxAttempts: 1,
|
||||
shouldRetry: () => true,
|
||||
resolveDelayMs: () => 25,
|
||||
sleep,
|
||||
}),
|
||||
).rejects.toBe(error);
|
||||
|
||||
expect(operation).toHaveBeenCalledTimes(1);
|
||||
expect(sleep).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("stops retrying when the predicate rejects the failure", async () => {
|
||||
const sleep = vi.fn(async () => undefined);
|
||||
const error = new Error("validation failed");
|
||||
const operation = vi.fn<() => Promise<string>>().mockRejectedValue(error);
|
||||
|
||||
await expect(
|
||||
runRetryingPromise({
|
||||
operation,
|
||||
maxAttempts: 3,
|
||||
shouldRetry: () => false,
|
||||
resolveDelayMs: () => 25,
|
||||
sleep,
|
||||
}),
|
||||
).rejects.toBe(error);
|
||||
|
||||
expect(operation).toHaveBeenCalledTimes(1);
|
||||
expect(sleep).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
57
src/effect-runtime/retry.ts
Normal file
57
src/effect-runtime/retry.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { Effect, Schedule, ScheduleDecision, ScheduleInterval } from "effect";
|
||||
|
||||
import { promiseEffect, runOpenClawEffect, type OpenClawEffect } from "./index.js";
|
||||
|
||||
export type RetryingPromiseParams<T> = {
|
||||
operation: () => Promise<T>;
|
||||
maxAttempts: number;
|
||||
shouldRetry: (error: unknown, attemptNumber: number) => boolean;
|
||||
resolveDelayMs: (attemptNumber: number) => number;
|
||||
sleep: (delayMs: number) => Promise<void>;
|
||||
};
|
||||
|
||||
function retryDecisionSchedule(
|
||||
params: Pick<RetryingPromiseParams<unknown>, "maxAttempts" | "shouldRetry">,
|
||||
): Schedule.Schedule<number> {
|
||||
return Schedule.makeWithState(1, (now, error, attemptNumber) => {
|
||||
const shouldRetry =
|
||||
attemptNumber < params.maxAttempts && params.shouldRetry(error, attemptNumber);
|
||||
return Effect.succeed([
|
||||
attemptNumber + 1,
|
||||
attemptNumber,
|
||||
shouldRetry
|
||||
? ScheduleDecision.continueWith(ScheduleInterval.after(now))
|
||||
: ScheduleDecision.done,
|
||||
] as const);
|
||||
});
|
||||
}
|
||||
|
||||
function runAttempt<T>(
|
||||
params: RetryingPromiseParams<T>,
|
||||
retryDriver: Schedule.ScheduleDriver<number>,
|
||||
): OpenClawEffect<T, unknown> {
|
||||
return Effect.suspend(() =>
|
||||
promiseEffect({
|
||||
try: () => params.operation(),
|
||||
}).pipe(
|
||||
Effect.catchAll((error) => {
|
||||
return retryDriver.next(error).pipe(
|
||||
Effect.catchAll(() => Effect.fail(error)),
|
||||
Effect.flatMap((attemptNumber) =>
|
||||
promiseEffect({
|
||||
try: () => params.sleep(params.resolveDelayMs(attemptNumber)),
|
||||
}).pipe(Effect.flatMap(() => runAttempt(params, retryDriver))),
|
||||
),
|
||||
);
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
export async function runRetryingPromise<T>(params: RetryingPromiseParams<T>): Promise<T> {
|
||||
return await runOpenClawEffect(
|
||||
Schedule.driver(retryDecisionSchedule(params)).pipe(
|
||||
Effect.flatMap((retryDriver) => runAttempt(params, retryDriver)),
|
||||
),
|
||||
);
|
||||
}
|
||||
37
src/effect-runtime/stream.test.ts
Normal file
37
src/effect-runtime/stream.test.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { asyncIterableStream, openClawStreamToAsyncIterable } from "./stream.js";
|
||||
|
||||
async function collect<T>(iterable: AsyncIterable<T>): Promise<T[]> {
|
||||
const values: T[] = [];
|
||||
for await (const value of iterable) {
|
||||
values.push(value);
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
describe("Effect stream bridge", () => {
|
||||
it("round-trips async iterable values", async () => {
|
||||
async function* source() {
|
||||
yield "alpha";
|
||||
yield "beta";
|
||||
}
|
||||
|
||||
await expect(
|
||||
collect(openClawStreamToAsyncIterable(asyncIterableStream(source()))),
|
||||
).resolves.toEqual(["alpha", "beta"]);
|
||||
});
|
||||
|
||||
it("maps async iterable failures before rethrowing them", async () => {
|
||||
const original = new Error("source failed");
|
||||
const mapped = new Error("mapped source failed");
|
||||
async function* source() {
|
||||
yield "alpha";
|
||||
throw original;
|
||||
}
|
||||
|
||||
await expect(
|
||||
collect(openClawStreamToAsyncIterable(asyncIterableStream(source(), () => mapped))),
|
||||
).rejects.toBe(mapped);
|
||||
});
|
||||
});
|
||||
16
src/effect-runtime/stream.ts
Normal file
16
src/effect-runtime/stream.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
import { Stream } from "effect";
|
||||
|
||||
export type OpenClawStream<A, E = unknown, R = never> = Stream.Stream<A, E, R>;
|
||||
|
||||
export function asyncIterableStream<A, E = unknown>(
|
||||
iterable: AsyncIterable<A>,
|
||||
onError: (error: unknown) => E = (error) => error as E,
|
||||
): OpenClawStream<A, E> {
|
||||
return Stream.fromAsyncIterable(iterable, onError);
|
||||
}
|
||||
|
||||
export function openClawStreamToAsyncIterable<A, E>(
|
||||
stream: OpenClawStream<A, E>,
|
||||
): AsyncIterable<A> {
|
||||
return Stream.toAsyncIterable(stream);
|
||||
}
|
||||
@@ -1,24 +1,22 @@
|
||||
import { syncEffect } from "../../effect-runtime/index.js";
|
||||
import {
|
||||
getDiagnosticStabilitySnapshot,
|
||||
normalizeDiagnosticStabilityQuery,
|
||||
} from "../../logging/diagnostic-stability.js";
|
||||
import { ErrorCodes, errorShape } from "../protocol/index.js";
|
||||
import { invalidGatewayMethodRequest, respondWithGatewayEffect } from "./effect.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
export const diagnosticsHandlers: GatewayRequestHandlers = {
|
||||
"diagnostics.stability": async ({ params, respond }) => {
|
||||
try {
|
||||
const query = normalizeDiagnosticStabilityQuery(params);
|
||||
respond(true, getDiagnosticStabilitySnapshot(query), undefined);
|
||||
} catch (err) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
err instanceof Error ? err.message : "invalid diagnostics.stability params",
|
||||
),
|
||||
);
|
||||
}
|
||||
await respondWithGatewayEffect({
|
||||
respond,
|
||||
effect: syncEffect({
|
||||
try: () => getDiagnosticStabilitySnapshot(normalizeDiagnosticStabilityQuery(params)),
|
||||
catch: (err) =>
|
||||
invalidGatewayMethodRequest(
|
||||
err instanceof Error ? err.message : "invalid diagnostics.stability params",
|
||||
),
|
||||
}),
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
32
src/gateway/server-methods/effect.test.ts
Normal file
32
src/gateway/server-methods/effect.test.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import { Effect } from "effect";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { invalidGatewayMethodRequest, respondWithGatewayEffect } from "./effect.js";
|
||||
|
||||
describe("gateway method Effect adapter", () => {
|
||||
it("responds with the successful Effect payload", async () => {
|
||||
const respond = vi.fn();
|
||||
|
||||
await respondWithGatewayEffect({
|
||||
respond,
|
||||
effect: Effect.succeed({ ok: true }),
|
||||
meta: { cached: true },
|
||||
});
|
||||
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true }, undefined, { cached: true });
|
||||
});
|
||||
|
||||
it("maps expected gateway failures to protocol errors", async () => {
|
||||
const respond = vi.fn();
|
||||
|
||||
await respondWithGatewayEffect({
|
||||
respond,
|
||||
effect: Effect.fail(invalidGatewayMethodRequest("bad params")),
|
||||
});
|
||||
|
||||
expect(respond).toHaveBeenCalledWith(false, undefined, {
|
||||
code: "INVALID_REQUEST",
|
||||
message: "bad params",
|
||||
});
|
||||
});
|
||||
});
|
||||
47
src/gateway/server-methods/effect.ts
Normal file
47
src/gateway/server-methods/effect.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import { runOpenClawEffect, type OpenClawEffect } from "../../effect-runtime/index.js";
|
||||
import { ErrorCodes, errorShape, type ErrorShape } from "../protocol/index.js";
|
||||
import type { RespondFn } from "./types.js";
|
||||
|
||||
type GatewayEffectErrorCode = (typeof ErrorCodes)[keyof typeof ErrorCodes];
|
||||
|
||||
export class GatewayMethodEffectError extends Error {
|
||||
constructor(
|
||||
readonly code: GatewayEffectErrorCode,
|
||||
message: string,
|
||||
readonly opts?: { details?: unknown; retryable?: boolean; retryAfterMs?: number },
|
||||
) {
|
||||
super(message);
|
||||
this.name = "GatewayMethodEffectError";
|
||||
}
|
||||
}
|
||||
|
||||
export function invalidGatewayMethodRequest(message: string): GatewayMethodEffectError {
|
||||
return new GatewayMethodEffectError(ErrorCodes.INVALID_REQUEST, message);
|
||||
}
|
||||
|
||||
function shapeGatewayMethodError(error: unknown): ErrorShape {
|
||||
if (error instanceof GatewayMethodEffectError) {
|
||||
return errorShape(error.code, error.message, error.opts);
|
||||
}
|
||||
return errorShape(
|
||||
ErrorCodes.UNAVAILABLE,
|
||||
error instanceof Error ? error.message : "gateway method failed",
|
||||
);
|
||||
}
|
||||
|
||||
export async function respondWithGatewayEffect<T>(params: {
|
||||
respond: RespondFn;
|
||||
effect: OpenClawEffect<T, unknown>;
|
||||
meta?: Record<string, unknown>;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const payload = await runOpenClawEffect(params.effect);
|
||||
if (params.meta) {
|
||||
params.respond(true, payload, undefined, params.meta);
|
||||
return;
|
||||
}
|
||||
params.respond(true, payload, undefined);
|
||||
} catch (error) {
|
||||
params.respond(false, undefined, shapeGatewayMethodError(error));
|
||||
}
|
||||
}
|
||||
@@ -516,6 +516,34 @@ function ensureSessionTranscriptFile(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function readRawSessionCreateLabelFallback(params: {
|
||||
storePath: string;
|
||||
storeKeys: string[];
|
||||
}): string | undefined {
|
||||
let parsed: unknown;
|
||||
try {
|
||||
const raw = fs.readFileSync(params.storePath, "utf-8");
|
||||
parsed = raw.trim() ? JSON.parse(raw) : {};
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return undefined;
|
||||
}
|
||||
const store = parsed as Record<string, unknown>;
|
||||
for (const key of params.storeKeys) {
|
||||
const entry = store[key];
|
||||
if (!entry || typeof entry !== "object" || Array.isArray(entry)) {
|
||||
continue;
|
||||
}
|
||||
const label = normalizeOptionalString((entry as { label?: unknown }).label);
|
||||
if (label) {
|
||||
return label;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveAbortSessionKey(params: {
|
||||
context: Pick<GatewayRequestContext, "chatAbortControllers">;
|
||||
requestedKey: string;
|
||||
@@ -1271,6 +1299,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
: buildDashboardSessionKey(agentId);
|
||||
const target = resolveGatewaySessionStoreTarget({ cfg, key });
|
||||
const targetAgentId = resolveAgentIdFromSessionKey(target.canonicalKey);
|
||||
const requestedLabel = normalizeOptionalString(p.label);
|
||||
const label =
|
||||
requestedLabel ??
|
||||
(!("label" in p)
|
||||
? readRawSessionCreateLabelFallback({
|
||||
storePath: target.storePath,
|
||||
storeKeys: target.storeKeys,
|
||||
})
|
||||
: undefined);
|
||||
const created = await updateSessionStore(target.storePath, async (store) => {
|
||||
const patched = await applySessionsPatchToStore({
|
||||
cfg,
|
||||
@@ -1278,7 +1315,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
storeKey: target.canonicalKey,
|
||||
patch: {
|
||||
key: target.canonicalKey,
|
||||
label: normalizeOptionalString(p.label),
|
||||
label,
|
||||
model: normalizeOptionalString(p.model),
|
||||
},
|
||||
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
|
||||
|
||||
19
src/plugin-sdk/effect-runtime.ts
Normal file
19
src/plugin-sdk/effect-runtime.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
// Public Effect helpers for plugins that need typed async programs, services, or streams.
|
||||
|
||||
export { Context, Effect, Layer, Schedule, ScheduleDecision, ScheduleInterval, Stream } from "effect";
|
||||
export {
|
||||
promiseEffect,
|
||||
runOpenClawEffect,
|
||||
runOpenClawEffectSync,
|
||||
syncEffect,
|
||||
type OpenClawEffect,
|
||||
} from "../effect-runtime/index.js";
|
||||
export {
|
||||
runRetryingPromise,
|
||||
type RetryingPromiseParams,
|
||||
} from "../effect-runtime/retry.js";
|
||||
export {
|
||||
asyncIterableStream,
|
||||
openClawStreamToAsyncIterable,
|
||||
type OpenClawStream,
|
||||
} from "../effect-runtime/stream.js";
|
||||
@@ -8,11 +8,13 @@ import { loadOpenClawPluginCliRegistry, loadOpenClawPlugins } from "./loader.js"
|
||||
import { createEmptyPluginRegistry } from "./registry-empty.js";
|
||||
import type { PluginRegistry } from "./registry.js";
|
||||
import {
|
||||
buildPluginRuntimeLoadOptions,
|
||||
createPluginRuntimeLoaderLogger,
|
||||
resolvePluginRuntimeLoadContext,
|
||||
type PluginRuntimeLoadContext,
|
||||
} from "./runtime/load-context.js";
|
||||
import {
|
||||
buildPluginRuntimeLoadOptionsWithEffect,
|
||||
resolvePluginRuntimeLoadContextWithEffect,
|
||||
} from "./runtime/load-context-effect.js";
|
||||
import type {
|
||||
OpenClawPluginCliCommandDescriptor,
|
||||
OpenClawPluginCliContext,
|
||||
@@ -57,7 +59,7 @@ function buildPluginCliLoaderParams(
|
||||
loaderOptions?: PluginCliLoaderOptions,
|
||||
) {
|
||||
const onlyPluginIds = resolvePrimaryCommandManifestPluginIds(context, params?.primaryCommand);
|
||||
return buildPluginRuntimeLoadOptions(context, {
|
||||
return buildPluginRuntimeLoadOptionsWithEffect(context, {
|
||||
...loaderOptions,
|
||||
...(onlyPluginIds && onlyPluginIds.length > 0 ? { onlyPluginIds } : {}),
|
||||
});
|
||||
@@ -133,7 +135,7 @@ export function resolvePluginCliLoadContext(params: {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
logger: PluginLogger;
|
||||
}): PluginCliLoadContext {
|
||||
return resolvePluginRuntimeLoadContext({
|
||||
return resolvePluginRuntimeLoadContextWithEffect({
|
||||
config: params.cfg,
|
||||
env: params.env,
|
||||
logger: params.logger,
|
||||
@@ -177,7 +179,7 @@ export async function loadPluginCliCommandRegistryWithContext(params: {
|
||||
return {
|
||||
...params.context,
|
||||
registry: loadOpenClawPlugins(
|
||||
buildPluginRuntimeLoadOptions(params.context, {
|
||||
buildPluginRuntimeLoadOptionsWithEffect(params.context, {
|
||||
...params.loaderOptions,
|
||||
...(onlyPluginIds && onlyPluginIds.length > 0 ? { onlyPluginIds } : {}),
|
||||
activate: false,
|
||||
|
||||
58
src/plugins/runtime/load-context-effect.test.ts
Normal file
58
src/plugins/runtime/load-context-effect.test.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
buildPluginRuntimeLoadOptionsWithEffect,
|
||||
resolvePluginRuntimeLoadContextWithEffect,
|
||||
} from "./load-context-effect.js";
|
||||
|
||||
function createLogger() {
|
||||
return {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
describe("plugin runtime Effect load context", () => {
|
||||
it("resolves runtime load context through an Effect layer", () => {
|
||||
const config = { plugins: { enabled: true } } as OpenClawConfig;
|
||||
const env = { OPENCLAW_TEST: "1" };
|
||||
const logger = createLogger();
|
||||
|
||||
const context = resolvePluginRuntimeLoadContextWithEffect({
|
||||
config,
|
||||
env,
|
||||
logger,
|
||||
workspaceDir: "/tmp/openclaw-agent",
|
||||
});
|
||||
|
||||
expect(context.rawConfig).toBe(config);
|
||||
expect(context.env).toBe(env);
|
||||
expect(context.logger).toBe(logger);
|
||||
expect(context.workspaceDir).toBe("/tmp/openclaw-agent");
|
||||
});
|
||||
|
||||
it("builds load options from an existing context through Effect", () => {
|
||||
const config = { plugins: { enabled: true } } as OpenClawConfig;
|
||||
const context = resolvePluginRuntimeLoadContextWithEffect({
|
||||
config,
|
||||
env: {},
|
||||
logger: createLogger(),
|
||||
workspaceDir: "/tmp/openclaw-agent",
|
||||
});
|
||||
|
||||
const options = buildPluginRuntimeLoadOptionsWithEffect(context, {
|
||||
onlyPluginIds: ["demo"],
|
||||
cache: false,
|
||||
activate: false,
|
||||
});
|
||||
|
||||
expect(options.config).toBe(context.config);
|
||||
expect(options.workspaceDir).toBe("/tmp/openclaw-agent");
|
||||
expect(options.onlyPluginIds).toEqual(["demo"]);
|
||||
expect(options.cache).toBe(false);
|
||||
expect(options.activate).toBe(false);
|
||||
});
|
||||
});
|
||||
57
src/plugins/runtime/load-context-effect.ts
Normal file
57
src/plugins/runtime/load-context-effect.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { Context, Effect, Layer } from "effect";
|
||||
|
||||
import { runOpenClawEffectSync, syncEffect } from "../../effect-runtime/index.js";
|
||||
import type { PluginLoadOptions } from "../loader.js";
|
||||
import {
|
||||
buildPluginRuntimeLoadOptions,
|
||||
resolvePluginRuntimeLoadContext,
|
||||
type PluginRuntimeLoadContext,
|
||||
type PluginRuntimeLoadContextOptions,
|
||||
} from "./load-context.js";
|
||||
|
||||
export const PluginRuntimeLoadContextTag =
|
||||
Context.GenericTag<PluginRuntimeLoadContext>("openclaw/PluginRuntimeLoadContext");
|
||||
|
||||
export function pluginRuntimeLoadContextLayer(
|
||||
options?: PluginRuntimeLoadContextOptions,
|
||||
): Layer.Layer<PluginRuntimeLoadContext> {
|
||||
return Layer.effect(
|
||||
PluginRuntimeLoadContextTag,
|
||||
syncEffect({
|
||||
try: () => resolvePluginRuntimeLoadContext(options),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
export function pluginRuntimeLoadContextValueLayer(
|
||||
context: PluginRuntimeLoadContext,
|
||||
): Layer.Layer<PluginRuntimeLoadContext> {
|
||||
return Layer.succeed(PluginRuntimeLoadContextTag, context);
|
||||
}
|
||||
|
||||
export function buildPluginRuntimeLoadOptionsEffect(
|
||||
overrides?: Partial<PluginLoadOptions>,
|
||||
): Effect.Effect<PluginLoadOptions, never, PluginRuntimeLoadContext> {
|
||||
return Effect.map(PluginRuntimeLoadContextTag, (context) =>
|
||||
buildPluginRuntimeLoadOptions(context, overrides),
|
||||
);
|
||||
}
|
||||
|
||||
export function resolvePluginRuntimeLoadContextWithEffect(
|
||||
options?: PluginRuntimeLoadContextOptions,
|
||||
): PluginRuntimeLoadContext {
|
||||
return runOpenClawEffectSync(
|
||||
PluginRuntimeLoadContextTag.pipe(Effect.provide(pluginRuntimeLoadContextLayer(options))),
|
||||
);
|
||||
}
|
||||
|
||||
export function buildPluginRuntimeLoadOptionsWithEffect(
|
||||
context: PluginRuntimeLoadContext,
|
||||
overrides?: Partial<PluginLoadOptions>,
|
||||
): PluginLoadOptions {
|
||||
return runOpenClawEffectSync(
|
||||
buildPluginRuntimeLoadOptionsEffect(overrides).pipe(
|
||||
Effect.provide(pluginRuntimeLoadContextValueLayer(context)),
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
import { runRetryingPromise } from "../effect-runtime/retry.js";
|
||||
import { sleepWithAbort } from "../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
|
||||
@@ -232,35 +233,30 @@ export async function executeProviderOperationWithRetry<T>(params: {
|
||||
const retryConfig = providerOperationRetryConfig(params.stage, params.retry);
|
||||
const retryOptions = resolveTransientProviderRetryOptions(retryConfig);
|
||||
const maxAttempts = resolveTransientProviderAttempts(retryOptions);
|
||||
let lastError: unknown;
|
||||
|
||||
for (let attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber += 1) {
|
||||
try {
|
||||
return await params.operation();
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
const message = formatErrorMessage(error);
|
||||
if (
|
||||
!retryOptions ||
|
||||
!shouldRetrySameKeyProviderOperation({
|
||||
options: retryOptions,
|
||||
error,
|
||||
message,
|
||||
provider: params.provider,
|
||||
apiKeyIndex: 0,
|
||||
attemptNumber,
|
||||
maxAttempts,
|
||||
stage: params.stage,
|
||||
})
|
||||
) {
|
||||
throw error;
|
||||
return await runRetryingPromise({
|
||||
operation: params.operation,
|
||||
maxAttempts,
|
||||
shouldRetry: (error, attemptNumber) => {
|
||||
if (!retryOptions) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const delayMs = resolveTransientProviderDelayMs(retryOptions, attemptNumber);
|
||||
const sleep = retryOptions.sleep ?? sleepWithAbort;
|
||||
await sleep(delayMs, retryOptions.signal);
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
return shouldRetrySameKeyProviderOperation({
|
||||
options: retryOptions,
|
||||
error,
|
||||
message: formatErrorMessage(error),
|
||||
provider: params.provider,
|
||||
apiKeyIndex: 0,
|
||||
attemptNumber,
|
||||
maxAttempts,
|
||||
stage: params.stage,
|
||||
});
|
||||
},
|
||||
resolveDelayMs: (attemptNumber) =>
|
||||
retryOptions ? resolveTransientProviderDelayMs(retryOptions, attemptNumber) : 0,
|
||||
sleep: async (delayMs) => {
|
||||
const sleep = retryOptions?.sleep ?? sleepWithAbort;
|
||||
await sleep(delayMs, retryOptions?.signal);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -70,6 +70,11 @@ installPiEmbeddedMock();
|
||||
|
||||
vi.doMock("../../../src/agents/pi-embedded-runner/runs.js", () => ({
|
||||
abortEmbeddedPiRun: (...args: unknown[]) => piEmbeddedMocks.abortEmbeddedPiRun(...args),
|
||||
resolveActiveEmbeddedRunSessionId: (...args: unknown[]) =>
|
||||
piEmbeddedMocks.resolveActiveEmbeddedRunSessionId(...args),
|
||||
isEmbeddedPiRunActive: (...args: unknown[]) => piEmbeddedMocks.isEmbeddedPiRunActive(...args),
|
||||
isEmbeddedPiRunStreaming: (...args: unknown[]) =>
|
||||
piEmbeddedMocks.isEmbeddedPiRunStreaming(...args),
|
||||
formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) =>
|
||||
outcome.reason && outcome.sessionId
|
||||
? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live`
|
||||
|
||||
Reference in New Issue
Block a user