refactor(clawdbot-d9a): pass agent scope to command session lists

This commit is contained in:
Josh Lehman
2026-06-01 07:30:38 -07:00
parent 25ab3696ef
commit c682c25cf7
4 changed files with 145 additions and 21 deletions

View File

@@ -10,6 +10,7 @@ import type { HealthSummary } from "./health.js";
let testConfig: Record<string, unknown> = {};
let testStore: Record<string, { updatedAt?: number }> = {};
let listHealthSessionEntriesCalls: Array<{ agentId?: string; storePath?: string }> = [];
let healthPluginsForTest: HealthTestPlugin[] = [];
let setActivePluginRegistry: typeof import("../plugins/runtime.js").setActivePluginRegistry;
@@ -70,8 +71,10 @@ async function loadFreshHealthModulesForTest() {
loadSessionStore: () => testStore,
}));
vi.doMock("../config/sessions/session-accessor.js", () => ({
listSessionEntries: () =>
Object.entries(testStore).map(([sessionKey, entry]) => ({ sessionKey, entry })),
listSessionEntries: (scope?: { agentId?: string; storePath?: string }) => {
listHealthSessionEntriesCalls.push(scope ?? {});
return Object.entries(testStore).map(([sessionKey, entry]) => ({ sessionKey, entry }));
},
}));
vi.doMock("../plugins/runtime/runtime-web-channel-plugin.js", () => ({
webAuthExists: vi.fn(async () => true),
@@ -466,6 +469,7 @@ describe("getHealthSnapshot", () => {
beforeEach(() => {
buildTelegramHealthSummaryForTest = buildTelegramHealthSummary;
probeTelegramAccountForTestOverride = undefined;
listHealthSessionEntriesCalls = [];
healthPluginsForTest = [createTelegramHealthPlugin()];
setActivePluginRegistry(
createTestRegistry([
@@ -955,4 +959,20 @@ describe("getHealthSnapshot", () => {
expect(ops?.heartbeat.everyMs).toBe(60 * 60 * 1000);
expect(ops?.heartbeat.every).toBe("1h");
});
it("passes agent scope when summarizing configured agent sessions", async () => {
testConfig = {
agents: {
list: [{ id: "main", default: true }, { id: "ops" }],
},
};
testStore = {};
await getHealthSnapshot({ timeoutMs: 10, probe: false });
expect(listHealthSessionEntriesCalls).toEqual([
{ agentId: "main", storePath: "/tmp/sessions.json" },
{ agentId: "ops", storePath: "/tmp/sessions.json" },
]);
});
});

View File

@@ -233,9 +233,12 @@ const resolveAgentOrder = (cfg: OpenClawConfig) => {
return { defaultAgentId, ordered };
};
const buildSessionSummary = async (storePath: string) => {
const buildSessionSummary = async (storePath: string, agentId?: string) => {
const { listSessionEntries } = await import("../config/sessions/session-accessor.js");
const sessions = listSessionEntries({ storePath })
const sessions = listSessionEntries({
...(agentId ? { agentId } : {}),
storePath,
})
.filter(({ sessionKey }) => sessionKey !== "global" && sessionKey !== "unknown")
.map(({ sessionKey, entry }) => ({ key: sessionKey, updatedAt: entry?.updatedAt ?? 0 }))
.toSorted((a, b) => b.updatedAt - a.updatedAt);
@@ -423,8 +426,10 @@ export async function getHealthSnapshot(params?: {
const agents: AgentHealthSummary[] = [];
for (const entry of ordered) {
const storePath = resolveStorePath(cfg.session?.store, { agentId: entry.id });
const sessions = sessionCache.get(storePath) ?? (await buildSessionSummary(storePath));
sessionCache.set(storePath, sessions);
const sessionCacheKey = `${storePath}\0${entry.id}`;
const sessions =
sessionCache.get(sessionCacheKey) ?? (await buildSessionSummary(storePath, entry.id));
sessionCache.set(sessionCacheKey, sessions);
agents.push({
agentId: entry.id,
name: entry.name,
@@ -439,7 +444,10 @@ export async function getHealthSnapshot(params?: {
: 0;
const sessions =
defaultAgent?.sessions ??
(await buildSessionSummary(resolveStorePath(cfg.session?.store, { agentId: defaultAgentId })));
(await buildSessionSummary(
resolveStorePath(cfg.session?.store, { agentId: defaultAgentId }),
defaultAgentId,
));
const start = Date.now();
const cappedTimeout = resolveTimerTimeoutMs(timeoutMs, DEFAULT_TIMEOUT_MS, 50);
@@ -730,7 +738,7 @@ export async function healthCommand(
name: entry.name,
isDefault: entry.id === localAgents.defaultAgentId,
heartbeat: resolveHeartbeatSummary(cfg, entry.id),
sessions: await buildSessionSummary(storePath),
sessions: await buildSessionSummary(storePath, entry.id),
});
}
const resolvedAgents = agents.length > 0 ? agents : fallbackAgents;

View File

@@ -5,9 +5,12 @@ import type { TaskRegistrySummary } from "../tasks/task-registry.types.js";
const statusSummaryMocks = vi.hoisted(() => ({
hasConfiguredChannelsForReadOnlyScope: vi.fn(() => true),
buildChannelSummary: vi.fn(async () => ["ok"]),
listSessionEntries: vi.fn<() => Array<{ sessionKey: string; entry: Record<string, unknown> }>>(
() => [],
),
listSessionEntries: vi.fn<
(scope?: { agentId?: string; storePath?: string }) => Array<{
sessionKey: string;
entry: Record<string, unknown>;
}>
>(() => []),
configureTaskRegistryMaintenance: vi.fn(),
taskRegistrySummary: {
total: 0,
@@ -143,6 +146,8 @@ vi.mock("./status.link-channel.js", () => ({
}));
const { buildChannelSummary } = await import("../infra/channel-summary.js");
const { resolveStorePath } = await import("../config/sessions/paths.js");
const { listGatewayAgentsBasic } = await import("../gateway/agent-list.js");
const { resolveLinkChannelContext } = await import("./status.link-channel.js");
let getStatusSummary: typeof import("./status.summary.js").getStatusSummary;
let statusSummaryRuntime: typeof import("./status.summary.runtime.js").statusSummaryRuntime;
@@ -202,6 +207,13 @@ describe("getStatusSummary", () => {
statusSummaryMocks.hasConfiguredChannelsForReadOnlyScope.mockReturnValue(true);
statusSummaryMocks.buildChannelSummary.mockResolvedValue(["ok"]);
statusSummaryMocks.listSessionEntries.mockReturnValue([]);
vi.mocked(resolveStorePath).mockReturnValue("/tmp/sessions.json");
vi.mocked(listGatewayAgentsBasic).mockReturnValue({
defaultId: "main",
mainKey: "main",
scope: "per-sender",
agents: [{ id: "main" }],
});
});
it("includes runtimeVersion in the status payload", async () => {
@@ -361,6 +373,69 @@ describe("getStatusSummary", () => {
expect(hydratedKeys).not.toContain("agent:main:session-2");
});
it("passes agent scope when listing configured agent session stores", async () => {
vi.mocked(listGatewayAgentsBasic).mockReturnValue({
defaultId: "main",
mainKey: "main",
scope: "per-sender",
agents: [{ id: "main" }, { id: "ops" }],
});
vi.mocked(resolveStorePath).mockImplementation((_store, opts) => {
return `/tmp/${opts?.agentId ?? "main"}/sessions.json`;
});
statusSummaryMocks.listSessionEntries.mockImplementation((scope) =>
scope?.agentId === "ops"
? toSessionEntrySummaries({
main: { sessionId: "ops-session", updatedAt: 2 },
})
: toSessionEntrySummaries({
main: { sessionId: "main-session", updatedAt: 1 },
}),
);
const summary = await getStatusSummary({ includeChannelSummary: false });
expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({
agentId: "main",
storePath: "/tmp/main/sessions.json",
});
expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({
agentId: "ops",
storePath: "/tmp/ops/sessions.json",
});
expect(summary.sessions.count).toBe(2);
expect(summary.sessions.byAgent.map((agent) => [agent.agentId, agent.count])).toEqual([
["main", 1],
["ops", 1],
]);
});
it("aggregates shared file session stores only once", async () => {
vi.mocked(listGatewayAgentsBasic).mockReturnValue({
defaultId: "main",
mainKey: "main",
scope: "per-sender",
agents: [{ id: "main" }, { id: "ops" }],
});
vi.mocked(resolveStorePath).mockReturnValue("/tmp/shared-sessions.json");
statusSummaryMocks.listSessionEntries.mockReturnValue(
toSessionEntrySummaries({
main: { sessionId: "shared-session", updatedAt: 1 },
}),
);
const summary = await getStatusSummary({ includeChannelSummary: false });
expect(summary.sessions.count).toBe(1);
expect(summary.sessions.byAgent.map((agent) => [agent.agentId, agent.count])).toEqual([
["main", 1],
["ops", 1],
]);
expect(statusSummaryMocks.listSessionEntries).toHaveBeenCalledWith({
storePath: "/tmp/shared-sessions.json",
});
});
it("includes configured and selected model labels for pinned sessions", async () => {
vi.mocked(statusSummaryRuntime.resolveConfiguredStatusModelRef).mockReturnValue({
provider: "zhipu",

View File

@@ -132,8 +132,11 @@ function compareSessionCandidatesByUpdatedAt(left: SessionCandidate, right: Sess
return (right.updatedAt ?? 0) - (left.updatedAt ?? 0);
}
function listSessionCandidates(storePath: string) {
return listSessionEntries({ storePath })
function listSessionCandidates(storePath: string, agentId?: string) {
return listSessionEntries({
...(agentId ? { agentId } : {}),
storePath,
})
// Compatibility aggregate buckets are not real user sessions.
.filter(({ sessionKey }) => sessionKey !== "global" && sessionKey !== "unknown")
.map(({ sessionKey, entry }) => ({
@@ -249,13 +252,14 @@ export async function getStatusSummary(
}) ?? DEFAULT_CONTEXT_TOKENS;
const candidateCache = new Map<string, SessionCandidate[]>();
const loadSessionCandidates = (storePath: string) => {
const cached = candidateCache.get(storePath);
const loadSessionCandidates = (storePath: string, agentId?: string) => {
const cacheKey = `${storePath}\0${agentId ?? ""}`;
const cached = candidateCache.get(cacheKey);
if (cached) {
return cached;
}
const candidates = listSessionCandidates(storePath);
candidateCache.set(storePath, candidates);
const candidates = listSessionCandidates(storePath, agentId);
candidateCache.set(cacheKey, candidates);
return candidates;
};
const buildSessionRows = (
@@ -344,11 +348,20 @@ export async function getStatusSummary(
} satisfies SessionStatus;
});
const storeSources = agentList.agents.map((agent) => ({
agentId: agent.id,
storePath: resolveStorePath(cfg.session?.store, { agentId: agent.id }),
}));
const paths = new Set<string>();
const pathCounts = new Map<string, number>();
for (const source of storeSources) {
paths.add(source.storePath);
pathCounts.set(source.storePath, (pathCounts.get(source.storePath) ?? 0) + 1);
}
const byAgent = agentList.agents.map((agent) => {
const storePath = resolveStorePath(cfg.session?.store, { agentId: agent.id });
paths.add(storePath);
const candidates = loadSessionCandidates(storePath);
const candidates = loadSessionCandidates(storePath, agent.id);
const sessions = buildSessionRows(candidates.slice(0, RECENT_SESSION_LIMIT), {
agentIdOverride: agent.id,
});
@@ -360,8 +373,16 @@ export async function getStatusSummary(
};
});
const allSessions = Array.from(paths)
.flatMap((storePath) => loadSessionCandidates(storePath))
const allSessions = storeSources
.filter((source, index, sources) => {
return sources.findIndex((candidate) => candidate.storePath === source.storePath) === index;
})
.flatMap((source) =>
loadSessionCandidates(
source.storePath,
pathCounts.get(source.storePath) === 1 ? source.agentId : undefined,
),
)
.toSorted((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
const recent = buildSessionRows(allSessions.slice(0, RECENT_SESSION_LIMIT));
const totalSessions = allSessions.length;