Compare commits

..

4 Commits

Author SHA1 Message Date
scoootscooob
9e1426b2a9 refactor(exec): centralize native approval delivery 2026-03-30 08:33:57 -07:00
scoootscooob
c625a3c306 fix(exec): scope telegram legacy approval fallback 2026-03-30 08:33:44 -07:00
scoootscooob
330266545b fix(exec): restore channel approval routing 2026-03-30 08:33:41 -07:00
scoootscooob
e7e27b4994 fix(exec): clean up failed approval deliveries 2026-03-30 08:26:08 -07:00
13 changed files with 565 additions and 437 deletions

View File

@@ -40,7 +40,6 @@ function toExecLikeRequest(request: ApprovalRequest): ExecApprovalRequest {
turnSourceChannel: request.request.turnSourceChannel ?? undefined,
turnSourceTo: request.request.turnSourceTo ?? undefined,
turnSourceAccountId: request.request.turnSourceAccountId ?? undefined,
turnSourceThreadId: request.request.turnSourceThreadId ?? undefined,
},
createdAtMs: request.createdAtMs,
expiresAtMs: request.expiresAtMs,
@@ -73,7 +72,6 @@ function resolveRequestSessionTarget(params: {
turnSourceChannel: execLikeRequest.request.turnSourceChannel ?? undefined,
turnSourceTo: execLikeRequest.request.turnSourceTo ?? undefined,
turnSourceAccountId: execLikeRequest.request.turnSourceAccountId ?? undefined,
turnSourceThreadId: execLikeRequest.request.turnSourceThreadId ?? undefined,
});
}
@@ -85,50 +83,31 @@ function resolveDiscordOriginTarget(params: {
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
const turnSourceTo = normalizeDiscordOriginChannelId(params.request.request.turnSourceTo);
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
const turnSourceTarget =
turnSourceChannel === "discord" && turnSourceTo
? {
to: turnSourceTo,
accountId: turnSourceAccountId || undefined,
}
: null;
if (
turnSourceTarget?.accountId &&
params.accountId &&
normalizeAccountId(turnSourceTarget.accountId) !== normalizeAccountId(params.accountId)
) {
return null;
if (turnSourceChannel === "discord" && turnSourceTo) {
if (
params.accountId &&
turnSourceAccountId &&
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
) {
return null;
}
return { to: turnSourceTo };
}
const sessionTarget = resolveRequestSessionTarget(params);
if (!sessionTarget || sessionTarget.channel !== "discord") {
const channelId = extractDiscordChannelId(params.request.request.sessionKey?.trim() || null);
return channelId ? { to: channelId } : null;
}
if (
sessionTarget?.channel === "discord" &&
sessionTarget.accountId &&
params.accountId &&
sessionTarget.accountId &&
normalizeAccountId(sessionTarget.accountId) !== normalizeAccountId(params.accountId)
) {
return null;
}
if (
turnSourceTarget &&
sessionTarget?.channel === "discord" &&
turnSourceTarget.to !== normalizeDiscordOriginChannelId(sessionTarget.to)
) {
return null;
}
if (turnSourceTarget) {
return { to: turnSourceTarget.to };
}
if (sessionTarget?.channel === "discord") {
const targetTo = normalizeDiscordOriginChannelId(sessionTarget.to);
return targetTo ? { to: targetTo } : null;
}
const legacyChannelId = extractDiscordChannelId(params.request.request.sessionKey?.trim() || null);
if (legacyChannelId) {
return { to: legacyChannelId };
}
return null;
const targetTo = normalizeDiscordOriginChannelId(sessionTarget.to);
return targetTo ? { to: targetTo } : null;
}
function resolveDiscordApproverDmTargets(params: {

View File

@@ -17,7 +17,6 @@ import {
} from "./exec-approvals.js";
type ApprovalRequest = ExecApprovalRequest | PluginApprovalRequest;
type TelegramOriginTarget = { to: string; threadId?: number; accountId?: string };
function isExecApprovalRequest(request: ApprovalRequest): request is ExecApprovalRequest {
return "command" in request.request;
@@ -57,40 +56,30 @@ function resolveRequestSessionTarget(params: {
});
}
function resolveTurnSourceTelegramOriginTarget(params: {
accountId: string;
request: ApprovalRequest;
}): TelegramOriginTarget | null {
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
const turnSourceTo = params.request.request.turnSourceTo?.trim() || "";
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
if (turnSourceChannel !== "telegram" || !turnSourceTo) {
return null;
}
if (
turnSourceAccountId &&
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
) {
return null;
}
const threadId =
typeof params.request.request.turnSourceThreadId === "number"
? params.request.request.turnSourceThreadId
: typeof params.request.request.turnSourceThreadId === "string"
? Number.parseInt(params.request.request.turnSourceThreadId, 10)
: undefined;
return {
to: turnSourceTo,
threadId: Number.isFinite(threadId) ? threadId : undefined,
accountId: turnSourceAccountId || undefined,
};
}
function resolveSessionTelegramOriginTarget(params: {
function resolveTelegramOriginTarget(params: {
cfg: OpenClawConfig;
accountId: string;
request: ApprovalRequest;
}): TelegramOriginTarget | null {
}) {
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
const turnSourceTo = params.request.request.turnSourceTo?.trim() || "";
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
if (turnSourceChannel === "telegram" && turnSourceTo) {
if (
turnSourceAccountId &&
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
) {
return null;
}
const threadId =
typeof params.request.request.turnSourceThreadId === "number"
? params.request.request.turnSourceThreadId
: typeof params.request.request.turnSourceThreadId === "string"
? Number.parseInt(params.request.request.turnSourceThreadId, 10)
: undefined;
return { to: turnSourceTo, threadId: Number.isFinite(threadId) ? threadId : undefined };
}
const sessionTarget = resolveRequestSessionTarget(params);
if (!sessionTarget || sessionTarget.channel !== "telegram") {
return null;
@@ -104,32 +93,9 @@ function resolveSessionTelegramOriginTarget(params: {
return {
to: sessionTarget.to,
threadId: sessionTarget.threadId,
accountId: sessionTarget.accountId,
};
}
function telegramTargetsMatch(a: TelegramOriginTarget, b: TelegramOriginTarget): boolean {
const accountMatches =
!a.accountId ||
!b.accountId ||
normalizeAccountId(a.accountId) === normalizeAccountId(b.accountId);
return a.to === b.to && a.threadId === b.threadId && accountMatches;
}
function resolveTelegramOriginTarget(params: {
cfg: OpenClawConfig;
accountId: string;
request: ApprovalRequest;
}) {
const turnSourceTarget = resolveTurnSourceTelegramOriginTarget(params);
const sessionTarget = resolveSessionTelegramOriginTarget(params);
if (turnSourceTarget && sessionTarget && !telegramTargetsMatch(turnSourceTarget, sessionTarget)) {
return null;
}
const target = turnSourceTarget ?? sessionTarget;
return target ? { to: target.to, threadId: target.threadId } : null;
}
function resolveTelegramApproverDmTargets(params: {
cfg: OpenClawConfig;
accountId?: string | null;

View File

@@ -1335,10 +1335,12 @@ export const registerTelegramHandlers = ({
});
const authorizedApprovalSender = isPluginApproval
? pluginApprovalAuthorizedSender
: execApprovalAuthorizedSender;
if (!authorizedApprovalSender) {
: execApprovalAuthorizedSender || pluginApprovalAuthorizedSender;
if (
!authorizedApprovalSender
) {
logVerbose(
`Blocked telegram approval callback from ${senderId || "unknown"} (not authorized)`,
`Blocked telegram exec approval callback from ${senderId || "unknown"} (not authorized)`,
);
return;
}
@@ -1357,9 +1359,7 @@ export const registerTelegramHandlers = ({
logVerbose(
`telegram: failed to resolve approval callback ${approvalCallback.approvalId}: ${errStr}`,
);
await replyToCallbackChat(
"❌ Failed to submit approval. Please try again or contact an admin.",
);
await replyToCallbackChat(`❌ Failed to submit approval: ${errStr}`);
return;
}
try {

View File

@@ -216,6 +216,7 @@ describe("createTelegramBot", () => {
it("blocks callback_query when inline buttons are allowlist-only and sender not authorized", async () => {
onSpy.mockClear();
replySpy.mockClear();
sendMessageSpy.mockClear();
createTelegramBot({
token: "tok",
@@ -433,8 +434,8 @@ describe("createTelegramBot", () => {
}),
approvalId: "138e9b8c",
decision: "allow-once",
senderId: "9",
allowPluginFallback: true,
senderId: "9",
});
expect(replySpy).not.toHaveBeenCalled();
expect(editMessageTextSpy).not.toHaveBeenCalled();
@@ -541,8 +542,8 @@ describe("createTelegramBot", () => {
}),
approvalId: "plugin:138e9b8c",
decision: "allow-once",
senderId: "9",
allowPluginFallback: true,
senderId: "9",
});
expect(editMessageReplyMarkupSpy).toHaveBeenCalledTimes(1);
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-plugin-approve");
@@ -595,52 +596,6 @@ describe("createTelegramBot", () => {
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-approve-blocked");
});
it("does not leak raw approval callback errors back into Telegram chat", async () => {
onSpy.mockClear();
sendMessageSpy.mockClear();
resolveExecApprovalSpy.mockClear();
resolveExecApprovalSpy.mockRejectedValueOnce(new Error("gateway secret detail"));
loadConfig.mockReturnValue({
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
execApprovals: {
enabled: true,
approvers: ["9"],
target: "dm",
},
},
},
});
createTelegramBot({ token: "tok" });
const callbackHandler = getOnHandler("callback_query") as (
ctx: Record<string, unknown>,
) => Promise<void>;
await callbackHandler({
callbackQuery: {
id: "cbq-approve-error",
data: "/approve 138e9b8c allow-once",
from: { id: 9, first_name: "Ada", username: "ada_bot" },
message: {
chat: { id: 1234, type: "private" },
date: 1736380800,
message_id: 25,
text: "Approval required.",
},
},
me: { username: "openclaw_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
expect(sendMessageSpy.mock.calls[0]?.[1]).toBe(
"❌ Failed to submit approval. Please try again or contact an admin.",
);
});
it("allows exec approval callbacks from target-only Telegram recipients", async () => {
onSpy.mockClear();
editMessageReplyMarkupSpy.mockClear();
@@ -695,13 +650,82 @@ describe("createTelegramBot", () => {
}),
approvalId: "138e9b8c",
decision: "allow-once",
senderId: "9",
allowPluginFallback: false,
senderId: "9",
});
expect(editMessageReplyMarkupSpy).toHaveBeenCalledTimes(1);
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-approve-target");
});
it("does not allow target-only recipients to use legacy plugin fallback ids", async () => {
onSpy.mockClear();
editMessageReplyMarkupSpy.mockClear();
editMessageTextSpy.mockClear();
resolveExecApprovalSpy.mockClear();
replySpy.mockClear();
resolveExecApprovalSpy.mockRejectedValueOnce(new Error("unknown or expired approval id"));
loadConfig.mockReturnValue({
approvals: {
exec: {
enabled: true,
mode: "targets",
targets: [{ channel: "telegram", to: "9" }],
},
},
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
},
},
});
createTelegramBot({ token: "tok" });
const callbackHandler = onSpy.mock.calls.find((call) => call[0] === "callback_query")?.[1] as (
ctx: Record<string, unknown>,
) => Promise<void>;
expect(callbackHandler).toBeDefined();
await callbackHandler({
callbackQuery: {
id: "cbq-legacy-plugin-fallback-blocked",
data: "/approve 138e9b8c allow-once",
from: { id: 9, first_name: "Ada", username: "ada_bot" },
message: {
chat: { id: 1234, type: "private" },
date: 1736380800,
message_id: 25,
text: "Legacy plugin approval required.",
},
},
me: { username: "openclaw_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
expect(resolveExecApprovalSpy).toHaveBeenCalledWith({
cfg: expect.objectContaining({
approvals: expect.objectContaining({
exec: expect.objectContaining({
enabled: true,
mode: "targets",
}),
}),
}),
approvalId: "138e9b8c",
decision: "allow-once",
allowPluginFallback: false,
senderId: "9",
});
expect(editMessageReplyMarkupSpy).not.toHaveBeenCalled();
expect(replySpy).not.toHaveBeenCalled();
expect(sendMessageSpy).toHaveBeenCalledWith(
1234,
"❌ Failed to submit approval: Error: unknown or expired approval id",
undefined,
);
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-legacy-plugin-fallback-blocked");
});
it("keeps plugin approval callback buttons for target-only recipients", async () => {
onSpy.mockClear();
editMessageReplyMarkupSpy.mockClear();

View File

@@ -56,7 +56,6 @@ describe("resolveTelegramExecApproval", () => {
approvalId: "legacy-plugin-123",
decision: "allow-always",
senderId: "9",
allowPluginFallback: true,
});
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(1, "exec.approval.resolve", {
@@ -73,25 +72,34 @@ describe("resolveTelegramExecApproval", () => {
);
});
it("does not fall back to plugin.approval.resolve without explicit permission", async () => {
gatewayRuntimeHoisted.requestSpy.mockRejectedValueOnce(
new Error("unknown or expired approval id"),
);
it("falls back to plugin.approval.resolve for structured approval-not-found errors", async () => {
const err = new Error("invalid request") as Error & {
gatewayCode?: string;
details?: { reason?: string };
};
err.gatewayCode = "INVALID_REQUEST";
err.details = { reason: "APPROVAL_NOT_FOUND" };
gatewayRuntimeHoisted.requestSpy.mockRejectedValueOnce(err).mockResolvedValueOnce(undefined);
const { resolveTelegramExecApproval } = await import("./exec-approval-resolver.js");
await expect(
resolveTelegramExecApproval({
cfg: {} as never,
approvalId: "legacy-plugin-123",
decision: "allow-always",
senderId: "9",
}),
).rejects.toThrow("unknown or expired approval id");
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenCalledTimes(1);
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenCalledWith("exec.approval.resolve", {
id: "legacy-plugin-123",
decision: "allow-always",
await resolveTelegramExecApproval({
cfg: {} as never,
approvalId: "legacy-plugin-123",
decision: "deny",
senderId: "9",
});
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(1, "exec.approval.resolve", {
id: "legacy-plugin-123",
decision: "deny",
});
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(
2,
"plugin.approval.resolve",
{
id: "legacy-plugin-123",
decision: "deny",
},
);
});
});

View File

@@ -7,17 +7,34 @@ export type ResolveTelegramExecApprovalParams = {
approvalId: string;
decision: ExecApprovalReplyDecision;
senderId?: string | null;
gatewayUrl?: string;
allowPluginFallback?: boolean;
gatewayUrl?: string;
};
function isApprovalNotFoundError(err: unknown): boolean {
return /unknown or expired approval id/i.test(String(err));
}
export async function resolveTelegramExecApproval(
params: ResolveTelegramExecApprovalParams,
): Promise<void> {
const isApprovalNotFoundError = (err: unknown): boolean => {
if (!(err instanceof Error)) {
return false;
}
const gatewayCode = (err as { gatewayCode?: unknown }).gatewayCode;
if (gatewayCode === "APPROVAL_NOT_FOUND") {
return true;
}
const details = (err as { details?: unknown }).details;
if (
gatewayCode === "INVALID_REQUEST" &&
details &&
typeof details === "object" &&
!Array.isArray(details) &&
(details as { reason?: unknown }).reason === "APPROVAL_NOT_FOUND"
) {
return true;
}
return /unknown or expired approval id/i.test(err.message);
};
let readySettled = false;
let resolveReady!: () => void;
let rejectReady!: (err: unknown) => void;

View File

@@ -204,38 +204,4 @@ describe("TelegramExecApprovalHandler", () => {
}),
);
});
it("does not deliver plugin approvals for a different Telegram account", async () => {
const cfg = {
channels: {
telegram: {
execApprovals: {
enabled: true,
approvers: ["8460800771"],
target: "dm",
},
accounts: {
secondary: {
execApprovals: {
enabled: true,
approvers: ["999"],
target: "dm",
},
},
},
},
},
} as OpenClawConfig;
const { handler, sendMessage } = createHandler(cfg);
await handler.handleRequested({
...pluginRequest,
request: {
...pluginRequest.request,
turnSourceAccountId: "secondary",
},
});
expect(sendMessage).not.toHaveBeenCalled();
});
});

View File

@@ -4,7 +4,6 @@ import {
createExecApprovalChannelRuntime,
type ExecApprovalChannelRuntime,
resolveChannelNativeApprovalDeliveryPlan,
resolveExecApprovalSessionTarget,
} from "openclaw/plugin-sdk/infra-runtime";
import { resolveExecApprovalCommandDisplay } from "openclaw/plugin-sdk/infra-runtime";
import {
@@ -17,7 +16,7 @@ import type {
PluginApprovalRequest,
PluginApprovalResolved,
} from "openclaw/plugin-sdk/infra-runtime";
import { parseAgentSessionKey, normalizeAccountId } from "openclaw/plugin-sdk/routing";
import { parseAgentSessionKey } from "openclaw/plugin-sdk/routing";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { compileSafeRegex, testRegexWithBoundedInput } from "openclaw/plugin-sdk/security-runtime";
@@ -40,52 +39,6 @@ type PendingMessage = {
messageId: string;
};
function isExecApprovalRequest(request: ApprovalRequest): request is ExecApprovalRequest {
return "command" in request.request;
}
function toExecLikeRequest(request: ApprovalRequest): ExecApprovalRequest {
if (isExecApprovalRequest(request)) {
return request;
}
return {
id: request.id,
request: {
command: request.request.title,
agentId: request.request.agentId ?? undefined,
sessionKey: request.request.sessionKey ?? undefined,
turnSourceChannel: request.request.turnSourceChannel ?? undefined,
turnSourceTo: request.request.turnSourceTo ?? undefined,
turnSourceAccountId: request.request.turnSourceAccountId ?? undefined,
turnSourceThreadId: request.request.turnSourceThreadId ?? undefined,
},
createdAtMs: request.createdAtMs,
expiresAtMs: request.expiresAtMs,
};
}
function resolveBoundTelegramAccountId(params: {
cfg: OpenClawConfig;
request: ApprovalRequest;
}): string | null {
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase();
if (turnSourceChannel === "telegram") {
return params.request.request.turnSourceAccountId?.trim() || null;
}
const sessionTarget = resolveExecApprovalSessionTarget({
cfg: params.cfg,
request: toExecLikeRequest(params.request),
turnSourceChannel: params.request.request.turnSourceChannel ?? undefined,
turnSourceTo: params.request.request.turnSourceTo ?? undefined,
turnSourceAccountId: params.request.request.turnSourceAccountId ?? undefined,
turnSourceThreadId: params.request.request.turnSourceThreadId ?? undefined,
});
if (!sessionTarget || sessionTarget.channel !== "telegram") {
return null;
}
return sessionTarget.accountId?.trim() || null;
}
export type TelegramExecApprovalHandlerOpts = {
token: string;
accountId: string;
@@ -144,16 +97,6 @@ function matchesFilters(params: {
return false;
}
}
const boundAccountId = resolveBoundTelegramAccountId({
cfg: params.cfg,
request: params.request,
});
if (
boundAccountId &&
normalizeAccountId(boundAccountId) !== normalizeAccountId(params.accountId)
) {
return false;
}
return true;
}

View File

@@ -247,9 +247,7 @@ export const handleApproveCommand: CommandHandler = async (params, allowTextComm
if (isApprovalNotFoundError(err)) {
return {
shouldContinue: false,
reply: {
text: pluginApprovalAuthorization.reason ?? "❌ You are not authorized to approve this request.",
},
reply: { text: `❌ Failed to submit approval: ${String(err)}` },
};
}
return {

View File

@@ -115,6 +115,7 @@ vi.resetModules();
const {
addSubagentRunForTests,
getSubagentRunByChildSessionKey,
listSubagentRunsForRequester,
resetSubagentRegistryForTests,
} = await import("../../agents/subagent-registry.js");
@@ -442,6 +443,25 @@ describe("/approve command", () => {
);
});
it("keeps /approve blocked for unauthorized senders without explicit approval auth", async () => {
const cfg = {
commands: { text: true },
channels: { slack: { allowFrom: ["*"] } },
} as OpenClawConfig;
const params = buildParams("/approve abc allow-once", cfg, {
Provider: "slack",
Surface: "slack",
SenderId: "123",
});
params.command.isAuthorizedSender = false;
const result = await handleCommands(params);
expect(result.shouldContinue).toBe(false);
expect(result.reply).toBeUndefined();
expect(callGatewayMock).not.toHaveBeenCalled();
});
it("accepts Telegram command mentions for /approve", async () => {
const cfg = createTelegramApproveCfg();
const params = buildParams("/approve@bot abc12345 allow-once", cfg, {
@@ -486,23 +506,6 @@ describe("/approve command", () => {
);
});
it("does not treat implicit default approval auth as a bypass for unauthorized senders", async () => {
const cfg = {
commands: { text: true },
} as OpenClawConfig;
const params = buildParams("/approve abc12345 allow-once", cfg, {
Provider: "webchat",
Surface: "webchat",
SenderId: "123",
});
params.command.isAuthorizedSender = false;
const result = await handleCommands(params);
expect(result.shouldContinue).toBe(false);
expect(result.reply).toBeUndefined();
expect(callGatewayMock).not.toHaveBeenCalled();
});
it("accepts Telegram /approve from exec target recipients even when native approvals are disabled", async () => {
const cfg = {
commands: { text: true },
@@ -692,6 +695,24 @@ describe("/approve command", () => {
}
});
it("returns the real plugin not-found error for authorized plugin approvers", async () => {
callGatewayMock.mockRejectedValueOnce(new Error("unknown or expired approval id"));
const params = buildParams(
"/approve plugin:abc123 allow-once",
createDiscordApproveCfg({ enabled: false, approvers: ["123"], target: "channel" }),
{
Provider: "discord",
Surface: "discord",
SenderId: "123",
},
);
const result = await handleCommands(params);
expect(result.shouldContinue).toBe(false);
expect(result.reply?.text).toContain("unknown or expired approval id");
expect(result.reply?.text).not.toContain("not authorized");
});
it("rejects unauthorized or invalid Telegram /approve variants", async () => {
for (const testCase of [
{

View File

@@ -2,36 +2,26 @@ import { getChannelPlugin } from "../channels/plugins/index.js";
import type { OpenClawConfig } from "../config/config.js";
import { normalizeMessageChannel } from "../utils/message-channel.js";
export type ApprovalCommandAuthorization = {
authorized: boolean;
reason?: string;
explicit: boolean;
};
export function resolveApprovalCommandAuthorization(params: {
cfg: OpenClawConfig;
channel?: string | null;
accountId?: string | null;
senderId?: string | null;
kind: "exec" | "plugin";
}): ApprovalCommandAuthorization {
}): { authorized: boolean; reason?: string; explicit: boolean } {
const channel = normalizeMessageChannel(params.channel);
if (!channel) {
return { authorized: true, explicit: false };
}
const resolved = getChannelPlugin(channel)?.auth?.authorizeActorAction?.({
const result = getChannelPlugin(channel)?.auth?.authorizeActorAction?.({
cfg: params.cfg,
accountId: params.accountId,
senderId: params.senderId,
action: "approve",
approvalKind: params.kind,
});
if (!resolved) {
if (!result) {
return { authorized: true, explicit: false };
}
return {
authorized: resolved.authorized,
reason: resolved.reason,
explicit: true,
};
return { ...result, explicit: true };
}

View File

@@ -1,21 +1,43 @@
import type { GatewayClient } from "../gateway/client.js";
import type { PluginApprovalRequest, PluginApprovalResolved } from "./plugin-approvals.js";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mockGatewayClientStarts = vi.hoisted(() => vi.fn());
const mockGatewayClientStops = vi.hoisted(() => vi.fn());
const mockGatewayClientRequests = vi.hoisted(() => vi.fn(async () => ({ ok: true })));
const mockCreateOperatorApprovalsGatewayClient = vi.hoisted(() => vi.fn());
const loggerMocks = vi.hoisted(() => ({
debug: vi.fn(),
error: vi.fn(),
}));
vi.mock("../gateway/operator-approvals-client.js", () => ({
createOperatorApprovalsGatewayClient: mockCreateOperatorApprovalsGatewayClient,
}));
vi.mock("../logging/subsystem.js", () => ({
createSubsystemLogger: () => loggerMocks,
}));
let createExecApprovalChannelRuntime: typeof import("./exec-approval-channel-runtime.js").createExecApprovalChannelRuntime;
function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((promiseResolve, promiseReject) => {
resolve = promiseResolve;
reject = promiseReject;
});
return { promise, resolve, reject };
}
beforeEach(() => {
mockGatewayClientStarts.mockReset();
mockGatewayClientStops.mockReset();
mockGatewayClientRequests.mockReset();
mockGatewayClientRequests.mockResolvedValue({ ok: true });
loggerMocks.debug.mockReset();
loggerMocks.error.mockReset();
mockCreateOperatorApprovalsGatewayClient.mockReset().mockImplementation(async () => ({
start: mockGatewayClientStarts,
stop: mockGatewayClientStops,
@@ -106,6 +128,49 @@ describe("createExecApprovalChannelRuntime", () => {
});
});
it("finalizes approvals that resolve while delivery is still in flight", async () => {
const pendingDelivery = createDeferred<Array<{ id: string }>>();
const finalizeResolved = vi.fn(async () => undefined);
const runtime = createExecApprovalChannelRuntime<
{ id: string },
PluginApprovalRequest,
PluginApprovalResolved
>({
label: "test/plugin-approvals",
clientDisplayName: "Test Plugin Approvals",
cfg: {} as never,
eventKinds: ["plugin"],
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async () => pendingDelivery.promise,
finalizeResolved,
});
const requestPromise = runtime.handleRequested({
id: "plugin:abc",
request: {
title: "Plugin approval",
description: "Let plugin proceed",
},
createdAtMs: 1000,
expiresAtMs: 2000,
});
await runtime.handleResolved({
id: "plugin:abc",
decision: "allow-once",
ts: 1500,
});
pendingDelivery.resolve([{ id: "plugin:abc" }]);
await requestPromise;
expect(finalizeResolved).toHaveBeenCalledWith({
request: expect.objectContaining({ id: "plugin:abc" }),
resolved: expect.objectContaining({ id: "plugin:abc", decision: "allow-once" }),
entries: [{ id: "plugin:abc" }],
});
});
it("routes gateway requests through the shared client", async () => {
const runtime = createExecApprovalChannelRuntime({
label: "test/exec-approvals",
@@ -127,10 +192,150 @@ describe("createExecApprovalChannelRuntime", () => {
});
});
it("can retry start after gateway client creation fails", async () => {
const boom = new Error("boom");
mockCreateOperatorApprovalsGatewayClient
.mockRejectedValueOnce(boom)
.mockResolvedValueOnce({
start: mockGatewayClientStarts,
stop: mockGatewayClientStops,
request: mockGatewayClientRequests,
});
const runtime = createExecApprovalChannelRuntime({
label: "test/exec-approvals",
clientDisplayName: "Test Exec Approvals",
cfg: {} as never,
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async () => [],
finalizeResolved: async () => undefined,
});
await expect(runtime.start()).rejects.toThrow("boom");
await runtime.start();
expect(mockCreateOperatorApprovalsGatewayClient).toHaveBeenCalledTimes(2);
expect(mockGatewayClientStarts).toHaveBeenCalledTimes(1);
});
it("does not leave a gateway client running when stop wins the startup race", async () => {
const pendingClient = createDeferred<GatewayClient>();
mockCreateOperatorApprovalsGatewayClient.mockReturnValueOnce(pendingClient.promise);
const runtime = createExecApprovalChannelRuntime({
label: "test/exec-approvals",
clientDisplayName: "Test Exec Approvals",
cfg: {} as never,
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async () => [],
finalizeResolved: async () => undefined,
});
const startPromise = runtime.start();
const stopPromise = runtime.stop();
pendingClient.resolve({
start: mockGatewayClientStarts,
stop: mockGatewayClientStops,
request: mockGatewayClientRequests,
});
await startPromise;
await stopPromise;
expect(mockGatewayClientStarts).not.toHaveBeenCalled();
expect(mockGatewayClientStops).toHaveBeenCalledTimes(1);
await expect(runtime.request("exec.approval.resolve", { id: "abc" })).rejects.toThrow(
"gateway client not connected",
);
});
it("logs async request handling failures from gateway events", async () => {
const runtime = createExecApprovalChannelRuntime<
{ id: string },
PluginApprovalRequest,
PluginApprovalResolved
>({
label: "test/plugin-approvals",
clientDisplayName: "Test Plugin Approvals",
cfg: {} as never,
eventKinds: ["plugin"],
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async () => {
throw new Error("deliver failed");
},
finalizeResolved: async () => undefined,
});
await runtime.start();
const clientParams = mockCreateOperatorApprovalsGatewayClient.mock.calls[0]?.[0] as
| { onEvent?: (evt: { event: string; payload: unknown }) => void }
| undefined;
clientParams?.onEvent?.({
event: "plugin.approval.requested",
payload: {
id: "plugin:abc",
request: {
title: "Plugin approval",
description: "Let plugin proceed",
},
createdAtMs: 1000,
expiresAtMs: 2000,
},
});
await vi.waitFor(() => {
expect(loggerMocks.error).toHaveBeenCalledWith(
"error handling approval request: deliver failed",
);
});
});
it("logs async expiration handling failures", async () => {
vi.useFakeTimers();
const runtime = createExecApprovalChannelRuntime<
{ id: string },
PluginApprovalRequest,
PluginApprovalResolved
>({
label: "test/plugin-approvals",
clientDisplayName: "Test Plugin Approvals",
cfg: {} as never,
nowMs: () => 1000,
eventKinds: ["plugin"],
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async (request) => [{ id: request.id }],
finalizeResolved: async () => undefined,
finalizeExpired: async () => {
throw new Error("expire failed");
},
});
await runtime.handleRequested({
id: "plugin:abc",
request: {
title: "Plugin approval",
description: "Let plugin proceed",
},
createdAtMs: 1000,
expiresAtMs: 1001,
});
await vi.advanceTimersByTimeAsync(1);
expect(loggerMocks.error).toHaveBeenCalledWith(
"error handling approval expiration: expire failed",
);
});
it("subscribes to plugin approval events when requested", async () => {
const deliverRequested = vi.fn(async (request) => [{ id: request.id }]);
const finalizeResolved = vi.fn(async () => undefined);
const runtime = createExecApprovalChannelRuntime({
const runtime = createExecApprovalChannelRuntime<
{ id: string },
PluginApprovalRequest,
PluginApprovalResolved
>({
label: "test/plugin-approvals",
clientDisplayName: "Test Plugin Approvals",
cfg: {} as never,
@@ -184,64 +389,51 @@ describe("createExecApprovalChannelRuntime", () => {
});
});
it("ignores invalid gateway approval payloads", async () => {
const deliverRequested = vi.fn(async () => [{ id: "abc" }]);
it("clears pending state when delivery throws", async () => {
const deliverRequested = vi
.fn<() => Promise<Array<{ id: string }>>>()
.mockRejectedValueOnce(new Error("deliver failed"))
.mockResolvedValueOnce([{ id: "abc" }]);
const finalizeResolved = vi.fn(async () => undefined);
const runtime = createExecApprovalChannelRuntime({
label: "test/invalid-events",
clientDisplayName: "Test Invalid Events",
label: "test/delivery-failure",
clientDisplayName: "Test Delivery Failure",
cfg: {} as never,
eventKinds: ["exec", "plugin"],
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested,
finalizeResolved,
});
await runtime.start();
const clientParams = mockCreateOperatorApprovalsGatewayClient.mock.calls[0]?.[0] as
| { onEvent?: (evt: { event: string; payload: unknown }) => void }
| undefined;
clientParams?.onEvent?.({
event: "plugin.approval.requested",
payload: { id: "plugin:bad", request: null, createdAtMs: 1000, expiresAtMs: 2000 },
});
clientParams?.onEvent?.({
event: "exec.approval.resolved",
payload: { id: "abc", decision: "maybe", ts: 1500 },
});
await vi.waitFor(() => {
expect(deliverRequested).not.toHaveBeenCalled();
expect(finalizeResolved).not.toHaveBeenCalled();
});
});
it("caps the number of tracked pending approvals", async () => {
vi.useFakeTimers();
const deliverRequested = vi.fn(async (request) => [{ id: request.id }]);
const runtime = createExecApprovalChannelRuntime({
label: "test/pending-cap",
clientDisplayName: "Test Pending Cap",
cfg: {} as never,
nowMs: () => 1000,
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested,
finalizeResolved: async () => undefined,
});
for (let index = 0; index < 1001; index += 1) {
await runtime.handleRequested({
id: `approval-${index}`,
await expect(
runtime.handleRequested({
id: "abc",
request: {
command: `echo ${index}`,
command: "echo abc",
},
createdAtMs: 1000,
expiresAtMs: 61_000,
});
}
expiresAtMs: 2000,
}),
).rejects.toThrow("deliver failed");
expect(deliverRequested).toHaveBeenCalledTimes(1000);
await runtime.handleRequested({
id: "abc",
request: {
command: "echo abc",
},
createdAtMs: 1000,
expiresAtMs: 2000,
});
await runtime.handleResolved({
id: "abc",
decision: "allow-once",
ts: 1500,
});
expect(finalizeResolved).toHaveBeenCalledWith({
request: expect.objectContaining({ id: "abc" }),
resolved: expect.objectContaining({ id: "abc", decision: "allow-once" }),
entries: [{ id: "abc" }],
});
});
});

View File

@@ -8,15 +8,19 @@ import type { PluginApprovalRequest, PluginApprovalResolved } from "./plugin-app
type ApprovalRequestEvent = ExecApprovalRequest | PluginApprovalRequest;
type ApprovalResolvedEvent = ExecApprovalResolved | PluginApprovalResolved;
const MAX_PENDING_APPROVALS = 1000;
const MAX_PENDING_APPROVAL_TTL_MS = 30 * 60_000;
export type ExecApprovalChannelRuntimeEventKind = "exec" | "plugin";
type PendingApprovalEntry<TPending, TRequest extends ApprovalRequestEvent> = {
type PendingApprovalEntry<
TPending,
TRequest extends ApprovalRequestEvent,
TResolved extends ApprovalResolvedEvent,
> = {
request: TRequest;
entries: TPending[];
timeoutId: NodeJS.Timeout | null;
delivering: boolean;
pendingResolution: TResolved | null;
};
export type ExecApprovalChannelRuntimeAdapter<
@@ -66,11 +70,22 @@ export function createExecApprovalChannelRuntime<
const log = createSubsystemLogger(adapter.label);
const nowMs = adapter.nowMs ?? Date.now;
const eventKinds = new Set<ExecApprovalChannelRuntimeEventKind>(adapter.eventKinds ?? ["exec"]);
const pending = new Map<string, PendingApprovalEntry<TPending, TRequest>>();
const pending = new Map<string, PendingApprovalEntry<TPending, TRequest, TResolved>>();
let gatewayClient: GatewayClient | null = null;
let started = false;
let shouldRun = false;
let startPromise: Promise<void> | null = null;
const clearPendingEntry = (approvalId: string): PendingApprovalEntry<TPending, TRequest> | null => {
const spawn = (label: string, promise: Promise<void>): void => {
void promise.catch((err: unknown) => {
const message = err instanceof Error ? err.message : String(err);
log.error(`${label}: ${message}`);
});
};
const clearPendingEntry = (
approvalId: string,
): PendingApprovalEntry<TPending, TRequest, TResolved> | null => {
const entry = pending.get(approvalId);
if (!entry) {
return null;
@@ -82,30 +97,6 @@ export function createExecApprovalChannelRuntime<
return entry;
};
const spawn = (label: string, promise: Promise<void>): void => {
promise.catch((err) => {
log.error(`${label}: ${err instanceof Error ? err.message : String(err)}`);
});
};
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const isFiniteNumber = (value: unknown): value is number =>
typeof value === "number" && Number.isFinite(value);
const isApprovalDecision = (value: unknown): value is ExecApprovalResolved["decision"] =>
value === "allow-once" || value === "allow-always" || value === "deny";
const isApprovalRequestPayload = (value: unknown): value is TRequest =>
isObjectRecord(value) &&
typeof value.id === "string" &&
isObjectRecord(value.request) &&
isFiniteNumber(value.createdAtMs) &&
isFiniteNumber(value.expiresAtMs);
const isApprovalResolvedPayload = (value: unknown): value is TResolved =>
isObjectRecord(value) &&
typeof value.id === "string" &&
isApprovalDecision(value.decision) &&
isFiniteNumber(value.ts);
const handleExpired = async (approvalId: string): Promise<void> => {
const entry = clearPendingEntry(approvalId);
if (!entry) {
@@ -122,81 +113,94 @@ export function createExecApprovalChannelRuntime<
if (!adapter.shouldHandle(request)) {
return;
}
if (!pending.has(request.id) && pending.size >= MAX_PENDING_APPROVALS) {
log.error(`dropping request ${request.id}: pending approval cap reached`);
return;
}
log.debug(`received request ${request.id}`);
const entries = await adapter.deliverRequested(request);
if (!entries.length) {
return;
}
const timeoutMs = Math.min(
MAX_PENDING_APPROVAL_TTL_MS,
Math.max(0, request.expiresAtMs - nowMs()),
);
const timeoutId = setTimeout(() => {
spawn(`expire ${request.id}`, handleExpired(request.id));
}, timeoutMs);
timeoutId.unref?.();
const existing = pending.get(request.id);
if (existing?.timeoutId) {
clearTimeout(existing.timeoutId);
}
pending.set(request.id, {
const entry: PendingApprovalEntry<TPending, TRequest, TResolved> = {
request,
entries,
timeoutId,
});
entries: [],
timeoutId: null,
delivering: true,
pendingResolution: null,
};
pending.set(request.id, entry);
let entries: TPending[];
try {
entries = await adapter.deliverRequested(request);
} catch (err) {
if (pending.get(request.id) === entry) {
clearPendingEntry(request.id);
}
throw err;
}
const current = pending.get(request.id);
if (current !== entry) {
return;
}
if (!entries.length) {
pending.delete(request.id);
return;
}
entry.entries = entries;
entry.delivering = false;
if (entry.pendingResolution) {
pending.delete(request.id);
log.debug(`resolved ${entry.pendingResolution.id} with ${entry.pendingResolution.decision}`);
await adapter.finalizeResolved({
request: entry.request,
resolved: entry.pendingResolution,
entries: entry.entries,
});
return;
}
const timeoutMs = Math.max(0, request.expiresAtMs - nowMs());
const timeoutId = setTimeout(() => {
spawn("error handling approval expiration", handleExpired(request.id));
}, timeoutMs);
timeoutId.unref?.();
entry.timeoutId = timeoutId;
};
const handleResolved = async (resolved: TResolved): Promise<void> => {
const entry = clearPendingEntry(resolved.id);
const entry = pending.get(resolved.id);
if (!entry) {
return;
}
if (entry.delivering) {
entry.pendingResolution = resolved;
return;
}
const finalizedEntry = clearPendingEntry(resolved.id);
if (!finalizedEntry) {
return;
}
log.debug(`resolved ${resolved.id} with ${resolved.decision}`);
await adapter.finalizeResolved({
request: entry.request,
request: finalizedEntry.request,
resolved,
entries: entry.entries,
entries: finalizedEntry.entries,
});
};
const handleGatewayEvent = (evt: EventFrame): void => {
if (evt.event === "exec.approval.requested" && eventKinds.has("exec")) {
if (!isApprovalRequestPayload(evt.payload)) {
log.error("received invalid exec.approval.requested payload");
return;
}
spawn("event exec.approval.requested", handleRequested(evt.payload));
spawn("error handling approval request", handleRequested(evt.payload as TRequest));
return;
}
if (evt.event === "plugin.approval.requested" && eventKinds.has("plugin")) {
if (!isApprovalRequestPayload(evt.payload)) {
log.error("received invalid plugin.approval.requested payload");
return;
}
spawn("event plugin.approval.requested", handleRequested(evt.payload));
spawn("error handling approval request", handleRequested(evt.payload as TRequest));
return;
}
if (evt.event === "exec.approval.resolved" && eventKinds.has("exec")) {
if (!isApprovalResolvedPayload(evt.payload)) {
log.error("received invalid exec.approval.resolved payload");
return;
}
spawn("event exec.approval.resolved", handleResolved(evt.payload));
spawn("error handling approval resolved", handleResolved(evt.payload as TResolved));
return;
}
if (evt.event === "plugin.approval.resolved" && eventKinds.has("plugin")) {
if (!isApprovalResolvedPayload(evt.payload)) {
log.error("received invalid plugin.approval.resolved payload");
return;
}
spawn("event plugin.approval.resolved", handleResolved(evt.payload));
spawn("error handling approval resolved", handleResolved(evt.payload as TResolved));
}
};
@@ -205,34 +209,54 @@ export function createExecApprovalChannelRuntime<
if (started) {
return;
}
started = true;
if (!adapter.isConfigured()) {
log.debug("disabled");
if (startPromise) {
await startPromise;
return;
}
gatewayClient = await createOperatorApprovalsGatewayClient({
config: adapter.cfg,
gatewayUrl: adapter.gatewayUrl,
clientDisplayName: adapter.clientDisplayName,
onEvent: handleGatewayEvent,
onHelloOk: () => {
log.debug("connected to gateway");
},
onConnectError: (err) => {
log.error(`connect error: ${err.message}`);
},
onClose: (code, reason) => {
log.debug(`gateway closed: ${code} ${reason}`);
},
shouldRun = true;
startPromise = (async () => {
if (!adapter.isConfigured()) {
log.debug("disabled");
return;
}
const client = await createOperatorApprovalsGatewayClient({
config: adapter.cfg,
gatewayUrl: adapter.gatewayUrl,
clientDisplayName: adapter.clientDisplayName,
onEvent: handleGatewayEvent,
onHelloOk: () => {
log.debug("connected to gateway");
},
onConnectError: (err) => {
log.error(`connect error: ${err.message}`);
},
onClose: (code, reason) => {
log.debug(`gateway closed: ${code} ${reason}`);
},
});
if (!shouldRun) {
client.stop();
return;
}
client.start();
gatewayClient = client;
started = true;
})().finally(() => {
startPromise = null;
});
gatewayClient.start();
await startPromise;
},
async stop(): Promise<void> {
if (!started) {
shouldRun = false;
if (startPromise) {
await startPromise.catch(() => {});
}
if (!started && !gatewayClient) {
return;
}
started = false;