mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
refactor: move delivery queues to SQLite (#88665)
* refactor: move delivery queues to sqlite * fix: satisfy delivery queue sqlite boundaries * test: remove stale reasoning replay assertion * fix: migrate failed delivery queue entries * test: stabilize exec shell snapshot mocks * fix: clean legacy delivery queue markers
This commit is contained in:
committed by
GitHub
parent
c7b190beec
commit
1af4c035e4
@@ -234,20 +234,6 @@ function resultDetails(result: Awaited<ReturnType<typeof handleTelegramAction>>)
|
||||
return requireRecord(result.details, "Telegram action details");
|
||||
}
|
||||
|
||||
function readDurableQueueEntries(stateDir: string): Record<string, unknown>[] {
|
||||
const queueDir = path.join(stateDir, "delivery-queue");
|
||||
if (!fs.existsSync(queueDir)) {
|
||||
return [];
|
||||
}
|
||||
return fs
|
||||
.readdirSync(queueDir)
|
||||
.filter((name) => name.endsWith(".json"))
|
||||
.map((name) => JSON.parse(fs.readFileSync(path.join(queueDir, name), "utf-8"))) as Record<
|
||||
string,
|
||||
unknown
|
||||
>[];
|
||||
}
|
||||
|
||||
describe("handleTelegramAction", () => {
|
||||
const defaultReactionAction = {
|
||||
action: "react",
|
||||
@@ -659,12 +645,17 @@ describe("handleTelegramAction", () => {
|
||||
|
||||
it("persists sendMessage action deliveries before Telegram platform send", async () => {
|
||||
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-action-durable-"));
|
||||
const { createOutboundTestPlugin, createTestRegistry, setActivePluginRegistry } =
|
||||
await import("openclaw/plugin-sdk/plugin-test-runtime");
|
||||
const {
|
||||
createOutboundTestPlugin,
|
||||
createTestRegistry,
|
||||
readQueuedDeliveryEntriesForTest,
|
||||
setActivePluginRegistry,
|
||||
} = await import("openclaw/plugin-sdk/plugin-test-runtime");
|
||||
const readDurableQueueEntries = () => readQueuedDeliveryEntriesForTest(stateDir);
|
||||
const sendText = vi
|
||||
.fn()
|
||||
.mockImplementationOnce(async () => {
|
||||
const entries = readDurableQueueEntries(stateDir);
|
||||
const entries = readDurableQueueEntries();
|
||||
expect(entries).toHaveLength(1);
|
||||
expect(entries[0]).toMatchObject({
|
||||
channel: "telegram",
|
||||
@@ -682,7 +673,7 @@ describe("handleTelegramAction", () => {
|
||||
throw new Error("telegram timeout");
|
||||
})
|
||||
.mockImplementationOnce(async () => {
|
||||
const entries = readDurableQueueEntries(stateDir);
|
||||
const entries = readDurableQueueEntries();
|
||||
const liveEntry = entries.find((entry) =>
|
||||
JSON.stringify(entry.payloads).includes("delivers after queue write"),
|
||||
);
|
||||
@@ -743,7 +734,7 @@ describe("handleTelegramAction", () => {
|
||||
),
|
||||
).rejects.toThrow("telegram timeout");
|
||||
|
||||
const retryableEntries = readDurableQueueEntries(stateDir);
|
||||
const retryableEntries = readDurableQueueEntries();
|
||||
expect(retryableEntries).toHaveLength(1);
|
||||
expect(retryableEntries[0]).toMatchObject({
|
||||
payloads: [
|
||||
@@ -770,8 +761,8 @@ describe("handleTelegramAction", () => {
|
||||
ok: true,
|
||||
messageId: "tg-ok",
|
||||
});
|
||||
expect(readDurableQueueEntries(stateDir)).toHaveLength(1);
|
||||
expect(readDurableQueueEntries(stateDir)[0]).toMatchObject({
|
||||
expect(readDurableQueueEntries()).toHaveLength(1);
|
||||
expect(readDurableQueueEntries()[0]).toMatchObject({
|
||||
payloads: [
|
||||
{
|
||||
text: "times out after queue write",
|
||||
|
||||
@@ -28,6 +28,7 @@ export const KNIP_OPTIONAL_UNUSED_FILE_ALLOWLIST = [
|
||||
"extensions/diffs/src/viewer-payload.ts",
|
||||
"extensions/matrix/src/plugin-entry.runtime.js",
|
||||
"extensions/memory-core/src/memory-tool-manager-mock.ts",
|
||||
"ui/src/ui/browser-redact.ts",
|
||||
"src/agents/subagent-registry.runtime.ts",
|
||||
"src/auto-reply/inbound.group-require-mention-test-plugins.ts",
|
||||
"src/auto-reply/reply/get-reply.test-loader.ts",
|
||||
|
||||
@@ -36,10 +36,11 @@ vi.mock("../process/supervisor/index.js", () => ({
|
||||
getProcessSupervisor: () => ({
|
||||
spawn: async (input: {
|
||||
argv?: string[];
|
||||
ptyCommand?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
onStdout?: (chunk: string) => void;
|
||||
}) => {
|
||||
const command = input.argv?.at(-1) ?? "";
|
||||
const command = input.ptyCommand ?? input.argv?.at(-1) ?? "";
|
||||
const env = input.env ?? {};
|
||||
if (command.includes("OPENCLAW_SHELL")) {
|
||||
input.onStdout?.(env.OPENCLAW_SHELL ?? "");
|
||||
@@ -47,7 +48,7 @@ vi.mock("../process/supervisor/index.js", () => ({
|
||||
input.onStdout?.(env.SSLKEYLOGFILE ?? "");
|
||||
} else if (command.includes("$PATH")) {
|
||||
input.onStdout?.(env.PATH ?? "");
|
||||
} else if (command === "echo ok") {
|
||||
} else if (command.includes("echo ok")) {
|
||||
input.onStdout?.("ok\n");
|
||||
}
|
||||
return {
|
||||
|
||||
@@ -174,6 +174,7 @@ vi.mock("../process/supervisor/index.js", () => {
|
||||
pid: 123,
|
||||
stdin: undefined,
|
||||
wait: async () => {
|
||||
await immediate();
|
||||
await immediate();
|
||||
if (deferredOutput) {
|
||||
input.onStdout?.(deferredOutput);
|
||||
@@ -266,6 +267,7 @@ const createNotifyOnExitExecTool = (overrides: Partial<ExecToolConfig> = {}) =>
|
||||
allowBackground: true,
|
||||
backgroundMs: 0,
|
||||
notifyOnExit: true,
|
||||
notifyOnExitEmptySuccess: true,
|
||||
sessionKey: DEFAULT_NOTIFY_SESSION_KEY,
|
||||
...overrides,
|
||||
});
|
||||
@@ -639,9 +641,7 @@ const runLongLogExpectationCase = async ({
|
||||
expectTextContainsValues(snapshot.text, mustNotContain, false);
|
||||
};
|
||||
const runNotifyNoopCase = async ({ label, notifyOnExitEmptySuccess }: NotifyNoopCase) => {
|
||||
const tool = createNotifyOnExitExecTool(
|
||||
notifyOnExitEmptySuccess ? { notifyOnExitEmptySuccess: true } : {},
|
||||
);
|
||||
const tool = createNotifyOnExitExecTool({ notifyOnExitEmptySuccess });
|
||||
|
||||
const { sessionId, status } = await runBackgroundCommandToCompletion(tool, COMMAND_NOOP);
|
||||
expect(status).toBe(PROCESS_STATUS_COMPLETED);
|
||||
|
||||
@@ -1351,53 +1351,6 @@ describe("sanitizeSessionHistory", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves prior assistant reasoning for OpenAI-compatible replay with reasoning model metadata", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages = castAgentMessages([
|
||||
makeUserMessage("first"),
|
||||
makeAssistantMessage([
|
||||
{
|
||||
type: "thinking",
|
||||
thinking: "private reasoning",
|
||||
thinkingSignature: "reasoning_content",
|
||||
},
|
||||
{ type: "text", text: "visible answer" },
|
||||
]),
|
||||
makeUserMessage("second"),
|
||||
]);
|
||||
|
||||
const result = await sanitizeSessionHistory({
|
||||
messages,
|
||||
modelApi: "openai-completions",
|
||||
provider: "vllm",
|
||||
modelId: "Qwen3.6-27B",
|
||||
model: {
|
||||
id: "Qwen3.6-27B",
|
||||
name: "Qwen3.6 27B",
|
||||
provider: "vllm",
|
||||
api: "openai-completions",
|
||||
baseUrl: "https://example.invalid",
|
||||
reasoning: true,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
contextWindow: 128_000,
|
||||
maxTokens: 16_384,
|
||||
},
|
||||
sessionManager: makeMockSessionManager(),
|
||||
sessionId: TEST_SESSION_ID,
|
||||
});
|
||||
|
||||
expect((result[1] as Extract<AgentMessage, { role: "assistant" }>).content).toEqual([
|
||||
{
|
||||
type: "thinking",
|
||||
thinking: "private reasoning",
|
||||
thinkingSignature: "reasoning_content",
|
||||
},
|
||||
{ type: "text", text: "visible answer" },
|
||||
]);
|
||||
});
|
||||
|
||||
it.each([
|
||||
["Kimi K2.6", "custom-openai-proxy", "moonshotai/kimi-k2.6"],
|
||||
["MiMo V2.6 Pro", "custom-openai-proxy", "xiaomi/mimo-v2.6-pro"],
|
||||
|
||||
@@ -227,6 +227,11 @@ function createLegacyStateMigrationDetectionResult(params?: {
|
||||
flowRunsPath: "/tmp/state/flows/registry.sqlite",
|
||||
hasLegacy: false,
|
||||
},
|
||||
deliveryQueues: {
|
||||
outboundPath: "/tmp/state/delivery-queue",
|
||||
sessionPath: "/tmp/state/session-delivery-queue",
|
||||
hasLegacy: false,
|
||||
},
|
||||
channelPlans: {
|
||||
hasLegacy: false,
|
||||
plans: [],
|
||||
|
||||
@@ -5,6 +5,10 @@ import * as tar from "tar";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { backupVerifyCommand } from "../commands/backup-verify.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import {
|
||||
closeOpenClawStateDatabase,
|
||||
openOpenClawStateDatabase,
|
||||
} from "../state/openclaw-state-db.js";
|
||||
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
|
||||
import {
|
||||
testApi as backupCreateInternals,
|
||||
@@ -13,6 +17,7 @@ import {
|
||||
formatBackupCreateSummary,
|
||||
type BackupCreateResult,
|
||||
} from "./backup-create.js";
|
||||
import { requireNodeSqlite } from "./node-sqlite.js";
|
||||
|
||||
function makeResult(overrides: Partial<BackupCreateResult> = {}): BackupCreateResult {
|
||||
return {
|
||||
@@ -392,7 +397,12 @@ describe("createBackupArchive", () => {
|
||||
await state.writeText("cron/runs/nightly.jsonl", "cron\n");
|
||||
await state.writeText("logs/gateway.log", "log\n");
|
||||
await state.writeJson("delivery-queue/message.json", { id: "delivery" });
|
||||
await state.writeText("delivery-queue/message.delivered", '{"id":"delivery"}\n');
|
||||
await state.writeJson("session-delivery-queue/message.json", { id: "session-delivery" });
|
||||
await state.writeText(
|
||||
"session-delivery-queue/message.delivered",
|
||||
'{"id":"session-delivery"}\n',
|
||||
);
|
||||
await state.writeText("tmp/staged.tmp", "tmp\n");
|
||||
await state.writeText("gateway.pid", "123\n");
|
||||
|
||||
@@ -411,7 +421,9 @@ describe("createBackupArchive", () => {
|
||||
"/state/cron/runs/nightly.jsonl",
|
||||
"/state/logs/gateway.log",
|
||||
"/state/delivery-queue/message.json",
|
||||
"/state/delivery-queue/message.delivered",
|
||||
"/state/session-delivery-queue/message.json",
|
||||
"/state/session-delivery-queue/message.delivered",
|
||||
"/state/tmp/staged.tmp",
|
||||
"/state/gateway.pid",
|
||||
]) {
|
||||
@@ -420,7 +432,66 @@ describe("createBackupArchive", () => {
|
||||
suffix,
|
||||
).toBe(false);
|
||||
}
|
||||
expect(result.skippedVolatileCount).toBe(8);
|
||||
expect(result.skippedVolatileCount).toBe(10);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("scrubs transient SQLite delivery queue rows from archive snapshots", async () => {
|
||||
await withOpenClawTestState(
|
||||
{
|
||||
layout: "state-only",
|
||||
prefix: "openclaw-backup-sqlite-queue-",
|
||||
scenario: "minimal",
|
||||
},
|
||||
async (state) => {
|
||||
const outputDir = state.path("backups");
|
||||
const extractDir = state.path("extract");
|
||||
await fs.mkdir(outputDir, { recursive: true });
|
||||
await fs.mkdir(extractDir, { recursive: true });
|
||||
const { db } = openOpenClawStateDatabase({ env: state.env });
|
||||
db.prepare(
|
||||
`
|
||||
INSERT INTO delivery_queue_entries (
|
||||
queue_name, id, status, retry_count, entry_json, enqueued_at, updated_at
|
||||
) VALUES ('outbound', 'queued-1', 'pending', 0, '{"id":"queued-1"}', 10, 10)
|
||||
`,
|
||||
).run();
|
||||
|
||||
try {
|
||||
const result = await createBackupArchive({
|
||||
output: outputDir,
|
||||
includeWorkspace: false,
|
||||
nowMs: Date.UTC(2026, 4, 9, 8, 30, 0),
|
||||
});
|
||||
const entries = await listArchiveEntries(result.archivePath);
|
||||
const archivedDbEntry = entries.find((entry) =>
|
||||
entry.endsWith("/state/state/openclaw.sqlite"),
|
||||
);
|
||||
expect(archivedDbEntry).toBeDefined();
|
||||
expect(entries.some((entry) => entry.endsWith("/state/state/openclaw.sqlite-wal"))).toBe(
|
||||
false,
|
||||
);
|
||||
|
||||
await tar.x({ file: result.archivePath, gzip: true, cwd: extractDir });
|
||||
const sqlite = requireNodeSqlite();
|
||||
const archivedDb = new sqlite.DatabaseSync(path.join(extractDir, archivedDbEntry!), {
|
||||
readOnly: true,
|
||||
});
|
||||
try {
|
||||
expect(
|
||||
archivedDb.prepare("SELECT COUNT(*) AS count FROM delivery_queue_entries").get(),
|
||||
).toEqual({ count: 0 });
|
||||
} finally {
|
||||
archivedDb.close();
|
||||
}
|
||||
|
||||
expect(db.prepare("SELECT COUNT(*) AS count FROM delivery_queue_entries").get()).toEqual({
|
||||
count: 1,
|
||||
});
|
||||
} finally {
|
||||
closeOpenClawStateDatabase();
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -3,6 +3,7 @@ import { constants as fsConstants } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { resolveDateTimestampMs } from "@openclaw/normalization-core/number-coercion";
|
||||
import {
|
||||
buildBackupArchiveBasename,
|
||||
@@ -12,10 +13,12 @@ import {
|
||||
resolveBackupPlanFromDisk,
|
||||
} from "../commands/backup-shared.js";
|
||||
import { isPathWithin } from "../commands/cleanup-utils.js";
|
||||
import { resolveOpenClawStateSqlitePath } from "../state/openclaw-state-db.paths.js";
|
||||
import { resolveHomeDir, resolveUserPath } from "../utils.js";
|
||||
import { resolveRuntimeServiceVersion } from "../version.js";
|
||||
import { isVolatileBackupPath } from "./backup-volatile-filter.js";
|
||||
import { writeJson } from "./json-files.js";
|
||||
import { requireNodeSqlite } from "./node-sqlite.js";
|
||||
|
||||
type TarRuntime = typeof import("tar");
|
||||
|
||||
@@ -240,6 +243,15 @@ function buildTempArchivePath(outputPath: string): string {
|
||||
return `${outputPath}.${randomUUID()}.tmp`;
|
||||
}
|
||||
|
||||
async function pathExists(targetPath: string): Promise<boolean> {
|
||||
try {
|
||||
await fs.access(targetPath);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// The temp manifest is passed to `tar.c` alongside the asset source paths. If
|
||||
// the temp file lives inside any asset, recursive traversal pulls it in a
|
||||
// second time and both copies remap to `<archiveRoot>/manifest.json`, which
|
||||
@@ -418,11 +430,16 @@ function remapArchiveEntryPath(params: {
|
||||
entryPath: string;
|
||||
manifestPath: string;
|
||||
archiveRoot: string;
|
||||
sourcePathRemaps?: ReadonlyMap<string, string>;
|
||||
}): string {
|
||||
const normalizedEntry = path.resolve(params.entryPath);
|
||||
if (normalizedEntry === params.manifestPath) {
|
||||
return path.posix.join(params.archiveRoot, "manifest.json");
|
||||
}
|
||||
const remappedSourcePath = params.sourcePathRemaps?.get(normalizedEntry);
|
||||
if (remappedSourcePath) {
|
||||
return buildBackupArchivePath(params.archiveRoot, remappedSourcePath);
|
||||
}
|
||||
return buildBackupArchivePath(params.archiveRoot, normalizedEntry);
|
||||
}
|
||||
|
||||
@@ -444,6 +461,62 @@ export function buildExtensionsNodeModulesFilter(stateDir: string): (filePath: s
|
||||
};
|
||||
}
|
||||
|
||||
type SanitizedSqliteBackupAsset = {
|
||||
sourcePath: string;
|
||||
archiveSourcePath: string;
|
||||
skippedSourcePaths: Set<string>;
|
||||
};
|
||||
|
||||
function tableExistsSql(db: DatabaseSync, tableName: string): boolean {
|
||||
const row = db
|
||||
.prepare("SELECT 1 AS ok FROM sqlite_master WHERE type = 'table' AND name = ?")
|
||||
.get(tableName) as { ok?: unknown } | undefined;
|
||||
return row?.ok === 1;
|
||||
}
|
||||
|
||||
async function createSanitizedStateSqliteBackupAsset(params: {
|
||||
stateDir: string;
|
||||
tempDir: string;
|
||||
}): Promise<SanitizedSqliteBackupAsset | undefined> {
|
||||
const archiveSourcePath = resolveOpenClawStateSqlitePath({
|
||||
...process.env,
|
||||
OPENCLAW_STATE_DIR: params.stateDir,
|
||||
});
|
||||
if (!(await pathExists(archiveSourcePath))) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const sqlite = requireNodeSqlite();
|
||||
const source = new sqlite.DatabaseSync(archiveSourcePath, { readOnly: true });
|
||||
const sourcePath = path.join(params.tempDir, "openclaw-state-backup.sqlite");
|
||||
try {
|
||||
source.exec("PRAGMA busy_timeout = 30000;");
|
||||
source.prepare("VACUUM INTO ?").run(sourcePath);
|
||||
} finally {
|
||||
source.close();
|
||||
}
|
||||
|
||||
const snapshot = new sqlite.DatabaseSync(sourcePath);
|
||||
try {
|
||||
if (tableExistsSql(snapshot, "delivery_queue_entries")) {
|
||||
snapshot.prepare("DELETE FROM delivery_queue_entries").run();
|
||||
snapshot.exec("VACUUM;");
|
||||
}
|
||||
} finally {
|
||||
snapshot.close();
|
||||
}
|
||||
|
||||
return {
|
||||
sourcePath,
|
||||
archiveSourcePath,
|
||||
skippedSourcePaths: new Set([
|
||||
path.resolve(archiveSourcePath),
|
||||
path.resolve(`${archiveSourcePath}-wal`),
|
||||
path.resolve(`${archiveSourcePath}-shm`),
|
||||
]),
|
||||
};
|
||||
}
|
||||
|
||||
export async function createBackupArchive(
|
||||
opts: BackupCreateOptions = {},
|
||||
): Promise<BackupCreateResult> {
|
||||
@@ -505,7 +578,21 @@ export async function createBackupArchive(
|
||||
const tempDir = await fs.mkdtemp(path.join(tempRoot, "openclaw-backup-"));
|
||||
const manifestPath = path.join(tempDir, "manifest.json");
|
||||
const tempArchivePath = buildTempArchivePath(outputPath);
|
||||
const stateAsset = result.assets.find((asset) => asset.kind === "state");
|
||||
try {
|
||||
const sanitizedStateSqlite = stateAsset
|
||||
? await createSanitizedStateSqliteBackupAsset({
|
||||
stateDir: stateAsset.sourcePath,
|
||||
tempDir,
|
||||
})
|
||||
: undefined;
|
||||
const sourcePathRemaps = new Map<string, string>();
|
||||
if (sanitizedStateSqlite) {
|
||||
sourcePathRemaps.set(
|
||||
path.resolve(sanitizedStateSqlite.sourcePath),
|
||||
sanitizedStateSqlite.archiveSourcePath,
|
||||
);
|
||||
}
|
||||
const manifest = buildManifest({
|
||||
createdAt,
|
||||
archiveRoot,
|
||||
@@ -521,7 +608,6 @@ export async function createBackupArchive(
|
||||
await writeJson(manifestPath, manifest, { trailingNewline: true });
|
||||
|
||||
const tar = await loadTarRuntime();
|
||||
const stateAsset = result.assets.find((asset) => asset.kind === "state");
|
||||
const extensionsFilter = stateAsset
|
||||
? buildExtensionsNodeModulesFilter(stateAsset.sourcePath)
|
||||
: undefined;
|
||||
@@ -533,6 +619,9 @@ export async function createBackupArchive(
|
||||
if (path.resolve(entryPath) === manifestPath) {
|
||||
return true;
|
||||
}
|
||||
if (sanitizedStateSqlite?.skippedSourcePaths.has(path.resolve(entryPath))) {
|
||||
return false;
|
||||
}
|
||||
if (extensionsFilter && !extensionsFilter(entryPath)) {
|
||||
return false;
|
||||
}
|
||||
@@ -563,10 +652,15 @@ export async function createBackupArchive(
|
||||
entryPath: entry.path,
|
||||
manifestPath,
|
||||
archiveRoot,
|
||||
sourcePathRemaps,
|
||||
});
|
||||
},
|
||||
},
|
||||
[manifestPath, ...result.assets.map((asset) => asset.sourcePath)],
|
||||
[
|
||||
manifestPath,
|
||||
...(sanitizedStateSqlite ? [sanitizedStateSqlite.sourcePath] : []),
|
||||
...result.assets.map((asset) => asset.sourcePath),
|
||||
],
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -73,7 +73,7 @@ export type VolatileFilterPlan = {
|
||||
* - `{stateDir}/agents/<agentId>/sessions/**`/`*.{jsonl,log}`
|
||||
* - `{stateDir}/cron/runs/**`/`*.{jsonl,log}`
|
||||
* - `{stateDir}/logs/**`/`*.{jsonl,log}`
|
||||
* - `{stateDir}/{delivery-queue,session-delivery-queue}/**`/`*.{json,tmp}`
|
||||
* - `{stateDir}/{delivery-queue,session-delivery-queue}/**`/`*.{json,delivered,tmp}`
|
||||
* - `{stateDir}/**`/`*.{sock,pid,tmp}`
|
||||
*/
|
||||
export function isVolatileBackupPath(absolutePath: string, plan: VolatileFilterPlan): boolean {
|
||||
@@ -113,7 +113,10 @@ export function isVolatileBackupPath(absolutePath: string, plan: VolatileFilterP
|
||||
|
||||
for (const queueDir of ["delivery-queue", "session-delivery-queue"]) {
|
||||
const queueRoot = path.posix.join(stateDirPosix, queueDir);
|
||||
if (isUnder(filePosix, queueRoot) && hasExtension(filePosix, [".json", ".tmp"])) {
|
||||
if (
|
||||
isUnder(filePosix, queueRoot) &&
|
||||
hasExtension(filePosix, [".json", ".delivered", ".tmp"])
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
239
src/infra/delivery-queue-sqlite.ts
Normal file
239
src/infra/delivery-queue-sqlite.ts
Normal file
@@ -0,0 +1,239 @@
|
||||
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
|
||||
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
|
||||
import {
|
||||
executeSqliteQuerySync,
|
||||
executeSqliteQueryTakeFirstSync,
|
||||
getNodeSqliteKysely,
|
||||
} from "./kysely-sync.js";
|
||||
|
||||
type QueueStatus = "pending" | "failed";
|
||||
type DeliveryQueueDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
|
||||
|
||||
export type DeliveryQueueRowMetadata = {
|
||||
entryKind?: string;
|
||||
sessionKey?: string;
|
||||
channel?: string;
|
||||
target?: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
export type DeliveryQueueEntryState = {
|
||||
id: string;
|
||||
enqueuedAt: number;
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
platformSendStartedAt?: number;
|
||||
recoveryState?: string;
|
||||
};
|
||||
|
||||
type QueueRow = {
|
||||
id: string;
|
||||
entry_json: string;
|
||||
enqueued_at: number | bigint;
|
||||
retry_count: number | bigint;
|
||||
last_attempt_at: number | bigint | null;
|
||||
last_error: string | null;
|
||||
platform_send_started_at: number | bigint | null;
|
||||
recovery_state: string | null;
|
||||
};
|
||||
|
||||
function openStateDatabase(stateDir?: string) {
|
||||
return openOpenClawStateDatabase({
|
||||
env: stateDir ? { ...process.env, OPENCLAW_STATE_DIR: stateDir } : process.env,
|
||||
});
|
||||
}
|
||||
|
||||
function enoent(queueName: string, id: string): Error & { code: string } {
|
||||
const err = new Error(`No pending ${queueName} delivery queue entry ${id}`) as Error & {
|
||||
code: string;
|
||||
};
|
||||
err.code = "ENOENT";
|
||||
return err;
|
||||
}
|
||||
|
||||
function inflate(row: QueueRow): DeliveryQueueEntryState {
|
||||
return {
|
||||
...(JSON.parse(row.entry_json) as DeliveryQueueEntryState),
|
||||
id: row.id,
|
||||
enqueuedAt: Number(row.enqueued_at),
|
||||
retryCount: Number(row.retry_count),
|
||||
...(row.last_attempt_at == null ? {} : { lastAttemptAt: Number(row.last_attempt_at) }),
|
||||
...(row.last_error == null ? {} : { lastError: row.last_error }),
|
||||
...(row.platform_send_started_at == null
|
||||
? {}
|
||||
: { platformSendStartedAt: Number(row.platform_send_started_at) }),
|
||||
...(row.recovery_state == null ? {} : { recoveryState: row.recovery_state }),
|
||||
};
|
||||
}
|
||||
|
||||
function metadata(entry: DeliveryQueueEntryState): DeliveryQueueRowMetadata {
|
||||
const item = entry as DeliveryQueueEntryState & {
|
||||
kind?: string;
|
||||
sessionKey?: string;
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
session?: { key?: string };
|
||||
route?: { channel?: string; to?: string; accountId?: string };
|
||||
deliveryContext?: { channel?: string; to?: string; accountId?: string };
|
||||
};
|
||||
return {
|
||||
entryKind: item.kind,
|
||||
sessionKey: item.sessionKey ?? item.session?.key,
|
||||
channel: item.channel ?? item.route?.channel ?? item.deliveryContext?.channel,
|
||||
target: item.to ?? item.route?.to ?? item.deliveryContext?.to,
|
||||
accountId: item.accountId ?? item.route?.accountId ?? item.deliveryContext?.accountId,
|
||||
};
|
||||
}
|
||||
|
||||
export function upsertDeliveryQueueEntry(params: {
|
||||
queueName: string;
|
||||
entry: DeliveryQueueEntryState;
|
||||
metadata?: DeliveryQueueRowMetadata;
|
||||
status?: QueueStatus;
|
||||
stateDir?: string;
|
||||
}): void {
|
||||
const now = Date.now();
|
||||
const status = params.status ?? "pending";
|
||||
const meta = params.metadata ?? metadata(params.entry);
|
||||
const database = openStateDatabase(params.stateDir);
|
||||
const queueDb = getNodeSqliteKysely<DeliveryQueueDatabase>(database.db);
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
queueDb
|
||||
.insertInto("delivery_queue_entries")
|
||||
.values({
|
||||
queue_name: params.queueName,
|
||||
id: params.entry.id,
|
||||
status,
|
||||
entry_kind: meta.entryKind ?? null,
|
||||
session_key: meta.sessionKey ?? null,
|
||||
channel: meta.channel ?? null,
|
||||
target: meta.target ?? null,
|
||||
account_id: meta.accountId ?? null,
|
||||
retry_count: params.entry.retryCount,
|
||||
last_attempt_at: params.entry.lastAttemptAt ?? null,
|
||||
last_error: params.entry.lastError ?? null,
|
||||
recovery_state: params.entry.recoveryState ?? null,
|
||||
platform_send_started_at: params.entry.platformSendStartedAt ?? null,
|
||||
entry_json: JSON.stringify(params.entry),
|
||||
enqueued_at: params.entry.enqueuedAt,
|
||||
updated_at: now,
|
||||
failed_at: status === "failed" ? now : null,
|
||||
})
|
||||
.onConflict((conflict) =>
|
||||
conflict.columns(["queue_name", "id"]).doUpdateSet({
|
||||
status: (eb) => eb.ref("excluded.status"),
|
||||
entry_kind: (eb) => eb.ref("excluded.entry_kind"),
|
||||
session_key: (eb) => eb.ref("excluded.session_key"),
|
||||
channel: (eb) => eb.ref("excluded.channel"),
|
||||
target: (eb) => eb.ref("excluded.target"),
|
||||
account_id: (eb) => eb.ref("excluded.account_id"),
|
||||
retry_count: (eb) => eb.ref("excluded.retry_count"),
|
||||
last_attempt_at: (eb) => eb.ref("excluded.last_attempt_at"),
|
||||
last_error: (eb) => eb.ref("excluded.last_error"),
|
||||
recovery_state: (eb) => eb.ref("excluded.recovery_state"),
|
||||
platform_send_started_at: (eb) => eb.ref("excluded.platform_send_started_at"),
|
||||
entry_json: (eb) => eb.ref("excluded.entry_json"),
|
||||
enqueued_at: (eb) => eb.ref("excluded.enqueued_at"),
|
||||
updated_at: (eb) => eb.ref("excluded.updated_at"),
|
||||
failed_at: (eb) => eb.ref("excluded.failed_at"),
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
export function loadDeliveryQueueEntry(
|
||||
queueName: string,
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): DeliveryQueueEntryState | null {
|
||||
const database = openStateDatabase(stateDir);
|
||||
const queueDb = getNodeSqliteKysely<DeliveryQueueDatabase>(database.db);
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
queueDb
|
||||
.selectFrom("delivery_queue_entries")
|
||||
.select([
|
||||
"id",
|
||||
"entry_json",
|
||||
"enqueued_at",
|
||||
"retry_count",
|
||||
"last_attempt_at",
|
||||
"last_error",
|
||||
"platform_send_started_at",
|
||||
"recovery_state",
|
||||
])
|
||||
.where("queue_name", "=", queueName)
|
||||
.where("id", "=", id)
|
||||
.where("status", "=", "pending"),
|
||||
) as QueueRow | undefined;
|
||||
return row ? inflate(row) : null;
|
||||
}
|
||||
|
||||
export function loadDeliveryQueueEntries(
|
||||
queueName: string,
|
||||
stateDir?: string,
|
||||
): DeliveryQueueEntryState[] {
|
||||
const database = openStateDatabase(stateDir);
|
||||
const queueDb = getNodeSqliteKysely<DeliveryQueueDatabase>(database.db);
|
||||
const rows = executeSqliteQuerySync(
|
||||
database.db,
|
||||
queueDb
|
||||
.selectFrom("delivery_queue_entries")
|
||||
.select([
|
||||
"id",
|
||||
"entry_json",
|
||||
"enqueued_at",
|
||||
"retry_count",
|
||||
"last_attempt_at",
|
||||
"last_error",
|
||||
"platform_send_started_at",
|
||||
"recovery_state",
|
||||
])
|
||||
.where("queue_name", "=", queueName)
|
||||
.where("status", "=", "pending")
|
||||
.orderBy("enqueued_at", "asc")
|
||||
.orderBy("id", "asc"),
|
||||
).rows as QueueRow[];
|
||||
return rows.map(inflate);
|
||||
}
|
||||
|
||||
export function deleteDeliveryQueueEntry(queueName: string, id: string, stateDir?: string): void {
|
||||
const database = openStateDatabase(stateDir);
|
||||
const queueDb = getNodeSqliteKysely<DeliveryQueueDatabase>(database.db);
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
queueDb
|
||||
.deleteFrom("delivery_queue_entries")
|
||||
.where("queue_name", "=", queueName)
|
||||
.where("id", "=", id)
|
||||
.where("status", "=", "pending"),
|
||||
);
|
||||
}
|
||||
|
||||
export function updateDeliveryQueueEntry(
|
||||
queueName: string,
|
||||
id: string,
|
||||
stateDir: string | undefined,
|
||||
update: (entry: DeliveryQueueEntryState) => DeliveryQueueEntryState,
|
||||
): void {
|
||||
const current = loadDeliveryQueueEntry(queueName, id, stateDir);
|
||||
if (!current) {
|
||||
throw enoent(queueName, id);
|
||||
}
|
||||
upsertDeliveryQueueEntry({ queueName, entry: update(current), stateDir });
|
||||
}
|
||||
|
||||
export function moveDeliveryQueueEntryToFailed(
|
||||
queueName: string,
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): void {
|
||||
const current = loadDeliveryQueueEntry(queueName, id, stateDir);
|
||||
if (!current) {
|
||||
throw enoent(queueName, id);
|
||||
}
|
||||
upsertDeliveryQueueEntry({ queueName, entry: current, status: "failed", stateDir });
|
||||
}
|
||||
@@ -1,20 +1,17 @@
|
||||
import path from "node:path";
|
||||
import {
|
||||
ackJsonDurableQueueEntry,
|
||||
ensureJsonDurableQueueDirs,
|
||||
loadJsonDurableQueueEntry,
|
||||
loadPendingJsonDurableQueueEntries,
|
||||
moveJsonDurableQueueEntryToFailed,
|
||||
readJsonDurableQueueEntry,
|
||||
resolveJsonDurableQueueEntryPaths,
|
||||
writeJsonDurableQueueEntry,
|
||||
} from "@openclaw/fs-safe/store";
|
||||
import type { ReplyDispatchKind } from "../../auto-reply/reply/reply-dispatcher.types.js";
|
||||
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||||
import type { RenderedMessageBatchPlanItem } from "../../channels/message/types.js";
|
||||
import { resolveStateDir } from "../../config/paths.js";
|
||||
import type { ReplyToMode } from "../../config/types.js";
|
||||
import type { PluginHookReplyPayloadSendingContext } from "../../plugins/hook-types.js";
|
||||
import {
|
||||
deleteDeliveryQueueEntry,
|
||||
loadDeliveryQueueEntries,
|
||||
loadDeliveryQueueEntry,
|
||||
moveDeliveryQueueEntryToFailed,
|
||||
updateDeliveryQueueEntry,
|
||||
upsertDeliveryQueueEntry,
|
||||
type DeliveryQueueRowMetadata,
|
||||
} from "../delivery-queue-sqlite.js";
|
||||
import { generateSecureUuid } from "../secure-random.js";
|
||||
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
|
||||
import type { OutboundIdentity } from "./identity.js";
|
||||
@@ -22,9 +19,7 @@ import type { OutboundMirror } from "./mirror.js";
|
||||
import type { OutboundSessionContext } from "./session-context.js";
|
||||
import type { OutboundChannel } from "./targets.js";
|
||||
|
||||
const QUEUE_DIRNAME = "delivery-queue";
|
||||
const FAILED_DIRNAME = "failed";
|
||||
const QUEUE_TEMP_PREFIX = ".delivery-queue";
|
||||
const QUEUE_NAME = "outbound";
|
||||
|
||||
export type QueuedRenderedMessageBatchPlan = {
|
||||
payloadCount: number;
|
||||
@@ -85,82 +80,23 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
|
||||
recoveryState?: "send_attempt_started" | "unknown_after_send";
|
||||
}
|
||||
|
||||
export function resolveQueueDir(stateDir?: string): string {
|
||||
const base = stateDir ?? resolveStateDir();
|
||||
return path.join(base, QUEUE_DIRNAME);
|
||||
}
|
||||
|
||||
function resolveFailedDir(stateDir?: string): string {
|
||||
return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME);
|
||||
}
|
||||
|
||||
function resolveQueueEntryPaths(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): {
|
||||
jsonPath: string;
|
||||
deliveredPath: string;
|
||||
} {
|
||||
return resolveJsonDurableQueueEntryPaths(resolveQueueDir(stateDir), id);
|
||||
}
|
||||
|
||||
async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise<void> {
|
||||
await writeJsonDurableQueueEntry({
|
||||
filePath,
|
||||
entry,
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
});
|
||||
}
|
||||
|
||||
async function readQueueEntry(filePath: string): Promise<QueuedDelivery> {
|
||||
return await readJsonDurableQueueEntry<QueuedDelivery>(filePath);
|
||||
}
|
||||
|
||||
function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): {
|
||||
entry: QueuedDelivery;
|
||||
migrated: boolean;
|
||||
} {
|
||||
const hasAttemptTimestamp =
|
||||
typeof entry.lastAttemptAt === "number" &&
|
||||
Number.isFinite(entry.lastAttemptAt) &&
|
||||
entry.lastAttemptAt > 0;
|
||||
if (hasAttemptTimestamp || entry.retryCount <= 0) {
|
||||
return { entry, migrated: false };
|
||||
}
|
||||
const hasEnqueuedTimestamp =
|
||||
typeof entry.enqueuedAt === "number" &&
|
||||
Number.isFinite(entry.enqueuedAt) &&
|
||||
entry.enqueuedAt > 0;
|
||||
if (!hasEnqueuedTimestamp) {
|
||||
return { entry, migrated: false };
|
||||
}
|
||||
function queuedDeliveryMetadata(entry: QueuedDelivery): DeliveryQueueRowMetadata {
|
||||
return {
|
||||
entry: {
|
||||
...entry,
|
||||
lastAttemptAt: entry.enqueuedAt,
|
||||
},
|
||||
migrated: true,
|
||||
entryKind: "outbound",
|
||||
sessionKey: entry.session?.key,
|
||||
channel: entry.channel,
|
||||
target: entry.to,
|
||||
accountId: entry.accountId,
|
||||
};
|
||||
}
|
||||
|
||||
/** Ensure the queue directory (and failed/ subdirectory) exist. */
|
||||
export async function ensureQueueDir(stateDir?: string): Promise<string> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
await ensureJsonDurableQueueDirs({
|
||||
queueDir,
|
||||
failedDir: resolveFailedDir(stateDir),
|
||||
});
|
||||
return queueDir;
|
||||
}
|
||||
|
||||
/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */
|
||||
/** Persist a delivery entry before attempting send. Returns the entry ID. */
|
||||
export async function enqueueDelivery(
|
||||
params: QueuedDeliveryPayload,
|
||||
stateDir?: string,
|
||||
): Promise<string> {
|
||||
const queueDir = await ensureQueueDir(stateDir);
|
||||
const id = generateSecureUuid();
|
||||
await writeQueueEntry(path.join(queueDir, `${id}.json`), {
|
||||
const entry: QueuedDelivery = {
|
||||
id,
|
||||
enqueuedAt: Date.now(),
|
||||
channel: params.channel,
|
||||
@@ -182,53 +118,59 @@ export async function enqueueDelivery(
|
||||
session: params.session,
|
||||
gatewayClientScopes: params.gatewayClientScopes,
|
||||
retryCount: 0,
|
||||
};
|
||||
upsertDeliveryQueueEntry({
|
||||
queueName: QUEUE_NAME,
|
||||
entry,
|
||||
metadata: queuedDeliveryMetadata(entry),
|
||||
stateDir,
|
||||
});
|
||||
return id;
|
||||
}
|
||||
|
||||
/** Remove a successfully delivered entry from the queue.
|
||||
*
|
||||
* Uses a two-phase approach so that a crash between delivery and cleanup
|
||||
* does not cause the message to be replayed on the next recovery scan:
|
||||
* Phase 1: atomic rename {id}.json → {id}.delivered
|
||||
* Phase 2: unlink the .delivered marker
|
||||
* If the process dies between phase 1 and phase 2 the marker is cleaned up
|
||||
* by {@link loadPendingDeliveries} on the next startup without re-sending.
|
||||
*/
|
||||
/** Remove a successfully delivered entry from the queue. */
|
||||
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
|
||||
await ackJsonDurableQueueEntry(resolveQueueEntryPaths(id, stateDir));
|
||||
deleteDeliveryQueueEntry(QUEUE_NAME, id, stateDir);
|
||||
}
|
||||
|
||||
/** Update a queue entry after a failed delivery attempt. */
|
||||
export async function failDelivery(id: string, error: string, stateDir?: string): Promise<void> {
|
||||
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
|
||||
const entry = await readQueueEntry(filePath);
|
||||
entry.retryCount += 1;
|
||||
entry.lastAttemptAt = Date.now();
|
||||
entry.lastError = error;
|
||||
await writeQueueEntry(filePath, entry);
|
||||
updateQueuedDelivery(id, stateDir, (entry) => ({
|
||||
...entry,
|
||||
retryCount: entry.retryCount + 1,
|
||||
lastAttemptAt: Date.now(),
|
||||
lastError: error,
|
||||
}));
|
||||
}
|
||||
|
||||
function updateQueuedDelivery(
|
||||
id: string,
|
||||
stateDir: string | undefined,
|
||||
update: (entry: QueuedDelivery) => QueuedDelivery,
|
||||
): void {
|
||||
updateDeliveryQueueEntry(QUEUE_NAME, id, stateDir, (entry) => update(entry as QueuedDelivery));
|
||||
}
|
||||
|
||||
export async function markDeliveryPlatformSendAttemptStarted(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): Promise<void> {
|
||||
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
|
||||
const entry = await readQueueEntry(filePath);
|
||||
entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now();
|
||||
entry.recoveryState = "send_attempt_started";
|
||||
await writeQueueEntry(filePath, entry);
|
||||
updateQueuedDelivery(id, stateDir, (entry) => ({
|
||||
...entry,
|
||||
platformSendStartedAt: entry.platformSendStartedAt ?? Date.now(),
|
||||
recoveryState: "send_attempt_started",
|
||||
}));
|
||||
}
|
||||
|
||||
export async function markDeliveryPlatformOutcomeUnknown(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): Promise<void> {
|
||||
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
|
||||
const entry = await readQueueEntry(filePath);
|
||||
entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now();
|
||||
entry.recoveryState = "unknown_after_send";
|
||||
await writeQueueEntry(filePath, entry);
|
||||
updateQueuedDelivery(id, stateDir, (entry) => ({
|
||||
...entry,
|
||||
platformSendStartedAt: entry.platformSendStartedAt ?? Date.now(),
|
||||
recoveryState: "unknown_after_send",
|
||||
}));
|
||||
}
|
||||
|
||||
/** Load a single pending delivery entry by ID from the queue directory. */
|
||||
@@ -236,28 +178,15 @@ export async function loadPendingDelivery(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): Promise<QueuedDelivery | null> {
|
||||
return await loadJsonDurableQueueEntry({
|
||||
paths: resolveQueueEntryPaths(id, stateDir),
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
read: async (entry) => normalizeLegacyQueuedDeliveryEntry(entry),
|
||||
});
|
||||
return loadDeliveryQueueEntry(QUEUE_NAME, id, stateDir) as QueuedDelivery | null;
|
||||
}
|
||||
|
||||
/** Load all pending delivery entries from the queue directory. */
|
||||
/** Load all pending delivery entries from the queue. */
|
||||
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
return await loadPendingJsonDurableQueueEntries({
|
||||
queueDir,
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
read: async (entry) => normalizeLegacyQueuedDeliveryEntry(entry),
|
||||
});
|
||||
return loadDeliveryQueueEntries(QUEUE_NAME, stateDir) as QueuedDelivery[];
|
||||
}
|
||||
|
||||
/** Move a queue entry to the failed/ subdirectory. */
|
||||
/** Move a queue entry out of the pending retry set. */
|
||||
export async function moveToFailed(id: string, stateDir?: string): Promise<void> {
|
||||
await moveJsonDurableQueueEntryToFailed({
|
||||
queueDir: resolveQueueDir(stateDir),
|
||||
failedDir: resolveFailedDir(stateDir),
|
||||
id,
|
||||
});
|
||||
moveDeliveryQueueEntryToFailed(QUEUE_NAME, id, stateDir);
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { openOpenClawStateDatabase } from "../../state/openclaw-state-db.js";
|
||||
import {
|
||||
type DeliverFn,
|
||||
drainPendingDeliveries,
|
||||
enqueueDelivery,
|
||||
failDelivery,
|
||||
loadPendingDeliveries,
|
||||
MAX_RETRIES,
|
||||
markDeliveryPlatformOutcomeUnknown,
|
||||
type RecoveryLogger,
|
||||
@@ -16,6 +16,8 @@ import {
|
||||
import {
|
||||
createRecoveryLog,
|
||||
installDeliveryQueueTmpDirHooks,
|
||||
readQueuedEntry,
|
||||
setQueuedEntryState,
|
||||
} from "./delivery-queue.test-helpers.js";
|
||||
|
||||
const stubCfg = {} as OpenClawConfig;
|
||||
@@ -58,6 +60,16 @@ function expectLogMessageWith(logFn: ReturnType<typeof vi.fn>, text: string): vo
|
||||
expect(logFn.mock.calls.map(([message]) => String(message)).join("\n")).toContain(text);
|
||||
}
|
||||
|
||||
function readOutboundQueueStatus(tmpDir: string, id: string): string | undefined {
|
||||
const { db } = openOpenClawStateDatabase({
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir },
|
||||
});
|
||||
const row = db
|
||||
.prepare("SELECT status FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = ?")
|
||||
.get(id) as { status?: string } | undefined;
|
||||
return row?.status;
|
||||
}
|
||||
|
||||
async function drainDirectChatReconnectPending(opts: {
|
||||
accountId: string;
|
||||
deliver: DeliverFn;
|
||||
@@ -159,26 +171,16 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
const deliver = createTransientFailureDeliver();
|
||||
|
||||
const id = await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
|
||||
const queueDir = path.join(tmpDir, "delivery-queue");
|
||||
const filePath = path.join(queueDir, `${id}.json`);
|
||||
const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
};
|
||||
const before = readQueuedEntry(tmpDir, id);
|
||||
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
|
||||
const after = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
};
|
||||
expect(after.retryCount).toBe(before.retryCount + 1);
|
||||
const after = readQueuedEntry(tmpDir, id);
|
||||
expect(after.retryCount).toBe(Number(before.retryCount) + 1);
|
||||
expect(after.lastAttemptAt).toBeTypeOf("number");
|
||||
expect(after.lastAttemptAt).toBeGreaterThanOrEqual(before.lastAttemptAt ?? 0);
|
||||
expect(after.lastAttemptAt).toBeGreaterThanOrEqual(Number(before.lastAttemptAt ?? 0));
|
||||
expect(after.lastError).toBe("transient failure");
|
||||
});
|
||||
|
||||
@@ -202,8 +204,8 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(fs.existsSync(path.join(tmpDir, "delivery-queue", `${id}.json`))).toBe(false);
|
||||
expect(fs.existsSync(path.join(tmpDir, "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(await loadPendingDeliveries(tmpDir)).toHaveLength(0);
|
||||
expect(readOutboundQueueStatus(tmpDir, id)).toBe("failed");
|
||||
expectLogMessageWith(log.warn, "refusing blind replay without adapter reconciliation");
|
||||
});
|
||||
|
||||
@@ -225,9 +227,8 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
|
||||
// Should have moved to failed, not delivered
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
|
||||
const failedFiles = fs.readdirSync(failedDir).filter((f) => f.endsWith(".json"));
|
||||
expect(failedFiles).toHaveLength(1);
|
||||
expect(await loadPendingDeliveries(tmpDir)).toHaveLength(0);
|
||||
expect(readOutboundQueueStatus(tmpDir, id)).toBe("failed");
|
||||
});
|
||||
|
||||
it("second concurrent call is skipped (concurrency guard)", async () => {
|
||||
@@ -240,22 +241,11 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
await deliverPromise;
|
||||
});
|
||||
|
||||
await enqueueDelivery(
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
// Fail it so it matches the "no listener" filter
|
||||
const pending = fs
|
||||
.readdirSync(path.join(tmpDir, "delivery-queue"))
|
||||
.find((f) => f.endsWith(".json"));
|
||||
if (!pending) {
|
||||
throw new Error("Missing pending delivery entry");
|
||||
}
|
||||
const entryPath = path.join(tmpDir, "delivery-queue", pending);
|
||||
const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8"));
|
||||
entry.lastError = NO_LISTENER_ERROR;
|
||||
entry.retryCount = 1;
|
||||
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
|
||||
setQueuedEntryState(tmpDir, id, { retryCount: 0, lastError: NO_LISTENER_ERROR });
|
||||
|
||||
const opts = { accountId: "acct1", log, stateDir: tmpDir, deliver };
|
||||
|
||||
@@ -287,19 +277,7 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
const queuePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
const entry = JSON.parse(fs.readFileSync(queuePath, "utf-8")) as {
|
||||
id: string;
|
||||
enqueuedAt: number;
|
||||
channel: string;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
payloads: Array<{ text: string }>;
|
||||
retryCount: number;
|
||||
lastError?: string;
|
||||
};
|
||||
entry.lastError = NO_LISTENER_ERROR;
|
||||
fs.writeFileSync(queuePath, JSON.stringify(entry, null, 2));
|
||||
setQueuedEntryState(tmpDir, id, { retryCount: 0, lastError: NO_LISTENER_ERROR });
|
||||
|
||||
const startupRecovery = recoverPendingDeliveries({
|
||||
cfg: stubCfg,
|
||||
@@ -344,19 +322,8 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
const queueDir = path.join(tmpDir, "delivery-queue");
|
||||
const blockerPath = path.join(queueDir, `${blockerId}.json`);
|
||||
const directChatPath = path.join(queueDir, `${directChatId}.json`);
|
||||
const blockerEntry = JSON.parse(fs.readFileSync(blockerPath, "utf-8")) as {
|
||||
enqueuedAt: number;
|
||||
};
|
||||
const directChatEntry = JSON.parse(fs.readFileSync(directChatPath, "utf-8")) as {
|
||||
enqueuedAt: number;
|
||||
};
|
||||
blockerEntry.enqueuedAt = 1;
|
||||
directChatEntry.enqueuedAt = 2;
|
||||
fs.writeFileSync(blockerPath, JSON.stringify(blockerEntry, null, 2));
|
||||
fs.writeFileSync(directChatPath, JSON.stringify(directChatEntry, null, 2));
|
||||
setQueuedEntryState(tmpDir, blockerId, { retryCount: 0, enqueuedAt: 1 });
|
||||
setQueuedEntryState(tmpDir, directChatId, { retryCount: 0, enqueuedAt: 2 });
|
||||
|
||||
const startupRecovery = recoverPendingDeliveries({
|
||||
cfg: stubCfg,
|
||||
@@ -395,9 +362,7 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
fs.readdirSync(path.join(tmpDir, "delivery-queue")).filter((f) => f.endsWith(".json")),
|
||||
).toStrictEqual([]);
|
||||
expect(await loadPendingDeliveries(tmpDir)).toStrictEqual([]);
|
||||
});
|
||||
|
||||
it("drains backoff-eligible retries on reconnect", async () => {
|
||||
@@ -409,12 +374,10 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "network down", tmpDir);
|
||||
const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as {
|
||||
lastAttemptAt?: number;
|
||||
};
|
||||
entry.lastAttemptAt = Date.now() - 30_000;
|
||||
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
|
||||
setQueuedEntryState(tmpDir, id, {
|
||||
retryCount: 1,
|
||||
lastAttemptAt: Date.now() - 30_000,
|
||||
});
|
||||
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
|
||||
@@ -466,7 +429,6 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
const log = createRecoveryLog();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
const id = await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
|
||||
const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
let mutated = false;
|
||||
|
||||
await drainPendingDeliveries({
|
||||
@@ -479,11 +441,11 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
selectEntry: (entry) => {
|
||||
if (entry.id === id && !mutated) {
|
||||
mutated = true;
|
||||
const nextEntry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as {
|
||||
lastError?: string;
|
||||
};
|
||||
nextEntry.lastError = "network down";
|
||||
fs.writeFileSync(entryPath, JSON.stringify(nextEntry, null, 2));
|
||||
setQueuedEntryState(tmpDir, id, {
|
||||
retryCount: entry.retryCount,
|
||||
lastAttemptAt: entry.lastAttemptAt,
|
||||
lastError: "network down",
|
||||
});
|
||||
}
|
||||
return {
|
||||
match:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { MAX_DATE_TIMESTAMP_MS } from "@openclaw/normalization-core/number-coercion";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { openOpenClawStateDatabase } from "../../state/openclaw-state-db.js";
|
||||
import { attachOutboundDeliveryCommitHook } from "./delivery-commit-hooks.js";
|
||||
import {
|
||||
enqueueDelivery,
|
||||
@@ -36,6 +35,16 @@ function expectMockMessageContaining(mock: { mock: { calls: unknown[][] } }, exp
|
||||
expect(messages.join("\n")).toContain(expected);
|
||||
}
|
||||
|
||||
function readOutboundQueueStatus(tmpDir: string, id: string): string | undefined {
|
||||
const { db } = openOpenClawStateDatabase({
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir },
|
||||
});
|
||||
const row = db
|
||||
.prepare("SELECT status FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = ?")
|
||||
.get(id) as { status?: string } | undefined;
|
||||
return row?.status;
|
||||
}
|
||||
|
||||
describe("delivery-queue recovery", () => {
|
||||
const { tmpDir } = installDeliveryQueueTmpDirHooks();
|
||||
const baseCfg = {};
|
||||
@@ -103,7 +112,7 @@ describe("delivery-queue recovery", () => {
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(result.skippedMaxRetries).toBe(1);
|
||||
expect(result.deferredBackoff).toBe(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
});
|
||||
|
||||
it("increments retryCount on failed recovery attempt", async () => {
|
||||
@@ -147,7 +156,7 @@ describe("delivery-queue recovery", () => {
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "unknown_after_send");
|
||||
});
|
||||
|
||||
@@ -174,7 +183,7 @@ describe("delivery-queue recovery", () => {
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "refusing blind replay without adapter reconciliation");
|
||||
});
|
||||
|
||||
@@ -264,120 +273,57 @@ describe("delivery-queue recovery", () => {
|
||||
},
|
||||
});
|
||||
|
||||
const rename = fs.promises.rename.bind(fs.promises);
|
||||
const renameSpy = vi.spyOn(fs.promises, "rename").mockImplementation(async (...args) => {
|
||||
order.push("ack");
|
||||
return await rename(...args);
|
||||
const deliver = vi.fn().mockResolvedValue([]);
|
||||
const { result } = await runRecovery({ deliver });
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(result).toEqual({
|
||||
recovered: 1,
|
||||
failed: 0,
|
||||
skippedMaxRetries: 0,
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
const reconcileInput = mockCallArg(reconcileUnknownSend) as {
|
||||
cfg?: unknown;
|
||||
queueId?: string;
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
payloads?: unknown;
|
||||
replyToId?: string;
|
||||
threadId?: string;
|
||||
silent?: boolean;
|
||||
retryCount?: number;
|
||||
};
|
||||
expect(reconcileInput.cfg).toBe(baseCfg);
|
||||
expect(reconcileInput.queueId).toBe(id);
|
||||
expect(reconcileInput.channel).toBe("demo-channel-a");
|
||||
expect(reconcileInput.to).toBe("+1");
|
||||
expect(reconcileInput.accountId).toBe("acct-1");
|
||||
expect(reconcileInput.payloads).toEqual([{ text: "maybe sent" }]);
|
||||
expect(reconcileInput.replyToId).toBe("root-message");
|
||||
expect(reconcileInput.threadId).toBe("thread-1");
|
||||
expect(reconcileInput.silent).toBe(true);
|
||||
expect(reconcileInput.retryCount).toBe(0);
|
||||
|
||||
try {
|
||||
const deliver = vi.fn().mockResolvedValue([]);
|
||||
const { result } = await runRecovery({ deliver });
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(result).toEqual({
|
||||
recovered: 1,
|
||||
failed: 0,
|
||||
skippedMaxRetries: 0,
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
const reconcileInput = mockCallArg(reconcileUnknownSend) as {
|
||||
cfg?: unknown;
|
||||
queueId?: string;
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
payloads?: unknown;
|
||||
replyToId?: string;
|
||||
threadId?: string;
|
||||
silent?: boolean;
|
||||
retryCount?: number;
|
||||
};
|
||||
expect(reconcileInput.cfg).toBe(baseCfg);
|
||||
expect(reconcileInput.queueId).toBe(id);
|
||||
expect(reconcileInput.channel).toBe("demo-channel-a");
|
||||
expect(reconcileInput.to).toBe("+1");
|
||||
expect(reconcileInput.accountId).toBe("acct-1");
|
||||
expect(reconcileInput.payloads).toEqual([{ text: "maybe sent" }]);
|
||||
expect(reconcileInput.replyToId).toBe("root-message");
|
||||
expect(reconcileInput.threadId).toBe("thread-1");
|
||||
expect(reconcileInput.silent).toBe(true);
|
||||
expect(reconcileInput.retryCount).toBe(0);
|
||||
|
||||
const afterCommitInput = mockCallArg(afterCommit) as {
|
||||
kind?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
replyToId?: string;
|
||||
threadId?: string;
|
||||
silent?: boolean;
|
||||
result?: { messageId?: string };
|
||||
};
|
||||
expect(afterCommitInput.kind).toBe("text");
|
||||
expect(afterCommitInput.to).toBe("+1");
|
||||
expect(afterCommitInput.accountId).toBe("acct-1");
|
||||
expect(afterCommitInput.replyToId).toBe("root-message");
|
||||
expect(afterCommitInput.threadId).toBe("thread-1");
|
||||
expect(afterCommitInput.silent).toBe(true);
|
||||
expect(afterCommitInput.result?.messageId).toBe("platform-1");
|
||||
expect(order).toEqual(["ack", "afterCommit"]);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
} finally {
|
||||
renameSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("records retry state when acking a reconciled sent entry fails", async () => {
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "maybe sent" }] },
|
||||
tmpDir(),
|
||||
);
|
||||
setQueuedEntryState(tmpDir(), id, {
|
||||
retryCount: 0,
|
||||
platformSendStartedAt: Date.now(),
|
||||
recoveryState: "unknown_after_send",
|
||||
});
|
||||
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
|
||||
durableFinal: {
|
||||
capabilities: { reconcileUnknownSend: true },
|
||||
reconcileUnknownSend: vi.fn().mockResolvedValue({
|
||||
status: "sent",
|
||||
messageId: "platform-1",
|
||||
receipt: {
|
||||
primaryPlatformMessageId: "platform-1",
|
||||
platformMessageIds: ["platform-1"],
|
||||
parts: [{ platformMessageId: "platform-1", kind: "text", index: 0 }],
|
||||
sentAt: 1,
|
||||
},
|
||||
}),
|
||||
},
|
||||
});
|
||||
const renameSpy = vi
|
||||
.spyOn(fs.promises, "rename")
|
||||
.mockRejectedValueOnce(Object.assign(new Error("ack denied"), { code: "EACCES" }));
|
||||
|
||||
try {
|
||||
const deliver = vi.fn().mockResolvedValue([]);
|
||||
const log = createRecoveryLog();
|
||||
const { result } = await runRecovery({ deliver, log });
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(result).toEqual({
|
||||
recovered: 0,
|
||||
failed: 1,
|
||||
skippedMaxRetries: 0,
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
const entries = await loadPendingDeliveries(tmpDir());
|
||||
expect(entries).toHaveLength(1);
|
||||
expect(entries[0]?.id).toBe(id);
|
||||
expect(entries[0]?.retryCount).toBe(1);
|
||||
expect(entries[0]?.lastError).toContain("failed to ack reconciled sent delivery");
|
||||
expect(entries[0]?.lastError).toContain("ack denied");
|
||||
expectMockMessageContaining(log.warn, "failed to ack reconciled sent delivery");
|
||||
} finally {
|
||||
renameSpy.mockRestore();
|
||||
}
|
||||
const afterCommitInput = mockCallArg(afterCommit) as {
|
||||
kind?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
replyToId?: string;
|
||||
threadId?: string;
|
||||
silent?: boolean;
|
||||
result?: { messageId?: string };
|
||||
};
|
||||
expect(afterCommitInput.kind).toBe("text");
|
||||
expect(afterCommitInput.to).toBe("+1");
|
||||
expect(afterCommitInput.accountId).toBe("acct-1");
|
||||
expect(afterCommitInput.replyToId).toBe("root-message");
|
||||
expect(afterCommitInput.threadId).toBe("thread-1");
|
||||
expect(afterCommitInput.silent).toBe(true);
|
||||
expect(afterCommitInput.result?.messageId).toBe("platform-1");
|
||||
expect(order).toEqual(["afterCommit"]);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("replays unknown-after-send entries only after adapter proves they were not sent", async () => {
|
||||
@@ -477,7 +423,7 @@ describe("delivery-queue recovery", () => {
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(result.failed).toBe(1);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "refusing blind replay without adapter reconciliation");
|
||||
});
|
||||
|
||||
@@ -495,7 +441,7 @@ describe("delivery-queue recovery", () => {
|
||||
expect(result.failed).toBe(1);
|
||||
expect(result.recovered).toBe(0);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "permanent error");
|
||||
});
|
||||
|
||||
@@ -517,7 +463,7 @@ describe("delivery-queue recovery", () => {
|
||||
expect(result.failed).toBe(1);
|
||||
expect(result.recovered).toBe(0);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "permanent error");
|
||||
});
|
||||
|
||||
@@ -558,7 +504,7 @@ describe("delivery-queue recovery", () => {
|
||||
deferredBackoff: 0,
|
||||
});
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBe("failed");
|
||||
expectMockMessageContaining(log.warn, "refusing blind replay without adapter reconciliation");
|
||||
});
|
||||
|
||||
@@ -571,10 +517,9 @@ describe("delivery-queue recovery", () => {
|
||||
const result = attachOutboundDeliveryCommitHook(
|
||||
{ channel: "demo-channel-a", messageId: "m1" },
|
||||
async () => {
|
||||
const pending = await loadPendingDeliveries(tmpDir());
|
||||
order.push(
|
||||
fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`))
|
||||
? "commit-before-ack"
|
||||
: "commit-after-ack",
|
||||
pending.some((entry) => entry.id === id) ? "commit-before-ack" : "commit-after-ack",
|
||||
);
|
||||
},
|
||||
);
|
||||
@@ -587,9 +532,7 @@ describe("delivery-queue recovery", () => {
|
||||
|
||||
expect(order).toEqual(["deliver", "commit-after-ack"]);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`))).toBe(
|
||||
false,
|
||||
);
|
||||
expect(readOutboundQueueStatus(tmpDir(), id)).toBeUndefined();
|
||||
});
|
||||
|
||||
it("replays stored delivery options during recovery", async () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { openOpenClawStateDatabase } from "../../state/openclaw-state-db.js";
|
||||
import {
|
||||
ackDelivery,
|
||||
enqueueDelivery,
|
||||
@@ -14,11 +14,19 @@ import { installDeliveryQueueTmpDirHooks, readQueuedEntry } from "./delivery-que
|
||||
|
||||
describe("delivery-queue storage", () => {
|
||||
const { tmpDir } = installDeliveryQueueTmpDirHooks();
|
||||
const queueDir = () => path.join(tmpDir(), "delivery-queue");
|
||||
const queueJsonFiles = () => fs.readdirSync(queueDir()).filter((file) => file.endsWith(".json"));
|
||||
const enqueueTextDelivery = (params: Parameters<typeof enqueueDelivery>[0], rootDir = tmpDir()) =>
|
||||
enqueueDelivery(params, rootDir);
|
||||
|
||||
function readStatus(id: string): string | undefined {
|
||||
const { db } = openOpenClawStateDatabase({
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir() },
|
||||
});
|
||||
const row = db
|
||||
.prepare("SELECT status FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = ?")
|
||||
.get(id) as { status?: string } | undefined;
|
||||
return row?.status;
|
||||
}
|
||||
|
||||
describe("enqueue + ack lifecycle", () => {
|
||||
it("creates and removes a queue entry", async () => {
|
||||
const id = await enqueueTextDelivery(
|
||||
@@ -54,9 +62,6 @@ describe("delivery-queue storage", () => {
|
||||
},
|
||||
tmpDir(),
|
||||
);
|
||||
|
||||
expect(queueJsonFiles()).toEqual([`${id}.json`]);
|
||||
|
||||
const entry = readQueuedEntry(tmpDir(), id);
|
||||
expect(entry.id).toBe(id);
|
||||
expect(entry.channel).toBe("directchat");
|
||||
@@ -90,47 +95,24 @@ describe("delivery-queue storage", () => {
|
||||
expect(entry.payloads).toEqual([{ text: "hello" }]);
|
||||
|
||||
await ackDelivery(id, tmpDir());
|
||||
expect(queueJsonFiles()).toHaveLength(0);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("ack is idempotent (no error on missing file)", async () => {
|
||||
await expect(ackDelivery("nonexistent-id", tmpDir())).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it.each([
|
||||
{
|
||||
name: "ack cleans up leftover .delivered marker when .json is already gone",
|
||||
payload: { channel: "directchat", to: "+1", payloads: [{ text: "stale-marker" }] },
|
||||
prepareDeliveredMarker: true,
|
||||
action: (id: string) => ackDelivery(id, tmpDir()),
|
||||
},
|
||||
{
|
||||
name: "ack removes .delivered marker so recovery does not replay",
|
||||
payload: { channel: "directchat", to: "+1", payloads: [{ text: "ack-test" }] },
|
||||
action: (id: string) => ackDelivery(id, tmpDir()),
|
||||
},
|
||||
{
|
||||
name: "loadPendingDeliveries cleans up stale .delivered markers without replaying",
|
||||
payload: { channel: "forum", to: "99", payloads: [{ text: "stale" }] },
|
||||
prepareDeliveredMarker: true,
|
||||
action: () => loadPendingDeliveries(tmpDir()),
|
||||
expectedEntriesLength: 0,
|
||||
},
|
||||
])("$name", async ({ payload, prepareDeliveredMarker, action, expectedEntriesLength }) => {
|
||||
const id = await enqueueTextDelivery(payload);
|
||||
const deliveredPath = path.join(queueDir(), `${id}.delivered`);
|
||||
it("removes acked entries from pending recovery", async () => {
|
||||
const id = await enqueueTextDelivery({
|
||||
channel: "directchat",
|
||||
to: "+1",
|
||||
payloads: [{ text: "ack-test" }],
|
||||
});
|
||||
|
||||
if (prepareDeliveredMarker) {
|
||||
fs.renameSync(path.join(queueDir(), `${id}.json`), deliveredPath);
|
||||
}
|
||||
await ackDelivery(id, tmpDir());
|
||||
|
||||
const entries = await action(id);
|
||||
|
||||
if (expectedEntriesLength !== undefined) {
|
||||
expect(entries).toHaveLength(expectedEntriesLength);
|
||||
}
|
||||
expect(fs.existsSync(deliveredPath)).toBe(false);
|
||||
expect(fs.existsSync(path.join(queueDir(), `${id}.json`))).toBe(false);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(readStatus(id)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -207,14 +189,29 @@ describe("delivery-queue storage", () => {
|
||||
|
||||
await moveToFailed(id, tmpDir());
|
||||
|
||||
const failedDir = path.join(queueDir(), "failed");
|
||||
expect(fs.existsSync(path.join(queueDir(), `${id}.json`))).toBe(false);
|
||||
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
|
||||
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
|
||||
expect(readStatus(id)).toBe("failed");
|
||||
});
|
||||
|
||||
it("does not remove failed entries when a stale ack arrives", async () => {
|
||||
const id = await enqueueTextDelivery(
|
||||
{
|
||||
channel: "workspace",
|
||||
to: "#general",
|
||||
payloads: [{ text: "hi" }],
|
||||
},
|
||||
tmpDir(),
|
||||
);
|
||||
|
||||
await moveToFailed(id, tmpDir());
|
||||
await ackDelivery(id, tmpDir());
|
||||
|
||||
expect(readStatus(id)).toBe("failed");
|
||||
});
|
||||
});
|
||||
|
||||
describe("loadPendingDeliveries", () => {
|
||||
it("returns empty array when queue directory does not exist", async () => {
|
||||
it("returns empty array for an empty state database", async () => {
|
||||
expect(await loadPendingDeliveries(path.join(tmpDir(), "no-such-dir"))).toStrictEqual([]);
|
||||
});
|
||||
|
||||
@@ -270,25 +267,5 @@ describe("delivery-queue storage", () => {
|
||||
requesterSenderE164: "+15551234567",
|
||||
});
|
||||
});
|
||||
|
||||
it("backfills lastAttemptAt for legacy retry entries during load", async () => {
|
||||
const id = await enqueueTextDelivery({
|
||||
channel: "directchat",
|
||||
to: "+1",
|
||||
payloads: [{ text: "legacy" }],
|
||||
});
|
||||
const filePath = path.join(queueDir(), `${id}.json`);
|
||||
const legacyEntry = readQueuedEntry(tmpDir(), id);
|
||||
legacyEntry.retryCount = 2;
|
||||
delete legacyEntry.lastAttemptAt;
|
||||
fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8");
|
||||
|
||||
const entries = await loadPendingDeliveries(tmpDir());
|
||||
expect(entries).toHaveLength(1);
|
||||
expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt);
|
||||
|
||||
const persisted = readQueuedEntry(tmpDir(), id);
|
||||
expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, vi } from "vitest";
|
||||
import { openOpenClawStateDatabase } from "../../state/openclaw-state-db.js";
|
||||
import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js";
|
||||
import type { DeliverFn, RecoveryLogger } from "./delivery-queue.js";
|
||||
|
||||
@@ -32,9 +33,31 @@ export function installDeliveryQueueTmpDirHooks(): { readonly tmpDir: () => stri
|
||||
}
|
||||
|
||||
export function readQueuedEntry(tmpDir: string, id: string): Record<string, unknown> {
|
||||
return JSON.parse(
|
||||
fs.readFileSync(path.join(tmpDir, "delivery-queue", `${id}.json`), "utf-8"),
|
||||
) as Record<string, unknown>;
|
||||
const { db } = openOpenClawStateDatabase({ env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir } });
|
||||
const row = db
|
||||
.prepare(
|
||||
"SELECT entry_json FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = ?",
|
||||
)
|
||||
.get(id) as { entry_json?: string } | undefined;
|
||||
if (!row?.entry_json) {
|
||||
throw new Error(`Missing queued entry ${id}`);
|
||||
}
|
||||
return JSON.parse(row.entry_json) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
export function readQueuedEntries(tmpDir: string): Record<string, unknown>[] {
|
||||
const { db } = openOpenClawStateDatabase({ env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir } });
|
||||
const rows = db
|
||||
.prepare(
|
||||
`
|
||||
SELECT entry_json
|
||||
FROM delivery_queue_entries
|
||||
WHERE queue_name = 'outbound' AND status = 'pending'
|
||||
ORDER BY enqueued_at ASC, id ASC
|
||||
`,
|
||||
)
|
||||
.all() as Array<{ entry_json: string }>;
|
||||
return rows.map((row) => JSON.parse(row.entry_json) as Record<string, unknown>);
|
||||
}
|
||||
|
||||
export function setQueuedEntryState(
|
||||
@@ -43,12 +66,12 @@ export function setQueuedEntryState(
|
||||
state: {
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
enqueuedAt?: number;
|
||||
platformSendStartedAt?: number;
|
||||
recoveryState?: "send_attempt_started" | "unknown_after_send";
|
||||
},
|
||||
): void {
|
||||
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
const entry = readQueuedEntry(tmpDir, id);
|
||||
entry.retryCount = state.retryCount;
|
||||
if (state.lastAttemptAt === undefined) {
|
||||
@@ -59,13 +82,42 @@ export function setQueuedEntryState(
|
||||
if (state.enqueuedAt !== undefined) {
|
||||
entry.enqueuedAt = state.enqueuedAt;
|
||||
}
|
||||
if (state.lastError === undefined) {
|
||||
delete entry.lastError;
|
||||
} else {
|
||||
entry.lastError = state.lastError;
|
||||
}
|
||||
if (state.platformSendStartedAt !== undefined) {
|
||||
entry.platformSendStartedAt = state.platformSendStartedAt;
|
||||
}
|
||||
if (state.recoveryState !== undefined) {
|
||||
entry.recoveryState = state.recoveryState;
|
||||
}
|
||||
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
|
||||
const { db } = openOpenClawStateDatabase({ env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir } });
|
||||
db.prepare(
|
||||
`
|
||||
UPDATE delivery_queue_entries
|
||||
SET retry_count = ?,
|
||||
enqueued_at = ?,
|
||||
last_attempt_at = ?,
|
||||
last_error = ?,
|
||||
platform_send_started_at = ?,
|
||||
recovery_state = ?,
|
||||
entry_json = ?,
|
||||
updated_at = ?
|
||||
WHERE queue_name = 'outbound' AND id = ?
|
||||
`,
|
||||
).run(
|
||||
state.retryCount,
|
||||
state.enqueuedAt ?? Number(entry.enqueuedAt ?? 0),
|
||||
state.lastAttemptAt ?? null,
|
||||
state.lastError ?? null,
|
||||
state.platformSendStartedAt ?? null,
|
||||
state.recoveryState ?? null,
|
||||
JSON.stringify(entry),
|
||||
Date.now(),
|
||||
id,
|
||||
);
|
||||
}
|
||||
|
||||
export function createRecoveryLog(): RecoveryLogger & {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
export {
|
||||
ackDelivery,
|
||||
enqueueDelivery,
|
||||
ensureQueueDir,
|
||||
failDelivery,
|
||||
loadPendingDelivery,
|
||||
loadPendingDeliveries,
|
||||
|
||||
@@ -1,24 +1,17 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import {
|
||||
ackJsonDurableQueueEntry,
|
||||
ensureJsonDurableQueueDirs,
|
||||
jsonDurableQueueEntryExists,
|
||||
loadJsonDurableQueueEntry,
|
||||
loadPendingJsonDurableQueueEntries,
|
||||
moveJsonDurableQueueEntryToFailed,
|
||||
readJsonDurableQueueEntry,
|
||||
resolveJsonDurableQueueEntryPaths,
|
||||
writeJsonDurableQueueEntry,
|
||||
} from "@openclaw/fs-safe/store";
|
||||
import type { ChatType } from "../channels/chat-type.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import {
|
||||
deleteDeliveryQueueEntry,
|
||||
loadDeliveryQueueEntries,
|
||||
loadDeliveryQueueEntry,
|
||||
moveDeliveryQueueEntryToFailed,
|
||||
updateDeliveryQueueEntry,
|
||||
upsertDeliveryQueueEntry,
|
||||
type DeliveryQueueRowMetadata,
|
||||
} from "./delivery-queue-sqlite.js";
|
||||
import { generateSecureUuid } from "./secure-random.js";
|
||||
|
||||
const QUEUE_DIRNAME = "session-delivery-queue";
|
||||
const FAILED_DIRNAME = "failed";
|
||||
const TMP_SWEEP_MAX_AGE_MS = 5_000;
|
||||
const QUEUE_TEMP_PREFIX = ".session-delivery-queue";
|
||||
const QUEUE_NAME = "session";
|
||||
|
||||
type SessionDeliveryContext = {
|
||||
channel?: string;
|
||||
@@ -74,71 +67,44 @@ function buildEntryId(idempotencyKey?: string): string {
|
||||
return createHash("sha256").update(idempotencyKey).digest("hex");
|
||||
}
|
||||
|
||||
async function writeQueueEntry(filePath: string, entry: QueuedSessionDelivery): Promise<void> {
|
||||
await writeJsonDurableQueueEntry({
|
||||
filePath,
|
||||
entry,
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
});
|
||||
}
|
||||
|
||||
async function readQueueEntry(filePath: string): Promise<QueuedSessionDelivery> {
|
||||
return await readJsonDurableQueueEntry<QueuedSessionDelivery>(filePath);
|
||||
}
|
||||
|
||||
export function resolveSessionDeliveryQueueDir(stateDir?: string): string {
|
||||
const base = stateDir ?? resolveStateDir();
|
||||
return path.join(base, QUEUE_DIRNAME);
|
||||
}
|
||||
|
||||
function resolveFailedDir(stateDir?: string): string {
|
||||
return path.join(resolveSessionDeliveryQueueDir(stateDir), FAILED_DIRNAME);
|
||||
}
|
||||
|
||||
function resolveQueueEntryPaths(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): {
|
||||
jsonPath: string;
|
||||
deliveredPath: string;
|
||||
} {
|
||||
return resolveJsonDurableQueueEntryPaths(resolveSessionDeliveryQueueDir(stateDir), id);
|
||||
}
|
||||
|
||||
async function ensureSessionDeliveryQueueDir(stateDir?: string): Promise<string> {
|
||||
const queueDir = resolveSessionDeliveryQueueDir(stateDir);
|
||||
await ensureJsonDurableQueueDirs({
|
||||
queueDir,
|
||||
failedDir: resolveFailedDir(stateDir),
|
||||
});
|
||||
return queueDir;
|
||||
function queuedSessionDeliveryMetadata(entry: QueuedSessionDelivery): DeliveryQueueRowMetadata {
|
||||
const route = entry.kind === "agentTurn" ? entry.route : undefined;
|
||||
return {
|
||||
entryKind: entry.kind,
|
||||
sessionKey: entry.sessionKey,
|
||||
channel: route?.channel ?? entry.deliveryContext?.channel,
|
||||
target: route?.to ?? entry.deliveryContext?.to,
|
||||
accountId: route?.accountId ?? entry.deliveryContext?.accountId,
|
||||
};
|
||||
}
|
||||
|
||||
export async function enqueueSessionDelivery(
|
||||
params: QueuedSessionDeliveryPayload,
|
||||
stateDir?: string,
|
||||
): Promise<string> {
|
||||
const queueDir = await ensureSessionDeliveryQueueDir(stateDir);
|
||||
const id = buildEntryId(params.idempotencyKey);
|
||||
const filePath = path.join(queueDir, `${id}.json`);
|
||||
|
||||
if (params.idempotencyKey) {
|
||||
if (await jsonDurableQueueEntryExists(filePath)) {
|
||||
return id;
|
||||
}
|
||||
if (params.idempotencyKey && loadDeliveryQueueEntry(QUEUE_NAME, id, stateDir)) {
|
||||
return id;
|
||||
}
|
||||
|
||||
await writeQueueEntry(filePath, {
|
||||
const entry: QueuedSessionDelivery = {
|
||||
...params,
|
||||
id,
|
||||
enqueuedAt: Date.now(),
|
||||
retryCount: 0,
|
||||
};
|
||||
upsertDeliveryQueueEntry({
|
||||
queueName: QUEUE_NAME,
|
||||
entry,
|
||||
metadata: queuedSessionDeliveryMetadata(entry),
|
||||
stateDir,
|
||||
});
|
||||
return id;
|
||||
}
|
||||
|
||||
export async function ackSessionDelivery(id: string, stateDir?: string): Promise<void> {
|
||||
await ackJsonDurableQueueEntry(resolveQueueEntryPaths(id, stateDir));
|
||||
deleteDeliveryQueueEntry(QUEUE_NAME, id, stateDir);
|
||||
}
|
||||
|
||||
export async function failSessionDelivery(
|
||||
@@ -146,38 +112,30 @@ export async function failSessionDelivery(
|
||||
error: string,
|
||||
stateDir?: string,
|
||||
): Promise<void> {
|
||||
const filePath = path.join(resolveSessionDeliveryQueueDir(stateDir), `${id}.json`);
|
||||
const entry = await readQueueEntry(filePath);
|
||||
entry.retryCount += 1;
|
||||
entry.lastAttemptAt = Date.now();
|
||||
entry.lastError = error;
|
||||
await writeQueueEntry(filePath, entry);
|
||||
updateDeliveryQueueEntry(QUEUE_NAME, id, stateDir, (entry) => {
|
||||
const queued = entry as QueuedSessionDelivery;
|
||||
return {
|
||||
...queued,
|
||||
retryCount: queued.retryCount + 1,
|
||||
lastAttemptAt: Date.now(),
|
||||
lastError: error,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
export async function loadPendingSessionDelivery(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): Promise<QueuedSessionDelivery | null> {
|
||||
return await loadJsonDurableQueueEntry({
|
||||
paths: resolveQueueEntryPaths(id, stateDir),
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
});
|
||||
return loadDeliveryQueueEntry(QUEUE_NAME, id, stateDir) as QueuedSessionDelivery | null;
|
||||
}
|
||||
|
||||
export async function loadPendingSessionDeliveries(
|
||||
stateDir?: string,
|
||||
): Promise<QueuedSessionDelivery[]> {
|
||||
return await loadPendingJsonDurableQueueEntries({
|
||||
queueDir: resolveSessionDeliveryQueueDir(stateDir),
|
||||
tempPrefix: QUEUE_TEMP_PREFIX,
|
||||
cleanupTmpMaxAgeMs: TMP_SWEEP_MAX_AGE_MS,
|
||||
});
|
||||
return loadDeliveryQueueEntries(QUEUE_NAME, stateDir) as QueuedSessionDelivery[];
|
||||
}
|
||||
|
||||
export async function moveSessionDeliveryToFailed(id: string, stateDir?: string): Promise<void> {
|
||||
await moveJsonDurableQueueEntryToFailed({
|
||||
queueDir: resolveSessionDeliveryQueueDir(stateDir),
|
||||
failedDir: resolveFailedDir(stateDir),
|
||||
id,
|
||||
});
|
||||
moveDeliveryQueueEntryToFailed(QUEUE_NAME, id, stateDir);
|
||||
}
|
||||
|
||||
@@ -1,16 +1,24 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import {
|
||||
ackSessionDelivery,
|
||||
enqueueSessionDelivery,
|
||||
failSessionDelivery,
|
||||
loadPendingSessionDeliveries,
|
||||
resolveSessionDeliveryQueueDir,
|
||||
} from "./session-delivery-queue.js";
|
||||
|
||||
describe("session-delivery queue storage", () => {
|
||||
function readSessionQueueStatus(tempDir: string, id: string): string | undefined {
|
||||
const { db } = openOpenClawStateDatabase({
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: tempDir },
|
||||
});
|
||||
const row = db
|
||||
.prepare("SELECT status FROM delivery_queue_entries WHERE queue_name = 'session' AND id = ?")
|
||||
.get(id) as { status?: string } | undefined;
|
||||
return row?.status;
|
||||
}
|
||||
|
||||
it("dedupes entries when an idempotency key is reused", async () => {
|
||||
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
|
||||
const firstId = await enqueueSessionDelivery(
|
||||
@@ -60,9 +68,9 @@ describe("session-delivery queue storage", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("cleans up orphaned temporary queue files during load", async () => {
|
||||
it("moves entries out of pending retry state", async () => {
|
||||
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
|
||||
await enqueueSessionDelivery(
|
||||
const id = await enqueueSessionDelivery(
|
||||
{
|
||||
kind: "systemEvent",
|
||||
sessionKey: "agent:main:main",
|
||||
@@ -70,26 +78,10 @@ describe("session-delivery queue storage", () => {
|
||||
},
|
||||
tempDir,
|
||||
);
|
||||
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp");
|
||||
fs.writeFileSync(tmpPath, "stale tmp");
|
||||
const staleAt = new Date(Date.now() - 60_000);
|
||||
fs.utimesSync(tmpPath, staleAt, staleAt);
|
||||
|
||||
await loadPendingSessionDeliveries(tempDir);
|
||||
await ackSessionDelivery(id, tempDir);
|
||||
|
||||
expect(fs.existsSync(tmpPath)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps fresh temporary queue files while a write may still be in flight", async () => {
|
||||
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
|
||||
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "active-entry.tmp");
|
||||
fs.mkdirSync(path.dirname(tmpPath), { recursive: true });
|
||||
fs.writeFileSync(tmpPath, "active tmp");
|
||||
|
||||
await loadPendingSessionDeliveries(tempDir);
|
||||
|
||||
expect(fs.existsSync(tmpPath)).toBe(true);
|
||||
expect(readSessionQueueStatus(tempDir, id)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,7 +4,6 @@ export {
|
||||
failSessionDelivery,
|
||||
loadPendingSessionDelivery,
|
||||
loadPendingSessionDeliveries,
|
||||
resolveSessionDeliveryQueueDir,
|
||||
} from "./session-delivery-queue-storage.js";
|
||||
export type {
|
||||
QueuedSessionDelivery,
|
||||
|
||||
@@ -4,6 +4,7 @@ import path from "node:path";
|
||||
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resolveChannelAllowFromPath } from "../pairing/pairing-store.js";
|
||||
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
|
||||
import { createTrackedTempDirs } from "../test-utils/tracked-temp-dirs.js";
|
||||
import { detectLegacyStateMigrations, runLegacyStateMigrations } from "./state-migrations.js";
|
||||
|
||||
@@ -170,6 +171,7 @@ async function createLegacyStateFixture(params?: { includePreKey?: boolean }) {
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
vi.useRealTimers();
|
||||
await tempDirs.cleanup();
|
||||
});
|
||||
|
||||
@@ -280,6 +282,229 @@ describe("state migrations", () => {
|
||||
await expectMissingPath(resolveChannelAllowFromPath("chatapp", env, "beta"));
|
||||
});
|
||||
|
||||
it("migrates legacy delivery queue files into shared SQLite state", async () => {
|
||||
const root = await createTempDir();
|
||||
const stateDir = path.join(root, ".openclaw");
|
||||
const env = createEnv(stateDir);
|
||||
const cfg = createConfig();
|
||||
await fs.mkdir(path.join(stateDir, "delivery-queue"), { recursive: true });
|
||||
await fs.mkdir(path.join(stateDir, "delivery-queue", "failed"), { recursive: true });
|
||||
await fs.mkdir(path.join(stateDir, "session-delivery-queue"), { recursive: true });
|
||||
await fs.mkdir(path.join(stateDir, "session-delivery-queue", "failed"), { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(stateDir, "delivery-queue", "outbound-1.json"),
|
||||
JSON.stringify({
|
||||
id: "outbound-1",
|
||||
enqueuedAt: 10,
|
||||
retryCount: 2,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
accountId: "main",
|
||||
payloads: [{ text: "hi" }],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
path.join(stateDir, "session-delivery-queue", "session-1.json"),
|
||||
JSON.stringify({
|
||||
id: "session-1",
|
||||
kind: "agentTurn",
|
||||
sessionKey: "agent:main:main",
|
||||
message: "resume",
|
||||
messageId: "m1",
|
||||
retryCount: 0,
|
||||
enqueuedAt: 20,
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
path.join(stateDir, "delivery-queue", "failed", "outbound-failed.json"),
|
||||
JSON.stringify({
|
||||
id: "outbound-failed",
|
||||
enqueuedAt: 30,
|
||||
retryCount: 3,
|
||||
channel: "telegram",
|
||||
to: "456",
|
||||
lastError: "permanent",
|
||||
payloads: [{ text: "nope" }],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
path.join(stateDir, "session-delivery-queue", "failed", "session-failed.json"),
|
||||
JSON.stringify({
|
||||
id: "session-failed",
|
||||
kind: "agentTurn",
|
||||
sessionKey: "agent:main:main",
|
||||
message: "failed resume",
|
||||
lastError: "expired",
|
||||
retryCount: 3,
|
||||
enqueuedAt: 40,
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const detected = await detectLegacyStateMigrations({ cfg, env, homedir: () => root });
|
||||
expect(detected.deliveryQueues.hasLegacy).toBe(true);
|
||||
|
||||
const result = await runLegacyStateMigrations({ detected });
|
||||
|
||||
expect(result.warnings).toStrictEqual([]);
|
||||
expect(result.changes).toContain(
|
||||
"Migrated 2 outbound delivery queue entries → shared SQLite state",
|
||||
);
|
||||
expect(result.changes).toContain(
|
||||
"Migrated 2 session delivery queue entries → shared SQLite state",
|
||||
);
|
||||
const { db } = openOpenClawStateDatabase({ env });
|
||||
const rows = db
|
||||
.prepare(
|
||||
"SELECT queue_name, id, status, channel, target, retry_count FROM delivery_queue_entries ORDER BY queue_name, id",
|
||||
)
|
||||
.all();
|
||||
expect(rows).toEqual([
|
||||
{
|
||||
queue_name: "outbound",
|
||||
id: "outbound-1",
|
||||
status: "pending",
|
||||
channel: "telegram",
|
||||
target: "123",
|
||||
retry_count: 2,
|
||||
},
|
||||
{
|
||||
queue_name: "outbound",
|
||||
id: "outbound-failed",
|
||||
status: "failed",
|
||||
channel: "telegram",
|
||||
target: "456",
|
||||
retry_count: 3,
|
||||
},
|
||||
{
|
||||
queue_name: "session",
|
||||
id: "session-1",
|
||||
status: "pending",
|
||||
channel: null,
|
||||
target: null,
|
||||
retry_count: 0,
|
||||
},
|
||||
{
|
||||
queue_name: "session",
|
||||
id: "session-failed",
|
||||
status: "failed",
|
||||
channel: null,
|
||||
target: null,
|
||||
retry_count: 3,
|
||||
},
|
||||
]);
|
||||
await expectMissingPath(path.join(stateDir, "delivery-queue"));
|
||||
await expectMissingPath(path.join(stateDir, "session-delivery-queue"));
|
||||
});
|
||||
|
||||
it("keeps legacy delivery queue files when shared SQLite already has a conflicting row", async () => {
|
||||
const root = await createTempDir();
|
||||
const stateDir = path.join(root, ".openclaw");
|
||||
const env = createEnv(stateDir);
|
||||
const cfg = createConfig();
|
||||
const queueDir = path.join(stateDir, "delivery-queue");
|
||||
await fs.mkdir(path.join(queueDir, "failed"), { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(queueDir, "outbound-1.json"),
|
||||
JSON.stringify({
|
||||
id: "outbound-1",
|
||||
enqueuedAt: 10,
|
||||
retryCount: 2,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
payloads: [{ text: "hi" }],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await fs.writeFile(path.join(queueDir, "outbound-1.delivered"), '{"id":"done"}\n', "utf8");
|
||||
await fs.writeFile(
|
||||
path.join(queueDir, "outbound-2.json"),
|
||||
JSON.stringify({
|
||||
id: "outbound-2",
|
||||
enqueuedAt: 11,
|
||||
retryCount: 1,
|
||||
channel: "telegram",
|
||||
to: "456",
|
||||
payloads: [{ text: "still pending" }],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
path.join(queueDir, "failed", "outbound-failed.json"),
|
||||
JSON.stringify({
|
||||
id: "outbound-failed",
|
||||
enqueuedAt: 12,
|
||||
retryCount: 3,
|
||||
channel: "telegram",
|
||||
to: "789",
|
||||
lastError: "nope",
|
||||
payloads: [{ text: "failed once" }],
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const { db } = openOpenClawStateDatabase({ env });
|
||||
db.prepare(
|
||||
`
|
||||
INSERT INTO delivery_queue_entries (
|
||||
queue_name, id, status, channel, target, retry_count, entry_json,
|
||||
enqueued_at, updated_at
|
||||
) VALUES (
|
||||
'outbound', 'outbound-1', 'pending', 'telegram', '123', 0,
|
||||
'{"id":"outbound-1","retryCount":0}', 10, 10
|
||||
)
|
||||
`,
|
||||
).run();
|
||||
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(1_000);
|
||||
const detected = await detectLegacyStateMigrations({ cfg, env, homedir: () => root });
|
||||
const result = await runLegacyStateMigrations({ detected });
|
||||
|
||||
expect(result.changes).toContain(
|
||||
"Migrated 2 outbound delivery queue entries → shared SQLite state",
|
||||
);
|
||||
expect(result.changes).toContain("Removed 1 outbound delivery queue delivered marker");
|
||||
expect(result.warnings).toStrictEqual([
|
||||
"Left outbound delivery queue in place because 1 entry already existed in shared state: outbound-1",
|
||||
]);
|
||||
await expect(fs.readFile(path.join(queueDir, "outbound-1.json"), "utf8")).resolves.toContain(
|
||||
'"retryCount":2',
|
||||
);
|
||||
await expectMissingPath(path.join(queueDir, "outbound-1.delivered"));
|
||||
expect(
|
||||
db
|
||||
.prepare(
|
||||
"SELECT retry_count FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = 'outbound-1'",
|
||||
)
|
||||
.get(),
|
||||
).toEqual({ retry_count: 0 });
|
||||
expect(
|
||||
db
|
||||
.prepare(
|
||||
"SELECT retry_count FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = 'outbound-2'",
|
||||
)
|
||||
.get(),
|
||||
).toEqual({ retry_count: 1 });
|
||||
expect(
|
||||
db
|
||||
.prepare(
|
||||
"SELECT retry_count, failed_at FROM delivery_queue_entries WHERE queue_name = 'outbound' AND id = 'outbound-failed'",
|
||||
)
|
||||
.get(),
|
||||
).toEqual({ retry_count: 3, failed_at: 12 });
|
||||
|
||||
vi.setSystemTime(2_000);
|
||||
const rerunDetected = await detectLegacyStateMigrations({ cfg, env, homedir: () => root });
|
||||
const rerunResult = await runLegacyStateMigrations({ detected: rerunDetected });
|
||||
expect(rerunResult.warnings).toStrictEqual([
|
||||
"Left outbound delivery queue in place because 1 entry already existed in shared state: outbound-1",
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves a corrupt target session store instead of overwriting it with legacy-only data", async () => {
|
||||
const { root, stateDir, env, cfg } = await createLegacyStateFixture();
|
||||
|
||||
|
||||
@@ -97,6 +97,11 @@ export type LegacyStateDetection = {
|
||||
flowRunsPath: string;
|
||||
hasLegacy: boolean;
|
||||
};
|
||||
deliveryQueues: {
|
||||
outboundPath: string;
|
||||
sessionPath: string;
|
||||
hasLegacy: boolean;
|
||||
};
|
||||
preview: string[];
|
||||
};
|
||||
|
||||
@@ -138,6 +143,14 @@ type DetectedPluginDoctorStateMigrationPlan = {
|
||||
|
||||
const PLUGIN_STATE_SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
|
||||
const TASK_STATE_SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
|
||||
const LEGACY_DELIVERY_QUEUE_DIRS = [
|
||||
{ label: "outbound delivery queue", queueName: "outbound", dirName: "delivery-queue" },
|
||||
{ label: "session delivery queue", queueName: "session", dirName: "session-delivery-queue" },
|
||||
] as const;
|
||||
type LegacyDeliveryQueueFile = {
|
||||
sourcePath: string;
|
||||
status: "pending" | "failed";
|
||||
};
|
||||
|
||||
class LegacyPluginStateSidecarConflictError extends Error {
|
||||
constructor(readonly conflictedKeys: string[]) {
|
||||
@@ -813,6 +826,284 @@ async function migrateLegacyTaskStateSidecars(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function resolveLegacyDeliveryQueuePath(stateDir: string, dirName: string): string {
|
||||
return path.join(stateDir, dirName);
|
||||
}
|
||||
|
||||
function listLegacyDeliveryQueueFiles(queueDir: string): LegacyDeliveryQueueFile[] {
|
||||
const pending = safeReadDir(queueDir)
|
||||
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
|
||||
.map((entry) => ({ sourcePath: path.join(queueDir, entry.name), status: "pending" as const }));
|
||||
const failedDir = path.join(queueDir, "failed");
|
||||
const failed = safeReadDir(failedDir)
|
||||
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
|
||||
.map((entry) => ({
|
||||
sourcePath: path.join(failedDir, entry.name),
|
||||
status: "failed" as const,
|
||||
}));
|
||||
return [...pending, ...failed];
|
||||
}
|
||||
|
||||
function listLegacyDeliveryQueueDeliveredMarkers(queueDir: string): string[] {
|
||||
return safeReadDir(queueDir)
|
||||
.filter((entry) => entry.isFile() && entry.name.endsWith(".delivered"))
|
||||
.map((entry) => path.join(queueDir, entry.name));
|
||||
}
|
||||
|
||||
function readLegacyDeliveryQueueEntry(sourcePath: string): Record<string, unknown> | null {
|
||||
try {
|
||||
const parsed = JSON.parse(fs.readFileSync(sourcePath, "utf8")) as unknown;
|
||||
return parsed && typeof parsed === "object" && !Array.isArray(parsed)
|
||||
? (parsed as Record<string, unknown>)
|
||||
: null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function legacyQueueMetadata(entry: Record<string, unknown>): {
|
||||
entryKind: string | null;
|
||||
sessionKey: string | null;
|
||||
channel: string | null;
|
||||
target: string | null;
|
||||
accountId: string | null;
|
||||
} {
|
||||
const session = entry.session as { key?: unknown } | undefined;
|
||||
const route = entry.route as { channel?: unknown; to?: unknown; accountId?: unknown } | undefined;
|
||||
const deliveryContext = entry.deliveryContext as
|
||||
| { channel?: unknown; to?: unknown; accountId?: unknown }
|
||||
| undefined;
|
||||
const stringOrNull = (value: unknown) => (typeof value === "string" ? value : null);
|
||||
return {
|
||||
entryKind: stringOrNull(entry.kind) ?? "outbound",
|
||||
sessionKey: stringOrNull(entry.sessionKey) ?? stringOrNull(session?.key),
|
||||
channel:
|
||||
stringOrNull(entry.channel) ??
|
||||
stringOrNull(route?.channel) ??
|
||||
stringOrNull(deliveryContext?.channel),
|
||||
target: stringOrNull(entry.to) ?? stringOrNull(route?.to) ?? stringOrNull(deliveryContext?.to),
|
||||
accountId:
|
||||
stringOrNull(entry.accountId) ??
|
||||
stringOrNull(route?.accountId) ??
|
||||
stringOrNull(deliveryContext?.accountId),
|
||||
};
|
||||
}
|
||||
|
||||
function buildLegacyDeliveryQueueRow(params: {
|
||||
queueName: string;
|
||||
id: string;
|
||||
status: "pending" | "failed";
|
||||
entry: Record<string, unknown>;
|
||||
now: number;
|
||||
}): SqliteBindRow {
|
||||
const enqueuedAt =
|
||||
typeof params.entry.enqueuedAt === "number" ? params.entry.enqueuedAt : params.now;
|
||||
const retryCount = typeof params.entry.retryCount === "number" ? params.entry.retryCount : 0;
|
||||
const failedAt =
|
||||
params.status === "failed"
|
||||
? typeof params.entry.failedAt === "number"
|
||||
? params.entry.failedAt
|
||||
: typeof params.entry.lastAttemptAt === "number"
|
||||
? params.entry.lastAttemptAt
|
||||
: enqueuedAt
|
||||
: null;
|
||||
const meta = legacyQueueMetadata(params.entry);
|
||||
return {
|
||||
queue_name: params.queueName,
|
||||
id: params.id,
|
||||
status: params.status,
|
||||
entry_kind: meta.entryKind,
|
||||
session_key: meta.sessionKey,
|
||||
channel: meta.channel,
|
||||
target: meta.target,
|
||||
account_id: meta.accountId,
|
||||
retry_count: retryCount,
|
||||
last_attempt_at:
|
||||
typeof params.entry.lastAttemptAt === "number" ? params.entry.lastAttemptAt : null,
|
||||
last_error: typeof params.entry.lastError === "string" ? params.entry.lastError : null,
|
||||
recovery_state:
|
||||
typeof params.entry.recoveryState === "string" ? params.entry.recoveryState : null,
|
||||
platform_send_started_at:
|
||||
typeof params.entry.platformSendStartedAt === "number"
|
||||
? params.entry.platformSendStartedAt
|
||||
: null,
|
||||
entry_json: JSON.stringify({ ...params.entry, id: params.id, enqueuedAt, retryCount }),
|
||||
enqueued_at: enqueuedAt,
|
||||
updated_at: params.now,
|
||||
failed_at: failedAt,
|
||||
};
|
||||
}
|
||||
|
||||
function legacyDeliveryQueueRowsMatch(
|
||||
existing: Record<string, unknown>,
|
||||
incoming: SqliteBindRow,
|
||||
): boolean {
|
||||
return [
|
||||
"status",
|
||||
"entry_kind",
|
||||
"session_key",
|
||||
"channel",
|
||||
"target",
|
||||
"account_id",
|
||||
"retry_count",
|
||||
"last_attempt_at",
|
||||
"last_error",
|
||||
"recovery_state",
|
||||
"platform_send_started_at",
|
||||
"entry_json",
|
||||
"enqueued_at",
|
||||
"failed_at",
|
||||
].every((column) => {
|
||||
const left = existing[column];
|
||||
const right = incoming[column];
|
||||
if (typeof left === "bigint" || typeof right === "bigint") {
|
||||
return (
|
||||
normalizeLegacySqliteInteger(left as number | bigint | null) ===
|
||||
normalizeLegacySqliteInteger(right as number | bigint | null)
|
||||
);
|
||||
}
|
||||
return left === right;
|
||||
});
|
||||
}
|
||||
|
||||
function removeLegacyDeliveryQueueDir(params: {
|
||||
queueDir: string;
|
||||
label: string;
|
||||
changes: string[];
|
||||
warnings: string[];
|
||||
}): void {
|
||||
try {
|
||||
fs.rmSync(params.queueDir, { recursive: true });
|
||||
params.changes.push(`Removed ${params.label} legacy source ${params.queueDir}`);
|
||||
} catch (err) {
|
||||
params.warnings.push(`Failed removing ${params.label} ${params.queueDir}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
function removeLegacyDeliveryQueueMarkers(
|
||||
markerPaths: string[],
|
||||
label: string,
|
||||
warnings: string[],
|
||||
): number | null {
|
||||
let removed = 0;
|
||||
for (const markerPath of markerPaths) {
|
||||
try {
|
||||
fs.rmSync(markerPath, { force: true });
|
||||
removed++;
|
||||
} catch (err) {
|
||||
warnings.push(`Failed removing ${label} marker ${markerPath}: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
async function migrateLegacyDeliveryQueues(params: {
|
||||
stateDir: string;
|
||||
}): Promise<{ changes: string[]; warnings: string[] }> {
|
||||
const changes: string[] = [];
|
||||
const warnings: string[] = [];
|
||||
for (const queue of LEGACY_DELIVERY_QUEUE_DIRS) {
|
||||
const queueDir = resolveLegacyDeliveryQueuePath(params.stateDir, queue.dirName);
|
||||
const files = listLegacyDeliveryQueueFiles(queueDir);
|
||||
const markerPaths = listLegacyDeliveryQueueDeliveredMarkers(queueDir);
|
||||
if (files.length === 0 && markerPaths.length === 0) {
|
||||
continue;
|
||||
}
|
||||
let imported = 0;
|
||||
let skipped = 0;
|
||||
const conflicts: string[] = [];
|
||||
try {
|
||||
runOpenClawStateWriteTransaction(
|
||||
({ db }) => {
|
||||
const insert = db.prepare(
|
||||
`
|
||||
INSERT INTO delivery_queue_entries (
|
||||
queue_name, id, status, entry_kind, session_key, channel, target, account_id,
|
||||
retry_count, last_attempt_at, last_error, recovery_state,
|
||||
platform_send_started_at, entry_json, enqueued_at, updated_at, failed_at
|
||||
) VALUES (
|
||||
@queue_name, @id, @status, @entry_kind, @session_key, @channel, @target,
|
||||
@account_id, @retry_count, @last_attempt_at, @last_error, @recovery_state,
|
||||
@platform_send_started_at, @entry_json, @enqueued_at, @updated_at, @failed_at
|
||||
)
|
||||
`,
|
||||
);
|
||||
const now = Date.now();
|
||||
for (const file of files) {
|
||||
const entry = readLegacyDeliveryQueueEntry(file.sourcePath);
|
||||
const id =
|
||||
typeof entry?.id === "string" ? entry.id : path.basename(file.sourcePath, ".json");
|
||||
if (!entry || !id) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
const row = buildLegacyDeliveryQueueRow({
|
||||
queueName: queue.queueName,
|
||||
id,
|
||||
status: file.status,
|
||||
entry,
|
||||
now,
|
||||
});
|
||||
const existing = db
|
||||
.prepare(
|
||||
`
|
||||
SELECT status, entry_kind, session_key, channel, target, account_id,
|
||||
retry_count, last_attempt_at, last_error, recovery_state,
|
||||
platform_send_started_at, entry_json, enqueued_at, failed_at
|
||||
FROM delivery_queue_entries
|
||||
WHERE queue_name = ? AND id = ?
|
||||
`,
|
||||
)
|
||||
.get(queue.queueName, id);
|
||||
if (existing) {
|
||||
if (!legacyDeliveryQueueRowsMatch(existing as Record<string, unknown>, row)) {
|
||||
conflicts.push(id);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
insert.run(row);
|
||||
imported++;
|
||||
}
|
||||
},
|
||||
{ env: { ...process.env, OPENCLAW_STATE_DIR: params.stateDir } },
|
||||
);
|
||||
} catch (err) {
|
||||
warnings.push(`Failed migrating ${queue.label} ${queueDir}: ${String(err)}`);
|
||||
continue;
|
||||
}
|
||||
const removedMarkers = removeLegacyDeliveryQueueMarkers(markerPaths, queue.label, warnings);
|
||||
if (removedMarkers === null) {
|
||||
continue;
|
||||
}
|
||||
if (removedMarkers > 0) {
|
||||
changes.push(
|
||||
`Removed ${removedMarkers} ${queue.label} delivered ${removedMarkers === 1 ? "marker" : "markers"}`,
|
||||
);
|
||||
}
|
||||
if (imported > 0) {
|
||||
changes.push(
|
||||
`Migrated ${imported} ${queue.label} ${imported === 1 ? "entry" : "entries"} → shared SQLite state`,
|
||||
);
|
||||
}
|
||||
if (skipped > 0) {
|
||||
warnings.push(
|
||||
`Skipped ${skipped} malformed ${queue.label} ${skipped === 1 ? "entry" : "entries"}`,
|
||||
);
|
||||
warnings.push(`Left ${queue.label} in place because malformed entries need manual cleanup`);
|
||||
continue;
|
||||
}
|
||||
if (conflicts.length > 0) {
|
||||
warnings.push(
|
||||
`Left ${queue.label} in place because ${conflicts.length} ${conflicts.length === 1 ? "entry" : "entries"} already existed in shared state: ${conflicts[0]}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
removeLegacyDeliveryQueueDir({ queueDir, label: queue.label, changes, warnings });
|
||||
}
|
||||
return { changes, warnings };
|
||||
}
|
||||
|
||||
async function migrateLegacyPluginStateSidecar(params: {
|
||||
stateDir: string;
|
||||
}): Promise<{ changes: string[]; warnings: string[] }> {
|
||||
@@ -2002,6 +2293,15 @@ export async function detectLegacyStateMigrations(params: {
|
||||
const taskRunsSidecarPath = resolveLegacyTaskRunsSidecarPath(stateDir);
|
||||
const flowRunsSidecarPath = resolveLegacyFlowRunsSidecarPath(stateDir);
|
||||
const hasTaskStateSidecars = fileExists(taskRunsSidecarPath) || fileExists(flowRunsSidecarPath);
|
||||
const deliveryQueuePaths = {
|
||||
outboundPath: resolveLegacyDeliveryQueuePath(stateDir, "delivery-queue"),
|
||||
sessionPath: resolveLegacyDeliveryQueuePath(stateDir, "session-delivery-queue"),
|
||||
};
|
||||
const hasDeliveryQueues =
|
||||
listLegacyDeliveryQueueFiles(deliveryQueuePaths.outboundPath).length > 0 ||
|
||||
listLegacyDeliveryQueueDeliveredMarkers(deliveryQueuePaths.outboundPath).length > 0 ||
|
||||
listLegacyDeliveryQueueFiles(deliveryQueuePaths.sessionPath).length > 0 ||
|
||||
listLegacyDeliveryQueueDeliveredMarkers(deliveryQueuePaths.sessionPath).length > 0;
|
||||
const channelPlans = await collectChannelLegacyStateMigrationPlans({
|
||||
cfg: params.cfg,
|
||||
env,
|
||||
@@ -2034,6 +2334,9 @@ export async function detectLegacyStateMigrations(params: {
|
||||
if (fileExists(flowRunsSidecarPath)) {
|
||||
preview.push(`- Task flow sidecar: ${flowRunsSidecarPath} → shared SQLite state`);
|
||||
}
|
||||
if (hasDeliveryQueues) {
|
||||
preview.push("- Delivery queues: legacy JSON queue files → shared SQLite state");
|
||||
}
|
||||
if (channelPlans.length > 0) {
|
||||
preview.push(...channelPlans.map(buildLegacyMigrationPreview));
|
||||
}
|
||||
@@ -2077,6 +2380,10 @@ export async function detectLegacyStateMigrations(params: {
|
||||
flowRunsPath: flowRunsSidecarPath,
|
||||
hasLegacy: hasTaskStateSidecars,
|
||||
},
|
||||
deliveryQueues: {
|
||||
...deliveryQueuePaths,
|
||||
hasLegacy: hasDeliveryQueues,
|
||||
},
|
||||
preview,
|
||||
};
|
||||
}
|
||||
@@ -2332,6 +2639,9 @@ export async function runLegacyStateMigrations(params: {
|
||||
const taskStateSidecars = await migrateLegacyTaskStateSidecars({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const deliveryQueues = await migrateLegacyDeliveryQueues({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const preSessionChannelPlans = await runLegacyMigrationPlans(
|
||||
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
|
||||
);
|
||||
@@ -2350,6 +2660,7 @@ export async function runLegacyStateMigrations(params: {
|
||||
changes: [
|
||||
...pluginStateSidecar.changes,
|
||||
...taskStateSidecars.changes,
|
||||
...deliveryQueues.changes,
|
||||
...preSessionChannelPlans.changes,
|
||||
...pluginPlans.changes,
|
||||
...sessions.changes,
|
||||
@@ -2359,6 +2670,7 @@ export async function runLegacyStateMigrations(params: {
|
||||
warnings: [
|
||||
...pluginStateSidecar.warnings,
|
||||
...taskStateSidecars.warnings,
|
||||
...deliveryQueues.warnings,
|
||||
...preSessionChannelPlans.warnings,
|
||||
...pluginPlans.warnings,
|
||||
...sessions.warnings,
|
||||
@@ -2600,6 +2912,9 @@ export async function autoMigrateLegacyState(params: {
|
||||
const taskStateSidecars = await migrateLegacyTaskStateSidecars({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const deliveryQueues = await migrateLegacyDeliveryQueues({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const preSessionChannelPlans = await runLegacyMigrationPlans(
|
||||
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
|
||||
);
|
||||
@@ -2612,6 +2927,7 @@ export async function autoMigrateLegacyState(params: {
|
||||
...orphanKeys.changes,
|
||||
...pluginStateSidecar.changes,
|
||||
...taskStateSidecars.changes,
|
||||
...deliveryQueues.changes,
|
||||
...preSessionChannelPlans.changes,
|
||||
...pluginPlans.changes,
|
||||
];
|
||||
@@ -2620,6 +2936,7 @@ export async function autoMigrateLegacyState(params: {
|
||||
...orphanKeys.warnings,
|
||||
...pluginStateSidecar.warnings,
|
||||
...taskStateSidecars.warnings,
|
||||
...deliveryQueues.warnings,
|
||||
...preSessionChannelPlans.warnings,
|
||||
...pluginPlans.warnings,
|
||||
];
|
||||
@@ -2630,6 +2947,7 @@ export async function autoMigrateLegacyState(params: {
|
||||
orphanKeys.changes.length > 0 ||
|
||||
pluginStateSidecar.changes.length > 0 ||
|
||||
taskStateSidecars.changes.length > 0 ||
|
||||
deliveryQueues.changes.length > 0 ||
|
||||
preSessionChannelPlans.changes.length > 0 ||
|
||||
pluginPlans.changes.length > 0,
|
||||
skipped: true,
|
||||
@@ -2643,7 +2961,8 @@ export async function autoMigrateLegacyState(params: {
|
||||
!detected.channelPlans.hasLegacy &&
|
||||
!detected.pluginPlans?.hasLegacy &&
|
||||
!detected.pluginStateSidecar.hasLegacy &&
|
||||
!detected.taskStateSidecars.hasLegacy
|
||||
!detected.taskStateSidecars.hasLegacy &&
|
||||
!detected.deliveryQueues.hasLegacy
|
||||
) {
|
||||
const changes = [...stateDirResult.changes, ...orphanKeys.changes];
|
||||
const warnings = [...stateDirResult.warnings, ...orphanKeys.warnings];
|
||||
@@ -2663,6 +2982,9 @@ export async function autoMigrateLegacyState(params: {
|
||||
const taskStateSidecars = await migrateLegacyTaskStateSidecars({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const deliveryQueues = await migrateLegacyDeliveryQueues({
|
||||
stateDir: detected.stateDir,
|
||||
});
|
||||
const preSessionChannelPlans = await runLegacyMigrationPlans(
|
||||
detected.channelPlans.plans.filter((plan) => plan.kind === "plugin-state-import"),
|
||||
);
|
||||
@@ -2682,6 +3004,7 @@ export async function autoMigrateLegacyState(params: {
|
||||
...orphanKeys.changes,
|
||||
...pluginStateSidecar.changes,
|
||||
...taskStateSidecars.changes,
|
||||
...deliveryQueues.changes,
|
||||
...preSessionChannelPlans.changes,
|
||||
...pluginPlans.changes,
|
||||
...sessions.changes,
|
||||
@@ -2693,6 +3016,7 @@ export async function autoMigrateLegacyState(params: {
|
||||
...orphanKeys.warnings,
|
||||
...pluginStateSidecar.warnings,
|
||||
...taskStateSidecars.warnings,
|
||||
...deliveryQueues.warnings,
|
||||
...preSessionChannelPlans.warnings,
|
||||
...pluginPlans.warnings,
|
||||
...sessions.warnings,
|
||||
|
||||
@@ -65,6 +65,7 @@ export type { PluginHookRegistration } from "../plugins/hook-types.js";
|
||||
export type { RuntimeEnv } from "../runtime.js";
|
||||
export type { MockFn } from "../test-utils/vitest-mock-fn.js";
|
||||
export { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||
export { readQueuedEntries as readQueuedDeliveryEntriesForTest } from "../infra/outbound/delivery-queue.test-helpers.js";
|
||||
export {
|
||||
registerProviderPlugin,
|
||||
registerProviderPlugins,
|
||||
|
||||
Reference in New Issue
Block a user