fix(openai): avoid replay ids when Responses store is disabled

Avoid replaying prior OpenAI Responses reasoning/message/function-call item ids when the outgoing request disables store, while preserving encrypted reasoning and normalized summary arrays for stateless replay. Keep explicit store-enabled OpenAI wrapper paths opted into item-id replay, and cover shared/simple Responses, ChatGPT/Codex Responses, and GitHub Copilot sanitizer behavior.

Regression tests cover store-disabled id omission, encrypted reasoning preservation, idless Copilot reasoning replay, and direct builder payloads. Local proof included focused Vitest, broad lint, broad test-types, bundled-extension lint, plugin boundary checks, autoreview clean, and live OpenAI Responses gpt-5.5 proof.

Co-authored-by: hang <zhanghang02@gmail.com>
This commit is contained in:
waterblue
2026-06-01 05:17:32 +08:00
committed by GitHub
parent 5bc80dbe27
commit 03dec8bb3a
33 changed files with 867 additions and 164 deletions

View File

@@ -18,7 +18,11 @@ export type {
ReplyPayload,
} from "openclaw/plugin-sdk/core";
export type { OpenClawConfig as ClawdbotConfig } from "openclaw/plugin-sdk/core";
export type { RuntimeEnv } from "openclaw/plugin-sdk/runtime";
export type RuntimeEnv = {
log: (...args: unknown[]) => void;
error: (...args: unknown[]) => void;
exit: (code: number) => void;
};
export type { GroupToolPolicyConfig } from "openclaw/plugin-sdk/config-contracts";
export {
DEFAULT_ACCOUNT_ID,

View File

@@ -1,4 +1,4 @@
export type { RuntimeEnv } from "openclaw/plugin-sdk/runtime";
export type { RuntimeEnv } from "../runtime-api.js";
export {
createFixedWindowRateLimiter,
createWebhookAnomalyTracker,

View File

@@ -575,7 +575,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
const queueIdleSideEffects = (options?: { markClosedForReply?: boolean }): Promise<void> => {
const nextIdleSideEffects = idleSideEffectsPromise.then(async () => {
await closeStreaming(options);
await typingCallbacks?.onIdle?.();
await Promise.resolve(typingCallbacks?.onIdle?.());
});
idleSideEffectsPromise = nextIdleSideEffects.catch(() => {});
return nextIdleSideEffects;
@@ -609,7 +609,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
await typingCallbacks?.onReplyStart?.();
await Promise.resolve(typingCallbacks?.onReplyStart?.());
},
deliver: async (payload: ReplyPayload, info) => {
if (info?.kind === "final") {

View File

@@ -64,7 +64,7 @@ describe("github-copilot connection-bound response IDs", () => {
expect(input.map((item) => item.id)).toEqual([withEncrypted, withoutEncrypted]);
});
it("drops unsafe reasoning replay items instead of stripping their IDs", () => {
it("drops unsafe reasoning replay item IDs while keeping idless reasoning replay", () => {
const overlongId = `5PX6gLHXT5wE+Y2tPmUV4gn+${"B".repeat(384)}`;
const input = [
{
@@ -80,6 +80,7 @@ describe("github-copilot connection-bound response IDs", () => {
expect(sanitizeCopilotReplayResponseIds(input)).toBe(true);
expect(input).toEqual([
{ type: "reasoning", encrypted_content: "missing-id", summary: [] },
{ id: "rs_valid", type: "reasoning", encrypted_content: "valid", summary: [] },
]);
});

View File

@@ -44,11 +44,11 @@ export function sanitizeCopilotReplayResponseIds(input: unknown): boolean {
continue;
}
const id = item.id;
// Reasoning items always reference server-side encrypted state bound to the
// original item ID. Rewriting or stripping that ID can turn replay into an
// invalid or ambiguous server-state lookup, so drop unsafe reasoning items.
// Reasoning items with replay IDs reference server-side encrypted state
// bound to that ID. Drop unsafe IDs, but keep the store-disabled idless
// replay form produced by core Responses conversion.
if (item.type === "reasoning") {
if (!isValidReasoningReplayId(id)) {
if (id !== undefined && !isValidReasoningReplayId(id)) {
input.splice(index, 1);
rewrote = true;
}

View File

@@ -127,6 +127,7 @@ describe("wrapCopilotAnthropicStream", () => {
const payload = {
input: [
{ id: reasoningId, type: "reasoning", encrypted_content: "valid-encrypted-payload" },
{ type: "reasoning", encrypted_content: "idless-encrypted-payload", summary: [] },
{
id: overlongReasoningId,
type: "reasoning",
@@ -181,8 +182,13 @@ describe("wrapCopilotAnthropicStream", () => {
onPayload: options.onPayload,
});
expect(payloads[0]?.input[0]?.id).toBe(reasoningId);
expect(payloads[0]?.input.map((item) => item.type)).toEqual(["reasoning", "message"]);
expect(payloads[0]?.input[1]?.id).toMatch(/^msg_[a-f0-9]{16}$/);
expect(payloads[0]?.input.map((item) => item.type)).toEqual([
"reasoning",
"reasoning",
"message",
]);
expect(payloads[0]?.input[1]?.id).toBeUndefined();
expect(payloads[0]?.input[2]?.id).toMatch(/^msg_[a-f0-9]{16}$/);
});
it("rewrites Copilot Responses IDs returned by an existing payload hook", async () => {

View File

@@ -3,14 +3,15 @@ import os from "node:os";
import path from "node:path";
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { MAX_TIMER_TIMEOUT_MS } from "openclaw/plugin-sdk/number-runtime";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { createChannelIngressQueue } from "../../../src/channels/message/ingress-queue.js";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../../../src/infra/kysely-sync.js";
import type { DB as OpenClawStateKyselyDatabase } from "../../../src/state/openclaw-state-db.generated.js";
import {
closeOpenClawStateDatabaseForTest,
createChannelIngressQueueForTests as createChannelIngressQueue,
executeSqliteQuerySync,
getNodeSqliteKysely,
openOpenClawStateDatabase,
} from "../../../src/state/openclaw-state-db.js";
type OpenClawStateKyselyDatabaseForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js";
import type { TelegramRuntime } from "./runtime.types.js";
import type { TelegramIngressWorkerMessage } from "./telegram-ingress-worker.js";
@@ -105,7 +106,10 @@ type WorkerPollErrorListener = (message: {
type WorkerMessageListener = (message: TelegramIngressWorkerMessage) => void;
type AsyncVoidFn = () => Promise<void>;
type MockCallSource = { mock: { calls: Array<Array<unknown>> } };
type TelegramPollingTestDatabase = Pick<OpenClawStateKyselyDatabase, "channel_ingress_events">;
type TelegramPollingTestDatabase = Pick<
OpenClawStateKyselyDatabaseForTests,
"channel_ingress_events"
>;
const POLLING_TEST_WATCHDOG_INTERVAL_MS = 30_000;
@@ -115,7 +119,7 @@ function installTelegramIngressQueueRuntime(resolveStateDir: () => string): void
resolveStateDir,
openChannelIngressQueue: (
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
) => createChannelIngressQueue({ ...(options ?? {}), channelId: "telegram" }),
) => createChannelIngressQueue({ ...options, channelId: "telegram" }),
},
} as TelegramRuntime);
}

View File

@@ -1,9 +1,11 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import {
closeOpenClawStateDatabaseForTest,
createChannelIngressQueueForTests as createChannelIngressQueue,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, describe, expect, it } from "vitest";
import { createChannelIngressQueue } from "../../../src/channels/message/ingress-queue.js";
import { closeOpenClawStateDatabaseForTest } from "../../../src/state/openclaw-state-db.js";
import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js";
import type { TelegramRuntime } from "./runtime.types.js";
import {
@@ -25,7 +27,7 @@ function installTelegramIngressQueueRuntime(resolveStateDir: () => string): void
resolveStateDir,
openChannelIngressQueue: (
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
) => createChannelIngressQueue({ ...(options ?? {}), channelId: "telegram" }),
) => createChannelIngressQueue({ ...options, channelId: "telegram" }),
},
} as TelegramRuntime);
}

View File

@@ -1,11 +1,11 @@
import { randomUUID } from "node:crypto";
import os from "node:os";
import path from "node:path";
import {
type ChannelIngressQueue,
type ChannelIngressQueueClaim,
type ChannelIngressQueueClaimRef,
type ChannelIngressQueueRecord,
import type {
ChannelIngressQueue,
ChannelIngressQueueClaim,
ChannelIngressQueueClaimRef,
ChannelIngressQueueRecord,
} from "openclaw/plugin-sdk/channel-outbound";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { getTelegramRuntime } from "./runtime.js";

View File

@@ -40,6 +40,9 @@ function installStateRuntime(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call doctor tests");
}) as never,
},
});
}

View File

@@ -24,6 +24,9 @@ function installStateRuntime(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call restore tests");
}) as never,
},
});
}

View File

@@ -89,6 +89,9 @@ export function installVoiceCallStateRuntimeForTests(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call manager tests");
}) as never,
},
});
}

