From 71137cd7266b6f47bb563c3e22c4a42ff6118d13 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Wed, 3 Jun 2026 06:48:48 -0700 Subject: [PATCH] clawdbot-d02.1.9.1.26: add transcript update identity contract --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- .../codex/src/app-server/transcript-mirror.ts | 1 + extensions/copilot/src/attempt.ts | 1 + .../copilot/src/dual-write-transcripts.ts | 8 +- .../manager-sync-ops.startup-catchup.test.ts | 73 +++++++++++++++++++ .../src/memory/manager-sync-ops.ts | 40 ++++++++-- .../telegram/src/bot-message-dispatch.ts | 1 + src/agents/command/attempt-execution.ts | 1 + .../tool-result-truncation.ts | 3 + .../transcript-rewrite.ts | 2 + src/config/sessions/session-accessor.test.ts | 7 ++ src/config/sessions/session-accessor.ts | 32 +++++++- src/config/sessions/transcript.test.ts | 2 + src/config/sessions/transcript.ts | 2 + src/gateway/server-session-events.ts | 9 ++- src/gateway/session-message-events.test.ts | 39 ++++++++++ src/gateway/sessions-history-http.test.ts | 29 ++++++++ src/gateway/sessions-history-http.ts | 8 +- src/sessions/transcript-events.test.ts | 69 ++++++++++++++++++ src/sessions/transcript-events.ts | 68 ++++++++++++++--- src/sessions/user-turn-transcript.ts | 1 + 21 files changed, 372 insertions(+), 28 deletions(-) diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index f306f9e64201..9c928e878f66 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -944ca9fb6d46b8a3fa5582fc276478adfecdb3125d8854523492a7ac155ee318 plugin-sdk-api-baseline.json -4b79e9cdc7feadb8bcaa89c31160e445141894556ec03652232c3e6a1948ce50 plugin-sdk-api-baseline.jsonl +d522f8860146243ff1e7fd0e4b7b89bce6be0c78ab06c564d25c204bdb93287b plugin-sdk-api-baseline.json +62d3c6a2f7bdc01c196a970cc269bb83afac34db27be3d8951edb1bbbbff8eaf plugin-sdk-api-baseline.jsonl diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index e18f0e27d22c..7fe9d16006e1 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -358,6 +358,7 @@ export async function mirrorCodexAppServerTranscript(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), message: update.message, messageId: update.messageId, diff --git a/extensions/copilot/src/attempt.ts b/extensions/copilot/src/attempt.ts index d37dad636648..8cc08fb64348 100644 --- a/extensions/copilot/src/attempt.ts +++ b/extensions/copilot/src/attempt.ts @@ -589,6 +589,7 @@ export async function runCopilotAttempt( await dualWriteCopilotTranscriptBestEffort({ sessionFile: sessionFileForMirror, sessionKey: readString((input as { sessionKey?: unknown }).sessionKey), + sessionId: readString(input.sessionId), agentId: readString(input.agentId), messages: taggedMessages, idempotencyScope: sessionIdForScope ? `copilot:${sessionIdForScope}` : undefined, diff --git a/extensions/copilot/src/dual-write-transcripts.ts b/extensions/copilot/src/dual-write-transcripts.ts index 7c9d3e4957c1..eefefa676228 100755 --- a/extensions/copilot/src/dual-write-transcripts.ts +++ b/extensions/copilot/src/dual-write-transcripts.ts @@ -96,6 +96,7 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string { export interface MirrorCopilotTranscriptParams { sessionFile: string; sessionKey?: string; + sessionId?: string; agentId?: string; messages: AgentMessage[]; /** @@ -168,7 +169,12 @@ export async function mirrorCopilotTranscript( } if (params.sessionKey) { - emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey }); + emitSessionTranscriptUpdate({ + sessionFile: params.sessionFile, + sessionKey: params.sessionKey, + ...(params.sessionId ? { sessionId: params.sessionId } : {}), + ...(params.agentId ? { agentId: params.agentId } : {}), + }); } else { emitSessionTranscriptUpdate(params.sessionFile); } diff --git a/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts b/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts index 7a7066e9c3fc..f3b593208f50 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import type { DatabaseSync } from "node:sqlite"; +import { emitSessionTranscriptUpdate } from "openclaw/plugin-sdk/agent-harness-runtime"; import { resolveSessionTranscriptsDirForAgent, type OpenClawConfig, @@ -110,10 +111,23 @@ class SessionStartupCatchupHarness extends MemoryManagerSyncOps { return Array.from(this.sessionsDirtyFiles); } + getPendingSessionTargets(): MemorySyncParams["sessions"] { + return Array.from(this.sessionPendingTargets.values()); + } + isSessionsDirty(): boolean { return this.sessionsDirty; } + startTranscriptListener(): void { + this.ensureSessionListener(); + } + + stopTranscriptListener(): void { + this.sessionUnsubscribe?.(); + this.sessionUnsubscribe = null; + } + protected computeProviderKey(): string { return "test"; } @@ -155,6 +169,8 @@ describe("session startup catch-up", () => { }); afterEach(async () => { + vi.clearAllTimers(); + vi.useRealTimers(); vi.unstubAllEnvs(); await fs.rm(stateDir, { recursive: true, force: true }); }); @@ -249,4 +265,61 @@ describe("session startup catch-up", () => { expect(harness.indexedPaths).toEqual([]); }); + + it("queues transcript update identity without requiring a session file", async () => { + vi.useFakeTimers(); + const harness = new SessionStartupCatchupHarness([]); + harness.startTranscriptListener(); + + emitSessionTranscriptUpdate({ + target: { + agentId: "main", + sessionId: "thread", + sessionKey: "agent:main:thread", + targetKind: "runtime-session", + }, + }); + + expect(harness.getPendingSessionTargets()).toEqual([ + { agentId: "main", sessionId: "thread", sessionKey: "agent:main:thread" }, + ]); + harness.stopTranscriptListener(); + }); + + it("keeps canonical path transcript update compatibility", async () => { + vi.useFakeTimers(); + const session = await writeSessionFile("thread.jsonl"); + const harness = new SessionStartupCatchupHarness([]); + harness.startTranscriptListener(); + + emitSessionTranscriptUpdate({ + sessionFile: session.filePath, + sessionKey: "agent:main:thread", + }); + + expect(harness.getPendingSessionTargets()).toEqual([ + { agentId: "main", sessionId: "thread", sessionKey: "agent:main:thread" }, + ]); + harness.stopTranscriptListener(); + }); + + it("uses active-session-file update paths before target identity", async () => { + vi.useFakeTimers(); + const session = await writeSessionFile("thread.jsonl"); + const harness = new SessionStartupCatchupHarness([]); + harness.startTranscriptListener(); + + emitSessionTranscriptUpdate({ + sessionFile: session.filePath, + target: { + agentId: "main", + sessionId: "wrong-target", + sessionKey: "agent:main:wrong-target", + targetKind: "active-session-file", + }, + }); + + expect(harness.getPendingSessionTargets()).toEqual([{ agentId: "main", sessionId: "thread" }]); + harness.stopTranscriptListener(); + }); }); diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 22838d970545..b29d33a069bf 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -780,15 +780,15 @@ export abstract class MemoryManagerSyncOps { if (this.closed) { return; } - const sessionFile = update.sessionFile; - if (!this.isSessionFileForAgent(sessionFile)) { - return; - } const target = this.resolveSessionTranscriptUpdateSyncTarget(update); if (target) { this.scheduleSessionDirty(target); return; } + const sessionFile = update.sessionFile; + if (!sessionFile || !this.isSessionFileForAgent(sessionFile)) { + return; + } this.scheduleSessionDirty(sessionFile); }); } @@ -1053,18 +1053,44 @@ export abstract class MemoryManagerSyncOps { private resolveSessionTranscriptUpdateSyncTarget(update: { agentId?: string; - sessionFile: string; + sessionFile?: string; sessionKey?: string; + target?: { + agentId: string; + sessionId: string; + sessionKey: string; + targetKind?: string; + }; }): MemorySessionSyncTarget | null { + if (update.sessionFile && isSessionArchiveArtifactName(path.basename(update.sessionFile))) { + return null; + } + if (update.target && update.target.targetKind !== "active-session-file") { + const agentId = update.target.agentId.trim(); + const sessionId = update.target.sessionId.trim(); + const sessionKey = update.target.sessionKey.trim(); + if (!agentId || !sessionId || normalizeAgentId(agentId) !== normalizeAgentId(this.agentId)) { + return null; + } + return { + agentId, + sessionId, + ...(sessionKey ? { sessionKey } : {}), + }; + } + if (!update.sessionFile) { + return null; + } const parsed = parseCanonicalSessionSyncTargetFromPath(update.sessionFile); - if (!parsed || isSessionArchiveArtifactName(path.basename(update.sessionFile))) { + if (!parsed) { return null; } const agentId = update.agentId?.trim() || parsed.agentId; if (!agentId || normalizeAgentId(agentId) !== normalizeAgentId(this.agentId)) { return null; } - const sessionKey = update.sessionKey?.trim(); + const sessionKey = + update.target?.targetKind === "active-session-file" ? undefined : update.sessionKey?.trim(); return { agentId, sessionId: parsed.sessionId, diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index b5ad1dac8aba..73e88bc1edf3 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -383,6 +383,7 @@ async function mirrorTelegramAssistantReplyToTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, + sessionId: sessionEntry.sessionId, agentId: params.route.agentId, message: appendedMessage, messageId, diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index d0cf8104b1f5..0d7319ae0c71 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -299,6 +299,7 @@ async function persistTextTurnTranscript( emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, + sessionId: params.sessionId, agentId: params.sessionAgentId, }); return sessionEntry; diff --git a/src/agents/embedded-agent-runner/tool-result-truncation.ts b/src/agents/embedded-agent-runner/tool-result-truncation.ts index e79922183126..b333b2f6f7af 100644 --- a/src/agents/embedded-agent-runner/tool-result-truncation.ts +++ b/src/agents/embedded-agent-runner/tool-result-truncation.ts @@ -711,6 +711,7 @@ function truncateOversizedToolResultsInExistingSessionManager(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, + ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), }); } @@ -782,6 +783,7 @@ async function truncateOversizedToolResultsInTranscriptState(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, + ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), }); } @@ -889,6 +891,7 @@ export async function truncateOversizedToolResultsInSession(params: { sessionFile, sessionId: params.sessionId, sessionKey: params.sessionKey, + agentId: params.agentId, }); } catch (err) { const errMsg = formatErrorMessage(err); diff --git a/src/agents/embedded-agent-runner/transcript-rewrite.ts b/src/agents/embedded-agent-runner/transcript-rewrite.ts index 1294bef7a37e..3f4a554ac6f8 100644 --- a/src/agents/embedded-agent-runner/transcript-rewrite.ts +++ b/src/agents/embedded-agent-runner/transcript-rewrite.ts @@ -406,6 +406,7 @@ export async function rewriteTranscriptEntriesInRuntimeTranscript(params: { emitSessionTranscriptUpdate({ sessionFile: target.sessionFile, sessionKey: target.sessionKey, + sessionId: target.sessionId, agentId: target.agentId, }); log.info( @@ -465,6 +466,7 @@ export async function rewriteTranscriptEntriesInSessionFile(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, + ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), }); log.info( diff --git a/src/config/sessions/session-accessor.test.ts b/src/config/sessions/session-accessor.test.ts index 9e2cbff6b674..924601e54f00 100644 --- a/src/config/sessions/session-accessor.test.ts +++ b/src/config/sessions/session-accessor.test.ts @@ -296,8 +296,15 @@ describe("session accessor file-backed seam", () => { agentId: "main", message: appended.message, messageId: appended.messageId, + sessionId: scope.sessionId, sessionFile: transcriptPath, sessionKey: scope.sessionKey, + target: { + agentId: "main", + sessionId: scope.sessionId, + sessionKey: scope.sessionKey, + targetKind: "active-session-file", + }, }, ]); }); diff --git a/src/config/sessions/session-accessor.ts b/src/config/sessions/session-accessor.ts index ef3ae6f709a7..7aa2094559c0 100644 --- a/src/config/sessions/session-accessor.ts +++ b/src/config/sessions/session-accessor.ts @@ -2,7 +2,10 @@ import { randomUUID } from "node:crypto"; import path from "node:path"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; -import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import type { + SessionTranscriptUpdate, + SessionTranscriptUpdateTarget, +} from "../../sessions/transcript-events.js"; import { getRuntimeConfig } from "../io.js"; import type { OpenClawConfig } from "../types.openclaw.js"; import { resolveSessionTranscriptPathInDir, resolveStorePath } from "./paths.js"; @@ -263,6 +266,7 @@ export async function publishTranscriptUpdate( emitSessionTranscriptUpdate({ ...update, sessionFile: transcript.sessionFile, + ...(transcript.target ? { target: transcript.target } : {}), }); } @@ -373,17 +377,37 @@ function resolveAccessStorePath(scope: SessionAccessScope): string { }); } -async function resolveTranscriptAccess(scope: SessionTranscriptWriteScope): Promise<{ +type ResolvedTranscriptAccess = { sessionFile: string; -}> { + target?: SessionTranscriptUpdateTarget; +}; + +function projectTranscriptUpdateTarget( + target: SessionTranscriptRuntimeTarget, +): SessionTranscriptUpdateTarget { + return { + agentId: target.agentId, + sessionId: target.sessionId, + sessionKey: target.sessionKey, + targetKind: target.targetKind, + }; +} + +async function resolveTranscriptAccess( + scope: SessionTranscriptWriteScope, +): Promise { if (!scope.sessionId) { if (scope.sessionFile?.trim()) { return { sessionFile: scope.sessionFile }; } throw new Error(`Cannot resolve transcript scope without a session id: ${scope.sessionKey}`); } - return await resolveSessionTranscriptTarget({ + const target = await resolveSessionTranscriptTarget({ ...scope, sessionId: scope.sessionId, }); + return { + sessionFile: target.sessionFile, + target: projectTranscriptUpdateTarget(target), + }; } diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index 3465da61b270..e0a5e5f326bf 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -312,6 +312,7 @@ describe("appendAssistantMessageToSessionTranscript", () => { | undefined; expect(event?.sessionFile).toBe(sessionFile); expect(event?.sessionKey).toBe(sessionKey); + expect(event?.sessionId).toBe(sessionId); expect(event?.messageId).toBeTypeOf("string"); expect(message?.role).toBe("assistant"); expect(message?.provider).toBe("openclaw"); @@ -826,6 +827,7 @@ describe("appendAssistantMessageToSessionTranscript", () => { expect(emitSpy).toHaveBeenCalledWith({ sessionFile: result.sessionFile, sessionKey, + sessionId, }); } emitSpy.mockRestore(); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 0655c4bc2ac3..3514a98330e2 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -342,6 +342,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey, + sessionId: entry.sessionId, ...(params.agentId ? { agentId: params.agentId } : {}), message: appendedMessage, messageId, @@ -351,6 +352,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey, + sessionId: entry.sessionId, ...(params.agentId ? { agentId: params.agentId } : {}), }); break; diff --git a/src/gateway/server-session-events.ts b/src/gateway/server-session-events.ts index d3c7268f4f3c..4f87911738c4 100644 --- a/src/gateway/server-session-events.ts +++ b/src/gateway/server-session-events.ts @@ -132,17 +132,20 @@ async function handleTranscriptUpdateBroadcast( }, update: SessionTranscriptUpdate, ): Promise { - const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); + const sessionKey = + update.target?.sessionKey ?? + update.sessionKey ?? + (update.sessionFile ? resolveSessionKeyForTranscriptFile(update.sessionFile) : undefined); if (!sessionKey || update.message === undefined) { return; } - const effectiveAgentId = update.agentId; + const effectiveAgentId = update.target?.agentId ?? update.agentId; const defaultGlobalAgentId = sessionKey === "global" ? normalizeAgentId(resolveDefaultAgentId(getRuntimeConfig())) : undefined; const visibleAgentId = - update.agentId ?? + effectiveAgentId ?? (effectiveAgentId && effectiveAgentId !== defaultGlobalAgentId ? effectiveAgentId : undefined); const connIds = new Set(); for (const connId of params.sessionEventSubscribers.getAll()) { diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index f0a62725da46..2dad1c10c6ce 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -517,6 +517,45 @@ describe("session.message websocket events", () => { }); }); + test("broadcasts identity-only transcript updates to live session listeners", async () => { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + storePath, + }); + + await withOperatorSessionSubscriber(async (ws) => { + const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main"); + emitSessionTranscriptUpdate({ + target: { + agentId: "main", + sessionId: "sess-main", + sessionKey: "agent:main:main", + targetKind: "runtime-session", + }, + message: { + role: "assistant", + content: [{ type: "text", text: "identity frame" }], + timestamp: Date.now(), + }, + messageId: "msg-identity-frame", + messageSeq: 1, + }); + + const messageEvent = await messageEventPromise; + expectRecordFields(messageEvent.payload, { + sessionKey: "agent:main:main", + messageId: "msg-identity-frame", + messageSeq: 1, + }); + }); + }); + test("includes live usage metadata on session.message transcript events", async () => { const storePath = await createSessionStoreFile(); await writeSessionStore({ diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index f8a57d376b36..e1ff8b12907e 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -599,6 +599,35 @@ describe("session history HTTP endpoints", () => { }); }); + test("streams identity-only transcript updates over SSE", async () => { + await seedSession({ text: "first message" }); + + await withGatewayHarness(async (harness) => { + const stream = await openSessionHistorySse(harness.port, "agent:main:main"); + await expectHistoryEventTexts(stream, ["first message"]); + + emitSessionTranscriptUpdate({ + target: { + agentId: "main", + sessionId: "sess-main", + sessionKey: "agent:main:main", + targetKind: "runtime-session", + }, + message: makeTranscriptAssistantMessage({ text: "identity second message" }), + messageId: "msg-identity-second", + messageSeq: 2, + }); + + await expectMessageEventMatch(stream, { + text: "identity second message", + seq: 2, + id: "msg-identity-second", + }); + + await stream.reader.cancel(); + }); + }); + test("refreshes SSE history for non-monotonic carried sequence", async () => { const storePath = await createSessionStoreFile(); const transcriptPath = path.join(path.dirname(storePath), "sess-main.jsonl"); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index e5f2176b4418..62d045baa0de 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -8,6 +8,7 @@ import { import { getRuntimeConfig } from "../config/io.js"; import { loadSessionStore } from "../config/sessions.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { normalizeAgentId } from "../routing/session-key.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import type { AuthRateLimiter } from "./auth-rate-limit.js"; import type { ResolvedGatewayAuth } from "./auth.js"; @@ -306,8 +307,11 @@ export async function handleSessionHistoryHttpRequest( if (!entry?.sessionId) { return; } - const updatePath = canonicalizePath(update.sessionFile); - if (!updatePath || !transcriptCandidates.has(updatePath)) { + const updateMatchesIdentity = + update.target?.sessionId === entry.sessionId && + normalizeAgentId(update.target.agentId) === normalizeAgentId(target.agentId); + const updatePath = update.sessionFile ? canonicalizePath(update.sessionFile) : undefined; + if (!updateMatchesIdentity && (!updatePath || !transcriptCandidates.has(updatePath))) { return; } queueStreamWork(async () => { diff --git a/src/sessions/transcript-events.test.ts b/src/sessions/transcript-events.test.ts index 753a2d920d3e..52fd7b71e036 100644 --- a/src/sessions/transcript-events.test.ts +++ b/src/sessions/transcript-events.test.ts @@ -43,6 +43,75 @@ describe("transcript events", () => { }); }); + it("emits storage-neutral identity updates without session files", () => { + const listener = vi.fn(); + cleanup.push(onSessionTranscriptUpdate(listener)); + + emitSessionTranscriptUpdate({ + target: { + agentId: " main ", + sessionId: " sess-1 ", + sessionKey: " agent:main:main ", + targetKind: "runtime-session", + }, + messageId: " msg-1 ", + }); + + expect(listener).toHaveBeenCalledWith({ + target: { + agentId: "main", + sessionId: "sess-1", + sessionKey: "agent:main:main", + targetKind: "runtime-session", + }, + agentId: "main", + sessionId: "sess-1", + sessionKey: "agent:main:main", + messageId: "msg-1", + }); + }); + + it("derives target identity from top-level session metadata", () => { + const listener = vi.fn(); + cleanup.push(onSessionTranscriptUpdate(listener)); + + emitSessionTranscriptUpdate({ + sessionFile: "/tmp/session.jsonl", + sessionKey: "agent:main:main", + sessionId: "sess-1", + }); + + expect(listener).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + target: { + agentId: "main", + sessionId: "sess-1", + sessionKey: "agent:main:main", + targetKind: "runtime-session", + }, + agentId: "main", + sessionId: "sess-1", + sessionKey: "agent:main:main", + }); + }); + + it("does not derive agent-scoped target identity from global session keys", () => { + const listener = vi.fn(); + cleanup.push(onSessionTranscriptUpdate(listener)); + + emitSessionTranscriptUpdate({ + sessionFile: "/tmp/session.jsonl", + sessionKey: "global", + sessionId: "global", + }); + + expect(listener).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + sessionId: "global", + sessionKey: "global", + }); + }); + it("drops invalid message sequence values", () => { const listener = vi.fn(); cleanup.push(onSessionTranscriptUpdate(listener)); diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts index 9f86a7da89fd..fffbdde0f04d 100644 --- a/src/sessions/transcript-events.ts +++ b/src/sessions/transcript-events.ts @@ -1,10 +1,21 @@ import { asPositiveSafeInteger } from "@openclaw/normalization-core/number-coercion"; import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce"; +import { parseAgentSessionKey } from "../routing/session-key.js"; + +export type SessionTranscriptUpdateTarget = { + agentId: string; + sessionId: string; + sessionKey: string; + targetKind: "active-session-file" | "runtime-session"; +}; export type SessionTranscriptUpdate = { - sessionFile: string; + /** @deprecated File-backed compatibility hint. Prefer `target` for identity. */ + sessionFile?: string; + target?: SessionTranscriptUpdateTarget; sessionKey?: string; agentId?: string; + sessionId?: string; message?: unknown; messageId?: string; messageSeq?: number; @@ -27,25 +38,29 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp ? { sessionFile: update } : { sessionFile: update.sessionFile, + target: update.target, sessionKey: update.sessionKey, agentId: update.agentId, + sessionId: update.sessionId, message: update.message, messageId: update.messageId, messageSeq: update.messageSeq, }; const trimmed = normalizeOptionalString(normalized.sessionFile); - if (!trimmed) { + const target = normalizeUpdateTarget(normalized); + if (!trimmed && !target) { return; } const messageSeq = asPositiveSafeInteger(normalized.messageSeq); + const sessionKey = normalizeOptionalString(normalized.sessionKey) ?? target?.sessionKey; + const agentId = normalizeOptionalString(normalized.agentId) ?? target?.agentId; + const sessionId = normalizeOptionalString(normalized.sessionId) ?? target?.sessionId; const nextUpdate: SessionTranscriptUpdate = { - sessionFile: trimmed, - ...(normalizeOptionalString(normalized.sessionKey) - ? { sessionKey: normalizeOptionalString(normalized.sessionKey) } - : {}), - ...(normalizeOptionalString(normalized.agentId) - ? { agentId: normalizeOptionalString(normalized.agentId) } - : {}), + ...(trimmed ? { sessionFile: trimmed } : {}), + ...(target ? { target } : {}), + ...(sessionKey ? { sessionKey } : {}), + ...(agentId ? { agentId } : {}), + ...(sessionId ? { sessionId } : {}), ...(normalized.message !== undefined ? { message: normalized.message } : {}), ...(normalizeOptionalString(normalized.messageId) ? { messageId: normalizeOptionalString(normalized.messageId) } @@ -60,3 +75,38 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp } } } + +function normalizeUpdateTarget(update: { + agentId?: string; + sessionId?: string; + sessionKey?: string; + target?: SessionTranscriptUpdate["target"]; +}): SessionTranscriptUpdateTarget | undefined { + const sessionKey = + normalizeOptionalString(update.target?.sessionKey) ?? + normalizeOptionalString(update.sessionKey); + const agentId = + normalizeOptionalString(update.target?.agentId) ?? + normalizeOptionalString(update.agentId) ?? + (sessionKey ? parseAgentSessionKey(sessionKey)?.agentId : undefined); + const sessionId = + normalizeOptionalString(update.target?.sessionId) ?? normalizeOptionalString(update.sessionId); + const targetKind = + normalizeTargetKind(update.target?.targetKind) ?? + (agentId && sessionId && sessionKey ? "runtime-session" : undefined); + if (!agentId || !sessionId || !sessionKey || !targetKind) { + return undefined; + } + return { + agentId, + sessionId, + sessionKey, + targetKind, + }; +} + +function normalizeTargetKind( + value: SessionTranscriptUpdateTarget["targetKind"] | undefined, +): SessionTranscriptUpdateTarget["targetKind"] | undefined { + return value === "active-session-file" || value === "runtime-session" ? value : undefined; +} diff --git a/src/sessions/user-turn-transcript.ts b/src/sessions/user-turn-transcript.ts index 9e693ac8d460..56d83896ba92 100644 --- a/src/sessions/user-turn-transcript.ts +++ b/src/sessions/user-turn-transcript.ts @@ -410,6 +410,7 @@ export async function appendUserTurnTranscriptMessage( emitSessionTranscriptUpdate({ sessionFile: params.transcriptPath, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), message: appended.message, messageId: appended.messageId,