Compare commits

...

11 Commits

Author SHA1 Message Date
Mariano Belinky
91ae967fae plugins: harden webhook taskflow API responses 2026-04-04 12:31:12 +02:00
Mariano Belinky
8f095d8f5f plugins: add bundled webhooks TaskFlow bridge 2026-04-04 12:18:13 +02:00
Mariano Belinky
b4649f3238 Tasks: route TaskFlow SDK through approved flow seam 2026-04-02 13:09:47 +02:00
Mariano Belinky
15c2ed0d03 Plugins: fix TaskFlow runtime gate regressions 2026-04-02 13:09:47 +02:00
Mariano Belinky
2032c14aab Changelog: move TaskFlow runtime entry to unreleased 2026-04-02 13:09:47 +02:00
Mariano Belinky
92cbe65237 Changelog: add TaskFlow runtime SDK entry 2026-04-02 13:09:47 +02:00
Mariano Belinky
67f64f9b7d TaskFlow: drop old runtime flow files 2026-04-02 13:09:46 +02:00
Mariano Belinky
d5ff7bdf30 TaskFlow: rename plugin runtime seam 2026-04-02 13:09:46 +02:00
Mariano Belinky
21678f0149 Plugins: normalize bound flow task results 2026-04-02 13:09:46 +02:00
Mariano Belinky
5375586a28 Tasks: export managed flow runtime helpers 2026-04-02 13:09:46 +02:00
Mariano Belinky
7ea3f13950 Plugins: add bound flow runtime 2026-04-02 13:09:46 +02:00
20 changed files with 2213 additions and 0 deletions

4
.github/labeler.yml vendored
View File

@@ -226,6 +226,10 @@
- changed-files:
- any-glob-to-any-file:
- "extensions/open-prose/**"
"extensions: webhooks":
- changed-files:
- any-glob-to-any-file:
- "extensions/webhooks/**"
"extensions: device-pair":
- changed-files:
- any-glob-to-any-file:

View File

@@ -17,6 +17,8 @@ Docs: https://docs.openclaw.ai
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky.
- Tasks/TaskFlow: add managed child task spawning plus sticky cancel intent, so external orchestrators can stop scheduling immediately and let parent TaskFlows settle to `cancelled` once active child tasks finish. (#59610) Thanks @mbelinky.
- Plugins/TaskFlow: add a bound `api.runtime.taskFlow` seam so plugins and trusted authoring layers can create and drive managed TaskFlows from host-resolved OpenClaw context without passing owner identifiers on each call. (#59622) Thanks @mbelinky.
- Plugins/webhooks: add a bundled webhook ingress plugin so external automation can create and drive bound TaskFlows through per-route shared-secret endpoints. Thanks @mbelinky.
### Fixes

View File

@@ -115,6 +115,40 @@ await api.runtime.subagent.deleteSession({
Untrusted plugins can still run subagents, but override requests are rejected.
</Warning>
### `api.runtime.taskFlow`
Bind a TaskFlow runtime to an existing OpenClaw session key or trusted tool
context, then create and manage TaskFlows without passing an owner on every call.
```typescript
const taskFlow = api.runtime.taskFlow.fromToolContext(ctx);
const created = taskFlow.createManaged({
controllerId: "my-plugin/review-batch",
goal: "Review new pull requests",
});
const child = taskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:reviewer",
task: "Review PR #123",
status: "running",
startedAt: Date.now(),
});
const waiting = taskFlow.setWaiting({
flowId: created.flowId,
expectedRevision: created.revision,
currentStep: "await-human-reply",
waitJson: { kind: "reply", channel: "telegram" },
});
```
Use `bindSession({ sessionKey, requesterOrigin })` when you already have a
trusted OpenClaw session key from your own binding layer. Do not bind from raw
user input.
### `api.runtime.tts`
Text-to-speech synthesis.

View File

@@ -0,0 +1,6 @@
export {
definePluginEntry,
type OpenClawPluginApi,
type PluginLogger,
type PluginRuntime,
} from "openclaw/plugin-sdk/core";

View File

@@ -0,0 +1,50 @@
import { definePluginEntry, type OpenClawPluginApi } from "./api.js";
import { resolveWebhooksPluginConfig } from "./src/config.js";
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./src/http.js";
export default definePluginEntry({
id: "webhooks",
name: "Webhooks",
description:
"Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
async register(api: OpenClawPluginApi) {
const routes = await resolveWebhooksPluginConfig({
pluginConfig: api.pluginConfig,
cfg: api.config,
env: process.env,
logger: api.logger,
});
if (routes.length === 0) {
return;
}
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>();
const handler = createTaskFlowWebhookRequestHandler({
cfg: api.config,
targetsByPath,
});
for (const route of routes) {
const taskFlow = api.runtime.taskFlow.bindSession({
sessionKey: route.sessionKey,
});
const target: TaskFlowWebhookTarget = {
routeId: route.routeId,
path: route.path,
secret: route.secret,
defaultControllerId: route.controllerId,
taskFlow,
};
targetsByPath.set(target.path, [...(targetsByPath.get(target.path) ?? []), target]);
api.registerHttpRoute({
path: target.path,
auth: "plugin",
match: "exact",
handler,
});
api.logger.info?.(
`[webhooks] registered route ${route.routeId} on ${route.path} for session ${route.sessionKey}`,
);
}
},
});

View File

@@ -0,0 +1,48 @@
{
"id": "webhooks",
"name": "Webhooks",
"description": "Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
"configSchema": {
"type": "object",
"additionalProperties": false,
"$defs": {
"secretRef": {
"type": "object",
"additionalProperties": false,
"properties": {
"source": {
"type": "string",
"enum": ["env", "file", "exec"]
},
"provider": { "type": "string" },
"id": { "type": "string" }
},
"required": ["source", "provider", "id"]
},
"secretInput": {
"anyOf": [{ "type": "string", "minLength": 1 }, { "$ref": "#/$defs/secretRef" }]
},
"route": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": { "type": "boolean" },
"path": { "type": "string", "minLength": 1 },
"sessionKey": { "type": "string", "minLength": 1 },
"secret": { "$ref": "#/$defs/secretInput" },
"controllerId": { "type": "string", "minLength": 1 },
"description": { "type": "string" }
},
"required": ["sessionKey", "secret"]
}
},
"properties": {
"routes": {
"type": "object",
"additionalProperties": {
"$ref": "#/$defs/route"
}
}
}
}
}

View File

@@ -0,0 +1,18 @@
{
"name": "@openclaw/webhooks",
"version": "2026.4.1-beta.1",
"private": true,
"description": "OpenClaw webhook bridge plugin",
"type": "module",
"dependencies": {
"zod": "^4.3.6"
},
"openclaw": {
"bundle": {
"stageRuntimeDependencies": true
},
"extensions": [
"./index.ts"
]
}
}

View File

@@ -0,0 +1,16 @@
export {
createFixedWindowRateLimiter,
createWebhookInFlightLimiter,
normalizeWebhookPath,
readJsonWebhookBodyOrReject,
resolveRequestClientIp,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
WEBHOOK_IN_FLIGHT_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
type WebhookInFlightLimiter,
} from "openclaw/plugin-sdk/webhook-ingress";
export {
resolveConfiguredSecretInputString,
type OpenClawConfig,
} from "openclaw/plugin-sdk/config-runtime";

View File

@@ -0,0 +1,86 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../runtime-api.js";
import { resolveWebhooksPluginConfig } from "./config.js";
describe("resolveWebhooksPluginConfig", () => {
it("resolves default paths and SecretRef-backed secrets", async () => {
const routes = await resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
zapier: {
sessionKey: "agent:main:main",
secret: {
source: "env",
provider: "default",
id: "OPENCLAW_WEBHOOK_SECRET",
},
},
},
},
cfg: {} as OpenClawConfig,
env: {
OPENCLAW_WEBHOOK_SECRET: "shared-secret",
},
});
expect(routes).toEqual([
{
routeId: "zapier",
path: "/plugins/webhooks/zapier",
sessionKey: "agent:main:main",
secret: "shared-secret",
controllerId: "webhooks/zapier",
},
]);
});
it("skips routes whose secret cannot be resolved", async () => {
const warn = vi.fn();
const routes = await resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
missing: {
sessionKey: "agent:main:main",
secret: {
source: "env",
provider: "default",
id: "MISSING_SECRET",
},
},
},
},
cfg: {} as OpenClawConfig,
env: {},
logger: { warn } as never,
});
expect(routes).toEqual([]);
expect(warn).toHaveBeenCalledWith(
expect.stringContaining("[webhooks] skipping route missing:"),
);
});
it("rejects duplicate normalized paths", async () => {
await expect(
resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
first: {
path: "/plugins/webhooks/shared",
sessionKey: "agent:main:main",
secret: "a",
},
second: {
path: "/plugins/webhooks/shared/",
sessionKey: "agent:main:other",
secret: "b",
},
},
},
cfg: {} as OpenClawConfig,
env: {},
}),
).rejects.toThrow(/conflicts with routes\.first\.path/i);
});
});

