mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(plugins): snapshot agent event subscriptions
This commit is contained in:
@@ -0,0 +1,74 @@
|
||||
// Agent event subscription registration tests cover plugin-owned callback snapshotting.
|
||||
import {
|
||||
createPluginRegistryFixture,
|
||||
registerTestPlugin,
|
||||
} from "openclaw/plugin-sdk/plugin-test-contracts";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { dispatchPluginAgentEventSubscriptions } from "../host-hook-runtime.js";
|
||||
import type { PluginAgentEventSubscriptionRegistration } from "../host-hooks.js";
|
||||
import { createEmptyPluginRegistry } from "../registry-empty.js";
|
||||
import { setActivePluginRegistry } from "../runtime.js";
|
||||
import { createPluginRecord } from "../status.test-helpers.js";
|
||||
|
||||
describe("plugin agent event subscription registration", () => {
|
||||
afterEach(() => {
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
});
|
||||
|
||||
it("snapshots subscription callbacks before event dispatch", () => {
|
||||
let handleReads = 0;
|
||||
let streamsReads = 0;
|
||||
const handledStreams: string[] = [];
|
||||
const { config, registry } = createPluginRegistryFixture();
|
||||
registerTestPlugin({
|
||||
registry,
|
||||
config,
|
||||
record: createPluginRecord({
|
||||
id: "volatile-agent-events",
|
||||
name: "Volatile Agent Events",
|
||||
}),
|
||||
register(api) {
|
||||
api.registerAgentEventSubscription({
|
||||
id: "events",
|
||||
description: "Event sink",
|
||||
get streams() {
|
||||
streamsReads += 1;
|
||||
if (streamsReads > 1) {
|
||||
throw new Error("streams getter re-read");
|
||||
}
|
||||
return ["tool"];
|
||||
},
|
||||
get handle() {
|
||||
handleReads += 1;
|
||||
if (handleReads > 1) {
|
||||
throw new Error("handle getter re-read");
|
||||
}
|
||||
return (event) => {
|
||||
handledStreams.push(event.stream);
|
||||
};
|
||||
},
|
||||
} as PluginAgentEventSubscriptionRegistration);
|
||||
},
|
||||
});
|
||||
setActivePluginRegistry(registry.registry);
|
||||
|
||||
expect(registry.registry.agentEventSubscriptions?.[0]?.subscription.description).toBe(
|
||||
"Event sink",
|
||||
);
|
||||
expect(handleReads).toBe(1);
|
||||
expect(streamsReads).toBe(1);
|
||||
|
||||
dispatchPluginAgentEventSubscriptions({
|
||||
registry: registry.registry,
|
||||
event: {
|
||||
runId: "run-1",
|
||||
stream: "tool",
|
||||
data: { name: "approval_fixture_tool" },
|
||||
},
|
||||
});
|
||||
|
||||
expect(handledStreams).toEqual(["tool"]);
|
||||
expect(handleReads).toBe(1);
|
||||
expect(streamsReads).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -2469,12 +2469,51 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
});
|
||||
};
|
||||
|
||||
const readAgentEventSubscriptionFields = (
|
||||
record: PluginRecord,
|
||||
subscription: PluginAgentEventSubscriptionRegistration,
|
||||
):
|
||||
| {
|
||||
id: unknown;
|
||||
description: unknown;
|
||||
streams: unknown;
|
||||
handle: unknown;
|
||||
}
|
||||
| undefined => {
|
||||
let id: unknown;
|
||||
try {
|
||||
id = subscription.id;
|
||||
return {
|
||||
id,
|
||||
description: subscription.description,
|
||||
streams: subscription.streams,
|
||||
handle: subscription.handle,
|
||||
};
|
||||
} catch (error) {
|
||||
const normalizedId = normalizeOptionalHostHookString(id);
|
||||
pushDiagnostic({
|
||||
level: "error",
|
||||
pluginId: record.id,
|
||||
source: record.source,
|
||||
message:
|
||||
`agent event subscription registration has unreadable fields` +
|
||||
`${normalizedId ? `: ${normalizedId}` : ""}: ${formatErrorMessage(error)}`,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const registerAgentEventSubscription = (
|
||||
record: PluginRecord,
|
||||
subscription: PluginAgentEventSubscriptionRegistration,
|
||||
) => {
|
||||
const id = normalizePluginHostHookId(subscription.id);
|
||||
if (!id || typeof subscription.handle !== "function") {
|
||||
const fields = readAgentEventSubscriptionFields(record, subscription);
|
||||
if (!fields) {
|
||||
return;
|
||||
}
|
||||
const id = normalizePluginHostHookId(typeof fields.id === "string" ? fields.id : undefined);
|
||||
const handle = fields.handle;
|
||||
if (!id || typeof handle !== "function") {
|
||||
pushDiagnostic({
|
||||
level: "error",
|
||||
pluginId: record.id,
|
||||
@@ -2483,7 +2522,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
});
|
||||
return;
|
||||
}
|
||||
const streams = normalizeHostHookStringList(subscription.streams);
|
||||
const streams = normalizeHostHookStringList(fields.streams);
|
||||
if (streams === null) {
|
||||
pushDiagnostic({
|
||||
level: "error",
|
||||
@@ -2508,7 +2547,16 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
(registry.agentEventSubscriptions ??= []).push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
subscription: { ...subscription, id, ...(streams !== undefined ? { streams } : {}) },
|
||||
subscription: {
|
||||
id,
|
||||
...(fields.description !== undefined ? { description: fields.description as string } : {}),
|
||||
...(streams !== undefined
|
||||
? {
|
||||
streams: streams as NonNullable<PluginAgentEventSubscriptionRegistration["streams"]>,
|
||||
}
|
||||
: {}),
|
||||
handle: handle as PluginAgentEventSubscriptionRegistration["handle"],
|
||||
},
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user