Compare commits

...

16 Commits

Author SHA1 Message Date
Vincent Koc
91e48c8329 fix(gateway): preserve dead session labels 2026-05-16 16:34:44 +08:00
Vincent Koc
0f882e6fcf fix(runtime): trim schedule type defaults 2026-05-16 16:34:44 +08:00
Vincent Koc
16a702a254 test(duckduckgo): preserve generic endpoint mock type 2026-05-16 16:34:43 +08:00
Vincent Koc
9fd5939edf docs(changelog): note effect runtime sdk 2026-05-16 16:34:43 +08:00
Vincent Koc
00c4d06bc7 feat(plugin-sdk): expose effect runtime helpers 2026-05-16 16:34:42 +08:00
Vincent Koc
63c9b412bc feat(runtime): drive retries with effect schedule 2026-05-16 16:34:42 +08:00
Vincent Koc
03a5ab9907 feat(duckduckgo): run search client through effect 2026-05-16 16:34:41 +08:00
Vincent Koc
07ee8f14bf test(auto-reply): seed compact main session fixture 2026-05-16 16:34:41 +08:00
Vincent Koc
414a042cfd test(auto-reply): align trigger harness pi run mock 2026-05-16 16:34:40 +08:00
Vincent Koc
47d34eeb9b fix(runtime): satisfy effect kernel lint 2026-05-16 16:34:40 +08:00
Vincent Koc
740c68eda4 feat(runtime): bridge transcript streams through effect 2026-05-16 16:34:40 +08:00
Vincent Koc
8d822214fa feat(plugins): layer runtime load context with effect 2026-05-16 16:34:39 +08:00
Vincent Koc
66a7a56414 feat(gateway): add effect method adapter 2026-05-16 16:34:39 +08:00
Vincent Koc
ffb0f23c6e feat(runtime): route api key retries through effect 2026-05-16 16:34:38 +08:00
Vincent Koc
162fe50f1b feat(runtime): add effect retry kernel 2026-05-16 16:34:38 +08:00
Vincent Koc
489c0d7a28 build(runtime): add effect dependency 2026-05-16 16:34:37 +08:00
27 changed files with 935 additions and 160 deletions

View File

