mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
refactor: route transcript readers through facade
This commit is contained in:
@@ -18,11 +18,16 @@ import {
|
||||
} from "../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
|
||||
import { readSessionMessagesAsync } from "../gateway/session-transcript-readers.js";
|
||||
import { resolveGatewaySessionStoreTarget } from "../gateway/session-utils.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
isAcpSessionKey,
|
||||
isCronSessionKey,
|
||||
isSubagentSessionKey,
|
||||
resolveAgentIdFromSessionKey,
|
||||
} from "../routing/session-key.js";
|
||||
import { resolveSendPolicy } from "../sessions/send-policy.js";
|
||||
import {
|
||||
deliveryContextFromSession,
|
||||
@@ -537,9 +542,12 @@ async function recoverStore(params: {
|
||||
let messages: unknown[];
|
||||
try {
|
||||
messages = await readSessionMessagesAsync(
|
||||
entry.sessionId,
|
||||
params.storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
agentId: resolveAgentIdFromSessionKey(sessionKey),
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath: params.storePath,
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 20,
|
||||
|
||||
@@ -216,9 +216,10 @@ describe("readSubagentOutput", () => {
|
||||
}),
|
||||
).resolves.toBe("fresh recovered output");
|
||||
expect(deps.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"agent:main:subagent:child",
|
||||
undefined,
|
||||
"/tmp/openclaw-internal-run.jsonl",
|
||||
{
|
||||
sessionFile: "/tmp/openclaw-internal-run.jsonl",
|
||||
sessionId: "agent:main:subagent:child",
|
||||
},
|
||||
{ mode: "recent", maxMessages: 100, maxBytes: 1024 * 1024 },
|
||||
);
|
||||
expect(deps.callGateway).not.toHaveBeenCalled();
|
||||
|
||||
@@ -207,9 +207,10 @@ export async function readSubagentOutput(
|
||||
let messages: unknown[] | undefined;
|
||||
if (options?.sessionFile) {
|
||||
const transcriptMessages = await subagentAnnounceOutputDeps.readSessionMessagesAsync(
|
||||
sessionKey,
|
||||
undefined,
|
||||
options.sessionFile,
|
||||
{
|
||||
sessionFile: options.sessionFile,
|
||||
sessionId: sessionKey,
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 100,
|
||||
|
||||
@@ -12,7 +12,7 @@ export {
|
||||
resolveStorePath,
|
||||
} from "../config/sessions.js";
|
||||
export { callGateway } from "../gateway/call.js";
|
||||
export { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
|
||||
export { readSessionMessagesAsync } from "../gateway/session-transcript-readers.js";
|
||||
export { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js";
|
||||
export {
|
||||
isEmbeddedAgentRunActive,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import * as sessions from "../config/sessions.js";
|
||||
import * as gateway from "../gateway/call.js";
|
||||
import * as sessionUtils from "../gateway/session-utils.fs.js";
|
||||
import * as sessionUtils from "../gateway/session-transcript-readers.js";
|
||||
import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js";
|
||||
import * as announceDelivery from "./subagent-announce-delivery.js";
|
||||
import {
|
||||
@@ -32,7 +32,7 @@ vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/session-utils.fs.js", () => ({
|
||||
vi.mock("../gateway/session-transcript-readers.js", () => ({
|
||||
readSessionMessagesAsync: vi.fn(async () => []),
|
||||
}));
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ import {
|
||||
type SessionEntry,
|
||||
} from "../config/sessions.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
|
||||
import { readSessionMessagesAsync } from "../gateway/session-transcript-readers.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js";
|
||||
@@ -294,9 +294,12 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
|
||||
|
||||
const messages = await readSessionMessagesAsync(
|
||||
entry.sessionId,
|
||||
storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
agentId: resolveAgentIdFromSessionKey(childSessionKey),
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 200,
|
||||
|
||||
@@ -18,12 +18,14 @@ export {
|
||||
enforceChatHistoryFinalBudget,
|
||||
replaceOversizedChatHistoryMessages,
|
||||
} from "../../gateway/server-methods/chat.js";
|
||||
export { capArrayByJsonBytes } from "../../gateway/session-utils.fs.js";
|
||||
export {
|
||||
capArrayByJsonBytes,
|
||||
readSessionMessagesAsync,
|
||||
} from "../../gateway/session-transcript-readers.js";
|
||||
export {
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadSessionEntry,
|
||||
readSessionMessagesAsync,
|
||||
resolveSessionModelRef,
|
||||
} from "../../gateway/session-utils.js";
|
||||
export { resolveSessionKeyFromResolveParams } from "../../gateway/sessions-resolve.js";
|
||||
|
||||
@@ -120,9 +120,12 @@ describe("embedded gateway stub", () => {
|
||||
maxMessages: 200,
|
||||
});
|
||||
expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/openclaw-sessions.json",
|
||||
undefined,
|
||||
{
|
||||
agentId: "main",
|
||||
sessionFile: undefined,
|
||||
sessionId: "sess-main",
|
||||
storePath: "/tmp/openclaw-sessions.json",
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 200,
|
||||
@@ -180,9 +183,12 @@ describe("embedded gateway stub", () => {
|
||||
maxMessages: 1,
|
||||
});
|
||||
expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/openclaw-sessions.json",
|
||||
undefined,
|
||||
{
|
||||
agentId: "main",
|
||||
sessionFile: undefined,
|
||||
sessionId: "sess-main",
|
||||
storePath: "/tmp/openclaw-sessions.json",
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 1,
|
||||
@@ -209,9 +215,12 @@ describe("embedded gateway stub", () => {
|
||||
maxMessages: 2,
|
||||
});
|
||||
expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/openclaw-sessions.json",
|
||||
undefined,
|
||||
{
|
||||
agentId: "main",
|
||||
sessionFile: undefined,
|
||||
sessionId: "sess-main",
|
||||
storePath: "/tmp/openclaw-sessions.json",
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 2,
|
||||
|
||||
@@ -9,7 +9,10 @@ import type {
|
||||
} from "../../../packages/gateway-protocol/src/index.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { CallGatewayOptions } from "../../gateway/call.js";
|
||||
import type { ReadSessionMessagesAsyncOptions } from "../../gateway/session-utils.fs.js";
|
||||
import type {
|
||||
ReadSessionMessagesAsyncOptions,
|
||||
SessionTranscriptReadScope,
|
||||
} from "../../gateway/session-transcript-readers.js";
|
||||
import type { SessionsListResult } from "../../gateway/session-utils.types.js";
|
||||
import type { SessionsResolveResult } from "../../gateway/sessions-resolve.js";
|
||||
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
@@ -71,9 +74,7 @@ interface EmbeddedGatewayRuntime {
|
||||
entry: Record<string, unknown> | undefined;
|
||||
};
|
||||
readSessionMessagesAsync: (
|
||||
sessionId: string,
|
||||
storePath: string,
|
||||
sessionFile: string | undefined,
|
||||
scope: SessionTranscriptReadScope,
|
||||
opts: ReadSessionMessagesAsyncOptions,
|
||||
) => Promise<unknown[]>;
|
||||
resolveSessionModelRef: (
|
||||
@@ -155,9 +156,12 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
|
||||
const localMessages =
|
||||
sessionId && storePath
|
||||
? await rt.readSessionMessagesAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
entry?.sessionFile as string | undefined,
|
||||
{
|
||||
agentId: sessionAgentId,
|
||||
sessionFile: entry?.sessionFile as string | undefined,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: max,
|
||||
|
||||
@@ -8,7 +8,7 @@ import { Type } from "typebox";
|
||||
import { getRuntimeConfig } from "../../config/config.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { callGateway } from "../../gateway/call.js";
|
||||
import { capArrayByJsonBytes } from "../../gateway/session-utils.fs.js";
|
||||
import { capArrayByJsonBytes } from "../../gateway/session-transcript-readers.js";
|
||||
import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js";
|
||||
import { redactToolPayloadText } from "../../logging/redact.js";
|
||||
import { truncateUtf16Safe } from "../../utils.js";
|
||||
|
||||
@@ -18,10 +18,8 @@ import {
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { callGateway } from "../../gateway/call.js";
|
||||
import {
|
||||
deriveSessionTitle,
|
||||
readSessionTitleFieldsFromTranscriptAsync,
|
||||
} from "../../gateway/session-utils.js";
|
||||
import { readSessionTitleFieldsFromTranscriptAsync } from "../../gateway/session-transcript-readers.js";
|
||||
import { deriveSessionTitle } from "../../gateway/session-utils.js";
|
||||
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
|
||||
import { deliveryContextFromSession } from "../../utils/delivery-context.shared.js";
|
||||
import {
|
||||
@@ -385,12 +383,12 @@ export function createSessionsListTool(opts?: {
|
||||
return;
|
||||
}
|
||||
const target = titleTargets[next];
|
||||
const fields = await readSessionTitleFieldsFromTranscriptAsync(
|
||||
target.sessionId,
|
||||
const fields = await readSessionTitleFieldsFromTranscriptAsync({
|
||||
agentId: target.agentId,
|
||||
sessionFile: target.sessionFile,
|
||||
sessionId: target.sessionId,
|
||||
storePath,
|
||||
target.sessionFile,
|
||||
target.agentId,
|
||||
);
|
||||
});
|
||||
if (includeDerivedTitles && !target.row.derivedTitle) {
|
||||
target.row.derivedTitle = deriveSessionTitle(
|
||||
target.titleEntry,
|
||||
|
||||
@@ -10,7 +10,7 @@ import {
|
||||
type ToolContentBlock,
|
||||
} from "../chat/tool-content.js";
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
import { attachOpenClawTranscriptMeta } from "./session-utils.fs.js";
|
||||
import { attachOpenClawTranscriptMeta } from "./session-transcript-readers.js";
|
||||
|
||||
export const CLAUDE_CLI_PROVIDER = "claude-cli";
|
||||
const CLAUDE_PROJECTS_RELATIVE_DIR = path.join(".claude", "projects");
|
||||
|
||||
@@ -62,7 +62,8 @@ import {
|
||||
shouldRetryToolReadProbe,
|
||||
} from "./live-tool-probe-utils.js";
|
||||
import { startGatewayServer } from "./server.impl.js";
|
||||
import { loadSessionEntry, readSessionMessagesAsync } from "./session-utils.js";
|
||||
import { readSessionMessagesAsync } from "./session-transcript-readers.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
|
||||
const ZAI_FALLBACK = isTruthyEnvValue(process.env.OPENCLAW_LIVE_GATEWAY_ZAI_FALLBACK);
|
||||
const REQUIRE_PROFILE_KEYS = isLiveProfileKeyModeEnabled();
|
||||
@@ -1889,10 +1890,17 @@ async function readSessionAssistantTexts(sessionKey: string, modelKey?: string):
|
||||
if (!entry?.sessionId) {
|
||||
return [];
|
||||
}
|
||||
const messages = await readSessionMessagesAsync(entry.sessionId, storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "live model assistant text verification",
|
||||
});
|
||||
const messages = await readSessionMessagesAsync(
|
||||
{
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "live model assistant text verification",
|
||||
},
|
||||
);
|
||||
const assistantTexts: string[] = [];
|
||||
for (const message of messages) {
|
||||
if (!message || typeof message !== "object") {
|
||||
|
||||
@@ -30,6 +30,9 @@ vi.mock("./http-utils.js", () => ({
|
||||
|
||||
vi.mock("./session-utils.js", () => ({
|
||||
loadSessionEntry: loadSessionEntryMock,
|
||||
}));
|
||||
|
||||
vi.mock("./session-transcript-readers.js", () => ({
|
||||
readSessionMessagesAsync: readSessionMessagesMock,
|
||||
}));
|
||||
|
||||
|
||||
@@ -26,7 +26,8 @@ import {
|
||||
resolveOpenAiCompatibleHttpSenderIsOwner,
|
||||
} from "./http-utils.js";
|
||||
import { authorizeOperatorScopesForMethod } from "./method-scopes.js";
|
||||
import { loadSessionEntry, readSessionMessagesAsync } from "./session-utils.js";
|
||||
import { readSessionMessagesAsync } from "./session-transcript-readers.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
|
||||
const OUTGOING_IMAGE_ROUTE_PREFIX = "/api/chat/media/outgoing";
|
||||
const DEFAULT_TRANSIENT_OUTGOING_IMAGE_TTL_MS = 15 * 60 * 1000;
|
||||
@@ -703,10 +704,18 @@ async function getSessionManagedOutgoingAttachmentIndex(
|
||||
}
|
||||
}
|
||||
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "managed outgoing attachment index",
|
||||
});
|
||||
const messages = await readSessionMessagesAsync(
|
||||
{
|
||||
agentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "managed outgoing attachment index",
|
||||
},
|
||||
);
|
||||
const index: SessionManagedOutgoingAttachmentIndex = new Set();
|
||||
for (const message of messages) {
|
||||
const meta = (message as { __openclaw?: { id?: string } } | null)?.["__openclaw"];
|
||||
|
||||
@@ -18,6 +18,15 @@ vi.mock("../session-utils.js", async () => {
|
||||
return {
|
||||
...actual,
|
||||
loadSessionEntry: hoisted.loadSessionEntry,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../session-transcript-readers.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../session-transcript-readers.js")>(
|
||||
"../session-transcript-readers.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
visitSessionMessagesAsync: hoisted.visitSessionMessagesAsync,
|
||||
};
|
||||
});
|
||||
@@ -205,12 +214,10 @@ describe("artifacts RPC handlers", () => {
|
||||
});
|
||||
|
||||
function mockedMessages(messages: unknown[]) {
|
||||
hoisted.visitSessionMessagesAsync.mockImplementation(
|
||||
async (_sessionId, _storePath, _sessionFile, visit) => {
|
||||
messages.forEach((message, index) => visit(message, index + 1));
|
||||
return messages.length;
|
||||
},
|
||||
);
|
||||
hoisted.visitSessionMessagesAsync.mockImplementation(async (_scope, visit) => {
|
||||
messages.forEach((message, index) => visit(message, index + 1));
|
||||
return messages.length;
|
||||
});
|
||||
}
|
||||
|
||||
it("lists stable transcript artifact summaries by sessionKey", async () => {
|
||||
@@ -233,9 +240,12 @@ describe("artifacts RPC handlers", () => {
|
||||
expect(artifact?.id).toMatch(/^artifact_/);
|
||||
expect(artifact).not.toHaveProperty("data");
|
||||
expect(hoisted.visitSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/sessions.json",
|
||||
"/tmp/sess-main.jsonl",
|
||||
{
|
||||
agentId: "main",
|
||||
sessionFile: "/tmp/sess-main.jsonl",
|
||||
sessionId: "sess-main",
|
||||
storePath: "/tmp/sessions.json",
|
||||
},
|
||||
expect.any(Function),
|
||||
expect.objectContaining({ cache: "skip" }),
|
||||
);
|
||||
|
||||
@@ -25,7 +25,8 @@ import {
|
||||
resolveSessionStoreKey,
|
||||
resolveStoredSessionKeyForAgentStore,
|
||||
} from "../session-store-key.js";
|
||||
import { loadSessionEntry, visitSessionMessagesAsync } from "../session-utils.js";
|
||||
import { visitSessionMessagesAsync } from "../session-transcript-readers.js";
|
||||
import { loadSessionEntry } from "../session-utils.js";
|
||||
import type { GatewayRequestHandlers, RespondFn } from "./types.js";
|
||||
import { assertValidParams } from "./validation.js";
|
||||
|
||||
@@ -464,9 +465,12 @@ async function loadArtifacts(
|
||||
}
|
||||
const artifacts: ArtifactRecord[] = [];
|
||||
await visitSessionMessagesAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
entry?.sessionFile,
|
||||
{
|
||||
agentId: resolved.agentId ?? resolveAgentIdFromSessionKey(sessionKey),
|
||||
sessionFile: entry?.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
(message, seq) => {
|
||||
collectArtifactsFromMessage({
|
||||
message,
|
||||
|
||||
@@ -133,17 +133,19 @@ import { ADMIN_SCOPE } from "../method-scopes.js";
|
||||
import { getMaxChatHistoryMessagesBytes, MAX_PAYLOAD_BYTES } from "../server-constants.js";
|
||||
import { resolveSessionHistoryTailReadOptions } from "../session-history-state.js";
|
||||
import { readSessionTranscriptIndex } from "../session-transcript-index.fs.js";
|
||||
import {
|
||||
readSessionMessageByIdAsync,
|
||||
readSessionMessagesAsync,
|
||||
readRecentSessionMessagesAsync,
|
||||
} from "../session-transcript-readers.js";
|
||||
import {
|
||||
capArrayByJsonBytes,
|
||||
buildGatewaySessionInfo,
|
||||
getSessionDefaults,
|
||||
loadSessionEntry,
|
||||
listAgentsForGateway,
|
||||
readSessionMessageByIdAsync,
|
||||
readSessionMessagesAsync,
|
||||
resolveGatewayModelSupportsImages,
|
||||
resolveDeletedAgentIdFromSessionKey,
|
||||
readRecentSessionMessagesAsync,
|
||||
resolveSessionModelRef,
|
||||
resolveSessionStoreKey,
|
||||
} from "../session-utils.js";
|
||||
@@ -2287,6 +2289,7 @@ async function isChatMessageIdVisibleAfterHistoryFilters(params: {
|
||||
sessionId: string;
|
||||
storePath: string | undefined;
|
||||
sessionFile: string | undefined;
|
||||
agentId?: string;
|
||||
messageId: string;
|
||||
sessionStartedAt?: number;
|
||||
}): Promise<boolean> {
|
||||
@@ -2294,9 +2297,12 @@ async function isChatMessageIdVisibleAfterHistoryFilters(params: {
|
||||
return true;
|
||||
}
|
||||
const messages = await readSessionMessagesAsync(
|
||||
params.sessionId,
|
||||
params.storePath,
|
||||
params.sessionFile,
|
||||
{
|
||||
agentId: params.agentId,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
storePath: params.storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "chat.message.get visibility",
|
||||
@@ -2395,10 +2401,18 @@ async function handleChatHistoryRequest({
|
||||
};
|
||||
const localMessages =
|
||||
sessionId && storePath
|
||||
? await readRecentSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, {
|
||||
...localHistoryReadOptions,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
})
|
||||
? await readRecentSessionMessagesAsync(
|
||||
{
|
||||
agentId: sessionAgentId,
|
||||
sessionFile: entry?.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
...localHistoryReadOptions,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
},
|
||||
)
|
||||
: [];
|
||||
const overreadContextMessage =
|
||||
localMessages.length > rawHistoryWindow.maxMessages ? localMessages[0] : undefined;
|
||||
@@ -2560,10 +2574,18 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
|
||||
const sessionAgentId = resolveSessionAgentId({
|
||||
sessionKey,
|
||||
config: cfg,
|
||||
agentId: selectedAgent.agentId,
|
||||
});
|
||||
const resolved = await readSessionMessageByIdAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
entry?.sessionFile,
|
||||
{
|
||||
agentId: sessionAgentId,
|
||||
sessionFile: entry?.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
messageId,
|
||||
);
|
||||
if (!resolved.found) {
|
||||
@@ -2574,6 +2596,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionId,
|
||||
storePath,
|
||||
sessionFile: entry?.sessionFile,
|
||||
agentId: sessionAgentId,
|
||||
messageId,
|
||||
sessionStartedAt:
|
||||
typeof entry?.sessionStartedAt === "number" ? entry.sessionStartedAt : undefined,
|
||||
|
||||
@@ -93,6 +93,12 @@ import {
|
||||
resolveStoredSessionOwnerAgentId,
|
||||
} from "../session-store-key.js";
|
||||
import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js";
|
||||
import {
|
||||
readRecentSessionMessagesWithStatsAsync,
|
||||
readRecentSessionTranscriptLines,
|
||||
readSessionMessageCountAsync,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
} from "../session-transcript-readers.js";
|
||||
import {
|
||||
archiveFileOnDisk,
|
||||
buildGatewaySessionRow,
|
||||
@@ -101,10 +107,6 @@ import {
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
readRecentSessionMessagesWithStatsAsync,
|
||||
readRecentSessionTranscriptLines,
|
||||
readSessionMessageCountAsync,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
resolveDeletedAgentIdFromSessionKey,
|
||||
resolveFreshestSessionEntryFromStoreKeys,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
@@ -940,7 +942,12 @@ async function handleSessionSend(params: {
|
||||
}
|
||||
|
||||
const messageSeq =
|
||||
(await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile)) + 1;
|
||||
(await readSessionMessageCountAsync({
|
||||
agentId: requestedAgentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath,
|
||||
})) + 1;
|
||||
let sendAcked = false;
|
||||
let sendPayload: unknown;
|
||||
let sendCached = false;
|
||||
@@ -1278,10 +1285,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
continue;
|
||||
}
|
||||
const items = readSessionPreviewItemsFromTranscript(
|
||||
entry.sessionId,
|
||||
target.storePath,
|
||||
entry.sessionFile,
|
||||
target.agentId,
|
||||
{
|
||||
agentId: target.agentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath: target.storePath,
|
||||
},
|
||||
limit,
|
||||
maxChars,
|
||||
);
|
||||
@@ -1653,11 +1662,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
let runError: unknown;
|
||||
let runMeta: Record<string, unknown> | undefined;
|
||||
const messageSeq = initialMessage
|
||||
? (await readSessionMessageCountAsync(
|
||||
createdEntry.sessionId,
|
||||
target.storePath,
|
||||
createdEntry.sessionFile,
|
||||
)) + 1
|
||||
? (await readSessionMessageCountAsync({
|
||||
agentId: target.agentId,
|
||||
sessionFile: createdEntry.sessionFile,
|
||||
sessionId: createdEntry.sessionId,
|
||||
storePath: target.storePath,
|
||||
})) + 1
|
||||
: undefined;
|
||||
|
||||
if (initialMessage) {
|
||||
@@ -2522,9 +2532,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const { messages } = await readRecentSessionMessagesWithStatsAsync(
|
||||
entry.sessionId,
|
||||
storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
agentId: requestedAgent.agentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
maxMessages: limit,
|
||||
maxLines: limit * 20 + 20,
|
||||
|
||||
@@ -12,11 +12,11 @@ import type {
|
||||
SessionMessageSubscriberRegistry,
|
||||
} from "./server-chat.js";
|
||||
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
|
||||
import { readSessionMessageCountAsync } from "./session-transcript-readers.js";
|
||||
import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
readSessionMessageCountAsync,
|
||||
type GatewaySessionRow,
|
||||
} from "./session-utils.js";
|
||||
|
||||
@@ -174,7 +174,12 @@ async function handleTranscriptUpdateBroadcast(
|
||||
const { entry, storePath } = loadSessionEntry(sessionKey, { agentId: visibleAgentId });
|
||||
messageSeq = entry?.sessionId
|
||||
? asPositiveSafeInteger(
|
||||
await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile),
|
||||
await readSessionMessageCountAsync({
|
||||
agentId: visibleAgentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath,
|
||||
}),
|
||||
)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import { createHash } from "node:crypto";
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { HEARTBEAT_PROMPT } from "../auto-reply/heartbeat.js";
|
||||
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
|
||||
import * as sessionUtils from "./session-utils.js";
|
||||
import * as sessionTranscriptReaders from "./session-transcript-readers.js";
|
||||
|
||||
type HistorySnapshot = ReturnType<typeof buildSessionHistorySnapshot>;
|
||||
type RawStateOptions = Omit<
|
||||
@@ -90,7 +90,7 @@ function appendAssistantText(state: SessionHistorySseState, text: string, messag
|
||||
describe("SessionHistorySseState", () => {
|
||||
test("uses the initial raw snapshot for both first history and seq seeding", () => {
|
||||
const readSpy = vi
|
||||
.spyOn(sessionUtils, "readSessionMessagesAsync")
|
||||
.spyOn(sessionTranscriptReaders, "readSessionMessagesAsync")
|
||||
.mockResolvedValue([assistantTextMessage("stale disk message", 1)]);
|
||||
try {
|
||||
const state = newState([assistantTextMessage("fresh snapshot message", 2)]);
|
||||
@@ -407,9 +407,11 @@ describe("SessionHistorySseState", () => {
|
||||
});
|
||||
|
||||
test("refreshes limited SSE history from bounded async tail reads", async () => {
|
||||
const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessagesAsync").mockResolvedValue([]);
|
||||
const fullReadSpy = vi
|
||||
.spyOn(sessionTranscriptReaders, "readSessionMessagesAsync")
|
||||
.mockResolvedValue([]);
|
||||
const tailReadSpy = vi
|
||||
.spyOn(sessionUtils, "readRecentSessionMessagesWithStatsAsync")
|
||||
.spyOn(sessionTranscriptReaders, "readRecentSessionMessagesWithStatsAsync")
|
||||
.mockResolvedValueOnce({
|
||||
messages: [assistantTextMessage("tail two", 8)],
|
||||
totalMessages: 8,
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
readRecentSessionMessagesWithStatsAsync,
|
||||
readSessionMessagesAsync,
|
||||
} from "./session-utils.js";
|
||||
} from "./session-transcript-readers.js";
|
||||
|
||||
// Session history state owns the SSE-friendly projection of transcript JSONL:
|
||||
// raw messages are projected for display, paginated by transcript seq, then
|
||||
@@ -39,6 +39,7 @@ type InlineSessionHistoryAppend = {
|
||||
};
|
||||
|
||||
type SessionHistoryTranscriptTarget = {
|
||||
agentId?: string;
|
||||
sessionId: string;
|
||||
storePath?: string;
|
||||
sessionFile?: string;
|
||||
@@ -344,9 +345,12 @@ export class SessionHistorySseState {
|
||||
private async readRawSnapshotAsync(): Promise<SessionHistoryRawSnapshot> {
|
||||
if (this.cursor === undefined && typeof this.limit === "number") {
|
||||
const snapshot = await readRecentSessionMessagesWithStatsAsync(
|
||||
this.target.sessionId,
|
||||
this.target.storePath,
|
||||
this.target.sessionFile,
|
||||
{
|
||||
agentId: this.target.agentId,
|
||||
sessionFile: this.target.sessionFile,
|
||||
sessionId: this.target.sessionId,
|
||||
storePath: this.target.storePath,
|
||||
},
|
||||
{
|
||||
...resolveSessionHistoryTailReadOptions(this.limit),
|
||||
},
|
||||
@@ -359,9 +363,12 @@ export class SessionHistorySseState {
|
||||
}
|
||||
return {
|
||||
rawMessages: await readSessionMessagesAsync(
|
||||
this.target.sessionId,
|
||||
this.target.storePath,
|
||||
this.target.sessionFile,
|
||||
{
|
||||
agentId: this.target.agentId,
|
||||
sessionFile: this.target.sessionFile,
|
||||
sessionId: this.target.sessionId,
|
||||
storePath: this.target.storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "session history cursor pagination",
|
||||
|
||||
@@ -62,10 +62,10 @@ import {
|
||||
resolveStableSessionEndTranscript,
|
||||
type ArchivedSessionTranscript,
|
||||
} from "./session-transcript-files.fs.js";
|
||||
import { readSessionMessagesAsync } from "./session-transcript-readers.js";
|
||||
import {
|
||||
loadSessionEntry,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
readSessionMessagesAsync,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveSessionStoreKey,
|
||||
resolveSessionModelRef,
|
||||
@@ -712,10 +712,18 @@ export async function emitGatewayBeforeResetPluginHook(params: {
|
||||
let messages: unknown[] = [];
|
||||
try {
|
||||
if (typeof sessionId === "string" && sessionId.trim().length > 0) {
|
||||
messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile, {
|
||||
mode: "full",
|
||||
reason: "before_reset hook payload",
|
||||
});
|
||||
messages = await readSessionMessagesAsync(
|
||||
{
|
||||
agentId,
|
||||
sessionFile,
|
||||
sessionId,
|
||||
storePath: params.storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "before_reset hook payload",
|
||||
},
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
|
||||
@@ -27,6 +27,7 @@ import {
|
||||
} from "./session-utils.fs.js";
|
||||
|
||||
export type { ReadRecentSessionMessagesOptions, ReadSessionMessagesAsyncOptions };
|
||||
export { attachOpenClawTranscriptMeta, capArrayByJsonBytes } from "./session-utils.fs.js";
|
||||
|
||||
export type SessionTranscriptReadScope = {
|
||||
agentId?: string;
|
||||
|
||||
@@ -107,6 +107,9 @@ import type {
|
||||
export {
|
||||
archiveFileOnDisk,
|
||||
archiveSessionTranscripts,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
export {
|
||||
attachOpenClawTranscriptMeta,
|
||||
capArrayByJsonBytes,
|
||||
readFirstUserMessageFromTranscript,
|
||||
@@ -124,10 +127,11 @@ export {
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
readSessionMessagesAsync,
|
||||
visitSessionMessagesAsync,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
export type { ReadSessionMessagesAsyncOptions } from "./session-utils.fs.js";
|
||||
export type { SessionTranscriptReadScope } from "./session-transcript-readers.js";
|
||||
} from "./session-transcript-readers.js";
|
||||
export type {
|
||||
ReadSessionMessagesAsyncOptions,
|
||||
SessionTranscriptReadScope,
|
||||
} from "./session-transcript-readers.js";
|
||||
export { canonicalizeSpawnedByForAgent, resolveSessionStoreKey } from "./session-store-key.js";
|
||||
export type {
|
||||
GatewayAgentRow,
|
||||
|
||||
@@ -93,10 +93,14 @@ vi.mock("./session-utils.js", () => ({
|
||||
sessionId: "session-1",
|
||||
sessionFile: "/tmp/session-1.jsonl",
|
||||
}),
|
||||
readSessionMessagesAsync: async () => [],
|
||||
resolveSessionTranscriptCandidates: () => ["/tmp/session-1.jsonl"],
|
||||
}));
|
||||
|
||||
vi.mock("./session-transcript-readers.js", () => ({
|
||||
readRecentSessionMessagesWithStatsAsync: async () => ({ messages: [], totalMessages: 0 }),
|
||||
readSessionMessagesAsync: async () => [],
|
||||
}));
|
||||
|
||||
vi.mock("./session-history-state.js", () => ({
|
||||
buildSessionHistorySnapshot: () => ({
|
||||
history: { items: [], nextCursor: null, messages: [] },
|
||||
|
||||
@@ -32,6 +32,8 @@ import { resolveTranscriptPathForComparison } from "./session-transcript-path.js
|
||||
import {
|
||||
readRecentSessionMessagesWithStatsAsync,
|
||||
readSessionMessagesAsync,
|
||||
} from "./session-transcript-readers.js";
|
||||
import {
|
||||
resolveFreshestSessionEntryFromStoreKeys,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveSessionTranscriptCandidates,
|
||||
@@ -142,9 +144,12 @@ export async function handleSessionHistoryHttpRequest(
|
||||
const boundedSnapshot =
|
||||
cursor === undefined && typeof limit === "number"
|
||||
? await readRecentSessionMessagesWithStatsAsync(
|
||||
entry.sessionId,
|
||||
target.storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
agentId: target.agentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath: target.storePath,
|
||||
},
|
||||
resolveSessionHistoryTailReadOptions(limit),
|
||||
)
|
||||
: undefined;
|
||||
@@ -153,10 +158,18 @@ export async function handleSessionHistoryHttpRequest(
|
||||
const rawSnapshot =
|
||||
boundedSnapshot?.messages ??
|
||||
(entry?.sessionId
|
||||
? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "session history cursor pagination",
|
||||
})
|
||||
? await readSessionMessagesAsync(
|
||||
{
|
||||
agentId: target.agentId,
|
||||
sessionFile: entry.sessionFile,
|
||||
sessionId: entry.sessionId,
|
||||
storePath: target.storePath,
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "session history cursor pagination",
|
||||
},
|
||||
)
|
||||
: []);
|
||||
const historySnapshot = buildSessionHistorySnapshot({
|
||||
rawMessages: rawSnapshot,
|
||||
@@ -192,6 +205,7 @@ export async function handleSessionHistoryHttpRequest(
|
||||
let sentHistory = history;
|
||||
const sseState = SessionHistorySseState.fromRawSnapshot({
|
||||
target: {
|
||||
agentId: target.agentId,
|
||||
sessionId: entry.sessionId,
|
||||
storePath: target.storePath,
|
||||
sessionFile: entry.sessionFile,
|
||||
|
||||
@@ -42,7 +42,7 @@ import {
|
||||
} from "../config/sessions.js";
|
||||
import { hasSessionAutoModelFallbackProvenance } from "../config/sessions/model-override-provenance.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { readRecentSessionUsageFromTranscript } from "../gateway/session-utils.fs.js";
|
||||
import { readRecentSessionUsageFromTranscript } from "../gateway/session-transcript-readers.js";
|
||||
import { formatTimeAgo } from "../infra/format-time/format-relative.ts";
|
||||
import { resolveCommitHash } from "../infra/git-commit.js";
|
||||
import {
|
||||
@@ -317,10 +317,12 @@ const readUsageFromSessionLog = (
|
||||
|
||||
try {
|
||||
const snapshot = readRecentSessionUsageFromTranscript(
|
||||
sessionId,
|
||||
storePath,
|
||||
sessionEntry?.sessionFile,
|
||||
agentId ?? (sessionKey ? resolveAgentIdFromSessionKey(sessionKey) : undefined),
|
||||
{
|
||||
agentId: agentId ?? (sessionKey ? resolveAgentIdFromSessionKey(sessionKey) : undefined),
|
||||
sessionFile: sessionEntry?.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
256 * 1024,
|
||||
);
|
||||
if (!snapshot) {
|
||||
|
||||
@@ -150,7 +150,6 @@ vi.mock("../gateway/session-utils.js", () => ({
|
||||
loadSessionEntry: (sessionKey: string, opts?: { agentId?: string }) =>
|
||||
loadSessionEntryMock(sessionKey, opts),
|
||||
migrateAndPruneGatewaySessionStoreKey: ({ key }: { key: string }) => ({ primaryKey: key }),
|
||||
readSessionMessagesAsync: async () => [],
|
||||
resolveGatewaySessionStoreTarget: ({ key }: { key: string }) => ({
|
||||
canonicalKey: key,
|
||||
storePath: "/tmp/openclaw-sessions.json",
|
||||
@@ -166,8 +165,9 @@ vi.mock("../gateway/session-reset-service.js", () => ({
|
||||
performGatewaySessionReset: () => ({ ok: true, key: "agent:main:main", entry: {} }),
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/session-utils.fs.js", () => ({
|
||||
vi.mock("../gateway/session-transcript-readers.js", () => ({
|
||||
capArrayByJsonBytes: (items: unknown[]) => ({ items }),
|
||||
readSessionMessagesAsync: async () => [],
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/sessions-patch.js", () => ({
|
||||
|
||||
@@ -47,7 +47,10 @@ import {
|
||||
} from "../gateway/server-methods/chat.js";
|
||||
import { loadGatewayModelCatalog } from "../gateway/server-model-catalog.js";
|
||||
import { performGatewaySessionReset } from "../gateway/session-reset-service.js";
|
||||
import { capArrayByJsonBytes } from "../gateway/session-utils.fs.js";
|
||||
import {
|
||||
capArrayByJsonBytes,
|
||||
readSessionMessagesAsync,
|
||||
} from "../gateway/session-transcript-readers.js";
|
||||
import {
|
||||
buildGatewaySessionInfo,
|
||||
getSessionDefaults,
|
||||
@@ -58,7 +61,6 @@ import {
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveSessionModelRef,
|
||||
readSessionMessagesAsync,
|
||||
} from "../gateway/session-utils.js";
|
||||
import { applySessionsPatchToStore } from "../gateway/sessions-patch.js";
|
||||
import { type AgentEventPayload, onAgentEvent } from "../infra/agent-events.js";
|
||||
@@ -423,11 +425,19 @@ export class EmbeddedTuiBackend implements TuiBackend {
|
||||
const maxHistoryBytes = getMaxChatHistoryMessagesBytes();
|
||||
const localMessages =
|
||||
sessionId && storePath
|
||||
? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, {
|
||||
mode: "recent",
|
||||
maxMessages: max,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
})
|
||||
? await readSessionMessagesAsync(
|
||||
{
|
||||
agentId: sessionAgentId,
|
||||
sessionFile: entry?.sessionFile,
|
||||
sessionId,
|
||||
storePath,
|
||||
},
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: max,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
},
|
||||
)
|
||||
: [];
|
||||
const rawMessages = augmentChatHistoryWithCliSessionImports({
|
||||
entry,
|
||||
|
||||
Reference in New Issue
Block a user