mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
refactor: move MS Teams state migration to doctor
This commit is contained in:
committed by
GitHub
parent
3a335c6df1
commit
b9aade4b12
@@ -916,7 +916,7 @@ OpenClaw sends Teams polls as Adaptive Cards (there is no native Teams poll API)
|
||||
|
||||
- CLI: `openclaw message poll --channel msteams --target conversation:<id> ...`
|
||||
- Votes are recorded by the gateway in OpenClaw plugin-state SQLite under `state/openclaw.sqlite`.
|
||||
- Existing `msteams-polls.json` files are imported once when the MSTeams plugin starts.
|
||||
- Existing `msteams-polls.json` files are imported by `openclaw doctor --fix`, not by the running plugin.
|
||||
- The gateway must stay online to record votes.
|
||||
- Polls do not auto-post result summaries yet, and there is no supported poll-results CLI yet.
|
||||
|
||||
|
||||
@@ -1002,10 +1002,10 @@ sessionId})`; create, branch, continue, list, and fork flows live in their
|
||||
- The generic plugin SDK persistent-dedupe helper no longer exposes file-shaped
|
||||
options. Callers provide SQLite scope keys and durable dedupe rows live in
|
||||
shared plugin state.
|
||||
- Microsoft Teams SSO and delegated OAuth tokens moved from locked JSON files
|
||||
to SQLite plugin state. Doctor imports `msteams-sso-tokens.json` and
|
||||
`msteams-delegated.json`, rebuilds canonical SSO token keys from payloads,
|
||||
and removes the source files.
|
||||
- Microsoft Teams SSO tokens moved from locked JSON files to SQLite plugin
|
||||
state. Doctor imports `msteams-sso-tokens.json`, rebuilds canonical SSO token
|
||||
keys from payloads, and removes the source file. Delegated OAuth tokens stay
|
||||
on their existing private credential-file boundary.
|
||||
- Matrix sync cache state moved from `bot-storage.json` to SQLite plugin
|
||||
state. Doctor imports legacy raw or wrapped sync payloads and removes the
|
||||
source file. Active Matrix and QA Matrix clients pass a SQLite sync-store root
|
||||
@@ -1613,13 +1613,13 @@ Move these into the global database:
|
||||
`reply-cache`, `sent-echoes`) instead of `imessage/catchup/*.json`,
|
||||
`imessage/reply-cache.jsonl`, and `imessage/sent-echoes.jsonl`; the iMessage
|
||||
doctor/setup migration imports and removes the legacy files.
|
||||
- Microsoft Teams conversations, polls, delegated tokens, pending uploads, and
|
||||
feedback learnings now use SQLite plugin state/blob namespaces
|
||||
(`conversations`, `polls`, `delegated-tokens`, `pending-uploads`,
|
||||
- Microsoft Teams conversations, polls, SSO tokens, and feedback learnings now
|
||||
use SQLite plugin state namespaces (`conversations`, `polls`, `sso-tokens`,
|
||||
`feedback-learnings`) instead of `msteams-conversations.json`,
|
||||
`msteams-polls.json`, `msteams-delegated.json`,
|
||||
`msteams-pending-uploads.json`, and `*.learnings.json`; the Microsoft Teams
|
||||
doctor/setup migration imports and removes the legacy files.
|
||||
`msteams-polls.json`, `msteams-sso-tokens.json`, and `*.learnings.json`; the
|
||||
Microsoft Teams doctor/setup migration imports and archives the legacy files.
|
||||
Pending uploads are a short-lived SQLite cache and old JSON cache files are
|
||||
not migrated.
|
||||
- Matrix sync cache, storage metadata, thread bindings, inbound dedupe markers,
|
||||
startup verification cooldown state, credentials, recovery keys, and SDK
|
||||
IndexedDB crypto snapshots now use SQLite plugin state/blob namespaces under
|
||||
@@ -2191,8 +2191,6 @@ Add a repo check that fails new runtime writes to legacy state paths:
|
||||
- Microsoft Teams `msteams-conversations.json`
|
||||
- Microsoft Teams `msteams-polls.json`
|
||||
- Microsoft Teams `msteams-sso-tokens.json`
|
||||
- Microsoft Teams `msteams-delegated.json`
|
||||
- Microsoft Teams `msteams-pending-uploads.json`
|
||||
- Microsoft Teams `*.learnings.json`
|
||||
- Matrix `bot-storage.json`
|
||||
- Matrix `sync-store.json`
|
||||
|
||||
@@ -12,6 +12,27 @@ import type {
|
||||
} from "openclaw/plugin-sdk/runtime-doctor";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { stateMigrations } from "./doctor-contract-api.js";
|
||||
import {
|
||||
buildMSTeamsConversationStateKey,
|
||||
MSTEAMS_CONVERSATIONS_NAMESPACE,
|
||||
type MSTeamsLegacyConversationStoreData,
|
||||
} from "./src/conversation-store-state.js";
|
||||
import type { StoredConversationReference } from "./src/conversation-store.js";
|
||||
import {
|
||||
buildMSTeamsPollStateKey,
|
||||
buildMSTeamsPollVoteBucketKey,
|
||||
MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
MSTEAMS_POLLS_NAMESPACE,
|
||||
selectMSTeamsPollVoteBucket,
|
||||
type MSTeamsPoll,
|
||||
type StoredMSTeamsPoll,
|
||||
type StoredMSTeamsPollVoteBucket,
|
||||
} from "./src/polls.js";
|
||||
import {
|
||||
makeMSTeamsSsoTokenStoreKey,
|
||||
MSTEAMS_SSO_TOKENS_NAMESPACE,
|
||||
type MSTeamsSsoStoredToken,
|
||||
} from "./src/sso-token-store.js";
|
||||
|
||||
function createDoctorContext(env: NodeJS.ProcessEnv): PluginDoctorStateMigrationContext {
|
||||
return {
|
||||
@@ -32,6 +53,14 @@ function learningStoreKey(storePath: string, sessionKey: string): string {
|
||||
return createHash("sha256").update(`${storePath}\0${sessionKey}`, "utf8").digest("hex");
|
||||
}
|
||||
|
||||
function migrationById(id: string) {
|
||||
const migration = stateMigrations.find((entry) => entry.id === id);
|
||||
if (!migration) {
|
||||
throw new Error(`missing migration ${id}`);
|
||||
}
|
||||
return migration;
|
||||
}
|
||||
|
||||
describe("msteams doctor state migration", () => {
|
||||
let stateDir = "";
|
||||
let env: NodeJS.ProcessEnv;
|
||||
@@ -46,6 +75,187 @@ describe("msteams doctor state migration", () => {
|
||||
await fs.rm(stateDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("imports legacy conversations into plugin state", async () => {
|
||||
const filePath = path.join(stateDir, "msteams-conversations.json");
|
||||
const ref: StoredConversationReference = {
|
||||
conversation: { id: "19:conv@thread.tacv2" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
user: { id: "user-1" },
|
||||
};
|
||||
await fs.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
conversations: {
|
||||
"19:conv@thread.tacv2": ref,
|
||||
},
|
||||
} satisfies MSTeamsLegacyConversationStoreData)}\n`,
|
||||
);
|
||||
|
||||
const migration = migrationById("msteams-conversations-json-to-plugin-state");
|
||||
const context = createDoctorContext(env);
|
||||
await expect(
|
||||
migration.detectLegacyState({
|
||||
config: {},
|
||||
env,
|
||||
stateDir,
|
||||
oauthDir: path.join(stateDir, "oauth"),
|
||||
context,
|
||||
}),
|
||||
).resolves.toMatchObject({
|
||||
preview: [expect.stringContaining("Microsoft Teams conversations")],
|
||||
});
|
||||
|
||||
const result = await migration.migrateLegacyState({
|
||||
config: {},
|
||||
env,
|
||||
stateDir,
|
||||
oauthDir: path.join(stateDir, "oauth"),
|
||||
context,
|
||||
});
|
||||
|
||||
expect(result.warnings).toEqual([]);
|
||||
expect(result.changes).toEqual([
|
||||
expect.stringContaining("Migrated 1 Microsoft Teams conversation entry"),
|
||||
expect.stringContaining("Archived Microsoft Teams conversation legacy source"),
|
||||
]);
|
||||
await expect(fs.access(filePath)).rejects.toThrow();
|
||||
await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined();
|
||||
const store = context.openPluginStateKeyedStore<StoredConversationReference>({
|
||||
namespace: MSTEAMS_CONVERSATIONS_NAMESPACE,
|
||||
maxEntries: 2000,
|
||||
});
|
||||
await expect(
|
||||
store.lookup(buildMSTeamsConversationStateKey("19:conv@thread.tacv2")),
|
||||
).resolves.toMatchObject({
|
||||
conversation: { id: "19:conv@thread.tacv2" },
|
||||
user: { id: "user-1" },
|
||||
});
|
||||
});
|
||||
|
||||
it("imports legacy polls and vote buckets into plugin state", async () => {
|
||||
const filePath = path.join(stateDir, "msteams-polls.json");
|
||||
const poll: MSTeamsPoll = {
|
||||
id: "poll-legacy",
|
||||
question: "Lunch?",
|
||||
options: ["Pizza", "Sushi"],
|
||||
maxSelections: 1,
|
||||
createdAt: new Date().toISOString(),
|
||||
votes: {
|
||||
"user-legacy": ["0"],
|
||||
"user-new": ["1"],
|
||||
},
|
||||
};
|
||||
await fs.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
polls: {
|
||||
"poll-legacy": poll,
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
const context = createDoctorContext(env);
|
||||
const voteBucketStore = context.openPluginStateKeyedStore<StoredMSTeamsPollVoteBucket>({
|
||||
namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
maxEntries: 32_032,
|
||||
});
|
||||
const legacyBucket = selectMSTeamsPollVoteBucket("poll-legacy", "user-legacy");
|
||||
await voteBucketStore.register(buildMSTeamsPollVoteBucketKey("poll-legacy", legacyBucket), {
|
||||
pollId: "poll-legacy",
|
||||
bucket: legacyBucket,
|
||||
votes: { "user-legacy": ["1"] },
|
||||
updatedAt: poll.createdAt,
|
||||
});
|
||||
|
||||
const migration = migrationById("msteams-polls-json-to-plugin-state");
|
||||
const result = await migration.migrateLegacyState({
|
||||
config: {},
|
||||
env,
|
||||
stateDir,
|
||||
oauthDir: path.join(stateDir, "oauth"),
|
||||
context,
|
||||
});
|
||||
|
||||
expect(result.warnings).toEqual([]);
|
||||
expect(result.changes).toEqual([
|
||||
expect.stringContaining("Migrated 1 Microsoft Teams poll entry"),
|
||||
expect.stringContaining("Archived Microsoft Teams poll legacy source"),
|
||||
]);
|
||||
const pollStore = context.openPluginStateKeyedStore<StoredMSTeamsPoll>({
|
||||
namespace: MSTEAMS_POLLS_NAMESPACE,
|
||||
maxEntries: 2000,
|
||||
});
|
||||
await expect(pollStore.lookup(buildMSTeamsPollStateKey("poll-legacy"))).resolves.toMatchObject({
|
||||
id: "poll-legacy",
|
||||
question: "Lunch?",
|
||||
});
|
||||
const newBucket = selectMSTeamsPollVoteBucket("poll-legacy", "user-new");
|
||||
await expect(
|
||||
voteBucketStore.lookup(buildMSTeamsPollVoteBucketKey("poll-legacy", legacyBucket)),
|
||||
).resolves.toMatchObject({
|
||||
votes: { "user-legacy": ["1"] },
|
||||
});
|
||||
await expect(
|
||||
voteBucketStore.lookup(buildMSTeamsPollVoteBucketKey("poll-legacy", newBucket)),
|
||||
).resolves.toMatchObject({
|
||||
votes: { "user-new": ["1"] },
|
||||
});
|
||||
await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("imports legacy SSO tokens into the existing plugin-state token namespace", async () => {
|
||||
const filePath = path.join(stateDir, "msteams-sso-tokens.json");
|
||||
const token: MSTeamsSsoStoredToken = {
|
||||
connectionName: "conn::alpha",
|
||||
userId: "user::one",
|
||||
token: "test-token-value",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
};
|
||||
await fs.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
tokens: {
|
||||
"legacy::wrong-key": token,
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const migration = migrationById("msteams-sso-tokens-json-to-plugin-state");
|
||||
const context = createDoctorContext(env);
|
||||
const result = await migration.migrateLegacyState({
|
||||
config: {},
|
||||
env,
|
||||
stateDir,
|
||||
oauthDir: path.join(stateDir, "oauth"),
|
||||
context,
|
||||
});
|
||||
|
||||
expect(result.warnings).toEqual([]);
|
||||
expect(result.changes).toEqual([
|
||||
expect.stringContaining("Migrated 1 Microsoft Teams SSO token entry"),
|
||||
expect.stringContaining("Archived Microsoft Teams SSO-token legacy source"),
|
||||
]);
|
||||
const store = context.openPluginStateKeyedStore<MSTeamsSsoStoredToken>({
|
||||
namespace: MSTEAMS_SSO_TOKENS_NAMESPACE,
|
||||
maxEntries: 5000,
|
||||
});
|
||||
await expect(
|
||||
store.lookup(makeMSTeamsSsoTokenStoreKey("conn::alpha", "user::one")),
|
||||
).resolves.toEqual(token);
|
||||
expect(result.changes.join("\n")).not.toContain(token.token);
|
||||
expect(result.warnings.join("\n")).not.toContain(token.token);
|
||||
await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not register a doctor migration for pending-upload cache files", () => {
|
||||
expect(stateMigrations.map((migration) => migration.id)).not.toContain(
|
||||
"msteams-pending-uploads-json-to-plugin-state",
|
||||
);
|
||||
});
|
||||
|
||||
it("imports legacy feedback learnings into plugin state", async () => {
|
||||
const agentStoreTemplate = path.join(stateDir, "agents", "{agentId}", "sessions");
|
||||
const mainStorePath = path.join(stateDir, "agents", "main", "sessions");
|
||||
@@ -69,7 +279,7 @@ describe("msteams doctor state migration", () => {
|
||||
await fs.writeFile(encodedSourcePath, JSON.stringify(["Be concise", "Use examples"]));
|
||||
await fs.writeFile(sanitizedSourcePath, JSON.stringify(["Prefer cards for channel feedback"]));
|
||||
|
||||
const migration = stateMigrations[0];
|
||||
const migration = migrationById("msteams-feedback-learnings-json-to-plugin-state");
|
||||
const context = createDoctorContext(env);
|
||||
await context
|
||||
.openPluginStateKeyedStore({
|
||||
|
||||
@@ -4,6 +4,43 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
|
||||
import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime";
|
||||
import { normalizeStoredConversationId } from "./src/conversation-store-helpers.js";
|
||||
import {
|
||||
buildMSTeamsConversationStateKey,
|
||||
MSTEAMS_CONVERSATIONS_LEGACY_FILENAME,
|
||||
MSTEAMS_CONVERSATIONS_NAMESPACE,
|
||||
MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS,
|
||||
normalizeMSTeamsLegacyConversationStore,
|
||||
prepareMSTeamsConversationReferenceForStorage,
|
||||
selectRetainedMSTeamsConversations,
|
||||
type MSTeamsLegacyConversationStoreData,
|
||||
} from "./src/conversation-store-state.js";
|
||||
import type { StoredConversationReference } from "./src/conversation-store.js";
|
||||
import {
|
||||
buildMSTeamsPollStateKey,
|
||||
buildMSTeamsPollVoteBucketKey,
|
||||
MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS,
|
||||
MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
MSTEAMS_POLLS_LEGACY_FILENAME,
|
||||
MSTEAMS_POLLS_NAMESPACE,
|
||||
MSTEAMS_SQLITE_MAX_POLL_ROWS,
|
||||
selectMSTeamsPollVoteBucket,
|
||||
selectRetainedMSTeamsPolls,
|
||||
splitMSTeamsPoll,
|
||||
type MSTeamsPoll,
|
||||
type MSTeamsPollStoreData,
|
||||
type StoredMSTeamsPoll,
|
||||
type StoredMSTeamsPollVoteBucket,
|
||||
} from "./src/polls.js";
|
||||
import {
|
||||
isMSTeamsSsoStoreData,
|
||||
makeMSTeamsSsoTokenStoreKey,
|
||||
MSTEAMS_MAX_SSO_TOKENS,
|
||||
MSTEAMS_SSO_TOKENS_LEGACY_FILENAME,
|
||||
MSTEAMS_SSO_TOKENS_NAMESPACE,
|
||||
normalizeMSTeamsSsoStoredToken,
|
||||
type MSTeamsSsoStoredToken,
|
||||
} from "./src/sso-token-store.js";
|
||||
|
||||
type FeedbackLearningEntry = {
|
||||
sessionKey: string;
|
||||
@@ -13,6 +50,7 @@ type FeedbackLearningEntry = {
|
||||
|
||||
const LEARNINGS_NAMESPACE = "feedback-learnings";
|
||||
const MAX_LEARNING_ENTRIES = 10_000;
|
||||
const MSTEAMS_PLUGIN_ID = "Microsoft Teams";
|
||||
|
||||
function encodeSessionKey(sessionKey: string): string {
|
||||
return Buffer.from(sessionKey, "utf8").toString("base64url");
|
||||
@@ -102,6 +140,90 @@ async function fileExists(filePath: string): Promise<boolean> {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveStateFilePath(stateDir: string, filename: string): string {
|
||||
return path.join(stateDir, filename);
|
||||
}
|
||||
|
||||
async function readLegacyJsonFile<T>(
|
||||
filePath: string,
|
||||
parse: (value: unknown) => T | null,
|
||||
): Promise<T | null> {
|
||||
try {
|
||||
return parse(JSON.parse(await fs.readFile(filePath, "utf8")) as unknown);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function isStringArray(value: unknown): value is string[] {
|
||||
return Array.isArray(value) && value.every((entry) => typeof entry === "string");
|
||||
}
|
||||
|
||||
function parseLegacyConversationStore(value: unknown): MSTeamsLegacyConversationStoreData | null {
|
||||
if (!isRecord(value) || value.version !== 1 || !isRecord(value.conversations)) {
|
||||
return null;
|
||||
}
|
||||
return normalizeMSTeamsLegacyConversationStore({
|
||||
version: 1,
|
||||
conversations: value.conversations as Record<string, StoredConversationReference>,
|
||||
});
|
||||
}
|
||||
|
||||
function parseLegacyPoll(value: unknown): MSTeamsPoll | null {
|
||||
if (!isRecord(value)) {
|
||||
return null;
|
||||
}
|
||||
const votes = isRecord(value.votes) ? value.votes : null;
|
||||
if (
|
||||
typeof value.id !== "string" ||
|
||||
!value.id ||
|
||||
typeof value.question !== "string" ||
|
||||
!value.question ||
|
||||
!isStringArray(value.options) ||
|
||||
typeof value.maxSelections !== "number" ||
|
||||
!Number.isFinite(value.maxSelections) ||
|
||||
typeof value.createdAt !== "string" ||
|
||||
!votes
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
const normalizedVotes: Record<string, string[]> = {};
|
||||
for (const [voterId, selections] of Object.entries(votes)) {
|
||||
if (typeof voterId === "string" && isStringArray(selections)) {
|
||||
normalizedVotes[voterId] = selections;
|
||||
}
|
||||
}
|
||||
return {
|
||||
id: value.id,
|
||||
question: value.question,
|
||||
options: value.options,
|
||||
maxSelections: value.maxSelections,
|
||||
createdAt: value.createdAt,
|
||||
...(typeof value.updatedAt === "string" ? { updatedAt: value.updatedAt } : {}),
|
||||
...(typeof value.conversationId === "string" ? { conversationId: value.conversationId } : {}),
|
||||
...(typeof value.messageId === "string" ? { messageId: value.messageId } : {}),
|
||||
votes: normalizedVotes,
|
||||
};
|
||||
}
|
||||
|
||||
function parseLegacyPollStore(value: unknown): MSTeamsPollStoreData | null {
|
||||
if (!isRecord(value) || value.version !== 1 || !isRecord(value.polls)) {
|
||||
return null;
|
||||
}
|
||||
const polls: Record<string, MSTeamsPoll> = {};
|
||||
for (const [pollId, poll] of Object.entries(value.polls)) {
|
||||
const parsed = parseLegacyPoll(poll);
|
||||
if (parsed) {
|
||||
polls[pollId] = parsed;
|
||||
}
|
||||
}
|
||||
return { version: 1, polls };
|
||||
}
|
||||
|
||||
async function listLegacyLearningFiles(
|
||||
storePath: string,
|
||||
): Promise<
|
||||
@@ -147,25 +269,23 @@ async function listLegacyLearningFiles(
|
||||
|
||||
async function archiveLegacySource(params: {
|
||||
filePath: string;
|
||||
label?: string;
|
||||
changes: string[];
|
||||
warnings: string[];
|
||||
}): Promise<void> {
|
||||
const archivedPath = `${params.filePath}.migrated`;
|
||||
const label = params.label ?? "Microsoft Teams feedback-learning";
|
||||
if (await fileExists(archivedPath)) {
|
||||
params.warnings.push(
|
||||
`Left migrated Microsoft Teams feedback-learning source in place because ${archivedPath} already exists`,
|
||||
`Left migrated ${label} source in place because ${archivedPath} already exists`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await fs.rename(params.filePath, archivedPath);
|
||||
params.changes.push(
|
||||
`Archived Microsoft Teams feedback-learning legacy source -> ${archivedPath}`,
|
||||
);
|
||||
params.changes.push(`Archived ${label} legacy source -> ${archivedPath}`);
|
||||
} catch (err) {
|
||||
params.warnings.push(
|
||||
`Failed archiving Microsoft Teams feedback-learning legacy source: ${String(err)}`,
|
||||
);
|
||||
params.warnings.push(`Failed archiving ${label} legacy source: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,6 +303,200 @@ function mergeLearnings(legacy: string[], existing?: FeedbackLearningEntry): str
|
||||
}
|
||||
|
||||
export const stateMigrations: PluginDoctorStateMigration[] = [
|
||||
{
|
||||
id: "msteams-conversations-json-to-plugin-state",
|
||||
label: "Microsoft Teams conversations",
|
||||
async detectLegacyState(params) {
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_CONVERSATIONS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, parseLegacyConversationStore);
|
||||
if (!state || Object.keys(state.conversations).length === 0) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
preview: [
|
||||
`- ${MSTEAMS_PLUGIN_ID} conversations: ${Object.keys(state.conversations).length} entries -> plugin state (${MSTEAMS_CONVERSATIONS_NAMESPACE})`,
|
||||
],
|
||||
};
|
||||
},
|
||||
async migrateLegacyState(params) {
|
||||
const changes: string[] = [];
|
||||
const warnings: string[] = [];
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_CONVERSATIONS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, parseLegacyConversationStore);
|
||||
if (!state) {
|
||||
return { changes, warnings };
|
||||
}
|
||||
const store = params.context.openPluginStateKeyedStore<StoredConversationReference>({
|
||||
namespace: MSTEAMS_CONVERSATIONS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS,
|
||||
});
|
||||
let imported = 0;
|
||||
for (const [rawConversationId, reference] of selectRetainedMSTeamsConversations(
|
||||
state.conversations,
|
||||
)) {
|
||||
const conversationId = normalizeStoredConversationId(rawConversationId);
|
||||
if (!conversationId) {
|
||||
continue;
|
||||
}
|
||||
const didImport = await store.registerIfAbsent(
|
||||
buildMSTeamsConversationStateKey(conversationId),
|
||||
prepareMSTeamsConversationReferenceForStorage(conversationId, reference),
|
||||
);
|
||||
if (didImport) {
|
||||
imported++;
|
||||
}
|
||||
}
|
||||
changes.push(
|
||||
`Migrated ${imported} ${MSTEAMS_PLUGIN_ID} conversation ${imported === 1 ? "entry" : "entries"} -> plugin state`,
|
||||
);
|
||||
await archiveLegacySource({
|
||||
filePath,
|
||||
label: `${MSTEAMS_PLUGIN_ID} conversation`,
|
||||
changes,
|
||||
warnings,
|
||||
});
|
||||
return { changes, warnings };
|
||||
},
|
||||
},
|
||||
{
|
||||
id: "msteams-polls-json-to-plugin-state",
|
||||
label: "Microsoft Teams polls",
|
||||
async detectLegacyState(params) {
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_POLLS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, parseLegacyPollStore);
|
||||
if (!state || Object.keys(state.polls).length === 0) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
preview: [
|
||||
`- ${MSTEAMS_PLUGIN_ID} polls: ${Object.keys(state.polls).length} entries -> plugin state (${MSTEAMS_POLLS_NAMESPACE})`,
|
||||
],
|
||||
};
|
||||
},
|
||||
async migrateLegacyState(params) {
|
||||
const changes: string[] = [];
|
||||
const warnings: string[] = [];
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_POLLS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, parseLegacyPollStore);
|
||||
if (!state) {
|
||||
return { changes, warnings };
|
||||
}
|
||||
const pollStore = params.context.openPluginStateKeyedStore<StoredMSTeamsPoll>({
|
||||
namespace: MSTEAMS_POLLS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_SQLITE_MAX_POLL_ROWS,
|
||||
});
|
||||
const voteBucketStore = params.context.openPluginStateKeyedStore<StoredMSTeamsPollVoteBucket>(
|
||||
{
|
||||
namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS,
|
||||
},
|
||||
);
|
||||
let imported = 0;
|
||||
for (const [pollId, poll] of selectRetainedMSTeamsPolls(state.polls)) {
|
||||
const { metadata, votes } = splitMSTeamsPoll(poll);
|
||||
const didImportPoll = await pollStore.registerIfAbsent(
|
||||
buildMSTeamsPollStateKey(pollId),
|
||||
metadata,
|
||||
);
|
||||
const buckets = new Map<string, Record<string, string[]>>();
|
||||
for (const [voterId, selections] of Object.entries(votes)) {
|
||||
const bucket = selectMSTeamsPollVoteBucket(pollId, voterId);
|
||||
const bucketVotes = buckets.get(bucket) ?? {};
|
||||
bucketVotes[voterId] = selections;
|
||||
buckets.set(bucket, bucketVotes);
|
||||
}
|
||||
let importedVoteBucket = false;
|
||||
for (const [bucket, bucketVotes] of buckets) {
|
||||
const key = buildMSTeamsPollVoteBucketKey(pollId, bucket);
|
||||
const existing = await voteBucketStore.lookup(key);
|
||||
await voteBucketStore.register(key, {
|
||||
pollId,
|
||||
bucket,
|
||||
votes: { ...bucketVotes, ...existing?.votes },
|
||||
updatedAt: poll.updatedAt ?? poll.createdAt,
|
||||
});
|
||||
importedVoteBucket = true;
|
||||
}
|
||||
if (didImportPoll || importedVoteBucket) {
|
||||
imported++;
|
||||
}
|
||||
}
|
||||
changes.push(
|
||||
`Migrated ${imported} ${MSTEAMS_PLUGIN_ID} poll ${imported === 1 ? "entry" : "entries"} -> plugin state`,
|
||||
);
|
||||
await archiveLegacySource({
|
||||
filePath,
|
||||
label: `${MSTEAMS_PLUGIN_ID} poll`,
|
||||
changes,
|
||||
warnings,
|
||||
});
|
||||
return { changes, warnings };
|
||||
},
|
||||
},
|
||||
{
|
||||
id: "msteams-sso-tokens-json-to-plugin-state",
|
||||
label: "Microsoft Teams SSO tokens",
|
||||
async detectLegacyState(params) {
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_SSO_TOKENS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, (value) =>
|
||||
isMSTeamsSsoStoreData(value) ? value : null,
|
||||
);
|
||||
if (!state || Object.keys(state.tokens).length === 0) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
preview: [
|
||||
`- ${MSTEAMS_PLUGIN_ID} SSO tokens: ${Object.keys(state.tokens).length} entries -> plugin state (${MSTEAMS_SSO_TOKENS_NAMESPACE})`,
|
||||
],
|
||||
};
|
||||
},
|
||||
async migrateLegacyState(params) {
|
||||
const changes: string[] = [];
|
||||
const warnings: string[] = [];
|
||||
const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_SSO_TOKENS_LEGACY_FILENAME);
|
||||
const state = await readLegacyJsonFile(filePath, (value) =>
|
||||
isMSTeamsSsoStoreData(value) ? value : null,
|
||||
);
|
||||
if (!state) {
|
||||
return { changes, warnings };
|
||||
}
|
||||
const store = params.context.openPluginStateKeyedStore<MSTeamsSsoStoredToken>({
|
||||
namespace: MSTEAMS_SSO_TOKENS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_MAX_SSO_TOKENS,
|
||||
});
|
||||
let imported = 0;
|
||||
let skipped = 0;
|
||||
for (const token of Object.values(state.tokens)) {
|
||||
const normalized = normalizeMSTeamsSsoStoredToken(token);
|
||||
if (!normalized) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
const didImport = await store.registerIfAbsent(
|
||||
makeMSTeamsSsoTokenStoreKey(normalized.connectionName, normalized.userId),
|
||||
normalized,
|
||||
);
|
||||
if (didImport) {
|
||||
imported++;
|
||||
}
|
||||
}
|
||||
changes.push(
|
||||
`Migrated ${imported} ${MSTEAMS_PLUGIN_ID} SSO token ${imported === 1 ? "entry" : "entries"} -> plugin state`,
|
||||
);
|
||||
if (skipped > 0) {
|
||||
warnings.push(
|
||||
`Skipped ${skipped} malformed ${MSTEAMS_PLUGIN_ID} SSO token ${skipped === 1 ? "entry" : "entries"} during migration`,
|
||||
);
|
||||
}
|
||||
await archiveLegacySource({
|
||||
filePath,
|
||||
label: `${MSTEAMS_PLUGIN_ID} SSO-token`,
|
||||
changes,
|
||||
warnings,
|
||||
});
|
||||
return { changes, warnings };
|
||||
},
|
||||
},
|
||||
{
|
||||
id: "msteams-feedback-learnings-json-to-plugin-state",
|
||||
label: "Microsoft Teams feedback learnings",
|
||||
|
||||
@@ -22,7 +22,7 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
setMSTeamsRuntime(msteamsRuntimeStub);
|
||||
});
|
||||
|
||||
it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => {
|
||||
it("filters expired SQLite entries while preserving entries without lastSeenAt", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
|
||||
const env: NodeJS.ProcessEnv = {
|
||||
...process.env,
|
||||
@@ -35,35 +35,28 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
serviceUrl: "https://service.example.com",
|
||||
user: { id: "u1", aadObjectId: "aad1" },
|
||||
};
|
||||
const filePath = path.join(stateDir, "msteams-conversations.json");
|
||||
await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
|
||||
await fs.promises.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
conversations: {
|
||||
"19:active@thread.tacv2": ref,
|
||||
"19:old@thread.tacv2": {
|
||||
...ref,
|
||||
conversation: { id: "19:old@thread.tacv2" },
|
||||
lastSeenAt: new Date(Date.now() - 60_000).toISOString(),
|
||||
},
|
||||
"19:legacy@thread.tacv2": {
|
||||
...ref,
|
||||
conversation: { id: "19:legacy@thread.tacv2" },
|
||||
},
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
)}\n`,
|
||||
const sqliteStore = createPluginStateKeyedStoreForTests<StoredConversationReference>(
|
||||
"msteams",
|
||||
{
|
||||
namespace: "conversations",
|
||||
maxEntries: 2000,
|
||||
env,
|
||||
},
|
||||
);
|
||||
await sqliteStore.register(conversationStateKey("19:active@thread.tacv2"), ref);
|
||||
await sqliteStore.register(conversationStateKey("19:old@thread.tacv2"), {
|
||||
...ref,
|
||||
conversation: { id: "19:old@thread.tacv2" },
|
||||
lastSeenAt: new Date(Date.now() - 60_000).toISOString(),
|
||||
});
|
||||
await sqliteStore.register(conversationStateKey("19:legacy@thread.tacv2"), {
|
||||
...ref,
|
||||
conversation: { id: "19:legacy@thread.tacv2" },
|
||||
});
|
||||
|
||||
const store = createMSTeamsConversationStoreState({ env, ttlMs: 1_000 });
|
||||
const ids = (await store.list()).map((entry) => entry.conversationId).toSorted();
|
||||
expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]);
|
||||
await expect(fs.promises.access(filePath)).rejects.toThrow();
|
||||
|
||||
expect(await store.get("19:old@thread.tacv2")).toBeNull();
|
||||
const legacyConversation = await store.get("19:legacy@thread.tacv2");
|
||||
@@ -87,7 +80,7 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not let a stale legacy JSON file overwrite existing SQLite rows", async () => {
|
||||
it("ignores a stale legacy JSON file at runtime", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
|
||||
const env: NodeJS.ProcessEnv = {
|
||||
...process.env,
|
||||
@@ -125,41 +118,23 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
|
||||
const store = createMSTeamsConversationStoreState({ env });
|
||||
await expect(store.get("conv-current")).resolves.toEqual(ref);
|
||||
await expect(fs.promises.access(filePath)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("hashes external conversation ids before using plugin-state keys", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
|
||||
const longConversationId = `a:${"x".repeat(900)}`;
|
||||
const filePath = path.join(stateDir, "msteams-conversations.json");
|
||||
await fs.promises.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
conversations: {
|
||||
[longConversationId]: {
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
user: { id: "long-user" },
|
||||
} satisfies StoredConversationReference,
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const store = createMSTeamsConversationStoreState({ stateDir });
|
||||
|
||||
await expect(store.get(longConversationId)).resolves.toMatchObject({
|
||||
conversation: { id: longConversationId },
|
||||
user: { id: "long-user" },
|
||||
});
|
||||
await store.upsert(`${longConversationId}-new`, {
|
||||
await store.upsert(longConversationId, {
|
||||
conversation: { conversationType: "personal" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
user: { id: "long-user-new" },
|
||||
user: { id: "long-user" },
|
||||
});
|
||||
await expect(store.get(`${longConversationId}-new`)).resolves.toMatchObject({
|
||||
conversation: { id: `${longConversationId}-new` },
|
||||
user: { id: "long-user-new" },
|
||||
await expect(store.get(longConversationId)).resolves.toMatchObject({
|
||||
conversation: { id: longConversationId },
|
||||
user: { id: "long-user" },
|
||||
});
|
||||
});
|
||||
|
||||
@@ -199,29 +174,33 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps newest legacy conversations by lastSeenAt at the row cap", async () => {
|
||||
it("keeps newest conversations by lastSeenAt at the row cap", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
|
||||
const filePath = path.join(stateDir, "msteams-conversations.json");
|
||||
const conversations: Record<string, StoredConversationReference> = {
|
||||
"conv-recent": {
|
||||
conversation: { id: "conv-recent" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
lastSeenAt: "2026-03-25T20:00:00.000Z",
|
||||
const env: NodeJS.ProcessEnv = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
|
||||
const sqliteStore = createPluginStateKeyedStoreForTests<StoredConversationReference>(
|
||||
"msteams",
|
||||
{
|
||||
namespace: "conversations",
|
||||
maxEntries: 2000,
|
||||
env,
|
||||
},
|
||||
};
|
||||
);
|
||||
for (let index = 0; index < 1000; index += 1) {
|
||||
const id = `conv-${String(index).padStart(4, "0")}`;
|
||||
conversations[id] = {
|
||||
await sqliteStore.register(conversationStateKey(id), {
|
||||
conversation: { id },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(),
|
||||
};
|
||||
});
|
||||
}
|
||||
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`);
|
||||
|
||||
const store = createMSTeamsConversationStoreState({ stateDir });
|
||||
const store = createMSTeamsConversationStoreState({ env });
|
||||
await store.upsert("conv-recent", {
|
||||
conversation: { id: "conv-recent" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
});
|
||||
const ids = (await store.list()).map((entry) => entry.conversationId);
|
||||
|
||||
expect(ids).toHaveLength(1000);
|
||||
@@ -229,29 +208,33 @@ describe("msteams conversation store (plugin state)", () => {
|
||||
expect(ids).not.toContain("conv-0000");
|
||||
});
|
||||
|
||||
it("treats timestamp-less legacy conversations as oldest during later cap pruning", async () => {
|
||||
it("treats timestamp-less conversations as oldest during later cap pruning", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
|
||||
const filePath = path.join(stateDir, "msteams-conversations.json");
|
||||
const conversations: Record<string, StoredConversationReference> = {
|
||||
"conv-legacy": {
|
||||
conversation: { id: "conv-legacy" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
const env: NodeJS.ProcessEnv = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
|
||||
const sqliteStore = createPluginStateKeyedStoreForTests<StoredConversationReference>(
|
||||
"msteams",
|
||||
{
|
||||
namespace: "conversations",
|
||||
maxEntries: 2000,
|
||||
env,
|
||||
},
|
||||
};
|
||||
);
|
||||
await sqliteStore.register(conversationStateKey("conv-legacy"), {
|
||||
conversation: { id: "conv-legacy" },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
});
|
||||
for (let index = 0; index < 999; index += 1) {
|
||||
const id = `conv-seen-${String(index).padStart(4, "0")}`;
|
||||
conversations[id] = {
|
||||
await sqliteStore.register(conversationStateKey(id), {
|
||||
conversation: { id },
|
||||
channelId: "msteams",
|
||||
serviceUrl: "https://service.example.com",
|
||||
lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(),
|
||||
};
|
||||
});
|
||||
}
|
||||
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`);
|
||||
|
||||
const store = createMSTeamsConversationStoreState({ stateDir });
|
||||
await store.list();
|
||||
const store = createMSTeamsConversationStoreState({ env });
|
||||
await store.upsert("conv-new", {
|
||||
conversation: { id: "conv-new" },
|
||||
channelId: "msteams",
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import {
|
||||
findPreferredDmConversationByUserId,
|
||||
mergeStoredConversationReference,
|
||||
@@ -18,25 +17,17 @@ import {
|
||||
toPluginJsonValue,
|
||||
withMSTeamsSqliteMutationLock,
|
||||
} from "./sqlite-state.js";
|
||||
import { resolveMSTeamsStorePath } from "./storage.js";
|
||||
import { readJsonFile } from "./store-fs.js";
|
||||
|
||||
type ConversationStoreData = {
|
||||
export type MSTeamsLegacyConversationStoreData = {
|
||||
version: 1;
|
||||
conversations: Record<string, StoredConversationReference>;
|
||||
};
|
||||
|
||||
type ConversationMigrationMarker = {
|
||||
importedAt: string;
|
||||
};
|
||||
|
||||
const STORE_FILENAME = "msteams-conversations.json";
|
||||
const CONVERSATIONS_NAMESPACE = "conversations";
|
||||
const CONVERSATION_MIGRATIONS_NAMESPACE = "conversation-migrations";
|
||||
const LEGACY_JSON_MIGRATION_KEY = "msteams-conversations-json-v1";
|
||||
const MAX_CONVERSATIONS = 1000;
|
||||
const SQLITE_MAX_CONVERSATION_ROWS = MAX_CONVERSATIONS + 1000;
|
||||
const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000;
|
||||
export const MSTEAMS_CONVERSATIONS_LEGACY_FILENAME = "msteams-conversations.json";
|
||||
export const MSTEAMS_CONVERSATIONS_NAMESPACE = "conversations";
|
||||
export const MSTEAMS_MAX_CONVERSATIONS = 1000;
|
||||
export const MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS = MSTEAMS_MAX_CONVERSATIONS + 1000;
|
||||
export const MSTEAMS_CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000;
|
||||
const CONVERSATION_LOCK_FILENAME = "msteams-conversations.sqlite.lock";
|
||||
|
||||
type MSTeamsConversationStoreStateOptions = {
|
||||
@@ -49,31 +40,15 @@ type MSTeamsConversationStoreStateOptions = {
|
||||
|
||||
function createConversationStateStore(params?: MSTeamsConversationStoreStateOptions) {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<StoredConversationReference>({
|
||||
namespace: CONVERSATIONS_NAMESPACE,
|
||||
maxEntries: SQLITE_MAX_CONVERSATION_ROWS,
|
||||
namespace: MSTEAMS_CONVERSATIONS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function createConversationMigrationStore(params?: MSTeamsConversationStoreStateOptions) {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<ConversationMigrationMarker>({
|
||||
namespace: CONVERSATION_MIGRATIONS_NAMESPACE,
|
||||
maxEntries: 100,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function resolveLegacyStorePath(params?: MSTeamsConversationStoreStateOptions): string {
|
||||
return resolveMSTeamsStorePath({
|
||||
filename: STORE_FILENAME,
|
||||
env: params?.env,
|
||||
homedir: params?.homedir,
|
||||
stateDir: params?.stateDir,
|
||||
storePath: params?.storePath,
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreData {
|
||||
export function normalizeMSTeamsLegacyConversationStore(
|
||||
value: MSTeamsLegacyConversationStoreData,
|
||||
): MSTeamsLegacyConversationStoreData {
|
||||
if (
|
||||
value.version !== 1 ||
|
||||
!value.conversations ||
|
||||
@@ -85,11 +60,11 @@ function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreDa
|
||||
return value;
|
||||
}
|
||||
|
||||
function buildConversationStateKey(conversationId: string): string {
|
||||
export function buildMSTeamsConversationStateKey(conversationId: string): string {
|
||||
return crypto.createHash("sha256").update(conversationId).digest("hex");
|
||||
}
|
||||
|
||||
function prepareConversationReferenceForStorage(
|
||||
export function prepareMSTeamsConversationReferenceForStorage(
|
||||
conversationId: string,
|
||||
reference: StoredConversationReference,
|
||||
): StoredConversationReference {
|
||||
@@ -107,14 +82,30 @@ function getStoredConversationId(reference: StoredConversationReference): string
|
||||
return rawId ? normalizeStoredConversationId(rawId) : null;
|
||||
}
|
||||
|
||||
export function selectRetainedMSTeamsConversations(
|
||||
conversations: Record<string, StoredConversationReference>,
|
||||
ttlMs = MSTEAMS_CONVERSATION_TTL_MS,
|
||||
): Array<[string, StoredConversationReference]> {
|
||||
const retained = Object.entries(conversations).filter(([, reference]) => {
|
||||
const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt);
|
||||
return lastSeenAt == null || Date.now() - lastSeenAt <= ttlMs;
|
||||
});
|
||||
if (retained.length <= MSTEAMS_MAX_CONVERSATIONS) {
|
||||
return retained;
|
||||
}
|
||||
retained.sort((a, b) => {
|
||||
const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0;
|
||||
const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0;
|
||||
return aTs - bTs || a[0].localeCompare(b[0]);
|
||||
});
|
||||
return retained.slice(retained.length - MSTEAMS_MAX_CONVERSATIONS);
|
||||
}
|
||||
|
||||
export function createMSTeamsConversationStoreState(
|
||||
params?: MSTeamsConversationStoreStateOptions,
|
||||
): MSTeamsConversationStore {
|
||||
const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS;
|
||||
const ttlMs = params?.ttlMs ?? MSTEAMS_CONVERSATION_TTL_MS;
|
||||
const conversationStore = createConversationStateStore(params);
|
||||
const migrationStore = createConversationMigrationStore(params);
|
||||
const legacyStorePath = resolveLegacyStorePath(params);
|
||||
let legacyImportPromise: Promise<void> | null = null;
|
||||
|
||||
const isExpired = (reference: StoredConversationReference): boolean => {
|
||||
const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt);
|
||||
@@ -122,66 +113,11 @@ export function createMSTeamsConversationStoreState(
|
||||
return lastSeenAt != null && Date.now() - lastSeenAt > ttlMs;
|
||||
};
|
||||
|
||||
const selectRetainedConversations = (
|
||||
conversations: Record<string, StoredConversationReference>,
|
||||
): Array<[string, StoredConversationReference]> => {
|
||||
const retained = Object.entries(conversations).filter(([, reference]) => !isExpired(reference));
|
||||
if (retained.length <= MAX_CONVERSATIONS) {
|
||||
return retained;
|
||||
}
|
||||
retained.sort((a, b) => {
|
||||
const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0;
|
||||
const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0;
|
||||
return aTs - bTs || a[0].localeCompare(b[0]);
|
||||
});
|
||||
return retained.slice(retained.length - MAX_CONVERSATIONS);
|
||||
};
|
||||
|
||||
const importLegacyStore = async (): Promise<void> => {
|
||||
if (await migrationStore.lookup(LEGACY_JSON_MIGRATION_KEY)) {
|
||||
return;
|
||||
}
|
||||
const empty: ConversationStoreData = { version: 1, conversations: {} };
|
||||
const { value, exists } = await readJsonFile<ConversationStoreData>(legacyStorePath, empty);
|
||||
if (!exists) {
|
||||
await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, {
|
||||
importedAt: new Date().toISOString(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const legacy = normalizeLegacyStore(value);
|
||||
for (const [rawConversationId, reference] of selectRetainedConversations(
|
||||
legacy.conversations,
|
||||
)) {
|
||||
const conversationId = normalizeStoredConversationId(rawConversationId);
|
||||
if (!conversationId) {
|
||||
continue;
|
||||
}
|
||||
await conversationStore.registerIfAbsent(
|
||||
buildConversationStateKey(conversationId),
|
||||
toPluginJsonValue(prepareConversationReferenceForStorage(conversationId, reference)),
|
||||
);
|
||||
}
|
||||
await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, {
|
||||
importedAt: new Date().toISOString(),
|
||||
});
|
||||
await fs.rm(legacyStorePath, { force: true }).catch(() => {});
|
||||
};
|
||||
|
||||
const ensureLegacyImported = async (): Promise<void> => {
|
||||
legacyImportPromise ??= withMSTeamsSqliteMutationLock(
|
||||
params,
|
||||
CONVERSATION_LOCK_FILENAME,
|
||||
importLegacyStore,
|
||||
);
|
||||
await legacyImportPromise;
|
||||
};
|
||||
|
||||
const lookupStored = async (
|
||||
conversationId: string,
|
||||
): Promise<StoredConversationReference | null> => {
|
||||
const normalizedId = normalizeStoredConversationId(conversationId);
|
||||
const value = await conversationStore.lookup(buildConversationStateKey(normalizedId));
|
||||
const value = await conversationStore.lookup(buildMSTeamsConversationStateKey(normalizedId));
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
@@ -192,7 +128,6 @@ export function createMSTeamsConversationStoreState(
|
||||
};
|
||||
|
||||
const entries = async (): Promise<Array<[string, StoredConversationReference]>> => {
|
||||
await ensureLegacyImported();
|
||||
const rows = await conversationStore.entries();
|
||||
const kept: Array<[string, StoredConversationReference]> = [];
|
||||
for (const row of rows) {
|
||||
@@ -208,7 +143,6 @@ export function createMSTeamsConversationStoreState(
|
||||
};
|
||||
|
||||
const lookup = async (conversationId: string): Promise<StoredConversationReference | null> => {
|
||||
await ensureLegacyImported();
|
||||
return await lookupStored(conversationId);
|
||||
};
|
||||
|
||||
@@ -218,8 +152,8 @@ export function createMSTeamsConversationStoreState(
|
||||
): Promise<void> => {
|
||||
const normalizedId = normalizeStoredConversationId(conversationId);
|
||||
await conversationStore.register(
|
||||
buildConversationStateKey(normalizedId),
|
||||
toPluginJsonValue(prepareConversationReferenceForStorage(normalizedId, reference)),
|
||||
buildMSTeamsConversationStateKey(normalizedId),
|
||||
toPluginJsonValue(prepareMSTeamsConversationReferenceForStorage(normalizedId, reference)),
|
||||
);
|
||||
const rows = [];
|
||||
for (const row of await conversationStore.entries()) {
|
||||
@@ -229,7 +163,7 @@ export function createMSTeamsConversationStoreState(
|
||||
}
|
||||
rows.push(row);
|
||||
}
|
||||
if (rows.length <= MAX_CONVERSATIONS) {
|
||||
if (rows.length <= MSTEAMS_MAX_CONVERSATIONS) {
|
||||
return;
|
||||
}
|
||||
const sorted = rows.toSorted((a, b) => {
|
||||
@@ -239,7 +173,7 @@ export function createMSTeamsConversationStoreState(
|
||||
const bId = getStoredConversationId(b.value) ?? b.key;
|
||||
return aTs - bTs || aId.localeCompare(bId);
|
||||
});
|
||||
for (const row of sorted.slice(0, rows.length - MAX_CONVERSATIONS)) {
|
||||
for (const row of sorted.slice(0, rows.length - MSTEAMS_MAX_CONVERSATIONS)) {
|
||||
await conversationStore.delete(row.key);
|
||||
}
|
||||
};
|
||||
@@ -264,7 +198,6 @@ export function createMSTeamsConversationStoreState(
|
||||
): Promise<void> => {
|
||||
const normalizedId = normalizeStoredConversationId(conversationId);
|
||||
await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
const existing = await lookupStored(normalizedId);
|
||||
await register(
|
||||
normalizedId,
|
||||
@@ -280,8 +213,7 @@ export function createMSTeamsConversationStoreState(
|
||||
const remove = async (conversationId: string): Promise<boolean> => {
|
||||
const normalizedId = normalizeStoredConversationId(conversationId);
|
||||
return await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
return await conversationStore.delete(buildConversationStateKey(normalizedId));
|
||||
return await conversationStore.delete(buildMSTeamsConversationStateKey(normalizedId));
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
@@ -212,58 +212,7 @@ describe("msteams pending uploads (fs-backed)", () => {
|
||||
expect(loaded?.consentCardActivityId).toBe("activity-xyz");
|
||||
});
|
||||
|
||||
it("ignores malformed or empty store files and returns undefined", async () => {
|
||||
const stateDir = await makeTempStateDir();
|
||||
const env = makeEnv(stateDir);
|
||||
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
|
||||
await fs.promises.writeFile(storePath, "not valid json", "utf-8");
|
||||
|
||||
// Should not throw and should treat as empty
|
||||
expect(await getPendingUploadFs("anything", { env })).toBeUndefined();
|
||||
await expect(fs.promises.access(storePath)).rejects.toThrow();
|
||||
|
||||
const secondStateDir = await makeTempStateDir();
|
||||
const secondEnv = makeEnv(secondStateDir);
|
||||
const secondStorePath = path.join(secondStateDir, "msteams-pending-uploads.json");
|
||||
await fs.promises.writeFile(
|
||||
secondStorePath,
|
||||
JSON.stringify({ version: 2, uploads: {} }),
|
||||
"utf-8",
|
||||
);
|
||||
expect(await getPendingUploadFs("anything", { env: secondEnv })).toBeUndefined();
|
||||
await expect(fs.promises.access(secondStorePath)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("imports a legacy JSON file that appears after an empty migration marker", async () => {
|
||||
const stateDir = await makeTempStateDir();
|
||||
const env = makeEnv(stateDir);
|
||||
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
|
||||
|
||||
expect(await getPendingUploadFs("upload-late", { env })).toBeUndefined();
|
||||
await fs.promises.writeFile(
|
||||
storePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
uploads: {
|
||||
"upload-late": {
|
||||
id: "upload-late",
|
||||
bufferBase64: Buffer.from("late payload").toString("base64"),
|
||||
filename: "late.txt",
|
||||
conversationId: "19:conv@thread.v2",
|
||||
createdAt: Date.now(),
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const loaded = await getPendingUploadFs("upload-late", { env });
|
||||
expect(loaded?.filename).toBe("late.txt");
|
||||
expect(loaded?.buffer.toString("utf8")).toBe("late payload");
|
||||
await expect(fs.promises.access(storePath)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("skips malformed legacy upload rows while importing valid rows", async () => {
|
||||
it("ignores legacy pending-upload JSON cache files at runtime", async () => {
|
||||
const stateDir = await makeTempStateDir();
|
||||
const env = makeEnv(stateDir);
|
||||
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
|
||||
@@ -272,16 +221,10 @@ describe("msteams pending uploads (fs-backed)", () => {
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
uploads: {
|
||||
broken: {
|
||||
id: "broken",
|
||||
filename: "broken.txt",
|
||||
conversationId: "19:conv@thread.v2",
|
||||
createdAt: Date.now(),
|
||||
},
|
||||
valid: {
|
||||
id: "valid",
|
||||
bufferBase64: Buffer.from("valid payload").toString("base64"),
|
||||
filename: "valid.txt",
|
||||
cached: {
|
||||
id: "cached",
|
||||
bufferBase64: Buffer.from("cached payload").toString("base64"),
|
||||
filename: "cached.txt",
|
||||
conversationId: "19:conv@thread.v2",
|
||||
createdAt: Date.now(),
|
||||
},
|
||||
@@ -290,9 +233,8 @@ describe("msteams pending uploads (fs-backed)", () => {
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
expect(await getPendingUploadFs("broken", { env })).toBeUndefined();
|
||||
const loaded = await getPendingUploadFs("valid", { env });
|
||||
expect(loaded?.buffer.toString("utf8")).toBe("valid payload");
|
||||
expect(await getPendingUploadFs("cached", { env })).toBeUndefined();
|
||||
await expect(fs.promises.access(storePath)).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import { getMSTeamsRuntime } from "./runtime.js";
|
||||
import {
|
||||
@@ -7,8 +6,6 @@ import {
|
||||
toPluginJsonValue,
|
||||
withMSTeamsSqliteMutationLock,
|
||||
} from "./sqlite-state.js";
|
||||
import { resolveMSTeamsStorePath } from "./storage.js";
|
||||
import { readJsonFile } from "./store-fs.js";
|
||||
|
||||
/** TTL for persisted pending uploads (matches in-memory store). */
|
||||
const PENDING_UPLOAD_TTL_MS = 5 * 60 * 1000;
|
||||
@@ -20,10 +17,8 @@ const MAX_PENDING_UPLOAD_CHUNK_ROWS = 45_000;
|
||||
const RAW_CHUNK_BYTES = 36 * 1024;
|
||||
const PENDING_UPLOAD_META_MAX_ENTRIES = MAX_PENDING_UPLOADS + 100;
|
||||
|
||||
const STORE_FILENAME = "msteams-pending-uploads.json";
|
||||
const PENDING_UPLOAD_META_NAMESPACE = "pending-uploads";
|
||||
const PENDING_UPLOAD_CHUNKS_NAMESPACE = "pending-upload-chunks";
|
||||
const PENDING_UPLOAD_MIGRATIONS_NAMESPACE = "pending-upload-migrations";
|
||||
const PENDING_UPLOAD_LOCK_FILENAME = "msteams-pending-uploads.sqlite.lock";
|
||||
|
||||
type PendingUploadFsRecord = {
|
||||
@@ -47,13 +42,6 @@ type PendingUploadFs = {
|
||||
createdAt: number;
|
||||
};
|
||||
|
||||
type PendingUploadStoreData = {
|
||||
version: 1;
|
||||
uploads: Record<string, PendingUploadFsRecord>;
|
||||
};
|
||||
|
||||
const empty: PendingUploadStoreData = { version: 1, uploads: {} };
|
||||
|
||||
type PendingUploadMetaRecord = Omit<PendingUploadFsRecord, "bufferBase64"> & {
|
||||
chunkCount: number;
|
||||
byteLength: number;
|
||||
@@ -65,10 +53,6 @@ type PendingUploadChunkRecord = {
|
||||
dataBase64: string;
|
||||
};
|
||||
|
||||
type PendingUploadMigrationMarker = {
|
||||
importedAt: string;
|
||||
};
|
||||
|
||||
type PendingUploadsFsOptions = {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
homedir?: () => string;
|
||||
@@ -77,16 +61,6 @@ type PendingUploadsFsOptions = {
|
||||
ttlMs?: number;
|
||||
};
|
||||
|
||||
function resolveLegacyFilePath(options: PendingUploadsFsOptions | undefined): string {
|
||||
return resolveMSTeamsStorePath({
|
||||
filename: STORE_FILENAME,
|
||||
env: options?.env,
|
||||
homedir: options?.homedir,
|
||||
stateDir: options?.stateDir,
|
||||
storePath: options?.storePath,
|
||||
});
|
||||
}
|
||||
|
||||
function createMetaStore(
|
||||
options: PendingUploadsFsOptions | undefined,
|
||||
): PluginStateKeyedStore<PendingUploadMetaRecord> {
|
||||
@@ -107,16 +81,6 @@ function createChunkStore(
|
||||
});
|
||||
}
|
||||
|
||||
function createMigrationStore(
|
||||
options: PendingUploadsFsOptions | undefined,
|
||||
): PluginStateKeyedStore<PendingUploadMigrationMarker> {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<PendingUploadMigrationMarker>({
|
||||
namespace: PENDING_UPLOAD_MIGRATIONS_NAMESPACE,
|
||||
maxEntries: 100,
|
||||
env: resolveMSTeamsSqliteStateEnv(options),
|
||||
});
|
||||
}
|
||||
|
||||
function buildUploadKey(id: string): string {
|
||||
return `upload:${createHash("sha256").update(id).digest("hex")}`;
|
||||
}
|
||||
@@ -129,45 +93,6 @@ function buildChunkKey(id: string, index: number): string {
|
||||
return `${buildUploadKey(id)}:chunk:${String(index).padStart(4, "0")}`;
|
||||
}
|
||||
|
||||
function buildMigrationKey(filePath: string): string {
|
||||
return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`;
|
||||
}
|
||||
|
||||
function buildMigrationContentKey(filePath: string, value: unknown): string {
|
||||
return `legacy-json-content:${createHash("sha256")
|
||||
.update(filePath)
|
||||
.update("\0")
|
||||
.update(JSON.stringify(value) ?? "undefined")
|
||||
.digest("hex")}`;
|
||||
}
|
||||
|
||||
function pruneExpired(
|
||||
uploads: Record<string, PendingUploadFsRecord>,
|
||||
nowMs: number,
|
||||
ttlMs: number,
|
||||
): Record<string, PendingUploadFsRecord> {
|
||||
const kept: Record<string, PendingUploadFsRecord> = {};
|
||||
for (const [id, record] of Object.entries(uploads)) {
|
||||
if (nowMs - record.createdAt <= ttlMs) {
|
||||
kept[id] = record;
|
||||
}
|
||||
}
|
||||
return kept;
|
||||
}
|
||||
|
||||
function pruneToLimit(
|
||||
uploads: Record<string, PendingUploadFsRecord>,
|
||||
): Record<string, PendingUploadFsRecord> {
|
||||
const entries = Object.entries(uploads);
|
||||
if (entries.length <= MAX_PENDING_UPLOADS) {
|
||||
return uploads;
|
||||
}
|
||||
// Oldest createdAt first; drop the oldest until we fit.
|
||||
entries.sort((a, b) => a[1].createdAt - b[1].createdAt);
|
||||
const keep = entries.slice(entries.length - MAX_PENDING_UPLOADS);
|
||||
return Object.fromEntries(keep);
|
||||
}
|
||||
|
||||
function recordToUpload(
|
||||
record: PendingUploadFsRecord | PendingUploadMetaRecord,
|
||||
buffer: Buffer,
|
||||
@@ -183,63 +108,6 @@ function recordToUpload(
|
||||
};
|
||||
}
|
||||
|
||||
function isValidStore(value: unknown): value is PendingUploadStoreData {
|
||||
if (!value || typeof value !== "object") {
|
||||
return false;
|
||||
}
|
||||
const candidate = value as Partial<PendingUploadStoreData>;
|
||||
return (
|
||||
candidate.version === 1 &&
|
||||
typeof candidate.uploads === "object" &&
|
||||
candidate.uploads !== null &&
|
||||
!Array.isArray(candidate.uploads)
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeLegacyUploadRecord(value: unknown): PendingUploadFsRecord | null {
|
||||
if (!value || typeof value !== "object") {
|
||||
return null;
|
||||
}
|
||||
const record = value as Partial<PendingUploadFsRecord>;
|
||||
if (
|
||||
typeof record.id !== "string" ||
|
||||
!record.id ||
|
||||
typeof record.bufferBase64 !== "string" ||
|
||||
typeof record.filename !== "string" ||
|
||||
!record.filename ||
|
||||
typeof record.conversationId !== "string" ||
|
||||
!record.conversationId ||
|
||||
typeof record.createdAt !== "number" ||
|
||||
!Number.isFinite(record.createdAt)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
id: record.id,
|
||||
bufferBase64: record.bufferBase64,
|
||||
filename: record.filename,
|
||||
contentType: typeof record.contentType === "string" ? record.contentType : undefined,
|
||||
conversationId: record.conversationId,
|
||||
consentCardActivityId:
|
||||
typeof record.consentCardActivityId === "string" ? record.consentCardActivityId : undefined,
|
||||
createdAt: record.createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeLegacyUploads(value: unknown): Record<string, PendingUploadFsRecord> {
|
||||
if (!isValidStore(value)) {
|
||||
return {};
|
||||
}
|
||||
const uploads: Record<string, PendingUploadFsRecord> = {};
|
||||
for (const record of Object.values(value.uploads)) {
|
||||
const normalized = normalizeLegacyUploadRecord(record);
|
||||
if (normalized) {
|
||||
uploads[normalized.id] = normalized;
|
||||
}
|
||||
}
|
||||
return uploads;
|
||||
}
|
||||
|
||||
async function deleteUploadRows(
|
||||
id: string,
|
||||
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
|
||||
@@ -302,38 +170,6 @@ async function registerUploadRows(
|
||||
);
|
||||
}
|
||||
|
||||
async function importLegacyStore(
|
||||
options: PendingUploadsFsOptions | undefined,
|
||||
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
|
||||
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
|
||||
ttlMs: number,
|
||||
): Promise<void> {
|
||||
const legacyFilePath = resolveLegacyFilePath(options);
|
||||
const migrationStore = createMigrationStore(options);
|
||||
const migrationKey = buildMigrationKey(legacyFilePath);
|
||||
const imported = (await migrationStore.lookup(migrationKey)) !== undefined;
|
||||
const { value, exists } = await readJsonFile<unknown>(legacyFilePath, empty);
|
||||
if (!exists) {
|
||||
if (!imported) {
|
||||
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
|
||||
}
|
||||
return;
|
||||
}
|
||||
const contentKey = buildMigrationContentKey(legacyFilePath, value);
|
||||
if (await migrationStore.lookup(contentKey)) {
|
||||
return;
|
||||
}
|
||||
const legacy = pruneToLimit(pruneExpired(normalizeLegacyUploads(value), Date.now(), ttlMs));
|
||||
for (const record of Object.values(legacy)) {
|
||||
await registerUploadRows(record, metaStore, chunkStore, ttlMs, false);
|
||||
}
|
||||
await migrationStore.register(contentKey, { importedAt: new Date().toISOString() });
|
||||
if (!imported) {
|
||||
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
|
||||
}
|
||||
await fs.rm(legacyFilePath, { force: true }).catch(() => {});
|
||||
}
|
||||
|
||||
async function withPendingUploadLock<T>(
|
||||
options: PendingUploadsFsOptions | undefined,
|
||||
run: () => Promise<T>,
|
||||
@@ -341,17 +177,6 @@ async function withPendingUploadLock<T>(
|
||||
return await withMSTeamsSqliteMutationLock(options, PENDING_UPLOAD_LOCK_FILENAME, run);
|
||||
}
|
||||
|
||||
async function ensureLegacyImported(
|
||||
options: PendingUploadsFsOptions | undefined,
|
||||
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
|
||||
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
|
||||
ttlMs: number,
|
||||
): Promise<void> {
|
||||
await withPendingUploadLock(options, () =>
|
||||
importLegacyStore(options, metaStore, chunkStore, ttlMs),
|
||||
);
|
||||
}
|
||||
|
||||
async function readUploadRows(
|
||||
id: string,
|
||||
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
|
||||
@@ -432,7 +257,6 @@ export async function storePendingUploadFs(
|
||||
const metaStore = createMetaStore(options);
|
||||
const chunkStore = createChunkStore(options);
|
||||
await withPendingUploadLock(options, async () => {
|
||||
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
|
||||
await registerUploadRows(
|
||||
{
|
||||
id: upload.id,
|
||||
@@ -465,7 +289,6 @@ export async function getPendingUploadFs(
|
||||
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
|
||||
const metaStore = createMetaStore(options);
|
||||
const chunkStore = createChunkStore(options);
|
||||
await ensureLegacyImported(options, metaStore, chunkStore, ttlMs);
|
||||
const upload = await readUploadRows(id, metaStore, chunkStore);
|
||||
if (!upload) {
|
||||
return undefined;
|
||||
@@ -488,11 +311,9 @@ export async function removePendingUploadFs(
|
||||
if (!id) {
|
||||
return;
|
||||
}
|
||||
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
|
||||
const metaStore = createMetaStore(options);
|
||||
const chunkStore = createChunkStore(options);
|
||||
await withPendingUploadLock(options, async () => {
|
||||
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
|
||||
await deleteUploadRows(id, metaStore, chunkStore);
|
||||
});
|
||||
}
|
||||
@@ -508,9 +329,7 @@ export async function setPendingUploadActivityIdFs(
|
||||
): Promise<void> {
|
||||
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
|
||||
const metaStore = createMetaStore(options);
|
||||
const chunkStore = createChunkStore(options);
|
||||
await withPendingUploadLock(options, async () => {
|
||||
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
|
||||
const record = await metaStore.lookup(buildMetaKey(id));
|
||||
if (!record || Date.now() - record.createdAt > ttlMs) {
|
||||
return;
|
||||
|
||||
@@ -152,7 +152,7 @@ describe("state poll store", () => {
|
||||
setMSTeamsRuntime(msteamsRuntimeStub);
|
||||
});
|
||||
|
||||
it("imports legacy JSON polls once and removes the old file", async () => {
|
||||
it("ignores legacy JSON polls at runtime", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
|
||||
const filePath = path.join(stateDir, "msteams-polls.json");
|
||||
await fs.promises.writeFile(
|
||||
@@ -173,18 +173,18 @@ describe("state poll store", () => {
|
||||
);
|
||||
|
||||
const store = createMSTeamsPollStoreState({ stateDir });
|
||||
await expect(store.getPoll("poll-legacy")).resolves.toMatchObject({
|
||||
id: "poll-legacy",
|
||||
question: "Legacy?",
|
||||
});
|
||||
await expect(fs.promises.access(filePath)).rejects.toThrow();
|
||||
await expect(store.getPoll("poll-legacy")).resolves.toBeNull();
|
||||
await expect(fs.promises.access(filePath)).resolves.toBeUndefined();
|
||||
|
||||
const updated = await store.recordVote({
|
||||
pollId: "poll-legacy",
|
||||
voterId: "user-1",
|
||||
selections: ["1"],
|
||||
await store.createPoll({
|
||||
id: "poll-new",
|
||||
question: "New?",
|
||||
options: ["A", "B"],
|
||||
maxSelections: 1,
|
||||
createdAt: new Date().toISOString(),
|
||||
votes: {},
|
||||
});
|
||||
expect(updated?.votes["user-1"]).toEqual(["1"]);
|
||||
await expect(store.getPoll("poll-new")).resolves.toMatchObject({ id: "poll-new" });
|
||||
await expect(
|
||||
fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")),
|
||||
).resolves.toBeUndefined();
|
||||
@@ -264,106 +264,6 @@ describe("state poll store", () => {
|
||||
expect(stored?.votes["user-new"]).toEqual(["1"]);
|
||||
});
|
||||
|
||||
it("fills missing legacy vote buckets after a partial metadata import", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
|
||||
const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
|
||||
const filePath = path.join(stateDir, "msteams-polls.json");
|
||||
const metadata = {
|
||||
id: "poll-partial",
|
||||
question: "Partial?",
|
||||
options: ["A", "B"],
|
||||
maxSelections: 1,
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
const metadataStore = createPluginStateKeyedStoreForTests<typeof metadata>("msteams", {
|
||||
namespace: "polls",
|
||||
maxEntries: 2000,
|
||||
env,
|
||||
});
|
||||
await metadataStore.register("poll-partial", metadata);
|
||||
const voterHash = crypto
|
||||
.createHash("sha256")
|
||||
.update("poll-partial")
|
||||
.update("\0")
|
||||
.update("user-legacy")
|
||||
.digest("hex");
|
||||
const bucket = String(Number.parseInt(voterHash.slice(0, 8), 16) % 32).padStart(4, "0");
|
||||
const pollHash = crypto.createHash("sha256").update("poll-partial").digest("hex");
|
||||
const voteBucketStore = createPluginStateKeyedStoreForTests<{
|
||||
pollId: string;
|
||||
bucket: string;
|
||||
votes: Record<string, string[]>;
|
||||
updatedAt: string;
|
||||
}>("msteams", {
|
||||
namespace: "poll-vote-buckets",
|
||||
maxEntries: 32_032,
|
||||
env,
|
||||
});
|
||||
await voteBucketStore.register(`${pollHash}:${bucket}`, {
|
||||
pollId: "poll-partial",
|
||||
bucket,
|
||||
votes: { "user-legacy": ["0"] },
|
||||
updatedAt: metadata.createdAt,
|
||||
});
|
||||
await fs.promises.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
polls: {
|
||||
"poll-partial": {
|
||||
...metadata,
|
||||
votes: {
|
||||
"user-legacy": ["1"],
|
||||
"user-missing": ["1"],
|
||||
},
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const store = createMSTeamsPollStoreState({ env });
|
||||
|
||||
await expect(store.getPoll("poll-partial")).resolves.toMatchObject({
|
||||
votes: {
|
||||
"user-legacy": ["0"],
|
||||
"user-missing": ["1"],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps newest legacy polls by update timestamp at the row cap", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
|
||||
const filePath = path.join(stateDir, "msteams-polls.json");
|
||||
const pollRows: Record<string, MSTeamsPoll> = {};
|
||||
const baseMs = Date.now() - 60_000;
|
||||
pollRows["poll-recent"] = {
|
||||
id: "poll-recent",
|
||||
question: "Recent?",
|
||||
options: ["A", "B"],
|
||||
maxSelections: 1,
|
||||
createdAt: new Date(baseMs + 2_000_000).toISOString(),
|
||||
updatedAt: new Date(baseMs + 2_000_000).toISOString(),
|
||||
votes: {},
|
||||
};
|
||||
for (let index = 0; index < 1000; index += 1) {
|
||||
const id = `poll-${String(index).padStart(4, "0")}`;
|
||||
pollRows[id] = {
|
||||
id,
|
||||
question: "Old?",
|
||||
options: ["A", "B"],
|
||||
maxSelections: 1,
|
||||
createdAt: new Date(baseMs + index).toISOString(),
|
||||
votes: {},
|
||||
};
|
||||
}
|
||||
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, polls: pollRows })}\n`);
|
||||
|
||||
const store = createMSTeamsPollStoreState({ stateDir });
|
||||
|
||||
await expect(store.getPoll("poll-recent")).resolves.toMatchObject({ id: "poll-recent" });
|
||||
await expect(store.getPoll("poll-0000")).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it("deletes vote buckets when pruning over the poll cap", async () => {
|
||||
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
|
||||
const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import { parseStrictNonNegativeInteger } from "openclaw/plugin-sdk/number-runtime";
|
||||
import {
|
||||
isRecord,
|
||||
@@ -13,8 +12,6 @@ import {
|
||||
toPluginJsonValue,
|
||||
withMSTeamsSqliteMutationLock,
|
||||
} from "./sqlite-state.js";
|
||||
import { resolveMSTeamsStorePath } from "./storage.js";
|
||||
import { readJsonFile } from "./store-fs.js";
|
||||
|
||||
type MSTeamsPollVote = {
|
||||
pollId: string;
|
||||
@@ -52,31 +49,30 @@ type MSTeamsPollCard = {
|
||||
fallbackText: string;
|
||||
};
|
||||
|
||||
type PollStoreData = {
|
||||
export type MSTeamsPollStoreData = {
|
||||
version: 1;
|
||||
polls: Record<string, MSTeamsPoll>;
|
||||
};
|
||||
|
||||
type StoredMSTeamsPoll = Omit<MSTeamsPoll, "votes">;
|
||||
export type StoredMSTeamsPoll = Omit<MSTeamsPoll, "votes">;
|
||||
|
||||
type StoredMSTeamsPollVoteBucket = {
|
||||
export type StoredMSTeamsPollVoteBucket = {
|
||||
pollId: string;
|
||||
bucket: string;
|
||||
votes: Record<string, string[]>;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
const STORE_FILENAME = "msteams-polls.json";
|
||||
const POLLS_NAMESPACE = "polls";
|
||||
const POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets";
|
||||
const POLL_MIGRATIONS_NAMESPACE = "poll-migrations";
|
||||
const LEGACY_POLLS_MIGRATION_KEY = "msteams-polls-json-v1";
|
||||
const MAX_POLLS = 1000;
|
||||
const SQLITE_MAX_POLL_ROWS = MAX_POLLS + 1000;
|
||||
export const MSTEAMS_POLLS_LEGACY_FILENAME = "msteams-polls.json";
|
||||
export const MSTEAMS_POLLS_NAMESPACE = "polls";
|
||||
export const MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets";
|
||||
export const MSTEAMS_MAX_POLLS = 1000;
|
||||
export const MSTEAMS_SQLITE_MAX_POLL_ROWS = MSTEAMS_MAX_POLLS + 1000;
|
||||
// Keep worst-case retained vote buckets below plugin-state's per-plugin live row cap.
|
||||
const POLL_VOTE_BUCKET_COUNT = 32;
|
||||
const MAX_POLL_VOTE_BUCKET_ROWS = (MAX_POLLS + 1) * POLL_VOTE_BUCKET_COUNT;
|
||||
const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000;
|
||||
export const MSTEAMS_POLL_VOTE_BUCKET_COUNT = 32;
|
||||
export const MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS =
|
||||
(MSTEAMS_MAX_POLLS + 1) * MSTEAMS_POLL_VOTE_BUCKET_COUNT;
|
||||
export const MSTEAMS_POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000;
|
||||
const POLL_LOCK_FILENAME = "msteams-polls.sqlite.lock";
|
||||
|
||||
function normalizeChoiceValue(value: unknown): string | null {
|
||||
@@ -248,30 +244,18 @@ type MSTeamsPollStoreStateOptions = {
|
||||
storePath?: string;
|
||||
};
|
||||
|
||||
type PollMigrationMarker = {
|
||||
importedAt: string;
|
||||
};
|
||||
|
||||
function createPollStateStore(params?: MSTeamsPollStoreStateOptions) {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<StoredMSTeamsPoll>({
|
||||
namespace: POLLS_NAMESPACE,
|
||||
maxEntries: SQLITE_MAX_POLL_ROWS,
|
||||
namespace: MSTEAMS_POLLS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_SQLITE_MAX_POLL_ROWS,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function createPollVoteBucketStateStore(params?: MSTeamsPollStoreStateOptions) {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<StoredMSTeamsPollVoteBucket>({
|
||||
namespace: POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
maxEntries: MAX_POLL_VOTE_BUCKET_ROWS,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function createPollMigrationStore(params?: MSTeamsPollStoreStateOptions) {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<PollMigrationMarker>({
|
||||
namespace: POLL_MIGRATIONS_NAMESPACE,
|
||||
maxEntries: 100,
|
||||
namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
@@ -287,7 +271,7 @@ function parseTimestamp(value?: string): number | null {
|
||||
function pruneExpired<T extends { createdAt: string; updatedAt?: string }>(
|
||||
polls: Record<string, T>,
|
||||
) {
|
||||
const cutoff = Date.now() - POLL_TTL_MS;
|
||||
const cutoff = Date.now() - MSTEAMS_POLL_TTL_MS;
|
||||
const entries = Object.entries(polls).filter(([, poll]) => {
|
||||
const ts = parseTimestamp(poll.updatedAt ?? poll.createdAt) ?? 0;
|
||||
return ts >= cutoff;
|
||||
@@ -295,9 +279,11 @@ function pruneExpired<T extends { createdAt: string; updatedAt?: string }>(
|
||||
return Object.fromEntries(entries);
|
||||
}
|
||||
|
||||
function selectRetainedPolls(polls: Record<string, MSTeamsPoll>): Array<[string, MSTeamsPoll]> {
|
||||
export function selectRetainedMSTeamsPolls(
|
||||
polls: Record<string, MSTeamsPoll>,
|
||||
): Array<[string, MSTeamsPoll]> {
|
||||
const retained = Object.entries(pruneExpired(polls));
|
||||
if (retained.length <= MAX_POLLS) {
|
||||
if (retained.length <= MSTEAMS_MAX_POLLS) {
|
||||
return retained;
|
||||
}
|
||||
retained.sort((a, b) => {
|
||||
@@ -305,7 +291,7 @@ function selectRetainedPolls(polls: Record<string, MSTeamsPoll>): Array<[string,
|
||||
const bTs = parseTimestamp(b[1].updatedAt ?? b[1].createdAt) ?? 0;
|
||||
return aTs - bTs || a[0].localeCompare(b[0]);
|
||||
});
|
||||
return retained.slice(retained.length - MAX_POLLS);
|
||||
return retained.slice(retained.length - MSTEAMS_MAX_POLLS);
|
||||
}
|
||||
|
||||
export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: string[]) {
|
||||
@@ -319,45 +305,37 @@ export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: st
|
||||
return uniqueStrings(limited);
|
||||
}
|
||||
|
||||
export function splitMSTeamsPoll(poll: MSTeamsPoll): {
|
||||
metadata: StoredMSTeamsPoll;
|
||||
votes: MSTeamsPoll["votes"];
|
||||
} {
|
||||
const { votes, ...metadata } = poll;
|
||||
return { metadata, votes };
|
||||
}
|
||||
|
||||
function hashMSTeamsPollVote(pollId: string, voterId: string): string {
|
||||
return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex");
|
||||
}
|
||||
|
||||
export function buildMSTeamsPollStateKey(pollId: string): string {
|
||||
return crypto.createHash("sha256").update(pollId).digest("hex");
|
||||
}
|
||||
|
||||
export function selectMSTeamsPollVoteBucket(pollId: string, voterId: string): string {
|
||||
const bucket = Number.parseInt(hashMSTeamsPollVote(pollId, voterId).slice(0, 8), 16);
|
||||
return String(bucket % MSTEAMS_POLL_VOTE_BUCKET_COUNT).padStart(4, "0");
|
||||
}
|
||||
|
||||
export function buildMSTeamsPollVoteBucketKey(pollId: string, bucket: string): string {
|
||||
const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex");
|
||||
return `${pollDigest}:${bucket}`;
|
||||
}
|
||||
|
||||
export function createMSTeamsPollStoreState(
|
||||
params?: MSTeamsPollStoreStateOptions,
|
||||
): MSTeamsPollStore {
|
||||
const pollStore = createPollStateStore(params);
|
||||
const voteBucketStore = createPollVoteBucketStateStore(params);
|
||||
const migrationStore = createPollMigrationStore(params);
|
||||
const legacyStorePath = resolveMSTeamsStorePath({
|
||||
filename: STORE_FILENAME,
|
||||
env: params?.env,
|
||||
homedir: params?.homedir,
|
||||
stateDir: params?.stateDir,
|
||||
storePath: params?.storePath,
|
||||
});
|
||||
let legacyImportPromise: Promise<void> | null = null;
|
||||
|
||||
const splitPoll = (
|
||||
poll: MSTeamsPoll,
|
||||
): { metadata: StoredMSTeamsPoll; votes: MSTeamsPoll["votes"] } => {
|
||||
const { votes, ...metadata } = poll;
|
||||
return { metadata, votes };
|
||||
};
|
||||
|
||||
const hashVote = (pollId: string, voterId: string): string => {
|
||||
return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex");
|
||||
};
|
||||
|
||||
const buildPollStateKey = (pollId: string): string => {
|
||||
return crypto.createHash("sha256").update(pollId).digest("hex");
|
||||
};
|
||||
|
||||
const selectVoteBucket = (pollId: string, voterId: string): string => {
|
||||
const bucket = Number.parseInt(hashVote(pollId, voterId).slice(0, 8), 16);
|
||||
return String(bucket % POLL_VOTE_BUCKET_COUNT).padStart(4, "0");
|
||||
};
|
||||
|
||||
const buildVoteBucketKey = (pollId: string, bucket: string): string => {
|
||||
const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex");
|
||||
return `${pollDigest}:${bucket}`;
|
||||
};
|
||||
|
||||
const readPollVotes = async (pollId: string): Promise<Record<string, string[]>> => {
|
||||
const votes: Record<string, string[]> = {};
|
||||
@@ -384,13 +362,13 @@ export function createMSTeamsPollStoreState(
|
||||
): Promise<void> => {
|
||||
const buckets = new Map<string, Record<string, string[]>>();
|
||||
for (const [voterId, selections] of Object.entries(votes)) {
|
||||
const bucket = selectVoteBucket(pollId, voterId);
|
||||
const bucket = selectMSTeamsPollVoteBucket(pollId, voterId);
|
||||
const bucketVotes = buckets.get(bucket) ?? {};
|
||||
bucketVotes[voterId] = selections;
|
||||
buckets.set(bucket, bucketVotes);
|
||||
}
|
||||
for (const [bucket, bucketVotes] of buckets) {
|
||||
const key = buildVoteBucketKey(pollId, bucket);
|
||||
const key = buildMSTeamsPollVoteBucketKey(pollId, bucket);
|
||||
const existing = await voteBucketStore.lookup(key);
|
||||
await voteBucketStore.register(
|
||||
key,
|
||||
@@ -410,8 +388,8 @@ export function createMSTeamsPollStoreState(
|
||||
selections: string[],
|
||||
updatedAt: string,
|
||||
): Promise<void> => {
|
||||
const bucket = selectVoteBucket(pollId, voterId);
|
||||
const key = buildVoteBucketKey(pollId, bucket);
|
||||
const bucket = selectMSTeamsPollVoteBucket(pollId, voterId);
|
||||
const key = buildMSTeamsPollVoteBucketKey(pollId, bucket);
|
||||
const existing = await voteBucketStore.lookup(key);
|
||||
await voteBucketStore.register(
|
||||
key,
|
||||
@@ -428,48 +406,6 @@ export function createMSTeamsPollStoreState(
|
||||
return { ...metadata, votes: await readPollVotes(metadata.id) };
|
||||
};
|
||||
|
||||
const importLegacyStore = async (): Promise<void> => {
|
||||
if (await migrationStore.lookup(LEGACY_POLLS_MIGRATION_KEY)) {
|
||||
return;
|
||||
}
|
||||
const empty: PollStoreData = { version: 1, polls: {} };
|
||||
const { value, exists } = await readJsonFile<PollStoreData>(legacyStorePath, empty);
|
||||
if (!exists) {
|
||||
await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, {
|
||||
importedAt: new Date().toISOString(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const legacyPolls =
|
||||
value.version === 1 &&
|
||||
value.polls &&
|
||||
typeof value.polls === "object" &&
|
||||
!Array.isArray(value.polls)
|
||||
? value.polls
|
||||
: {};
|
||||
for (const [pollId, poll] of selectRetainedPolls(legacyPolls)) {
|
||||
if (!pollId) {
|
||||
continue;
|
||||
}
|
||||
const { metadata, votes } = splitPoll(poll);
|
||||
await pollStore.registerIfAbsent(buildPollStateKey(pollId), toPluginJsonValue(metadata));
|
||||
await registerPollVotes(pollId, votes, poll.updatedAt ?? poll.createdAt);
|
||||
}
|
||||
await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, {
|
||||
importedAt: new Date().toISOString(),
|
||||
});
|
||||
await fs.rm(legacyStorePath, { force: true }).catch(() => {});
|
||||
};
|
||||
|
||||
const ensureLegacyImported = async (): Promise<void> => {
|
||||
legacyImportPromise ??= withMSTeamsSqliteMutationLock(
|
||||
params,
|
||||
POLL_LOCK_FILENAME,
|
||||
importLegacyStore,
|
||||
);
|
||||
await legacyImportPromise;
|
||||
};
|
||||
|
||||
const prunePollStoreToLimit = async (): Promise<void> => {
|
||||
const rows = [];
|
||||
for (const row of await pollStore.entries()) {
|
||||
@@ -480,7 +416,7 @@ export function createMSTeamsPollStoreState(
|
||||
}
|
||||
rows.push(row);
|
||||
}
|
||||
if (rows.length <= MAX_POLLS) {
|
||||
if (rows.length <= MSTEAMS_MAX_POLLS) {
|
||||
return;
|
||||
}
|
||||
const sorted = rows.toSorted((a, b) => {
|
||||
@@ -488,7 +424,7 @@ export function createMSTeamsPollStoreState(
|
||||
const bTs = parseTimestamp(b.value.updatedAt ?? b.value.createdAt) ?? 0;
|
||||
return aTs - bTs || a.key.localeCompare(b.key);
|
||||
});
|
||||
for (const row of sorted.slice(0, rows.length - MAX_POLLS)) {
|
||||
for (const row of sorted.slice(0, rows.length - MSTEAMS_MAX_POLLS)) {
|
||||
await pollStore.delete(row.key);
|
||||
await deletePollVotes(row.value.id);
|
||||
}
|
||||
@@ -496,9 +432,8 @@ export function createMSTeamsPollStoreState(
|
||||
|
||||
const createPoll = async (poll: MSTeamsPoll) => {
|
||||
await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
const { metadata, votes } = splitPoll(poll);
|
||||
await pollStore.register(buildPollStateKey(poll.id), toPluginJsonValue(metadata));
|
||||
const { metadata, votes } = splitMSTeamsPoll(poll);
|
||||
await pollStore.register(buildMSTeamsPollStateKey(poll.id), toPluginJsonValue(metadata));
|
||||
await deletePollVotes(poll.id);
|
||||
await registerPollVotes(poll.id, votes, poll.updatedAt ?? poll.createdAt);
|
||||
await prunePollStoreToLimit();
|
||||
@@ -506,8 +441,7 @@ export function createMSTeamsPollStoreState(
|
||||
};
|
||||
|
||||
const getPoll = async (pollId: string) => {
|
||||
await ensureLegacyImported();
|
||||
const poll = await pollStore.lookup(buildPollStateKey(pollId));
|
||||
const poll = await pollStore.lookup(buildMSTeamsPollStateKey(pollId));
|
||||
if (!poll) {
|
||||
return null;
|
||||
}
|
||||
@@ -519,8 +453,7 @@ export function createMSTeamsPollStoreState(
|
||||
|
||||
const recordVote = async (vote: { pollId: string; voterId: string; selections: string[] }) => {
|
||||
return await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
const pollKey = buildPollStateKey(vote.pollId);
|
||||
const pollKey = buildMSTeamsPollStateKey(vote.pollId);
|
||||
const poll = await pollStore.lookup(pollKey);
|
||||
if (!poll) {
|
||||
return null;
|
||||
|
||||
@@ -2,7 +2,7 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import { setMSTeamsRuntime } from "./runtime.js";
|
||||
import { createMSTeamsSsoTokenStoreFs } from "./sso-token-store.js";
|
||||
import { msteamsRuntimeStub } from "./test-support/runtime.js";
|
||||
@@ -13,10 +13,6 @@ describe("msteams sso token store (plugin state)", () => {
|
||||
setMSTeamsRuntime(msteamsRuntimeStub);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("keeps distinct tokens when connectionName and userId contain the legacy delimiter", async () => {
|
||||
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-"));
|
||||
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
|
||||
@@ -47,7 +43,7 @@ describe("msteams sso token store (plugin state)", () => {
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("loads legacy flat-key files by rebuilding keys from stored token payloads", async () => {
|
||||
it("ignores legacy flat-key token files at runtime", async () => {
|
||||
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-legacy-"));
|
||||
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
|
||||
await fs.writeFile(
|
||||
@@ -76,13 +72,8 @@ describe("msteams sso token store (plugin state)", () => {
|
||||
connectionName: "conn",
|
||||
userId: "user-1",
|
||||
}),
|
||||
).toEqual({
|
||||
connectionName: "conn",
|
||||
userId: "user-1",
|
||||
token: "token-1",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
});
|
||||
await expect(fs.access(storePath)).rejects.toThrow();
|
||||
).toBeNull();
|
||||
await expect(fs.access(storePath)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps plugin-state keys bounded for long Teams identifiers", async () => {
|
||||
@@ -100,72 +91,4 @@ describe("msteams sso token store (plugin state)", () => {
|
||||
expect(await store.remove(token)).toBe(true);
|
||||
expect(await store.get(token)).toBeNull();
|
||||
});
|
||||
|
||||
it("imports a legacy token file that appears after an empty migration marker", async () => {
|
||||
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-late-"));
|
||||
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
|
||||
const store = createMSTeamsSsoTokenStoreFs({ storePath });
|
||||
|
||||
expect(await store.get({ connectionName: "conn", userId: "user-late" })).toBeNull();
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
tokens: {
|
||||
late: {
|
||||
connectionName: "conn",
|
||||
userId: "user-late",
|
||||
token: "token-late",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
"utf8",
|
||||
);
|
||||
|
||||
expect(await store.get({ connectionName: "conn", userId: "user-late" })).toEqual({
|
||||
connectionName: "conn",
|
||||
userId: "user-late",
|
||||
token: "token-late",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
});
|
||||
await expect(fs.access(storePath)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("does not resurrect removed tokens when a migrated legacy file cannot be deleted", async () => {
|
||||
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-stale-"));
|
||||
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
`${JSON.stringify({
|
||||
version: 1,
|
||||
tokens: {
|
||||
stale: {
|
||||
connectionName: "conn",
|
||||
userId: "user-stale",
|
||||
token: "token-stale",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
"utf8",
|
||||
);
|
||||
const originalRm = fs.rm;
|
||||
vi.spyOn(fs, "rm").mockImplementation(async (target, options) => {
|
||||
if (target === storePath) {
|
||||
throw new Error("cannot remove");
|
||||
}
|
||||
return await originalRm(target, options);
|
||||
});
|
||||
|
||||
const store = createMSTeamsSsoTokenStoreFs({ storePath });
|
||||
expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toEqual({
|
||||
connectionName: "conn",
|
||||
userId: "user-stale",
|
||||
token: "token-stale",
|
||||
updatedAt: "2026-04-10T00:00:00.000Z",
|
||||
});
|
||||
expect(await store.remove({ connectionName: "conn", userId: "user-stale" })).toBe(true);
|
||||
expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
*/
|
||||
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import { getMSTeamsRuntime } from "./runtime.js";
|
||||
import {
|
||||
@@ -20,10 +19,8 @@ import {
|
||||
toPluginJsonValue,
|
||||
withMSTeamsSqliteMutationLock,
|
||||
} from "./sqlite-state.js";
|
||||
import { resolveMSTeamsStorePath } from "./storage.js";
|
||||
import { readJsonFile } from "./store-fs.js";
|
||||
|
||||
type MSTeamsSsoStoredToken = {
|
||||
export type MSTeamsSsoStoredToken = {
|
||||
/** Connection name from the Bot Framework OAuth connection setting. */
|
||||
connectionName: string;
|
||||
/** Stable user identifier (AAD object ID preferred). */
|
||||
@@ -48,31 +45,20 @@ type SsoStoreData = {
|
||||
tokens: Record<string, MSTeamsSsoStoredToken>;
|
||||
};
|
||||
|
||||
const STORE_FILENAME = "msteams-sso-tokens.json";
|
||||
const SSO_TOKENS_NAMESPACE = "sso-tokens";
|
||||
const SSO_TOKEN_MIGRATIONS_NAMESPACE = "sso-token-migrations";
|
||||
export type MSTeamsSsoStoreData = SsoStoreData;
|
||||
|
||||
export const MSTEAMS_SSO_TOKENS_LEGACY_FILENAME = "msteams-sso-tokens.json";
|
||||
export const MSTEAMS_SSO_TOKENS_NAMESPACE = "sso-tokens";
|
||||
const SSO_TOKEN_LOCK_FILENAME = "msteams-sso-tokens.sqlite.lock";
|
||||
const MAX_SSO_TOKENS = 5000;
|
||||
export const MSTEAMS_MAX_SSO_TOKENS = 5000;
|
||||
const STORE_KEY_VERSION_PREFIX = "v2:";
|
||||
|
||||
function makeKey(connectionName: string, userId: string): string {
|
||||
export function makeMSTeamsSsoTokenStoreKey(connectionName: string, userId: string): string {
|
||||
return `${STORE_KEY_VERSION_PREFIX}${createHash("sha256")
|
||||
.update(JSON.stringify([connectionName, userId]))
|
||||
.digest("hex")}`;
|
||||
}
|
||||
|
||||
function buildMigrationKey(filePath: string): string {
|
||||
return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`;
|
||||
}
|
||||
|
||||
function buildMigrationContentKey(filePath: string, value: unknown): string {
|
||||
return `legacy-json-content:${createHash("sha256")
|
||||
.update(filePath)
|
||||
.update("\0")
|
||||
.update(JSON.stringify(value) ?? "undefined")
|
||||
.digest("hex")}`;
|
||||
}
|
||||
|
||||
function createTokenStore(params?: {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
homedir?: () => string;
|
||||
@@ -80,26 +66,13 @@ function createTokenStore(params?: {
|
||||
storePath?: string;
|
||||
}): PluginStateKeyedStore<MSTeamsSsoStoredToken> {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<MSTeamsSsoStoredToken>({
|
||||
namespace: SSO_TOKENS_NAMESPACE,
|
||||
maxEntries: MAX_SSO_TOKENS,
|
||||
namespace: MSTEAMS_SSO_TOKENS_NAMESPACE,
|
||||
maxEntries: MSTEAMS_MAX_SSO_TOKENS,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function createMigrationStore(params?: {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
homedir?: () => string;
|
||||
stateDir?: string;
|
||||
storePath?: string;
|
||||
}): PluginStateKeyedStore<{ importedAt: string }> {
|
||||
return getMSTeamsRuntime().state.openKeyedStore<{ importedAt: string }>({
|
||||
namespace: SSO_TOKEN_MIGRATIONS_NAMESPACE,
|
||||
maxEntries: 100,
|
||||
env: resolveMSTeamsSqliteStateEnv(params),
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null {
|
||||
export function normalizeMSTeamsSsoStoredToken(value: unknown): MSTeamsSsoStoredToken | null {
|
||||
if (!value || typeof value !== "object") {
|
||||
return null;
|
||||
}
|
||||
@@ -125,7 +98,7 @@ function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null {
|
||||
};
|
||||
}
|
||||
|
||||
function isSsoStoreData(value: unknown): value is SsoStoreData {
|
||||
export function isMSTeamsSsoStoreData(value: unknown): value is MSTeamsSsoStoreData {
|
||||
if (!value || typeof value !== "object") {
|
||||
return false;
|
||||
}
|
||||
@@ -139,71 +112,17 @@ export function createMSTeamsSsoTokenStoreFs(params?: {
|
||||
stateDir?: string;
|
||||
storePath?: string;
|
||||
}): MSTeamsSsoTokenStore {
|
||||
const legacyFilePath = resolveMSTeamsStorePath({
|
||||
filename: STORE_FILENAME,
|
||||
env: params?.env,
|
||||
homedir: params?.homedir,
|
||||
stateDir: params?.stateDir,
|
||||
storePath: params?.storePath,
|
||||
});
|
||||
const empty: SsoStoreData = { version: 1, tokens: {} };
|
||||
const tokenStore = createTokenStore(params);
|
||||
const migrationStore = createMigrationStore(params);
|
||||
const migrationKey = buildMigrationKey(legacyFilePath);
|
||||
let legacyImportPromise: Promise<void> | null = null;
|
||||
|
||||
const importLegacyStore = async (): Promise<void> => {
|
||||
const imported = (await migrationStore.lookup(migrationKey)) !== undefined;
|
||||
const { value, exists } = await readJsonFile<unknown>(legacyFilePath, empty);
|
||||
const contentKey = exists ? buildMigrationContentKey(legacyFilePath, value) : null;
|
||||
if (contentKey && (await migrationStore.lookup(contentKey))) {
|
||||
return;
|
||||
}
|
||||
if (exists && isSsoStoreData(value)) {
|
||||
for (const stored of Object.values(value.tokens)) {
|
||||
const normalized = normalizeStoredToken(stored);
|
||||
if (!normalized) {
|
||||
continue;
|
||||
}
|
||||
await tokenStore.registerIfAbsent(
|
||||
makeKey(normalized.connectionName, normalized.userId),
|
||||
toPluginJsonValue(normalized),
|
||||
);
|
||||
}
|
||||
}
|
||||
if (contentKey) {
|
||||
await migrationStore.register(contentKey, { importedAt: new Date().toISOString() });
|
||||
}
|
||||
if (!imported) {
|
||||
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
|
||||
}
|
||||
if (exists) {
|
||||
await fs.rm(legacyFilePath, { force: true }).catch(() => {});
|
||||
}
|
||||
};
|
||||
|
||||
const ensureLegacyImported = async (): Promise<void> => {
|
||||
if (!legacyImportPromise) {
|
||||
legacyImportPromise = withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, () =>
|
||||
importLegacyStore(),
|
||||
).finally(() => {
|
||||
legacyImportPromise = null;
|
||||
});
|
||||
}
|
||||
await legacyImportPromise;
|
||||
};
|
||||
|
||||
return {
|
||||
async get({ connectionName, userId }) {
|
||||
await ensureLegacyImported();
|
||||
return (await tokenStore.lookup(makeKey(connectionName, userId))) ?? null;
|
||||
return (await tokenStore.lookup(makeMSTeamsSsoTokenStoreKey(connectionName, userId))) ?? null;
|
||||
},
|
||||
|
||||
async save(token) {
|
||||
await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
await tokenStore.register(
|
||||
makeKey(token.connectionName, token.userId),
|
||||
makeMSTeamsSsoTokenStoreKey(token.connectionName, token.userId),
|
||||
toPluginJsonValue({ ...token }),
|
||||
);
|
||||
});
|
||||
@@ -212,8 +131,7 @@ export function createMSTeamsSsoTokenStoreFs(params?: {
|
||||
async remove({ connectionName, userId }) {
|
||||
let removed = false;
|
||||
await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => {
|
||||
await importLegacyStore();
|
||||
removed = await tokenStore.delete(makeKey(connectionName, userId));
|
||||
removed = await tokenStore.delete(makeMSTeamsSsoTokenStoreKey(connectionName, userId));
|
||||
});
|
||||
return removed;
|
||||
},
|
||||
@@ -225,13 +143,13 @@ export function createMSTeamsSsoTokenStoreMemory(): MSTeamsSsoTokenStore {
|
||||
const tokens = new Map<string, MSTeamsSsoStoredToken>();
|
||||
return {
|
||||
async get({ connectionName, userId }) {
|
||||
return tokens.get(makeKey(connectionName, userId)) ?? null;
|
||||
return tokens.get(makeMSTeamsSsoTokenStoreKey(connectionName, userId)) ?? null;
|
||||
},
|
||||
async save(token) {
|
||||
tokens.set(makeKey(token.connectionName, token.userId), { ...token });
|
||||
tokens.set(makeMSTeamsSsoTokenStoreKey(token.connectionName, token.userId), { ...token });
|
||||
},
|
||||
async remove({ connectionName, userId }) {
|
||||
return tokens.delete(makeKey(connectionName, userId));
|
||||
return tokens.delete(makeMSTeamsSsoTokenStoreKey(connectionName, userId));
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { withFileLock as withPathLock } from "openclaw/plugin-sdk/file-lock";
|
||||
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { pathExists } from "openclaw/plugin-sdk/security-runtime";
|
||||
|
||||
const STORE_LOCK_OPTIONS = {
|
||||
@@ -13,14 +13,7 @@ const STORE_LOCK_OPTIONS = {
|
||||
stale: 30_000,
|
||||
} as const;
|
||||
|
||||
export async function readJsonFile<T>(
|
||||
filePath: string,
|
||||
fallback: T,
|
||||
): Promise<{ value: T; exists: boolean }> {
|
||||
return await readJsonFileWithFallback(filePath, fallback);
|
||||
}
|
||||
|
||||
export async function writeJsonFile(filePath: string, value: unknown): Promise<void> {
|
||||
async function writeJsonFile(filePath: string, value: unknown): Promise<void> {
|
||||
await writeJsonFileAtomically(filePath, value);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user