From c2492d589e4993f4e05e5bde9bcb883e977d8ed5 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Fri, 5 Jun 2026 10:13:39 -0700 Subject: [PATCH] clawdbot-d02.1.9.1.26: preserve public transcript update compatibility --- .../codex/src/app-server/transcript-mirror.ts | 11 ++- .../copilot/src/dual-write-transcripts.ts | 11 ++- .../manager-sync-ops.startup-catchup.test.ts | 3 +- .../src/memory/manager-sync-ops.ts | 33 ++++++- .../telegram/src/bot-message-dispatch.ts | 7 +- src/agents/command/attempt-execution.ts | 7 +- .../tool-result-truncation.ts | 22 ++++- .../transcript-rewrite.ts | 18 +++- src/config/sessions/transcript.test.ts | 2 - src/config/sessions/transcript.ts | 22 ++++- src/gateway/server-runtime-subscriptions.ts | 4 +- src/gateway/server-session-events.ts | 6 +- src/gateway/session-message-events.test.ts | 7 +- .../sessions-history-http.revocation.test.ts | 2 +- src/gateway/sessions-history-http.test.ts | 7 +- src/gateway/sessions-history-http.ts | 11 +-- .../memory-core-host-engine-foundation.ts | 11 +++ src/sessions/transcript-events.test.ts | 41 ++++++-- src/sessions/transcript-events.ts | 94 +++++++++++++------ src/sessions/user-turn-transcript.ts | 11 ++- 20 files changed, 263 insertions(+), 67 deletions(-) diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index 7fe9d16006e1..93e5d9621d42 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -358,8 +358,17 @@ export async function mirrorCodexAppServerTranscript(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.sessionKey && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), message: update.message, messageId: update.messageId, messageSeq: update.messageSeq, diff --git a/extensions/copilot/src/dual-write-transcripts.ts b/extensions/copilot/src/dual-write-transcripts.ts index eefefa676228..0c95a4660422 100755 --- a/extensions/copilot/src/dual-write-transcripts.ts +++ b/extensions/copilot/src/dual-write-transcripts.ts @@ -172,8 +172,17 @@ export async function mirrorCopilotTranscript( emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), }); } else { emitSessionTranscriptUpdate(params.sessionFile); diff --git a/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts b/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts index f3b593208f50..c8c01f24a0d6 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.startup-catchup.test.ts @@ -14,6 +14,7 @@ import type { MemorySyncProgressUpdate, } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { emitInternalSessionTranscriptUpdate } from "../../../../src/sessions/transcript-events.js"; import { MemoryManagerSyncOps } from "./manager-sync-ops.js"; type MemoryIndexEntry = { @@ -271,7 +272,7 @@ describe("session startup catch-up", () => { const harness = new SessionStartupCatchupHarness([]); harness.startTranscriptListener(); - emitSessionTranscriptUpdate({ + emitInternalSessionTranscriptUpdate({ target: { agentId: "main", sessionId: "thread", diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 779757cbcc8e..28b66ca94946 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -114,15 +114,46 @@ const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([ ]); const log = createSubsystemLogger("memory"); +const MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY = Symbol.for( + "openclaw.memoryCore.sessionTranscriptUpdateSubscriber", +); const TEST_MEMORY_WATCH_FACTORY_KEY = Symbol.for("openclaw.test.memoryWatchFactory"); const TEST_MEMORY_NATIVE_WATCH_FACTORY_KEY = Symbol.for("openclaw.test.memoryNativeWatchFactory"); +type MemorySessionTranscriptUpdate = { + agentId?: string; + sessionFile?: string; + sessionKey?: string; + target?: { + agentId: string; + sessionId: string; + sessionKey: string; + targetKind: "active-session-file" | "runtime-session"; + }; +}; + +type MemoryTranscriptUpdateSubscriber = ( + listener: (update: MemorySessionTranscriptUpdate) => void, +) => () => void; + type NativeMemoryWatchPair = { dir: string; main: fsSync.FSWatcher; parent: fsSync.FSWatcher | null; }; +function subscribeMemorySessionTranscriptUpdates( + listener: (update: MemorySessionTranscriptUpdate) => void, +): () => void { + const injected = (globalThis as Record)[ + MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY + ]; + if (typeof injected === "function") { + return (injected as MemoryTranscriptUpdateSubscriber)(listener); + } + return onSessionTranscriptUpdate(listener); +} + function resolveMemoryWatchFactory(): typeof chokidar.watch { if (process.env.VITEST === "true" || process.env.NODE_ENV === "test") { const override = (globalThis as Record)[TEST_MEMORY_WATCH_FACTORY_KEY]; @@ -776,7 +807,7 @@ export abstract class MemoryManagerSyncOps { if (!this.sources.has("sessions") || this.sessionUnsubscribe) { return; } - this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { + this.sessionUnsubscribe = subscribeMemorySessionTranscriptUpdates((update) => { if (this.closed) { return; } diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 73e88bc1edf3..98342cfe7cb0 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -383,8 +383,13 @@ async function mirrorTelegramAssistantReplyToTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, - sessionId: sessionEntry.sessionId, agentId: params.route.agentId, + target: { + agentId: params.route.agentId, + sessionId: sessionEntry.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, message: appendedMessage, messageId, }); diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 0d7319ae0c71..b6098ea5e7fb 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -299,8 +299,13 @@ async function persistTextTurnTranscript( emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey, - sessionId: params.sessionId, agentId: params.sessionAgentId, + target: { + agentId: params.sessionAgentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, }); return sessionEntry; } diff --git a/src/agents/embedded-agent-runner/tool-result-truncation.ts b/src/agents/embedded-agent-runner/tool-result-truncation.ts index b333b2f6f7af..4400f7beb7ff 100644 --- a/src/agents/embedded-agent-runner/tool-result-truncation.ts +++ b/src/agents/embedded-agent-runner/tool-result-truncation.ts @@ -711,8 +711,17 @@ function truncateOversizedToolResultsInExistingSessionManager(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.sessionKey && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), }); } @@ -783,8 +792,17 @@ async function truncateOversizedToolResultsInTranscriptState(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.sessionKey && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), }); } diff --git a/src/agents/embedded-agent-runner/transcript-rewrite.ts b/src/agents/embedded-agent-runner/transcript-rewrite.ts index 3f4a554ac6f8..48bd08886e02 100644 --- a/src/agents/embedded-agent-runner/transcript-rewrite.ts +++ b/src/agents/embedded-agent-runner/transcript-rewrite.ts @@ -406,8 +406,13 @@ export async function rewriteTranscriptEntriesInRuntimeTranscript(params: { emitSessionTranscriptUpdate({ sessionFile: target.sessionFile, sessionKey: target.sessionKey, - sessionId: target.sessionId, agentId: target.agentId, + target: { + agentId: target.agentId, + sessionId: target.sessionId, + sessionKey: target.sessionKey, + targetKind: "active-session-file", + }, }); log.info( `[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` + @@ -466,8 +471,17 @@ export async function rewriteTranscriptEntriesInSessionFile(params: { emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey, - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.sessionKey && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), }); log.info( `[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` + diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index e0a5e5f326bf..3465da61b270 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -312,7 +312,6 @@ describe("appendAssistantMessageToSessionTranscript", () => { | undefined; expect(event?.sessionFile).toBe(sessionFile); expect(event?.sessionKey).toBe(sessionKey); - expect(event?.sessionId).toBe(sessionId); expect(event?.messageId).toBeTypeOf("string"); expect(message?.role).toBe("assistant"); expect(message?.provider).toBe("openclaw"); @@ -827,7 +826,6 @@ describe("appendAssistantMessageToSessionTranscript", () => { expect(emitSpy).toHaveBeenCalledWith({ sessionFile: result.sessionFile, sessionKey, - sessionId, }); } emitSpy.mockRestore(); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 3514a98330e2..20ecabbdcbcf 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -342,8 +342,17 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey, - sessionId: entry.sessionId, ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: entry.sessionId, + sessionKey, + targetKind: "active-session-file", + }, + } + : {}), message: appendedMessage, messageId, }); @@ -352,8 +361,17 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { emitSessionTranscriptUpdate({ sessionFile, sessionKey, - sessionId: entry.sessionId, ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: entry.sessionId, + sessionKey, + targetKind: "active-session-file", + }, + } + : {}), }); break; case "none": diff --git a/src/gateway/server-runtime-subscriptions.ts b/src/gateway/server-runtime-subscriptions.ts index 4f55f50d55ef..d71887fb74cd 100644 --- a/src/gateway/server-runtime-subscriptions.ts +++ b/src/gateway/server-runtime-subscriptions.ts @@ -1,7 +1,7 @@ import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js"; import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; -import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import type { ChatAbortControllerEntry } from "./chat-abort.js"; import type { ChatRunState, @@ -109,7 +109,7 @@ export function startGatewayEventSubscriptions(params: { params.broadcast("heartbeat", evt, { dropIfSlow: true }); }); - const transcriptUnsub = onSessionTranscriptUpdate((evt) => { + const transcriptUnsub = onInternalSessionTranscriptUpdate((evt) => { void getTranscriptUpdateHandler().then((handler) => handler(evt)); }); diff --git a/src/gateway/server-session-events.ts b/src/gateway/server-session-events.ts index 4f87911738c4..33797aef7630 100644 --- a/src/gateway/server-session-events.ts +++ b/src/gateway/server-session-events.ts @@ -4,7 +4,7 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { getRuntimeConfig } from "../config/io.js"; import { normalizeAgentId } from "../routing/session-key.js"; import type { SessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; -import type { SessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import type { InternalSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { projectChatDisplayMessage } from "./chat-display-projection.js"; import type { GatewayBroadcastToConnIdsFn } from "./server-broadcast-types.js"; import type { @@ -117,7 +117,7 @@ export function createTranscriptUpdateBroadcastHandler(params: { sessionMessageSubscribers: SessionMessageSubscribers; }) { let broadcastQueue = Promise.resolve(); - return (update: SessionTranscriptUpdate): void => { + return (update: InternalSessionTranscriptUpdate): void => { broadcastQueue = broadcastQueue .then(() => handleTranscriptUpdateBroadcast(params, update)) .catch(() => undefined); @@ -130,7 +130,7 @@ async function handleTranscriptUpdateBroadcast( sessionEventSubscribers: SessionEventSubscribers; sessionMessageSubscribers: SessionMessageSubscribers; }, - update: SessionTranscriptUpdate, + update: InternalSessionTranscriptUpdate, ): Promise { const sessionKey = update.target?.sessionKey ?? diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index 2dad1c10c6ce..fcfb1a3f9ce1 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -5,7 +5,10 @@ import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vite import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js"; import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; import * as transcriptEvents from "../sessions/transcript-events.js"; -import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { + emitInternalSessionTranscriptUpdate, + emitSessionTranscriptUpdate, +} from "../sessions/transcript-events.js"; import { testState } from "./test-helpers.runtime-state.js"; import { connectOk, @@ -531,7 +534,7 @@ describe("session.message websocket events", () => { await withOperatorSessionSubscriber(async (ws) => { const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main"); - emitSessionTranscriptUpdate({ + emitInternalSessionTranscriptUpdate({ target: { agentId: "main", sessionId: "sess-main", diff --git a/src/gateway/sessions-history-http.revocation.test.ts b/src/gateway/sessions-history-http.revocation.test.ts index e7acdb9054eb..382210fcfa2a 100644 --- a/src/gateway/sessions-history-http.revocation.test.ts +++ b/src/gateway/sessions-history-http.revocation.test.ts @@ -26,7 +26,7 @@ vi.mock("../config/sessions.js", () => ({ })); vi.mock("../sessions/transcript-events.js", () => ({ - onSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => { + onInternalSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => { transcriptUpdateHandler = cb; return () => { if (transcriptUpdateHandler === cb) { diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index e1ff8b12907e..a626d7d3eb77 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -7,7 +7,10 @@ import { appendAssistantMessageToSessionTranscript, appendExactAssistantMessageToSessionTranscript, } from "../config/sessions/transcript.js"; -import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { + emitInternalSessionTranscriptUpdate, + emitSessionTranscriptUpdate, +} from "../sessions/transcript-events.js"; import { testState } from "./test-helpers.runtime-state.js"; import { connectReq, @@ -606,7 +609,7 @@ describe("session history HTTP endpoints", () => { const stream = await openSessionHistorySse(harness.port, "agent:main:main"); await expectHistoryEventTexts(stream, ["first message"]); - emitSessionTranscriptUpdate({ + emitInternalSessionTranscriptUpdate({ target: { agentId: "main", sessionId: "sess-main", diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index 62d045baa0de..4a3d57309518 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -9,7 +9,7 @@ import { getRuntimeConfig } from "../config/io.js"; import { loadSessionStore } from "../config/sessions.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeAgentId } from "../routing/session-key.js"; -import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import type { AuthRateLimiter } from "./auth-rate-limit.js"; import type { ResolvedGatewayAuth } from "./auth.js"; import { @@ -298,12 +298,11 @@ export async function handleSessionHistoryHttpRequest( }); }, 15_000); - const unsubscribe: (() => void) | undefined = onSessionTranscriptUpdate((update) => { + const unsubscribe: (() => void) | undefined = onInternalSessionTranscriptUpdate((update) => { // Filter to candidate sessions synchronously before enqueueing any async - // work. `onSessionTranscriptUpdate` is a global fan-out listener, so every - // transcript write in the gateway would otherwise append a Promise-chain - // entry capturing `update.message` to every open SSE stream's queue — - // O(streams × updates) for busy deployments. + // work. Transcript updates are a global fan-out signal, so every transcript + // write in the gateway would otherwise append a Promise-chain entry + // capturing `update.message` to every open SSE stream's queue. if (!entry?.sessionId) { return; } diff --git a/src/plugin-sdk/memory-core-host-engine-foundation.ts b/src/plugin-sdk/memory-core-host-engine-foundation.ts index acf881bbe666..2a4fef60c5f6 100644 --- a/src/plugin-sdk/memory-core-host-engine-foundation.ts +++ b/src/plugin-sdk/memory-core-host-engine-foundation.ts @@ -1,3 +1,5 @@ +import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js"; + export * from "../../packages/memory-host-sdk/src/engine-foundation.js"; export { resolveAgentContextLimits, @@ -39,6 +41,15 @@ export { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; export { resolveGlobalSingleton } from "../shared/global-singleton.js"; export { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; export { splitShellArgs } from "../utils/shell-argv.js"; + +const MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY = Symbol.for( + "openclaw.memoryCore.sessionTranscriptUpdateSubscriber", +); + +// Memory-core needs target-only internal updates before the SQLite flip, but +// the public SDK listener must stay file-backed. Keep this hook process-local. +(globalThis as Record)[MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY] ??= + onInternalSessionTranscriptUpdate; export { resolveUserPath, shortenHomeInString, diff --git a/src/sessions/transcript-events.test.ts b/src/sessions/transcript-events.test.ts index 699763754f19..d754e6d47f25 100644 --- a/src/sessions/transcript-events.test.ts +++ b/src/sessions/transcript-events.test.ts @@ -1,5 +1,11 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { emitSessionTranscriptUpdate, onSessionTranscriptUpdate } from "./transcript-events.js"; +import { + emitInternalSessionTranscriptUpdate, + emitSessionTranscriptUpdate, + onInternalSessionTranscriptUpdate, + onSessionTranscriptUpdate, + type SessionTranscriptUpdate, +} from "./transcript-events.js"; const cleanup: Array<() => void> = []; @@ -43,7 +49,7 @@ describe("transcript events", () => { }); }); - it("emits storage-neutral identity updates without session files", () => { + it("does not expose identity-only updates to public listeners", () => { const listener = vi.fn(); cleanup.push(onSessionTranscriptUpdate(listener)); @@ -55,6 +61,23 @@ describe("transcript events", () => { targetKind: "runtime-session", }, messageId: " msg-1 ", + } as unknown as SessionTranscriptUpdate); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("emits storage-neutral identity updates to internal listeners", () => { + const listener = vi.fn(); + cleanup.push(onInternalSessionTranscriptUpdate(listener)); + + emitInternalSessionTranscriptUpdate({ + target: { + agentId: " main ", + sessionId: " sess-1 ", + sessionKey: " agent:main:main ", + targetKind: "runtime-session", + }, + messageId: " msg-1 ", }); expect(listener).toHaveBeenCalledWith({ @@ -71,14 +94,18 @@ describe("transcript events", () => { }); }); - it("derives target identity from top-level session metadata", () => { + it("includes target identity on public file updates when provided", () => { const listener = vi.fn(); cleanup.push(onSessionTranscriptUpdate(listener)); emitSessionTranscriptUpdate({ sessionFile: "/tmp/session.jsonl", - sessionKey: "agent:main:main", - sessionId: "sess-1", + target: { + agentId: "main", + sessionId: "sess-1", + sessionKey: "agent:main:main", + targetKind: "active-session-file", + }, }); expect(listener).toHaveBeenCalledWith({ @@ -95,19 +122,17 @@ describe("transcript events", () => { }); }); - it("does not derive agent-scoped target identity from global session keys", () => { + it("keeps public global file updates on the compatibility shape", () => { const listener = vi.fn(); cleanup.push(onSessionTranscriptUpdate(listener)); emitSessionTranscriptUpdate({ sessionFile: "/tmp/session.jsonl", sessionKey: "global", - sessionId: "global", }); expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl", - sessionId: "global", sessionKey: "global", }); }); diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts index b49ccab1e537..c00c612f8d51 100644 --- a/src/sessions/transcript-events.ts +++ b/src/sessions/transcript-events.ts @@ -9,21 +9,31 @@ export type SessionTranscriptUpdateTarget = { targetKind: "active-session-file" | "runtime-session"; }; -export type SessionTranscriptUpdate = { - /** @deprecated File-backed compatibility hint. Prefer `target` for identity. */ - sessionFile?: string; +type SessionTranscriptUpdateFields = { target?: SessionTranscriptUpdateTarget; sessionKey?: string; agentId?: string; + /** @deprecated Pre-SQLite compatibility mirror. Prefer `target.sessionId`. */ sessionId?: string; message?: unknown; messageId?: string; messageSeq?: number; }; +export type SessionTranscriptUpdate = SessionTranscriptUpdateFields & { + /** @deprecated File-backed compatibility hint. Prefer `target` for identity. */ + sessionFile: string; +}; + +export type InternalSessionTranscriptUpdate = SessionTranscriptUpdateFields & { + sessionFile?: string; +}; + type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void; +type InternalSessionTranscriptListener = (update: InternalSessionTranscriptUpdate) => void; const SESSION_TRANSCRIPT_LISTENERS = new Set(); +const INTERNAL_SESSION_TRANSCRIPT_LISTENERS = new Set(); export function onSessionTranscriptUpdate(listener: SessionTranscriptListener): () => void { SESSION_TRANSCRIPT_LISTENERS.add(listener); @@ -32,7 +42,36 @@ export function onSessionTranscriptUpdate(listener: SessionTranscriptListener): }; } +export function onInternalSessionTranscriptUpdate( + listener: InternalSessionTranscriptListener, +): () => void { + INTERNAL_SESSION_TRANSCRIPT_LISTENERS.add(listener); + return () => { + INTERNAL_SESSION_TRANSCRIPT_LISTENERS.delete(listener); + }; +} + export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUpdate): void { + const nextUpdate = normalizeSessionTranscriptUpdate(update, { allowIdentityOnly: false }); + if (!nextUpdate?.sessionFile) { + return; + } + emitPublicSessionTranscriptUpdate(nextUpdate as SessionTranscriptUpdate); + emitInternalTranscriptUpdate(nextUpdate); +} + +export function emitInternalSessionTranscriptUpdate(update: InternalSessionTranscriptUpdate): void { + const nextUpdate = normalizeSessionTranscriptUpdate(update, { allowIdentityOnly: true }); + if (!nextUpdate) { + return; + } + emitInternalTranscriptUpdate(nextUpdate); +} + +function normalizeSessionTranscriptUpdate( + update: string | InternalSessionTranscriptUpdate, + options: { allowIdentityOnly: boolean }, +): InternalSessionTranscriptUpdate | undefined { const normalized = typeof update === "string" ? { sessionFile: update } @@ -47,15 +86,15 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp messageSeq: update.messageSeq, }; const trimmed = normalizeOptionalString(normalized.sessionFile); - const target = normalizeUpdateTarget(normalized); - if (!trimmed && !target) { - return; + const target = normalizeUpdateTarget(normalized.target); + if (!trimmed && (!options.allowIdentityOnly || !target)) { + return undefined; } const messageSeq = asPositiveSafeInteger(normalized.messageSeq); const sessionKey = normalizeOptionalString(normalized.sessionKey) ?? target?.sessionKey; const agentId = normalizeOptionalString(normalized.agentId) ?? target?.agentId; const sessionId = normalizeOptionalString(normalized.sessionId) ?? target?.sessionId; - const nextUpdate: SessionTranscriptUpdate = { + return { ...(trimmed ? { sessionFile: trimmed } : {}), ...(target ? { target } : {}), ...(sessionKey ? { sessionKey } : {}), @@ -67,6 +106,9 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp : {}), ...(messageSeq !== undefined ? { messageSeq } : {}), }; +} + +function emitPublicSessionTranscriptUpdate(nextUpdate: SessionTranscriptUpdate): void { for (const listener of SESSION_TRANSCRIPT_LISTENERS) { try { listener(nextUpdate); @@ -76,29 +118,25 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp } } -function normalizeUpdateTarget(update: { - agentId?: string; - sessionId?: string; - sessionKey?: string; - sessionFile?: string; - target?: SessionTranscriptUpdate["target"]; -}): SessionTranscriptUpdateTarget | undefined { - const sessionKey = - normalizeOptionalString(update.target?.sessionKey) ?? - normalizeOptionalString(update.sessionKey); +function emitInternalTranscriptUpdate(nextUpdate: InternalSessionTranscriptUpdate): void { + for (const listener of INTERNAL_SESSION_TRANSCRIPT_LISTENERS) { + try { + listener(nextUpdate); + } catch { + /* ignore */ + } + } +} + +function normalizeUpdateTarget( + target: InternalSessionTranscriptUpdate["target"], +): SessionTranscriptUpdateTarget | undefined { + const sessionKey = normalizeOptionalString(target?.sessionKey); const agentId = - normalizeOptionalString(update.target?.agentId) ?? - normalizeOptionalString(update.agentId) ?? + normalizeOptionalString(target?.agentId) ?? (sessionKey ? parseAgentSessionKey(sessionKey)?.agentId : undefined); - const sessionId = - normalizeOptionalString(update.target?.sessionId) ?? normalizeOptionalString(update.sessionId); - const targetKind = - normalizeTargetKind(update.target?.targetKind) ?? - (agentId && sessionId && sessionKey - ? normalizeOptionalString(update.sessionFile) - ? "active-session-file" - : "runtime-session" - : undefined); + const sessionId = normalizeOptionalString(target?.sessionId); + const targetKind = normalizeTargetKind(target?.targetKind); if (!agentId || !sessionId || !sessionKey || !targetKind) { return undefined; } diff --git a/src/sessions/user-turn-transcript.ts b/src/sessions/user-turn-transcript.ts index 56d83896ba92..9aa46f832e95 100644 --- a/src/sessions/user-turn-transcript.ts +++ b/src/sessions/user-turn-transcript.ts @@ -410,8 +410,17 @@ export async function appendUserTurnTranscriptMessage( emitSessionTranscriptUpdate({ sessionFile: params.transcriptPath, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), - ...(params.sessionId ? { sessionId: params.sessionId } : {}), ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId && params.sessionKey && params.agentId + ? { + target: { + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + targetKind: "active-session-file", + }, + } + : {}), message: appended.message, messageId: appended.messageId, });