fix(ios): update group chats in realtime

Subscribe the iOS gateway chat transport to per-session transcript events so group chats update when other clients send messages. Constrain local user echo adoption to the optimistic row tied to the still-pending send run, so repeated same-content user messages from other clients append instead of replacing history.

Fixes #80231.

Co-authored-by: Yuval Dinodia <yetvald@gmail.com>
This commit is contained in:
Yuval Dinodia
2026-05-31 18:24:59 +02:00
committed by GitHub
parent 4ab2eb45d0
commit a6ee3dbbdd
4 changed files with 306 additions and 38 deletions

View File

@@ -5,7 +5,7 @@ import OpenClawProtocol
import OSLog
struct IOSGatewayChatTransport: OpenClawChatTransport {
private static let logger = Logger(subsystem: "ai.openclaw", category: "ios.chat.transport")
static let logger = Logger(subsystem: "ai.openclaw", category: "ios.chat.transport")
static let defaultChatSendTimeoutMs = 30000
private let gateway: GatewayNodeSession
@@ -167,8 +167,13 @@ struct IOSGatewayChatTransport: OpenClawChatTransport {
}
func setActiveSessionKey(_ sessionKey: String) async throws {
// Operator clients receive chat events without node-style subscriptions.
// (chat.subscribe is a node event, not an operator RPC method.)
struct Params: Codable { var key: String }
let data = try JSONEncoder().encode(Params(key: sessionKey))
let json = String(data: data, encoding: .utf8)
_ = try await self.gateway.request(
method: "sessions.messages.subscribe",
paramsJSON: json,
timeoutSeconds: 10)
}
func resetSession(sessionKey: String) async throws {
@@ -257,35 +262,8 @@ struct IOSGatewayChatTransport: OpenClawChatTransport {
let stream = await self.gateway.subscribeServerEvents()
for await evt in stream {
if Task.isCancelled { return }
switch evt.event {
case "tick":
continuation.yield(.tick)
case "seqGap":
continuation.yield(.seqGap)
case "health":
guard let payload = evt.payload else { break }
let ok = (try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawGatewayHealthOK.self))?.ok ?? true
continuation.yield(.health(ok: ok))
case "chat":
guard let payload = evt.payload else { break }
if let chatPayload = try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawChatEventPayload.self)
{
continuation.yield(.chat(chatPayload))
}
case "agent":
guard let payload = evt.payload else { break }
if let agentPayload = try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawAgentEventPayload.self)
{
continuation.yield(.agent(agentPayload))
}
default:
break
if let mapped = Self.mapEventFrame(evt) {
continuation.yield(mapped)
}
}
}
@@ -295,4 +273,48 @@ struct IOSGatewayChatTransport: OpenClawChatTransport {
}
}
}
static func mapEventFrame(_ evt: EventFrame) -> OpenClawChatTransportEvent? {
switch evt.event {
case "tick":
return .tick
case "seqGap":
return .seqGap
case "health":
guard let payload = evt.payload else { return nil }
let ok = (try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawGatewayHealthOK.self))?.ok ?? true
return .health(ok: ok)
case "chat":
guard let payload = evt.payload else { return nil }
guard let chatPayload = try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawChatEventPayload.self)
else {
return nil
}
return .chat(chatPayload)
case "session.message":
guard let payload = evt.payload else { return nil }
guard let message = try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawSessionMessageEventPayload.self)
else {
return nil
}
return .sessionMessage(message)
case "agent":
guard let payload = evt.payload else { return nil }
guard let agentPayload = try? GatewayPayloadDecoding.decode(
payload,
as: OpenClawAgentEventPayload.self)
else {
return nil
}
return .agent(agentPayload)
default:
return nil
}
}
}

View File

@@ -1,5 +1,6 @@
import Foundation
import OpenClawKit
import OpenClawProtocol
import Testing
@testable import OpenClaw
@@ -84,5 +85,76 @@ import Testing
try await transport.resetSession(sessionKey: "node-test")
Issue.record("Expected resetSession to throw when gateway not connected")
} catch {}
do {
try await transport.setActiveSessionKey("node-test")
Issue.record("Expected setActiveSessionKey to throw when gateway not connected")
} catch {}
}
@Test func mapsSessionMessageEventToSessionMessage() {
let payload = AnyCodable([
"sessionKey": AnyCodable("agent:main:main"),
"messageId": AnyCodable("msg-1"),
"messageSeq": AnyCodable(7),
"message": AnyCodable([
"role": AnyCodable("assistant"),
"content": AnyCodable([
AnyCodable([
"type": AnyCodable("text"),
"text": AnyCodable("agent reply"),
]),
]),
"timestamp": AnyCodable(1234.5),
]),
])
let frame = EventFrame(
type: "event",
event: "session.message",
payload: payload,
seq: 1,
stateversion: nil)
let mapped = IOSGatewayChatTransport.mapEventFrame(frame)
switch mapped {
case let .sessionMessage(message):
#expect(message.sessionKey == "agent:main:main")
#expect(message.messageId == "msg-1")
#expect(message.messageSeq == 7)
#expect(message.message?.role == "assistant")
#expect(message.message?.content.first?.text == "agent reply")
default:
Issue.record("expected .sessionMessage from session.message event, got \(String(describing: mapped))")
}
}
@Test func mapsChatEventToChat() {
let payload = AnyCodable([
"runId": AnyCodable("run-1"),
"sessionKey": AnyCodable("main"),
"state": AnyCodable("final"),
])
let frame = EventFrame(type: "event", event: "chat", payload: payload, seq: 1, stateversion: nil)
let mapped = IOSGatewayChatTransport.mapEventFrame(frame)
switch mapped {
case let .chat(chat):
#expect(chat.runId == "run-1")
#expect(chat.sessionKey == "main")
#expect(chat.state == "final")
default:
Issue.record("expected .chat from chat event, got \(String(describing: mapped))")
}
}
@Test func mapsUnknownEventToNil() {
let frame = EventFrame(
type: "event",
event: "unknown",
payload: AnyCodable(["a": AnyCodable(1)]),
seq: 1,
stateversion: nil)
let mapped = IOSGatewayChatTransport.mapEventFrame(frame)
#expect(mapped == nil)
}
}

View File

