refactor(logging): share diagnostic message lifecycle

Refactor diagnostic queued/state/processed emission into a shared helper used by dispatch and isolated cron turns.

Preserve dispatch processed-event behavior, cron queue-depth symmetry, and final cron session-id adoption while adding focused helper coverage and reviewer comments for the non-obvious invariants.
This commit is contained in:
Peter Steinberger
2026-05-25 19:48:45 +01:00
committed by GitHub
parent e844d1d6e5
commit baab4cf045
5 changed files with 258 additions and 68 deletions

View File

@@ -72,7 +72,7 @@ Skills own workflows; root owns hard policy and routing.
- Gateway/plugin metadata is process-stable: installs, manifests, catalogs, generated paths, bundled metadata. Changes require restart or explicit owner reload/install/doctor flow.
- Runtime hot paths: no freshness polling (`stat`/`realpath`/JSON reread/hash). Reuse current snapshots, install records, discovery, lookup tables, root scopes, resolved paths.
- Process-local metadata caches ok when lifecycle-owned and bounded/single-slot. Freshness exceptions need named owner + tests.
- Inline code comments: brief notes for tricky, bug-prone, or previously buggy logic.
- Inline comments: add them where they preserve reviewer context: cross-path invariants, lifecycle ordering, ownership boundaries, session/id adoption, queue-depth symmetry, or intentional caller differences. No obvious-mechanics comments.
- Gateway protocol changes: additive first; incompatible needs versioning/docs/client follow-through.
- Protocol version bumps: explicit owner confirmation only; never automatic/generated.
- Config contract: exported types, schema/help, metadata, baselines, docs aligned. Retired public keys stay retired; compat in raw migration/doctor only.
@@ -157,7 +157,6 @@ Skills own workflows; root owns hard policy and routing.
- Dynamic import: no static+dynamic import for same prod module. Use `*.runtime.ts` lazy boundary. After edits: `pnpm build`; check `[INEFFECTIVE_DYNAMIC_IMPORT]`.
- Cycles: keep `pnpm check:import-cycles` + architecture/madge green.
- Classes: no prototype mixins/mutations. Prefer inheritance/composition. Tests prefer per-instance stubs.
- Comments: brief, only non-obvious logic.
- Split files around ~700 LOC when clarity/testability improves.
- Naming: **OpenClaw** product/docs; `openclaw` CLI/package/path/config.
- English: American spelling.

View File

