712 lines
27 KiB
Swift
712 lines
27 KiB
Swift
//
|
|
// Copyright 2021 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import Foundation
|
|
import GRDB
|
|
import LibSignalClient
|
|
|
|
public class MessageProcessor {
|
|
|
|
/// Limits on how many requests are handled inside one write transaction.
private enum Constants {
    /// Maximum number of non-delivery-receipt requests handled per batch.
    /// We want a value that is just high enough to yield perf benefits.
    static let incomingMessageBatchLimit: Int = 16

    /// Maximum number of delivery-receipt requests handled per batch.
    static let incomingReceiptBatchLimit: Int = 32
}
|
|
|
|
/// Posted (via `postOnMainThread`) once a drain pass finishes with no pending envelopes left.
public static let messageProcessorDidDrainQueue: Notification.Name = .init("messageProcessorDidDrainQueue")
|
|
|
|
/// Whether at least one received envelope is still waiting in `pendingEnvelopes`.
private var hasPendingEnvelopes: Bool {
    return pendingEnvelopes.isEmpty == false
}
|
|
|
|
/// The pipeline stages that `waitForFetchingAndProcessing(stages:)` can wait on.
public struct Stages: OptionSet {
    public var rawValue: UInt8

    public init(rawValue: UInt8) {
        self.rawValue = rawValue
    }

