mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-27 01:43:29 +08:00
Compare commits
3 Commits
v2026.6.10
...
fix/slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa1c472a21 | ||
|
|
848129c05b | ||
|
|
b14fe065bb |
@@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/media understanding: make `openclaw infer image describe --model <provider/model>` execute the explicit image model instead of skipping description when that model supports native vision.
|
||||
- Usage/providers: keep plugin-owned usage auth enabled when manifest-declared provider auth env vars such as `MINIMAX_CODE_PLAN_KEY` are present, so `/usage` can resolve MiniMax billing credentials through the provider plugin.
|
||||
- Tlon/uploads: route both hosted Memex upload targets and custom-S3 presigned upload URLs through the shared SSRF guard so blocked private or loopback destinations fail before upload, while public upload URLs continue through the existing hosted upload flow. (#69794) Thanks @drobison00.
|
||||
- Channels/thread routing: keep outbound replies in existing Slack, Mattermost, Matrix, Telegram, Discord, and QA-channel thread sessions by sharing the Plugin SDK thread-aware route builder across bundled plugins.
|
||||
|
||||
## 2026.4.20
|
||||
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
3a2cde4b15041b5456420b2052b572f9968a93690814d2cf924382fd2f54d1d3 plugin-sdk-api-baseline.json
|
||||
38cd9086be93fc9531a8036812c197118c7830d52d40424be08dc9c6d51092e2 plugin-sdk-api-baseline.jsonl
|
||||
d7f6e6ecdfb78c73760689af5a684c20ec7ca28509d4f63bf0d990a2d739c6ce plugin-sdk-api-baseline.json
|
||||
584681e4436a4e84c2ff20196ff194a63915caf4dda70de9c27f34ab0d7bde0b plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -176,6 +176,12 @@ surfaces:
|
||||
- `openclaw/plugin-sdk/outbound-media` and
|
||||
`openclaw/plugin-sdk/outbound-runtime` for media loading plus outbound
|
||||
identity/send delegates and payload planning
|
||||
- `buildThreadAwareOutboundSessionRoute(...)` from
|
||||
`openclaw/plugin-sdk/channel-core` when an outbound route should preserve an
|
||||
explicit `replyToId`/`threadId` or recover the current `:thread:` session
|
||||
after the base session key still matches. Provider plugins can override
|
||||
precedence, suffix behavior, and thread id normalization when their platform
|
||||
has native thread delivery semantics.
|
||||
- `openclaw/plugin-sdk/thread-bindings-runtime` for thread-binding lifecycle
|
||||
and adapter registration
|
||||
- `openclaw/plugin-sdk/agent-media-payload` only when a legacy agent/media
|
||||
|
||||
34
extensions/discord/src/outbound-session-route.test.ts
Normal file
34
extensions/discord/src/outbound-session-route.test.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { resolveDiscordOutboundSessionRoute } from "./outbound-session-route.js";
|
||||
|
||||
describe("resolveDiscordOutboundSessionRoute", () => {
|
||||
it("keeps explicit delivery thread ids without adding a session suffix", () => {
|
||||
const route = resolveDiscordOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "channel:123",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:discord:channel:123",
|
||||
baseSessionKey: "agent:main:discord:channel:123",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not promote replyToId into Discord delivery thread metadata", () => {
|
||||
const route = resolveDiscordOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "channel:123",
|
||||
replyToId: "message-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:discord:channel:123",
|
||||
baseSessionKey: "agent:main:discord:channel:123",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
@@ -1,10 +1,6 @@
|
||||
import { buildThreadAwareOutboundSessionRoute } from "openclaw/plugin-sdk/channel-core";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
|
||||
import {
|
||||
buildOutboundBaseSessionKey,
|
||||
normalizeOutboundThreadId,
|
||||
resolveThreadSessionKeys,
|
||||
type RoutePeer,
|
||||
} from "openclaw/plugin-sdk/routing";
|
||||
import { buildOutboundBaseSessionKey, type RoutePeer } from "openclaw/plugin-sdk/routing";
|
||||
import { parseDiscordTarget } from "./target-parsing.js";
|
||||
|
||||
export type ResolveDiscordOutboundSessionRouteParams = {
|
||||
@@ -38,22 +34,19 @@ export function resolveDiscordOutboundSessionRoute(
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const explicitThreadId = normalizeOutboundThreadId(params.threadId);
|
||||
const threadCandidate = explicitThreadId ?? normalizeOutboundThreadId(params.replyToId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: threadCandidate,
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isDm ? ("direct" as const) : ("channel" as const),
|
||||
from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`,
|
||||
to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
},
|
||||
threadId: params.threadId,
|
||||
precedence: ["threadId"],
|
||||
useSuffix: false,
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isDm ? ("direct" as const) : ("channel" as const),
|
||||
from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`,
|
||||
to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
threadId: explicitThreadId ?? undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDiscordOutboundTargetKindHint(params: {
|
||||
|
||||
@@ -266,4 +266,71 @@ describe("resolveMatrixOutboundSessionRoute", () => {
|
||||
|
||||
expectCurrentDmRoomRoute(route);
|
||||
});
|
||||
|
||||
it("recovers channel thread routes from currentSessionKey and preserves Matrix event-id case", () => {
|
||||
const route = resolveMatrixOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "room:!Ops:Example.Org",
|
||||
currentSessionKey: "agent:main:matrix:channel:!ops:example.org:thread:$RootEvent:Example.Org",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:matrix:channel:!ops:example.org:thread:$RootEvent:Example.Org",
|
||||
baseSessionKey: "agent:main:matrix:channel:!ops:example.org",
|
||||
threadId: "$RootEvent:Example.Org",
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves per-room DM metadata from the base key when currentSessionKey has a thread suffix", () => {
|
||||
const storedSession = createStoredDirectDmSession();
|
||||
const route = resolveUserRoute({
|
||||
cfg: createMatrixRouteConfig({
|
||||
[currentDmSessionKey]: storedSession,
|
||||
}),
|
||||
accountId: "ops",
|
||||
target: "@alice:example.org",
|
||||
});
|
||||
const threadedRoute = resolveMatrixOutboundSessionRoute({
|
||||
cfg: createMatrixRouteConfig({
|
||||
[route?.baseSessionKey ?? currentDmSessionKey]: storedSession,
|
||||
}),
|
||||
agentId: "main",
|
||||
accountId: "ops",
|
||||
target: "@alice:example.org",
|
||||
resolvedTarget: {
|
||||
to: "@alice:example.org",
|
||||
kind: "user",
|
||||
source: "normalized",
|
||||
},
|
||||
currentSessionKey: `${route?.baseSessionKey}:thread:$DmRoot:Example.Org`,
|
||||
});
|
||||
|
||||
expect(threadedRoute).toMatchObject({
|
||||
sessionKey: `${route?.baseSessionKey}:thread:$DmRoot:Example.Org`,
|
||||
baseSessionKey: route?.baseSessionKey,
|
||||
to: "room:!dm:example.org",
|
||||
threadId: "$DmRoot:Example.Org",
|
||||
});
|
||||
});
|
||||
|
||||
it('does not recover currentSessionKey threads for shared dmScope "main" DMs', () => {
|
||||
const route = resolveMatrixOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "@alice:example.org",
|
||||
currentSessionKey: "agent:main:main:thread:$DmRoot:Example.Org",
|
||||
resolvedTarget: {
|
||||
to: "@alice:example.org",
|
||||
kind: "user",
|
||||
source: "normalized",
|
||||
},
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { normalizeAccountId } from "openclaw/plugin-sdk/account-id";
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
type ChannelOutboundSessionRouteParams,
|
||||
} from "openclaw/plugin-sdk/channel-core";
|
||||
import {
|
||||
@@ -8,6 +9,7 @@ import {
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
import { parseThreadSessionSuffix } from "openclaw/plugin-sdk/routing";
|
||||
import { resolveMatrixAccountConfig } from "./matrix/account-config.js";
|
||||
import { resolveDefaultMatrixAccountId } from "./matrix/accounts.js";
|
||||
import { resolveMatrixStoredSessionMeta } from "./matrix/session-store-metadata.js";
|
||||
@@ -38,7 +40,9 @@ function resolveMatrixCurrentDmRoomId(params: {
|
||||
currentSessionKey?: string;
|
||||
targetUserId: string;
|
||||
}): string | undefined {
|
||||
const sessionKey = params.currentSessionKey?.trim();
|
||||
const sessionKey =
|
||||
parseThreadSessionSuffix(params.currentSessionKey).baseSessionKey ??
|
||||
params.currentSessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -100,7 +104,7 @@ export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSession
|
||||
const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`;
|
||||
const to = `room:${roomScopedDmId ?? target.id}`;
|
||||
|
||||
return buildChannelOutboundSessionRoute({
|
||||
const baseRoute = buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "matrix",
|
||||
@@ -110,4 +114,13 @@ export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSession
|
||||
from,
|
||||
to,
|
||||
});
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute,
|
||||
replyToId: params.replyToId,
|
||||
threadId: params.threadId,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
normalizeThreadId: (threadId) => threadId,
|
||||
canRecoverCurrentThread: ({ route }) =>
|
||||
route.peer.kind !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -41,6 +41,54 @@ describe("mattermost session route", () => {
|
||||
expect(route?.sessionKey).toContain("thread456");
|
||||
});
|
||||
|
||||
it("recovers channel thread routes from currentSessionKey", () => {
|
||||
const route = resolveMattermostOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "acct-1",
|
||||
target: "mattermost:channel:chan123",
|
||||
currentSessionKey: "agent:main:mattermost:channel:chan123:thread:root-post",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:mattermost:channel:chan123:thread:root-post",
|
||||
baseSessionKey: "agent:main:mattermost:channel:chan123",
|
||||
threadId: "root-post",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps explicit replyToId ahead of recovered currentSessionKey thread", () => {
|
||||
const route = resolveMattermostOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "acct-1",
|
||||
target: "mattermost:channel:chan123",
|
||||
replyToId: "explicit-root",
|
||||
currentSessionKey: "agent:main:mattermost:channel:chan123:thread:root-post",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:mattermost:channel:chan123:thread:explicit-root",
|
||||
threadId: "explicit-root",
|
||||
});
|
||||
});
|
||||
|
||||
it('does not recover currentSessionKey threads for shared dmScope "main" DMs', () => {
|
||||
const route = resolveMattermostOutboundSessionRoute({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "acct-1",
|
||||
target: "@user123",
|
||||
currentSessionKey: "agent:main:main:thread:root-post",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns null when the target is empty after normalization", () => {
|
||||
expect(
|
||||
resolveMattermostOutboundSessionRoute({
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
resolveThreadSessionKeys,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
stripChannelTargetPrefix,
|
||||
stripTargetKindPrefix,
|
||||
type ChannelOutboundSessionRouteParams,
|
||||
} from "openclaw/plugin-sdk/core";
|
||||
import { normalizeOutboundThreadId } from "openclaw/plugin-sdk/routing";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
||||
|
||||
export function resolveMattermostOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
||||
@@ -40,14 +39,12 @@ export function resolveMattermostOutboundSessionRoute(params: ChannelOutboundSes
|
||||
from: isUser ? `mattermost:${rawId}` : `mattermost:channel:${rawId}`,
|
||||
to: isUser ? `user:${rawId}` : `channel:${rawId}`,
|
||||
});
|
||||
const threadId = normalizeOutboundThreadId(params.replyToId ?? params.threadId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey: baseRoute.baseSessionKey,
|
||||
threadId,
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute,
|
||||
replyToId: params.replyToId,
|
||||
threadId: params.threadId,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
canRecoverCurrentThread: ({ route }) =>
|
||||
route.chatType !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main",
|
||||
});
|
||||
return {
|
||||
...baseRoute,
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
...(threadId !== undefined ? { threadId } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -127,6 +127,53 @@ async function startQaChannelTestHarness(params?: {
|
||||
}
|
||||
|
||||
describe("qa-channel plugin", () => {
|
||||
it("derives thread-aware outbound session routes from explicit thread targets", async () => {
|
||||
const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "default",
|
||||
target: "thread:qa-room/thread-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:qa-channel:channel:thread:qa-room/thread-1",
|
||||
baseSessionKey: "agent:main:qa-channel:channel:thread:qa-room/thread-1",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("recovers thread-aware outbound session routes from currentSessionKey", async () => {
|
||||
const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "default",
|
||||
target: "channel:qa-room",
|
||||
currentSessionKey: "agent:main:qa-channel:channel:channel:qa-room:thread:thread-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:qa-channel:channel:channel:qa-room:thread:thread-1",
|
||||
baseSessionKey: "agent:main:qa-channel:channel:channel:qa-room",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
});
|
||||
|
||||
it('does not recover currentSessionKey threads for shared dmScope "main" DMs', async () => {
|
||||
const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
accountId: "default",
|
||||
target: "dm:alice",
|
||||
currentSessionKey: "agent:main:main:thread:thread-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("roundtrips inbound DM traffic through the qa bus", { timeout: 20_000 }, async () => {
|
||||
const harness = await startQaChannelTestHarness({ allowFrom: ["*"] });
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
createChatChannelPlugin,
|
||||
} from "openclaw/plugin-sdk/channel-core";
|
||||
import { getChatChannelMeta } from "openclaw/plugin-sdk/channel-plugin-common";
|
||||
@@ -66,9 +67,17 @@ export const qaChannelPlugin: ChannelPlugin<ResolvedQaChannelAccount> = createCh
|
||||
/^((dm|channel):|thread:[^/]+\/)/i.test(raw.trim()) || raw.trim().length > 0,
|
||||
hint: "<dm:user|channel:room|thread:room/thread>",
|
||||
},
|
||||
resolveOutboundSessionRoute: ({ cfg, agentId, accountId, target, threadId }) => {
|
||||
resolveOutboundSessionRoute: ({
|
||||
cfg,
|
||||
agentId,
|
||||
accountId,
|
||||
target,
|
||||
replyToId,
|
||||
threadId,
|
||||
currentSessionKey,
|
||||
}) => {
|
||||
const parsed = parseQaTarget(target);
|
||||
return buildChannelOutboundSessionRoute({
|
||||
const baseRoute = buildChannelOutboundSessionRoute({
|
||||
cfg,
|
||||
agentId,
|
||||
channel: CHANNEL_ID,
|
||||
@@ -80,7 +89,14 @@ export const qaChannelPlugin: ChannelPlugin<ResolvedQaChannelAccount> = createCh
|
||||
chatType: parsed.chatType,
|
||||
from: `qa-channel:${accountId ?? DEFAULT_ACCOUNT_ID}`,
|
||||
to: buildQaTarget(parsed),
|
||||
threadId: threadId ?? parsed.threadId,
|
||||
});
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute,
|
||||
replyToId,
|
||||
threadId: threadId ?? (target.trim().startsWith("thread:") ? undefined : parsed.threadId),
|
||||
currentSessionKey,
|
||||
canRecoverCurrentThread: ({ route }) =>
|
||||
route.chatType !== "direct" || (cfg.session?.dmScope ?? "main") !== "main",
|
||||
});
|
||||
},
|
||||
},
|
||||
|
||||
@@ -90,6 +90,14 @@ function requireSlackListPeers() {
|
||||
return listPeers;
|
||||
}
|
||||
|
||||
function requireSlackResolveOutboundSessionRoute() {
|
||||
const resolveRoute = slackPlugin.messaging?.resolveOutboundSessionRoute;
|
||||
if (!resolveRoute) {
|
||||
throw new Error("slack messaging.resolveOutboundSessionRoute unavailable");
|
||||
}
|
||||
return resolveRoute;
|
||||
}
|
||||
|
||||
describe("slackPlugin actions", () => {
|
||||
it("prefers session lookup for announce target routing", () => {
|
||||
expect(slackPlugin.meta.preferSessionLookupForAnnounceTarget).toBe(true);
|
||||
@@ -382,6 +390,90 @@ describe("slackPlugin outbound", () => {
|
||||
expect(slackPlugin.outbound?.textChunkLimit).toBe(8000);
|
||||
});
|
||||
|
||||
it("recovers thread route from currentSessionKey when no explicit thread target is provided", async () => {
|
||||
const resolveRoute = requireSlackResolveOutboundSessionRoute();
|
||||
|
||||
const route = await resolveRoute({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
target: "channel:c123",
|
||||
currentSessionKey: "agent:main:slack:channel:c123:thread:1712345678.123456",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:slack:channel:c123:thread:1712345678.123456",
|
||||
baseSessionKey: "agent:main:slack:channel:c123",
|
||||
peer: { kind: "channel", id: "c123" },
|
||||
chatType: "channel",
|
||||
from: "slack:channel:c123",
|
||||
to: "channel:c123",
|
||||
threadId: "1712345678.123456",
|
||||
});
|
||||
});
|
||||
|
||||
it('does not recover a thread from currentSessionKey for shared dmScope "main" DMs', async () => {
|
||||
const resolveRoute = requireSlackResolveOutboundSessionRoute();
|
||||
|
||||
const route = await resolveRoute({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
target: "user:U999",
|
||||
currentSessionKey: "agent:main:main:thread:1712345678.123456",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
peer: { kind: "direct", id: "U999" },
|
||||
chatType: "direct",
|
||||
from: "slack:U999",
|
||||
to: "user:U999",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("recovers a DM thread from currentSessionKey when dmScope isolates DM peers", async () => {
|
||||
const resolveRoute = requireSlackResolveOutboundSessionRoute();
|
||||
|
||||
const route = await resolveRoute({
|
||||
cfg: {
|
||||
...cfg,
|
||||
session: { dmScope: "per-channel-peer" },
|
||||
},
|
||||
agentId: "main",
|
||||
target: "user:U123",
|
||||
currentSessionKey: "agent:main:slack:direct:u123:thread:1712345678.123456",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:slack:direct:u123:thread:1712345678.123456",
|
||||
baseSessionKey: "agent:main:slack:direct:u123",
|
||||
peer: { kind: "direct", id: "U123" },
|
||||
chatType: "direct",
|
||||
from: "slack:U123",
|
||||
to: "user:U123",
|
||||
threadId: "1712345678.123456",
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers replyToId over threadId for outbound route derivation", async () => {
|
||||
const resolveRoute = requireSlackResolveOutboundSessionRoute();
|
||||
|
||||
const route = await resolveRoute({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
target: "channel:c123",
|
||||
replyToId: "1712000000.000001",
|
||||
threadId: "1712345678.123456",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:slack:channel:c123:thread:1712000000.000001",
|
||||
baseSessionKey: "agent:main:slack:channel:c123",
|
||||
threadId: "1712000000.000001",
|
||||
});
|
||||
});
|
||||
|
||||
it("uses threadId as threadTs fallback for sendText", async () => {
|
||||
const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-text" });
|
||||
const sendText = requireSlackSendText();
|
||||
|
||||
@@ -4,7 +4,10 @@ import {
|
||||
createFlatAllowlistOverrideResolver,
|
||||
} from "openclaw/plugin-sdk/allowlist-config-edit";
|
||||
import { adaptScopedAccountAccessor } from "openclaw/plugin-sdk/channel-config-helpers";
|
||||
import { createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core";
|
||||
import {
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
createChatChannelPlugin,
|
||||
} from "openclaw/plugin-sdk/channel-core";
|
||||
import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing";
|
||||
import {
|
||||
createChannelDirectoryAdapter,
|
||||
@@ -13,12 +16,7 @@ import {
|
||||
import { buildPassiveProbedChannelStatusSummary } from "openclaw/plugin-sdk/extension-shared";
|
||||
import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime";
|
||||
import { resolveOutboundSendDep } from "openclaw/plugin-sdk/outbound-runtime";
|
||||
import {
|
||||
buildOutboundBaseSessionKey,
|
||||
normalizeOutboundThreadId,
|
||||
resolveThreadSessionKeys,
|
||||
type RoutePeer,
|
||||
} from "openclaw/plugin-sdk/routing";
|
||||
import { buildOutboundBaseSessionKey, type RoutePeer } from "openclaw/plugin-sdk/routing";
|
||||
import {
|
||||
createComputedAccountStatusAdapter,
|
||||
createDefaultChannelRuntimeState,
|
||||
@@ -186,11 +184,24 @@ function buildSlackBaseSessionKey(params: {
|
||||
return buildOutboundBaseSessionKey({ ...params, channel: "slack" });
|
||||
}
|
||||
|
||||
function shouldRecoverSlackThreadFromCurrentSession(params: {
|
||||
cfg: OpenClawConfig;
|
||||
peerKind: RoutePeer["kind"];
|
||||
}): boolean {
|
||||
// Shared DM sessions (dmScope="main") do not encode the DM peer in the base key,
|
||||
// so inheriting a prior thread can bleed across unrelated direct-message targets.
|
||||
if (params.peerKind === "direct" && (params.cfg.session?.dmScope ?? "main") === "main") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async function resolveSlackOutboundSessionRoute(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
target: string;
|
||||
currentSessionKey?: string | null;
|
||||
replyToId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
}) {
|
||||
@@ -223,25 +234,29 @@ async function resolveSlackOutboundSessionRoute(params: {
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const threadId = normalizeOutboundThreadId(params.threadId ?? params.replyToId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId,
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: peerKind === "direct" ? ("direct" as const) : ("channel" as const),
|
||||
from:
|
||||
peerKind === "direct"
|
||||
? `slack:${parsed.id}`
|
||||
: peerKind === "group"
|
||||
? `slack:group:${parsed.id}`
|
||||
: `slack:channel:${parsed.id}`,
|
||||
to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
},
|
||||
replyToId: params.replyToId,
|
||||
threadId: params.threadId,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
canRecoverCurrentThread: () =>
|
||||
shouldRecoverSlackThreadFromCurrentSession({
|
||||
cfg: params.cfg,
|
||||
peerKind,
|
||||
}),
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: peerKind === "direct" ? ("direct" as const) : ("channel" as const),
|
||||
from:
|
||||
peerKind === "direct"
|
||||
? `slack:${parsed.id}`
|
||||
: peerKind === "group"
|
||||
? `slack:group:${parsed.id}`
|
||||
: `slack:channel:${parsed.id}`,
|
||||
to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
threadId,
|
||||
};
|
||||
}
|
||||
|
||||
function formatSlackScopeDiagnostic(params: {
|
||||
|
||||
@@ -4,7 +4,12 @@ import {
|
||||
createNestedAllowlistOverrideResolver,
|
||||
} from "openclaw/plugin-sdk/allowlist-config-edit";
|
||||
import type { ChannelMessageActionAdapter } from "openclaw/plugin-sdk/channel-contract";
|
||||
import { clearAccountEntryFields, createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core";
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
clearAccountEntryFields,
|
||||
createChatChannelPlugin,
|
||||
} from "openclaw/plugin-sdk/channel-core";
|
||||
import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing";
|
||||
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
|
||||
@@ -21,12 +26,7 @@ import {
|
||||
resolveOutboundSendDep,
|
||||
type OutboundSendDeps,
|
||||
} from "openclaw/plugin-sdk/outbound-runtime";
|
||||
import {
|
||||
buildOutboundBaseSessionKey,
|
||||
normalizeOutboundThreadId,
|
||||
resolveThreadSessionKeys,
|
||||
type RoutePeer,
|
||||
} from "openclaw/plugin-sdk/routing";
|
||||
import { type RoutePeer } from "openclaw/plugin-sdk/routing";
|
||||
import {
|
||||
createComputedAccountStatusAdapter,
|
||||
createDefaultChannelRuntimeState,
|
||||
@@ -445,30 +445,22 @@ function shouldStripTelegramThreadFromAnnounceOrigin(params: {
|
||||
return entryTarget.to !== requesterTarget.to;
|
||||
}
|
||||
|
||||
function buildTelegramBaseSessionKey(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
peer: RoutePeer;
|
||||
}) {
|
||||
return buildOutboundBaseSessionKey({ ...params, channel: "telegram" });
|
||||
}
|
||||
|
||||
function resolveTelegramOutboundSessionRoute(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
target: string;
|
||||
resolvedTarget?: { kind: string };
|
||||
replyToId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
currentSessionKey?: string | null;
|
||||
}) {
|
||||
const parsed = parseTelegramTarget(params.target);
|
||||
const chatId = parsed.chatId.trim();
|
||||
if (!chatId) {
|
||||
return null;
|
||||
}
|
||||
const fallbackThreadId = normalizeOutboundThreadId(params.threadId);
|
||||
const resolvedThreadId = parsed.messageThreadId ?? parseTelegramThreadId(fallbackThreadId);
|
||||
const resolvedThreadId = parsed.messageThreadId ?? parseTelegramThreadId(params.threadId);
|
||||
const isGroup =
|
||||
parsed.chatType === "group" ||
|
||||
(parsed.chatType === "unknown" &&
|
||||
@@ -480,20 +472,12 @@ function resolveTelegramOutboundSessionRoute(params: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
};
|
||||
const baseSessionKey = buildTelegramBaseSessionKey({
|
||||
const baseRoute = buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const threadKeys =
|
||||
resolvedThreadId && !isGroup
|
||||
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(resolvedThreadId) })
|
||||
: null;
|
||||
return {
|
||||
sessionKey: threadKeys?.sessionKey ?? baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? ("group" as const) : ("direct" as const),
|
||||
from: isGroup
|
||||
? `telegram:group:${peerId}`
|
||||
@@ -501,7 +485,25 @@ function resolveTelegramOutboundSessionRoute(params: {
|
||||
? `telegram:${chatId}:topic:${resolvedThreadId}`
|
||||
: `telegram:${chatId}`,
|
||||
to: `telegram:${chatId}`,
|
||||
...(isGroup && resolvedThreadId !== undefined ? { threadId: resolvedThreadId } : {}),
|
||||
});
|
||||
if (isGroup) {
|
||||
return baseRoute;
|
||||
}
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute,
|
||||
threadId: resolvedThreadId,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
precedence: ["threadId", "currentSession"],
|
||||
canRecoverCurrentThread: ({ route }) =>
|
||||
route.chatType !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main",
|
||||
});
|
||||
return {
|
||||
...route,
|
||||
from:
|
||||
route.threadId !== undefined
|
||||
? `telegram:${chatId}:topic:${route.threadId}`
|
||||
: `telegram:${chatId}`,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
62
extensions/telegram/src/session-route.test.ts
Normal file
62
extensions/telegram/src/session-route.test.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { telegramPlugin } from "./channel.js";
|
||||
|
||||
describe("telegram session route", () => {
|
||||
it("keeps direct topic thread ids in a thread session suffix", async () => {
|
||||
const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "12345:topic:99",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main:thread:99",
|
||||
baseSessionKey: "agent:main:main",
|
||||
threadId: 99,
|
||||
});
|
||||
});
|
||||
|
||||
it("recovers direct topic thread routes from currentSessionKey when the DM scope is isolated", async () => {
|
||||
const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: { session: { dmScope: "per-channel-peer" } },
|
||||
agentId: "main",
|
||||
target: "12345",
|
||||
currentSessionKey: "agent:main:telegram:direct:12345:thread:12345:99",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:telegram:direct:12345:thread:12345:99",
|
||||
baseSessionKey: "agent:main:telegram:direct:12345",
|
||||
threadId: "12345:99",
|
||||
});
|
||||
});
|
||||
|
||||
it('does not recover currentSessionKey threads for shared dmScope "main" DMs', async () => {
|
||||
const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "12345",
|
||||
currentSessionKey: "agent:main:main:thread:12345:99",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
});
|
||||
expect(route?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps group topic ids in the group peer route instead of adding a thread suffix", async () => {
|
||||
const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: {},
|
||||
agentId: "main",
|
||||
target: "-100:topic:99",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:telegram:group:-100:topic:99",
|
||||
baseSessionKey: "agent:main:telegram:group:-100:topic:99",
|
||||
threadId: 99,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -728,6 +728,246 @@ describe("gateway send mirroring", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("uses the derived thread session and derived thread delivery when route refines the provided base session", async () => {
|
||||
mockDeliverySuccess("m-derived-thread");
|
||||
const derivedThreadRoute = {
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:1710000000.9999",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: "1710000000.9999",
|
||||
};
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(derivedThreadRoute);
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
sessionKey: "agent:main:slack:channel:resolved",
|
||||
idempotencyKey: "idem-derived-thread",
|
||||
});
|
||||
|
||||
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
route: expect.objectContaining(derivedThreadRoute),
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: derivedThreadRoute.sessionKey,
|
||||
}),
|
||||
threadId: derivedThreadRoute.threadId,
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: derivedThreadRoute.sessionKey,
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the derived thread session and derived thread delivery when no sessionKey is provided", async () => {
|
||||
mockDeliverySuccess("m-derived-thread-no-session");
|
||||
const derivedThreadRoute = {
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:1710000000.9999",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: "1710000000.9999",
|
||||
};
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(derivedThreadRoute);
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
idempotencyKey: "idem-derived-thread-no-session",
|
||||
});
|
||||
|
||||
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
route: expect.objectContaining(derivedThreadRoute),
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: derivedThreadRoute.sessionKey,
|
||||
}),
|
||||
threadId: derivedThreadRoute.threadId,
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: derivedThreadRoute.sessionKey,
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("stringifies numeric derived thread ids before classifying and delivering them", async () => {
|
||||
mockDeliverySuccess("m-derived-thread-numeric");
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:42",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: 42,
|
||||
});
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
sessionKey: "agent:main:slack:channel:resolved",
|
||||
idempotencyKey: "idem-derived-thread-numeric",
|
||||
});
|
||||
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: "agent:main:slack:channel:resolved:thread:42",
|
||||
}),
|
||||
threadId: "42",
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:42",
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps the provided thread session but still delivers to the derived thread", async () => {
|
||||
mockDeliverySuccess("m-thread-session");
|
||||
const derivedThreadRoute = {
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:1710000000.9999",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: "1710000000.9999",
|
||||
};
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(derivedThreadRoute);
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
sessionKey: derivedThreadRoute.sessionKey,
|
||||
idempotencyKey: "idem-thread-session",
|
||||
});
|
||||
|
||||
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
route: expect.objectContaining(derivedThreadRoute),
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: derivedThreadRoute.sessionKey,
|
||||
}),
|
||||
threadId: derivedThreadRoute.threadId,
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: derivedThreadRoute.sessionKey,
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps an unrelated provided session key and does not inherit the derived thread id", async () => {
|
||||
mockDeliverySuccess("m-unrelated-session");
|
||||
const providedSessionKey = "agent:main:matrix:channel:!dm:example.org";
|
||||
const derivedThreadRoute = {
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:1710000000.9999",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: "1710000000.9999",
|
||||
};
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(derivedThreadRoute);
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
sessionKey: providedSessionKey,
|
||||
idempotencyKey: "idem-unrelated-session",
|
||||
});
|
||||
|
||||
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
route: expect.objectContaining(derivedThreadRoute),
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: providedSessionKey,
|
||||
}),
|
||||
threadId: null,
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: providedSessionKey,
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("prefers an explicit request threadId over any derived thread id", async () => {
|
||||
mockDeliverySuccess("m-explicit-thread");
|
||||
const derivedThreadRoute = {
|
||||
sessionKey: "agent:main:slack:channel:resolved:thread:1710000000.9999",
|
||||
baseSessionKey: "agent:main:slack:channel:resolved",
|
||||
peer: { kind: "channel" as const, id: "resolved" },
|
||||
chatType: "channel" as const,
|
||||
from: "slack:channel:resolved",
|
||||
to: "channel:resolved",
|
||||
threadId: "1710000000.9999",
|
||||
};
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(derivedThreadRoute);
|
||||
|
||||
await runSend({
|
||||
to: "channel:C1",
|
||||
message: "hello",
|
||||
channel: "slack",
|
||||
threadId: "1711111111.111111",
|
||||
idempotencyKey: "idem-explicit-thread",
|
||||
});
|
||||
|
||||
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
route: expect.objectContaining(derivedThreadRoute),
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
session: expect.objectContaining({
|
||||
agentId: "main",
|
||||
key: derivedThreadRoute.sessionKey,
|
||||
}),
|
||||
threadId: "1711111111.111111",
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: derivedThreadRoute.sessionKey,
|
||||
agentId: "main",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to the provided sessionKey when outbound route lookup returns null", async () => {
|
||||
mockDeliverySuccess("m-session-fallback");
|
||||
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(null);
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
|
||||
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
|
||||
import { normalizeOutboundThreadId } from "../../infra/outbound/thread-id.js";
|
||||
import { extractToolPayload } from "../../infra/outbound/tool-payload.js";
|
||||
import { normalizePollInput } from "../../polls.js";
|
||||
import {
|
||||
@@ -260,6 +261,77 @@ function createGatewayInflightUnavailableFailure(params: {
|
||||
};
|
||||
}
|
||||
|
||||
type GatewayDerivedRoute = {
|
||||
sessionKey: string;
|
||||
baseSessionKey: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
|
||||
function classifyDerivedThreadRelationship(params: {
|
||||
providedSessionKey?: string;
|
||||
derivedRoute?: GatewayDerivedRoute | null;
|
||||
}): "none" | "base_refinement" | "same_thread" {
|
||||
const providedSessionKey = normalizeOptionalLowercaseString(params.providedSessionKey);
|
||||
const derivedThreadId = normalizeOutboundThreadId(params.derivedRoute?.threadId);
|
||||
if (!providedSessionKey || !params.derivedRoute || !derivedThreadId) {
|
||||
return "none";
|
||||
}
|
||||
const derivedSessionKey = normalizeOptionalLowercaseString(params.derivedRoute.sessionKey);
|
||||
const derivedBaseSessionKey = normalizeOptionalLowercaseString(
|
||||
params.derivedRoute.baseSessionKey,
|
||||
);
|
||||
if (derivedSessionKey && providedSessionKey === derivedSessionKey) {
|
||||
return "same_thread";
|
||||
}
|
||||
if (
|
||||
derivedBaseSessionKey &&
|
||||
providedSessionKey === derivedBaseSessionKey &&
|
||||
providedSessionKey !== derivedSessionKey
|
||||
) {
|
||||
return "base_refinement";
|
||||
}
|
||||
return "none";
|
||||
}
|
||||
|
||||
function resolveGatewayTranscriptSessionKey(params: {
|
||||
providedSessionKey?: string;
|
||||
derivedRoute?: GatewayDerivedRoute | null;
|
||||
}): string | undefined {
|
||||
const providedSessionKey = normalizeOptionalLowercaseString(params.providedSessionKey);
|
||||
const derivedSessionKey = normalizeOptionalLowercaseString(params.derivedRoute?.sessionKey);
|
||||
if (!providedSessionKey) {
|
||||
return derivedSessionKey;
|
||||
}
|
||||
return classifyDerivedThreadRelationship(params) === "base_refinement"
|
||||
? (derivedSessionKey ?? providedSessionKey)
|
||||
: providedSessionKey;
|
||||
}
|
||||
|
||||
function resolveGatewayDeliveryThreadId(params: {
|
||||
explicitThreadId?: string;
|
||||
providedSessionKey?: string;
|
||||
derivedRoute?: GatewayDerivedRoute | null;
|
||||
}): string | null {
|
||||
const explicitThreadId = normalizeOptionalString(params.explicitThreadId);
|
||||
if (explicitThreadId) {
|
||||
return explicitThreadId;
|
||||
}
|
||||
const derivedThreadId = normalizeOutboundThreadId(params.derivedRoute?.threadId);
|
||||
if (!derivedThreadId) {
|
||||
return null;
|
||||
}
|
||||
if (!normalizeOptionalLowercaseString(params.providedSessionKey)) {
|
||||
return derivedThreadId;
|
||||
}
|
||||
const relationship = classifyDerivedThreadRelationship({
|
||||
providedSessionKey: params.providedSessionKey,
|
||||
derivedRoute: params.derivedRoute,
|
||||
});
|
||||
return relationship === "base_refinement" || relationship === "same_thread"
|
||||
? derivedThreadId
|
||||
: null;
|
||||
}
|
||||
|
||||
export const sendHandlers: GatewayRequestHandlers = {
|
||||
"message.action": async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
@@ -486,24 +558,23 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
resolvedTarget: idLikeTarget,
|
||||
threadId,
|
||||
});
|
||||
const outboundRoute = derivedRoute
|
||||
? providedSessionKey
|
||||
? {
|
||||
...derivedRoute,
|
||||
sessionKey: providedSessionKey,
|
||||
baseSessionKey: providedSessionKey,
|
||||
}
|
||||
: derivedRoute
|
||||
: null;
|
||||
if (outboundRoute) {
|
||||
if (derivedRoute) {
|
||||
await ensureOutboundSessionEntry({
|
||||
cfg,
|
||||
channel,
|
||||
accountId,
|
||||
route: outboundRoute,
|
||||
route: derivedRoute,
|
||||
});
|
||||
}
|
||||
const outboundSessionKey = outboundRoute?.sessionKey ?? providedSessionKey;
|
||||
const outboundSessionKey = resolveGatewayTranscriptSessionKey({
|
||||
providedSessionKey,
|
||||
derivedRoute,
|
||||
});
|
||||
const deliveryThreadId = resolveGatewayDeliveryThreadId({
|
||||
explicitThreadId: threadId,
|
||||
providedSessionKey,
|
||||
derivedRoute,
|
||||
});
|
||||
const outboundSession = buildOutboundSessionContext({
|
||||
cfg,
|
||||
agentId: effectiveAgentId,
|
||||
@@ -517,7 +588,7 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
payloads: outboundPayloads,
|
||||
session: outboundSession,
|
||||
gifPlayback: request.gifPlayback,
|
||||
threadId: threadId ?? null,
|
||||
threadId: deliveryThreadId,
|
||||
deps: outboundDeps,
|
||||
gatewayClientScopes: client?.connect?.scopes ?? [],
|
||||
mirror: outboundSessionKey
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { ChannelPlugin } from "../../channels/plugins/types.plugin.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
stripChannelTargetPrefix,
|
||||
stripTargetKindPrefix,
|
||||
type ChannelOutboundSessionRouteParams,
|
||||
@@ -9,7 +10,6 @@ import {
|
||||
import {
|
||||
buildOutboundBaseSessionKey,
|
||||
normalizeOutboundThreadId,
|
||||
resolveThreadSessionKeys,
|
||||
type RoutePeer,
|
||||
} from "../../plugin-sdk/routing.js";
|
||||
import { setActivePluginRegistry } from "../../plugins/runtime.js";
|
||||
@@ -62,21 +62,19 @@ function buildThreadedChannelRoute(params: {
|
||||
accountId: params.accountId,
|
||||
peer: params.peer,
|
||||
});
|
||||
const normalizedThreadId = normalizeOutboundThreadId(params.threadId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: normalizedThreadId,
|
||||
return buildThreadAwareOutboundSessionRoute({
|
||||
route: {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer: params.peer,
|
||||
chatType: params.chatType,
|
||||
from: params.from,
|
||||
to: params.to,
|
||||
},
|
||||
threadId: params.threadId,
|
||||
useSuffix: params.useSuffix,
|
||||
precedence: ["threadId", "replyToId", "currentSession"],
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer: params.peer,
|
||||
chatType: params.chatType,
|
||||
from: params.from,
|
||||
to: params.to,
|
||||
...(normalizedThreadId !== undefined ? { threadId: params.threadId } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function parseForumTargetForTest(raw: string): {
|
||||
|
||||
@@ -16,11 +16,13 @@ export const createChannelPluginBase: typeof createChannelPluginBaseFromCore = (
|
||||
export {
|
||||
buildChannelConfigSchema,
|
||||
buildChannelOutboundSessionRoute,
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
clearAccountEntryFields,
|
||||
createChatChannelPlugin,
|
||||
defineChannelPluginEntry,
|
||||
defineSetupPluginEntry,
|
||||
parseOptionalDelimitedEntries,
|
||||
recoverCurrentThreadSessionId,
|
||||
stripChannelTargetPrefix,
|
||||
stripTargetKindPrefix,
|
||||
tryReadSecretFileSync,
|
||||
|
||||
@@ -25,11 +25,17 @@ import type { ReplyToMode } from "../config/types.base.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { buildOutboundBaseSessionKey } from "../infra/outbound/base-session-key.js";
|
||||
import type { OutboundDeliveryResult } from "../infra/outbound/deliver.js";
|
||||
import { normalizeOutboundThreadId } from "../infra/outbound/thread-id.js";
|
||||
import { resolveBundledPluginsDir } from "../plugins/bundled-dir.js";
|
||||
import type { ProviderRuntimeModel } from "../plugins/provider-runtime-model.types.js";
|
||||
import type { PluginRuntime } from "../plugins/runtime/types.js";
|
||||
import type { OpenClawPluginApi } from "../plugins/types.js";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import { parseThreadSessionSuffix } from "../sessions/session-key-utils.js";
|
||||
import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
normalizeOptionalLowercaseString,
|
||||
} from "../shared/string-coerce.js";
|
||||
|
||||
export type {
|
||||
AgentHarness,
|
||||
@@ -309,6 +315,96 @@ export function buildChannelOutboundSessionRoute(params: {
|
||||
};
|
||||
}
|
||||
|
||||
export type ThreadAwareOutboundSessionRouteThreadSource =
|
||||
| "replyToId"
|
||||
| "threadId"
|
||||
| "currentSession";
|
||||
|
||||
export type ThreadAwareOutboundSessionRouteRecoveryContext = {
|
||||
route: ChannelOutboundSessionRoute;
|
||||
currentBaseSessionKey: string;
|
||||
currentThreadId: string;
|
||||
};
|
||||
|
||||
export function recoverCurrentThreadSessionId(params: {
|
||||
route: ChannelOutboundSessionRoute;
|
||||
currentSessionKey?: string | null;
|
||||
canRecover?: (context: ThreadAwareOutboundSessionRouteRecoveryContext) => boolean;
|
||||
}): string | undefined {
|
||||
const current = parseThreadSessionSuffix(params.currentSessionKey);
|
||||
if (!current.baseSessionKey || !current.threadId) {
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
normalizeOptionalLowercaseString(current.baseSessionKey) !==
|
||||
normalizeOptionalLowercaseString(params.route.baseSessionKey)
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
const context = {
|
||||
route: params.route,
|
||||
currentBaseSessionKey: current.baseSessionKey,
|
||||
currentThreadId: current.threadId,
|
||||
};
|
||||
if (params.canRecover && !params.canRecover(context)) {
|
||||
return undefined;
|
||||
}
|
||||
return current.threadId;
|
||||
}
|
||||
|
||||
export function buildThreadAwareOutboundSessionRoute(params: {
|
||||
route: ChannelOutboundSessionRoute;
|
||||
replyToId?: string | number | null;
|
||||
threadId?: string | number | null;
|
||||
currentSessionKey?: string | null;
|
||||
precedence?: readonly ThreadAwareOutboundSessionRouteThreadSource[];
|
||||
useSuffix?: boolean;
|
||||
parentSessionKey?: string;
|
||||
normalizeThreadId?: (threadId: string) => string;
|
||||
canRecoverCurrentThread?: (context: ThreadAwareOutboundSessionRouteRecoveryContext) => boolean;
|
||||
}): ChannelOutboundSessionRoute {
|
||||
const recoveredThreadId = recoverCurrentThreadSessionId({
|
||||
route: params.route,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
canRecover: params.canRecoverCurrentThread,
|
||||
});
|
||||
const candidates: Record<
|
||||
ThreadAwareOutboundSessionRouteThreadSource,
|
||||
{ routeThreadId: string | number; sessionThreadId: string } | undefined
|
||||
> = {
|
||||
replyToId: resolveThreadAwareOutboundCandidate(params.replyToId),
|
||||
threadId: resolveThreadAwareOutboundCandidate(params.threadId),
|
||||
currentSession: resolveThreadAwareOutboundCandidate(recoveredThreadId),
|
||||
};
|
||||
const precedence = params.precedence ?? ["replyToId", "threadId", "currentSession"];
|
||||
const candidate = precedence.map((source) => candidates[source]).find(Boolean);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey: params.route.baseSessionKey,
|
||||
threadId: candidate?.sessionThreadId,
|
||||
parentSessionKey: candidate ? params.parentSessionKey : undefined,
|
||||
useSuffix: params.useSuffix,
|
||||
normalizeThreadId: params.normalizeThreadId,
|
||||
});
|
||||
return {
|
||||
...params.route,
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
...(candidate !== undefined ? { threadId: candidate.routeThreadId } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveThreadAwareOutboundCandidate(
|
||||
threadId?: string | number | null,
|
||||
): { routeThreadId: string | number; sessionThreadId: string } | undefined {
|
||||
const sessionThreadId = normalizeOutboundThreadId(threadId);
|
||||
if (sessionThreadId === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
routeThreadId: typeof threadId === "number" ? threadId : sessionThreadId,
|
||||
sessionThreadId,
|
||||
};
|
||||
}
|
||||
|
||||
/** Options for a channel plugin entry that should register a channel capability. */
|
||||
type ChannelEntryConfigSchema<TPlugin> =
|
||||
TPlugin extends ChannelPlugin<unknown>
|
||||
|
||||
@@ -20,6 +20,7 @@ export {
|
||||
normalizeMainKey,
|
||||
normalizeOptionalAccountId,
|
||||
parseAgentSessionKey,
|
||||
parseThreadSessionSuffix,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveThreadSessionKeys,
|
||||
sanitizeAgentId,
|
||||
|
||||
116
src/plugin-sdk/thread-aware-outbound-session-route.test.ts
Normal file
116
src/plugin-sdk/thread-aware-outbound-session-route.test.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
buildThreadAwareOutboundSessionRoute,
|
||||
recoverCurrentThreadSessionId,
|
||||
type ChannelOutboundSessionRoute,
|
||||
} from "./core.js";
|
||||
|
||||
function baseRoute(
|
||||
overrides: Partial<ChannelOutboundSessionRoute> = {},
|
||||
): ChannelOutboundSessionRoute {
|
||||
return {
|
||||
sessionKey: "agent:main:workspace:channel:c123",
|
||||
baseSessionKey: "agent:main:workspace:channel:c123",
|
||||
peer: { kind: "channel", id: "c123" },
|
||||
chatType: "channel",
|
||||
from: "workspace:channel:c123",
|
||||
to: "channel:c123",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("buildThreadAwareOutboundSessionRoute", () => {
|
||||
it("uses replyToId before threadId and recovered current-session thread by default", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
replyToId: "reply-1",
|
||||
threadId: "thread-1",
|
||||
currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1",
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123:thread:reply-1",
|
||||
threadId: "reply-1",
|
||||
});
|
||||
});
|
||||
|
||||
it("supports provider-specific threadId-first precedence", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
replyToId: "reply-1",
|
||||
threadId: "thread-1",
|
||||
precedence: ["threadId", "replyToId", "currentSession"],
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123:thread:thread-1",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps numeric delivery thread ids on the route while stringifying the session suffix", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
threadId: 99,
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123:thread:99",
|
||||
threadId: 99,
|
||||
});
|
||||
});
|
||||
|
||||
it("recovers a current-session thread only when the base session matches", () => {
|
||||
expect(
|
||||
recoverCurrentThreadSessionId({
|
||||
route: baseRoute(),
|
||||
currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1",
|
||||
}),
|
||||
).toBe("current-1");
|
||||
expect(
|
||||
recoverCurrentThreadSessionId({
|
||||
route: baseRoute(),
|
||||
currentSessionKey: "agent:main:workspace:channel:other:thread:current-1",
|
||||
}),
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("lets providers veto current-session recovery", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1",
|
||||
canRecoverCurrentThread: () => false,
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123",
|
||||
});
|
||||
expect(route.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves provider-specific thread case when requested", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
threadId: "$EventID:Example.Org",
|
||||
normalizeThreadId: (threadId) => threadId,
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123:thread:$EventID:Example.Org",
|
||||
threadId: "$EventID:Example.Org",
|
||||
});
|
||||
});
|
||||
|
||||
it("can carry a delivery thread without adding a session suffix", () => {
|
||||
const route = buildThreadAwareOutboundSessionRoute({
|
||||
route: baseRoute(),
|
||||
threadId: "thread-1",
|
||||
useSuffix: false,
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:workspace:channel:c123",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,7 @@ export {
|
||||
isAcpSessionKey,
|
||||
isSubagentSessionKey,
|
||||
parseAgentSessionKey,
|
||||
parseThreadSessionSuffix,
|
||||
type ParsedAgentSessionKey,
|
||||
} from "../sessions/session-key-utils.js";
|
||||
export {
|
||||
|
||||
Reference in New Issue
Block a user