Compare commits

...

1 Commits

Author SHA1 Message Date
Peter Steinberger
f8f374717f feat: prioritize foreground command queue work 2026-05-10 10:09:53 +01:00
8 changed files with 311 additions and 22 deletions

View File

@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Auto-reply/queue: prioritize foreground user/manual turns ahead of lower-priority cron, heartbeat, memory, and deferred maintenance work within the same command lane, while preserving FIFO ordering within each priority and promoting old background entries to avoid starvation. Fixes #79589. Thanks @SebTardif.
- QA/Mantis: add Telegram live PR evidence automation with Convex-leased credentials, Crabbox transcript capture, motion GIF previews, and inline PR comments.
- QA/Mantis: add a Telegram desktop scenario builder that leases Crabbox, installs native Telegram Desktop, configures an OpenClaw Telegram gateway with leased bot credentials, and records VNC screenshot/video artifacts.
- Discord/voice: add realtime voice diagnostics for speaker turns, playback resets, barge-in detection, and audio cutoff analysis.

View File

@@ -15,9 +15,10 @@ We serialize inbound auto-reply runs (all channels) through a tiny in-process qu
## How it works
- A lane-aware FIFO queue drains each lane with a configurable concurrency cap (default 1 for unconfigured lanes; main defaults to 4, subagent to 8).
- A lane-aware queue drains each lane with a configurable concurrency cap (default 1 for unconfigured lanes; main defaults to 4, subagent to 8). Entries with the same priority remain FIFO; user/manual turns can jump ahead of lower-priority background work in the same lane.
- `runEmbeddedPiAgent` enqueues by **session key** (lane `session:<key>`) to guarantee only one active run per session.
- Each session run is then queued into a **global lane** (`main` by default) so overall parallelism is capped by `agents.defaults.maxConcurrent`.
- Priority is local to a lane. It does not interrupt an active run; it only chooses the next queued entry when a lane has capacity. A starvation guard promotes old low/normal-priority entries after a wait threshold.
- When verbose logging is enabled, queued runs emit a short notice if they waited more than ~2s before starting.
- Typing indicators still fire immediately on enqueue (when supported by the channel) so user experience is unchanged while we wait our turn.

View File