    /// Bit 0: fetching of messages.
    public static let messageFetcher = Self(rawValue: 0b001)
    /// Bit 1: this processor's pending-envelope queue.
    public static let messageProcessor = Self(rawValue: 0b010)
    /// Bit 2: the group message processor.
    public static let groupMessageProcessor = Self(rawValue: 0b100)
}
|
|
|
|
/// Suspends until every requested stage reports that it has no outstanding work.
///
/// Returns immediately when the current app context doesn't process incoming
/// messages.
///
/// - Parameter stages: The stages to wait for; defaults to all of them.
/// - Throws: `CancellationError`, propagated from waiting on the preconditions.
public func waitForFetchingAndProcessing(stages: Stages = [.messageFetcher, .messageProcessor, .groupMessageProcessor]) async throws(CancellationError) {
    if !CurrentAppContext().shouldProcessIncomingMessages {
        return
    }

    // One precondition per requested stage, in a fixed order.
    var requirements: [any Precondition] = []

    if stages.contains(.messageFetcher) {
        let fetchingComplete = SSKEnvironment.shared.messageFetcherJobRef.preconditionForFetchingComplete()
        requirements.append(fetchingComplete)
    }

    if stages.contains(.messageProcessor) {
        let queueDrained = NotificationPrecondition(
            notificationName: Self.messageProcessorDidDrainQueue,
            isSatisfied: { !self.hasPendingEnvelopes }
        )
        requirements.append(queueDrained)
    }

    if stages.contains(.groupMessageProcessor) {
        let groupQueueFlushed = NotificationPrecondition(
            notificationName: GroupMessageProcessorManager.didFlushGroupsV2MessageQueue,
            isSatisfied: { !SSKEnvironment.shared.groupMessageProcessorManagerRef.isProcessing() }
        )
        requirements.append(groupQueueFlushed)
    }

    try await Preconditions(requirements).waitUntilSatisfied()
}
|
|
|
|
private let appReadiness: AppReadiness

/// Creates the processor and registers it as a singleton.
///
/// Once the app is ready (or right away if it already is), the processor
/// registers itself as a pipeline stage and starts observing
/// `.registrationStateDidChange`.
public init(appReadiness: AppReadiness) {
    self.appReadiness = appReadiness

    SwiftSingletons.register(self)

    appReadiness.runNowOrWhenAppDidBecomeReadySync {
        let pipelineSupervisor = SSKEnvironment.shared.messagePipelineSupervisorRef
        pipelineSupervisor.register(pipelineStage: self)

        // Registration changes re-trigger draining; see `registrationStateDidChange`.
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(self.registrationStateDidChange),
            name: .registrationStateDidChange,
            object: nil
        )
    }
}
|
|
|
|
/// Hands raw envelope bytes to the processor.
///
/// Parsing and validation happen asynchronously on `queueForEnqueueing`.
///
/// - Parameters:
///   - envelopeData: The serialized envelope.
///   - serverDeliveryTimestamp: Stored alongside the parsed envelope.
///   - envelopeSource: Where the envelope came from; used in failure logs.
///   - completion: Invoked when the envelope is rejected, or later once it has
///     been handled.
public func enqueueReceivedEnvelopeData(
    _ envelopeData: Data,
    serverDeliveryTimestamp: UInt64,
    envelopeSource: EnvelopeSource,
    completion: @escaping () -> Void
) {
    queueForEnqueueing.async { [self] in
        _enqueueReceivedEnvelopeData(
            envelopeData,
            serverDeliveryTimestamp: serverDeliveryTimestamp,
            envelopeSource: envelopeSource,
            completion: completion
        )
    }
}
|
|
|
|
/// Schedules `completion` on `queueForEnqueueing`, behind any enqueue work
/// that was submitted to that queue earlier.
public func flushEnqueuingQueue(completion: @escaping () -> Void) {
    queueForEnqueueing.async(completion)
}
|
|
|
|
/// Parses and validates raw envelope bytes, then enqueues them for processing.
///
/// Must be called on `queueForEnqueueing`. Empty, unparseable, or oversize
/// envelopes are reported via `owsFailDebug` and completed immediately
/// without being enqueued.
private func _enqueueReceivedEnvelopeData(
    _ envelopeData: Data,
    serverDeliveryTimestamp: UInt64,
    envelopeSource: EnvelopeSource,
    completion: @escaping () -> Void
) {
    assertOnQueue(queueForEnqueueing)

    if envelopeData.isEmpty {
        owsFailDebug("Empty envelope, envelopeSource: \(envelopeSource).")
        completion()
        return
    }

    let parsedEnvelope: SSKProtoEnvelope
    do {
        parsedEnvelope = try SSKProtoEnvelope(serializedData: envelopeData)
    } catch {
        owsFailDebug("Failed to parse encrypted envelope \(error), envelopeSource: \(envelopeSource)")
        completion()
        return
    }

    // Drop any too-large messages on the floor. Well behaving clients should never send them.
    // A missing `content` counts as zero bytes.
    let contentByteCount = parsedEnvelope.content?.count ?? 0
    if contentByteCount > Self.maxEnvelopeByteCount {
        owsFailDebug("Oversize envelope, envelopeSource: \(envelopeSource).")
        completion()
        return
    }

    let receivedEnvelope = ReceivedEnvelope(
        envelope: parsedEnvelope,
        serverDeliveryTimestamp: serverDeliveryTimestamp,
        completion: completion
    )
    enqueueReceivedEnvelope(receivedEnvelope, envelopeSource: envelopeSource)
}
|
|
|
|
/// Appends the envelope to the pending queue and starts a drain pass.
///
/// `envelopeSource` is accepted but not used by this method.
private func enqueueReceivedEnvelope(_ receivedEnvelope: ReceivedEnvelope, envelopeSource: EnvelopeSource) {
    self.pendingEnvelopes.enqueue(receivedEnvelope)
    self.drainPendingEnvelopes()
}
|
|
|
|
/// Largest envelope `content` size, in bytes, that will be enqueued (256 KiB).
private static let maxEnvelopeByteCount: Int = 256 * 1024

/// Queue on which raw envelope data is parsed, validated and enqueued.
private let queueForEnqueueing: DispatchQueue = .init(label: "org.signal.message-processor-enqueue")

/// Queue on which pending envelopes are drained and dropped.
private let queueForProcessing: DispatchQueue = .init(label: "org.signal.message-processor-process", autoreleaseFrequency: .workItem)

#if TESTABLE_BUILD
/// Exposes the processing queue to tests.
var serialQueueForTests: DispatchQueue {
    return queueForProcessing
}
#endif

/// Envelopes that have been received but not yet processed.
private var pendingEnvelopes: PendingEnvelopes = .init()

