fix(outbound): thread session keys into outbound hooks (#73706)

Thread the canonical outbound session key into plugin message_sending and message_sent hook contexts, and align native command redirect routed delivery with the agent runtime session key. This lets plugins correlate agent_end with outbound delivery hooks without seeing missing or divergent session keys.

Verification:
- gh pr checks 73706 --repo openclaw/openclaw --watch=false
- Real behavior proof: https://github.com/openclaw/openclaw/actions/runs/26526635074/job/78131933497

Thanks @zeroaltitude.

Co-authored-by: Edward Abrams <zeroaltitude@gmail.com>
This commit is contained in:
Edward Abrams
2026-05-27 16:43:27 -07:00
committed by GitHub
parent c9151ba902
commit 05db911775
7 changed files with 563 additions and 3 deletions

View File

@@ -0,0 +1,193 @@
/**
* Real-runtime behavior proof for #73706.
*
* This script does NOT use vitest mocks. It wires up the production
* `deliverOutboundPayloads` path against:
* - a real `PluginRegistry` populated with one real channel plugin and
* two real plugin hooks (`message_sending`, `message_sent`)
* - the real `getGlobalHookRunner()` / `initializeGlobalHookRunner()`
* singleton path (no fake hook runner)
* - the real `setActivePluginRegistry` channel resolution path (no
* fake channel adapter)
*
* It then exercises three scenarios:
*
* 1. Direct outbound delivery with `session.key` set: confirms the
* `message_sending` and `message_sent` hook contexts both receive
* the canonical `sessionKey`.
*
* 2. Direct outbound delivery with NO session: confirms `sessionKey`
* is absent from both hook contexts (the "narrowed" docs branch).
*
* 3. Native-redirect simulation: outbound delivery whose `session.key`
* is set to the redirect TARGET session (i.e., what the agent
* runtime resolves as `params.sessionKey` when
* `CommandTargetSessionKey` is set and `CommandSource === "native"`,
* and what `dispatch-from-config.ts` now passes through to
* `routeReply`). Confirms `message_sending` / `message_sent`
* observe the redirect-target session, NOT the inbound session.
* This is the runtime invariant Clawsweeper asked us to pin
* with a regression test.
*
* Run with:
* pnpm tsx scripts/proof-73706-message-sending-session-key.ts
*/
import { deliverOutboundPayloads } from "../src/infra/outbound/deliver.js";
import type {
PluginHookMessageContext,
PluginHookMessageReceivedEvent,
} from "../src/plugins/hook-message.types.js";
import { initializeGlobalHookRunner } from "../src/plugins/hook-runner-global.js";
import { addTestHook, createMockPluginRegistry } from "../src/plugins/hooks.test-helpers.js";
import type { PluginRegistry } from "../src/plugins/registry.js";
import { setActivePluginRegistry } from "../src/plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../src/test-utils/channel-plugins.js";
type CapturedContext = {
hook: "message_sending" | "message_sent";
ctx: PluginHookMessageContext;
event: PluginHookMessageReceivedEvent;
};
function buildRegistry(captured: CapturedContext[], channelId: "matrix"): PluginRegistry {
// Real outbound channel plugin: returns a synthetic delivery result
// without touching any network. This drives `deliverOutboundPayloads`
// through its real channel-resolution + sendText path.
const sendText = async () => ({
channel: channelId,
messageId: `mx-${Date.now()}`,
roomId: "!room:example",
});
const channelRegistry = createTestRegistry([
{
pluginId: channelId,
source: "proof",
plugin: createOutboundTestPlugin({
id: channelId,
outbound: { deliveryMode: "direct", sendText },
}),
},
]);
// Real hook handlers: capture exactly what delivery hands to plugins.
const hookRegistry = createMockPluginRegistry([]);
addTestHook({
registry: hookRegistry,
pluginId: "proof-message-sending",
hookName: "message_sending",
handler: async (event: unknown, ctx: unknown) => {
captured.push({
hook: "message_sending",
ctx: ctx as PluginHookMessageContext,
event: event as PluginHookMessageReceivedEvent,
});
// Returning undefined means "do not modify or cancel".
return undefined;
},
});
addTestHook({
registry: hookRegistry,
pluginId: "proof-message-sent",
hookName: "message_sent",
handler: async (event: unknown, ctx: unknown) => {
captured.push({
hook: "message_sent",
ctx: ctx as PluginHookMessageContext,
event: event as PluginHookMessageReceivedEvent,
});
},
});
return {
...channelRegistry,
hooks: hookRegistry.hooks,
typedHooks: hookRegistry.typedHooks,
plugins: hookRegistry.plugins,
};
}
async function runScenario(
label: string,
opts: { sessionKey?: string },
): Promise<CapturedContext[]> {
const captured: CapturedContext[] = [];
const registry = buildRegistry(captured, "matrix");
setActivePluginRegistry(registry);
initializeGlobalHookRunner(registry);
const result = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: `proof: ${label}` }],
skipQueue: true,
...(opts.sessionKey ? { session: { key: opts.sessionKey } } : {}),
});
console.log(`\n=== Scenario: ${label} ===`);
console.log(`deliverOutboundPayloads result:`, JSON.stringify(result));
for (const entry of captured) {
console.log(
`[${entry.hook}] ctx.sessionKey = ${
entry.ctx.sessionKey === undefined ? "(undefined)" : JSON.stringify(entry.ctx.sessionKey)
}`,
);
console.log(`[${entry.hook}] full ctx = ${JSON.stringify(entry.ctx)}`);
}
return captured;
}
async function main() {
console.log("[proof-73706] Real-runtime behavior proof for outbound session-key threading.");
console.log(
"[proof-73706] Production code paths: deliverOutboundPayloads + getGlobalHookRunner.",
);
const scenario1 = await runScenario(
"outbound delivery WITH session.key (canonical key from agent runtime)",
{ sessionKey: "agent:tank:slack:channel:CHAN1" },
);
const scenario2 = await runScenario(
"outbound delivery WITHOUT session (narrowed docs branch)",
{},
);
const scenario3 = await runScenario(
"native-redirect: session.key = CommandTargetSessionKey (what dispatch-from-config.ts now passes)",
{ sessionKey: "agent:tank:telegram:direct:999" },
);
// Assertions — make the proof self-checking so the captured output is
// not silently green when the runtime regresses.
const expectFromHook = (
captured: CapturedContext[],
hook: "message_sending" | "message_sent",
expected: string | undefined,
): void => {
const entry = captured.find((c) => c.hook === hook);
if (!entry) {
throw new Error(`[proof-73706] No ${hook} hook fired.`);
}
if (entry.ctx.sessionKey !== expected) {
throw new Error(
`[proof-73706] ${hook} sessionKey mismatch: expected ${JSON.stringify(expected)} got ${JSON.stringify(entry.ctx.sessionKey)}`,
);
}
};
expectFromHook(scenario1, "message_sending", "agent:tank:slack:channel:CHAN1");
expectFromHook(scenario1, "message_sent", "agent:tank:slack:channel:CHAN1");
expectFromHook(scenario2, "message_sending", undefined);
expectFromHook(scenario2, "message_sent", undefined);
expectFromHook(scenario3, "message_sending", "agent:tank:telegram:direct:999");
expectFromHook(scenario3, "message_sent", "agent:tank:telegram:direct:999");
console.log("\n[proof-73706] All runtime assertions passed.");
}
main().catch((err) => {
console.error("[proof-73706] FAILED:", err);
process.exitCode = 1;
});

