fix(memory): retry transient FileProvider-backed reads (#85351)

This commit is contained in:
NianJiu
2026-06-02 03:40:20 +08:00
committed by GitHub
parent 193988bc5b
commit 5a55135146
15 changed files with 367 additions and 16 deletions

View File

@@ -1,2 +1,2 @@
bdcf661ec680f79819096950295bdb04805aac9639477058d8855f294f6d8034 plugin-sdk-api-baseline.json
6b8c92cc5a9277f90973370102fa31efb23ffd93008c3ed961d38e4a8a3073b0 plugin-sdk-api-baseline.jsonl
63d49032a9b4dc4874a0ca17be73ecc97a2df5d1f47b4e72db34868423370558 plugin-sdk-api-baseline.json
af79f7d711afa0a8563782b8f5cdd7e46b9aea245f5e7ebc464327a8969ed65e plugin-sdk-api-baseline.jsonl

View File

@@ -12,6 +12,7 @@ import {
chunkMarkdown,
hashText,
remapChunkLines,
retryTransientMemoryRead,
type MemoryChunk,
type MemorySource,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
@@ -755,7 +756,12 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
if ("kind" in entry && entry.kind === "multimodal") {
return;
}
const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
const content =
options.content ??
(await retryTransientMemoryRead(
() => fs.readFile(entry.absPath, "utf-8"),
`read memory markdown for indexing ${entry.absPath}`,
));
const chunks = filterNonEmptyMemoryChunks(chunkMarkdown(content, this.settings.chunking));
if (options.source === "sessions" && "lineMap" in entry) {
remapChunkLines(chunks, entry.lineMap);
@@ -785,7 +791,12 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
structuredInputBytes = multimodalChunk.structuredInputBytes;
chunks = [multimodalChunk.chunk];
} else {
const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
const content =
options.content ??
(await retryTransientMemoryRead(
() => fs.readFile(entry.absPath, "utf-8"),
`read memory markdown for indexing ${entry.absPath}`,
));
const baseChunks = filterNonEmptyMemoryChunks(chunkMarkdown(content, this.settings.chunking));
chunks = this.provider
? enforceEmbeddingMaxInputTokens(this.provider, baseChunks, EMBEDDING_BATCH_MAX_TOKENS)

View File

@@ -164,6 +164,41 @@ describe("session startup catch-up", () => {
expect(harness.syncCalls).toEqual([{ reason: "session-startup-catchup" }]);
});
it("retries transient session transcript reads during session indexing", async () => {
const session = await writeSessionFile("thread.jsonl");
const harness = new SessionStartupCatchupHarness([]);
const realOpen = fs.open;
let attempts = 0;
const openSpy = vi
.spyOn(fs, "open")
.mockImplementation(async (...args: Parameters<typeof realOpen>) => {
const [target, flags, mode] = args;
if (
typeof target === "string" &&
path.resolve(target) === session.filePath &&
attempts++ === 0
) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, open",
) as NodeJS.ErrnoException;
err.code = "UNKNOWN";
err.errno = -11;
throw err;
}
return await realOpen(target, flags, mode);
});
try {
await expect((harness as any).syncSessionFiles({ needsFullReindex: true })).resolves.toBe(
undefined,
);
expect(attempts).toBe(2);
} finally {
openSpy.mockRestore();
}
});
it("can mark startup catch-up files without scheduling background sync", async () => {
const session = await writeSessionFile("thread.jsonl");
const harness = new SessionStartupCatchupHarness([
@@ -197,4 +232,71 @@ describe("session startup catch-up", () => {
expect(harness.isSessionsDirty()).toBe(false);
expect(harness.syncCalls).toEqual([]);
});
it.each([
{
name: "read",
fileName: "delta-read.jsonl",
failOn: "read" as const,
code: "EWOULDBLOCK",
},
{
name: "open",
fileName: "delta-open.jsonl",
failOn: "open" as const,
code: "EAGAIN",
},
])("retries transient session transcript $name failures during delta updates", async (params) => {
const session = await writeSessionFile(params.fileName);
const harness = new SessionStartupCatchupHarness([]);
let attempts = 0;
const sessionBuffer = await fs.readFile(session.filePath);
const openSpy = vi
.spyOn(fs, "open")
.mockImplementation(async (...args: Parameters<typeof fs.open>) => {
const [target] = args;
if (
params.failOn === "open" &&
typeof target === "string" &&
path.resolve(target) === session.filePath &&
attempts++ === 0
) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, open",
) as NodeJS.ErrnoException;
err.code = params.code;
err.errno = -11;
throw err;
}
return {
read: async (buffer: Buffer, offset: number, length: number, position: number | null) => {
if (params.failOn === "read" && attempts++ === 0) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, read",
) as NodeJS.ErrnoException;
err.code = params.code;
err.errno = -11;
throw err;
}
const start = position ?? 0;
const chunk = sessionBuffer.subarray(start, start + length);
chunk.copy(buffer, offset);
return { bytesRead: chunk.length, buffer };
},
close: async () => {},
} as unknown as Awaited<ReturnType<typeof fs.open>>;
});
try {
const delta = await (harness as any).updateSessionDelta(session.filePath);
expect(delta).toMatchObject({
pendingBytes: session.size,
pendingMessages: 1,
});
expect(attempts).toBe(2);
} finally {
openSpy.mockRestore();
}
});
});

