mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(memory): fail open when embedding recall stalls
Preserve custom OpenAI-compatible memory embedding provider ids from #81170. Fixes #47884. Fixes #49524. Refs #56532. Co-authored-by: adone0 <vladyslav.yavorskyi@outlook.com>
This commit is contained in:
@@ -58,6 +58,11 @@ explicitly to use Gemini, Voyage, Mistral, DeepInfra, Bedrock, GitHub Copilot,
|
||||
Ollama, a local GGUF model, or an OpenAI-compatible `/v1/embeddings` endpoint.
|
||||
Legacy configs that still say `provider: "auto"` resolve to `openai`.
|
||||
|
||||
If OpenAI embeddings are unreachable from your network, memory recall fails open
|
||||
instead of blocking the turn. Set the existing `memorySearch.provider` field to a
|
||||
reachable local, Ollama, regional, or OpenAI-compatible provider to restore
|
||||
semantic ranking.
|
||||
|
||||
### Custom provider ids
|
||||
|
||||
`memorySearch.provider` can point at a custom `models.providers.<id>` entry for memory-specific provider adapters such as `ollama`, or for OpenAI-compatible model APIs such as `openai-responses` / `openai-completions`. OpenClaw resolves that provider's `api` owner for the embedding adapter while preserving the custom provider id for endpoint, auth, and model-prefix handling. This lets multi-GPU or multi-host setups dedicate memory embeddings to a specific local endpoint:
|
||||
|
||||
@@ -23,6 +23,12 @@ let backend: MemoryBackend = "builtin";
|
||||
let workspaceDir = "/workspace";
|
||||
let customStatus: Record<string, unknown> | undefined;
|
||||
let searchImpl: SearchImpl = async () => [];
|
||||
/**
 * Optional override for the mocked memory search manager lookup.
 * Stays `undefined` until a test installs an implementation.
 */
let getManagerImpl:
  | undefined
  | ((params: { cfg?: unknown; agentId?: string }) => Promise<{ manager?: unknown; error?: string }>);
