From 8e5183c60d4ae2ecd5620fd8e7d81a3af398ced1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 27 May 2026 13:47:26 +0100 Subject: [PATCH] refactor: move channel message sdk compat into core --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/plugins/sdk-channel-message.md | 4 +- scripts/check-deprecated-api-usage.mjs | 1 + src/channels/message/adapter.ts | 26 ++ .../message/inbound-reply-dispatch.ts | 332 ++++++++++++++++ src/channels/message/index.ts | 1 + src/infra/outbound/reply-payload-normalize.ts | 50 +++ src/plugin-sdk/channel-inbound.ts | 4 +- src/plugin-sdk/channel-message.test.ts | 2 + src/plugin-sdk/channel-outbound.ts | 31 +- src/plugin-sdk/inbound-reply-dispatch.test.ts | 2 +- src/plugin-sdk/inbound-reply-dispatch.ts | 354 ++---------------- src/plugin-sdk/reply-payload.ts | 32 +- 13 files changed, 458 insertions(+), 385 deletions(-) create mode 100644 src/channels/message/adapter.ts create mode 100644 src/channels/message/inbound-reply-dispatch.ts create mode 100644 src/infra/outbound/reply-payload-normalize.ts diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 9d0c5fbf3f39..649490d41bd3 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -83399e00723ea5cc4e7d3a4db0baaef73ad681a42baf72d18b088c649c1c7772 plugin-sdk-api-baseline.json -89fd85479942e9cc3bf30692a0a94a0a0ebfed72ebe9eaf36cec650103cddb11 plugin-sdk-api-baseline.jsonl +ce09dfd1c6f67d49916da2557fb208744b7d8a4912bde944004f44c0998c8e9d plugin-sdk-api-baseline.json +371bdfb13fda61dda885827ffeb922bd46e97ca30e09fa0d09baab80c58a7d1e plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/sdk-channel-message.md b/docs/plugins/sdk-channel-message.md index f6214d6ec2b2..0e7bb3e5494e 100644 --- a/docs/plugins/sdk-channel-message.md +++ b/docs/plugins/sdk-channel-message.md @@ -9,7 +9,9 @@ This page moved to [Channel outbound API](/plugins/sdk-channel-outbound). `openclaw/plugin-sdk/channel-message-runtime` remain deprecated compatibility subpaths for older plugins. New channel plugins should use `openclaw/plugin-sdk/channel-outbound` for message lifecycle, receipt, durable -send, and live preview helpers. +send, and live preview helpers. The deprecated subpaths are thin aliases over +the shared channel message core and the focused inbound/outbound SDK surfaces; +do not add new helpers there. Removal plan: keep these aliases through the external plugin migration window, then remove them in the next major SDK cleanup after callers have moved to diff --git a/scripts/check-deprecated-api-usage.mjs b/scripts/check-deprecated-api-usage.mjs index aff3c5fb2ec3..b3370f8961cc 100644 --- a/scripts/check-deprecated-api-usage.mjs +++ b/scripts/check-deprecated-api-usage.mjs @@ -166,6 +166,7 @@ const rules = [ allowedFiles: [ "src/channels/turn/durable-delivery.ts", "src/channels/turn/kernel.ts", + "src/channels/message/inbound-reply-dispatch.ts", "src/infra/outbound/deliver-runtime.ts", "src/infra/outbound/deliver.ts", "src/plugin-sdk/channel-message-runtime.ts", diff --git a/src/channels/message/adapter.ts b/src/channels/message/adapter.ts new file mode 100644 index 000000000000..da88984f45ae --- /dev/null +++ b/src/channels/message/adapter.ts @@ -0,0 +1,26 @@ +import type { + ChannelMessageAdapter, + ChannelMessageAdapterShape, + ChannelMessageReceiveAdapterShape, +} from "./types.js"; + +const defaultManualReceiveAdapter = { + defaultAckPolicy: "manual", + supportedAckPolicies: ["manual"], +} as const satisfies ChannelMessageReceiveAdapterShape; + +type ChannelMessageAdapterWithDefaultReceive = + TAdapter & { + receive: TAdapter["receive"] extends undefined + ? typeof defaultManualReceiveAdapter + : NonNullable; + }; + +export function defineChannelMessageAdapter( + adapter: TAdapter, +): ChannelMessageAdapter> { + return { + ...adapter, + receive: adapter.receive ?? defaultManualReceiveAdapter, + } as ChannelMessageAdapter>; +} diff --git a/src/channels/message/inbound-reply-dispatch.ts b/src/channels/message/inbound-reply-dispatch.ts new file mode 100644 index 000000000000..0485612fd74c --- /dev/null +++ b/src/channels/message/inbound-reply-dispatch.ts @@ -0,0 +1,332 @@ +/** + * Shared inbound reply dispatch helpers for channel message adapters and + * deprecated SDK compatibility facades. + */ + +import { withReplyDispatcher } from "../../auto-reply/dispatch.js"; +import type { GetReplyOptions } from "../../auto-reply/get-reply-options.types.js"; +import { + dispatchReplyFromConfig, + type DispatchFromConfigResult, +} from "../../auto-reply/reply/dispatch-from-config.js"; +import type { DispatchReplyWithBufferedBlockDispatcher } from "../../auto-reply/reply/provider-dispatcher.types.js"; +import type { ReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.types.js"; +import type { FinalizedMsgContext } from "../../auto-reply/templating.js"; +import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { + normalizeOutboundReplyPayload, + type OutboundReplyPayload, +} from "../../infra/outbound/reply-payload-normalize.js"; +import { + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + deliverInboundReplyWithMessageSendContext, + dispatchChannelInboundReply as dispatchChannelInboundReplyCore, + isDurableInboundReplyDeliveryHandled, + resolveChannelTurnDispatchCounts, + recordDroppedChannelInboundHistory, + runChannelInboundEvent as runChannelInboundEventCore, + runPreparedInboundReply as runPreparedInboundReplyCore, + throwIfDurableInboundReplyDeliveryFailed, +} from "../turn/kernel.js"; +import type { + ChannelTurnResult, + DispatchedChannelTurnResult, + DurableInboundReplyDeliveryOptions, +} from "../turn/kernel.js"; +import type { + AssembledChannelTurn, + PreparedChannelTurn, + RunChannelTurnParams, +} from "../turn/types.js"; + +export type { + ChannelTurnDroppedHistoryOptions, + ChannelTurnDroppedHistoryOptions as ChannelInboundDroppedHistoryOptions, + ChannelTurnRecordOptions, + ChannelTurnRecordOptions as InboundReplyRecordOptions, +} from "../turn/types.js"; +export type { DurableInboundReplyDeliveryParams } from "../turn/kernel.js"; +export type { ChannelBotLoopProtectionFacts } from "../turn/kernel.js"; +export { recordChannelBotPairLoopAndCheckSuppression } from "../turn/kernel.js"; + +type ReplyOptionsWithoutModelSelected = Omit< + Omit, + "onModelSelected" +>; +type RecordInboundSessionFn = typeof import("../session.js").recordInboundSession; + +type ReplyDispatchFromConfigOptions = Omit; +export type ChannelInboundEventRunnerParams< + TRaw, + TDispatchResult = DispatchFromConfigResult, +> = RunChannelTurnParams; +export type PreparedInboundReply = PreparedChannelTurn; +export type AssembledInboundReply = AssembledChannelTurn; +export type InboundReplyDispatchResult = ChannelTurnResult; + +/** Run an already prepared inbound reply through shared session-record + dispatch ordering. */ +type PreparedInboundReplyTurnWithBotLoopProtection = + PreparedChannelTurn & { + botLoopProtection: NonNullable["botLoopProtection"]>; + }; + +type PreparedInboundReplyTurnWithoutBotLoopProtection = Omit< + PreparedChannelTurn, + "botLoopProtection" +> & { + botLoopProtection?: undefined; +}; + +export function runPreparedInboundReply( + params: PreparedInboundReplyTurnWithBotLoopProtection, +): Promise>; +export function runPreparedInboundReply( + params: PreparedInboundReplyTurnWithoutBotLoopProtection, +): Promise>; +export function runPreparedInboundReply( + params: PreparedChannelTurn, +): Promise>; +export async function runPreparedInboundReply( + params: PreparedChannelTurn, +): Promise> { + return await runPreparedInboundReplyCore(params); +} + +/** @deprecated Use `runPreparedInboundReply`. */ +export function runPreparedInboundReplyTurn( + params: PreparedInboundReplyTurnWithBotLoopProtection, +): Promise>; +export function runPreparedInboundReplyTurn( + params: PreparedInboundReplyTurnWithoutBotLoopProtection, +): Promise>; +export function runPreparedInboundReplyTurn( + params: PreparedChannelTurn, +): Promise>; +export async function runPreparedInboundReplyTurn( + params: PreparedChannelTurn, +): Promise> { + return await runPreparedInboundReply(params); +} + +export async function runChannelInboundEvent( + params: ChannelInboundEventRunnerParams, +) { + return await runChannelInboundEventCore(params); +} + +/** @deprecated Use `runChannelInboundEvent`. */ +export async function runInboundReplyTurn( + params: ChannelInboundEventRunnerParams, +) { + return await runChannelInboundEvent(params); +} + +export async function dispatchChannelInboundReply(params: AssembledInboundReply) { + return await dispatchChannelInboundReplyCore(params); +} + +export { + hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch, + hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch, + deliverInboundReplyWithMessageSendContext as deliverDurableInboundReplyPayload, + deliverInboundReplyWithMessageSendContext, + recordDroppedChannelInboundHistory as recordDroppedChannelTurnHistory, + recordDroppedChannelInboundHistory, + resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts, +}; + +/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */ +export async function dispatchReplyFromConfigWithSettledDispatcher(params: { + cfg: OpenClawConfig; + ctxPayload: FinalizedMsgContext; + dispatcher: ReplyDispatcher; + onSettled: () => void | Promise; + replyOptions?: ReplyDispatchFromConfigOptions; + configOverride?: OpenClawConfig; +}): Promise { + return await withReplyDispatcher({ + dispatcher: params.dispatcher, + onSettled: params.onSettled, + run: () => + dispatchReplyFromConfig({ + ctx: params.ctxPayload, + cfg: params.cfg, + dispatcher: params.dispatcher, + replyOptions: params.replyOptions, + configOverride: params.configOverride, + }), + }); +} + +/** Assemble the common inbound reply dispatch dependencies for a resolved route. */ +export function buildInboundReplyDispatchBase(params: { + cfg: OpenClawConfig; + channel: string; + accountId?: string; + route: { + agentId: string; + sessionKey: string; + }; + storePath: string; + ctxPayload: FinalizedMsgContext; + core: { + channel: { + session: { + recordInboundSession: RecordInboundSessionFn; + }; + reply: { + dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher; + }; + }; + }; +}) { + return { + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + agentId: params.route.agentId, + routeSessionKey: params.route.sessionKey, + storePath: params.storePath, + ctxPayload: params.ctxPayload, + recordInboundSession: params.core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + }; +} + +type BuildInboundReplyDispatchBaseParams = Parameters[0]; +type RecordChannelMessageReplyDispatchParams = { + cfg: OpenClawConfig; + channel: string; + accountId?: string; + agentId: string; + routeSessionKey: string; + storePath: string; + ctxPayload: FinalizedMsgContext; + recordInboundSession: RecordInboundSessionFn; + dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher; + deliver: (payload: OutboundReplyPayload) => Promise; + durable?: false | DurableInboundReplyDeliveryOptions; + onRecordError: (err: unknown) => void; + onDispatchError: (err: unknown, info: { kind: string }) => void; + replyOptions?: ReplyOptionsWithoutModelSelected; +}; + +/** + * Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn. + * + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route + * sends through `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. + */ +export async function dispatchChannelMessageReplyWithBase( + params: BuildInboundReplyDispatchBaseParams & + Pick< + RecordChannelMessageReplyDispatchParams, + "deliver" | "durable" | "onRecordError" | "onDispatchError" | "replyOptions" + >, +): Promise { + const dispatchBase = buildInboundReplyDispatchBase(params); + await recordChannelMessageReplyDispatch({ + ...dispatchBase, + deliver: params.deliver, + durable: params.durable, + onRecordError: params.onRecordError, + onDispatchError: params.onDispatchError, + replyOptions: params.replyOptions, + }); +} + +/** + * Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn. + * + * @deprecated Legacy inbound reply helper. New channel plugins should expose a + * `message` adapter via `defineChannelMessageAdapter(...)` and use + * `dispatchChannelMessageReplyWithBase` only for compatibility dispatchers that + * have not moved to the message lifecycle yet. + */ +export async function dispatchInboundReplyWithBase( + params: Parameters[0], +): Promise { + await dispatchChannelMessageReplyWithBase(params); +} + +/** + * Record the inbound session first, then dispatch the reply using normalized outbound delivery. + * + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route + * sends through `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. + */ +export async function recordChannelMessageReplyDispatch( + params: RecordChannelMessageReplyDispatchParams, +): Promise { + await dispatchChannelInboundReplyCore({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + agentId: params.agentId, + routeSessionKey: params.routeSessionKey, + storePath: params.storePath, + ctxPayload: params.ctxPayload, + recordInboundSession: params.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + preparePayload: (payload) => + (payload && typeof payload === "object" + ? normalizeOutboundReplyPayload(payload as Record) + : {}) as OutboundReplyPayload, + deliver: async (payload, info) => { + if (params.durable) { + const durable = await deliverInboundReplyWithMessageSendContext({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + agentId: params.agentId, + ctxPayload: params.ctxPayload, + payload, + info, + ...params.durable, + }); + throwIfDurableInboundReplyDeliveryFailed(durable); + if (isDurableInboundReplyDeliveryHandled(durable)) { + return durable.delivery; + } + } + return await params.deliver(payload as OutboundReplyPayload); + }, + onError: params.onDispatchError, + }, + replyPipeline: {}, + replyOptions: params.replyOptions, + record: { + onRecordError: params.onRecordError, + }, + }); +} + +/** + * Record the inbound session first, then dispatch the reply using normalized outbound delivery. + * + * @deprecated Legacy inbound reply helper. New channel plugins should expose a + * `message` adapter via `defineChannelMessageAdapter(...)` and use + * `recordChannelMessageReplyDispatch` only for compatibility dispatchers that + * have not moved to the message lifecycle yet. + */ +export async function recordInboundSessionAndDispatchReply( + params: RecordChannelMessageReplyDispatchParams, +): Promise { + await recordChannelMessageReplyDispatch(params); +} + +/** @deprecated Compatibility helper for legacy reply dispatch bridges. */ +export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts; diff --git a/src/channels/message/index.ts b/src/channels/message/index.ts index 3ee4cf8dcd90..f1de2a7921be 100644 --- a/src/channels/message/index.ts +++ b/src/channels/message/index.ts @@ -1,4 +1,5 @@ export { deriveDurableFinalDeliveryRequirements } from "./capabilities.js"; +export { defineChannelMessageAdapter } from "./adapter.js"; export { createChannelMessageAdapterFromOutbound } from "./outbound-bridge.js"; export { createDurableInboundReceiveJournal } from "./durable-receive.js"; export { diff --git a/src/infra/outbound/reply-payload-normalize.ts b/src/infra/outbound/reply-payload-normalize.ts new file mode 100644 index 000000000000..fa5cb9b9cbd7 --- /dev/null +++ b/src/infra/outbound/reply-payload-normalize.ts @@ -0,0 +1,50 @@ +import type { ReplyPayload as InternalReplyPayload } from "../../auto-reply/reply-payload.js"; +import { readStringValue } from "../../shared/string-coerce.js"; + +export type OutboundReplyPayload = { + text?: string; + mediaUrls?: string[]; + mediaUrl?: string; + presentation?: InternalReplyPayload["presentation"]; + /** + * @deprecated Use presentation. Runtime support remains for legacy producers. + */ + interactive?: InternalReplyPayload["interactive"]; + channelData?: InternalReplyPayload["channelData"]; + sensitiveMedia?: boolean; + replyToId?: string; +}; + +function readObjectValue(value: unknown): object | undefined { + return value && typeof value === "object" && !Array.isArray(value) ? value : undefined; +} + +/** Extract the supported outbound reply fields from loose tool or agent payload objects. */ +export function normalizeOutboundReplyPayload( + payload: Record, +): OutboundReplyPayload { + const text = readStringValue(payload.text); + const mediaUrls = Array.isArray(payload.mediaUrls) + ? payload.mediaUrls.filter( + (entry): entry is string => typeof entry === "string" && entry.length > 0, + ) + : undefined; + const mediaUrl = readStringValue(payload.mediaUrl); + const presentation = readObjectValue( + payload.presentation, + ) as OutboundReplyPayload["presentation"]; + const interactive = readObjectValue(payload.interactive) as OutboundReplyPayload["interactive"]; + const channelData = readObjectValue(payload.channelData) as OutboundReplyPayload["channelData"]; + const sensitiveMedia = payload.sensitiveMedia === true ? true : undefined; + const replyToId = readStringValue(payload.replyToId); + return { + text, + mediaUrls, + mediaUrl, + presentation, + interactive, + channelData, + sensitiveMedia, + replyToId, + }; +} diff --git a/src/plugin-sdk/channel-inbound.ts b/src/plugin-sdk/channel-inbound.ts index 16c06a51b272..5b6fdf315a66 100644 --- a/src/plugin-sdk/channel-inbound.ts +++ b/src/plugin-sdk/channel-inbound.ts @@ -139,7 +139,7 @@ export { hasVisibleInboundReplyDispatch, recordChannelBotPairLoopAndCheckSuppression, resolveInboundReplyDispatchCounts, -} from "./inbound-reply-dispatch.js"; +} from "../channels/message/inbound-reply-dispatch.js"; export type { AssembledInboundReply, ChannelBotLoopProtectionFacts, @@ -148,7 +148,7 @@ export type { PreparedInboundReply, InboundReplyDispatchResult, InboundReplyRecordOptions, -} from "./inbound-reply-dispatch.js"; +} from "../channels/message/inbound-reply-dispatch.js"; export { toHistoryMediaEntries, diff --git a/src/plugin-sdk/channel-message.test.ts b/src/plugin-sdk/channel-message.test.ts index 7b15d265a9cd..4f08b345ab92 100644 --- a/src/plugin-sdk/channel-message.test.ts +++ b/src/plugin-sdk/channel-message.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from "vitest"; +import { defineChannelMessageAdapter as defineCoreChannelMessageAdapter } from "../channels/message/index.js"; import { defineChannelMessageAdapter } from "./channel-outbound.js"; describe("defineChannelMessageAdapter", () => { @@ -28,6 +29,7 @@ describe("defineChannelMessageAdapter", () => { expect(channelMessageRuntime.withDurableMessageSendContext).toBe( channelMessage.withDurableMessageSendContext, ); + expect(channelOutbound.defineChannelMessageAdapter).toBe(defineCoreChannelMessageAdapter); expect(compat.createChannelReplyPipeline).toBe(channelReplyPipeline.createChannelReplyPipeline); }); diff --git a/src/plugin-sdk/channel-outbound.ts b/src/plugin-sdk/channel-outbound.ts index 736275d1d82b..29f52a77a426 100644 --- a/src/plugin-sdk/channel-outbound.ts +++ b/src/plugin-sdk/channel-outbound.ts @@ -1,9 +1,4 @@ // Shared outbound/message lifecycle helpers for channel plugins. -import type { - ChannelMessageAdapter, - ChannelMessageAdapterShape, -} from "../channels/message/index.js"; -import type { ChannelMessageReceiveAdapterShape } from "../channels/message/index.js"; import type { DurableMessageBatchSendResult, DurableMessageSendContext, @@ -23,12 +18,12 @@ export type { DurableMessageSendContextParams, } from "../channels/message/runtime.js"; export { - createChannelReplyPipeline as createChannelMessageReplyPipeline, createReplyPrefixContext, createReplyPrefixOptions, createTypingCallbacks, + createChannelReplyPipeline as createChannelMessageReplyPipeline, resolveChannelSourceReplyDeliveryMode as resolveChannelMessageSourceReplyDeliveryMode, -} from "./channel-reply-core.js"; +} from "../channels/message/index.js"; export { createFinalizableDraftLifecycle, @@ -89,6 +84,7 @@ export { listDeclaredReceiveAckPolicies, createLiveMessageState, createDurableMessageStateRecord, + defineChannelMessageAdapter, markLiveMessageCancelled, markLiveMessageFinalized, markLiveMessagePreviewUpdated, @@ -202,24 +198,3 @@ export async function withDurableMessageSendContext( const mod = await import("../channels/message/runtime.js"); return await mod.withDurableMessageSendContext(params, run); } - -const defaultManualReceiveAdapter = { - defaultAckPolicy: "manual", - supportedAckPolicies: ["manual"], -} as const satisfies ChannelMessageReceiveAdapterShape; - -type ChannelMessageAdapterWithDefaultReceive = - TAdapter & { - receive: TAdapter["receive"] extends undefined - ? typeof defaultManualReceiveAdapter - : NonNullable; - }; - -export function defineChannelMessageAdapter( - adapter: TAdapter, -): ChannelMessageAdapter> { - return { - ...adapter, - receive: adapter.receive ?? defaultManualReceiveAdapter, - } as ChannelMessageAdapter>; -} diff --git a/src/plugin-sdk/inbound-reply-dispatch.test.ts b/src/plugin-sdk/inbound-reply-dispatch.test.ts index 5ae4e7301c60..00880a4bea2d 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.test.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.test.ts @@ -301,7 +301,7 @@ describe("recordInboundSessionAndDispatchReply", () => { }); }); - it("exposes channel-message dispatch names as the canonical helpers for new channel code", () => { + it("keeps deprecated channel-message dispatch names as aliases for focused helpers", () => { expect(createChannelMessageReplyPipeline).toBe(createChannelReplyPipeline); expect(resolveChannelMessageSourceReplyDeliveryMode).toBe( resolveChannelSourceReplyDeliveryMode, diff --git a/src/plugin-sdk/inbound-reply-dispatch.ts b/src/plugin-sdk/inbound-reply-dispatch.ts index b766891413b9..c97535d142d7 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.ts @@ -4,330 +4,40 @@ * delivery helpers. */ -import { withReplyDispatcher } from "../auto-reply/dispatch.js"; -import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js"; -import { - dispatchReplyFromConfig, - type DispatchFromConfigResult, -} from "../auto-reply/reply/dispatch-from-config.js"; -import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.types.js"; -import type { ReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.types.js"; -import type { FinalizedMsgContext } from "../auto-reply/templating.js"; -import { - hasFinalChannelTurnDispatch, - hasVisibleChannelTurnDispatch, +export { + runPreparedInboundReply, + runPreparedInboundReplyTurn, + runChannelInboundEvent, + runInboundReplyTurn, + dispatchChannelInboundReply, + hasFinalInboundReplyDispatch, + hasVisibleInboundReplyDispatch, + deliverDurableInboundReplyPayload, deliverInboundReplyWithMessageSendContext, - dispatchChannelInboundReply as dispatchChannelInboundReplyCore, - isDurableInboundReplyDeliveryHandled, - resolveChannelTurnDispatchCounts, + recordDroppedChannelTurnHistory, recordDroppedChannelInboundHistory, - runChannelInboundEvent as runChannelInboundEventCore, - runPreparedInboundReply as runPreparedInboundReplyCore, - throwIfDurableInboundReplyDeliveryFailed, -} from "../channels/turn/kernel.js"; -import type { - ChannelTurnResult, - DispatchedChannelTurnResult, - DurableInboundReplyDeliveryOptions, -} from "../channels/turn/kernel.js"; -import type { - AssembledChannelTurn, - PreparedChannelTurn, - RunChannelTurnParams, -} from "../channels/turn/types.js"; + resolveInboundReplyDispatchCounts, + dispatchReplyFromConfigWithSettledDispatcher, + buildInboundReplyDispatchBase, + dispatchChannelMessageReplyWithBase, + dispatchInboundReplyWithBase, + recordChannelMessageReplyDispatch, + recordInboundSessionAndDispatchReply, + buildChannelMessageReplyDispatchBase, + hasFinalChannelMessageReplyDispatch, + hasVisibleChannelMessageReplyDispatch, + resolveChannelMessageReplyDispatchCounts, + recordChannelBotPairLoopAndCheckSuppression, +} from "../channels/message/inbound-reply-dispatch.js"; export type { ChannelTurnDroppedHistoryOptions, - ChannelTurnDroppedHistoryOptions as ChannelInboundDroppedHistoryOptions, + ChannelInboundDroppedHistoryOptions, ChannelTurnRecordOptions, - ChannelTurnRecordOptions as InboundReplyRecordOptions, -} from "../channels/turn/types.js"; -export type { DurableInboundReplyDeliveryParams } from "../channels/turn/kernel.js"; -export type { ChannelBotLoopProtectionFacts } from "../channels/turn/kernel.js"; -export { recordChannelBotPairLoopAndCheckSuppression } from "../channels/turn/kernel.js"; -import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { - normalizeOutboundReplyPayload, - type OutboundReplyPayload, - type ReplyPayload, -} from "./reply-payload.js"; - -type ReplyOptionsWithoutModelSelected = Omit< - Omit, - "onModelSelected" ->; -type RecordInboundSessionFn = typeof import("../channels/session.js").recordInboundSession; - -type ReplyDispatchFromConfigOptions = Omit; -export type ChannelInboundEventRunnerParams< - TRaw, - TDispatchResult = DispatchFromConfigResult, -> = RunChannelTurnParams; -export type PreparedInboundReply = PreparedChannelTurn; -export type AssembledInboundReply = AssembledChannelTurn; -export type InboundReplyDispatchResult = ChannelTurnResult; - -/** Run an already prepared inbound reply through shared session-record + dispatch ordering. */ -type PreparedInboundReplyTurnWithBotLoopProtection = - PreparedChannelTurn & { - botLoopProtection: NonNullable["botLoopProtection"]>; - }; - -type PreparedInboundReplyTurnWithoutBotLoopProtection = Omit< - PreparedChannelTurn, - "botLoopProtection" -> & { - botLoopProtection?: undefined; -}; - -export function runPreparedInboundReply( - params: PreparedInboundReplyTurnWithBotLoopProtection, -): Promise>; -export function runPreparedInboundReply( - params: PreparedInboundReplyTurnWithoutBotLoopProtection, -): Promise>; -export function runPreparedInboundReply( - params: PreparedChannelTurn, -): Promise>; -export async function runPreparedInboundReply( - params: PreparedChannelTurn, -): Promise> { - return await runPreparedInboundReplyCore(params); -} - -/** @deprecated Use `runPreparedInboundReply`. */ -export function runPreparedInboundReplyTurn( - params: PreparedInboundReplyTurnWithBotLoopProtection, -): Promise>; -export function runPreparedInboundReplyTurn( - params: PreparedInboundReplyTurnWithoutBotLoopProtection, -): Promise>; -export function runPreparedInboundReplyTurn( - params: PreparedChannelTurn, -): Promise>; -export async function runPreparedInboundReplyTurn( - params: PreparedChannelTurn, -): Promise> { - return await runPreparedInboundReply(params); -} - -export async function runChannelInboundEvent( - params: ChannelInboundEventRunnerParams, -) { - return await runChannelInboundEventCore(params); -} - -/** @deprecated Use `runChannelInboundEvent`. */ -export async function runInboundReplyTurn( - params: ChannelInboundEventRunnerParams, -) { - return await runChannelInboundEvent(params); -} - -export async function dispatchChannelInboundReply(params: AssembledInboundReply) { - return await dispatchChannelInboundReplyCore(params); -} - -export { - hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch, - hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch, - deliverInboundReplyWithMessageSendContext as deliverDurableInboundReplyPayload, - deliverInboundReplyWithMessageSendContext, - recordDroppedChannelInboundHistory as recordDroppedChannelTurnHistory, - recordDroppedChannelInboundHistory, - resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts, -}; - -/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */ -export async function dispatchReplyFromConfigWithSettledDispatcher(params: { - cfg: OpenClawConfig; - ctxPayload: FinalizedMsgContext; - dispatcher: ReplyDispatcher; - onSettled: () => void | Promise; - replyOptions?: ReplyDispatchFromConfigOptions; - configOverride?: OpenClawConfig; -}): Promise { - return await withReplyDispatcher({ - dispatcher: params.dispatcher, - onSettled: params.onSettled, - run: () => - dispatchReplyFromConfig({ - ctx: params.ctxPayload, - cfg: params.cfg, - dispatcher: params.dispatcher, - replyOptions: params.replyOptions, - configOverride: params.configOverride, - }), - }); -} - -/** Assemble the common inbound reply dispatch dependencies for a resolved route. */ -export function buildInboundReplyDispatchBase(params: { - cfg: OpenClawConfig; - channel: string; - accountId?: string; - route: { - agentId: string; - sessionKey: string; - }; - storePath: string; - ctxPayload: FinalizedMsgContext; - core: { - channel: { - session: { - recordInboundSession: RecordInboundSessionFn; - }; - reply: { - dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher; - }; - }; - }; -}) { - return { - cfg: params.cfg, - channel: params.channel, - accountId: params.accountId, - agentId: params.route.agentId, - routeSessionKey: params.route.sessionKey, - storePath: params.storePath, - ctxPayload: params.ctxPayload, - recordInboundSession: params.core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - }; -} - -type BuildInboundReplyDispatchBaseParams = Parameters[0]; -type RecordChannelMessageReplyDispatchParams = { - cfg: OpenClawConfig; - channel: string; - accountId?: string; - agentId: string; - routeSessionKey: string; - storePath: string; - ctxPayload: FinalizedMsgContext; - recordInboundSession: RecordInboundSessionFn; - dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher; - deliver: (payload: OutboundReplyPayload) => Promise; - durable?: false | DurableInboundReplyDeliveryOptions; - onRecordError: (err: unknown) => void; - onDispatchError: (err: unknown, info: { kind: string }) => void; - replyOptions?: ReplyOptionsWithoutModelSelected; -}; - -/** - * Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn. - * - * @deprecated Compatibility reply-dispatch bridge. New channel plugins should - * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route - * sends through `deliverInboundReplyWithMessageSendContext(...)` or - * `sendDurableMessageBatch(...)`. - */ -export async function dispatchChannelMessageReplyWithBase( - params: BuildInboundReplyDispatchBaseParams & - Pick< - RecordChannelMessageReplyDispatchParams, - "deliver" | "durable" | "onRecordError" | "onDispatchError" | "replyOptions" - >, -): Promise { - const dispatchBase = buildInboundReplyDispatchBase(params); - await recordChannelMessageReplyDispatch({ - ...dispatchBase, - deliver: params.deliver, - durable: params.durable, - onRecordError: params.onRecordError, - onDispatchError: params.onDispatchError, - replyOptions: params.replyOptions, - }); -} - -/** - * Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn. - * - * @deprecated Legacy inbound reply helper. New channel plugins should expose a - * `message` adapter via `defineChannelMessageAdapter(...)` and use - * `dispatchChannelMessageReplyWithBase` only for compatibility dispatchers that - * have not moved to the message lifecycle yet. - */ -export async function dispatchInboundReplyWithBase( - params: Parameters[0], -): Promise { - await dispatchChannelMessageReplyWithBase(params); -} - -/** - * Record the inbound session first, then dispatch the reply using normalized outbound delivery. - * - * @deprecated Compatibility reply-dispatch bridge. New channel plugins should - * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route - * sends through `deliverInboundReplyWithMessageSendContext(...)` or - * `sendDurableMessageBatch(...)`. - */ -export async function recordChannelMessageReplyDispatch( - params: RecordChannelMessageReplyDispatchParams, -): Promise { - await dispatchChannelInboundReplyCore({ - cfg: params.cfg, - channel: params.channel, - accountId: params.accountId, - agentId: params.agentId, - routeSessionKey: params.routeSessionKey, - storePath: params.storePath, - ctxPayload: params.ctxPayload, - recordInboundSession: params.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - preparePayload: (payload) => - (payload && typeof payload === "object" - ? normalizeOutboundReplyPayload(payload as Record) - : {}) as ReplyPayload, - deliver: async (payload, info) => { - if (params.durable) { - const durable = await deliverInboundReplyWithMessageSendContext({ - cfg: params.cfg, - channel: params.channel, - accountId: params.accountId, - agentId: params.agentId, - ctxPayload: params.ctxPayload, - payload, - info, - ...params.durable, - }); - throwIfDurableInboundReplyDeliveryFailed(durable); - if (isDurableInboundReplyDeliveryHandled(durable)) { - return durable.delivery; - } - } - return await params.deliver(payload as OutboundReplyPayload); - }, - onError: params.onDispatchError, - }, - replyPipeline: {}, - replyOptions: params.replyOptions, - record: { - onRecordError: params.onRecordError, - }, - }); -} - -/** - * Record the inbound session first, then dispatch the reply using normalized outbound delivery. - * - * @deprecated Legacy inbound reply helper. New channel plugins should expose a - * `message` adapter via `defineChannelMessageAdapter(...)` and use - * `recordChannelMessageReplyDispatch` only for compatibility dispatchers that - * have not moved to the message lifecycle yet. - */ -export async function recordInboundSessionAndDispatchReply( - params: RecordChannelMessageReplyDispatchParams, -): Promise { - await recordChannelMessageReplyDispatch(params); -} - -/** @deprecated Compatibility helper for legacy reply dispatch bridges. */ -export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase; -/** @deprecated Compatibility helper for legacy reply dispatch results. */ -export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch; -/** @deprecated Compatibility helper for legacy reply dispatch results. */ -export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch; -/** @deprecated Compatibility helper for legacy reply dispatch results. */ -export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts; + InboundReplyRecordOptions, + DurableInboundReplyDeliveryParams, + ChannelBotLoopProtectionFacts, + ChannelInboundEventRunnerParams, + PreparedInboundReply, + AssembledInboundReply, + InboundReplyDispatchResult, +} from "../channels/message/inbound-reply-dispatch.js"; diff --git a/src/plugin-sdk/reply-payload.ts b/src/plugin-sdk/reply-payload.ts index 6b6afd8c7c48..fb50b0cd2c12 100644 --- a/src/plugin-sdk/reply-payload.ts +++ b/src/plugin-sdk/reply-payload.ts @@ -1,8 +1,9 @@ import type { ReplyPayload as InternalReplyPayload } from "../auto-reply/reply-payload.js"; import type { ChannelOutboundAdapter } from "../channels/plugins/outbound.types.js"; +import { normalizeOutboundReplyPayload as normalizeCoreOutboundReplyPayload } from "../infra/outbound/reply-payload-normalize.js"; import { createReplyToFanout } from "../infra/outbound/reply-policy.js"; import { hasReplyPayloadContent } from "../interactive/payload.js"; -import { normalizeLowercaseStringOrEmpty, readStringValue } from "../shared/string-coerce.js"; +import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { normalizeStringEntries } from "../shared/string-normalization.js"; export type { MediaPayload, MediaPayloadInput } from "../channels/plugins/media-payload.js"; @@ -54,10 +55,6 @@ type SendPayloadAdapter = Pick< const REASONING_PREFIX_RE = /^(?:reasoning:|thinking\.{0,3}(?=\s*(?:>\s*)?_))/u; -function readObjectValue(value: unknown): object | undefined { - return value && typeof value === "object" && !Array.isArray(value) ? value : undefined; -} - function trimLeadingMarkdownQuoteMarkers(text: string): string { let candidate = text.trimStart(); while (candidate.startsWith(">")) { @@ -86,30 +83,7 @@ export function isReasoningReplyPayload(payload: ReasoningReplyPayload): boolean export function normalizeOutboundReplyPayload( payload: Record, ): OutboundReplyPayload { - const text = readStringValue(payload.text); - const mediaUrls = Array.isArray(payload.mediaUrls) - ? payload.mediaUrls.filter( - (entry): entry is string => typeof entry === "string" && entry.length > 0, - ) - : undefined; - const mediaUrl = readStringValue(payload.mediaUrl); - const presentation = readObjectValue( - payload.presentation, - ) as OutboundReplyPayload["presentation"]; - const interactive = readObjectValue(payload.interactive) as OutboundReplyPayload["interactive"]; - const channelData = readObjectValue(payload.channelData) as OutboundReplyPayload["channelData"]; - const sensitiveMedia = payload.sensitiveMedia === true ? true : undefined; - const replyToId = readStringValue(payload.replyToId); - return { - text, - mediaUrls, - mediaUrl, - presentation, - interactive, - channelData, - sensitiveMedia, - replyToId, - }; + return normalizeCoreOutboundReplyPayload(payload); } /** Wrap a deliverer so callers can hand it arbitrary payloads while channels receive normalized data. */