View File

@@ -29,6 +29,7 @@ import {
listMemoryFiles,
loadSqliteVecExtension,
normalizeExtraMemoryPaths,
retryTransientMemoryRead,
runWithConcurrency,
type MemorySource,
type MemorySyncProgressUpdate,
@@ -989,7 +990,10 @@ export abstract class MemoryManagerSyncOps {
}
let handle;
try {
handle = await fs.open(absPath, "r");
handle = await retryTransientMemoryRead(
() => fs.open(absPath, "r"),
`open session transcript for newline count ${absPath}`,
);
} catch (err) {
if (isFileMissingError(err)) {
return 0;
@@ -1002,7 +1006,10 @@ export abstract class MemoryManagerSyncOps {
const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES);
while (offset < end) {
const toRead = Math.min(buffer.length, end - offset);
const { bytesRead } = await handle.read(buffer, 0, toRead, offset);
const { bytesRead } = await retryTransientMemoryRead(
() => handle.read(buffer, 0, toRead, offset),
`count session transcript newlines ${absPath}`,
);
if (bytesRead <= 0) {
break;
}

View File

@@ -578,4 +578,51 @@ describe("compileMemoryWikiVault", () => {
fs.readFile(path.join(rootDir, "concepts", "gamma.md"), "utf8"),
).resolves.not.toContain("### Referenced By");
});
it("retries transient page reads during compile", async () => {
const { rootDir, config } = await createVault({
rootDir: nextCaseRoot(),
initialize: true,
});
const sourcePath = path.join(rootDir, "sources", "alpha.md");
await fs.writeFile(
sourcePath,
renderWikiMarkdown({
frontmatter: { pageType: "source", id: "source.alpha", title: "Alpha" },
body: "# Alpha\n",
}),
"utf8",
);
const realReadFile = fs.readFile;
let attempts = 0;
const readFileSpy = vi
.spyOn(fs, "readFile")
.mockImplementation(async (...args: Parameters<typeof realReadFile>) => {
const [target, options] = args;
if (
typeof target === "string" &&
path.resolve(target) === sourcePath &&
options === "utf8" &&
attempts++ === 0
) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, read",
) as NodeJS.ErrnoException;
err.code = "EDEADLK";
err.errno = -11;
throw err;
}
return await realReadFile(target, options as never);
});
try {
const result = await compileMemoryWikiVault(config);
expect(result.pageCounts.source).toBe(1);
expect(attempts).toBeGreaterThanOrEqual(2);
} finally {
readFileSpy.mockRestore();
}
});
});

View File

