From a6ee3dbbdd07a1c18ce6588c0622a4c1798b637d Mon Sep 17 00:00:00 2001 From: Yuval Dinodia <102706514+yetval@users.noreply.github.com> Date: Sun, 31 May 2026 18:24:59 +0200 Subject: [PATCH] fix(ios): update group chats in realtime Subscribe the iOS gateway chat transport to per-session transcript events so group chats update when other clients send messages. Constrain local user echo adoption to the optimistic row tied to the still-pending send run, so repeated same-content user messages from other clients append instead of replacing history. Fixes #80231. Co-authored-by: Yuval Dinodia --- .../Chat/IOSGatewayChatTransport.swift | 86 ++++++++----- .../Tests/IOSGatewayChatTransportTests.swift | 72 +++++++++++ .../OpenClawChatUI/ChatViewModel.swift | 66 +++++++++- .../OpenClawKitTests/ChatViewModelTests.swift | 120 ++++++++++++++++++ 4 files changed, 306 insertions(+), 38 deletions(-) diff --git a/apps/ios/Sources/Chat/IOSGatewayChatTransport.swift b/apps/ios/Sources/Chat/IOSGatewayChatTransport.swift index 3b5811a6674f..2ef849f17a89 100644 --- a/apps/ios/Sources/Chat/IOSGatewayChatTransport.swift +++ b/apps/ios/Sources/Chat/IOSGatewayChatTransport.swift @@ -5,7 +5,7 @@ import OpenClawProtocol import OSLog struct IOSGatewayChatTransport: OpenClawChatTransport { - private static let logger = Logger(subsystem: "ai.openclaw", category: "ios.chat.transport") + static let logger = Logger(subsystem: "ai.openclaw", category: "ios.chat.transport") static let defaultChatSendTimeoutMs = 30000 private let gateway: GatewayNodeSession @@ -167,8 +167,13 @@ struct IOSGatewayChatTransport: OpenClawChatTransport { } func setActiveSessionKey(_ sessionKey: String) async throws { - // Operator clients receive chat events without node-style subscriptions. - // (chat.subscribe is a node event, not an operator RPC method.) + struct Params: Codable { var key: String } + let data = try JSONEncoder().encode(Params(key: sessionKey)) + let json = String(data: data, encoding: .utf8) + _ = try await self.gateway.request( + method: "sessions.messages.subscribe", + paramsJSON: json, + timeoutSeconds: 10) } func resetSession(sessionKey: String) async throws { @@ -257,35 +262,8 @@ struct IOSGatewayChatTransport: OpenClawChatTransport { let stream = await self.gateway.subscribeServerEvents() for await evt in stream { if Task.isCancelled { return } - switch evt.event { - case "tick": - continuation.yield(.tick) - case "seqGap": - continuation.yield(.seqGap) - case "health": - guard let payload = evt.payload else { break } - let ok = (try? GatewayPayloadDecoding.decode( - payload, - as: OpenClawGatewayHealthOK.self))?.ok ?? true - continuation.yield(.health(ok: ok)) - case "chat": - guard let payload = evt.payload else { break } - if let chatPayload = try? GatewayPayloadDecoding.decode( - payload, - as: OpenClawChatEventPayload.self) - { - continuation.yield(.chat(chatPayload)) - } - case "agent": - guard let payload = evt.payload else { break } - if let agentPayload = try? GatewayPayloadDecoding.decode( - payload, - as: OpenClawAgentEventPayload.self) - { - continuation.yield(.agent(agentPayload)) - } - default: - break + if let mapped = Self.mapEventFrame(evt) { + continuation.yield(mapped) } } } @@ -295,4 +273,48 @@ struct IOSGatewayChatTransport: OpenClawChatTransport { } } } + + static func mapEventFrame(_ evt: EventFrame) -> OpenClawChatTransportEvent? { + switch evt.event { + case "tick": + return .tick + case "seqGap": + return .seqGap + case "health": + guard let payload = evt.payload else { return nil } + let ok = (try? GatewayPayloadDecoding.decode( + payload, + as: OpenClawGatewayHealthOK.self))?.ok ?? true + return .health(ok: ok) + case "chat": + guard let payload = evt.payload else { return nil } + guard let chatPayload = try? GatewayPayloadDecoding.decode( + payload, + as: OpenClawChatEventPayload.self) + else { + return nil + } + return .chat(chatPayload) + case "session.message": + guard let payload = evt.payload else { return nil } + guard let message = try? GatewayPayloadDecoding.decode( + payload, + as: OpenClawSessionMessageEventPayload.self) + else { + return nil + } + return .sessionMessage(message) + case "agent": + guard let payload = evt.payload else { return nil } + guard let agentPayload = try? GatewayPayloadDecoding.decode( + payload, + as: OpenClawAgentEventPayload.self) + else { + return nil + } + return .agent(agentPayload) + default: + return nil + } + } } diff --git a/apps/ios/Tests/IOSGatewayChatTransportTests.swift b/apps/ios/Tests/IOSGatewayChatTransportTests.swift index 5b895970601a..3ae45ec6acbd 100644 --- a/apps/ios/Tests/IOSGatewayChatTransportTests.swift +++ b/apps/ios/Tests/IOSGatewayChatTransportTests.swift @@ -1,5 +1,6 @@ import Foundation import OpenClawKit +import OpenClawProtocol import Testing @testable import OpenClaw @@ -84,5 +85,76 @@ import Testing try await transport.resetSession(sessionKey: "node-test") Issue.record("Expected resetSession to throw when gateway not connected") } catch {} + + do { + try await transport.setActiveSessionKey("node-test") + Issue.record("Expected setActiveSessionKey to throw when gateway not connected") + } catch {} + } + + @Test func mapsSessionMessageEventToSessionMessage() { + let payload = AnyCodable([ + "sessionKey": AnyCodable("agent:main:main"), + "messageId": AnyCodable("msg-1"), + "messageSeq": AnyCodable(7), + "message": AnyCodable([ + "role": AnyCodable("assistant"), + "content": AnyCodable([ + AnyCodable([ + "type": AnyCodable("text"), + "text": AnyCodable("agent reply"), + ]), + ]), + "timestamp": AnyCodable(1234.5), + ]), + ]) + let frame = EventFrame( + type: "event", + event: "session.message", + payload: payload, + seq: 1, + stateversion: nil) + let mapped = IOSGatewayChatTransport.mapEventFrame(frame) + + switch mapped { + case let .sessionMessage(message): + #expect(message.sessionKey == "agent:main:main") + #expect(message.messageId == "msg-1") + #expect(message.messageSeq == 7) + #expect(message.message?.role == "assistant") + #expect(message.message?.content.first?.text == "agent reply") + default: + Issue.record("expected .sessionMessage from session.message event, got \(String(describing: mapped))") + } + } + + @Test func mapsChatEventToChat() { + let payload = AnyCodable([ + "runId": AnyCodable("run-1"), + "sessionKey": AnyCodable("main"), + "state": AnyCodable("final"), + ]) + let frame = EventFrame(type: "event", event: "chat", payload: payload, seq: 1, stateversion: nil) + let mapped = IOSGatewayChatTransport.mapEventFrame(frame) + + switch mapped { + case let .chat(chat): + #expect(chat.runId == "run-1") + #expect(chat.sessionKey == "main") + #expect(chat.state == "final") + default: + Issue.record("expected .chat from chat event, got \(String(describing: mapped))") + } + } + + @Test func mapsUnknownEventToNil() { + let frame = EventFrame( + type: "event", + event: "unknown", + payload: AnyCodable(["a": AnyCodable(1)]), + seq: 1, + stateversion: nil) + let mapped = IOSGatewayChatTransport.mapEventFrame(frame) + #expect(mapped == nil) } } diff --git a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift index 3de9a6e22314..18948e24a6fd 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift @@ -44,6 +44,8 @@ public final class OpenClawChatViewModel { didSet { self.pendingRunCount = self.pendingRuns.count } } + private var pendingLocalUserEchoMessageIDsByRunID: [String: UUID] = [:] + @ObservationIgnored private nonisolated(unsafe) var pendingRunTimeoutTasks: [String: Task] = [:] private let pendingRunTimeoutMs: UInt64 = 120_000 @@ -279,6 +281,7 @@ public final class OpenClawChatViewModel { self.messages = Self.reconcileMessageIDs( previous: self.messages, incoming: Self.decodeMessages(payload.messages ?? [])) + self.prunePendingLocalUserEchoMessageIDs() self.sessionId = payload.sessionId if !self.prefersExplicitThinkingLevel, let level = Self.normalizedThinkingLevel(payload.thinkingLevel) @@ -393,6 +396,43 @@ public final class OpenClawChatViewModel { return [role, toolCallId, toolName, contentFingerprint].joined(separator: "|") } + private func prunePendingLocalUserEchoMessageIDs() { + guard !self.pendingLocalUserEchoMessageIDsByRunID.isEmpty else { return } + let visibleMessageIDs = Set(self.messages.map(\.id)) + self.pendingLocalUserEchoMessageIDsByRunID = self.pendingLocalUserEchoMessageIDsByRunID.filter { + self.pendingRuns.contains($0.key) && visibleMessageIDs.contains($0.value) + } + } + + private func adoptPendingLocalUserEcho(incoming: OpenClawChatMessage) -> Bool { + guard let incomingKey = Self.userRefreshIdentityKey(for: incoming) else { return false } + guard let matchIndex = self.messages.lastIndex(where: { existing in + self.pendingLocalUserEchoMessageIDsByRunID.values.contains(existing.id) + && Self.userRefreshIdentityKey(for: existing) == incomingKey + }) else { + return false + } + + let existing = self.messages[matchIndex] + self.pendingLocalUserEchoMessageIDsByRunID = self.pendingLocalUserEchoMessageIDsByRunID.filter { + $0.value != existing.id + } + var updated = self.messages + updated[matchIndex] = OpenClawChatMessage( + id: existing.id, + role: incoming.role, + content: incoming.content, + timestamp: incoming.timestamp ?? existing.timestamp, + toolCallId: incoming.toolCallId, + toolName: incoming.toolName, + usage: incoming.usage, + stopReason: incoming.stopReason, + errorMessage: incoming.errorMessage) + self.messages = Self.dedupeMessages(updated) + self.prunePendingLocalUserEchoMessageIDs() + return true + } + private static func reconcileMessageIDs( previous: [OpenClawChatMessage], incoming: [OpenClawChatMessage]) -> [OpenClawChatMessage] @@ -619,12 +659,14 @@ public final class OpenClawChatViewModel { arguments: nil)) } let userMessageTimestamp = Date().timeIntervalSince1970 * 1000 + let userMessageID = UUID() self.messages.append( OpenClawChatMessage( - id: UUID(), + id: userMessageID, role: "user", content: userContent, timestamp: userMessageTimestamp)) + self.pendingLocalUserEchoMessageIDsByRunID[runId] = userMessageID // Clear input immediately for responsive UX (before network await) self.input = "" @@ -645,8 +687,10 @@ public final class OpenClawChatViewModel { "chat.ui transport send accepted sessionKey=\(sessionKey) " + "localRunId=\(runId) remoteRunId=\(response.runId)") if response.runId != runId { + let pendingUserMessageID = self.pendingLocalUserEchoMessageIDsByRunID.removeValue(forKey: runId) self.clearPendingRun(runId) self.pendingRuns.insert(response.runId) + self.pendingLocalUserEchoMessageIDsByRunID[response.runId] = pendingUserMessageID self.armPendingRunTimeout(runId: response.runId) } await self.refreshHistoryAfterRun() @@ -664,6 +708,7 @@ public final class OpenClawChatViewModel { userMessageTimestamp: userMessageTimestamp) } } catch { + self.pendingLocalUserEchoMessageIDsByRunID[runId] = nil self.clearPendingRun(runId) self.errorText = error.localizedDescription self.logDiagnostic( @@ -747,6 +792,7 @@ public final class OpenClawChatViewModel { self.onSessionChanged?(next) self.modelSelectionID = Self.defaultModelSelectionID self.messages = [] + self.pendingLocalUserEchoMessageIDsByRunID.removeAll() self.sessionId = nil self.pendingToolCallsById = [:] self.streamingAssistantText = nil @@ -1262,14 +1308,19 @@ public final class OpenClawChatViewModel { } guard let message = payload.message else { return } - guard message.role.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() == "user" else { - return - } - if self.pendingRunCount > 0 { + + let sanitized = Self.stripInboundMetadata(from: message) + + // The active client also receives the gateway's echo of the user turn it + // just sent. performSend already appended an optimistic row carrying a + // local client timestamp, while the echo carries a server timestamp, so + // the timestamp-keyed identity/dedupe paths below never collapse them. + // Adopt the server record only onto a still-visible row created by this + // client's pending send; same-content user turns from other clients must append. + if self.adoptPendingLocalUserEcho(incoming: sanitized) { return } - let sanitized = Self.stripInboundMetadata(from: message) let reconciled = Self.reconcileMessageIDs(previous: self.messages, incoming: self.messages + [sanitized]) self.messages = Self.dedupeMessages(reconciled) } @@ -1565,6 +1616,7 @@ public final class OpenClawChatViewModel { self.messages = Self.reconcileRunRefreshMessages( previous: self.messages, incoming: Self.decodeMessages(payload.messages ?? [])) + self.prunePendingLocalUserEchoMessageIDs() self.sessionId = payload.sessionId if !self.prefersExplicitThinkingLevel, let level = Self.normalizedThinkingLevel(payload.thinkingLevel) @@ -1597,6 +1649,7 @@ public final class OpenClawChatViewModel { private func clearPendingRun(_ runId: String) { let wasPending = self.pendingRuns.contains(runId) self.pendingRuns.remove(runId) + self.pendingLocalUserEchoMessageIDsByRunID[runId] = nil self.pendingRunTimeoutTasks[runId]?.cancel() self.pendingRunTimeoutTasks[runId] = nil if wasPending { @@ -1613,6 +1666,7 @@ public final class OpenClawChatViewModel { } self.pendingRunTimeoutTasks.removeAll() self.pendingRuns.removeAll() + self.pendingLocalUserEchoMessageIDsByRunID.removeAll() if let reason, !reason.isEmpty { self.errorText = reason for runId in runIds { diff --git a/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift b/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift index a100d602bf8d..7c1d1f5279b5 100644 --- a/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift +++ b/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift @@ -1121,6 +1121,126 @@ extension TestChatTransportState { #expect(await MainActor.run { vm.messages.isEmpty }) } + @Test func appendsExternalSessionAssistantMessageWhileRunPending() async throws { + let now = Date().timeIntervalSince1970 * 1000 + let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()]) + + await MainActor.run { vm.load() } + try await waitUntil("bootstrap history loaded") { await MainActor.run { vm.messages.isEmpty } } + + await sendUserMessage(vm, text: "ping") + try await waitUntil("local run pending") { await MainActor.run { vm.pendingRunCount == 1 } } + + transport.emit( + .sessionMessage( + OpenClawSessionMessageEventPayload( + sessionKey: "agent:main:main", + message: OpenClawChatMessage( + role: "assistant", + content: [ + OpenClawChatMessageContent( + type: "text", + text: "agent reply", + mimeType: nil, + fileName: nil, + content: nil), + ], + timestamp: now + 1), + messageId: "msg-assistant-1", + messageSeq: 2))) + + try await waitUntil("assistant transcript visible while pending") { + await MainActor.run { + vm.messages.contains(where: { msg in + msg.role == "assistant" && + msg.content.first?.text == "agent reply" + }) + } + } + } + + @Test func dedupesGatewayEchoOfLocalUserMessage() async throws { + let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()]) + + await MainActor.run { vm.load() } + try await waitUntil("bootstrap history loaded") { await MainActor.run { vm.messages.isEmpty } } + + await sendUserMessage(vm, text: "echo me") + try await waitUntil("optimistic user message visible") { + await MainActor.run { + vm.messages.count == 1 && vm.messages.first?.content.first?.text == "echo me" + } + } + + // Gateway echoes the same user turn over the session-message stream with a + // server-assigned timestamp that differs from the optimistic local one. + transport.emit( + .sessionMessage( + OpenClawSessionMessageEventPayload( + sessionKey: "agent:main:main", + message: OpenClawChatMessage( + role: "user", + content: [ + OpenClawChatMessageContent( + type: "text", + text: "echo me", + mimeType: nil, + fileName: nil, + content: nil), + ], + timestamp: Date().timeIntervalSince1970 * 1000 + 5_000), + messageId: "srv-echo-1", + messageSeq: 1))) + + try await Task.sleep(nanoseconds: 50_000_000) + #expect(await MainActor.run { + vm.messages.filter { msg in + msg.role == "user" && msg.content.first?.text == "echo me" + }.count == 1 + }) + } + + @Test func appendsSameContentUserTranscriptWhenItIsNotLocalEcho() async throws { + let now = Date().timeIntervalSince1970 * 1000 + let (transport, vm) = await makeViewModel( + historyResponses: [ + historyPayload(messages: [ + chatTextMessage(role: "user", text: "repeat", timestamp: now), + ]), + ]) + + await MainActor.run { vm.load() } + try await waitUntil("bootstrap history loaded") { + await MainActor.run { vm.messages.count == 1 } + } + + transport.emit( + .sessionMessage( + OpenClawSessionMessageEventPayload( + sessionKey: "agent:main:main", + message: OpenClawChatMessage( + role: "user", + content: [ + OpenClawChatMessageContent( + type: "text", + text: "repeat", + mimeType: nil, + fileName: nil, + content: nil), + ], + timestamp: now + 1_000), + messageId: "msg-repeat-2", + messageSeq: 2))) + + try await waitUntil("repeated user transcript appended") { + await MainActor.run { + vm.messages.filter { msg in + msg.role == "user" && msg.content.first?.text == "repeat" + }.count == 2 + } + } + } + @Test func ignoresExternalSessionUserMessageForOtherSession() async throws { let now = Date().timeIntervalSince1970 * 1000 let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()])