View File

@@ -0,0 +1,95 @@
import { z } from "zod";
import type { PluginLogger } from "../api.js";
import {
normalizeWebhookPath,
resolveConfiguredSecretInputString,
type OpenClawConfig,
} from "../runtime-api.js";
const secretRefSchema = z
.object({
source: z.enum(["env", "file", "exec"]),
provider: z.string().trim().min(1),
id: z.string().trim().min(1),
})
.strict();
const secretInputSchema = z.union([z.string().trim().min(1), secretRefSchema]);
const webhookRouteConfigSchema = z
.object({
enabled: z.boolean().optional().default(true),
path: z.string().trim().min(1).optional(),
sessionKey: z.string().trim().min(1),
secret: secretInputSchema,
controllerId: z.string().trim().min(1).optional(),
description: z.string().trim().min(1).optional(),
})
.strict();
const webhooksPluginConfigSchema = z
.object({
routes: z.record(z.string().trim().min(1), webhookRouteConfigSchema).default({}),
})
.strict();
export type ResolvedWebhookRouteConfig = {
routeId: string;
path: string;
sessionKey: string;
secret: string;
controllerId: string;
description?: string;
};
export async function resolveWebhooksPluginConfig(params: {
pluginConfig: unknown;
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
logger?: PluginLogger;
}): Promise<ResolvedWebhookRouteConfig[]> {
const parsed = webhooksPluginConfigSchema.parse(params.pluginConfig ?? {});
const resolvedRoutes: ResolvedWebhookRouteConfig[] = [];
const seenPaths = new Map<string, string>();
for (const [routeId, route] of Object.entries(parsed.routes)) {
if (route.enabled === false) {
continue;
}
const path = normalizeWebhookPath(route.path ?? `/plugins/webhooks/${routeId}`);
const existingRouteId = seenPaths.get(path);
if (existingRouteId) {
throw new Error(
`webhooks.routes.${routeId}.path conflicts with routes.${existingRouteId}.path (${path}).`,
);
}
const secretResolution = await resolveConfiguredSecretInputString({
config: params.cfg,
env: params.env,
value: route.secret,
path: `plugins.entries.webhooks.routes.${routeId}.secret`,
});
const secret = secretResolution.value?.trim();
if (!secret) {
params.logger?.warn?.(
`[webhooks] skipping route ${routeId}: ${
secretResolution.unresolvedRefReason ?? "secret is empty or unresolved"
}`,
);
continue;
}
seenPaths.set(path, routeId);
resolvedRoutes.push({
routeId,
path,
sessionKey: route.sessionKey,
secret,
controllerId: route.controllerId ?? `webhooks/${routeId}`,
...(route.description ? { description: route.description } : {}),
});
}
return resolvedRoutes;
}

View File