@@ -1,6 +1,7 @@
import fs from "node:fs/promises";
import path from "node:path";
import { runTasksWithConcurrency } from "openclaw/plugin-sdk/concurrency-runtime";
import { retryTransientMemoryRead } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import {
replaceManagedMarkdownBlock,
withTrailingNewline,
@@ -360,7 +361,10 @@ async function readPageSummaries(rootDir: string): Promise<WikiPageSummary[]> {
const readResult = await runTasksWithConcurrency({
tasks: filePaths.map((relativePath) => async () => {
const absolutePath = path.join(rootDir, relativePath);
const raw = await fs.readFile(absolutePath, "utf8");
const raw = await retryTransientMemoryRead(
() => fs.readFile(absolutePath, "utf8"),
`read wiki page ${absolutePath}`,
);
return toWikiPageSummary({ absolutePath, relativePath, raw });
}),
limit: READ_PAGE_SUMMARIES_CONCURRENCY,

View File

@@ -16,6 +16,7 @@ export {
type MemoryFileEntry,
} from "./host/internal.js";
export { readMemoryFile } from "./host/read-file.js";
export { isTransientMemoryReadError, retryTransientMemoryRead } from "./host/read-retry.js";
export {
buildMemoryReadResult,
buildMemoryReadResultFromSlice,

View File

@@ -1,4 +1,5 @@
import fsSync from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
@@ -163,6 +164,38 @@ describe("memory host SDK package internals", () => {
expect(imageEntry.contentText).toBe("Image file: diagram.png");
});
it("retries transient markdown reads while building file entries", async () => {
const tmpDir = getTmpDir();
const notePath = path.join(tmpDir, "note.md");
fsSync.writeFileSync(notePath, "hello", "utf-8");
const realOpen = fs.open;
let attempts = 0;
const openSpy = vi
.spyOn(fs, "open")
.mockImplementation(async (...args: Parameters<typeof realOpen>) => {
const [target, flags, mode] = args;
if (typeof target === "string" && path.resolve(target) === notePath && attempts++ === 0) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, open",
) as NodeJS.ErrnoException;
err.code = "UNKNOWN";
err.errno = -11;
throw err;
}
return await realOpen(target, flags, mode);
});
try {
const entry = expectFileEntry(await buildFileEntry(notePath, tmpDir));
expect(entry.path).toBe("note.md");
expect(entry.kind).toBe("markdown");
expect(attempts).toBe(2);
} finally {
openSpy.mockRestore();
}
});
it("builds multimodal chunks lazily and rejects changed files", async () => {
const tmpDir = getTmpDir();
const imagePath = path.join(tmpDir, "diagram.png");

View File

@@ -29,6 +29,7 @@ import {
resolveCanonicalRootMemoryFile,
shouldSkipRootMemoryAuxiliaryPath,
} from "./openclaw-runtime-memory.js";
import { retryTransientMemoryRead } from "./read-retry.js";
import { normalizeStringEntries, uniqueStrings } from "./string-utils.js";
export { hashText } from "./hash.js";
@@ -245,10 +246,14 @@ export async function buildFileEntry(
let buffer: Buffer;
try {
buffer = (
await readRegularFile({
filePath: absPath,
maxBytes: multimodalSettings.maxFileBytes,
})
await retryTransientMemoryRead(
() =>
readRegularFile({
filePath: absPath,
maxBytes: multimodalSettings.maxFileBytes,
}),
`read multimodal memory file ${absPath}`,
)
).buffer;
} catch (err) {
if (isFileMissingError(err)) {
@@ -285,7 +290,12 @@ export async function buildFileEntry(
}
let content: string;
try {
content = (await readRegularFile({ filePath: absPath })).buffer.toString("utf-8");
content = (
await retryTransientMemoryRead(
() => readRegularFile({ filePath: absPath }),
`read memory index file ${absPath}`,
)
).buffer.toString("utf-8");
} catch (err) {
if (isFileMissingError(err)) {
return null;
@@ -322,7 +332,12 @@ async function loadMultimodalEmbeddingInput(
}
let buffer: Buffer;
try {
buffer = (await readRegularFile({ filePath: entry.absPath, maxBytes: entry.size })).buffer;
buffer = (
await retryTransientMemoryRead(
() => readRegularFile({ filePath: entry.absPath, maxBytes: entry.size }),
`read multimodal indexing file ${entry.absPath}`,
)
).buffer;
} catch (err) {
if (isFileMissingError(err)) {
return null;

View File

@@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { readMemoryFile } from "./read-file.js";
async function createDirectorySymlink(target: string, linkPath: string): Promise<boolean> {
@@ -109,4 +109,52 @@ describe("readMemoryFile", () => {
await fs.rm(tmpRoot, { recursive: true, force: true });
}
});
it("retries transient read errors for workspace memory files", async () => {
const tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), "memory-read-file-"));
try {
const workspaceDir = path.join(tmpRoot, "workspace");
const relPath = "memory/retry.md";
const absPath = path.join(workspaceDir, relPath);
await fs.mkdir(path.dirname(absPath), { recursive: true });
await fs.writeFile(absPath, "alpha\nbeta", "utf-8");
const realOpen = fs.open;
let attempts = 0;
const openSpy = vi
.spyOn(fs, "open")
.mockImplementation(async (...args: Parameters<typeof realOpen>) => {
const [target, flags, mode] = args;
if (typeof target === "string" && path.resolve(target) === absPath && attempts++ === 0) {
const err = new Error(
"Unknown system error -11: Unknown system error -11, open",
) as NodeJS.ErrnoException;
err.code = "UNKNOWN";
err.errno = -11;
throw err;
}
return await realOpen(target, flags, mode);
});
try {
await expect(
readMemoryFile({
workspaceDir,
extraPaths: [],
relPath,
}),
).resolves.toEqual({
text: "alpha\nbeta",
path: relPath,
from: 1,
lines: 2,
});
expect(attempts).toBe(2);
} finally {
openSpy.mockRestore();
}
} finally {
await fs.rm(tmpRoot, { recursive: true, force: true });
}
});
});

View File

@@ -21,6 +21,7 @@ import {
DEFAULT_MEMORY_READ_LINES,
type MemoryReadResult,
} from "./read-file-shared.js";
import { retryTransientMemoryRead } from "./read-retry.js";
async function isAllowedAdditionalDirectoryPath(
additionalPath: string,
@@ -126,7 +127,12 @@ export async function readMemoryFile(params: {
}
let content: string;
try {
content = (await readRegularFile({ filePath: absPath })).buffer.toString("utf-8");
content = (
await retryTransientMemoryRead(
() => readRegularFile({ filePath: absPath }),
`read memory file ${absPath}`,
)
).buffer.toString("utf-8");
} catch (err) {
if (isFileDisappearedDuringReadError(err)) {
return { text: "", path: relPath };

View File

@@ -0,0 +1,25 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { retryTransientMemoryRead } from "./read-retry.js";
afterEach(() => {
vi.restoreAllMocks();
});
describe("retryTransientMemoryRead", () => {
it("uses a short two-retry budget for transient file read errors", async () => {
const err = new Error("Unknown system error -11: Unknown system error -11, read");
const run = vi.fn<() => Promise<string>>().mockRejectedValue(err);
const timeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((callback) => {
if (typeof callback === "function") {
callback();
}
return 0 as unknown as ReturnType<typeof setTimeout>;
});
await expect(retryTransientMemoryRead(run)).rejects.toThrow("Unknown system error -11");
expect(run).toHaveBeenCalledTimes(3);
expect(timeoutSpy).toHaveBeenNthCalledWith(1, expect.any(Function), 25);
expect(timeoutSpy).toHaveBeenNthCalledWith(2, expect.any(Function), 50);
});
});

View File

@@ -0,0 +1,44 @@
import { retryAsync } from "./retry-utils.js";
const TRANSIENT_MEMORY_READ_ERRNO = -11;
const TRANSIENT_MEMORY_READ_CODES = new Set(["EAGAIN", "EWOULDBLOCK", "EDEADLK"]);
const TRANSIENT_MEMORY_READ_MESSAGE = /Unknown system error -11\b/i;
function getErrno(error: unknown): number | undefined {
return typeof (error as NodeJS.ErrnoException | undefined)?.errno === "number"
? (error as NodeJS.ErrnoException).errno
: undefined;
}
function getCode(error: unknown): string | undefined {
return typeof (error as NodeJS.ErrnoException | undefined)?.code === "string"
? (error as NodeJS.ErrnoException).code
: undefined;
}
export function isTransientMemoryReadError(error: unknown): boolean {
const code = getCode(error);
if (code && TRANSIENT_MEMORY_READ_CODES.has(code)) {
return true;
}
const errno = getErrno(error);
if (errno === TRANSIENT_MEMORY_READ_ERRNO) {
return true;
}
return error instanceof Error && TRANSIENT_MEMORY_READ_MESSAGE.test(error.message);
}
export async function retryTransientMemoryRead<T>(
read: () => Promise<T>,
label = "memory read",
): Promise<T> {
return await retryAsync(read, {
attempts: 3,
minDelayMs: 25,
maxDelayMs: 50,
label,
shouldRetry: (error) => isTransientMemoryReadError(error),
});
}

View File

@@ -20,6 +20,7 @@ import {
stripInboundMetadata,
stripInternalRuntimeContext,
} from "./openclaw-runtime-session.js";
import { retryTransientMemoryRead } from "./read-retry.js";
const DREAMING_NARRATIVE_RUN_PREFIX = "dreaming-narrative-";
// Keep the historical one-line-per-message export shape for normal turns, but
@@ -565,7 +566,12 @@ export async function buildSessionEntry(
messageTimestampsMs: [],
};
}
const raw = (await readRegularFile({ filePath: absPath })).buffer.toString("utf-8");
const raw = (
await retryTransientMemoryRead(
() => readRegularFile({ filePath: absPath }),
`read session transcript ${absPath}`,
)
).buffer.toString("utf-8");
const collected: string[] = [];
const lineMap: number[] = [];
const messageTimestampsMs: number[] = [];

View File

@@ -13,11 +13,13 @@ export {
ensureMemoryIndexSchema,
hashText,
isFileMissingError,
isTransientMemoryReadError,
listMemoryFiles,
loadSqliteVecExtension,
normalizeExtraMemoryPaths,
parseEmbedding,
readMemoryFile,
retryTransientMemoryRead,
remapChunkLines,
requireNodeSqlite,
resolveMemoryBackendConfig,