View File

@@ -4632,6 +4632,83 @@ describe("dispatchReplyFromConfig", () => {
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
});
it("routes native-command-redirect replies using the redirect target sessionKey for outbound delivery", async () => {
// Regression test for the native redirect session-key contract:
// when a native command targets a different session via
// `CommandTargetSessionKey`, the agent runtime resolves its
// `params.sessionKey` as `CommandTargetSessionKey ?? SessionKey`
// (see `get-reply.ts`). Routed reply delivery must mirror that so
// `agent_end` (fired with the runtime sessionKey) and the outbound
// `message_sending` hook (fired with `OutboundSessionContext.key`)
// see the same canonical key. Without this alignment, plugins
// correlating per-turn state across `agent_end` and `message_sending`
// would receive divergent keys on every native redirect.
setNoAbort();
mocks.routeReply.mockClear();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "slack",
Surface: "slack",
AccountId: "acc-1",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:999",
CommandSource: "native",
SessionKey: "agent:main:slack:channel:CHAN1",
CommandTargetSessionKey: "agent:main:telegram:direct:999",
});
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(mocks.routeReply).toHaveBeenCalledTimes(1);
expect(mocks.routeReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:999",
sessionKey: "agent:main:telegram:direct:999",
policySessionKey: "agent:main:telegram:direct:999",
}),
);
});
it("routes non-native (text) command replies using the inbound sessionKey for outbound delivery", async () => {
// Companion regression test: for non-native commands the routed
// reply must keep the inbound `SessionKey` as both the canonical
// session key and the policy key, even if `CommandTargetSessionKey`
// happens to be set on the context. This guards against accidental
// generalization of the native-redirect branch.
setNoAbort();
mocks.routeReply.mockClear();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "slack",
Surface: "slack",
AccountId: "acc-1",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:999",
CommandSource: "text",
SessionKey: "agent:main:slack:channel:CHAN1",
CommandTargetSessionKey: "agent:main:telegram:direct:999",
});
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(mocks.routeReply).toHaveBeenCalledTimes(1);
expect(mocks.routeReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:999",
sessionKey: "agent:main:slack:channel:CHAN1",
policySessionKey: "agent:main:slack:channel:CHAN1",
}),
);
});
it("emits diagnostics when enabled", async () => {
setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;

View File

@@ -1304,11 +1304,18 @@ export async function dispatchReplyFromConfig(
return null;
}
markInboundDedupeReplayUnsafe();
// Outbound session.key must match the session key used by the agent
// runtime that produced this payload, so agent_end and message delivery
// hooks expose the same canonical key for native command redirects.
const agentRuntimeSessionKey =
ctx.CommandSource === "native"
? (resolveCommandTurnTargetSessionKey(ctx) ?? ctx.SessionKey)
: ctx.SessionKey;
return await routeReplyRuntime.routeReply({
payload,
channel: routeReplyChannel,
to: routeReplyTo,
sessionKey: ctx.SessionKey,
sessionKey: agentRuntimeSessionKey,
policySessionKey: resolveCommandTurnTargetSessionKey(ctx) ?? ctx.SessionKey,
policyConversationType: resolveRoutedPolicyConversationType(ctx),
accountId: replyRoute.accountId,

View File

@@ -2946,6 +2946,203 @@ describe("deliverOutboundPayloads", () => {
expect(sentCall?.[1]?.channelId).toBe("matrix");
});
it("threads sessionKey into the message_sending hook context when session is provided", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "mx-1",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hello" }],
session: { key: "agent:tank:main" },
});
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
channelId: "matrix",
sessionKey: "agent:tank:main",
}),
);
});
it("forwards session.key (canonical) into message_sending ctx and never falls back to policyKey", async () => {
// Contract test for OutboundSessionContext.key semantics:
// session.key MUST reach plugins via ctx.sessionKey, even when a
// different session.policyKey is also present. Delivery must not hand
// the policy key to plugins that correlate against agent_end.
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "mx-3",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hi" }],
session: {
key: "agent:tank:main",
policyKey: "agent:tank:discord:tank:direct:1594",
},
});
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({ sessionKey: "agent:tank:main" }),
);
});
it("omits sessionKey from the message_sending hook context when session is absent", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "mx-2",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hi" }],
});
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledTimes(1);
const ctx = hookMocks.runner.runMessageSending.mock.calls[0]?.[1] as {
sessionKey?: string;
};
expect(ctx?.sessionKey).toBeUndefined();
expect(ctx).not.toHaveProperty("sessionKey");
});
it("threads sessionKey into the message_sent hook context when session is provided", async () => {
// Contract test for `message_sent`: the documented JSDoc says the
// outbound delivery hooks mirror `OutboundSessionContext.key`. This
// test pins `message_sent` to that contract so it cannot diverge
// from `message_sending` unobserved.
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "mx-sent-1",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hello" }],
session: { key: "agent:tank:main" },
});
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledTimes(1);
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({ to: "!room", content: "hello", success: true }),
expect.objectContaining({
channelId: "matrix",
sessionKey: "agent:tank:main",
}),
);
});
it("omits sessionKey from the message_sent hook context when session is absent", async () => {
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "mx-sent-2",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendText },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hi" }],
});
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledTimes(1);
const sentCtx = hookMocks.runner.runMessageSent.mock.calls[0]?.[1] as {
sessionKey?: string;
};
expect(sentCtx?.sessionKey).toBeUndefined();
});
it("short-circuits lower-priority message_sending hooks after cancel=true", async () => {
const hookRegistry = createEmptyPluginRegistry();
const high = vi.fn().mockResolvedValue({ cancel: true, content: "blocked" });

View File

@@ -685,6 +685,15 @@ type MessageSentEvent = {
messageId?: string;
};
/**
* Best-effort session identifier for delivery telemetry only. Falls back to
* `policyKey` as a last resort so diagnostic emission still has a stable
* string when neither mirror nor canonical key are available. **Do not use
* this value for hook-context correlation** — use `sessionKeyForInternalHooks`
* (mirror.sessionKey ?? session.key, no policyKey fallback) instead, so we
* never accidentally hand the policy key to plugins that expect the canonical
* session key.
*/
function sessionKeyForDeliveryDiagnostics(params: {
mirror?: DeliveryMirror;
session?: OutboundSessionContext;
@@ -1005,6 +1014,14 @@ function createMessageSentEmitter(params: {
channelId: params.channel,
accountId: params.accountId ?? undefined,
conversationId: params.to,
// Mirror the canonical outbound session key into the `message_sent`
// hook context so plugins that observe both `message_sending` and
// `message_sent` see the same `sessionKey` (and so it matches the
// value the internal `message:sent` hook fires with). The value is
// already computed for the internal hook below; reusing it here
// keeps the contract documented in `PluginHookMessageContext`
// honest for both outbound delivery hooks.
sessionKey: params.sessionKeyForInternalHooks,
messageId: event.messageId,
isGroup: params.mirrorIsGroup,
groupId: params.mirrorGroupId,
@@ -1052,6 +1069,7 @@ async function applyMessageSendingHook(params: {
accountId?: string;
replyToId?: string | null;
threadId?: string | number | null;
sessionKey?: string;
}): Promise<{
cancelled: boolean;
cancelReason?: string;
@@ -1085,6 +1103,7 @@ async function applyMessageSendingHook(params: {
channelId: params.channel,
accountId: params.accountId ?? undefined,
conversationId: params.to,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
},
);
if (sendingResult?.cancel) {
@@ -1454,6 +1473,15 @@ async function deliverOutboundPayloadsCore(
});
}
const hookRunner = getGlobalHookRunner();
// Canonical session key forwarded to internal lifecycle hooks
// (`message:sent` event, `message_sending` plugin hook ctx, etc.). Mirror
// delivery wins because mirror sends are explicitly bound to the mirror's
// session; otherwise we use `session.key`, which by contract equals the
// agent runtime's `params.sessionKey` for the run that produced the
// payload (see OutboundSessionContext.key JSDoc). We deliberately do NOT
// fall back to `session.policyKey` here — the policy key describes the
// delivery target's policy, not the canonical control session, and
// handing it to plugins that correlate against agent_end would be wrong.
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
const mirrorIsGroup = params.mirror?.isGroup;
const mirrorGroupId = params.mirror?.groupId;
@@ -1535,6 +1563,7 @@ async function deliverOutboundPayloadsCore(
accountId,
replyToId: resolveCurrentReplyTo(payload).replyToId,
threadId: params.threadId,
sessionKey: sessionKeyForInternalHooks,
});
if (hookResult.cancelled) {
const hookEffect =

View File

@@ -5,9 +5,31 @@ import type { SilentReplyConversationType } from "../../shared/silent-reply-poli
import { normalizeOptionalString } from "../../shared/string-coerce.js";
export type OutboundSessionContext = {
/** Canonical session key used for internal hook dispatch. */
/**
* Canonical session key used for internal hook dispatch.
*
* MUST equal the agent runtime's `params.sessionKey` for the run that
* produced the payload being delivered. Plugins observing both
* `agent_end`/`llm_input`/`llm_output`/`before_tool_call`/`after_tool_call`
* and `message_sending`/`message_sent` rely on this equality to correlate
* per-turn state across the agent-loop and delivery boundaries.
*
* Callers populating this field should use the same value the agent runner
* received as its sessionKey — in the chat path that is
* `targetSessionKey || ctx.SessionKey` (see
* `auto-reply/reply/get-reply.ts`). Followup, ACP, command, and cron
* delivery paths each have their own canonical value to forward; consult
* the relevant runner.
*/
key?: string;
/** Session key used for policy resolution when delivery differs from the control session. */
/**
* Session key used for policy resolution when delivery differs from the
* control session. Used to look up silent-reply policy, send rate limits,
* agent-scoped channel preferences, etc., for the chat the reply is being
* delivered into. May equal `key` when there is no redirect; otherwise
* `policyKey` describes the *delivery target*'s session while `key`
* describes the *control session* whose hooks fire.
*/
policyKey?: string;
/** Explicit conversation type for policy resolution when a session key is generic. */
conversationType?: SilentReplyConversationType;

View File

@@ -5,7 +5,42 @@ export type PluginHookMessageContext = {
channelId: string;
accountId?: string;
conversationId?: string;
/**
* Canonical session key for this conversation — the same value the agent
* runtime sees as `params.sessionKey` for the run that produced the
* outbound payload, and the same value `agent_end`/`llm_input`/`llm_output`
* fire with. Plugins correlating per-turn state across `agent_end` and
* `message_sending` rely on this equality.
*
* For inbound message hooks (`inbound_claim` etc.), this is the canonical
* session for the inbound conversation as resolved by `resolveSessionKey`
* / `deriveInboundMessageHookContext`.
*
* For outbound delivery hooks (`message_sending` and `message_sent`),
* this mirrors `OutboundSessionContext.key` from the dispatch path when
* delivery has a session attached. When the outbound path has no
* resolvable session (e.g. internal smoke runs without
* `OutboundSessionContext`), this field is omitted; plugins must treat
* it as optional.
*/
sessionKey?: string;
/**
* Per-turn run identifier (UUID), unique to one end-to-end agent turn:
* stable across all LLM-call iterations, retry attempts (compaction,
* empty-response, planning-only, etc.), and multi-payload reply chunks
* within that turn; distinct for each new inbound user message and for
* each cron/heartbeat/followup-triggered run.
*
* Generated once in `agent-runner-execution.ts`/`followup-runner.ts` via
* `crypto.randomUUID()`. Currently populated for inbound message hooks
* (`inbound_claim`, `message_received`) and for agent-runtime hooks that
* already receive the run id (e.g. `agent_end`, `llm_input`, `llm_output`).
* It is **not yet** plumbed through the outbound delivery path, so
* plugins observing `message_sending` / `message_sent` should not rely
* on `runId` to correlate against `agent_end`; use `sessionKey` for
* outbound→inbound correlation today (with the caveat that it cannot
* disambiguate concurrent turns in the same session).
*/
runId?: string;
messageId?: string;
senderId?: string;