|
||||
let readFileImpl: (params: MemoryReadParams) => Promise<MemoryReadResult> = async (params) => ({
|
||||
text: "",
|
||||
path: params.relPath,
|
||||
@@ -52,9 +58,9 @@ const stubManager = {
|
||||
close: vi.fn(),
|
||||
};
|
||||
|
||||
const getMemorySearchManagerMock = vi.fn(async (_params: { cfg?: unknown; agentId?: string }) => ({
|
||||
manager: stubManager,
|
||||
}));
|
||||
const getMemorySearchManagerMock = vi.fn(async (params: { cfg?: unknown; agentId?: string }) =>
|
||||
getManagerImpl ? await getManagerImpl(params) : { manager: stubManager },
|
||||
);
|
||||
const readAgentMemoryFileMock = vi.fn(
|
||||
async (params: MemoryReadParams) => await readFileImpl(params),
|
||||
);
|
||||
@@ -84,6 +90,15 @@ export function setMemorySearchImpl(next: SearchImpl): void {
|
||||
searchImpl = next;
|
||||
}
|
||||
|
||||
/**
 * Installs the implementation that the mocked memory search manager lookup
 * delegates to.
 *
 * @param next - Replacement lookup; receives `{ cfg, agentId }` and resolves to
 *   an object that may carry a `manager` and/or an `error`.
 */
export function setMemorySearchManagerImpl(
  next: (params: { cfg?: unknown; agentId?: string }) => Promise<{ manager?: unknown; error?: string }>,
): void {
  getManagerImpl = next;
}
|
||||
|
||||
export function setMemoryReadFileImpl(
|
||||
next: (params: MemoryReadParams) => Promise<MemoryReadResult>,
|
||||
): void {
|
||||
@@ -98,6 +113,7 @@ export function resetMemoryToolMockState(overrides?: {
|
||||
backend = overrides?.backend ?? "builtin";
|
||||
workspaceDir = "/workspace";
|
||||
customStatus = undefined;
|
||||
getManagerImpl = undefined;
|
||||
searchImpl = overrides?.searchImpl ?? (async () => []);
|
||||
readFileImpl =
|
||||
overrides?.readFileImpl ??
|
||||
|
||||
@@ -17,6 +17,7 @@ import {
|
||||
type MemoryReadParams,
|
||||
} from "./memory-tool-manager-mock.js";
|
||||
import { createMemoryCoreTestHarness } from "./test-helpers.js";
|
||||
import { testing as memoryToolsTesting } from "./tools.js";
|
||||
import {
|
||||
asOpenClawConfig,
|
||||
createAutoCitationsMemorySearchTool,
|
||||
@@ -51,6 +52,7 @@ async function waitFor<T>(task: () => Promise<T>, timeoutMs = 1500): Promise<T>
|
||||
|
||||
beforeEach(() => {
|
||||
clearMemoryPluginState();
|
||||
memoryToolsTesting.resetMemorySearchToolCooldowns();
|
||||
resetMemoryToolMockState({
|
||||
backend: "builtin",
|
||||
searchImpl: async () => [
|
||||
@@ -447,6 +449,105 @@ describe("memory tools", () => {
|
||||
expect(getMemorySearchManagerMockCalls()).toBe(1);
|
||||
});
|
||||
|
||||
it("does not cooldown primary memory when a corpus=all wiki supplement stalls", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
let searchCalls = 0;
|
||||
setMemorySearchImpl(async () => {
|
||||
searchCalls += 1;
|
||||
return [
|
||||
{
|
||||
path: "MEMORY.md",
|
||||
startLine: 5,
|
||||
endLine: 7,
|
||||
score: 0.9,
|
||||
snippet: "@@ -5,3 @@\nAssistant: noted",
|
||||
source: "memory" as const,
|
||||
},
|
||||
];
|
||||
});
|
||||
registerMemoryCorpusSupplement("memory-wiki", {
|
||||
search: async () => await new Promise(() => undefined),
|
||||
get: async () => null,
|
||||
});
|
||||
|
||||
const tool = createMemorySearchToolOrThrow();
|
||||
const stalledAllResultPromise = tool.execute("call_all_stalled_wiki", {
|
||||
query: "alpha",
|
||||
corpus: "all",
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
const stalledAllResult = await stalledAllResultPromise;
|
||||
expectUnavailableMemorySearchDetails(stalledAllResult.details, {
|
||||
error: "memory_search timed out after 15s",
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check embedding provider configuration and retry memory_search.",
|
||||
});
|
||||
|
||||
const memoryResult = await tool.execute("call_memory_after_stalled_wiki", {
|
||||
query: "alpha",
|
||||
});
|
||||
const details = memoryResult.details as { results: Array<{ corpus: string; path: string }> };
|
||||
expect(details.results.map((entry) => [entry.corpus, entry.path])).toEqual([
|
||||
["memory", "MEMORY.md"],
|
||||
]);
|
||||
expect(searchCalls).toBe(2);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("cooldowns primary memory when corpus=all memory search stalls", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
let searchCalls = 0;
|
||||
setMemorySearchImpl(async () => {
|
||||
searchCalls += 1;
|
||||
return await new Promise(() => undefined);
|
||||
});
|
||||
registerMemoryCorpusSupplement("memory-wiki", {
|
||||
search: async () => [
|
||||
{
|
||||
corpus: "wiki",
|
||||
path: "entities/alpha.md",
|
||||
title: "Alpha",
|
||||
kind: "entity",
|
||||
score: 4,
|
||||
snippet: "Alpha wiki entry",
|
||||
},
|
||||
],
|
||||
get: async () => null,
|
||||
});
|
||||
|
||||
const tool = createMemorySearchToolOrThrow();
|
||||
const stalledAllResultPromise = tool.execute("call_all_stalled_memory", {
|
||||
query: "alpha",
|
||||
corpus: "all",
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
const stalledAllResult = await stalledAllResultPromise;
|
||||
expectUnavailableMemorySearchDetails(stalledAllResult.details, {
|
||||
error: "memory_search timed out after 15s",
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check embedding provider configuration and retry memory_search.",
|
||||
});
|
||||
|
||||
const wikiOnlyResult = await tool.execute("call_all_after_stalled_memory", {
|
||||
query: "alpha",
|
||||
corpus: "all",
|
||||
});
|
||||
const details = wikiOnlyResult.details as {
|
||||
results: Array<{ corpus: string; path: string }>;
|
||||
};
|
||||
expect(details.results.map((entry) => [entry.corpus, entry.path])).toEqual([
|
||||
["wiki", "entities/alpha.md"],
|
||||
]);
|
||||
expect(searchCalls).toBe(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("falls back to a wiki corpus supplement for memory_get corpus=all", async () => {
|
||||
setMemoryReadFileImpl(async () => {
|
||||
throw new Error("path required");
|
||||
|
||||
@@ -6,8 +6,9 @@ import {
|
||||
resetMemoryToolMockState,
|
||||
setMemoryBackend,
|
||||
setMemorySearchImpl,
|
||||
setMemorySearchManagerImpl,
|
||||
} from "./memory-tool-manager-mock.js";
|
||||
import { createMemorySearchTool } from "./tools.js";
|
||||
import { createMemorySearchTool, testing as memoryToolsTesting } from "./tools.js";
|
||||
import { MemoryGetSchema, MemorySearchSchema } from "./tools.shared.js";
|
||||
import {
|
||||
asOpenClawConfig,
|
||||
@@ -56,6 +57,7 @@ describe("memory tool schemas", () => {
|
||||
describe("memory_search unavailable payloads", () => {
|
||||
beforeEach(() => {
|
||||
resetMemoryToolMockState({ searchImpl: async () => [] });
|
||||
memoryToolsTesting.resetMemorySearchToolCooldowns();
|
||||
});
|
||||
|
||||
it("rejects fractional maxResults before searching", async () => {
|
||||
@@ -128,6 +130,57 @@ describe("memory_search unavailable payloads", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("returns unavailable metadata when manager setup does not settle", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
setMemorySearchManagerImpl(async () => await new Promise(() => undefined));
|
||||
const tool = createMemorySearchToolOrThrow();
|
||||
|
||||
const resultPromise = tool.execute("manager-timeout", { query: "hello" });
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
|
||||
const result = await resultPromise;
|
||||
expectUnavailableMemorySearchDetails(result.details, {
|
||||
error: "memory_search timed out after 15s",
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check embedding provider configuration and retry memory_search.",
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("returns unavailable metadata when memory search does not settle", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
let searchCalls = 0;
|
||||
setMemorySearchImpl(async () => {
|
||||
searchCalls += 1;
|
||||
return await new Promise(() => undefined);
|
||||
});
|
||||
const tool = createMemorySearchToolOrThrow();
|
||||
|
||||
const resultPromise = tool.execute("search-timeout", { query: "hello" });
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
|
||||
const result = await resultPromise;
|
||||
expectUnavailableMemorySearchDetails(result.details, {
|
||||
error: "memory_search timed out after 15s",
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check embedding provider configuration and retry memory_search.",
|
||||
});
|
||||
const cooldownResult = await tool.execute("search-cooldown", { query: "hello again" });
|
||||
expectUnavailableMemorySearchDetails(cooldownResult.details, {
|
||||
error: "memory_search timed out after 15s",
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check embedding provider configuration and retry memory_search.",
|
||||
});
|
||||
expect(searchCalls).toBe(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("re-resolves the manager once when a cached sqlite handle was closed", async () => {
|
||||
let searchCalls = 0;
|
||||
setMemorySearchImpl(async () => {
|
||||
|
||||
@@ -42,6 +42,73 @@ type MemorySearchToolResult =
|
||||
| (MemorySearchResult & { corpus: MemorySource })
|
||||
| MemoryCorpusSearchResult;
|
||||
|
||||
/** Deadline, in milliseconds, for one memory_search tool run. */
const MEMORY_SEARCH_TOOL_TIMEOUT_MS = 15_000;
/** Duration, in milliseconds, added to "now" when a cooldown is recorded. */
const MEMORY_SEARCH_TOOL_COOLDOWN_MS = 60_000;

/** Recorded cooldowns by key: expiry timestamp (epoch ms) and the error to report. */
const memorySearchToolCooldowns: Map<string, { until: number; error: string }> = new Map();
|
||||
|
||||
/**
 * Picks the key under which memory_search cooldowns are tracked.
 *
 * Precedence: `agentId`, then `agentSessionKey`, then the literal `"default"`.
 * Only `null`/`undefined` fall through; an empty string is used as-is.
 */
function resolveMemorySearchToolCooldownKey(options: {
  agentId?: string;
  agentSessionKey?: string;
}): string {
  const { agentId } = options;
  if (agentId != null) {
    return agentId;
  }
  const { agentSessionKey } = options;
  if (agentSessionKey != null) {
    return agentSessionKey;
  }
  return "default";
}
|
||||
|
||||
/**
 * Looks up the cooldown recorded for `key`.
 *
 * @returns The recorded error while the cooldown has not yet expired;
 *   `undefined` when nothing is recorded or the entry has expired. An expired
 *   entry is removed from the map as a side effect.
 */
function readMemorySearchToolCooldown(key: string): { error: string } | undefined {
  const cooldown = memorySearchToolCooldowns.get(key);
  if (cooldown === undefined) {
    return undefined;
  }
  const expired = cooldown.until <= Date.now();
  if (!expired) {
    return { error: cooldown.error };
  }
  memorySearchToolCooldowns.delete(key);
  return undefined;
}
|
||||
|
||||
/**
 * Starts (or restarts) the cooldown for `key`, storing `error` alongside the
 * expiry timestamp so later reads can report it.
 */
function recordMemorySearchToolCooldown(key: string, error: string): void {
  const until = Date.now() + MEMORY_SEARCH_TOOL_COOLDOWN_MS;
  memorySearchToolCooldowns.set(key, { until, error });
}
|
||||
|
||||
/** Hooks exported for test suites. */
export const testing = {
  /** Removes every recorded memory_search cooldown. */
  resetMemorySearchToolCooldowns: (): void => {
    memorySearchToolCooldowns.clear();
  },
} as const;
|
||||
|
||||
/**
 * Runs `params.run` and resolves as soon as it settles or `params.timeoutMs`
 * elapses, whichever happens first. This function never rejects: every failure
 * is reported through the returned status object.
 *
 * @param params.timeoutMs - Deadline in milliseconds.
 * @param params.run - Work to execute. It is not cancelled when the deadline
 *   wins the race; it simply stops being awaited.
 * @returns `{ status: "ok", value }` when `run` fulfils before the deadline,
 *   otherwise `{ status: "unavailable", error }` carrying either the timeout
 *   message or the formatted failure raised by `run`.
 */
async function runMemorySearchToolWithDeadline<T>(params: {
  timeoutMs: number;
  run: () => Promise<T>;
}): Promise<{ status: "ok"; value: T } | { status: "unavailable"; error: string }> {
  // Unique sentinel: a task that legitimately resolves to the string "timeout"
  // must not be mistaken for the deadline firing.
  const deadlineReached = Symbol("memory_search deadline");
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const deadline = new Promise<typeof deadlineReached>((resolve) => {
      timer = setTimeout(() => resolve(deadlineReached), params.timeoutMs);
      // Do not let the pending deadline alone keep the process alive.
      timer.unref?.();
    });

    // Started inside the try block so a synchronous throw from `run` is
    // reported as "unavailable" and the timer is still cleared below.
    const task = params.run();
    // The task may reject after the deadline already won the race; a no-op
    // handler keeps that late rejection from surfacing as unhandled.
    task.catch(() => undefined);

    const result = await Promise.race([task, deadline]);
    if (result === deadlineReached) {
      return {
        status: "unavailable",
        error: `memory_search timed out after ${Math.round(params.timeoutMs / 1000)}s`,
      };
    }
    return { status: "ok", value: result as T };
  } catch (error) {
    return { status: "unavailable", error: formatErrorMessage(error) };
  } finally {
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
|
||||
|
||||
function sortMemorySearchToolResults<T extends { score: number; path: string }>(results: T[]): T[] {
|
||||
return results.toSorted((left, right) => {
|
||||
if (left.score !== right.score) {
|
||||
@@ -264,164 +331,219 @@ export function createMemorySearchTool(options: {
|
||||
| "all"
|
||||
| "sessions"
|
||||
| undefined;
|
||||
const { resolveMemoryBackendConfig } = await loadMemoryToolRuntime();
|
||||
const shouldQueryMemory = requestedCorpus !== "wiki";
|
||||
const shouldQuerySupplements = requestedCorpus === "wiki" || requestedCorpus === "all";
|
||||
const memory = shouldQueryMemory ? await getMemoryManagerContext({ cfg, agentId }) : null;
|
||||
if (shouldQueryMemory && memory && "error" in memory && !shouldQuerySupplements) {
|
||||
return jsonResult(buildMemorySearchUnavailableResult(memory.error));
|
||||
}
|
||||
try {
|
||||
const citationsMode = resolveMemoryCitationsMode(cfg);
|
||||
const includeCitations = shouldIncludeCitations({
|
||||
mode: citationsMode,
|
||||
sessionKey: options.agentSessionKey,
|
||||
});
|
||||
const pluginConfig = resolveMemoryCorePluginConfig(cfg);
|
||||
const dreamingEnabled = resolveMemoryDreamingConfig({
|
||||
pluginConfig,
|
||||
cfg,
|
||||
}).enabled;
|
||||
const dreaming = resolveMemoryDeepDreamingConfig({
|
||||
pluginConfig,
|
||||
cfg,
|
||||
});
|
||||
const searchStartedAt = Date.now();
|
||||
let rawResults: MemorySearchResult[] = [];
|
||||
let surfacedMemoryResults: Array<MemorySearchResult & { corpus: MemorySource }> = [];
|
||||
let provider: string | undefined;
|
||||
let model: string | undefined;
|
||||
let fallback: unknown;
|
||||
let searchMode: string | undefined;
|
||||
let searchDebug:
|
||||
| {
|
||||
backend: string;
|
||||
configuredMode?: string;
|
||||
effectiveMode?: string;
|
||||
fallback?: string;
|
||||
searchMs: number;
|
||||
hits: number;
|
||||
}
|
||||
| undefined;
|
||||
if (shouldQueryMemory && memory && !("error" in memory)) {
|
||||
let activeMemory = memory;
|
||||
const runtimeDebug: MemorySearchRuntimeDebug[] = [];
|
||||
const qmdSearchModeOverride = resolveActiveMemoryQmdSearchModeOverride(
|
||||
cfg,
|
||||
options.agentSessionKey,
|
||||
);
|
||||
const searchSources: MemorySource[] | undefined =
|
||||
requestedCorpus === "sessions"
|
||||
? (["sessions"] as MemorySource[])
|
||||
: requestedCorpus === "memory"
|
||||
? (["memory"] as MemorySource[])
|
||||
: undefined;
|
||||
const searchOptions = {
|
||||
maxResults,
|
||||
minScore,
|
||||
const cooldownKey = resolveMemorySearchToolCooldownKey({
|
||||
agentId,
|
||||
agentSessionKey: options.agentSessionKey,
|
||||
});
|
||||
const cooldown =
|
||||
requestedCorpus === "wiki" ? undefined : readMemorySearchToolCooldown(cooldownKey);
|
||||
let activeUnavailablePhase: "memory" | "supplement" | undefined;
|
||||
let failedUnavailablePhase: "memory" | "supplement" | undefined;
|
||||
const runUnavailablePhase = async <T>(
|
||||
phase: "memory" | "supplement",
|
||||
task: () => Promise<T>,
|
||||
): Promise<T> => {
|
||||
activeUnavailablePhase = phase;
|
||||
try {
|
||||
return await task();
|
||||
} catch (error) {
|
||||
failedUnavailablePhase = phase;
|
||||
throw error;
|
||||
} finally {
|
||||
if (activeUnavailablePhase === phase) {
|
||||
activeUnavailablePhase = undefined;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const outcome = await runMemorySearchToolWithDeadline({
|
||||
timeoutMs: MEMORY_SEARCH_TOOL_TIMEOUT_MS,
|
||||
run: async () => {
|
||||
const { resolveMemoryBackendConfig } = await loadMemoryToolRuntime();
|
||||
const shouldQuerySupplements = requestedCorpus === "wiki" || requestedCorpus === "all";
|
||||
const shouldQueryMemory = requestedCorpus !== "wiki" && !cooldown;
|
||||
if (cooldown && !shouldQuerySupplements) {
|
||||
return jsonResult(buildMemorySearchUnavailableResult(cooldown.error));
|
||||
}
|
||||
const memory = shouldQueryMemory
|
||||
? await runUnavailablePhase(
|
||||
"memory",
|
||||
async () => await getMemoryManagerContext({ cfg, agentId }),
|
||||
)
|
||||
: null;
|
||||
if (shouldQueryMemory && memory && "error" in memory && !shouldQuerySupplements) {
|
||||
recordMemorySearchToolCooldown(
|
||||
cooldownKey,
|
||||
memory.error ?? "memory search unavailable",
|
||||
);
|
||||
return jsonResult(buildMemorySearchUnavailableResult(memory.error));
|
||||
}
|
||||
|
||||
const citationsMode = resolveMemoryCitationsMode(cfg);
|
||||
const includeCitations = shouldIncludeCitations({
|
||||
mode: citationsMode,
|
||||
sessionKey: options.agentSessionKey,
|
||||
qmdSearchModeOverride,
|
||||
onDebug: (debug: MemorySearchRuntimeDebug) => {
|
||||
runtimeDebug.push(debug);
|
||||
},
|
||||
...(searchSources ? { sources: searchSources } : {}),
|
||||
};
|
||||
try {
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
} catch (error) {
|
||||
if (!isClosedMemoryStoreError(error)) {
|
||||
throw error;
|
||||
}
|
||||
const refreshed = await getMemoryManagerContext({ cfg, agentId });
|
||||
if ("error" in refreshed) {
|
||||
throw error;
|
||||
}
|
||||
activeMemory = refreshed;
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
}
|
||||
if (rawResults.length === 0 && activeMemory.manager.sync) {
|
||||
await activeMemory.manager.sync({ reason: "search", force: true });
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
}
|
||||
rawResults = await filterMemorySearchHitsBySessionVisibility({
|
||||
cfg,
|
||||
agentId,
|
||||
requesterSessionKey: options.agentSessionKey,
|
||||
sandboxed: options.sandboxed === true,
|
||||
hits: rawResults,
|
||||
});
|
||||
if (requestedCorpus === "sessions") {
|
||||
rawResults = rawResults.filter((hit) => hit.source === "sessions");
|
||||
} else if (requestedCorpus === "memory") {
|
||||
rawResults = rawResults.filter((hit) => hit.source === "memory");
|
||||
}
|
||||
const status = activeMemory.manager.status();
|
||||
const decorated = decorateCitations(rawResults, includeCitations);
|
||||
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
|
||||
const memoryResults =
|
||||
status.backend === "qmd"
|
||||
? clampResultsByInjectedChars(decorated, resolved.qmd?.limits.maxInjectedChars)
|
||||
: decorated;
|
||||
surfacedMemoryResults = memoryResults.map((result) => ({
|
||||
...result,
|
||||
corpus: result.source,
|
||||
}));
|
||||
if (dreamingEnabled) {
|
||||
queueShortTermRecallTracking({
|
||||
workspaceDir: status.workspaceDir,
|
||||
query,
|
||||
rawResults,
|
||||
surfacedResults: memoryResults,
|
||||
timezone: dreaming.timezone,
|
||||
const pluginConfig = resolveMemoryCorePluginConfig(cfg);
|
||||
const dreamingEnabled = resolveMemoryDreamingConfig({
|
||||
pluginConfig,
|
||||
cfg,
|
||||
}).enabled;
|
||||
const dreaming = resolveMemoryDeepDreamingConfig({
|
||||
pluginConfig,
|
||||
cfg,
|
||||
});
|
||||
const searchStartedAt = Date.now();
|
||||
let rawResults: MemorySearchResult[] = [];
|
||||
let surfacedMemoryResults: Array<MemorySearchResult & { corpus: MemorySource }> = [];
|
||||
let provider: string | undefined;
|
||||
let model: string | undefined;
|
||||
let fallback: unknown;
|
||||
let searchMode: string | undefined;
|
||||
let searchDebug:
|
||||
| {
|
||||
backend: string;
|
||||
configuredMode?: string;
|
||||
effectiveMode?: string;
|
||||
fallback?: string;
|
||||
searchMs: number;
|
||||
hits: number;
|
||||
}
|
||||
| undefined;
|
||||
if (shouldQueryMemory && memory && !("error" in memory)) {
|
||||
await runUnavailablePhase("memory", async () => {
|
||||
let activeMemory = memory;
|
||||
const runtimeDebug: MemorySearchRuntimeDebug[] = [];
|
||||
const qmdSearchModeOverride = resolveActiveMemoryQmdSearchModeOverride(
|
||||
cfg,
|
||||
options.agentSessionKey,
|
||||
);
|
||||
const searchSources: MemorySource[] | undefined =
|
||||
requestedCorpus === "sessions"
|
||||
? (["sessions"] as MemorySource[])
|
||||
: requestedCorpus === "memory"
|
||||
? (["memory"] as MemorySource[])
|
||||
: undefined;
|
||||
const searchOptions = {
|
||||
maxResults,
|
||||
minScore,
|
||||
sessionKey: options.agentSessionKey,
|
||||
qmdSearchModeOverride,
|
||||
onDebug: (debug: MemorySearchRuntimeDebug) => {
|
||||
runtimeDebug.push(debug);
|
||||
},
|
||||
...(searchSources ? { sources: searchSources } : {}),
|
||||
};
|
||||
try {
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
} catch (error) {
|
||||
if (!isClosedMemoryStoreError(error)) {
|
||||
throw error;
|
||||
}
|
||||
const refreshed = await getMemoryManagerContext({ cfg, agentId });
|
||||
if ("error" in refreshed) {
|
||||
throw error;
|
||||
}
|
||||
activeMemory = refreshed;
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
}
|
||||
if (rawResults.length === 0 && activeMemory.manager.sync) {
|
||||
await activeMemory.manager.sync({ reason: "search", force: true });
|
||||
rawResults = await activeMemory.manager.search(query, searchOptions);
|
||||
}
|
||||
rawResults = await filterMemorySearchHitsBySessionVisibility({
|
||||
cfg,
|
||||
agentId,
|
||||
requesterSessionKey: options.agentSessionKey,
|
||||
sandboxed: options.sandboxed === true,
|
||||
hits: rawResults,
|
||||
});
|
||||
if (requestedCorpus === "sessions") {
|
||||
rawResults = rawResults.filter((hit) => hit.source === "sessions");
|
||||
} else if (requestedCorpus === "memory") {
|
||||
rawResults = rawResults.filter((hit) => hit.source === "memory");
|
||||
}
|
||||
const status = activeMemory.manager.status();
|
||||
const decorated = decorateCitations(rawResults, includeCitations);
|
||||
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
|
||||
const memoryResults =
|
||||
status.backend === "qmd"
|
||||
? clampResultsByInjectedChars(decorated, resolved.qmd?.limits.maxInjectedChars)
|
||||
: decorated;
|
||||
surfacedMemoryResults = memoryResults.map((result) => ({
|
||||
...result,
|
||||
corpus: result.source,
|
||||
}));
|
||||
if (dreamingEnabled) {
|
||||
queueShortTermRecallTracking({
|
||||
workspaceDir: status.workspaceDir,
|
||||
query,
|
||||
rawResults,
|
||||
surfacedResults: memoryResults,
|
||||
timezone: dreaming.timezone,
|
||||
});
|
||||
}
|
||||
provider = status.provider;
|
||||
model = status.model;
|
||||
fallback = status.fallback;
|
||||
const latestDebug = runtimeDebug.at(-1);
|
||||
searchMode = latestDebug?.effectiveMode;
|
||||
searchDebug = {
|
||||
backend: status.backend,
|
||||
configuredMode: latestDebug?.configuredMode,
|
||||
effectiveMode:
|
||||
status.backend === "qmd"
|
||||
? (latestDebug?.effectiveMode ?? latestDebug?.configuredMode)
|
||||
: "n/a",
|
||||
fallback: latestDebug?.fallback,
|
||||
searchMs: Math.max(0, Date.now() - searchStartedAt),
|
||||
hits: rawResults.length,
|
||||
};
|
||||
});
|
||||
}
|
||||
provider = status.provider;
|
||||
model = status.model;
|
||||
fallback = status.fallback;
|
||||
const latestDebug = runtimeDebug.at(-1);
|
||||
searchMode = latestDebug?.effectiveMode;
|
||||
searchDebug = {
|
||||
backend: status.backend,
|
||||
configuredMode: latestDebug?.configuredMode,
|
||||
effectiveMode:
|
||||
status.backend === "qmd"
|
||||
? (latestDebug?.effectiveMode ?? latestDebug?.configuredMode)
|
||||
: "n/a",
|
||||
fallback: latestDebug?.fallback,
|
||||
searchMs: Math.max(0, Date.now() - searchStartedAt),
|
||||
hits: rawResults.length,
|
||||
};
|
||||
const supplementResults = shouldQuerySupplements
|
||||
? await runUnavailablePhase(
|
||||
"supplement",
|
||||
async () =>
|
||||
await searchMemoryCorpusSupplements({
|
||||
query,
|
||||
maxResults,
|
||||
agentSessionKey: options.agentSessionKey,
|
||||
corpus: requestedCorpus,
|
||||
}),
|
||||
)
|
||||
: [];
|
||||
// Wiki and memory scores use incomparable scales, so corpus=all first
|
||||
// balances candidate selection and then backfills any unused slots.
|
||||
const effectiveMax = Math.max(1, maxResults ?? 10);
|
||||
const results = mergeMemorySearchCorpusResults({
|
||||
memoryResults: surfacedMemoryResults,
|
||||
supplementResults,
|
||||
maxResults: effectiveMax,
|
||||
balanceCorpora: requestedCorpus === "all",
|
||||
});
|
||||
return jsonResult({
|
||||
results,
|
||||
provider,
|
||||
model,
|
||||
fallback,
|
||||
citations: citationsMode,
|
||||
mode: searchMode,
|
||||
debug: searchDebug,
|
||||
});
|
||||
},
|
||||
});
|
||||
if (outcome.status === "unavailable") {
|
||||
const unavailablePhase = failedUnavailablePhase ?? activeUnavailablePhase;
|
||||
const shouldRecordCooldown =
|
||||
requestedCorpus !== "wiki" &&
|
||||
(requestedCorpus !== "all" || unavailablePhase === "memory");
|
||||
if (shouldRecordCooldown) {
|
||||
recordMemorySearchToolCooldown(cooldownKey, outcome.error);
|
||||
}
|
||||
const supplementResults = shouldQuerySupplements
|
||||
? await searchMemoryCorpusSupplements({
|
||||
query,
|
||||
maxResults,
|
||||
agentSessionKey: options.agentSessionKey,
|
||||
corpus: requestedCorpus,
|
||||
})
|
||||
: [];
|
||||
// Wiki and memory scores use incomparable scales, so corpus=all first
|
||||
// balances candidate selection and then backfills any unused slots.
|
||||
const effectiveMax = Math.max(1, maxResults ?? 10);
|
||||
const results = mergeMemorySearchCorpusResults({
|
||||
memoryResults: surfacedMemoryResults,
|
||||
supplementResults,
|
||||
maxResults: effectiveMax,
|
||||
balanceCorpora: requestedCorpus === "all",
|
||||
});
|
||||
return jsonResult({
|
||||
results,
|
||||
provider,
|
||||
model,
|
||||
fallback,
|
||||
citations: citationsMode,
|
||||
mode: searchMode,
|
||||
debug: searchDebug,
|
||||
});
|
||||
} catch (err) {
|
||||
const message = formatErrorMessage(err);
|
||||
return jsonResult(buildMemorySearchUnavailableResult(message));
|
||||
return jsonResult(buildMemorySearchUnavailableResult(outcome.error));
|
||||
}
|
||||
return outcome.value;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -624,7 +624,9 @@ describe("memory plugin e2e", () => {
|
||||
expect(providerOptions.fallback).toBe("none");
|
||||
expect(providerOptions.model).toBe("text-embedding-3-small");
|
||||
expect(providerOptions).not.toHaveProperty("remote");
|
||||
expect(embedQuery).toHaveBeenCalledWith("project memory");
|
||||
expect(embedQuery).toHaveBeenCalledWith("project memory", {
|
||||
signal: expect.any(AbortSignal),
|
||||
});
|
||||
} finally {
|
||||
vi.doUnmock("openclaw/plugin-sdk/memory-core-host-engine-embeddings");
|
||||
vi.doUnmock("openai");
|
||||
@@ -711,6 +713,99 @@ describe("memory plugin e2e", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("returns unavailable when memory_recall embedding does not settle", async () => {
|
||||
vi.useFakeTimers();
|
||||
const ensureGlobalUndiciEnvProxyDispatcher = vi.fn();
|
||||
const post = vi.fn(() => new Promise(() => undefined));
|
||||
const loadLanceDbModule = vi.fn(async () => ({
|
||||
connect: vi.fn(async () => ({
|
||||
tableNames: vi.fn(async () => ["memories"]),
|
||||
openTable: vi.fn(async () => ({
|
||||
vectorSearch: vi.fn(),
|
||||
countRows: vi.fn(async () => 0),
|
||||
add: vi.fn(async () => undefined),
|
||||
delete: vi.fn(async () => undefined),
|
||||
})),
|
||||
})),
|
||||
}));
|
||||
|
||||
try {
|
||||
await withMockedOpenAiMemoryPlugin({
|
||||
ensureGlobalUndiciEnvProxyDispatcher,
|
||||
openAiPost: post,
|
||||
loadLanceDbModule,
|
||||
run: async (dynamicMemoryPlugin) => {
|
||||
const registeredTools: any[] = [];
|
||||
const logger = {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
};
|
||||
const mockApi = {
|
||||
id: "memory-lancedb",
|
||||
name: "Memory (LanceDB)",
|
||||
source: "test",
|
||||
config: {},
|
||||
pluginConfig: {
|
||||
embedding: {
|
||||
apiKey: OPENAI_API_KEY,
|
||||
model: "text-embedding-3-small",
|
||||
},
|
||||
dbPath: getDbPath(),
|
||||
autoCapture: false,
|
||||
autoRecall: false,
|
||||
},
|
||||
runtime: {},
|
||||
logger,
|
||||
registerTool: (tool: any, opts: any) => {
|
||||
registeredTools.push({ tool, opts });
|
||||
},
|
||||
registerCli: vi.fn(),
|
||||
registerService: vi.fn(),
|
||||
on: vi.fn(),
|
||||
resolvePath: (filePath: string) => filePath,
|
||||
};
|
||||
|
||||
dynamicMemoryPlugin.register(mockApi as any);
|
||||
const recallTool = registeredTools.find((t) => t.opts?.name === "memory_recall")?.tool;
|
||||
if (!recallTool) {
|
||||
throw new Error("memory_recall tool was not registered");
|
||||
}
|
||||
|
||||
const resultPromise = recallTool.execute("timeout-call", { query: "project memory" });
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
const result = await resultPromise;
|
||||
|
||||
expect(result.details).toMatchObject({
|
||||
count: 0,
|
||||
disabled: true,
|
||||
unavailable: true,
|
||||
error: "memory_recall timed out after 15s",
|
||||
});
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
"memory-lancedb: memory_recall timed out after 15000ms; returning unavailable memory result",
|
||||
);
|
||||
expect(loadLanceDbModule).not.toHaveBeenCalled();
|
||||
|
||||
const cooldownResult = await recallTool.execute("cooldown-call", {
|
||||
query: "project memory again",
|
||||
});
|
||||
expect(cooldownResult.details).toMatchObject({
|
||||
count: 0,
|
||||
disabled: true,
|
||||
unavailable: true,
|
||||
error: "memory_recall timed out after 15s",
|
||||
});
|
||||
expect(post).toHaveBeenCalledTimes(1);
|
||||
expect(loadLanceDbModule).not.toHaveBeenCalled();
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
test("normalizes signed decimal CLI limits through the shared parser", async () => {
|
||||
const ensureGlobalUndiciEnvProxyDispatcher = vi.fn();
|
||||
const toArray = vi.fn(async () => []);
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
import { Buffer } from "node:buffer";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type * as LanceDB from "@lancedb/lancedb";
|
||||
import type { AgentToolResult } from "openclaw/plugin-sdk/agent-core";
|
||||
import {
|
||||
optionalFiniteNumberSchema,
|
||||
optionalPositiveIntegerSchema,
|
||||
@@ -195,6 +196,8 @@ function resolveAutoCaptureStartIndex(
|
||||
|
||||
// NOTE(review): presumably the LanceDB table holding stored memories — confirm against usages.
const TABLE_NAME = "memories";

// Millisecond defaults; their call sites are outside this view.
// NOTE(review): names suggest auto-recall timeout, memory_recall tool timeout and
// memory_recall tool cooldown respectively — confirm against usages.
const DEFAULT_AUTO_RECALL_TIMEOUT_MS = 15_000;
const DEFAULT_TOOL_RECALL_TIMEOUT_MS = 15_000;
const DEFAULT_TOOL_RECALL_COOLDOWN_MS = 60_000;
|
||||
|
||||
function parsePositiveIntegerOption(value: string | undefined, flag: string): number | undefined {
|
||||
if (value === undefined) {
|
||||
@@ -442,8 +445,25 @@ class ProviderAdapterEmbeddings implements Embeddings {
|
||||
return result.provider;
|
||||
}
|
||||
|
||||
async embed(text: string): Promise<number[]> {
|
||||
return await (await this.getProvider()).embedQuery(text);
|
||||
/**
 * Embeds `text` with the lazily resolved provider.
 *
 * Without a truthy `options.timeoutMs` the provider is called directly.
 * Otherwise an AbortController is armed to abort after the resolved timeout
 * and its signal is handed to `embedQuery`; the timer is always cleared once
 * the call settles.
 * NOTE(review): whether a given provider honors the signal is not visible here.
 */
async embed(text: string, options?: { timeoutMs?: number }): Promise<number[]> {
  const provider = await this.getProvider();
  const timeoutMs = options?.timeoutMs;
  if (!timeoutMs) {
    return await provider.embedQuery(text);
  }

  const controller = new AbortController();
  const abortOnTimeout = (): void => {
    controller.abort(new Error("memory-lancedb embedding timed out"));
  };
  const timer = setTimeout(abortOnTimeout, resolveTimerTimeoutMs(timeoutMs, 1));
  // Do not let a pending embedding timeout keep the process alive.
  timer.unref?.();

  try {
    return await provider.embedQuery(text, { signal: controller.signal });
  } finally {
    clearTimeout(timer);
  }
}
|
||||
}
|
||||
|
||||
@@ -473,6 +493,34 @@ async function runWithTimeout<T>(params: {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Converts an arbitrary thrown value into a human-readable message for recall
 * log lines and "unavailable" tool results.
 *
 * - `Error` instances yield their `message`, or their `name` when the message
 *   is blank, so the caller never logs an empty reason.
 * - Values whose string form is informative (strings, numbers, arrays, objects
 *   with a custom `toString`) are returned exactly as `String(value)`.
 * - Plain objects that would stringify to "[object Object]" are JSON-serialized
 *   instead; if that is impossible (circular, empty) the generic form is kept.
 *
 * @param error - The value caught from a failed recall step.
 * @returns A message describing the failure; this function never throws.
 */
function formatMemoryRecallError(error: unknown): string {
  const opaqueObjectText = "[object Object]";

  if (error instanceof Error) {
    const message = typeof error.message === "string" ? error.message : "";
    // A blank message would leave the warning and the tool result without a reason.
    return message.trim() ? message : error.name || "Error";
  }

  let text: string;
  try {
    text = String(error);
  } catch {
    // Null-prototype objects cannot be converted to a primitive.
    text = opaqueObjectText;
  }
  if (text !== opaqueObjectText) {
    return text;
  }

  try {
    const serialized: unknown = JSON.stringify(error);
    if (typeof serialized === "string" && serialized !== "{}") {
      return serialized;
    }
  } catch {
    // Circular structures and throwing toJSON hooks fall through to the generic text.
  }
  return text;
}
|
||||
|
||||
/**
 * Builds the tool result returned when memory recall cannot be served.
 *
 * The text content is a fixed notice; `details` reports zero results, flags
 * the recall as both disabled and unavailable, and carries the supplied
 * error message.
 *
 * @param error - Reason recall is unavailable, surfaced in `details.error`.
 */
function buildMemoryRecallUnavailableResult(error: string): AgentToolResult<{
  count: number;
  disabled: true;
  unavailable: true;
  error: string;
}> {
  const details = {
    count: 0,
    disabled: true,
    unavailable: true,
    error,
  } as const;
  return {
    content: [{ type: "text", text: "Memory recall is unavailable right now." }],
    details,
  };
}
|
||||
|
||||
/**
 * Error wrapper that keeps the originally thrown value alongside a formatted
 * message, so callers can tell this failure apart via `instanceof` and still
 * reach the underlying cause through `originalError`.
 */
class MemoryRecallEmbeddingError extends Error {
  /** The value that was originally thrown, kept untouched. */
  readonly originalError: unknown;

  constructor(originalError: unknown) {
    super(formatMemoryRecallError(originalError));
    this.originalError = originalError;
    this.name = "MemoryRecallEmbeddingError";
  }
}
|
||||
|
||||
export const testing = {
|
||||
runWithTimeout,
|
||||
} as const;
|
||||
@@ -676,6 +724,7 @@ export default definePluginEntry({
|
||||
const db = new MemoryDB(resolvedDbPath, vectorDim, cfg.storageOptions);
|
||||
const embeddings = createEmbeddings(api, cfg);
|
||||
const autoCaptureCursors = new Map<string, AutoCaptureCursor>();
|
||||
let memoryRecallCooldown: { until: number; error: string } | undefined;
|
||||
const resolveCurrentHookConfig = () => {
|
||||
const runtimePluginConfig = resolveLivePluginConfigObject(
|
||||
api.runtime.config?.current
|
||||
@@ -708,6 +757,22 @@ export default definePluginEntry({
|
||||
...asRecord(runtimePluginConfig),
|
||||
});
|
||||
};
|
||||
/**
 * Returns the recorded recall error while a cooldown is still in effect.
 * Once the cooldown deadline has been reached the stored entry is cleared
 * and `undefined` is returned.
 */
const readMemoryRecallCooldown = (): { error: string } | undefined => {
  const active = memoryRecallCooldown;
  if (active && active.until > Date.now()) {
    return { error: active.error };
  }
  // Either nothing was recorded or the deadline has passed: drop any stale entry.
  memoryRecallCooldown = undefined;
  return undefined;
};
|
||||
/**
 * Starts (or restarts) the recall cooldown, remembering `error` until
 * `DEFAULT_TOOL_RECALL_COOLDOWN_MS` milliseconds from now.
 */
const recordMemoryRecallCooldown = (error: string): void => {
  const until = Date.now() + DEFAULT_TOOL_RECALL_COOLDOWN_MS;
  memoryRecallCooldown = { until, error };
};
|
||||
|
||||
api.logger.info(`memory-lancedb: plugin registered (db: ${resolvedDbPath}, lazy init)`);
|
||||
api.registerMemoryCapability?.({
|
||||
@@ -739,10 +804,47 @@ export default definePluginEntry({
|
||||
const limit = readPositiveIntegerParam(rawParams, "limit") ?? 5;
|
||||
|
||||
const currentCfg = resolveCurrentHookConfig();
|
||||
const vector = await embeddings.embed(
|
||||
normalizeRecallQuery(query, currentCfg.recallMaxChars),
|
||||
);
|
||||
const results = await db.search(vector, limit, 0.1);
|
||||
const cooldown = readMemoryRecallCooldown();
|
||||
if (cooldown) {
|
||||
return buildMemoryRecallUnavailableResult(cooldown.error);
|
||||
}
|
||||
let recall: Awaited<ReturnType<typeof runWithTimeout<MemorySearchResult[]>>>;
|
||||
try {
|
||||
recall = await runWithTimeout({
|
||||
timeoutMs: DEFAULT_TOOL_RECALL_TIMEOUT_MS,
|
||||
task: async () => {
|
||||
let vector: number[];
|
||||
try {
|
||||
vector = await embeddings.embed(
|
||||
normalizeRecallQuery(query, currentCfg.recallMaxChars),
|
||||
{ timeoutMs: DEFAULT_TOOL_RECALL_TIMEOUT_MS },
|
||||
);
|
||||
} catch (error) {
|
||||
throw new MemoryRecallEmbeddingError(error);
|
||||
}
|
||||
return await db.search(vector, limit, 0.1);
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (!(error instanceof MemoryRecallEmbeddingError)) {
|
||||
throw error;
|
||||
}
|
||||
const message = formatMemoryRecallError(error.originalError);
|
||||
recordMemoryRecallCooldown(message);
|
||||
api.logger.warn?.(
|
||||
`memory-lancedb: memory_recall failed: ${message}; returning unavailable memory result`,
|
||||
);
|
||||
return buildMemoryRecallUnavailableResult(message);
|
||||
}
|
||||
if (recall.status === "timeout") {
|
||||
const message = `memory_recall timed out after ${Math.round(DEFAULT_TOOL_RECALL_TIMEOUT_MS / 1000)}s`;
|
||||
recordMemoryRecallCooldown(message);
|
||||
api.logger.warn?.(
|
||||
`memory-lancedb: memory_recall timed out after ${DEFAULT_TOOL_RECALL_TIMEOUT_MS}ms; returning unavailable memory result`,
|
||||
);
|
||||
return buildMemoryRecallUnavailableResult(message);
|
||||
}
|
||||
const results = recall.value;
|
||||
|
||||
if (results.length === 0) {
|
||||
return {
|
||||
|
||||
@@ -99,4 +99,24 @@ describe("OpenAI embedding provider", () => {
|
||||
dimensions: 512,
|
||||
});
|
||||
});
|
||||
|
||||
it("forwards custom provider ids to the remote embedding client", async () => {
  // A non-"openai" id must reach the remote client lookup unchanged.
  const customProviderId = "bailian-embedding";

  await createOpenAiEmbeddingProvider(createOptions({ provider: customProviderId }));

  const expectedLookup = expect.objectContaining({ provider: customProviderId });
  expect(mocks.resolveRemoteEmbeddingClient).toHaveBeenCalledWith(expectedLookup);
});
|
||||
|
||||
it("defaults the remote embedding client lookup to openai", async () => {
  // With no provider supplied, the lookup should be made under "openai".
  const optionsWithoutProvider = createOptions({ provider: undefined });

  await createOpenAiEmbeddingProvider(optionsWithoutProvider);

  const expectedLookup = expect.objectContaining({ provider: "openai" });
  expect(mocks.resolveRemoteEmbeddingClient).toHaveBeenCalledWith(expectedLookup);
});
|
||||
});
|
||||
|
||||
@@ -95,7 +95,7 @@ async function resolveOpenAiEmbeddingClient(
|
||||
options: MemoryEmbeddingProviderCreateOptions,
|
||||
): Promise<OpenAiEmbeddingClient> {
|
||||
const client = await resolveRemoteEmbeddingClient({
|
||||
provider: "openai",
|
||||
provider: options.provider ?? "openai",
|
||||
options,
|
||||
defaultBaseUrl: DEFAULT_OPENAI_BASE_URL,
|
||||
normalizeModel: normalizeOpenAiModel,
|
||||
|
||||
@@ -79,4 +79,36 @@ describe("OpenAI memory embedding adapter", () => {
|
||||
input_type: "document",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves the caller provider id for custom OpenAI-compatible embedding providers", async () => {
  const customProviderId = "bailian-embedding";
  const embeddingModel = "text-embedding-v3";

  const result = await openAiMemoryEmbeddingProviderAdapter.create({
    config: {} as never,
    provider: customProviderId,
    model: embeddingModel,
    fallback: "none",
  });

  // The custom id must be passed to the provider factory...
  const expectedFactoryOptions = expect.objectContaining({
    provider: customProviderId,
    fallback: "none",
    model: embeddingModel,
  });
  expect(mocks.createOpenAiEmbeddingProvider).toHaveBeenCalledWith(expectedFactoryOptions);
  // ...and also be reflected in the runtime cache key data.
  expect(result.runtime?.cacheKeyData?.provider).toBe(customProviderId);
});
|
||||
|
||||
it("defaults provider id to openai when the caller leaves it unset", async () => {
  // No `provider` key at all: the adapter is expected to fill in "openai".
  const optionsWithoutProvider = {
    config: {} as never,
    model: "text-embedding-3-small",
    fallback: "none",
  } as const;

  await openAiMemoryEmbeddingProviderAdapter.create(optionsWithoutProvider);

  const expectedFactoryOptions = expect.objectContaining({ provider: "openai" });
  expect(mocks.createOpenAiEmbeddingProvider).toHaveBeenCalledWith(expectedFactoryOptions);
});
|
||||
});
|
||||
|
||||
@@ -19,9 +19,10 @@ export const openAiMemoryEmbeddingProviderAdapter: MemoryEmbeddingProviderAdapte
|
||||
allowExplicitWhenConfiguredAuto: true,
|
||||
shouldContinueAutoSelection: isMissingEmbeddingApiKeyError,
|
||||
create: async (options) => {
|
||||
const resolvedProvider = options.provider ?? "openai";
|
||||
const { provider, client } = await createOpenAiEmbeddingProvider({
|
||||
...options,
|
||||
provider: "openai",
|
||||
provider: resolvedProvider,
|
||||
fallback: "none",
|
||||
});
|
||||
return {
|
||||
@@ -29,7 +30,7 @@ export const openAiMemoryEmbeddingProviderAdapter: MemoryEmbeddingProviderAdapte
|
||||
runtime: {
|
||||
id: "openai",
|
||||
cacheKeyData: {
|
||||
provider: "openai",
|
||||
provider: resolvedProvider,
|
||||
baseUrl: client.baseUrl,
|
||||
model: client.model,
|
||||
outputDimensionality: client.outputDimensionality,
|
||||
|
||||
Reference in New Issue
Block a user