mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
clawdbot-d02.1.9.1.26: preserve public transcript update compatibility
This commit is contained in:
@@ -358,8 +358,17 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
messageSeq: update.messageSeq,
|
||||
|
||||
@@ -172,8 +172,17 @@ export async function mirrorCopilotTranscript(
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
} else {
|
||||
emitSessionTranscriptUpdate(params.sessionFile);
|
||||
|
||||
@@ -14,6 +14,7 @@ import type {
|
||||
MemorySyncProgressUpdate,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { emitInternalSessionTranscriptUpdate } from "../../../../src/sessions/transcript-events.js";
|
||||
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
|
||||
|
||||
type MemoryIndexEntry = {
|
||||
@@ -271,7 +272,7 @@ describe("session startup catch-up", () => {
|
||||
const harness = new SessionStartupCatchupHarness([]);
|
||||
harness.startTranscriptListener();
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
emitInternalSessionTranscriptUpdate({
|
||||
target: {
|
||||
agentId: "main",
|
||||
sessionId: "thread",
|
||||
|
||||
@@ -114,15 +114,46 @@ const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([
|
||||
]);
|
||||
|
||||
const log = createSubsystemLogger("memory");
|
||||
const MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY = Symbol.for(
|
||||
"openclaw.memoryCore.sessionTranscriptUpdateSubscriber",
|
||||
);
|
||||
const TEST_MEMORY_WATCH_FACTORY_KEY = Symbol.for("openclaw.test.memoryWatchFactory");
|
||||
const TEST_MEMORY_NATIVE_WATCH_FACTORY_KEY = Symbol.for("openclaw.test.memoryNativeWatchFactory");
|
||||
|
||||
type MemorySessionTranscriptUpdate = {
|
||||
agentId?: string;
|
||||
sessionFile?: string;
|
||||
sessionKey?: string;
|
||||
target?: {
|
||||
agentId: string;
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
targetKind: "active-session-file" | "runtime-session";
|
||||
};
|
||||
};
|
||||
|
||||
type MemoryTranscriptUpdateSubscriber = (
|
||||
listener: (update: MemorySessionTranscriptUpdate) => void,
|
||||
) => () => void;
|
||||
|
||||
type NativeMemoryWatchPair = {
|
||||
dir: string;
|
||||
main: fsSync.FSWatcher;
|
||||
parent: fsSync.FSWatcher | null;
|
||||
};
|
||||
|
||||
function subscribeMemorySessionTranscriptUpdates(
|
||||
listener: (update: MemorySessionTranscriptUpdate) => void,
|
||||
): () => void {
|
||||
const injected = (globalThis as Record<symbol, unknown>)[
|
||||
MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY
|
||||
];
|
||||
if (typeof injected === "function") {
|
||||
return (injected as MemoryTranscriptUpdateSubscriber)(listener);
|
||||
}
|
||||
return onSessionTranscriptUpdate(listener);
|
||||
}
|
||||
|
||||
function resolveMemoryWatchFactory(): typeof chokidar.watch {
|
||||
if (process.env.VITEST === "true" || process.env.NODE_ENV === "test") {
|
||||
const override = (globalThis as Record<PropertyKey, unknown>)[TEST_MEMORY_WATCH_FACTORY_KEY];
|
||||
@@ -776,7 +807,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
if (!this.sources.has("sessions") || this.sessionUnsubscribe) {
|
||||
return;
|
||||
}
|
||||
this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => {
|
||||
this.sessionUnsubscribe = subscribeMemorySessionTranscriptUpdates((update) => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -383,8 +383,13 @@ async function mirrorTelegramAssistantReplyToTranscript(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: sessionEntry.sessionId,
|
||||
agentId: params.route.agentId,
|
||||
target: {
|
||||
agentId: params.route.agentId,
|
||||
sessionId: sessionEntry.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
message: appendedMessage,
|
||||
messageId,
|
||||
});
|
||||
|
||||
@@ -299,8 +299,13 @@ async function persistTextTurnTranscript(
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: params.sessionId,
|
||||
agentId: params.sessionAgentId,
|
||||
target: {
|
||||
agentId: params.sessionAgentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
});
|
||||
return sessionEntry;
|
||||
}
|
||||
|
||||
@@ -711,8 +711,17 @@ function truncateOversizedToolResultsInExistingSessionManager(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -783,8 +792,17 @@ async function truncateOversizedToolResultsInTranscriptState(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -406,8 +406,13 @@ export async function rewriteTranscriptEntriesInRuntimeTranscript(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: target.sessionFile,
|
||||
sessionKey: target.sessionKey,
|
||||
sessionId: target.sessionId,
|
||||
agentId: target.agentId,
|
||||
target: {
|
||||
agentId: target.agentId,
|
||||
sessionId: target.sessionId,
|
||||
sessionKey: target.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
});
|
||||
log.info(
|
||||
`[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` +
|
||||
@@ -466,8 +471,17 @@ export async function rewriteTranscriptEntriesInSessionFile(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
log.info(
|
||||
`[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` +
|
||||
|
||||
@@ -312,7 +312,6 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
| undefined;
|
||||
expect(event?.sessionFile).toBe(sessionFile);
|
||||
expect(event?.sessionKey).toBe(sessionKey);
|
||||
expect(event?.sessionId).toBe(sessionId);
|
||||
expect(event?.messageId).toBeTypeOf("string");
|
||||
expect(message?.role).toBe("assistant");
|
||||
expect(message?.provider).toBe("openclaw");
|
||||
@@ -827,7 +826,6 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
expect(emitSpy).toHaveBeenCalledWith({
|
||||
sessionFile: result.sessionFile,
|
||||
sessionKey,
|
||||
sessionId,
|
||||
});
|
||||
}
|
||||
emitSpy.mockRestore();
|
||||
|
||||
@@ -342,8 +342,17 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey,
|
||||
sessionId: entry.sessionId,
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: entry.sessionId,
|
||||
sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
message: appendedMessage,
|
||||
messageId,
|
||||
});
|
||||
@@ -352,8 +361,17 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile,
|
||||
sessionKey,
|
||||
sessionId: entry.sessionId,
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: entry.sessionId,
|
||||
sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
break;
|
||||
case "none":
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
|
||||
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import type { ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import type {
|
||||
ChatRunState,
|
||||
@@ -109,7 +109,7 @@ export function startGatewayEventSubscriptions(params: {
|
||||
params.broadcast("heartbeat", evt, { dropIfSlow: true });
|
||||
});
|
||||
|
||||
const transcriptUnsub = onSessionTranscriptUpdate((evt) => {
|
||||
const transcriptUnsub = onInternalSessionTranscriptUpdate((evt) => {
|
||||
void getTranscriptUpdateHandler().then((handler) => handler(evt));
|
||||
});
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { getRuntimeConfig } from "../config/io.js";
|
||||
import { normalizeAgentId } from "../routing/session-key.js";
|
||||
import type { SessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import type { SessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import type { InternalSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import { projectChatDisplayMessage } from "./chat-display-projection.js";
|
||||
import type { GatewayBroadcastToConnIdsFn } from "./server-broadcast-types.js";
|
||||
import type {
|
||||
@@ -117,7 +117,7 @@ export function createTranscriptUpdateBroadcastHandler(params: {
|
||||
sessionMessageSubscribers: SessionMessageSubscribers;
|
||||
}) {
|
||||
let broadcastQueue = Promise.resolve();
|
||||
return (update: SessionTranscriptUpdate): void => {
|
||||
return (update: InternalSessionTranscriptUpdate): void => {
|
||||
broadcastQueue = broadcastQueue
|
||||
.then(() => handleTranscriptUpdateBroadcast(params, update))
|
||||
.catch(() => undefined);
|
||||
@@ -130,7 +130,7 @@ async function handleTranscriptUpdateBroadcast(
|
||||
sessionEventSubscribers: SessionEventSubscribers;
|
||||
sessionMessageSubscribers: SessionMessageSubscribers;
|
||||
},
|
||||
update: SessionTranscriptUpdate,
|
||||
update: InternalSessionTranscriptUpdate,
|
||||
): Promise<void> {
|
||||
const sessionKey =
|
||||
update.target?.sessionKey ??
|
||||
|
||||
@@ -5,7 +5,10 @@ import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vite
|
||||
import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js";
|
||||
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import * as transcriptEvents from "../sessions/transcript-events.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import {
|
||||
emitInternalSessionTranscriptUpdate,
|
||||
emitSessionTranscriptUpdate,
|
||||
} from "../sessions/transcript-events.js";
|
||||
import { testState } from "./test-helpers.runtime-state.js";
|
||||
import {
|
||||
connectOk,
|
||||
@@ -531,7 +534,7 @@ describe("session.message websocket events", () => {
|
||||
|
||||
await withOperatorSessionSubscriber(async (ws) => {
|
||||
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main");
|
||||
emitSessionTranscriptUpdate({
|
||||
emitInternalSessionTranscriptUpdate({
|
||||
target: {
|
||||
agentId: "main",
|
||||
sessionId: "sess-main",
|
||||
|
||||
@@ -26,7 +26,7 @@ vi.mock("../config/sessions.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../sessions/transcript-events.js", () => ({
|
||||
onSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => {
|
||||
onInternalSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => {
|
||||
transcriptUpdateHandler = cb;
|
||||
return () => {
|
||||
if (transcriptUpdateHandler === cb) {
|
||||
|
||||
@@ -7,7 +7,10 @@ import {
|
||||
appendAssistantMessageToSessionTranscript,
|
||||
appendExactAssistantMessageToSessionTranscript,
|
||||
} from "../config/sessions/transcript.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import {
|
||||
emitInternalSessionTranscriptUpdate,
|
||||
emitSessionTranscriptUpdate,
|
||||
} from "../sessions/transcript-events.js";
|
||||
import { testState } from "./test-helpers.runtime-state.js";
|
||||
import {
|
||||
connectReq,
|
||||
@@ -606,7 +609,7 @@ describe("session history HTTP endpoints", () => {
|
||||
const stream = await openSessionHistorySse(harness.port, "agent:main:main");
|
||||
await expectHistoryEventTexts(stream, ["first message"]);
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
emitInternalSessionTranscriptUpdate({
|
||||
target: {
|
||||
agentId: "main",
|
||||
sessionId: "sess-main",
|
||||
|
||||
@@ -9,7 +9,7 @@ import { getRuntimeConfig } from "../config/io.js";
|
||||
import { loadSessionStore } from "../config/sessions.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { normalizeAgentId } from "../routing/session-key.js";
|
||||
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
import {
|
||||
@@ -298,12 +298,11 @@ export async function handleSessionHistoryHttpRequest(
|
||||
});
|
||||
}, 15_000);
|
||||
|
||||
const unsubscribe: (() => void) | undefined = onSessionTranscriptUpdate((update) => {
|
||||
const unsubscribe: (() => void) | undefined = onInternalSessionTranscriptUpdate((update) => {
|
||||
// Filter to candidate sessions synchronously before enqueueing any async
|
||||
// work. `onSessionTranscriptUpdate` is a global fan-out listener, so every
|
||||
// transcript write in the gateway would otherwise append a Promise-chain
|
||||
// entry capturing `update.message` to every open SSE stream's queue —
|
||||
// O(streams × updates) for busy deployments.
|
||||
// work. Transcript updates are a global fan-out signal, so every transcript
|
||||
// write in the gateway would otherwise append a Promise-chain entry
|
||||
// capturing `update.message` to every open SSE stream's queue.
|
||||
if (!entry?.sessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { onInternalSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
|
||||
export * from "../../packages/memory-host-sdk/src/engine-foundation.js";
|
||||
export {
|
||||
resolveAgentContextLimits,
|
||||
@@ -39,6 +41,15 @@ export { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
export { resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
export { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
|
||||
export { splitShellArgs } from "../utils/shell-argv.js";
|
||||
|
||||
const MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY = Symbol.for(
|
||||
"openclaw.memoryCore.sessionTranscriptUpdateSubscriber",
|
||||
);
|
||||
|
||||
// Memory-core needs target-only internal updates before the SQLite flip, but
|
||||
// the public SDK listener must stay file-backed. Keep this hook process-local.
|
||||
(globalThis as Record<symbol, unknown>)[MEMORY_CORE_TRANSCRIPT_UPDATE_SUBSCRIBER_KEY] ??=
|
||||
onInternalSessionTranscriptUpdate;
|
||||
export {
|
||||
resolveUserPath,
|
||||
shortenHomeInString,
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { emitSessionTranscriptUpdate, onSessionTranscriptUpdate } from "./transcript-events.js";
|
||||
import {
|
||||
emitInternalSessionTranscriptUpdate,
|
||||
emitSessionTranscriptUpdate,
|
||||
onInternalSessionTranscriptUpdate,
|
||||
onSessionTranscriptUpdate,
|
||||
type SessionTranscriptUpdate,
|
||||
} from "./transcript-events.js";
|
||||
|
||||
const cleanup: Array<() => void> = [];
|
||||
|
||||
@@ -43,7 +49,7 @@ describe("transcript events", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("emits storage-neutral identity updates without session files", () => {
|
||||
it("does not expose identity-only updates to public listeners", () => {
|
||||
const listener = vi.fn();
|
||||
cleanup.push(onSessionTranscriptUpdate(listener));
|
||||
|
||||
@@ -55,6 +61,23 @@ describe("transcript events", () => {
|
||||
targetKind: "runtime-session",
|
||||
},
|
||||
messageId: " msg-1 ",
|
||||
} as unknown as SessionTranscriptUpdate);
|
||||
|
||||
expect(listener).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("emits storage-neutral identity updates to internal listeners", () => {
|
||||
const listener = vi.fn();
|
||||
cleanup.push(onInternalSessionTranscriptUpdate(listener));
|
||||
|
||||
emitInternalSessionTranscriptUpdate({
|
||||
target: {
|
||||
agentId: " main ",
|
||||
sessionId: " sess-1 ",
|
||||
sessionKey: " agent:main:main ",
|
||||
targetKind: "runtime-session",
|
||||
},
|
||||
messageId: " msg-1 ",
|
||||
});
|
||||
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
@@ -71,14 +94,18 @@ describe("transcript events", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("derives target identity from top-level session metadata", () => {
|
||||
it("includes target identity on public file updates when provided", () => {
|
||||
const listener = vi.fn();
|
||||
cleanup.push(onSessionTranscriptUpdate(listener));
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionKey: "agent:main:main",
|
||||
sessionId: "sess-1",
|
||||
target: {
|
||||
agentId: "main",
|
||||
sessionId: "sess-1",
|
||||
sessionKey: "agent:main:main",
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
});
|
||||
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
@@ -95,19 +122,17 @@ describe("transcript events", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not derive agent-scoped target identity from global session keys", () => {
|
||||
it("keeps public global file updates on the compatibility shape", () => {
|
||||
const listener = vi.fn();
|
||||
cleanup.push(onSessionTranscriptUpdate(listener));
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionKey: "global",
|
||||
sessionId: "global",
|
||||
});
|
||||
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionId: "global",
|
||||
sessionKey: "global",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -9,21 +9,31 @@ export type SessionTranscriptUpdateTarget = {
|
||||
targetKind: "active-session-file" | "runtime-session";
|
||||
};
|
||||
|
||||
export type SessionTranscriptUpdate = {
|
||||
/** @deprecated File-backed compatibility hint. Prefer `target` for identity. */
|
||||
sessionFile?: string;
|
||||
type SessionTranscriptUpdateFields = {
|
||||
target?: SessionTranscriptUpdateTarget;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
/** @deprecated Pre-SQLite compatibility mirror. Prefer `target.sessionId`. */
|
||||
sessionId?: string;
|
||||
message?: unknown;
|
||||
messageId?: string;
|
||||
messageSeq?: number;
|
||||
};
|
||||
|
||||
export type SessionTranscriptUpdate = SessionTranscriptUpdateFields & {
|
||||
/** @deprecated File-backed compatibility hint. Prefer `target` for identity. */
|
||||
sessionFile: string;
|
||||
};
|
||||
|
||||
export type InternalSessionTranscriptUpdate = SessionTranscriptUpdateFields & {
|
||||
sessionFile?: string;
|
||||
};
|
||||
|
||||
type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void;
|
||||
type InternalSessionTranscriptListener = (update: InternalSessionTranscriptUpdate) => void;
|
||||
|
||||
const SESSION_TRANSCRIPT_LISTENERS = new Set<SessionTranscriptListener>();
|
||||
const INTERNAL_SESSION_TRANSCRIPT_LISTENERS = new Set<InternalSessionTranscriptListener>();
|
||||
|
||||
export function onSessionTranscriptUpdate(listener: SessionTranscriptListener): () => void {
|
||||
SESSION_TRANSCRIPT_LISTENERS.add(listener);
|
||||
@@ -32,7 +42,36 @@ export function onSessionTranscriptUpdate(listener: SessionTranscriptListener):
|
||||
};
|
||||
}
|
||||
|
||||
export function onInternalSessionTranscriptUpdate(
|
||||
listener: InternalSessionTranscriptListener,
|
||||
): () => void {
|
||||
INTERNAL_SESSION_TRANSCRIPT_LISTENERS.add(listener);
|
||||
return () => {
|
||||
INTERNAL_SESSION_TRANSCRIPT_LISTENERS.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUpdate): void {
|
||||
const nextUpdate = normalizeSessionTranscriptUpdate(update, { allowIdentityOnly: false });
|
||||
if (!nextUpdate?.sessionFile) {
|
||||
return;
|
||||
}
|
||||
emitPublicSessionTranscriptUpdate(nextUpdate as SessionTranscriptUpdate);
|
||||
emitInternalTranscriptUpdate(nextUpdate);
|
||||
}
|
||||
|
||||
export function emitInternalSessionTranscriptUpdate(update: InternalSessionTranscriptUpdate): void {
|
||||
const nextUpdate = normalizeSessionTranscriptUpdate(update, { allowIdentityOnly: true });
|
||||
if (!nextUpdate) {
|
||||
return;
|
||||
}
|
||||
emitInternalTranscriptUpdate(nextUpdate);
|
||||
}
|
||||
|
||||
function normalizeSessionTranscriptUpdate(
|
||||
update: string | InternalSessionTranscriptUpdate,
|
||||
options: { allowIdentityOnly: boolean },
|
||||
): InternalSessionTranscriptUpdate | undefined {
|
||||
const normalized =
|
||||
typeof update === "string"
|
||||
? { sessionFile: update }
|
||||
@@ -47,15 +86,15 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
|
||||
messageSeq: update.messageSeq,
|
||||
};
|
||||
const trimmed = normalizeOptionalString(normalized.sessionFile);
|
||||
const target = normalizeUpdateTarget(normalized);
|
||||
if (!trimmed && !target) {
|
||||
return;
|
||||
const target = normalizeUpdateTarget(normalized.target);
|
||||
if (!trimmed && (!options.allowIdentityOnly || !target)) {
|
||||
return undefined;
|
||||
}
|
||||
const messageSeq = asPositiveSafeInteger(normalized.messageSeq);
|
||||
const sessionKey = normalizeOptionalString(normalized.sessionKey) ?? target?.sessionKey;
|
||||
const agentId = normalizeOptionalString(normalized.agentId) ?? target?.agentId;
|
||||
const sessionId = normalizeOptionalString(normalized.sessionId) ?? target?.sessionId;
|
||||
const nextUpdate: SessionTranscriptUpdate = {
|
||||
return {
|
||||
...(trimmed ? { sessionFile: trimmed } : {}),
|
||||
...(target ? { target } : {}),
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
@@ -67,6 +106,9 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
|
||||
: {}),
|
||||
...(messageSeq !== undefined ? { messageSeq } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function emitPublicSessionTranscriptUpdate(nextUpdate: SessionTranscriptUpdate): void {
|
||||
for (const listener of SESSION_TRANSCRIPT_LISTENERS) {
|
||||
try {
|
||||
listener(nextUpdate);
|
||||
@@ -76,29 +118,25 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeUpdateTarget(update: {
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
sessionFile?: string;
|
||||
target?: SessionTranscriptUpdate["target"];
|
||||
}): SessionTranscriptUpdateTarget | undefined {
|
||||
const sessionKey =
|
||||
normalizeOptionalString(update.target?.sessionKey) ??
|
||||
normalizeOptionalString(update.sessionKey);
|
||||
function emitInternalTranscriptUpdate(nextUpdate: InternalSessionTranscriptUpdate): void {
|
||||
for (const listener of INTERNAL_SESSION_TRANSCRIPT_LISTENERS) {
|
||||
try {
|
||||
listener(nextUpdate);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeUpdateTarget(
|
||||
target: InternalSessionTranscriptUpdate["target"],
|
||||
): SessionTranscriptUpdateTarget | undefined {
|
||||
const sessionKey = normalizeOptionalString(target?.sessionKey);
|
||||
const agentId =
|
||||
normalizeOptionalString(update.target?.agentId) ??
|
||||
normalizeOptionalString(update.agentId) ??
|
||||
normalizeOptionalString(target?.agentId) ??
|
||||
(sessionKey ? parseAgentSessionKey(sessionKey)?.agentId : undefined);
|
||||
const sessionId =
|
||||
normalizeOptionalString(update.target?.sessionId) ?? normalizeOptionalString(update.sessionId);
|
||||
const targetKind =
|
||||
normalizeTargetKind(update.target?.targetKind) ??
|
||||
(agentId && sessionId && sessionKey
|
||||
? normalizeOptionalString(update.sessionFile)
|
||||
? "active-session-file"
|
||||
: "runtime-session"
|
||||
: undefined);
|
||||
const sessionId = normalizeOptionalString(target?.sessionId);
|
||||
const targetKind = normalizeTargetKind(target?.targetKind);
|
||||
if (!agentId || !sessionId || !sessionKey || !targetKind) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@@ -410,8 +410,17 @@ export async function appendUserTurnTranscriptMessage(
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.transcriptPath,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId && params.sessionKey && params.agentId
|
||||
? {
|
||||
target: {
|
||||
agentId: params.agentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
targetKind: "active-session-file",
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
message: appended.message,
|
||||
messageId: appended.messageId,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user