Compare commits

..

9 Commits

Author SHA1 Message Date
kevinlin-openai
020be8f434 fix(codex): version app inventory cache keys 2026-06-27 13:57:03 -07:00
zengLingbiao
1b8b8500ce fix(auto-reply): truncate user-facing text on UTF-16 boundary (#97299)
Summary:
- The PR imports `truncateUtf16Safe` and uses it for the Codex usage-limit preview and verbose working-label truncation paths in auto-reply.
- PR surface: Source +2. Total +2 across 2 files.
- Reproducibility: yes. Current main uses raw `slice(0, N)` at both reported user-facing truncation sites, and ... ludes terminal before/after output showing dangling surrogates before the fix and safe truncation after it.

Automerge notes:
- No ClawSweeper repair was needed after automerge opt-in.

Validation:
- ClawSweeper review passed for head 74a0a32ed9.
- Required merge gates passed before the squash merge.

Prepared head SHA: 74a0a32ed9
Review: https://github.com/openclaw/openclaw/pull/97299#issuecomment-4820038635

Co-authored-by: zenglingbiao <zeng.lingbiao@xydigit.com>
Approved-by: takhoffman
2026-06-27 20:39:32 +00:00
Gio Della-Libera
c29e1fe764 Add hosted external catalog feed loader (#95868)
Merged via squash.

Prepared head SHA: 76da9328af
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Co-authored-by: giodl73-repo <235387111+giodl73-repo@users.noreply.github.com>
Reviewed-by: @giodl73-repo
2026-06-27 13:20:05 -07:00
Ayaan Zaidi
c52adf7505 test(telegram): cover retained preview chunk gaps 2026-06-27 12:30:57 -07:00
Ayaan Zaidi
199700de26 fix(telegram): replay retained preview gaps 2026-06-27 12:30:57 -07:00
Hannes Rudolph
b14a95b3fd docs: revert v2026.6.9 changelog update (#97306)
This reverts commit ebf1ba70d5.
2026-06-27 13:04:51 -06:00
Hannes Rudolph
ebf1ba70d5 docs: update changelog for v2026.6.9 (#97124) 2026-06-27 12:37:53 -06:00
Hannes Rudolph
78d70230b6 docs: align v2026.6.10 changelog heading (#97297) 2026-06-27 12:37:24 -06:00
shengting
98ed83f848 fix(model-fallback): don't rethrow provider-side AbortErrors as user cancellations (#90908)
* fix(model-fallback): don't rethrow provider-side AbortErrors as user cancellations

When the LLM API closes the connection mid-stream, the fetch layer
surfaces AbortError("This operation was aborted") with no external
abort signal triggered. The old guard `shouldRethrowAbort()` returned
false for these errors (because isTimeoutError matched the message),
so they fell through to the fallback loop but were never retried —
the error propagated up and produced SILENT_REPLY_TOKEN in group
sessions, permanently silencing the topic.

Replace the guard with a direct check: only rethrow AbortError when
the external abort signal is actually set (user/gateway cancellation).
Provider-side AbortErrors without an external signal now fall through
to the next fallback candidate, giving the system a chance to recover.

* fix(cron): forward abort signal into runWithModelFallback

Thread the cron executor's abort signal into the shared
runWithModelFallback call so that cron timeouts and cancellations
stop the fallback chain instead of retrying with the next candidate.

Previously, the run callback checked params.abortSignal?.aborted and
threw, but runWithModelFallback itself had no signal — so the new
guard in model-fallback.ts could not distinguish a caller abort from
a provider-side AbortError and would retry silently.

Also adds a focused regression test verifying the signal is forwarded.

---------

Co-authored-by: Shengting Xie <shengting@openclaw.ai>
Co-authored-by: yayu <yayu@yayuMacStudio.local>
2026-06-27 20:53:16 +03:00
25 changed files with 1006 additions and 573 deletions

View File

@@ -81,7 +81,7 @@ Automatic fast mode starts short conversations quickly, then returns longer or f
- Prevents [Docker](https://docs.openclaw.ai/install/docker) and [Podman](https://docs.openclaw.ai/install/podman) setup from running unbounded on hosts where GNU timeout is installed as `gtimeout`, so image pulls, builds, and detached startup receive the intended guard. [62b2e9e](https://github.com/openclaw/openclaw/commit/62b2e9ef14b4be6fd396621c8e5e248331f08695).
### Plugins, Packaging, and QA
### Plugins and Packaging
#### Codex service-tier clearing
@@ -96,7 +96,6 @@ Automatic fast mode starts short conversations quickly, then returns longer or f
#### Doctor check ordering
- Keeps core [`openclaw doctor`](https://docs.openclaw.ai/gateway/doctor) diagnostics in their normal order before extension checks, making lint and repair output easier to follow. [PR #86627](https://github.com/openclaw/openclaw/pull/86627). Thanks @giodl73-repo.
## 2026.6.9
### Highlights

View File

@@ -18,7 +18,11 @@ describe("Codex app inventory cache", () => {
} satisfies v2.AppsListResponse;
});
const key = buildCodexAppInventoryCacheKey({ codexHome: "/codex", authProfileId: "work" });
const key = buildCodexAppInventoryCacheKey(
{ codexHome: "/codex", authProfileId: "work" },
"2026.6.27",
"2026.6.27",
);
const read = cache.read({ key, request, nowMs: 0 });
expect(read.state).toBe("missing");
expect(read.refreshScheduled).toBe(true);
@@ -33,6 +37,14 @@ describe("Codex app inventory cache", () => {
expect(fresh.snapshot?.apps.map((item) => item.id)).toEqual(["app-1", "app-2"]);
});
it("changes the cache key when either build version changes", () => {
const input = { codexHome: "/codex", authProfileId: "work" };
const baseline = buildCodexAppInventoryCacheKey(input, "2026.6.27", "2026.6.27");
expect(buildCodexAppInventoryCacheKey(input, "2026.6.28", "2026.6.27")).not.toBe(baseline);
expect(buildCodexAppInventoryCacheKey(input, "2026.6.27", "2026.6.28")).not.toBe(baseline);
});
it("can read missing inventory without scheduling app/list", async () => {
const cache = new CodexAppInventoryCache({ ttlMs: 100 });
const request = vi.fn(async () => {
@@ -76,10 +88,7 @@ describe("Codex app inventory cache", () => {
expect(snapshot.apps.map((item) => item.id)).toEqual(["app-1", "google-calendar-app"]);
expect(request).toHaveBeenCalledTimes(2);
expect(request.mock.calls.map(([, params]) => params.cursor ?? null)).toEqual([
null,
"page-2",
]);
expect(request.mock.calls.map(([, params]) => params.cursor ?? null)).toEqual([null, "page-2"]);
});
it("uses stale inventory for the current read while still refreshing asynchronously", async () => {

View File

@@ -252,9 +252,15 @@ export function serializeCodexAppInventoryError(error: unknown): Record<string,
/** Shared app inventory cache used by Codex app-server runtime paths. */
export const defaultCodexAppInventoryCache = new CodexAppInventoryCache();
/** Builds a stable cache key from runtime identity fields. */
export function buildCodexAppInventoryCacheKey(input: CodexAppInventoryCacheKeyInput): string {
/** Builds a stable cache key from build versions and runtime identity fields. */
export function buildCodexAppInventoryCacheKey(
input: CodexAppInventoryCacheKeyInput,
openClawVersion: string,
codexPluginVersion: string,
): string {
return JSON.stringify({
openClawVersion,
codexPluginVersion,
codexHome: input.codexHome ?? null,
endpoint: input.endpoint ?? null,
runtimeIdentity: normalizeRuntimeIdentityForCacheKey(input.runtimeIdentity),

View File

@@ -3,13 +3,19 @@
* auth, account, and version inputs without storing secret material.
*/
import { createHash } from "node:crypto";
import { createRequire } from "node:module";
import { OPENCLAW_VERSION } from "openclaw/plugin-sdk/agent-harness-runtime";
import { readPluginPackageVersion } from "openclaw/plugin-sdk/extension-shared";
import {
buildCodexAppInventoryCacheKey,
type CodexAppInventoryCacheKeyInput,
} from "./app-inventory-cache.js";
import { resolveCodexAppServerHomeDir } from "./auth-bridge.js";
import type { CodexAppServerRuntimeOptions, CodexAppServerStartOptions } from "./config.js";
import type { CodexAppServerRuntimeIdentity } from "./client.js";
import type { CodexAppServerRuntimeOptions, CodexAppServerStartOptions } from "./config.js";
const require = createRequire(import.meta.url);
const CODEX_PLUGIN_VERSION = readPluginPackageVersion({ require });
/** Inputs that identify the Codex app inventory cache scope for one runtime. */
export type CodexPluginAppCacheKeyParams = Omit<
@@ -23,17 +29,21 @@ export type CodexPluginAppCacheKeyParams = Omit<
/** Builds the full app inventory cache key for Codex plugin/app discovery. */
export function buildCodexPluginAppCacheKey(params: CodexPluginAppCacheKeyParams): string {
return buildCodexAppInventoryCacheKey({
codexHome:
params.runtimeIdentity?.codexHome ??
resolveCodexPluginAppCacheCodexHome(params.appServer, params.agentDir),
endpoint: resolveCodexPluginAppCacheEndpoint(params.appServer),
authProfileId: params.authProfileId,
accountId: params.accountId,
envApiKeyFingerprint: params.envApiKeyFingerprint,
appServerVersion: params.appServerVersion ?? params.runtimeIdentity?.serverVersion,
runtimeIdentity: params.runtimeIdentity,
});
return buildCodexAppInventoryCacheKey(
{
codexHome:
params.runtimeIdentity?.codexHome ??
resolveCodexPluginAppCacheCodexHome(params.appServer, params.agentDir),
endpoint: resolveCodexPluginAppCacheEndpoint(params.appServer),
authProfileId: params.authProfileId,
accountId: params.accountId,
envApiKeyFingerprint: params.envApiKeyFingerprint,
appServerVersion: params.appServerVersion ?? params.runtimeIdentity?.serverVersion,
runtimeIdentity: params.runtimeIdentity,
},
OPENCLAW_VERSION,
CODEX_PLUGIN_VERSION,
);
}
/** Builds a durable thread-binding fingerprint for one initialized app-server runtime. */

View File

@@ -4445,12 +4445,12 @@ describe("createTelegramBot", () => {
});
expect(sendMessageSpy.mock.calls.length).toBeGreaterThan(1);
for (const call of sendMessageSpy.mock.calls) {
for (const [index, call] of sendMessageSpy.mock.calls.entries()) {
const params = call[2] as
| { reply_to_message_id?: number; reply_parameters?: { message_id?: number } }
| undefined;
const actual = params?.reply_parameters?.message_id ?? params?.reply_to_message_id;
if (mode === "all") {
if (mode === "all" || index === 0) {
expect(actual).toBe(messageId);
} else {
expect(actual).toBeUndefined();

View File

@@ -326,39 +326,36 @@ async function sendTelegramVoiceFallbackText(opts: {
silent?: boolean;
replyMarkup?: ReturnType<typeof buildInlineKeyboard>;
replyQuoteText?: string;
replyToMode?: ReplyToMode;
}): Promise<number | undefined> {
let firstDeliveredMessageId: number | undefined;
const chunks = filterEmptyTelegramTextChunks(opts.chunkText(opts.text));
await sendChunkedTelegramReplyText({
chunks,
progress: { hasReplied: false, hasDelivered: false },
replyToId: opts.replyToId,
replyToMode: opts.replyToMode ?? "first",
replyMarkup: opts.replyMarkup,
replyQuoteText: opts.replyQuoteText,
quoteOnlyOnFirstChunk: true,
sendChunk: async ({ chunk, replyToMessageId, replyMarkup, replyQuoteText }) => {
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
replyToMessageId,
replyQuoteMessageId: replyToMessageId ? opts.replyQuoteMessageId : undefined,
replyQuoteText,
replyQuotePosition: replyToMessageId ? opts.replyQuotePosition : undefined,
replyQuoteEntities: replyToMessageId ? opts.replyQuoteEntities : undefined,
thread: opts.thread,
textMode: chunk.textMode,
plainText: chunk.plainText,
richMessages: opts.richMessages,
linkPreview: opts.linkPreview,
tableMode: opts.tableMode,
silent: opts.silent,
replyMarkup,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = messageId;
}
},
});
let appliedReplyTo = false;
for (const chunk of chunks) {
// Only apply reply reference, quote text, and buttons to the first chunk.
const replyToForChunk = !appliedReplyTo ? opts.replyToId : undefined;
const applyQuoteForChunk = !appliedReplyTo;
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.text, opts.runtime, {
replyToMessageId: replyToForChunk,
replyQuoteMessageId: applyQuoteForChunk ? opts.replyQuoteMessageId : undefined,
replyQuoteText: applyQuoteForChunk ? opts.replyQuoteText : undefined,
replyQuotePosition: applyQuoteForChunk ? opts.replyQuotePosition : undefined,
replyQuoteEntities: applyQuoteForChunk ? opts.replyQuoteEntities : undefined,
thread: opts.thread,
textMode: chunk.textMode,
plainText: chunk.plainText,
richMessages: opts.richMessages,
linkPreview: opts.linkPreview,
tableMode: opts.tableMode,
silent: opts.silent,
replyMarkup: !appliedReplyTo ? opts.replyMarkup : undefined,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = messageId;
}
if (replyToForChunk) {
appliedReplyTo = true;
}
}
return firstDeliveredMessageId;
}
@@ -538,7 +535,6 @@ async function deliverMediaReply(params: {
silent: params.silent,
replyMarkup: params.replyMarkup,
replyQuoteText: params.replyQuoteText,
replyToMode: params.replyToMode,
});
if (firstDeliveredMessageId == null) {
firstDeliveredMessageId = fallbackMessageId;

View File

@@ -1484,7 +1484,7 @@ describe("deliverReplies", () => {
expectRecordFields(mockCallArg(sendMessage, 0, 2), { disable_notification: true });
});
it("voice fallback avoids native replies for chunked first-mode fallback text", async () => {
it("voice fallback applies reply-to only on first chunk when replyToMode is first", async () => {
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
voiceError: createVoiceMessagesForbiddenError(),
sendMessageResult: {
@@ -1520,12 +1520,15 @@ describe("deliverReplies", () => {
expect(sendVoice).toHaveBeenCalledTimes(1);
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_parameters: {
message_id: 77,
quote: "quoted context",
allow_sending_without_reply: true,
},
reply_markup: {
inline_keyboard: [[{ text: "Ack", callback_data: "ack" }]],
},
});
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_to_message_id");
expect(mockCallArg(sendMessage, 0, 2)).not.toHaveProperty("reply_parameters");
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id", 77);
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_parameters");
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_markup");
@@ -1552,32 +1555,7 @@ describe("deliverReplies", () => {
expect(sendMessage).not.toHaveBeenCalled();
});
it("replyToMode 'first' keeps native reply-to for a single text chunk", async () => {
const runtime = createRuntime();
const sendMessage = vi.fn().mockResolvedValue({
message_id: 20,
chat: { id: "123" },
});
const bot = createBot({ sendMessage });
await deliverReplies({
replies: [{ text: "one chunk", replyToId: "700" }],
chatId: "123",
token: "tok",
runtime,
bot,
replyToMode: "first",
textLimit: 4000,
});
expect(sendMessage).toHaveBeenCalledTimes(1);
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_to_message_id: 700,
allow_sending_without_reply: true,
});
});
it("replyToMode 'first' avoids native reply-to for chunked text", async () => {
it("replyToMode 'first' only applies reply-to to the first text chunk", async () => {
const runtime = createRuntime();
const sendMessage = vi.fn().mockResolvedValue({
message_id: 20,
@@ -1597,10 +1575,13 @@ describe("deliverReplies", () => {
});
expect(sendMessage.mock.calls.length).toBeGreaterThanOrEqual(2);
for (const call of sendMessage.mock.calls) {
expect(call[2]).not.toHaveProperty("reply_to_message_id");
expect(call[2]).not.toHaveProperty("reply_parameters");
}
// First chunk should have reply_to_message_id
expectRecordFields(mockCallArg(sendMessage, 0, 2), {
reply_to_message_id: 700,
allow_sending_without_reply: true,
});
// Second chunk should NOT have reply_to_message_id
expect(mockCallArg(sendMessage, 1, 2)).not.toHaveProperty("reply_to_message_id");
});
it("clamps reply chunks to Telegram rich message limit", async () => {

View File

@@ -184,9 +184,9 @@ describe("buildTelegramThreadParams", () => {
{ input: { id: 0, scope: "dm" as const }, expected: undefined },
{ input: { id: -1, scope: "dm" as const }, expected: undefined },
{ input: { id: 1.9, scope: "dm" as const }, expected: { message_thread_id: 1 } },
// id=0 should be included for forum scope (not falsy).
// id=0 should be included for forum and none scopes (not falsy)
{ input: { id: 0, scope: "forum" as const }, expected: { message_thread_id: 0 } },
{ input: { id: 42, scope: "none" as const }, expected: undefined },
{ input: { id: 0, scope: "none" as const }, expected: { message_thread_id: 0 } },
])("builds thread params", ({ input, expected }) => {
expect(buildTelegramThreadParams(input)).toEqual(expected);
});

View File

@@ -427,10 +427,6 @@ export function buildTelegramThreadParams(thread?: TelegramThreadSpec | null) {
return normalized > 0 ? { message_thread_id: normalized } : undefined;
}
if (thread.scope === "none") {
return undefined;
}
// Telegram rejects message_thread_id=1 for General forum topic
if (normalized === TELEGRAM_GENERAL_TOPIC_ID) {
return undefined;

View File

@@ -1,6 +1,5 @@
// Telegram plugin module implements reply threading behavior.
import type { ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
export type DeliveryProgress = {
hasReplied: boolean;
@@ -49,24 +48,17 @@ export async function sendChunkedTelegramReplyText<
}) => Promise<void>;
}): Promise<void> {
const applyDelivered = params.markDelivered ?? markDelivered;
const suppressSingleUseReply =
params.chunks.length > 1 && isSingleUseReplyToMode(params.replyToMode);
for (let i = 0; i < params.chunks.length; i += 1) {
const chunk = params.chunks[i];
if (!chunk) {
continue;
}
const isFirstChunk = i === 0;
// Telegram Desktop can render long formatted native-reply chunks as
// unsupported messages. Multi-part `first` replies consume the reply target
// without adding native reply params, preserving visible text.
const replyToMessageId = suppressSingleUseReply
? undefined
: resolveReplyToForSend({
replyToId: params.replyToId,
replyToMode: params.replyToMode,
progress: params.progress,
});
const replyToMessageId = resolveReplyToForSend({
replyToId: params.replyToId,
replyToMode: params.replyToMode,
progress: params.progress,
});
const shouldAttachQuote =
Boolean(replyToMessageId) &&
Boolean(params.replyQuoteText) &&
@@ -78,10 +70,7 @@ export async function sendChunkedTelegramReplyText<
replyMarkup: isFirstChunk ? params.replyMarkup : undefined,
replyQuoteText: shouldAttachQuote ? params.replyQuoteText : undefined,
});
markReplyApplied(
params.progress,
suppressSingleUseReply && isFirstChunk ? params.replyToId : replyToMessageId,
);
markReplyApplied(params.progress, replyToMessageId);
applyDelivered(params.progress);
}
}

View File

@@ -82,37 +82,19 @@ describe("telegram channel message adapter", () => {
};
const provePayload = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({
messageId: "tg-payload-2",
chatId: "12345",
receipt: {
primaryPlatformMessageId: "tg-payload-1",
platformMessageIds: ["tg-payload-1", "tg-payload-2"],
parts: [
{ platformMessageId: "tg-payload-1", kind: "text", index: 0 },
{ platformMessageId: "tg-payload-2", kind: "text", index: 1 },
],
sentAt: 123,
},
});
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-payload", chatId: "12345" });
const result = await adapter.send!.payload!({
cfg: {} as never,
to: "12345",
text: "payload",
payload: { text: "payload" },
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
threadId: "12",
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith("12345", "payload", {
cfg: {},
verbose: false,
messageThreadId: 12,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
messageThreadId: undefined,
replyToMessageId: undefined,
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -122,8 +104,7 @@ describe("telegram channel message adapter", () => {
quoteText: undefined,
buttons: undefined,
});
expect(result.receipt.primaryPlatformMessageId).toBe("tg-payload-1");
expect(result.receipt.platformMessageIds).toEqual(["tg-payload-1", "tg-payload-2"]);
expect(result.receipt.platformMessageIds).toEqual(["tg-payload"]);
};
const proveReplyThreadSilent = async () => {
@@ -133,8 +114,6 @@ describe("telegram channel message adapter", () => {
to: "12345",
text: "threaded",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
threadId: "12",
silent: true,
deps: { sendTelegram: sendMessageTelegramMock },
@@ -144,8 +123,6 @@ describe("telegram channel message adapter", () => {
verbose: false,
messageThreadId: 12,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
accountId: undefined,
silent: true,
gatewayClientScopes: undefined,
@@ -161,9 +138,6 @@ describe("telegram channel message adapter", () => {
cfg: {} as never,
to: "12345",
text: "batch",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
payload: {
text: "batch",
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
@@ -178,9 +152,7 @@ describe("telegram channel message adapter", () => {
cfg: {},
verbose: false,
messageThreadId: undefined,
replyToMessageId: 900,
replyToIdSource: "implicit",
replyToMode: "first",
replyToMessageId: undefined,
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -200,8 +172,6 @@ describe("telegram channel message adapter", () => {
verbose: false,
messageThreadId: undefined,
replyToMessageId: undefined,
replyToIdSource: undefined,
replyToMode: undefined,
accountId: undefined,
silent: undefined,
gatewayClientScopes: undefined,
@@ -252,36 +222,6 @@ describe("telegram channel message adapter", () => {
});
});
it("keeps implicit first replies on the first delivered payload media", async () => {
const adapter = requireTelegramMessageAdapter();
sendMessageTelegramMock
.mockResolvedValueOnce({ messageId: "tg-media-1", chatId: "12345" })
.mockResolvedValueOnce({ messageId: "tg-media-2", chatId: "12345" });
await adapter.send!.payload!({
cfg: {} as never,
to: "12345",
text: "batch",
replyToId: "900",
replyToIdSource: "implicit",
replyToMode: "first",
payload: {
text: "batch",
mediaUrls: ["", "https://example.com/a.png", "https://example.com/b.png"],
},
deps: { sendTelegram: sendMessageTelegramMock },
});
const firstOpts = sendMessageTelegramMock.mock.calls[0]?.[2] as
| { replyToMessageId?: number }
| undefined;
const secondOpts = sendMessageTelegramMock.mock.calls[1]?.[2] as
| { replyToMessageId?: number }
| undefined;
expect(firstOpts?.replyToMessageId).toBe(900);
expect(secondOpts?.replyToMessageId).toBeUndefined();
});
it("backs declared live preview finalizer capabilities with adapter proofs", async () => {
const adapter = requireTelegramMessageAdapter();

View File

@@ -364,7 +364,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
};
const candidateTexts = [stream.lastDeliveredText?.(), lane.lastPartialText];
if (useFinalTextRecovery && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText)) {
if (
useFinalTextRecovery &&
remainingChunks.length === 0 &&
isPotentialTruncatedFinal(activeFullText)
) {
const resolvedFullCandidate = await params.resolveFinalTextCandidate?.({
finalText: text,
laneName,
@@ -379,7 +383,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
const retainedPreview =
useFinalTextRecovery && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText)
useFinalTextRecovery &&
remainingChunks.length === 0 &&
isPotentialTruncatedFinal(activeFullText)
? selectLongerFinalText({
finalText: activeFullText,
candidateTexts,
@@ -443,9 +449,20 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
} else {
await params.flushDraftLane(lane);
}
const activeChunkIndexAfterStop = useFinalTextRecovery ? clampActiveChunkIndex() : activeChunkIndex;
const activeChunkAfterStop = chunks[activeChunkIndexAfterStop] ?? activeChunk;
const remainingChunksAfterStop = chunks.slice(activeChunkIndexAfterStop + 1);
const activeChunkIndexAfterStop = useFinalTextRecovery
? clampActiveChunkIndex()
: activeChunkIndex;
const deliveredStreamTextAfterStop = stream.lastDeliveredText?.();
const retainedOriginalActiveChunkAfterStop =
activeChunkIndexAfterStop > activeChunkIndex &&
deliveredStreamTextAfterStop === activeChunk.trimEnd();
// `activeChunkIndex` is advanced by retained preview callbacks. If callbacks
// outrun the stream's delivered text, trust the delivered text and replay the gap.
const effectiveActiveChunkIndexAfterStop = retainedOriginalActiveChunkAfterStop
? activeChunkIndex
: activeChunkIndexAfterStop;
const activeChunkAfterStop = chunks[effectiveActiveChunkIndexAfterStop] ?? activeChunk;
const remainingChunksAfterStop = chunks.slice(effectiveActiveChunkIndexAfterStop + 1);
const messageId = stream.messageId();
if (typeof messageId !== "number") {
@@ -460,16 +477,12 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
return undefined;
}
const deliveredStreamTextAfterStop = stream.lastDeliveredText?.();
const activeChunkTextAfterStop = activeChunkAfterStop.trimEnd();
const retainedActiveChunkAfterStop =
activeChunkIndexAfterStop !== activeChunkIndex &&
deliveredStreamTextAfterStop === activeChunk.trimEnd();
if (
finalizePreview &&
deliveredStreamTextAfterStop !== undefined &&
deliveredStreamTextAfterStop !== activeChunkTextAfterStop &&
!retainedActiveChunkAfterStop
!retainedOriginalActiveChunkAfterStop
) {
if (
useFinalTextRecovery &&

View File

@@ -228,10 +228,7 @@ describe("createLaneTextDeliverer", () => {
expect(answer.update).toHaveBeenCalledWith(previousBlock);
expect(answer.update).not.toHaveBeenCalledWith(nextAssistantBlock);
expect(harness.clearDraftLane).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
{ text: previousBlock },
{ durable: false },
);
expect(harness.sendPayload).toHaveBeenCalledWith({ text: previousBlock }, { durable: false });
expect(harness.sendPayload).not.toHaveBeenCalledWith(
{ text: nextAssistantBlock },
expect.anything(),
@@ -924,7 +921,7 @@ describe("createLaneTextDeliverer", () => {
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("does not resend chunks retained while stopping a long streamed final", async () => {
it("sends chunks after the reported delivered text when stop advances the retained index", async () => {
const answer = createTestDraftStream({ messageId: 999 });
const harness = createHarness({
answerStream: answer,
@@ -940,11 +937,35 @@ describe("createLaneTextDeliverer", () => {
const delivery = expectPreviewFinalized(result);
expect(delivery.content).toBe("Hello world again");
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith({ text: " again" });
expect(delivery.promptContextContent).toBe("Hello");
expect(harness.sendPayload).toHaveBeenCalledTimes(2);
expect(harness.sendPayload).toHaveBeenNthCalledWith(1, { text: " world" });
expect(harness.sendPayload).toHaveBeenNthCalledWith(2, { text: " again" });
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("does not skip middle chunks when stop advances past the reported delivered chunk", async () => {
const answer = createTestDraftStream({ messageId: 999 });
const harness = createHarness({
answerStream: answer,
draftMaxChars: 6,
splitFinalTextForStream: () => ["chunk0", "chunk1", "chunk2", "chunk3"],
});
harness.lanes.answer.hasStreamedMessage = true;
answer.stop.mockImplementation(async () => {
harness.lanes.answer.activeChunkIndex = 2;
});
const result = await deliverFinalAnswer(harness, "chunk0chunk1chunk2chunk3");
const delivery = expectPreviewFinalized(result);
expect(delivery.promptContextContent).toBe("chunk0");
expect(harness.sendPayload).toHaveBeenCalledTimes(3);
expect(harness.sendPayload).toHaveBeenNthCalledWith(1, { text: "chunk1" });
expect(harness.sendPayload).toHaveBeenNthCalledWith(2, { text: "chunk2" });
expect(harness.sendPayload).toHaveBeenNthCalledWith(3, { text: "chunk3" });
});
it("compares retained delivered prefixes against the full final text", async () => {
let deliveredText = "Hello";
const answer = createTestDraftStream({ messageId: 999 });
@@ -995,11 +1016,15 @@ describe("createLaneTextDeliverer", () => {
expect(harness.editStreamMessage).toHaveBeenCalledWith({
laneName: "answer",
messageId: 999,
text: " world",
text: "Hello",
buttons,
});
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith({
expect(harness.sendPayload).toHaveBeenCalledTimes(2);
expect(harness.sendPayload).toHaveBeenNthCalledWith(1, {
text: " world",
channelData: { telegram: { buttons } },
});
expect(harness.sendPayload).toHaveBeenNthCalledWith(2, {
text: " again",
channelData: { telegram: { buttons } },
});

View File

@@ -19,7 +19,6 @@ import {
resolvePayloadMediaUrls,
sendPayloadMediaSequenceOrFallback,
} from "openclaw/plugin-sdk/reply-payload";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-chunking";
import type { TelegramInlineButtons } from "./button-types.js";
@@ -61,8 +60,6 @@ async function resolveTelegramSendContext(params: {
deps?: OutboundSendDeps;
accountId?: string | null;
replyToId?: string | null;
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
replyToMode?: TelegramSendOpts["replyToMode"];
threadId?: string | number | null;
formatting?: OutboundDeliveryFormattingOptions;
silent?: boolean;
@@ -77,8 +74,6 @@ async function resolveTelegramSendContext(params: {
tableMode?: OutboundDeliveryFormattingOptions["tableMode"];
messageThreadId?: number;
replyToMessageId?: number;
replyToIdSource?: TelegramSendOpts["replyToIdSource"];
replyToMode?: TelegramSendOpts["replyToMode"];
accountId?: string;
silent?: boolean;
gatewayClientScopes?: readonly string[];
@@ -92,8 +87,6 @@ async function resolveTelegramSendContext(params: {
cfg: params.cfg,
messageThreadId: parseTelegramThreadId(params.threadId),
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
...(params.replyToIdSource !== undefined ? { replyToIdSource: params.replyToIdSource } : {}),
...(params.replyToMode !== undefined ? { replyToMode: params.replyToMode } : {}),
accountId: params.accountId ?? undefined,
silent: params.silent,
gatewayClientScopes: params.gatewayClientScopes,
@@ -158,19 +151,6 @@ export async function sendTelegramPayloadMessages(params: {
quoteText,
...(params.payload.audioAsVoice === true ? { asVoice: true } : {}),
};
const shouldConsumeImplicitReplyTarget =
payloadOpts.replyToIdSource === "implicit" &&
payloadOpts.replyToMode !== undefined &&
isSingleUseReplyToMode(payloadOpts.replyToMode);
const consumedImplicitReplyPayloadOpts = shouldConsumeImplicitReplyTarget
? {
...payloadOpts,
replyToMessageId: undefined,
replyToIdSource: undefined,
replyToMode: undefined,
}
: payloadOpts;
let implicitReplyTargetAvailable = true;
if (reactionEmoji) {
if (typeof replyToMessageId !== "number") {
throw new Error("Telegram reaction requires a reply target");
@@ -199,18 +179,12 @@ export async function sendTelegramPayloadMessages(params: {
...payloadOpts,
buttons,
}),
send: async ({ text: textLocal, mediaUrl, isFirst }) => {
const mediaPayloadOpts =
shouldConsumeImplicitReplyTarget && !implicitReplyTargetAvailable
? consumedImplicitReplyPayloadOpts
: payloadOpts;
implicitReplyTargetAvailable = false;
return await params.send(params.to, textLocal, {
...mediaPayloadOpts,
send: async ({ text: textLocal, mediaUrl, isFirst }) =>
await params.send(params.to, textLocal, {
...payloadOpts,
mediaUrl,
...(isFirst ? { buttons } : {}),
});
},
}),
});
}

View File

@@ -1650,49 +1650,6 @@ describe("sendMessageTelegram", () => {
expect(res.messageId).toBe("71");
});
it("does not reuse first-mode reply-to on media caption follow-up text", async () => {
const chatId = "123";
const longText = "A".repeat(1100);
const sendPhoto = vi.fn().mockResolvedValue({
message_id: 70,
chat: { id: chatId },
});
const sendMessage = vi.fn().mockResolvedValue({
message_id: 71,
chat: { id: chatId },
});
const api = { sendPhoto, sendMessage } as unknown as {
sendPhoto: typeof sendPhoto;
sendMessage: typeof sendMessage;
};
mockLoadedMedia({
buffer: Buffer.from("fake-image"),
contentType: "image/jpeg",
fileName: "photo.jpg",
});
await sendMessageTelegram(chatId, longText, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
mediaUrl: "https://example.com/photo.jpg",
replyToMessageId: 500,
replyToIdSource: "implicit",
replyToMode: "first",
});
expectMediaSendCall(firstMockCall(sendPhoto, "send photo call"), "send photo call", chatId, {
caption: undefined,
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
expect(sendMessage).toHaveBeenCalledWith(chatId, longText, {
parse_mode: "HTML",
});
});
it("chunks long default markdown media follow-up text", async () => {
const chatId = "123";
const longText = `**${"A".repeat(5000)}**`;
@@ -1701,10 +1658,7 @@ describe("sendMessageTelegram", () => {
message_id: 72,
chat: { id: chatId },
});
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 73, chat: { id: chatId } })
.mockResolvedValueOnce({ message_id: 74, chat: { id: chatId } });
const sendMessage = vi.fn().mockResolvedValue({ message_id: 74, chat: { id: chatId } });
const api = { sendPhoto, sendMessage } as unknown as {
sendPhoto: typeof sendPhoto;
sendMessage: typeof sendMessage;
@@ -1730,9 +1684,6 @@ describe("sendMessageTelegram", () => {
expect(sendMessage.mock.calls.every((call) => call[2]?.parse_mode === "HTML")).toBe(true);
expect(sendMessage.mock.calls.map((call) => String(call[1] ?? "")).join("")).toContain("A");
expect(res.messageId).toBe("74");
expect(res.receipt?.primaryPlatformMessageId).toBe("73");
expect(res.receipt?.platformMessageIds).toEqual(["73", "74"]);
expect(res.receipt?.parts.map((part) => part.kind)).toEqual(["text", "text"]);
});
it("uses caption when text is within 1024 char limit", async () => {
@@ -2548,93 +2499,6 @@ describe("sendMessageTelegram", () => {
}
});
it("returns a multipart receipt and avoids native replies for chunked first-mode text", async () => {
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
const result = await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
messageThreadId: 271,
replyToMessageId: 500,
replyToIdSource: "implicit",
replyToMode: "first",
});
expect(sendMessage).toHaveBeenCalledTimes(2);
expect(sendMessage.mock.calls[0]?.[2]).toEqual({
parse_mode: "HTML",
message_thread_id: 271,
});
expect(sendMessage.mock.calls[1]?.[2]).toEqual({
parse_mode: "HTML",
message_thread_id: 271,
});
expect(result.messageId).toBe("102");
expect(result.receipt?.primaryPlatformMessageId).toBe("101");
expect(result.receipt?.platformMessageIds).toEqual(["101", "102"]);
expect(result.receipt?.threadId).toBe("271");
expect(result.receipt?.replyToId).toBeUndefined();
expect(
result.receipt?.parts.map(({ platformMessageId, kind, index, threadId, replyToId }) => ({
platformMessageId,
kind,
index,
threadId,
replyToId,
})),
).toEqual([
{
platformMessageId: "101",
kind: "text",
index: 0,
threadId: "271",
replyToId: undefined,
},
{
platformMessageId: "102",
kind: "text",
index: 1,
threadId: "271",
replyToId: undefined,
},
]);
});
it("keeps explicit native replies for chunked first-mode text", async () => {
const sendMessage = vi
.fn()
.mockResolvedValueOnce({ message_id: 101, chat: { id: "-1001234567890" } })
.mockResolvedValueOnce({ message_id: 102, chat: { id: "-1001234567890" } });
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
await sendMessageTelegram("-1001234567890", `BEGIN ${"A".repeat(4100)} END`, {
cfg: TELEGRAM_TEST_CFG,
token: "tok",
api,
replyToMessageId: 500,
replyToIdSource: "explicit",
replyToMode: "first",
});
expect(sendMessage.mock.calls[0]?.[2]).toMatchObject({
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
expect(sendMessage.mock.calls[1]?.[2]).toMatchObject({
reply_to_message_id: 500,
allow_sending_without_reply: true,
});
});
it("fails topic sends instead of retrying without message_thread_id", async () => {
const cases = [{ name: "forum", chatId: "-100123", text: "hello forum" }] as const;
const threadErr = new Error("400: Bad Request: message thread not found");

View File

@@ -3,17 +3,12 @@ import * as grammy from "grammy";
import { type ApiClientOptions, Bot, HttpError } from "grammy";
import type { ReactionType, ReactionTypeEmoji } from "grammy/types";
import { recordChannelActivity } from "openclaw/plugin-sdk/channel-activity-runtime";
import {
createMessageReceiptFromOutboundResults,
type MessageReceipt,
} from "openclaw/plugin-sdk/channel-outbound";
import type { MarkdownTableMode, ReplyToMode } from "openclaw/plugin-sdk/config-contracts";
import type { MarkdownTableMode } from "openclaw/plugin-sdk/config-contracts";
import { isDiagnosticFlagEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
import { formatUncaughtError } from "openclaw/plugin-sdk/error-runtime";
import { redactSensitiveText } from "openclaw/plugin-sdk/logging-core";
import { parseStrictInteger } from "openclaw/plugin-sdk/number-runtime";
import { resolveChunkMode, resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-chunking";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import { createTelegramRetryRunner, type RetryConfig } from "openclaw/plugin-sdk/retry-runtime";
import { createSubsystemLogger, logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
@@ -89,8 +84,6 @@ type TelegramEditMessageCaptionParams = Parameters<TelegramApi["editMessageCapti
type TelegramCreateForumTopicParams = NonNullable<Parameters<TelegramApi["createForumTopic"]>[2]>;
type TelegramThreadScopedParams = {
message_thread_id?: number;
reply_parameters?: { message_id?: number };
reply_to_message_id?: number;
};
const InputFileCtor = grammy.InputFile;
const MAX_TELEGRAM_PHOTO_DIMENSION_SUM = 10_000;
@@ -118,10 +111,6 @@ type TelegramSendOpts = {
silent?: boolean;
/** Message ID to reply to (for threading) */
replyToMessageId?: number;
/** Whether replyToMessageId came from ambient context or explicit payload/action input. */
replyToIdSource?: "explicit" | "implicit";
/** Controls whether replyToMessageId is applied to every internal text chunk. */
replyToMode?: ReplyToMode;
/** Quote text for Telegram reply_parameters. */
quoteText?: string;
/** Forum topic thread ID (for forum supergroups) */
@@ -135,7 +124,6 @@ type TelegramSendOpts = {
type TelegramSendResult = {
messageId: string;
chatId: string;
receipt?: MessageReceipt;
};
type TelegramMessageLike = {
@@ -286,42 +274,6 @@ function logTelegramOutboundSendOk(params: TelegramOutboundSuccessLogParams): vo
sendLogger.info(parts.join(" "));
}
function buildTelegramTextSendReceipt(params: {
messageIds: readonly string[];
chatId: string;
messageThreadId?: number;
replyToMessageId?: number;
}): MessageReceipt | undefined {
if (params.messageIds.length <= 1) {
return undefined;
}
return createMessageReceiptFromOutboundResults({
results: params.messageIds.map((messageId) => ({
messageId,
chatId: params.chatId,
})),
kind: "text",
...(typeof params.messageThreadId === "number"
? { threadId: String(params.messageThreadId) }
: {}),
...(typeof params.replyToMessageId === "number"
? { replyToId: String(params.replyToMessageId) }
: {}),
});
}
function resolveAcceptedReplyToMessageId(
params: TelegramThreadScopedParams | TelegramRichMessageContextParams | undefined,
): number | undefined {
if (!params) {
return undefined;
}
if ("reply_to_message_id" in params) {
return params.reply_to_message_id;
}
return params.reply_parameters?.message_id;
}
const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i;
const MESSAGE_NOT_MODIFIED_RE =
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
@@ -709,26 +661,19 @@ export async function sendMessageTelegram(
(typeof account.config.mediaMaxMb === "number" ? account.config.mediaMaxMb : 100) * 1024 * 1024;
const replyMarkup = buildInlineKeyboard(opts.buttons);
const threadSpec = resolveTelegramSendThreadSpec({
targetMessageThreadId: target.messageThreadId,
messageThreadId: opts.messageThreadId,
chatType: target.chatType,
const threadParams = buildTelegramThreadReplyParams({
thread: resolveTelegramSendThreadSpec({
targetMessageThreadId: target.messageThreadId,
messageThreadId: opts.messageThreadId,
chatType: target.chatType,
}),
replyToMessageId: opts.replyToMessageId,
replyQuoteText: opts.quoteText,
useReplyIdAsQuoteSource: true,
});
const singleUseReplyTo =
opts.replyToIdSource === "implicit" &&
opts.replyToMode !== undefined &&
isSingleUseReplyToMode(opts.replyToMode);
const buildThreadParams = (includeReplyTo: boolean) =>
buildTelegramThreadReplyParams({
thread: threadSpec,
...(includeReplyTo
? {
replyToMessageId: opts.replyToMessageId,
replyQuoteText: opts.quoteText,
useReplyIdAsQuoteSource: true,
}
: {}),
});
const richThreadParams = toTelegramRichMessageContextParams(threadParams);
const hasThreadParams = Object.keys(threadParams).length > 0;
const hasRichThreadParams = Object.keys(richThreadParams).length > 0;
const requestWithDiag = createTelegramNonIdempotentRequestWithDiag({
cfg,
account,
@@ -801,59 +746,29 @@ export async function sendMessageTelegram(
return { result, acceptedParams: params };
};
const shouldIncludeReplyForChunk = (
index: number,
chunkCount: number,
replyToAlreadyUsed: boolean,
) =>
// Telegram Desktop can render long formatted reply chunks as unsupported messages.
// Multi-part `first` replies keep chat/topic routing but avoid hiding chunk text.
!replyToAlreadyUsed && (!singleUseReplyTo || (chunkCount === 1 && index === 0));
const buildTextParams = (
index: number,
chunkCount: number,
isLastChunk: boolean,
replyToAlreadyUsed: boolean,
) => {
const params = buildThreadParams(
shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed),
);
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
const buildTextParams = (isLastChunk: boolean) =>
hasThreadParams || (isLastChunk && replyMarkup)
? {
...params,
...threadParams,
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
}
: undefined;
};
const buildRichTextParams = (
index: number,
chunkCount: number,
isLastChunk: boolean,
replyToAlreadyUsed: boolean,
) => {
const params = toTelegramRichMessageContextParams(
buildThreadParams(shouldIncludeReplyForChunk(index, chunkCount, replyToAlreadyUsed)),
);
return Object.keys(params).length > 0 || (isLastChunk && replyMarkup)
const buildRichTextParams = (isLastChunk: boolean) =>
hasRichThreadParams || (isLastChunk && replyMarkup)
? {
...params,
...richThreadParams,
...(isLastChunk && replyMarkup ? { reply_markup: replyMarkup } : {}),
}
: undefined;
};
const sendTelegramTextChunks = async (
chunks: TelegramTextChunk[],
context: string,
options: { replyToAlreadyUsed?: boolean } = {},
): Promise<TelegramSendResult> => {
): Promise<{ messageId: string; chatId: string }> => {
let lastMessageId = "";
let lastChatId = chatId;
let lastAcceptedParams: TelegramThreadScopedParams | undefined;
let acceptedReplyToMessageId: number | undefined;
const messageIds: string[] = [];
let sentChunkCount = 0;
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
@@ -862,12 +777,7 @@ export async function sendMessageTelegram(
}
const { result: res, acceptedParams } = await sendTelegramTextChunk(
chunk,
buildTextParams(
index,
chunks.length,
index === chunks.length - 1,
options.replyToAlreadyUsed === true,
),
buildTextParams(index === chunks.length - 1),
);
const messageId = resolveTelegramMessageIdOrThrow(res, context);
recordSentMessage(chatId, messageId, cfg);
@@ -885,8 +795,6 @@ export async function sendMessageTelegram(
lastMessageId = String(messageId);
lastChatId = String(res?.chat?.id ?? chatId);
lastAcceptedParams = acceptedParams;
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
messageIds.push(lastMessageId);
sentChunkCount += 1;
}
if (lastMessageId) {
@@ -902,17 +810,7 @@ export async function sendMessageTelegram(
chunkCount: sentChunkCount,
});
}
const receipt = buildTelegramTextSendReceipt({
messageIds,
chatId: lastChatId,
messageThreadId: lastAcceptedParams?.message_thread_id,
replyToMessageId: acceptedReplyToMessageId,
});
return {
messageId: lastMessageId,
chatId: lastChatId,
...(receipt ? { receipt } : {}),
};
return { messageId: lastMessageId, chatId: lastChatId };
};
const buildChunkedTextPlan = (rawText: string, context: string): TelegramTextChunk[] => {
@@ -943,14 +841,10 @@ export async function sendMessageTelegram(
}));
};
const sendChunkedText = async (
rawText: string,
context: string,
options: { replyToAlreadyUsed?: boolean } = {},
) =>
const sendChunkedText = async (rawText: string, context: string) =>
useRichMessages
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context, options)
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context, options);
? await sendTelegramRichTextChunks(buildRichTextPlan(rawText), context)
: await sendTelegramTextChunks(buildChunkedTextPlan(rawText, context), context);
const buildRichTextPlan = (rawText: string): TelegramRichTextChunk[] => {
const textLimit = Math.min(
@@ -972,26 +866,18 @@ export async function sendMessageTelegram(
const sendTelegramRichTextChunks = async (
chunks: TelegramRichTextChunk[],
context: string,
options: { replyToAlreadyUsed?: boolean } = {},
): Promise<TelegramSendResult> => {
): Promise<{ messageId: string; chatId: string }> => {
const richRawApi = getTelegramRichRawApi(api);
let lastMessageId = "";
let lastChatId = chatId;
let lastAcceptedParams: TelegramRichMessageContextParams | undefined;
let acceptedReplyToMessageId: number | undefined;
const messageIds: string[] = [];
let sentChunkCount = 0;
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
if (!chunk) {
continue;
}
const acceptedParams = buildRichTextParams(
index,
chunks.length,
index === chunks.length - 1,
options.replyToAlreadyUsed === true,
);
const acceptedParams = buildRichTextParams(index === chunks.length - 1);
const result = await requestWithChatNotFound(
() =>
richRawApi.sendRichMessage({
@@ -1021,8 +907,6 @@ export async function sendMessageTelegram(
lastMessageId = String(messageId);
lastChatId = String(result?.chat?.id ?? chatId);
lastAcceptedParams = acceptedParams;
acceptedReplyToMessageId ??= resolveAcceptedReplyToMessageId(acceptedParams);
messageIds.push(lastMessageId);
sentChunkCount += 1;
}
if (lastMessageId) {
@@ -1038,17 +922,7 @@ export async function sendMessageTelegram(
chunkCount: sentChunkCount,
});
}
const receipt = buildTelegramTextSendReceipt({
messageIds,
chatId: lastChatId,
messageThreadId: lastAcceptedParams?.message_thread_id,
replyToMessageId: acceptedReplyToMessageId,
});
return {
messageId: lastMessageId,
chatId: lastChatId,
...(receipt ? { receipt } : {}),
};
return { messageId: lastMessageId, chatId: lastChatId };
};
async function shouldSendTelegramImageAsPhoto(buffer: Buffer): Promise<boolean> {
@@ -1127,10 +1001,8 @@ export async function sendMessageTelegram(
const needsSeparateText = Boolean(followUpText);
// When splitting, put reply_markup only on the follow-up text (the "main" content),
// not on the media message.
const mediaThreadParams = buildThreadParams(true);
const mediaUsedReplyTo = resolveAcceptedReplyToMessageId(mediaThreadParams) !== undefined;
const baseMediaParams = {
...mediaThreadParams,
...(hasThreadParams ? threadParams : {}),
...(!needsSeparateText && replyMarkup ? { reply_markup: replyMarkup } : {}),
};
const videoDimensions =
@@ -1273,13 +1145,8 @@ export async function sendMessageTelegram(
// If text was too long for a caption, send it as a separate follow-up message.
// Use HTML conversion so markdown renders like captions.
if (needsSeparateText && followUpText) {
const textResult = await sendChunkedText(followUpText, "text follow-up send", {
replyToAlreadyUsed: singleUseReplyTo && mediaUsedReplyTo,
});
return {
...textResult,
chatId: resolvedChatId,
};
const textResult = await sendChunkedText(followUpText, "text follow-up send");
return { messageId: textResult.messageId, chatId: resolvedChatId };
}
return { messageId: String(mediaMessageId), chatId: resolvedChatId };

View File

@@ -514,6 +514,7 @@ export async function compactEmbeddedAgentSessionDirect(
agentId: fallbackAgentId,
sessionId: params.sessionId,
sessionKey: fallbackSessionKey,
abortSignal: params.abortSignal,
prepareAgentHarnessRuntime: async ({ provider, model, agentHarnessRuntimeOverride }) => {
await ensureSelectedAgentHarnessPlugin({
config: params.config,

View File

@@ -2761,6 +2761,8 @@ describe("runWithModelFallback", () => {
it("does not fall back on user aborts", async () => {
const cfg = makeCfg();
const controller = new AbortController();
controller.abort(Object.assign(new Error("timeout"), { name: "TimeoutError" }));
const run = vi
.fn()
.mockRejectedValueOnce(Object.assign(new Error("aborted"), { name: "AbortError" }))
@@ -2771,6 +2773,7 @@ describe("runWithModelFallback", () => {
cfg,
provider: "openai",
model: "gpt-4.1-mini",
abortSignal: controller.signal,
run,
}),
).rejects.toThrow("aborted");
@@ -2797,6 +2800,94 @@ describe("runWithModelFallback", () => {
expect(run).toHaveBeenCalledTimes(1);
});
it("does not fall back when user cancels with AbortError reason", async () => {
const cfg = makeCfg();
const controller = new AbortController();
controller.abort(Object.assign(new Error("cancelled"), { name: "AbortError" }));
const run = vi
.fn()
.mockRejectedValueOnce(Object.assign(new Error("aborted"), { name: "AbortError" }))
.mockResolvedValueOnce("should not run");
await expect(
runWithModelFallback({
cfg,
provider: "openai",
model: "gpt-4.1-mini",
abortSignal: controller.signal,
run,
}),
).rejects.toThrow("aborted");
expect(run).toHaveBeenCalledTimes(1);
});
it("does not fall back when caller cancellation uses a string reason", async () => {
const cfg = makeCfg();
const controller = new AbortController();
controller.abort("Cancelled by operator.");
const run = vi
.fn()
.mockRejectedValueOnce(Object.assign(new Error("aborted"), { name: "AbortError" }))
.mockResolvedValueOnce("should not run");
await expect(
runWithModelFallback({
cfg,
provider: "openai",
model: "gpt-4.1-mini",
abortSignal: controller.signal,
run,
}),
).rejects.toThrow("aborted");
expect(run).toHaveBeenCalledTimes(1);
});
it("does not fall back when caller cancellation throws a plain error", async () => {
const cfg = makeCfg();
const controller = new AbortController();
controller.abort("Cancelled by operator.");
const run = vi
.fn()
.mockRejectedValueOnce(new Error("Cancelled by operator."))
.mockResolvedValueOnce("should not run");
await expect(
runWithModelFallback({
cfg,
provider: "openai",
model: "gpt-4.1-mini",
abortSignal: controller.signal,
run,
}),
).rejects.toThrow("Cancelled by operator.");
expect(run).toHaveBeenCalledTimes(1);
});
it("falls back when AbortError comes from the LLM provider (no external signal)", async () => {
const cfg = makeProviderFallbackCfg("openai");
const run = vi
.fn()
.mockRejectedValueOnce(
Object.assign(new Error("This operation was aborted"), { name: "AbortError" }),
)
.mockResolvedValueOnce({ payloads: [{ text: "fallback ok" }] });
const result = await runWithModelFallback({
cfg,
provider: "openai",
model: "gpt-4.1-mini",
run,
});
expect(result.result).toEqual({ payloads: [{ text: "fallback ok" }] });
expect(run).toHaveBeenCalledTimes(2);
expect(result.attempts[0]?.provider).toBe("openai");
expect(result.attempts[0]?.error).toBe("This operation was aborted");
});
it("does not fall back when the caller abort signal timed out", async () => {
const cfg = makeCfg();
const timeoutReason = new Error("chat run timed out");
@@ -2854,6 +2945,37 @@ describe("runWithModelFallback", () => {
expect(classifyResult).toHaveBeenCalledTimes(1);
});
it("does not fall back when a user AbortError is classified from the result", async () => {
const cfg = makeProviderFallbackCfg("openai");
const abortReason = new Error("chat run cancelled");
abortReason.name = "AbortError";
const controller = new AbortController();
controller.abort(abortReason);
const run = vi
.fn()
.mockResolvedValueOnce({ payloads: [] })
.mockResolvedValueOnce({ payloads: [{ text: "fallback should not run" }] });
const classifyResult = vi.fn(() => ({
message: "This operation was aborted",
reason: "timeout" as const,
code: "terminal_abort",
}));
await expect(
runWithModelFallback({
cfg,
provider: "openai",
model: "m1",
abortSignal: controller.signal,
run,
classifyResult,
}),
).rejects.toThrow("This operation was aborted");
expect(run).toHaveBeenCalledTimes(1);
expect(classifyResult).toHaveBeenCalledTimes(1);
});
it("does not fall back when a restart abort is classified from the result", async () => {
const cfg = makeProviderFallbackCfg("openai");
const controller = new AbortController();

View File

@@ -39,7 +39,6 @@ import {
describeFailoverError,
isFailoverError,
isNonProviderRuntimeCoordinationError,
isTimeoutError,
} from "./failover-error.js";
import {
shouldAllowCooldownProbeForReason,
@@ -189,25 +188,6 @@ type ModelFallbackRunFn<T> = (
options?: ModelFallbackRunOptions,
) => Promise<T>;
/**
* Fallback abort check. Only treats explicit AbortError names as user aborts.
* Message-based checks (e.g., "aborted") can mask timeouts and skip fallback.
*/
function isFallbackAbortError(err: unknown): boolean {
if (!err || typeof err !== "object") {
return false;
}
if (isFailoverError(err)) {
return false;
}
const name = "name" in err ? String(err.name) : "";
return name === "AbortError";
}
function shouldRethrowAbort(err: unknown): boolean {
return isFallbackAbortError(err) && !isTimeoutError(err);
}
function isTerminalAbort(signal: AbortSignal | undefined): boolean {
if (!signal?.aborted) {
return false;
@@ -225,6 +205,10 @@ function isTerminalAbort(signal: AbortSignal | undefined): boolean {
return reason.name === "ClientDisconnectError";
}
function isCallerAbortSignal(signal: AbortSignal | undefined): boolean {
return signal?.aborted === true;
}
function createModelCandidateCollector(allowlist: Set<string> | null | undefined): {
candidates: ModelCandidate[];
addExplicitCandidate: (candidate: ModelCandidate) => void;
@@ -367,7 +351,10 @@ async function runFallbackCandidate<T>(params: {
if (isNonProviderRuntimeCoordinationError(err)) {
throw err;
}
if (isTerminalAbort(params.abortSignal)) {
if (isTerminalAbort(params.abortSignal) || isCallerAbortSignal(params.abortSignal)) {
throw err;
}
if (isAgentRunRestartAbortReason(err)) {
throw err;
}
// Normalize abort-wrapped rate-limit errors (e.g. Google Vertex RESOURCE_EXHAUSTED)
@@ -378,9 +365,6 @@ async function runFallbackCandidate<T>(params: {
sessionId: params.attribution?.sessionId,
lane: params.attribution?.lane,
});
if (shouldRethrowAbort(err) && !normalizedFailover) {
throw err;
}
return { ok: false, error: normalizedFailover ?? err };
}
}
@@ -430,7 +414,7 @@ async function runFallbackAttempt<T>(params: {
attribution: params.attribution,
});
if (classifiedError) {
if (isTerminalAbort(params.abortSignal)) {
if (isTerminalAbort(params.abortSignal) || isCallerAbortSignal(params.abortSignal)) {
throw toErrorObject(classifiedError, "Non-Error thrown");
}
const preserveResultOnExhaustion =
@@ -1323,7 +1307,8 @@ function shouldDiscardDeferredSessionSuspension(params: {
}): boolean {
return (
isTerminalAbort(params.abortSignal) ||
shouldRethrowAbort(params.error) ||
isCallerAbortSignal(params.abortSignal) ||
isAgentRunRestartAbortReason(params.error) ||
isCommandLaneTaskTimeoutError(params.error) ||
isNonProviderRuntimeCoordinationError(params.error) ||
isLikelyContextOverflowError(formatErrorMessage(params.error))

View File

@@ -87,6 +87,7 @@ import { CommandLaneClearedError, GatewayDrainingError } from "../../process/com
import { CommandLane } from "../../process/lanes.js";
import { defaultRuntime } from "../../runtime.js";
import { shouldPreserveUserFacingSessionStateForInputProvenance } from "../../sessions/input-provenance.js";
import { truncateUtf16Safe } from "../../shared/utf16-slice.js";
import {
isMarkdownCapableMessageChannel,
resolveMessageChannel,
@@ -753,7 +754,7 @@ function extractCodexUsageLimitMessage(text: string): string | undefined {
if (!message) {
return undefined;
}
return message.length > 500 ? `${message.slice(0, 497)}...` : message;
return message.length > 500 ? `${truncateUtf16Safe(message, 497)}...` : message;
}
function isPureTransientRateLimitSummary(err: unknown): boolean {

View File

@@ -95,6 +95,7 @@ import { isAcpSessionKey } from "../../routing/session-key.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { resolveSilentReplyPolicyFromPolicies } from "../../shared/silent-reply-policy.js";
import { truncateUtf16Safe } from "../../shared/utf16-slice.js";
import { createTtsDirectiveTextStreamCleaner } from "../../tts/directives.js";
import {
normalizeTtsAutoMode,
@@ -2565,7 +2566,7 @@ export async function dispatchReplyFromConfig(
if (collapsed.length <= 80) {
return collapsed;
}
return `${collapsed.slice(0, 77).trimEnd()}...`;
return `${truncateUtf16Safe(collapsed, 77).trimEnd()}...`;
};
const formatPlanUpdateText = (payload: { explanation?: string; steps?: string[] }) => {
const explanation = payload.explanation?.replace(/\s+/g, " ").trim();

View File

@@ -283,6 +283,7 @@ export function createCronPromptExecutor(params: {
agentDir: params.agentDir,
agentId: params.agentId,
sessionKey: params.runSessionKey,
abortSignal: params.abortSignal,
prepareAgentHarnessRuntime: async ({ provider, model, agentHarnessRuntimeOverride }) => {
await ensureSelectedAgentHarnessPlugin({
config: params.cfgWithAgentDefaults,

View File

@@ -665,4 +665,21 @@ describe("runCronIsolatedAgentTurn — cron model override forwarding (#58065)",
expect(capturedFallbacksOverride).toEqual(["openai/gpt-4o"]);
});
it("forwards the cron abort signal into runWithModelFallback", async () => {
const controller = new AbortController();
let capturedAbortSignal: AbortSignal | undefined;
runWithModelFallbackMock.mockImplementation(
async (params: { provider: string; model: string; abortSignal?: AbortSignal }) => {
capturedAbortSignal = params.abortSignal;
return makeSuccessfulRunResult();
},
);
controller.abort(new Error("cron: job execution timed out"));
await runCronIsolatedAgentTurn(makeParams({ abortSignal: controller.signal }));
expect(capturedAbortSignal).toBe(controller.signal);
});
});

View File

@@ -1,10 +1,12 @@
import { describe, expect, it } from "vitest";
import { readFileSync } from "node:fs";
import { describe, expect, it, vi } from "vitest";
import officialExternalPluginCatalog from "../../scripts/lib/official-external-plugin-catalog.json" with { type: "json" };
import {
type OfficialExternalPluginCatalogEntry,
getOfficialExternalPluginCatalogEntry,
isOfficialExternalPluginCatalogFeed,
listOfficialExternalPluginCatalogEntries,
loadHostedOfficialExternalPluginCatalogEntries,
parseOfficialExternalPluginCatalogEntries,
resolveOfficialExternalProviderContractPluginIds,
resolveOfficialExternalProviderPluginIds,
@@ -23,6 +25,16 @@ function expectCatalogEntry(id: string): OfficialExternalPluginCatalogEntry {
}
describe("official external plugin catalog", () => {
it("keeps hosted fetch guard loading lazy for bundled catalog import paths", () => {
const source = readFileSync(
new URL("./official-external-plugin-catalog.ts", import.meta.url),
"utf8",
);
expect(source).not.toMatch(/from ["']\.\.\/infra\/net\/fetch-guard\.js["']/);
expect(source).toContain('await import("../infra/net/fetch-guard.js")');
});
it("ships the official plugin catalog as a feed-shaped bundled fallback", () => {
expect(isOfficialExternalPluginCatalogFeed(officialExternalPluginCatalog)).toBe(true);
expect(officialExternalPluginCatalog).toMatchObject({
@@ -45,7 +57,7 @@ describe("official external plugin catalog", () => {
).toBe(false);
expect(
isOfficialExternalPluginCatalogFeed({
schemaVersion: 2,
schemaVersion: 3,
id: "openclaw-official-external-plugins",
generatedAt: "2026-06-22T00:00:00.000Z",
sequence: 1,
@@ -54,10 +66,135 @@ describe("official external plugin catalog", () => {
).toBe(false);
});
it("accepts the live ClawHub feed schema version", () => {
expect(
isOfficialExternalPluginCatalogFeed({
schemaVersion: 2,
id: "clawhub-official",
generatedAt: "2026-06-25T01:19:39.629Z",
sequence: 11,
entries: [],
}),
).toBe(true);
});
it("keeps live ClawHub marketplace entries as metadata-only feed entries", () => {
const [entry] = parseOfficialExternalPluginCatalogEntries({
schemaVersion: 2,
id: "clawhub-official",
generatedAt: "2026-06-25T01:19:39.629Z",
sequence: 11,
entries: [
{
type: "plugin",
id: "@expediagroup/expedia-openclaw",
title: "Expedia Travel",
version: "1.0.4",
state: "available",
publisher: {
id: "expediagroup",
trust: "official",
},
install: {
candidates: [
{
sourceRef: "public-clawhub",
package: "@expediagroup/expedia-openclaw",
version: "1.0.4",
integrity:
"sha256:b355dda04403becaab8bbab069fd1e7b0578262e7459e598cc5b19615b5bdab9",
},
],
},
},
],
});
if (entry === undefined) {
throw new Error("Expected hosted ClawHub feed entry to parse");
}
expect(entry).toMatchObject({
id: "@expediagroup/expedia-openclaw",
title: "Expedia Travel",
version: "1.0.4",
});
expect(resolveOfficialExternalPluginId(entry)).toBeUndefined();
expect(resolveOfficialExternalPluginInstall(entry)).toBeNull();
});
it("does not synthesize trusted installs for unavailable or untrusted hosted entries", () => {
const entries = parseOfficialExternalPluginCatalogEntries({
schemaVersion: 2,
id: "clawhub-official",
generatedAt: "2026-06-25T01:19:39.629Z",
sequence: 11,
entries: [
{
type: "plugin",
id: "@example/unavailable",
title: "Unavailable",
version: "1.0.0",
state: "disabled",
publisher: { id: "example", trust: "official" },
install: {
candidates: [
{
sourceRef: "public-clawhub",
package: "@example/unavailable",
version: "1.0.0",
},
],
},
},
{
type: "plugin",
id: "@example/community",
title: "Community",
version: "1.0.0",
state: "available",
publisher: { id: "example", trust: "community" },
install: {
candidates: [
{
sourceRef: "public-clawhub",
package: "@example/community",
version: "1.0.0",
},
],
},
},
{
type: "plugin",
id: "@example/private-source",
title: "Private Source",
version: "1.0.0",
state: "available",
publisher: { id: "example", trust: "official" },
install: {
candidates: [
{
sourceRef: "private-feed",
package: "@example/private-source",
version: "1.0.0",
},
],
},
},
],
});
expect(entries).toHaveLength(3);
for (const entry of entries) {
expect(resolveOfficialExternalPluginId(entry)).toBeUndefined();
expect(resolveOfficialExternalPluginInstall(entry)).toBeNull();
}
});
it("keeps unsupported versioned feed wrappers out of legacy catalog parsing", () => {
expect(
parseOfficialExternalPluginCatalogEntries({
schemaVersion: 2,
schemaVersion: 3,
id: "future-feed",
generatedAt: "2026-06-22T00:00:00.000Z",
sequence: 1,
@@ -71,6 +208,182 @@ describe("official external plugin catalog", () => {
).toEqual([{ name: "legacy-catalog-entry" }]);
});
it("loads a hosted feed with conditional headers and checksum metadata", async () => {
const body = JSON.stringify({
schemaVersion: 1,
id: "openclaw-official-external-plugins",
generatedAt: "2026-06-22T00:00:00.000Z",
sequence: 2,
entries: [
{
name: "@openclaw/hosted-proof",
kind: "plugin",
openclaw: {
plugin: { id: "hosted-proof", label: "Hosted Proof" },
install: { npmSpec: "@openclaw/hosted-proof", defaultChoice: "npm" },
},
},
],
});
const fetchImpl = vi.fn(async (_url: RequestInfo | URL, init?: RequestInit) => {
const headers = new Headers(init?.headers);
expect(headers.get("if-none-match")).toBe('"old"');
expect(headers.get("if-modified-since")).toBe("Mon, 22 Jun 2026 00:00:00 GMT");
return new Response(body, {
status: 200,
headers: {
etag: '"next"',
"last-modified": "Mon, 22 Jun 2026 01:00:00 GMT",
"content-length": String(new TextEncoder().encode(body).byteLength),
},
});
});
const result = await loadHostedOfficialExternalPluginCatalogEntries({
fetchImpl,
ifNoneMatch: '"old"',
ifModifiedSince: "Mon, 22 Jun 2026 00:00:00 GMT",
});
expect(result.source).toBe("hosted");
expect(result.entries.map((entry) => entry.name)).toEqual(["@openclaw/hosted-proof"]);
if (result.source === "hosted") {
expect(result.feed.sequence).toBe(2);
expect(result.metadata).toMatchObject({
status: 200,
etag: '"next"',
lastModified: "Mon, 22 Jun 2026 01:00:00 GMT",
});
expect(result.metadata.checksum).toMatch(/^sha256:[0-9a-f]{64}$/);
}
});
it("keeps live ClawHub metadata-only entries after hosted feed loading", async () => {
const body = JSON.stringify({
schemaVersion: 2,
id: "clawhub-official",
generatedAt: "2026-06-25T01:19:39.629Z",
sequence: 11,
entries: [
{
type: "plugin",
id: "@expediagroup/expedia-openclaw",
title: "Expedia Travel",
version: "1.0.4",
state: "available",
publisher: {
id: "expediagroup",
trust: "official",
},
install: {
candidates: [
{
sourceRef: "public-clawhub",
package: "@expediagroup/expedia-openclaw",
version: "1.0.4",
integrity:
"sha256:b355dda04403becaab8bbab069fd1e7b0578262e7459e598cc5b19615b5bdab9",
},
],
},
},
],
});
const result = await loadHostedOfficialExternalPluginCatalogEntries({
fetchImpl: vi.fn(
async () =>
new Response(body, {
status: 200,
headers: {
"content-length": String(new TextEncoder().encode(body).byteLength),
},
}),
),
});
expect(result.source).toBe("hosted");
expect(result.entries).toHaveLength(1);
expect(result.entries[0]).toMatchObject({
id: "@expediagroup/expedia-openclaw",
title: "Expedia Travel",
version: "1.0.4",
});
});
it("falls back to the bundled catalog when hosted feed validation fails", async () => {
const result = await loadHostedOfficialExternalPluginCatalogEntries({
fetchImpl: vi.fn(
async () =>
new Response(JSON.stringify({ schemaVersion: 1, id: " ", entries: [] }), {
status: 200,
}),
),
});
expect(result.source).toBe("bundled-fallback");
expect(result.entries.length).toBe(listOfficialExternalPluginCatalogEntries().length);
if (result.source === "bundled-fallback") {
expect(result.error).toContain("supported schema version");
expect(result.metadata?.checksum).toMatch(/^sha256:[0-9a-f]{64}$/);
}
});
it("falls back to the bundled catalog on HTTP 304 until a snapshot cache exists", async () => {
const result = await loadHostedOfficialExternalPluginCatalogEntries({
fetchImpl: vi.fn(
async () =>
new Response(null, {
status: 304,
headers: { etag: '"same"', "last-modified": "Mon, 22 Jun 2026 01:00:00 GMT" },
}),
),
});
expect(result.source).toBe("bundled-fallback");
if (result.source === "bundled-fallback") {
expect(result.error).toContain("without a cached snapshot");
expect(result.metadata).toMatchObject({
status: 304,
etag: '"same"',
lastModified: "Mon, 22 Jun 2026 01:00:00 GMT",
});
}
});
it("falls back to the bundled catalog on checksum mismatch and oversized bodies", async () => {
const mismatch = await loadHostedOfficialExternalPluginCatalogEntries({
expectedSha256: "sha256:not-current",
fetchImpl: vi.fn(
async () =>
new Response(
JSON.stringify({
schemaVersion: 1,
id: "openclaw-official-external-plugins",
generatedAt: "2026-06-22T00:00:00.000Z",
sequence: 1,
entries: [],
}),
{ status: 200 },
),
),
});
expect(mismatch.source).toBe("bundled-fallback");
if (mismatch.source === "bundled-fallback") {
expect(mismatch.error).toContain("checksum mismatch");
expect(mismatch.metadata?.checksum).toMatch(/^sha256:[0-9a-f]{64}$/);
}
const oversized = await loadHostedOfficialExternalPluginCatalogEntries({
maxBytes: 4,
fetchImpl: vi.fn(async () => new Response("12345", { status: 200 })),
});
expect(oversized.source).toBe("bundled-fallback");
if (oversized.source === "bundled-fallback") {
expect(oversized.error).toContain("exceeds 4 bytes");
}
});
it("lists the externalized provider and capability plugins with install metadata", () => {
const providers = [
["arcee", "@openclaw/arcee-provider"],

View File

@@ -1,4 +1,5 @@
/** Reads official external plugin/channel/provider catalogs into manifest-like metadata. */
import { createHash } from "node:crypto";
import { normalizeOptionalString } from "@openclaw/normalization-core/string-coerce";
import { uniqueStrings } from "@openclaw/normalization-core/string-normalization";
import officialExternalChannelCatalog from "../../scripts/lib/official-external-channel-catalog.json" with { type: "json" };
@@ -77,16 +78,34 @@ export type OfficialExternalPluginCatalogManifest = {
/** Raw official external catalog entry loaded from generated catalog JSON. */
export type OfficialExternalPluginCatalogEntry = {
id?: string;
title?: string;
type?: string;
state?: string;
publisher?: {
id?: string;
trust?: string;
};
name?: string;
version?: string;
description?: string;
source?: string;
kind?: string;
install?: {
candidates?: readonly OfficialExternalPluginCatalogInstallCandidate[];
};
} & Partial<Record<ManifestKey, OfficialExternalPluginCatalogManifest>>;
export type OfficialExternalPluginCatalogInstallCandidate = {
sourceRef?: string;
package?: string;
version?: string;
integrity?: string;
};
/** Feed-shaped wrapper used by the bundled external plugin catalog fallback. */
export type OfficialExternalPluginCatalogFeed = {
schemaVersion: number;
schemaVersion: 1 | 2;
id: string;
generatedAt: string;
sequence: number;
@@ -94,6 +113,32 @@ export type OfficialExternalPluginCatalogFeed = {
entries: readonly OfficialExternalPluginCatalogEntry[];
};
export type HostedOfficialExternalPluginCatalogMetadata = {
url: string;
status: number;
etag?: string;
lastModified?: string;
checksum: string;
};
export type HostedOfficialExternalPluginCatalogLoadResult =
| {
source: "hosted";
entries: OfficialExternalPluginCatalogEntry[];
feed: OfficialExternalPluginCatalogFeed;
metadata: HostedOfficialExternalPluginCatalogMetadata;
}
| {
source: "bundled-fallback";
entries: OfficialExternalPluginCatalogEntry[];
error: string;
metadata?: Omit<HostedOfficialExternalPluginCatalogMetadata, "checksum"> & {
checksum?: string;
};
};
type FetchLike = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
type OfficialExternalProviderContract =
| "embeddingProviders"
| "mediaUnderstandingProviders"
@@ -107,7 +152,13 @@ const OFFICIAL_CATALOG_SOURCES = [
officialExternalPluginCatalog,
] as const;
const OFFICIAL_EXTERNAL_CATALOG_FEED_SCHEMA_VERSION = 1;
const OFFICIAL_EXTERNAL_CATALOG_FEED_SCHEMA_VERSIONS = new Set<unknown>([1, 2]);
export const DEFAULT_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_FEED_URL =
"https://clawhub.ai/v1/feeds/plugins";
const DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_TIMEOUT_MS = 5000;
const DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_MAX_BYTES = 1024 * 1024;
const DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_CHUNK_TIMEOUT_MS = 5000;
const OFFICIAL_EXTERNAL_PLUGIN_CATALOG_FEED_HOSTNAME_ALLOWLIST = ["clawhub.ai"];
export function isOfficialExternalPluginCatalogFeed(
raw: unknown,
@@ -116,8 +167,9 @@ export function isOfficialExternalPluginCatalogFeed(
return false;
}
const sequence = raw.sequence;
const entries = raw.entries;
return (
raw.schemaVersion === OFFICIAL_EXTERNAL_CATALOG_FEED_SCHEMA_VERSION &&
OFFICIAL_EXTERNAL_CATALOG_FEED_SCHEMA_VERSIONS.has(raw.schemaVersion) &&
typeof raw.id === "string" &&
raw.id.trim().length > 0 &&
typeof raw.generatedAt === "string" &&
@@ -125,7 +177,7 @@ export function isOfficialExternalPluginCatalogFeed(
typeof sequence === "number" &&
Number.isInteger(sequence) &&
sequence >= 0 &&
Array.isArray(raw.entries)
Array.isArray(entries)
);
}
@@ -153,6 +205,288 @@ export function parseOfficialExternalPluginCatalogEntries(
return list.filter((entry): entry is OfficialExternalPluginCatalogEntry => isRecord(entry));
}
function normalizeHostedCatalogHeader(value: string | null): string | undefined {
const normalized = normalizeOptionalString(value);
return normalized || undefined;
}
function sha256Hex(value: string): string {
return `sha256:${createHash("sha256").update(value).digest("hex")}`;
}
function resolveHostedCatalogFeedUrl(feedUrl: string | undefined): URL {
const raw = feedUrl?.trim() || DEFAULT_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_FEED_URL;
let parsed: URL;
try {
parsed = new URL(raw);
} catch {
throw new Error("hosted catalog feed URL is invalid");
}
if (parsed.protocol !== "https:") {
throw new Error("hosted catalog feed URL must use HTTPS");
}
if (!OFFICIAL_EXTERNAL_PLUGIN_CATALOG_FEED_HOSTNAME_ALLOWLIST.includes(parsed.hostname)) {
throw new Error("hosted catalog feed URL hostname is not allowed");
}
return parsed;
}
function parseHostedCatalogContentLength(raw: string | null, maxBytes: number): void {
const normalized = normalizeOptionalString(raw);
if (!normalized) {
return;
}
if (!/^\d+$/.test(normalized)) {
throw new Error("hosted catalog feed has invalid content-length");
}
const size = Number(normalized);
if (!Number.isSafeInteger(size) || size > maxBytes) {
throw new Error(`hosted catalog feed exceeds ${maxBytes} bytes`);
}
}
function hasStreamingResponseBody(
response: Response,
): response is Response & { body: ReadableStream<Uint8Array> } {
return Boolean(
response.body && typeof (response.body as { getReader?: unknown }).getReader === "function",
);
}
async function readHostedCatalogChunkWithTimeout(
reader: ReadableStreamDefaultReader<Uint8Array>,
chunkTimeoutMs: number,
): Promise<Awaited<ReturnType<typeof reader.read>>> {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
return await new Promise((resolve, reject) => {
const clear = () => {
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
timeoutId = undefined;
}
};
timeoutId = setTimeout(() => {
timedOut = true;
clear();
void reader.cancel().catch(() => undefined);
reject(new Error(`hosted catalog feed read timed out after ${chunkTimeoutMs}ms`));
}, chunkTimeoutMs);
void reader.read().then(
(result) => {
clear();
if (!timedOut) {
resolve(result);
}
},
(err: unknown) => {
clear();
if (!timedOut) {
reject(err instanceof Error ? err : new Error(String(err)));
}
},
);
});
}
async function readHostedCatalogResponseText(params: {
response: Response;
maxBytes: number;
chunkTimeoutMs: number;
}): Promise<string> {
parseHostedCatalogContentLength(params.response.headers.get("content-length"), params.maxBytes);
if (!hasStreamingResponseBody(params.response)) {
const text = await params.response.text();
if (new TextEncoder().encode(text).byteLength > params.maxBytes) {
throw new Error(`hosted catalog feed exceeds ${params.maxBytes} bytes`);
}
return text;
}
const reader = params.response.body.getReader();
const chunks: Uint8Array[] = [];
let totalBytes = 0;
try {
while (true) {
const chunk = await readHostedCatalogChunkWithTimeout(reader, params.chunkTimeoutMs);
if (chunk.done) {
break;
}
totalBytes += chunk.value.byteLength;
if (totalBytes > params.maxBytes) {
throw new Error(`hosted catalog feed exceeds ${params.maxBytes} bytes`);
}
chunks.push(chunk.value);
}
} catch (err) {
await reader.cancel().catch(() => undefined);
throw err;
} finally {
reader.releaseLock();
}
const body = new Uint8Array(totalBytes);
let offset = 0;
for (const chunk of chunks) {
body.set(chunk, offset);
offset += chunk.byteLength;
}
return new TextDecoder().decode(body);
}
function bundledOfficialExternalPluginCatalogEntries(): OfficialExternalPluginCatalogEntry[] {
return OFFICIAL_CATALOG_SOURCES.flatMap((source) =>
parseOfficialExternalPluginCatalogEntries(source),
);
}
function dedupeOfficialExternalPluginCatalogEntries(
entries: OfficialExternalPluginCatalogEntry[],
): OfficialExternalPluginCatalogEntry[] {
const resolved = new Map<string, OfficialExternalPluginCatalogEntry>();
for (const entry of entries) {
const key = resolveOfficialExternalPluginCatalogEntryKey(entry);
if (key && !resolved.has(key)) {
resolved.set(key, entry);
}
}
return [...resolved.values()];
}
function resolveOfficialExternalPluginCatalogEntryKey(
entry: OfficialExternalPluginCatalogEntry,
): string | undefined {
const pluginId = resolveOfficialExternalPluginId(entry);
if (pluginId) {
return `${normalizeOptionalString(entry.kind) ?? "plugin"}:${pluginId}`;
}
const name = normalizeOptionalString(entry.name);
if (name) {
return name;
}
const id = normalizeOptionalString(entry.id);
if (id) {
return `${normalizeOptionalString(entry.kind) ?? normalizeOptionalString(entry.type) ?? "plugin"}:${id}`;
}
return undefined;
}
function bundledFallbackResult(
error: unknown,
metadata?: HostedOfficialExternalPluginCatalogLoadResult["metadata"],
): HostedOfficialExternalPluginCatalogLoadResult {
return {
source: "bundled-fallback",
entries: listOfficialExternalPluginCatalogEntries(),
error: error instanceof Error ? error.message : String(error),
...(metadata ? { metadata } : {}),
};
}
export async function loadHostedOfficialExternalPluginCatalogEntries(params?: {
feedUrl?: string;
fetchImpl?: FetchLike;
timeoutMs?: number;
maxBytes?: number;
chunkTimeoutMs?: number;
ifNoneMatch?: string;
ifModifiedSince?: string;
expectedSha256?: string;
}): Promise<HostedOfficialExternalPluginCatalogLoadResult> {
let url: URL;
try {
url = resolveHostedCatalogFeedUrl(params?.feedUrl);
} catch (err) {
return bundledFallbackResult(err);
}
const headers = new Headers();
const ifNoneMatch = normalizeOptionalString(params?.ifNoneMatch);
const ifModifiedSince = normalizeOptionalString(params?.ifModifiedSince);
if (ifNoneMatch) {
headers.set("if-none-match", ifNoneMatch);
}
if (ifModifiedSince) {
headers.set("if-modified-since", ifModifiedSince);
}
const metadataBase = (response: Response) => {
const etag = normalizeHostedCatalogHeader(response.headers.get("etag"));
const lastModified = normalizeHostedCatalogHeader(response.headers.get("last-modified"));
return {
url: url.href,
status: response.status,
...(etag ? { etag } : {}),
...(lastModified ? { lastModified } : {}),
};
};
let response: Response | undefined;
let release: (() => Promise<void>) | undefined;
try {
const { fetchWithSsrFGuard } = await import("../infra/net/fetch-guard.js");
const guarded = await fetchWithSsrFGuard({
url: url.href,
fetchImpl: params?.fetchImpl,
init: { method: "GET", headers },
requireHttps: true,
maxRedirects: 2,
timeoutMs: params?.timeoutMs ?? DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_TIMEOUT_MS,
policy: { hostnameAllowlist: OFFICIAL_EXTERNAL_PLUGIN_CATALOG_FEED_HOSTNAME_ALLOWLIST },
auditContext: "official-external-plugin-catalog-feed",
});
response = guarded.response;
release = guarded.release;
const base = metadataBase(response);
if (response.status === 304) {
return bundledFallbackResult(
"hosted catalog feed returned HTTP 304 without a cached snapshot",
base,
);
}
if (!response.ok) {
return bundledFallbackResult(`hosted catalog feed returned HTTP ${response.status}`, base);
}
const body = await readHostedCatalogResponseText({
response,
maxBytes: params?.maxBytes ?? DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_MAX_BYTES,
chunkTimeoutMs:
params?.chunkTimeoutMs ?? DEFAULT_HOSTED_OFFICIAL_EXTERNAL_PLUGIN_CATALOG_CHUNK_TIMEOUT_MS,
});
const checksum = sha256Hex(body);
const expectedSha256 = normalizeOptionalString(params?.expectedSha256);
if (expectedSha256 && expectedSha256 !== checksum) {
return bundledFallbackResult(
`hosted catalog feed checksum mismatch: expected ${expectedSha256}`,
{
...base,
checksum,
},
);
}
const raw = JSON.parse(body) as unknown;
if (!isOfficialExternalPluginCatalogFeed(raw)) {
return bundledFallbackResult("hosted catalog feed did not match a supported schema version", {
...base,
checksum,
});
}
return {
source: "hosted",
entries: dedupeOfficialExternalPluginCatalogEntries(
parseOfficialExternalPluginCatalogEntries(raw),
),
feed: raw,
metadata: {
...base,
checksum,
},
};
} catch (err) {
return bundledFallbackResult(err);
} finally {
if (response?.bodyUsed !== true) {
await response?.body?.cancel().catch(() => undefined);
}
await release?.().catch(() => undefined);
}
}
function normalizeDefaultChoice(value: unknown): PluginPackageInstall["defaultChoice"] | undefined {
return value === "clawhub" || value === "npm" || value === "local" ? value : undefined;
}
@@ -233,18 +567,7 @@ export function resolveOfficialExternalPluginInstall(
}
export function listOfficialExternalPluginCatalogEntries(): OfficialExternalPluginCatalogEntry[] {
const entries = OFFICIAL_CATALOG_SOURCES.flatMap((source) =>
parseOfficialExternalPluginCatalogEntries(source),
);
const resolved = new Map<string, OfficialExternalPluginCatalogEntry>();
for (const entry of entries) {
const pluginId = resolveOfficialExternalPluginId(entry);
const key = pluginId ? `${entry.kind ?? "plugin"}:${pluginId}` : (entry.name ?? "");
if (key && !resolved.has(key)) {
resolved.set(key, entry);
}
}
return [...resolved.values()];
return dedupeOfficialExternalPluginCatalogEntries(bundledOfficialExternalPluginCatalogEntries());
}
/** Resolves official external plugin owners for configured capability provider ids. */