@@ -0,0 +1,375 @@
import { EventEmitter } from "node:events";
import type { IncomingMessage } from "node:http";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createRuntimeTaskFlow } from "../../../src/plugins/runtime/runtime-taskflow.js";
import { createMockServerResponse } from "../../../src/test-utils/mock-http-response.js";
import type { OpenClawConfig } from "../runtime-api.js";
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./http.js";
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("../../../src/tasks/task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../../../src/acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../../../src/agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
type MockIncomingMessage = IncomingMessage & {
destroyed?: boolean;
destroy: () => MockIncomingMessage;
socket: { remoteAddress: string };
};
let nextSessionId = 0;
function createJsonRequest(params: {
path: string;
secret?: string;
body: unknown;
}): MockIncomingMessage {
const req = new EventEmitter() as MockIncomingMessage;
req.method = "POST";
req.url = params.path;
req.headers = {
"content-type": "application/json",
...(params.secret ? { "x-openclaw-webhook-secret": params.secret } : {}),
};
req.socket = { remoteAddress: "127.0.0.1" } as MockIncomingMessage["socket"];
req.destroyed = false;
req.destroy = (() => {
req.destroyed = true;
return req;
}) as MockIncomingMessage["destroy"];
void Promise.resolve().then(() => {
req.emit("data", Buffer.from(JSON.stringify(params.body), "utf8"));
req.emit("end");
});
return req;
}
function createHandler(): {
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
target: TaskFlowWebhookTarget;
} {
const runtime = createRuntimeTaskFlow();
nextSessionId += 1;
const target: TaskFlowWebhookTarget = {
routeId: "zapier",
path: "/plugins/webhooks/zapier",
secret: "shared-secret",
defaultControllerId: "webhooks/zapier",
taskFlow: runtime.bindSession({
sessionKey: `agent:main:webhook-test-${String(nextSessionId)}`,
}),
};
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>([[target.path, [target]]]);
return {
handler: createTaskFlowWebhookRequestHandler({
cfg: {} as OpenClawConfig,
targetsByPath,
}),
target,
};
}
async function dispatchJsonRequest(params: {
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
path: string;
secret?: string;
body: unknown;
}) {
const req = createJsonRequest({
path: params.path,
secret: params.secret,
body: params.body,
});
const res = createMockServerResponse();
await params.handler(req, res);
return res;
}
function parseJsonBody(res: { body?: string | Buffer | null }) {
return JSON.parse(String(res.body ?? ""));
}
afterEach(() => {
vi.clearAllMocks();
});
describe("createTaskFlowWebhookRequestHandler", () => {
it("rejects requests with the wrong secret", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: "wrong-secret",
body: {
action: "list_flows",
},
});
expect(res.statusCode).toBe(401);
expect(res.body).toBe("unauthorized");
expect(target.taskFlow.list()).toEqual([]);
});
it("creates flows through the bound session and scrubs owner metadata from responses", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "create_flow",
goal: "Review inbound queue",
},
});
expect(res.statusCode).toBe(200);
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.flow).toMatchObject({
syncMode: "managed",
controllerId: "webhooks/zapier",
goal: "Review inbound queue",
});
expect(parsed.result.flow.ownerKey).toBeUndefined();
expect(parsed.result.flow.requesterOrigin).toBeUndefined();
expect(target.taskFlow.get(parsed.result.flow.flowId)?.flowId).toBe(parsed.result.flow.flowId);
});
it("runs child tasks and scrubs task ownership fields from responses", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
task: "Inspect the next message batch",
status: "running",
startedAt: 10,
lastEventAt: 10,
},
});
expect(res.statusCode).toBe(200);
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.created).toBe(true);
expect(parsed.result.task).toMatchObject({
parentFlowId: flow.flowId,
childSessionKey: "agent:main:subagent:child",
runtime: "acp",
});
expect(parsed.result.task.ownerKey).toBeUndefined();
expect(parsed.result.task.requesterSessionKey).toBeUndefined();
});
it("returns 404 for missing flow mutations", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: "flow-missing",
expectedRevision: 0,
},
});
expect(res.statusCode).toBe(404);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "not_found",
error: "TaskFlow not found.",
result: {
applied: false,
code: "not_found",
},
});
});
it("returns 409 for revision conflicts", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: flow.flowId,
expectedRevision: flow.revision + 1,
},
});
expect(res.statusCode).toBe(409);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "revision_conflict",
result: {
applied: false,
code: "revision_conflict",
current: {
flowId: flow.flowId,
revision: flow.revision,
},
},
});
});
it("rejects internal runtimes and running-only metadata from external callers", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const runtimeRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "cli",
task: "Inspect queue",
},
});
expect(runtimeRes.statusCode).toBe(400);
expect(parseJsonBody(runtimeRes)).toMatchObject({
ok: false,
code: "invalid_request",
});
const queuedMetadataRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
task: "Inspect queue",
startedAt: 10,
},
});
expect(queuedMetadataRes.statusCode).toBe(400);
expect(parseJsonBody(queuedMetadataRes)).toMatchObject({
ok: false,
code: "invalid_request",
error:
"status: status must be running when startedAt, lastEventAt, or progressSummary is provided",
});
});
it("reuses the same task record when retried with the same runId", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const first = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
const second = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
expect(first.statusCode).toBe(200);
expect(second.statusCode).toBe(200);
const firstParsed = parseJsonBody(first);
const secondParsed = parseJsonBody(second);
expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId);
expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1);
});
it("returns 409 when cancellation targets a terminal flow", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const finished = target.taskFlow.finish({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
expect(finished.applied).toBe(true);
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "cancel_flow",
flowId: flow.flowId,
},
});
expect(res.statusCode).toBe(409);
expect(parseJsonBody(res)).toMatchObject({
ok: false,
code: "terminal",
error: "Flow is already succeeded.",
result: {
found: true,
cancelled: false,
reason: "Flow is already succeeded.",
},
});
});
});

View File

