diff --git a/CHANGELOG.md b/CHANGELOG.md index e60a3373aea7..7c28a540d929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,6 +156,7 @@ Docs: https://docs.openclaw.ai - Pairing: enforce pending request limits per account instead of per shared channel queue, so one account's outstanding pairing challenges no longer block new pairing on other accounts. Thanks @smaeljaish771 and @vincentkoc. - Exec approvals: unwrap `caffeinate` and `sandbox-exec` before persisting allow-always trust so later shell payload changes still require a fresh approval. Thanks @tdjackey and @vincentkoc. - Matrix/DM threads: keep strict unnamed fresh-invite rooms promotable even when Matrix omits the optional direct hint, preserve repair-failed local DM promotions while still revalidating later room metadata, and keep both bound and thread-isolated Matrix sessions reporting the correct route policy. (#58099) Thanks @gumadeiras. +- ClawFlow: add a small flow runtime substrate for authoring layers with persisted wait targets and output bags, plus bundled skills/Lobster examples and richer `flows show` / `doctor` recovery hints for multi-task flow state. (#58336) Thanks @mbelinky. ## 2026.3.28 diff --git a/docs/automation/clawflow.md b/docs/automation/clawflow.md index c488d76d978a..6c3be1bd3966 100644 --- a/docs/automation/clawflow.md +++ b/docs/automation/clawflow.md @@ -45,6 +45,51 @@ ClawFlow sits above that ledger: For a single detached run, the flow can be a one-task flow. For more structured work, ClawFlow can keep multiple task runs under the same job. +## Runtime substrate + +ClawFlow is the runtime substrate, not a workflow language. + +It owns: + +- the flow id +- the owner session and return context +- waiting state +- small persisted outputs +- finish, fail, cancel, and blocked state + +It does **not** own branching or business logic. 
Put that in the authoring layer that sits above it: + +- Lobster +- acpx +- plain TypeScript helpers +- bundled skills + +In practice, authoring layers target a small runtime surface: + +- `createFlow(...)` +- `runTaskInFlow(...)` +- `setFlowWaiting(...)` +- `setFlowOutput(...)` +- `appendFlowOutput(...)` +- `emitFlowUpdate(...)` +- `resumeFlow(...)` +- `finishFlow(...)` +- `failFlow(...)` + +That keeps flow ownership and return-to-thread behavior in core without forcing a single DSL on top of it. + +## Authoring pattern + +The intended shape is linear: + +1. Create one flow for the job. +2. Run one detached task under that flow. +3. Wait for the child task or outside event. +4. Resume the flow in the caller. +5. Spawn the next child task or finish. + +ClawFlow persists the minimal state needed to resume that job: the current step, the task it is waiting on, and a small output bag for handoff between steps. + ## CLI surface The flow CLI is intentionally small: @@ -53,6 +98,8 @@ The flow CLI is intentionally small: - `openclaw flows show ` shows one flow and its linked tasks - `openclaw flows cancel ` cancels the flow and any active child tasks +`flows show` also surfaces the current wait target and any stored output keys, which is often enough to answer "what is this job waiting on?" without digging into every child task. + The lookup token accepts either a flow id or the owner session key. ## Related diff --git a/docs/cli/flows.md b/docs/cli/flows.md index e0c495e602ee..6c8f0e4a7b19 100644 --- a/docs/cli/flows.md +++ b/docs/cli/flows.md @@ -37,7 +37,7 @@ openclaw flows show openclaw flows show --json ``` -The output includes the flow status, current step, blocked summary when present, and linked tasks. +The output includes the flow status, current step, wait target, blocked summary when present, stored output keys, and linked tasks. 
### `flows cancel` diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md index dc3ad95ee157..58e55af97e03 100644 --- a/docs/tools/lobster.md +++ b/docs/tools/lobster.md @@ -10,6 +10,8 @@ read_when: Lobster is a workflow shell that lets OpenClaw run multi-step tool sequences as a single, deterministic operation with explicit approval checkpoints. +Lobster is one authoring layer above [ClawFlow](/automation/clawflow). Lobster can decide the step logic, but ClawFlow still owns the job identity, owner context, and how detached work returns to the original conversation. + ## Hook Your assistant can build the tools that manage itself. Ask for a workflow, and 30 minutes later you have a CLI plus pipelines that run as one call. Lobster is the missing piece: deterministic pipelines, explicit approvals, and resumable state. diff --git a/skills/clawflow-inbox-triage/SKILL.md b/skills/clawflow-inbox-triage/SKILL.md new file mode 100644 index 000000000000..a03985945ce1 --- /dev/null +++ b/skills/clawflow-inbox-triage/SKILL.md @@ -0,0 +1,62 @@ +--- +name: clawflow-inbox-triage +description: Example ClawFlow authoring pattern for inbox triage. Use when messages need different treatment based on intent, with some routes notifying immediately, some waiting on outside answers, and others rolling into a later summary. +metadata: { "openclaw": { "emoji": "📥" } } +--- + +# ClawFlow inbox triage + +This is a concrete example of how to think about ClawFlow without turning the core runtime into a DSL. + +## Goal + +Triage inbox items with one owner flow: + +- business → post to Slack and wait for reply +- personal → notify the owner now +- everything else → keep for end-of-day summary + +## Pattern + +1. Create one flow for the inbox batch. +2. Run one detached task to classify new items. +3. Resume the flow when classification completes. +4. Route each item in the calling logic. +5. Persist only the summary bucket and the current wait target. 
+ +## Suggested flow outputs + +- `business_threads` +- `personal_items` +- `eod_summary` + +## Minimal runtime calls + +```ts +const flow = createFlow({ + ownerSessionKey, + goal: "triage inbox", +}); + +runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + task: "Classify inbox messages", + currentStep: "wait_for_classification", +}); + +resumeFlow({ + flowId: flow.flowId, + currentStep: "route_items", +}); + +appendFlowOutput({ + flowId: flow.flowId, + key: "eod_summary", + value: { subject: "Newsletter", route: "later" }, +}); +``` + +## Related example + +- `skills/clawflow/examples/inbox-triage.lobster` diff --git a/skills/clawflow/SKILL.md b/skills/clawflow/SKILL.md new file mode 100644 index 000000000000..bfb2bc6a56b8 --- /dev/null +++ b/skills/clawflow/SKILL.md @@ -0,0 +1,76 @@ +--- +name: clawflow +description: Use when work should span one or more detached tasks but still behave like one job with a single owner context. ClawFlow is the runtime substrate under authoring layers like Lobster, acpx, or plain code. Keep conditional logic in the caller; use ClawFlow for flow identity, waiting, outputs, and user-facing emergence. +metadata: { "openclaw": { "emoji": "🪝" } } +--- + +# ClawFlow + +Use ClawFlow when a job needs to outlive one prompt or one detached run, but you still want one owner session, one thread context, and one place to inspect or resume the work. + +## When to use it + +- Multi-step background work with one owner +- Work that waits on detached ACP or subagent tasks +- Jobs that may need to emit one clear update back to the owner +- Jobs that need a small persisted output bag between steps + +## What ClawFlow owns + +- flow identity +- owner session and return context +- waiting state +- small persisted outputs +- finish, fail, cancel, and blocked state + +It does **not** own branching or business logic. Put that in Lobster, acpx, or the calling code. + +## Runtime pattern + +1. `createFlow(...)` +2. `runTaskInFlow(...)` +3. 
`setFlowWaiting(...)` or `setFlowOutput(...)` +4. `resumeFlow(...)` +5. `emitFlowUpdate(...)` only when needed +6. `finishFlow(...)` or `failFlow(...)` + +## Example shape + +```ts +const flow = createFlow({ + ownerSessionKey, + goal: "triage inbox", +}); + +const classify = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + task: "Classify inbox messages", + currentStep: "wait_for_classification", +}); + +resumeFlow({ + flowId: flow.flowId, + currentStep: "route_results", +}); + +setFlowOutput({ + flowId: flow.flowId, + key: "classification", + value: { route: "business" }, +}); +``` + +## Keep conditionals above the runtime + +Use the flow runtime for state and task linkage. Keep decisions in the authoring layer: + +- `business` → post to Slack and wait +- `personal` → notify the owner now +- `later` → append to an end-of-day summary bucket + +## Examples + +- See `skills/clawflow/examples/inbox-triage.lobster` +- See `skills/clawflow/examples/pr-intake.lobster` +- See `skills/clawflow-inbox-triage/SKILL.md` for a concrete routing pattern diff --git a/skills/clawflow/examples/inbox-triage.lobster b/skills/clawflow/examples/inbox-triage.lobster new file mode 100644 index 000000000000..cc462b05a96c --- /dev/null +++ b/skills/clawflow/examples/inbox-triage.lobster @@ -0,0 +1,33 @@ +# Illustrative Lobster authoring example for a ClawFlow-style inbox triage job. +# Swap the placeholder commands for your own tools or scripts. + +name: inbox-triage +steps: + - id: fetch + command: gog.gmail.search --query 'newer_than:1d' --max 20 + + - id: classify + command: >- + openclaw.invoke --tool llm-task --action json --args-json + '{"prompt":"Classify each inbox item as business, personal, or later. 
Return one JSON object per item with route and summary.","thinking":"low","schema":{"type":"object","properties":{"items":{"type":"array"}},"required":["items"],"additionalProperties":false}}' + stdin: $fetch.stdout + + - id: post_business + command: slack-route --bucket business + stdin: $classify.stdout + condition: $classify.json.items[0].route == "business" + + - id: wait_for_business_reply + command: echo '{"status":"waiting","reason":"slack_reply"}' + condition: $classify.json.items[0].route == "business" + + - id: notify_personal + command: >- + openclaw.invoke --tool message --action send --args-json + '{"provider":"telegram","to":"owner-thread","content":"Personal inbox item needs attention."}' + condition: $classify.json.items[0].route == "personal" + + - id: stash_for_eod + command: summary-append --bucket eod + stdin: $classify.stdout + condition: $classify.json.items[0].route == "later" diff --git a/skills/clawflow/examples/pr-intake.lobster b/skills/clawflow/examples/pr-intake.lobster new file mode 100644 index 000000000000..db8c58e66110 --- /dev/null +++ b/skills/clawflow/examples/pr-intake.lobster @@ -0,0 +1,32 @@ +# Illustrative Lobster authoring example for a ClawFlow-style PR intake lane. +# Replace the placeholder commands with repo-specific tooling. + +name: pr-intake +steps: + - id: fetch + command: gh pr list --repo owner/repo --state open --json number,title,body,headRefName + + - id: classify + command: >- + openclaw.invoke --tool llm-task --action json --args-json + '{"prompt":"Classify each PR as close, request_changes, refactor, or maintainer_review. 
Return intent and recommended next action.","thinking":"low","schema":{"type":"object","properties":{"items":{"type":"array"}},"required":["items"],"additionalProperties":false}}' + stdin: $fetch.stdout + + - id: close_low_signal + command: pr-close-low-signal + stdin: $classify.stdout + condition: $classify.json.items[0].nextAction == "close" + + - id: request_changes + command: pr-request-changes + stdin: $classify.stdout + condition: $classify.json.items[0].nextAction == "request_changes" + + - id: refactor_branch + command: pr-refactor-branch + stdin: $classify.stdout + condition: $classify.json.items[0].nextAction == "refactor" + + - id: escalate + command: echo '{"status":"notify","target":"maintainer"}' + condition: $classify.json.items[0].nextAction == "maintainer_review" diff --git a/src/commands/doctor-workspace-status.test.ts b/src/commands/doctor-workspace-status.test.ts index 327eaebba461..595734fd2c1b 100644 --- a/src/commands/doctor-workspace-status.test.ts +++ b/src/commands/doctor-workspace-status.test.ts @@ -182,6 +182,7 @@ describe("noteWorkspaceStatus", () => { status: "waiting", notifyPolicy: "done_only", goal: "Process PRs", + waitingOnTaskId: "task-wait-missing", createdAt: 10, updatedAt: 20, }, @@ -210,7 +211,9 @@ describe("noteWorkspaceStatus", () => { const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "ClawFlow recovery"); expect(recoveryCalls).toHaveLength(1); const body = String(recoveryCalls[0]?.[0]); - expect(body).toContain("flow-orphaned: waiting linear flow has no linked tasks"); + expect(body).toContain( + "flow-orphaned: waiting flow points at missing task task-wait-missing", + ); expect(body).toContain("flow-blocked: blocked flow points at missing task task-missing"); expect(body).toContain("openclaw flows show "); expect(body).toContain("openclaw flows cancel "); diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index d0afa05df0e6..21a5c56de4df 100644 --- 
a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -12,20 +12,32 @@ function noteFlowRecoveryHints() { const suspicious = listFlowRecords().flatMap((flow) => { const tasks = listTasksForFlowId(flow.flowId); const findings: string[] = []; + const missingWaitingTask = + flow.shape === "linear" && + flow.status === "waiting" && + flow.waitingOnTaskId && + !tasks.some((task) => task.taskId === flow.waitingOnTaskId); + const missingBlockedTask = + flow.status === "blocked" && + flow.blockedTaskId && + !tasks.some((task) => task.taskId === flow.blockedTaskId); if ( flow.shape === "linear" && (flow.status === "running" || flow.status === "waiting" || flow.status === "blocked") && - tasks.length === 0 + tasks.length === 0 && + !missingWaitingTask && + !missingBlockedTask ) { findings.push( `${flow.flowId}: ${flow.status} linear flow has no linked tasks; inspect or cancel it manually.`, ); } - if ( - flow.status === "blocked" && - flow.blockedTaskId && - !tasks.some((task) => task.taskId === flow.blockedTaskId) - ) { + if (missingWaitingTask) { + findings.push( + `${flow.flowId}: waiting flow points at missing task ${flow.waitingOnTaskId}; inspect or cancel it manually.`, + ); + } + if (missingBlockedTask) { findings.push( `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, ); diff --git a/src/commands/flows.test.ts b/src/commands/flows.test.ts index 4f9f817baf91..21bceee11a9c 100644 --- a/src/commands/flows.test.ts +++ b/src/commands/flows.test.ts @@ -46,6 +46,10 @@ const flowFixture = { notifyPolicy: "done_only", goal: "Process related PRs", currentStep: "wait_for", + waitingOnTaskId: "task-12345678", + outputs: { + bucket: ["personal"], + }, createdAt: Date.parse("2026-03-31T10:00:00.000Z"), updatedAt: Date.parse("2026-03-31T10:05:00.000Z"), } as const; @@ -109,7 +113,7 @@ describe("flows commands", () => { await flowsListCommand({}, runtime); 
expect(runtimeLogs[0]).toContain("Flows: 1"); - expect(runtimeLogs[1]).toContain("Flow pressure: 0 active · 0 blocked · 1 total"); + expect(runtimeLogs[1]).toContain("Flow pressure: 1 active · 0 blocked · 1 total"); expect(runtimeLogs.join("\n")).toContain("Process related PRs"); expect(runtimeLogs.join("\n")).toContain("1 active/2 total"); }); @@ -122,6 +126,8 @@ describe("flows commands", () => { expect(runtimeLogs.join("\n")).toContain("shape: linear"); expect(runtimeLogs.join("\n")).toContain("currentStep: wait_for"); + expect(runtimeLogs.join("\n")).toContain("waitingOnTaskId: task-12345678"); + expect(runtimeLogs.join("\n")).toContain("outputKeys: bucket"); expect(runtimeLogs.join("\n")).toContain("tasks: 2 total · 1 active · 0 issues"); expect(runtimeLogs.join("\n")).toContain("task-12345678 running run-12345678 Review PR"); }); diff --git a/src/commands/flows.ts b/src/commands/flows.ts index 1c9732a9d899..b52a2b6f33ed 100644 --- a/src/commands/flows.ts +++ b/src/commands/flows.ts @@ -78,7 +78,7 @@ function formatFlowRows(flows: FlowRecord[], rich: boolean) { function formatFlowListSummary(flows: FlowRecord[]) { const active = flows.filter( - (flow) => flow.status === "queued" || flow.status === "running", + (flow) => flow.status === "queued" || flow.status === "running" || flow.status === "waiting", ).length; const blocked = flows.filter((flow) => flow.status === "blocked").length; return `${active} active · ${blocked} blocked · ${flows.length} total`; @@ -167,6 +167,10 @@ export async function flowsShowCommand( `ownerSessionKey: ${flow.ownerSessionKey}`, `goal: ${flow.goal}`, `currentStep: ${flow.currentStep ?? "n/a"}`, + `waitingOnTaskId: ${flow.waitingOnTaskId ?? "n/a"}`, + `outputKeys: ${ + flow.outputs ? Object.keys(flow.outputs).toSorted().join(", ") || "n/a" : "n/a" + }`, `blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`, `blockedSummary: ${flow.blockedSummary ?? 
"n/a"}`, `createdAt: ${new Date(flow.createdAt).toISOString()}`, diff --git a/src/tasks/flow-registry.store.sqlite.ts b/src/tasks/flow-registry.store.sqlite.ts index 041fea737415..ecbd3e000dd2 100644 --- a/src/tasks/flow-registry.store.sqlite.ts +++ b/src/tasks/flow-registry.store.sqlite.ts @@ -4,7 +4,7 @@ import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js"; -import type { FlowRecord, FlowShape } from "./flow-registry.types.js"; +import type { FlowOutputBag, FlowRecord, FlowShape } from "./flow-registry.types.js"; type FlowRegistryRow = { flow_id: string; @@ -15,6 +15,8 @@ type FlowRegistryRow = { notify_policy: FlowRecord["notifyPolicy"]; goal: string; current_step: string | null; + waiting_on_task_id: string | null; + outputs_json: string | null; blocked_task_id: string | null; blocked_summary: string | null; created_at: number | bigint; @@ -65,6 +67,7 @@ function parseJsonValue(raw: string | null): T | undefined { function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { const endedAt = normalizeNumber(row.ended_at); const requesterOrigin = parseJsonValue(row.requester_origin_json); + const outputs = parseJsonValue(row.outputs_json); return { flowId: row.flow_id, shape: row.shape === "linear" ? "linear" : "single_task", @@ -74,6 +77,8 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { notifyPolicy: row.notify_policy, goal: row.goal, ...(row.current_step ? { currentStep: row.current_step } : {}), + ...(row.waiting_on_task_id ? { waitingOnTaskId: row.waiting_on_task_id } : {}), + ...(outputs ? { outputs } : {}), ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}), ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}), createdAt: normalizeNumber(row.created_at) ?? 
0, @@ -92,6 +97,8 @@ function bindFlowRecord(record: FlowRecord) { notify_policy: record.notifyPolicy, goal: record.goal, current_step: record.currentStep ?? null, + waiting_on_task_id: record.waitingOnTaskId ?? null, + outputs_json: serializeJson(record.outputs), blocked_task_id: record.blockedTaskId ?? null, blocked_summary: record.blockedSummary ?? null, created_at: record.createdAt, @@ -112,6 +119,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { notify_policy, goal, current_step, + waiting_on_task_id, + outputs_json, blocked_task_id, blocked_summary, created_at, @@ -130,6 +139,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { notify_policy, goal, current_step, + waiting_on_task_id, + outputs_json, blocked_task_id, blocked_summary, created_at, @@ -144,6 +155,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { @notify_policy, @goal, @current_step, + @waiting_on_task_id, + @outputs_json, @blocked_task_id, @blocked_summary, @created_at, @@ -158,6 +171,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { notify_policy = excluded.notify_policy, goal = excluded.goal, current_step = excluded.current_step, + waiting_on_task_id = excluded.waiting_on_task_id, + outputs_json = excluded.outputs_json, blocked_task_id = excluded.blocked_task_id, blocked_summary = excluded.blocked_summary, created_at = excluded.created_at, @@ -180,6 +195,8 @@ function ensureSchema(db: DatabaseSync) { notify_policy TEXT NOT NULL, goal TEXT NOT NULL, current_step TEXT, + waiting_on_task_id TEXT, + outputs_json TEXT, blocked_task_id TEXT, blocked_summary TEXT, created_at INTEGER NOT NULL, @@ -188,6 +205,8 @@ function ensureSchema(db: DatabaseSync) { ); `); ensureColumn(db, "flow_runs", "shape", "TEXT"); + ensureColumn(db, "flow_runs", "waiting_on_task_id", "TEXT"); + ensureColumn(db, "flow_runs", "outputs_json", "TEXT"); ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT"); ensureColumn(db, 
"flow_runs", "blocked_summary", "TEXT"); db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); diff --git a/src/tasks/flow-registry.store.test.ts b/src/tasks/flow-registry.store.test.ts index e83d20a50806..c0e742bd2f8e 100644 --- a/src/tasks/flow-registry.store.test.ts +++ b/src/tasks/flow-registry.store.test.ts @@ -15,6 +15,10 @@ function createStoredFlow(): FlowRecord { notifyPolicy: "done_only", goal: "Restored flow", currentStep: "spawn_task", + waitingOnTaskId: "task-waiting", + outputs: { + bucket: ["business"], + }, blockedTaskId: "task-restored", blockedSummary: "Writable session required.", createdAt: 100, @@ -64,6 +68,10 @@ describe("flow-registry store runtime", () => { flowId: "flow-restored", shape: "linear", goal: "Restored flow", + waitingOnTaskId: "task-waiting", + outputs: { + bucket: ["business"], + }, blockedTaskId: "task-restored", blockedSummary: "Writable session required.", }); @@ -94,6 +102,10 @@ describe("flow-registry store runtime", () => { goal: "Persisted flow", status: "waiting", currentStep: "ask_user", + waitingOnTaskId: "task-restored", + outputs: { + bucket: ["personal"], + }, }); resetFlowRegistryForTests({ persist: false }); @@ -103,6 +115,10 @@ describe("flow-registry store runtime", () => { shape: "linear", status: "waiting", currentStep: "ask_user", + waitingOnTaskId: "task-restored", + outputs: { + bucket: ["personal"], + }, }); }); }); diff --git a/src/tasks/flow-registry.test.ts b/src/tasks/flow-registry.test.ts index 5a0af86e4e59..a6a723229045 100644 --- a/src/tasks/flow-registry.test.ts +++ b/src/tasks/flow-registry.test.ts @@ -61,11 +61,19 @@ describe("flow-registry", () => { const updated = updateFlowRecordById(created.flowId, { status: "waiting", currentStep: "ask_user", + waitingOnTaskId: "task-123", + outputs: { + bucket: ["personal"], + }, }); expect(updated).toMatchObject({ flowId: created.flowId, status: "waiting", currentStep: "ask_user", + waitingOnTaskId: "task-123", + outputs: { + 
bucket: ["personal"], + }, }); expect(listFlowRecords()).toEqual([ diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts index 76c49efc764d..f1defc59cac5 100644 --- a/src/tasks/flow-registry.ts +++ b/src/tasks/flow-registry.ts @@ -1,15 +1,23 @@ import crypto from "node:crypto"; import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js"; -import type { FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js"; +import type { FlowOutputBag, FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js"; import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; const flows = new Map(); let restoreAttempted = false; +function cloneFlowOutputs(outputs: FlowOutputBag | undefined): FlowOutputBag | undefined { + if (!outputs) { + return undefined; + } + return JSON.parse(JSON.stringify(outputs)) as FlowOutputBag; +} + function cloneFlowRecord(record: FlowRecord): FlowRecord { return { ...record, ...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}), + ...(record.outputs ? 
{ outputs: cloneFlowOutputs(record.outputs) } : {}), }; } @@ -38,25 +46,16 @@ function resolveFlowBlockedSummary( return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined; } -type FlowRecordPatch = Partial< - Omit< - Pick< - FlowRecord, - | "status" - | "notifyPolicy" - | "goal" - | "currentStep" - | "blockedTaskId" - | "blockedSummary" - | "updatedAt" - | "endedAt" - >, - "currentStep" | "blockedTaskId" | "blockedSummary" | "endedAt" - > -> & { +type FlowRecordPatch = { + status?: FlowStatus; + notifyPolicy?: TaskNotifyPolicy; + goal?: string; currentStep?: string | null; + waitingOnTaskId?: string | null; + outputs?: FlowOutputBag | null; blockedTaskId?: string | null; blockedSummary?: string | null; + updatedAt?: number; endedAt?: number | null; }; @@ -125,6 +124,8 @@ export function createFlowRecord(params: { notifyPolicy?: TaskNotifyPolicy; goal: string; currentStep?: string; + waitingOnTaskId?: string; + outputs?: FlowOutputBag; blockedTaskId?: string; blockedSummary?: string; createdAt?: number; @@ -142,6 +143,8 @@ export function createFlowRecord(params: { notifyPolicy: ensureNotifyPolicy(params.notifyPolicy), goal: params.goal, currentStep: params.currentStep?.trim() || undefined, + waitingOnTaskId: params.waitingOnTaskId?.trim() || undefined, + outputs: cloneFlowOutputs(params.outputs), blockedTaskId: params.blockedTaskId?.trim() || undefined, blockedSummary: params.blockedSummary?.trim() || undefined, createdAt: now, @@ -212,6 +215,14 @@ export function updateFlowRecordById(flowId: string, patch: FlowRecordPatch): Fl patch.currentStep === undefined ? current.currentStep : patch.currentStep?.trim() || undefined, + waitingOnTaskId: + patch.waitingOnTaskId === undefined + ? current.waitingOnTaskId + : patch.waitingOnTaskId?.trim() || undefined, + outputs: + patch.outputs === undefined + ? cloneFlowOutputs(current.outputs) + : (cloneFlowOutputs(patch.outputs ?? undefined) ?? 
undefined), blockedTaskId: patch.blockedTaskId === undefined ? current.blockedTaskId diff --git a/src/tasks/flow-registry.types.ts b/src/tasks/flow-registry.types.ts index e984e12132a7..188f5c9ceebb 100644 --- a/src/tasks/flow-registry.types.ts +++ b/src/tasks/flow-registry.types.ts @@ -3,6 +3,16 @@ import type { TaskNotifyPolicy } from "./task-registry.types.js"; export type FlowShape = "single_task" | "linear"; +export type FlowOutputValue = + | null + | boolean + | number + | string + | FlowOutputValue[] + | { [key: string]: FlowOutputValue }; + +export type FlowOutputBag = Record; + export type FlowStatus = | "queued" | "running" @@ -22,6 +32,8 @@ export type FlowRecord = { notifyPolicy: TaskNotifyPolicy; goal: string; currentStep?: string; + waitingOnTaskId?: string; + outputs?: FlowOutputBag; blockedTaskId?: string; blockedSummary?: string; createdAt: number; diff --git a/src/tasks/flow-runtime.test.ts b/src/tasks/flow-runtime.test.ts new file mode 100644 index 000000000000..5fdf009bbedd --- /dev/null +++ b/src/tasks/flow-runtime.test.ts @@ -0,0 +1,281 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { getFlowById, resetFlowRegistryForTests, updateFlowRecordById } from "./flow-registry.js"; +import { + appendFlowOutput, + createFlow, + emitFlowUpdate, + failFlow, + finishFlow, + resumeFlow, + runTaskInFlow, + setFlowOutput, +} from "./flow-runtime.js"; +import { listTasksForFlowId, resetTaskRegistryForTests } from "./task-registry.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; +const mocks = vi.hoisted(() => ({ + sendMessageMock: vi.fn(), + enqueueSystemEventMock: vi.fn(), + requestHeartbeatNowMock: vi.fn(), +})); + +vi.mock("./task-registry-delivery-runtime.js", () => ({ + sendMessage: (...args: unknown[]) => mocks.sendMessageMock(...args), +})); + +vi.mock("../infra/system-events.js", () => ({ + enqueueSystemEvent: (...args: unknown[]) => 
mocks.enqueueSystemEventMock(...args), +})); + +vi.mock("../infra/heartbeat-wake.js", () => ({ + requestHeartbeatNow: (...args: unknown[]) => mocks.requestHeartbeatNowMock(...args), +})); + +vi.mock("../infra/agent-events.js", () => ({ + onAgentEvent: () => () => {}, +})); + +vi.mock("../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + cancelSession: vi.fn(), + }), +})); + +vi.mock("../agents/subagent-control.js", () => ({ + killSubagentRunAdmin: vi.fn(), +})); + +async function withFlowRuntimeStateDir(run: (root: string) => Promise): Promise { + await withTempDir({ prefix: "openclaw-flow-runtime-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + try { + await run(root); + } finally { + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + } + }); +} + +describe("flow-runtime", () => { + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + mocks.sendMessageMock.mockReset(); + mocks.enqueueSystemEventMock.mockReset(); + mocks.requestHeartbeatNowMock.mockReset(); + }); + + it("runs a child task under a linear flow and marks the flow as waiting on it", async () => { + await withFlowRuntimeStateDir(async () => { + const flow = createFlow({ + ownerSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + goal: "Triage inbox", + }); + + const started = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-runtime-1", + task: "Classify inbox messages", + currentStep: "wait_for_classification", + }); + + expect(started.task).toMatchObject({ + requesterSessionKey: "agent:main:main", + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: 
"run-flow-runtime-1", + status: "queued", + }); + expect(started.flow).toMatchObject({ + flowId: flow.flowId, + status: "waiting", + currentStep: "wait_for_classification", + waitingOnTaskId: started.task.taskId, + }); + expect(listTasksForFlowId(flow.flowId)).toHaveLength(1); + }); + }); + + it("stores outputs and waiting metadata across sqlite restore", async () => { + await withFlowRuntimeStateDir(async () => { + const flow = createFlow({ + ownerSessionKey: "agent:main:main", + goal: "Inbox routing", + }); + + const started = runTaskInFlow({ + flowId: flow.flowId, + runtime: "subagent", + childSessionKey: "agent:codex:subagent:child", + runId: "run-flow-runtime-restore", + task: "Bucket messages", + }); + + setFlowOutput({ + flowId: flow.flowId, + key: "classification", + value: { + business: 1, + personal: 2, + }, + }); + appendFlowOutput({ + flowId: flow.flowId, + key: "eod_summary", + value: { + subject: "Newsletter", + }, + }); + + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); + + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "waiting", + waitingOnTaskId: started.task.taskId, + outputs: { + classification: { + business: 1, + personal: 2, + }, + eod_summary: [ + { + subject: "Newsletter", + }, + ], + }, + }); + }); + }); + + it("reopens a blocked flow with resume and marks terminal states with finish/fail", async () => { + await withFlowRuntimeStateDir(async () => { + const flow = createFlow({ + ownerSessionKey: "agent:main:main", + goal: "Review inbox", + }); + const started = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-runtime-reopen", + task: "Review inbox", + }); + + updateFlowRecordById(flow.flowId, { + status: "blocked", + blockedTaskId: started.task.taskId, + blockedSummary: "Need auth.", + endedAt: 120, + }); + + expect(resumeFlow({ flowId: flow.flowId, currentStep: "retry_auth" 
})).toMatchObject({ + flowId: flow.flowId, + status: "running", + currentStep: "retry_auth", + }); + expect(getFlowById(flow.flowId)?.blockedTaskId).toBeUndefined(); + expect(getFlowById(flow.flowId)?.waitingOnTaskId).toBeUndefined(); + expect(getFlowById(flow.flowId)?.endedAt).toBeUndefined(); + + expect( + finishFlow({ flowId: flow.flowId, currentStep: "finish", endedAt: 200 }), + ).toMatchObject({ + flowId: flow.flowId, + status: "succeeded", + currentStep: "finish", + endedAt: 200, + }); + + const failed = createFlow({ + ownerSessionKey: "agent:main:main", + goal: "Failing flow", + }); + expect(failFlow({ flowId: failed.flowId, currentStep: "abort", endedAt: 300 })).toMatchObject( + { + flowId: failed.flowId, + status: "failed", + currentStep: "abort", + endedAt: 300, + }, + ); + }); + }); + + it("delivers explicit flow updates through the flow owner context when possible", async () => { + await withFlowRuntimeStateDir(async () => { + const flow = createFlow({ + ownerSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + threadId: "42", + }, + goal: "Inbox routing", + }); + + const result = await emitFlowUpdate({ + flowId: flow.flowId, + content: "Personal message needs your attention.", + eventKey: "personal-alert", + }); + + expect(result.delivery).toBe("direct"); + expect(mocks.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "telegram:123", + threadId: "42", + content: "Personal message needs your attention.", + idempotencyKey: `flow:${flow.flowId}:update:personal-alert`, + mirror: expect.objectContaining({ + sessionKey: "agent:main:main", + }), + }), + ); + }); + }); + + it("falls back to session-queued flow updates when direct delivery is unavailable", async () => { + await withFlowRuntimeStateDir(async () => { + const flow = createFlow({ + ownerSessionKey: "agent:main:main", + goal: "Inbox routing", + }); + + const result = await emitFlowUpdate({ + flowId: 
flow.flowId, + content: "Business email sent to Slack and waiting for reply.", + }); + + expect(result.delivery).toBe("session_queued"); + expect(mocks.enqueueSystemEventMock).toHaveBeenCalledWith( + "Business email sent to Slack and waiting for reply.", + expect.objectContaining({ + sessionKey: "agent:main:main", + contextKey: `flow:${flow.flowId}`, + }), + ); + expect(mocks.requestHeartbeatNowMock).toHaveBeenCalledWith({ + reason: "clawflow-update", + sessionKey: "agent:main:main", + }); + }); + }); +}); diff --git a/src/tasks/flow-runtime.ts b/src/tasks/flow-runtime.ts new file mode 100644 index 000000000000..0384dc5f139a --- /dev/null +++ b/src/tasks/flow-runtime.ts @@ -0,0 +1,377 @@ +import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; +import { enqueueSystemEvent } from "../infra/system-events.js"; +import { parseAgentSessionKey } from "../routing/session-key.js"; +import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { createFlowRecord, getFlowById, updateFlowRecordById } from "./flow-registry.js"; +import type { FlowOutputBag, FlowOutputValue, FlowRecord } from "./flow-registry.types.js"; +import { createQueuedTaskRun, createRunningTaskRun } from "./task-executor.js"; +import { listTasksForFlowId } from "./task-registry.js"; +import type { + TaskDeliveryStatus, + TaskNotifyPolicy, + TaskRecord, + TaskRuntime, +} from "./task-registry.types.js"; + +let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runtime.js")> | null = + null; + +type FlowTaskLaunch = "queued" | "running"; + +export type FlowUpdateDelivery = "direct" | "session_queued" | "parent_missing" | "failed"; + +function loadFlowDeliveryRuntime() { + deliveryRuntimePromise ??= import("./task-registry-delivery-runtime.js"); + return deliveryRuntimePromise; +} + +function requireFlow(flowId: string): FlowRecord { + const flow = getFlowById(flowId); + if (!flow) { + throw new Error(`Flow not found: ${flowId}`); + } + return flow; +} + +function requireLinearFlow(flowId: string): FlowRecord 
{ + const flow = requireFlow(flowId); + if (flow.shape !== "linear") { + throw new Error(`Flow is not linear: ${flowId}`); + } + return flow; +} + +function cloneOutputValue<T>(value: T): T { + return JSON.parse(JSON.stringify(value)) as T; +} + +function updateRequiredFlow( + flowId: string, + patch: Parameters<typeof updateFlowRecordById>[1], +): FlowRecord { + const updated = updateFlowRecordById(flowId, patch); + if (!updated) { + throw new Error(`Flow not found: ${flowId}`); + } + return updated; +} + +function resolveFlowOutputs(flow: FlowRecord): FlowOutputBag { + return flow.outputs ? cloneOutputValue(flow.outputs) : {}; +} + +function canDeliverFlowToRequesterOrigin(flow: FlowRecord): boolean { + const channel = flow.requesterOrigin?.channel?.trim(); + const to = flow.requesterOrigin?.to?.trim(); + return Boolean(channel && to && isDeliverableMessageChannel(channel)); +} + +export function createFlow(params: { + ownerSessionKey: string; + requesterOrigin?: FlowRecord["requesterOrigin"]; + goal: string; + notifyPolicy?: TaskNotifyPolicy; + currentStep?: string; + createdAt?: number; + updatedAt?: number; +}): FlowRecord { + return createFlowRecord({ + shape: "linear", + ownerSessionKey: params.ownerSessionKey, + requesterOrigin: params.requesterOrigin, + goal: params.goal, + notifyPolicy: params.notifyPolicy, + currentStep: params.currentStep, + status: "queued", + createdAt: params.createdAt, + updatedAt: params.updatedAt, + }); +} + +export function runTaskInFlow(params: { + flowId: string; + runtime: TaskRuntime; + sourceId?: string; + childSessionKey?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + preferMetadata?: boolean; + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; + launch?: FlowTaskLaunch; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; + currentStep?: string; +}): { flow: FlowRecord; task: TaskRecord } { + const flow = requireLinearFlow(params.flowId); + 
const launch = params.launch ?? "queued"; + const task = + launch === "running" + ? createRunningTaskRun({ + runtime: params.runtime, + sourceId: params.sourceId, + requesterSessionKey: flow.ownerSessionKey, + requesterOrigin: flow.requesterOrigin, + parentFlowId: flow.flowId, + childSessionKey: params.childSessionKey, + parentTaskId: params.parentTaskId, + agentId: params.agentId, + runId: params.runId, + label: params.label, + task: params.task, + preferMetadata: params.preferMetadata, + notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy, + deliveryStatus: params.deliveryStatus, + startedAt: params.startedAt, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + }) + : createQueuedTaskRun({ + runtime: params.runtime, + sourceId: params.sourceId, + requesterSessionKey: flow.ownerSessionKey, + requesterOrigin: flow.requesterOrigin, + parentFlowId: flow.flowId, + childSessionKey: params.childSessionKey, + parentTaskId: params.parentTaskId, + agentId: params.agentId, + runId: params.runId, + label: params.label, + task: params.task, + preferMetadata: params.preferMetadata, + notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy, + deliveryStatus: params.deliveryStatus, + }); + return { + task, + flow: updateRequiredFlow(flow.flowId, { + status: "waiting", + currentStep: params.currentStep ?? flow.currentStep ?? "wait_for_task", + waitingOnTaskId: task.taskId, + blockedTaskId: null, + blockedSummary: null, + endedAt: null, + updatedAt: task.lastEventAt ?? task.startedAt ?? 
Date.now(), + }), + }; +} + +export function setFlowWaiting(params: { + flowId: string; + currentStep?: string | null; + waitingOnTaskId?: string | null; + updatedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + if (params.waitingOnTaskId?.trim()) { + const waitingOnTaskId = params.waitingOnTaskId.trim(); + const linkedTaskIds = new Set(listTasksForFlowId(flow.flowId).map((task) => task.taskId)); + if (!linkedTaskIds.has(waitingOnTaskId)) { + throw new Error(`Flow ${flow.flowId} is not linked to task ${waitingOnTaskId}`); + } + } + return updateRequiredFlow(flow.flowId, { + status: "waiting", + currentStep: params.currentStep, + waitingOnTaskId: params.waitingOnTaskId, + endedAt: null, + updatedAt: params.updatedAt ?? Date.now(), + }); +} + +export function setFlowOutput(params: { + flowId: string; + key: string; + value: FlowOutputValue; + updatedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + const key = params.key.trim(); + if (!key) { + throw new Error("Flow output key is required."); + } + const outputs = resolveFlowOutputs(flow); + outputs[key] = cloneOutputValue(params.value); + return updateRequiredFlow(flow.flowId, { + outputs, + updatedAt: params.updatedAt ?? 
Date.now(), + }); +} + +export function appendFlowOutput(params: { + flowId: string; + key: string; + value: FlowOutputValue; + updatedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + const key = params.key.trim(); + if (!key) { + throw new Error("Flow output key is required."); + } + const outputs = resolveFlowOutputs(flow); + const nextValue = cloneOutputValue(params.value); + const current = outputs[key]; + if (current === undefined) { + outputs[key] = [nextValue]; + } else if (Array.isArray(current)) { + outputs[key] = [...current, nextValue]; + } else { + throw new Error(`Flow output ${key} is not an array.`); + } + return updateRequiredFlow(flow.flowId, { + outputs, + updatedAt: params.updatedAt ?? Date.now(), + }); +} + +export function resumeFlow(params: { + flowId: string; + currentStep?: string | null; + updatedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + return updateRequiredFlow(flow.flowId, { + status: "running", + currentStep: params.currentStep, + waitingOnTaskId: null, + blockedTaskId: null, + blockedSummary: null, + endedAt: null, + updatedAt: params.updatedAt ?? Date.now(), + }); +} + +export function finishFlow(params: { + flowId: string; + currentStep?: string | null; + updatedAt?: number; + endedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + const endedAt = params.endedAt ?? params.updatedAt ?? Date.now(); + return updateRequiredFlow(flow.flowId, { + status: "succeeded", + currentStep: params.currentStep, + waitingOnTaskId: null, + blockedTaskId: null, + blockedSummary: null, + updatedAt: params.updatedAt ?? endedAt, + endedAt, + }); +} + +export function failFlow(params: { + flowId: string; + currentStep?: string | null; + updatedAt?: number; + endedAt?: number; +}): FlowRecord { + const flow = requireLinearFlow(params.flowId); + const endedAt = params.endedAt ?? params.updatedAt ?? 
Date.now(); + return updateRequiredFlow(flow.flowId, { + status: "failed", + currentStep: params.currentStep, + waitingOnTaskId: null, + blockedTaskId: null, + blockedSummary: null, + updatedAt: params.updatedAt ?? endedAt, + endedAt, + }); +} + +export async function emitFlowUpdate(params: { + flowId: string; + content: string; + eventKey?: string; + currentStep?: string | null; + updatedAt?: number; +}): Promise<{ flow: FlowRecord; delivery: FlowUpdateDelivery }> { + const flow = requireFlow(params.flowId); + const content = params.content.trim(); + if (!content) { + throw new Error("Flow update content is required."); + } + const ownerSessionKey = flow.ownerSessionKey.trim(); + const updatedAt = params.updatedAt ?? Date.now(); + const updatedFlow = updateRequiredFlow(flow.flowId, { + currentStep: params.currentStep, + updatedAt, + }); + if (!ownerSessionKey) { + return { + flow: updatedFlow, + delivery: "parent_missing", + }; + } + if (!canDeliverFlowToRequesterOrigin(updatedFlow)) { + try { + enqueueSystemEvent(content, { + sessionKey: ownerSessionKey, + contextKey: `flow:${updatedFlow.flowId}`, + deliveryContext: updatedFlow.requesterOrigin, + }); + requestHeartbeatNow({ + reason: "clawflow-update", + sessionKey: ownerSessionKey, + }); + return { + flow: updatedFlow, + delivery: "session_queued", + }; + } catch { + return { + flow: updatedFlow, + delivery: "failed", + }; + } + } + try { + const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId; + const idempotencyKey = `flow:${updatedFlow.flowId}:update:${params.eventKey?.trim() || updatedAt}`; + const { sendMessage } = await loadFlowDeliveryRuntime(); + await sendMessage({ + channel: updatedFlow.requesterOrigin?.channel, + to: updatedFlow.requesterOrigin?.to ?? 
"", + accountId: updatedFlow.requesterOrigin?.accountId, + threadId: updatedFlow.requesterOrigin?.threadId, + content, + agentId: requesterAgentId, + idempotencyKey, + mirror: { + sessionKey: ownerSessionKey, + agentId: requesterAgentId, + idempotencyKey, + }, + }); + return { + flow: updatedFlow, + delivery: "direct", + }; + } catch { + try { + enqueueSystemEvent(content, { + sessionKey: ownerSessionKey, + contextKey: `flow:${updatedFlow.flowId}`, + deliveryContext: updatedFlow.requesterOrigin, + }); + requestHeartbeatNow({ + reason: "clawflow-update", + sessionKey: ownerSessionKey, + }); + return { + flow: updatedFlow, + delivery: "session_queued", + }; + } catch { + return { + flow: updatedFlow, + delivery: "failed", + }; + } + } +} diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index 6d0a7dc83890..d6c5091f9de3 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -12,6 +12,7 @@ const ALLOWED_IMPORTERS = new Set([ "commands/doctor-workspace-status.ts", "commands/flows.ts", "commands/tasks.ts", + "tasks/flow-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.maintenance.ts", ]);
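Note on the output bag semantics in `src/tasks/flow-runtime.ts`: `setFlowOutput` overwrites a key, while `appendFlowOutput` creates a one-element array for a missing key, grows an existing array, and throws when the key already holds a non-array value. The sketch below models only that behavior on a plain in-memory object so it can be run in isolation. The names `OutputBag`, `OutputValue`, `setOutput`, and `appendOutput` are hypothetical stand-ins, not the registry types or the exported runtime functions, and persistence, flow lookup, and the `updatedAt` bookkeeping are deliberately left out.

```typescript
// Hypothetical in-memory stand-in for a flow's output bag (not the real registry types).
type OutputValue =
  | null
  | boolean
  | number
  | string
  | OutputValue[]
  | { [key: string]: OutputValue };
type OutputBag = Record<string, OutputValue>;

// JSON round-trip clone, the same approach cloneOutputValue takes in the diff.
function cloneValue<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

// Keys are trimmed and must be non-empty, as in the runtime functions.
function requireKey(rawKey: string): string {
  const key = rawKey.trim();
  if (!key) {
    throw new Error("Flow output key is required.");
  }
  return key;
}

// Overwrite semantics: the last write for a key wins.
function setOutput(bag: OutputBag, rawKey: string, value: OutputValue): OutputBag {
  const next = cloneValue(bag);
  next[requireKey(rawKey)] = cloneValue(value);
  return next;
}

// Append semantics: a missing key becomes a one-element array,
// an existing array grows, and any other existing value is rejected.
function appendOutput(bag: OutputBag, rawKey: string, value: OutputValue): OutputBag {
  const key = requireKey(rawKey);
  const next = cloneValue(bag);
  const current = next[key];
  if (current === undefined) {
    next[key] = [cloneValue(value)];
  } else if (Array.isArray(current)) {
    next[key] = [...current, cloneValue(value)];
  } else {
    throw new Error(`Flow output ${key} is not an array.`);
  }
  return next;
}

// Usage mirroring the values in the restore test.
let bag: OutputBag = {};
bag = setOutput(bag, "classification", { business: 1, personal: 2 });
bag = appendOutput(bag, "eod_summary", { subject: "Newsletter" });
bag = appendOutput(bag, "eod_summary", { subject: "Invoice" });
console.log(JSON.stringify(bag, null, 2));
```

One consequence worth keeping in mind when authoring on top of this: a key first written with `setFlowOutput` as a scalar or object cannot later be appended to, so an authoring layer probably wants to pick set or append per key and stay with it.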