refactor: move channel message sdk compat into core

This commit is contained in:
Peter Steinberger
2026-05-27 13:47:26 +01:00
parent ef17bbaabf
commit 8e5183c60d
13 changed files with 458 additions and 385 deletions

View File

@@ -1,2 +1,2 @@
83399e00723ea5cc4e7d3a4db0baaef73ad681a42baf72d18b088c649c1c7772 plugin-sdk-api-baseline.json
89fd85479942e9cc3bf30692a0a94a0a0ebfed72ebe9eaf36cec650103cddb11 plugin-sdk-api-baseline.jsonl
ce09dfd1c6f67d49916da2557fb208744b7d8a4912bde944004f44c0998c8e9d plugin-sdk-api-baseline.json
371bdfb13fda61dda885827ffeb922bd46e97ca30e09fa0d09baab80c58a7d1e plugin-sdk-api-baseline.jsonl

View File

@@ -9,7 +9,9 @@ This page moved to [Channel outbound API](/plugins/sdk-channel-outbound).
`openclaw/plugin-sdk/channel-message-runtime` remain deprecated compatibility
subpaths for older plugins. New channel plugins should use
`openclaw/plugin-sdk/channel-outbound` for message lifecycle, receipt, durable
send, and live preview helpers.
send, and live preview helpers. The deprecated subpaths are thin aliases over
the shared channel message core and the focused inbound/outbound SDK surfaces;
do not add new helpers there.
Removal plan: keep these aliases through the external plugin migration window,
then remove them in the next major SDK cleanup after callers have moved to

View File

