clawdbot-d02.1.9.1.26: add transcript update identity contract

This commit is contained in:
Josh Lehman
2026-06-03 06:48:48 -07:00
parent c14d91e439
commit 71137cd726
21 changed files with 372 additions and 28 deletions

View File

@@ -1,2 +1,2 @@
944ca9fb6d46b8a3fa5582fc276478adfecdb3125d8854523492a7ac155ee318 plugin-sdk-api-baseline.json
4b79e9cdc7feadb8bcaa89c31160e445141894556ec03652232c3e6a1948ce50 plugin-sdk-api-baseline.jsonl
d522f8860146243ff1e7fd0e4b7b89bce6be0c78ab06c564d25c204bdb93287b plugin-sdk-api-baseline.json
62d3c6a2f7bdc01c196a970cc269bb83afac34db27be3d8951edb1bbbbff8eaf plugin-sdk-api-baseline.jsonl

View File

@@ -358,6 +358,7 @@ export async function mirrorCodexAppServerTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
message: update.message,
messageId: update.messageId,

View File

@@ -589,6 +589,7 @@ export async function runCopilotAttempt(
await dualWriteCopilotTranscriptBestEffort({
sessionFile: sessionFileForMirror,
sessionKey: readString((input as { sessionKey?: unknown }).sessionKey),
sessionId: readString(input.sessionId),
agentId: readString(input.agentId),
messages: taggedMessages,
idempotencyScope: sessionIdForScope ? `copilot:${sessionIdForScope}` : undefined,

View File

@@ -96,6 +96,7 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
export interface MirrorCopilotTranscriptParams {
sessionFile: string;
sessionKey?: string;
sessionId?: string;
agentId?: string;
messages: AgentMessage[];
/**
@@ -168,7 +169,12 @@ export async function mirrorCopilotTranscript(
}
if (params.sessionKey) {
emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey });
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
});
} else {
emitSessionTranscriptUpdate(params.sessionFile);
}

View File

@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import { emitSessionTranscriptUpdate } from "openclaw/plugin-sdk/agent-harness-runtime";
import {
resolveSessionTranscriptsDirForAgent,
type OpenClawConfig,
@@ -110,10 +111,23 @@ class SessionStartupCatchupHarness extends MemoryManagerSyncOps {
return Array.from(this.sessionsDirtyFiles);
}
getPendingSessionTargets(): MemorySyncParams["sessions"] {
return Array.from(this.sessionPendingTargets.values());
}
isSessionsDirty(): boolean {
return this.sessionsDirty;
}
startTranscriptListener(): void {
this.ensureSessionListener();
}
stopTranscriptListener(): void {
this.sessionUnsubscribe?.();
this.sessionUnsubscribe = null;
}
protected computeProviderKey(): string {
return "test";
}
@@ -155,6 +169,8 @@ describe("session startup catch-up", () => {
});
afterEach(async () => {
vi.clearAllTimers();
vi.useRealTimers();
vi.unstubAllEnvs();
await fs.rm(stateDir, { recursive: true, force: true });
});
@@ -249,4 +265,61 @@ describe("session startup catch-up", () => {
expect(harness.indexedPaths).toEqual([]);
});
it("queues transcript update identity without requiring a session file", async () => {
vi.useFakeTimers();
const harness = new SessionStartupCatchupHarness([]);
harness.startTranscriptListener();
emitSessionTranscriptUpdate({
target: {
agentId: "main",
sessionId: "thread",
sessionKey: "agent:main:thread",
targetKind: "runtime-session",
},
});
expect(harness.getPendingSessionTargets()).toEqual([
{ agentId: "main", sessionId: "thread", sessionKey: "agent:main:thread" },
]);
harness.stopTranscriptListener();
});
it("keeps canonical path transcript update compatibility", async () => {
vi.useFakeTimers();
const session = await writeSessionFile("thread.jsonl");
const harness = new SessionStartupCatchupHarness([]);
harness.startTranscriptListener();
emitSessionTranscriptUpdate({
sessionFile: session.filePath,
sessionKey: "agent:main:thread",
});
expect(harness.getPendingSessionTargets()).toEqual([
{ agentId: "main", sessionId: "thread", sessionKey: "agent:main:thread" },
]);
harness.stopTranscriptListener();
});
it("uses active-session-file update paths before target identity", async () => {
vi.useFakeTimers();
const session = await writeSessionFile("thread.jsonl");
const harness = new SessionStartupCatchupHarness([]);
harness.startTranscriptListener();
emitSessionTranscriptUpdate({
sessionFile: session.filePath,
target: {
agentId: "main",
sessionId: "wrong-target",
sessionKey: "agent:main:wrong-target",
targetKind: "active-session-file",
},
});
expect(harness.getPendingSessionTargets()).toEqual([{ agentId: "main", sessionId: "thread" }]);
harness.stopTranscriptListener();
});
});

View File

@@ -780,15 +780,15 @@ export abstract class MemoryManagerSyncOps {
if (this.closed) {
return;
}
const sessionFile = update.sessionFile;
if (!this.isSessionFileForAgent(sessionFile)) {
return;
}
const target = this.resolveSessionTranscriptUpdateSyncTarget(update);
if (target) {
this.scheduleSessionDirty(target);
return;
}
const sessionFile = update.sessionFile;
if (!sessionFile || !this.isSessionFileForAgent(sessionFile)) {
return;
}
this.scheduleSessionDirty(sessionFile);
});
}
@@ -1053,18 +1053,44 @@ export abstract class MemoryManagerSyncOps {
private resolveSessionTranscriptUpdateSyncTarget(update: {
agentId?: string;
sessionFile: string;
sessionFile?: string;
sessionKey?: string;
target?: {
agentId: string;
sessionId: string;
sessionKey: string;
targetKind?: string;
};
}): MemorySessionSyncTarget | null {
if (update.sessionFile && isSessionArchiveArtifactName(path.basename(update.sessionFile))) {
return null;
}
if (update.target && update.target.targetKind !== "active-session-file") {
const agentId = update.target.agentId.trim();
const sessionId = update.target.sessionId.trim();
const sessionKey = update.target.sessionKey.trim();
if (!agentId || !sessionId || normalizeAgentId(agentId) !== normalizeAgentId(this.agentId)) {
return null;
}
return {
agentId,
sessionId,
...(sessionKey ? { sessionKey } : {}),
};
}
if (!update.sessionFile) {
return null;
}
const parsed = parseCanonicalSessionSyncTargetFromPath(update.sessionFile);
if (!parsed || isSessionArchiveArtifactName(path.basename(update.sessionFile))) {
if (!parsed) {
return null;
}
const agentId = update.agentId?.trim() || parsed.agentId;
if (!agentId || normalizeAgentId(agentId) !== normalizeAgentId(this.agentId)) {
return null;
}
const sessionKey = update.sessionKey?.trim();
const sessionKey =
update.target?.targetKind === "active-session-file" ? undefined : update.sessionKey?.trim();
return {
agentId,
sessionId: parsed.sessionId,

View File

@@ -383,6 +383,7 @@ async function mirrorTelegramAssistantReplyToTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile,
sessionKey: params.sessionKey,
sessionId: sessionEntry.sessionId,
agentId: params.route.agentId,
message: appendedMessage,
messageId,

View File

@@ -299,6 +299,7 @@ async function persistTextTurnTranscript(
emitSessionTranscriptUpdate({
sessionFile,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
agentId: params.sessionAgentId,
});
return sessionEntry;

View File

@@ -711,6 +711,7 @@ function truncateOversizedToolResultsInExistingSessionManager(params: {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
});
}
@@ -782,6 +783,7 @@ async function truncateOversizedToolResultsInTranscriptState(params: {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
});
}
@@ -889,6 +891,7 @@ export async function truncateOversizedToolResultsInSession(params: {
sessionFile,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.agentId,
});
} catch (err) {
const errMsg = formatErrorMessage(err);

View File

@@ -406,6 +406,7 @@ export async function rewriteTranscriptEntriesInRuntimeTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile: target.sessionFile,
sessionKey: target.sessionKey,
sessionId: target.sessionId,
agentId: target.agentId,
});
log.info(
@@ -465,6 +466,7 @@ export async function rewriteTranscriptEntriesInSessionFile(params: {
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
});
log.info(

View File

@@ -296,8 +296,15 @@ describe("session accessor file-backed seam", () => {
agentId: "main",
message: appended.message,
messageId: appended.messageId,
sessionId: scope.sessionId,
sessionFile: transcriptPath,
sessionKey: scope.sessionKey,
target: {
agentId: "main",
sessionId: scope.sessionId,
sessionKey: scope.sessionKey,
targetKind: "active-session-file",
},
},
]);
});

View File

@@ -2,7 +2,10 @@ import { randomUUID } from "node:crypto";
import path from "node:path";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import type {
SessionTranscriptUpdate,
SessionTranscriptUpdateTarget,
} from "../../sessions/transcript-events.js";
import { getRuntimeConfig } from "../io.js";
import type { OpenClawConfig } from "../types.openclaw.js";
import { resolveSessionTranscriptPathInDir, resolveStorePath } from "./paths.js";
@@ -263,6 +266,7 @@ export async function publishTranscriptUpdate(
emitSessionTranscriptUpdate({
...update,
sessionFile: transcript.sessionFile,
...(transcript.target ? { target: transcript.target } : {}),
});
}
@@ -373,17 +377,37 @@ function resolveAccessStorePath(scope: SessionAccessScope): string {
});
}
async function resolveTranscriptAccess(scope: SessionTranscriptWriteScope): Promise<{
type ResolvedTranscriptAccess = {
sessionFile: string;
}> {
target?: SessionTranscriptUpdateTarget;
};
function projectTranscriptUpdateTarget(
target: SessionTranscriptRuntimeTarget,
): SessionTranscriptUpdateTarget {
return {
agentId: target.agentId,
sessionId: target.sessionId,
sessionKey: target.sessionKey,
targetKind: target.targetKind,
};
}
async function resolveTranscriptAccess(
scope: SessionTranscriptWriteScope,
): Promise<ResolvedTranscriptAccess> {
if (!scope.sessionId) {
if (scope.sessionFile?.trim()) {
return { sessionFile: scope.sessionFile };
}
throw new Error(`Cannot resolve transcript scope without a session id: ${scope.sessionKey}`);
}
return await resolveSessionTranscriptTarget({
const target = await resolveSessionTranscriptTarget({
...scope,
sessionId: scope.sessionId,
});
return {
sessionFile: target.sessionFile,
target: projectTranscriptUpdateTarget(target),
};
}

View File

@@ -312,6 +312,7 @@ describe("appendAssistantMessageToSessionTranscript", () => {
| undefined;
expect(event?.sessionFile).toBe(sessionFile);
expect(event?.sessionKey).toBe(sessionKey);
expect(event?.sessionId).toBe(sessionId);
expect(event?.messageId).toBeTypeOf("string");
expect(message?.role).toBe("assistant");
expect(message?.provider).toBe("openclaw");
@@ -826,6 +827,7 @@ describe("appendAssistantMessageToSessionTranscript", () => {
expect(emitSpy).toHaveBeenCalledWith({
sessionFile: result.sessionFile,
sessionKey,
sessionId,
});
}
emitSpy.mockRestore();

View File

@@ -342,6 +342,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile,
sessionKey,
sessionId: entry.sessionId,
...(params.agentId ? { agentId: params.agentId } : {}),
message: appendedMessage,
messageId,
@@ -351,6 +352,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile,
sessionKey,
sessionId: entry.sessionId,
...(params.agentId ? { agentId: params.agentId } : {}),
});
break;

View File

@@ -132,17 +132,20 @@ async function handleTranscriptUpdateBroadcast(
},
update: SessionTranscriptUpdate,
): Promise<void> {
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
const sessionKey =
update.target?.sessionKey ??
update.sessionKey ??
(update.sessionFile ? resolveSessionKeyForTranscriptFile(update.sessionFile) : undefined);
if (!sessionKey || update.message === undefined) {
return;
}
const effectiveAgentId = update.agentId;
const effectiveAgentId = update.target?.agentId ?? update.agentId;
const defaultGlobalAgentId =
sessionKey === "global"
? normalizeAgentId(resolveDefaultAgentId(getRuntimeConfig()))
: undefined;
const visibleAgentId =
update.agentId ??
effectiveAgentId ??
(effectiveAgentId && effectiveAgentId !== defaultGlobalAgentId ? effectiveAgentId : undefined);
const connIds = new Set<string>();
for (const connId of params.sessionEventSubscribers.getAll()) {

View File

@@ -517,6 +517,45 @@ describe("session.message websocket events", () => {
});
});
test("broadcasts identity-only transcript updates to live session listeners", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
storePath,
});
await withOperatorSessionSubscriber(async (ws) => {
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main");
emitSessionTranscriptUpdate({
target: {
agentId: "main",
sessionId: "sess-main",
sessionKey: "agent:main:main",
targetKind: "runtime-session",
},
message: {
role: "assistant",
content: [{ type: "text", text: "identity frame" }],
timestamp: Date.now(),
},
messageId: "msg-identity-frame",
messageSeq: 1,
});
const messageEvent = await messageEventPromise;
expectRecordFields(messageEvent.payload, {
sessionKey: "agent:main:main",
messageId: "msg-identity-frame",
messageSeq: 1,
});
});
});
test("includes live usage metadata on session.message transcript events", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({

View File

@@ -599,6 +599,35 @@ describe("session history HTTP endpoints", () => {
});
});
test("streams identity-only transcript updates over SSE", async () => {
await seedSession({ text: "first message" });
await withGatewayHarness(async (harness) => {
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
await expectHistoryEventTexts(stream, ["first message"]);
emitSessionTranscriptUpdate({
target: {
agentId: "main",
sessionId: "sess-main",
sessionKey: "agent:main:main",
targetKind: "runtime-session",
},
message: makeTranscriptAssistantMessage({ text: "identity second message" }),
messageId: "msg-identity-second",
messageSeq: 2,
});
await expectMessageEventMatch(stream, {
text: "identity second message",
seq: 2,
id: "msg-identity-second",
});
await stream.reader.cancel();
});
});
test("refreshes SSE history for non-monotonic carried sequence", async () => {
const storePath = await createSessionStoreFile();
const transcriptPath = path.join(path.dirname(storePath), "sess-main.jsonl");

View File

@@ -8,6 +8,7 @@ import {
import { getRuntimeConfig } from "../config/io.js";
import { loadSessionStore } from "../config/sessions.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import type { AuthRateLimiter } from "./auth-rate-limit.js";
import type { ResolvedGatewayAuth } from "./auth.js";
@@ -306,8 +307,11 @@ export async function handleSessionHistoryHttpRequest(
if (!entry?.sessionId) {
return;
}
const updatePath = canonicalizePath(update.sessionFile);
if (!updatePath || !transcriptCandidates.has(updatePath)) {
const updateMatchesIdentity =
update.target?.sessionId === entry.sessionId &&
normalizeAgentId(update.target.agentId) === normalizeAgentId(target.agentId);
const updatePath = update.sessionFile ? canonicalizePath(update.sessionFile) : undefined;
if (!updateMatchesIdentity && (!updatePath || !transcriptCandidates.has(updatePath))) {
return;
}
queueStreamWork(async () => {

View File

@@ -43,6 +43,75 @@ describe("transcript events", () => {
});
});
it("emits storage-neutral identity updates without session files", () => {
const listener = vi.fn();
cleanup.push(onSessionTranscriptUpdate(listener));
emitSessionTranscriptUpdate({
target: {
agentId: " main ",
sessionId: " sess-1 ",
sessionKey: " agent:main:main ",
targetKind: "runtime-session",
},
messageId: " msg-1 ",
});
expect(listener).toHaveBeenCalledWith({
target: {
agentId: "main",
sessionId: "sess-1",
sessionKey: "agent:main:main",
targetKind: "runtime-session",
},
agentId: "main",
sessionId: "sess-1",
sessionKey: "agent:main:main",
messageId: "msg-1",
});
});
it("derives target identity from top-level session metadata", () => {
const listener = vi.fn();
cleanup.push(onSessionTranscriptUpdate(listener));
emitSessionTranscriptUpdate({
sessionFile: "/tmp/session.jsonl",
sessionKey: "agent:main:main",
sessionId: "sess-1",
});
expect(listener).toHaveBeenCalledWith({
sessionFile: "/tmp/session.jsonl",
target: {
agentId: "main",
sessionId: "sess-1",
sessionKey: "agent:main:main",
targetKind: "runtime-session",
},
agentId: "main",
sessionId: "sess-1",
sessionKey: "agent:main:main",
});
});
it("does not derive agent-scoped target identity from global session keys", () => {
const listener = vi.fn();
cleanup.push(onSessionTranscriptUpdate(listener));
emitSessionTranscriptUpdate({
sessionFile: "/tmp/session.jsonl",
sessionKey: "global",
sessionId: "global",
});
expect(listener).toHaveBeenCalledWith({
sessionFile: "/tmp/session.jsonl",
sessionId: "global",
sessionKey: "global",
});
});
it("drops invalid message sequence values", () => {
const listener = vi.fn();
cleanup.push(onSessionTranscriptUpdate(listener));

View File

@@ -1,10 +1,21 @@
import { asPositiveSafeInteger } from "@openclaw/normalization-core/number-coercion";
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
import { parseAgentSessionKey } from "../routing/session-key.js";
export type SessionTranscriptUpdateTarget = {
agentId: string;
sessionId: string;
sessionKey: string;
targetKind: "active-session-file" | "runtime-session";
};
export type SessionTranscriptUpdate = {
sessionFile: string;
/** @deprecated File-backed compatibility hint. Prefer `target` for identity. */
sessionFile?: string;
target?: SessionTranscriptUpdateTarget;
sessionKey?: string;
agentId?: string;
sessionId?: string;
message?: unknown;
messageId?: string;
messageSeq?: number;
@@ -27,25 +38,29 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
? { sessionFile: update }
: {
sessionFile: update.sessionFile,
target: update.target,
sessionKey: update.sessionKey,
agentId: update.agentId,
sessionId: update.sessionId,
message: update.message,
messageId: update.messageId,
messageSeq: update.messageSeq,
};
const trimmed = normalizeOptionalString(normalized.sessionFile);
if (!trimmed) {
const target = normalizeUpdateTarget(normalized);
if (!trimmed && !target) {
return;
}
const messageSeq = asPositiveSafeInteger(normalized.messageSeq);
const sessionKey = normalizeOptionalString(normalized.sessionKey) ?? target?.sessionKey;
const agentId = normalizeOptionalString(normalized.agentId) ?? target?.agentId;
const sessionId = normalizeOptionalString(normalized.sessionId) ?? target?.sessionId;
const nextUpdate: SessionTranscriptUpdate = {
sessionFile: trimmed,
...(normalizeOptionalString(normalized.sessionKey)
? { sessionKey: normalizeOptionalString(normalized.sessionKey) }
: {}),
...(normalizeOptionalString(normalized.agentId)
? { agentId: normalizeOptionalString(normalized.agentId) }
: {}),
...(trimmed ? { sessionFile: trimmed } : {}),
...(target ? { target } : {}),
...(sessionKey ? { sessionKey } : {}),
...(agentId ? { agentId } : {}),
...(sessionId ? { sessionId } : {}),
...(normalized.message !== undefined ? { message: normalized.message } : {}),
...(normalizeOptionalString(normalized.messageId)
? { messageId: normalizeOptionalString(normalized.messageId) }
@@ -60,3 +75,38 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
}
}
}
function normalizeUpdateTarget(update: {
agentId?: string;
sessionId?: string;
sessionKey?: string;
target?: SessionTranscriptUpdate["target"];
}): SessionTranscriptUpdateTarget | undefined {
const sessionKey =
normalizeOptionalString(update.target?.sessionKey) ??
normalizeOptionalString(update.sessionKey);
const agentId =
normalizeOptionalString(update.target?.agentId) ??
normalizeOptionalString(update.agentId) ??
(sessionKey ? parseAgentSessionKey(sessionKey)?.agentId : undefined);
const sessionId =
normalizeOptionalString(update.target?.sessionId) ?? normalizeOptionalString(update.sessionId);
const targetKind =
normalizeTargetKind(update.target?.targetKind) ??
(agentId && sessionId && sessionKey ? "runtime-session" : undefined);
if (!agentId || !sessionId || !sessionKey || !targetKind) {
return undefined;
}
return {
agentId,
sessionId,
sessionKey,
targetKind,
};
}
function normalizeTargetKind(
value: SessionTranscriptUpdateTarget["targetKind"] | undefined,
): SessionTranscriptUpdateTarget["targetKind"] | undefined {
return value === "active-session-file" || value === "runtime-session" ? value : undefined;
}

View File

@@ -410,6 +410,7 @@ export async function appendUserTurnTranscriptMessage(
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.agentId ? { agentId: params.agentId } : {}),
message: appended.message,
messageId: appended.messageId,