@@ -9,6 +9,7 @@ import type {
import { sleepWithAbort } from "../../infra/backoff.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { enqueueCommandInLane, getQueueSize } from "../../process/command-queue.js";
import { CommandPriority } from "../../process/command-queue.types.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import {
completeTaskRunByRunId,
@@ -315,6 +316,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
return await enqueueCommandInLane(
resolveSessionLane(rewriteSessionKey),
async () => await rewriteTranscriptEntriesInFile(),
{ priority: CommandPriority.Low },
);
}
return await rewriteTranscriptEntriesInFile();
@@ -569,18 +571,21 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
const schedulerAbort = createDeferredTurnMaintenanceAbortSignal();
let runPromise: Promise<void>;
try {
runPromise = enqueueCommandInLane(resolveDeferredTurnMaintenanceLane(sessionKey), async () =>
runDeferredTurnMaintenanceWorker({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey,
sessionFile: params.sessionFile,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
agentId: params.agentId,
config: params.config,
runId: task.runId!,
}),
runPromise = enqueueCommandInLane(
resolveDeferredTurnMaintenanceLane(sessionKey),
async () =>
runDeferredTurnMaintenanceWorker({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey,
sessionFile: params.sessionFile,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
agentId: params.agentId,
config: params.config,
runId: task.runId!,
}),
{ priority: CommandPriority.Low },
);
} catch (err) {
schedulerAbort.dispose();

View File

@@ -16,7 +16,10 @@ import { buildAgentHookContextChannelFields } from "../../plugins/hook-agent-con
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js";
import { enqueueCommandInLane } from "../../process/command-queue.js";
import type { CommandQueueEnqueueOptions } from "../../process/command-queue.types.js";
import {
CommandPriority,
type CommandQueueEnqueueOptions,
} from "../../process/command-queue.types.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { sanitizeForLog } from "../../terminal/ansi.js";
import { resolveUserPath } from "../../utils.js";
@@ -216,6 +219,23 @@ function withEmbeddedRunLaneTimeout(
return { ...opts, taskTimeoutMs: laneTaskTimeoutMs };
}
function resolveEmbeddedRunQueuePriority(params: RunEmbeddedPiAgentParams): CommandPriority {
if (params.queuePriority !== undefined) {
return params.queuePriority;
}
switch (params.trigger) {
case "user":
case "manual":
return CommandPriority.High;
case "cron":
case "heartbeat":
case "memory":
return CommandPriority.Low;
default:
return CommandPriority.Normal;
}
}
function normalizeEmbeddedRunAttemptResult(
attempt: EmbeddedRunAttemptForRunner,
): EmbeddedRunAttemptForRunner {
@@ -375,14 +395,21 @@ export async function runEmbeddedPiAgent(
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
const queuePriority = resolveEmbeddedRunQueuePriority(params);
const withQueuePriority = (opts?: CommandQueueEnqueueOptions): CommandQueueEnqueueOptions => ({
...opts,
priority: opts?.priority ?? queuePriority,
});
const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) =>
withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs);
withEmbeddedRunLaneTimeout(withQueuePriority(opts), laneTaskTimeoutMs);
const enqueueGlobal = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
params.enqueue
? params.enqueue(task, withLaneTimeout(opts))
: enqueueCommandInLane(globalLane, task, withLaneTimeout(opts));
const enqueueSession = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
params.enqueue ? params.enqueue(task, opts) : enqueueCommandInLane(sessionLane, task, opts);
params.enqueue
? params.enqueue(task, withQueuePriority(opts))
: enqueueCommandInLane(sessionLane, task, withQueuePriority(opts));
const channelHint = params.messageChannel ?? params.messageProvider;
const resolvedToolResultFormat =
params.toolResultFormat ??

View File

@@ -9,7 +9,10 @@ import type { ReplyOperation } from "../../../auto-reply/reply/reply-run-registr
import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../../config/types.openclaw.js";
import type { PromptImageOrderEntry } from "../../../media/prompt-image-order.js";
import type { CommandQueueEnqueueFn } from "../../../process/command-queue.types.js";
import type {
CommandPriority,
CommandQueueEnqueueFn,
} from "../../../process/command-queue.types.js";
import type { InputProvenance } from "../../../sessions/input-provenance.js";
import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.exec-types.js";
import type { AgentStreamParams, ClientToolDefinition } from "../../command/shared-types.js";
@@ -184,6 +187,7 @@ export type RunEmbeddedPiAgentParams = {
sessionKey?: string;
}) => void | Promise<void>;
lane?: string;
queuePriority?: CommandPriority;
enqueue?: CommandQueueEnqueueFn;
extraSystemPrompt?: string;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;

View File

@@ -1,5 +1,6 @@
import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { CommandPriority } from "./command-queue.types.js";
import { CommandLane } from "./lanes.js";
const diagnosticMocks = vi.hoisted(() => ({
@@ -23,6 +24,7 @@ type CommandQueueModule = typeof import("./command-queue.js");
let clearCommandLane: CommandQueueModule["clearCommandLane"];
let CommandLaneClearedError: CommandQueueModule["CommandLaneClearedError"];
let CommandLaneTaskTimeoutError: CommandQueueModule["CommandLaneTaskTimeoutError"];
let COMMAND_QUEUE_STARVATION_PROMOTION_MS: CommandQueueModule["COMMAND_QUEUE_STARVATION_PROMOTION_MS"];
let enqueueCommand: CommandQueueModule["enqueueCommand"];
let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"];
let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"];
@@ -68,6 +70,7 @@ describe("command queue", () => {
clearCommandLane,
CommandLaneClearedError,
CommandLaneTaskTimeoutError,
COMMAND_QUEUE_STARVATION_PROMOTION_MS,
enqueueCommand,
enqueueCommandInLane,
GatewayDrainingError,
@@ -133,6 +136,205 @@ describe("command queue", () => {
expect(getQueueSize()).toBe(0);
});
it("runs higher-priority queued tasks before lower-priority work", async () => {
const lane = `priority-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 0);
const calls: string[] = [];
const low = enqueueCommandInLane(
lane,
async () => {
calls.push("low");
return "low";
},
{ priority: CommandPriority.Low },
);
const high = enqueueCommandInLane(
lane,
async () => {
calls.push("high");
return "high";
},
{ priority: CommandPriority.High },
);
const normal = enqueueCommandInLane(
lane,
async () => {
calls.push("normal");
return "normal";
},
{ priority: CommandPriority.Normal },
);
setCommandLaneConcurrency(lane, 1);
await expect(Promise.all([low, high, normal])).resolves.toEqual(["low", "high", "normal"]);
expect(calls).toEqual(["high", "normal", "low"]);
});
it("preserves FIFO order within the same priority", async () => {
const lane = `priority-fifo-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 0);
const calls: number[] = [];
const first = enqueueCommandInLane(
lane,
async () => {
calls.push(1);
return 1;
},
{ priority: CommandPriority.High },
);
const second = enqueueCommandInLane(
lane,
async () => {
calls.push(2);
return 2;
},
{ priority: CommandPriority.High },
);
setCommandLaneConcurrency(lane, 1);
await expect(Promise.all([first, second])).resolves.toEqual([1, 2]);
expect(calls).toEqual([1, 2]);
});
it("promotes old lower-priority work to avoid starvation", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(0);
const lane = `priority-starvation-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 0);
const calls: string[] = [];
const low = enqueueCommandInLane(
lane,
async () => {
calls.push("low");
return "low";
},
{ priority: CommandPriority.Low },
);
vi.setSystemTime(COMMAND_QUEUE_STARVATION_PROMOTION_MS);
const high = enqueueCommandInLane(
lane,
async () => {
calls.push("high");
return "high";
},
{ priority: CommandPriority.High },
);
setCommandLaneConcurrency(lane, 1);
await vi.runAllTimersAsync();
await expect(Promise.all([low, high])).resolves.toEqual(["low", "high"]);
expect(calls).toEqual(["low", "high"]);
} finally {
vi.useRealTimers();
}
});
it("normalizes legacy and malformed priorities when selecting queued work", async () => {
const key = Symbol.for("openclaw.commandQueueState");
const globalStore = globalThis as Record<PropertyKey, unknown>;
const original = globalStore[key];
const lane = `priority-legacy-${Date.now()}-${Math.random().toString(16).slice(2)}`;
const calls: string[] = [];
try {
let resolveLegacy!: (value: string) => void;
let rejectLegacy!: (reason?: unknown) => void;
const legacy = new Promise<string>((resolve, reject) => {
resolveLegacy = resolve;
rejectLegacy = reject;
});
let resolveMalformed!: (value: string) => void;
let rejectMalformed!: (reason?: unknown) => void;
const malformed = new Promise<string>((resolve, reject) => {
resolveMalformed = resolve;
rejectMalformed = reject;
});
let resolveHigh!: (value: string) => void;
let rejectHigh!: (reason?: unknown) => void;
const high = new Promise<string>((resolve, reject) => {
resolveHigh = resolve;
rejectHigh = reject;
});
globalStore[key] = {
gatewayDraining: false,
lanes: new Map([
[
lane,
{
lane,
queue: [
{
task: async () => {
calls.push("legacy");
return "legacy";
},
resolve: resolveLegacy,
reject: rejectLegacy,
enqueuedAt: Date.now(),
warnAfterMs: 2_000,
},
{
task: async () => {
calls.push("malformed");
return "malformed";
},
resolve: resolveMalformed,
reject: rejectMalformed,
enqueuedAt: Date.now(),
priority: Number.NaN,
warnAfterMs: 2_000,
},
{
task: async () => {
calls.push("high");
return "high";
},
resolve: resolveHigh,
reject: rejectHigh,
enqueuedAt: Date.now(),
priority: CommandPriority.High,
warnAfterMs: 2_000,
},
],
activeTaskIds: new Set(),
maxConcurrent: 0,
draining: false,
generation: 0,
},
],
]),
activeTaskWaiters: new Set(),
nextTaskId: 1,
};
setCommandLaneConcurrency(lane, 1);
await expect(Promise.all([legacy, malformed, high])).resolves.toEqual([
"legacy",
"malformed",
"high",
]);
expect(calls).toEqual(["high", "legacy", "malformed"]);
} finally {
if (original !== undefined) {
globalStore[key] = original;
} else {
delete globalStore[key];
}
resetCommandQueueStateForTest();
}
});
it("logs enqueue depth after push", async () => {
const task = enqueueCommand(async () => {});

View File

@@ -4,7 +4,7 @@ import {
logLaneEnqueue,
} from "../logging/diagnostic-runtime.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import type { CommandQueueEnqueueOptions } from "./command-queue.types.js";
import { CommandPriority, type CommandQueueEnqueueOptions } from "./command-queue.types.js";
import { CommandLane } from "./lanes.js";
/**
* Dedicated error type thrown when a queued command is rejected because
@@ -46,11 +46,14 @@ export class GatewayDrainingError extends Error {
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
// the main auto-reply workflow.
export const COMMAND_QUEUE_STARVATION_PROMOTION_MS = 30_000;
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
enqueuedAt: number;
priority?: CommandPriority;
warnAfterMs: number;
taskTimeoutMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
@@ -117,6 +120,40 @@ function getLaneDepth(state: LaneState): number {
return state.queue.length + state.activeTaskIds.size;
}
function normalizeCommandPriority(value: unknown): CommandPriority {
if (value === CommandPriority.Low) {
return CommandPriority.Low;
}
if (value === CommandPriority.High) {
return CommandPriority.High;
}
return CommandPriority.Normal;
}
function resolveEffectiveCommandPriority(entry: QueueEntry, now: number): CommandPriority {
const priority = normalizeCommandPriority(entry.priority);
if (
priority < CommandPriority.High &&
now - entry.enqueuedAt >= COMMAND_QUEUE_STARVATION_PROMOTION_MS
) {
return CommandPriority.High;
}
return priority;
}
function pickNextQueueEntryIndex(queue: QueueEntry[], now: number): number {
let bestIndex = 0;
let bestPriority = CommandPriority.Low - 1;
for (let index = 0; index < queue.length; index += 1) {
const priority = resolveEffectiveCommandPriority(queue[index], now);
if (priority > bestPriority) {
bestPriority = priority;
bestIndex = index;
}
}
return bestIndex;
}
function createCommandLaneSnapshot(state: LaneState): CommandLaneSnapshot {
return {
lane: state.lane,
@@ -243,16 +280,18 @@ function drainLane(lane: string) {
const pump = () => {
try {
while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
const entry = state.queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
const now = Date.now();
const entryIndex = pickNextQueueEntryIndex(state.queue, now);
const [entry] = state.queue.splice(entryIndex, 1) as [QueueEntry];
const waitedMs = now - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
try {
entry.onWait?.(waitedMs, state.queue.length);
entry.onWait?.(waitedMs, entryIndex);
} catch (err) {
diag.error(`lane onWait callback failed: lane=${lane} error="${String(err)}"`);
}
diag.warn(
`lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
`lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${entryIndex}`,
);
}
logLaneDequeue(lane, waitedMs, state.queue.length);
@@ -337,6 +376,7 @@ export function enqueueCommandInLane<T>(
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
priority: normalizeCommandPriority(opts?.priority),
warnAfterMs,
taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs),
onWait: opts?.onWait,

View File

@@ -1,4 +1,13 @@
export const CommandPriority = {
Low: 0,
Normal: 1,
High: 2,
} as const;
export type CommandPriority = (typeof CommandPriority)[keyof typeof CommandPriority];
export type CommandQueueEnqueueOptions = {
priority?: CommandPriority;
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
taskTimeoutMs?: number;