mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 05:51:15 +08:00
Harden final delivery routing refresh (#83835)
* harden final delivery routing refresh * add changelog for final delivery hardening
This commit is contained in:
@@ -83,6 +83,7 @@ Docs: https://docs.openclaw.ai
|
||||
- LM Studio: resolve env-template API keys like `${LMSTUDIO_API_KEY}` through the standard SecretInput path instead of sending the raw template as the bearer token, and preserve header-auth and discovery-key precedence when the template is unset. Fixes #80495. (#80568) Thanks @MonkeyLeeT.
|
||||
- Discord/subagents: route the initial reply from thread-bound delegated sessions into the bound Discord thread instead of the parent channel. Fixes #83170. (#83172) Thanks @100menotu001.
|
||||
- Gateway/sessions: rotate failed agent sessions when their transcript file is missing instead of wedging per-channel lanes. Fixes #83488. (#83553) Thanks @LLagoon3.
|
||||
- Agents: refresh final-delivery routing from fresh session state before declaring a no-send failure, keeping recovered runs on the normal durable delivery path. (#83835) Thanks @joshavant.
|
||||
- Media: prevent image metadata probing from invoking external decoder delegates on unrecognized image bytes, and stop fallback chaining after real processing errors.
|
||||
- Media: install Sharp with the root package and fall back to sips, Windows native imaging, ImageMagick, GraphicsMagick, or ffmpeg for image resizing/conversion when Sharp is unavailable. Fixes #83401. Thanks @scotthuang.
|
||||
- Telegram: deliver generated media completions back into forum topics by preserving topic IDs across requester-agent handoff. (#83556) Thanks @fuller-stack-dev.
|
||||
|
||||
@@ -1524,6 +1524,21 @@ async function agentCommandInternal(
|
||||
}
|
||||
|
||||
const { deliverAgentCommandResult } = await loadDeliveryRuntime();
|
||||
const resolveFreshSessionEntryForDelivery =
|
||||
sessionStore && sessionKey
|
||||
? async (): Promise<SessionEntry | undefined> => {
|
||||
const { loadSessionStore } = await loadSessionStoreRuntime();
|
||||
const freshStore = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
clone: false,
|
||||
});
|
||||
const freshEntry = freshStore[sessionKey];
|
||||
if (freshEntry) {
|
||||
sessionStore[sessionKey] = freshEntry;
|
||||
}
|
||||
return freshEntry;
|
||||
}
|
||||
: undefined;
|
||||
const deliveryResult = await deliverAgentCommandResult({
|
||||
cfg,
|
||||
deps: resolvedDeps,
|
||||
@@ -1531,6 +1546,7 @@ async function agentCommandInternal(
|
||||
opts,
|
||||
outboundSession,
|
||||
sessionEntry,
|
||||
resolveFreshSessionEntryForDelivery,
|
||||
result,
|
||||
payloads,
|
||||
});
|
||||
|
||||
@@ -103,6 +103,9 @@ function latestNormalizerOptions(): MediaNormalizerOptions {
|
||||
}
|
||||
|
||||
function latestOutboundDeliveryArgs(): {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
payloads: ReplyPayload[];
|
||||
bestEffort?: boolean;
|
||||
queuePolicy?: string;
|
||||
@@ -111,7 +114,14 @@ function latestOutboundDeliveryArgs(): {
|
||||
if (!args || typeof args !== "object") {
|
||||
throw new Error("expected outbound delivery arguments");
|
||||
}
|
||||
return args as { payloads: ReplyPayload[]; bestEffort?: boolean; queuePolicy?: string };
|
||||
return args as {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
payloads: ReplyPayload[];
|
||||
bestEffort?: boolean;
|
||||
queuePolicy?: string;
|
||||
};
|
||||
}
|
||||
|
||||
type DeliveryStatusLike = {
|
||||
@@ -329,6 +339,62 @@ describe("normalizeAgentCommandReplyPayloads", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("refreshes stale implicit session routing before final delivery", async () => {
|
||||
deliverOutboundPayloadsMock.mockResolvedValue([{ channel: "slack", messageId: "msg-1" }]);
|
||||
const runtime = { log: vi.fn(), error: vi.fn() };
|
||||
const resolveFreshSessionEntryForDelivery = vi.fn(async () => ({
|
||||
sessionId: "session-1",
|
||||
updatedAt: 2,
|
||||
deliveryContext: {
|
||||
channel: "slack",
|
||||
to: "#fresh",
|
||||
accountId: "workspace-1",
|
||||
},
|
||||
}));
|
||||
|
||||
const delivered = await deliverAgentCommandResult({
|
||||
cfg: {
|
||||
agents: {
|
||||
list: [{ id: "tester", workspace: "/tmp/agent-workspace" }],
|
||||
},
|
||||
} as OpenClawConfig,
|
||||
deps: {} as CliDeps,
|
||||
runtime: runtime as never,
|
||||
opts: {
|
||||
message: "go",
|
||||
deliver: true,
|
||||
channel: "slack",
|
||||
sessionKey: "agent:tester:main",
|
||||
} as AgentCommandOpts,
|
||||
outboundSession: {
|
||||
key: "agent:tester:main",
|
||||
agentId: "tester",
|
||||
} as never,
|
||||
sessionEntry: {
|
||||
sessionId: "session-1",
|
||||
updatedAt: 1,
|
||||
},
|
||||
resolveFreshSessionEntryForDelivery,
|
||||
payloads: [{ text: "final answer" }],
|
||||
result: createResult(),
|
||||
});
|
||||
|
||||
expect(resolveFreshSessionEntryForDelivery).toHaveBeenCalledTimes(1);
|
||||
expect(deliverOutboundPayloadsMock).toHaveBeenCalledTimes(1);
|
||||
const deliverArgs = latestOutboundDeliveryArgs();
|
||||
expect(deliverArgs.channel).toBe("slack");
|
||||
expect(deliverArgs.to).toBe("#fresh");
|
||||
expect(deliverArgs.accountId).toBe("workspace-1");
|
||||
expect(delivered.deliverySucceeded).toBe(true);
|
||||
expectDeliveryStatusFields(delivered, {
|
||||
requested: true,
|
||||
attempted: true,
|
||||
status: "sent",
|
||||
succeeded: true,
|
||||
resultCount: 1,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not report success when best-effort delivery records an error", async () => {
|
||||
deliverOutboundPayloadsMock.mockImplementationOnce(async (params: unknown) => {
|
||||
(
|
||||
|
||||
@@ -338,6 +338,7 @@ export async function deliverAgentCommandResult(params: {
|
||||
opts: AgentCommandOpts;
|
||||
outboundSession: OutboundSessionContext | undefined;
|
||||
sessionEntry: SessionEntry | undefined;
|
||||
resolveFreshSessionEntryForDelivery?: () => Promise<SessionEntry | undefined>;
|
||||
result: RunResult;
|
||||
payloads: RunResult["payloads"];
|
||||
}): Promise<AgentCommandDeliveryResult> {
|
||||
@@ -349,76 +350,144 @@ export async function deliverAgentCommandResult(params: {
|
||||
const turnSourceTo = opts.runContext?.currentChannelId ?? opts.to;
|
||||
const turnSourceAccountId = opts.runContext?.accountId ?? opts.accountId;
|
||||
const turnSourceThreadId = opts.runContext?.currentThreadTs ?? opts.threadId;
|
||||
const deliveryPlan = resolveAgentDeliveryPlan({
|
||||
sessionEntry,
|
||||
requestedChannel: opts.replyChannel ?? opts.channel,
|
||||
explicitTo: opts.replyTo ?? opts.to,
|
||||
explicitThreadId: opts.threadId,
|
||||
accountId: opts.replyAccountId ?? opts.accountId,
|
||||
wantsDelivery: deliver,
|
||||
turnSourceChannel,
|
||||
turnSourceTo,
|
||||
turnSourceAccountId,
|
||||
turnSourceThreadId,
|
||||
});
|
||||
let deliveryChannel = deliveryPlan.resolvedChannel;
|
||||
const explicitChannelHint = (opts.replyChannel ?? opts.channel)?.trim();
|
||||
if (deliver && isInternalMessageChannel(deliveryChannel) && !explicitChannelHint) {
|
||||
try {
|
||||
const selection = await resolveMessageChannelSelection({ cfg });
|
||||
deliveryChannel = selection.channel;
|
||||
} catch {
|
||||
// Keep the internal channel marker; error handling below reports the failure.
|
||||
const resolveDeliveryRouting = async (candidateSessionEntry: SessionEntry | undefined) => {
|
||||
const deliveryPlan = resolveAgentDeliveryPlan({
|
||||
sessionEntry: candidateSessionEntry,
|
||||
requestedChannel: opts.replyChannel ?? opts.channel,
|
||||
explicitTo: opts.replyTo ?? opts.to,
|
||||
explicitThreadId: opts.threadId,
|
||||
accountId: opts.replyAccountId ?? opts.accountId,
|
||||
wantsDelivery: deliver,
|
||||
turnSourceChannel,
|
||||
turnSourceTo,
|
||||
turnSourceAccountId,
|
||||
turnSourceThreadId,
|
||||
});
|
||||
let deliveryChannel = deliveryPlan.resolvedChannel;
|
||||
if (deliver && isInternalMessageChannel(deliveryChannel) && !explicitChannelHint) {
|
||||
try {
|
||||
const selection = await resolveMessageChannelSelection({ cfg });
|
||||
deliveryChannel = selection.channel;
|
||||
} catch {
|
||||
// Keep the internal channel marker; error handling below reports the failure.
|
||||
}
|
||||
}
|
||||
const effectiveDeliveryPlan =
|
||||
deliveryChannel === deliveryPlan.resolvedChannel
|
||||
? deliveryPlan
|
||||
: {
|
||||
...deliveryPlan,
|
||||
resolvedChannel: deliveryChannel,
|
||||
};
|
||||
// Channel docking: delivery channels are resolved via plugin registry.
|
||||
const deliveryPlugin =
|
||||
deliver && !isInternalMessageChannel(deliveryChannel)
|
||||
? getChannelPlugin(normalizeChannelId(deliveryChannel) ?? deliveryChannel)
|
||||
: undefined;
|
||||
const isDeliveryChannelKnown =
|
||||
isInternalMessageChannel(deliveryChannel) || Boolean(deliveryPlugin);
|
||||
const targetMode =
|
||||
opts.deliveryTargetMode ??
|
||||
effectiveDeliveryPlan.deliveryTargetMode ??
|
||||
(opts.to ? "explicit" : "implicit");
|
||||
const resolvedAccountId = effectiveDeliveryPlan.resolvedAccountId;
|
||||
const resolved =
|
||||
deliver && isDeliveryChannelKnown && deliveryChannel
|
||||
? resolveAgentOutboundTarget({
|
||||
cfg,
|
||||
plan: effectiveDeliveryPlan,
|
||||
targetMode,
|
||||
validateExplicitTarget: true,
|
||||
})
|
||||
: {
|
||||
resolvedTarget: null,
|
||||
resolvedTo: effectiveDeliveryPlan.resolvedTo,
|
||||
targetMode,
|
||||
};
|
||||
const resolvedThreadId = deliveryPlan.resolvedThreadId ?? opts.threadId;
|
||||
const replyTransport =
|
||||
deliveryPlugin?.threading?.resolveReplyTransport?.({
|
||||
cfg,
|
||||
accountId: resolvedAccountId,
|
||||
threadId: resolvedThreadId,
|
||||
}) ?? null;
|
||||
return {
|
||||
deliveryPlan,
|
||||
deliveryChannel,
|
||||
effectiveDeliveryPlan,
|
||||
deliveryPlugin,
|
||||
isDeliveryChannelKnown,
|
||||
targetMode,
|
||||
resolvedAccountId,
|
||||
resolved,
|
||||
resolvedTarget: resolved.resolvedTarget,
|
||||
deliveryTarget: resolved.resolvedTo,
|
||||
resolvedThreadId,
|
||||
resolvedReplyToId: replyTransport?.replyToId ?? undefined,
|
||||
resolvedThreadTarget:
|
||||
replyTransport && Object.hasOwn(replyTransport, "threadId")
|
||||
? (replyTransport.threadId ?? null)
|
||||
: (resolvedThreadId ?? null),
|
||||
};
|
||||
};
|
||||
const deliveryRoutingFailureReason = (
|
||||
route: Awaited<ReturnType<typeof resolveDeliveryRouting>>,
|
||||
): string | undefined => {
|
||||
if (!deliver) {
|
||||
return undefined;
|
||||
}
|
||||
if (isInternalMessageChannel(route.deliveryChannel)) {
|
||||
return "channel_resolved_to_internal";
|
||||
}
|
||||
if (!route.isDeliveryChannelKnown) {
|
||||
return "unknown_channel";
|
||||
}
|
||||
if (route.resolvedTarget && !route.resolvedTarget.ok) {
|
||||
return "invalid_delivery_target";
|
||||
}
|
||||
if (!route.deliveryTarget) {
|
||||
return "no_delivery_target";
|
||||
}
|
||||
return undefined;
|
||||
};
|
||||
const isRetryableFreshSessionRoutingFailure = (
|
||||
route: Awaited<ReturnType<typeof resolveDeliveryRouting>>,
|
||||
): boolean => {
|
||||
const reason = deliveryRoutingFailureReason(route);
|
||||
if (!reason) {
|
||||
return false;
|
||||
}
|
||||
if (reason === "unknown_channel") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
let deliveryRouting = await resolveDeliveryRouting(sessionEntry);
|
||||
if (isRetryableFreshSessionRoutingFailure(deliveryRouting)) {
|
||||
const freshSessionEntry = await params.resolveFreshSessionEntryForDelivery?.();
|
||||
if (freshSessionEntry && freshSessionEntry !== sessionEntry) {
|
||||
const freshRouting = await resolveDeliveryRouting(freshSessionEntry);
|
||||
if (!deliveryRoutingFailureReason(freshRouting)) {
|
||||
if (!opts.json) {
|
||||
runtime.log(
|
||||
`[delivery] refreshed session routing before final delivery (session=${effectiveSessionKey ?? "unknown"} channel=${freshRouting.deliveryChannel})`,
|
||||
);
|
||||
}
|
||||
deliveryRouting = freshRouting;
|
||||
}
|
||||
}
|
||||
}
|
||||
const effectiveDeliveryPlan =
|
||||
deliveryChannel === deliveryPlan.resolvedChannel
|
||||
? deliveryPlan
|
||||
: {
|
||||
...deliveryPlan,
|
||||
resolvedChannel: deliveryChannel,
|
||||
};
|
||||
// Channel docking: delivery channels are resolved via plugin registry.
|
||||
const deliveryPlugin =
|
||||
deliver && !isInternalMessageChannel(deliveryChannel)
|
||||
? getChannelPlugin(normalizeChannelId(deliveryChannel) ?? deliveryChannel)
|
||||
: undefined;
|
||||
|
||||
const isDeliveryChannelKnown =
|
||||
isInternalMessageChannel(deliveryChannel) || Boolean(deliveryPlugin);
|
||||
|
||||
const targetMode =
|
||||
opts.deliveryTargetMode ??
|
||||
effectiveDeliveryPlan.deliveryTargetMode ??
|
||||
(opts.to ? "explicit" : "implicit");
|
||||
const resolvedAccountId = effectiveDeliveryPlan.resolvedAccountId;
|
||||
const resolved =
|
||||
deliver && isDeliveryChannelKnown && deliveryChannel
|
||||
? resolveAgentOutboundTarget({
|
||||
cfg,
|
||||
plan: effectiveDeliveryPlan,
|
||||
targetMode,
|
||||
validateExplicitTarget: true,
|
||||
})
|
||||
: {
|
||||
resolvedTarget: null,
|
||||
resolvedTo: effectiveDeliveryPlan.resolvedTo,
|
||||
targetMode,
|
||||
};
|
||||
const resolvedTarget = resolved.resolvedTarget;
|
||||
const deliveryTarget = resolved.resolvedTo;
|
||||
const resolvedThreadId = deliveryPlan.resolvedThreadId ?? opts.threadId;
|
||||
const replyTransport =
|
||||
deliveryPlugin?.threading?.resolveReplyTransport?.({
|
||||
cfg,
|
||||
accountId: resolvedAccountId,
|
||||
threadId: resolvedThreadId,
|
||||
}) ?? null;
|
||||
const resolvedReplyToId = replyTransport?.replyToId ?? undefined;
|
||||
const resolvedThreadTarget =
|
||||
replyTransport && Object.hasOwn(replyTransport, "threadId")
|
||||
? (replyTransport.threadId ?? null)
|
||||
: (resolvedThreadId ?? null);
|
||||
const {
|
||||
deliveryChannel,
|
||||
isDeliveryChannelKnown,
|
||||
resolvedAccountId,
|
||||
resolvedTarget,
|
||||
deliveryTarget,
|
||||
resolvedReplyToId,
|
||||
resolvedThreadTarget,
|
||||
} = deliveryRouting;
|
||||
|
||||
let deliveryLoggedError = false;
|
||||
const logDeliveryError = (err: unknown) => {
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
export { updateSessionStoreAfterAgentRun } from "./session-store.js";
|
||||
export { loadSessionStore } from "../../config/sessions.js";
|
||||
|
||||
Reference in New Issue
Block a user