/// Set to `true` for the duration of a drain pass on `queueForProcessing`.
private let isDrainingPendingEnvelopes: AtomicBool = .init(false, lock: .init())
|
|
|
|
/// Discards every envelope that is still waiting to be processed, then calls
/// `completion` (on `queueForProcessing`).
public func dropEnqueuedEnvelopes(completion: @escaping () -> Void) {
    // Run this on queueForProcessing to ensure we're not going to drop
    // envelopes that are currently being processed.
    queueForProcessing.async { [self] in
        pendingEnvelopes.removeAll()
        completion()
    }
}
|
|
|
|
/// Schedules a drain pass on `queueForProcessing`.
///
/// Nothing is scheduled unless the app context processes incoming messages,
/// the account is registered with a valid device id, and the pipeline
/// supervisor permits processing. The drain pass processes batches until
/// `drainNextBatch()` returns `false`, then posts
/// `messageProcessorDidDrainQueue` if the queue ended up empty.
private func drainPendingEnvelopes() {
    let tsAccountManager = DependenciesBridge.shared.tsAccountManager

    // Conditions are checked left to right and short-circuit on the first failure.
    guard
        CurrentAppContext().shouldProcessIncomingMessages,
        tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered,
        tsAccountManager.storedDeviceIdWithMaybeTransaction.ifValid != nil,
        SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted
    else {
        return
    }

    queueForProcessing.async { [self] in
        isDrainingPendingEnvelopes.set(true)

        // Each batch runs in its own autorelease pool.
        var shouldKeepDraining = true
        while shouldKeepDraining {
            shouldKeepDraining = autoreleasepool(invoking: { self.drainNextBatch() })
        }

        isDrainingPendingEnvelopes.set(false)

        guard pendingEnvelopes.isEmpty else {
            return
        }
        NotificationCenter.default.postOnMainThread(name: Self.messageProcessorDidDrainQueue, object: nil)
    }
}
|
|
|
|
/// Server GUIDs of recently processed envelopes, oldest first. Envelopes whose
/// GUID is found here are skipped instead of being processed again.
private var recentlyProcessedGuids: SetDeque<UUID> = .init()
/// Maximum number of GUIDs retained in `recentlyProcessedGuids`.
/// Should ideally match `MESSAGE_SENDER_MAX_CONCURRENCY`.
private var recentlyProcessedGuidLimit: Int = 256
|
|
|
|
/// Processes the next batch of pending envelopes inside a single write transaction.
///
/// Must be called on `queueForProcessing`.
///
/// - Returns: Whether or not to continue draining the queue. This is `false`
///   when message processing isn't permitted, when nothing is pending, or
///   when this pass didn't process any envelope.
private func drainNextBatch() -> Bool {
    assertOnQueue(queueForProcessing)

    guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
        return false
    }

    // If the app is in the background, use batch size of 1.
    // This reduces the risk of us never being able to drain any
    // messages from the queue. We should fine tune this number
    // to yield the best perf we can get.
    let batchLimitUpperBound = CurrentAppContext().isInBackground() ? 1 : max(
        Constants.incomingMessageBatchLimit,
        Constants.incomingReceiptBatchLimit
    )
    let batch = pendingEnvelopes.nextBatch(batchSize: batchLimitUpperBound)
    let batchEnvelopes = batch.batchEnvelopes
    let pendingEnvelopesCount = batch.pendingEnvelopesCount

    // Grow (never shrink) the GUID window so it is at least as large as the
    // number of envelopes currently pending.
    if pendingEnvelopesCount > self.recentlyProcessedGuidLimit {
        self.recentlyProcessedGuidLimit = pendingEnvelopesCount
    }

    guard !batchEnvelopes.isEmpty else {
        return false
    }

    var startTime: CFTimeInterval = 0

    var processedEnvelopesCount = 0

    var messageCount = 0
    var receiptCount = 0

    SSKEnvironment.shared.databaseStorageRef.write { tx in
        // Start the timer once we acquire a write transaction.
        startTime = CACurrentMediaTime()

        // This is only called via `drainPendingEnvelopes`, and that confirms that
        // we're registered. Consequently, this generally shouldn't fail.
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        guard let registeredState = try? tsAccountManager.registeredState(tx: tx) else {
            return
        }
        guard let localDeviceId = tsAccountManager.storedDeviceId(tx: tx).ifValid else {
            return
        }

        var remainingEnvelopes = batchEnvelopes[...]
        while
            !remainingEnvelopes.isEmpty,
            messageCount < Constants.incomingMessageBatchLimit,
            receiptCount < Constants.incomingReceiptBatchLimit
        {
            guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
                break
            }
            autoreleasepool {
                // If we build a request, we must handle it to ensure it's not lost if we
                // stop processing envelopes.
                let relatedRequests = buildNextCombinedRequest(
                    envelopes: &remainingEnvelopes,
                    localIdentifiers: registeredState.localIdentifiers,
                    localDeviceId: localDeviceId,
                    tx: tx
                )
                // A combined request is either all delivery receipts or a single
                // non-delivery-receipt request, so the first element decides the bucket.
                if relatedRequests.first?.deliveryReceiptMessageTimestamps != nil {
                    receiptCount += relatedRequests.count
                } else {
                    messageCount += relatedRequests.count
                }
                handle(
                    relatedRequests: relatedRequests,
                    registeredState: registeredState,
                    transaction: tx
                )
            }
        }
        processedEnvelopesCount += batchEnvelopes.count - remainingEnvelopes.count
    }

    // Remember the GUIDs we just handled so duplicates can be skipped later.
    for processedEnvelope in batchEnvelopes.prefix(processedEnvelopesCount) {
        guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: processedEnvelope.envelope) else {
            continue
        }
        recentlyProcessedGuids.pushBack(serverGuid)
    }
    while recentlyProcessedGuids.count > recentlyProcessedGuidLimit {
        _ = recentlyProcessedGuids.popFront()
    }

    // The groups processing logic relies on `removeProcessedEnvelopes` being
    // called after the `write`'s `addSyncCompletion` blocks.
    pendingEnvelopes.removeProcessedEnvelopes(processedEnvelopesCount)
    let endTime = CACurrentMediaTime()
    let formattedDuration = String(format: "%.1f", (endTime - startTime) * 1000)
    Logger.info("Processed \(processedEnvelopesCount) envelopes (of \(pendingEnvelopesCount) total) in \(formattedDuration)ms")

    // Only keep draining if this pass made progress. Nothing is processed when
    // the registered state or device id can't be loaded inside the transaction,
    // or when processing stopped being permitted before the first request was
    // built. Returning `true` in the former case would make the caller's loop
    // re-run the same batch in a fresh write transaction over and over.
    // Draining is started again by `drainPendingEnvelopes`, which runs when the
    // registration state changes, when the supervisor resumes processing, and
    // when a new envelope is enqueued.
    return processedEnvelopesCount > 0
}
|
|
|
|
/// Removes envelopes from the front of `envelopes` and turns them into requests.
///
/// If `envelopes` is not empty, this will emit a single request for a
/// non-delivery receipt, or one or more delivery-receipt requests optionally
/// followed by the non-delivery-receipt request that ended the run.
///
/// - Returns: The requests built, in envelope order; empty when `envelopes` is empty.
private func buildNextCombinedRequest(
    envelopes: inout ArraySlice<ReceivedEnvelope>,
    localIdentifiers: LocalIdentifiers,
    localDeviceId: DeviceId,
    tx: DBWriteTransaction
) -> [ProcessingRequest] {
    var combinedRequests: [ProcessingRequest] = []
    while let nextEnvelope = envelopes.popFirst() {
        let request = processingRequest(
            for: nextEnvelope,
            localIdentifiers: localIdentifiers,
            localDeviceId: localDeviceId,
            tx: tx
        )
        combinedRequests.append(request)

        guard request.deliveryReceiptMessageTimestamps != nil else {
            // If we hit a non-delivery receipt envelope, handle it immediately to avoid
            // keeping potentially large decrypted envelopes in memory.
            break
        }
    }
    return combinedRequests
}
|
|
|
|
/// Handles a group of related requests under one deferred-update receipt context.
private func handle(relatedRequests: [ProcessingRequest], registeredState: RegisteredState, transaction: DBWriteTransaction) {
    // Efficiently handle delivery receipts for the same message by fetching the sent message only
    // once and only using one updateWith... to update the message with new recipient state.
    BatchingDeliveryReceiptContext.withDeferredUpdates(transaction: transaction) { receiptContext in
        relatedRequests.forEach { request in
            handleProcessingRequest(
                request,
                context: receiptContext,
                registeredState: registeredState,
                tx: transaction
            )
        }
    }
}
|
|
|
|
/// Dispatches a request to the component responsible for its state.
///
/// Does not invoke the envelope's completion handler; the caller does that.
private func reallyHandleProcessingRequest(
    _ request: ProcessingRequest,
    context: DeliveryReceiptContext,
    registeredState: RegisteredState,
    transaction: DBWriteTransaction
) {
    switch request.state {
    case let .completed(error):
        // Nothing left to do other than log the outcome.
        Logger.info("Envelope completed early with error \(String(describing: error))")

    case let .enqueueForGroup(decryptedEnvelope, envelopeData):
        // Hand the envelope over to the group message processor.
        SSKEnvironment.shared.groupMessageProcessorManagerRef.enqueue(
            envelope: decryptedEnvelope,
            envelopeData: envelopeData,
            serverDeliveryTimestamp: request.receivedEnvelope.serverDeliveryTimestamp,
            tx: transaction
        )
        SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)

    case let .messageReceiverRequest(receiverRequest):
        SSKEnvironment.shared.messageReceiverRef.handleRequest(
            receiverRequest,
            context: context,
            registeredState: registeredState,
            tx: transaction
        )
        SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(receiverRequest.decryptedEnvelope, tx: transaction)

    case let .clearPlaceholdersOnly(decryptedEnvelope):
        SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)

    case let .serverReceipt(receiptEnvelope):
        SSKEnvironment.shared.messageReceiverRef.handleDeliveryReceipt(
            envelope: receiptEnvelope,
            context: context,
            tx: transaction
        )
    }
}
|
|
|
|
/// Handles a single request and registers the envelope's completion handler
/// as a sync-completion block on the transaction.
private func handleProcessingRequest(
    _ request: ProcessingRequest,
    context: DeliveryReceiptContext,
    registeredState: RegisteredState,
    tx: DBWriteTransaction
) {
    reallyHandleProcessingRequest(
        request,
        context: context,
        registeredState: registeredState,
        transaction: tx
    )

    let envelopeCompletion = request.receivedEnvelope.completion
    tx.addSyncCompletion { envelopeCompletion() }
}
|
|
|
|
/// Notification handler for `.registrationStateDidChange`: tries to drain
/// whatever is pending under the new registration state.
@objc
private func registrationStateDidChange() {
    drainPendingEnvelopes()
}
|
|
}
|
|
|
|
/// A received envelope paired with the decision about how it must be handled.
private struct ProcessingRequest {
    /// What should happen with the envelope.
    enum State {
        /// Nothing more to do; carries the error, if any, that ended processing.
        case completed(error: Error?)
        /// The envelope must go through the group message processor.
        case enqueueForGroup(decryptedEnvelope: DecryptedIncomingEnvelope, envelopeData: Data)
        /// The envelope can be handled right away by the message receiver.
        case messageReceiverRequest(MessageReceiverRequest)
        /// The envelope is a server-generated receipt.
        case serverReceipt(ServerReceiptEnvelope)
        // Message decrypted but had an invalid protobuf.
        case clearPlaceholdersOnly(DecryptedIncomingEnvelope)
    }