@@ -0,0 +1,805 @@
import { timingSafeEqual } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import { z } from "zod";
import type { PluginRuntime } from "../api.js";
import {
createFixedWindowRateLimiter,
createWebhookInFlightLimiter,
readJsonWebhookBodyOrReject,
resolveRequestClientIp,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
WEBHOOK_IN_FLIGHT_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
type OpenClawConfig,
type WebhookInFlightLimiter,
} from "../runtime-api.js";
type BoundTaskFlowRuntime = ReturnType<PluginRuntime["taskFlow"]["bindSession"]>;
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };
const jsonValueSchema: z.ZodType<JsonValue> = z.lazy(() =>
z.union([
z.null(),
z.boolean(),
z.number().finite(),
z.string(),
z.array(jsonValueSchema),
z.record(z.string(), jsonValueSchema),
]),
);
const nullableStringSchema = z.string().trim().min(1).nullable().optional();
const createFlowRequestSchema = z
.object({
action: z.literal("create_flow"),
controllerId: z.string().trim().min(1).optional(),
goal: z.string().trim().min(1),
status: z.enum(["queued", "running", "waiting", "blocked"]).optional(),
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
waitJson: jsonValueSchema.nullable().optional(),
})
.strict();
const getFlowRequestSchema = z
.object({ action: z.literal("get_flow"), flowId: z.string().trim().min(1) })
.strict();
const listFlowsRequestSchema = z.object({ action: z.literal("list_flows") }).strict();
const findLatestFlowRequestSchema = z.object({ action: z.literal("find_latest_flow") }).strict();
const resolveFlowRequestSchema = z
.object({ action: z.literal("resolve_flow"), token: z.string().trim().min(1) })
.strict();
const getTaskSummaryRequestSchema = z
.object({ action: z.literal("get_task_summary"), flowId: z.string().trim().min(1) })
.strict();
const setWaitingRequestSchema = z
.object({
action: z.literal("set_waiting"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
waitJson: jsonValueSchema.nullable().optional(),
blockedTaskId: nullableStringSchema,
blockedSummary: nullableStringSchema,
})
.strict();
const resumeFlowRequestSchema = z
.object({
action: z.literal("resume_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
status: z.enum(["queued", "running"]).optional(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
})
.strict();
const finishFlowRequestSchema = z
.object({
action: z.literal("finish_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
stateJson: jsonValueSchema.nullable().optional(),
})
.strict();
const failFlowRequestSchema = z
.object({
action: z.literal("fail_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
stateJson: jsonValueSchema.nullable().optional(),
blockedTaskId: nullableStringSchema,
blockedSummary: nullableStringSchema,
})
.strict();
const requestCancelRequestSchema = z
.object({
action: z.literal("request_cancel"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
})
.strict();
const cancelFlowRequestSchema = z
.object({
action: z.literal("cancel_flow"),
flowId: z.string().trim().min(1),
})
.strict();
const runTaskRequestSchema = z
.object({
action: z.literal("run_task"),
flowId: z.string().trim().min(1),
runtime: z.enum(["subagent", "acp"]),
sourceId: z.string().trim().min(1).optional(),
childSessionKey: z.string().trim().min(1).optional(),
parentTaskId: z.string().trim().min(1).optional(),
agentId: z.string().trim().min(1).optional(),
runId: z.string().trim().min(1).optional(),
label: z.string().trim().min(1).optional(),
task: z.string().trim().min(1),
preferMetadata: z.boolean().optional(),
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
status: z.enum(["queued", "running"]).optional(),
startedAt: z.number().int().nonnegative().optional(),
lastEventAt: z.number().int().nonnegative().optional(),
progressSummary: nullableStringSchema,
})
.strict()
.superRefine((value, ctx) => {
if (
value.status !== "running" &&
(value.startedAt !== undefined ||
value.lastEventAt !== undefined ||
value.progressSummary !== undefined)
) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message:
"status must be running when startedAt, lastEventAt, or progressSummary is provided",
path: ["status"],
});
}
});
const webhookActionSchema = z.discriminatedUnion("action", [
createFlowRequestSchema,
getFlowRequestSchema,
listFlowsRequestSchema,
findLatestFlowRequestSchema,
resolveFlowRequestSchema,
getTaskSummaryRequestSchema,
setWaitingRequestSchema,
resumeFlowRequestSchema,
finishFlowRequestSchema,
failFlowRequestSchema,
requestCancelRequestSchema,
cancelFlowRequestSchema,
runTaskRequestSchema,
]);
type WebhookAction = z.infer<typeof webhookActionSchema>;
export type TaskFlowWebhookTarget = {
routeId: string;
path: string;
secret: string;
defaultControllerId: string;
taskFlow: BoundTaskFlowRuntime;
};
type FlowView = {
flowId: string;
syncMode: "task_mirrored" | "managed";
controllerId?: string;
revision: number;
status: string;
notifyPolicy: string;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
stateJson?: JsonValue;
waitJson?: JsonValue;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
};
type TaskView = {
taskId: string;
runtime: string;
sourceId?: string;
scopeKind: string;
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
status: string;
deliveryStatus: string;
notifyPolicy: string;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: string;
};
function toFlowView(flow: {
flowId: string;
syncMode: "task_mirrored" | "managed";
controllerId?: string;
revision: number;
status: string;
notifyPolicy: string;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
stateJson?: JsonValue;
waitJson?: JsonValue;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
}): FlowView {
return {
flowId: flow.flowId,
syncMode: flow.syncMode,
...(flow.controllerId ? { controllerId: flow.controllerId } : {}),
revision: flow.revision,
status: flow.status,
notifyPolicy: flow.notifyPolicy,
goal: flow.goal,
...(flow.currentStep ? { currentStep: flow.currentStep } : {}),
...(flow.blockedTaskId ? { blockedTaskId: flow.blockedTaskId } : {}),
...(flow.blockedSummary ? { blockedSummary: flow.blockedSummary } : {}),
...(flow.stateJson !== undefined ? { stateJson: flow.stateJson } : {}),
...(flow.waitJson !== undefined ? { waitJson: flow.waitJson } : {}),
...(flow.cancelRequestedAt !== undefined ? { cancelRequestedAt: flow.cancelRequestedAt } : {}),
createdAt: flow.createdAt,
updatedAt: flow.updatedAt,
...(flow.endedAt !== undefined ? { endedAt: flow.endedAt } : {}),
};
}
function toTaskView(task: {
taskId: string;
runtime: string;
sourceId?: string;
scopeKind: string;
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
status: string;
deliveryStatus: string;
notifyPolicy: string;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: string;
}): TaskView {
return {
taskId: task.taskId,
runtime: task.runtime,
...(task.sourceId ? { sourceId: task.sourceId } : {}),
scopeKind: task.scopeKind,
...(task.childSessionKey ? { childSessionKey: task.childSessionKey } : {}),
...(task.parentFlowId ? { parentFlowId: task.parentFlowId } : {}),
...(task.parentTaskId ? { parentTaskId: task.parentTaskId } : {}),
...(task.agentId ? { agentId: task.agentId } : {}),
...(task.runId ? { runId: task.runId } : {}),
...(task.label ? { label: task.label } : {}),
task: task.task,
status: task.status,
deliveryStatus: task.deliveryStatus,
notifyPolicy: task.notifyPolicy,
createdAt: task.createdAt,
...(task.startedAt !== undefined ? { startedAt: task.startedAt } : {}),
...(task.endedAt !== undefined ? { endedAt: task.endedAt } : {}),
...(task.lastEventAt !== undefined ? { lastEventAt: task.lastEventAt } : {}),
...(task.cleanupAfter !== undefined ? { cleanupAfter: task.cleanupAfter } : {}),
...(task.error ? { error: task.error } : {}),
...(task.progressSummary ? { progressSummary: task.progressSummary } : {}),
...(task.terminalSummary ? { terminalSummary: task.terminalSummary } : {}),
...(task.terminalOutcome ? { terminalOutcome: task.terminalOutcome } : {}),
};
}
function writeJson(res: ServerResponse, statusCode: number, body: unknown): void {
res.statusCode = statusCode;
res.setHeader("Content-Type", "application/json; charset=utf-8");
res.end(JSON.stringify(body));
}
function extractSharedSecret(req: IncomingMessage): string {
const authHeader = Array.isArray(req.headers.authorization)
? String(req.headers.authorization[0] ?? "")
: String(req.headers.authorization ?? "");
if (authHeader.toLowerCase().startsWith("bearer ")) {
return authHeader.slice("bearer ".length).trim();
}
const sharedHeader = req.headers["x-openclaw-webhook-secret"];
return Array.isArray(sharedHeader)
? String(sharedHeader[0] ?? "").trim()
: String(sharedHeader ?? "").trim();
}
function timingSafeEquals(left: string, right: string): boolean {
const leftBuffer = Buffer.from(left);
const rightBuffer = Buffer.from(right);
if (leftBuffer.length !== rightBuffer.length) {
const paddedLength = Math.max(1, leftBuffer.length, rightBuffer.length);
const paddedLeft = Buffer.alloc(paddedLength);
const paddedRight = Buffer.alloc(paddedLength);
leftBuffer.copy(paddedLeft);
rightBuffer.copy(paddedRight);
timingSafeEqual(paddedLeft, paddedRight);
return false;
}
return timingSafeEqual(leftBuffer, rightBuffer);
}
function formatZodError(error: z.ZodError): string {
const firstIssue = error.issues[0];
if (!firstIssue) {
return "invalid request";
}
const path = firstIssue.path.length > 0 ? `${firstIssue.path.join(".")}: ` : "";
return `${path}${firstIssue.message}`;
}
function mapMutationResult(
result:
| {
applied: true;
flow: FlowView;
}
| {
applied: false;
code: string;
current?: FlowView;
},
): unknown {
return result;
}
function mapMutationStatus(result: {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
}): { statusCode: number; code?: string; error?: string } {
if (result.applied) {
return { statusCode: 200 };
}
switch (result.code) {
case "not_found":
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
case "not_managed":
return {
statusCode: 409,
code: "not_managed",
error: "TaskFlow is not managed by this webhook surface.",
};
case "revision_conflict":
return {
statusCode: 409,
code: "revision_conflict",
error: "TaskFlow changed since the caller's expected revision.",
};
default:
return {
statusCode: 409,
code: "mutation_rejected",
error: "TaskFlow mutation was rejected.",
};
}
}
function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.created) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "Flow cancellation has already been requested.") {
return {
statusCode: 409,
code: "cancel_requested",
error: result.reason,
};
}
if (result.reason === "Flow does not accept managed child tasks.") {
return {
statusCode: 409,
code: "not_managed",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "task_not_created",
error: result.reason ?? "TaskFlow task was not created.",
};
}
function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.cancelled) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "One or more child tasks are still active.") {
return {
statusCode: 202,
code: "cancel_pending",
error: result.reason,
};
}
if (result.reason === "Flow changed while cancellation was in progress.") {
return {
statusCode: 409,
code: "revision_conflict",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "cancel_rejected",
error: result.reason ?? "TaskFlow cancellation was rejected.",
};
}
function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): {
statusCode: number;
code?: string;
error?: string;
} {
switch (params.action.action) {
case "set_waiting":
case "resume_flow":
case "finish_flow":
case "fail_flow":
case "request_cancel":
return mapMutationStatus(
params.result as {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
},
);
case "cancel_flow":
return mapCancelStatus(
params.result as {
found: boolean;
cancelled: boolean;
reason?: string;
},
);
case "run_task":
return mapRunTaskStatus(
params.result as {
created: boolean;
found: boolean;
reason?: string;
},
);
default:
return { statusCode: 200 };
}
}
async function executeWebhookAction(params: {
action: WebhookAction;
target: TaskFlowWebhookTarget;
cfg: OpenClawConfig;
}): Promise<unknown> {
const { action, target } = params;
switch (action.action) {
case "create_flow": {
const flow = target.taskFlow.createManaged({
controllerId: action.controllerId ?? target.defaultControllerId,
goal: action.goal,
status: action.status,
notifyPolicy: action.notifyPolicy,
currentStep: action.currentStep ?? undefined,
stateJson: action.stateJson,
waitJson: action.waitJson,
});
return { flow: toFlowView(flow) };
}
case "get_flow": {
const flow = target.taskFlow.get(action.flowId);
return { flow: flow ? toFlowView(flow) : null };
}
case "list_flows":
return { flows: target.taskFlow.list().map(toFlowView) };
case "find_latest_flow": {
const flow = target.taskFlow.findLatest();
return { flow: flow ? toFlowView(flow) : null };
}
case "resolve_flow": {
const flow = target.taskFlow.resolve(action.token);
return { flow: flow ? toFlowView(flow) : null };
}
case "get_task_summary":
return { summary: target.taskFlow.getTaskSummary(action.flowId) ?? null };
case "set_waiting": {
const result = target.taskFlow.setWaiting({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
currentStep: action.currentStep,
stateJson: action.stateJson,
waitJson: action.waitJson,
blockedTaskId: action.blockedTaskId,
blockedSummary: action.blockedSummary,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "resume_flow": {
const result = target.taskFlow.resume({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
status: action.status,
currentStep: action.currentStep,
stateJson: action.stateJson,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "finish_flow": {
const result = target.taskFlow.finish({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
stateJson: action.stateJson,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "fail_flow": {
const result = target.taskFlow.fail({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
stateJson: action.stateJson,
blockedTaskId: action.blockedTaskId,
blockedSummary: action.blockedSummary,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "request_cancel": {
const result = target.taskFlow.requestCancel({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "cancel_flow": {
const result = await target.taskFlow.cancel({
flowId: action.flowId,
cfg: params.cfg,
});
return {
found: result.found,
cancelled: result.cancelled,
...(result.reason ? { reason: result.reason } : {}),
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
...(result.tasks ? { tasks: result.tasks.map(toTaskView) } : {}),
};
}
case "run_task": {
const result = target.taskFlow.runTask({
flowId: action.flowId,
runtime: action.runtime,
sourceId: action.sourceId,
childSessionKey: action.childSessionKey,
parentTaskId: action.parentTaskId,
agentId: action.agentId,
runId: action.runId,
label: action.label,
task: action.task,
preferMetadata: action.preferMetadata,
notifyPolicy: action.notifyPolicy,
status: action.status,
startedAt: action.startedAt,
lastEventAt: action.lastEventAt,
progressSummary: action.progressSummary,
});
if (result.created) {
return {
created: true,
flow: toFlowView(result.flow),
task: toTaskView(result.task),
};
}
return {
found: result.found,
created: false,
reason: result.reason,
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
};
}
}
}
export function createTaskFlowWebhookRequestHandler(params: {
cfg: OpenClawConfig;
targetsByPath: Map<string, TaskFlowWebhookTarget[]>;
inFlightLimiter?: WebhookInFlightLimiter;
}): (req: IncomingMessage, res: ServerResponse) => Promise<boolean> {
const rateLimiter = createFixedWindowRateLimiter({
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const inFlightLimiter =
params.inFlightLimiter ??
createWebhookInFlightLimiter({
maxInFlightPerKey: WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey,
maxTrackedKeys: WEBHOOK_IN_FLIGHT_DEFAULTS.maxTrackedKeys,
});
return async (req: IncomingMessage, res: ServerResponse): Promise<boolean> => {
return await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: params.targetsByPath,
allowMethods: ["POST"],
requireJsonContentType: true,
rateLimiter,
rateLimitKey: (() => {
const clientIp =
resolveRequestClientIp(
req,
params.cfg.gateway?.trustedProxies,
params.cfg.gateway?.allowRealIpFallback === true,
) ??
req.socket.remoteAddress ??
"unknown";
return `${new URL(req.url ?? "/", "http://localhost").pathname}:${clientIp}`;
})(),
inFlightLimiter,
handle: async ({ targets }) => {
const presentedSecret = extractSharedSecret(req);
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (candidate) =>
presentedSecret.length > 0 && timingSafeEquals(candidate.secret, presentedSecret),
});
if (!target) {
return true;
}
const body = await readJsonWebhookBodyOrReject({
req,
res,
maxBytes: 256 * 1024,
timeoutMs: 15_000,
emptyObjectOnEmpty: false,
invalidJsonMessage: "invalid request body",
});
if (!body.ok) {
return true;
}
const parsed = webhookActionSchema.safeParse(body.value);
if (!parsed.success) {
writeJson(res, 400, {
ok: false,
code: "invalid_request",
error: formatZodError(parsed.error),
});
return true;
}
const result = await executeWebhookAction({
action: parsed.data,
target,
cfg: params.cfg,
});
const outcome = describeWebhookOutcome({
action: parsed.data,
result,
});
writeJson(
res,
outcome.statusCode,
outcome.statusCode < 400
? {
ok: true,
routeId: target.routeId,
...(outcome.code ? { code: outcome.code } : {}),
result,
}
: {
ok: false,
routeId: target.routeId,
code: outcome.code ?? "request_rejected",
error: outcome.error ?? "request rejected",
result,
},
);
return true;
},
});
};
}

6
pnpm-lock.yaml generated
View File

@@ -675,6 +675,12 @@ importers:
extensions/volcengine: {}
extensions/webhooks:
dependencies:
zod:
specifier: ^4.3.6
version: 4.3.6
extensions/whatsapp:
dependencies:
'@whiskeysockets/baileys':

View File

@@ -189,6 +189,15 @@ describe("plugin runtime command execution", () => {
]);
},
},
{
name: "exposes runtime.taskFlow binding helpers",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {
expectFunctionKeys(runtime.taskFlow as Record<string, unknown>, [
"bindSession",
"fromToolContext",
]);
},
},
{
name: "exposes runtime.agent host helpers",
assert: (runtime: ReturnType<typeof createPluginRuntime>) => {

View File

@@ -16,6 +16,7 @@ import { createRuntimeEvents } from "./runtime-events.js";
import { createRuntimeLogging } from "./runtime-logging.js";
import { createRuntimeMedia } from "./runtime-media.js";
import { createRuntimeSystem } from "./runtime-system.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
import type { PluginRuntime } from "./types.js";
const loadTtsRuntime = createLazyRuntimeModule(() => import("./runtime-tts.runtime.js"));
@@ -203,6 +204,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}):
events: createRuntimeEvents(),
logging: createRuntimeLogging(),
state: { resolveStateDir },
taskFlow: createRuntimeTaskFlow(),
} satisfies Omit<
PluginRuntime,
"tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration"

View File

@@ -0,0 +1,157 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { getFlowById, resetFlowRegistryForTests } from "../../tasks/flow-registry.js";
import { getTaskById, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { createRuntimeTaskFlow } from "./runtime-taskflow.js";
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
afterEach(() => {
resetTaskRegistryForTests();
resetFlowRegistryForTests({ persist: false });
vi.clearAllMocks();
});
describe("runtime TaskFlow", () => {
it("binds managed TaskFlow operations to a session key", () => {
const runtime = createRuntimeTaskFlow();
const taskFlow = runtime.bindSession({
sessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
});
const created = taskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Triage inbox",
currentStep: "classify",
stateJson: { lane: "inbox" },
});
expect(created).toMatchObject({
syncMode: "managed",
ownerKey: "agent:main:main",
controllerId: "tests/runtime-taskflow",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
goal: "Triage inbox",
});
expect(taskFlow.get(created.flowId)?.flowId).toBe(created.flowId);
expect(taskFlow.findLatest()?.flowId).toBe(created.flowId);
expect(taskFlow.resolve("agent:main:main")?.flowId).toBe(created.flowId);
});
it("binds TaskFlows from trusted tool context", () => {
const runtime = createRuntimeTaskFlow();
const taskFlow = runtime.fromToolContext({
sessionKey: "agent:main:main",
deliveryContext: {
channel: "discord",
to: "channel:123",
threadId: "thread:456",
},
});
const created = taskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Review queue",
});
expect(created.requesterOrigin).toMatchObject({
channel: "discord",
to: "channel:123",
threadId: "thread:456",
});
});
it("rejects tool contexts without a bound session key", () => {
const runtime = createRuntimeTaskFlow();
expect(() =>
runtime.fromToolContext({
sessionKey: undefined,
deliveryContext: undefined,
}),
).toThrow("TaskFlow runtime requires tool context with a sessionKey.");
});
it("keeps TaskFlow reads owner-scoped and runs child tasks under the bound TaskFlow", () => {
const runtime = createRuntimeTaskFlow();
const ownerTaskFlow = runtime.bindSession({
sessionKey: "agent:main:main",
});
const otherTaskFlow = runtime.bindSession({
sessionKey: "agent:main:other",
});
const created = ownerTaskFlow.createManaged({
controllerId: "tests/runtime-taskflow",
goal: "Inspect PR batch",
});
expect(otherTaskFlow.get(created.flowId)).toBeUndefined();
expect(otherTaskFlow.list()).toEqual([]);
const child = ownerTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-taskflow-child",
task: "Inspect PR 1",
status: "running",
startedAt: 10,
lastEventAt: 10,
});
expect(child).toMatchObject({
created: true,
flow: expect.objectContaining({
flowId: created.flowId,
}),
task: expect.objectContaining({
parentFlowId: created.flowId,
ownerKey: "agent:main:main",
runId: "runtime-taskflow-child",
}),
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
expect(getTaskById(child.task.taskId)).toMatchObject({
parentFlowId: created.flowId,
ownerKey: "agent:main:main",
});
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
});
expect(ownerTaskFlow.getTaskSummary(created.flowId)).toMatchObject({
total: 1,
active: 1,
});
});
});

View File

@@ -0,0 +1,461 @@
import type { OpenClawConfig } from "../../config/config.js";
import {
findLatestFlowForOwner,
getFlowByIdForOwner,
listFlowsForOwner,
resolveFlowForLookupTokenForOwner,
} from "../../tasks/flow-owner-access.js";
import type { FlowRecord, JsonValue } from "../../tasks/flow-registry.types.js";
import {
createManagedFlow,
failFlow,
finishFlow,
type FlowUpdateResult,
requestFlowCancel,
resumeFlow,
setFlowWaiting,
} from "../../tasks/flow-runtime-internal.js";
import {
cancelFlowByIdForOwner,
getFlowTaskSummary,
runTaskInFlowForOwner,
} from "../../tasks/task-executor.js";
import type {
TaskDeliveryStatus,
TaskDeliveryState,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySummary,
TaskRuntime,
} from "../../tasks/task-registry.types.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { OpenClawPluginToolContext } from "../types.js";
export type ManagedTaskFlowRecord = FlowRecord & {
syncMode: "managed";
controllerId: string;
};
export type ManagedTaskFlowMutationErrorCode = "not_found" | "not_managed" | "revision_conflict";
export type ManagedTaskFlowMutationResult =
| {
applied: true;
flow: ManagedTaskFlowRecord;
}
| {
applied: false;
code: ManagedTaskFlowMutationErrorCode;
current?: FlowRecord;
};
export type BoundTaskFlowTaskRunResult =
| {
created: true;
flow: ManagedTaskFlowRecord;
task: TaskRecord;
}
| {
created: false;
reason: string;
found: boolean;
flow?: FlowRecord;
};
export type BoundTaskFlowCancelResult = Awaited<ReturnType<typeof cancelFlowByIdForOwner>>;
export type BoundTaskFlowRuntime = {
readonly sessionKey: string;
readonly requesterOrigin?: TaskDeliveryState["requesterOrigin"];
createManaged: (params: {
controllerId: string;
goal: string;
status?: ManagedTaskFlowRecord["status"];
notifyPolicy?: TaskNotifyPolicy;
currentStep?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
createdAt?: number;
updatedAt?: number;
endedAt?: number | null;
}) => ManagedTaskFlowRecord;
get: (flowId: string) => FlowRecord | undefined;
list: () => FlowRecord[];
findLatest: () => FlowRecord | undefined;
resolve: (token: string) => FlowRecord | undefined;
getTaskSummary: (flowId: string) => TaskRegistrySummary | undefined;
setWaiting: (params: {
flowId: string;
expectedRevision: number;
currentStep?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
}) => ManagedTaskFlowMutationResult;
resume: (params: {
flowId: string;
expectedRevision: number;
status?: Extract<ManagedTaskFlowRecord["status"], "queued" | "running">;
currentStep?: string | null;
stateJson?: JsonValue | null;
updatedAt?: number;
}) => ManagedTaskFlowMutationResult;
finish: (params: {
flowId: string;
expectedRevision: number;
stateJson?: JsonValue | null;
updatedAt?: number;
endedAt?: number;
}) => ManagedTaskFlowMutationResult;
fail: (params: {
flowId: string;
expectedRevision: number;
stateJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
endedAt?: number;
}) => ManagedTaskFlowMutationResult;
requestCancel: (params: {
flowId: string;
expectedRevision: number;
cancelRequestedAt?: number;
}) => ManagedTaskFlowMutationResult;
cancel: (params: { flowId: string; cfg: OpenClawConfig }) => Promise<BoundTaskFlowCancelResult>;
runTask: (params: {
flowId: string;
runtime: TaskRuntime;
sourceId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status?: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
}) => BoundTaskFlowTaskRunResult;
};
export type PluginRuntimeTaskFlow = {
bindSession: (params: {
sessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}) => BoundTaskFlowRuntime;
fromToolContext: (
ctx: Pick<OpenClawPluginToolContext, "sessionKey" | "deliveryContext">,
) => BoundTaskFlowRuntime;
};
function assertSessionKey(sessionKey: string | undefined, errorMessage: string): string {
const normalized = sessionKey?.trim();
if (!normalized) {
throw new Error(errorMessage);
}
return normalized;
}
function asManagedTaskFlowRecord(flow: FlowRecord | undefined): ManagedTaskFlowRecord | undefined {
if (!flow || flow.syncMode !== "managed" || !flow.controllerId) {
return undefined;
}
return flow as ManagedTaskFlowRecord;
}
function resolveManagedFlowForOwner(params: {
flowId: string;
ownerKey: string;
}):
| { ok: true; flow: ManagedTaskFlowRecord }
| { ok: false; code: "not_found" | "not_managed"; current?: FlowRecord } {
const flow = getFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.ownerKey,
});
if (!flow) {
return { ok: false, code: "not_found" };
}
const managed = asManagedTaskFlowRecord(flow);
if (!managed) {
return { ok: false, code: "not_managed", current: flow };
}
return { ok: true, flow: managed };
}
function mapFlowUpdateResult(result: FlowUpdateResult): ManagedTaskFlowMutationResult {
if (result.applied) {
const managed = asManagedTaskFlowRecord(result.flow);
if (!managed) {
return {
applied: false,
code: "not_managed",
current: result.flow,
};
}
return {
applied: true,
flow: managed,
};
}
return {
applied: false,
code: result.reason,
...(result.current ? { current: result.current } : {}),
};
}
function createBoundTaskFlowRuntime(params: {
sessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}): BoundTaskFlowRuntime {
const ownerKey = assertSessionKey(
params.sessionKey,
"TaskFlow runtime requires a bound sessionKey.",
);
const requesterOrigin = params.requesterOrigin
? normalizeDeliveryContext(params.requesterOrigin)
: undefined;
return {
sessionKey: ownerKey,
...(requesterOrigin ? { requesterOrigin } : {}),
createManaged: (input) =>
createManagedFlow({
ownerKey,
controllerId: input.controllerId,
requesterOrigin,
status: input.status,
notifyPolicy: input.notifyPolicy,
goal: input.goal,
currentStep: input.currentStep,
stateJson: input.stateJson,
waitJson: input.waitJson,
cancelRequestedAt: input.cancelRequestedAt,
createdAt: input.createdAt,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}) as ManagedTaskFlowRecord,
get: (flowId) =>
getFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
}),
list: () =>
listFlowsForOwner({
callerOwnerKey: ownerKey,
}),
findLatest: () =>
findLatestFlowForOwner({
callerOwnerKey: ownerKey,
}),
resolve: (token) =>
resolveFlowForLookupTokenForOwner({
token,
callerOwnerKey: ownerKey,
}),
getTaskSummary: (flowId) => {
const flow = getFlowByIdForOwner({
flowId,
callerOwnerKey: ownerKey,
});
return flow ? getFlowTaskSummary(flow.flowId) : undefined;
},
setWaiting: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
setFlowWaiting({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
currentStep: input.currentStep,
stateJson: input.stateJson,
waitJson: input.waitJson,
blockedTaskId: input.blockedTaskId,
blockedSummary: input.blockedSummary,
updatedAt: input.updatedAt,
}),
);
},
resume: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
resumeFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
status: input.status,
currentStep: input.currentStep,
stateJson: input.stateJson,
updatedAt: input.updatedAt,
}),
);
},
finish: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
finishFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
stateJson: input.stateJson,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}),
);
},
fail: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
failFlow({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
stateJson: input.stateJson,
blockedTaskId: input.blockedTaskId,
blockedSummary: input.blockedSummary,
updatedAt: input.updatedAt,
endedAt: input.endedAt,
}),
);
},
requestCancel: (input) => {
const flow = resolveManagedFlowForOwner({
flowId: input.flowId,
ownerKey,
});
if (!flow.ok) {
return {
applied: false,
code: flow.code,
...(flow.current ? { current: flow.current } : {}),
};
}
return mapFlowUpdateResult(
requestFlowCancel({
flowId: flow.flow.flowId,
expectedRevision: input.expectedRevision,
cancelRequestedAt: input.cancelRequestedAt,
}),
);
},
cancel: ({ flowId, cfg }) =>
cancelFlowByIdForOwner({
cfg,
flowId,
callerOwnerKey: ownerKey,
}),
runTask: (input) => {
const created = runTaskInFlowForOwner({
flowId: input.flowId,
callerOwnerKey: ownerKey,
runtime: input.runtime,
sourceId: input.sourceId,
childSessionKey: input.childSessionKey,
parentTaskId: input.parentTaskId,
agentId: input.agentId,
runId: input.runId,
label: input.label,
task: input.task,
preferMetadata: input.preferMetadata,
notifyPolicy: input.notifyPolicy,
deliveryStatus: input.deliveryStatus,
status: input.status,
startedAt: input.startedAt,
lastEventAt: input.lastEventAt,
progressSummary: input.progressSummary,
});
if (!created.created) {
return {
created: false,
found: created.found,
reason: created.reason ?? "Task was not created.",
...(created.flow ? { flow: created.flow } : {}),
};
}
const managed = asManagedTaskFlowRecord(created.flow);
if (!managed) {
return {
created: false,
found: true,
reason: "TaskFlow does not accept managed child tasks.",
flow: created.flow,
};
}
if (!created.task) {
return {
created: false,
found: true,
reason: "Task was not created.",
flow: created.flow,
};
}
return {
created: true,
flow: managed,
task: created.task,
};
},
};
}
export function createRuntimeTaskFlow(): PluginRuntimeTaskFlow {
return {
bindSession: (params) =>
createBoundTaskFlowRuntime({
sessionKey: params.sessionKey,
requesterOrigin: params.requesterOrigin,
}),
fromToolContext: (ctx) =>
createBoundTaskFlowRuntime({
sessionKey: assertSessionKey(
ctx.sessionKey,
"TaskFlow runtime requires tool context with a sessionKey.",
),
requesterOrigin: ctx.deliveryContext,
}),
};
}

