From 03dec8bb3a00209780d3140ec3f10ad0eb069030 Mon Sep 17 00:00:00 2001 From: waterblue <34592262+zhanghang02@users.noreply.github.com> Date: Mon, 1 Jun 2026 05:17:32 +0800 Subject: [PATCH] fix(openai): avoid replay ids when Responses store is disabled Avoid replaying prior OpenAI Responses reasoning/message/function-call item ids when the outgoing request disables store, while preserving encrypted reasoning and normalized summary arrays for stateless replay. Keep explicit store-enabled OpenAI wrapper paths opted into item-id replay, and cover shared/simple Responses, ChatGPT/Codex Responses, and GitHub Copilot sanitizer behavior. Regression tests cover store-disabled id omission, encrypted reasoning preservation, idless Copilot reasoning replay, and direct builder payloads. Local proof included focused Vitest, broad lint, broad test-types, bundled-extension lint, plugin boundary checks, autoreview clean, and live OpenAI Responses gpt-5.5 proof. Co-authored-by: hang --- extensions/feishu/runtime-api.ts | 6 +- .../feishu/src/monitor-state-runtime-api.ts | 2 +- extensions/feishu/src/reply-dispatcher.ts | 4 +- .../connection-bound-ids.test.ts | 3 +- .../github-copilot/connection-bound-ids.ts | 8 +- extensions/github-copilot/stream.test.ts | 10 +- .../telegram/src/polling-session.test.ts | 18 +- .../src/telegram-ingress-spool.test.ts | 8 +- .../telegram/src/telegram-ingress-spool.ts | 10 +- .../voice-call/doctor-contract-api.test.ts | 3 + .../voice-call/src/manager.restore.test.ts | 3 + .../voice-call/src/manager.test-harness.ts | 3 + .../voice-call/src/manager/events.test.ts | 3 + .../voice-call/src/manager/store.test.ts | 6 + .../src/webhook.hangup-once.lifecycle.test.ts | 5 + extensions/whatsapp/src/inbound.media.test.ts | 10 +- .../src/monitor-inbox.test-harness.ts | 10 +- .../embedded-agent-runner-extraparams.test.ts | 26 ++ .../run/attempt.async-tasks.ts | 20 +- .../openai-responses.reasoning-replay.test.ts | 6 +- src/agents/openai-transport-stream.test.ts | 441 ++++++++++++++---- src/agents/openai-transport-stream.ts | 33 +- src/channels/message/ingress-queue.ts | 24 +- .../openai-chatgpt-responses.test.ts | 95 ++++ src/llm/providers/openai-chatgpt-responses.ts | 2 + .../providers/openai-compatible-auth.test.ts | 89 ++++ .../providers/openai-responses-shared.test.ts | 88 ++++ src/llm/providers/openai-responses-shared.ts | 44 +- src/llm/providers/openai-responses.ts | 11 +- src/llm/providers/stream-wrappers/openai.ts | 13 +- src/plugin-sdk/plugin-state-test-runtime.ts | 7 + src/plugins/registry.ts | 2 +- src/tasks/task-status-access.ts | 18 +- 33 files changed, 867 insertions(+), 164 deletions(-) diff --git a/extensions/feishu/runtime-api.ts b/extensions/feishu/runtime-api.ts index 1afa04bab800..397f53eac212 100644 --- a/extensions/feishu/runtime-api.ts +++ b/extensions/feishu/runtime-api.ts @@ -18,7 +18,11 @@ export type { ReplyPayload, } from "openclaw/plugin-sdk/core"; export type { OpenClawConfig as ClawdbotConfig } from "openclaw/plugin-sdk/core"; -export type { RuntimeEnv } from "openclaw/plugin-sdk/runtime"; +export type RuntimeEnv = { + log: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; + exit: (code: number) => void; +}; export type { GroupToolPolicyConfig } from "openclaw/plugin-sdk/config-contracts"; export { DEFAULT_ACCOUNT_ID, diff --git a/extensions/feishu/src/monitor-state-runtime-api.ts b/extensions/feishu/src/monitor-state-runtime-api.ts index f427758f1759..90d7eacec18d 100644 --- a/extensions/feishu/src/monitor-state-runtime-api.ts +++ b/extensions/feishu/src/monitor-state-runtime-api.ts @@ -1,4 +1,4 @@ -export type { RuntimeEnv } from "openclaw/plugin-sdk/runtime"; +export type { RuntimeEnv } from "../runtime-api.js"; export { createFixedWindowRateLimiter, createWebhookAnomalyTracker, diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 7604720cc204..1bd1f56548ad 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -575,7 +575,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const queueIdleSideEffects = (options?: { markClosedForReply?: boolean }): Promise => { const nextIdleSideEffects = idleSideEffectsPromise.then(async () => { await closeStreaming(options); - await typingCallbacks?.onIdle?.(); + await Promise.resolve(typingCallbacks?.onIdle?.()); }); idleSideEffectsPromise = nextIdleSideEffects.catch(() => {}); return nextIdleSideEffects; @@ -609,7 +609,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (streamingEnabled && renderMode === "card") { startStreaming(); } - await typingCallbacks?.onReplyStart?.(); + await Promise.resolve(typingCallbacks?.onReplyStart?.()); }, deliver: async (payload: ReplyPayload, info) => { if (info?.kind === "final") { diff --git a/extensions/github-copilot/connection-bound-ids.test.ts b/extensions/github-copilot/connection-bound-ids.test.ts index 7827c189bfe1..d34cd9e3ca4a 100644 --- a/extensions/github-copilot/connection-bound-ids.test.ts +++ b/extensions/github-copilot/connection-bound-ids.test.ts @@ -64,7 +64,7 @@ describe("github-copilot connection-bound response IDs", () => { expect(input.map((item) => item.id)).toEqual([withEncrypted, withoutEncrypted]); }); - it("drops unsafe reasoning replay items instead of stripping their IDs", () => { + it("drops unsafe reasoning replay item IDs while keeping idless reasoning replay", () => { const overlongId = `5PX6gLHXT5wE+Y2tPmUV4gn+${"B".repeat(384)}`; const input = [ { @@ -80,6 +80,7 @@ describe("github-copilot connection-bound response IDs", () => { expect(sanitizeCopilotReplayResponseIds(input)).toBe(true); expect(input).toEqual([ + { type: "reasoning", encrypted_content: "missing-id", summary: [] }, { id: "rs_valid", type: "reasoning", encrypted_content: "valid", summary: [] }, ]); }); diff --git a/extensions/github-copilot/connection-bound-ids.ts b/extensions/github-copilot/connection-bound-ids.ts index 7f18df5a42c1..d40f040b785f 100644 --- a/extensions/github-copilot/connection-bound-ids.ts +++ b/extensions/github-copilot/connection-bound-ids.ts @@ -44,11 +44,11 @@ export function sanitizeCopilotReplayResponseIds(input: unknown): boolean { continue; } const id = item.id; - // Reasoning items always reference server-side encrypted state bound to the - // original item ID. Rewriting or stripping that ID can turn replay into an - // invalid or ambiguous server-state lookup, so drop unsafe reasoning items. + // Reasoning items with replay IDs reference server-side encrypted state + // bound to that ID. Drop unsafe IDs, but keep the store-disabled idless + // replay form produced by core Responses conversion. if (item.type === "reasoning") { - if (!isValidReasoningReplayId(id)) { + if (id !== undefined && !isValidReasoningReplayId(id)) { input.splice(index, 1); rewrote = true; } diff --git a/extensions/github-copilot/stream.test.ts b/extensions/github-copilot/stream.test.ts index 63c862d8a947..d4955f29ce9a 100644 --- a/extensions/github-copilot/stream.test.ts +++ b/extensions/github-copilot/stream.test.ts @@ -127,6 +127,7 @@ describe("wrapCopilotAnthropicStream", () => { const payload = { input: [ { id: reasoningId, type: "reasoning", encrypted_content: "valid-encrypted-payload" }, + { type: "reasoning", encrypted_content: "idless-encrypted-payload", summary: [] }, { id: overlongReasoningId, type: "reasoning", @@ -181,8 +182,13 @@ describe("wrapCopilotAnthropicStream", () => { onPayload: options.onPayload, }); expect(payloads[0]?.input[0]?.id).toBe(reasoningId); - expect(payloads[0]?.input.map((item) => item.type)).toEqual(["reasoning", "message"]); - expect(payloads[0]?.input[1]?.id).toMatch(/^msg_[a-f0-9]{16}$/); + expect(payloads[0]?.input.map((item) => item.type)).toEqual([ + "reasoning", + "reasoning", + "message", + ]); + expect(payloads[0]?.input[1]?.id).toBeUndefined(); + expect(payloads[0]?.input[2]?.id).toMatch(/^msg_[a-f0-9]{16}$/); }); it("rewrites Copilot Responses IDs returned by an existing payload hook", async () => { diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 9b0ed3ed38e9..ac8b0f8c22f8 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -3,14 +3,15 @@ import os from "node:os"; import path from "node:path"; import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; import { MAX_TIMER_TIMEOUT_MS } from "openclaw/plugin-sdk/number-runtime"; -import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { createChannelIngressQueue } from "../../../src/channels/message/ingress-queue.js"; -import { executeSqliteQuerySync, getNodeSqliteKysely } from "../../../src/infra/kysely-sync.js"; -import type { DB as OpenClawStateKyselyDatabase } from "../../../src/state/openclaw-state-db.generated.js"; import { closeOpenClawStateDatabaseForTest, + createChannelIngressQueueForTests as createChannelIngressQueue, + executeSqliteQuerySync, + getNodeSqliteKysely, openOpenClawStateDatabase, -} from "../../../src/state/openclaw-state-db.js"; + type OpenClawStateKyselyDatabaseForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js"; import type { TelegramRuntime } from "./runtime.types.js"; import type { TelegramIngressWorkerMessage } from "./telegram-ingress-worker.js"; @@ -105,7 +106,10 @@ type WorkerPollErrorListener = (message: { type WorkerMessageListener = (message: TelegramIngressWorkerMessage) => void; type AsyncVoidFn = () => Promise; type MockCallSource = { mock: { calls: Array> } }; -type TelegramPollingTestDatabase = Pick; +type TelegramPollingTestDatabase = Pick< + OpenClawStateKyselyDatabaseForTests, + "channel_ingress_events" +>; const POLLING_TEST_WATCHDOG_INTERVAL_MS = 30_000; @@ -115,7 +119,7 @@ function installTelegramIngressQueueRuntime(resolveStateDir: () => string): void resolveStateDir, openChannelIngressQueue: ( options?: Omit[0], "channelId">, - ) => createChannelIngressQueue({ ...(options ?? {}), channelId: "telegram" }), + ) => createChannelIngressQueue({ ...options, channelId: "telegram" }), }, } as TelegramRuntime); } diff --git a/extensions/telegram/src/telegram-ingress-spool.test.ts b/extensions/telegram/src/telegram-ingress-spool.test.ts index a8f8db16048b..5524eca39f3b 100644 --- a/extensions/telegram/src/telegram-ingress-spool.test.ts +++ b/extensions/telegram/src/telegram-ingress-spool.test.ts @@ -1,9 +1,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import { + closeOpenClawStateDatabaseForTest, + createChannelIngressQueueForTests as createChannelIngressQueue, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { afterEach, describe, expect, it } from "vitest"; -import { createChannelIngressQueue } from "../../../src/channels/message/ingress-queue.js"; -import { closeOpenClawStateDatabaseForTest } from "../../../src/state/openclaw-state-db.js"; import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js"; import type { TelegramRuntime } from "./runtime.types.js"; import { @@ -25,7 +27,7 @@ function installTelegramIngressQueueRuntime(resolveStateDir: () => string): void resolveStateDir, openChannelIngressQueue: ( options?: Omit[0], "channelId">, - ) => createChannelIngressQueue({ ...(options ?? {}), channelId: "telegram" }), + ) => createChannelIngressQueue({ ...options, channelId: "telegram" }), }, } as TelegramRuntime); } diff --git a/extensions/telegram/src/telegram-ingress-spool.ts b/extensions/telegram/src/telegram-ingress-spool.ts index 0002881b4b4f..4c49b30b2a55 100644 --- a/extensions/telegram/src/telegram-ingress-spool.ts +++ b/extensions/telegram/src/telegram-ingress-spool.ts @@ -1,11 +1,11 @@ import { randomUUID } from "node:crypto"; import os from "node:os"; import path from "node:path"; -import { - type ChannelIngressQueue, - type ChannelIngressQueueClaim, - type ChannelIngressQueueClaimRef, - type ChannelIngressQueueRecord, +import type { + ChannelIngressQueue, + ChannelIngressQueueClaim, + ChannelIngressQueueClaimRef, + ChannelIngressQueueRecord, } from "openclaw/plugin-sdk/channel-outbound"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { getTelegramRuntime } from "./runtime.js"; diff --git a/extensions/voice-call/doctor-contract-api.test.ts b/extensions/voice-call/doctor-contract-api.test.ts index 10ab8c380f8a..81c4d9f94eb4 100644 --- a/extensions/voice-call/doctor-contract-api.test.ts +++ b/extensions/voice-call/doctor-contract-api.test.ts @@ -40,6 +40,9 @@ function installStateRuntime(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call doctor tests"); + }) as never, }, }); } diff --git a/extensions/voice-call/src/manager.restore.test.ts b/extensions/voice-call/src/manager.restore.test.ts index 6f4a31787a6a..5b849c724c88 100644 --- a/extensions/voice-call/src/manager.restore.test.ts +++ b/extensions/voice-call/src/manager.restore.test.ts @@ -24,6 +24,9 @@ function installStateRuntime(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call restore tests"); + }) as never, }, }); } diff --git a/extensions/voice-call/src/manager.test-harness.ts b/extensions/voice-call/src/manager.test-harness.ts index 948956ee934d..2248b33f9b27 100644 --- a/extensions/voice-call/src/manager.test-harness.ts +++ b/extensions/voice-call/src/manager.test-harness.ts @@ -89,6 +89,9 @@ export function installVoiceCallStateRuntimeForTests(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call manager tests"); + }) as never, }, }); } diff --git a/extensions/voice-call/src/manager/events.test.ts b/extensions/voice-call/src/manager/events.test.ts index 99b603155c5c..801327287768 100644 --- a/extensions/voice-call/src/manager/events.test.ts +++ b/extensions/voice-call/src/manager/events.test.ts @@ -26,6 +26,9 @@ function installStateRuntime(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call event tests"); + }) as never, }, }); } diff --git a/extensions/voice-call/src/manager/store.test.ts b/extensions/voice-call/src/manager/store.test.ts index c0a4a3ef490f..c7a308b60943 100644 --- a/extensions/voice-call/src/manager/store.test.ts +++ b/extensions/voice-call/src/manager/store.test.ts @@ -29,6 +29,9 @@ function installStateRuntime(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call store tests"); + }) as never, }, }); } @@ -88,6 +91,9 @@ describe("voice-call call record store", () => { openSyncKeyedStore: (() => { throw new Error("sqlite unavailable"); }) as never, + openChannelIngressQueue: (() => { + throw new Error("openChannelIngressQueue is not used by voice-call store tests"); + }) as never, }, }); diff --git a/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts b/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts index 04d4d25b1796..5c809694dd5a 100644 --- a/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts +++ b/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts @@ -21,6 +21,11 @@ function installStateRuntime(): void { }) as never, openSyncKeyedStore: (options: OpenKeyedStoreOptions) => createPluginStateSyncKeyedStoreForTests("voice-call", options), + openChannelIngressQueue: (() => { + throw new Error( + "openChannelIngressQueue is not used by voice-call webhook lifecycle tests", + ); + }) as never, }, }); } diff --git a/extensions/whatsapp/src/inbound.media.test.ts b/extensions/whatsapp/src/inbound.media.test.ts index e9bc5882fd31..f73744485fa4 100644 --- a/extensions/whatsapp/src/inbound.media.test.ts +++ b/extensions/whatsapp/src/inbound.media.test.ts @@ -141,9 +141,11 @@ vi.mock("openclaw/plugin-sdk/media-store", async () => { }); vi.mock("./runtime.js", async () => { - const { createChannelIngressQueue } = await vi.importActual< - typeof import("../../../src/channels/message/ingress-queue.js") - >("../../../src/channels/message/ingress-queue.js"); + const { createChannelIngressQueueForTests: createChannelIngressQueue } = await Promise.resolve( + vi.importActual( + "openclaw/plugin-sdk/plugin-state-test-runtime", + ), + ); const stateDir = `/tmp/openclaw-whatsapp-inbound-media-${Date.now()}-${Math.random()}`; return { getOptionalWhatsAppRuntime: () => undefined, @@ -153,7 +155,7 @@ vi.mock("./runtime.js", async () => { openKeyedStore: () => createInMemoryKeyedStore(), openChannelIngressQueue: ( options?: Omit[0], "channelId">, - ) => createChannelIngressQueue({ ...(options ?? {}), channelId: "whatsapp" }), + ) => createChannelIngressQueue({ ...options, channelId: "whatsapp" }), }, }), setWhatsAppRuntime: vi.fn(), diff --git a/extensions/whatsapp/src/monitor-inbox.test-harness.ts b/extensions/whatsapp/src/monitor-inbox.test-harness.ts index d0186d96fcd3..98a8fac68a96 100644 --- a/extensions/whatsapp/src/monitor-inbox.test-harness.ts +++ b/extensions/whatsapp/src/monitor-inbox.test-harness.ts @@ -136,9 +136,11 @@ vi.mock("openclaw/plugin-sdk/channel-activity-runtime", async () => { }); vi.mock("./runtime.js", async () => { - const { createChannelIngressQueue } = await vi.importActual< - typeof import("../../../src/channels/message/ingress-queue.js") - >("../../../src/channels/message/ingress-queue.js"); + const { createChannelIngressQueueForTests: createChannelIngressQueue } = await Promise.resolve( + vi.importActual( + "openclaw/plugin-sdk/plugin-state-test-runtime", + ), + ); return { getWhatsAppRuntime: () => ({ state: { @@ -146,7 +148,7 @@ vi.mock("./runtime.js", async () => { openKeyedStore: pluginRuntimeMocks.openKeyedStore, openChannelIngressQueue: ( options?: Omit[0], "channelId">, - ) => createChannelIngressQueue({ ...(options ?? {}), channelId: "whatsapp" }), + ) => createChannelIngressQueue({ ...options, channelId: "whatsapp" }), }, }), setWhatsAppRuntime: vi.fn(), diff --git a/src/agents/embedded-agent-runner-extraparams.test.ts b/src/agents/embedded-agent-runner-extraparams.test.ts index ecc7b885608c..69c5516153ff 100644 --- a/src/agents/embedded-agent-runner-extraparams.test.ts +++ b/src/agents/embedded-agent-runner-extraparams.test.ts @@ -2873,6 +2873,32 @@ describe("applyExtraParamsToAgent", () => { expect(payload.store).toBe(true); }); + it("keeps Responses replay item ids enabled for direct OpenAI store-enabled requests", () => { + let capturedOptions: + | (SimpleStreamOptions & { + replayResponsesItemIds?: boolean; + }) + | undefined; + const baseStreamFn: StreamFn = (_model, _context, options) => { + capturedOptions = options; + return {} as ReturnType; + }; + const streamFn = createOpenAIResponsesContextManagementWrapper(baseStreamFn, undefined); + + void streamFn( + { + api: "openai-responses", + provider: "openai", + id: "gpt-5", + baseUrl: "https://api.openai.com/v1", + } as unknown as Model<"openai-responses">, + { messages: [] }, + {}, + ); + + expect(capturedOptions?.replayResponsesItemIds).toBe(true); + }); + it("forces store=true for azure-openai provider with openai-responses API (#42800)", () => { const payload = runResponsesPayloadMutationCase({ applyProvider: "azure-openai", diff --git a/src/agents/embedded-agent-runner/run/attempt.async-tasks.ts b/src/agents/embedded-agent-runner/run/attempt.async-tasks.ts index 58025469c5fa..20d84d7d60dc 100644 --- a/src/agents/embedded-agent-runner/run/attempt.async-tasks.ts +++ b/src/agents/embedded-agent-runner/run/attempt.async-tasks.ts @@ -1,7 +1,10 @@ import { isCronRunSessionKey } from "../../../sessions/session-key-utils.js"; import { isTerminalTaskStatus } from "../../../tasks/task-executor-policy.js"; -import { findTaskByRunId, listTaskRecords } from "../../../tasks/task-registry.js"; import type { TaskRecord } from "../../../tasks/task-registry.types.js"; +import { + findTaskByRunIdForStatus, + listTasksForOwnerOrRequesterSessionKeyForStatus, +} from "../../../tasks/task-status-access.js"; export type AsyncStartedToolMeta = { toolName?: string; @@ -98,16 +101,10 @@ function collectAsyncTaskRunIds( if (!normalizedSessionKey) { return runIds; } - for (const task of listTaskRecords()) { + for (const task of listTasksForOwnerOrRequesterSessionKeyForStatus(normalizedSessionKey)) { if (!COMPLETION_REQUIRED_TASK_KINDS.has(task.taskKind ?? "")) { continue; } - if ( - task.requesterSessionKey !== normalizedSessionKey && - task.ownerKey !== normalizedSessionKey - ) { - continue; - } if (isTerminalTaskStatus(task.status)) { continue; } @@ -123,7 +120,7 @@ function findTerminalTasks(runIds: readonly string[]): { const pendingRunIds: string[] = []; const terminalTasks: TaskRecord[] = []; for (const runId of runIds) { - const task = findTaskByRunId(runId); + const task = findTaskByRunIdForStatus(runId); if (task && isTerminalTaskStatus(task.status)) { terminalTasks.push(task); continue; @@ -138,7 +135,7 @@ export function requiresCompletionRequiredAsyncTaskWait(params: { toolMetas: readonly AsyncStartedToolMeta[]; }): boolean { const sessionKey = params.sessionKey?.trim(); - if (!isCronRunSessionKey(sessionKey)) { + if (!sessionKey || !isCronRunSessionKey(sessionKey)) { return false; } if ( @@ -148,11 +145,10 @@ export function requiresCompletionRequiredAsyncTaskWait(params: { ) { return true; } - return listTaskRecords().some( + return listTasksForOwnerOrRequesterSessionKeyForStatus(sessionKey).some( (task) => COMPLETION_REQUIRED_TASK_KINDS.has(task.taskKind ?? "") && !isTerminalTaskStatus(task.status) && - (task.requesterSessionKey === sessionKey || task.ownerKey === sessionKey) && Boolean(task.runId?.trim()), ); } diff --git a/src/agents/openai-responses.reasoning-replay.test.ts b/src/agents/openai-responses.reasoning-replay.test.ts index 32eb83e7939e..12046b5bc67e 100644 --- a/src/agents/openai-responses.reasoning-replay.test.ts +++ b/src/agents/openai-responses.reasoning-replay.test.ts @@ -86,6 +86,7 @@ async function runAbortedOpenAIResponsesStream(params: { description: string; parameters: ReturnType; }>; + replayResponsesItemIds?: boolean; }) { const controller = new AbortController(); controller.abort(); @@ -100,11 +101,12 @@ async function runAbortedOpenAIResponsesStream(params: { }, { apiKey: "test", + replayResponsesItemIds: params.replayResponsesItemIds ?? true, signal: controller.signal, - onPayload: (nextPayload) => { + onPayload: (nextPayload: unknown) => { payload = nextPayload as Record; }, - }, + } as never, ); await responseStream.result(); diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts index df6fd06160f2..388add3a699c 100644 --- a/src/agents/openai-transport-stream.test.ts +++ b/src/agents/openai-transport-stream.test.ts @@ -2750,12 +2750,14 @@ describe("openai transport stream", () => { call_id?: string; phase?: string; encrypted_content?: string; + summary?: unknown; }>; }; const reasoningItem = params.input?.find((item) => item.type === "reasoning"); expectRecordFields(reasoningItem, { type: "reasoning", + summary: [], }); expect(reasoningItem?.id).toBeUndefined(); expect(reasoningItem).not.toHaveProperty("encrypted_content"); @@ -2776,7 +2778,222 @@ describe("openai transport stream", () => { expect(functionCall?.id).toBeUndefined(); }); - it("preserves prior Responses replay item ids for custom Codex-compatible responses", () => { + it("omits Responses replay item ids when OpenAI Responses requests disable store", () => { + const params = buildOpenAIResponsesParams( + { + id: "gpt-5.5", + name: "GPT-5.5", + api: "openai-responses", + provider: "mycodex", + baseUrl: "http://127.0.0.1:8317/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 1_000_000, + maxTokens: 128_000, + } satisfies Model<"openai-responses">, + { + systemPrompt: "system", + messages: [ + { + role: "assistant", + api: "openai-responses", + provider: "mycodex", + model: "gpt-5.5", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: "rs_prior", + encrypted_content: "ciphertext", + }), + }, + { + type: "text", + text: "Checking the price.", + textSignature: JSON.stringify({ + v: 1, + id: "msg_prior", + phase: "commentary", + }), + }, + { + type: "toolCall", + id: "call_abc|fc_prior", + name: "price_lookup", + arguments: { symbol: "SOL" }, + }, + ], + }, + { + role: "toolResult", + toolCallId: "call_abc|fc_prior", + toolName: "price_lookup", + content: [{ type: "text", text: "$83.95" }], + isError: false, + timestamp: 2, + }, + ], + tools: [], + } as never, + { sessionId: "session-123" }, + ) as { + store?: boolean; + input?: Array<{ + type?: string; + role?: string; + id?: string; + call_id?: string; + phase?: string; + encrypted_content?: string; + summary?: unknown; + }>; + }; + + expect(params.store).toBe(false); + const reasoningItem = params.input?.find((item) => item.type === "reasoning"); + expectRecordFields(reasoningItem, { + type: "reasoning", + summary: [], + }); + expect(reasoningItem?.id).toBeUndefined(); + expect(reasoningItem).not.toHaveProperty("encrypted_content"); + const assistantMessage = params.input?.find( + (item) => item.type === "message" && item.role === "assistant", + ); + expectRecordFields(assistantMessage, { + type: "message", + role: "assistant", + phase: "commentary", + }); + expect(assistantMessage?.id).toBeUndefined(); + const functionCall = params.input?.find((item) => item.type === "function_call"); + expectRecordFields(functionCall, { + type: "function_call", + call_id: "call_abc", + }); + expect(functionCall?.id).toBeUndefined(); + }); + + it("preserves Responses replay item ids when a store-enabled wrapper requests replay", () => { + const params = buildOpenAIResponsesParams( + { + id: "gpt-5.4", + name: "GPT-5.4", + api: "openai-responses", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"openai-responses">, + { + systemPrompt: "system", + messages: [ + { + role: "assistant", + api: "openai-responses", + provider: "openai", + model: "gpt-5.4", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: "rs_prior", + encrypted_content: "ciphertext", + }), + }, + { + type: "text", + text: "Checking the price.", + textSignature: JSON.stringify({ + v: 1, + id: "msg_prior", + phase: "commentary", + }), + }, + { + type: "toolCall", + id: "call_abc|fc_prior", + name: "price_lookup", + arguments: { symbol: "SOL" }, + }, + ], + }, + { + role: "toolResult", + toolCallId: "call_abc|fc_prior", + toolName: "price_lookup", + content: [{ type: "text", text: "$83.95" }], + isError: false, + timestamp: 2, + }, + ], + tools: [], + } as never, + { replayResponsesItemIds: true, sessionId: "session-123" }, + ) as { + input?: Array<{ + type?: string; + role?: string; + id?: string; + call_id?: string; + phase?: string; + encrypted_content?: string; + summary?: unknown; + }>; + }; + + const reasoningItem = params.input?.find((item) => item.type === "reasoning"); + expectRecordFields(reasoningItem, { + type: "reasoning", + id: "rs_prior", + summary: [], + }); + const assistantMessage = params.input?.find( + (item) => item.type === "message" && item.role === "assistant", + ); + expectRecordFields(assistantMessage, { + type: "message", + role: "assistant", + id: "msg_prior", + phase: "commentary", + }); + const functionCall = params.input?.find((item) => item.type === "function_call"); + expectRecordFields(functionCall, { + type: "function_call", + id: "fc_prior", + call_id: "call_abc", + }); + }); + + it("omits prior Responses replay item ids when store is disabled for custom Codex-compatible responses", () => { const model = { id: "gpt-5.4", name: "GPT-5.4", @@ -2856,15 +3073,17 @@ describe("openai transport stream", () => { call_id?: string; phase?: string; encrypted_content?: string; + summary?: unknown; }>; }; const reasoningItem = params.input?.find((item) => item.type === "reasoning"); expectRecordFields(reasoningItem, { type: "reasoning", - id: "rs_prior", encrypted_content: "ciphertext", + summary: [], }); + expect(reasoningItem?.id).toBeUndefined(); expect(reasoningItem).not.toHaveProperty("__openclaw_replay"); const assistantMessage = params.input?.find( (item) => item.type === "message" && item.role === "assistant", @@ -2872,18 +3091,18 @@ describe("openai transport stream", () => { expectRecordFields(assistantMessage, { type: "message", role: "assistant", - id: "msg_prior", phase: "commentary", }); + expect(assistantMessage?.id).toBeUndefined(); const functionCall = params.input?.find((item) => item.type === "function_call"); expectRecordFields(functionCall, { type: "function_call", - id: "fc_prior", call_id: "call_abc", }); + expect(functionCall?.id).toBeUndefined(); }); - it("drops oversized GitHub Copilot Responses reasoning replay items before send", () => { + it("keeps GitHub Copilot Responses reasoning replay when store-disabled ids are omitted", () => { const model = { id: "gpt-5.5", name: "GPT-5.5", @@ -2934,6 +3153,73 @@ describe("openai transport stream", () => { tools: [], } as never, { sessionId: "session-123" }, + ) as { + input?: Array<{ + type?: string; + id?: string; + summary?: unknown; + }>; + }; + + const reasoningItem = params.input?.find((item) => item.type === "reasoning"); + expectRecordFields(reasoningItem, { + type: "reasoning", + summary: [], + }); + expect(reasoningItem?.id).toBeUndefined(); + }); + + it("drops oversized GitHub Copilot Responses reasoning replay ids before send", () => { + const model = { + id: "gpt-5.5", + name: "GPT-5.5", + api: "openai-responses", + provider: "github-copilot", + baseUrl: "https://api.githubcopilot.com", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 400000, + maxTokens: 8192, + } satisfies Model<"openai-responses">; + const longReasoningId = `rs_${"x".repeat(380)}`; + + const params = buildOpenAIResponsesParams( + model, + { + systemPrompt: "system", + messages: [ + { + role: "assistant", + api: "openai-responses", + provider: "github-copilot", + model: "gpt-5.5", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: longReasoningId, + summary: [], + }), + }, + ], + }, + ], + tools: [], + } as never, + { replayResponsesItemIds: true, sessionId: "session-123" }, ) as { input?: Array<{ type?: string; @@ -3006,14 +3292,16 @@ describe("openai transport stream", () => { type?: string; id?: string; encrypted_content?: string; + summary?: unknown; }>; }; const reasoningItem = params.input?.find((item) => item.type === "reasoning"); expectRecordFields(reasoningItem, { type: "reasoning", - id: "rs_prior", + summary: [], }); + expect(reasoningItem?.id).toBeUndefined(); expect(reasoningItem).not.toHaveProperty("encrypted_content"); }); @@ -3079,14 +3367,16 @@ describe("openai transport stream", () => { type?: string; id?: string; encrypted_content?: string; + summary?: unknown; }>; }; const reasoningItem = params.input?.find((item) => item.type === "reasoning"); expectRecordFields(reasoningItem, { type: "reasoning", - id: "rs_prior", + summary: [], }); + expect(reasoningItem?.id).toBeUndefined(); expect(reasoningItem).not.toHaveProperty("encrypted_content"); }); @@ -3154,15 +3444,17 @@ describe("openai transport stream", () => { type?: string; id?: string; encrypted_content?: string; + summary?: unknown; }>; }; const reasoningItem = params.input?.find((item) => item.type === "reasoning"); expectRecordFields(reasoningItem, { type: "reasoning", - id: "rs_prior", encrypted_content: "ciphertext", + summary: [], }); + expect(reasoningItem?.id).toBeUndefined(); expect(reasoningItem).not.toHaveProperty("__openclaw_replay"); }); @@ -3269,8 +3561,7 @@ describe("openai transport stream", () => { const functionOutput = params.input?.find((item) => item.type === "function_call_output"); expect(functionCall).toBeDefined(); expect(functionOutput).toBeDefined(); - expect(functionCall?.id).toMatch(/^fc_/); - expect(functionCall?.id?.length).toBeLessThanOrEqual(64); + expect(functionCall?.id).toBeUndefined(); expect(functionCall?.call_id).toBe("call_ug6lFGKwZDjHfzW8H0PDQRwN"); expect(functionOutput?.call_id).toBe(functionCall?.call_id); for (const item of params.input ?? []) { @@ -3283,7 +3574,7 @@ describe("openai transport stream", () => { } }); - it("keeps distinct overlong Copilot Responses replay item ids distinct", () => { + it("omits distinct overlong Copilot Responses replay item ids when store is disabled", () => { const sharedToolItemPrefix = "iVec" + "A".repeat(160); const firstToolCallId = `call_first|${sharedToolItemPrefix}Aa`; const secondToolCallId = `call_second|${sharedToolItemPrefix}BB`; @@ -3353,14 +3644,7 @@ describe("openai transport stream", () => { params.input?.filter((item) => item.type === "function_call_output") ?? []; expect(functionCalls).toHaveLength(2); expect(functionOutputs).toHaveLength(2); - expect(functionCalls.map((item) => item.id)).toEqual([ - expect.stringMatching(/^fc_/), - expect.stringMatching(/^fc_/), - ]); - expect(new Set(functionCalls.map((item) => item.id)).size).toBe(2); - for (const item of functionCalls) { - expect(item.id?.length).toBeLessThanOrEqual(64); - } + expect(functionCalls.map((item) => item.id)).toEqual([undefined, undefined]); expect(functionOutputs.map((item) => item.call_id)).toEqual(["call_first", "call_second"]); }); @@ -3678,69 +3962,66 @@ describe("openai transport stream", () => { baseUrl: "https://proxy.example.com/v1", }, }, - ])( - "omits orphan phase-tagged ids for $label responses payloads", - ({ label: _label, model }) => { - const params = buildOpenAIResponsesParams( - { - ...model, - reasoning: true, - input: ["text"], - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 200000, - maxTokens: 8192, - } as Model<"openai-responses">, - { - systemPrompt: "system", - messages: [ - { - role: "assistant", - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + ])("omits orphan phase-tagged ids for $label responses payloads", ({ label: _label, model }) => { + const params = buildOpenAIResponsesParams( + { + ...model, + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } as Model<"openai-responses">, + { + systemPrompt: "system", + messages: [ + { + role: "assistant", + api: model.api, + provider: model.provider, + model: model.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: 1, + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ + v: 1, + id: "msg_commentary", + phase: "commentary", + }), }, - stopReason: "stop", - timestamp: 1, - content: [ - { - type: "text", - text: "Working...", - textSignature: JSON.stringify({ - v: 1, - id: "msg_commentary", - phase: "commentary", - }), - }, - ], - }, - { - role: "user", - content: "Continue", - timestamp: 2, - }, - ], - tools: [], - } as never, - undefined, - ) as { - input?: Array<{ role?: string; id?: string; phase?: string }>; - }; + ], + }, + { + role: "user", + content: "Continue", + timestamp: 2, + }, + ], + tools: [], + } as never, + undefined, + ) as { + input?: Array<{ role?: string; id?: string; phase?: string }>; + }; - const assistantItem = params.input?.find((item) => item.role === "assistant"); - expectRecordFields(assistantItem, { - role: "assistant", - phase: "commentary", - }); - expect(assistantItem?.id).toBeUndefined(); - }, - ); + const assistantItem = params.input?.find((item) => item.role === "assistant"); + expectRecordFields(assistantItem, { + role: "assistant", + phase: "commentary", + }); + expect(assistantItem?.id).toBeUndefined(); + }); it("strips the internal cache boundary from OpenAI system prompts", () => { const params = buildOpenAIResponsesParams( diff --git a/src/agents/openai-transport-stream.ts b/src/agents/openai-transport-stream.ts index eb255ef5e538..a06a1c5abbd9 100644 --- a/src/agents/openai-transport-stream.ts +++ b/src/agents/openai-transport-stream.ts @@ -173,6 +173,7 @@ type OpenAIResponsesOptions = BaseStreamOptions & { reasoning?: OpenAIReasoningEffort; reasoningEffort?: OpenAIReasoningEffort; reasoningSummary?: "auto" | "detailed" | "concise" | null; + replayResponsesItemIds?: boolean; serviceTier?: ResponseCreateParamsStreaming["service_tier"]; toolChoice?: ResponseCreateParamsStreaming["tool_choice"]; }; @@ -908,6 +909,16 @@ function readOpenAIResponsesReasoningReplayBlockMetadata( return isOpenAIResponsesReasoningReplayMetadata(value) ? value : undefined; } +function normalizeOpenAIResponsesReasoningReplayItem( + item: ReplayableResponseReasoningItem, +): ReplayableResponseReasoningItem { + const record = item as ReplayableResponseReasoningItem & Record; + if (record.type !== "reasoning" || Array.isArray(record.summary)) { + return item; + } + return { ...record, summary: [] } as ReplayableResponseReasoningItem; +} + function prepareOpenAIResponsesReasoningItemForReplay( item: ReplayableResponseReasoningItem, context: OpenAIResponsesReplayContext, @@ -916,16 +927,18 @@ function prepareOpenAIResponsesReasoningItemForReplay( const { [OPENAI_RESPONSES_REASONING_REPLAY_META_KEY]: rawMetadata, ...rest } = item as ReplayableResponseReasoningItem & Record; if (!("encrypted_content" in rest)) { - return rest as ReplayableResponseReasoningItem; + return normalizeOpenAIResponsesReasoningReplayItem(rest as ReplayableResponseReasoningItem); } const metadata = blockMetadata ?? (isOpenAIResponsesReasoningReplayMetadata(rawMetadata) ? rawMetadata : undefined); if (encryptedReasoningReplayMetadataMatches(metadata, context)) { - return rest as ReplayableResponseReasoningItem; + return normalizeOpenAIResponsesReasoningReplayItem(rest as ReplayableResponseReasoningItem); } const stripped = stripEncryptedContentFields(rest); - return stripped.value as ReplayableResponseReasoningItem; + return normalizeOpenAIResponsesReasoningReplayItem( + stripped.value as ReplayableResponseReasoningItem, + ); } async function createResponsesStreamWithEncryptedContentRetry(params: { @@ -1145,6 +1158,7 @@ function convertResponsesMessages( delete replayableReasoningItem.id; } if ( + shouldReplayResponsesItemIds && model.provider === "github-copilot" && !isSafeResponsesReplayItemId(replayableReasoningItem.id) ) { @@ -1190,7 +1204,7 @@ function convertResponsesMessages( : undefined; output.push({ type: "function_call", - id: itemId, + ...(itemId ? { id: itemId } : {}), call_id: callId, name: block.name, arguments: @@ -2077,6 +2091,12 @@ export function buildOpenAIResponsesParams( const compat = getCompat(model as OpenAIModeModel); const supportsDeveloperRole = typeof compat.supportsDeveloperRole === "boolean" ? compat.supportsDeveloperRole : undefined; + const payloadPolicy = resolveOpenAIResponsesPayloadPolicy(model, { + storeMode: "disable", + }); + const policyAllowsReplayIds = payloadPolicy.explicitStore !== false; + const replayResponsesItemIds = + !isNativeCodexResponses && (options?.replayResponsesItemIds ?? policyAllowsReplayIds); const messages = convertResponsesMessages( model, context, @@ -2085,7 +2105,7 @@ export function buildOpenAIResponsesParams( includeSystemPrompt: !isCodexResponses, supportsDeveloperRole, replayReasoningItems: true, - replayResponsesItemIds: !isNativeCodexResponses, + replayResponsesItemIds, authProfileId: options?.authProfileId, sessionId: options?.sessionId, }, @@ -2095,9 +2115,6 @@ export function buildOpenAIResponsesParams( } const cacheRetention = resolveCacheRetention(options?.cacheRetention); const promptCacheKey = resolvePromptCacheKey(options, cacheRetention); - const payloadPolicy = resolveOpenAIResponsesPayloadPolicy(model, { - storeMode: "disable", - }); const params: OpenAIResponsesRequestParams = { model: model.id, input: messages, diff --git a/src/channels/message/ingress-queue.ts b/src/channels/message/ingress-queue.ts index 27de960d15ca..bfcc94087c61 100644 --- a/src/channels/message/ingress-queue.ts +++ b/src/channels/message/ingress-queue.ts @@ -186,8 +186,8 @@ function affectedRows(result: { numAffectedRows?: bigint }): number { return Number(result.numAffectedRows ?? 0n); } -function parseJson(value: string): T { - return JSON.parse(value) as T; +function parseJson(value: string): unknown { + return JSON.parse(value) as unknown; } function baseRecord( @@ -198,13 +198,13 @@ function baseRecord( channelId: row.channel_id, accountId: row.account_id, queueName: row.queue_name, - payload: parseJson(row.payload_json), - ...(row.metadata_json === null ? {} : { metadata: parseJson(row.metadata_json) }), - receivedAt: Number(row.received_at), - updatedAt: Number(row.updated_at), + payload: parseJson(row.payload_json) as TPayload, + ...(row.metadata_json === null ? {} : { metadata: parseJson(row.metadata_json) as TMetadata }), + receivedAt: row.received_at, + updatedAt: row.updated_at, ...(row.lane_key === null ? {} : { laneKey: row.lane_key }), - attempts: Number(row.attempts), - ...(row.last_attempt_at === null ? {} : { lastAttemptAt: Number(row.last_attempt_at) }), + attempts: row.attempts, + ...(row.last_attempt_at === null ? {} : { lastAttemptAt: row.last_attempt_at }), ...(row.last_error === null ? {} : { lastError: row.last_error }), }; } @@ -217,7 +217,7 @@ function claimedRecord( claim: { token: row.claim_token ?? "", ownerId: row.claim_owner ?? "", - claimedAt: Number(row.claimed_at ?? 0), + claimedAt: row.claimed_at ?? 0, }, }; } @@ -230,10 +230,10 @@ function completedRecord( channelId: row.channel_id, accountId: row.account_id, queueName: row.queue_name, - completedAt: Number(row.completed_at ?? row.updated_at), + completedAt: row.completed_at ?? row.updated_at, ...(row.completed_metadata_json === null ? {} - : { metadata: parseJson(row.completed_metadata_json) }), + : { metadata: parseJson(row.completed_metadata_json) as TCompletedMetadata }), }; } @@ -243,7 +243,7 @@ function failedRecord(row: ChannelIngressRow): ChannelIngressQueueFailedRecord { channelId: row.channel_id, accountId: row.account_id, queueName: row.queue_name, - failedAt: Number(row.failed_at ?? row.updated_at), + failedAt: row.failed_at ?? row.updated_at, reason: row.failed_reason ?? "failed", ...(row.last_error === null ? {} : { message: row.last_error }), }; diff --git a/src/llm/providers/openai-chatgpt-responses.test.ts b/src/llm/providers/openai-chatgpt-responses.test.ts index f89a30a902de..bb6986ba0cdf 100644 --- a/src/llm/providers/openai-chatgpt-responses.test.ts +++ b/src/llm/providers/openai-chatgpt-responses.test.ts @@ -150,6 +150,101 @@ describe("streamOpenAICodexResponses transport", () => { expect(result.errorMessage).toContain("Request timed out after 5ms"); }); + it("does not replay Responses item ids for store-disabled ChatGPT requests", async () => { + let capturedPayload: + | { + store?: unknown; + input?: Array>; + } + | undefined; + const stream = streamOpenAICodexResponses( + model, + { + messages: [ + { + role: "assistant", + api: "openai-chatgpt-responses", + provider: model.provider, + model: model.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: "rs_prior", + encrypted_content: "ciphertext", + }), + }, + { + type: "text", + text: "Checking.", + textSignature: JSON.stringify({ + v: 1, + id: "msg_prior", + phase: "commentary", + }), + }, + { + type: "toolCall", + id: "call_abc|fc_prior", + name: "lookup", + arguments: {}, + }, + ], + }, + ], + }, + { + apiKey: createJwt({ + "https://api.openai.com/auth": { + chatgpt_account_id: "acct-1", + }, + }), + transport: "sse", + onPayload: (payload) => { + capturedPayload = payload as typeof capturedPayload; + throw new Error("stop after payload"); + }, + }, + ); + + const result = await stream.result(); + + expect(result.stopReason).toBe("error"); + expect(result.errorMessage).toBe("stop after payload"); + expect(capturedPayload?.store).toBe(false); + const reasoningItem = capturedPayload?.input?.find((item) => item.type === "reasoning"); + expect(reasoningItem).toMatchObject({ + type: "reasoning", + encrypted_content: "ciphertext", + summary: [], + }); + expect(reasoningItem).not.toHaveProperty("id"); + const messageItem = capturedPayload?.input?.find((item) => item.type === "message"); + expect(messageItem).toMatchObject({ + type: "message", + phase: "commentary", + }); + expect(messageItem).not.toHaveProperty("id"); + const functionCall = capturedPayload?.input?.find((item) => item.type === "function_call"); + expect(functionCall).toMatchObject({ + type: "function_call", + call_id: "call_abc", + }); + expect(functionCall).not.toHaveProperty("id"); + }); + it("caps oversized timeoutMs before creating request abort signals", async () => { stubHangingFetch(MAX_TIMER_TIMEOUT_MS); diff --git a/src/llm/providers/openai-chatgpt-responses.ts b/src/llm/providers/openai-chatgpt-responses.ts index 42503318ac48..2728a568c17d 100644 --- a/src/llm/providers/openai-chatgpt-responses.ts +++ b/src/llm/providers/openai-chatgpt-responses.ts @@ -480,6 +480,7 @@ function buildRequestBody( ): RequestBody { const messages = convertResponsesMessages(model, context, CODEX_TOOL_CALL_PROVIDERS, { includeSystemPrompt: false, + replayResponsesItemIds: false, }); const body: RequestBody = { @@ -1492,6 +1493,7 @@ async function processWebSocketStream( CODEX_TOOL_CALL_PROVIDERS, { includeSystemPrompt: false, + replayResponsesItemIds: false, }, ).filter((item) => item.type !== "function_call_output"); entry.continuation = { diff --git a/src/llm/providers/openai-compatible-auth.test.ts b/src/llm/providers/openai-compatible-auth.test.ts index 85545c9f0ac3..4b335b0a2363 100644 --- a/src/llm/providers/openai-compatible-auth.test.ts +++ b/src/llm/providers/openai-compatible-auth.test.ts @@ -54,4 +54,93 @@ describe("OpenAI-compatible provider credentials", () => { expect(result.stopReason).toBe("error"); expect(result.errorMessage).toBe("No API key for provider: custom-openai-compatible"); }); + + it("does not replay Responses item ids for direct store-disabled requests", async () => { + let capturedPayload: { store?: unknown; input?: Array> } | undefined; + const model = { + ...createBaseModel("openai-responses"), + reasoning: true, + } satisfies Model<"openai-responses">; + const stream = streamOpenAIResponses( + model, + { + messages: [ + { + role: "assistant", + api: "openai-responses", + provider: model.provider, + model: model.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: "rs_prior", + encrypted_content: "ciphertext", + }), + }, + { + type: "text", + text: "Checking.", + textSignature: JSON.stringify({ + v: 1, + id: "msg_prior", + phase: "commentary", + }), + }, + { + type: "toolCall", + id: "call_abc|fc_prior", + name: "lookup", + arguments: {}, + }, + ], + }, + ], + }, + { + apiKey: "sk-test", + onPayload: (payload) => { + capturedPayload = payload as typeof capturedPayload; + throw new Error("stop after payload"); + }, + }, + ); + + const result = await stream.result(); + + expect(result.stopReason).toBe("error"); + expect(result.errorMessage).toBe("stop after payload"); + expect(capturedPayload?.store).toBe(false); + const reasoningItem = capturedPayload?.input?.find((item) => item.type === "reasoning"); + expect(reasoningItem).toMatchObject({ + type: "reasoning", + encrypted_content: "ciphertext", + summary: [], + }); + expect(reasoningItem).not.toHaveProperty("id"); + const messageItem = capturedPayload?.input?.find((item) => item.type === "message"); + expect(messageItem).toMatchObject({ + type: "message", + phase: "commentary", + }); + expect(messageItem).not.toHaveProperty("id"); + const functionCall = capturedPayload?.input?.find((item) => item.type === "function_call"); + expect(functionCall).toMatchObject({ + type: "function_call", + call_id: "call_abc", + }); + expect(functionCall).not.toHaveProperty("id"); + }); }); diff --git a/src/llm/providers/openai-responses-shared.test.ts b/src/llm/providers/openai-responses-shared.test.ts index 68098048de81..17d1896942ea 100644 --- a/src/llm/providers/openai-responses-shared.test.ts +++ b/src/llm/providers/openai-responses-shared.test.ts @@ -238,4 +238,92 @@ describe("convertResponsesMessages", () => { ), ).not.toHaveProperty("id"); }); + + it("omits Responses replay item ids when requested by store-disabled callers", () => { + const input = convertResponsesMessages( + nativeOpenAIModel, + { + systemPrompt: "system", + messages: [ + { + role: "assistant", + api: nativeOpenAIModel.api, + provider: nativeOpenAIModel.provider, + model: nativeOpenAIModel.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: 1, + content: [ + { + type: "thinking", + thinking: "Need a tool.", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: "rs_prior", + encrypted_content: "ciphertext", + }), + }, + { + type: "text", + text: "Checking the price.", + textSignature: JSON.stringify({ + v: 1, + id: "msg_prior", + phase: "commentary", + }), + }, + { + type: "toolCall", + id: "call_abc|fc_prior", + name: "price_lookup", + arguments: { symbol: "SOL" }, + }, + ], + }, + { + role: "toolResult", + toolCallId: "call_abc|fc_prior", + toolName: "price_lookup", + content: [{ type: "text", text: "$83.95" }], + isError: false, + timestamp: 2, + }, + ], + } satisfies Context, + allowedToolCallProviders, + { includeSystemPrompt: false, replayResponsesItemIds: false }, + ) as unknown as Array>; + + const reasoningItem = input.find((item) => item.type === "reasoning"); + expect(reasoningItem).toMatchObject({ + type: "reasoning", + encrypted_content: "ciphertext", + summary: [], + }); + expect(reasoningItem).not.toHaveProperty("id"); + + const assistantMessage = input.find( + (item) => item.type === "message" && item.role === "assistant", + ); + expect(assistantMessage).toMatchObject({ + type: "message", + role: "assistant", + phase: "commentary", + }); + expect(assistantMessage).not.toHaveProperty("id"); + + const functionCall = input.find((item) => item.type === "function_call"); + expect(functionCall).toMatchObject({ + type: "function_call", + call_id: "call_abc", + }); + expect(functionCall).not.toHaveProperty("id"); + }); }); diff --git a/src/llm/providers/openai-responses-shared.ts b/src/llm/providers/openai-responses-shared.ts index e5b92288790e..b0fc65e34e5d 100644 --- a/src/llm/providers/openai-responses-shared.ts +++ b/src/llm/providers/openai-responses-shared.ts @@ -41,6 +41,21 @@ import { transformMessages } from "./transform-messages.js"; // ============================================================================= type ReplayableResponseOutputMessage = Omit & { id?: string }; +type ReplayableResponseReasoningItem = Omit & { id?: string }; + +function normalizeResponsesReasoningReplayItem(params: { + item: ReplayableResponseReasoningItem; + replayResponsesItemIds: boolean; +}): ReplayableResponseReasoningItem { + const next = { ...(params.item as ReplayableResponseReasoningItem & Record) }; + if (!Array.isArray(next.summary)) { + next.summary = []; + } + if (!params.replayResponsesItemIds) { + delete next.id; + } + return next as ReplayableResponseReasoningItem; +} function encodeTextSignatureV1(id: string, phase?: TextSignatureV1["phase"]): string { const payload: TextSignatureV1 = { v: 1, id }; @@ -106,6 +121,7 @@ export interface OpenAIResponsesStreamOptions { export interface ConvertResponsesMessagesOptions { includeSystemPrompt?: boolean; + replayResponsesItemIds?: boolean; } export { convertResponsesTools }; export type { ConvertResponsesToolsOptions } from "./openai-responses-tools.js"; @@ -156,6 +172,7 @@ export function convertResponsesMessages( options?: ConvertResponsesMessagesOptions, ): ResponseInput { const messages: ResponseInput = []; + const shouldReplayResponsesItemIds = options?.replayResponsesItemIds ?? true; const normalizeIdPart = (part: string): string => { const sanitized = part.replace(/[^a-zA-Z0-9_-]/g, "_"); @@ -247,19 +264,24 @@ export function convertResponsesMessages( for (const block of msg.content) { if (block.type === "thinking") { if (block.thinkingSignature) { - const reasoningItem = JSON.parse(block.thinkingSignature) as ResponseReasoningItem; - output.push(reasoningItem); + const reasoningItem = normalizeResponsesReasoningReplayItem({ + item: JSON.parse(block.thinkingSignature) as ReplayableResponseReasoningItem, + replayResponsesItemIds: shouldReplayResponsesItemIds, + }); + output.push(reasoningItem as ResponseInputItem); previousReplayItemWasReasoning = true; } } else if (block.type === "text") { const textBlock = block; const parsedSignature = parseTextSignature(textBlock.textSignature); - let msgId = resolveReplayableResponsesMessageId({ - textSignatureId: parsedSignature?.id, - fallbackId: `msg_${msgIndex}`, - fallbackOrdinal: textFallbackOrdinal, - previousReplayItemWasReasoning, - }); + let msgId = shouldReplayResponsesItemIds + ? resolveReplayableResponsesMessageId({ + textSignatureId: parsedSignature?.id, + fallbackId: `msg_${msgIndex}`, + fallbackOrdinal: textFallbackOrdinal, + previousReplayItemWasReasoning, + }) + : undefined; if (!parsedSignature?.id) { textFallbackOrdinal += 1; } @@ -281,18 +303,18 @@ export function convertResponsesMessages( } else if (block.type === "toolCall") { const toolCall = block; const [callId, itemIdRaw] = toolCall.id.split("|"); - let itemId: string | undefined = itemIdRaw; + let itemId: string | undefined = shouldReplayResponsesItemIds ? itemIdRaw : undefined; // For different-model messages, set id to undefined to avoid pairing validation. // OpenAI tracks which fc_xxx IDs were paired with rs_xxx reasoning items. // By omitting the id, we avoid triggering that validation (like cross-provider does). - if (isDifferentModel && itemId?.startsWith("fc_")) { + if (shouldReplayResponsesItemIds && isDifferentModel && itemId?.startsWith("fc_")) { itemId = undefined; } output.push({ type: "function_call", - id: itemId, + ...(itemId ? { id: itemId } : {}), call_id: callId, name: toolCall.name, arguments: JSON.stringify(toolCall.arguments), diff --git a/src/llm/providers/openai-responses.ts b/src/llm/providers/openai-responses.ts index 61082850b5ef..5c4e129a7cb2 100644 --- a/src/llm/providers/openai-responses.ts +++ b/src/llm/providers/openai-responses.ts @@ -74,9 +74,14 @@ function formatOpenAIResponsesError(error: unknown): string { export interface OpenAIResponsesOptions extends StreamOptions { reasoningEffort?: "minimal" | "low" | "medium" | "high" | "xhigh"; reasoningSummary?: "auto" | "detailed" | "concise" | null; + replayResponsesItemIds?: boolean; serviceTier?: ResponseCreateParamsStreaming["service_tier"]; } +type OpenAIResponsesReplayOptions = SimpleStreamOptions & { + replayResponsesItemIds?: boolean; +}; + /** * Generate function for OpenAI Responses API */ @@ -126,6 +131,8 @@ export const streamSimpleOpenAIResponses: StreamFunction< return streamOpenAIResponses(model, context, { ...base, reasoningEffort: resolveResponsesReasoningEffort(model, options?.reasoning), + replayResponsesItemIds: (options as OpenAIResponsesReplayOptions | undefined) + ?.replayResponsesItemIds, } satisfies OpenAIResponsesOptions); }; @@ -185,7 +192,9 @@ function buildParams( context: Context, options?: OpenAIResponsesOptions, ) { - const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS); + const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS, { + replayResponsesItemIds: options?.replayResponsesItemIds ?? false, + }); const cacheRetention = resolveCacheRetention(options?.cacheRetention); const compat = getCompat(model); diff --git a/src/llm/providers/stream-wrappers/openai.ts b/src/llm/providers/stream-wrappers/openai.ts index 38c8b038a39b..608deaa80271 100644 --- a/src/llm/providers/stream-wrappers/openai.ts +++ b/src/llm/providers/stream-wrappers/openai.ts @@ -37,6 +37,9 @@ type OpenAIServiceTier = "auto" | "default" | "flex" | "priority"; type OpenClawSimpleStreamOptions = SimpleStreamOptions & { openclawCodeModeToolSurface?: boolean; }; +type OpenAIResponsesReplayOptions = Parameters[2] & { + replayResponsesItemIds?: boolean; +}; export { resolveOpenAITextVerbosity }; function resolveOpenAITextVerbosityForModel( @@ -393,15 +396,21 @@ export function createOpenAIResponsesContextManagementWrapper( } const originalOnPayload = options?.onPayload; - return underlying(model, context, { + const replayResponsesItemIds = + policy.explicitStore === undefined + ? (options as OpenAIResponsesReplayOptions | undefined)?.replayResponsesItemIds + : policy.explicitStore; + const nextOptions: OpenAIResponsesReplayOptions = { ...options, + ...(replayResponsesItemIds === undefined ? {} : { replayResponsesItemIds }), onPayload: (payload) => { if (payload && typeof payload === "object") { applyOpenAIResponsesPayloadPolicy(payload as Record, policy); } return originalOnPayload?.(payload, model); }, - }); + }; + return underlying(model, context, nextOptions); }; } diff --git a/src/plugin-sdk/plugin-state-test-runtime.ts b/src/plugin-sdk/plugin-state-test-runtime.ts index 7750b4f3d335..3fa25271d965 100644 --- a/src/plugin-sdk/plugin-state-test-runtime.ts +++ b/src/plugin-sdk/plugin-state-test-runtime.ts @@ -3,3 +3,10 @@ export { createPluginStateSyncKeyedStore as createPluginStateSyncKeyedStoreForTests, resetPluginStateStoreForTests, } from "../plugin-state/plugin-state-store.js"; +export { createChannelIngressQueue as createChannelIngressQueueForTests } from "../channels/message/ingress-queue.js"; +export { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; +export type { DB as OpenClawStateKyselyDatabaseForTests } from "../state/openclaw-state-db.generated.js"; +export { + closeOpenClawStateDatabaseForTest, + openOpenClawStateDatabase, +} from "../state/openclaw-state-db.js"; diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 8501d116ca9f..ca0b3ff3d79c 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -2636,7 +2636,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { assertPluginStateAllowed(); const stateDir = options?.stateDir ?? baseState.resolveStateDir(); return createChannelIngressQueue({ - ...(options ?? {}), + ...options, channelId: pluginId, stateDir, }); diff --git a/src/tasks/task-status-access.ts b/src/tasks/task-status-access.ts index e251816e033d..89eaf32e9978 100644 --- a/src/tasks/task-status-access.ts +++ b/src/tasks/task-status-access.ts @@ -1,4 +1,10 @@ -import { getTaskById, listTasksForAgentId, listTasksForSessionKey } from "./task-registry.js"; +import { + findTaskByRunId, + getTaskById, + listTaskRecords, + listTasksForAgentId, + listTasksForSessionKey, +} from "./task-registry.js"; import type { TaskRecord } from "./task-registry.types.js"; export function getTaskSessionLookupByIdForStatus( @@ -18,6 +24,16 @@ export function listTasksForSessionKeyForStatus(sessionKey: string): TaskRecord[ return listTasksForSessionKey(sessionKey); } +export function listTasksForOwnerOrRequesterSessionKeyForStatus(sessionKey: string): TaskRecord[] { + return listTaskRecords().filter( + (task) => task.requesterSessionKey === sessionKey || task.ownerKey === sessionKey, + ); +} + export function listTasksForAgentIdForStatus(agentId: string): TaskRecord[] { return listTasksForAgentId(agentId); } + +export function findTaskByRunIdForStatus(runId: string): TaskRecord | undefined { + return findTaskByRunId(runId); +}