fix(cron): isolate main-session cron wake lanes (#82767)

* fix(cron): isolate main-session cron wake lanes

* test(cron): expect dedicated main cron lanes

* fix(cron): route global main cron wakes

* docs(changelog): note cron main-session lane fix

---------

Co-authored-by: Galin Iliev <Galin.Iliev@microsoft.com>
Co-authored-by: Galin Iliev <5711535+galiniliev@users.noreply.github.com>
This commit is contained in:
Galin Iliev
2026-05-19 21:12:28 -07:00
committed by GitHub
parent a54c73687f
commit 9eee202a69
11 changed files with 312 additions and 47 deletions

View File

@@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai
- Plugins/hooks: apply a default 30-second timeout to `before_compaction` and `after_compaction` hooks so a hung plugin handler no longer blocks compaction completion. (#84153)
- Discord: preserve disabled presentation buttons when adapting and rendering Discord message controls. (#84188) Thanks @100menotu001.
- Twitch: add a test-only client-manager registry reset helper so non-isolated Twitch tests can clear cached managers between cases. Fixes #83887. (#84244) Thanks @hclsys.
- Cron: run main-session scheduled work on a cron-owned wake lane while preserving reply delivery context, so background cron turns no longer block human main-session chat. Fixes #82766. (#82767) Thanks @galiniliev.
- Cron: use structured embedded-run denial metadata for isolated scheduled tasks so blocked exec requests fail the job without treating ordinary assistant prose as a denial. (#84067) Thanks @abnershang.
- Cron: keep recovered tool warnings diagnostic for successful scheduled runs so final cron output is delivered instead of being replaced by a post-processing warning. (#84045) Thanks @abnershang.
- Plugins/perf: thread explicit plugin discovery results through `loadBundledCapabilityRuntimeRegistry`, `resolveBundledPluginSources`, and `listChannelCatalogEntries` so callers that already hold a discovery result skip redundant filesystem walks. Thanks @SebTardif.

View File

@@ -90,14 +90,14 @@ This fires ~56 times per month instead of 01 times per month. OpenClaw use
| Style | `--session` value | Runs in | Best for |
| --------------- | ------------------- | ------------------------ | ------------------------------- |
| Main session | `main` | Next heartbeat turn | Reminders, system events |
| Main session | `main` | Dedicated cron wake lane | Reminders, system events |
| Isolated | `isolated` | Dedicated `cron:<jobId>` | Reports, background chores |
| Current session | `current` | Bound at creation time | Context-aware recurring work |
| Custom session | `session:custom-id` | Persistent named session | Workflows that build on history |
<AccordionGroup>
<Accordion title="Main session vs isolated vs custom">
**Main session** jobs enqueue a system event and optionally wake the heartbeat (`--wake now` or `--wake next-heartbeat`). Those system events do not extend daily/idle reset freshness for the target session. **Isolated** jobs run a dedicated agent turn with a fresh session. **Custom sessions** (`session:xxx`) persist context across runs, enabling workflows like daily standups that build on previous summaries.
**Main session** jobs enqueue a system event into a cron-owned run lane and optionally wake the heartbeat (`--wake now` or `--wake next-heartbeat`). They can use the target main session's last delivery context for replies, but they do not append routine cron turns to the human chat lane and do not extend daily/idle reset freshness for the target session. **Isolated** jobs run a dedicated agent turn with a fresh session. **Custom sessions** (`session:xxx`) persist context across runs, enabling workflows like daily standups that build on previous summaries.
</Accordion>
<Accordion title="What 'fresh session' means for isolated jobs">
For isolated jobs, "fresh session" means a new transcript/session id for each run. OpenClaw may carry safe preferences such as thinking/fast/verbose settings, labels, and explicit user-selected model/auth overrides, but it does not inherit ambient conversation context from an older cron row: channel/group routing, send or queue policy, elevation, origin, or ACP runtime binding. Use `current` or `session:<id>` when a recurring job should deliberately build on the same conversation context.

View File

@@ -12,6 +12,10 @@ const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness();
installCronTestHooks({ logger: noopLogger });
function expectCronRunSessionKey(value: unknown, jobId: string) {
expect(value).toMatch(new RegExp(`^agent:main:cron:${jobId}:run:\\d+$`));
}
describe("CronService interval/cron jobs fire on time", () => {
const runLateTimerAndLoadJob = async ({
cron,
@@ -34,6 +38,7 @@ describe("CronService interval/cron jobs fire on time", () => {
const expectMainSystemEvent = (
enqueueSystemEvent: ReturnType<typeof vi.fn>,
expectedText: string,
jobId: string,
) => {
const matchingCall = enqueueSystemEvent.mock.calls.find(([text]) => text === expectedText);
if (!matchingCall) {
@@ -41,7 +46,7 @@ describe("CronService interval/cron jobs fire on time", () => {
}
const options = matchingCall[1] as Record<string, unknown>;
expect(options.agentId).toBeUndefined();
expect(options.sessionKey).toBeUndefined();
expectCronRunSessionKey(options.sessionKey, jobId);
expect(typeof options.contextKey).toBe("string");
expect(String(options.contextKey).startsWith("cron:")).toBe(true);
};
@@ -85,7 +90,7 @@ describe("CronService interval/cron jobs fire on time", () => {
jobId: job.id,
firstDueAt,
});
expectMainSystemEvent(enqueueSystemEvent, "tick");
expectMainSystemEvent(enqueueSystemEvent, "tick", job.id);
expect(updated?.state.lastStatus).toBe("ok");
// nextRunAtMs must advance by at least one full interval past the due time.
expect(updated?.state.nextRunAtMs).toBeGreaterThanOrEqual(firstDueAt + 10_000);
@@ -122,7 +127,7 @@ describe("CronService interval/cron jobs fire on time", () => {
jobId: job.id,
firstDueAt,
});
expectMainSystemEvent(enqueueSystemEvent, "cron-tick");
expectMainSystemEvent(enqueueSystemEvent, "cron-tick", job.id);
expect(updated?.state.lastStatus).toBe("ok");
// nextRunAtMs should be the next whole-minute boundary (60s later).
expect(updated?.state.nextRunAtMs).toBe(firstDueAt + 60_000);

View File

@@ -43,7 +43,7 @@ describe("cron main job passes heartbeat target=last", () => {
runHeartbeatOnce: params.runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
return { cron, requestHeartbeat };
return { cron, enqueueSystemEvent, requestHeartbeat };
}
function requireRunHeartbeatOnceCall(
@@ -66,6 +66,7 @@ describe("cron main job passes heartbeat target=last", () => {
source?: string;
intent?: string;
reason?: string;
sessionKey?: string;
heartbeat?: unknown;
};
}
@@ -109,6 +110,8 @@ describe("cron main job passes heartbeat target=last", () => {
// heartbeat runner delivers the response to the last active channel.
const callArgs = requireRunHeartbeatOnceCall(runHeartbeatOnce);
expect(callArgs.heartbeat.target).toBe("last");
expect(callArgs.sessionKey).toMatch(/^agent:main:cron:test-main-delivery:run:\d+$/);
expect(callArgs.sessionKey).not.toBe("agent:main:main");
});
it("should preserve heartbeat.target=last when wakeMode=now falls back to requestHeartbeat", async () => {
@@ -140,6 +143,10 @@ describe("cron main job passes heartbeat target=last", () => {
expect(heartbeatRequest.source).toBe("cron");
expect(heartbeatRequest.intent).toBe("immediate");
expect(heartbeatRequest.reason).toBe("cron:test-main-delivery-busy");
expect(heartbeatRequest.sessionKey).toMatch(
/^agent:main:cron:test-main-delivery-busy:run:\d+$/,
);
expect(heartbeatRequest.sessionKey).not.toBe("agent:main:main");
expect(heartbeatRequest.heartbeat).toEqual({ target: "last" });
});
@@ -160,7 +167,7 @@ describe("cron main job passes heartbeat target=last", () => {
durationMs: 50,
}));
const { cron, requestHeartbeat } = createCronWithSpies({
const { cron, enqueueSystemEvent, requestHeartbeat } = createCronWithSpies({
storePath,
runHeartbeatOnce,
});
@@ -172,7 +179,11 @@ describe("cron main job passes heartbeat target=last", () => {
expect(heartbeatRequest.source).toBe("cron");
expect(heartbeatRequest.intent).toBe("event");
expect(heartbeatRequest.reason).toBe("cron:test-next-heartbeat");
expect(heartbeatRequest.sessionKey).toMatch(/^agent:main:cron:test-next-heartbeat:run:\d+$/);
expect(heartbeatRequest.sessionKey).not.toBe("agent:main:main");
expect(heartbeatRequest.heartbeat).toEqual({ target: "last" });
expect(runHeartbeatOnce).not.toHaveBeenCalled();
const enqueueOptions = enqueueSystemEvent.mock.calls[0]?.[1] as { sessionKey?: string };
expect(enqueueOptions.sessionKey).toBe(heartbeatRequest.sessionKey);
});
});

View File

@@ -19,6 +19,10 @@ const { makeStorePath } = createCronStoreHarness({
prefix: "openclaw-cron-runs-one-shot-",
});
function expectCronRunSessionKey(value: unknown, jobId: string) {
expect(value).toMatch(new RegExp(`^agent:main:cron:${jobId}:run:\\d+$`));
}
function createCronEventHarness() {
const events: CronEvent[] = [];
const waiters: Array<{
@@ -202,27 +206,33 @@ async function addMainOneShotHelloJob(
function expectMainSystemEventPosted(
enqueueSystemEvent: ReturnType<typeof vi.fn>,
params: { text: string; jobId: string; sessionKey?: string },
params: { text: string; jobId: string },
) {
expect(enqueueSystemEvent).toHaveBeenCalledWith(params.text, {
const matchingCall = enqueueSystemEvent.mock.calls.find(([text]) => text === params.text);
if (!matchingCall) {
throw new Error(`missing system event ${params.text}`);
}
const options = matchingCall[1] as Record<string, unknown>;
expect(options).toMatchObject({
agentId: undefined,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
contextKey: `cron:${params.jobId}`,
});
expectCronRunSessionKey(options.sessionKey, params.jobId);
}
function expectQueuedCronHeartbeat(
requestHeartbeat: ReturnType<typeof vi.fn>,
params: { jobId: string; sessionKey?: string },
params: { jobId: string },
) {
expect(requestHeartbeat).toHaveBeenCalledWith({
const request = requestHeartbeat.mock.calls[0]?.[0] as Record<string, unknown> | undefined;
expect(request).toMatchObject({
source: "cron",
intent: "immediate",
reason: `cron:${params.jobId}`,
agentId: undefined,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
heartbeat: { target: "last" },
});
expectCronRunSessionKey(request?.sessionKey, params.jobId);
}
async function stopCronAndCleanup(cron: CronService, store: { cleanup: () => Promise<void> }) {
@@ -403,7 +413,7 @@ describe("CronService", () => {
await cron.run(job.id, "force");
expect(runHeartbeatOnce).toHaveBeenCalled();
expectQueuedCronHeartbeat(requestHeartbeat, { jobId: job.id, sessionKey });
expectQueuedCronHeartbeat(requestHeartbeat, { jobId: job.id });
expect(job.state.lastStatus).toBe("ok");
expect(job.state.lastError).toBeUndefined();
@@ -430,7 +440,7 @@ describe("CronService", () => {
await cron.run(job.id, "force");
expect(runHeartbeatOnce).toHaveBeenCalledTimes(1);
expectQueuedCronHeartbeat(requestHeartbeat, { jobId: job.id, sessionKey });
expectQueuedCronHeartbeat(requestHeartbeat, { jobId: job.id });
expect(job.state.lastStatus).toBe("ok");
expect(job.state.lastError).toBeUndefined();

View File

@@ -1,5 +1,6 @@
import type { CronConfig } from "../../config/types.cron.js";
import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js";
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
import type {
CronAgentExecutionPhaseUpdate,
CronAgentExecutionStarted,
@@ -82,6 +83,7 @@ export type CronServiceDeps = {
agentId?: string;
sessionKey?: string;
contextKey?: string;
deliveryContext?: DeliveryContext;
forceSenderIsOwnerFalse?: boolean;
},
) => void;

View File

@@ -1,8 +1,9 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
import { createCronServiceState } from "../../cron/service/state.js";
import { onTimer } from "../../cron/service/timer.js";
import { executeJobCore, onTimer } from "../../cron/service/timer.js";
import { loadCronStore, saveCronStore } from "../../cron/store.js";
import type { CronJob } from "../../cron/types.js";
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
@@ -49,6 +50,62 @@ afterEach(() => {
});
describe("cron service timer seam coverage", () => {
it("routes main cron jobs onto a cron run lane derived from the target agent", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
const enqueueSystemEvent = vi.fn();
const requestHeartbeat = vi.fn();
const runHeartbeatOnce = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 }));
const job = {
...createDueMainJob({ now, wakeMode: "now" }),
sessionKey: "agent:main-pr-router:main",
state: { runningAtMs: now },
};
const cronRunSessionKey = `agent:main-pr-router:cron:main-heartbeat-job:run:${now}`;
const sessionStorePath = path.join(path.dirname(path.dirname(storePath)), "sessions.json");
await fs.writeFile(
sessionStorePath,
JSON.stringify({
"agent:main-pr-router:main": {
lastChannel: "discord",
lastTo: "channel-1",
lastAccountId: "default",
},
}),
"utf8",
);
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
resolveSessionStorePath: () => sessionStorePath,
enqueueSystemEvent,
requestHeartbeat,
runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
const result = await executeJobCore(state, job);
expect(result).toMatchObject({ status: "ok", sessionKey: cronRunSessionKey });
expect(enqueueSystemEvent).toHaveBeenCalledWith("heartbeat seam tick", {
agentId: undefined,
sessionKey: cronRunSessionKey,
contextKey: "cron:main-heartbeat-job",
deliveryContext: { channel: "discord", to: "channel-1", accountId: "default" },
});
expect(runHeartbeatOnce).toHaveBeenCalledWith({
source: "cron",
intent: "immediate",
reason: "cron:main-heartbeat-job",
agentId: undefined,
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
});
it("persists the next schedule and hands off next-heartbeat main jobs", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
@@ -73,9 +130,10 @@ describe("cron service timer seam coverage", () => {
await onTimer(state);
const cronRunSessionKey = `agent:main:cron:main-heartbeat-job:run:${now}`;
expect(enqueueSystemEvent).toHaveBeenCalledWith("heartbeat seam tick", {
agentId: undefined,
sessionKey: "agent:main:main",
sessionKey: cronRunSessionKey,
contextKey: "cron:main-heartbeat-job",
});
expect(requestHeartbeat).toHaveBeenCalledWith({
@@ -83,7 +141,7 @@ describe("cron service timer seam coverage", () => {
intent: "event",
reason: "cron:main-heartbeat-job",
agentId: undefined,
sessionKey: "agent:main:main",
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
@@ -103,7 +161,7 @@ describe("cron service timer seam coverage", () => {
expect(task.sourceId).toBe("main-heartbeat-job");
expect(task.ownerKey).toBe("");
expect(task.scopeKind).toBe("system");
expect(task.childSessionKey).toBe("agent:main:main");
expect(task.childSessionKey).toBe(cronRunSessionKey);
expect(task.runId).toBe(`cron:main-heartbeat-job:${now}`);
expect(task.label).toBe("main heartbeat job");
expect(task.task).toBe("main heartbeat job");
@@ -201,9 +259,10 @@ describe("cron service timer seam coverage", () => {
{ jobId: "main-heartbeat-job", error: ledgerError },
"cron: failed to create task ledger record",
);
const cronRunSessionKey = `agent:main:cron:main-heartbeat-job:run:${now}`;
expect(enqueueSystemEvent).toHaveBeenCalledWith("heartbeat seam tick", {
agentId: undefined,
sessionKey: "agent:main:main",
sessionKey: cronRunSessionKey,
contextKey: "cron:main-heartbeat-job",
});

View File

@@ -6,13 +6,21 @@ import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
isRetryableHeartbeatBusySkipReason,
} from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID, isSubagentSessionKey } from "../../routing/session-key.js";
import { loadSessionStore } from "../../config/sessions/store-load.js";
import {
DEFAULT_AGENT_ID,
isSubagentSessionKey,
normalizeAgentId,
resolveAgentIdFromSessionKey,
} from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/detached-task-runtime.js";
import { deliveryContextFromSession } from "../../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js";
import { resolveCronAgentSessionKey } from "../isolated-agent/session-key.js";
@@ -420,10 +428,55 @@ export function normalizeCronRunErrorText(err: unknown): string {
return String(err);
}
function normalizeCronLaneSegment(value: string | undefined, fallback: string): string {
const normalized = normalizeOptionalLowercaseString(value)
?.replace(/[^a-z0-9_-]+/g, "-")
.replace(/^-+|-+$/g, "")
.slice(0, 64);
return normalized || fallback;
}
function resolveMainSessionCronRunSessionKey(job: CronJob, startedAt: number): string {
const explicitAgentId = job.agentId?.trim();
const agentId = normalizeAgentId(
explicitAgentId || resolveAgentIdFromSessionKey(job.sessionKey),
);
const jobSegment = normalizeCronLaneSegment(job.id, "job");
const runSegment = normalizeCronLaneSegment(String(Math.max(0, Math.floor(startedAt))), "run");
return `agent:${agentId}:cron:${jobSegment}:run:${runSegment}`;
}
function resolveMainSessionCronDeliveryContext(
state: CronServiceState,
job: CronJob,
): DeliveryContext | undefined {
const targetSessionKey = job.sessionKey?.trim();
if (!targetSessionKey) {
return undefined;
}
const explicitAgentId = job.agentId?.trim();
const agentId = normalizeAgentId(
explicitAgentId || resolveAgentIdFromSessionKey(targetSessionKey),
);
const storePath = state.deps.resolveSessionStorePath?.(agentId) ?? state.deps.sessionStorePath;
if (!storePath) {
return undefined;
}
try {
return deliveryContextFromSession(loadSessionStore(storePath)[targetSessionKey]);
} catch {
return undefined;
}
}
function resolveCronTaskChildSessionKey(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
if (params.job.sessionTarget === "main") {
return resolveMainSessionCronRunSessionKey(params.job, params.startedAt);
}
const explicitSessionKey = params.job.sessionKey?.trim();
if (explicitSessionKey) {
return explicitSessionKey;
@@ -1706,11 +1759,15 @@ async function executeMainSessionCronJob(
: 'main job requires payload.kind="systemEvent"',
};
}
const targetMainSessionKey = job.sessionKey;
const cronStartedAt =
typeof job.state.runningAtMs === "number" ? job.state.runningAtMs : state.deps.nowMs();
const cronRunSessionKey = resolveMainSessionCronRunSessionKey(job, cronStartedAt);
const deliveryContext = resolveMainSessionCronDeliveryContext(state, job);
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: targetMainSessionKey,
sessionKey: cronRunSessionKey,
contextKey: `cron:${job.id}`,
...(deliveryContext ? { deliveryContext } : {}),
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
@@ -1728,7 +1785,7 @@ async function executeMainSessionCronJob(
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
if (
@@ -1744,10 +1801,10 @@ async function executeMainSessionCronJob(
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
return { status: "ok", summary: text, sessionKey: cronRunSessionKey };
}
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
@@ -1761,21 +1818,31 @@ async function executeMainSessionCronJob(
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
return { status: "ok", summary: text, sessionKey: cronRunSessionKey };
}
await waitWithAbort(retryDelayMs);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
return { status: "ok", summary: text, sessionKey: cronRunSessionKey };
}
if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
return {
status: "skipped",
error: heartbeatResult.reason,
summary: text,
sessionKey: cronRunSessionKey,
};
}
return { status: "error", error: heartbeatResult.reason, summary: text };
return {
status: "error",
error: heartbeatResult.reason,
summary: text,
sessionKey: cronRunSessionKey,
};
}
if (abortSignal?.aborted) {
@@ -1786,10 +1853,10 @@ async function executeMainSessionCronJob(
intent: job.wakeMode === "now" ? "immediate" : "event",
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
sessionKey: cronRunSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
return { status: "ok", summary: text, sessionKey: cronRunSessionKey };
}
async function executeDetachedCronJob(

View File

@@ -136,6 +136,10 @@ function callArg(
return call[argIndex];
}
function expectMainCronRunSessionKey(value: unknown, jobId: string) {
expect(value).toMatch(new RegExp(`^agent:main:cron:${jobId}:run:\\d+$`));
}
function lastMockCall(mock: { mock: { calls: Array<Array<unknown>> } }, label: string) {
const calls = mock.mock.calls;
const call = calls[calls.length - 1];
@@ -365,14 +369,102 @@ describe("buildGatewayCronService", () => {
await state.cron.run(job.id, "force");
expect(callArg(enqueueSystemEventMock, 0, 0, "system event text")).toBe("hello");
expect(
requireRecord(callArg(enqueueSystemEventMock, 0, 1, "system event options"), "options")
.sessionKey,
).toBe("agent:main:discord:channel:ops");
expect(
requireRecord(callArg(requestHeartbeatMock, 0, 0, "heartbeat request"), "request")
.sessionKey,
).toBe("agent:main:discord:channel:ops");
const eventOptions = requireRecord(
callArg(enqueueSystemEventMock, 0, 1, "system event options"),
"options",
);
expectMainCronRunSessionKey(eventOptions.sessionKey, job.id);
const heartbeatRequest = requireRecord(
callArg(requestHeartbeatMock, 0, 0, "heartbeat request"),
"request",
);
expectMainCronRunSessionKey(heartbeatRequest.sessionKey, job.id);
} finally {
state.cron.stop();
}
});
it("routes global-scope main cron jobs through the global queue for queued wakes", async () => {
const cfg = {
...createCronConfig("server-cron-global-queued"),
session: { mainKey: "main", scope: "global" },
} as OpenClawConfig;
loadConfigMock.mockReturnValue(cfg);
const state = buildGatewayCronService({
cfg,
deps: {} as CliDeps,
broadcast: () => {},
});
try {
const job = await state.cron.add({
name: "global-queued",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello global" },
});
await state.cron.run(job.id, "force");
expect(callArg(enqueueSystemEventMock, 0, 0, "system event text")).toBe("hello global");
const eventOptions = requireRecord(
callArg(enqueueSystemEventMock, 0, 1, "system event options"),
"options",
);
expect(eventOptions.sessionKey).toBe("global");
const heartbeatRequest = requireRecord(
callArg(requestHeartbeatMock, 0, 0, "heartbeat request"),
"request",
);
expect(heartbeatRequest.agentId).toBe("main");
expect(heartbeatRequest.sessionKey).toBe("global");
} finally {
state.cron.stop();
}
});
it("routes global-scope immediate main cron jobs through the global heartbeat lane", async () => {
const cfg = {
...createCronConfig("server-cron-global-now"),
session: { mainKey: "main", scope: "global" },
} as OpenClawConfig;
loadConfigMock.mockReturnValue(cfg);
const state = buildGatewayCronService({
cfg,
deps: {} as CliDeps,
broadcast: () => {},
});
try {
const job = await state.cron.add({
name: "global-now",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello now" },
});
await state.cron.run(job.id, "force");
const eventOptions = requireRecord(
callArg(enqueueSystemEventMock, 0, 1, "system event options"),
"options",
);
expect(eventOptions.sessionKey).toBe("global");
const heartbeatRun = requireRecord(
callArg(runHeartbeatOnceMock, 0, 0, "heartbeat run options"),
"heartbeat run options",
);
expect(heartbeatRun.agentId).toBe("main");
expect(heartbeatRun.sessionKey).toBe("global");
expect(heartbeatRun.heartbeat).toEqual({
target: "last",
to: undefined,
accountId: undefined,
});
} finally {
state.cron.stop();
}
@@ -529,7 +621,7 @@ describe("buildGatewayCronService", () => {
callArg(requestHeartbeatMock, 0, 0, "heartbeat request"),
"heartbeat request",
);
expect(call.sessionKey).toBe("agent:main:telegram:group:123:topic:456");
expectMainCronRunSessionKey(call.sessionKey, job.id);
expect(call.heartbeat).toEqual({
target: "last",
to: undefined,

View File

@@ -34,7 +34,11 @@ import type {
PluginHookGatewayCronService,
PluginHookGatewayContext,
} from "../plugins/hook-types.js";
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
import {
normalizeAgentId,
resolveEventSessionKey,
toAgentStoreSessionKey,
} from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
import {
@@ -228,13 +232,21 @@ export function buildGatewayCronService(params: {
requestedAgentId ?? derivedAgentId,
);
const agentId = resolvedAgentId || undefined;
const sessionKey = agentId
const resolvedSessionKey = agentId
? resolveCronSessionKey({
runtimeConfig,
agentId,
requestedSessionKey,
})
: undefined;
const sessionKey =
resolvedSessionKey && runtimeConfig.session?.scope === "global"
? resolveEventSessionKey(
resolvedSessionKey,
runtimeConfig.session?.mainKey,
runtimeConfig.session?.scope,
)
: resolvedSessionKey;
return { runtimeConfig, agentId, sessionKey };
};
@@ -301,6 +313,7 @@ export function buildGatewayCronService(params: {
enqueueSystemEvent(text, {
sessionKey,
contextKey: opts?.contextKey,
deliveryContext: opts?.deliveryContext,
forceSenderIsOwnerFalse: opts?.forceSenderIsOwnerFalse,
trusted: opts?.forceSenderIsOwnerFalse !== true,
});

View File

@@ -6,6 +6,7 @@ import { afterAll, beforeEach, describe, expect, test, vi } from "vitest";
import type WebSocket from "ws";
import { resetConfigRuntimeState } from "../config/config.js";
import type { GuardedFetchOptions } from "../infra/net/fetch-guard.js";
import { peekSystemEvents } from "../infra/system-events.js";
import type { GatewayCronState } from "./server-cron.js";
import {
connectOk,
@@ -15,7 +16,6 @@ import {
rpcReq,
startServerWithClient,
testState,
waitForSystemEvent,
} from "./test-helpers.js";
const fetchWithSsrFGuardMock = vi.hoisted(() =>
@@ -431,7 +431,8 @@ describe("gateway server cron", () => {
cronEnabled: false,
});
const cronState = await createDirectCronState();
const cronEvents = createCronEventCollector();
const cronState = await createDirectCronState({ broadcast: cronEvents.broadcast });
try {
const addRes = await directCronReq(cronState, "cron.add", {
@@ -495,7 +496,11 @@ describe("gateway server cron", () => {
const runRes = await cronState.cron.run(routeJobId, "force");
expect(runRes).toEqual({ ok: true, ran: true });
const events = await waitForSystemEvent();
const routeFinished = await cronEvents.wait(
(payload) => payload.jobId === routeJobId && payload.action === "finished",
);
expect(typeof routeFinished.sessionKey).toBe("string");
const events = peekSystemEvents(routeFinished.sessionKey as string);
expect(events.some((event) => event.includes("cron route check"))).toBe(true);
const wrappedAtMs = Date.now() + 1000;