    let receivedEnvelope: ReceivedEnvelope
    let state: State

    // If this request is for a delivery receipt, return the timestamps for the sent-messages it
    // corresponds to. Returns nil for every other kind of request.
    var deliveryReceiptMessageTimestamps: [UInt64]? {
        switch state {
        case .serverReceipt(let receiptEnvelope):
            return [receiptEnvelope.validatedEnvelope.timestamp]

        case .messageReceiverRequest(let receiverRequest):
            // Only receipt messages of the `.delivery` type qualify.
            if
                case .receiptMessage = receiverRequest.messageType,
                let receipt = receiverRequest.protoContent.receiptMessage,
                receipt.type == .delivery
            {
                return receipt.timestamp
            }
            return nil

        case .completed, .enqueueForGroup, .clearPlaceholdersOnly:
            return nil
        }
    }

    init(_ receivedEnvelope: ReceivedEnvelope, state: State) {
        self.receivedEnvelope = receivedEnvelope
        self.state = state
    }
}
|
|
|
|
/// Decrypts one received envelope and works out the `ProcessingRequest.State`
/// that describes how it should be handled.
private struct ProcessingRequestBuilder {
    let receivedEnvelope: ReceivedEnvelope
    let blockingManager: BlockingManager
    let localDeviceId: DeviceId
    let localIdentifiers: LocalIdentifiers
    let messageDecrypter: OWSMessageDecrypter
    let messageReceiver: MessageReceiver