@@ -44,6 +44,8 @@ public final class OpenClawChatViewModel {
didSet { self.pendingRunCount = self.pendingRuns.count }
}
private var pendingLocalUserEchoMessageIDsByRunID: [String: UUID] = [:]
@ObservationIgnored
private nonisolated(unsafe) var pendingRunTimeoutTasks: [String: Task<Void, Never>] = [:]
private let pendingRunTimeoutMs: UInt64 = 120_000
@@ -279,6 +281,7 @@ public final class OpenClawChatViewModel {
self.messages = Self.reconcileMessageIDs(
previous: self.messages,
incoming: Self.decodeMessages(payload.messages ?? []))
self.prunePendingLocalUserEchoMessageIDs()
self.sessionId = payload.sessionId
if !self.prefersExplicitThinkingLevel,
let level = Self.normalizedThinkingLevel(payload.thinkingLevel)
@@ -393,6 +396,43 @@ public final class OpenClawChatViewModel {
return [role, toolCallId, toolName, contentFingerprint].joined(separator: "|")
}
private func prunePendingLocalUserEchoMessageIDs() {
guard !self.pendingLocalUserEchoMessageIDsByRunID.isEmpty else { return }
let visibleMessageIDs = Set(self.messages.map(\.id))
self.pendingLocalUserEchoMessageIDsByRunID = self.pendingLocalUserEchoMessageIDsByRunID.filter {
self.pendingRuns.contains($0.key) && visibleMessageIDs.contains($0.value)
}
}
private func adoptPendingLocalUserEcho(incoming: OpenClawChatMessage) -> Bool {
guard let incomingKey = Self.userRefreshIdentityKey(for: incoming) else { return false }
guard let matchIndex = self.messages.lastIndex(where: { existing in
self.pendingLocalUserEchoMessageIDsByRunID.values.contains(existing.id)
&& Self.userRefreshIdentityKey(for: existing) == incomingKey
}) else {
return false
}
let existing = self.messages[matchIndex]
self.pendingLocalUserEchoMessageIDsByRunID = self.pendingLocalUserEchoMessageIDsByRunID.filter {
$0.value != existing.id
}
var updated = self.messages
updated[matchIndex] = OpenClawChatMessage(
id: existing.id,
role: incoming.role,
content: incoming.content,
timestamp: incoming.timestamp ?? existing.timestamp,
toolCallId: incoming.toolCallId,
toolName: incoming.toolName,
usage: incoming.usage,
stopReason: incoming.stopReason,
errorMessage: incoming.errorMessage)
self.messages = Self.dedupeMessages(updated)
self.prunePendingLocalUserEchoMessageIDs()
return true
}
private static func reconcileMessageIDs(
previous: [OpenClawChatMessage],
incoming: [OpenClawChatMessage]) -> [OpenClawChatMessage]
@@ -619,12 +659,14 @@ public final class OpenClawChatViewModel {
arguments: nil))
}
let userMessageTimestamp = Date().timeIntervalSince1970 * 1000
let userMessageID = UUID()
self.messages.append(
OpenClawChatMessage(
id: UUID(),
id: userMessageID,
role: "user",
content: userContent,
timestamp: userMessageTimestamp))
self.pendingLocalUserEchoMessageIDsByRunID[runId] = userMessageID
// Clear input immediately for responsive UX (before network await)
self.input = ""
@@ -645,8 +687,10 @@ public final class OpenClawChatViewModel {
"chat.ui transport send accepted sessionKey=\(sessionKey) "
+ "localRunId=\(runId) remoteRunId=\(response.runId)")
if response.runId != runId {
let pendingUserMessageID = self.pendingLocalUserEchoMessageIDsByRunID.removeValue(forKey: runId)
self.clearPendingRun(runId)
self.pendingRuns.insert(response.runId)
self.pendingLocalUserEchoMessageIDsByRunID[response.runId] = pendingUserMessageID
self.armPendingRunTimeout(runId: response.runId)
}
await self.refreshHistoryAfterRun()
@@ -664,6 +708,7 @@ public final class OpenClawChatViewModel {
userMessageTimestamp: userMessageTimestamp)
}
} catch {
self.pendingLocalUserEchoMessageIDsByRunID[runId] = nil
self.clearPendingRun(runId)
self.errorText = error.localizedDescription
self.logDiagnostic(
@@ -747,6 +792,7 @@ public final class OpenClawChatViewModel {
self.onSessionChanged?(next)
self.modelSelectionID = Self.defaultModelSelectionID
self.messages = []
self.pendingLocalUserEchoMessageIDsByRunID.removeAll()
self.sessionId = nil
self.pendingToolCallsById = [:]
self.streamingAssistantText = nil
@@ -1262,14 +1308,19 @@ public final class OpenClawChatViewModel {
}
guard let message = payload.message else { return }
guard message.role.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() == "user" else {
return
}
if self.pendingRunCount > 0 {
let sanitized = Self.stripInboundMetadata(from: message)
// The active client also receives the gateway's echo of the user turn it
// just sent. performSend already appended an optimistic row carrying a
// local client timestamp, while the echo carries a server timestamp, so
// the timestamp-keyed identity/dedupe paths below never collapse them.
// Adopt the server record only onto a still-visible row created by this
// client's pending send; same-content user turns from other clients must append.
if self.adoptPendingLocalUserEcho(incoming: sanitized) {
return
}
let sanitized = Self.stripInboundMetadata(from: message)
let reconciled = Self.reconcileMessageIDs(previous: self.messages, incoming: self.messages + [sanitized])
self.messages = Self.dedupeMessages(reconciled)
}
@@ -1565,6 +1616,7 @@ public final class OpenClawChatViewModel {
self.messages = Self.reconcileRunRefreshMessages(
previous: self.messages,
incoming: Self.decodeMessages(payload.messages ?? []))
self.prunePendingLocalUserEchoMessageIDs()
self.sessionId = payload.sessionId
if !self.prefersExplicitThinkingLevel,
let level = Self.normalizedThinkingLevel(payload.thinkingLevel)
@@ -1597,6 +1649,7 @@ public final class OpenClawChatViewModel {
private func clearPendingRun(_ runId: String) {
let wasPending = self.pendingRuns.contains(runId)
self.pendingRuns.remove(runId)
self.pendingLocalUserEchoMessageIDsByRunID[runId] = nil
self.pendingRunTimeoutTasks[runId]?.cancel()
self.pendingRunTimeoutTasks[runId] = nil
if wasPending {
@@ -1613,6 +1666,7 @@ public final class OpenClawChatViewModel {
}
self.pendingRunTimeoutTasks.removeAll()
self.pendingRuns.removeAll()
self.pendingLocalUserEchoMessageIDsByRunID.removeAll()
if let reason, !reason.isEmpty {
self.errorText = reason
for runId in runIds {

View File

@@ -1121,6 +1121,126 @@ extension TestChatTransportState {
#expect(await MainActor.run { vm.messages.isEmpty })
}
@Test func appendsExternalSessionAssistantMessageWhileRunPending() async throws {
let now = Date().timeIntervalSince1970 * 1000
let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()])
await MainActor.run { vm.load() }
try await waitUntil("bootstrap history loaded") { await MainActor.run { vm.messages.isEmpty } }
await sendUserMessage(vm, text: "ping")
try await waitUntil("local run pending") { await MainActor.run { vm.pendingRunCount == 1 } }
transport.emit(
.sessionMessage(
OpenClawSessionMessageEventPayload(
sessionKey: "agent:main:main",
message: OpenClawChatMessage(
role: "assistant",
content: [
OpenClawChatMessageContent(
type: "text",
text: "agent reply",
mimeType: nil,
fileName: nil,
content: nil),
],
timestamp: now + 1),
messageId: "msg-assistant-1",
messageSeq: 2)))
try await waitUntil("assistant transcript visible while pending") {
await MainActor.run {
vm.messages.contains(where: { msg in
msg.role == "assistant" &&
msg.content.first?.text == "agent reply"
})
}
}
}
@Test func dedupesGatewayEchoOfLocalUserMessage() async throws {
let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()])
await MainActor.run { vm.load() }
try await waitUntil("bootstrap history loaded") { await MainActor.run { vm.messages.isEmpty } }
await sendUserMessage(vm, text: "echo me")
try await waitUntil("optimistic user message visible") {
await MainActor.run {
vm.messages.count == 1 && vm.messages.first?.content.first?.text == "echo me"
}
}
// Gateway echoes the same user turn over the session-message stream with a
// server-assigned timestamp that differs from the optimistic local one.
transport.emit(
.sessionMessage(
OpenClawSessionMessageEventPayload(
sessionKey: "agent:main:main",
message: OpenClawChatMessage(
role: "user",
content: [
OpenClawChatMessageContent(
type: "text",
text: "echo me",
mimeType: nil,
fileName: nil,
content: nil),
],
timestamp: Date().timeIntervalSince1970 * 1000 + 5_000),
messageId: "srv-echo-1",
messageSeq: 1)))
try await Task.sleep(nanoseconds: 50_000_000)
#expect(await MainActor.run {
vm.messages.filter { msg in
msg.role == "user" && msg.content.first?.text == "echo me"
}.count == 1
})
}
@Test func appendsSameContentUserTranscriptWhenItIsNotLocalEcho() async throws {
let now = Date().timeIntervalSince1970 * 1000
let (transport, vm) = await makeViewModel(
historyResponses: [
historyPayload(messages: [
chatTextMessage(role: "user", text: "repeat", timestamp: now),
]),
])
await MainActor.run { vm.load() }
try await waitUntil("bootstrap history loaded") {
await MainActor.run { vm.messages.count == 1 }
}
transport.emit(
.sessionMessage(
OpenClawSessionMessageEventPayload(
sessionKey: "agent:main:main",
message: OpenClawChatMessage(
role: "user",
content: [
OpenClawChatMessageContent(
type: "text",
text: "repeat",
mimeType: nil,
fileName: nil,
content: nil),
],
timestamp: now + 1_000),
messageId: "msg-repeat-2",
messageSeq: 2)))
try await waitUntil("repeated user transcript appended") {
await MainActor.run {
vm.messages.filter { msg in
msg.role == "user" && msg.content.first?.text == "repeat"
}.count == 2
}
}
}
@Test func ignoresExternalSessionUserMessageForOtherSession() async throws {
let now = Date().timeIntervalSince1970 * 1000
let (transport, vm) = await makeViewModel(historyResponses: [historyPayload()])