mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 14:01:24 +08:00
Compare commits
11 Commits
v2026.4.2
...
workspace/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91ae967fae | ||
|
|
8f095d8f5f | ||
|
|
b4649f3238 | ||
|
|
15c2ed0d03 | ||
|
|
2032c14aab | ||
|
|
92cbe65237 | ||
|
|
67f64f9b7d | ||
|
|
d5ff7bdf30 | ||
|
|
21678f0149 | ||
|
|
5375586a28 | ||
|
|
7ea3f13950 |
4
.github/labeler.yml
vendored
4
.github/labeler.yml
vendored
@@ -226,6 +226,10 @@
|
||||
- changed-files:
|
||||
- any-glob-to-any-file:
|
||||
- "extensions/open-prose/**"
|
||||
"extensions: webhooks":
|
||||
- changed-files:
|
||||
- any-glob-to-any-file:
|
||||
- "extensions/webhooks/**"
|
||||
"extensions: device-pair":
|
||||
- changed-files:
|
||||
- any-glob-to-any-file:
|
||||
|
||||
@@ -17,6 +17,8 @@ Docs: https://docs.openclaw.ai
|
||||
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
|
||||
- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky.
|
||||
- Tasks/TaskFlow: add managed child task spawning plus sticky cancel intent, so external orchestrators can stop scheduling immediately and let parent TaskFlows settle to `cancelled` once active child tasks finish. (#59610) Thanks @mbelinky.
|
||||
- Plugins/TaskFlow: add a bound `api.runtime.taskFlow` seam so plugins and trusted authoring layers can create and drive managed TaskFlows from host-resolved OpenClaw context without passing owner identifiers on each call. (#59622) Thanks @mbelinky.
|
||||
- Plugins/webhooks: add a bundled webhook ingress plugin so external automation can create and drive bound TaskFlows through per-route shared-secret endpoints. Thanks @mbelinky.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -115,6 +115,40 @@ await api.runtime.subagent.deleteSession({
|
||||
Untrusted plugins can still run subagents, but override requests are rejected.
|
||||
</Warning>
|
||||
|
||||
### `api.runtime.taskFlow`
|
||||
|
||||
Bind a TaskFlow runtime to an existing OpenClaw session key or trusted tool
|
||||
context, then create and manage TaskFlows without passing an owner on every call.
|
||||
|
||||
```typescript
|
||||
const taskFlow = api.runtime.taskFlow.fromToolContext(ctx);
|
||||
|
||||
const created = taskFlow.createManaged({
|
||||
controllerId: "my-plugin/review-batch",
|
||||
goal: "Review new pull requests",
|
||||
});
|
||||
|
||||
const child = taskFlow.runTask({
|
||||
flowId: created.flowId,
|
||||
runtime: "acp",
|
||||
childSessionKey: "agent:main:subagent:reviewer",
|
||||
task: "Review PR #123",
|
||||
status: "running",
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
|
||||
const waiting = taskFlow.setWaiting({
|
||||
flowId: created.flowId,
|
||||
expectedRevision: created.revision,
|
||||
currentStep: "await-human-reply",
|
||||
waitJson: { kind: "reply", channel: "telegram" },
|
||||
});
|
||||
```
|
||||
|
||||
Use `bindSession({ sessionKey, requesterOrigin })` when you already have a
|
||||
trusted OpenClaw session key from your own binding layer. Do not bind from raw
|
||||
user input.
|
||||
|
||||
### `api.runtime.tts`
|
||||
|
||||
Text-to-speech synthesis.
|
||||
|
||||
6
extensions/webhooks/api.ts
Normal file
6
extensions/webhooks/api.ts
Normal file
@@ -0,0 +1,6 @@
|
||||
export {
|
||||
definePluginEntry,
|
||||
type OpenClawPluginApi,
|
||||
type PluginLogger,
|
||||
type PluginRuntime,
|
||||
} from "openclaw/plugin-sdk/core";
|
||||
50
extensions/webhooks/index.ts
Normal file
50
extensions/webhooks/index.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import { definePluginEntry, type OpenClawPluginApi } from "./api.js";
|
||||
import { resolveWebhooksPluginConfig } from "./src/config.js";
|
||||
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./src/http.js";
|
||||
|
||||
export default definePluginEntry({
|
||||
id: "webhooks",
|
||||
name: "Webhooks",
|
||||
description:
|
||||
"Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
|
||||
async register(api: OpenClawPluginApi) {
|
||||
const routes = await resolveWebhooksPluginConfig({
|
||||
pluginConfig: api.pluginConfig,
|
||||
cfg: api.config,
|
||||
env: process.env,
|
||||
logger: api.logger,
|
||||
});
|
||||
if (routes.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>();
|
||||
const handler = createTaskFlowWebhookRequestHandler({
|
||||
cfg: api.config,
|
||||
targetsByPath,
|
||||
});
|
||||
|
||||
for (const route of routes) {
|
||||
const taskFlow = api.runtime.taskFlow.bindSession({
|
||||
sessionKey: route.sessionKey,
|
||||
});
|
||||
const target: TaskFlowWebhookTarget = {
|
||||
routeId: route.routeId,
|
||||
path: route.path,
|
||||
secret: route.secret,
|
||||
defaultControllerId: route.controllerId,
|
||||
taskFlow,
|
||||
};
|
||||
targetsByPath.set(target.path, [...(targetsByPath.get(target.path) ?? []), target]);
|
||||
api.registerHttpRoute({
|
||||
path: target.path,
|
||||
auth: "plugin",
|
||||
match: "exact",
|
||||
handler,
|
||||
});
|
||||
api.logger.info?.(
|
||||
`[webhooks] registered route ${route.routeId} on ${route.path} for session ${route.sessionKey}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
});
|
||||
48
extensions/webhooks/openclaw.plugin.json
Normal file
48
extensions/webhooks/openclaw.plugin.json
Normal file
@@ -0,0 +1,48 @@
|
||||
{
|
||||
"id": "webhooks",
|
||||
"name": "Webhooks",
|
||||
"description": "Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"$defs": {
|
||||
"secretRef": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"source": {
|
||||
"type": "string",
|
||||
"enum": ["env", "file", "exec"]
|
||||
},
|
||||
"provider": { "type": "string" },
|
||||
"id": { "type": "string" }
|
||||
},
|
||||
"required": ["source", "provider", "id"]
|
||||
},
|
||||
"secretInput": {
|
||||
"anyOf": [{ "type": "string", "minLength": 1 }, { "$ref": "#/$defs/secretRef" }]
|
||||
},
|
||||
"route": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"enabled": { "type": "boolean" },
|
||||
"path": { "type": "string", "minLength": 1 },
|
||||
"sessionKey": { "type": "string", "minLength": 1 },
|
||||
"secret": { "$ref": "#/$defs/secretInput" },
|
||||
"controllerId": { "type": "string", "minLength": 1 },
|
||||
"description": { "type": "string" }
|
||||
},
|
||||
"required": ["sessionKey", "secret"]
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"routes": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"$ref": "#/$defs/route"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
18
extensions/webhooks/package.json
Normal file
18
extensions/webhooks/package.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"name": "@openclaw/webhooks",
|
||||
"version": "2026.4.1-beta.1",
|
||||
"private": true,
|
||||
"description": "OpenClaw webhook bridge plugin",
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"zod": "^4.3.6"
|
||||
},
|
||||
"openclaw": {
|
||||
"bundle": {
|
||||
"stageRuntimeDependencies": true
|
||||
},
|
||||
"extensions": [
|
||||
"./index.ts"
|
||||
]
|
||||
}
|
||||
}
|
||||
16
extensions/webhooks/runtime-api.ts
Normal file
16
extensions/webhooks/runtime-api.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
export {
|
||||
createFixedWindowRateLimiter,
|
||||
createWebhookInFlightLimiter,
|
||||
normalizeWebhookPath,
|
||||
readJsonWebhookBodyOrReject,
|
||||
resolveRequestClientIp,
|
||||
resolveWebhookTargetWithAuthOrRejectSync,
|
||||
withResolvedWebhookRequestPipeline,
|
||||
WEBHOOK_IN_FLIGHT_DEFAULTS,
|
||||
WEBHOOK_RATE_LIMIT_DEFAULTS,
|
||||
type WebhookInFlightLimiter,
|
||||
} from "openclaw/plugin-sdk/webhook-ingress";
|
||||
export {
|
||||
resolveConfiguredSecretInputString,
|
||||
type OpenClawConfig,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
86
extensions/webhooks/src/config.test.ts
Normal file
86
extensions/webhooks/src/config.test.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../runtime-api.js";
|
||||
import { resolveWebhooksPluginConfig } from "./config.js";
|
||||
|
||||
describe("resolveWebhooksPluginConfig", () => {
|
||||
it("resolves default paths and SecretRef-backed secrets", async () => {
|
||||
const routes = await resolveWebhooksPluginConfig({
|
||||
pluginConfig: {
|
||||
routes: {
|
||||
zapier: {
|
||||
sessionKey: "agent:main:main",
|
||||
secret: {
|
||||
source: "env",
|
||||
provider: "default",
|
||||
id: "OPENCLAW_WEBHOOK_SECRET",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
cfg: {} as OpenClawConfig,
|
||||
env: {
|
||||
OPENCLAW_WEBHOOK_SECRET: "shared-secret",
|
||||
},
|
||||
});
|
||||
|
||||
expect(routes).toEqual([
|
||||
{
|
||||
routeId: "zapier",
|
||||
path: "/plugins/webhooks/zapier",
|
||||
sessionKey: "agent:main:main",
|
||||
secret: "shared-secret",
|
||||
controllerId: "webhooks/zapier",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("skips routes whose secret cannot be resolved", async () => {
|
||||
const warn = vi.fn();
|
||||
|
||||
const routes = await resolveWebhooksPluginConfig({
|
||||
pluginConfig: {
|
||||
routes: {
|
||||
missing: {
|
||||
sessionKey: "agent:main:main",
|
||||
secret: {
|
||||
source: "env",
|
||||
provider: "default",
|
||||
id: "MISSING_SECRET",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
cfg: {} as OpenClawConfig,
|
||||
env: {},
|
||||
logger: { warn } as never,
|
||||
});
|
||||
|
||||
expect(routes).toEqual([]);
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("[webhooks] skipping route missing:"),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects duplicate normalized paths", async () => {
|
||||
await expect(
|
||||
resolveWebhooksPluginConfig({
|
||||
pluginConfig: {
|
||||
routes: {
|
||||
first: {
|
||||
path: "/plugins/webhooks/shared",
|
||||
sessionKey: "agent:main:main",
|
||||
secret: "a",
|
||||
},
|
||||
second: {
|
||||
path: "/plugins/webhooks/shared/",
|
||||
sessionKey: "agent:main:other",
|
||||
secret: "b",
|
||||
},
|
||||
},
|
||||
},
|
||||
cfg: {} as OpenClawConfig,
|
||||
env: {},
|
||||
}),
|
||||
).rejects.toThrow(/conflicts with routes\.first\.path/i);
|
||||
});
|
||||
});
|
||||
95
extensions/webhooks/src/config.ts
Normal file
95
extensions/webhooks/src/config.ts
Normal file
@@ -0,0 +1,95 @@
|
||||
import { z } from "zod";
|
||||
import type { PluginLogger } from "../api.js";
|
||||
import {
|
||||
normalizeWebhookPath,
|
||||
resolveConfiguredSecretInputString,
|
||||
type OpenClawConfig,
|
||||
} from "../runtime-api.js";
|
||||
|
||||
const secretRefSchema = z
|
||||
.object({
|
||||
source: z.enum(["env", "file", "exec"]),
|
||||
provider: z.string().trim().min(1),
|
||||
id: z.string().trim().min(1),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const secretInputSchema = z.union([z.string().trim().min(1), secretRefSchema]);
|
||||
|
||||
const webhookRouteConfigSchema = z
|
||||
.object({
|
||||
enabled: z.boolean().optional().default(true),
|
||||
path: z.string().trim().min(1).optional(),
|
||||
sessionKey: z.string().trim().min(1),
|
||||
secret: secretInputSchema,
|
||||
controllerId: z.string().trim().min(1).optional(),
|
||||
description: z.string().trim().min(1).optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const webhooksPluginConfigSchema = z
|
||||
.object({
|
||||
routes: z.record(z.string().trim().min(1), webhookRouteConfigSchema).default({}),
|
||||
})
|
||||
.strict();
|
||||
|
||||
export type ResolvedWebhookRouteConfig = {
|
||||
routeId: string;
|
||||
path: string;
|
||||
sessionKey: string;
|
||||
secret: string;
|
||||
controllerId: string;
|
||||
description?: string;
|
||||
};
|
||||
|
||||
export async function resolveWebhooksPluginConfig(params: {
|
||||
pluginConfig: unknown;
|
||||
cfg: OpenClawConfig;
|
||||
env: NodeJS.ProcessEnv;
|
||||
logger?: PluginLogger;
|
||||
}): Promise<ResolvedWebhookRouteConfig[]> {
|
||||
const parsed = webhooksPluginConfigSchema.parse(params.pluginConfig ?? {});
|
||||
const resolvedRoutes: ResolvedWebhookRouteConfig[] = [];
|
||||
const seenPaths = new Map<string, string>();
|
||||
|
||||
for (const [routeId, route] of Object.entries(parsed.routes)) {
|
||||
if (route.enabled === false) {
|
||||
continue;
|
||||
}
|
||||
const path = normalizeWebhookPath(route.path ?? `/plugins/webhooks/${routeId}`);
|
||||
const existingRouteId = seenPaths.get(path);
|
||||
if (existingRouteId) {
|
||||
throw new Error(
|
||||
`webhooks.routes.${routeId}.path conflicts with routes.${existingRouteId}.path (${path}).`,
|
||||
);
|
||||
}
|
||||
|
||||
const secretResolution = await resolveConfiguredSecretInputString({
|
||||
config: params.cfg,
|
||||
env: params.env,
|
||||
value: route.secret,
|
||||
path: `plugins.entries.webhooks.routes.${routeId}.secret`,
|
||||
});
|
||||
const secret = secretResolution.value?.trim();
|
||||
if (!secret) {
|
||||
params.logger?.warn?.(
|
||||
`[webhooks] skipping route ${routeId}: ${
|
||||
secretResolution.unresolvedRefReason ?? "secret is empty or unresolved"
|
||||
}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
seenPaths.set(path, routeId);
|
||||
resolvedRoutes.push({
|
||||
routeId,
|
||||
path,
|
||||
sessionKey: route.sessionKey,
|
||||
secret,
|
||||
controllerId: route.controllerId ?? `webhooks/${routeId}`,
|
||||
...(route.description ? { description: route.description } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
return resolvedRoutes;
|
||||
}
|
||||
375
extensions/webhooks/src/http.test.ts
Normal file
375
extensions/webhooks/src/http.test.ts
Normal file
@@ -0,0 +1,375 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createRuntimeTaskFlow } from "../../../src/plugins/runtime/runtime-taskflow.js";
|
||||
import { createMockServerResponse } from "../../../src/test-utils/mock-http-response.js";
|
||||
import type { OpenClawConfig } from "../runtime-api.js";
|
||||
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./http.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const sendMessageMock = vi.fn();
|
||||
const cancelSessionMock = vi.fn();
|
||||
const killSubagentRunAdminMock = vi.fn();
|
||||
return {
|
||||
sendMessageMock,
|
||||
cancelSessionMock,
|
||||
killSubagentRunAdminMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../../../src/tasks/task-registry-delivery-runtime.js", () => ({
|
||||
sendMessage: hoisted.sendMessageMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../../src/acp/control-plane/manager.js", () => ({
|
||||
getAcpSessionManager: () => ({
|
||||
cancelSession: hoisted.cancelSessionMock,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../../src/agents/subagent-control.js", () => ({
|
||||
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
|
||||
}));
|
||||
|
||||
type MockIncomingMessage = IncomingMessage & {
|
||||
destroyed?: boolean;
|
||||
destroy: () => MockIncomingMessage;
|
||||
socket: { remoteAddress: string };
|
||||
};
|
||||
|
||||
let nextSessionId = 0;
|
||||
|
||||
function createJsonRequest(params: {
|
||||
path: string;
|
||||
secret?: string;
|
||||
body: unknown;
|
||||
}): MockIncomingMessage {
|
||||
const req = new EventEmitter() as MockIncomingMessage;
|
||||
req.method = "POST";
|
||||
req.url = params.path;
|
||||
req.headers = {
|
||||
"content-type": "application/json",
|
||||
...(params.secret ? { "x-openclaw-webhook-secret": params.secret } : {}),
|
||||
};
|
||||
req.socket = { remoteAddress: "127.0.0.1" } as MockIncomingMessage["socket"];
|
||||
req.destroyed = false;
|
||||
req.destroy = (() => {
|
||||
req.destroyed = true;
|
||||
return req;
|
||||
}) as MockIncomingMessage["destroy"];
|
||||
|
||||
void Promise.resolve().then(() => {
|
||||
req.emit("data", Buffer.from(JSON.stringify(params.body), "utf8"));
|
||||
req.emit("end");
|
||||
});
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
function createHandler(): {
|
||||
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
|
||||
target: TaskFlowWebhookTarget;
|
||||
} {
|
||||
const runtime = createRuntimeTaskFlow();
|
||||
nextSessionId += 1;
|
||||
const target: TaskFlowWebhookTarget = {
|
||||
routeId: "zapier",
|
||||
path: "/plugins/webhooks/zapier",
|
||||
secret: "shared-secret",
|
||||
defaultControllerId: "webhooks/zapier",
|
||||
taskFlow: runtime.bindSession({
|
||||
sessionKey: `agent:main:webhook-test-${String(nextSessionId)}`,
|
||||
}),
|
||||
};
|
||||
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>([[target.path, [target]]]);
|
||||
return {
|
||||
handler: createTaskFlowWebhookRequestHandler({
|
||||
cfg: {} as OpenClawConfig,
|
||||
targetsByPath,
|
||||
}),
|
||||
target,
|
||||
};
|
||||
}
|
||||
|
||||
async function dispatchJsonRequest(params: {
|
||||
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
|
||||
path: string;
|
||||
secret?: string;
|
||||
body: unknown;
|
||||
}) {
|
||||
const req = createJsonRequest({
|
||||
path: params.path,
|
||||
secret: params.secret,
|
||||
body: params.body,
|
||||
});
|
||||
const res = createMockServerResponse();
|
||||
await params.handler(req, res);
|
||||
return res;
|
||||
}
|
||||
|
||||
function parseJsonBody(res: { body?: string | Buffer | null }) {
|
||||
return JSON.parse(String(res.body ?? ""));
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("createTaskFlowWebhookRequestHandler", () => {
|
||||
it("rejects requests with the wrong secret", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: "wrong-secret",
|
||||
body: {
|
||||
action: "list_flows",
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(401);
|
||||
expect(res.body).toBe("unauthorized");
|
||||
expect(target.taskFlow.list()).toEqual([]);
|
||||
});
|
||||
|
||||
it("creates flows through the bound session and scrubs owner metadata from responses", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "create_flow",
|
||||
goal: "Review inbound queue",
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(200);
|
||||
const parsed = parseJsonBody(res);
|
||||
expect(parsed.ok).toBe(true);
|
||||
expect(parsed.result.flow).toMatchObject({
|
||||
syncMode: "managed",
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Review inbound queue",
|
||||
});
|
||||
expect(parsed.result.flow.ownerKey).toBeUndefined();
|
||||
expect(parsed.result.flow.requesterOrigin).toBeUndefined();
|
||||
expect(target.taskFlow.get(parsed.result.flow.flowId)?.flowId).toBe(parsed.result.flow.flowId);
|
||||
});
|
||||
|
||||
it("runs child tasks and scrubs task ownership fields from responses", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Triage inbox",
|
||||
});
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "run_task",
|
||||
flowId: flow.flowId,
|
||||
runtime: "acp",
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
task: "Inspect the next message batch",
|
||||
status: "running",
|
||||
startedAt: 10,
|
||||
lastEventAt: 10,
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(200);
|
||||
const parsed = parseJsonBody(res);
|
||||
expect(parsed.ok).toBe(true);
|
||||
expect(parsed.result.created).toBe(true);
|
||||
expect(parsed.result.task).toMatchObject({
|
||||
parentFlowId: flow.flowId,
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
runtime: "acp",
|
||||
});
|
||||
expect(parsed.result.task.ownerKey).toBeUndefined();
|
||||
expect(parsed.result.task.requesterSessionKey).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns 404 for missing flow mutations", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "set_waiting",
|
||||
flowId: "flow-missing",
|
||||
expectedRevision: 0,
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(404);
|
||||
const parsed = parseJsonBody(res);
|
||||
expect(parsed).toMatchObject({
|
||||
ok: false,
|
||||
code: "not_found",
|
||||
error: "TaskFlow not found.",
|
||||
result: {
|
||||
applied: false,
|
||||
code: "not_found",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("returns 409 for revision conflicts", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Review inbox",
|
||||
});
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "set_waiting",
|
||||
flowId: flow.flowId,
|
||||
expectedRevision: flow.revision + 1,
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(409);
|
||||
const parsed = parseJsonBody(res);
|
||||
expect(parsed).toMatchObject({
|
||||
ok: false,
|
||||
code: "revision_conflict",
|
||||
result: {
|
||||
applied: false,
|
||||
code: "revision_conflict",
|
||||
current: {
|
||||
flowId: flow.flowId,
|
||||
revision: flow.revision,
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects internal runtimes and running-only metadata from external callers", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Review inbox",
|
||||
});
|
||||
|
||||
const runtimeRes = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "run_task",
|
||||
flowId: flow.flowId,
|
||||
runtime: "cli",
|
||||
task: "Inspect queue",
|
||||
},
|
||||
});
|
||||
expect(runtimeRes.statusCode).toBe(400);
|
||||
expect(parseJsonBody(runtimeRes)).toMatchObject({
|
||||
ok: false,
|
||||
code: "invalid_request",
|
||||
});
|
||||
|
||||
const queuedMetadataRes = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "run_task",
|
||||
flowId: flow.flowId,
|
||||
runtime: "acp",
|
||||
task: "Inspect queue",
|
||||
startedAt: 10,
|
||||
},
|
||||
});
|
||||
expect(queuedMetadataRes.statusCode).toBe(400);
|
||||
expect(parseJsonBody(queuedMetadataRes)).toMatchObject({
|
||||
ok: false,
|
||||
code: "invalid_request",
|
||||
error:
|
||||
"status: status must be running when startedAt, lastEventAt, or progressSummary is provided",
|
||||
});
|
||||
});
|
||||
|
||||
it("reuses the same task record when retried with the same runId", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Triage inbox",
|
||||
});
|
||||
|
||||
const first = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "run_task",
|
||||
flowId: flow.flowId,
|
||||
runtime: "acp",
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
runId: "retry-me",
|
||||
task: "Inspect the next message batch",
|
||||
},
|
||||
});
|
||||
const second = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "run_task",
|
||||
flowId: flow.flowId,
|
||||
runtime: "acp",
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
runId: "retry-me",
|
||||
task: "Inspect the next message batch",
|
||||
},
|
||||
});
|
||||
|
||||
expect(first.statusCode).toBe(200);
|
||||
expect(second.statusCode).toBe(200);
|
||||
const firstParsed = parseJsonBody(first);
|
||||
const secondParsed = parseJsonBody(second);
|
||||
expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId);
|
||||
expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1);
|
||||
});
|
||||
|
||||
it("returns 409 when cancellation targets a terminal flow", async () => {
|
||||
const { handler, target } = createHandler();
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: "webhooks/zapier",
|
||||
goal: "Review inbox",
|
||||
});
|
||||
const finished = target.taskFlow.finish({
|
||||
flowId: flow.flowId,
|
||||
expectedRevision: flow.revision,
|
||||
});
|
||||
expect(finished.applied).toBe(true);
|
||||
|
||||
const res = await dispatchJsonRequest({
|
||||
handler,
|
||||
path: target.path,
|
||||
secret: target.secret,
|
||||
body: {
|
||||
action: "cancel_flow",
|
||||
flowId: flow.flowId,
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.statusCode).toBe(409);
|
||||
expect(parseJsonBody(res)).toMatchObject({
|
||||
ok: false,
|
||||
code: "terminal",
|
||||
error: "Flow is already succeeded.",
|
||||
result: {
|
||||
found: true,
|
||||
cancelled: false,
|
||||
reason: "Flow is already succeeded.",
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
805
extensions/webhooks/src/http.ts
Normal file
805
extensions/webhooks/src/http.ts
Normal file
@@ -0,0 +1,805 @@
|
||||
import { timingSafeEqual } from "node:crypto";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import { z } from "zod";
|
||||
import type { PluginRuntime } from "../api.js";
|
||||
import {
|
||||
createFixedWindowRateLimiter,
|
||||
createWebhookInFlightLimiter,
|
||||
readJsonWebhookBodyOrReject,
|
||||
resolveRequestClientIp,
|
||||
resolveWebhookTargetWithAuthOrRejectSync,
|
||||
withResolvedWebhookRequestPipeline,
|
||||
WEBHOOK_IN_FLIGHT_DEFAULTS,
|
||||
WEBHOOK_RATE_LIMIT_DEFAULTS,
|
||||
type OpenClawConfig,
|
||||
type WebhookInFlightLimiter,
|
||||
} from "../runtime-api.js";
|
||||
|
||||
type BoundTaskFlowRuntime = ReturnType<PluginRuntime["taskFlow"]["bindSession"]>;
|
||||
|
||||
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };
|
||||
|
||||
const jsonValueSchema: z.ZodType<JsonValue> = z.lazy(() =>
|
||||
z.union([
|
||||
z.null(),
|
||||
z.boolean(),
|
||||
z.number().finite(),
|
||||
z.string(),
|
||||
z.array(jsonValueSchema),
|
||||
z.record(z.string(), jsonValueSchema),
|
||||
]),
|
||||
);
|
||||
|
||||
const nullableStringSchema = z.string().trim().min(1).nullable().optional();
|
||||
|
||||
const createFlowRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("create_flow"),
|
||||
controllerId: z.string().trim().min(1).optional(),
|
||||
goal: z.string().trim().min(1),
|
||||
status: z.enum(["queued", "running", "waiting", "blocked"]).optional(),
|
||||
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
|
||||
currentStep: nullableStringSchema,
|
||||
stateJson: jsonValueSchema.nullable().optional(),
|
||||
waitJson: jsonValueSchema.nullable().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const getFlowRequestSchema = z
|
||||
.object({ action: z.literal("get_flow"), flowId: z.string().trim().min(1) })
|
||||
.strict();
|
||||
const listFlowsRequestSchema = z.object({ action: z.literal("list_flows") }).strict();
|
||||
const findLatestFlowRequestSchema = z.object({ action: z.literal("find_latest_flow") }).strict();
|
||||
const resolveFlowRequestSchema = z
|
||||
.object({ action: z.literal("resolve_flow"), token: z.string().trim().min(1) })
|
||||
.strict();
|
||||
const getTaskSummaryRequestSchema = z
|
||||
.object({ action: z.literal("get_task_summary"), flowId: z.string().trim().min(1) })
|
||||
.strict();
|
||||
|
||||
const setWaitingRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("set_waiting"),
|
||||
flowId: z.string().trim().min(1),
|
||||
expectedRevision: z.number().int().nonnegative(),
|
||||
currentStep: nullableStringSchema,
|
||||
stateJson: jsonValueSchema.nullable().optional(),
|
||||
waitJson: jsonValueSchema.nullable().optional(),
|
||||
blockedTaskId: nullableStringSchema,
|
||||
blockedSummary: nullableStringSchema,
|
||||
})
|
||||
.strict();
|
||||
|
||||
const resumeFlowRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("resume_flow"),
|
||||
flowId: z.string().trim().min(1),
|
||||
expectedRevision: z.number().int().nonnegative(),
|
||||
status: z.enum(["queued", "running"]).optional(),
|
||||
currentStep: nullableStringSchema,
|
||||
stateJson: jsonValueSchema.nullable().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const finishFlowRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("finish_flow"),
|
||||
flowId: z.string().trim().min(1),
|
||||
expectedRevision: z.number().int().nonnegative(),
|
||||
stateJson: jsonValueSchema.nullable().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const failFlowRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("fail_flow"),
|
||||
flowId: z.string().trim().min(1),
|
||||
expectedRevision: z.number().int().nonnegative(),
|
||||
stateJson: jsonValueSchema.nullable().optional(),
|
||||
blockedTaskId: nullableStringSchema,
|
||||
blockedSummary: nullableStringSchema,
|
||||
})
|
||||
.strict();
|
||||
|
||||
const requestCancelRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("request_cancel"),
|
||||
flowId: z.string().trim().min(1),
|
||||
expectedRevision: z.number().int().nonnegative(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const cancelFlowRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("cancel_flow"),
|
||||
flowId: z.string().trim().min(1),
|
||||
})
|
||||
.strict();
|
||||
|
||||
const runTaskRequestSchema = z
|
||||
.object({
|
||||
action: z.literal("run_task"),
|
||||
flowId: z.string().trim().min(1),
|
||||
runtime: z.enum(["subagent", "acp"]),
|
||||
sourceId: z.string().trim().min(1).optional(),
|
||||
childSessionKey: z.string().trim().min(1).optional(),
|
||||
parentTaskId: z.string().trim().min(1).optional(),
|
||||
agentId: z.string().trim().min(1).optional(),
|
||||
runId: z.string().trim().min(1).optional(),
|
||||
label: z.string().trim().min(1).optional(),
|
||||
task: z.string().trim().min(1),
|
||||
preferMetadata: z.boolean().optional(),
|
||||
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
|
||||
status: z.enum(["queued", "running"]).optional(),
|
||||
startedAt: z.number().int().nonnegative().optional(),
|
||||
lastEventAt: z.number().int().nonnegative().optional(),
|
||||
progressSummary: nullableStringSchema,
|
||||
})
|
||||
.strict()
|
||||
.superRefine((value, ctx) => {
|
||||
if (
|
||||
value.status !== "running" &&
|
||||
(value.startedAt !== undefined ||
|
||||
value.lastEventAt !== undefined ||
|
||||
value.progressSummary !== undefined)
|
||||
) {
|
||||
ctx.addIssue({
|
||||
code: z.ZodIssueCode.custom,
|
||||
message:
|
||||
"status must be running when startedAt, lastEventAt, or progressSummary is provided",
|
||||
path: ["status"],
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const webhookActionSchema = z.discriminatedUnion("action", [
|
||||
createFlowRequestSchema,
|
||||
getFlowRequestSchema,
|
||||
listFlowsRequestSchema,
|
||||
findLatestFlowRequestSchema,
|
||||
resolveFlowRequestSchema,
|
||||
getTaskSummaryRequestSchema,
|
||||
setWaitingRequestSchema,
|
||||
resumeFlowRequestSchema,
|
||||
finishFlowRequestSchema,
|
||||
failFlowRequestSchema,
|
||||
requestCancelRequestSchema,
|
||||
cancelFlowRequestSchema,
|
||||
runTaskRequestSchema,
|
||||
]);
|
||||
|
||||
type WebhookAction = z.infer<typeof webhookActionSchema>;
|
||||
|
||||
export type TaskFlowWebhookTarget = {
|
||||
routeId: string;
|
||||
path: string;
|
||||
secret: string;
|
||||
defaultControllerId: string;
|
||||
taskFlow: BoundTaskFlowRuntime;
|
||||
};
|
||||
|
||||
type FlowView = {
|
||||
flowId: string;
|
||||
syncMode: "task_mirrored" | "managed";
|
||||
controllerId?: string;
|
||||
revision: number;
|
||||
status: string;
|
||||
notifyPolicy: string;
|
||||
goal: string;
|
||||
currentStep?: string;
|
||||
blockedTaskId?: string;
|
||||
blockedSummary?: string;
|
||||
stateJson?: JsonValue;
|
||||
waitJson?: JsonValue;
|
||||
cancelRequestedAt?: number;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
endedAt?: number;
|
||||
};
|
||||
|
||||
type TaskView = {
|
||||
taskId: string;
|
||||
runtime: string;
|
||||
sourceId?: string;
|
||||
scopeKind: string;
|
||||
childSessionKey?: string;
|
||||
parentFlowId?: string;
|
||||
parentTaskId?: string;
|
||||
agentId?: string;
|
||||
runId?: string;
|
||||
label?: string;
|
||||
task: string;
|
||||
status: string;
|
||||
deliveryStatus: string;
|
||||
notifyPolicy: string;
|
||||
createdAt: number;
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
lastEventAt?: number;
|
||||
cleanupAfter?: number;
|
||||
error?: string;
|
||||
progressSummary?: string;
|
||||
terminalSummary?: string;
|
||||
terminalOutcome?: string;
|
||||
};
|
||||
|
||||
function toFlowView(flow: {
|
||||
flowId: string;
|
||||
syncMode: "task_mirrored" | "managed";
|
||||
controllerId?: string;
|
||||
revision: number;
|
||||
status: string;
|
||||
notifyPolicy: string;
|
||||
goal: string;
|
||||
currentStep?: string;
|
||||
blockedTaskId?: string;
|
||||
blockedSummary?: string;
|
||||
stateJson?: JsonValue;
|
||||
waitJson?: JsonValue;
|
||||
cancelRequestedAt?: number;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
endedAt?: number;
|
||||
}): FlowView {
|
||||
return {
|
||||
flowId: flow.flowId,
|
||||
syncMode: flow.syncMode,
|
||||
...(flow.controllerId ? { controllerId: flow.controllerId } : {}),
|
||||
revision: flow.revision,
|
||||
status: flow.status,
|
||||
notifyPolicy: flow.notifyPolicy,
|
||||
goal: flow.goal,
|
||||
...(flow.currentStep ? { currentStep: flow.currentStep } : {}),
|
||||
...(flow.blockedTaskId ? { blockedTaskId: flow.blockedTaskId } : {}),
|
||||
...(flow.blockedSummary ? { blockedSummary: flow.blockedSummary } : {}),
|
||||
...(flow.stateJson !== undefined ? { stateJson: flow.stateJson } : {}),
|
||||
...(flow.waitJson !== undefined ? { waitJson: flow.waitJson } : {}),
|
||||
...(flow.cancelRequestedAt !== undefined ? { cancelRequestedAt: flow.cancelRequestedAt } : {}),
|
||||
createdAt: flow.createdAt,
|
||||
updatedAt: flow.updatedAt,
|
||||
...(flow.endedAt !== undefined ? { endedAt: flow.endedAt } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function toTaskView(task: {
|
||||
taskId: string;
|
||||
runtime: string;
|
||||
sourceId?: string;
|
||||
scopeKind: string;
|
||||
childSessionKey?: string;
|
||||
parentFlowId?: string;
|
||||
parentTaskId?: string;
|
||||
agentId?: string;
|
||||
runId?: string;
|
||||
label?: string;
|
||||
task: string;
|
||||
status: string;
|
||||
deliveryStatus: string;
|
||||
notifyPolicy: string;
|
||||
createdAt: number;
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
lastEventAt?: number;
|
||||
cleanupAfter?: number;
|
||||
error?: string;
|
||||
progressSummary?: string;
|
||||
terminalSummary?: string;
|
||||
terminalOutcome?: string;
|
||||
}): TaskView {
|
||||
return {
|
||||
taskId: task.taskId,
|
||||
runtime: task.runtime,
|
||||
...(task.sourceId ? { sourceId: task.sourceId } : {}),
|
||||
scopeKind: task.scopeKind,
|
||||
...(task.childSessionKey ? { childSessionKey: task.childSessionKey } : {}),
|
||||
...(task.parentFlowId ? { parentFlowId: task.parentFlowId } : {}),
|
||||
...(task.parentTaskId ? { parentTaskId: task.parentTaskId } : {}),
|
||||
...(task.agentId ? { agentId: task.agentId } : {}),
|
||||
...(task.runId ? { runId: task.runId } : {}),
|
||||
...(task.label ? { label: task.label } : {}),
|
||||
task: task.task,
|
||||
status: task.status,
|
||||
deliveryStatus: task.deliveryStatus,
|
||||
notifyPolicy: task.notifyPolicy,
|
||||
createdAt: task.createdAt,
|
||||
...(task.startedAt !== undefined ? { startedAt: task.startedAt } : {}),
|
||||
...(task.endedAt !== undefined ? { endedAt: task.endedAt } : {}),
|
||||
...(task.lastEventAt !== undefined ? { lastEventAt: task.lastEventAt } : {}),
|
||||
...(task.cleanupAfter !== undefined ? { cleanupAfter: task.cleanupAfter } : {}),
|
||||
...(task.error ? { error: task.error } : {}),
|
||||
...(task.progressSummary ? { progressSummary: task.progressSummary } : {}),
|
||||
...(task.terminalSummary ? { terminalSummary: task.terminalSummary } : {}),
|
||||
...(task.terminalOutcome ? { terminalOutcome: task.terminalOutcome } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function writeJson(res: ServerResponse, statusCode: number, body: unknown): void {
|
||||
res.statusCode = statusCode;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
function extractSharedSecret(req: IncomingMessage): string {
|
||||
const authHeader = Array.isArray(req.headers.authorization)
|
||||
? String(req.headers.authorization[0] ?? "")
|
||||
: String(req.headers.authorization ?? "");
|
||||
if (authHeader.toLowerCase().startsWith("bearer ")) {
|
||||
return authHeader.slice("bearer ".length).trim();
|
||||
}
|
||||
const sharedHeader = req.headers["x-openclaw-webhook-secret"];
|
||||
return Array.isArray(sharedHeader)
|
||||
? String(sharedHeader[0] ?? "").trim()
|
||||
: String(sharedHeader ?? "").trim();
|
||||
}
|
||||
|
||||
function timingSafeEquals(left: string, right: string): boolean {
|
||||
const leftBuffer = Buffer.from(left);
|
||||
const rightBuffer = Buffer.from(right);
|
||||
if (leftBuffer.length !== rightBuffer.length) {
|
||||
const paddedLength = Math.max(1, leftBuffer.length, rightBuffer.length);
|
||||
const paddedLeft = Buffer.alloc(paddedLength);
|
||||
const paddedRight = Buffer.alloc(paddedLength);
|
||||
leftBuffer.copy(paddedLeft);
|
||||
rightBuffer.copy(paddedRight);
|
||||
timingSafeEqual(paddedLeft, paddedRight);
|
||||
return false;
|
||||
}
|
||||
return timingSafeEqual(leftBuffer, rightBuffer);
|
||||
}
|
||||
|
||||
function formatZodError(error: z.ZodError): string {
|
||||
const firstIssue = error.issues[0];
|
||||
if (!firstIssue) {
|
||||
return "invalid request";
|
||||
}
|
||||
const path = firstIssue.path.length > 0 ? `${firstIssue.path.join(".")}: ` : "";
|
||||
return `${path}${firstIssue.message}`;
|
||||
}
|
||||
|
||||
function mapMutationResult(
|
||||
result:
|
||||
| {
|
||||
applied: true;
|
||||
flow: FlowView;
|
||||
}
|
||||
| {
|
||||
applied: false;
|
||||
code: string;
|
||||
current?: FlowView;
|
||||
},
|
||||
): unknown {
|
||||
return result;
|
||||
}
|
||||
|
||||
function mapMutationStatus(result: {
|
||||
applied: boolean;
|
||||
code?: "not_found" | "not_managed" | "revision_conflict";
|
||||
}): { statusCode: number; code?: string; error?: string } {
|
||||
if (result.applied) {
|
||||
return { statusCode: 200 };
|
||||
}
|
||||
switch (result.code) {
|
||||
case "not_found":
|
||||
return {
|
||||
statusCode: 404,
|
||||
code: "not_found",
|
||||
error: "TaskFlow not found.",
|
||||
};
|
||||
case "not_managed":
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "not_managed",
|
||||
error: "TaskFlow is not managed by this webhook surface.",
|
||||
};
|
||||
case "revision_conflict":
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "revision_conflict",
|
||||
error: "TaskFlow changed since the caller's expected revision.",
|
||||
};
|
||||
default:
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "mutation_rejected",
|
||||
error: "TaskFlow mutation was rejected.",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): {
|
||||
statusCode: number;
|
||||
code?: string;
|
||||
error?: string;
|
||||
} {
|
||||
if (result.created) {
|
||||
return { statusCode: 200 };
|
||||
}
|
||||
if (!result.found) {
|
||||
return {
|
||||
statusCode: 404,
|
||||
code: "not_found",
|
||||
error: "TaskFlow not found.",
|
||||
};
|
||||
}
|
||||
if (result.reason === "Flow cancellation has already been requested.") {
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "cancel_requested",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
if (result.reason === "Flow does not accept managed child tasks.") {
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "not_managed",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
if (result.reason?.startsWith("Flow is already ")) {
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "terminal",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "task_not_created",
|
||||
error: result.reason ?? "TaskFlow task was not created.",
|
||||
};
|
||||
}
|
||||
|
||||
function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): {
|
||||
statusCode: number;
|
||||
code?: string;
|
||||
error?: string;
|
||||
} {
|
||||
if (result.cancelled) {
|
||||
return { statusCode: 200 };
|
||||
}
|
||||
if (!result.found) {
|
||||
return {
|
||||
statusCode: 404,
|
||||
code: "not_found",
|
||||
error: "TaskFlow not found.",
|
||||
};
|
||||
}
|
||||
if (result.reason === "One or more child tasks are still active.") {
|
||||
return {
|
||||
statusCode: 202,
|
||||
code: "cancel_pending",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
if (result.reason === "Flow changed while cancellation was in progress.") {
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "revision_conflict",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
if (result.reason?.startsWith("Flow is already ")) {
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "terminal",
|
||||
error: result.reason,
|
||||
};
|
||||
}
|
||||
return {
|
||||
statusCode: 409,
|
||||
code: "cancel_rejected",
|
||||
error: result.reason ?? "TaskFlow cancellation was rejected.",
|
||||
};
|
||||
}
|
||||
|
||||
function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): {
|
||||
statusCode: number;
|
||||
code?: string;
|
||||
error?: string;
|
||||
} {
|
||||
switch (params.action.action) {
|
||||
case "set_waiting":
|
||||
case "resume_flow":
|
||||
case "finish_flow":
|
||||
case "fail_flow":
|
||||
case "request_cancel":
|
||||
return mapMutationStatus(
|
||||
params.result as {
|
||||
applied: boolean;
|
||||
code?: "not_found" | "not_managed" | "revision_conflict";
|
||||
},
|
||||
);
|
||||
case "cancel_flow":
|
||||
return mapCancelStatus(
|
||||
params.result as {
|
||||
found: boolean;
|
||||
cancelled: boolean;
|
||||
reason?: string;
|
||||
},
|
||||
);
|
||||
case "run_task":
|
||||
return mapRunTaskStatus(
|
||||
params.result as {
|
||||
created: boolean;
|
||||
found: boolean;
|
||||
reason?: string;
|
||||
},
|
||||
);
|
||||
default:
|
||||
return { statusCode: 200 };
|
||||
}
|
||||
}
|
||||
|
||||
async function executeWebhookAction(params: {
|
||||
action: WebhookAction;
|
||||
target: TaskFlowWebhookTarget;
|
||||
cfg: OpenClawConfig;
|
||||
}): Promise<unknown> {
|
||||
const { action, target } = params;
|
||||
switch (action.action) {
|
||||
case "create_flow": {
|
||||
const flow = target.taskFlow.createManaged({
|
||||
controllerId: action.controllerId ?? target.defaultControllerId,
|
||||
goal: action.goal,
|
||||
status: action.status,
|
||||
notifyPolicy: action.notifyPolicy,
|
||||
currentStep: action.currentStep ?? undefined,
|
||||
stateJson: action.stateJson,
|
||||
waitJson: action.waitJson,
|
||||
});
|
||||
return { flow: toFlowView(flow) };
|
||||
}
|
||||
case "get_flow": {
|
||||
const flow = target.taskFlow.get(action.flowId);
|
||||
return { flow: flow ? toFlowView(flow) : null };
|
||||
}
|
||||
case "list_flows":
|
||||
return { flows: target.taskFlow.list().map(toFlowView) };
|
||||
case "find_latest_flow": {
|
||||
const flow = target.taskFlow.findLatest();
|
||||
return { flow: flow ? toFlowView(flow) : null };
|
||||
}
|
||||
case "resolve_flow": {
|
||||
const flow = target.taskFlow.resolve(action.token);
|
||||
return { flow: flow ? toFlowView(flow) : null };
|
||||
}
|
||||
case "get_task_summary":
|
||||
return { summary: target.taskFlow.getTaskSummary(action.flowId) ?? null };
|
||||
case "set_waiting": {
|
||||
const result = target.taskFlow.setWaiting({
|
||||
flowId: action.flowId,
|
||||
expectedRevision: action.expectedRevision,
|
||||
currentStep: action.currentStep,
|
||||
stateJson: action.stateJson,
|
||||
waitJson: action.waitJson,
|
||||
blockedTaskId: action.blockedTaskId,
|
||||
blockedSummary: action.blockedSummary,
|
||||
});
|
||||
return mapMutationResult(
|
||||
result.applied
|
||||
? { applied: true, flow: toFlowView(result.flow) }
|
||||
: {
|
||||
applied: false,
|
||||
code: result.code,
|
||||
...(result.current ? { current: toFlowView(result.current) } : {}),
|
||||
},
|
||||
);
|
||||
}
|
||||
case "resume_flow": {
|
||||
const result = target.taskFlow.resume({
|
||||
flowId: action.flowId,
|
||||
expectedRevision: action.expectedRevision,
|
||||
status: action.status,
|
||||
currentStep: action.currentStep,
|
||||
stateJson: action.stateJson,
|
||||
});
|
||||
return mapMutationResult(
|
||||
result.applied
|
||||
? { applied: true, flow: toFlowView(result.flow) }
|
||||
: {
|
||||
applied: false,
|
||||
code: result.code,
|
||||
...(result.current ? { current: toFlowView(result.current) } : {}),
|
||||
},
|
||||
);
|
||||
}
|
||||
case "finish_flow": {
|
||||
const result = target.taskFlow.finish({
|
||||
flowId: action.flowId,
|
||||
expectedRevision: action.expectedRevision,
|
||||
stateJson: action.stateJson,
|
||||
});
|
||||
return mapMutationResult(
|
||||
result.applied
|
||||
? { applied: true, flow: toFlowView(result.flow) }
|
||||
: {
|
||||
applied: false,
|
||||
code: result.code,
|
||||
...(result.current ? { current: toFlowView(result.current) } : {}),
|
||||
},
|
||||
);
|
||||
}
|
||||
case "fail_flow": {
|
||||
const result = target.taskFlow.fail({
|
||||
flowId: action.flowId,
|
||||
expectedRevision: action.expectedRevision,
|
||||
stateJson: action.stateJson,
|
||||
blockedTaskId: action.blockedTaskId,
|
||||
blockedSummary: action.blockedSummary,
|
||||
});
|
||||
return mapMutationResult(
|
||||
result.applied
|
||||
? { applied: true, flow: toFlowView(result.flow) }
|
||||
: {
|
||||
applied: false,
|
||||
code: result.code,
|
||||
...(result.current ? { current: toFlowView(result.current) } : {}),
|
||||
},
|
||||
);
|
||||
}
|
||||
case "request_cancel": {
|
||||
const result = target.taskFlow.requestCancel({
|
||||
flowId: action.flowId,
|
||||
expectedRevision: action.expectedRevision,
|
||||
});
|
||||
return mapMutationResult(
|
||||
result.applied
|
||||
? { applied: true, flow: toFlowView(result.flow) }
|
||||
: {
|
||||
applied: false,
|
||||
code: result.code,
|
||||
...(result.current ? { current: toFlowView(result.current) } : {}),
|
||||
},
|
||||
);
|
||||
}
|
||||
case "cancel_flow": {
|
||||
const result = await target.taskFlow.cancel({
|
||||
flowId: action.flowId,
|
||||
cfg: params.cfg,
|
||||
});
|
||||
return {
|
||||
found: result.found,
|
||||
cancelled: result.cancelled,
|
||||
...(result.reason ? { reason: result.reason } : {}),
|
||||
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
|
||||
...(result.tasks ? { tasks: result.tasks.map(toTaskView) } : {}),
|
||||
};
|
||||
}
|
||||
case "run_task": {
|
||||
const result = target.taskFlow.runTask({
|
||||
flowId: action.flowId,
|
||||
runtime: action.runtime,
|
||||
sourceId: action.sourceId,
|
||||
childSessionKey: action.childSessionKey,
|
||||
parentTaskId: action.parentTaskId,
|
||||
agentId: action.agentId,
|
||||
runId: action.runId,
|
||||
label: action.label,
|
||||
task: action.task,
|
||||
preferMetadata: action.preferMetadata,
|
||||
notifyPolicy: action.notifyPolicy,
|
||||
status: action.status,
|
||||
startedAt: action.startedAt,
|
||||
lastEventAt: action.lastEventAt,
|
||||
progressSummary: action.progressSummary,
|
||||
});
|
||||
if (result.created) {
|
||||
return {
|
||||
created: true,
|
||||
flow: toFlowView(result.flow),
|
||||
task: toTaskView(result.task),
|
||||
};
|
||||
}
|
||||
return {
|
||||
found: result.found,
|
||||
created: false,
|
||||
reason: result.reason,
|
||||
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function createTaskFlowWebhookRequestHandler(params: {
|
||||
cfg: OpenClawConfig;
|
||||
targetsByPath: Map<string, TaskFlowWebhookTarget[]>;
|
||||
inFlightLimiter?: WebhookInFlightLimiter;
|
||||
}): (req: IncomingMessage, res: ServerResponse) => Promise<boolean> {
|
||||
const rateLimiter = createFixedWindowRateLimiter({
|
||||
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
|
||||
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
|
||||
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
|
||||
});
|
||||
const inFlightLimiter =
|
||||
params.inFlightLimiter ??
|
||||
createWebhookInFlightLimiter({
|
||||
maxInFlightPerKey: WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey,
|
||||
maxTrackedKeys: WEBHOOK_IN_FLIGHT_DEFAULTS.maxTrackedKeys,
|
||||
});
|
||||
|
||||
return async (req: IncomingMessage, res: ServerResponse): Promise<boolean> => {
|
||||
return await withResolvedWebhookRequestPipeline({
|
||||
req,
|
||||
res,
|
||||
targetsByPath: params.targetsByPath,
|
||||
allowMethods: ["POST"],
|
||||
requireJsonContentType: true,
|
||||
rateLimiter,
|
||||
rateLimitKey: (() => {
|
||||
const clientIp =
|
||||
resolveRequestClientIp(
|
||||
req,
|
||||
params.cfg.gateway?.trustedProxies,
|
||||
params.cfg.gateway?.allowRealIpFallback === true,
|
||||
) ??
|
||||
req.socket.remoteAddress ??
|
||||
"unknown";
|
||||
return `${new URL(req.url ?? "/", "http://localhost").pathname}:${clientIp}`;
|
||||
})(),
|
||||
inFlightLimiter,
|
||||
handle: async ({ targets }) => {
|
||||
const presentedSecret = extractSharedSecret(req);
|
||||
const target = resolveWebhookTargetWithAuthOrRejectSync({
|
||||
targets,
|
||||
res,
|
||||
isMatch: (candidate) =>
|
||||
presentedSecret.length > 0 && timingSafeEquals(candidate.secret, presentedSecret),
|
||||
});
|
||||
if (!target) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonWebhookBodyOrReject({
|
||||
req,
|
||||
res,
|
||||
maxBytes: 256 * 1024,
|
||||
timeoutMs: 15_000,
|
||||
emptyObjectOnEmpty: false,
|
||||
invalidJsonMessage: "invalid request body",
|
||||
});
|
||||
if (!body.ok) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const parsed = webhookActionSchema.safeParse(body.value);
|
||||
if (!parsed.success) {
|
||||
writeJson(res, 400, {
|
||||
ok: false,
|
||||
code: "invalid_request",
|
||||
error: formatZodError(parsed.error),
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
const result = await executeWebhookAction({
|
||||
action: parsed.data,
|
||||
target,
|
||||
cfg: params.cfg,
|
||||
});
|
||||
const outcome = describeWebhookOutcome({
|
||||
action: parsed.data,
|
||||
result,
|
||||
});
|
||||
writeJson(
|
||||
res,
|
||||
outcome.statusCode,
|
||||
outcome.statusCode < 400
|
||||
? {
|
||||
ok: true,
|
||||
routeId: target.routeId,
|
||||
...(outcome.code ? { code: outcome.code } : {}),
|
||||
result,
|
||||
}
|
||||
: {
|
||||
ok: false,
|
||||
routeId: target.routeId,
|
||||
code: outcome.code ?? "request_rejected",
|
||||
error: outcome.error ?? "request rejected",
|
||||
result,
|
||||
},
|
||||
);
|
||||
return true;
|
||||
},
|
||||
});
|
||||
};
|
||||
}
|
||||
6
pnpm-lock.yaml
generated
6
pnpm-lock.yaml
generated
@@ -675,6 +675,12 @@ importers:
|
||||
|
||||
extensions/volcengine: {}
|
||||
|
||||
extensions/webhooks:
|
||||
dependencies:
|
||||
zod:
|
||||
specifier: ^4.3.6
|
||||
version: 4.3.6
|
||||
|
||||
extensions/whatsapp:
|
||||
dependencies:
|
||||
'@whiskeysockets/baileys':
|
||||
|
||||
@@ -189,6 +189,15 @@ describe("plugin runtime command execution", () => {
|
||||
]);
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exposes runtime.taskFlow binding helpers",
|
||||
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
|
||||
expectFunctionKeys(runtime.taskFlow as Record<string, unknown>, [
|
||||
"bindSession",
|
||||
"fromToolContext",
|
||||
]);
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exposes runtime.agent host helpers",
|
||||
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
|
||||
|
||||
@@ -16,6 +16,7 @@ import { createRuntimeEvents } from "./runtime-events.js";
|
||||
import { createRuntimeLogging } from "./runtime-logging.js";
|
||||
import { createRuntimeMedia } from "./runtime-media.js";
|
||||
import { createRuntimeSystem } from "./runtime-system.js";
|
||||
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
|
||||
import type { PluginRuntime } from "./types.js";
|
||||
|
||||
const loadTtsRuntime = createLazyRuntimeModule(() => import("./runtime-tts.runtime.js"));
|
||||
@@ -203,6 +204,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}):
|
||||
events: createRuntimeEvents(),
|
||||
logging: createRuntimeLogging(),
|
||||
state: { resolveStateDir },
|
||||
taskFlow: createRuntimeTaskFlow(),
|
||||
} satisfies Omit<
|
||||
PluginRuntime,
|
||||
"tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration"
|
||||
|
||||
157
src/plugins/runtime/runtime-taskflow.test.ts
Normal file
157
src/plugins/runtime/runtime-taskflow.test.ts
Normal file
@@ -0,0 +1,157 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { getFlowById, resetFlowRegistryForTests } from "../../tasks/flow-registry.js";
|
||||
import { getTaskById, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const sendMessageMock = vi.fn();
|
||||
const cancelSessionMock = vi.fn();
|
||||
const killSubagentRunAdminMock = vi.fn();
|
||||
return {
|
||||
sendMessageMock,
|
||||
cancelSessionMock,
|
||||
killSubagentRunAdminMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => ({
|
||||
sendMessage: hoisted.sendMessageMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../acp/control-plane/manager.js", () => ({
|
||||
getAcpSessionManager: () => ({
|
||||
cancelSession: hoisted.cancelSessionMock,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../agents/subagent-control.js", () => ({
|
||||
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
|
||||
}));
|
||||
|
||||
afterEach(() => {
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("runtime TaskFlow", () => {
|
||||
it("binds managed TaskFlow operations to a session key", () => {
|
||||
const runtime = createRuntimeTaskFlow();
|
||||
const taskFlow = runtime.bindSession({
|
||||
sessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
});
|
||||
|
||||
const created = taskFlow.createManaged({
|
||||
controllerId: "tests/runtime-taskflow",
|
||||
goal: "Triage inbox",
|
||||
currentStep: "classify",
|
||||
stateJson: { lane: "inbox" },
|
||||
});
|
||||
|
||||
expect(created).toMatchObject({
|
||||
syncMode: "managed",
|
||||
ownerKey: "agent:main:main",
|
||||
controllerId: "tests/runtime-taskflow",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
goal: "Triage inbox",
|
||||
});
|
||||
expect(taskFlow.get(created.flowId)?.flowId).toBe(created.flowId);
|
||||
expect(taskFlow.findLatest()?.flowId).toBe(created.flowId);
|
||||
expect(taskFlow.resolve("agent:main:main")?.flowId).toBe(created.flowId);
|
||||
});
|
||||
|
||||
it("binds TaskFlows from trusted tool context", () => {
|
||||
const runtime = createRuntimeTaskFlow();
|
||||
const taskFlow = runtime.fromToolContext({
|
||||
sessionKey: "agent:main:main",
|
||||
deliveryContext: {
|
||||
channel: "discord",
|
||||
to: "channel:123",
|
||||
threadId: "thread:456",
|
||||
},
|
||||
});
|
||||
|
||||
const created = taskFlow.createManaged({
|
||||
controllerId: "tests/runtime-taskflow",
|
||||
goal: "Review queue",
|
||||
});
|
||||
|
||||
expect(created.requesterOrigin).toMatchObject({
|
||||
channel: "discord",
|
||||
to: "channel:123",
|
||||
threadId: "thread:456",
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects tool contexts without a bound session key", () => {
|
||||
const runtime = createRuntimeTaskFlow();
|
||||
expect(() =>
|
||||
runtime.fromToolContext({
|
||||
sessionKey: undefined,
|
||||
deliveryContext: undefined,
|
||||
}),
|
||||
).toThrow("TaskFlow runtime requires tool context with a sessionKey.");
|
||||
});
|
||||
|
||||
it("keeps TaskFlow reads owner-scoped and runs child tasks under the bound TaskFlow", () => {
|
||||
const runtime = createRuntimeTaskFlow();
|
||||
const ownerTaskFlow = runtime.bindSession({
|
||||
sessionKey: "agent:main:main",
|
||||
});
|
||||
const otherTaskFlow = runtime.bindSession({
|
||||
sessionKey: "agent:main:other",
|
||||
});
|
||||
|
||||
const created = ownerTaskFlow.createManaged({
|
||||
controllerId: "tests/runtime-taskflow",
|
||||
goal: "Inspect PR batch",
|
||||
});
|
||||
|
||||
expect(otherTaskFlow.get(created.flowId)).toBeUndefined();
|
||||
expect(otherTaskFlow.list()).toEqual([]);
|
||||
|
||||
const child = ownerTaskFlow.runTask({
|
||||
flowId: created.flowId,
|
||||
runtime: "acp",
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
runId: "runtime-taskflow-child",
|
||||
task: "Inspect PR 1",
|
||||
status: "running",
|
||||
startedAt: 10,
|
||||
lastEventAt: 10,
|
||||
});
|
||||
|
||||
expect(child).toMatchObject({
|
||||
created: true,
|
||||
flow: expect.objectContaining({
|
||||
flowId: created.flowId,
|
||||
}),
|
||||
task: expect.objectContaining({
|
||||
parentFlowId: created.flowId,
|
||||
ownerKey: "agent:main:main",
|
||||
runId: "runtime-taskflow-child",
|
||||
}),
|
||||
});
|
||||
if (!child.created) {
|
||||
throw new Error("expected child task creation to succeed");
|
||||
}
|
||||
expect(getTaskById(child.task.taskId)).toMatchObject({
|
||||
parentFlowId: created.flowId,
|
||||
ownerKey: "agent:main:main",
|
||||
});
|
||||
expect(getFlowById(created.flowId)).toMatchObject({
|
||||
flowId: created.flowId,
|
||||
});
|
||||
expect(ownerTaskFlow.getTaskSummary(created.flowId)).toMatchObject({
|
||||
total: 1,
|
||||
active: 1,
|
||||
});
|
||||
});
|
||||
});
|
||||
461
src/plugins/runtime/runtime-taskflow.ts
Normal file
461
src/plugins/runtime/runtime-taskflow.ts
Normal file
@@ -0,0 +1,461 @@
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
findLatestFlowForOwner,
|
||||
getFlowByIdForOwner,
|
||||
listFlowsForOwner,
|
||||
resolveFlowForLookupTokenForOwner,
|
||||
} from "../../tasks/flow-owner-access.js";
|
||||
import type { FlowRecord, JsonValue } from "../../tasks/flow-registry.types.js";
|
||||
import {
|
||||
createManagedFlow,
|
||||
failFlow,
|
||||
finishFlow,
|
||||
type FlowUpdateResult,
|
||||
requestFlowCancel,
|
||||
resumeFlow,
|
||||
setFlowWaiting,
|
||||
} from "../../tasks/flow-runtime-internal.js";
|
||||
import {
|
||||
cancelFlowByIdForOwner,
|
||||
getFlowTaskSummary,
|
||||
runTaskInFlowForOwner,
|
||||
} from "../../tasks/task-executor.js";
|
||||
import type {
|
||||
TaskDeliveryStatus,
|
||||
TaskDeliveryState,
|
||||
TaskNotifyPolicy,
|
||||
TaskRecord,
|
||||
TaskRegistrySummary,
|
||||
TaskRuntime,
|
||||
} from "../../tasks/task-registry.types.js";
|
||||
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
|
||||
import type { OpenClawPluginToolContext } from "../types.js";
|
||||
|
||||
export type ManagedTaskFlowRecord = FlowRecord & {
|
||||
syncMode: "managed";
|
||||
controllerId: string;
|
||||
};
|
||||
|
||||
export type ManagedTaskFlowMutationErrorCode = "not_found" | "not_managed" | "revision_conflict";
|
||||
|
||||
export type ManagedTaskFlowMutationResult =
|
||||
| {
|
||||
applied: true;
|
||||
flow: ManagedTaskFlowRecord;
|
||||
}
|
||||
| {
|
||||
applied: false;
|
||||
code: ManagedTaskFlowMutationErrorCode;
|
||||
current?: FlowRecord;
|
||||
};
|
||||
|
||||
export type BoundTaskFlowTaskRunResult =
|
||||
| {
|
||||
created: true;
|
||||
flow: ManagedTaskFlowRecord;
|
||||
task: TaskRecord;
|
||||
}
|
||||
| {
|
||||
created: false;
|
||||
reason: string;
|
||||
found: boolean;
|
||||
flow?: FlowRecord;
|
||||
};
|
||||
|
||||
export type BoundTaskFlowCancelResult = Awaited<ReturnType<typeof cancelFlowByIdForOwner>>;
|
||||
|
||||
export type BoundTaskFlowRuntime = {
|
||||
readonly sessionKey: string;
|
||||
readonly requesterOrigin?: TaskDeliveryState["requesterOrigin"];
|
||||
createManaged: (params: {
|
||||
controllerId: string;
|
||||
goal: string;
|
||||
status?: ManagedTaskFlowRecord["status"];
|
||||
notifyPolicy?: TaskNotifyPolicy;
|
||||
currentStep?: string | null;
|
||||
stateJson?: JsonValue | null;
|
||||
waitJson?: JsonValue | null;
|
||||
cancelRequestedAt?: number | null;
|
||||
createdAt?: number;
|
||||
updatedAt?: number;
|
||||
endedAt?: number | null;
|
||||
}) => ManagedTaskFlowRecord;
|
||||
get: (flowId: string) => FlowRecord | undefined;
|
||||
list: () => FlowRecord[];
|
||||
findLatest: () => FlowRecord | undefined;
|
||||
resolve: (token: string) => FlowRecord | undefined;
|
||||
getTaskSummary: (flowId: string) => TaskRegistrySummary | undefined;
|
||||
setWaiting: (params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
currentStep?: string | null;
|
||||
stateJson?: JsonValue | null;
|
||||
waitJson?: JsonValue | null;
|
||||
blockedTaskId?: string | null;
|
||||
blockedSummary?: string | null;
|
||||
updatedAt?: number;
|
||||
}) => ManagedTaskFlowMutationResult;
|
||||
resume: (params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
status?: Extract<ManagedTaskFlowRecord["status"], "queued" | "running">;
|
||||
currentStep?: string | null;
|
||||
stateJson?: JsonValue | null;
|
||||
updatedAt?: number;
|
||||
}) => ManagedTaskFlowMutationResult;
|
||||
finish: (params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
stateJson?: JsonValue | null;
|
||||
updatedAt?: number;
|
||||
endedAt?: number;
|
||||
}) => ManagedTaskFlowMutationResult;
|
||||
fail: (params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
stateJson?: JsonValue | null;
|
||||
blockedTaskId?: string | null;
|
||||
blockedSummary?: string | null;
|
||||
updatedAt?: number;
|
||||
endedAt?: number;
|
||||
}) => ManagedTaskFlowMutationResult;
|
||||
requestCancel: (params: {
|
||||
flowId: string;
|
||||
expectedRevision: number;
|
||||
cancelRequestedAt?: number;
|
||||
}) => ManagedTaskFlowMutationResult;
|
||||
cancel: (params: { flowId: string; cfg: OpenClawConfig }) => Promise<BoundTaskFlowCancelResult>;
|
||||
runTask: (params: {
|
||||
flowId: string;
|
||||
runtime: TaskRuntime;
|
||||
sourceId?: string;
|
||||
childSessionKey?: string;
|
||||
parentTaskId?: string;
|
||||
agentId?: string;
|
||||
runId?: string;
|
||||
label?: string;
|
||||
task: string;
|
||||
preferMetadata?: boolean;
|
||||
notifyPolicy?: TaskNotifyPolicy;
|
||||
deliveryStatus?: TaskDeliveryStatus;
|
||||
status?: "queued" | "running";
|
||||
startedAt?: number;
|
||||
lastEventAt?: number;
|
||||
progressSummary?: string | null;
|
||||
}) => BoundTaskFlowTaskRunResult;
|
||||
};
|
||||
|
||||
export type PluginRuntimeTaskFlow = {
|
||||
bindSession: (params: {
|
||||
sessionKey: string;
|
||||
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
|
||||
}) => BoundTaskFlowRuntime;
|
||||
fromToolContext: (
|
||||
ctx: Pick<OpenClawPluginToolContext, "sessionKey" | "deliveryContext">,
|
||||
) => BoundTaskFlowRuntime;
|
||||
};
|
||||
|
||||
function assertSessionKey(sessionKey: string | undefined, errorMessage: string): string {
|
||||
const normalized = sessionKey?.trim();
|
||||
if (!normalized) {
|
||||
throw new Error(errorMessage);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function asManagedTaskFlowRecord(flow: FlowRecord | undefined): ManagedTaskFlowRecord | undefined {
|
||||
if (!flow || flow.syncMode !== "managed" || !flow.controllerId) {
|
||||
return undefined;
|
||||
}
|
||||
return flow as ManagedTaskFlowRecord;
|
||||
}
|
||||
|
||||
function resolveManagedFlowForOwner(params: {
|
||||
flowId: string;
|
||||
ownerKey: string;
|
||||
}):
|
||||
| { ok: true; flow: ManagedTaskFlowRecord }
|
||||
| { ok: false; code: "not_found" | "not_managed"; current?: FlowRecord } {
|
||||
const flow = getFlowByIdForOwner({
|
||||
flowId: params.flowId,
|
||||
callerOwnerKey: params.ownerKey,
|
||||
});
|
||||
if (!flow) {
|
||||
return { ok: false, code: "not_found" };
|
||||
}
|
||||
const managed = asManagedTaskFlowRecord(flow);
|
||||
if (!managed) {
|
||||
return { ok: false, code: "not_managed", current: flow };
|
||||
}
|
||||
return { ok: true, flow: managed };
|
||||
}
|
||||
|
||||
function mapFlowUpdateResult(result: FlowUpdateResult): ManagedTaskFlowMutationResult {
|
||||
if (result.applied) {
|
||||
const managed = asManagedTaskFlowRecord(result.flow);
|
||||
if (!managed) {
|
||||
return {
|
||||
applied: false,
|
||||
code: "not_managed",
|
||||
current: result.flow,
|
||||
};
|
||||
}
|
||||
return {
|
||||
applied: true,
|
||||
flow: managed,
|
||||
};
|
||||
}
|
||||
return {
|
||||
applied: false,
|
||||
code: result.reason,
|
||||
...(result.current ? { current: result.current } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function createBoundTaskFlowRuntime(params: {
|
||||
sessionKey: string;
|
||||
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
|
||||
}): BoundTaskFlowRuntime {
|
||||
const ownerKey = assertSessionKey(
|
||||
params.sessionKey,
|
||||
"TaskFlow runtime requires a bound sessionKey.",
|
||||
);
|
||||
const requesterOrigin = params.requesterOrigin
|
||||
? normalizeDeliveryContext(params.requesterOrigin)
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
sessionKey: ownerKey,
|
||||
...(requesterOrigin ? { requesterOrigin } : {}),
|
||||
createManaged: (input) =>
|
||||
createManagedFlow({
|
||||
ownerKey,
|
||||
controllerId: input.controllerId,
|
||||
requesterOrigin,
|
||||
status: input.status,
|
||||
notifyPolicy: input.notifyPolicy,
|
||||
goal: input.goal,
|
||||
currentStep: input.currentStep,
|
||||
stateJson: input.stateJson,
|
||||
waitJson: input.waitJson,
|
||||
cancelRequestedAt: input.cancelRequestedAt,
|
||||
createdAt: input.createdAt,
|
||||
updatedAt: input.updatedAt,
|
||||
endedAt: input.endedAt,
|
||||
}) as ManagedTaskFlowRecord,
|
||||
get: (flowId) =>
|
||||
getFlowByIdForOwner({
|
||||
flowId,
|
||||
callerOwnerKey: ownerKey,
|
||||
}),
|
||||
list: () =>
|
||||
listFlowsForOwner({
|
||||
callerOwnerKey: ownerKey,
|
||||
}),
|
||||
findLatest: () =>
|
||||
findLatestFlowForOwner({
|
||||
callerOwnerKey: ownerKey,
|
||||
}),
|
||||
resolve: (token) =>
|
||||
resolveFlowForLookupTokenForOwner({
|
||||
token,
|
||||
callerOwnerKey: ownerKey,
|
||||
}),
|
||||
getTaskSummary: (flowId) => {
|
||||
const flow = getFlowByIdForOwner({
|
||||
flowId,
|
||||
callerOwnerKey: ownerKey,
|
||||
});
|
||||
return flow ? getFlowTaskSummary(flow.flowId) : undefined;
|
||||
},
|
||||
setWaiting: (input) => {
|
||||
const flow = resolveManagedFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
ownerKey,
|
||||
});
|
||||
if (!flow.ok) {
|
||||
return {
|
||||
applied: false,
|
||||
code: flow.code,
|
||||
...(flow.current ? { current: flow.current } : {}),
|
||||
};
|
||||
}
|
||||
return mapFlowUpdateResult(
|
||||
setFlowWaiting({
|
||||
flowId: flow.flow.flowId,
|
||||
expectedRevision: input.expectedRevision,
|
||||
currentStep: input.currentStep,
|
||||
stateJson: input.stateJson,
|
||||
waitJson: input.waitJson,
|
||||
blockedTaskId: input.blockedTaskId,
|
||||
blockedSummary: input.blockedSummary,
|
||||
updatedAt: input.updatedAt,
|
||||
}),
|
||||
);
|
||||
},
|
||||
resume: (input) => {
|
||||
const flow = resolveManagedFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
ownerKey,
|
||||
});
|
||||
if (!flow.ok) {
|
||||
return {
|
||||
applied: false,
|
||||
code: flow.code,
|
||||
...(flow.current ? { current: flow.current } : {}),
|
||||
};
|
||||
}
|
||||
return mapFlowUpdateResult(
|
||||
resumeFlow({
|
||||
flowId: flow.flow.flowId,
|
||||
expectedRevision: input.expectedRevision,
|
||||
status: input.status,
|
||||
currentStep: input.currentStep,
|
||||
stateJson: input.stateJson,
|
||||
updatedAt: input.updatedAt,
|
||||
}),
|
||||
);
|
||||
},
|
||||
finish: (input) => {
|
||||
const flow = resolveManagedFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
ownerKey,
|
||||
});
|
||||
if (!flow.ok) {
|
||||
return {
|
||||
applied: false,
|
||||
code: flow.code,
|
||||
...(flow.current ? { current: flow.current } : {}),
|
||||
};
|
||||
}
|
||||
return mapFlowUpdateResult(
|
||||
finishFlow({
|
||||
flowId: flow.flow.flowId,
|
||||
expectedRevision: input.expectedRevision,
|
||||
stateJson: input.stateJson,
|
||||
updatedAt: input.updatedAt,
|
||||
endedAt: input.endedAt,
|
||||
}),
|
||||
);
|
||||
},
|
||||
fail: (input) => {
|
||||
const flow = resolveManagedFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
ownerKey,
|
||||
});
|
||||
if (!flow.ok) {
|
||||
return {
|
||||
applied: false,
|
||||
code: flow.code,
|
||||
...(flow.current ? { current: flow.current } : {}),
|
||||
};
|
||||
}
|
||||
return mapFlowUpdateResult(
|
||||
failFlow({
|
||||
flowId: flow.flow.flowId,
|
||||
expectedRevision: input.expectedRevision,
|
||||
stateJson: input.stateJson,
|
||||
blockedTaskId: input.blockedTaskId,
|
||||
blockedSummary: input.blockedSummary,
|
||||
updatedAt: input.updatedAt,
|
||||
endedAt: input.endedAt,
|
||||
}),
|
||||
);
|
||||
},
|
||||
requestCancel: (input) => {
|
||||
const flow = resolveManagedFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
ownerKey,
|
||||
});
|
||||
if (!flow.ok) {
|
||||
return {
|
||||
applied: false,
|
||||
code: flow.code,
|
||||
...(flow.current ? { current: flow.current } : {}),
|
||||
};
|
||||
}
|
||||
return mapFlowUpdateResult(
|
||||
requestFlowCancel({
|
||||
flowId: flow.flow.flowId,
|
||||
expectedRevision: input.expectedRevision,
|
||||
cancelRequestedAt: input.cancelRequestedAt,
|
||||
}),
|
||||
);
|
||||
},
|
||||
cancel: ({ flowId, cfg }) =>
|
||||
cancelFlowByIdForOwner({
|
||||
cfg,
|
||||
flowId,
|
||||
callerOwnerKey: ownerKey,
|
||||
}),
|
||||
runTask: (input) => {
|
||||
const created = runTaskInFlowForOwner({
|
||||
flowId: input.flowId,
|
||||
callerOwnerKey: ownerKey,
|
||||
runtime: input.runtime,
|
||||
sourceId: input.sourceId,
|
||||
childSessionKey: input.childSessionKey,
|
||||
parentTaskId: input.parentTaskId,
|
||||
agentId: input.agentId,
|
||||
runId: input.runId,
|
||||
label: input.label,
|
||||
task: input.task,
|
||||
preferMetadata: input.preferMetadata,
|
||||
notifyPolicy: input.notifyPolicy,
|
||||
deliveryStatus: input.deliveryStatus,
|
||||
status: input.status,
|
||||
startedAt: input.startedAt,
|
||||
lastEventAt: input.lastEventAt,
|
||||
progressSummary: input.progressSummary,
|
||||
});
|
||||
if (!created.created) {
|
||||
return {
|
||||
created: false,
|
||||
found: created.found,
|
||||
reason: created.reason ?? "Task was not created.",
|
||||
...(created.flow ? { flow: created.flow } : {}),
|
||||
};
|
||||
}
|
||||
const managed = asManagedTaskFlowRecord(created.flow);
|
||||
if (!managed) {
|
||||
return {
|
||||
created: false,
|
||||
found: true,
|
||||
reason: "TaskFlow does not accept managed child tasks.",
|
||||
flow: created.flow,
|
||||
};
|
||||
}
|
||||
if (!created.task) {
|
||||
return {
|
||||
created: false,
|
||||
found: true,
|
||||
reason: "Task was not created.",
|
||||
flow: created.flow,
|
||||
};
|
||||
}
|
||||
return {
|
||||
created: true,
|
||||
flow: managed,
|
||||
task: created.task,
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function createRuntimeTaskFlow(): PluginRuntimeTaskFlow {
|
||||
return {
|
||||
bindSession: (params) =>
|
||||
createBoundTaskFlowRuntime({
|
||||
sessionKey: params.sessionKey,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
}),
|
||||
fromToolContext: (ctx) =>
|
||||
createBoundTaskFlowRuntime({
|
||||
sessionKey: assertSessionKey(
|
||||
ctx.sessionKey,
|
||||
"TaskFlow runtime requires tool context with a sessionKey.",
|
||||
),
|
||||
requesterOrigin: ctx.deliveryContext,
|
||||
}),
|
||||
};
|
||||
}
|
||||
@@ -103,6 +103,7 @@ export type PluginRuntimeCore = {
|
||||
state: {
|
||||
resolveStateDir: typeof import("../../config/paths.js").resolveStateDir;
|
||||
};
|
||||
taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
|
||||
modelAuth: {
|
||||
/** Resolve auth for a model. Only provider/model and optional cfg are used. */
|
||||
getApiKeyForModel: (params: {
|
||||
|
||||
@@ -4,6 +4,8 @@ export {
|
||||
createManagedFlow,
|
||||
deleteFlowRecordById,
|
||||
findLatestFlowForOwnerKey,
|
||||
failFlow,
|
||||
finishFlow,
|
||||
getFlowById,
|
||||
listFlowRecords,
|
||||
listFlowsForOwnerKey,
|
||||
@@ -15,3 +17,5 @@ export {
|
||||
syncFlowFromTask,
|
||||
updateFlowRecordByIdExpectedRevision,
|
||||
} from "./flow-registry.js";
|
||||
|
||||
export type { FlowUpdateResult } from "./flow-registry.js";
|
||||
|
||||
@@ -324,6 +324,40 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
|
||||
state: {
|
||||
resolveStateDir: vi.fn(() => "/tmp/openclaw"),
|
||||
},
|
||||
taskFlow: {
|
||||
bindSession: vi.fn(() => ({
|
||||
sessionKey: "agent:main:main",
|
||||
createManaged: vi.fn(),
|
||||
get: vi.fn(),
|
||||
list: vi.fn(() => []),
|
||||
findLatest: vi.fn(),
|
||||
resolve: vi.fn(),
|
||||
getTaskSummary: vi.fn(),
|
||||
setWaiting: vi.fn(),
|
||||
resume: vi.fn(),
|
||||
finish: vi.fn(),
|
||||
fail: vi.fn(),
|
||||
requestCancel: vi.fn(),
|
||||
cancel: vi.fn(),
|
||||
runTask: vi.fn(),
|
||||
})) as unknown as PluginRuntime["taskFlow"]["bindSession"],
|
||||
fromToolContext: vi.fn(() => ({
|
||||
sessionKey: "agent:main:main",
|
||||
createManaged: vi.fn(),
|
||||
get: vi.fn(),
|
||||
list: vi.fn(() => []),
|
||||
findLatest: vi.fn(),
|
||||
resolve: vi.fn(),
|
||||
getTaskSummary: vi.fn(),
|
||||
setWaiting: vi.fn(),
|
||||
resume: vi.fn(),
|
||||
finish: vi.fn(),
|
||||
fail: vi.fn(),
|
||||
requestCancel: vi.fn(),
|
||||
cancel: vi.fn(),
|
||||
runTask: vi.fn(),
|
||||
})) as unknown as PluginRuntime["taskFlow"]["fromToolContext"],
|
||||
},
|
||||
modelAuth: {
|
||||
getApiKeyForModel: vi.fn() as unknown as PluginRuntime["modelAuth"]["getApiKeyForModel"],
|
||||
resolveApiKeyForProvider:
|
||||
|
||||
Reference in New Issue
Block a user