    init(
        _ receivedEnvelope: ReceivedEnvelope,
        blockingManager: BlockingManager,
        localDeviceId: DeviceId,
        localIdentifiers: LocalIdentifiers,
        messageDecrypter: OWSMessageDecrypter,
        messageReceiver: MessageReceiver
    ) {
        self.receivedEnvelope = receivedEnvelope
        self.blockingManager = blockingManager
        self.localDeviceId = localDeviceId
        self.localIdentifiers = localIdentifiers
        self.messageDecrypter = messageDecrypter
        self.messageReceiver = messageReceiver
    }

    /// Decrypts the envelope (if needed) and returns the resulting state.
    ///
    /// Any error thrown while validating or decrypting is reported as
    /// `.completed(error:)` rather than being rethrown.
    func build(tx: DBWriteTransaction) -> ProcessingRequest.State {
        let decryptionResult: ReceivedEnvelope.DecryptionResult
        do {
            decryptionResult = try receivedEnvelope.decryptIfNeeded(
                messageDecrypter: messageDecrypter,
                localIdentifiers: localIdentifiers,
                localDeviceId: localDeviceId,
                tx: tx
            )
        } catch {
            return .completed(error: error)
        }

        switch decryptionResult {
        case .decryptedMessage(let decryptedEnvelope):
            return processingRequest(for: decryptedEnvelope, tx: tx)
        case .serverReceipt(let receiptEnvelope):
            return .serverReceipt(receiptEnvelope)
        }
    }

