From b9aade4b1262e01bfa5734208e47c350cc6a0346 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 4 Jun 2026 08:20:39 -0700 Subject: [PATCH] refactor: move MS Teams state migration to doctor --- docs/channels/msteams.md | 2 +- docs/refactor/database-first.md | 22 +- .../msteams/doctor-contract-api.test.ts | 212 ++++++++++- extensions/msteams/doctor-contract-api.ts | 328 +++++++++++++++++- .../src/conversation-store-state.test.ts | 135 ++++--- .../msteams/src/conversation-store-state.ts | 146 +++----- .../msteams/src/pending-uploads-fs.test.ts | 72 +--- extensions/msteams/src/pending-uploads-fs.ts | 181 ---------- extensions/msteams/src/polls.test.ts | 122 +------ extensions/msteams/src/polls.ts | 183 ++++------ .../msteams/src/sso-token-store.test.ts | 85 +---- extensions/msteams/src/sso-token-store.ts | 116 +------ extensions/msteams/src/store-fs.ts | 11 +- 13 files changed, 740 insertions(+), 875 deletions(-) diff --git a/docs/channels/msteams.md b/docs/channels/msteams.md index 4c4b03833c24..746e8f0d19f0 100644 --- a/docs/channels/msteams.md +++ b/docs/channels/msteams.md @@ -916,7 +916,7 @@ OpenClaw sends Teams polls as Adaptive Cards (there is no native Teams poll API) - CLI: `openclaw message poll --channel msteams --target conversation: ...` - Votes are recorded by the gateway in OpenClaw plugin-state SQLite under `state/openclaw.sqlite`. -- Existing `msteams-polls.json` files are imported once when the MSTeams plugin starts. +- Existing `msteams-polls.json` files are imported by `openclaw doctor --fix`, not by the running plugin. - The gateway must stay online to record votes. - Polls do not auto-post result summaries yet, and there is no supported poll-results CLI yet. diff --git a/docs/refactor/database-first.md b/docs/refactor/database-first.md index 73ed86ee2f58..301ca8d6855c 100644 --- a/docs/refactor/database-first.md +++ b/docs/refactor/database-first.md @@ -1002,10 +1002,10 @@ sessionId})`; create, branch, continue, list, and fork flows live in their - The generic plugin SDK persistent-dedupe helper no longer exposes file-shaped options. Callers provide SQLite scope keys and durable dedupe rows live in shared plugin state. -- Microsoft Teams SSO and delegated OAuth tokens moved from locked JSON files - to SQLite plugin state. Doctor imports `msteams-sso-tokens.json` and - `msteams-delegated.json`, rebuilds canonical SSO token keys from payloads, - and removes the source files. +- Microsoft Teams SSO tokens moved from locked JSON files to SQLite plugin + state. Doctor imports `msteams-sso-tokens.json`, rebuilds canonical SSO token + keys from payloads, and removes the source file. Delegated OAuth tokens stay + on their existing private credential-file boundary. - Matrix sync cache state moved from `bot-storage.json` to SQLite plugin state. Doctor imports legacy raw or wrapped sync payloads and removes the source file. Active Matrix and QA Matrix clients pass a SQLite sync-store root @@ -1613,13 +1613,13 @@ Move these into the global database: `reply-cache`, `sent-echoes`) instead of `imessage/catchup/*.json`, `imessage/reply-cache.jsonl`, and `imessage/sent-echoes.jsonl`; the iMessage doctor/setup migration imports and removes the legacy files. -- Microsoft Teams conversations, polls, delegated tokens, pending uploads, and - feedback learnings now use SQLite plugin state/blob namespaces - (`conversations`, `polls`, `delegated-tokens`, `pending-uploads`, +- Microsoft Teams conversations, polls, SSO tokens, and feedback learnings now + use SQLite plugin state namespaces (`conversations`, `polls`, `sso-tokens`, `feedback-learnings`) instead of `msteams-conversations.json`, - `msteams-polls.json`, `msteams-delegated.json`, - `msteams-pending-uploads.json`, and `*.learnings.json`; the Microsoft Teams - doctor/setup migration imports and removes the legacy files. + `msteams-polls.json`, `msteams-sso-tokens.json`, and `*.learnings.json`; the + Microsoft Teams doctor/setup migration imports and archives the legacy files. + Pending uploads are a short-lived SQLite cache and old JSON cache files are + not migrated. - Matrix sync cache, storage metadata, thread bindings, inbound dedupe markers, startup verification cooldown state, credentials, recovery keys, and SDK IndexedDB crypto snapshots now use SQLite plugin state/blob namespaces under @@ -2191,8 +2191,6 @@ Add a repo check that fails new runtime writes to legacy state paths: - Microsoft Teams `msteams-conversations.json` - Microsoft Teams `msteams-polls.json` - Microsoft Teams `msteams-sso-tokens.json` -- Microsoft Teams `msteams-delegated.json` -- Microsoft Teams `msteams-pending-uploads.json` - Microsoft Teams `*.learnings.json` - Matrix `bot-storage.json` - Matrix `sync-store.json` diff --git a/extensions/msteams/doctor-contract-api.test.ts b/extensions/msteams/doctor-contract-api.test.ts index bf5f88f76ecd..03d4b5a33944 100644 --- a/extensions/msteams/doctor-contract-api.test.ts +++ b/extensions/msteams/doctor-contract-api.test.ts @@ -12,6 +12,27 @@ import type { } from "openclaw/plugin-sdk/runtime-doctor"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { stateMigrations } from "./doctor-contract-api.js"; +import { + buildMSTeamsConversationStateKey, + MSTEAMS_CONVERSATIONS_NAMESPACE, + type MSTeamsLegacyConversationStoreData, +} from "./src/conversation-store-state.js"; +import type { StoredConversationReference } from "./src/conversation-store.js"; +import { + buildMSTeamsPollStateKey, + buildMSTeamsPollVoteBucketKey, + MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE, + MSTEAMS_POLLS_NAMESPACE, + selectMSTeamsPollVoteBucket, + type MSTeamsPoll, + type StoredMSTeamsPoll, + type StoredMSTeamsPollVoteBucket, +} from "./src/polls.js"; +import { + makeMSTeamsSsoTokenStoreKey, + MSTEAMS_SSO_TOKENS_NAMESPACE, + type MSTeamsSsoStoredToken, +} from "./src/sso-token-store.js"; function createDoctorContext(env: NodeJS.ProcessEnv): PluginDoctorStateMigrationContext { return { @@ -32,6 +53,14 @@ function learningStoreKey(storePath: string, sessionKey: string): string { return createHash("sha256").update(`${storePath}\0${sessionKey}`, "utf8").digest("hex"); } +function migrationById(id: string) { + const migration = stateMigrations.find((entry) => entry.id === id); + if (!migration) { + throw new Error(`missing migration ${id}`); + } + return migration; +} + describe("msteams doctor state migration", () => { let stateDir = ""; let env: NodeJS.ProcessEnv; @@ -46,6 +75,187 @@ describe("msteams doctor state migration", () => { await fs.rm(stateDir, { recursive: true, force: true }); }); + it("imports legacy conversations into plugin state", async () => { + const filePath = path.join(stateDir, "msteams-conversations.json"); + const ref: StoredConversationReference = { + conversation: { id: "19:conv@thread.tacv2" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "user-1" }, + }; + await fs.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + conversations: { + "19:conv@thread.tacv2": ref, + }, + } satisfies MSTeamsLegacyConversationStoreData)}\n`, + ); + + const migration = migrationById("msteams-conversations-json-to-plugin-state"); + const context = createDoctorContext(env); + await expect( + migration.detectLegacyState({ + config: {}, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context, + }), + ).resolves.toMatchObject({ + preview: [expect.stringContaining("Microsoft Teams conversations")], + }); + + const result = await migration.migrateLegacyState({ + config: {}, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context, + }); + + expect(result.warnings).toEqual([]); + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1 Microsoft Teams conversation entry"), + expect.stringContaining("Archived Microsoft Teams conversation legacy source"), + ]); + await expect(fs.access(filePath)).rejects.toThrow(); + await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined(); + const store = context.openPluginStateKeyedStore({ + namespace: MSTEAMS_CONVERSATIONS_NAMESPACE, + maxEntries: 2000, + }); + await expect( + store.lookup(buildMSTeamsConversationStateKey("19:conv@thread.tacv2")), + ).resolves.toMatchObject({ + conversation: { id: "19:conv@thread.tacv2" }, + user: { id: "user-1" }, + }); + }); + + it("imports legacy polls and vote buckets into plugin state", async () => { + const filePath = path.join(stateDir, "msteams-polls.json"); + const poll: MSTeamsPoll = { + id: "poll-legacy", + question: "Lunch?", + options: ["Pizza", "Sushi"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: { + "user-legacy": ["0"], + "user-new": ["1"], + }, + }; + await fs.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + polls: { + "poll-legacy": poll, + }, + })}\n`, + ); + const context = createDoctorContext(env); + const voteBucketStore = context.openPluginStateKeyedStore({ + namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE, + maxEntries: 32_032, + }); + const legacyBucket = selectMSTeamsPollVoteBucket("poll-legacy", "user-legacy"); + await voteBucketStore.register(buildMSTeamsPollVoteBucketKey("poll-legacy", legacyBucket), { + pollId: "poll-legacy", + bucket: legacyBucket, + votes: { "user-legacy": ["1"] }, + updatedAt: poll.createdAt, + }); + + const migration = migrationById("msteams-polls-json-to-plugin-state"); + const result = await migration.migrateLegacyState({ + config: {}, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context, + }); + + expect(result.warnings).toEqual([]); + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1 Microsoft Teams poll entry"), + expect.stringContaining("Archived Microsoft Teams poll legacy source"), + ]); + const pollStore = context.openPluginStateKeyedStore({ + namespace: MSTEAMS_POLLS_NAMESPACE, + maxEntries: 2000, + }); + await expect(pollStore.lookup(buildMSTeamsPollStateKey("poll-legacy"))).resolves.toMatchObject({ + id: "poll-legacy", + question: "Lunch?", + }); + const newBucket = selectMSTeamsPollVoteBucket("poll-legacy", "user-new"); + await expect( + voteBucketStore.lookup(buildMSTeamsPollVoteBucketKey("poll-legacy", legacyBucket)), + ).resolves.toMatchObject({ + votes: { "user-legacy": ["1"] }, + }); + await expect( + voteBucketStore.lookup(buildMSTeamsPollVoteBucketKey("poll-legacy", newBucket)), + ).resolves.toMatchObject({ + votes: { "user-new": ["1"] }, + }); + await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined(); + }); + + it("imports legacy SSO tokens into the existing plugin-state token namespace", async () => { + const filePath = path.join(stateDir, "msteams-sso-tokens.json"); + const token: MSTeamsSsoStoredToken = { + connectionName: "conn::alpha", + userId: "user::one", + token: "test-token-value", + updatedAt: "2026-04-10T00:00:00.000Z", + }; + await fs.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + tokens: { + "legacy::wrong-key": token, + }, + })}\n`, + ); + + const migration = migrationById("msteams-sso-tokens-json-to-plugin-state"); + const context = createDoctorContext(env); + const result = await migration.migrateLegacyState({ + config: {}, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context, + }); + + expect(result.warnings).toEqual([]); + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1 Microsoft Teams SSO token entry"), + expect.stringContaining("Archived Microsoft Teams SSO-token legacy source"), + ]); + const store = context.openPluginStateKeyedStore({ + namespace: MSTEAMS_SSO_TOKENS_NAMESPACE, + maxEntries: 5000, + }); + await expect( + store.lookup(makeMSTeamsSsoTokenStoreKey("conn::alpha", "user::one")), + ).resolves.toEqual(token); + expect(result.changes.join("\n")).not.toContain(token.token); + expect(result.warnings.join("\n")).not.toContain(token.token); + await expect(fs.access(`${filePath}.migrated`)).resolves.toBeUndefined(); + }); + + it("does not register a doctor migration for pending-upload cache files", () => { + expect(stateMigrations.map((migration) => migration.id)).not.toContain( + "msteams-pending-uploads-json-to-plugin-state", + ); + }); + it("imports legacy feedback learnings into plugin state", async () => { const agentStoreTemplate = path.join(stateDir, "agents", "{agentId}", "sessions"); const mainStorePath = path.join(stateDir, "agents", "main", "sessions"); @@ -69,7 +279,7 @@ describe("msteams doctor state migration", () => { await fs.writeFile(encodedSourcePath, JSON.stringify(["Be concise", "Use examples"])); await fs.writeFile(sanitizedSourcePath, JSON.stringify(["Prefer cards for channel feedback"])); - const migration = stateMigrations[0]; + const migration = migrationById("msteams-feedback-learnings-json-to-plugin-state"); const context = createDoctorContext(env); await context .openPluginStateKeyedStore({ diff --git a/extensions/msteams/doctor-contract-api.ts b/extensions/msteams/doctor-contract-api.ts index 0a2749089ef3..3706bd9df2ca 100644 --- a/extensions/msteams/doctor-contract-api.ts +++ b/extensions/msteams/doctor-contract-api.ts @@ -4,6 +4,43 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime"; +import { normalizeStoredConversationId } from "./src/conversation-store-helpers.js"; +import { + buildMSTeamsConversationStateKey, + MSTEAMS_CONVERSATIONS_LEGACY_FILENAME, + MSTEAMS_CONVERSATIONS_NAMESPACE, + MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS, + normalizeMSTeamsLegacyConversationStore, + prepareMSTeamsConversationReferenceForStorage, + selectRetainedMSTeamsConversations, + type MSTeamsLegacyConversationStoreData, +} from "./src/conversation-store-state.js"; +import type { StoredConversationReference } from "./src/conversation-store.js"; +import { + buildMSTeamsPollStateKey, + buildMSTeamsPollVoteBucketKey, + MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS, + MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE, + MSTEAMS_POLLS_LEGACY_FILENAME, + MSTEAMS_POLLS_NAMESPACE, + MSTEAMS_SQLITE_MAX_POLL_ROWS, + selectMSTeamsPollVoteBucket, + selectRetainedMSTeamsPolls, + splitMSTeamsPoll, + type MSTeamsPoll, + type MSTeamsPollStoreData, + type StoredMSTeamsPoll, + type StoredMSTeamsPollVoteBucket, +} from "./src/polls.js"; +import { + isMSTeamsSsoStoreData, + makeMSTeamsSsoTokenStoreKey, + MSTEAMS_MAX_SSO_TOKENS, + MSTEAMS_SSO_TOKENS_LEGACY_FILENAME, + MSTEAMS_SSO_TOKENS_NAMESPACE, + normalizeMSTeamsSsoStoredToken, + type MSTeamsSsoStoredToken, +} from "./src/sso-token-store.js"; type FeedbackLearningEntry = { sessionKey: string; @@ -13,6 +50,7 @@ type FeedbackLearningEntry = { const LEARNINGS_NAMESPACE = "feedback-learnings"; const MAX_LEARNING_ENTRIES = 10_000; +const MSTEAMS_PLUGIN_ID = "Microsoft Teams"; function encodeSessionKey(sessionKey: string): string { return Buffer.from(sessionKey, "utf8").toString("base64url"); @@ -102,6 +140,90 @@ async function fileExists(filePath: string): Promise { } } +function resolveStateFilePath(stateDir: string, filename: string): string { + return path.join(stateDir, filename); +} + +async function readLegacyJsonFile( + filePath: string, + parse: (value: unknown) => T | null, +): Promise { + try { + return parse(JSON.parse(await fs.readFile(filePath, "utf8")) as unknown); + } catch { + return null; + } +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function isStringArray(value: unknown): value is string[] { + return Array.isArray(value) && value.every((entry) => typeof entry === "string"); +} + +function parseLegacyConversationStore(value: unknown): MSTeamsLegacyConversationStoreData | null { + if (!isRecord(value) || value.version !== 1 || !isRecord(value.conversations)) { + return null; + } + return normalizeMSTeamsLegacyConversationStore({ + version: 1, + conversations: value.conversations as Record, + }); +} + +function parseLegacyPoll(value: unknown): MSTeamsPoll | null { + if (!isRecord(value)) { + return null; + } + const votes = isRecord(value.votes) ? value.votes : null; + if ( + typeof value.id !== "string" || + !value.id || + typeof value.question !== "string" || + !value.question || + !isStringArray(value.options) || + typeof value.maxSelections !== "number" || + !Number.isFinite(value.maxSelections) || + typeof value.createdAt !== "string" || + !votes + ) { + return null; + } + const normalizedVotes: Record = {}; + for (const [voterId, selections] of Object.entries(votes)) { + if (typeof voterId === "string" && isStringArray(selections)) { + normalizedVotes[voterId] = selections; + } + } + return { + id: value.id, + question: value.question, + options: value.options, + maxSelections: value.maxSelections, + createdAt: value.createdAt, + ...(typeof value.updatedAt === "string" ? { updatedAt: value.updatedAt } : {}), + ...(typeof value.conversationId === "string" ? { conversationId: value.conversationId } : {}), + ...(typeof value.messageId === "string" ? { messageId: value.messageId } : {}), + votes: normalizedVotes, + }; +} + +function parseLegacyPollStore(value: unknown): MSTeamsPollStoreData | null { + if (!isRecord(value) || value.version !== 1 || !isRecord(value.polls)) { + return null; + } + const polls: Record = {}; + for (const [pollId, poll] of Object.entries(value.polls)) { + const parsed = parseLegacyPoll(poll); + if (parsed) { + polls[pollId] = parsed; + } + } + return { version: 1, polls }; +} + async function listLegacyLearningFiles( storePath: string, ): Promise< @@ -147,25 +269,23 @@ async function listLegacyLearningFiles( async function archiveLegacySource(params: { filePath: string; + label?: string; changes: string[]; warnings: string[]; }): Promise { const archivedPath = `${params.filePath}.migrated`; + const label = params.label ?? "Microsoft Teams feedback-learning"; if (await fileExists(archivedPath)) { params.warnings.push( - `Left migrated Microsoft Teams feedback-learning source in place because ${archivedPath} already exists`, + `Left migrated ${label} source in place because ${archivedPath} already exists`, ); return; } try { await fs.rename(params.filePath, archivedPath); - params.changes.push( - `Archived Microsoft Teams feedback-learning legacy source -> ${archivedPath}`, - ); + params.changes.push(`Archived ${label} legacy source -> ${archivedPath}`); } catch (err) { - params.warnings.push( - `Failed archiving Microsoft Teams feedback-learning legacy source: ${String(err)}`, - ); + params.warnings.push(`Failed archiving ${label} legacy source: ${String(err)}`); } } @@ -183,6 +303,200 @@ function mergeLearnings(legacy: string[], existing?: FeedbackLearningEntry): str } export const stateMigrations: PluginDoctorStateMigration[] = [ + { + id: "msteams-conversations-json-to-plugin-state", + label: "Microsoft Teams conversations", + async detectLegacyState(params) { + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_CONVERSATIONS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, parseLegacyConversationStore); + if (!state || Object.keys(state.conversations).length === 0) { + return null; + } + return { + preview: [ + `- ${MSTEAMS_PLUGIN_ID} conversations: ${Object.keys(state.conversations).length} entries -> plugin state (${MSTEAMS_CONVERSATIONS_NAMESPACE})`, + ], + }; + }, + async migrateLegacyState(params) { + const changes: string[] = []; + const warnings: string[] = []; + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_CONVERSATIONS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, parseLegacyConversationStore); + if (!state) { + return { changes, warnings }; + } + const store = params.context.openPluginStateKeyedStore({ + namespace: MSTEAMS_CONVERSATIONS_NAMESPACE, + maxEntries: MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS, + }); + let imported = 0; + for (const [rawConversationId, reference] of selectRetainedMSTeamsConversations( + state.conversations, + )) { + const conversationId = normalizeStoredConversationId(rawConversationId); + if (!conversationId) { + continue; + } + const didImport = await store.registerIfAbsent( + buildMSTeamsConversationStateKey(conversationId), + prepareMSTeamsConversationReferenceForStorage(conversationId, reference), + ); + if (didImport) { + imported++; + } + } + changes.push( + `Migrated ${imported} ${MSTEAMS_PLUGIN_ID} conversation ${imported === 1 ? "entry" : "entries"} -> plugin state`, + ); + await archiveLegacySource({ + filePath, + label: `${MSTEAMS_PLUGIN_ID} conversation`, + changes, + warnings, + }); + return { changes, warnings }; + }, + }, + { + id: "msteams-polls-json-to-plugin-state", + label: "Microsoft Teams polls", + async detectLegacyState(params) { + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_POLLS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, parseLegacyPollStore); + if (!state || Object.keys(state.polls).length === 0) { + return null; + } + return { + preview: [ + `- ${MSTEAMS_PLUGIN_ID} polls: ${Object.keys(state.polls).length} entries -> plugin state (${MSTEAMS_POLLS_NAMESPACE})`, + ], + }; + }, + async migrateLegacyState(params) { + const changes: string[] = []; + const warnings: string[] = []; + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_POLLS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, parseLegacyPollStore); + if (!state) { + return { changes, warnings }; + } + const pollStore = params.context.openPluginStateKeyedStore({ + namespace: MSTEAMS_POLLS_NAMESPACE, + maxEntries: MSTEAMS_SQLITE_MAX_POLL_ROWS, + }); + const voteBucketStore = params.context.openPluginStateKeyedStore( + { + namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE, + maxEntries: MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS, + }, + ); + let imported = 0; + for (const [pollId, poll] of selectRetainedMSTeamsPolls(state.polls)) { + const { metadata, votes } = splitMSTeamsPoll(poll); + const didImportPoll = await pollStore.registerIfAbsent( + buildMSTeamsPollStateKey(pollId), + metadata, + ); + const buckets = new Map>(); + for (const [voterId, selections] of Object.entries(votes)) { + const bucket = selectMSTeamsPollVoteBucket(pollId, voterId); + const bucketVotes = buckets.get(bucket) ?? {}; + bucketVotes[voterId] = selections; + buckets.set(bucket, bucketVotes); + } + let importedVoteBucket = false; + for (const [bucket, bucketVotes] of buckets) { + const key = buildMSTeamsPollVoteBucketKey(pollId, bucket); + const existing = await voteBucketStore.lookup(key); + await voteBucketStore.register(key, { + pollId, + bucket, + votes: { ...bucketVotes, ...existing?.votes }, + updatedAt: poll.updatedAt ?? poll.createdAt, + }); + importedVoteBucket = true; + } + if (didImportPoll || importedVoteBucket) { + imported++; + } + } + changes.push( + `Migrated ${imported} ${MSTEAMS_PLUGIN_ID} poll ${imported === 1 ? "entry" : "entries"} -> plugin state`, + ); + await archiveLegacySource({ + filePath, + label: `${MSTEAMS_PLUGIN_ID} poll`, + changes, + warnings, + }); + return { changes, warnings }; + }, + }, + { + id: "msteams-sso-tokens-json-to-plugin-state", + label: "Microsoft Teams SSO tokens", + async detectLegacyState(params) { + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_SSO_TOKENS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, (value) => + isMSTeamsSsoStoreData(value) ? value : null, + ); + if (!state || Object.keys(state.tokens).length === 0) { + return null; + } + return { + preview: [ + `- ${MSTEAMS_PLUGIN_ID} SSO tokens: ${Object.keys(state.tokens).length} entries -> plugin state (${MSTEAMS_SSO_TOKENS_NAMESPACE})`, + ], + }; + }, + async migrateLegacyState(params) { + const changes: string[] = []; + const warnings: string[] = []; + const filePath = resolveStateFilePath(params.stateDir, MSTEAMS_SSO_TOKENS_LEGACY_FILENAME); + const state = await readLegacyJsonFile(filePath, (value) => + isMSTeamsSsoStoreData(value) ? value : null, + ); + if (!state) { + return { changes, warnings }; + } + const store = params.context.openPluginStateKeyedStore({ + namespace: MSTEAMS_SSO_TOKENS_NAMESPACE, + maxEntries: MSTEAMS_MAX_SSO_TOKENS, + }); + let imported = 0; + let skipped = 0; + for (const token of Object.values(state.tokens)) { + const normalized = normalizeMSTeamsSsoStoredToken(token); + if (!normalized) { + skipped++; + continue; + } + const didImport = await store.registerIfAbsent( + makeMSTeamsSsoTokenStoreKey(normalized.connectionName, normalized.userId), + normalized, + ); + if (didImport) { + imported++; + } + } + changes.push( + `Migrated ${imported} ${MSTEAMS_PLUGIN_ID} SSO token ${imported === 1 ? "entry" : "entries"} -> plugin state`, + ); + if (skipped > 0) { + warnings.push( + `Skipped ${skipped} malformed ${MSTEAMS_PLUGIN_ID} SSO token ${skipped === 1 ? "entry" : "entries"} during migration`, + ); + } + await archiveLegacySource({ + filePath, + label: `${MSTEAMS_PLUGIN_ID} SSO-token`, + changes, + warnings, + }); + return { changes, warnings }; + }, + }, { id: "msteams-feedback-learnings-json-to-plugin-state", label: "Microsoft Teams feedback learnings", diff --git a/extensions/msteams/src/conversation-store-state.test.ts b/extensions/msteams/src/conversation-store-state.test.ts index 932b8b17db3a..be8399107202 100644 --- a/extensions/msteams/src/conversation-store-state.test.ts +++ b/extensions/msteams/src/conversation-store-state.test.ts @@ -22,7 +22,7 @@ describe("msteams conversation store (plugin state)", () => { setMSTeamsRuntime(msteamsRuntimeStub); }); - it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => { + it("filters expired SQLite entries while preserving entries without lastSeenAt", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); const env: NodeJS.ProcessEnv = { ...process.env, @@ -35,35 +35,28 @@ describe("msteams conversation store (plugin state)", () => { serviceUrl: "https://service.example.com", user: { id: "u1", aadObjectId: "aad1" }, }; - const filePath = path.join(stateDir, "msteams-conversations.json"); - await fs.promises.mkdir(path.dirname(filePath), { recursive: true }); - await fs.promises.writeFile( - filePath, - `${JSON.stringify( - { - version: 1, - conversations: { - "19:active@thread.tacv2": ref, - "19:old@thread.tacv2": { - ...ref, - conversation: { id: "19:old@thread.tacv2" }, - lastSeenAt: new Date(Date.now() - 60_000).toISOString(), - }, - "19:legacy@thread.tacv2": { - ...ref, - conversation: { id: "19:legacy@thread.tacv2" }, - }, - }, - }, - null, - 2, - )}\n`, + const sqliteStore = createPluginStateKeyedStoreForTests( + "msteams", + { + namespace: "conversations", + maxEntries: 2000, + env, + }, ); + await sqliteStore.register(conversationStateKey("19:active@thread.tacv2"), ref); + await sqliteStore.register(conversationStateKey("19:old@thread.tacv2"), { + ...ref, + conversation: { id: "19:old@thread.tacv2" }, + lastSeenAt: new Date(Date.now() - 60_000).toISOString(), + }); + await sqliteStore.register(conversationStateKey("19:legacy@thread.tacv2"), { + ...ref, + conversation: { id: "19:legacy@thread.tacv2" }, + }); const store = createMSTeamsConversationStoreState({ env, ttlMs: 1_000 }); const ids = (await store.list()).map((entry) => entry.conversationId).toSorted(); expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]); - await expect(fs.promises.access(filePath)).rejects.toThrow(); expect(await store.get("19:old@thread.tacv2")).toBeNull(); const legacyConversation = await store.get("19:legacy@thread.tacv2"); @@ -87,7 +80,7 @@ describe("msteams conversation store (plugin state)", () => { ).resolves.toBeUndefined(); }); - it("does not let a stale legacy JSON file overwrite existing SQLite rows", async () => { + it("ignores a stale legacy JSON file at runtime", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); const env: NodeJS.ProcessEnv = { ...process.env, @@ -125,41 +118,23 @@ describe("msteams conversation store (plugin state)", () => { const store = createMSTeamsConversationStoreState({ env }); await expect(store.get("conv-current")).resolves.toEqual(ref); + await expect(fs.promises.access(filePath)).resolves.toBeUndefined(); }); it("hashes external conversation ids before using plugin-state keys", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); const longConversationId = `a:${"x".repeat(900)}`; - const filePath = path.join(stateDir, "msteams-conversations.json"); - await fs.promises.writeFile( - filePath, - `${JSON.stringify({ - version: 1, - conversations: { - [longConversationId]: { - channelId: "msteams", - serviceUrl: "https://service.example.com", - user: { id: "long-user" }, - } satisfies StoredConversationReference, - }, - })}\n`, - ); - const store = createMSTeamsConversationStoreState({ stateDir }); - await expect(store.get(longConversationId)).resolves.toMatchObject({ - conversation: { id: longConversationId }, - user: { id: "long-user" }, - }); - await store.upsert(`${longConversationId}-new`, { + await store.upsert(longConversationId, { conversation: { conversationType: "personal" }, channelId: "msteams", serviceUrl: "https://service.example.com", - user: { id: "long-user-new" }, + user: { id: "long-user" }, }); - await expect(store.get(`${longConversationId}-new`)).resolves.toMatchObject({ - conversation: { id: `${longConversationId}-new` }, - user: { id: "long-user-new" }, + await expect(store.get(longConversationId)).resolves.toMatchObject({ + conversation: { id: longConversationId }, + user: { id: "long-user" }, }); }); @@ -199,29 +174,33 @@ describe("msteams conversation store (plugin state)", () => { }); }); - it("keeps newest legacy conversations by lastSeenAt at the row cap", async () => { + it("keeps newest conversations by lastSeenAt at the row cap", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); - const filePath = path.join(stateDir, "msteams-conversations.json"); - const conversations: Record = { - "conv-recent": { - conversation: { id: "conv-recent" }, - channelId: "msteams", - serviceUrl: "https://service.example.com", - lastSeenAt: "2026-03-25T20:00:00.000Z", + const env: NodeJS.ProcessEnv = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; + const sqliteStore = createPluginStateKeyedStoreForTests( + "msteams", + { + namespace: "conversations", + maxEntries: 2000, + env, }, - }; + ); for (let index = 0; index < 1000; index += 1) { const id = `conv-${String(index).padStart(4, "0")}`; - conversations[id] = { + await sqliteStore.register(conversationStateKey(id), { conversation: { id }, channelId: "msteams", serviceUrl: "https://service.example.com", lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(), - }; + }); } - await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`); - const store = createMSTeamsConversationStoreState({ stateDir }); + const store = createMSTeamsConversationStoreState({ env }); + await store.upsert("conv-recent", { + conversation: { id: "conv-recent" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + }); const ids = (await store.list()).map((entry) => entry.conversationId); expect(ids).toHaveLength(1000); @@ -229,29 +208,33 @@ describe("msteams conversation store (plugin state)", () => { expect(ids).not.toContain("conv-0000"); }); - it("treats timestamp-less legacy conversations as oldest during later cap pruning", async () => { + it("treats timestamp-less conversations as oldest during later cap pruning", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); - const filePath = path.join(stateDir, "msteams-conversations.json"); - const conversations: Record = { - "conv-legacy": { - conversation: { id: "conv-legacy" }, - channelId: "msteams", - serviceUrl: "https://service.example.com", + const env: NodeJS.ProcessEnv = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; + const sqliteStore = createPluginStateKeyedStoreForTests( + "msteams", + { + namespace: "conversations", + maxEntries: 2000, + env, }, - }; + ); + await sqliteStore.register(conversationStateKey("conv-legacy"), { + conversation: { id: "conv-legacy" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + }); for (let index = 0; index < 999; index += 1) { const id = `conv-seen-${String(index).padStart(4, "0")}`; - conversations[id] = { + await sqliteStore.register(conversationStateKey(id), { conversation: { id }, channelId: "msteams", serviceUrl: "https://service.example.com", lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(), - }; + }); } - await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`); - const store = createMSTeamsConversationStoreState({ stateDir }); - await store.list(); + const store = createMSTeamsConversationStoreState({ env }); await store.upsert("conv-new", { conversation: { id: "conv-new" }, channelId: "msteams", diff --git a/extensions/msteams/src/conversation-store-state.ts b/extensions/msteams/src/conversation-store-state.ts index 4022b03972cd..91096ff11b2a 100644 --- a/extensions/msteams/src/conversation-store-state.ts +++ b/extensions/msteams/src/conversation-store-state.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; -import fs from "node:fs/promises"; import { findPreferredDmConversationByUserId, mergeStoredConversationReference, @@ -18,25 +17,17 @@ import { toPluginJsonValue, withMSTeamsSqliteMutationLock, } from "./sqlite-state.js"; -import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile } from "./store-fs.js"; -type ConversationStoreData = { +export type MSTeamsLegacyConversationStoreData = { version: 1; conversations: Record; }; -type ConversationMigrationMarker = { - importedAt: string; -}; - -const STORE_FILENAME = "msteams-conversations.json"; -const CONVERSATIONS_NAMESPACE = "conversations"; -const CONVERSATION_MIGRATIONS_NAMESPACE = "conversation-migrations"; -const LEGACY_JSON_MIGRATION_KEY = "msteams-conversations-json-v1"; -const MAX_CONVERSATIONS = 1000; -const SQLITE_MAX_CONVERSATION_ROWS = MAX_CONVERSATIONS + 1000; -const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000; +export const MSTEAMS_CONVERSATIONS_LEGACY_FILENAME = "msteams-conversations.json"; +export const MSTEAMS_CONVERSATIONS_NAMESPACE = "conversations"; +export const MSTEAMS_MAX_CONVERSATIONS = 1000; +export const MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS = MSTEAMS_MAX_CONVERSATIONS + 1000; +export const MSTEAMS_CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000; const CONVERSATION_LOCK_FILENAME = "msteams-conversations.sqlite.lock"; type MSTeamsConversationStoreStateOptions = { @@ -49,31 +40,15 @@ type MSTeamsConversationStoreStateOptions = { function createConversationStateStore(params?: MSTeamsConversationStoreStateOptions) { return getMSTeamsRuntime().state.openKeyedStore({ - namespace: CONVERSATIONS_NAMESPACE, - maxEntries: SQLITE_MAX_CONVERSATION_ROWS, + namespace: MSTEAMS_CONVERSATIONS_NAMESPACE, + maxEntries: MSTEAMS_SQLITE_MAX_CONVERSATION_ROWS, env: resolveMSTeamsSqliteStateEnv(params), }); } -function createConversationMigrationStore(params?: MSTeamsConversationStoreStateOptions) { - return getMSTeamsRuntime().state.openKeyedStore({ - namespace: CONVERSATION_MIGRATIONS_NAMESPACE, - maxEntries: 100, - env: resolveMSTeamsSqliteStateEnv(params), - }); -} - -function resolveLegacyStorePath(params?: MSTeamsConversationStoreStateOptions): string { - return resolveMSTeamsStorePath({ - filename: STORE_FILENAME, - env: params?.env, - homedir: params?.homedir, - stateDir: params?.stateDir, - storePath: params?.storePath, - }); -} - -function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreData { +export function normalizeMSTeamsLegacyConversationStore( + value: MSTeamsLegacyConversationStoreData, +): MSTeamsLegacyConversationStoreData { if ( value.version !== 1 || !value.conversations || @@ -85,11 +60,11 @@ function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreDa return value; } -function buildConversationStateKey(conversationId: string): string { +export function buildMSTeamsConversationStateKey(conversationId: string): string { return crypto.createHash("sha256").update(conversationId).digest("hex"); } -function prepareConversationReferenceForStorage( +export function prepareMSTeamsConversationReferenceForStorage( conversationId: string, reference: StoredConversationReference, ): StoredConversationReference { @@ -107,14 +82,30 @@ function getStoredConversationId(reference: StoredConversationReference): string return rawId ? normalizeStoredConversationId(rawId) : null; } +export function selectRetainedMSTeamsConversations( + conversations: Record, + ttlMs = MSTEAMS_CONVERSATION_TTL_MS, +): Array<[string, StoredConversationReference]> { + const retained = Object.entries(conversations).filter(([, reference]) => { + const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt); + return lastSeenAt == null || Date.now() - lastSeenAt <= ttlMs; + }); + if (retained.length <= MSTEAMS_MAX_CONVERSATIONS) { + return retained; + } + retained.sort((a, b) => { + const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0; + const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0; + return aTs - bTs || a[0].localeCompare(b[0]); + }); + return retained.slice(retained.length - MSTEAMS_MAX_CONVERSATIONS); +} + export function createMSTeamsConversationStoreState( params?: MSTeamsConversationStoreStateOptions, ): MSTeamsConversationStore { - const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS; + const ttlMs = params?.ttlMs ?? MSTEAMS_CONVERSATION_TTL_MS; const conversationStore = createConversationStateStore(params); - const migrationStore = createConversationMigrationStore(params); - const legacyStorePath = resolveLegacyStorePath(params); - let legacyImportPromise: Promise | null = null; const isExpired = (reference: StoredConversationReference): boolean => { const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt); @@ -122,66 +113,11 @@ export function createMSTeamsConversationStoreState( return lastSeenAt != null && Date.now() - lastSeenAt > ttlMs; }; - const selectRetainedConversations = ( - conversations: Record, - ): Array<[string, StoredConversationReference]> => { - const retained = Object.entries(conversations).filter(([, reference]) => !isExpired(reference)); - if (retained.length <= MAX_CONVERSATIONS) { - return retained; - } - retained.sort((a, b) => { - const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0; - const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0; - return aTs - bTs || a[0].localeCompare(b[0]); - }); - return retained.slice(retained.length - MAX_CONVERSATIONS); - }; - - const importLegacyStore = async (): Promise => { - if (await migrationStore.lookup(LEGACY_JSON_MIGRATION_KEY)) { - return; - } - const empty: ConversationStoreData = { version: 1, conversations: {} }; - const { value, exists } = await readJsonFile(legacyStorePath, empty); - if (!exists) { - await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - return; - } - const legacy = normalizeLegacyStore(value); - for (const [rawConversationId, reference] of selectRetainedConversations( - legacy.conversations, - )) { - const conversationId = normalizeStoredConversationId(rawConversationId); - if (!conversationId) { - continue; - } - await conversationStore.registerIfAbsent( - buildConversationStateKey(conversationId), - toPluginJsonValue(prepareConversationReferenceForStorage(conversationId, reference)), - ); - } - await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - await fs.rm(legacyStorePath, { force: true }).catch(() => {}); - }; - - const ensureLegacyImported = async (): Promise => { - legacyImportPromise ??= withMSTeamsSqliteMutationLock( - params, - CONVERSATION_LOCK_FILENAME, - importLegacyStore, - ); - await legacyImportPromise; - }; - const lookupStored = async ( conversationId: string, ): Promise => { const normalizedId = normalizeStoredConversationId(conversationId); - const value = await conversationStore.lookup(buildConversationStateKey(normalizedId)); + const value = await conversationStore.lookup(buildMSTeamsConversationStateKey(normalizedId)); if (!value) { return null; } @@ -192,7 +128,6 @@ export function createMSTeamsConversationStoreState( }; const entries = async (): Promise> => { - await ensureLegacyImported(); const rows = await conversationStore.entries(); const kept: Array<[string, StoredConversationReference]> = []; for (const row of rows) { @@ -208,7 +143,6 @@ export function createMSTeamsConversationStoreState( }; const lookup = async (conversationId: string): Promise => { - await ensureLegacyImported(); return await lookupStored(conversationId); }; @@ -218,8 +152,8 @@ export function createMSTeamsConversationStoreState( ): Promise => { const normalizedId = normalizeStoredConversationId(conversationId); await conversationStore.register( - buildConversationStateKey(normalizedId), - toPluginJsonValue(prepareConversationReferenceForStorage(normalizedId, reference)), + buildMSTeamsConversationStateKey(normalizedId), + toPluginJsonValue(prepareMSTeamsConversationReferenceForStorage(normalizedId, reference)), ); const rows = []; for (const row of await conversationStore.entries()) { @@ -229,7 +163,7 @@ export function createMSTeamsConversationStoreState( } rows.push(row); } - if (rows.length <= MAX_CONVERSATIONS) { + if (rows.length <= MSTEAMS_MAX_CONVERSATIONS) { return; } const sorted = rows.toSorted((a, b) => { @@ -239,7 +173,7 @@ export function createMSTeamsConversationStoreState( const bId = getStoredConversationId(b.value) ?? b.key; return aTs - bTs || aId.localeCompare(bId); }); - for (const row of sorted.slice(0, rows.length - MAX_CONVERSATIONS)) { + for (const row of sorted.slice(0, rows.length - MSTEAMS_MAX_CONVERSATIONS)) { await conversationStore.delete(row.key); } }; @@ -264,7 +198,6 @@ export function createMSTeamsConversationStoreState( ): Promise => { const normalizedId = normalizeStoredConversationId(conversationId); await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => { - await importLegacyStore(); const existing = await lookupStored(normalizedId); await register( normalizedId, @@ -280,8 +213,7 @@ export function createMSTeamsConversationStoreState( const remove = async (conversationId: string): Promise => { const normalizedId = normalizeStoredConversationId(conversationId); return await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => { - await importLegacyStore(); - return await conversationStore.delete(buildConversationStateKey(normalizedId)); + return await conversationStore.delete(buildMSTeamsConversationStateKey(normalizedId)); }); }; diff --git a/extensions/msteams/src/pending-uploads-fs.test.ts b/extensions/msteams/src/pending-uploads-fs.test.ts index 630198c8699d..d81f4c012f27 100644 --- a/extensions/msteams/src/pending-uploads-fs.test.ts +++ b/extensions/msteams/src/pending-uploads-fs.test.ts @@ -212,58 +212,7 @@ describe("msteams pending uploads (fs-backed)", () => { expect(loaded?.consentCardActivityId).toBe("activity-xyz"); }); - it("ignores malformed or empty store files and returns undefined", async () => { - const stateDir = await makeTempStateDir(); - const env = makeEnv(stateDir); - const storePath = path.join(stateDir, "msteams-pending-uploads.json"); - await fs.promises.writeFile(storePath, "not valid json", "utf-8"); - - // Should not throw and should treat as empty - expect(await getPendingUploadFs("anything", { env })).toBeUndefined(); - await expect(fs.promises.access(storePath)).rejects.toThrow(); - - const secondStateDir = await makeTempStateDir(); - const secondEnv = makeEnv(secondStateDir); - const secondStorePath = path.join(secondStateDir, "msteams-pending-uploads.json"); - await fs.promises.writeFile( - secondStorePath, - JSON.stringify({ version: 2, uploads: {} }), - "utf-8", - ); - expect(await getPendingUploadFs("anything", { env: secondEnv })).toBeUndefined(); - await expect(fs.promises.access(secondStorePath)).rejects.toThrow(); - }); - - it("imports a legacy JSON file that appears after an empty migration marker", async () => { - const stateDir = await makeTempStateDir(); - const env = makeEnv(stateDir); - const storePath = path.join(stateDir, "msteams-pending-uploads.json"); - - expect(await getPendingUploadFs("upload-late", { env })).toBeUndefined(); - await fs.promises.writeFile( - storePath, - `${JSON.stringify({ - version: 1, - uploads: { - "upload-late": { - id: "upload-late", - bufferBase64: Buffer.from("late payload").toString("base64"), - filename: "late.txt", - conversationId: "19:conv@thread.v2", - createdAt: Date.now(), - }, - }, - })}\n`, - "utf-8", - ); - - const loaded = await getPendingUploadFs("upload-late", { env }); - expect(loaded?.filename).toBe("late.txt"); - expect(loaded?.buffer.toString("utf8")).toBe("late payload"); - await expect(fs.promises.access(storePath)).rejects.toThrow(); - }); - - it("skips malformed legacy upload rows while importing valid rows", async () => { + it("ignores legacy pending-upload JSON cache files at runtime", async () => { const stateDir = await makeTempStateDir(); const env = makeEnv(stateDir); const storePath = path.join(stateDir, "msteams-pending-uploads.json"); @@ -272,16 +221,10 @@ describe("msteams pending uploads (fs-backed)", () => { `${JSON.stringify({ version: 1, uploads: { - broken: { - id: "broken", - filename: "broken.txt", - conversationId: "19:conv@thread.v2", - createdAt: Date.now(), - }, - valid: { - id: "valid", - bufferBase64: Buffer.from("valid payload").toString("base64"), - filename: "valid.txt", + cached: { + id: "cached", + bufferBase64: Buffer.from("cached payload").toString("base64"), + filename: "cached.txt", conversationId: "19:conv@thread.v2", createdAt: Date.now(), }, @@ -290,9 +233,8 @@ describe("msteams pending uploads (fs-backed)", () => { "utf-8", ); - expect(await getPendingUploadFs("broken", { env })).toBeUndefined(); - const loaded = await getPendingUploadFs("valid", { env }); - expect(loaded?.buffer.toString("utf8")).toBe("valid payload"); + expect(await getPendingUploadFs("cached", { env })).toBeUndefined(); + await expect(fs.promises.access(storePath)).resolves.toBeUndefined(); }); }); diff --git a/extensions/msteams/src/pending-uploads-fs.ts b/extensions/msteams/src/pending-uploads-fs.ts index b4aad1e0b7f4..1a3ade1341a2 100644 --- a/extensions/msteams/src/pending-uploads-fs.ts +++ b/extensions/msteams/src/pending-uploads-fs.ts @@ -1,5 +1,4 @@ import { createHash } from "node:crypto"; -import fs from "node:fs/promises"; import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { getMSTeamsRuntime } from "./runtime.js"; import { @@ -7,8 +6,6 @@ import { toPluginJsonValue, withMSTeamsSqliteMutationLock, } from "./sqlite-state.js"; -import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile } from "./store-fs.js"; /** TTL for persisted pending uploads (matches in-memory store). */ const PENDING_UPLOAD_TTL_MS = 5 * 60 * 1000; @@ -20,10 +17,8 @@ const MAX_PENDING_UPLOAD_CHUNK_ROWS = 45_000; const RAW_CHUNK_BYTES = 36 * 1024; const PENDING_UPLOAD_META_MAX_ENTRIES = MAX_PENDING_UPLOADS + 100; -const STORE_FILENAME = "msteams-pending-uploads.json"; const PENDING_UPLOAD_META_NAMESPACE = "pending-uploads"; const PENDING_UPLOAD_CHUNKS_NAMESPACE = "pending-upload-chunks"; -const PENDING_UPLOAD_MIGRATIONS_NAMESPACE = "pending-upload-migrations"; const PENDING_UPLOAD_LOCK_FILENAME = "msteams-pending-uploads.sqlite.lock"; type PendingUploadFsRecord = { @@ -47,13 +42,6 @@ type PendingUploadFs = { createdAt: number; }; -type PendingUploadStoreData = { - version: 1; - uploads: Record; -}; - -const empty: PendingUploadStoreData = { version: 1, uploads: {} }; - type PendingUploadMetaRecord = Omit & { chunkCount: number; byteLength: number; @@ -65,10 +53,6 @@ type PendingUploadChunkRecord = { dataBase64: string; }; -type PendingUploadMigrationMarker = { - importedAt: string; -}; - type PendingUploadsFsOptions = { env?: NodeJS.ProcessEnv; homedir?: () => string; @@ -77,16 +61,6 @@ type PendingUploadsFsOptions = { ttlMs?: number; }; -function resolveLegacyFilePath(options: PendingUploadsFsOptions | undefined): string { - return resolveMSTeamsStorePath({ - filename: STORE_FILENAME, - env: options?.env, - homedir: options?.homedir, - stateDir: options?.stateDir, - storePath: options?.storePath, - }); -} - function createMetaStore( options: PendingUploadsFsOptions | undefined, ): PluginStateKeyedStore { @@ -107,16 +81,6 @@ function createChunkStore( }); } -function createMigrationStore( - options: PendingUploadsFsOptions | undefined, -): PluginStateKeyedStore { - return getMSTeamsRuntime().state.openKeyedStore({ - namespace: PENDING_UPLOAD_MIGRATIONS_NAMESPACE, - maxEntries: 100, - env: resolveMSTeamsSqliteStateEnv(options), - }); -} - function buildUploadKey(id: string): string { return `upload:${createHash("sha256").update(id).digest("hex")}`; } @@ -129,45 +93,6 @@ function buildChunkKey(id: string, index: number): string { return `${buildUploadKey(id)}:chunk:${String(index).padStart(4, "0")}`; } -function buildMigrationKey(filePath: string): string { - return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`; -} - -function buildMigrationContentKey(filePath: string, value: unknown): string { - return `legacy-json-content:${createHash("sha256") - .update(filePath) - .update("\0") - .update(JSON.stringify(value) ?? "undefined") - .digest("hex")}`; -} - -function pruneExpired( - uploads: Record, - nowMs: number, - ttlMs: number, -): Record { - const kept: Record = {}; - for (const [id, record] of Object.entries(uploads)) { - if (nowMs - record.createdAt <= ttlMs) { - kept[id] = record; - } - } - return kept; -} - -function pruneToLimit( - uploads: Record, -): Record { - const entries = Object.entries(uploads); - if (entries.length <= MAX_PENDING_UPLOADS) { - return uploads; - } - // Oldest createdAt first; drop the oldest until we fit. - entries.sort((a, b) => a[1].createdAt - b[1].createdAt); - const keep = entries.slice(entries.length - MAX_PENDING_UPLOADS); - return Object.fromEntries(keep); -} - function recordToUpload( record: PendingUploadFsRecord | PendingUploadMetaRecord, buffer: Buffer, @@ -183,63 +108,6 @@ function recordToUpload( }; } -function isValidStore(value: unknown): value is PendingUploadStoreData { - if (!value || typeof value !== "object") { - return false; - } - const candidate = value as Partial; - return ( - candidate.version === 1 && - typeof candidate.uploads === "object" && - candidate.uploads !== null && - !Array.isArray(candidate.uploads) - ); -} - -function normalizeLegacyUploadRecord(value: unknown): PendingUploadFsRecord | null { - if (!value || typeof value !== "object") { - return null; - } - const record = value as Partial; - if ( - typeof record.id !== "string" || - !record.id || - typeof record.bufferBase64 !== "string" || - typeof record.filename !== "string" || - !record.filename || - typeof record.conversationId !== "string" || - !record.conversationId || - typeof record.createdAt !== "number" || - !Number.isFinite(record.createdAt) - ) { - return null; - } - return { - id: record.id, - bufferBase64: record.bufferBase64, - filename: record.filename, - contentType: typeof record.contentType === "string" ? record.contentType : undefined, - conversationId: record.conversationId, - consentCardActivityId: - typeof record.consentCardActivityId === "string" ? record.consentCardActivityId : undefined, - createdAt: record.createdAt, - }; -} - -function normalizeLegacyUploads(value: unknown): Record { - if (!isValidStore(value)) { - return {}; - } - const uploads: Record = {}; - for (const record of Object.values(value.uploads)) { - const normalized = normalizeLegacyUploadRecord(record); - if (normalized) { - uploads[normalized.id] = normalized; - } - } - return uploads; -} - async function deleteUploadRows( id: string, metaStore: PluginStateKeyedStore, @@ -302,38 +170,6 @@ async function registerUploadRows( ); } -async function importLegacyStore( - options: PendingUploadsFsOptions | undefined, - metaStore: PluginStateKeyedStore, - chunkStore: PluginStateKeyedStore, - ttlMs: number, -): Promise { - const legacyFilePath = resolveLegacyFilePath(options); - const migrationStore = createMigrationStore(options); - const migrationKey = buildMigrationKey(legacyFilePath); - const imported = (await migrationStore.lookup(migrationKey)) !== undefined; - const { value, exists } = await readJsonFile(legacyFilePath, empty); - if (!exists) { - if (!imported) { - await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); - } - return; - } - const contentKey = buildMigrationContentKey(legacyFilePath, value); - if (await migrationStore.lookup(contentKey)) { - return; - } - const legacy = pruneToLimit(pruneExpired(normalizeLegacyUploads(value), Date.now(), ttlMs)); - for (const record of Object.values(legacy)) { - await registerUploadRows(record, metaStore, chunkStore, ttlMs, false); - } - await migrationStore.register(contentKey, { importedAt: new Date().toISOString() }); - if (!imported) { - await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); - } - await fs.rm(legacyFilePath, { force: true }).catch(() => {}); -} - async function withPendingUploadLock( options: PendingUploadsFsOptions | undefined, run: () => Promise, @@ -341,17 +177,6 @@ async function withPendingUploadLock( return await withMSTeamsSqliteMutationLock(options, PENDING_UPLOAD_LOCK_FILENAME, run); } -async function ensureLegacyImported( - options: PendingUploadsFsOptions | undefined, - metaStore: PluginStateKeyedStore, - chunkStore: PluginStateKeyedStore, - ttlMs: number, -): Promise { - await withPendingUploadLock(options, () => - importLegacyStore(options, metaStore, chunkStore, ttlMs), - ); -} - async function readUploadRows( id: string, metaStore: PluginStateKeyedStore, @@ -432,7 +257,6 @@ export async function storePendingUploadFs( const metaStore = createMetaStore(options); const chunkStore = createChunkStore(options); await withPendingUploadLock(options, async () => { - await importLegacyStore(options, metaStore, chunkStore, ttlMs); await registerUploadRows( { id: upload.id, @@ -465,7 +289,6 @@ export async function getPendingUploadFs( const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; const metaStore = createMetaStore(options); const chunkStore = createChunkStore(options); - await ensureLegacyImported(options, metaStore, chunkStore, ttlMs); const upload = await readUploadRows(id, metaStore, chunkStore); if (!upload) { return undefined; @@ -488,11 +311,9 @@ export async function removePendingUploadFs( if (!id) { return; } - const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; const metaStore = createMetaStore(options); const chunkStore = createChunkStore(options); await withPendingUploadLock(options, async () => { - await importLegacyStore(options, metaStore, chunkStore, ttlMs); await deleteUploadRows(id, metaStore, chunkStore); }); } @@ -508,9 +329,7 @@ export async function setPendingUploadActivityIdFs( ): Promise { const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; const metaStore = createMetaStore(options); - const chunkStore = createChunkStore(options); await withPendingUploadLock(options, async () => { - await importLegacyStore(options, metaStore, chunkStore, ttlMs); const record = await metaStore.lookup(buildMetaKey(id)); if (!record || Date.now() - record.createdAt > ttlMs) { return; diff --git a/extensions/msteams/src/polls.test.ts b/extensions/msteams/src/polls.test.ts index 58c8e57cf1df..269b5ea3b29d 100644 --- a/extensions/msteams/src/polls.test.ts +++ b/extensions/msteams/src/polls.test.ts @@ -152,7 +152,7 @@ describe("state poll store", () => { setMSTeamsRuntime(msteamsRuntimeStub); }); - it("imports legacy JSON polls once and removes the old file", async () => { + it("ignores legacy JSON polls at runtime", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); const filePath = path.join(stateDir, "msteams-polls.json"); await fs.promises.writeFile( @@ -173,18 +173,18 @@ describe("state poll store", () => { ); const store = createMSTeamsPollStoreState({ stateDir }); - await expect(store.getPoll("poll-legacy")).resolves.toMatchObject({ - id: "poll-legacy", - question: "Legacy?", - }); - await expect(fs.promises.access(filePath)).rejects.toThrow(); + await expect(store.getPoll("poll-legacy")).resolves.toBeNull(); + await expect(fs.promises.access(filePath)).resolves.toBeUndefined(); - const updated = await store.recordVote({ - pollId: "poll-legacy", - voterId: "user-1", - selections: ["1"], + await store.createPoll({ + id: "poll-new", + question: "New?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: {}, }); - expect(updated?.votes["user-1"]).toEqual(["1"]); + await expect(store.getPoll("poll-new")).resolves.toMatchObject({ id: "poll-new" }); await expect( fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")), ).resolves.toBeUndefined(); @@ -264,106 +264,6 @@ describe("state poll store", () => { expect(stored?.votes["user-new"]).toEqual(["1"]); }); - it("fills missing legacy vote buckets after a partial metadata import", async () => { - const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); - const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; - const filePath = path.join(stateDir, "msteams-polls.json"); - const metadata = { - id: "poll-partial", - question: "Partial?", - options: ["A", "B"], - maxSelections: 1, - createdAt: new Date().toISOString(), - }; - const metadataStore = createPluginStateKeyedStoreForTests("msteams", { - namespace: "polls", - maxEntries: 2000, - env, - }); - await metadataStore.register("poll-partial", metadata); - const voterHash = crypto - .createHash("sha256") - .update("poll-partial") - .update("\0") - .update("user-legacy") - .digest("hex"); - const bucket = String(Number.parseInt(voterHash.slice(0, 8), 16) % 32).padStart(4, "0"); - const pollHash = crypto.createHash("sha256").update("poll-partial").digest("hex"); - const voteBucketStore = createPluginStateKeyedStoreForTests<{ - pollId: string; - bucket: string; - votes: Record; - updatedAt: string; - }>("msteams", { - namespace: "poll-vote-buckets", - maxEntries: 32_032, - env, - }); - await voteBucketStore.register(`${pollHash}:${bucket}`, { - pollId: "poll-partial", - bucket, - votes: { "user-legacy": ["0"] }, - updatedAt: metadata.createdAt, - }); - await fs.promises.writeFile( - filePath, - `${JSON.stringify({ - version: 1, - polls: { - "poll-partial": { - ...metadata, - votes: { - "user-legacy": ["1"], - "user-missing": ["1"], - }, - }, - }, - })}\n`, - ); - - const store = createMSTeamsPollStoreState({ env }); - - await expect(store.getPoll("poll-partial")).resolves.toMatchObject({ - votes: { - "user-legacy": ["0"], - "user-missing": ["1"], - }, - }); - }); - - it("keeps newest legacy polls by update timestamp at the row cap", async () => { - const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); - const filePath = path.join(stateDir, "msteams-polls.json"); - const pollRows: Record = {}; - const baseMs = Date.now() - 60_000; - pollRows["poll-recent"] = { - id: "poll-recent", - question: "Recent?", - options: ["A", "B"], - maxSelections: 1, - createdAt: new Date(baseMs + 2_000_000).toISOString(), - updatedAt: new Date(baseMs + 2_000_000).toISOString(), - votes: {}, - }; - for (let index = 0; index < 1000; index += 1) { - const id = `poll-${String(index).padStart(4, "0")}`; - pollRows[id] = { - id, - question: "Old?", - options: ["A", "B"], - maxSelections: 1, - createdAt: new Date(baseMs + index).toISOString(), - votes: {}, - }; - } - await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, polls: pollRows })}\n`); - - const store = createMSTeamsPollStoreState({ stateDir }); - - await expect(store.getPoll("poll-recent")).resolves.toMatchObject({ id: "poll-recent" }); - await expect(store.getPoll("poll-0000")).resolves.toBeNull(); - }); - it("deletes vote buckets when pruning over the poll cap", async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; diff --git a/extensions/msteams/src/polls.ts b/extensions/msteams/src/polls.ts index f0faa64c19fb..82b93e03b2cd 100644 --- a/extensions/msteams/src/polls.ts +++ b/extensions/msteams/src/polls.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; -import fs from "node:fs/promises"; import { parseStrictNonNegativeInteger } from "openclaw/plugin-sdk/number-runtime"; import { isRecord, @@ -13,8 +12,6 @@ import { toPluginJsonValue, withMSTeamsSqliteMutationLock, } from "./sqlite-state.js"; -import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile } from "./store-fs.js"; type MSTeamsPollVote = { pollId: string; @@ -52,31 +49,30 @@ type MSTeamsPollCard = { fallbackText: string; }; -type PollStoreData = { +export type MSTeamsPollStoreData = { version: 1; polls: Record; }; -type StoredMSTeamsPoll = Omit; +export type StoredMSTeamsPoll = Omit; -type StoredMSTeamsPollVoteBucket = { +export type StoredMSTeamsPollVoteBucket = { pollId: string; bucket: string; votes: Record; updatedAt: string; }; -const STORE_FILENAME = "msteams-polls.json"; -const POLLS_NAMESPACE = "polls"; -const POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets"; -const POLL_MIGRATIONS_NAMESPACE = "poll-migrations"; -const LEGACY_POLLS_MIGRATION_KEY = "msteams-polls-json-v1"; -const MAX_POLLS = 1000; -const SQLITE_MAX_POLL_ROWS = MAX_POLLS + 1000; +export const MSTEAMS_POLLS_LEGACY_FILENAME = "msteams-polls.json"; +export const MSTEAMS_POLLS_NAMESPACE = "polls"; +export const MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets"; +export const MSTEAMS_MAX_POLLS = 1000; +export const MSTEAMS_SQLITE_MAX_POLL_ROWS = MSTEAMS_MAX_POLLS + 1000; // Keep worst-case retained vote buckets below plugin-state's per-plugin live row cap. -const POLL_VOTE_BUCKET_COUNT = 32; -const MAX_POLL_VOTE_BUCKET_ROWS = (MAX_POLLS + 1) * POLL_VOTE_BUCKET_COUNT; -const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000; +export const MSTEAMS_POLL_VOTE_BUCKET_COUNT = 32; +export const MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS = + (MSTEAMS_MAX_POLLS + 1) * MSTEAMS_POLL_VOTE_BUCKET_COUNT; +export const MSTEAMS_POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000; const POLL_LOCK_FILENAME = "msteams-polls.sqlite.lock"; function normalizeChoiceValue(value: unknown): string | null { @@ -248,30 +244,18 @@ type MSTeamsPollStoreStateOptions = { storePath?: string; }; -type PollMigrationMarker = { - importedAt: string; -}; - function createPollStateStore(params?: MSTeamsPollStoreStateOptions) { return getMSTeamsRuntime().state.openKeyedStore({ - namespace: POLLS_NAMESPACE, - maxEntries: SQLITE_MAX_POLL_ROWS, + namespace: MSTEAMS_POLLS_NAMESPACE, + maxEntries: MSTEAMS_SQLITE_MAX_POLL_ROWS, env: resolveMSTeamsSqliteStateEnv(params), }); } function createPollVoteBucketStateStore(params?: MSTeamsPollStoreStateOptions) { return getMSTeamsRuntime().state.openKeyedStore({ - namespace: POLL_VOTE_BUCKETS_NAMESPACE, - maxEntries: MAX_POLL_VOTE_BUCKET_ROWS, - env: resolveMSTeamsSqliteStateEnv(params), - }); -} - -function createPollMigrationStore(params?: MSTeamsPollStoreStateOptions) { - return getMSTeamsRuntime().state.openKeyedStore({ - namespace: POLL_MIGRATIONS_NAMESPACE, - maxEntries: 100, + namespace: MSTEAMS_POLL_VOTE_BUCKETS_NAMESPACE, + maxEntries: MSTEAMS_MAX_POLL_VOTE_BUCKET_ROWS, env: resolveMSTeamsSqliteStateEnv(params), }); } @@ -287,7 +271,7 @@ function parseTimestamp(value?: string): number | null { function pruneExpired( polls: Record, ) { - const cutoff = Date.now() - POLL_TTL_MS; + const cutoff = Date.now() - MSTEAMS_POLL_TTL_MS; const entries = Object.entries(polls).filter(([, poll]) => { const ts = parseTimestamp(poll.updatedAt ?? poll.createdAt) ?? 0; return ts >= cutoff; @@ -295,9 +279,11 @@ function pruneExpired( return Object.fromEntries(entries); } -function selectRetainedPolls(polls: Record): Array<[string, MSTeamsPoll]> { +export function selectRetainedMSTeamsPolls( + polls: Record, +): Array<[string, MSTeamsPoll]> { const retained = Object.entries(pruneExpired(polls)); - if (retained.length <= MAX_POLLS) { + if (retained.length <= MSTEAMS_MAX_POLLS) { return retained; } retained.sort((a, b) => { @@ -305,7 +291,7 @@ function selectRetainedPolls(polls: Record): Array<[string, const bTs = parseTimestamp(b[1].updatedAt ?? b[1].createdAt) ?? 0; return aTs - bTs || a[0].localeCompare(b[0]); }); - return retained.slice(retained.length - MAX_POLLS); + return retained.slice(retained.length - MSTEAMS_MAX_POLLS); } export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: string[]) { @@ -319,45 +305,37 @@ export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: st return uniqueStrings(limited); } +export function splitMSTeamsPoll(poll: MSTeamsPoll): { + metadata: StoredMSTeamsPoll; + votes: MSTeamsPoll["votes"]; +} { + const { votes, ...metadata } = poll; + return { metadata, votes }; +} + +function hashMSTeamsPollVote(pollId: string, voterId: string): string { + return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex"); +} + +export function buildMSTeamsPollStateKey(pollId: string): string { + return crypto.createHash("sha256").update(pollId).digest("hex"); +} + +export function selectMSTeamsPollVoteBucket(pollId: string, voterId: string): string { + const bucket = Number.parseInt(hashMSTeamsPollVote(pollId, voterId).slice(0, 8), 16); + return String(bucket % MSTEAMS_POLL_VOTE_BUCKET_COUNT).padStart(4, "0"); +} + +export function buildMSTeamsPollVoteBucketKey(pollId: string, bucket: string): string { + const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex"); + return `${pollDigest}:${bucket}`; +} + export function createMSTeamsPollStoreState( params?: MSTeamsPollStoreStateOptions, ): MSTeamsPollStore { const pollStore = createPollStateStore(params); const voteBucketStore = createPollVoteBucketStateStore(params); - const migrationStore = createPollMigrationStore(params); - const legacyStorePath = resolveMSTeamsStorePath({ - filename: STORE_FILENAME, - env: params?.env, - homedir: params?.homedir, - stateDir: params?.stateDir, - storePath: params?.storePath, - }); - let legacyImportPromise: Promise | null = null; - - const splitPoll = ( - poll: MSTeamsPoll, - ): { metadata: StoredMSTeamsPoll; votes: MSTeamsPoll["votes"] } => { - const { votes, ...metadata } = poll; - return { metadata, votes }; - }; - - const hashVote = (pollId: string, voterId: string): string => { - return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex"); - }; - - const buildPollStateKey = (pollId: string): string => { - return crypto.createHash("sha256").update(pollId).digest("hex"); - }; - - const selectVoteBucket = (pollId: string, voterId: string): string => { - const bucket = Number.parseInt(hashVote(pollId, voterId).slice(0, 8), 16); - return String(bucket % POLL_VOTE_BUCKET_COUNT).padStart(4, "0"); - }; - - const buildVoteBucketKey = (pollId: string, bucket: string): string => { - const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex"); - return `${pollDigest}:${bucket}`; - }; const readPollVotes = async (pollId: string): Promise> => { const votes: Record = {}; @@ -384,13 +362,13 @@ export function createMSTeamsPollStoreState( ): Promise => { const buckets = new Map>(); for (const [voterId, selections] of Object.entries(votes)) { - const bucket = selectVoteBucket(pollId, voterId); + const bucket = selectMSTeamsPollVoteBucket(pollId, voterId); const bucketVotes = buckets.get(bucket) ?? {}; bucketVotes[voterId] = selections; buckets.set(bucket, bucketVotes); } for (const [bucket, bucketVotes] of buckets) { - const key = buildVoteBucketKey(pollId, bucket); + const key = buildMSTeamsPollVoteBucketKey(pollId, bucket); const existing = await voteBucketStore.lookup(key); await voteBucketStore.register( key, @@ -410,8 +388,8 @@ export function createMSTeamsPollStoreState( selections: string[], updatedAt: string, ): Promise => { - const bucket = selectVoteBucket(pollId, voterId); - const key = buildVoteBucketKey(pollId, bucket); + const bucket = selectMSTeamsPollVoteBucket(pollId, voterId); + const key = buildMSTeamsPollVoteBucketKey(pollId, bucket); const existing = await voteBucketStore.lookup(key); await voteBucketStore.register( key, @@ -428,48 +406,6 @@ export function createMSTeamsPollStoreState( return { ...metadata, votes: await readPollVotes(metadata.id) }; }; - const importLegacyStore = async (): Promise => { - if (await migrationStore.lookup(LEGACY_POLLS_MIGRATION_KEY)) { - return; - } - const empty: PollStoreData = { version: 1, polls: {} }; - const { value, exists } = await readJsonFile(legacyStorePath, empty); - if (!exists) { - await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - return; - } - const legacyPolls = - value.version === 1 && - value.polls && - typeof value.polls === "object" && - !Array.isArray(value.polls) - ? value.polls - : {}; - for (const [pollId, poll] of selectRetainedPolls(legacyPolls)) { - if (!pollId) { - continue; - } - const { metadata, votes } = splitPoll(poll); - await pollStore.registerIfAbsent(buildPollStateKey(pollId), toPluginJsonValue(metadata)); - await registerPollVotes(pollId, votes, poll.updatedAt ?? poll.createdAt); - } - await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - await fs.rm(legacyStorePath, { force: true }).catch(() => {}); - }; - - const ensureLegacyImported = async (): Promise => { - legacyImportPromise ??= withMSTeamsSqliteMutationLock( - params, - POLL_LOCK_FILENAME, - importLegacyStore, - ); - await legacyImportPromise; - }; - const prunePollStoreToLimit = async (): Promise => { const rows = []; for (const row of await pollStore.entries()) { @@ -480,7 +416,7 @@ export function createMSTeamsPollStoreState( } rows.push(row); } - if (rows.length <= MAX_POLLS) { + if (rows.length <= MSTEAMS_MAX_POLLS) { return; } const sorted = rows.toSorted((a, b) => { @@ -488,7 +424,7 @@ export function createMSTeamsPollStoreState( const bTs = parseTimestamp(b.value.updatedAt ?? b.value.createdAt) ?? 0; return aTs - bTs || a.key.localeCompare(b.key); }); - for (const row of sorted.slice(0, rows.length - MAX_POLLS)) { + for (const row of sorted.slice(0, rows.length - MSTEAMS_MAX_POLLS)) { await pollStore.delete(row.key); await deletePollVotes(row.value.id); } @@ -496,9 +432,8 @@ export function createMSTeamsPollStoreState( const createPoll = async (poll: MSTeamsPoll) => { await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => { - await importLegacyStore(); - const { metadata, votes } = splitPoll(poll); - await pollStore.register(buildPollStateKey(poll.id), toPluginJsonValue(metadata)); + const { metadata, votes } = splitMSTeamsPoll(poll); + await pollStore.register(buildMSTeamsPollStateKey(poll.id), toPluginJsonValue(metadata)); await deletePollVotes(poll.id); await registerPollVotes(poll.id, votes, poll.updatedAt ?? poll.createdAt); await prunePollStoreToLimit(); @@ -506,8 +441,7 @@ export function createMSTeamsPollStoreState( }; const getPoll = async (pollId: string) => { - await ensureLegacyImported(); - const poll = await pollStore.lookup(buildPollStateKey(pollId)); + const poll = await pollStore.lookup(buildMSTeamsPollStateKey(pollId)); if (!poll) { return null; } @@ -519,8 +453,7 @@ export function createMSTeamsPollStoreState( const recordVote = async (vote: { pollId: string; voterId: string; selections: string[] }) => { return await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => { - await importLegacyStore(); - const pollKey = buildPollStateKey(vote.pollId); + const pollKey = buildMSTeamsPollStateKey(vote.pollId); const poll = await pollStore.lookup(pollKey); if (!poll) { return null; diff --git a/extensions/msteams/src/sso-token-store.test.ts b/extensions/msteams/src/sso-token-store.test.ts index c9d2d9a6939d..a4a6ec3ff3ad 100644 --- a/extensions/msteams/src/sso-token-store.test.ts +++ b/extensions/msteams/src/sso-token-store.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it } from "vitest"; import { setMSTeamsRuntime } from "./runtime.js"; import { createMSTeamsSsoTokenStoreFs } from "./sso-token-store.js"; import { msteamsRuntimeStub } from "./test-support/runtime.js"; @@ -13,10 +13,6 @@ describe("msteams sso token store (plugin state)", () => { setMSTeamsRuntime(msteamsRuntimeStub); }); - afterEach(() => { - vi.restoreAllMocks(); - }); - it("keeps distinct tokens when connectionName and userId contain the legacy delimiter", async () => { const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-")); const storePath = path.join(stateDir, "msteams-sso-tokens.json"); @@ -47,7 +43,7 @@ describe("msteams sso token store (plugin state)", () => { ).resolves.toBeUndefined(); }); - it("loads legacy flat-key files by rebuilding keys from stored token payloads", async () => { + it("ignores legacy flat-key token files at runtime", async () => { const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-legacy-")); const storePath = path.join(stateDir, "msteams-sso-tokens.json"); await fs.writeFile( @@ -76,13 +72,8 @@ describe("msteams sso token store (plugin state)", () => { connectionName: "conn", userId: "user-1", }), - ).toEqual({ - connectionName: "conn", - userId: "user-1", - token: "token-1", - updatedAt: "2026-04-10T00:00:00.000Z", - }); - await expect(fs.access(storePath)).rejects.toThrow(); + ).toBeNull(); + await expect(fs.access(storePath)).resolves.toBeUndefined(); }); it("keeps plugin-state keys bounded for long Teams identifiers", async () => { @@ -100,72 +91,4 @@ describe("msteams sso token store (plugin state)", () => { expect(await store.remove(token)).toBe(true); expect(await store.get(token)).toBeNull(); }); - - it("imports a legacy token file that appears after an empty migration marker", async () => { - const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-late-")); - const storePath = path.join(stateDir, "msteams-sso-tokens.json"); - const store = createMSTeamsSsoTokenStoreFs({ storePath }); - - expect(await store.get({ connectionName: "conn", userId: "user-late" })).toBeNull(); - await fs.writeFile( - storePath, - `${JSON.stringify({ - version: 1, - tokens: { - late: { - connectionName: "conn", - userId: "user-late", - token: "token-late", - updatedAt: "2026-04-10T00:00:00.000Z", - }, - }, - })}\n`, - "utf8", - ); - - expect(await store.get({ connectionName: "conn", userId: "user-late" })).toEqual({ - connectionName: "conn", - userId: "user-late", - token: "token-late", - updatedAt: "2026-04-10T00:00:00.000Z", - }); - await expect(fs.access(storePath)).rejects.toThrow(); - }); - - it("does not resurrect removed tokens when a migrated legacy file cannot be deleted", async () => { - const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-stale-")); - const storePath = path.join(stateDir, "msteams-sso-tokens.json"); - await fs.writeFile( - storePath, - `${JSON.stringify({ - version: 1, - tokens: { - stale: { - connectionName: "conn", - userId: "user-stale", - token: "token-stale", - updatedAt: "2026-04-10T00:00:00.000Z", - }, - }, - })}\n`, - "utf8", - ); - const originalRm = fs.rm; - vi.spyOn(fs, "rm").mockImplementation(async (target, options) => { - if (target === storePath) { - throw new Error("cannot remove"); - } - return await originalRm(target, options); - }); - - const store = createMSTeamsSsoTokenStoreFs({ storePath }); - expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toEqual({ - connectionName: "conn", - userId: "user-stale", - token: "token-stale", - updatedAt: "2026-04-10T00:00:00.000Z", - }); - expect(await store.remove({ connectionName: "conn", userId: "user-stale" })).toBe(true); - expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toBeNull(); - }); }); diff --git a/extensions/msteams/src/sso-token-store.ts b/extensions/msteams/src/sso-token-store.ts index ec9ff006afe4..e969f59d906e 100644 --- a/extensions/msteams/src/sso-token-store.ts +++ b/extensions/msteams/src/sso-token-store.ts @@ -12,7 +12,6 @@ */ import { createHash } from "node:crypto"; -import fs from "node:fs/promises"; import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { getMSTeamsRuntime } from "./runtime.js"; import { @@ -20,10 +19,8 @@ import { toPluginJsonValue, withMSTeamsSqliteMutationLock, } from "./sqlite-state.js"; -import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile } from "./store-fs.js"; -type MSTeamsSsoStoredToken = { +export type MSTeamsSsoStoredToken = { /** Connection name from the Bot Framework OAuth connection setting. */ connectionName: string; /** Stable user identifier (AAD object ID preferred). */ @@ -48,31 +45,20 @@ type SsoStoreData = { tokens: Record; }; -const STORE_FILENAME = "msteams-sso-tokens.json"; -const SSO_TOKENS_NAMESPACE = "sso-tokens"; -const SSO_TOKEN_MIGRATIONS_NAMESPACE = "sso-token-migrations"; +export type MSTeamsSsoStoreData = SsoStoreData; + +export const MSTEAMS_SSO_TOKENS_LEGACY_FILENAME = "msteams-sso-tokens.json"; +export const MSTEAMS_SSO_TOKENS_NAMESPACE = "sso-tokens"; const SSO_TOKEN_LOCK_FILENAME = "msteams-sso-tokens.sqlite.lock"; -const MAX_SSO_TOKENS = 5000; +export const MSTEAMS_MAX_SSO_TOKENS = 5000; const STORE_KEY_VERSION_PREFIX = "v2:"; -function makeKey(connectionName: string, userId: string): string { +export function makeMSTeamsSsoTokenStoreKey(connectionName: string, userId: string): string { return `${STORE_KEY_VERSION_PREFIX}${createHash("sha256") .update(JSON.stringify([connectionName, userId])) .digest("hex")}`; } -function buildMigrationKey(filePath: string): string { - return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`; -} - -function buildMigrationContentKey(filePath: string, value: unknown): string { - return `legacy-json-content:${createHash("sha256") - .update(filePath) - .update("\0") - .update(JSON.stringify(value) ?? "undefined") - .digest("hex")}`; -} - function createTokenStore(params?: { env?: NodeJS.ProcessEnv; homedir?: () => string; @@ -80,26 +66,13 @@ function createTokenStore(params?: { storePath?: string; }): PluginStateKeyedStore { return getMSTeamsRuntime().state.openKeyedStore({ - namespace: SSO_TOKENS_NAMESPACE, - maxEntries: MAX_SSO_TOKENS, + namespace: MSTEAMS_SSO_TOKENS_NAMESPACE, + maxEntries: MSTEAMS_MAX_SSO_TOKENS, env: resolveMSTeamsSqliteStateEnv(params), }); } -function createMigrationStore(params?: { - env?: NodeJS.ProcessEnv; - homedir?: () => string; - stateDir?: string; - storePath?: string; -}): PluginStateKeyedStore<{ importedAt: string }> { - return getMSTeamsRuntime().state.openKeyedStore<{ importedAt: string }>({ - namespace: SSO_TOKEN_MIGRATIONS_NAMESPACE, - maxEntries: 100, - env: resolveMSTeamsSqliteStateEnv(params), - }); -} - -function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null { +export function normalizeMSTeamsSsoStoredToken(value: unknown): MSTeamsSsoStoredToken | null { if (!value || typeof value !== "object") { return null; } @@ -125,7 +98,7 @@ function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null { }; } -function isSsoStoreData(value: unknown): value is SsoStoreData { +export function isMSTeamsSsoStoreData(value: unknown): value is MSTeamsSsoStoreData { if (!value || typeof value !== "object") { return false; } @@ -139,71 +112,17 @@ export function createMSTeamsSsoTokenStoreFs(params?: { stateDir?: string; storePath?: string; }): MSTeamsSsoTokenStore { - const legacyFilePath = resolveMSTeamsStorePath({ - filename: STORE_FILENAME, - env: params?.env, - homedir: params?.homedir, - stateDir: params?.stateDir, - storePath: params?.storePath, - }); - const empty: SsoStoreData = { version: 1, tokens: {} }; const tokenStore = createTokenStore(params); - const migrationStore = createMigrationStore(params); - const migrationKey = buildMigrationKey(legacyFilePath); - let legacyImportPromise: Promise | null = null; - - const importLegacyStore = async (): Promise => { - const imported = (await migrationStore.lookup(migrationKey)) !== undefined; - const { value, exists } = await readJsonFile(legacyFilePath, empty); - const contentKey = exists ? buildMigrationContentKey(legacyFilePath, value) : null; - if (contentKey && (await migrationStore.lookup(contentKey))) { - return; - } - if (exists && isSsoStoreData(value)) { - for (const stored of Object.values(value.tokens)) { - const normalized = normalizeStoredToken(stored); - if (!normalized) { - continue; - } - await tokenStore.registerIfAbsent( - makeKey(normalized.connectionName, normalized.userId), - toPluginJsonValue(normalized), - ); - } - } - if (contentKey) { - await migrationStore.register(contentKey, { importedAt: new Date().toISOString() }); - } - if (!imported) { - await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); - } - if (exists) { - await fs.rm(legacyFilePath, { force: true }).catch(() => {}); - } - }; - - const ensureLegacyImported = async (): Promise => { - if (!legacyImportPromise) { - legacyImportPromise = withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, () => - importLegacyStore(), - ).finally(() => { - legacyImportPromise = null; - }); - } - await legacyImportPromise; - }; return { async get({ connectionName, userId }) { - await ensureLegacyImported(); - return (await tokenStore.lookup(makeKey(connectionName, userId))) ?? null; + return (await tokenStore.lookup(makeMSTeamsSsoTokenStoreKey(connectionName, userId))) ?? null; }, async save(token) { await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => { - await importLegacyStore(); await tokenStore.register( - makeKey(token.connectionName, token.userId), + makeMSTeamsSsoTokenStoreKey(token.connectionName, token.userId), toPluginJsonValue({ ...token }), ); }); @@ -212,8 +131,7 @@ export function createMSTeamsSsoTokenStoreFs(params?: { async remove({ connectionName, userId }) { let removed = false; await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => { - await importLegacyStore(); - removed = await tokenStore.delete(makeKey(connectionName, userId)); + removed = await tokenStore.delete(makeMSTeamsSsoTokenStoreKey(connectionName, userId)); }); return removed; }, @@ -225,13 +143,13 @@ export function createMSTeamsSsoTokenStoreMemory(): MSTeamsSsoTokenStore { const tokens = new Map(); return { async get({ connectionName, userId }) { - return tokens.get(makeKey(connectionName, userId)) ?? null; + return tokens.get(makeMSTeamsSsoTokenStoreKey(connectionName, userId)) ?? null; }, async save(token) { - tokens.set(makeKey(token.connectionName, token.userId), { ...token }); + tokens.set(makeMSTeamsSsoTokenStoreKey(token.connectionName, token.userId), { ...token }); }, async remove({ connectionName, userId }) { - return tokens.delete(makeKey(connectionName, userId)); + return tokens.delete(makeMSTeamsSsoTokenStoreKey(connectionName, userId)); }, }; } diff --git a/extensions/msteams/src/store-fs.ts b/extensions/msteams/src/store-fs.ts index facbb51c70a1..a256a2912abb 100644 --- a/extensions/msteams/src/store-fs.ts +++ b/extensions/msteams/src/store-fs.ts @@ -1,5 +1,5 @@ import { withFileLock as withPathLock } from "openclaw/plugin-sdk/file-lock"; -import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; +import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; import { pathExists } from "openclaw/plugin-sdk/security-runtime"; const STORE_LOCK_OPTIONS = { @@ -13,14 +13,7 @@ const STORE_LOCK_OPTIONS = { stale: 30_000, } as const; -export async function readJsonFile( - filePath: string, - fallback: T, -): Promise<{ value: T; exists: boolean }> { - return await readJsonFileWithFallback(filePath, fallback); -} - -export async function writeJsonFile(filePath: string, value: unknown): Promise { +async function writeJsonFile(filePath: string, value: unknown): Promise { await writeJsonFileAtomically(filePath, value); }