mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix: move compaction planning off the event loop
Move compaction planning work to a bounded worker-thread path so large transcript planning no longer monopolizes the agent event loop. Extract pure planning helpers, sanitize worker inputs before structured clone, package the worker entrypoint, and keep synchronous fallback only for worker-unavailable cases. Fixes #86358.
This commit is contained in:
committed by
GitHub
parent
6fd8cfd5bb
commit
6443d06764
@@ -91,6 +91,7 @@ const requiredPathGroups = [
|
||||
"scripts/postinstall-bundled-plugins.mjs",
|
||||
"dist/plugin-sdk/compat.js",
|
||||
"dist/plugin-sdk/root-alias.cjs",
|
||||
"dist/agents/compaction-planning.worker.js",
|
||||
"dist/agents/model-provider-auth.worker.js",
|
||||
"dist/task-registry-control.runtime.js",
|
||||
"dist/telegram-ingress-worker.runtime.js",
|
||||
|
||||
@@ -9,6 +9,10 @@ import {
|
||||
getCompactionProvider,
|
||||
type CompactionProvider,
|
||||
} from "../../plugins/compaction-provider.js";
|
||||
import {
|
||||
buildHistoryPrunePlanWithWorker,
|
||||
computeAdaptiveChunkRatioWithWorker,
|
||||
} from "../compaction-planning-worker.js";
|
||||
import {
|
||||
hasMeaningfulConversationContent,
|
||||
isRealConversationMessage,
|
||||
@@ -19,9 +23,7 @@ import {
|
||||
SAFETY_MARGIN,
|
||||
SUMMARIZATION_OVERHEAD_TOKENS,
|
||||
computeAdaptiveChunkRatio,
|
||||
estimateMessagesTokens,
|
||||
isOversizedForSummary,
|
||||
pruneHistoryForContextShare,
|
||||
resolveContextWindowTokens,
|
||||
summarizeInStages,
|
||||
} from "../compaction.js";
|
||||
@@ -1071,19 +1073,18 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
|
||||
let droppedSummary: string | undefined;
|
||||
|
||||
if (tokensBefore !== undefined) {
|
||||
const summarizableTokens =
|
||||
estimateMessagesTokens(messagesToSummarize) + estimateMessagesTokens(turnPrefixMessages);
|
||||
const newContentTokens = Math.max(0, Math.floor(tokensBefore - summarizableTokens));
|
||||
// Apply SAFETY_MARGIN so token underestimates don't trigger unnecessary pruning
|
||||
const maxHistoryTokens = Math.floor(contextWindowTokens * maxHistoryShare * SAFETY_MARGIN);
|
||||
const prunePlan = await buildHistoryPrunePlanWithWorker({
|
||||
messagesToSummarize,
|
||||
turnPrefixMessages,
|
||||
tokensBefore,
|
||||
contextWindowTokens,
|
||||
maxHistoryShare,
|
||||
parts: 2,
|
||||
signal,
|
||||
});
|
||||
const { newContentTokens, maxHistoryTokens, pruned } = prunePlan;
|
||||
|
||||
if (newContentTokens > maxHistoryTokens) {
|
||||
const pruned = pruneHistoryForContextShare({
|
||||
messages: messagesToSummarize,
|
||||
maxContextTokens: contextWindowTokens,
|
||||
maxHistoryShare,
|
||||
parts: 2,
|
||||
});
|
||||
if (newContentTokens > maxHistoryTokens && pruned) {
|
||||
if (pruned.droppedChunks > 0) {
|
||||
const newContentRatio = (newContentTokens / contextWindowTokens) * 100;
|
||||
log.warn(
|
||||
@@ -1097,10 +1098,11 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
|
||||
// Summarize dropped messages so context isn't lost
|
||||
if (pruned.droppedMessagesList.length > 0) {
|
||||
try {
|
||||
const droppedChunkRatio = computeAdaptiveChunkRatio(
|
||||
pruned.droppedMessagesList,
|
||||
contextWindowTokens,
|
||||
);
|
||||
const droppedChunkRatio = await computeAdaptiveChunkRatioWithWorker({
|
||||
messages: pruned.droppedMessagesList,
|
||||
contextWindow: contextWindowTokens,
|
||||
signal,
|
||||
});
|
||||
const droppedMaxChunkTokens = Math.max(
|
||||
1,
|
||||
Math.floor(contextWindowTokens * droppedChunkRatio) -
|
||||
@@ -1155,7 +1157,11 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
|
||||
// the summarization prompt, system prompt, previous summary, and reasoning budget
|
||||
// that generateSummary adds on top of the serialized conversation chunk.
|
||||
const allMessages = [...messagesToSummarize, ...turnPrefixMessages];
|
||||
const adaptiveRatio = computeAdaptiveChunkRatio(allMessages, contextWindowTokens);
|
||||
const adaptiveRatio = await computeAdaptiveChunkRatioWithWorker({
|
||||
messages: allMessages,
|
||||
contextWindow: contextWindowTokens,
|
||||
signal,
|
||||
});
|
||||
const maxChunkTokens = Math.max(
|
||||
1,
|
||||
Math.floor(contextWindowTokens * adaptiveRatio) - SUMMARIZATION_OVERHEAD_TOKENS,
|
||||
|
||||
90
src/agents/compaction-planning-worker.test.ts
Normal file
90
src/agents/compaction-planning-worker.test.ts
Normal file
@@ -0,0 +1,90 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { compactionPlanningWorkerTesting } from "./compaction-planning-worker.js";
|
||||
import { runCompactionPlanningWorkerInput } from "./compaction-planning.worker.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
|
||||
function makeMessage(id: number, text = "x".repeat(4000)): AgentMessage {
|
||||
return {
|
||||
role: "user",
|
||||
content: text,
|
||||
timestamp: id,
|
||||
};
|
||||
}
|
||||
|
||||
describe("compaction planning worker", () => {
|
||||
it("resolves the packaged worker URL from stable and hashed dist modules", () => {
|
||||
expect(
|
||||
compactionPlanningWorkerTesting.resolveCompactionPlanningWorkerUrl(
|
||||
"file:///repo/dist/agents/compaction-planning-worker.js",
|
||||
).pathname,
|
||||
).toBe("/repo/dist/agents/compaction-planning.worker.js");
|
||||
expect(
|
||||
compactionPlanningWorkerTesting.resolveCompactionPlanningWorkerUrl(
|
||||
"file:///repo/dist/selection-abc123.js",
|
||||
).pathname,
|
||||
).toBe("/repo/dist/agents/compaction-planning.worker.js");
|
||||
});
|
||||
|
||||
it("rejects invalid worker input", () => {
|
||||
expect(runCompactionPlanningWorkerInput({ kind: "summaryChunks" })).toEqual({
|
||||
status: "failed",
|
||||
error: "invalid compaction planning worker input",
|
||||
});
|
||||
});
|
||||
|
||||
it("plans summary chunks in the worker", async () => {
|
||||
const value = await compactionPlanningWorkerTesting.runCompactionPlanningWorker({
|
||||
input: {
|
||||
kind: "summaryChunks",
|
||||
messages: [makeMessage(1), makeMessage(2), makeMessage(3)],
|
||||
maxChunkTokens: 1200,
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
|
||||
expect(value.kind).toBe("summaryChunks");
|
||||
if (value.kind !== "summaryChunks") {
|
||||
return;
|
||||
}
|
||||
expect(value.chunks.flat().map((message) => message.timestamp)).toEqual([1, 2, 3]);
|
||||
expect(value.chunks.length).toBeGreaterThan(1);
|
||||
});
|
||||
|
||||
it("classifies missing worker runtime as unavailable", async () => {
|
||||
await expect(
|
||||
compactionPlanningWorkerTesting.runCompactionPlanningWorker({
|
||||
input: {
|
||||
kind: "summaryChunks",
|
||||
messages: [makeMessage(1)],
|
||||
maxChunkTokens: 1200,
|
||||
},
|
||||
timeoutMs: 500,
|
||||
workerUrl: new URL("./missing-compaction-planning.worker.js", import.meta.url),
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
code: "unavailable",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps timers responsive while planning large histories", async () => {
|
||||
const timer = new Promise<"timer">((resolve) => {
|
||||
setTimeout(() => resolve("timer"), 0);
|
||||
});
|
||||
const planning = compactionPlanningWorkerTesting
|
||||
.runCompactionPlanningWorker({
|
||||
input: {
|
||||
kind: "stageSplit",
|
||||
messages: Array.from({ length: 180 }, (_, index) =>
|
||||
makeMessage(index + 1, "x".repeat(12_000)),
|
||||
),
|
||||
maxChunkTokens: 8000,
|
||||
parts: 4,
|
||||
},
|
||||
timeoutMs: 30_000,
|
||||
})
|
||||
.then(() => "planning" as const);
|
||||
|
||||
await expect(Promise.race([timer, planning])).resolves.toBe("timer");
|
||||
await expect(planning).resolves.toBe("planning");
|
||||
}, 30_000);
|
||||
});
|
||||
339
src/agents/compaction-planning-worker.ts
Normal file
339
src/agents/compaction-planning-worker.ts
Normal file
@@ -0,0 +1,339 @@
|
||||
import path from "node:path";
|
||||
import { fileURLToPath, pathToFileURL } from "node:url";
|
||||
import { Worker } from "node:worker_threads";
|
||||
import {
|
||||
buildHistoryPrunePlan,
|
||||
buildOversizedFallbackPlan,
|
||||
buildStageSplitPlan,
|
||||
buildSummaryChunks,
|
||||
computeAdaptiveChunkRatio,
|
||||
sanitizeCompactionMessages,
|
||||
type HistoryPrunePlan,
|
||||
type OversizedFallbackPlan,
|
||||
type StageSplitPlan,
|
||||
} from "./compaction-planning.js";
|
||||
import type {
|
||||
CompactionPlanningWorkerInput,
|
||||
CompactionPlanningWorkerResult,
|
||||
CompactionPlanningWorkerValue,
|
||||
} from "./compaction-planning.worker.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
|
||||
const COMPACTION_PLANNING_WORKER_TIMEOUT_MS = 60_000;
|
||||
// Worker startup is more expensive than local planning for tiny histories.
|
||||
// Keep small compactions synchronous; move only starvation-sized plans off-thread.
|
||||
const COMPACTION_PLANNING_WORKER_MIN_MESSAGES = 64;
|
||||
|
||||
class CompactionPlanningWorkerError extends Error {
|
||||
constructor(
|
||||
message: string,
|
||||
readonly code: "unavailable" | "timeout" | "failed",
|
||||
) {
|
||||
super(message);
|
||||
this.name = "CompactionPlanningWorkerError";
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCompactionPlanningWorkerUrl(currentModuleUrl = import.meta.url): URL {
|
||||
const currentPath = fileURLToPath(currentModuleUrl);
|
||||
const normalized = currentPath.replaceAll(path.sep, "/");
|
||||
const distMarker = "/dist/";
|
||||
const distIndex = normalized.lastIndexOf(distMarker);
|
||||
if (distIndex >= 0) {
|
||||
const distRoot = currentPath.slice(0, distIndex + distMarker.length);
|
||||
return pathToFileURL(path.join(distRoot, "agents", "compaction-planning.worker.js"));
|
||||
}
|
||||
const extension = path.extname(currentPath) || ".js";
|
||||
return new URL(`./compaction-planning.worker${extension}`, currentModuleUrl);
|
||||
}
|
||||
|
||||
function runCompactionPlanningWorker(params: {
|
||||
input: CompactionPlanningWorkerInput;
|
||||
signal?: AbortSignal;
|
||||
timeoutMs?: number;
|
||||
workerUrl?: URL;
|
||||
}): Promise<CompactionPlanningWorkerValue> {
|
||||
if (params.signal?.aborted) {
|
||||
return Promise.reject(params.signal.reason ?? new Error("compaction planning aborted"));
|
||||
}
|
||||
|
||||
const workerUrl = params.workerUrl ?? resolveCompactionPlanningWorkerUrl();
|
||||
const sourceWorkerExecArgv = workerUrl.pathname.endsWith(".ts") ? ["--import", "tsx"] : undefined;
|
||||
let worker: Worker;
|
||||
try {
|
||||
worker = new Worker(workerUrl, {
|
||||
workerData: params.input,
|
||||
execArgv: sourceWorkerExecArgv,
|
||||
});
|
||||
} catch (error) {
|
||||
return Promise.reject(
|
||||
new CompactionPlanningWorkerError(
|
||||
error instanceof Error ? error.message : String(error),
|
||||
"unavailable",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
worker.unref?.();
|
||||
|
||||
return new Promise<CompactionPlanningWorkerValue>((resolve, reject) => {
|
||||
let settled = false;
|
||||
const timeout = setTimeout(() => {
|
||||
settle(
|
||||
() =>
|
||||
reject(
|
||||
new CompactionPlanningWorkerError("compaction planning worker timed out", "timeout"),
|
||||
),
|
||||
true,
|
||||
);
|
||||
}, params.timeoutMs ?? COMPACTION_PLANNING_WORKER_TIMEOUT_MS);
|
||||
|
||||
const abort = () => {
|
||||
settle(() => reject(params.signal?.reason ?? new Error("compaction planning aborted")), true);
|
||||
};
|
||||
|
||||
const settle = (finish: () => void, terminate: boolean) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
params.signal?.removeEventListener("abort", abort);
|
||||
worker.removeAllListeners();
|
||||
if (terminate) {
|
||||
void worker.terminate();
|
||||
}
|
||||
finish();
|
||||
};
|
||||
|
||||
params.signal?.addEventListener("abort", abort, { once: true });
|
||||
|
||||
worker.once("message", (message: CompactionPlanningWorkerResult) => {
|
||||
settle(() => {
|
||||
if (message.status === "ok") {
|
||||
resolve(message.value);
|
||||
return;
|
||||
}
|
||||
reject(new CompactionPlanningWorkerError(message.error, "failed"));
|
||||
}, false);
|
||||
});
|
||||
worker.once("error", (error) => {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
settle(() => reject(new CompactionPlanningWorkerError(message, "unavailable")), true);
|
||||
});
|
||||
worker.once("exit", (code) => {
|
||||
if (code === 0) {
|
||||
return;
|
||||
}
|
||||
settle(
|
||||
() =>
|
||||
reject(
|
||||
new CompactionPlanningWorkerError(
|
||||
`compaction planning worker exited with code ${code}`,
|
||||
"unavailable",
|
||||
),
|
||||
),
|
||||
false,
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function shouldFallbackToMainThread(error: unknown): boolean {
|
||||
return error instanceof CompactionPlanningWorkerError && error.code === "unavailable";
|
||||
}
|
||||
|
||||
function shouldUsePlanningWorker(messageCount: number): boolean {
|
||||
return messageCount >= COMPACTION_PLANNING_WORKER_MIN_MESSAGES;
|
||||
}
|
||||
|
||||
async function runWithUnavailableFallback<T extends CompactionPlanningWorkerValue>(params: {
|
||||
input: CompactionPlanningWorkerInput;
|
||||
signal?: AbortSignal;
|
||||
fallback: () => T;
|
||||
isExpected: (value: CompactionPlanningWorkerValue) => value is T;
|
||||
}): Promise<T> {
|
||||
try {
|
||||
const value = await runCompactionPlanningWorker({
|
||||
input: params.input,
|
||||
signal: params.signal,
|
||||
});
|
||||
if (params.isExpected(value)) {
|
||||
return value;
|
||||
}
|
||||
throw new CompactionPlanningWorkerError(
|
||||
"unexpected compaction planning worker result",
|
||||
"failed",
|
||||
);
|
||||
} catch (error) {
|
||||
if (shouldFallbackToMainThread(error)) {
|
||||
return params.fallback();
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export async function buildSummaryChunksWithWorker(params: {
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<AgentMessage[][]> {
|
||||
const messages = sanitizeCompactionMessages(params.messages);
|
||||
if (!shouldUsePlanningWorker(messages.length)) {
|
||||
return buildSummaryChunks(params);
|
||||
}
|
||||
const value = await runWithUnavailableFallback({
|
||||
input: {
|
||||
kind: "summaryChunks",
|
||||
messages,
|
||||
maxChunkTokens: params.maxChunkTokens,
|
||||
},
|
||||
signal: params.signal,
|
||||
fallback: () => ({
|
||||
kind: "summaryChunks" as const,
|
||||
chunks: buildSummaryChunks(params),
|
||||
}),
|
||||
isExpected: (
|
||||
value,
|
||||
): value is Extract<CompactionPlanningWorkerValue, { kind: "summaryChunks" }> =>
|
||||
value.kind === "summaryChunks",
|
||||
});
|
||||
return value.chunks;
|
||||
}
|
||||
|
||||
export async function buildOversizedFallbackPlanWithWorker(params: {
|
||||
messages: AgentMessage[];
|
||||
contextWindow: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<OversizedFallbackPlan> {
|
||||
const messages = sanitizeCompactionMessages(params.messages);
|
||||
if (!shouldUsePlanningWorker(messages.length)) {
|
||||
return buildOversizedFallbackPlan(params);
|
||||
}
|
||||
const value = await runWithUnavailableFallback({
|
||||
input: {
|
||||
kind: "oversizedFallback",
|
||||
messages,
|
||||
contextWindow: params.contextWindow,
|
||||
},
|
||||
signal: params.signal,
|
||||
fallback: () => ({
|
||||
kind: "oversizedFallback" as const,
|
||||
...buildOversizedFallbackPlan(params),
|
||||
}),
|
||||
isExpected: (
|
||||
value,
|
||||
): value is Extract<CompactionPlanningWorkerValue, { kind: "oversizedFallback" }> =>
|
||||
value.kind === "oversizedFallback",
|
||||
});
|
||||
return {
|
||||
smallMessages: value.smallMessages,
|
||||
oversizedNotes: value.oversizedNotes,
|
||||
};
|
||||
}
|
||||
|
||||
export async function buildStageSplitPlanWithWorker(params: {
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
parts?: number;
|
||||
minMessagesForSplit?: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<StageSplitPlan> {
|
||||
const messages = sanitizeCompactionMessages(params.messages);
|
||||
if (!shouldUsePlanningWorker(messages.length)) {
|
||||
return buildStageSplitPlan(params);
|
||||
}
|
||||
const value = await runWithUnavailableFallback({
|
||||
input: {
|
||||
kind: "stageSplit",
|
||||
messages,
|
||||
maxChunkTokens: params.maxChunkTokens,
|
||||
parts: params.parts,
|
||||
minMessagesForSplit: params.minMessagesForSplit,
|
||||
},
|
||||
signal: params.signal,
|
||||
fallback: () => ({
|
||||
kind: "stageSplit" as const,
|
||||
...buildStageSplitPlan(params),
|
||||
}),
|
||||
isExpected: (value): value is Extract<CompactionPlanningWorkerValue, { kind: "stageSplit" }> =>
|
||||
value.kind === "stageSplit",
|
||||
});
|
||||
return value.mode === "split" ? { mode: "split", chunks: value.chunks } : { mode: "single" };
|
||||
}
|
||||
|
||||
export async function buildHistoryPrunePlanWithWorker(params: {
|
||||
messagesToSummarize: AgentMessage[];
|
||||
turnPrefixMessages: AgentMessage[];
|
||||
tokensBefore: number;
|
||||
contextWindowTokens: number;
|
||||
maxHistoryShare: number;
|
||||
parts?: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<HistoryPrunePlan> {
|
||||
const messagesToSummarize = sanitizeCompactionMessages(params.messagesToSummarize);
|
||||
const turnPrefixMessages = sanitizeCompactionMessages(params.turnPrefixMessages);
|
||||
if (!shouldUsePlanningWorker(messagesToSummarize.length + turnPrefixMessages.length)) {
|
||||
return buildHistoryPrunePlan(params);
|
||||
}
|
||||
const value = await runWithUnavailableFallback({
|
||||
input: {
|
||||
kind: "historyPrune",
|
||||
messagesToSummarize,
|
||||
turnPrefixMessages,
|
||||
tokensBefore: params.tokensBefore,
|
||||
contextWindowTokens: params.contextWindowTokens,
|
||||
maxHistoryShare: params.maxHistoryShare,
|
||||
parts: params.parts,
|
||||
},
|
||||
signal: params.signal,
|
||||
fallback: () => ({
|
||||
kind: "historyPrune" as const,
|
||||
...buildHistoryPrunePlan(params),
|
||||
}),
|
||||
isExpected: (
|
||||
value,
|
||||
): value is Extract<CompactionPlanningWorkerValue, { kind: "historyPrune" }> =>
|
||||
value.kind === "historyPrune",
|
||||
});
|
||||
return {
|
||||
summarizableTokens: value.summarizableTokens,
|
||||
newContentTokens: value.newContentTokens,
|
||||
maxHistoryTokens: value.maxHistoryTokens,
|
||||
pruned: value.pruned,
|
||||
};
|
||||
}
|
||||
|
||||
export async function computeAdaptiveChunkRatioWithWorker(params: {
|
||||
messages: AgentMessage[];
|
||||
contextWindow: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<number> {
|
||||
const messages = sanitizeCompactionMessages(params.messages);
|
||||
if (!shouldUsePlanningWorker(messages.length)) {
|
||||
return computeAdaptiveChunkRatio(params.messages, params.contextWindow);
|
||||
}
|
||||
const value = await runWithUnavailableFallback({
|
||||
input: {
|
||||
kind: "adaptiveChunkRatio",
|
||||
messages,
|
||||
contextWindow: params.contextWindow,
|
||||
},
|
||||
signal: params.signal,
|
||||
fallback: () => ({
|
||||
kind: "adaptiveChunkRatio" as const,
|
||||
ratio: computeAdaptiveChunkRatio(params.messages, params.contextWindow),
|
||||
}),
|
||||
isExpected: (
|
||||
value,
|
||||
): value is Extract<CompactionPlanningWorkerValue, { kind: "adaptiveChunkRatio" }> =>
|
||||
value.kind === "adaptiveChunkRatio",
|
||||
});
|
||||
return value.ratio;
|
||||
}
|
||||
|
||||
export const compactionPlanningWorkerTesting = {
|
||||
resolveCompactionPlanningWorkerUrl,
|
||||
runCompactionPlanningWorker,
|
||||
CompactionPlanningWorkerError,
|
||||
};
|
||||
389
src/agents/compaction-planning.ts
Normal file
389
src/agents/compaction-planning.ts
Normal file
@@ -0,0 +1,389 @@
|
||||
import { stripRuntimeContextCustomMessages } from "./internal-runtime-context.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
import { repairToolUseResultPairing, stripToolResultDetails } from "./session-transcript-repair.js";
|
||||
import { estimateTokens } from "./sessions/index.js";
|
||||
import { extractToolCallsFromAssistant, extractToolResultId } from "./tool-call-id.js";
|
||||
|
||||
export const BASE_CHUNK_RATIO = 0.4;
|
||||
export const MIN_CHUNK_RATIO = 0.15;
|
||||
export const SAFETY_MARGIN = 1.2; // 20% buffer for estimateTokens() inaccuracy
|
||||
const DEFAULT_PARTS = 2;
|
||||
|
||||
// Overhead reserved for summarization prompt, system prompt, previous summary,
|
||||
// and serialization wrappers (<conversation> tags, instructions, etc.).
|
||||
// generateSummary uses reasoning: "high" which also consumes context budget.
|
||||
export const SUMMARIZATION_OVERHEAD_TOKENS = 4096;
|
||||
|
||||
export type StageSplitPlan =
|
||||
| {
|
||||
mode: "single";
|
||||
}
|
||||
| {
|
||||
mode: "split";
|
||||
chunks: AgentMessage[][];
|
||||
};
|
||||
|
||||
export type OversizedFallbackPlan = {
|
||||
smallMessages: AgentMessage[];
|
||||
oversizedNotes: string[];
|
||||
};
|
||||
|
||||
export type HistoryPrunePlan = {
|
||||
summarizableTokens: number;
|
||||
newContentTokens: number;
|
||||
maxHistoryTokens: number;
|
||||
pruned?: ReturnType<typeof pruneHistoryForContextShare>;
|
||||
};
|
||||
|
||||
export function estimateMessagesTokens(messages: AgentMessage[]): number {
|
||||
// SECURITY: toolResult.details and runtime-context transcript entries must never enter LLM-facing compaction.
|
||||
const safe = sanitizeCompactionMessages(messages);
|
||||
return safe.reduce((sum, message) => sum + estimateTokens(message), 0);
|
||||
}
|
||||
|
||||
export function sanitizeCompactionMessages(messages: AgentMessage[]): AgentMessage[] {
|
||||
return stripToolResultDetails(stripRuntimeContextCustomMessages(messages));
|
||||
}
|
||||
|
||||
export function estimateCompactionMessageTokens(message: AgentMessage): number {
|
||||
return estimateMessagesTokens([message]);
|
||||
}
|
||||
|
||||
export function normalizeCompactionParts(parts: number, messageCount: number): number {
|
||||
if (!Number.isFinite(parts) || parts <= 1) {
|
||||
return 1;
|
||||
}
|
||||
return Math.min(Math.max(1, Math.floor(parts)), Math.max(1, messageCount));
|
||||
}
|
||||
|
||||
export function splitMessagesByTokenShare(
|
||||
messages: AgentMessage[],
|
||||
parts = DEFAULT_PARTS,
|
||||
): AgentMessage[][] {
|
||||
if (messages.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const normalizedParts = normalizeCompactionParts(parts, messages.length);
|
||||
if (normalizedParts <= 1) {
|
||||
return [messages];
|
||||
}
|
||||
|
||||
const totalTokens = estimateMessagesTokens(messages);
|
||||
const targetTokens = totalTokens / normalizedParts;
|
||||
const chunks: AgentMessage[][] = [];
|
||||
let current: AgentMessage[] = [];
|
||||
let currentTokens = 0;
|
||||
|
||||
let pendingToolCallIds = new Set<string>();
|
||||
let pendingChunkStartIndex: number | null = null;
|
||||
|
||||
const splitCurrentAtPendingBoundary = (): boolean => {
|
||||
if (
|
||||
pendingChunkStartIndex === null ||
|
||||
pendingChunkStartIndex <= 0 ||
|
||||
chunks.length >= normalizedParts - 1
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
chunks.push(current.slice(0, pendingChunkStartIndex));
|
||||
current = current.slice(pendingChunkStartIndex);
|
||||
currentTokens = current.reduce((sum, msg) => sum + estimateCompactionMessageTokens(msg), 0);
|
||||
pendingChunkStartIndex = 0;
|
||||
return true;
|
||||
};
|
||||
|
||||
for (const message of messages) {
|
||||
const messageTokens = estimateCompactionMessageTokens(message);
|
||||
|
||||
if (
|
||||
pendingToolCallIds.size === 0 &&
|
||||
chunks.length < normalizedParts - 1 &&
|
||||
current.length > 0 &&
|
||||
currentTokens + messageTokens > targetTokens
|
||||
) {
|
||||
chunks.push(current);
|
||||
current = [];
|
||||
currentTokens = 0;
|
||||
pendingChunkStartIndex = null;
|
||||
}
|
||||
|
||||
current.push(message);
|
||||
currentTokens += messageTokens;
|
||||
|
||||
if (message.role === "assistant") {
|
||||
const toolCalls = extractToolCallsFromAssistant(message);
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
const keepsPending =
|
||||
stopReason !== "aborted" && stopReason !== "error" && toolCalls.length > 0;
|
||||
pendingToolCallIds = new Set();
|
||||
if (keepsPending) {
|
||||
for (const toolCall of toolCalls) {
|
||||
pendingToolCallIds.add(toolCall.id);
|
||||
}
|
||||
}
|
||||
pendingChunkStartIndex = keepsPending ? current.length - 1 : null;
|
||||
} else if (message.role === "toolResult" && pendingToolCallIds.size > 0) {
|
||||
const resultId = extractToolResultId(message);
|
||||
if (!resultId) {
|
||||
pendingToolCallIds = new Set();
|
||||
pendingChunkStartIndex = null;
|
||||
} else {
|
||||
pendingToolCallIds.delete(resultId);
|
||||
}
|
||||
if (
|
||||
pendingToolCallIds.size === 0 &&
|
||||
chunks.length < normalizedParts - 1 &&
|
||||
currentTokens > targetTokens
|
||||
) {
|
||||
splitCurrentAtPendingBoundary();
|
||||
pendingChunkStartIndex = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingToolCallIds.size > 0 && currentTokens > targetTokens) {
|
||||
splitCurrentAtPendingBoundary();
|
||||
}
|
||||
|
||||
if (current.length > 0) {
|
||||
chunks.push(current);
|
||||
}
|
||||
|
||||
return chunks;
|
||||
}
|
||||
|
||||
export function chunkMessagesByMaxTokens(
|
||||
messages: AgentMessage[],
|
||||
maxTokens: number,
|
||||
): AgentMessage[][] {
|
||||
if (messages.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Apply safety margin to compensate for estimateTokens() underestimation
|
||||
// (chars/4 heuristic misses multi-byte chars, special tokens, code tokens, etc.)
|
||||
const effectiveMax = Math.max(1, Math.floor(maxTokens / SAFETY_MARGIN));
|
||||
|
||||
const chunks: AgentMessage[][] = [];
|
||||
let currentChunk: AgentMessage[] = [];
|
||||
let currentTokens = 0;
|
||||
|
||||
for (const message of messages) {
|
||||
const messageTokens = estimateCompactionMessageTokens(message);
|
||||
if (currentChunk.length > 0 && currentTokens + messageTokens > effectiveMax) {
|
||||
chunks.push(currentChunk);
|
||||
currentChunk = [];
|
||||
currentTokens = 0;
|
||||
}
|
||||
|
||||
currentChunk.push(message);
|
||||
currentTokens += messageTokens;
|
||||
|
||||
if (messageTokens > effectiveMax) {
|
||||
// Split oversized messages to avoid unbounded chunk growth.
|
||||
chunks.push(currentChunk);
|
||||
currentChunk = [];
|
||||
currentTokens = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (currentChunk.length > 0) {
|
||||
chunks.push(currentChunk);
|
||||
}
|
||||
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute adaptive chunk ratio based on average message size.
|
||||
* When messages are large, we use smaller chunks to avoid exceeding model limits.
|
||||
*/
|
||||
export function computeAdaptiveChunkRatio(messages: AgentMessage[], contextWindow: number): number {
|
||||
if (messages.length === 0) {
|
||||
return BASE_CHUNK_RATIO;
|
||||
}
|
||||
|
||||
const totalTokens = estimateMessagesTokens(messages);
|
||||
const avgTokens = totalTokens / messages.length;
|
||||
|
||||
// Apply safety margin to account for estimation inaccuracy
|
||||
const safeAvgTokens = avgTokens * SAFETY_MARGIN;
|
||||
const avgRatio = safeAvgTokens / contextWindow;
|
||||
|
||||
// If average message is > 10% of context, reduce chunk ratio
|
||||
if (avgRatio > 0.1) {
|
||||
const reduction = Math.min(avgRatio * 2, BASE_CHUNK_RATIO - MIN_CHUNK_RATIO);
|
||||
return Math.max(MIN_CHUNK_RATIO, BASE_CHUNK_RATIO - reduction);
|
||||
}
|
||||
|
||||
return BASE_CHUNK_RATIO;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a single message is too large to summarize.
|
||||
* If single message > 50% of context, it can't be summarized safely.
|
||||
*/
|
||||
export function isOversizedForSummary(msg: AgentMessage, contextWindow: number): boolean {
|
||||
const tokens = estimateCompactionMessageTokens(msg) * SAFETY_MARGIN;
|
||||
return tokens > contextWindow * 0.5;
|
||||
}
|
||||
|
||||
export function buildSummaryChunks(params: {
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
}): AgentMessage[][] {
|
||||
// SECURITY: never feed toolResult.details or runtime-context transcript entries into summarization prompts.
|
||||
const safeMessages = sanitizeCompactionMessages(params.messages);
|
||||
return chunkMessagesByMaxTokens(safeMessages, params.maxChunkTokens);
|
||||
}
|
||||
|
||||
export function buildOversizedFallbackPlan(params: {
|
||||
messages: AgentMessage[];
|
||||
contextWindow: number;
|
||||
}): OversizedFallbackPlan {
|
||||
const smallMessages: AgentMessage[] = [];
|
||||
const oversizedNotes: string[] = [];
|
||||
|
||||
for (const msg of params.messages) {
|
||||
if (isOversizedForSummary(msg, params.contextWindow)) {
|
||||
const role = (msg as { role?: string }).role ?? "message";
|
||||
const tokens = estimateCompactionMessageTokens(msg);
|
||||
oversizedNotes.push(
|
||||
`[Large ${role} (~${Math.round(tokens / 1000)}K tokens) omitted from summary]`,
|
||||
);
|
||||
} else {
|
||||
smallMessages.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return { smallMessages, oversizedNotes };
|
||||
}
|
||||
|
||||
export function buildStageSplitPlan(params: {
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
parts?: number;
|
||||
minMessagesForSplit?: number;
|
||||
}): StageSplitPlan {
|
||||
const minMessagesForSplit = Math.max(2, params.minMessagesForSplit ?? 4);
|
||||
const parts = normalizeCompactionParts(params.parts ?? DEFAULT_PARTS, params.messages.length);
|
||||
const totalTokens = estimateMessagesTokens(params.messages);
|
||||
|
||||
if (
|
||||
parts <= 1 ||
|
||||
params.messages.length < minMessagesForSplit ||
|
||||
totalTokens <= params.maxChunkTokens
|
||||
) {
|
||||
return { mode: "single" };
|
||||
}
|
||||
|
||||
const chunks = splitMessagesByTokenShare(params.messages, parts).filter(
|
||||
(chunk) => chunk.length > 0,
|
||||
);
|
||||
return chunks.length > 1 ? { mode: "split", chunks } : { mode: "single" };
|
||||
}
|
||||
|
||||
export function pruneHistoryForContextShare(params: {
|
||||
messages: AgentMessage[];
|
||||
maxContextTokens: number;
|
||||
maxHistoryShare?: number;
|
||||
parts?: number;
|
||||
mode?: "share" | "handoff";
|
||||
}): {
|
||||
messages: AgentMessage[];
|
||||
droppedMessagesList: AgentMessage[];
|
||||
droppedChunks: number;
|
||||
droppedMessages: number;
|
||||
droppedTokens: number;
|
||||
keptTokens: number;
|
||||
budgetTokens: number;
|
||||
} {
|
||||
const isHandoff = params.mode === "handoff";
|
||||
const defaultShare = isHandoff ? 0.2 : 0.5; // Stricter budget for handoff snapshots
|
||||
const maxHistoryShare = params.maxHistoryShare ?? defaultShare;
|
||||
const budgetTokens = Math.max(1, Math.floor(params.maxContextTokens * maxHistoryShare));
|
||||
let keptMessages = params.messages;
|
||||
const allDroppedMessages: AgentMessage[] = [];
|
||||
let droppedChunks = 0;
|
||||
let droppedMessages = 0;
|
||||
let droppedTokens = 0;
|
||||
|
||||
const parts = normalizeCompactionParts(params.parts ?? DEFAULT_PARTS, keptMessages.length);
|
||||
|
||||
while (keptMessages.length > 0 && estimateMessagesTokens(keptMessages) > budgetTokens) {
|
||||
const chunks = splitMessagesByTokenShare(keptMessages, parts);
|
||||
if (chunks.length <= 1) {
|
||||
break;
|
||||
}
|
||||
const [dropped, ...rest] = chunks;
|
||||
const flatRest = rest.flat();
|
||||
|
||||
// After dropping a chunk, repair tool_use/tool_result pairing to handle
|
||||
// orphaned tool_results (whose tool_use was in the dropped chunk).
|
||||
// repairToolUseResultPairing drops orphaned tool_results, preventing
|
||||
// "unexpected tool_use_id" errors from Anthropic's API.
|
||||
const repairReport = repairToolUseResultPairing(flatRest);
|
||||
const repairedKept = repairReport.messages;
|
||||
|
||||
// Track orphaned tool_results as dropped (they were in kept but their tool_use was dropped)
|
||||
const orphanedCount = repairReport.droppedOrphanCount;
|
||||
|
||||
droppedChunks += 1;
|
||||
droppedMessages += dropped.length + orphanedCount;
|
||||
droppedTokens += estimateMessagesTokens(dropped);
|
||||
// Note: We don't have the actual orphaned messages to add to droppedMessagesList
|
||||
// since repairToolUseResultPairing doesn't return them. This is acceptable since
|
||||
// the dropped messages are used for summarization, and orphaned tool_results
|
||||
// without their tool_use context aren't useful for summarization anyway.
|
||||
allDroppedMessages.push(...dropped);
|
||||
keptMessages = repairedKept;
|
||||
}
|
||||
|
||||
return {
|
||||
messages: keptMessages,
|
||||
droppedMessagesList: allDroppedMessages,
|
||||
droppedChunks,
|
||||
droppedMessages,
|
||||
droppedTokens,
|
||||
keptTokens: estimateMessagesTokens(keptMessages),
|
||||
budgetTokens,
|
||||
};
|
||||
}
|
||||
|
||||
export function buildHistoryPrunePlan(params: {
|
||||
messagesToSummarize: AgentMessage[];
|
||||
turnPrefixMessages: AgentMessage[];
|
||||
tokensBefore: number;
|
||||
contextWindowTokens: number;
|
||||
maxHistoryShare: number;
|
||||
parts?: number;
|
||||
}): HistoryPrunePlan {
|
||||
const summarizableTokens =
|
||||
estimateMessagesTokens(params.messagesToSummarize) +
|
||||
estimateMessagesTokens(params.turnPrefixMessages);
|
||||
const newContentTokens = Math.max(0, Math.floor(params.tokensBefore - summarizableTokens));
|
||||
// Apply SAFETY_MARGIN so token underestimates don't trigger unnecessary pruning.
|
||||
const maxHistoryTokens = Math.floor(
|
||||
params.contextWindowTokens * params.maxHistoryShare * SAFETY_MARGIN,
|
||||
);
|
||||
|
||||
if (newContentTokens <= maxHistoryTokens) {
|
||||
return {
|
||||
summarizableTokens,
|
||||
newContentTokens,
|
||||
maxHistoryTokens,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
summarizableTokens,
|
||||
newContentTokens,
|
||||
maxHistoryTokens,
|
||||
pruned: pruneHistoryForContextShare({
|
||||
messages: params.messagesToSummarize,
|
||||
maxContextTokens: params.contextWindowTokens,
|
||||
maxHistoryShare: params.maxHistoryShare,
|
||||
parts: params.parts,
|
||||
}),
|
||||
};
|
||||
}
|
||||
179
src/agents/compaction-planning.worker.ts
Normal file
179
src/agents/compaction-planning.worker.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import { parentPort, workerData } from "node:worker_threads";
|
||||
import {
|
||||
buildHistoryPrunePlan,
|
||||
buildOversizedFallbackPlan,
|
||||
buildStageSplitPlan,
|
||||
buildSummaryChunks,
|
||||
computeAdaptiveChunkRatio,
|
||||
type HistoryPrunePlan,
|
||||
type OversizedFallbackPlan,
|
||||
type StageSplitPlan,
|
||||
} from "./compaction-planning.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
|
||||
export type CompactionPlanningWorkerInput =
|
||||
| {
|
||||
kind: "summaryChunks";
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
}
|
||||
| {
|
||||
kind: "oversizedFallback";
|
||||
messages: AgentMessage[];
|
||||
contextWindow: number;
|
||||
}
|
||||
| {
|
||||
kind: "stageSplit";
|
||||
messages: AgentMessage[];
|
||||
maxChunkTokens: number;
|
||||
parts?: number;
|
||||
minMessagesForSplit?: number;
|
||||
}
|
||||
| {
|
||||
kind: "historyPrune";
|
||||
messagesToSummarize: AgentMessage[];
|
||||
turnPrefixMessages: AgentMessage[];
|
||||
tokensBefore: number;
|
||||
contextWindowTokens: number;
|
||||
maxHistoryShare: number;
|
||||
parts?: number;
|
||||
}
|
||||
| {
|
||||
kind: "adaptiveChunkRatio";
|
||||
messages: AgentMessage[];
|
||||
contextWindow: number;
|
||||
};
|
||||
|
||||
export type CompactionPlanningWorkerValue =
|
||||
| {
|
||||
kind: "summaryChunks";
|
||||
chunks: AgentMessage[][];
|
||||
}
|
||||
| ({
|
||||
kind: "oversizedFallback";
|
||||
} & OversizedFallbackPlan)
|
||||
| ({
|
||||
kind: "stageSplit";
|
||||
} & StageSplitPlan)
|
||||
| ({
|
||||
kind: "historyPrune";
|
||||
} & HistoryPrunePlan)
|
||||
| {
|
||||
kind: "adaptiveChunkRatio";
|
||||
ratio: number;
|
||||
};
|
||||
|
||||
export type CompactionPlanningWorkerResult =
|
||||
| {
|
||||
status: "ok";
|
||||
value: CompactionPlanningWorkerValue;
|
||||
}
|
||||
| {
|
||||
status: "failed";
|
||||
error: string;
|
||||
};
|
||||
|
||||
function isFiniteNumber(value: unknown): value is number {
|
||||
return typeof value === "number" && Number.isFinite(value);
|
||||
}
|
||||
|
||||
function isMessageArray(value: unknown): value is AgentMessage[] {
|
||||
return Array.isArray(value);
|
||||
}
|
||||
|
||||
function isWorkerInput(value: unknown): value is CompactionPlanningWorkerInput {
|
||||
if (!value || typeof value !== "object" || !("kind" in value)) {
|
||||
return false;
|
||||
}
|
||||
const input = value as Record<string, unknown>;
|
||||
switch (input.kind) {
|
||||
case "summaryChunks":
|
||||
return isMessageArray(input.messages) && isFiniteNumber(input.maxChunkTokens);
|
||||
case "oversizedFallback":
|
||||
return isMessageArray(input.messages) && isFiniteNumber(input.contextWindow);
|
||||
case "stageSplit":
|
||||
return isMessageArray(input.messages) && isFiniteNumber(input.maxChunkTokens);
|
||||
case "historyPrune":
|
||||
return (
|
||||
isMessageArray(input.messagesToSummarize) &&
|
||||
isMessageArray(input.turnPrefixMessages) &&
|
||||
isFiniteNumber(input.tokensBefore) &&
|
||||
isFiniteNumber(input.contextWindowTokens) &&
|
||||
isFiniteNumber(input.maxHistoryShare)
|
||||
);
|
||||
case "adaptiveChunkRatio":
|
||||
return isMessageArray(input.messages) && isFiniteNumber(input.contextWindow);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export function runCompactionPlanningWorkerInput(input: unknown): CompactionPlanningWorkerResult {
|
||||
if (!isWorkerInput(input)) {
|
||||
return {
|
||||
status: "failed",
|
||||
error: "invalid compaction planning worker input",
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
switch (input.kind) {
|
||||
case "summaryChunks":
|
||||
return {
|
||||
status: "ok",
|
||||
value: {
|
||||
kind: "summaryChunks",
|
||||
chunks: buildSummaryChunks(input),
|
||||
},
|
||||
};
|
||||
case "oversizedFallback":
|
||||
return {
|
||||
status: "ok",
|
||||
value: {
|
||||
kind: "oversizedFallback",
|
||||
...buildOversizedFallbackPlan(input),
|
||||
},
|
||||
};
|
||||
case "stageSplit":
|
||||
return {
|
||||
status: "ok",
|
||||
value: {
|
||||
kind: "stageSplit",
|
||||
...buildStageSplitPlan(input),
|
||||
},
|
||||
};
|
||||
case "historyPrune":
|
||||
return {
|
||||
status: "ok",
|
||||
value: {
|
||||
kind: "historyPrune",
|
||||
...buildHistoryPrunePlan(input),
|
||||
},
|
||||
};
|
||||
case "adaptiveChunkRatio":
|
||||
return {
|
||||
status: "ok",
|
||||
value: {
|
||||
kind: "adaptiveChunkRatio",
|
||||
ratio: computeAdaptiveChunkRatio(input.messages, input.contextWindow),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
status: "failed",
|
||||
error: "unsupported compaction planning worker input",
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
status: "failed",
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (parentPort) {
|
||||
const sendToParent: (message: CompactionPlanningWorkerResult) => void =
|
||||
parentPort.postMessage.bind(parentPort);
|
||||
sendToParent(runCompactionPlanningWorkerInput(workerData));
|
||||
}
|
||||
@@ -17,6 +17,7 @@ vi.mock("openclaw/plugin-sdk/agent-sessions", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
import { sanitizeCompactionMessages } from "./compaction-planning.js";
|
||||
import { chunkMessagesByMaxTokens, splitMessagesByTokenShare } from "./compaction.js";
|
||||
|
||||
describe("compaction token accounting sanitization", () => {
|
||||
@@ -48,4 +49,35 @@ describe("compaction token accounting sanitization", () => {
|
||||
|
||||
expect(calledWithDetails).toBe(false);
|
||||
});
|
||||
|
||||
it("projects worker inputs to planning-safe messages before cloning", () => {
|
||||
const messages: AgentMessage[] = [
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_1",
|
||||
toolName: "browser",
|
||||
isError: false,
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
details: { raw: "x".repeat(50_000) },
|
||||
timestamp: 1,
|
||||
} as any,
|
||||
{
|
||||
role: "custom",
|
||||
customType: "openclaw.runtime-context",
|
||||
content: "internal",
|
||||
timestamp: 2,
|
||||
} as any,
|
||||
{
|
||||
role: "user",
|
||||
content: "next",
|
||||
timestamp: 3,
|
||||
},
|
||||
];
|
||||
|
||||
const sanitized = sanitizeCompactionMessages(messages);
|
||||
|
||||
expect(sanitized).toHaveLength(2);
|
||||
expect(sanitized[0]).not.toHaveProperty("details");
|
||||
expect(sanitized.map((message) => message.role)).toEqual(["toolResult", "user"]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,24 +3,47 @@ import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { retryAsync } from "../infra/retry.js";
|
||||
import { isAbortError } from "../infra/unhandled-rejections.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import {
|
||||
buildOversizedFallbackPlanWithWorker,
|
||||
buildStageSplitPlanWithWorker,
|
||||
buildSummaryChunksWithWorker,
|
||||
} from "./compaction-planning-worker.js";
|
||||
import {
|
||||
BASE_CHUNK_RATIO,
|
||||
chunkMessagesByMaxTokens,
|
||||
computeAdaptiveChunkRatio,
|
||||
estimateMessagesTokens,
|
||||
isOversizedForSummary,
|
||||
MIN_CHUNK_RATIO,
|
||||
pruneHistoryForContextShare,
|
||||
SAFETY_MARGIN,
|
||||
splitMessagesByTokenShare,
|
||||
SUMMARIZATION_OVERHEAD_TOKENS,
|
||||
} from "./compaction-planning.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS } from "./defaults.js";
|
||||
import { isTimeoutError } from "./failover-error.js";
|
||||
|
||||
type PartialSummaryError = Error & { partialSummary?: string };
|
||||
import { stripRuntimeContextCustomMessages } from "./internal-runtime-context.js";
|
||||
import type { AgentMessage } from "./runtime/index.js";
|
||||
import { repairToolUseResultPairing, stripToolResultDetails } from "./session-transcript-repair.js";
|
||||
import type { ExtensionContext } from "./sessions/index.js";
|
||||
import { estimateTokens, generateSummary as agentGenerateSummary } from "./sessions/index.js";
|
||||
import { extractToolCallsFromAssistant, extractToolResultId } from "./tool-call-id.js";
|
||||
import { generateSummary as agentGenerateSummary } from "./sessions/index.js";
|
||||
|
||||
export {
|
||||
BASE_CHUNK_RATIO,
|
||||
chunkMessagesByMaxTokens,
|
||||
computeAdaptiveChunkRatio,
|
||||
estimateMessagesTokens,
|
||||
isOversizedForSummary,
|
||||
MIN_CHUNK_RATIO,
|
||||
pruneHistoryForContextShare,
|
||||
SAFETY_MARGIN,
|
||||
splitMessagesByTokenShare,
|
||||
SUMMARIZATION_OVERHEAD_TOKENS,
|
||||
};
|
||||
|
||||
const log = createSubsystemLogger("compaction");
|
||||
|
||||
export const BASE_CHUNK_RATIO = 0.4;
|
||||
export const MIN_CHUNK_RATIO = 0.15;
|
||||
export const SAFETY_MARGIN = 1.2; // 20% buffer for estimateTokens() inaccuracy
|
||||
type PartialSummaryError = Error & { partialSummary?: string };
|
||||
|
||||
const DEFAULT_SUMMARY_FALLBACK = "No prior history.";
|
||||
const DEFAULT_PARTS = 2;
|
||||
const MERGE_SUMMARIES_INSTRUCTIONS = [
|
||||
"Merge these partial summaries into a single cohesive summary.",
|
||||
"",
|
||||
@@ -116,200 +139,6 @@ export function buildCompactionSummarizationInstructions(
|
||||
return `${identifierPreservation}\n\nAdditional focus:\n${custom}`;
|
||||
}
|
||||
|
||||
export function estimateMessagesTokens(messages: AgentMessage[]): number {
|
||||
// SECURITY: toolResult.details and runtime-context transcript entries must never enter LLM-facing compaction.
|
||||
const safe = stripToolResultDetails(stripRuntimeContextCustomMessages(messages));
|
||||
return safe.reduce((sum, message) => sum + estimateTokens(message), 0);
|
||||
}
|
||||
|
||||
function estimateCompactionMessageTokens(message: AgentMessage): number {
|
||||
return estimateMessagesTokens([message]);
|
||||
}
|
||||
|
||||
function normalizeParts(parts: number, messageCount: number): number {
|
||||
if (!Number.isFinite(parts) || parts <= 1) {
|
||||
return 1;
|
||||
}
|
||||
return Math.min(Math.max(1, Math.floor(parts)), Math.max(1, messageCount));
|
||||
}
|
||||
|
||||
export function splitMessagesByTokenShare(
|
||||
messages: AgentMessage[],
|
||||
parts = DEFAULT_PARTS,
|
||||
): AgentMessage[][] {
|
||||
if (messages.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const normalizedParts = normalizeParts(parts, messages.length);
|
||||
if (normalizedParts <= 1) {
|
||||
return [messages];
|
||||
}
|
||||
|
||||
const totalTokens = estimateMessagesTokens(messages);
|
||||
const targetTokens = totalTokens / normalizedParts;
|
||||
const chunks: AgentMessage[][] = [];
|
||||
let current: AgentMessage[] = [];
|
||||
let currentTokens = 0;
|
||||
|
||||
let pendingToolCallIds = new Set<string>();
|
||||
let pendingChunkStartIndex: number | null = null;
|
||||
|
||||
const splitCurrentAtPendingBoundary = (): boolean => {
|
||||
if (
|
||||
pendingChunkStartIndex === null ||
|
||||
pendingChunkStartIndex <= 0 ||
|
||||
chunks.length >= normalizedParts - 1
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
chunks.push(current.slice(0, pendingChunkStartIndex));
|
||||
current = current.slice(pendingChunkStartIndex);
|
||||
currentTokens = current.reduce((sum, msg) => sum + estimateCompactionMessageTokens(msg), 0);
|
||||
pendingChunkStartIndex = 0;
|
||||
return true;
|
||||
};
|
||||
|
||||
for (const message of messages) {
|
||||
const messageTokens = estimateCompactionMessageTokens(message);
|
||||
|
||||
if (
|
||||
pendingToolCallIds.size === 0 &&
|
||||
chunks.length < normalizedParts - 1 &&
|
||||
current.length > 0 &&
|
||||
currentTokens + messageTokens > targetTokens
|
||||
) {
|
||||
chunks.push(current);
|
||||
current = [];
|
||||
currentTokens = 0;
|
||||
pendingChunkStartIndex = null;
|
||||
}
|
||||
|
||||
current.push(message);
|
||||
currentTokens += messageTokens;
|
||||
|
||||
if (message.role === "assistant") {
|
||||
const toolCalls = extractToolCallsFromAssistant(message);
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
const keepsPending =
|
||||
stopReason !== "aborted" && stopReason !== "error" && toolCalls.length > 0;
|
||||
pendingToolCallIds = new Set();
|
||||
if (keepsPending) {
|
||||
for (const toolCall of toolCalls) {
|
||||
pendingToolCallIds.add(toolCall.id);
|
||||
}
|
||||
}
|
||||
pendingChunkStartIndex = keepsPending ? current.length - 1 : null;
|
||||
} else if (message.role === "toolResult" && pendingToolCallIds.size > 0) {
|
||||
const resultId = extractToolResultId(message);
|
||||
if (!resultId) {
|
||||
pendingToolCallIds = new Set();
|
||||
pendingChunkStartIndex = null;
|
||||
} else {
|
||||
pendingToolCallIds.delete(resultId);
|
||||
}
|
||||
if (
|
||||
pendingToolCallIds.size === 0 &&
|
||||
chunks.length < normalizedParts - 1 &&
|
||||
currentTokens > targetTokens
|
||||
) {
|
||||
splitCurrentAtPendingBoundary();
|
||||
pendingChunkStartIndex = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingToolCallIds.size > 0 && currentTokens > targetTokens) {
|
||||
splitCurrentAtPendingBoundary();
|
||||
}
|
||||
|
||||
if (current.length > 0) {
|
||||
chunks.push(current);
|
||||
}
|
||||
|
||||
return chunks;
|
||||
}
|
||||
|
||||
// Overhead reserved for summarization prompt, system prompt, previous summary,
|
||||
// and serialization wrappers (<conversation> tags, instructions, etc.).
|
||||
// generateSummary uses reasoning: "high" which also consumes context budget.
|
||||
export const SUMMARIZATION_OVERHEAD_TOKENS = 4096;
|
||||
|
||||
export function chunkMessagesByMaxTokens(
|
||||
messages: AgentMessage[],
|
||||
maxTokens: number,
|
||||
): AgentMessage[][] {
|
||||
if (messages.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Apply safety margin to compensate for estimateTokens() underestimation
|
||||
// (chars/4 heuristic misses multi-byte chars, special tokens, code tokens, etc.)
|
||||
const effectiveMax = Math.max(1, Math.floor(maxTokens / SAFETY_MARGIN));
|
||||
|
||||
const chunks: AgentMessage[][] = [];
|
||||
let currentChunk: AgentMessage[] = [];
|
||||
let currentTokens = 0;
|
||||
|
||||
for (const message of messages) {
|
||||
const messageTokens = estimateCompactionMessageTokens(message);
|
||||
if (currentChunk.length > 0 && currentTokens + messageTokens > effectiveMax) {
|
||||
chunks.push(currentChunk);
|
||||
currentChunk = [];
|
||||
currentTokens = 0;
|
||||
}
|
||||
|
||||
currentChunk.push(message);
|
||||
currentTokens += messageTokens;
|
||||
|
||||
if (messageTokens > effectiveMax) {
|
||||
// Split oversized messages to avoid unbounded chunk growth.
|
||||
chunks.push(currentChunk);
|
||||
currentChunk = [];
|
||||
currentTokens = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (currentChunk.length > 0) {
|
||||
chunks.push(currentChunk);
|
||||
}
|
||||
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute adaptive chunk ratio based on average message size.
|
||||
* When messages are large, we use smaller chunks to avoid exceeding model limits.
|
||||
*/
|
||||
export function computeAdaptiveChunkRatio(messages: AgentMessage[], contextWindow: number): number {
|
||||
if (messages.length === 0) {
|
||||
return BASE_CHUNK_RATIO;
|
||||
}
|
||||
|
||||
const totalTokens = estimateMessagesTokens(messages);
|
||||
const avgTokens = totalTokens / messages.length;
|
||||
|
||||
// Apply safety margin to account for estimation inaccuracy
|
||||
const safeAvgTokens = avgTokens * SAFETY_MARGIN;
|
||||
const avgRatio = safeAvgTokens / contextWindow;
|
||||
|
||||
// If average message is > 10% of context, reduce chunk ratio
|
||||
if (avgRatio > 0.1) {
|
||||
const reduction = Math.min(avgRatio * 2, BASE_CHUNK_RATIO - MIN_CHUNK_RATIO);
|
||||
return Math.max(MIN_CHUNK_RATIO, BASE_CHUNK_RATIO - reduction);
|
||||
}
|
||||
|
||||
return BASE_CHUNK_RATIO;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a single message is too large to summarize.
|
||||
* If single message > 50% of context, it can't be summarized safely.
|
||||
*/
|
||||
export function isOversizedForSummary(msg: AgentMessage, contextWindow: number): boolean {
|
||||
const tokens = estimateCompactionMessageTokens(msg) * SAFETY_MARGIN;
|
||||
return tokens > contextWindow * 0.5;
|
||||
}
|
||||
|
||||
async function summarizeChunks(params: {
|
||||
messages: AgentMessage[];
|
||||
model: NonNullable<ExtensionContext["model"]>;
|
||||
@@ -326,9 +155,11 @@ async function summarizeChunks(params: {
|
||||
return params.previousSummary ?? DEFAULT_SUMMARY_FALLBACK;
|
||||
}
|
||||
|
||||
// SECURITY: never feed toolResult.details or runtime-context transcript entries into summarization prompts.
|
||||
const safeMessages = stripToolResultDetails(stripRuntimeContextCustomMessages(params.messages));
|
||||
const chunks = chunkMessagesByMaxTokens(safeMessages, params.maxChunkTokens);
|
||||
const chunks = await buildSummaryChunksWithWorker({
|
||||
messages: params.messages,
|
||||
maxChunkTokens: params.maxChunkTokens,
|
||||
signal: params.signal,
|
||||
});
|
||||
let summary = params.previousSummary;
|
||||
const effectiveInstructions = buildCompactionSummarizationInstructions(
|
||||
params.customInstructions,
|
||||
@@ -454,21 +285,12 @@ export async function summarizeWithFallback(params: {
|
||||
partialSummaryFallback = (fullError as PartialSummaryError).partialSummary;
|
||||
}
|
||||
|
||||
// Fallback 1: Summarize only small messages, note oversized ones
|
||||
const smallMessages: AgentMessage[] = [];
|
||||
const oversizedNotes: string[] = [];
|
||||
|
||||
for (const msg of messages) {
|
||||
if (isOversizedForSummary(msg, contextWindow)) {
|
||||
const role = (msg as { role?: string }).role ?? "message";
|
||||
const tokens = estimateCompactionMessageTokens(msg);
|
||||
oversizedNotes.push(
|
||||
`[Large ${role} (~${Math.round(tokens / 1000)}K tokens) omitted from summary]`,
|
||||
);
|
||||
} else {
|
||||
smallMessages.push(msg);
|
||||
}
|
||||
}
|
||||
// Fallback 1: Summarize only small messages, note oversized ones.
|
||||
const { smallMessages, oversizedNotes } = await buildOversizedFallbackPlanWithWorker({
|
||||
messages,
|
||||
contextWindow,
|
||||
signal: params.signal,
|
||||
});
|
||||
|
||||
// When nothing was oversized, `smallMessages` is the same transcript as the full attempt.
|
||||
// Re-summarizing it would duplicate the same failing API work (and duplicate warn logs).
|
||||
@@ -523,21 +345,20 @@ export async function summarizeInStages(params: {
|
||||
return params.previousSummary ?? DEFAULT_SUMMARY_FALLBACK;
|
||||
}
|
||||
|
||||
const minMessagesForSplit = Math.max(2, params.minMessagesForSplit ?? 4);
|
||||
const parts = normalizeParts(params.parts ?? DEFAULT_PARTS, messages.length);
|
||||
const totalTokens = estimateMessagesTokens(messages);
|
||||
const plan = await buildStageSplitPlanWithWorker({
|
||||
messages,
|
||||
maxChunkTokens: params.maxChunkTokens,
|
||||
parts: params.parts,
|
||||
minMessagesForSplit: params.minMessagesForSplit,
|
||||
signal: params.signal,
|
||||
});
|
||||
|
||||
if (parts <= 1 || messages.length < minMessagesForSplit || totalTokens <= params.maxChunkTokens) {
|
||||
return summarizeWithFallback(params);
|
||||
}
|
||||
|
||||
const splits = splitMessagesByTokenShare(messages, parts).filter((chunk) => chunk.length > 0);
|
||||
if (splits.length <= 1) {
|
||||
if (plan.mode === "single") {
|
||||
return summarizeWithFallback(params);
|
||||
}
|
||||
|
||||
const partialSummaries: string[] = [];
|
||||
for (const chunk of splits) {
|
||||
for (const chunk of plan.chunks) {
|
||||
partialSummaries.push(
|
||||
await summarizeWithFallback({
|
||||
...params,
|
||||
@@ -569,73 +390,6 @@ export async function summarizeInStages(params: {
|
||||
});
|
||||
}
|
||||
|
||||
export function pruneHistoryForContextShare(params: {
|
||||
messages: AgentMessage[];
|
||||
maxContextTokens: number;
|
||||
maxHistoryShare?: number;
|
||||
parts?: number;
|
||||
mode?: "share" | "handoff";
|
||||
}): {
|
||||
messages: AgentMessage[];
|
||||
droppedMessagesList: AgentMessage[];
|
||||
droppedChunks: number;
|
||||
droppedMessages: number;
|
||||
droppedTokens: number;
|
||||
keptTokens: number;
|
||||
budgetTokens: number;
|
||||
} {
|
||||
const isHandoff = params.mode === "handoff";
|
||||
const defaultShare = isHandoff ? 0.2 : 0.5; // Stricter budget for handoff snapshots
|
||||
const maxHistoryShare = params.maxHistoryShare ?? defaultShare;
|
||||
const budgetTokens = Math.max(1, Math.floor(params.maxContextTokens * maxHistoryShare));
|
||||
let keptMessages = params.messages;
|
||||
const allDroppedMessages: AgentMessage[] = [];
|
||||
let droppedChunks = 0;
|
||||
let droppedMessages = 0;
|
||||
let droppedTokens = 0;
|
||||
|
||||
const parts = normalizeParts(params.parts ?? DEFAULT_PARTS, keptMessages.length);
|
||||
|
||||
while (keptMessages.length > 0 && estimateMessagesTokens(keptMessages) > budgetTokens) {
|
||||
const chunks = splitMessagesByTokenShare(keptMessages, parts);
|
||||
if (chunks.length <= 1) {
|
||||
break;
|
||||
}
|
||||
const [dropped, ...rest] = chunks;
|
||||
const flatRest = rest.flat();
|
||||
|
||||
// After dropping a chunk, repair tool_use/tool_result pairing to handle
|
||||
// orphaned tool_results (whose tool_use was in the dropped chunk).
|
||||
// repairToolUseResultPairing drops orphaned tool_results, preventing
|
||||
// "unexpected tool_use_id" errors from Anthropic's API.
|
||||
const repairReport = repairToolUseResultPairing(flatRest);
|
||||
const repairedKept = repairReport.messages;
|
||||
|
||||
// Track orphaned tool_results as dropped (they were in kept but their tool_use was dropped)
|
||||
const orphanedCount = repairReport.droppedOrphanCount;
|
||||
|
||||
droppedChunks += 1;
|
||||
droppedMessages += dropped.length + orphanedCount;
|
||||
droppedTokens += estimateMessagesTokens(dropped);
|
||||
// Note: We don't have the actual orphaned messages to add to droppedMessagesList
|
||||
// since repairToolUseResultPairing doesn't return them. This is acceptable since
|
||||
// the dropped messages are used for summarization, and orphaned tool_results
|
||||
// without their tool_use context aren't useful for summarization anyway.
|
||||
allDroppedMessages.push(...dropped);
|
||||
keptMessages = repairedKept;
|
||||
}
|
||||
|
||||
return {
|
||||
messages: keptMessages,
|
||||
droppedMessagesList: allDroppedMessages,
|
||||
droppedChunks,
|
||||
droppedMessages,
|
||||
droppedTokens,
|
||||
keptTokens: estimateMessagesTokens(keptMessages),
|
||||
budgetTokens,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a concise handoff summary for model transitions, enforcing a 4000 token limit.
|
||||
*/
|
||||
|
||||
@@ -99,6 +99,7 @@ describe("tsdown config", () => {
|
||||
"agents/model-catalog.runtime",
|
||||
"agents/models-config.runtime",
|
||||
"cli/gateway-lifecycle.runtime",
|
||||
"agents/compaction-planning.worker",
|
||||
"agents/model-provider-auth.worker",
|
||||
"plugins/memory-state",
|
||||
"subagent-registry.runtime",
|
||||
|
||||
@@ -655,6 +655,7 @@ describe("collectMissingPackPaths", () => {
|
||||
"scripts/lib/official-external-provider-catalog.json",
|
||||
"scripts/lib/package-dist-imports.mjs",
|
||||
"scripts/postinstall-bundled-plugins.mjs",
|
||||
"dist/agents/compaction-planning.worker.js",
|
||||
"dist/agents/model-provider-auth.worker.js",
|
||||
"dist/task-registry-control.runtime.js",
|
||||
"dist/telegram-ingress-worker.runtime.js",
|
||||
@@ -688,6 +689,7 @@ describe("collectMissingPackPaths", () => {
|
||||
"scripts/lib/package-dist-imports.mjs",
|
||||
"scripts/postinstall-bundled-plugins.mjs",
|
||||
"dist/plugin-sdk/root-alias.cjs",
|
||||
"dist/agents/compaction-planning.worker.js",
|
||||
"dist/agents/model-provider-auth.worker.js",
|
||||
"dist/task-registry-control.runtime.js",
|
||||
"dist/telegram-ingress-worker.runtime.js",
|
||||
|
||||
@@ -251,6 +251,7 @@ function buildCoreDistEntries(): Record<string, string> {
|
||||
"agents/model-catalog.runtime": "src/agents/model-catalog.runtime.ts",
|
||||
"agents/models-config.runtime": "src/agents/models-config.runtime.ts",
|
||||
"agents/code-mode.worker": "src/agents/code-mode.worker.ts",
|
||||
"agents/compaction-planning.worker": "src/agents/compaction-planning.worker.ts",
|
||||
"agents/model-provider-auth.worker": "src/agents/model-provider-auth.worker.ts",
|
||||
"acp/control-plane/manager": "src/acp/control-plane/manager.ts",
|
||||
"cli/gateway-lifecycle.runtime": "src/cli/gateway-cli/lifecycle.runtime.ts",
|
||||
|
||||
Reference in New Issue
Block a user