@@ -166,6 +166,7 @@ const rules = [
allowedFiles: [
"src/channels/turn/durable-delivery.ts",
"src/channels/turn/kernel.ts",
"src/channels/message/inbound-reply-dispatch.ts",
"src/infra/outbound/deliver-runtime.ts",
"src/infra/outbound/deliver.ts",
"src/plugin-sdk/channel-message-runtime.ts",

View File

@@ -0,0 +1,26 @@
import type {
ChannelMessageAdapter,
ChannelMessageAdapterShape,
ChannelMessageReceiveAdapterShape,
} from "./types.js";
const defaultManualReceiveAdapter = {
defaultAckPolicy: "manual",
supportedAckPolicies: ["manual"],
} as const satisfies ChannelMessageReceiveAdapterShape;
type ChannelMessageAdapterWithDefaultReceive<TAdapter extends ChannelMessageAdapterShape> =
TAdapter & {
receive: TAdapter["receive"] extends undefined
? typeof defaultManualReceiveAdapter
: NonNullable<TAdapter["receive"]>;
};
export function defineChannelMessageAdapter<const TAdapter extends ChannelMessageAdapterShape>(
adapter: TAdapter,
): ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>> {
return {
...adapter,
receive: adapter.receive ?? defaultManualReceiveAdapter,
} as ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>>;
}

View File

@@ -0,0 +1,332 @@
/**
* Shared inbound reply dispatch helpers for channel message adapters and
* deprecated SDK compatibility facades.
*/
import { withReplyDispatcher } from "../../auto-reply/dispatch.js";
import type { GetReplyOptions } from "../../auto-reply/get-reply-options.types.js";
import {
dispatchReplyFromConfig,
type DispatchFromConfigResult,
} from "../../auto-reply/reply/dispatch-from-config.js";
import type { DispatchReplyWithBufferedBlockDispatcher } from "../../auto-reply/reply/provider-dispatcher.types.js";
import type { ReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.types.js";
import type { FinalizedMsgContext } from "../../auto-reply/templating.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
normalizeOutboundReplyPayload,
type OutboundReplyPayload,
} from "../../infra/outbound/reply-payload-normalize.js";
import {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
deliverInboundReplyWithMessageSendContext,
dispatchChannelInboundReply as dispatchChannelInboundReplyCore,
isDurableInboundReplyDeliveryHandled,
resolveChannelTurnDispatchCounts,
recordDroppedChannelInboundHistory,
runChannelInboundEvent as runChannelInboundEventCore,
runPreparedInboundReply as runPreparedInboundReplyCore,
throwIfDurableInboundReplyDeliveryFailed,
} from "../turn/kernel.js";
import type {
ChannelTurnResult,
DispatchedChannelTurnResult,
DurableInboundReplyDeliveryOptions,
} from "../turn/kernel.js";
import type {
AssembledChannelTurn,
PreparedChannelTurn,
RunChannelTurnParams,
} from "../turn/types.js";
export type {
ChannelTurnDroppedHistoryOptions,
ChannelTurnDroppedHistoryOptions as ChannelInboundDroppedHistoryOptions,
ChannelTurnRecordOptions,
ChannelTurnRecordOptions as InboundReplyRecordOptions,
} from "../turn/types.js";
export type { DurableInboundReplyDeliveryParams } from "../turn/kernel.js";
export type { ChannelBotLoopProtectionFacts } from "../turn/kernel.js";
export { recordChannelBotPairLoopAndCheckSuppression } from "../turn/kernel.js";
type ReplyOptionsWithoutModelSelected = Omit<
Omit<GetReplyOptions, "onBlockReply">,
"onModelSelected"
>;
type RecordInboundSessionFn = typeof import("../session.js").recordInboundSession;
type ReplyDispatchFromConfigOptions = Omit<GetReplyOptions, "onBlockReply">;
export type ChannelInboundEventRunnerParams<
TRaw,
TDispatchResult = DispatchFromConfigResult,
> = RunChannelTurnParams<TRaw, TDispatchResult>;
export type PreparedInboundReply<TDispatchResult> = PreparedChannelTurn<TDispatchResult>;
export type AssembledInboundReply = AssembledChannelTurn;
export type InboundReplyDispatchResult<TDispatchResult> = ChannelTurnResult<TDispatchResult>;
/** Run an already prepared inbound reply through shared session-record + dispatch ordering. */
type PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult> =
PreparedChannelTurn<TDispatchResult> & {
botLoopProtection: NonNullable<PreparedChannelTurn<TDispatchResult>["botLoopProtection"]>;
};
type PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult> = Omit<
PreparedChannelTurn<TDispatchResult>,
"botLoopProtection"
> & {
botLoopProtection?: undefined;
};
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export async function runPreparedInboundReply<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
return await runPreparedInboundReplyCore(params);
}
/** @deprecated Use `runPreparedInboundReply`. */
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export async function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
return await runPreparedInboundReply(params);
}
export async function runChannelInboundEvent<TRaw, TDispatchResult = DispatchFromConfigResult>(
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
) {
return await runChannelInboundEventCore(params);
}
/** @deprecated Use `runChannelInboundEvent`. */
export async function runInboundReplyTurn<TRaw, TDispatchResult = DispatchFromConfigResult>(
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
) {
return await runChannelInboundEvent(params);
}
export async function dispatchChannelInboundReply(params: AssembledInboundReply) {
return await dispatchChannelInboundReplyCore(params);
}
export {
hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch,
hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch,
deliverInboundReplyWithMessageSendContext as deliverDurableInboundReplyPayload,
deliverInboundReplyWithMessageSendContext,
recordDroppedChannelInboundHistory as recordDroppedChannelTurnHistory,
recordDroppedChannelInboundHistory,
resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts,
};
/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */
export async function dispatchReplyFromConfigWithSettledDispatcher(params: {
cfg: OpenClawConfig;
ctxPayload: FinalizedMsgContext;
dispatcher: ReplyDispatcher;
onSettled: () => void | Promise<void>;
replyOptions?: ReplyDispatchFromConfigOptions;
configOverride?: OpenClawConfig;
}): Promise<DispatchFromConfigResult> {
return await withReplyDispatcher({
dispatcher: params.dispatcher,
onSettled: params.onSettled,
run: () =>
dispatchReplyFromConfig({
ctx: params.ctxPayload,
cfg: params.cfg,
dispatcher: params.dispatcher,
replyOptions: params.replyOptions,
configOverride: params.configOverride,
}),
});
}
/** Assemble the common inbound reply dispatch dependencies for a resolved route. */
export function buildInboundReplyDispatchBase(params: {
cfg: OpenClawConfig;
channel: string;
accountId?: string;
route: {
agentId: string;
sessionKey: string;
};
storePath: string;
ctxPayload: FinalizedMsgContext;
core: {
channel: {
session: {
recordInboundSession: RecordInboundSessionFn;
};
reply: {
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
};
};
};
}) {
return {
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.route.agentId,
routeSessionKey: params.route.sessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
};
}
type BuildInboundReplyDispatchBaseParams = Parameters<typeof buildInboundReplyDispatchBase>[0];
type RecordChannelMessageReplyDispatchParams = {
cfg: OpenClawConfig;
channel: string;
accountId?: string;
agentId: string;
routeSessionKey: string;
storePath: string;
ctxPayload: FinalizedMsgContext;
recordInboundSession: RecordInboundSessionFn;
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
deliver: (payload: OutboundReplyPayload) => Promise<void>;
durable?: false | DurableInboundReplyDeliveryOptions;
onRecordError: (err: unknown) => void;
onDispatchError: (err: unknown, info: { kind: string }) => void;
replyOptions?: ReplyOptionsWithoutModelSelected;
};
/**
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function dispatchChannelMessageReplyWithBase(
params: BuildInboundReplyDispatchBaseParams &
Pick<
RecordChannelMessageReplyDispatchParams,
"deliver" | "durable" | "onRecordError" | "onDispatchError" | "replyOptions"
>,
): Promise<void> {
const dispatchBase = buildInboundReplyDispatchBase(params);
await recordChannelMessageReplyDispatch({
...dispatchBase,
deliver: params.deliver,
durable: params.durable,
onRecordError: params.onRecordError,
onDispatchError: params.onDispatchError,
replyOptions: params.replyOptions,
});
}
/**
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
*
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
* `message` adapter via `defineChannelMessageAdapter(...)` and use
* `dispatchChannelMessageReplyWithBase` only for compatibility dispatchers that
* have not moved to the message lifecycle yet.
*/
export async function dispatchInboundReplyWithBase(
params: Parameters<typeof dispatchChannelMessageReplyWithBase>[0],
): Promise<void> {
await dispatchChannelMessageReplyWithBase(params);
}
/**
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function recordChannelMessageReplyDispatch(
params: RecordChannelMessageReplyDispatchParams,
): Promise<void> {
await dispatchChannelInboundReplyCore({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.agentId,
routeSessionKey: params.routeSessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
preparePayload: (payload) =>
(payload && typeof payload === "object"
? normalizeOutboundReplyPayload(payload as Record<string, unknown>)
: {}) as OutboundReplyPayload,
deliver: async (payload, info) => {
if (params.durable) {
const durable = await deliverInboundReplyWithMessageSendContext({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.agentId,
ctxPayload: params.ctxPayload,
payload,
info,
...params.durable,
});
throwIfDurableInboundReplyDeliveryFailed(durable);
if (isDurableInboundReplyDeliveryHandled(durable)) {
return durable.delivery;
}
}
return await params.deliver(payload as OutboundReplyPayload);
},
onError: params.onDispatchError,
},
replyPipeline: {},
replyOptions: params.replyOptions,
record: {
onRecordError: params.onRecordError,
},
});
}
/**
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
*
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
* `message` adapter via `defineChannelMessageAdapter(...)` and use
* `recordChannelMessageReplyDispatch` only for compatibility dispatchers that
* have not moved to the message lifecycle yet.
*/
export async function recordInboundSessionAndDispatchReply(
params: RecordChannelMessageReplyDispatchParams,
): Promise<void> {
await recordChannelMessageReplyDispatch(params);
}
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts;

