mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
fix(discord): deliver same-session channel replies
Deliver same-session channel replies directly while preserving stale-reply guards. The fix bypasses the announce decider only when the requester and target are the same source channel, carries reply baselines into fire-and-forget follow-up delivery, and keeps history reads best-effort so timeout-zero sends still dispatch. It also includes focused regression coverage for delayed same-session replies, stale snapshots, retry timer caps, and the current strict-null/package-boundary blockers fixed while preparing the PR.
This commit is contained in:
@@ -54,6 +54,11 @@ type ResetAwareSessionStore = AcpSessionStore & {
|
||||
markFresh: (sessionKey: string) => void;
|
||||
};
|
||||
|
||||
type OpenClawLeaseSessionMetadata = {
|
||||
openclawLeaseId: string;
|
||||
openclawGatewayInstanceId: string;
|
||||
};
|
||||
|
||||
function withOpenClawManagedTurnTimeout<T extends object>(input: T): T & { timeoutMs: 0 } {
|
||||
// OpenClaw owns ACP turn deadlines. acpx treats timeout after partial agent
|
||||
// output as a completed turn, which can mark background work done early.
|
||||
@@ -63,6 +68,17 @@ function withOpenClawManagedTurnTimeout<T extends object>(input: T): T & { timeo
|
||||
};
|
||||
}
|
||||
|
||||
function withOpenClawLeaseSessionMetadata<T extends object>(
|
||||
record: T,
|
||||
metadata: OpenClawLeaseSessionMetadata,
|
||||
): T & OpenClawLeaseSessionMetadata {
|
||||
return {
|
||||
...record,
|
||||
openclawLeaseId: metadata.openclawLeaseId,
|
||||
openclawGatewayInstanceId: metadata.openclawGatewayInstanceId,
|
||||
};
|
||||
}
|
||||
|
||||
type AcpxLaunchLeaseContext = {
|
||||
leaseId: string;
|
||||
gatewayInstanceId: string;
|
||||
@@ -234,11 +250,10 @@ function createResetAwareSessionStore(
|
||||
if (!lease) {
|
||||
return record;
|
||||
}
|
||||
return {
|
||||
...(record as Record<string, unknown>),
|
||||
return withOpenClawLeaseSessionMetadata(record, {
|
||||
openclawLeaseId: lease.leaseId,
|
||||
openclawGatewayInstanceId: lease.gatewayInstanceId,
|
||||
} as unknown as AcpLoadedSessionRecord;
|
||||
});
|
||||
},
|
||||
async save(record: AcpSessionRecord): Promise<void> {
|
||||
let recordToSave = record;
|
||||
@@ -266,14 +281,18 @@ function createResetAwareSessionStore(
|
||||
state: "open",
|
||||
};
|
||||
await params.leaseStore.save(lease);
|
||||
recordToSave = {
|
||||
...(record as Record<string, unknown>),
|
||||
// ACPX uses agentCommand as reuse identity. Lease metadata belongs to
|
||||
// our sidecar record, so keep the persisted command stable.
|
||||
agentCommand: stableAgentCommand,
|
||||
openclawLeaseId: launch.leaseId,
|
||||
openclawGatewayInstanceId: launch.gatewayInstanceId,
|
||||
} as unknown as AcpSessionRecord;
|
||||
recordToSave = withOpenClawLeaseSessionMetadata(
|
||||
{
|
||||
...record,
|
||||
// ACPX uses agentCommand as reuse identity. Lease metadata belongs to
|
||||
// our sidecar record, so keep the persisted command stable.
|
||||
agentCommand: stableAgentCommand,
|
||||
},
|
||||
{
|
||||
openclawLeaseId: launch.leaseId,
|
||||
openclawGatewayInstanceId: launch.gatewayInstanceId,
|
||||
},
|
||||
);
|
||||
}
|
||||
await baseStore.save(recordToSave);
|
||||
if (sessionName) {
|
||||
|
||||
@@ -224,12 +224,12 @@ function resolveActiveProfileId(params: {
|
||||
const lastGood = [
|
||||
params.store.lastGood?.[OPENAI_PROVIDER_ID],
|
||||
params.store.lastGood?.[OPENAI_CODEX_PROVIDER_ID],
|
||||
].find((profileId): profileId is string => {
|
||||
if (!profileId) {
|
||||
return false;
|
||||
}
|
||||
return params.order.includes(profileId) && isActiveProfileCandidate(params, profileId);
|
||||
});
|
||||
].find(
|
||||
(profileId): profileId is string =>
|
||||
typeof profileId === "string" &&
|
||||
params.order.includes(profileId) &&
|
||||
isActiveProfileCandidate(params, profileId),
|
||||
);
|
||||
if (lastGood) {
|
||||
return lastGood;
|
||||
}
|
||||
|
||||
@@ -660,8 +660,7 @@ describe("markAuthProfileFailure — active windows do not extend on retry", ()
|
||||
reason: "rate_limit" | "billing" | "auth_permanent";
|
||||
cfg?: OpenClawConfig;
|
||||
}): Promise<void> {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(params.now);
|
||||
const dateNowSpy = vi.spyOn(Date, "now").mockReturnValue(params.now);
|
||||
try {
|
||||
await markAuthProfileFailure({
|
||||
store: params.store,
|
||||
@@ -670,7 +669,7 @@ describe("markAuthProfileFailure — active windows do not extend on retry", ()
|
||||
cfg: params.cfg,
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
dateNowSpy.mockRestore();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -913,8 +912,7 @@ describe("markAuthProfileFailure — WHAM-aware Codex cooldowns", () => {
|
||||
reason?: "rate_limit" | "unknown";
|
||||
useLock?: boolean;
|
||||
}): Promise<void> {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(params.now);
|
||||
const dateNowSpy = vi.spyOn(Date, "now").mockReturnValue(params.now);
|
||||
if (params.useLock) {
|
||||
storeMocks.updateAuthProfileStoreWithLock.mockImplementationOnce(
|
||||
async (lockParams: { updater: (store: AuthProfileStore) => boolean }) => {
|
||||
@@ -931,7 +929,7 @@ describe("markAuthProfileFailure — WHAM-aware Codex cooldowns", () => {
|
||||
reason: params.reason ?? "rate_limit",
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
dateNowSpy.mockRestore();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -111,6 +111,134 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
|
||||
expect(sendParams.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("bypasses the announce decider for same-session channel replies", async () => {
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:discord:channel:target-room",
|
||||
displayKey: "agent:main:discord:channel:target-room",
|
||||
message: "Test message",
|
||||
announceTimeoutMs: 10_000,
|
||||
maxPingPongTurns: 2,
|
||||
requesterSessionKey: "agent:main:discord:channel:target-room",
|
||||
requesterChannel: "discord",
|
||||
roundOneReply: "Substantive channel reply",
|
||||
});
|
||||
|
||||
expect(runAgentStep).not.toHaveBeenCalled();
|
||||
const sendCall = requireGatewayCall("send");
|
||||
const sendParams = sendCall.params as Record<string, unknown>;
|
||||
expect(sendParams.channel).toBe("discord");
|
||||
expect(sendParams.to).toBe("channel:target-room");
|
||||
expect(sendParams.message).toBe("Substantive channel reply");
|
||||
});
|
||||
|
||||
it("bypasses the announce decider for delayed same-session channel replies", async () => {
|
||||
vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValueOnce({
|
||||
text: "Delayed channel reply",
|
||||
fingerprint: "delayed-channel-reply",
|
||||
});
|
||||
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:discord:channel:target-room",
|
||||
displayKey: "agent:main:discord:channel:target-room",
|
||||
message: "Test message",
|
||||
announceTimeoutMs: 10_000,
|
||||
maxPingPongTurns: 2,
|
||||
requesterSessionKey: "agent:main:discord:channel:target-room",
|
||||
requesterChannel: "discord",
|
||||
baseline: {
|
||||
text: "Previous channel reply",
|
||||
fingerprint: "previous-channel-reply",
|
||||
},
|
||||
waitRunId: "run-delayed-channel",
|
||||
});
|
||||
|
||||
expect(firstMockArg(vi.mocked(waitForAgentRun), "agent run wait").runId).toBe(
|
||||
"run-delayed-channel",
|
||||
);
|
||||
expect(
|
||||
firstMockArg(vi.mocked(readLatestAssistantReplySnapshot), "assistant reply snapshot")
|
||||
.sessionKey,
|
||||
).toBe("agent:main:discord:channel:target-room");
|
||||
expect(runAgentStep).not.toHaveBeenCalled();
|
||||
const sendCall = requireGatewayCall("send");
|
||||
const sendParams = sendCall.params as Record<string, unknown>;
|
||||
expect(sendParams.channel).toBe("discord");
|
||||
expect(sendParams.to).toBe("channel:target-room");
|
||||
expect(sendParams.message).toBe("Delayed channel reply");
|
||||
});
|
||||
|
||||
it("does not direct-deliver a delayed same-session reply that matches the baseline", async () => {
|
||||
vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValueOnce({
|
||||
text: "Previous channel reply",
|
||||
fingerprint: "previous-channel-reply",
|
||||
});
|
||||
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:discord:channel:target-room",
|
||||
displayKey: "agent:main:discord:channel:target-room",
|
||||
message: "Test message",
|
||||
announceTimeoutMs: 10_000,
|
||||
maxPingPongTurns: 2,
|
||||
requesterSessionKey: "agent:main:discord:channel:target-room",
|
||||
requesterChannel: "discord",
|
||||
baseline: {
|
||||
text: "Previous channel reply",
|
||||
fingerprint: "previous-channel-reply",
|
||||
},
|
||||
waitRunId: "run-delayed-channel",
|
||||
});
|
||||
|
||||
expect(firstMockArg(vi.mocked(waitForAgentRun), "agent run wait").runId).toBe(
|
||||
"run-delayed-channel",
|
||||
);
|
||||
expect(runAgentStep).not.toHaveBeenCalled();
|
||||
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not direct-deliver a delayed same-session reply without a baseline", async () => {
|
||||
vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValueOnce({
|
||||
text: "Maybe stale channel reply",
|
||||
fingerprint: "maybe-stale-channel-reply",
|
||||
});
|
||||
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:discord:channel:target-room",
|
||||
displayKey: "agent:main:discord:channel:target-room",
|
||||
message: "Test message",
|
||||
announceTimeoutMs: 10_000,
|
||||
maxPingPongTurns: 2,
|
||||
requesterSessionKey: "agent:main:discord:channel:target-room",
|
||||
requesterChannel: "discord",
|
||||
waitRunId: "run-delayed-channel",
|
||||
});
|
||||
|
||||
expect(firstMockArg(vi.mocked(waitForAgentRun), "agent run wait").runId).toBe(
|
||||
"run-delayed-channel",
|
||||
);
|
||||
expect(runAgentStep).not.toHaveBeenCalled();
|
||||
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps the announce decider for same-session sends from a different channel", async () => {
|
||||
vi.mocked(runAgentStep).mockResolvedValueOnce("ANNOUNCE_SKIP");
|
||||
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:discord:channel:target-room",
|
||||
displayKey: "agent:main:discord:channel:target-room",
|
||||
message: "Test message",
|
||||
announceTimeoutMs: 10_000,
|
||||
maxPingPongTurns: 2,
|
||||
requesterSessionKey: "agent:main:discord:channel:target-room",
|
||||
requesterChannel: "webchat",
|
||||
roundOneReply: "Substantive channel reply",
|
||||
});
|
||||
|
||||
expect(runAgentStep).toHaveBeenCalledTimes(1);
|
||||
const stepInput = firstMockArg(vi.mocked(runAgentStep), "agent step");
|
||||
expect(stepInput.message).toBe("Agent-to-agent announce step.");
|
||||
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
|
||||
});
|
||||
|
||||
it.each([
|
||||
{
|
||||
source: "deliveryContext.accountId",
|
||||
@@ -207,7 +335,7 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
|
||||
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
|
||||
});
|
||||
|
||||
it.each(["NO_REPLY", "HEARTBEAT_OK"])(
|
||||
it.each(["NO_REPLY", "HEARTBEAT_OK", "ANNOUNCE_SKIP"])(
|
||||
"suppresses exact announce control reply %s before channel delivery",
|
||||
async (announceReply) => {
|
||||
vi.mocked(runAgentStep).mockResolvedValueOnce(announceReply);
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
import { runAgentStep } from "./agent-step.js";
|
||||
import { resolveAnnounceTarget } from "./sessions-announce-target.js";
|
||||
import {
|
||||
type AnnounceTarget,
|
||||
buildAgentToAgentAnnounceContext,
|
||||
buildAgentToAgentReplyContext,
|
||||
isAnnounceSkip,
|
||||
@@ -34,6 +35,38 @@ let sessionsSendA2ADeps: {
|
||||
callGateway: GatewayCaller;
|
||||
} = defaultSessionsSendA2ADeps;
|
||||
|
||||
async function deliverAnnounceReply(params: {
|
||||
announceTarget: AnnounceTarget;
|
||||
message: string;
|
||||
runContextId: string;
|
||||
}) {
|
||||
const message = params.message.trim();
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await sessionsSendA2ADeps.callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
to: params.announceTarget.to,
|
||||
message,
|
||||
channel: params.announceTarget.channel,
|
||||
accountId: params.announceTarget.accountId,
|
||||
threadId: params.announceTarget.threadId,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn("sessions_send announce delivery failed", {
|
||||
runId: params.runContextId,
|
||||
channel: params.announceTarget.channel,
|
||||
to: params.announceTarget.to,
|
||||
error: formatErrorMessage(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function runSessionsSendA2AFlow(params: {
|
||||
targetSessionKey: string;
|
||||
displayKey: string;
|
||||
@@ -83,6 +116,27 @@ export async function runSessionsSendA2AFlow(params: {
|
||||
});
|
||||
const targetChannel = announceTarget?.channel ?? "unknown";
|
||||
|
||||
// A same-session send is a human-facing source-channel reply, not a true
|
||||
// agent-to-agent announcement. Asking the same session to decide whether to
|
||||
// announce can learn stale ANNOUNCE_SKIP patterns from its own history and
|
||||
// silently drop a normal channel response.
|
||||
if (
|
||||
announceTarget &&
|
||||
params.requesterSessionKey &&
|
||||
params.requesterSessionKey === params.targetSessionKey &&
|
||||
params.requesterChannel === announceTarget.channel
|
||||
) {
|
||||
if (params.waitRunId && !params.roundOneReply && !params.baseline) {
|
||||
return;
|
||||
}
|
||||
await deliverAnnounceReply({
|
||||
announceTarget,
|
||||
message: latestReply,
|
||||
runContextId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
params.maxPingPongTurns > 0 &&
|
||||
params.requesterSessionKey &&
|
||||
@@ -152,27 +206,11 @@ export async function runSessionsSendA2AFlow(params: {
|
||||
!isAnnounceSkip(announceReply) &&
|
||||
!isNonDeliverableSessionsReply(announceReply)
|
||||
) {
|
||||
try {
|
||||
await sessionsSendA2ADeps.callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
to: announceTarget.to,
|
||||
message: announceReply.trim(),
|
||||
channel: announceTarget.channel,
|
||||
accountId: announceTarget.accountId,
|
||||
threadId: announceTarget.threadId,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn("sessions_send announce delivery failed", {
|
||||
runId: runContextId,
|
||||
channel: announceTarget.channel,
|
||||
to: announceTarget.to,
|
||||
error: formatErrorMessage(err),
|
||||
});
|
||||
}
|
||||
await deliverAnnounceReply({
|
||||
announceTarget,
|
||||
message: announceReply,
|
||||
runContextId,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("sessions_send announce flow failed", {
|
||||
|
||||
@@ -509,18 +509,30 @@ export function createSessionsSendTool(opts?: {
|
||||
});
|
||||
}
|
||||
|
||||
const requesterSessionKey = opts?.agentSessionKey;
|
||||
const requesterChannel = opts?.agentChannel;
|
||||
const sameSessionA2A = requesterSessionKey === resolvedKey;
|
||||
|
||||
// Capture the pre-run assistant snapshot before starting the nested run.
|
||||
// Fast in-process test doubles and short-circuit agent paths can finish
|
||||
// before we reach the post-run read, which would otherwise make the new
|
||||
// reply look like the baseline and hide it from the caller.
|
||||
// Fire-and-forget same-session sends still need this baseline because the
|
||||
// A2A follow-up may deliver directly to the source channel.
|
||||
const baselineReply =
|
||||
timeoutSeconds === 0
|
||||
? undefined
|
||||
: await readLatestAssistantReplySnapshot({
|
||||
timeoutSeconds !== 0
|
||||
? await readLatestAssistantReplySnapshot({
|
||||
sessionKey: resolvedKey,
|
||||
limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT,
|
||||
callGateway: gatewayCall,
|
||||
});
|
||||
})
|
||||
: sameSessionA2A
|
||||
? await readLatestAssistantReplySnapshot({
|
||||
sessionKey: resolvedKey,
|
||||
limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT,
|
||||
callGateway: gatewayCall,
|
||||
}).catch(() => undefined)
|
||||
: undefined;
|
||||
|
||||
const agentMessageContext = buildAgentToAgentMessageContext({
|
||||
requesterSessionKey: opts?.agentSessionKey,
|
||||
@@ -543,8 +555,6 @@ export function createSessionsSendTool(opts?: {
|
||||
extraSystemPrompt: agentMessageContext,
|
||||
inputProvenance,
|
||||
};
|
||||
const requesterSessionKey = opts?.agentSessionKey;
|
||||
const requesterChannel = opts?.agentChannel;
|
||||
const maxPingPongTurns = resolvePingPongTurns(cfg);
|
||||
|
||||
// Skip the A2A ping-pong + announce flow when the current caller is the
|
||||
|
||||
@@ -929,6 +929,83 @@ describe("sessions_send gating", () => {
|
||||
expect(details.sessionKey).toBe(MAIN_AGENT_SESSION_KEY);
|
||||
});
|
||||
|
||||
it("passes a baseline into fire-and-forget same-session A2A delivery", async () => {
|
||||
const { runSessionsSendA2AFlow } = await import("./sessions-send-tool.a2a.js");
|
||||
vi.mocked(runSessionsSendA2AFlow).mockClear();
|
||||
const tool = createMainSessionsSendTool();
|
||||
const staleAssistantMessage = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "older reply from a previous run" }],
|
||||
timestamp: 20,
|
||||
};
|
||||
|
||||
callGatewayMock.mockImplementation(async (opts: unknown) => {
|
||||
const request = opts as { method?: string };
|
||||
if (request.method === "sessions.list") {
|
||||
return {
|
||||
path: "/tmp/sessions.json",
|
||||
sessions: [{ key: MAIN_AGENT_SESSION_KEY, kind: "direct" }],
|
||||
};
|
||||
}
|
||||
if (request.method === "chat.history") {
|
||||
return { messages: [staleAssistantMessage] };
|
||||
}
|
||||
if (request.method === "agent") {
|
||||
return { runId: "run-fire-and-forget", acceptedAt: 123 };
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
const result = await tool.execute("call-fire-and-forget-same-session", {
|
||||
sessionKey: MAIN_AGENT_SESSION_KEY,
|
||||
message: "ping",
|
||||
timeoutSeconds: 0,
|
||||
});
|
||||
|
||||
const details = requireDetails(result);
|
||||
expect(details.status).toBe("accepted");
|
||||
expect(details.sessionKey).toBe(MAIN_AGENT_SESSION_KEY);
|
||||
const flowParams = vi.mocked(runSessionsSendA2AFlow).mock.calls[0]?.[0];
|
||||
expect(flowParams?.waitRunId).toBe("run-fire-and-forget");
|
||||
expect(flowParams?.baseline?.text).toBe("older reply from a previous run");
|
||||
});
|
||||
|
||||
it("accepts fire-and-forget same-session sends when baseline history is unavailable", async () => {
|
||||
const { runSessionsSendA2AFlow } = await import("./sessions-send-tool.a2a.js");
|
||||
vi.mocked(runSessionsSendA2AFlow).mockClear();
|
||||
const tool = createMainSessionsSendTool();
|
||||
|
||||
callGatewayMock.mockImplementation(async (opts: unknown) => {
|
||||
const request = opts as { method?: string };
|
||||
if (request.method === "sessions.list") {
|
||||
return {
|
||||
path: "/tmp/sessions.json",
|
||||
sessions: [{ key: MAIN_AGENT_SESSION_KEY, kind: "direct" }],
|
||||
};
|
||||
}
|
||||
if (request.method === "chat.history") {
|
||||
throw new Error("history unavailable");
|
||||
}
|
||||
if (request.method === "agent") {
|
||||
return { runId: "run-fire-and-forget", acceptedAt: 123 };
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
const result = await tool.execute("call-fire-and-forget-history-fail", {
|
||||
sessionKey: MAIN_AGENT_SESSION_KEY,
|
||||
message: "ping",
|
||||
timeoutSeconds: 0,
|
||||
});
|
||||
|
||||
const details = requireDetails(result);
|
||||
expect(details.status).toBe("accepted");
|
||||
expect(details.sessionKey).toBe(MAIN_AGENT_SESSION_KEY);
|
||||
const flowParams = vi.mocked(runSessionsSendA2AFlow).mock.calls[0]?.[0];
|
||||
expect(flowParams?.waitRunId).toBe("run-fire-and-forget");
|
||||
expect(flowParams?.baseline).toBeUndefined();
|
||||
});
|
||||
|
||||
it("caps oversized timeoutSeconds before waiting for the target run", async () => {
|
||||
const tool = createMainSessionsSendTool();
|
||||
const waitTimeouts: unknown[] = [];
|
||||
|
||||
@@ -375,7 +375,6 @@ function listProposalEntries(params: {
|
||||
if (!query) {
|
||||
return true;
|
||||
}
|
||||
const normalizedSearch = normalizedQuery;
|
||||
return [
|
||||
proposal.id,
|
||||
proposal.title,
|
||||
@@ -386,7 +385,9 @@ function listProposalEntries(params: {
|
||||
const lower = value.toLowerCase();
|
||||
return (
|
||||
lower.includes(query) ||
|
||||
(normalizedSearch ? normalizeProposalSearchText(lower).includes(normalizedSearch) : false)
|
||||
(normalizedQuery !== undefined &&
|
||||
normalizedQuery.length > 0 &&
|
||||
normalizeProposalSearchText(lower).includes(normalizedQuery))
|
||||
);
|
||||
});
|
||||
})
|
||||
|
||||
@@ -105,7 +105,8 @@ function collectSlotPluginIds(cfg: OpenClawConfig): string[] {
|
||||
return ["memory", "contextEngine"]
|
||||
.map((key) => normalizeId(slots?.[key]))
|
||||
.filter(
|
||||
(pluginId): pluginId is string => pluginId !== null && pluginId.toLowerCase() !== "none",
|
||||
(pluginId): pluginId is string =>
|
||||
typeof pluginId === "string" && pluginId.toLowerCase() !== "none",
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -315,7 +315,7 @@ describe("Session Store Cache", () => {
|
||||
throw new Error("Expected cached entry");
|
||||
}
|
||||
expect(entry?.polluted).toBeUndefined();
|
||||
expect(Object.hasOwn(entry, "__proto__")).toBe(true);
|
||||
expect(Object.hasOwn(entry as SessionEntry, "__proto__")).toBe(true);
|
||||
expect(Object.prototype).not.toHaveProperty("polluted");
|
||||
});
|
||||
|
||||
|
||||
@@ -421,17 +421,17 @@ function createPreferredProviderMatcher(params: {
|
||||
if (cached !== undefined) {
|
||||
return cached;
|
||||
}
|
||||
const preferredOwners = preferredOwnerPluginIdSet;
|
||||
if (!preferredOwnerPluginIdSet) {
|
||||
entryProviderCache.set(normalizedEntryProvider, false);
|
||||
return false;
|
||||
}
|
||||
const value =
|
||||
preferredOwners !== undefined &&
|
||||
Boolean(
|
||||
resolveOwningPluginIdsForProviderRef({
|
||||
provider: normalizedEntryProvider,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
env: params.env,
|
||||
})?.some((pluginId) => preferredOwners.has(pluginId)),
|
||||
);
|
||||
resolveOwningPluginIdsForProviderRef({
|
||||
provider: normalizedEntryProvider,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
env: params.env,
|
||||
})?.some((pluginId) => preferredOwnerPluginIdSet.has(pluginId)) ?? false;
|
||||
entryProviderCache.set(normalizedEntryProvider, value);
|
||||
return value;
|
||||
};
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { resolveTimerTimeoutMs } from "../shared/number-coercion.js";
|
||||
import { MAX_SAFE_TIMEOUT_DELAY_MS } from "../utils/timer-delay.js";
|
||||
import { MAX_TIMER_TIMEOUT_MS } from "../shared/number-coercion.js";
|
||||
import { resolveRetryConfig, retryAsync } from "./retry.js";
|
||||
|
||||
const randomMocks = vi.hoisted(() => ({
|
||||
@@ -206,12 +205,11 @@ describe("retryAsync", () => {
|
||||
vi.useFakeTimers();
|
||||
const timeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
const fn = vi.fn().mockRejectedValueOnce(new Error("boom")).mockResolvedValueOnce("ok");
|
||||
const scheduledDelayMs = resolveTimerTimeoutMs(MAX_SAFE_TIMEOUT_DELAY_MS, 0, 0);
|
||||
try {
|
||||
const promise = retryAsync(fn, 2, 3_000_000_000);
|
||||
await vi.advanceTimersByTimeAsync(MAX_SAFE_TIMEOUT_DELAY_MS);
|
||||
await vi.advanceTimersByTimeAsync(MAX_TIMER_TIMEOUT_MS);
|
||||
await expect(promise).resolves.toBe("ok");
|
||||
expect(timeoutSpy).toHaveBeenCalledWith(expect.any(Function), scheduledDelayMs);
|
||||
expect(timeoutSpy).toHaveBeenCalledWith(expect.any(Function), MAX_TIMER_TIMEOUT_MS);
|
||||
} finally {
|
||||
timeoutSpy.mockRestore();
|
||||
vi.clearAllTimers();
|
||||
@@ -228,14 +226,13 @@ describe("retryAsync", () => {
|
||||
.mockRejectedValueOnce(new Error("boom-1"))
|
||||
.mockRejectedValueOnce(new Error("boom-2"))
|
||||
.mockResolvedValueOnce("ok");
|
||||
const scheduledDelayMs = resolveTimerTimeoutMs(MAX_SAFE_TIMEOUT_DELAY_MS, 0, 0);
|
||||
try {
|
||||
const promise = retryAsync(fn, 3, Number.MAX_VALUE);
|
||||
await vi.advanceTimersByTimeAsync(MAX_SAFE_TIMEOUT_DELAY_MS);
|
||||
await vi.advanceTimersByTimeAsync(MAX_SAFE_TIMEOUT_DELAY_MS);
|
||||
await vi.advanceTimersByTimeAsync(MAX_TIMER_TIMEOUT_MS);
|
||||
await vi.advanceTimersByTimeAsync(MAX_TIMER_TIMEOUT_MS);
|
||||
await expect(promise).resolves.toBe("ok");
|
||||
expect(timeoutSpy).toHaveBeenNthCalledWith(1, expect.any(Function), scheduledDelayMs);
|
||||
expect(timeoutSpy).toHaveBeenNthCalledWith(2, expect.any(Function), scheduledDelayMs);
|
||||
expect(timeoutSpy).toHaveBeenNthCalledWith(1, expect.any(Function), MAX_TIMER_TIMEOUT_MS);
|
||||
expect(timeoutSpy).toHaveBeenNthCalledWith(2, expect.any(Function), MAX_TIMER_TIMEOUT_MS);
|
||||
} finally {
|
||||
timeoutSpy.mockRestore();
|
||||
vi.clearAllTimers();
|
||||
@@ -319,8 +316,8 @@ describe("resolveRetryConfig", () => {
|
||||
},
|
||||
expected: {
|
||||
attempts: 3,
|
||||
minDelayMs: MAX_SAFE_TIMEOUT_DELAY_MS,
|
||||
maxDelayMs: MAX_SAFE_TIMEOUT_DELAY_MS,
|
||||
minDelayMs: MAX_TIMER_TIMEOUT_MS,
|
||||
maxDelayMs: MAX_TIMER_TIMEOUT_MS,
|
||||
jitter: 0,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { asFiniteNumber } from "@openclaw/normalization-core/number-coercion";
|
||||
import { MAX_TIMER_TIMEOUT_MS, resolveTimerTimeoutMs } from "../shared/number-coercion.js";
|
||||
import { sleep } from "../utils.js";
|
||||
import { MAX_SAFE_TIMEOUT_DELAY_MS, resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js";
|
||||
import { generateSecureFraction } from "./secure-random.js";
|
||||
|
||||
export type RetryConfig = {
|
||||
@@ -49,9 +49,9 @@ function resolveAttemptCount(value: unknown, fallback: number): number {
|
||||
|
||||
function resolveRetryDelayMs(value: number): number {
|
||||
if (value === Number.POSITIVE_INFINITY) {
|
||||
return MAX_SAFE_TIMEOUT_DELAY_MS;
|
||||
return MAX_TIMER_TIMEOUT_MS;
|
||||
}
|
||||
return resolveSafeTimeoutDelayMs(value, { minMs: 0 });
|
||||
return resolveTimerTimeoutMs(value, 0, 0);
|
||||
}
|
||||
|
||||
export function resolveRetryConfig(
|
||||
|
||||
Reference in New Issue
Block a user