View File

@@ -103,6 +103,7 @@ export type PluginRuntimeCore = {
state: {
resolveStateDir: typeof import("../../config/paths.js").resolveStateDir;
};
taskFlow: import("./runtime-taskflow.js").PluginRuntimeTaskFlow;
modelAuth: {
/** Resolve auth for a model. Only provider/model and optional cfg are used. */
getApiKeyForModel: (params: {

View File

@@ -4,6 +4,8 @@ export {
createManagedFlow,
deleteFlowRecordById,
findLatestFlowForOwnerKey,
failFlow,
finishFlow,
getFlowById,
listFlowRecords,
listFlowsForOwnerKey,
@@ -15,3 +17,5 @@ export {
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-registry.js";
export type { FlowUpdateResult } from "./flow-registry.js";

View File

@@ -324,6 +324,40 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
state: {
resolveStateDir: vi.fn(() => "/tmp/openclaw"),
},
taskFlow: {
bindSession: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["bindSession"],
fromToolContext: vi.fn(() => ({
sessionKey: "agent:main:main",
createManaged: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn(),
resume: vi.fn(),
finish: vi.fn(),
fail: vi.fn(),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
})) as unknown as PluginRuntime["taskFlow"]["fromToolContext"],
},
modelAuth: {
getApiKeyForModel: vi.fn() as unknown as PluginRuntime["modelAuth"]["getApiKeyForModel"],
resolveApiKeyForProvider: