mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(sessions): cover terminal transcript markers
This commit is contained in:
committed by
Ayaan Zaidi
parent
0c9ac48d2c
commit
57bed6ae0c
@@ -4,6 +4,7 @@ export {
|
||||
readLatestAssistantTextFromSessionTranscript,
|
||||
resolveAndPersistSessionFile,
|
||||
resolveSessionStoreEntry,
|
||||
updateSessionStoreEntry,
|
||||
} from "openclaw/plugin-sdk/session-store-runtime";
|
||||
export { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
|
||||
export { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
|
||||
|
||||
@@ -70,7 +70,11 @@ const createChannelMessageReplyPipeline = vi.hoisted(() =>
|
||||
);
|
||||
const wasSentByBot = vi.hoisted(() => vi.fn(() => false));
|
||||
const appendSessionTranscriptMessage = vi.hoisted(() =>
|
||||
vi.fn(async (_params: { message?: unknown }) => ({ messageId: "m1" })),
|
||||
vi.fn(async ({ message }: { message?: unknown }) => ({
|
||||
messageId: "m1",
|
||||
message,
|
||||
appended: true,
|
||||
})),
|
||||
);
|
||||
const emitSessionTranscriptUpdate = vi.hoisted(() => vi.fn());
|
||||
const loadSessionStore = vi.hoisted(() => vi.fn());
|
||||
@@ -101,6 +105,7 @@ const resolveSessionStoreEntry = vi.hoisted(() =>
|
||||
existing: store[sessionKey],
|
||||
})),
|
||||
);
|
||||
const updateSessionStoreEntry = vi.hoisted(() => vi.fn(async () => null));
|
||||
|
||||
vi.mock("./draft-stream.js", () => ({
|
||||
createTelegramDraftStream,
|
||||
@@ -155,6 +160,7 @@ vi.mock("./bot-message-dispatch.runtime.js", () => ({
|
||||
resolveMarkdownTableMode,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
updateSessionStoreEntry,
|
||||
}));
|
||||
|
||||
vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({
|
||||
@@ -268,11 +274,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
loadSessionStore.mockReset();
|
||||
resolveStorePath.mockReset();
|
||||
resolveAndPersistSessionFile.mockReset();
|
||||
updateSessionStoreEntry.mockReset();
|
||||
generateTopicLabel.mockReset();
|
||||
getAgentScopedMediaLocalRoots.mockClear();
|
||||
resolveChunkMode.mockClear();
|
||||
resolveMarkdownTableMode.mockClear();
|
||||
resolveSessionStoreEntry.mockClear();
|
||||
updateSessionStoreEntry.mockClear();
|
||||
describeStickerImage.mockReset();
|
||||
loadModelCatalog.mockReset();
|
||||
findModelInCatalog.mockReset();
|
||||
@@ -1445,6 +1453,29 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("advances the session marker after mirroring preview-finalized finals", async () => {
|
||||
setupDraftStreams({ answerMessageId: 2001 });
|
||||
const context = createContext();
|
||||
context.ctxPayload.SessionKey = "agent:default:telegram:direct:123";
|
||||
loadSessionStore.mockReturnValue({
|
||||
"agent:default:telegram:direct:123": { sessionId: "s1" },
|
||||
});
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
|
||||
await dispatchWithContext({ context });
|
||||
|
||||
const markerUpdateCall = expectRecordFields(mockCallArg(updateSessionStoreEntry), {
|
||||
storePath: "/tmp/sessions.json",
|
||||
sessionKey: "agent:default:telegram:direct:123",
|
||||
});
|
||||
const update = markerUpdateCall.update as (entry: { sessionId?: string }) => unknown;
|
||||
expect(update({ sessionId: "s1" })).toEqual({ updatedAt: expect.any(Number) });
|
||||
expect(update({ sessionId: "new-session" })).toBeNull();
|
||||
});
|
||||
|
||||
it("does not mirror non-final tool progress into the session transcript", async () => {
|
||||
const context = createContext();
|
||||
context.ctxPayload.SessionKey = "agent:default:telegram:direct:123";
|
||||
|
||||
@@ -79,6 +79,7 @@ import {
|
||||
resolveMarkdownTableMode,
|
||||
resolveAndPersistSessionFile,
|
||||
resolveSessionStoreEntry,
|
||||
updateSessionStoreEntry,
|
||||
} from "./bot-message-dispatch.runtime.js";
|
||||
import type { TelegramBotOptions } from "./bot.types.js";
|
||||
import { deliverReplies, emitInternalMessageSentHook } from "./bot/delivery.js";
|
||||
@@ -373,11 +374,26 @@ async function mirrorTelegramAssistantReplyToTranscript(params: {
|
||||
stopReason: "stop" as const,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({
|
||||
const {
|
||||
appended,
|
||||
messageId,
|
||||
message: appendedMessage,
|
||||
} = await appendSessionTranscriptMessage({
|
||||
transcriptPath: sessionFile,
|
||||
message,
|
||||
config: params.cfg,
|
||||
});
|
||||
if (appended) {
|
||||
const transcriptMarkerUpdatedAt = Date.now();
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
update: (current) =>
|
||||
current.sessionId === sessionEntry.sessionId
|
||||
? { updatedAt: transcriptMarkerUpdatedAt }
|
||||
: null,
|
||||
});
|
||||
}
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
|
||||
@@ -48,6 +48,7 @@ import {
|
||||
claudeCliSessionTranscriptHasContent,
|
||||
resolveFallbackRetryPrompt,
|
||||
} from "./attempt-execution.helpers.js";
|
||||
import { persistSessionEntry } from "./attempt-execution.shared.js";
|
||||
import { resolveAgentRunContext } from "./run-context.js";
|
||||
import { clearCliSessionInStore } from "./session-store.js";
|
||||
import type { AgentCommandOpts } from "./types.js";
|
||||
|
||||
@@ -70,6 +70,8 @@ function clearRotatedTerminalMainSessionMetadata(
|
||||
endedAt: undefined,
|
||||
runtimeMs: undefined,
|
||||
abortedLastRun: undefined,
|
||||
sessionStartedAt: undefined,
|
||||
lastInteractionAt: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -362,16 +364,18 @@ export function resolveSession(opts: {
|
||||
resetType,
|
||||
resetOverride: channelReset,
|
||||
});
|
||||
const terminalMainTranscriptNewerThanRegistry = sessionEntry
|
||||
? hasTerminalMainSessionTranscriptNewerThanRegistrySync({
|
||||
entry: sessionEntry,
|
||||
sessionScope: sessionCfg?.scope,
|
||||
sessionKey,
|
||||
agentId: sessionAgentId,
|
||||
mainKey: sessionCfg?.mainKey,
|
||||
storePath,
|
||||
})
|
||||
: false;
|
||||
const requestedSessionId = opts.sessionId?.trim() || undefined;
|
||||
const terminalMainTranscriptNewerThanRegistry =
|
||||
sessionEntry && !requestedSessionId
|
||||
? hasTerminalMainSessionTranscriptNewerThanRegistrySync({
|
||||
entry: sessionEntry,
|
||||
sessionScope: sessionCfg?.scope,
|
||||
sessionKey,
|
||||
agentId: sessionAgentId,
|
||||
mainKey: sessionCfg?.mainKey,
|
||||
storePath,
|
||||
})
|
||||
: false;
|
||||
const fresh = sessionEntry
|
||||
? !terminalMainTranscriptNewerThanRegistry &&
|
||||
evaluateSessionFreshness({
|
||||
@@ -386,8 +390,8 @@ export function resolveSession(opts: {
|
||||
}).fresh
|
||||
: false;
|
||||
const sessionId =
|
||||
opts.sessionId?.trim() || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID();
|
||||
const isNewSession = !fresh && !opts.sessionId;
|
||||
requestedSessionId || (fresh ? sessionEntry?.sessionId : undefined) || crypto.randomUUID();
|
||||
const isNewSession = !fresh && !requestedSessionId;
|
||||
const resolvedSessionEntry = terminalMainTranscriptNewerThanRegistry
|
||||
? clearRotatedTerminalMainSessionMetadata(sessionEntry)
|
||||
: sessionEntry;
|
||||
|
||||
@@ -186,6 +186,7 @@ async function writeTerminalTranscriptSessionStore(params: {
|
||||
sessionKey: string;
|
||||
sessionId: string;
|
||||
status?: SessionEntry["status"];
|
||||
omitStatus?: boolean;
|
||||
updatedAt: number;
|
||||
endedAt: number;
|
||||
transcriptMtimeMs: number;
|
||||
@@ -198,6 +199,7 @@ async function writeTerminalTranscriptSessionStore(params: {
|
||||
"utf-8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, params.transcriptMtimeMs / 1000, params.transcriptMtimeMs / 1000);
|
||||
const status = params.status ?? (params.omitStatus ? undefined : "done");
|
||||
await writeSessionStoreFast(params.storePath, {
|
||||
[params.sessionKey]: {
|
||||
sessionId: params.sessionId,
|
||||
@@ -206,7 +208,7 @@ async function writeTerminalTranscriptSessionStore(params: {
|
||||
startedAt: params.endedAt - 10_000,
|
||||
endedAt: params.endedAt,
|
||||
runtimeMs: 9_000,
|
||||
status: params.status ?? "done",
|
||||
...(status ? { status } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -1641,6 +1643,15 @@ describe("initSessionState reset policy", () => {
|
||||
transcriptMtimeOffsetMs: 0,
|
||||
expectNewSession: true,
|
||||
},
|
||||
{
|
||||
name: "main endedAt-only rows rotate when transcript is newer than updatedAt",
|
||||
sessionKey: "agent:main:main",
|
||||
updatedAtOffsetMs: -10_000,
|
||||
endedAtOffsetMs: -11_000,
|
||||
transcriptMtimeOffsetMs: 0,
|
||||
omitStatus: true,
|
||||
expectNewSession: true,
|
||||
},
|
||||
{
|
||||
name: "failed main terminal rows reuse when the transcript exists",
|
||||
sessionKey: "agent:main:main",
|
||||
@@ -1687,6 +1698,7 @@ describe("initSessionState reset policy", () => {
|
||||
sessionKey: scenario.sessionKey,
|
||||
sessionId: existingSessionId,
|
||||
status: scenario.status,
|
||||
omitStatus: scenario.omitStatus,
|
||||
updatedAt: terminalUpdatedAt,
|
||||
endedAt: terminalEndedAt,
|
||||
transcriptMtimeMs: now + scenario.transcriptMtimeOffsetMs,
|
||||
|
||||
@@ -151,9 +151,20 @@ describe("agent session resolution", () => {
|
||||
|
||||
it("rotates stale terminal main sessions whose transcript is newer than the registry", async () => {
|
||||
const scenarios = [
|
||||
{ label: "canonical main", mainKey: "main", sessionKey: "agent:main:main" },
|
||||
{ label: "raw main alias", mainKey: "main", sessionKey: "main" },
|
||||
{ label: "custom main alias", mainKey: "work", sessionKey: "agent:main:main" },
|
||||
{
|
||||
label: "canonical main",
|
||||
mainKey: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
status: "done" as const,
|
||||
},
|
||||
{ label: "raw main alias", mainKey: "main", sessionKey: "main", status: "done" as const },
|
||||
{
|
||||
label: "custom main alias",
|
||||
mainKey: "work",
|
||||
sessionKey: "agent:main:main",
|
||||
status: "done" as const,
|
||||
},
|
||||
{ label: "endedAt-only main", mainKey: "main", sessionKey: "agent:main:main" },
|
||||
];
|
||||
for (const scenario of scenarios) {
|
||||
await withTempHome(async (home) => {
|
||||
@@ -173,7 +184,9 @@ describe("agent session resolution", () => {
|
||||
sessionId,
|
||||
sessionFile,
|
||||
updatedAt: registryUpdatedAt,
|
||||
status: "done",
|
||||
...(scenario.status ? { status: scenario.status } : {}),
|
||||
sessionStartedAt: registryUpdatedAt - 60_000,
|
||||
lastInteractionAt: registryUpdatedAt - 30_000,
|
||||
startedAt: registryUpdatedAt - 1_000,
|
||||
endedAt: registryUpdatedAt - 100,
|
||||
},
|
||||
@@ -190,6 +203,8 @@ describe("agent session resolution", () => {
|
||||
expect(resolution.sessionEntry?.startedAt).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.endedAt).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.runtimeMs).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.sessionStartedAt).toBeUndefined();
|
||||
expect(resolution.sessionEntry?.lastInteractionAt).toBeUndefined();
|
||||
|
||||
const sessionStore = {
|
||||
[scenario.sessionKey]: resolution.sessionEntry!,
|
||||
@@ -230,10 +245,73 @@ describe("agent session resolution", () => {
|
||||
expect(persisted?.startedAt).toBeUndefined();
|
||||
expect(persisted?.endedAt).toBeUndefined();
|
||||
expect(persisted?.runtimeMs).toBeUndefined();
|
||||
expect(persisted?.sessionStartedAt).toBeGreaterThan(registryUpdatedAt);
|
||||
expect(persisted?.lastInteractionAt).toBeGreaterThan(registryUpdatedAt);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it("preserves explicit session-id resumes for stale terminal main rows", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const store = path.join(home, "sessions.json");
|
||||
const sessionFile = path.join(home, "explicit-terminal-main.jsonl");
|
||||
const sessionId = "explicit-terminal-main";
|
||||
const registryUpdatedAt = Date.now() - 10_000;
|
||||
fs.writeFileSync(sessionFile, JSON.stringify({ type: "session", id: sessionId }) + "\n");
|
||||
fs.utimesSync(
|
||||
sessionFile,
|
||||
(registryUpdatedAt + 5_000) / 1000,
|
||||
(registryUpdatedAt + 5_000) / 1000,
|
||||
);
|
||||
writeSessionStoreSeed(store, {
|
||||
"agent:main:main": {
|
||||
sessionId,
|
||||
sessionFile,
|
||||
updatedAt: registryUpdatedAt,
|
||||
status: "done",
|
||||
startedAt: registryUpdatedAt - 1_000,
|
||||
endedAt: registryUpdatedAt - 100,
|
||||
runtimeMs: 900,
|
||||
},
|
||||
});
|
||||
const cfg = mockConfig(home, store);
|
||||
|
||||
const resolution = resolveSession({ cfg, sessionId });
|
||||
|
||||
expect(resolution.sessionKey).toBe("agent:main:main");
|
||||
expect(resolution.sessionId).toBe(sessionId);
|
||||
expect(resolution.isNewSession).toBe(false);
|
||||
expect(resolution.sessionEntry?.sessionFile).toBe(sessionFile);
|
||||
expect(resolution.sessionEntry?.status).toBe("done");
|
||||
expect(resolution.sessionEntry?.startedAt).toBe(registryUpdatedAt - 1_000);
|
||||
expect(resolution.sessionEntry?.endedAt).toBe(registryUpdatedAt - 100);
|
||||
expect(resolution.sessionEntry?.runtimeMs).toBe(900);
|
||||
|
||||
if (!resolution.sessionKey || !resolution.sessionStore) {
|
||||
throw new Error("expected resolved explicit session store");
|
||||
}
|
||||
const resolvedTranscript = await resolveSessionTranscriptFile({
|
||||
sessionId: resolution.sessionId,
|
||||
sessionKey: resolution.sessionKey,
|
||||
sessionEntry: resolution.sessionEntry,
|
||||
sessionStore: resolution.sessionStore,
|
||||
storePath: resolution.storePath,
|
||||
agentId: "main",
|
||||
});
|
||||
expect(resolvedTranscript.sessionFile).toBe(sessionFile);
|
||||
|
||||
const persisted = loadSessionStore(resolution.storePath, { skipCache: true })[
|
||||
resolution.sessionKey
|
||||
];
|
||||
expect(persisted?.sessionId).toBe(sessionId);
|
||||
expect(persisted?.sessionFile).toBe(sessionFile);
|
||||
expect(persisted?.status).toBe("done");
|
||||
expect(persisted?.startedAt).toBe(registryUpdatedAt - 1_000);
|
||||
expect(persisted?.endedAt).toBe(registryUpdatedAt - 100);
|
||||
expect(persisted?.runtimeMs).toBe(900);
|
||||
});
|
||||
});
|
||||
|
||||
it("forwards resolved outbound session context when resuming by sessionId", async () => {
|
||||
await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => {
|
||||
const resolution = resolveSession({ cfg, sessionId });
|
||||
|
||||
@@ -159,7 +159,10 @@ export function resolveTerminalMainSessionTranscriptRegistryCheck(
|
||||
if (candidateSessionKey !== configuredMainSessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
if (!isTerminalSessionStatus(params.entry.status)) {
|
||||
const hasTerminalLifecycle =
|
||||
isTerminalSessionStatus(params.entry.status) ||
|
||||
resolvePositiveTimestamp(params.entry.endedAt) !== undefined;
|
||||
if (!hasTerminalLifecycle) {
|
||||
return undefined;
|
||||
}
|
||||
if (params.entry.status === "failed") {
|
||||
|
||||
@@ -138,6 +138,91 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("advances the session registry marker after managed transcript appends", async () => {
|
||||
const updatedAt = Date.parse("2026-05-18T09:00:00.000Z");
|
||||
const appendedAt = Date.parse("2026-05-18T09:05:00.000Z");
|
||||
const sessionFile = "managed-marker.jsonl";
|
||||
fs.writeFileSync(
|
||||
fixture.storePath(),
|
||||
JSON.stringify({
|
||||
[sessionKey]: {
|
||||
sessionId,
|
||||
sessionFile,
|
||||
updatedAt,
|
||||
status: "done",
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
vi.setSystemTime(appendedAt);
|
||||
try {
|
||||
const result = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
text: "Hello with registry marker",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
const store = JSON.parse(fs.readFileSync(fixture.storePath(), "utf-8")) as Record<
|
||||
string,
|
||||
{ updatedAt?: number; status?: string }
|
||||
>;
|
||||
expect(store[sessionKey]?.updatedAt).toBe(appendedAt);
|
||||
expect(store[sessionKey]?.status).toBe("done");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not advance the registry marker for duplicate delivery mirror replays", async () => {
|
||||
const updatedAt = Date.parse("2026-05-18T10:00:00.000Z");
|
||||
const firstAppendAt = Date.parse("2026-05-18T10:05:00.000Z");
|
||||
const duplicateReplayAt = Date.parse("2026-05-18T10:10:00.000Z");
|
||||
const sessionFile = "duplicate-marker.jsonl";
|
||||
fs.writeFileSync(
|
||||
fixture.storePath(),
|
||||
JSON.stringify({
|
||||
[sessionKey]: {
|
||||
sessionId,
|
||||
sessionFile,
|
||||
updatedAt,
|
||||
status: "done",
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
try {
|
||||
vi.setSystemTime(firstAppendAt);
|
||||
const first = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
text: "Replay-safe marker",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
expect(first.ok).toBe(true);
|
||||
|
||||
vi.setSystemTime(duplicateReplayAt);
|
||||
const duplicate = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
text: "Replay-safe marker",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
expect(duplicate.ok).toBe(true);
|
||||
|
||||
const store = JSON.parse(fs.readFileSync(fixture.storePath(), "utf-8")) as Record<
|
||||
string,
|
||||
{ updatedAt?: number }
|
||||
>;
|
||||
expect(store[sessionKey]?.updatedAt).toBe(firstAppendAt);
|
||||
if (first.ok && duplicate.ok) {
|
||||
expect(duplicate.messageId).toBe(first.messageId);
|
||||
}
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("uses spawned cwd when creating a missing transcript header", async () => {
|
||||
const taskCwd = path.join(fixture.sessionsDir(), "task-repo");
|
||||
fs.mkdirSync(taskCwd, { recursive: true });
|
||||
|
||||
@@ -15,7 +15,7 @@ import {
|
||||
resolveSessionTranscriptPath,
|
||||
} from "./paths.js";
|
||||
import { resolveAndPersistSessionFile } from "./session-file.js";
|
||||
import { loadSessionStore, resolveSessionStoreEntry } from "./store.js";
|
||||
import { loadSessionStore, resolveSessionStoreEntry, updateSessionStoreEntry } from "./store.js";
|
||||
import { parseSessionThreadInfo } from "./thread-info.js";
|
||||
import { appendSessionTranscriptMessage } from "./transcript-append.js";
|
||||
import { createSessionTranscriptHeader } from "./transcript-header.js";
|
||||
@@ -299,7 +299,8 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
};
|
||||
}
|
||||
|
||||
return await runWithOwnedSessionTranscriptWriteLock(
|
||||
let transcriptMarkerUpdatedAt: number | undefined;
|
||||
const result = await runWithOwnedSessionTranscriptWriteLock(
|
||||
{ sessionFile, sessionKey: resolved.normalizedKey },
|
||||
async () => {
|
||||
const explicitIdempotencyKey =
|
||||
@@ -340,6 +341,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
if (!appended) {
|
||||
return { ok: true, sessionFile, messageId };
|
||||
}
|
||||
transcriptMarkerUpdatedAt = Date.now();
|
||||
|
||||
switch (params.updateMode ?? "inline") {
|
||||
case "inline":
|
||||
@@ -364,6 +366,15 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
return { ok: true, sessionFile, messageId };
|
||||
},
|
||||
);
|
||||
if (result.ok && transcriptMarkerUpdatedAt !== undefined) {
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: resolved.normalizedKey,
|
||||
update: (current) =>
|
||||
current.sessionId === entry.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null,
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function isRedundantDeliveryMirror(message: SessionTranscriptAssistantMessage): boolean {
|
||||
|
||||
@@ -723,14 +723,72 @@ describe("gateway agent handler", () => {
|
||||
expect(capturedEntry?.sessionFile).toBeUndefined();
|
||||
});
|
||||
|
||||
it("rotates a terminal main session when its transcript is newer than the registry row", async () => {
|
||||
const now = Date.parse("2026-05-18T09:47:00.000Z");
|
||||
it.each([
|
||||
{ name: "status terminal row", status: "done" as const },
|
||||
{ name: "endedAt-only terminal row" },
|
||||
])(
|
||||
"rotates a terminal main session from a $name when its transcript is newer",
|
||||
async (scenario) => {
|
||||
const now = Date.parse("2026-05-18T09:47:00.000Z");
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
dateOnlyFakeClockActive = true;
|
||||
vi.setSystemTime(now);
|
||||
|
||||
await withTempDir(
|
||||
{ prefix: "openclaw-gateway-terminal-main-newer-transcript-" },
|
||||
async (root) => {
|
||||
const sessionsDir = `${root}/sessions`;
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const sessionFile = "terminal-main-session.jsonl";
|
||||
const transcriptPath = `${sessionsDir}/${sessionFile}`;
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({ type: "session", id: "terminal-main-session" })}\n`,
|
||||
"utf8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000));
|
||||
mocks.loadSessionEntry.mockReturnValue({
|
||||
cfg: {},
|
||||
storePath: `${sessionsDir}/sessions.json`,
|
||||
entry: {
|
||||
sessionId: "terminal-main-session",
|
||||
sessionFile,
|
||||
...(scenario.status ? { status: scenario.status } : {}),
|
||||
updatedAt: now - 10_000,
|
||||
sessionStartedAt: now - 60_000,
|
||||
lastInteractionAt: now - 10_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 15_000,
|
||||
runtimeMs: 5_000,
|
||||
},
|
||||
canonicalKey: "agent:main:main",
|
||||
});
|
||||
|
||||
const capturedEntry = await runMainAgentAndCaptureEntry(
|
||||
"test-idem-terminal-main-newer-transcript",
|
||||
);
|
||||
|
||||
const call = await waitForAgentCommandCall<{ sessionId?: string }>();
|
||||
expect(call.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.status).toBeUndefined();
|
||||
expect(capturedEntry?.startedAt).toBeUndefined();
|
||||
expect(capturedEntry?.endedAt).toBeUndefined();
|
||||
expect(capturedEntry?.runtimeMs).toBeUndefined();
|
||||
expect(capturedEntry?.sessionFile).toBeUndefined();
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it("honors explicit gateway session-id resumes for terminal main rows", async () => {
|
||||
const now = Date.parse("2026-05-18T09:48:00.000Z");
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
dateOnlyFakeClockActive = true;
|
||||
vi.setSystemTime(now);
|
||||
|
||||
await withTempDir(
|
||||
{ prefix: "openclaw-gateway-terminal-main-newer-transcript-" },
|
||||
{ prefix: "openclaw-gateway-terminal-main-explicit-resume-" },
|
||||
async (root) => {
|
||||
const sessionsDir = `${root}/sessions`;
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
@@ -742,35 +800,53 @@ describe("gateway agent handler", () => {
|
||||
"utf8",
|
||||
);
|
||||
await fs.utimes(transcriptPath, new Date(now - 1_000), new Date(now - 1_000));
|
||||
const existingEntry = {
|
||||
sessionId: "terminal-main-session",
|
||||
sessionFile,
|
||||
status: "done",
|
||||
updatedAt: now - 10_000,
|
||||
sessionStartedAt: now - 60_000,
|
||||
lastInteractionAt: now - 10_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 15_000,
|
||||
runtimeMs: 5_000,
|
||||
};
|
||||
mocks.loadSessionEntry.mockReturnValue({
|
||||
cfg: {},
|
||||
storePath: `${sessionsDir}/sessions.json`,
|
||||
entry: {
|
||||
sessionId: "terminal-main-session",
|
||||
sessionFile,
|
||||
status: "done",
|
||||
updatedAt: now - 10_000,
|
||||
sessionStartedAt: now - 60_000,
|
||||
lastInteractionAt: now - 10_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 15_000,
|
||||
runtimeMs: 5_000,
|
||||
},
|
||||
entry: existingEntry,
|
||||
canonicalKey: "agent:main:main",
|
||||
});
|
||||
let capturedEntry: Record<string, unknown> | undefined;
|
||||
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
|
||||
const store: Record<string, unknown> = {
|
||||
"agent:main:main": { ...existingEntry },
|
||||
};
|
||||
const result = await updater(store);
|
||||
capturedEntry = result as Record<string, unknown>;
|
||||
return result;
|
||||
});
|
||||
mocks.agentCommand.mockResolvedValue({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: { durationMs: 100 },
|
||||
});
|
||||
|
||||
const capturedEntry = await runMainAgentAndCaptureEntry(
|
||||
"test-idem-terminal-main-newer-transcript",
|
||||
);
|
||||
await invokeAgent({
|
||||
message: "resume terminal main",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
sessionId: "terminal-main-session",
|
||||
idempotencyKey: "test-idem-terminal-main-explicit-resume",
|
||||
} as AgentParams);
|
||||
|
||||
const call = await waitForAgentCommandCall<{ sessionId?: string }>();
|
||||
expect(call.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionId).not.toBe("terminal-main-session");
|
||||
expect(capturedEntry?.status).toBeUndefined();
|
||||
expect(capturedEntry?.startedAt).toBeUndefined();
|
||||
expect(capturedEntry?.endedAt).toBeUndefined();
|
||||
expect(capturedEntry?.runtimeMs).toBeUndefined();
|
||||
expect(capturedEntry?.sessionFile).toBeUndefined();
|
||||
expect(call.sessionId).toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionId).toBe("terminal-main-session");
|
||||
expect(capturedEntry?.sessionFile).toBe(sessionFile);
|
||||
expect(capturedEntry?.status).toBe("done");
|
||||
expect(capturedEntry?.startedAt).toBe(now - 20_000);
|
||||
expect(capturedEntry?.endedAt).toBe(now - 15_000);
|
||||
expect(capturedEntry?.runtimeMs).toBe(5_000);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -1665,16 +1665,20 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
const isSystemGatewayRun =
|
||||
request.bootstrapContextRunKind === "cron" ||
|
||||
request.bootstrapContextRunKind === "heartbeat";
|
||||
const terminalMainTranscriptCheck = isSystemGatewayRun
|
||||
? undefined
|
||||
: resolveTerminalMainSessionTranscriptRegistryCheck({
|
||||
entry,
|
||||
sessionScope: cfgLocal.session?.scope,
|
||||
sessionKey: canonicalKey,
|
||||
agentId: canonicalSessionAgentId,
|
||||
mainKey: cfgLocal.session?.mainKey,
|
||||
storePath,
|
||||
});
|
||||
const requestedSessionMatchesEntry = Boolean(
|
||||
requestedSessionId && entry?.sessionId?.trim() === requestedSessionId,
|
||||
);
|
||||
const terminalMainTranscriptCheck =
|
||||
isSystemGatewayRun || requestedSessionMatchesEntry
|
||||
? undefined
|
||||
: resolveTerminalMainSessionTranscriptRegistryCheck({
|
||||
entry,
|
||||
sessionScope: cfgLocal.session?.scope,
|
||||
sessionKey: canonicalKey,
|
||||
agentId: canonicalSessionAgentId,
|
||||
mainKey: cfgLocal.session?.mainKey,
|
||||
storePath,
|
||||
});
|
||||
const terminalMainTranscriptNewerThanRegistry = terminalMainTranscriptCheck
|
||||
? await hasTerminalMainSessionTranscriptNewerThanRegistry({
|
||||
entry,
|
||||
|
||||
@@ -2328,6 +2328,54 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
|
||||
expect(nodeSend?.[2].sessionKey).toBe("agent:main:canon");
|
||||
});
|
||||
|
||||
it("chat.inject advances the session registry marker after transcript append", async () => {
|
||||
const fixtureDir = createTranscriptFixture("openclaw-chat-inject-registry-marker-");
|
||||
const updatedAt = Date.parse("2026-05-18T11:00:00.000Z");
|
||||
const appendedAt = Date.parse("2026-05-18T11:05:00.000Z");
|
||||
const storePath = path.join(path.dirname(mockState.transcriptPath), "sessions.json");
|
||||
fs.writeFileSync(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
main: {
|
||||
sessionId: mockState.sessionId,
|
||||
sessionFile: mockState.transcriptPath,
|
||||
updatedAt,
|
||||
status: "done",
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
const respond = vi.fn();
|
||||
const context = createChatContext();
|
||||
vi.useFakeTimers({ toFake: ["Date"] });
|
||||
vi.setSystemTime(appendedAt);
|
||||
try {
|
||||
await chatHandlers["chat.inject"]({
|
||||
params: {
|
||||
sessionKey: "main",
|
||||
message: "hello with registry marker",
|
||||
},
|
||||
respond,
|
||||
req: {} as never,
|
||||
client: null as never,
|
||||
isWebchatConnect: () => false,
|
||||
context: context as GatewayRequestContext,
|
||||
});
|
||||
|
||||
const response = lastRespondCall(respond);
|
||||
expect(response?.[0]).toBe(true);
|
||||
const store = JSON.parse(fs.readFileSync(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
{ updatedAt?: number; status?: string }
|
||||
>;
|
||||
expect(store.main?.updatedAt).toBe(appendedAt);
|
||||
expect(store.main?.status).toBe("done");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
fs.rmSync(fixtureDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("chat.inject scopes selected-agent global sessions before appending", async () => {
|
||||
createTranscriptFixture("openclaw-chat-inject-selected-global-");
|
||||
mockState.config = {
|
||||
|
||||
@@ -48,7 +48,7 @@ import { getReplyPayloadMetadata, type ReplyPayload } from "../../auto-reply/rep
|
||||
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import { stageSandboxMedia } from "../../auto-reply/reply/stage-sandbox-media.js";
|
||||
import type { MsgContext, TemplateContext } from "../../auto-reply/templating.js";
|
||||
import { resolveSessionFilePath } from "../../config/sessions.js";
|
||||
import { resolveSessionFilePath, updateSessionStoreEntry } from "../../config/sessions.js";
|
||||
import { resolveMirroredTranscriptText } from "../../config/sessions/transcript-mirror.js";
|
||||
import { CURRENT_SESSION_VERSION } from "../../config/sessions/version.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
@@ -1631,7 +1631,7 @@ async function appendAssistantTranscriptMessage(params: {
|
||||
}
|
||||
}
|
||||
|
||||
return await appendInjectedAssistantMessageToTranscript({
|
||||
const appended = await appendInjectedAssistantMessageToTranscript({
|
||||
transcriptPath,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
@@ -1643,6 +1643,16 @@ async function appendAssistantTranscriptMessage(params: {
|
||||
ttsSupplement: params.ttsSupplement,
|
||||
config: params.cfg,
|
||||
});
|
||||
if (appended.ok && params.storePath) {
|
||||
const transcriptMarkerUpdatedAt = Date.now();
|
||||
await updateSessionStoreEntry({
|
||||
storePath: params.storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
update: (current) =>
|
||||
current.sessionId === params.sessionId ? { updatedAt: transcriptMarkerUpdatedAt } : null,
|
||||
});
|
||||
}
|
||||
return appended;
|
||||
}
|
||||
|
||||
function collectSessionAbortPartials(params: {
|
||||
|
||||
Reference in New Issue
Block a user