    /// How a decrypted envelope should be routed.
    private enum ProcessingStep {
        case discard
        case enqueueForGroupProcessing
        case processNow(shouldDiscardVisibleMessages: Bool)
    }

    /// Chooses the routing for a decrypted envelope based on its v2 group context, if any.
    private func processingStep(
        for decryptedEnvelope: DecryptedIncomingEnvelope,
        tx: DBWriteTransaction
    ) -> ProcessingStep {
        guard
            let contentProto = decryptedEnvelope.content,
            let groupContextV2 = GroupMessageProcessorManager.groupContextV2(from: contentProto)
        else {
            // Non-v2-group messages can be processed immediately.
            return .processNow(shouldDiscardVisibleMessages: false)
        }

        let canProcessImmediately = GroupMessageProcessorManager.canContextBeProcessedImmediately(
            groupContext: groupContextV2,
            tx: tx
        )
        if !canProcessImmediately {
            // Some v2 group messages require group state to be
            // updated before they can be processed.
            return .enqueueForGroupProcessing
        }

        let discardMode = SpecificGroupMessageProcessor.discardMode(
            forMessageFrom: decryptedEnvelope.sourceAci,
            groupContext: groupContextV2,
            tx: tx
        )
        switch discardMode {
        case .doNotDiscard:
            return .processNow(shouldDiscardVisibleMessages: false)
        case .discardVisibleMessages:
            // Some v2 group messages should be processed, but discarding any "visible"
            // messages, e.g. text messages or calls.
            return .processNow(shouldDiscardVisibleMessages: true)
        case .discard:
            // Some v2 group messages should be discarded and not processed.
            return .discard
        }
    }