@@ -58,11 +58,9 @@ import { isAbortError } from "../../infra/unhandled-rejections.js";
import {
logMessageDispatchCompleted,
logMessageDispatchStarted,
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
markDiagnosticSessionProgress,
} from "../../logging/diagnostic.js";
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
import { matchPluginCommand } from "../../plugins/commands.js";
import {
buildPluginBindingDeclinedText,
@@ -798,6 +796,17 @@ export async function dispatchReplyFromConfig(
normalizeOptionalString(ctx.SessionKey) ?? normalizeOptionalString(ctx.CommandTargetSessionKey);
const startTime = diagnosticsEnabled ? Date.now() : 0;
const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);
const messageLifecycle = createDiagnosticMessageLifecycle({
enabled: diagnosticsEnabled,
channel,
chatId,
messageId,
sessionKey,
source: "dispatch",
processingReason: "message_start",
startedAtMs: startTime,
trackSessionState: canTrackSession,
});
const traceAttributes = {
surface: channel,
hasSessionKey: Boolean(sessionKey),
@@ -818,19 +827,7 @@ export async function dispatchReplyFromConfig(
error?: string;
},
) => {
if (!diagnosticsEnabled) {
return;
}
logMessageProcessed({
channel,
chatId,
messageId,
sessionKey,
durationMs: Date.now() - startTime,
outcome,
reason: opts?.reason,
error: opts?.error,
});
messageLifecycle.markProcessed(outcome, opts);
};
const recordAgentDispatchStarted = () => {
@@ -867,26 +864,11 @@ export async function dispatchReplyFromConfig(
};
const markProcessing = () => {
if (!canTrackSession || !sessionKey) {
return;
}
logMessageQueued({ sessionKey, channel, source: "dispatch" });
logSessionStateChange({
sessionKey,
state: "processing",
reason: "message_start",
});
messageLifecycle.markProcessing();
};
const markIdle = (reason: string) => {
if (!canTrackSession || !sessionKey) {
return;
}
logSessionStateChange({
sessionKey,
state: "idle",
reason,
});
messageLifecycle.markIdle(reason);
};
let inboundDedupeReplayUnsafe = false;

View File

@@ -16,11 +16,7 @@ import {
type SourceDeliveryPlan,
type SourceDeliveryVisibleDelivery,
} from "../../infra/outbound/source-delivery-plan.js";
import {
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js";
import { createDiagnosticMessageLifecycle } from "../../logging/message-lifecycle.js";
import { isCommandLaneTaskTimeoutError } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
@@ -1192,19 +1188,16 @@ export async function runCronIsolatedAgentTurn(params: {
const turnStartedAtMs = Date.now();
const diagnosticsEnabled = isDiagnosticsEnabled(params.cfg);
if (diagnosticsEnabled) {
logMessageQueued({
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
channel: "cron",
source: "cron-isolated",
});
logSessionStateChange({
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
state: "processing",
});
}
const messageLifecycle = createDiagnosticMessageLifecycle({
enabled: diagnosticsEnabled,
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
channel: "cron",
source: "cron-isolated",
startedAtMs: turnStartedAtMs,
trackSessionState: true,
});
messageLifecycle.markProcessing();
let outcome: "completed" | "error" = "completed";
let outcomeError: string | undefined;
@@ -1279,21 +1272,16 @@ export async function runCronIsolatedAgentTurn(params: {
),
});
} finally {
if (diagnosticsEnabled) {
logSessionStateChange({
sessionId: prepared.context.currentRunSessionId(),
sessionKey: prepared.context.runSessionKey,
state: "idle",
});
logMessageProcessed({
channel: "cron",
sessionId: prepared.context.currentRunSessionId(),
sessionKey: prepared.context.runSessionKey,
durationMs: Date.now() - turnStartedAtMs,
outcome,
error: outcomeError,
});
}
// Final lifecycle events use the adopted run session when the agent persisted one.
const finalSessionRef = {
sessionId: prepared.context.currentRunSessionId(),
sessionKey: prepared.context.runSessionKey,
};
messageLifecycle.markIdle(undefined, finalSessionRef);
messageLifecycle.markProcessed(outcome, {
...finalSessionRef,
error: outcomeError,
});
// Release runtime references after the run completes (success or failure).
// The session entry has already been persisted to disk by this point,
// so the in-memory store and run context can be safely dropped.

View File

@@ -0,0 +1,127 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
// Spies standing in for the three diagnostic emitters the lifecycle helper calls.
// They are created through vi.hoisted so the vi.mock factory below can reference them;
// NOTE(review): this relies on vitest hoisting both calls above the module imports — keep
// this setup ahead of the "./message-lifecycle.js" import.
const diagnosticMocks = vi.hoisted(() => ({
logMessageProcessed: vi.fn(),
logMessageQueued: vi.fn(),
logSessionStateChange: vi.fn(),
}));
// Replace "./diagnostic.js" with the spies above so the tests can observe every emission.
vi.mock("./diagnostic.js", () => ({
logMessageProcessed: diagnosticMocks.logMessageProcessed,
logMessageQueued: diagnosticMocks.logMessageQueued,
logSessionStateChange: diagnosticMocks.logSessionStateChange,
}));
import { createDiagnosticMessageLifecycle } from "./message-lifecycle.js";
describe("createDiagnosticMessageLifecycle", () => {
  const { logMessageProcessed, logMessageQueued, logSessionStateChange } = diagnosticMocks;

  // Every case starts from pristine spies so call assertions never see earlier emissions.
  beforeEach(() => {
    for (const spy of Object.values(diagnosticMocks)) {
      spy.mockReset();
    }
  });

  it("emits queued, state, and processed events through one lifecycle", () => {
    const tracker = createDiagnosticMessageLifecycle({
      enabled: true,
      channel: "cron",
      source: "cron-isolated",
      sessionId: "initial-session",
      sessionKey: "cron:job",
      trackSessionState: true,
    });

    // The idle/processed calls override the session id, as a caller would after adopting
    // a different run session; the session key is expected to carry over from creation.
    tracker.markProcessing();
    tracker.markIdle(undefined, { sessionId: "final-session" });
    tracker.markProcessed("completed", {
      sessionId: "final-session",
      durationMs: 42,
    });

    expect(logMessageQueued).toHaveBeenCalledWith({
      sessionId: "initial-session",
      sessionKey: "cron:job",
      channel: "cron",
      source: "cron-isolated",
    });

    // Exactly two state changes, in order: processing on the initial id, idle on the final id.
    expect(logSessionStateChange).toHaveBeenCalledTimes(2);
    expect(logSessionStateChange).toHaveBeenNthCalledWith(1, {
      sessionId: "initial-session",
      sessionKey: "cron:job",
      state: "processing",
      reason: undefined,
    });
    expect(logSessionStateChange).toHaveBeenNthCalledWith(2, {
      sessionId: "final-session",
      sessionKey: "cron:job",
      state: "idle",
      reason: undefined,
    });

    expect(logMessageProcessed).toHaveBeenCalledWith({
      channel: "cron",
      chatId: undefined,
      messageId: undefined,
      sessionId: "final-session",
      sessionKey: "cron:job",
      durationMs: 42,
      outcome: "completed",
      reason: undefined,
      error: undefined,
    });
  });

  it("keeps processed events independent of session-state tracking", () => {
    const tracker = createDiagnosticMessageLifecycle({
      enabled: true,
      channel: "whatsapp",
      source: "dispatch",
      chatId: "chat-1",
      messageId: "msg-1",
      trackSessionState: false,
    });

    tracker.markProcessing();
    tracker.markIdle("message_completed");
    tracker.markProcessed("skipped", {
      durationMs: 7,
      reason: "duplicate",
    });

    // With state tracking off, only the processed event may be emitted.
    expect(logMessageQueued).not.toHaveBeenCalled();
    expect(logSessionStateChange).not.toHaveBeenCalled();
    expect(logMessageProcessed).toHaveBeenCalledWith({
      channel: "whatsapp",
      chatId: "chat-1",
      messageId: "msg-1",
      sessionId: undefined,
      sessionKey: undefined,
      durationMs: 7,
      outcome: "skipped",
      reason: "duplicate",
      error: undefined,
    });
  });

  it("emits nothing when disabled", () => {
    const tracker = createDiagnosticMessageLifecycle({
      enabled: false,
      channel: "slack",
      source: "dispatch",
      sessionKey: "agent:main",
      trackSessionState: true,
    });

    tracker.markProcessing();
    tracker.markIdle("message_completed");
    tracker.markProcessed("completed", { durationMs: 1 });

    // A disabled lifecycle must stay silent even with a session key and tracking requested.
    expect(logMessageQueued).not.toHaveBeenCalled();
    expect(logSessionStateChange).not.toHaveBeenCalled();
    expect(logMessageProcessed).not.toHaveBeenCalled();
  });
});

View File

@@ -0,0 +1,94 @@
import { logMessageProcessed, logMessageQueued, logSessionStateChange } from "./diagnostic.js";
/**
 * Session reference attached to lifecycle events. Both fields are optional; the
 * lifecycle helper treats a ref with neither field set as "no session".
 */
type MessageLifecycleRef = {
  sessionKey?: string;
  sessionId?: string;
};

/** Outcome label reported with a processed event. */
type MessageLifecycleOutcome =
  | "completed"
  | "skipped"
  | "error";

/**
 * Per-call options for a processed event. The session fields override the ref
 * supplied when the lifecycle was created.
 */
type MessageLifecycleProcessedOptions = MessageLifecycleRef & {
  /** Explicit duration in milliseconds; when omitted the helper computes it from its start timestamp. */
  durationMs?: number;
  /** Optional reason forwarded with the processed event. */
  reason?: string;
  /** Optional error text forwarded with the processed event. */
  error?: string;
};
/**
 * Creates a small emitter that funnels the queued, session-state, and processed
 * diagnostic events for one message through a single object.
 *
 * @param params Fixed event fields (channel, source, chat/message ids), the default
 *   session ref, and the gating flags. `enabled` gates every emission;
 *   `trackSessionState` additionally gates the queued and state-change events.
 *   `startedAtMs` is the baseline for computed durations and defaults to the
 *   creation time.
 * @returns An object with `markProcessing`, `markIdle`, and `markProcessed`. Each
 *   accepts an optional session ref whose set fields take precedence over the
 *   ref given at creation.
 */
export function createDiagnosticMessageLifecycle(
  params: MessageLifecycleRef & {
    enabled: boolean;
    channel: string;
    source: string;
    chatId?: number | string;
    messageId?: number | string;
    processingReason?: string;
    startedAtMs?: number;
    trackSessionState: boolean;
  },
) {
  const lifecycleStartMs = params.startedAtMs ?? Date.now();

  // Field-by-field merge: a per-call override wins, otherwise the creation-time value is used.
  function pickRef(override?: MessageLifecycleRef): MessageLifecycleRef {
    return {
      sessionId: override?.sessionId ?? params.sessionId,
      sessionKey: override?.sessionKey ?? params.sessionKey,
    };
  }

  // Queue-depth/state events are skipped when diagnostics are off, state tracking is off,
  // or there is no session ref to attribute them to. Processed events are gated separately.
  function sessionStateSuppressed(ref: MessageLifecycleRef): boolean {
    if (!params.enabled || !params.trackSessionState) {
      return true;
    }
    return !(ref.sessionId || ref.sessionKey);
  }

  function emitSessionState(
    ref: MessageLifecycleRef,
    state: "processing" | "idle",
    reason: string | undefined,
  ): void {
    logSessionStateChange({
      sessionId: ref.sessionId,
      sessionKey: ref.sessionKey,
      state,
      reason,
    });
  }

  return {
    /** Emits the queued event followed by the "processing" state change. */
    markProcessing(override?: MessageLifecycleRef): void {
      const ref = pickRef(override);
      if (sessionStateSuppressed(ref)) {
        return;
      }
      logMessageQueued({
        sessionId: ref.sessionId,
        sessionKey: ref.sessionKey,
        channel: params.channel,
        source: params.source,
      });
      emitSessionState(ref, "processing", params.processingReason);
    },

    /** Emits the "idle" state change with the caller-supplied reason. */
    markIdle(reason?: string, override?: MessageLifecycleRef): void {
      const ref = pickRef(override);
      if (sessionStateSuppressed(ref)) {
        return;
      }
      emitSessionState(ref, "idle", reason);
    },

    /**
     * Emits the processed event. Only `enabled` gates this call, so it still fires
     * when session-state tracking is off or no session ref exists.
     */
    markProcessed(
      outcome: MessageLifecycleOutcome,
      options?: MessageLifecycleProcessedOptions,
    ): void {
      if (!params.enabled) {
        return;
      }
      const { sessionId, sessionKey } = pickRef(options);
      // An explicit duration wins; otherwise measure from the lifecycle start timestamp.
      const durationMs = options?.durationMs ?? Date.now() - lifecycleStartMs;
      logMessageProcessed({
        channel: params.channel,
        chatId: params.chatId,
        messageId: params.messageId,
        sessionId,
        sessionKey,
        durationMs,
        outcome,
        reason: options?.reason,
        error: options?.error,
      });
    },
  };
}