mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-07 14:31:35 +08:00
Compare commits
4 Commits
codex/exec
...
codex/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e1426b2a9 | ||
|
|
c625a3c306 | ||
|
|
330266545b | ||
|
|
e7e27b4994 |
@@ -40,7 +40,6 @@ function toExecLikeRequest(request: ApprovalRequest): ExecApprovalRequest {
|
||||
turnSourceChannel: request.request.turnSourceChannel ?? undefined,
|
||||
turnSourceTo: request.request.turnSourceTo ?? undefined,
|
||||
turnSourceAccountId: request.request.turnSourceAccountId ?? undefined,
|
||||
turnSourceThreadId: request.request.turnSourceThreadId ?? undefined,
|
||||
},
|
||||
createdAtMs: request.createdAtMs,
|
||||
expiresAtMs: request.expiresAtMs,
|
||||
@@ -73,7 +72,6 @@ function resolveRequestSessionTarget(params: {
|
||||
turnSourceChannel: execLikeRequest.request.turnSourceChannel ?? undefined,
|
||||
turnSourceTo: execLikeRequest.request.turnSourceTo ?? undefined,
|
||||
turnSourceAccountId: execLikeRequest.request.turnSourceAccountId ?? undefined,
|
||||
turnSourceThreadId: execLikeRequest.request.turnSourceThreadId ?? undefined,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -85,50 +83,31 @@ function resolveDiscordOriginTarget(params: {
|
||||
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
|
||||
const turnSourceTo = normalizeDiscordOriginChannelId(params.request.request.turnSourceTo);
|
||||
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
|
||||
const turnSourceTarget =
|
||||
turnSourceChannel === "discord" && turnSourceTo
|
||||
? {
|
||||
to: turnSourceTo,
|
||||
accountId: turnSourceAccountId || undefined,
|
||||
}
|
||||
: null;
|
||||
if (
|
||||
turnSourceTarget?.accountId &&
|
||||
params.accountId &&
|
||||
normalizeAccountId(turnSourceTarget.accountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return null;
|
||||
if (turnSourceChannel === "discord" && turnSourceTo) {
|
||||
if (
|
||||
params.accountId &&
|
||||
turnSourceAccountId &&
|
||||
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return { to: turnSourceTo };
|
||||
}
|
||||
|
||||
const sessionTarget = resolveRequestSessionTarget(params);
|
||||
if (!sessionTarget || sessionTarget.channel !== "discord") {
|
||||
const channelId = extractDiscordChannelId(params.request.request.sessionKey?.trim() || null);
|
||||
return channelId ? { to: channelId } : null;
|
||||
}
|
||||
if (
|
||||
sessionTarget?.channel === "discord" &&
|
||||
sessionTarget.accountId &&
|
||||
params.accountId &&
|
||||
sessionTarget.accountId &&
|
||||
normalizeAccountId(sessionTarget.accountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
if (
|
||||
turnSourceTarget &&
|
||||
sessionTarget?.channel === "discord" &&
|
||||
turnSourceTarget.to !== normalizeDiscordOriginChannelId(sessionTarget.to)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (turnSourceTarget) {
|
||||
return { to: turnSourceTarget.to };
|
||||
}
|
||||
if (sessionTarget?.channel === "discord") {
|
||||
const targetTo = normalizeDiscordOriginChannelId(sessionTarget.to);
|
||||
return targetTo ? { to: targetTo } : null;
|
||||
}
|
||||
const legacyChannelId = extractDiscordChannelId(params.request.request.sessionKey?.trim() || null);
|
||||
if (legacyChannelId) {
|
||||
return { to: legacyChannelId };
|
||||
}
|
||||
return null;
|
||||
const targetTo = normalizeDiscordOriginChannelId(sessionTarget.to);
|
||||
return targetTo ? { to: targetTo } : null;
|
||||
}
|
||||
|
||||
function resolveDiscordApproverDmTargets(params: {
|
||||
|
||||
@@ -17,7 +17,6 @@ import {
|
||||
} from "./exec-approvals.js";
|
||||
|
||||
type ApprovalRequest = ExecApprovalRequest | PluginApprovalRequest;
|
||||
type TelegramOriginTarget = { to: string; threadId?: number; accountId?: string };
|
||||
|
||||
function isExecApprovalRequest(request: ApprovalRequest): request is ExecApprovalRequest {
|
||||
return "command" in request.request;
|
||||
@@ -57,40 +56,30 @@ function resolveRequestSessionTarget(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function resolveTurnSourceTelegramOriginTarget(params: {
|
||||
accountId: string;
|
||||
request: ApprovalRequest;
|
||||
}): TelegramOriginTarget | null {
|
||||
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
|
||||
const turnSourceTo = params.request.request.turnSourceTo?.trim() || "";
|
||||
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
|
||||
if (turnSourceChannel !== "telegram" || !turnSourceTo) {
|
||||
return null;
|
||||
}
|
||||
if (
|
||||
turnSourceAccountId &&
|
||||
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
const threadId =
|
||||
typeof params.request.request.turnSourceThreadId === "number"
|
||||
? params.request.request.turnSourceThreadId
|
||||
: typeof params.request.request.turnSourceThreadId === "string"
|
||||
? Number.parseInt(params.request.request.turnSourceThreadId, 10)
|
||||
: undefined;
|
||||
return {
|
||||
to: turnSourceTo,
|
||||
threadId: Number.isFinite(threadId) ? threadId : undefined,
|
||||
accountId: turnSourceAccountId || undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveSessionTelegramOriginTarget(params: {
|
||||
function resolveTelegramOriginTarget(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId: string;
|
||||
request: ApprovalRequest;
|
||||
}): TelegramOriginTarget | null {
|
||||
}) {
|
||||
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase() || "";
|
||||
const turnSourceTo = params.request.request.turnSourceTo?.trim() || "";
|
||||
const turnSourceAccountId = params.request.request.turnSourceAccountId?.trim() || "";
|
||||
if (turnSourceChannel === "telegram" && turnSourceTo) {
|
||||
if (
|
||||
turnSourceAccountId &&
|
||||
normalizeAccountId(turnSourceAccountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
const threadId =
|
||||
typeof params.request.request.turnSourceThreadId === "number"
|
||||
? params.request.request.turnSourceThreadId
|
||||
: typeof params.request.request.turnSourceThreadId === "string"
|
||||
? Number.parseInt(params.request.request.turnSourceThreadId, 10)
|
||||
: undefined;
|
||||
return { to: turnSourceTo, threadId: Number.isFinite(threadId) ? threadId : undefined };
|
||||
}
|
||||
|
||||
const sessionTarget = resolveRequestSessionTarget(params);
|
||||
if (!sessionTarget || sessionTarget.channel !== "telegram") {
|
||||
return null;
|
||||
@@ -104,32 +93,9 @@ function resolveSessionTelegramOriginTarget(params: {
|
||||
return {
|
||||
to: sessionTarget.to,
|
||||
threadId: sessionTarget.threadId,
|
||||
accountId: sessionTarget.accountId,
|
||||
};
|
||||
}
|
||||
|
||||
function telegramTargetsMatch(a: TelegramOriginTarget, b: TelegramOriginTarget): boolean {
|
||||
const accountMatches =
|
||||
!a.accountId ||
|
||||
!b.accountId ||
|
||||
normalizeAccountId(a.accountId) === normalizeAccountId(b.accountId);
|
||||
return a.to === b.to && a.threadId === b.threadId && accountMatches;
|
||||
}
|
||||
|
||||
function resolveTelegramOriginTarget(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId: string;
|
||||
request: ApprovalRequest;
|
||||
}) {
|
||||
const turnSourceTarget = resolveTurnSourceTelegramOriginTarget(params);
|
||||
const sessionTarget = resolveSessionTelegramOriginTarget(params);
|
||||
if (turnSourceTarget && sessionTarget && !telegramTargetsMatch(turnSourceTarget, sessionTarget)) {
|
||||
return null;
|
||||
}
|
||||
const target = turnSourceTarget ?? sessionTarget;
|
||||
return target ? { to: target.to, threadId: target.threadId } : null;
|
||||
}
|
||||
|
||||
function resolveTelegramApproverDmTargets(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId?: string | null;
|
||||
|
||||
@@ -1335,10 +1335,12 @@ export const registerTelegramHandlers = ({
|
||||
});
|
||||
const authorizedApprovalSender = isPluginApproval
|
||||
? pluginApprovalAuthorizedSender
|
||||
: execApprovalAuthorizedSender;
|
||||
if (!authorizedApprovalSender) {
|
||||
: execApprovalAuthorizedSender || pluginApprovalAuthorizedSender;
|
||||
if (
|
||||
!authorizedApprovalSender
|
||||
) {
|
||||
logVerbose(
|
||||
`Blocked telegram approval callback from ${senderId || "unknown"} (not authorized)`,
|
||||
`Blocked telegram exec approval callback from ${senderId || "unknown"} (not authorized)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -1357,9 +1359,7 @@ export const registerTelegramHandlers = ({
|
||||
logVerbose(
|
||||
`telegram: failed to resolve approval callback ${approvalCallback.approvalId}: ${errStr}`,
|
||||
);
|
||||
await replyToCallbackChat(
|
||||
"❌ Failed to submit approval. Please try again or contact an admin.",
|
||||
);
|
||||
await replyToCallbackChat(`❌ Failed to submit approval: ${errStr}`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
||||
@@ -216,6 +216,7 @@ describe("createTelegramBot", () => {
|
||||
it("blocks callback_query when inline buttons are allowlist-only and sender not authorized", async () => {
|
||||
onSpy.mockClear();
|
||||
replySpy.mockClear();
|
||||
sendMessageSpy.mockClear();
|
||||
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
@@ -433,8 +434,8 @@ describe("createTelegramBot", () => {
|
||||
}),
|
||||
approvalId: "138e9b8c",
|
||||
decision: "allow-once",
|
||||
senderId: "9",
|
||||
allowPluginFallback: true,
|
||||
senderId: "9",
|
||||
});
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
expect(editMessageTextSpy).not.toHaveBeenCalled();
|
||||
@@ -541,8 +542,8 @@ describe("createTelegramBot", () => {
|
||||
}),
|
||||
approvalId: "plugin:138e9b8c",
|
||||
decision: "allow-once",
|
||||
senderId: "9",
|
||||
allowPluginFallback: true,
|
||||
senderId: "9",
|
||||
});
|
||||
expect(editMessageReplyMarkupSpy).toHaveBeenCalledTimes(1);
|
||||
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-plugin-approve");
|
||||
@@ -595,52 +596,6 @@ describe("createTelegramBot", () => {
|
||||
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-approve-blocked");
|
||||
});
|
||||
|
||||
it("does not leak raw approval callback errors back into Telegram chat", async () => {
|
||||
onSpy.mockClear();
|
||||
sendMessageSpy.mockClear();
|
||||
resolveExecApprovalSpy.mockClear();
|
||||
resolveExecApprovalSpy.mockRejectedValueOnce(new Error("gateway secret detail"));
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
allowFrom: ["*"],
|
||||
execApprovals: {
|
||||
enabled: true,
|
||||
approvers: ["9"],
|
||||
target: "dm",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
createTelegramBot({ token: "tok" });
|
||||
const callbackHandler = getOnHandler("callback_query") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
await callbackHandler({
|
||||
callbackQuery: {
|
||||
id: "cbq-approve-error",
|
||||
data: "/approve 138e9b8c allow-once",
|
||||
from: { id: 9, first_name: "Ada", username: "ada_bot" },
|
||||
message: {
|
||||
chat: { id: 1234, type: "private" },
|
||||
date: 1736380800,
|
||||
message_id: 25,
|
||||
text: "Approval required.",
|
||||
},
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
});
|
||||
|
||||
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
|
||||
expect(sendMessageSpy.mock.calls[0]?.[1]).toBe(
|
||||
"❌ Failed to submit approval. Please try again or contact an admin.",
|
||||
);
|
||||
});
|
||||
|
||||
it("allows exec approval callbacks from target-only Telegram recipients", async () => {
|
||||
onSpy.mockClear();
|
||||
editMessageReplyMarkupSpy.mockClear();
|
||||
@@ -695,13 +650,82 @@ describe("createTelegramBot", () => {
|
||||
}),
|
||||
approvalId: "138e9b8c",
|
||||
decision: "allow-once",
|
||||
senderId: "9",
|
||||
allowPluginFallback: false,
|
||||
senderId: "9",
|
||||
});
|
||||
expect(editMessageReplyMarkupSpy).toHaveBeenCalledTimes(1);
|
||||
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-approve-target");
|
||||
});
|
||||
|
||||
it("does not allow target-only recipients to use legacy plugin fallback ids", async () => {
|
||||
onSpy.mockClear();
|
||||
editMessageReplyMarkupSpy.mockClear();
|
||||
editMessageTextSpy.mockClear();
|
||||
resolveExecApprovalSpy.mockClear();
|
||||
replySpy.mockClear();
|
||||
resolveExecApprovalSpy.mockRejectedValueOnce(new Error("unknown or expired approval id"));
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
approvals: {
|
||||
exec: {
|
||||
enabled: true,
|
||||
mode: "targets",
|
||||
targets: [{ channel: "telegram", to: "9" }],
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
allowFrom: ["*"],
|
||||
},
|
||||
},
|
||||
});
|
||||
createTelegramBot({ token: "tok" });
|
||||
const callbackHandler = onSpy.mock.calls.find((call) => call[0] === "callback_query")?.[1] as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
expect(callbackHandler).toBeDefined();
|
||||
|
||||
await callbackHandler({
|
||||
callbackQuery: {
|
||||
id: "cbq-legacy-plugin-fallback-blocked",
|
||||
data: "/approve 138e9b8c allow-once",
|
||||
from: { id: 9, first_name: "Ada", username: "ada_bot" },
|
||||
message: {
|
||||
chat: { id: 1234, type: "private" },
|
||||
date: 1736380800,
|
||||
message_id: 25,
|
||||
text: "Legacy plugin approval required.",
|
||||
},
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
});
|
||||
|
||||
expect(resolveExecApprovalSpy).toHaveBeenCalledWith({
|
||||
cfg: expect.objectContaining({
|
||||
approvals: expect.objectContaining({
|
||||
exec: expect.objectContaining({
|
||||
enabled: true,
|
||||
mode: "targets",
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
approvalId: "138e9b8c",
|
||||
decision: "allow-once",
|
||||
allowPluginFallback: false,
|
||||
senderId: "9",
|
||||
});
|
||||
expect(editMessageReplyMarkupSpy).not.toHaveBeenCalled();
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
expect(sendMessageSpy).toHaveBeenCalledWith(
|
||||
1234,
|
||||
"❌ Failed to submit approval: Error: unknown or expired approval id",
|
||||
undefined,
|
||||
);
|
||||
expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-legacy-plugin-fallback-blocked");
|
||||
});
|
||||
|
||||
it("keeps plugin approval callback buttons for target-only recipients", async () => {
|
||||
onSpy.mockClear();
|
||||
editMessageReplyMarkupSpy.mockClear();
|
||||
|
||||
@@ -56,7 +56,6 @@ describe("resolveTelegramExecApproval", () => {
|
||||
approvalId: "legacy-plugin-123",
|
||||
decision: "allow-always",
|
||||
senderId: "9",
|
||||
allowPluginFallback: true,
|
||||
});
|
||||
|
||||
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(1, "exec.approval.resolve", {
|
||||
@@ -73,25 +72,34 @@ describe("resolveTelegramExecApproval", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("does not fall back to plugin.approval.resolve without explicit permission", async () => {
|
||||
gatewayRuntimeHoisted.requestSpy.mockRejectedValueOnce(
|
||||
new Error("unknown or expired approval id"),
|
||||
);
|
||||
it("falls back to plugin.approval.resolve for structured approval-not-found errors", async () => {
|
||||
const err = new Error("invalid request") as Error & {
|
||||
gatewayCode?: string;
|
||||
details?: { reason?: string };
|
||||
};
|
||||
err.gatewayCode = "INVALID_REQUEST";
|
||||
err.details = { reason: "APPROVAL_NOT_FOUND" };
|
||||
gatewayRuntimeHoisted.requestSpy.mockRejectedValueOnce(err).mockResolvedValueOnce(undefined);
|
||||
const { resolveTelegramExecApproval } = await import("./exec-approval-resolver.js");
|
||||
|
||||
await expect(
|
||||
resolveTelegramExecApproval({
|
||||
cfg: {} as never,
|
||||
approvalId: "legacy-plugin-123",
|
||||
decision: "allow-always",
|
||||
senderId: "9",
|
||||
}),
|
||||
).rejects.toThrow("unknown or expired approval id");
|
||||
|
||||
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenCalledTimes(1);
|
||||
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenCalledWith("exec.approval.resolve", {
|
||||
id: "legacy-plugin-123",
|
||||
decision: "allow-always",
|
||||
await resolveTelegramExecApproval({
|
||||
cfg: {} as never,
|
||||
approvalId: "legacy-plugin-123",
|
||||
decision: "deny",
|
||||
senderId: "9",
|
||||
});
|
||||
|
||||
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(1, "exec.approval.resolve", {
|
||||
id: "legacy-plugin-123",
|
||||
decision: "deny",
|
||||
});
|
||||
expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
"plugin.approval.resolve",
|
||||
{
|
||||
id: "legacy-plugin-123",
|
||||
decision: "deny",
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,17 +7,34 @@ export type ResolveTelegramExecApprovalParams = {
|
||||
approvalId: string;
|
||||
decision: ExecApprovalReplyDecision;
|
||||
senderId?: string | null;
|
||||
gatewayUrl?: string;
|
||||
allowPluginFallback?: boolean;
|
||||
gatewayUrl?: string;
|
||||
};
|
||||
|
||||
function isApprovalNotFoundError(err: unknown): boolean {
|
||||
return /unknown or expired approval id/i.test(String(err));
|
||||
}
|
||||
|
||||
export async function resolveTelegramExecApproval(
|
||||
params: ResolveTelegramExecApprovalParams,
|
||||
): Promise<void> {
|
||||
const isApprovalNotFoundError = (err: unknown): boolean => {
|
||||
if (!(err instanceof Error)) {
|
||||
return false;
|
||||
}
|
||||
const gatewayCode = (err as { gatewayCode?: unknown }).gatewayCode;
|
||||
if (gatewayCode === "APPROVAL_NOT_FOUND") {
|
||||
return true;
|
||||
}
|
||||
const details = (err as { details?: unknown }).details;
|
||||
if (
|
||||
gatewayCode === "INVALID_REQUEST" &&
|
||||
details &&
|
||||
typeof details === "object" &&
|
||||
!Array.isArray(details) &&
|
||||
(details as { reason?: unknown }).reason === "APPROVAL_NOT_FOUND"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
return /unknown or expired approval id/i.test(err.message);
|
||||
};
|
||||
|
||||
let readySettled = false;
|
||||
let resolveReady!: () => void;
|
||||
let rejectReady!: (err: unknown) => void;
|
||||
|
||||
@@ -204,38 +204,4 @@ describe("TelegramExecApprovalHandler", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not deliver plugin approvals for a different Telegram account", async () => {
|
||||
const cfg = {
|
||||
channels: {
|
||||
telegram: {
|
||||
execApprovals: {
|
||||
enabled: true,
|
||||
approvers: ["8460800771"],
|
||||
target: "dm",
|
||||
},
|
||||
accounts: {
|
||||
secondary: {
|
||||
execApprovals: {
|
||||
enabled: true,
|
||||
approvers: ["999"],
|
||||
target: "dm",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
const { handler, sendMessage } = createHandler(cfg);
|
||||
|
||||
await handler.handleRequested({
|
||||
...pluginRequest,
|
||||
request: {
|
||||
...pluginRequest.request,
|
||||
turnSourceAccountId: "secondary",
|
||||
},
|
||||
});
|
||||
|
||||
expect(sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,7 +4,6 @@ import {
|
||||
createExecApprovalChannelRuntime,
|
||||
type ExecApprovalChannelRuntime,
|
||||
resolveChannelNativeApprovalDeliveryPlan,
|
||||
resolveExecApprovalSessionTarget,
|
||||
} from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { resolveExecApprovalCommandDisplay } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import {
|
||||
@@ -17,7 +16,7 @@ import type {
|
||||
PluginApprovalRequest,
|
||||
PluginApprovalResolved,
|
||||
} from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { parseAgentSessionKey, normalizeAccountId } from "openclaw/plugin-sdk/routing";
|
||||
import { parseAgentSessionKey } from "openclaw/plugin-sdk/routing";
|
||||
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
|
||||
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { compileSafeRegex, testRegexWithBoundedInput } from "openclaw/plugin-sdk/security-runtime";
|
||||
@@ -40,52 +39,6 @@ type PendingMessage = {
|
||||
messageId: string;
|
||||
};
|
||||
|
||||
function isExecApprovalRequest(request: ApprovalRequest): request is ExecApprovalRequest {
|
||||
return "command" in request.request;
|
||||
}
|
||||
|
||||
function toExecLikeRequest(request: ApprovalRequest): ExecApprovalRequest {
|
||||
if (isExecApprovalRequest(request)) {
|
||||
return request;
|
||||
}
|
||||
return {
|
||||
id: request.id,
|
||||
request: {
|
||||
command: request.request.title,
|
||||
agentId: request.request.agentId ?? undefined,
|
||||
sessionKey: request.request.sessionKey ?? undefined,
|
||||
turnSourceChannel: request.request.turnSourceChannel ?? undefined,
|
||||
turnSourceTo: request.request.turnSourceTo ?? undefined,
|
||||
turnSourceAccountId: request.request.turnSourceAccountId ?? undefined,
|
||||
turnSourceThreadId: request.request.turnSourceThreadId ?? undefined,
|
||||
},
|
||||
createdAtMs: request.createdAtMs,
|
||||
expiresAtMs: request.expiresAtMs,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveBoundTelegramAccountId(params: {
|
||||
cfg: OpenClawConfig;
|
||||
request: ApprovalRequest;
|
||||
}): string | null {
|
||||
const turnSourceChannel = params.request.request.turnSourceChannel?.trim().toLowerCase();
|
||||
if (turnSourceChannel === "telegram") {
|
||||
return params.request.request.turnSourceAccountId?.trim() || null;
|
||||
}
|
||||
const sessionTarget = resolveExecApprovalSessionTarget({
|
||||
cfg: params.cfg,
|
||||
request: toExecLikeRequest(params.request),
|
||||
turnSourceChannel: params.request.request.turnSourceChannel ?? undefined,
|
||||
turnSourceTo: params.request.request.turnSourceTo ?? undefined,
|
||||
turnSourceAccountId: params.request.request.turnSourceAccountId ?? undefined,
|
||||
turnSourceThreadId: params.request.request.turnSourceThreadId ?? undefined,
|
||||
});
|
||||
if (!sessionTarget || sessionTarget.channel !== "telegram") {
|
||||
return null;
|
||||
}
|
||||
return sessionTarget.accountId?.trim() || null;
|
||||
}
|
||||
|
||||
export type TelegramExecApprovalHandlerOpts = {
|
||||
token: string;
|
||||
accountId: string;
|
||||
@@ -144,16 +97,6 @@ function matchesFilters(params: {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
const boundAccountId = resolveBoundTelegramAccountId({
|
||||
cfg: params.cfg,
|
||||
request: params.request,
|
||||
});
|
||||
if (
|
||||
boundAccountId &&
|
||||
normalizeAccountId(boundAccountId) !== normalizeAccountId(params.accountId)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -247,9 +247,7 @@ export const handleApproveCommand: CommandHandler = async (params, allowTextComm
|
||||
if (isApprovalNotFoundError(err)) {
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply: {
|
||||
text: pluginApprovalAuthorization.reason ?? "❌ You are not authorized to approve this request.",
|
||||
},
|
||||
reply: { text: `❌ Failed to submit approval: ${String(err)}` },
|
||||
};
|
||||
}
|
||||
return {
|
||||
|
||||
@@ -115,6 +115,7 @@ vi.resetModules();
|
||||
|
||||
const {
|
||||
addSubagentRunForTests,
|
||||
getSubagentRunByChildSessionKey,
|
||||
listSubagentRunsForRequester,
|
||||
resetSubagentRegistryForTests,
|
||||
} = await import("../../agents/subagent-registry.js");
|
||||
@@ -442,6 +443,25 @@ describe("/approve command", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps /approve blocked for unauthorized senders without explicit approval auth", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { slack: { allowFrom: ["*"] } },
|
||||
} as OpenClawConfig;
|
||||
const params = buildParams("/approve abc allow-once", cfg, {
|
||||
Provider: "slack",
|
||||
Surface: "slack",
|
||||
SenderId: "123",
|
||||
});
|
||||
params.command.isAuthorizedSender = false;
|
||||
|
||||
const result = await handleCommands(params);
|
||||
|
||||
expect(result.shouldContinue).toBe(false);
|
||||
expect(result.reply).toBeUndefined();
|
||||
expect(callGatewayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("accepts Telegram command mentions for /approve", async () => {
|
||||
const cfg = createTelegramApproveCfg();
|
||||
const params = buildParams("/approve@bot abc12345 allow-once", cfg, {
|
||||
@@ -486,23 +506,6 @@ describe("/approve command", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("does not treat implicit default approval auth as a bypass for unauthorized senders", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
} as OpenClawConfig;
|
||||
const params = buildParams("/approve abc12345 allow-once", cfg, {
|
||||
Provider: "webchat",
|
||||
Surface: "webchat",
|
||||
SenderId: "123",
|
||||
});
|
||||
params.command.isAuthorizedSender = false;
|
||||
|
||||
const result = await handleCommands(params);
|
||||
expect(result.shouldContinue).toBe(false);
|
||||
expect(result.reply).toBeUndefined();
|
||||
expect(callGatewayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("accepts Telegram /approve from exec target recipients even when native approvals are disabled", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
@@ -692,6 +695,24 @@ describe("/approve command", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("returns the real plugin not-found error for authorized plugin approvers", async () => {
|
||||
callGatewayMock.mockRejectedValueOnce(new Error("unknown or expired approval id"));
|
||||
const params = buildParams(
|
||||
"/approve plugin:abc123 allow-once",
|
||||
createDiscordApproveCfg({ enabled: false, approvers: ["123"], target: "channel" }),
|
||||
{
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
SenderId: "123",
|
||||
},
|
||||
);
|
||||
|
||||
const result = await handleCommands(params);
|
||||
expect(result.shouldContinue).toBe(false);
|
||||
expect(result.reply?.text).toContain("unknown or expired approval id");
|
||||
expect(result.reply?.text).not.toContain("not authorized");
|
||||
});
|
||||
|
||||
it("rejects unauthorized or invalid Telegram /approve variants", async () => {
|
||||
for (const testCase of [
|
||||
{
|
||||
|
||||
@@ -2,36 +2,26 @@ import { getChannelPlugin } from "../channels/plugins/index.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { normalizeMessageChannel } from "../utils/message-channel.js";
|
||||
|
||||
export type ApprovalCommandAuthorization = {
|
||||
authorized: boolean;
|
||||
reason?: string;
|
||||
explicit: boolean;
|
||||
};
|
||||
|
||||
export function resolveApprovalCommandAuthorization(params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel?: string | null;
|
||||
accountId?: string | null;
|
||||
senderId?: string | null;
|
||||
kind: "exec" | "plugin";
|
||||
}): ApprovalCommandAuthorization {
|
||||
}): { authorized: boolean; reason?: string; explicit: boolean } {
|
||||
const channel = normalizeMessageChannel(params.channel);
|
||||
if (!channel) {
|
||||
return { authorized: true, explicit: false };
|
||||
}
|
||||
const resolved = getChannelPlugin(channel)?.auth?.authorizeActorAction?.({
|
||||
const result = getChannelPlugin(channel)?.auth?.authorizeActorAction?.({
|
||||
cfg: params.cfg,
|
||||
accountId: params.accountId,
|
||||
senderId: params.senderId,
|
||||
action: "approve",
|
||||
approvalKind: params.kind,
|
||||
});
|
||||
if (!resolved) {
|
||||
if (!result) {
|
||||
return { authorized: true, explicit: false };
|
||||
}
|
||||
return {
|
||||
authorized: resolved.authorized,
|
||||
reason: resolved.reason,
|
||||
explicit: true,
|
||||
};
|
||||
return { ...result, explicit: true };
|
||||
}
|
||||
|
||||
@@ -1,21 +1,43 @@
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import type { PluginApprovalRequest, PluginApprovalResolved } from "./plugin-approvals.js";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const mockGatewayClientStarts = vi.hoisted(() => vi.fn());
|
||||
const mockGatewayClientStops = vi.hoisted(() => vi.fn());
|
||||
const mockGatewayClientRequests = vi.hoisted(() => vi.fn(async () => ({ ok: true })));
|
||||
const mockCreateOperatorApprovalsGatewayClient = vi.hoisted(() => vi.fn());
|
||||
const loggerMocks = vi.hoisted(() => ({
|
||||
debug: vi.fn(),
|
||||
error: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/operator-approvals-client.js", () => ({
|
||||
createOperatorApprovalsGatewayClient: mockCreateOperatorApprovalsGatewayClient,
|
||||
}));
|
||||
|
||||
vi.mock("../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: () => loggerMocks,
|
||||
}));
|
||||
|
||||
let createExecApprovalChannelRuntime: typeof import("./exec-approval-channel-runtime.js").createExecApprovalChannelRuntime;
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((promiseResolve, promiseReject) => {
|
||||
resolve = promiseResolve;
|
||||
reject = promiseReject;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
mockGatewayClientStarts.mockReset();
|
||||
mockGatewayClientStops.mockReset();
|
||||
mockGatewayClientRequests.mockReset();
|
||||
mockGatewayClientRequests.mockResolvedValue({ ok: true });
|
||||
loggerMocks.debug.mockReset();
|
||||
loggerMocks.error.mockReset();
|
||||
mockCreateOperatorApprovalsGatewayClient.mockReset().mockImplementation(async () => ({
|
||||
start: mockGatewayClientStarts,
|
||||
stop: mockGatewayClientStops,
|
||||
@@ -106,6 +128,49 @@ describe("createExecApprovalChannelRuntime", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("finalizes approvals that resolve while delivery is still in flight", async () => {
|
||||
const pendingDelivery = createDeferred<Array<{ id: string }>>();
|
||||
const finalizeResolved = vi.fn(async () => undefined);
|
||||
const runtime = createExecApprovalChannelRuntime<
|
||||
{ id: string },
|
||||
PluginApprovalRequest,
|
||||
PluginApprovalResolved
|
||||
>({
|
||||
label: "test/plugin-approvals",
|
||||
clientDisplayName: "Test Plugin Approvals",
|
||||
cfg: {} as never,
|
||||
eventKinds: ["plugin"],
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested: async () => pendingDelivery.promise,
|
||||
finalizeResolved,
|
||||
});
|
||||
|
||||
const requestPromise = runtime.handleRequested({
|
||||
id: "plugin:abc",
|
||||
request: {
|
||||
title: "Plugin approval",
|
||||
description: "Let plugin proceed",
|
||||
},
|
||||
createdAtMs: 1000,
|
||||
expiresAtMs: 2000,
|
||||
});
|
||||
await runtime.handleResolved({
|
||||
id: "plugin:abc",
|
||||
decision: "allow-once",
|
||||
ts: 1500,
|
||||
});
|
||||
|
||||
pendingDelivery.resolve([{ id: "plugin:abc" }]);
|
||||
await requestPromise;
|
||||
|
||||
expect(finalizeResolved).toHaveBeenCalledWith({
|
||||
request: expect.objectContaining({ id: "plugin:abc" }),
|
||||
resolved: expect.objectContaining({ id: "plugin:abc", decision: "allow-once" }),
|
||||
entries: [{ id: "plugin:abc" }],
|
||||
});
|
||||
});
|
||||
|
||||
it("routes gateway requests through the shared client", async () => {
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
label: "test/exec-approvals",
|
||||
@@ -127,10 +192,150 @@ describe("createExecApprovalChannelRuntime", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("can retry start after gateway client creation fails", async () => {
|
||||
const boom = new Error("boom");
|
||||
mockCreateOperatorApprovalsGatewayClient
|
||||
.mockRejectedValueOnce(boom)
|
||||
.mockResolvedValueOnce({
|
||||
start: mockGatewayClientStarts,
|
||||
stop: mockGatewayClientStops,
|
||||
request: mockGatewayClientRequests,
|
||||
});
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
label: "test/exec-approvals",
|
||||
clientDisplayName: "Test Exec Approvals",
|
||||
cfg: {} as never,
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested: async () => [],
|
||||
finalizeResolved: async () => undefined,
|
||||
});
|
||||
|
||||
await expect(runtime.start()).rejects.toThrow("boom");
|
||||
await runtime.start();
|
||||
|
||||
expect(mockCreateOperatorApprovalsGatewayClient).toHaveBeenCalledTimes(2);
|
||||
expect(mockGatewayClientStarts).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not leave a gateway client running when stop wins the startup race", async () => {
|
||||
const pendingClient = createDeferred<GatewayClient>();
|
||||
mockCreateOperatorApprovalsGatewayClient.mockReturnValueOnce(pendingClient.promise);
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
label: "test/exec-approvals",
|
||||
clientDisplayName: "Test Exec Approvals",
|
||||
cfg: {} as never,
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested: async () => [],
|
||||
finalizeResolved: async () => undefined,
|
||||
});
|
||||
|
||||
const startPromise = runtime.start();
|
||||
const stopPromise = runtime.stop();
|
||||
pendingClient.resolve({
|
||||
start: mockGatewayClientStarts,
|
||||
stop: mockGatewayClientStops,
|
||||
request: mockGatewayClientRequests,
|
||||
});
|
||||
await startPromise;
|
||||
await stopPromise;
|
||||
|
||||
expect(mockGatewayClientStarts).not.toHaveBeenCalled();
|
||||
expect(mockGatewayClientStops).toHaveBeenCalledTimes(1);
|
||||
await expect(runtime.request("exec.approval.resolve", { id: "abc" })).rejects.toThrow(
|
||||
"gateway client not connected",
|
||||
);
|
||||
});
|
||||
|
||||
it("logs async request handling failures from gateway events", async () => {
|
||||
const runtime = createExecApprovalChannelRuntime<
|
||||
{ id: string },
|
||||
PluginApprovalRequest,
|
||||
PluginApprovalResolved
|
||||
>({
|
||||
label: "test/plugin-approvals",
|
||||
clientDisplayName: "Test Plugin Approvals",
|
||||
cfg: {} as never,
|
||||
eventKinds: ["plugin"],
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested: async () => {
|
||||
throw new Error("deliver failed");
|
||||
},
|
||||
finalizeResolved: async () => undefined,
|
||||
});
|
||||
|
||||
await runtime.start();
|
||||
const clientParams = mockCreateOperatorApprovalsGatewayClient.mock.calls[0]?.[0] as
|
||||
| { onEvent?: (evt: { event: string; payload: unknown }) => void }
|
||||
| undefined;
|
||||
|
||||
clientParams?.onEvent?.({
|
||||
event: "plugin.approval.requested",
|
||||
payload: {
|
||||
id: "plugin:abc",
|
||||
request: {
|
||||
title: "Plugin approval",
|
||||
description: "Let plugin proceed",
|
||||
},
|
||||
createdAtMs: 1000,
|
||||
expiresAtMs: 2000,
|
||||
},
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(loggerMocks.error).toHaveBeenCalledWith(
|
||||
"error handling approval request: deliver failed",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("logs async expiration handling failures", async () => {
|
||||
vi.useFakeTimers();
|
||||
const runtime = createExecApprovalChannelRuntime<
|
||||
{ id: string },
|
||||
PluginApprovalRequest,
|
||||
PluginApprovalResolved
|
||||
>({
|
||||
label: "test/plugin-approvals",
|
||||
clientDisplayName: "Test Plugin Approvals",
|
||||
cfg: {} as never,
|
||||
nowMs: () => 1000,
|
||||
eventKinds: ["plugin"],
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested: async (request) => [{ id: request.id }],
|
||||
finalizeResolved: async () => undefined,
|
||||
finalizeExpired: async () => {
|
||||
throw new Error("expire failed");
|
||||
},
|
||||
});
|
||||
|
||||
await runtime.handleRequested({
|
||||
id: "plugin:abc",
|
||||
request: {
|
||||
title: "Plugin approval",
|
||||
description: "Let plugin proceed",
|
||||
},
|
||||
createdAtMs: 1000,
|
||||
expiresAtMs: 1001,
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(loggerMocks.error).toHaveBeenCalledWith(
|
||||
"error handling approval expiration: expire failed",
|
||||
);
|
||||
});
|
||||
|
||||
it("subscribes to plugin approval events when requested", async () => {
|
||||
const deliverRequested = vi.fn(async (request) => [{ id: request.id }]);
|
||||
const finalizeResolved = vi.fn(async () => undefined);
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
const runtime = createExecApprovalChannelRuntime<
|
||||
{ id: string },
|
||||
PluginApprovalRequest,
|
||||
PluginApprovalResolved
|
||||
>({
|
||||
label: "test/plugin-approvals",
|
||||
clientDisplayName: "Test Plugin Approvals",
|
||||
cfg: {} as never,
|
||||
@@ -184,64 +389,51 @@ describe("createExecApprovalChannelRuntime", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores invalid gateway approval payloads", async () => {
|
||||
const deliverRequested = vi.fn(async () => [{ id: "abc" }]);
|
||||
it("clears pending state when delivery throws", async () => {
|
||||
const deliverRequested = vi
|
||||
.fn<() => Promise<Array<{ id: string }>>>()
|
||||
.mockRejectedValueOnce(new Error("deliver failed"))
|
||||
.mockResolvedValueOnce([{ id: "abc" }]);
|
||||
const finalizeResolved = vi.fn(async () => undefined);
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
label: "test/invalid-events",
|
||||
clientDisplayName: "Test Invalid Events",
|
||||
label: "test/delivery-failure",
|
||||
clientDisplayName: "Test Delivery Failure",
|
||||
cfg: {} as never,
|
||||
eventKinds: ["exec", "plugin"],
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested,
|
||||
finalizeResolved,
|
||||
});
|
||||
|
||||
await runtime.start();
|
||||
const clientParams = mockCreateOperatorApprovalsGatewayClient.mock.calls[0]?.[0] as
|
||||
| { onEvent?: (evt: { event: string; payload: unknown }) => void }
|
||||
| undefined;
|
||||
|
||||
clientParams?.onEvent?.({
|
||||
event: "plugin.approval.requested",
|
||||
payload: { id: "plugin:bad", request: null, createdAtMs: 1000, expiresAtMs: 2000 },
|
||||
});
|
||||
clientParams?.onEvent?.({
|
||||
event: "exec.approval.resolved",
|
||||
payload: { id: "abc", decision: "maybe", ts: 1500 },
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(deliverRequested).not.toHaveBeenCalled();
|
||||
expect(finalizeResolved).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("caps the number of tracked pending approvals", async () => {
|
||||
vi.useFakeTimers();
|
||||
const deliverRequested = vi.fn(async (request) => [{ id: request.id }]);
|
||||
const runtime = createExecApprovalChannelRuntime({
|
||||
label: "test/pending-cap",
|
||||
clientDisplayName: "Test Pending Cap",
|
||||
cfg: {} as never,
|
||||
nowMs: () => 1000,
|
||||
isConfigured: () => true,
|
||||
shouldHandle: () => true,
|
||||
deliverRequested,
|
||||
finalizeResolved: async () => undefined,
|
||||
});
|
||||
|
||||
for (let index = 0; index < 1001; index += 1) {
|
||||
await runtime.handleRequested({
|
||||
id: `approval-${index}`,
|
||||
await expect(
|
||||
runtime.handleRequested({
|
||||
id: "abc",
|
||||
request: {
|
||||
command: `echo ${index}`,
|
||||
command: "echo abc",
|
||||
},
|
||||
createdAtMs: 1000,
|
||||
expiresAtMs: 61_000,
|
||||
});
|
||||
}
|
||||
expiresAtMs: 2000,
|
||||
}),
|
||||
).rejects.toThrow("deliver failed");
|
||||
|
||||
expect(deliverRequested).toHaveBeenCalledTimes(1000);
|
||||
await runtime.handleRequested({
|
||||
id: "abc",
|
||||
request: {
|
||||
command: "echo abc",
|
||||
},
|
||||
createdAtMs: 1000,
|
||||
expiresAtMs: 2000,
|
||||
});
|
||||
await runtime.handleResolved({
|
||||
id: "abc",
|
||||
decision: "allow-once",
|
||||
ts: 1500,
|
||||
});
|
||||
|
||||
expect(finalizeResolved).toHaveBeenCalledWith({
|
||||
request: expect.objectContaining({ id: "abc" }),
|
||||
resolved: expect.objectContaining({ id: "abc", decision: "allow-once" }),
|
||||
entries: [{ id: "abc" }],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -8,15 +8,19 @@ import type { PluginApprovalRequest, PluginApprovalResolved } from "./plugin-app
|
||||
|
||||
type ApprovalRequestEvent = ExecApprovalRequest | PluginApprovalRequest;
|
||||
type ApprovalResolvedEvent = ExecApprovalResolved | PluginApprovalResolved;
|
||||
const MAX_PENDING_APPROVALS = 1000;
|
||||
const MAX_PENDING_APPROVAL_TTL_MS = 30 * 60_000;
|
||||
|
||||
export type ExecApprovalChannelRuntimeEventKind = "exec" | "plugin";
|
||||
|
||||
type PendingApprovalEntry<TPending, TRequest extends ApprovalRequestEvent> = {
|
||||
type PendingApprovalEntry<
|
||||
TPending,
|
||||
TRequest extends ApprovalRequestEvent,
|
||||
TResolved extends ApprovalResolvedEvent,
|
||||
> = {
|
||||
request: TRequest;
|
||||
entries: TPending[];
|
||||
timeoutId: NodeJS.Timeout | null;
|
||||
delivering: boolean;
|
||||
pendingResolution: TResolved | null;
|
||||
};
|
||||
|
||||
export type ExecApprovalChannelRuntimeAdapter<
|
||||
@@ -66,11 +70,22 @@ export function createExecApprovalChannelRuntime<
|
||||
const log = createSubsystemLogger(adapter.label);
|
||||
const nowMs = adapter.nowMs ?? Date.now;
|
||||
const eventKinds = new Set<ExecApprovalChannelRuntimeEventKind>(adapter.eventKinds ?? ["exec"]);
|
||||
const pending = new Map<string, PendingApprovalEntry<TPending, TRequest>>();
|
||||
const pending = new Map<string, PendingApprovalEntry<TPending, TRequest, TResolved>>();
|
||||
let gatewayClient: GatewayClient | null = null;
|
||||
let started = false;
|
||||
let shouldRun = false;
|
||||
let startPromise: Promise<void> | null = null;
|
||||
|
||||
const clearPendingEntry = (approvalId: string): PendingApprovalEntry<TPending, TRequest> | null => {
|
||||
const spawn = (label: string, promise: Promise<void>): void => {
|
||||
void promise.catch((err: unknown) => {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log.error(`${label}: ${message}`);
|
||||
});
|
||||
};
|
||||
|
||||
const clearPendingEntry = (
|
||||
approvalId: string,
|
||||
): PendingApprovalEntry<TPending, TRequest, TResolved> | null => {
|
||||
const entry = pending.get(approvalId);
|
||||
if (!entry) {
|
||||
return null;
|
||||
@@ -82,30 +97,6 @@ export function createExecApprovalChannelRuntime<
|
||||
return entry;
|
||||
};
|
||||
|
||||
const spawn = (label: string, promise: Promise<void>): void => {
|
||||
promise.catch((err) => {
|
||||
log.error(`${label}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
});
|
||||
};
|
||||
|
||||
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
|
||||
typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
const isFiniteNumber = (value: unknown): value is number =>
|
||||
typeof value === "number" && Number.isFinite(value);
|
||||
const isApprovalDecision = (value: unknown): value is ExecApprovalResolved["decision"] =>
|
||||
value === "allow-once" || value === "allow-always" || value === "deny";
|
||||
const isApprovalRequestPayload = (value: unknown): value is TRequest =>
|
||||
isObjectRecord(value) &&
|
||||
typeof value.id === "string" &&
|
||||
isObjectRecord(value.request) &&
|
||||
isFiniteNumber(value.createdAtMs) &&
|
||||
isFiniteNumber(value.expiresAtMs);
|
||||
const isApprovalResolvedPayload = (value: unknown): value is TResolved =>
|
||||
isObjectRecord(value) &&
|
||||
typeof value.id === "string" &&
|
||||
isApprovalDecision(value.decision) &&
|
||||
isFiniteNumber(value.ts);
|
||||
|
||||
const handleExpired = async (approvalId: string): Promise<void> => {
|
||||
const entry = clearPendingEntry(approvalId);
|
||||
if (!entry) {
|
||||
@@ -122,81 +113,94 @@ export function createExecApprovalChannelRuntime<
|
||||
if (!adapter.shouldHandle(request)) {
|
||||
return;
|
||||
}
|
||||
if (!pending.has(request.id) && pending.size >= MAX_PENDING_APPROVALS) {
|
||||
log.error(`dropping request ${request.id}: pending approval cap reached`);
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug(`received request ${request.id}`);
|
||||
const entries = await adapter.deliverRequested(request);
|
||||
if (!entries.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const timeoutMs = Math.min(
|
||||
MAX_PENDING_APPROVAL_TTL_MS,
|
||||
Math.max(0, request.expiresAtMs - nowMs()),
|
||||
);
|
||||
const timeoutId = setTimeout(() => {
|
||||
spawn(`expire ${request.id}`, handleExpired(request.id));
|
||||
}, timeoutMs);
|
||||
timeoutId.unref?.();
|
||||
|
||||
const existing = pending.get(request.id);
|
||||
if (existing?.timeoutId) {
|
||||
clearTimeout(existing.timeoutId);
|
||||
}
|
||||
pending.set(request.id, {
|
||||
const entry: PendingApprovalEntry<TPending, TRequest, TResolved> = {
|
||||
request,
|
||||
entries,
|
||||
timeoutId,
|
||||
});
|
||||
entries: [],
|
||||
timeoutId: null,
|
||||
delivering: true,
|
||||
pendingResolution: null,
|
||||
};
|
||||
pending.set(request.id, entry);
|
||||
let entries: TPending[];
|
||||
try {
|
||||
entries = await adapter.deliverRequested(request);
|
||||
} catch (err) {
|
||||
if (pending.get(request.id) === entry) {
|
||||
clearPendingEntry(request.id);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const current = pending.get(request.id);
|
||||
if (current !== entry) {
|
||||
return;
|
||||
}
|
||||
if (!entries.length) {
|
||||
pending.delete(request.id);
|
||||
return;
|
||||
}
|
||||
entry.entries = entries;
|
||||
entry.delivering = false;
|
||||
if (entry.pendingResolution) {
|
||||
pending.delete(request.id);
|
||||
log.debug(`resolved ${entry.pendingResolution.id} with ${entry.pendingResolution.decision}`);
|
||||
await adapter.finalizeResolved({
|
||||
request: entry.request,
|
||||
resolved: entry.pendingResolution,
|
||||
entries: entry.entries,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const timeoutMs = Math.max(0, request.expiresAtMs - nowMs());
|
||||
const timeoutId = setTimeout(() => {
|
||||
spawn("error handling approval expiration", handleExpired(request.id));
|
||||
}, timeoutMs);
|
||||
timeoutId.unref?.();
|
||||
entry.timeoutId = timeoutId;
|
||||
};
|
||||
|
||||
const handleResolved = async (resolved: TResolved): Promise<void> => {
|
||||
const entry = clearPendingEntry(resolved.id);
|
||||
const entry = pending.get(resolved.id);
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
if (entry.delivering) {
|
||||
entry.pendingResolution = resolved;
|
||||
return;
|
||||
}
|
||||
const finalizedEntry = clearPendingEntry(resolved.id);
|
||||
if (!finalizedEntry) {
|
||||
return;
|
||||
}
|
||||
log.debug(`resolved ${resolved.id} with ${resolved.decision}`);
|
||||
await adapter.finalizeResolved({
|
||||
request: entry.request,
|
||||
request: finalizedEntry.request,
|
||||
resolved,
|
||||
entries: entry.entries,
|
||||
entries: finalizedEntry.entries,
|
||||
});
|
||||
};
|
||||
|
||||
const handleGatewayEvent = (evt: EventFrame): void => {
|
||||
if (evt.event === "exec.approval.requested" && eventKinds.has("exec")) {
|
||||
if (!isApprovalRequestPayload(evt.payload)) {
|
||||
log.error("received invalid exec.approval.requested payload");
|
||||
return;
|
||||
}
|
||||
spawn("event exec.approval.requested", handleRequested(evt.payload));
|
||||
spawn("error handling approval request", handleRequested(evt.payload as TRequest));
|
||||
return;
|
||||
}
|
||||
if (evt.event === "plugin.approval.requested" && eventKinds.has("plugin")) {
|
||||
if (!isApprovalRequestPayload(evt.payload)) {
|
||||
log.error("received invalid plugin.approval.requested payload");
|
||||
return;
|
||||
}
|
||||
spawn("event plugin.approval.requested", handleRequested(evt.payload));
|
||||
spawn("error handling approval request", handleRequested(evt.payload as TRequest));
|
||||
return;
|
||||
}
|
||||
if (evt.event === "exec.approval.resolved" && eventKinds.has("exec")) {
|
||||
if (!isApprovalResolvedPayload(evt.payload)) {
|
||||
log.error("received invalid exec.approval.resolved payload");
|
||||
return;
|
||||
}
|
||||
spawn("event exec.approval.resolved", handleResolved(evt.payload));
|
||||
spawn("error handling approval resolved", handleResolved(evt.payload as TResolved));
|
||||
return;
|
||||
}
|
||||
if (evt.event === "plugin.approval.resolved" && eventKinds.has("plugin")) {
|
||||
if (!isApprovalResolvedPayload(evt.payload)) {
|
||||
log.error("received invalid plugin.approval.resolved payload");
|
||||
return;
|
||||
}
|
||||
spawn("event plugin.approval.resolved", handleResolved(evt.payload));
|
||||
spawn("error handling approval resolved", handleResolved(evt.payload as TResolved));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -205,34 +209,54 @@ export function createExecApprovalChannelRuntime<
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
started = true;
|
||||
|
||||
if (!adapter.isConfigured()) {
|
||||
log.debug("disabled");
|
||||
if (startPromise) {
|
||||
await startPromise;
|
||||
return;
|
||||
}
|
||||
|
||||
gatewayClient = await createOperatorApprovalsGatewayClient({
|
||||
config: adapter.cfg,
|
||||
gatewayUrl: adapter.gatewayUrl,
|
||||
clientDisplayName: adapter.clientDisplayName,
|
||||
onEvent: handleGatewayEvent,
|
||||
onHelloOk: () => {
|
||||
log.debug("connected to gateway");
|
||||
},
|
||||
onConnectError: (err) => {
|
||||
log.error(`connect error: ${err.message}`);
|
||||
},
|
||||
onClose: (code, reason) => {
|
||||
log.debug(`gateway closed: ${code} ${reason}`);
|
||||
},
|
||||
shouldRun = true;
|
||||
startPromise = (async () => {
|
||||
if (!adapter.isConfigured()) {
|
||||
log.debug("disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
const client = await createOperatorApprovalsGatewayClient({
|
||||
config: adapter.cfg,
|
||||
gatewayUrl: adapter.gatewayUrl,
|
||||
clientDisplayName: adapter.clientDisplayName,
|
||||
onEvent: handleGatewayEvent,
|
||||
onHelloOk: () => {
|
||||
log.debug("connected to gateway");
|
||||
},
|
||||
onConnectError: (err) => {
|
||||
log.error(`connect error: ${err.message}`);
|
||||
},
|
||||
onClose: (code, reason) => {
|
||||
log.debug(`gateway closed: ${code} ${reason}`);
|
||||
},
|
||||
});
|
||||
|
||||
if (!shouldRun) {
|
||||
client.stop();
|
||||
return;
|
||||
}
|
||||
client.start();
|
||||
gatewayClient = client;
|
||||
started = true;
|
||||
})().finally(() => {
|
||||
startPromise = null;
|
||||
});
|
||||
|
||||
gatewayClient.start();
|
||||
await startPromise;
|
||||
},
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (!started) {
|
||||
shouldRun = false;
|
||||
if (startPromise) {
|
||||
await startPromise.catch(() => {});
|
||||
}
|
||||
if (!started && !gatewayClient) {
|
||||
return;
|
||||
}
|
||||
started = false;
|
||||
|
||||
Reference in New Issue
Block a user