Compare commits

..

1 Commits

Author SHA1 Message Date
Josh Lehman
52c9e71d53 fix: align memory vector DB typing with node sqlite
Regeneration-Prompt: |
  Mainline check/prep was failing in memory-core because replaceMemoryVectorRow
  accepted a handwritten database shape whose prepare().run signature used
  unknown[] and was not assignable from node:sqlite DatabaseSync. Keep the
  helper behavior the same, preserve the existing call sites and test, and fix
  the type seam by aligning the helper with the real node:sqlite prepare type
  instead of widening individual callers.
2026-04-06 13:32:16 -07:00
276 changed files with 2657 additions and 9976 deletions

View File

@@ -6,18 +6,15 @@ Docs: https://docs.openclaw.ai
### Changes
- CLI/capabilities: add a first-class `openclaw capability ...` hub for provider-backed inference workflows across model, media, web, and embedding tasks, with capability inspection, provider discovery, and consistent JSON output. Thanks @Takhoffman.
- Providers/Anthropic: restore Claude CLI as the preferred local Anthropic path in onboarding, model-auth guidance, and doctor flows again, and keep the Docker Claude CLI live lane aligned with the restored guidance.
- Plugins/webhooks: add a bundled webhook ingress plugin so external automation can create and drive bound TaskFlows through per-route shared-secret endpoints. (#61892) Thanks @mbelinky.
- Tools/media: document per-provider music and video generation capabilities, and add shared live video-to-video sweep coverage for providers that support local reference clips.
### Fixes
- CLI/capabilities: keep provider-backed capability behavior aligned with actual runtime execution by fixing explicit TTS override handling, profile-aware gateway TTS prefs resolution, per-request transcription `prompt`/`language` overrides, image output MIME/extension mismatches, configured web-search fallback behavior, and agent-vs-CLI web-search execution drift.
- Channels/secrets: keep bundled channel artifact and secret-contract loading stable under lazy loading so bundled channel secrets continue to appear in `openclaw secret`, status, and security-audit surfaces.
- Providers/xAI: recognize `api.grok.x.ai` as an xAI-native endpoint again so native xAI web-search attribution keeps working on Grok-hosted base URLs. (#61377) Thanks @jjjojoj.
- Providers/Anthropic/cache: preserve thinking blocks for Claude Opus 4.5+, Sonnet 4.5+, and newer Claude 4-family models so Anthropic prompt-cache prefixes keep matching after thinking turns. (#61793)
- Auth/OpenAI Codex OAuth: reload fresh on-disk credentials inside the locked refresh path and retry once after `refresh_token_reused` rotates only the stored refresh token, so relogin/restart recovery stops getting stuck on stale cached auth state. Thanks @owen-ever.
- Providers/Ollama: honor the selected provider's `baseUrl` during streaming so multi-Ollama setups stop routing every stream to the first configured Ollama endpoint. (#61678)
- Browser/remote CDP: retry the DevTools websocket once after remote browser restarts so healthy remote browser profiles do not fail availability checks during CDP warm-up. (#57397) Thanks @ThanhNguyxn07.
- Memory/vector recall: surface explicit warnings when `sqlite-vec` is unavailable or vector writes are degraded so memory indexing no longer reports false-success while semantic recall is impaired.
@@ -66,7 +63,6 @@ Docs: https://docs.openclaw.ai
- Discord/voice: re-arm DAVE receive passthrough without suppressing decrypt-failure rejoin recovery, and clear capture state before finalize teardown so rapid speaker restarts keep their next utterance. (#41536) Thanks @wit-oc.
- Agents/exec: keep `strictInlineEval` commands blocked after approval timeouts on both gateway and node exec hosts, so timeout fallback no longer turns timed-out inline interpreter prompts into automatic execution.
- QQ Bot/media: route gateway-side attachment and fallback downloads through guarded QQ/Tencent HTTPS fetches so QQ media handling no longer follows arbitrary remote hosts.
- Exec/runtime events: mark background `notifyOnExit` summaries and ACP parent-stream relays as untrusted system events so lower-trust runtime output no longer re-enters later turns as trusted `System:` text.
- Hooks/wake: queue direct and mapped wake-hook payloads as untrusted system events so external wake content no longer enters the main session as trusted input. (#62003)
## 2026.4.5

View File

@@ -361,6 +361,14 @@
}
}
},
"update_plan": {
"emoji": "🗺️",
"title": "Update Plan",
"detailKeys": [
"explanation",
"plan.0.step"
]
},
"gateway": {
"emoji": "🔌",
"title": "Gateway",

View File

@@ -1,116 +0,0 @@
---
summary: "Capability-first CLI for provider-backed model, media, web, and embedding workflows"
read_when:
- Adding or modifying `openclaw capability` commands
- Designing stable headless capability automation
title: "Capability CLI"
---
# Capability CLI
`openclaw capability` is the canonical headless surface for provider-backed capabilities.
It intentionally exposes capability families, not raw gateway RPC names and not raw agent tool ids.
## Command tree
```text
openclaw capability
list
inspect
model
run
list
inspect
providers
auth login
auth logout
auth status
media
image
generate
edit
describe
describe-many
providers
audio
transcribe
providers
tts
convert
voices
providers
status
enable
disable
set-provider
video
generate
describe
providers
web
search
fetch
providers
memory
embedding
create
providers
```
## Transport
Supported transport flags:
- `--local`
- `--gateway`
Default transport is implicit auto at the command-family level:
- Stateless execution commands default to local.
- Gateway-managed state commands default to gateway.
Examples:
```bash
openclaw capability model run --prompt "hello" --json
openclaw capability media image generate --prompt "friendly lobster" --json
openclaw capability media tts status --json
openclaw capability embedding create --text "hello world" --json
```
## JSON output
Capability commands normalize JSON output under a shared envelope:
```json
{
"ok": true,
"capability": "media.image.generate",
"transport": "local",
"provider": "openai",
"model": "gpt-image-1",
"attempts": [],
"outputs": []
}
```
Top-level fields are stable:
- `ok`
- `capability`
- `transport`
- `provider`
- `model`
- `attempts`
- `outputs`
- `error`
## Notes
- `model run` reuses the agent runtime so provider/model overrides behave like normal agent execution.
- `media tts status` defaults to gateway because it reflects gateway-managed TTS state.

View File

@@ -35,7 +35,6 @@ This page describes the current CLI behavior. If commands change, update this do
- [`logs`](/cli/logs)
- [`system`](/cli/system)
- [`models`](/cli/models)
- [`capability`](/cli/capability)
- [`memory`](/cli/memory)
- [`directory`](/cli/directory)
- [`nodes`](/cli/nodes)
@@ -249,16 +248,6 @@ openclaw [--dev] [--profile <name>] <command>
fallbacks list|add|remove|clear
image-fallbacks list|add|remove|clear
scan
capability
list
inspect
model run|list|inspect|providers|auth login|logout|status
media image generate|edit|describe|describe-many|providers
media audio transcribe|providers
media tts convert|voices|providers|status|enable|disable|set-provider
media video generate|describe|providers
web search|fetch|providers
embedding create|providers
auth add|login|login-github-copilot|setup-token|paste-token
auth order get|set|clear
sandbox

View File

@@ -17,13 +17,6 @@ vi.mock("../runtime-api.js", () => ({
},
}));
vi.mock("./runtime.js", () => ({
ACPX_BACKEND_ID: "acpx",
AcpxRuntime: class {},
createAgentRegistry: vi.fn(() => ({})),
createFileSessionStore: vi.fn(() => ({})),
}));
import { getAcpRuntimeBackend } from "../runtime-api.js";
import { createAcpxRuntimeService } from "./service.js";

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-config-contract.js";

View File

@@ -4,6 +4,7 @@ import type {
} from "openclaw/plugin-sdk/channel-contract";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
asObjectRecord,
hasLegacyAccountStreamingAliases,
hasLegacyStreamingAliases,
normalizeLegacyDmAliases,
@@ -11,12 +12,6 @@ import {
} from "openclaw/plugin-sdk/runtime-doctor";
import { resolveDiscordPreviewStreamMode } from "./preview-streaming.js";
function asObjectRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: null;
}
function hasLegacyDiscordStreamingAliases(value: unknown): boolean {
return hasLegacyStreamingAliases(value, { includePreviewChunk: true });
}

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -1,5 +1,3 @@
import { isRecord } from "./comment-shared.js";
export const FEISHU_CARD_INTERACTION_VERSION = "ocf1";
export type FeishuCardInteractionKind = "button" | "quick" | "meta";
@@ -55,6 +53,10 @@ export type DecodedFeishuCardAction =
reason: FeishuCardInteractionReason;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function isInteractionKind(value: unknown): value is FeishuCardInteractionKind {
return value === "button" || value === "quick" || value === "meta";
}

View File

@@ -49,7 +49,6 @@ import {
PAIRING_APPROVED_MESSAGE,
} from "./channel-runtime-api.js";
import { createFeishuClient } from "./client.js";
import { isRecord } from "./comment-shared.js";
import { FeishuConfigSchema } from "./config-schema.js";
import {
buildFeishuConversationId,
@@ -78,6 +77,10 @@ function readFeishuMediaParam(params: Record<string, unknown>): string | undefin
return media.trim() ? media : undefined;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function hasLegacyFeishuCardCommandValue(actionValue: unknown): boolean {
return (
isRecord(actionValue) &&

View File

@@ -18,13 +18,6 @@ export function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
export function asRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return undefined;
}
return value as Record<string, unknown>;
}
export function extractCommentElementText(element: unknown): string | undefined {
if (!isRecord(element)) {
return undefined;

View File

@@ -13,7 +13,6 @@ import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-actio
import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js";
import { createEventDispatcher } from "./client.js";
import { handleFeishuCommentEvent } from "./comment-handler.js";
import { isRecord, readString } from "./comment-shared.js";
import {
hasProcessedFeishuMessage,
recordProcessedFeishuMessage,
@@ -171,6 +170,14 @@ type FeishuBotMenuEvent = {
};
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function readString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}
function readStringOrNumber(value: unknown): string | number | undefined {
return typeof value === "string" || typeof value === "number" ? value : undefined;
}

View File

@@ -1,4 +1,3 @@
import { isRecord } from "./comment-shared.js";
import { normalizeFeishuExternalKey } from "./external-keys.js";
const FALLBACK_POST_TEXT = "[Rich text message]";
@@ -16,6 +15,10 @@ type PostPayload = {
content: unknown[];
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function toStringOrEmpty(value: unknown): string {
return typeof value === "string" ? value : "";
}

View File

@@ -1,6 +1,12 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { hasConfiguredSecretInput } from "openclaw/plugin-sdk/setup";
import { asRecord } from "./comment-shared.js";
function asRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return undefined;
}
return value as Record<string, unknown>;
}
function hasNonEmptyString(value: unknown): boolean {
return typeof value === "string" && value.trim().length > 0;

View File

@@ -1 +0,0 @@
export { createFirecrawlWebFetchProvider } from "./src/firecrawl-fetch-provider.js";

View File

@@ -25,23 +25,6 @@ function resolveConfiguredGoogleVideoBaseUrl(req: VideoGenerationRequest): strin
return configured ? normalizeGoogleApiBaseUrl(configured) : undefined;
}
function parseVideoSize(size: string | undefined): { width: number; height: number } | undefined {
const trimmed = size?.trim();
if (!trimmed) {
return undefined;
}
const match = /^(\d+)x(\d+)$/u.exec(trimmed);
if (!match) {
return undefined;
}
const width = Number.parseInt(match[1] ?? "", 10);
const height = Number.parseInt(match[2] ?? "", 10);
if (!Number.isFinite(width) || !Number.isFinite(height)) {
return undefined;
}
return { width, height };
}
function resolveAspectRatio(params: {
aspectRatio?: string;
size?: string;
@@ -50,11 +33,20 @@ function resolveAspectRatio(params: {
if (direct === "16:9" || direct === "9:16") {
return direct;
}
const parsedSize = parseVideoSize(params.size);
if (!parsedSize) {
const size = params.size?.trim();
if (!size) {
return undefined;
}
return parsedSize.width >= parsedSize.height ? "16:9" : "9:16";
const match = /^(\d+)x(\d+)$/u.exec(size);
if (!match) {
return undefined;
}
const width = Number.parseInt(match[1] ?? "", 10);
const height = Number.parseInt(match[2] ?? "", 10);
if (!Number.isFinite(width) || !Number.isFinite(height)) {
return undefined;
}
return width >= height ? "16:9" : "9:16";
}
function resolveResolution(params: {
@@ -67,11 +59,17 @@ function resolveResolution(params: {
if (params.resolution === "1080P") {
return "1080p";
}
const parsedSize = parseVideoSize(params.size);
if (!parsedSize) {
const size = params.size?.trim();
if (!size) {
return undefined;
}
const maxEdge = Math.max(parsedSize.width, parsedSize.height);
const match = /^(\d+)x(\d+)$/u.exec(size);
if (!match) {
return undefined;
}
const width = Number.parseInt(match[1] ?? "", 10);
const height = Number.parseInt(match[2] ?? "", 10);
const maxEdge = Math.max(width, height);
return maxEdge >= 1920 ? "1080p" : maxEdge >= 1280 ? "720p" : undefined;
}

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -1,6 +1,131 @@
export {
generateImage,
listRuntimeImageGenerationProviders,
type GenerateImageParams,
type GenerateImageRuntimeResult,
} from "openclaw/plugin-sdk/image-generation-runtime";
import {
buildNoCapabilityModelConfiguredMessage,
resolveCapabilityModelCandidates,
throwCapabilityGenerationFailure,
} from "openclaw/plugin-sdk/media-generation-runtime-shared";
import {
createSubsystemLogger,
describeFailoverError,
getImageGenerationProvider,
isFailoverError,
listImageGenerationProviders,
parseImageGenerationModelRef,
type AuthProfileStore,
type FallbackAttempt,
type GeneratedImageAsset,
type ImageGenerationResolution,
type ImageGenerationResult,
type ImageGenerationSourceImage,
type OpenClawConfig,
} from "../api.js";
const log = createSubsystemLogger("image-generation");
export type GenerateImageParams = {
cfg: OpenClawConfig;
prompt: string;
agentDir?: string;
authStore?: AuthProfileStore;
modelOverride?: string;
count?: number;
size?: string;
aspectRatio?: string;
resolution?: ImageGenerationResolution;
inputImages?: ImageGenerationSourceImage[];
};
export type GenerateImageRuntimeResult = {
images: GeneratedImageAsset[];
provider: string;
model: string;
attempts: FallbackAttempt[];
metadata?: Record<string, unknown>;
};
function buildNoImageGenerationModelConfiguredMessage(cfg: OpenClawConfig): string {
return buildNoCapabilityModelConfiguredMessage({
capabilityLabel: "image-generation",
modelConfigKey: "imageGenerationModel",
providers: listImageGenerationProviders(cfg),
fallbackSampleRef: "google/gemini-3-pro-image-preview",
});
}
export function listRuntimeImageGenerationProviders(params?: { config?: OpenClawConfig }) {
return listImageGenerationProviders(params?.config);
}
export async function generateImage(
params: GenerateImageParams,
): Promise<GenerateImageRuntimeResult> {
const candidates = resolveCapabilityModelCandidates({
cfg: params.cfg,
modelConfig: params.cfg.agents?.defaults?.imageGenerationModel,
modelOverride: params.modelOverride,
parseModelRef: parseImageGenerationModelRef,
});
if (candidates.length === 0) {
throw new Error(buildNoImageGenerationModelConfiguredMessage(params.cfg));
}
const attempts: FallbackAttempt[] = [];
let lastError: unknown;
for (const candidate of candidates) {
const provider = getImageGenerationProvider(candidate.provider, params.cfg);
if (!provider) {
const error = `No image-generation provider registered for ${candidate.provider}`;
attempts.push({
provider: candidate.provider,
model: candidate.model,
error,
});
lastError = new Error(error);
continue;
}
try {
const result: ImageGenerationResult = await provider.generateImage({
provider: candidate.provider,
model: candidate.model,
prompt: params.prompt,
cfg: params.cfg,
agentDir: params.agentDir,
authStore: params.authStore,
count: params.count,
size: params.size,
aspectRatio: params.aspectRatio,
resolution: params.resolution,
inputImages: params.inputImages,
});
if (!Array.isArray(result.images) || result.images.length === 0) {
throw new Error("Image generation provider returned no images.");
}
return {
images: result.images,
provider: candidate.provider,
model: result.model ?? candidate.model,
attempts,
metadata: result.metadata,
};
} catch (err) {
lastError = err;
const described = isFailoverError(err) ? describeFailoverError(err) : undefined;
attempts.push({
provider: candidate.provider,
model: candidate.model,
error: described?.message ?? (err instanceof Error ? err.message : String(err)),
reason: described?.reason,
status: described?.status,
code: described?.code,
});
log.debug(`image-generation candidate failed: ${candidate.provider}/${candidate.model}`);
}
}
throwCapabilityGenerationFailure({
capabilityLabel: "image generation",
attempts,
lastError,
});
}

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -6,11 +6,7 @@ import {
type LobsterRunner,
type LobsterRunnerParams,
} from "./lobster-runner.js";
import {
type ManagedLobsterFlowResult,
resumeManagedLobsterFlow,
runManagedLobsterFlow,
} from "./lobster-taskflow.js";
import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js";
type BoundTaskFlow = ReturnType<
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
@@ -194,20 +190,6 @@ function formatManagedFlowResult(result: ManagedFlowSuccessResult) {
};
}
function requireTaskFlowRuntime(taskFlow: BoundTaskFlow | undefined, action: "run" | "resume") {
if (!taskFlow) {
throw new Error(`Managed TaskFlow ${action} mode requires a bound taskFlow runtime`);
}
return taskFlow;
}
function resolveManagedFlowToolResult(result: ManagedLobsterFlowResult) {
if (!result.ok) {
throw result.error;
}
return formatManagedFlowResult(result);
}
export function createLobsterTool(api: OpenClawPluginApi, options?: LobsterToolOptions) {
const runner = options?.runner ?? createEmbeddedLobsterRunner();
return {
@@ -271,37 +253,47 @@ export function createLobsterTool(api: OpenClawPluginApi, options?: LobsterToolO
if (action === "run") {
const flowParams = parseRunFlowParams(params);
if (flowParams) {
return resolveManagedFlowToolResult(
await runManagedLobsterFlow({
taskFlow: requireTaskFlowRuntime(taskFlow, "run"),
runner,
runnerParams,
controllerId: flowParams.controllerId,
goal: flowParams.goal,
...(flowParams.stateJson !== undefined ? { stateJson: flowParams.stateJson } : {}),
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
}),
);
if (!taskFlow) {
throw new Error("Managed TaskFlow run mode requires a bound taskFlow runtime");
}
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams,
controllerId: flowParams.controllerId,
goal: flowParams.goal,
...(flowParams.stateJson !== undefined ? { stateJson: flowParams.stateJson } : {}),
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
});
if (!result.ok) {
throw result.error;
}
return formatManagedFlowResult(result);
}
} else {
const flowParams = parseResumeFlowParams(params);
if (flowParams) {
return resolveManagedFlowToolResult(
await resumeManagedLobsterFlow({
taskFlow: requireTaskFlowRuntime(taskFlow, "resume"),
runner,
runnerParams: runnerParams as LobsterRunnerParams & {
action: "resume";
token: string;
approve: boolean;
},
flowId: flowParams.flowId,
expectedRevision: flowParams.expectedRevision,
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
}),
);
if (!taskFlow) {
throw new Error("Managed TaskFlow resume mode requires a bound taskFlow runtime");
}
const result = await resumeManagedLobsterFlow({
taskFlow,
runner,
runnerParams: runnerParams as LobsterRunnerParams & {
action: "resume";
token: string;
approve: boolean;
},
flowId: flowParams.flowId,
expectedRevision: flowParams.expectedRevision,
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
});
if (!result.ok) {
throw result.error;
}
return formatManagedFlowResult(result);
}
}

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -462,10 +462,6 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = create
: "channel",
),
},
resolveReplyTransport: ({ threadId, replyToId }) => ({
replyToId: replyToId ?? (threadId != null ? String(threadId) : undefined),
threadId,
}),
},
security: mattermostSecurityAdapter,
outbound: {

View File

@@ -34,7 +34,6 @@ import type {
MemorySearchCommandOptions,
} from "./cli.types.js";
import { previewRemDreaming } from "./dreaming-phases.js";
import { asRecord } from "./dreaming-shared.js";
import { resolveShortTermPromotionDreamingConfig } from "./dreaming.js";
import {
applyShortTermPromotions,
@@ -71,6 +70,13 @@ type LoadedMemoryCommandConfig = {
diagnostics: string[];
};
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function getMemoryCommandSecretTargetIds(): Set<string> {
return new Set([
"agents.defaults.memorySearch.remote.apiKey",

View File

@@ -1,8 +1,14 @@
import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core";
import { resolveMemoryDreamingConfig } from "openclaw/plugin-sdk/memory-core-host-status";
import { asRecord } from "./dreaming-shared.js";
import { resolveShortTermPromotionDreamingConfig } from "./dreaming.js";
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function resolveMemoryCorePluginConfig(cfg: OpenClawConfig): Record<string, unknown> {
const entry = asRecord(cfg.plugins?.entries?.["memory-core"]);
return asRecord(entry?.config) ?? {};

View File

@@ -13,7 +13,6 @@ import {
} from "openclaw/plugin-sdk/memory-core-host-status";
import { writeDailyDreamingPhaseBlock } from "./dreaming-markdown.js";
import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js";
import { asRecord, formatErrorMessage, normalizeTrimmedString } from "./dreaming-shared.js";
import {
readShortTermRecallEntries,
recordDreamingPhaseSignals,
@@ -99,6 +98,28 @@ const MANAGED_DAILY_DREAMING_BLOCKS = [
},
] as const;
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function normalizeTrimmedString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
function formatErrorMessage(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
return String(err);
}
function buildCronDescription(params: {
tag: string;
phase: "light" | "rem";

View File

@@ -1,18 +0,0 @@
export function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
export function normalizeTrimmedString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
export function formatErrorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}

View File

@@ -13,7 +13,6 @@ import {
import { writeDeepDreamingReport } from "./dreaming-markdown.js";
import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js";
import { runDreamingSweepPhases } from "./dreaming-phases.js";
import { asRecord, formatErrorMessage, normalizeTrimmedString } from "./dreaming-shared.js";
import {
applyShortTermPromotions,
repairShortTermPromotionArtifacts,
@@ -104,6 +103,28 @@ type ReconcileResult =
| { status: "updated"; removed: number }
| { status: "noop"; removed: number };
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
function normalizeTrimmedString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
function formatErrorMessage(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
return String(err);
}
function formatRepairSummary(repair: {
rewroteStore: boolean;
removedInvalidEntries: number;

View File

@@ -0,0 +1,178 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { afterAll, beforeAll, beforeEach, expect, vi, type Mock } from "vitest";
import type { MemoryIndexManager } from "./index.js";
type EmbeddingTestMocksModule = typeof import("./embedding.test-mocks.js");
type MemoryIndexModule = typeof import("./index.js");
type MemoryEmbeddingProvidersModule =
typeof import("../../../../src/plugins/memory-embedding-providers.js");
type MemorySearchManagerHandle = Awaited<
ReturnType<MemoryIndexModule["getMemorySearchManager"]>
>["manager"];
export function installEmbeddingManagerFixture(opts: {
fixturePrefix: string;
largeTokens: number;
smallTokens: number;
createCfg: (params: {
workspaceDir: string;
indexPath: string;
tokens: number;
}) => OpenClawConfig;
resetIndexEachTest?: boolean;
}) {
const resetIndexEachTest = opts.resetIndexEachTest ?? true;
let fixtureRoot: string | undefined;
let workspaceDir: string | undefined;
let memoryDir: string | undefined;
let managerLarge: MemoryIndexManager | undefined;
let managerSmall: MemoryIndexManager | undefined;
let embedBatch: Mock<(texts: string[]) => Promise<number[][]>> | undefined;
let getMemorySearchManager: MemoryIndexModule["getMemorySearchManager"];
let resetEmbeddingMocks: EmbeddingTestMocksModule["resetEmbeddingMocks"];
let clearRegistry: MemoryEmbeddingProvidersModule["clearMemoryEmbeddingProviders"];
let registerAdapter: MemoryEmbeddingProvidersModule["registerMemoryEmbeddingProvider"];
let restoreRegistry: MemoryEmbeddingProvidersModule["restoreRegisteredMemoryEmbeddingProviders"];
let listRegistry: MemoryEmbeddingProvidersModule["listRegisteredMemoryEmbeddingProviders"];
let originalRegistry:
| ReturnType<MemoryEmbeddingProvidersModule["listRegisteredMemoryEmbeddingProviders"]>
| undefined;
const resetManager = (manager: MemoryIndexManager) => {
(manager as unknown as { resetIndex: () => void }).resetIndex();
(manager as unknown as { dirty: boolean }).dirty = true;
};
const requireValue = <T>(value: T | undefined, name: string): T => {
if (!value) {
throw new Error(`${name} missing`);
}
return value;
};
const requireIndexManager = (
manager: MemorySearchManagerHandle,
name: string,
): MemoryIndexManager => {
if (!manager) {
throw new Error(`${name} missing`);
}
if (!("resetIndex" in manager) || typeof manager.resetIndex !== "function") {
throw new Error(`${name} is not a MemoryIndexManager`);
}
return manager as unknown as MemoryIndexManager;
};
const createManager = async (params: {
indexPath: string;
tokens: number;
name: string;
}): Promise<MemoryIndexManager> => {
const managerResult = await getMemorySearchManager({
cfg: opts.createCfg({
workspaceDir: requireValue(workspaceDir, "workspaceDir"),
indexPath: params.indexPath,
tokens: params.tokens,
}),
agentId: "main",
});
expect(managerResult.manager).not.toBeNull();
return requireIndexManager(managerResult.manager, params.name);
};
beforeAll(async () => {
vi.resetModules();
await import("./embedding.test-mocks.js");
const embeddingMocks = await import("./embedding.test-mocks.js");
embedBatch = embeddingMocks.getEmbedBatchMock();
resetEmbeddingMocks = embeddingMocks.resetEmbeddingMocks;
({ getMemorySearchManager } = await import("./index.js"));
({
clearMemoryEmbeddingProviders: clearRegistry,
registerMemoryEmbeddingProvider: registerAdapter,
restoreRegisteredMemoryEmbeddingProviders: restoreRegistry,
listRegisteredMemoryEmbeddingProviders: listRegistry,
} = await import("../../../../src/plugins/memory-embedding-providers.js"));
const savedRegistry = listRegistry();
clearRegistry();
registerAdapter({
id: "openai",
defaultModel: "mock-embed",
transport: "remote",
create: async () => ({ provider: null }),
});
originalRegistry = savedRegistry;
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), opts.fixturePrefix));
workspaceDir = path.join(fixtureRoot, "workspace");
memoryDir = path.join(workspaceDir, "memory");
await fs.mkdir(memoryDir, { recursive: true });
});
afterAll(async () => {
if (managerLarge) {
await managerLarge.close();
managerLarge = undefined;
}
if (managerSmall) {
await managerSmall.close();
managerSmall = undefined;
}
if (fixtureRoot) {
await fs.rm(fixtureRoot, { recursive: true, force: true });
fixtureRoot = undefined;
}
if (originalRegistry) {
restoreRegistry(originalRegistry);
originalRegistry = undefined;
} else {
clearRegistry();
}
});
beforeEach(async () => {
resetEmbeddingMocks();
const dir = requireValue(memoryDir, "memoryDir");
await fs.rm(dir, { recursive: true, force: true });
await fs.mkdir(dir, { recursive: true });
if (resetIndexEachTest) {
if (managerLarge) {
resetManager(managerLarge);
}
if (managerSmall) {
resetManager(managerSmall);
}
}
});
return {
get embedBatch() {
return requireValue(embedBatch, "embedBatch");
},
getFixtureRoot: () => requireValue(fixtureRoot, "fixtureRoot"),
getWorkspaceDir: () => requireValue(workspaceDir, "workspaceDir"),
getMemoryDir: () => requireValue(memoryDir, "memoryDir"),
getManagerLarge: async () => {
managerLarge ??= await createManager({
indexPath: path.join(requireValue(fixtureRoot, "fixtureRoot"), "index.large.sqlite"),
tokens: opts.largeTokens,
name: "managerLarge",
});
return managerLarge;
},
getManagerSmall: async () => {
managerSmall ??= await createManager({
indexPath: path.join(requireValue(fixtureRoot, "fixtureRoot"), "index.small.sqlite"),
tokens: opts.smallTokens,
name: "managerSmall",
});
return managerSmall;
},
resetManager,
};
}

View File

@@ -12,7 +12,6 @@ import {
type MemoryEmbeddingProviderCreateOptions,
type MemoryEmbeddingProviderRuntime,
} from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
import { formatErrorMessage } from "../dreaming-shared.js";
import { canAutoSelectLocal } from "./provider-adapters.js";
export {
@@ -44,6 +43,10 @@ type CreateEmbeddingProviderOptions = MemoryEmbeddingProviderCreateOptions & {
fallback: EmbeddingProviderFallback;
};
function formatErrorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
function formatProviderError(adapter: MemoryEmbeddingProviderAdapter, err: unknown): string {
return adapter.formatSetupError?.(err) ?? formatErrorMessage(err);
}

View File

@@ -485,6 +485,19 @@ describe("memory index", () => {
}
});
it("reuses cached embeddings on forced reindex", async () => {
const cfg = createCfg({ storePath: indexMainPath, cacheEnabled: true });
const manager = await getPersistentManager(cfg);
// Seed the embedding cache once, then ensure a forced reindex doesn't
// re-embed when the cache is enabled.
await manager.sync({ reason: "test" });
const afterFirst = embedBatchCalls;
expect(afterFirst).toBeGreaterThan(0);
await manager.sync({ force: true });
expect(embedBatchCalls).toBe(afterFirst);
});
it.skip("finds keyword matches via hybrid search when query embedding is zero", async () => {
await expectHybridKeywordSearchFindsMemory(
createCfg({
@@ -514,6 +527,50 @@ describe("memory index", () => {
expect(status.vector?.available).toBe(available);
});
it("triggers full reindex and cleans up old-model FTS rows when switching from provider to FTS-only", async () => {
const sharedStorePath = path.join(workspaceDir, "index-provider-to-fts-only.sqlite");
const providerCfg = createCfg({ storePath: sharedStorePath, hybrid: { enabled: true } });
const providerResult = await getMemorySearchManager({ cfg: providerCfg, agentId: "main" });
const providerManager = requireManager(providerResult);
managersForCleanup.add(providerManager);
resetManagerForTest(providerManager);
await providerManager.sync({ reason: "test" });
const providerDb = (
providerManager as unknown as { db: { prepare: (s: string) => { get: () => { c: number } } } }
).db;
const providerFtsRows = providerDb
.prepare("SELECT COUNT(*) as c FROM chunks_fts WHERE model = 'mock-embed'")
.get();
expect(providerFtsRows.c).toBeGreaterThan(0);
await providerManager.close();
managersForCleanup.delete(providerManager);
forceNoProvider = true;
const ftsOnlyCfg = createCfg({ storePath: sharedStorePath, hybrid: { enabled: true } });
const ftsOnlyResult = await getMemorySearchManager({ cfg: ftsOnlyCfg, agentId: "main" });
const ftsOnlyManager = requireManager(ftsOnlyResult);
managersForCleanup.add(ftsOnlyManager);
await ftsOnlyManager.sync({ reason: "test" });
const db = (
ftsOnlyManager as unknown as { db: { prepare: (s: string) => { get: () => { c: number } } } }
).db;
const oldRows = db
.prepare("SELECT COUNT(*) as c FROM chunks_fts WHERE model = 'mock-embed'")
.get();
expect(oldRows.c).toBe(0);
const newRows = db
.prepare("SELECT COUNT(*) as c FROM chunks_fts WHERE model = 'fts-only'")
.get();
expect(newRows.c).toBeGreaterThan(0);
});
it("builds FTS index and returns search results when no embedding provider is available", async () => {
forceNoProvider = true;

View File

@@ -1,77 +0,0 @@
import { describe, expect, it } from "vitest";
import {
MEMORY_BATCH_FAILURE_LIMIT,
recordMemoryBatchFailure,
resetMemoryBatchFailureState,
} from "./manager-batch-state.js";
describe("memory batch state", () => {
it("resets failures after recovery", () => {
expect(
resetMemoryBatchFailureState({
enabled: true,
count: 1,
lastError: "batch failed",
lastProvider: "openai",
}),
).toEqual({
enabled: true,
count: 0,
lastError: undefined,
lastProvider: undefined,
});
});
it("disables batching after repeated failures", () => {
const once = recordMemoryBatchFailure(
{ enabled: true, count: 0 },
{ provider: "openai", message: "batch failed", attempts: 1 },
);
expect(once).toEqual({
enabled: true,
count: 1,
lastError: "batch failed",
lastProvider: "openai",
});
const twice = recordMemoryBatchFailure(once, {
provider: "openai",
message: "batch failed again",
attempts: 1,
});
expect(twice).toEqual({
enabled: false,
count: MEMORY_BATCH_FAILURE_LIMIT,
lastError: "batch failed again",
lastProvider: "openai",
});
});
it("force-disables batching immediately", () => {
expect(
recordMemoryBatchFailure(
{ enabled: true, count: 0 },
{ provider: "gemini", message: "not available", forceDisable: true },
),
).toEqual({
enabled: false,
count: MEMORY_BATCH_FAILURE_LIMIT,
lastError: "not available",
lastProvider: "gemini",
});
});
it("leaves disabled state unchanged", () => {
expect(
recordMemoryBatchFailure(
{ enabled: false, count: MEMORY_BATCH_FAILURE_LIMIT, lastError: "x", lastProvider: "y" },
{ provider: "openai", message: "ignored" },
),
).toEqual({
enabled: false,
count: MEMORY_BATCH_FAILURE_LIMIT,
lastError: "x",
lastProvider: "y",
});
});
});

View File

@@ -1,44 +0,0 @@
export const MEMORY_BATCH_FAILURE_LIMIT = 2;
export type MemoryBatchFailureState = {
enabled: boolean;
count: number;
lastError?: string;
lastProvider?: string;
};
export function resetMemoryBatchFailureState(
state: MemoryBatchFailureState,
): MemoryBatchFailureState {
return {
...state,
count: 0,
lastError: undefined,
lastProvider: undefined,
};
}
export function recordMemoryBatchFailure(
state: MemoryBatchFailureState,
params: {
provider: string;
message: string;
attempts?: number;
forceDisable?: boolean;
},
): MemoryBatchFailureState {
if (!state.enabled) {
return state;
}
const increment = params.forceDisable
? MEMORY_BATCH_FAILURE_LIMIT
: Math.max(1, params.attempts ?? 1);
const count = state.count + increment;
const enabled = !(params.forceDisable || count >= MEMORY_BATCH_FAILURE_LIMIT);
return {
enabled,
count,
lastError: params.message,
lastProvider: params.provider,
};
}

View File

@@ -1,85 +0,0 @@
import {
ensureMemoryIndexSchema,
requireNodeSqlite,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { describe, expect, it, vi } from "vitest";
import {
collectMemoryCachedEmbeddings,
loadMemoryEmbeddingCache,
upsertMemoryEmbeddingCache,
} from "./manager-embedding-cache.js";
describe("memory embedding cache", () => {
const { DatabaseSync } = requireNodeSqlite();
function createDb() {
const db = new DatabaseSync(":memory:");
ensureMemoryIndexSchema({
db,
embeddingCacheTable: "embedding_cache",
cacheEnabled: true,
ftsTable: "chunks_fts",
ftsEnabled: false,
ftsTokenizer: "unicode61",
});
return db;
}
it("loads cached embeddings for the active provider key", () => {
const db = createDb();
try {
upsertMemoryEmbeddingCache({
db,
enabled: true,
provider: { id: "openai", model: "text-embedding-3-small" },
providerKey: "provider-key",
entries: [
{ hash: "a", embedding: [0.1, 0.2] },
{ hash: "b", embedding: [0.3, 0.4] },
],
now: 123,
});
const cached = loadMemoryEmbeddingCache({
db,
enabled: true,
provider: { id: "openai", model: "text-embedding-3-small" },
providerKey: "provider-key",
hashes: ["a", "b", "a"],
});
expect(cached).toEqual(
new Map([
["a", [0.1, 0.2]],
["b", [0.3, 0.4]],
]),
);
} finally {
db.close();
}
});
it("reuses cached embeddings on forced reindex instead of scheduling new embeds", () => {
const cached = new Map<string, number[]>([
["alpha", [0.1, 0.2]],
["beta", [0.3, 0.4]],
]);
const embedMissing = vi.fn();
const plan = collectMemoryCachedEmbeddings({
chunks: [{ hash: "alpha" }, { hash: "beta" }],
cached,
});
if (plan.missing.length > 0) {
embedMissing(plan.missing);
}
expect(plan.embeddings).toEqual([
[0.1, 0.2],
[0.3, 0.4],
]);
expect(plan.missing).toHaveLength(0);
expect(embedMissing).not.toHaveBeenCalled();
});
});

View File

@@ -1,117 +0,0 @@
import type { DatabaseSync, SQLInputValue } from "node:sqlite";
import {
parseEmbedding,
type MemoryChunk,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
type EmbeddingCacheDb = Pick<DatabaseSync, "prepare">;
type EmbeddingProviderRef = {
id: string;
model: string;
};
export function loadMemoryEmbeddingCache(params: {
db: EmbeddingCacheDb;
enabled: boolean;
provider: EmbeddingProviderRef | null;
providerKey: string | null;
hashes: string[];
tableName?: string;
}): Map<string, number[]> {
const provider = params.provider;
if (!params.enabled || !provider || !params.providerKey || params.hashes.length === 0) {
return new Map();
}
const unique: string[] = [];
const seen = new Set<string>();
for (const hash of params.hashes) {
if (!hash || seen.has(hash)) {
continue;
}
seen.add(hash);
unique.push(hash);
}
if (unique.length === 0) {
return new Map();
}
const tableName = params.tableName ?? "embedding_cache";
const out = new Map<string, number[]>();
const baseParams: SQLInputValue[] = [provider.id, provider.model, params.providerKey];
const batchSize = 400;
for (let start = 0; start < unique.length; start += batchSize) {
const batch = unique.slice(start, start + batchSize);
const placeholders = batch.map(() => "?").join(", ");
const rows = params.db
.prepare(
`SELECT hash, embedding FROM ${tableName}\n` +
` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`,
)
.all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>;
for (const row of rows) {
out.set(row.hash, parseEmbedding(row.embedding));
}
}
return out;
}
export function upsertMemoryEmbeddingCache(params: {
db: EmbeddingCacheDb;
enabled: boolean;
provider: EmbeddingProviderRef | null;
providerKey: string | null;
entries: Array<{ hash: string; embedding: number[] }>;
now?: number;
tableName?: string;
}): void {
const provider = params.provider;
if (!params.enabled || !provider || !params.providerKey || params.entries.length === 0) {
return;
}
const tableName = params.tableName ?? "embedding_cache";
const now = params.now ?? Date.now();
const stmt = params.db.prepare(
`INSERT INTO ${tableName} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
` VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
` embedding=excluded.embedding,\n` +
` dims=excluded.dims,\n` +
` updated_at=excluded.updated_at`,
);
for (const entry of params.entries) {
const embedding = entry.embedding ?? [];
stmt.run(
provider.id,
provider.model,
params.providerKey,
entry.hash,
JSON.stringify(embedding),
embedding.length,
now,
);
}
}
export function collectMemoryCachedEmbeddings<T extends Pick<MemoryChunk, "hash">>(params: {
chunks: T[];
cached: Map<string, number[]>;
}): {
embeddings: number[][];
missing: Array<{ index: number; chunk: T }>;
} {
const embeddings: number[][] = Array.from({ length: params.chunks.length }, () => []);
const missing: Array<{ index: number; chunk: T }> = [];
for (let index = 0; index < params.chunks.length; index += 1) {
const chunk = params.chunks[index];
const hit = chunk?.hash ? params.cached.get(chunk.hash) : undefined;
if (hit && hit.length > 0) {
embeddings[index] = hit;
} else if (chunk) {
missing.push({ index, chunk });
}
}
return { embeddings, missing };
}

View File

@@ -1,6 +1,8 @@
import fs from "node:fs/promises";
import {
enforceEmbeddingMaxInputTokens,
estimateStructuredEmbeddingInputBytes,
estimateUtf8Bytes,
hasNonTextEmbeddingParts,
type EmbeddingInput,
} from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
@@ -10,30 +12,12 @@ import {
buildMultimodalChunkForIndexing,
chunkMarkdown,
hashText,
parseEmbedding,
remapChunkLines,
type MemoryChunk,
type MemoryFileEntry,
type MemorySource,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import {
MEMORY_BATCH_FAILURE_LIMIT,
recordMemoryBatchFailure,
resetMemoryBatchFailureState,
} from "./manager-batch-state.js";
import {
collectMemoryCachedEmbeddings,
loadMemoryEmbeddingCache,
upsertMemoryEmbeddingCache,
} from "./manager-embedding-cache.js";
import {
buildMemoryEmbeddingBatches,
buildTextEmbeddingInputs,
filterNonEmptyMemoryChunks,
isRetryableMemoryEmbeddingError,
resolveMemoryEmbeddingRetryDelay,
runMemoryEmbeddingRetryLoop,
} from "./manager-embedding-policy.js";
import { deleteMemoryFtsRows } from "./manager-fts-state.js";
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
import { replaceMemoryVectorRow } from "./manager-vector-write.js";
@@ -45,6 +29,7 @@ const EMBEDDING_INDEX_CONCURRENCY = 4;
const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
const BATCH_FAILURE_LIMIT = 2;
const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000;
const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000;
const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000;
@@ -58,6 +43,108 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
protected abstract batchFailureLastProvider?: string;
protected abstract batchFailureLock: Promise<void>;
private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] {
const batches: MemoryChunk[][] = [];
let current: MemoryChunk[] = [];
let currentTokens = 0;
for (const chunk of chunks) {
const estimate = chunk.embeddingInput
? estimateStructuredEmbeddingInputBytes(chunk.embeddingInput)
: estimateUtf8Bytes(chunk.text);
const wouldExceed =
current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS;
if (wouldExceed) {
batches.push(current);
current = [];
currentTokens = 0;
}
if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) {
batches.push([chunk]);
continue;
}
current.push(chunk);
currentTokens += estimate;
}
if (current.length > 0) {
batches.push(current);
}
return batches;
}
private loadEmbeddingCache(hashes: string[]): Map<string, number[]> {
if (!this.cache.enabled || !this.provider) {
return new Map();
}
if (hashes.length === 0) {
return new Map();
}
const unique: string[] = [];
const seen = new Set<string>();
for (const hash of hashes) {
if (!hash) {
continue;
}
if (seen.has(hash)) {
continue;
}
seen.add(hash);
unique.push(hash);
}
if (unique.length === 0) {
return new Map();
}
const out = new Map<string, number[]>();
const baseParams = [this.provider.id, this.provider.model, this.providerKey];
const batchSize = 400;
for (let start = 0; start < unique.length; start += batchSize) {
const batch = unique.slice(start, start + batchSize);
const placeholders = batch.map(() => "?").join(", ");
const rows = this.db
.prepare(
`SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` +
` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`,
)
.all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>;
for (const row of rows) {
out.set(row.hash, parseEmbedding(row.embedding));
}
}
return out;
}
private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void {
if (!this.cache.enabled || !this.provider) {
return;
}
if (entries.length === 0) {
return;
}
const now = Date.now();
const stmt = this.db.prepare(
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
` VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
` embedding=excluded.embedding,\n` +
` dims=excluded.dims,\n` +
` updated_at=excluded.updated_at`,
);
for (const entry of entries) {
const embedding = entry.embedding ?? [];
stmt.run(
this.provider.id,
this.provider.model,
this.providerKey,
entry.hash,
JSON.stringify(embedding),
embedding.length,
now,
);
}
}
protected pruneEmbeddingCacheIfNeeded(): void {
if (!this.cache.enabled) {
return;
@@ -97,7 +184,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
}
const missingChunks = missing.map((m) => m.chunk);
const batches = buildMemoryEmbeddingBatches(missingChunks, EMBEDDING_BATCH_MAX_TOKENS);
const batches = this.buildEmbeddingBatches(missingChunks);
const toCache: Array<{ hash: string; embedding: number[] }> = [];
const provider = this.provider;
if (!provider) {
@@ -105,7 +192,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
}
let cursor = 0;
for (const batch of batches) {
const inputs = buildTextEmbeddingInputs(batch);
const inputs = batch.map((chunk) => chunk.embeddingInput ?? { text: chunk.text });
const hasStructuredInputs = inputs.some((input) => hasNonTextEmbeddingParts(input));
if (hasStructuredInputs && !provider.embedBatchInputs) {
throw new Error(
@@ -125,14 +212,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
}
cursor += batch.length;
}
upsertMemoryEmbeddingCache({
db: this.db,
enabled: this.cache.enabled,
provider: this.provider,
providerKey: this.providerKey,
entries: toCache,
tableName: EMBEDDING_CACHE_TABLE,
});
this.upsertEmbeddingCache(toCache);
return embeddings;
}
@@ -160,9 +240,8 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
_entry: MemoryFileEntry | SessionFileEntry,
source: MemorySource,
): Promise<number[][]> {
const provider = this.provider;
const batchEmbed = this.providerRuntime?.batchEmbed;
if (!provider || !batchEmbed) {
if (!this.provider || !batchEmbed) {
return this.embedChunksInBatches(chunks);
}
if (chunks.length === 0) {
@@ -175,7 +254,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
const missingChunks = missing.map((item) => item.chunk);
const batchResult = await this.runBatchWithFallback({
provider: provider.id,
provider: this.provider.id,
run: async () =>
await batchEmbed({
agentId: this.agentId,
@@ -201,14 +280,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
embeddings[item.index] = embedding;
toCache.push({ hash: item.chunk.hash, embedding });
}
upsertMemoryEmbeddingCache({
db: this.db,
enabled: this.cache.enabled,
provider,
providerKey: this.providerKey,
entries: toCache,
tableName: EMBEDDING_CACHE_TABLE,
});
this.upsertEmbeddingCache(toCache);
return embeddings;
}
@@ -216,92 +288,106 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
embeddings: number[][];
missing: Array<{ index: number; chunk: MemoryChunk }>;
} {
return collectMemoryCachedEmbeddings({
chunks,
cached: loadMemoryEmbeddingCache({
db: this.db,
enabled: this.cache.enabled,
provider: this.provider,
providerKey: this.providerKey,
hashes: chunks.map((chunk) => chunk.hash),
tableName: EMBEDDING_CACHE_TABLE,
}),
});
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
for (let i = 0; i < chunks.length; i += 1) {
const chunk = chunks[i];
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
if (hit && hit.length > 0) {
embeddings[i] = hit;
} else if (chunk) {
missing.push({ index: i, chunk });
}
}
return { embeddings, missing };
}
protected async embedBatchWithRetry(texts: string[]): Promise<number[][]> {
if (texts.length === 0) {
return [];
}
const provider = this.provider;
if (!provider) {
if (!this.provider) {
throw new Error("Cannot embed batch in FTS-only mode (no embedding provider)");
}
return await runMemoryEmbeddingRetryLoop({
run: async () => {
let attempt = 0;
let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS;
while (true) {
try {
const timeoutMs = this.resolveEmbeddingTimeout("batch");
log.debug("memory embeddings: batch start", {
provider: provider.id,
provider: this.provider.id,
items: texts.length,
timeoutMs,
});
return await this.withTimeout(
provider.embedBatch(texts),
this.provider.embedBatch(texts),
timeoutMs,
`memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
);
},
isRetryable: isRetryableMemoryEmbeddingError,
waitForRetry: async (delayMs) => {
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) {
throw err;
}
await this.waitForEmbeddingRetry(delayMs, "retrying");
},
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
});
delayMs *= 2;
attempt += 1;
}
}
}
protected async embedBatchInputsWithRetry(inputs: EmbeddingInput[]): Promise<number[][]> {
if (inputs.length === 0) {
return [];
}
const provider = this.provider;
const embedBatchInputs = provider?.embedBatchInputs;
if (!embedBatchInputs) {
if (!this.provider?.embedBatchInputs) {
return await this.embedBatchWithRetry(inputs.map((input) => input.text));
}
return await runMemoryEmbeddingRetryLoop({
run: async () => {
let attempt = 0;
let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS;
while (true) {
try {
const timeoutMs = this.resolveEmbeddingTimeout("batch");
log.debug("memory embeddings: structured batch start", {
provider: provider.id,
provider: this.provider.id,
items: inputs.length,
timeoutMs,
});
return await this.withTimeout(
embedBatchInputs(inputs),
this.provider.embedBatchInputs(inputs),
timeoutMs,
`memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
);
},
isRetryable: isRetryableMemoryEmbeddingError,
waitForRetry: async (delayMs) => {
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) {
throw err;
}
await this.waitForEmbeddingRetry(delayMs, "retrying structured batch");
},
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
});
delayMs *= 2;
attempt += 1;
}
}
}
private async waitForEmbeddingRetry(delayMs: number, action: string): Promise<void> {
const waitMs = resolveMemoryEmbeddingRetryDelay(
delayMs,
Math.random(),
const waitMs = Math.min(
EMBEDDING_RETRY_MAX_DELAY_MS,
Math.round(delayMs * (1 + Math.random() * 0.2)),
);
log.warn(`memory embeddings rate limited; ${action} in ${waitMs}ms`);
await new Promise((resolve) => setTimeout(resolve, waitMs));
}
private isRetryableEmbeddingError(message: string): boolean {
return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare|tokens per day)/i.test(
message,
);
}
private resolveEmbeddingTimeout(kind: "query" | "batch"): number {
const isLocal = this.provider?.id === "local";
if (kind === "query") {
@@ -363,16 +449,9 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
if (this.batchFailureCount > 0) {
log.debug("memory embeddings: batch recovered; resetting failure count");
}
const nextState = resetMemoryBatchFailureState({
enabled: this.batch.enabled,
count: this.batchFailureCount,
lastError: this.batchFailureLastError,
lastProvider: this.batchFailureLastProvider,
});
this.batch.enabled = nextState.enabled;
this.batchFailureCount = nextState.count;
this.batchFailureLastError = nextState.lastError;
this.batchFailureLastProvider = nextState.lastProvider;
this.batchFailureCount = 0;
this.batchFailureLastError = undefined;
this.batchFailureLastProvider = undefined;
});
}
@@ -386,20 +465,17 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
if (!this.batch.enabled) {
return { disabled: true, count: this.batchFailureCount };
}
const nextState = recordMemoryBatchFailure(
{
enabled: this.batch.enabled,
count: this.batchFailureCount,
lastError: this.batchFailureLastError,
lastProvider: this.batchFailureLastProvider,
},
params,
);
this.batch.enabled = nextState.enabled;
this.batchFailureCount = nextState.count;
this.batchFailureLastError = nextState.lastError;
this.batchFailureLastProvider = nextState.lastProvider;
return { disabled: !nextState.enabled, count: nextState.count };
const increment = params.forceDisable
? BATCH_FAILURE_LIMIT
: Math.max(1, params.attempts ?? 1);
this.batchFailureCount += increment;
this.batchFailureLastError = params.message;
this.batchFailureLastProvider = params.provider;
const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT;
if (disabled) {
this.batch.enabled = false;
}
return { disabled, count: this.batchFailureCount };
});
}
@@ -455,7 +531,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
});
const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled";
log.warn(
`memory embeddings: ${params.provider} batch failed (${failure.count}/${MEMORY_BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`,
`memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`,
);
return await params.fallback();
}
@@ -477,13 +553,17 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
}
if (this.fts.enabled && this.fts.available) {
try {
deleteMemoryFtsRows({
db: this.db,
tableName: FTS_TABLE,
path: pathname,
source,
currentModel: this.provider?.model,
});
if (this.provider) {
// Scoped to current model — avoids removing rows from a different model.
this.db
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
.run(pathname, source, this.provider.model);
} else {
// FTS-only: searchKeyword matches all models, so clear all to avoid stale rows.
this.db
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ?`)
.run(pathname, source);
}
} catch {}
}
this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(pathname, source);
@@ -593,7 +673,9 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
return;
}
const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
const chunks = filterNonEmptyMemoryChunks(chunkMarkdown(content, this.settings.chunking));
const chunks = chunkMarkdown(content, this.settings.chunking).filter(
(chunk) => chunk.text.trim().length > 0,
);
if (options.source === "sessions" && "lineMap" in entry) {
remapChunkLines(chunks, entry.lineMap);
}
@@ -623,7 +705,9 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
chunks = [multimodalChunk.chunk];
} else {
const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
const baseChunks = filterNonEmptyMemoryChunks(chunkMarkdown(content, this.settings.chunking));
const baseChunks = chunkMarkdown(content, this.settings.chunking).filter(
(chunk) => chunk.text.trim().length > 0,
);
chunks = this.provider
? enforceEmbeddingMaxInputTokens(this.provider, baseChunks, EMBEDDING_BATCH_MAX_TOKENS)
: baseChunks;

View File

@@ -1,103 +0,0 @@
import { describe, expect, it, vi } from "vitest";
import {
buildMemoryEmbeddingBatches,
filterNonEmptyMemoryChunks,
isRetryableMemoryEmbeddingError,
resolveMemoryEmbeddingRetryDelay,
runMemoryEmbeddingRetryLoop,
} from "./manager-embedding-policy.js";
function chunk(text: string) {
return {
startLine: 1,
endLine: 1,
text,
hash: text,
};
}
describe("memory embedding policy", () => {
it("splits large files across multiple embedding batches", () => {
const line = "a".repeat(4200);
const batches = buildMemoryEmbeddingBatches([chunk(line), chunk(line)], 8000);
expect(batches).toHaveLength(2);
expect(batches.every((batch) => batch.length === 1)).toBe(true);
});
it("keeps small files in a single embedding batch", () => {
const line = "b".repeat(120);
const batches = buildMemoryEmbeddingBatches(
[chunk(line), chunk(line), chunk(line), chunk(line)],
8000,
);
expect(batches).toHaveLength(1);
expect(batches[0]).toHaveLength(4);
});
it("filters empty chunks before embedding", () => {
const chunks = filterNonEmptyMemoryChunks([chunk("\n\n"), chunk("hello"), chunk(" ")]);
expect(chunks.map((entry) => entry.text)).toEqual(["hello"]);
});
it("retries transient rate limit and 5xx errors", async () => {
const run = vi.fn(async () => {
const call = run.mock.calls.length;
if (call === 1) {
throw new Error("openai embeddings failed: 429 rate limit");
}
if (call === 2) {
throw new Error("openai embeddings failed: 502 Bad Gateway (cloudflare)");
}
return "ok";
});
const waits: number[] = [];
const result = await runMemoryEmbeddingRetryLoop({
run,
isRetryable: isRetryableMemoryEmbeddingError,
waitForRetry: async (delayMs) => {
waits.push(delayMs);
},
maxAttempts: 3,
baseDelayMs: 500,
});
expect(result).toBe("ok");
expect(run).toHaveBeenCalledTimes(3);
expect(waits).toEqual([500, 1000]);
});
it("retries too-many-tokens-per-day errors", async () => {
let calls = 0;
const waits: number[] = [];
const result = await runMemoryEmbeddingRetryLoop({
run: async () => {
calls += 1;
if (calls === 1) {
throw new Error("AWS Bedrock embeddings failed: Too many tokens per day");
}
return "ok";
},
isRetryable: isRetryableMemoryEmbeddingError,
waitForRetry: async (delayMs) => {
waits.push(delayMs);
},
maxAttempts: 3,
baseDelayMs: 500,
});
expect(result).toBe("ok");
expect(calls).toBe(2);
expect(waits).toEqual([500]);
});
it("caps retry jittered delays", () => {
expect(resolveMemoryEmbeddingRetryDelay(500, 0, 8000)).toBe(500);
expect(resolveMemoryEmbeddingRetryDelay(500, 1, 8000)).toBe(600);
expect(resolveMemoryEmbeddingRetryDelay(10_000, 1, 8000)).toBe(8000);
});
});

View File

@@ -1,123 +0,0 @@
type MemoryEmbeddingTextPart = {
type: "text";
text: string;
};
type MemoryEmbeddingInlineDataPart = {
type: "inline-data";
mimeType: string;
data: string;
};
type MemoryEmbeddingInput = {
text: string;
parts?: Array<MemoryEmbeddingTextPart | MemoryEmbeddingInlineDataPart>;
};
type MemoryEmbeddingChunk = {
text: string;
embeddingInput?: MemoryEmbeddingInput;
};
function estimateUtf8Bytes(text: string): number {
if (!text) {
return 0;
}
return Buffer.byteLength(text, "utf8");
}
function estimateStructuredEmbeddingInputBytes(input: MemoryEmbeddingInput): number {
if (!input.parts?.length) {
return estimateUtf8Bytes(input.text);
}
let total = 0;
for (const part of input.parts) {
if (part.type === "text") {
total += estimateUtf8Bytes(part.text);
} else {
total += estimateUtf8Bytes(part.mimeType);
total += estimateUtf8Bytes(part.data);
}
}
return total;
}
export function filterNonEmptyMemoryChunks<T extends MemoryEmbeddingChunk>(chunks: T[]): T[] {
return chunks.filter((chunk) => chunk.text.trim().length > 0);
}
export function buildMemoryEmbeddingBatches<T extends MemoryEmbeddingChunk>(
chunks: T[],
maxTokens: number,
): T[][] {
const batches: T[][] = [];
let current: T[] = [];
let currentTokens = 0;
for (const chunk of chunks) {
const estimate = chunk.embeddingInput
? estimateStructuredEmbeddingInputBytes(chunk.embeddingInput)
: estimateUtf8Bytes(chunk.text);
const wouldExceed = current.length > 0 && currentTokens + estimate > maxTokens;
if (wouldExceed) {
batches.push(current);
current = [];
currentTokens = 0;
}
if (current.length === 0 && estimate > maxTokens) {
batches.push([chunk]);
continue;
}
current.push(chunk);
currentTokens += estimate;
}
if (current.length > 0) {
batches.push(current);
}
return batches;
}
export function isRetryableMemoryEmbeddingError(message: string): boolean {
return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare|tokens per day)/i.test(
message,
);
}
export function resolveMemoryEmbeddingRetryDelay(
delayMs: number,
randomValue: number,
maxDelayMs: number,
): number {
return Math.min(maxDelayMs, Math.round(delayMs * (1 + randomValue * 0.2)));
}
export async function runMemoryEmbeddingRetryLoop<T>(params: {
run: () => Promise<T>;
isRetryable: (message: string) => boolean;
waitForRetry: (delayMs: number) => Promise<void>;
maxAttempts: number;
baseDelayMs: number;
}): Promise<T> {
let attempt = 0;
let delayMs = params.baseDelayMs;
while (true) {
try {
return await params.run();
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
if (!params.isRetryable(message) || attempt >= params.maxAttempts) {
throw err;
}
await params.waitForRetry(delayMs);
delayMs *= 2;
attempt += 1;
}
}
}
export function buildTextEmbeddingInputs<T extends MemoryEmbeddingChunk>(
chunks: T[],
): MemoryEmbeddingInput[] {
return chunks.map((chunk) => chunk.embeddingInput ?? { text: chunk.text });
}

View File

@@ -1,63 +0,0 @@
import { DatabaseSync } from "node:sqlite";
import { afterEach, describe, expect, it } from "vitest";
import { deleteMemoryFtsRows } from "./manager-fts-state.js";
describe("memory FTS state", () => {
let db: DatabaseSync | null = null;
afterEach(() => {
db?.close();
db = null;
});
it("only removes rows for the active model when a provider is active", () => {
db = new DatabaseSync(":memory:");
db.exec("CREATE TABLE chunks_fts (path TEXT, source TEXT, model TEXT)");
db.prepare("INSERT INTO chunks_fts (path, source, model) VALUES (?, ?, ?)").run(
"memory/2026-01-12.md",
"memory",
"mock-embed",
);
db.prepare("INSERT INTO chunks_fts (path, source, model) VALUES (?, ?, ?)").run(
"memory/2026-01-12.md",
"memory",
"other-model",
);
deleteMemoryFtsRows({
db,
path: "memory/2026-01-12.md",
source: "memory",
currentModel: "mock-embed",
});
const rows = db.prepare("SELECT model FROM chunks_fts ORDER BY model").all() as Array<{
model: string;
}>;
expect(rows).toEqual([{ model: "other-model" }]);
});
it("removes all rows for the path in FTS-only mode", () => {
db = new DatabaseSync(":memory:");
db.exec("CREATE TABLE chunks_fts (path TEXT, source TEXT, model TEXT)");
db.prepare("INSERT INTO chunks_fts (path, source, model) VALUES (?, ?, ?)").run(
"memory/2026-01-12.md",
"memory",
"mock-embed",
);
db.prepare("INSERT INTO chunks_fts (path, source, model) VALUES (?, ?, ?)").run(
"memory/2026-01-12.md",
"memory",
"fts-only",
);
deleteMemoryFtsRows({
db,
path: "memory/2026-01-12.md",
source: "memory",
});
const count = db.prepare("SELECT COUNT(*) as c FROM chunks_fts").get() as { c: number };
expect(count.c).toBe(0);
});
});

View File

@@ -1,21 +0,0 @@
import type { DatabaseSync } from "node:sqlite";
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
export function deleteMemoryFtsRows(params: {
db: DatabaseSync;
tableName?: string;
path: string;
source: MemorySource;
currentModel?: string;
}): void {
const tableName = params.tableName ?? "chunks_fts";
if (params.currentModel) {
params.db
.prepare(`DELETE FROM ${tableName} WHERE path = ? AND source = ? AND model = ?`)
.run(params.path, params.source, params.currentModel);
return;
}
params.db
.prepare(`DELETE FROM ${tableName} WHERE path = ? AND source = ?`)
.run(params.path, params.source);
}

View File

@@ -1,10 +1,6 @@
import type { SQLInputValue } from "node:sqlite";
import type { DatabaseSync } from "node:sqlite";
type VectorWriteDb = {
prepare: (sql: string) => {
run: (...params: SQLInputValue[]) => unknown;
};
};
type VectorWriteDb = Pick<DatabaseSync, "prepare">;
const vectorToBlob = (embedding: number[]): Buffer =>
Buffer.from(new Float32Array(embedding).buffer);

View File

@@ -0,0 +1,330 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { useFastShortTimeouts } from "../../../../test/helpers/fast-short-timeouts.js";
import { createOpenAIEmbeddingProviderMock } from "./test-embeddings-mock.js";
import { mockPublicPinnedHostname } from "./test-helpers/ssrf.js";
type MemoryIndexManager = import("./index.js").MemoryIndexManager;
type MemoryIndexModule = typeof import("./index.js");
const embedBatch = vi.fn(async (_texts: string[]) => [] as number[][]);
const embedQuery = vi.fn(async () => [0.5, 0.5, 0.5]);
let getMemorySearchManager: MemoryIndexModule["getMemorySearchManager"];
describe("memory indexing with OpenAI batches", () => {
let fixtureRoot: string;
let workspaceDir: string;
let memoryDir: string;
let indexPath: string;
let manager: MemoryIndexManager | null = null;
async function readOpenAIBatchUploadRequests(body: FormData) {
let uploadedRequests: Array<{ custom_id?: string }> = [];
const entries = body.entries() as IterableIterator<[string, FormDataEntryValue]>;
for (const [key, value] of entries) {
if (key !== "file") {
continue;
}
const text = typeof value === "string" ? value : await value.text();
uploadedRequests = text
.split("\n")
.filter(Boolean)
.map((line: string) => JSON.parse(line) as { custom_id?: string });
}
return uploadedRequests;
}
function createOpenAIBatchFetchMock(options?: {
onCreateBatch?: (ctx: { batchCreates: number }) => Response | Promise<Response>;
}) {
let uploadedRequests: Array<{ custom_id?: string }> = [];
const state = { batchCreates: 0 };
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url =
typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
if (url.endsWith("/files")) {
const body = init?.body;
if (!(body instanceof FormData)) {
throw new Error("expected FormData upload");
}
uploadedRequests = await readOpenAIBatchUploadRequests(body);
return new Response(JSON.stringify({ id: "file_1" }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
if (url.endsWith("/batches")) {
state.batchCreates += 1;
if (options?.onCreateBatch) {
return await options.onCreateBatch({ batchCreates: state.batchCreates });
}
return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
if (url.endsWith("/batches/batch_1")) {
return new Response(
JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}
if (url.endsWith("/files/file_out/content")) {
const lines = uploadedRequests.map((request, index) =>
JSON.stringify({
custom_id: request.custom_id,
response: {
status_code: 200,
body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
},
}),
);
return new Response(lines.join("\n"), {
status: 200,
headers: { "Content-Type": "application/jsonl" },
});
}
throw new Error(`unexpected fetch ${url}`);
});
return { fetchMock, state };
}
function createBatchCfg(): OpenClawConfig {
return {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "text-embedding-3-small",
store: { path: indexPath, vector: { enabled: false } },
sync: { watch: false, onSessionStart: false, onSearch: false },
query: { minScore: 0, hybrid: { enabled: false } },
remote: { batch: { enabled: true, wait: true, pollIntervalMs: 1 } },
},
},
list: [{ id: "main", default: true }],
},
} as OpenClawConfig;
}
beforeAll(async () => {
vi.resetModules();
vi.doMock("./embeddings.js", () => ({
createEmbeddingProvider: async () =>
createOpenAIEmbeddingProviderMock({
embedQuery,
embedBatch,
}),
}));
await import("./test-runtime-mocks.js");
({ getMemorySearchManager } = await import("./index.js"));
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-batch-"));
workspaceDir = path.join(fixtureRoot, "workspace");
memoryDir = path.join(workspaceDir, "memory");
indexPath = path.join(fixtureRoot, "index.sqlite");
await fs.mkdir(memoryDir, { recursive: true });
const result = await getMemorySearchManager({ cfg: createBatchCfg(), agentId: "main" });
expect(result.manager).not.toBeNull();
if (!result.manager) {
throw new Error("manager missing");
}
manager = result.manager as unknown as MemoryIndexManager;
});
afterAll(async () => {
if (manager) {
await manager.close();
manager = null;
}
await fs.rm(fixtureRoot, { recursive: true, force: true });
});
beforeEach(async () => {
embedBatch.mockClear();
embedQuery.mockClear();
embedBatch.mockImplementation(async (texts: string[]) =>
texts.map((_text, index) => [index + 1, 0, 0]),
);
await fs.rm(memoryDir, { recursive: true, force: true });
await fs.mkdir(memoryDir, { recursive: true });
// Reuse one manager instance across tests; keep index state isolated.
if (!manager) {
throw new Error("manager missing");
}
(manager as unknown as { resetIndex: () => void }).resetIndex();
(manager as unknown as { dirty: boolean }).dirty = true;
(manager as unknown as { batchFailureCount: number }).batchFailureCount = 0;
(manager as unknown as { batchFailureLastError?: string }).batchFailureLastError = undefined;
(manager as unknown as { batchFailureLastProvider?: string }).batchFailureLastProvider =
undefined;
(manager as unknown as { batch: { enabled: boolean } }).batch.enabled = true;
});
afterEach(async () => {
vi.unstubAllGlobals();
});
it("uses OpenAI batch uploads when enabled", async () => {
const restoreTimeouts = useFastShortTimeouts();
const content = ["hello", "from", "batch"].join("\n\n");
await fs.writeFile(path.join(memoryDir, "2026-01-07.md"), content);
const { fetchMock } = createOpenAIBatchFetchMock();
vi.stubGlobal("fetch", fetchMock);
mockPublicPinnedHostname();
try {
if (!manager) {
throw new Error("manager missing");
}
const labels: string[] = [];
await manager.sync({
progress: (update) => {
if (update.label) {
labels.push(update.label);
}
},
});
const status = manager.status();
expect(status.chunks).toBeGreaterThan(0);
expect(embedBatch).not.toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(labels.some((label) => label.toLowerCase().includes("batch"))).toBe(true);
} finally {
restoreTimeouts();
}
});
it("retries OpenAI batch create on transient failures", async () => {
const restoreTimeouts = useFastShortTimeouts();
const content = ["retry", "the", "batch"].join("\n\n");
await fs.writeFile(path.join(memoryDir, "2026-01-08.md"), content);
const { fetchMock, state } = createOpenAIBatchFetchMock({
onCreateBatch: ({ batchCreates }) => {
if (batchCreates === 1) {
return new Response("upstream connect error", { status: 503 });
}
return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
},
});
vi.stubGlobal("fetch", fetchMock);
mockPublicPinnedHostname();
try {
if (!manager) {
throw new Error("manager missing");
}
await manager.sync({ reason: "test" });
const status = manager.status();
expect(status.chunks).toBeGreaterThan(0);
expect(state.batchCreates).toBe(2);
} finally {
restoreTimeouts();
}
});
it("tracks batch failures, resets on success, and disables after repeated failures", async () => {
const restoreTimeouts = useFastShortTimeouts();
const memoryFile = path.join(memoryDir, "2026-01-09.md");
await fs.writeFile(memoryFile, ["flaky", "batch"].join("\n\n"));
let mtimeMs = Date.now();
const touch = async () => {
mtimeMs += 1_000;
const date = new Date(mtimeMs);
await fs.utimes(memoryFile, date, date);
};
await touch();
let mode: "fail" | "ok" = "fail";
const { fetchMock } = createOpenAIBatchFetchMock({
onCreateBatch: () =>
mode === "fail"
? new Response("batch failed", { status: 400 })
: new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
status: 200,
headers: { "Content-Type": "application/json" },
}),
});
vi.stubGlobal("fetch", fetchMock);
mockPublicPinnedHostname();
try {
if (!manager) {
throw new Error("manager missing");
}
// First failure: fallback to regular embeddings and increment failure count.
await manager.sync({ reason: "test" });
expect(embedBatch).toHaveBeenCalled();
let status = manager.status();
expect(status.batch?.enabled).toBe(true);
expect(status.batch?.failures).toBe(1);
// Success should reset failure count.
embedBatch.mockClear();
mode = "ok";
await fs.writeFile(memoryFile, ["flaky", "batch", "recovery"].join("\n\n"));
await touch();
(manager as unknown as { dirty: boolean }).dirty = true;
await manager.sync({ reason: "test" });
status = manager.status();
expect(status.batch?.enabled).toBe(true);
expect(status.batch?.failures).toBe(0);
expect(embedBatch).not.toHaveBeenCalled();
// Two more failures after reset should disable remote batching.
await (
manager as unknown as {
recordBatchFailure: (params: {
provider: string;
message: string;
attempts?: number;
forceDisable?: boolean;
}) => Promise<unknown>;
}
).recordBatchFailure({ provider: "openai", message: "batch failed", attempts: 1 });
await (
manager as unknown as {
recordBatchFailure: (params: {
provider: string;
message: string;
attempts?: number;
forceDisable?: boolean;
}) => Promise<unknown>;
}
).recordBatchFailure({ provider: "openai", message: "batch failed", attempts: 1 });
status = manager.status();
expect(status.batch?.enabled).toBe(false);
expect(status.batch?.failures).toBeGreaterThanOrEqual(2);
// Once disabled, batch endpoints are skipped and fallback embeddings run directly.
const fetchCalls = fetchMock.mock.calls.length;
embedBatch.mockClear();
await fs.writeFile(memoryFile, ["flaky", "batch", "fallback"].join("\n\n"));
await touch();
(manager as unknown as { dirty: boolean }).dirty = true;
await manager.sync({ reason: "test" });
expect(fetchMock.mock.calls.length).toBe(fetchCalls);
expect(embedBatch).toHaveBeenCalled();
} finally {
restoreTimeouts();
}
});
});

View File

@@ -0,0 +1,159 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { MemorySyncProgressUpdate } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { describe, expect, it } from "vitest";
import { useFastShortTimeouts } from "../../../../test/helpers/fast-short-timeouts.js";
import { installEmbeddingManagerFixture } from "./embedding-manager.test-harness.js";
const fx = installEmbeddingManagerFixture({
fixturePrefix: "openclaw-mem-",
largeTokens: 4000,
smallTokens: 200,
createCfg: ({ workspaceDir, indexPath, tokens }) => ({
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: indexPath, vector: { enabled: false } },
chunking: { tokens, overlap: 0 },
sync: { watch: false, onSessionStart: false, onSearch: false },
query: { minScore: 0, hybrid: { enabled: false } },
},
},
list: [{ id: "main", default: true }],
},
}),
});
describe("memory embedding batches", () => {
function requireSync(manager: {
sync?: (params?: {
reason?: string;
progress?: (update: MemorySyncProgressUpdate) => void;
}) => Promise<void>;
}): (params?: {
reason?: string;
progress?: (update: MemorySyncProgressUpdate) => void;
}) => Promise<void> {
if (!manager.sync) {
throw new Error("manager.sync missing");
}
return manager.sync.bind(manager);
}
async function expectSyncWithFastTimeouts(manager: {
sync?: (params?: { reason?: string }) => Promise<void>;
}) {
const restoreFastTimeouts = useFastShortTimeouts();
try {
await requireSync(manager)({ reason: "test" });
} finally {
restoreFastTimeouts();
}
}
it("splits large files across multiple embedding batches", async () => {
const memoryDir = fx.getMemoryDir();
const manager = await fx.getManagerLarge();
// Keep this small but above the embedding batch byte threshold (8k) so we
// exercise multi-batch behavior without generating lots of chunks/DB rows.
const line = "a".repeat(4200);
const content = [line, line].join("\n");
await fs.writeFile(path.join(memoryDir, "2026-01-03.md"), content);
const updates: Array<{ completed: number; total: number; label?: string }> = [];
await requireSync(manager)({
progress: (update) => {
updates.push(update);
},
});
const status = manager.status();
const totalTexts = fx.embedBatch.mock.calls.reduce(
(sum: number, call: unknown[]) => sum + ((call[0] as string[] | undefined)?.length ?? 0),
0,
);
expect(totalTexts).toBe(status.chunks);
expect(fx.embedBatch.mock.calls.length).toBeGreaterThan(1);
const inputs: string[] = fx.embedBatch.mock.calls.flatMap(
(call: unknown[]) => (call[0] as string[] | undefined) ?? [],
);
expect(inputs.every((text) => Buffer.byteLength(text, "utf8") <= 8000)).toBe(true);
expect(updates.length).toBeGreaterThan(0);
expect(updates.some((update) => update.label?.includes("/"))).toBe(true);
const last = updates[updates.length - 1];
expect(last?.total).toBeGreaterThan(0);
expect(last?.completed).toBe(last?.total);
});
it("keeps small files in a single embedding batch", async () => {
const memoryDir = fx.getMemoryDir();
const manager = await fx.getManagerLarge();
const line = "b".repeat(120);
const content = Array.from({ length: 4 }, () => line).join("\n");
await fs.writeFile(path.join(memoryDir, "2026-01-04.md"), content);
await requireSync(manager)({ reason: "test" });
expect(fx.embedBatch.mock.calls.length).toBe(1);
});
it("retries embeddings on transient rate limit and 5xx errors", async () => {
const memoryDir = fx.getMemoryDir();
const manager = await fx.getManagerLarge();
const line = "d".repeat(120);
const content = Array.from({ length: 4 }, () => line).join("\n");
await fs.writeFile(path.join(memoryDir, "2026-01-06.md"), content);
const transientErrors = [
"openai embeddings failed: 429 rate limit",
"openai embeddings failed: 502 Bad Gateway (cloudflare)",
];
let calls = 0;
fx.embedBatch.mockImplementation(async (texts: string[]) => {
calls += 1;
const transient = transientErrors[calls - 1];
if (transient) {
throw new Error(transient);
}
return texts.map(() => [0, 1, 0]);
});
await expectSyncWithFastTimeouts(manager);
expect(calls).toBe(3);
}, 10000);
it("retries embeddings on too-many-tokens-per-day rate limits", async () => {
const memoryDir = fx.getMemoryDir();
const manager = await fx.getManagerLarge();
const line = "e".repeat(120);
const content = Array.from({ length: 4 }, () => line).join("\n");
await fs.writeFile(path.join(memoryDir, "2026-01-08.md"), content);
let calls = 0;
fx.embedBatch.mockImplementation(async (texts: string[]) => {
calls += 1;
if (calls === 1) {
throw new Error("AWS Bedrock embeddings failed: Too many tokens per day");
}
return texts.map(() => [0, 1, 0]);
});
await expectSyncWithFastTimeouts(manager);
expect(calls).toBe(2);
}, 10000);
it("skips empty chunks so embeddings input stays valid", async () => {
const memoryDir = fx.getMemoryDir();
const manager = await fx.getManagerLarge();
await fs.writeFile(path.join(memoryDir, "2026-01-07.md"), "\n\n\n");
await requireSync(manager)({ reason: "test" });
const inputs = fx.embedBatch.mock.calls.flatMap(
(call: unknown[]) => (call[0] as string[]) ?? [],
);
expect(inputs).not.toContain("");
});
});

View File

@@ -28,7 +28,6 @@ import {
} from "./embeddings.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { awaitPendingManagerWork, startAsyncSearchSync } from "./manager-async-state.js";
import { MEMORY_BATCH_FAILURE_LIMIT } from "./manager-batch-state.js";
import {
closeManagedCacheEntries,
getOrCreateManagedCacheEntry,
@@ -57,6 +56,8 @@ const SNIPPET_MAX_CHARS = 700;
const VECTOR_TABLE = "chunks_vec";
const FTS_TABLE = "chunks_fts";
const EMBEDDING_CACHE_TABLE = "embedding_cache";
const BATCH_FAILURE_LIMIT = 2;
const MEMORY_INDEX_MANAGER_CACHE_KEY = Symbol.for("openclaw.memoryIndexManagerCache");
const log = createSubsystemLogger("memory");
@@ -730,7 +731,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
batch: {
enabled: this.batch.enabled,
failures: this.batchFailureCount,
limit: MEMORY_BATCH_FAILURE_LIMIT,
limit: BATCH_FAILURE_LIMIT,
wait: this.batch.wait,
concurrency: this.batch.concurrency,
pollIntervalMs: this.batch.pollIntervalMs,

View File

@@ -1,40 +0,0 @@
import { describe, expect, it } from "vitest";
import { filterUnregisteredMemoryEmbeddingProviderAdapters } from "./provider-adapter-registration.js";
describe("filterUnregisteredMemoryEmbeddingProviderAdapters", () => {
it("keeps builtin adapters that are not already registered", () => {
const adapters = filterUnregisteredMemoryEmbeddingProviderAdapters({
builtinAdapters: [
{ id: "local" },
{ id: "openai" },
{ id: "gemini" },
{ id: "voyage" },
{ id: "mistral" },
],
registeredAdapters: [],
});
expect(adapters.map((adapter) => adapter.id)).toEqual([
"local",
"openai",
"gemini",
"voyage",
"mistral",
]);
});
it("skips builtin adapters that are already registered", () => {
const adapters = filterUnregisteredMemoryEmbeddingProviderAdapters({
builtinAdapters: [
{ id: "local" },
{ id: "openai" },
{ id: "gemini" },
{ id: "voyage" },
{ id: "mistral" },
],
registeredAdapters: [{ id: "local" }, { id: "gemini" }],
});
expect(adapters.map((adapter) => adapter.id)).toEqual(["openai", "voyage", "mistral"]);
});
});

View File

@@ -1,11 +0,0 @@
type AdapterLike = {
id: string;
};
export function filterUnregisteredMemoryEmbeddingProviderAdapters<T extends AdapterLike>(params: {
builtinAdapters: readonly T[];
registeredAdapters: readonly AdapterLike[];
}): T[] {
const existingIds = new Set(params.registeredAdapters.map((adapter) => adapter.id));
return params.builtinAdapters.filter((adapter) => !existingIds.has(adapter.id));
}

View File

@@ -0,0 +1,63 @@
import type { MemoryEmbeddingProviderAdapter } from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { registerBuiltInMemoryEmbeddingProviders } from "./provider-adapters.js";
const mocks = vi.hoisted(() => ({
listRegisteredMemoryEmbeddingProviderAdapters: vi.fn<() => MemoryEmbeddingProviderAdapter[]>(
() => [],
),
listMemoryEmbeddingProviders: vi.fn(() => {
throw new Error("fallback capability loading should stay cold during memory-core register");
}),
}));
vi.mock("openclaw/plugin-sdk/memory-core-host-engine-embeddings", async () => {
const actual = await vi.importActual<
typeof import("openclaw/plugin-sdk/memory-core-host-engine-embeddings")
>("openclaw/plugin-sdk/memory-core-host-engine-embeddings");
return {
...actual,
listRegisteredMemoryEmbeddingProviderAdapters:
mocks.listRegisteredMemoryEmbeddingProviderAdapters,
listMemoryEmbeddingProviders: mocks.listMemoryEmbeddingProviders,
};
});
beforeEach(() => {
mocks.listRegisteredMemoryEmbeddingProviderAdapters.mockReset();
mocks.listRegisteredMemoryEmbeddingProviderAdapters.mockReturnValue([]);
mocks.listMemoryEmbeddingProviders.mockClear();
});
describe("registerBuiltInMemoryEmbeddingProviders", () => {
it("uses only already-registered providers when avoiding duplicates", () => {
const ids: string[] = [];
registerBuiltInMemoryEmbeddingProviders({
registerMemoryEmbeddingProvider(adapter) {
ids.push(adapter.id);
},
});
expect(ids).toEqual(["local", "openai", "gemini", "voyage", "mistral"]);
expect(mocks.listRegisteredMemoryEmbeddingProviderAdapters).toHaveBeenCalledTimes(1);
expect(mocks.listMemoryEmbeddingProviders).not.toHaveBeenCalled();
});
it("skips builtin adapters that are already registered in the current load", () => {
mocks.listRegisteredMemoryEmbeddingProviderAdapters.mockReturnValue([
{ id: "local", create: vi.fn() } as MemoryEmbeddingProviderAdapter,
{ id: "gemini", create: vi.fn() } as MemoryEmbeddingProviderAdapter,
]);
const ids: string[] = [];
registerBuiltInMemoryEmbeddingProviders({
registerMemoryEmbeddingProvider(adapter) {
ids.push(adapter.id);
},
});
expect(ids).toEqual(["openai", "voyage", "mistral"]);
expect(mocks.listMemoryEmbeddingProviders).not.toHaveBeenCalled();
});
});

View File

@@ -21,8 +21,6 @@ import {
} from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
import { resolveUserPath } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { getProviderEnvVars } from "openclaw/plugin-sdk/provider-env-vars";
import { formatErrorMessage } from "../dreaming-shared.js";
import { filterUnregisteredMemoryEmbeddingProviderAdapters } from "./provider-adapter-registration.js";
export type BuiltinMemoryEmbeddingProviderDoctorMetadata = {
providerId: string;
@@ -32,6 +30,10 @@ export type BuiltinMemoryEmbeddingProviderDoctorMetadata = {
autoSelectPriority?: number;
};
function formatErrorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
function isMissingApiKeyError(err: unknown): boolean {
return formatErrorMessage(err).includes("No API key found for provider");
}
@@ -335,10 +337,13 @@ export function registerBuiltInMemoryEmbeddingProviders(register: {
// Only inspect providers already registered in the current load. Falling back
// to capability discovery here can recursively trigger plugin loading while
// memory-core itself is still registering.
for (const adapter of filterUnregisteredMemoryEmbeddingProviderAdapters({
builtinAdapters: builtinMemoryEmbeddingProviderAdapters,
registeredAdapters: listRegisteredMemoryEmbeddingProviderAdapters(),
})) {
const existingIds = new Set(
listRegisteredMemoryEmbeddingProviderAdapters().map((adapter) => adapter.id),
);
for (const adapter of builtinMemoryEmbeddingProviderAdapters) {
if (existingIds.has(adapter.id)) {
continue;
}
register.registerMemoryEmbeddingProvider(adapter);
}
}

View File

@@ -41,7 +41,6 @@ import {
type ResolvedQmdConfig,
type ResolvedQmdMcporterConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { asRecord } from "../dreaming-shared.js";
import { resolveQmdCollectionPatternFlags, type QmdCollectionPatternFlag } from "./qmd-compat.js";
type SqliteDatabase = import("node:sqlite").DatabaseSync;
@@ -1760,14 +1759,16 @@ export class QmdMemoryManager implements MemorySearchManager {
}
const parsedUnknown: unknown = JSON.parse(result.stdout);
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const structured =
asRecord(parsedUnknown) && asRecord(parsedUnknown.structuredContent)
isRecord(parsedUnknown) && isRecord(parsedUnknown.structuredContent)
? parsedUnknown.structuredContent
: parsedUnknown;
const results: unknown[] =
asRecord(structured) && Array.isArray(structured.results)
isRecord(structured) && Array.isArray(structured.results)
? (structured.results as unknown[])
: Array.isArray(structured)
? structured
@@ -1775,7 +1776,7 @@ export class QmdMemoryManager implements MemorySearchManager {
const out: QmdQueryResult[] = [];
for (const item of results) {
if (!asRecord(item)) {
if (!isRecord(item)) {
continue;
}
const docidRaw = item.docid;

View File

@@ -0,0 +1,70 @@
import {
OPENAI_BATCH_ENDPOINT,
runOpenAiEmbeddingBatches,
} from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
import type { MemoryChunk } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
export function createOpenAIEmbeddingProviderMock(params: {
embedQuery: (input: string) => Promise<number[]>;
embedBatch: (input: string[]) => Promise<number[][]>;
}) {
const openAiClient = {
baseUrl: "https://api.openai.com/v1",
headers: { Authorization: "Bearer test", "Content-Type": "application/json" },
fetchImpl: (...args: Parameters<typeof fetch>) => {
if (!globalThis.fetch) {
throw new Error("fetch is not available");
}
return globalThis.fetch(...args);
},
model: "text-embedding-3-small",
};
return {
requestedProvider: "openai",
provider: {
id: "openai",
model: "text-embedding-3-small",
embedQuery: params.embedQuery,
embedBatch: params.embedBatch,
},
runtime: {
id: "openai",
cacheKeyData: {
provider: "openai",
baseUrl: openAiClient.baseUrl,
model: openAiClient.model,
},
batchEmbed: async (options: {
agentId: string;
chunks: MemoryChunk[];
wait: boolean;
concurrency: number;
pollIntervalMs: number;
timeoutMs: number;
debug: (message: string, data?: Record<string, unknown>) => void;
}) => {
const byCustomId = await runOpenAiEmbeddingBatches({
openAi: openAiClient,
agentId: options.agentId,
requests: options.chunks.map((chunk: MemoryChunk, index: number) => ({
custom_id: String(index),
method: "POST",
url: OPENAI_BATCH_ENDPOINT,
body: {
model: openAiClient.model,
input: chunk.text,
},
})),
wait: options.wait,
concurrency: options.concurrency,
pollIntervalMs: options.pollIntervalMs,
timeoutMs: options.timeoutMs,
debug: options.debug,
});
return options.chunks.map(
(_: MemoryChunk, index: number) => byCustomId.get(String(index)) ?? [],
);
},
},
};
}

View File

@@ -10,7 +10,6 @@ import {
summarizeConceptTagScriptCoverage,
type ConceptTagScriptCoverage,
} from "./concept-vocabulary.js";
import { asRecord } from "./dreaming-shared.js";
const SHORT_TERM_PATH_RE = /(?:^|\/)memory\/(\d{4})-(\d{2})-(\d{2})\.md$/;
const SHORT_TERM_BASENAME_RE = /^(\d{4})-(\d{2})-(\d{2})\.md$/;
@@ -1529,6 +1528,13 @@ export async function auditShortTermPromotionArtifacts(params: {
};
}
function asRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
export async function repairShortTermPromotionArtifacts(params: {
workspaceDir: string;
}): Promise<RepairShortTermPromotionArtifactsResult> {

View File

@@ -10,11 +10,12 @@ import type { OpenClawConfig } from "../api.js";
import type { ResolvedMemoryWikiConfig } from "./config.js";
import { appendMemoryWikiLog } from "./log.js";
import { renderMarkdownFence, renderWikiMarkdown, slugifyWikiSegment } from "./markdown.js";
import { writeImportedSourcePage } from "./source-page-shared.js";
import { pathExists, resolveArtifactKey } from "./source-path-shared.js";
import {
pruneImportedSourceEntries,
readMemoryWikiSourceSyncState,
setImportedSourceEntry,
shouldSkipImportedSourceWrite,
writeMemoryWikiSourceSyncState,
} from "./source-sync-state.js";
import { initializeMemoryWikiVault } from "./vault.js";
@@ -183,6 +184,9 @@ async function writeBridgeSourcePage(params: {
relativePath: params.artifact.relativePath,
});
const title = resolveBridgeTitle(params.artifact, params.agentIds);
const pageAbsPath = path.join(params.config.vault.path, pagePath);
const created = !(await pathExists(pageAbsPath));
const sourceUpdatedAt = new Date(params.sourceUpdatedAtMs).toISOString();
const renderFingerprint = createHash("sha1")
.update(
JSON.stringify({
@@ -193,56 +197,71 @@ async function writeBridgeSourcePage(params: {
}),
)
.digest("hex");
return writeImportedSourcePage({
const shouldSkip = await shouldSkipImportedSourceWrite({
vaultRoot: params.config.vault.path,
syncKey: params.artifact.syncKey,
sourcePath: params.artifact.absolutePath,
expectedPagePath: pagePath,
expectedSourcePath: params.artifact.absolutePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint,
pagePath,
group: "bridge",
state: params.state,
buildRendered: (raw, updatedAt) => {
const contentLanguage =
params.artifact.artifactType === "memory-events" ? "json" : "markdown";
return renderWikiMarkdown({
frontmatter: {
pageType: "source",
id: pageId,
title,
sourceType:
params.artifact.artifactType === "memory-events"
? "memory-bridge-events"
: "memory-bridge",
sourcePath: params.artifact.absolutePath,
bridgeRelativePath: params.artifact.relativePath,
bridgeWorkspaceDir: params.artifact.workspaceDir,
bridgeAgentIds: params.agentIds,
status: "active",
updatedAt,
},
body: [
`# ${title}`,
"",
"## Bridge Source",
`- Workspace: \`${params.artifact.workspaceDir}\``,
`- Relative path: \`${params.artifact.relativePath}\``,
`- Kind: \`${params.artifact.artifactType}\``,
`- Agents: ${params.agentIds.length > 0 ? params.agentIds.join(", ") : "unknown"}`,
`- Updated: ${updatedAt}`,
"",
"## Content",
renderMarkdownFence(raw, contentLanguage),
"",
"## Notes",
"<!-- openclaw:human:start -->",
"<!-- openclaw:human:end -->",
"",
].join("\n"),
});
});
if (shouldSkip) {
return { pagePath, changed: false, created };
}
const raw = await fs.readFile(params.artifact.absolutePath, "utf8");
const contentLanguage = params.artifact.artifactType === "memory-events" ? "json" : "markdown";
const rendered = renderWikiMarkdown({
frontmatter: {
pageType: "source",
id: pageId,
title,
sourceType:
params.artifact.artifactType === "memory-events" ? "memory-bridge-events" : "memory-bridge",
sourcePath: params.artifact.absolutePath,
bridgeRelativePath: params.artifact.relativePath,
bridgeWorkspaceDir: params.artifact.workspaceDir,
bridgeAgentIds: params.agentIds,
status: "active",
updatedAt: sourceUpdatedAt,
},
body: [
`# ${title}`,
"",
"## Bridge Source",
`- Workspace: \`${params.artifact.workspaceDir}\``,
`- Relative path: \`${params.artifact.relativePath}\``,
`- Kind: \`${params.artifact.artifactType}\``,
`- Agents: ${params.agentIds.length > 0 ? params.agentIds.join(", ") : "unknown"}`,
`- Updated: ${sourceUpdatedAt}`,
"",
"## Content",
renderMarkdownFence(raw, contentLanguage),
"",
"## Notes",
"<!-- openclaw:human:start -->",
"<!-- openclaw:human:end -->",
"",
].join("\n"),
});
const existing = await fs.readFile(pageAbsPath, "utf8").catch(() => "");
if (existing !== rendered) {
await fs.writeFile(pageAbsPath, rendered, "utf8");
}
setImportedSourceEntry({
syncKey: params.artifact.syncKey,
state: params.state,
entry: {
group: "bridge",
pagePath,
sourcePath: params.artifact.absolutePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint,
},
});
return { pagePath, changed: existing !== rendered, created };
}
export async function syncMemoryWikiBridgeSources(params: {

View File

@@ -185,29 +185,6 @@ function formatJsonOrText<T>(
return json ? JSON.stringify(result, null, 2) : render(result);
}
async function runWikiCommandWithSummary<T>(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
run: () => Promise<T>;
render: (result: T) => string;
}): Promise<T> {
const result = await params.run();
writeOutput(formatJsonOrText(result, params.json, params.render), params.stdout);
return result;
}
async function runSyncedWikiCommandWithSummary<T>(params: {
config: ResolvedMemoryWikiConfig;
appConfig?: OpenClawConfig;
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
run: () => Promise<T>;
render: (result: T) => string;
}): Promise<T> {
await syncMemoryWikiImportedSources({ config: params.config, appConfig: params.appConfig });
return runWikiCommandWithSummary(params);
}
function addWikiSearchConfigOptions<T extends Command>(command: T): T {
return command
.option(
@@ -271,13 +248,15 @@ export async function runWikiInit(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => initializeMemoryWikiVault(params.config),
render: (value) =>
const result = await initializeMemoryWikiVault(params.config);
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Initialized wiki vault at ${value.rootDir} (${value.createdDirectories.length} dirs, ${value.createdFiles.length} files).`,
});
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiCompile(params: {
@@ -286,15 +265,16 @@ export async function runWikiCompile(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runSyncedWikiCommandWithSummary({
config: params.config,
appConfig: params.appConfig,
json: params.json,
stdout: params.stdout,
run: () => compileMemoryWikiVault(params.config),
render: (value) =>
await syncMemoryWikiImportedSources({ config: params.config, appConfig: params.appConfig });
const result = await compileMemoryWikiVault(params.config);
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Compiled wiki vault at ${value.vaultRoot} (${value.pages.length} pages, ${value.updatedFiles.length} indexes updated).`,
});
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiLint(params: {
@@ -303,15 +283,16 @@ export async function runWikiLint(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runSyncedWikiCommandWithSummary({
config: params.config,
appConfig: params.appConfig,
json: params.json,
stdout: params.stdout,
run: () => lintMemoryWikiVault(params.config),
render: (value) =>
await syncMemoryWikiImportedSources({ config: params.config, appConfig: params.appConfig });
const result = await lintMemoryWikiVault(params.config);
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Linted wiki vault at ${value.vaultRoot} (${value.issueCount} issues, report: ${value.reportPath}).`,
});
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiIngest(params: {
@@ -321,18 +302,19 @@ export async function runWikiIngest(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () =>
ingestMemoryWikiSource({
config: params.config,
inputPath: params.inputPath,
title: params.title,
}),
render: (value) =>
`Ingested ${value.sourcePath} into ${value.pagePath}. Refreshed ${value.indexUpdatedFiles.length} index file${value.indexUpdatedFiles.length === 1 ? "" : "s"}.`,
const result = await ingestMemoryWikiSource({
config: params.config,
inputPath: params.inputPath,
title: params.title,
});
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Ingested ${value.sourcePath} into ${value.pagePath}. Refreshed ${value.indexUpdatedFiles.length} index file${value.indexUpdatedFiles.length === 1 ? "" : "s"}.`,
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiSearch(params: {
@@ -483,17 +465,18 @@ export async function runWikiBridgeImport(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () =>
syncMemoryWikiImportedSources({
config: params.config,
appConfig: params.appConfig,
}),
render: (value) =>
`Bridge import synced ${value.artifactCount} artifacts across ${value.workspaces} workspaces (${value.importedCount} new, ${value.updatedCount} updated, ${value.skippedCount} unchanged, ${value.removedCount} removed). Indexes ${value.indexesRefreshed ? `refreshed (${value.indexUpdatedFiles.length} files)` : `not refreshed (${value.indexRefreshReason})`}.`,
const result = await syncMemoryWikiImportedSources({
config: params.config,
appConfig: params.appConfig,
});
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Bridge import synced ${value.artifactCount} artifacts across ${value.workspaces} workspaces (${value.importedCount} new, ${value.updatedCount} updated, ${value.skippedCount} unchanged, ${value.removedCount} removed). Indexes ${value.indexesRefreshed ? `refreshed (${value.indexUpdatedFiles.length} files)` : `not refreshed (${value.indexRefreshReason})`}.`,
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiUnsafeLocalImport(params: {
@@ -502,17 +485,18 @@ export async function runWikiUnsafeLocalImport(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () =>
syncMemoryWikiImportedSources({
config: params.config,
appConfig: params.appConfig,
}),
render: (value) =>
`Unsafe-local import synced ${value.artifactCount} artifacts (${value.importedCount} new, ${value.updatedCount} updated, ${value.skippedCount} unchanged, ${value.removedCount} removed). Indexes ${value.indexesRefreshed ? `refreshed (${value.indexUpdatedFiles.length} files)` : `not refreshed (${value.indexRefreshReason})`}.`,
const result = await syncMemoryWikiImportedSources({
config: params.config,
appConfig: params.appConfig,
});
const summary = formatJsonOrText(
result,
params.json,
(value) =>
`Unsafe-local import synced ${value.artifactCount} artifacts (${value.importedCount} new, ${value.updatedCount} updated, ${value.skippedCount} unchanged, ${value.removedCount} removed). Indexes ${value.indexesRefreshed ? `refreshed (${value.indexUpdatedFiles.length} files)` : `not refreshed (${value.indexRefreshReason})`}.`,
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiObsidianStatus(params: {
@@ -520,15 +504,14 @@ export async function runWikiObsidianStatus(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => probeObsidianCli(),
render: (value) =>
value.available
? `Obsidian CLI available at ${value.command}`
: "Obsidian CLI is not available on PATH.",
});
const result = await probeObsidianCli();
const summary = formatJsonOrText(result, params.json, (value) =>
value.available
? `Obsidian CLI available at ${value.command}`
: "Obsidian CLI is not available on PATH.",
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiObsidianSearch(params: {
@@ -537,12 +520,10 @@ export async function runWikiObsidianSearch(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => runObsidianSearch({ config: params.config, query: params.query }),
render: (value) => value.stdout.trim(),
});
const result = await runObsidianSearch({ config: params.config, query: params.query });
const summary = formatJsonOrText(result, params.json, (value) => value.stdout.trim());
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiObsidianOpenCli(params: {
@@ -551,12 +532,14 @@ export async function runWikiObsidianOpenCli(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => runObsidianOpen({ config: params.config, vaultPath: params.vaultPath }),
render: (value) => value.stdout.trim() || "Opened in Obsidian.",
});
const result = await runObsidianOpen({ config: params.config, vaultPath: params.vaultPath });
const summary = formatJsonOrText(
result,
params.json,
(value) => value.stdout.trim() || "Opened in Obsidian.",
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiObsidianCommandCli(params: {
@@ -565,12 +548,14 @@ export async function runWikiObsidianCommandCli(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => runObsidianCommand({ config: params.config, id: params.id }),
render: (value) => value.stdout.trim() || "Command sent to Obsidian.",
});
const result = await runObsidianCommand({ config: params.config, id: params.id });
const summary = formatJsonOrText(
result,
params.json,
(value) => value.stdout.trim() || "Command sent to Obsidian.",
);
writeOutput(summary, params.stdout);
return result;
}
export async function runWikiObsidianDailyCli(params: {
@@ -578,12 +563,14 @@ export async function runWikiObsidianDailyCli(params: {
json?: boolean;
stdout?: Pick<NodeJS.WriteStream, "write">;
}) {
return runWikiCommandWithSummary({
json: params.json,
stdout: params.stdout,
run: () => runObsidianDaily({ config: params.config }),
render: (value) => value.stdout.trim() || "Opened today's daily note.",
});
const result = await runObsidianDaily({ config: params.config });
const summary = formatJsonOrText(
result,
params.json,
(value) => value.stdout.trim() || "Opened today's daily note.",
);
writeOutput(summary, params.stdout);
return result;
}
export function registerWikiCli(

View File

@@ -1,61 +0,0 @@
import fs from "node:fs/promises";
import path from "node:path";
import { pathExists } from "./source-path-shared.js";
import {
setImportedSourceEntry,
shouldSkipImportedSourceWrite,
type MemoryWikiImportedSourceGroup,
} from "./source-sync-state.js";
type ImportedSourceState = Parameters<typeof shouldSkipImportedSourceWrite>[0]["state"];
export async function writeImportedSourcePage(params: {
vaultRoot: string;
syncKey: string;
sourcePath: string;
sourceUpdatedAtMs: number;
sourceSize: number;
renderFingerprint: string;
pagePath: string;
group: MemoryWikiImportedSourceGroup;
state: ImportedSourceState;
buildRendered: (raw: string, updatedAt: string) => string;
}): Promise<{ pagePath: string; changed: boolean; created: boolean }> {
const pageAbsPath = path.join(params.vaultRoot, params.pagePath);
const created = !(await pathExists(pageAbsPath));
const updatedAt = new Date(params.sourceUpdatedAtMs).toISOString();
const shouldSkip = await shouldSkipImportedSourceWrite({
vaultRoot: params.vaultRoot,
syncKey: params.syncKey,
expectedPagePath: params.pagePath,
expectedSourcePath: params.sourcePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint: params.renderFingerprint,
state: params.state,
});
if (shouldSkip) {
return { pagePath: params.pagePath, changed: false, created };
}
const raw = await fs.readFile(params.sourcePath, "utf8");
const rendered = params.buildRendered(raw, updatedAt);
const existing = await fs.readFile(pageAbsPath, "utf8").catch(() => "");
if (existing !== rendered) {
await fs.writeFile(pageAbsPath, rendered, "utf8");
}
setImportedSourceEntry({
syncKey: params.syncKey,
state: params.state,
entry: {
group: params.group,
pagePath: params.pagePath,
sourcePath: params.sourcePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint: params.renderFingerprint,
},
});
return { pagePath: params.pagePath, changed: existing !== rendered, created };
}

View File

@@ -5,11 +5,12 @@ import type { BridgeMemoryWikiResult } from "./bridge.js";
import type { ResolvedMemoryWikiConfig } from "./config.js";
import { appendMemoryWikiLog } from "./log.js";
import { renderMarkdownFence, renderWikiMarkdown, slugifyWikiSegment } from "./markdown.js";
import { writeImportedSourcePage } from "./source-page-shared.js";
import { resolveArtifactKey } from "./source-path-shared.js";
import { pathExists, resolveArtifactKey } from "./source-path-shared.js";
import {
pruneImportedSourceEntries,
readMemoryWikiSourceSyncState,
setImportedSourceEntry,
shouldSkipImportedSourceWrite,
writeMemoryWikiSourceSyncState,
} from "./source-sync-state.js";
import { initializeMemoryWikiVault } from "./vault.js";
@@ -128,6 +129,9 @@ async function writeUnsafeLocalSourcePage(params: {
configuredPath: params.artifact.configuredPath,
absolutePath: params.artifact.absolutePath,
});
const pageAbsPath = path.join(params.config.vault.path, pagePath);
const created = !(await pathExists(pageAbsPath));
const updatedAt = new Date(params.sourceUpdatedAtMs).toISOString();
const title = resolveUnsafeLocalTitle(params.artifact);
const renderFingerprint = createHash("sha1")
.update(
@@ -137,48 +141,67 @@ async function writeUnsafeLocalSourcePage(params: {
}),
)
.digest("hex");
return writeImportedSourcePage({
const shouldSkip = await shouldSkipImportedSourceWrite({
vaultRoot: params.config.vault.path,
syncKey: params.artifact.syncKey,
sourcePath: params.artifact.absolutePath,
expectedPagePath: pagePath,
expectedSourcePath: params.artifact.absolutePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint,
pagePath,
group: "unsafe-local",
state: params.state,
buildRendered: (raw, updatedAt) =>
renderWikiMarkdown({
frontmatter: {
pageType: "source",
id: pageId,
title,
sourceType: "memory-unsafe-local",
provenanceMode: "unsafe-local",
sourcePath: params.artifact.absolutePath,
unsafeLocalConfiguredPath: params.artifact.configuredPath,
unsafeLocalRelativePath: params.artifact.relativePath,
status: "active",
updatedAt,
},
body: [
`# ${title}`,
"",
"## Unsafe Local Source",
`- Configured path: \`${params.artifact.configuredPath}\``,
`- Relative path: \`${params.artifact.relativePath}\``,
`- Updated: ${updatedAt}`,
"",
"## Content",
renderMarkdownFence(raw, detectFenceLanguage(params.artifact.absolutePath)),
"",
"## Notes",
"<!-- openclaw:human:start -->",
"<!-- openclaw:human:end -->",
"",
].join("\n"),
}),
});
if (shouldSkip) {
return { pagePath, changed: false, created };
}
const raw = await fs.readFile(params.artifact.absolutePath, "utf8");
const rendered = renderWikiMarkdown({
frontmatter: {
pageType: "source",
id: pageId,
title,
sourceType: "memory-unsafe-local",
provenanceMode: "unsafe-local",
sourcePath: params.artifact.absolutePath,
unsafeLocalConfiguredPath: params.artifact.configuredPath,
unsafeLocalRelativePath: params.artifact.relativePath,
status: "active",
updatedAt,
},
body: [
`# ${title}`,
"",
"## Unsafe Local Source",
`- Configured path: \`${params.artifact.configuredPath}\``,
`- Relative path: \`${params.artifact.relativePath}\``,
`- Updated: ${updatedAt}`,
"",
"## Content",
renderMarkdownFence(raw, detectFenceLanguage(params.artifact.absolutePath)),
"",
"## Notes",
"<!-- openclaw:human:start -->",
"<!-- openclaw:human:end -->",
"",
].join("\n"),
});
const existing = await fs.readFile(pageAbsPath, "utf8").catch(() => "");
if (existing !== rendered) {
await fs.writeFile(pageAbsPath, rendered, "utf8");
}
setImportedSourceEntry({
syncKey: params.artifact.syncKey,
state: params.state,
entry: {
group: "unsafe-local",
pagePath,
sourcePath: params.artifact.absolutePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
renderFingerprint,
},
});
return { pagePath, changed: existing !== rendered, created };
}
export async function syncMemoryWikiUnsafeLocalSources(

View File

@@ -1,6 +1,5 @@
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import { buildMicrosoftFoundryProvider } from "./provider.js";
import { buildMicrosoftFoundryRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
export default definePluginEntry({
id: "microsoft-foundry",
@@ -8,6 +7,5 @@ export default definePluginEntry({
description: "Microsoft Foundry provider with Entra ID and API key auth",
register(api) {
api.registerProvider(buildMicrosoftFoundryProvider());
api.registerRealtimeTranscriptionProvider(buildMicrosoftFoundryRealtimeTranscriptionProvider());
},
});

View File

@@ -1,58 +0,0 @@
import { describe, expect, it } from "vitest";
import { buildMicrosoftFoundryRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
describe("buildMicrosoftFoundryRealtimeTranscriptionProvider", () => {
it("normalizes foundry config from the voice provider block", () => {
const provider = buildMicrosoftFoundryRealtimeTranscriptionProvider();
const resolved = provider.resolveConfig?.({
cfg: {} as never,
rawConfig: {
providers: {
"microsoft-foundry": {
apiKey: "azure-test-key",
baseUrl: "https://example.services.ai.azure.com/openai/v1",
deployment: "gpt-realtime",
apiVersion: "2025-04-01-preview",
},
},
},
});
expect(resolved).toEqual({
apiKey: "azure-test-key",
baseUrl: "https://example.services.ai.azure.com/openai/v1",
deployment: "gpt-realtime",
apiVersion: "2025-04-01-preview",
});
});
it("accepts model-provider style config with api-key headers", () => {
const provider = buildMicrosoftFoundryRealtimeTranscriptionProvider();
const resolved = provider.resolveConfig?.({
cfg: {} as never,
rawConfig: {
providers: {
"microsoft-foundry": {
baseUrl: "https://example.services.ai.azure.com/openai/v1",
headers: {
"api-key": "azure-test-key",
},
model: "gpt-realtime",
},
},
},
});
expect(resolved).toEqual({
apiKey: "azure-test-key",
baseUrl: "https://example.services.ai.azure.com/openai/v1",
deployment: "gpt-realtime",
model: "gpt-realtime",
});
});
it("registers foundry aliases for voice provider selection", () => {
const provider = buildMicrosoftFoundryRealtimeTranscriptionProvider();
expect(provider.aliases).toContain("azure-foundry");
});
});

View File

@@ -1,313 +0,0 @@
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import WebSocket from "ws";
import { normalizeFoundryEndpoint, PROVIDER_ID } from "./shared.js";
type FoundryRealtimeTranscriptionProviderConfig = {
apiKey?: string;
baseUrl?: string;
endpoint?: string;
deployment?: string;
model?: string;
apiVersion?: string;
silenceDurationMs?: number;
vadThreshold?: number;
};
type FoundryRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
apiKey: string;
baseUrl: string;
deployment: string;
apiVersion: string;
silenceDurationMs: number;
vadThreshold: number;
};
type RealtimeEvent = {
type: string;
delta?: string;
transcript?: string;
error?: unknown;
item?: { transcript?: string } | null;
};
function trimToUndefined(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function asNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function asObject(value: unknown): Record<string, unknown> | undefined {
return typeof value === "object" && value !== null && !Array.isArray(value)
? (value as Record<string, unknown>)
: undefined;
}
function extractFoundryProviderConfig(
rawConfig: RealtimeTranscriptionProviderConfig,
): FoundryRealtimeTranscriptionProviderConfig {
const providers = asObject(rawConfig.providers);
const raw =
asObject(providers?.[PROVIDER_ID]) ??
asObject(rawConfig[PROVIDER_ID]) ??
asObject(rawConfig.microsoftFoundry) ??
asObject(rawConfig);
const providerBaseUrl = trimToUndefined(raw?.baseUrl);
const endpoint = trimToUndefined(raw?.endpoint);
return {
apiKey:
trimToUndefined(raw?.apiKey) ??
trimToUndefined(asObject(raw?.headers)?.["api-key"]) ??
trimToUndefined(asObject(raw?.headers)?.Authorization)?.replace(/^Bearer\s+/i, ""),
baseUrl: providerBaseUrl,
endpoint,
deployment:
trimToUndefined(raw?.deployment) ??
trimToUndefined(raw?.model) ??
trimToUndefined(raw?.deploymentName),
model: trimToUndefined(raw?.transcriptionModel) ?? trimToUndefined(raw?.model),
apiVersion: trimToUndefined(raw?.apiVersion),
silenceDurationMs: asNumber(raw?.silenceDurationMs),
vadThreshold: asNumber(raw?.vadThreshold),
};
}
function resolveFoundryRealtimeBaseUrl(
config: FoundryRealtimeTranscriptionProviderConfig,
): string | undefined {
if (config.endpoint) {
return normalizeFoundryEndpoint(config.endpoint);
}
if (!config.baseUrl) {
return undefined;
}
return normalizeFoundryEndpoint(config.baseUrl);
}
class FoundryRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly RECONNECT_DELAY_MS = 1000;
private static readonly CONNECT_TIMEOUT_MS = 10_000;
private ws: WebSocket | null = null;
private connected = false;
private closed = false;
private reconnectAttempts = 0;
private pendingTranscript = "";
constructor(private readonly config: FoundryRealtimeTranscriptionSessionConfig) {}
async connect(): Promise<void> {
this.closed = false;
this.reconnectAttempts = 0;
await this.doConnect();
}
sendAudio(audio: Buffer): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
return;
}
this.sendEvent({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
}
close(): void {
this.closed = true;
this.connected = false;
if (this.ws) {
this.ws.close(1000, "Transcription session closed");
this.ws = null;
}
}
isConnected(): boolean {
return this.connected;
}
private async doConnect(): Promise<void> {
await new Promise<void>((resolve, reject) => {
const wsUrl = this.buildWebSocketUrl();
this.ws = new WebSocket(wsUrl, {
headers: {
"api-key": this.config.apiKey,
},
});
const connectTimeout = setTimeout(() => {
reject(new Error("Microsoft Foundry realtime transcription connection timeout"));
}, FoundryRealtimeTranscriptionSession.CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
clearTimeout(connectTimeout);
this.connected = true;
this.reconnectAttempts = 0;
this.sendEvent({
type: "session.update",
session: {
input_audio_format: "pcm16",
input_audio_transcription: {
model: this.config.deployment,
},
turn_detection: {
type: "server_vad",
threshold: this.config.vadThreshold,
prefix_padding_ms: 300,
silence_duration_ms: this.config.silenceDurationMs,
},
},
});
resolve();
});
this.ws.on("message", (data: Buffer) => {
try {
this.handleEvent(JSON.parse(data.toString()) as RealtimeEvent);
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
}
});
this.ws.on("error", (error) => {
if (!this.connected) {
clearTimeout(connectTimeout);
reject(error);
return;
}
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
});
this.ws.on("close", () => {
this.connected = false;
if (this.closed) {
return;
}
void this.attemptReconnect();
});
});
}
private buildWebSocketUrl(): string {
const httpBaseUrl = this.config.baseUrl.replace(/\/+$/, "");
const wsBaseUrl = httpBaseUrl.replace(/^http:/i, "ws:").replace(/^https:/i, "wss:");
const url = new URL(`${wsBaseUrl}/openai/realtime`);
url.searchParams.set("api-version", this.config.apiVersion);
url.searchParams.set("deployment", this.config.deployment);
return url.toString();
}
private async attemptReconnect(): Promise<void> {
if (this.closed) {
return;
}
if (this.reconnectAttempts >= FoundryRealtimeTranscriptionSession.MAX_RECONNECT_ATTEMPTS) {
this.config.onError?.(
new Error("Microsoft Foundry realtime transcription reconnect limit reached"),
);
return;
}
this.reconnectAttempts += 1;
const delay =
FoundryRealtimeTranscriptionSession.RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
await new Promise((resolve) => setTimeout(resolve, delay));
if (this.closed) {
return;
}
try {
await this.doConnect();
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
await this.attemptReconnect();
}
}
private handleEvent(event: RealtimeEvent): void {
switch (event.type) {
case "conversation.item.input_audio_transcription.delta":
case "conversation.item.audio_transcription.delta":
if (event.delta) {
this.pendingTranscript += event.delta;
this.config.onPartial?.(this.pendingTranscript);
}
return;
case "conversation.item.input_audio_transcription.completed":
case "conversation.item.audio_transcription.completed": {
const transcript = event.transcript ?? event.item?.transcript;
if (transcript) {
this.config.onTranscript?.(transcript);
}
this.pendingTranscript = "";
return;
}
case "input_audio_buffer.speech_started":
this.pendingTranscript = "";
this.config.onSpeechStart?.();
return;
case "error": {
const detail =
event.error && typeof event.error === "object" && "message" in event.error
? String((event.error as { message?: unknown }).message ?? "Unknown error")
: event.error
? String(event.error)
: "Unknown error";
this.config.onError?.(new Error(detail));
return;
}
default:
return;
}
}
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(event));
}
}
}
export function buildMicrosoftFoundryRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
return {
id: PROVIDER_ID,
label: "Microsoft Foundry Realtime Transcription",
aliases: ["azure-foundry", "azure-openai-foundry"],
autoSelectOrder: 20,
resolveConfig: ({ rawConfig }) => extractFoundryProviderConfig(rawConfig),
isConfigured: ({ providerConfig }) => {
const config = extractFoundryProviderConfig(providerConfig);
return Boolean(config.apiKey && resolveFoundryRealtimeBaseUrl(config) && config.deployment);
},
createSession: (req) => {
const config = extractFoundryProviderConfig(req.providerConfig);
const baseUrl = resolveFoundryRealtimeBaseUrl(config);
if (!config.apiKey) {
throw new Error("Microsoft Foundry realtime transcription API key missing");
}
if (!baseUrl) {
throw new Error("Microsoft Foundry realtime transcription endpoint missing");
}
if (!config.deployment) {
throw new Error("Microsoft Foundry realtime transcription deployment missing");
}
return new FoundryRealtimeTranscriptionSession({
...req,
apiKey: config.apiKey,
baseUrl,
deployment: config.deployment,
apiVersion: config.apiVersion ?? "2025-04-01-preview",
silenceDurationMs: config.silenceDurationMs ?? 800,
vadThreshold: config.vadThreshold ?? 0.5,
});
},
};
}

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -18,7 +18,6 @@ type OpenAIRealtimeTranscriptionProviderConfig = {
model?: string;
silenceDurationMs?: number;
vadThreshold?: number;
inputAudioFormat?: string;
};
type OpenAIRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
@@ -26,7 +25,6 @@ type OpenAIRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCrea
model: string;
silenceDurationMs: number;
vadThreshold: number;
inputAudioFormat: string;
};
type RealtimeEvent = {
@@ -53,7 +51,6 @@ function normalizeProviderConfig(
model: trimToUndefined(raw?.model) ?? trimToUndefined(raw?.sttModel),
silenceDurationMs: asFiniteNumber(raw?.silenceDurationMs),
vadThreshold: asFiniteNumber(raw?.vadThreshold),
inputAudioFormat: trimToUndefined(raw?.inputAudioFormat),
};
}
@@ -119,7 +116,7 @@ class OpenAIRealtimeTranscriptionSession implements RealtimeTranscriptionSession
this.sendEvent({
type: "transcription_session.update",
session: {
input_audio_format: this.config.inputAudioFormat,
input_audio_format: "g711_ulaw",
input_audio_transcription: {
model: this.config.model,
},
@@ -244,7 +241,6 @@ export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptio
model: config.model ?? "gpt-4o-transcribe",
silenceDurationMs: config.silenceDurationMs ?? 800,
vadThreshold: config.vadThreshold ?? 0.5,
inputAudioFormat: config.inputAudioFormat ?? "g711_ulaw",
});
},
};

View File

@@ -5,31 +5,6 @@ import { startQaLabServer } from "./lab-server.js";
import { startQaMockOpenAiServer } from "./mock-openai-server.js";
import { runQaSuite } from "./suite.js";
type InterruptibleServer = {
baseUrl: string;
stop(): Promise<void>;
};
async function runInterruptibleServer(label: string, server: InterruptibleServer) {
process.stdout.write(`${label}: ${server.baseUrl}\n`);
process.stdout.write("Press Ctrl+C to stop.\n");
const shutdown = async () => {
process.off("SIGINT", onSignal);
process.off("SIGTERM", onSignal);
await server.stop();
process.exit(0);
};
const onSignal = () => {
void shutdown();
};
process.on("SIGINT", onSignal);
process.on("SIGTERM", onSignal);
await new Promise(() => undefined);
}
export async function runQaLabSelfCheckCommand(opts: { output?: string }) {
const server = await startQaLabServer({
outputPath: opts.output,
@@ -85,7 +60,23 @@ export async function runQaLabUiCommand(opts: {
embeddedGateway: opts.embeddedGateway,
sendKickoffOnStart: opts.sendKickoffOnStart,
});
await runInterruptibleServer("QA Lab UI", server);
process.stdout.write(`QA Lab UI: ${server.baseUrl}\n`);
process.stdout.write("Press Ctrl+C to stop.\n");
const shutdown = async () => {
process.off("SIGINT", onSignal);
process.off("SIGTERM", onSignal);
await server.stop();
process.exit(0);
};
const onSignal = () => {
void shutdown();
};
process.on("SIGINT", onSignal);
process.on("SIGTERM", onSignal);
await new Promise(() => undefined);
}
export async function runQaDockerScaffoldCommand(opts: {
@@ -147,5 +138,21 @@ export async function runQaMockOpenAiCommand(opts: { host?: string; port?: numbe
host: opts.host,
port: Number.isFinite(opts.port) ? opts.port : undefined,
});
await runInterruptibleServer("QA mock OpenAI", server);
process.stdout.write(`QA mock OpenAI: ${server.baseUrl}\n`);
process.stdout.write("Press Ctrl+C to stop.\n");
const shutdown = async () => {
process.off("SIGINT", onSignal);
process.off("SIGTERM", onSignal);
await server.stop();
process.exit(0);
};
const onSignal = () => {
void shutdown();
};
process.on("SIGINT", onSignal);
process.on("SIGTERM", onSignal);
await new Promise(() => undefined);
}

View File

@@ -1,4 +1,4 @@
import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdtemp, readFile, rm } from "node:fs/promises";
import { createServer } from "node:http";
import os from "node:os";
import path from "node:path";
@@ -182,20 +182,9 @@ describe("qa-lab server", () => {
});
it("serves the built QA UI bundle when available", async () => {
const uiDistDir = await mkdtemp(path.join(os.tmpdir(), "qa-lab-ui-dist-"));
cleanups.push(async () => {
await rm(uiDistDir, { recursive: true, force: true });
});
await writeFile(
path.join(uiDistDir, "index.html"),
"<!doctype html><html><head><title>QA Lab</title></head><body><div id='app'></div></body></html>",
"utf8",
);
const lab = await startQaLabServer({
host: "127.0.0.1",
port: 0,
uiDistDir,
});
cleanups.push(async () => {
await lab.stop();

View File

@@ -159,24 +159,13 @@ function missingUiHtml() {
</html>`;
}
function resolveUiDistDir(overrideDir?: string | null) {
if (overrideDir?.trim()) {
return overrideDir;
}
function resolveUiDistDir() {
const candidates = [
fileURLToPath(new URL("../web/dist", import.meta.url)),
path.resolve(process.cwd(), "extensions/qa-lab/web/dist"),
path.resolve(process.cwd(), "dist/extensions/qa-lab/web/dist"),
];
return (
candidates.find((candidate) => {
if (!fs.existsSync(candidate)) {
return false;
}
const indexPath = path.join(candidate, "index.html");
return fs.existsSync(indexPath) && fs.statSync(indexPath).isFile();
}) ?? candidates[0]
);
return candidates.find((candidate) => fs.existsSync(candidate)) ?? candidates[0];
}
function resolveAdvertisedBaseUrl(params: {
@@ -346,8 +335,8 @@ function proxyUpgradeRequest(params: {
params.socket.on("close", closeBoth);
}
function tryResolveUiAsset(pathname: string, overrideDir?: string | null): string | null {
const distDir = resolveUiDistDir(overrideDir);
function tryResolveUiAsset(pathname: string): string | null {
const distDir = resolveUiDistDir();
if (!fs.existsSync(distDir)) {
return null;
}
@@ -426,7 +415,6 @@ export async function startQaLabServer(params?: {
controlUiUrl?: string;
controlUiToken?: string;
controlUiProxyTarget?: string;
uiDistDir?: string;
autoKickoffTarget?: string;
embeddedGateway?: string;
sendKickoffOnStart?: boolean;
@@ -688,7 +676,7 @@ export async function startQaLabServer(params?: {
return;
}
const asset = tryResolveUiAsset(url.pathname, params?.uiDistDir);
const asset = tryResolveUiAsset(url.pathname);
if (!asset) {
const html = missingUiHtml();
res.writeHead(200, {

View File

@@ -70,7 +70,17 @@ function extractLastUserText(input: ResponsesInputItem[]) {
if (item.role !== "user" || !Array.isArray(item.content)) {
continue;
}
const text = extractInputText(item.content);
const text = item.content
.filter(
(entry): entry is { type: "input_text"; text: string } =>
!!entry &&
typeof entry === "object" &&
(entry as { type?: unknown }).type === "input_text" &&
typeof (entry as { text?: unknown }).text === "string",
)
.map((entry) => entry.text)
.join("\n")
.trim();
if (text) {
return text;
}
@@ -99,27 +109,23 @@ function extractToolOutput(input: ResponsesInputItem[]) {
return "";
}
function extractInputText(content: unknown[]): string {
return content
.filter(
(entry): entry is { type: "input_text"; text: string } =>
!!entry &&
typeof entry === "object" &&
(entry as { type?: unknown }).type === "input_text" &&
typeof (entry as { text?: unknown }).text === "string",
)
.map((entry) => entry.text)
.join("\n")
.trim();
}
function extractAllUserTexts(input: ResponsesInputItem[]) {
const texts: string[] = [];
for (const item of input) {
if (item.role !== "user" || !Array.isArray(item.content)) {
continue;
}
const text = extractInputText(item.content);
const text = item.content
.filter(
(entry): entry is { type: "input_text"; text: string } =>
!!entry &&
typeof entry === "object" &&
(entry as { type?: unknown }).type === "input_text" &&
typeof (entry as { text?: unknown }).text === "string",
)
.map((entry) => entry.text)
.join("\n")
.trim();
if (text) {
texts.push(text);
}
@@ -136,7 +142,17 @@ function extractAllInputTexts(input: ResponsesInputItem[]) {
if (!Array.isArray(item.content)) {
continue;
}
const text = extractInputText(item.content);
const text = item.content
.filter(
(entry): entry is { type: "input_text"; text: string } =>
!!entry &&
typeof entry === "object" &&
(entry as { type?: unknown }).type === "input_text" &&
typeof (entry as { text?: unknown }).text === "string",
)
.map((entry) => entry.text)
.join("\n")
.trim();
if (text) {
texts.push(text);
}

View File

@@ -124,10 +124,6 @@ export function buildQaGatewayConfig(params: {
providerMode === "live-openai"
? Object.fromEntries(selectedProviderIds.map((providerId) => [providerId, { enabled: true }]))
: {};
const allowedPlugins =
providerMode === "live-openai"
? ["memory-core", ...selectedProviderIds, "qa-channel"]
: ["memory-core", "qa-channel"];
const liveModelParams =
providerMode === "live-openai"
? {
@@ -151,7 +147,7 @@ export function buildQaGatewayConfig(params: {
return {
plugins: {
allow: allowedPlugins,
...(providerMode === "mock-openai" ? { allow: ["memory-core", "qa-channel"] } : {}),
entries: {
acpx: {
enabled: false,

View File

@@ -71,10 +71,7 @@ export function createSignalToolResultConfig(
};
}
export async function flush() {
await Promise.resolve();
await Promise.resolve();
}
export const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
export function createMockSignalDaemonHandle(
overrides: {

View File

@@ -51,10 +51,7 @@ import {
import { resolveSlackChannelType } from "./channel-type.js";
import { shouldSuppressLocalSlackExecApprovalPrompt } from "./exec-approvals.js";
import { resolveSlackGroupRequireMention, resolveSlackGroupToolPolicy } from "./group-policy.js";
import {
compileSlackInteractiveReplies,
isSlackInteractiveRepliesEnabled,
} from "./interactive-replies.js";
import { isSlackInteractiveRepliesEnabled } from "./interactive-replies.js";
import { SLACK_TEXT_LIMIT } from "./limits.js";
import { slackOutbound } from "./outbound-adapter.js";
import type { SlackProbe } from "./probe.js";
@@ -327,10 +324,6 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount, SlackProbe> = crea
parseExplicitTarget: ({ raw }) => parseSlackExplicitTarget(raw),
inferTargetChatType: ({ to }) => parseSlackExplicitTarget(to)?.chatType,
resolveOutboundSessionRoute: async (params) => await resolveSlackOutboundSessionRoute(params),
transformReplyPayload: ({ payload, cfg, accountId }) =>
isSlackInteractiveRepliesEnabled({ cfg, accountId })
? compileSlackInteractiveReplies(payload)
: payload,
enableInteractiveReplies: ({ cfg, accountId }) =>
isSlackInteractiveRepliesEnabled({ cfg, accountId }),
hasStructuredReplyPayload: ({ payload }) => {

View File

@@ -4,6 +4,7 @@ import type {
} from "openclaw/plugin-sdk/channel-contract";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
asObjectRecord,
hasLegacyAccountStreamingAliases,
hasLegacyStreamingAliases,
normalizeLegacyDmAliases,
@@ -11,12 +12,6 @@ import {
} from "openclaw/plugin-sdk/runtime-doctor";
import { resolveSlackNativeStreaming, resolveSlackStreamingMode } from "./streaming-compat.js";
function asObjectRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: null;
}
function hasLegacySlackStreamingAliases(value: unknown): boolean {
return hasLegacyStreamingAliases(value, { includeNativeTransport: true });
}

View File

@@ -9,7 +9,6 @@ export {
isTtsProviderConfigured,
listSpeechVoices,
maybeApplyTtsToPayload,
resolveExplicitTtsOverrides,
resolveTtsAutoMode,
resolveTtsConfig,
resolveTtsPrefsPath,

View File

@@ -23,7 +23,7 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-pay
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { isVerbose, logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/sandbox";
import { resolveConfigDir, resolveUserPath, stripMarkdown } from "openclaw/plugin-sdk/text-runtime";
import { CONFIG_DIR, resolveUserPath, stripMarkdown } from "openclaw/plugin-sdk/text-runtime";
import {
canonicalizeSpeechProviderId,
getSpeechProvider,
@@ -35,7 +35,6 @@ import {
summarizeText,
type SpeechModelOverridePolicy,
type SpeechProviderConfig,
type SpeechProviderOverrides,
type SpeechVoiceOption,
type TtsDirectiveOverrides,
type TtsDirectiveParseResult,
@@ -168,7 +167,7 @@ function resolveTtsPrefsPathValue(prefsPath: string | undefined): string {
if (envPath) {
return resolveUserPath(envPath);
}
return path.join(resolveConfigDir(process.env), "settings", "tts.json");
return path.join(CONFIG_DIR, "settings", "tts.json");
}
function resolveModelOverridePolicy(
@@ -495,66 +494,6 @@ export function setTtsProvider(prefsPath: string, provider: TtsProvider): void {
});
}
export function resolveExplicitTtsOverrides(params: {
cfg: OpenClawConfig;
prefsPath?: string;
provider?: string;
modelId?: string;
voiceId?: string;
}): TtsDirectiveOverrides {
const providerInput = params.provider?.trim();
const modelId = params.modelId?.trim();
const voiceId = params.voiceId?.trim();
const config = resolveTtsConfig(params.cfg);
const prefsPath = params.prefsPath ?? resolveTtsPrefsPath(config);
const selectedProvider =
canonicalizeSpeechProviderId(providerInput, params.cfg) ??
(modelId || voiceId ? getTtsProvider(config, prefsPath) : undefined);
if (providerInput && !selectedProvider) {
throw new Error(`Unknown TTS provider "${providerInput}".`);
}
if (!modelId && !voiceId) {
return selectedProvider ? { provider: selectedProvider } : {};
}
if (!selectedProvider) {
throw new Error("TTS model or voice overrides require a resolved provider.");
}
const provider = getSpeechProvider(selectedProvider, params.cfg);
if (!provider) {
throw new Error(`speech provider ${selectedProvider} is not registered`);
}
if (!provider.resolveTalkOverrides) {
throw new Error(
`TTS provider "${selectedProvider}" does not support model or voice overrides.`,
);
}
const providerOverrides = provider.resolveTalkOverrides({
talkProviderConfig: {},
params: {
...(voiceId ? { voiceId } : {}),
...(modelId ? { modelId } : {}),
},
});
if ((voiceId || modelId) && (!providerOverrides || Object.keys(providerOverrides).length === 0)) {
throw new Error(
`TTS provider "${selectedProvider}" ignored the requested model or voice overrides.`,
);
}
const overridesRecord = providerOverrides as SpeechProviderOverrides;
return {
provider: selectedProvider,
providerOverrides: {
[provider.id]: overridesRecord,
},
};
}
export function getTtsMaxLength(prefsPath: string): number {
const prefs = readPrefs(prefsPath);
return prefs.tts?.maxLength ?? DEFAULT_TTS_MAX_LENGTH;

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -4,18 +4,13 @@ import type {
} from "openclaw/plugin-sdk/channel-contract";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
asObjectRecord,
hasLegacyAccountStreamingAliases,
hasLegacyStreamingAliases,
normalizeLegacyStreamingAliases,
} from "openclaw/plugin-sdk/runtime-doctor";
import { resolveTelegramPreviewStreamMode } from "./preview-streaming.js";
function asObjectRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: null;
}
function hasLegacyTelegramStreamingAliases(value: unknown): boolean {
return hasLegacyStreamingAliases(value, { includePreviewChunk: true });
}

View File

@@ -3,8 +3,8 @@ import os from "node:os";
import path from "node:path";
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { loadBundledPluginTestApiSync } from "openclaw/plugin-sdk/testing";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { loadBundledPluginTestApiSync } from "../../../src/test-utils/bundled-plugin-public-surface.js";
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
import {
__testing,

View File

@@ -856,6 +856,7 @@ export async function resetTelegramThreadBindingsForTests() {
for (const manager of getThreadBindingsState().managersByAccountId.values()) {
manager.stop();
}
await Promise.allSettled(getThreadBindingsState().persistQueueByAccountId.values());
getThreadBindingsState().persistQueueByAccountId.clear();
getThreadBindingsState().managersByAccountId.clear();
getThreadBindingsState().bindingsByAccountConversation.clear();

View File

@@ -1,10 +1,14 @@
import type { RuntimeEnv } from "../../api.js";
import { asRecord, extractCites, extractMessageText, type ParsedCite } from "./utils.js";
import { extractCites, extractMessageText, type ParsedCite } from "./utils.js";
type TlonScryApi = {
scry: (path: string) => Promise<unknown>;
};
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" ? (value as Record<string, unknown>) : null;
}
export function createTlonCitationResolver(params: { api: TlonScryApi; runtime: RuntimeEnv }) {
const { api, runtime } = params;

View File

@@ -1,6 +1,14 @@
import type { RuntimeEnv } from "../../api.js";
import type { Foreigns } from "../urbit/foreigns.js";
import { asRecord, formatChangesDate, formatErrorMessage } from "./utils.js";
import { formatChangesDate } from "./utils.js";
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" ? (value as Record<string, unknown>) : null;
}
function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
export async function fetchGroupChanges(
api: { scry: (path: string) => Promise<unknown> },

View File

@@ -1,5 +1,13 @@
import type { RuntimeEnv } from "../../api.js";
import { asRecord, extractMessageText, formatErrorMessage } from "./utils.js";
import { extractMessageText } from "./utils.js";
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" ? (value as Record<string, unknown>) : null;
}
function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
/**
* Format a number as @ud (with dots every 3 digits from the right)

View File

@@ -28,7 +28,6 @@ import {
mergeUniqueStrings,
shouldMigrateTlonSetting,
} from "./settings-helpers.js";
import { asRecord, formatErrorMessage, readString } from "./utils.js";
import {
extractMessageText,
formatModelName,
@@ -46,11 +45,24 @@ export type MonitorTlonOpts = {
accountId?: string | null;
};
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" ? (value as Record<string, unknown>) : null;
}
function readString(record: Record<string, unknown> | null, key: string): string | undefined {
const value = record?.[key];
return typeof value === "string" ? value : undefined;
}
function readNumber(record: Record<string, unknown> | null, key: string): number | undefined {
const value = record?.[key];
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<void> {
const core = getTlonRuntime();
const cfg = core.config.loadConfig();

View File

@@ -181,19 +181,12 @@ export async function resolveAuthorizedMessageText(params: {
return citedContent + rawText;
}
export function asRecord(value: unknown): Record<string, unknown> | null {
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" ? (value as Record<string, unknown>) : null;
}
export function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
export function readString(
record: Record<string, unknown> | null,
key: string,
): string | undefined {
const value = record?.[key];
function readString(record: Record<string, unknown>, key: string): string | undefined {
const value = record[key];
return typeof value === "string" ? value : undefined;
}

View File

@@ -4,10 +4,15 @@
"paths": {
"openclaw/extension-api": ["../src/extensionAPI.ts"],
"openclaw/plugin-sdk": ["../packages/plugin-sdk/dist/src/plugin-sdk/index.d.ts"],
"openclaw/plugin-sdk/*": ["../packages/plugin-sdk/dist/src/plugin-sdk/*.d.ts"],
"openclaw/plugin-sdk/*": [
"../packages/plugin-sdk/dist/src/plugin-sdk/*.d.ts",
"../packages/plugin-sdk/dist/packages/plugin-sdk/src/*.d.ts"
],
"openclaw/plugin-sdk/account-id": ["../src/plugin-sdk/account-id.ts"],
"@openclaw/*": ["../packages/plugin-sdk/dist/extensions/*", "../extensions/*"],
"@openclaw/plugin-sdk/*": ["../packages/plugin-sdk/dist/src/plugin-sdk/*.d.ts"]
"@openclaw/*": ["../packages/plugin-sdk/dist/packages/plugin-sdk/src/extensions/*"],
"@openclaw/plugin-sdk/*": [
"../packages/plugin-sdk/dist/packages/plugin-sdk/src/src/plugin-sdk/*"
]
}
}
}

View File

@@ -223,52 +223,38 @@ type TaskView = {
terminalOutcome?: string;
};
function pickOptionalFields<T extends object, TKey extends keyof T & string>(
source: T,
keys: readonly TKey[],
): Partial<Pick<T, TKey>> {
const result: Partial<Pick<T, TKey>> = {};
for (const key of keys) {
const value = source[key];
if (value !== undefined) {
result[key] = value;
}
}
return result;
function optionalField<TKey extends string, TValue>(
key: TKey,
value: TValue | undefined,
): Partial<Record<TKey, TValue>> {
return value !== undefined ? ({ [key]: value } as Record<TKey, TValue>) : {};
}
function pickOptionalTruthyStringFields<T extends object, TKey extends keyof T & string>(
source: T,
keys: readonly TKey[],
): Partial<Pick<T, TKey>> {
const result: Partial<Pick<T, TKey>> = {};
for (const key of keys) {
const value = source[key];
if (typeof value === "string" && value) {
result[key] = value as T[TKey];
}
}
return result;
function optionalTruthyStringField<TKey extends string>(
key: TKey,
value: string | undefined,
): Partial<Record<TKey, string>> {
return value ? ({ [key]: value } as Record<TKey, string>) : {};
}
function toFlowView(flow: FlowView): FlowView {
return {
flowId: flow.flowId,
syncMode: flow.syncMode,
...pickOptionalTruthyStringFields(flow, [
"controllerId",
"currentStep",
"blockedTaskId",
"blockedSummary",
]),
...optionalTruthyStringField("controllerId", flow.controllerId),
revision: flow.revision,
status: flow.status,
notifyPolicy: flow.notifyPolicy,
goal: flow.goal,
...pickOptionalFields(flow, ["stateJson", "waitJson", "cancelRequestedAt"]),
...optionalTruthyStringField("currentStep", flow.currentStep),
...optionalTruthyStringField("blockedTaskId", flow.blockedTaskId),
...optionalTruthyStringField("blockedSummary", flow.blockedSummary),
...optionalField("stateJson", flow.stateJson),
...optionalField("waitJson", flow.waitJson),
...optionalField("cancelRequestedAt", flow.cancelRequestedAt),
createdAt: flow.createdAt,
updatedAt: flow.updatedAt,
...pickOptionalFields(flow, ["endedAt"]),
...optionalField("endedAt", flow.endedAt),
};
}
@@ -276,26 +262,27 @@ function toTaskView(task: TaskView): TaskView {
return {
taskId: task.taskId,
runtime: task.runtime,
...pickOptionalTruthyStringFields(task, [
"sourceId",
"childSessionKey",
"parentFlowId",
"parentTaskId",
"agentId",
"runId",
"label",
"error",
"progressSummary",
"terminalSummary",
"terminalOutcome",
]),
...optionalTruthyStringField("sourceId", task.sourceId),
scopeKind: task.scopeKind,
...optionalTruthyStringField("childSessionKey", task.childSessionKey),
...optionalTruthyStringField("parentFlowId", task.parentFlowId),
...optionalTruthyStringField("parentTaskId", task.parentTaskId),
...optionalTruthyStringField("agentId", task.agentId),
...optionalTruthyStringField("runId", task.runId),
...optionalTruthyStringField("label", task.label),
task: task.task,
status: task.status,
deliveryStatus: task.deliveryStatus,
notifyPolicy: task.notifyPolicy,
createdAt: task.createdAt,
...pickOptionalFields(task, ["startedAt", "endedAt", "lastEventAt", "cleanupAfter"]),
...optionalField("startedAt", task.startedAt),
...optionalField("endedAt", task.endedAt),
...optionalField("lastEventAt", task.lastEventAt),
...optionalField("cleanupAfter", task.cleanupAfter),
...optionalTruthyStringField("error", task.error),
...optionalTruthyStringField("progressSummary", task.progressSummary),
...optionalTruthyStringField("terminalSummary", task.terminalSummary),
...optionalTruthyStringField("terminalOutcome", task.terminalOutcome),
};
}

View File

@@ -1,5 +1,8 @@
import { definePluginEntry } from "@openclaw/plugin-sdk/plugin-entry";
import { isRecord } from "./src/tool-config-shared.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
export default definePluginEntry({
id: "xai",

View File

@@ -1,6 +1,6 @@
import { normalizeXaiModelId } from "../model-id.js";
export function isRecord(value: unknown): value is Record<string, unknown> {
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}

View File

@@ -6,7 +6,6 @@ import {
resolveXaiResponseTextCitationsAndInline,
XAI_RESPONSES_ENDPOINT,
} from "./responses-tool-shared.js";
import { isRecord } from "./tool-config-shared.js";
export { extractXaiWebSearchContent } from "./responses-tool-shared.js";
export const XAI_WEB_SEARCH_ENDPOINT = XAI_RESPONSES_ENDPOINT;
@@ -75,10 +74,14 @@ export function buildXaiWebSearchPayload(params: {
};
}
function asRecord(value: unknown): Record<string, unknown> | undefined {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: undefined;
}
export function resolveXaiSearchConfig(searchConfig?: Record<string, unknown>): XaiWebSearchConfig {
return (
(isRecord(searchConfig?.grok) ? (searchConfig.grok as XaiWebSearchConfig) : undefined) ?? {}
);
return (asRecord(searchConfig?.grok) as XaiWebSearchConfig | undefined) ?? {};
}
export function resolveXaiWebSearchModel(searchConfig?: Record<string, unknown>): string {

View File

@@ -1,8 +1,11 @@
import type { OpenClawConfig } from "@openclaw/plugin-sdk/config-runtime";
import { isRecord } from "./tool-config-shared.js";
type JsonRecord = Record<string, unknown>;
function isRecord(value: unknown): value is JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function cloneRecord<T extends JsonRecord | undefined>(value: T): T {
if (!value) {
return value;

View File

@@ -1,4 +0,0 @@
export {
collectRuntimeConfigAssignments,
secretTargetRegistryEntries,
} from "./src/secret-contract.js";

View File

@@ -1,2 +1 @@
export { collectZalouserSecurityAuditFindings } from "./src/security-audit.js";
export { legacyConfigRules, normalizeCompatibilityConfig } from "./src/doctor.js";

View File

@@ -152,14 +152,6 @@ const ZALOUSER_LEGACY_CONFIG_RULES: ChannelDoctorLegacyConfigRule[] = [
},
];
export const legacyConfigRules = ZALOUSER_LEGACY_CONFIG_RULES;
export function normalizeCompatibilityConfig(params: {
cfg: OpenClawConfig;
}): ChannelDoctorConfigMutation {
return normalizeZalouserCompatibilityConfig(params.cfg);
}
export const collectZalouserMutableAllowlistWarnings =
createDangerousNameMatchingMutableAllowlistWarningCollector({
channel: "zalouser",
@@ -182,7 +174,7 @@ export const zalouserDoctor: ChannelDoctorAdapter = {
groupModel: "hybrid",
groupAllowFromFallbackToAllowFrom: false,
warnOnEmptyGroupSenderAllowlist: false,
legacyConfigRules,
normalizeCompatibilityConfig,
legacyConfigRules: ZALOUSER_LEGACY_CONFIG_RULES,
normalizeCompatibilityConfig: ({ cfg }) => normalizeZalouserCompatibilityConfig(cfg),
collectMutableAllowlistWarnings: collectZalouserMutableAllowlistWarnings,
};

View File

@@ -580,10 +580,6 @@
"types": "./dist/plugin-sdk/image-generation.d.ts",
"default": "./dist/plugin-sdk/image-generation.js"
},
"./plugin-sdk/image-generation-runtime": {
"types": "./dist/plugin-sdk/image-generation-runtime.d.ts",
"default": "./dist/plugin-sdk/image-generation-runtime.js"
},
"./plugin-sdk/image-generation-core": {
"types": "./dist/plugin-sdk/image-generation-core.d.ts",
"default": "./dist/plugin-sdk/image-generation-core.js"

View File

@@ -5,63 +5,63 @@
"type": "module",
"exports": {
"./config-runtime": {
"types": "./dist/src/plugin-sdk/config-runtime.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/config-runtime.d.ts",
"default": "./src/config-runtime.ts"
},
"./plugin-entry": {
"types": "./dist/src/plugin-sdk/plugin-entry.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/plugin-entry.d.ts",
"default": "./src/plugin-entry.ts"
},
"./provider-auth": {
"types": "./dist/src/plugin-sdk/provider-auth.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-auth.d.ts",
"default": "./src/provider-auth.ts"
},
"./provider-auth-runtime": {
"types": "./dist/src/plugin-sdk/provider-auth-runtime.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-auth-runtime.d.ts",
"default": "./src/provider-auth-runtime.ts"
},
"./provider-entry": {
"types": "./dist/src/plugin-sdk/provider-entry.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-entry.d.ts",
"default": "./src/provider-entry.ts"
},
"./provider-http": {
"types": "./dist/src/plugin-sdk/provider-http.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-http.d.ts",
"default": "./src/provider-http.ts"
},
"./provider-model-shared": {
"types": "./dist/src/plugin-sdk/provider-model-shared.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-model-shared.d.ts",
"default": "./src/provider-model-shared.ts"
},
"./provider-onboard": {
"types": "./dist/src/plugin-sdk/provider-onboard.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-onboard.d.ts",
"default": "./src/provider-onboard.ts"
},
"./provider-stream-shared": {
"types": "./dist/src/plugin-sdk/provider-stream-shared.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-stream-shared.d.ts",
"default": "./src/provider-stream-shared.ts"
},
"./provider-tools": {
"types": "./dist/src/plugin-sdk/provider-tools.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-tools.d.ts",
"default": "./src/provider-tools.ts"
},
"./provider-web-search": {
"types": "./dist/src/plugin-sdk/provider-web-search.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/provider-web-search.d.ts",
"default": "./src/provider-web-search.ts"
},
"./runtime-env": {
"types": "./dist/src/plugin-sdk/runtime-env.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/runtime-env.d.ts",
"default": "./src/runtime-env.ts"
},
"./secret-input": {
"types": "./dist/src/plugin-sdk/secret-input.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/secret-input.d.ts",
"default": "./src/secret-input.ts"
},
"./testing": {
"types": "./dist/src/plugin-sdk/testing.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/testing.d.ts",
"default": "./src/testing.ts"
},
"./video-generation": {
"types": "./dist/src/plugin-sdk/video-generation.d.ts",
"types": "./dist/packages/plugin-sdk/src/src/plugin-sdk/video-generation.d.ts",
"default": "./src/video-generation.ts"
}
}

View File

@@ -98,6 +98,10 @@ export function resolveExtensionTestPlan(params = {}) {
const relativeExtensionDir = normalizeRelative(path.relative(repoRoot, extensionDir));
const roots = [relativeExtensionDir];
const pairedCoreRoot = path.join(repoRoot, "src", extensionId);
if (fs.existsSync(pairedCoreRoot)) {
roots.push(normalizeRelative(path.relative(repoRoot, pairedCoreRoot)));
}
const usesChannelConfig = roots.some((root) => channelTestRoots.includes(root));
const usesAcpxConfig = roots.some((root) => isAcpxExtensionRoot(root));

View File

@@ -134,7 +134,6 @@
"googlechat-runtime-shared",
"media-generation-runtime-shared",
"image-generation",
"image-generation-runtime",
"image-generation-core",
"music-generation",
"music-generation-core",

View File

@@ -1,10 +1,7 @@
import { isAbsolute } from "node:path";
import type { AcpSessionRuntimeOptions, SessionAcpMeta } from "../../config/sessions/types.js";
import { normalizeText } from "../normalize-text.js";
import { AcpRuntimeError } from "../runtime/errors.js";
export { normalizeText } from "../normalize-text.js";
const MAX_RUNTIME_MODE_LENGTH = 64;
const MAX_MODEL_LENGTH = 200;
const MAX_PERMISSION_PROFILE_LENGTH = 80;
@@ -214,6 +211,14 @@ export function validateRuntimeOptionPatch(
return next;
}
export function normalizeText(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
export function normalizeRuntimeOptions(
options: AcpSessionRuntimeOptions | undefined,
): AcpSessionRuntimeOptions {

View File

@@ -1,7 +0,0 @@
export function normalizeText(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}

View File

@@ -3,11 +3,8 @@ import type { ChannelId } from "../channels/plugins/types.js";
import type { SessionBindingRecord } from "../infra/outbound/session-binding-service.js";
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "../routing/session-key.js";
import { sanitizeAgentId } from "../routing/session-key.js";
import { normalizeText } from "./normalize-text.js";
import type { AcpRuntimeSessionMode } from "./runtime/types.js";
export { normalizeText } from "./normalize-text.js";
export type ConfiguredAcpBindingChannel = ChannelId;
export type ConfiguredAcpBindingSpec = {
@@ -37,6 +34,14 @@ export type AcpBindingConfigShape = {
label?: string;
};
export function normalizeText(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
export function normalizeMode(value: unknown): AcpRuntimeSessionMode {
const raw = normalizeText(value)?.toLowerCase();
return raw === "oneshot" ? "oneshot" : "persistent";

View File

@@ -1,5 +1,4 @@
import type { SessionAcpIdentity, SessionAcpMeta } from "../../config/sessions/types.js";
import { normalizeText } from "../normalize-text.js";
import { isSessionIdentityPending, resolveSessionIdentityFromMeta } from "./session-identity.js";
export const ACP_SESSION_IDENTITY_RENDERER_VERSION = "v1";
@@ -35,6 +34,14 @@ const ACP_AGENT_RESUME_HINT_BY_KEY = new Map<string, SessionResumeHintResolver>(
],
]);
function normalizeText(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function normalizeAgentHintKey(value: unknown): string | undefined {
const normalized = normalizeText(value);
if (!normalized) {

View File

@@ -3,9 +3,16 @@ import type {
SessionAcpIdentitySource,
SessionAcpMeta,
} from "../../config/sessions/types.js";
import { normalizeText } from "../normalize-text.js";
import type { AcpRuntimeHandle, AcpRuntimeStatus } from "./types.js";
function normalizeText(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function normalizeIdentityState(value: unknown): SessionAcpIdentity["state"] | undefined {
if (value !== "pending" && value !== "resolved") {
return undefined;

View File

@@ -109,11 +109,6 @@ describe("startAcpSpawnParentStreamRelay", () => {
expect(texts.some((text) => text.includes("Started codex session"))).toBe(true);
expect(texts.some((text) => text.includes("codex: hello from child"))).toBe(true);
expect(texts.some((text) => text.includes("codex run completed in 2s"))).toBe(true);
expect(
enqueueSystemEventMock.mock.calls.every(
(call) => (call[1] as { trusted?: boolean } | undefined)?.trusted === false,
),
).toBe(true);
expect(requestHeartbeatNowMock).toHaveBeenCalledWith(
expect.objectContaining({
reason: "acp:spawn:stream",

View File

@@ -200,11 +200,7 @@ export function startAcpSpawnParentStreamRelay(params: {
if (!shouldSurfaceUpdates) {
return;
}
enqueueSystemEvent(cleaned, {
sessionKey: parentSessionKey,
contextKey,
trusted: false,
});
enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey });
wake();
};
const emitStartNotice = () => {

Some files were not shown because too many files have changed in this diff Show More