View File

@@ -26,6 +26,9 @@ function installStateRuntime(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call event tests");
}) as never,
},
});
}

View File

@@ -29,6 +29,9 @@ function installStateRuntime(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call store tests");
}) as never,
},
});
}
@@ -88,6 +91,9 @@ describe("voice-call call record store", () => {
openSyncKeyedStore: (() => {
throw new Error("sqlite unavailable");
}) as never,
openChannelIngressQueue: (() => {
throw new Error("openChannelIngressQueue is not used by voice-call store tests");
}) as never,
},
});

View File

@@ -21,6 +21,11 @@ function installStateRuntime(): void {
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
openChannelIngressQueue: (() => {
throw new Error(
"openChannelIngressQueue is not used by voice-call webhook lifecycle tests",
);
}) as never,
},
});
}

View File

@@ -141,9 +141,11 @@ vi.mock("openclaw/plugin-sdk/media-store", async () => {
});
vi.mock("./runtime.js", async () => {
const { createChannelIngressQueue } = await vi.importActual<
typeof import("../../../src/channels/message/ingress-queue.js")
>("../../../src/channels/message/ingress-queue.js");
const { createChannelIngressQueueForTests: createChannelIngressQueue } = await Promise.resolve(
vi.importActual<typeof import("openclaw/plugin-sdk/plugin-state-test-runtime")>(
"openclaw/plugin-sdk/plugin-state-test-runtime",
),
);
const stateDir = `/tmp/openclaw-whatsapp-inbound-media-${Date.now()}-${Math.random()}`;
return {
getOptionalWhatsAppRuntime: () => undefined,
@@ -153,7 +155,7 @@ vi.mock("./runtime.js", async () => {
openKeyedStore: () => createInMemoryKeyedStore(),
openChannelIngressQueue: (
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
) => createChannelIngressQueue({ ...(options ?? {}), channelId: "whatsapp" }),
) => createChannelIngressQueue({ ...options, channelId: "whatsapp" }),
},
}),
setWhatsAppRuntime: vi.fn(),

View File

@@ -136,9 +136,11 @@ vi.mock("openclaw/plugin-sdk/channel-activity-runtime", async () => {
});
vi.mock("./runtime.js", async () => {
const { createChannelIngressQueue } = await vi.importActual<
typeof import("../../../src/channels/message/ingress-queue.js")
>("../../../src/channels/message/ingress-queue.js");
const { createChannelIngressQueueForTests: createChannelIngressQueue } = await Promise.resolve(
vi.importActual<typeof import("openclaw/plugin-sdk/plugin-state-test-runtime")>(
"openclaw/plugin-sdk/plugin-state-test-runtime",
),
);
return {
getWhatsAppRuntime: () => ({
state: {
@@ -146,7 +148,7 @@ vi.mock("./runtime.js", async () => {
openKeyedStore: pluginRuntimeMocks.openKeyedStore,
openChannelIngressQueue: (
options?: Omit<Parameters<typeof createChannelIngressQueue>[0], "channelId">,
) => createChannelIngressQueue({ ...(options ?? {}), channelId: "whatsapp" }),
) => createChannelIngressQueue({ ...options, channelId: "whatsapp" }),
},
}),
setWhatsAppRuntime: vi.fn(),

View File

@@ -2873,6 +2873,32 @@ describe("applyExtraParamsToAgent", () => {
expect(payload.store).toBe(true);
});
it("keeps Responses replay item ids enabled for direct OpenAI store-enabled requests", () => {
let capturedOptions:
| (SimpleStreamOptions & {
replayResponsesItemIds?: boolean;
})
| undefined;
const baseStreamFn: StreamFn = (_model, _context, options) => {
capturedOptions = options;
return {} as ReturnType<StreamFn>;
};
const streamFn = createOpenAIResponsesContextManagementWrapper(baseStreamFn, undefined);
void streamFn(
{
api: "openai-responses",
provider: "openai",
id: "gpt-5",
baseUrl: "https://api.openai.com/v1",
} as unknown as Model<"openai-responses">,
{ messages: [] },
{},
);
expect(capturedOptions?.replayResponsesItemIds).toBe(true);
});
it("forces store=true for azure-openai provider with openai-responses API (#42800)", () => {
const payload = runResponsesPayloadMutationCase({
applyProvider: "azure-openai",

View File

@@ -1,7 +1,10 @@
import { isCronRunSessionKey } from "../../../sessions/session-key-utils.js";
import { isTerminalTaskStatus } from "../../../tasks/task-executor-policy.js";
import { findTaskByRunId, listTaskRecords } from "../../../tasks/task-registry.js";
import type { TaskRecord } from "../../../tasks/task-registry.types.js";
import {
findTaskByRunIdForStatus,
listTasksForOwnerOrRequesterSessionKeyForStatus,
} from "../../../tasks/task-status-access.js";
export type AsyncStartedToolMeta = {
toolName?: string;
@@ -98,16 +101,10 @@ function collectAsyncTaskRunIds(
if (!normalizedSessionKey) {
return runIds;
}
for (const task of listTaskRecords()) {
for (const task of listTasksForOwnerOrRequesterSessionKeyForStatus(normalizedSessionKey)) {
if (!COMPLETION_REQUIRED_TASK_KINDS.has(task.taskKind ?? "")) {
continue;
}
if (
task.requesterSessionKey !== normalizedSessionKey &&
task.ownerKey !== normalizedSessionKey
) {
continue;
}
if (isTerminalTaskStatus(task.status)) {
continue;
}
@@ -123,7 +120,7 @@ function findTerminalTasks(runIds: readonly string[]): {
const pendingRunIds: string[] = [];
const terminalTasks: TaskRecord[] = [];
for (const runId of runIds) {
const task = findTaskByRunId(runId);
const task = findTaskByRunIdForStatus(runId);
if (task && isTerminalTaskStatus(task.status)) {
terminalTasks.push(task);
continue;
@@ -138,7 +135,7 @@ export function requiresCompletionRequiredAsyncTaskWait(params: {
toolMetas: readonly AsyncStartedToolMeta[];
}): boolean {
const sessionKey = params.sessionKey?.trim();
if (!isCronRunSessionKey(sessionKey)) {
if (!sessionKey || !isCronRunSessionKey(sessionKey)) {
return false;
}
if (
@@ -148,11 +145,10 @@ export function requiresCompletionRequiredAsyncTaskWait(params: {
) {
return true;
}
return listTaskRecords().some(
return listTasksForOwnerOrRequesterSessionKeyForStatus(sessionKey).some(
(task) =>
COMPLETION_REQUIRED_TASK_KINDS.has(task.taskKind ?? "") &&
!isTerminalTaskStatus(task.status) &&
(task.requesterSessionKey === sessionKey || task.ownerKey === sessionKey) &&
Boolean(task.runId?.trim()),
);
}

View File

@@ -86,6 +86,7 @@ async function runAbortedOpenAIResponsesStream(params: {
description: string;
parameters: ReturnType<typeof Type.Object>;
}>;
replayResponsesItemIds?: boolean;
}) {
const controller = new AbortController();
controller.abort();
@@ -100,11 +101,12 @@ async function runAbortedOpenAIResponsesStream(params: {
},
{
apiKey: "test",
replayResponsesItemIds: params.replayResponsesItemIds ?? true,
signal: controller.signal,
onPayload: (nextPayload) => {
onPayload: (nextPayload: unknown) => {
payload = nextPayload as Record<string, unknown>;
},
},
} as never,
);
await responseStream.result();

View File

@@ -2750,12 +2750,14 @@ describe("openai transport stream", () => {
call_id?: string;
phase?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("encrypted_content");
@@ -2776,7 +2778,222 @@ describe("openai transport stream", () => {
expect(functionCall?.id).toBeUndefined();
});
it("preserves prior Responses replay item ids for custom Codex-compatible responses", () => {
it("omits Responses replay item ids when OpenAI Responses requests disable store", () => {
const params = buildOpenAIResponsesParams(
{
id: "gpt-5.5",
name: "GPT-5.5",
api: "openai-responses",
provider: "mycodex",
baseUrl: "http://127.0.0.1:8317/v1",
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 1_000_000,
maxTokens: 128_000,
} satisfies Model<"openai-responses">,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: "openai-responses",
provider: "mycodex",
model: "gpt-5.5",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
}),
},
{
type: "text",
text: "Checking the price.",
textSignature: JSON.stringify({
v: 1,
id: "msg_prior",
phase: "commentary",
}),
},
{
type: "toolCall",
id: "call_abc|fc_prior",
name: "price_lookup",
arguments: { symbol: "SOL" },
},
],
},
{
role: "toolResult",
toolCallId: "call_abc|fc_prior",
toolName: "price_lookup",
content: [{ type: "text", text: "$83.95" }],
isError: false,
timestamp: 2,
},
],
tools: [],
} as never,
{ sessionId: "session-123" },
) as {
store?: boolean;
input?: Array<{
type?: string;
role?: string;
id?: string;
call_id?: string;
phase?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
expect(params.store).toBe(false);
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("encrypted_content");
const assistantMessage = params.input?.find(
(item) => item.type === "message" && item.role === "assistant",
);
expectRecordFields(assistantMessage, {
type: "message",
role: "assistant",
phase: "commentary",
});
expect(assistantMessage?.id).toBeUndefined();
const functionCall = params.input?.find((item) => item.type === "function_call");
expectRecordFields(functionCall, {
type: "function_call",
call_id: "call_abc",
});
expect(functionCall?.id).toBeUndefined();
});
it("preserves Responses replay item ids when a store-enabled wrapper requests replay", () => {
const params = buildOpenAIResponsesParams(
{
id: "gpt-5.4",
name: "GPT-5.4",
api: "openai-responses",
provider: "openai",
baseUrl: "https://api.openai.com/v1",
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 200000,
maxTokens: 8192,
} satisfies Model<"openai-responses">,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: "openai-responses",
provider: "openai",
model: "gpt-5.4",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
}),
},
{
type: "text",
text: "Checking the price.",
textSignature: JSON.stringify({
v: 1,
id: "msg_prior",
phase: "commentary",
}),
},
{
type: "toolCall",
id: "call_abc|fc_prior",
name: "price_lookup",
arguments: { symbol: "SOL" },
},
],
},
{
role: "toolResult",
toolCallId: "call_abc|fc_prior",
toolName: "price_lookup",
content: [{ type: "text", text: "$83.95" }],
isError: false,
timestamp: 2,
},
],
tools: [],
} as never,
{ replayResponsesItemIds: true, sessionId: "session-123" },
) as {
input?: Array<{
type?: string;
role?: string;
id?: string;
call_id?: string;
phase?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
id: "rs_prior",
summary: [],
});
const assistantMessage = params.input?.find(
(item) => item.type === "message" && item.role === "assistant",
);
expectRecordFields(assistantMessage, {
type: "message",
role: "assistant",
id: "msg_prior",
phase: "commentary",
});
const functionCall = params.input?.find((item) => item.type === "function_call");
expectRecordFields(functionCall, {
type: "function_call",
id: "fc_prior",
call_id: "call_abc",
});
});
it("omits prior Responses replay item ids when store is disabled for custom Codex-compatible responses", () => {
const model = {
id: "gpt-5.4",
name: "GPT-5.4",
@@ -2856,15 +3073,17 @@ describe("openai transport stream", () => {
call_id?: string;
phase?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("__openclaw_replay");
const assistantMessage = params.input?.find(
(item) => item.type === "message" && item.role === "assistant",
@@ -2872,18 +3091,18 @@ describe("openai transport stream", () => {
expectRecordFields(assistantMessage, {
type: "message",
role: "assistant",
id: "msg_prior",
phase: "commentary",
});
expect(assistantMessage?.id).toBeUndefined();
const functionCall = params.input?.find((item) => item.type === "function_call");
expectRecordFields(functionCall, {
type: "function_call",
id: "fc_prior",
call_id: "call_abc",
});
expect(functionCall?.id).toBeUndefined();
});
it("drops oversized GitHub Copilot Responses reasoning replay items before send", () => {
it("keeps GitHub Copilot Responses reasoning replay when store-disabled ids are omitted", () => {
const model = {
id: "gpt-5.5",
name: "GPT-5.5",
@@ -2934,6 +3153,73 @@ describe("openai transport stream", () => {
tools: [],
} as never,
{ sessionId: "session-123" },
) as {
input?: Array<{
type?: string;
id?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
});
it("drops oversized GitHub Copilot Responses reasoning replay ids before send", () => {
const model = {
id: "gpt-5.5",
name: "GPT-5.5",
api: "openai-responses",
provider: "github-copilot",
baseUrl: "https://api.githubcopilot.com",
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 400000,
maxTokens: 8192,
} satisfies Model<"openai-responses">;
const longReasoningId = `rs_${"x".repeat(380)}`;
const params = buildOpenAIResponsesParams(
model,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: "openai-responses",
provider: "github-copilot",
model: "gpt-5.5",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: longReasoningId,
summary: [],
}),
},
],
},
],
tools: [],
} as never,
{ replayResponsesItemIds: true, sessionId: "session-123" },
) as {
input?: Array<{
type?: string;
@@ -3006,14 +3292,16 @@ describe("openai transport stream", () => {
type?: string;
id?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
id: "rs_prior",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("encrypted_content");
});
@@ -3079,14 +3367,16 @@ describe("openai transport stream", () => {
type?: string;
id?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
id: "rs_prior",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("encrypted_content");
});
@@ -3154,15 +3444,17 @@ describe("openai transport stream", () => {
type?: string;
id?: string;
encrypted_content?: string;
summary?: unknown;
}>;
};
const reasoningItem = params.input?.find((item) => item.type === "reasoning");
expectRecordFields(reasoningItem, {
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
summary: [],
});
expect(reasoningItem?.id).toBeUndefined();
expect(reasoningItem).not.toHaveProperty("__openclaw_replay");
});
@@ -3269,8 +3561,7 @@ describe("openai transport stream", () => {
const functionOutput = params.input?.find((item) => item.type === "function_call_output");
expect(functionCall).toBeDefined();
expect(functionOutput).toBeDefined();
expect(functionCall?.id).toMatch(/^fc_/);
expect(functionCall?.id?.length).toBeLessThanOrEqual(64);
expect(functionCall?.id).toBeUndefined();
expect(functionCall?.call_id).toBe("call_ug6lFGKwZDjHfzW8H0PDQRwN");
expect(functionOutput?.call_id).toBe(functionCall?.call_id);
for (const item of params.input ?? []) {
@@ -3283,7 +3574,7 @@ describe("openai transport stream", () => {
}
});
it("keeps distinct overlong Copilot Responses replay item ids distinct", () => {
it("omits distinct overlong Copilot Responses replay item ids when store is disabled", () => {
const sharedToolItemPrefix = "iVec" + "A".repeat(160);
const firstToolCallId = `call_first|${sharedToolItemPrefix}Aa`;
const secondToolCallId = `call_second|${sharedToolItemPrefix}BB`;
@@ -3353,14 +3644,7 @@ describe("openai transport stream", () => {
params.input?.filter((item) => item.type === "function_call_output") ?? [];
expect(functionCalls).toHaveLength(2);
expect(functionOutputs).toHaveLength(2);
expect(functionCalls.map((item) => item.id)).toEqual([
expect.stringMatching(/^fc_/),
expect.stringMatching(/^fc_/),
]);
expect(new Set(functionCalls.map((item) => item.id)).size).toBe(2);
for (const item of functionCalls) {
expect(item.id?.length).toBeLessThanOrEqual(64);
}
expect(functionCalls.map((item) => item.id)).toEqual([undefined, undefined]);
expect(functionOutputs.map((item) => item.call_id)).toEqual(["call_first", "call_second"]);
});
@@ -3678,69 +3962,66 @@ describe("openai transport stream", () => {
baseUrl: "https://proxy.example.com/v1",
},
},
])(
"omits orphan phase-tagged ids for $label responses payloads",
({ label: _label, model }) => {
const params = buildOpenAIResponsesParams(
{
...model,
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 200000,
maxTokens: 8192,
} as Model<"openai-responses">,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
])("omits orphan phase-tagged ids for $label responses payloads", ({ label: _label, model }) => {
const params = buildOpenAIResponsesParams(
{
...model,
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 200000,
maxTokens: 8192,
} as Model<"openai-responses">,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: 1,
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({
v: 1,
id: "msg_commentary",
phase: "commentary",
}),
},
stopReason: "stop",
timestamp: 1,
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({
v: 1,
id: "msg_commentary",
phase: "commentary",
}),
},
],
},
{
role: "user",
content: "Continue",
timestamp: 2,
},
],
tools: [],
} as never,
undefined,
) as {
input?: Array<{ role?: string; id?: string; phase?: string }>;
};
],
},
{
role: "user",
content: "Continue",
timestamp: 2,
},
],
tools: [],
} as never,
undefined,
) as {
input?: Array<{ role?: string; id?: string; phase?: string }>;
};
const assistantItem = params.input?.find((item) => item.role === "assistant");
expectRecordFields(assistantItem, {
role: "assistant",
phase: "commentary",
});
expect(assistantItem?.id).toBeUndefined();
},
);
const assistantItem = params.input?.find((item) => item.role === "assistant");
expectRecordFields(assistantItem, {
role: "assistant",
phase: "commentary",
});
expect(assistantItem?.id).toBeUndefined();
});
it("strips the internal cache boundary from OpenAI system prompts", () => {
const params = buildOpenAIResponsesParams(

View File

@@ -173,6 +173,7 @@ type OpenAIResponsesOptions = BaseStreamOptions & {
reasoning?: OpenAIReasoningEffort;
reasoningEffort?: OpenAIReasoningEffort;
reasoningSummary?: "auto" | "detailed" | "concise" | null;
replayResponsesItemIds?: boolean;
serviceTier?: ResponseCreateParamsStreaming["service_tier"];
toolChoice?: ResponseCreateParamsStreaming["tool_choice"];
};
@@ -908,6 +909,16 @@ function readOpenAIResponsesReasoningReplayBlockMetadata(
return isOpenAIResponsesReasoningReplayMetadata(value) ? value : undefined;
}
function normalizeOpenAIResponsesReasoningReplayItem(
item: ReplayableResponseReasoningItem,
): ReplayableResponseReasoningItem {
const record = item as ReplayableResponseReasoningItem & Record<string, unknown>;
if (record.type !== "reasoning" || Array.isArray(record.summary)) {
return item;
}
return { ...record, summary: [] } as ReplayableResponseReasoningItem;
}
function prepareOpenAIResponsesReasoningItemForReplay(
item: ReplayableResponseReasoningItem,
context: OpenAIResponsesReplayContext,
@@ -916,16 +927,18 @@ function prepareOpenAIResponsesReasoningItemForReplay(
const { [OPENAI_RESPONSES_REASONING_REPLAY_META_KEY]: rawMetadata, ...rest } =
item as ReplayableResponseReasoningItem & Record<string, unknown>;
if (!("encrypted_content" in rest)) {
return rest as ReplayableResponseReasoningItem;
return normalizeOpenAIResponsesReasoningReplayItem(rest as ReplayableResponseReasoningItem);
}
const metadata =
blockMetadata ??
(isOpenAIResponsesReasoningReplayMetadata(rawMetadata) ? rawMetadata : undefined);
if (encryptedReasoningReplayMetadataMatches(metadata, context)) {
return rest as ReplayableResponseReasoningItem;
return normalizeOpenAIResponsesReasoningReplayItem(rest as ReplayableResponseReasoningItem);
}
const stripped = stripEncryptedContentFields(rest);
return stripped.value as ReplayableResponseReasoningItem;
return normalizeOpenAIResponsesReasoningReplayItem(
stripped.value as ReplayableResponseReasoningItem,
);
}
async function createResponsesStreamWithEncryptedContentRetry(params: {
@@ -1145,6 +1158,7 @@ function convertResponsesMessages(
delete replayableReasoningItem.id;
}
if (
shouldReplayResponsesItemIds &&
model.provider === "github-copilot" &&
!isSafeResponsesReplayItemId(replayableReasoningItem.id)
) {
@@ -1190,7 +1204,7 @@ function convertResponsesMessages(
: undefined;
output.push({
type: "function_call",
id: itemId,
...(itemId ? { id: itemId } : {}),
call_id: callId,
name: block.name,
arguments:
@@ -2077,6 +2091,12 @@ export function buildOpenAIResponsesParams(
const compat = getCompat(model as OpenAIModeModel);
const supportsDeveloperRole =
typeof compat.supportsDeveloperRole === "boolean" ? compat.supportsDeveloperRole : undefined;
const payloadPolicy = resolveOpenAIResponsesPayloadPolicy(model, {
storeMode: "disable",
});
const policyAllowsReplayIds = payloadPolicy.explicitStore !== false;
const replayResponsesItemIds =
!isNativeCodexResponses && (options?.replayResponsesItemIds ?? policyAllowsReplayIds);
const messages = convertResponsesMessages(
model,
context,
@@ -2085,7 +2105,7 @@ export function buildOpenAIResponsesParams(
includeSystemPrompt: !isCodexResponses,
supportsDeveloperRole,
replayReasoningItems: true,
replayResponsesItemIds: !isNativeCodexResponses,
replayResponsesItemIds,
authProfileId: options?.authProfileId,
sessionId: options?.sessionId,
},
@@ -2095,9 +2115,6 @@ export function buildOpenAIResponsesParams(
}
const cacheRetention = resolveCacheRetention(options?.cacheRetention);
const promptCacheKey = resolvePromptCacheKey(options, cacheRetention);
const payloadPolicy = resolveOpenAIResponsesPayloadPolicy(model, {
storeMode: "disable",
});
const params: OpenAIResponsesRequestParams = {
model: model.id,
input: messages,

View File

@@ -186,8 +186,8 @@ function affectedRows(result: { numAffectedRows?: bigint }): number {
return Number(result.numAffectedRows ?? 0n);
}
function parseJson<T>(value: string): T {
return JSON.parse(value) as T;
function parseJson(value: string): unknown {
return JSON.parse(value) as unknown;
}
function baseRecord<TPayload, TMetadata>(
@@ -198,13 +198,13 @@ function baseRecord<TPayload, TMetadata>(
channelId: row.channel_id,
accountId: row.account_id,
queueName: row.queue_name,
payload: parseJson<TPayload>(row.payload_json),
...(row.metadata_json === null ? {} : { metadata: parseJson<TMetadata>(row.metadata_json) }),
receivedAt: Number(row.received_at),
updatedAt: Number(row.updated_at),
payload: parseJson(row.payload_json) as TPayload,
...(row.metadata_json === null ? {} : { metadata: parseJson(row.metadata_json) as TMetadata }),
receivedAt: row.received_at,
updatedAt: row.updated_at,
...(row.lane_key === null ? {} : { laneKey: row.lane_key }),
attempts: Number(row.attempts),
...(row.last_attempt_at === null ? {} : { lastAttemptAt: Number(row.last_attempt_at) }),
attempts: row.attempts,
...(row.last_attempt_at === null ? {} : { lastAttemptAt: row.last_attempt_at }),
...(row.last_error === null ? {} : { lastError: row.last_error }),
};
}
@@ -217,7 +217,7 @@ function claimedRecord<TPayload, TMetadata>(
claim: {
token: row.claim_token ?? "",
ownerId: row.claim_owner ?? "",
claimedAt: Number(row.claimed_at ?? 0),
claimedAt: row.claimed_at ?? 0,
},
};
}
@@ -230,10 +230,10 @@ function completedRecord<TCompletedMetadata>(
channelId: row.channel_id,
accountId: row.account_id,
queueName: row.queue_name,
completedAt: Number(row.completed_at ?? row.updated_at),
completedAt: row.completed_at ?? row.updated_at,
...(row.completed_metadata_json === null
? {}
: { metadata: parseJson<TCompletedMetadata>(row.completed_metadata_json) }),
: { metadata: parseJson(row.completed_metadata_json) as TCompletedMetadata }),
};
}
@@ -243,7 +243,7 @@ function failedRecord(row: ChannelIngressRow): ChannelIngressQueueFailedRecord {
channelId: row.channel_id,
accountId: row.account_id,
queueName: row.queue_name,
failedAt: Number(row.failed_at ?? row.updated_at),
failedAt: row.failed_at ?? row.updated_at,
reason: row.failed_reason ?? "failed",
...(row.last_error === null ? {} : { message: row.last_error }),
};

View File

@@ -150,6 +150,101 @@ describe("streamOpenAICodexResponses transport", () => {
expect(result.errorMessage).toContain("Request timed out after 5ms");
});
it("does not replay Responses item ids for store-disabled ChatGPT requests", async () => {
let capturedPayload:
| {
store?: unknown;
input?: Array<Record<string, unknown>>;
}
| undefined;
const stream = streamOpenAICodexResponses(
model,
{
messages: [
{
role: "assistant",
api: "openai-chatgpt-responses",
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
}),
},
{
type: "text",
text: "Checking.",
textSignature: JSON.stringify({
v: 1,
id: "msg_prior",
phase: "commentary",
}),
},
{
type: "toolCall",
id: "call_abc|fc_prior",
name: "lookup",
arguments: {},
},
],
},
],
},
{
apiKey: createJwt({
"https://api.openai.com/auth": {
chatgpt_account_id: "acct-1",
},
}),
transport: "sse",
onPayload: (payload) => {
capturedPayload = payload as typeof capturedPayload;
throw new Error("stop after payload");
},
},
);
const result = await stream.result();
expect(result.stopReason).toBe("error");
expect(result.errorMessage).toBe("stop after payload");
expect(capturedPayload?.store).toBe(false);
const reasoningItem = capturedPayload?.input?.find((item) => item.type === "reasoning");
expect(reasoningItem).toMatchObject({
type: "reasoning",
encrypted_content: "ciphertext",
summary: [],
});
expect(reasoningItem).not.toHaveProperty("id");
const messageItem = capturedPayload?.input?.find((item) => item.type === "message");
expect(messageItem).toMatchObject({
type: "message",
phase: "commentary",
});
expect(messageItem).not.toHaveProperty("id");
const functionCall = capturedPayload?.input?.find((item) => item.type === "function_call");
expect(functionCall).toMatchObject({
type: "function_call",
call_id: "call_abc",
});
expect(functionCall).not.toHaveProperty("id");
});
it("caps oversized timeoutMs before creating request abort signals", async () => {
stubHangingFetch(MAX_TIMER_TIMEOUT_MS);

View File

@@ -480,6 +480,7 @@ function buildRequestBody(
): RequestBody {
const messages = convertResponsesMessages(model, context, CODEX_TOOL_CALL_PROVIDERS, {
includeSystemPrompt: false,
replayResponsesItemIds: false,
});
const body: RequestBody = {
@@ -1492,6 +1493,7 @@ async function processWebSocketStream(
CODEX_TOOL_CALL_PROVIDERS,
{
includeSystemPrompt: false,
replayResponsesItemIds: false,
},
).filter((item) => item.type !== "function_call_output");
entry.continuation = {

View File

@@ -54,4 +54,93 @@ describe("OpenAI-compatible provider credentials", () => {
expect(result.stopReason).toBe("error");
expect(result.errorMessage).toBe("No API key for provider: custom-openai-compatible");
});
it("does not replay Responses item ids for direct store-disabled requests", async () => {
let capturedPayload: { store?: unknown; input?: Array<Record<string, unknown>> } | undefined;
const model = {
...createBaseModel("openai-responses"),
reasoning: true,
} satisfies Model<"openai-responses">;
const stream = streamOpenAIResponses(
model,
{
messages: [
{
role: "assistant",
api: "openai-responses",
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
}),
},
{
type: "text",
text: "Checking.",
textSignature: JSON.stringify({
v: 1,
id: "msg_prior",
phase: "commentary",
}),
},
{
type: "toolCall",
id: "call_abc|fc_prior",
name: "lookup",
arguments: {},
},
],
},
],
},
{
apiKey: "sk-test",
onPayload: (payload) => {
capturedPayload = payload as typeof capturedPayload;
throw new Error("stop after payload");
},
},
);
const result = await stream.result();
expect(result.stopReason).toBe("error");
expect(result.errorMessage).toBe("stop after payload");
expect(capturedPayload?.store).toBe(false);
const reasoningItem = capturedPayload?.input?.find((item) => item.type === "reasoning");
expect(reasoningItem).toMatchObject({
type: "reasoning",
encrypted_content: "ciphertext",
summary: [],
});
expect(reasoningItem).not.toHaveProperty("id");
const messageItem = capturedPayload?.input?.find((item) => item.type === "message");
expect(messageItem).toMatchObject({
type: "message",
phase: "commentary",
});
expect(messageItem).not.toHaveProperty("id");
const functionCall = capturedPayload?.input?.find((item) => item.type === "function_call");
expect(functionCall).toMatchObject({
type: "function_call",
call_id: "call_abc",
});
expect(functionCall).not.toHaveProperty("id");
});
});

View File

@@ -238,4 +238,92 @@ describe("convertResponsesMessages", () => {
),
).not.toHaveProperty("id");
});
it("omits Responses replay item ids when requested by store-disabled callers", () => {
const input = convertResponsesMessages(
nativeOpenAIModel,
{
systemPrompt: "system",
messages: [
{
role: "assistant",
api: nativeOpenAIModel.api,
provider: nativeOpenAIModel.provider,
model: nativeOpenAIModel.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "toolUse",
timestamp: 1,
content: [
{
type: "thinking",
thinking: "Need a tool.",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: "rs_prior",
encrypted_content: "ciphertext",
}),
},
{
type: "text",
text: "Checking the price.",
textSignature: JSON.stringify({
v: 1,
id: "msg_prior",
phase: "commentary",
}),
},
{
type: "toolCall",
id: "call_abc|fc_prior",
name: "price_lookup",
arguments: { symbol: "SOL" },
},
],
},
{
role: "toolResult",
toolCallId: "call_abc|fc_prior",
toolName: "price_lookup",
content: [{ type: "text", text: "$83.95" }],
isError: false,
timestamp: 2,
},
],
} satisfies Context,
allowedToolCallProviders,
{ includeSystemPrompt: false, replayResponsesItemIds: false },
) as unknown as Array<Record<string, unknown>>;
const reasoningItem = input.find((item) => item.type === "reasoning");
expect(reasoningItem).toMatchObject({
type: "reasoning",
encrypted_content: "ciphertext",
summary: [],
});
expect(reasoningItem).not.toHaveProperty("id");
const assistantMessage = input.find(
(item) => item.type === "message" && item.role === "assistant",
);
expect(assistantMessage).toMatchObject({
type: "message",
role: "assistant",
phase: "commentary",
});
expect(assistantMessage).not.toHaveProperty("id");
const functionCall = input.find((item) => item.type === "function_call");
expect(functionCall).toMatchObject({
type: "function_call",
call_id: "call_abc",
});
expect(functionCall).not.toHaveProperty("id");
});
});

View File

@@ -41,6 +41,21 @@ import { transformMessages } from "./transform-messages.js";
// =============================================================================
type ReplayableResponseOutputMessage = Omit<ResponseOutputMessage, "id"> & { id?: string };
type ReplayableResponseReasoningItem = Omit<ResponseReasoningItem, "id"> & { id?: string };
function normalizeResponsesReasoningReplayItem(params: {
item: ReplayableResponseReasoningItem;
replayResponsesItemIds: boolean;
}): ReplayableResponseReasoningItem {
const next = { ...(params.item as ReplayableResponseReasoningItem & Record<string, unknown>) };
if (!Array.isArray(next.summary)) {
next.summary = [];
}
if (!params.replayResponsesItemIds) {
delete next.id;
}
return next as ReplayableResponseReasoningItem;
}
function encodeTextSignatureV1(id: string, phase?: TextSignatureV1["phase"]): string {
const payload: TextSignatureV1 = { v: 1, id };
@@ -106,6 +121,7 @@ export interface OpenAIResponsesStreamOptions {
export interface ConvertResponsesMessagesOptions {
includeSystemPrompt?: boolean;
replayResponsesItemIds?: boolean;
}
export { convertResponsesTools };
export type { ConvertResponsesToolsOptions } from "./openai-responses-tools.js";
@@ -156,6 +172,7 @@ export function convertResponsesMessages<TApi extends Api>(
options?: ConvertResponsesMessagesOptions,
): ResponseInput {
const messages: ResponseInput = [];
const shouldReplayResponsesItemIds = options?.replayResponsesItemIds ?? true;
const normalizeIdPart = (part: string): string => {
const sanitized = part.replace(/[^a-zA-Z0-9_-]/g, "_");
@@ -247,19 +264,24 @@ export function convertResponsesMessages<TApi extends Api>(
for (const block of msg.content) {
if (block.type === "thinking") {
if (block.thinkingSignature) {
const reasoningItem = JSON.parse(block.thinkingSignature) as ResponseReasoningItem;
output.push(reasoningItem);
const reasoningItem = normalizeResponsesReasoningReplayItem({
item: JSON.parse(block.thinkingSignature) as ReplayableResponseReasoningItem,
replayResponsesItemIds: shouldReplayResponsesItemIds,
});
output.push(reasoningItem as ResponseInputItem);
previousReplayItemWasReasoning = true;
}
} else if (block.type === "text") {
const textBlock = block;
const parsedSignature = parseTextSignature(textBlock.textSignature);
let msgId = resolveReplayableResponsesMessageId({
textSignatureId: parsedSignature?.id,
fallbackId: `msg_${msgIndex}`,
fallbackOrdinal: textFallbackOrdinal,
previousReplayItemWasReasoning,
});
let msgId = shouldReplayResponsesItemIds
? resolveReplayableResponsesMessageId({
textSignatureId: parsedSignature?.id,
fallbackId: `msg_${msgIndex}`,
fallbackOrdinal: textFallbackOrdinal,
previousReplayItemWasReasoning,
})
: undefined;
if (!parsedSignature?.id) {
textFallbackOrdinal += 1;
}
@@ -281,18 +303,18 @@ export function convertResponsesMessages<TApi extends Api>(
} else if (block.type === "toolCall") {
const toolCall = block;
const [callId, itemIdRaw] = toolCall.id.split("|");
let itemId: string | undefined = itemIdRaw;
let itemId: string | undefined = shouldReplayResponsesItemIds ? itemIdRaw : undefined;
// For different-model messages, set id to undefined to avoid pairing validation.
// OpenAI tracks which fc_xxx IDs were paired with rs_xxx reasoning items.
// By omitting the id, we avoid triggering that validation (like cross-provider does).
if (isDifferentModel && itemId?.startsWith("fc_")) {
if (shouldReplayResponsesItemIds && isDifferentModel && itemId?.startsWith("fc_")) {
itemId = undefined;
}
output.push({
type: "function_call",
id: itemId,
...(itemId ? { id: itemId } : {}),
call_id: callId,
name: toolCall.name,
arguments: JSON.stringify(toolCall.arguments),

View File

@@ -74,9 +74,14 @@ function formatOpenAIResponsesError(error: unknown): string {
export interface OpenAIResponsesOptions extends StreamOptions {
reasoningEffort?: "minimal" | "low" | "medium" | "high" | "xhigh";
reasoningSummary?: "auto" | "detailed" | "concise" | null;
replayResponsesItemIds?: boolean;
serviceTier?: ResponseCreateParamsStreaming["service_tier"];
}
type OpenAIResponsesReplayOptions = SimpleStreamOptions & {
replayResponsesItemIds?: boolean;
};
/**
* Generate function for OpenAI Responses API
*/
@@ -126,6 +131,8 @@ export const streamSimpleOpenAIResponses: StreamFunction<
return streamOpenAIResponses(model, context, {
...base,
reasoningEffort: resolveResponsesReasoningEffort(model, options?.reasoning),
replayResponsesItemIds: (options as OpenAIResponsesReplayOptions | undefined)
?.replayResponsesItemIds,
} satisfies OpenAIResponsesOptions);
};
@@ -185,7 +192,9 @@ function buildParams(
context: Context,
options?: OpenAIResponsesOptions,
) {
const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS);
const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS, {
replayResponsesItemIds: options?.replayResponsesItemIds ?? false,
});
const cacheRetention = resolveCacheRetention(options?.cacheRetention);
const compat = getCompat(model);

View File

@@ -37,6 +37,9 @@ type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
type OpenClawSimpleStreamOptions = SimpleStreamOptions & {
openclawCodeModeToolSurface?: boolean;
};
type OpenAIResponsesReplayOptions = Parameters<StreamFn>[2] & {
replayResponsesItemIds?: boolean;
};
export { resolveOpenAITextVerbosity };
function resolveOpenAITextVerbosityForModel(
@@ -393,15 +396,21 @@ export function createOpenAIResponsesContextManagementWrapper(
}
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
const replayResponsesItemIds =
policy.explicitStore === undefined
? (options as OpenAIResponsesReplayOptions | undefined)?.replayResponsesItemIds
: policy.explicitStore;
const nextOptions: OpenAIResponsesReplayOptions = {
...options,
...(replayResponsesItemIds === undefined ? {} : { replayResponsesItemIds }),
onPayload: (payload) => {
if (payload && typeof payload === "object") {
applyOpenAIResponsesPayloadPolicy(payload as Record<string, unknown>, policy);
}
return originalOnPayload?.(payload, model);
},
});
};
return underlying(model, context, nextOptions);
};
}

View File

@@ -3,3 +3,10 @@ export {
createPluginStateSyncKeyedStore as createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "../plugin-state/plugin-state-store.js";
export { createChannelIngressQueue as createChannelIngressQueueForTests } from "../channels/message/ingress-queue.js";
export { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
export type { DB as OpenClawStateKyselyDatabaseForTests } from "../state/openclaw-state-db.generated.js";
export {
closeOpenClawStateDatabaseForTest,
openOpenClawStateDatabase,
} from "../state/openclaw-state-db.js";

View File

@@ -2636,7 +2636,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
assertPluginStateAllowed();
const stateDir = options?.stateDir ?? baseState.resolveStateDir();
return createChannelIngressQueue<TPayload, TMetadata, TCompletedMetadata>({
...(options ?? {}),
...options,
channelId: pluginId,
stateDir,
});

View File

@@ -1,4 +1,10 @@
import { getTaskById, listTasksForAgentId, listTasksForSessionKey } from "./task-registry.js";
import {
findTaskByRunId,
getTaskById,
listTaskRecords,
listTasksForAgentId,
listTasksForSessionKey,
} from "./task-registry.js";
import type { TaskRecord } from "./task-registry.types.js";
export function getTaskSessionLookupByIdForStatus(
@@ -18,6 +24,16 @@ export function listTasksForSessionKeyForStatus(sessionKey: string): TaskRecord[
return listTasksForSessionKey(sessionKey);
}
export function listTasksForOwnerOrRequesterSessionKeyForStatus(sessionKey: string): TaskRecord[] {
return listTaskRecords().filter(
(task) => task.requesterSessionKey === sessionKey || task.ownerKey === sessionKey,
);
}
export function listTasksForAgentIdForStatus(agentId: string): TaskRecord[] {
return listTasksForAgentId(agentId);
}
export function findTaskByRunIdForStatus(runId: string): TaskRecord | undefined {
return findTaskByRunId(runId);
}