View File

@@ -1,4 +1,5 @@
export { deriveDurableFinalDeliveryRequirements } from "./capabilities.js";
export { defineChannelMessageAdapter } from "./adapter.js";
export { createChannelMessageAdapterFromOutbound } from "./outbound-bridge.js";
export { createDurableInboundReceiveJournal } from "./durable-receive.js";
export {

View File

@@ -0,0 +1,50 @@
import type { ReplyPayload as InternalReplyPayload } from "../../auto-reply/reply-payload.js";
import { readStringValue } from "../../shared/string-coerce.js";
export type OutboundReplyPayload = {
text?: string;
mediaUrls?: string[];
mediaUrl?: string;
presentation?: InternalReplyPayload["presentation"];
/**
* @deprecated Use presentation. Runtime support remains for legacy producers.
*/
interactive?: InternalReplyPayload["interactive"];
channelData?: InternalReplyPayload["channelData"];
sensitiveMedia?: boolean;
replyToId?: string;
};
function readObjectValue(value: unknown): object | undefined {
return value && typeof value === "object" && !Array.isArray(value) ? value : undefined;
}
/** Extract the supported outbound reply fields from loose tool or agent payload objects. */
export function normalizeOutboundReplyPayload(
payload: Record<string, unknown>,
): OutboundReplyPayload {
const text = readStringValue(payload.text);
const mediaUrls = Array.isArray(payload.mediaUrls)
? payload.mediaUrls.filter(
(entry): entry is string => typeof entry === "string" && entry.length > 0,
)
: undefined;
const mediaUrl = readStringValue(payload.mediaUrl);
const presentation = readObjectValue(
payload.presentation,
) as OutboundReplyPayload["presentation"];
const interactive = readObjectValue(payload.interactive) as OutboundReplyPayload["interactive"];
const channelData = readObjectValue(payload.channelData) as OutboundReplyPayload["channelData"];
const sensitiveMedia = payload.sensitiveMedia === true ? true : undefined;
const replyToId = readStringValue(payload.replyToId);
return {
text,
mediaUrls,
mediaUrl,
presentation,
interactive,
channelData,
sensitiveMedia,
replyToId,
};
}

View File

@@ -139,7 +139,7 @@ export {
hasVisibleInboundReplyDispatch,
recordChannelBotPairLoopAndCheckSuppression,
resolveInboundReplyDispatchCounts,
} from "./inbound-reply-dispatch.js";
} from "../channels/message/inbound-reply-dispatch.js";
export type {
AssembledInboundReply,
ChannelBotLoopProtectionFacts,
@@ -148,7 +148,7 @@ export type {
PreparedInboundReply,
InboundReplyDispatchResult,
InboundReplyRecordOptions,
} from "./inbound-reply-dispatch.js";
} from "../channels/message/inbound-reply-dispatch.js";
export {
toHistoryMediaEntries,

View File

@@ -1,4 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import { defineChannelMessageAdapter as defineCoreChannelMessageAdapter } from "../channels/message/index.js";
import { defineChannelMessageAdapter } from "./channel-outbound.js";
describe("defineChannelMessageAdapter", () => {
@@ -28,6 +29,7 @@ describe("defineChannelMessageAdapter", () => {
expect(channelMessageRuntime.withDurableMessageSendContext).toBe(
channelMessage.withDurableMessageSendContext,
);
expect(channelOutbound.defineChannelMessageAdapter).toBe(defineCoreChannelMessageAdapter);
expect(compat.createChannelReplyPipeline).toBe(channelReplyPipeline.createChannelReplyPipeline);
});

View File

@@ -1,9 +1,4 @@
// Shared outbound/message lifecycle helpers for channel plugins.
import type {
ChannelMessageAdapter,
ChannelMessageAdapterShape,
} from "../channels/message/index.js";
import type { ChannelMessageReceiveAdapterShape } from "../channels/message/index.js";
import type {
DurableMessageBatchSendResult,
DurableMessageSendContext,
@@ -23,12 +18,12 @@ export type {
DurableMessageSendContextParams,
} from "../channels/message/runtime.js";
export {
createChannelReplyPipeline as createChannelMessageReplyPipeline,
createReplyPrefixContext,
createReplyPrefixOptions,
createTypingCallbacks,
createChannelReplyPipeline as createChannelMessageReplyPipeline,
resolveChannelSourceReplyDeliveryMode as resolveChannelMessageSourceReplyDeliveryMode,
} from "./channel-reply-core.js";
} from "../channels/message/index.js";
export {
createFinalizableDraftLifecycle,
@@ -89,6 +84,7 @@ export {
listDeclaredReceiveAckPolicies,
createLiveMessageState,
createDurableMessageStateRecord,
defineChannelMessageAdapter,
markLiveMessageCancelled,
markLiveMessageFinalized,
markLiveMessagePreviewUpdated,
@@ -202,24 +198,3 @@ export async function withDurableMessageSendContext<T>(
const mod = await import("../channels/message/runtime.js");
return await mod.withDurableMessageSendContext(params, run);
}
const defaultManualReceiveAdapter = {
defaultAckPolicy: "manual",
supportedAckPolicies: ["manual"],
} as const satisfies ChannelMessageReceiveAdapterShape;
type ChannelMessageAdapterWithDefaultReceive<TAdapter extends ChannelMessageAdapterShape> =
TAdapter & {
receive: TAdapter["receive"] extends undefined
? typeof defaultManualReceiveAdapter
: NonNullable<TAdapter["receive"]>;
};
export function defineChannelMessageAdapter<const TAdapter extends ChannelMessageAdapterShape>(
adapter: TAdapter,
): ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>> {
return {
...adapter,
receive: adapter.receive ?? defaultManualReceiveAdapter,
} as ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>>;
}

View File

@@ -301,7 +301,7 @@ describe("recordInboundSessionAndDispatchReply", () => {
});
});
it("exposes channel-message dispatch names as the canonical helpers for new channel code", () => {
it("keeps deprecated channel-message dispatch names as aliases for focused helpers", () => {
expect(createChannelMessageReplyPipeline).toBe(createChannelReplyPipeline);
expect(resolveChannelMessageSourceReplyDeliveryMode).toBe(
resolveChannelSourceReplyDeliveryMode,

View File

@@ -4,330 +4,40 @@
* delivery helpers.
*/
import { withReplyDispatcher } from "../auto-reply/dispatch.js";
import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js";
import {
dispatchReplyFromConfig,
type DispatchFromConfigResult,
} from "../auto-reply/reply/dispatch-from-config.js";
import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.types.js";
import type { ReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.types.js";
import type { FinalizedMsgContext } from "../auto-reply/templating.js";
import {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
export {
runPreparedInboundReply,
runPreparedInboundReplyTurn,
runChannelInboundEvent,
runInboundReplyTurn,
dispatchChannelInboundReply,
hasFinalInboundReplyDispatch,
hasVisibleInboundReplyDispatch,
deliverDurableInboundReplyPayload,
deliverInboundReplyWithMessageSendContext,
dispatchChannelInboundReply as dispatchChannelInboundReplyCore,
isDurableInboundReplyDeliveryHandled,
resolveChannelTurnDispatchCounts,
recordDroppedChannelTurnHistory,
recordDroppedChannelInboundHistory,
runChannelInboundEvent as runChannelInboundEventCore,
runPreparedInboundReply as runPreparedInboundReplyCore,
throwIfDurableInboundReplyDeliveryFailed,
} from "../channels/turn/kernel.js";
import type {
ChannelTurnResult,
DispatchedChannelTurnResult,
DurableInboundReplyDeliveryOptions,
} from "../channels/turn/kernel.js";
import type {
AssembledChannelTurn,
PreparedChannelTurn,
RunChannelTurnParams,
} from "../channels/turn/types.js";
resolveInboundReplyDispatchCounts,
dispatchReplyFromConfigWithSettledDispatcher,
buildInboundReplyDispatchBase,
dispatchChannelMessageReplyWithBase,
dispatchInboundReplyWithBase,
recordChannelMessageReplyDispatch,
recordInboundSessionAndDispatchReply,
buildChannelMessageReplyDispatchBase,
hasFinalChannelMessageReplyDispatch,
hasVisibleChannelMessageReplyDispatch,
resolveChannelMessageReplyDispatchCounts,
recordChannelBotPairLoopAndCheckSuppression,
} from "../channels/message/inbound-reply-dispatch.js";
export type {
ChannelTurnDroppedHistoryOptions,
ChannelTurnDroppedHistoryOptions as ChannelInboundDroppedHistoryOptions,
ChannelInboundDroppedHistoryOptions,
ChannelTurnRecordOptions,
ChannelTurnRecordOptions as InboundReplyRecordOptions,
} from "../channels/turn/types.js";
export type { DurableInboundReplyDeliveryParams } from "../channels/turn/kernel.js";
export type { ChannelBotLoopProtectionFacts } from "../channels/turn/kernel.js";
export { recordChannelBotPairLoopAndCheckSuppression } from "../channels/turn/kernel.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
normalizeOutboundReplyPayload,
type OutboundReplyPayload,
type ReplyPayload,
} from "./reply-payload.js";
type ReplyOptionsWithoutModelSelected = Omit<
Omit<GetReplyOptions, "onBlockReply">,
"onModelSelected"
>;
type RecordInboundSessionFn = typeof import("../channels/session.js").recordInboundSession;
type ReplyDispatchFromConfigOptions = Omit<GetReplyOptions, "onBlockReply">;
export type ChannelInboundEventRunnerParams<
TRaw,
TDispatchResult = DispatchFromConfigResult,
> = RunChannelTurnParams<TRaw, TDispatchResult>;
export type PreparedInboundReply<TDispatchResult> = PreparedChannelTurn<TDispatchResult>;
export type AssembledInboundReply = AssembledChannelTurn;
export type InboundReplyDispatchResult<TDispatchResult> = ChannelTurnResult<TDispatchResult>;
/** Run an already prepared inbound reply through shared session-record + dispatch ordering. */
type PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult> =
PreparedChannelTurn<TDispatchResult> & {
botLoopProtection: NonNullable<PreparedChannelTurn<TDispatchResult>["botLoopProtection"]>;
};
type PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult> = Omit<
PreparedChannelTurn<TDispatchResult>,
"botLoopProtection"
> & {
botLoopProtection?: undefined;
};
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReply<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export async function runPreparedInboundReply<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
return await runPreparedInboundReplyCore(params);
}
/** @deprecated Use `runPreparedInboundReply`. */
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
export function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>>;
export async function runPreparedInboundReplyTurn<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
return await runPreparedInboundReply(params);
}
export async function runChannelInboundEvent<TRaw, TDispatchResult = DispatchFromConfigResult>(
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
) {
return await runChannelInboundEventCore(params);
}
/** @deprecated Use `runChannelInboundEvent`. */
export async function runInboundReplyTurn<TRaw, TDispatchResult = DispatchFromConfigResult>(
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
) {
return await runChannelInboundEvent(params);
}
export async function dispatchChannelInboundReply(params: AssembledInboundReply) {
return await dispatchChannelInboundReplyCore(params);
}
export {
hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch,
hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch,
deliverInboundReplyWithMessageSendContext as deliverDurableInboundReplyPayload,
deliverInboundReplyWithMessageSendContext,
recordDroppedChannelInboundHistory as recordDroppedChannelTurnHistory,
recordDroppedChannelInboundHistory,
resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts,
};
/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */
export async function dispatchReplyFromConfigWithSettledDispatcher(params: {
cfg: OpenClawConfig;
ctxPayload: FinalizedMsgContext;
dispatcher: ReplyDispatcher;
onSettled: () => void | Promise<void>;
replyOptions?: ReplyDispatchFromConfigOptions;
configOverride?: OpenClawConfig;
}): Promise<DispatchFromConfigResult> {
return await withReplyDispatcher({
dispatcher: params.dispatcher,
onSettled: params.onSettled,
run: () =>
dispatchReplyFromConfig({
ctx: params.ctxPayload,
cfg: params.cfg,
dispatcher: params.dispatcher,
replyOptions: params.replyOptions,
configOverride: params.configOverride,
}),
});
}
/** Assemble the common inbound reply dispatch dependencies for a resolved route. */
export function buildInboundReplyDispatchBase(params: {
cfg: OpenClawConfig;
channel: string;
accountId?: string;
route: {
agentId: string;
sessionKey: string;
};
storePath: string;
ctxPayload: FinalizedMsgContext;
core: {
channel: {
session: {
recordInboundSession: RecordInboundSessionFn;
};
reply: {
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
};
};
};
}) {
return {
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.route.agentId,
routeSessionKey: params.route.sessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
};
}
type BuildInboundReplyDispatchBaseParams = Parameters<typeof buildInboundReplyDispatchBase>[0];
type RecordChannelMessageReplyDispatchParams = {
cfg: OpenClawConfig;
channel: string;
accountId?: string;
agentId: string;
routeSessionKey: string;
storePath: string;
ctxPayload: FinalizedMsgContext;
recordInboundSession: RecordInboundSessionFn;
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
deliver: (payload: OutboundReplyPayload) => Promise<void>;
durable?: false | DurableInboundReplyDeliveryOptions;
onRecordError: (err: unknown) => void;
onDispatchError: (err: unknown, info: { kind: string }) => void;
replyOptions?: ReplyOptionsWithoutModelSelected;
};
/**
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function dispatchChannelMessageReplyWithBase(
params: BuildInboundReplyDispatchBaseParams &
Pick<
RecordChannelMessageReplyDispatchParams,
"deliver" | "durable" | "onRecordError" | "onDispatchError" | "replyOptions"
>,
): Promise<void> {
const dispatchBase = buildInboundReplyDispatchBase(params);
await recordChannelMessageReplyDispatch({
...dispatchBase,
deliver: params.deliver,
durable: params.durable,
onRecordError: params.onRecordError,
onDispatchError: params.onDispatchError,
replyOptions: params.replyOptions,
});
}
/**
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
*
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
* `message` adapter via `defineChannelMessageAdapter(...)` and use
* `dispatchChannelMessageReplyWithBase` only for compatibility dispatchers that
* have not moved to the message lifecycle yet.
*/
export async function dispatchInboundReplyWithBase(
params: Parameters<typeof dispatchChannelMessageReplyWithBase>[0],
): Promise<void> {
await dispatchChannelMessageReplyWithBase(params);
}
/**
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function recordChannelMessageReplyDispatch(
params: RecordChannelMessageReplyDispatchParams,
): Promise<void> {
await dispatchChannelInboundReplyCore({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.agentId,
routeSessionKey: params.routeSessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
preparePayload: (payload) =>
(payload && typeof payload === "object"
? normalizeOutboundReplyPayload(payload as Record<string, unknown>)
: {}) as ReplyPayload,
deliver: async (payload, info) => {
if (params.durable) {
const durable = await deliverInboundReplyWithMessageSendContext({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
agentId: params.agentId,
ctxPayload: params.ctxPayload,
payload,
info,
...params.durable,
});
throwIfDurableInboundReplyDeliveryFailed(durable);
if (isDurableInboundReplyDeliveryHandled(durable)) {
return durable.delivery;
}
}
return await params.deliver(payload as OutboundReplyPayload);
},
onError: params.onDispatchError,
},
replyPipeline: {},
replyOptions: params.replyOptions,
record: {
onRecordError: params.onRecordError,
},
});
}
/**
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
*
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
* `message` adapter via `defineChannelMessageAdapter(...)` and use
* `recordChannelMessageReplyDispatch` only for compatibility dispatchers that
* have not moved to the message lifecycle yet.
*/
export async function recordInboundSessionAndDispatchReply(
params: RecordChannelMessageReplyDispatchParams,
): Promise<void> {
await recordChannelMessageReplyDispatch(params);
}
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts;
InboundReplyRecordOptions,
DurableInboundReplyDeliveryParams,
ChannelBotLoopProtectionFacts,
ChannelInboundEventRunnerParams,
PreparedInboundReply,
AssembledInboundReply,
InboundReplyDispatchResult,
} from "../channels/message/inbound-reply-dispatch.js";

View File

@@ -1,8 +1,9 @@
import type { ReplyPayload as InternalReplyPayload } from "../auto-reply/reply-payload.js";
import type { ChannelOutboundAdapter } from "../channels/plugins/outbound.types.js";
import { normalizeOutboundReplyPayload as normalizeCoreOutboundReplyPayload } from "../infra/outbound/reply-payload-normalize.js";
import { createReplyToFanout } from "../infra/outbound/reply-policy.js";
import { hasReplyPayloadContent } from "../interactive/payload.js";
import { normalizeLowercaseStringOrEmpty, readStringValue } from "../shared/string-coerce.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { normalizeStringEntries } from "../shared/string-normalization.js";
export type { MediaPayload, MediaPayloadInput } from "../channels/plugins/media-payload.js";
@@ -54,10 +55,6 @@ type SendPayloadAdapter = Pick<
const REASONING_PREFIX_RE = /^(?:reasoning:|thinking\.{0,3}(?=\s*(?:>\s*)?_))/u;
function readObjectValue(value: unknown): object | undefined {
return value && typeof value === "object" && !Array.isArray(value) ? value : undefined;
}
function trimLeadingMarkdownQuoteMarkers(text: string): string {
let candidate = text.trimStart();
while (candidate.startsWith(">")) {
@@ -86,30 +83,7 @@ export function isReasoningReplyPayload(payload: ReasoningReplyPayload): boolean
export function normalizeOutboundReplyPayload(
payload: Record<string, unknown>,
): OutboundReplyPayload {
const text = readStringValue(payload.text);
const mediaUrls = Array.isArray(payload.mediaUrls)
? payload.mediaUrls.filter(
(entry): entry is string => typeof entry === "string" && entry.length > 0,
)
: undefined;
const mediaUrl = readStringValue(payload.mediaUrl);
const presentation = readObjectValue(
payload.presentation,
) as OutboundReplyPayload["presentation"];
const interactive = readObjectValue(payload.interactive) as OutboundReplyPayload["interactive"];
const channelData = readObjectValue(payload.channelData) as OutboundReplyPayload["channelData"];
const sensitiveMedia = payload.sensitiveMedia === true ? true : undefined;
const replyToId = readStringValue(payload.replyToId);
return {
text,
mediaUrls,
mediaUrl,
presentation,
interactive,
channelData,
sensitiveMedia,
replyToId,
};
return normalizeCoreOutboundReplyPayload(payload);
}
/** Wrap a deliverer so callers can hand it arbitrary payloads while channels receive normalized data. */