refactor: unify message decode and RPC payload mapping

This commit is contained in:
Peter Steinberger 2026-02-16 05:54:16 +01:00
parent ba9a1ff079
commit 5a9f5441b9
8 changed files with 527 additions and 328 deletions

View File

@ -2,6 +2,13 @@ import Foundation
import SQLite
extension MessageStore {
struct DecodedReaction: Sendable {
let isReaction: Bool
let reactionType: ReactionType?
let isReactionAdd: Bool?
let reactedToGUID: String?
}
static func tableColumns(connection: Connection, table: String) -> Set<String> {
do {
let rows = try connection.prepare("PRAGMA table_info(\(table))")
@ -116,4 +123,38 @@ extension MessageStore {
}
return normalized
}
func decodeReaction(
associatedType: Int?,
associatedGUID: String,
text: String
) -> DecodedReaction {
guard let typeValue = associatedType, ReactionType.isReaction(typeValue) else {
return DecodedReaction(
isReaction: false,
reactionType: nil,
isReactionAdd: nil,
reactedToGUID: nil
)
}
let isAdd = ReactionType.isReactionAdd(typeValue)
let rawType = isAdd ? typeValue : typeValue - 1000
let customEmoji = (rawType == 2006) ? extractCustomEmoji(from: text) : nil
guard let reactionType = ReactionType(rawValue: rawType, customEmoji: customEmoji) else {
return DecodedReaction(
isReaction: true,
reactionType: nil,
isReactionAdd: isAdd,
reactedToGUID: normalizeAssociatedGUID(associatedGUID)
)
}
return DecodedReaction(
isReaction: true,
reactionType: reactionType,
isReactionAdd: isAdd,
reactedToGUID: normalizeAssociatedGUID(associatedGUID)
)
}
}

View File

@ -1,6 +1,42 @@
import Foundation
import SQLite
private struct MessageRowColumns {
let rowID: Int
let chatID: Int?
let handleID: Int
let sender: Int
let text: Int
let date: Int
let isFromMe: Int
let service: Int
let isAudioMessage: Int
let destinationCallerID: Int
let guid: Int
let associatedGUID: Int
let associatedType: Int
let attachments: Int
let body: Int
let threadOriginatorGUID: Int
}
private struct DecodedMessageRow {
let rowID: Int64
let chatID: Int64
let handleID: Int64?
let sender: String
let text: String
let date: Date
let isFromMe: Bool
let service: String
let destinationCallerID: String
let guid: String
let associatedGUID: String
let associatedType: Int?
let attachments: Int
let threadOriginatorGUID: String
}
extension MessageStore {
public func messages(chatID: Int64, limit: Int) throws -> [Message] {
return try messages(chatID: chatID, limit: limit, filter: nil)
@ -56,67 +92,52 @@ extension MessageStore {
sql += " ORDER BY m.date DESC LIMIT ?"
bindings.append(limit)
let columns = MessageRowColumns(
rowID: 0,
chatID: nil,
handleID: 1,
sender: 2,
text: 3,
date: 4,
isFromMe: 5,
service: 6,
isAudioMessage: 7,
destinationCallerID: 8,
guid: 9,
associatedGUID: 10,
associatedType: 11,
attachments: 12,
body: 13,
threadOriginatorGUID: 14
)
return try withConnection { db in
var messages: [Message] = []
for row in try db.prepare(sql, bindings) {
let colRowID = 0
let colHandleID = 1
let colSender = 2
let colText = 3
let colDate = 4
let colIsFromMe = 5
let colService = 6
let colIsAudioMessage = 7
let colDestinationCallerID = 8
let colGUID = 9
let colAssociatedGUID = 10
let colAssociatedType = 11
let colAttachments = 12
let colBody = 13
let colThreadOriginatorGUID = 14
let rowID = int64Value(row[colRowID]) ?? 0
let handleID = int64Value(row[colHandleID])
var sender = stringValue(row[colSender])
let text = stringValue(row[colText])
let date = appleDate(from: int64Value(row[colDate]))
let isFromMe = boolValue(row[colIsFromMe])
let service = stringValue(row[colService])
let isAudioMessage = boolValue(row[colIsAudioMessage])
let destinationCallerID = stringValue(row[colDestinationCallerID])
if sender.isEmpty && !destinationCallerID.isEmpty {
sender = destinationCallerID
}
let guid = stringValue(row[colGUID])
let associatedGuid = stringValue(row[colAssociatedGUID])
let associatedType = intValue(row[colAssociatedType])
let attachments = intValue(row[colAttachments]) ?? 0
let body = dataValue(row[colBody])
let threadOriginatorGUID = stringValue(row[colThreadOriginatorGUID])
var resolvedText = text.isEmpty ? TypedStreamParser.parseAttributedBody(body) : text
if isAudioMessage, let transcription = try audioTranscription(for: rowID) {
resolvedText = transcription
}
let decoded = try decodeMessageRow(row, columns: columns, fallbackChatID: chatID)
let replyToGUID = replyToGUID(
associatedGuid: associatedGuid,
associatedType: associatedType
associatedGuid: decoded.associatedGUID,
associatedType: decoded.associatedType
)
messages.append(
Message(
rowID: rowID,
chatID: chatID,
sender: sender,
text: resolvedText,
date: date,
isFromMe: isFromMe,
service: service,
handleID: handleID,
attachmentsCount: attachments,
guid: guid,
replyToGUID: replyToGUID,
threadOriginatorGUID: threadOriginatorGUID.isEmpty ? nil : threadOriginatorGUID,
destinationCallerID: destinationCallerID.isEmpty ? nil : destinationCallerID
rowID: decoded.rowID,
chatID: decoded.chatID,
sender: decoded.sender,
text: decoded.text,
date: decoded.date,
isFromMe: decoded.isFromMe,
service: decoded.service,
handleID: decoded.handleID,
attachmentsCount: decoded.attachments,
guid: decoded.guid,
routing: Message.RoutingMetadata(
replyToGUID: replyToGUID,
threadOriginatorGUID: decoded.threadOriginatorGUID.isEmpty
? nil : decoded.threadOriginatorGUID,
destinationCallerID: decoded.destinationCallerID.isEmpty
? nil : decoded.destinationCallerID
)
))
}
return messages
@ -124,10 +145,20 @@ extension MessageStore {
}
public func messagesAfter(afterRowID: Int64, chatID: Int64?, limit: Int) throws -> [Message] {
return try messagesAfter(afterRowID: afterRowID, chatID: chatID, limit: limit, includeReactions: false)
return try messagesAfter(
afterRowID: afterRowID,
chatID: chatID,
limit: limit,
includeReactions: false
)
}
public func messagesAfter(afterRowID: Int64, chatID: Int64?, limit: Int, includeReactions: Bool) throws -> [Message] {
public func messagesAfter(
afterRowID: Int64,
chatID: Int64?,
limit: Int,
includeReactions: Bool
) throws -> [Message] {
let bodyColumn = hasAttributedBody ? "m.attributedBody" : "NULL"
let guidColumn = hasReactionColumns ? "m.guid" : "NULL"
let associatedGuidColumn = hasReactionColumns ? "m.associated_message_guid" : "NULL"
@ -141,9 +172,12 @@ extension MessageStore {
if includeReactions {
reactionFilter = ""
} else {
reactionFilter = hasReactionColumns
? " AND (m.associated_message_type IS NULL OR m.associated_message_type < 2000 OR m.associated_message_type > 3006)"
: ""
if hasReactionColumns {
reactionFilter =
" AND (m.associated_message_type IS NULL OR m.associated_message_type < 2000 OR m.associated_message_type > 3006)"
} else {
reactionFilter = ""
}
}
var sql = """
SELECT m.ROWID, cmj.chat_id, m.handle_id, h.id, IFNULL(m.text, '') AS text, m.date, m.is_from_me, m.service,
@ -164,92 +198,117 @@ extension MessageStore {
}
sql += " ORDER BY m.ROWID ASC LIMIT ?"
bindings.append(limit)
let columns = MessageRowColumns(
rowID: 0,
chatID: 1,
handleID: 2,
sender: 3,
text: 4,
date: 5,
isFromMe: 6,
service: 7,
isAudioMessage: 8,
destinationCallerID: 9,
guid: 10,
associatedGUID: 11,
associatedType: 12,
attachments: 13,
body: 14,
threadOriginatorGUID: 15
)
return try withConnection { db in
var messages: [Message] = []
for row in try db.prepare(sql, bindings) {
let colRowID = 0
let colChatID = 1
let colHandleID = 2
let colSender = 3
let colText = 4
let colDate = 5
let colIsFromMe = 6
let colService = 7
let colIsAudioMessage = 8
let colDestinationCallerID = 9
let colGUID = 10
let colAssociatedGUID = 11
let colAssociatedType = 12
let colAttachments = 13
let colBody = 14
let colThreadOriginatorGUID = 15
let rowID = int64Value(row[colRowID]) ?? 0
let resolvedChatID = int64Value(row[colChatID]) ?? chatID ?? 0
let handleID = int64Value(row[colHandleID])
var sender = stringValue(row[colSender])
let text = stringValue(row[colText])
let date = appleDate(from: int64Value(row[colDate]))
let isFromMe = boolValue(row[colIsFromMe])
let service = stringValue(row[colService])
let isAudioMessage = boolValue(row[colIsAudioMessage])
let destinationCallerID = stringValue(row[colDestinationCallerID])
if sender.isEmpty && !destinationCallerID.isEmpty {
sender = destinationCallerID
}
let guid = stringValue(row[colGUID])
let associatedGuid = stringValue(row[colAssociatedGUID])
let associatedType = intValue(row[colAssociatedType])
let attachments = intValue(row[colAttachments]) ?? 0
let body = dataValue(row[colBody])
let threadOriginatorGUID = stringValue(row[colThreadOriginatorGUID])
var resolvedText = text.isEmpty ? TypedStreamParser.parseAttributedBody(body) : text
if isAudioMessage, let transcription = try audioTranscription(for: rowID) {
resolvedText = transcription
}
let decoded = try decodeMessageRow(row, columns: columns, fallbackChatID: chatID)
let replyToGUID = replyToGUID(
associatedGuid: associatedGuid,
associatedType: associatedType
associatedGuid: decoded.associatedGUID,
associatedType: decoded.associatedType
)
// Determine if this is a reaction event
let typeValue = associatedType ?? 0
let isReactionEvent = ReactionType.isReaction(typeValue)
var reactionType: ReactionType? = nil
var isReactionAdd: Bool? = nil
var reactedToGUID: String? = nil
if isReactionEvent {
isReactionAdd = ReactionType.isReactionAdd(typeValue)
let rawType = (isReactionAdd ?? true) ? typeValue : typeValue - 1000
let customEmoji: String? = (rawType == 2006) ? extractCustomEmoji(from: resolvedText) : nil
reactionType = ReactionType(rawValue: rawType, customEmoji: customEmoji)
reactedToGUID = normalizeAssociatedGUID(associatedGuid)
}
let reaction = decodeReaction(
associatedType: decoded.associatedType,
associatedGUID: decoded.associatedGUID,
text: decoded.text
)
messages.append(
Message(
rowID: rowID,
chatID: resolvedChatID,
sender: sender,
text: resolvedText,
date: date,
isFromMe: isFromMe,
service: service,
handleID: handleID,
attachmentsCount: attachments,
guid: guid,
replyToGUID: replyToGUID,
threadOriginatorGUID: threadOriginatorGUID.isEmpty ? nil : threadOriginatorGUID,
destinationCallerID: destinationCallerID.isEmpty ? nil : destinationCallerID,
isReaction: isReactionEvent,
reactionType: reactionType,
isReactionAdd: isReactionAdd,
reactedToGUID: reactedToGUID
rowID: decoded.rowID,
chatID: decoded.chatID,
sender: decoded.sender,
text: decoded.text,
date: decoded.date,
isFromMe: decoded.isFromMe,
service: decoded.service,
handleID: decoded.handleID,
attachmentsCount: decoded.attachments,
guid: decoded.guid,
routing: Message.RoutingMetadata(
replyToGUID: replyToGUID,
threadOriginatorGUID: decoded.threadOriginatorGUID.isEmpty
? nil : decoded.threadOriginatorGUID,
destinationCallerID: decoded.destinationCallerID.isEmpty
? nil : decoded.destinationCallerID
),
reaction: Message.ReactionMetadata(
isReaction: reaction.isReaction,
reactionType: reaction.reactionType,
isReactionAdd: reaction.isReactionAdd,
reactedToGUID: reaction.reactedToGUID
)
))
}
return messages
}
}
private func decodeMessageRow(
_ row: [Binding?],
columns: MessageRowColumns,
fallbackChatID: Int64?
) throws -> DecodedMessageRow {
let rowID = int64Value(row[columns.rowID]) ?? 0
let resolvedChatID = columns.chatID.flatMap { int64Value(row[$0]) } ?? fallbackChatID ?? 0
let handleID = int64Value(row[columns.handleID])
let sender = stringValue(row[columns.sender])
let text = stringValue(row[columns.text])
let date = appleDate(from: int64Value(row[columns.date]))
let isFromMe = boolValue(row[columns.isFromMe])
let service = stringValue(row[columns.service])
let isAudioMessage = boolValue(row[columns.isAudioMessage])
let destinationCallerID = stringValue(row[columns.destinationCallerID])
let guid = stringValue(row[columns.guid])
let associatedGUID = stringValue(row[columns.associatedGUID])
let associatedType = intValue(row[columns.associatedType])
let attachments = intValue(row[columns.attachments]) ?? 0
let body = dataValue(row[columns.body])
let threadOriginatorGUID = stringValue(row[columns.threadOriginatorGUID])
var resolvedText = text.isEmpty ? TypedStreamParser.parseAttributedBody(body) : text
if isAudioMessage, let transcription = try audioTranscription(for: rowID) {
resolvedText = transcription
}
var resolvedSender = sender
if resolvedSender.isEmpty && !destinationCallerID.isEmpty {
resolvedSender = destinationCallerID
}
return DecodedMessageRow(
rowID: rowID,
chatID: resolvedChatID,
handleID: handleID,
sender: resolvedSender,
text: resolvedText,
date: date,
isFromMe: isFromMe,
service: service,
destinationCallerID: destinationCallerID,
guid: guid,
associatedGUID: associatedGUID,
associatedType: associatedType,
attachments: attachments,
threadOriginatorGUID: threadOriginatorGUID
)
}
}

View File

@ -98,24 +98,21 @@ extension MessageStore {
let destinationCallerID = stringValue(row[9])
let body = dataValue(row[10])
let origRowID = int64Value(row[11])
if sender.isEmpty && !destinationCallerID.isEmpty {
sender = destinationCallerID
}
let resolvedText = text.isEmpty ? TypedStreamParser.parseAttributedBody(body) : text
let isAdd = ReactionType.isReactionAdd(typeValue)
let rawType = isAdd ? typeValue : typeValue - 1000
// Extract custom emoji for type 2006/3006
let customEmoji: String? = (rawType == 2006) ? extractCustomEmoji(from: resolvedText) : nil
guard let reactionType = ReactionType(rawValue: rawType, customEmoji: customEmoji) else {
let decoded = decodeReaction(
associatedType: typeValue,
associatedGUID: associatedGUID,
text: resolvedText
)
guard let reactionType = decoded.reactionType, let isAdd = decoded.isReactionAdd else {
continue
}
// Normalize the associated GUID (remove "p:X/" prefix)
let reactedToGUID = normalizeAssociatedGUID(associatedGUID)
events.append(ReactionEvent(
rowID: rowID,
chatID: resolvedChatID,
@ -124,7 +121,7 @@ extension MessageStore {
sender: sender,
isFromMe: isFromMe,
date: date,
reactedToGUID: reactedToGUID,
reactedToGUID: decoded.reactedToGUID ?? "",
reactedToID: origRowID,
text: resolvedText
))

View File

@ -217,6 +217,41 @@ public struct ChatInfo: Sendable, Equatable {
}
public struct Message: Sendable, Equatable {
public struct RoutingMetadata: Sendable, Equatable {
public let replyToGUID: String?
public let threadOriginatorGUID: String?
public let destinationCallerID: String?
public init(
replyToGUID: String? = nil,
threadOriginatorGUID: String? = nil,
destinationCallerID: String? = nil
) {
self.replyToGUID = replyToGUID
self.threadOriginatorGUID = threadOriginatorGUID
self.destinationCallerID = destinationCallerID
}
}
public struct ReactionMetadata: Sendable, Equatable {
public let isReaction: Bool
public let reactionType: ReactionType?
public let isReactionAdd: Bool?
public let reactedToGUID: String?
public init(
isReaction: Bool = false,
reactionType: ReactionType? = nil,
isReactionAdd: Bool? = nil,
reactedToGUID: String? = nil
) {
self.isReaction = isReaction
self.reactionType = reactionType
self.isReactionAdd = isReactionAdd
self.reactedToGUID = reactedToGUID
}
}
public let rowID: Int64
public let chatID: Int64
public let guid: String
@ -244,6 +279,39 @@ public struct Message: Sendable, Equatable {
/// The GUID of the message being reacted to (only set when isReaction is true)
public let reactedToGUID: String?
public init(
rowID: Int64,
chatID: Int64,
sender: String,
text: String,
date: Date,
isFromMe: Bool,
service: String,
handleID: Int64?,
attachmentsCount: Int,
guid: String = "",
routing: RoutingMetadata = RoutingMetadata(),
reaction: ReactionMetadata = ReactionMetadata()
) {
self.rowID = rowID
self.chatID = chatID
self.guid = guid
self.replyToGUID = routing.replyToGUID
self.threadOriginatorGUID = routing.threadOriginatorGUID
self.sender = sender
self.text = text
self.date = date
self.isFromMe = isFromMe
self.service = service
self.handleID = handleID
self.attachmentsCount = attachmentsCount
self.destinationCallerID = routing.destinationCallerID
self.isReaction = reaction.isReaction
self.reactionType = reaction.reactionType
self.isReactionAdd = reaction.isReactionAdd
self.reactedToGUID = reaction.reactedToGUID
}
public init(
rowID: Int64,
chatID: Int64,
@ -263,23 +331,29 @@ public struct Message: Sendable, Equatable {
isReactionAdd: Bool? = nil,
reactedToGUID: String? = nil
) {
self.rowID = rowID
self.chatID = chatID
self.guid = guid
self.replyToGUID = replyToGUID
self.threadOriginatorGUID = threadOriginatorGUID
self.sender = sender
self.text = text
self.date = date
self.isFromMe = isFromMe
self.service = service
self.handleID = handleID
self.attachmentsCount = attachmentsCount
self.destinationCallerID = destinationCallerID
self.isReaction = isReaction
self.reactionType = reactionType
self.isReactionAdd = isReactionAdd
self.reactedToGUID = reactedToGUID
self.init(
rowID: rowID,
chatID: chatID,
sender: sender,
text: text,
date: date,
isFromMe: isFromMe,
service: service,
handleID: handleID,
attachmentsCount: attachmentsCount,
guid: guid,
routing: RoutingMetadata(
replyToGUID: replyToGUID,
threadOriginatorGUID: threadOriginatorGUID,
destinationCallerID: destinationCallerID
),
reaction: ReactionMetadata(
isReaction: isReaction,
reactionType: reactionType,
isReactionAdd: isReactionAdd,
reactedToGUID: reactedToGUID
)
)
}
}

View File

@ -62,7 +62,7 @@ struct MessagePayload: Codable {
self.attachments = attachments.map { AttachmentPayload(meta: $0) }
self.reactions = reactions.map { ReactionPayload(reaction: $0) }
self.destinationCallerID = message.destinationCallerID
// Reaction event metadata
if message.isReaction {
self.isReaction = true
@ -100,6 +100,18 @@ struct MessagePayload: Codable {
}
}
extension MessagePayload {
func asDictionary() throws -> [String: Any] {
let data = try MessagePayload.encoder.encode(self)
let json = try JSONSerialization.jsonObject(with: data)
return (json as? [String: Any]) ?? [:]
}
private static let encoder: JSONEncoder = {
JSONEncoder()
}()
}
struct ReactionPayload: Codable {
let id: Int64
let type: String

View File

@ -28,49 +28,17 @@ func messagePayload(
participants: [String],
attachments: [AttachmentMeta],
reactions: [Reaction]
) -> [String: Any] {
) throws -> [String: Any] {
let identifier = chatInfo?.identifier ?? ""
let guid = chatInfo?.guid ?? ""
let name = chatInfo?.name ?? ""
var payload: [String: Any] = [
"id": message.rowID,
"chat_id": message.chatID,
"guid": message.guid,
"sender": message.sender,
"is_from_me": message.isFromMe,
"text": message.text,
"created_at": CLIISO8601.format(message.date),
"attachments": attachments.map { attachmentPayload($0) },
"reactions": reactions.map { reactionPayload($0) },
"chat_identifier": identifier,
"chat_guid": guid,
"chat_name": name,
"participants": participants,
"is_group": isGroupHandle(identifier: identifier, guid: guid),
]
if let replyToGUID = message.replyToGUID, !replyToGUID.isEmpty {
payload["reply_to_guid"] = replyToGUID
}
if let destinationCallerID = message.destinationCallerID, !destinationCallerID.isEmpty {
payload["destination_caller_id"] = destinationCallerID
}
// Add reaction event metadata if this message is a reaction
if message.isReaction {
payload["is_reaction"] = true
if let reactionType = message.reactionType {
payload["reaction_type"] = reactionType.name
payload["reaction_emoji"] = reactionType.emoji
}
if let isReactionAdd = message.isReactionAdd {
payload["is_reaction_add"] = isReactionAdd
}
if let reactedToGUID = message.reactedToGUID, !reactedToGUID.isEmpty {
payload["reacted_to_guid"] = reactedToGUID
}
}
if let threadOriginatorGUID = message.threadOriginatorGUID, !threadOriginatorGUID.isEmpty {
payload["thread_originator_guid"] = threadOriginatorGUID
}
let core = MessagePayload(message: message, attachments: attachments, reactions: reactions)
var payload = try core.asDictionary()
payload["chat_identifier"] = identifier
payload["chat_guid"] = guid
payload["chat_name"] = name
payload["participants"] = participants
payload["is_group"] = isGroupHandle(identifier: identifier, guid: guid)
return payload
}

View File

@ -12,10 +12,9 @@ final class RPCServer {
private let watcher: MessageWatcher
private let output: RPCOutput
private let cache: ChatCache
private let subscriptions = SubscriptionStore()
private let verbose: Bool
private let sendMessage: (MessageSendOptions) throws -> Void
private var nextSubscriptionID = 1
private var subscriptions: [Int: Task<Void, Never>] = [:]
init(
store: MessageStore,
@ -37,9 +36,7 @@ final class RPCServer {
if trimmed.isEmpty { continue }
await handleLine(trimmed)
}
for task in subscriptions.values {
task.cancel()
}
await subscriptions.cancelAll()
}
func handleLineForTesting(_ line: String) async {
@ -77,117 +74,15 @@ final class RPCServer {
do {
switch method {
case "chats.list":
let limit = intParam(params["limit"]) ?? 20
let chats = try store.listChats(limit: max(limit, 1))
let payloads = try chats.map { chat in
let info = try cache.info(chatID: chat.id)
let participants = try cache.participants(chatID: chat.id)
let identifier = info?.identifier ?? chat.identifier
let guid = info?.guid ?? ""
let name = (info?.name.isEmpty == false ? info?.name : nil) ?? chat.name
let service = info?.service ?? chat.service
return chatPayload(
id: chat.id,
identifier: identifier,
guid: guid,
name: name,
service: service,
lastMessageAt: chat.lastMessageAt,
participants: participants
)
}
respond(id: id, result: ["chats": payloads])
try await handleChatsList(id: id, params: params)
case "messages.history":
guard let chatID = int64Param(params["chat_id"]) else {
throw RPCError.invalidParams("chat_id is required")
}
let limit = intParam(params["limit"]) ?? 50
let participants = stringArrayParam(params["participants"])
let startISO = stringParam(params["start"])
let endISO = stringParam(params["end"])
let includeAttachments = boolParam(params["attachments"]) ?? false
let filter = try MessageFilter.fromISO(
participants: participants,
startISO: startISO,
endISO: endISO
)
let filtered = try store.messages(chatID: chatID, limit: max(limit, 1), filter: filter)
let payloads = try filtered.map { message in
try buildMessagePayload(
store: store,
cache: cache,
message: message,
includeAttachments: includeAttachments
)
}
respond(id: id, result: ["messages": payloads])
try await handleMessagesHistory(id: id, params: params)
case "watch.subscribe":
let chatID = int64Param(params["chat_id"])
let sinceRowID = int64Param(params["since_rowid"])
let participants = stringArrayParam(params["participants"])
let startISO = stringParam(params["start"])
let endISO = stringParam(params["end"])
let includeAttachments = boolParam(params["attachments"]) ?? false
let includeReactions = boolParam(params["include_reactions"]) ?? false
let filter = try MessageFilter.fromISO(
participants: participants,
startISO: startISO,
endISO: endISO
)
let config = MessageWatcherConfiguration(includeReactions: includeReactions)
let subID = nextSubscriptionID
nextSubscriptionID += 1
let localStore = store
let localWatcher = watcher
let localCache = cache
let localWriter = output
let localFilter = filter
let localChatID = chatID
let localSinceRowID = sinceRowID
let localConfig = config
let localIncludeAttachments = includeAttachments
let task = Task {
do {
for try await message in localWatcher.stream(
chatID: localChatID,
sinceRowID: localSinceRowID,
configuration: localConfig
) {
if Task.isCancelled { return }
if !localFilter.allows(message) { continue }
let payload = try buildMessagePayload(
store: localStore,
cache: localCache,
message: message,
includeAttachments: localIncludeAttachments
)
localWriter.sendNotification(
method: "message",
params: ["subscription": subID, "message": payload]
)
}
} catch {
localWriter.sendNotification(
method: "error",
params: [
"subscription": subID,
"error": ["message": String(describing: error)],
]
)
}
}
subscriptions[subID] = task
respond(id: id, result: ["subscription": subID])
try await handleWatchSubscribe(id: id, params: params)
case "watch.unsubscribe":
guard let subID = intParam(params["subscription"]) else {
throw RPCError.invalidParams("subscription is required")
}
if let task = subscriptions.removeValue(forKey: subID) {
task.cancel()
}
respond(id: id, result: ["ok": true])
try await handleWatchUnsubscribe(id: id, params: params)
case "send":
try handleSend(params: params, id: id)
try await handleSend(params: params, id: id)
default:
output.sendError(id: id, error: RPCError.methodNotFound(method))
}
@ -213,7 +108,134 @@ final class RPCServer {
output.sendResponse(id: id, result: result)
}
private func handleSend(params: [String: Any], id: Any?) throws {
private func handleChatsList(id: Any?, params: [String: Any]) async throws {
let limit = intParam(params["limit"]) ?? 20
let chats = try store.listChats(limit: max(limit, 1))
var payloads: [[String: Any]] = []
payloads.reserveCapacity(chats.count)
for chat in chats {
let info = try await cache.info(chatID: chat.id)
let participants = try await cache.participants(chatID: chat.id)
let identifier = info?.identifier ?? chat.identifier
let guid = info?.guid ?? ""
let name = (info?.name.isEmpty == false ? info?.name : nil) ?? chat.name
let service = info?.service ?? chat.service
payloads.append(
chatPayload(
id: chat.id,
identifier: identifier,
guid: guid,
name: name,
service: service,
lastMessageAt: chat.lastMessageAt,
participants: participants
))
}
respond(id: id, result: ["chats": payloads])
}
private func handleMessagesHistory(id: Any?, params: [String: Any]) async throws {
guard let chatID = int64Param(params["chat_id"]) else {
throw RPCError.invalidParams("chat_id is required")
}
let limit = intParam(params["limit"]) ?? 50
let participants = stringArrayParam(params["participants"])
let startISO = stringParam(params["start"])
let endISO = stringParam(params["end"])
let includeAttachments = boolParam(params["attachments"]) ?? false
let filter = try MessageFilter.fromISO(
participants: participants,
startISO: startISO,
endISO: endISO
)
let filtered = try store.messages(chatID: chatID, limit: max(limit, 1), filter: filter)
var payloads: [[String: Any]] = []
payloads.reserveCapacity(filtered.count)
for message in filtered {
let payload = try await buildMessagePayload(
store: store,
cache: cache,
message: message,
includeAttachments: includeAttachments
)
payloads.append(payload)
}
respond(id: id, result: ["messages": payloads])
}
private func handleWatchSubscribe(id: Any?, params: [String: Any]) async throws {
let chatID = int64Param(params["chat_id"])
let sinceRowID = int64Param(params["since_rowid"])
let participants = stringArrayParam(params["participants"])
let startISO = stringParam(params["start"])
let endISO = stringParam(params["end"])
let includeAttachments = boolParam(params["attachments"]) ?? false
let includeReactions = boolParam(params["include_reactions"]) ?? false
let filter = try MessageFilter.fromISO(
participants: participants,
startISO: startISO,
endISO: endISO
)
let config = MessageWatcherConfiguration(includeReactions: includeReactions)
let subID = await subscriptions.allocateID()
let localStore = store
let localWatcher = watcher
let localCache = cache
let localWriter = output
let localFilter = filter
let localChatID = chatID
let localSinceRowID = sinceRowID
let localConfig = config
let localIncludeAttachments = includeAttachments
let task = Task {
do {
for try await message in localWatcher.stream(
chatID: localChatID,
sinceRowID: localSinceRowID,
configuration: localConfig
) {
if Task.isCancelled { return }
if !localFilter.allows(message) { continue }
let payload = try await buildMessagePayload(
store: localStore,
cache: localCache,
message: message,
includeAttachments: localIncludeAttachments
)
localWriter.sendNotification(
method: "message",
params: ["subscription": subID, "message": payload]
)
}
} catch {
localWriter.sendNotification(
method: "error",
params: [
"subscription": subID,
"error": ["message": String(describing: error)],
]
)
}
}
await subscriptions.insert(task, for: subID)
respond(id: id, result: ["subscription": subID])
}
private func handleWatchUnsubscribe(id: Any?, params: [String: Any]) async throws {
guard let subID = intParam(params["subscription"]) else {
throw RPCError.invalidParams("subscription is required")
}
if let task = await subscriptions.remove(subID) {
task.cancel()
}
respond(id: id, result: ["ok": true])
}
private func handleSend(params: [String: Any], id: Any?) async throws {
let text = stringParam(params["text"]) ?? ""
let file = stringParam(params["file"]) ?? ""
let serviceRaw = stringParam(params["service"]) ?? "auto"
@ -241,7 +263,7 @@ final class RPCServer {
var resolvedChatIdentifier = chatIdentifier
var resolvedChatGUID = chatGUID
if let chatID {
guard let info = try cache.info(chatID: chatID) else {
guard let info = try await cache.info(chatID: chatID) else {
throw RPCError.invalidParams("unknown chat_id \(chatID)")
}
resolvedChatIdentifier = info.identifier
@ -272,12 +294,12 @@ private func buildMessagePayload(
cache: ChatCache,
message: Message,
includeAttachments: Bool
) throws -> [String: Any] {
let chatInfo = try cache.info(chatID: message.chatID)
let participants = try cache.participants(chatID: message.chatID)
) async throws -> [String: Any] {
let chatInfo = try await cache.info(chatID: message.chatID)
let participants = try await cache.participants(chatID: message.chatID)
let attachments = includeAttachments ? try store.attachments(for: message.rowID) : []
let reactions = includeAttachments ? try store.reactions(for: message.rowID) : []
return messagePayload(
return try messagePayload(
message: message,
chatInfo: chatInfo,
participants: participants,
@ -286,7 +308,7 @@ private func buildMessagePayload(
)
}
private final class RPCWriter: RPCOutput, @unchecked Sendable {
private final class RPCWriter: RPCOutput, Sendable {
func sendResponse(id: Any, result: Any) {
send(["jsonrpc": "2.0", "id": id, "result": result])
}
@ -355,7 +377,33 @@ struct RPCError: Error {
}
}
private final class ChatCache: @unchecked Sendable {
private actor SubscriptionStore {
private var nextID = 1
private var tasks: [Int: Task<Void, Never>] = [:]
func allocateID() -> Int {
let id = nextID
nextID += 1
return id
}
func insert(_ task: Task<Void, Never>, for id: Int) {
tasks[id] = task
}
func remove(_ id: Int) -> Task<Void, Never>? {
tasks.removeValue(forKey: id)
}
func cancelAll() {
for task in tasks.values {
task.cancel()
}
tasks.removeAll()
}
}
private actor ChatCache {
private let store: MessageStore
private var infoCache: [Int64: ChatInfo] = [:]
private var participantsCache: [Int64: [String]] = [:]

View File

@ -30,7 +30,7 @@ func chatPayloadIncludesParticipantsAndGroupFlag() {
}
@Test
func messagePayloadIncludesChatFields() {
func messagePayloadIncludesChatFields() throws {
let message = Message(
rowID: 5,
chatID: 10,
@ -71,7 +71,7 @@ func messagePayloadIncludesChatFields() {
date: Date(timeIntervalSince1970: 2),
associatedMessageID: 5
)
let payload = messagePayload(
let payload = try messagePayload(
message: message,
chatInfo: chatInfo,
participants: ["+111"],
@ -93,7 +93,7 @@ func messagePayloadIncludesChatFields() {
}
@Test
func messagePayloadOmitsEmptyReplyToGuid() {
func messagePayloadOmitsEmptyReplyToGuid() throws {
let message = Message(
rowID: 6,
chatID: 10,
@ -107,7 +107,7 @@ func messagePayloadOmitsEmptyReplyToGuid() {
guid: "msg-guid-6",
replyToGUID: nil
)
let payload = messagePayload(
let payload = try messagePayload(
message: message,
chatInfo: nil,
participants: [],