    /// Pre-processes a decrypted envelope and maps it to a request state.
    private func processingRequest(
        for decryptedEnvelope: DecryptedIncomingEnvelope,
        tx: DBWriteTransaction
    ) -> ProcessingRequest.State {
        owsPrecondition(CurrentAppContext().shouldProcessIncomingMessages)

        // Pre-processing has to happen during the same transaction that performed
        // decryption.
        messageReceiver.preprocessEnvelope(decryptedEnvelope, tx: tx)

        // If the sender is in the block list, we can skip scheduling any additional processing.
        let sourceAddress = SignalServiceAddress(decryptedEnvelope.sourceAci)
        guard !blockingManager.isAddressBlocked(sourceAddress, transaction: tx) else {
            Logger.info("Skipping processing for blocked envelope from \(decryptedEnvelope.sourceAci)")
            return .completed(error: MessageProcessingError.blockedSender)
        }

        if decryptedEnvelope.localIdentity == .pni {
            let identityManager = DependenciesBridge.shared.identityManager
            identityManager.setShouldSharePhoneNumber(with: decryptedEnvelope.sourceAci, tx: tx)
        }

        switch processingStep(for: decryptedEnvelope, tx: tx) {
        case .discard:
            // Do nothing.
            return .completed(error: nil)

        case .enqueueForGroupProcessing:
            // If we can't process the message immediately, we enqueue it for
            // processing in the same transaction within which it was decrypted
            // to prevent data loss.
            do {
                let envelopeData = try decryptedEnvelope.envelope.serializedData()
                return .enqueueForGroup(decryptedEnvelope: decryptedEnvelope, envelopeData: envelopeData)
            } catch {
                owsFailDebug("failed to reserialize envelope: \(error)")
                return .completed(error: error)
            }

        case .processNow(let shouldDiscardVisibleMessages):
            // Envelopes can be processed immediately if they're:
            // 1. Not a GV2 message.
            // 2. A GV2 message that doesn't require updating the group.
            //
            // The advantage to processing the message immediately is that we can fully
            // process the message in the same transaction that we used to decrypt it.
            // This results in a significant perf benefit versus queueing the message
            // and waiting for that queue to open new transactions and process
            // messages. The downside is that if we *fail* to process this message
            // (e.g. the app crashed or was killed), we'll have to re-decrypt again
            // before we process. This is safe since the decrypt operation would also
            // be rolled back (since the transaction didn't commit) and should be rare.
            messageReceiver.checkForUnknownLinkedDevice(in: decryptedEnvelope, tx: tx)

            let buildResult = MessageReceiverRequest.buildRequest(
                for: decryptedEnvelope,
                serverDeliveryTimestamp: receivedEnvelope.serverDeliveryTimestamp,
                shouldDiscardVisibleMessages: shouldDiscardVisibleMessages,
                tx: tx
            )

            switch buildResult {
            case .request(let messageReceiverRequest):
                return .messageReceiverRequest(messageReceiverRequest)
            case .noContent:
                return .clearPlaceholdersOnly(decryptedEnvelope)
            case .discard:
                return .completed(error: nil)
            }
        }
    }
}
|
|
|
|
private extension MessageProcessor {
    /// Builds the processing request for one received envelope.
    ///
    /// Must be called on `queueForProcessing`. An envelope whose server GUID
    /// is in `recentlyProcessedGuids` is completed with an error instead of
    /// being decrypted again.
    func processingRequest(
        for envelope: ReceivedEnvelope,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
        tx: DBWriteTransaction
    ) -> ProcessingRequest {
        assertOnQueue(queueForProcessing)

        let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: envelope.envelope)
        if let serverGuid, recentlyProcessedGuids.contains(serverGuid) {
            let duplicateError = OWSGenericError("Skipping because it was recently processed.")
            return ProcessingRequest(envelope, state: .completed(error: duplicateError))
        }

        let requestState = ProcessingRequestBuilder(
            envelope,
            blockingManager: SSKEnvironment.shared.blockingManagerRef,
            localDeviceId: localDeviceId,
            localIdentifiers: localIdentifiers,
            messageDecrypter: SSKEnvironment.shared.messageDecrypterRef,
            messageReceiver: SSKEnvironment.shared.messageReceiverRef
        ).build(tx: tx)
        return ProcessingRequest(envelope, state: requestState)
    }
}
|
|
|
|
// MARK: -
|
|
|
|
extension MessageProcessor: MessageProcessingPipelineStage {
    /// Called by the supervisor when message processing resumes; tries to
    /// drain whatever is pending.
    public func supervisorDidResumeMessageProcessing(_ supervisor: MessagePipelineSupervisor) {
        self.drainPendingEnvelopes()
    }
}
|
|
|
|
// MARK: -
|
|
|
|
/// A parsed envelope together with its delivery timestamp and the callback to
/// invoke once it has been dealt with.
private struct ReceivedEnvelope {
    let envelope: SSKProtoEnvelope
    let serverDeliveryTimestamp: UInt64
    let completion: () -> Void

