diff --git a/CHANGELOG.md b/CHANGELOG.md index 68d36bbb441b..469c6b142e76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Docs: https://docs.openclaw.ai - Agents/code mode: return structured timeout and runtime-unavailable error codes for known worker failures. Fixes #83389. (#83444) Thanks @Kaspre. - QA-Lab: isolate multi-scenario suite workers when scenarios need startup config patches, preventing message-routing config from leaking into unrelated scenarios. - QA-Lab: make the commitments heartbeat-target-none scenario request an immediate heartbeat instead of waiting for the next scheduled heartbeat. +- Codex/Plugin SDK: deliver Codex-native subagent completions through a generic harness task runtime so harness-backed plugins can mirror durable task lifecycle and completion delivery without Codex-specific SDK imports. (#83445) Thanks @bryanpearson. - Gateway CLI: surface local post-challenge connect assembly failures immediately instead of waiting for the wrapper timeout. Fixes #68944. (#85253) Thanks @samzong. - Messages: strip unsupported web-search citation control markers from outbound replies before they reach WebChat or external channels. Fixes #85193. (#85204) Thanks @neeravmakwana. - Agents/exec: treat denied exec approvals as terminal instead of feeding them back into agent follow-up work, and recognize Chinese stop phrases in abort handling. Fixes #69386. (#85194) Thanks @samzong. diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 989bcf0b0ea6..a28bd26c5701 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -44,8 +44,8 @@ but new code should not add imports from them: `agent-runtime-test-contracts`, ### Reserved bundled plugin helper subpaths -These subpaths are plugin-owned compatibility surfaces reserved for their owning -bundled plugin, not general SDK APIs: `plugin-sdk/codex-mcp-projection` and +These subpaths are plugin-owned compatibility surfaces for their owning bundled +plugin, not general SDK APIs: `plugin-sdk/codex-mcp-projection` and `plugin-sdk/codex-native-task-runtime`. Cross-owner extension imports are blocked by package contract guardrails. @@ -224,8 +224,9 @@ focused channel/runtime subpaths, `config-contracts`, `string-coerce-runtime`, | `plugin-sdk/runtime` | Broad runtime/logging/backup/plugin-install helpers | | `plugin-sdk/runtime-env` | Narrow runtime env, logger, timeout, retry, and backoff helpers | | `plugin-sdk/browser-config` | Supported browser config facade for normalized profile/defaults, CDP URL parsing, and browser-control auth helpers | + | `plugin-sdk/agent-harness-task-runtime` | Generic task lifecycle and completion delivery helpers for harness-backed agents using a host-issued task scope | | `plugin-sdk/codex-mcp-projection` | Reserved bundled Codex helper for projecting user MCP server config into Codex thread config; not for third-party plugins | - | `plugin-sdk/codex-native-task-runtime` | Reserved bundled Codex helper for native task mirror/runtime wiring; not for third-party plugins | + | `plugin-sdk/codex-native-task-runtime` | Private bundled Codex helper for native task mirror/runtime wiring; not for third-party plugins | | `plugin-sdk/channel-runtime-context` | Generic channel runtime-context registration and lookup helpers | | `plugin-sdk/matrix` | Deprecated Matrix compatibility facade for older third-party channel packages; new plugins should import `plugin-sdk/run-command` directly | | `plugin-sdk/mattermost` | Deprecated Mattermost compatibility facade for older third-party channel packages; new plugins should import generic SDK subpaths directly | diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index baa9045e40fc..f786534a2258 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -20,7 +20,6 @@ import { type CodexAppServerEventProjectorOptions, type CodexAppServerToolTelemetry, } from "./event-projector.js"; -import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js"; import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js"; import { createCodexTestModel } from "./test-support.js"; @@ -996,36 +995,6 @@ describe("CodexAppServerEventProjector", () => { expect(result.assistantTexts).toStrictEqual([]); }); - it("mirrors native subagent notifications before current-turn filtering", async () => { - const projector = await createProjector({ - ...(await createParams()), - sessionKey: "agent:main:main", - } as EmbeddedRunAttemptParams); - const mirrorSpy = vi.spyOn(CodexNativeSubagentTaskMirror.prototype, "handleNotification"); - const notification = { - method: "item/completed", - params: { - threadId: THREAD_ID, - turnId: "child-turn", - item: { - type: "collabAgentToolCall", - tool: "spawnAgent", - senderThreadId: THREAD_ID, - receiverThreadIds: ["child-thread"], - agentsStates: { - "child-thread": { status: "completed", message: "done" }, - }, - }, - }, - } as ProjectorNotification; - - await projector.handleNotification(notification); - - expect(mirrorSpy).toHaveBeenCalledWith(notification); - const result = projector.buildResult(buildEmptyToolTelemetry()); - expect(result.assistantTexts).toEqual([]); - }); - it("ignores notifications that omit top-level thread and turn ids", async () => { const projector = await createProjector(); diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index 20181da8ee20..f089d6ecfaeb 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -22,7 +22,6 @@ import { } from "openclaw/plugin-sdk/agent-harness-runtime"; import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime"; import { resolveCodexLocalRuntimeAttribution } from "./local-runtime-attribution.js"; -import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js"; import { readCodexNotificationThreadId, readCodexNotificationTurnId, @@ -169,34 +168,19 @@ export class CodexAppServerEventProjector { private guardianReviewCount = 0; private completedCompactionCount = 0; private latestRateLimits: JsonValue | undefined; - private readonly nativeSubagentTaskMirror: CodexNativeSubagentTaskMirror; constructor( private readonly params: EmbeddedRunAttemptParams, private readonly threadId: string, private readonly turnId: string, private readonly options: CodexAppServerEventProjectorOptions = {}, - ) { - this.nativeSubagentTaskMirror = new CodexNativeSubagentTaskMirror({ - parentThreadId: threadId, - requesterSessionKey: params.sessionKey, - agentId: params.agentId, - }); - } + ) {} async handleNotification(notification: CodexServerNotification): Promise { const params = isJsonObject(notification.params) ? notification.params : undefined; if (!params) { return; } - try { - this.nativeSubagentTaskMirror.handleNotification(notification); - } catch (error) { - embeddedAgentLog.warn("Failed to mirror Codex native subagent lifecycle event", { - method: notification.method, - error: formatErrorMessage(error), - }); - } if (notification.method === "account/rateLimits/updated") { this.latestRateLimits = params; rememberCodexRateLimits(params); diff --git a/extensions/codex/src/app-server/native-subagent-monitor.test.ts b/extensions/codex/src/app-server/native-subagent-monitor.test.ts new file mode 100644 index 000000000000..42a61b170889 --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-monitor.test.ts @@ -0,0 +1,1125 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { + AgentHarnessTaskRecord, + AgentHarnessTaskRuntimeScope, +} from "openclaw/plugin-sdk/agent-harness-task-runtime"; +import { describe, expect, it, vi } from "vitest"; +import { + CodexNativeSubagentMonitor, + registerCodexNativeSubagentMonitor, +} from "./native-subagent-monitor.js"; +import type { CodexServerNotification } from "./protocol.js"; + +function createClient() { + const handlers = new Set<(notification: CodexServerNotification) => Promise | void>(); + const closeHandlers = new Set<() => void>(); + return { + addNotificationHandler( + handler: (notification: CodexServerNotification) => Promise | void, + ) { + handlers.add(handler); + return () => handlers.delete(handler); + }, + addCloseHandler(handler: (client: never) => void) { + const closeHandler = () => handler(undefined as never); + closeHandlers.add(closeHandler); + return () => { + closeHandlers.delete(closeHandler); + }; + }, + async notify(notification: CodexServerNotification) { + await Promise.all([...handlers].map(async (handler) => await handler(notification))); + }, + close() { + for (const handler of closeHandlers) { + handler(); + } + }, + }; +} + +function createRuntime() { + type DeliveryResult = { + delivered: boolean; + path: "direct" | "steered" | "none"; + error?: string; + phases?: Array<{ + phase: "direct-primary" | "steer-primary" | "steer-fallback"; + delivered: boolean; + path: "direct" | "steered" | "none"; + error?: string; + }>; + }; + const taskRuntime = { + createRunningTaskRun: vi.fn(), + recordTaskRunProgressByRunId: vi.fn(() => []), + finalizeTaskRunByRunId: vi.fn(() => []), + listTaskRecords: vi.fn((): AgentHarnessTaskRecord[] => []), + setDetachedTaskDeliveryStatusByRunId: vi.fn(() => []), + }; + return { + ...taskRuntime, + createAgentHarnessTaskRuntime: vi.fn(() => taskRuntime), + deliverAgentHarnessTaskCompletion: vi.fn( + async (): Promise => ({ + delivered: true, + path: "direct" as const, + }), + ), + }; +} + +function createTaskScope(requesterSessionKey = "agent:main:discord:channel:C123") { + return { requesterSessionKey } as AgentHarnessTaskRuntimeScope; +} + +async function notifyChildStarted( + client: ReturnType, + parentThreadId = "parent-thread", + childThreadId = "child-thread", + agentPath = childThreadId, +): Promise { + await client.notify({ + method: "thread/started", + params: { + thread: { + id: childThreadId, + source: { + subAgent: { + thread_spawn: { + parent_thread_id: parentThreadId, + depth: 1, + agent_path: agentPath, + }, + }, + }, + }, + }, + }); +} + +function nativeCompletionNotification(params: { + agentPath: string; + statusLabel: string; + result: string | null; + parentThreadId?: string; +}): CodexServerNotification { + const statusValue = params.result === null ? "null" : JSON.stringify(params.result); + const content = + `{"agent_path":${JSON.stringify(params.agentPath)},"status":{` + + `${JSON.stringify(params.statusLabel)}:${statusValue}}}`; + return { + method: "rawResponseItem/completed", + params: { + threadId: params.parentThreadId ?? "parent-thread", + item: { + type: "message", + role: "assistant", + phase: "commentary", + content: [ + { + type: "output_text", + text: JSON.stringify({ + author: params.agentPath, + recipient: "/root", + other_recipients: [], + content, + trigger_turn: false, + }), + }, + ], + }, + }, + }; +} + +describe("CodexNativeSubagentMonitor", () => { + it("keeps native subagent task mirroring alive on the shared client", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + taskRuntimeScope: createTaskScope("agent:main:main"), + agentId: "main", + }); + + await client.notify({ + method: "thread/started", + params: { + thread: { + id: "child-thread", + preview: "inspect the repo", + source: { + subAgent: { + thread_spawn: { + parent_thread_id: "parent-thread", + depth: 1, + agent_nickname: "Engineer", + }, + }, + }, + }, + }, + }); + await client.notify({ + method: "thread/status/changed", + params: { + threadId: "child-thread", + status: { type: "idle" }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + label: "Engineer", + task: "inspect the repo", + }), + ); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + }), + ); + }); + + it("delivers parent wakeups from Codex-native subagent completion notifications", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + const completion = nativeCompletionNotification({ + agentPath: "child-thread", + statusLabel: "completed", + result: "child final result", + }); + + await notifyChildStarted(client); + await client.notify(completion); + await client.notify(completion); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + terminalSummary: "child final result", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledTimes(1); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + scope: expect.any(Object), + childSessionKey: "codex-thread:child-thread", + childSessionId: "child-thread", + announceId: "codex-native:parent-thread:child-thread:succeeded", + status: "succeeded", + statusLabel: "completed", + result: "child final result", + }), + ); + expect(runtime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + deliveryStatus: "pending", + }), + ); + expect(runtime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + deliveryStatus: "delivered", + }), + ); + }); + + it("delivers failed parent wakeups from Codex errored subagent notifications", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await notifyChildStarted(client); + await client.notify( + nativeCompletionNotification({ + agentPath: "child-thread", + statusLabel: "errored", + result: "child failed", + }), + ); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "failed", + terminalSummary: "child failed", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionKey: "codex-thread:child-thread", + childSessionId: "child-thread", + announceId: "codex-native:parent-thread:child-thread:failed", + status: "failed", + statusLabel: "errored", + result: "child failed", + }), + ); + }); + + it("maps Codex agent_path completion notifications to child thread ids", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await notifyChildStarted(client, "parent-thread", "child-thread-id", "reviewer"); + await client.notify( + nativeCompletionNotification({ + agentPath: "reviewer", + statusLabel: "completed", + result: "review done", + }), + ); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread-id", + status: "succeeded", + terminalSummary: "review done", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionKey: "codex-thread:child-thread-id", + childSessionId: "child-thread-id", + announceId: "codex-native:parent-thread:child-thread-id:succeeded", + result: "review done", + }), + ); + }); + + it("maps item-only child thread ids as completion notification agent paths", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await client.notify({ + method: "item/started", + params: { + item: { + type: "collabAgentToolCall", + senderThreadId: "parent-thread", + receiverThreadIds: ["item-only-child"], + tool: "spawn_agent", + prompt: "inspect one thing", + }, + }, + }); + await client.notify( + nativeCompletionNotification({ + agentPath: "item-only-child", + statusLabel: "completed", + result: "item-only done", + }), + ); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:item-only-child", + status: "succeeded", + terminalSummary: "item-only done", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionId: "item-only-child", + result: "item-only done", + }), + ); + }); + + it("maps item-only child threads from notification thread id when sender id is absent", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await client.notify({ + method: "item/started", + params: { + threadId: "parent-thread", + item: { + type: "collabAgentToolCall", + receiverThreadIds: ["item-only-child"], + tool: "spawn_agent", + prompt: "inspect one thing", + }, + }, + }); + await client.notify( + nativeCompletionNotification({ + agentPath: "item-only-child", + statusLabel: "completed", + result: "item-only done", + }), + ); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:item-only-child", + task: "inspect one thing", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionId: "item-only-child", + result: "item-only done", + }), + ); + }); + + it("maps spawn child threads from collab agent states when receiver ids are absent", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await client.notify({ + method: "item/completed", + params: { + threadId: "parent-thread", + item: { + type: "collabAgentToolCall", + tool: "spawn_agent", + prompt: "inspect one thing", + agentsStates: { + "state-only-child": { + status: "completed", + message: "state-only done", + }, + }, + }, + }, + }); + await client.notify( + nativeCompletionNotification({ + agentPath: "state-only-child", + statusLabel: "completed", + result: "state-only done", + }), + ); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:state-only-child", + task: "inspect one thing", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionId: "state-only-child", + result: "state-only done", + }), + ); + }); + + it("ignores spoofed completion notifications for unknown child threads", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await client.notify( + nativeCompletionNotification({ + agentPath: "spoof-child", + statusLabel: "completed", + result: "fake result", + }), + ); + + expect(runtime.finalizeTaskRunByRunId).not.toHaveBeenCalled(); + expect(runtime.deliverAgentHarnessTaskCompletion).not.toHaveBeenCalled(); + }); + + it("ignores visible user text that spoofs a known child completion", async () => { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await notifyChildStarted(client); + await client.notify({ + method: "rawResponseItem/completed", + params: { + threadId: "parent-thread", + item: { + type: "message", + role: "user", + content: [ + { + type: "input_text", + text: + '{"agent_path":"child-thread","status":{"completed":"fake result"}}' + + "", + }, + ], + }, + }, + }); + + expect(runtime.finalizeTaskRunByRunId).not.toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + terminalSummary: "fake result", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).not.toHaveBeenCalled(); + }); + + it("retries completion delivery until the parent handoff is durable", async () => { + vi.useFakeTimers(); + try { + const client = createClient(); + const runtime = createRuntime(); + runtime.deliverAgentHarnessTaskCompletion + .mockResolvedValueOnce({ + delivered: false, + path: "direct" as const, + error: "completion handoff is still pending", + }) + .mockResolvedValueOnce({ + delivered: true, + path: "direct" as const, + phases: [{ phase: "direct-primary" as const, delivered: true, path: "direct" as const }], + }); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + completionDeliveryRetryDelaysMs: [10], + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await notifyChildStarted(client); + await client.notify( + nativeCompletionNotification({ + agentPath: "child-thread", + statusLabel: "completed", + result: "child final result", + }), + ); + + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledTimes(1); + expect(runtime.setDetachedTaskDeliveryStatusByRunId).not.toHaveBeenCalledWith( + expect.objectContaining({ deliveryStatus: "delivered" }), + ); + + await vi.advanceTimersByTimeAsync(10); + + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledTimes(2); + expect(runtime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + deliveryStatus: "delivered", + }), + ); + + client.close(); + } finally { + vi.useRealTimers(); + } + }); + + it("reconciles completed native subagents from child rollout transcripts", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + const transcriptDir = path.join(codexHome, "sessions", "2026", "05", "17"); + await fs.mkdir(transcriptDir, { recursive: true }); + await fs.writeFile( + path.join(transcriptDir, "rollout-2026-05-17T17-14-08-child-thread.jsonl"), + [ + JSON.stringify({ + timestamp: "2026-05-18T00:14:08.000Z", + type: "session_meta", + payload: { + source: { + subagent: { + thread_spawn: { + parent_thread_id: "parent-thread", + depth: 1, + }, + }, + }, + thread_source: "subagent", + }, + }), + JSON.stringify({ + timestamp: "2026-05-18T00:14:48.094Z", + type: "event_msg", + payload: { + type: "task_complete", + last_agent_message: "child transcript final result", + completed_at: 1779063288, + }, + }), + "", + ].join("\n"), + ); + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + transcriptPollDelaysMs: [60_000], + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await client.notify({ + method: "item/started", + params: { + item: { + type: "collabAgentToolCall", + senderThreadId: "parent-thread", + receiverThreadIds: ["child-thread"], + tool: "spawn_agent", + prompt: "check the weather", + }, + }, + }); + + await expect(monitor.reconcileChildTranscript("child-thread")).resolves.toBe(true); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + endedAt: 1779063288000, + terminalSummary: "child transcript final result", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + scope: expect.any(Object), + childSessionKey: "codex-thread:child-thread", + childSessionId: "child-thread", + status: "succeeded", + statusLabel: "task_complete", + result: "child transcript final result", + }), + ); + + client.close(); + }); + + it("keeps polling after a transcript candidate belongs to a different parent", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + const transcriptDir = path.join(codexHome, "sessions", "2026", "05", "17"); + await fs.mkdir(transcriptDir, { recursive: true }); + const transcriptPath = path.join( + transcriptDir, + "rollout-2026-05-17T17-14-08-child-thread.jsonl", + ); + const writeTranscript = async (parentThreadId: string, message: string) => { + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ + type: "session_meta", + payload: { + source: { + subagent: { thread_spawn: { parent_thread_id: parentThreadId } }, + }, + }, + }), + JSON.stringify({ + timestamp: "2026-05-18T00:14:48.094Z", + type: "event_msg", + payload: { + type: "task_complete", + last_agent_message: message, + completed_at: 1779063288, + }, + }), + "", + ].join("\n"), + ); + }; + await writeTranscript("other-parent-thread", "wrong parent result"); + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + transcriptPollDelaysMs: [60_000], + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + await notifyChildStarted(client); + + await expect(monitor.reconcileChildTranscript("child-thread")).resolves.toBe(false); + expect(runtime.finalizeTaskRunByRunId).not.toHaveBeenCalledWith( + expect.objectContaining({ + terminalSummary: "wrong parent result", + }), + ); + + await writeTranscript("parent-thread", "right parent result"); + await expect(monitor.reconcileChildTranscript("child-thread")).resolves.toBe(true); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + terminalSummary: "right parent result", + }), + ); + + client.close(); + }); + + it("reconciles existing running native subagent task rows when a parent registers", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + const transcriptDir = path.join(codexHome, "sessions", "2026", "05", "17"); + await fs.mkdir(transcriptDir, { recursive: true }); + await fs.writeFile( + path.join(transcriptDir, "rollout-2026-05-17T17-14-08-stale-child.jsonl"), + [ + JSON.stringify({ + type: "session_meta", + payload: { + source: { + subagent: { thread_spawn: { parent_thread_id: "parent-thread" } }, + }, + }, + }), + JSON.stringify({ + timestamp: "2026-05-18T00:14:48.094Z", + type: "event_msg", + payload: { + type: "task_complete", + last_agent_message: "stale child final result", + completed_at: 1779063288, + }, + }), + "", + ].join("\n"), + ); + const client = createClient(); + const runtime = createRuntime(); + runtime.listTaskRecords.mockReturnValue([ + { + taskId: "task-1", + runtime: "subagent", + taskKind: "codex-native", + requesterSessionKey: "agent:main:discord:channel:C123", + ownerKey: "agent:main:discord:channel:C123", + scopeKind: "session", + runId: "codex-thread:stale-child", + task: "check the weather", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + }, + ]); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + transcriptPollDelaysMs: [60_000], + }); + + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + await vi.waitFor(() => { + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + childSessionId: "stale-child", + result: "stale child final result", + }), + ); + }); + + client.close(); + }); + + it("does not rescan transcript directories while a child poll is already scheduled", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + await fs.mkdir(path.join(codexHome, "sessions"), { recursive: true }); + const client = createClient(); + const runtime = createRuntime(); + const readdirSpy = vi.spyOn(fs, "readdir"); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + taskRowReconcileIntervalMs: 0, + transcriptPollDelaysMs: [60_000], + }); + + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + await notifyChildStarted(client, "parent-thread", "pending-child"); + runtime.listTaskRecords.mockReturnValue([ + { + taskId: "task-1", + runtime: "subagent", + taskKind: "codex-native", + requesterSessionKey: "agent:main:discord:channel:C123", + ownerKey: "agent:main:discord:channel:C123", + scopeKind: "session", + runId: "codex-thread:pending-child", + task: "check the weather", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + }, + ]); + readdirSpy.mockClear(); + await monitor.reconcileKnownTaskRows(); + + expect(readdirSpy).not.toHaveBeenCalled(); + client.close(); + }); + + it("uses one transcript tree scan for multiple pending task rows", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + await fs.mkdir(path.join(codexHome, "sessions"), { recursive: true }); + const client = createClient(); + const runtime = createRuntime(); + runtime.listTaskRecords.mockReturnValue( + ["pending-child-a", "pending-child-b", "pending-child-c"].map((childThreadId, index) => ({ + taskId: `task-${index}`, + runtime: "subagent", + taskKind: "codex-native", + requesterSessionKey: "agent:main:discord:channel:C123", + ownerKey: "agent:main:discord:channel:C123", + scopeKind: "session", + runId: `codex-thread:${childThreadId}`, + task: "check the weather", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + })), + ); + const readdirSpy = vi.spyOn(fs, "readdir"); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + taskRowReconcileIntervalMs: 0, + transcriptPollDelaysMs: [60_000], + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + readdirSpy.mockClear(); + await monitor.reconcileKnownTaskRows(); + + expect(readdirSpy).toHaveBeenCalledTimes(1); + expect(runtime.deliverAgentHarnessTaskCompletion).not.toHaveBeenCalled(); + client.close(); + }); + + it("reconciles completed native subagent transcripts from task rows without live child registration", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + const transcriptDir = path.join(codexHome, "sessions", "2026", "05", "17"); + await fs.mkdir(transcriptDir, { recursive: true }); + await fs.writeFile( + path.join(transcriptDir, "rollout-2026-05-17T19-35-43-unregistered-child.jsonl"), + [ + JSON.stringify({ + timestamp: "2026-05-18T02:35:44.420Z", + type: "session_meta", + payload: { + source: { + subagent: { + thread_spawn: { + parent_thread_id: "parent-thread", + depth: 1, + }, + }, + }, + }, + }), + JSON.stringify({ + timestamp: "2026-05-18T02:36:05.301Z", + type: "event_msg", + payload: { + type: "task_complete", + last_agent_message: "unregistered child final result", + completed_at: 1779071765, + }, + }), + "", + ].join("\n"), + ); + const client = createClient(); + const runtime = createRuntime(); + runtime.listTaskRecords.mockReturnValue([ + { + taskId: "task-1", + runtime: "subagent", + taskKind: "codex-native", + requesterSessionKey: "agent:main:discord:channel:C123", + ownerKey: "agent:main:discord:channel:C123", + scopeKind: "session", + runId: "codex-thread:unregistered-child", + task: "check the weather", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + }, + ]); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + taskRowReconcileIntervalMs: 0, + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await monitor.reconcileKnownTaskRows(); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:unregistered-child", + status: "succeeded", + terminalSummary: "unregistered child final result", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + scope: expect.any(Object), + childSessionKey: "codex-thread:unregistered-child", + childSessionId: "unregistered-child", + result: "unregistered child final result", + }), + ); + + client.close(); + }); + + it("reconciles recent terminal native subagent rows that still need parent delivery", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-subagent-")); + const codexHome = path.join(tempDir, "codex-home"); + const transcriptDir = path.join(codexHome, "sessions", "2026", "05", "17"); + await fs.mkdir(transcriptDir, { recursive: true }); + await fs.writeFile( + path.join(transcriptDir, "rollout-2026-05-17T19-50-35-mirror-finalized-child.jsonl"), + [ + JSON.stringify({ + timestamp: "2026-05-18T02:50:36.018Z", + type: "session_meta", + payload: { + source: { + subagent: { + thread_spawn: { + parent_thread_id: "parent-thread", + }, + }, + }, + }, + }), + JSON.stringify({ + timestamp: "2026-05-18T02:57:07.752Z", + type: "event_msg", + payload: { + type: "task_complete", + last_agent_message: "mirror finalized child final result", + completed_at: 1779073027, + }, + }), + "", + ].join("\n"), + ); + const client = createClient(); + const runtime = createRuntime(); + const now = Date.now(); + runtime.listTaskRecords.mockReturnValue([ + { + taskId: "task-1", + runtime: "subagent", + taskKind: "codex-native", + requesterSessionKey: "agent:main:discord:channel:C123", + ownerKey: "agent:main:discord:channel:C123", + scopeKind: "session", + runId: "codex-thread:mirror-finalized-child", + task: "check the weather", + status: "succeeded", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: now, + endedAt: now, + lastEventAt: now, + }, + ]); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome, + taskRowReconcileIntervalMs: 0, + }); + monitor.registerParent({ + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:discord:channel:C123", + taskRuntimeScope: createTaskScope(), + agentId: "main", + }); + + await monitor.reconcileKnownTaskRows(); + + expect(runtime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:mirror-finalized-child", + deliveryStatus: "pending", + }), + ); + expect(runtime.deliverAgentHarnessTaskCompletion).toHaveBeenCalledWith( + expect.objectContaining({ + scope: expect.any(Object), + childSessionKey: "codex-thread:mirror-finalized-child", + childSessionId: "mirror-finalized-child", + result: "mirror finalized child final result", + }), + ); + expect(runtime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:mirror-finalized-child", + deliveryStatus: "delivered", + }), + ); + + client.close(); + }); + + it("registers one monitor per shared app-server client", async () => { + const client = createClient(); + const runtime = createRuntime(); + registerCodexNativeSubagentMonitor({ + client: client as never, + parentThreadId: "parent-1", + requesterSessionKey: "agent:main:main", + taskRuntimeScope: createTaskScope("agent:main:main"), + runtime, + }); + registerCodexNativeSubagentMonitor({ + client: client as never, + parentThreadId: "parent-2", + requesterSessionKey: "agent:main:main", + taskRuntimeScope: createTaskScope("agent:main:main"), + runtime, + }); + + await client.notify({ + method: "thread/started", + params: { + thread: { + id: "child-2", + source: { + subAgent: { + thread_spawn: { + parent_thread_id: "parent-2", + depth: 1, + }, + }, + }, + }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledTimes(1); + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ runId: "codex-thread:child-2" }), + ); + }); + + it("clears reconcile timers when the app-server client closes", async () => { + vi.useFakeTimers(); + try { + const client = createClient(); + const runtime = createRuntime(); + const monitor = new CodexNativeSubagentMonitor(client, runtime, { + codexHome: "/tmp/codex-home", + taskRowReconcileIntervalMs: 10, + }); + + client.close(); + await vi.advanceTimersByTimeAsync(30); + + expect(runtime.listTaskRecords).not.toHaveBeenCalled(); + monitor.dispose(); + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/extensions/codex/src/app-server/native-subagent-monitor.ts b/extensions/codex/src/app-server/native-subagent-monitor.ts new file mode 100644 index 000000000000..e6c70bc8ae4b --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-monitor.ts @@ -0,0 +1,1061 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { embeddedAgentLog, formatErrorMessage } from "openclaw/plugin-sdk/agent-harness-runtime"; +import { + createAgentHarnessTaskRuntime, + deliverAgentHarnessTaskCompletion, + isDurableAgentHarnessCompletionDelivery, + type AgentHarnessTaskRuntimeScope, + type AgentHarnessTaskRuntime, + type AgentHarnessTaskRecord, +} from "openclaw/plugin-sdk/agent-harness-task-runtime"; +import type { CodexAppServerClient } from "./client.js"; +import { + extractCodexNativeSubagentCompletions, + type CodexNativeSubagentCompletion, + type CodexNativeSubagentNotificationCompletion, +} from "./native-subagent-notification.js"; +import { + CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX, + CODEX_NATIVE_SUBAGENT_RUNTIME, + CODEX_NATIVE_SUBAGENT_TASK_KIND, +} from "./native-subagent-task-ids.js"; +import { + codexNativeSubagentRunId, + CodexNativeSubagentTaskMirror, +} from "./native-subagent-task-mirror.js"; +import type { CodexServerNotification, JsonObject, JsonValue } from "./protocol.js"; +import { isJsonObject } from "./protocol.js"; + +type NativeSubagentMonitorRuntime = { + createAgentHarnessTaskRuntime: typeof createAgentHarnessTaskRuntime; + deliverAgentHarnessTaskCompletion: typeof deliverAgentHarnessTaskCompletion; +}; + +type ParentState = { + parentThreadId: string; + requesterSessionKey?: string; + taskRuntimeScope?: AgentHarnessTaskRuntimeScope; + agentId?: string; + taskRuntime?: AgentHarnessTaskRuntime; + mirror?: CodexNativeSubagentTaskMirror; + deliveredCompletionKeys: Set; +}; + +type ChildState = { + childThreadId: string; + parentThreadId: string; + transcriptPath?: string; + transcriptPollAttempt: number; + transcriptPollTimer?: ReturnType; + transcriptTerminal: boolean; + pendingCompletion?: CodexNativeSubagentCompletion; + pendingCompletionEventAt?: number; + completionDeliveryAttempt: number; + completionDeliveryTimer?: ReturnType; + deliveringCompletionKey?: string; +}; + +type TranscriptCompletion = CodexNativeSubagentCompletion & { + parentThreadId?: string; + completedAt?: number; +}; + +type MonitorOptions = { + codexHome?: string; + transcriptPollDelaysMs?: readonly number[]; + completionDeliveryRetryDelaysMs?: readonly number[]; + taskRowReconcileIntervalMs?: number; +}; + +const DEFAULT_TRANSCRIPT_POLL_DELAYS_MS = [ + 2_000, 5_000, 10_000, 15_000, 30_000, 60_000, 120_000, 300_000, +]; +const DEFAULT_COMPLETION_DELIVERY_RETRY_DELAYS_MS = [ + 5_000, 15_000, 30_000, 60_000, 120_000, 300_000, +]; +const DEFAULT_TASK_ROW_RECONCILE_INTERVAL_MS = 10_000; +const RECENT_TERMINAL_TASK_RECONCILE_GRACE_MS = 60_000; + +const defaultRuntime: NativeSubagentMonitorRuntime = { + createAgentHarnessTaskRuntime, + deliverAgentHarnessTaskCompletion, +}; + +const monitors = new WeakMap(); + +export function registerCodexNativeSubagentMonitor(params: { + client: CodexAppServerClient; + parentThreadId: string; + requesterSessionKey?: string; + taskRuntimeScope?: AgentHarnessTaskRuntimeScope; + agentId?: string; + codexHome?: string; + runtime?: NativeSubagentMonitorRuntime; +}): void { + let monitor = monitors.get(params.client); + if (!monitor) { + monitor = new CodexNativeSubagentMonitor(params.client, params.runtime ?? defaultRuntime, { + codexHome: params.codexHome, + }); + monitors.set(params.client, monitor); + } else { + monitor.configure({ codexHome: params.codexHome }); + } + monitor.registerParent({ + parentThreadId: params.parentThreadId, + requesterSessionKey: params.requesterSessionKey, + taskRuntimeScope: params.taskRuntimeScope, + agentId: params.agentId, + }); +} + +export class CodexNativeSubagentMonitor { + private readonly startedAt = Date.now(); + private readonly parentStates = new Map(); + private readonly childThreadParents = new Map(); + private readonly childStates = new Map(); + private readonly childThreadIdsByAgentPath = new Map(); + private readonly transcriptPathsByChildThreadId = new Map(); + private codexHome?: string; + private transcriptPollDelaysMs: readonly number[]; + private completionDeliveryRetryDelaysMs: readonly number[]; + private taskRowReconcileTimer?: ReturnType; + + constructor( + client: Pick, + private readonly runtime: NativeSubagentMonitorRuntime = defaultRuntime, + options: MonitorOptions = {}, + ) { + this.codexHome = normalizeOptionalString(options.codexHome); + this.transcriptPollDelaysMs = + options.transcriptPollDelaysMs ?? DEFAULT_TRANSCRIPT_POLL_DELAYS_MS; + this.completionDeliveryRetryDelaysMs = + options.completionDeliveryRetryDelaysMs ?? DEFAULT_COMPLETION_DELIVERY_RETRY_DELAYS_MS; + this.startTaskRowReconciler( + options.taskRowReconcileIntervalMs ?? DEFAULT_TASK_ROW_RECONCILE_INTERVAL_MS, + ); + client.addNotificationHandler((notification) => this.handleNotification(notification)); + client.addCloseHandler?.(() => this.dispose()); + } + + dispose(): void { + this.clearTimers(); + this.parentStates.clear(); + this.childThreadParents.clear(); + this.childStates.clear(); + this.childThreadIdsByAgentPath.clear(); + this.transcriptPathsByChildThreadId.clear(); + } + + configure(options: MonitorOptions): void { + const codexHome = normalizeOptionalString(options.codexHome); + if (codexHome) { + this.codexHome = codexHome; + } + } + + registerParent(params: { + parentThreadId: string; + requesterSessionKey?: string; + taskRuntimeScope?: AgentHarnessTaskRuntimeScope; + agentId?: string; + }): void { + const parentThreadId = params.parentThreadId.trim(); + if (!parentThreadId) { + return; + } + const existing = this.parentStates.get(parentThreadId); + if (existing) { + existing.requesterSessionKey = params.requesterSessionKey ?? existing.requesterSessionKey; + existing.taskRuntimeScope = params.taskRuntimeScope ?? existing.taskRuntimeScope; + existing.agentId = params.agentId ?? existing.agentId; + this.ensureParentTaskRuntime(existing); + } else { + const state: ParentState = { + parentThreadId, + requesterSessionKey: params.requesterSessionKey, + taskRuntimeScope: params.taskRuntimeScope, + agentId: params.agentId, + deliveredCompletionKeys: new Set(), + }; + this.ensureParentTaskRuntime(state); + this.parentStates.set(parentThreadId, { + ...state, + }); + } + const state = this.parentStates.get(parentThreadId); + if (state) { + void this.reconcileExistingRunningTasksForParent(state); + } + } + + async handleNotification(notification: CodexServerNotification): Promise { + const state = this.resolveMirrorState(notification); + if (state?.mirror) { + try { + state.mirror.handleNotification(notification); + } catch (error) { + embeddedAgentLog.warn("Failed to mirror Codex native subagent lifecycle event", { + method: notification.method, + error: formatErrorMessage(error), + }); + } + } + await this.handleCompletionNotification(notification); + } + + private ensureParentTaskRuntime(state: ParentState): void { + if (state.taskRuntime || !state.requesterSessionKey || !state.taskRuntimeScope) { + return; + } + state.taskRuntime = this.runtime.createAgentHarnessTaskRuntime({ + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND, + scope: state.taskRuntimeScope, + runIdPrefix: CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX, + }); + state.mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: state.parentThreadId, + requesterSessionKey: state.requesterSessionKey, + agentId: state.agentId, + }, + state.taskRuntime, + ); + } + + private resolveMirrorState(notification: CodexServerNotification): ParentState | undefined { + const params = isJsonObject(notification.params) ? notification.params : undefined; + if (!params) { + return undefined; + } + if (notification.method === "thread/started") { + const thread = isJsonObject(params.thread) ? params.thread : undefined; + const parentThreadId = readSpawnParentThreadId(thread); + const childThreadId = thread ? readString(thread, "id")?.trim() : undefined; + const agentPath = readSpawnAgentPath(thread); + const state = parentThreadId ? this.parentStates.get(parentThreadId) : undefined; + if (state && childThreadId && parentThreadId) { + this.registerChildThread(parentThreadId, childThreadId, { agentPath }); + } + return state; + } + if (notification.method === "thread/status/changed") { + const childThreadId = readString(params, "threadId")?.trim(); + const parentThreadId = childThreadId ? this.childThreadParents.get(childThreadId) : undefined; + return parentThreadId ? this.parentStates.get(parentThreadId) : undefined; + } + if (notification.method === "item/started" || notification.method === "item/completed") { + const item = isJsonObject(params.item) ? params.item : undefined; + const parentThreadId = item + ? (readString(item, "senderThreadId") ?? readString(params, "threadId"))?.trim() + : undefined; + const state = parentThreadId ? this.parentStates.get(parentThreadId) : undefined; + if (state && parentThreadId) { + const isSpawnAgentTool = normalizeToolName(readString(item, "tool")) === "spawnagent"; + const childThreadIds = isSpawnAgentTool + ? new Set([ + ...readStringArray(item?.receiverThreadIds), + ...readObjectStringKeys(item?.agentsStates), + ]) + : new Set(readStringArray(item?.receiverThreadIds)); + for (const childThreadId of childThreadIds) { + this.registerChildThread(parentThreadId, childThreadId); + } + } + return state; + } + return undefined; + } + + private async handleCompletionNotification(notification: CodexServerNotification): Promise { + const params = isJsonObject(notification.params) ? notification.params : undefined; + const parentThreadId = params ? readString(params, "threadId")?.trim() : undefined; + const state = parentThreadId ? this.parentStates.get(parentThreadId) : undefined; + if (!state) { + return; + } + const completions = extractCodexNativeSubagentCompletions(notification); + for (const nativeCompletion of completions) { + const childThreadId = this.resolveChildThreadIdForAgentPath( + state.parentThreadId, + nativeCompletion.agentPath, + ); + const childState = childThreadId ? this.childStates.get(childThreadId) : undefined; + if (!childState || childState.parentThreadId !== state.parentThreadId) { + embeddedAgentLog.warn( + "Ignoring Codex native subagent completion for unknown child thread", + { + parentThreadId: state.parentThreadId, + agentPath: nativeCompletion.agentPath, + }, + ); + continue; + } + const completion = toThreadCompletion(nativeCompletion, childState.childThreadId); + await this.processCompletion(state, completion); + } + } + + async reconcileChildTranscript( + childThreadId: string, + options: { allowTreeScan?: boolean } = {}, + ): Promise { + const childState = this.childStates.get(childThreadId.trim()); + const state = childState ? this.parentStates.get(childState.parentThreadId) : undefined; + if (!childState || !state || childState.transcriptTerminal) { + return false; + } + const codexHome = this.codexHome; + if (!codexHome) { + return false; + } + const completion = await this.findTranscriptCompletionForChild(childState, options); + if (!completion) { + return false; + } + const transcriptParentThreadId = completion.completion.parentThreadId; + if (transcriptParentThreadId && transcriptParentThreadId !== state.parentThreadId) { + embeddedAgentLog.warn("Codex native subagent transcript parent did not match monitor state", { + childThreadId: childState.childThreadId, + expectedParentThreadId: state.parentThreadId, + transcriptParentThreadId, + }); + childState.transcriptPath = undefined; + this.transcriptPathsByChildThreadId.delete(childState.childThreadId); + return false; + } + await this.processCompletion(state, completion.completion, completion.completion.completedAt); + return true; + } + + private async processCompletion( + state: ParentState, + completion: CodexNativeSubagentCompletion, + eventAt: number = Date.now(), + ): Promise { + this.finalizeCompletionTask(completion, eventAt); + const childState = this.childStates.get(completion.childThreadId); + if (childState) { + childState.transcriptTerminal = true; + if (childState.transcriptPollTimer) { + clearTimeout(childState.transcriptPollTimer); + childState.transcriptPollTimer = undefined; + } + } + if (!state.requesterSessionKey) { + return; + } + const completionKey = buildCompletionDedupeKey(state.parentThreadId, completion); + if (state.deliveredCompletionKeys.has(completionKey)) { + return; + } + const deliveryState = + childState ?? this.ensureChildState(state.parentThreadId, completion.childThreadId); + deliveryState.pendingCompletion = completion; + deliveryState.pendingCompletionEventAt = eventAt; + this.markCompletionDeliveryPending(completion); + await this.deliverPendingCompletion(state, deliveryState); + } + + private async deliverPendingCompletion( + state: ParentState, + childState: ChildState, + ): Promise { + const completion = childState.pendingCompletion; + if (!completion || !state.requesterSessionKey || !state.taskRuntimeScope) { + return; + } + const completionKey = buildCompletionDedupeKey(state.parentThreadId, completion); + if ( + state.deliveredCompletionKeys.has(completionKey) || + childState.deliveringCompletionKey === completionKey + ) { + return; + } + childState.deliveringCompletionKey = completionKey; + try { + const delivery = await this.runtime.deliverAgentHarnessTaskCompletion({ + scope: state.taskRuntimeScope, + childSessionKey: codexNativeSubagentRunId(completion.childThreadId), + childSessionId: completion.childThreadId, + announceId: `codex-native:${state.parentThreadId}:${completion.childThreadId}:${completion.status}`, + announceType: "Codex native subagent", + taskLabel: "Codex native subagent", + status: completion.status, + statusLabel: completion.statusLabel, + result: completion.result, + replyInstruction: + "Use the Codex native subagent result to continue or wrap up the parent task. If this is a Discord/channel session, send the visible response with the message tool instead of only writing a transcript final answer. Reply in your normal assistant voice and do not expose internal notification markup.", + }); + if (isDurableAgentHarnessCompletionDelivery(delivery)) { + state.deliveredCompletionKeys.add(completionKey); + childState.pendingCompletion = undefined; + childState.pendingCompletionEventAt = undefined; + childState.completionDeliveryAttempt = 0; + if (childState.completionDeliveryTimer) { + clearTimeout(childState.completionDeliveryTimer); + childState.completionDeliveryTimer = undefined; + } + this.markCompletionDeliveryDelivered(completion); + return; + } + const error = delivery.error ?? "completion delivery did not produce a parent response"; + this.markCompletionDeliveryPending(completion, error); + this.scheduleCompletionDeliveryRetry(childState); + } catch (error) { + this.markCompletionDeliveryPending(completion, formatErrorMessage(error)); + this.scheduleCompletionDeliveryRetry(childState); + embeddedAgentLog.warn("Failed to deliver Codex native subagent completion", { + parentThreadId: state.parentThreadId, + childThreadId: completion.childThreadId, + error: formatErrorMessage(error), + }); + } finally { + childState.deliveringCompletionKey = undefined; + } + } + + private markCompletionDeliveryPending( + completion: CodexNativeSubagentCompletion, + error?: string, + ): void { + const taskRuntime = this.getTaskRuntimeForChild(completion.childThreadId); + if (!taskRuntime) { + return; + } + taskRuntime.setDetachedTaskDeliveryStatusByRunId({ + runId: codexNativeSubagentRunId(completion.childThreadId), + deliveryStatus: "pending", + ...(error ? { error } : {}), + }); + } + + private markCompletionDeliveryDelivered(completion: CodexNativeSubagentCompletion): void { + const taskRuntime = this.getTaskRuntimeForChild(completion.childThreadId); + if (!taskRuntime) { + return; + } + taskRuntime.setDetachedTaskDeliveryStatusByRunId({ + runId: codexNativeSubagentRunId(completion.childThreadId), + deliveryStatus: "delivered", + }); + } + + private scheduleCompletionDeliveryRetry(childState: ChildState): void { + if (!childState.pendingCompletion || childState.completionDeliveryTimer) { + return; + } + const attempt = childState.completionDeliveryAttempt; + const delayMs = + this.completionDeliveryRetryDelaysMs[ + Math.min(attempt, this.completionDeliveryRetryDelaysMs.length - 1) + ]; + childState.completionDeliveryAttempt += 1; + childState.completionDeliveryTimer = setTimeout(() => { + childState.completionDeliveryTimer = undefined; + const state = this.parentStates.get(childState.parentThreadId); + if (!state) { + return; + } + void this.deliverPendingCompletion(state, childState); + }, delayMs); + unrefTimer(childState.completionDeliveryTimer); + } + + private finalizeCompletionTask(completion: CodexNativeSubagentCompletion, eventAt: number): void { + const taskRuntime = this.getTaskRuntimeForChild(completion.childThreadId); + if (!taskRuntime) { + return; + } + taskRuntime.finalizeTaskRunByRunId({ + runId: codexNativeSubagentRunId(completion.childThreadId), + status: completion.status, + endedAt: eventAt, + lastEventAt: eventAt, + ...(completion.status === "succeeded" ? {} : { error: completion.result }), + progressSummary: completion.result, + terminalSummary: completion.result, + }); + } + + private getTaskRuntimeForChild(childThreadId: string): AgentHarnessTaskRuntime | undefined { + const childState = this.childStates.get(childThreadId.trim()); + const state = childState ? this.parentStates.get(childState.parentThreadId) : undefined; + return state?.taskRuntime; + } + + private registerChildThread( + parentThreadId: string, + childThreadId: string, + options: { agentPath?: string; scheduleTranscriptPoll?: boolean } = {}, + ): void { + const normalizedParentThreadId = parentThreadId.trim(); + const normalizedChildThreadId = childThreadId.trim(); + if (!normalizedParentThreadId || !normalizedChildThreadId) { + return; + } + this.childThreadParents.set(normalizedChildThreadId, normalizedParentThreadId); + this.childThreadIdsByAgentPath.set( + buildParentAgentPathKey(normalizedParentThreadId, normalizedChildThreadId), + normalizedChildThreadId, + ); + const agentPath = normalizeOptionalString(options.agentPath); + if (agentPath) { + this.childThreadIdsByAgentPath.set( + buildParentAgentPathKey(normalizedParentThreadId, agentPath), + normalizedChildThreadId, + ); + } + let childState = this.childStates.get(normalizedChildThreadId); + if (!childState) { + childState = { + childThreadId: normalizedChildThreadId, + parentThreadId: normalizedParentThreadId, + transcriptPollAttempt: 0, + transcriptTerminal: false, + completionDeliveryAttempt: 0, + }; + this.childStates.set(normalizedChildThreadId, childState); + } + if (options.scheduleTranscriptPoll !== false) { + this.scheduleTranscriptPoll(childState); + } + } + + private ensureChildState(parentThreadId: string, childThreadId: string): ChildState { + this.registerChildThread(parentThreadId, childThreadId); + return this.childStates.get(childThreadId.trim())!; + } + + private resolveChildThreadIdForAgentPath( + parentThreadId: string, + agentPath: string, + ): string | undefined { + const mapped = this.childThreadIdsByAgentPath.get( + buildParentAgentPathKey(parentThreadId, agentPath), + ); + if (mapped) { + return mapped; + } + const exactChild = this.childStates.get(agentPath); + return exactChild?.parentThreadId === parentThreadId ? exactChild.childThreadId : undefined; + } + + private scheduleTranscriptPoll(childState: ChildState): void { + if (!this.codexHome || childState.transcriptTerminal || childState.transcriptPollTimer) { + return; + } + const attempt = childState.transcriptPollAttempt; + const delayMs = + this.transcriptPollDelaysMs[Math.min(attempt, this.transcriptPollDelaysMs.length - 1)]; + childState.transcriptPollAttempt += 1; + childState.transcriptPollTimer = setTimeout(() => { + childState.transcriptPollTimer = undefined; + void this.reconcileChildTranscript(childState.childThreadId) + .catch((error) => { + embeddedAgentLog.warn("Failed to reconcile Codex native subagent transcript", { + childThreadId: childState.childThreadId, + error: formatErrorMessage(error), + }); + return false; + }) + .then((reconciled) => { + if (!reconciled) { + this.scheduleTranscriptPoll(childState); + } + }); + }, delayMs); + unrefTimer(childState.transcriptPollTimer); + } + + private clearTimers(): void { + if (this.taskRowReconcileTimer) { + clearInterval(this.taskRowReconcileTimer); + this.taskRowReconcileTimer = undefined; + } + for (const childState of this.childStates.values()) { + if (childState.transcriptPollTimer) { + clearTimeout(childState.transcriptPollTimer); + childState.transcriptPollTimer = undefined; + } + if (childState.completionDeliveryTimer) { + clearTimeout(childState.completionDeliveryTimer); + childState.completionDeliveryTimer = undefined; + } + } + } + + private startTaskRowReconciler(intervalMs: number): void { + if (!Number.isFinite(intervalMs) || intervalMs <= 0) { + return; + } + this.taskRowReconcileTimer = setInterval( + () => { + void this.reconcileKnownTaskRows().catch((error) => { + embeddedAgentLog.warn("Failed to reconcile Codex native subagent task rows", { + error: formatErrorMessage(error), + }); + }); + }, + Math.max(1, Math.floor(intervalMs)), + ); + unrefTimer(this.taskRowReconcileTimer); + } + + async reconcileKnownTaskRows(): Promise { + if (!this.codexHome) { + return; + } + for (const state of this.parentStates.values()) { + await this.reconcileKnownTaskRowsForParent(state); + } + } + + private async reconcileExistingRunningTasksForParent(state: ParentState): Promise { + if (!this.codexHome || !state.taskRuntime) { + return; + } + const tasks = state.taskRuntime.listTaskRecords(); + const candidates: Array<{ childThreadId: string; childState: ChildState }> = []; + for (const task of tasks) { + if (!this.shouldReconcileCodexNativeTask(task)) { + continue; + } + if (state.requesterSessionKey && task.requesterSessionKey !== state.requesterSessionKey) { + continue; + } + const childThreadId = task.runId!.slice(CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX.length).trim(); + if (!childThreadId) { + continue; + } + this.registerChildThread(state.parentThreadId, childThreadId, { + scheduleTranscriptPoll: false, + }); + const childState = this.childStates.get(childThreadId); + if (childState && !childState.transcriptPollTimer) { + candidates.push({ childThreadId, childState }); + } + } + await this.primeTranscriptPathCacheForChildren(candidates.map(({ childState }) => childState)); + for (const { childThreadId, childState } of candidates) { + const reconciled = await this.reconcileChildTranscript(childThreadId, { + allowTreeScan: false, + }); + if (!reconciled) { + this.scheduleTranscriptPoll(childState); + } + } + } + + private async reconcileKnownTaskRowsForParent(state: ParentState): Promise { + if (!this.codexHome || !state.taskRuntime) { + return; + } + const tasks = state.taskRuntime.listTaskRecords(); + const candidates: Array<{ + task: AgentHarnessTaskRecord; + childThreadId: string; + childState: ChildState; + }> = []; + for (const task of tasks) { + if (!this.shouldReconcileCodexNativeTask(task)) { + continue; + } + const childThreadId = task.runId!.slice(CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX.length).trim(); + if (!childThreadId) { + continue; + } + this.registerChildThread(state.parentThreadId, childThreadId, { + scheduleTranscriptPoll: false, + }); + const childState = this.childStates.get(childThreadId); + if (!childState || childState.transcriptPollTimer) { + continue; + } + candidates.push({ task, childThreadId, childState }); + } + await this.primeTranscriptPathCacheForChildren(candidates.map(({ childState }) => childState)); + for (const { task, childThreadId, childState } of candidates) { + const transcriptCompletion = await this.findTranscriptCompletionForChild(childState, { + allowTreeScan: false, + }); + if (!transcriptCompletion) { + this.scheduleTranscriptPoll(childState); + continue; + } + const parentThreadId = + transcriptCompletion.completion.parentThreadId ?? + this.childThreadParents.get(childThreadId); + if (!parentThreadId) { + embeddedAgentLog.warn("Codex native subagent transcript did not include a parent thread", { + childThreadId, + transcriptPath: transcriptCompletion.transcriptPath, + }); + continue; + } + if (parentThreadId !== state.parentThreadId) { + continue; + } + state.agentId = state.agentId ?? task.agentId; + await this.processCompletion( + state, + transcriptCompletion.completion, + transcriptCompletion.completion.completedAt, + ); + } + } + + private shouldReconcileCodexNativeTask(task: AgentHarnessTaskRecord): boolean { + if ( + task.runtime !== "subagent" || + task.taskKind !== "codex-native" || + !task.runId?.startsWith(CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX) + ) { + return false; + } + if ( + task.status === "running" || + task.status === "queued" || + task.deliveryStatus === "pending" + ) { + return true; + } + return task.deliveryStatus === "not_applicable" && this.isRecentTerminalTask(task); + } + + private isRecentTerminalTask(task: AgentHarnessTaskRecord): boolean { + if ( + task.status !== "succeeded" && + task.status !== "failed" && + task.status !== "timed_out" && + task.status !== "cancelled" && + task.status !== "lost" + ) { + return false; + } + const earliestRelevantAt = this.startedAt - RECENT_TERMINAL_TASK_RECONCILE_GRACE_MS; + return [task.createdAt, task.startedAt, task.endedAt, task.lastEventAt].some( + (timestamp) => typeof timestamp === "number" && timestamp >= earliestRelevantAt, + ); + } + + private async primeTranscriptPathCacheForChildren( + childStates: readonly ChildState[], + ): Promise { + const codexHome = this.codexHome; + if (!codexHome) { + return; + } + const missingChildThreadIds = new Set( + childStates + .filter( + (childState) => + !childState.transcriptPath && + !this.transcriptPathsByChildThreadId.has(childState.childThreadId), + ) + .map((childState) => childState.childThreadId), + ); + if (missingChildThreadIds.size === 0) { + return; + } + const transcriptPaths = await findTranscriptPaths({ + codexHome, + childThreadIds: missingChildThreadIds, + }); + for (const [childThreadId, transcriptPath] of transcriptPaths) { + this.transcriptPathsByChildThreadId.set(childThreadId, transcriptPath); + const childState = this.childStates.get(childThreadId); + if (childState) { + childState.transcriptPath = transcriptPath; + } + } + } + + private async findTranscriptCompletionForChild( + childState: ChildState, + options: { allowTreeScan?: boolean } = {}, + ): Promise<{ transcriptPath: string; completion: TranscriptCompletion } | undefined> { + const codexHome = this.codexHome; + if (!codexHome) { + return undefined; + } + const transcriptPath = + childState.transcriptPath ?? + this.transcriptPathsByChildThreadId.get(childState.childThreadId); + const completion = await findTranscriptCompletion({ + codexHome, + childThreadId: childState.childThreadId, + transcriptPath, + allowTreeScan: options.allowTreeScan ?? true, + }); + if (completion) { + childState.transcriptPath = completion.transcriptPath; + this.transcriptPathsByChildThreadId.set(childState.childThreadId, completion.transcriptPath); + } + return completion; + } +} + +function buildCompletionDedupeKey( + parentThreadId: string, + completion: CodexNativeSubagentCompletion, +): string { + const hash = createHash("sha256").update(completion.result).digest("hex").slice(0, 16); + return `${parentThreadId}:${completion.childThreadId}:${completion.status}:${hash}`; +} + +function buildParentAgentPathKey(parentThreadId: string, agentPath: string): string { + return `${parentThreadId}\0${agentPath}`; +} + +function toThreadCompletion( + completion: CodexNativeSubagentNotificationCompletion, + childThreadId: string, +): CodexNativeSubagentCompletion { + return { + childThreadId, + status: completion.status, + statusLabel: completion.statusLabel, + result: completion.result, + }; +} + +function readSpawnParentThreadId(thread: JsonObject | undefined): string | undefined { + const source = isJsonObject(thread?.source) ? thread.source : undefined; + const subAgent = isJsonObject(source?.subAgent) ? source.subAgent : undefined; + const spawn = isJsonObject(subAgent?.thread_spawn) ? subAgent.thread_spawn : undefined; + return readString(spawn, "parent_thread_id")?.trim(); +} + +function readSpawnAgentPath(thread: JsonObject | undefined): string | undefined { + const source = isJsonObject(thread?.source) ? thread.source : undefined; + const subAgent = isJsonObject(source?.subAgent) ? source.subAgent : undefined; + const spawn = isJsonObject(subAgent?.thread_spawn) ? subAgent.thread_spawn : undefined; + return readString(spawn, "agent_path")?.trim(); +} + +function readString(record: JsonObject | undefined, key: string): string | undefined { + const value = record?.[key]; + return typeof value === "string" ? value : undefined; +} + +function normalizeOptionalString(value: string | undefined): string | undefined { + const normalized = value?.trim(); + return normalized || undefined; +} + +function readStringArray(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + return value.filter((entry): entry is string => typeof entry === "string" && entry.trim() !== ""); +} + +function readObjectStringKeys(value: JsonValue | undefined): string[] { + if (!isJsonObject(value)) { + return []; + } + return Object.keys(value).filter((entry) => entry.trim() !== ""); +} + +function normalizeToolName(value: string | undefined): string | undefined { + return value?.replace(/[^a-z0-9]/giu, "").toLowerCase(); +} + +async function findTranscriptCompletion(params: { + codexHome: string; + childThreadId: string; + transcriptPath?: string; + allowTreeScan?: boolean; +}): Promise< + | { + transcriptPath: string; + completion: TranscriptCompletion; + } + | undefined +> { + const transcriptPath = + params.transcriptPath ?? + (params.allowTreeScan === false + ? undefined + : await findTranscriptPath({ + codexHome: params.codexHome, + childThreadId: params.childThreadId, + })); + if (!transcriptPath) { + return undefined; + } + const completion = await readTranscriptCompletion(transcriptPath, params.childThreadId); + return completion ? { transcriptPath, completion } : undefined; +} + +async function findTranscriptPaths(params: { + codexHome: string; + childThreadIds: ReadonlySet; +}): Promise> { + const sessionsDir = path.join(params.codexHome, "sessions"); + const found = new Map(); + const stack = [sessionsDir]; + while (stack.length > 0 && found.size < params.childThreadIds.size) { + const dir = stack.pop()!; + let entries: Array<{ name: string; isDirectory(): boolean; isFile(): boolean }>; + try { + entries = await fs.readdir(dir, { withFileTypes: true }); + } catch { + continue; + } + for (const entry of entries) { + const entryPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + stack.push(entryPath); + continue; + } + if (!entry.isFile() || !entry.name.endsWith(".jsonl")) { + continue; + } + for (const childThreadId of params.childThreadIds) { + if (!found.has(childThreadId) && entry.name.includes(childThreadId)) { + found.set(childThreadId, entryPath); + } + } + } + } + return found; +} + +async function findTranscriptPath(params: { + codexHome: string; + childThreadId: string; +}): Promise { + const sessionsDir = path.join(params.codexHome, "sessions"); + const stack = [sessionsDir]; + while (stack.length > 0) { + const dir = stack.pop()!; + let entries: Array<{ name: string; isDirectory(): boolean; isFile(): boolean }>; + try { + entries = await fs.readdir(dir, { withFileTypes: true }); + } catch { + continue; + } + for (const entry of entries) { + const entryPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + stack.push(entryPath); + continue; + } + if ( + entry.isFile() && + entry.name.endsWith(".jsonl") && + entry.name.includes(params.childThreadId) + ) { + return entryPath; + } + } + } + return undefined; +} + +async function readTranscriptCompletion( + transcriptPath: string, + childThreadId: string, +): Promise { + let contents: string; + try { + contents = await fs.readFile(transcriptPath, "utf8"); + } catch { + return undefined; + } + let parentThreadId: string | undefined; + let completion: TranscriptCompletion | undefined; + for (const line of contents.split(/\r?\n/u)) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + let entry: JsonValue; + try { + entry = JSON.parse(trimmed); + } catch { + continue; + } + if (!isJsonObject(entry)) { + continue; + } + const payload = isJsonObject(entry.payload) ? entry.payload : undefined; + if (!payload) { + continue; + } + if (readString(entry, "type") === "session_meta") { + parentThreadId = readTranscriptParentThreadId(payload) ?? parentThreadId; + continue; + } + if (readString(entry, "type") !== "event_msg") { + continue; + } + const payloadType = readString(payload, "type"); + if (payloadType === "task_complete") { + completion = { + childThreadId, + parentThreadId, + status: "succeeded", + statusLabel: "task_complete", + result: + readString(payload, "last_agent_message")?.trim() || + readString(payload, "message")?.trim() || + "(no output)", + completedAt: secondsToMillis(readNumber(payload, "completed_at")) ?? readTimestamp(entry), + }; + } else if (payloadType === "task_failed") { + const result = + readString(payload, "last_agent_message")?.trim() || + readString(payload, "error")?.trim() || + readString(payload, "message")?.trim() || + "Codex native subagent failed."; + completion = { + childThreadId, + parentThreadId, + status: "failed", + statusLabel: "task_failed", + result, + completedAt: readTimestamp(entry), + }; + } + } + return completion; +} + +function readTranscriptParentThreadId(payload: JsonObject): string | undefined { + const source = isJsonObject(payload.source) ? payload.source : undefined; + const subagent = + (isJsonObject(source?.subagent) ? source.subagent : undefined) ?? + (isJsonObject(source?.subAgent) ? source.subAgent : undefined); + const spawn = isJsonObject(subagent?.thread_spawn) ? subagent.thread_spawn : undefined; + return readString(spawn, "parent_thread_id")?.trim(); +} + +function readNumber(record: JsonObject, key: string): number | undefined { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function secondsToMillis(value: number | undefined): number | undefined { + return value === undefined ? undefined : Math.round(value * 1000); +} + +function readTimestamp(entry: JsonObject): number | undefined { + const timestamp = readString(entry, "timestamp"); + if (!timestamp) { + return undefined; + } + const parsed = Date.parse(timestamp); + return Number.isFinite(parsed) ? parsed : undefined; +} + +function unrefTimer(timer: ReturnType): void { + if (typeof timer === "object" && timer && "unref" in timer) { + (timer as { unref: () => void }).unref(); + } +} diff --git a/extensions/codex/src/app-server/native-subagent-notification.test.ts b/extensions/codex/src/app-server/native-subagent-notification.test.ts new file mode 100644 index 000000000000..be7b2007d37a --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-notification.test.ts @@ -0,0 +1,176 @@ +import { describe, expect, it } from "vitest"; +import { + extractCodexNativeSubagentCompletions, + extractCodexNativeSubagentCompletionsFromText, +} from "./native-subagent-notification.js"; + +function trustedInterAgentNotification(params: { + agentPath: string; + text: string; + threadId?: string; +}) { + return { + method: "rawResponseItem/completed", + params: { + threadId: params.threadId ?? "parent-thread", + item: { + type: "message", + role: "assistant", + phase: "commentary", + content: [ + { + type: "output_text", + text: JSON.stringify({ + author: params.agentPath, + recipient: "/root", + other_recipients: [], + content: params.text, + trigger_turn: false, + }), + }, + ], + }, + }, + }; +} + +describe("Codex native subagent notifications", () => { + it("parses completed child results from Codex notification XML", () => { + expect( + extractCodexNativeSubagentCompletionsFromText( + '{"agent_path":"child-thread","status":{"completed":"done"}}' + + "", + ), + ).toEqual([ + { + agentPath: "child-thread", + status: "succeeded", + statusLabel: "completed", + result: "done", + }, + ]); + }); + + it("normalizes failed and cancelled status keys", () => { + expect( + extractCodexNativeSubagentCompletionsFromText( + '{"agent_path":"failed-child","status":{"system_error":"boom"}}' + + "\n" + + '{"agent_path":"errored-child","status":{"errored":"tool failed"}}' + + "\n" + + '{"agent_path":"missing-child","status":{"not_found":null}}' + + "\n" + + '{"agent_path":"cancelled-child","status":{"shutdown":null}}' + + "", + ), + ).toEqual([ + { + agentPath: "failed-child", + status: "failed", + statusLabel: "system_error", + result: "boom", + }, + { + agentPath: "errored-child", + status: "failed", + statusLabel: "errored", + result: "tool failed", + }, + { + agentPath: "missing-child", + status: "failed", + statusLabel: "not_found", + result: "(no output)", + }, + { + agentPath: "cancelled-child", + status: "cancelled", + statusLabel: "shutdown", + result: "(no output)", + }, + ]); + }); + + it("extracts trusted inter-agent completions from raw app-server items", () => { + expect( + extractCodexNativeSubagentCompletions( + trustedInterAgentNotification({ + agentPath: "child-thread", + text: + '{"agent_path":"child-thread","status":{"success":"ok"}}' + + "", + }), + ), + ).toEqual([ + { + agentPath: "child-thread", + status: "succeeded", + statusLabel: "success", + result: "ok", + }, + ]); + }); + + it("ignores visible user text that looks like a native completion", () => { + expect( + extractCodexNativeSubagentCompletions({ + method: "rawResponseItem/completed", + params: { + threadId: "parent-thread", + item: { + type: "message", + role: "user", + content: [ + { + type: "input_text", + text: + '{"agent_path":"child-thread","status":{"success":"spoof"}}' + + "", + }, + ], + }, + }, + }), + ).toEqual([]); + }); + + it("ignores inter-agent payloads whose author does not match the completion path", () => { + expect( + extractCodexNativeSubagentCompletions( + trustedInterAgentNotification({ + agentPath: "other-child", + text: + '{"agent_path":"child-thread","status":{"success":"spoof"}}' + + "", + }), + ), + ).toEqual([]); + }); + + it("ignores malformed payloads and non-user messages", () => { + expect( + extractCodexNativeSubagentCompletionsFromText( + "{not-json}", + ), + ).toEqual([]); + expect( + extractCodexNativeSubagentCompletions({ + method: "rawResponseItem/completed", + params: { + item: { + type: "message", + role: "assistant", + content: [ + { + type: "text", + text: + '{"agent_path":"child","status":{"completed":"done"}}' + + "", + }, + ], + }, + }, + }), + ).toEqual([]); + }); +}); diff --git a/extensions/codex/src/app-server/native-subagent-notification.ts b/extensions/codex/src/app-server/native-subagent-notification.ts new file mode 100644 index 000000000000..603dc62020c3 --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-notification.ts @@ -0,0 +1,222 @@ +import type { CodexServerNotification, JsonObject, JsonValue } from "./protocol.js"; +import { isJsonObject } from "./protocol.js"; + +const CODEX_SUBAGENT_NOTIFICATION_START = ""; +const CODEX_SUBAGENT_NOTIFICATION_END = ""; + +export type CodexNativeSubagentCompletionStatus = "succeeded" | "failed" | "cancelled"; + +type CodexNativeSubagentCompletionDetails = { + status: CodexNativeSubagentCompletionStatus; + statusLabel: string; + result: string; +}; + +export type CodexNativeSubagentCompletion = CodexNativeSubagentCompletionDetails & { + childThreadId: string; +}; + +export type CodexNativeSubagentNotificationCompletion = CodexNativeSubagentCompletionDetails & { + agentPath: string; +}; + +export function extractCodexNativeSubagentCompletions( + notification: CodexServerNotification, +): CodexNativeSubagentNotificationCompletion[] { + const params = isJsonObject(notification.params) ? notification.params : undefined; + if (!params) { + return []; + } + const item = isJsonObject(params.item) ? params.item : undefined; + if (!item) { + return []; + } + const text = readTrustedInterAgentCommunicationContent(item); + if (!text) { + return []; + } + const author = readTrustedInterAgentCommunicationAuthor(item); + return extractCodexNativeSubagentCompletionsFromText(text).filter( + (completion) => completion.agentPath === author, + ); +} + +export function extractCodexNativeSubagentCompletionsFromText( + text: string, +): CodexNativeSubagentNotificationCompletion[] { + const completions: CodexNativeSubagentNotificationCompletion[] = []; + let cursor = 0; + while (cursor < text.length) { + const start = text.indexOf(CODEX_SUBAGENT_NOTIFICATION_START, cursor); + if (start < 0) { + break; + } + const bodyStart = start + CODEX_SUBAGENT_NOTIFICATION_START.length; + const end = text.indexOf(CODEX_SUBAGENT_NOTIFICATION_END, bodyStart); + if (end < 0) { + break; + } + const parsed = parseCodexNativeSubagentNotificationBody(text.slice(bodyStart, end)); + if (parsed) { + completions.push(parsed); + } + cursor = end + CODEX_SUBAGENT_NOTIFICATION_END.length; + } + return completions; +} + +function parseCodexNativeSubagentNotificationBody( + body: string, +): CodexNativeSubagentNotificationCompletion | undefined { + let payload: JsonValue; + try { + payload = JSON.parse(body.trim()); + } catch { + return undefined; + } + if (!isJsonObject(payload)) { + return undefined; + } + const agentPath = readString(payload, "agent_path")?.trim(); + const status = isJsonObject(payload.status) ? payload.status : undefined; + if (!agentPath || !status) { + return undefined; + } + const statusEntry = readCompletionStatus(status); + if (!statusEntry) { + return undefined; + } + return { + agentPath, + status: statusEntry.status, + statusLabel: statusEntry.label, + result: statusEntry.result, + }; +} + +function readCompletionStatus(status: JsonObject): + | { + status: CodexNativeSubagentCompletionStatus; + label: string; + result: string; + } + | undefined { + for (const [rawKey, value] of Object.entries(status)) { + const normalized = normalizeStatusKey(rawKey); + const mappedStatus = mapCompletionStatus(normalized); + if (!mappedStatus) { + continue; + } + return { + status: mappedStatus, + label: rawKey, + result: stringifyResult(value), + }; + } + return undefined; +} + +function mapCompletionStatus(value: string): CodexNativeSubagentCompletionStatus | undefined { + if (value === "completed" || value === "succeeded" || value === "success") { + return "succeeded"; + } + if ( + value === "cancelled" || + value === "canceled" || + value === "interrupted" || + value === "shutdown" + ) { + return "cancelled"; + } + if ( + value === "failed" || + value === "error" || + value === "errored" || + value === "systemerror" || + value === "notfound" + ) { + return "failed"; + } + return undefined; +} + +function stringifyResult(value: JsonValue | undefined): string { + if (typeof value === "string") { + return value.trim() || "(no output)"; + } + if (value === null || value === undefined) { + return "(no output)"; + } + try { + return JSON.stringify(value); + } catch { + return "(unserializable output)"; + } +} + +function readTrustedInterAgentCommunicationContent(item: JsonObject): string | undefined { + const communication = readTrustedInterAgentCommunication(item); + return typeof communication?.content === "string" ? communication.content : undefined; +} + +function readTrustedInterAgentCommunicationAuthor(item: JsonObject): string | undefined { + const communication = readTrustedInterAgentCommunication(item); + return typeof communication?.author === "string" ? communication.author : undefined; +} + +function readTrustedInterAgentCommunication(item: JsonObject): JsonObject | undefined { + if ( + readString(item, "type") !== "message" || + readString(item, "role") !== "assistant" || + readString(item, "phase") !== "commentary" + ) { + return undefined; + } + const text = extractSingleTextPart(item); + if (!text) { + return undefined; + } + let parsed: JsonValue; + try { + parsed = JSON.parse(text); + } catch { + return undefined; + } + if (!isJsonObject(parsed)) { + return undefined; + } + if ( + typeof parsed.author !== "string" || + typeof parsed.recipient !== "string" || + typeof parsed.content !== "string" || + parsed.trigger_turn !== false + ) { + return undefined; + } + return parsed; +} + +function extractSingleTextPart(item: JsonObject): string | undefined { + const content = item.content; + if (!Array.isArray(content) || content.length !== 1) { + return undefined; + } + const [entry] = content; + if (!isJsonObject(entry)) { + return undefined; + } + const type = readString(entry, "type"); + if (type !== "output_text" && type !== "text") { + return undefined; + } + return readString(entry, "text")?.trim(); +} + +function readString(record: JsonObject, key: string): string | undefined { + const value = record[key]; + return typeof value === "string" ? value : undefined; +} + +function normalizeStatusKey(value: string): string { + return value.replace(/[^a-z0-9]/giu, "").toLowerCase(); +} diff --git a/extensions/codex/src/app-server/native-subagent-task-ids.ts b/extensions/codex/src/app-server/native-subagent-task-ids.ts new file mode 100644 index 000000000000..88ff05819e8a --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-task-ids.ts @@ -0,0 +1,3 @@ +export const CODEX_NATIVE_SUBAGENT_RUNTIME = "subagent"; +export const CODEX_NATIVE_SUBAGENT_TASK_KIND = "codex-native"; +export const CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX = "codex-thread:"; diff --git a/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts b/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts index 7d826eed3080..459c9a3630e9 100644 --- a/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts +++ b/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts @@ -50,12 +50,7 @@ describe("CodexNativeSubagentTaskMirror", () => { }); expect(runtime.createRunningTaskRun).toHaveBeenCalledWith({ - runtime: "subagent", - taskKind: "codex-native", sourceId: "codex-thread:child-thread", - requesterSessionKey: "agent:main:main", - ownerKey: "agent:main:main", - scopeKind: "session", agentId: "main", runId: "codex-thread:child-thread", label: "Poincare", @@ -72,7 +67,6 @@ describe("CodexNativeSubagentTaskMirror", () => { ); expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 20_000, progressSummary: "Codex native subagent is active.", }); @@ -170,7 +164,6 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(1, { runId: codexNativeSubagentRunId("child-thread"), - runtime: "subagent", status: "succeeded", endedAt: 30_000, lastEventAt: 30_000, @@ -179,7 +172,6 @@ describe("CodexNativeSubagentTaskMirror", () => { }); expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(2, { runId: codexNativeSubagentRunId("failed-child"), - runtime: "subagent", status: "failed", endedAt: 30_000, lastEventAt: 30_000, @@ -237,12 +229,7 @@ describe("CodexNativeSubagentTaskMirror", () => { }); expect(runtime.createRunningTaskRun).toHaveBeenCalledWith({ - runtime: "subagent", - taskKind: "codex-native", sourceId: "codex-thread:child-thread", - requesterSessionKey: "agent:main:main", - ownerKey: "agent:main:main", - scopeKind: "session", runId: "codex-thread:child-thread", label: "Codex subagent", task: "write the proof file", @@ -258,13 +245,11 @@ describe("CodexNativeSubagentTaskMirror", () => { ); expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 40_000, progressSummary: "Codex native subagent is initializing.", }); expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", status: "succeeded", endedAt: 40_000, lastEventAt: 40_000, @@ -273,6 +258,82 @@ describe("CodexNativeSubagentTaskMirror", () => { }); }); + it("uses the notification thread id when collab agent items omit sender thread id", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 42_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "item/started", + params: { + threadId: "parent-thread", + item: { + type: "collabAgentToolCall", + tool: "spawn_agent", + receiverThreadIds: ["child-thread"], + prompt: "inspect one thing", + }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + task: "inspect one thing", + }), + ); + }); + + it("creates spawn tasks from collab agent states when receiver thread ids are absent", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 43_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "item/completed", + params: { + threadId: "parent-thread", + item: { + type: "collabAgentToolCall", + tool: "spawn_agent", + prompt: "inspect one thing", + agentsStates: { + "child-thread": { + status: "completed", + message: "done", + }, + }, + }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + task: "inspect one thing", + }), + ); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + terminalSummary: "done", + }), + ); + }); + it("finalizes stale collab agent state from the blocked tool call status", () => { const runtime = createRuntime(); const mirror = new CodexNativeSubagentTaskMirror( @@ -306,13 +367,11 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.recordTaskRunProgressByRunId).not.toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 45_000, progressSummary: "Native hook relay unavailable", }); expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", status: "succeeded", endedAt: 45_000, lastEventAt: 45_000, @@ -355,7 +414,6 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 46_000, progressSummary: "Codex native subagent is initializing.", }); @@ -394,7 +452,6 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 47_000, progressSummary: "wait timed out", }); @@ -441,7 +498,6 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledTimes(1); expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", status: "succeeded", endedAt: 50_000, lastEventAt: 50_000, @@ -490,7 +546,6 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(1, { runId: "codex-thread:child-thread", - runtime: "subagent", status: "succeeded", endedAt: 55_000, lastEventAt: 55_000, @@ -499,7 +554,6 @@ describe("CodexNativeSubagentTaskMirror", () => { }); expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(2, { runId: "codex-thread:child-thread", - runtime: "subagent", status: "failed", endedAt: 55_000, lastEventAt: 55_000, @@ -556,13 +610,11 @@ describe("CodexNativeSubagentTaskMirror", () => { expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", lastEventAt: 60_000, progressSummary: "Codex native subagent is initializing.", }); expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith({ runId: "codex-thread:child-thread", - runtime: "subagent", status: "succeeded", endedAt: 60_000, lastEventAt: 60_000, diff --git a/extensions/codex/src/app-server/native-subagent-task-mirror.ts b/extensions/codex/src/app-server/native-subagent-task-mirror.ts index 51fdddbbcc52..1f7486c7a182 100644 --- a/extensions/codex/src/app-server/native-subagent-task-mirror.ts +++ b/extensions/codex/src/app-server/native-subagent-task-mirror.ts @@ -1,11 +1,5 @@ -import { - CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX, - CODEX_NATIVE_SUBAGENT_RUNTIME, - CODEX_NATIVE_SUBAGENT_TASK_KIND, - createRunningTaskRun, - finalizeTaskRunByRunId, - recordTaskRunProgressByRunId, -} from "openclaw/plugin-sdk/codex-native-task-runtime"; +import type { AgentHarnessTaskRuntime } from "openclaw/plugin-sdk/agent-harness-task-runtime"; +import { CODEX_NATIVE_SUBAGENT_RUN_ID_PREFIX } from "./native-subagent-task-ids.js"; import type { CodexServerNotification, CodexSessionSource, @@ -19,11 +13,10 @@ import type { } from "./protocol.js"; import { isJsonObject } from "./protocol.js"; -export type TaskLifecycleRuntime = { - createRunningTaskRun: typeof createRunningTaskRun; - recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunId; - finalizeTaskRunByRunId: typeof finalizeTaskRunByRunId; -}; +export type TaskLifecycleRuntime = Pick< + AgentHarnessTaskRuntime, + "createRunningTaskRun" | "recordTaskRunProgressByRunId" | "finalizeTaskRunByRunId" +>; export type CodexNativeSubagentTaskMirrorParams = { parentThreadId: string; @@ -32,12 +25,6 @@ export type CodexNativeSubagentTaskMirrorParams = { now?: () => number; }; -const defaultRuntime: TaskLifecycleRuntime = { - createRunningTaskRun, - recordTaskRunProgressByRunId, - finalizeTaskRunByRunId, -}; - export class CodexNativeSubagentTaskMirror { private readonly mirroredThreadIds = new Set(); private readonly terminalRunIds = new Set(); @@ -45,7 +32,7 @@ export class CodexNativeSubagentTaskMirror { constructor( private readonly params: CodexNativeSubagentTaskMirrorParams, - private readonly runtime: TaskLifecycleRuntime = defaultRuntime, + private readonly runtime: TaskLifecycleRuntime, ) { this.now = params.now ?? Date.now; } @@ -95,16 +82,7 @@ export class CodexNativeSubagentTaskMirror { `Codex native subagent${label === "Codex subagent" ? "" : ` ${label}`}`; const createdAt = secondsToMillis(thread.createdAt) ?? this.now(); this.runtime.createRunningTaskRun({ - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, - taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND, sourceId: runId, - requesterSessionKey: this.params.requesterSessionKey, - ...(this.params.requesterSessionKey - ? { - ownerKey: this.params.requesterSessionKey, - scopeKind: "session" as const, - } - : {}), agentId: this.params.agentId, runId, label, @@ -140,7 +118,6 @@ export class CodexNativeSubagentTaskMirror { if (statusType === "active") { this.runtime.recordTaskRunProgressByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, lastEventAt: eventAt, progressSummary: "Codex native subagent is active.", }); @@ -150,7 +127,6 @@ export class CodexNativeSubagentTaskMirror { this.terminalRunIds.add(runId); this.runtime.finalizeTaskRunByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, status: "succeeded", endedAt: eventAt, lastEventAt: eventAt, @@ -163,7 +139,6 @@ export class CodexNativeSubagentTaskMirror { this.terminalRunIds.add(runId); this.runtime.finalizeTaskRunByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, status: "failed", endedAt: eventAt, lastEventAt: eventAt, @@ -176,7 +151,6 @@ export class CodexNativeSubagentTaskMirror { if (statusType === "notLoaded") { this.runtime.recordTaskRunProgressByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, lastEventAt: eventAt, progressSummary: "Codex native subagent is not loaded.", }); @@ -188,21 +162,23 @@ export class CodexNativeSubagentTaskMirror { if (!item || readString(item, "type") !== "collabAgentToolCall") { return; } - if (readString(item, "senderThreadId") !== this.params.parentThreadId) { + const senderThreadId = readString(item, "senderThreadId") ?? readString(params, "threadId"); + if (senderThreadId !== this.params.parentThreadId) { return; } - const receiverThreadIds = readStringArray(item.receiverThreadIds); const isSpawnAgentTool = normalizeToolName(readString(item, "tool")) === "spawnagent"; + const receiverThreadIds = readStringArray(item.receiverThreadIds); + const agentsStates = readAgentsStates(item.agentsStates); + const spawnChildThreadIds = new Set([...receiverThreadIds, ...agentsStates.keys()]); if (isSpawnAgentTool) { - for (const receiverThreadId of receiverThreadIds) { - this.createTaskFromCollabSpawnItem(receiverThreadId, item); + for (const childThreadId of spawnChildThreadIds) { + this.createTaskFromCollabSpawnItem(childThreadId, item); } } - const agentsStates = readAgentsStates(item.agentsStates); const toolCallStatus = normalizeCollabToolCallStatus(readString(item, "status")); const terminalToolCallThreadIds = new Set(); if (isSpawnAgentTool && isBlockedOrFailedCollabToolCallStatus(toolCallStatus)) { - for (const threadId of receiverThreadIds) { + for (const threadId of spawnChildThreadIds) { terminalToolCallThreadIds.add(threadId); } for (const threadId of agentsStates.keys()) { @@ -244,16 +220,7 @@ export class CodexNativeSubagentTaskMirror { const runId = codexNativeSubagentRunId(normalizedThreadId); const createdAt = this.now(); this.runtime.createRunningTaskRun({ - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, - taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND, sourceId: runId, - requesterSessionKey: this.params.requesterSessionKey, - ...(this.params.requesterSessionKey - ? { - ownerKey: this.params.requesterSessionKey, - scopeKind: "session" as const, - } - : {}), agentId: this.params.agentId, runId, label: "Codex subagent", @@ -284,7 +251,6 @@ export class CodexNativeSubagentTaskMirror { if (normalizedStatus === "pendingInit" || normalizedStatus === "running") { this.runtime.recordTaskRunProgressByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, lastEventAt: eventAt, progressSummary: trimOptional(message) ?? @@ -298,7 +264,6 @@ export class CodexNativeSubagentTaskMirror { this.terminalRunIds.add(runId); this.runtime.finalizeTaskRunByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, status: "succeeded", endedAt: eventAt, lastEventAt: eventAt, @@ -311,7 +276,6 @@ export class CodexNativeSubagentTaskMirror { this.terminalRunIds.add(runId); this.runtime.finalizeTaskRunByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, status: "succeeded", endedAt: eventAt, lastEventAt: eventAt, @@ -324,7 +288,6 @@ export class CodexNativeSubagentTaskMirror { this.terminalRunIds.add(runId); this.runtime.finalizeTaskRunByRunId({ runId, - runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, status: normalizedStatus === "interrupted" || normalizedStatus === "shutdown" ? "cancelled" diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 84709aa503e0..22c36d151889 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -116,6 +116,7 @@ import { buildCodexNativeHookRelayConfig, CODEX_NATIVE_HOOK_RELAY_EVENTS, } from "./native-hook-relay.js"; +import { registerCodexNativeSubagentMonitor } from "./native-subagent-monitor.js"; import { describeCodexNotificationCorrelation, isCodexNotificationForTurn, @@ -2172,6 +2173,14 @@ export async function runCodexAppServerAttempt( return notificationQueue; }; + registerCodexNativeSubagentMonitor({ + client, + parentThreadId: thread.threadId, + requesterSessionKey: params.sessionKey, + taskRuntimeScope: params.agentHarnessTaskRuntimeScope, + agentId: params.agentId, + codexHome: appServer.start.env?.CODEX_HOME ?? resolveCodexAppServerHomeDir(agentDir), + }); const notificationCleanup = client.addNotificationHandler(enqueueNotification); const requestCleanup = client.addRequestHandler(async (request) => { let armCompletionWatchOnResponse = false; diff --git a/package.json b/package.json index 92797a51b456..f4a51b1453b2 100644 --- a/package.json +++ b/package.json @@ -509,9 +509,9 @@ "types": "./dist/plugin-sdk/codex-mcp-projection.d.ts", "default": "./dist/plugin-sdk/codex-mcp-projection.js" }, - "./plugin-sdk/codex-native-task-runtime": { - "types": "./dist/plugin-sdk/codex-native-task-runtime.d.ts", - "default": "./dist/plugin-sdk/codex-native-task-runtime.js" + "./plugin-sdk/agent-harness-task-runtime": { + "types": "./dist/plugin-sdk/agent-harness-task-runtime.d.ts", + "default": "./dist/plugin-sdk/agent-harness-task-runtime.js" }, "./plugin-sdk/agent-harness": { "types": "./dist/plugin-sdk/agent-harness.d.ts", diff --git a/scripts/lib/plugin-sdk-entrypoints.json b/scripts/lib/plugin-sdk-entrypoints.json index f8edc2728d1c..3613af572a20 100644 --- a/scripts/lib/plugin-sdk-entrypoints.json +++ b/scripts/lib/plugin-sdk-entrypoints.json @@ -103,6 +103,7 @@ "cli-backend", "codex-mcp-projection", "codex-native-task-runtime", + "agent-harness-task-runtime", "agent-harness", "agent-harness-runtime", "hook-runtime", diff --git a/scripts/lib/plugin-sdk-private-local-only-subpaths.json b/scripts/lib/plugin-sdk-private-local-only-subpaths.json index 0c6ea22241f4..91e92dd88706 100644 --- a/scripts/lib/plugin-sdk-private-local-only-subpaths.json +++ b/scripts/lib/plugin-sdk-private-local-only-subpaths.json @@ -1,4 +1,5 @@ [ + "codex-native-task-runtime", "qa-channel", "qa-channel-protocol", "qa-lab", diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 5a1ff6ad1ff6..93efd56f236a 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -18,6 +18,7 @@ import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js" import { enqueueCommandInLane } from "../../process/command-queue.js"; import type { CommandQueueEnqueueOptions } from "../../process/command-queue.types.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { createAgentHarnessTaskRuntimeScope } from "../../tasks/agent-harness-task-runtime-scope.js"; import { sanitizeForLog } from "../../terminal/ansi.js"; import { resolveUserPath } from "../../utils.js"; import { isMarkdownCapableMessageChannel } from "../../utils/message-channel.js"; @@ -1438,6 +1439,13 @@ export async function runEmbeddedPiAgent( // attempt too. Otherwise plugin-owned transports can skip PI auth // bootstrap but drift back to PI when the attempt is created. agentHarnessId: agentHarness.id, + ...(params.sessionKey + ? { + agentHarnessTaskRuntimeScope: createAgentHarnessTaskRuntimeScope({ + requesterSessionKey: params.sessionKey, + }), + } + : {}), runtimePlan, model: applyAuthHeaderOverride( applyLocalNoAuthHeaderOverride(effectiveModel, apiKeyInfo), diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index cce1375f2667..6f3979a01e75 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -7,6 +7,7 @@ import type { SessionSystemPromptReport } from "../../../config/sessions/types.j import type { ContextEngine, ContextEnginePromptCacheInfo } from "../../../context-engine/types.js"; import type { DiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js"; import type { PluginHookBeforeAgentStartResult } from "../../../plugins/hook-before-agent-start.types.js"; +import type { AgentHarnessTaskRuntimeScope } from "../../../tasks/agent-harness-task-runtime-scope.js"; import type { AcceptedSessionSpawn } from "../../accepted-session-spawn.js"; import type { AuthProfileStore } from "../../auth-profiles/types.js"; import type { @@ -53,6 +54,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { agentHarnessId?: string; /** OpenClaw-owned runtime policy prepared by the orchestrator for this attempt. */ runtimePlan?: AgentRuntimePlan; + /** Host-issued scope for harnesses that mirror native child runs into task state. */ + agentHarnessTaskRuntimeScope?: AgentHarnessTaskRuntimeScope; /** Live observer called after wrapped tool outcomes are recorded. */ onToolOutcome?: ToolOutcomeObserver; model: Model; diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 17b9c3f7d86d..0696ae0728af 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -855,7 +855,7 @@ describe("deliverSubagentAnnouncement active requester steering", () => { }); describe("deliverSubagentAnnouncement completion delivery", () => { - it("keeps completion announces session-internal while preserving route context for active requesters", async () => { + it("uses an active requester queue as the completion handoff when message-tool delivery is not required", async () => { const callGateway = createGatewayMock(); const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(true); const result = await deliverSlackThreadAnnouncement({ @@ -886,6 +886,29 @@ describe("deliverSubagentAnnouncement completion delivery", () => { expect(callGateway).not.toHaveBeenCalled(); }); + it("does not also direct-run a queued message-tool-only active completion", async () => { + const callGateway = createGatewayMock(); + const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(true); + const result = await deliverSlackThreadAnnouncement({ + callGateway, + sessionId: "requester-session-1", + isActive: true, + expectsCompletionMessage: true, + directIdempotencyKey: "announce-harness-task", + queueEmbeddedPiMessageWithOutcome, + sourceTool: "agent_harness_task", + }); + + expectRecordFields(result, { + delivered: true, + path: "steered", + enqueuedAt: 4_100, + deliveredAt: 4_200, + }); + expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledTimes(1); + expect(callGateway).not.toHaveBeenCalled(); + }); + it("keeps direct external delivery for dormant completion requesters", async () => { const callGateway = createGatewayMock(); const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(false); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 6cef16a3b293..d963f78ece77 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -64,6 +64,7 @@ import type { SpawnSubagentMode } from "./subagent-spawn.types.js"; const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000; const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; const AGENT_MEDIATED_COMPLETION_TOOLS = new Set([ + "agent_harness_task", "image_generate", "music_generate", "subagent_announce", @@ -954,6 +955,17 @@ async function sendSubagentAnnounceDirectly(params: { const directAnnounceStillPending = isGatewayAgentRunPending(directAnnounceResponse); if (directAnnounceStillPending) { + if ( + params.expectsCompletionMessage && + expectedMediaUrls.length === 0 && + !requiresMessageToolDelivery + ) { + return { + delivered: false, + path: "direct", + error: "completion agent handoff is still pending", + }; + } return { delivered: true, path: "direct", diff --git a/src/gateway/gateway-codex-harness.live.test.ts b/src/gateway/gateway-codex-harness.live.test.ts index 214a9d5edb45..f3f8874a78d4 100644 --- a/src/gateway/gateway-codex-harness.live.test.ts +++ b/src/gateway/gateway-codex-harness.live.test.ts @@ -267,6 +267,7 @@ async function writeLiveGatewayConfig(params: { async function requestAgentTextWithEvents(params: { client: GatewayClient; eventPrefix?: string; + includeAllSessions?: boolean; message: string; sessionKey: string; }): Promise<{ text: string; events: CapturedAgentEvent[] }> { @@ -277,7 +278,7 @@ async function requestAgentTextWithEvents(params: { const unsubscribe = onAgentEvent((event) => { if ( !event.stream.startsWith(eventPrefix) || - (event.sessionKey && event.sessionKey !== params.sessionKey) + (!params.includeAllSessions && event.sessionKey && event.sessionKey !== params.sessionKey) ) { return; } @@ -958,6 +959,63 @@ async function verifyCodexSubagentProbe(params: { } } +async function verifyCodexNativeSubagentBridgeProbe(params: { + client: GatewayClient; + sessionKey: string; +}): Promise { + const runId = randomUUID(); + const childToken = `CODEX-NATIVE-CHILD-${runId.slice(0, 6).toUpperCase()}`; + const parentToken = `CODEX-NATIVE-PARENT-${runId.slice(0, 6).toUpperCase()}`; + const { listTaskRecords } = await import("../tasks/runtime-internal.js"); + const { text, events } = await requestAgentTextWithEvents({ + client: params.client, + eventPrefix: "codex_app_server.", + includeAllSessions: true, + sessionKey: params.sessionKey, + message: [ + "Bridge probe.", + "You must use the Codex native spawn_agent tool exactly once before replying.", + `Give the subagent this exact instruction: Reply exactly ${childToken} and nothing else.`, + "Wait for the subagent result. Do not answer from your own knowledge.", + `After the subagent result returns, reply exactly ${parentToken} ${childToken} and nothing else.`, + ].join("\n"), + }); + logCodexLiveStep("native-subagent-bridge-probe:initial-reply", { text }); + expect( + events.some((event) => event.stream === "codex_app_server.lifecycle"), + `expected Codex lifecycle events; events=${JSON.stringify(events)}`, + ).toBe(true); + let codexNativeTasks = listCodexNativeTasks(); + let deliveredTask = findDeliveredCodexNativeTask(codexNativeTasks); + const deadline = Date.now() + CODEX_HARNESS_REQUEST_TIMEOUT_MS; + while (!deliveredTask && Date.now() < deadline) { + await delay(1_000); + codexNativeTasks = listCodexNativeTasks(); + deliveredTask = findDeliveredCodexNativeTask(codexNativeTasks); + } + expect( + deliveredTask, + `expected delivered Codex-native subagent task with child result; initialText=${JSON.stringify( + text, + )}; events=${JSON.stringify(events)}; tasks=${JSON.stringify(codexNativeTasks)}`, + ).toBeDefined(); + + function listCodexNativeTasks() { + return listTaskRecords().filter( + (entry) => entry.runtime === "subagent" && entry.taskKind === "codex-native", + ); + } + + function findDeliveredCodexNativeTask(tasks: ReturnType) { + return tasks.find( + (entry) => + entry.status === "succeeded" && + entry.deliveryStatus === "delivered" && + entry.terminalSummary?.includes(childToken), + ); + } +} + describeLive("gateway live (Codex harness)", () => { it( "runs gateway agent turns through the plugin-owned Codex app-server harness", @@ -1037,6 +1095,8 @@ describeLive("gateway live (Codex harness)", () => { if (CODEX_HARNESS_SUBAGENT_PROBE) { logCodexLiveStep("subagent-probe:start", { sessionKey }); await verifyCodexSubagentProbe({ client, sessionKey }); + logCodexLiveStep("native-subagent-bridge-probe:start", { sessionKey }); + await verifyCodexNativeSubagentBridgeProbe({ client, sessionKey }); logCodexLiveStep("subagent-probe:done"); if (CODEX_HARNESS_SUBAGENT_ONLY) { return; diff --git a/src/plugin-sdk/agent-harness-task-runtime.test.ts b/src/plugin-sdk/agent-harness-task-runtime.test.ts new file mode 100644 index 000000000000..32ad23babc44 --- /dev/null +++ b/src/plugin-sdk/agent-harness-task-runtime.test.ts @@ -0,0 +1,201 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { deliverSubagentAnnouncement } from "../agents/subagent-announce-delivery.js"; +import { createAgentHarnessTaskRuntimeScope } from "../tasks/agent-harness-task-runtime-scope.js"; +import { createRunningTaskRun, finalizeTaskRunByRunId } from "../tasks/detached-task-runtime.js"; +import { listTaskRecords } from "../tasks/runtime-internal.js"; +import { + createAgentHarnessTaskRuntime, + deliverAgentHarnessTaskCompletion, + isDurableAgentHarnessCompletionDelivery, +} from "./agent-harness-task-runtime.js"; + +vi.mock("../agents/subagent-announce-delivery.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + deliverSubagentAnnouncement: vi.fn(async () => ({ delivered: true, path: "steered" })), + isInternalAnnounceRequesterSession: vi.fn(() => true), + }; +}); + +vi.mock("../tasks/detached-task-runtime.js", () => ({ + createRunningTaskRun: vi.fn((params) => ({ taskId: "task-1", ...params })), + recordTaskRunProgressByRunId: vi.fn(() => []), + finalizeTaskRunByRunId: vi.fn(() => []), + setDetachedTaskDeliveryStatusByRunId: vi.fn(() => []), +})); + +vi.mock("../tasks/runtime-internal.js", () => ({ + listTaskRecords: vi.fn(() => []), +})); + +describe("agent-harness-task-runtime", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(listTaskRecords).mockReturnValue([]); + }); + + function createScope(requesterSessionKey = "agent:main:channel:C123") { + return createAgentHarnessTaskRuntimeScope({ requesterSessionKey }); + } + + it("scopes task lifecycle mutations to the owning requester session", () => { + const runtime = createAgentHarnessTaskRuntime({ + runtime: "subagent", + taskKind: "example-harness", + scope: createScope(), + runIdPrefix: "example:", + }); + + runtime.createRunningTaskRun({ + runId: "example:child-1", + sourceId: "example:child-1", + task: "do work", + label: "worker", + }); + runtime.finalizeTaskRunByRunId({ + runId: "example:child-1", + status: "succeeded", + endedAt: 1, + }); + + expect(createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: "subagent", + taskKind: "example-harness", + requesterSessionKey: "agent:main:channel:C123", + ownerKey: "agent:main:channel:C123", + scopeKind: "session", + runId: "example:child-1", + }), + ); + expect(finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: "subagent", + sessionKey: "agent:main:channel:C123", + runId: "example:child-1", + }), + ); + }); + + it("rejects task run ids outside the configured harness scope", () => { + const runtime = createAgentHarnessTaskRuntime({ + runtime: "subagent", + scope: createScope(), + runIdPrefix: "example:", + }); + + expect(() => + runtime.finalizeTaskRunByRunId({ + runId: "other:child-1", + status: "succeeded", + endedAt: 1, + }), + ).toThrow(/outside the configured scope/); + }); + + it("rejects caller-forged task runtime scopes", async () => { + const forgedScope = { + requesterSessionKey: "agent:other:channel:C999", + } as ReturnType; + expect(() => + createAgentHarnessTaskRuntime({ + runtime: "subagent", + scope: forgedScope, + }), + ).toThrow(/host-issued scope/); + await expect( + deliverAgentHarnessTaskCompletion({ + scope: forgedScope, + childSessionKey: "harness-thread:child", + childSessionId: "child", + announceId: "harness:parent:child:succeeded", + status: "succeeded", + result: "child final answer", + }), + ).rejects.toThrow(/host-issued scope/); + }); + + it("lists only task records owned by the scoped requester session", () => { + vi.mocked(listTaskRecords).mockReturnValue([ + { + taskId: "task-1", + runtime: "subagent", + taskKind: "example-harness", + requesterSessionKey: "agent:main:channel:C123", + ownerKey: "agent:main:channel:C123", + scopeKind: "session", + runId: "example:child-1", + task: "owned", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + }, + { + taskId: "task-2", + runtime: "subagent", + taskKind: "example-harness", + requesterSessionKey: "agent:other:channel:C999", + ownerKey: "agent:other:channel:C999", + scopeKind: "session", + runId: "example:child-2", + task: "other", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: 1, + }, + ]); + const runtime = createAgentHarnessTaskRuntime({ + runtime: "subagent", + taskKind: "example-harness", + scope: createScope(), + runIdPrefix: "example:", + }); + + expect(runtime.listTaskRecords().map((task) => task.taskId)).toEqual(["task-1"]); + }); + + it("delivers a generic harness completion through subagent announcement delivery", async () => { + await deliverAgentHarnessTaskCompletion({ + scope: createScope("agent:main:main"), + childSessionKey: "harness-thread:child", + childSessionId: "child", + announceId: "harness:parent:child:succeeded", + announceType: "Example harness worker", + taskLabel: "Example worker", + status: "succeeded", + statusLabel: "task_complete", + result: "child final answer", + }); + + expect(deliverSubagentAnnouncement).toHaveBeenCalledWith( + expect.objectContaining({ + requesterSessionKey: "agent:main:main", + announceId: "harness:parent:child:succeeded", + sourceSessionKey: "harness-thread:child", + sourceTool: "agent_harness_task", + expectsCompletionMessage: true, + directIdempotencyKey: "announce:harness:parent:child:succeeded", + }), + ); + }); + + it("checks durable direct delivery phases", () => { + expect( + isDurableAgentHarnessCompletionDelivery({ + delivered: true, + path: "direct", + phases: [{ phase: "direct-primary", delivered: true, path: "direct" }], + }), + ).toBe(true); + expect( + isDurableAgentHarnessCompletionDelivery({ + delivered: true, + path: "direct", + phases: [{ phase: "steer-fallback", delivered: true, path: "steered" }], + }), + ).toBe(false); + }); +}); diff --git a/src/plugin-sdk/agent-harness-task-runtime.ts b/src/plugin-sdk/agent-harness-task-runtime.ts new file mode 100644 index 000000000000..1563460b44bd --- /dev/null +++ b/src/plugin-sdk/agent-harness-task-runtime.ts @@ -0,0 +1,265 @@ +import { buildAnnounceIdempotencyKey } from "../agents/announce-idempotency.js"; +import { + AGENT_INTERNAL_EVENT_TYPE_TASK_COMPLETION, + type AgentInternalEventStatus, +} from "../agents/internal-event-contract.js"; +import { + formatAgentInternalEventsForPrompt, + type AgentInternalEvent, +} from "../agents/internal-events.js"; +import { + deliverSubagentAnnouncement, + isInternalAnnounceRequesterSession, + loadRequesterSessionEntry, + resolveSubagentCompletionOrigin, +} from "../agents/subagent-announce-delivery.js"; +import { resolveAnnounceOrigin } from "../agents/subagent-announce-origin.js"; +import { + assertAgentHarnessTaskRuntimeScope, + type AgentHarnessTaskRuntimeScope, +} from "../tasks/agent-harness-task-runtime-scope.js"; +import { + createRunningTaskRun, + finalizeTaskRunByRunId, + recordTaskRunProgressByRunId, + setDetachedTaskDeliveryStatusByRunId, +} from "../tasks/detached-task-runtime.js"; +import { listTaskRecords, type TaskRecord } from "../tasks/runtime-internal.js"; +import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; + +export type { TaskRecord as AgentHarnessTaskRecord }; +export type { AgentHarnessTaskRuntimeScope }; + +type AgentHarnessTaskRuntimeId = Parameters[0]["runtime"]; +type CreateRunningTaskRunParams = Parameters[0]; +type RecordTaskRunProgressParams = Parameters[0]; +type FinalizeTaskRunParams = Parameters[0]; +type SetDeliveryStatusParams = Parameters[0]; + +export type AgentHarnessTaskRuntimeScopeParams = { + runtime: AgentHarnessTaskRuntimeId; + scope: AgentHarnessTaskRuntimeScope; + taskKind?: string; + runIdPrefix?: string; +}; + +export type AgentHarnessScopedCreateRunningTaskRunParams = Omit< + CreateRunningTaskRunParams, + "runtime" | "taskKind" | "requesterSessionKey" | "ownerKey" | "scopeKind" +> & { + runId: string; +}; + +export type AgentHarnessScopedRecordTaskRunProgressParams = Omit< + RecordTaskRunProgressParams, + "runtime" | "sessionKey" +>; + +export type AgentHarnessScopedFinalizeTaskRunParams = Omit< + FinalizeTaskRunParams, + "runtime" | "sessionKey" +>; + +export type AgentHarnessScopedSetDeliveryStatusParams = Omit< + SetDeliveryStatusParams, + "runtime" | "sessionKey" +>; + +export type AgentHarnessTaskRuntime = { + createRunningTaskRun(params: AgentHarnessScopedCreateRunningTaskRunParams): TaskRecord; + recordTaskRunProgressByRunId(params: AgentHarnessScopedRecordTaskRunProgressParams): TaskRecord[]; + finalizeTaskRunByRunId(params: AgentHarnessScopedFinalizeTaskRunParams): TaskRecord[]; + setDetachedTaskDeliveryStatusByRunId( + params: AgentHarnessScopedSetDeliveryStatusParams, + ): TaskRecord[]; + listTaskRecords(): TaskRecord[]; +}; + +export type AgentHarnessCompletionStatus = "succeeded" | "failed" | "cancelled"; + +export type AgentHarnessCompletionDelivery = Awaited< + ReturnType +>; + +const AGENT_HARNESS_COMPLETION_SOURCE_TOOL = "agent_harness_task"; + +export function createAgentHarnessTaskRuntime( + params: AgentHarnessTaskRuntimeScopeParams, +): AgentHarnessTaskRuntime { + const runtime = params.runtime; + const scope = assertAgentHarnessTaskRuntimeScope(params.scope); + const requesterSessionKey = scope.requesterSessionKey; + const taskKind = normalizeOptionalString(params.taskKind); + const runIdPrefix = normalizeOptionalString(params.runIdPrefix); + const assertRunId = (runId: string) => assertScopedRunId(runId, runIdPrefix); + return { + createRunningTaskRun(taskParams) { + assertRunId(taskParams.runId); + return createRunningTaskRun({ + ...taskParams, + runtime, + ...(taskKind ? { taskKind } : {}), + requesterSessionKey, + ownerKey: requesterSessionKey, + scopeKind: "session", + }); + }, + recordTaskRunProgressByRunId(taskParams) { + assertRunId(taskParams.runId); + return recordTaskRunProgressByRunId({ + ...taskParams, + runtime, + sessionKey: requesterSessionKey, + }); + }, + finalizeTaskRunByRunId(taskParams) { + assertRunId(taskParams.runId); + return finalizeTaskRunByRunId({ + ...taskParams, + runtime, + sessionKey: requesterSessionKey, + }); + }, + setDetachedTaskDeliveryStatusByRunId(taskParams) { + assertRunId(taskParams.runId); + return setDetachedTaskDeliveryStatusByRunId({ + ...taskParams, + runtime, + sessionKey: requesterSessionKey, + }); + }, + listTaskRecords() { + return listTaskRecords().filter( + (task) => + task.runtime === runtime && + (!taskKind || task.taskKind === taskKind) && + task.scopeKind === "session" && + task.ownerKey === requesterSessionKey && + (!runIdPrefix || task.runId?.startsWith(runIdPrefix)), + ); + }, + }; +} + +export async function deliverAgentHarnessTaskCompletion(params: { + scope: AgentHarnessTaskRuntimeScope; + childSessionKey: string; + childSessionId: string; + announceId: string; + status: AgentHarnessCompletionStatus; + statusLabel?: string; + result: string; + taskLabel?: string; + announceType?: string; + replyInstruction?: string; + signal?: AbortSignal; +}): Promise { + const scope = assertAgentHarnessTaskRuntimeScope(params.scope); + const requesterSessionKey = scope.requesterSessionKey; + const childSessionKey = params.childSessionKey.trim(); + const childSessionId = params.childSessionId.trim(); + const taskLabel = params.taskLabel?.trim() || "Agent harness task"; + const announceType = params.announceType?.trim() || "Agent harness task"; + const statusLabel = params.statusLabel?.trim() || params.status; + const eventStatus = mapHarnessCompletionStatus(params.status); + const requesterIsSubagent = isInternalAnnounceRequesterSession(requesterSessionKey); + let directOrigin = scope.requesterOrigin; + if (!requesterIsSubagent) { + const { entry } = loadRequesterSessionEntry(requesterSessionKey); + directOrigin = resolveAnnounceOrigin(entry, scope.requesterOrigin); + } + const completionDirectOrigin = + requesterIsSubagent || !directOrigin + ? directOrigin + : await resolveSubagentCompletionOrigin({ + childSessionKey, + requesterSessionKey, + requesterOrigin: directOrigin, + childRunId: childSessionKey, + spawnMode: "run", + expectsCompletionMessage: true, + }); + const internalEvents: AgentInternalEvent[] = [ + { + type: AGENT_INTERNAL_EVENT_TYPE_TASK_COMPLETION, + source: "subagent", + childSessionKey, + childSessionId, + announceType, + taskLabel, + status: eventStatus, + statusLabel, + result: params.result, + replyInstruction: + params.replyInstruction?.trim() || + "Use the completed harness task result to continue or wrap up the parent task. If this is a channel session, send the visible response with the message tool instead of only writing a transcript final answer.", + }, + ]; + const prompt = formatAgentInternalEventsForPrompt(internalEvents); + return await deliverSubagentAnnouncement({ + requesterSessionKey, + announceId: params.announceId, + triggerMessage: prompt, + steerMessage: prompt, + internalEvents, + summaryLine: taskLabel, + requesterSessionOrigin: scope.requesterOrigin, + requesterOrigin: completionDirectOrigin ?? directOrigin, + completionDirectOrigin: completionDirectOrigin ?? directOrigin, + directOrigin, + sourceSessionKey: childSessionKey, + sourceChannel: INTERNAL_MESSAGE_CHANNEL, + sourceTool: AGENT_HARNESS_COMPLETION_SOURCE_TOOL, + targetRequesterSessionKey: requesterSessionKey, + requesterIsSubagent, + expectsCompletionMessage: true, + bestEffortDeliver: true, + directIdempotencyKey: buildAnnounceIdempotencyKey(params.announceId), + signal: params.signal, + }); +} + +function mapHarnessCompletionStatus( + status: AgentHarnessCompletionStatus, +): AgentInternalEventStatus { + if (status === "succeeded") { + return "ok"; + } + return "error"; +} + +export function isDurableAgentHarnessCompletionDelivery( + delivery: AgentHarnessCompletionDelivery, +): boolean { + if (!delivery.delivered) { + return false; + } + if (delivery.path === "steered") { + return true; + } + if (delivery.path !== "direct") { + return false; + } + const phases = Array.isArray(delivery.phases) ? delivery.phases : undefined; + if (!phases) { + return true; + } + return phases.some( + (phase) => phase.phase === "direct-primary" && phase.delivered && phase.path === "direct", + ); +} + +function normalizeOptionalString(value: string | undefined): string | undefined { + const normalized = value?.trim(); + return normalized || undefined; +} + +function assertScopedRunId(runId: string, runIdPrefix: string | undefined): void { + const normalized = runId.trim(); + if (!normalized) { + throw new Error("Agent harness task runtime requires runId"); + } + if (runIdPrefix && !normalized.startsWith(runIdPrefix)) { + throw new Error("Agent harness task runId is outside the configured scope"); + } +} diff --git a/src/plugin-sdk/entrypoints.ts b/src/plugin-sdk/entrypoints.ts index 55276266c4da..e2c242af5a06 100644 --- a/src/plugin-sdk/entrypoints.ts +++ b/src/plugin-sdk/entrypoints.ts @@ -35,10 +35,7 @@ export const deprecatedBarrelPluginSdkEntrypoints = pluginSdkSubpaths.filter((en // Transitional compatibility/helper surfaces owned by their matching bundled plugin. // Cross-owner extension imports are blocked by the package contract guardrails. -export const reservedBundledPluginSdkEntrypoints = [ - "codex-mcp-projection", - "codex-native-task-runtime", -] as const; +export const reservedBundledPluginSdkEntrypoints = ["codex-mcp-projection"] as const; // Supported SDK facades backed by bundled plugins. These are intentionally public // until they move to generic, plugin-neutral contracts. diff --git a/src/plugins/contracts/plugin-sdk-root-alias.test.ts b/src/plugins/contracts/plugin-sdk-root-alias.test.ts index 7fc7de3fa7d9..90a4bab0b24a 100644 --- a/src/plugins/contracts/plugin-sdk-root-alias.test.ts +++ b/src/plugins/contracts/plugin-sdk-root-alias.test.ts @@ -445,16 +445,16 @@ describe("plugin-sdk root alias", () => { it("keeps non-QA private local-only plugin-sdk subpaths out of the CJS root alias", () => { const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath))); - const sourceCodexNativeTaskRuntimePath = path.join( + const sourceCodexMcpProjectionPath = path.join( packageRoot, "src", "plugin-sdk", - "codex-native-task-runtime.ts", + "codex-mcp-projection.ts", ); const sourceQaRuntimePath = path.join(packageRoot, "src", "plugin-sdk", "qa-runtime.ts"); const lazyModule = loadRootAliasWithStubs({ - privateLocalOnlySubpaths: ["codex-native-task-runtime", "qa-runtime"], - existingPaths: [sourceCodexNativeTaskRuntimePath, sourceQaRuntimePath], + privateLocalOnlySubpaths: ["codex-mcp-projection", "qa-runtime"], + existingPaths: [sourceCodexMcpProjectionPath, sourceQaRuntimePath], monolithicExports: { slowHelper: (): string => "loaded", }, @@ -462,8 +462,8 @@ describe("plugin-sdk root alias", () => { expect((lazyModule.moduleExports.slowHelper as () => string)()).toBe("loaded"); const aliasMap = (lazyModule.createJitiOptions.at(-1)?.alias ?? {}) as Record; - expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/codex-native-task-runtime"); - expect(aliasMap).not.toHaveProperty("@openclaw/plugin-sdk/codex-native-task-runtime"); + expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/codex-mcp-projection"); + expect(aliasMap).not.toHaveProperty("@openclaw/plugin-sdk/codex-mcp-projection"); expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/qa-runtime"); }); diff --git a/src/plugins/sdk-alias.test.ts b/src/plugins/sdk-alias.test.ts index c7536577bdf3..d6f80e5c5815 100644 --- a/src/plugins/sdk-alias.test.ts +++ b/src/plugins/sdk-alias.test.ts @@ -653,7 +653,7 @@ describe("plugin sdk alias helpers", () => { expect(subpaths).toEqual(["core", "qa-channel", "qa-channel-protocol", "qa-lab", "qa-runtime"]); }); - it("adds the non-QA private Codex task runtime subpath only for trusted Codex plugins", () => { + it("adds non-QA private Codex helper subpaths only for trusted Codex plugins", () => { const fixture = createPluginSdkAliasFixture({ packageExports: { "./plugin-sdk/core": { default: "./dist/plugin-sdk/core.js" }, @@ -664,13 +664,13 @@ describe("plugin sdk alias helpers", () => { { force: true }, ); fs.writeFileSync( - path.join(fixture.root, "src", "plugin-sdk", "codex-native-task-runtime.ts"), - "export const codexNativeTaskRuntime = true;\n", + path.join(fixture.root, "src", "plugin-sdk", "codex-mcp-projection.ts"), + "export const codexMcpProjection = true;\n", "utf-8", ); fs.writeFileSync( - path.join(fixture.root, "src", "plugin-sdk", "codex-mcp-projection.ts"), - "export const codexMcpProjection = true;\n", + path.join(fixture.root, "src", "plugin-sdk", "codex-native-task-runtime.ts"), + "export const codexNativeTaskRuntime = true;\n", "utf-8", ); fs.writeFileSync( @@ -927,31 +927,31 @@ describe("plugin sdk alias helpers", () => { }, }); const sourceRootAlias = path.join(fixture.root, "src", "plugin-sdk", "root-alias.cjs"); - const sourceCodexNativeTaskRuntimePath = path.join( - fixture.root, - "src", - "plugin-sdk", - "codex-native-task-runtime.ts", - ); const sourceCodexMcpProjectionPath = path.join( fixture.root, "src", "plugin-sdk", "codex-mcp-projection.ts", ); - const distRootAlias = path.join(fixture.root, "dist", "plugin-sdk", "root-alias.cjs"); - const distCodexNativeTaskRuntimePath = path.join( + const sourceCodexNativeTaskRuntimePath = path.join( fixture.root, - "dist", + "src", "plugin-sdk", - "codex-native-task-runtime.js", + "codex-native-task-runtime.ts", ); + const distRootAlias = path.join(fixture.root, "dist", "plugin-sdk", "root-alias.cjs"); const distCodexMcpProjectionPath = path.join( fixture.root, "dist", "plugin-sdk", "codex-mcp-projection.js", ); + const distCodexNativeTaskRuntimePath = path.join( + fixture.root, + "dist", + "plugin-sdk", + "codex-native-task-runtime.js", + ); const sourceQaRuntimePath = path.join(fixture.root, "src", "plugin-sdk", "qa-runtime.ts"); fs.writeFileSync(sourceRootAlias, "module.exports = {};\n", "utf-8"); fs.writeFileSync(distRootAlias, "module.exports = {};\n", "utf-8"); @@ -959,18 +959,13 @@ describe("plugin sdk alias helpers", () => { path.join(fixture.root, "scripts", "lib", "plugin-sdk-private-local-only-subpaths.json"), { force: true }, ); - fs.writeFileSync( - sourceCodexNativeTaskRuntimePath, - "export const codexNativeTaskRuntime = true;\n", - "utf-8", - ); fs.writeFileSync( sourceCodexMcpProjectionPath, "export const codexMcpProjection = true;\n", "utf-8", ); fs.writeFileSync( - distCodexNativeTaskRuntimePath, + sourceCodexNativeTaskRuntimePath, "export const codexNativeTaskRuntime = true;\n", "utf-8", ); @@ -979,6 +974,11 @@ describe("plugin sdk alias helpers", () => { "export const codexMcpProjection = true;\n", "utf-8", ); + fs.writeFileSync( + distCodexNativeTaskRuntimePath, + "export const codexNativeTaskRuntime = true;\n", + "utf-8", + ); fs.writeFileSync(sourceQaRuntimePath, "export const qaRuntime = true;\n", "utf-8"); const sourcePluginEntry = writePluginEntry( fixture.root, @@ -1050,25 +1050,25 @@ describe("plugin sdk alias helpers", () => { expect(fs.realpathSync(aliases["openclaw/plugin-sdk"] ?? "")).toBe( fs.realpathSync(sourceRootAlias), ); - expect(fs.realpathSync(aliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? "")).toBe( - fs.realpathSync(sourceCodexNativeTaskRuntimePath), - ); expect(fs.realpathSync(aliases["openclaw/plugin-sdk/codex-mcp-projection"] ?? "")).toBe( fs.realpathSync(sourceCodexMcpProjectionPath), ); - expect( - fs.realpathSync(installedAliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? ""), - ).toBe(fs.realpathSync(distCodexNativeTaskRuntimePath)); + expect(fs.realpathSync(aliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? "")).toBe( + fs.realpathSync(sourceCodexNativeTaskRuntimePath), + ); expect( fs.realpathSync(installedAliases["openclaw/plugin-sdk/codex-mcp-projection"] ?? ""), ).toBe(fs.realpathSync(distCodexMcpProjectionPath)); + expect( + fs.realpathSync(installedAliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? ""), + ).toBe(fs.realpathSync(distCodexNativeTaskRuntimePath)); expect(aliases["openclaw/plugin-sdk/qa-runtime"]).toBeUndefined(); - expect(otherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); expect(otherAliases["openclaw/plugin-sdk/codex-mcp-projection"]).toBeUndefined(); - expect(installedOtherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); + expect(otherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); expect(installedOtherAliases["openclaw/plugin-sdk/codex-mcp-projection"]).toBeUndefined(); - expect(shadowCodexAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); + expect(installedOtherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); expect(shadowCodexAliases["openclaw/plugin-sdk/codex-mcp-projection"]).toBeUndefined(); + expect(shadowCodexAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); }); it("applies explicit dist resolution to plugin-sdk subpath aliases too", () => { diff --git a/src/tasks/agent-harness-task-runtime-scope.ts b/src/tasks/agent-harness-task-runtime-scope.ts new file mode 100644 index 000000000000..433769557847 --- /dev/null +++ b/src/tasks/agent-harness-task-runtime-scope.ts @@ -0,0 +1,51 @@ +import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; +import type { DeliveryContext } from "../utils/delivery-context.types.js"; + +const scopeRegistryKey = Symbol.for("openclaw.agentHarnessTaskRuntimeScope.registry"); + +type ScopeRegistry = { + hostIssuedScopes: WeakSet; +}; + +type GlobalWithScopeRegistry = typeof globalThis & { + [scopeRegistryKey]?: ScopeRegistry; +}; + +function getScopeRegistry(): ScopeRegistry { + const globalState = globalThis as GlobalWithScopeRegistry; + globalState[scopeRegistryKey] ??= { + hostIssuedScopes: new WeakSet(), + }; + return globalState[scopeRegistryKey]; +} + +export type AgentHarnessTaskRuntimeScope = { + readonly requesterSessionKey: string; + readonly requesterOrigin?: DeliveryContext; +}; + +export function createAgentHarnessTaskRuntimeScope(params: { + requesterSessionKey: string; + requesterOrigin?: DeliveryContext; +}): AgentHarnessTaskRuntimeScope { + const requesterSessionKey = params.requesterSessionKey.trim(); + if (!requesterSessionKey) { + throw new Error("Agent harness task runtime scope requires requesterSessionKey"); + } + const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); + const scope: AgentHarnessTaskRuntimeScope = { + requesterSessionKey, + ...(requesterOrigin ? { requesterOrigin } : {}), + }; + getScopeRegistry().hostIssuedScopes.add(scope); + return scope; +} + +export function assertAgentHarnessTaskRuntimeScope( + scope: AgentHarnessTaskRuntimeScope, +): AgentHarnessTaskRuntimeScope { + if (!getScopeRegistry().hostIssuedScopes.has(scope)) { + throw new Error("Agent harness task runtime requires a host-issued scope"); + } + return scope; +} diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts index 2c4b1ea0a53f..3d0ed7a080e4 100644 --- a/src/tasks/runtime-internal.ts +++ b/src/tasks/runtime-internal.ts @@ -29,3 +29,4 @@ export { setTaskRunDeliveryStatusByRunId, updateTaskNotifyPolicyById, } from "./task-registry.js"; +export type { TaskRecord } from "./task-registry.types.js"; diff --git a/src/tasks/task-registry.process-state.test.ts b/src/tasks/task-registry.process-state.test.ts new file mode 100644 index 000000000000..e65bab929c66 --- /dev/null +++ b/src/tasks/task-registry.process-state.test.ts @@ -0,0 +1,41 @@ +import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures"; +import { describe, expect, it } from "vitest"; + +describe("task registry process state", () => { + it("shares state across duplicate module instances", async () => { + const firstModule = await importFreshModule( + import.meta.url, + "./task-registry.process-state.js?scope=task-registry-state-a", + ); + const secondModule = await importFreshModule( + import.meta.url, + "./task-registry.process-state.js?scope=task-registry-state-b", + ); + const firstState = firstModule.getTaskRegistryProcessState(); + const secondState = secondModule.getTaskRegistryProcessState(); + + firstState.tasks.set("task-duplicate", { + taskId: "task-duplicate", + runtime: "subagent", + taskKind: "agent-harness", + requesterSessionKey: "agent:main:parent", + ownerKey: "agent:main:parent", + scopeKind: "session", + runId: "agent-harness:child-duplicate", + task: "Duplicate module task", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "silent", + createdAt: 1, + }); + + expect(secondState.tasks.get("task-duplicate")).toEqual( + expect.objectContaining({ + runtime: "subagent", + taskKind: "agent-harness", + runId: "agent-harness:child-duplicate", + }), + ); + firstState.tasks.clear(); + }); +}); diff --git a/src/tasks/task-registry.process-state.ts b/src/tasks/task-registry.process-state.ts new file mode 100644 index 000000000000..f59714eec7d1 --- /dev/null +++ b/src/tasks/task-registry.process-state.ts @@ -0,0 +1,29 @@ +import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js"; + +export type TaskRegistryProcessState = { + tasks: Map; + taskDeliveryStates: Map; + taskIdsByRunId: Map>; + taskIdsByOwnerKey: Map>; + taskIdsByParentFlowId: Map>; + taskIdsByRelatedSessionKey: Map>; + tasksWithPendingDelivery: Set; +}; + +const TASK_REGISTRY_PROCESS_STATE_KEY = Symbol.for("openclaw.taskRegistry.state"); + +export function getTaskRegistryProcessState(): TaskRegistryProcessState { + const globalState = globalThis as typeof globalThis & { + [TASK_REGISTRY_PROCESS_STATE_KEY]?: TaskRegistryProcessState; + }; + globalState[TASK_REGISTRY_PROCESS_STATE_KEY] ??= { + tasks: new Map(), + taskDeliveryStates: new Map(), + taskIdsByRunId: new Map>(), + taskIdsByOwnerKey: new Map>(), + taskIdsByParentFlowId: new Map>(), + taskIdsByRelatedSessionKey: new Map>(), + tasksWithPendingDelivery: new Set(), + }; + return globalState[TASK_REGISTRY_PROCESS_STATE_KEY]; +} diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 91986331f67b..806a11ec2478 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -29,6 +29,7 @@ import { updateFlowRecordByIdExpectedRevision, } from "./task-flow-runtime-internal.js"; import type { TaskRegistryControlRuntime } from "./task-registry-control.types.js"; +import { getTaskRegistryProcessState } from "./task-registry.process-state.js"; import { getTaskRegistryObservers, getTaskRegistryStore, @@ -54,13 +55,14 @@ import type { const log = createSubsystemLogger("tasks/registry"); const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; -const tasks = new Map(); -const taskDeliveryStates = new Map(); -const taskIdsByRunId = new Map>(); -const taskIdsByOwnerKey = new Map>(); -const taskIdsByParentFlowId = new Map>(); -const taskIdsByRelatedSessionKey = new Map>(); -const tasksWithPendingDelivery = new Set(); +const taskRegistryProcessState = getTaskRegistryProcessState(); +const tasks = taskRegistryProcessState.tasks; +const taskDeliveryStates = taskRegistryProcessState.taskDeliveryStates; +const taskIdsByRunId = taskRegistryProcessState.taskIdsByRunId; +const taskIdsByOwnerKey = taskRegistryProcessState.taskIdsByOwnerKey; +const taskIdsByParentFlowId = taskRegistryProcessState.taskIdsByParentFlowId; +const taskIdsByRelatedSessionKey = taskRegistryProcessState.taskIdsByRelatedSessionKey; +const tasksWithPendingDelivery = taskRegistryProcessState.tasksWithPendingDelivery; let listenerStarted = false; let listenerStop: (() => void) | null = null; let restoreAttempted = false; diff --git a/tsdown.config.ts b/tsdown.config.ts index 7e517d976100..425ead1324ff 100644 --- a/tsdown.config.ts +++ b/tsdown.config.ts @@ -309,8 +309,6 @@ function buildUnifiedDistEntries(): Record { ...dockerE2eHarnessEntries, // Internal compat artifact for the root-alias.cjs lazy loader. "plugin-sdk/compat": "src/plugin-sdk/compat.ts", - // Private bundled Codex helper for app-server native subagent task mirroring. - "plugin-sdk/codex-native-task-runtime": "src/plugin-sdk/codex-native-task-runtime.ts", // Private bundled Codex helper for app-server user MCP config projection. "plugin-sdk/codex-mcp-projection": "src/plugin-sdk/codex-mcp-projection.ts", ...Object.fromEntries(