From c682c25cf70f2a996eb28d69f969821c236ef9f6 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Mon, 1 Jun 2026 07:30:38 -0700 Subject: [PATCH] refactor(clawdbot-d9a): pass agent scope to command session lists --- src/commands/health.snapshot.test.ts | 24 ++++++++- src/commands/health.ts | 20 ++++--- src/commands/status.summary.test.ts | 81 ++++++++++++++++++++++++++-- src/commands/status.summary.ts | 41 ++++++++++---- 4 files changed, 145 insertions(+), 21 deletions(-) diff --git a/src/commands/health.snapshot.test.ts b/src/commands/health.snapshot.test.ts index 15e50211ef22..5f0ef178800d 100644 --- a/src/commands/health.snapshot.test.ts +++ b/src/commands/health.snapshot.test.ts @@ -10,6 +10,7 @@ import type { HealthSummary } from "./health.js"; let testConfig: Record = {}; let testStore: Record = {}; +let listHealthSessionEntriesCalls: Array<{ agentId?: string; storePath?: string }> = []; let healthPluginsForTest: HealthTestPlugin[] = []; let setActivePluginRegistry: typeof import("../plugins/runtime.js").setActivePluginRegistry; @@ -70,8 +71,10 @@ async function loadFreshHealthModulesForTest() { loadSessionStore: () => testStore, })); vi.doMock("../config/sessions/session-accessor.js", () => ({ - listSessionEntries: () => - Object.entries(testStore).map(([sessionKey, entry]) => ({ sessionKey, entry })), + listSessionEntries: (scope?: { agentId?: string; storePath?: string }) => { + listHealthSessionEntriesCalls.push(scope ?? {}); + return Object.entries(testStore).map(([sessionKey, entry]) => ({ sessionKey, entry })); + }, })); vi.doMock("../plugins/runtime/runtime-web-channel-plugin.js", () => ({ webAuthExists: vi.fn(async () => true), @@ -466,6 +469,7 @@ describe("getHealthSnapshot", () => { beforeEach(() => { buildTelegramHealthSummaryForTest = buildTelegramHealthSummary; probeTelegramAccountForTestOverride = undefined; + listHealthSessionEntriesCalls = []; healthPluginsForTest = [createTelegramHealthPlugin()]; setActivePluginRegistry( createTestRegistry([ @@ -955,4 +959,20 @@ describe("getHealthSnapshot", () => { expect(ops?.heartbeat.everyMs).toBe(60 * 60 * 1000); expect(ops?.heartbeat.every).toBe("1h"); }); + + it("passes agent scope when summarizing configured agent sessions", async () => { + testConfig = { + agents: { + list: [{ id: "main", default: true }, { id: "ops" }], + }, + }; + testStore = {}; + + await getHealthSnapshot({ timeoutMs: 10, probe: false }); + + expect(listHealthSessionEntriesCalls).toEqual([ + { agentId: "main", storePath: "/tmp/sessions.json" }, + { agentId: "ops", storePath: "/tmp/sessions.json" }, + ]); + }); }); diff --git a/src/commands/health.ts b/src/commands/health.ts index 792ec788e10f..b448b2904dd5 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -233,9 +233,12 @@ const resolveAgentOrder = (cfg: OpenClawConfig) => { return { defaultAgentId, ordered }; }; -const buildSessionSummary = async (storePath: string) => { +const buildSessionSummary = async (storePath: string, agentId?: string) => { const { listSessionEntries } = await import("../config/sessions/session-accessor.js"); - const sessions = listSessionEntries({ storePath }) + const sessions = listSessionEntries({ + ...(agentId ? { agentId } : {}), + storePath, + }) .filter(({ sessionKey }) => sessionKey !== "global" && sessionKey !== "unknown") .map(({ sessionKey, entry }) => ({ key: sessionKey, updatedAt: entry?.updatedAt ?? 0 })) .toSorted((a, b) => b.updatedAt - a.updatedAt); @@ -423,8 +426,10 @@ export async function getHealthSnapshot(params?: { const agents: AgentHealthSummary[] = []; for (const entry of ordered) { const storePath = resolveStorePath(cfg.session?.store, { agentId: entry.id }); - const sessions = sessionCache.get(storePath) ?? (await buildSessionSummary(storePath)); - sessionCache.set(storePath, sessions); + const sessionCacheKey = `${storePath}\0${entry.id}`; + const sessions = + sessionCache.get(sessionCacheKey) ?? (await buildSessionSummary(storePath, entry.id)); + sessionCache.set(sessionCacheKey, sessions); agents.push({ agentId: entry.id, name: entry.name, @@ -439,7 +444,10 @@ export async function getHealthSnapshot(params?: { : 0; const sessions = defaultAgent?.sessions ?? - (await buildSessionSummary(resolveStorePath(cfg.session?.store, { agentId: defaultAgentId }))); + (await buildSessionSummary( + resolveStorePath(cfg.session?.store, { agentId: defaultAgentId }), + defaultAgentId, + )); const start = Date.now(); const cappedTimeout = resolveTimerTimeoutMs(timeoutMs, DEFAULT_TIMEOUT_MS, 50); @@ -730,7 +738,7 @@ export async function healthCommand( name: entry.name, isDefault: entry.id === localAgents.defaultAgentId, heartbeat: resolveHeartbeatSummary(cfg, entry.id), - sessions: await buildSessionSummary(storePath), + sessions: await buildSessionSummary(storePath, entry.id), }); } const resolvedAgents = agents.length > 0 ? agents : fallbackAgents; diff --git a/src/commands/status.summary.test.ts b/src/commands/status.summary.test.ts index 60dc9da87f88..44205e4e5496 100644 --- a/src/commands/status.summary.test.ts +++ b/src/commands/status.summary.test.ts @@ -5,9 +5,12 @@ import type { TaskRegistrySummary } from "../tasks/task-registry.types.js"; const statusSummaryMocks = vi.hoisted(() => ({ hasConfiguredChannelsForReadOnlyScope: vi.fn(() => true), buildChannelSummary: vi.fn(async () => ["ok"]), - listSessionEntries: vi.fn<() => Array<{ sessionKey: string; entry: Record }>>( - () => [], - ), + listSessionEntries: vi.fn< + (scope?: { agentId?: string; storePath?: string }) => Array<{ + sessionKey: string; + entry: Record; + }> + >(() => []), configureTaskRegistryMaintenance: vi.fn(), taskRegistrySummary: { total: 0, @@ -143,6 +146,8 @@ vi.mock("./status.link-channel.js", () => ({ })); const { buildChannelSummary } = await import("../infra/channel-summary.js"); +const { resolveStorePath } = await import("../config/sessions/paths.js"); +const { listGatewayAgentsBasic } = await import("../gateway/agent-list.js"); const { resolveLinkChannelContext } = await import("./status.link-channel.js"); let getStatusSummary: typeof import("./status.summary.js").getStatusSummary; let statusSummaryRuntime: typeof import("./status.summary.runtime.js").statusSummaryRuntime; @@ -202,6 +207,13 @@ describe("getStatusSummary", () => { statusSummaryMocks.hasConfiguredChannelsForReadOnlyScope.mockReturnValue(true); statusSummaryMocks.buildChannelSummary.mockResolvedValue(["ok"]); statusSummaryMocks.listSessionEntries.mockReturnValue([]); + vi.mocked(resolveStorePath).mockReturnValue("/tmp/sessions.json"); + vi.mocked(listGatewayAgentsBasic).mockReturnValue({ + defaultId: "main", + mainKey: "main", + scope: "per-sender", + agents: [{ id: "main" }], + }); }); it("includes runtimeVersion in the status payload", async () => { @@ -361,6 +373,69 @@ describe("getStatusSummary", () => { expect(hydratedKeys).not.toContain("agent:main:session-2"); }); + it("passes agent scope when listing configured agent session stores", async () => { + vi.mocked(listGatewayAgentsBasic).mockReturnValue({ + defaultId: "main", + mainKey: "main", + scope: "per-sender", + agents: [{ id: "main" }, { id: "ops" }], + }); + vi.mocked(resolveStorePath).mockImplementation((_store, opts) => { + return `/tmp/${opts?.agentId ?? "main"}/sessions.json`; + }); + statusSummaryMocks.listSessionEntries.mockImplementation((scope) => + scope?.agentId === "ops" + ? toSessionEntrySummaries({ + main: { sessionId: "ops-session", updatedAt: 2 }, + }) + : toSessionEntrySummaries({ + main: { sessionId: "main-session", updatedAt: 1 }, + }), + ); + + const summary = await getStatusSummary({ includeChannelSummary: false }); + + expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({ + agentId: "main", + storePath: "/tmp/main/sessions.json", + }); + expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({ + agentId: "ops", + storePath: "/tmp/ops/sessions.json", + }); + expect(summary.sessions.count).toBe(2); + expect(summary.sessions.byAgent.map((agent) => [agent.agentId, agent.count])).toEqual([ + ["main", 1], + ["ops", 1], + ]); + }); + + it("aggregates shared file session stores only once", async () => { + vi.mocked(listGatewayAgentsBasic).mockReturnValue({ + defaultId: "main", + mainKey: "main", + scope: "per-sender", + agents: [{ id: "main" }, { id: "ops" }], + }); + vi.mocked(resolveStorePath).mockReturnValue("/tmp/shared-sessions.json"); + statusSummaryMocks.listSessionEntries.mockReturnValue( + toSessionEntrySummaries({ + main: { sessionId: "shared-session", updatedAt: 1 }, + }), + ); + + const summary = await getStatusSummary({ includeChannelSummary: false }); + + expect(summary.sessions.count).toBe(1); + expect(summary.sessions.byAgent.map((agent) => [agent.agentId, agent.count])).toEqual([ + ["main", 1], + ["ops", 1], + ]); + expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({ + storePath: "/tmp/shared-sessions.json", + }); + }); + it("includes configured and selected model labels for pinned sessions", async () => { vi.mocked(statusSummaryRuntime.resolveConfiguredStatusModelRef).mockReturnValue({ provider: "zhipu", diff --git a/src/commands/status.summary.ts b/src/commands/status.summary.ts index 4aeda3034378..3a300ec72655 100644 --- a/src/commands/status.summary.ts +++ b/src/commands/status.summary.ts @@ -132,8 +132,11 @@ function compareSessionCandidatesByUpdatedAt(left: SessionCandidate, right: Sess return (right.updatedAt ?? 0) - (left.updatedAt ?? 0); } -function listSessionCandidates(storePath: string) { - return listSessionEntries({ storePath }) +function listSessionCandidates(storePath: string, agentId?: string) { + return listSessionEntries({ + ...(agentId ? { agentId } : {}), + storePath, + }) // Compatibility aggregate buckets are not real user sessions. .filter(({ sessionKey }) => sessionKey !== "global" && sessionKey !== "unknown") .map(({ sessionKey, entry }) => ({ @@ -249,13 +252,14 @@ export async function getStatusSummary( }) ?? DEFAULT_CONTEXT_TOKENS; const candidateCache = new Map(); - const loadSessionCandidates = (storePath: string) => { - const cached = candidateCache.get(storePath); + const loadSessionCandidates = (storePath: string, agentId?: string) => { + const cacheKey = `${storePath}\0${agentId ?? ""}`; + const cached = candidateCache.get(cacheKey); if (cached) { return cached; } - const candidates = listSessionCandidates(storePath); - candidateCache.set(storePath, candidates); + const candidates = listSessionCandidates(storePath, agentId); + candidateCache.set(cacheKey, candidates); return candidates; }; const buildSessionRows = ( @@ -344,11 +348,20 @@ export async function getStatusSummary( } satisfies SessionStatus; }); + const storeSources = agentList.agents.map((agent) => ({ + agentId: agent.id, + storePath: resolveStorePath(cfg.session?.store, { agentId: agent.id }), + })); const paths = new Set(); + const pathCounts = new Map(); + for (const source of storeSources) { + paths.add(source.storePath); + pathCounts.set(source.storePath, (pathCounts.get(source.storePath) ?? 0) + 1); + } + const byAgent = agentList.agents.map((agent) => { const storePath = resolveStorePath(cfg.session?.store, { agentId: agent.id }); - paths.add(storePath); - const candidates = loadSessionCandidates(storePath); + const candidates = loadSessionCandidates(storePath, agent.id); const sessions = buildSessionRows(candidates.slice(0, RECENT_SESSION_LIMIT), { agentIdOverride: agent.id, }); @@ -360,8 +373,16 @@ export async function getStatusSummary( }; }); - const allSessions = Array.from(paths) - .flatMap((storePath) => loadSessionCandidates(storePath)) + const allSessions = storeSources + .filter((source, index, sources) => { + return sources.findIndex((candidate) => candidate.storePath === source.storePath) === index; + }) + .flatMap((source) => + loadSessionCandidates( + source.storePath, + pathCounts.get(source.storePath) === 1 ? source.agentId : undefined, + ), + ) .toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); const recent = buildSessionRows(allSessions.slice(0, RECENT_SESSION_LIMIT)); const totalSessions = allSessions.length;