@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Plugin SDK: add `openclaw/plugin-sdk/effect-runtime` for typed Effect programs, services, retries, and stream bridges, and prove it through the DuckDuckGo plugin runtime.
- Providers/xAI: add xAI Grok OAuth login for SuperGrok subscribers, letting `xai/*` models and xAI media/tool providers authenticate without `XAI_API_KEY`.
- CLI/cron: add `openclaw cron run --wait` with timeout and poll interval controls, plus exact `cron.runs --run-id` filtering so automation can block on one queued manual run. (#81929) Thanks @ificator.
- Maintainer tooling: route Crabbox skill defaults through the repo brokered AWS config, leaving Blacksmith Testbox as an explicit opt-in instead of the broad-proof default.

View File

@@ -56,6 +56,41 @@ Provider and channel execution paths must use the active runtime config snapshot
## Reusable runtime utilities
Use `openclaw/plugin-sdk/effect-runtime` when plugin code needs OpenClaw's
pinned Effect integration for typed async programs, injectable services, retry
policies, or stream bridges. This subpath keeps plugin code on a supported SDK
surface instead of importing host internals or relying on a different Effect
version.
```typescript
import {
Context,
Effect,
Layer,
runOpenClawEffect,
} from "openclaw/plugin-sdk/effect-runtime";
type SearchRuntime = {
now: () => number;
};
const SearchRuntime = Context.GenericTag<SearchRuntime>("my-plugin/SearchRuntime");
const program = SearchRuntime.pipe(
Effect.map((runtime) => ({ startedAt: runtime.now() })),
);
const result = await runOpenClawEffect(
program.pipe(
Effect.provide(
Layer.succeed(SearchRuntime, {
now: Date.now,
}),
),
),
);
```
Use the channel-turn `botLoopProtection` facts for bot-authored inbound messages. Core applies the shared in-memory sliding-window guard before session record and dispatch, without tying the policy to one channel. The guard tracks `(scopeId, conversationId, participant pair)` keys, counts both directions of a pair together, applies a cooldown once the window budget is exceeded, and prunes inactive entries opportunistically.
Channel plugins that expose this behavior to operators should prefer the shared `channels.defaults.botLoopProtection` shape for baseline budgets, then layer channel/provider-specific overrides on top. The shared config uses seconds because it is user-facing:

View File

@@ -0,0 +1,100 @@
import { Effect } from "openclaw/plugin-sdk/effect-runtime";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { __testing } from "./ddg-client.js";
type DuckDuckGoSearchRuntimeOverrides = NonNullable<
Parameters<typeof __testing.duckDuckGoSearchRuntimeLayer>[0]
>;
type DuckDuckGoRunEndpoint = NonNullable<DuckDuckGoSearchRuntimeOverrides["runEndpoint"]>;
function resultHtml(params: { title: string; url: string; snippet: string }) {
return `
<a class="result__a" href="${params.url}">${params.title}</a>
<a class="result__snippet">${params.snippet}</a>
`;
}
describe("duckduckgo effect runtime", () => {
beforeEach(() => {
__testing.DDG_SEARCH_CACHE.clear();
});
it("runs the search through an injectable Effect runtime and caches payloads", async () => {
const cache: NonNullable<DuckDuckGoSearchRuntimeOverrides["cache"]> = new Map();
const now = vi.fn().mockReturnValueOnce(1_000).mockReturnValueOnce(1_037);
const runEndpointMock = vi.fn();
const runEndpoint: DuckDuckGoRunEndpoint = async (request, run) => {
runEndpointMock(request);
expect(request.timeoutSeconds).toBe(12);
expect(new URL(request.url).searchParams.get("q")).toBe("openclaw effect");
expect(new URL(request.url).searchParams.get("kl")).toBe("us-en");
expect(new URL(request.url).searchParams.get("kp")).toBe("-2");
return await run(
new Response(
resultHtml({
title: "OpenClaw &amp; Effect",
url: "https://duckduckgo.com/l/?uddg=https%3A%2F%2Fdocs.openclaw.ai%2F",
snippet: "Typed effects &amp; plugin runtime",
}),
{ status: 200 },
),
);
};
const runtime = __testing.duckDuckGoSearchRuntimeLayer({
cache,
now,
runEndpoint,
});
const search = {
query: "openclaw effect",
count: 3,
region: "us-en",
safeSearch: "off" as const,
timeoutSeconds: 12,
};
const first = await Effect.runPromise(
__testing.runDuckDuckGoSearchEffect(search).pipe(Effect.provide(runtime)),
);
const second = await Effect.runPromise(
__testing.runDuckDuckGoSearchEffect(search).pipe(Effect.provide(runtime)),
);
expect(runEndpointMock).toHaveBeenCalledOnce();
expect(first).toMatchObject({
count: 1,
provider: "duckduckgo",
query: "openclaw effect",
tookMs: 37,
});
expect(first.results).toEqual([
{
title: expect.stringContaining("OpenClaw & Effect"),
url: "https://docs.openclaw.ai/",
snippet: expect.stringContaining("Typed effects & plugin runtime"),
siteName: "docs.openclaw.ai",
},
]);
expect(second).toMatchObject({
cached: true,
count: 1,
provider: "duckduckgo",
});
});
it("fails through the Effect error channel for bot challenges", async () => {
const runtime = __testing.duckDuckGoSearchRuntimeLayer({
cache: new Map(),
runEndpoint: async (_request, run) =>
await run(new Response('<form id="challenge-form">Are you a human?</form>')),
});
await expect(
Effect.runPromise(
__testing
.runDuckDuckGoSearchEffect({ query: "openclaw" })
.pipe(Effect.provide(runtime)),
),
).rejects.toThrow("DuckDuckGo returned a bot-detection challenge.");
});
});

View File

@@ -1,4 +1,5 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { Context, Effect, Layer } from "openclaw/plugin-sdk/effect-runtime";
import {
DEFAULT_CACHE_TTL_MINUTES,
DEFAULT_SEARCH_COUNT,
@@ -28,6 +29,43 @@ const DDG_SEARCH_CACHE = new Map<
{ value: Record<string, unknown>; insertedAt: number; expiresAt: number }
>();
type DuckDuckGoEndpointRequest = {
url: string;
timeoutSeconds: number;
init: RequestInit;
signal?: AbortSignal;
};
type DuckDuckGoSearchRuntime = {
cache: typeof DDG_SEARCH_CACHE;
now: () => number;
runEndpoint: <T>(
request: DuckDuckGoEndpointRequest,
run: (response: Response) => Promise<T>,
) => Promise<T>;
};
const DuckDuckGoSearchRuntimeTag = Context.GenericTag<DuckDuckGoSearchRuntime>(
"openclaw/duckduckgo/SearchRuntime",
);
function createDefaultDuckDuckGoSearchRuntime(): DuckDuckGoSearchRuntime {
return {
cache: DDG_SEARCH_CACHE,
now: Date.now,
runEndpoint: withTrustedWebSearchEndpoint,
};
}
function duckDuckGoSearchRuntimeLayer(
runtime?: Partial<DuckDuckGoSearchRuntime>,
): Layer.Layer<DuckDuckGoSearchRuntime> {
return Layer.succeed(DuckDuckGoSearchRuntimeTag, {
...createDefaultDuckDuckGoSearchRuntime(),
...runtime,
});
}
type DuckDuckGoResult = {
title: string;
url: string;
@@ -121,92 +159,116 @@ export async function runDuckDuckGoSearch(params: {
timeoutSeconds?: number;
cacheTtlMinutes?: number;
}): Promise<Record<string, unknown>> {
const count = resolveSearchCount(params.count, DEFAULT_SEARCH_COUNT);
const region = params.region ?? resolveDdgRegion(params.config);
const safeSearch =
params.safeSearch === "strict" ||
params.safeSearch === "moderate" ||
params.safeSearch === "off"
? params.safeSearch
: resolveDdgSafeSearch(params.config);
const timeoutSeconds = resolveTimeoutSeconds(params.timeoutSeconds, DEFAULT_TIMEOUT_SECONDS);
const cacheTtlMs = resolveCacheTtlMs(params.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES);
const cacheKey = normalizeCacheKey(
JSON.stringify({
provider: "duckduckgo",
query: params.query,
count,
region: region ?? "",
safeSearch,
return await Effect.runPromise(
runDuckDuckGoSearchEffect(params).pipe(Effect.provide(duckDuckGoSearchRuntimeLayer())),
);
}
function runDuckDuckGoSearchEffect(params: {
config?: OpenClawConfig;
query: string;
count?: number;
region?: string;
safeSearch?: DdgSafeSearch;
timeoutSeconds?: number;
cacheTtlMinutes?: number;
}): Effect.Effect<Record<string, unknown>, unknown, DuckDuckGoSearchRuntime> {
return Effect.flatMap(DuckDuckGoSearchRuntimeTag, (runtime) =>
Effect.tryPromise({
try: async () => {
const count = resolveSearchCount(params.count, DEFAULT_SEARCH_COUNT);
const region = params.region ?? resolveDdgRegion(params.config);
const safeSearch =
params.safeSearch === "strict" ||
params.safeSearch === "moderate" ||
params.safeSearch === "off"
? params.safeSearch
: resolveDdgSafeSearch(params.config);
const timeoutSeconds = resolveTimeoutSeconds(params.timeoutSeconds, DEFAULT_TIMEOUT_SECONDS);
const cacheTtlMs = resolveCacheTtlMs(params.cacheTtlMinutes, DEFAULT_CACHE_TTL_MINUTES);
const cacheKey = normalizeCacheKey(
JSON.stringify({
provider: "duckduckgo",
query: params.query,
count,
region: region ?? "",
safeSearch,
}),
);
const cached = readCache(runtime.cache, cacheKey);
if (cached) {
return { ...cached.value, cached: true };
}
const url = new URL(DDG_HTML_ENDPOINT);
url.searchParams.set("q", params.query);
if (region) {
url.searchParams.set("kl", region);
}
url.searchParams.set("kp", DDG_SAFE_SEARCH_PARAM[safeSearch]);
const startedAt = runtime.now();
const results = await runtime.runEndpoint(
{
url: url.toString(),
timeoutSeconds,
init: {
method: "GET",
headers: {
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
},
},
},
async (response) => {
if (!response.ok) {
const detail = (await readResponseText(response, { maxBytes: 64_000 })).text;
throw new Error(
`DuckDuckGo search error (${response.status}): ${detail || response.statusText}`,
);
}
const html = await response.text();
if (isBotChallenge(html)) {
throw new Error("DuckDuckGo returned a bot-detection challenge.");
}
return parseDuckDuckGoHtml(html).slice(0, count);
},
);
const payload = {
query: params.query,
provider: "duckduckgo",
count: results.length,
tookMs: runtime.now() - startedAt,
externalContent: {
untrusted: true,
source: "web_search",
provider: "duckduckgo",
wrapped: true,
},
results: results.map((result) => ({
title: wrapWebContent(result.title, "web_search"),
url: result.url,
snippet: result.snippet ? wrapWebContent(result.snippet, "web_search") : "",
siteName: resolveSiteName(result.url) || undefined,
})),
} satisfies Record<string, unknown>;
writeCache(runtime.cache, cacheKey, payload, cacheTtlMs);
return payload;
},
catch: (error) => error,
}),
);
const cached = readCache(DDG_SEARCH_CACHE, cacheKey);
if (cached) {
return { ...cached.value, cached: true };
}
const url = new URL(DDG_HTML_ENDPOINT);
url.searchParams.set("q", params.query);
if (region) {
url.searchParams.set("kl", region);
}
url.searchParams.set("kp", DDG_SAFE_SEARCH_PARAM[safeSearch]);
const startedAt = Date.now();
const results = await withTrustedWebSearchEndpoint(
{
url: url.toString(),
timeoutSeconds,
init: {
method: "GET",
headers: {
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
},
},
},
async (response) => {
if (!response.ok) {
const detail = (await readResponseText(response, { maxBytes: 64_000 })).text;
throw new Error(
`DuckDuckGo search error (${response.status}): ${detail || response.statusText}`,
);
}
const html = await response.text();
if (isBotChallenge(html)) {
throw new Error("DuckDuckGo returned a bot-detection challenge.");
}
return parseDuckDuckGoHtml(html).slice(0, count);
},
);
const payload = {
query: params.query,
provider: "duckduckgo",
count: results.length,
tookMs: Date.now() - startedAt,
externalContent: {
untrusted: true,
source: "web_search",
provider: "duckduckgo",
wrapped: true,
},
results: results.map((result) => ({
title: wrapWebContent(result.title, "web_search"),
url: result.url,
snippet: result.snippet ? wrapWebContent(result.snippet, "web_search") : "",
siteName: resolveSiteName(result.url) || undefined,
})),
} satisfies Record<string, unknown>;
writeCache(DDG_SEARCH_CACHE, cacheKey, payload, cacheTtlMs);
return payload;
}
export const __testing = {
DDG_SEARCH_CACHE,
decodeDuckDuckGoUrl,
decodeHtmlEntities,
duckDuckGoSearchRuntimeLayer,
isBotChallenge,
parseDuckDuckGoHtml,
runDuckDuckGoSearchEffect,
};

View File

@@ -132,6 +132,10 @@
"types": "./dist/plugin-sdk/runtime.d.ts",
"default": "./dist/plugin-sdk/runtime.js"
},
"./plugin-sdk/effect-runtime": {
"types": "./dist/plugin-sdk/effect-runtime.d.ts",
"default": "./dist/plugin-sdk/effect-runtime.js"
},
"./plugin-sdk/runtime-doctor": {
"types": "./dist/plugin-sdk/runtime-doctor.d.ts",
"default": "./dist/plugin-sdk/runtime-doctor.js"
@@ -1773,6 +1777,7 @@
"commander": "14.0.3",
"croner": "10.0.1",
"dotenv": "17.4.2",
"effect": "3.21.2",
"express": "5.2.1",
"file-type": "22.0.1",
"grammy": "1.42.0",
@@ -1793,8 +1798,8 @@
"tokenjuice": "0.7.0",
"tree-sitter-bash": "0.25.1",
"tslog": "4.10.2",
"typescript": "6.0.3",
"typebox": "1.1.38",
"typescript": "6.0.3",
"undici": "8.3.0",
"web-push": "3.6.7",
"web-tree-sitter": "0.26.8",

View File

@@ -0,0 +1 @@
export * from "../../../src/plugin-sdk/effect-runtime.js";

24
pnpm-lock.yaml generated
View File

@@ -107,6 +107,9 @@ importers:
dotenv:
specifier: 17.4.2
version: 17.4.2
effect:
specifier: 3.21.2
version: 3.21.2
express:
specifier: 5.2.1
version: 5.2.1
@@ -5217,6 +5220,9 @@ packages:
ee-first@1.1.1:
resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==}
effect@3.21.2:
resolution: {integrity: sha512-rXd2FGDM8KdjSIrc+mqEELo7ScW7xTVxEf1iInmPSpIde9/nyGuFM710cjTo7/EreGXiUX2MOonPpprbz2XHCg==}
emoji-regex@8.0.0:
resolution: {integrity: sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==}
@@ -5381,6 +5387,10 @@ packages:
resolution: {integrity: sha512-CGnyrvbhPlWYMngksqrSSUT1BAVP49dZocrHuK0SvtR0D5TMs5wP0o3j7jexDJW01KSadjBp1M/71o/KR3nD1w==}
engines: {node: '>=18'}
fast-check@3.23.2:
resolution: {integrity: sha512-h5+1OzzfCC3Ef7VbtKdcv7zsstUQwUDlYpUTvjeUsJAssPgLn7QzbboPtL5ro04Mq0rPOsMzl7q5hIbRs2wD1A==}
engines: {node: '>=8.0.0'}
fast-deep-equal@3.1.3:
resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==}
@@ -6809,6 +6819,9 @@ packages:
resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==}
engines: {node: '>=6'}
pure-rand@6.1.0:
resolution: {integrity: sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA==}
qified@0.10.1:
resolution: {integrity: sha512-+Owyggi9IxT1ePKGafcI87ubSmxol6smwJ+RAHDQlx9+9cPwFWDiKFFCPuWhr9ignlGpZ9vDQLw67N4dcTVFEA==}
engines: {node: '>=20'}
@@ -11589,6 +11602,11 @@ snapshots:
ee-first@1.1.1: {}
effect@3.21.2:
dependencies:
'@standard-schema/spec': 1.1.0
fast-check: 3.23.2
emoji-regex@8.0.0: {}
empathic@2.0.1: {}
@@ -11806,6 +11824,10 @@ snapshots:
fake-indexeddb@6.2.5: {}
fast-check@3.23.2:
dependencies:
pure-rand: 6.1.0
fast-deep-equal@3.1.3: {}
fast-fifo@1.3.2: {}
@@ -13598,6 +13620,8 @@ snapshots:
punycode@2.3.1: {}
pure-rand@6.1.0: {}
qified@0.10.1:
dependencies:
hookified: 2.2.0

View File

@@ -8,6 +8,7 @@
"self-hosted-provider-setup",
"routing",
"runtime",
"effect-runtime",
"runtime-doctor",
"runtime-env",
"runtime-logger",

View File

@@ -259,6 +259,26 @@ describe("executeWithApiKeyRotation", () => {
expect(sleep).not.toHaveBeenCalled();
});
it("throws the original rate-limit error after exhausting rotated keys", async () => {
const firstError = new Error("HTTP 429 too many requests on key 1");
const secondError = new Error("HTTP 429 too many requests on key 2");
const execute = vi
.fn<(apiKey: string) => Promise<string>>()
.mockRejectedValueOnce(firstError)
.mockRejectedValueOnce(secondError);
await expect(
executeWithApiKeyRotation({
provider: "openai",
apiKeys: ["key-1", "key-2"],
transientRetry: { attempts: 2, baseDelayMs: 0, maxDelayMs: 0 },
execute,
}),
).rejects.toBe(secondError);
expect(execute).toHaveBeenCalledTimes(2);
});
it("does not rotate keys for transient 500 after same-key retry exhaustion", async () => {
const sleep = vi.fn(async () => undefined);
const execute = vi.fn(async () => {

View File

@@ -1,3 +1,4 @@
import { runRetryingPromise } from "../effect-runtime/retry.js";
import { sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
import {
@@ -9,6 +10,16 @@ import {
} from "../provider-runtime/operation-retry.js";
import { collectProviderApiKeys, isApiKeyRateLimitError } from "./live-auth-keys.js";
class RotateApiKeyError extends Error {
constructor(
readonly error: unknown,
readonly messageForRetry: string,
) {
super(messageForRetry);
this.name = "RotateApiKeyError";
}
}
type ApiKeyRetryParams = {
apiKey: string;
error: unknown;
@@ -56,46 +67,66 @@ export async function executeWithApiKeyRotation<T>(
let lastError: unknown;
const transientRetry = resolveTransientProviderRetryOptions(params.transientRetry);
keyLoop: for (let apiKeyIndex = 0; apiKeyIndex < keys.length; apiKeyIndex += 1) {
for (let apiKeyIndex = 0; apiKeyIndex < keys.length; apiKeyIndex += 1) {
const apiKey = keys[apiKeyIndex];
const maxOperationAttempts = resolveTransientProviderAttempts(transientRetry);
for (let attemptNumber = 1; attemptNumber <= maxOperationAttempts; attemptNumber += 1) {
try {
return await params.execute(apiKey);
} catch (error) {
lastError = error;
const message = formatErrorMessage(error);
const rotateKey = params.shouldRetry
? params.shouldRetry({ apiKey, error, attempt: apiKeyIndex, message })
: isApiKeyRateLimitError(message);
try {
return await runRetryingPromise({
operation: async () => {
try {
return await params.execute(apiKey);
} catch (error) {
lastError = error;
const message = formatErrorMessage(error);
const rotateKey = params.shouldRetry
? params.shouldRetry({ apiKey, error, attempt: apiKeyIndex, message })
: isApiKeyRateLimitError(message);
if (rotateKey) {
if (apiKeyIndex + 1 >= keys.length) {
break;
if (rotateKey) {
throw new RotateApiKeyError(error, message);
}
throw error;
}
params.onRetry?.({ apiKey, error, attempt: apiKeyIndex, message });
break;
}
if (
!transientRetry ||
!shouldRetrySameKeyProviderOperation({
},
maxAttempts: maxOperationAttempts,
shouldRetry: (error, attemptNumber) => {
if (!transientRetry || error instanceof RotateApiKeyError) {
return false;
}
return shouldRetrySameKeyProviderOperation({
options: transientRetry,
error,
message,
message: formatErrorMessage(error),
provider: params.provider,
apiKeyIndex,
attemptNumber,
maxAttempts: maxOperationAttempts,
})
) {
break keyLoop;
});
},
resolveDelayMs: (attemptNumber) =>
transientRetry ? resolveTransientProviderDelayMs(transientRetry, attemptNumber) : 0,
sleep: async (delayMs) => {
const sleep = transientRetry?.sleep ?? sleepWithAbort;
await sleep(delayMs, transientRetry?.signal);
},
});
} catch (error) {
if (error instanceof RotateApiKeyError) {
lastError = error.error;
if (apiKeyIndex + 1 >= keys.length) {
break;
}
const delayMs = resolveTransientProviderDelayMs(transientRetry, attemptNumber);
const sleep = transientRetry.sleep ?? sleepWithAbort;
await sleep(delayMs, transientRetry.signal);
params.onRetry?.({
apiKey,
error: error.error,
attempt: apiKeyIndex,
message: error.messageForRetry,
});
continue;
}
lastError = error;
break;
}
}

View File

@@ -14,7 +14,7 @@ import {
expectBareNewOrResetAcknowledged,
withTempHome,
} from "../../test/helpers/auto-reply/trigger-handling-test-harness.js";
import { loadSessionStore, resolveSessionKey } from "../config/sessions.js";
import { loadSessionStore } from "../config/sessions.js";
import { registerGroupIntroPromptCases } from "./reply.triggers.group-intro-prompts.cases.js";
import { registerTriggerHandlingUsageSummaryCases } from "./reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.cases.js";
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./reply/queue.js";
@@ -537,6 +537,7 @@ describe("trigger handling", () => {
const storePath = join(home, "compact-main.sessions.json");
const cfg = makeCfg(home);
cfg.session = { ...cfg.session, store: storePath };
await seedTargetSession(storePath, MAIN_SESSION_KEY);
mockSuccessfulCompaction();
const request = {
@@ -557,7 +558,7 @@ describe("trigger handling", () => {
expect(text?.startsWith("⚙️ Compacted")).toBe(true);
expect(getCompactEmbeddedPiSessionMock()).toHaveBeenCalledOnce();
const store = loadSessionStore(storePath);
const sessionKey = resolveSessionKey("per-sender", request);
const sessionKey = MAIN_SESSION_KEY;
expect(store[sessionKey]?.compactionCount).toBe(1);
});
});

View File

@@ -1,6 +1,11 @@
import fs from "node:fs";
import readline from "node:readline";
import {
asyncIterableStream,
openClawStreamToAsyncIterable,
} from "../../effect-runtime/stream.js";
// Shared streaming helpers for JSONL session transcripts.
//
// Callers historically read the entire transcript with `fs.readFile` before
@@ -50,7 +55,7 @@ export async function* streamSessionTranscriptLines(
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
try {
for await (const line of rl) {
for await (const line of openClawStreamToAsyncIterable(asyncIterableStream(rl))) {
if (options.signal?.aborted) {
return;
}

View File

@@ -0,0 +1,41 @@
import { Effect, Either } from "effect";
export type OpenClawEffect<A, E = never, R = never> = Effect.Effect<A, E, R>;
export function promiseEffect<A, E = unknown>(params: {
try: (signal: AbortSignal) => PromiseLike<A>;
catch?: (error: unknown) => E;
}): OpenClawEffect<A, E> {
return Effect.tryPromise({
try: params.try,
catch: params.catch ?? ((error) => error as E),
});
}
export function syncEffect<A, E = unknown>(params: {
try: () => A;
catch?: (error: unknown) => E;
}): OpenClawEffect<A, E> {
return Effect.try({
try: params.try,
catch: params.catch ?? ((error) => error as E),
});
}
export async function runOpenClawEffect<A, E>(
effect: OpenClawEffect<A, E>,
): Promise<A> {
const result = await Effect.runPromise(Effect.either(effect));
if (Either.isLeft(result)) {
throw result.left;
}
return result.right;
}
export function runOpenClawEffectSync<A, E>(effect: OpenClawEffect<A, E>): A {
const result = Effect.runSync(Effect.either(effect));
if (Either.isLeft(result)) {
throw result.left;
}
return result.right;
}

View File

@@ -0,0 +1,87 @@
import { Effect } from "effect";
import { describe, expect, it, vi } from "vitest";
import { runOpenClawEffect, syncEffect } from "./index.js";
import { runRetryingPromise } from "./retry.js";
describe("effect runtime kernel", () => {
it("rethrows typed failures without exposing Effect fiber wrappers", async () => {
const error = new Error("typed failure");
await expect(runOpenClawEffect(Effect.fail(error))).rejects.toBe(error);
});
it("maps thrown sync exceptions into typed failures", async () => {
const error = new Error("sync failure");
await expect(
runOpenClawEffect(
syncEffect({
try: () => {
throw error;
},
catch: (err) => err,
}),
),
).rejects.toBe(error);
});
it("retries failed promise operations through the internal Effect runtime", async () => {
const sleep = vi.fn(async () => undefined);
const operation = vi
.fn<() => Promise<string>>()
.mockRejectedValueOnce(new Error("HTTP 503"))
.mockResolvedValueOnce("ok");
await expect(
runRetryingPromise({
operation,
maxAttempts: 2,
shouldRetry: () => true,
resolveDelayMs: () => 25,
sleep,
}),
).resolves.toBe("ok");
expect(operation).toHaveBeenCalledTimes(2);
expect(sleep).toHaveBeenCalledWith(25);
});
it("does not sleep after retry exhaustion", async () => {
const sleep = vi.fn(async () => undefined);
const error = new Error("HTTP 503");
const operation = vi.fn<() => Promise<string>>().mockRejectedValue(error);
await expect(
runRetryingPromise({
operation,
maxAttempts: 1,
shouldRetry: () => true,
resolveDelayMs: () => 25,
sleep,
}),
).rejects.toBe(error);
expect(operation).toHaveBeenCalledTimes(1);
expect(sleep).not.toHaveBeenCalled();
});
it("stops retrying when the predicate rejects the failure", async () => {
const sleep = vi.fn(async () => undefined);
const error = new Error("validation failed");
const operation = vi.fn<() => Promise<string>>().mockRejectedValue(error);
await expect(
runRetryingPromise({
operation,
maxAttempts: 3,
shouldRetry: () => false,
resolveDelayMs: () => 25,
sleep,
}),
).rejects.toBe(error);
expect(operation).toHaveBeenCalledTimes(1);
expect(sleep).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,57 @@
import { Effect, Schedule, ScheduleDecision, ScheduleInterval } from "effect";
import { promiseEffect, runOpenClawEffect, type OpenClawEffect } from "./index.js";
export type RetryingPromiseParams<T> = {
operation: () => Promise<T>;
maxAttempts: number;
shouldRetry: (error: unknown, attemptNumber: number) => boolean;
resolveDelayMs: (attemptNumber: number) => number;
sleep: (delayMs: number) => Promise<void>;
};
function retryDecisionSchedule(
params: Pick<RetryingPromiseParams<unknown>, "maxAttempts" | "shouldRetry">,
): Schedule.Schedule<number> {
return Schedule.makeWithState(1, (now, error, attemptNumber) => {
const shouldRetry =
attemptNumber < params.maxAttempts && params.shouldRetry(error, attemptNumber);
return Effect.succeed([
attemptNumber + 1,
attemptNumber,
shouldRetry
? ScheduleDecision.continueWith(ScheduleInterval.after(now))
: ScheduleDecision.done,
] as const);
});
}
function runAttempt<T>(
params: RetryingPromiseParams<T>,
retryDriver: Schedule.ScheduleDriver<number>,
): OpenClawEffect<T, unknown> {
return Effect.suspend(() =>
promiseEffect({
try: () => params.operation(),
}).pipe(
Effect.catchAll((error) => {
return retryDriver.next(error).pipe(
Effect.catchAll(() => Effect.fail(error)),
Effect.flatMap((attemptNumber) =>
promiseEffect({
try: () => params.sleep(params.resolveDelayMs(attemptNumber)),
}).pipe(Effect.flatMap(() => runAttempt(params, retryDriver))),
),
);
}),
),
);
}
export async function runRetryingPromise<T>(params: RetryingPromiseParams<T>): Promise<T> {
return await runOpenClawEffect(
Schedule.driver(retryDecisionSchedule(params)).pipe(
Effect.flatMap((retryDriver) => runAttempt(params, retryDriver)),
),
);
}

View File

@@ -0,0 +1,37 @@
import { describe, expect, it } from "vitest";
import { asyncIterableStream, openClawStreamToAsyncIterable } from "./stream.js";
async function collect<T>(iterable: AsyncIterable<T>): Promise<T[]> {
const values: T[] = [];
for await (const value of iterable) {
values.push(value);
}
return values;
}
describe("Effect stream bridge", () => {
it("round-trips async iterable values", async () => {
async function* source() {
yield "alpha";
yield "beta";
}
await expect(
collect(openClawStreamToAsyncIterable(asyncIterableStream(source()))),
).resolves.toEqual(["alpha", "beta"]);
});
it("maps async iterable failures before rethrowing them", async () => {
const original = new Error("source failed");
const mapped = new Error("mapped source failed");
async function* source() {
yield "alpha";
throw original;
}
await expect(
collect(openClawStreamToAsyncIterable(asyncIterableStream(source(), () => mapped))),
).rejects.toBe(mapped);
});
});

View File

@@ -0,0 +1,16 @@
import { Stream } from "effect";
export type OpenClawStream<A, E = unknown, R = never> = Stream.Stream<A, E, R>;
export function asyncIterableStream<A, E = unknown>(
iterable: AsyncIterable<A>,
onError: (error: unknown) => E = (error) => error as E,
): OpenClawStream<A, E> {
return Stream.fromAsyncIterable(iterable, onError);
}
export function openClawStreamToAsyncIterable<A, E>(
stream: OpenClawStream<A, E>,
): AsyncIterable<A> {
return Stream.toAsyncIterable(stream);
}

View File

@@ -1,24 +1,22 @@
import { syncEffect } from "../../effect-runtime/index.js";
import {
getDiagnosticStabilitySnapshot,
normalizeDiagnosticStabilityQuery,
} from "../../logging/diagnostic-stability.js";
import { ErrorCodes, errorShape } from "../protocol/index.js";
import { invalidGatewayMethodRequest, respondWithGatewayEffect } from "./effect.js";
import type { GatewayRequestHandlers } from "./types.js";
export const diagnosticsHandlers: GatewayRequestHandlers = {
"diagnostics.stability": async ({ params, respond }) => {
try {
const query = normalizeDiagnosticStabilityQuery(params);
respond(true, getDiagnosticStabilitySnapshot(query), undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
err instanceof Error ? err.message : "invalid diagnostics.stability params",
),
);
}
await respondWithGatewayEffect({
respond,
effect: syncEffect({
try: () => getDiagnosticStabilitySnapshot(normalizeDiagnosticStabilityQuery(params)),
catch: (err) =>
invalidGatewayMethodRequest(
err instanceof Error ? err.message : "invalid diagnostics.stability params",
),
}),
});
},
};

View File

@@ -0,0 +1,32 @@
import { Effect } from "effect";
import { describe, expect, it, vi } from "vitest";
import { invalidGatewayMethodRequest, respondWithGatewayEffect } from "./effect.js";
describe("gateway method Effect adapter", () => {
it("responds with the successful Effect payload", async () => {
const respond = vi.fn();
await respondWithGatewayEffect({
respond,
effect: Effect.succeed({ ok: true }),
meta: { cached: true },
});
expect(respond).toHaveBeenCalledWith(true, { ok: true }, undefined, { cached: true });
});
it("maps expected gateway failures to protocol errors", async () => {
const respond = vi.fn();
await respondWithGatewayEffect({
respond,
effect: Effect.fail(invalidGatewayMethodRequest("bad params")),
});
expect(respond).toHaveBeenCalledWith(false, undefined, {
code: "INVALID_REQUEST",
message: "bad params",
});
});
});

View File

@@ -0,0 +1,47 @@
import { runOpenClawEffect, type OpenClawEffect } from "../../effect-runtime/index.js";
import { ErrorCodes, errorShape, type ErrorShape } from "../protocol/index.js";
import type { RespondFn } from "./types.js";
type GatewayEffectErrorCode = (typeof ErrorCodes)[keyof typeof ErrorCodes];
export class GatewayMethodEffectError extends Error {
constructor(
readonly code: GatewayEffectErrorCode,
message: string,
readonly opts?: { details?: unknown; retryable?: boolean; retryAfterMs?: number },
) {
super(message);
this.name = "GatewayMethodEffectError";
}
}
export function invalidGatewayMethodRequest(message: string): GatewayMethodEffectError {
return new GatewayMethodEffectError(ErrorCodes.INVALID_REQUEST, message);
}
function shapeGatewayMethodError(error: unknown): ErrorShape {
if (error instanceof GatewayMethodEffectError) {
return errorShape(error.code, error.message, error.opts);
}
return errorShape(
ErrorCodes.UNAVAILABLE,
error instanceof Error ? error.message : "gateway method failed",
);
}
export async function respondWithGatewayEffect<T>(params: {
respond: RespondFn;
effect: OpenClawEffect<T, unknown>;
meta?: Record<string, unknown>;
}): Promise<void> {
try {
const payload = await runOpenClawEffect(params.effect);
if (params.meta) {
params.respond(true, payload, undefined, params.meta);
return;
}
params.respond(true, payload, undefined);
} catch (error) {
params.respond(false, undefined, shapeGatewayMethodError(error));
}
}

View File

@@ -516,6 +516,34 @@ function ensureSessionTranscriptFile(params: {
}
}
function readRawSessionCreateLabelFallback(params: {
storePath: string;
storeKeys: string[];
}): string | undefined {
let parsed: unknown;
try {
const raw = fs.readFileSync(params.storePath, "utf-8");
parsed = raw.trim() ? JSON.parse(raw) : {};
} catch {
return undefined;
}
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return undefined;
}
const store = parsed as Record<string, unknown>;
for (const key of params.storeKeys) {
const entry = store[key];
if (!entry || typeof entry !== "object" || Array.isArray(entry)) {
continue;
}
const label = normalizeOptionalString((entry as { label?: unknown }).label);
if (label) {
return label;
}
}
return undefined;
}
function resolveAbortSessionKey(params: {
context: Pick<GatewayRequestContext, "chatAbortControllers">;
requestedKey: string;
@@ -1271,6 +1299,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
: buildDashboardSessionKey(agentId);
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const targetAgentId = resolveAgentIdFromSessionKey(target.canonicalKey);
const requestedLabel = normalizeOptionalString(p.label);
const label =
requestedLabel ??
(!("label" in p)
? readRawSessionCreateLabelFallback({
storePath: target.storePath,
storeKeys: target.storeKeys,
})
: undefined);
const created = await updateSessionStore(target.storePath, async (store) => {
const patched = await applySessionsPatchToStore({
cfg,
@@ -1278,7 +1315,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
storeKey: target.canonicalKey,
patch: {
key: target.canonicalKey,
label: normalizeOptionalString(p.label),
label,
model: normalizeOptionalString(p.model),
},
loadGatewayModelCatalog: context.loadGatewayModelCatalog,

View File

@@ -0,0 +1,19 @@
// Public Effect helpers for plugins that need typed async programs, services, or streams.
export { Context, Effect, Layer, Schedule, ScheduleDecision, ScheduleInterval, Stream } from "effect";
export {
promiseEffect,
runOpenClawEffect,
runOpenClawEffectSync,
syncEffect,
type OpenClawEffect,
} from "../effect-runtime/index.js";
export {
runRetryingPromise,
type RetryingPromiseParams,
} from "../effect-runtime/retry.js";
export {
asyncIterableStream,
openClawStreamToAsyncIterable,
type OpenClawStream,
} from "../effect-runtime/stream.js";

View File

@@ -8,11 +8,13 @@ import { loadOpenClawPluginCliRegistry, loadOpenClawPlugins } from "./loader.js"
import { createEmptyPluginRegistry } from "./registry-empty.js";
import type { PluginRegistry } from "./registry.js";
import {
buildPluginRuntimeLoadOptions,
createPluginRuntimeLoaderLogger,
resolvePluginRuntimeLoadContext,
type PluginRuntimeLoadContext,
} from "./runtime/load-context.js";
import {
buildPluginRuntimeLoadOptionsWithEffect,
resolvePluginRuntimeLoadContextWithEffect,
} from "./runtime/load-context-effect.js";
import type {
OpenClawPluginCliCommandDescriptor,
OpenClawPluginCliContext,
@@ -57,7 +59,7 @@ function buildPluginCliLoaderParams(
loaderOptions?: PluginCliLoaderOptions,
) {
const onlyPluginIds = resolvePrimaryCommandManifestPluginIds(context, params?.primaryCommand);
return buildPluginRuntimeLoadOptions(context, {
return buildPluginRuntimeLoadOptionsWithEffect(context, {
...loaderOptions,
...(onlyPluginIds && onlyPluginIds.length > 0 ? { onlyPluginIds } : {}),
});
@@ -133,7 +135,7 @@ export function resolvePluginCliLoadContext(params: {
env?: NodeJS.ProcessEnv;
logger: PluginLogger;
}): PluginCliLoadContext {
return resolvePluginRuntimeLoadContext({
return resolvePluginRuntimeLoadContextWithEffect({
config: params.cfg,
env: params.env,
logger: params.logger,
@@ -177,7 +179,7 @@ export async function loadPluginCliCommandRegistryWithContext(params: {
return {
...params.context,
registry: loadOpenClawPlugins(
buildPluginRuntimeLoadOptions(params.context, {
buildPluginRuntimeLoadOptionsWithEffect(params.context, {
...params.loaderOptions,
...(onlyPluginIds && onlyPluginIds.length > 0 ? { onlyPluginIds } : {}),
activate: false,

View File

@@ -0,0 +1,58 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
buildPluginRuntimeLoadOptionsWithEffect,
resolvePluginRuntimeLoadContextWithEffect,
} from "./load-context-effect.js";
function createLogger() {
return {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
};
}
describe("plugin runtime Effect load context", () => {
it("resolves runtime load context through an Effect layer", () => {
const config = { plugins: { enabled: true } } as OpenClawConfig;
const env = { OPENCLAW_TEST: "1" };
const logger = createLogger();
const context = resolvePluginRuntimeLoadContextWithEffect({
config,
env,
logger,
workspaceDir: "/tmp/openclaw-agent",
});
expect(context.rawConfig).toBe(config);
expect(context.env).toBe(env);
expect(context.logger).toBe(logger);
expect(context.workspaceDir).toBe("/tmp/openclaw-agent");
});
it("builds load options from an existing context through Effect", () => {
const config = { plugins: { enabled: true } } as OpenClawConfig;
const context = resolvePluginRuntimeLoadContextWithEffect({
config,
env: {},
logger: createLogger(),
workspaceDir: "/tmp/openclaw-agent",
});
const options = buildPluginRuntimeLoadOptionsWithEffect(context, {
onlyPluginIds: ["demo"],
cache: false,
activate: false,
});
expect(options.config).toBe(context.config);
expect(options.workspaceDir).toBe("/tmp/openclaw-agent");
expect(options.onlyPluginIds).toEqual(["demo"]);
expect(options.cache).toBe(false);
expect(options.activate).toBe(false);
});
});

View File

@@ -0,0 +1,57 @@
import { Context, Effect, Layer } from "effect";
import { runOpenClawEffectSync, syncEffect } from "../../effect-runtime/index.js";
import type { PluginLoadOptions } from "../loader.js";
import {
buildPluginRuntimeLoadOptions,
resolvePluginRuntimeLoadContext,
type PluginRuntimeLoadContext,
type PluginRuntimeLoadContextOptions,
} from "./load-context.js";
export const PluginRuntimeLoadContextTag =
Context.GenericTag<PluginRuntimeLoadContext>("openclaw/PluginRuntimeLoadContext");
export function pluginRuntimeLoadContextLayer(
options?: PluginRuntimeLoadContextOptions,
): Layer.Layer<PluginRuntimeLoadContext> {
return Layer.effect(
PluginRuntimeLoadContextTag,
syncEffect({
try: () => resolvePluginRuntimeLoadContext(options),
}),
);
}
export function pluginRuntimeLoadContextValueLayer(
context: PluginRuntimeLoadContext,
): Layer.Layer<PluginRuntimeLoadContext> {
return Layer.succeed(PluginRuntimeLoadContextTag, context);
}
export function buildPluginRuntimeLoadOptionsEffect(
overrides?: Partial<PluginLoadOptions>,
): Effect.Effect<PluginLoadOptions, never, PluginRuntimeLoadContext> {
return Effect.map(PluginRuntimeLoadContextTag, (context) =>
buildPluginRuntimeLoadOptions(context, overrides),
);
}
export function resolvePluginRuntimeLoadContextWithEffect(
options?: PluginRuntimeLoadContextOptions,
): PluginRuntimeLoadContext {
return runOpenClawEffectSync(
PluginRuntimeLoadContextTag.pipe(Effect.provide(pluginRuntimeLoadContextLayer(options))),
);
}
export function buildPluginRuntimeLoadOptionsWithEffect(
context: PluginRuntimeLoadContext,
overrides?: Partial<PluginLoadOptions>,
): PluginLoadOptions {
return runOpenClawEffectSync(
buildPluginRuntimeLoadOptionsEffect(overrides).pipe(
Effect.provide(pluginRuntimeLoadContextValueLayer(context)),
),
);
}

View File

@@ -1,3 +1,4 @@
import { runRetryingPromise } from "../effect-runtime/retry.js";
import { sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
@@ -232,35 +233,30 @@ export async function executeProviderOperationWithRetry<T>(params: {
const retryConfig = providerOperationRetryConfig(params.stage, params.retry);
const retryOptions = resolveTransientProviderRetryOptions(retryConfig);
const maxAttempts = resolveTransientProviderAttempts(retryOptions);
let lastError: unknown;
for (let attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber += 1) {
try {
return await params.operation();
} catch (error) {
lastError = error;
const message = formatErrorMessage(error);
if (
!retryOptions ||
!shouldRetrySameKeyProviderOperation({
options: retryOptions,
error,
message,
provider: params.provider,
apiKeyIndex: 0,
attemptNumber,
maxAttempts,
stage: params.stage,
})
) {
throw error;
return await runRetryingPromise({
operation: params.operation,
maxAttempts,
shouldRetry: (error, attemptNumber) => {
if (!retryOptions) {
return false;
}
const delayMs = resolveTransientProviderDelayMs(retryOptions, attemptNumber);
const sleep = retryOptions.sleep ?? sleepWithAbort;
await sleep(delayMs, retryOptions.signal);
}
}
throw lastError;
return shouldRetrySameKeyProviderOperation({
options: retryOptions,
error,
message: formatErrorMessage(error),
provider: params.provider,
apiKeyIndex: 0,
attemptNumber,
maxAttempts,
stage: params.stage,
});
},
resolveDelayMs: (attemptNumber) =>
retryOptions ? resolveTransientProviderDelayMs(retryOptions, attemptNumber) : 0,
sleep: async (delayMs) => {
const sleep = retryOptions?.sleep ?? sleepWithAbort;
await sleep(delayMs, retryOptions?.signal);
},
});
}

View File

@@ -70,6 +70,11 @@ installPiEmbeddedMock();
vi.doMock("../../../src/agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (...args: unknown[]) => piEmbeddedMocks.abortEmbeddedPiRun(...args),
resolveActiveEmbeddedRunSessionId: (...args: unknown[]) =>
piEmbeddedMocks.resolveActiveEmbeddedRunSessionId(...args),
isEmbeddedPiRunActive: (...args: unknown[]) => piEmbeddedMocks.isEmbeddedPiRunActive(...args),
isEmbeddedPiRunStreaming: (...args: unknown[]) =>
piEmbeddedMocks.isEmbeddedPiRunStreaming(...args),
formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) =>
outcome.reason && outcome.sessionId
? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live`