    /// The outcome of `decryptIfNeeded`.
    enum DecryptionResult {
        case serverReceipt(ServerReceiptEnvelope)
        case decryptedMessage(DecryptedIncomingEnvelope)
    }

    /// Validates the envelope and decrypts it unless it is a server receipt.
    ///
    /// - Throws: Whatever validation, receipt parsing, or decryption throws.
    func decryptIfNeeded(
        messageDecrypter: OWSMessageDecrypter,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
        tx: DBWriteTransaction
    ) throws -> DecryptionResult {
        // Figure out what type of envelope we're dealing with.
        let validated = try ValidatedIncomingEnvelope(envelope, localIdentifiers: localIdentifiers)

        let decrypted: DecryptedIncomingEnvelope
        switch validated.kind {
        case .serverReceipt:
            // Server receipts carry nothing to decrypt.
            return .serverReceipt(try ServerReceiptEnvelope(validated))

        case .identifiedSender(let cipherType):
            decrypted = try messageDecrypter.decryptIdentifiedEnvelope(
                validated,
                cipherType: cipherType,
                localIdentifiers: localIdentifiers,
                localDeviceId: localDeviceId,
                tx: tx
            )

        case .unidentifiedSender:
            decrypted = try messageDecrypter.decryptUnidentifiedSenderEnvelope(
                validated,
                localIdentifiers: localIdentifiers,
                localDeviceId: localDeviceId,
                tx: tx
            )
        }
        return .decryptedMessage(decrypted)
    }
}
|
|
|
|
// MARK: -
|
|
|
|
/// Identifies where a received envelope came from. The case name is
/// interpolated into failure logs when an envelope is rejected.
public enum EnvelopeSource {
    case unknown,
         websocketIdentified,
         websocketUnidentified,
         debugUI,
         tests
}
|
|
|
|
// MARK: -
|
|
|
|
/// A first-in, first-out list of received envelopes. Every access to the
/// underlying array goes through `unfairLock`.
private class PendingEnvelopes {
    private let unfairLock = UnfairLock()
    private var queuedEnvelopes: [ReceivedEnvelope] = []

    /// Whether no envelopes are queued.
    var isEmpty: Bool {
        return unfairLock.withLock { queuedEnvelopes.isEmpty }
    }

    /// The number of queued envelopes.
    var count: Int {
        return unfairLock.withLock { queuedEnvelopes.count }
    }

    /// A snapshot of the head of the queue plus the total queue length at
    /// the time the snapshot was taken.
    struct Batch {
        let batchEnvelopes: [ReceivedEnvelope]
        let pendingEnvelopesCount: Int
    }

    /// Returns up to `batchSize` envelopes from the front of the queue without
    /// removing them.
    func nextBatch(batchSize: Int) -> Batch {
        return unfairLock.withLock {
            let head = Array(queuedEnvelopes.prefix(batchSize))
            let total = queuedEnvelopes.count
            return Batch(batchEnvelopes: head, pendingEnvelopesCount: total)
        }
    }

    /// Removes the first `processedEnvelopesCount` envelopes from the queue.
    func removeProcessedEnvelopes(_ processedEnvelopesCount: Int) {
        unfairLock.withLock {
            queuedEnvelopes.removeFirst(processedEnvelopesCount)
        }
    }

    /// Empties the queue.
    func removeAll() {
        unfairLock.withLock {
            queuedEnvelopes.removeAll()
        }
    }

    /// Appends an envelope to the back of the queue.
    func enqueue(_ receivedEnvelope: ReceivedEnvelope) {
        unfairLock.withLock {
            queuedEnvelopes.append(receivedEnvelope)
        }
    }
}
|
|
|
|
// MARK: -
|
|
|
|
/// Errors reported while processing an incoming envelope.
public enum MessageProcessingError: Error {
    case wrongDestinationUuid,
         invalidMessageTypeForDestinationUuid,
         blockedSender
}
|